Repository: incubator-streams Updated Branches: refs/heads/master c0bdfeb48 -> acc7c84f0
resolves STREAMS-212 Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/b52e7653 Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/b52e7653 Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/b52e7653 Branch: refs/heads/master Commit: b52e7653a720e41bd0f9b714556d7aeae3e84c7d Parents: e9adcac Author: sblackmon <[email protected]> Authored: Mon Dec 1 16:25:45 2014 -0600 Committer: sblackmon <[email protected]> Committed: Mon Dec 1 16:25:45 2014 -0600 ---------------------------------------------------------------------- pom.xml | 22 ++- streams-components/pom.xml | 1 + streams-components/streams-converters/pom.xml | 152 ++++++++++++++++ .../converter/TypeConverterProcessor.java | 91 ++++++++++ .../streams/converter/TypeConverterUtil.java | 51 ++++++ .../test/TypeConverterProcessorTest.java | 172 +++++++++++++++++++ 6 files changed, 487 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b52e7653/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 351ec68..0969d38 100644 --- a/pom.xml +++ b/pom.xml @@ -69,7 +69,7 @@ <joda-time.version>2.2</joda-time.version> <rave.version>0.22</rave.version> <datastax.version>1.0.3</datastax.version> - <jsonschema2pojo.version>0.4.5</jsonschema2pojo.version> + <jsonschema2pojo.version>0.4.6</jsonschema2pojo.version> <jaxb2.version>0.8.3</jaxb2.version> <jaxb2-basics.version>0.8.4</jaxb2-basics.version> <jaxbutil.version>1.2.6</jaxbutil.version> @@ -81,6 +81,7 @@ <commons-io.version>2.4</commons-io.version> <commons-lang3.version>3.1</commons-lang3.version> <typesafe.config.version>1.2.0</typesafe.config.version> + <reflections.version>0.9.9</reflections.version> <orgjson.version>20140107</orgjson.version> <guava.version>17.0</guava.version> <scala.version>2.8.0</scala.version> @@ -220,6 +221,19 @@ </excludes> </configuration> </plugin> + <plugin> + <groupId>org.reflections</groupId> + <artifactId>reflections-maven</artifactId> + <version>${reflections.version}-RC2</version> + <executions> + <execution> + <goals> + <goal>reflections</goal> + </goals> + <phase>process-classes</phase> + </execution> + </executions> + </plugin> </plugins> </pluginManagement> </build> @@ -260,7 +274,11 @@ <artifactId>config</artifactId> <version>${typesafe.config.version}</version> </dependency> - + <dependency> + <groupId>org.reflections</groupId> + <artifactId>reflections</artifactId> + <version>${reflections.version}</version> + </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b52e7653/streams-components/pom.xml ---------------------------------------------------------------------- diff --git a/streams-components/pom.xml b/streams-components/pom.xml index 9942e14..138eeef 100644 --- a/streams-components/pom.xml +++ b/streams-components/pom.xml @@ -37,6 +37,7 @@ </properties> <modules> + <module>streams-converters</module> <module>streams-http</module> </modules> http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b52e7653/streams-components/streams-converters/pom.xml ---------------------------------------------------------------------- diff --git a/streams-components/streams-converters/pom.xml b/streams-components/streams-converters/pom.xml new file mode 100644 index 0000000..58f0c98 --- /dev/null +++ b/streams-components/streams-converters/pom.xml @@ -0,0 +1,152 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one + ~ or more contributor license agreements. See the NOTICE file + ~ distributed with this work for additional information + ~ regarding copyright ownership. The ASF licenses this file + ~ to you under the Apache License, Version 2.0 (the + ~ "License"); you may not use this file except in compliance + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, + ~ software distributed under the License is distributed on an + ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + ~ KIND, either express or implied. See the License for the + ~ specific language governing permissions and limitations + ~ under the License. + --> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + + <modelVersion>4.0.0</modelVersion> + <artifactId>streams-converters</artifactId> + <version>0.1-SNAPSHOT</version> + + <parent> + <groupId>org.apache.streams</groupId> + <artifactId>streams-components</artifactId> + <version>0.1-SNAPSHOT</version> + </parent> + + <dependencies> + <dependency> + <groupId>org.apache.streams</groupId> + <artifactId>streams-config</artifactId> + </dependency> + <dependency> + <groupId>org.apache.streams</groupId> + <artifactId>streams-core</artifactId> + </dependency> + <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-core</artifactId> + </dependency> + <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-annotations</artifactId> + </dependency> + <dependency> + <groupId>com.jayway.jsonpath</groupId> + <artifactId>json-path</artifactId> + </dependency> + <dependency> + <groupId>com.jayway.jsonpath</groupId> + <artifactId>json-path-assert</artifactId> + </dependency> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + </dependency> + <dependency> + <groupId>commons-io</groupId> + <artifactId>commons-io</artifactId> + </dependency> + </dependencies> + + <build> + <sourceDirectory>src/main/java</sourceDirectory> + <testSourceDirectory>src/test/java</testSourceDirectory> + <resources> + <resource> + <directory>src/main/resources</directory> + </resource> + </resources> + <testResources> + <testResource> + <directory>src/test/resources</directory> + </testResource> + </testResources> + <plugins> + + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>build-helper-maven-plugin</artifactId> + <version>1.8</version> + <executions> + <execution> + <id>add-source</id> + <phase>generate-sources</phase> + <goals> + <goal>add-source</goal> + </goals> + <configuration> + <sources> + <source>target/generated-sources/jsonschema2pojo</source> + </sources> + </configuration> + </execution> + <execution> + <id>add-source-jaxb2</id> + <phase>generate-sources</phase> + <goals> + <goal>add-source</goal> + </goals> + <configuration> + <sources> + <source>target/generated-sources/jaxb2</source> + </sources> + </configuration> + </execution> + </executions> + </plugin> + <plugin> + <groupId>org.jsonschema2pojo</groupId> + <artifactId>jsonschema2pojo-maven-plugin</artifactId> + <configuration> + <addCompileSourceRoot>true</addCompileSourceRoot> + <generateBuilders>true</generateBuilders> + <sourcePaths> + <sourcePath>src/main/jsonschema</sourcePath> + </sourcePaths> + <outputDirectory>target/generated-sources/jsonschema2pojo</outputDirectory> + <targetPackage>org.apache.streams.converter</targetPackage> + <useLongIntegers>true</useLongIntegers> + <useJodaDates>true</useJodaDates> + <includeJsr303Annotations>true</includeJsr303Annotations> + </configuration> + <executions> + <execution> + <goals> + <goal>generate</goal> + </goals> + </execution> + </executions> + </plugin> + <plugin> + <groupId>org.reflections</groupId> + <artifactId>reflections-maven</artifactId> + <executions> + <execution> + <goals> + <goal>reflections</goal> + </goals> + <phase>process-classes</phase> + </execution> + </executions> + </plugin> + </plugins> + </build> + +</project> http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b52e7653/streams-components/streams-converters/src/main/java/org/apache/streams/converter/TypeConverterProcessor.java ---------------------------------------------------------------------- diff --git a/streams-components/streams-converters/src/main/java/org/apache/streams/converter/TypeConverterProcessor.java b/streams-components/streams-converters/src/main/java/org/apache/streams/converter/TypeConverterProcessor.java new file mode 100644 index 0000000..9c8b2bd --- /dev/null +++ b/streams-components/streams-converters/src/main/java/org/apache/streams/converter/TypeConverterProcessor.java @@ -0,0 +1,91 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ +package org.apache.streams.converter; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import org.apache.streams.core.StreamsDatum; +import org.apache.streams.core.StreamsProcessor; +import org.apache.streams.jackson.StreamsJacksonMapper; +import org.apache.streams.pojo.json.Activity; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Serializable; +import java.util.List; + +/** + * TypeConverterProcessor converts between String json and jackson-compatible POJO objects. + * + * Activity is one supported jackson-compatible POJO, so JSON String and objects with structual similarities + * to Activity can be converted to Activity objects. + * + * However, conversion to Activity should probably use {@link org.apache.streams.converter.ActivityConverterProcessor} + * + */ +public class TypeConverterProcessor implements StreamsProcessor, Serializable { + + private final static Logger LOGGER = LoggerFactory.getLogger(TypeConverterProcessor.class); + + private List<String> formats = Lists.newArrayList(); + + protected ObjectMapper mapper; + + protected Class outClass; + + public TypeConverterProcessor(Class outClass) { + this.outClass = outClass; + } + + public TypeConverterProcessor(Class outClass, List<String> formats) { + this(outClass); + this.formats = formats; + } + + @Override + public List<StreamsDatum> process(StreamsDatum entry) { + + List<StreamsDatum> result = Lists.newLinkedList(); + Object inDoc = entry.getDocument(); + + Object outDoc = TypeConverterUtil.convert(inDoc, outClass, mapper); + + if( outDoc != null ) { + entry.setDocument(outDoc); + result.add(entry); + } + + return result; + } + + @Override + public void prepare(Object configurationObject) { + if( formats.size() > 0 ) + this.mapper = StreamsJacksonMapper.getInstance(formats); + else + this.mapper = StreamsJacksonMapper.getInstance(); + } + + @Override + public void cleanUp() { + this.mapper = null; + } + +}; http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b52e7653/streams-components/streams-converters/src/main/java/org/apache/streams/converter/TypeConverterUtil.java ---------------------------------------------------------------------- diff --git a/streams-components/streams-converters/src/main/java/org/apache/streams/converter/TypeConverterUtil.java b/streams-components/streams-converters/src/main/java/org/apache/streams/converter/TypeConverterUtil.java new file mode 100644 index 0000000..2ff463b --- /dev/null +++ b/streams-components/streams-converters/src/main/java/org/apache/streams/converter/TypeConverterUtil.java @@ -0,0 +1,51 @@ +package org.apache.streams.converter; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; +import org.apache.streams.jackson.StreamsJacksonMapper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + +/** + * TypeConverterUtil supports TypeConverterProcessor in converting between String json and + * jackson-compatible POJO objects + */ +public class TypeConverterUtil { + + private final static Logger LOGGER = LoggerFactory.getLogger(TypeConverterUtil.class); + + public static Object convert(Object object, Class outClass) { + return TypeConverterUtil.convert(object, outClass, StreamsJacksonMapper.getInstance()); + } + + public static Object convert(Object object, Class outClass, ObjectMapper mapper) { + ObjectNode node = null; + Object outDoc = null; + if( object instanceof String ) { + try { + node = mapper.readValue((String)object, ObjectNode.class); + } catch (IOException e) { + e.printStackTrace(); + } + } else { + node = mapper.convertValue(object, ObjectNode.class); + } + + if(node != null) { + try { + if( outClass == String.class ) + outDoc = mapper.writeValueAsString(node); + else + outDoc = mapper.convertValue(node, outClass); + + } catch (Throwable e) { + LOGGER.warn(e.getMessage()); + LOGGER.warn(node.toString()); + } + } + + return outDoc; + } +} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b52e7653/streams-components/streams-converters/src/test/java/org/apache/streams/converter/test/TypeConverterProcessorTest.java ---------------------------------------------------------------------- diff --git a/streams-components/streams-converters/src/test/java/org/apache/streams/converter/test/TypeConverterProcessorTest.java b/streams-components/streams-converters/src/test/java/org/apache/streams/converter/test/TypeConverterProcessorTest.java new file mode 100644 index 0000000..aa7df36 --- /dev/null +++ b/streams-components/streams-converters/src/test/java/org/apache/streams/converter/test/TypeConverterProcessorTest.java @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.streams.converter.test; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.google.common.collect.Lists; +import org.apache.streams.core.StreamsDatum; +import org.apache.streams.core.StreamsProcessor; +import org.apache.streams.converter.TypeConverterProcessor; +import org.apache.streams.data.util.ActivityUtil; +import org.apache.streams.jackson.StreamsJacksonMapper; +import org.apache.streams.pojo.json.Activity; +import org.junit.Test; + +import java.io.IOException; +import java.util.List; + +import static junit.framework.Assert.*; + +/** + * Test for + * @see {@link org.apache.streams.converter.TypeConverterProcessor} + */ +public class TypeConverterProcessorTest { + + private static final String DATASIFT_JSON = "{\"demographic\":{\"gender\":\"female\"},\"interaction\":{\"schema\":{\"version\":3},\"source\":\"Twitter for Android\",\"author\":{\"username\":\"ViiOLeee\",\"name\":\"Violeta Anguita\",\"id\":70931384,\"avatar\":\"http://pbs.twimg.com/profile_images/378800000851401229/bbf480cde2e9923a1d20acd393da0212_normal.jpeg\",\"link\":\"http://twitter.com/ViiOLeee\",\"language\":\"en\"},\"type\":\"twitter\",\"created_at\":\"Tue, 27 May 2014 22:38:15 +0000\",\"received_at\":1.401230295658E9,\"content\":\"RT @AliiAnguita: \\\"@Pharrell: Loved working with @edsheeran on Sing. He's a genius. https://t.co/wB2qKyJMRw\\\" @ViiOLeee look at this!\",\"id\":\"1e3e5ef97532a580e0741841f5746728\",\"link\":\"http://twitter.com/ViiOLeee/status/471420141989666817\",\"mentions\":[\"Pharrell\",\"edsheeran\",\"ViiOLeee\",\"AliiAnguita\"],\"mention_ids\":[338084918,85452649,70931384]},\"klout\":{\"score\":34},\"language\":{\"tag\":\"en\",\"tag_extended\":\"en\",\ "confidence\":98},\"links\":{\"code\":[200],\"created_at\":[\"Tue, 27 May 2014 14:28:06 +0000\"],\"meta\":{\"charset\":[\"UTF-8\"],\"content_type\":[\"text/html\"],\"description\":[\"Official Video for Ed Sheeran's track SING Get this track on iTunes: http://smarturl.it/EdSing Pre-order 'x' on iTunes and get 'One' instantly: http://smartu...\"],\"keywords\":[[\"ed sheeran\",\"ed sheeran sing\",\"ed sheeran new album\",\"Ed Sheeran (Musical Artist)\",\"ed sheeran one\",\"ed sheeran fault in our stars\",\"ed sheeran all of the stars\",\"s...\"]],\"lang\":[\"en\"],\"opengraph\":[{\"site_name\":\"YouTube\",\"url\":\"http://www.youtube.com/watch?v=tlYcUqEPN58\",\"title\":\"Ed Sheeran - SING [Official Video]\",\"image\":\"https://i1.ytimg.com/vi/tlYcUqEPN58/maxresdefault.jpg\",\"description\":\"Official Video for Ed Sheeran's track SING Get this track on iTunes: http://smarturl.it/EdSing Pre-order 'x' on iTunes and get 'One' instantly: http://smartu ...\",\"type\":\"video\"}],\"twitter\":[{\"card\":\"player\",\"site\":\"@youtube\",\"url\":\"http://www.youtube.com/watch?v=tlYcUqEPN58\",\"title\":\"Ed Sheeran - SING [Official Video]\",\"description\":\"Official Video for Ed Sheeran's track SING Get this track on iTunes: http://smarturl.it/EdSing Pre-order 'x' on iTunes and get 'One' instantly: http://smartu...\",\"image\":\"https://i1.ytimg.com/vi/tlYcUqEPN58/maxresdefault.jpg\",\"app\":{\"iphone\":{\"name\":\"YouTube\",\"id\":\"544007664\",\"url\":\"vnd.youtube://watch/tlYcUqEPN58\"},\"ipad\":{\"name\":\"YouTube\",\"id\":\"544007664\",\"url\":\"vnd.youtube://watch/tlYcUqEPN58\"},\"googleplay\":{\"name\":\"YouTube\",\"id\":\"com.google.android.youtube\",\"url\":\"http://www.youtube.com/watch?v=tlYcUqEPN58\"}},\"player\":\"https://www.youtube.com/embed/tlYcUqEPN58\",\"player_width\":\"1280\",\"player_height\":\"720\"}]},\"normalized_url\":[\"https://youtube.com/watch?v=tlYcUqEPN58\"],\"retweet_count\":[0],\"tit le\":[\"Ed Sheeran - SING [Official Video] - YouTube\"],\"url\":[\"https://www.youtube.com/watch?v=tlYcUqEPN58\"]},\"twitter\":{\"id\":\"471420141989666817\",\"retweet\":{\"text\":\"\\\"@Pharrell: Loved working with @edsheeran on Sing. He's a genius. https://t.co/wB2qKyJMRw\\\" @ViiOLeee look at this!\",\"id\":\"471420141989666817\",\"user\":{\"name\":\"Violeta Anguita\",\"description\":\"La vida no seria la fiesta que todos esperamos, pero mientras estemos aqui debemos BAILAR!!! #ErasmusOnceErasmusForever\",\"location\":\"Espanhaa..Olaa!\",\"statuses_count\":5882,\"followers_count\":249,\"friends_count\":1090,\"screen_name\":\"ViiOLeee\",\"profile_image_url\":\"http://pbs.twimg.com/profile_images/378800000851401229/bbf480cde2e9923a1d20acd393da0212_normal.jpeg\",\"profile_image_url_https\":\"https://pbs.twimg.com/profile_images/378800000851401229/bbf480cde2e9923a1d20acd393da0212_normal.jpeg\",\"lang\":\"en\",\"time_zone\":\"Madrid\",\"utc_offset\":7200,\"listed_count\":1,\"id\":709 31384,\"id_str\":\"70931384\",\"geo_enabled\":false,\"verified\":false,\"favourites_count\":275,\"created_at\":\"Wed, 02 Sep 2009 10:19:59 +0000\"},\"source\":\"<a href=\\\"http://twitter.com/download/android\\\" rel=\\\"nofollow\\\">Twitter for Android</a>\",\"count\":1,\"created_at\":\"Tue, 27 May 2014 22:38:15 +0000\",\"mentions\":[\"Pharrell\",\"edsheeran\",\"ViiOLeee\",\"AliiAnguita\"],\"mention_ids\":[338084918,85452649,70931384],\"links\":[\"https://www.youtube.com/watch?v=tlYcUqEPN58\"],\"display_urls\":[\"youtube.com/watch?v=tlYcUq���\"],\"domains\":[\"www.youtube.com\"],\"lang\":\"en\"},\"retweeted\":{\"id\":\"471419867078209536\",\"user\":{\"name\":\"Alicia Anguita \",\"description\":\"Estudiante de Ingenieria de la Edificaci��n en Granada.\",\"statuses_count\":371,\"followers_count\":185,\"friends_count\":404,\"screen_name\":\"AliiAnguita\",\"profile_image_url\":\"http://pbs.twimg.com/profile_images/424248659677442048/qCPZL8c9_normal.jpeg\",\"profile_image_url_ https\":\"https://pbs.twimg.com/profile_images/424248659677442048/qCPZL8c9_normal.jpeg\",\"lang\":\"es\",\"listed_count\":0,\"id\":561201891,\"id_str\":\"561201891\",\"geo_enabled\":false,\"verified\":false,\"favourites_count\":17,\"created_at\":\"Mon, 23 Apr 2012 13:11:44 +0000\"},\"source\":\"<a href=\\\"http://twitter.com/download/android\\\" rel=\\\"nofollow\\\">Twitter for Android</a>\",\"created_at\":\"Tue, 27 May 2014 22:37:09 +0000\"}}}"; + + private static final String ACTIVITY_JSON = "{\"id\":\"id\",\"published\":\"Tue Jan 17 21:21:46 Z 2012\",\"verb\":\"post\",\"provider\":{\"id\":\"providerid\"}}"; + + public static final String DATASIFT_FORMAT = "EEE, dd MMM yyyy HH:mm:ss Z"; + + @Test + public void testTypeConverterStringToString() { + final String ID = "1"; + StreamsProcessor processor = new TypeConverterProcessor(String.class, Lists.newArrayList(DATASIFT_FORMAT)); + processor.prepare(null); + StreamsDatum datum = new StreamsDatum(DATASIFT_JSON, ID); + List<StreamsDatum> result = processor.process(datum); + assertNotNull(result); + assertEquals(1, result.size()); + StreamsDatum resultDatum = result.get(0); + assertNotNull(resultDatum); + assertNotNull(resultDatum.getDocument()); + assertTrue(resultDatum.getDocument() instanceof String); + assertEquals(ID, resultDatum.getId()); + } + + @Test + public void testTypeConverterStringToObjectNode() { + final String ID = "1"; + StreamsProcessor processor = new TypeConverterProcessor(ObjectNode.class, Lists.newArrayList(DATASIFT_FORMAT)); + processor.prepare(null); + StreamsDatum datum = new StreamsDatum(DATASIFT_JSON, ID); + List<StreamsDatum> result = processor.process(datum); + assertNotNull(result); + assertEquals(1, result.size()); + StreamsDatum resultDatum = result.get(0); + assertNotNull(resultDatum); + assertNotNull(resultDatum.getDocument()); + assertTrue(resultDatum.getDocument() instanceof ObjectNode); + assertEquals(ID, resultDatum.getId()); + } + + @Test + public void testTypeConverterObjectNodeToString() throws IOException { + final String ID = "1"; + StreamsProcessor processor = new TypeConverterProcessor(String.class, Lists.newArrayList(DATASIFT_FORMAT)); + processor.prepare(null); + ObjectMapper mapper = StreamsJacksonMapper.getInstance(DATASIFT_FORMAT); + ObjectNode node = mapper.readValue(DATASIFT_JSON, ObjectNode.class); + StreamsDatum datum = new StreamsDatum(node, ID); + List<StreamsDatum> result = processor.process(datum); + assertNotNull(result); + assertEquals(1, result.size()); + StreamsDatum resultDatum = result.get(0); + assertNotNull(resultDatum); + assertNotNull(resultDatum.getDocument()); + assertTrue(resultDatum.getDocument() instanceof String); + assertEquals(ID, resultDatum.getId()); + } + + @Test + public void testTypeConverterStringToActivity() { + final String ID = "1"; + StreamsProcessor processor = new TypeConverterProcessor(Activity.class, Lists.newArrayList(DATASIFT_FORMAT)); + processor.prepare(null); + StreamsDatum datum = new StreamsDatum(ACTIVITY_JSON, ID); + List<StreamsDatum> result = processor.process(datum); + assertNotNull(result); + assertEquals(1, result.size()); + StreamsDatum resultDatum = result.get(0); + assertNotNull(resultDatum); + assertNotNull(resultDatum.getDocument()); + assertTrue(resultDatum.getDocument() instanceof Activity); + assertTrue(ActivityUtil.isValid((Activity)resultDatum.getDocument())); + assertEquals(ID, resultDatum.getId()); + } + + @Test + public void testTypeConverterObjectNodeToActivity() throws IOException { + final String ID = "1"; + StreamsProcessor processor = new TypeConverterProcessor(Activity.class, Lists.newArrayList(DATASIFT_FORMAT)); + processor.prepare(null); + ObjectMapper mapper = StreamsJacksonMapper.getInstance(DATASIFT_FORMAT); + ObjectNode node = mapper.readValue(ACTIVITY_JSON, ObjectNode.class); + StreamsDatum datum = new StreamsDatum(node, ID); + List<StreamsDatum> result = processor.process(datum); + assertNotNull(result); + assertEquals(1, result.size()); + StreamsDatum resultDatum = result.get(0); + assertNotNull(resultDatum); + assertNotNull(resultDatum.getDocument()); + assertTrue(resultDatum.getDocument() instanceof Activity); + assertTrue(ActivityUtil.isValid((Activity)resultDatum.getDocument())); + assertEquals(ID, resultDatum.getId()); + } + + @Test + public void testTypeConverterActivityToString() throws IOException { + final String ID = "1"; + StreamsProcessor processor = new TypeConverterProcessor(String.class, Lists.newArrayList(DATASIFT_FORMAT)); + processor.prepare(null); + ObjectMapper mapper = StreamsJacksonMapper.getInstance(DATASIFT_FORMAT); + Activity node = mapper.readValue(ACTIVITY_JSON, Activity.class); + StreamsDatum datum = new StreamsDatum(node, ID); + List<StreamsDatum> result = processor.process(datum); + assertNotNull(result); + assertEquals(1, result.size()); + StreamsDatum resultDatum = result.get(0); + assertNotNull(resultDatum); + assertNotNull(resultDatum.getDocument()); + assertTrue(resultDatum.getDocument() instanceof String); + assertEquals(ID, resultDatum.getId()); + } + + @Test + public void testTypeConverterActivityToObjectNode() throws IOException { + final String ID = "1"; + StreamsProcessor processor = new TypeConverterProcessor(ObjectNode.class, Lists.newArrayList(DATASIFT_FORMAT)); + processor.prepare(null); + ObjectMapper mapper = StreamsJacksonMapper.getInstance(DATASIFT_FORMAT); + Activity node = mapper.readValue(ACTIVITY_JSON, Activity.class); + StreamsDatum datum = new StreamsDatum(node, ID); + List<StreamsDatum> result = processor.process(datum); + assertNotNull(result); + assertEquals(1, result.size()); + StreamsDatum resultDatum = result.get(0); + assertNotNull(resultDatum); + assertNotNull(resultDatum.getDocument()); + assertTrue(resultDatum.getDocument() instanceof ObjectNode); + assertEquals(ID, resultDatum.getId()); + } + + +}
