fixes while testing flink examples
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/9495cf52 Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/9495cf52 Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/9495cf52 Branch: refs/heads/master Commit: 9495cf52e3d1c5d5100566364bfa30447555682a Parents: d9e58cd Author: Steve Blackmon @steveblackmon <[email protected]> Authored: Wed Oct 5 16:41:48 2016 -0500 Committer: Steve Blackmon @steveblackmon <[email protected]> Committed: Wed Oct 5 16:41:48 2016 -0500 ---------------------------------------------------------------------- .../provider/TwitterFollowingProvider.java | 9 ++-- .../provider/TwitterFollowingProviderTask.java | 52 +++++++++----------- .../provider/TwitterStreamProcessor.java | 5 +- .../TwitterUserInformationProvider.java | 6 ++- 4 files changed, 34 insertions(+), 38 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9495cf52/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java index 27c8526..4c3a828 100644 --- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java +++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java @@ -88,7 +88,7 @@ public class TwitterFollowingProvider extends TwitterUserInformationProvider { Twitter client = getTwitterClient(); for (int i = 0; i < ids.length; i++) { - TwitterFollowingProviderTask providerTask = new TwitterFollowingProviderTask(this, client, ids[i], getConfig().getEndpoint(), getConfig().getIdsOnly()); + TwitterFollowingProviderTask providerTask = new TwitterFollowingProviderTask(this, client, ids[i]); executor.submit(providerTask); } } @@ -97,7 +97,7 @@ public class TwitterFollowingProvider extends TwitterUserInformationProvider { Twitter client = getTwitterClient(); for (int i = 0; i < screenNames.length; i++) { - TwitterFollowingProviderTask providerTask = new TwitterFollowingProviderTask(this, client, screenNames[i], getConfig().getEndpoint(), getConfig().getIdsOnly()); + TwitterFollowingProviderTask providerTask = new TwitterFollowingProviderTask(this, client, screenNames[i]); executor.submit(providerTask); } @@ -106,7 +106,7 @@ public class TwitterFollowingProvider extends TwitterUserInformationProvider { @Override public StreamsResultSet readCurrent() { - LOGGER.debug("Providing {} docs", providerQueue.size()); + LOGGER.info("{}{} - readCurrent", idsBatches, screenNameBatches); StreamsResultSet result; @@ -115,12 +115,13 @@ public class TwitterFollowingProvider extends TwitterUserInformationProvider { result = new StreamsResultSet(providerQueue); result.setCounter(new DatumStatusCounter()); providerQueue = constructQueue(); + LOGGER.debug("{}{} - providing {} docs", idsBatches, screenNameBatches, result.size()); } finally { lock.writeLock().unlock(); } if (providerQueue.isEmpty() && executor.isTerminated()) { - LOGGER.info("Finished. Cleaning up..."); + LOGGER.info("{}{} - completed", idsBatches, screenNameBatches); running.set(false); http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9495cf52/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProviderTask.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProviderTask.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProviderTask.java index cc71d48..f2346fb 100644 --- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProviderTask.java +++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProviderTask.java @@ -44,27 +44,20 @@ public class TwitterFollowingProviderTask implements Runnable { protected TwitterFollowingProvider provider; protected Twitter client; protected Long id; - protected Boolean idsOnly; protected String screenName; - protected String endpoint; - private int max_per_page = 200; int count = 0; - public TwitterFollowingProviderTask(TwitterFollowingProvider provider, Twitter twitter, Long id, String endpoint, Boolean idsOnly) { + public TwitterFollowingProviderTask(TwitterFollowingProvider provider, Twitter twitter, Long id) { this.provider = provider; this.client = twitter; this.id = id; - this.endpoint = endpoint; - this.idsOnly = idsOnly; } - public TwitterFollowingProviderTask(TwitterFollowingProvider provider, Twitter twitter, String screenName, String endpoint, Boolean idsOnly) { + public TwitterFollowingProviderTask(TwitterFollowingProvider provider, Twitter twitter, String screenName) { this.provider = provider; this.client = twitter; this.screenName = screenName; - this.endpoint = endpoint; - this.idsOnly = idsOnly; } @@ -84,9 +77,9 @@ public class TwitterFollowingProviderTask implements Runnable { protected void getFollowing(Long id) { - Preconditions.checkArgument(endpoint.equals("friends") || endpoint.equals("followers")); + Preconditions.checkArgument(provider.getConfig().getEndpoint().equals("friends") || provider.getConfig().getEndpoint().equals("followers")); - if( idsOnly ) + if( provider.getConfig().getIdsOnly() ) collectIds(id); else collectUsers(id); @@ -112,10 +105,10 @@ public class TwitterFollowingProviderTask implements Runnable { } PagableResponseList<twitter4j.User> list = null; - if( endpoint.equals("followers") ) - list = client.friendsFollowers().getFollowersList(id.longValue(), curser, max_per_page); - else if( endpoint.equals("friends") ) - list = client.friendsFollowers().getFriendsList(id.longValue(), curser, max_per_page); + if( provider.getConfig().getEndpoint().equals("followers") ) + list = client.friendsFollowers().getFollowersList(id.longValue(), curser, provider.getConfig().getMaxItems().intValue()); + else if( provider.getConfig().getEndpoint().equals("friends") ) + list = client.friendsFollowers().getFriendsList(id.longValue(), curser, provider.getConfig().getMaxItems().intValue()); Preconditions.checkNotNull(list); Preconditions.checkArgument(list.size() > 0); @@ -126,11 +119,11 @@ public class TwitterFollowingProviderTask implements Runnable { try { Follow follow = null; - if( endpoint.equals("followers") ) { + if( provider.getConfig().getEndpoint().equals("followers") ) { follow = new Follow() .withFollowee(mapper.readValue(userJson, User.class)) .withFollower(mapper.readValue(otherJson, User.class)); - } else if( endpoint.equals("friends") ) { + } else if( provider.getConfig().getEndpoint().equals("friends") ) { follow = new Follow() .withFollowee(mapper.readValue(otherJson, User.class)) .withFollower(mapper.readValue(userJson, User.class)); @@ -147,9 +140,9 @@ public class TwitterFollowingProviderTask implements Runnable { LOGGER.warn("Exception: {}", e); } } - if( list.size() == max_per_page ) - curser = list.getNextCursor(); - else break; + if( !list.hasNext() ) break; + if( list.getNextCursor() == 0 ) break; + curser = list.getNextCursor(); } catch(TwitterException twitterException) { keepTrying += TwitterErrorHandler.handleTwitterError(client, twitterException); @@ -170,10 +163,10 @@ public class TwitterFollowingProviderTask implements Runnable { try { twitter4j.IDs ids = null; - if( endpoint.equals("followers") ) - ids = client.friendsFollowers().getFollowersIDs(id.longValue(), curser, max_per_page); - else if( endpoint.equals("friends") ) - ids = client.friendsFollowers().getFriendsIDs(id.longValue(), curser, max_per_page); + if( provider.getConfig().getEndpoint().equals("followers") ) + ids = client.friendsFollowers().getFollowersIDs(id.longValue(), curser, provider.getConfig().getMaxItems().intValue()); + else if( provider.getConfig().getEndpoint().equals("friends") ) + ids = client.friendsFollowers().getFriendsIDs(id.longValue(), curser, provider.getConfig().getMaxItems().intValue()); Preconditions.checkNotNull(ids); Preconditions.checkArgument(ids.getIDs().length > 0); @@ -182,16 +175,15 @@ public class TwitterFollowingProviderTask implements Runnable { try { Follow follow = null; - if( endpoint.equals("followers") ) { + if( provider.getConfig().getEndpoint().equals("followers") ) { follow = new Follow() .withFollowee(new User().withId(id)) .withFollower(new User().withId(otherId)); - } else if( endpoint.equals("friends") ) { + } else if( provider.getConfig().getEndpoint().equals("friends") ) { follow = new Follow() .withFollowee(new User().withId(otherId)) .withFollower(new User().withId(id)); } - ComponentUtils.offerUntilSuccess(new StreamsDatum(follow), provider.providerQueue); Preconditions.checkNotNull(follow); @@ -203,9 +195,9 @@ public class TwitterFollowingProviderTask implements Runnable { LOGGER.warn("Exception: {}", e); } } - if( ids.hasNext() ) - curser = ids.getNextCursor(); - else break; + if( !ids.hasNext() ) break; + if( ids.getNextCursor() == 0 ) break; + curser = ids.getNextCursor(); } catch(TwitterException twitterException) { keepTrying += TwitterErrorHandler.handleTwitterError(client, id, twitterException); http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9495cf52/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProcessor.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProcessor.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProcessor.java index f0690f8..8ea65eb 100644 --- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProcessor.java +++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterStreamProcessor.java @@ -88,8 +88,9 @@ public class TwitterStreamProcessor extends StringDelimitedProcessor { @Override public List<StreamsDatum> call() throws Exception { if(item != null) { - ObjectNode objectNode = (ObjectNode) mapper.readTree(item); - StreamsDatum rawDatum = new StreamsDatum(objectNode); + Class itemClass = TwitterEventClassifier.detectClass(item); + Object document = mapper.readValue(item, itemClass); + StreamsDatum rawDatum = new StreamsDatum(document); return Lists.newArrayList(rawDatum); } return Lists.newArrayList(); http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9495cf52/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java index 78eb3e6..4231f56 100644 --- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java +++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java @@ -197,7 +197,7 @@ public class TwitterUserInformationProvider implements StreamsProvider, Serializ public StreamsResultSet readCurrent() { - LOGGER.info("{}{} - readCurrent", idsBatches, screenNameBatches); + LOGGER.debug("{}{} - readCurrent", idsBatches, screenNameBatches); StreamsResultSet result; @@ -206,7 +206,7 @@ public class TwitterUserInformationProvider implements StreamsProvider, Serializ result = new StreamsResultSet(providerQueue); result.setCounter(new DatumStatusCounter()); providerQueue = constructQueue(); - LOGGER.info("{}{} - providing {} docs", idsBatches, screenNameBatches, result.size()); + LOGGER.debug("{}{} - providing {} docs", idsBatches, screenNameBatches, result.size()); } finally { lock.writeLock().unlock(); } @@ -215,6 +215,8 @@ public class TwitterUserInformationProvider implements StreamsProvider, Serializ LOGGER.info("{}{} - completed", idsBatches, screenNameBatches); running.set(false); + + LOGGER.info("Exiting"); } return result;
