example of STREAMS-415 using twitter
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/0813b11e Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/0813b11e Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/0813b11e Branch: refs/heads/master Commit: 0813b11edd535322cbabafd9a91e77136812e8bb Parents: 9bf8ef9 Author: Steve Blackmon @steveblackmon <[email protected]> Authored: Tue Oct 4 17:37:06 2016 -0500 Committer: Steve Blackmon @steveblackmon <[email protected]> Committed: Tue Oct 4 17:37:24 2016 -0500 ---------------------------------------------------------------------- .../provider/TwitterTimelineProvider.java | 40 ++++++++- .../src/site/markdown/index.md | 18 ++++ .../provider/TwitterTimelineProviderIT.java | 93 ++++++++++++++++++++ .../provider/TwitterTimelineProviderTest.java | 39 -------- .../resources/TwitterTimelineProviderTest.conf | 4 + 5 files changed, 154 insertions(+), 40 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0813b11e/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java index a8eada4..b8653b8 100644 --- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java +++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java @@ -18,15 +18,23 @@ package org.apache.streams.twitter.provider; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.google.common.collect.Queues; +import com.google.common.util.concurrent.Uninterruptibles; import org.apache.commons.lang.NotImplementedException; +import org.apache.streams.config.ComponentConfigurator; +import org.apache.streams.config.StreamsConfigurator; import org.apache.streams.core.DatumStatusCounter; import org.apache.streams.core.StreamsDatum; import org.apache.streams.core.StreamsProvider; import org.apache.streams.core.StreamsResultSet; +import org.apache.streams.jackson.StreamsJacksonMapper; import org.apache.streams.twitter.TwitterUserInformationConfiguration; +import org.apache.streams.twitter.converter.TwitterDateTimeFormat; +import org.apache.streams.twitter.pojo.Tweet; import org.apache.streams.util.ComponentUtils; import org.joda.time.DateTime; import org.slf4j.Logger; @@ -48,12 +56,40 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; /** * Retrieve recent posts from a list of user ids or names. */ -public class TwitterTimelineProvider implements StreamsProvider, Serializable { +public class TwitterTimelineProvider implements StreamsProvider, Serializable, Runnable { public final static String STREAMS_ID = "TwitterTimelineProvider"; private final static Logger LOGGER = LoggerFactory.getLogger(TwitterTimelineProvider.class); + private static ObjectMapper MAPPER = new StreamsJacksonMapper(Lists.newArrayList(TwitterDateTimeFormat.TWITTER_FORMAT)); + + public static void main(String[] args) { + TwitterUserInformationConfiguration config = new ComponentConfigurator<>(TwitterUserInformationConfiguration.class).detectConfiguration("twitter"); + TwitterTimelineProvider provider = new TwitterTimelineProvider(config); + provider.run(); + } + + @Override + public void run() { + prepare(config); + startStream(); + do { + Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS); + Iterator<StreamsDatum> iterator = readCurrent().iterator(); + while(iterator.hasNext()) { + StreamsDatum datum = iterator.next(); + String json; + try { + json = MAPPER.writeValueAsString(datum.getDocument()); + System.out.println(json); + } catch (JsonProcessingException e) { + System.err.println(e.getMessage()); + } + } + } while( isRunning()); + } + public static final int MAX_NUMBER_WAITING = 10000; private TwitterUserInformationConfiguration config; @@ -116,6 +152,7 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable { running.set(true); executor.shutdown(); + } public boolean shouldContinuePulling(List<Status> statuses) { @@ -304,4 +341,5 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable { lock.readLock().unlock(); } } + } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0813b11e/streams-contrib/streams-provider-twitter/src/site/markdown/index.md ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-twitter/src/site/markdown/index.md b/streams-contrib/streams-provider-twitter/src/site/markdown/index.md index ec5d1c8..4249956 100644 --- a/streams-contrib/streams-provider-twitter/src/site/markdown/index.md +++ b/streams-contrib/streams-provider-twitter/src/site/markdown/index.md @@ -31,6 +31,24 @@ streams-provider-twitter contains schema definitions, providers, conversions, an | TwitterStreamProvider [TwitterStreamProvider.html](apidocs/org/apache/streams/twitter/TwitterStreamProvider.html "javadoc") | [TwitterStreamConfiguration.json](com/twitter/TwitterStreamConfiguration.json "TwitterStreamConfiguration.json") [TwitterUserInformationConfiguration.html](apidocs/org/apache/streams/twitter/pojo/TwitterStreamConfiguration.html "javadoc") | [sample.conf](sample.conf "sample.conf")<br/>[userstream.conf](userstream.conf "userstream.conf") | | TwitterFollowingProvider [TwitterFollowingProvider.html](apidocs/org/apache/streams/twitter/TwitterFollowingConfiguration.html "javadoc") | [TwitterFollowingConfiguration.json](com/twitter/TwitterFollowingConfiguration.json "TwitterFollowingConfiguration.json") [TwitterFollowingConfiguration.html](apidocs/org/apache/streams/twitter/pojo/TwitterFollowingConfiguration.html "javadoc") | [friends.conf](friends.conf "friends.conf")<br/>[followers.conf](followers.conf "followers.conf") | +Test: +----- + +Create a local file `application.conf` with valid twitter credentials + + twitter { + oauth { + consumerKey = "" + consumerSecret = "" + accessToken = "" + accessTokenSecret = "" + } + } + +Build with integration testing enabled, using your credentials + + mvn clean test verify -DskipITs=false -DargLine="-Dconfig.file=`pwd`/application.conf" + [JavaDocs](apidocs/index.html "JavaDocs") ###### Licensed under Apache License 2.0 - http://www.apache.org/licenses/LICENSE-2.0 http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0813b11e/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/provider/TwitterTimelineProviderIT.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/provider/TwitterTimelineProviderIT.java b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/provider/TwitterTimelineProviderIT.java new file mode 100644 index 0000000..e0f3b6a --- /dev/null +++ b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/provider/TwitterTimelineProviderIT.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.streams.twitter.provider; + +import com.google.common.collect.Lists; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; +import com.typesafe.config.ConfigParseOptions; +import org.apache.streams.config.ComponentConfigurator; +import org.apache.streams.config.StreamsConfiguration; +import org.apache.streams.config.StreamsConfigurator; +import org.apache.streams.twitter.TwitterUserInformationConfiguration; +import org.junit.Test; + +import java.io.BufferedOutputStream; +import java.io.File; +import java.io.FileOutputStream; +import java.io.FileReader; +import java.io.LineNumberReader; +import java.io.OutputStream; +import java.io.PrintStream; +import java.util.Arrays; +import java.util.List; + +public class TwitterTimelineProviderIT { + + @Test + public void testTwitterTimelineProvider() throws Exception { + + PrintStream stdout = new PrintStream(new BufferedOutputStream(new FileOutputStream("./target/test-classes/TwitterTimelineProviderTest.stdout.txt"))); + PrintStream stderr = new PrintStream(new BufferedOutputStream(new FileOutputStream("./target/test-classes/TwitterTimelineProviderTest.stderr.txt"))); + + System.setOut(stdout); + System.setErr(stderr); + + Config reference = ConfigFactory.load(); + File conf_file = new File("target/test-classes/TwitterTimelineProviderTest.conf"); + assert(conf_file.exists()); + Config testResourceConfig = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false)); + + Config typesafe = testResourceConfig.withFallback(reference).resolve(); + StreamsConfiguration streams = StreamsConfigurator.detectConfiguration(typesafe); + TwitterUserInformationConfiguration testConfig = new ComponentConfigurator<>(TwitterUserInformationConfiguration.class).detectConfiguration(typesafe.getConfig("twitter")); + + TwitterTimelineProvider provider = new TwitterTimelineProvider(testConfig); + provider.run(); + + stdout.flush(); + stderr.flush(); + + File out = new File("target/test-classes/TwitterTimelineProviderTest.stdout.txt"); + assert (out.exists()); + assert (out.canRead()); + assert (out.isFile()); + + FileReader outReader = new FileReader(out); + LineNumberReader outCounter = new LineNumberReader(outReader); + + while(outCounter.readLine() != null) {} + + assert (outCounter.getLineNumber() == 1000); + + File err = new File("target/test-classes/TwitterTimelineProviderTest.stderr.txt"); + assert (err.exists()); + assert (err.canRead()); + assert (err.isFile()); + + FileReader errReader = new FileReader(err); + LineNumberReader errCounter = new LineNumberReader(errReader); + + while(errCounter.readLine() != null) {} + + assert (errCounter.getLineNumber() == 0); + + + } +} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0813b11e/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTest.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTest.java b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTest.java deleted file mode 100644 index 0cdede0..0000000 --- a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTest.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.streams.twitter.provider; - -import org.apache.streams.twitter.TwitterUserInformationConfiguration; -import org.junit.Test; - -import java.util.Arrays; -import java.util.List; - -public class TwitterTimelineProviderTest { - - @Test - public void consolidateToIDsTest() { - List<String> ids = Arrays.asList("2342342", "", "144523", null); - - TwitterUserInformationConfiguration twitterUserInformationConfiguration = new TwitterUserInformationConfiguration(); - twitterUserInformationConfiguration.setInfo(ids); - TwitterTimelineProvider twitterTimelineProvider = new TwitterTimelineProvider(twitterUserInformationConfiguration); - - twitterTimelineProvider.consolidateToIDs(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0813b11e/streams-contrib/streams-provider-twitter/src/test/resources/TwitterTimelineProviderTest.conf ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-twitter/src/test/resources/TwitterTimelineProviderTest.conf b/streams-contrib/streams-provider-twitter/src/test/resources/TwitterTimelineProviderTest.conf new file mode 100644 index 0000000..a7862c4 --- /dev/null +++ b/streams-contrib/streams-provider-twitter/src/test/resources/TwitterTimelineProviderTest.conf @@ -0,0 +1,4 @@ +twitter.info = [ + 18055613 +] +twitter.max_items = 1000 \ No newline at end of file
