http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-youtube/src/test/java/com/youtube/serializer/YoutubeEventClassifierTest.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-youtube/src/test/java/com/youtube/serializer/YoutubeEventClassifierTest.java
 
b/streams-contrib/streams-provider-youtube/src/test/java/com/youtube/serializer/YoutubeEventClassifierTest.java
index 57f9be5..16565bb 100644
--- 
a/streams-contrib/streams-provider-youtube/src/test/java/com/youtube/serializer/YoutubeEventClassifierTest.java
+++ 
b/streams-contrib/streams-provider-youtube/src/test/java/com/youtube/serializer/YoutubeEventClassifierTest.java
@@ -18,32 +18,33 @@
 
 package com.youtube.serializer;
 
-import com.google.api.services.youtube.model.Video;
 import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.api.services.youtube.model.Video;
 import org.junit.Test;
 
-
 import static org.junit.Assert.assertEquals;
 
 public class YoutubeEventClassifierTest {
-    private final String testVideo = 
"{\"etag\":\"\\\"4FSIjSQU83ZJMYAO0IqRYMvZX98/V0q3OIauZ3ZAkszLUDbHL45yEGM\\\"\",\"id\":\"sUOepRctwVE\",\"kind\":\"youtube#video\",\"snippet\":{\"channelId\":\"UCNENOn2nmwguQYkejKhJGPQ\",\"channelTitle\":\"Carilion
 Clinic\",\"description\":\"Join Carilion Clinic's Heart Failure experts for a 
LIVE Google+ Hangout on Feb. 23, 12:30-1 p.m. to learn more about heart 
failure, treatment options, and lifestyle changes. Learn more: 
https://plus.google.com/u/0/events/cj074q9r6csgv6i2kqhi2isc6k0\",\"publishedAt\":{\"value\":1422977409000,\"dateOnly\":false,\"timeZoneShift\":-360},\"thumbnails\":{\"default\":{\"height\":480,\"url\":\"https://i.ytimg.com/vi/sUOepRctwVE/sddefault.jpg\",\"width\":640}},\"title\":\"Be
 Heart Smart: Congestive Heart Failure LIVE 
Event\"},\"statistics\":{\"commentCount\":1,\"dislikeCount\":0,\"favoriteCount\":0,\"likeCount\":0,\"viewCount\":9}}";
-    private final String testObjectNode = 
"{\"etag\":\"\\\"4FSIjSQU83ZJMYAO0IqRYMvZX98/V0q3OIauZ3ZAkszLUDbHL45yEGM\\\"\",\"id\":\"sUOepRctwVE\",\"kind\":\"youtube#somethingElse\",\"snippet\":{\"channelId\":\"UCNENOn2nmwguQYkejKhJGPQ\",\"channelTitle\":\"Carilion
 Clinic\",\"description\":\"Join Carilion Clinic's Heart Failure experts for a 
LIVE Google+ Hangout on Feb. 23, 12:30-1 p.m. to learn more about heart 
failure, treatment options, and lifestyle changes. Learn more: 
https://plus.google.com/u/0/events/cj074q9r6csgv6i2kqhi2isc6k0\",\"publishedAt\":{\"value\":1422977409000,\"dateOnly\":false,\"timeZoneShift\":-360},\"thumbnails\":{\"default\":{\"height\":480,\"url\":\"https://i.ytimg.com/vi/sUOepRctwVE/sddefault.jpg\",\"width\":640}},\"title\":\"Be
 Heart Smart: Congestive Heart Failure LIVE 
Event\"},\"statistics\":{\"commentCount\":1,\"dislikeCount\":0,\"favoriteCount\":0,\"likeCount\":0,\"viewCount\":9}}";
-    @Test
-    public void testVideoClassification() {
-        Class klass = YoutubeEventClassifier.detectClass(testVideo);
-
-        assertEquals(klass, Video.class);
-    }
-
-    @Test(expected=IllegalArgumentException.class)
-    public void testExceptionClassification() {
-        YoutubeEventClassifier.detectClass("");
-    }
-
-    @Test
-    public void testObjectNodeClassification() {
-        Class klass = YoutubeEventClassifier.detectClass(testObjectNode);
-
-        assertEquals(klass, ObjectNode.class);
-    }
+
+  private final String testVideo = 
"{\"etag\":\"\\\"4FSIjSQU83ZJMYAO0IqRYMvZX98/V0q3OIauZ3ZAkszLUDbHL45yEGM\\\"\",\"id\":\"sUOepRctwVE\",\"kind\":\"youtube#video\",\"snippet\":{\"channelId\":\"UCNENOn2nmwguQYkejKhJGPQ\",\"channelTitle\":\"Carilion
 Clinic\",\"description\":\"Join Carilion Clinic's Heart Failure experts for a 
LIVE Google+ Hangout on Feb. 23, 12:30-1 p.m. to learn more about heart 
failure, treatment options, and lifestyle changes. Learn more: 
https://plus.google.com/u/0/events/cj074q9r6csgv6i2kqhi2isc6k0\",\"publishedAt\":{\"value\":1422977409000,\"dateOnly\":false,\"timeZoneShift\":-360},\"thumbnails\":{\"default\":{\"height\":480,\"url\":\"https://i.ytimg.com/vi/sUOepRctwVE/sddefault.jpg\",\"width\":640}},\"title\":\"Be
 Heart Smart: Congestive Heart Failure LIVE 
Event\"},\"statistics\":{\"commentCount\":1,\"dislikeCount\":0,\"favoriteCount\":0,\"likeCount\":0,\"viewCount\":9}}";
+  private final String testObjectNode = 
"{\"etag\":\"\\\"4FSIjSQU83ZJMYAO0IqRYMvZX98/V0q3OIauZ3ZAkszLUDbHL45yEGM\\\"\",\"id\":\"sUOepRctwVE\",\"kind\":\"youtube#somethingElse\",\"snippet\":{\"channelId\":\"UCNENOn2nmwguQYkejKhJGPQ\",\"channelTitle\":\"Carilion
 Clinic\",\"description\":\"Join Carilion Clinic's Heart Failure experts for a 
LIVE Google+ Hangout on Feb. 23, 12:30-1 p.m. to learn more about heart 
failure, treatment options, and lifestyle changes. Learn more: 
https://plus.google.com/u/0/events/cj074q9r6csgv6i2kqhi2isc6k0\",\"publishedAt\":{\"value\":1422977409000,\"dateOnly\":false,\"timeZoneShift\":-360},\"thumbnails\":{\"default\":{\"height\":480,\"url\":\"https://i.ytimg.com/vi/sUOepRctwVE/sddefault.jpg\",\"width\":640}},\"title\":\"Be
 Heart Smart: Congestive Heart Failure LIVE 
Event\"},\"statistics\":{\"commentCount\":1,\"dislikeCount\":0,\"favoriteCount\":0,\"likeCount\":0,\"viewCount\":9}}";
+
+  @Test
+  public void testVideoClassification() {
+    Class klass = YoutubeEventClassifier.detectClass(testVideo);
+
+    assertEquals(klass, Video.class);
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testExceptionClassification() {
+    YoutubeEventClassifier.detectClass("");
+  }
+
+  @Test
+  public void testObjectNodeClassification() {
+    Class klass = YoutubeEventClassifier.detectClass(testObjectNode);
+
+    assertEquals(klass, ObjectNode.class);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-youtube/src/test/java/com/youtube/serializer/YoutubeVideoSerDeTest.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-youtube/src/test/java/com/youtube/serializer/YoutubeVideoSerDeTest.java
 
b/streams-contrib/streams-provider-youtube/src/test/java/com/youtube/serializer/YoutubeVideoSerDeTest.java
index c162f41..29afd19 100644
--- 
a/streams-contrib/streams-provider-youtube/src/test/java/com/youtube/serializer/YoutubeVideoSerDeTest.java
+++ 
b/streams-contrib/streams-provider-youtube/src/test/java/com/youtube/serializer/YoutubeVideoSerDeTest.java
@@ -15,17 +15,19 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package com.youtube.serializer;
 
-import com.fasterxml.jackson.databind.DeserializationFeature;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.module.SimpleModule;
-import com.google.api.services.youtube.model.Video;
 import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.apache.streams.pojo.extensions.ExtensionUtil;
 import org.apache.streams.pojo.json.Activity;
 import org.apache.streams.pojo.json.ActivityObject;
 import org.apache.streams.pojo.json.Provider;
+
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import com.google.api.services.youtube.model.Video;
 import org.joda.time.DateTime;
 import org.junit.Before;
 import org.junit.Test;
@@ -38,75 +40,84 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
+/**
+ * Test for YoutubeVideoSerDe.
+ */
 public class YoutubeVideoSerDeTest {
-    private final static Logger LOGGER = 
LoggerFactory.getLogger(YoutubeVideoSerDeTest.class);
-    private final String testVideo = 
"{\"etag\":\"\\\"4FSIjSQU83ZJMYAO0IqRYMvZX98/V0q3OIauZ3ZAkszLUDbHL45yEGM\\\"\",\"id\":\"sUOepRctwVE\",\"kind\":\"youtube#video\",\"snippet\":{\"channelId\":\"UCNENOn2nmwguQYkejKhJGPQ\",\"channelTitle\":\"Carilion
 Clinic\",\"description\":\"Join Carilion Clinic's Heart Failure experts for a 
LIVE Google+ Hangout on Feb. 23, 12:30-1 p.m. to learn more about heart 
failure, treatment options, and lifestyle changes. Learn more: 
https://plus.google.com/u/0/events/cj074q9r6csgv6i2kqhi2isc6k0\",\"publishedAt\":{\"value\":1422977409000,\"dateOnly\":false,\"timeZoneShift\":-360},\"thumbnails\":{\"default\":{\"height\":480,\"url\":\"https://i.ytimg.com/vi/sUOepRctwVE/sddefault.jpg\",\"width\":640}},\"title\":\"Be
 Heart Smart: Congestive Heart Failure LIVE 
Event\"},\"statistics\":{\"commentCount\":1,\"dislikeCount\":0,\"favoriteCount\":0,\"likeCount\":0,\"viewCount\":9}}";
-    private ObjectMapper objectMapper;
-    private YoutubeActivityUtil youtubeActivityUtil;
-
-    @Before
-    public void setup() {
-        objectMapper = StreamsJacksonMapper.getInstance();
-        SimpleModule simpleModule = new SimpleModule();
-        simpleModule.addDeserializer(Video.class, new 
YoutubeVideoDeserializer());
-        objectMapper.registerModule(simpleModule);
-        
objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, 
false);
-
-        youtubeActivityUtil = new YoutubeActivityUtil();
-    }
 
-    @Test
-    public void testVideoObject() {
-        LOGGER.info("raw: {}", testVideo);
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(YoutubeVideoSerDeTest.class);
+  private final String testVideo = 
"{\"etag\":\"\\\"4FSIjSQU83ZJMYAO0IqRYMvZX98/V0q3OIauZ3ZAkszLUDbHL45yEGM\\\"\",\"id\":\"sUOepRctwVE\",\"kind\":\"youtube#video\",\"snippet\":{\"channelId\":\"UCNENOn2nmwguQYkejKhJGPQ\",\"channelTitle\":\"Carilion
 Clinic\",\"description\":\"Join Carilion Clinic's Heart Failure experts for a 
LIVE Google+ Hangout on Feb. 23, 12:30-1 p.m. to learn more about heart 
failure, treatment options, and lifestyle changes. Learn more: 
https://plus.google.com/u/0/events/cj074q9r6csgv6i2kqhi2isc6k0\",\"publishedAt\":{\"value\":1422977409000,\"dateOnly\":false,\"timeZoneShift\":-360},\"thumbnails\":{\"default\":{\"height\":480,\"url\":\"https://i.ytimg.com/vi/sUOepRctwVE/sddefault.jpg\",\"width\":640}},\"title\":\"Be
 Heart Smart: Congestive Heart Failure LIVE 
Event\"},\"statistics\":{\"commentCount\":1,\"dislikeCount\":0,\"favoriteCount\":0,\"likeCount\":0,\"viewCount\":9}}";
+  private ObjectMapper objectMapper;
+  private YoutubeActivityUtil youtubeActivityUtil;
 
-        try {
-            Activity activity = new Activity();
+  /**
+   * setup for test.
+   */
+  @Before
+  public void setup() {
+    objectMapper = StreamsJacksonMapper.getInstance();
+    SimpleModule simpleModule = new SimpleModule();
+    simpleModule.addDeserializer(Video.class, new YoutubeVideoDeserializer());
+    objectMapper.registerModule(simpleModule);
+    objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, 
false);
 
-            Video video = objectMapper.readValue(testVideo, Video.class);
+    youtubeActivityUtil = new YoutubeActivityUtil();
+  }
 
-            youtubeActivityUtil.updateActivity(video, activity, 
"testChannelId");
-            LOGGER.info("activity: {}", activity);
+  @Test
+  public void testVideoObject() {
+    LOGGER.info("raw: {}", testVideo);
 
-            assertNotNull(activity);
-            assert (activity.getId().contains("id:youtube:post"));
-            assertEquals(activity.getVerb(), "post");
+    try {
+      Activity activity = new Activity();
 
-            Provider provider = activity.getProvider();
-            assertEquals(provider.getId(), "id:providers:youtube");
-            assertEquals(provider.getDisplayName(), "YouTube");
+      Video video = objectMapper.readValue(testVideo, Video.class);
 
-            ActivityObject actor = activity.getActor();
-            assert (actor.getId().contains("id:youtube:"));
-            assertNotNull(actor.getDisplayName());
-            assertNotNull(actor.getSummary());
+      youtubeActivityUtil.updateActivity(video, activity, "testChannelId");
+      LOGGER.info("activity: {}", activity);
 
-            assertNotNull(activity.getTitle());
-            assertNotNull(activity.getUrl());
-            assertNotNull(activity.getContent());
+      assertNotNull(activity);
+      assert (activity.getId().contains("id:youtube:post"));
+      assertEquals(activity.getVerb(), "post");
 
-            assertEquals(activity.getPublished().getClass(), DateTime.class);
+      Provider provider = activity.getProvider();
+      assertEquals(provider.getId(), "id:providers:youtube");
+      assertEquals(provider.getDisplayName(), "YouTube");
 
-            Map<String, Object> extensions = 
ExtensionUtil.getInstance().ensureExtensions(activity);
+      ActivityObject actor = activity.getActor();
+      assert (actor.getId().contains("id:youtube:"));
+      assertNotNull(actor.getDisplayName());
+      assertNotNull(actor.getSummary());
 
-            assertNotNull(extensions.get("youtube"));
-            assertNotNull(extensions.get("likes"));
+      assertNotNull(activity.getTitle());
+      assertNotNull(activity.getUrl());
+      assertNotNull(activity.getContent());
 
-            assertTrue(testActivityObject(activity));
-        } catch (Exception e) {
-            LOGGER.error("Exception while testing the Ser/De functionality of 
the Video deserializer: {}", e);
-        }
-    }
+      assertEquals(activity.getPublished().getClass(), DateTime.class);
 
-    private boolean testActivityObject(Activity activity) {
-        boolean valid = false;
+      Map<String, Object> extensions = 
ExtensionUtil.getInstance().ensureExtensions(activity);
 
-        ActivityObject obj = activity.getObject();
+      assertNotNull(extensions.get("youtube"));
+      assertNotNull(extensions.get("likes"));
 
-        if(obj.getObjectType().equals("video") && !obj.getImage().equals(null) 
&&
-                !obj.getUrl().equals("null") && 
obj.getUrl().contains("https://www.youtube.com/watch?v=")) {
-            valid = true;
-        }
+      assertTrue(testActivityObject(activity));
+    } catch (Exception ex) {
+      LOGGER.error("Exception while testing the Ser/De functionality of the 
Video deserializer: {}", ex);
+    }
+  }
+
+  private boolean testActivityObject(Activity activity) {
+    boolean valid = false;
 
-        return valid;
+    ActivityObject obj = activity.getObject();
+
+    if ( obj.getObjectType().equals("video")
+        && !obj.getImage().equals(null)
+        && !obj.getUrl().equals("null")
+        && obj.getUrl().contains("https://www.youtube.com/watch?v=")) {
+      valid = true;
     }
+
+    return valid;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-youtube/src/test/java/org/apache/streams/youtube/test/providers/YoutubeChannelProviderIT.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-youtube/src/test/java/org/apache/streams/youtube/test/providers/YoutubeChannelProviderIT.java
 
b/streams-contrib/streams-provider-youtube/src/test/java/org/apache/streams/youtube/test/providers/YoutubeChannelProviderIT.java
index 1f53df8..2e143de 100644
--- 
a/streams-contrib/streams-provider-youtube/src/test/java/org/apache/streams/youtube/test/providers/YoutubeChannelProviderIT.java
+++ 
b/streams-contrib/streams-provider-youtube/src/test/java/org/apache/streams/youtube/test/providers/YoutubeChannelProviderIT.java
@@ -28,43 +28,43 @@ import java.io.FileReader;
 import java.io.LineNumberReader;
 
 /**
- * Created by sblackmon on 10/13/16.
+ * YoutubeChannelProviderIT integration test for YoutubeChannelProvider.
  */
 public class YoutubeChannelProviderIT {
 
-    private static final Logger LOGGER = 
LoggerFactory.getLogger(YoutubeChannelProviderIT.class);
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(YoutubeChannelProviderIT.class);
 
-    @Test
-    public void testYoutubeChannelProvider() throws Exception {
+  @Test
+  public void testYoutubeChannelProvider() throws Exception {
 
-        String configfile = 
"./target/test-classes/YoutubeChannelProviderIT.conf";
-        String outfile = 
"./target/test-classes/YoutubeChannelProviderIT.stdout.txt";
+    String configfile = "./target/test-classes/YoutubeChannelProviderIT.conf";
+    String outfile = 
"./target/test-classes/YoutubeChannelProviderIT.stdout.txt";
 
-        String[] args = new String[2];
-        args[0] = configfile;
-        args[1] = outfile;
+    String[] args = new String[2];
+    args[0] = configfile;
+    args[1] = outfile;
 
-        Thread testThread = new Thread((Runnable) () -> {
-            try {
-                YoutubeChannelProvider.main(args);
-            } catch( Exception e ) {
-                LOGGER.error("Test Exception!", e);
-            }
-        });
-        testThread.start();
-        testThread.join(60000);
+    Thread testThread = new Thread((Runnable) () -> {
+      try {
+        YoutubeChannelProvider.main(args);
+      } catch ( Exception ex ) {
+        LOGGER.error("Test Exception!", ex);
+      }
+    });
+    testThread.start();
+    testThread.join(60000);
 
-        File out = new File(outfile);
-        assert (out.exists());
-        assert (out.canRead());
-        assert (out.isFile());
+    File out = new File(outfile);
+    assert (out.exists());
+    assert (out.canRead());
+    assert (out.isFile());
 
-        FileReader outReader = new FileReader(out);
-        LineNumberReader outCounter = new LineNumberReader(outReader);
+    FileReader outReader = new FileReader(out);
+    LineNumberReader outCounter = new LineNumberReader(outReader);
 
-        while(outCounter.readLine() != null) {}
+    while (outCounter.readLine() != null) {}
 
-        assert (outCounter.getLineNumber() >= 1);
+    assert (outCounter.getLineNumber() >= 1);
 
-    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-youtube/src/test/java/org/apache/streams/youtube/test/providers/YoutubeUserActivityProviderIT.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-youtube/src/test/java/org/apache/streams/youtube/test/providers/YoutubeUserActivityProviderIT.java
 
b/streams-contrib/streams-provider-youtube/src/test/java/org/apache/streams/youtube/test/providers/YoutubeUserActivityProviderIT.java
index a8b92cf..dd0eaab 100644
--- 
a/streams-contrib/streams-provider-youtube/src/test/java/org/apache/streams/youtube/test/providers/YoutubeUserActivityProviderIT.java
+++ 
b/streams-contrib/streams-provider-youtube/src/test/java/org/apache/streams/youtube/test/providers/YoutubeUserActivityProviderIT.java
@@ -28,43 +28,43 @@ import java.io.FileReader;
 import java.io.LineNumberReader;
 
 /**
- * Created by sblackmon on 10/13/16.
+ * Integration Test for YoutubeUserActivityProvider.
  */
 public class YoutubeUserActivityProviderIT {
 
-    private static final Logger LOGGER = 
LoggerFactory.getLogger(YoutubeUserActivityProviderIT.class);
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(YoutubeUserActivityProviderIT.class);
 
-    @Test
-    public void testYoutubeUserActivityProvider() throws Exception {
+  @Test
+  public void testYoutubeUserActivityProvider() throws Exception {
 
-        String configfile = 
"./target/test-classes/YoutubeUserActivityProviderIT.conf";
-        String outfile = 
"./target/test-classes/YoutubeUserActivityProviderIT.stdout.txt";
+    String configfile = 
"./target/test-classes/YoutubeUserActivityProviderIT.conf";
+    String outfile = 
"./target/test-classes/YoutubeUserActivityProviderIT.stdout.txt";
 
-        String[] args = new String[2];
-        args[0] = configfile;
-        args[1] = outfile;
+    String[] args = new String[2];
+    args[0] = configfile;
+    args[1] = outfile;
 
-        Thread testThread = new Thread((Runnable) () -> {
-            try {
-                YoutubeUserActivityProvider.main(args);
-            } catch( Exception e ) {
-                LOGGER.error("Test Exception!", e);
-            }
-        });
-        testThread.start();
-        testThread.join(60000);
+    Thread testThread = new Thread((Runnable) () -> {
+      try {
+        YoutubeUserActivityProvider.main(args);
+      } catch ( Exception ex ) {
+        LOGGER.error("Test Exception!", ex);
+      }
+    });
+    testThread.start();
+    testThread.join(60000);
 
-        File out = new File(outfile);
-        assert (out.exists());
-        assert (out.canRead());
-        assert (out.isFile());
+    File out = new File(outfile);
+    assert (out.exists());
+    assert (out.canRead());
+    assert (out.isFile());
 
-        FileReader outReader = new FileReader(out);
-        LineNumberReader outCounter = new LineNumberReader(outReader);
+    FileReader outReader = new FileReader(out);
+    LineNumberReader outCounter = new LineNumberReader(outReader);
 
-        while(outCounter.readLine() != null) {}
+    while (outCounter.readLine() != null) {}
 
-        assert (outCounter.getLineNumber() >= 250);
+    assert (outCounter.getLineNumber() >= 250);
 
-    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-core/src/main/java/org/apache/streams/core/DatumStatus.java
----------------------------------------------------------------------
diff --git 
a/streams-core/src/main/java/org/apache/streams/core/DatumStatus.java 
b/streams-core/src/main/java/org/apache/streams/core/DatumStatus.java
index b85b2d0..5846665 100644
--- a/streams-core/src/main/java/org/apache/streams/core/DatumStatus.java
+++ b/streams-core/src/main/java/org/apache/streams/core/DatumStatus.java
@@ -18,8 +18,11 @@
 
 package org.apache.streams.core;
 
+/**
+ * Status of StreamsDatum.
+ */
 public enum DatumStatus {
-    SUCCESS,
-    PARTIAL,
-    FAIL
+  SUCCESS,
+  PARTIAL,
+  FAIL
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-core/src/main/java/org/apache/streams/core/DatumStatusCountable.java
----------------------------------------------------------------------
diff --git 
a/streams-core/src/main/java/org/apache/streams/core/DatumStatusCountable.java 
b/streams-core/src/main/java/org/apache/streams/core/DatumStatusCountable.java
index 58518b9..c00e378 100644
--- 
a/streams-core/src/main/java/org/apache/streams/core/DatumStatusCountable.java
+++ 
b/streams-core/src/main/java/org/apache/streams/core/DatumStatusCountable.java
@@ -24,7 +24,7 @@ package org.apache.streams.core;
 @Deprecated
 public interface DatumStatusCountable {
 
-    @Deprecated
-    public DatumStatusCounter getDatumStatusCounter();
+  @Deprecated
+  public DatumStatusCounter getDatumStatusCounter();
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-core/src/main/java/org/apache/streams/core/DatumStatusCounter.java
----------------------------------------------------------------------
diff --git 
a/streams-core/src/main/java/org/apache/streams/core/DatumStatusCounter.java 
b/streams-core/src/main/java/org/apache/streams/core/DatumStatusCounter.java
index 2e598ce..250cc7e 100644
--- a/streams-core/src/main/java/org/apache/streams/core/DatumStatusCounter.java
+++ b/streams-core/src/main/java/org/apache/streams/core/DatumStatusCounter.java
@@ -21,72 +21,115 @@ package org.apache.streams.core;
 import java.io.Serializable;
 
 @Deprecated
-public class DatumStatusCounter implements Serializable
-{
-    private volatile int attempted = 0;
-    private volatile int success = 0;
-    private volatile int fail = 0;
-    private volatile int partial = 0;
-    private volatile int emitted = 0;
-
-    public int getAttempted()             { return this.attempted; }
-    public int getSuccess()             { return this.success; }
-    public int getFail()                { return  this.fail; }
-    public int getPartial()             { return this.partial; }
-    public int getEmitted()             { return this.emitted; }
-
-    public DatumStatusCounter() {
-    }
+public class DatumStatusCounter implements Serializable {
 
-    @Deprecated
-    public void add(DatumStatusCounter datumStatusCounter) {
-        this.attempted += datumStatusCounter.getAttempted();
-        this.success += datumStatusCounter.getSuccess();
-        this.partial = datumStatusCounter.getPartial();
-        this.fail += datumStatusCounter.getFail();
-        this.emitted += datumStatusCounter.getEmitted();
-    }
+  private volatile int attempted = 0;
+  private volatile int success = 0;
+  private volatile int fail = 0;
+  private volatile int partial = 0;
+  private volatile int emitted = 0;
 
-    @Deprecated
-    public void incrementAttempt() {
-        this.attempted += 1;
-    }
+  public int getAttempted() {
+    return this.attempted;
+  }
 
-    @Deprecated
-    public void incrementAttempt(int counter) {
-        this.attempted += counter;
-    }
+  public int getSuccess() {
+    return this.success;
+  }
 
-    @Deprecated
-    public synchronized void incrementStatus(DatumStatus workStatus) {
-        // add this to the record counter
-        switch(workStatus) {
-            case SUCCESS: this.success++; break;
-            case PARTIAL: this.partial++; break;
-            case FAIL: this.fail++; break;
-        }
-        this.emitted += 1;
-    }
+  public int getFail() {
+    return this.fail;
+  }
+
+  public int getPartial() {
+    return this.partial;
+  }
 
-    @Deprecated
-    public synchronized void incrementStatus(DatumStatus workStatus, int 
counter) {
-        // add this to the record counter
-        switch(workStatus) {
-            case SUCCESS: this.success += counter; break;
-            case PARTIAL: this.partial += counter; break;
-            case FAIL: this.fail += counter; break;
-        }
-        this.emitted += counter;
+  public int getEmitted() {
+    return this.emitted;
+  }
+
+  public DatumStatusCounter() {
+  }
+
+  /**
+   * accumulate partial DatumStatusCounter.
+   * @param datumStatusCounter DatumStatusCounter
+   */
+  @Deprecated
+  public void add(DatumStatusCounter datumStatusCounter) {
+    this.attempted += datumStatusCounter.getAttempted();
+    this.success += datumStatusCounter.getSuccess();
+    this.partial = datumStatusCounter.getPartial();
+    this.fail += datumStatusCounter.getFail();
+    this.emitted += datumStatusCounter.getEmitted();
+  }
+
+  @Deprecated
+  public void incrementAttempt() {
+    this.attempted += 1;
+  }
+
+  @Deprecated
+  public void incrementAttempt(int counter) {
+    this.attempted += counter;
+  }
+
+  /**
+   * increment specific DatumStatus by 1.
+   * @param workStatus DatumStatus
+   */
+  @Deprecated
+  public synchronized void incrementStatus(DatumStatus workStatus) {
+    // add this to the record counter
+    switch (workStatus) {
+      case SUCCESS:
+        this.success++;
+        break;
+      case PARTIAL:
+        this.partial++;
+        break;
+      case FAIL:
+        this.fail++;
+        break;
+      default:
+        break;
     }
+    this.emitted += 1;
+  }
 
-    @Override
-    public String toString() {
-        return "DatumStatusCounter{" +
-                "attempted=" + attempted +
-                ", success=" + success +
-                ", fail=" + fail +
-                ", partial=" + partial +
-                ", emitted=" + emitted +
-                '}';
+  /**
+   * increment specific DatumStatus by count.
+   * @param workStatus DatumStatus
+   * @param counter counter
+   */
+  @Deprecated
+  public synchronized void incrementStatus(DatumStatus workStatus, int 
counter) {
+    // add this to the record counter
+    switch (workStatus) {
+      case SUCCESS:
+        this.success += counter;
+        break;
+      case PARTIAL:
+        this.partial += counter;
+        break;
+      case FAIL:
+        this.fail += counter;
+        break;
+      default:
+        break;
     }
+    this.emitted += counter;
+  }
+
+  @Override
+  public String toString() {
+    return "DatumStatusCounter{"
+        + "attempted=" + attempted
+        + ", success=" + success
+        + ", fail=" + fail
+        + ", partial=" + partial
+        + ", emitted=" + emitted
+        + '}';
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-core/src/main/java/org/apache/streams/core/StreamBuilder.java
----------------------------------------------------------------------
diff --git 
a/streams-core/src/main/java/org/apache/streams/core/StreamBuilder.java 
b/streams-core/src/main/java/org/apache/streams/core/StreamBuilder.java
index 39bc937..1052647 100644
--- a/streams-core/src/main/java/org/apache/streams/core/StreamBuilder.java
+++ b/streams-core/src/main/java/org/apache/streams/core/StreamBuilder.java
@@ -19,6 +19,7 @@
 package org.apache.streams.core;
 
 import org.apache.streams.config.StreamsConfiguration;
+
 import org.joda.time.DateTime;
 
 import java.io.Serializable;
@@ -39,82 +40,82 @@ import java.math.BigInteger;
  */
 public interface StreamBuilder extends Serializable {
 
-    public StreamBuilder setStreamsConfiguration(StreamsConfiguration 
configuration);
-
-    public StreamsConfiguration getStreamsConfiguration();
-
-    /**
-     * Add a {@link org.apache.streams.core.StreamsProcessor} to the data 
processing stream.
-     * @param processorId unique id for this processor - must be unique across 
the entire stream
-     * @param processor the processor to execute
-     * @param numTasks the number of instances of this processor to run 
concurrently
-     * @param connectToIds the ids of the {@link 
org.apache.streams.core.StreamsOperation} that this process will
-     *                     receive data from.
-     * @return this
-     */
-    public StreamBuilder addStreamsProcessor(String processorId, 
StreamsProcessor processor, int numTasks, String... connectToIds);
-
-    /**
-     * Add a {@link org.apache.streams.core.StreamsPersistWriter} to the data 
processing stream.
-     * @param persistWriterId unique id for this processor - must be unique 
across the entire stream
-     * @param writer the writer to execute
-     * @param numTasks the number of instances of this writer to run 
concurrently
-     * @param connectToIds the ids of the {@link 
org.apache.streams.core.StreamsOperation} that this process will
-     *                     receive data from.
-     * @return this
-     */
-    public StreamBuilder addStreamsPersistWriter(String persistWriterId, 
StreamsPersistWriter writer, int numTasks, String... connectToIds);
-
-    /**
-     * Add a {@link org.apache.streams.core.StreamsProvider} to the data 
processing stream.  The provider will execute
-     * {@link org.apache.streams.core.StreamsProvider:readCurrent()} to 
produce data.
-     * @param streamId unique if for this provider - must be unique across the 
entire stream.
-     * @param provider provider to execute
-     * @return this
-     */
-    public StreamBuilder newPerpetualStream(String streamId, StreamsProvider 
provider);
-
-    /**
-     * Add a {@link org.apache.streams.core.StreamsProvider} to the data 
processing stream.  The provider will execute
-     * {@link org.apache.streams.core.StreamsProvider:readCurrent()} to 
produce data.
-     * @param streamId unique if for this provider - must be unique across the 
entire stream.
-     * @param provider provider to execute
-     * @return this
-     */
-    public StreamBuilder newReadCurrentStream(String streamId, StreamsProvider 
provider);
-
-    /**
-     * Add a {@link org.apache.streams.core.StreamsProvider} to the data 
processing stream.  The provider will execute
-     * {@link org.apache.streams.core.StreamsProvider:readNext(BigInteger)} to 
produce data.
-     * @param streamId unique if for this provider - must be unique across the 
entire stream.
-     * @param provider provider to execute
-     * @param sequence sequence to pass to {@link 
org.apache.streams.core.StreamsProvider:readNext(BigInteger)} method
-     * @return this
-     */
-    public StreamBuilder newReadNewStream(String streamId, StreamsProvider 
provider, BigInteger sequence);
-
-    /**
-     * Add a {@link org.apache.streams.core.StreamsProvider} to the data 
processing stream.  The provider will execute
-     * {@link org.apache.streams.core.StreamsProvider:readRange(DateTime, 
DateTime)} to produce data. Whether the start
-     * and end dates are inclusive or exclusive is up to the implementation.
-     * @param streamId unique if for this provider - must be unique across the 
entire stream.
-     * @param provider provider to execute
-     * @param start start date
-     * @param end end date
-     * @return this
-     */
-    public StreamBuilder newReadRangeStream(String streamId, StreamsProvider 
provider, DateTime start, DateTime end);
-
-    /**
-     * Builds the stream, and starts it or submits it based on implementation.
-     */
-    public void start();
-
-    /**
-     * Stops the streams processing.  No guarantee on a smooth shutdown. 
Optional method, may not be implemented in
-     * all cases.
-     */
-    public void stop();
+  public StreamBuilder setStreamsConfiguration(StreamsConfiguration 
configuration);
+
+  public StreamsConfiguration getStreamsConfiguration();
+
+  /**
+   * Add a {@link org.apache.streams.core.StreamsProcessor} to the data 
processing stream.
+   * @param processorId unique id for this processor - must be unique across 
the entire stream
+   * @param processor the processor to execute
+   * @param numTasks the number of instances of this processor to run 
concurrently
+   * @param connectToIds the ids of the {@link 
org.apache.streams.core.StreamsOperation} that this process will
+   *                     receive data from.
+   * @return this
+   */
+  public StreamBuilder addStreamsProcessor(String processorId, 
StreamsProcessor processor, int numTasks, String... connectToIds);
+
+  /**
+   * Add a {@link org.apache.streams.core.StreamsPersistWriter} to the data 
processing stream.
+   * @param persistWriterId unique id for this processor - must be unique 
across the entire stream
+   * @param writer the writer to execute
+   * @param numTasks the number of instances of this writer to run concurrently
+   * @param connectToIds the ids of the {@link 
org.apache.streams.core.StreamsOperation} that this process will
+   *                     receive data from.
+   * @return this
+   */
+  public StreamBuilder addStreamsPersistWriter(String persistWriterId, 
StreamsPersistWriter writer, int numTasks, String... connectToIds);
+
+  /**
+   * Add a {@link org.apache.streams.core.StreamsProvider} to the data 
processing stream.  The provider will execute
+   * {@link org.apache.streams.core.StreamsProvider#readCurrent()} to produce 
data.
+   * @param streamId unique id for this provider - must be unique across the 
entire stream.
+   * @param provider provider to execute
+   * @return this
+   */
+  public StreamBuilder newPerpetualStream(String streamId, StreamsProvider 
provider);
+
+  /**
+   * Add a {@link org.apache.streams.core.StreamsProvider} to the data 
processing stream.  The provider will execute
+   * {@link org.apache.streams.core.StreamsProvider#readCurrent()} to produce 
data.
+   * @param streamId unique id for this provider - must be unique across the 
entire stream.
+   * @param provider provider to execute
+   * @return this
+   */
+  public StreamBuilder newReadCurrentStream(String streamId, StreamsProvider 
provider);
+
+  /**
+   * Add a {@link org.apache.streams.core.StreamsProvider} to the data 
processing stream.  The provider will execute
+   * {@link org.apache.streams.core.StreamsProvider#readNew(BigInteger)} to 
produce data.
+   * @param streamId unique id for this provider - must be unique across the 
entire stream.
+   * @param provider provider to execute
+   * @param sequence sequence to pass to {@link 
org.apache.streams.core.StreamsProvider#readNew(BigInteger)} method
+   * @return this
+   */
+  public StreamBuilder newReadNewStream(String streamId, StreamsProvider 
provider, BigInteger sequence);
+
+  /**
+   * Add a {@link org.apache.streams.core.StreamsProvider} to the data 
processing stream.  The provider will execute
+   * {@link org.apache.streams.core.StreamsProvider#readRange(DateTime, 
DateTime)} to produce data. Whether the start
+   * and end dates are inclusive or exclusive is up to the implementation.
+   * @param streamId unique id for this provider - must be unique across the 
entire stream.
+   * @param provider provider to execute
+   * @param start start date
+   * @param end end date
+   * @return this
+   */
+  public StreamBuilder newReadRangeStream(String streamId, StreamsProvider 
provider, DateTime start, DateTime end);
+
+  /**
+   * Builds the stream, and starts it or submits it based on implementation.
+   */
+  public void start();
+
+  /**
+   * Stops the streams processing.  No guarantee on a smooth shutdown. 
Optional method, may not be implemented in
+   * all cases.
+   */
+  public void stop();
 
 
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-core/src/main/java/org/apache/streams/core/StreamHandler.java
----------------------------------------------------------------------
diff --git 
a/streams-core/src/main/java/org/apache/streams/core/StreamHandler.java 
b/streams-core/src/main/java/org/apache/streams/core/StreamHandler.java
deleted file mode 100644
index ca2e7ef..0000000
--- a/streams-core/src/main/java/org/apache/streams/core/StreamHandler.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.core;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Created by sblackmon on 1/6/14.
- */
-public class StreamHandler {
-
-    private static final Logger LOGGER = 
LoggerFactory.getLogger(StreamHandler.class);
-
-    private volatile StreamState state;
-
-    public void setState(StreamState state) {
-        this.state = state;
-    }
-
-    public StreamState getState() {
-        return this.state;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-core/src/main/java/org/apache/streams/core/StreamState.java
----------------------------------------------------------------------
diff --git 
a/streams-core/src/main/java/org/apache/streams/core/StreamState.java 
b/streams-core/src/main/java/org/apache/streams/core/StreamState.java
deleted file mode 100644
index 0c24d29..0000000
--- a/streams-core/src/main/java/org/apache/streams/core/StreamState.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.core;
-
-public enum StreamState {
-    RUNNING,  //Stream is currently connected and running
-    STOPPED,  // Stream has been shut down and is stopped
-    CONNECTING, //Stream is attempting to connect to server
-    SHUTTING_DOWN, //Stream has initialized shutdown
-    DISCONNECTED //Stream has unintentionally lost connection
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-core/src/main/java/org/apache/streams/core/StreamsDatum.java
----------------------------------------------------------------------
diff --git 
a/streams-core/src/main/java/org/apache/streams/core/StreamsDatum.java 
b/streams-core/src/main/java/org/apache/streams/core/StreamsDatum.java
index 8367631..27f3d85 100644
--- a/streams-core/src/main/java/org/apache/streams/core/StreamsDatum.java
+++ b/streams-core/src/main/java/org/apache/streams/core/StreamsDatum.java
@@ -19,6 +19,7 @@
 package org.apache.streams.core;
 
 import org.apache.streams.pojo.json.Activity;
+
 import org.joda.time.DateTime;
 
 import java.io.Serializable;
@@ -28,142 +29,139 @@ import java.util.Map;
 
 public class StreamsDatum implements Serializable {
 
-    public StreamsDatum(Object document) {
-        this(document, null, null, null, new HashMap<>());
-    }
-
-    public StreamsDatum(Object document, String id) {
-        this(document, id, null, null, new HashMap<>());
-    }
-
-    public StreamsDatum(Object document, BigInteger sequenceid) {
-        this(document, null, null, sequenceid);
-    }
-
-    public StreamsDatum(Object document, DateTime timestamp) {
-        this(document, null, timestamp, null);
-    }
-
-    public StreamsDatum(Object document, DateTime timestamp, BigInteger 
sequenceid) {
-        this(document, null, timestamp, sequenceid);
-    }
-
-    public StreamsDatum(Object document, DateTime timestamp, Map<String, 
Object> metadata) {
-        this(document, null, timestamp, null, metadata);
-    }
-
-    public StreamsDatum(Object document, String id, DateTime timestamp) {
-        this(document, id, timestamp, null, new HashMap<>());
-    }
-
-    public StreamsDatum(Object document, String id, Map<String, Object> 
metadata) {
-        this(document, id, null, null, metadata);
-    }
-
-    public StreamsDatum(Object document, String id, BigInteger sequenceid, 
Map<String, Object> metadata) {
-        this(document, id, null, sequenceid, metadata);
-    }
-
-    public StreamsDatum(Object document, String id, BigInteger sequenceid) {
-        this(document, id, sequenceid, new HashMap<>());
-    }
-
-    public StreamsDatum(Object document, String id, DateTime timestamp, 
BigInteger sequenceid) {
-        this.document = document;
-        this.id = id;
-        this.timestamp = timestamp;
-        this.sequenceid = sequenceid;
-        this.metadata = new HashMap<>();
-    }
-
-    public StreamsDatum(Object document, String id, DateTime timestamp, 
BigInteger sequenceid, Map<String, Object> metadata) {
-        this.document = document;
-        this.id = id;
-        this.timestamp = timestamp;
-        this.sequenceid = sequenceid;
-        this.metadata = metadata;
-    }
-
-    public DateTime timestamp;
+  public StreamsDatum(Object document) {
+    this(document, null, null, null, new HashMap<>());
+  }
 
-    public BigInteger sequenceid;
+  public StreamsDatum(Object document, String id) {
+    this(document, id, null, null, new HashMap<>());
+  }
 
-    public Map<String, Object> metadata;
+  public StreamsDatum(Object document, BigInteger sequenceid) {
+    this(document, null, null, sequenceid);
+  }
 
-    public Object document;
+  public StreamsDatum(Object document, DateTime timestamp) {
+    this(document, null, timestamp, null);
+  }
 
-    private String id;
+  public StreamsDatum(Object document, DateTime timestamp, BigInteger 
sequenceid) {
+    this(document, null, timestamp, sequenceid);
+  }
+
+  public StreamsDatum(Object document, DateTime timestamp, Map<String, Object> 
metadata) {
+    this(document, null, timestamp, null, metadata);
+  }
 
-    public DateTime getTimestamp() {
-        return timestamp;
-    }
-
-    public void setTimestamp(DateTime timestamp) {
-        this.timestamp = timestamp;
-    }
-
-    public BigInteger getSequenceid() {
-        return sequenceid;
-    }
-
-    public void setSequenceid(BigInteger sequenceid) {
-        this.sequenceid = sequenceid;
-    }
-
-    public Map<String, Object> getMetadata() {
-        return metadata;
-    }
-
-    public void setMetadata(Map<String, Object> metadata) {
-        this.metadata = metadata;
-    }
-
-    public Object getDocument() {
-        return document;
-    }
-
-    public void setDocument(Object document) {
-        this.document = document;
-    }
-
-
-    public String getId(){
-        if(this.id == null && this.document instanceof Activity) {
-            return ((Activity)this.document).getId();
-        }
-        return id;
-    }
-
-    public void setId(String id) {
-        this.id = id;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if(o instanceof StreamsDatum) {
-            StreamsDatum that = (StreamsDatum) o;
-            if(this.document != null && this.document.equals(that.document)) {
-                return (this.timestamp != null ? 
this.timestamp.equals(that.timestamp) : that.timestamp == null) &&
-                        (this.sequenceid != null ? 
this.sequenceid.equals(that.sequenceid) : that.sequenceid == null);
-            }
-            else {
-                return that.document == null && this.document == null;
-            }
-        }
-        else {
-            return false;
-        }
-    }
-
-    @Override
-    public String toString() {
-        return "StreamsDatum{" +
-          "timestamp=" + timestamp +
-          ", sequenceid=" + sequenceid +
-          ", metadata=" + metadata +
-          ", document=" + document +
-          ", id='" + id + '\'' +
-          '}';
-    }
+  public StreamsDatum(Object document, String id, DateTime timestamp) {
+    this(document, id, timestamp, null, new HashMap<>());
+  }
+
+  public StreamsDatum(Object document, String id, Map<String, Object> 
metadata) {
+    this(document, id, null, null, metadata);
+  }
+
+  public StreamsDatum(Object document, String id, BigInteger sequenceid, 
Map<String, Object> metadata) {
+    this(document, id, null, sequenceid, metadata);
+  }
+
+  public StreamsDatum(Object document, String id, BigInteger sequenceid) {
+    this(document, id, sequenceid, new HashMap<>());
+  }
+
+  public StreamsDatum(Object document, String id, DateTime timestamp, 
BigInteger sequenceid) {
+    this.document = document;
+    this.id = id;
+    this.timestamp = timestamp;
+    this.sequenceid = sequenceid;
+    this.metadata = new HashMap<>();
+  }
+
+  public StreamsDatum(Object document, String id, DateTime timestamp, 
BigInteger sequenceid, Map<String, Object> metadata) {
+    this.document = document;
+    this.id = id;
+    this.timestamp = timestamp;
+    this.sequenceid = sequenceid;
+    this.metadata = metadata;
+  }
+
+  public DateTime timestamp;
+
+  public BigInteger sequenceid;
+
+  public Map<String, Object> metadata;
+
+  public Object document;
+
+  private String id;
+
+  public DateTime getTimestamp() {
+    return timestamp;
+  }
+
+  public void setTimestamp(DateTime timestamp) {
+    this.timestamp = timestamp;
+  }
+
+  public BigInteger getSequenceid() {
+    return sequenceid;
+  }
+
+  public void setSequenceid(BigInteger sequenceid) {
+    this.sequenceid = sequenceid;
+  }
+
+  public Map<String, Object> getMetadata() {
+    return metadata;
+  }
+
+  public void setMetadata(Map<String, Object> metadata) {
+    this.metadata = metadata;
+  }
+
+  public Object getDocument() {
+    return document;
+  }
+
+  public void setDocument(Object document) {
+    this.document = document;
+  }
+
+  public String getId() {
+    if (this.id == null && this.document instanceof Activity) {
+      return ((Activity)this.document).getId();
+    }
+    return id;
+  }
+
+  public void setId(String id) {
+    this.id = id;
+  }
+
+  @Override
+  public boolean equals(Object configurationObject) {
+    if (configurationObject instanceof StreamsDatum) {
+      StreamsDatum that = (StreamsDatum) configurationObject;
+      if (this.document != null && this.document.equals(that.document)) {
+        return (this.timestamp != null ? this.timestamp.equals(that.timestamp) 
: that.timestamp == null)
+            && (this.sequenceid != null ? 
this.sequenceid.equals(that.sequenceid) : that.sequenceid == null);
+      } else {
+        return that.document == null && this.document == null;
+      }
+    } else {
+      return false;
+    }
+  }
+
+  @Override
+  public String toString() {
+    return "StreamsDatum{"
+        + "timestamp=" + timestamp
+        + ", sequenceid=" + sequenceid
+        + ", metadata=" + metadata
+        + ", document=" + document
+        + ", id='" + id + '\''
+        + '}';
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-core/src/main/java/org/apache/streams/core/StreamsFilter.java
----------------------------------------------------------------------
diff --git 
a/streams-core/src/main/java/org/apache/streams/core/StreamsFilter.java 
b/streams-core/src/main/java/org/apache/streams/core/StreamsFilter.java
deleted file mode 100644
index 929b26f..0000000
--- a/streams-core/src/main/java/org/apache/streams/core/StreamsFilter.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.core;
-
-import java.util.Queue;
-
-public interface StreamsFilter {
-
-    void start();
-    void stop();
-
-    public void setProcessorInputQueue(Queue<StreamsDatum> inputQueue);
-    public Queue<StreamsDatum> getProcessorInputQueue();
-
-    public void setProcessorOutputQueue(Queue<StreamsDatum> outputQueue);
-    public Queue<StreamsDatum> getProcessorOutputQueue();
-
-    public boolean filter(StreamsDatum entry);
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-core/src/main/java/org/apache/streams/core/StreamsOperation.java
----------------------------------------------------------------------
diff --git 
a/streams-core/src/main/java/org/apache/streams/core/StreamsOperation.java 
b/streams-core/src/main/java/org/apache/streams/core/StreamsOperation.java
index f5422a5..490f454 100644
--- a/streams-core/src/main/java/org/apache/streams/core/StreamsOperation.java
+++ b/streams-core/src/main/java/org/apache/streams/core/StreamsOperation.java
@@ -21,26 +21,26 @@ package org.apache.streams.core;
 import java.io.Serializable;
 
 /**
- *
+ * Each step in a pipeline is a StreamsOperation.
  */
 public interface StreamsOperation extends Serializable {
 
-    /**
-     * Each operation must publish an identifier.
-     */
-    public String getId();
+  /**
+   * Each operation must publish an identifier.
+   */
+  public String getId();
 
-    /**
-     * This method will be called after initialization/serialization. 
Initialize any non-serializable objects here.
-     * @param configurationObject Any object to help intialize the operation. 
ie. Map, JobContext, Properties, etc. The type
-     *                            will be based on where the operation is 
being run (ie. hadoop, storm, locally, etc.)
-     */
-    public void prepare(Object configurationObject);
+  /**
+   * This method will be called after initialization/serialization. Initialize 
any non-serializable objects here.
+   * @param configurationObject Any object to help initialize the operation. 
e.g. Map, JobContext, Properties, etc. The type
+   *                            will be based on where the operation is being 
run (e.g. hadoop, storm, locally, etc.)
+   */
+  public void prepare(Object configurationObject);
 
-    /**
-     * No guarantee that this method will ever be called.  But upon shutdown 
of the stream, an attempt to call this method
-     * will be made.
-     * Use this method to terminate connections, etc.
-     */
-    public void cleanUp();
+  /**
+   * No guarantee that this method will ever be called.  But upon shutdown of 
the stream, an attempt to call this method
+   * will be made.
+   * Use this method to terminate connections, etc.
+   */
+  public void cleanUp();
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-core/src/main/java/org/apache/streams/core/StreamsPersistReader.java
----------------------------------------------------------------------
diff --git 
a/streams-core/src/main/java/org/apache/streams/core/StreamsPersistReader.java 
b/streams-core/src/main/java/org/apache/streams/core/StreamsPersistReader.java
index 1a6b0d8..64063ac 100644
--- 
a/streams-core/src/main/java/org/apache/streams/core/StreamsPersistReader.java
+++ 
b/streams-core/src/main/java/org/apache/streams/core/StreamsPersistReader.java
@@ -23,16 +23,14 @@ import org.joda.time.DateTime;
 import java.math.BigInteger;
 
 /**
- *
  * Currently a duplicate interface.  Has exact same methods as StreamsProvider.
- * Future work should make this interface necessary I'm told.
- *
  */
 public interface StreamsPersistReader extends StreamsProvider {
 
-    StreamsResultSet readAll();
-    StreamsResultSet readNew(BigInteger sequence);
-    StreamsResultSet readRange(DateTime start, DateTime end);
+  StreamsResultSet readAll();
+
+  StreamsResultSet readNew(BigInteger sequence);
 
+  StreamsResultSet readRange(DateTime start, DateTime end);
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-core/src/main/java/org/apache/streams/core/StreamsPersistWriter.java
----------------------------------------------------------------------
diff --git 
a/streams-core/src/main/java/org/apache/streams/core/StreamsPersistWriter.java 
b/streams-core/src/main/java/org/apache/streams/core/StreamsPersistWriter.java
index 1af62e7..59797e4 100644
--- 
a/streams-core/src/main/java/org/apache/streams/core/StreamsPersistWriter.java
+++ 
b/streams-core/src/main/java/org/apache/streams/core/StreamsPersistWriter.java
@@ -18,12 +18,15 @@
 
 package org.apache.streams.core;
 
-public interface StreamsPersistWriter extends StreamsOperation{
+/**
+ * StreamsOperation for writing data out of a pipeline.
+ */
+public interface StreamsPersistWriter extends StreamsOperation {
 
-    /**
-     * Persist the StreamsDatum to the corresponding data store.
-     * @param entry to be stored.
-     */
-    void write( StreamsDatum entry );
+  /**
+   * Persist the StreamsDatum to the corresponding data store.
+   * @param entry to be stored.
+   */
+  void write( StreamsDatum entry );
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-core/src/main/java/org/apache/streams/core/StreamsProcessor.java
----------------------------------------------------------------------
diff --git 
a/streams-core/src/main/java/org/apache/streams/core/StreamsProcessor.java 
b/streams-core/src/main/java/org/apache/streams/core/StreamsProcessor.java
index b63e2d9..25e1a07 100644
--- a/streams-core/src/main/java/org/apache/streams/core/StreamsProcessor.java
+++ b/streams-core/src/main/java/org/apache/streams/core/StreamsProcessor.java
@@ -20,14 +20,14 @@ package org.apache.streams.core;
 
 import java.util.List;
 
-public interface StreamsProcessor extends StreamsOperation{
+public interface StreamsProcessor extends StreamsOperation {
 
-    /**
-     * Process/Analyze the {@link org.apache.streams.core.StreamsDatum} and 
return the the StreamsDatums that will
-     * passed to every down stream operation that reads from this processor.
-     * @param entry StreamsDatum to be process
-     * @return resulting StreamDatums from process. Should never be null or 
contain null object.  Empty list OK.
-     */
-    List<StreamsDatum> process( StreamsDatum entry );
+  /**
+   * Process/Analyze the {@link org.apache.streams.core.StreamsDatum} and 
return the StreamsDatums that will
+   * be passed to every downstream operation that reads from this processor.
+   * @param entry StreamsDatum to be processed
+   * @return resulting StreamsDatums from processing. Should never be null or 
contain null objects.  Empty list OK.
+   */
+  List<StreamsDatum> process( StreamsDatum entry );
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-core/src/main/java/org/apache/streams/core/StreamsProvider.java
----------------------------------------------------------------------
diff --git 
a/streams-core/src/main/java/org/apache/streams/core/StreamsProvider.java 
b/streams-core/src/main/java/org/apache/streams/core/StreamsProvider.java
index 7c27e34..2547343 100644
--- a/streams-core/src/main/java/org/apache/streams/core/StreamsProvider.java
+++ b/streams-core/src/main/java/org/apache/streams/core/StreamsProvider.java
@@ -18,45 +18,46 @@
 
 package org.apache.streams.core;
 
-import java.math.BigInteger;
 import org.joda.time.DateTime;
 
+import java.math.BigInteger;
+
 /**
  * A StreamsProvider represents the entry point into the Streams pipeline.  
Providers are responsible for inserting
  * data into the pipeline in discrete result sets.
  */
 public interface StreamsProvider extends StreamsOperation {
 
-    /**
-     * Start the operation of the stream
-     */
-    void startStream();
-
-    /**
-     * Read the current items available from the provider
-     * @return a non-null {@link org.apache.streams.core.StreamsResultSet}
-     */
-    StreamsResultSet readCurrent();
-
-    /**
-     * TODO: Define how this operates or eliminate
-     * @param sequence
-     * @return {@link StreamsResultSet}
-     */
-    StreamsResultSet readNew(BigInteger sequence);
-
-    /**
-     * TODO: Define how this operates or eliminate
-     * @param start
-     * @param end
-     * @return {@link StreamsResultSet}
-     */
-    StreamsResultSet readRange(DateTime start, DateTime end);
-
-    /**
-     * Flag to indicate whether the provider is producing data
-     * @return true if the processor is actively awaiting or producing data.  
False otherwise.
-     */
-    boolean isRunning();
+  /**
+   * Start the operation of the stream.
+   */
+  void startStream();
+
+  /**
+   * Read the current items available from the provider.
+   * @return a non-null {@link org.apache.streams.core.StreamsResultSet}
+   */
+  StreamsResultSet readCurrent();
+
+  /**
+   * Read data with sequenceId greater than sequence.
+   * @param sequence BigInteger sequence
+   * @return {@link StreamsResultSet}
+   */
+  StreamsResultSet readNew(BigInteger sequence);
+
+  /**
+   * Read data with event time between start DateTime and end DateTime.
+   * @param start start DateTime
+   * @param end end DateTime
+   * @return {@link StreamsResultSet}
+   */
+  StreamsResultSet readRange(DateTime start, DateTime end);
+
+  /**
+   * Flag to indicate whether the provider is still producing data.
+   * @return true if the provider is actively awaiting or producing data.  
False otherwise.
+   */
+  boolean isRunning();
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-core/src/main/java/org/apache/streams/core/StreamsResultSet.java
----------------------------------------------------------------------
diff --git 
a/streams-core/src/main/java/org/apache/streams/core/StreamsResultSet.java 
b/streams-core/src/main/java/org/apache/streams/core/StreamsResultSet.java
index 1e1cbe2..d64f27e 100644
--- a/streams-core/src/main/java/org/apache/streams/core/StreamsResultSet.java
+++ b/streams-core/src/main/java/org/apache/streams/core/StreamsResultSet.java
@@ -21,40 +21,43 @@ package org.apache.streams.core;
 import java.util.Iterator;
 import java.util.Queue;
 
+/**
+ * StreamsResultSet is a wrapper for an Iterator around a set of StreamsDatum.
+ */
 public class StreamsResultSet implements Iterable<StreamsDatum> {
 
-    Queue<StreamsDatum> queue;
+  Queue<StreamsDatum> queue;
 
-    DatumStatusCounter counter;
+  DatumStatusCounter counter;
 
-    public StreamsResultSet(Queue<StreamsDatum> queue) {
-        this.queue = queue;
-    }
+  public StreamsResultSet(Queue<StreamsDatum> queue) {
+    this.queue = queue;
+  }
 
 
-    @Override
-    public Iterator<StreamsDatum> iterator() {
-        return queue.iterator();
-    }
+  @Override
+  public Iterator<StreamsDatum> iterator() {
+    return queue.iterator();
+  }
 
-    public int size() {
-        return queue.size();
-    }
+  public int size() {
+    return queue.size();
+  }
 
-    public Queue<StreamsDatum> getQueue() {
-        return queue;
-    }
+  public Queue<StreamsDatum> getQueue() {
+    return queue;
+  }
 
-    public void setQueue(Queue<StreamsDatum> queue) {
-        this.queue = queue;
-    }
+  public void setQueue(Queue<StreamsDatum> queue) {
+    this.queue = queue;
+  }
 
-    public DatumStatusCounter getCounter() {
-        return counter;
-    }
+  public DatumStatusCounter getCounter() {
+    return counter;
+  }
 
-    public void setCounter(DatumStatusCounter counter) {
-        this.counter = counter;
-    }
+  public void setCounter(DatumStatusCounter counter) {
+    this.counter = counter;
+  }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-core/src/main/java/org/apache/streams/core/util/DatumUtils.java
----------------------------------------------------------------------
diff --git a/streams-core/src/main/java/org/apache/streams/core/util/DatumUtils.java b/streams-core/src/main/java/org/apache/streams/core/util/DatumUtils.java
index a229413..7d80098 100644
--- a/streams-core/src/main/java/org/apache/streams/core/util/DatumUtils.java
+++ b/streams-core/src/main/java/org/apache/streams/core/util/DatumUtils.java
@@ -21,39 +21,40 @@ package org.apache.streams.core.util;
 
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsOperation;
+
 import org.joda.time.DateTime;
 
 import java.util.HashMap;
 import java.util.Map;
 
 /**
- * Provides common utilities for managing and manipulating StreamsDatums
+ * Provides common utilities for managing and manipulating StreamsDatums.
  */
 public class DatumUtils {
 
-    /**
-     * Adds an error occurred during a StreamsOperation step to the StreamsDatum's metadata.  By convention, errors are
-     * placed in the metadata under the "errors" and are organized by class simple name where the failure occurred.
-     *
-     * @param datum the datum on which the operation step errored
-     * @param e the error encountered
-     * @param operationClass the class of the operation
-     */
-    @SuppressWarnings("all")
-    public static void addErrorToMetadata(StreamsDatum datum, Throwable e, Class<? extends StreamsOperation> operationClass) {
-        if(!datum.getMetadata().containsKey("errors")) {
-            datum.getMetadata().put("errors", new HashMap<String, Throwable>());
-        }
-        Map<String, Throwable> errors = (Map)datum.getMetadata().get("errors");
-        errors.put(operationClass.getCanonicalName(), e);
+  /**
+   * Adds an error occurred during a StreamsOperation step to the StreamsDatum's metadata.  By convention, errors are
+   * placed in the metadata under the "errors" and are organized by class simple name where the failure occurred.
+   *
+   * @param datum the datum on which the operation step errored
+   * @param throwable the throwable encountered
+   * @param operationClass the class of the operation
+   */
+  @SuppressWarnings("all")
+  public static void addErrorToMetadata(StreamsDatum datum, Throwable throwable, Class<? extends StreamsOperation> operationClass) {
+    if (!datum.getMetadata().containsKey("errors")) {
+      datum.getMetadata().put("errors", new HashMap<String, Throwable>());
     }
+    Map<String, Throwable> errors = (Map)datum.getMetadata().get("errors");
+    errors.put(operationClass.getCanonicalName(), throwable);
+  }
 
-    public static StreamsDatum cloneDatum(StreamsDatum datum) {
-        StreamsDatum clone = new StreamsDatum(datum.getDocument());
-        clone.setId(datum.getId() == null ? null : datum.getId());
-        clone.setTimestamp(datum.getTimestamp() == null ? null : new DateTime(datum.getTimestamp()));
-        clone.setSequenceid(datum.getSequenceid() == null ? null : datum.getSequenceid());
-        clone.setMetadata(datum.getMetadata() == null ? null : new HashMap<>(datum.getMetadata()));
-        return clone;
-    }
+  public static StreamsDatum cloneDatum(StreamsDatum datum) {
+    StreamsDatum clone = new StreamsDatum(datum.getDocument());
+    clone.setId(datum.getId() == null ? null : datum.getId());
+    clone.setTimestamp(datum.getTimestamp() == null ? null : new DateTime(datum.getTimestamp()));
+    clone.setSequenceid(datum.getSequenceid() == null ? null : datum.getSequenceid());
+    clone.setMetadata(datum.getMetadata() == null ? null : new HashMap<>(datum.getMetadata()));
+    return clone;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-monitoring/src/main/java/org/apache/streams/jackson/DatumStatusCounterDeserializer.java
----------------------------------------------------------------------
diff --git a/streams-monitoring/src/main/java/org/apache/streams/jackson/DatumStatusCounterDeserializer.java b/streams-monitoring/src/main/java/org/apache/streams/jackson/DatumStatusCounterDeserializer.java
index 8bfa28b..f13a44f 100644
--- a/streams-monitoring/src/main/java/org/apache/streams/jackson/DatumStatusCounterDeserializer.java
+++ b/streams-monitoring/src/main/java/org/apache/streams/jackson/DatumStatusCounterDeserializer.java
@@ -15,62 +15,67 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.streams.jackson;
 
+import org.apache.streams.pojo.json.DatumStatusCounterBroadcast;
+
 import com.fasterxml.jackson.core.JsonParser;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.DeserializationContext;
 import com.fasterxml.jackson.databind.JsonDeserializer;
 import com.fasterxml.jackson.databind.JsonNode;
-import org.apache.streams.pojo.json.DatumStatusCounterBroadcast;
 import org.slf4j.Logger;
 
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.util.Arrays;
 import javax.management.MBeanAttributeInfo;
 import javax.management.MBeanInfo;
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
-import java.io.IOException;
-import java.lang.management.ManagementFactory;
-import java.util.Arrays;
 
 public class DatumStatusCounterDeserializer extends JsonDeserializer<DatumStatusCounterBroadcast> {
-    private static final Logger LOGGER = org.slf4j.LoggerFactory.getLogger(DatumStatusCounterDeserializer.class);
 
-    public DatumStatusCounterDeserializer() {
+  private static final Logger LOGGER = org.slf4j.LoggerFactory.getLogger(DatumStatusCounterDeserializer.class);
 
-    }
+  public DatumStatusCounterDeserializer() {
 
-    @Override
-    public DatumStatusCounterBroadcast deserialize(JsonParser jsonParser, 
DeserializationContext deserializationContext) throws IOException, 
JsonProcessingException {
-        try {
-            MBeanServer server = ManagementFactory.getPlatformMBeanServer();
+  }
 
-            DatumStatusCounterBroadcast datumStatusCounterBroadcast = new DatumStatusCounterBroadcast();
-            JsonNode attributes = jsonParser.getCodec().readTree(jsonParser);
+  @Override
+  public DatumStatusCounterBroadcast deserialize(JsonParser jsonParser, DeserializationContext deserializationContext) throws IOException, JsonProcessingException {
+    try {
+      MBeanServer server = ManagementFactory.getPlatformMBeanServer();
 
-            ObjectName name = new ObjectName(attributes.get("canonicalName").asText());
-            MBeanInfo info = server.getMBeanInfo(name);
-            datumStatusCounterBroadcast.setName(name.toString());
+      DatumStatusCounterBroadcast datumStatusCounterBroadcast = new DatumStatusCounterBroadcast();
+      JsonNode attributes = jsonParser.getCodec().readTree(jsonParser);
 
-            for (MBeanAttributeInfo attribute : Arrays.asList(info.getAttributes())) {
-                try {
-                    switch(attribute.getName()) {
-                        case "Failed":
-                            datumStatusCounterBroadcast.setFailed((boolean) server.getAttribute(name, attribute.getName()));
-                            break;
-                        case "Passed":
-                            datumStatusCounterBroadcast.setPassed((boolean) server.getAttribute(name, attribute.getName()));
-                            break;
-                    }
-                } catch (Exception e) {
-                    LOGGER.error("Exception trying to deserialize DatumStatusCounterBroadcast object: {}", e);
-                }
-            }
+      ObjectName name = new ObjectName(attributes.get("canonicalName").asText());
+      MBeanInfo info = server.getMBeanInfo(name);
+      datumStatusCounterBroadcast.setName(name.toString());
 
-            return datumStatusCounterBroadcast;
-        } catch (Exception e) {
-            LOGGER.error("Exception trying to deserialize DatumStatusCounterBroadcast object: {}", e);
-            return null;
+      for (MBeanAttributeInfo attribute : Arrays.asList(info.getAttributes())) {
+        try {
+          switch (attribute.getName()) {
+            case "Failed":
+              datumStatusCounterBroadcast.setFailed((boolean) server.getAttribute(name, attribute.getName()));
+              break;
+            case "Passed":
+              datumStatusCounterBroadcast.setPassed((boolean) server.getAttribute(name, attribute.getName()));
+              break;
+            default:
+              break;
+          }
+        } catch (Exception ex) {
+          LOGGER.error("Exception trying to deserialize DatumStatusCounterBroadcast object: {}", ex);
         }
+      }
+
+      return datumStatusCounterBroadcast;
+    } catch (Exception ex) {
+      LOGGER.error("Exception trying to deserialize DatumStatusCounterBroadcast object: {}", ex);
+      return null;
     }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-monitoring/src/main/java/org/apache/streams/jackson/MemoryUsageDeserializer.java
----------------------------------------------------------------------
diff --git a/streams-monitoring/src/main/java/org/apache/streams/jackson/MemoryUsageDeserializer.java b/streams-monitoring/src/main/java/org/apache/streams/jackson/MemoryUsageDeserializer.java
index 43c9239..8259340 100644
--- a/streams-monitoring/src/main/java/org/apache/streams/jackson/MemoryUsageDeserializer.java
+++ b/streams-monitoring/src/main/java/org/apache/streams/jackson/MemoryUsageDeserializer.java
@@ -15,65 +15,70 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.streams.jackson;
 
+import org.apache.streams.pojo.json.MemoryUsageBroadcast;
+
 import com.fasterxml.jackson.core.JsonParser;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.DeserializationContext;
 import com.fasterxml.jackson.databind.JsonDeserializer;
 import com.fasterxml.jackson.databind.JsonNode;
-import org.apache.streams.pojo.json.MemoryUsageBroadcast;
 import org.slf4j.Logger;
 
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.util.Arrays;
 import javax.management.MBeanAttributeInfo;
 import javax.management.MBeanInfo;
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
 import javax.management.openmbean.CompositeDataSupport;
-import java.io.IOException;
-import java.lang.management.ManagementFactory;
-import java.util.Arrays;
 
 public class MemoryUsageDeserializer extends JsonDeserializer<MemoryUsageBroadcast> {
-    private static final Logger LOGGER = org.slf4j.LoggerFactory.getLogger(MemoryUsageDeserializer.class);
 
-    public MemoryUsageDeserializer() {
+  private static final Logger LOGGER = org.slf4j.LoggerFactory.getLogger(MemoryUsageDeserializer.class);
 
-    }
+  public MemoryUsageDeserializer() {
 
-    @Override
-    public MemoryUsageBroadcast deserialize(JsonParser jsonParser, DeserializationContext deserializationContext) throws IOException, JsonProcessingException {
-        try {
-            MBeanServer server = ManagementFactory.getPlatformMBeanServer();
+  }
 
-            MemoryUsageBroadcast memoryUsageBroadcast = new MemoryUsageBroadcast();
-            JsonNode attributes = jsonParser.getCodec().readTree(jsonParser);
+  @Override
+  public MemoryUsageBroadcast deserialize(JsonParser jsonParser, DeserializationContext deserializationContext) throws IOException, JsonProcessingException {
+    try {
+      MBeanServer server = ManagementFactory.getPlatformMBeanServer();
 
-            ObjectName name = new ObjectName(attributes.get("canonicalName").asText());
-            MBeanInfo info = server.getMBeanInfo(name);
-            memoryUsageBroadcast.setName(name.toString());
+      MemoryUsageBroadcast memoryUsageBroadcast = new MemoryUsageBroadcast();
+      JsonNode attributes = jsonParser.getCodec().readTree(jsonParser);
 
-            for(MBeanAttributeInfo attribute : Arrays.asList(info.getAttributes())) {
-                switch(attribute.getName()) {
-                    case "Verbose":
-                        memoryUsageBroadcast.setVerbose((boolean) server.getAttribute(name, attribute.getName()));
-                        break;
-                    case "ObjectPendingFinalizationCount":
-                        memoryUsageBroadcast.setObjectPendingFinalizationCount(Long.parseLong(server.getAttribute(name, attribute.getName()).toString()));
-                        break;
-                    case "HeapMemoryUsage":
-                        memoryUsageBroadcast.setHeapMemoryUsage((Long) ((CompositeDataSupport)server.getAttribute(name, attribute.getName())).get("used"));
-                        break;
-                    case "NonHeapMemoryUsage":
-                        memoryUsageBroadcast.setNonHeapMemoryUsage((Long) ((CompositeDataSupport)server.getAttribute(name, attribute.getName())).get("used"));
-                        break;
-                }
-            }
+      ObjectName name = new ObjectName(attributes.get("canonicalName").asText());
+      MBeanInfo info = server.getMBeanInfo(name);
+      memoryUsageBroadcast.setName(name.toString());
 
-            return memoryUsageBroadcast;
-        } catch (Exception e) {
-            LOGGER.error("Exception trying to deserialize MemoryUsageDeserializer object: {}", e);
-            return null;
+      for (MBeanAttributeInfo attribute : Arrays.asList(info.getAttributes())) {
+        switch (attribute.getName()) {
+          case "Verbose":
+            memoryUsageBroadcast.setVerbose((boolean) server.getAttribute(name, attribute.getName()));
+            break;
+          case "ObjectPendingFinalizationCount":
+            memoryUsageBroadcast.setObjectPendingFinalizationCount(Long.parseLong(server.getAttribute(name, attribute.getName()).toString()));
+            break;
+          case "HeapMemoryUsage":
+            memoryUsageBroadcast.setHeapMemoryUsage((Long) ((CompositeDataSupport)server.getAttribute(name, attribute.getName())).get("used"));
+            break;
+          case "NonHeapMemoryUsage":
+            memoryUsageBroadcast.setNonHeapMemoryUsage((Long) ((CompositeDataSupport)server.getAttribute(name, attribute.getName())).get("used"));
+            break;
+          default:
+            break;
         }
+      }
+
+      return memoryUsageBroadcast;
+    } catch (Exception ex) {
+      LOGGER.error("Exception trying to deserialize MemoryUsageDeserializer object: {}", ex);
+      return null;
     }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-monitoring/src/main/java/org/apache/streams/jackson/StreamsTaskCounterDeserializer.java
----------------------------------------------------------------------
diff --git a/streams-monitoring/src/main/java/org/apache/streams/jackson/StreamsTaskCounterDeserializer.java b/streams-monitoring/src/main/java/org/apache/streams/jackson/StreamsTaskCounterDeserializer.java
index 8b65bf3..e5f5dcb 100644
--- a/streams-monitoring/src/main/java/org/apache/streams/jackson/StreamsTaskCounterDeserializer.java
+++ b/streams-monitoring/src/main/java/org/apache/streams/jackson/StreamsTaskCounterDeserializer.java
@@ -15,74 +15,82 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.streams.jackson;
 
+import org.apache.streams.pojo.json.StreamsTaskCounterBroadcast;
+
 import com.fasterxml.jackson.core.JsonParser;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.DeserializationContext;
 import com.fasterxml.jackson.databind.JsonDeserializer;
 import com.fasterxml.jackson.databind.JsonNode;
-import org.apache.streams.pojo.json.StreamsTaskCounterBroadcast;
 import org.slf4j.Logger;
 
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.util.Arrays;
 import javax.management.MBeanAttributeInfo;
 import javax.management.MBeanInfo;
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
-import java.io.IOException;
-import java.lang.management.ManagementFactory;
-import java.util.Arrays;
 
+/**
+ * StreamsTaskCounterDeserializer: a JsonDeserializer for StreamsTaskCounterBroadcast.
+ */
 public class StreamsTaskCounterDeserializer extends JsonDeserializer<StreamsTaskCounterBroadcast> {
-    private static final Logger LOGGER = org.slf4j.LoggerFactory.getLogger(StreamsTaskCounterDeserializer.class);
 
-    public StreamsTaskCounterDeserializer() {
+  private static final Logger LOGGER = org.slf4j.LoggerFactory.getLogger(StreamsTaskCounterDeserializer.class);
 
-    }
+  public StreamsTaskCounterDeserializer() {
 
-    @Override
-    public StreamsTaskCounterBroadcast deserialize(JsonParser jsonParser, DeserializationContext deserializationContext) throws IOException, JsonProcessingException {
-        try {
-            MBeanServer server = ManagementFactory.getPlatformMBeanServer();
+  }
 
-            StreamsTaskCounterBroadcast streamsTaskCounterBroadcast = new StreamsTaskCounterBroadcast();
-            JsonNode attributes = jsonParser.getCodec().readTree(jsonParser);
+  @Override
+  public StreamsTaskCounterBroadcast deserialize(JsonParser jsonParser, DeserializationContext deserializationContext) throws IOException, JsonProcessingException {
+    try {
+      MBeanServer server = ManagementFactory.getPlatformMBeanServer();
 
-            ObjectName name = new ObjectName(attributes.get("canonicalName").asText());
-            MBeanInfo info = server.getMBeanInfo(name);
-            streamsTaskCounterBroadcast.setName(name.toString());
+      StreamsTaskCounterBroadcast streamsTaskCounterBroadcast = new StreamsTaskCounterBroadcast();
+      JsonNode attributes = jsonParser.getCodec().readTree(jsonParser);
 
-            for (MBeanAttributeInfo attribute : Arrays.asList(info.getAttributes())) {
-                try {
-                    switch (attribute.getName()) {
-                        case "ErrorRate":
-                            streamsTaskCounterBroadcast.setErrorRate((double) server.getAttribute(name, attribute.getName()));
-                            break;
-                        case "NumEmitted":
-                            streamsTaskCounterBroadcast.setNumEmitted((long) server.getAttribute(name, attribute.getName()));
-                            break;
-                        case "NumReceived":
-                            streamsTaskCounterBroadcast.setNumReceived((long) server.getAttribute(name, attribute.getName()));
-                            break;
-                        case "NumUnhandledErrors":
-                            streamsTaskCounterBroadcast.setNumUnhandledErrors((long) server.getAttribute(name, attribute.getName()));
-                            break;
-                        case "AvgTime":
-                            streamsTaskCounterBroadcast.setAvgTime((double) server.getAttribute(name, attribute.getName()));
-                            break;
-                        case "MaxTime":
-                            streamsTaskCounterBroadcast.setMaxTime((long) server.getAttribute(name, attribute.getName()));
-                            break;
-                    }
-                } catch (Exception e) {
-                    LOGGER.error("Exception while trying to deserialize StreamsTaskCounterBroadcast object: {}", e);
-                }
-            }
+      ObjectName name = new ObjectName(attributes.get("canonicalName").asText());
+      MBeanInfo info = server.getMBeanInfo(name);
+      streamsTaskCounterBroadcast.setName(name.toString());
 
-            return streamsTaskCounterBroadcast;
-        } catch (Exception e) {
-            LOGGER.error("Exception while trying to deserialize StreamsTaskCounterBroadcast object: {}", e);
-            return null;
+      for (MBeanAttributeInfo attribute : Arrays.asList(info.getAttributes())) {
+        try {
+          switch (attribute.getName()) {
+            case "ErrorRate":
+              streamsTaskCounterBroadcast.setErrorRate((double) server.getAttribute(name, attribute.getName()));
+              break;
+            case "NumEmitted":
+              streamsTaskCounterBroadcast.setNumEmitted((long) server.getAttribute(name, attribute.getName()));
+              break;
+            case "NumReceived":
+              streamsTaskCounterBroadcast.setNumReceived((long) server.getAttribute(name, attribute.getName()));
+              break;
+            case "NumUnhandledErrors":
+              streamsTaskCounterBroadcast.setNumUnhandledErrors((long) server.getAttribute(name, attribute.getName()));
+              break;
+            case "AvgTime":
+              streamsTaskCounterBroadcast.setAvgTime((double) server.getAttribute(name, attribute.getName()));
+              break;
+            case "MaxTime":
+              streamsTaskCounterBroadcast.setMaxTime((long) server.getAttribute(name, attribute.getName()));
+              break;
+            default:
+              break;
+          }
+        } catch (Exception ex) {
+          LOGGER.error("Exception while trying to deserialize StreamsTaskCounterBroadcast object: {}", ex);
         }
+      }
+
+      return streamsTaskCounterBroadcast;
+    } catch (Exception ex) {
+      LOGGER.error("Exception while trying to deserialize StreamsTaskCounterBroadcast object: {}", ex);
+      return null;
     }
+  }
 }

Reply via email to