http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-processor-regex/src/main/java/org/apache/streams/regex/RegexUtils.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-processor-regex/src/main/java/org/apache/streams/regex/RegexUtils.java b/streams-contrib/streams-processor-regex/src/main/java/org/apache/streams/regex/RegexUtils.java index 6b4f28a..d1936d1 100644 --- a/streams-contrib/streams-processor-regex/src/main/java/org/apache/streams/regex/RegexUtils.java +++ b/streams-contrib/streams-processor-regex/src/main/java/org/apache/streams/regex/RegexUtils.java @@ -21,6 +21,7 @@ package org.apache.streams.regex; import com.google.common.collect.Lists; import com.google.common.collect.Maps; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -30,73 +31,73 @@ import java.util.regex.Matcher; import java.util.regex.Pattern; /** - * Provides utilities for extracting matches from content + * Provides utilities for extracting matches from content. */ public class RegexUtils { - private static final Map<String, Pattern> patternCache = Maps.newConcurrentMap(); - private final static Logger LOGGER = LoggerFactory.getLogger(RegexUtils.class); + private static final Map<String, Pattern> patternCache = Maps.newConcurrentMap(); + private static final Logger LOGGER = LoggerFactory.getLogger(RegexUtils.class); - private RegexUtils() {} + private RegexUtils() {} - /** - * Extracts matches of the given pattern in the content and returns them as a list. - * @param pattern the pattern for the substring to match. For example, [0-9]* matches 911 in Emergency number is 911. - * @param content the complete content to find matches in. - * @return a non-null list of matches. 
- */ - public static Map<String, List<Integer>> extractMatches(String pattern, String content) { - return getMatches(pattern, content, -1); - } + /** + * Extracts matches of the given pattern in the content and returns them as a list. + * @param pattern the pattern for the substring to match. For example, [0-9]* matches 911 in Emergency number is 911. + * @param content the complete content to find matches in. + * @return a non-null list of matches. + */ + public static Map<String, List<Integer>> extractMatches(String pattern, String content) { + return getMatches(pattern, content, -1); + } - /** - * Extracts matches of the given pattern that are bounded by separation characters and returns them as a list. - * @param pattern the pattern for the substring to match. For example, [0-9]* matches 911 in Emergency number is 911. - * @param content the complete content to find matches in. - * @return a non-null list of matches. - */ - public static Map<String, List<Integer>> extractWordMatches(String pattern, String content) { - pattern = "(^|\\s)(" + pattern + ")([\\s!\\.;,?]|$)"; - return getMatches(pattern, content, 2); - } + /** + * Extracts matches of the given pattern that are bounded by separation characters and returns them as a list. + * @param pattern the pattern for the substring to match. For example, [0-9]* matches 911 in Emergency number is 911. + * @param content the complete content to find matches in. + * @return a non-null list of matches. 
+ */ + public static Map<String, List<Integer>> extractWordMatches(String pattern, String content) { + pattern = "(^|\\s)(" + pattern + ")([\\s!\\.;,?]|$)"; + return getMatches(pattern, content, 2); + } - protected static Map<String, List<Integer>> getMatches(String pattern, String content, int capture) { - try { - Map<String, List<Integer>> matches = Maps.newHashMap(); - if(content == null) { - return matches; - } + protected static Map<String, List<Integer>> getMatches(String pattern, String content, int capture) { + try { + Map<String, List<Integer>> matches = Maps.newHashMap(); + if (content == null) { + return matches; + } - Matcher m = getPattern(pattern).matcher(content); - while (m.find()) { - String group = capture > 0 ? m.group(capture) : m.group(); - if (group != null && !group.equals("")) { - List<Integer> indices; - if (matches.containsKey(group)) { - indices = matches.get(group); - } else { - indices = Lists.newArrayList(); - matches.put(group, indices); - } - indices.add(m.start()); - } - } - return matches; - } catch (Throwable e) { - LOGGER.error("Throwable process {}", e); - e.printStackTrace(); - throw new RuntimeException(e); + Matcher matcher = getPattern(pattern).matcher(content); + while (matcher.find()) { + String group = capture > 0 ? 
matcher.group(capture) : matcher.group(); + if (group != null && !group.equals("")) { + List<Integer> indices; + if (matches.containsKey(group)) { + indices = matches.get(group); + } else { + indices = Lists.newArrayList(); + matches.put(group, indices); + } + indices.add(matcher.start()); } + } + return matches; + } catch (Throwable ex) { + LOGGER.error("Throwable process {}", ex); + ex.printStackTrace(); + throw new RuntimeException(ex); } + } - private static Pattern getPattern(String pattern) { - Pattern p; - if (patternCache.containsKey(pattern)) { - p = patternCache.get(pattern); - } else { - p = Pattern.compile(pattern); - patternCache.put(pattern, p); - } - return p; + private static Pattern getPattern(String patternString) { + Pattern pattern; + if (patternCache.containsKey(patternString)) { + pattern = patternCache.get(patternString); + } else { + pattern = Pattern.compile(patternString); + patternCache.put(patternString, pattern); } + return pattern; + } } \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-processor-regex/src/test/java/org/apache/streams/regex/RegexHashtagExtractorTest.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-processor-regex/src/test/java/org/apache/streams/regex/RegexHashtagExtractorTest.java b/streams-contrib/streams-processor-regex/src/test/java/org/apache/streams/regex/RegexHashtagExtractorTest.java index 2de4aa8..6e17de8 100644 --- a/streams-contrib/streams-processor-regex/src/test/java/org/apache/streams/regex/RegexHashtagExtractorTest.java +++ b/streams-contrib/streams-processor-regex/src/test/java/org/apache/streams/regex/RegexHashtagExtractorTest.java @@ -19,7 +19,6 @@ package org.apache.streams.regex; - import com.google.common.collect.Sets; import org.apache.streams.core.StreamsDatum; import org.apache.streams.pojo.extensions.ExtensionUtil; http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-processor-regex/src/test/java/org/apache/streams/regex/RegexMentionExtractorTest.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-processor-regex/src/test/java/org/apache/streams/regex/RegexMentionExtractorTest.java b/streams-contrib/streams-processor-regex/src/test/java/org/apache/streams/regex/RegexMentionExtractorTest.java index c7778a8..66f7aa5 100644 --- a/streams-contrib/streams-processor-regex/src/test/java/org/apache/streams/regex/RegexMentionExtractorTest.java +++ b/streams-contrib/streams-processor-regex/src/test/java/org/apache/streams/regex/RegexMentionExtractorTest.java @@ -19,7 +19,6 @@ package org.apache.streams.regex; - import com.google.common.collect.Sets; import org.apache.streams.core.StreamsDatum; import org.apache.streams.pojo.extensions.ExtensionUtil; 
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-processor-regex/src/test/java/org/apache/streams/regex/RegexUrlExtractorTest.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-processor-regex/src/test/java/org/apache/streams/regex/RegexUrlExtractorTest.java b/streams-contrib/streams-processor-regex/src/test/java/org/apache/streams/regex/RegexUrlExtractorTest.java index 344bf98..d5d8d9b 100644 --- a/streams-contrib/streams-processor-regex/src/test/java/org/apache/streams/regex/RegexUrlExtractorTest.java +++ b/streams-contrib/streams-processor-regex/src/test/java/org/apache/streams/regex/RegexUrlExtractorTest.java @@ -19,7 +19,6 @@ package org.apache.streams.regex; - import com.google.common.collect.Sets; import org.apache.streams.core.StreamsDatum; import org.apache.streams.pojo.json.Activity; http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-processor-regex/src/test/java/org/apache/streams/regex/RegexUtilsTest.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-processor-regex/src/test/java/org/apache/streams/regex/RegexUtilsTest.java b/streams-contrib/streams-processor-regex/src/test/java/org/apache/streams/regex/RegexUtilsTest.java index fc2b9f6..a156f3a 100644 --- a/streams-contrib/streams-processor-regex/src/test/java/org/apache/streams/regex/RegexUtilsTest.java +++ b/streams-contrib/streams-processor-regex/src/test/java/org/apache/streams/regex/RegexUtilsTest.java @@ -19,7 +19,6 @@ package org.apache.streams.regex; - import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @@ -33,7 +32,6 @@ import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.is; import static org.junit.Assert.assertThat; - @RunWith(Parameterized.class) public class RegexUtilsTest { 
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/api/FacebookPageActivitySerializer.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/api/FacebookPageActivitySerializer.java b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/api/FacebookPageActivitySerializer.java index 7a6648a..1216c38 100644 --- a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/api/FacebookPageActivitySerializer.java +++ b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/api/FacebookPageActivitySerializer.java @@ -18,15 +18,17 @@ package org.apache.streams.facebook.api; -import com.fasterxml.jackson.databind.ObjectMapper; -import org.apache.commons.lang.NotImplementedException; import org.apache.streams.data.ActivitySerializer; import org.apache.streams.exceptions.ActivitySerializerException; +import org.apache.streams.facebook.Page; import org.apache.streams.facebook.serializer.FacebookActivityUtil; import org.apache.streams.jackson.StreamsJacksonMapper; -import org.apache.streams.facebook.Page; import org.apache.streams.pojo.json.Activity; +import com.fasterxml.jackson.databind.ObjectMapper; + +import org.apache.commons.lang.NotImplementedException; + import java.util.List; /** @@ -35,32 +37,32 @@ import java.util.List; */ public class FacebookPageActivitySerializer implements ActivitySerializer<Page> { - public static ObjectMapper mapper; - static { - mapper = StreamsJacksonMapper.getInstance(); - } + public static ObjectMapper mapper; + static { + mapper = StreamsJacksonMapper.getInstance(); + } - @Override - public String serializationFormat() { - return "facebook_post_json_v1"; - } + @Override + public String serializationFormat() { + return 
"facebook_post_json_v1"; + } - @Override - public Page serialize(Activity deserialized) throws ActivitySerializerException { - throw new NotImplementedException("Not currently supported by this deserializer"); - } + @Override + public Page serialize(Activity deserialized) throws ActivitySerializerException { + throw new NotImplementedException("Not currently supported by this deserializer"); + } - @Override - public Activity deserialize(Page page) throws ActivitySerializerException { - Activity activity = new Activity(); + @Override + public Activity deserialize(Page page) throws ActivitySerializerException { + Activity activity = new Activity(); - FacebookActivityUtil.updateActivity(page, activity); + FacebookActivityUtil.updateActivity(page, activity); - return activity; - } + return activity; + } - @Override - public List<Activity> deserializeAll(List<Page> serializedList) { - throw new NotImplementedException("Not currently supported by this deserializer"); - } + @Override + public List<Activity> deserializeAll(List<Page> serializedList) { + throw new NotImplementedException("Not currently supported by this deserializer"); + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/api/FacebookPostActivitySerializer.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/api/FacebookPostActivitySerializer.java b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/api/FacebookPostActivitySerializer.java index 4326fb1..306fecc 100644 --- a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/api/FacebookPostActivitySerializer.java +++ b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/api/FacebookPostActivitySerializer.java 
@@ -15,16 +15,19 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.streams.facebook.api; -import com.fasterxml.jackson.databind.ObjectMapper; -import org.apache.commons.lang.NotImplementedException; import org.apache.streams.data.ActivitySerializer; import org.apache.streams.exceptions.ActivitySerializerException; import org.apache.streams.facebook.Post; import org.apache.streams.facebook.serializer.FacebookActivityUtil; import org.apache.streams.jackson.StreamsJacksonMapper; import org.apache.streams.pojo.json.Activity; + +import com.fasterxml.jackson.databind.ObjectMapper; + +import org.apache.commons.lang.NotImplementedException; import org.joda.time.format.DateTimeFormat; import org.joda.time.format.DateTimeFormatter; import org.joda.time.format.ISODateTimeFormat; @@ -33,37 +36,34 @@ import java.util.List; public class FacebookPostActivitySerializer implements ActivitySerializer<org.apache.streams.facebook.Post> { - public static final DateTimeFormatter FACEBOOK_FORMAT = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ssZ"); - public static final DateTimeFormatter ACTIVITY_FORMAT = ISODateTimeFormat.basicDateTime(); + public static final DateTimeFormatter FACEBOOK_FORMAT = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ssZ"); + public static final DateTimeFormatter ACTIVITY_FORMAT = ISODateTimeFormat.basicDateTime(); - public static final String PROVIDER_NAME = "Facebook"; + public static final String PROVIDER_NAME = "Facebook"; - public static ObjectMapper mapper; - static { - mapper = StreamsJacksonMapper.getInstance(); - } + public static ObjectMapper mapper = StreamsJacksonMapper.getInstance(); - @Override - public String serializationFormat() { - return "facebook_post_json_v1"; - } + @Override + public String serializationFormat() { + return "facebook_post_json_v1"; + } - @Override - public Post serialize(Activity deserialized) throws ActivitySerializerException { - throw new NotImplementedException("Not 
currently supported by this deserializer"); - } + @Override + public Post serialize(Activity deserialized) throws ActivitySerializerException { + throw new NotImplementedException("Not currently supported by this deserializer"); + } - @Override - public Activity deserialize(Post post) throws ActivitySerializerException { - Activity activity = new Activity(); + @Override + public Activity deserialize(Post post) throws ActivitySerializerException { + Activity activity = new Activity(); - FacebookActivityUtil.updateActivity(post, activity); + FacebookActivityUtil.updateActivity(post, activity); - return activity; - } + return activity; + } - @Override - public List<Activity> deserializeAll(List<Post> serializedList) { - throw new NotImplementedException("Not currently supported by this deserializer"); - } + @Override + public List<Activity> deserializeAll(List<Post> serializedList) { + throw new NotImplementedException("Not currently supported by this deserializer"); + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/processor/FacebookTypeConverter.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/processor/FacebookTypeConverter.java b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/processor/FacebookTypeConverter.java index 762b6c0..92cf333 100644 --- a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/processor/FacebookTypeConverter.java +++ b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/processor/FacebookTypeConverter.java @@ -18,12 +18,6 @@ package org.apache.streams.facebook.processor; -import com.fasterxml.jackson.core.JsonParseException; -import com.fasterxml.jackson.core.JsonParser; -import 
com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.node.ObjectNode; -import com.google.common.collect.Lists; import org.apache.streams.core.StreamsDatum; import org.apache.streams.core.StreamsProcessor; import org.apache.streams.exceptions.ActivitySerializerException; @@ -34,6 +28,14 @@ import org.apache.streams.facebook.api.FacebookPostActivitySerializer; import org.apache.streams.facebook.provider.FacebookEventClassifier; import org.apache.streams.jackson.StreamsJacksonMapper; import org.apache.streams.pojo.json.Activity; + +import com.fasterxml.jackson.core.JsonParseException; +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.google.common.collect.Lists; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -41,172 +43,187 @@ import java.io.IOException; import java.util.List; import java.util.Queue; +/** + * FacebookTypeConverter converts facebook data to activity streams types. 
+ */ public class FacebookTypeConverter implements StreamsProcessor { - public final static String STREAMS_ID = "FacebookTypeConverter"; - - private final static Logger LOGGER = LoggerFactory.getLogger(FacebookTypeConverter.class); - - private ObjectMapper mapper; - - private Queue<StreamsDatum> inQueue; - private Queue<StreamsDatum> outQueue; - - private Class inClass; - private Class outClass; - - private FacebookPostActivitySerializer facebookPostActivitySerializer; - private FacebookPageActivitySerializer facebookPageActivitySerializer; - - private int count = 0; - - public final static String TERMINATE = new String("TERMINATE"); - - public FacebookTypeConverter(Class inClass, Class outClass) { - this.inClass = inClass; - this.outClass = outClass; - } - - public Queue<StreamsDatum> getProcessorOutputQueue() { - return outQueue; - } - - public void setProcessorInputQueue(Queue<StreamsDatum> inputQueue) { - inQueue = inputQueue; + public static final String STREAMS_ID = "FacebookTypeConverter"; + + private static final Logger LOGGER = LoggerFactory.getLogger(FacebookTypeConverter.class); + + private ObjectMapper mapper; + + private Queue<StreamsDatum> inQueue; + private Queue<StreamsDatum> outQueue; + + private Class inClass; + private Class outClass; + + private FacebookPostActivitySerializer facebookPostActivitySerializer; + private FacebookPageActivitySerializer facebookPageActivitySerializer; + + private int count = 0; + + public static final String TERMINATE = new String("TERMINATE"); + + public FacebookTypeConverter(Class inClass, Class outClass) { + this.inClass = inClass; + this.outClass = outClass; + } + + public Queue<StreamsDatum> getProcessorOutputQueue() { + return outQueue; + } + + public void setProcessorInputQueue(Queue<StreamsDatum> inputQueue) { + inQueue = inputQueue; + } + + /** + * convert. 
+ * @param event event + * @param inClass inClass + * @param outClass outClass + * @return Object + * @throws ActivitySerializerException ActivitySerializerException + * @throws JsonProcessingException JsonProcessingException + */ + public Object convert(ObjectNode event, Class inClass, Class outClass) throws ActivitySerializerException, JsonProcessingException { + + Object result = null; + + if ( outClass.equals( Activity.class )) { + LOGGER.debug("ACTIVITY"); + if (inClass.equals(Post.class)) { + LOGGER.debug("POST"); + result = facebookPostActivitySerializer.deserialize(mapper.convertValue(event, Post.class)); + } else if (inClass.equals(Page.class)) { + LOGGER.debug("PAGE"); + result = facebookPageActivitySerializer.deserialize(mapper.convertValue(event, Page.class)); + } + } else if ( outClass.equals( Post.class )) { + LOGGER.debug("POST"); + result = mapper.convertValue(event, Post.class); + } else if ( outClass.equals(Page.class)) { + LOGGER.debug("PAGE"); + result = mapper.convertValue(event, Page.class); + } else if ( outClass.equals( ObjectNode.class )) { + LOGGER.debug("OBJECTNODE"); + result = mapper.convertValue(event, ObjectNode.class); } - public Object convert(ObjectNode event, Class inClass, Class outClass) throws ActivitySerializerException, JsonProcessingException { - - Object result = null; - - if( outClass.equals( Activity.class )) { - LOGGER.debug("ACTIVITY"); - if(inClass.equals(Post.class)) { - LOGGER.debug("POST"); - result = facebookPostActivitySerializer.deserialize(mapper.convertValue(event, Post.class)); - } else if(inClass.equals(Page.class)) { - LOGGER.debug("PAGE"); - result = facebookPageActivitySerializer.deserialize(mapper.convertValue(event, Page.class)); - } - } else if( outClass.equals( Post.class )) { - LOGGER.debug("POST"); - result = mapper.convertValue(event, Post.class); - } else if( outClass.equals(Page.class)) { - LOGGER.debug("PAGE"); - result = mapper.convertValue(event, Page.class); - } else if( outClass.equals( 
ObjectNode.class )) { - LOGGER.debug("OBJECTNODE"); - result = mapper.convertValue(event, ObjectNode.class); - } - - // no supported conversion were applied - if( result != null ) { - count ++; - return result; - } - - LOGGER.debug("CONVERT FAILED"); - - return null; + // no supported conversion were applied + if ( result != null ) { + count ++; + return result; } - public boolean validate(Object document, Class klass) { - - // TODO - return true; + LOGGER.debug("CONVERT FAILED"); + + return null; + } + + // TODO: use standard validation + public boolean validate(Object document, Class klass) { + return true; + } + + // TODO: replace with standard validation + public boolean isValidJSON(final String json) { + boolean valid = false; + try { + final JsonParser parser = new ObjectMapper().getJsonFactory() + .createJsonParser(json); + while (parser.nextToken() != null) { + } + valid = true; + } catch (JsonParseException jpe) { + LOGGER.warn("validate: {}", jpe); + } catch (IOException ioe) { + LOGGER.warn("validate: {}", ioe); } - public boolean isValidJSON(final String json) { - boolean valid = false; - try { - final JsonParser parser = new ObjectMapper().getJsonFactory() - .createJsonParser(json); - while (parser.nextToken() != null) { - } - valid = true; - } catch (JsonParseException jpe) { - LOGGER.warn("validate: {}", jpe); - } catch (IOException ioe) { - LOGGER.warn("validate: {}", ioe); - } - - return valid; - } + return valid; + } - @Override - public String getId() { - return STREAMS_ID; - } + @Override + public String getId() { + return STREAMS_ID; + } - @Override - public List<StreamsDatum> process(StreamsDatum entry) { + @Override + public List<StreamsDatum> process(StreamsDatum entry) { - StreamsDatum result = null; + StreamsDatum result = null; - try { - Object item = entry.getDocument(); - ObjectNode node; + try { + Object item = entry.getDocument(); + ObjectNode node; - LOGGER.debug("{} processing {}", STREAMS_ID, item.getClass()); + LOGGER.debug("{} 
processing {}", STREAMS_ID, item.getClass()); - if( item instanceof String ) { + if ( item instanceof String ) { - // if the target is string, just pass-through - if( String.class.equals(outClass)) { - result = entry; - } - else { - // first check for valid json - node = (ObjectNode)mapper.readTree((String)item); + // if the target is string, just pass-through + if ( String.class.equals(outClass)) { + result = entry; + } else { + // first check for valid json + node = (ObjectNode)mapper.readTree((String)item); - // since data is coming from outside provider, we don't know what type the events are - // for now we'll assume post - Class inClass = FacebookEventClassifier.detectClass((String) item); + // since data is coming from outside provider, we don't know what type the events are + // for now we'll assume post + Class inClass = FacebookEventClassifier.detectClass((String) item); - Object out = convert(node, inClass, outClass); + Object out = convert(node, inClass, outClass); - if( out != null && validate(out, outClass)) - result = new StreamsDatum(out); - } + if ( out != null && validate(out, outClass)) { + result = new StreamsDatum(out); + } + } - } else if( item instanceof ObjectNode) { + } else if ( item instanceof ObjectNode) { - // first check for valid json - node = (ObjectNode)mapper.valueToTree(item); + // first check for valid json + node = (ObjectNode)mapper.valueToTree(item); - Class inClass = FacebookEventClassifier.detectClass(mapper.writeValueAsString(item)); + Class inClass = FacebookEventClassifier.detectClass(mapper.writeValueAsString(item)); - Object out = convert(node, inClass, outClass); + Object out = convert(node, inClass, outClass); - if( out != null && validate(out, outClass)) - result = new StreamsDatum(out); - } else if(item instanceof Post || item instanceof Page) { - Object out = convert(mapper.convertValue(item, ObjectNode.class), inClass, outClass); + if ( out != null && validate(out, outClass)) { + result = new StreamsDatum(out); + 
} + } else if (item instanceof Post || item instanceof Page) { + Object out = convert(mapper.convertValue(item, ObjectNode.class), inClass, outClass); - if( out != null && validate(out, outClass)) - result = new StreamsDatum(out); - } - } catch (Exception e) { - LOGGER.error("Exception switching types : {}", e); - if(e instanceof InterruptedException) { - Thread.currentThread().interrupt(); - } + if ( out != null && validate(out, outClass)) { + result = new StreamsDatum(out); } + } + } catch (Exception ex) { + LOGGER.error("Exception switching types : {}", ex); + if (ex instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } + } - if( result != null ) - return Lists.newArrayList(result); - else - return Lists.newArrayList(); + if ( result != null ) { + return Lists.newArrayList(result); + } else { + return Lists.newArrayList(); } + } - @Override - public void prepare(Object o) { - mapper = StreamsJacksonMapper.getInstance(); + @Override + public void prepare(Object configurationObject) { + mapper = StreamsJacksonMapper.getInstance(); - facebookPageActivitySerializer = new FacebookPageActivitySerializer(); - facebookPostActivitySerializer = new FacebookPostActivitySerializer(); - } + facebookPageActivitySerializer = new FacebookPageActivitySerializer(); + facebookPostActivitySerializer = new FacebookPostActivitySerializer(); + } - @Override - public void cleanUp() {} + @Override + public void cleanUp() {} } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookDataCollector.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookDataCollector.java b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookDataCollector.java index 33ee9dc..617bfab 100644 
--- a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookDataCollector.java +++ b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookDataCollector.java @@ -15,130 +15,139 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.streams.facebook.provider; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Strings; -import facebook4j.Facebook; -import facebook4j.FacebookFactory; -import facebook4j.conf.ConfigurationBuilder; import org.apache.streams.core.StreamsDatum; import org.apache.streams.facebook.FacebookConfiguration; import org.apache.streams.facebook.IdConfig; import org.apache.streams.util.api.requests.backoff.BackOffStrategy; import org.apache.streams.util.api.requests.backoff.impl.ExponentialBackOffStrategy; import org.apache.streams.util.oauth.tokens.tokenmanager.SimpleTokenManager; -import org.apache.streams.util.oauth.tokens.tokenmanager.impl.BasicTokenManger; +import org.apache.streams.util.oauth.tokens.tokenmanager.impl.BasicTokenManager; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Strings; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.concurrent.BlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; +import facebook4j.Facebook; +import facebook4j.FacebookFactory; +import facebook4j.conf.ConfigurationBuilder; + /** * Abstract data collector for Facebook. 
Iterates over ids and queues data to be output * by a {@link org.apache.streams.core.StreamsProvider} */ public abstract class FacebookDataCollector implements Runnable { - private static final Logger LOGGER = LoggerFactory.getLogger(FacebookDataCollector.class); - private static final String READ_ONLY = "read_streams"; - - @VisibleForTesting - protected AtomicBoolean isComplete; - protected BackOffStrategy backOff; - - private FacebookConfiguration config; - private BlockingQueue<StreamsDatum> queue; - private SimpleTokenManager<String> authTokens; - - - public FacebookDataCollector(FacebookConfiguration config, BlockingQueue<StreamsDatum> queue) { - this.config = config; - this.queue = queue; - this.isComplete = new AtomicBoolean(false); - this.backOff = new ExponentialBackOffStrategy(5); - this.authTokens = new BasicTokenManger<String>(); - if(config.getUserAccessTokens() != null) { - for(String token : config.getUserAccessTokens()) { - this.authTokens.addTokenToPool(token); - } - } - } + private static final Logger LOGGER = LoggerFactory.getLogger(FacebookDataCollector.class); + private static final String READ_ONLY = "read_streams"; - /** - * Returns true when the collector has finished querying facebook and has queued all data - * for the provider - * @return - */ - public boolean isComplete(){ - return this.isComplete.get(); - } + @VisibleForTesting + protected AtomicBoolean isComplete; + protected BackOffStrategy backOff; + + private FacebookConfiguration config; + private BlockingQueue<StreamsDatum> queue; + private SimpleTokenManager<String> authTokens; - /** - * Queues facebook data - * @param data - * @param id - */ - protected void outputData(Object data, String id) { - try { - this.queue.put(new StreamsDatum(data, id)); - } catch (InterruptedException ie) { - Thread.currentThread().interrupt(); - } + /** + * FacebookDataCollector constructor. 
+ * @param config config + * @param queue queue + */ + public FacebookDataCollector(FacebookConfiguration config, BlockingQueue<StreamsDatum> queue) { + this.config = config; + this.queue = queue; + this.isComplete = new AtomicBoolean(false); + this.backOff = new ExponentialBackOffStrategy(5); + this.authTokens = new BasicTokenManager<String>(); + if (config.getUserAccessTokens() != null) { + for (String token : config.getUserAccessTokens()) { + this.authTokens.addTokenToPool(token); + } } + } - /** - * Gets a Facebook client. If multiple authenticated users for this app are available - * it will rotate through the users oauth credentials - * @return - */ - protected Facebook getNextFacebookClient() { - ConfigurationBuilder cb = new ConfigurationBuilder(); - cb.setDebugEnabled(true); - cb.setOAuthPermissions(READ_ONLY); - cb.setOAuthAppId(this.config.getOauth().getAppId()); - cb.setOAuthAppSecret(this.config.getOauth().getAppSecret()); - if(this.authTokens.numAvailableTokens() > 0) - cb.setOAuthAccessToken(this.authTokens.getNextAvailableToken()); - else { - cb.setOAuthAccessToken(this.config.getOauth().getAppAccessToken()); - LOGGER.debug("appAccessToken : {}", this.config.getOauth().getAppAccessToken()); - } - cb.setJSONStoreEnabled(true); - if(!Strings.isNullOrEmpty(config.getVersion())) - cb.setRestBaseURL("https://graph.facebook.com/" + config.getVersion() + "/"); - LOGGER.debug("appId : {}", this.config.getOauth().getAppId()); - LOGGER.debug("appSecret: {}", this.config.getOauth().getAppSecret()); - FacebookFactory ff = new FacebookFactory(cb.build()); - return ff.getInstance(); + /** + * Returns true when the collector has finished querying facebook and has queued all data + * for the provider. + * @return isComplete + */ + public boolean isComplete() { + return this.isComplete.get(); + } + + /** + * Queues facebook data. 
+ * @param data data + * @param id id + */ + protected void outputData(Object data, String id) { + try { + this.queue.put(new StreamsDatum(data, id)); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); } + } - /** - * Queries facebook and queues the resulting data - * @param id - * @throws Exception - */ - protected abstract void getData(IdConfig id) throws Exception; - - - @Override - public void run() { - for( IdConfig id : this.config.getIds()) { - try { - getData(id); - } catch (InterruptedException ie) { - Thread.currentThread().interrupt(); - } catch (Exception e) { - LOGGER.error("Caught Exception while trying to poll data for page : {}", id); - LOGGER.error("Exception while getting page feed data: {}", e); - } - } - this.isComplete.set(true); + /** + * Gets a Facebook client. If multiple authenticated users for this app are available + * it will rotate through the users oauth credentials + * @return client + */ + protected Facebook getNextFacebookClient() { + ConfigurationBuilder cb = new ConfigurationBuilder(); + cb.setDebugEnabled(true); + cb.setOAuthPermissions(READ_ONLY); + cb.setOAuthAppId(this.config.getOauth().getAppId()); + cb.setOAuthAppSecret(this.config.getOauth().getAppSecret()); + if (this.authTokens.numAvailableTokens() > 0) { + cb.setOAuthAccessToken(this.authTokens.getNextAvailableToken()); + } else { + cb.setOAuthAccessToken(this.config.getOauth().getAppAccessToken()); + LOGGER.debug("appAccessToken : {}", this.config.getOauth().getAppAccessToken()); + } + cb.setJSONStoreEnabled(true); + if (!Strings.isNullOrEmpty(config.getVersion())) { + cb.setRestBaseURL("https://graph.facebook.com/" + config.getVersion() + "/"); } + LOGGER.debug("appId : {}", this.config.getOauth().getAppId()); + LOGGER.debug("appSecret: {}", this.config.getOauth().getAppSecret()); + FacebookFactory ff = new FacebookFactory(cb.build()); + return ff.getInstance(); + } + + /** + * Queries facebook and queues the resulting data. 
+ * @param id id + * @throws Exception Exception + */ + protected abstract void getData(IdConfig id) throws Exception; - @VisibleForTesting - protected BlockingQueue<StreamsDatum> getQueue() { - return queue; + + @Override + public void run() { + for ( IdConfig id : this.config.getIds()) { + try { + getData(id); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + } catch (Exception ex) { + LOGGER.error("Caught Exception while trying to poll data for page : {}", id); + LOGGER.error("Exception while getting page feed data: {}", ex); + } } + this.isComplete.set(true); + } + + @VisibleForTesting + protected BlockingQueue<StreamsDatum> getQueue() { + return queue; + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookEventClassifier.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookEventClassifier.java b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookEventClassifier.java index 16e2a25..47c2afb 100644 --- a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookEventClassifier.java +++ b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookEventClassifier.java @@ -18,40 +18,50 @@ package org.apache.streams.facebook.provider; -import com.fasterxml.jackson.databind.node.ObjectNode; -import com.google.common.base.Preconditions; -import org.apache.commons.lang.StringUtils; - -import java.io.IOException; - import org.apache.streams.facebook.Page; import org.apache.streams.facebook.Post; import org.apache.streams.jackson.StreamsJacksonMapper; + +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.google.common.base.Preconditions; + 
+import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; + +/** + * FacebookEventClassifier classifies facebook events. + */ public class FacebookEventClassifier { - private final static Logger LOGGER = LoggerFactory.getLogger(FacebookEventClassifier.class); + private static final Logger LOGGER = LoggerFactory.getLogger(FacebookEventClassifier.class); - public static Class detectClass( String json ) { + /** + * detectClass from json string. + * @param json json string + * @return detected Class + */ + public static Class detectClass( String json ) { - Preconditions.checkNotNull(json); - Preconditions.checkArgument(StringUtils.isNotEmpty(json)); + Preconditions.checkNotNull(json); + Preconditions.checkArgument(StringUtils.isNotEmpty(json)); - ObjectNode objectNode; - try { - objectNode = (ObjectNode) StreamsJacksonMapper.getInstance().readTree(json); - } catch (IOException e) { - LOGGER.error("Exception while trying to detect class: {}", e.getMessage()); - return null; - } + ObjectNode objectNode; + try { + objectNode = (ObjectNode) StreamsJacksonMapper.getInstance().readTree(json); + } catch (IOException ex) { + LOGGER.error("Exception while trying to detect class: {}", ex.getMessage()); + return null; + } - if( objectNode.findValue("about") != null) - return Page.class; - else if( objectNode.findValue("statusType") != null ) - return Post.class; - else - return Post.class; + if ( objectNode.findValue("about") != null) { + return Page.class; + } else if ( objectNode.findValue("statusType") != null ) { + return Post.class; + } else { + return Post.class; } + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookFriendFeedProvider.java ---------------------------------------------------------------------- diff --git 
a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookFriendFeedProvider.java b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookFriendFeedProvider.java index 231ee4f..3253479 100644 --- a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookFriendFeedProvider.java +++ b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookFriendFeedProvider.java @@ -18,15 +18,6 @@ package org.apache.streams.facebook.provider; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.base.Preconditions; -import com.google.common.collect.Queues; -import com.google.common.util.concurrent.MoreExecutors; -import com.typesafe.config.Config; -import com.typesafe.config.ConfigRenderOptions; -import facebook4j.*; -import facebook4j.conf.ConfigurationBuilder; -import facebook4j.json.DataObjectFactory; import org.apache.streams.config.StreamsConfigurator; import org.apache.streams.core.DatumStatusCounter; import org.apache.streams.core.StreamsDatum; @@ -36,253 +27,295 @@ import org.apache.streams.facebook.FacebookUserInformationConfiguration; import org.apache.streams.facebook.FacebookUserstreamConfiguration; import org.apache.streams.jackson.StreamsJacksonMapper; import org.apache.streams.util.ComponentUtils; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Preconditions; +import com.google.common.collect.Queues; +import com.google.common.util.concurrent.MoreExecutors; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigRenderOptions; + +import org.apache.commons.lang.NotImplementedException; import org.joda.time.DateTime; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.commons.lang.NotImplementedException; import java.io.IOException; import java.io.Serializable; import java.math.BigInteger; 
import java.util.Iterator; import java.util.Queue; -import java.util.concurrent.*; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; -public class FacebookFriendFeedProvider implements StreamsProvider, Serializable -{ - - public static final String STREAMS_ID = "FacebookFriendFeedProvider"; - - private static final Logger LOGGER = LoggerFactory.getLogger(FacebookFriendFeedProvider.class); - - private static final ObjectMapper mapper = StreamsJacksonMapper.getInstance(); +import facebook4j.Facebook; +import facebook4j.FacebookException; +import facebook4j.FacebookFactory; +import facebook4j.Friend; +import facebook4j.Paging; +import facebook4j.Post; +import facebook4j.ResponseList; +import facebook4j.conf.ConfigurationBuilder; +import facebook4j.json.DataObjectFactory; - private static final String ALL_PERMISSIONS =
"ads_management,ads_read,create_event,create_note,email,export_stream,friends_about_me,friends_actions.books,friends_actions.music,friends_actions.news,friends_actions.video,friends_activities,friends_birthday,friends_education_history,friends_events,friends_games_activity,friends_groups,friends_hometown,friends_interests,friends_likes,friends_location,friends_notes,friends_online_presence,friends_photo_video_tags,friends_photos,friends_questions,friends_relationship_details,friends_relationships,friends_religion_politics,friends_status,friends_subscriptions,friends_videos,friends_website,friends_work_history,manage_friendlists,manage_notifications,manage_pages,photo_upload,publish_actions,publish_stream,read_friendlists,read_insights,read_mailbox,read_page_mailboxes,read_requests,read_stream,rsvp_event,share_item,sms,status_update,user_about_me,user_actions.books,user_actions.music,user_actions.news,user_actions.video,user_activities,user_birthday,user_education_history,user_events,user_friends,user_games_activity,user_groups,user_hometown,user_interests,user_likes,user_location,user_notes,user_online_presence,user_photo_video_tags,user_photos,user_questions,user_relationship_details,user_relationships,user_religion_politics,user_status,user_subscriptions,user_videos,user_website,user_work_history,video_upload,xmpp_login"; - private FacebookUserstreamConfiguration configuration; +public class FacebookFriendFeedProvider implements StreamsProvider, Serializable { - private Class klass; - protected final ReadWriteLock lock = new ReentrantReadWriteLock(); + public static final String STREAMS_ID = "FacebookFriendFeedProvider"; - protected volatile Queue<StreamsDatum> providerQueue = new LinkedBlockingQueue<StreamsDatum>(); + private static final Logger LOGGER = LoggerFactory.getLogger(FacebookFriendFeedProvider.class); - public FacebookUserstreamConfiguration getConfig() { return configuration; } + private static final ObjectMapper mapper =
StreamsJacksonMapper.getInstance(); - public void setConfig(FacebookUserstreamConfiguration config) { this.configuration = config; } + private static final String ALL_PERMISSIONS = "ads_management,ads_read,create_event,create_note,email,export_stream,friends_about_me,friends_actions.books,friends_actions.music,friends_actions.news,friends_actions.video,friends_activities,friends_birthday,friends_education_history,friends_events,friends_games_activity,friends_groups,friends_hometown,friends_interests,friends_likes,friends_location,friends_notes,friends_online_presence,friends_photo_video_tags,friends_photos,friends_questions,friends_relationship_details,friends_relationships,friends_religion_politics,friends_status,friends_subscriptions,friends_videos,friends_website,friends_work_history,manage_friendlists,manage_notifications,manage_pages,photo_upload,publish_actions,publish_stream,read_friendlists,read_insights,read_mailbox,read_page_mailboxes,read_requests,read_stream,rsvp_event,share_item,sms,status_update,user_about_me,user_actions.books,user_actions.music,user_actions.news,user_actions.video,user_activities,user_birthday,user_education_history,user_events,user_friends,user_games_activity,user_groups,user_hometown,user_interests,user_likes,user_location,user_notes,user_online_presence,user_photo_video_tags,user_photos,user_questions,user_relationship_details,user_relationships,user_religion_politics,user_status,user_subscriptions,user_videos,user_website,user_work_history,video_upload,xmpp_login"; + private FacebookUserstreamConfiguration configuration; - protected Iterator<String[]> idsBatches; + private Class klass; + protected final ReadWriteLock lock = new ReentrantReadWriteLock(); - protected ExecutorService executor; + protected volatile Queue<StreamsDatum> providerQueue = new LinkedBlockingQueue<StreamsDatum>(); - protected DateTime start; - protected DateTime end; + public FacebookUserstreamConfiguration getConfig() { + return configuration; + } -
protected final AtomicBoolean running = new AtomicBoolean(); + public void setConfig(FacebookUserstreamConfiguration config) { + this.configuration = config; + } - private DatumStatusCounter countersCurrent = new DatumStatusCounter(); - private DatumStatusCounter countersTotal = new DatumStatusCounter(); + protected Iterator<String[]> idsBatches; - private static ExecutorService newFixedThreadPoolWithQueueSize(int nThreads, int queueSize) { - return new ThreadPoolExecutor(nThreads, nThreads, - 5000L, TimeUnit.MILLISECONDS, - new ArrayBlockingQueue<Runnable>(queueSize, true), new ThreadPoolExecutor.CallerRunsPolicy()); - } + protected ExecutorService executor; - public FacebookFriendFeedProvider() { - Config config = StreamsConfigurator.config.getConfig("facebook"); - FacebookUserInformationConfiguration configuration; - try { - configuration = mapper.readValue(config.root().render(ConfigRenderOptions.concise()), FacebookUserInformationConfiguration.class); - } catch (IOException e) { - e.printStackTrace(); - return; - } - } + protected DateTime start; + protected DateTime end; - public FacebookFriendFeedProvider(FacebookUserstreamConfiguration config) { - this.configuration = config; - } + protected final AtomicBoolean running = new AtomicBoolean(); - public FacebookFriendFeedProvider(Class klass) { - Config config = StreamsConfigurator.config.getConfig("facebook"); - FacebookUserInformationConfiguration configuration; - try { - configuration = mapper.readValue(config.root().render(ConfigRenderOptions.concise()), FacebookUserInformationConfiguration.class); - } catch (IOException e) { - e.printStackTrace(); - return; - } - this.klass = klass; - } + private DatumStatusCounter countersCurrent = new DatumStatusCounter(); + private DatumStatusCounter countersTotal = new DatumStatusCounter(); - public FacebookFriendFeedProvider(FacebookUserstreamConfiguration config, Class klass) { - this.configuration = config; - this.klass = klass; - } + private static ExecutorService 
newFixedThreadPoolWithQueueSize(int numThreads, int queueSize) { + return new ThreadPoolExecutor(numThreads, numThreads, + 5000L, TimeUnit.MILLISECONDS, + new ArrayBlockingQueue<Runnable>(queueSize, true), new ThreadPoolExecutor.CallerRunsPolicy()); + } - public Queue<StreamsDatum> getProviderQueue() { - return this.providerQueue; + /** + * FacebookFriendFeedProvider constructor - resolves FacebookUserInformationConfiguration from JVM 'facebook'. + */ + public FacebookFriendFeedProvider() { + Config config = StreamsConfigurator.config.getConfig("facebook"); + FacebookUserInformationConfiguration configuration; + try { + configuration = mapper.readValue(config.root().render(ConfigRenderOptions.concise()), FacebookUserInformationConfiguration.class); + } catch (IOException ex) { + ex.printStackTrace(); + return; } - - @Override - public String getId() { - return STREAMS_ID; + } + + /** + * FacebookFriendFeedProvider constructor - uses supplied FacebookUserstreamConfiguration. + */ + public FacebookFriendFeedProvider(FacebookUserstreamConfiguration config) { + this.configuration = config; + } + + /** + * FacebookFriendFeedProvider constructor - output supplied Class.
+ * @param klass Class + */ + public FacebookFriendFeedProvider(Class klass) { + Config config = StreamsConfigurator.config.getConfig("facebook"); + FacebookUserInformationConfiguration configuration; + try { + configuration = mapper.readValue(config.root().render(ConfigRenderOptions.concise()), FacebookUserInformationConfiguration.class); + } catch (IOException ex) { + ex.printStackTrace(); + return; } - - @Override - public void startStream() { - shutdownAndAwaitTermination(executor); - running.set(true); + this.klass = klass; + } + + public FacebookFriendFeedProvider(FacebookUserstreamConfiguration config, Class klass) { + this.configuration = config; + this.klass = klass; + } + + public Queue<StreamsDatum> getProviderQueue() { + return this.providerQueue; + } + + @Override + public String getId() { + return STREAMS_ID; + } + + @Override + public void startStream() { + shutdownAndAwaitTermination(executor); + running.set(true); + } + + @Override + public StreamsResultSet readCurrent() { + + StreamsResultSet current; + + synchronized (FacebookUserstreamProvider.class) { + current = new StreamsResultSet(Queues.newConcurrentLinkedQueue(providerQueue)); + current.setCounter(new DatumStatusCounter()); + current.getCounter().add(countersCurrent); + countersTotal.add(countersCurrent); + countersCurrent = new DatumStatusCounter(); + providerQueue.clear(); } - public StreamsResultSet readCurrent() { - - StreamsResultSet current; - - synchronized (FacebookUserstreamProvider.class) { - current = new StreamsResultSet(Queues.newConcurrentLinkedQueue(providerQueue)); - current.setCounter(new DatumStatusCounter()); - current.getCounter().add(countersCurrent); - countersTotal.add(countersCurrent); - countersCurrent = new DatumStatusCounter(); - providerQueue.clear(); + return current; + + } + + @Override + public StreamsResultSet readNew(BigInteger sequence) { + LOGGER.debug("{} readNew", STREAMS_ID); + throw new NotImplementedException(); + } + + @Override + public 
StreamsResultSet readRange(DateTime start, DateTime end) { + LOGGER.debug("{} readRange", STREAMS_ID); + this.start = start; + this.end = end; + readCurrent(); + StreamsResultSet result = (StreamsResultSet)providerQueue.iterator(); + return result; + } + + @Override + public boolean isRunning() { + return running.get(); + } + + void shutdownAndAwaitTermination(ExecutorService pool) { + pool.shutdown(); // Disable new tasks from being submitted + try { + // Wait a while for existing tasks to terminate + if (!pool.awaitTermination(10, TimeUnit.SECONDS)) { + pool.shutdownNow(); // Cancel currently executing tasks + // Wait a while for tasks to respond to being cancelled + if (!pool.awaitTermination(10, TimeUnit.SECONDS)) { + System.err.println("Pool did not terminate"); } + } + } catch (InterruptedException ie) { + // (Re-)Cancel if current thread also interrupted + pool.shutdownNow(); + // Preserve interrupt status + Thread.currentThread().interrupt(); + } + } - return current; + @Override + public void prepare(Object configurationObject) { - } + executor = MoreExecutors.listeningDecorator(newFixedThreadPoolWithQueueSize(5, 20)); - public StreamsResultSet readNew(BigInteger sequence) { - LOGGER.debug("{} readNew", STREAMS_ID); - throw new NotImplementedException(); - } + Preconditions.checkNotNull(providerQueue); + Preconditions.checkNotNull(this.klass); + Preconditions.checkNotNull(configuration.getOauth().getAppId()); + Preconditions.checkNotNull(configuration.getOauth().getAppSecret()); + Preconditions.checkNotNull(configuration.getOauth().getUserAccessToken()); - public StreamsResultSet readRange(DateTime start, DateTime end) { - LOGGER.debug("{} readRange", STREAMS_ID); - this.start = start; - this.end = end; - readCurrent(); - StreamsResultSet result = (StreamsResultSet)providerQueue.iterator(); - return result; - } + Facebook client = getFacebookClient(); - @Override - public boolean isRunning() { - return running.get(); - } + try { + ResponseList<Friend> 
friendResponseList = client.friends().getFriends(); + Paging<Friend> friendPaging; + do { - void shutdownAndAwaitTermination(ExecutorService pool) { - pool.shutdown(); // Disable new tasks from being submitted - try { - // Wait a while for existing tasks to terminate - if (!pool.awaitTermination(10, TimeUnit.SECONDS)) { - pool.shutdownNow(); // Cancel currently executing tasks - // Wait a while for tasks to respond to being cancelled - if (!pool.awaitTermination(10, TimeUnit.SECONDS)) - System.err.println("Pool did not terminate"); - } - } catch (InterruptedException ie) { - // (Re-)Cancel if current thread also interrupted - pool.shutdownNow(); - // Preserve interrupt status - Thread.currentThread().interrupt(); + for ( Friend friend : friendResponseList ) { + executor.submit(new FacebookFriendFeedTask(this, friend.getId())); } + friendPaging = friendResponseList.getPaging(); + friendResponseList = client.fetchNext(friendPaging); + } + while ( friendPaging != null + && + friendResponseList != null ); + } catch (FacebookException ex) { + ex.printStackTrace(); } - @Override - public void prepare(Object o) { + } - executor = MoreExecutors.listeningDecorator(newFixedThreadPoolWithQueueSize(5, 20)); + protected Facebook getFacebookClient() { - Preconditions.checkNotNull(providerQueue); - Preconditions.checkNotNull(this.klass); - Preconditions.checkNotNull(configuration.getOauth().getAppId()); - Preconditions.checkNotNull(configuration.getOauth().getAppSecret()); - Preconditions.checkNotNull(configuration.getOauth().getUserAccessToken()); + ConfigurationBuilder cb = new ConfigurationBuilder(); + cb.setDebugEnabled(true) + .setOAuthAppId(configuration.getOauth().getAppId()) + .setOAuthAppSecret(configuration.getOauth().getAppSecret()) + .setOAuthAccessToken(configuration.getOauth().getUserAccessToken()) + .setOAuthPermissions(ALL_PERMISSIONS) + .setJSONStoreEnabled(true) + .setClientVersion("v1.0"); - Facebook client = getFacebookClient(); + FacebookFactory ff = new 
FacebookFactory(cb.build()); + Facebook facebook = ff.getInstance(); - try { - ResponseList<Friend> friendResponseList = client.friends().getFriends(); - Paging<Friend> friendPaging; - do { + return facebook; + } - for( Friend friend : friendResponseList ) { + @Override + public void cleanUp() { + shutdownAndAwaitTermination(executor); + } - executor.submit(new FacebookFriendFeedTask(this, friend.getId())); - } - friendPaging = friendResponseList.getPaging(); - friendResponseList = client.fetchNext(friendPaging); - } while( friendPaging != null && - friendResponseList != null ); - } catch (FacebookException e) { - e.printStackTrace(); - } + private class FacebookFriendFeedTask implements Runnable { - } + FacebookFriendFeedProvider provider; + Facebook client; + String id; - protected Facebook getFacebookClient() - { - ConfigurationBuilder cb = new ConfigurationBuilder(); - cb.setDebugEnabled(true) - .setOAuthAppId(configuration.getOauth().getAppId()) - .setOAuthAppSecret(configuration.getOauth().getAppSecret()) - .setOAuthAccessToken(configuration.getOauth().getUserAccessToken()) - .setOAuthPermissions(ALL_PERMISSIONS) - .setJSONStoreEnabled(true) - .setClientVersion("v1.0"); - - FacebookFactory ff = new FacebookFactory(cb.build()); - Facebook facebook = ff.getInstance(); - - return facebook; + public FacebookFriendFeedTask(FacebookFriendFeedProvider provider, String id) { + this.provider = provider; + this.id = id; } @Override - public void cleanUp() { - shutdownAndAwaitTermination(executor); - } - - private class FacebookFriendFeedTask implements Runnable { - - FacebookFriendFeedProvider provider; - Facebook client; - String id; - - public FacebookFriendFeedTask(FacebookFriendFeedProvider provider, String id) { - this.provider = provider; - this.id = id; + public void run() { + client = provider.getFacebookClient(); + try { + ResponseList<Post> postResponseList = client.getFeed(id); + Paging<Post> postPaging; + do { + + for (Post item : postResponseList) { + 
String json = DataObjectFactory.getRawJSON(item); + org.apache.streams.facebook.Post post = mapper.readValue(json, org.apache.streams.facebook.Post.class); + try { + lock.readLock().lock(); + ComponentUtils.offerUntilSuccess(new StreamsDatum(post), providerQueue); + countersCurrent.incrementAttempt(); + } finally { + lock.readLock().unlock(); + } + } + postPaging = postResponseList.getPaging(); + postResponseList = client.fetchNext(postPaging); } + while ( postPaging != null + && + postResponseList != null ); - @Override - public void run() { - client = provider.getFacebookClient(); - try { - ResponseList<Post> postResponseList = client.getFeed(id); - Paging<Post> postPaging; - do { - - for (Post item : postResponseList) { - String json = DataObjectFactory.getRawJSON(item); - org.apache.streams.facebook.Post post = mapper.readValue(json, org.apache.streams.facebook.Post.class); - try { - lock.readLock().lock(); - ComponentUtils.offerUntilSuccess(new StreamsDatum(post), providerQueue); - countersCurrent.incrementAttempt(); - } finally { - lock.readLock().unlock(); - } - } - postPaging = postResponseList.getPaging(); - postResponseList = client.fetchNext(postPaging); - } while( postPaging != null && - postResponseList != null ); - - } catch (Exception e) { - e.printStackTrace(); - } - } + } catch (Exception ex) { + ex.printStackTrace(); + } } + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookFriendUpdatesProvider.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookFriendUpdatesProvider.java b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookFriendUpdatesProvider.java index cda868e..50ac64a 100644 --- 
a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookFriendUpdatesProvider.java +++ b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookFriendUpdatesProvider.java @@ -18,15 +18,6 @@ package org.apache.streams.facebook.provider; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.base.Preconditions; -import com.google.common.collect.Sets; -import com.google.common.util.concurrent.MoreExecutors; -import com.typesafe.config.Config; -import com.typesafe.config.ConfigRenderOptions; -import facebook4j.*; -import facebook4j.conf.ConfigurationBuilder; -import facebook4j.json.DataObjectFactory; import org.apache.streams.config.StreamsConfigurator; import org.apache.streams.core.DatumStatusCounter; import org.apache.streams.core.StreamsDatum; @@ -36,10 +27,18 @@ import org.apache.streams.facebook.FacebookUserInformationConfiguration; import org.apache.streams.facebook.FacebookUserstreamConfiguration; import org.apache.streams.jackson.StreamsJacksonMapper; import org.apache.streams.util.ComponentUtils; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Preconditions; +import com.google.common.collect.Sets; +import com.google.common.util.concurrent.MoreExecutors; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigRenderOptions; + +import org.apache.commons.lang.NotImplementedException; import org.joda.time.DateTime; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.commons.lang.NotImplementedException; import java.io.IOException; import java.io.Serializable; @@ -47,246 +46,290 @@ import java.math.BigInteger; import java.util.Iterator; import java.util.Queue; import java.util.Set; -import java.util.concurrent.*; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; +import 
java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; -public class FacebookFriendUpdatesProvider implements StreamsProvider, Serializable -{ - - public static final String STREAMS_ID = "FacebookFriendPostsProvider"; +import facebook4j.Facebook; +import facebook4j.FacebookException; +import facebook4j.FacebookFactory; +import facebook4j.Friend; +import facebook4j.Paging; +import facebook4j.Post; +import facebook4j.ResponseList; +import facebook4j.conf.ConfigurationBuilder; +import facebook4j.json.DataObjectFactory; - private static final Logger LOGGER = LoggerFactory.getLogger(FacebookFriendUpdatesProvider.class); +/** + * FacebookFriendUpdatesProvider provides updates from friend feed. + */ +public class FacebookFriendUpdatesProvider implements StreamsProvider, Serializable { - private static final ObjectMapper mapper = StreamsJacksonMapper.getInstance(); + public static final String STREAMS_ID = "FacebookFriendPostsProvider"; - private static final String ALL_PERMISSIONS =
"ads_management,ads_read,create_event,create_note,email,export_stream,friends_about_me,friends_actions.books,friends_actions.music,friends_actions.news,friends_actions.video,friends_activities,friends_birthday,friends_education_history,friends_events,friends_games_activity,friends_groups,friends_hometown,friends_interests,friends_likes,friends_location,friends_notes,friends_online_presence,friends_photo_video_tags,friends_photos,friends_questions,friends_relationship_details,friends_relationships,friends_religion_politics,friends_status,friends_subscriptions,friends_videos,friends_website,friends_work_history,manage_friendlists,manage_notifications,manage_pages,photo_upload,publish_actions,publish_stream,read_friendlists,read_insights,read_mailbox,read_page_mailboxes,read_requests,read_stream,rsvp_event,share_item,sms,status_update,user_about_me,user_actions.books,user_actions.music,user_actions.news,user_actions.video,user_activities,user_birthday,user_education_history,user_events,user_friends,user_games_activity,user_groups,user_hometown,user_interests,user_likes,user_location,user_notes,user_online_presence,user_photo_video_tags,user_photos,user_questions,user_relationship_details,user_relationships,user_religion_politics,user_status,user_subscriptions,user_videos,user_website,user_work_history,video_upload,xmpp_login"; - private FacebookUserstreamConfiguration configuration; + private static final Logger LOGGER = LoggerFactory.getLogger(FacebookFriendUpdatesProvider.class); - private Class klass; - protected final ReadWriteLock lock = new ReentrantReadWriteLock(); + private static final ObjectMapper mapper = StreamsJacksonMapper.getInstance(); - protected volatile Queue<StreamsDatum> providerQueue = new LinkedBlockingQueue<StreamsDatum>(); + private static final String ALL_PERMISSIONS = +
"ads_management,ads_read,create_event,create_note,email,export_stream,friends_about_me,friends_actions.books,friends_actions.music,friends_actions.news,friends_actions.video,friends_activities,friends_birthday,friends_education_history,friends_events,friends_games_activity,friends_groups,friends_hometown,friends_interests,friends_likes,friends_location,friends_notes,friends_online_presence,friends_photo_video_tags,friends_photos,friends_questions,friends_relationship_details,friends_relationships,friends_religion_politics,friends_status,friends_subscriptions,friends_videos,friends_website,friends_work_history,manage_friendlists,manage_notifications,manage_pages,photo_upload,publish_actions,publish_stream,read_friendlists,read_insights,read_mailbox,read_page_mailboxes,read_requests,read_stream,rsvp_event,share_item,sms,status_update,user_about_me,user_actions.books,user_actions.music,user_actions.news,user_actions.video,user_activities,user_birthday,user_education_history,user_events,user_friends,user_games_activity,user_groups,user_hometown,user_interests,user_likes,user_location,user_notes,user_online_presence,user_photo_video_tags,user_photos,user_questions,user_relationship_details,user_relationships,user_religion_politics,user_status,user_subscriptions,user_videos,user_website,user_work_history,video_upload,xmpp_login"; - public FacebookUserstreamConfiguration getConfig() { return configuration; } + private FacebookUserstreamConfiguration configuration; - public void setConfig(FacebookUserstreamConfiguration config) { this.configuration = config; } + private Class klass; + protected final ReadWriteLock lock = new ReentrantReadWriteLock(); - protected Iterator<String[]> idsBatches; + protected volatile Queue<StreamsDatum> providerQueue = new LinkedBlockingQueue<StreamsDatum>(); - protected ExecutorService executor; + public FacebookUserstreamConfiguration getConfig() { + return configuration; + } - protected DateTime start; - protected DateTime end; + public
void setConfig(FacebookUserstreamConfiguration config) { + this.configuration = config; + } - protected final AtomicBoolean running = new AtomicBoolean(); + protected Iterator<String[]> idsBatches; - private DatumStatusCounter countersCurrent = new DatumStatusCounter(); - private DatumStatusCounter countersTotal = new DatumStatusCounter(); + protected ExecutorService executor; - private static ExecutorService newFixedThreadPoolWithQueueSize(int nThreads, int queueSize) { - return new ThreadPoolExecutor(nThreads, nThreads, - 5000L, TimeUnit.MILLISECONDS, - new ArrayBlockingQueue<Runnable>(queueSize, true), new ThreadPoolExecutor.CallerRunsPolicy()); - } + protected DateTime start; + protected DateTime end; - public FacebookFriendUpdatesProvider() { - Config config = StreamsConfigurator.config.getConfig("facebook"); - FacebookUserInformationConfiguration configuration; - try { - configuration = mapper.readValue(config.root().render(ConfigRenderOptions.concise()), FacebookUserInformationConfiguration.class); - } catch (IOException e) { - e.printStackTrace(); - return; - } - } + protected final AtomicBoolean running = new AtomicBoolean(); - public FacebookFriendUpdatesProvider(FacebookUserstreamConfiguration config) { - this.configuration = config; - } + private DatumStatusCounter countersCurrent = new DatumStatusCounter(); + private DatumStatusCounter countersTotal = new DatumStatusCounter(); - public FacebookFriendUpdatesProvider(Class klass) { - Config config = StreamsConfigurator.config.getConfig("facebook"); - FacebookUserInformationConfiguration configuration; - try { - configuration = mapper.readValue(config.root().render(ConfigRenderOptions.concise()), FacebookUserInformationConfiguration.class); - } catch (IOException e) { - e.printStackTrace(); - return; - } - this.klass = klass; - } + // TODO: factor this out. 
+ private static ExecutorService newFixedThreadPoolWithQueueSize(int numThreads, int queueSize) { + return new ThreadPoolExecutor(numThreads, numThreads, + 5000L, TimeUnit.MILLISECONDS, + new ArrayBlockingQueue<Runnable>(queueSize, true), new ThreadPoolExecutor.CallerRunsPolicy()); + } - public FacebookFriendUpdatesProvider(FacebookUserstreamConfiguration config, Class klass) { - this.configuration = config; - this.klass = klass; + /** + * FacebookFriendUpdatesProvider constructor - resolves FacebookUserInformationConfiguration from JVM 'facebook'. + */ + public FacebookFriendUpdatesProvider() { + Config config = StreamsConfigurator.config.getConfig("facebook"); + FacebookUserInformationConfiguration configuration; + try { + configuration = mapper.readValue(config.root().render(ConfigRenderOptions.concise()), FacebookUserInformationConfiguration.class); + } catch (IOException ex) { + ex.printStackTrace(); + return; } - - public Queue<StreamsDatum> getProviderQueue() { - return this.providerQueue; + } + + /** + * FacebookFriendUpdatesProvider constructor - uses supplied FacebookUserstreamConfiguration. + */ + public FacebookFriendUpdatesProvider(FacebookUserstreamConfiguration config) { + this.configuration = config; + } + + /** + * FacebookFriendUpdatesProvider constructor. + * uses supplied output Class. + * resolves FacebookUserInformationConfiguration from JVM 'facebook. + */ + public FacebookFriendUpdatesProvider(Class klass) { + Config config = StreamsConfigurator.config.getConfig("facebook"); + FacebookUserInformationConfiguration configuration; + try { + configuration = mapper.readValue(config.root().render(ConfigRenderOptions.concise()), FacebookUserInformationConfiguration.class); + } catch (IOException ex) { + ex.printStackTrace(); + return; } - - @Override - public String getId() { - return STREAMS_ID; - } - - @Override - public void startStream() { - running.set(true); + this.klass = klass; + } + + /** + * FacebookFriendUpdatesProvider constructor. 
+ * uses supplied FacebookUserstreamConfiguration. + * uses supplied output Class. + */ + public FacebookFriendUpdatesProvider(FacebookUserstreamConfiguration config, Class klass) { + this.configuration = config; + this.klass = klass; + } + + public Queue<StreamsDatum> getProviderQueue() { + return this.providerQueue; + } + + @Override + public String getId() { + return STREAMS_ID; + } + + @Override + public void startStream() { + running.set(true); + } + + @Override + public StreamsResultSet readCurrent() { + + Preconditions.checkArgument(idsBatches.hasNext()); + + LOGGER.info("readCurrent"); + + // return stuff + + LOGGER.info("Finished. Cleaning up..."); + + LOGGER.info("Providing {} docs", providerQueue.size()); + + StreamsResultSet result = new StreamsResultSet(providerQueue); + running.set(false); + + LOGGER.info("Exiting"); + + return result; + + } + + @Override + public StreamsResultSet readNew(BigInteger sequence) { + LOGGER.debug("{} readNew", STREAMS_ID); + throw new NotImplementedException(); + } + + @Override + public StreamsResultSet readRange(DateTime start, DateTime end) { + LOGGER.debug("{} readRange", STREAMS_ID); + this.start = start; + this.end = end; + readCurrent(); + StreamsResultSet result = (StreamsResultSet)providerQueue.iterator(); + return result; + } + + @Override + public boolean isRunning() { + return running.get(); + } + + void shutdownAndAwaitTermination(ExecutorService pool) { + pool.shutdown(); // Disable new tasks from being submitted + try { + // Wait a while for existing tasks to terminate + if (!pool.awaitTermination(10, TimeUnit.SECONDS)) { + pool.shutdownNow(); // Cancel currently executing tasks + // Wait a while for tasks to respond to being cancelled + if (!pool.awaitTermination(10, TimeUnit.SECONDS)) { + System.err.println("Pool did not terminate"); + } + } + } catch (InterruptedException ie) { + // (Re-)Cancel if current thread also interrupted + pool.shutdownNow(); + // Preserve interrupt status + 
Thread.currentThread().interrupt(); } + } - public StreamsResultSet readCurrent() { - - Preconditions.checkArgument(idsBatches.hasNext()); - - LOGGER.info("readCurrent"); - - // return stuff + @Override + public void prepare(Object configurationObject) { - LOGGER.info("Finished. Cleaning up..."); + executor = MoreExecutors.listeningDecorator(newFixedThreadPoolWithQueueSize(5, 20)); - LOGGER.info("Providing {} docs", providerQueue.size()); + Preconditions.checkNotNull(providerQueue); + Preconditions.checkNotNull(this.klass); + Preconditions.checkNotNull(configuration.getOauth().getAppId()); + Preconditions.checkNotNull(configuration.getOauth().getAppSecret()); + Preconditions.checkNotNull(configuration.getOauth().getUserAccessToken()); - StreamsResultSet result = new StreamsResultSet(providerQueue); - running.set(false); + Facebook client = getFacebookClient(); - LOGGER.info("Exiting"); - - return result; + try { + ResponseList<Friend> friendResponseList = client.friends().getFriends(); + Paging<Friend> friendPaging; + do { + for ( Friend friend : friendResponseList ) { + // client.rawAPI().callPostAPI(); + // add a subscription + } + friendPaging = friendResponseList.getPaging(); + friendResponseList = client.fetchNext(friendPaging); + } + while ( friendPaging != null + && + friendResponseList != null ); + } catch (FacebookException ex) { + ex.printStackTrace(); } - public StreamsResultSet readNew(BigInteger sequence) { - LOGGER.debug("{} readNew", STREAMS_ID); - throw new NotImplementedException(); - } + } - public StreamsResultSet readRange(DateTime start, DateTime end) { - LOGGER.debug("{} readRange", STREAMS_ID); - this.start = start; - this.end = end; - readCurrent(); - StreamsResultSet result = (StreamsResultSet)providerQueue.iterator(); - return result; - } + protected Facebook getFacebookClient() { - @Override - public boolean isRunning() { - return running.get(); - } + ConfigurationBuilder cb = new ConfigurationBuilder(); + cb.setDebugEnabled(true) + 
.setOAuthAppId(configuration.getOauth().getAppId()) + .setOAuthAppSecret(configuration.getOauth().getAppSecret()) + .setOAuthAccessToken(configuration.getOauth().getUserAccessToken()) + .setOAuthPermissions(ALL_PERMISSIONS) + .setJSONStoreEnabled(true) + .setClientVersion("v1.0"); - void shutdownAndAwaitTermination(ExecutorService pool) { - pool.shutdown(); // Disable new tasks from being submitted - try { - // Wait a while for existing tasks to terminate - if (!pool.awaitTermination(10, TimeUnit.SECONDS)) { - pool.shutdownNow(); // Cancel currently executing tasks - // Wait a while for tasks to respond to being cancelled - if (!pool.awaitTermination(10, TimeUnit.SECONDS)) - System.err.println("Pool did not terminate"); - } - } catch (InterruptedException ie) { - // (Re-)Cancel if current thread also interrupted - pool.shutdownNow(); - // Preserve interrupt status - Thread.currentThread().interrupt(); - } - } - - @Override - public void prepare(Object o) { + FacebookFactory ff = new FacebookFactory(cb.build()); + Facebook facebook = ff.getInstance(); - executor = MoreExecutors.listeningDecorator(newFixedThreadPoolWithQueueSize(5, 20)); + return facebook; + } - Preconditions.checkNotNull(providerQueue); - Preconditions.checkNotNull(this.klass); - Preconditions.checkNotNull(configuration.getOauth().getAppId()); - Preconditions.checkNotNull(configuration.getOauth().getAppSecret()); - Preconditions.checkNotNull(configuration.getOauth().getUserAccessToken()); + @Override + public void cleanUp() { + shutdownAndAwaitTermination(executor); + } - Facebook client = getFacebookClient(); + private class FacebookFeedPollingTask implements Runnable { - try { - ResponseList<Friend> friendResponseList = client.friends().getFriends(); - Paging<Friend> friendPaging; - do { - - for( Friend friend : friendResponseList ) { - - //client.rawAPI().callPostAPI(); - // add a subscription - } - friendPaging = friendResponseList.getPaging(); - friendResponseList = 
client.fetchNext(friendPaging); - } while( friendPaging != null && - friendResponseList != null ); - } catch (FacebookException e) { - e.printStackTrace(); - } + FacebookUserstreamProvider provider; + Facebook client; - } + private Set<Post> priorPollResult = Sets.newHashSet(); - protected Facebook getFacebookClient() - { - ConfigurationBuilder cb = new ConfigurationBuilder(); - cb.setDebugEnabled(true) - .setOAuthAppId(configuration.getOauth().getAppId()) - .setOAuthAppSecret(configuration.getOauth().getAppSecret()) - .setOAuthAccessToken(configuration.getOauth().getUserAccessToken()) - .setOAuthPermissions(ALL_PERMISSIONS) - .setJSONStoreEnabled(true) - .setClientVersion("v1.0"); - - FacebookFactory ff = new FacebookFactory(cb.build()); - Facebook facebook = ff.getInstance(); - - return facebook; + public FacebookFeedPollingTask(FacebookUserstreamProvider facebookUserstreamProvider) { + provider = facebookUserstreamProvider; } @Override - public void cleanUp() { - shutdownAndAwaitTermination(executor); - } - - private class FacebookFeedPollingTask implements Runnable { - - FacebookUserstreamProvider provider; - Facebook client; - - private Set<Post> priorPollResult = Sets.newHashSet(); - - public FacebookFeedPollingTask(FacebookUserstreamProvider facebookUserstreamProvider) { - provider = facebookUserstreamProvider; - } - - @Override - public void run() { - client = provider.getFacebookClient(); - while (provider.isRunning()) { - try { - ResponseList<Post> postResponseList = client.getHome(); - Set<Post> update = Sets.newHashSet(postResponseList); - Set<Post> repeats = Sets.intersection(priorPollResult, Sets.newHashSet(update)); - Set<Post> entrySet = Sets.difference(update, repeats); - for (Post item : entrySet) { - String json = DataObjectFactory.getRawJSON(item); - org.apache.streams.facebook.Post post = mapper.readValue(json, org.apache.streams.facebook.Post.class); - try { - lock.readLock().lock(); - ComponentUtils.offerUntilSuccess(new StreamsDatum(post), 
providerQueue); - countersCurrent.incrementAttempt(); - } finally { - lock.readLock().unlock(); - } - } - priorPollResult = update; - Thread.sleep(configuration.getPollIntervalMillis()); - } catch (Exception e) { - e.printStackTrace(); - } + public void run() { + client = provider.getFacebookClient(); + while (provider.isRunning()) { + try { + ResponseList<Post> postResponseList = client.getHome(); + Set<Post> update = Sets.newHashSet(postResponseList); + Set<Post> repeats = Sets.intersection(priorPollResult, Sets.newHashSet(update)); + Set<Post> entrySet = Sets.difference(update, repeats); + for (Post item : entrySet) { + String json = DataObjectFactory.getRawJSON(item); + org.apache.streams.facebook.Post post = mapper.readValue(json, org.apache.streams.facebook.Post.class); + try { + lock.readLock().lock(); + ComponentUtils.offerUntilSuccess(new StreamsDatum(post), providerQueue); + countersCurrent.incrementAttempt(); + } finally { + lock.readLock().unlock(); } + } + priorPollResult = update; + Thread.sleep(configuration.getPollIntervalMillis()); + } catch (Exception ex) { + ex.printStackTrace(); } + } } + } }
