Repository: incubator-streams
Updated Branches:
  refs/heads/master c0bdfeb48 -> acc7c84f0


resolves STREAMS-212


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/b52e7653
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/b52e7653
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/b52e7653

Branch: refs/heads/master
Commit: b52e7653a720e41bd0f9b714556d7aeae3e84c7d
Parents: e9adcac
Author: sblackmon <[email protected]>
Authored: Mon Dec 1 16:25:45 2014 -0600
Committer: sblackmon <[email protected]>
Committed: Mon Dec 1 16:25:45 2014 -0600

----------------------------------------------------------------------
 pom.xml                                         |  22 ++-
 streams-components/pom.xml                      |   1 +
 streams-components/streams-converters/pom.xml   | 152 ++++++++++++++++
 .../converter/TypeConverterProcessor.java       |  91 ++++++++++
 .../streams/converter/TypeConverterUtil.java    |  51 ++++++
 .../test/TypeConverterProcessorTest.java        | 172 +++++++++++++++++++
 6 files changed, 487 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b52e7653/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 351ec68..0969d38 100644
--- a/pom.xml
+++ b/pom.xml
@@ -69,7 +69,7 @@
         <joda-time.version>2.2</joda-time.version>
         <rave.version>0.22</rave.version>
         <datastax.version>1.0.3</datastax.version>
-        <jsonschema2pojo.version>0.4.5</jsonschema2pojo.version>
+        <jsonschema2pojo.version>0.4.6</jsonschema2pojo.version>
         <jaxb2.version>0.8.3</jaxb2.version>
         <jaxb2-basics.version>0.8.4</jaxb2-basics.version>
         <jaxbutil.version>1.2.6</jaxbutil.version>
@@ -81,6 +81,7 @@
         <commons-io.version>2.4</commons-io.version>
         <commons-lang3.version>3.1</commons-lang3.version>
         <typesafe.config.version>1.2.0</typesafe.config.version>
+        <reflections.version>0.9.9</reflections.version>
         <orgjson.version>20140107</orgjson.version>
         <guava.version>17.0</guava.version>
         <scala.version>2.8.0</scala.version>
@@ -220,6 +221,19 @@
                         </excludes>
                     </configuration>
                 </plugin>
+                <plugin>
+                    <groupId>org.reflections</groupId>
+                    <artifactId>reflections-maven</artifactId>
+                    <version>${reflections.version}-RC2</version>
+                    <executions>
+                        <execution>
+                            <goals>
+                                <goal>reflections</goal>
+                            </goals>
+                            <phase>process-classes</phase>
+                        </execution>
+                    </executions>
+                </plugin>
             </plugins>
         </pluginManagement>
     </build>
@@ -260,7 +274,11 @@
                 <artifactId>config</artifactId>
                 <version>${typesafe.config.version}</version>
             </dependency>
-
+            <dependency>
+                <groupId>org.reflections</groupId>
+                <artifactId>reflections</artifactId>
+                <version>${reflections.version}</version>
+            </dependency>
             <dependency>
                 <groupId>junit</groupId>
                 <artifactId>junit</artifactId>

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b52e7653/streams-components/pom.xml
----------------------------------------------------------------------
diff --git a/streams-components/pom.xml b/streams-components/pom.xml
index 9942e14..138eeef 100644
--- a/streams-components/pom.xml
+++ b/streams-components/pom.xml
@@ -37,6 +37,7 @@
     </properties>
 
     <modules>
+        <module>streams-converters</module>
         <module>streams-http</module>
     </modules>
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b52e7653/streams-components/streams-converters/pom.xml
----------------------------------------------------------------------
diff --git a/streams-components/streams-converters/pom.xml 
b/streams-components/streams-converters/pom.xml
new file mode 100644
index 0000000..58f0c98
--- /dev/null
+++ b/streams-components/streams-converters/pom.xml
@@ -0,0 +1,152 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~   http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing,
+  ~ software distributed under the License is distributed on an
+  ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  ~ KIND, either express or implied.  See the License for the
+  ~ specific language governing permissions and limitations
+  ~ under the License.
+  -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+    <modelVersion>4.0.0</modelVersion>
+    <artifactId>streams-converters</artifactId>
+    <version>0.1-SNAPSHOT</version>
+
+    <parent>
+        <groupId>org.apache.streams</groupId>
+        <artifactId>streams-components</artifactId>
+        <version>0.1-SNAPSHOT</version>
+    </parent>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.streams</groupId>
+            <artifactId>streams-config</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.streams</groupId>
+            <artifactId>streams-core</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-core</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-annotations</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.jayway.jsonpath</groupId>
+            <artifactId>json-path</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.jayway.jsonpath</groupId>
+            <artifactId>json-path-assert</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>commons-io</groupId>
+            <artifactId>commons-io</artifactId>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <sourceDirectory>src/main/java</sourceDirectory>
+        <testSourceDirectory>src/test/java</testSourceDirectory>
+        <resources>
+            <resource>
+                <directory>src/main/resources</directory>
+            </resource>
+        </resources>
+        <testResources>
+            <testResource>
+                <directory>src/test/resources</directory>
+            </testResource>
+        </testResources>
+        <plugins>
+
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>build-helper-maven-plugin</artifactId>
+                <version>1.8</version>
+                <executions>
+                    <execution>
+                        <id>add-source</id>
+                        <phase>generate-sources</phase>
+                        <goals>
+                            <goal>add-source</goal>
+                        </goals>
+                        <configuration>
+                            <sources>
+                                <source>target/generated-sources/jsonschema2pojo</source>
+                            </sources>
+                        </configuration>
+                    </execution>
+                    <execution>
+                        <id>add-source-jaxb2</id>
+                        <phase>generate-sources</phase>
+                        <goals>
+                            <goal>add-source</goal>
+                        </goals>
+                        <configuration>
+                            <sources>
+                                <source>target/generated-sources/jaxb2</source>
+                            </sources>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.jsonschema2pojo</groupId>
+                <artifactId>jsonschema2pojo-maven-plugin</artifactId>
+                <configuration>
+                    <addCompileSourceRoot>true</addCompileSourceRoot>
+                    <generateBuilders>true</generateBuilders>
+                    <sourcePaths>
+                        <sourcePath>src/main/jsonschema</sourcePath>
+                    </sourcePaths>
+                    <outputDirectory>target/generated-sources/jsonschema2pojo</outputDirectory>
+                    <targetPackage>org.apache.streams.converter</targetPackage>
+                    <useLongIntegers>true</useLongIntegers>
+                    <useJodaDates>true</useJodaDates>
+                    <includeJsr303Annotations>true</includeJsr303Annotations>
+                </configuration>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>generate</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.reflections</groupId>
+                <artifactId>reflections-maven</artifactId>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>reflections</goal>
+                        </goals>
+                        <phase>process-classes</phase>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b52e7653/streams-components/streams-converters/src/main/java/org/apache/streams/converter/TypeConverterProcessor.java
----------------------------------------------------------------------
diff --git 
a/streams-components/streams-converters/src/main/java/org/apache/streams/converter/TypeConverterProcessor.java
 
b/streams-components/streams-converters/src/main/java/org/apache/streams/converter/TypeConverterProcessor.java
new file mode 100644
index 0000000..9c8b2bd
--- /dev/null
+++ 
b/streams-components/streams-converters/src/main/java/org/apache/streams/converter/TypeConverterProcessor.java
@@ -0,0 +1,91 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.streams.converter;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProcessor;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.pojo.json.Activity;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * TypeConverterProcessor converts between String json and jackson-compatible POJO objects.
+ *
+ * Activity is one supported jackson-compatible POJO, so JSON String and objects with structural
+ * similarities to Activity can be converted to Activity objects.
+ *
+ * However, conversion to Activity should probably use
+ * {@link org.apache.streams.converter.ActivityConverterProcessor}
+ *
+ * The ObjectMapper used for conversion is created in {@link #prepare(Object)} and released in
+ * {@link #cleanUp()}; until prepare has been called the mapper field is null, so call prepare
+ * before process.
+ */
+public class TypeConverterProcessor implements StreamsProcessor, Serializable {
+
+    private final static Logger LOGGER = LoggerFactory.getLogger(TypeConverterProcessor.class);
+
+    /** Date formats handed to the mapper factory; empty means "use the default mapper". */
+    private List<String> formats = Lists.newArrayList();
+
+    /** Assigned by prepare(), cleared by cleanUp(). */
+    protected ObjectMapper mapper;
+
+    /** Type every incoming document is converted to. */
+    protected Class outClass;
+
+    /**
+     * Creates a processor that converts documents to outClass using the default mapper.
+     *
+     * @param outClass target type of the conversion
+     */
+    public TypeConverterProcessor(Class outClass) {
+        this.outClass = outClass;
+    }
+
+    /**
+     * Creates a processor that converts documents to outClass using a mapper
+     * configured with the supplied formats.
+     *
+     * @param outClass target type of the conversion
+     * @param formats  formats passed to StreamsJacksonMapper; null is treated like an
+     *                 empty list so that prepare() does not fail on it
+     */
+    public TypeConverterProcessor(Class outClass, List<String> formats) {
+        this(outClass);
+        // Keep the empty default when no list is supplied instead of storing null.
+        if (formats != null) {
+            this.formats = formats;
+        }
+    }
+
+    /**
+     * Converts the document carried by entry to outClass.
+     *
+     * @param entry datum whose document should be converted; it is updated in place
+     * @return a list holding entry with its converted document, or an empty list when
+     *         the conversion produced null
+     */
+    @Override
+    public List<StreamsDatum> process(StreamsDatum entry) {
+        List<StreamsDatum> result = Lists.newLinkedList();
+
+        Object outDoc = TypeConverterUtil.convert(entry.getDocument(), outClass, mapper);
+        if (outDoc != null) {
+            entry.setDocument(outDoc);
+            result.add(entry);
+        }
+        return result;
+    }
+
+    /**
+     * Obtains the mapper: one built for the configured formats when any were given,
+     * otherwise the default instance.
+     *
+     * @param configurationObject not used by this processor
+     */
+    @Override
+    public void prepare(Object configurationObject) {
+        if (formats != null && !formats.isEmpty()) {
+            this.mapper = StreamsJacksonMapper.getInstance(formats);
+        } else {
+            this.mapper = StreamsJacksonMapper.getInstance();
+        }
+    }
+
+    /** Drops the reference to the mapper. */
+    @Override
+    public void cleanUp() {
+        this.mapper = null;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b52e7653/streams-components/streams-converters/src/main/java/org/apache/streams/converter/TypeConverterUtil.java
----------------------------------------------------------------------
diff --git 
a/streams-components/streams-converters/src/main/java/org/apache/streams/converter/TypeConverterUtil.java
 
b/streams-components/streams-converters/src/main/java/org/apache/streams/converter/TypeConverterUtil.java
new file mode 100644
index 0000000..2ff463b
--- /dev/null
+++ 
b/streams-components/streams-converters/src/main/java/org/apache/streams/converter/TypeConverterUtil.java
@@ -0,0 +1,51 @@
+package org.apache.streams.converter;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * TypeConverterUtil supports TypeConverterProcessor in converting between String json and
+ * jackson-compatible POJO objects
+ */
+public class TypeConverterUtil {
+
+    private final static Logger LOGGER = LoggerFactory.getLogger(TypeConverterUtil.class);
+
+    /**
+     * Converts object to outClass using the default StreamsJacksonMapper instance.
+     *
+     * @param object   a JSON String or an object the mapper can turn into an ObjectNode
+     * @param outClass target type; String.class yields serialized JSON
+     * @return the converted document, or null when parsing or conversion failed
+     */
+    public static Object convert(Object object, Class outClass) {
+        return TypeConverterUtil.convert(object, outClass, StreamsJacksonMapper.getInstance());
+    }
+
+    /**
+     * Converts object to outClass by first normalizing it to an ObjectNode.
+     *
+     * A String input is parsed as JSON; any other input goes through
+     * mapper.convertValue, whose exceptions are not caught here. Failures while
+     * parsing a String or while producing the output are logged and result in null.
+     *
+     * @param object   a JSON String or an object the mapper can turn into an ObjectNode
+     * @param outClass target type; String.class yields serialized JSON
+     * @param mapper   mapper used for parsing, conversion and serialization
+     * @return the converted document, or null when parsing or conversion failed
+     */
+    public static Object convert(Object object, Class outClass, ObjectMapper mapper) {
+        ObjectNode node = null;
+        if (object instanceof String) {
+            try {
+                node = mapper.readValue((String) object, ObjectNode.class);
+            } catch (IOException e) {
+                // Report through the logger (with stack trace) rather than stderr.
+                LOGGER.warn("Unable to parse String document as JSON", e);
+            }
+        } else {
+            node = mapper.convertValue(object, ObjectNode.class);
+        }
+
+        if (node == null) {
+            return null;
+        }
+
+        Object outDoc = null;
+        try {
+            if (outClass == String.class) {
+                outDoc = mapper.writeValueAsString(node);
+            } else {
+                outDoc = mapper.convertValue(node, outClass);
+            }
+        } catch (Exception e) {
+            // Best effort: a document that cannot be converted is logged and dropped.
+            // Errors (e.g. OutOfMemoryError) are deliberately no longer swallowed.
+            LOGGER.warn("Unable to convert document to " + outClass + ": " + node, e);
+        }
+
+        return outDoc;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b52e7653/streams-components/streams-converters/src/test/java/org/apache/streams/converter/test/TypeConverterProcessorTest.java
----------------------------------------------------------------------
diff --git 
a/streams-components/streams-converters/src/test/java/org/apache/streams/converter/test/TypeConverterProcessorTest.java
 
b/streams-components/streams-converters/src/test/java/org/apache/streams/converter/test/TypeConverterProcessorTest.java
new file mode 100644
index 0000000..aa7df36
--- /dev/null
+++ 
b/streams-components/streams-converters/src/test/java/org/apache/streams/converter/test/TypeConverterProcessorTest.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.converter.test;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.collect.Lists;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProcessor;
+import org.apache.streams.converter.TypeConverterProcessor;
+import org.apache.streams.data.util.ActivityUtil;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.pojo.json.Activity;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.List;
+
+import static junit.framework.Assert.*;
+
+/**
+ * Tests for {@link org.apache.streams.converter.TypeConverterProcessor}.
+ */
+public class TypeConverterProcessorTest {
+
+    private static final String DATASIFT_JSON = 
"{\"demographic\":{\"gender\":\"female\"},\"interaction\":{\"schema\":{\"version\":3},\"source\":\"Twitter
 for Android\",\"author\":{\"username\":\"ViiOLeee\",\"name\":\"Violeta 
Anguita\",\"id\":70931384,\"avatar\":\"http://pbs.twimg.com/profile_images/378800000851401229/bbf480cde2e9923a1d20acd393da0212_normal.jpeg\",\"link\":\"http://twitter.com/ViiOLeee\",\"language\":\"en\"},\"type\":\"twitter\",\"created_at\":\"Tue,
 27 May 2014 22:38:15 +0000\",\"received_at\":1.401230295658E9,\"content\":\"RT 
@AliiAnguita: \\\"@Pharrell: Loved working with @edsheeran on Sing. He's a 
genius. https://t.co/wB2qKyJMRw\\\"; @ViiOLeee  look at 
this!\",\"id\":\"1e3e5ef97532a580e0741841f5746728\",\"link\":\"http://twitter.com/ViiOLeee/status/471420141989666817\",\"mentions\":[\"Pharrell\",\"edsheeran\",\"ViiOLeee\",\"AliiAnguita\"],\"mention_ids\":[338084918,85452649,70931384]},\"klout\":{\"score\":34},\"language\":{\"tag\":\"en\",\"tag_extended\":\"en\",\
 "confidence\":98},\"links\":{\"code\":[200],\"created_at\":[\"Tue, 27 May 2014 
14:28:06 
+0000\"],\"meta\":{\"charset\":[\"UTF-8\"],\"content_type\":[\"text/html\"],\"description\":[\"Official
 Video for Ed Sheeran&#39;s track SING Get this track on iTunes: 
http://smarturl.it/EdSing Pre-order &#39;x&#39; on iTunes and get &#39;One&#39; 
instantly: http://smartu...\"],\"keywords\":[[\"ed sheeran\",\"ed sheeran 
sing\",\"ed sheeran new album\",\"Ed Sheeran (Musical Artist)\",\"ed sheeran 
one\",\"ed sheeran fault in our stars\",\"ed sheeran all of the 
stars\",\"s...\"]],\"lang\":[\"en\"],\"opengraph\":[{\"site_name\":\"YouTube\",\"url\":\"http://www.youtube.com/watch?v=tlYcUqEPN58\",\"title\":\"Ed
 Sheeran - SING [Official 
Video]\",\"image\":\"https://i1.ytimg.com/vi/tlYcUqEPN58/maxresdefault.jpg\",\"description\":\"Official
 Video for Ed Sheeran&#39;s track SING Get this track on iTunes: 
http://smarturl.it/EdSing Pre-order &#39;x&#39; on iTunes and get &#39;One&#39; 
instantly: http://smartu
 
...\",\"type\":\"video\"}],\"twitter\":[{\"card\":\"player\",\"site\":\"@youtube\",\"url\":\"http://www.youtube.com/watch?v=tlYcUqEPN58\",\"title\":\"Ed
 Sheeran - SING [Official Video]\",\"description\":\"Official Video for Ed 
Sheeran&#39;s track SING Get this track on iTunes: http://smarturl.it/EdSing 
Pre-order &#39;x&#39; on iTunes and get &#39;One&#39; instantly: 
http://smartu...\",\"image\":\"https://i1.ytimg.com/vi/tlYcUqEPN58/maxresdefault.jpg\",\"app\":{\"iphone\":{\"name\":\"YouTube\",\"id\":\"544007664\",\"url\":\"vnd.youtube://watch/tlYcUqEPN58\"},\"ipad\":{\"name\":\"YouTube\",\"id\":\"544007664\",\"url\":\"vnd.youtube://watch/tlYcUqEPN58\"},\"googleplay\":{\"name\":\"YouTube\",\"id\":\"com.google.android.youtube\",\"url\":\"http://www.youtube.com/watch?v=tlYcUqEPN58\"}},\"player\":\"https://www.youtube.com/embed/tlYcUqEPN58\",\"player_width\":\"1280\",\"player_height\":\"720\"}]},\"normalized_url\":[\"https://youtube.com/watch?v=tlYcUqEPN58\"],\"retweet_count\":[0],\"tit
 le\":[\"Ed Sheeran - SING [Official Video] - 
YouTube\"],\"url\":[\"https://www.youtube.com/watch?v=tlYcUqEPN58\"]},\"twitter\":{\"id\":\"471420141989666817\",\"retweet\":{\"text\":\"\\\"@Pharrell:
 Loved working with @edsheeran on Sing. He's a genius. 
https://t.co/wB2qKyJMRw\\\"; @ViiOLeee  look at 
this!\",\"id\":\"471420141989666817\",\"user\":{\"name\":\"Violeta 
Anguita\",\"description\":\"La vida no seria la fiesta que todos esperamos, 
pero mientras estemos aqui debemos BAILAR!!! 
#ErasmusOnceErasmusForever\",\"location\":\"Espanhaa..Olaa!\",\"statuses_count\":5882,\"followers_count\":249,\"friends_count\":1090,\"screen_name\":\"ViiOLeee\",\"profile_image_url\":\"http://pbs.twimg.com/profile_images/378800000851401229/bbf480cde2e9923a1d20acd393da0212_normal.jpeg\",\"profile_image_url_https\":\"https://pbs.twimg.com/profile_images/378800000851401229/bbf480cde2e9923a1d20acd393da0212_normal.jpeg\",\"lang\":\"en\",\"time_zone\":\"Madrid\",\"utc_offset\":7200,\"listed_count\":1,\"id\":709
 
31384,\"id_str\":\"70931384\",\"geo_enabled\":false,\"verified\":false,\"favourites_count\":275,\"created_at\":\"Wed,
 02 Sep 2009 10:19:59 +0000\"},\"source\":\"<a 
href=\\\"http://twitter.com/download/android\\\"; rel=\\\"nofollow\\\">Twitter 
for Android</a>\",\"count\":1,\"created_at\":\"Tue, 27 May 2014 22:38:15 
+0000\",\"mentions\":[\"Pharrell\",\"edsheeran\",\"ViiOLeee\",\"AliiAnguita\"],\"mention_ids\":[338084918,85452649,70931384],\"links\":[\"https://www.youtube.com/watch?v=tlYcUqEPN58\"],\"display_urls\":[\"youtube.com/watch?v=tlYcUq���\"],\"domains\":[\"www.youtube.com\"],\"lang\":\"en\"},\"retweeted\":{\"id\":\"471419867078209536\",\"user\":{\"name\":\"Alicia
 Anguita \",\"description\":\"Estudiante de Ingenieria de la Edificaci��n 
en 
Granada.\",\"statuses_count\":371,\"followers_count\":185,\"friends_count\":404,\"screen_name\":\"AliiAnguita\",\"profile_image_url\":\"http://pbs.twimg.com/profile_images/424248659677442048/qCPZL8c9_normal.jpeg\",\"profile_image_url_
 
https\":\"https://pbs.twimg.com/profile_images/424248659677442048/qCPZL8c9_normal.jpeg\",\"lang\":\"es\",\"listed_count\":0,\"id\":561201891,\"id_str\":\"561201891\",\"geo_enabled\":false,\"verified\":false,\"favourites_count\":17,\"created_at\":\"Mon,
 23 Apr 2012 13:11:44 +0000\"},\"source\":\"<a 
href=\\\"http://twitter.com/download/android\\\"; rel=\\\"nofollow\\\">Twitter 
for Android</a>\",\"created_at\":\"Tue, 27 May 2014 22:37:09 +0000\"}}}";
+
+    private static final String ACTIVITY_JSON = 
"{\"id\":\"id\",\"published\":\"Tue Jan 17 21:21:46 Z 
2012\",\"verb\":\"post\",\"provider\":{\"id\":\"providerid\"}}";
+
+    public static final String DATASIFT_FORMAT = "EEE, dd MMM yyyy HH:mm:ss Z";
+
+    /**
+     * A JSON String converted to String yields exactly one datum that
+     * carries a String document and keeps its id.
+     */
+    @Test
+    public void testTypeConverterStringToString() {
+        final String expectedId = "1";
+        StreamsProcessor converter =
+                new TypeConverterProcessor(String.class, Lists.newArrayList(DATASIFT_FORMAT));
+        converter.prepare(null);
+
+        List<StreamsDatum> converted = converter.process(new StreamsDatum(DATASIFT_JSON, expectedId));
+
+        assertNotNull(converted);
+        assertEquals(1, converted.size());
+        StreamsDatum only = converted.get(0);
+        assertNotNull(only);
+        Object document = only.getDocument();
+        assertNotNull(document);
+        assertTrue(document instanceof String);
+        assertEquals(expectedId, only.getId());
+    }
+
+    /**
+     * A JSON String converted to ObjectNode yields exactly one datum that
+     * carries an ObjectNode document and keeps its id.
+     */
+    @Test
+    public void testTypeConverterStringToObjectNode() {
+        final String expectedId = "1";
+        StreamsProcessor converter =
+                new TypeConverterProcessor(ObjectNode.class, Lists.newArrayList(DATASIFT_FORMAT));
+        converter.prepare(null);
+
+        List<StreamsDatum> converted = converter.process(new StreamsDatum(DATASIFT_JSON, expectedId));
+
+        assertNotNull(converted);
+        assertEquals(1, converted.size());
+        StreamsDatum only = converted.get(0);
+        assertNotNull(only);
+        Object document = only.getDocument();
+        assertNotNull(document);
+        assertTrue(document instanceof ObjectNode);
+        assertEquals(expectedId, only.getId());
+    }
+
+    /**
+     * An ObjectNode converted to String yields exactly one datum that
+     * carries a String document and keeps its id.
+     */
+    @Test
+    public void testTypeConverterObjectNodeToString() throws IOException {
+        final String expectedId = "1";
+        StreamsProcessor converter =
+                new TypeConverterProcessor(String.class, Lists.newArrayList(DATASIFT_FORMAT));
+        converter.prepare(null);
+
+        // Build the ObjectNode input from the raw fixture.
+        ObjectMapper fixtureMapper = StreamsJacksonMapper.getInstance(DATASIFT_FORMAT);
+        ObjectNode input = fixtureMapper.readValue(DATASIFT_JSON, ObjectNode.class);
+
+        List<StreamsDatum> converted = converter.process(new StreamsDatum(input, expectedId));
+
+        assertNotNull(converted);
+        assertEquals(1, converted.size());
+        StreamsDatum only = converted.get(0);
+        assertNotNull(only);
+        Object document = only.getDocument();
+        assertNotNull(document);
+        assertTrue(document instanceof String);
+        assertEquals(expectedId, only.getId());
+    }
+
+    /**
+     * An Activity-shaped JSON String converted to Activity yields exactly one
+     * datum that carries a valid Activity document and keeps its id.
+     */
+    @Test
+    public void testTypeConverterStringToActivity() {
+        final String expectedId = "1";
+        StreamsProcessor converter =
+                new TypeConverterProcessor(Activity.class, Lists.newArrayList(DATASIFT_FORMAT));
+        converter.prepare(null);
+
+        List<StreamsDatum> converted = converter.process(new StreamsDatum(ACTIVITY_JSON, expectedId));
+
+        assertNotNull(converted);
+        assertEquals(1, converted.size());
+        StreamsDatum only = converted.get(0);
+        assertNotNull(only);
+        Object document = only.getDocument();
+        assertNotNull(document);
+        assertTrue(document instanceof Activity);
+        assertTrue(ActivityUtil.isValid((Activity) document));
+        assertEquals(expectedId, only.getId());
+    }
+
+    /**
+     * An Activity-shaped ObjectNode converted to Activity yields exactly one
+     * datum that carries a valid Activity document and keeps its id.
+     */
+    @Test
+    public void testTypeConverterObjectNodeToActivity() throws IOException {
+        final String expectedId = "1";
+        StreamsProcessor converter =
+                new TypeConverterProcessor(Activity.class, Lists.newArrayList(DATASIFT_FORMAT));
+        converter.prepare(null);
+
+        // Build the ObjectNode input from the raw fixture.
+        ObjectMapper fixtureMapper = StreamsJacksonMapper.getInstance(DATASIFT_FORMAT);
+        ObjectNode input = fixtureMapper.readValue(ACTIVITY_JSON, ObjectNode.class);
+
+        List<StreamsDatum> converted = converter.process(new StreamsDatum(input, expectedId));
+
+        assertNotNull(converted);
+        assertEquals(1, converted.size());
+        StreamsDatum only = converted.get(0);
+        assertNotNull(only);
+        Object document = only.getDocument();
+        assertNotNull(document);
+        assertTrue(document instanceof Activity);
+        assertTrue(ActivityUtil.isValid((Activity) document));
+        assertEquals(expectedId, only.getId());
+    }
+
+    /**
+     * An Activity converted to String yields exactly one datum that
+     * carries a String document and keeps its id.
+     */
+    @Test
+    public void testTypeConverterActivityToString() throws IOException {
+        final String expectedId = "1";
+        StreamsProcessor converter =
+                new TypeConverterProcessor(String.class, Lists.newArrayList(DATASIFT_FORMAT));
+        converter.prepare(null);
+
+        // Build the Activity input from the raw fixture.
+        ObjectMapper fixtureMapper = StreamsJacksonMapper.getInstance(DATASIFT_FORMAT);
+        Activity input = fixtureMapper.readValue(ACTIVITY_JSON, Activity.class);
+
+        List<StreamsDatum> converted = converter.process(new StreamsDatum(input, expectedId));
+
+        assertNotNull(converted);
+        assertEquals(1, converted.size());
+        StreamsDatum only = converted.get(0);
+        assertNotNull(only);
+        Object document = only.getDocument();
+        assertNotNull(document);
+        assertTrue(document instanceof String);
+        assertEquals(expectedId, only.getId());
+    }
+
+    /**
+     * An Activity converted to ObjectNode yields exactly one datum that
+     * carries an ObjectNode document and keeps its id.
+     */
+    @Test
+    public void testTypeConverterActivityToObjectNode() throws IOException {
+        final String expectedId = "1";
+        StreamsProcessor converter =
+                new TypeConverterProcessor(ObjectNode.class, Lists.newArrayList(DATASIFT_FORMAT));
+        converter.prepare(null);
+
+        // Build the Activity input from the raw fixture.
+        ObjectMapper fixtureMapper = StreamsJacksonMapper.getInstance(DATASIFT_FORMAT);
+        Activity input = fixtureMapper.readValue(ACTIVITY_JSON, Activity.class);
+
+        List<StreamsDatum> converted = converter.process(new StreamsDatum(input, expectedId));
+
+        assertNotNull(converted);
+        assertEquals(1, converted.size());
+        StreamsDatum only = converted.get(0);
+        assertNotNull(only);
+        Object document = only.getDocument();
+        assertNotNull(document);
+        assertTrue(document instanceof ObjectNode);
+        assertEquals(expectedId, only.getId());
+    }
+
+
+}

Reply via email to