http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-processor-jackson/src/main/java/org/apache/streams/jackson/CleanAdditionalPropertiesProcessor.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-processor-jackson/src/main/java/org/apache/streams/jackson/CleanAdditionalPropertiesProcessor.java
 
b/streams-contrib/streams-processor-jackson/src/main/java/org/apache/streams/jackson/CleanAdditionalPropertiesProcessor.java
index 6c08eb1..ae0709a 100644
--- 
a/streams-contrib/streams-processor-jackson/src/main/java/org/apache/streams/jackson/CleanAdditionalPropertiesProcessor.java
+++ 
b/streams-contrib/streams-processor-jackson/src/main/java/org/apache/streams/jackson/CleanAdditionalPropertiesProcessor.java
@@ -19,14 +19,15 @@ under the License.
 
 package org.apache.streams.jackson;
 
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProcessor;
+
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.fasterxml.jackson.datatype.jsonorg.JsonOrgModule;
 import com.google.common.collect.Lists;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.core.StreamsProcessor;
-import org.apache.streams.jackson.StreamsJacksonMapper;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -36,53 +37,58 @@ import java.util.Map;
 
 /**
  * This processor walks an input objectnode and corrects any artifacts
- * that may have occured from improper serialization of jsonschema2pojo beans.
+ * that may have occurred from improper serialization of jackson beans.
  *
+ * <p/>
  * The logic is also available for inclusion in other module via static import.
  */
 public class CleanAdditionalPropertiesProcessor implements StreamsProcessor {
 
-    public static final String STREAMS_ID = 
"CleanAdditionalPropertiesProcessor";
+  public static final String STREAMS_ID = "CleanAdditionalPropertiesProcessor";
 
-    private static final Logger LOGGER = 
LoggerFactory.getLogger(CleanAdditionalPropertiesProcessor.class);
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(CleanAdditionalPropertiesProcessor.class);
 
-    private ObjectMapper mapper;
+  private ObjectMapper mapper;
 
-    @Override
-    public String getId() {
-        return STREAMS_ID;
-    }
+  /**
+   * Returns the identifier of this processor.
+   * @return the value of {@link #STREAMS_ID}
+   */
+  @Override
+  public String getId() {
+    return STREAMS_ID;
+  }
 
-    @Override
-    public List<StreamsDatum> process(StreamsDatum datum) {
-        List<StreamsDatum> result = Lists.newLinkedList();
-        ObjectNode activity = this.mapper.convertValue(datum.getDocument(), 
ObjectNode.class);
-        cleanAdditionalProperties(activity);
-        datum.setDocument(activity);
-        result.add(datum);
-        return result;
-    }
+  /**
+   * Converts the datum's document to an ObjectNode, runs
+   * {@link #cleanAdditionalProperties(ObjectNode)} on it, stores the cleaned
+   * node back on the same datum and returns that datum as the only element.
+   * @param datum datum whose document is cleaned in place
+   * @return a single-element list holding {@code datum}
+   */
+  @Override
+  public List<StreamsDatum> process(StreamsDatum datum) {
+    ObjectNode document = this.mapper.convertValue(datum.getDocument(), ObjectNode.class);
+    cleanAdditionalProperties(document);
+    datum.setDocument(document);
+
+    List<StreamsDatum> output = Lists.newLinkedList();
+    output.add(datum);
+    return output;
+  }
 
-    @Override
-    public void prepare(Object o) {
-        this.mapper = StreamsJacksonMapper.getInstance();
-        this.mapper.registerModule(new JsonOrgModule());
-    }
+  /**
+   * Obtains the mapper from StreamsJacksonMapper and registers a
+   * JsonOrgModule on it.
+   * @param configurationObject not read by this implementation
+   */
+  @Override
+  public void prepare(Object configurationObject) {
+    this.mapper = StreamsJacksonMapper.getInstance();
+    // NOTE(review): if getInstance() hands back a shared mapper, this
+    // registration is visible to every other user of it - confirm.
+    this.mapper.registerModule(new JsonOrgModule());
+  }
 
-    @Override
-    public void cleanUp() {
+  @Override
+  public void cleanUp() {
 
-    }
+  }
 
-    public static void cleanAdditionalProperties(ObjectNode node) {
-        if( node.get("additionalProperties") != null ) {
-            ObjectNode additionalProperties = (ObjectNode) 
node.get("additionalProperties");
-            cleanAdditionalProperties(additionalProperties);
-            Iterator<Map.Entry<String, JsonNode>> jsonNodeIterator = 
additionalProperties.fields();
-            while( jsonNodeIterator.hasNext() ) {
-                Map.Entry<String, JsonNode> entry = jsonNodeIterator.next();
-                node.put(entry.getKey(), entry.getValue());
-            }
-        }
+  /**
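+   * Copies every entry found under "additionalProperties" up into the given
+   * node, after first doing the same for any "additionalProperties" object
+   * nested inside it. The "additionalProperties" field is not removed.
+   * @param node ObjectNode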
+   */
+  public static void cleanAdditionalProperties(ObjectNode node) {
+    if ( node.get("additionalProperties") != null ) {
+      ObjectNode additionalProperties = (ObjectNode) 
node.get("additionalProperties");
+      cleanAdditionalProperties(additionalProperties);
+      Iterator<Map.Entry<String, JsonNode>> jsonNodeIterator = 
additionalProperties.fields();
+      while ( jsonNodeIterator.hasNext() ) {
+        Map.Entry<String, JsonNode> entry = jsonNodeIterator.next();
+        node.put(entry.getKey(), entry.getValue());
+      }
     }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-processor-jackson/src/main/java/org/apache/streams/jackson/JsonUtil.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-processor-jackson/src/main/java/org/apache/streams/jackson/JsonUtil.java
 
b/streams-contrib/streams-processor-jackson/src/main/java/org/apache/streams/jackson/JsonUtil.java
new file mode 100644
index 0000000..ac4ff08
--- /dev/null
+++ 
b/streams-contrib/streams-processor-jackson/src/main/java/org/apache/streams/jackson/JsonUtil.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.jackson;
+
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.base.Joiner;
+import com.google.common.base.Splitter;
+import com.google.common.collect.Lists;
+
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * JSON utilities.
+ */
+public class JsonUtil {
+
+  /** Not instantiable: this class only exposes static helpers. */
+  private JsonUtil() {}
+
+  /** Mapper used by every helper in this class; obtained once and never reassigned. */
+  private static final ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+  /** Parser factory taken from {@link #mapper}. */
+  private static final JsonFactory factory = mapper.getFactory();
+
+  /**
+   * Parses JSON text into a tree.
+   * @param json JSON text to parse
+   * @return the root node read by the mapper
+   * @throws RuntimeException wrapping the IOException raised while parsing
+   */
+  public static JsonNode jsonToJsonNode(String json) {
+    try {
+      // createParser is the non-deprecated replacement for createJsonParser
+      // and is what getFromFile already uses.
+      JsonParser jp = factory.createParser(json);
+      try {
+        return mapper.readTree(jp);
+      } finally {
+        // readTree(JsonParser) leaves the parser open; release it here.
+        jp.close();
+      }
+    } catch (IOException e) {
+      throw new RuntimeException("IO exception while reading JSON", e);
+    }
+  }
+
+  /**
+   * Serializes a JSON tree to text.
+   * @param node tree to serialize
+   * @return the JSON text written by the mapper
+   * @throws RuntimeException wrapping the JsonProcessingException raised while writing
+   */
+  public static String jsonNodeToJson(JsonNode node) {
+    final String json;
+    try {
+      json = mapper.writeValueAsString(node);
+    } catch (JsonProcessingException ex) {
+      throw new RuntimeException("IO exception while writing JSON", ex);
+    }
+    return json;
+  }
+
+  /**
+   * Reads JSON text into an instance of the requested class.
+   * @param json JSON text to read
+   * @param clazz target class
+   * @param <T> target type
+   * @return the object produced by the mapper
+   * @throws RuntimeException wrapping the IOException raised while reading
+   */
+  public static <T> T jsonToObject(String json, Class<T> clazz) {
+    try {
+      return mapper.readValue(json, clazz);
+    } catch (IOException e) {
+      // Keep the cause: without it the actual parse/mapping error is lost.
+      throw new RuntimeException("Could not map to object", e);
+    }
+  }
+
+  /**
+   * Converts a JSON tree into an instance of the requested class.
+   * @param node tree to convert
+   * @param clazz target class
+   * @param <T> target type
+   * @return the result of the mapper's convertValue call
+   */
+  public static <T> T jsonNodeToObject(JsonNode node, Class<T> clazz) {
+    T converted = mapper.convertValue(node, clazz);
+    return converted;
+  }
+
+  /**
+   * Converts an object into a JSON tree.
+   * @param obj object to convert
+   * @param <T> type of the object
+   * @return the result of the mapper's valueToTree call
+   */
+  public static <T> JsonNode objectToJsonNode(T obj) {
+    JsonNode tree = mapper.valueToTree(obj);
+    return tree;
+  }
+
+  /**
+   * Converts a JSON tree into a list whose elements are instances of the
+   * requested class.
+   * @param node tree to convert
+   * @param clazz element class
+   * @param <T> element type
+   * @return the converted list
+   */
+  public static <T> List<T> jsoNodeToList(JsonNode node, Class<T> clazz) {
+    // A TypeReference<List<T>> loses T to erasure, so the element class must
+    // be handed to Jackson explicitly for elements to be converted to it.
+    return mapper.convertValue(node,
+        mapper.getTypeFactory().constructCollectionType(List.class, clazz));
+  }
+
+  /**
+   * Serializes an object to JSON text.
+   * @param object object to serialize
+   * @param <T> type of the object
+   * @return the JSON text written by the mapper
+   * @throws RuntimeException wrapping the IOException raised while writing
+   */
+  public static <T> String objectToJson(T object) {
+    try {
+      return mapper.writeValueAsString(object);
+    } catch (IOException e) {
+      // The old message described the read direction and dropped the cause.
+      throw new RuntimeException("Could not write object as JSON", e);
+    }
+  }
+
+  /**
+   * Reads the JSON at a location via {@link #getFromFile(String)} and converts
+   * it with {@link #jsonNodeToObject(JsonNode, Class)}.
+   * @param filePath location passed to getFromFile
+   * @param clazz target class
+   * @param <T> target type
+   * @return the converted object
+   */
+  public static <T> T getObjFromFile(String filePath, Class<T> clazz) {
+    JsonNode tree = getFromFile(filePath);
+    return jsonNodeToObject(tree, clazz);
+  }
+
+  /**
+   * Reads the JSON found at a location into a tree.
+   * @param filePath location resolved by getStreamForLocation
+   * @return the root node read by the mapper
+   * @throws RuntimeException wrapping the IOException raised while opening or reading
+   */
+  public static JsonNode getFromFile(String filePath) {
+    InputStream stream = null;
+    try {
+      stream = getStreamForLocation(filePath);
+      JsonParser jp = mapper.getFactory().createParser(stream);
+      return mapper.readTree(jp);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    } finally {
+      // The stream was previously never closed.
+      if (stream != null) {
+        try {
+          stream.close();
+        } catch (IOException ignored) {
+          // Nothing useful can be done if closing fails after the read.
+        }
+      }
+    }
+  }
+
+  /**
+   * Opens a stream for a location given as a "file:///" or "file:" URL-like
+   * string, an absolute path starting with "/", or otherwise a classpath
+   * resource (with or without a leading "classpath:").
+   * @param filePath location to open
+   * @return an open stream, never null
+   * @throws FileNotFoundException if the file or classpath resource does not exist
+   */
+  private static InputStream getStreamForLocation(String filePath) throws FileNotFoundException {
+    // Only the leading scheme is stripped; String.replace would also remove
+    // the same text from the middle of a path.
+    if (filePath.startsWith("file:///")) {
+      // NOTE(review): as before, all three slashes are dropped, so
+      // "file:///tmp/x" opens the relative path "tmp/x" - confirm intended.
+      return new FileInputStream(filePath.substring("file:///".length()));
+    }
+    if (filePath.startsWith("file:")) {
+      return new FileInputStream(filePath.substring("file:".length()));
+    }
+    if (filePath.startsWith("/")) {
+      return new FileInputStream(filePath);
+    }
+
+    //Assume classpath
+    String resource = filePath.startsWith("classpath:")
+        ? filePath.substring("classpath:".length())
+        : filePath;
+    InputStream stream = JsonUtil.class.getClassLoader().getResourceAsStream(resource);
+    if (stream == null) {
+      // Report a missing resource the same way a missing file is reported.
+      throw new FileNotFoundException("Classpath resource not found: " + resource);
+    }
+    return stream;
+  }
+
+  /**
+   * Returns the array at a dot-separated path, creating it and any missing
+   * intermediate objects when absent.
+   * @param node object to create the array within
+   * @param field dot-separated location of the array, e.g. {@code "a.b.items"}
+   * @return the existing or newly created array at {@code field}
+   * @throws ClassCastException if an existing intermediate value is not an
+   *     object, or the existing value at the last segment is not an array
+   */
+  public static ArrayNode ensureArray(ObjectNode node, String field) {
+    // Splitter always yields at least one segment, so "last" is never negative.
+    List<String> path = Lists.newArrayList(Splitter.on('.').split(field));
+    int last = path.size() - 1;
+    ObjectNode parent = node;
+    // Walk every segment except the last, creating objects as needed.
+    for (String segment : path.subList(0, last)) {
+      if (parent.get(segment) == null) {
+        parent.put(segment, mapper.createObjectNode());
+      }
+      parent = (ObjectNode) parent.get(segment);
+    }
+    String leaf = path.get(last);
+    if (parent.get(leaf) == null) {
+      parent.put(leaf, mapper.createArrayNode());
+    }
+    return (ArrayNode) parent.get(leaf);
+  }
+
+  /**
+   * Returns the object at a dot-separated path, creating it and any missing
+   * intermediate objects when absent.
+   * @param node objectnode to create the object within
+   * @param field dot-separated location of the object, e.g. {@code "a.b.c"}
+   * @return the existing or newly created object at {@code field}
+   * @throws ClassCastException if an existing value on the path is not an object
+   */
+  public static ObjectNode ensureObject(ObjectNode node, String field) {
+    ObjectNode current = node;
+    // Descend one segment at a time. The previous code indexed
+    // path[path.length] and therefore always threw.
+    for (String segment : Splitter.on('.').split(field)) {
+      if (current.get(segment) == null) {
+        current.put(segment, mapper.createObjectNode());
+      }
+      current = (ObjectNode) current.get(segment);
+    }
+    return current;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-processor-jackson/src/main/java/org/apache/streams/jackson/TypeConverterProcessor.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-processor-jackson/src/main/java/org/apache/streams/jackson/TypeConverterProcessor.java
 
b/streams-contrib/streams-processor-jackson/src/main/java/org/apache/streams/jackson/TypeConverterProcessor.java
index 454f99e..4736ee2 100644
--- 
a/streams-contrib/streams-processor-jackson/src/main/java/org/apache/streams/jackson/TypeConverterProcessor.java
+++ 
b/streams-contrib/streams-processor-jackson/src/main/java/org/apache/streams/jackson/TypeConverterProcessor.java
@@ -19,102 +19,123 @@ under the License.
 
 package org.apache.streams.jackson;
 
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProcessor;
+
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.google.common.collect.Lists;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.core.StreamsProcessor;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.io.Serializable;
 import java.util.List;
 
 /**
- *
+ * TypeConverterProcessor changes the JVM type while maintaining
+ * the underlying document.
  */
 public class TypeConverterProcessor implements StreamsProcessor {
 
-    public static final String STREAMS_ID = "TypeConverterProcessor";
-
-    private final static Logger LOGGER = 
LoggerFactory.getLogger(TypeConverterProcessor.class);
-
-    private List<String> formats = Lists.newArrayList();
-
-    private ObjectMapper mapper;
-
-    private Class inClass;
-    private Class outClass;
-
-    public TypeConverterProcessor(Class inClass, Class outClass, ObjectMapper 
mapper) {
-        this.inClass = inClass;
-        this.outClass = outClass;
-        this.mapper = mapper;
+  public static final String STREAMS_ID = "TypeConverterProcessor";
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(TypeConverterProcessor.class);
+
+  private List<String> formats = Lists.newArrayList();
+
+  private ObjectMapper mapper;
+
+  private Class inClass;
+  private Class outClass;
+
+
+  /**
+   * Creates a converter that starts out with the supplied mapper.
+   * @param inClass class stored as the input type
+   * @param outClass class stored as the output type
+   * @param mapper mapper stored on this instance
+   */
+  public TypeConverterProcessor(Class inClass, Class outClass, ObjectMapper mapper) {
+    this(inClass, outClass);
+    this.mapper = mapper;
+  }
+
+  /**
+   * Creates a converter whose mapper is built in prepare() from the given formats.
+   * @param inClass class stored as the input type
+   * @param outClass class stored as the output type
+   * @param formats formats handed to StreamsJacksonMapper in prepare();
+   *     {@code null} is treated as "no formats"
+   */
+  public TypeConverterProcessor(Class inClass, Class outClass, List<String> formats) {
+    this.inClass = inClass;
+    this.outClass = outClass;
+    // prepare() calls formats.size(); keep the default empty list rather than
+    // storing null and failing there.
+    if (formats != null) {
+      this.formats = formats;
+    }
+  }
+
+  /**
+   * Creates a converter with no formats and no mapper supplied up front.
+   * @param inClass class stored as the input type
+   * @param outClass class stored as the output type
+   */
+  public TypeConverterProcessor(Class inClass, Class outClass) {
+    this.outClass = outClass;
+    this.inClass = inClass;
+  }
+
+  /**
+   * Returns the identifier of this processor.
+   * @return the value of {@link #STREAMS_ID}
+   */
+  @Override
+  public String getId() {
+    return STREAMS_ID;
+  }
+
+  @Override
+  public List<StreamsDatum> process(StreamsDatum entry) {
+    List<StreamsDatum> result = Lists.newLinkedList();
+    Object inDoc = entry.getDocument();
+    ObjectNode node = null;
+    if ( inClass == String.class
+          || inDoc instanceof String ) {
+      try {
+        node = this.mapper.readValue((String)entry.getDocument(), 
ObjectNode.class);
+      } catch (IOException ex) {
+        ex.printStackTrace();
+      }
+    } else {
+      node = this.mapper.convertValue(inDoc, ObjectNode.class);
     }
 
-    public TypeConverterProcessor(Class inClass, Class outClass, List<String> 
formats) {
-        this.inClass = inClass;
-        this.outClass = outClass;
-        this.formats = formats;
-    }
-
-    public TypeConverterProcessor(Class inClass, Class outClass) {
-        this.inClass = inClass;
-        this.outClass = outClass;
-    }
-
-    @Override
-    public String getId() {
-        return STREAMS_ID;
-    }
-
-    @Override
-    public List<StreamsDatum> process(StreamsDatum entry) {
-        List<StreamsDatum> result = Lists.newLinkedList();
-        Object inDoc = entry.getDocument();
-        ObjectNode node = null;
-        if( inClass == String.class ||
-            inDoc instanceof String ) {
-            try {
-                node = this.mapper.readValue((String)entry.getDocument(), 
ObjectNode.class);
-            } catch (IOException e) {
-                e.printStackTrace();
-            }
+    if (node != null) {
+      Object outDoc;
+      try {
+        if ( outClass == String.class ) {
+          outDoc = this.mapper.writeValueAsString(node);
         } else {
-            node = this.mapper.convertValue(inDoc, ObjectNode.class);
+          outDoc = this.mapper.convertValue(node, outClass);
         }
-
-        if(node != null) {
-            Object outDoc;
-            try {
-                if( outClass == String.class )
-                    outDoc = this.mapper.writeValueAsString(node);
-                else
-                    outDoc = this.mapper.convertValue(node, outClass);
-
-                StreamsDatum outDatum = new StreamsDatum(outDoc, 
entry.getId(), entry.getTimestamp(), entry.getSequenceid());
-                outDatum.setMetadata(entry.getMetadata());
-                result.add(outDatum);
-            } catch (Throwable e) {
-                LOGGER.warn(e.getMessage());
-                LOGGER.warn(node.toString());
-            }
-        }
-
-        return result;
+        StreamsDatum outDatum = new StreamsDatum(outDoc, entry.getId(), 
entry.getTimestamp(), entry.getSequenceid());
+        outDatum.setMetadata(entry.getMetadata());
+        result.add(outDatum);
+      } catch (Throwable ex) {
+        LOGGER.warn(ex.getMessage());
+        LOGGER.warn(node.toString());
+      }
     }
 
-    @Override
-    public void prepare(Object configurationObject) {
-        if( formats.size() > 0 )
-            this.mapper = StreamsJacksonMapper.getInstance(formats);
-        else
-            this.mapper = StreamsJacksonMapper.getInstance();
+    return result;
+  }
+
+  @Override
+  public void prepare(Object configurationObject) {
+    if ( formats.size() > 0 ) {
+      this.mapper = StreamsJacksonMapper.getInstance(formats);
+    } else {
+      this.mapper = StreamsJacksonMapper.getInstance();
     }
+  }
 
-    @Override
-    public void cleanUp() {
+  @Override
+  public void cleanUp() {
 
-    }
-};
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-processor-jackson/src/test/java/org/apache/streams/jackson/test/TypeConverterProcessorTest.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-processor-jackson/src/test/java/org/apache/streams/jackson/test/TypeConverterProcessorTest.java
 
b/streams-contrib/streams-processor-jackson/src/test/java/org/apache/streams/jackson/test/TypeConverterProcessorTest.java
index 1316d5c..e0759c3 100644
--- 
a/streams-contrib/streams-processor-jackson/src/test/java/org/apache/streams/jackson/test/TypeConverterProcessorTest.java
+++ 
b/streams-contrib/streams-processor-jackson/src/test/java/org/apache/streams/jackson/test/TypeConverterProcessorTest.java
@@ -18,78 +18,80 @@
 
 package org.apache.streams.jackson.test;
 
-import com.fasterxml.jackson.core.JsonParseException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.google.common.collect.Lists;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsProcessor;
 import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.apache.streams.jackson.TypeConverterProcessor;
-import org.apache.streams.pojo.json.Activity;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.collect.Lists;
+
 import org.junit.Test;
 
 import java.io.IOException;
 import java.util.List;
 
-import static junit.framework.Assert.*;
+import static junit.framework.Assert.assertEquals;
+import static junit.framework.Assert.assertNotNull;
+import static junit.framework.Assert.assertTrue;
 
 /**
  *
  */
 public class TypeConverterProcessorTest {
 
-    private static final String DATASIFT_JSON = 
"{\"demographic\":{\"gender\":\"female\"},\"interaction\":{\"schema\":{\"version\":3},\"source\":\"Twitter
 for Android\",\"author\":{\"username\":\"ViiOLeee\",\"name\":\"Violeta 
Anguita\",\"id\":70931384,\"avatar\":\"http://pbs.twimg.com/profile_images/378800000851401229/bbf480cde2e9923a1d20acd393da0212_normal.jpeg\",\"link\":\"http://twitter.com/ViiOLeee\",\"language\":\"en\"},\"type\":\"twitter\",\"created_at\":\"Tue,
 27 May 2014 22:38:15 +0000\",\"received_at\":1.401230295658E9,\"content\":\"RT 
@AliiAnguita: \\\"@Pharrell: Loved working with @edsheeran on Sing. He's a 
genius. https://t.co/wB2qKyJMRw\\\"; @ViiOLeee  look at 
this!\",\"id\":\"1e3e5ef97532a580e0741841f5746728\",\"link\":\"http://twitter.com/ViiOLeee/status/471420141989666817\",\"mentions\":[\"Pharrell\",\"edsheeran\",\"ViiOLeee\",\"AliiAnguita\"],\"mention_ids\":[338084918,85452649,70931384]},\"klout\":{\"score\":34},\"language\":{\"tag\":\"en\",\"tag_extended\":\"en\",\
 "confidence\":98},\"links\":{\"code\":[200],\"created_at\":[\"Tue, 27 May 2014 
14:28:06 
+0000\"],\"meta\":{\"charset\":[\"UTF-8\"],\"content_type\":[\"text/html\"],\"description\":[\"Official
 Video for Ed Sheeran&#39;s track SING Get this track on iTunes: 
http://smarturl.it/EdSing Pre-order &#39;x&#39; on iTunes and get &#39;One&#39; 
instantly: http://smartu...\"],\"keywords\":[[\"ed sheeran\",\"ed sheeran 
sing\",\"ed sheeran new album\",\"Ed Sheeran (Musical Artist)\",\"ed sheeran 
one\",\"ed sheeran fault in our stars\",\"ed sheeran all of the 
stars\",\"s...\"]],\"lang\":[\"en\"],\"opengraph\":[{\"site_name\":\"YouTube\",\"url\":\"http://www.youtube.com/watch?v=tlYcUqEPN58\",\"title\":\"Ed
 Sheeran - SING [Official 
Video]\",\"image\":\"https://i1.ytimg.com/vi/tlYcUqEPN58/maxresdefault.jpg\",\"description\":\"Official
 Video for Ed Sheeran&#39;s track SING Get this track on iTunes: 
http://smarturl.it/EdSing Pre-order &#39;x&#39; on iTunes and get &#39;One&#39; 
instantly: http://smartu
 
...\",\"type\":\"video\"}],\"twitter\":[{\"card\":\"player\",\"site\":\"@youtube\",\"url\":\"http://www.youtube.com/watch?v=tlYcUqEPN58\",\"title\":\"Ed
 Sheeran - SING [Official Video]\",\"description\":\"Official Video for Ed 
Sheeran&#39;s track SING Get this track on iTunes: http://smarturl.it/EdSing 
Pre-order &#39;x&#39; on iTunes and get &#39;One&#39; instantly: 
http://smartu...\",\"image\":\"https://i1.ytimg.com/vi/tlYcUqEPN58/maxresdefault.jpg\",\"app\":{\"iphone\":{\"name\":\"YouTube\",\"id\":\"544007664\",\"url\":\"vnd.youtube://watch/tlYcUqEPN58\"},\"ipad\":{\"name\":\"YouTube\",\"id\":\"544007664\",\"url\":\"vnd.youtube://watch/tlYcUqEPN58\"},\"googleplay\":{\"name\":\"YouTube\",\"id\":\"com.google.android.youtube\",\"url\":\"http://www.youtube.com/watch?v=tlYcUqEPN58\"}},\"player\":\"https://www.youtube.com/embed/tlYcUqEPN58\",\"player_width\":\"1280\",\"player_height\":\"720\"}]},\"normalized_url\":[\"https://youtube.com/watch?v=tlYcUqEPN58\"],\"retweet_count\":[0],\"tit
 le\":[\"Ed Sheeran - SING [Official Video] - 
YouTube\"],\"url\":[\"https://www.youtube.com/watch?v=tlYcUqEPN58\"]},\"twitter\":{\"id\":\"471420141989666817\",\"retweet\":{\"text\":\"\\\"@Pharrell:
 Loved working with @edsheeran on Sing. He's a genius. 
https://t.co/wB2qKyJMRw\\\"; @ViiOLeee  look at 
this!\",\"id\":\"471420141989666817\",\"user\":{\"name\":\"Violeta 
Anguita\",\"description\":\"La vida no seria la fiesta que todos esperamos, 
pero mientras estemos aqui debemos BAILAR!!! 
#ErasmusOnceErasmusForever\",\"location\":\"Espanhaa..Olaa!\",\"statuses_count\":5882,\"followers_count\":249,\"friends_count\":1090,\"screen_name\":\"ViiOLeee\",\"profile_image_url\":\"http://pbs.twimg.com/profile_images/378800000851401229/bbf480cde2e9923a1d20acd393da0212_normal.jpeg\",\"profile_image_url_https\":\"https://pbs.twimg.com/profile_images/378800000851401229/bbf480cde2e9923a1d20acd393da0212_normal.jpeg\",\"lang\":\"en\",\"time_zone\":\"Madrid\",\"utc_offset\":7200,\"listed_count\":1,\"id\":709
 
31384,\"id_str\":\"70931384\",\"geo_enabled\":false,\"verified\":false,\"favourites_count\":275,\"created_at\":\"Wed,
 02 Sep 2009 10:19:59 +0000\"},\"source\":\"<a 
href=\\\"http://twitter.com/download/android\\\"; rel=\\\"nofollow\\\">Twitter 
for Android</a>\",\"count\":1,\"created_at\":\"Tue, 27 May 2014 22:38:15 
+0000\",\"mentions\":[\"Pharrell\",\"edsheeran\",\"ViiOLeee\",\"AliiAnguita\"],\"mention_ids\":[338084918,85452649,70931384],\"links\":[\"https://www.youtube.com/watch?v=tlYcUqEPN58\"],\"display_urls\":[\"youtube.com/watch?v=tlYcUq���\"],\"domains\":[\"www.youtube.com\"],\"lang\":\"en\"},\"retweeted\":{\"id\":\"471419867078209536\",\"user\":{\"name\":\"Alicia
 Anguita \",\"description\":\"Estudiante de Ingenieria de la Edificaci��n 
en 
Granada.\",\"statuses_count\":371,\"followers_count\":185,\"friends_count\":404,\"screen_name\":\"AliiAnguita\",\"profile_image_url\":\"http://pbs.twimg.com/profile_images/424248659677442048/qCPZL8c9_normal.jpeg\",\"profile_image_url_
 
https\":\"https://pbs.twimg.com/profile_images/424248659677442048/qCPZL8c9_normal.jpeg\",\"lang\":\"es\",\"listed_count\":0,\"id\":561201891,\"id_str\":\"561201891\",\"geo_enabled\":false,\"verified\":false,\"favourites_count\":17,\"created_at\":\"Mon,
 23 Apr 2012 13:11:44 +0000\"},\"source\":\"<a 
href=\\\"http://twitter.com/download/android\\\"; rel=\\\"nofollow\\\">Twitter 
for Android</a>\",\"created_at\":\"Tue, 27 May 2014 22:37:09 +0000\"}}}";
+  private static final String DATASIFT_JSON = 
"{\"demographic\":{\"gender\":\"female\"},\"interaction\":{\"schema\":{\"version\":3},\"source\":\"Twitter
 for Android\",\"author\":{\"username\":\"ViiOLeee\",\"name\":\"Violeta 
Anguita\",\"id\":70931384,\"avatar\":\"http://pbs.twimg.com/profile_images/378800000851401229/bbf480cde2e9923a1d20acd393da0212_normal.jpeg\",\"link\":\"http://twitter.com/ViiOLeee\",\"language\":\"en\"},\"type\":\"twitter\",\"created_at\":\"Tue,
 27 May 2014 22:38:15 +0000\",\"received_at\":1.401230295658E9,\"content\":\"RT 
@AliiAnguita: \\\"@Pharrell: Loved working with @edsheeran on Sing. He's a 
genius. https://t.co/wB2qKyJMRw\\\"; @ViiOLeee  look at 
this!\",\"id\":\"1e3e5ef97532a580e0741841f5746728\",\"link\":\"http://twitter.com/ViiOLeee/status/471420141989666817\",\"mentions\":[\"Pharrell\",\"edsheeran\",\"ViiOLeee\",\"AliiAnguita\"],\"mention_ids\":[338084918,85452649,70931384]},\"klout\":{\"score\":34},\"language\":{\"tag\":\"en\",\"tag_extended\":\"en\",\"c
 onfidence\":98},\"links\":{\"code\":[200],\"created_at\":[\"Tue, 27 May 2014 
14:28:06 
+0000\"],\"meta\":{\"charset\":[\"UTF-8\"],\"content_type\":[\"text/html\"],\"description\":[\"Official
 Video for Ed Sheeran&#39;s track SING Get this track on iTunes: 
http://smarturl.it/EdSing Pre-order &#39;x&#39; on iTunes and get &#39;One&#39; 
instantly: http://smartu...\"],\"keywords\":[[\"ed sheeran\",\"ed sheeran 
sing\",\"ed sheeran new album\",\"Ed Sheeran (Musical Artist)\",\"ed sheeran 
one\",\"ed sheeran fault in our stars\",\"ed sheeran all of the 
stars\",\"s...\"]],\"lang\":[\"en\"],\"opengraph\":[{\"site_name\":\"YouTube\",\"url\":\"http://www.youtube.com/watch?v=tlYcUqEPN58\",\"title\":\"Ed
 Sheeran - SING [Official 
Video]\",\"image\":\"https://i1.ytimg.com/vi/tlYcUqEPN58/maxresdefault.jpg\",\"description\":\"Official
 Video for Ed Sheeran&#39;s track SING Get this track on iTunes: 
http://smarturl.it/EdSing Pre-order &#39;x&#39; on iTunes and get &#39;One&#39; 
instantly: http://smartu..
 
.\",\"type\":\"video\"}],\"twitter\":[{\"card\":\"player\",\"site\":\"@youtube\",\"url\":\"http://www.youtube.com/watch?v=tlYcUqEPN58\",\"title\":\"Ed
 Sheeran - SING [Official Video]\",\"description\":\"Official Video for Ed 
Sheeran&#39;s track SING Get this track on iTunes: http://smarturl.it/EdSing 
Pre-order &#39;x&#39; on iTunes and get &#39;One&#39; instantly: 
http://smartu...\",\"image\":\"https://i1.ytimg.com/vi/tlYcUqEPN58/maxresdefault.jpg\",\"app\":{\"iphone\":{\"name\":\"YouTube\",\"id\":\"544007664\",\"url\":\"vnd.youtube://watch/tlYcUqEPN58\"},\"ipad\":{\"name\":\"YouTube\",\"id\":\"544007664\",\"url\":\"vnd.youtube://watch/tlYcUqEPN58\"},\"googleplay\":{\"name\":\"YouTube\",\"id\":\"com.google.android.youtube\",\"url\":\"http://www.youtube.com/watch?v=tlYcUqEPN58\"}},\"player\":\"https://www.youtube.com/embed/tlYcUqEPN58\",\"player_width\":\"1280\",\"player_height\":\"720\"}]},\"normalized_url\":[\"https://youtube.com/watch?v=tlYcUqEPN58\"],\"retweet_count\":[0],\"title
 \":[\"Ed Sheeran - SING [Official Video] - 
YouTube\"],\"url\":[\"https://www.youtube.com/watch?v=tlYcUqEPN58\"]},\"twitter\":{\"id\":\"471420141989666817\",\"retweet\":{\"text\":\"\\\"@Pharrell:
 Loved working with @edsheeran on Sing. He's a genius. 
https://t.co/wB2qKyJMRw\\\"; @ViiOLeee  look at 
this!\",\"id\":\"471420141989666817\",\"user\":{\"name\":\"Violeta 
Anguita\",\"description\":\"La vida no seria la fiesta que todos esperamos, 
pero mientras estemos aqui debemos BAILAR!!! 
#ErasmusOnceErasmusForever\",\"location\":\"Espanhaa..Olaa!\",\"statuses_count\":5882,\"followers_count\":249,\"friends_count\":1090,\"screen_name\":\"ViiOLeee\",\"profile_image_url\":\"http://pbs.twimg.com/profile_images/378800000851401229/bbf480cde2e9923a1d20acd393da0212_normal.jpeg\",\"profile_image_url_https\":\"https://pbs.twimg.com/profile_images/378800000851401229/bbf480cde2e9923a1d20acd393da0212_normal.jpeg\",\"lang\":\"en\",\"time_zone\":\"Madrid\",\"utc_offset\":7200,\"listed_count\":1,\"id\":70931
 
384,\"id_str\":\"70931384\",\"geo_enabled\":false,\"verified\":false,\"favourites_count\":275,\"created_at\":\"Wed,
 02 Sep 2009 10:19:59 +0000\"},\"source\":\"<a 
href=\\\"http://twitter.com/download/android\\\" rel=\\\"nofollow\\\">Twitter 
for Android</a>\",\"count\":1,\"created_at\":\"Tue, 27 May 2014 22:38:15 
+0000\",\"mentions\":[\"Pharrell\",\"edsheeran\",\"ViiOLeee\",\"AliiAnguita\"],\"mention_ids\":[338084918,85452649,70931384],\"links\":[\"https://www.youtube.com/watch?v=tlYcUqEPN58\"],\"display_urls\":[\"youtube.com/watch?v=tlYcUq…\"],\"domains\":[\"www.youtube.com\"],\"lang\":\"en\"},\"retweeted\":{\"id\":\"471419867078209536\",\"user\":{\"name\":\"Alicia
 Anguita \",\"description\":\"Estudiante de Ingenieria de la Edificación 
en 
Granada.\",\"statuses_count\":371,\"followers_count\":185,\"friends_count\":404,\"screen_name\":\"AliiAnguita\",\"profile_image_url\":\"http://pbs.twimg.com/profile_images/424248659677442048/qCPZL8c9_normal.jpeg\",\"profile_image_url_ht
 
tps\":\"https://pbs.twimg.com/profile_images/424248659677442048/qCPZL8c9_normal.jpeg\",\"lang\":\"es\",\"listed_count\":0,\"id\":561201891,\"id_str\":\"561201891\",\"geo_enabled\":false,\"verified\":false,\"favourites_count\":17,\"created_at\":\"Mon,
 23 Apr 2012 13:11:44 +0000\"},\"source\":\"<a 
href=\\\"http://twitter.com/download/android\\\" rel=\\\"nofollow\\\">Twitter 
for Android</a>\",\"created_at\":\"Tue, 27 May 2014 22:37:09 +0000\"}}}";
 
-    public static final String DATASIFT_FORMAT = "EEE, dd MMM yyyy HH:mm:ss Z";
+  public static final String DATASIFT_FORMAT = "EEE, dd MMM yyyy HH:mm:ss Z";
 
-    @Test
-    public void testTypeConverterStringToString() {
-        final String ID = "1";
-        StreamsProcessor processor = new TypeConverterProcessor(String.class, 
String.class, Lists.newArrayList(DATASIFT_FORMAT));
-        processor.prepare(null);
-        StreamsDatum datum = new StreamsDatum(DATASIFT_JSON, ID);
-        List<StreamsDatum> result = processor.process(datum);
-        assertNotNull(result);
-        assertEquals(1, result.size());
-        StreamsDatum resultDatum = result.get(0);
-        assertNotNull(resultDatum);
-        assertNotNull(resultDatum.getDocument());
-        assertTrue(resultDatum.getDocument() instanceof String);
-        assertEquals(ID, resultDatum.getId());
-    }
+  @Test
+  public void testTypeConverterStringToString() {
+    final String ID = "1";
+    StreamsProcessor processor = new TypeConverterProcessor(String.class, 
String.class, Lists.newArrayList(DATASIFT_FORMAT));
+    processor.prepare(null);
+    StreamsDatum datum = new StreamsDatum(DATASIFT_JSON, ID);
+    List<StreamsDatum> result = processor.process(datum);
+    assertNotNull(result);
+    assertEquals(1, result.size());
+    StreamsDatum resultDatum = result.get(0);
+    assertNotNull(resultDatum);
+    assertNotNull(resultDatum.getDocument());
+    assertTrue(resultDatum.getDocument() instanceof String);
+    assertEquals(ID, resultDatum.getId());
+  }
 
-    @Test
-    public void testTypeConverterStringToObjectNode() {
-        final String ID = "1";
-        StreamsProcessor processor = new TypeConverterProcessor(String.class, 
ObjectNode.class, Lists.newArrayList(DATASIFT_FORMAT));
-        processor.prepare(null);
-        StreamsDatum datum = new StreamsDatum(DATASIFT_JSON, ID);
-        List<StreamsDatum> result = processor.process(datum);
-        assertNotNull(result);
-        assertEquals(1, result.size());
-        StreamsDatum resultDatum = result.get(0);
-        assertNotNull(resultDatum);
-        assertNotNull(resultDatum.getDocument());
-        assertTrue(resultDatum.getDocument() instanceof ObjectNode);
-        assertEquals(ID, resultDatum.getId());
-    }
+  @Test
+  public void testTypeConverterStringToObjectNode() {
+    final String ID = "1";
+    StreamsProcessor processor = new TypeConverterProcessor(String.class, 
ObjectNode.class, Lists.newArrayList(DATASIFT_FORMAT));
+    processor.prepare(null);
+    StreamsDatum datum = new StreamsDatum(DATASIFT_JSON, ID);
+    List<StreamsDatum> result = processor.process(datum);
+    assertNotNull(result);
+    assertEquals(1, result.size());
+    StreamsDatum resultDatum = result.get(0);
+    assertNotNull(resultDatum);
+    assertNotNull(resultDatum.getDocument());
+    assertTrue(resultDatum.getDocument() instanceof ObjectNode);
+    assertEquals(ID, resultDatum.getId());
+  }
 
-    @Test
-    public void testTypeConverterObjectNodeToString() throws IOException {
-        final String ID = "1";
-        StreamsProcessor processor = new 
TypeConverterProcessor(ObjectNode.class, String.class, 
Lists.newArrayList(DATASIFT_FORMAT));
-        processor.prepare(null);
-        ObjectMapper mapper = 
StreamsJacksonMapper.getInstance(Lists.newArrayList(DATASIFT_FORMAT));
-        ObjectNode node = mapper.readValue(DATASIFT_JSON, ObjectNode.class);
-        StreamsDatum datum = new StreamsDatum(node, ID);
-        List<StreamsDatum> result = processor.process(datum);
-        assertNotNull(result);
-        assertEquals(1, result.size());
-        StreamsDatum resultDatum = result.get(0);
-        assertNotNull(resultDatum);
-        assertNotNull(resultDatum.getDocument());
-        assertTrue(resultDatum.getDocument() instanceof String);
-        assertEquals(ID, resultDatum.getId());
-    }
+  @Test
+  public void testTypeConverterObjectNodeToString() throws IOException {
+    final String ID = "1";
+    StreamsProcessor processor = new TypeConverterProcessor(ObjectNode.class, 
String.class, Lists.newArrayList(DATASIFT_FORMAT));
+    processor.prepare(null);
+    ObjectMapper mapper = 
StreamsJacksonMapper.getInstance(Lists.newArrayList(DATASIFT_FORMAT));
+    ObjectNode node = mapper.readValue(DATASIFT_JSON, ObjectNode.class);
+    StreamsDatum datum = new StreamsDatum(node, ID);
+    List<StreamsDatum> result = processor.process(datum);
+    assertNotNull(result);
+    assertEquals(1, result.size());
+    StreamsDatum resultDatum = result.get(0);
+    assertNotNull(resultDatum);
+    assertNotNull(resultDatum.getDocument());
+    assertTrue(resultDatum.getDocument() instanceof String);
+    assertEquals(ID, resultDatum.getId());
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-processor-json/src/main/java/org/apache/streams/json/JsonPathExtractor.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-processor-json/src/main/java/org/apache/streams/json/JsonPathExtractor.java
 
b/streams-contrib/streams-processor-json/src/main/java/org/apache/streams/json/JsonPathExtractor.java
index 24288f1..c2c3705 100644
--- 
a/streams-contrib/streams-processor-json/src/main/java/org/apache/streams/json/JsonPathExtractor.java
+++ 
b/streams-contrib/streams-processor-json/src/main/java/org/apache/streams/json/JsonPathExtractor.java
@@ -18,18 +18,21 @@
 
 package org.apache.streams.json;
 
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProcessor;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.fasterxml.jackson.datatype.jsonorg.JsonOrgModule;
 import com.google.common.collect.Lists;
 import com.jayway.jsonpath.JsonPath;
+
 import net.minidev.json.JSONArray;
 import net.minidev.json.JSONObject;
+
 import org.apache.commons.lang3.StringUtils;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.core.StreamsProcessor;
-import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -38,118 +41,118 @@ import java.util.List;
 
 /**
  * Provides a base implementation for extracting json fields and
- * objects from datums using JsonPath syntax
+ * objects from datums using JsonPath syntax.
  */
 public class JsonPathExtractor implements StreamsProcessor {
 
-    private final static String STREAMS_ID = "JsonPathExtractor";
+  private static final String STREAMS_ID = "JsonPathExtractor";
 
-    private final static Logger LOGGER = 
LoggerFactory.getLogger(JsonPathExtractor.class);
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(JsonPathExtractor.class);
 
-    private ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+  private ObjectMapper mapper = StreamsJacksonMapper.getInstance();
 
-    private String pathExpression;
-    private JsonPath jsonPath;
+  private String pathExpression;
+  private JsonPath jsonPath;
 
-    public JsonPathExtractor() {
-        LOGGER.info("creating JsonPathExtractor");
-    }
+  public JsonPathExtractor() {
+    LOGGER.info("creating JsonPathExtractor");
+  }
 
-    public JsonPathExtractor(String pathExpression) {
-        this.pathExpression = pathExpression;
-        LOGGER.info("creating JsonPathExtractor for " + this.pathExpression);
-    }
+  public JsonPathExtractor(String pathExpression) {
+    this.pathExpression = pathExpression;
+    LOGGER.info("creating JsonPathExtractor for " + this.pathExpression);
+  }
 
-    @Override
-    public String getId() {
-        return STREAMS_ID;
-    }
+  @Override
+  public String getId() {
+    return STREAMS_ID;
+  }
 
-    @Override
-    public List<StreamsDatum> process(StreamsDatum entry) {
+  @Override
+  public List<StreamsDatum> process(StreamsDatum entry) {
 
-        List<StreamsDatum> result = Lists.newArrayList();
+    List<StreamsDatum> result = Lists.newArrayList();
 
-        String json = null;
+    String json = null;
 
-        LOGGER.debug("{} processing {}", STREAMS_ID);
+    LOGGER.debug("{} processing {}", STREAMS_ID);
 
-        if( entry.getDocument() instanceof ObjectNode ) {
-            ObjectNode node = (ObjectNode) entry.getDocument();
-            try {
-                json = mapper.writeValueAsString(node);
-            } catch (JsonProcessingException e) {
-                LOGGER.warn(e.getMessage());
-            }
-        } else if( entry.getDocument() instanceof String ) {
-            json = (String) entry.getDocument();
-        }
+    if ( entry.getDocument() instanceof ObjectNode ) {
+      ObjectNode node = (ObjectNode) entry.getDocument();
+      try {
+        json = mapper.writeValueAsString(node);
+      } catch (JsonProcessingException ex) {
+        LOGGER.warn(ex.getMessage());
+      }
+    } else if ( entry.getDocument() instanceof String ) {
+      json = (String) entry.getDocument();
+    }
 
-        if( StringUtils.isNotEmpty(json)) {
-
-            try {
-                Object readResult = jsonPath.read(json);
-
-                if (readResult instanceof String) {
-                    String match = (String) readResult;
-                    LOGGER.info("Matched String: " + match);
-                    StreamsDatum matchDatum = new StreamsDatum(match);
-                    result.add(matchDatum);
-                } else if (readResult instanceof JSONObject) {
-                    JSONObject match = (JSONObject) readResult;
-                    LOGGER.info("Matched Object: " + match);
-                    ObjectNode objectNode = 
mapper.readValue(mapper.writeValueAsString(match), ObjectNode.class);
-                    StreamsDatum matchDatum = new StreamsDatum(objectNode);
-                    result.add(matchDatum);
-                } else if (readResult instanceof JSONArray) {
-                    LOGGER.info("Matched Array:");
-                    JSONArray array = (JSONArray) readResult;
-                    Iterator iterator = array.iterator();
-                    while (iterator.hasNext()) {
-                        Object item = iterator.next();
-                        if( item instanceof String ) {
-                            LOGGER.info("String Item:" + item);
-                            String match = (String) item;
-                            StreamsDatum matchDatum = new StreamsDatum(match);
-                            result.add(matchDatum);
-                        } else if ( item instanceof JSONObject ) {
-                            LOGGER.info("Object Item:" + item);
-                            JSONObject match = (JSONObject) item;
-                            ObjectNode objectNode = 
mapper.readValue(mapper.writeValueAsString(match), ObjectNode.class);
-                            StreamsDatum matchDatum = new 
StreamsDatum(objectNode);
-                            result.add(matchDatum);
-                        } else {
-                            LOGGER.info("Other Item:" + item.toString());
-                        }
-                    }
-                } else {
-                    LOGGER.info("Other Match:" + readResult.toString());
-                }
-
-            } catch( Exception e ) {
-                LOGGER.warn(e.getMessage());
+    if ( StringUtils.isNotEmpty(json)) {
+
+      try {
+        Object readResult = jsonPath.read(json);
+
+        if (readResult instanceof String) {
+          String match = (String) readResult;
+          LOGGER.info("Matched String: " + match);
+          StreamsDatum matchDatum = new StreamsDatum(match);
+          result.add(matchDatum);
+        } else if (readResult instanceof JSONObject) {
+          JSONObject match = (JSONObject) readResult;
+          LOGGER.info("Matched Object: " + match);
+          ObjectNode objectNode = 
mapper.readValue(mapper.writeValueAsString(match), ObjectNode.class);
+          StreamsDatum matchDatum = new StreamsDatum(objectNode);
+          result.add(matchDatum);
+        } else if (readResult instanceof JSONArray) {
+          LOGGER.info("Matched Array:");
+          JSONArray array = (JSONArray) readResult;
+          Iterator iterator = array.iterator();
+          while (iterator.hasNext()) {
+            Object item = iterator.next();
+            if ( item instanceof String ) {
+              LOGGER.info("String Item:" + item);
+              String match = (String) item;
+              StreamsDatum matchDatum = new StreamsDatum(match);
+              result.add(matchDatum);
+            } else if ( item instanceof JSONObject ) {
+              LOGGER.info("Object Item:" + item);
+              JSONObject match = (JSONObject) item;
+              ObjectNode objectNode = 
mapper.readValue(mapper.writeValueAsString(match), ObjectNode.class);
+              StreamsDatum matchDatum = new StreamsDatum(objectNode);
+              result.add(matchDatum);
+            } else {
+              LOGGER.info("Other Item:" + item.toString());
             }
-
+          }
         } else {
-            LOGGER.warn("result empty");
+          LOGGER.info("Other Match:" + readResult.toString());
         }
 
-        return result;
+      } catch ( Exception ex ) {
+        LOGGER.warn(ex.getMessage());
+      }
 
+    } else {
+      LOGGER.warn("result empty");
     }
 
-    @Override
-    public void prepare(Object configurationObject) {
-        if( configurationObject instanceof String )
-            jsonPath = JsonPath.compile((String)(configurationObject));
-        else if( configurationObject instanceof String[] )
-            jsonPath = JsonPath.compile(((String[])(configurationObject))[0]);
+    return result;
 
-        mapper.registerModule(new JsonOrgModule());
-    }
+  }
 
-    @Override
-    public void cleanUp() {
-        LOGGER.info("shutting down JsonPathExtractor for " + 
this.pathExpression);
+  @Override
+  public void prepare(Object configurationObject) {
+    if ( configurationObject instanceof String ) {
+      jsonPath = JsonPath.compile((String) (configurationObject));
+    } else if ( configurationObject instanceof String[] ) {
+      jsonPath = JsonPath.compile(((String[]) (configurationObject))[0]);
     }
-};
+    mapper.registerModule(new JsonOrgModule());
+  }
+
+  @Override
+  public void cleanUp() {
+    LOGGER.info("shutting down JsonPathExtractor for " + this.pathExpression);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-processor-json/src/main/java/org/apache/streams/json/JsonPathFilter.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-processor-json/src/main/java/org/apache/streams/json/JsonPathFilter.java
 
b/streams-contrib/streams-processor-json/src/main/java/org/apache/streams/json/JsonPathFilter.java
index fcf34d7..ec741c2 100644
--- 
a/streams-contrib/streams-processor-json/src/main/java/org/apache/streams/json/JsonPathFilter.java
+++ 
b/streams-contrib/streams-processor-json/src/main/java/org/apache/streams/json/JsonPathFilter.java
@@ -19,6 +19,10 @@
 
 package org.apache.streams.json;
 
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProcessor;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
@@ -28,12 +32,11 @@ import com.fasterxml.jackson.datatype.jsonorg.JsonOrgModule;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.jayway.jsonpath.JsonPath;
+
 import net.minidev.json.JSONArray;
 import net.minidev.json.JSONObject;
+
 import org.apache.commons.lang3.StringUtils;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.core.StreamsProcessor;
-import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -43,134 +46,133 @@ import java.util.Map;
 
 /**
  * Provides a base implementation for filtering datums which
- * do not contain specific fields using JsonPath syntax
+ * do not contain specific fields using JsonPath syntax.
  */
 public class JsonPathFilter implements StreamsProcessor {
 
-    private final static String STREAMS_ID = "JsonPathFilter";
+  private static final String STREAMS_ID = "JsonPathFilter";
 
-    private final static Logger LOGGER = 
LoggerFactory.getLogger(JsonPathFilter.class);
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(JsonPathFilter.class);
 
-    private ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+  private ObjectMapper mapper = StreamsJacksonMapper.getInstance();
 
-    private String pathExpression;
-    private JsonPath jsonPath;
-    private String destNodeName;
-
-    public JsonPathFilter() {
-        LOGGER.info("creating JsonPathFilter");
-    }
+  private String pathExpression;
+  private JsonPath jsonPath;
+  private String destNodeName;
 
-    public JsonPathFilter(String pathExpression) {
-        this.pathExpression = pathExpression;
-        LOGGER.info("creating JsonPathFilter for " + this.pathExpression);
-    }
+  public JsonPathFilter() {
+    LOGGER.info("creating JsonPathFilter");
+  }
 
-    @Override
-    public String getId() {
-        return STREAMS_ID;
-    }
+  public JsonPathFilter(String pathExpression) {
+    this.pathExpression = pathExpression;
+    LOGGER.info("creating JsonPathFilter for " + this.pathExpression);
+  }
 
-    @Override
-    public List<StreamsDatum> process(StreamsDatum entry) {
+  @Override
+  public String getId() {
+    return STREAMS_ID;
+  }
 
-        List<StreamsDatum> result = Lists.newArrayList();
+  @Override
+  public List<StreamsDatum> process(StreamsDatum entry) {
 
-        String json = null;
+    List<StreamsDatum> result = Lists.newArrayList();
 
-        ObjectNode document = null;
+    String json = null;
 
-        LOGGER.debug("{} processing {}", STREAMS_ID);
+    ObjectNode document = null;
 
-        if( entry.getDocument() instanceof ObjectNode ) {
-            document = (ObjectNode) entry.getDocument();
-            try {
-                json = mapper.writeValueAsString(document);
-            } catch (JsonProcessingException e) {
-                e.printStackTrace();
-            }
-        } else if( entry.getDocument() instanceof String ) {
-            json = (String) entry.getDocument();
-            try {
-                document = mapper.readValue(json, ObjectNode.class);
-            } catch (IOException e) {
-                e.printStackTrace();
-                return null;
-            }
-        }
+    LOGGER.debug("{} processing {}", STREAMS_ID);
 
-        Preconditions.checkNotNull(document);
-
-        if( StringUtils.isNotEmpty(json)) {
-
-            Object srcResult = null;
-            try {
-                srcResult = jsonPath.read(json);
-
-            } catch( Exception e ) {
-                e.printStackTrace();
-                LOGGER.warn(e.getMessage());
-            }
-
-            Preconditions.checkNotNull(srcResult);
-
-            String[] path = StringUtils.split(pathExpression, '.');
-            ObjectNode node = document;
-            for (int i = 1; i < path.length-1; i++) {
-                node = (ObjectNode) document.get(path[i]);
-            }
-
-            Preconditions.checkNotNull(node);
-
-            if( srcResult instanceof JSONArray ) {
-                try {
-                    ArrayNode jsonNode = mapper.convertValue(srcResult, 
ArrayNode.class);
-                    if( jsonNode.size() == 1 ) {
-                        JsonNode item = jsonNode.get(0);
-                        node.set(destNodeName, item);
-                    } else {
-                        node.set(destNodeName, jsonNode);
-                    }
-                } catch (Exception e) {
-                    LOGGER.warn(e.getMessage());
-                }
-            } else if( srcResult instanceof JSONObject ) {
-                try {
-                    ObjectNode jsonNode = mapper.convertValue(srcResult, 
ObjectNode.class);
-                    node.set(destNodeName, jsonNode);
-                } catch (Exception e) {
-                    LOGGER.warn(e.getMessage());
-                }
-            } else if( srcResult instanceof String ) {
-                try {
-                    node.put(destNodeName, (String) srcResult);
-                } catch (Exception e) {
-                    LOGGER.warn(e.getMessage());
-                }
-            }
+    if ( entry.getDocument() instanceof ObjectNode ) {
+      document = (ObjectNode) entry.getDocument();
+      try {
+        json = mapper.writeValueAsString(document);
+      } catch (JsonProcessingException ex) {
+        ex.printStackTrace();
+      }
+    } else if ( entry.getDocument() instanceof String ) {
+      json = (String) entry.getDocument();
+      try {
+        document = mapper.readValue(json, ObjectNode.class);
+      } catch (IOException ex) {
+        ex.printStackTrace();
+        return null;
+      }
+    }
 
+    Preconditions.checkNotNull(document);
+
+    if ( StringUtils.isNotEmpty(json)) {
+
+      Object srcResult = null;
+      try {
+        srcResult = jsonPath.read(json);
+      } catch ( Exception ex ) {
+        ex.printStackTrace();
+        LOGGER.warn(ex.getMessage());
+      }
+
+      Preconditions.checkNotNull(srcResult);
+
+      String[] path = StringUtils.split(pathExpression, '.');
+      ObjectNode node = document;
+      for (int i = 1; i < path.length - 1; i++) {
+        node = (ObjectNode) document.get(path[i]);
+      }
+
+      Preconditions.checkNotNull(node);
+
+      if ( srcResult instanceof JSONArray ) {
+        try {
+          ArrayNode jsonNode = mapper.convertValue(srcResult, ArrayNode.class);
+          if ( jsonNode.size() == 1 ) {
+            JsonNode item = jsonNode.get(0);
+            node.set(destNodeName, item);
+          } else {
+            node.set(destNodeName, jsonNode);
+          }
+        } catch (Exception ex) {
+          LOGGER.warn(ex.getMessage());
+        }
+      } else if ( srcResult instanceof JSONObject ) {
+        try {
+          ObjectNode jsonNode = mapper.convertValue(srcResult, 
ObjectNode.class);
+          node.set(destNodeName, jsonNode);
+        } catch (Exception ex) {
+          LOGGER.warn(ex.getMessage());
+        }
+      } else if ( srcResult instanceof String ) {
+        try {
+          node.put(destNodeName, (String) srcResult);
+        } catch (Exception ex) {
+          LOGGER.warn(ex.getMessage());
         }
+      }
 
-        result.add(new StreamsDatum(document));
+    }
 
-        return result;
+    result.add(new StreamsDatum(document));
 
-    }
+    return result;
 
-    @Override
-    public void prepare(Object configurationObject) {
-        if( configurationObject instanceof Map) {
-            Map<String,String> params = ( Map<String,String>) 
configurationObject;
-            pathExpression = params.get("pathExpression");
-            jsonPath = JsonPath.compile(pathExpression);
-            destNodeName = 
pathExpression.substring(pathExpression.lastIndexOf(".") + 1);
-        }
+  }
 
-        mapper.registerModule(new JsonOrgModule());
+  @Override
+  public void prepare(Object configurationObject) {
+    if ( configurationObject instanceof Map) {
+      Map<String,String> params = ( Map<String,String>) configurationObject;
+      pathExpression = params.get("pathExpression");
+      jsonPath = JsonPath.compile(pathExpression);
+      destNodeName = pathExpression.substring(pathExpression.lastIndexOf(".") 
+ 1);
     }
 
-    @Override
-    public void cleanUp() {
-        LOGGER.info("shutting down JsonPathFilter for " + this.pathExpression);
-    }
-};
+    mapper.registerModule(new JsonOrgModule());
+  }
+
+  @Override
+  public void cleanUp() {
+    LOGGER.info("shutting down JsonPathFilter for " + this.pathExpression);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-processor-json/src/test/java/org/apache/streams/json/test/JsonPathExtractorTest.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-processor-json/src/test/java/org/apache/streams/json/test/JsonPathExtractorTest.java
 
b/streams-contrib/streams-processor-json/src/test/java/org/apache/streams/json/test/JsonPathExtractorTest.java
index 2ab3b7f..1ab7c00 100644
--- 
a/streams-contrib/streams-processor-json/src/test/java/org/apache/streams/json/test/JsonPathExtractorTest.java
+++ 
b/streams-contrib/streams-processor-json/src/test/java/org/apache/streams/json/test/JsonPathExtractorTest.java
@@ -19,83 +19,85 @@
 
 package org.apache.streams.json.test;
 
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import org.apache.commons.io.FileUtils;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.json.JsonPathExtractor;
+
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
+import org.apache.commons.io.FileUtils;
 import org.junit.Assert;
 import org.junit.Before;
-import org.junit.Ignore;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.*;
+import java.io.File;
+import java.io.IOException;
 import java.util.List;
 
-import static org.hamcrest.CoreMatchers.*;
+import static org.hamcrest.CoreMatchers.is;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 
 /**
  * Test for extracting json fields and
- * objects from datums using JsonPath syntax
+ * objects from datums using JsonPath syntax.
  */
 public class JsonPathExtractorTest {
 
-    private final static Logger LOGGER = 
LoggerFactory.getLogger(JsonPathExtractorTest.class);
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(JsonPathExtractorTest.class);
 
-    private String testJson;
+  private String testJson;
 
-    @Before
-    public void initialize() {
-        try {
-            testJson = FileUtils.readFileToString(new 
File("src/test/resources/books.json"));
-        } catch (IOException e) {
-            e.printStackTrace();
-            Assert.fail();
-        }
+  @Before
+  public void initialize() {
+    try {
+      testJson = FileUtils.readFileToString(new 
File("src/test/resources/books.json"));
+    } catch (IOException ex) {
+      ex.printStackTrace();
+      Assert.fail();
     }
+  }
 
-    @Test
-    public void test1()
-    {
-        JsonPathExtractor extractor = new JsonPathExtractor();
-        extractor.prepare("$.store.book[*].author");
-        List<StreamsDatum> result = extractor.process(new 
StreamsDatum(testJson));
-        assertThat(result.size(), is(2));
-        assertTrue(result.get(0).getDocument() instanceof String);
-        assertTrue(result.get(1).getDocument() instanceof String);
-    }
+  @Test
+  public void test1()
+  {
+    JsonPathExtractor extractor = new JsonPathExtractor();
+    extractor.prepare("$.store.book[*].author");
+    List<StreamsDatum> result = extractor.process(new StreamsDatum(testJson));
+    assertThat(result.size(), is(2));
+    assertTrue(result.get(0).getDocument() instanceof String);
+    assertTrue(result.get(1).getDocument() instanceof String);
+  }
 
-    @Test
-    public void test2()
-    {
-        JsonPathExtractor extractor = new JsonPathExtractor();
-        extractor.prepare("$.store.book[?(@.category == 'reference')]");
-        List<StreamsDatum> result = extractor.process(new 
StreamsDatum(testJson));
-        assertThat(result.size(), is(1));
-        assertTrue(result.get(0).getDocument() instanceof ObjectNode);
-    }
+  @Test
+  public void test2()
+  {
+    JsonPathExtractor extractor = new JsonPathExtractor();
+    extractor.prepare("$.store.book[?(@.category == 'reference')]");
+    List<StreamsDatum> result = extractor.process(new StreamsDatum(testJson));
+    assertThat(result.size(), is(1));
+    assertTrue(result.get(0).getDocument() instanceof ObjectNode);
+  }
 
-    @Test
-    public void test3()
-    {
-        JsonPathExtractor extractor = new JsonPathExtractor();
-        extractor.prepare("$.store.book[?(@.price > 10)]");
-        List<StreamsDatum> result = extractor.process(new 
StreamsDatum(testJson));
-        assertThat(result.size(), is(1));
-        assertTrue(result.get(0).getDocument() instanceof ObjectNode);
-    }
+  @Test
+  public void test3()
+  {
+    JsonPathExtractor extractor = new JsonPathExtractor();
+    extractor.prepare("$.store.book[?(@.price > 10)]");
+    List<StreamsDatum> result = extractor.process(new StreamsDatum(testJson));
+    assertThat(result.size(), is(1));
+    assertTrue(result.get(0).getDocument() instanceof ObjectNode);
+  }
 
-    @Test
-    public void test4()
-    {
-        JsonPathExtractor extractor = new JsonPathExtractor();
-        extractor.prepare("$.store.book[?(@.isbn)]");
-        List<StreamsDatum> result = extractor.process(new 
StreamsDatum(testJson));
-        assertThat(result.size(), is(1));
-        assertTrue(result.get(0).getDocument() instanceof ObjectNode);
-    }
+  @Test
+  public void test4()
+  {
+    JsonPathExtractor extractor = new JsonPathExtractor();
+    extractor.prepare("$.store.book[?(@.isbn)]");
+    List<StreamsDatum> result = extractor.process(new StreamsDatum(testJson));
+    assertThat(result.size(), is(1));
+    assertTrue(result.get(0).getDocument() instanceof ObjectNode);
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-processor-peoplepattern/src/main/java/org/apache/streams/peoplepattern/AccountTypeProcessor.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-processor-peoplepattern/src/main/java/org/apache/streams/peoplepattern/AccountTypeProcessor.java
 
b/streams-contrib/streams-processor-peoplepattern/src/main/java/org/apache/streams/peoplepattern/AccountTypeProcessor.java
index 5b23e16..04a680d 100644
--- 
a/streams-contrib/streams-processor-peoplepattern/src/main/java/org/apache/streams/peoplepattern/AccountTypeProcessor.java
+++ 
b/streams-contrib/streams-processor-peoplepattern/src/main/java/org/apache/streams/peoplepattern/AccountTypeProcessor.java
@@ -26,6 +26,7 @@ import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.pojo.extensions.ExtensionUtil;
 import org.apache.streams.pojo.json.Activity;
 import org.apache.streams.pojo.json.ActivityObject;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -33,42 +34,49 @@ import java.util.HashMap;
 import java.util.Map;
 
 /**
- * Enrich actor with account type
+ * Enrich actor with account type.
  */
 public class AccountTypeProcessor extends SimpleHTTPGetProcessor {
 
-    private final static String STREAMS_ID = "AccountTypeProcessor";
+  private static final String STREAMS_ID = "AccountTypeProcessor";
 
-    private final static Logger LOGGER = 
LoggerFactory.getLogger(AccountTypeProcessor.class);
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(AccountTypeProcessor.class);
 
-    public AccountTypeProcessor() {
-        this(new ComponentConfigurator<>(HttpProcessorConfiguration.class)
-          
.detectConfiguration(StreamsConfigurator.getConfig().getConfig("peoplepattern")));
-    }
+  /**
+   * AccountTypeProcessor constructor - resolves HttpProcessorConfiguration 
from JVM 'peoplepattern'.
+   */
+  public AccountTypeProcessor() {
+    this(new ComponentConfigurator<>(HttpProcessorConfiguration.class)
+        
.detectConfiguration(StreamsConfigurator.getConfig().getConfig("peoplepattern")));
+  }
 
-    public AccountTypeProcessor(HttpProcessorConfiguration 
peoplePatternConfiguration) {
-        super(peoplePatternConfiguration);
-        LOGGER.info("creating AccountTypeProcessor");
-        configuration.setProtocol("https");
-        configuration.setHostname("api.peoplepattern.com");
-        configuration.setResourcePath("/v0.2/account_type/");
-        configuration.setEntity(HttpProcessorConfiguration.Entity.ACTOR);
-        configuration.setExtension("account_type");
-    }
+  /**
+   * AccountTypeProcessor constructor - uses supplied 
HttpProcessorConfiguration.
+   * @param peoplePatternConfiguration peoplePatternConfiguration
+   */
+  public AccountTypeProcessor(HttpProcessorConfiguration 
peoplePatternConfiguration) {
+    super(peoplePatternConfiguration);
+    LOGGER.info("creating AccountTypeProcessor");
+    configuration.setProtocol("https");
+    configuration.setHostname("api.peoplepattern.com");
+    configuration.setResourcePath("/v0.2/account_type/");
+    configuration.setEntity(HttpProcessorConfiguration.Entity.ACTOR);
+    configuration.setExtension("account_type");
+  }
 
-    /**
-     Override this to add parameters to the request
-     */
-    @Override
-    protected Map<String, String> prepareParams(StreamsDatum entry) {
-        Activity activity = mapper.convertValue(entry.getDocument(), 
Activity.class);
-        ActivityObject actor = mapper.convertValue(activity.getActor(), 
ActivityObject.class);
-        String username = (String) 
ExtensionUtil.getInstance().getExtension(actor, "screenName");
-        Map<String, String> params = new HashMap<>();
-        params.put("id", actor.getId());
-        params.put("name", actor.getDisplayName());
-        params.put("username", username);
-        params.put("description", actor.getSummary());
-        return params;
-    }
+  /**
+   Override this to add parameters to the request.
+   */
+  @Override
+  protected Map<String, String> prepareParams(StreamsDatum entry) {
+    Activity activity = mapper.convertValue(entry.getDocument(), 
Activity.class);
+    ActivityObject actor = mapper.convertValue(activity.getActor(), 
ActivityObject.class);
+    String username = (String) ExtensionUtil.getInstance().getExtension(actor, 
"screenName");
+    Map<String, String> params = new HashMap<>();
+    params.put("id", actor.getId());
+    params.put("name", actor.getDisplayName());
+    params.put("username", username);
+    params.put("description", actor.getSummary());
+    return params;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-processor-peoplepattern/src/main/java/org/apache/streams/peoplepattern/DemographicsProcessor.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-processor-peoplepattern/src/main/java/org/apache/streams/peoplepattern/DemographicsProcessor.java
 
b/streams-contrib/streams-processor-peoplepattern/src/main/java/org/apache/streams/peoplepattern/DemographicsProcessor.java
index 1ee55a9..e615026 100644
--- 
a/streams-contrib/streams-processor-peoplepattern/src/main/java/org/apache/streams/peoplepattern/DemographicsProcessor.java
+++ 
b/streams-contrib/streams-processor-peoplepattern/src/main/java/org/apache/streams/peoplepattern/DemographicsProcessor.java
@@ -26,6 +26,7 @@ import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.pojo.extensions.ExtensionUtil;
 import org.apache.streams.pojo.json.Activity;
 import org.apache.streams.pojo.json.ActivityObject;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -33,43 +34,50 @@ import java.util.HashMap;
 import java.util.Map;
 
 /**
- * Enrich actor with demographics
+ * Enrich actor with demographics.
  */
 public class DemographicsProcessor extends SimpleHTTPGetProcessor {
 
-    public final static String STREAMS_ID = "DemographicsProcessor";
+  public static final String STREAMS_ID = "DemographicsProcessor";
 
-    private final static Logger LOGGER = 
LoggerFactory.getLogger(DemographicsProcessor.class);
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(DemographicsProcessor.class);
 
-    public DemographicsProcessor() {
-        this(new ComponentConfigurator<>(HttpProcessorConfiguration.class)
-          
.detectConfiguration(StreamsConfigurator.getConfig().getConfig("peoplepattern")));
-    }
+  /**
+   * DemographicsProcessor constructor - resolves HttpProcessorConfiguration 
from JVM 'peoplepattern'.
+   */
+  public DemographicsProcessor() {
+    this(new ComponentConfigurator<>(HttpProcessorConfiguration.class)
+        
.detectConfiguration(StreamsConfigurator.getConfig().getConfig("peoplepattern")));
+  }
 
-    public DemographicsProcessor(HttpProcessorConfiguration 
peoplePatternConfiguration) {
-        super(peoplePatternConfiguration);
-        LOGGER.info("creating DemographicsProcessor");
-        configuration.setProtocol("https");
-        configuration.setHostname("api.peoplepattern.com");
-        configuration.setResourcePath("/v0.2/demographics/");
-        configuration.setEntity(HttpProcessorConfiguration.Entity.ACTOR);
-        configuration.setExtension("demographics");
-    }
+  /**
+   * DemographicsProcessor constructor - uses supplied 
HttpProcessorConfiguration.
+   * @param peoplePatternConfiguration peoplePatternConfiguration
+   */
+  public DemographicsProcessor(HttpProcessorConfiguration 
peoplePatternConfiguration) {
+    super(peoplePatternConfiguration);
+    LOGGER.info("creating DemographicsProcessor");
+    configuration.setProtocol("https");
+    configuration.setHostname("api.peoplepattern.com");
+    configuration.setResourcePath("/v0.2/demographics/");
+    configuration.setEntity(HttpProcessorConfiguration.Entity.ACTOR);
+    configuration.setExtension("demographics");
+  }
 
-    /**
-     Override this to add parameters to the request
-     */
-    @Override
-    protected Map<String, String> prepareParams(StreamsDatum entry) {
-        Activity activity = mapper.convertValue(entry.getDocument(), 
Activity.class);
-        ActivityObject actor = mapper.convertValue(activity.getActor(), 
ActivityObject.class);
-        String username = (String) 
ExtensionUtil.getInstance().getExtension(actor, "screenName");
-        Map<String, String> params = new HashMap<>();
-        params.put("id", actor.getId());
-        params.put("name", actor.getDisplayName());
-        params.put("username", username);
-        params.put("description", actor.getSummary());
-        return params;
-    }
+  /**
+   Override this to add parameters to the request.
+   */
+  @Override
+  protected Map<String, String> prepareParams(StreamsDatum entry) {
+    Activity activity = mapper.convertValue(entry.getDocument(), 
Activity.class);
+    ActivityObject actor = mapper.convertValue(activity.getActor(), 
ActivityObject.class);
+    String username = (String) ExtensionUtil.getInstance().getExtension(actor, 
"screenName");
+    Map<String, String> params = new HashMap<>();
+    params.put("id", actor.getId());
+    params.put("name", actor.getDisplayName());
+    params.put("username", username);
+    params.put("description", actor.getSummary());
+    return params;
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-processor-regex/src/main/java/org/apache/streams/regex/AbstractRegexExtensionExtractor.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-processor-regex/src/main/java/org/apache/streams/regex/AbstractRegexExtensionExtractor.java
 
b/streams-contrib/streams-processor-regex/src/main/java/org/apache/streams/regex/AbstractRegexExtensionExtractor.java
index 0f46ccd..206931f 100644
--- 
a/streams-contrib/streams-processor-regex/src/main/java/org/apache/streams/regex/AbstractRegexExtensionExtractor.java
+++ 
b/streams-contrib/streams-processor-regex/src/main/java/org/apache/streams/regex/AbstractRegexExtensionExtractor.java
@@ -19,16 +19,18 @@
 
 package org.apache.streams.regex;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.google.common.base.Strings;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsProcessor;
 import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.apache.streams.pojo.extensions.ExtensionUtil;
 import org.apache.streams.pojo.json.Activity;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -44,91 +46,93 @@ import java.util.Set;
  * modifying the appropriate {@link org.apache.streams.pojo.json.Activity} 
extensions object.
  */
 public abstract class AbstractRegexExtensionExtractor<T> implements 
StreamsProcessor {
-    private final String patternConfigKey;
-    private final String extensionKey;
-    private final String defaultPattern;
 
-    private final static Logger LOGGER = 
LoggerFactory.getLogger(AbstractRegexExtensionExtractor.class);
+  private final String patternConfigKey;
+  private final String extensionKey;
+  private final String defaultPattern;
 
-    private final static ObjectMapper mapper = 
StreamsJacksonMapper.getInstance();
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(AbstractRegexExtensionExtractor.class);
 
-    private String pattern;
+  private static final ObjectMapper mapper = 
StreamsJacksonMapper.getInstance();
 
-    protected AbstractRegexExtensionExtractor(String patternConfigKey, String 
extensionKey, String defaultPattern) {
-        this.patternConfigKey = patternConfigKey;
-        this.extensionKey = extensionKey;
-        this.defaultPattern = defaultPattern;
-    }
+  private String pattern;
 
-    public String getPattern() {
-        return pattern;
-    }
+  protected AbstractRegexExtensionExtractor(String patternConfigKey, String 
extensionKey, String defaultPattern) {
+    this.patternConfigKey = patternConfigKey;
+    this.extensionKey = extensionKey;
+    this.defaultPattern = defaultPattern;
+  }
 
-    @Override
-    public List<StreamsDatum> process(StreamsDatum entry) {
-        Activity activity;
-        if (entry.getDocument() instanceof Activity) {
-            activity = (Activity) entry.getDocument();
-        } else if (entry.getDocument() instanceof ObjectNode) {
-            activity = mapper.convertValue(entry.getDocument(), 
Activity.class);
-        } else {
-            return new ArrayList<>();
-        }
-        if (Strings.isNullOrEmpty(pattern)) {
-            prepare(null);
-        }
-        Map<String, List<Integer>> matches = 
RegexUtils.extractMatches(pattern, activity.getContent());
-        Collection<T> entities = ensureTargetObject(activity);
-        for (String key : matches.keySet()) {
-            entities.add(prepareObject(key));
-        }
-
-        Set<T> set = new HashSet<>();
-        set.addAll(entities);
-        entities.clear();
-        entities.addAll(set);
-
-        entry.setDocument(activity);
-        return Lists.newArrayList(entry);
-    }
+  public String getPattern() {
+    return pattern;
+  }
 
-    @Override
-    public void prepare(Object configurationObject) {
-        if (configurationObject instanceof Map) {
-            if (((Map) configurationObject).containsKey(patternConfigKey)) {
-                pattern = (String) ((Map) 
configurationObject).get(patternConfigKey);
-            }
-        } else if (configurationObject instanceof String) {
-            pattern = (String) configurationObject;
-        } else {
-            pattern = defaultPattern;
-        }
+  @Override
+  public List<StreamsDatum> process(StreamsDatum entry) {
+    Activity activity;
+    if (entry.getDocument() instanceof Activity) {
+      activity = (Activity) entry.getDocument();
+    } else if (entry.getDocument() instanceof ObjectNode) {
+      activity = mapper.convertValue(entry.getDocument(), Activity.class);
+    } else {
+      return new ArrayList<>();
     }
-
-    @Override
-    public void cleanUp() {
-        //NOP
+    if (Strings.isNullOrEmpty(pattern)) {
+      prepare(null);
+    }
+    Map<String, List<Integer>> matches = RegexUtils.extractMatches(pattern, 
activity.getContent());
+    Collection<T> entities = ensureTargetObject(activity);
+    for (String key : matches.keySet()) {
+      entities.add(prepareObject(key));
     }
 
-    /**
-     * Configures the value to be persisted to the extensions object
-     * @param extracted the value extracted by the regex
-     * @return an object representing the appropriate extension
-     */
-    protected abstract T prepareObject(String extracted);
-
-    @SuppressWarnings("unchecked")
-    protected Collection<T> ensureTargetObject(Activity activity) {
-        Map<String, Object> extensions = 
ExtensionUtil.getInstance().ensureExtensions(activity);
-        Set<T> hashtags;
-        if(extensions.containsKey(extensionKey) && 
extensions.get(extensionKey) != null) {
-            hashtags = Sets.newHashSet((Iterable<T>) 
extensions.get(extensionKey));
-        } else {
-            hashtags = new HashSet<>();
-        }
-
-        extensions.put(extensionKey, hashtags);
-
-        return hashtags;
+    Set<T> set = new HashSet<>();
+    set.addAll(entities);
+    entities.clear();
+    entities.addAll(set);
+
+    entry.setDocument(activity);
+    return Lists.newArrayList(entry);
+  }
+
+  @Override
+  public void prepare(Object configurationObject) {
+    if (configurationObject instanceof Map) {
+      if (((Map) configurationObject).containsKey(patternConfigKey)) {
+        pattern = (String) ((Map) configurationObject).get(patternConfigKey);
+      }
+    } else if (configurationObject instanceof String) {
+      pattern = (String) configurationObject;
+    } else {
+      pattern = defaultPattern;
+    }
+  }
+
+  @Override
+  public void cleanUp() {
+    //NOP
+  }
+
+  /**
+   * Configures the value to be persisted to the extensions object.
+   * @param extracted the value extracted by the regex
+   * @return an object representing the appropriate extension
+   */
+  protected abstract T prepareObject(String extracted);
+
+  @SuppressWarnings("unchecked")
+  protected Collection<T> ensureTargetObject(Activity activity) {
+    Map<String, Object> extensions = 
ExtensionUtil.getInstance().ensureExtensions(activity);
+    Set<T> hashtags;
+
+    if (extensions.containsKey(extensionKey) && extensions.get(extensionKey) 
!= null) {
+      hashtags = Sets.newHashSet((Iterable<T>) extensions.get(extensionKey));
+    } else {
+      hashtags = new HashSet<>();
     }
+
+    extensions.put(extensionKey, hashtags);
+
+    return hashtags;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-processor-regex/src/main/java/org/apache/streams/regex/RegexHashtagExtractor.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-processor-regex/src/main/java/org/apache/streams/regex/RegexHashtagExtractor.java
 
b/streams-contrib/streams-processor-regex/src/main/java/org/apache/streams/regex/RegexHashtagExtractor.java
index a59193b..326d5fa 100644
--- 
a/streams-contrib/streams-processor-regex/src/main/java/org/apache/streams/regex/RegexHashtagExtractor.java
+++ 
b/streams-contrib/streams-processor-regex/src/main/java/org/apache/streams/regex/RegexHashtagExtractor.java
@@ -20,6 +20,7 @@
 package org.apache.streams.regex;
 
 import org.apache.streams.core.StreamsProcessor;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -27,27 +28,27 @@ import org.slf4j.LoggerFactory;
  * Processes the content of an {@link org.apache.streams.pojo.json.Activity} 
object to extract the Hashtags and add
  * them to the appropriate extensions object
  */
-public class RegexHashtagExtractor extends 
AbstractRegexExtensionExtractor<String> implements StreamsProcessor{
+public class RegexHashtagExtractor extends 
AbstractRegexExtensionExtractor<String> implements StreamsProcessor {
 
-    private final static String STREAMS_ID = "RegexHashtagExtractor";
+  private static final String STREAMS_ID = "RegexHashtagExtractor";
 
-    private final static Logger LOGGER = 
LoggerFactory.getLogger(RegexHashtagExtractor.class);
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(RegexHashtagExtractor.class);
 
-    public final static String DEFAULT_PATTERN = "#\\w+";
-    public final static String PATTERN_CONFIG_KEY = "HashtagPattern";
-    public final static String EXTENSION_KEY = "hashtags";
+  public static final String DEFAULT_PATTERN = "#\\w+";
+  public static final String PATTERN_CONFIG_KEY = "HashtagPattern";
+  public static final String EXTENSION_KEY = "hashtags";
 
-    public RegexHashtagExtractor() {
-        super(PATTERN_CONFIG_KEY, EXTENSION_KEY, DEFAULT_PATTERN);
-    }
+  public RegexHashtagExtractor() {
+    super(PATTERN_CONFIG_KEY, EXTENSION_KEY, DEFAULT_PATTERN);
+  }
 
-    @Override
-    public String getId() {
-        return STREAMS_ID;
-    }
+  @Override
+  public String getId() {
+    return STREAMS_ID;
+  }
 
-    @Override
-    protected String prepareObject(String extracted) {
-        return extracted.substring(1);
-    }
+  @Override
+  protected String prepareObject(String extracted) {
+    return extracted.substring(1);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-processor-regex/src/main/java/org/apache/streams/regex/RegexMentionsExtractor.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-processor-regex/src/main/java/org/apache/streams/regex/RegexMentionsExtractor.java
 
b/streams-contrib/streams-processor-regex/src/main/java/org/apache/streams/regex/RegexMentionsExtractor.java
index eabdb04..ddd3ed5 100644
--- 
a/streams-contrib/streams-processor-regex/src/main/java/org/apache/streams/regex/RegexMentionsExtractor.java
+++ 
b/streams-contrib/streams-processor-regex/src/main/java/org/apache/streams/regex/RegexMentionsExtractor.java
@@ -18,9 +18,11 @@
  */
 
 package org.apache.streams.regex;
-import com.google.common.collect.Maps;
+
 import org.apache.streams.core.StreamsProcessor;
 
+import com.google.common.collect.Maps;
+
 import java.util.HashMap;
 import java.util.Map;
 
@@ -30,27 +32,27 @@ import java.util.Map;
  */
 public class RegexMentionsExtractor extends 
AbstractRegexExtensionExtractor<Map<String, Object>> implements 
StreamsProcessor {
 
-    private final static String STREAMS_ID = "RegexMentionsExtractor";
+  private static final String STREAMS_ID = "RegexMentionsExtractor";
 
-    public static final String DEFAULT_PATTERN = "@\\w+";
-    public static final String PATTERN_CONFIG_KEY = "MentionPattern";
-    public static final String EXTENSION_KEY = "user_mentions";
-    public static final String DISPLAY_KEY = "displayName";
+  public static final String DEFAULT_PATTERN = "@\\w+";
+  public static final String PATTERN_CONFIG_KEY = "MentionPattern";
+  public static final String EXTENSION_KEY = "user_mentions";
+  public static final String DISPLAY_KEY = "displayName";
 
-    public RegexMentionsExtractor() {
-        super(PATTERN_CONFIG_KEY, EXTENSION_KEY, DEFAULT_PATTERN);
-    }
+  public RegexMentionsExtractor() {
+    super(PATTERN_CONFIG_KEY, EXTENSION_KEY, DEFAULT_PATTERN);
+  }
 
-    @Override
-    public String getId() {
-        return STREAMS_ID;
-    }
+  @Override
+  public String getId() {
+    return STREAMS_ID;
+  }
 
-    @Override
-    protected Map<String, Object> prepareObject(String extracted) {
-        HashMap<String, Object> mention = Maps.newHashMap();
-        mention.put(DISPLAY_KEY, extracted.substring(1));
-        return mention;
-    }
+  @Override
+  protected Map<String, Object> prepareObject(String extracted) {
+    HashMap<String, Object> mention = Maps.newHashMap();
+    mention.put(DISPLAY_KEY, extracted.substring(1));
+    return mention;
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-processor-regex/src/main/java/org/apache/streams/regex/RegexUrlExtractor.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-processor-regex/src/main/java/org/apache/streams/regex/RegexUrlExtractor.java
 
b/streams-contrib/streams-processor-regex/src/main/java/org/apache/streams/regex/RegexUrlExtractor.java
index 84d3257..ea8474d 100644
--- 
a/streams-contrib/streams-processor-regex/src/main/java/org/apache/streams/regex/RegexUrlExtractor.java
+++ 
b/streams-contrib/streams-processor-regex/src/main/java/org/apache/streams/regex/RegexUrlExtractor.java
@@ -30,46 +30,46 @@ import java.util.Collection;
  */
 public class RegexUrlExtractor extends AbstractRegexExtensionExtractor<String> 
implements StreamsProcessor {
 
-    private final static String STREAMS_ID = "RegexUrlExtractor";
+  private static final String STREAMS_ID = "RegexUrlExtractor";
 
-    @Override
-    public String getId() {
-        return STREAMS_ID;
-    }
+  @Override
+  public String getId() {
+    return STREAMS_ID;
+  }
 
-    //Temporarily copied from streams-processor-urls so as not to force a 
dependency on that provider.  This should
-    //be moved to a common utility package
-    public final static String DEFAULT_PATTERN =
-            "(?:(?:https?|ftp)://)" +
-                    "(?:\\S+(?::\\S*)?@)?" +
-                    "(?:" +
-                    "(?!(?:10|127)(?:\\.\\d{1,3}){3})" +
-                    "(?!(?:169\\.254|192\\.168)(?:\\.\\d{1,3}){2})" +
-                    "(?!172\\.(?:1[6-9]|2\\d|3[0-1])(?:\\.\\d{1,3}){2})" +
-                    "(?:[1-9]\\d?|1\\d\\d|2[01]\\d|22[0-3])" +
-                    "(?:\\.(?:1?\\d{1,2}|2[0-4]\\d|25[0-5])){2}" +
-                    "(?:\\.(?:[1-9]\\d?|1\\d\\d|2[0-4]\\d|25[0-4]))" +
-                    "|" +
-                    
"(?:(?:[a-z\\u00a1-\\uffff0-9]+-?)*[a-z\\u00a1-\\uffff0-9]+)" +
-                    
"(?:\\.(?:[a-z\\u00a1-\\uffff0-9]+-?)*[a-z\\u00a1-\\uffff0-9]+)*" +
-                    "(?:\\.(?:[a-z\\u00a1-\\uffff]{2,}))" +
-                    ")" +
-                    "(?::\\d{2,5})?" +
-                    "(?:/[^\\s]*)?";
+  //Temporarily copied from streams-processor-urls so as not to force a 
dependency on that provider.  This should
+  //be moved to a common utility package
+  public static final String DEFAULT_PATTERN =
+      "(?:(?:https?|ftp)://)"
+          + "(?:\\S+(?::\\S*)?@)?"
+          + "(?:"
+          + "(?!(?:10|127)(?:\\.\\d{1,3}){3})"
+          + "(?!(?:169\\.254|192\\.168)(?:\\.\\d{1,3}){2})"
+          + "(?!172\\.(?:1[6-9]|2\\d|3[0-1])(?:\\.\\d{1,3}){2})"
+          + "(?:[1-9]\\d?|1\\d\\d|2[01]\\d|22[0-3])"
+          + "(?:\\.(?:1?\\d{1,2}|2[0-4]\\d|25[0-5])){2}"
+          + "(?:\\.(?:[1-9]\\d?|1\\d\\d|2[0-4]\\d|25[0-4]))"
+          + "|"
+          + "(?:(?:[a-z\\u00a1-\\uffff0-9]+-?)*[a-z\\u00a1-\\uffff0-9]+)"
+          + "(?:\\.(?:[a-z\\u00a1-\\uffff0-9]+-?)*[a-z\\u00a1-\\uffff0-9]+)*"
+          + "(?:\\.(?:[a-z\\u00a1-\\uffff]{2,}))"
+          + ")"
+          + "(?::\\d{2,5})?"
+          + "(?:/[^\\s]*)?";
 
-    public final static String PATTERN_CONFIG_KEY = "URLPattern";
+  public static final String PATTERN_CONFIG_KEY = "URLPattern";
 
-    public RegexUrlExtractor() {
-        super(PATTERN_CONFIG_KEY, null, DEFAULT_PATTERN);
-    }
+  public RegexUrlExtractor() {
+    super(PATTERN_CONFIG_KEY, null, DEFAULT_PATTERN);
+  }
 
-    @Override
-    protected String prepareObject(String extracted) {
-        return extracted;
-    }
+  @Override
+  protected String prepareObject(String extracted) {
+    return extracted;
+  }
 
-    @Override
-    protected Collection<String> ensureTargetObject(Activity activity) {
-        return activity.getLinks();
-    }
+  @Override
+  protected Collection<String> ensureTargetObject(Activity activity) {
+    return activity.getLinks();
+  }
 }

Reply via email to