http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterJsonUserActivityConverter.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterJsonUserActivityConverter.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterJsonUserActivityConverter.java index ef74371..b8ce79b 100644 --- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterJsonUserActivityConverter.java +++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterJsonUserActivityConverter.java @@ -18,65 +18,69 @@ package org.apache.streams.twitter.converter; -import com.google.common.collect.Lists; -import org.apache.commons.lang.NotImplementedException; import org.apache.streams.data.ActivityConverter; import org.apache.streams.exceptions.ActivityConversionException; import org.apache.streams.pojo.json.Activity; import org.apache.streams.twitter.pojo.User; +import com.google.common.collect.Lists; +import org.apache.commons.lang.NotImplementedException; + import java.util.List; import static org.apache.streams.twitter.converter.util.TwitterActivityUtil.updateActivity; public class TwitterJsonUserActivityConverter implements ActivityConverter<User> { - public static Class requiredClass = User.class; + public static Class requiredClass = User.class; - @Override - public Class requiredClass() { - return requiredClass; - } + @Override + public Class requiredClass() { + return requiredClass; + } - private static TwitterJsonUserActivityConverter instance = new TwitterJsonUserActivityConverter(); + private static TwitterJsonUserActivityConverter instance = new TwitterJsonUserActivityConverter(); - public static TwitterJsonUserActivityConverter getInstance() { - return instance; - } + public static TwitterJsonUserActivityConverter getInstance() { + return instance; + } - @Override - public String serializationFormat() { - return null; - } + @Override + public String serializationFormat() { + return null; + } - @Override - public User fromActivity(Activity deserialized) throws ActivityConversionException { - throw new NotImplementedException(); - } + @Override + public User fromActivity(Activity deserialized) throws ActivityConversionException { + throw new NotImplementedException(); + } - @Override - public List<Activity> toActivityList(User user) throws ActivityConversionException { + @Override + public List<User> fromActivityList(List<Activity> list) { + throw new NotImplementedException(); + } - Activity activity = new Activity(); - updateActivity(user, activity); - return Lists.newArrayList(activity); - } + @Override + public List<Activity> toActivityList(User user) throws ActivityConversionException { - @Override - public List<User> fromActivityList(List<Activity> list) { - throw new NotImplementedException(); - } + Activity activity = new Activity(); + updateActivity(user, activity); + + return Lists.newArrayList(activity); + } - @Override - public List<Activity> toActivityList(List<User> serializedList) { - List<Activity> result = Lists.newArrayList(); - for( User item : serializedList ) { - try { - List<Activity> activities = toActivityList(item); - result.addAll(activities); - } catch (ActivityConversionException e) {} - } - return result; + @Override + public List<Activity> toActivityList(List<User> serializedList) { + List<Activity> result = Lists.newArrayList(); + for ( User item : serializedList ) { + try { + List<Activity> activities = toActivityList(item); + result.addAll(activities); + } catch (ActivityConversionException ex) { + // + } } + return result; + } }
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterJsonUserActivityObjectConverter.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterJsonUserActivityObjectConverter.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterJsonUserActivityObjectConverter.java index d62b1e8..7cb4158 100644 --- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterJsonUserActivityObjectConverter.java +++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterJsonUserActivityObjectConverter.java @@ -18,47 +18,42 @@ package org.apache.streams.twitter.converter; -import com.google.common.collect.Lists; -import org.apache.commons.lang.NotImplementedException; -import org.apache.streams.data.ActivityConverter; import org.apache.streams.data.ActivityObjectConverter; import org.apache.streams.exceptions.ActivityConversionException; -import org.apache.streams.pojo.json.Activity; import org.apache.streams.pojo.json.ActivityObject; import org.apache.streams.twitter.pojo.User; -import java.util.List; +import org.apache.commons.lang.NotImplementedException; import static org.apache.streams.twitter.converter.util.TwitterActivityUtil.buildActor; -import static org.apache.streams.twitter.converter.util.TwitterActivityUtil.updateActivity; public class TwitterJsonUserActivityObjectConverter implements ActivityObjectConverter<User> { - public static Class requiredClass = User.class; + public static Class requiredClass = User.class; - @Override - public Class requiredClass() { - return requiredClass; - } + @Override + public Class requiredClass() { + return requiredClass; + } - private static TwitterJsonUserActivityObjectConverter instance = new TwitterJsonUserActivityObjectConverter(); + private static TwitterJsonUserActivityObjectConverter instance = new TwitterJsonUserActivityObjectConverter(); - public static TwitterJsonUserActivityObjectConverter getInstance() { - return instance; - } + public static TwitterJsonUserActivityObjectConverter getInstance() { + return instance; + } - @Override - public String serializationFormat() { - return null; - } + @Override + public String serializationFormat() { + return null; + } - @Override - public User fromActivityObject(ActivityObject deserialized) throws ActivityConversionException { - throw new NotImplementedException(); - } + @Override + public User fromActivityObject(ActivityObject deserialized) throws ActivityConversionException { + throw new NotImplementedException(); + } - @Override - public ActivityObject toActivityObject(User serialized) throws ActivityConversionException { - return buildActor(serialized); - } + @Override + public ActivityObject toActivityObject(User serialized) throws ActivityConversionException { + return buildActor(serialized); + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterJsonUserstreameventActivityConverter.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterJsonUserstreameventActivityConverter.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterJsonUserstreameventActivityConverter.java index bb31fd6..6685a96 100644 --- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterJsonUserstreameventActivityConverter.java +++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterJsonUserstreameventActivityConverter.java @@ -18,15 +18,16 @@ package org.apache.streams.twitter.converter; -import com.google.common.base.Strings; -import com.google.common.collect.Lists; -import org.apache.commons.lang.NotImplementedException; import org.apache.streams.data.ActivityConverter; import org.apache.streams.exceptions.ActivityConversionException; import org.apache.streams.pojo.json.Activity; import org.apache.streams.pojo.json.ActivityObject; import org.apache.streams.twitter.pojo.UserstreamEvent; +import com.google.common.base.Strings; +import com.google.common.collect.Lists; +import org.apache.commons.lang.NotImplementedException; + import java.util.List; import static org.apache.streams.twitter.converter.util.TwitterActivityUtil.formatId; @@ -34,87 +35,101 @@ import static org.apache.streams.twitter.converter.util.TwitterActivityUtil.getP /** -* Created with IntelliJ IDEA. -* User: mdelaet -* Date: 9/30/13 -* Time: 9:24 AM -* To change this template use File | Settings | File Templates. -*/ + * TwitterJsonUserstreameventActivityConverter. + */ +// TODO: Use this class explicitely somewhere public class TwitterJsonUserstreameventActivityConverter implements ActivityConverter<UserstreamEvent> { - public static Class requiredClass = UserstreamEvent.class; - - @Override - public Class requiredClass() { - return requiredClass; - } - - private static TwitterJsonUserstreameventActivityConverter instance = new TwitterJsonUserstreameventActivityConverter(); - - public static TwitterJsonUserstreameventActivityConverter getInstance() { - return instance; - } - - @Override - public String serializationFormat() { - return null; - } - - @Override - public UserstreamEvent fromActivity(Activity deserialized) throws ActivityConversionException { - throw new NotImplementedException(); - } - - @Override - public List<Activity> toActivityList(UserstreamEvent userstreamEvent) throws ActivityConversionException { - - Activity activity = convert(userstreamEvent); - return Lists.newArrayList(activity); - - } - - @Override - public List<UserstreamEvent> fromActivityList(List<Activity> list) { - throw new NotImplementedException(); - } - - @Override - public List<Activity> toActivityList(List<UserstreamEvent> serializedList) { - return null; - } - - public Activity convert(UserstreamEvent event) throws ActivityConversionException { - - Activity activity = new Activity(); - activity.setActor(buildActor(event)); - activity.setVerb(detectVerb(event)); - activity.setObject(buildActivityObject(event)); - activity.setId(formatId(activity.getVerb())); - if(Strings.isNullOrEmpty(activity.getId())) - throw new ActivityConversionException("Unable to determine activity id"); - activity.setProvider(getProvider()); - return activity; - } - - public ActivityObject buildActor(UserstreamEvent event) { - ActivityObject actor = new ActivityObject(); - //actor.setId(formatId(delete.getDelete().getStatus().getUserIdStr())); - return actor; - } - - public ActivityObject buildActivityObject(UserstreamEvent event) { - ActivityObject actObj = new ActivityObject(); - //actObj.setId(formatId(delete.getDelete().getStatus().getIdStr())); - //actObj.setObjectType("tweet"); - return actObj; - } - - public String detectVerb(UserstreamEvent event) { - return null; - } - - public ActivityObject buildTarget(UserstreamEvent event) { - return null; + public static Class requiredClass = UserstreamEvent.class; + + @Override + public Class requiredClass() { + return requiredClass; + } + + private static TwitterJsonUserstreameventActivityConverter instance = new TwitterJsonUserstreameventActivityConverter(); + + public static TwitterJsonUserstreameventActivityConverter getInstance() { + return instance; + } + + @Override + public String serializationFormat() { + return null; + } + + @Override + public UserstreamEvent fromActivity(Activity deserialized) throws ActivityConversionException { + throw new NotImplementedException(); + } + + @Override + public List<UserstreamEvent> fromActivityList(List<Activity> list) { + throw new NotImplementedException(); + } + + @Override + public List<Activity> toActivityList(UserstreamEvent userstreamEvent) throws ActivityConversionException { + + Activity activity = convert(userstreamEvent); + return Lists.newArrayList(activity); + + } + + @Override + public List<Activity> toActivityList(List<UserstreamEvent> serializedList) { + return null; + } + + /** + * convert UserstreamEvent to Activity. + * @param event UserstreamEvent + * @return Activity + * @throws ActivityConversionException ActivityConversionException + */ + public Activity convert(UserstreamEvent event) throws ActivityConversionException { + + Activity activity = new Activity(); + activity.setActor(buildActor(event)); + activity.setVerb(detectVerb(event)); + activity.setObject(buildActivityObject(event)); + activity.setId(formatId(activity.getVerb())); + if (Strings.isNullOrEmpty(activity.getId())) { + throw new ActivityConversionException("Unable to determine activity id"); } + activity.setProvider(getProvider()); + return activity; + } + + /** + * build ActivityObject from UserstreamEvent + * @param event UserstreamEvent + * @return $.actor + */ + public ActivityObject buildActor(UserstreamEvent event) { + ActivityObject actor = new ActivityObject(); + //actor.setId(formatId(delete.getDelete().getStatus().getUserIdStr())); + return actor; + } + + /** + * build ActivityObject from UserstreamEvent + * @param event UserstreamEvent + * @return $.object + */ + public ActivityObject buildActivityObject(UserstreamEvent event) { + ActivityObject actObj = new ActivityObject(); + //actObj.setId(formatId(delete.getDelete().getStatus().getIdStr())); + //actObj.setObjectType("tweet"); + return actObj; + } + + public String detectVerb(UserstreamEvent event) { + return null; + } + + public ActivityObject buildTarget(UserstreamEvent event) { + return null; + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/util/TwitterActivityUtil.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/util/TwitterActivityUtil.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/util/TwitterActivityUtil.java index 4015514..e0e2e80 100644 --- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/util/TwitterActivityUtil.java +++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/util/TwitterActivityUtil.java @@ -19,12 +19,6 @@ package org.apache.streams.twitter.converter.util; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.node.ObjectNode; -import com.google.common.base.Joiner; -import com.google.common.base.Optional; -import com.google.common.base.Strings; -import com.google.common.collect.Lists; import org.apache.streams.exceptions.ActivityConversionException; import org.apache.streams.jackson.StreamsJacksonMapper; import org.apache.streams.pojo.extensions.ExtensionUtil; @@ -41,6 +35,14 @@ import org.apache.streams.twitter.pojo.Retweet; import org.apache.streams.twitter.pojo.Tweet; import org.apache.streams.twitter.pojo.User; import org.apache.streams.twitter.pojo.UserMentions; +import org.apache.streams.twitter.provider.TwitterErrorHandler; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.google.common.base.Joiner; +import com.google.common.base.Optional; +import com.google.common.base.Strings; +import com.google.common.collect.Lists; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -52,323 +54,342 @@ import java.util.Map; import static com.google.common.math.DoubleMath.mean; /** - * Provides utilities for working with Activity objects within the context of Twitter + * Provides utilities for working with Activity objects within the context of Twitter. */ public class TwitterActivityUtil { - private final static Logger LOGGER = LoggerFactory.getLogger(TwitterActivityUtil.class); - - /** - * Updates the given Activity object with the values from the Tweet - * @param tweet the object to use as the source - * @param activity the target of the updates. Will receive all values from the tweet. - * @throws org.apache.streams.exceptions.ActivityConversionException - */ - public static void updateActivity(Tweet tweet, Activity activity) throws ActivityConversionException { - ObjectMapper mapper = StreamsJacksonMapper.getInstance(); - activity.setActor(buildActor(tweet)); - activity.setId(formatId(activity.getVerb(), - Optional.fromNullable( - tweet.getIdStr()) - .or(Optional.of(tweet.getId().toString())) - .orNull())); - - if(tweet instanceof Retweet) { - updateActivityContent(activity, ((Retweet) tweet).getRetweetedStatus(), "share"); - } else { - updateActivityContent(activity, tweet, "post"); - } - - if(Strings.isNullOrEmpty(activity.getId())) - throw new ActivityConversionException("Unable to determine activity id"); - try { - activity.setPublished(tweet.getCreatedAt()); - } catch( Exception e ) { - throw new ActivityConversionException("Unable to determine publishedDate", e); - } - activity.setTarget(buildTarget(tweet)); - activity.setProvider(getProvider()); - activity.setUrl(String.format("http://twitter.com/%s/%s/%s", tweet.getUser().getScreenName(),"/status/",tweet.getIdStr())); - - addTwitterExtension(activity, mapper.convertValue(tweet, ObjectNode.class)); + private static final Logger LOGGER = LoggerFactory.getLogger(TwitterActivityUtil.class); + + static final ObjectMapper mapper = StreamsJacksonMapper.getInstance(); + + /** + * Updates the given Activity object with the values from the Tweet. + * @param tweet the object to use as the source + * @param activity the target of the updates. Will receive all values from the tweet. + * @throws ActivityConversionException ActivityConversionException + */ + public static void updateActivity(Tweet tweet, Activity activity) throws ActivityConversionException { + activity.setActor(buildActor(tweet)); + activity.setId(formatId(activity.getVerb(), + Optional.fromNullable( + tweet.getIdStr()) + .or(Optional.of(tweet.getId().toString())) + .orNull())); + + if (tweet instanceof Retweet) { + updateActivityContent(activity, ((Retweet) tweet).getRetweetedStatus(), "share"); + } else { + updateActivityContent(activity, tweet, "post"); } - /** - * Updates the given Activity object with the values from the User - * @param user the object to use as the source - * @param activity the target of the updates. Will receive all values from the tweet. - */ - public static void updateActivity(User user, Activity activity) { - activity.setActor(buildActor(user)); - activity.setId(null); - activity.setVerb(null); + if (Strings.isNullOrEmpty(activity.getId())) { + throw new ActivityConversionException("Unable to determine activity id"); } - - /** - * Updates the activity for a delete event - * @param delete the delete event - * @param activity the Activity object to update - * @throws org.apache.streams.exceptions.ActivityConversionException - */ - public static void updateActivity(Delete delete, Activity activity) throws ActivityConversionException { - activity.setActor(buildActor(delete)); - activity.setVerb("delete"); - activity.setObject(buildActivityObject(delete)); - activity.setId(formatId(activity.getVerb(), delete.getDelete().getStatus().getIdStr())); - if(Strings.isNullOrEmpty(activity.getId())) - throw new ActivityConversionException("Unable to determine activity id"); - activity.setProvider(getProvider()); - addTwitterExtension(activity, StreamsJacksonMapper.getInstance().convertValue(delete, ObjectNode.class)); + try { + activity.setPublished(tweet.getCreatedAt()); + } catch ( Exception ex ) { + throw new ActivityConversionException("Unable to determine publishedDate", ex); } - - /** - * Builds the actor for a delete event - * @param delete the delete event - * @return a valid Actor - */ - public static ActivityObject buildActor(Delete delete) { - ActivityObject actor = new ActivityObject(); - actor.setId(formatId(delete.getDelete().getStatus().getUserIdStr())); - actor.setObjectType("page"); - return actor; + activity.setTarget(buildTarget(tweet)); + activity.setProvider(getProvider()); + activity.setUrl(String.format("http://twitter.com/%s/%s/%s", tweet.getUser().getScreenName(),"/status/",tweet.getIdStr())); + + addTwitterExtension(activity, mapper.convertValue(tweet, ObjectNode.class)); + } + + /** + * Updates the given Activity object with the values from the User + * @param user the object to use as the source + * @param activity the target of the updates. Will receive all values from the tweet. + */ + public static void updateActivity(User user, Activity activity) { + activity.setActor(buildActor(user)); + activity.setId(null); + activity.setVerb(null); + } + + /** + * Updates the activity for a delete event. + * @param delete the delete event + * @param activity the Activity object to update + * @throws ActivityConversionException ActivityConversionException + */ + public static void updateActivity(Delete delete, Activity activity) throws ActivityConversionException { + activity.setActor(buildActor(delete)); + activity.setVerb("delete"); + activity.setObject(buildActivityObject(delete)); + activity.setId(formatId(activity.getVerb(), delete.getDelete().getStatus().getIdStr())); + if (Strings.isNullOrEmpty(activity.getId())) { + throw new ActivityConversionException("Unable to determine activity id"); } - - /** - * Builds the ActivityObject for the delete event - * @param delete the delete event - * @return a valid Activity Object - */ - public static ActivityObject buildActivityObject(Delete delete) { - ActivityObject actObj = new ActivityObject(); - actObj.setId(formatId(delete.getDelete().getStatus().getIdStr())); - actObj.setObjectType("tweet"); - return actObj; + activity.setProvider(getProvider()); + addTwitterExtension(activity, StreamsJacksonMapper.getInstance().convertValue(delete, ObjectNode.class)); + } + + /** + * Builds the activity {@link org.apache.streams.pojo.json.ActivityObject} actor from the tweet + * @param tweet the object to use as the source + * @return a valid Actor populated from the Tweet + */ + public static ActivityObject buildActor(Tweet tweet) { + ActivityObject actor = new ActivityObject(); + User user = tweet.getUser(); + + return buildActor(user); + } + + /** + * Builds the activity {@link org.apache.streams.pojo.json.ActivityObject} actor from the User + * @param user the object to use as the source + * @return a valid Actor populated from the Tweet + */ + public static ActivityObject buildActor(User user) { + ActivityObject actor = new ActivityObject(); + actor.setId(formatId( + Optional.fromNullable( + user.getIdStr()) + .or(Optional.of(user.getId().toString())) + .orNull() + )); + actor.setObjectType("page"); + actor.setDisplayName(user.getName()); + actor.setAdditionalProperty("handle", user.getScreenName()); + actor.setSummary(user.getDescription()); + + if (user.getUrl() != null) { + actor.setUrl(user.getUrl()); } - - /** - * Updates the content, and associated fields, with those from the given tweet - * @param activity the target of the updates. Will receive all values from the tweet. - * @param tweet the object to use as the source - * @param verb the verb for the given activity's type - */ - public static void updateActivityContent(Activity activity, Tweet tweet, String verb) { - activity.setVerb(verb); - activity.setTitle(""); - if( tweet != null ) { - activity.setObject(buildActivityObject(tweet)); - activity.setLinks(getLinks(tweet)); - activity.setContent(tweet.getText()); - addLocationExtension(activity, tweet); - addTwitterExtensions(activity, tweet); - } + Map<String, Object> extensions = new HashMap<>(); + extensions.put("location", user.getLocation()); + extensions.put("posts", user.getStatusesCount()); + extensions.put("favorites", user.getFavouritesCount()); + extensions.put("followers", user.getFollowersCount()); + + Image profileImage = new Image(); + profileImage.setUrl(user.getProfileImageUrlHttps()); + actor.setImage(profileImage); + + extensions.put("screenName", user.getScreenName()); + + actor.setAdditionalProperty("extensions", extensions); + return actor; + } + + /** + * Builds the actor for a delete event. + * @param delete the delete event + * @return a valid Actor + */ + public static ActivityObject buildActor(Delete delete) { + ActivityObject actor = new ActivityObject(); + actor.setId(formatId(delete.getDelete().getStatus().getUserIdStr())); + actor.setObjectType("page"); + return actor; + } + + /** + * Creates an {@link org.apache.streams.pojo.json.ActivityObject} for the tweet + * @param tweet the object to use as the source + * @return a valid ActivityObject + */ + public static ActivityObject buildActivityObject(Tweet tweet) { + ActivityObject actObj = new ActivityObject(); + String id = Optional.fromNullable( + tweet.getIdStr()) + .or(Optional.of(tweet.getId().toString())) + .orNull(); + if ( id != null ) { + actObj.setId(id); } - - /** - * Creates an {@link org.apache.streams.pojo.json.ActivityObject} for the tweet - * @param tweet the object to use as the source - * @return a valid ActivityObject - */ - public static ActivityObject buildActivityObject(Tweet tweet) { - ActivityObject actObj = new ActivityObject(); - String id = Optional.fromNullable( - tweet.getIdStr()) - .or(Optional.of(tweet.getId().toString())) - .orNull(); - if( id != null ) - actObj.setId(id); - actObj.setObjectType("post"); - actObj.setContent(tweet.getText()); - return actObj; + actObj.setObjectType("post"); + actObj.setContent(tweet.getText()); + return actObj; + } + + /** + * Builds the ActivityObject for the delete event. + * @param delete the delete event + * @return a valid Activity Object + */ + public static ActivityObject buildActivityObject(Delete delete) { + ActivityObject actObj = new ActivityObject(); + actObj.setId(formatId(delete.getDelete().getStatus().getIdStr())); + actObj.setObjectType("tweet"); + return actObj; + } + + /** + * Updates the content, and associated fields, with those from the given tweet + * @param activity the target of the updates. Will receive all values from the tweet. + * @param tweet the object to use as the source + * @param verb the verb for the given activity's type + */ + public static void updateActivityContent(Activity activity, Tweet tweet, String verb) { + activity.setVerb(verb); + activity.setTitle(""); + if ( tweet != null ) { + activity.setObject(buildActivityObject(tweet)); + activity.setLinks(getLinks(tweet)); + activity.setContent(tweet.getText()); + addLocationExtension(activity, tweet); + addTwitterExtensions(activity, tweet); } + } - /** - * Builds the activity {@link org.apache.streams.pojo.json.ActivityObject} actor from the tweet - * @param tweet the object to use as the source - * @return a valid Actor populated from the Tweet - */ - public static ActivityObject buildActor(Tweet tweet) { - ActivityObject actor = new ActivityObject(); - User user = tweet.getUser(); - return buildActor(user); - } - /** - * Builds the activity {@link org.apache.streams.pojo.json.ActivityObject} actor from the User - * @param user the object to use as the source - * @return a valid Actor populated from the Tweet - */ - public static ActivityObject buildActor(User user) { - ActivityObject actor = new ActivityObject(); - actor.setId(formatId( - Optional.fromNullable( - user.getIdStr()) - .or(Optional.of(user.getId().toString())) - .orNull() - )); - actor.setObjectType("page"); - actor.setDisplayName(user.getName()); - actor.setAdditionalProperty("handle", user.getScreenName()); - actor.setSummary(user.getDescription()); - - if (user.getUrl()!=null){ - actor.setUrl(user.getUrl()); - } - - Map<String, Object> extensions = new HashMap<>(); - extensions.put("location", user.getLocation()); - extensions.put("posts", user.getStatusesCount()); - extensions.put("favorites", user.getFavouritesCount()); - extensions.put("followers", user.getFollowersCount()); - - Image profileImage = new Image(); - profileImage.setUrl(user.getProfileImageUrlHttps()); - actor.setImage(profileImage); - - extensions.put("screenName", user.getScreenName()); - - actor.setAdditionalProperty("extensions", extensions); - return actor; - } - /** - * Gets the links from the Twitter event - * @param tweet the object to use as the source - * @return a list of links corresponding to the expanded URL (no t.co) - */ - public static List<String> getLinks(Tweet tweet) { - List<String> links = new ArrayList<>(); - if( tweet.getEntities().getUrls() != null ) { - for (Url url : tweet.getEntities().getUrls()) { - links.add(url.getExpandedUrl()); - } - } - else - LOGGER.debug(" 0 links"); - return links; - } - /** - * Builds the {@link org.apache.streams.twitter.pojo.TargetObject} from the tweet - * @param tweet the object to use as the source - * @return currently returns null for all activities - */ - public static ActivityObject buildTarget(Tweet tweet) { - return null; - } - /** - * Adds the location extension and populates with teh twitter data - * @param activity the Activity object to update - * @param tweet the object to use as the source - */ - public static void addLocationExtension(Activity activity, Tweet tweet) { - Map<String, Object> extensions = ExtensionUtil.getInstance().ensureExtensions(activity); - Map<String, Object> location = new HashMap<>(); - location.put("id", formatId( - Optional.fromNullable( - tweet.getIdStr()) - .or(Optional.of(tweet.getId().toString())) - .orNull() - )); - location.put("coordinates", boundingBoxCenter(tweet.getPlace())); - extensions.put("location", location); - } - /** - * Gets the common twitter {@link org.apache.streams.pojo.json.Provider} object - * @return a provider object representing Twitter - */ - public static Provider getProvider() { - Provider provider = new Provider(); - provider.setId("id:providers:twitter"); - provider.setObjectType("application"); - provider.setDisplayName("Twitter"); - return provider; + /** + * Gets the links from the Twitter event + * @param tweet the object to use as the source + * @return a list of links corresponding to the expanded URL (no t.co) + */ + public static List<String> getLinks(Tweet tweet) { + List<String> links = new ArrayList<>(); + if ( tweet.getEntities().getUrls() != null ) { + for (Url url : tweet.getEntities().getUrls()) { + links.add(url.getExpandedUrl()); + } + } else { + LOGGER.debug(" 0 links"); } - /** - * Adds the given Twitter event to the activity as an extension - * @param activity the Activity object to update - * @param event the Twitter event to add as the extension - */ - public static void addTwitterExtension(Activity activity, ObjectNode event) { - Map<String, Object> extensions = ExtensionUtil.getInstance().ensureExtensions(activity); - extensions.put("twitter", event); + return links; + } + + /** + * Builds the {@link org.apache.streams.twitter.pojo.TargetObject} from the tweet. + * @param tweet the object to use as the source + * @return currently returns null for all activities + */ + public static ActivityObject buildTarget(Tweet tweet) { + return null; + } + + /** + * Adds the location extension and populates with teh twitter data. + * @param activity the Activity object to update + * @param tweet the object to use as the source + */ + public static void addLocationExtension(Activity activity, Tweet tweet) { + Map<String, Object> extensions = ExtensionUtil.getInstance().ensureExtensions(activity); + Map<String, Object> location = new HashMap<>(); + location.put("id", formatId( + Optional.fromNullable( + tweet.getIdStr()) + .or(Optional.of(tweet.getId().toString())) + .orNull() + )); + location.put("coordinates", boundingBoxCenter(tweet.getPlace())); + extensions.put("location", location); + } + + /** + * Gets the common twitter {@link org.apache.streams.pojo.json.Provider} object + * @return a provider object representing Twitter + */ + public static Provider getProvider() { + Provider provider = new Provider(); + provider.setId("id:providers:twitter"); + provider.setObjectType("application"); + provider.setDisplayName("Twitter"); + return provider; + } + + /** + * Adds the given Twitter event to the activity as an extension. + * @param activity the Activity object to update + * @param event the Twitter event to add as the extension + */ + public static void addTwitterExtension(Activity activity, ObjectNode event) { + Map<String, Object> extensions = ExtensionUtil.getInstance().ensureExtensions(activity); + extensions.put("twitter", event); + } + + /** + * Formats the ID to conform with the Apache Streams activity ID convention. + * @param idparts the parts of the ID to join + * @return a valid Activity ID in format "id:twitter:part1:part2:...partN" + */ + public static String formatId(String... idparts) { + return Joiner.on(":").join(Lists.asList("id:twitter", idparts)); + } + + /** + * Takes various parameters from the twitter object that are currently not part of the + * activity schema and stores them in a generic extensions attribute. + * @param activity Activity + * @param tweet Tweet + */ + public static void addTwitterExtensions(Activity activity, Tweet tweet) { + Map<String, Object> extensions = ExtensionUtil.getInstance().ensureExtensions(activity); + + List<String> hashtags = new ArrayList<>(); + for (Hashtag hashtag : tweet.getEntities().getHashtags()) { + hashtags.add(hashtag.getText()); } - /** - * Formats the ID to conform with the Apache Streams activity ID convention - * @param idparts the parts of the ID to join - * @return a valid Activity ID in format "id:twitter:part1:part2:...partN" - */ - public static String formatId(String... idparts) { - return Joiner.on(":").join(Lists.asList("id:twitter", idparts)); - } - - /** - * Takes various parameters from the twitter object that are currently not part of teh - * activity schema and stores them in a generic extensions attribute - * @param activity - * @param tweet - */ - public static void addTwitterExtensions(Activity activity, Tweet tweet) { - Map<String, Object> extensions = ExtensionUtil.getInstance().ensureExtensions(activity); + extensions.put("hashtags", hashtags); - List<String> hashtags = new ArrayList<>(); - for(Hashtag hashtag : tweet.getEntities().getHashtags()) { - hashtags.add(hashtag.getText()); - } - extensions.put("hashtags", hashtags); + Map<String, Object> likes = new HashMap<>(); + likes.put("perspectival", tweet.getFavorited()); + likes.put("count", tweet.getAdditionalProperties().get("favorite_count")); - Map<String, Object> likes = new HashMap<>(); - likes.put("perspectival", tweet.getFavorited()); - likes.put("count", tweet.getAdditionalProperties().get("favorite_count")); + extensions.put("likes", likes); - extensions.put("likes", likes); + Map<String, Object> rebroadcasts = new HashMap<>(); + rebroadcasts.put("perspectival", tweet.getRetweeted()); + rebroadcasts.put("count", tweet.getRetweetCount()); - Map<String, Object> rebroadcasts = new HashMap<>(); - rebroadcasts.put("perspectival", tweet.getRetweeted()); - rebroadcasts.put("count", tweet.getRetweetCount()); + extensions.put("rebroadcasts", rebroadcasts); - extensions.put("rebroadcasts", rebroadcasts); + List<Map<String, Object>> userMentions = new ArrayList<>(); + Entities entities = tweet.getEntities(); - List<Map<String, Object>> userMentions = new ArrayList<>(); - Entities entities = tweet.getEntities(); + for (UserMentions user : entities.getUserMentions()) { + //Map the twitter user object into an actor + Map<String, Object> actor = new HashMap<>(); + actor.put("id", "id:twitter:" + user.getIdStr()); + actor.put("displayName", user.getName()); + actor.put("handle", user.getScreenName()); - for(UserMentions user : entities.getUserMentions()) { - //Map the twitter user object into an actor - Map<String, Object> actor = new HashMap<>(); - actor.put("id", "id:twitter:" + user.getIdStr()); - actor.put("displayName", user.getName()); - actor.put("handle", user.getScreenName()); + userMentions.add(actor); + } - userMentions.add(actor); - } + extensions.put("user_mentions", userMentions); - extensions.put("user_mentions", userMentions); + extensions.put("keywords", tweet.getText()); + } - extensions.put("keywords", tweet.getText()); + /** + * Compute central coordinates from bounding box. + * @param place the bounding box to use as the source + */ + public static List<Double> boundingBoxCenter(Place place) { + if ( place == null ) { + return new ArrayList<>(); } - - /** - * Compute central coordinates from bounding box - * @param place the bounding box to use as the source - */ - public static List<Double> boundingBoxCenter(Place place) { - if( place == null ) return new ArrayList<>(); - if( place.getBoundingBox() == null ) return new ArrayList<>(); - if( place.getBoundingBox().getCoordinates().size() != 1 ) return new ArrayList<>(); - if( place.getBoundingBox().getCoordinates().get(0).size() != 4 ) return new ArrayList<>(); - List<Double> lats = new ArrayList<>(); - List<Double> lons = new ArrayList<>(); - for( List<Double> point : place.getBoundingBox().getCoordinates().get(0)) { - lats.add(point.get(0)); - lons.add(point.get(1)); - } - List<Double> result = new ArrayList<>(); - result.add(mean(lats)); - result.add(mean(lons)); - return result; + if ( place.getBoundingBox() == null ) { + return new ArrayList<>(); + } + if ( place.getBoundingBox().getCoordinates().size() != 1 ) { + return new ArrayList<>(); + } + if ( place.getBoundingBox().getCoordinates().get(0).size() != 4 ) { + return new ArrayList<>(); + } + List<Double> lats = new ArrayList<>(); + List<Double> lons = new ArrayList<>(); + for ( List<Double> point : place.getBoundingBox().getCoordinates().get(0)) { + lats.add(point.get(0)); + lons.add(point.get(1)); } + List<Double> result = new ArrayList<>(); + result.add(mean(lats)); + result.add(mean(lons)); + return result; + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/FetchAndReplaceTwitterProcessor.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/FetchAndReplaceTwitterProcessor.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/FetchAndReplaceTwitterProcessor.java index 046cb76..c1c205b 100644 --- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/FetchAndReplaceTwitterProcessor.java +++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/FetchAndReplaceTwitterProcessor.java @@ -19,9 +19,6 @@ package org.apache.streams.twitter.processor; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.collect.Lists; -import java.util.List; import org.apache.streams.config.ComponentConfigurator; import org.apache.streams.config.StreamsConfigurator; import org.apache.streams.core.StreamsDatum; @@ -31,11 +28,14 @@ import org.apache.streams.jackson.StreamsJacksonMapper; import org.apache.streams.pojo.json.Activity; import org.apache.streams.twitter.TwitterConfiguration; import org.apache.streams.twitter.TwitterStreamConfiguration; +import org.apache.streams.twitter.converter.TwitterDocumentClassifier; import org.apache.streams.twitter.pojo.Delete; import org.apache.streams.twitter.pojo.Retweet; import org.apache.streams.twitter.pojo.Tweet; -import org.apache.streams.twitter.provider.TwitterEventClassifier; import org.apache.streams.twitter.provider.TwitterProviderUtil; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.Lists; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import twitter4j.Status; @@ -45,6 +45,8 @@ import twitter4j.TwitterFactory; import twitter4j.TwitterObjectFactory; import twitter4j.conf.ConfigurationBuilder; +import java.util.List; + import static org.apache.streams.twitter.converter.util.TwitterActivityUtil.getProvider; import static org.apache.streams.twitter.converter.util.TwitterActivityUtil.updateActivity; @@ -54,132 +56,132 @@ import static org.apache.streams.twitter.converter.util.TwitterActivityUtil.upda */ public class FetchAndReplaceTwitterProcessor implements StreamsProcessor { - private static final String PROVIDER_ID = getProvider().getId(); - private static final Logger LOGGER = LoggerFactory.getLogger(FetchAndReplaceTwitterProcessor.class); - - //Default number of attempts before allowing the document through - private static final int MAX_ATTEMPTS = 5; - //Start the backoff at 4 minutes. This results in a wait period of 4, 8, 12, 16 & 20 min with an attempt of 5 - public static final int BACKOFF = 1000 * 60 * 4; - - private final TwitterConfiguration config; - private Twitter client; - private ObjectMapper mapper; - private int retryCount; - - public FetchAndReplaceTwitterProcessor() { - this(new ComponentConfigurator<>(TwitterStreamConfiguration.class).detectConfiguration(StreamsConfigurator.config, "twitter")); - } - - public FetchAndReplaceTwitterProcessor(TwitterStreamConfiguration config) { - this.config = config; + private static final String PROVIDER_ID = getProvider().getId(); + private static final Logger LOGGER = LoggerFactory.getLogger(FetchAndReplaceTwitterProcessor.class); + + //Default number of attempts before allowing the document through + private static final int MAX_ATTEMPTS = 5; + //Start the backoff at 4 minutes. This results in a wait period of 4, 8, 12, 16 & 20 min with an attempt of 5 + public static final int BACKOFF = 1000 * 60 * 4; + + private final TwitterConfiguration config; + private Twitter client; + private ObjectMapper mapper; + private int retryCount; + + public FetchAndReplaceTwitterProcessor() { + this(new ComponentConfigurator<>(TwitterStreamConfiguration.class).detectConfiguration(StreamsConfigurator.config, "twitter")); + } + + public FetchAndReplaceTwitterProcessor(TwitterStreamConfiguration config) { + this.config = config; + } + + @Override + public String getId() { + return getProvider().getId(); + } + + @Override + public List<StreamsDatum> process(StreamsDatum entry) { + if (entry.getDocument() instanceof Activity) { + Activity doc = (Activity)entry.getDocument(); + String originalId = doc.getId(); + if (PROVIDER_ID.equals(doc.getProvider().getId())) { + fetchAndReplace(doc, originalId); + } + } else { + throw new IllegalStateException("Requires an activity document"); } - - @Override - public String getId() { - return getProvider().getId(); + return Lists.newArrayList(entry); + } + + + @Override + public void prepare(Object configurationObject) { + this.client = getTwitterClient(); + this.mapper = StreamsJacksonMapper.getInstance(); + } + + @Override + public void cleanUp() { + + } + + protected void fetchAndReplace(Activity doc, String originalId) { + try { + String json = fetch(doc); + replace(doc, json); + doc.setId(originalId); + retryCount = 0; + } catch (TwitterException tw) { + if (tw.exceededRateLimitation()) { + sleepAndTryAgain(doc, originalId); + } + } catch (Exception ex) { + LOGGER.warn("Error fetching and replacing tweet for activity {}", doc.getId()); } - - @Override - public List<StreamsDatum> process(StreamsDatum entry) { - if(entry.getDocument() instanceof Activity) { - Activity doc = (Activity)entry.getDocument(); - String originalId = doc.getId(); - if(PROVIDER_ID.equals(doc.getProvider().getId())) { - fetchAndReplace(doc, originalId); - } - } else { - throw new IllegalStateException("Requires an activity document"); - } - return Lists.newArrayList(entry); + } + + protected void replace(Activity doc, String json) throws java.io.IOException, ActivityConversionException { + Class documentSubType = new TwitterDocumentClassifier().detectClasses(json).get(0); + Object object = mapper.readValue(json, documentSubType); + + if (documentSubType.equals(Retweet.class) || documentSubType.equals(Tweet.class)) { + updateActivity((Tweet)object, doc); + } else if (documentSubType.equals(Delete.class)) { + updateActivity((Delete)object, doc); + } else { + LOGGER.info("Could not determine the correct update method for {}", documentSubType); } + } + protected String fetch(Activity doc) throws TwitterException { + String id = doc.getObject().getId(); + LOGGER.debug("Fetching status from Twitter for {}", id); + Long tweetId = Long.valueOf(id.replace("id:twitter:tweets:", "")); + Status status = getTwitterClient().showStatus(tweetId); + return TwitterObjectFactory.getRawJSON(status); + } - @Override - public void prepare(Object configurationObject) { - this.client = getTwitterClient(); - this.mapper = StreamsJacksonMapper.getInstance(); - } - @Override - public void cleanUp() { + protected Twitter getTwitterClient() { - } + if (this.client == null) { - protected void fetchAndReplace(Activity doc, String originalId) { - try { - String json = fetch(doc); - replace(doc, json); - doc.setId(originalId); - retryCount = 0; - } catch(TwitterException tw) { - if(tw.exceededRateLimitation()) { - sleepAndTryAgain(doc, originalId); - } - } catch (Exception e) { - LOGGER.warn("Error fetching and replacing tweet for activity {}", doc.getId()); - } - } + String baseUrl = TwitterProviderUtil.baseUrl(config); - protected void replace(Activity doc, String json) throws java.io.IOException, ActivityConversionException { - Class documentSubType = TwitterEventClassifier.detectClass(json); - Object object = mapper.readValue(json, documentSubType); - - if(documentSubType.equals(Retweet.class) || documentSubType.equals(Tweet.class)) { - updateActivity((Tweet)object, doc); - } else if(documentSubType.equals(Delete.class)) { - updateActivity((Delete)object, doc); - } else { - LOGGER.info("Could not determine the correct update method for {}", documentSubType); - } - } + ConfigurationBuilder builder = new ConfigurationBuilder() + .setOAuthConsumerKey(config.getOauth().getConsumerKey()) + .setOAuthConsumerSecret(config.getOauth().getConsumerSecret()) + .setOAuthAccessToken(config.getOauth().getAccessToken()) + .setOAuthAccessTokenSecret(config.getOauth().getAccessTokenSecret()) + .setIncludeEntitiesEnabled(true) + .setJSONStoreEnabled(true) + .setAsyncNumThreads(1) + .setRestBaseURL(baseUrl) + .setIncludeMyRetweetEnabled(Boolean.TRUE) + .setPrettyDebugEnabled(Boolean.TRUE); - protected String fetch(Activity doc) throws TwitterException { - String id = doc.getObject().getId(); - LOGGER.debug("Fetching status from Twitter for {}", id); - Long tweetId = Long.valueOf(id.replace("id:twitter:tweets:", "")); - Status status = getTwitterClient().showStatus(tweetId); - return TwitterObjectFactory.getRawJSON(status); + this.client = new TwitterFactory(builder.build()).getInstance(); } - - - protected Twitter getTwitterClient() - { - if(this.client == null) { - - String baseUrl = TwitterProviderUtil.baseUrl(config); - - ConfigurationBuilder builder = new ConfigurationBuilder() - .setOAuthConsumerKey(config.getOauth().getConsumerKey()) - .setOAuthConsumerSecret(config.getOauth().getConsumerSecret()) - .setOAuthAccessToken(config.getOauth().getAccessToken()) - .setOAuthAccessTokenSecret(config.getOauth().getAccessTokenSecret()) - .setIncludeEntitiesEnabled(true) - .setJSONStoreEnabled(true) - .setAsyncNumThreads(1) - .setRestBaseURL(baseUrl) - .setIncludeMyRetweetEnabled(Boolean.TRUE) - .setPrettyDebugEnabled(Boolean.TRUE); - - this.client = new TwitterFactory(builder.build()).getInstance(); - } - return this.client; - } - - //Hardcore sleep to allow for catch up - protected void sleepAndTryAgain(Activity doc, String originalId) { - try { - //Attempt to fetchAndReplace with a backoff up to the limit then just reset the count and let the process continue - if(retryCount < MAX_ATTEMPTS) { - retryCount++; - LOGGER.debug("Sleeping for {} min due to excessive calls to Twitter API", (retryCount * 4)); - Thread.sleep(BACKOFF * retryCount); - fetchAndReplace(doc, originalId); - } else { - retryCount = 0; - } - } catch (InterruptedException e) { - LOGGER.warn("Thread sleep interrupted while waiting for twitter backoff"); - } + return this.client; + } + + //Hardcore sleep to allow for catch up + protected void sleepAndTryAgain(Activity doc, String originalId) { + try { + //Attempt to fetchAndReplace with a backoff up to the limit then just reset the count and let the process continue + if (retryCount < MAX_ATTEMPTS) { + retryCount++; + LOGGER.debug("Sleeping for {} min due to excessive calls to Twitter API", (retryCount * 4)); + Thread.sleep(BACKOFF * retryCount); + fetchAndReplace(doc, originalId); + } else { + retryCount = 0; + } + } catch (InterruptedException ex) { + LOGGER.warn("Thread sleep interrupted while waiting for twitter backoff"); } + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterEventProcessor.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterEventProcessor.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterEventProcessor.java deleted file mode 100644 index ed6b90a..0000000 --- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterEventProcessor.java +++ /dev/null @@ -1,83 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.streams.twitter.processor; - -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.collect.Lists; -import org.apache.streams.core.StreamsDatum; -import org.apache.streams.core.StreamsProcessor; -import org.apache.streams.jackson.StreamsJacksonMapper; -import org.apache.streams.twitter.converter.StreamsTwitterMapper; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.List; - -/** - * This class performs conversion of a twitter event to a specified outClass - * - * Deprecated: use TypeConverterProcessor and ActivityConverterProcessor instead - */ -@Deprecated -public class TwitterEventProcessor implements StreamsProcessor { - - private final static String STREAMS_ID = "TwitterEventProcessor"; - - private final static Logger LOGGER = LoggerFactory.getLogger(TwitterEventProcessor.class); - - private ObjectMapper mapper = new StreamsTwitterMapper(); - - private Class inClass; - private Class outClass; - - public TwitterEventProcessor(Class inClass, Class outClass) { - this.inClass = inClass; - this.outClass = outClass; - } - - public TwitterEventProcessor( Class outClass) { - this(null, outClass); - } - - @Override - public String getId() { - return STREAMS_ID; - } - - @Override - public List<StreamsDatum> process(StreamsDatum entry) { - - LOGGER.error("You are calling a deprecated / defunct class. Modify your stream to use ActivityConverterProcessor."); - - LOGGER.debug("CONVERT FAILED"); - - return Lists.newArrayList(); - - } - - @Override - public void prepare(Object configurationObject) { - mapper = StreamsJacksonMapper.getInstance(); - } - - @Override - public void cleanUp() { - - } -}; http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterProfileProcessor.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterProfileProcessor.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterProfileProcessor.java deleted file mode 100644 index d49a54f..0000000 --- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterProfileProcessor.java +++ /dev/null @@ -1,140 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.streams.twitter.processor; - -import com.fasterxml.jackson.databind.ObjectMapper; -import org.apache.streams.core.StreamsDatum; -import org.apache.streams.core.StreamsProcessor; -import org.apache.streams.jackson.StreamsJacksonMapper; -import org.apache.streams.twitter.converter.TwitterDateTimeFormat; -import org.apache.streams.twitter.pojo.Retweet; -import org.apache.streams.twitter.pojo.Tweet; -import org.apache.streams.twitter.pojo.User; -import org.apache.streams.twitter.provider.TwitterEventClassifier; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.ArrayList; -import java.util.List; -import java.util.Queue; -import java.util.Random; - -public class TwitterProfileProcessor implements StreamsProcessor, Runnable { - - private final static String STREAMS_ID = "TwitterProfileProcessor"; - - private final static Logger LOGGER = LoggerFactory.getLogger(TwitterProfileProcessor.class); - - private ObjectMapper mapper = StreamsJacksonMapper.getInstance(TwitterDateTimeFormat.TWITTER_FORMAT); - - private Queue<StreamsDatum> inQueue; - private Queue<StreamsDatum> outQueue; - - private final static String TERMINATE = "TERMINATE"; - - @Override - public void run() { - - while(true) { - StreamsDatum item; - try { - item = inQueue.poll(); - if(item.getDocument() instanceof String && item.equals(TERMINATE)) { - LOGGER.info("Terminating!"); - break; - } - - Thread.sleep(new Random().nextInt(100)); - - for( StreamsDatum entry : process(item)) { - outQueue.offer(entry); - } - - - } catch (Exception e) { - e.printStackTrace(); - - } - } - } - - public StreamsDatum createStreamsDatum(User user) { - return new StreamsDatum(user, user.getIdStr()); - } - - @Override - public String getId() { - return STREAMS_ID; - } - - @Override - public List<StreamsDatum> process(StreamsDatum entry) { - - List<StreamsDatum> result = new ArrayList<>(); - String item; - try { - // first check for valid json - // since data is coming from outside provider, we don't know what type the events are - if( entry.getDocument() instanceof String) { - item = (String) entry.getDocument(); - } else { - item = mapper.writeValueAsString(entry.getDocument()); - } - - Class inClass = TwitterEventClassifier.detectClass(item); - - User user; - - if ( inClass.equals( Tweet.class )) { - LOGGER.debug("TWEET"); - Tweet tweet = mapper.readValue(item, Tweet.class); - user = tweet.getUser(); - result.add(createStreamsDatum(user)); - } - else if ( inClass.equals( Retweet.class )) { - LOGGER.debug("RETWEET"); - Retweet retweet = mapper.readValue(item, Retweet.class); - user = retweet.getRetweetedStatus().getUser(); - result.add(createStreamsDatum(user)); - } else if ( inClass.equals( User.class )) { - LOGGER.debug("USER"); - user = mapper.readValue(item, User.class); - result.add(createStreamsDatum(user)); - } else { - return new ArrayList<>(); - } - - return result; - } catch (Exception e) { - e.printStackTrace(); - LOGGER.warn("Error processing " + entry.toString()); - return new ArrayList<>(); - } - } - - @Override - public void prepare(Object o) { - - } - - @Override - public void cleanUp() { - - } -}; http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterTypeConverter.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterTypeConverter.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterTypeConverter.java index cc1ecb1..d51e4e7 100644 --- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterTypeConverter.java +++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterTypeConverter.java @@ -21,13 +21,14 @@ package org.apache.streams.twitter.processor; import org.apache.streams.converter.ActivityConverterProcessor; /** - * This class performs conversion of a twitter event to a specified outClass + * This class performs conversion of a twitter event to a specified outClass. * + * <p/> * Deprecated: use TypeConverterProcessor and ActivityConverterProcessor instead */ @Deprecated public class TwitterTypeConverter extends ActivityConverterProcessor { - public final static String STREAMS_ID = "TwitterTypeConverter"; + public static final String STREAMS_ID = "TwitterTypeConverter"; } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterUrlApiProcessor.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterUrlApiProcessor.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterUrlApiProcessor.java index 30db471..0dd43bb 100644 --- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterUrlApiProcessor.java +++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterUrlApiProcessor.java @@ -18,63 +18,71 @@ package org.apache.streams.twitter.processor; -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; import org.apache.streams.components.http.HttpProcessorConfiguration; import org.apache.streams.components.http.processor.SimpleHTTPGetProcessor; import org.apache.streams.core.StreamsDatum; import org.apache.streams.core.StreamsProcessor; import org.apache.streams.pojo.json.Activity; +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; + import java.util.HashMap; import java.util.List; import java.util.Map; /** - * Class gets a global share count from Twitter API for links on Activity datums + * Class gets a global share count from Twitter API for links on Activity datums. */ public class TwitterUrlApiProcessor extends SimpleHTTPGetProcessor implements StreamsProcessor { - private final static String STREAMS_ID = "TwitterUrlApiProcessor"; + private static final String STREAMS_ID = "TwitterUrlApiProcessor"; - public TwitterUrlApiProcessor() { - super(); - this.configuration.setHostname("urls.api.twitter.com"); - this.configuration.setResourcePath("/1/urls/count.json"); - this.configuration.setEntity(HttpProcessorConfiguration.Entity.ACTIVITY); - this.configuration.setExtension("twitter_url_count"); - } + /** + * TwitterUrlApiProcessor constructor. + */ + public TwitterUrlApiProcessor() { + super(); + this.configuration.setHostname("urls.api.twitter.com"); + this.configuration.setResourcePath("/1/urls/count.json"); + this.configuration.setEntity(HttpProcessorConfiguration.Entity.ACTIVITY); + this.configuration.setExtension("twitter_url_count"); + } - public TwitterUrlApiProcessor(HttpProcessorConfiguration processorConfiguration) { - super(processorConfiguration); - this.configuration.setHostname("urls.api.twitter.com"); - this.configuration.setResourcePath("/1/urls/count.json"); - this.configuration.setEntity(HttpProcessorConfiguration.Entity.ACTIVITY); - this.configuration.setExtension("twitter_url_count"); - } + /** + * TwitterUrlApiProcessor constructor. + */ + public TwitterUrlApiProcessor(HttpProcessorConfiguration processorConfiguration) { + super(processorConfiguration); + this.configuration.setHostname("urls.api.twitter.com"); + this.configuration.setResourcePath("/1/urls/count.json"); + this.configuration.setEntity(HttpProcessorConfiguration.Entity.ACTIVITY); + this.configuration.setExtension("twitter_url_count"); + } - @Override - public String getId() { - return STREAMS_ID; - } + @Override + public String getId() { + return STREAMS_ID; + } - @Override - public List<StreamsDatum> process(StreamsDatum entry) { - Preconditions.checkArgument(entry.getDocument() instanceof Activity); - Activity activity = mapper.convertValue(entry.getDocument(), Activity.class); - if( activity.getLinks() != null && activity.getLinks().size() > 0) - return super.process(entry); - else - return Lists.newArrayList(entry); + @Override + public List<StreamsDatum> process(StreamsDatum entry) { + Preconditions.checkArgument(entry.getDocument() instanceof Activity); + Activity activity = mapper.convertValue(entry.getDocument(), Activity.class); + if ( activity.getLinks() != null && activity.getLinks().size() > 0) { + return super.process(entry); + } else { + return Lists.newArrayList(entry); } + } - @Override - protected Map<String, String> prepareParams(StreamsDatum entry) { + @Override + protected Map<String, String> prepareParams(StreamsDatum entry) { - Map<String, String> params = new HashMap<>(); + Map<String, String> params = new HashMap<>(); - params.put("url", mapper.convertValue(entry.getDocument(), Activity.class).getLinks().get(0)); + params.put("url", mapper.convertValue(entry.getDocument(), Activity.class).getLinks().get(0)); - return params; - } + return params; + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterErrorHandler.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterErrorHandler.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterErrorHandler.java index 90f6b62..ec43fba 100644 --- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterErrorHandler.java +++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterErrorHandler.java @@ -21,120 +21,113 @@ package org.apache.streams.twitter.provider; import org.apache.streams.config.ComponentConfigurator; import org.apache.streams.config.StreamsConfigurator; import org.apache.streams.twitter.TwitterConfiguration; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import twitter4j.RateLimitStatus; import twitter4j.Twitter; import twitter4j.TwitterException; -import twitter4j.RateLimitStatus; /** * Handle expected and unexpected exceptions. */ -public class TwitterErrorHandler -{ - private final static Logger LOGGER = LoggerFactory.getLogger(TwitterErrorHandler.class); - - // selected because 3 * 5 + n >= 15 for positive n - protected static long retry = - new ComponentConfigurator<TwitterConfiguration>(TwitterConfiguration.class).detectConfiguration( - StreamsConfigurator.getConfig().getConfig("twitter") - ).getRetrySleepMs(); - protected static long retryMax = - new ComponentConfigurator<TwitterConfiguration>(TwitterConfiguration.class).detectConfiguration( - StreamsConfigurator.getConfig().getConfig("twitter") - ).getRetryMax(); - - @Deprecated - public static int handleTwitterError(Twitter twitter, Exception exception) { - return handleTwitterError( twitter, null, exception); - } +public class TwitterErrorHandler { + + private static final Logger LOGGER = LoggerFactory.getLogger(TwitterErrorHandler.class); + + // selected because 3 * 5 + n >= 15 for positive n + protected static long retry = + new ComponentConfigurator<TwitterConfiguration>(TwitterConfiguration.class).detectConfiguration( + StreamsConfigurator.getConfig().getConfig("twitter") + ).getRetrySleepMs(); + protected static long retryMax = + new ComponentConfigurator<TwitterConfiguration>(TwitterConfiguration.class).detectConfiguration( + StreamsConfigurator.getConfig().getConfig("twitter") + ).getRetryMax(); + + @Deprecated + public static int handleTwitterError(Twitter twitter, Exception exception) { + return handleTwitterError( twitter, null, exception); + } + + /** + * handleTwitterError. + * @param twitter Twitter + * @param id id + * @param exception exception + * @return + */ + public static int handleTwitterError(Twitter twitter, Long id, Exception exception) { + + if (exception instanceof TwitterException) { + TwitterException twitterException = (TwitterException)exception; + + if (twitterException.exceededRateLimitation()) { + + long millisUntilReset = retry; + + final RateLimitStatus rateLimitStatus = twitterException.getRateLimitStatus(); + if (rateLimitStatus != null) { + millisUntilReset = rateLimitStatus.getSecondsUntilReset() * 1000; + } + + LOGGER.warn("Rate Limit Exceeded. Will retry in {} seconds...", millisUntilReset / 1000); + + try { + Thread.sleep(millisUntilReset); + } catch (InterruptedException e1) { + Thread.currentThread().interrupt(); + } - public static int handleTwitterError(Twitter twitter, Long id, Exception exception) - { - if(exception instanceof TwitterException) - { - TwitterException e = (TwitterException)exception; - if(e.exceededRateLimitation()) - { - long millisUntilReset = retry; - - final RateLimitStatus rateLimitStatus = e.getRateLimitStatus(); - if (rateLimitStatus != null) { - millisUntilReset = rateLimitStatus.getSecondsUntilReset() * 1000; - } - - LOGGER.warn("Rate Limit Exceeded. Will retry in {} seconds...", millisUntilReset / 1000); - - try { - Thread.sleep(millisUntilReset); - } catch (InterruptedException e1) { - Thread.currentThread().interrupt(); - } - - return 1; - } - else if(e.isCausedByNetworkIssue()) - { - LOGGER.info("Twitter Network Issues Detected. Backing off..."); - LOGGER.info("{} - {}", e.getExceptionCode(), e.getLocalizedMessage()); - try { - Thread.sleep(retry); - } catch (InterruptedException e1) { - Thread.currentThread().interrupt(); - } - return 1; - } - else if(e.isErrorMessageAvailable()) - { - if(e.getMessage().toLowerCase().contains("does not exist")) - { - if( id != null ) - LOGGER.warn("User does not exist: {}", id); - else - LOGGER.warn("User does not exist"); - return (int)retryMax; - } - else - { - return (int)retryMax/3; - } - } - else - { - if(e.getExceptionCode().equals("ced778ef-0c669ac0")) - { - // This is a known weird issue, not exactly sure the cause, but you'll never be able to get the data. - return (int)retryMax/3; - } - else if(e.getExceptionCode().equals("4be80492-0a7bf7c7")) { - // This is a 401 reflecting credentials don't have access to the requested resource. - if( id != null ) - LOGGER.warn("Authentication Exception accessing id: {}", id); - else - LOGGER.warn("Authentication Exception"); - return (int)retryMax; - } - else - { - LOGGER.warn("Unknown Twitter Exception..."); - LOGGER.warn(" Account: {}", twitter); - LOGGER.warn(" Access: {}", e.getAccessLevel()); - LOGGER.warn(" Code: {}", e.getExceptionCode()); - LOGGER.warn(" Message: {}", e.getLocalizedMessage()); - return (int)retryMax/10; - } - } + return 1; + } else if (twitterException.isCausedByNetworkIssue()) { + LOGGER.info("Twitter Network Issues Detected. Backing off..."); + LOGGER.info("{} - {}", twitterException.getExceptionCode(), twitterException.getLocalizedMessage()); + try { + Thread.sleep(retry); + } catch (InterruptedException e1) { + Thread.currentThread().interrupt(); } - else if(exception instanceof RuntimeException) - { - LOGGER.warn("TwitterGrabber: Unknown Runtime Error", exception.getMessage()); - return (int)retryMax/3; + return 1; + } else if (twitterException.isErrorMessageAvailable()) { + if (twitterException.getMessage().toLowerCase().contains("does not exist")) { + if ( id != null ) { + LOGGER.warn("User does not exist: {}", id); + } else { + LOGGER.warn("User does not exist"); + } + return (int)retryMax; + } else { + return (int)retryMax / 3; } - else - { - LOGGER.info("Completely Unknown Exception: {}", exception); - return (int)retryMax/3; + } else { + if (twitterException.getExceptionCode().equals("ced778ef-0c669ac0")) { + // This is a known weird issue, not exactly sure the cause, but you'll never be able to get the data. + return (int)retryMax / 3; + } else if (twitterException.getExceptionCode().equals("4be80492-0a7bf7c7")) { + // This is a 401 reflecting credentials don't have access to the requested resource. + if ( id != null ) { + LOGGER.warn("Authentication Exception accessing id: {}", id); + } else { + LOGGER.warn("Authentication Exception"); + } + return (int)retryMax; + } else { + LOGGER.warn("Unknown Twitter Exception..."); + LOGGER.warn(" Account: {}", twitter); + LOGGER.warn(" Access: {}", twitterException.getAccessLevel()); + LOGGER.warn(" Code: {}", twitterException.getExceptionCode()); + LOGGER.warn(" Message: {}", twitterException.getLocalizedMessage()); + return (int)retryMax / 10; } + } + } else if (exception instanceof RuntimeException) { + LOGGER.warn("TwitterGrabber: Unknown Runtime Error", exception.getMessage()); + return (int)retryMax / 3; + } else { + LOGGER.info("Completely Unknown Exception: {}", exception); + return (int)retryMax / 3; } + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterEventClassifier.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterEventClassifier.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterEventClassifier.java deleted file mode 100644 index 9466c2e..0000000 --- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterEventClassifier.java +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.streams.twitter.provider; - -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.node.ObjectNode; -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; -import org.apache.commons.lang.StringUtils; -import org.apache.streams.jackson.StreamsJacksonMapper; -import org.apache.streams.twitter.converter.TwitterDateTimeFormat; -import org.apache.streams.twitter.pojo.Delete; -import org.apache.streams.twitter.pojo.FriendList; -import org.apache.streams.twitter.pojo.Retweet; -import org.apache.streams.twitter.pojo.Tweet; -import org.apache.streams.twitter.pojo.User; -import org.apache.streams.twitter.pojo.UserstreamEvent; - -import java.io.IOException; -import java.io.Serializable; - -/** - * TwitterEventClassifier classifies twitter events - * - * @Deprecated - replaced by TwitterDocumentClassifier - use ActivityConverterProcessor - */ -public class TwitterEventClassifier implements Serializable { - - private static ObjectMapper mapper = new StreamsJacksonMapper(Lists.newArrayList(TwitterDateTimeFormat.TWITTER_FORMAT)); - - public static Class detectClass( String json ) { - Preconditions.checkNotNull(json); - Preconditions.checkArgument(StringUtils.isNotEmpty(json)); - - ObjectNode objectNode; - try { - objectNode = (ObjectNode) mapper.readTree(json); - } catch (IOException e) { - e.printStackTrace(); - return null; - } - - if( objectNode.findValue("retweeted_status") != null && objectNode.get("retweeted_status") != null) - return Retweet.class; - else if( objectNode.findValue("delete") != null ) - return Delete.class; - else if( objectNode.findValue("friends") != null || - objectNode.findValue("friends_str") != null ) - return FriendList.class; - else if( objectNode.findValue("target_object") != null ) - return UserstreamEvent.class; - else if ( objectNode.findValue("location") != null && objectNode.findValue("user") == null) - return User.class; - else - return Tweet.class; - } - -}
