http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7afd6e0a/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsDateTimeDeserializer.java ---------------------------------------------------------------------- diff --git a/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsDateTimeDeserializer.java b/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsDateTimeDeserializer.java index 639c5ad..8f53954 100644 --- a/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsDateTimeDeserializer.java +++ b/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsDateTimeDeserializer.java @@ -29,15 +29,12 @@ import org.joda.time.format.DateTimeFormatter; import java.io.IOException; import java.io.Serializable; +import java.util.ArrayList; import java.util.Iterator; import java.util.List; /** - * StreamsDateTimeDeserializer is a supporting class for - * @see {@link org.apache.streams.jackson.StreamsJacksonMapper} - * - * Converting date-time strings other than RFC3339 to joda DateTime objects requires - * additional formats to be provided when instantiating StreamsJacksonMapper. + * Created by sblackmon on 3/27/14. */ public class StreamsDateTimeDeserializer extends StdDeserializer<DateTime> implements Serializable { @@ -53,9 +50,6 @@ public class StreamsDateTimeDeserializer extends StdDeserializer<DateTime> imple formatters.add(DateTimeFormat.forPattern(format)); } - /** - * Applies each additional format in turn, until it can provide a non-null DateTime - */ @Override public DateTime deserialize(JsonParser jpar, DeserializationContext context) throws IOException {
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7afd6e0a/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsDateTimeSerializer.java ---------------------------------------------------------------------- diff --git a/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsDateTimeSerializer.java b/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsDateTimeSerializer.java index fff314d..1e9c895 100644 --- a/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsDateTimeSerializer.java +++ b/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsDateTimeSerializer.java @@ -28,11 +28,12 @@ import java.io.IOException; import java.io.Serializable; /** - * StreamsDateTimeSerializer is a supporting class for - * @see {@link org.apache.streams.jackson.StreamsJacksonMapper} + * Created by sblackmon on 3/27/14. */ public class StreamsDateTimeSerializer extends StdSerializer<DateTime> implements Serializable { + + protected StreamsDateTimeSerializer(Class<DateTime> dateTimeClass) { super(dateTimeClass); } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7afd6e0a/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsJacksonMapper.java ---------------------------------------------------------------------- diff --git a/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsJacksonMapper.java b/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsJacksonMapper.java index 9738fe4..8a74caa 100644 --- a/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsJacksonMapper.java +++ b/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsJacksonMapper.java @@ -24,14 +24,11 @@ import com.fasterxml.jackson.core.JsonParser; import com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.SerializationFeature; -import com.google.common.collect.Lists; import java.util.List; /** - * StreamsJacksonMapper is the 
recommended interface to jackson for any streams component. - * - * Date-time formats that must be supported can be specified with constructor arguments. + * Created by sblackmon on 3/27/14. */ public class StreamsJacksonMapper extends ObjectMapper { @@ -41,13 +38,6 @@ public class StreamsJacksonMapper extends ObjectMapper { return INSTANCE; } - public static StreamsJacksonMapper getInstance(String format){ - - StreamsJacksonMapper instance = new StreamsJacksonMapper(Lists.newArrayList(format)); - - return instance; - - } public static StreamsJacksonMapper getInstance(List<String> formats){ StreamsJacksonMapper instance = new StreamsJacksonMapper(formats); @@ -62,12 +52,6 @@ public class StreamsJacksonMapper extends ObjectMapper { configure(); } - public StreamsJacksonMapper(String format) { - super(); - registerModule(new StreamsJacksonModule(Lists.newArrayList(format))); - configure(); - } - public StreamsJacksonMapper(List<String> formats) { super(); registerModule(new StreamsJacksonModule(formats)); http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7afd6e0a/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsJacksonModule.java ---------------------------------------------------------------------- diff --git a/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsJacksonModule.java b/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsJacksonModule.java index d7e6c64..8b44b0f 100644 --- a/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsJacksonModule.java +++ b/streams-pojo/src/main/java/org/apache/streams/jackson/StreamsJacksonModule.java @@ -25,8 +25,7 @@ import org.joda.time.Period; import java.util.List; /** - * StreamsJacksonModule is a supporting class for - * @see {@link org.apache.streams.jackson.StreamsJacksonMapper} + * Created by sblackmon on 3/27/14. 
*/ public class StreamsJacksonModule extends SimpleModule { http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7afd6e0a/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsComponentFactory.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsComponentFactory.java b/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsComponentFactory.java index 48207d5..e92a5ae 100644 --- a/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsComponentFactory.java +++ b/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsComponentFactory.java @@ -20,10 +20,17 @@ package org.apache.streams.pig; import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import org.apache.commons.lang.ArrayUtils; import org.apache.streams.core.StreamsProcessor; -import org.apache.streams.data.ActivityConverter; +import org.apache.streams.data.ActivitySerializer; import org.slf4j.Logger; +import java.lang.reflect.InvocationTargetException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + /** * Static reflection wrappers for instantiating StreamsComponents */ @@ -31,7 +38,7 @@ public class StreamsComponentFactory { private static final Logger LOGGER = org.slf4j.LoggerFactory.getLogger(StreamsComponentFactory.class); - public static ActivityConverter getSerializerInstance(Class<?> serializerClazz) { + public static ActivitySerializer getSerializerInstance(Class<?> serializerClazz) { Object object = null; try { @@ -42,7 +49,7 @@ public class StreamsComponentFactory { Preconditions.checkNotNull(object); - ActivityConverter serializer = (ActivityConverter) object; + ActivitySerializer serializer = (ActivitySerializer) object; return serializer; 
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7afd6e0a/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsProcessDocumentExec.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsProcessDocumentExec.java b/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsProcessDocumentExec.java index 2303d52..788b347 100644 --- a/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsProcessDocumentExec.java +++ b/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsProcessDocumentExec.java @@ -20,13 +20,24 @@ package org.apache.streams.pig; import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; import datafu.pig.util.SimpleEvalFunc; import org.apache.commons.lang.ArrayUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.pig.EvalFunc; import org.apache.pig.builtin.MonitoredUDF; +import org.apache.pig.data.BagFactory; +import org.apache.pig.data.DataBag; +import org.apache.pig.data.Tuple; +import org.apache.pig.data.TupleFactory; +import org.apache.pig.impl.util.UDFContext; import org.apache.streams.core.StreamsDatum; import org.apache.streams.core.StreamsProcessor; +import org.apache.streams.data.ActivitySerializer; import org.apache.streams.jackson.StreamsJacksonMapper; +import org.apache.streams.pojo.json.Activity; import org.slf4j.Logger; import java.io.IOException; http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7afd6e0a/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsSerializerExec.java ---------------------------------------------------------------------- diff --git 
a/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsSerializerExec.java b/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsSerializerExec.java index 65b7956..d517752 100644 --- a/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsSerializerExec.java +++ b/streams-runtimes/streams-runtime-pig/src/main/java/org/apache/streams/pig/StreamsSerializerExec.java @@ -21,14 +21,26 @@ package org.apache.streams.pig; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; import datafu.pig.util.SimpleEvalFunc; +import org.apache.commons.lang.ArrayUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.pig.EvalFunc; import org.apache.pig.builtin.MonitoredUDF; -import org.apache.streams.data.ActivityConverter; +import org.apache.pig.data.BagFactory; +import org.apache.pig.data.DataBag; +import org.apache.pig.data.Tuple; +import org.apache.pig.data.TupleFactory; +import org.apache.pig.impl.util.UDFContext; +import org.apache.streams.core.StreamsDatum; +import org.apache.streams.core.StreamsProcessor; +import org.apache.streams.data.ActivitySerializer; import org.apache.streams.jackson.StreamsJacksonMapper; import org.apache.streams.pojo.json.Activity; import org.slf4j.Logger; import java.io.IOException; +import java.util.List; import java.util.concurrent.TimeUnit; /** @@ -41,7 +53,7 @@ public class StreamsSerializerExec extends SimpleEvalFunc<String> { private static final Logger LOGGER = org.slf4j.LoggerFactory.getLogger(StreamsSerializerExec.class); - ActivityConverter activitySerializer; + ActivitySerializer activitySerializer; ObjectMapper mapper = StreamsJacksonMapper.getInstance(); public StreamsSerializerExec(String... 
execArgs) throws ClassNotFoundException{ http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7afd6e0a/streams-runtimes/streams-runtime-pig/src/test/java/org/apache/streams/pig/test/PigProcessDatumTest.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-pig/src/test/java/org/apache/streams/pig/test/PigProcessDatumTest.java b/streams-runtimes/streams-runtime-pig/src/test/java/org/apache/streams/pig/test/PigProcessDatumTest.java index 475d791..e643cb6 100644 --- a/streams-runtimes/streams-runtime-pig/src/test/java/org/apache/streams/pig/test/PigProcessDatumTest.java +++ b/streams-runtimes/streams-runtime-pig/src/test/java/org/apache/streams/pig/test/PigProcessDatumTest.java @@ -21,9 +21,15 @@ package org.apache.streams.pig.test; import org.apache.pig.pigunit.PigTest; import org.apache.streams.core.StreamsDatum; +import org.apache.streams.jackson.StreamsJacksonMapper; +import org.apache.streams.twitter.serializer.TwitterJsonTweetActivitySerializer; import org.apache.tools.ant.util.StringUtils; +import org.junit.Assert; +import org.junit.Ignore; import org.junit.Test; +import java.io.IOException; +import java.text.ParseException; import java.util.List; /** http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7afd6e0a/streams-runtimes/streams-runtime-pig/src/test/java/org/apache/streams/pig/test/PigProcessDocumentTest.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-pig/src/test/java/org/apache/streams/pig/test/PigProcessDocumentTest.java b/streams-runtimes/streams-runtime-pig/src/test/java/org/apache/streams/pig/test/PigProcessDocumentTest.java index dd30eb1..556ea3a 100644 --- a/streams-runtimes/streams-runtime-pig/src/test/java/org/apache/streams/pig/test/PigProcessDocumentTest.java +++ b/streams-runtimes/streams-runtime-pig/src/test/java/org/apache/streams/pig/test/PigProcessDocumentTest.java @@ -21,6 +21,9 
@@ package org.apache.streams.pig.test; import org.apache.pig.pigunit.PigTest; import org.apache.streams.core.StreamsDatum; +import org.apache.streams.jackson.StreamsJacksonMapper; +import org.apache.streams.twitter.serializer.TwitterJsonActivitySerializer; +import org.apache.streams.twitter.serializer.TwitterJsonTweetActivitySerializer; import org.apache.tools.ant.util.StringUtils; import org.junit.Test; http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7afd6e0a/streams-runtimes/streams-runtime-pig/src/test/java/org/apache/streams/pig/test/PigSerializerTest.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-pig/src/test/java/org/apache/streams/pig/test/PigSerializerTest.java b/streams-runtimes/streams-runtime-pig/src/test/java/org/apache/streams/pig/test/PigSerializerTest.java index b53083a..a7ad4a0 100644 --- a/streams-runtimes/streams-runtime-pig/src/test/java/org/apache/streams/pig/test/PigSerializerTest.java +++ b/streams-runtimes/streams-runtime-pig/src/test/java/org/apache/streams/pig/test/PigSerializerTest.java @@ -19,14 +19,22 @@ package org.apache.streams.pig.test; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterables; import com.google.common.collect.Lists; +import org.apache.pig.data.Tuple; import org.apache.pig.pigunit.PigTest; import org.apache.streams.jackson.StreamsJacksonMapper; import org.apache.streams.twitter.serializer.StreamsTwitterMapper; -import org.apache.streams.twitter.serializer.TwitterJsonActivityConverter; +import org.apache.streams.twitter.serializer.TwitterJsonActivitySerializer; +import org.apache.streams.twitter.serializer.TwitterJsonTweetActivitySerializer; import org.apache.tools.ant.util.StringUtils; +import org.junit.Ignore; import org.junit.Test; +import java.io.File; +import java.util.Iterator; + /** * This is a test for StreamsSerializerExec */ @@ -39,7 +47,7 @@ public class PigSerializerTest { 
"159475541894897679\ttwitter,statuses/user_timeline\t1384499359006\t{\"retweeted_status\":{\"contributors\":null,\"text\":\"The Costa Concordia cruise ship accident could be a disaster for the industry | http://t.co/M9UUNvZi (via @TIMEMoneyland)\",\"geo\":null,\"retweeted\":false,\"in_reply_to_screen_name\":null,\"possibly_sensitive\":false,\"truncated\":false,\"lang\":\"en\",\"entities\":{\"symbols\":[],\"urls\":[{\"expanded_url\":\"http://ti.me/zYyEtD\",\"indices\":[80,100],\"display_url\":\"ti.me/zYyEtD\",\"url\":\"http://t.co/M9UUNvZi\"}],\"hashtags\":[],\"user_mentions\":[{\"id\":245888431,\"name\":\"TIME Moneyland\",\"indices\":[106,120],\"screen_name\":\"TIMEMoneyland\",\"id_str\":\"245888431\"}]},\"in_reply_to_status_id_str\":null,\"id\":159470076259602432,\"source\":\"<a href=\\\"http://www.hootsuite.com\\\" rel=\\\"nofollow\\\">HootSuite<\\/a>\",\"in_reply_to_user_id_str\":null,\"favorited\":false,\"in_reply_to_status_id\":null,\"retweet_count\":71,\"create d_at\":\"Wed Jan 18 03:00:03 +0000 2012\",\"in_reply_to_user_id\":null,\"favorite_count\":14,\"id_str\":\"159470076259602432\",\"place\":null,\"user\":{\"location\":\"\",\"default_profile\":false,\"profile_background_tile\":true,\"statuses_count\":70754,\"lang\":\"en\",\"profile_link_color\":\"1B4F89\",\"profile_banner_url\":\"https://pbs.twimg.com/profile_banners/14293310/1355243462\",\"id\":14293310,\"following\":false,\"protected\":false,\"favourites_count\":59,\"profile_text_color\":\"000000\",\"description\":\"Breaking news and current events from around the globe. Hosted by TIME staff. 
Tweet questions to our customer service team @TIMEmag_Service.\",\"verified\":true,\"contributors_enabled\":false,\"profile_sidebar_border_color\":\"000000\",\"name\":\"TIME.com\",\"profile_background_color\":\"CC0000\",\"created_at\":\"Thu Apr 03 13:54:30 +0000 2008\",\"default_profile_image\":false,\"followers_count\":5146268,\"profile_image_url_https\":\"https://pbs.twimg.com/profile_images/1700796190/Picture_24_normal.png\",\"geo_enabled\":false,\"profile_background_image_url\":\"http://a0.twimg.com/profile_background_images/735228291/107f1a300a90ee713937234bb3d139c0.jpeg\",\"profile_background_image_url_https\":\"https://si0.twimg.com/profile_background_images/735228291/107f1a300a90ee713937234bb3d139c0.jpeg\",\"follow_request_sent\":false,\"entities\":{\"description\":{\"urls\":[]},\"url\":{\"urls\":[{\"expanded_url\":\"http://www.time.com\",\"indices\":[0,22],\"display_url\":\"time.com\",\"url\":\"http://t.co/4aYbUuAeSh\"}]}},\"url\":\"http://t.co/4aYbUuAeSh\",\"utc_offset\":-18000,\"time_zone\":\"Eastern Time (US & Canada)\",\"notifications\":false,\"profile_use_background_image\":true,\"friends_count\":742,\"profile_sidebar_fill_color\":\"D9D9D9\",\"screen_name\":\"TIME\",\"id_str\":\"14293310\",\"profile_image_url\":\"http://pbs.twimg.com/profile_images/1700796190/Picture_24_normal.png\",\"listed_count\":76944,\"is_translator\":false},\"coordinates\":null},\"contributors\":null,\"text\":\"RT @TIME: The Costa Concordia cruise ship accident could be a disaster for the industry | http://t.co/M9UUNvZi (via 
@TIMEMoneyland)\",\"geo\":null,\"retweeted\":false,\"in_reply_to_screen_name\":null,\"possibly_sensitive\":false,\"truncated\":false,\"lang\":\"en\",\"entities\":{\"symbols\":[],\"urls\":[{\"expanded_url\":\"http://ti.me/zYyEtD\",\"indices\":[90,110],\"display_url\":\"ti.me/zYyEtD\",\"url\":\"http://t.co/M9UUNvZi\"}],\"hashtags\":[],\"user_mentions\":[{\"id\":14293310,\"name\":\"TIME.com\",\"indices\":[3,8],\"screen_name\":\"TIME\",\"id_str\":\"14293310\"},{\"id\":245888431,\"name\":\"TIME Moneyland\",\"indices\":[116,130],\"screen_name\":\"TIMEMoneyland\",\"id_str\":\"245888431\"}]},\"in_reply_to_status_id_str\":null,\"id\":159475541894897679,\"source\":\"<a href=\\\"http://twitter.com/download/iphone\\\" rel=\\\"nofollow\\\">Twitter for iPhone<\\/a>\",\"in_reply_to_user_id_str\":null,\"favorited\":false,\"in_reply_to_status_id\":null,\"retweet_count\":71,\"created_at\":\"Wed Jan 18 03:21:46 +0000 2012\",\"in_reply_to_user_id\":null,\"favorite_count\":0,\"id_str\":\"159475541894897679\",\"place\":null,\"user\":{\"location\":\"\",\"default_profile\":false,\"profile_background_tile\":true,\"statuses_count\":5053,\"lang\":\"en\",\"profile_link_color\":\"738D84\",\"id\":27552112,\"following\":false,\"protected\":false,\"favourites_count\":52,\"profile_text_color\":\"97CEC9\",\"description\":\"\",\"verified\":false,\"contributors_enabled\":false,\"profile_sidebar_border_color\":\"A9AC00\",\"name\":\"rafael medina-flores\",\"profile_background_color\":\"C5EFE3\",\"created_at\":\"Mon Mar 30 01:21:55 +0000 2009\",\"default_profile_image\":false,\"followers_count\":963,\"profile_image_url_https\":\"https://pbs.twimg.com/profile_images/2519547938/image_normal.jpg\",\"geo_enabled\":true,\"profile_background_image_url\":\"http://a0.twimg.com/profile_background_images/167479660/trireme.jpg\",\"profile_background_image_url_https\":\" 
https://si0.twimg.com/profile_background_images/167479660/trireme.jpg\",\"follow_request_sent\":false,\"entities\":{\"description\":{\"urls\":[]}},\"url\":null,\"utc_offset\":-25200,\"time_zone\":\"Mountain Time (US & Canada)\",\"notifications\":false,\"profile_use_background_image\":true,\"friends_count\":1800,\"profile_sidebar_fill_color\":\"5C4F3C\",\"screen_name\":\"rmedinaflores\",\"id_str\":\"27552112\",\"profile_image_url\":\"http://pbs.twimg.com/profile_images/2519547938/image_normal.jpg\",\"listed_count\":50,\"is_translator\":false},\"coordinates\":null}" }; - TwitterJsonActivityConverter serializer = new TwitterJsonActivityConverter(); + TwitterJsonActivitySerializer serializer = new TwitterJsonActivitySerializer(); String doc = (String) StringUtils.split(input[0], '\t').get(3); String outdoc = StreamsJacksonMapper.getInstance(Lists.newArrayList(StreamsTwitterMapper.TWITTER_FORMAT)).writeValueAsString(serializer.deserialize(doc)); http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7afd6e0a/streams-runtimes/streams-runtime-pig/src/test/resources/pigserializertest.pig ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-pig/src/test/resources/pigserializertest.pig b/streams-runtimes/streams-runtime-pig/src/test/resources/pigserializertest.pig index d388269..8ea91d8 100644 --- a/streams-runtimes/streams-runtime-pig/src/test/resources/pigserializertest.pig +++ b/streams-runtimes/streams-runtime-pig/src/test/resources/pigserializertest.pig @@ -1,4 +1,4 @@ -DEFINE SERIALIZER org.apache.streams.pig.StreamsSerializerExec('org.apache.streams.twitter.serializer.TwitterJsonActivityConverter'); +DEFINE SERIALIZER org.apache.streams.pig.StreamsSerializerExec('org.apache.streams.twitter.serializer.TwitterJsonActivitySerializer'); in = LOAD '*' USING PigStorage('\t') AS (activityid: chararray, source: chararray, timestamp: long, object: chararray); out = FOREACH in { result = SERIALIZER(object);