Merging Ryan's and Matt's changes. Tweaks to enable twitter-userstream-local.
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/f1518b3d Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/f1518b3d Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/f1518b3d Branch: refs/heads/master Commit: f1518b3ddcc798c7c4baae9a16667678e2554b55 Parents: 8883b43 Author: sblackmon <[email protected]> Authored: Wed Apr 2 12:17:48 2014 -0500 Committer: sblackmon <[email protected]> Committed: Wed Apr 2 12:17:48 2014 -0500 ---------------------------------------------------------------------- pom.xml | 2 +- streams-contrib/pom.xml | 1 + .../processor/TwitterEventProcessor.java | 65 ++++++++++++-------- .../twitter/processor/TwitterTypeConverter.java | 48 ++------------- .../provider/TwitterEventClassifier.java | 42 ++++++++++--- .../serializer/StreamsTwitterMapper.java | 2 +- .../streams/twitter/test/SimpleTweetTest.java | 14 ++++- .../streams/jackson/StreamsJacksonModule.java | 1 - .../streams/local/builders/StreamComponent.java | 16 +++-- .../streams/local/tasks/BaseStreamsTask.java | 3 +- streams-util/pom.xml | 4 ++ 11 files changed, 110 insertions(+), 88 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f1518b3d/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index d8a21df..01f2c2d 100644 --- a/pom.xml +++ b/pom.xml @@ -87,7 +87,7 @@ <kafka.version>0.8.1</kafka.version> <zookeeper.version>3.4.5-cdh4.5.0</zookeeper.version> <netty.version>3.8.0.Final</netty.version> - <json-path.version>0.9.0</json-path.version> + <json-path.version>0.9.1</json-path.version> <build-helper.version>1.8</build-helper.version> </properties> http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f1518b3d/streams-contrib/pom.xml 
---------------------------------------------------------------------- diff --git a/streams-contrib/pom.xml b/streams-contrib/pom.xml index 38f02f6..2d2d27c 100644 --- a/streams-contrib/pom.xml +++ b/streams-contrib/pom.xml @@ -44,6 +44,7 @@ <module>streams-persist-hdfs</module> <module>streams-persist-kafka</module> <module>streams-persist-mongo</module> + <module>streams-processor-tika</module> <module>streams-processor-urls</module> <module>streams-provider-datasift</module> <module>streams-provider-facebook</module> http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f1518b3d/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterEventProcessor.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterEventProcessor.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterEventProcessor.java index 2f2194f..abc0c1a 100644 --- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterEventProcessor.java +++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterEventProcessor.java @@ -6,6 +6,7 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.collect.Lists; +import org.apache.commons.lang3.StringUtils; import org.apache.streams.core.StreamsDatum; import org.apache.streams.core.StreamsProcessor; import org.apache.streams.exceptions.ActivitySerializerException; @@ -32,7 +33,7 @@ public class TwitterEventProcessor implements StreamsProcessor, Runnable { private final static Logger LOGGER = LoggerFactory.getLogger(TwitterEventProcessor.class); - private ObjectMapper mapper = StreamsTwitterMapper.getInstance(); 
+ private ObjectMapper mapper = new StreamsTwitterMapper(); private BlockingQueue<String> inQueue; private Queue<StreamsDatum> outQueue; @@ -64,18 +65,22 @@ public class TwitterEventProcessor implements StreamsProcessor, Runnable { while(true) { String item; try { - item = inQueue.poll(); + item = inQueue.take(); if(item instanceof String && item.equals(TERMINATE)) { LOGGER.info("Terminating!"); break; } - ObjectNode objectNode = (ObjectNode) mapper.readTree(item); + System.out.println(item); - StreamsDatum rawDatum = new StreamsDatum(objectNode); + if( StringUtils.isNotEmpty(item) ) { + ObjectNode objectNode = (ObjectNode) mapper.readTree(item); - for( StreamsDatum entry : process(rawDatum)) { - outQueue.offer(entry); + StreamsDatum rawDatum = new StreamsDatum(objectNode); + + for (StreamsDatum entry : process(rawDatum)) { + outQueue.offer(entry); + } } } catch (Exception e) { @@ -166,29 +171,37 @@ public class TwitterEventProcessor implements StreamsProcessor, Runnable { LOGGER.debug("{} processing {}", STREAMS_ID, node.getClass()); - String json = node.asText(); + String json = null; + try { + json = mapper.writeValueAsString(node); + } catch (JsonProcessingException e) { + e.printStackTrace(); + } - // since data is coming from outside provider, we don't know what type the events are - Class inClass = TwitterEventClassifier.detectClass(json); + if( StringUtils.isNotEmpty(json)) { + + // since data is coming from outside provider, we don't know what type the events are + Class inClass = TwitterEventClassifier.detectClass(json); + + // if the target is string, just pass-through + if (java.lang.String.class.equals(outClass)) + return Lists.newArrayList(new StreamsDatum(json)); + else { + // convert to desired format + Object out = null; + try { + out = convert(node, inClass, outClass); + } catch (ActivitySerializerException e) { + LOGGER.warn("Failed deserializing", e); + return Lists.newArrayList(); + } catch (JsonProcessingException e) { + LOGGER.warn("Failed 
parsing JSON", e); + return Lists.newArrayList(); + } - // if the target is string, just pass-through - if( java.lang.String.class.equals(outClass)) - return Lists.newArrayList(new StreamsDatum(json)); - else { - // convert to desired format - Object out = null; - try { - out = convert(node, inClass, outClass); - } catch (ActivitySerializerException e) { - LOGGER.warn("Failed deserializing", e); - return Lists.newArrayList(); - } catch (JsonProcessingException e) { - LOGGER.warn("Failed parsing JSON", e); - return Lists.newArrayList(); + if (out != null && validate(out, outClass)) + return Lists.newArrayList(new StreamsDatum(out)); } - - if( out != null && validate(out, outClass)) - return Lists.newArrayList(new StreamsDatum(out)); } return Lists.newArrayList(); http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f1518b3d/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterTypeConverter.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterTypeConverter.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterTypeConverter.java index 60f2ae7..1c1e2fb 100644 --- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterTypeConverter.java +++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterTypeConverter.java @@ -14,10 +14,7 @@ import org.apache.streams.twitter.pojo.Delete; import org.apache.streams.twitter.pojo.Retweet; import org.apache.streams.twitter.pojo.Tweet; import org.apache.streams.twitter.provider.TwitterEventClassifier; -import org.apache.streams.twitter.serializer.TwitterJsonActivitySerializer; -import org.apache.streams.twitter.serializer.TwitterJsonDeleteActivitySerializer; -import 
org.apache.streams.twitter.serializer.TwitterJsonRetweetActivitySerializer; -import org.apache.streams.twitter.serializer.TwitterJsonTweetActivitySerializer; +import org.apache.streams.twitter.serializer.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -34,7 +31,7 @@ public class TwitterTypeConverter implements StreamsProcessor { private final static Logger LOGGER = LoggerFactory.getLogger(TwitterTypeConverter.class); - private ObjectMapper mapper = new ObjectMapper(); + private ObjectMapper mapper = new StreamsTwitterMapper(); private Queue<StreamsDatum> inQueue; private Queue<StreamsDatum> outQueue; @@ -42,9 +39,7 @@ public class TwitterTypeConverter implements StreamsProcessor { private Class inClass; private Class outClass; - private TwitterJsonTweetActivitySerializer twitterJsonTweetActivitySerializer = new TwitterJsonTweetActivitySerializer(); - private TwitterJsonRetweetActivitySerializer twitterJsonRetweetActivitySerializer = new TwitterJsonRetweetActivitySerializer(); - private TwitterJsonDeleteActivitySerializer twitterJsonDeleteActivitySerializer = new TwitterJsonDeleteActivitySerializer(); + private TwitterJsonActivitySerializer twitterJsonActivitySerializer = new TwitterJsonActivitySerializer(); public final static String TERMINATE = new String("TERMINATE"); @@ -66,21 +61,10 @@ public class TwitterTypeConverter implements StreamsProcessor { Object result = null; if( outClass.equals( Activity.class )) { - if( inClass.equals( Delete.class )) { - LOGGER.debug("ACTIVITY DELETE"); - result = twitterJsonDeleteActivitySerializer.deserialize( + LOGGER.debug("ACTIVITY"); + result = twitterJsonActivitySerializer.deserialize( mapper.writeValueAsString(event)); - } else if ( inClass.equals( Retweet.class )) { - LOGGER.debug("ACTIVITY RETWEET"); - result = twitterJsonRetweetActivitySerializer.deserialize( - mapper.writeValueAsString(event)); - } else if ( inClass.equals( Tweet.class )) { - LOGGER.debug("ACTIVITY TWEET"); - result = 
twitterJsonTweetActivitySerializer.deserialize( - mapper.writeValueAsString(event)); - } else { - return null; - } + return result; } else if( outClass.equals( Tweet.class )) { if ( inClass.equals( Tweet.class )) { LOGGER.debug("TWEET"); @@ -200,24 +184,4 @@ public class TwitterTypeConverter implements StreamsProcessor { } -// public void run() { -// while(true) { -// StreamsDatum item; -// try { -// item = inQueue.poll(); -// if(item.getDocument() instanceof String && item.equals(TERMINATE)) { -// LOGGER.info("Terminating!"); -// break; -// } -// -// for( StreamsDatum entry : process(item)) { -// outQueue.offer(entry); -// } -// -// } catch (Exception e) { -// e.printStackTrace(); -// -// } -// } -// } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f1518b3d/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterEventClassifier.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterEventClassifier.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterEventClassifier.java index d31c346..b577e42 100644 --- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterEventClassifier.java +++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterEventClassifier.java @@ -1,9 +1,15 @@ package org.apache.streams.twitter.provider; +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.google.common.base.Preconditions; import com.jayway.jsonassert.JsonAssert; +import org.apache.commons.lang.StringUtils; import org.apache.streams.twitter.pojo.Delete; import org.apache.streams.twitter.pojo.Retweet; import org.apache.streams.twitter.pojo.Tweet; +import org.apache.streams.twitter.serializer.StreamsTwitterMapper; + +import java.io.IOException; 
/** * Created by sblackmon on 12/13/13. @@ -12,18 +18,36 @@ public class TwitterEventClassifier { public static Class detectClass( String json ) { - try { - JsonAssert.with(json).assertNull("$.delete"); - } catch( AssertionError ae ) { - return Delete.class; - } + Preconditions.checkNotNull(json); + Preconditions.checkArgument(StringUtils.isNotEmpty(json)); +// try { +// JsonAssert.with(json).assertNull("$.delete"); +// } catch( AssertionError ae ) { +// return Delete.class; +// } +// +// try { +// JsonAssert.with(json).assertNull("$.retweeted_status"); +// } catch( AssertionError ae ) { +// return Retweet.class; +// } +// +// return Tweet.class; + + ObjectNode objectNode; try { - JsonAssert.with(json).assertNull("$.retweeted_status"); - } catch( AssertionError ae ) { - return Retweet.class; + objectNode = (ObjectNode) StreamsTwitterMapper.getInstance().readTree(json); + } catch (IOException e) { + e.printStackTrace(); + return null; } - return Tweet.class; + if( objectNode.findValue("retweeted_status") != null ) + return Retweet.class; + else if( objectNode.findValue("delete") != null ) + return Delete.class; + else + return Tweet.class; } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f1518b3d/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/StreamsTwitterMapper.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/StreamsTwitterMapper.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/StreamsTwitterMapper.java index 004e174..6b61036 100644 --- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/StreamsTwitterMapper.java +++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/StreamsTwitterMapper.java @@ -30,7 +30,7 @@ public class 
StreamsTwitterMapper extends StreamsJacksonMapper { return INSTANCE; } - private StreamsTwitterMapper() { + public StreamsTwitterMapper() { super(); registerModule(new SimpleModule() { http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f1518b3d/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/SimpleTweetTest.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/SimpleTweetTest.java b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/SimpleTweetTest.java index 8988de0..31ddfce 100644 --- a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/SimpleTweetTest.java +++ b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/SimpleTweetTest.java @@ -3,14 +3,13 @@ package org.apache.streams.twitter.test; import com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; -import com.google.common.base.Optional; -import org.apache.commons.lang.StringUtils; +import org.apache.streams.core.StreamsDatum; import org.apache.streams.exceptions.ActivitySerializerException; import org.apache.streams.pojo.json.Activity; import org.apache.streams.twitter.pojo.Delete; import org.apache.streams.twitter.pojo.Retweet; import org.apache.streams.twitter.pojo.Tweet; -import org.apache.streams.twitter.provider.TwitterEventClassifier; +import org.apache.streams.twitter.processor.TwitterTypeConverter; import org.apache.streams.twitter.serializer.StreamsTwitterMapper; import org.apache.streams.twitter.serializer.TwitterJsonActivitySerializer; import org.junit.Assert; @@ -82,6 +81,15 @@ public class SimpleTweetTest { Assert.fail(); } + try { + TwitterTypeConverter converter = new TwitterTypeConverter(String.class, 
Activity.class); + converter.prepare(null); + converter.process(new StreamsDatum(TWITTER_JSON)); + } catch (Throwable e) { + e.printStackTrace(); + Assert.fail(); + } + assertThat(activity, is(not(nullValue()))); assertThat(activity.getId(), is(not(nullValue()))); http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f1518b3d/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsJacksonModule.java ---------------------------------------------------------------------- diff --git a/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsJacksonModule.java b/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsJacksonModule.java index 4c8a5aa..323e40c 100644 --- a/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsJacksonModule.java +++ b/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsJacksonModule.java @@ -14,5 +14,4 @@ public class StreamsJacksonModule extends SimpleModule { addDeserializer(DateTime.class, new StreamsDateTimeDeserializer(DateTime.class)); } - } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f1518b3d/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/StreamComponent.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/StreamComponent.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/StreamComponent.java index 6319ba8..f5e9978 100644 --- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/StreamComponent.java +++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/StreamComponent.java @@ -165,10 +165,18 @@ public class StreamComponent { public StreamsTask createConnectedTask() { StreamsTask task; if(this.processor != null) { - task = new 
StreamsProcessorTask((StreamsProcessor)SerializationUtil.cloneBySerialization(this.processor)); - task.addInputQueue(this.inQueue); - for(Queue<StreamsDatum> q : this.outBound.values()) { - task.addOutputQueue(q); + if(this.numTasks > 1) { + task = new StreamsProcessorTask((StreamsProcessor)SerializationUtil.cloneBySerialization(this.processor)); + task.addInputQueue(this.inQueue); + for(Queue<StreamsDatum> q : this.outBound.values()) { + task.addOutputQueue(q); + } + } else { + task = new StreamsProcessorTask(this.processor); + task.addInputQueue(this.inQueue); + for(Queue<StreamsDatum> q : this.outBound.values()) { + task.addOutputQueue(q); + } } } else if(this.writer != null) { http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f1518b3d/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java index 3799480..694cb76 100644 --- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java +++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java @@ -3,6 +3,7 @@ package org.apache.streams.local.tasks; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; import org.apache.streams.core.StreamsDatum; +import org.apache.streams.jackson.StreamsJacksonMapper; import org.apache.streams.pojo.json.Activity; import org.apache.streams.util.SerializationUtil; import org.slf4j.Logger; @@ -27,7 +28,7 @@ public abstract class BaseStreamsTask implements StreamsTask { private ObjectMapper mapper; public BaseStreamsTask() { - this.mapper = new ObjectMapper(); + this.mapper = new 
StreamsJacksonMapper(); this.mapper.registerSubtypes(Activity.class); } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f1518b3d/streams-util/pom.xml ---------------------------------------------------------------------- diff --git a/streams-util/pom.xml b/streams-util/pom.xml index cd6d031..0a48ec9 100644 --- a/streams-util/pom.xml +++ b/streams-util/pom.xml @@ -39,6 +39,10 @@ <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> </dependency> + <dependency> + <groupId>joda-time</groupId> + <artifactId>joda-time</artifactId> + </dependency> </dependencies> </project> \ No newline at end of file
