updated packages
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/dc432af2 Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/dc432af2 Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/dc432af2 Branch: refs/heads/STREAMS-46 Commit: dc432af2c70e9ac443af3cbe57a7772fdd1e146d Parents: e0cb5ec Author: sblackmon <sblack...@apache.org> Authored: Mon Aug 11 17:03:50 2014 -0500 Committer: sblackmon <sblack...@apache.org> Committed: Mon Aug 11 17:03:50 2014 -0500 ---------------------------------------------------------------------- .../streams-provider-facebook/pom.xml | 8 +- .../api/FacebookPostActivitySerializer.java | 300 ----------------- ...FacebookPublicFeedXmlActivitySerializer.java | 29 -- .../processor/FacebookTypeConverter.java | 194 ----------- .../provider/FacebookFriendFeedProvider.java | 285 ----------------- .../provider/FacebookFriendUpdatesProvider.java | 285 ----------------- .../FacebookUserInformationProvider.java | 298 ----------------- .../provider/FacebookUserstreamProvider.java | 319 ------------------ .../api/FacebookPostActivitySerializer.java | 286 +++++++++++++++++ ...FacebookPublicFeedXmlActivitySerializer.java | 29 ++ .../processor/FacebookTypeConverter.java | 194 +++++++++++ .../provider/FacebookFriendFeedProvider.java | 282 ++++++++++++++++ .../provider/FacebookFriendUpdatesProvider.java | 286 +++++++++++++++++ .../FacebookUserInformationProvider.java | 299 +++++++++++++++++ .../provider/FacebookUserstreamProvider.java | 320 +++++++++++++++++++ .../com/facebook/FacebookConfiguration.json | 49 --- .../FacebookUserInformationConfiguration.json | 23 -- .../FacebookUserstreamConfiguration.json | 22 -- .../jsonschema/com/facebook/graph/Post.json | 203 ------------ .../streams/facebook/FacebookConfiguration.json | 49 +++ .../FacebookUserInformationConfiguration.json | 23 ++ .../FacebookUserstreamConfiguration.json | 22 ++ .../org/apache/streams/facebook/graph/Post.json | 203 ++++++++++++ .../test/FacebookActivitySerDeTest.java | 4 +- .../gnip-edc-facebook/pom.xml | 2 +- .../src/test/resources/FlickrEDC.xml | 128 ++++---- .../src/test/resources/RedditEDC.xml | 200 ++++++------ .../src/test/resources/RedditEDCFlattened.xml | 200 ++++++------ .../src/test/resources/redditTest.xml | 2 +- 29 files changed, 2264 insertions(+), 2280 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/dc432af2/streams-contrib/streams-provider-facebook/pom.xml ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-facebook/pom.xml b/streams-contrib/streams-provider-facebook/pom.xml index 34d53a9..d688a00 100644 --- a/streams-contrib/streams-provider-facebook/pom.xml +++ b/streams-contrib/streams-provider-facebook/pom.xml @@ -101,10 +101,10 @@ <addCompileSourceRoot>true</addCompileSourceRoot> <generateBuilders>true</generateBuilders> <sourcePaths> - <sourcePath>src/main/jsonschema/com/facebook/FacebookConfiguration.json</sourcePath> - <sourcePath>src/main/jsonschema/com/facebook/FacebookUserInformationConfiguration.json</sourcePath> - <sourcePath>src/main/jsonschema/com/facebook/FacebookUserstreamConfiguration.json</sourcePath> - <sourcePath>src/main/jsonschema/com/facebook/graph/Post.json</sourcePath> + <sourcePath>src/main/jsonschema/org/apache/streams/facebook/FacebookConfiguration.json</sourcePath> + <sourcePath>src/main/jsonschema/org/apache/streams/facebook/FacebookUserInformationConfiguration.json</sourcePath> + <sourcePath>src/main/jsonschema/org/apache/streams/facebook/FacebookUserstreamConfiguration.json</sourcePath> + <sourcePath>src/main/jsonschema/org/apache/streams/facebook/graph/Post.json</sourcePath> </sourcePaths> <outputDirectory>target/generated-sources/jsonschema2pojo</outputDirectory> <targetPackage>com.facebook</targetPackage> http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/dc432af2/streams-contrib/streams-provider-facebook/src/main/java/com/facebook/api/FacebookPostActivitySerializer.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-facebook/src/main/java/com/facebook/api/FacebookPostActivitySerializer.java b/streams-contrib/streams-provider-facebook/src/main/java/com/facebook/api/FacebookPostActivitySerializer.java deleted file mode 100644 index 71bc5c9..0000000 --- a/streams-contrib/streams-provider-facebook/src/main/java/com/facebook/api/FacebookPostActivitySerializer.java +++ /dev/null @@ -1,300 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package com.facebook.api; - - -import com.fasterxml.jackson.core.JsonGenerationException; -import com.fasterxml.jackson.core.JsonGenerator; -import com.fasterxml.jackson.core.JsonParser; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.DeserializationContext; -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.SerializerProvider; -import com.fasterxml.jackson.databind.deser.std.StdDeserializer; -import com.fasterxml.jackson.databind.module.SimpleModule; -import com.fasterxml.jackson.databind.node.ObjectNode; -import com.fasterxml.jackson.databind.ser.std.StdSerializer; -import com.fasterxml.jackson.datatype.joda.JodaModule; -import org.apache.commons.lang.NotImplementedException; -import org.apache.streams.data.ActivitySerializer; -import org.apache.streams.exceptions.ActivitySerializerException; -import org.apache.streams.facebook.Post; -import org.apache.streams.pojo.json.*; -import org.apache.streams.jackson.StreamsJacksonMapper; -import org.joda.time.DateTime; -import org.joda.time.format.DateTimeFormat; -import org.joda.time.format.DateTimeFormatter; -import org.joda.time.format.ISODateTimeFormat; - -import java.io.IOException; -import java.text.DateFormat; -import java.text.ParseException; -import java.text.SimpleDateFormat; -import java.util.*; -import java.util.List; - -import static org.apache.streams.data.util.ActivityUtil.*; -import static org.apache.streams.data.util.JsonUtil.jsonToJsonNode; - -/** - * Serializes activity posts - * sblackmon: This class needs a rewrite - */ -public class FacebookPostActivitySerializer implements ActivitySerializer<org.apache.streams.facebook.Post> { - - public static final DateTimeFormatter FACEBOOK_FORMAT = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ssZ"); - public static final DateTimeFormatter ACTIVITY_FORMAT = ISODateTimeFormat.basicDateTime(); - - public static final String PROVIDER_NAME = "facebook"; - - public static ObjectMapper mapper; - static { - mapper = StreamsJacksonMapper.getInstance(); - } - - @Override - public String serializationFormat() { - return "facebook_post_json_v1"; - } - - @Override - public Post serialize(Activity deserialized) throws ActivitySerializerException { - throw new NotImplementedException("Not currently supported by this deserializer"); - } - - @Override - public Activity deserialize(Post post) throws ActivitySerializerException { - Activity activity = new Activity(); - activity.setPublished(post.getCreatedTime()); - activity.setUpdated(post.getUpdatedTime()); - addActor(activity, mapper.convertValue(post.getFrom(), ObjectNode.class)); - setProvider(activity); - setObjectType(post.getType(), activity); - parseObject(activity, mapper.convertValue(post, ObjectNode.class)); - fixObjectId(activity); - fixContentFromSummary(activity); - return activity; - } - - @Override - public List<Activity> deserializeAll(List<Post> serializedList) { - throw new NotImplementedException("Not currently supported by this deserializer"); - } - - private void fixContentFromSummary(Activity activity) { - //we MUST have a content field set, so choose the best option - if(activity.getContent() == null) { - activity.setContent(activity.getAdditionalProperties().containsKey("summary") ? - (String) activity.getAdditionalProperties().get("summary") : - activity.getObject().getSummary()); - } - } - - private void fixObjectId(Activity activity) { - //An artifact of schema generation, the default value is {link} - if(activity.getObject().getId().equals("{link}")) { - activity.getObject().setId(null); - } - } - - private void setObjectType(String type, Activity activity) { - ActivityObject object = new ActivityObject(); - activity.setObject(object); - object.setObjectType(type); - } - - private void setProvider(Activity activity) { - Provider provider = new Provider(); - provider.setId(getProviderId(PROVIDER_NAME)); - provider.setDisplayName(PROVIDER_NAME); - activity.setProvider(provider); - } - - private String getObjectType(JsonNode node) { - Iterator<Map.Entry<String, JsonNode>> fields = node.fields(); - ensureMoreFields(fields); - Map.Entry<String, JsonNode> field = fields.next(); - //ensureNoMoreFields(fields); - return node.asText(); - } - - private void parseObject(Activity activity, JsonNode jsonNode) throws ActivitySerializerException { - for(Iterator<Map.Entry<String, JsonNode>> fields = jsonNode.fields(); fields.hasNext();) { - Map.Entry<String, JsonNode> field = fields.next(); - String key = field.getKey(); - JsonNode value = field.getValue(); - mapField(activity, key, value); - } - } - - private void mapField(Activity activity, String name, JsonNode value) throws ActivitySerializerException { - if("application".equals(name)) { - addGenerator(activity, value); - } else if ("caption".equals(name)) { - addSummary(activity, value); - } else if ("comments".equals(name)) { - addAttachments(activity, value); - } else if ("description".equals(name)) { - addObjectSummary(activity, value); - } else if ("from".equals(name)) { - addActor(activity, value); - } else if ("icon".equals(name)) { - addIcon(activity, value); - } else if ("id".equals(name)) { - addId(activity, value); - } else if ("is_hidden".equals(name)) { - addObjectHiddenExtension(activity, value); - } else if ("like_count".equals(name)) { - addLikeExtension(activity, value); - } else if ("link".equals(name)) { - addObjectLink(activity, value); - } else if ("message".equals(name)) { - activity.setContent(value.asText()); - } else if ("name".equals(name)) { - addObjectName(activity, value); - } else if ("object_id".equals(name)) { - addObjectId(activity, value); - } else if ("picture".equals(name)) { - addObjectImage(activity, value); - } else if ("place".equals(name)) { - addLocationExtension(activity, value); - } else if ("shares".equals(name)) { - addRebroadcastExtension(activity, value); - } else if ("source".equals(name)) { - addObjectLink(activity, value); - } else if ("story".equals(name)) { - addTitle(activity, value); - } - } - - private void addSummary(Activity activity, JsonNode value) { - activity.setAdditionalProperty("summary", value.asText()); - } - - private void addTitle(Activity activity, JsonNode value) { - activity.setTitle(value.asText()); - } - - private void addLikeExtension(Activity activity, JsonNode value) { - Map<String, Object> extensions = ensureExtensions(activity); - extensions.put(LIKES_EXTENSION, value.asInt()); - } - - private void addLocationExtension(Activity activity, JsonNode value) { - Map<String, Object> extensions = ensureExtensions(activity); - if(value.has("location")) { - Map<String, Object> location = new HashMap<String, Object>(); - JsonNode fbLocation = value.get("location"); - if(fbLocation.has("country")) { - location.put(LOCATION_EXTENSION_COUNTRY, fbLocation.get("country")); - } - if(fbLocation.has("latitude") && fbLocation.has("longitude")) { - location.put(LOCATION_EXTENSION_COORDINATES, String.format("%s,%s", fbLocation.get("longitude"), fbLocation.get("latitude"))); - } - extensions.put(LOCATION_EXTENSION, location); - } - } - - private void addObjectImage(Activity activity, JsonNode value) { - Image image = new Image(); - image.setUrl(value.asText()); - activity.getObject().setImage(image); - } - - private void addObjectId(Activity activity, JsonNode value) { - activity.getObject().setId(getObjectId("facebook", activity.getObject().getObjectType(), value.asText())); - } - - private void addObjectName(Activity activity, JsonNode value) { - activity.getObject().setDisplayName(value.asText()); - } - - private void addId(Activity activity, JsonNode value) { - activity.setId(getActivityId(PROVIDER_NAME, value.asText())); - } - - private void addObjectLink(Activity activity, JsonNode value) { - activity.getObject().setUrl(value.asText()); - } - - private void addRebroadcastExtension(Activity activity, JsonNode value) { - Map<String, Object> extensions = ensureExtensions(activity); - if(value.has("count")) { - extensions.put(REBROADCAST_EXTENSION, value.get("count").asInt()); - } - } - - private void addObjectHiddenExtension(Activity activity, JsonNode value) { - Map<String, Object> extensions = ensureExtensions(activity); - extensions.put("hidden", value.asBoolean()); - } - - private void addIcon(Activity activity, JsonNode value) { - Icon icon = new Icon(); - //Apparently the Icon didn't map from the schema very well - icon.setAdditionalProperty("url", value.asText()); - activity.setIcon(icon); - } - - private void addActor(Activity activity, JsonNode value) { - Actor actor = new Actor(); - if(value.has("name")) { - actor.setDisplayName(value.get("name").asText()); - } - if(value.has("id")) { - actor.setId(getPersonId(PROVIDER_NAME, value.get("id").asText())); - } - activity.setActor(actor); - } - - private void addObjectSummary(Activity activity, JsonNode value) { - activity.getObject().setSummary(value.asText()); - } - - private void addGenerator(Activity activity, JsonNode value) { - Generator generator = new Generator(); - if(value.has("id")) { - generator.setId(getObjectId(PROVIDER_NAME, "generator", value.get("id").asText())); - } - if(value.has("name")) { - generator.setDisplayName(value.get("name").asText()); - } - if(value.has("namespace")) { - generator.setSummary(value.get("namespace").asText()); - } - activity.setGenerator(generator); - } - - private void addAttachments(Activity activity, JsonNode value) { - //No direct mapping at this time - } - - private static void ensureMoreFields(Iterator<Map.Entry<String, JsonNode>> fields) { - if(!fields.hasNext()) { - throw new IllegalStateException("Facebook activity must have one and only one root element"); - } - } - private static void ensureNoMoreFields(Iterator<Map.Entry<String, JsonNode>> fields) { - if(fields.hasNext()) { - throw new IllegalStateException("Facebook activity must have one and only one root element"); - } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/dc432af2/streams-contrib/streams-provider-facebook/src/main/java/com/facebook/feed/FacebookPublicFeedXmlActivitySerializer.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-facebook/src/main/java/com/facebook/feed/FacebookPublicFeedXmlActivitySerializer.java b/streams-contrib/streams-provider-facebook/src/main/java/com/facebook/feed/FacebookPublicFeedXmlActivitySerializer.java deleted file mode 100644 index f126d88..0000000 --- a/streams-contrib/streams-provider-facebook/src/main/java/com/facebook/feed/FacebookPublicFeedXmlActivitySerializer.java +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package com.facebook.feed; - -/** - * Created with IntelliJ IDEA. - * User: sblackmon - * Date: 10/2/13 - * Time: 6:32 PM - * To change this template use File | Settings | File Templates. - */ -public class FacebookPublicFeedXmlActivitySerializer { -} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/dc432af2/streams-contrib/streams-provider-facebook/src/main/java/com/facebook/processor/FacebookTypeConverter.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-facebook/src/main/java/com/facebook/processor/FacebookTypeConverter.java b/streams-contrib/streams-provider-facebook/src/main/java/com/facebook/processor/FacebookTypeConverter.java deleted file mode 100644 index 6ddb673..0000000 --- a/streams-contrib/streams-provider-facebook/src/main/java/com/facebook/processor/FacebookTypeConverter.java +++ /dev/null @@ -1,194 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package com.facebook.processor; - -import com.facebook.api.FacebookPostActivitySerializer; -import com.fasterxml.jackson.core.JsonParseException; -import com.fasterxml.jackson.core.JsonParser; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.node.ObjectNode; -import com.google.common.collect.Lists; -import org.apache.streams.core.StreamsDatum; -import org.apache.streams.core.StreamsProcessor; -import org.apache.streams.exceptions.ActivitySerializerException; -import org.apache.streams.facebook.Post; -import org.apache.streams.jackson.StreamsJacksonMapper; -import org.apache.streams.pojo.json.Activity; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.List; -import java.util.Queue; - -/** - * Created by sblackmon on 12/10/13. - */ -public class FacebookTypeConverter implements StreamsProcessor { - - public final static String STREAMS_ID = "TwitterTypeConverter"; - - private final static Logger LOGGER = LoggerFactory.getLogger(FacebookTypeConverter.class); - - private ObjectMapper mapper; - - private Queue<StreamsDatum> inQueue; - private Queue<StreamsDatum> outQueue; - - private Class inClass; - private Class outClass; - - private FacebookPostActivitySerializer facebookPostActivitySerializer; - - private int count = 0; - - public final static String TERMINATE = new String("TERMINATE"); - - public FacebookTypeConverter(Class inClass, Class outClass) { - this.inClass = inClass; - this.outClass = outClass; - } - - public Queue<StreamsDatum> getProcessorOutputQueue() { - return outQueue; - } - - public void setProcessorInputQueue(Queue<StreamsDatum> inputQueue) { - inQueue = inputQueue; - } - - public Object convert(ObjectNode event, Class inClass, Class outClass) throws ActivitySerializerException, JsonProcessingException { - - Object result = null; - - if( outClass.equals( Activity.class )) { - LOGGER.debug("ACTIVITY"); - result = facebookPostActivitySerializer.deserialize(mapper.convertValue(event, Post.class)); - } else if( outClass.equals( Post.class )) { - LOGGER.debug("POST"); - result = mapper.convertValue(event, Post.class); - } else if( outClass.equals( ObjectNode.class )) { - LOGGER.debug("OBJECTNODE"); - result = mapper.convertValue(event, ObjectNode.class); - } - - // no supported conversion were applied - if( result != null ) { - count ++; - return result; - } - - LOGGER.debug("CONVERT FAILED"); - - return null; - - } - - public boolean validate(Object document, Class klass) { - - // TODO - return true; - } - - public boolean isValidJSON(final String json) { - boolean valid = false; - try { - final JsonParser parser = new ObjectMapper().getJsonFactory() - .createJsonParser(json); - while (parser.nextToken() != null) { - } - valid = true; - } catch (JsonParseException jpe) { - LOGGER.warn("validate: {}", jpe); - } catch (IOException ioe) { - LOGGER.warn("validate: {}", ioe); - } - - return valid; - } - - @Override - public List<StreamsDatum> process(StreamsDatum entry) { - - StreamsDatum result = null; - - try { - - Object item = entry.getDocument(); - ObjectNode node; - - LOGGER.debug("{} processing {}", STREAMS_ID, item.getClass()); - - if( item instanceof String ) { - - // if the target is string, just pass-through - if( String.class.equals(outClass)) { - result = entry; - } - else { - // first check for valid json - node = (ObjectNode)mapper.readTree((String)item); - - // since data is coming from outside provider, we don't know what type the events are - // for now we'll assume post - - Object out = convert(node, Post.class, outClass); - - if( out != null && validate(out, outClass)) - result = new StreamsDatum(out); - } - - } else if( item instanceof ObjectNode || item instanceof Post) { - - // first check for valid json - node = (ObjectNode)mapper.valueToTree(item); - - // since data is coming from outside provider, we don't know what type the events are - // for now we'll assume post - - Object out = convert(node, Post.class, outClass); - - if( out != null && validate(out, outClass)) - result = new StreamsDatum(out); - - } - - } catch (Exception e) { - e.printStackTrace(); - } - - if( result != null ) - return Lists.newArrayList(result); - else - return Lists.newArrayList(); - } - - @Override - public void prepare(Object o) { - mapper = new StreamsJacksonMapper(); - facebookPostActivitySerializer = new FacebookPostActivitySerializer(); - } - - @Override - public void cleanUp() { - - } - -} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/dc432af2/streams-contrib/streams-provider-facebook/src/main/java/com/facebook/provider/FacebookFriendFeedProvider.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-facebook/src/main/java/com/facebook/provider/FacebookFriendFeedProvider.java b/streams-contrib/streams-provider-facebook/src/main/java/com/facebook/provider/FacebookFriendFeedProvider.java deleted file mode 100644 index a66d213..0000000 --- a/streams-contrib/streams-provider-facebook/src/main/java/com/facebook/provider/FacebookFriendFeedProvider.java +++ /dev/null @@ -1,285 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package com.facebook.provider; - -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.base.Preconditions; -import com.google.common.collect.Queues; -import com.google.common.collect.Sets; -import com.google.common.util.concurrent.MoreExecutors; -import com.typesafe.config.Config; -import com.typesafe.config.ConfigRenderOptions; -import facebook4j.*; -import facebook4j.conf.ConfigurationBuilder; -import facebook4j.json.DataObjectFactory; -import org.apache.streams.config.StreamsConfigurator; -import org.apache.streams.core.DatumStatusCounter; -import org.apache.streams.core.StreamsDatum; -import org.apache.streams.core.StreamsProvider; -import org.apache.streams.core.StreamsResultSet; -import org.apache.streams.facebook.FacebookUserInformationConfiguration; -import org.apache.streams.facebook.FacebookUserstreamConfiguration; -import org.apache.streams.jackson.StreamsJacksonMapper; -import org.apache.streams.util.ComponentUtils; -import org.joda.time.DateTime; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import sun.reflect.generics.reflectiveObjects.NotImplementedException; - -import java.io.IOException; -import java.io.Serializable; -import java.math.BigInteger; -import java.util.Iterator; -import java.util.Queue; -import java.util.Set; -import java.util.concurrent.*; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; - -public class FacebookFriendFeedProvider implements StreamsProvider, Serializable -{ - - public static final String STREAMS_ID = "FacebookFriendFeedProvider"; - private static final Logger LOGGER = LoggerFactory.getLogger(FacebookFriendFeedProvider.class); - - private static final ObjectMapper mapper = new StreamsJacksonMapper(); - - private static final String ALL_PERMISSIONS = "ads_management,ads_read,create_event,create_note,email,export_stream,friends_about_me,friends_actions.books,friends_actions.music,friends_actions.news,friends_actions.video,friends_activities,friends_birthday,friends_education_history,friends_events,friends_games_activity,friends_groups,friends_hometown,friends_interests,friends_likes,friends_location,friends_notes,friends_online_presence,friends_photo_video_tags,friends_photos,friends_questions,friends_relationship_details,friends_relationships,friends_religion_politics,friends_status,friends_subscriptions,friends_videos,friends_website,friends_work_history,manage_friendlists,manage_notifications,manage_pages,photo_upload,publish_actions,publish_stream,read_friendlists,read_insights,read_mailbox,read_page_mailboxes,read_requests,read_stream,rsvp_event,share_item,sms,status_update,user_about_me,user_actions.books,user_actions.music,user_actions.news,user_actions.video,user_activitie s,user_birthday,user_education_history,user_events,user_friends,user_games_activity,user_groups,user_hometown,user_interests,user_likes,user_location,user_notes,user_online_presence,user_photo_video_tags,user_photos,user_questions,user_relationship_details,user_relationships,user_religion_politics,user_status,user_subscriptions,user_videos,user_website,user_work_history,video_upload,xmpp_login"; - private FacebookUserstreamConfiguration configuration; - - private Class klass; - protected final ReadWriteLock lock = new ReentrantReadWriteLock(); - - protected volatile Queue<StreamsDatum> providerQueue = new LinkedBlockingQueue<StreamsDatum>(); - - public FacebookUserstreamConfiguration getConfig() { return configuration; } - - public void setConfig(FacebookUserstreamConfiguration config) { this.configuration = config; } - - protected Iterator<String[]> idsBatches; - - protected ExecutorService executor; - - protected DateTime start; - protected DateTime end; - - protected final AtomicBoolean running = new AtomicBoolean(); - - private DatumStatusCounter countersCurrent = new DatumStatusCounter(); - private DatumStatusCounter countersTotal = new DatumStatusCounter(); - - private static ExecutorService newFixedThreadPoolWithQueueSize(int nThreads, int queueSize) { - return new ThreadPoolExecutor(nThreads, nThreads, - 5000L, TimeUnit.MILLISECONDS, - new ArrayBlockingQueue<Runnable>(queueSize, true), new ThreadPoolExecutor.CallerRunsPolicy()); - } - - public FacebookFriendFeedProvider() { - Config config = StreamsConfigurator.config.getConfig("facebook"); - FacebookUserInformationConfiguration configuration; - try { - configuration = mapper.readValue(config.root().render(ConfigRenderOptions.concise()), FacebookUserInformationConfiguration.class); - } catch (IOException e) { - e.printStackTrace(); - return; - } - } - - public FacebookFriendFeedProvider(FacebookUserstreamConfiguration config) { - this.configuration = config; - } - - public FacebookFriendFeedProvider(Class klass) { - Config config = StreamsConfigurator.config.getConfig("facebook"); - FacebookUserInformationConfiguration configuration; - try { - configuration = mapper.readValue(config.root().render(ConfigRenderOptions.concise()), FacebookUserInformationConfiguration.class); - } catch (IOException e) { - e.printStackTrace(); - return; - } - this.klass = klass; - } - - public FacebookFriendFeedProvider(FacebookUserstreamConfiguration config, Class klass) { - this.configuration = config; - this.klass = klass; - } - - public Queue<StreamsDatum> getProviderQueue() { - return this.providerQueue; - } - - @Override - public void startStream() { - shutdownAndAwaitTermination(executor); - running.set(true); - } - - public StreamsResultSet readCurrent() { - - StreamsResultSet current; - - synchronized (FacebookUserstreamProvider.class) { - current = new StreamsResultSet(Queues.newConcurrentLinkedQueue(providerQueue)); - current.setCounter(new DatumStatusCounter()); - current.getCounter().add(countersCurrent); - countersTotal.add(countersCurrent); - countersCurrent = new DatumStatusCounter(); - providerQueue.clear(); - } - - return current; - - } - - public StreamsResultSet readNew(BigInteger sequence) { - LOGGER.debug("{} readNew", STREAMS_ID); - throw new NotImplementedException(); - } - - public StreamsResultSet readRange(DateTime start, DateTime end) { - LOGGER.debug("{} readRange", STREAMS_ID); - this.start = start; - this.end = end; - readCurrent(); - StreamsResultSet result = (StreamsResultSet)providerQueue.iterator(); - return result; - } - - @Override - public boolean isRunning() { - return running.get(); - } - - void shutdownAndAwaitTermination(ExecutorService pool) { - pool.shutdown(); // Disable new tasks from being submitted - try { - // Wait a while for existing tasks to terminate - if (!pool.awaitTermination(10, TimeUnit.SECONDS)) { - pool.shutdownNow(); // Cancel currently executing tasks - // Wait a while for tasks to respond to being cancelled - if (!pool.awaitTermination(10, TimeUnit.SECONDS)) - System.err.println("Pool did not terminate"); - } - } catch (InterruptedException ie) { - // (Re-)Cancel if current thread also interrupted - pool.shutdownNow(); - // Preserve interrupt status - Thread.currentThread().interrupt(); - } - } - - @Override - public void prepare(Object o) { - - executor = MoreExecutors.listeningDecorator(newFixedThreadPoolWithQueueSize(5, 20)); - - Preconditions.checkNotNull(providerQueue); - Preconditions.checkNotNull(this.klass); - Preconditions.checkNotNull(configuration.getOauth().getAppId()); - Preconditions.checkNotNull(configuration.getOauth().getAppSecret()); - Preconditions.checkNotNull(configuration.getOauth().getUserAccessToken()); - - Facebook client = getFacebookClient(); - - try { - ResponseList<Friend> friendResponseList = client.friends().getFriends(); - Paging<Friend> friendPaging; - do { - - for( Friend friend : friendResponseList ) { - - executor.submit(new FacebookFriendFeedTask(this, friend.getId())); - } - friendPaging = friendResponseList.getPaging(); - friendResponseList = client.fetchNext(friendPaging); - } while( friendPaging != null && - friendResponseList != null ); - } catch (FacebookException e) { - e.printStackTrace(); - } - - } - - protected Facebook getFacebookClient() - { - ConfigurationBuilder cb = new ConfigurationBuilder(); - cb.setDebugEnabled(true) - .setOAuthAppId(configuration.getOauth().getAppId()) - .setOAuthAppSecret(configuration.getOauth().getAppSecret()) - .setOAuthAccessToken(configuration.getOauth().getUserAccessToken()) - .setOAuthPermissions(ALL_PERMISSIONS) - .setJSONStoreEnabled(true) - .setClientVersion("v1.0"); - - FacebookFactory ff = new FacebookFactory(cb.build()); - Facebook facebook = ff.getInstance(); - - return facebook; - } - - @Override - public void cleanUp() { - shutdownAndAwaitTermination(executor); - } - - private class FacebookFriendFeedTask implements Runnable { - - FacebookFriendFeedProvider provider; - Facebook client; - String id; - - public FacebookFriendFeedTask(FacebookFriendFeedProvider provider, String id) { - this.provider = provider; - this.id = id; - } - - @Override - public void run() { - client = provider.getFacebookClient(); - try { - ResponseList<Post> postResponseList = client.getFeed(id); - Paging<Post> postPaging; - do { - - for (Post item : postResponseList) { - String json = DataObjectFactory.getRawJSON(item); - org.apache.streams.facebook.Post post = mapper.readValue(json, org.apache.streams.facebook.Post.class); - try { - lock.readLock().lock(); - ComponentUtils.offerUntilSuccess(new StreamsDatum(post), providerQueue); - countersCurrent.incrementAttempt(); - } finally { - lock.readLock().unlock(); - } - } - postPaging = postResponseList.getPaging(); - postResponseList = client.fetchNext(postPaging); - } while( postPaging != null && - postResponseList != null ); - - } catch (Exception e) { - e.printStackTrace(); - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/dc432af2/streams-contrib/streams-provider-facebook/src/main/java/com/facebook/provider/FacebookFriendUpdatesProvider.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-facebook/src/main/java/com/facebook/provider/FacebookFriendUpdatesProvider.java b/streams-contrib/streams-provider-facebook/src/main/java/com/facebook/provider/FacebookFriendUpdatesProvider.java deleted file mode 100644 index a111823..0000000 --- a/streams-contrib/streams-provider-facebook/src/main/java/com/facebook/provider/FacebookFriendUpdatesProvider.java +++ /dev/null @@ -1,285 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package com.facebook.provider; - -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.base.Preconditions; -import com.google.common.collect.Sets; -import com.google.common.util.concurrent.MoreExecutors; -import com.typesafe.config.Config; -import com.typesafe.config.ConfigRenderOptions; -import facebook4j.*; -import facebook4j.conf.ConfigurationBuilder; -import facebook4j.json.DataObjectFactory; -import org.apache.streams.config.StreamsConfigurator; -import org.apache.streams.core.DatumStatusCounter; -import org.apache.streams.core.StreamsDatum; -import org.apache.streams.core.StreamsProvider; -import org.apache.streams.core.StreamsResultSet; -import org.apache.streams.facebook.FacebookUserInformationConfiguration; -import org.apache.streams.facebook.FacebookUserstreamConfiguration; -import org.apache.streams.jackson.StreamsJacksonMapper; -import org.apache.streams.util.ComponentUtils; -import org.joda.time.DateTime; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import sun.reflect.generics.reflectiveObjects.NotImplementedException; - -import java.io.IOException; -import java.io.Serializable; -import java.math.BigInteger; -import java.util.*; -import java.util.concurrent.*; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; - -public class FacebookFriendUpdatesProvider implements StreamsProvider, Serializable -{ - - public static final String STREAMS_ID = "FacebookFriendPostsProvider"; - private static final Logger LOGGER = LoggerFactory.getLogger(FacebookFriendUpdatesProvider.class); - - private static final ObjectMapper mapper = new StreamsJacksonMapper(); - - private static final String ALL_PERMISSIONS = "ads_management,ads_read,create_event,create_note,email,export_stream,friends_about_me,friends_actions.books,friends_actions.music,friends_actions.news,friends_actions.video,friends_activities,friends_birthday,friends_education_history,friends_events,friends_games_activity,friends_groups,friends_hometown,friends_interests,friends_likes,friends_location,friends_notes,friends_online_presence,friends_photo_video_tags,friends_photos,friends_questions,friends_relationship_details,friends_relationships,friends_religion_politics,friends_status,friends_subscriptions,friends_videos,friends_website,friends_work_history,manage_friendlists,manage_notifications,manage_pages,photo_upload,publish_actions,publish_stream,read_friendlists,read_insights,read_mailbox,read_page_mailboxes,read_requests,read_stream,rsvp_event,share_item,sms,status_update,user_about_me,user_actions.books,user_actions.music,user_actions.news,user_actions.video,user_activitie s,user_birthday,user_education_history,user_events,user_friends,user_games_activity,user_groups,user_hometown,user_interests,user_likes,user_location,user_notes,user_online_presence,user_photo_video_tags,user_photos,user_questions,user_relationship_details,user_relationships,user_religion_politics,user_status,user_subscriptions,user_videos,user_website,user_work_history,video_upload,xmpp_login"; - private FacebookUserstreamConfiguration configuration; - - private Class klass; - protected final ReadWriteLock lock = new ReentrantReadWriteLock(); - - protected volatile Queue<StreamsDatum> providerQueue = new LinkedBlockingQueue<StreamsDatum>(); - - public FacebookUserstreamConfiguration getConfig() { return configuration; } - - public void setConfig(FacebookUserstreamConfiguration config) { this.configuration = config; } - - protected Iterator<String[]> idsBatches; - - protected ExecutorService executor; - - protected DateTime start; - protected DateTime end; - - protected final AtomicBoolean running = new AtomicBoolean(); - - private DatumStatusCounter countersCurrent = new DatumStatusCounter(); - private DatumStatusCounter countersTotal = new DatumStatusCounter(); - - private static ExecutorService newFixedThreadPoolWithQueueSize(int nThreads, int queueSize) { - return new ThreadPoolExecutor(nThreads, nThreads, - 5000L, TimeUnit.MILLISECONDS, - new ArrayBlockingQueue<Runnable>(queueSize, true), new ThreadPoolExecutor.CallerRunsPolicy()); - } - - public FacebookFriendUpdatesProvider() { - Config config = StreamsConfigurator.config.getConfig("facebook"); - FacebookUserInformationConfiguration configuration; - try { - configuration = mapper.readValue(config.root().render(ConfigRenderOptions.concise()), FacebookUserInformationConfiguration.class); - } catch (IOException e) { - e.printStackTrace(); - return; - } - } - - public FacebookFriendUpdatesProvider(FacebookUserstreamConfiguration config) { - this.configuration = config; - } - - public FacebookFriendUpdatesProvider(Class klass) { - Config config = StreamsConfigurator.config.getConfig("facebook"); - FacebookUserInformationConfiguration configuration; - try { - configuration = mapper.readValue(config.root().render(ConfigRenderOptions.concise()), FacebookUserInformationConfiguration.class); - } catch (IOException e) { - e.printStackTrace(); - return; - } - this.klass = klass; - } - - public FacebookFriendUpdatesProvider(FacebookUserstreamConfiguration config, Class klass) { - this.configuration = config; - this.klass = klass; - } - - public Queue<StreamsDatum> getProviderQueue() { - return this.providerQueue; - } - - @Override - public void startStream() { - running.set(true); - } - - public StreamsResultSet readCurrent() { - - Preconditions.checkArgument(idsBatches.hasNext()); - - LOGGER.info("readCurrent"); - - // return stuff - - LOGGER.info("Finished. Cleaning up..."); - - LOGGER.info("Providing {} docs", providerQueue.size()); - - StreamsResultSet result = new StreamsResultSet(providerQueue); - running.set(false); - - LOGGER.info("Exiting"); - - return result; - - } - - public StreamsResultSet readNew(BigInteger sequence) { - LOGGER.debug("{} readNew", STREAMS_ID); - throw new NotImplementedException(); - } - - public StreamsResultSet readRange(DateTime start, DateTime end) { - LOGGER.debug("{} readRange", STREAMS_ID); - this.start = start; - this.end = end; - readCurrent(); - StreamsResultSet result = (StreamsResultSet)providerQueue.iterator(); - return result; - } - - @Override - public boolean isRunning() { - return running.get(); - } - - void shutdownAndAwaitTermination(ExecutorService pool) { - pool.shutdown(); // Disable new tasks from being submitted - try { - // Wait a while for existing tasks to terminate - if (!pool.awaitTermination(10, TimeUnit.SECONDS)) { - pool.shutdownNow(); // Cancel currently executing tasks - // Wait a while for tasks to respond to being cancelled - if (!pool.awaitTermination(10, TimeUnit.SECONDS)) - System.err.println("Pool did not terminate"); - } - } catch (InterruptedException ie) { - // (Re-)Cancel if current thread also interrupted - pool.shutdownNow(); - // Preserve interrupt status - Thread.currentThread().interrupt(); - } - } - - @Override - public void prepare(Object o) { - - executor = MoreExecutors.listeningDecorator(newFixedThreadPoolWithQueueSize(5, 20)); - - Preconditions.checkNotNull(providerQueue); - Preconditions.checkNotNull(this.klass); - Preconditions.checkNotNull(configuration.getOauth().getAppId()); - Preconditions.checkNotNull(configuration.getOauth().getAppSecret()); - Preconditions.checkNotNull(configuration.getOauth().getUserAccessToken()); - - Facebook client = getFacebookClient(); - - try { - ResponseList<Friend> friendResponseList = client.friends().getFriends(); - Paging<Friend> friendPaging; - do { - - for( Friend friend : friendResponseList ) { - - //client.rawAPI().callPostAPI(); - // add a subscription - } - friendPaging = friendResponseList.getPaging(); - friendResponseList = client.fetchNext(friendPaging); - } while( friendPaging != null && - friendResponseList != null ); - } catch (FacebookException e) { - e.printStackTrace(); - } - - } - - protected Facebook getFacebookClient() - { - ConfigurationBuilder cb = new ConfigurationBuilder(); - cb.setDebugEnabled(true) - .setOAuthAppId(configuration.getOauth().getAppId()) - .setOAuthAppSecret(configuration.getOauth().getAppSecret()) - .setOAuthAccessToken(configuration.getOauth().getUserAccessToken()) - .setOAuthPermissions(ALL_PERMISSIONS) - .setJSONStoreEnabled(true) - .setClientVersion("v1.0"); - - FacebookFactory ff = new FacebookFactory(cb.build()); - Facebook facebook = ff.getInstance(); - - return facebook; - } - - @Override - public void cleanUp() { - shutdownAndAwaitTermination(executor); - } - - private class FacebookFeedPollingTask implements Runnable { - - FacebookUserstreamProvider provider; - Facebook client; - - private Set<Post> priorPollResult = Sets.newHashSet(); - - public FacebookFeedPollingTask(FacebookUserstreamProvider facebookUserstreamProvider) { - provider = facebookUserstreamProvider; - } - - @Override - public void run() { - client = provider.getFacebookClient(); - while (provider.isRunning()) { - try { - ResponseList<Post> postResponseList = client.getHome(); - Set<Post> update = Sets.newHashSet(postResponseList); - Set<Post> repeats = Sets.intersection(priorPollResult, Sets.newHashSet(update)); - Set<Post> entrySet = Sets.difference(update, repeats); - for (Post item : entrySet) { - String json = DataObjectFactory.getRawJSON(item); - org.apache.streams.facebook.Post post = mapper.readValue(json, org.apache.streams.facebook.Post.class); - try { - lock.readLock().lock(); - ComponentUtils.offerUntilSuccess(new StreamsDatum(post), providerQueue); - countersCurrent.incrementAttempt(); - } finally { - lock.readLock().unlock(); - } - } - priorPollResult = update; - Thread.sleep(configuration.getPollIntervalMillis()); - } catch (Exception e) { - e.printStackTrace(); - } - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/dc432af2/streams-contrib/streams-provider-facebook/src/main/java/com/facebook/provider/FacebookUserInformationProvider.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-facebook/src/main/java/com/facebook/provider/FacebookUserInformationProvider.java b/streams-contrib/streams-provider-facebook/src/main/java/com/facebook/provider/FacebookUserInformationProvider.java deleted file mode 100644 index 8640f5d..0000000 --- a/streams-contrib/streams-provider-facebook/src/main/java/com/facebook/provider/FacebookUserInformationProvider.java +++ /dev/null @@ -1,298 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package com.facebook.provider; - -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.base.Preconditions; -import com.google.common.util.concurrent.ListeningExecutorService; -import com.google.common.util.concurrent.MoreExecutors; -import com.typesafe.config.Config; -import com.typesafe.config.ConfigRenderOptions; -import facebook4j.*; -import facebook4j.conf.ConfigurationBuilder; -import facebook4j.json.DataObjectFactory; -import org.apache.streams.config.StreamsConfigurator; -import org.apache.streams.core.StreamsDatum; -import org.apache.streams.core.StreamsProvider; -import org.apache.streams.core.StreamsResultSet; -import org.apache.streams.facebook.FacebookUserInformationConfiguration; -import org.apache.streams.jackson.StreamsJacksonMapper; -import org.joda.time.DateTime; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import sun.reflect.generics.reflectiveObjects.NotImplementedException; - -import java.io.IOException; -import java.io.Serializable; -import java.math.BigInteger; -import java.util.*; -import java.util.concurrent.*; -import java.util.concurrent.atomic.AtomicBoolean; - -public class FacebookUserInformationProvider implements StreamsProvider, Serializable -{ - - public static final String STREAMS_ID = "FacebookUserInformationProvider"; - private static final Logger LOGGER = LoggerFactory.getLogger(FacebookUserInformationProvider.class); - - private static final ObjectMapper mapper = new StreamsJacksonMapper(); - - private static final String ALL_PERMISSIONS = "ads_management,ads_read,create_event,create_note,email,export_stream,friends_about_me,friends_actions.books,friends_actions.music,friends_actions.news,friends_actions.video,friends_activities,friends_birthday,friends_education_history,friends_events,friends_games_activity,friends_groups,friends_hometown,friends_interests,friends_likes,friends_location,friends_notes,friends_online_presence,friends_photo_video_tags,friends_photos,friends_questions,friends_relationship_details,friends_relationships,friends_religion_politics,friends_status,friends_subscriptions,friends_videos,friends_website,friends_work_history,manage_friendlists,manage_notifications,manage_pages,photo_upload,publish_actions,publish_stream,read_friendlists,read_insights,read_mailbox,read_page_mailboxes,read_requests,read_stream,rsvp_event,share_item,sms,status_update,user_about_me,user_actions.books,user_actions.music,user_actions.news,user_actions.video,user_activitie s,user_birthday,user_education_history,user_events,user_friends,user_games_activity,user_groups,user_hometown,user_interests,user_likes,user_location,user_notes,user_online_presence,user_photo_video_tags,user_photos,user_questions,user_relationship_details,user_relationships,user_religion_politics,user_status,user_subscriptions,user_videos,user_website,user_work_history,video_upload,xmpp_login"; - private FacebookUserInformationConfiguration facebookUserInformationConfiguration; - - private Class klass; - protected volatile Queue<StreamsDatum> providerQueue = new LinkedBlockingQueue<StreamsDatum>(); - - public FacebookUserInformationConfiguration getConfig() { return facebookUserInformationConfiguration; } - - public void setConfig(FacebookUserInformationConfiguration config) { this.facebookUserInformationConfiguration = config; } - - protected Iterator<String[]> idsBatches; - - protected ExecutorService executor; - - protected DateTime start; - protected DateTime end; - - protected final AtomicBoolean running = new AtomicBoolean(); - - private static ExecutorService newFixedThreadPoolWithQueueSize(int nThreads, int queueSize) { - return new ThreadPoolExecutor(nThreads, nThreads, - 5000L, TimeUnit.MILLISECONDS, - new ArrayBlockingQueue<Runnable>(queueSize, true), new ThreadPoolExecutor.CallerRunsPolicy()); - } - - public FacebookUserInformationProvider() { - Config config = StreamsConfigurator.config.getConfig("facebook"); - FacebookUserInformationConfiguration facebookUserInformationConfiguration; - try { - facebookUserInformationConfiguration = mapper.readValue(config.root().render(ConfigRenderOptions.concise()), FacebookUserInformationConfiguration.class); - } catch (IOException e) { - e.printStackTrace(); - return; - } - } - - public FacebookUserInformationProvider(FacebookUserInformationConfiguration config) { - this.facebookUserInformationConfiguration = config; - } - - public FacebookUserInformationProvider(Class klass) { - Config config = StreamsConfigurator.config.getConfig("facebook"); - FacebookUserInformationConfiguration facebookUserInformationConfiguration; - try { - facebookUserInformationConfiguration = mapper.readValue(config.root().render(ConfigRenderOptions.concise()), FacebookUserInformationConfiguration.class); - } catch (IOException e) { - e.printStackTrace(); - return; - } - this.klass = klass; - } - - public FacebookUserInformationProvider(FacebookUserInformationConfiguration config, Class klass) { - this.facebookUserInformationConfiguration = config; - this.klass = klass; - } - - public Queue<StreamsDatum> getProviderQueue() { - return this.providerQueue; - } - - @Override - public void startStream() { - running.set(true); - } - - public StreamsResultSet readCurrent() { - - Preconditions.checkArgument(idsBatches.hasNext()); - - LOGGER.info("readCurrent"); - - Facebook client = getFacebookClient(); - - try { - User me = client.users().getMe(); - String json = mapper.writeValueAsString(me); - providerQueue.add( - new StreamsDatum(json, DateTime.now()) - ); - } catch (JsonProcessingException e) { - e.printStackTrace(); - } catch (FacebookException e) { - e.printStackTrace(); - } - - if( idsBatches.hasNext()) { - while (idsBatches.hasNext()) { - try { - List<User> userList = client.users().getUsers(idsBatches.next()); - for (User user : userList) { - - try { - String json = mapper.writeValueAsString(user); - providerQueue.add( - new StreamsDatum(json, DateTime.now()) - ); - } catch (JsonProcessingException e) { - // e.printStackTrace(); - } - } - - } catch (FacebookException e) { - e.printStackTrace(); - } - } - } else { - try { - ResponseList<Friend> friendResponseList = client.friends().getFriends(); - Paging<Friend> friendPaging; - do { - - for( Friend friend : friendResponseList ) { - - String json; - try { - json = mapper.writeValueAsString(friend); - providerQueue.add( - new StreamsDatum(json) - ); - } catch (JsonProcessingException e) { -// e.printStackTrace(); - } - } - friendPaging = friendResponseList.getPaging(); - friendResponseList = client.fetchNext(friendPaging); - } while( friendPaging != null && - friendResponseList != null ); - } catch (FacebookException e) { - e.printStackTrace(); - } - - } - - LOGGER.info("Finished. Cleaning up..."); - - LOGGER.info("Providing {} docs", providerQueue.size()); - - StreamsResultSet result = new StreamsResultSet(providerQueue); - running.set(false); - - LOGGER.info("Exiting"); - - return result; - - } - - public StreamsResultSet readNew(BigInteger sequence) { - LOGGER.debug("{} readNew", STREAMS_ID); - throw new NotImplementedException(); - } - - public StreamsResultSet readRange(DateTime start, DateTime end) { - LOGGER.debug("{} readRange", STREAMS_ID); - this.start = start; - this.end = end; - readCurrent(); - StreamsResultSet result = (StreamsResultSet)providerQueue.iterator(); - return result; - } - - @Override - public boolean isRunning() { - return running.get(); - } - - void shutdownAndAwaitTermination(ExecutorService pool) { - pool.shutdown(); // Disable new tasks from being submitted - try { - // Wait a while for existing tasks to terminate - if (!pool.awaitTermination(10, TimeUnit.SECONDS)) { - pool.shutdownNow(); // Cancel currently executing tasks - // Wait a while for tasks to respond to being cancelled - if (!pool.awaitTermination(10, TimeUnit.SECONDS)) - System.err.println("Pool did not terminate"); - } - } catch (InterruptedException ie) { - // (Re-)Cancel if current thread also interrupted - pool.shutdownNow(); - // Preserve interrupt status - Thread.currentThread().interrupt(); - } - } - - @Override - public void prepare(Object o) { - - executor = MoreExecutors.listeningDecorator(newFixedThreadPoolWithQueueSize(5, 20)); - - Preconditions.checkNotNull(providerQueue); - Preconditions.checkNotNull(this.klass); - Preconditions.checkNotNull(facebookUserInformationConfiguration.getOauth().getAppId()); - Preconditions.checkNotNull(facebookUserInformationConfiguration.getOauth().getAppSecret()); - Preconditions.checkNotNull(facebookUserInformationConfiguration.getOauth().getUserAccessToken()); - Preconditions.checkNotNull(facebookUserInformationConfiguration.getInfo()); - - List<String> ids = new ArrayList<String>(); - List<String[]> idsBatches = new ArrayList<String[]>(); - - for(String s : facebookUserInformationConfiguration.getInfo()) { - if(s != null) - { - ids.add(s); - - if(ids.size() >= 100) { - // add the batch - idsBatches.add(ids.toArray(new String[ids.size()])); - // reset the Ids - ids = new ArrayList<String>(); - } - - } - } - - if(ids.size() > 0) - idsBatches.add(ids.toArray(new String[ids.size()])); - - this.idsBatches = idsBatches.iterator(); - } - - protected Facebook getFacebookClient() - { - ConfigurationBuilder cb = new ConfigurationBuilder(); - cb.setDebugEnabled(true) - .setOAuthAppId(facebookUserInformationConfiguration.getOauth().getAppId()) - .setOAuthAppSecret(facebookUserInformationConfiguration.getOauth().getAppSecret()) - .setOAuthAccessToken(facebookUserInformationConfiguration.getOauth().getUserAccessToken()) - .setOAuthPermissions(ALL_PERMISSIONS) - .setJSONStoreEnabled(true) - .setClientVersion("v1.0"); - - FacebookFactory ff = new FacebookFactory(cb.build()); - Facebook facebook = ff.getInstance(); - - return facebook; - } - - @Override - public void cleanUp() { - shutdownAndAwaitTermination(executor); - } -} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/dc432af2/streams-contrib/streams-provider-facebook/src/main/java/com/facebook/provider/FacebookUserstreamProvider.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-facebook/src/main/java/com/facebook/provider/FacebookUserstreamProvider.java b/streams-contrib/streams-provider-facebook/src/main/java/com/facebook/provider/FacebookUserstreamProvider.java deleted file mode 100644 index b0bf082..0000000 --- a/streams-contrib/streams-provider-facebook/src/main/java/com/facebook/provider/FacebookUserstreamProvider.java +++ /dev/null @@ -1,319 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package com.facebook.provider; - -import com.facebook.*; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.base.Preconditions; -import com.google.common.collect.Queues; -import com.google.common.collect.Sets; -import com.google.common.util.concurrent.ListeningExecutorService; -import com.google.common.util.concurrent.MoreExecutors; -import com.typesafe.config.Config; -import com.typesafe.config.ConfigRenderOptions; -import facebook4j.*; -import facebook4j.Post; -import facebook4j.conf.ConfigurationBuilder; -import facebook4j.json.DataObjectFactory; -import org.apache.streams.config.StreamsConfigurator; -import org.apache.streams.core.DatumStatusCounter; -import org.apache.streams.core.StreamsDatum; -import org.apache.streams.core.StreamsProvider; -import org.apache.streams.core.StreamsResultSet; -import org.apache.streams.facebook.FacebookUserInformationConfiguration; -import org.apache.streams.facebook.FacebookUserstreamConfiguration; -import org.apache.streams.jackson.StreamsJacksonMapper; -import org.apache.streams.util.ComponentUtils; -import org.joda.time.DateTime; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import sun.reflect.generics.reflectiveObjects.NotImplementedException; - -import java.io.IOException; -import java.io.Serializable; -import java.math.BigInteger; -import java.util.*; -import java.util.concurrent.*; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; - -public class FacebookUserstreamProvider implements StreamsProvider, Serializable { - - public static final String STREAMS_ID = "FacebookUserstreamProvider"; - private static final Logger LOGGER = LoggerFactory.getLogger(FacebookUserstreamProvider.class); - - private static final ObjectMapper mapper = new StreamsJacksonMapper(); - - private static final String ALL_PERMISSIONS = "read_stream"; - private FacebookUserstreamConfiguration configuration; - - private Class klass; - protected final ReadWriteLock lock = new ReentrantReadWriteLock(); - - protected volatile Queue<StreamsDatum> providerQueue = new LinkedBlockingQueue<StreamsDatum>(); - - public FacebookUserstreamConfiguration getConfig() { - return configuration; - } - - public void setConfig(FacebookUserstreamConfiguration config) { - this.configuration = config; - } - - protected ListeningExecutorService executor; - - protected DateTime start; - protected DateTime end; - - protected final AtomicBoolean running = new AtomicBoolean(); - - private DatumStatusCounter countersCurrent = new DatumStatusCounter(); - private DatumStatusCounter countersTotal = new DatumStatusCounter(); - - protected Facebook client; - - private static ExecutorService newFixedThreadPoolWithQueueSize(int nThreads, int queueSize) { - return new ThreadPoolExecutor(nThreads, nThreads, - 5000L, TimeUnit.MILLISECONDS, - new ArrayBlockingQueue<Runnable>(queueSize, true), new ThreadPoolExecutor.CallerRunsPolicy()); - } - - public FacebookUserstreamProvider() { - Config config = StreamsConfigurator.config.getConfig("facebook"); - FacebookUserInformationConfiguration facebookUserInformationConfiguration; - try { - facebookUserInformationConfiguration = mapper.readValue(config.root().render(ConfigRenderOptions.concise()), FacebookUserInformationConfiguration.class); - } catch (IOException e) { - e.printStackTrace(); - return; - } - } - - public FacebookUserstreamProvider(FacebookUserstreamConfiguration config) { - this.configuration = config; - } - - public FacebookUserstreamProvider(Class klass) { - Config config = StreamsConfigurator.config.getConfig("facebook"); - FacebookUserInformationConfiguration facebookUserInformationConfiguration; - try { - facebookUserInformationConfiguration = mapper.readValue(config.root().render(ConfigRenderOptions.concise()), FacebookUserInformationConfiguration.class); - } catch (IOException e) { - e.printStackTrace(); - return; - } - this.klass = klass; - } - - public FacebookUserstreamProvider(FacebookUserstreamConfiguration config, Class klass) { - this.configuration = config; - this.klass = klass; - } - - public Queue<StreamsDatum> getProviderQueue() { - return this.providerQueue; - } - - @Override - public void startStream() { - - client = getFacebookClient(); - - if( configuration.getInfo() != null && - configuration.getInfo().size() > 0 ) { - for( String id : configuration.getInfo()) { - executor.submit(new FacebookFeedPollingTask(this, id)); - } - running.set(true); - } else { - try { - String id = client.getMe().getId(); - executor.submit(new FacebookFeedPollingTask(this, id)); - running.set(true); - } catch (FacebookException e) { - LOGGER.error(e.getMessage()); - running.set(false); - } - } - } - - public StreamsResultSet readCurrent() { - - StreamsResultSet current; - - synchronized (FacebookUserstreamProvider.class) { - current = new StreamsResultSet(Queues.newConcurrentLinkedQueue(providerQueue)); - current.setCounter(new DatumStatusCounter()); - current.getCounter().add(countersCurrent); - countersTotal.add(countersCurrent); - countersCurrent = new DatumStatusCounter(); - providerQueue.clear(); - } - - return current; - - } - - public StreamsResultSet readNew(BigInteger sequence) { - LOGGER.debug("{} readNew", STREAMS_ID); - throw new NotImplementedException(); - } - - public StreamsResultSet readRange(DateTime start, DateTime end) { - LOGGER.debug("{} readRange", STREAMS_ID); - this.start = start; - this.end = end; - readCurrent(); - StreamsResultSet result = (StreamsResultSet) providerQueue.iterator(); - return result; - } - - @Override - public boolean isRunning() { - return running.get(); - } - - void shutdownAndAwaitTermination(ExecutorService pool) { - pool.shutdown(); // Disable new tasks from being submitted - try { - // Wait a while for existing tasks to terminate - if (!pool.awaitTermination(10, TimeUnit.SECONDS)) { - pool.shutdownNow(); // Cancel currently executing tasks - // Wait a while for tasks to respond to being cancelled - if (!pool.awaitTermination(10, TimeUnit.SECONDS)) - System.err.println("Pool did not terminate"); - } - } catch (InterruptedException ie) { - // (Re-)Cancel if current thread also interrupted - pool.shutdownNow(); - // Preserve interrupt status - Thread.currentThread().interrupt(); - } - } - - @Override - public void prepare(Object o) { - - executor = MoreExecutors.listeningDecorator(newFixedThreadPoolWithQueueSize(5, 20)); - - Preconditions.checkNotNull(providerQueue); - Preconditions.checkNotNull(this.klass); - Preconditions.checkNotNull(configuration.getOauth().getAppId()); - Preconditions.checkNotNull(configuration.getOauth().getAppSecret()); - Preconditions.checkNotNull(configuration.getOauth().getUserAccessToken()); - - client = getFacebookClient(); - - if( configuration.getInfo() != null && - configuration.getInfo().size() > 0 ) { - - List<String> ids = new ArrayList<String>(); - List<String[]> idsBatches = new ArrayList<String[]>(); - - for (String s : configuration.getInfo()) { - if (s != null) { - ids.add(s); - - if (ids.size() >= 100) { - // add the batch - idsBatches.add(ids.toArray(new String[ids.size()])); - // reset the Ids - ids = new ArrayList<String>(); - } - - } - } - } - } - - protected Facebook getFacebookClient() { - ConfigurationBuilder cb = new ConfigurationBuilder(); - cb.setDebugEnabled(true) - .setOAuthAppId(configuration.getOauth().getAppId()) - .setOAuthAppSecret(configuration.getOauth().getAppSecret()) - .setOAuthAccessToken(configuration.getOauth().getUserAccessToken()) - .setOAuthPermissions(ALL_PERMISSIONS) - .setJSONStoreEnabled(true); - - FacebookFactory ff = new FacebookFactory(cb.build()); - Facebook facebook = ff.getInstance(); - - return facebook; - } - - @Override - public void cleanUp() { - shutdownAndAwaitTermination(executor); - } - - private class FacebookFeedPollingTask implements Runnable { - - FacebookUserstreamProvider provider; - Facebook client; - String id; - - private Set<Post> priorPollResult = Sets.newHashSet(); - - public FacebookFeedPollingTask(FacebookUserstreamProvider facebookUserstreamProvider) { - this.provider = facebookUserstreamProvider; - } - - public FacebookFeedPollingTask(FacebookUserstreamProvider facebookUserstreamProvider, String id) { - this.provider = facebookUserstreamProvider; - this.client = provider.client; - this.id = id; - } - @Override - public void run() { - while (provider.isRunning()) { - ResponseList<Post> postResponseList; - try { - postResponseList = client.getFeed(id); - - Set<Post> update = Sets.newHashSet(postResponseList); - Set<Post> repeats = Sets.intersection(priorPollResult, Sets.newHashSet(update)); - Set<Post> entrySet = Sets.difference(update, repeats); - LOGGER.debug(this.id + " response: " + update.size() + " previous: " + repeats.size() + " new: " + entrySet.size()); - for (Post item : entrySet) { - String json = DataObjectFactory.getRawJSON(item); - org.apache.streams.facebook.Post post = mapper.readValue(json, org.apache.streams.facebook.Post.class); - try { - lock.readLock().lock(); - ComponentUtils.offerUntilSuccess(new StreamsDatum(post), providerQueue); - countersCurrent.incrementAttempt(); - } finally { - lock.readLock().unlock(); - } - } - priorPollResult = update; - } catch (Exception e) { - e.printStackTrace(); - } finally { - try { - Thread.sleep(configuration.getPollIntervalMillis()); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - } - } - } - } -}