more main methods: STREAMS-411, better thread tracking: STREAMS-425, misc cleanup

more main methods: STREAMS-411
better thread tracking: STREAMS-425
misc cleanup


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/170cb8b6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/170cb8b6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/170cb8b6

Branch: refs/heads/master
Commit: 170cb8b6b9d647dc2b7ff82b87edf060f078585c
Parents: f1540b1
Author: Steve Blackmon @steveblackmon <[email protected]>
Authored: Thu Oct 6 14:01:04 2016 -0500
Committer: Steve Blackmon @steveblackmon <[email protected]>
Committed: Thu Oct 6 14:01:04 2016 -0500

----------------------------------------------------------------------
 .../provider/TwitterFollowingProvider.java      | 120 +++++++---
 .../twitter/provider/TwitterStreamProvider.java |  55 +++++
 .../provider/TwitterTimelineProvider.java       | 191 ++++++++--------
 .../TwitterUserInformationProvider.java         | 227 ++++++++++++-------
 .../twitter/TwitterFollowingConfiguration.json  |   2 +-
 5 files changed, 386 insertions(+), 209 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/170cb8b6/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java
 
b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java
index 4c3a828..66c1104 100644
--- 
a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java
+++ 
b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java
@@ -18,22 +18,43 @@
 
 package org.apache.streams.twitter.provider;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
 import com.google.common.collect.Queues;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.Uninterruptibles;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigParseOptions;
 import org.apache.streams.config.ComponentConfigurator;
+import org.apache.streams.config.StreamsConfiguration;
 import org.apache.streams.config.StreamsConfigurator;
 import org.apache.streams.core.DatumStatusCounter;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsResultSet;
+import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.apache.streams.twitter.TwitterFollowingConfiguration;
+import org.apache.streams.twitter.TwitterStreamConfiguration;
+import org.apache.streams.twitter.converter.TwitterDateTimeFormat;
 import org.apache.streams.util.ComponentUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import twitter4j.Twitter;
 
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.PrintStream;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
 import java.util.Queue;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
@@ -49,6 +70,49 @@ public class TwitterFollowingProvider extends 
TwitterUserInformationProvider {
 
     private TwitterFollowingConfiguration config;
 
+    List<ListenableFuture<Object>> futures = new ArrayList<>();
+
+    public static void main(String[] args) throws Exception {
+
+        Preconditions.checkArgument(args.length >= 2);
+
+        String configfile = args[0];
+        String outfile = args[1];
+
+        Config reference = ConfigFactory.load();
+        File conf_file = new File(configfile);
+        assert(conf_file.exists());
+        Config testResourceConfig = 
ConfigFactory.parseFileAnySyntax(conf_file, 
ConfigParseOptions.defaults().setAllowMissing(false));
+
+        Config typesafe  = 
testResourceConfig.withFallback(reference).resolve();
+
+        StreamsConfiguration streamsConfiguration = 
StreamsConfigurator.detectConfiguration(typesafe);
+        TwitterFollowingConfiguration config = new 
ComponentConfigurator<>(TwitterFollowingConfiguration.class).detectConfiguration(typesafe,
 "twitter");
+        TwitterFollowingProvider provider = new 
TwitterFollowingProvider(config);
+
+        ObjectMapper mapper = new 
StreamsJacksonMapper(Lists.newArrayList(TwitterDateTimeFormat.TWITTER_FORMAT));
+
+        PrintStream outStream = new PrintStream(new BufferedOutputStream(new 
FileOutputStream(outfile)));
+        provider.prepare(config);
+        provider.startStream();
+        do {
+            
Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getBatchFrequencyMs(),
 TimeUnit.MILLISECONDS);
+            Iterator<StreamsDatum> iterator = 
provider.readCurrent().iterator();
+            while(iterator.hasNext()) {
+                StreamsDatum datum = iterator.next();
+                String json;
+                try {
+                    json = mapper.writeValueAsString(datum.getDocument());
+                    outStream.println(json);
+                } catch (JsonProcessingException e) {
+                    System.err.println(e.getMessage());
+                }
+            }
+        } while( provider.isRunning());
+        provider.cleanUp();
+        outStream.flush();
+    }
+
     public TwitterFollowingConfiguration getConfig()              { return 
config; }
 
     public static final int MAX_NUMBER_WAITING = 10000;
@@ -63,14 +127,24 @@ public class TwitterFollowingProvider extends 
TwitterUserInformationProvider {
     }
 
     @Override
+    public void prepare(Object o) {
+        super.prepare(config);
+        Preconditions.checkNotNull(getConfig().getEndpoint());
+        
Preconditions.checkArgument(getConfig().getEndpoint().equals("friends") || 
getConfig().getEndpoint().equals("followers"));
+        return;
+    }
+
+    @Override
     public void startStream() {
 
-        running.set(true);
+        Preconditions.checkNotNull(executor);
 
         Preconditions.checkArgument(idsBatches.hasNext() || 
screenNameBatches.hasNext());
 
         LOGGER.info("startStream");
 
+        running.set(true);
+
         while (idsBatches.hasNext()) {
             submitFollowingThreads(idsBatches.next());
         }
@@ -78,8 +152,6 @@ public class TwitterFollowingProvider extends 
TwitterUserInformationProvider {
             submitFollowingThreads(screenNameBatches.next());
         }
 
-        running.set(true);
-
         executor.shutdown();
 
     }
@@ -89,7 +161,9 @@ public class TwitterFollowingProvider extends 
TwitterUserInformationProvider {
 
         for (int i = 0; i < ids.length; i++) {
             TwitterFollowingProviderTask providerTask = new 
TwitterFollowingProviderTask(this, client, ids[i]);
-            executor.submit(providerTask);
+            ListenableFuture future = executor.submit(providerTask);
+            futures.add(future);
+            LOGGER.info("submitted {}", ids[i]);
         }
     }
 
@@ -98,7 +172,9 @@ public class TwitterFollowingProvider extends 
TwitterUserInformationProvider {
 
         for (int i = 0; i < screenNames.length; i++) {
             TwitterFollowingProviderTask providerTask = new 
TwitterFollowingProviderTask(this, client, screenNames[i]);
-            executor.submit(providerTask);
+            ListenableFuture future = executor.submit(providerTask);
+            futures.add(future);
+            LOGGER.info("submitted {}", screenNames[i]);
         }
 
     }
@@ -120,41 +196,17 @@ public class TwitterFollowingProvider extends 
TwitterUserInformationProvider {
             lock.writeLock().unlock();
         }
 
-        if (providerQueue.isEmpty() && executor.isTerminated()) {
-            LOGGER.info("{}{} - completed", idsBatches, screenNameBatches);
-
-            running.set(false);
-
-            LOGGER.info("Exiting");
-        }
-
         return result;
 
     }
 
-    protected Queue<StreamsDatum> constructQueue() {
-        return new ConcurrentLinkedQueue<StreamsDatum>();
-    }
-
-    @Override
-    public void prepare(Object o) {
-        super.prepare(config);
-        Preconditions.checkNotNull(getConfig().getEndpoint());
-        
Preconditions.checkArgument(getConfig().getEndpoint().equals("friends") || 
getConfig().getEndpoint().equals("followers"));
-        return;
-    }
-
-    public void addDatum(StreamsDatum datum) {
-        try {
-            lock.readLock().lock();
-            ComponentUtils.offerUntilSuccess(datum, providerQueue);
-        } finally {
-            lock.readLock().unlock();
-        }
-    }
-
     @Override
     public boolean isRunning() {
+        if (providerQueue.isEmpty() && executor.isTerminated() && 
Futures.allAsList(futures).isDone()) {
+            LOGGER.info("Completed");
+            running.set(false);
+            LOGGER.info("Exiting");
+        }
         return running.get();
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/170cb8b6/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
 
b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
index f584950..b414074 100644
--- 
a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
+++ 
b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java
@@ -18,9 +18,12 @@
 
 package org.apache.streams.twitter.provider;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Queues;
+import com.google.common.util.concurrent.Uninterruptibles;
 import com.twitter.hbc.ClientBuilder;
 import com.twitter.hbc.core.Constants;
 import com.twitter.hbc.core.Hosts;
@@ -35,7 +38,11 @@ import com.twitter.hbc.httpclient.auth.Authentication;
 import com.twitter.hbc.httpclient.auth.BasicAuth;
 import com.twitter.hbc.httpclient.auth.OAuth1;
 import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigParseOptions;
 import org.apache.commons.lang.NotImplementedException;
+import org.apache.streams.config.ComponentConfigurator;
+import org.apache.streams.config.StreamsConfiguration;
 import org.apache.streams.config.StreamsConfigurator;
 import org.apache.streams.core.DatumStatus;
 import org.apache.streams.core.DatumStatusCountable;
@@ -43,14 +50,21 @@ import org.apache.streams.core.DatumStatusCounter;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsProvider;
 import org.apache.streams.core.StreamsResultSet;
+import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.apache.streams.twitter.TwitterStreamConfiguration;
+import org.apache.streams.twitter.converter.TwitterDateTimeFormat;
 import org.apache.streams.util.ComponentUtils;
 import org.joda.time.DateTime;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.PrintStream;
 import java.io.Serializable;
 import java.math.BigInteger;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Queue;
 import java.util.concurrent.ArrayBlockingQueue;
@@ -72,6 +86,47 @@ public class TwitterStreamProvider implements 
StreamsProvider, Serializable, Dat
 
     private final static Logger LOGGER = 
LoggerFactory.getLogger(TwitterStreamProvider.class);
 
+    public static void main(String[] args) throws Exception {
+
+        Preconditions.checkArgument(args.length >= 2);
+
+        String configfile = args[0];
+        String outfile = args[1];
+
+        Config reference = ConfigFactory.load();
+        File conf_file = new File(configfile);
+        assert(conf_file.exists());
+        Config testResourceConfig = 
ConfigFactory.parseFileAnySyntax(conf_file, 
ConfigParseOptions.defaults().setAllowMissing(false));
+
+        Config typesafe  = 
testResourceConfig.withFallback(reference).resolve();
+
+        StreamsConfiguration streamsConfiguration = 
StreamsConfigurator.detectConfiguration(typesafe);
+        TwitterStreamConfiguration config = new 
ComponentConfigurator<>(TwitterStreamConfiguration.class).detectConfiguration(typesafe,
 "twitter");
+        TwitterStreamProvider provider = new TwitterStreamProvider(config);
+
+        ObjectMapper mapper = new 
StreamsJacksonMapper(Lists.newArrayList(TwitterDateTimeFormat.TWITTER_FORMAT));
+
+        PrintStream outStream = new PrintStream(new BufferedOutputStream(new 
FileOutputStream(outfile)));
+        provider.prepare(config);
+        provider.startStream();
+        do {
+            
Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getBatchFrequencyMs(),
 TimeUnit.MILLISECONDS);
+            Iterator<StreamsDatum> iterator = 
provider.readCurrent().iterator();
+            while(iterator.hasNext()) {
+                StreamsDatum datum = iterator.next();
+                String json;
+                try {
+                    json = mapper.writeValueAsString(datum.getDocument());
+                    outStream.println(json);
+                } catch (JsonProcessingException e) {
+                    System.err.println(e.getMessage());
+                }
+            }
+        } while( provider.isRunning());
+        provider.cleanUp();
+        outStream.flush();
+    }
+
     public static final int MAX_BATCH = 1000;
 
     private TwitterStreamConfiguration config;

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/170cb8b6/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
 
b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
index 2924623..cea9829 100644
--- 
a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
+++ 
b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
@@ -22,6 +22,10 @@ import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.Uninterruptibles;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
@@ -68,6 +72,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
+import static java.util.concurrent.Executors.newSingleThreadExecutor;
+
 /**
  *  Retrieve recent posts from a list of user ids or names.
  *
@@ -91,7 +97,39 @@ public class TwitterTimelineProvider implements 
StreamsProvider, Serializable {
 
     private final static Logger LOGGER = 
LoggerFactory.getLogger(TwitterTimelineProvider.class);
 
-    private static ObjectMapper MAPPER = new 
StreamsJacksonMapper(Lists.newArrayList(TwitterDateTimeFormat.TWITTER_FORMAT));
+    public static final int MAX_NUMBER_WAITING = 10000;
+
+    private TwitterUserInformationConfiguration config;
+
+    protected final ReadWriteLock lock = new ReentrantReadWriteLock();
+
+    public TwitterUserInformationConfiguration getConfig() {
+        return config;
+    }
+
+    public void setConfig(TwitterUserInformationConfiguration config) {
+        this.config = config;
+    }
+
+    protected Collection<String[]> screenNameBatches;
+    protected Collection<Long> ids;
+
+    protected volatile Queue<StreamsDatum> providerQueue;
+
+    protected int idsCount;
+    protected Twitter client;
+
+    protected ListeningExecutorService executor;
+
+    protected DateTime start;
+    protected DateTime end;
+
+    protected final AtomicBoolean running = new AtomicBoolean();
+
+    List<ListenableFuture<Object>> futures = new ArrayList<>();
+
+    Boolean jsonStoreEnabled;
+    Boolean includeEntitiesEnabled;
 
     public static void main(String[] args) throws Exception {
 
@@ -111,6 +149,8 @@ public class TwitterTimelineProvider implements 
StreamsProvider, Serializable {
         TwitterUserInformationConfiguration config = new 
ComponentConfigurator<>(TwitterUserInformationConfiguration.class).detectConfiguration(typesafe,
 "twitter");
         TwitterTimelineProvider provider = new TwitterTimelineProvider(config);
 
+        ObjectMapper mapper = new 
StreamsJacksonMapper(Lists.newArrayList(TwitterDateTimeFormat.TWITTER_FORMAT));
+
         PrintStream outStream = new PrintStream(new BufferedOutputStream(new 
FileOutputStream(outfile)));
         provider.prepare(config);
         provider.startStream();
@@ -121,7 +161,7 @@ public class TwitterTimelineProvider implements 
StreamsProvider, Serializable {
                 StreamsDatum datum = iterator.next();
                 String json;
                 try {
-                    json = MAPPER.writeValueAsString(datum.getDocument());
+                    json = mapper.writeValueAsString(datum.getDocument());
                     outStream.println(json);
                 } catch (JsonProcessingException e) {
                     System.err.println(e.getMessage());
@@ -132,42 +172,6 @@ public class TwitterTimelineProvider implements 
StreamsProvider, Serializable {
         outStream.flush();
     }
 
-    public static final int MAX_NUMBER_WAITING = 10000;
-
-    private TwitterUserInformationConfiguration config;
-
-    protected final ReadWriteLock lock = new ReentrantReadWriteLock();
-
-    public TwitterUserInformationConfiguration getConfig() {
-        return config;
-    }
-
-    public void setConfig(TwitterUserInformationConfiguration config) {
-        this.config = config;
-    }
-
-    protected Collection<String[]> screenNameBatches;
-    protected Collection<Long> ids;
-
-    protected volatile Queue<StreamsDatum> providerQueue;
-
-    protected int idsCount;
-    protected Twitter client;
-
-    protected ExecutorService executor;
-
-    protected DateTime start;
-    protected DateTime end;
-
-    protected final AtomicBoolean running = new AtomicBoolean();
-
-    Boolean jsonStoreEnabled;
-    Boolean includeEntitiesEnabled;
-
-    private static ExecutorService getExecutor() {
-        return Executors.newSingleThreadExecutor();
-    }
-
     public TwitterTimelineProvider(TwitterUserInformationConfiguration config) 
{
         this.config = config;
     }
@@ -182,17 +186,43 @@ public class TwitterTimelineProvider implements 
StreamsProvider, Serializable {
     }
 
     @Override
+    public void prepare(Object o) {
+
+
+
+        try {
+            lock.writeLock().lock();
+            providerQueue = constructQueue();
+        } finally {
+            lock.writeLock().unlock();
+        }
+
+        Preconditions.checkNotNull(providerQueue);
+        Preconditions.checkNotNull(config.getOauth().getConsumerKey());
+        Preconditions.checkNotNull(config.getOauth().getConsumerSecret());
+        Preconditions.checkNotNull(config.getOauth().getAccessToken());
+        Preconditions.checkNotNull(config.getOauth().getAccessTokenSecret());
+        Preconditions.checkNotNull(config.getInfo());
+
+        consolidateToIDs();
+
+        if(ids.size() > 1)
+            executor = 
MoreExecutors.listeningDecorator(TwitterUserInformationProvider.newFixedThreadPoolWithQueueSize(5,
 ids.size()));
+        else
+            executor = 
MoreExecutors.listeningDecorator(newSingleThreadExecutor());
+    }
+
+    @Override
     public void startStream() {
+
         LOGGER.debug("{} startStream", STREAMS_ID);
 
         Preconditions.checkArgument(!ids.isEmpty());
 
-        LOGGER.debug("{} - readCurrent", ids);
+        running.set(true);
 
         submitTimelineThreads(ids.toArray(new Long[0]));
 
-        running.set(true);
-
         executor.shutdown();
 
     }
@@ -202,13 +232,15 @@ public class TwitterTimelineProvider implements 
StreamsProvider, Serializable {
     }
 
     protected void submitTimelineThreads(Long[] ids) {
+
         Twitter client = getTwitterClient();
 
         for(int i = 0; i < ids.length; i++) {
 
             TwitterTimelineProviderTask providerTask = new 
TwitterTimelineProviderTask(this, client, ids[i]);
-            executor.submit(providerTask);
-
+            ListenableFuture future = executor.submit(providerTask);
+            futures.add(future);
+            LOGGER.info("submitted {}", ids[i]);
         }
 
     }
@@ -242,7 +274,7 @@ public class TwitterTimelineProvider implements 
StreamsProvider, Serializable {
             lock.writeLock().unlock();
         }
 
-        if( providerQueue.isEmpty() && executor.isTerminated()) {
+        if( result.size() == 0 && providerQueue.isEmpty() && 
executor.isTerminated() ) {
             LOGGER.info("Finished.  Cleaning up...");
 
             running.set(false);
@@ -268,50 +300,7 @@ public class TwitterTimelineProvider implements 
StreamsProvider, Serializable {
         throw new NotImplementedException();
     }
 
-    @Override
-    public boolean isRunning() {
-        return running.get();
-    }
-
-    void shutdownAndAwaitTermination(ExecutorService pool) {
-        pool.shutdown(); // Disable new tasks from being submitted
-        try {
-            // Wait a while for existing tasks to terminate
-            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
-                pool.shutdownNow(); // Cancel currently executing tasks
-                // Wait a while for tasks to respond to being cancelled
-                if (!pool.awaitTermination(10, TimeUnit.SECONDS))
-                    System.err.println("Pool did not terminate");
-            }
-        } catch (InterruptedException ie) {
-            // (Re-)Cancel if current thread also interrupted
-            pool.shutdownNow();
-            // Preserve interrupt status
-            Thread.currentThread().interrupt();
-        }
-    }
-
-    @Override
-    public void prepare(Object o) {
 
-        executor = getExecutor();
-
-        try {
-            lock.writeLock().lock();
-            providerQueue = constructQueue();
-        } finally {
-            lock.writeLock().unlock();
-        }
-
-        Preconditions.checkNotNull(providerQueue);
-        Preconditions.checkNotNull(config.getOauth().getConsumerKey());
-        Preconditions.checkNotNull(config.getOauth().getConsumerSecret());
-        Preconditions.checkNotNull(config.getOauth().getAccessToken());
-        Preconditions.checkNotNull(config.getOauth().getAccessTokenSecret());
-        Preconditions.checkNotNull(config.getInfo());
-
-        consolidateToIDs();
-    }
 
     /**
      * Using the "info" list that is contained in the configuration, ensure 
that all
@@ -375,13 +364,31 @@ public class TwitterTimelineProvider implements 
StreamsProvider, Serializable {
         shutdownAndAwaitTermination(executor);
     }
 
-    public void addDatum(StreamsDatum datum) {
+    void shutdownAndAwaitTermination(ExecutorService pool) {
+        pool.shutdown(); // Disable new tasks from being submitted
         try {
-            lock.readLock().lock();
-            ComponentUtils.offerUntilSuccess(datum, providerQueue);
-        } finally {
-            lock.readLock().unlock();
+            // Wait a while for existing tasks to terminate
+            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
+                pool.shutdownNow(); // Cancel currently executing tasks
+                // Wait a while for tasks to respond to being cancelled
+                if (!pool.awaitTermination(10, TimeUnit.SECONDS))
+                    System.err.println("Pool did not terminate");
+            }
+        } catch (InterruptedException ie) {
+            // (Re-)Cancel if current thread also interrupted
+            pool.shutdownNow();
+            // Preserve interrupt status
+            Thread.currentThread().interrupt();
         }
     }
 
+    @Override
+    public boolean isRunning() {
+        if (providerQueue.isEmpty() && executor.isTerminated() && 
Futures.allAsList(futures).isDone()) {
+            LOGGER.info("Completed");
+            running.set(false);
+            LOGGER.info("Exiting");
+        }
+        return running.get();
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/170cb8b6/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java
 
b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java
index 44f8a24..d6e783b 100644
--- 
a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java
+++ 
b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java
@@ -18,13 +18,20 @@
 
 package org.apache.streams.twitter.provider;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.Uninterruptibles;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigParseOptions;
 import org.apache.commons.lang.NotImplementedException;
 import org.apache.streams.config.ComponentConfigurator;
+import org.apache.streams.config.StreamsConfiguration;
 import org.apache.streams.config.StreamsConfigurator;
 import org.apache.streams.core.DatumStatusCounter;
 import org.apache.streams.core.StreamsDatum;
@@ -45,6 +52,10 @@ import twitter4j.TwitterFactory;
 import twitter4j.conf.ConfigurationBuilder;
 import twitter4j.json.DataObjectFactory;
 
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.PrintStream;
 import java.io.Serializable;
 import java.math.BigInteger;
 import java.util.ArrayList;
@@ -75,6 +86,45 @@ public class TwitterUserInformationProvider implements 
StreamsProvider, Serializ
 
     private TwitterUserInformationConfiguration config;
 
+    public static void main(String[] args) throws Exception {
+
+        Preconditions.checkArgument(args.length >= 2);
+
+        String configfile = args[0];
+        String outfile = args[1];
+
+        Config reference = ConfigFactory.load();
+        File conf_file = new File(configfile);
+        assert(conf_file.exists());
+        Config testResourceConfig = 
ConfigFactory.parseFileAnySyntax(conf_file, 
ConfigParseOptions.defaults().setAllowMissing(false));
+
+        Config typesafe  = 
testResourceConfig.withFallback(reference).resolve();
+
+        StreamsConfiguration streamsConfiguration = 
StreamsConfigurator.detectConfiguration(typesafe);
+        TwitterUserInformationConfiguration config = new 
ComponentConfigurator<>(TwitterUserInformationConfiguration.class).detectConfiguration(typesafe,
 "twitter");
+        TwitterUserInformationProvider provider = new 
TwitterUserInformationProvider(config);
+
+        PrintStream outStream = new PrintStream(new BufferedOutputStream(new 
FileOutputStream(outfile)));
+        provider.prepare(config);
+        provider.startStream();
+        do {
+            
Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getBatchFrequencyMs(),
 TimeUnit.MILLISECONDS);
+            Iterator<StreamsDatum> iterator = 
provider.readCurrent().iterator();
+            while(iterator.hasNext()) {
+                StreamsDatum datum = iterator.next();
+                String json;
+                try {
+                    json = MAPPER.writeValueAsString(datum.getDocument());
+                    outStream.println(json);
+                } catch (JsonProcessingException e) {
+                    System.err.println(e.getMessage());
+                }
+            }
+        } while( provider.isRunning());
+        provider.cleanUp();
+        outStream.flush();
+    }
+
     protected final ReadWriteLock lock = new ReentrantReadWriteLock();
 
     protected volatile Queue<StreamsDatum> providerQueue;
@@ -93,7 +143,7 @@ public class TwitterUserInformationProvider implements 
StreamsProvider, Serializ
 
     protected final AtomicBoolean running = new AtomicBoolean();
 
-    private static ExecutorService newFixedThreadPoolWithQueueSize(int 
nThreads, int queueSize) {
+    public static ExecutorService newFixedThreadPoolWithQueueSize(int 
nThreads, int queueSize) {
         return new ThreadPoolExecutor(nThreads, nThreads,
                 5000L, TimeUnit.MILLISECONDS,
                 new ArrayBlockingQueue<Runnable>(queueSize, true), new 
ThreadPoolExecutor.CallerRunsPolicy());
@@ -117,8 +167,88 @@ public class TwitterUserInformationProvider implements 
StreamsProvider, Serializ
     }
 
     @Override
+    public void prepare(Object o) {
+
+        if( o instanceof TwitterFollowingConfiguration )
+            config = (TwitterUserInformationConfiguration) o;
+
+        Preconditions.checkNotNull(config);
+        Preconditions.checkNotNull(config.getOauth());
+        Preconditions.checkNotNull(config.getOauth().getConsumerKey());
+        Preconditions.checkNotNull(config.getOauth().getConsumerSecret());
+        Preconditions.checkNotNull(config.getOauth().getAccessToken());
+        Preconditions.checkNotNull(config.getOauth().getAccessTokenSecret());
+        Preconditions.checkNotNull(config.getInfo());
+
+        try {
+            lock.writeLock().lock();
+            providerQueue = constructQueue();
+        } finally {
+            lock.writeLock().unlock();
+        }
+
+        Preconditions.checkNotNull(providerQueue);
+
+        List<String> screenNames = new ArrayList<String>();
+        List<String[]> screenNameBatches = new ArrayList<String[]>();
+
+        List<Long> ids = new ArrayList<Long>();
+        List<Long[]> idsBatches = new ArrayList<Long[]>();
+
+        for(String s : config.getInfo()) {
+            if(s != null)
+            {
+                String potentialScreenName = s.replaceAll("@", 
"").trim().toLowerCase();
+
+                // See if it is a long, if it is, add it to the user iD list, 
if it is not, add it to the
+                // screen name list
+                try {
+                    ids.add(Long.parseLong(potentialScreenName));
+                } catch (Exception e) {
+                    screenNames.add(potentialScreenName);
+                }
+
+                // Twitter allows for batches up to 100 per request, but you 
cannot mix types
+
+                if(ids.size() >= 100) {
+                    // add the batch
+                    idsBatches.add(ids.toArray(new Long[ids.size()]));
+                    // reset the Ids
+                    ids = new ArrayList<Long>();
+                }
+
+                if(screenNames.size() >= 100) {
+                    // add the batch
+                    screenNameBatches.add(screenNames.toArray(new 
String[ids.size()]));
+                    // reset the Ids
+                    screenNames = new ArrayList<String>();
+                }
+            }
+        }
+
+
+        if(ids.size() > 0)
+            idsBatches.add(ids.toArray(new Long[ids.size()]));
+
+        if(screenNames.size() > 0)
+            screenNameBatches.add(screenNames.toArray(new String[ids.size()]));
+
+        if(ids.size() + screenNames.size() > 0)
+            executor = 
MoreExecutors.listeningDecorator(newFixedThreadPoolWithQueueSize(5, (ids.size() 
+ screenNames.size())));
+        else
+            executor = 
MoreExecutors.listeningDecorator(newSingleThreadExecutor());
+
+        Preconditions.checkNotNull(executor);
+
+        this.idsBatches = idsBatches.iterator();
+        this.screenNameBatches = screenNameBatches.iterator();
+    }
+
+    @Override
     public void startStream() {
 
+        Preconditions.checkNotNull(executor);
+
         Preconditions.checkArgument(idsBatches.hasNext() || 
screenNameBatches.hasNext());
 
         LOGGER.info("{}{} - startStream", idsBatches, screenNameBatches);
@@ -214,16 +344,6 @@ public class TwitterUserInformationProvider implements 
StreamsProvider, Serializ
             lock.writeLock().unlock();
         }
 
-        if( providerQueue.isEmpty() && executor.isTerminated()) {
-            LOGGER.info("{}{} - completed", idsBatches, screenNameBatches);
-
-            running.set(false);
-
-            LOGGER.info("Exiting");
-        }
-
-        return result;
-
     }
 
     protected Queue<StreamsDatum> constructQueue() {
@@ -246,6 +366,15 @@ public class TwitterUserInformationProvider implements 
StreamsProvider, Serializ
 
     @Override
     public boolean isRunning() {
+        // Completion is evaluated on every call: an empty queue plus a terminated executor means done.
+        if( providerQueue.isEmpty() && executor.isTerminated() ) {
+            LOGGER.info("{}{} - completed", idsBatches, screenNameBatches);
+            // Clear the flag so that loops polling isRunning() (such as the do/while in main) exit.
+            running.set(false);
+            // NOTE(review): providerQueue and executor are dereferenced without a null check - presumably prepare() must run first; confirm.
+            LOGGER.info("Exiting");
+        }
+        // Report the flag, including any update made just above.
         return running.get();
     }
 
@@ -267,78 +396,7 @@ public class TwitterUserInformationProvider implements 
StreamsProvider, Serializ
         }
     }
 
-    @Override
-    public void prepare(Object o) {
 
-        if( o instanceof TwitterFollowingConfiguration )
-            config = (TwitterUserInformationConfiguration) o;
-
-        try {
-            lock.writeLock().lock();
-            providerQueue = constructQueue();
-        } finally {
-            lock.writeLock().unlock();
-        }
-
-        Preconditions.checkNotNull(providerQueue);
-        Preconditions.checkNotNull(config.getOauth().getConsumerKey());
-        Preconditions.checkNotNull(config.getOauth().getConsumerSecret());
-        Preconditions.checkNotNull(config.getOauth().getAccessToken());
-        Preconditions.checkNotNull(config.getOauth().getAccessTokenSecret());
-        Preconditions.checkNotNull(config.getInfo());
-
-        List<String> screenNames = new ArrayList<String>();
-        List<String[]> screenNameBatches = new ArrayList<String[]>();
-
-        List<Long> ids = new ArrayList<Long>();
-        List<Long[]> idsBatches = new ArrayList<Long[]>();
-
-        for(String s : config.getInfo()) {
-            if(s != null)
-            {
-                String potentialScreenName = s.replaceAll("@", 
"").trim().toLowerCase();
-
-                // See if it is a long, if it is, add it to the user iD list, 
if it is not, add it to the
-                // screen name list
-                try {
-                    ids.add(Long.parseLong(potentialScreenName));
-                } catch (Exception e) {
-                    screenNames.add(potentialScreenName);
-                }
-
-                // Twitter allows for batches up to 100 per request, but you 
cannot mix types
-
-                if(ids.size() >= 100) {
-                    // add the batch
-                    idsBatches.add(ids.toArray(new Long[ids.size()]));
-                    // reset the Ids
-                    ids = new ArrayList<Long>();
-                }
-
-                if(screenNames.size() >= 100) {
-                    // add the batch
-                    screenNameBatches.add(screenNames.toArray(new 
String[ids.size()]));
-                    // reset the Ids
-                    screenNames = new ArrayList<String>();
-                }
-            }
-        }
-
-
-        if(ids.size() > 0)
-            idsBatches.add(ids.toArray(new Long[ids.size()]));
-
-        if(screenNames.size() > 0)
-            screenNameBatches.add(screenNames.toArray(new String[ids.size()]));
-
-        if(ids.size() + screenNames.size() > 0)
-            executor = 
MoreExecutors.listeningDecorator(newFixedThreadPoolWithQueueSize(5, (ids.size() 
+ screenNames.size())));
-        else
-            executor = 
MoreExecutors.listeningDecorator(newSingleThreadExecutor());
-
-        this.idsBatches = idsBatches.iterator();
-        this.screenNameBatches = screenNameBatches.iterator();
-    }
 
     protected Twitter getTwitterClient()
     {
@@ -359,6 +417,11 @@ public class TwitterUserInformationProvider implements 
StreamsProvider, Serializ
         return new TwitterFactory(builder.build()).getInstance();
     }
 
+    protected void callback() {
+        // NOTE(review): empty body and no caller visible in this change - presumably a hook
+        // for subclasses to override; confirm the intent or remove the method.
+    }
+
     @Override
     public void cleanUp() {
         shutdownAndAwaitTermination(executor);

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/170cb8b6/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/TwitterFollowingConfiguration.json
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/TwitterFollowingConfiguration.json
 
b/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/TwitterFollowingConfiguration.json
index c72f3cf..dda5d1b 100644
--- 
a/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/TwitterFollowingConfiguration.json
+++ 
b/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/TwitterFollowingConfiguration.json
@@ -12,7 +12,7 @@
         "ids_only": {
             "type": "boolean",
             "description": "Whether to collect ids only, or full profiles",
-            "value": "true"
+            "default": "true"
         }
     }
 }
\ No newline at end of file


Reply via email to