Repository: incubator-streams Updated Branches: refs/heads/master e6ffe29e8 -> ae27541e0
Added user information and made some other modifications to increase the readability. Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/741a4544 Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/741a4544 Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/741a4544 Branch: refs/heads/master Commit: 741a45445ca2e6ad49f18c5ddd151c04de58fb6f Parents: e6ffe29 Author: Matthew Hager <matthew.ha...@gmail.com> Authored: Fri May 2 12:47:35 2014 -0500 Committer: Matthew Hager <matthew.ha...@gmail.com> Committed: Fri May 2 12:47:35 2014 -0500 ---------------------------------------------------------------------- .../streams-provider-twitter/pom.xml | 10 +- .../provider/TwitterStreamConfigurator.java | 62 +++- .../twitter/provider/TwitterStreamProvider.java | 1 - .../provider/TwitterTimelineProvider.java | 119 +++----- .../provider/TwitterTimelineProviderTask.java | 7 - .../TwitterUserInformationProvider.java | 286 +++++++++++++++++++ .../com/twitter/TwitterConfiguration.json | 70 +++++ .../com/twitter/TwitterStreamConfiguration.json | 61 +--- .../TwitterUserInformationConfiguration.json | 17 ++ .../streams/twitter/test/SimpleTweetTest.java | 4 + 10 files changed, 472 insertions(+), 165 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/741a4544/streams-contrib/streams-provider-twitter/pom.xml ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-twitter/pom.xml b/streams-contrib/streams-provider-twitter/pom.xml index 3c27b8c..8a41ca5 100644 --- a/streams-contrib/streams-provider-twitter/pom.xml +++ b/streams-contrib/streams-provider-twitter/pom.xml @@ -49,11 +49,6 @@ <artifactId>streams-config</artifactId> </dependency> <dependency> - <groupId>org.apache.streams</groupId> - <artifactId>streams-util</artifactId> - <version>${project.version}</version> - </dependency> - <dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> </dependency> @@ -73,9 +68,8 @@ <dependency> <groupId>org.twitter4j</groupId> <artifactId>twitter4j-core</artifactId> - <version>3.0.5</version> + <version>[4.0,)</version> </dependency> - </dependencies> <build> @@ -118,7 +112,9 @@ <addCompileSourceRoot>true</addCompileSourceRoot> <generateBuilders>true</generateBuilders> <sourcePaths> + <sourcePath>src/main/jsonschema/com/twitter/TwitterConfiguration.json</sourcePath> <sourcePath>src/main/jsonschema/com/twitter/TwitterStreamConfiguration.json</sourcePath> + <sourcePath>src/main/jsonschema/com/twitter/TwitterUserInformationConfiguration.json</sourcePath> <sourcePath>src/main/jsonschema/com/twitter/Delete.json</sourcePath> <sourcePath>src/main/jsonschema/com/twitter/Retweet.json</sourcePath> <sourcePath>src/main/jsonschema/com/twitter/tweet.json</sourcePath> http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/741a4544/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamConfigurator.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamConfigurator.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamConfigurator.java index 9bf2d9a..5435f24 100644 --- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamConfigurator.java +++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamConfigurator.java @@ -1,15 +1,15 @@ package org.apache.streams.twitter.provider; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.Lists; import com.typesafe.config.Config; import com.typesafe.config.ConfigException; import org.apache.streams.config.StreamsConfigurator; -import org.apache.streams.twitter.TwitterBasicAuthConfiguration; -import org.apache.streams.twitter.TwitterOAuthConfiguration; -import org.apache.streams.twitter.TwitterStreamConfiguration; +import org.apache.streams.twitter.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; import java.util.List; /** @@ -18,19 +18,18 @@ import java.util.List; public class TwitterStreamConfigurator { private final static Logger LOGGER = LoggerFactory.getLogger(TwitterStreamConfigurator.class); + private final static ObjectMapper mapper = new ObjectMapper(); - public static TwitterStreamConfiguration detectConfiguration(Config twitter) { - TwitterStreamConfiguration twitterStreamConfiguration = new TwitterStreamConfiguration(); - - twitterStreamConfiguration.setEndpoint(twitter.getString("endpoint")); + public static TwitterConfiguration detectTwitterConfiguration(Config config) { + TwitterConfiguration twitterConfiguration = new TwitterConfiguration(); try { Config basicauth = StreamsConfigurator.config.getConfig("twitter.basicauth"); TwitterBasicAuthConfiguration twitterBasicAuthConfiguration = new TwitterBasicAuthConfiguration(); twitterBasicAuthConfiguration.setUsername(basicauth.getString("username")); twitterBasicAuthConfiguration.setPassword(basicauth.getString("password")); - twitterStreamConfiguration.setBasicauth(twitterBasicAuthConfiguration); + twitterConfiguration.setBasicauth(twitterBasicAuthConfiguration); } catch( ConfigException ce ) {} try { @@ -40,27 +39,60 @@ public class TwitterStreamConfigurator { twitterOAuthConfiguration.setConsumerSecret(oauth.getString("consumerSecret")); twitterOAuthConfiguration.setAccessToken(oauth.getString("accessToken")); twitterOAuthConfiguration.setAccessTokenSecret(oauth.getString("accessTokenSecret")); - twitterStreamConfiguration.setOauth(twitterOAuthConfiguration); + twitterConfiguration.setOauth(twitterOAuthConfiguration); } catch( ConfigException ce ) {} + twitterConfiguration.setEndpoint(config.getString("endpoint")); + + return twitterConfiguration; + } + + public static TwitterStreamConfiguration detectConfiguration(Config config) { + + TwitterStreamConfiguration twitterStreamConfiguration = mapper.convertValue(detectTwitterConfiguration(config), TwitterStreamConfiguration.class); + try { - twitterStreamConfiguration.setTrack(twitter.getStringList("track")); + twitterStreamConfiguration.setTrack(config.getStringList("track")); } catch( ConfigException ce ) {} try { + // create the array List<Long> follows = Lists.newArrayList(); - for( Integer id : twitter.getIntList("follow")) - follows.add(new Long(id)); + // add the ids of the people we want to 'follow' + for(Integer id : config.getIntList("follow")) + follows.add((long)id); + // set the array twitterStreamConfiguration.setFollow(follows); + } catch( ConfigException ce ) {} - twitterStreamConfiguration.setFilterLevel(twitter.getString("filter-level")); - twitterStreamConfiguration.setWith(twitter.getString("with")); - twitterStreamConfiguration.setReplies(twitter.getString("replies")); + twitterStreamConfiguration.setFilterLevel(config.getString("filter-level")); + twitterStreamConfiguration.setWith(config.getString("with")); + twitterStreamConfiguration.setReplies(config.getString("replies")); twitterStreamConfiguration.setJsonStoreEnabled("true"); twitterStreamConfiguration.setIncludeEntities("true"); return twitterStreamConfiguration; } + public static TwitterUserInformationConfiguration detectTwitterUserInformationConfiguration(Config config) { + + TwitterUserInformationConfiguration twitterUserInformationConfiguration = mapper.convertValue(detectTwitterConfiguration(config), TwitterUserInformationConfiguration.class); + + try { + if(config.hasPath("info")) + { + List<String> info = new ArrayList<String>(); + + for (String s : config.getStringList("info")) + info.add(s); + } + } + catch(Exception e) { + LOGGER.error("There was an error: {}", e.getMessage()); + } + + return twitterUserInformationConfiguration; + } + } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/741a4544/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java index 3df7d02..b1785e5 100644 --- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java +++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java @@ -1,6 +1,5 @@ package org.apache.streams.twitter.provider; -import com.google.common.base.Joiner; import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.collect.Iterators; http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/741a4544/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java index b9551ad..2c39cf9 100644 --- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java +++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java @@ -1,11 +1,7 @@ package org.apache.streams.twitter.provider; -import com.google.common.base.Optional; import com.google.common.base.Preconditions; -import com.google.common.base.Predicates; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Iterables; -import com.google.common.collect.Iterators; +import com.google.common.collect.Queues; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; @@ -49,17 +45,11 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable { this.config = config; } - protected volatile Queue<StreamsDatum> providerQueue = new LinkedBlockingQueue<StreamsDatum>(); + protected final Queue<StreamsDatum> providerQueue = Queues.synchronizedQueue(new ArrayBlockingQueue<StreamsDatum>(500)); - protected Twitter client; + protected int idsCount; protected Iterator<Long> ids; - ListenableFuture providerTaskComplete; -// -// public BlockingQueue<Object> getInQueue() { -// return inQueue; -// } - protected ListeningExecutorService executor; protected DateTime start; @@ -74,6 +64,7 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable { public TwitterTimelineProvider() { Config config = StreamsConfigurator.config.getConfig("twitter"); this.config = TwitterStreamConfigurator.detectConfiguration(config); + } public TwitterTimelineProvider(TwitterStreamConfiguration config) { @@ -95,43 +86,19 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable { return this.providerQueue; } -// public void run() { -// -// LOGGER.info("{} Running", STREAMS_ID); -// -// while( ids.hasNext() ) { -// Long currentId = ids.next(); -// LOGGER.info("Provider Task Starting: {}", currentId); -// captureTimeline(currentId); -// } -// -// LOGGER.info("{} Finished. Cleaning up...", STREAMS_ID); -// -// client.shutdown(); -// -// LOGGER.info("{} Exiting", STREAMS_ID); -// -// while(!providerTaskComplete.isDone() && !providerTaskComplete.isCancelled() ) { -// try { -// Thread.sleep(100); -// } catch (InterruptedException e) {} -// } -// } - @Override public void startStream() { // no op } - private void captureTimeline(long currentId) { + protected void captureTimeline(long currentId) { Paging paging = new Paging(1, 200); List<Status> statuses = null; - boolean KeepGoing = true; - boolean hadFailure = false; do { + Twitter client = getTwitterClient(); int keepTrying = 0; // keep trying to load, give it 5 attempts. @@ -143,20 +110,12 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable { { statuses = client.getUserTimeline(currentId, paging); - for (Status tStat : statuses) - { -// if( provider.start != null && -// provider.start.isAfter(new DateTime(tStat.getCreatedAt()))) -// { -// // they hit the last date we wanted to collect -// // we can now exit early -// KeepGoing = false; -// } - // emit the record - String json = DataObjectFactory.getRawJSON(tStat); - - providerQueue.offer(new StreamsDatum(json)); + for (Status tStat : statuses) { + String json = TwitterObjectFactory.getRawJSON(tStat); + while(!providerQueue.offer(new StreamsDatum(json))) { + sleep(); + } } paging.setPage(paging.getPage() + 1); @@ -166,19 +125,36 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable { catch(TwitterException twitterException) { keepTrying += TwitterErrorHandler.handleTwitterError(client, twitterException); } - catch(Exception e) - { - hadFailure = true; + catch(Exception e) { keepTrying += TwitterErrorHandler.handleTwitterError(client, e); } - finally - { - // Shutdown the twitter to release the resources - client.shutdown(); - } } } - while ((statuses != null) && (statuses.size() > 0) && KeepGoing); + while (shouldContinuePulling(statuses)); + } + + private Map<Long, Long> userPullInfo; + + protected boolean shouldContinuePulling(List<Status> statuses) { + return (statuses != null) && (statuses.size() > 0); + } + + private void sleep() + { + Thread.yield(); + try { + // wait one tenth of a millisecond + Thread.yield(); + Thread.sleep(new Random().nextInt(2)); + Thread.yield(); + } + catch(IllegalArgumentException e) { + // passing in static values, this will never happen + } + catch(InterruptedException e) { + // noOp, there must have been an issue sleeping + } + Thread.yield(); } public StreamsResultSet readCurrent() { @@ -244,21 +220,19 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable { executor = MoreExecutors.listeningDecorator(newFixedThreadPoolWithQueueSize(5, 20)); Preconditions.checkNotNull(providerQueue); - Preconditions.checkNotNull(this.klass); - Preconditions.checkNotNull(config.getOauth().getConsumerKey()); Preconditions.checkNotNull(config.getOauth().getConsumerSecret()); Preconditions.checkNotNull(config.getOauth().getAccessToken()); Preconditions.checkNotNull(config.getOauth().getAccessTokenSecret()); - Preconditions.checkNotNull(config.getFollow()); - Boolean jsonStoreEnabled = Optional.fromNullable(new Boolean(Boolean.parseBoolean(config.getJsonStoreEnabled()))).or(true); - Boolean includeEntitiesEnabled = Optional.fromNullable(new Boolean(Boolean.parseBoolean(config.getIncludeEntities()))).or(true); - + idsCount = config.getFollow().size(); ids = config.getFollow().iterator(); + } + protected Twitter getTwitterClient() + { String baseUrl = "https://api.twitter.com:443/1.1/"; ConfigurationBuilder builder = new ConfigurationBuilder() @@ -266,23 +240,18 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable { .setOAuthConsumerSecret(config.getOauth().getConsumerSecret()) .setOAuthAccessToken(config.getOauth().getAccessToken()) .setOAuthAccessTokenSecret(config.getOauth().getAccessTokenSecret()) - .setIncludeEntitiesEnabled(includeEntitiesEnabled) - .setJSONStoreEnabled(jsonStoreEnabled) + .setIncludeEntitiesEnabled(true) + .setJSONStoreEnabled(true) .setAsyncNumThreads(3) .setRestBaseURL(baseUrl) .setIncludeMyRetweetEnabled(Boolean.TRUE) - .setIncludeRTsEnabled(Boolean.TRUE) .setPrettyDebugEnabled(Boolean.TRUE); - client = new TwitterFactory(builder.build()).getInstance(); - + return new TwitterFactory(builder.build()).getInstance(); } @Override public void cleanUp() { - - client.shutdown(); - shutdownAndAwaitTermination(executor); } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/741a4544/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTask.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTask.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTask.java index 9619f4f..9a1d4e7 100644 --- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTask.java +++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTask.java @@ -74,19 +74,12 @@ public class TwitterTimelineProviderTask implements Runnable { hadFailure = true; keepTrying += TwitterErrorHandler.handleTwitterError(twitter, e); } - finally - { - // Shutdown the twitter to release the resources - twitter.shutdown(); - } } } while ((statuses != null) && (statuses.size() > 0) && KeepGoing); LOGGER.info("Provider Finished. Cleaning up..."); - twitter.shutdown(); - LOGGER.info("Provider Exiting"); } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/741a4544/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java new file mode 100644 index 0000000..dac5cd6 --- /dev/null +++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java @@ -0,0 +1,286 @@ +package org.apache.streams.twitter.provider; + +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; +import com.typesafe.config.Config; +import org.apache.streams.config.StreamsConfigurator; +import org.apache.streams.core.StreamsDatum; +import org.apache.streams.core.StreamsProvider; +import org.apache.streams.core.StreamsResultSet; +import org.apache.streams.twitter.TwitterStreamConfiguration; +import org.apache.streams.twitter.TwitterUserInformationConfiguration; +import org.joda.time.DateTime; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import sun.reflect.generics.reflectiveObjects.NotImplementedException; +import twitter4j.*; +import twitter4j.conf.ConfigurationBuilder; +import twitter4j.json.DataObjectFactory; + +import java.io.Serializable; +import java.math.BigInteger; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Queue; +import java.util.concurrent.*; + +public class TwitterUserInformationProvider implements StreamsProvider, Serializable +{ + + public static final String STREAMS_ID = "TwitterUserInformationProvider"; + private static final Logger LOGGER = LoggerFactory.getLogger(TwitterUserInformationProvider.class); + + + private TwitterUserInformationConfiguration twitterUserInformationConfiguration; + + private Class klass; + protected volatile Queue<StreamsDatum> providerQueue = new LinkedBlockingQueue<StreamsDatum>(); + + public TwitterUserInformationConfiguration getConfig() { return twitterUserInformationConfiguration; } + + public void setConfig(TwitterUserInformationConfiguration config) { this.twitterUserInformationConfiguration = config; } + + protected Iterator<Long[]> idsBatches; + protected Iterator<String[]> screenNameBatches; + + protected ListeningExecutorService executor; + + protected DateTime start; + protected DateTime end; + + private static ExecutorService newFixedThreadPoolWithQueueSize(int nThreads, int queueSize) { + return new ThreadPoolExecutor(nThreads, nThreads, + 5000L, TimeUnit.MILLISECONDS, + new ArrayBlockingQueue<Runnable>(queueSize, true), new ThreadPoolExecutor.CallerRunsPolicy()); + } + + public TwitterUserInformationProvider() { + Config config = StreamsConfigurator.config.getConfig("twitter"); + this.twitterUserInformationConfiguration = TwitterStreamConfigurator.detectTwitterUserInformationConfiguration(config); + + } + + public TwitterUserInformationProvider(TwitterUserInformationConfiguration config) { + this.twitterUserInformationConfiguration = config; + } + + public TwitterUserInformationProvider(Class klass) { + Config config = StreamsConfigurator.config.getConfig("twitter"); + this.twitterUserInformationConfiguration = TwitterStreamConfigurator.detectTwitterUserInformationConfiguration(config); + this.klass = klass; + } + + public TwitterUserInformationProvider(TwitterUserInformationConfiguration config, Class klass) { + this.twitterUserInformationConfiguration = config; + this.klass = klass; + } + + public Queue<StreamsDatum> getProviderQueue() { + return this.providerQueue; + } + + @Override + public void startStream() { + // no op + } + + + private void loadBatch(Long[] ids) { + Twitter client = getTwitterClient(); + int keepTrying = 0; + + // keep trying to load, give it 5 attempts. + //while (keepTrying < 10) + while (keepTrying < 1) + { + try + { + long[] toQuery = new long[ids.length]; + for(int i = 0; i < ids.length; i++) + toQuery[i] = ids[i]; + + for (User tStat : client.lookupUsers(toQuery)) { + String json = DataObjectFactory.getRawJSON(tStat); + providerQueue.offer(new StreamsDatum(json)); + } + keepTrying = 10; + } + catch(TwitterException twitterException) { + keepTrying += TwitterErrorHandler.handleTwitterError(client, twitterException); + } + catch(Exception e) { + keepTrying += TwitterErrorHandler.handleTwitterError(client, e); + } + } + } + + private void loadBatch(String[] ids) { + Twitter client = getTwitterClient(); + int keepTrying = 0; + + // keep trying to load, give it 5 attempts. + //while (keepTrying < 10) + while (keepTrying < 1) + { + try + { + for (User tStat : client.lookupUsers(ids)) { + String json = DataObjectFactory.getRawJSON(tStat); + providerQueue.offer(new StreamsDatum(json)); + } + keepTrying = 10; + } + catch(TwitterException twitterException) { + keepTrying += TwitterErrorHandler.handleTwitterError(client, twitterException); + } + catch(Exception e) { + keepTrying += TwitterErrorHandler.handleTwitterError(client, e); + } + } + } + + public StreamsResultSet readCurrent() { + + Preconditions.checkArgument(idsBatches.hasNext() || screenNameBatches.hasNext()); + + LOGGER.info("readCurrent"); + + while(idsBatches.hasNext()) + loadBatch(idsBatches.next()); + + while(screenNameBatches.hasNext()) + loadBatch(screenNameBatches.next()); + + + LOGGER.info("Finished. Cleaning up..."); + + LOGGER.info("Providing {} docs", providerQueue.size()); + + StreamsResultSet result = new StreamsResultSet(providerQueue); + + LOGGER.info("Exiting"); + + return result; + + } + + public StreamsResultSet readNew(BigInteger sequence) { + LOGGER.debug("{} readNew", STREAMS_ID); + throw new NotImplementedException(); + } + + public StreamsResultSet readRange(DateTime start, DateTime end) { + LOGGER.debug("{} readRange", STREAMS_ID); + this.start = start; + this.end = end; + readCurrent(); + StreamsResultSet result = (StreamsResultSet)providerQueue.iterator(); + return result; + } + + void shutdownAndAwaitTermination(ExecutorService pool) { + pool.shutdown(); // Disable new tasks from being submitted + try { + // Wait a while for existing tasks to terminate + if (!pool.awaitTermination(10, TimeUnit.SECONDS)) { + pool.shutdownNow(); // Cancel currently executing tasks + // Wait a while for tasks to respond to being cancelled + if (!pool.awaitTermination(10, TimeUnit.SECONDS)) + System.err.println("Pool did not terminate"); + } + } catch (InterruptedException ie) { + // (Re-)Cancel if current thread also interrupted + pool.shutdownNow(); + // Preserve interrupt status + Thread.currentThread().interrupt(); + } + } + + @Override + public void prepare(Object o) { + + executor = MoreExecutors.listeningDecorator(newFixedThreadPoolWithQueueSize(5, 20)); + + Preconditions.checkNotNull(providerQueue); + Preconditions.checkNotNull(this.klass); + Preconditions.checkNotNull(twitterUserInformationConfiguration.getOauth().getConsumerKey()); + Preconditions.checkNotNull(twitterUserInformationConfiguration.getOauth().getConsumerSecret()); + Preconditions.checkNotNull(twitterUserInformationConfiguration.getOauth().getAccessToken()); + Preconditions.checkNotNull(twitterUserInformationConfiguration.getOauth().getAccessTokenSecret()); + Preconditions.checkNotNull(twitterUserInformationConfiguration.getInfo()); + + List<String> screenNames = new ArrayList<String>(); + List<String[]> screenNameBatches = new ArrayList<String[]>(); + + List<Long> ids = new ArrayList<Long>(); + List<Long[]> idsBatches = new ArrayList<Long[]>(); + + for(String s : twitterUserInformationConfiguration.getInfo()) { + if(s != null) + { + String potentialScreenName = s.replaceAll("@", "").trim().toLowerCase(); + + // See if it is a long, if it is, add it to the user iD list, if it is not, add it to the + // screen name list + try { + ids.add(Long.parseLong(potentialScreenName)); + } catch (Exception e) { + screenNames.add(potentialScreenName); + } + + // Twitter allows for batches up to 100 per request, but you cannot mix types + + if(ids.size() >= 100) { + // add the batch + idsBatches.add(ids.toArray(new Long[ids.size()])); + // reset the Ids + ids = new ArrayList<Long>(); + } + + if(screenNames.size() >= 100) { + // add the batch + screenNameBatches.add(screenNames.toArray(new String[ids.size()])); + // reset the Ids + screenNames = new ArrayList<String>(); + } + } + } + + + if(ids.size() > 0) + idsBatches.add(ids.toArray(new Long[ids.size()])); + + if(screenNames.size() > 0) + screenNameBatches.add(screenNames.toArray(new String[ids.size()])); + + this.idsBatches = idsBatches.iterator(); + this.screenNameBatches = screenNameBatches.iterator(); + } + + protected Twitter getTwitterClient() + { + String baseUrl = "https://api.twitter.com:443/1.1/"; + + ConfigurationBuilder builder = new ConfigurationBuilder() + .setOAuthConsumerKey(twitterUserInformationConfiguration.getOauth().getConsumerKey()) + .setOAuthConsumerSecret(twitterUserInformationConfiguration.getOauth().getConsumerSecret()) + .setOAuthAccessToken(twitterUserInformationConfiguration.getOauth().getAccessToken()) + .setOAuthAccessTokenSecret(twitterUserInformationConfiguration.getOauth().getAccessTokenSecret()) + .setIncludeEntitiesEnabled(true) + .setJSONStoreEnabled(true) + .setAsyncNumThreads(3) + .setRestBaseURL(baseUrl) + .setIncludeMyRetweetEnabled(Boolean.TRUE) + .setPrettyDebugEnabled(Boolean.TRUE); + + return new TwitterFactory(builder.build()).getInstance(); + } + + @Override + public void cleanUp() { + shutdownAndAwaitTermination(executor); + } +} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/741a4544/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/TwitterConfiguration.json ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/TwitterConfiguration.json b/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/TwitterConfiguration.json new file mode 100644 index 0000000..9e22b93 --- /dev/null +++ b/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/TwitterConfiguration.json @@ -0,0 +1,70 @@ +{ + "type": "object", + "$schema": "http://json-schema.org/draft-03/schema", + "id": "#", + "javaType" : "org.apache.streams.twitter.TwitterConfiguration", + "javaInterfaces": ["java.io.Serializable"], + "properties": { + "protocol": { + "type": "string", + "description": "The protocol" + }, + "host": { + "type": "string", + "description": "The host" + }, + "port": { + "type": "integer", + "description": "The port" + }, + "version": { + "type": "string", + "description": "The version" + }, + "endpoint": { + "type": "string", + "description": "The endpoint" + }, + "jsonStoreEnabled": { + "default" : true, + "type": "string" + }, + "oauth": { + "type": "object", + "dynamic": "true", + "javaType" : "org.apache.streams.twitter.TwitterOAuthConfiguration", + "javaInterfaces": ["java.io.Serializable"], + "properties": { + "appName": { + "type": "string" + }, + "consumerKey": { + "type": "string" + }, + "consumerSecret": { + "type": "string" + }, + "accessToken": { + "type": "string" + }, + "accessTokenSecret": { + "type": "string" + } + } + }, + "basicauth": { + "type": "object", + "dynamic": "true", + "javaType" : "org.apache.streams.twitter.TwitterBasicAuthConfiguration", + "javaInterfaces": ["java.io.Serializable"], + "properties": { + "username": { + "type": "string" + }, + "password": { + "type": "string" + } + } + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/741a4544/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/TwitterStreamConfiguration.json ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/TwitterStreamConfiguration.json b/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/TwitterStreamConfiguration.json index c1a0d0c..2ff7362 100644 --- a/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/TwitterStreamConfiguration.json +++ b/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/TwitterStreamConfiguration.json @@ -3,34 +3,12 @@ "$schema": "http://json-schema.org/draft-03/schema", "id": "#", "javaType" : "org.apache.streams.twitter.TwitterStreamConfiguration", + "extends": {"$ref":"TwitterConfiguration.json"}, "javaInterfaces": ["java.io.Serializable"], "properties": { - "protocol": { - "type": "string", - "description": "The protocol" - }, - "host": { - "type": "string", - "description": "The host" - }, - "port": { - "type": "integer", - "description": "The port" - }, - "version": { - "type": "string", - "description": "The version" - }, - "endpoint": { - "type": "string", - "description": "The endpoint" - }, "includeEntities": { "type": "string" }, - "jsonStoreEnabled": { - "type": "string" - }, "truncated": { "type": "boolean" }, @@ -59,43 +37,6 @@ "items": { "type": "string" } - }, - "oauth": { - "type": "object", - "dynamic": "true", - "javaType" : "org.apache.streams.twitter.TwitterOAuthConfiguration", - "javaInterfaces": ["java.io.Serializable"], - "properties": { - "appName": { - "type": "string" - }, - "consumerKey": { - "type": "string" - }, - "consumerSecret": { - "type": "string" - }, - "accessToken": { - "type": "string" - }, - "accessTokenSecret": { - "type": "string" - } - } - }, - "basicauth": { - "type": "object", - "dynamic": "true", - "javaType" : "org.apache.streams.twitter.TwitterBasicAuthConfiguration", - "javaInterfaces": ["java.io.Serializable"], - "properties": { - "username": { - "type": "string" - }, - "password": { - "type": "string" - } - } } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/741a4544/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/TwitterUserInformationConfiguration.json ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/TwitterUserInformationConfiguration.json b/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/TwitterUserInformationConfiguration.json new file mode 100644 index 0000000..afd203f --- /dev/null +++ b/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/TwitterUserInformationConfiguration.json @@ -0,0 +1,17 @@ +{ + "type": "object", + "$schema": "http://json-schema.org/draft-03/schema", + "id": "#", + "javaType" : "org.apache.streams.twitter.TwitterUserInformationConfiguration", + "extends": {"$ref":"TwitterConfiguration.json"}, + "javaInterfaces": ["java.io.Serializable"], + "properties": { + "info": { + "type": "array", + "description": "A list of user IDs, indicating the users whose Tweets should be delivered on the stream", + "items": { + "type": "string" + } + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/741a4544/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/SimpleTweetTest.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/SimpleTweetTest.java b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/SimpleTweetTest.java index b8bfe1a..31ddfce 100644 --- a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/SimpleTweetTest.java +++ b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/SimpleTweetTest.java @@ -6,6 +6,8 @@ import com.fasterxml.jackson.databind.node.ObjectNode; import org.apache.streams.core.StreamsDatum; import org.apache.streams.exceptions.ActivitySerializerException; import org.apache.streams.pojo.json.Activity; +import org.apache.streams.twitter.pojo.Delete; +import org.apache.streams.twitter.pojo.Retweet; import org.apache.streams.twitter.pojo.Tweet; import org.apache.streams.twitter.processor.TwitterTypeConverter; import org.apache.streams.twitter.serializer.StreamsTwitterMapper; @@ -21,6 +23,8 @@ import java.io.InputStream; import java.io.InputStreamReader; import static org.hamcrest.CoreMatchers.*; +import static org.hamcrest.Matchers.greaterThan; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThat; /**