Created a DatasiftObjectMapper (committed as StreamsDatasiftMapper) capable of parsing either RFC3339 or the standard Datasift date string. Changed all classes in the package, except the configurator, to use this mapper.
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/3af77bd1 Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/3af77bd1 Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/3af77bd1 Branch: refs/heads/master Commit: 3af77bd1ff83e4c882eb90ea899a02661fdbd2ee Parents: fec8a37 Author: Steve Blackmon <sblack...@w2odigital.com> Authored: Wed Jul 30 21:24:14 2014 -0500 Committer: Steve Blackmon <sblack...@w2odigital.com> Committed: Wed Jul 30 21:26:33 2014 -0500 ---------------------------------------------------------------------- .../provider/DatasiftStreamProvider.java | 4 +- .../DatasiftTypeConverterProcessor.java | 7 +- .../serializer/DatasiftActivitySerializer.java | 4 +- .../DatasiftDefaultActivitySerializer.java | 21 ++--- .../DatasiftTweetActivitySerializer.java | 14 ++-- .../datasift/util/StreamsDatasiftMapper.java | 84 ++++++++++++++++++++ .../DatasiftActivitySerializerTest.java | 31 +++++++- 7 files changed, 135 insertions(+), 30 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/3af77bd1/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftStreamProvider.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftStreamProvider.java b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftStreamProvider.java index 09c01b0..8ed1443 100644 --- a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftStreamProvider.java +++ b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftStreamProvider.java @@ -35,7 +35,7 @@ import 
org.apache.streams.core.StreamsDatum; import org.apache.streams.core.StreamsProvider; import org.apache.streams.core.StreamsResultSet; import org.apache.streams.datasift.DatasiftConfiguration; -import org.apache.streams.jackson.StreamsJacksonMapper; +import org.apache.streams.datasift.util.StreamsDatasiftMapper; import org.joda.time.DateTime; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -202,7 +202,7 @@ public class DatasiftStreamProvider implements StreamsProvider { public void prepare(Object configurationObject) { this.interactions = new ConcurrentLinkedQueue<Interaction>(); this.clients = Maps.newHashMap(); - this.mapper = StreamsJacksonMapper.getInstance(); + this.mapper = StreamsDatasiftMapper.getInstance(); } @Override http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/3af77bd1/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftTypeConverterProcessor.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftTypeConverterProcessor.java b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftTypeConverterProcessor.java index 203e5e8..0b847a4 100644 --- a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftTypeConverterProcessor.java +++ b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/provider/DatasiftTypeConverterProcessor.java @@ -21,10 +21,11 @@ package org.apache.streams.datasift.provider; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.collect.Lists; -import org.apache.streams.core.*; +import org.apache.streams.core.StreamsDatum; +import org.apache.streams.core.StreamsProcessor; import org.apache.streams.datasift.Datasift; import 
org.apache.streams.datasift.serializer.DatasiftActivitySerializer; -import org.apache.streams.jackson.StreamsJacksonMapper; +import org.apache.streams.datasift.util.StreamsDatasiftMapper; import org.apache.streams.pojo.json.Activity; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -66,7 +67,7 @@ public class DatasiftTypeConverterProcessor implements StreamsProcessor { @Override public void prepare(Object configurationObject) { - this.mapper = StreamsJacksonMapper.getInstance(); + this.mapper = StreamsDatasiftMapper.getInstance(); this.datasiftInteractionActivitySerializer = new DatasiftActivitySerializer(); if(this.outClass.equals(Activity.class)) { this.converter = new ActivityConverter(); http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/3af77bd1/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftActivitySerializer.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftActivitySerializer.java b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftActivitySerializer.java index 1d5fd2c..7d644f0 100644 --- a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftActivitySerializer.java +++ b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftActivitySerializer.java @@ -21,8 +21,8 @@ package org.apache.streams.datasift.serializer; import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.streams.data.ActivitySerializer; import org.apache.streams.datasift.Datasift; +import org.apache.streams.datasift.util.StreamsDatasiftMapper; import org.apache.streams.exceptions.ActivitySerializerException; -import org.apache.streams.jackson.StreamsJacksonMapper; import org.apache.streams.pojo.json.Activity; import 
java.util.List; @@ -34,7 +34,7 @@ public class DatasiftActivitySerializer implements ActivitySerializer<Datasift> private static final DatasiftTweetActivitySerializer TWITTER_SERIALIZER = new DatasiftTweetActivitySerializer(); private static final DatasiftDefaultActivitySerializer DEFAULT_SERIALIZER = new DatasiftDefaultActivitySerializer(); - private static final ObjectMapper MAPPER = StreamsJacksonMapper.getInstance(); + private static final ObjectMapper MAPPER = StreamsDatasiftMapper.getInstance(); @Override public String serializationFormat() { http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/3af77bd1/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftDefaultActivitySerializer.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftDefaultActivitySerializer.java b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftDefaultActivitySerializer.java index 4095df6..b70aa12 100644 --- a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftDefaultActivitySerializer.java +++ b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftDefaultActivitySerializer.java @@ -1,15 +1,13 @@ package org.apache.streams.datasift.serializer; -import com.fasterxml.jackson.databind.AnnotationIntrospector; -import com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.module.jaxb.JaxbAnnotationIntrospector; import com.google.common.base.Joiner; import com.google.common.collect.Lists; import org.apache.streams.data.ActivitySerializer; import org.apache.streams.datasift.Datasift; -import org.apache.streams.datasift.interaction.*; -import 
org.apache.streams.jackson.StreamsJacksonMapper; +import org.apache.streams.datasift.interaction.Interaction; +import org.apache.streams.datasift.interaction.Links; +import org.apache.streams.datasift.util.StreamsDatasiftMapper; import org.apache.streams.pojo.json.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -30,7 +28,7 @@ public class DatasiftDefaultActivitySerializer implements ActivitySerializer<Dat public static final String DATE_FORMAT = "EEE MMM dd HH:mm:ss Z yyyy"; - ObjectMapper mapper = StreamsJacksonMapper.getInstance(); + ObjectMapper mapper = StreamsDatasiftMapper.getInstance(); @Override public String serializationFormat() { @@ -46,7 +44,7 @@ public class DatasiftDefaultActivitySerializer implements ActivitySerializer<Dat try { return deserialize(this.mapper.readValue(datasiftJson, Datasift.class)); } catch (Exception e) { - LOGGER.error("Excpetion while trying convert,\n {},\n to a Datasift object.", datasiftJson); + LOGGER.error("Exception while trying convert,\n {},\n to a Datasift object.", datasiftJson); LOGGER.error("Exception : {}", e); throw new RuntimeException(e); } @@ -55,13 +53,6 @@ public class DatasiftDefaultActivitySerializer implements ActivitySerializer<Dat @Override public Activity deserialize(Datasift serialized) { - AnnotationIntrospector introspector = new JaxbAnnotationIntrospector(mapper.getTypeFactory()); - mapper.setAnnotationIntrospector(introspector); - mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, Boolean.FALSE); - mapper.configure(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY, Boolean.TRUE); - mapper.configure(DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT, Boolean.TRUE); - mapper.configure(DeserializationFeature.WRAP_EXCEPTIONS, Boolean.TRUE); - try { Activity activity = convert(serialized); @@ -141,7 +132,7 @@ public class DatasiftDefaultActivitySerializer implements ActivitySerializer<Dat Actor actor = new Actor(); org.apache.streams.datasift.interaction.Author 
author = interaction.getAuthor(); if(author == null) { - LOGGER.warn("Interactiond does not contain author information."); + LOGGER.warn("Interaction does not contain author information."); return actor; } String userName = author.getUsername(); http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/3af77bd1/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftTweetActivitySerializer.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftTweetActivitySerializer.java b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftTweetActivitySerializer.java index e7b0a52..b16aae2 100644 --- a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftTweetActivitySerializer.java +++ b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftTweetActivitySerializer.java @@ -22,8 +22,8 @@ package org.apache.streams.datasift.serializer; import com.google.common.base.Optional; import com.google.common.collect.Lists; import com.google.common.collect.Maps; -import org.apache.streams.data.util.RFC3339Utils; import org.apache.streams.datasift.Datasift; +import org.apache.streams.datasift.interaction.Author; import org.apache.streams.datasift.interaction.Interaction; import org.apache.streams.datasift.twitter.DatasiftTwitterUser; import org.apache.streams.datasift.twitter.Retweet; @@ -134,7 +134,7 @@ public class DatasiftTweetActivitySerializer extends DatasiftDefaultActivitySeri .orNull())); actor.setSummary(user.getDescription()); try { - actor.setPublished(RFC3339Utils.parseToUTC(user.getCreatedAt())); + actor.setPublished(user.getCreatedAt()); } catch (Exception e) { LOGGER.warn("Exception trying to parse date : {}", e); } @@ -149,14 +149,16 @@ public 
class DatasiftTweetActivitySerializer extends DatasiftDefaultActivitySeri extensions.put("followers", user.getFollowersCount()); extensions.put("screenName", user.getScreenName()); if(user.getAdditionalProperties() != null) { - extensions.put("favorites", user.getAdditionalProperties().get("favourites_count")); + extensions.put("favorites", user.getFavouritesCount()); } Image profileImage = new Image(); String profileUrl = null; - profileUrl = event.getInteraction().getAuthor().getAvatar(); - if(profileUrl == null && user.getAdditionalProperties() != null) { - Object url = user.getAdditionalProperties().get("profile_image_url_https"); + Author author = event.getInteraction().getAuthor(); + if( author != null ) + profileUrl = author.getAvatar(); + if(profileUrl == null && user.getProfileImageUrlHttps() != null) { + Object url = user.getProfileImageUrlHttps(); if(url instanceof String) profileUrl = (String) url; } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/3af77bd1/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/util/StreamsDatasiftMapper.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/util/StreamsDatasiftMapper.java b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/util/StreamsDatasiftMapper.java new file mode 100644 index 0000000..c5f2abf --- /dev/null +++ b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/util/StreamsDatasiftMapper.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.streams.datasift.util; + +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.DeserializationContext; +import com.fasterxml.jackson.databind.deser.std.StdDeserializer; +import com.fasterxml.jackson.databind.module.SimpleModule; +import org.apache.streams.data.util.RFC3339Utils; +import org.apache.streams.jackson.StreamsJacksonMapper; +import org.joda.time.DateTime; +import org.joda.time.format.DateTimeFormat; +import org.joda.time.format.DateTimeFormatter; + +import java.io.IOException; + +/** + * Created by sblackmon on 3/27/14. 
+ */ +public class StreamsDatasiftMapper extends StreamsJacksonMapper { + + public static final DateTimeFormatter DATASIFT_FORMAT = DateTimeFormat.forPattern("EEE, dd MMM yyyy HH:mm:ss Z"); + + public static final Long getMillis(String dateTime) { + + // this function is for pig which doesn't handle exceptions well + try { + Long result = DATASIFT_FORMAT.parseMillis(dateTime); + return result; + } catch( Exception e ) { + return null; + } + + } + + private static final StreamsDatasiftMapper INSTANCE = new StreamsDatasiftMapper(); + + public static StreamsDatasiftMapper getInstance(){ + return INSTANCE; + } + + public StreamsDatasiftMapper() { + super(); + registerModule(new SimpleModule() + { + { + addDeserializer(DateTime.class, new StdDeserializer<DateTime>(DateTime.class) { + @Override + public DateTime deserialize(JsonParser jpar, DeserializationContext context) throws IOException, JsonProcessingException { + DateTime result = null; + try { + result = DATASIFT_FORMAT.parseDateTime(jpar.getValueAsString()); + } catch (Exception e) {} + if (result == null) { + try { + result = RFC3339Utils.getInstance().parseToUTC(jpar.getValueAsString()); + } catch (Exception e) {} + } + return result; + } + }); + } + }); + + } + +} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/3af77bd1/streams-contrib/streams-provider-datasift/src/test/java17/org/apache/streams/datasift/serializer/DatasiftActivitySerializerTest.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-datasift/src/test/java17/org/apache/streams/datasift/serializer/DatasiftActivitySerializerTest.java b/streams-contrib/streams-provider-datasift/src/test/java17/org/apache/streams/datasift/serializer/DatasiftActivitySerializerTest.java index 88dd2d6..90b7285 100644 --- a/streams-contrib/streams-provider-datasift/src/test/java17/org/apache/streams/datasift/serializer/DatasiftActivitySerializerTest.java +++ 
b/streams-contrib/streams-provider-datasift/src/test/java17/org/apache/streams/datasift/serializer/DatasiftActivitySerializerTest.java @@ -2,7 +2,8 @@ package org.apache.streams.datasift.serializer; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; -import org.apache.streams.jackson.StreamsJacksonMapper; +import org.apache.commons.lang.StringUtils; +import org.apache.streams.datasift.util.StreamsDatasiftMapper; import org.apache.streams.pojo.json.Activity; import org.apache.streams.pojo.json.Actor; import org.junit.Test; @@ -11,11 +12,12 @@ import java.util.Scanner; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertEquals; public class DatasiftActivitySerializerTest { private static final DatasiftActivitySerializer SERIALIZER = new DatasiftActivitySerializer(); - private static final ObjectMapper MAPPER = StreamsJacksonMapper.getInstance(); + private static final ObjectMapper MAPPER = StreamsDatasiftMapper.getInstance(); @Test public void testGeneralConversion() throws Exception { @@ -39,6 +41,8 @@ public class DatasiftActivitySerializerTest { while(scanner.hasNextLine()) { line = scanner.nextLine(); testGeneralConversion(line); + testDeserNoNull(line); + testDeserNoAddProps(line); System.out.println("ORIGINAL -> "+line); System.out.println("ACTIVITY -> "+MAPPER.writeValueAsString(SERIALIZER.deserialize(line))); System.out.println("NODE -> "+MAPPER.convertValue(SERIALIZER.deserialize(line), JsonNode.class)); @@ -59,8 +63,31 @@ public class DatasiftActivitySerializerTest { assertNotNull(json, activity.getUrl()); Actor actor = activity.getActor(); assertNotNull(json, actor); + } + /** + * Test that null fields are not present + * @param json + */ + private void testDeserNoNull(String json) throws Exception { + Activity ser = SERIALIZER.deserialize(json); + String deser = MAPPER.writeValueAsString(ser); + int nulls = 
StringUtils.countMatches(deser, ":null"); + assertEquals(0l, (long)nulls); + + } + /** + * Test that null fields are not present + * @param json + */ + private void testDeserNoAddProps(String json) throws Exception { + Activity ser = SERIALIZER.deserialize(json); + String deser = MAPPER.writeValueAsString(ser); + int nulls = StringUtils.countMatches(deser, "additionalProperties:{"); + assertEquals(0l, (long)nulls); + + } }