STREAMS-105 | Updated the InstagramTypeConverter to use the conversion utility functions provided in InstagramActivityUtil
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/e8511ada Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/e8511ada Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/e8511ada Branch: refs/heads/master Commit: e8511ada025c1837db8ddd4fb62d24eb231a9f7f Parents: a031202 Author: Robert Douglas <rdoug...@w2odigital.com> Authored: Wed Jul 2 10:48:35 2014 -0500 Committer: sblackmon <sblack...@w2odigital.com> Committed: Mon Jul 21 12:25:43 2014 -0500 ---------------------------------------------------------------------- .../provider/FacebookUserstreamProvider.java | 72 +++++++++++++------- .../FacebookUserstreamConfiguration.json | 7 ++ 2 files changed, 53 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/e8511ada/streams-contrib/streams-provider-facebook/src/main/java/com/facebook/provider/FacebookUserstreamProvider.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-facebook/src/main/java/com/facebook/provider/FacebookUserstreamProvider.java b/streams-contrib/streams-provider-facebook/src/main/java/com/facebook/provider/FacebookUserstreamProvider.java index eae8069..af7868b 100644 --- a/streams-contrib/streams-provider-facebook/src/main/java/com/facebook/provider/FacebookUserstreamProvider.java +++ b/streams-contrib/streams-provider-facebook/src/main/java/com/facebook/provider/FacebookUserstreamProvider.java @@ -62,7 +62,7 @@ public class FacebookUserstreamProvider implements StreamsProvider, Serializable private static final ObjectMapper mapper = new StreamsJacksonMapper(); - private static final String ALL_PERMISSIONS = "ads_management,ads_read,create_event,create_note,email,export_stream,friends_about_me,friends_actions.books,friends_actions.music,friends_actions.news,friends_actions.video,friends_activities,friends_birthday,friends_education_history,friends_events,friends_games_activity,friends_groups,friends_hometown,friends_interests,friends_likes,friends_location,friends_notes,friends_online_presence,friends_photo_video_tags,friends_photos,friends_questions,friends_relationship_details,friends_relationships,friends_religion_politics,friends_status,friends_subscriptions,friends_videos,friends_website,friends_work_history,manage_friendlists,manage_notifications,manage_pages,photo_upload,publish_actions,publish_stream,read_friendlists,read_insights,read_mailbox,read_page_mailboxes,read_requests,read_stream,rsvp_event,share_item,sms,status_update,user_about_me,user_actions.books,user_actions.music,user_actions.news,user_actions.video,user_activitie s,user_birthday,user_education_history,user_events,user_friends,user_games_activity,user_groups,user_hometown,user_interests,user_likes,user_location,user_notes,user_online_presence,user_photo_video_tags,user_photos,user_questions,user_relationship_details,user_relationships,user_religion_politics,user_status,user_subscriptions,user_videos,user_website,user_work_history,video_upload,xmpp_login"; + private static final String ALL_PERMISSIONS = "read_stream"; private FacebookUserstreamConfiguration configuration; private Class klass; @@ -88,6 +88,8 @@ public class FacebookUserstreamProvider implements StreamsProvider, Serializable private DatumStatusCounter countersCurrent = new DatumStatusCounter(); private DatumStatusCounter countersTotal = new DatumStatusCounter(); + protected Facebook client; + private static ExecutorService newFixedThreadPoolWithQueueSize(int nThreads, int queueSize) { return new ThreadPoolExecutor(nThreads, nThreads, 5000L, TimeUnit.MILLISECONDS, @@ -133,31 +135,17 @@ public class FacebookUserstreamProvider implements StreamsProvider, Serializable @Override public void startStream() { - executor.submit(new FacebookFeedPollingTask(this)); - running.set(true); - } - - private void loadBatch(String[] ids) { - Facebook client = getFacebookClient(); - int keepTrying = 0; - - // keep trying to load, give it 5 attempts. - //while (keepTrying < 10) - while (keepTrying < 1) { - try { - for (User user : client.getUsers(ids)) { - String json = DataObjectFactory.getRawJSON(user); + client = getFacebookClient(); - providerQueue.offer(new StreamsDatum(json)); -// - } - ; - } catch (FacebookException e) { - e.printStackTrace(); - return; + if( configuration.getInfo() != null && + configuration.getInfo().size() > 0 ) { + for( String id : configuration.getInfo()) { + executor.submit(new FacebookFeedPollingTask(this, id)); } - + } else { + executor.submit(new FacebookFeedPollingTask(this)); } + running.set(true); } public StreamsResultSet readCurrent() { @@ -225,6 +213,28 @@ public class FacebookUserstreamProvider implements StreamsProvider, Serializable Preconditions.checkNotNull(configuration.getOauth().getAppSecret()); Preconditions.checkNotNull(configuration.getOauth().getUserAccessToken()); + client = getFacebookClient(); + + if( configuration.getInfo() != null && + configuration.getInfo().size() > 0 ) { + + List<String> ids = new ArrayList<String>(); + List<String[]> idsBatches = new ArrayList<String[]>(); + + for (String s : configuration.getInfo()) { + if (s != null) { + ids.add(s); + + if (ids.size() >= 100) { + // add the batch + idsBatches.add(ids.toArray(new String[ids.size()])); + // reset the Ids + ids = new ArrayList<String>(); + } + + } + } + } } protected Facebook getFacebookClient() { @@ -251,19 +261,29 @@ public class FacebookUserstreamProvider implements StreamsProvider, Serializable FacebookUserstreamProvider provider; Facebook client; + String id; private Set<Post> priorPollResult = Sets.newHashSet(); public FacebookFeedPollingTask(FacebookUserstreamProvider facebookUserstreamProvider) { - provider = facebookUserstreamProvider; + this.provider = facebookUserstreamProvider; } + public FacebookFeedPollingTask(FacebookUserstreamProvider facebookUserstreamProvider, String id) { + this.provider = facebookUserstreamProvider; + this.client = provider.client; + this.id = id; + } @Override public void run() { - client = provider.getFacebookClient(); while (provider.isRunning()) { + ResponseList<Post> postResponseList; try { - ResponseList<Post> postResponseList = client.getHome(); + if( id != null ) + postResponseList = client.getFeed(id); + else + postResponseList = client.getHome(); + Set<Post> update = Sets.newHashSet(postResponseList); Set<Post> repeats = Sets.intersection(priorPollResult, Sets.newHashSet(update)); Set<Post> entrySet = Sets.difference(update, repeats); http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/e8511ada/streams-contrib/streams-provider-facebook/src/main/jsonschema/com/facebook/FacebookUserstreamConfiguration.json ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-facebook/src/main/jsonschema/com/facebook/FacebookUserstreamConfiguration.json b/streams-contrib/streams-provider-facebook/src/main/jsonschema/com/facebook/FacebookUserstreamConfiguration.json index c823a12..bcb2258 100644 --- a/streams-contrib/streams-provider-facebook/src/main/jsonschema/com/facebook/FacebookUserstreamConfiguration.json +++ b/streams-contrib/streams-provider-facebook/src/main/jsonschema/com/facebook/FacebookUserstreamConfiguration.json @@ -6,6 +6,13 @@ "javaInterfaces": ["java.io.Serializable"], "extends": {"$ref":"FacebookConfiguration.json"}, "properties": { + "info": { + "type": "array", + "description": "A list of user IDs, indicating users of interest", + "items": { + "type": "string" + } + }, "pollIntervalMillis": { "type": "integer", "default" : "60000",