http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTask.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTask.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTask.java index 111d213..dbf6ac9 100644 --- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTask.java +++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTask.java @@ -18,12 +18,13 @@ package org.apache.streams.twitter.provider; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.collect.Lists; import org.apache.streams.core.StreamsDatum; import org.apache.streams.jackson.StreamsJacksonMapper; import org.apache.streams.twitter.converter.TwitterDateTimeFormat; import org.apache.streams.util.ComponentUtils; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.Lists; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import twitter4j.Paging; @@ -39,77 +40,79 @@ import java.util.List; */ public class TwitterTimelineProviderTask implements Runnable { - private final static Logger LOGGER = LoggerFactory.getLogger(TwitterTimelineProviderTask.class); + private static final Logger LOGGER = LoggerFactory.getLogger(TwitterTimelineProviderTask.class); - private static ObjectMapper MAPPER = new StreamsJacksonMapper(Lists.newArrayList(TwitterDateTimeFormat.TWITTER_FORMAT)); + private static ObjectMapper MAPPER = new StreamsJacksonMapper(Lists.newArrayList(TwitterDateTimeFormat.TWITTER_FORMAT)); - protected TwitterTimelineProvider provider; - protected Twitter client; - protected Long id; + protected TwitterTimelineProvider provider; + protected Twitter client; + protected Long id; - public TwitterTimelineProviderTask(TwitterTimelineProvider provider, Twitter twitter, Long id) { - this.provider = provider; - this.client = twitter; - this.id = id; - } + /** + * TwitterTimelineProviderTask constructor. + * @param provider TwitterTimelineProvider + * @param twitter Twitter + * @param id Long + */ + public TwitterTimelineProviderTask(TwitterTimelineProvider provider, Twitter twitter, Long id) { + this.provider = provider; + this.client = twitter; + this.id = id; + } + + @Override + public void run() { + + Paging paging = new Paging(1, 200); + List<Status> statuses = null; + int count = 0; + + LOGGER.info(id + " Thread Starting"); + + do { + int keepTrying = 0; - @Override - public void run() { - - Paging paging = new Paging(1, 200); - List<Status> statuses = null; - int count = 0; - - LOGGER.info(id + " Thread Starting"); - - do - { - int keepTrying = 0; - - // keep trying to load, give it 5 attempts. - //This value was chosen because it seemed like a reasonable number of times - //to retry capturing a timeline given the sorts of errors that could potentially - //occur (network timeout/interruption, faulty client, etc.) - while (keepTrying < 5) - { - - try - { - this.client = provider.getTwitterClient(); - - statuses = client.getUserTimeline(id, paging); - - for (Status tStat : statuses) { - - String json = TwitterObjectFactory.getRawJSON(tStat); - if( count < provider.getConfig().getMaxItems() ) { - try { - org.apache.streams.twitter.pojo.Tweet tweet = MAPPER.readValue(json, org.apache.streams.twitter.pojo.Tweet.class); - ComponentUtils.offerUntilSuccess(new StreamsDatum(tweet), provider.providerQueue); - } catch(Exception exception) { - LOGGER.warn("Failed to read document as Tweet ", tStat); - } - count++; - } - - } - - paging.setPage(paging.getPage() + 1); - - keepTrying = 10; - } - catch(TwitterException twitterException) { - keepTrying += TwitterErrorHandler.handleTwitterError(client, id, twitterException); - } - catch(Exception e) { - keepTrying += TwitterErrorHandler.handleTwitterError(client, id, e); - } + // keep trying to load, give it 5 attempts. + //This value was chosen because it seemed like a reasonable number of times + //to retry capturing a timeline given the sorts of errors that could potentially + //occur (network timeout/interruption, faulty client, etc.) + while (keepTrying < 5) { + + try { + this.client = provider.getTwitterClient(); + + statuses = client.getUserTimeline(id, paging); + + for (Status twitterStatus : statuses) { + + String json = TwitterObjectFactory.getRawJSON(twitterStatus); + + if ( count < provider.getConfig().getMaxItems() ) { + try { + org.apache.streams.twitter.pojo.Tweet tweet = MAPPER.readValue(json, org.apache.streams.twitter.pojo.Tweet.class); + ComponentUtils.offerUntilSuccess(new StreamsDatum(tweet), provider.providerQueue); + } catch (Exception exception) { + LOGGER.warn("Failed to read document as Tweet ", twitterStatus); + } + count++; } - } - while (provider.shouldContinuePulling(statuses) && count < provider.getConfig().getMaxItems()); - LOGGER.info(id + " Thread Finished"); + } + + paging.setPage(paging.getPage() + 1); + keepTrying = 10; + } catch (TwitterException twitterException) { + keepTrying += TwitterErrorHandler.handleTwitterError(client, id, twitterException); + } catch (Exception ex) { + keepTrying += TwitterErrorHandler.handleTwitterError(client, id, ex); + } + } } + while (provider.shouldContinuePulling(statuses) && count < provider.getConfig().getMaxItems()); + + LOGGER.info(id + " Thread Finished"); + + } }
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java index 15ff791..3210f80 100644 --- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java +++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java @@ -18,18 +18,6 @@ package org.apache.streams.twitter.provider; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; -import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.ListeningExecutorService; -import com.google.common.util.concurrent.MoreExecutors; -import com.google.common.util.concurrent.Uninterruptibles; -import com.typesafe.config.Config; -import com.typesafe.config.ConfigFactory; -import com.typesafe.config.ConfigParseOptions; -import org.apache.commons.lang.NotImplementedException; import org.apache.streams.config.ComponentConfigurator; import org.apache.streams.config.StreamsConfiguration; import org.apache.streams.config.StreamsConfigurator; @@ -43,6 +31,18 @@ import org.apache.streams.twitter.TwitterUserInformationConfiguration; import org.apache.streams.twitter.converter.TwitterDateTimeFormat; import org.apache.streams.twitter.pojo.User; import org.apache.streams.util.ComponentUtils; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.common.util.concurrent.Uninterruptibles; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; +import com.typesafe.config.ConfigParseOptions; +import org.apache.commons.lang.NotImplementedException; import org.joda.time.DateTime; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -73,359 +73,394 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import static java.util.concurrent.Executors.newSingleThreadExecutor; -public class TwitterUserInformationProvider implements StreamsProvider, Serializable -{ +/** + * Retrieve current profile status from a list of user ids or names. + */ +public class TwitterUserInformationProvider implements StreamsProvider, Serializable { + + public static final String STREAMS_ID = "TwitterUserInformationProvider"; + + private static ObjectMapper MAPPER = new StreamsJacksonMapper(Lists.newArrayList(TwitterDateTimeFormat.TWITTER_FORMAT)); + + private static final Logger LOGGER = LoggerFactory.getLogger(TwitterUserInformationProvider.class); + + public static final int MAX_NUMBER_WAITING = 1000; + + private TwitterUserInformationConfiguration config; + + /** + * To use from command line: + * + * <p/> + * Supply (at least) the following required configuration in application.conf: + * + * <p/> + * twitter.oauth.consumerKey + * twitter.oauth.consumerSecret + * twitter.oauth.accessToken + * twitter.oauth.accessTokenSecret + * twitter.info + * + * <p/> + * Launch using: + * + * <p/> + * mvn exec:java -Dexec.mainClass=org.apache.streams.twitter.provider.TwitterUserInformationProvider -Dexec.args="application.conf tweets.json" + * + * @param args args + * @throws Exception Exception + */ + public static void main(String[] args) throws Exception { + + Preconditions.checkArgument(args.length >= 2); + + String configfile = args[0]; + String outfile = args[1]; + + Config reference = ConfigFactory.load(); + File file = new File(configfile); + assert (file.exists()); + Config testResourceConfig = ConfigFactory.parseFileAnySyntax(file, ConfigParseOptions.defaults().setAllowMissing(false)); + + Config typesafe = testResourceConfig.withFallback(reference).resolve(); + + StreamsConfiguration streamsConfiguration = StreamsConfigurator.detectConfiguration(typesafe); + TwitterUserInformationConfiguration config = new ComponentConfigurator<>(TwitterUserInformationConfiguration.class).detectConfiguration(typesafe, "twitter"); + TwitterUserInformationProvider provider = new TwitterUserInformationProvider(config); + + PrintStream outStream = new PrintStream(new BufferedOutputStream(new FileOutputStream(outfile))); + provider.prepare(config); + provider.startStream(); + do { + Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getBatchFrequencyMs(), TimeUnit.MILLISECONDS); + Iterator<StreamsDatum> iterator = provider.readCurrent().iterator(); + while (iterator.hasNext()) { + StreamsDatum datum = iterator.next(); + String json; + try { + json = MAPPER.writeValueAsString(datum.getDocument()); + outStream.println(json); + } catch (JsonProcessingException ex) { + System.err.println(ex.getMessage()); + } + } + } + while ( provider.isRunning()); + provider.cleanUp(); + outStream.flush(); + } + + protected final ReadWriteLock lock = new ReentrantReadWriteLock(); - public static final String STREAMS_ID = "TwitterUserInformationProvider"; + protected volatile Queue<StreamsDatum> providerQueue; - private static ObjectMapper MAPPER = new StreamsJacksonMapper(Lists.newArrayList(TwitterDateTimeFormat.TWITTER_FORMAT)); + public TwitterUserInformationConfiguration getConfig() { + return config; + } - private static final Logger LOGGER = LoggerFactory.getLogger(TwitterUserInformationProvider.class); + public void setConfig(TwitterUserInformationConfiguration config) { + this.config = config; + } - public static final int MAX_NUMBER_WAITING = 1000; + protected Iterator<Long[]> idsBatches; + protected Iterator<String[]> screenNameBatches; - private TwitterUserInformationConfiguration config; + protected ListeningExecutorService executor; - public static void main(String[] args) throws Exception { + protected DateTime start; + protected DateTime end; - Preconditions.checkArgument(args.length >= 2); + protected final AtomicBoolean running = new AtomicBoolean(); - String configfile = args[0]; - String outfile = args[1]; + // TODO: this should be abstracted out + public static ExecutorService newFixedThreadPoolWithQueueSize(int numThreads, int queueSize) { + return new ThreadPoolExecutor(numThreads, numThreads, + 5000L, TimeUnit.MILLISECONDS, + new ArrayBlockingQueue<Runnable>(queueSize, true), new ThreadPoolExecutor.CallerRunsPolicy()); + } - Config reference = ConfigFactory.load(); - File conf_file = new File(configfile); - assert(conf_file.exists()); - Config testResourceConfig = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false)); + /** + * TwitterUserInformationProvider constructor. + * Resolves config from JVM properties 'twitter'. + */ + public TwitterUserInformationProvider() { + this.config = new ComponentConfigurator<>(TwitterUserInformationConfiguration.class).detectConfiguration(StreamsConfigurator.getConfig().getConfig("twitter")); + } - Config typesafe = testResourceConfig.withFallback(reference).resolve(); + public TwitterUserInformationProvider(TwitterUserInformationConfiguration config) { + this.config = config; + } - StreamsConfiguration streamsConfiguration = StreamsConfigurator.detectConfiguration(typesafe); - TwitterUserInformationConfiguration config = new ComponentConfigurator<>(TwitterUserInformationConfiguration.class).detectConfiguration(typesafe, "twitter"); - TwitterUserInformationProvider provider = new TwitterUserInformationProvider(config); + public Queue<StreamsDatum> getProviderQueue() { + return this.providerQueue; + } - PrintStream outStream = new PrintStream(new BufferedOutputStream(new FileOutputStream(outfile))); - provider.prepare(config); - provider.startStream(); - do { - Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getBatchFrequencyMs(), TimeUnit.MILLISECONDS); - Iterator<StreamsDatum> iterator = provider.readCurrent().iterator(); - while(iterator.hasNext()) { - StreamsDatum datum = iterator.next(); - String json; - try { - json = MAPPER.writeValueAsString(datum.getDocument()); - outStream.println(json); - } catch (JsonProcessingException e) { - System.err.println(e.getMessage()); - } - } - } while( provider.isRunning()); - provider.cleanUp(); - outStream.flush(); + @Override + public String getId() { + return STREAMS_ID; + } + + @Override + public void prepare(Object configurationObject) { + + if ( configurationObject instanceof TwitterFollowingConfiguration ) { + config = (TwitterUserInformationConfiguration) configurationObject; } - protected final ReadWriteLock lock = new ReentrantReadWriteLock(); + Preconditions.checkNotNull(config); + Preconditions.checkNotNull(config.getOauth()); + Preconditions.checkNotNull(config.getOauth().getConsumerKey()); + Preconditions.checkNotNull(config.getOauth().getConsumerSecret()); + Preconditions.checkNotNull(config.getOauth().getAccessToken()); + Preconditions.checkNotNull(config.getOauth().getAccessTokenSecret()); + Preconditions.checkNotNull(config.getInfo()); + + try { + lock.writeLock().lock(); + providerQueue = constructQueue(); + } finally { + lock.writeLock().unlock(); + } - protected volatile Queue<StreamsDatum> providerQueue; + Preconditions.checkNotNull(providerQueue); - public TwitterUserInformationConfiguration getConfig() { return config; } + List<String> screenNames = new ArrayList<String>(); + List<String[]> screenNameBatches = new ArrayList<String[]>(); - public void setConfig(TwitterUserInformationConfiguration config) { this.config = config; } + List<Long> ids = new ArrayList<Long>(); + List<Long[]> idsBatches = new ArrayList<Long[]>(); - protected Iterator<Long[]> idsBatches; - protected Iterator<String[]> screenNameBatches; + for (String s : config.getInfo()) { + if (s != null) { + String potentialScreenName = s.replaceAll("@", "").trim().toLowerCase(); - protected ListeningExecutorService executor; + // See if it is a long, if it is, add it to the user iD list, if it is not, add it to the + // screen name list + try { + ids.add(Long.parseLong(potentialScreenName)); + } catch (Exception ex) { + screenNames.add(potentialScreenName); + } - protected DateTime start; - protected DateTime end; + // Twitter allows for batches up to 100 per request, but you cannot mix types - protected final AtomicBoolean running = new AtomicBoolean(); + if (ids.size() >= 100) { + // add the batch + idsBatches.add(ids.toArray(new Long[ids.size()])); + // reset the Ids + ids = new ArrayList<Long>(); + } - public static ExecutorService newFixedThreadPoolWithQueueSize(int nThreads, int queueSize) { - return new ThreadPoolExecutor(nThreads, nThreads, - 5000L, TimeUnit.MILLISECONDS, - new ArrayBlockingQueue<Runnable>(queueSize, true), new ThreadPoolExecutor.CallerRunsPolicy()); + if (screenNames.size() >= 100) { + // add the batch + screenNameBatches.add(screenNames.toArray(new String[ids.size()])); + // reset the Ids + screenNames = new ArrayList<String>(); + } + } } - public TwitterUserInformationProvider() { - this.config = new ComponentConfigurator<>(TwitterUserInformationConfiguration.class).detectConfiguration(StreamsConfigurator.getConfig().getConfig("twitter")); - } - public TwitterUserInformationProvider(TwitterUserInformationConfiguration config) { - this.config = config; + if (ids.size() > 0) { + idsBatches.add(ids.toArray(new Long[ids.size()])); } - public Queue<StreamsDatum> getProviderQueue() { - return this.providerQueue; + if (screenNames.size() > 0) { + screenNameBatches.add(screenNames.toArray(new String[ids.size()])); } - @Override - public String getId() { - return STREAMS_ID; + if (ids.size() + screenNames.size() > 0) { + executor = MoreExecutors.listeningDecorator(newFixedThreadPoolWithQueueSize(5, (ids.size() + screenNames.size()))); + } else { + executor = MoreExecutors.listeningDecorator(newSingleThreadExecutor()); } - @Override - public void prepare(Object o) { - - if( o instanceof TwitterFollowingConfiguration ) - config = (TwitterUserInformationConfiguration) o; - - Preconditions.checkNotNull(config); - Preconditions.checkNotNull(config.getOauth()); - Preconditions.checkNotNull(config.getOauth().getConsumerKey()); - Preconditions.checkNotNull(config.getOauth().getConsumerSecret()); - Preconditions.checkNotNull(config.getOauth().getAccessToken()); - Preconditions.checkNotNull(config.getOauth().getAccessTokenSecret()); - Preconditions.checkNotNull(config.getInfo()); - - try { - lock.writeLock().lock(); - providerQueue = constructQueue(); - } finally { - lock.writeLock().unlock(); - } - - Preconditions.checkNotNull(providerQueue); - - List<String> screenNames = new ArrayList<String>(); - List<String[]> screenNameBatches = new ArrayList<String[]>(); - - List<Long> ids = new ArrayList<Long>(); - List<Long[]> idsBatches = new ArrayList<Long[]>(); - - for(String s : config.getInfo()) { - if(s != null) - { - String potentialScreenName = s.replaceAll("@", "").trim().toLowerCase(); - - // See if it is a long, if it is, add it to the user iD list, if it is not, add it to the - // screen name list - try { - ids.add(Long.parseLong(potentialScreenName)); - } catch (Exception e) { - screenNames.add(potentialScreenName); - } - - // Twitter allows for batches up to 100 per request, but you cannot mix types - - if(ids.size() >= 100) { - // add the batch - idsBatches.add(ids.toArray(new Long[ids.size()])); - // reset the Ids - ids = new ArrayList<Long>(); - } - - if(screenNames.size() >= 100) { - // add the batch - screenNameBatches.add(screenNames.toArray(new String[ids.size()])); - // reset the Ids - screenNames = new ArrayList<String>(); - } - } - } + Preconditions.checkNotNull(executor); + this.idsBatches = idsBatches.iterator(); + this.screenNameBatches = screenNameBatches.iterator(); + } - if(ids.size() > 0) - idsBatches.add(ids.toArray(new Long[ids.size()])); + @Override + public void startStream() { - if(screenNames.size() > 0) - screenNameBatches.add(screenNames.toArray(new String[ids.size()])); + Preconditions.checkNotNull(executor); - if(ids.size() + screenNames.size() > 0) - executor = MoreExecutors.listeningDecorator(newFixedThreadPoolWithQueueSize(5, (ids.size() + screenNames.size()))); - else - executor = MoreExecutors.listeningDecorator(newSingleThreadExecutor()); + Preconditions.checkArgument(idsBatches.hasNext() || screenNameBatches.hasNext()); - Preconditions.checkNotNull(executor); + LOGGER.info("{}{} - startStream", idsBatches, screenNameBatches); - this.idsBatches = idsBatches.iterator(); - this.screenNameBatches = screenNameBatches.iterator(); + while (idsBatches.hasNext()) { + loadBatch(idsBatches.next()); } - @Override - public void startStream() { - - Preconditions.checkNotNull(executor); - - Preconditions.checkArgument(idsBatches.hasNext() || screenNameBatches.hasNext()); + while (screenNameBatches.hasNext()) { + loadBatch(screenNameBatches.next()); + } - LOGGER.info("{}{} - startStream", idsBatches, screenNameBatches); + running.set(true); - while(idsBatches.hasNext()) - loadBatch(idsBatches.next()); + executor.shutdown(); + } - while(screenNameBatches.hasNext()) - loadBatch(screenNameBatches.next()); + protected void loadBatch(Long[] ids) { + Twitter client = getTwitterClient(); + int keepTrying = 0; - running.set(true); + // keep trying to load, give it 5 attempts. + //while (keepTrying < 10) + while (keepTrying < 1) { + try { + long[] toQuery = new long[ids.length]; - executor.shutdown(); - } + for (int i = 0; i < ids.length; i++) { + toQuery[i] = ids[i]; + } - protected void loadBatch(Long[] ids) { - Twitter client = getTwitterClient(); - int keepTrying = 0; - - // keep trying to load, give it 5 attempts. - //while (keepTrying < 10) - while (keepTrying < 1) - { - try - { - long[] toQuery = new long[ids.length]; - for(int i = 0; i < ids.length; i++) - toQuery[i] = ids[i]; - - for (twitter4j.User tUser : client.lookupUsers(toQuery)) { - String json = DataObjectFactory.getRawJSON(tUser); - try { - User user = MAPPER.readValue(json, org.apache.streams.twitter.pojo.User.class); - ComponentUtils.offerUntilSuccess(new StreamsDatum(user), providerQueue); - } catch(Exception exception) { - LOGGER.warn("Failed to read document as User ", tUser); - } - } - keepTrying = 10; - } - catch(TwitterException twitterException) { - keepTrying += TwitterErrorHandler.handleTwitterError(client, twitterException); - } - catch(Exception e) { - keepTrying += TwitterErrorHandler.handleTwitterError(client, e); - } + for (twitter4j.User twitterUser : client.lookupUsers(toQuery)) { + String json = DataObjectFactory.getRawJSON(twitterUser); + try { + User user = MAPPER.readValue(json, org.apache.streams.twitter.pojo.User.class); + ComponentUtils.offerUntilSuccess(new StreamsDatum(user), providerQueue); + } catch (Exception exception) { + LOGGER.warn("Failed to read document as User ", twitterUser); + } } + keepTrying = 10; + } catch (TwitterException twitterException) { + keepTrying += TwitterErrorHandler.handleTwitterError(client, twitterException); + } catch (Exception ex) { + keepTrying += TwitterErrorHandler.handleTwitterError(client, ex); + } } - - protected void loadBatch(String[] ids) { - Twitter client = getTwitterClient(); - int keepTrying = 0; - - // keep trying to load, give it 5 attempts. - //while (keepTrying < 10) - while (keepTrying < 1) - { - try - { - for (twitter4j.User tUser : client.lookupUsers(ids)) { - String json = DataObjectFactory.getRawJSON(tUser); - try { - User user = MAPPER.readValue(json, org.apache.streams.twitter.pojo.User.class); - ComponentUtils.offerUntilSuccess(new StreamsDatum(user), providerQueue); - } catch(Exception exception) { - LOGGER.warn("Failed to read document as User ", tUser); - } - } - keepTrying = 10; - } - catch(TwitterException twitterException) { - keepTrying += TwitterErrorHandler.handleTwitterError(client, twitterException); - } - catch(Exception e) { - keepTrying += TwitterErrorHandler.handleTwitterError(client, e); - } + } + + protected void loadBatch(String[] ids) { + Twitter client = getTwitterClient(); + int keepTrying = 0; + + // keep trying to load, give it 5 attempts. + //while (keepTrying < 10) + while (keepTrying < 1) { + try { + for (twitter4j.User twitterUser : client.lookupUsers(ids)) { + String json = DataObjectFactory.getRawJSON(twitterUser); + try { + User user = MAPPER.readValue(json, org.apache.streams.twitter.pojo.User.class); + ComponentUtils.offerUntilSuccess(new StreamsDatum(user), providerQueue); + } catch (Exception exception) { + LOGGER.warn("Failed to read document as User ", twitterUser); + } } + keepTrying = 10; + } catch (TwitterException twitterException) { + keepTrying += TwitterErrorHandler.handleTwitterError(client, twitterException); + } catch (Exception ex) { + keepTrying += TwitterErrorHandler.handleTwitterError(client, ex); + } } + } - public StreamsResultSet readCurrent() { - - LOGGER.debug("{}{} - readCurrent", idsBatches, screenNameBatches); + @Override + public StreamsResultSet readCurrent() { - StreamsResultSet result; - - try { - lock.writeLock().lock(); - result = new StreamsResultSet(providerQueue); - result.setCounter(new DatumStatusCounter()); - providerQueue = constructQueue(); - LOGGER.debug("{}{} - providing {} docs", idsBatches, screenNameBatches, result.size()); - } finally { - lock.writeLock().unlock(); - } + LOGGER.debug("{}{} - readCurrent", idsBatches, screenNameBatches); - return result; + StreamsResultSet result; + try { + lock.writeLock().lock(); + result = new StreamsResultSet(providerQueue); + result.setCounter(new DatumStatusCounter()); + providerQueue = constructQueue(); + LOGGER.debug("{}{} - providing {} docs", idsBatches, screenNameBatches, result.size()); + } finally { + lock.writeLock().unlock(); } - protected Queue<StreamsDatum> constructQueue() { - return new LinkedBlockingQueue<StreamsDatum>(); - } + return result; - public StreamsResultSet readNew(BigInteger sequence) { - LOGGER.debug("{} readNew", STREAMS_ID); - throw new NotImplementedException(); - } + } - public StreamsResultSet readRange(DateTime start, DateTime end) { - LOGGER.debug("{} readRange", STREAMS_ID); - this.start = start; - this.end = end; - readCurrent(); - StreamsResultSet result = (StreamsResultSet)providerQueue.iterator(); - return result; - } + protected Queue<StreamsDatum> constructQueue() { + return new LinkedBlockingQueue<StreamsDatum>(); + } - @Override - public boolean isRunning() { + public StreamsResultSet readNew(BigInteger sequence) { + LOGGER.debug("{} readNew", STREAMS_ID); + throw new NotImplementedException(); + } - if( providerQueue.isEmpty() && executor.isTerminated() ) { - LOGGER.info("{}{} - completed", idsBatches, screenNameBatches); + @Override + public StreamsResultSet readRange(DateTime start, DateTime end) { + LOGGER.debug("{} readRange", STREAMS_ID); + this.start = start; + this.end = end; + readCurrent(); + StreamsResultSet result = (StreamsResultSet)providerQueue.iterator(); + return result; + } - running.set(false); + @Override + public boolean isRunning() { - LOGGER.info("Exiting"); - } + if ( providerQueue.isEmpty() && executor.isTerminated() ) { + LOGGER.info("{}{} - completed", idsBatches, screenNameBatches); + + running.set(false); - return running.get(); + LOGGER.info("Exiting"); } - void shutdownAndAwaitTermination(ExecutorService pool) { - pool.shutdown(); // Disable new tasks from being submitted - try { - // Wait a while for existing tasks to terminate - if (!pool.awaitTermination(10, TimeUnit.SECONDS)) { - pool.shutdownNow(); // Cancel currently executing tasks - // Wait a while for tasks to respond to being cancelled - if (!pool.awaitTermination(10, TimeUnit.SECONDS)) - System.err.println("Pool did not terminate"); - } - } catch (InterruptedException ie) { - // (Re-)Cancel if current thread also interrupted - pool.shutdownNow(); - // Preserve interrupt status - Thread.currentThread().interrupt(); + return running.get(); + } + + void shutdownAndAwaitTermination(ExecutorService pool) { + pool.shutdown(); // Disable new tasks from being submitted + try { + // Wait a while for existing tasks to terminate + if (!pool.awaitTermination(10, TimeUnit.SECONDS)) { + pool.shutdownNow(); // Cancel currently executing tasks + // Wait a while for tasks to respond to being cancelled + if (!pool.awaitTermination(10, TimeUnit.SECONDS)) { + System.err.println("Pool did not terminate"); } + } + } catch (InterruptedException ie) { + // (Re-)Cancel if current thread also interrupted + pool.shutdownNow(); + // Preserve interrupt status + Thread.currentThread().interrupt(); } + } + // TODO: abstract out, also appears in TwitterTimelineProvider + protected Twitter getTwitterClient() { + String baseUrl = TwitterProviderUtil.baseUrl(config); - protected Twitter getTwitterClient() - { - String baseUrl = TwitterProviderUtil.baseUrl(config); + ConfigurationBuilder builder = new ConfigurationBuilder() + .setOAuthConsumerKey(config.getOauth().getConsumerKey()) + .setOAuthConsumerSecret(config.getOauth().getConsumerSecret()) + .setOAuthAccessToken(config.getOauth().getAccessToken()) + .setOAuthAccessTokenSecret(config.getOauth().getAccessTokenSecret()) + .setIncludeEntitiesEnabled(true) + .setJSONStoreEnabled(true) + .setAsyncNumThreads(3) + .setRestBaseURL(baseUrl) + .setIncludeMyRetweetEnabled(Boolean.TRUE) + .setPrettyDebugEnabled(Boolean.TRUE); - ConfigurationBuilder builder = new ConfigurationBuilder() - .setOAuthConsumerKey(config.getOauth().getConsumerKey()) - .setOAuthConsumerSecret(config.getOauth().getConsumerSecret()) - .setOAuthAccessToken(config.getOauth().getAccessToken()) - .setOAuthAccessTokenSecret(config.getOauth().getAccessTokenSecret()) - .setIncludeEntitiesEnabled(true) - .setJSONStoreEnabled(true) - .setAsyncNumThreads(3) - .setRestBaseURL(baseUrl) - .setIncludeMyRetweetEnabled(Boolean.TRUE) - .setPrettyDebugEnabled(Boolean.TRUE); + return new TwitterFactory(builder.build()).getInstance(); + } - return new TwitterFactory(builder.build()).getInstance(); - } + protected void callback() { - protected void callback() { + } - } - - @Override - public void cleanUp() { - shutdownAndAwaitTermination(executor); - } + @Override + public void cleanUp() { + shutdownAndAwaitTermination(executor); + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/data/TwitterObjectMapperIT.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/data/TwitterObjectMapperIT.java b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/data/TwitterObjectMapperIT.java index 42f0fba..a480fd1 100644 --- a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/data/TwitterObjectMapperIT.java +++ b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/data/TwitterObjectMapperIT.java @@ -18,25 +18,27 @@ package org.apache.streams.twitter.test.data; +import org.apache.streams.jackson.StreamsJacksonMapper; +import org.apache.streams.twitter.converter.TwitterDocumentClassifier; +import org.apache.streams.twitter.pojo.Delete; +import org.apache.streams.twitter.pojo.Retweet; +import org.apache.streams.twitter.pojo.Tweet; + import com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.base.Optional; import com.google.common.collect.Lists; -import java.io.BufferedReader; -import java.io.InputStream; -import java.io.InputStreamReader; import org.apache.commons.lang.StringUtils; -import org.apache.streams.jackson.StreamsJacksonMapper; -import org.apache.streams.twitter.converter.TwitterDocumentClassifier; -import org.apache.streams.twitter.pojo.Delete; -import org.apache.streams.twitter.pojo.Retweet; -import org.apache.streams.twitter.pojo.Tweet; import org.junit.Assert; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.BufferedReader; +import java.io.InputStream; +import java.io.InputStreamReader; + import static org.apache.streams.twitter.converter.TwitterDateTimeFormat.TWITTER_FORMAT; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.CoreMatchers.not; @@ -45,87 +47,87 @@ import static org.hamcrest.Matchers.greaterThan; import static org.junit.Assert.assertThat; /** -* Tests serialization / deserialization of twitter jsons -*/ + * Tests serialization / deserialization of twitter jsons. + */ public class TwitterObjectMapperIT { - private final static Logger LOGGER = LoggerFactory.getLogger(TwitterObjectMapperIT.class); + private static final Logger LOGGER = LoggerFactory.getLogger(TwitterObjectMapperIT.class); + + private ObjectMapper mapper = StreamsJacksonMapper.getInstance(Lists.newArrayList(TWITTER_FORMAT)); - private ObjectMapper mapper = StreamsJacksonMapper.getInstance(Lists.newArrayList(TWITTER_FORMAT)); + @Test + public void tests() { - @Test - public void Tests() - { - mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, Boolean.TRUE); - mapper.configure(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY, Boolean.TRUE); - mapper.configure(DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT, Boolean.TRUE); + mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, Boolean.TRUE); + mapper.configure(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY, Boolean.TRUE); + mapper.configure(DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT, Boolean.TRUE); - InputStream is = TwitterObjectMapperIT.class.getResourceAsStream("/testtweets.txt"); - InputStreamReader isr = new InputStreamReader(is); - BufferedReader br = new BufferedReader(isr); + InputStream is = TwitterObjectMapperIT.class.getResourceAsStream("/testtweets.txt"); + InputStreamReader isr = new InputStreamReader(is); + BufferedReader br = new BufferedReader(isr); - int tweetlinks = 0; - int retweetlinks = 0; + int tweetlinks = 0; + int retweetlinks = 0; - try { - while (br.ready()) { - String line = br.readLine(); - if(!StringUtils.isEmpty(line)) - { - LOGGER.info("raw: {}", line); + try { + while (br.ready()) { + String line = br.readLine(); + if (!StringUtils.isEmpty(line)) { - Class detected = new TwitterDocumentClassifier().detectClasses(line).get(0); + LOGGER.info("raw: {}", line); - ObjectNode event = (ObjectNode) mapper.readTree(line); + Class detected = new TwitterDocumentClassifier().detectClasses(line).get(0); - assertThat(event, is(not(nullValue()))); + ObjectNode event = (ObjectNode) mapper.readTree(line); - if( detected == Tweet.class ) { + assertThat(event, is(not(nullValue()))); - Tweet tweet = mapper.convertValue(event, Tweet.class); + if ( detected == Tweet.class ) { - assertThat(tweet, is(not(nullValue()))); - assertThat(tweet.getCreatedAt(), is(not(nullValue()))); - assertThat(tweet.getText(), is(not(nullValue()))); - assertThat(tweet.getUser(), is(not(nullValue()))); + Tweet tweet = mapper.convertValue(event, Tweet.class); - tweetlinks += Optional.fromNullable(tweet.getEntities().getUrls().size()).or(0); + assertThat(tweet, is(not(nullValue()))); + assertThat(tweet.getCreatedAt(), is(not(nullValue()))); + assertThat(tweet.getText(), is(not(nullValue()))); + assertThat(tweet.getUser(), is(not(nullValue()))); - } else if( detected == Retweet.class ) { + tweetlinks += Optional.fromNullable(tweet.getEntities().getUrls().size()).or(0); - Retweet retweet = mapper.convertValue(event, Retweet.class); + } else if ( detected == Retweet.class ) { - assertThat(retweet.getRetweetedStatus(), is(not(nullValue()))); - assertThat(retweet.getRetweetedStatus().getCreatedAt(), is(not(nullValue()))); - assertThat(retweet.getRetweetedStatus().getText(), is(not(nullValue()))); - assertThat(retweet.getRetweetedStatus().getUser(), is(not(nullValue()))); - assertThat(retweet.getRetweetedStatus().getUser().getId(), is(not(nullValue()))); - assertThat(retweet.getRetweetedStatus().getUser().getCreatedAt(), is(not(nullValue()))); + Retweet retweet = mapper.convertValue(event, Retweet.class); - retweetlinks += Optional.fromNullable(retweet.getRetweetedStatus().getEntities().getUrls().size()).or(0); + assertThat(retweet.getRetweetedStatus(), is(not(nullValue()))); + assertThat(retweet.getRetweetedStatus().getCreatedAt(), is(not(nullValue()))); + assertThat(retweet.getRetweetedStatus().getText(), is(not(nullValue()))); + assertThat(retweet.getRetweetedStatus().getUser(), is(not(nullValue()))); + assertThat(retweet.getRetweetedStatus().getUser().getId(), is(not(nullValue()))); + assertThat(retweet.getRetweetedStatus().getUser().getCreatedAt(), is(not(nullValue()))); - } else if( detected == Delete.class ) { + retweetlinks += Optional.fromNullable(retweet.getRetweetedStatus().getEntities().getUrls().size()).or(0); - Delete delete = mapper.convertValue(event, Delete.class); + } else if ( detected == Delete.class ) { - assertThat(delete.getDelete(), is(not(nullValue()))); - assertThat(delete.getDelete().getStatus(), is(not(nullValue()))); - assertThat(delete.getDelete().getStatus().getId(), is(not(nullValue()))); - assertThat(delete.getDelete().getStatus().getUserId(), is(not(nullValue()))); + Delete delete = mapper.convertValue(event, Delete.class); - } else { - Assert.fail(); - } + assertThat(delete.getDelete(), is(not(nullValue()))); + assertThat(delete.getDelete().getStatus(), is(not(nullValue()))); + assertThat(delete.getDelete().getStatus().getId(), is(not(nullValue()))); + assertThat(delete.getDelete().getStatus().getUserId(), is(not(nullValue()))); - } - } - } catch( Exception e ) { - LOGGER.error("Exception: ", e); + } else { Assert.fail(); + } + } + } + } catch ( Exception ex ) { + LOGGER.error("Exception: ", ex); + Assert.fail(); + } - assertThat(tweetlinks, is(greaterThan(0))); - assertThat(retweetlinks, is(greaterThan(0))); + assertThat(tweetlinks, is(greaterThan(0))); + assertThat(retweetlinks, is(greaterThan(0))); - } + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/providers/TwitterFollowingProviderIT.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/providers/TwitterFollowingProviderIT.java b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/providers/TwitterFollowingProviderIT.java index 3d7a6d2..720f6ec 100644 --- a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/providers/TwitterFollowingProviderIT.java +++ b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/providers/TwitterFollowingProviderIT.java @@ -19,6 +19,7 @@ package org.apache.streams.twitter.test.providers; import org.apache.streams.twitter.provider.TwitterFollowingProvider; + import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -29,39 +30,39 @@ import java.io.LineNumberReader; public class TwitterFollowingProviderIT { - private static final Logger LOGGER = LoggerFactory.getLogger(TwitterFollowingProviderIT.class); + private static final Logger LOGGER = LoggerFactory.getLogger(TwitterFollowingProviderIT.class); - @Test - public void testTwitterFollowingProvider() throws Exception { + @Test + public void testTwitterFollowingProvider() throws Exception { - String configfile = "./target/test-classes/TwitterFollowingProviderIT.conf"; - String outfile = "./target/test-classes/TwitterFollowingProviderIT.stdout.txt"; + String configfile = "./target/test-classes/TwitterFollowingProviderIT.conf"; + String outfile = "./target/test-classes/TwitterFollowingProviderIT.stdout.txt"; - String[] args = new String[2]; - args[0] = configfile; - args[1] = outfile; + String[] args = new String[2]; + args[0] = configfile; + args[1] = outfile; - Thread testThread = new Thread((Runnable) () -> { - try { - TwitterFollowingProvider.main(args); - } catch( Exception e ) { - LOGGER.error("Test Exception!", e); - } - }); - testThread.start(); - testThread.join(60000); + Thread testThread = new Thread((Runnable) () -> { + try { + TwitterFollowingProvider.main(args); + } catch ( Exception ex ) { + LOGGER.error("Test Exception!", ex); + } + }); + testThread.start(); + testThread.join(60000); - File out = new File(outfile); - assert (out.exists()); - assert (out.canRead()); - assert (out.isFile()); + File out = new File(outfile); + assert (out.exists()); + assert (out.canRead()); + assert (out.isFile()); - FileReader outReader = new FileReader(out); - LineNumberReader outCounter = new LineNumberReader(outReader); + FileReader outReader = new FileReader(out); + LineNumberReader outCounter = new LineNumberReader(outReader); - while(outCounter.readLine() != null) {} + while (outCounter.readLine() != null) {} - assert (outCounter.getLineNumber() == 10000); + assert (outCounter.getLineNumber() == 10000); - } + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/providers/TwitterStreamProviderIT.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/providers/TwitterStreamProviderIT.java b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/providers/TwitterStreamProviderIT.java index c553bf3..12279b9 100644 --- a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/providers/TwitterStreamProviderIT.java +++ b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/providers/TwitterStreamProviderIT.java @@ -19,6 +19,7 @@ package org.apache.streams.twitter.test.providers; import org.apache.streams.twitter.provider.TwitterStreamProvider; + import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -29,39 +30,39 @@ import java.io.LineNumberReader; public class TwitterStreamProviderIT { - private static final Logger LOGGER = LoggerFactory.getLogger(TwitterStreamProviderIT.class); + private static final Logger LOGGER = LoggerFactory.getLogger(TwitterStreamProviderIT.class); - final String outfile = "./target/test-classes/TwitterStreamProviderIT.stdout.txt"; - final String configfile = "./target/test-classes/TwitterStreamProviderIT.conf"; + final String outfile = "./target/test-classes/TwitterStreamProviderIT.stdout.txt"; + final String configfile = "./target/test-classes/TwitterStreamProviderIT.conf"; - @Test - public void testTwitterStreamProvider() throws Exception { + @Test + public void testTwitterStreamProvider() throws Exception { - String[] args = new String[2]; - args[0] = configfile; - args[1] = outfile; + String[] args = new String[2]; + args[0] = configfile; + args[1] = outfile; - Thread testThread = new Thread((Runnable) () -> { - try { - TwitterStreamProvider.main(args); - } catch( Exception e ) { - LOGGER.error("Test Exception!", e); - } - }); - testThread.start(); - testThread.join(60000); + Thread testThread = new Thread((Runnable) () -> { + try { + TwitterStreamProvider.main(args); + } catch ( Exception ex ) { + LOGGER.error("Test Exception!", ex); + } + }); + testThread.start(); + testThread.join(60000); - File out = new File(outfile); - assert (out.exists()); - assert (out.canRead()); - assert (out.isFile()); + File out = new File(outfile); + assert (out.exists()); + assert (out.canRead()); + assert (out.isFile()); - FileReader outReader = new FileReader(out); - LineNumberReader outCounter = new LineNumberReader(outReader); + FileReader outReader = new FileReader(out); + LineNumberReader outCounter = new LineNumberReader(outReader); - while(outCounter.readLine() != null) {} + while (outCounter.readLine() != null) {} - assert (outCounter.getLineNumber() > 25); + assert (outCounter.getLineNumber() > 25); - } + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/providers/TwitterTimelineProviderIT.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/providers/TwitterTimelineProviderIT.java b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/providers/TwitterTimelineProviderIT.java index dadfb54..6bb7f20 100644 --- a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/providers/TwitterTimelineProviderIT.java +++ b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/providers/TwitterTimelineProviderIT.java @@ -19,6 +19,7 @@ package org.apache.streams.twitter.test.providers; import org.apache.streams.twitter.provider.TwitterTimelineProvider; + import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -29,39 +30,39 @@ import java.io.LineNumberReader; public class TwitterTimelineProviderIT { - private static final Logger LOGGER = LoggerFactory.getLogger(TwitterTimelineProviderIT.class); + private static final Logger LOGGER = LoggerFactory.getLogger(TwitterTimelineProviderIT.class); - @Test - public void testTwitterTimelineProvider() throws Exception { + @Test + public void testTwitterTimelineProvider() throws Exception { - String configfile = "./target/test-classes/TwitterTimelineProviderIT.conf"; - String outfile = "./target/test-classes/TwitterTimelineProviderIT.stdout.txt"; + String configfile = "./target/test-classes/TwitterTimelineProviderIT.conf"; + String outfile = "./target/test-classes/TwitterTimelineProviderIT.stdout.txt"; - String[] args = new String[2]; - args[0] = configfile; - args[1] = outfile; + String[] args = new String[2]; + args[0] = configfile; + args[1] = outfile; - Thread testThread = new Thread((Runnable) () -> { - try { - TwitterTimelineProvider.main(args); - } catch( Exception e ) { - LOGGER.error("Test Exception!", e); - } - }); - testThread.start(); - testThread.join(60000); + Thread testThread = new Thread((Runnable) () -> { + try { + TwitterTimelineProvider.main(args); + } catch ( Exception ex ) { + LOGGER.error("Test Exception!", ex); + } + }); + testThread.start(); + testThread.join(60000); - File out = new File(outfile); - assert (out.exists()); - assert (out.canRead()); - assert (out.isFile()); + File out = new File(outfile); + assert (out.exists()); + assert (out.canRead()); + assert (out.isFile()); - FileReader outReader = new FileReader(out); - LineNumberReader outCounter = new LineNumberReader(outReader); + FileReader outReader = new FileReader(out); + LineNumberReader outCounter = new LineNumberReader(outReader); - while(outCounter.readLine() != null) {} + while (outCounter.readLine() != null) {} - assert (outCounter.getLineNumber() == 1000); + assert (outCounter.getLineNumber() == 1000); - } + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/providers/TwitterUserInformationProviderIT.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/providers/TwitterUserInformationProviderIT.java b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/providers/TwitterUserInformationProviderIT.java index f3ed958..bba6c20 100644 --- a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/providers/TwitterUserInformationProviderIT.java +++ b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/providers/TwitterUserInformationProviderIT.java @@ -19,6 +19,7 @@ package org.apache.streams.twitter.test.providers; import org.apache.streams.twitter.provider.TwitterUserInformationProvider; + import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -29,39 +30,39 @@ import java.io.LineNumberReader; public class TwitterUserInformationProviderIT { - private static final Logger LOGGER = LoggerFactory.getLogger(TwitterUserInformationProviderIT.class); + private static final Logger LOGGER = LoggerFactory.getLogger(TwitterUserInformationProviderIT.class); - @Test - public void testTwitterUserInformationProvider() throws Exception { + @Test + public void testTwitterUserInformationProvider() throws Exception { - String configfile = "./target/test-classes/TwitterUserInformationProviderIT.conf"; - String outfile = "./target/test-classes/TwitterUserInformationProviderIT.stdout.txt"; + String configfile = "./target/test-classes/TwitterUserInformationProviderIT.conf"; + String outfile = "./target/test-classes/TwitterUserInformationProviderIT.stdout.txt"; - String[] args = new String[2]; - args[0] = configfile; - args[1] = outfile; + String[] args = new String[2]; + args[0] = configfile; + args[1] = outfile; - Thread testThread = new Thread((Runnable) () -> { - try { - TwitterUserInformationProvider.main(args); - } catch( Exception e ) { - LOGGER.error("Test Exception!", e); - } - }); - testThread.start(); - testThread.join(60000); + Thread testThread = new Thread((Runnable) () -> { + try { + TwitterUserInformationProvider.main(args); + } catch ( Exception ex ) { + LOGGER.error("Test Exception!", ex); + } + }); + testThread.start(); + testThread.join(60000); - File out = new File(outfile); - assert (out.exists()); - assert (out.canRead()); - assert (out.isFile()); + File out = new File(outfile); + assert (out.exists()); + assert (out.canRead()); + assert (out.isFile()); - FileReader outReader = new FileReader(out); - LineNumberReader outCounter = new LineNumberReader(outReader); + FileReader outReader = new FileReader(out); + LineNumberReader outCounter = new LineNumberReader(outReader); - while(outCounter.readLine() != null) {} + while (outCounter.readLine() != null) {} - assert (outCounter.getLineNumber() > 750); + assert (outCounter.getLineNumber() > 750); - } + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/utils/TwitterActivityConvertersTest.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/utils/TwitterActivityConvertersTest.java b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/utils/TwitterActivityConvertersTest.java index 51f6294..24d646b 100644 --- a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/utils/TwitterActivityConvertersTest.java +++ b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/utils/TwitterActivityConvertersTest.java @@ -18,8 +18,6 @@ package org.apache.streams.twitter.test.utils; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.collect.Lists; import org.apache.streams.converter.ActivityConverterUtil; import org.apache.streams.data.util.ActivityUtil; import org.apache.streams.jackson.StreamsJacksonMapper; @@ -29,6 +27,9 @@ import org.apache.streams.twitter.pojo.Delete; import org.apache.streams.twitter.pojo.Follow; import org.apache.streams.twitter.pojo.Retweet; import org.apache.streams.twitter.pojo.Tweet; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.Lists; import org.junit.Assert; import org.junit.Test; import org.slf4j.Logger; @@ -41,90 +42,98 @@ import java.util.List; */ public class TwitterActivityConvertersTest { - private final static Logger LOGGER = LoggerFactory.getLogger(TwitterActivityConvertersTest.class); + private static final Logger LOGGER = LoggerFactory.getLogger(TwitterActivityConvertersTest.class); - private ObjectMapper mapper = StreamsJacksonMapper.getInstance(Lists.newArrayList(TwitterDateTimeFormat.TWITTER_FORMAT)); + private ObjectMapper mapper = StreamsJacksonMapper.getInstance(Lists.newArrayList(TwitterDateTimeFormat.TWITTER_FORMAT)); - private ActivityConverterUtil activityConverterUtil = ActivityConverterUtil.getInstance(); + private ActivityConverterUtil activityConverterUtil = ActivityConverterUtil.getInstance(); - private String tweetJson = "{\"created_at\":\"Wed Dec 11 22:27:34 +0000 2013\",\"id\":12345,\"id_str\":\"12345\",\"text\":\"text\",\"source\":\"source\",\"truncated\":false,\"in_reply_to_status_id\":null,\"in_reply_to_status_id_str\":null,\"in_reply_to_user_id\":null,\"in_reply_to_user_id_str\":null,\"in_reply_to_screen_name\":null,\"user\":{\"id\":91407775,\"id_str\":\"12345\",\"name\":\"name\",\"screen_name\":\"screen_name\",\"location\":\"\",\"url\":null,\"description\":null,\"protected\":false,\"followers_count\":136,\"friends_count\":0,\"listed_count\":1,\"created_at\":\"Fri Nov 20 19:29:02 +0000 2009\",\"favourites_count\":0,\"utc_offset\":null,\"time_zone\":null,\"geo_enabled\":false,\"verified\":false,\"statuses_count\":1793,\"lang\":\"en\",\"contributors_enabled\":false,\"is_translator\":false,\"profile_background_color\":\"C0DEED\",\"profile_background_image_url\":\"http:\\/\\/profile_background_image_url.png\",\"profile_background_image_url_https\":\"https:\\/\\/profi le_background_image_url_https.png\",\"profile_background_tile\":false,\"profile_image_url\":\"http:\\/\\/profile_image_url.jpg\",\"profile_image_url_https\":\"https:\\/\\/profile_image_url_https.jpg\",\"profile_link_color\":\"0084B4\",\"profile_sidebar_border_color\":\"C0DEED\",\"profile_sidebar_fill_color\":\"DDEEF6\",\"profile_text_color\":\"333333\",\"profile_use_background_image\":true,\"default_profile\":true,\"default_profile_image\":false,\"following\":null,\"follow_request_sent\":null,\"notifications\":null},\"geo\":null,\"coordinates\":null,\"place\":null,\"contributors\":null,\"retweet_count\":0,\"favorite_count\":0,\"entities\":{\"hashtags\":[],\"symbols\":[],\"urls\":[{\"url\":\"http:\\/\\/url\",\"expanded_url\":\"http:\\/\\/expanded_url\",\"display_url\":\"display_url\",\"indices\":[118,140]}],\"user_mentions\":[]},\"favorited\":false,\"retweeted\":false,\"possibly_sensitive\":false,\"filter_level\":\"medium\",\"lang\":\"en\"}\n"; - private String retweetJson = "{\"created_at\":\"Wed Dec 11 22:27:34 +0000 2013\",\"id\":23456,\"id_str\":\"23456\",\"text\":\"text\",\"source\":\"web\",\"truncated\":false,\"in_reply_to_status_id\":null,\"in_reply_to_status_id_str\":null,\"in_reply_to_user_id\":null,\"in_reply_to_user_id_str\":null,\"in_reply_to_screen_name\":null,\"user\":{\"id\":163149656,\"id_str\":\"34567\",\"name\":\"name\",\"screen_name\":\"screen_name\",\"location\":\"location\",\"url\":\"http:\\/\\/www.youtube.com\\/watch?v=url\",\"description\":\"description\\u00ed\",\"protected\":false,\"followers_count\":41,\"friends_count\":75,\"listed_count\":2,\"created_at\":\"Mon Jul 05 17:35:49 +0000 2010\",\"favourites_count\":4697,\"utc_offset\":-10800,\"time_zone\":\"Buenos Aires\",\"geo_enabled\":false,\"verified\":false,\"statuses_count\":5257,\"lang\":\"es\",\"contributors_enabled\":false,\"is_translator\":false,\"profile_background_color\":\"C4A64B\",\"profile_background_image_url\":\"http:\\/\\/a0.twimg.c om\\/profile_background_images\\/12345\\/12345.jpeg\",\"profile_background_image_url_https\":\"https:\\/\\/si0.twimg.com\\/profile_background_images\\/12345\\/12345.jpeg\",\"profile_background_tile\":true,\"profile_image_url\":\"http:\\/\\/pbs.twimg.com\\/profile_images\\/12345\\/12345.jpeg\",\"profile_image_url_https\":\"https:\\/\\/pbs.twimg.com\\/profile_images\\/12345\\/12345.jpeg\",\"profile_banner_url\":\"https:\\/\\/pbs.twimg.com\\/profile_banners\\/12345\\/12345\",\"profile_link_color\":\"BF415A\",\"profile_sidebar_border_color\":\"000000\",\"profile_sidebar_fill_color\":\"B17CED\",\"profile_text_color\":\"3D1957\",\"profile_use_background_image\":true,\"default_profile\":false,\"default_profile_image\":false,\"following\":null,\"follow_request_sent\":null,\"notifications\":null},\"geo\":null,\"coordinates\":null,\"place\":null,\"contributors\":null,\"retweeted_status\":{\"created_at\":\"Wed Dec 11 22:25:06 +0000 2013\",\"id\":34567,\"id_str\":\"34567\",\"text\":\"text\",\"s ource\":\"source\",\"truncated\":false,\"in_reply_to_status_id\":null,\"in_reply_to_status_id_str\":null,\"in_reply_to_user_id\":null,\"in_reply_to_user_id_str\":null,\"in_reply_to_screen_name\":null,\"user\":{\"id\":34567,\"id_str\":\"34567\",\"name\":\"name\",\"screen_name\":\"screen_name\",\"location\":\"\",\"url\":\"http:\\/\\/www.web.com\",\"description\":\"description\",\"protected\":false,\"followers_count\":34307,\"friends_count\":325,\"listed_count\":361,\"created_at\":\"Fri Apr 13 19:00:11 +0000 2012\",\"favourites_count\":44956,\"utc_offset\":3600,\"time_zone\":\"Madrid\",\"geo_enabled\":false,\"verified\":false,\"statuses_count\":24011,\"lang\":\"es\",\"contributors_enabled\":false,\"is_translator\":false,\"profile_background_color\":\"000000\",\"profile_background_image_url\":\"http:\\/\\/profile_background_image_url.jpeg\",\"profile_background_image_url_https\":\"https:\\/\\/si0.twimg.com\\/profile_background_images\\/34567\\/34567.jpeg\",\"profile_background_tile\":fa lse,\"profile_image_url\":\"http:\\/\\/pbs.twimg.com\\/profile_images\\/34567\\/34567.gif\",\"profile_image_url_https\":\"https:\\/\\/pbs.twimg.com\\/profile_images\\/34567\\/34567.gif\",\"profile_banner_url\":\"https:\\/\\/pbs.twimg.com\\/profile_banners\\/34567\\/34567\",\"profile_link_color\":\"FF00E1\",\"profile_sidebar_border_color\":\"FFFFFF\",\"profile_sidebar_fill_color\":\"F3F3F3\",\"profile_text_color\":\"333333\",\"profile_use_background_image\":true,\"default_profile\":false,\"default_profile_image\":false,\"following\":null,\"follow_request_sent\":null,\"notifications\":null},\"geo\":null,\"coordinates\":null,\"place\":null,\"contributors\":null,\"retweet_count\":9,\"favorite_count\":6,\"entities\":{\"hashtags\":[],\"symbols\":[],\"urls\":[],\"user_mentions\":[]},\"favorited\":false,\"retweeted\":false,\"lang\":\"es\"},\"retweet_count\":0,\"favorite_count\":0,\"entities\":{\"hashtags\":[],\"symbols\":[],\"urls\":[],\"user_mentions\":[{\"screen_name\":\"screen_name\",\"n ame\":\"name emocional\",\"id\":45678,\"id_str\":\"45678\",\"indices\":[3,14]}]},\"favorited\":false,\"retweeted\":false,\"filter_level\":\"medium\",\"lang\":\"es\"}\n"; + private String tweetJson = "{\"created_at\":\"Wed Dec 11 22:27:34 +0000 2013\",\"id\":12345,\"id_str\":\"12345\",\"text\":\"text\",\"source\":\"source\",\"truncated\":false,\"in_reply_to_status_id\":null,\"in_reply_to_status_id_str\":null,\"in_reply_to_user_id\":null,\"in_reply_to_user_id_str\":null,\"in_reply_to_screen_name\":null,\"user\":{\"id\":91407775,\"id_str\":\"12345\",\"name\":\"name\",\"screen_name\":\"screen_name\",\"location\":\"\",\"url\":null,\"description\":null,\"protected\":false,\"followers_count\":136,\"friends_count\":0,\"listed_count\":1,\"created_at\":\"Fri Nov 20 19:29:02 +0000 2009\",\"favourites_count\":0,\"utc_offset\":null,\"time_zone\":null,\"geo_enabled\":false,\"verified\":false,\"statuses_count\":1793,\"lang\":\"en\",\"contributors_enabled\":false,\"is_translator\":false,\"profile_background_color\":\"C0DEED\",\"profile_background_image_url\":\"http:\\/\\/profile_background_image_url.png\",\"profile_background_image_url_https\":\"https:\\/\\/profile _background_image_url_https.png\",\"profile_background_tile\":false,\"profile_image_url\":\"http:\\/\\/profile_image_url.jpg\",\"profile_image_url_https\":\"https:\\/\\/profile_image_url_https.jpg\",\"profile_link_color\":\"0084B4\",\"profile_sidebar_border_color\":\"C0DEED\",\"profile_sidebar_fill_color\":\"DDEEF6\",\"profile_text_color\":\"333333\",\"profile_use_background_image\":true,\"default_profile\":true,\"default_profile_image\":false,\"following\":null,\"follow_request_sent\":null,\"notifications\":null},\"geo\":null,\"coordinates\":null,\"place\":null,\"contributors\":null,\"retweet_count\":0,\"favorite_count\":0,\"entities\":{\"hashtags\":[],\"symbols\":[],\"urls\":[{\"url\":\"http:\\/\\/url\",\"expanded_url\":\"http:\\/\\/expanded_url\",\"display_url\":\"display_url\",\"indices\":[118,140]}],\"user_mentions\":[]},\"favorited\":false,\"retweeted\":false,\"possibly_sensitive\":false,\"filter_level\":\"medium\",\"lang\":\"en\"}\n"; + private String retweetJson = "{\"created_at\":\"Wed Dec 11 22:27:34 +0000 2013\",\"id\":23456,\"id_str\":\"23456\",\"text\":\"text\",\"source\":\"web\",\"truncated\":false,\"in_reply_to_status_id\":null,\"in_reply_to_status_id_str\":null,\"in_reply_to_user_id\":null,\"in_reply_to_user_id_str\":null,\"in_reply_to_screen_name\":null,\"user\":{\"id\":163149656,\"id_str\":\"34567\",\"name\":\"name\",\"screen_name\":\"screen_name\",\"location\":\"location\",\"url\":\"http:\\/\\/www.youtube.com\\/watch?v=url\",\"description\":\"description\\u00ed\",\"protected\":false,\"followers_count\":41,\"friends_count\":75,\"listed_count\":2,\"created_at\":\"Mon Jul 05 17:35:49 +0000 2010\",\"favourites_count\":4697,\"utc_offset\":-10800,\"time_zone\":\"Buenos Aires\",\"geo_enabled\":false,\"verified\":false,\"statuses_count\":5257,\"lang\":\"es\",\"contributors_enabled\":false,\"is_translator\":false,\"profile_background_color\":\"C4A64B\",\"profile_background_image_url\":\"http:\\/\\/a0.twimg.com \\/profile_background_images\\/12345\\/12345.jpeg\",\"profile_background_image_url_https\":\"https:\\/\\/si0.twimg.com\\/profile_background_images\\/12345\\/12345.jpeg\",\"profile_background_tile\":true,\"profile_image_url\":\"http:\\/\\/pbs.twimg.com\\/profile_images\\/12345\\/12345.jpeg\",\"profile_image_url_https\":\"https:\\/\\/pbs.twimg.com\\/profile_images\\/12345\\/12345.jpeg\",\"profile_banner_url\":\"https:\\/\\/pbs.twimg.com\\/profile_banners\\/12345\\/12345\",\"profile_link_color\":\"BF415A\",\"profile_sidebar_border_color\":\"000000\",\"profile_sidebar_fill_color\":\"B17CED\",\"profile_text_color\":\"3D1957\",\"profile_use_background_image\":true,\"default_profile\":false,\"default_profile_image\":false,\"following\":null,\"follow_request_sent\":null,\"notifications\":null},\"geo\":null,\"coordinates\":null,\"place\":null,\"contributors\":null,\"retweeted_status\":{\"created_at\":\"Wed Dec 11 22:25:06 +0000 2013\",\"id\":34567,\"id_str\":\"34567\",\"text\":\"text\",\"sou rce\":\"source\",\"truncated\":false,\"in_reply_to_status_id\":null,\"in_reply_to_status_id_str\":null,\"in_reply_to_user_id\":null,\"in_reply_to_user_id_str\":null,\"in_reply_to_screen_name\":null,\"user\":{\"id\":34567,\"id_str\":\"34567\",\"name\":\"name\",\"screen_name\":\"screen_name\",\"location\":\"\",\"url\":\"http:\\/\\/www.web.com\",\"description\":\"description\",\"protected\":false,\"followers_count\":34307,\"friends_count\":325,\"listed_count\":361,\"created_at\":\"Fri Apr 13 19:00:11 +0000 2012\",\"favourites_count\":44956,\"utc_offset\":3600,\"time_zone\":\"Madrid\",\"geo_enabled\":false,\"verified\":false,\"statuses_count\":24011,\"lang\":\"es\",\"contributors_enabled\":false,\"is_translator\":false,\"profile_background_color\":\"000000\",\"profile_background_image_url\":\"http:\\/\\/profile_background_image_url.jpeg\",\"profile_background_image_url_https\":\"https:\\/\\/si0.twimg.com\\/profile_background_images\\/34567\\/34567.jpeg\",\"profile_background_tile\":fals e,\"profile_image_url\":\"http:\\/\\/pbs.twimg.com\\/profile_images\\/34567\\/34567.gif\",\"profile_image_url_https\":\"https:\\/\\/pbs.twimg.com\\/profile_images\\/34567\\/34567.gif\",\"profile_banner_url\":\"https:\\/\\/pbs.twimg.com\\/profile_banners\\/34567\\/34567\",\"profile_link_color\":\"FF00E1\",\"profile_sidebar_border_color\":\"FFFFFF\",\"profile_sidebar_fill_color\":\"F3F3F3\",\"profile_text_color\":\"333333\",\"profile_use_background_image\":true,\"default_profile\":false,\"default_profile_image\":false,\"following\":null,\"follow_request_sent\":null,\"notifications\":null},\"geo\":null,\"coordinates\":null,\"place\":null,\"contributors\":null,\"retweet_count\":9,\"favorite_count\":6,\"entities\":{\"hashtags\":[],\"symbols\":[],\"urls\":[],\"user_mentions\":[]},\"favorited\":false,\"retweeted\":false,\"lang\":\"es\"},\"retweet_count\":0,\"favorite_count\":0,\"entities\":{\"hashtags\":[],\"symbols\":[],\"urls\":[],\"user_mentions\":[{\"screen_name\":\"screen_name\",\"nam e\":\"name emocional\",\"id\":45678,\"id_str\":\"45678\",\"indices\":[3,14]}]},\"favorited\":false,\"retweeted\":false,\"filter_level\":\"medium\",\"lang\":\"es\"}\n"; - @Test - public void testConvertTweet() throws Exception { - Tweet tweet = mapper.readValue(tweetJson, Tweet.class); - List<Activity> activityList = activityConverterUtil.convert(tweet); - Assert.assertTrue(activityList.size() == 1); - Activity activity = activityList.get(0); - if( !ActivityUtil.isValid(activity) ) - Assert.fail(); + @Test + public void testConvertTweet() throws Exception { + Tweet tweet = mapper.readValue(tweetJson, Tweet.class); + List<Activity> activityList = activityConverterUtil.convert(tweet); + Assert.assertTrue(activityList.size() == 1); + Activity activity = activityList.get(0); + if ( !ActivityUtil.isValid(activity) ) { + Assert.fail(); } - - @Test - public void testConvertRetweet() throws Exception { - Retweet retweet = mapper.readValue(retweetJson, Retweet.class); - List<Activity> activityList = activityConverterUtil.convert(retweet); - Assert.assertTrue(activityList.size() == 1); - Activity activity = activityList.get(0); - if( !ActivityUtil.isValid(activity) ) - Assert.fail(); + } + + @Test + public void testConvertRetweet() throws Exception { + Retweet retweet = mapper.readValue(retweetJson, Retweet.class); + List<Activity> activityList = activityConverterUtil.convert(retweet); + Assert.assertTrue(activityList.size() == 1); + Activity activity = activityList.get(0); + if ( !ActivityUtil.isValid(activity) ) { + Assert.fail(); } - - @Test - public void testConvertDelete() throws Exception { - Delete delete = mapper.readValue(retweetJson, Delete.class); - List<Activity> activityList = activityConverterUtil.convert(delete); - Assert.assertTrue(activityList.size() == 1); - Activity activity = activityList.get(0); - if( !ActivityUtil.isValid(activity) ) - Assert.fail(); + } + + @Test + public void testConvertDelete() throws Exception { + Delete delete = mapper.readValue(retweetJson, Delete.class); + List<Activity> activityList = activityConverterUtil.convert(delete); + Assert.assertTrue(activityList.size() == 1); + Activity activity = activityList.get(0); + if ( !ActivityUtil.isValid(activity) ) { + Assert.fail(); } - - @Test - public void testConvertFollow() throws Exception { - Follow follow = mapper.readValue(retweetJson, Follow.class); - List<Activity> activityList = activityConverterUtil.convert(follow); - Assert.assertTrue(activityList.size() == 1); - Activity activity = activityList.get(0); - if( !ActivityUtil.isValid(activity) ) - Assert.fail(); + } + + @Test + public void testConvertFollow() throws Exception { + Follow follow = mapper.readValue(retweetJson, Follow.class); + List<Activity> activityList = activityConverterUtil.convert(follow); + Assert.assertTrue(activityList.size() == 1); + Activity activity = activityList.get(0); + if ( !ActivityUtil.isValid(activity) ) { + Assert.fail(); } - - @Test - public void testConvertTweetString() { - List<Activity> activityList = activityConverterUtil.convert(tweetJson); - Assert.assertTrue(activityList.size() == 1); - Activity activity = activityList.get(0); - if( !ActivityUtil.isValid(activity) ) - Assert.fail(); + } + + @Test + public void testConvertTweetString() { + List<Activity> activityList = activityConverterUtil.convert(tweetJson); + Assert.assertTrue(activityList.size() == 1); + Activity activity = activityList.get(0); + if ( !ActivityUtil.isValid(activity) ) { + Assert.fail(); } - - @Test - public void testConvertRetweetString() { - List<Activity> activityList = activityConverterUtil.convert(retweetJson); - Assert.assertTrue(activityList.size() == 1); - Activity activity = activityList.get(0); - if( !ActivityUtil.isValid(activity) ) - Assert.fail(); + } + + @Test + public void testConvertRetweetString() { + List<Activity> activityList = activityConverterUtil.convert(retweetJson); + Assert.assertTrue(activityList.size() == 1); + Activity activity = activityList.get(0); + if ( !ActivityUtil.isValid(activity) ) { + Assert.fail(); } - - @Test - public void testConvertDeleteString() { - String deleteJson = "{\"delete\":{\"status\":{\"id\":56789,\"user_id\":67890,\"id_str\":\"56789\",\"user_id_str\":\"67890\"}}}\n"; - List<Activity> activityList = activityConverterUtil.convert(deleteJson); - Assert.assertTrue(activityList.size() == 1); - Activity activity = activityList.get(0); - if( !ActivityUtil.isValid(activity) ) - Assert.fail(); + } + + @Test + public void testConvertDeleteString() { + String deleteJson = "{\"delete\":{\"status\":{\"id\":56789,\"user_id\":67890,\"id_str\":\"56789\",\"user_id_str\":\"67890\"}}}\n"; + List<Activity> activityList = activityConverterUtil.convert(deleteJson); + Assert.assertTrue(activityList.size() == 1); + Activity activity = activityList.get(0); + if ( !ActivityUtil.isValid(activity) ) { + Assert.fail(); } - - @Test - public void testConvertFollowString() { - String followJson = "{\"follower\":{\"id\":12345},\"followee\":{\"id\":56789}}\n"; - List<Activity> activityList = activityConverterUtil.convert(followJson); - Assert.assertTrue(activityList.size() == 1); - Activity activity = activityList.get(0); - if( !ActivityUtil.isValid(activity) ) - Assert.fail(); + } + + @Test + public void testConvertFollowString() { + String followJson = "{\"follower\":{\"id\":12345},\"followee\":{\"id\":56789}}\n"; + List<Activity> activityList = activityConverterUtil.convert(followJson); + Assert.assertTrue(activityList.size() == 1); + Activity activity = activityList.get(0); + if ( !ActivityUtil.isValid(activity) ) { + Assert.fail(); } + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/utils/TwitterActivityObjectsConvertersTest.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/utils/TwitterActivityObjectsConvertersTest.java b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/utils/TwitterActivityObjectsConvertersTest.java index c110670..4f2a4fd 100644 --- a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/utils/TwitterActivityObjectsConvertersTest.java +++ b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/utils/TwitterActivityObjectsConvertersTest.java @@ -18,11 +18,8 @@ package org.apache.streams.twitter.test.utils; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.collect.Lists; import org.apache.streams.converter.ActivityObjectConverterProcessorConfiguration; import org.apache.streams.converter.ActivityObjectConverterUtil; -import org.apache.streams.data.DocumentClassifier; import org.apache.streams.data.util.ActivityUtil; import org.apache.streams.jackson.StreamsJacksonMapper; import org.apache.streams.pojo.json.ActivityObject; @@ -30,45 +27,48 @@ import org.apache.streams.twitter.converter.TwitterDateTimeFormat; import org.apache.streams.twitter.converter.TwitterDocumentClassifier; import org.apache.streams.twitter.converter.TwitterJsonUserActivityObjectConverter; import org.apache.streams.twitter.pojo.User; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.Lists; import org.junit.Assert; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.ArrayList; - /** * Tests {org.apache.streams.twitter.converter.*} */ public class TwitterActivityObjectsConvertersTest { - private final static Logger LOGGER = LoggerFactory.getLogger(TwitterActivityObjectsConvertersTest.class); + private static final Logger LOGGER = LoggerFactory.getLogger(TwitterActivityObjectsConvertersTest.class); - private ObjectMapper mapper = StreamsJacksonMapper.getInstance(Lists.newArrayList(TwitterDateTimeFormat.TWITTER_FORMAT)); + private ObjectMapper mapper = StreamsJacksonMapper.getInstance(Lists.newArrayList(TwitterDateTimeFormat.TWITTER_FORMAT)); - private ActivityObjectConverterProcessorConfiguration activityObjectConverterProcessorConfiguration = - new ActivityObjectConverterProcessorConfiguration() - .withClassifiers(Lists.newArrayList(new TwitterDocumentClassifier())) - .withConverters(Lists.newArrayList(new TwitterJsonUserActivityObjectConverter())); + private ActivityObjectConverterProcessorConfiguration activityObjectConverterProcessorConfiguration = + new ActivityObjectConverterProcessorConfiguration() + .withClassifiers(Lists.newArrayList(new TwitterDocumentClassifier())) + .withConverters(Lists.newArrayList(new TwitterJsonUserActivityObjectConverter())); - private ActivityObjectConverterUtil activityObjectConverterUtil = ActivityObjectConverterUtil.getInstance(activityObjectConverterProcessorConfiguration); + private ActivityObjectConverterUtil activityObjectConverterUtil = ActivityObjectConverterUtil.getInstance(activityObjectConverterProcessorConfiguration); - private String userJson = "{\"id\":1663018644,\"id_str\":\"1663018644\",\"name\":\"M.R. Clark\",\"screen_name\":\"cantennisfan\",\"location\":\"\",\"url\":null,\"description\":null,\"protected\":false,\"verified\":false,\"followers_count\":0,\"friends_count\":5,\"listed_count\":0,\"favourites_count\":2,\"statuses_count\":72,\"created_at\":\"Sun Aug 11 17:23:47 +0000 2013\",\"utc_offset\":-18000,\"time_zone\":\"Eastern Time (US & Canada)\",\"geo_enabled\":false,\"lang\":\"en\",\"contributors_enabled\":false,\"is_translator\":false,\"profile_background_color\":\"C0DEED\",\"profile_background_image_url\":\"http://abs.twimg.com/images/themes/theme1/bg.png\",\"profile_background_image_url_https\":\"https://abs.twimg.com/images/themes/theme1/bg.png\",\"profile_background_tile\":false,\"profile_link_color\":\"0084B4\",\"profile_sidebar_border_color\":\"C0DEED\",\"profile_sidebar_fill_color\":\"DDEEF6\",\"profile_text_color\":\"333333\",\"profile_use_background_image\":true,\"profile_im age_url\":\"http://abs.twimg.com/sticky/default_profile_images/default_profile_0_normal.png\",\"profile_image_url_https\":\"https://abs.twimg.com/sticky/default_profile_images/default_profile_0_normal.png\",\"default_profile\":true,\"default_profile_image\":true,\"following\":null,\"follow_request_sent\":null,\"notifications\":null,\"status\":{\"created_at\":\"Thu Jan 01 14:11:48 +0000 2015\",\"id\":550655634706669568,\"id_str\":\"550655634706669568\",\"text\":\"CBC Media Centre - CBC - Air Farce New Year's Eve 2014/2015: http://t.co/lMlL9VbC5e\",\"source\":\"<a href=\\\"https://dev.twitter.com/docs/tfw\\\" rel=\\\"nofollow\\\">Twitter for Websites</a>\",\"truncated\":false,\"in_reply_to_status_id\":null,\"in_reply_to_status_id_str\":null,\"in_reply_to_user_id\":null,\"in_reply_to_user_id_str\":null,\"in_reply_to_screen_name\":null,\"geo\":null,\"coordinates\":null,\"place\":null,\"contributors\":null,\"retweet_count\":0,\"favorite_count\":0,\"entities\":{\"hashtags\":[],\"trends\": [],\"urls\":[{\"url\":\"http://t.co/lMlL9VbC5e\",\"expanded_url\":\"http://www.cbc.ca/mediacentre/air-farce-new-years-eve-20142015.html#.VKVVarDhVxR.twitter\",\"display_url\":\"cbc.ca/mediacentre/aiâ¦\",\"indices\":[61,83]}],\"user_mentions\":[],\"symbols\":[]},\"favorited\":false,\"retweeted\":false,\"possibly_sensitive\":false,\"filter_level\":\"medium\",\"lang\":\"en\",\"timestamp_ms\":\"1420121508658\"}}\n"; + private String userJson = "{\"id\":1663018644,\"id_str\":\"1663018644\",\"name\":\"M.R. Clark\",\"screen_name\":\"cantennisfan\",\"location\":\"\",\"url\":null,\"description\":null,\"protected\":false,\"verified\":false,\"followers_count\":0,\"friends_count\":5,\"listed_count\":0,\"favourites_count\":2,\"statuses_count\":72,\"created_at\":\"Sun Aug 11 17:23:47 +0000 2013\",\"utc_offset\":-18000,\"time_zone\":\"Eastern Time (US & Canada)\",\"geo_enabled\":false,\"lang\":\"en\",\"contributors_enabled\":false,\"is_translator\":false,\"profile_background_color\":\"C0DEED\",\"profile_background_image_url\":\"http://abs.twimg.com/images/themes/theme1/bg.png\",\"profile_background_image_url_https\":\"https://abs.twimg.com/images/themes/theme1/bg.png\",\"profile_background_tile\":false,\"profile_link_color\":\"0084B4\",\"profile_sidebar_border_color\":\"C0DEED\",\"profile_sidebar_fill_color\":\"DDEEF6\",\"profile_text_color\":\"333333\",\"profile_use_background_image\":true,\"profile_imag e_url\":\"http://abs.twimg.com/sticky/default_profile_images/default_profile_0_normal.png\",\"profile_image_url_https\":\"https://abs.twimg.com/sticky/default_profile_images/default_profile_0_normal.png\",\"default_profile\":true,\"default_profile_image\":true,\"following\":null,\"follow_request_sent\":null,\"notifications\":null,\"status\":{\"created_at\":\"Thu Jan 01 14:11:48 +0000 2015\",\"id\":550655634706669568,\"id_str\":\"550655634706669568\",\"text\":\"CBC Media Centre - CBC - Air Farce New Year's Eve 2014/2015: http://t.co/lMlL9VbC5e\",\"source\":\"<a href=\\\"https://dev.twitter.com/docs/tfw\\\" rel=\\\"nofollow\\\">Twitter for Websites</a>\",\"truncated\":false,\"in_reply_to_status_id\":null,\"in_reply_to_status_id_str\":null,\"in_reply_to_user_id\":null,\"in_reply_to_user_id_str\":null,\"in_reply_to_screen_name\":null,\"geo\":null,\"coordinates\":null,\"place\":null,\"contributors\":null,\"retweet_count\":0,\"favorite_count\":0,\"entities\":{\"hashtags\":[],\"trends\":[] ,\"urls\":[{\"url\":\"http://t.co/lMlL9VbC5e\",\"expanded_url\":\"http://www.cbc.ca/mediacentre/air-farce-new-years-eve-20142015.html#.VKVVarDhVxR.twitter\",\"display_url\":\"cbc.ca/mediacentre/aiâ¦\",\"indices\":[61,83]}],\"user_mentions\":[],\"symbols\":[]},\"favorited\":false,\"retweeted\":false,\"possibly_sensitive\":false,\"filter_level\":\"medium\",\"lang\":\"en\",\"timestamp_ms\":\"1420121508658\"}}\n"; - @Test - public void testConvertUser() throws Exception { - User user = mapper.readValue(userJson, User.class); - ActivityObject activityObject = activityObjectConverterUtil.convert(user); - assert( activityObject != null ); - if( !ActivityUtil.isValid(activityObject) ) - Assert.fail(); + @Test + public void testConvertUser() throws Exception { + User user = mapper.readValue(userJson, User.class); + ActivityObject activityObject = activityObjectConverterUtil.convert(user); + assert ( activityObject != null ); + if ( !ActivityUtil.isValid(activityObject) ) { + Assert.fail(); } + } - @Test - public void testConvertUserString() { - ActivityObject activityObject = activityObjectConverterUtil.convert(userJson); - assert( activityObject != null ); - if( !ActivityUtil.isValid(activityObject) ) - Assert.fail(); + @Test + public void testConvertUserString() { + ActivityObject activityObject = activityObjectConverterUtil.convert(userJson); + assert ( activityObject != null ); + if ( !ActivityUtil.isValid(activityObject) ) { + Assert.fail(); } + } }
