Repository: incubator-streams Updated Branches: refs/heads/master dd58c877b -> cae682d6e
level up rss provider add main methods to each Provider (STREAMS-412) add real integration tests (STREAMS-415) fix a failing test Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/427faf53 Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/427faf53 Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/427faf53 Branch: refs/heads/master Commit: 427faf5378b3de99df81604dd4ec856c89766224 Parents: 2c12724 Author: Steve Blackmon @steveblackmon <[email protected]> Authored: Mon Oct 17 11:44:20 2016 -0500 Committer: Steve Blackmon @steveblackmon <[email protected]> Committed: Mon Oct 17 11:44:20 2016 -0500 ---------------------------------------------------------------------- .../streams/rss/provider/RssStreamProvider.java | 59 +++++++++ .../src/site/markdown/index.md | 7 ++ .../rss/provider/RssStreamProviderTaskIT.java | 2 +- .../streams/rss/test/RssStreamProviderIT.java | 121 +++++++++++++++++++ 4 files changed, 188 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/427faf53/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssStreamProvider.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssStreamProvider.java b/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssStreamProvider.java index e444ed3..a0e8ea1 100644 --- a/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssStreamProvider.java +++ b/streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssStreamProvider.java @@ -18,22 +18,30 @@ package org.apache.streams.rss.provider; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.google.common.collect.Queues; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; +import com.google.common.util.concurrent.Uninterruptibles; import com.sun.syndication.feed.synd.SyndEntry; import com.sun.syndication.feed.synd.SyndFeed; import com.sun.syndication.io.FeedException; import com.sun.syndication.io.SyndFeedInput; import com.sun.syndication.io.XmlReader; import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; +import com.typesafe.config.ConfigParseOptions; +import org.apache.streams.config.ComponentConfigurator; +import org.apache.streams.config.StreamsConfiguration; import org.apache.streams.config.StreamsConfigurator; import org.apache.streams.core.StreamsDatum; import org.apache.streams.core.StreamsProvider; import org.apache.streams.core.StreamsResultSet; +import org.apache.streams.jackson.StreamsJacksonMapper; import org.apache.streams.rss.FeedDetails; import org.apache.streams.rss.RssStreamConfiguration; import org.apache.streams.rss.provider.perpetual.RssFeedScheduler; @@ -42,7 +50,11 @@ import org.joda.time.DateTime; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.BufferedOutputStream; +import java.io.File; +import java.io.FileOutputStream; import java.io.IOException; +import java.io.PrintStream; import java.io.Serializable; import java.math.BigInteger; import java.net.MalformedURLException; @@ -52,7 +64,15 @@ import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; /** + * RSS {@link org.apache.streams.core.StreamsProvider} that provides content from rss feeds in boilerpipe format * + * To use from command line: + * + * Supply configuration similar to src/test/resources/rss.conf + * + * Launch using: + * + * mvn exec:java -Dexec.mainClass=org.apache.streams.rss.provider.RssStreamProvider -Dexec.args="rss.conf articles.json" */ public class RssStreamProvider implements StreamsProvider { @@ -177,5 +197,44 @@ public class RssStreamProvider implements StreamsProvider { ComponentUtils.shutdownExecutor(this.executor, 10, 10); } + public static void main(String[] args) throws Exception { + + Preconditions.checkArgument(args.length >= 2); + + String configfile = args[0]; + String outfile = args[1]; + + Config reference = ConfigFactory.load(); + File conf_file = new File(configfile); + assert(conf_file.exists()); + Config testResourceConfig = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false)); + Config typesafe = testResourceConfig.withFallback(reference).resolve(); + + StreamsConfiguration streamsConfiguration = StreamsConfigurator.detectConfiguration(typesafe); + RssStreamConfiguration config = new ComponentConfigurator<>(RssStreamConfiguration.class).detectConfiguration(typesafe, "rss"); + RssStreamProvider provider = new RssStreamProvider(config); + + ObjectMapper mapper = StreamsJacksonMapper.getInstance(); + + PrintStream outStream = new PrintStream(new BufferedOutputStream(new FileOutputStream(outfile))); + provider.prepare(config); + provider.startStream(); + do { + Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getBatchFrequencyMs(), TimeUnit.MILLISECONDS); + Iterator<StreamsDatum> iterator = provider.readCurrent().iterator(); + while(iterator.hasNext()) { + StreamsDatum datum = iterator.next(); + String json; + try { + json = mapper.writeValueAsString(datum.getDocument()); + outStream.println(json); + } catch (JsonProcessingException e) { + System.err.println(e.getMessage()); + } + } + } while( provider.isRunning()); + provider.cleanUp(); + outStream.flush(); + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/427faf53/streams-contrib/streams-provider-rss/src/site/markdown/index.md ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-rss/src/site/markdown/index.md b/streams-contrib/streams-provider-rss/src/site/markdown/index.md index 3a4a561..8563909 100644 --- a/streams-contrib/streams-provider-rss/src/site/markdown/index.md +++ b/streams-contrib/streams-provider-rss/src/site/markdown/index.md @@ -15,6 +15,13 @@ streams-provider-rss |-------|---------------|--------------------------| | RssStreamProvider [RssStreamProvider.html](apidocs/org/apache/streams/rss/provider/RssStreamProvider.html "javadoc") | [RssStreamConfiguration.json](RssStreamConfiguration.json "RssStreamConfiguration.json") [RssStreamConfiguration.html](apidocs/org/apache/streams/rss/RssStreamConfiguration.html "javadoc") | [rss.conf](rss.conf "rss.conf") | +Test: +----- + +Build with integration testing enabled + + mvn clean test verify -DskipITs=false + [JavaDocs](apidocs/index.html "JavaDocs") ###### Licensed under Apache License 2.0 - http://www.apache.org/licenses/LICENSE-2.0 http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/427faf53/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/provider/RssStreamProviderTaskIT.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/provider/RssStreamProviderTaskIT.java b/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/provider/RssStreamProviderTaskIT.java index a838c78..cb71c90 100644 --- a/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/provider/RssStreamProviderTaskIT.java +++ b/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/provider/RssStreamProviderTaskIT.java @@ -73,7 +73,7 @@ public class RssStreamProviderTaskIT { public void testPerpetualNoTimeFramePull() throws Exception { com.healthmarketscience.common.util.resource.Handler.init(); BlockingQueue<StreamsDatum> queue = new LinkedBlockingQueue<>(); - RssStreamProviderTask task = new RssStreamProviderTask(queue, "fake url", new DateTime().minusYears(1), 10000, true); + RssStreamProviderTask task = new RssStreamProviderTask(queue, "fake url", new DateTime().minusYears(5), 10000, true); Set<String> batch = task.queueFeedEntries(new URL("resource:///test_rss_xml/economist1.xml")); assertEquals("Expected batch size to be the same as amount of queued datums", batch.size(), queue.size()); RssStreamProviderTask.PREVIOUSLY_SEEN.put("fake url", batch); http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/427faf53/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/test/RssStreamProviderIT.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/test/RssStreamProviderIT.java b/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/test/RssStreamProviderIT.java new file mode 100644 index 0000000..ccac8aa --- /dev/null +++ b/streams-contrib/streams-provider-rss/src/test/java/org/apache/streams/rss/test/RssStreamProviderIT.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.streams.rss.test; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import org.apache.commons.lang.StringUtils; +import org.apache.streams.jackson.StreamsJacksonMapper; +import org.apache.streams.pojo.json.Activity; +import org.apache.streams.rss.FeedDetails; +import org.apache.streams.rss.RssStreamConfiguration; +import org.apache.streams.rss.provider.RssStreamProvider; +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedReader; +import java.io.BufferedWriter; +import java.io.File; +import java.io.FileOutputStream; +import java.io.FileReader; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.LineNumberReader; +import java.io.OutputStream; +import java.io.OutputStreamWriter; +import java.util.List; + +import static org.hamcrest.number.OrderingComparison.greaterThan; + +/** + * Created by sblackmon on 2/5/14. + */ +public class RssStreamProviderIT { + + private final static Logger LOGGER = LoggerFactory.getLogger(RssStreamProviderIT.class); + + private ObjectMapper mapper = StreamsJacksonMapper.getInstance(); + + @Test + public void testRssStreamProvider() throws Exception { + + String configfile = "./target/test-classes/RssStreamProviderIT.conf"; + String outfile = "./target/test-classes/RssStreamProviderIT.stdout.txt"; + + InputStream is = RssStreamProviderIT.class.getResourceAsStream("/top100.txt"); + InputStreamReader isr = new InputStreamReader(is); + BufferedReader br = new BufferedReader(isr); + + RssStreamConfiguration configuration = new RssStreamConfiguration(); + List<FeedDetails> feedArray = Lists.newArrayList(); + try { + while (br.ready()) { + String line = br.readLine(); + if(!StringUtils.isEmpty(line)) + { + feedArray.add(new FeedDetails().withUrl(line).withPollIntervalMillis(5000l)); + } + } + configuration.setFeeds(feedArray); + } catch( Exception e ) { + System.out.println(e); + e.printStackTrace(); + Assert.fail(); + } + + Assert.assertThat(configuration.getFeeds().size(), greaterThan(70)); + + OutputStream os = new FileOutputStream(configfile); + OutputStreamWriter osw = new OutputStreamWriter(os); + BufferedWriter bw = new BufferedWriter(osw); + + // write conf + ObjectNode feedsNode = mapper.convertValue(configuration, ObjectNode.class); + JsonNode configNode = mapper.createObjectNode().set("rss", feedsNode); + + bw.write(mapper.writeValueAsString(configNode)); + bw.flush(); + bw.close(); + + File config = new File(configfile); + assert (config.exists()); + assert (config.canRead()); + assert (config.isFile()); + + RssStreamProvider.main(Lists.newArrayList(configfile, outfile).toArray(new String[2])); + + File out = new File(outfile); + assert (out.exists()); + assert (out.canRead()); + assert (out.isFile()); + + FileReader outReader = new FileReader(out); + LineNumberReader outCounter = new LineNumberReader(outReader); + + while(outCounter.readLine() != null) {} + + assert (outCounter.getLineNumber() >= 200); + + } +} \ No newline at end of file
