Use StreamsJacksonMapper with channel-specific date support; deprecate StreamsTwitterMapper; deprecate StreamsDatasiftMapper; use generic TypeConverterProcessor; deprecate/delete now-unnecessary classes; refactor event classification; add event classification tests; refactor channel serialization; update tests.
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/2f6a6574 Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/2f6a6574 Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/2f6a6574 Branch: refs/heads/STREAMS-212 Commit: 2f6a6574e1a3d87fd6ae1457f35ce663fc914173 Parents: 9aebd0b Author: sblackmon <[email protected]> Authored: Thu Nov 6 15:27:52 2014 -0800 Committer: sblackmon <[email protected]> Committed: Thu Nov 6 15:27:52 2014 -0800 ---------------------------------------------------------------------- .../DatasiftTypeConverterProcessor.java | 7 +- .../serializer/DatasiftActivitySerializer.java | 12 +- .../DatasiftDefaultActivitySerializer.java | 214 ------------------- .../DatasiftInstagramActivitySerializer.java | 8 +- .../DatasiftTweetActivitySerializer.java | 8 +- .../datasift/util/StreamsDatasiftMapper.java | 11 +- .../com/datasift/test/DatasiftSerDeTest.java | 18 +- .../DatasiftTypeConverterProcessorTest.java | 72 ------- .../DatasiftActivitySerializerTest.java | 6 +- .../serializer/DatasiftEventClassifierTest.java | 76 +++++++ .../streams-provider-twitter/pom.xml | 6 + .../processor/TwitterEventProcessor.java | 194 ----------------- .../processor/TwitterProfileProcessor.java | 133 ------------ .../twitter/processor/TwitterTypeConverter.java | 209 ------------------ .../provider/TwitterEventClassifier.java | 58 +++-- .../serializer/StreamsTwitterMapper.java | 11 +- .../TwitterJsonActivitySerializer.java | 24 +-- .../TwitterJsonDeleteActivitySerializer.java | 6 + .../TwitterJsonRetweetActivitySerializer.java | 6 + .../TwitterJsonTweetActivitySerializer.java | 6 + .../TwitterJsonUserActivitySerializer.java | 6 + ...erJsonUserstreameventActivitySerializer.java | 6 + .../streams/twitter/test/SimpleTweetTest.java | 11 +- .../twitter/test/TweetActivitySerDeTest.java | 6 +- .../streams/twitter/test/TweetSerDeTest.java | 6 +- 
.../test/TwitterEventClassifierTest.java | 34 +++ 26 files changed, 261 insertions(+), 893 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/2f6a6574/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/processor/DatasiftTypeConverterProcessor.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/processor/DatasiftTypeConverterProcessor.java b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/processor/DatasiftTypeConverterProcessor.java index a00cf23..1166b2e 100644 --- a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/processor/DatasiftTypeConverterProcessor.java +++ b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/processor/DatasiftTypeConverterProcessor.java @@ -57,7 +57,12 @@ public class DatasiftTypeConverterProcessor implements StreamsProcessor { List<StreamsDatum> result = Lists.newLinkedList(); Object doc; try { - doc = this.converter.convert(entry.getDocument(), this.mapper); + if( entry.getDocument() instanceof String ) { + ObjectNode node = this.mapper.readValue((String)entry.getDocument(), ObjectNode.class); + doc = this.converter.convert(node, this.mapper); + } else { + doc = this.converter.convert(entry.getDocument(), this.mapper); + } if(doc != null) { result.add(new StreamsDatum(doc, entry.getId())); } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/2f6a6574/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftActivitySerializer.java ---------------------------------------------------------------------- diff --git 
a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftActivitySerializer.java b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftActivitySerializer.java index 0ada979..b587cd6 100644 --- a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftActivitySerializer.java +++ b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftActivitySerializer.java @@ -32,9 +32,6 @@ import java.util.List; */ public class DatasiftActivitySerializer implements ActivitySerializer<Datasift> { - private static final DatasiftDefaultActivitySerializer DEFAULT_SERIALIZER = new DatasiftDefaultActivitySerializer(); - private static final DatasiftTweetActivitySerializer TWITTER_SERIALIZER = new DatasiftTweetActivitySerializer(); - private static final DatasiftInstagramActivitySerializer INSTAGRAM_SERIALIZER = new DatasiftInstagramActivitySerializer(); private static final ObjectMapper MAPPER = StreamsDatasiftMapper.getInstance(); @Override @@ -49,13 +46,8 @@ public class DatasiftActivitySerializer implements ActivitySerializer<Datasift> @Override public Activity deserialize(Datasift serialized) throws ActivitySerializerException { - if(serialized.getTwitter() != null) { - return TWITTER_SERIALIZER.deserialize(serialized); - } else if(serialized.getInstagram() != null) { - return INSTAGRAM_SERIALIZER.deserialize(serialized); - } else { - return DEFAULT_SERIALIZER.deserialize(serialized); - } + ActivitySerializer serializer = DatasiftEventClassifier.bestSerializer(serialized); + return serializer.deserialize(serialized); } public Activity deserialize(String json) throws ActivitySerializerException { 
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/2f6a6574/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftDefaultActivitySerializer.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftDefaultActivitySerializer.java b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftDefaultActivitySerializer.java deleted file mode 100644 index 678f67b..0000000 --- a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftDefaultActivitySerializer.java +++ /dev/null @@ -1,214 +0,0 @@ -package org.apache.streams.datasift.serializer; - -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.base.Joiner; -import com.google.common.collect.Lists; -import org.apache.streams.data.ActivitySerializer; -import org.apache.streams.datasift.Datasift; -import org.apache.streams.datasift.interaction.Interaction; -import org.apache.streams.datasift.links.Links; -import org.apache.streams.datasift.util.StreamsDatasiftMapper; -import org.apache.streams.pojo.json.*; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.Serializable; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import static org.apache.streams.data.util.ActivityUtil.ensureExtensions; - -/** - * - */ -public class DatasiftDefaultActivitySerializer implements ActivitySerializer<Datasift>, Serializable { - - private static final Logger LOGGER = LoggerFactory.getLogger(DatasiftDefaultActivitySerializer.class); - - public static final String DATE_FORMAT = "EEE MMM dd HH:mm:ss Z yyyy"; - - ObjectMapper mapper = StreamsDatasiftMapper.getInstance(); - - @Override - public String serializationFormat() { - return "application/json+datasift.com.v1.1"; - } - - 
@Override - public Datasift serialize(Activity deserialized) { - throw new UnsupportedOperationException("Cannot currently serialize to Datasift JSON"); - } - - public Activity deserialize(String datasiftJson) { - try { - return deserialize(this.mapper.readValue(datasiftJson, Datasift.class)); - } catch (Exception e) { - LOGGER.error("Exception while trying convert,\n {},\n to a Datasift object.", datasiftJson); - LOGGER.error("Exception : {}", e); - throw new RuntimeException(e); - } - } - - @Override - public Activity deserialize(Datasift serialized) { - - try { - - Activity activity = convert(serialized); - - return activity; - - } catch (Exception e) { - throw new IllegalArgumentException("Unable to deserialize", e); - } - - } - - @Override - public List<Activity> deserializeAll(List<Datasift> datasifts) { - List<Activity> activities = Lists.newArrayList(); - for( Datasift datasift : datasifts ) { - activities.add(deserialize(datasift)); - } - return activities; - } - - public static Generator buildGenerator(Interaction interaction) { - Generator generator = new Generator(); - generator.setDisplayName(interaction.getSource()); - generator.setId(interaction.getSource()); - return generator; - } - - public static Icon getIcon(Interaction interaction) { - return null; - } - - public static Provider buildProvider(Interaction interaction) { - Provider provider = new Provider(); - provider.setId("id:providers:"+interaction.getType()); - provider.setDisplayName(interaction.getType()); - return provider; - } - - public static String getUrls(Interaction interaction) { - return null; - } - - public static void addDatasiftExtension(Activity activity, Datasift datasift) { - Map<String, Object> extensions = org.apache.streams.data.util.ActivityUtil.ensureExtensions(activity); - extensions.put("datasift", datasift); - } - - public static String formatId(String... 
idparts) { - return Joiner.on(":").join(Lists.asList("id:datasift", idparts)); - } - - public Activity convert(Datasift event) { - - Activity activity = new Activity(); - activity.setActor(buildActor(event.getInteraction())); - activity.setVerb(selectVerb(event)); - activity.setObject(buildActivityObject(event.getInteraction())); - activity.setId(formatId(activity.getVerb(), event.getInteraction().getId())); - activity.setTarget(buildTarget(event.getInteraction())); - activity.setPublished(event.getInteraction().getCreatedAt()); - activity.setGenerator(buildGenerator(event.getInteraction())); - activity.setIcon(getIcon(event.getInteraction())); - activity.setProvider(buildProvider(event.getInteraction())); - activity.setTitle(event.getInteraction().getTitle()); - activity.setContent(event.getInteraction().getContent()); - activity.setUrl(event.getInteraction().getLink()); - activity.setLinks(getLinks(event)); - addDatasiftExtension(activity, event); - if( event.getInteraction().getGeo() != null) { - addLocationExtension(activity, event.getInteraction()); - } - return activity; - } - - private String selectVerb(Datasift event) { - return "post"; - } - - public Actor buildActor(Interaction interaction) { - Actor actor = new Actor(); - org.apache.streams.datasift.interaction.Author author = interaction.getAuthor(); - if(author == null) { - LOGGER.warn("Interaction does not contain author information."); - return actor; - } - String userName = author.getUsername(); - String name = author.getName(); - Long id = author.getId(); - if(userName != null) { - actor.setDisplayName(userName); - } else { - actor.setDisplayName(name); - } - - if(id != null) { - actor.setId(id.toString()); - } else { - if(userName != null) - actor.setId(userName); - else - actor.setId(name); - } - Image image = new Image(); - image.setUrl(interaction.getAuthor().getAvatar()); - actor.setImage(image); - if (interaction.getAuthor().getLink()!=null){ - actor.setUrl(interaction.getAuthor().getLink()); 
- } - return actor; - } - - public static ActivityObject buildActivityObject(Interaction interaction) { - ActivityObject actObj = new ActivityObject(); - actObj.setObjectType(interaction.getContenttype()); - actObj.setUrl(interaction.getLink()); - actObj.setId(formatId("post",interaction.getId())); - actObj.setContent(interaction.getContent()); - - return actObj; - } - - public static List<String> getLinks(Datasift event) { - List<String> result = Lists.newArrayList(); - Links links = event.getLinks(); - if(links == null) - return null; - for(Object link : links.getNormalizedUrl()) { - if(link != null) { - if(link instanceof String) { - result.add((String) link); - } else { - LOGGER.warn("link is not of type String : {}", link.getClass().getName()); - } - } - } - return result; - } - - public static ActivityObject buildTarget(Interaction interaction) { - return null; - } - - public static void addLocationExtension(Activity activity, Interaction interaction) { - Map<String, Object> extensions = ensureExtensions(activity); - Map<String, Object> location = new HashMap<String, Object>(); - Map<String, Double> coordinates = new HashMap<String, Double>(); - coordinates.put("latitude", interaction.getGeo().getLatitude()); - coordinates.put("longitude", interaction.getGeo().getLongitude()); - location.put("coordinates", coordinates); - extensions.put("location", location); - } - - public static String firstStringIfNotNull(List<Object> list) { - if( list != null && list.size() > 0) { - return (String) list.get(0); - } else return null; - } -} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/2f6a6574/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftInstagramActivitySerializer.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftInstagramActivitySerializer.java 
b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftInstagramActivitySerializer.java index cb44df2..d121d65 100644 --- a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftInstagramActivitySerializer.java +++ b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftInstagramActivitySerializer.java @@ -39,10 +39,16 @@ import static org.apache.streams.data.util.ActivityUtil.ensureExtensions; /** * */ -public class DatasiftInstagramActivitySerializer extends DatasiftDefaultActivitySerializer { +public class DatasiftInstagramActivitySerializer extends DatasiftInteractionActivitySerializer { private static final Logger LOGGER = LoggerFactory.getLogger(DatasiftInstagramActivitySerializer.class); + private static DatasiftInstagramActivitySerializer instance = new DatasiftInstagramActivitySerializer(); + + public static DatasiftInstagramActivitySerializer getInstance() { + return instance; + } + @Override public Activity convert(Datasift event) { Activity activity = super.convert(event); http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/2f6a6574/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftTweetActivitySerializer.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftTweetActivitySerializer.java b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftTweetActivitySerializer.java index 3c7abda..6fd19e7 100644 --- a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftTweetActivitySerializer.java +++ 
b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/serializer/DatasiftTweetActivitySerializer.java @@ -46,10 +46,16 @@ import static org.apache.streams.data.util.ActivityUtil.ensureExtensions; /** * */ -public class DatasiftTweetActivitySerializer extends DatasiftDefaultActivitySerializer { +public class DatasiftTweetActivitySerializer extends DatasiftInteractionActivitySerializer { private static final Logger LOGGER = LoggerFactory.getLogger(DatasiftTweetActivitySerializer.class); + private static DatasiftTweetActivitySerializer instance = new DatasiftTweetActivitySerializer(); + + public static DatasiftTweetActivitySerializer getInstance() { + return instance; + } + @Override public Activity convert(Datasift event) { Activity activity = new Activity(); http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/2f6a6574/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/util/StreamsDatasiftMapper.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/util/StreamsDatasiftMapper.java b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/util/StreamsDatasiftMapper.java index c5f2abf..93ab28b 100644 --- a/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/util/StreamsDatasiftMapper.java +++ b/streams-contrib/streams-provider-datasift/src/main/java/org/apache/streams/datasift/util/StreamsDatasiftMapper.java @@ -33,16 +33,21 @@ import java.io.IOException; /** * Created by sblackmon on 3/27/14. 
+ * + * Depracated: Use StreamsJacksonMapper instead */ +@Deprecated() public class StreamsDatasiftMapper extends StreamsJacksonMapper { - public static final DateTimeFormatter DATASIFT_FORMAT = DateTimeFormat.forPattern("EEE, dd MMM yyyy HH:mm:ss Z"); + public static final String DATASIFT_FORMAT = "EEE, dd MMM yyyy HH:mm:ss Z"; + + public static final DateTimeFormatter DATASIFT_FORMATTER = DateTimeFormat.forPattern(DATASIFT_FORMAT); public static final Long getMillis(String dateTime) { // this function is for pig which doesn't handle exceptions well try { - Long result = DATASIFT_FORMAT.parseMillis(dateTime); + Long result = DATASIFT_FORMATTER.parseMillis(dateTime); return result; } catch( Exception e ) { return null; @@ -66,7 +71,7 @@ public class StreamsDatasiftMapper extends StreamsJacksonMapper { public DateTime deserialize(JsonParser jpar, DeserializationContext context) throws IOException, JsonProcessingException { DateTime result = null; try { - result = DATASIFT_FORMAT.parseDateTime(jpar.getValueAsString()); + result = DATASIFT_FORMATTER.parseDateTime(jpar.getValueAsString()); } catch (Exception e) {} if (result == null) { try { http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/2f6a6574/streams-contrib/streams-provider-datasift/src/test/java17/com/datasift/test/DatasiftSerDeTest.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-datasift/src/test/java17/com/datasift/test/DatasiftSerDeTest.java b/streams-contrib/streams-provider-datasift/src/test/java17/com/datasift/test/DatasiftSerDeTest.java index c4422db..750915e 100644 --- a/streams-contrib/streams-provider-datasift/src/test/java17/com/datasift/test/DatasiftSerDeTest.java +++ b/streams-contrib/streams-provider-datasift/src/test/java17/com/datasift/test/DatasiftSerDeTest.java @@ -20,7 +20,9 @@ package com.datasift.test; import com.fasterxml.jackson.databind.DeserializationFeature; import 
com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.Lists; import org.apache.streams.datasift.Datasift; +import org.apache.streams.datasift.util.StreamsDatasiftMapper; import org.apache.streams.jackson.StreamsJacksonMapper; import org.junit.Assert; import org.junit.Ignore; @@ -31,6 +33,8 @@ import org.slf4j.LoggerFactory; import java.io.BufferedReader; import java.io.InputStream; import java.io.InputStreamReader; +import java.util.ArrayList; +import java.util.List; /** * @@ -39,18 +43,11 @@ public class DatasiftSerDeTest { private final static Logger LOGGER = LoggerFactory.getLogger(DatasiftSerDeTest.class); - private ObjectMapper mapper = StreamsJacksonMapper.getInstance(); + private ObjectMapper mapper = StreamsJacksonMapper.getInstance(Lists.newArrayList(StreamsDatasiftMapper.DATASIFT_FORMAT)); - - - - @Test @Ignore + @Test public void Tests() { - mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, Boolean.TRUE); - mapper.configure(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY, Boolean.TRUE); - mapper.configure(DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT, Boolean.TRUE); - InputStream is = DatasiftSerDeTest.class.getResourceAsStream("/part-r-00000.json"); InputStreamReader isr = new InputStreamReader(is); BufferedReader br = new BufferedReader(isr); @@ -60,6 +57,7 @@ public class DatasiftSerDeTest { String line = br.readLine(); LOGGER.debug(line); System.out.println(line); + Datasift ser = mapper.readValue(line, Datasift.class); String de = mapper.writeValueAsString(ser); @@ -68,7 +66,7 @@ public class DatasiftSerDeTest { Datasift serde = mapper.readValue(de, Datasift.class); -// Assert.assertEquals(ser, serde); + Assert.assertEquals(ser, serde); LOGGER.debug(mapper.writeValueAsString(serde)); } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/2f6a6574/streams-contrib/streams-provider-datasift/src/test/java17/org/apache/streams/datasift/provider/DatasiftTypeConverterProcessorTest.java 
---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-datasift/src/test/java17/org/apache/streams/datasift/provider/DatasiftTypeConverterProcessorTest.java b/streams-contrib/streams-provider-datasift/src/test/java17/org/apache/streams/datasift/provider/DatasiftTypeConverterProcessorTest.java deleted file mode 100644 index 015f4e9..0000000 --- a/streams-contrib/streams-provider-datasift/src/test/java17/org/apache/streams/datasift/provider/DatasiftTypeConverterProcessorTest.java +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.streams.datasift.provider; - -import org.apache.streams.core.StreamsDatum; -import org.apache.streams.core.StreamsProcessor; -import org.apache.streams.datasift.processor.DatasiftTypeConverterProcessor; -import org.apache.streams.pojo.json.Activity; -import org.junit.Test; - -import java.util.List; - -import static junit.framework.Assert.*; - -/** - * - */ -public class DatasiftTypeConverterProcessorTest { - - private static final String DATASIFT_JSON = "{\"demographic\":{\"gender\":\"female\"},\"interaction\":{\"schema\":{\"version\":3},\"source\":\"Twitter for Android\",\"author\":{\"username\":\"ViiOLeee\",\"name\":\"Violeta Anguita\",\"id\":70931384,\"avatar\":\"http://pbs.twimg.com/profile_images/378800000851401229/bbf480cde2e9923a1d20acd393da0212_normal.jpeg\",\"link\":\"http://twitter.com/ViiOLeee\",\"language\":\"en\"},\"type\":\"twitter\",\"created_at\":\"Tue, 27 May 2014 22:38:15 +0000\",\"received_at\":1.401230295658E9,\"content\":\"RT @AliiAnguita: \\\"@Pharrell: Loved working with @edsheeran on Sing. He's a genius. 
https://t.co/wB2qKyJMRw\\\" @ViiOLeee look at this!\",\"id\":\"1e3e5ef97532a580e0741841f5746728\",\"link\":\"http://twitter.com/ViiOLeee/status/471420141989666817\",\"mentions\":[\"Pharrell\",\"edsheeran\",\"ViiOLeee\",\"AliiAnguita\"],\"mention_ids\":[338084918,85452649,70931384]},\"klout\":{\"score\":34},\"language\":{\"tag\":\"en\",\"tag_extended\":\"en\",\ "confidence\":98},\"links\":{\"code\":[200],\"created_at\":[\"Tue, 27 May 2014 14:28:06 +0000\"],\"meta\":{\"charset\":[\"UTF-8\"],\"content_type\":[\"text/html\"],\"description\":[\"Official Video for Ed Sheeran's track SING Get this track on iTunes: http://smarturl.it/EdSing Pre-order 'x' on iTunes and get 'One' instantly: http://smartu...\"],\"keywords\":[[\"ed sheeran\",\"ed sheeran sing\",\"ed sheeran new album\",\"Ed Sheeran (Musical Artist)\",\"ed sheeran one\",\"ed sheeran fault in our stars\",\"ed sheeran all of the stars\",\"s...\"]],\"lang\":[\"en\"],\"opengraph\":[{\"site_name\":\"YouTube\",\"url\":\"http://www.youtube.com/watch?v=tlYcUqEPN58\",\"title\":\"Ed Sheeran - SING [Official Video]\",\"image\":\"https://i1.ytimg.com/vi/tlYcUqEPN58/maxresdefault.jpg\",\"description\":\"Official Video for Ed Sheeran's track SING Get this track on iTunes: http://smarturl.it/EdSing Pre-order 'x' on iTunes and get 'One' instantly: http://smartu ...\",\"type\":\"video\"}],\"twitter\":[{\"card\":\"player\",\"site\":\"@youtube\",\"url\":\"http://www.youtube.com/watch?v=tlYcUqEPN58\",\"title\":\"Ed Sheeran - SING [Official Video]\",\"description\":\"Official Video for Ed Sheeran's track SING Get this track on iTunes: http://smarturl.it/EdSing Pre-order 'x' on iTunes and get 'One' instantly: 
http://smartu...\",\"image\":\"https://i1.ytimg.com/vi/tlYcUqEPN58/maxresdefault.jpg\",\"app\":{\"iphone\":{\"name\":\"YouTube\",\"id\":\"544007664\",\"url\":\"vnd.youtube://watch/tlYcUqEPN58\"},\"ipad\":{\"name\":\"YouTube\",\"id\":\"544007664\",\"url\":\"vnd.youtube://watch/tlYcUqEPN58\"},\"googleplay\":{\"name\":\"YouTube\",\"id\":\"com.google.android.youtube\",\"url\":\"http://www.youtube.com/watch?v=tlYcUqEPN58\"}},\"player\":\"https://www.youtube.com/embed/tlYcUqEPN58\",\"player_width\":\"1280\",\"player_height\":\"720\"}]},\"normalized_url\":[\"https://youtube.com/watch?v=tlYcUqEPN58\"],\"retweet_count\":[0],\"tit le\":[\"Ed Sheeran - SING [Official Video] - YouTube\"],\"url\":[\"https://www.youtube.com/watch?v=tlYcUqEPN58\"]},\"twitter\":{\"id\":\"471420141989666817\",\"retweet\":{\"text\":\"\\\"@Pharrell: Loved working with @edsheeran on Sing. He's a genius. https://t.co/wB2qKyJMRw\\\" @ViiOLeee look at this!\",\"id\":\"471420141989666817\",\"user\":{\"name\":\"Violeta Anguita\",\"description\":\"La vida no seria la fiesta que todos esperamos, pero mientras estemos aqui debemos BAILAR!!! 
#ErasmusOnceErasmusForever\",\"location\":\"Espanhaa..Olaa!\",\"statuses_count\":5882,\"followers_count\":249,\"friends_count\":1090,\"screen_name\":\"ViiOLeee\",\"profile_image_url\":\"http://pbs.twimg.com/profile_images/378800000851401229/bbf480cde2e9923a1d20acd393da0212_normal.jpeg\",\"profile_image_url_https\":\"https://pbs.twimg.com/profile_images/378800000851401229/bbf480cde2e9923a1d20acd393da0212_normal.jpeg\",\"lang\":\"en\",\"time_zone\":\"Madrid\",\"utc_offset\":7200,\"listed_count\":1,\"id\":70931384,\"id_str\":\"70931384\",\"geo_enabled\":false,\"verified\":false,\"favourites_count\":275,\"created_at\":\"Wed, 02 Sep 2009 10:19:59 +0000\"},\"source\":\"<a href=\\\"http://twitter.com/download/android\\\" rel=\\\"nofollow\\\">Twitter for Android</a>\",\"count\":1,\"created_at\":\"Tue, 27 May 2014 22:38:15 +0000\",\"mentions\":[\"Pharrell\",\"edsheeran\",\"ViiOLeee\",\"AliiAnguita\"],\"mention_ids\":[338084918,85452649,70931384],\"links\":[\"https://www.youtube.com/watch?v=tlYcUqEPN58\"],\"display_urls\":[\"youtube.com/watch?v=tlYcUq…\"],\"domains\":[\"www.youtube.com\"],\"lang\":\"en\"},\"retweeted\":{\"id\":\"471419867078209536\",\"user\":{\"name\":\"Alicia Anguita \",\"description\":\"Estudiante de Ingenieria de la Edificación en Granada.\",\"statuses_count\":371,\"followers_count\":185,\"friends_count\":404,\"screen_name\":\"AliiAnguita\",\"profile_image_url\":\"http://pbs.twimg.com/profile_images/424248659677442048/qCPZL8c9_normal.jpeg\",\"profile_image_url_https\":\"https://pbs.twimg.com/profile_images/424248659677442048/qCPZL8c9_normal.jpeg\",\"lang\":\"es\",\"listed_count\":0,\"id\":561201891,\"id_str\":\"561201891\",\"geo_enabled\":false,\"verified\":false,\"favourites_count\":17,\"created_at\":\"Mon, 23 Apr 2012 13:11:44 +0000\"},\"source\":\"<a href=\\\"http://twitter.com/download/android\\\" rel=\\\"nofollow\\\">Twitter for Android</a>\",\"created_at\":\"Tue, 27 May 2014 22:37:09 +0000\"}}}"; - - @Test - public void 
testTypeConverterToString() { - final String ID = "1"; - StreamsProcessor processor = new DatasiftTypeConverterProcessor(String.class); - processor.prepare(null); - StreamsDatum datum = new StreamsDatum(DATASIFT_JSON, ID); - List<StreamsDatum> result = processor.process(datum); - assertNotNull(result); - assertEquals(1, result.size()); - StreamsDatum resultDatum = result.get(0); - assertNotNull(resultDatum); - assertNotNull(resultDatum.getDocument()); - assertTrue(resultDatum.getDocument() instanceof String); - assertEquals(ID, resultDatum.getId()); - } - - @Test - public void testTypeConverterToActivity() { - final String ID = "1"; - StreamsProcessor processor = new DatasiftTypeConverterProcessor(Activity.class); - processor.prepare(null); - StreamsDatum datum = new StreamsDatum(DATASIFT_JSON, ID); - List<StreamsDatum> result = processor.process(datum); - assertNotNull(result); - assertEquals(1, result.size()); - StreamsDatum resultDatum = result.get(0); - assertNotNull(resultDatum); - assertNotNull(resultDatum.getDocument()); - assertTrue(resultDatum.getDocument() instanceof Activity); - assertEquals(ID, resultDatum.getId()); - } - - - -} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/2f6a6574/streams-contrib/streams-provider-datasift/src/test/java17/org/apache/streams/datasift/serializer/DatasiftActivitySerializerTest.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-datasift/src/test/java17/org/apache/streams/datasift/serializer/DatasiftActivitySerializerTest.java b/streams-contrib/streams-provider-datasift/src/test/java17/org/apache/streams/datasift/serializer/DatasiftActivitySerializerTest.java index 5f9feed..162526b 100644 --- a/streams-contrib/streams-provider-datasift/src/test/java17/org/apache/streams/datasift/serializer/DatasiftActivitySerializerTest.java +++ 
b/streams-contrib/streams-provider-datasift/src/test/java17/org/apache/streams/datasift/serializer/DatasiftActivitySerializerTest.java @@ -2,8 +2,10 @@ package org.apache.streams.datasift.serializer; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.Lists; import org.apache.commons.lang.StringUtils; import org.apache.streams.datasift.util.StreamsDatasiftMapper; +import org.apache.streams.jackson.StreamsJacksonMapper; import org.apache.streams.pojo.json.Activity; import org.apache.streams.pojo.json.Actor; import org.junit.Test; @@ -16,7 +18,8 @@ import static org.junit.Assert.assertNotNull; public class DatasiftActivitySerializerTest { private static final DatasiftActivitySerializer SERIALIZER = new DatasiftActivitySerializer(); - private static final ObjectMapper MAPPER = StreamsDatasiftMapper.getInstance(); + + private static final ObjectMapper MAPPER = StreamsJacksonMapper.getInstance(Lists.newArrayList(StreamsDatasiftMapper.DATASIFT_FORMAT)); @Test public void testGeneralConversion() throws Exception { @@ -42,6 +45,7 @@ public class DatasiftActivitySerializerTest { testGeneralConversion(line); testDeserNoNull(line); testDeserNoAddProps(line); + System.out.println("ORIGINAL -> "+line); System.out.println("ACTIVITY -> "+MAPPER.writeValueAsString(SERIALIZER.deserialize(line))); System.out.println("NODE -> "+MAPPER.convertValue(SERIALIZER.deserialize(line), JsonNode.class)); http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/2f6a6574/streams-contrib/streams-provider-datasift/src/test/java17/org/apache/streams/datasift/serializer/DatasiftEventClassifierTest.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-datasift/src/test/java17/org/apache/streams/datasift/serializer/DatasiftEventClassifierTest.java 
b/streams-contrib/streams-provider-datasift/src/test/java17/org/apache/streams/datasift/serializer/DatasiftEventClassifierTest.java new file mode 100644 index 0000000..2004654 --- /dev/null +++ b/streams-contrib/streams-provider-datasift/src/test/java17/org/apache/streams/datasift/serializer/DatasiftEventClassifierTest.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.streams.datasift.serializer; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.Lists; +import org.apache.streams.data.ActivitySerializer; +import org.apache.streams.datasift.Datasift; +import org.apache.streams.datasift.instagram.Instagram; +import org.apache.streams.datasift.twitter.Twitter; +import org.apache.streams.datasift.util.StreamsDatasiftMapper; +import org.apache.streams.jackson.StreamsJacksonMapper; +import org.apache.streams.twitter.pojo.Delete; +import org.apache.streams.twitter.pojo.Retweet; +import org.apache.streams.twitter.pojo.Tweet; +import org.apache.streams.twitter.pojo.User; +import org.apache.streams.twitter.provider.TwitterEventClassifier; +import org.apache.streams.twitter.serializer.TwitterJsonDeleteActivitySerializer; +import org.apache.streams.twitter.serializer.TwitterJsonRetweetActivitySerializer; +import org.apache.streams.twitter.serializer.TwitterJsonTweetActivitySerializer; +import org.apache.streams.twitter.serializer.TwitterJsonUserActivitySerializer; +import org.junit.Assert; +import org.junit.Test; + +import java.util.Scanner; + +/** + * Created by sblackmon on 12/13/13. 
+ */ +public class DatasiftEventClassifierTest { + + private static final ObjectMapper MAPPER = StreamsJacksonMapper.getInstance(Lists.newArrayList(StreamsDatasiftMapper.DATASIFT_FORMAT)); + + @Test + public void testTwitterDetection() throws Exception { + Scanner scanner = new Scanner(DatasiftActivitySerializerTest.class.getResourceAsStream("/twitter_datasift_json.txt")); + String line = null; + while(scanner.hasNextLine()) { + line = scanner.nextLine(); + Datasift datasift = MAPPER.readValue(line, Datasift.class); + assert(DatasiftEventClassifier.detectClass(datasift) == Twitter.class); + assert(DatasiftEventClassifier.bestSerializer(datasift) instanceof DatasiftTweetActivitySerializer); + } + } + + @Test + public void testInstagramDetection() throws Exception { + Scanner scanner = new Scanner(DatasiftActivitySerializerTest.class.getResourceAsStream("/instagram_datasift_json.txt")); + String line = null; + while(scanner.hasNextLine()) { + line = scanner.nextLine(); + Datasift datasift = MAPPER.readValue(line, Datasift.class); + assert(DatasiftEventClassifier.detectClass(datasift) == Instagram.class); + assert(DatasiftEventClassifier.bestSerializer(datasift) instanceof DatasiftInstagramActivitySerializer); + } + } + + +} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/2f6a6574/streams-contrib/streams-provider-twitter/pom.xml ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-twitter/pom.xml b/streams-contrib/streams-provider-twitter/pom.xml index edf4959..9c99a92 100644 --- a/streams-contrib/streams-provider-twitter/pom.xml +++ b/streams-contrib/streams-provider-twitter/pom.xml @@ -50,6 +50,12 @@ </dependency> <dependency> <groupId>org.apache.streams</groupId> + <artifactId>streams-processor-jackson</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.streams</groupId> <artifactId>streams-util</artifactId> 
<version>${project.version}</version> </dependency> http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/2f6a6574/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterEventProcessor.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterEventProcessor.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterEventProcessor.java deleted file mode 100644 index fb4615f..0000000 --- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterEventProcessor.java +++ /dev/null @@ -1,194 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.streams.twitter.processor; - -import com.fasterxml.jackson.core.JsonParseException; -import com.fasterxml.jackson.core.JsonParser; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.node.ObjectNode; -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; -import org.apache.commons.lang3.StringUtils; -import org.apache.streams.core.StreamsDatum; -import org.apache.streams.core.StreamsProcessor; -import org.apache.streams.exceptions.ActivitySerializerException; -import org.apache.streams.jackson.StreamsJacksonMapper; -import org.apache.streams.pojo.json.Activity; -import org.apache.streams.twitter.pojo.Delete; -import org.apache.streams.twitter.pojo.Retweet; -import org.apache.streams.twitter.pojo.Tweet; -import org.apache.streams.twitter.provider.TwitterEventClassifier; -import org.apache.streams.twitter.serializer.*; -import org.apache.streams.util.ComponentUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.List; -import java.util.Queue; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.Callable; - -/** - * Created by sblackmon on 12/10/13. 
- */ -public class TwitterEventProcessor implements StreamsProcessor { - - private final static String STREAMS_ID = "TwitterEventProcessor"; - - private final static Logger LOGGER = LoggerFactory.getLogger(TwitterEventProcessor.class); - - private ObjectMapper mapper = new StreamsTwitterMapper(); - - private Class inClass; - private Class outClass; - - private TwitterJsonActivitySerializer twitterJsonActivitySerializer; - - public TwitterEventProcessor(Class inClass, Class outClass) { - this.inClass = inClass; - this.outClass = outClass; - } - - public TwitterEventProcessor( Class outClass) { - this(null, outClass); - } - - public Object convert(ObjectNode event, Class inClass, Class outClass) throws ActivitySerializerException, JsonProcessingException { - - Object result = null; - - Preconditions.checkNotNull(event); - Preconditions.checkNotNull(mapper); - Preconditions.checkNotNull(twitterJsonActivitySerializer); - - if( outClass.equals( Activity.class )) { - LOGGER.debug("ACTIVITY"); - result = twitterJsonActivitySerializer.deserialize( - mapper.writeValueAsString(event)); - } else if( outClass.equals( Tweet.class )) { - if ( inClass.equals( Tweet.class )) { - LOGGER.debug("TWEET"); - result = mapper.convertValue(event, Tweet.class); - } - } else if( outClass.equals( Retweet.class )) { - if ( inClass.equals( Retweet.class )) { - LOGGER.debug("RETWEET"); - result = mapper.convertValue(event, Retweet.class); - } - } else if( outClass.equals( Delete.class )) { - if ( inClass.equals( Delete.class )) { - LOGGER.debug("DELETE"); - result = mapper.convertValue(event, Delete.class); - } - } else if( outClass.equals( ObjectNode.class )) { - LOGGER.debug("OBJECTNODE"); - result = mapper.convertValue(event, ObjectNode.class); - } - - // no supported conversion were applied - if( result != null ) - return result; - - LOGGER.debug("CONVERT FAILED"); - - return null; - - } - - public boolean validate(Object document, Class klass) { - - // TODO - return true; - } - - public 
boolean isValidJSON(final String json) { - boolean valid = false; - try { - final JsonParser parser = new ObjectMapper().getJsonFactory() - .createJsonParser(json); - while (parser.nextToken() != null) { - } - valid = true; - } catch (JsonParseException jpe) { - LOGGER.warn("validate: {}", jpe); - } catch (IOException ioe) { - LOGGER.warn("validate: {}", ioe); - } - - return valid; - } - - @Override - public List<StreamsDatum> process(StreamsDatum entry) { - - // first check for valid json - ObjectNode node = (ObjectNode) entry.getDocument(); - - LOGGER.debug("{} processing {}", STREAMS_ID, node.getClass()); - - String json = null; - try { - json = mapper.writeValueAsString(node); - } catch (JsonProcessingException e) { - e.printStackTrace(); - } - - if( StringUtils.isNotEmpty(json)) { - - // since data is coming from outside provider, we don't know what type the events are - Class inClass = TwitterEventClassifier.detectClass(json); - - // if the target is string, just pass-through - if (java.lang.String.class.equals(outClass)) - return Lists.newArrayList(new StreamsDatum(json)); - else { - // convert to desired format - Object out = null; - try { - out = convert(node, inClass, outClass); - } catch (ActivitySerializerException e) { - LOGGER.warn("Failed deserializing", e); - return Lists.newArrayList(); - } catch (JsonProcessingException e) { - LOGGER.warn("Failed parsing JSON", e); - return Lists.newArrayList(); - } - - if (out != null && validate(out, outClass)) - return Lists.newArrayList(new StreamsDatum(out)); - } - } - - return Lists.newArrayList(); - - } - - @Override - public void prepare(Object configurationObject) { - mapper = new StreamsJacksonMapper(); - twitterJsonActivitySerializer = new TwitterJsonActivitySerializer(); - } - - @Override - public void cleanUp() { - - } -}; 
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/2f6a6574/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterProfileProcessor.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterProfileProcessor.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterProfileProcessor.java deleted file mode 100644 index 674eef1..0000000 --- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterProfileProcessor.java +++ /dev/null @@ -1,133 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.streams.twitter.processor; - -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.node.ObjectNode; -import com.google.common.collect.Lists; -import org.apache.streams.core.StreamsDatum; -import org.apache.streams.core.StreamsProcessor; -import org.apache.streams.twitter.pojo.Retweet; -import org.apache.streams.twitter.pojo.Tweet; -import org.apache.streams.twitter.pojo.User; -import org.apache.streams.twitter.provider.TwitterEventClassifier; -import org.apache.streams.twitter.serializer.StreamsTwitterMapper; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.List; -import java.util.Queue; -import java.util.Random; - -public class TwitterProfileProcessor implements StreamsProcessor, Runnable { - - private final static Logger LOGGER = LoggerFactory.getLogger(TwitterProfileProcessor.class); - - private ObjectMapper mapper = new StreamsTwitterMapper(); - - private Queue<StreamsDatum> inQueue; - private Queue<StreamsDatum> outQueue; - - public final static String TERMINATE = new String("TERMINATE"); - - @Override - public void run() { - - while(true) { - StreamsDatum item; - try { - item = inQueue.poll(); - if(item.getDocument() instanceof String && item.equals(TERMINATE)) { - LOGGER.info("Terminating!"); - break; - } - - Thread.sleep(new Random().nextInt(100)); - - for( StreamsDatum entry : process(item)) { - outQueue.offer(entry); - } - - - } catch (Exception e) { - e.printStackTrace(); - - } - } - } - - public StreamsDatum createStreamsDatum(User user) { - return new StreamsDatum(user, user.getIdStr()); - } - - @Override - public List<StreamsDatum> process(StreamsDatum entry) { - - List<StreamsDatum> result = Lists.newArrayList(); - String item; - try { - // first check for valid json - // since data is coming from outside provider, we don't know what type the events are - if( entry.getDocument() instanceof String) { - item = (String) entry.getDocument(); - } else { - item = 
mapper.writeValueAsString((ObjectNode)entry.getDocument()); - } - - Class inClass = TwitterEventClassifier.detectClass(item); - - User user; - - if ( inClass.equals( Tweet.class )) { - LOGGER.debug("TWEET"); - Tweet tweet = mapper.readValue(item, Tweet.class); - user = tweet.getUser(); - result.add(createStreamsDatum(user)); - } - else if ( inClass.equals( Retweet.class )) { - LOGGER.debug("RETWEET"); - Retweet retweet = mapper.readValue(item, Retweet.class); - user = retweet.getRetweetedStatus().getUser(); - result.add(createStreamsDatum(user)); - } else if ( inClass.equals( User.class )) { - LOGGER.debug("USER"); - user = mapper.readValue(item, User.class); - result.add(createStreamsDatum(user)); - } else { - return Lists.newArrayList(); - } - - return result; - } catch (Exception e) { - e.printStackTrace(); - LOGGER.warn("Error processing " + entry.toString()); - return Lists.newArrayList(); - } - } - - @Override - public void prepare(Object o) { - - } - - @Override - public void cleanUp() { - - } -}; http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/2f6a6574/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterTypeConverter.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterTypeConverter.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterTypeConverter.java deleted file mode 100644 index 74cce27..0000000 --- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/TwitterTypeConverter.java +++ /dev/null @@ -1,209 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.streams.twitter.processor; - -import com.fasterxml.jackson.core.JsonParseException; -import com.fasterxml.jackson.core.JsonParser; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.node.ObjectNode; -import com.google.common.collect.Lists; -import org.apache.streams.core.StreamsDatum; -import org.apache.streams.core.StreamsProcessor; -import org.apache.streams.exceptions.ActivitySerializerException; -import org.apache.streams.pojo.json.Activity; -import org.apache.streams.twitter.pojo.Delete; -import org.apache.streams.twitter.pojo.Retweet; -import org.apache.streams.twitter.pojo.Tweet; -import org.apache.streams.twitter.provider.TwitterEventClassifier; -import org.apache.streams.twitter.serializer.*; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.List; -import java.util.Queue; - -/** - * Created by sblackmon on 12/10/13. 
- */ -public class TwitterTypeConverter implements StreamsProcessor { - - public final static String STREAMS_ID = "TwitterTypeConverter"; - - private final static Logger LOGGER = LoggerFactory.getLogger(TwitterTypeConverter.class); - - private ObjectMapper mapper; - - private Queue<StreamsDatum> inQueue; - private Queue<StreamsDatum> outQueue; - - private Class inClass; - private Class outClass; - - private TwitterJsonActivitySerializer twitterJsonActivitySerializer; - - private int count = 0; - - public final static String TERMINATE = new String("TERMINATE"); - - public TwitterTypeConverter(Class inClass, Class outClass) { - this.inClass = inClass; - this.outClass = outClass; - } - - public Queue<StreamsDatum> getProcessorOutputQueue() { - return outQueue; - } - - public void setProcessorInputQueue(Queue<StreamsDatum> inputQueue) { - inQueue = inputQueue; - } - - public Object convert(ObjectNode event, Class inClass, Class outClass) throws ActivitySerializerException, JsonProcessingException { - - Object result = null; - - if( outClass.equals( Activity.class )) { - LOGGER.debug("ACTIVITY"); - result = twitterJsonActivitySerializer.deserialize( - mapper.writeValueAsString(event)); - } else if( outClass.equals( Tweet.class )) { - if ( inClass.equals( Tweet.class )) { - LOGGER.debug("TWEET"); - result = mapper.convertValue(event, Tweet.class); - } - } else if( outClass.equals( Retweet.class )) { - if ( inClass.equals( Retweet.class )) { - LOGGER.debug("RETWEET"); - result = mapper.convertValue(event, Retweet.class); - } - } else if( outClass.equals( Delete.class )) { - if ( inClass.equals( Delete.class )) { - LOGGER.debug("DELETE"); - result = mapper.convertValue(event, Delete.class); - } - } else if( outClass.equals( ObjectNode.class )) { - LOGGER.debug("OBJECTNODE"); - result = mapper.convertValue(event, ObjectNode.class); - } - - // no supported conversion were applied - if( result != null ) { - count ++; - return result; - } - - LOGGER.debug("CONVERT FAILED"); - 
- return null; - - } - - public boolean validate(Object document, Class klass) { - - // TODO - return true; - } - - public boolean isValidJSON(final String json) { - boolean valid = false; - try { - final JsonParser parser = new ObjectMapper().getJsonFactory() - .createJsonParser(json); - while (parser.nextToken() != null) { - } - valid = true; - } catch (JsonParseException jpe) { - LOGGER.warn("validate: {}", jpe); - } catch (IOException ioe) { - LOGGER.warn("validate: {}", ioe); - } - - return valid; - } - - @Override - public List<StreamsDatum> process(StreamsDatum entry) { - - StreamsDatum result = null; - - try { - - Object item = entry.getDocument(); - ObjectNode node; - - LOGGER.debug("{} processing {}", STREAMS_ID, item.getClass()); - - if( item instanceof String ) { - - // if the target is string, just pass-through - if( String.class.equals(outClass)) { - result = entry; - } - else { - // first check for valid json - node = (ObjectNode)mapper.readTree((String)item); - - // since data is coming from outside provider, we don't know what type the events are - Class inClass = TwitterEventClassifier.detectClass((String) item); - - Object out = convert(node, inClass, outClass); - - if( out != null && validate(out, outClass)) - result = new StreamsDatum(out); - } - - } else if( item instanceof ObjectNode ) { - - // first check for valid json - node = (ObjectNode)mapper.valueToTree(item); - - // since data is coming from outside provider, we don't know what type the events are - Class inClass = TwitterEventClassifier.detectClass(mapper.writeValueAsString(item)); - - Object out = convert(node, inClass, outClass); - - if( out != null && validate(out, outClass)) - result = new StreamsDatum(out); - - } - - } catch (Exception e) { - e.printStackTrace(); - } - - if( result != null ) - return Lists.newArrayList(result); - else - return Lists.newArrayList(); - } - - @Override - public void prepare(Object o) { - mapper = new StreamsTwitterMapper(); - 
twitterJsonActivitySerializer = new TwitterJsonActivitySerializer(); - } - - @Override - public void cleanUp() { - - } - -} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/2f6a6574/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterEventClassifier.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterEventClassifier.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterEventClassifier.java index 432a047..2234739 100644 --- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterEventClassifier.java +++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterEventClassifier.java @@ -18,41 +18,38 @@ package org.apache.streams.twitter.provider; +import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; import org.apache.commons.lang.StringUtils; +import org.apache.streams.data.ActivitySerializer; +import org.apache.streams.jackson.StreamsJacksonMapper; import org.apache.streams.twitter.pojo.*; import org.apache.streams.twitter.serializer.StreamsTwitterMapper; +import org.apache.streams.twitter.serializer.TwitterJsonDeleteActivitySerializer; +import org.apache.streams.twitter.serializer.TwitterJsonRetweetActivitySerializer; +import org.apache.streams.twitter.serializer.TwitterJsonTweetActivitySerializer; +import org.apache.streams.twitter.serializer.TwitterJsonUserActivitySerializer; +import org.apache.streams.twitter.serializer.TwitterJsonUserstreameventActivitySerializer; import java.io.IOException; +import java.io.Serializable; /** * Created by sblackmon on 12/13/13. 
*/ -public class TwitterEventClassifier { +public class TwitterEventClassifier implements Serializable { - public static Class detectClass( String json ) { + private static ObjectMapper mapper = new StreamsJacksonMapper(Lists.newArrayList(StreamsTwitterMapper.TWITTER_FORMAT)); + public static Class detectClass( String json ) { Preconditions.checkNotNull(json); Preconditions.checkArgument(StringUtils.isNotEmpty(json)); -// try { -// JsonAssert.with(json).assertNull("$.delete"); -// } catch( AssertionError ae ) { -// return Delete.class; -// } -// -// try { -// JsonAssert.with(json).assertNull("$.retweeted_status"); -// } catch( AssertionError ae ) { -// return Retweet.class; -// } -// -// return Tweet.class; - ObjectNode objectNode; try { - objectNode = (ObjectNode) StreamsTwitterMapper.getInstance().readTree(json); + objectNode = (ObjectNode) mapper.readTree(json); } catch (IOException e) { e.printStackTrace(); return null; @@ -72,4 +69,31 @@ public class TwitterEventClassifier { else return Tweet.class; } + public static ActivitySerializer bestSerializer( String json ) { + + Preconditions.checkNotNull(json); + Preconditions.checkArgument(StringUtils.isNotEmpty(json)); + + ObjectNode objectNode; + try { + objectNode = (ObjectNode) mapper.readTree(json); + } catch (IOException e) { + e.printStackTrace(); + return null; + } + + if( objectNode.findValue("retweeted_status") != null && objectNode.get("retweeted_status") != null) + return TwitterJsonRetweetActivitySerializer.getInstance(); + else if( objectNode.findValue("delete") != null ) + return TwitterJsonDeleteActivitySerializer.getInstance(); +// else if( objectNode.findValue("friends") != null || +// objectNode.findValue("friends_str") != null ) +// return FriendList.class; + else if( objectNode.findValue("target_object") != null ) + return TwitterJsonUserstreameventActivitySerializer.getInstance(); + else if ( objectNode.findValue("location") != null && objectNode.findValue("user") == null) + return 
TwitterJsonUserActivitySerializer.getInstance(); + else + return TwitterJsonTweetActivitySerializer.getInstance(); + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/2f6a6574/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/StreamsTwitterMapper.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/StreamsTwitterMapper.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/StreamsTwitterMapper.java index 2ed16ba..395bd95 100644 --- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/StreamsTwitterMapper.java +++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/StreamsTwitterMapper.java @@ -40,16 +40,21 @@ import java.io.IOException; /** * Created by sblackmon on 3/27/14. 
+ * + * Deprecated: Use StreamsJacksonMapper */ +@Deprecated public class StreamsTwitterMapper extends StreamsJacksonMapper { - public static final DateTimeFormatter TWITTER_FORMAT = DateTimeFormat.forPattern("EEE MMM dd HH:mm:ss Z yyyy"); + public static final String TWITTER_FORMAT = "EEE MMM dd HH:mm:ss Z yyyy"; + + public static final DateTimeFormatter TWITTER_FORMATTER = DateTimeFormat.forPattern(TWITTER_FORMAT); public static final Long getMillis(String dateTime) { // this function is for pig which doesn't handle exceptions well try { - Long result = TWITTER_FORMAT.parseMillis(dateTime); + Long result = TWITTER_FORMATTER.parseMillis(dateTime); return result; } catch( Exception e ) { return null; @@ -73,7 +78,7 @@ public class StreamsTwitterMapper extends StreamsJacksonMapper { public DateTime deserialize(JsonParser jpar, DeserializationContext context) throws IOException, JsonProcessingException { DateTime result = null; try { - result = TWITTER_FORMAT.parseDateTime(jpar.getValueAsString()); + result = TWITTER_FORMATTER.parseDateTime(jpar.getValueAsString()); } catch( Exception e ) { } try { result = RFC3339Utils.getInstance().parseToUTC(jpar.getValueAsString()); http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/2f6a6574/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonActivitySerializer.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonActivitySerializer.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonActivitySerializer.java index 5b5be96..d1f0de9 100644 --- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonActivitySerializer.java +++ 
b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonActivitySerializer.java @@ -35,10 +35,11 @@ public class TwitterJsonActivitySerializer implements ActivitySerializer<String> } - TwitterJsonTweetActivitySerializer tweetActivitySerializer = new TwitterJsonTweetActivitySerializer(); - TwitterJsonRetweetActivitySerializer retweetActivitySerializer = new TwitterJsonRetweetActivitySerializer(); - TwitterJsonDeleteActivitySerializer deleteActivitySerializer = new TwitterJsonDeleteActivitySerializer(); - TwitterJsonUserActivitySerializer userActivitySerializer = new TwitterJsonUserActivitySerializer(); + private static TwitterJsonActivitySerializer instance = new TwitterJsonActivitySerializer(); + + public static TwitterJsonActivitySerializer getInstance() { + return instance; + } @Override public String serializationFormat() { @@ -53,18 +54,11 @@ public class TwitterJsonActivitySerializer implements ActivitySerializer<String> @Override public Activity deserialize(String serialized) throws ActivitySerializerException { - Class documentSubType = TwitterEventClassifier.detectClass(serialized); + ActivitySerializer serializer = TwitterEventClassifier.bestSerializer(serialized); + Activity activity = serializer.deserialize(serialized); - Activity activity; - if( documentSubType == Tweet.class ) - activity = tweetActivitySerializer.deserialize(serialized); - else if( documentSubType == Retweet.class ) - activity = retweetActivitySerializer.deserialize(serialized); - else if( documentSubType == Delete.class ) - activity = deleteActivitySerializer.deserialize(serialized); - else if( documentSubType == User.class ) - activity = userActivitySerializer.deserialize(serialized); - else throw new ActivitySerializerException("unrecognized type"); + if( activity == null ) + throw new ActivitySerializerException("unrecognized type"); return activity; } 
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/2f6a6574/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonDeleteActivitySerializer.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonDeleteActivitySerializer.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonDeleteActivitySerializer.java index cb1618a..b368f71 100644 --- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonDeleteActivitySerializer.java +++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonDeleteActivitySerializer.java @@ -46,6 +46,12 @@ import static org.apache.streams.twitter.serializer.util.TwitterActivityUtil.*; */ public class TwitterJsonDeleteActivitySerializer implements ActivitySerializer<String>, Serializable { + private static TwitterJsonDeleteActivitySerializer instance = new TwitterJsonDeleteActivitySerializer(); + + public static TwitterJsonDeleteActivitySerializer getInstance() { + return instance; + } + @Override public String serializationFormat() { return null; http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/2f6a6574/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonRetweetActivitySerializer.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonRetweetActivitySerializer.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonRetweetActivitySerializer.java index 4f141bb..58cb769 100644 --- 
a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonRetweetActivitySerializer.java +++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonRetweetActivitySerializer.java @@ -37,6 +37,12 @@ public class TwitterJsonRetweetActivitySerializer implements ActivitySerializer< } + private static TwitterJsonRetweetActivitySerializer instance = new TwitterJsonRetweetActivitySerializer(); + + public static TwitterJsonRetweetActivitySerializer getInstance() { + return instance; + } + @Override public String serializationFormat() { return null; http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/2f6a6574/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonTweetActivitySerializer.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonTweetActivitySerializer.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonTweetActivitySerializer.java index 8e58326..e6fc05f 100644 --- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonTweetActivitySerializer.java +++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonTweetActivitySerializer.java @@ -34,6 +34,12 @@ import static org.apache.streams.twitter.serializer.util.TwitterActivityUtil.*; public class TwitterJsonTweetActivitySerializer implements ActivitySerializer<String>, Serializable { + private static TwitterJsonTweetActivitySerializer instance = new TwitterJsonTweetActivitySerializer(); + + public static TwitterJsonTweetActivitySerializer getInstance() { + return instance; + } + @Override public String serializationFormat() { return null; 
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/2f6a6574/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonUserActivitySerializer.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonUserActivitySerializer.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonUserActivitySerializer.java index 2ae5355..1bf935c 100644 --- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonUserActivitySerializer.java +++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonUserActivitySerializer.java @@ -36,6 +36,12 @@ public class TwitterJsonUserActivitySerializer implements ActivitySerializer<Str public TwitterJsonUserActivitySerializer() {} + private static TwitterJsonUserActivitySerializer instance = new TwitterJsonUserActivitySerializer(); + + public static TwitterJsonUserActivitySerializer getInstance() { + return instance; + } + @Override public String serializationFormat() { return null; http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/2f6a6574/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonUserstreameventActivitySerializer.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonUserstreameventActivitySerializer.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonUserstreameventActivitySerializer.java index edaec01..e2832dd 100644 --- 
a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonUserstreameventActivitySerializer.java +++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/serializer/TwitterJsonUserstreameventActivitySerializer.java @@ -44,6 +44,12 @@ import static org.apache.streams.twitter.serializer.util.TwitterActivityUtil.*; */ public class TwitterJsonUserstreameventActivitySerializer implements ActivitySerializer<String> { + private static TwitterJsonUserstreameventActivitySerializer instance = new TwitterJsonUserstreameventActivitySerializer(); + + public static TwitterJsonUserstreameventActivitySerializer getInstance() { + return instance; + } + @Override public String serializationFormat() { return null; http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/2f6a6574/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/SimpleTweetTest.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/SimpleTweetTest.java b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/SimpleTweetTest.java index 2d18db9..6b62fe3 100644 --- a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/SimpleTweetTest.java +++ b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/SimpleTweetTest.java @@ -21,13 +21,15 @@ package org.apache.streams.twitter.test; import com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; +import com.google.common.collect.Lists; import org.apache.streams.core.StreamsDatum; import org.apache.streams.exceptions.ActivitySerializerException; +import org.apache.streams.jackson.StreamsJacksonMapper; +import 
org.apache.streams.jackson.TypeConverterProcessor; import org.apache.streams.pojo.json.Activity; import org.apache.streams.twitter.pojo.Delete; import org.apache.streams.twitter.pojo.Retweet; import org.apache.streams.twitter.pojo.Tweet; -import org.apache.streams.twitter.processor.TwitterTypeConverter; import org.apache.streams.twitter.serializer.StreamsTwitterMapper; import org.apache.streams.twitter.serializer.TwitterJsonActivitySerializer; import org.junit.Assert; @@ -55,14 +57,13 @@ import static org.junit.Assert.assertThat; public class SimpleTweetTest { private final static Logger LOGGER = LoggerFactory.getLogger(SimpleTweetTest.class); - private ObjectMapper mapper = StreamsTwitterMapper.getInstance(); + + private ObjectMapper mapper = StreamsJacksonMapper.getInstance(Lists.newArrayList(StreamsTwitterMapper.TWITTER_FORMAT)); private static final String TWITTER_JSON= "{\"created_at\":\"Wed Dec 11 22:27:34 +0000 2013\",\"id\":410898682356047872,\"id_str\":\"410898682356047872\",\"text\":\"RT @ughhblog: RRome (Brooklyn, NY) \\u2013 MY GIRL http:\\/\\/t.co\\/x6uxX9PLsH via @indierapblog @RRoseRRome\",\"source\":\"\\u003ca href=\\\"https:\\/\\/about.twitter.com\\/products\\/tweetdeck\\\" rel=\\\"nofollow\\\"\\u003eTweetDeck\\u003c\\/a\\u003e\",\"truncated\":false,\"in_reply_to_status_id\":null,\"in_reply_to_status_id_str\":null,\"in_reply_to_user_id\":null,\"in_reply_to_user_id_str\":null,\"in_reply_to_screen_name\":null,\"user\":{\"id\":70463906,\"id_str\":\"70463906\",\"name\":\"MHM DESIGNS, LLC\",\"screen_name\":\"MHMDESIGNS\",\"location\":\"Los Angeles New York\",\"url\":\"http:\\/\\/www.mhmdesigns.com\",\"description\":\"Multi Media Made Simple- Web desig, Graphic Design, Internet Marketing, Photography, Video Production and much much more.\",\"protected\":false,\"followers_count\":10,\"friends_count\":64,\"listed_count\":1,\"created_at\":\"Mon Aug 31 18:31:54 +0000 2009\",\"favourites_count\":0,\"utc_offset\":-28800,\"time_zone\":\"Pacific Time (US & 
Canada)\",\"geo_enabled\":false,\"verified\":false,\"statuses_count\":87,\"lang\":\"en\",\"contributors_enabled\":false,\"is_translator\":false,\"profile_background_color\":\"9AE4E8\",\"profile_background_image_url\":\"http:\\/\\/a0.twimg.com\\/profile_background_images\\/33456434\\/body.png\",\"profile_background_image_url_https\":\"https:\\/\\/si0.twimg.com\\/profile_background_images\\/33456434\\/body.png\",\"profile_background_tile\":false,\"profile_image_url\":\"http:\\/\\/pbs.twimg.com\\/profile_images\\/391494416\\/mhm_design_logo__normal.png\",\"profile_image_url_https\":\"https:\\/\\/pbs.twimg.com\\/profile_images\\/391494416\\/mhm_design_logo__normal.png\",\"profile_link_color\":\"0084B4\",\"profile_sidebar_border_color\":\"BDDCAD\",\"profile_sidebar_fill_color\":\"DDFFCC\",\"profile_text_color\":\"333333\",\"profile_use_background_image\":true,\"default_profile\":false,\"default_profile_image\":false,\"following\":null,\"follow_request_sent\":null,\"notifications\":null},\"geo\":null,\"coordinates\":null,\"place\":null,\"contributors\":null,\"retweeted_status\":{\"created_at\":\"Wed Dec 11 10:56:49 +0000 2013\",\"id\":410724848306892800,\"id_str\":\"410724848306892800\",\"text\":\"RRome (Brooklyn, NY) \\u2013 MY GIRL http:\\/\\/t.co\\/x6uxX9PLsH via @indierapblog @RRoseRRome\",\"source\":\"\\u003ca href=\\\"http:\\/\\/twitter.com\\/tweetbutton\\\" rel=\\\"nofollow\\\"\\u003eTweet Button\\u003c\\/a\\u003e\",\"truncated\":false,\"in_reply_to_status_id\":null,\"in_reply_to_status_id_str\":null,\"in_reply_to_user_id\":null,\"in_reply_to_user_id_str\":null,\"in_reply_to_screen_name\":null,\"user\":{\"id\":538836510,\"id_str\":\"538836510\",\"name\":\"UGHHBlog\",\"screen_name\":\"ughhblog\",\"location\":\"Los Angeles\",\"url\":\"http:\\/\\/www.undergroundhiphopblog.com\",\"description\":\"http:\\/\\/UNDERGROUNDHIPHOPBLOG.com: A top Indie\\/Underground Hip Hop community blog. 
Submission Email: [email protected] \\/\\/\\/ Official Host: @pawz1\",\"protected\":false,\"followers_count\":2598,\"friends_count\":373,\"listed_count\":25,\"created_at\":\"Wed Mar 28 05:40:49 +0000 2012\",\"favourites_count\":423,\"utc_offset\":-28800,\"time_zone\":\"Pacific Time (US & Canada)\",\"geo_enabled\":false,\"verified\":false,\"statuses_count\":9623,\"lang\":\"en\",\"contributors_enabled\":false,\"is_translator\":false,\"profile_background_color\":\"131516\",\"profile_background_image_url\":\"http:\\/\\/a0.twimg.com\\/profile_background_images\\/544717772\\/UGHHBlogLogo.jpg\",\"profile_background_image_url_https\":\"https:\\/\\/si0.twimg.com\\/profile_background_images\\/544717772\\/UGHHBlogLogo.jpg\",\"profile_background_tile\":false,\"profile_image_url\":\"http:\\/\\/pbs.twimg.com\\/profile_images\\/2583702975\\/uas8528qzzdlnsb7igzn_normal.jpeg\",\"profile_image_url_https\":\"https:\\/\\/pbs.twimg.com\\/profile_images\\/2583702975\\/uas8528qzzdlnsb7igzn_normal.jpeg\",\"profile_link_color\":\"009999\",\"profile_sidebar_border_color\":\"EEEEEE\",\"profile_sidebar_fill_color\":\"EFEFEF\",\"profile_text_color\":\"333333\",\"profile_use_background_image\":true,\"default_profile\":false,\"default_profile_image\":false,\"following\":null,\"follow_request_sent\":null,\"notifications\":null},\"geo\":null,\"coordinates\":null,\"place\":null,\"contributors\":null,\"retweet_count\":4,\"favorite_count\":0,\"entities\":{\"hashtags\":[],\"symbols\":[],\"urls\":[{\"url\":\"http:\\/\\/t.co\\/x6uxX9PLsH\",\"expanded_url\":\"http:\\/\\/indierapblog.com\\/rrome-brooklyn-ny-my-girl\\/\",\"display_url\":\"indierapblog.com\\/rrome-brooklyn\\u2026\",\"indices\":[31,53]}],\"user_mentions\":[{\"screen_name\":\"IndieRapBlog\",\"name\":\"IndieRapBlog.com\",\"id\":922776728,\"id_str\":\"922776728\",\"indices\":[58,71]},{\"screen_name\":\"RRoseRRome\",\"name\":\"RRome\",\"id\":76371478,\"id_str\":\"76 
371478\",\"indices\":[72,83]}]},\"favorited\":false,\"retweeted\":false,\"possibly_sensitive\":false,\"lang\":\"en\"},\"retweet_count\":0,\"favorite_count\":0,\"entities\":{\"hashtags\":[],\"symbols\":[],\"urls\":[{\"url\":\"http:\\/\\/t.co\\/x6uxX9PLsH\",\"expanded_url\":\"http:\\/\\/indierapblog.com\\/rrome-brooklyn-ny-my-girl\\/\",\"display_url\":\"indierapblog.com\\/rrome-brooklyn\\u2026\",\"indices\":[45,67]}],\"user_mentions\":[{\"screen_name\":\"ughhblog\",\"name\":\"UGHHBlog\",\"id\":538836510,\"id_str\":\"538836510\",\"indices\":[3,12]},{\"screen_name\":\"IndieRapBlog\",\"name\":\"IndieRapBlog.com\",\"id\":922776728,\"id_str\":\"922776728\",\"indices\":[72,85]},{\"screen_name\":\"RRoseRRome\",\"name\":\"RRome\",\"id\":76371478,\"id_str\":\"76371478\",\"indices\":[86,97]}]},\"favorited\":false,\"retweeted\":false,\"possibly_sensitive\":false,\"filter_level\":\"medium\",\"lang\":\"en\"}"; private TwitterJsonActivitySerializer twitterJsonActivitySerializer = new TwitterJsonActivitySerializer(); - - // @Ignore @Test public void Tests() { @@ -100,7 +101,7 @@ public class SimpleTweetTest { } try { - TwitterTypeConverter converter = new TwitterTypeConverter(String.class, Activity.class); + TypeConverterProcessor converter = new TypeConverterProcessor(String.class, Activity.class); converter.prepare(null); converter.process(new StreamsDatum(TWITTER_JSON)); } catch (Throwable e) { http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/2f6a6574/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TweetActivitySerDeTest.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TweetActivitySerDeTest.java b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TweetActivitySerDeTest.java index c7f6434..d6af4d9 100644 --- 
a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TweetActivitySerDeTest.java +++ b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TweetActivitySerDeTest.java @@ -21,7 +21,9 @@ package org.apache.streams.twitter.test; import com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; +import com.google.common.collect.Lists; import org.apache.commons.lang.StringUtils; +import org.apache.streams.jackson.StreamsJacksonMapper; import org.apache.streams.pojo.json.Activity; import org.apache.streams.twitter.pojo.Retweet; import org.apache.streams.twitter.pojo.Tweet; @@ -52,11 +54,11 @@ import static org.junit.Assert.assertThat; public class TweetActivitySerDeTest { private final static Logger LOGGER = LoggerFactory.getLogger(TweetActivitySerDeTest.class); - private ObjectMapper mapper = StreamsTwitterMapper.getInstance(); + + private ObjectMapper mapper = StreamsJacksonMapper.getInstance(Lists.newArrayList(StreamsTwitterMapper.TWITTER_FORMAT)); private TwitterJsonActivitySerializer twitterJsonActivitySerializer = new TwitterJsonActivitySerializer(); - // @Ignore @Test public void Tests() { http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/2f6a6574/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TweetSerDeTest.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TweetSerDeTest.java b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TweetSerDeTest.java index d0a6714..eba5fd0 100644 --- a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TweetSerDeTest.java +++ 
b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/TweetSerDeTest.java @@ -22,7 +22,9 @@ import com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.base.Optional; +import com.google.common.collect.Lists; import org.apache.commons.lang.StringUtils; +import org.apache.streams.jackson.StreamsJacksonMapper; import org.apache.streams.pojo.json.Activity; import org.apache.streams.twitter.pojo.Delete; import org.apache.streams.twitter.pojo.Retweet; @@ -54,11 +56,11 @@ import static org.junit.Assert.assertThat; public class TweetSerDeTest { private final static Logger LOGGER = LoggerFactory.getLogger(TweetSerDeTest.class); - private ObjectMapper mapper = StreamsTwitterMapper.getInstance(); + + private ObjectMapper mapper = StreamsJacksonMapper.getInstance(Lists.newArrayList(StreamsTwitterMapper.TWITTER_FORMAT)); private TwitterJsonActivitySerializer twitterJsonActivitySerializer = new TwitterJsonActivitySerializer(); - // @Ignore @Test public void Tests() {
