Repository: apex-malhar Updated Branches: refs/heads/master 5fa5b275c -> 15e42c991
APEXMALHAR-2504 Allow for customization of TwitterSampleInput ConfigurationBuilder Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/15e42c99 Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/15e42c99 Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/15e42c99 Branch: refs/heads/master Commit: 15e42c991f0ea08626d5ad3fd33786501cc911b6 Parents: 5fa5b27 Author: Thomas Weise <[email protected]> Authored: Sat May 27 21:37:43 2017 -0700 Committer: Thomas Weise <[email protected]> Committed: Sat Jun 3 16:02:43 2017 -0700 ---------------------------------------------------------------------- .../contrib/twitter/TwitterSampleInput.java | 30 +++++++++++--------- 1 file changed, 16 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/15e42c99/contrib/src/main/java/com/datatorrent/contrib/twitter/TwitterSampleInput.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/com/datatorrent/contrib/twitter/TwitterSampleInput.java b/contrib/src/main/java/com/datatorrent/contrib/twitter/TwitterSampleInput.java index 697dd88..a93fbb1 100644 --- a/contrib/src/main/java/com/datatorrent/contrib/twitter/TwitterSampleInput.java +++ b/contrib/src/main/java/com/datatorrent/contrib/twitter/TwitterSampleInput.java @@ -56,19 +56,19 @@ public class TwitterSampleInput implements InputOperator, ActivationListener<Ope /** * This is the output port on which the twitter status information is emitted. */ - public final transient DefaultOutputPort<Status> status = new DefaultOutputPort<Status>(); + public final transient DefaultOutputPort<Status> status = new DefaultOutputPort<>(); /** * This is the output port on which the twitter text is emitted. */ - public final transient DefaultOutputPort<String> text = new DefaultOutputPort<String>(); + public final transient DefaultOutputPort<String> text = new DefaultOutputPort<>(); /** * This is the output port on which the twitter url is emitted. */ - public final transient DefaultOutputPort<String> url = new DefaultOutputPort<String>(); + public final transient DefaultOutputPort<String> url = new DefaultOutputPort<>(); /** * This is the output port on which the twitter hashtags are emitted. */ - public final transient DefaultOutputPort<String> hashtag = new DefaultOutputPort<String>(); + public final transient DefaultOutputPort<String> hashtag = new DefaultOutputPort<>(); /* the following 3 ports are not implemented so far */ public final transient DefaultOutputPort<?> userMention = null; @@ -82,8 +82,8 @@ public class TwitterSampleInput implements InputOperator, ActivationListener<Ope */ private transient Thread operatorThread; private transient TwitterStream ts; - private transient ArrayBlockingQueue<Status> statuses = new ArrayBlockingQueue<Status>(1024 * 1024); - transient int count; + private transient ArrayBlockingQueue<Status> statuses = new ArrayBlockingQueue<>(1024 * 1024); + protected transient int count; /** * The state which we would like to save for this operator. */ @@ -112,13 +112,7 @@ public class TwitterSampleInput implements InputOperator, ActivationListener<Ope logger.info("Load set to be {}% of the entire twitter feed", feedMultiplier); } - ConfigurationBuilder cb = new ConfigurationBuilder(); - cb.setDebugEnabled(debug). - setOAuthConsumerKey(consumerKey). - setOAuthConsumerSecret(consumerSecret). - setOAuthAccessToken(accessToken). - setOAuthAccessTokenSecret(accessTokenSecret); - + ConfigurationBuilder cb = setupConfigurationBuilder(); ts = new TwitterStreamFactory(cb.build()).getInstance(); } @@ -204,7 +198,10 @@ public class TwitterSampleInput implements InputOperator, ActivationListener<Ope } } - private void setUpTwitterConnection() + /** + * Allow derived classes to customize the configuration + */ + protected ConfigurationBuilder setupConfigurationBuilder() { ConfigurationBuilder cb = new ConfigurationBuilder(); cb.setDebugEnabled(debug). @@ -212,7 +209,12 @@ public class TwitterSampleInput implements InputOperator, ActivationListener<Ope setOAuthConsumerSecret(consumerSecret). setOAuthAccessToken(accessToken). setOAuthAccessTokenSecret(accessTokenSecret); + return cb; + } + private void setUpTwitterConnection() + { + ConfigurationBuilder cb = setupConfigurationBuilder(); ts = new TwitterStreamFactory(cb.build()).getInstance(); ts.addListener(TwitterSampleInput.this); // we can only listen to tweets containing links by callng ts.links().
