Add more main methods (STREAMS-411), improve thread tracking (STREAMS-425), and misc cleanup
more main methods: STREAMS-411 better thread tracking: STREAMS-425 misc cleanup Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/170cb8b6 Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/170cb8b6 Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/170cb8b6 Branch: refs/heads/master Commit: 170cb8b6b9d647dc2b7ff82b87edf060f078585c Parents: f1540b1 Author: Steve Blackmon @steveblackmon <[email protected]> Authored: Thu Oct 6 14:01:04 2016 -0500 Committer: Steve Blackmon @steveblackmon <[email protected]> Committed: Thu Oct 6 14:01:04 2016 -0500 ---------------------------------------------------------------------- .../provider/TwitterFollowingProvider.java | 120 +++++++--- .../twitter/provider/TwitterStreamProvider.java | 55 +++++ .../provider/TwitterTimelineProvider.java | 191 ++++++++-------- .../TwitterUserInformationProvider.java | 227 ++++++++++++------- .../twitter/TwitterFollowingConfiguration.json | 2 +- 5 files changed, 386 insertions(+), 209 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/170cb8b6/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java index 4c3a828..66c1104 100644 --- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java +++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java @@ -18,22 +18,43 
@@ package org.apache.streams.twitter.provider; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; import com.google.common.collect.Queues; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.Uninterruptibles; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; +import com.typesafe.config.ConfigParseOptions; import org.apache.streams.config.ComponentConfigurator; +import org.apache.streams.config.StreamsConfiguration; import org.apache.streams.config.StreamsConfigurator; import org.apache.streams.core.DatumStatusCounter; import org.apache.streams.core.StreamsDatum; import org.apache.streams.core.StreamsResultSet; +import org.apache.streams.jackson.StreamsJacksonMapper; import org.apache.streams.twitter.TwitterFollowingConfiguration; +import org.apache.streams.twitter.TwitterStreamConfiguration; +import org.apache.streams.twitter.converter.TwitterDateTimeFormat; import org.apache.streams.util.ComponentUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import twitter4j.Twitter; +import java.io.BufferedOutputStream; +import java.io.File; +import java.io.FileOutputStream; +import java.io.PrintStream; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -49,6 +70,49 @@ public class TwitterFollowingProvider extends TwitterUserInformationProvider { private TwitterFollowingConfiguration config; + List<ListenableFuture<Object>> futures = new ArrayList<>(); + + public static void main(String[] 
args) throws Exception { + + Preconditions.checkArgument(args.length >= 2); + + String configfile = args[0]; + String outfile = args[1]; + + Config reference = ConfigFactory.load(); + File conf_file = new File(configfile); + assert(conf_file.exists()); + Config testResourceConfig = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false)); + + Config typesafe = testResourceConfig.withFallback(reference).resolve(); + + StreamsConfiguration streamsConfiguration = StreamsConfigurator.detectConfiguration(typesafe); + TwitterFollowingConfiguration config = new ComponentConfigurator<>(TwitterFollowingConfiguration.class).detectConfiguration(typesafe, "twitter"); + TwitterFollowingProvider provider = new TwitterFollowingProvider(config); + + ObjectMapper mapper = new StreamsJacksonMapper(Lists.newArrayList(TwitterDateTimeFormat.TWITTER_FORMAT)); + + PrintStream outStream = new PrintStream(new BufferedOutputStream(new FileOutputStream(outfile))); + provider.prepare(config); + provider.startStream(); + do { + Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getBatchFrequencyMs(), TimeUnit.MILLISECONDS); + Iterator<StreamsDatum> iterator = provider.readCurrent().iterator(); + while(iterator.hasNext()) { + StreamsDatum datum = iterator.next(); + String json; + try { + json = mapper.writeValueAsString(datum.getDocument()); + outStream.println(json); + } catch (JsonProcessingException e) { + System.err.println(e.getMessage()); + } + } + } while( provider.isRunning()); + provider.cleanUp(); + outStream.flush(); + } + public TwitterFollowingConfiguration getConfig() { return config; } public static final int MAX_NUMBER_WAITING = 10000; @@ -63,14 +127,24 @@ public class TwitterFollowingProvider extends TwitterUserInformationProvider { } @Override + public void prepare(Object o) { + super.prepare(config); + Preconditions.checkNotNull(getConfig().getEndpoint()); + 
Preconditions.checkArgument(getConfig().getEndpoint().equals("friends") || getConfig().getEndpoint().equals("followers")); + return; + } + + @Override public void startStream() { - running.set(true); + Preconditions.checkNotNull(executor); Preconditions.checkArgument(idsBatches.hasNext() || screenNameBatches.hasNext()); LOGGER.info("startStream"); + running.set(true); + while (idsBatches.hasNext()) { submitFollowingThreads(idsBatches.next()); } @@ -78,8 +152,6 @@ public class TwitterFollowingProvider extends TwitterUserInformationProvider { submitFollowingThreads(screenNameBatches.next()); } - running.set(true); - executor.shutdown(); } @@ -89,7 +161,9 @@ public class TwitterFollowingProvider extends TwitterUserInformationProvider { for (int i = 0; i < ids.length; i++) { TwitterFollowingProviderTask providerTask = new TwitterFollowingProviderTask(this, client, ids[i]); - executor.submit(providerTask); + ListenableFuture future = executor.submit(providerTask); + futures.add(future); + LOGGER.info("submitted {}", ids[i]); } } @@ -98,7 +172,9 @@ public class TwitterFollowingProvider extends TwitterUserInformationProvider { for (int i = 0; i < screenNames.length; i++) { TwitterFollowingProviderTask providerTask = new TwitterFollowingProviderTask(this, client, screenNames[i]); - executor.submit(providerTask); + ListenableFuture future = executor.submit(providerTask); + futures.add(future); + LOGGER.info("submitted {}", screenNames[i]); } } @@ -120,41 +196,17 @@ public class TwitterFollowingProvider extends TwitterUserInformationProvider { lock.writeLock().unlock(); } - if (providerQueue.isEmpty() && executor.isTerminated()) { - LOGGER.info("{}{} - completed", idsBatches, screenNameBatches); - - running.set(false); - - LOGGER.info("Exiting"); - } - return result; } - protected Queue<StreamsDatum> constructQueue() { - return new ConcurrentLinkedQueue<StreamsDatum>(); - } - - @Override - public void prepare(Object o) { - super.prepare(config); - 
Preconditions.checkNotNull(getConfig().getEndpoint()); - Preconditions.checkArgument(getConfig().getEndpoint().equals("friends") || getConfig().getEndpoint().equals("followers")); - return; - } - - public void addDatum(StreamsDatum datum) { - try { - lock.readLock().lock(); - ComponentUtils.offerUntilSuccess(datum, providerQueue); - } finally { - lock.readLock().unlock(); - } - } - @Override public boolean isRunning() { + if (providerQueue.isEmpty() && executor.isTerminated() && Futures.allAsList(futures).isDone()) { + LOGGER.info("Completed"); + running.set(false); + LOGGER.info("Exiting"); + } return running.get(); } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/170cb8b6/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java index f584950..b414074 100644 --- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java +++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java @@ -18,9 +18,12 @@ package org.apache.streams.twitter.provider; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.google.common.collect.Queues; +import com.google.common.util.concurrent.Uninterruptibles; import com.twitter.hbc.ClientBuilder; import com.twitter.hbc.core.Constants; import com.twitter.hbc.core.Hosts; @@ -35,7 +38,11 @@ import com.twitter.hbc.httpclient.auth.Authentication; import 
com.twitter.hbc.httpclient.auth.BasicAuth; import com.twitter.hbc.httpclient.auth.OAuth1; import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; +import com.typesafe.config.ConfigParseOptions; import org.apache.commons.lang.NotImplementedException; +import org.apache.streams.config.ComponentConfigurator; +import org.apache.streams.config.StreamsConfiguration; import org.apache.streams.config.StreamsConfigurator; import org.apache.streams.core.DatumStatus; import org.apache.streams.core.DatumStatusCountable; @@ -43,14 +50,21 @@ import org.apache.streams.core.DatumStatusCounter; import org.apache.streams.core.StreamsDatum; import org.apache.streams.core.StreamsProvider; import org.apache.streams.core.StreamsResultSet; +import org.apache.streams.jackson.StreamsJacksonMapper; import org.apache.streams.twitter.TwitterStreamConfiguration; +import org.apache.streams.twitter.converter.TwitterDateTimeFormat; import org.apache.streams.util.ComponentUtils; import org.joda.time.DateTime; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.BufferedOutputStream; +import java.io.File; +import java.io.FileOutputStream; +import java.io.PrintStream; import java.io.Serializable; import java.math.BigInteger; +import java.util.Iterator; import java.util.List; import java.util.Queue; import java.util.concurrent.ArrayBlockingQueue; @@ -72,6 +86,47 @@ public class TwitterStreamProvider implements StreamsProvider, Serializable, Dat private final static Logger LOGGER = LoggerFactory.getLogger(TwitterStreamProvider.class); + public static void main(String[] args) throws Exception { + + Preconditions.checkArgument(args.length >= 2); + + String configfile = args[0]; + String outfile = args[1]; + + Config reference = ConfigFactory.load(); + File conf_file = new File(configfile); + assert(conf_file.exists()); + Config testResourceConfig = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false)); + + Config 
typesafe = testResourceConfig.withFallback(reference).resolve(); + + StreamsConfiguration streamsConfiguration = StreamsConfigurator.detectConfiguration(typesafe); + TwitterStreamConfiguration config = new ComponentConfigurator<>(TwitterStreamConfiguration.class).detectConfiguration(typesafe, "twitter"); + TwitterStreamProvider provider = new TwitterStreamProvider(config); + + ObjectMapper mapper = new StreamsJacksonMapper(Lists.newArrayList(TwitterDateTimeFormat.TWITTER_FORMAT)); + + PrintStream outStream = new PrintStream(new BufferedOutputStream(new FileOutputStream(outfile))); + provider.prepare(config); + provider.startStream(); + do { + Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getBatchFrequencyMs(), TimeUnit.MILLISECONDS); + Iterator<StreamsDatum> iterator = provider.readCurrent().iterator(); + while(iterator.hasNext()) { + StreamsDatum datum = iterator.next(); + String json; + try { + json = mapper.writeValueAsString(datum.getDocument()); + outStream.println(json); + } catch (JsonProcessingException e) { + System.err.println(e.getMessage()); + } + } + } while( provider.isRunning()); + provider.cleanUp(); + outStream.flush(); + } + public static final int MAX_BATCH = 1000; private TwitterStreamConfiguration config; http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/170cb8b6/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java index 2924623..cea9829 100644 --- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java +++ 
b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java @@ -22,6 +22,10 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.Uninterruptibles; import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; @@ -68,6 +72,8 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; +import static java.util.concurrent.Executors.newSingleThreadExecutor; + /** * Retrieve recent posts from a list of user ids or names. 
* @@ -91,7 +97,39 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable { private final static Logger LOGGER = LoggerFactory.getLogger(TwitterTimelineProvider.class); - private static ObjectMapper MAPPER = new StreamsJacksonMapper(Lists.newArrayList(TwitterDateTimeFormat.TWITTER_FORMAT)); + public static final int MAX_NUMBER_WAITING = 10000; + + private TwitterUserInformationConfiguration config; + + protected final ReadWriteLock lock = new ReentrantReadWriteLock(); + + public TwitterUserInformationConfiguration getConfig() { + return config; + } + + public void setConfig(TwitterUserInformationConfiguration config) { + this.config = config; + } + + protected Collection<String[]> screenNameBatches; + protected Collection<Long> ids; + + protected volatile Queue<StreamsDatum> providerQueue; + + protected int idsCount; + protected Twitter client; + + protected ListeningExecutorService executor; + + protected DateTime start; + protected DateTime end; + + protected final AtomicBoolean running = new AtomicBoolean(); + + List<ListenableFuture<Object>> futures = new ArrayList<>(); + + Boolean jsonStoreEnabled; + Boolean includeEntitiesEnabled; public static void main(String[] args) throws Exception { @@ -111,6 +149,8 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable { TwitterUserInformationConfiguration config = new ComponentConfigurator<>(TwitterUserInformationConfiguration.class).detectConfiguration(typesafe, "twitter"); TwitterTimelineProvider provider = new TwitterTimelineProvider(config); + ObjectMapper mapper = new StreamsJacksonMapper(Lists.newArrayList(TwitterDateTimeFormat.TWITTER_FORMAT)); + PrintStream outStream = new PrintStream(new BufferedOutputStream(new FileOutputStream(outfile))); provider.prepare(config); provider.startStream(); @@ -121,7 +161,7 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable { StreamsDatum datum = iterator.next(); String json; try { - json = 
MAPPER.writeValueAsString(datum.getDocument()); + json = mapper.writeValueAsString(datum.getDocument()); outStream.println(json); } catch (JsonProcessingException e) { System.err.println(e.getMessage()); @@ -132,42 +172,6 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable { outStream.flush(); } - public static final int MAX_NUMBER_WAITING = 10000; - - private TwitterUserInformationConfiguration config; - - protected final ReadWriteLock lock = new ReentrantReadWriteLock(); - - public TwitterUserInformationConfiguration getConfig() { - return config; - } - - public void setConfig(TwitterUserInformationConfiguration config) { - this.config = config; - } - - protected Collection<String[]> screenNameBatches; - protected Collection<Long> ids; - - protected volatile Queue<StreamsDatum> providerQueue; - - protected int idsCount; - protected Twitter client; - - protected ExecutorService executor; - - protected DateTime start; - protected DateTime end; - - protected final AtomicBoolean running = new AtomicBoolean(); - - Boolean jsonStoreEnabled; - Boolean includeEntitiesEnabled; - - private static ExecutorService getExecutor() { - return Executors.newSingleThreadExecutor(); - } - public TwitterTimelineProvider(TwitterUserInformationConfiguration config) { this.config = config; } @@ -182,17 +186,43 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable { } @Override + public void prepare(Object o) { + + + + try { + lock.writeLock().lock(); + providerQueue = constructQueue(); + } finally { + lock.writeLock().unlock(); + } + + Preconditions.checkNotNull(providerQueue); + Preconditions.checkNotNull(config.getOauth().getConsumerKey()); + Preconditions.checkNotNull(config.getOauth().getConsumerSecret()); + Preconditions.checkNotNull(config.getOauth().getAccessToken()); + Preconditions.checkNotNull(config.getOauth().getAccessTokenSecret()); + Preconditions.checkNotNull(config.getInfo()); + + consolidateToIDs(); + + 
if(ids.size() > 1) + executor = MoreExecutors.listeningDecorator(TwitterUserInformationProvider.newFixedThreadPoolWithQueueSize(5, ids.size())); + else + executor = MoreExecutors.listeningDecorator(newSingleThreadExecutor()); + } + + @Override public void startStream() { + LOGGER.debug("{} startStream", STREAMS_ID); Preconditions.checkArgument(!ids.isEmpty()); - LOGGER.debug("{} - readCurrent", ids); + running.set(true); submitTimelineThreads(ids.toArray(new Long[0])); - running.set(true); - executor.shutdown(); } @@ -202,13 +232,15 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable { } protected void submitTimelineThreads(Long[] ids) { + Twitter client = getTwitterClient(); for(int i = 0; i < ids.length; i++) { TwitterTimelineProviderTask providerTask = new TwitterTimelineProviderTask(this, client, ids[i]); - executor.submit(providerTask); - + ListenableFuture future = executor.submit(providerTask); + futures.add(future); + LOGGER.info("submitted {}", ids[i]); } } @@ -242,7 +274,7 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable { lock.writeLock().unlock(); } - if( providerQueue.isEmpty() && executor.isTerminated()) { + if( result.size() == 0 && providerQueue.isEmpty() && executor.isTerminated() ) { LOGGER.info("Finished. 
Cleaning up..."); running.set(false); @@ -268,50 +300,7 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable { throw new NotImplementedException(); } - @Override - public boolean isRunning() { - return running.get(); - } - - void shutdownAndAwaitTermination(ExecutorService pool) { - pool.shutdown(); // Disable new tasks from being submitted - try { - // Wait a while for existing tasks to terminate - if (!pool.awaitTermination(10, TimeUnit.SECONDS)) { - pool.shutdownNow(); // Cancel currently executing tasks - // Wait a while for tasks to respond to being cancelled - if (!pool.awaitTermination(10, TimeUnit.SECONDS)) - System.err.println("Pool did not terminate"); - } - } catch (InterruptedException ie) { - // (Re-)Cancel if current thread also interrupted - pool.shutdownNow(); - // Preserve interrupt status - Thread.currentThread().interrupt(); - } - } - - @Override - public void prepare(Object o) { - executor = getExecutor(); - - try { - lock.writeLock().lock(); - providerQueue = constructQueue(); - } finally { - lock.writeLock().unlock(); - } - - Preconditions.checkNotNull(providerQueue); - Preconditions.checkNotNull(config.getOauth().getConsumerKey()); - Preconditions.checkNotNull(config.getOauth().getConsumerSecret()); - Preconditions.checkNotNull(config.getOauth().getAccessToken()); - Preconditions.checkNotNull(config.getOauth().getAccessTokenSecret()); - Preconditions.checkNotNull(config.getInfo()); - - consolidateToIDs(); - } /** * Using the "info" list that is contained in the configuration, ensure that all @@ -375,13 +364,31 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable { shutdownAndAwaitTermination(executor); } - public void addDatum(StreamsDatum datum) { + void shutdownAndAwaitTermination(ExecutorService pool) { + pool.shutdown(); // Disable new tasks from being submitted try { - lock.readLock().lock(); - ComponentUtils.offerUntilSuccess(datum, providerQueue); - } finally { - 
lock.readLock().unlock(); + // Wait a while for existing tasks to terminate + if (!pool.awaitTermination(10, TimeUnit.SECONDS)) { + pool.shutdownNow(); // Cancel currently executing tasks + // Wait a while for tasks to respond to being cancelled + if (!pool.awaitTermination(10, TimeUnit.SECONDS)) + System.err.println("Pool did not terminate"); + } + } catch (InterruptedException ie) { + // (Re-)Cancel if current thread also interrupted + pool.shutdownNow(); + // Preserve interrupt status + Thread.currentThread().interrupt(); } } + @Override + public boolean isRunning() { + if (providerQueue.isEmpty() && executor.isTerminated() && Futures.allAsList(futures).isDone()) { + LOGGER.info("Completed"); + running.set(false); + LOGGER.info("Exiting"); + } + return running.get(); + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/170cb8b6/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java index 44f8a24..d6e783b 100644 --- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java +++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java @@ -18,13 +18,20 @@ package org.apache.streams.twitter.provider; +import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; +import com.google.common.util.concurrent.ListenableFuture; import 
com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; +import com.google.common.util.concurrent.Uninterruptibles; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; +import com.typesafe.config.ConfigParseOptions; import org.apache.commons.lang.NotImplementedException; import org.apache.streams.config.ComponentConfigurator; +import org.apache.streams.config.StreamsConfiguration; import org.apache.streams.config.StreamsConfigurator; import org.apache.streams.core.DatumStatusCounter; import org.apache.streams.core.StreamsDatum; @@ -45,6 +52,10 @@ import twitter4j.TwitterFactory; import twitter4j.conf.ConfigurationBuilder; import twitter4j.json.DataObjectFactory; +import java.io.BufferedOutputStream; +import java.io.File; +import java.io.FileOutputStream; +import java.io.PrintStream; import java.io.Serializable; import java.math.BigInteger; import java.util.ArrayList; @@ -75,6 +86,45 @@ public class TwitterUserInformationProvider implements StreamsProvider, Serializ private TwitterUserInformationConfiguration config; + public static void main(String[] args) throws Exception { + + Preconditions.checkArgument(args.length >= 2); + + String configfile = args[0]; + String outfile = args[1]; + + Config reference = ConfigFactory.load(); + File conf_file = new File(configfile); + assert(conf_file.exists()); + Config testResourceConfig = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false)); + + Config typesafe = testResourceConfig.withFallback(reference).resolve(); + + StreamsConfiguration streamsConfiguration = StreamsConfigurator.detectConfiguration(typesafe); + TwitterUserInformationConfiguration config = new ComponentConfigurator<>(TwitterUserInformationConfiguration.class).detectConfiguration(typesafe, "twitter"); + TwitterUserInformationProvider provider = new TwitterUserInformationProvider(config); + + PrintStream outStream = new 
PrintStream(new BufferedOutputStream(new FileOutputStream(outfile))); + provider.prepare(config); + provider.startStream(); + do { + Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getBatchFrequencyMs(), TimeUnit.MILLISECONDS); + Iterator<StreamsDatum> iterator = provider.readCurrent().iterator(); + while(iterator.hasNext()) { + StreamsDatum datum = iterator.next(); + String json; + try { + json = MAPPER.writeValueAsString(datum.getDocument()); + outStream.println(json); + } catch (JsonProcessingException e) { + System.err.println(e.getMessage()); + } + } + } while( provider.isRunning()); + provider.cleanUp(); + outStream.flush(); + } + protected final ReadWriteLock lock = new ReentrantReadWriteLock(); protected volatile Queue<StreamsDatum> providerQueue; @@ -93,7 +143,7 @@ public class TwitterUserInformationProvider implements StreamsProvider, Serializ protected final AtomicBoolean running = new AtomicBoolean(); - private static ExecutorService newFixedThreadPoolWithQueueSize(int nThreads, int queueSize) { + public static ExecutorService newFixedThreadPoolWithQueueSize(int nThreads, int queueSize) { return new ThreadPoolExecutor(nThreads, nThreads, 5000L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(queueSize, true), new ThreadPoolExecutor.CallerRunsPolicy()); @@ -117,8 +167,88 @@ public class TwitterUserInformationProvider implements StreamsProvider, Serializ } @Override + public void prepare(Object o) { + + if( o instanceof TwitterFollowingConfiguration ) + config = (TwitterUserInformationConfiguration) o; + + Preconditions.checkNotNull(config); + Preconditions.checkNotNull(config.getOauth()); + Preconditions.checkNotNull(config.getOauth().getConsumerKey()); + Preconditions.checkNotNull(config.getOauth().getConsumerSecret()); + Preconditions.checkNotNull(config.getOauth().getAccessToken()); + Preconditions.checkNotNull(config.getOauth().getAccessTokenSecret()); + Preconditions.checkNotNull(config.getInfo()); + + try { + 
lock.writeLock().lock(); + providerQueue = constructQueue(); + } finally { + lock.writeLock().unlock(); + } + + Preconditions.checkNotNull(providerQueue); + + List<String> screenNames = new ArrayList<String>(); + List<String[]> screenNameBatches = new ArrayList<String[]>(); + + List<Long> ids = new ArrayList<Long>(); + List<Long[]> idsBatches = new ArrayList<Long[]>(); + + for(String s : config.getInfo()) { + if(s != null) + { + String potentialScreenName = s.replaceAll("@", "").trim().toLowerCase(); + + // See if it is a long, if it is, add it to the user iD list, if it is not, add it to the + // screen name list + try { + ids.add(Long.parseLong(potentialScreenName)); + } catch (Exception e) { + screenNames.add(potentialScreenName); + } + + // Twitter allows for batches up to 100 per request, but you cannot mix types + + if(ids.size() >= 100) { + // add the batch + idsBatches.add(ids.toArray(new Long[ids.size()])); + // reset the Ids + ids = new ArrayList<Long>(); + } + + if(screenNames.size() >= 100) { + // add the batch + screenNameBatches.add(screenNames.toArray(new String[ids.size()])); + // reset the Ids + screenNames = new ArrayList<String>(); + } + } + } + + + if(ids.size() > 0) + idsBatches.add(ids.toArray(new Long[ids.size()])); + + if(screenNames.size() > 0) + screenNameBatches.add(screenNames.toArray(new String[ids.size()])); + + if(ids.size() + screenNames.size() > 0) + executor = MoreExecutors.listeningDecorator(newFixedThreadPoolWithQueueSize(5, (ids.size() + screenNames.size()))); + else + executor = MoreExecutors.listeningDecorator(newSingleThreadExecutor()); + + Preconditions.checkNotNull(executor); + + this.idsBatches = idsBatches.iterator(); + this.screenNameBatches = screenNameBatches.iterator(); + } + + @Override public void startStream() { + Preconditions.checkNotNull(executor); + Preconditions.checkArgument(idsBatches.hasNext() || screenNameBatches.hasNext()); LOGGER.info("{}{} - startStream", idsBatches, screenNameBatches); @@ -214,16 
+344,6 @@ public class TwitterUserInformationProvider implements StreamsProvider, Serializ lock.writeLock().unlock(); } - if( providerQueue.isEmpty() && executor.isTerminated()) { - LOGGER.info("{}{} - completed", idsBatches, screenNameBatches); - - running.set(false); - - LOGGER.info("Exiting"); - } - - return result; - } protected Queue<StreamsDatum> constructQueue() { @@ -246,6 +366,15 @@ public class TwitterUserInformationProvider implements StreamsProvider, Serializ @Override public boolean isRunning() { + + if( providerQueue.isEmpty() && executor.isTerminated() ) { + LOGGER.info("{}{} - completed", idsBatches, screenNameBatches); + + running.set(false); + + LOGGER.info("Exiting"); + } + return running.get(); } @@ -267,78 +396,7 @@ public class TwitterUserInformationProvider implements StreamsProvider, Serializ } } - @Override - public void prepare(Object o) { - if( o instanceof TwitterFollowingConfiguration ) - config = (TwitterUserInformationConfiguration) o; - - try { - lock.writeLock().lock(); - providerQueue = constructQueue(); - } finally { - lock.writeLock().unlock(); - } - - Preconditions.checkNotNull(providerQueue); - Preconditions.checkNotNull(config.getOauth().getConsumerKey()); - Preconditions.checkNotNull(config.getOauth().getConsumerSecret()); - Preconditions.checkNotNull(config.getOauth().getAccessToken()); - Preconditions.checkNotNull(config.getOauth().getAccessTokenSecret()); - Preconditions.checkNotNull(config.getInfo()); - - List<String> screenNames = new ArrayList<String>(); - List<String[]> screenNameBatches = new ArrayList<String[]>(); - - List<Long> ids = new ArrayList<Long>(); - List<Long[]> idsBatches = new ArrayList<Long[]>(); - - for(String s : config.getInfo()) { - if(s != null) - { - String potentialScreenName = s.replaceAll("@", "").trim().toLowerCase(); - - // See if it is a long, if it is, add it to the user iD list, if it is not, add it to the - // screen name list - try { - ids.add(Long.parseLong(potentialScreenName)); - } 
catch (Exception e) { - screenNames.add(potentialScreenName); - } - - // Twitter allows for batches up to 100 per request, but you cannot mix types - - if(ids.size() >= 100) { - // add the batch - idsBatches.add(ids.toArray(new Long[ids.size()])); - // reset the Ids - ids = new ArrayList<Long>(); - } - - if(screenNames.size() >= 100) { - // add the batch - screenNameBatches.add(screenNames.toArray(new String[ids.size()])); - // reset the Ids - screenNames = new ArrayList<String>(); - } - } - } - - - if(ids.size() > 0) - idsBatches.add(ids.toArray(new Long[ids.size()])); - - if(screenNames.size() > 0) - screenNameBatches.add(screenNames.toArray(new String[ids.size()])); - - if(ids.size() + screenNames.size() > 0) - executor = MoreExecutors.listeningDecorator(newFixedThreadPoolWithQueueSize(5, (ids.size() + screenNames.size()))); - else - executor = MoreExecutors.listeningDecorator(newSingleThreadExecutor()); - - this.idsBatches = idsBatches.iterator(); - this.screenNameBatches = screenNameBatches.iterator(); - } protected Twitter getTwitterClient() { @@ -359,6 +417,11 @@ public class TwitterUserInformationProvider implements StreamsProvider, Serializ return new TwitterFactory(builder.build()).getInstance(); } + protected void callback() { + + + } + @Override public void cleanUp() { shutdownAndAwaitTermination(executor); http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/170cb8b6/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/TwitterFollowingConfiguration.json ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/TwitterFollowingConfiguration.json b/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/TwitterFollowingConfiguration.json index c72f3cf..dda5d1b 100644 --- a/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/TwitterFollowingConfiguration.json +++ 
b/streams-contrib/streams-provider-twitter/src/main/jsonschema/com/twitter/TwitterFollowingConfiguration.json @@ -12,7 +12,7 @@ "ids_only": { "type": "boolean", "description": "Whether to collect ids only, or full profiles", - "value": "true" + "default": "true" } } } \ No newline at end of file
