Repository: incubator-streams Updated Branches: refs/heads/master 67d5cca51 -> d5ef370c0
level up sysomos provider add main methods to each Provider (STREAMS-412) add real integration tests (STREAMS-415) tune up POM Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/ac523f62 Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/ac523f62 Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/ac523f62 Branch: refs/heads/master Commit: ac523f62bb3ee157dcc8043f2a7b00d14e66a537 Parents: 11e3a0f Author: Steve Blackmon @steveblackmon <[email protected]> Authored: Fri Oct 21 11:03:13 2016 -0500 Committer: Steve Blackmon @steveblackmon <[email protected]> Committed: Fri Oct 21 11:03:13 2016 -0500 ---------------------------------------------------------------------- .../streams-provider-sysomos/pom.xml | 8 +- .../sysomos/provider/SysomosProvider.java | 64 ++++++++++++++++ .../src/main/resources/sysomos.conf | 27 ------- .../test/provider/SysomosProviderIT.java | 80 ++++++++++++++++++++ .../src/test/resources/sysomos.conf | 27 +++++++ 5 files changed, 174 insertions(+), 32 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ac523f62/streams-contrib/streams-provider-sysomos/pom.xml ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-sysomos/pom.xml b/streams-contrib/streams-provider-sysomos/pom.xml index f96d61b..fadbd91 100644 --- a/streams-contrib/streams-provider-sysomos/pom.xml +++ b/streams-contrib/streams-provider-sysomos/pom.xml @@ -159,7 +159,6 @@ <plugin> <groupId>org.jvnet.jaxb2.maven2</groupId> <artifactId>maven-jaxb2-plugin</artifactId> - <version>0.8.3</version> <configuration> <schemaDirectory>src/main/xmlschema/com/sysomos/</schemaDirectory> <generateDirectory>target/generated-sources/jaxb2</generateDirectory> @@ -181,7 +180,6 @@ <plugin> <groupId>com.googlecode.maven-download-plugin</groupId> <artifactId>download-maven-plugin</artifactId> - <version>1.2.1</version> <executions> <execution> <id>download-it-data</id> @@ -205,10 +203,10 @@ <skipTests>${skipITs}</skipTests> </configuration> </plugin> - <plugin> - <groupId>org.apache.maven.plugins</groupId> + <plugin> + <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-resources-plugin</artifactId> - </plugin> + </plugin> </plugins> </build> http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ac523f62/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosProvider.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosProvider.java b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosProvider.java index 48e2ccb..824ede2 100644 --- a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosProvider.java +++ b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosProvider.java @@ -19,20 +19,36 @@ package org.apache.streams.sysomos.provider; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Preconditions; import com.google.common.base.Splitter; import com.google.common.collect.Queues; import com.google.common.collect.Sets; +import com.google.common.util.concurrent.Uninterruptibles; import com.sysomos.SysomosConfiguration; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; +import com.typesafe.config.ConfigParseOptions; import org.apache.commons.lang.NotImplementedException; +import org.apache.streams.config.ComponentConfigurator; +import org.apache.streams.config.StreamsConfiguration; +import org.apache.streams.config.StreamsConfigurator; import org.apache.streams.core.StreamsDatum; import org.apache.streams.core.StreamsProvider; import org.apache.streams.core.StreamsResultSet; import org.apache.streams.data.util.RFC3339Utils; +import org.apache.streams.jackson.StreamsJacksonMapper; import org.joda.time.DateTime; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.BufferedOutputStream; +import java.io.File; +import java.io.FileOutputStream; +import java.io.PrintStream; import java.math.BigInteger; +import java.util.Iterator; import java.util.Map; import java.util.Queue; import java.util.Set; @@ -50,6 +66,13 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; * This configuration will configure the provider to backfill to the specified document and either terminate or not * depending on the mode flag. Continuous mode is assumed, and is the ony mode supported by the String configuration. * + * To use from command line: + * + * Supply configuration similar to src/test/resources/rss.conf + * + * Launch using: + * + * mvn exec:java -Dexec.mainClass=org.apache.streams.rss.provider.RssStreamProvider -Dexec.args="rss.conf articles.json" */ public class SysomosProvider implements StreamsProvider { @@ -302,4 +325,45 @@ public class SysomosProvider implements StreamsProvider { public int getCount() { return this.count.get(); } + + public static void main(String[] args) throws Exception { + + Preconditions.checkArgument(args.length >= 2); + + String configfile = args[0]; + String outfile = args[1]; + + Config reference = ConfigFactory.load(); + File conf_file = new File(configfile); + assert(conf_file.exists()); + Config testResourceConfig = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false)); + + Config typesafe = testResourceConfig.withFallback(reference).resolve(); + + StreamsConfiguration streamsConfiguration = StreamsConfigurator.detectConfiguration(typesafe); + SysomosConfiguration config = new ComponentConfigurator<>(SysomosConfiguration.class).detectConfiguration(typesafe, "rss"); + SysomosProvider provider = new SysomosProvider(config); + + ObjectMapper mapper = StreamsJacksonMapper.getInstance(); + + PrintStream outStream = new PrintStream(new BufferedOutputStream(new FileOutputStream(outfile))); + provider.prepare(config); + provider.startStream(); + do { + Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getBatchFrequencyMs(), TimeUnit.MILLISECONDS); + Iterator<StreamsDatum> iterator = provider.readCurrent().iterator(); + while(iterator.hasNext()) { + StreamsDatum datum = iterator.next(); + String json; + try { + json = mapper.writeValueAsString(datum.getDocument()); + outStream.println(json); + } catch (JsonProcessingException e) { + System.err.println(e.getMessage()); + } + } + } while( provider.isRunning()); + provider.cleanUp(); + outStream.flush(); + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ac523f62/streams-contrib/streams-provider-sysomos/src/main/resources/sysomos.conf ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-sysomos/src/main/resources/sysomos.conf b/streams-contrib/streams-provider-sysomos/src/main/resources/sysomos.conf deleted file mode 100644 index 13af8b8..0000000 --- a/streams-contrib/streams-provider-sysomos/src/main/resources/sysomos.conf +++ /dev/null @@ -1,27 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - -sysomos { - heartbeatIds = [ - HBID - ] - apiBatchSize = 500 - apiKey = KEY - minDelayMs = 10000 - scheduledDelayMs = 120000 - maxBatchSize = 10000 -} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ac523f62/streams-contrib/streams-provider-sysomos/src/test/java/org/apache/streams/sysomos/test/provider/SysomosProviderIT.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-sysomos/src/test/java/org/apache/streams/sysomos/test/provider/SysomosProviderIT.java b/streams-contrib/streams-provider-sysomos/src/test/java/org/apache/streams/sysomos/test/provider/SysomosProviderIT.java new file mode 100644 index 0000000..b4289ee --- /dev/null +++ b/streams-contrib/streams-provider-sysomos/src/test/java/org/apache/streams/sysomos/test/provider/SysomosProviderIT.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.streams.sysomos.test.provider; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.google.common.collect.Lists; +import com.sysomos.SysomosConfiguration; +import org.apache.commons.lang.StringUtils; +import org.apache.streams.jackson.StreamsJacksonMapper; +import org.apache.streams.sysomos.provider.SysomosProvider; +import org.junit.Assert; +import org.junit.Ignore; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedReader; +import java.io.BufferedWriter; +import java.io.File; +import java.io.FileOutputStream; +import java.io.FileReader; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.LineNumberReader; +import java.io.OutputStream; +import java.io.OutputStreamWriter; +import java.util.List; + +/** + * Integration test for SysomosProviderIT + * + * Created by sblackmon on 10/21/16. + */ +@Ignore("this is ignored because the project doesn't have credentials to test it with during CI") +public class SysomosProviderIT { + + private final static Logger LOGGER = LoggerFactory.getLogger(SysomosProviderIT.class); + + private ObjectMapper mapper = StreamsJacksonMapper.getInstance(); + + @Test + public void testRssStreamProvider() throws Exception { + + String configfile = "./target/test-classes/RssStreamProviderIT.conf"; + String outfile = "./target/test-classes/RssStreamProviderIT.stdout.txt"; + + SysomosProvider.main(Lists.newArrayList(configfile, outfile).toArray(new String[2])); + + File out = new File(outfile); + assert (out.exists()); + assert (out.canRead()); + assert (out.isFile()); + + FileReader outReader = new FileReader(out); + LineNumberReader outCounter = new LineNumberReader(outReader); + + while(outCounter.readLine() != null) {} + + assert (outCounter.getLineNumber() >= 1); + + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ac523f62/streams-contrib/streams-provider-sysomos/src/test/resources/sysomos.conf ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-sysomos/src/test/resources/sysomos.conf b/streams-contrib/streams-provider-sysomos/src/test/resources/sysomos.conf new file mode 100644 index 0000000..13af8b8 --- /dev/null +++ b/streams-contrib/streams-provider-sysomos/src/test/resources/sysomos.conf @@ -0,0 +1,27 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +sysomos { + heartbeatIds = [ + HBID + ] + apiBatchSize = 500 + apiKey = KEY + minDelayMs = 10000 + scheduledDelayMs = 120000 + maxBatchSize = 10000 +}
