http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java index 66c1104..2527d29 100644 --- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java +++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java @@ -18,28 +18,26 @@ package org.apache.streams.twitter.provider; +import org.apache.streams.config.ComponentConfigurator; +import org.apache.streams.config.StreamsConfiguration; +import org.apache.streams.config.StreamsConfigurator; +import org.apache.streams.core.DatumStatusCounter; +import org.apache.streams.core.StreamsDatum; +import org.apache.streams.core.StreamsResultSet; +import org.apache.streams.jackson.StreamsJacksonMapper; +import org.apache.streams.twitter.TwitterFollowingConfiguration; +import org.apache.streams.twitter.converter.TwitterDateTimeFormat; + import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; -import com.google.common.collect.Queues; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.Uninterruptibles; import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; import com.typesafe.config.ConfigParseOptions; -import org.apache.streams.config.ComponentConfigurator; -import org.apache.streams.config.StreamsConfiguration; -import org.apache.streams.config.StreamsConfigurator; -import org.apache.streams.core.DatumStatusCounter; -import org.apache.streams.core.StreamsDatum; -import org.apache.streams.core.StreamsResultSet; -import org.apache.streams.jackson.StreamsJacksonMapper; -import org.apache.streams.twitter.TwitterFollowingConfiguration; -import org.apache.streams.twitter.TwitterStreamConfiguration; -import org.apache.streams.twitter.converter.TwitterDateTimeFormat; -import org.apache.streams.util.ComponentUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import twitter4j.Twitter; @@ -51,162 +49,184 @@ import java.io.PrintStream; import java.util.ArrayList; import java.util.Iterator; import java.util.List; -import java.util.Queue; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; /** - * Created by sblackmon on 11/25/14. + * Retrieve all follow adjacencies from a list of user ids or names. */ public class TwitterFollowingProvider extends TwitterUserInformationProvider { - public static final String STREAMS_ID = "TwitterFollowingProvider"; - private static final Logger LOGGER = LoggerFactory.getLogger(TwitterFollowingProvider.class); - - protected final ReadWriteLock lock = new ReentrantReadWriteLock(); - - private TwitterFollowingConfiguration config; - - List<ListenableFuture<Object>> futures = new ArrayList<>(); - - public static void main(String[] args) throws Exception { + public static final String STREAMS_ID = "TwitterFollowingProvider"; + private static final Logger LOGGER = LoggerFactory.getLogger(TwitterFollowingProvider.class); + + protected final ReadWriteLock lock = new ReentrantReadWriteLock(); + + private TwitterFollowingConfiguration config; + + List<ListenableFuture<Object>> futures = new ArrayList<>(); + + /** + * To use from command line: + * + * <p/> + * Supply (at least) the following required configuration in application.conf: + * + * <p/> + * twitter.oauth.consumerKey + * twitter.oauth.consumerSecret + * twitter.oauth.accessToken + * twitter.oauth.accessTokenSecret + * twitter.info + * + * <p/> + * Launch using: + * + * <p/> + * mvn exec:java -Dexec.mainClass=org.apache.streams.twitter.provider.TwitterFollowingProvider -Dexec.args="application.conf tweets.json" + * + * @param args args + * @throws Exception Exception + */ + public static void main(String[] args) throws Exception { + + Preconditions.checkArgument(args.length >= 2); + + String configfile = args[0]; + String outfile = args[1]; + + Config reference = ConfigFactory.load(); + File file = new File(configfile); + assert (file.exists()); + Config testResourceConfig = ConfigFactory.parseFileAnySyntax(file, ConfigParseOptions.defaults().setAllowMissing(false)); + + Config typesafe = testResourceConfig.withFallback(reference).resolve(); + + StreamsConfiguration streamsConfiguration = StreamsConfigurator.detectConfiguration(typesafe); + TwitterFollowingConfiguration config = new ComponentConfigurator<>(TwitterFollowingConfiguration.class).detectConfiguration(typesafe, "twitter"); + TwitterFollowingProvider provider = new TwitterFollowingProvider(config); + + ObjectMapper mapper = new StreamsJacksonMapper(Lists.newArrayList(TwitterDateTimeFormat.TWITTER_FORMAT)); + + PrintStream outStream = new PrintStream(new BufferedOutputStream(new FileOutputStream(outfile))); + provider.prepare(config); + provider.startStream(); + do { + Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getBatchFrequencyMs(), TimeUnit.MILLISECONDS); + Iterator<StreamsDatum> iterator = provider.readCurrent().iterator(); + while (iterator.hasNext()) { + StreamsDatum datum = iterator.next(); + String json; + try { + json = mapper.writeValueAsString(datum.getDocument()); + outStream.println(json); + } catch (JsonProcessingException ex) { + System.err.println(ex.getMessage()); + } + } + } + while ( provider.isRunning()); + provider.cleanUp(); + outStream.flush(); + } - Preconditions.checkArgument(args.length >= 2); + public TwitterFollowingConfiguration getConfig() { + return config; + } - String configfile = args[0]; - String outfile = args[1]; + public static final int MAX_NUMBER_WAITING = 10000; - Config reference = ConfigFactory.load(); - File conf_file = new File(configfile); - assert(conf_file.exists()); - Config testResourceConfig = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false)); + public TwitterFollowingProvider() { + this.config = new ComponentConfigurator<>(TwitterFollowingConfiguration.class).detectConfiguration(StreamsConfigurator.getConfig().getConfig("twitter")); + } - Config typesafe = testResourceConfig.withFallback(reference).resolve(); + public TwitterFollowingProvider(TwitterFollowingConfiguration config) { + super(config); + this.config = config; + } - StreamsConfiguration streamsConfiguration = StreamsConfigurator.detectConfiguration(typesafe); - TwitterFollowingConfiguration config = new ComponentConfigurator<>(TwitterFollowingConfiguration.class).detectConfiguration(typesafe, "twitter"); - TwitterFollowingProvider provider = new TwitterFollowingProvider(config); + @Override + public void prepare(Object configurationObject) { + super.prepare(config); + Preconditions.checkNotNull(getConfig().getEndpoint()); + Preconditions.checkArgument(getConfig().getEndpoint().equals("friends") || getConfig().getEndpoint().equals("followers")); + return; + } - ObjectMapper mapper = new StreamsJacksonMapper(Lists.newArrayList(TwitterDateTimeFormat.TWITTER_FORMAT)); + @Override + public void startStream() { - PrintStream outStream = new PrintStream(new BufferedOutputStream(new FileOutputStream(outfile))); - provider.prepare(config); - provider.startStream(); - do { - Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getBatchFrequencyMs(), TimeUnit.MILLISECONDS); - Iterator<StreamsDatum> iterator = provider.readCurrent().iterator(); - while(iterator.hasNext()) { - StreamsDatum datum = iterator.next(); - String json; - try { - json = mapper.writeValueAsString(datum.getDocument()); - outStream.println(json); - } catch (JsonProcessingException e) { - System.err.println(e.getMessage()); - } - } - } while( provider.isRunning()); - provider.cleanUp(); - outStream.flush(); - } + Preconditions.checkNotNull(executor); - public TwitterFollowingConfiguration getConfig() { return config; } + Preconditions.checkArgument(idsBatches.hasNext() || screenNameBatches.hasNext()); - public static final int MAX_NUMBER_WAITING = 10000; + LOGGER.info("startStream"); - public TwitterFollowingProvider() { - this.config = new ComponentConfigurator<>(TwitterFollowingConfiguration.class).detectConfiguration(StreamsConfigurator.getConfig().getConfig("twitter")); - } + running.set(true); - public TwitterFollowingProvider(TwitterFollowingConfiguration config) { - super(config); - this.config = config; + while (idsBatches.hasNext()) { + submitFollowingThreads(idsBatches.next()); } - - @Override - public void prepare(Object o) { - super.prepare(config); - Preconditions.checkNotNull(getConfig().getEndpoint()); - Preconditions.checkArgument(getConfig().getEndpoint().equals("friends") || getConfig().getEndpoint().equals("followers")); - return; + while (screenNameBatches.hasNext()) { + submitFollowingThreads(screenNameBatches.next()); } - @Override - public void startStream() { + executor.shutdown(); - Preconditions.checkNotNull(executor); - - Preconditions.checkArgument(idsBatches.hasNext() || screenNameBatches.hasNext()); - - LOGGER.info("startStream"); - - running.set(true); - - while (idsBatches.hasNext()) { - submitFollowingThreads(idsBatches.next()); - } - while (screenNameBatches.hasNext()) { - submitFollowingThreads(screenNameBatches.next()); - } + } - executor.shutdown(); + protected void submitFollowingThreads(Long[] ids) { + Twitter client = getTwitterClient(); + for (int i = 0; i < ids.length; i++) { + TwitterFollowingProviderTask providerTask = new TwitterFollowingProviderTask(this, client, ids[i]); + ListenableFuture future = executor.submit(providerTask); + futures.add(future); + LOGGER.info("submitted {}", ids[i]); } + } - protected void submitFollowingThreads(Long[] ids) { - Twitter client = getTwitterClient(); + protected void submitFollowingThreads(String[] screenNames) { + Twitter client = getTwitterClient(); - for (int i = 0; i < ids.length; i++) { - TwitterFollowingProviderTask providerTask = new TwitterFollowingProviderTask(this, client, ids[i]); - ListenableFuture future = executor.submit(providerTask); - futures.add(future); - LOGGER.info("submitted {}", ids[i]); - } + for (int i = 0; i < screenNames.length; i++) { + TwitterFollowingProviderTask providerTask = new TwitterFollowingProviderTask(this, client, screenNames[i]); + ListenableFuture future = executor.submit(providerTask); + futures.add(future); + LOGGER.info("submitted {}", screenNames[i]); } - protected void submitFollowingThreads(String[] screenNames) { - Twitter client = getTwitterClient(); - - for (int i = 0; i < screenNames.length; i++) { - TwitterFollowingProviderTask providerTask = new TwitterFollowingProviderTask(this, client, screenNames[i]); - ListenableFuture future = executor.submit(providerTask); - futures.add(future); - LOGGER.info("submitted {}", screenNames[i]); - } - - } + } - @Override - public StreamsResultSet readCurrent() { + @Override + public StreamsResultSet readCurrent() { - LOGGER.info("{}{} - readCurrent", idsBatches, screenNameBatches); + LOGGER.info("{}{} - readCurrent", idsBatches, screenNameBatches); - StreamsResultSet result; + StreamsResultSet result; - try { - lock.writeLock().lock(); - result = new StreamsResultSet(providerQueue); - result.setCounter(new DatumStatusCounter()); - providerQueue = constructQueue(); - LOGGER.debug("{}{} - providing {} docs", idsBatches, screenNameBatches, result.size()); - } finally { - lock.writeLock().unlock(); - } + try { + lock.writeLock().lock(); + result = new StreamsResultSet(providerQueue); + result.setCounter(new DatumStatusCounter()); + providerQueue = constructQueue(); + LOGGER.debug("{}{} - providing {} docs", idsBatches, screenNameBatches, result.size()); + } finally { + lock.writeLock().unlock(); + } - return result; + return result; - } + } - @Override - public boolean isRunning() { - if (providerQueue.isEmpty() && executor.isTerminated() && Futures.allAsList(futures).isDone()) { - LOGGER.info("Completed"); - running.set(false); - LOGGER.info("Exiting"); - } - return running.get(); + @Override + public boolean isRunning() { + if (providerQueue.isEmpty() && executor.isTerminated() && Futures.allAsList(futures).isDone()) { + LOGGER.info("Completed"); + running.set(false); + LOGGER.info("Exiting"); } + return running.get(); + } }
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProviderTask.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProviderTask.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProviderTask.java index f2346fb..ee800fa 100644 --- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProviderTask.java +++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProviderTask.java @@ -18,13 +18,14 @@ package org.apache.streams.twitter.provider; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.base.Preconditions; import org.apache.streams.core.StreamsDatum; import org.apache.streams.jackson.StreamsJacksonMapper; import org.apache.streams.twitter.pojo.Follow; import org.apache.streams.twitter.pojo.User; import org.apache.streams.util.ComponentUtils; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import twitter4j.PagableResponseList; @@ -37,188 +38,208 @@ import twitter4j.TwitterObjectFactory; */ public class TwitterFollowingProviderTask implements Runnable { - private final static Logger LOGGER = LoggerFactory.getLogger(TwitterFollowingProviderTask.class); + private static final Logger LOGGER = LoggerFactory.getLogger(TwitterFollowingProviderTask.class); + + private static final ObjectMapper mapper = StreamsJacksonMapper.getInstance(); + + protected TwitterFollowingProvider provider; + protected Twitter client; + protected Long id; + protected String screenName; + + int count = 0; + + /** + * TwitterFollowingProviderTask constructor. + * @param provider TwitterFollowingProvider + * @param twitter Twitter + * @param id numeric id + */ + public TwitterFollowingProviderTask(TwitterFollowingProvider provider, Twitter twitter, Long id) { + this.provider = provider; + this.client = twitter; + this.id = id; + } + + /** + * TwitterFollowingProviderTask constructor. + * @param provider TwitterFollowingProvider + * @param twitter Twitter + * @param screenName screenName + */ + public TwitterFollowingProviderTask(TwitterFollowingProvider provider, Twitter twitter, String screenName) { + this.provider = provider; + this.client = twitter; + this.screenName = screenName; + } + + + @Override + public void run() { + + Preconditions.checkArgument(id != null || screenName != null); + + if ( id != null ) { + getFollowing(id); + } else if ( screenName != null) { + getFollowing(screenName); + } - private final static ObjectMapper mapper = StreamsJacksonMapper.getInstance(); + LOGGER.info(id != null ? id.toString() : screenName + " Thread Finished"); - protected TwitterFollowingProvider provider; - protected Twitter client; - protected Long id; - protected String screenName; + } - int count = 0; + protected void getFollowing(Long id) { - public TwitterFollowingProviderTask(TwitterFollowingProvider provider, Twitter twitter, Long id) { - this.provider = provider; - this.client = twitter; - this.id = id; - } + Preconditions.checkArgument( + provider.getConfig().getEndpoint().equals("friends") + || provider.getConfig().getEndpoint().equals("followers") + ); - public TwitterFollowingProviderTask(TwitterFollowingProvider provider, Twitter twitter, String screenName) { - this.provider = provider; - this.client = twitter; - this.screenName = screenName; + if ( provider.getConfig().getIdsOnly() ) { + collectIds(id); + } else { + collectUsers(id); } + } + protected void getFollowing(String screenName) { - @Override - public void run() { + twitter4j.User user = null; + try { + user = client.users().showUser(screenName); + } catch (TwitterException ex) { + LOGGER.error("Failure looking up " + id); + } + Preconditions.checkNotNull(user); + getFollowing(user.getId()); + } - Preconditions.checkArgument(id != null || screenName != null); + private void collectUsers(Long id) { + int keepTrying = 0; - if( id != null ) - getFollowing(id); - else if( screenName != null) - getFollowing(screenName); + long curser = -1; - LOGGER.info(id != null ? id.toString() : screenName + " Thread Finished"); + do { + try { + twitter4j.User user; + String userJson; + try { + user = client.users().showUser(id); + userJson = TwitterObjectFactory.getRawJSON(user); + } catch (TwitterException ex) { + LOGGER.error("Failure looking up " + id); + break; + } - } + PagableResponseList<twitter4j.User> list = null; + if ( provider.getConfig().getEndpoint().equals("followers") ) { + list = client.friendsFollowers().getFollowersList(id.longValue(), curser, provider.getConfig().getMaxItems().intValue()); + } else if ( provider.getConfig().getEndpoint().equals("friends") ) { + list = client.friendsFollowers().getFriendsList(id.longValue(), curser, provider.getConfig().getMaxItems().intValue()); + } - protected void getFollowing(Long id) { + Preconditions.checkNotNull(list); + Preconditions.checkArgument(list.size() > 0); - Preconditions.checkArgument(provider.getConfig().getEndpoint().equals("friends") || provider.getConfig().getEndpoint().equals("followers")); + for (twitter4j.User other : list) { - if( provider.getConfig().getIdsOnly() ) - collectIds(id); - else - collectUsers(id); - } + String otherJson = TwitterObjectFactory.getRawJSON(other); - private void collectUsers(Long id) { - int keepTrying = 0; - - long curser = -1; - - do - { - try - { - twitter4j.User user; - String userJson; - try { - user = client.users().showUser(id); - userJson = TwitterObjectFactory.getRawJSON(user); - } catch (TwitterException e) { - LOGGER.error("Failure looking up " + id); - break; - } - - PagableResponseList<twitter4j.User> list = null; - if( provider.getConfig().getEndpoint().equals("followers") ) - list = client.friendsFollowers().getFollowersList(id.longValue(), curser, provider.getConfig().getMaxItems().intValue()); - else if( provider.getConfig().getEndpoint().equals("friends") ) - list = client.friendsFollowers().getFriendsList(id.longValue(), curser, provider.getConfig().getMaxItems().intValue()); - - Preconditions.checkNotNull(list); - Preconditions.checkArgument(list.size() > 0); - - for (twitter4j.User other : list) { - - String otherJson = TwitterObjectFactory.getRawJSON(other); - - try { - Follow follow = null; - if( provider.getConfig().getEndpoint().equals("followers") ) { - follow = new Follow() - .withFollowee(mapper.readValue(userJson, User.class)) - .withFollower(mapper.readValue(otherJson, User.class)); - } else if( provider.getConfig().getEndpoint().equals("friends") ) { - follow = new Follow() - .withFollowee(mapper.readValue(otherJson, User.class)) - .withFollower(mapper.readValue(userJson, User.class)); - } - - Preconditions.checkNotNull(follow); - - if( count < provider.getConfig().getMaxItems()) { - ComponentUtils.offerUntilSuccess(new StreamsDatum(follow), provider.providerQueue); - count++; - } - - } catch (Exception e) { - LOGGER.warn("Exception: {}", e); - } - } - if( !list.hasNext() ) break; - if( list.getNextCursor() == 0 ) break; - curser = list.getNextCursor(); - } - catch(TwitterException twitterException) { - keepTrying += TwitterErrorHandler.handleTwitterError(client, twitterException); + try { + Follow follow = null; + if ( provider.getConfig().getEndpoint().equals("followers") ) { + follow = new Follow() + .withFollowee(mapper.readValue(userJson, User.class)) + .withFollower(mapper.readValue(otherJson, User.class)); + } else if ( provider.getConfig().getEndpoint().equals("friends") ) { + follow = new Follow() + .withFollowee(mapper.readValue(otherJson, User.class)) + .withFollower(mapper.readValue(userJson, User.class)); } - catch(Exception e) { - keepTrying += TwitterErrorHandler.handleTwitterError(client, e); + + Preconditions.checkNotNull(follow); + + if ( count < provider.getConfig().getMaxItems()) { + ComponentUtils.offerUntilSuccess(new StreamsDatum(follow), provider.providerQueue); + count++; } - } while (curser != 0 && keepTrying < provider.getConfig().getRetryMax() && count < provider.getConfig().getMaxItems()); + + } catch (Exception ex) { + LOGGER.warn("Exception: {}", ex); + } + } + if ( !list.hasNext() ) { + break; + } + if ( list.getNextCursor() == 0 ) { + break; + } + curser = list.getNextCursor(); + } catch (TwitterException twitterException) { + keepTrying += TwitterErrorHandler.handleTwitterError(client, twitterException); + } catch (Exception ex) { + keepTrying += TwitterErrorHandler.handleTwitterError(client, ex); + } } + while (curser != 0 && keepTrying < provider.getConfig().getRetryMax() && count < provider.getConfig().getMaxItems()); + } - private void collectIds(Long id) { - int keepTrying = 0; - - long curser = -1; - - do - { - try - { - twitter4j.IDs ids = null; - if( provider.getConfig().getEndpoint().equals("followers") ) - ids = client.friendsFollowers().getFollowersIDs(id.longValue(), curser, provider.getConfig().getMaxItems().intValue()); - else if( provider.getConfig().getEndpoint().equals("friends") ) - ids = client.friendsFollowers().getFriendsIDs(id.longValue(), curser, provider.getConfig().getMaxItems().intValue()); - - Preconditions.checkNotNull(ids); - Preconditions.checkArgument(ids.getIDs().length > 0); - - for (long otherId : ids.getIDs()) { - - try { - Follow follow = null; - if( provider.getConfig().getEndpoint().equals("followers") ) { - follow = new Follow() - .withFollowee(new User().withId(id)) - .withFollower(new User().withId(otherId)); - } else if( provider.getConfig().getEndpoint().equals("friends") ) { - follow = new Follow() - .withFollowee(new User().withId(otherId)) - .withFollower(new User().withId(id)); - } - - Preconditions.checkNotNull(follow); - - if( count < provider.getConfig().getMaxItems()) { - ComponentUtils.offerUntilSuccess(new StreamsDatum(follow), provider.providerQueue); - count++; - } - } catch (Exception e) { - LOGGER.warn("Exception: {}", e); - } - } - if( !ids.hasNext() ) break; - if( ids.getNextCursor() == 0 ) break; - curser = ids.getNextCursor(); - } - catch(TwitterException twitterException) { - keepTrying += TwitterErrorHandler.handleTwitterError(client, id, twitterException); - } - catch(Exception e) { - keepTrying += TwitterErrorHandler.handleTwitterError(client, e); + private void collectIds(Long id) { + int keepTrying = 0; + + long curser = -1; + + do { + try { + twitter4j.IDs ids = null; + if ( provider.getConfig().getEndpoint().equals("followers") ) { + ids = client.friendsFollowers().getFollowersIDs(id.longValue(), curser, provider.getConfig().getMaxItems().intValue()); + } else if ( provider.getConfig().getEndpoint().equals("friends") ) { + ids = client.friendsFollowers().getFriendsIDs(id.longValue(), curser, provider.getConfig().getMaxItems().intValue()); + } + + Preconditions.checkNotNull(ids); + Preconditions.checkArgument(ids.getIDs().length > 0); + + for (long otherId : ids.getIDs()) { + + try { + Follow follow = null; + if ( provider.getConfig().getEndpoint().equals("followers") ) { + follow = new Follow() + .withFollowee(new User().withId(id)) + .withFollower(new User().withId(otherId)); + } else if ( provider.getConfig().getEndpoint().equals("friends") ) { + follow = new Follow() + .withFollowee(new User().withId(otherId)) + .withFollower(new User().withId(id)); } - } while (curser != 0 && keepTrying < provider.getConfig().getRetryMax() && count < provider.getConfig().getMaxItems()); - } - protected void getFollowing(String screenName) { + Preconditions.checkNotNull(follow); - twitter4j.User user = null; - try { - user = client.users().showUser(screenName); - } catch (TwitterException e) { - LOGGER.error("Failure looking up " + id); + if ( count < provider.getConfig().getMaxItems()) { + ComponentUtils.offerUntilSuccess(new StreamsDatum(follow), provider.providerQueue); + count++; + } + } catch (Exception ex) { + LOGGER.warn("Exception: {}", ex); + } + } + if ( !ids.hasNext() ) { + break; } - Preconditions.checkNotNull(user); - getFollowing(user.getId()); + if ( ids.getNextCursor() == 0 ) { + break; + } + curser = ids.getNextCursor(); + } catch (TwitterException twitterException) { + keepTrying += TwitterErrorHandler.handleTwitterError(client, id, twitterException); + } catch (Exception ex) { + keepTrying += TwitterErrorHandler.handleTwitterError(client, ex); + } } - + while (curser != 0 && keepTrying < provider.getConfig().getRetryMax() && count < provider.getConfig().getMaxItems()); + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterProviderUtil.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterProviderUtil.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterProviderUtil.java index d9f4ec2..48666cb 100644 --- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterProviderUtil.java +++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterProviderUtil.java @@ -16,28 +16,34 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.streams.twitter.provider; import org.apache.streams.twitter.TwitterConfiguration; /** - * Created by sblackmon on 7/26/15. + * TwitterProviderUtil contains utilities for Twitter Providers. */ public class TwitterProviderUtil { - public static String baseUrl(TwitterConfiguration config) { + /** + * baseUrl from TwitterConfiguration. + * @param config TwitterConfiguration + * @return baseUrl + */ + public static String baseUrl(TwitterConfiguration config) { - String baseUrl = new StringBuilder() - .append(config.getProtocol()) - .append("://") - .append(config.getHost()) - .append(":") - .append(config.getPort()) - .append("/") - .append(config.getVersion()) - .append("/") - .toString(); + String baseUrl = new StringBuilder() + .append(config.getProtocol()) + .append("://") + .append(config.getHost()) + .append(":") + .append(config.getPort()) + .append("/") + .append(config.getVersion()) + .append("/") + .toString(); - return baseUrl; - } + return baseUrl; + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamHelper.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamHelper.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamHelper.java new file mode 100644 index 0000000..a4562ef --- /dev/null +++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamHelper.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.streams.twitter.provider; + +import org.apache.streams.core.StreamsDatum; +import org.apache.streams.jackson.StreamsJacksonMapper; +import org.apache.streams.twitter.converter.TwitterDocumentClassifier; +import org.apache.streams.util.ComponentUtils; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.Lists; +import com.twitter.hbc.core.processor.StringDelimitedProcessor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +/** + * TwitterStreamHelper helps with hosebird twitter stream. + */ +public class TwitterStreamHelper extends StringDelimitedProcessor { + + private static final Logger LOGGER = LoggerFactory.getLogger(TwitterStreamHelper.class); + private static final int DEFAULT_POOL_SIZE = 5; + + private static final TwitterDocumentClassifier TWITTER_DOCUMENT_CLASSIFIER = new TwitterDocumentClassifier(); + + private final TwitterStreamProvider provider; + private final ExecutorService service; + + public TwitterStreamHelper(TwitterStreamProvider provider) { + this(provider, DEFAULT_POOL_SIZE); + } + + /** + * TwitterStreamHelper constructor. + * @param provider TwitterStreamProvider + * @param poolSize poolSize + */ + public TwitterStreamHelper(TwitterStreamProvider provider, int poolSize) { + //We are only going to use the Hosebird processor to manage the extraction of the tweets from the Stream + super(null); + service = Executors.newFixedThreadPool(poolSize); + this.provider = provider; + } + + @Override + public boolean process() throws IOException, InterruptedException { + String msg; + do { + msg = this.processNextMessage(); + if (msg == null) { + Thread.sleep(10); + } + } + while (msg == null); + + //Deserializing to an ObjectNode can take time. Parallelize the task to improve throughput + return provider.addDatum(service.submit(new StreamDeserializer(msg))); + } + + public void cleanUp() { + ComponentUtils.shutdownExecutor(service, 1, 30); + } + + protected static class StreamDeserializer implements Callable<List<StreamsDatum>> { + + protected static final ObjectMapper mapper = StreamsJacksonMapper.getInstance(); + + protected String item; + + public StreamDeserializer(String item) { + this.item = item; + } + + @Override + public List<StreamsDatum> call() throws Exception { + if (item != null) { + Class itemClass = TWITTER_DOCUMENT_CLASSIFIER.detectClasses(item).get(0); + Object document = mapper.readValue(item, itemClass); + StreamsDatum rawDatum = new StreamsDatum(document); + return Lists.newArrayList(rawDatum); + } + return new ArrayList<>(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProcessor.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProcessor.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProcessor.java deleted file mode 100644 index 96df67b..0000000 --- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProcessor.java +++ /dev/null @@ -1,99 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.streams.twitter.provider; - -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.collect.Lists; -import com.twitter.hbc.core.processor.StringDelimitedProcessor; -import org.apache.streams.core.StreamsDatum; -import org.apache.streams.jackson.StreamsJacksonMapper; -import org.apache.streams.util.ComponentUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; - -/** - * - */ -public class TwitterStreamProcessor extends StringDelimitedProcessor { - - private static final Logger LOGGER = LoggerFactory.getLogger(TwitterStreamProcessor.class); - private static final int DEFAULT_POOL_SIZE = 5; - - private final TwitterStreamProvider provider; - private final ExecutorService service; - - public TwitterStreamProcessor(TwitterStreamProvider provider) { - this(provider, DEFAULT_POOL_SIZE); - } - - public TwitterStreamProcessor(TwitterStreamProvider provider, int poolSize) { - //We are only going to use the Hosebird processor to manage the extraction of the tweets from the Stream - super(null); - service = Executors.newFixedThreadPool(poolSize); - this.provider = provider; - } - - - @Override - public boolean process() throws IOException, InterruptedException { - String msg; - do { - msg = this.processNextMessage(); - if(msg == null) { - Thread.sleep(10); - } - } while(msg == null); - - //Deserializing to an ObjectNode can take time. Parallelize the task to improve throughput - return provider.addDatum(service.submit(new StreamDeserializer(msg))); - } - - public void cleanUp() { - ComponentUtils.shutdownExecutor(service, 1, 30); - } - - protected static class StreamDeserializer implements Callable<List<StreamsDatum>> { - - protected static final ObjectMapper mapper = StreamsJacksonMapper.getInstance(); - - protected String item; - - public StreamDeserializer(String item) { - this.item = item; - } - - @Override - public List<StreamsDatum> call() throws Exception { - if(item != null) { - Class itemClass = TwitterEventClassifier.detectClass(item); - Object document = mapper.readValue(item, itemClass); - StreamsDatum rawDatum = new StreamsDatum(document); - return Lists.newArrayList(rawDatum); - } - return new ArrayList<>(); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java index 3856935..1895ee2 100644 --- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java +++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProvider.java @@ -18,6 +18,20 @@ package org.apache.streams.twitter.provider; +import org.apache.streams.config.ComponentConfigurator; +import org.apache.streams.config.StreamsConfiguration; +import org.apache.streams.config.StreamsConfigurator; +import org.apache.streams.core.DatumStatus; +import org.apache.streams.core.DatumStatusCountable; +import org.apache.streams.core.DatumStatusCounter; +import org.apache.streams.core.StreamsDatum; +import org.apache.streams.core.StreamsProvider; +import org.apache.streams.core.StreamsResultSet; +import org.apache.streams.jackson.StreamsJacksonMapper; +import org.apache.streams.twitter.TwitterStreamConfiguration; +import org.apache.streams.twitter.converter.TwitterDateTimeFormat; +import org.apache.streams.util.ComponentUtils; + import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Preconditions; @@ -41,19 +55,6 @@ import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; import com.typesafe.config.ConfigParseOptions; import org.apache.commons.lang.NotImplementedException; -import org.apache.streams.config.ComponentConfigurator; -import org.apache.streams.config.StreamsConfiguration; -import org.apache.streams.config.StreamsConfigurator; -import org.apache.streams.core.DatumStatus; -import org.apache.streams.core.DatumStatusCountable; -import org.apache.streams.core.DatumStatusCounter; -import org.apache.streams.core.StreamsDatum; -import org.apache.streams.core.StreamsProvider; -import org.apache.streams.core.StreamsResultSet; -import org.apache.streams.jackson.StreamsJacksonMapper; -import org.apache.streams.twitter.TwitterStreamConfiguration; -import org.apache.streams.twitter.converter.TwitterDateTimeFormat; -import org.apache.streams.util.ComponentUtils; import org.joda.time.DateTime; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -84,269 +85,282 @@ import java.util.concurrent.atomic.AtomicBoolean; */ public class TwitterStreamProvider implements StreamsProvider, Serializable, DatumStatusCountable { - public final static String STREAMS_ID = "TwitterStreamProvider"; - - private final static Logger LOGGER = LoggerFactory.getLogger(TwitterStreamProvider.class); - - public static void main(String[] args) { - - Preconditions.checkArgument(args.length >= 2); - - String configfile = args[0]; - String outfile = args[1]; - - Config reference = ConfigFactory.load(); - File conf_file = new File(configfile); - assert(conf_file.exists()); - Config testResourceConfig = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false)); - - Config typesafe = testResourceConfig.withFallback(reference).resolve(); - - StreamsConfiguration streamsConfiguration = StreamsConfigurator.detectConfiguration(typesafe); - TwitterStreamConfiguration config = new ComponentConfigurator<>(TwitterStreamConfiguration.class).detectConfiguration(typesafe, "twitter"); - TwitterStreamProvider provider = new TwitterStreamProvider(config); - - ObjectMapper mapper = StreamsJacksonMapper.getInstance(Lists.newArrayList(TwitterDateTimeFormat.TWITTER_FORMAT)); - - PrintStream outStream = null; + public static final String STREAMS_ID = "TwitterStreamProvider"; + + private static final Logger LOGGER = LoggerFactory.getLogger(TwitterStreamProvider.class); + + /** + * To use from command line: + * + * <p/> + * Supply (at least) the following required configuration in application.conf: + * + * <p/> + * twitter.oauth.consumerKey + * twitter.oauth.consumerSecret + * twitter.oauth.accessToken + * twitter.oauth.accessTokenSecret + * + * <p/> + * Launch using: + * + * <p/> + * mvn exec:java -Dexec.mainClass=org.apache.streams.twitter.provider.TwitterStreamProvider -Dexec.args="application.conf tweets.json" + * + * @param args + */ + public static void main(String[] args) { + + Preconditions.checkArgument(args.length >= 2); + + String configfile = args[0]; + String outfile = args[1]; + + Config reference = ConfigFactory.load(); + File file = new File(configfile); + assert (file.exists()); + Config testResourceConfig = ConfigFactory.parseFileAnySyntax(file, ConfigParseOptions.defaults().setAllowMissing(false)); + + Config typesafe = testResourceConfig.withFallback(reference).resolve(); + + StreamsConfiguration streamsConfiguration = StreamsConfigurator.detectConfiguration(typesafe); + TwitterStreamConfiguration config = new ComponentConfigurator<>(TwitterStreamConfiguration.class).detectConfiguration(typesafe, "twitter"); + TwitterStreamProvider provider = new TwitterStreamProvider(config); + + ObjectMapper mapper = StreamsJacksonMapper.getInstance(Lists.newArrayList(TwitterDateTimeFormat.TWITTER_FORMAT)); + + PrintStream outStream = null; + try { + outStream = new PrintStream(new BufferedOutputStream(new FileOutputStream(outfile))); + } catch (FileNotFoundException ex) { + LOGGER.error("FileNotFoundException", ex); + return; + } + provider.prepare(config); + provider.startStream(); + do { + Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getBatchFrequencyMs(), TimeUnit.MILLISECONDS); + Iterator<StreamsDatum> iterator = provider.readCurrent().iterator(); + while (iterator.hasNext()) { + StreamsDatum datum = iterator.next(); + String json; try { - outStream = new PrintStream(new BufferedOutputStream(new FileOutputStream(outfile))); - } catch (FileNotFoundException e) { - LOGGER.error("FileNotFoundException", e); - return; + json = mapper.writeValueAsString(datum.getDocument()); + outStream.println(json); + } catch (JsonProcessingException ex) { + System.err.println(ex.getMessage()); } - provider.prepare(config); - provider.startStream(); - do { - Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getBatchFrequencyMs(), TimeUnit.MILLISECONDS); - Iterator<StreamsDatum> iterator = provider.readCurrent().iterator(); - while(iterator.hasNext()) { - StreamsDatum datum = iterator.next(); - String json; - try { - json = mapper.writeValueAsString(datum.getDocument()); - outStream.println(json); - } catch (JsonProcessingException e) { - System.err.println(e.getMessage()); - } - } - } while( provider.isRunning()); - provider.cleanUp(); - outStream.flush(); + } } - - public static final int MAX_BATCH = 1000; - - private TwitterStreamConfiguration config; - - public TwitterStreamConfiguration getConfig() { - return config; + while ( provider.isRunning()); + provider.cleanUp(); + outStream.flush(); + } + + public static final int MAX_BATCH = 1000; + + private TwitterStreamConfiguration config; + + public TwitterStreamConfiguration getConfig() { + return config; + } + + public void setConfig(TwitterStreamConfiguration config) { + this.config = config; + } + + protected volatile Queue<Future<List<StreamsDatum>>> providerQueue; + + protected Hosts hosebirdHosts; + protected Authentication auth; + protected StreamingEndpoint endpoint; + protected BasicClient client; + protected AtomicBoolean running = new AtomicBoolean(false); + protected TwitterStreamHelper processor = new TwitterStreamHelper(this); + private DatumStatusCounter countersCurrent = new DatumStatusCounter(); + private DatumStatusCounter countersTotal = new DatumStatusCounter(); + + public TwitterStreamProvider() { + this.config = new ComponentConfigurator<>(TwitterStreamConfiguration.class).detectConfiguration(StreamsConfigurator.config, "twitter"); + } + + public TwitterStreamProvider(TwitterStreamConfiguration config) { + this.config = config; + } + + @Override + public String getId() { + return STREAMS_ID; + } + + @Override + public void startStream() { + client.connect(); + running.set(true); + } + + @Override + public synchronized StreamsResultSet readCurrent() { + + StreamsResultSet current; + synchronized (this) { + Queue<StreamsDatum> drain = Queues.newLinkedBlockingDeque(); + drainTo(drain); + current = new StreamsResultSet(drain); + current.setCounter(new DatumStatusCounter()); + current.getCounter().add(countersCurrent); + countersTotal.add(countersCurrent); + countersCurrent = new DatumStatusCounter(); } - public void setConfig(TwitterStreamConfiguration config) { - this.config = config; - } + return current; + } - protected volatile Queue<Future<List<StreamsDatum>>> providerQueue; - - protected Hosts hosebirdHosts; - protected Authentication auth; - protected StreamingEndpoint endpoint; - protected BasicClient client; - protected AtomicBoolean running = new AtomicBoolean(false); - protected TwitterStreamProcessor processor = new TwitterStreamProcessor(this); - private DatumStatusCounter countersCurrent = new DatumStatusCounter(); - private DatumStatusCounter countersTotal = new DatumStatusCounter(); - - private static ExecutorService newFixedThreadPoolWithQueueSize(int nThreads, int queueSize) { - return new ThreadPoolExecutor(nThreads, nThreads, - 5000L, TimeUnit.MILLISECONDS, - new ArrayBlockingQueue<Runnable>(queueSize, true), new ThreadPoolExecutor.CallerRunsPolicy()); - } + @Override + public StreamsResultSet readNew(BigInteger sequence) { + throw new NotImplementedException(); + } - public TwitterStreamProvider() { - this.config = new ComponentConfigurator<>(TwitterStreamConfiguration.class).detectConfiguration(StreamsConfigurator.config, "twitter"); - } + @Override + public StreamsResultSet readRange(DateTime start, DateTime end) { + throw new NotImplementedException(); + } - public TwitterStreamProvider(TwitterStreamConfiguration config) { - this.config = config; - } + @Override + public boolean isRunning() { + return this.running.get() && !client.isDone(); + } - @Override - public String getId() { - return STREAMS_ID; - } + @Override + public void prepare(Object configurationObject) { - @Override - public void startStream() { - client.connect(); - running.set(true); - } + Preconditions.checkNotNull(config.getEndpoint()); - @Override - public synchronized StreamsResultSet readCurrent() { - - StreamsResultSet current; - synchronized(this) { - Queue<StreamsDatum> drain = Queues.newLinkedBlockingDeque(); - drainTo(drain); - current = new StreamsResultSet(drain); - current.setCounter(new DatumStatusCounter()); - current.getCounter().add(countersCurrent); - countersTotal.add(countersCurrent); - countersCurrent = new DatumStatusCounter(); - } + if (config.getEndpoint().equals("userstream") ) { - return current; - } + hosebirdHosts = new HttpHosts(Constants.USERSTREAM_HOST); - @Override - public StreamsResultSet readNew(BigInteger sequence) { - throw new NotImplementedException(); - } + UserstreamEndpoint userstreamEndpoint = new UserstreamEndpoint(); + userstreamEndpoint.withFollowings(true); + userstreamEndpoint.withUser(false); + userstreamEndpoint.allReplies(false); + endpoint = userstreamEndpoint; + } else if (config.getEndpoint().equals("sample") ) { - @Override - public StreamsResultSet readRange(DateTime start, DateTime end) { - throw new NotImplementedException(); - } - - @Override - public boolean isRunning() { - return this.running.get() && !client.isDone(); - } + hosebirdHosts = new HttpHosts(Constants.STREAM_HOST); - @Override - public void prepare(Object o) { + boolean track = config.getTrack() != null && !config.getTrack().isEmpty(); + boolean follow = config.getFollow() != null && !config.getFollow().isEmpty(); - Preconditions.checkNotNull(config.getEndpoint()); - - if(config.getEndpoint().equals("userstream") ) { - - hosebirdHosts = new HttpHosts(Constants.USERSTREAM_HOST); - - UserstreamEndpoint userstreamEndpoint = new UserstreamEndpoint(); - userstreamEndpoint.withFollowings(true); - userstreamEndpoint.withUser(false); - userstreamEndpoint.allReplies(false); - endpoint = userstreamEndpoint; + if ( track || follow ) { + LOGGER.debug("***\tPRESENT\t***"); + StatusesFilterEndpoint statusesFilterEndpoint = new StatusesFilterEndpoint(); + if ( track ) { + statusesFilterEndpoint.trackTerms(config.getTrack()); } - else if(config.getEndpoint().equals("sample") ) { - - hosebirdHosts = new HttpHosts(Constants.STREAM_HOST); - - boolean track = config.getTrack() != null && !config.getTrack().isEmpty(); - boolean follow = config.getFollow() != null && !config.getFollow().isEmpty(); - - if( track || follow ) { - LOGGER.debug("***\tPRESENT\t***"); - StatusesFilterEndpoint statusesFilterEndpoint = new StatusesFilterEndpoint(); - if( track ) { - statusesFilterEndpoint.trackTerms(config.getTrack()); - } - if( follow ) { - statusesFilterEndpoint.followings(config.getFollow()); - } - this.endpoint = statusesFilterEndpoint; - } else { - endpoint = new StatusesSampleEndpoint(); - } - + if ( follow ) { + statusesFilterEndpoint.followings(config.getFollow()); } - else if( config.getEndpoint().endsWith("firehose")) { - hosebirdHosts = new HttpHosts(Constants.STREAM_HOST); - endpoint = new StatusesFirehoseEndpoint(); - } else { - LOGGER.error("NO ENDPOINT RESOLVED"); - return; - } - - if( config.getBasicauth() != null ) { - - Preconditions.checkNotNull(config.getBasicauth().getUsername()); - Preconditions.checkNotNull(config.getBasicauth().getPassword()); - - auth = new BasicAuth( - config.getBasicauth().getUsername(), - config.getBasicauth().getPassword() - ); - - } else if( config.getOauth() != null ) { - - Preconditions.checkNotNull(config.getOauth().getConsumerKey()); - Preconditions.checkNotNull(config.getOauth().getConsumerSecret()); - Preconditions.checkNotNull(config.getOauth().getAccessToken()); - Preconditions.checkNotNull(config.getOauth().getAccessTokenSecret()); - - auth = new OAuth1(config.getOauth().getConsumerKey(), - config.getOauth().getConsumerSecret(), - config.getOauth().getAccessToken(), - config.getOauth().getAccessTokenSecret()); + this.endpoint = statusesFilterEndpoint; + } else { + endpoint = new StatusesSampleEndpoint(); + } + + } else if ( config.getEndpoint().endsWith("firehose")) { + hosebirdHosts = new HttpHosts(Constants.STREAM_HOST); + endpoint = new StatusesFirehoseEndpoint(); + } else { + LOGGER.error("NO ENDPOINT RESOLVED"); + return; + } - } else { - LOGGER.error("NO AUTH RESOLVED"); - return; - } + if ( config.getBasicauth() != null ) { - LOGGER.debug("host={}\tendpoint={}\taut={}", hosebirdHosts, endpoint, auth); + Preconditions.checkNotNull(config.getBasicauth().getUsername()); + Preconditions.checkNotNull(config.getBasicauth().getPassword()); - providerQueue = new LinkedBlockingQueue<>(MAX_BATCH); + auth = new BasicAuth( + config.getBasicauth().getUsername(), + config.getBasicauth().getPassword() + ); - client = new ClientBuilder() - .name("apache/streams/streams-contrib/streams-provider-twitter") - .hosts(hosebirdHosts) - .endpoint(endpoint) - .authentication(auth) - .connectionTimeout(1200000) - .processor(processor) - .build(); + } else if ( config.getOauth() != null ) { - } + Preconditions.checkNotNull(config.getOauth().getConsumerKey()); + Preconditions.checkNotNull(config.getOauth().getConsumerSecret()); + Preconditions.checkNotNull(config.getOauth().getAccessToken()); + Preconditions.checkNotNull(config.getOauth().getAccessTokenSecret()); - @Override - public void cleanUp() { - this.client.stop(); - this.processor.cleanUp(); - this.running.set(false); - } + auth = new OAuth1(config.getOauth().getConsumerKey(), + config.getOauth().getConsumerSecret(), + config.getOauth().getAccessToken(), + config.getOauth().getAccessTokenSecret()); - @Override - public DatumStatusCounter getDatumStatusCounter() { - return countersTotal; + } else { + LOGGER.error("NO AUTH RESOLVED"); + return; } - protected boolean addDatum(Future<List<StreamsDatum>> future) { - try { - ComponentUtils.offerUntilSuccess(future, providerQueue); - countersCurrent.incrementStatus(DatumStatus.SUCCESS); - return true; - } catch (Exception e) { - countersCurrent.incrementStatus(DatumStatus.FAIL); - LOGGER.warn("Unable to enqueue item from Twitter stream"); - return false; - } + LOGGER.debug("host={}\tendpoint={}\taut={}", hosebirdHosts, endpoint, auth); + + providerQueue = new LinkedBlockingQueue<>(MAX_BATCH); + + client = new ClientBuilder() + .name("apache/streams/streams-contrib/streams-provider-twitter") + .hosts(hosebirdHosts) + .endpoint(endpoint) + .authentication(auth) + .connectionTimeout(1200000) + .processor(processor) + .build(); + + } + + @Override + public void cleanUp() { + this.client.stop(); + this.processor.cleanUp(); + this.running.set(false); + } + + @Override + public DatumStatusCounter getDatumStatusCounter() { + return countersTotal; + } + + protected boolean addDatum(Future<List<StreamsDatum>> future) { + try { + ComponentUtils.offerUntilSuccess(future, providerQueue); + countersCurrent.incrementStatus(DatumStatus.SUCCESS); + return true; + } catch (Exception ex) { + countersCurrent.incrementStatus(DatumStatus.FAIL); + LOGGER.warn("Unable to enqueue item from Twitter stream"); + return false; } - - protected void drainTo(Queue<StreamsDatum> drain) { - int count = 0; - while(!providerQueue.isEmpty() && count <= MAX_BATCH) { - for(StreamsDatum datum : pollForDatum()) { - ComponentUtils.offerUntilSuccess(datum, drain); - count++; - } - } + } + + protected void drainTo(Queue<StreamsDatum> drain) { + int count = 0; + while (!providerQueue.isEmpty() && count <= MAX_BATCH) { + for (StreamsDatum datum : pollForDatum()) { + ComponentUtils.offerUntilSuccess(datum, drain); + count++; + } } - - protected List<StreamsDatum> pollForDatum() { - try { - return providerQueue.poll().get(); - } catch (InterruptedException e) { - LOGGER.warn("Interrupted while waiting for future. Initiate shutdown."); - this.cleanUp(); - Thread.currentThread().interrupt(); - return new ArrayList<>(); - } catch (ExecutionException e) { - LOGGER.warn("Error getting tweet from future"); - return new ArrayList<>(); - } + } + + protected List<StreamsDatum> pollForDatum() { + try { + return providerQueue.poll().get(); + } catch (InterruptedException ex) { + LOGGER.warn("Interrupted while waiting for future. Initiate shutdown."); + this.cleanUp(); + Thread.currentThread().interrupt(); + return new ArrayList<>(); + } catch (ExecutionException ex) { + LOGGER.warn("Error getting tweet from future"); + return new ArrayList<>(); } + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java index cea9829..7461356 100644 --- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java +++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java @@ -18,6 +18,17 @@ package org.apache.streams.twitter.provider; +import org.apache.streams.config.ComponentConfigurator; +import org.apache.streams.config.StreamsConfiguration; +import org.apache.streams.config.StreamsConfigurator; +import org.apache.streams.core.DatumStatusCounter; +import org.apache.streams.core.StreamsDatum; +import org.apache.streams.core.StreamsProvider; +import org.apache.streams.core.StreamsResultSet; +import org.apache.streams.jackson.StreamsJacksonMapper; +import org.apache.streams.twitter.TwitterUserInformationConfiguration; +import org.apache.streams.twitter.converter.TwitterDateTimeFormat; + import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Preconditions; @@ -31,17 +42,6 @@ import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; import com.typesafe.config.ConfigParseOptions; import org.apache.commons.lang.NotImplementedException; -import org.apache.streams.config.ComponentConfigurator; -import org.apache.streams.config.StreamsConfiguration; -import org.apache.streams.config.StreamsConfigurator; -import org.apache.streams.core.DatumStatusCounter; -import org.apache.streams.core.StreamsDatum; -import org.apache.streams.core.StreamsProvider; -import org.apache.streams.core.StreamsResultSet; -import org.apache.streams.jackson.StreamsJacksonMapper; -import org.apache.streams.twitter.TwitterUserInformationConfiguration; -import org.apache.streams.twitter.converter.TwitterDateTimeFormat; -import org.apache.streams.util.ComponentUtils; import org.joda.time.DateTime; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -65,7 +65,6 @@ import java.util.List; import java.util.Objects; import java.util.Queue; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -75,320 +74,335 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import static java.util.concurrent.Executors.newSingleThreadExecutor; /** - * Retrieve recent posts from a list of user ids or names. - * - * To use from command line: - * - * Supply (at least) the following required configuration in application.conf: - * - * twitter.oauth.consumerKey - * twitter.oauth.consumerSecret - * twitter.oauth.accessToken - * twitter.oauth.accessTokenSecret - * twitter.info - * - * Launch using: - * - * mvn exec:java -Dexec.mainClass=org.apache.streams.twitter.provider.TwitterTimelineProvider -Dexec.args="application.conf tweets.json" + * Retrieve recent posts from a list of user ids or names. */ public class TwitterTimelineProvider implements StreamsProvider, Serializable { - public final static String STREAMS_ID = "TwitterTimelineProvider"; - - private final static Logger LOGGER = LoggerFactory.getLogger(TwitterTimelineProvider.class); - - public static final int MAX_NUMBER_WAITING = 10000; - - private TwitterUserInformationConfiguration config; - - protected final ReadWriteLock lock = new ReentrantReadWriteLock(); - - public TwitterUserInformationConfiguration getConfig() { - return config; - } - - public void setConfig(TwitterUserInformationConfiguration config) { - this.config = config; - } - - protected Collection<String[]> screenNameBatches; - protected Collection<Long> ids; + public static final String STREAMS_ID = "TwitterTimelineProvider"; - protected volatile Queue<StreamsDatum> providerQueue; + private static final Logger LOGGER = LoggerFactory.getLogger(TwitterTimelineProvider.class); - protected int idsCount; - protected Twitter client; + public static final int MAX_NUMBER_WAITING = 10000; - protected ListeningExecutorService executor; + private TwitterUserInformationConfiguration config; - protected DateTime start; - protected DateTime end; + protected final ReadWriteLock lock = new ReentrantReadWriteLock(); - protected final AtomicBoolean running = new AtomicBoolean(); + public TwitterUserInformationConfiguration getConfig() { + return config; + } - List<ListenableFuture<Object>> futures = new ArrayList<>(); + public void setConfig(TwitterUserInformationConfiguration config) { + this.config = config; + } - Boolean jsonStoreEnabled; - Boolean includeEntitiesEnabled; + protected Collection<String[]> screenNameBatches; + protected Collection<Long> ids; - public static void main(String[] args) throws Exception { + protected volatile Queue<StreamsDatum> providerQueue; - Preconditions.checkArgument(args.length >= 2); + protected int idsCount; + protected Twitter client; - String configfile = args[0]; - String outfile = args[1]; + protected ListeningExecutorService executor; - Config reference = ConfigFactory.load(); - File conf_file = new File(configfile); - assert(conf_file.exists()); - Config testResourceConfig = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false)); + protected DateTime start; + protected DateTime end; - Config typesafe = testResourceConfig.withFallback(reference).resolve(); + protected final AtomicBoolean running = new AtomicBoolean(); - StreamsConfiguration streamsConfiguration = StreamsConfigurator.detectConfiguration(typesafe); - TwitterUserInformationConfiguration config = new ComponentConfigurator<>(TwitterUserInformationConfiguration.class).detectConfiguration(typesafe, "twitter"); - TwitterTimelineProvider provider = new TwitterTimelineProvider(config); + List<ListenableFuture<Object>> futures = new ArrayList<>(); - ObjectMapper mapper = new StreamsJacksonMapper(Lists.newArrayList(TwitterDateTimeFormat.TWITTER_FORMAT)); + Boolean jsonStoreEnabled; + Boolean includeEntitiesEnabled; - PrintStream outStream = new PrintStream(new BufferedOutputStream(new FileOutputStream(outfile))); - provider.prepare(config); - provider.startStream(); - do { - Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getBatchFrequencyMs(), TimeUnit.MILLISECONDS); - Iterator<StreamsDatum> iterator = provider.readCurrent().iterator(); - while(iterator.hasNext()) { - StreamsDatum datum = iterator.next(); - String json; - try { - json = mapper.writeValueAsString(datum.getDocument()); - outStream.println(json); - } catch (JsonProcessingException e) { - System.err.println(e.getMessage()); - } - } - } while( provider.isRunning()); - provider.cleanUp(); - outStream.flush(); - } + /** + * To use from command line: + * + * <p/> + * Supply (at least) the following required configuration in application.conf: + * + * <p/> + * twitter.oauth.consumerKey + * twitter.oauth.consumerSecret + * twitter.oauth.accessToken + * twitter.oauth.accessTokenSecret + * twitter.info + * + * <p/> + * Launch using: + * + * <p/> + * mvn exec:java -Dexec.mainClass=org.apache.streams.twitter.provider.TwitterTimelineProvider -Dexec.args="application.conf tweets.json" + * + * @param args args + * @throws Exception Exception + */ + public static void main(String[] args) throws Exception { - public TwitterTimelineProvider(TwitterUserInformationConfiguration config) { - this.config = config; - } + Preconditions.checkArgument(args.length >= 2); - public Queue<StreamsDatum> getProviderQueue() { - return this.providerQueue; - } + String configfile = args[0]; + String outfile = args[1]; - @Override - public String getId() { - return STREAMS_ID; - } + Config reference = ConfigFactory.load(); + File file = new File(configfile); + assert (file.exists()); + Config testResourceConfig = ConfigFactory.parseFileAnySyntax(file, ConfigParseOptions.defaults().setAllowMissing(false)); - @Override - public void prepare(Object o) { + Config typesafe = testResourceConfig.withFallback(reference).resolve(); + StreamsConfiguration streamsConfiguration = StreamsConfigurator.detectConfiguration(typesafe); + TwitterUserInformationConfiguration config = new ComponentConfigurator<>(TwitterUserInformationConfiguration.class).detectConfiguration(typesafe, "twitter"); + TwitterTimelineProvider provider = new TwitterTimelineProvider(config); + ObjectMapper mapper = new StreamsJacksonMapper(Lists.newArrayList(TwitterDateTimeFormat.TWITTER_FORMAT)); + PrintStream outStream = new PrintStream(new BufferedOutputStream(new FileOutputStream(outfile))); + provider.prepare(config); + provider.startStream(); + do { + Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getBatchFrequencyMs(), TimeUnit.MILLISECONDS); + Iterator<StreamsDatum> iterator = provider.readCurrent().iterator(); + while (iterator.hasNext()) { + StreamsDatum datum = iterator.next(); + String json; try { - lock.writeLock().lock(); - providerQueue = constructQueue(); - } finally { - lock.writeLock().unlock(); + json = mapper.writeValueAsString(datum.getDocument()); + outStream.println(json); + } catch (JsonProcessingException ex) { + System.err.println(ex.getMessage()); } - - Preconditions.checkNotNull(providerQueue); - Preconditions.checkNotNull(config.getOauth().getConsumerKey()); - Preconditions.checkNotNull(config.getOauth().getConsumerSecret()); - Preconditions.checkNotNull(config.getOauth().getAccessToken()); - Preconditions.checkNotNull(config.getOauth().getAccessTokenSecret()); - Preconditions.checkNotNull(config.getInfo()); - - consolidateToIDs(); - - if(ids.size() > 1) - executor = MoreExecutors.listeningDecorator(TwitterUserInformationProvider.newFixedThreadPoolWithQueueSize(5, ids.size())); - else - executor = MoreExecutors.listeningDecorator(newSingleThreadExecutor()); + } + } + while ( provider.isRunning() ); + provider.cleanUp(); + outStream.flush(); + } + + public TwitterTimelineProvider(TwitterUserInformationConfiguration config) { + this.config = config; + } + + public Queue<StreamsDatum> getProviderQueue() { + return this.providerQueue; + } + + @Override + public String getId() { + return STREAMS_ID; + } + + @Override + public void prepare(Object configurationObject) { + + try { + lock.writeLock().lock(); + providerQueue = constructQueue(); + } finally { + lock.writeLock().unlock(); } - @Override - public void startStream() { + Preconditions.checkNotNull(providerQueue); + Preconditions.checkNotNull(config.getOauth().getConsumerKey()); + Preconditions.checkNotNull(config.getOauth().getConsumerSecret()); + Preconditions.checkNotNull(config.getOauth().getAccessToken()); + Preconditions.checkNotNull(config.getOauth().getAccessTokenSecret()); + Preconditions.checkNotNull(config.getInfo()); - LOGGER.debug("{} startStream", STREAMS_ID); + consolidateToIDs(); - Preconditions.checkArgument(!ids.isEmpty()); + if (ids.size() > 1) { + executor = MoreExecutors.listeningDecorator(TwitterUserInformationProvider.newFixedThreadPoolWithQueueSize(5, ids.size())); + } else { + executor = MoreExecutors.listeningDecorator(newSingleThreadExecutor()); + } + } - running.set(true); + @Override + public void startStream() { - submitTimelineThreads(ids.toArray(new Long[0])); + LOGGER.debug("{} startStream", STREAMS_ID); - executor.shutdown(); + Preconditions.checkArgument(!ids.isEmpty()); - } + running.set(true); - public boolean shouldContinuePulling(List<Status> statuses) { - return (statuses != null) && (statuses.size() > 0); - } + submitTimelineThreads(ids.toArray(new Long[0])); - protected void submitTimelineThreads(Long[] ids) { + executor.shutdown(); - Twitter client = getTwitterClient(); + } - for(int i = 0; i < ids.length; i++) { + public boolean shouldContinuePulling(List<Status> statuses) { + return (statuses != null) && (statuses.size() > 0); + } - TwitterTimelineProviderTask providerTask = new TwitterTimelineProviderTask(this, client, ids[i]); - ListenableFuture future = executor.submit(providerTask); - futures.add(future); - LOGGER.info("submitted {}", ids[i]); - } + protected void submitTimelineThreads(Long[] ids) { - } + Twitter client = getTwitterClient(); - private Collection<Long> retrieveIds(String[] screenNames) { - Twitter client = getTwitterClient(); + for (int i = 0; i < ids.length; i++) { - List<Long> ids = Lists.newArrayList(); - try { - for (User tStat : client.lookupUsers(screenNames)) { - ids.add(tStat.getId()); - } - } catch (TwitterException e) { - LOGGER.error("Failure retrieving user details.", e.getMessage()); - } - return ids; + TwitterTimelineProviderTask providerTask = new TwitterTimelineProviderTask(this, client, ids[i]); + ListenableFuture future = executor.submit(providerTask); + futures.add(future); + LOGGER.info("submitted {}", ids[i]); } - public StreamsResultSet readCurrent() { + } - StreamsResultSet result; + private Collection<Long> retrieveIds(String[] screenNames) { + Twitter client = getTwitterClient(); - LOGGER.debug("Providing {} docs", providerQueue.size()); + List<Long> ids = Lists.newArrayList(); + try { + for (User twitterUser : client.lookupUsers(screenNames)) { + ids.add(twitterUser.getId()); + } + } catch (TwitterException ex) { + LOGGER.error("Failure retrieving user details.", ex.getMessage()); + } + return ids; + } - try { - lock.writeLock().lock(); - result = new StreamsResultSet(providerQueue); - result.setCounter(new DatumStatusCounter()); - providerQueue = constructQueue(); - } finally { - lock.writeLock().unlock(); - } + @Override + public StreamsResultSet readCurrent() { - if( result.size() == 0 && providerQueue.isEmpty() && executor.isTerminated() ) { - LOGGER.info("Finished. Cleaning up..."); + StreamsResultSet result; - running.set(false); + LOGGER.debug("Providing {} docs", providerQueue.size()); - LOGGER.info("Exiting"); - } + try { + lock.writeLock().lock(); + result = new StreamsResultSet(providerQueue); + result.setCounter(new DatumStatusCounter()); + providerQueue = constructQueue(); + } finally { + lock.writeLock().unlock(); + } - return result; + if ( result.size() == 0 && providerQueue.isEmpty() && executor.isTerminated() ) { + LOGGER.info("Finished. Cleaning up..."); - } + running.set(false); - protected Queue<StreamsDatum> constructQueue() { - return new LinkedBlockingQueue<StreamsDatum>(); + LOGGER.info("Exiting"); } - public StreamsResultSet readNew(BigInteger sequence) { - LOGGER.debug("{} readNew", STREAMS_ID); - throw new NotImplementedException(); - } + return result; - public StreamsResultSet readRange(DateTime start, DateTime end) { - LOGGER.debug("{} readRange", STREAMS_ID); - throw new NotImplementedException(); - } + } + protected Queue<StreamsDatum> constructQueue() { + return new LinkedBlockingQueue<StreamsDatum>(); + } + public StreamsResultSet readNew(BigInteger sequence) { + LOGGER.debug("{} readNew", STREAMS_ID); + throw new NotImplementedException(); + } - /** - * Using the "info" list that is contained in the configuration, ensure that all - * account identifiers are converted to IDs (Longs) instead of screenNames (Strings) - */ - protected void consolidateToIDs() { - List<String> screenNames = Lists.newArrayList(); - ids = Lists.newArrayList(); - - for(String account : config.getInfo()) { - try { - if (new Long(account) != null) { - ids.add(Long.parseLong(Objects.toString(account, null))); - } else { - screenNames.add(account); - } - } catch (Exception e) { - LOGGER.error("Exception while trying to add ID: {{}}, {}", account, e); - } - } + public StreamsResultSet readRange(DateTime start, DateTime end) { + LOGGER.debug("{} readRange", STREAMS_ID); + throw new NotImplementedException(); + } - // Twitter allows for batches up to 100 per request, but you cannot mix types - screenNameBatches = new ArrayList<String[]>(); - while(screenNames.size() >= 100) { - screenNameBatches.add(screenNames.subList(0, 100).toArray(new String[0])); - screenNames = screenNames.subList(100, screenNames.size()); - } - if(screenNames.size() > 0) - screenNameBatches.add(screenNames.toArray(new String[ids.size()])); - Iterator<String[]> screenNameBatchIterator = screenNameBatches.iterator(); + /** + * Using the "info" list that is contained in the configuration, ensure that all + * account identifiers are converted to IDs (Longs) instead of screenNames (Strings). + */ + protected void consolidateToIDs() { + List<String> screenNames = Lists.newArrayList(); + ids = Lists.newArrayList(); - while(screenNameBatchIterator.hasNext()) { - Collection<Long> batchIds = retrieveIds(screenNameBatchIterator.next()); - ids.addAll(batchIds); + for (String account : config.getInfo()) { + try { + if (new Long(account) != null) { + ids.add(Long.parseLong(Objects.toString(account, null))); + } else { + screenNames.add(account); } + } catch (Exception ex) { + LOGGER.error("Exception while trying to add ID: {{}}, {}", account, ex); + } } - public Twitter getTwitterClient() { - - String baseUrl = TwitterProviderUtil.baseUrl(config); - - ConfigurationBuilder builder = new ConfigurationBuilder() - .setOAuthConsumerKey(config.getOauth().getConsumerKey()) - .setOAuthConsumerSecret(config.getOauth().getConsumerSecret()) - .setOAuthAccessToken(config.getOauth().getAccessToken()) - .setOAuthAccessTokenSecret(config.getOauth().getAccessTokenSecret()) - .setIncludeEntitiesEnabled(true) - .setJSONStoreEnabled(true) - .setAsyncNumThreads(3) - .setRestBaseURL(baseUrl) - .setIncludeMyRetweetEnabled(Boolean.TRUE) - .setPrettyDebugEnabled(Boolean.TRUE); - - return new TwitterFactory(builder.build()).getInstance(); + // Twitter allows for batches up to 100 per request, but you cannot mix types + screenNameBatches = new ArrayList<String[]>(); + while (screenNames.size() >= 100) { + screenNameBatches.add(screenNames.subList(0, 100).toArray(new String[0])); + screenNames = screenNames.subList(100, screenNames.size()); } - @Override - public void cleanUp() { - shutdownAndAwaitTermination(executor); + if (screenNames.size() > 0) { + screenNameBatches.add(screenNames.toArray(new String[ids.size()])); } - void shutdownAndAwaitTermination(ExecutorService pool) { - pool.shutdown(); // Disable new tasks from being submitted - try { - // Wait a while for existing tasks to terminate - if (!pool.awaitTermination(10, TimeUnit.SECONDS)) { - pool.shutdownNow(); // Cancel currently executing tasks - // Wait a while for tasks to respond to being cancelled - if (!pool.awaitTermination(10, TimeUnit.SECONDS)) - System.err.println("Pool did not terminate"); - } - } catch (InterruptedException ie) { - // (Re-)Cancel if current thread also interrupted - pool.shutdownNow(); - // Preserve interrupt status - Thread.currentThread().interrupt(); - } - } + Iterator<String[]> screenNameBatchIterator = screenNameBatches.iterator(); - @Override - public boolean isRunning() { - if (providerQueue.isEmpty() && executor.isTerminated() && Futures.allAsList(futures).isDone()) { - LOGGER.info("Completed"); - running.set(false); - LOGGER.info("Exiting"); + while (screenNameBatchIterator.hasNext()) { + Collection<Long> batchIds = retrieveIds(screenNameBatchIterator.next()); + ids.addAll(batchIds); + } + } + + /** + * get Twitter Client from TwitterUserInformationConfiguration. + * @return result + */ + public Twitter getTwitterClient() { + + String baseUrl = TwitterProviderUtil.baseUrl(config); + + ConfigurationBuilder builder = new ConfigurationBuilder() + .setOAuthConsumerKey(config.getOauth().getConsumerKey()) + .setOAuthConsumerSecret(config.getOauth().getConsumerSecret()) + .setOAuthAccessToken(config.getOauth().getAccessToken()) + .setOAuthAccessTokenSecret(config.getOauth().getAccessTokenSecret()) + .setIncludeEntitiesEnabled(true) + .setJSONStoreEnabled(true) + .setAsyncNumThreads(3) + .setRestBaseURL(baseUrl) + .setIncludeMyRetweetEnabled(Boolean.TRUE) + .setPrettyDebugEnabled(Boolean.TRUE); + + return new TwitterFactory(builder.build()).getInstance(); + } + + @Override + public void cleanUp() { + shutdownAndAwaitTermination(executor); + } + + void shutdownAndAwaitTermination(ExecutorService pool) { + pool.shutdown(); // Disable new tasks from being submitted + try { + // Wait a while for existing tasks to terminate + if (!pool.awaitTermination(10, TimeUnit.SECONDS)) { + pool.shutdownNow(); // Cancel currently executing tasks + // Wait a while for tasks to respond to being cancelled + if (!pool.awaitTermination(10, TimeUnit.SECONDS)) { + System.err.println("Pool did not terminate"); } - return running.get(); + } + } catch (InterruptedException ie) { + // (Re-)Cancel if current thread also interrupted + pool.shutdownNow(); + // Preserve interrupt status + Thread.currentThread().interrupt(); + } + } + + @Override + public boolean isRunning() { + if (providerQueue.isEmpty() && executor.isTerminated() && Futures.allAsList(futures).isDone()) { + LOGGER.info("Completed"); + running.set(false); + LOGGER.info("Exiting"); } + return running.get(); + } }