Repository: incubator-streams Updated Branches: refs/heads/STREAMS-168 ad5f90cc1 -> 0c8e67ce2
a few package name changes working twitterurlapiprocess which uses streams-http (w/o authentication) working people pattern processors which use streams-http (w/ authentication) Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/0c8e67ce Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/0c8e67ce Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/0c8e67ce Branch: refs/heads/STREAMS-168 Commit: 0c8e67ce26e131282aaaa03815ad6daefd684346 Parents: ad5f90c Author: Steve Blackmon <[email protected]> Authored: Sun Oct 12 23:13:29 2014 -0500 Committer: Steve Blackmon <[email protected]> Committed: Sun Oct 12 23:13:29 2014 -0500 ---------------------------------------------------------------------- .../http/processor/SimpleHTTPGetProcessor.java | 50 +++++++++++++++----- .../peoplepattern/AccountTypeProcessor.java | 7 ++- .../peoplepattern/DemographicsProcessor.java | 7 ++- .../streams-provider-twitter/pom.xml | 2 +- .../processor/TwitterUrlApiProcessor.java | 21 ++++---- .../provider/TwitterTimelineProvider.java | 8 ++-- .../apache/streams/data/util/ExtensionUtil.java | 37 ++++++--------- 7 files changed, 77 insertions(+), 55 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0c8e67ce/streams-components/streams-http/src/main/java/org/apache/streams/components/http/processor/SimpleHTTPGetProcessor.java ---------------------------------------------------------------------- diff --git a/streams-components/streams-http/src/main/java/org/apache/streams/components/http/processor/SimpleHTTPGetProcessor.java b/streams-components/streams-http/src/main/java/org/apache/streams/components/http/processor/SimpleHTTPGetProcessor.java index 0d17cc6..d3d4429 100644 --- a/streams-components/streams-http/src/main/java/org/apache/streams/components/http/processor/SimpleHTTPGetProcessor.java +++ b/streams-components/streams-http/src/main/java/org/apache/streams/components/http/processor/SimpleHTTPGetProcessor.java @@ -21,6 +21,7 @@ import org.apache.streams.core.StreamsDatum; import org.apache.streams.core.StreamsProcessor; import org.apache.streams.data.util.ExtensionUtil; import org.apache.streams.jackson.StreamsJacksonMapper; +import org.apache.streams.pojo.json.ActivityObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -99,15 +100,30 @@ public class SimpleHTTPGetProcessor implements StreamsProcessor { } } - /** - Override this to place result in non-standard location on document - */ - protected ObjectNode getEntityToExtend(ObjectNode rootDocument) { + + /** + Override this to place result in non-standard location on document + */ + protected ActivityObject getEntityToExtend(ObjectNode rootDocument) { + + if( this.configuration.getEntity().equals(HttpProcessorConfiguration.Entity.ACTIVITY)) + return mapper.convertValue(rootDocument, ActivityObject.class); + else + return mapper.convertValue(rootDocument.get(this.configuration.getEntity().toString()), ActivityObject.class); + + } + + /** + Override this to place result in non-standard location on document + */ + protected ObjectNode setEntityToExtend(ObjectNode rootDocument, ActivityObject activityObject) { if( this.configuration.getEntity().equals(HttpProcessorConfiguration.Entity.ACTIVITY)) - return rootDocument; + return mapper.convertValue(activityObject, ObjectNode.class); else - return (ObjectNode) rootDocument.get(this.configuration.getEntity().toString()); + rootDocument.set(this.configuration.getEntity().toString(), mapper.convertValue(activityObject, ObjectNode.class)); + + return rootDocument; } @@ -150,9 +166,6 @@ public class SimpleHTTPGetProcessor implements StreamsProcessor { try { response.close(); } catch (IOException e) {} - try { - httpclient.close(); - } catch (IOException e) {} } if( entityString == null ) @@ -162,12 +175,12 @@ public class SimpleHTTPGetProcessor implements StreamsProcessor { ObjectNode extensionFragment = prepareExtensionFragment(entityString); - ObjectNode extensionEntity = getEntityToExtend(rootDocument); - - ExtensionUtil.ensureExtensions(extensionEntity); + ActivityObject extensionEntity = getEntityToExtend(rootDocument); ExtensionUtil.addExtension(extensionEntity, this.configuration.getExtension(), extensionFragment); + rootDocument = setEntityToExtend(rootDocument, extensionEntity); + entry.setDocument(rootDocument); result.add(entry); @@ -220,5 +233,18 @@ public class SimpleHTTPGetProcessor implements StreamsProcessor { @Override public void cleanUp() { LOGGER.info("shutting down SimpleHTTPGetProcessor"); + try { + httpclient.close(); + } catch (IOException e) { + e.printStackTrace(); + } finally { + try { + httpclient.close(); + } catch (IOException e) { + e.printStackTrace(); + } finally { + httpclient = null; + } + } } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0c8e67ce/streams-contrib/streams-processor-peoplepattern/src/main/java/org/apache/streams/peoplepattern/AccountTypeProcessor.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-processor-peoplepattern/src/main/java/org/apache/streams/peoplepattern/AccountTypeProcessor.java b/streams-contrib/streams-processor-peoplepattern/src/main/java/org/apache/streams/peoplepattern/AccountTypeProcessor.java index d180b7f..edcf4d3 100644 --- a/streams-contrib/streams-processor-peoplepattern/src/main/java/org/apache/streams/peoplepattern/AccountTypeProcessor.java +++ b/streams-contrib/streams-processor-peoplepattern/src/main/java/org/apache/streams/peoplepattern/AccountTypeProcessor.java @@ -18,7 +18,6 @@ package org.apache.streams.peoplepattern; -import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.collect.Maps; import org.apache.streams.components.http.HttpConfigurator; import org.apache.streams.components.http.HttpProcessorConfiguration; @@ -27,6 +26,7 @@ import org.apache.streams.config.StreamsConfigurator; import org.apache.streams.core.StreamsDatum; import org.apache.streams.data.util.ExtensionUtil; import org.apache.streams.pojo.json.Activity; +import org.apache.streams.pojo.json.ActivityObject; import org.apache.streams.pojo.json.Actor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -62,10 +62,9 @@ public class AccountTypeProcessor extends SimpleHTTPGetProcessor { @Override protected Map<String, String> prepareParams(StreamsDatum entry) { Activity activity = mapper.convertValue(entry.getDocument(), Activity.class); - //Actor actor = mapper.convertValue(entry.getDocument(), Actor.class); Actor actor = activity.getActor(); - ObjectNode actorObjectNode = mapper.convertValue(actor, ObjectNode.class); - String username = (String) ExtensionUtil.getExtension(actorObjectNode, "screenName"); + ActivityObject actorObject = mapper.convertValue(actor, ActivityObject.class); + String username = (String) ExtensionUtil.getExtension(actorObject, "screenName"); Map<String, String> params = Maps.newHashMap(); params.put("id", actor.getId()); params.put("name", actor.getDisplayName()); http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0c8e67ce/streams-contrib/streams-processor-peoplepattern/src/main/java/org/apache/streams/peoplepattern/DemographicsProcessor.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-processor-peoplepattern/src/main/java/org/apache/streams/peoplepattern/DemographicsProcessor.java b/streams-contrib/streams-processor-peoplepattern/src/main/java/org/apache/streams/peoplepattern/DemographicsProcessor.java index 6ffbb9b..60db379 100644 --- a/streams-contrib/streams-processor-peoplepattern/src/main/java/org/apache/streams/peoplepattern/DemographicsProcessor.java +++ b/streams-contrib/streams-processor-peoplepattern/src/main/java/org/apache/streams/peoplepattern/DemographicsProcessor.java @@ -18,7 +18,6 @@ package org.apache.streams.peoplepattern; -import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.collect.Maps; import org.apache.streams.components.http.HttpConfigurator; import org.apache.streams.components.http.HttpProcessorConfiguration; @@ -27,6 +26,7 @@ import org.apache.streams.config.StreamsConfigurator; import org.apache.streams.core.StreamsDatum; import org.apache.streams.data.util.ExtensionUtil; import org.apache.streams.pojo.json.Activity; +import org.apache.streams.pojo.json.ActivityObject; import org.apache.streams.pojo.json.Actor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -62,10 +62,9 @@ public class DemographicsProcessor extends SimpleHTTPGetProcessor { @Override protected Map<String, String> prepareParams(StreamsDatum entry) { Activity activity = mapper.convertValue(entry.getDocument(), Activity.class); - //Actor actor = mapper.convertValue(entry.getDocument(), Actor.class); Actor actor = activity.getActor(); - ObjectNode actorObjectNode = mapper.convertValue(actor, ObjectNode.class); - String username = (String) ExtensionUtil.getExtension(actorObjectNode, "screenName"); + ActivityObject actorObject = mapper.convertValue(actor, ActivityObject.class); + String username = (String) ExtensionUtil.getExtension(actorObject, "screenName"); Map<String, String> params = Maps.newHashMap(); params.put("id", actor.getId()); params.put("name", actor.getDisplayName()); http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0c8e67ce/streams-contrib/streams-provider-twitter/pom.xml ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-twitter/pom.xml b/streams-contrib/streams-provider-twitter/pom.xml index f0d65f8..604e5a7 100644 --- a/streams-contrib/streams-provider-twitter/pom.xml +++ b/streams-contrib/streams-provider-twitter/pom.xml @@ -55,7 +55,7 @@ </dependency> <dependency> <groupId>org.apache.streams</groupId> - <artifactId>streams-processor-http</artifactId> + <artifactId>streams-http</artifactId> <version>${project.version}</version> </dependency> <dependency> http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0c8e67ce/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterUrlApiProcessor.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterUrlApiProcessor.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterUrlApiProcessor.java index 438937f..54e1369 100644 --- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterUrlApiProcessor.java +++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterUrlApiProcessor.java @@ -1,11 +1,10 @@ package org.apache.streams.twitter.processor; import com.google.common.base.Preconditions; -import com.google.common.base.Strings; +import com.google.common.collect.Lists; import com.google.common.collect.Maps; import org.apache.streams.components.http.HttpProcessorConfiguration; -import org.apache.streams.components.http.SimpleHTTPGetProcessor; -import org.apache.streams.config.StreamsConfigurator; +import org.apache.streams.components.http.processor.SimpleHTTPGetProcessor; import org.apache.streams.core.StreamsDatum; import org.apache.streams.core.StreamsProcessor; import org.apache.streams.pojo.json.Activity; @@ -21,23 +20,27 @@ public class TwitterUrlApiProcessor extends SimpleHTTPGetProcessor implements St public TwitterUrlApiProcessor() { super(); this.configuration.setHostname("urls.api.twitter.com"); - this.configuration.setResourceUri("/1/urls/count.json"); + this.configuration.setResourcePath("/1/urls/count.json"); + this.configuration.setEntity(HttpProcessorConfiguration.Entity.ACTIVITY); this.configuration.setExtension("twitter_url_count"); } public TwitterUrlApiProcessor(HttpProcessorConfiguration processorConfiguration) { super(processorConfiguration); this.configuration.setHostname("urls.api.twitter.com"); - this.configuration.setResourceUri("/1/urls/count.json"); + this.configuration.setResourcePath("/1/urls/count.json"); + this.configuration.setEntity(HttpProcessorConfiguration.Entity.ACTIVITY); this.configuration.setExtension("twitter_url_count"); } @Override public List<StreamsDatum> process(StreamsDatum entry) { Preconditions.checkArgument(entry.getDocument() instanceof Activity); - Activity activity = mapper.convertValue(entry, Activity.class); - Preconditions.checkArgument(!Strings.isNullOrEmpty(activity.getUrl())); - return super.process(entry); + Activity activity = mapper.convertValue(entry.getDocument(), Activity.class); + if( activity.getLinks() != null && activity.getLinks().size() > 0) + return super.process(entry); + else + return Lists.newArrayList(entry); } @Override @@ -45,7 +48,7 @@ public class TwitterUrlApiProcessor extends SimpleHTTPGetProcessor implements St Map<String, String> params = Maps.newHashMap(); - params.put("url", mapper.convertValue(entry, Activity.class).getUrl()); + params.put("url", mapper.convertValue(entry.getDocument(), Activity.class).getLinks().get(0)); return params; } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0c8e67ce/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java index ae755c2..86395a2 100644 --- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java +++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java @@ -19,9 +19,9 @@ package org.apache.streams.twitter.provider; import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import com.google.common.collect.Queues; +import org.apache.commons.lang.NotImplementedException; import org.apache.streams.core.DatumStatusCounter; import org.apache.streams.core.StreamsDatum; import org.apache.streams.core.StreamsProvider; @@ -31,14 +31,16 @@ import org.apache.streams.util.ComponentUtils; import org.joda.time.DateTime; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import sun.reflect.generics.reflectiveObjects.NotImplementedException; import twitter4j.*; import twitter4j.conf.ConfigurationBuilder; import java.io.Serializable; import java.math.BigInteger; import java.util.*; -import java.util.concurrent.*; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0c8e67ce/streams-pojo-extensions/src/main/java/org/apache/streams/data/util/ExtensionUtil.java ---------------------------------------------------------------------- diff --git a/streams-pojo-extensions/src/main/java/org/apache/streams/data/util/ExtensionUtil.java b/streams-pojo-extensions/src/main/java/org/apache/streams/data/util/ExtensionUtil.java index a8d068a..7ce013c 100644 --- a/streams-pojo-extensions/src/main/java/org/apache/streams/data/util/ExtensionUtil.java +++ b/streams-pojo-extensions/src/main/java/org/apache/streams/data/util/ExtensionUtil.java @@ -1,7 +1,6 @@ package org.apache.streams.data.util; import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.collect.Maps; import org.apache.streams.jackson.StreamsJacksonMapper; import org.apache.streams.pojo.json.ActivityObject; @@ -41,38 +40,33 @@ public class ExtensionUtil { private static final ObjectMapper mapper = StreamsJacksonMapper.getInstance(); - public static Map<String, Object> getExtensions(ObjectNode object) { + public static Map<String, Object> getExtensions(ActivityObject object) { ActivityObject activityObject = mapper.convertValue(object, ActivityObject.class); - Map<String,Object> extensions = (Map<String,Object>) activityObject.getAdditionalProperties().get(EXTENSION_PROPERTY); + Map<String,Object> extensions = ensureExtensions(object); return extensions; } - public static Object getExtension(ObjectNode object, String key) { - ActivityObject activityObject = mapper.convertValue(object, ActivityObject.class); - Map<String,Object> extensions = (Map<String,Object>) activityObject.getAdditionalProperties().get(EXTENSION_PROPERTY); + public static Object getExtension(ActivityObject object, String key) { + Map<String,Object> extensions = ensureExtensions(object); return extensions.get(key); } - public static void setExtensions(ObjectNode object, Map<String, Object> extensions) { - ActivityObject activityObject = mapper.convertValue(object, ActivityObject.class); - activityObject.setAdditionalProperty(EXTENSION_PROPERTY, extensions); + public static void setExtensions(ActivityObject object, Map<String, Object> extensions) { + object.setAdditionalProperty(EXTENSION_PROPERTY, extensions); }; - public static void addExtension(ObjectNode object, String key, Object extension) { - ActivityObject activityObject = mapper.convertValue(object, ActivityObject.class); - Map<String,Object> extensions = (Map<String,Object>) activityObject.getAdditionalProperties().get(EXTENSION_PROPERTY); + public static void addExtension(ActivityObject object, String key, Object extension) { + Map<String,Object> extensions = ensureExtensions(object); extensions.put(key, extension); }; - public static void addExtensions(ObjectNode object, Map<String, Object> extensions) { - ActivityObject activityObject = mapper.convertValue(object, ActivityObject.class); + public static void addExtensions(ActivityObject object, Map<String, Object> extensions) { for( Map.Entry<String, Object> item : extensions.entrySet()) - activityObject.getAdditionalProperties().put(item.getKey(), item.getValue()); + addExtension(object, item.getKey(), item.getValue()); }; - public static void removeExtension(ObjectNode object, String key) { - ActivityObject activityObject = mapper.convertValue(object, ActivityObject.class); - Map<String,Object> extensions = (Map<String,Object>) activityObject.getAdditionalProperties().get(EXTENSION_PROPERTY); + public static void removeExtension(ActivityObject object, String key) { + Map<String,Object> extensions = ensureExtensions(object); extensions.remove(key); }; @@ -82,13 +76,12 @@ public class ExtensionUtil { * @return the Map representing the extensions property */ @SuppressWarnings("unchecked") - public static Map<String, Object> ensureExtensions(ObjectNode object) { - ActivityObject activityObject = mapper.convertValue(object, ActivityObject.class); - Map<String,Object> extensions = (Map<String,Object>) activityObject.getAdditionalProperties().get(EXTENSION_PROPERTY); + public static Map<String, Object> ensureExtensions(ActivityObject object) { + Map<String,Object> extensions = (Map<String,Object>) object.getAdditionalProperties().get(EXTENSION_PROPERTY); if(extensions == null) { extensions = Maps.newHashMap(); setExtensions(object, extensions); } - return getExtensions(object); + return extensions; } }
