Repository: incubator-streams
Updated Branches:
  refs/heads/master e6ffe29e8 -> ae27541e0


Added a Twitter user information provider and made some other modifications to 
improve readability.


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/741a4544
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/741a4544
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/741a4544

Branch: refs/heads/master
Commit: 741a45445ca2e6ad49f18c5ddd151c04de58fb6f
Parents: e6ffe29
Author: Matthew Hager <matthew.ha...@gmail.com>
Authored: Fri May 2 12:47:35 2014 -0500
Committer: Matthew Hager <matthew.ha...@gmail.com>
Committed: Fri May 2 12:47:35 2014 -0500

----------------------------------------------------------------------
 .../streams-provider-twitter/pom.xml            |  10 +-
 .../provider/TwitterStreamConfigurator.java     |  62 +++-
 .../twitter/provider/TwitterStreamProvider.java |   1 -
 .../provider/TwitterTimelineProvider.java       | 119 +++-----
 .../provider/TwitterTimelineProviderTask.java   |   7 -
 .../TwitterUserInformationProvider.java         | 286 +++++++++++++++++++
 .../com/twitter/TwitterConfiguration.json       |  70 +++++
 .../com/twitter/TwitterStreamConfiguration.json |  61 +---
 .../TwitterUserInformationConfiguration.json    |  17 ++
 .../streams/twitter/test/SimpleTweetTest.java   |   4 +
 10 files changed, 472 insertions(+), 165 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/741a4544/streams-contrib/streams-provider-twitter/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/pom.xml 
b/streams-contrib/streams-provider-twitter/pom.xml
index 3c27b8c..8a41ca5 100644
--- a/streams-contrib/streams-provider-twitter/pom.xml
+++ b/streams-contrib/streams-provider-twitter/pom.xml
@@ -49,11 +49,6 @@
             <artifactId>streams-config</artifactId>
         </dependency>
         <dependency>
-            <groupId>org.apache.streams</groupId>
-            <artifactId>streams-util</artifactId>
-            <version>${project.version}</version>
-        </dependency>
-        <dependency>
             <groupId>com.google.guava</groupId>
             <artifactId>guava</artifactId>
         </dependency>
@@ -73,9 +68,8 @@
         <dependency>
             <groupId>org.twitter4j</groupId>
             <artifactId>twitter4j-core</artifactId>
-            <version>3.0.5</version>
+            <version>[4.0,)</version>
         </dependency>
-
     </dependencies>
 
     <build>
@@ -118,7 +112,9 @@
                     <addCompileSourceRoot>true</addCompileSourceRoot>
                     <generateBuilders>true</generateBuilders>
                     <sourcePaths>
+                        
<sourcePath>src/main/jsonschema/com/twitter/TwitterConfiguration.json</sourcePath>
                         
<sourcePath>src/main/jsonschema/com/twitter/TwitterStreamConfiguration.json</sourcePath>
+                        
<sourcePath>src/main/jsonschema/com/twitter/TwitterUserInformationConfiguration.json</sourcePath>
                         
<sourcePath>src/main/jsonschema/com/twitter/Delete.json</sourcePath>
                         
<sourcePath>src/main/jsonschema/com/twitter/Retweet.json</sourcePath>
                         
<sourcePath>src/main/jsonschema/com/twitter/tweet.json</sourcePath>

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/741a4544/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamConfigurator.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamConfigurator.java
 
b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamConfigurator.java
index 9bf2d9a..5435f24 100644
--- 
a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamConfigurator.java
+++ 
b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamConfigurator.java
@@ -1,15 +1,15 @@
 package org.apache.streams.twitter.provider;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.Lists;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigException;
 import org.apache.streams.config.StreamsConfigurator;
-import org.apache.streams.twitter.TwitterBasicAuthConfiguration;
-import org.apache.streams.twitter.TwitterOAuthConfiguration;
-import org.apache.streams.twitter.TwitterStreamConfiguration;
+import org.apache.streams.twitter.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
 import java.util.List;
 
 /**
@@ -18,19 +18,18 @@ import java.util.List;
 public class TwitterStreamConfigurator {
 
     private final static Logger LOGGER = 
LoggerFactory.getLogger(TwitterStreamConfigurator.class);
+    private final static ObjectMapper mapper = new ObjectMapper();
 
-    public static TwitterStreamConfiguration detectConfiguration(Config 
twitter) {
 
-        TwitterStreamConfiguration twitterStreamConfiguration = new 
TwitterStreamConfiguration();
-
-        twitterStreamConfiguration.setEndpoint(twitter.getString("endpoint"));
+    public static TwitterConfiguration detectTwitterConfiguration(Config 
config) {
+        TwitterConfiguration twitterConfiguration = new TwitterConfiguration();
 
         try {
             Config basicauth = 
StreamsConfigurator.config.getConfig("twitter.basicauth");
             TwitterBasicAuthConfiguration twitterBasicAuthConfiguration = new 
TwitterBasicAuthConfiguration();
             
twitterBasicAuthConfiguration.setUsername(basicauth.getString("username"));
             
twitterBasicAuthConfiguration.setPassword(basicauth.getString("password"));
-            
twitterStreamConfiguration.setBasicauth(twitterBasicAuthConfiguration);
+            twitterConfiguration.setBasicauth(twitterBasicAuthConfiguration);
         } catch( ConfigException ce ) {}
 
         try {
@@ -40,27 +39,60 @@ public class TwitterStreamConfigurator {
             
twitterOAuthConfiguration.setConsumerSecret(oauth.getString("consumerSecret"));
             
twitterOAuthConfiguration.setAccessToken(oauth.getString("accessToken"));
             
twitterOAuthConfiguration.setAccessTokenSecret(oauth.getString("accessTokenSecret"));
-            twitterStreamConfiguration.setOauth(twitterOAuthConfiguration);
+            twitterConfiguration.setOauth(twitterOAuthConfiguration);
         } catch( ConfigException ce ) {}
 
+        twitterConfiguration.setEndpoint(config.getString("endpoint"));
+
+        return twitterConfiguration;
+    }
+
+    public static TwitterStreamConfiguration detectConfiguration(Config 
config) {
+
+        TwitterStreamConfiguration twitterStreamConfiguration = 
mapper.convertValue(detectTwitterConfiguration(config), 
TwitterStreamConfiguration.class);
+
         try {
-            
twitterStreamConfiguration.setTrack(twitter.getStringList("track"));
+            twitterStreamConfiguration.setTrack(config.getStringList("track"));
         } catch( ConfigException ce ) {}
 
         try {
+            // create the array
             List<Long> follows = Lists.newArrayList();
-            for( Integer id : twitter.getIntList("follow"))
-                follows.add(new Long(id));
+            // add the ids of the people we want to 'follow'
+            for(Integer id : config.getIntList("follow"))
+                follows.add((long)id);
+            // set the array
             twitterStreamConfiguration.setFollow(follows);
+
         } catch( ConfigException ce ) {}
 
-        
twitterStreamConfiguration.setFilterLevel(twitter.getString("filter-level"));
-        twitterStreamConfiguration.setWith(twitter.getString("with"));
-        twitterStreamConfiguration.setReplies(twitter.getString("replies"));
+        
twitterStreamConfiguration.setFilterLevel(config.getString("filter-level"));
+        twitterStreamConfiguration.setWith(config.getString("with"));
+        twitterStreamConfiguration.setReplies(config.getString("replies"));
         twitterStreamConfiguration.setJsonStoreEnabled("true");
         twitterStreamConfiguration.setIncludeEntities("true");
 
         return twitterStreamConfiguration;
     }
 
+    public static TwitterUserInformationConfiguration 
detectTwitterUserInformationConfiguration(Config config) {
+
+        TwitterUserInformationConfiguration 
twitterUserInformationConfiguration = 
mapper.convertValue(detectTwitterConfiguration(config), 
TwitterUserInformationConfiguration.class);
+
+        try {
+            if(config.hasPath("info"))
+            {
+                List<String> info = new ArrayList<String>();
+
+                for (String s : config.getStringList("info"))
+                    info.add(s);
+            }
+        }
+        catch(Exception e) {
+            LOGGER.error("There was an error: {}", e.getMessage());
+        }
+
+        return twitterUserInformationConfiguration;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/741a4544/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
 
b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
index 3df7d02..b1785e5 100644
--- 
a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
+++ 
b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
@@ -1,6 +1,5 @@
 package org.apache.streams.twitter.provider;
 
-import com.google.common.base.Joiner;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Iterators;

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/741a4544/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
 
b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
index b9551ad..2c39cf9 100644
--- 
a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
+++ 
b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
@@ -1,11 +1,7 @@
 package org.apache.streams.twitter.provider;
 
-import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
-import com.google.common.base.Predicates;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Iterators;
+import com.google.common.collect.Queues;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
@@ -49,17 +45,11 @@ public class TwitterTimelineProvider implements 
StreamsProvider, Serializable {
         this.config = config;
     }
 
-    protected volatile Queue<StreamsDatum> providerQueue = new 
LinkedBlockingQueue<StreamsDatum>();
+    protected final Queue<StreamsDatum> providerQueue = 
Queues.synchronizedQueue(new ArrayBlockingQueue<StreamsDatum>(500));
 
-    protected Twitter client;
+    protected int idsCount;
     protected Iterator<Long> ids;
 
-    ListenableFuture providerTaskComplete;
-//
-//    public BlockingQueue<Object> getInQueue() {
-//        return inQueue;
-//    }
-
     protected ListeningExecutorService executor;
 
     protected DateTime start;
@@ -74,6 +64,7 @@ public class TwitterTimelineProvider implements 
StreamsProvider, Serializable {
     public TwitterTimelineProvider() {
         Config config = StreamsConfigurator.config.getConfig("twitter");
         this.config = TwitterStreamConfigurator.detectConfiguration(config);
+
     }
 
     public TwitterTimelineProvider(TwitterStreamConfiguration config) {
@@ -95,43 +86,19 @@ public class TwitterTimelineProvider implements 
StreamsProvider, Serializable {
         return this.providerQueue;
     }
 
-//    public void run() {
-//
-//        LOGGER.info("{} Running", STREAMS_ID);
-//
-//        while( ids.hasNext() ) {
-//            Long currentId = ids.next();
-//            LOGGER.info("Provider Task Starting: {}", currentId);
-//            captureTimeline(currentId);
-//        }
-//
-//        LOGGER.info("{} Finished.  Cleaning up...", STREAMS_ID);
-//
-//        client.shutdown();
-//
-//        LOGGER.info("{} Exiting", STREAMS_ID);
-//
-//        while(!providerTaskComplete.isDone() && 
!providerTaskComplete.isCancelled() ) {
-//            try {
-//                Thread.sleep(100);
-//            } catch (InterruptedException e) {}
-//        }
-//    }
-
     @Override
     public void startStream() {
         // no op
     }
 
-    private void captureTimeline(long currentId) {
+    protected void captureTimeline(long currentId) {
 
         Paging paging = new Paging(1, 200);
         List<Status> statuses = null;
-        boolean KeepGoing = true;
-        boolean hadFailure = false;
 
         do
         {
+            Twitter client = getTwitterClient();
             int keepTrying = 0;
 
             // keep trying to load, give it 5 attempts.
@@ -143,20 +110,12 @@ public class TwitterTimelineProvider implements 
StreamsProvider, Serializable {
                 {
                     statuses = client.getUserTimeline(currentId, paging);
 
-                    for (Status tStat : statuses)
-                    {
-//                        if( provider.start != null &&
-//                                provider.start.isAfter(new 
DateTime(tStat.getCreatedAt())))
-//                        {
-//                            // they hit the last date we wanted to collect
-//                            // we can now exit early
-//                            KeepGoing = false;
-//                        }
-                        // emit the record
-                        String json = DataObjectFactory.getRawJSON(tStat);
-
-                        providerQueue.offer(new StreamsDatum(json));
+                    for (Status tStat : statuses) {
+                        String json = TwitterObjectFactory.getRawJSON(tStat);
 
+                        while(!providerQueue.offer(new StreamsDatum(json))) {
+                            sleep();
+                        }
                     }
 
                     paging.setPage(paging.getPage() + 1);
@@ -166,19 +125,36 @@ public class TwitterTimelineProvider implements 
StreamsProvider, Serializable {
                 catch(TwitterException twitterException) {
                     keepTrying += 
TwitterErrorHandler.handleTwitterError(client, twitterException);
                 }
-                catch(Exception e)
-                {
-                    hadFailure = true;
+                catch(Exception e) {
                     keepTrying += 
TwitterErrorHandler.handleTwitterError(client, e);
                 }
-                finally
-                {
-                    // Shutdown the twitter to release the resources
-                    client.shutdown();
-                }
             }
         }
-        while ((statuses != null) && (statuses.size() > 0) && KeepGoing);
+        while (shouldContinuePulling(statuses));
+    }
+
+    private Map<Long, Long> userPullInfo;
+
+    protected boolean shouldContinuePulling(List<Status> statuses) {
+        return (statuses != null) && (statuses.size() > 0);
+    }
+
+    private void sleep()
+    {
+        Thread.yield();
+        try {
+            // pause for a random 0 or 1 milliseconds
+            Thread.yield();
+            Thread.sleep(new Random().nextInt(2));
+            Thread.yield();
+        }
+        catch(IllegalArgumentException e) {
+            // passing in static values, this will never happen
+        }
+        catch(InterruptedException e) {
+            // noOp, there must have been an issue sleeping
+        }
+        Thread.yield();
     }
 
     public StreamsResultSet readCurrent() {
@@ -244,21 +220,19 @@ public class TwitterTimelineProvider implements 
StreamsProvider, Serializable {
         executor = 
MoreExecutors.listeningDecorator(newFixedThreadPoolWithQueueSize(5, 20));
 
         Preconditions.checkNotNull(providerQueue);
-
         Preconditions.checkNotNull(this.klass);
-
         Preconditions.checkNotNull(config.getOauth().getConsumerKey());
         Preconditions.checkNotNull(config.getOauth().getConsumerSecret());
         Preconditions.checkNotNull(config.getOauth().getAccessToken());
         Preconditions.checkNotNull(config.getOauth().getAccessTokenSecret());
-
         Preconditions.checkNotNull(config.getFollow());
 
-        Boolean jsonStoreEnabled = Optional.fromNullable(new 
Boolean(Boolean.parseBoolean(config.getJsonStoreEnabled()))).or(true);
-        Boolean includeEntitiesEnabled = Optional.fromNullable(new 
Boolean(Boolean.parseBoolean(config.getIncludeEntities()))).or(true);
-
+        idsCount = config.getFollow().size();
         ids = config.getFollow().iterator();
+    }
 
+    protected Twitter getTwitterClient()
+    {
         String baseUrl = "https://api.twitter.com:443/1.1/";
 
         ConfigurationBuilder builder = new ConfigurationBuilder()
@@ -266,23 +240,18 @@ public class TwitterTimelineProvider implements 
StreamsProvider, Serializable {
                 .setOAuthConsumerSecret(config.getOauth().getConsumerSecret())
                 .setOAuthAccessToken(config.getOauth().getAccessToken())
                 
.setOAuthAccessTokenSecret(config.getOauth().getAccessTokenSecret())
-                .setIncludeEntitiesEnabled(includeEntitiesEnabled)
-                .setJSONStoreEnabled(jsonStoreEnabled)
+                .setIncludeEntitiesEnabled(true)
+                .setJSONStoreEnabled(true)
                 .setAsyncNumThreads(3)
                 .setRestBaseURL(baseUrl)
                 .setIncludeMyRetweetEnabled(Boolean.TRUE)
-                .setIncludeRTsEnabled(Boolean.TRUE)
                 .setPrettyDebugEnabled(Boolean.TRUE);
 
-        client = new TwitterFactory(builder.build()).getInstance();
-
+        return new TwitterFactory(builder.build()).getInstance();
     }
 
     @Override
     public void cleanUp() {
-
-        client.shutdown();
-
         shutdownAndAwaitTermination(executor);
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/741a4544/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTask.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTask.java
 
b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTask.java
index 9619f4f..9a1d4e7 100644
--- 
a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTask.java
+++ 
b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTask.java
@@ -74,19 +74,12 @@ public class TwitterTimelineProviderTask implements 
Runnable {
                     hadFailure = true;
                     keepTrying += 
TwitterErrorHandler.handleTwitterError(twitter, e);
                 }
-                finally
-                {
-                    // Shutdown the twitter to release the resources
-                    twitter.shutdown();
-                }
             }
         }
         while ((statuses != null) && (statuses.size() > 0) && KeepGoing);
 
         LOGGER.info("Provider Finished.  Cleaning up...");
 
-        twitter.shutdown();
-
         LOGGER.info("Provider Exiting");
 
     }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/741a4544/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java
 
b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java
new file mode 100644
index 0000000..dac5cd6
--- /dev/null
+++ 
b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java
@@ -0,0 +1,286 @@
+package org.apache.streams.twitter.provider;
+
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.typesafe.config.Config;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProvider;
+import org.apache.streams.core.StreamsResultSet;
+import org.apache.streams.twitter.TwitterStreamConfiguration;
+import org.apache.streams.twitter.TwitterUserInformationConfiguration;
+import org.joda.time.DateTime;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import sun.reflect.generics.reflectiveObjects.NotImplementedException;
+import twitter4j.*;
+import twitter4j.conf.ConfigurationBuilder;
+import twitter4j.json.DataObjectFactory;
+
+import java.io.Serializable;
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.*;
+
+public class TwitterUserInformationProvider implements StreamsProvider, 
Serializable
+{
+
+    public static final String STREAMS_ID = "TwitterUserInformationProvider";
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(TwitterUserInformationProvider.class);
+
+
+    private TwitterUserInformationConfiguration 
twitterUserInformationConfiguration;
+
+    private Class klass;
+    protected volatile Queue<StreamsDatum> providerQueue = new 
LinkedBlockingQueue<StreamsDatum>();
+
+    public TwitterUserInformationConfiguration getConfig()              { 
return twitterUserInformationConfiguration; }
+
+    public void setConfig(TwitterUserInformationConfiguration config)   { 
this.twitterUserInformationConfiguration = config; }
+
+    protected Iterator<Long[]> idsBatches;
+    protected Iterator<String[]> screenNameBatches;
+
+    protected ListeningExecutorService executor;
+
+    protected DateTime start;
+    protected DateTime end;
+
+    private static ExecutorService newFixedThreadPoolWithQueueSize(int 
nThreads, int queueSize) {
+        return new ThreadPoolExecutor(nThreads, nThreads,
+                5000L, TimeUnit.MILLISECONDS,
+                new ArrayBlockingQueue<Runnable>(queueSize, true), new 
ThreadPoolExecutor.CallerRunsPolicy());
+    }
+
+    public TwitterUserInformationProvider() {
+        Config config = StreamsConfigurator.config.getConfig("twitter");
+        this.twitterUserInformationConfiguration = 
TwitterStreamConfigurator.detectTwitterUserInformationConfiguration(config);
+
+    }
+
+    public TwitterUserInformationProvider(TwitterUserInformationConfiguration 
config) {
+        this.twitterUserInformationConfiguration = config;
+    }
+
+    public TwitterUserInformationProvider(Class klass) {
+        Config config = StreamsConfigurator.config.getConfig("twitter");
+        this.twitterUserInformationConfiguration = 
TwitterStreamConfigurator.detectTwitterUserInformationConfiguration(config);
+        this.klass = klass;
+    }
+
+    public TwitterUserInformationProvider(TwitterUserInformationConfiguration 
config, Class klass) {
+        this.twitterUserInformationConfiguration = config;
+        this.klass = klass;
+    }
+
+    public Queue<StreamsDatum> getProviderQueue() {
+        return this.providerQueue;
+    }
+
+    @Override
+    public void startStream() {
+        // no op
+    }
+
+
+    private void loadBatch(Long[] ids) {
+        Twitter client = getTwitterClient();
+        int keepTrying = 0;
+
+        // retry until keepTrying reaches 1: success sets it to 10, a failure adds the error handler's return value
+        //while (keepTrying < 10)
+        while (keepTrying < 1)
+        {
+            try
+            {
+                long[] toQuery = new long[ids.length];
+                for(int i = 0; i < ids.length; i++)
+                    toQuery[i] = ids[i];
+
+                for (User tStat : client.lookupUsers(toQuery)) {
+                    String json = DataObjectFactory.getRawJSON(tStat);
+                    providerQueue.offer(new StreamsDatum(json));
+                }
+                keepTrying = 10;
+            }
+            catch(TwitterException twitterException) {
+                keepTrying += TwitterErrorHandler.handleTwitterError(client, 
twitterException);
+            }
+            catch(Exception e) {
+                keepTrying += TwitterErrorHandler.handleTwitterError(client, 
e);
+            }
+        }
+    }
+
+    private void loadBatch(String[] ids) {
+        Twitter client = getTwitterClient();
+        int keepTrying = 0;
+
+        // retry until keepTrying reaches 1: success sets it to 10, a failure adds the error handler's return value
+        //while (keepTrying < 10)
+        while (keepTrying < 1)
+        {
+            try
+            {
+                for (User tStat : client.lookupUsers(ids)) {
+                    String json = DataObjectFactory.getRawJSON(tStat);
+                    providerQueue.offer(new StreamsDatum(json));
+                }
+                keepTrying = 10;
+            }
+            catch(TwitterException twitterException) {
+                keepTrying += TwitterErrorHandler.handleTwitterError(client, 
twitterException);
+            }
+            catch(Exception e) {
+                keepTrying += TwitterErrorHandler.handleTwitterError(client, 
e);
+            }
+        }
+    }
+
+    public StreamsResultSet readCurrent() {
+
+        Preconditions.checkArgument(idsBatches.hasNext() || 
screenNameBatches.hasNext());
+
+        LOGGER.info("readCurrent");
+
+        while(idsBatches.hasNext())
+            loadBatch(idsBatches.next());
+
+        while(screenNameBatches.hasNext())
+            loadBatch(screenNameBatches.next());
+
+
+        LOGGER.info("Finished.  Cleaning up...");
+
+        LOGGER.info("Providing {} docs", providerQueue.size());
+
+        StreamsResultSet result =  new StreamsResultSet(providerQueue);
+
+        LOGGER.info("Exiting");
+
+        return result;
+
+    }
+
+    public StreamsResultSet readNew(BigInteger sequence) {
+        LOGGER.debug("{} readNew", STREAMS_ID);
+        throw new NotImplementedException();
+    }
+
+    public StreamsResultSet readRange(DateTime start, DateTime end) {
+        LOGGER.debug("{} readRange", STREAMS_ID);
+        this.start = start;
+        this.end = end;
+        readCurrent();
+        StreamsResultSet result = (StreamsResultSet)providerQueue.iterator();
+        return result;
+    }
+
+    void shutdownAndAwaitTermination(ExecutorService pool) {
+        pool.shutdown(); // Disable new tasks from being submitted
+        try {
+            // Wait a while for existing tasks to terminate
+            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
+                pool.shutdownNow(); // Cancel currently executing tasks
+                // Wait a while for tasks to respond to being cancelled
+                if (!pool.awaitTermination(10, TimeUnit.SECONDS))
+                    System.err.println("Pool did not terminate");
+            }
+        } catch (InterruptedException ie) {
+            // (Re-)Cancel if current thread also interrupted
+            pool.shutdownNow();
+            // Preserve interrupt status
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    @Override
+    public void prepare(Object o) {
+
+        executor = 
MoreExecutors.listeningDecorator(newFixedThreadPoolWithQueueSize(5, 20));
+
+        Preconditions.checkNotNull(providerQueue);
+        Preconditions.checkNotNull(this.klass);
+        
Preconditions.checkNotNull(twitterUserInformationConfiguration.getOauth().getConsumerKey());
+        
Preconditions.checkNotNull(twitterUserInformationConfiguration.getOauth().getConsumerSecret());
+        
Preconditions.checkNotNull(twitterUserInformationConfiguration.getOauth().getAccessToken());
+        
Preconditions.checkNotNull(twitterUserInformationConfiguration.getOauth().getAccessTokenSecret());
+        
Preconditions.checkNotNull(twitterUserInformationConfiguration.getInfo());
+
+        List<String> screenNames = new ArrayList<String>();
+        List<String[]> screenNameBatches = new ArrayList<String[]>();
+
+        List<Long> ids = new ArrayList<Long>();
+        List<Long[]> idsBatches = new ArrayList<Long[]>();
+
+        for(String s : twitterUserInformationConfiguration.getInfo()) {
+            if(s != null)
+            {
+                String potentialScreenName = s.replaceAll("@", 
"").trim().toLowerCase();
+
+                // See if it is a long, if it is, add it to the user iD list, 
if it is not, add it to the
+                // screen name list
+                try {
+                    ids.add(Long.parseLong(potentialScreenName));
+                } catch (Exception e) {
+                    screenNames.add(potentialScreenName);
+                }
+
+                // Twitter allows for batches up to 100 per request, but you 
cannot mix types
+
+                if(ids.size() >= 100) {
+                    // add the batch
+                    idsBatches.add(ids.toArray(new Long[ids.size()]));
+                    // reset the Ids
+                    ids = new ArrayList<Long>();
+                }
+
+                if(screenNames.size() >= 100) {
+                    // add the batch
+                    screenNameBatches.add(screenNames.toArray(new 
String[ids.size()]));
+                    // reset the screen names
+                    screenNames = new ArrayList<String>();
+                }
+            }
+        }
+
+
+        if(ids.size() > 0)
+            idsBatches.add(ids.toArray(new Long[ids.size()]));
+
+        if(screenNames.size() > 0)
+            screenNameBatches.add(screenNames.toArray(new String[ids.size()]));
+
+        this.idsBatches = idsBatches.iterator();
+        this.screenNameBatches = screenNameBatches.iterator();
+    }
+
+    protected Twitter getTwitterClient()
+    {
+        String baseUrl = "https://api.twitter.com:443/1.1/";
+
+        ConfigurationBuilder builder = new ConfigurationBuilder()
+                
.setOAuthConsumerKey(twitterUserInformationConfiguration.getOauth().getConsumerKey())
+                
.setOAuthConsumerSecret(twitterUserInformationConfiguration.getOauth().getConsumerSecret())
+                
.setOAuthAccessToken(twitterUserInformationConfiguration.getOauth().getAccessToken())
+                
.setOAuthAccessTokenSecret(twitterUserInformationConfiguration.getOauth().getAccessTokenSecret())
+                .setIncludeEntitiesEnabled(true)
+                .setJSONStoreEnabled(true)
+                .setAsyncNumThreads(3)
+                .setRestBaseURL(baseUrl)
+                .setIncludeMyRetweetEnabled(Boolean.TRUE)
+                .setPrettyDebugEnabled(Boolean.TRUE);
+
+        return new TwitterFactory(builder.build()).getInstance();
+    }
+
+    @Override
+    public void cleanUp() {
+        shutdownAndAwaitTermination(executor);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/741a4544/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/TwitterConfiguration.json
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/TwitterConfiguration.json
 
b/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/TwitterConfiguration.json
new file mode 100644
index 0000000..9e22b93
--- /dev/null
+++ 
b/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/TwitterConfiguration.json
@@ -0,0 +1,70 @@
+{
+    "type": "object",
+    "$schema": "http://json-schema.org/draft-03/schema",
+    "id": "#",
+    "javaType" : "org.apache.streams.twitter.TwitterConfiguration",
+    "javaInterfaces": ["java.io.Serializable"],
+    "properties": {
+        "protocol": {
+            "type": "string",
+            "description": "The protocol"
+        },
+        "host": {
+            "type": "string",
+            "description": "The host"
+        },
+        "port": {
+            "type": "integer",
+            "description": "The port"
+        },
+        "version": {
+            "type": "string",
+            "description": "The version"
+        },
+        "endpoint": {
+            "type": "string",
+            "description": "The endpoint"
+        },
+        "jsonStoreEnabled": {
+            "default" : true,
+            "type": "string"
+        },
+        "oauth": {
+            "type": "object",
+            "dynamic": "true",
+            "javaType" : "org.apache.streams.twitter.TwitterOAuthConfiguration",
+            "javaInterfaces": ["java.io.Serializable"],
+            "properties": {
+                "appName": {
+                    "type": "string"
+                },
+                "consumerKey": {
+                    "type": "string"
+                },
+                "consumerSecret": {
+                    "type": "string"
+                },
+                "accessToken": {
+                    "type": "string"
+                },
+                "accessTokenSecret": {
+                    "type": "string"
+                }
+            }
+        },
+        "basicauth": {
+            "type": "object",
+            "dynamic": "true",
+            "javaType" : "org.apache.streams.twitter.TwitterBasicAuthConfiguration",
+            "javaInterfaces": ["java.io.Serializable"],
+            "properties": {
+                "username": {
+                    "type": "string"
+                },
+                "password": {
+                    "type": "string"
+                }
+            }
+        }
+   }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/741a4544/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/TwitterStreamConfiguration.json
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/TwitterStreamConfiguration.json
 
b/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/TwitterStreamConfiguration.json
index c1a0d0c..2ff7362 100644
--- 
a/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/TwitterStreamConfiguration.json
+++ 
b/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/TwitterStreamConfiguration.json
@@ -3,34 +3,12 @@
     "$schema": "http://json-schema.org/draft-03/schema",
     "id": "#",
     "javaType" : "org.apache.streams.twitter.TwitterStreamConfiguration",
+    "extends": {"$ref":"TwitterConfiguration.json"},
     "javaInterfaces": ["java.io.Serializable"],
     "properties": {
-        "protocol": {
-            "type": "string",
-            "description": "The protocol"
-        },
-        "host": {
-            "type": "string",
-            "description": "The host"
-        },
-        "port": {
-            "type": "integer",
-            "description": "The port"
-        },
-        "version": {
-            "type": "string",
-            "description": "The version"
-        },
-        "endpoint": {
-            "type": "string",
-            "description": "The endpoint"
-        },
         "includeEntities": {
             "type": "string"
         },
-        "jsonStoreEnabled": {
-            "type": "string"
-        },
         "truncated": {
             "type": "boolean"
         },
@@ -59,43 +37,6 @@
             "items": {
                 "type": "string"
             }
-        },
-        "oauth": {
-            "type": "object",
-            "dynamic": "true",
-            "javaType" : "org.apache.streams.twitter.TwitterOAuthConfiguration",
-            "javaInterfaces": ["java.io.Serializable"],
-            "properties": {
-                "appName": {
-                    "type": "string"
-                },
-                "consumerKey": {
-                    "type": "string"
-                },
-                "consumerSecret": {
-                    "type": "string"
-                },
-                "accessToken": {
-                    "type": "string"
-                },
-                "accessTokenSecret": {
-                    "type": "string"
-                }
-            }
-        },
-        "basicauth": {
-            "type": "object",
-            "dynamic": "true",
-            "javaType" : "org.apache.streams.twitter.TwitterBasicAuthConfiguration",
-            "javaInterfaces": ["java.io.Serializable"],
-            "properties": {
-                "username": {
-                    "type": "string"
-                },
-                "password": {
-                    "type": "string"
-                }
-            }
         }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/741a4544/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/TwitterUserInformationConfiguration.json
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/TwitterUserInformationConfiguration.json
 
b/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/TwitterUserInformationConfiguration.json
new file mode 100644
index 0000000..afd203f
--- /dev/null
+++ 
b/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/TwitterUserInformationConfiguration.json
@@ -0,0 +1,17 @@
+{
+    "type": "object",
+    "$schema": "http://json-schema.org/draft-03/schema",
+    "id": "#",
+    "javaType" : "org.apache.streams.twitter.TwitterUserInformationConfiguration",
+    "extends": {"$ref":"TwitterConfiguration.json"},
+    "javaInterfaces": ["java.io.Serializable"],
+    "properties": {
+        "info": {
+            "type": "array",
+            "description": "A list of user IDs, indicating the users whose Tweets should be delivered on the stream",
+            "items": {
+                "type": "string"
+            }
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/741a4544/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/SimpleTweetTest.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/SimpleTweetTest.java
 
b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/SimpleTweetTest.java
index b8bfe1a..31ddfce 100644
--- 
a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/SimpleTweetTest.java
+++ 
b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/SimpleTweetTest.java
@@ -6,6 +6,8 @@ import com.fasterxml.jackson.databind.node.ObjectNode;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.exceptions.ActivitySerializerException;
 import org.apache.streams.pojo.json.Activity;
+import org.apache.streams.twitter.pojo.Delete;
+import org.apache.streams.twitter.pojo.Retweet;
 import org.apache.streams.twitter.pojo.Tweet;
 import org.apache.streams.twitter.processor.TwitterTypeConverter;
 import org.apache.streams.twitter.serializer.StreamsTwitterMapper;
@@ -21,6 +23,8 @@ import java.io.InputStream;
 import java.io.InputStreamReader;
 
 import static org.hamcrest.CoreMatchers.*;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
 
 /**

Reply via email to