http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosHeartbeatStream.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosHeartbeatStream.java
 
b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosHeartbeatStream.java
index 64cc0e8..71447cb 100644
--- 
a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosHeartbeatStream.java
+++ 
b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosHeartbeatStream.java
@@ -19,8 +19,9 @@
 
 package org.apache.streams.sysomos.provider;
 
-import com.sysomos.xml.BeatApi;
 import org.apache.streams.core.StreamsDatum;
+
+import com.sysomos.xml.BeatApi;
 import org.joda.time.DateTime;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -30,208 +31,228 @@ import org.slf4j.LoggerFactory;
  */
 public class SysomosHeartbeatStream implements Runnable {
 
-    private static enum OperatingMode { DATE, DOC_MATCH}
-
-    private final static Logger LOGGER = 
LoggerFactory.getLogger(SysomosHeartbeatStream.class);
-
-    private final SysomosProvider provider;
-    private final SysomosClient client;
-    private final String heartbeatId;
-    private final long maxApiBatch;
-    private final long minLatency;
-    private final OperatingMode mode;
-
-    private String lastID;
-    private DateTime beforeTime;
-    private DateTime afterTime;
-    private DateTime lastRunTime;
-    private int offsetCount = 0;
-    private boolean enabled = true;
-
-    public SysomosHeartbeatStream(SysomosProvider provider, String 
heartbeatId) {
-        this(provider, heartbeatId, null, DateTime.now());
+  private enum OperatingMode { DATE, DOC_MATCH }
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(SysomosHeartbeatStream.class);
+
+  private final SysomosProvider provider;
+  private final SysomosClient client;
+  private final String heartbeatId;
+  private final long maxApiBatch;
+  private final long minLatency;
+  private final OperatingMode mode;
+
+  private String lastId;
+  private DateTime beforeTime;
+  private DateTime afterTime;
+  private DateTime lastRunTime;
+  private int offsetCount = 0;
+  private boolean enabled = true;
+
+  public SysomosHeartbeatStream(SysomosProvider provider, String heartbeatId) {
+    this(provider, heartbeatId, null, DateTime.now());
+  }
+
+  /**
+   * SysomosHeartbeatStream constructor.
+   * @param provider SysomosProvider
+   * @param heartbeatId heartbeatId
+   * @param beforeTime DateTime
+   * @param afterTime DateTime
+   */
+  public SysomosHeartbeatStream(SysomosProvider provider, String heartbeatId, 
DateTime beforeTime, DateTime afterTime) {
+    this(provider, heartbeatId, OperatingMode.DATE);
+    this.beforeTime = beforeTime;
+    this.afterTime = afterTime;
+  }
+
+  /**
+   * SysomosHeartbeatStream constructor.
+   * @param provider SysomosProvider
+   * @param heartbeatId heartbeatId
+   * @param documentId last documentId
+   */
+  public SysomosHeartbeatStream(SysomosProvider provider, String heartbeatId, 
String documentId) {
+    this(provider, heartbeatId, OperatingMode.DOC_MATCH);
+    this.lastId = documentId;
+  }
+
+  /**
+   * SysomosHeartbeatStream constructor.
+   * @param provider SysomosProvider
+   * @param heartbeatId heartbeatId
+   * @param mode OperatingMode
+   */
+  public SysomosHeartbeatStream(SysomosProvider provider, String heartbeatId, 
OperatingMode mode) {
+    this.provider = provider;
+    this.heartbeatId = heartbeatId;
+
+    this.client = provider.getClient();
+    this.maxApiBatch = provider.getMaxApiBatch();
+    this.minLatency = provider.getMinLatency();
+    this.mode = mode;
+  }
+
+  @Override
+  public void run() {
+    try {
+      executeRun();
+    } catch (Exception ex) {
+      LOGGER.error("Error executing heartbeat stream", ex);
+      shutdown();
     }
-
-    public SysomosHeartbeatStream(SysomosProvider provider, String 
heartbeatId, DateTime beforeTime, DateTime afterTime) {
-        this(provider, heartbeatId, OperatingMode.DATE);
-        this.beforeTime = beforeTime;
-        this.afterTime = afterTime;
+  }
+
+  protected void executeRun() {
+    QueryResult result;
+    String mostCurrentId = null;
+    int totalDocCount = 0;
+    lastRunTime = DateTime.now();
+    //Iff we are trying to get to a specific document ID, continue to query 
after minimum delay
+    do {
+      LOGGER.debug("Querying API to match last ID of {} or time range of {} - 
{}", lastId, afterTime, beforeTime);
+      result = queryApi();
+      totalDocCount += result.getResponseSize();
+      //Ensure that we are only assigning lastId to the latest ID, even if 
there is backfill query.
+      //Since offset is calculated at the end of the run, if we detect the need 
to backfill, it will increment to 1
+      if (offsetCount == 1) {
+        mostCurrentId = result.getCurrentId();
+      }
+      updateOffset(result);
     }
-
-    public SysomosHeartbeatStream(SysomosProvider provider, String 
heartbeatId, String documentId) {
-        this(provider, heartbeatId, OperatingMode.DOC_MATCH);
-        this.lastID = documentId;
+    while (offsetCount > 0);
+
+    updateState(result, mostCurrentId, totalDocCount);
+    LOGGER.debug("Completed current execution with a final docID of {} or time 
of {}", lastId, afterTime);
+  }
+
+  protected void updateState(QueryResult result, String mostCurrentId, int 
totalDocCount) {
+    if (OperatingMode.DOC_MATCH.equals(mode)) {
+      //Set the last ID so that the next time we are executed we will continue 
to query only so long as we haven't
+      //found the specific ID
+      lastId = mostCurrentId == null ? result.getCurrentId() : mostCurrentId;
+    } else {
+      //If we didn't see any docs, there might be a lag on the Sysomos side.  
Retry.
+      afterTime = totalDocCount == 0 ? afterTime : lastRunTime;
     }
 
-    public SysomosHeartbeatStream(SysomosProvider provider, String 
heartbeatId, OperatingMode mode) {
-        this.provider = provider;
-        this.heartbeatId = heartbeatId;
-
-        this.client = provider.getClient();
-        this.maxApiBatch = provider.getMaxApiBatch();
-        this.minLatency = provider.getMinLatency();
-        this.mode = mode;
+    if 
(SysomosProvider.Mode.BACKFILL_AND_TERMINATE.equals(provider.getMode())) {
+      shutdown();
+      LOGGER.info("Completed backfill to {} for heartbeat {}", 
OperatingMode.DOC_MATCH.equals(mode) ? lastId : afterTime, heartbeatId);
     }
-
-    @Override
-    public void run() {
-        try {
-            executeRun();
-        } catch (Exception e) {
-            LOGGER.error("Error executing heartbeat stream", e);
-            shutdown();
-        }
+  }
+
+  protected void updateOffset(QueryResult result) {
+    if (OperatingMode.DOC_MATCH.equals(mode)) {
+      //Reset the offset iff we have found a match or this is the first 
execution
+      offsetCount = lastId == null || result.isMatchedLastId() ? 0 : 
offsetCount + 1;
+    } else {
+      offsetCount = result.getResponseSize() == 0 ? 0 : offsetCount + 1;
     }
-
-    protected void executeRun() {
-        QueryResult result;
-        String mostCurrentId = null;
-        int totalDocCount = 0;
-        lastRunTime = DateTime.now();
-        //Iff we are trying to get to a specific document ID, continue to 
query after minimum delay
-        do {
-            LOGGER.debug("Querying API to match last ID of {} or time range of 
{} - {}", lastID, afterTime, beforeTime);
-            result = queryAPI();
-            totalDocCount += result.getResponseSize();
-            //Ensure that we are only assigning lastID to the latest ID, even 
if there is backfill query.
-            //Since offset is calcuated at the end of the run, if we detect 
the need to backfill, it will increment to 1
-            if(offsetCount == 1) {
-                mostCurrentId = result.getCurrentId();
-            }
-            updateOffset(result);
-        } while (offsetCount > 0);
-
-        updateState(result, mostCurrentId, totalDocCount);
-        LOGGER.debug("Completed current execution with a final docID of {} or 
time of {}", lastID, afterTime);
-    }
-
-    protected void updateState(QueryResult result, String mostCurrentId, int 
totalDocCount) {
-        if(OperatingMode.DOC_MATCH.equals(mode)) {
-            //Set the last ID so that the next time we are executed we will 
continue to query only so long as we haven't
-            //found the specific ID
-            lastID = mostCurrentId == null ? result.getCurrentId() : 
mostCurrentId;
-        } else {
-            //If we didn't see any docs, there might be a lag on the Sysomos 
side.  Retry.
-            afterTime = totalDocCount == 0 ? afterTime : lastRunTime;
-        }
-
-        
if(SysomosProvider.Mode.BACKFILL_AND_TERMINATE.equals(provider.getMode())) {
-            shutdown();
-            LOGGER.info("Completed backfill to {} for heartbeat {}", 
OperatingMode.DOC_MATCH.equals(mode) ? lastID : afterTime, heartbeatId);
-        }
+    if (offsetCount > 0) {
+      sleep();
     }
-
-    protected void updateOffset(QueryResult result) {
-        if(OperatingMode.DOC_MATCH.equals(mode)) {
-            //Reset the offset iff we have found a match or this is the first 
execution
-            offsetCount = lastID == null || result.isMatchedLastId() ? 0 : 
offsetCount + 1;
-        } else {
-            offsetCount = result.getResponseSize() == 0 ? 0 : offsetCount + 1;
-        }
-        if(offsetCount > 0) {
-            sleep();
-        }
+  }
+
+  protected void sleep() {
+    try {
+      Thread.sleep(this.minLatency);
+    } catch (InterruptedException ex) {
+      LOGGER.warn("Thread interrupted while sleeping minimum delay", ex);
+      shutdown();
     }
-
-    protected void sleep() {
-        try {
-            Thread.sleep(this.minLatency);
-        } catch (InterruptedException e) {
-            LOGGER.warn("Thread interrupted while sleeping minimum delay", e);
-            shutdown();
+  }
+
+  protected QueryResult queryApi() {
+    BeatApi.BeatResponse response = executeApiRequest();
+
+    String currentId = null;
+    boolean matched = false;
+    int responseSize = 0;
+    if (response != null) {
+      for (BeatApi.BeatResponse.Beat beat : response.getBeat()) {
+        String docId = beat.getDocid();
+        //We get documents in descending time order.  This will set the id to 
the latest document
+        if (currentId == null) {
+          currentId = docId;
+        }
+        //We only want to process documents that we know we have not seen 
before
+        if (lastId != null && lastId.equals(docId)) {
+          matched = true;
+          break;
         }
+        StreamsDatum item = new StreamsDatum(beat, docId);
+        item.getMetadata().put("heartbeat", this.heartbeatId);
+        this.provider.enqueueItem(item);
+      }
+      responseSize = response.getCount();
     }
-
-    protected QueryResult queryAPI() {
-        BeatApi.BeatResponse response = executeAPIRequest();
-
-        String currentId = null;
-        boolean matched = false;
-        int responseSize = 0;
-        if(response != null) {
-            for (BeatApi.BeatResponse.Beat beat : response.getBeat()) {
-                String docId = beat.getDocid();
-                //We get documents in descending time order.  This will set 
the id to the latest document
-                if (currentId == null) {
-                    currentId = docId;
-                }
-                //We only want to process documents that we know we have not 
seen before
-                if (lastID != null && lastID.equals(docId)) {
-                    matched = true;
-                    break;
-                }
-                StreamsDatum item = new StreamsDatum(beat, docId);
-                item.getMetadata().put("heartbeat", this.heartbeatId);
-                this.provider.enqueueItem(item);
-            }
-            responseSize = response.getCount();
+    return new QueryResult(matched, currentId, responseSize);
+  }
+
+  protected BeatApi.BeatResponse executeApiRequest() {
+    BeatApi.BeatResponse response = null;
+    try {
+      if (enabled) {
+        RequestBuilder requestBuilder = this.client.createRequestBuilder()
+            .setHeartBeatId(heartbeatId)
+            .setOffset(offsetCount * maxApiBatch)
+            .setReturnSetSize(maxApiBatch);
+        if (beforeTime != null) {
+          requestBuilder.setAddedBeforeDate(beforeTime);
         }
-        return new QueryResult(matched, currentId, responseSize);
-    }
-
-    protected BeatApi.BeatResponse executeAPIRequest() {
-        BeatApi.BeatResponse response = null;
-        try {
-            if(enabled) {
-                RequestBuilder requestBuilder = 
this.client.createRequestBuilder()
-                        .setHeartBeatId(heartbeatId)
-                        .setOffset(offsetCount * maxApiBatch)
-                        .setReturnSetSize(maxApiBatch);
-                if(beforeTime != null) {
-                    requestBuilder.setAddedBeforeDate(beforeTime);
-                }
-                if(afterTime != null) {
-                    requestBuilder.setAddedAfterDate(afterTime);
-                }
-                response = requestBuilder.execute();
-
-                LOGGER.debug("Received {} results from API query", 
response.getCount());
-            }
-        } catch (Exception e) {
-            LOGGER.warn("Error querying Sysomos API", e);
+        if (afterTime != null) {
+          requestBuilder.setAddedAfterDate(afterTime);
         }
-        return response;
-    }
+        response = requestBuilder.execute();
 
-    protected void shutdown() {
-        provider.signalComplete(heartbeatId);
-        enabled = false;
+        LOGGER.debug("Received {} results from API query", 
response.getCount());
+      }
+    } catch (Exception ex) {
+      LOGGER.warn("Error querying Sysomos API", ex);
     }
+    return response;
+  }
 
-    protected class QueryResult {
-        private boolean matchedLastId;
-        private String currentId;
-        private int responseSize;
+  protected void shutdown() {
+    provider.signalComplete(heartbeatId);
+    enabled = false;
+  }
 
+  protected class QueryResult {
+    private boolean matchedLastId;
+    private String currentId;
+    private int responseSize;
 
-        public QueryResult(boolean matchedLastId, String currentId, int 
responseSize) {
-            this.matchedLastId = matchedLastId;
-            this.currentId = currentId;
-            this.responseSize = responseSize;
-        }
 
-        public boolean isMatchedLastId() {
-            return matchedLastId;
-        }
+    public QueryResult(boolean matchedLastId, String currentId, int 
responseSize) {
+      this.matchedLastId = matchedLastId;
+      this.currentId = currentId;
+      this.responseSize = responseSize;
+    }
 
-        public void setMatchedLastId(boolean matchedLastId) {
-            this.matchedLastId = matchedLastId;
-        }
+    public boolean isMatchedLastId() {
+      return matchedLastId;
+    }
 
-        public String getCurrentId() {
-            return currentId;
-        }
+    public void setMatchedLastId(boolean matchedLastId) {
+      this.matchedLastId = matchedLastId;
+    }
 
-        public void setCurrentId(String currentId) {
-            this.currentId = currentId;
-        }
+    public String getCurrentId() {
+      return currentId;
+    }
 
-        public int getResponseSize() {
-            return responseSize;
-        }
+    public void setCurrentId(String currentId) {
+      this.currentId = currentId;
+    }
 
-        public void setResponseSize(int responseSize) {
-            this.responseSize = responseSize;
-        }
+    public int getResponseSize() {
+      return responseSize;
+    }
+
+    public void setResponseSize(int responseSize) {
+      this.responseSize = responseSize;
     }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosProvider.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosProvider.java
 
b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosProvider.java
index 824ede2..ec1f317 100644
--- 
a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosProvider.java
+++ 
b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosProvider.java
@@ -19,6 +19,15 @@
 
 package org.apache.streams.sysomos.provider;
 
+import org.apache.streams.config.ComponentConfigurator;
+import org.apache.streams.config.StreamsConfiguration;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProvider;
+import org.apache.streams.core.StreamsResultSet;
+import org.apache.streams.data.util.RFC3339Utils;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Preconditions;
@@ -31,14 +40,6 @@ import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 import com.typesafe.config.ConfigParseOptions;
 import org.apache.commons.lang.NotImplementedException;
-import org.apache.streams.config.ComponentConfigurator;
-import org.apache.streams.config.StreamsConfiguration;
-import org.apache.streams.config.StreamsConfigurator;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.core.StreamsProvider;
-import org.apache.streams.core.StreamsResultSet;
-import org.apache.streams.data.util.RFC3339Utils;
-import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.joda.time.DateTime;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -52,7 +53,9 @@ import java.util.Iterator;
 import java.util.Map;
 import java.util.Queue;
 import java.util.Set;
-import java.util.concurrent.*;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -60,310 +63,336 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 /**
  * Streams Provider for the Sysomos Heartbeat API
  *
+ * <p/>
  * Configuration:
- * The provider takes either a Map<String,Object> containing the mode 
(backfill and terminate OR continuous) and a
- * Map<String,String> of heartbeat IDs to document target ids or a string of 
the format ${heartbeatId}:${documentId},...,${heartbeatId}:${documentId}
+ * The provider takes either a Map[String,Object] containing the mode 
(backfill and terminate OR continuous) and a
+ * Map[String,String] of heartbeat IDs to document target ids or a string of 
the format
+ *   ${heartbeatId}:${documentId},...,${heartbeatId}:${documentId}
  * This configuration will configure the provider to backfill to the specified 
document and either terminate or not
  * depending on the mode flag.  Continuous mode is assumed, and is the ony 
mode supported by the String configuration.
  *
- *  To use from command line:
- *
- *  Supply configuration similar to src/test/resources/rss.conf
- *
- *  Launch using:
- *
- *  mvn exec:java 
-Dexec.mainClass=org.apache.streams.rss.provider.RssStreamProvider 
-Dexec.args="rss.conf articles.json"
  */
 public class SysomosProvider implements StreamsProvider {
 
-    public static final String STREAMS_ID = "SysomosProvider";
-
-    public static enum Mode { CONTINUOUS, BACKFILL_AND_TERMINATE }
-
-    private static final Logger LOGGER = 
LoggerFactory.getLogger(SysomosProvider.class);
-
-    public static final String ENDING_TIME_KEY = "addedBefore";
-    public static final String STARTING_TIME_KEY = "addedAfter";
-    public static final String MODE_KEY = "mode";
-    public static final String STARTING_DOCS_KEY = "startingDocs";
-    public static final int LATENCY = 10000;  //Default minLatency for 
querying the Sysomos API in milliseconds
-    public static final long PROVIDER_BATCH_SIZE = 10000L; //Default maximum 
size of the queue
-    public static final long API_BATCH_SIZE = 1000L; //Default maximum size of 
an API request
-
-    protected volatile Queue<StreamsDatum> providerQueue;
-
-    private final ReadWriteLock lock = new ReentrantReadWriteLock();
-    private final Set<String> completedHeartbeats = Sets.newHashSet();
-    private final long maxQueued;
-    private final long minLatency;
-    private final long scheduledLatency;
-    private final long maxApiBatch;
-
-    private SysomosClient client;
-    private SysomosConfiguration config;
-    private ScheduledExecutorService stream;
-    private Map<String, String> documentIds;
-    private Map<String, String> addedBefore;
-    private Map<String, String> addedAfter;
-    private Mode mode = Mode.CONTINUOUS;
-    private boolean started = false;
-    private AtomicInteger count;
-
-    public SysomosProvider(SysomosConfiguration sysomosConfiguration) {
-        this.config = sysomosConfiguration;
-        this.client = new SysomosClient(sysomosConfiguration.getApiKey());
-        this.maxQueued = sysomosConfiguration.getMaxBatchSize() == null ? 
PROVIDER_BATCH_SIZE : sysomosConfiguration.getMaxBatchSize();
-        this.minLatency = sysomosConfiguration.getMinDelayMs() == null ? 
LATENCY : sysomosConfiguration.getMinDelayMs();
-        this.scheduledLatency = sysomosConfiguration.getScheduledDelayMs() == 
null ? (LATENCY * 15) : sysomosConfiguration.getScheduledDelayMs();
-        this.maxApiBatch = sysomosConfiguration.getMinDelayMs() == null ? 
API_BATCH_SIZE : sysomosConfiguration.getApiBatchSize();
-        this.count = new AtomicInteger();
-    }
-
-    public SysomosConfiguration getConfig() {
-        return config;
-    }
-
-    public void setConfig(SysomosConfiguration config) {
-        this.config = config;
-    }
-
-    public Mode getMode() {
-        return mode;
-    }
-
-    public long getMinLatency() {
-        return minLatency;
-    }
-
-    public long getMaxApiBatch() {
-        return maxApiBatch;
+  public static final String STREAMS_ID = "SysomosProvider";
+
+  public static enum Mode { CONTINUOUS, BACKFILL_AND_TERMINATE }
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(SysomosProvider.class);
+
+  public static final String ENDING_TIME_KEY = "addedBefore";
+  public static final String STARTING_TIME_KEY = "addedAfter";
+  public static final String MODE_KEY = "mode";
+  public static final String STARTING_DOCS_KEY = "startingDocs";
+  public static final int LATENCY = 10000;  //Default minLatency for querying 
the Sysomos API in milliseconds
+  public static final long PROVIDER_BATCH_SIZE = 10000L; //Default maximum 
size of the queue
+  public static final long API_BATCH_SIZE = 1000L; //Default maximum size of 
an API request
+
+  protected volatile Queue<StreamsDatum> providerQueue;
+
+  private final ReadWriteLock lock = new ReentrantReadWriteLock();
+  private final Set<String> completedHeartbeats = Sets.newHashSet();
+  private final long maxQueued;
+  private final long minLatency;
+  private final long scheduledLatency;
+  private final long maxApiBatch;
+
+  private SysomosClient client;
+  private SysomosConfiguration config;
+  private ScheduledExecutorService stream;
+  private Map<String, String> documentIds;
+  private Map<String, String> addedBefore;
+  private Map<String, String> addedAfter;
+  private Mode mode = Mode.CONTINUOUS;
+  private boolean started = false;
+  private AtomicInteger count;
+
+  /**
+   * SysomosProvider constructor.
+   * @param sysomosConfiguration SysomosConfiguration
+   */
+  public SysomosProvider(SysomosConfiguration sysomosConfiguration) {
+    this.config = sysomosConfiguration;
+    this.client = new SysomosClient(sysomosConfiguration.getApiKey());
+    this.maxQueued = sysomosConfiguration.getMaxBatchSize() == null ? 
PROVIDER_BATCH_SIZE : sysomosConfiguration.getMaxBatchSize();
+    this.minLatency = sysomosConfiguration.getMinDelayMs() == null ? LATENCY : 
sysomosConfiguration.getMinDelayMs();
+    this.scheduledLatency = sysomosConfiguration.getScheduledDelayMs() == null
+        ? (LATENCY * 15) : sysomosConfiguration.getScheduledDelayMs();
+    this.maxApiBatch = sysomosConfiguration.getMinDelayMs() == null ? 
API_BATCH_SIZE : sysomosConfiguration.getApiBatchSize();
+    this.count = new AtomicInteger();
+  }
+
+  public SysomosConfiguration getConfig() {
+    return config;
+  }
+
+  public void setConfig(SysomosConfiguration config) {
+    this.config = config;
+  }
+
+  public Mode getMode() {
+    return mode;
+  }
+
+  public long getMinLatency() {
+    return minLatency;
+  }
+
+  public long getMaxApiBatch() {
+    return maxApiBatch;
+  }
+
+  public SysomosClient getClient() {
+    return client;
+  }
+
+  @Override
+  public String getId() {
+    return STREAMS_ID;
+  }
+
+  @Override
+  public void startStream() {
+    LOGGER.trace("Starting Producer");
+    if (!started) {
+      LOGGER.trace("Producer not started.  Initializing");
+      stream = 
Executors.newScheduledThreadPool(getConfig().getHeartbeatIds().size() + 1);
+      for (String heartbeatId : getConfig().getHeartbeatIds()) {
+        Runnable task = createStream(heartbeatId);
+        stream.scheduleWithFixedDelay(task, 0, this.scheduledLatency, 
TimeUnit.MILLISECONDS);
+        LOGGER.info("Started producer task for heartbeat {}", heartbeatId);
+      }
+      started = true;
     }
-
-    public SysomosClient getClient() {
-        return client;
+  }
+
+  @Override
+  public StreamsResultSet readCurrent() {
+    StreamsResultSet current;
+    try {
+      lock.writeLock().lock();
+      LOGGER.debug("Creating new result set for {} items", 
providerQueue.size());
+      count.addAndGet(providerQueue.size());
+      current = new StreamsResultSet(providerQueue);
+      providerQueue = constructQueue();
+    } finally {
+      lock.writeLock().unlock();
     }
 
-    @Override
-    public String getId() {
-        return STREAMS_ID;
+    return current;
+  }
+
+  @Override
+  public StreamsResultSet readNew(BigInteger bigInteger) {
+    throw new NotImplementedException("readNew not currently implemented");
+  }
+
+  @Override
+  public StreamsResultSet readRange(DateTime dateTime, DateTime dateTime2) {
+    throw new NotImplementedException("readRange not currently implemented");
+  }
+
+  //If the provider queue still has data, we are still running.  If not, we 
are running if we have not been signaled
+  //by all completed heartbeats so long as the thread pool is alive
+  @Override
+  public boolean isRunning() {
+    return providerQueue.size() > 0
+        || (completedHeartbeats.size() < 
this.getConfig().getHeartbeatIds().size()
+            && !(stream.isTerminated()
+        || stream.isShutdown()));
+  }
+
+  @Override
+  public void prepare(Object configurationObject) {
+    this.providerQueue = constructQueue();
+    if (configurationObject instanceof Map) {
+      extractConfigFromMap((Map) configurationObject);
+    } else if (configurationObject instanceof String) {
+      documentIds = 
Splitter.on(";").trimResults().withKeyValueSeparator("=").split((String)configurationObject);
     }
-
-    @Override
-    public void startStream() {
-        LOGGER.trace("Starting Producer");
-        if (!started) {
-            LOGGER.trace("Producer not started.  Initializing");
-            stream = 
Executors.newScheduledThreadPool(getConfig().getHeartbeatIds().size() + 1);
-            for (String heartbeatId : getConfig().getHeartbeatIds()) {
-                Runnable task = createStream(heartbeatId);
-                stream.scheduleWithFixedDelay(task, 0, this.scheduledLatency, 
TimeUnit.MILLISECONDS);
-                LOGGER.info("Started producer task for heartbeat {}", 
heartbeatId);
-            }
-            started = true;
+  }
+
+  @Override
+  public void cleanUp() {
+    stream.shutdown(); // Disable new tasks from being submitted
+    try {
+      // Wait a while for existing tasks to terminate
+      if (!stream.awaitTermination(60, TimeUnit.SECONDS)) {
+        stream.shutdownNow(); // Cancel currently executing tasks
+        // Wait a while for tasks to respond to being cancelled
+        if (!stream.awaitTermination(60, TimeUnit.SECONDS)) {
+          LOGGER.error("Stream did not terminate");
         }
+      }
+    } catch (InterruptedException ie) {
+      // (Re-)Cancel if current thread also interrupted
+      stream.shutdownNow();
+      // Preserve interrupt status
+      Thread.currentThread().interrupt();
     }
-
-    @Override
-    public StreamsResultSet readCurrent() {
-        StreamsResultSet current;
-        try {
-            lock.writeLock().lock();
-            LOGGER.debug("Creating new result set for {} items", 
providerQueue.size());
-            count.addAndGet(providerQueue.size());
-            current = new StreamsResultSet(providerQueue);
-            providerQueue = constructQueue();
-        } finally {
-            lock.writeLock().unlock();
-        }
-
-        return current;
+  }
+
+  /**
+   * signalComplete.
+   * @param heartbeatId heartbeatId
+   */
+  public void signalComplete(String heartbeatId) {
+    try {
+      this.lock.writeLock().lock();
+      this.completedHeartbeats.add(heartbeatId);
+      if (!this.isRunning()) {
+        this.cleanUp();
+      }
+    } finally {
+      this.lock.writeLock().unlock();
     }
 
-    @Override
-    public StreamsResultSet readNew(BigInteger bigInteger) {
-        throw new NotImplementedException("readNew not currently implemented");
+  }
+
+  protected void enqueueItem(StreamsDatum datum) {
+    boolean success;
+    do {
+      try {
+        pauseForSpace(); //Don't lock before this pause. We don't want to block 
the readCurrent method
+        lock.readLock().lock();
+        success = providerQueue.offer(datum);
+        Thread.yield();
+      } finally {
+        lock.readLock().unlock();
+      }
     }
+    while (!success);
+  }
 
-    @Override
-    public StreamsResultSet readRange(DateTime dateTime, DateTime dateTime2) {
-        throw new NotImplementedException("readRange not currently 
implemented");
-    }
+  protected SysomosHeartbeatStream createStream(String heartbeatId) {
+    String afterTime = addedAfter != null && 
addedAfter.containsKey(heartbeatId) ? addedAfter.get(heartbeatId) : null;
+    String beforeTime = addedBefore != null && 
addedBefore.containsKey(heartbeatId) ? addedBefore.get(heartbeatId) : null;
 
-    //If the provider queue still has data, we are still running.  If not, we 
are running if we have not been signaled
-    //by all completed heartbeats so long as the thread pool is alive
-    @Override
-    public boolean isRunning() {
-        return providerQueue.size() > 0 || (completedHeartbeats.size() < 
this.getConfig().getHeartbeatIds().size() && !(stream.isTerminated() || 
stream.isShutdown()));
+    if (documentIds != null && documentIds.containsKey(heartbeatId)) {
+      return new SysomosHeartbeatStream(this, heartbeatId, 
documentIds.get(heartbeatId));
     }
-
-    @Override
-    public void prepare(Object configurationObject) {
-        this.providerQueue = constructQueue();
-        if(configurationObject instanceof Map) {
-            extractConfigFromMap((Map) configurationObject);
-        } else if(configurationObject instanceof String) {
-            documentIds = 
Splitter.on(";").trimResults().withKeyValueSeparator("=").split((String)configurationObject);
-        }
+    if (afterTime != null) {
+      if (beforeTime != null) {
+        return new SysomosHeartbeatStream(this, heartbeatId, 
RFC3339Utils.parseToUTC(beforeTime), RFC3339Utils.parseToUTC(afterTime));
+      } else {
+        return new SysomosHeartbeatStream(this, heartbeatId, null, 
RFC3339Utils.parseToUTC(afterTime));
+      }
     }
-
-    @Override
-    public void cleanUp() {
-        stream.shutdown(); // Disable new tasks from being submitted
-        try {
-            // Wait a while for existing tasks to terminate
-            if (!stream.awaitTermination(60, TimeUnit.SECONDS)) {
-                stream.shutdownNow(); // Cancel currently executing tasks
-                // Wait a while for tasks to respond to being cancelled
-                if (!stream.awaitTermination(60, TimeUnit.SECONDS)) {
-                    LOGGER.error("Stream did not terminate");
-                }
-            }
-        } catch (InterruptedException ie) {
-            // (Re-)Cancel if current thread also interrupted
-            stream.shutdownNow();
-            // Preserve interrupt status
-            Thread.currentThread().interrupt();
-        }
+    return new SysomosHeartbeatStream(this, heartbeatId);
+  }
+
+  /**
+   * Wait for the queue size to be below threshold before allowing execution 
to continue on this thread.
+   */
+  protected void pauseForSpace() {
+    while (this.providerQueue.size() >= maxQueued) {
+      LOGGER.trace("Sleeping the current thread due to a full queue");
+      try {
+        Thread.sleep(100);
+        LOGGER.trace("Resuming thread after wait period");
+      } catch (InterruptedException ex) {
+        LOGGER.warn("Thread was interrupted", ex);
+      }
     }
-
-    public void signalComplete(String heartbeatId) {
-        try {
-            this.lock.writeLock().lock();
-            this.completedHeartbeats.add(heartbeatId);
-            if(!this.isRunning()) {
-                this.cleanUp();
-            }
-        } finally {
-            this.lock.writeLock().unlock();
-        }
-
+  }
+
+  @SuppressWarnings("unchecked")
+  protected void extractConfigFromMap(Map configMap) {
+    if (configMap.containsKey(MODE_KEY)) {
+      Object configMode = configMap.get(MODE_KEY);
+      if (!(configMode instanceof Mode)) {
+        throw new IllegalStateException("Invalid configuration.  Mode must be 
an instance of the Mode enum but was " + configMode);
+      }
+      this.mode = (Mode)configMode;
     }
-
-    protected void enqueueItem(StreamsDatum datum) {
-        boolean success;
-        do {
-            try {
-                pauseForSpace(); //Dont lock before this pause. We don't want 
to block the readCurrent method
-                lock.readLock().lock();
-                success = providerQueue.offer(datum);
-                Thread.yield();
-            }finally {
-                lock.readLock().unlock();
-            }
-        }
-        while (!success);
+    if (configMap.containsKey(STARTING_DOCS_KEY)) {
+      Object configIds = configMap.get(STARTING_DOCS_KEY);
+      if (!(configIds instanceof Map)) {
+        throw new IllegalStateException("Invalid configuration.  StartingDocs 
must be an instance of Map<String,String> but was "
+            + configIds);
+      }
+      this.documentIds = (Map)configIds;
     }
-
-    protected SysomosHeartbeatStream createStream(String heartbeatId) {
-        String afterTime = addedAfter != null && 
addedAfter.containsKey(heartbeatId) ? addedAfter.get(heartbeatId) : null;
-        String beforeTime = addedBefore != null && 
addedBefore.containsKey(heartbeatId) ? addedBefore.get(heartbeatId) : null;
-
-        if(documentIds != null && documentIds.containsKey(heartbeatId)) {
-            return new SysomosHeartbeatStream(this, heartbeatId, 
documentIds.get(heartbeatId));
-        }
-        if(afterTime != null) {
-            if(beforeTime != null) {
-                return new SysomosHeartbeatStream(this, heartbeatId, 
RFC3339Utils.parseToUTC(beforeTime), RFC3339Utils.parseToUTC(afterTime));
-            } else {
-                return new SysomosHeartbeatStream(this, heartbeatId, null, 
RFC3339Utils.parseToUTC(afterTime));
-            }
-        }
-        return new SysomosHeartbeatStream(this, heartbeatId);
+    if (configMap.containsKey(STARTING_TIME_KEY)) {
+      Object configIds = configMap.get(STARTING_TIME_KEY);
+      if (!(configIds instanceof Map)) {
+        throw new IllegalStateException("Invalid configuration.  Added after 
key must be an instance of Map<String,String> but was "
+            + configIds);
+      }
+      this.addedAfter = (Map)configIds;
     }
-
-    /**
-     * Wait for the queue size to be below threshold before allowing execution 
to continue on this thread
-     */
-    protected void pauseForSpace() {
-        while(this.providerQueue.size() >= maxQueued) {
-            LOGGER.trace("Sleeping the current thread due to a full queue");
-            try {
-                Thread.sleep(100);
-                LOGGER.trace("Resuming thread after wait period");
-            } catch (InterruptedException e) {
-                LOGGER.warn("Thread was interrupted", e);
-            }
-        }
+    if (configMap.containsKey(ENDING_TIME_KEY)) {
+      Object configIds = configMap.get(ENDING_TIME_KEY);
+      if (!(configIds instanceof Map)) {
+        throw new IllegalStateException("Invalid configuration.  Added before 
key must be an instance of Map<String,String> but was "
+            + configIds);
+      }
+      this.addedBefore = (Map)configIds;
     }
-
-    @SuppressWarnings("unchecked")
-    protected void extractConfigFromMap(Map configMap) {
-        if(configMap.containsKey(MODE_KEY)) {
-            Object configMode = configMap.get(MODE_KEY);
-            if(!(configMode instanceof Mode)) {
-                throw new IllegalStateException("Invalid configuration.  Mode 
must be an instance of the Mode enum but was " + configMode);
-            }
-            this.mode = (Mode)configMode;
-        }
-        if(configMap.containsKey(STARTING_DOCS_KEY)) {
-            Object configIds = configMap.get(STARTING_DOCS_KEY);
-            if(!(configIds instanceof Map)) {
-                throw new IllegalStateException("Invalid configuration.  
StartingDocs must be an instance of Map<String,String> but was " + configIds);
-            }
-            this.documentIds = (Map)configIds;
-        }
-        if(configMap.containsKey(STARTING_TIME_KEY)) {
-            Object configIds = configMap.get(STARTING_TIME_KEY);
-            if(!(configIds instanceof Map)) {
-                throw new IllegalStateException("Invalid configuration.  Added 
after key must be an instance of Map<String,String> but was " + configIds);
-            }
-            this.addedAfter = (Map)configIds;
-        }
-        if(configMap.containsKey(ENDING_TIME_KEY)) {
-            Object configIds = configMap.get(ENDING_TIME_KEY);
-            if(!(configIds instanceof Map)) {
-                throw new IllegalStateException("Invalid configuration.  Added 
before key must be an instance of Map<String,String> but was " + configIds);
-            }
-            this.addedBefore = (Map)configIds;
+  }
+
+  private Queue<StreamsDatum> constructQueue() {
+    return Queues.newConcurrentLinkedQueue();
+  }
+
+  public int getCount() {
+    return this.count.get();
+  }
+
+  /**
+   * To use from command line:
+   *
+   * <p/>
+   * Supply configuration similar to src/test/resources/rss.conf
+   *
+   * <p/>
+   * Launch using:
+   *
+   * <p/>
+   * mvn exec:java 
-Dexec.mainClass=org.apache.streams.rss.provider.RssStreamProvider 
-Dexec.args="rss.conf articles.json"
+   *
+   * @param args args
+   * @throws Exception Exception
+   */
+  public static void main(String[] args) throws Exception {
+
+    Preconditions.checkArgument(args.length >= 2);
+
+    String configfile = args[0];
+    String outfile = args[1];
+
+    Config reference = ConfigFactory.load();
+    File file = new File(configfile);
+    assert (file.exists());
+    Config testResourceConfig = ConfigFactory.parseFileAnySyntax(file, 
ConfigParseOptions.defaults().setAllowMissing(false));
+
+    Config typesafe  = testResourceConfig.withFallback(reference).resolve();
+
+    StreamsConfiguration streamsConfiguration = 
StreamsConfigurator.detectConfiguration(typesafe);
+    SysomosConfiguration config = new 
ComponentConfigurator<>(SysomosConfiguration.class).detectConfiguration(typesafe,
 "rss");
+    SysomosProvider provider = new SysomosProvider(config);
+
+    ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+
+    PrintStream outStream = new PrintStream(new BufferedOutputStream(new 
FileOutputStream(outfile)));
+    provider.prepare(config);
+    provider.startStream();
+    do {
+      
Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getBatchFrequencyMs(),
 TimeUnit.MILLISECONDS);
+      Iterator<StreamsDatum> iterator = provider.readCurrent().iterator();
+      while (iterator.hasNext()) {
+        StreamsDatum datum = iterator.next();
+        String json;
+        try {
+          json = mapper.writeValueAsString(datum.getDocument());
+          outStream.println(json);
+        } catch (JsonProcessingException ex) {
+          System.err.println(ex.getMessage());
         }
+      }
     }
-
-    private Queue<StreamsDatum> constructQueue() {
-        return Queues.newConcurrentLinkedQueue();
-    }
-
-    public int getCount() {
-        return this.count.get();
-    }
-
-    public static void main(String[] args) throws Exception {
-
-        Preconditions.checkArgument(args.length >= 2);
-
-        String configfile = args[0];
-        String outfile = args[1];
-
-        Config reference = ConfigFactory.load();
-        File conf_file = new File(configfile);
-        assert(conf_file.exists());
-        Config testResourceConfig = 
ConfigFactory.parseFileAnySyntax(conf_file, 
ConfigParseOptions.defaults().setAllowMissing(false));
-
-        Config typesafe  = 
testResourceConfig.withFallback(reference).resolve();
-
-        StreamsConfiguration streamsConfiguration = 
StreamsConfigurator.detectConfiguration(typesafe);
-        SysomosConfiguration config = new 
ComponentConfigurator<>(SysomosConfiguration.class).detectConfiguration(typesafe,
 "rss");
-        SysomosProvider provider = new SysomosProvider(config);
-
-        ObjectMapper mapper = StreamsJacksonMapper.getInstance();
-
-        PrintStream outStream = new PrintStream(new BufferedOutputStream(new 
FileOutputStream(outfile)));
-        provider.prepare(config);
-        provider.startStream();
-        do {
-            
Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getBatchFrequencyMs(),
 TimeUnit.MILLISECONDS);
-            Iterator<StreamsDatum> iterator = 
provider.readCurrent().iterator();
-            while(iterator.hasNext()) {
-                StreamsDatum datum = iterator.next();
-                String json;
-                try {
-                    json = mapper.writeValueAsString(datum.getDocument());
-                    outStream.println(json);
-                } catch (JsonProcessingException e) {
-                    System.err.println(e.getMessage());
-                }
-            }
-        } while( provider.isRunning());
-        provider.cleanUp();
-        outStream.flush();
-    }
+    while ( provider.isRunning() );
+    provider.cleanUp();
+    outStream.flush();
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/util/SysomosUtils.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/util/SysomosUtils.java
 
b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/util/SysomosUtils.java
index 3b6a843..82d538d 100644
--- 
a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/util/SysomosUtils.java
+++ 
b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/util/SysomosUtils.java
@@ -19,9 +19,10 @@
 
 package org.apache.streams.sysomos.util;
 
+import org.apache.streams.sysomos.SysomosException;
+
 import com.google.common.base.Strings;
 import org.apache.commons.io.IOUtils;
-import org.apache.streams.sysomos.SysomosException;
 import org.joda.time.format.DateTimeFormat;
 import org.joda.time.format.DateTimeFormatter;
 import org.slf4j.Logger;
@@ -36,49 +37,53 @@ import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
 /**
- * Provides utilities for working with Sysomos
+ * Provides utilities for working with Sysomos.
  */
 public class SysomosUtils {
 
-    public static final Pattern CODE_PATTERN = Pattern.compile("code: 
([0-9]+)");
-    public static final DateTimeFormatter SYSOMOS_DATE_FORMATTER = 
DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ss'Z'").withZoneUTC();
-    private final static Logger LOGGER = 
LoggerFactory.getLogger(SysomosUtils.class);
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(SysomosUtils.class);
+
+  public static final Pattern CODE_PATTERN = Pattern.compile("code: ([0-9]+)");
+  public static final DateTimeFormatter SYSOMOS_DATE_FORMATTER = 
DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ss'Z'").withZoneUTC();
 
-    private SysomosUtils() {}
+  private SysomosUtils() {}
 
-    /**
-     * Queries the sysomos URL and provides the response as a String
-     *
-     * @param url the Sysomos URL to query
-     * @return valid XML String
-     */
-    public static String queryUrl(URL url) {
-        try {
-            HttpURLConnection cn = (HttpURLConnection) url.openConnection();
-            cn.setRequestMethod("GET");
-            cn.addRequestProperty("Content-Type", "text/xml;charset=UTF-8");
-            cn.setDoInput(true);
-            cn.setDoOutput(false);
-            StringWriter writer = new StringWriter();
-            IOUtils.copy(new InputStreamReader(cn.getInputStream()), writer);
-            writer.flush();
+  /**
+   * Queries the sysomos URL and provides the response as a String.
+   *
+   * @param url the Sysomos URL to query
+   * @return valid XML String
+   */
+  public static String queryUrl(URL url) {
+    try {
+      HttpURLConnection cn = (HttpURLConnection) url.openConnection();
+      cn.setRequestMethod("GET");
+      cn.addRequestProperty("Content-Type", "text/xml;charset=UTF-8");
+      cn.setDoInput(true);
+      cn.setDoOutput(false);
+      StringWriter writer = new StringWriter();
+      IOUtils.copy(new InputStreamReader(cn.getInputStream()), writer);
+      writer.flush();
 
-            String xmlResponse = writer.toString();
-            if (Strings.isNullOrEmpty(xmlResponse)) {
-                throw new SysomosException("XML Response from Sysomos was 
empty : " + xmlResponse + "\n" + cn.getResponseMessage(), cn.getResponseCode());
-            }
-            return xmlResponse;
-        } catch (IOException e) {
-            LOGGER.error("Error executing request : {}", e, url.toString());
-            String message = e.getMessage();
-            Matcher match = CODE_PATTERN.matcher(message);
-            if(match.find()) {
-                int errorCode = Integer.parseInt(match.group(1));
-                throw new SysomosException(message, e, errorCode);
-            }
-            else {
-                throw new SysomosException(e.getMessage(), e);
-            }
-        }
+      String xmlResponse = writer.toString();
+      if (Strings.isNullOrEmpty(xmlResponse)) {
+        throw new SysomosException("XML Response from Sysomos was empty : "
+            + xmlResponse
+            + "\n"
+            + cn.getResponseMessage(),
+            cn.getResponseCode());
+      }
+      return xmlResponse;
+    } catch (IOException ex) {
+      LOGGER.error("Error executing request : {}", ex, url.toString());
+      String message = ex.getMessage();
+      Matcher match = CODE_PATTERN.matcher(message);
+      if (match.find()) {
+        int errorCode = Integer.parseInt(match.group(1));
+        throw new SysomosException(message, ex, errorCode);
+      } else {
+        throw new SysomosException(ex.getMessage(), ex);
+      }
     }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-sysomos/src/test/java/com/sysomos/test/SysomosJsonSerDeIT.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-sysomos/src/test/java/com/sysomos/test/SysomosJsonSerDeIT.java
 
b/streams-contrib/streams-provider-sysomos/src/test/java/com/sysomos/test/SysomosJsonSerDeIT.java
index e3b4848..7efffcc 100644
--- 
a/streams-contrib/streams-provider-sysomos/src/test/java/com/sysomos/test/SysomosJsonSerDeIT.java
+++ 
b/streams-contrib/streams-provider-sysomos/src/test/java/com/sysomos/test/SysomosJsonSerDeIT.java
@@ -22,7 +22,6 @@ import com.fasterxml.jackson.databind.DeserializationFeature;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.sysomos.json.Sysomos;
 import org.junit.Assert;
-import org.junit.Ignore;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -36,34 +35,34 @@ import java.io.InputStreamReader;
  */
 public class SysomosJsonSerDeIT {
 
-    private final static Logger LOGGER = 
LoggerFactory.getLogger(SysomosJsonSerDeIT.class);
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(SysomosJsonSerDeIT.class);
 
-    private ObjectMapper mapper = new ObjectMapper();
+  private ObjectMapper mapper = new ObjectMapper();
 
-    @Test
-    public void Test()
-    {
-        mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, 
Boolean.FALSE);
-        mapper.configure(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY, 
Boolean.TRUE);
-        
mapper.configure(DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT, 
Boolean.TRUE);
+  @Test
+  public void testSysomosJsonSerDe() {
 
-        InputStream is = 
SysomosJsonSerDeIT.class.getResourceAsStream("/sysomos_jsons.txt");
-        InputStreamReader isr = new InputStreamReader(is);
-        BufferedReader br = new BufferedReader(isr);
+    mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, 
Boolean.FALSE);
+    mapper.configure(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY, 
Boolean.TRUE);
+    
mapper.configure(DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT, 
Boolean.TRUE);
 
-        try {
-            while (br.ready()) {
-                String line = br.readLine();
-                LOGGER.debug(line);
+    InputStream is = 
SysomosJsonSerDeIT.class.getResourceAsStream("/sysomos_jsons.txt");
+    InputStreamReader isr = new InputStreamReader(is);
+    BufferedReader br = new BufferedReader(isr);
 
-                Sysomos ser = mapper.readValue(line, Sysomos.class);
+    try {
+      while (br.ready()) {
+        String line = br.readLine();
+        LOGGER.debug(line);
 
-                String des = mapper.writeValueAsString(ser);
-                LOGGER.debug(des);
-            }
-        } catch( Exception e ) {
-            e.printStackTrace();
-            Assert.fail();
-        }
+        Sysomos ser = mapper.readValue(line, Sysomos.class);
+
+        String des = mapper.writeValueAsString(ser);
+        LOGGER.debug(des);
+      }
+    } catch ( Exception ex ) {
+      ex.printStackTrace();
+      Assert.fail();
     }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-sysomos/src/test/java/com/sysomos/test/SysomosXmlSerDeIT.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-sysomos/src/test/java/com/sysomos/test/SysomosXmlSerDeIT.java
 
b/streams-contrib/streams-provider-sysomos/src/test/java/com/sysomos/test/SysomosXmlSerDeIT.java
index b9ee2e1..e078d02 100644
--- 
a/streams-contrib/streams-provider-sysomos/src/test/java/com/sysomos/test/SysomosXmlSerDeIT.java
+++ 
b/streams-contrib/streams-provider-sysomos/src/test/java/com/sysomos/test/SysomosXmlSerDeIT.java
@@ -27,7 +27,6 @@ import com.fasterxml.jackson.dataformat.xml.XmlMapper;
 import com.sysomos.xml.BeatApi;
 import org.junit.Assert;
 import org.junit.Before;
-import org.junit.Ignore;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -41,60 +40,63 @@ import java.io.InputStreamReader;
  */
 public class SysomosXmlSerDeIT {
 
-    private final static Logger LOGGER = 
LoggerFactory.getLogger(SysomosXmlSerDeIT.class);
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(SysomosXmlSerDeIT.class);
 
-    private XmlMapper xmlMapper;
+  private XmlMapper xmlMapper;
 
-    @Before
-    public void Before() {
+  /**
+   * before.
+   */
+  @Before
+  public void before() {
 
-        XmlFactory f = new XmlFactory(new InputFactoryImpl(),
-                new OutputFactoryImpl());
+    XmlFactory xmlFactory = new XmlFactory(new InputFactoryImpl(),
+        new OutputFactoryImpl());
 
-        JacksonXmlModule module = new JacksonXmlModule();
+    JacksonXmlModule module = new JacksonXmlModule();
 
-        module.setDefaultUseWrapper(false);
+    module.setDefaultUseWrapper(false);
 
-        xmlMapper = new XmlMapper(f, module);
+    xmlMapper = new XmlMapper(xmlFactory, module);
 
-        xmlMapper
-                .configure(
-                        DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY,
-                        Boolean.TRUE);
-        xmlMapper
-                .configure(
-                        
DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT,
-                        Boolean.TRUE);
-        xmlMapper
-                .configure(
-                        DeserializationFeature.USE_JAVA_ARRAY_FOR_JSON_ARRAY,
-                        Boolean.TRUE);
-        xmlMapper.configure(
-                DeserializationFeature.READ_ENUMS_USING_TO_STRING,
-                Boolean.TRUE);
+    xmlMapper
+        .configure(
+            DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY,
+            Boolean.TRUE);
+    xmlMapper
+        .configure(
+            DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT,
+            Boolean.TRUE);
+    xmlMapper
+        .configure(
+            DeserializationFeature.USE_JAVA_ARRAY_FOR_JSON_ARRAY,
+            Boolean.TRUE);
+    xmlMapper.configure(
+        DeserializationFeature.READ_ENUMS_USING_TO_STRING,
+        Boolean.TRUE);
 
-    }
+  }
+
+  @Test
+  public void test() {
+
+    InputStream is = 
SysomosXmlSerDeIT.class.getResourceAsStream("/sysomos_xmls.txt");
+    InputStreamReader isr = new InputStreamReader(is);
+    BufferedReader br = new BufferedReader(isr);
+
+    try {
+      while (br.ready()) {
+        String line = br.readLine();
+        LOGGER.debug(line);
+
+        BeatApi ser = xmlMapper.readValue(line, BeatApi.class);
 
-    @Test
-    public void Test()
-    {
-        InputStream is = 
SysomosXmlSerDeIT.class.getResourceAsStream("/sysomos_xmls.txt");
-        InputStreamReader isr = new InputStreamReader(is);
-        BufferedReader br = new BufferedReader(isr);
-
-        try {
-            while (br.ready()) {
-                String line = br.readLine();
-                LOGGER.debug(line);
-
-                BeatApi ser = xmlMapper.readValue(line, BeatApi.class);
-
-                String des = xmlMapper.writeValueAsString(ser);
-                LOGGER.debug(des);
-            }
-        } catch( Exception e ) {
-            e.printStackTrace();
-            Assert.fail();
-        }
+        String des = xmlMapper.writeValueAsString(ser);
+        LOGGER.debug(des);
+      }
+    } catch ( Exception ex ) {
+      ex.printStackTrace();
+      Assert.fail();
     }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-sysomos/src/test/java/org/apache/streams/sysomos/test/provider/SysomosProviderIT.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-sysomos/src/test/java/org/apache/streams/sysomos/test/provider/SysomosProviderIT.java
 
b/streams-contrib/streams-provider-sysomos/src/test/java/org/apache/streams/sysomos/test/provider/SysomosProviderIT.java
index b4289ee..a088726 100644
--- 
a/streams-contrib/streams-provider-sysomos/src/test/java/org/apache/streams/sysomos/test/provider/SysomosProviderIT.java
+++ 
b/streams-contrib/streams-provider-sysomos/src/test/java/org/apache/streams/sysomos/test/provider/SysomosProviderIT.java
@@ -18,63 +18,49 @@
 
 package org.apache.streams.sysomos.test.provider;
 
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.google.common.collect.Lists;
-import com.sysomos.SysomosConfiguration;
-import org.apache.commons.lang.StringUtils;
 import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.apache.streams.sysomos.provider.SysomosProvider;
-import org.junit.Assert;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Lists;
 import org.junit.Ignore;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.BufferedReader;
-import java.io.BufferedWriter;
 import java.io.File;
-import java.io.FileOutputStream;
 import java.io.FileReader;
-import java.io.InputStream;
-import java.io.InputStreamReader;
 import java.io.LineNumberReader;
-import java.io.OutputStream;
-import java.io.OutputStreamWriter;
-import java.util.List;
 
 /**
- * Integration test for SysomosProviderIT
- *
- * Created by sblackmon on 10/21/16.
+ * Integration test for SysomosProviderIT.
  */
 @Ignore("this is ignored because the project doesn't have credentials to test 
it with during CI")
 public class SysomosProviderIT {
 
-    private final static Logger LOGGER = 
LoggerFactory.getLogger(SysomosProviderIT.class);
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(SysomosProviderIT.class);
 
-    private ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+  private ObjectMapper mapper = StreamsJacksonMapper.getInstance();
 
-    @Test
-    public void testRssStreamProvider() throws Exception {
+  @Test
+  public void testRssStreamProvider() throws Exception {
 
-        String configfile = "./target/test-classes/RssStreamProviderIT.conf";
-        String outfile = 
"./target/test-classes/RssStreamProviderIT.stdout.txt";
+    String configfile = "./target/test-classes/RssStreamProviderIT.conf";
+    String outfile = "./target/test-classes/RssStreamProviderIT.stdout.txt";
 
-        SysomosProvider.main(Lists.newArrayList(configfile, 
outfile).toArray(new String[2]));
+    SysomosProvider.main(Lists.newArrayList(configfile, outfile).toArray(new 
String[2]));
 
-        File out = new File(outfile);
-        assert (out.exists());
-        assert (out.canRead());
-        assert (out.isFile());
+    File out = new File(outfile);
+    assert (out.exists());
+    assert (out.canRead());
+    assert (out.isFile());
 
-        FileReader outReader = new FileReader(out);
-        LineNumberReader outCounter = new LineNumberReader(outReader);
+    FileReader outReader = new FileReader(out);
+    LineNumberReader outCounter = new LineNumberReader(outReader);
 
-        while(outCounter.readLine() != null) {}
+    while (outCounter.readLine() != null) {}
 
-        assert (outCounter.getLineNumber() >= 1);
+    assert (outCounter.getLineNumber() >= 1);
 
-    }
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/StreamsTwitterMapper.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/StreamsTwitterMapper.java
 
b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/StreamsTwitterMapper.java
index aca185c..671a830 100644
--- 
a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/StreamsTwitterMapper.java
+++ 
b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/StreamsTwitterMapper.java
@@ -18,69 +18,87 @@
 
 package org.apache.streams.twitter.converter;
 
+import org.apache.streams.data.util.RFC3339Utils;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.twitter.converter.util.TwitterActivityUtil;
+
 import com.fasterxml.jackson.core.JsonParser;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.DeserializationContext;
 import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
 import com.fasterxml.jackson.databind.module.SimpleModule;
-import org.apache.streams.data.util.RFC3339Utils;
-import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.joda.time.DateTime;
 import org.joda.time.format.DateTimeFormat;
 import org.joda.time.format.DateTimeFormatter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 
 /**
  * This class assist with handling twitter's date-time format during conversion
  *
+ * <p/>
  * Deprecated: use StreamsJacksonMapper.getInstance() with 
TwitterDateTimeFormat on the classpath instead
  */
 @Deprecated
 public class StreamsTwitterMapper extends StreamsJacksonMapper {
 
-    public static final String TWITTER_FORMAT = "EEE MMM dd HH:mm:ss Z yyyy";
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(TwitterActivityUtil.class);
 
-    public static final DateTimeFormatter TWITTER_FORMATTER = 
DateTimeFormat.forPattern(TWITTER_FORMAT);
+  public static final String TWITTER_FORMAT = "EEE MMM dd HH:mm:ss Z yyyy";
 
-    public static Long getMillis(String dateTime) {
+  public static final DateTimeFormatter TWITTER_FORMATTER = 
DateTimeFormat.forPattern(TWITTER_FORMAT);
 
-        // this function is for pig which doesn't handle exceptions well
-        try {
-            return TWITTER_FORMATTER.parseMillis(dateTime);
-        } catch( Exception e ) {
-            return null;
-        }
+  /**
+   * Convert to millis with TWITTER_FORMATTER.
+   * @param dateTime dateTime as String
+   * @return millis as Long
+   */
+  public static Long getMillis(String dateTime) {
 
+    // this function is for pig which doesn't handle exceptions well
+    try {
+      return TWITTER_FORMATTER.parseMillis(dateTime);
+    } catch ( Exception ex ) {
+      return null;
     }
 
-    private static final StreamsTwitterMapper INSTANCE = new 
StreamsTwitterMapper();
+  }
 
-    public static StreamsTwitterMapper getInstance(){
-        return INSTANCE;
-    }
+  private static final StreamsTwitterMapper INSTANCE = new 
StreamsTwitterMapper();
+
+  public static StreamsTwitterMapper getInstance() {
+    return INSTANCE;
+  }
 
-    public StreamsTwitterMapper() {
-        super();
-        registerModule(new SimpleModule()
-        {
-            {
-                addDeserializer(DateTime.class, new 
StdDeserializer<DateTime>(DateTime.class) {
-                    @Override
-                    public DateTime deserialize(JsonParser jpar, 
DeserializationContext context) throws IOException, JsonProcessingException {
-                        DateTime result = null;
-                        try {
-                            result = 
TWITTER_FORMATTER.parseDateTime(jpar.getValueAsString());
-                        } catch( Exception ignored ) { }
-                        try {
-                            result = 
RFC3339Utils.getInstance().parseToUTC(jpar.getValueAsString());
-                        } catch( Exception ignored ) { }
-                        return result;
-                    }
-                });
+  /**
+   * StreamsTwitterMapper constructor.
+   */
+  public StreamsTwitterMapper() {
+    super();
+    registerModule(new SimpleModule() {
+      {
+        addDeserializer(DateTime.class, new 
StdDeserializer<DateTime>(DateTime.class) {
+          @Override
+          public DateTime deserialize(JsonParser jpar, DeserializationContext 
context) throws IOException, JsonProcessingException {
+            DateTime result = null;
+            try {
+              result = 
TWITTER_FORMATTER.parseDateTime(jpar.getValueAsString());
+            } catch ( Exception ignored ) {
+              LOGGER.trace("ignored", ignored);
             }
+            try {
+              result = 
RFC3339Utils.getInstance().parseToUTC(jpar.getValueAsString());
+            } catch ( Exception ignored ) {
+              LOGGER.trace("ignored", ignored);
+            }
+            return result;
+          }
         });
+      }
+    });
 
-    }
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterDateTimeFormat.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterDateTimeFormat.java
 
b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterDateTimeFormat.java
index 5a34868..d8da2c1 100644
--- 
a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterDateTimeFormat.java
+++ 
b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterDateTimeFormat.java
@@ -20,12 +20,15 @@ package org.apache.streams.twitter.converter;
 
 import org.apache.streams.jackson.StreamsDateTimeFormat;
 
+/**
+ * StreamsDateTimeFormat implementation that supplies the date-time pattern
+ * held in {@link #TWITTER_FORMAT}.
+ */
 public class TwitterDateTimeFormat implements StreamsDateTimeFormat {
 
-    public static final String TWITTER_FORMAT = "EEE MMM dd HH:mm:ss Z yyyy";
+  /**
+   * Date-time pattern returned by {@link #getFormat()}: abbreviated day name,
+   * abbreviated month, day of month, time, zone offset, then year.
+   */
+  public static final String TWITTER_FORMAT = "EEE MMM dd HH:mm:ss Z yyyy";
 
-    @Override
-    public String getFormat() {
-        return TWITTER_FORMAT;
-    }
+  /**
+   * Returns the pattern string.
+   *
+   * @return {@link #TWITTER_FORMAT}
+   */
+  @Override
+  public String getFormat() {
+    return TWITTER_FORMAT;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterDocumentClassifier.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterDocumentClassifier.java
 
b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterDocumentClassifier.java
index 3c71f9a..f555e8d 100644
--- 
a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterDocumentClassifier.java
+++ 
b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterDocumentClassifier.java
@@ -18,10 +18,6 @@
 
 package org.apache.streams.twitter.converter;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
 import org.apache.streams.data.DocumentClassifier;
 import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.apache.streams.twitter.pojo.Delete;
@@ -32,6 +28,11 @@ import org.apache.streams.twitter.pojo.Tweet;
 import org.apache.streams.twitter.pojo.User;
 import org.apache.streams.twitter.pojo.UserstreamEvent;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
@@ -39,47 +40,53 @@ import java.util.List;
 import static 
org.apache.streams.twitter.converter.TwitterDateTimeFormat.TWITTER_FORMAT;
 
 /**
- * Ensures twitter documents can be converted to Activity
+ * Ensures twitter documents can be converted to Activity.
  */
 public class TwitterDocumentClassifier implements DocumentClassifier {
 
-    public List<Class> detectClasses(Object document) {
+  /**
+   * Picks the twitter pojo class that matches the supplied document.
+   *
+   * A String is read as JSON into an ObjectNode, an ObjectNode is used as-is,
+   * and any other object is converted to an ObjectNode with the mapper. The
+   * if/else chain below then adds exactly one class: the first rule that
+   * matches wins, and Tweet is the default when none does.
+   *
+   * @param document document to classify; checked with Preconditions.checkNotNull
+   * @return a list holding the single detected class, or an empty list when
+   *     reading the document raises an IOException
+   */
+  @Override
+  public List<Class> detectClasses(Object document) {
 
-        Preconditions.checkNotNull(document);
+    Preconditions.checkNotNull(document);
 
-        ObjectMapper mapper = 
StreamsJacksonMapper.getInstance(Lists.newArrayList(TWITTER_FORMAT));
+    // Mapper is requested with the Twitter date pattern.
+    ObjectMapper mapper = 
StreamsJacksonMapper.getInstance(Lists.newArrayList(TWITTER_FORMAT));
 
-        ObjectNode objectNode;
-        try {
-            if( document instanceof String )
-                objectNode = mapper.readValue((String)document, 
ObjectNode.class);
-            else if( document instanceof ObjectNode )
-                objectNode = (ObjectNode) document;
-            else
-                objectNode = mapper.convertValue(document, ObjectNode.class);
-        } catch (IOException e) {
-            return new ArrayList<>();
-        }
-
-        List<Class> classList = new ArrayList<>();
+    ObjectNode objectNode;
+    try {
+      if ( document instanceof String ) {
+        objectNode = mapper.readValue((String) document, ObjectNode.class);
+      } else if ( document instanceof ObjectNode ) {
+        objectNode = (ObjectNode) document;
+      } else {
+        objectNode = mapper.convertValue(document, ObjectNode.class);
+      }
+    } catch (IOException ex) {
+      // Unreadable input is reported as "no classes detected" rather than rethrown.
+      return new ArrayList<>();
+    }
 
-        if( objectNode.findValue("retweeted_status") != null && 
objectNode.get("retweeted_status") != null)
-            classList.add(Retweet.class);
-        else if( objectNode.findValue("delete") != null )
-            classList.add(Delete.class);
-        else if( objectNode.findValue("friends") != null ||
-                 objectNode.findValue("friends_str") != null )
-            classList.add(FriendList.class);
-        else if( objectNode.findValue("target_object") != null )
-            classList.add(UserstreamEvent.class);
-        else if( objectNode.findValue("follower") != null && 
objectNode.findValue("followee") != null)
-            classList.add(Follow.class);
-        else if ( objectNode.findValue("location") != null && 
objectNode.findValue("user") == null)
-            classList.add(User.class);
-        else
-            classList.add(Tweet.class);
+    List<Class> classList = new ArrayList<>();
 
-        return classList;
+    // Rules are evaluated in order; only the first match is added.
+    // NOTE(review): per the Jackson docs findValue also searches child nodes
+    // while get only looks at this node's own fields, so the Retweet rule
+    // effectively requires a top-level "retweeted_status" - confirm intent.
+    if ( objectNode.findValue("retweeted_status") != null
+        && objectNode.get("retweeted_status") != null) {
+      classList.add(Retweet.class);
+    } else if ( objectNode.findValue("delete") != null ) {
+      classList.add(Delete.class);
+    } else if ( objectNode.findValue("friends") != null
+        || objectNode.findValue("friends_str") != null ) {
+      classList.add(FriendList.class);
+    } else if ( objectNode.findValue("target_object") != null ) {
+      classList.add(UserstreamEvent.class);
+    } else if ( objectNode.findValue("follower") != null
+        && objectNode.findValue("followee") != null) {
+      classList.add(Follow.class);
+    } else if ( objectNode.findValue("location") != null
+        && objectNode.findValue("user") == null) {
+      // A "location" with no "user" field anywhere is treated as a User document.
+      classList.add(User.class);
+    } else {
+      classList.add(Tweet.class);
     }
 
+    return classList;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterFollowActivityConverter.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterFollowActivityConverter.java
 
b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterFollowActivityConverter.java
index e0ed4a4..f34c14a 100644
--- 
a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterFollowActivityConverter.java
+++ 
b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterFollowActivityConverter.java
@@ -18,67 +18,69 @@
 
 package org.apache.streams.twitter.converter;
 
-import com.google.common.collect.Lists;
-import org.apache.commons.lang.NotImplementedException;
 import org.apache.streams.data.ActivityConverter;
 import org.apache.streams.exceptions.ActivityConversionException;
 import org.apache.streams.pojo.json.Activity;
-import org.apache.streams.pojo.json.ActivityObject;
 import org.apache.streams.pojo.json.Provider;
-import org.apache.streams.twitter.pojo.Follow;
 import org.apache.streams.twitter.converter.util.TwitterActivityUtil;
+import org.apache.streams.twitter.pojo.Follow;
+
+import com.google.common.collect.Lists;
+import org.apache.commons.lang.NotImplementedException;
 
 import java.io.Serializable;
 import java.util.List;
 
 public class TwitterFollowActivityConverter implements 
ActivityConverter<Follow>, Serializable {
 
-    public TwitterFollowActivityConverter() {
-    }
-
-    private static TwitterFollowActivityConverter instance = new 
TwitterFollowActivityConverter();
-
-    public static TwitterFollowActivityConverter getInstance() {
-        return instance;
-    }
-
-    public static Class requiredClass = Follow.class;
-
-    @Override
-    public Class requiredClass() {
-        return requiredClass;
-    }
-
-    @Override
-    public String serializationFormat() {
-        return null;
-    }
-
-    @Override
-    public Follow fromActivity(Activity deserialized) throws 
ActivityConversionException {
-        throw new NotImplementedException();
-    }
-
-    @Override
-    public List<Activity> toActivityList(Follow event) throws 
ActivityConversionException {
-
-        Activity activity = new Activity();
-        activity.setVerb("follow");
-        activity.setActor(TwitterActivityUtil.buildActor(event.getFollower()));
-        
activity.setObject(TwitterActivityUtil.buildActor(event.getFollowee()));
-        activity.setId(activity.getActor().getId() + "-follow->" + 
activity.getObject().getId());
-        activity.setProvider((Provider) new Provider().withId("twitter"));
-        return Lists.newArrayList(activity);
-    }
-
-    @Override
-    public List<Follow> fromActivityList(List<Activity> list) {
-        throw new NotImplementedException();
-    }
-
-    @Override
-    public List<Activity> toActivityList(List<Follow> list) {
-        throw new NotImplementedException();
-    }
+  /**
+   * TwitterFollowActivityConverter constructor; performs no initialization.
+   */
+  public TwitterFollowActivityConverter() {
+  }
+
+  // Shared instance handed out by getInstance().
+  private static TwitterFollowActivityConverter instance = new 
TwitterFollowActivityConverter();
+
+  /**
+   * Returns the shared converter instance.
+   *
+   * @return the shared TwitterFollowActivityConverter
+   */
+  public static TwitterFollowActivityConverter getInstance() {
+    return instance;
+  }
+
+  // Class reported by requiredClass(); initialized to Follow.class.
+  public static Class requiredClass = Follow.class;
+
+  /**
+   * Returns the class this converter accepts as input.
+   *
+   * @return the current value of the static requiredClass field
+   */
+  @Override
+  public Class requiredClass() {
+    return requiredClass;
+  }
+
+  /**
+   * Always returns null.
+   *
+   * @return null
+   */
+  @Override
+  public String serializationFormat() {
+    return null;
+  }
+
+  /**
+   * Not implemented.
+   *
+   * @param deserialized unused
+   * @return never returns normally
+   * @throws NotImplementedException always
+   */
+  @Override
+  public Follow fromActivity(Activity deserialized) throws 
ActivityConversionException {
+    throw new NotImplementedException();
+  }
+
+  /**
+   * Not implemented.
+   *
+   * @param list unused
+   * @return never returns normally
+   * @throws NotImplementedException always
+   */
+  @Override
+  public List<Follow> fromActivityList(List<Activity> list) {
+    throw new NotImplementedException();
+  }
+
+  /**
+   * Builds a single "follow" Activity from a Follow event.
+   *
+   * The follower becomes the actor and the followee becomes the object; the
+   * activity id is the actor id and the object id joined by "-follow->", and
+   * the provider id is "twitter".
+   *
+   * @param event Follow event supplying the follower and followee
+   * @return a new list containing the one Activity that was built
+   */
+  @Override
+  public List<Activity> toActivityList(Follow event) throws 
ActivityConversionException {
+
+    Activity activity = new Activity();
+    activity.setVerb("follow");
+    activity.setActor(TwitterActivityUtil.buildActor(event.getFollower()));
+    activity.setObject(TwitterActivityUtil.buildActor(event.getFollowee()));
+    // The id is derived from the actor and object set on the two lines above.
+    activity.setId(activity.getActor().getId() + "-follow->" + 
activity.getObject().getId());
+    activity.setProvider((Provider) new Provider().withId("twitter"));
+    return Lists.newArrayList(activity);
+  }
+
+  /**
+   * Not implemented for lists; use the single-event overload instead.
+   *
+   * @param list unused
+   * @return never returns normally
+   * @throws NotImplementedException always
+   */
+  @Override
+  public List<Activity> toActivityList(List<Follow> list) {
+    throw new NotImplementedException();
+  }
+
+
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterJsonDeleteActivityConverter.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterJsonDeleteActivityConverter.java
 
b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterJsonDeleteActivityConverter.java
index 3e61ef9..ac031b4 100644
--- 
a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterJsonDeleteActivityConverter.java
+++ 
b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterJsonDeleteActivityConverter.java
@@ -18,8 +18,6 @@
 
 package org.apache.streams.twitter.converter;
 
-import com.google.common.collect.Lists;
-import org.apache.commons.lang.NotImplementedException;
 import org.apache.streams.data.ActivityConverter;
 import org.apache.streams.exceptions.ActivityConversionException;
 import org.apache.streams.pojo.json.Activity;
@@ -27,63 +25,63 @@ import org.apache.streams.pojo.json.ActivityObject;
 import org.apache.streams.twitter.pojo.Delete;
 import org.apache.streams.twitter.pojo.Tweet;
 
+import com.google.common.collect.Lists;
+import org.apache.commons.lang.NotImplementedException;
+
 import java.io.Serializable;
 import java.util.List;
 
 import static 
org.apache.streams.twitter.converter.util.TwitterActivityUtil.updateActivity;
 
-
 /**
-* Created with IntelliJ IDEA.
-* User: mdelaet
-* Date: 9/30/13
-* Time: 9:24 AM
-* To change this template use File | Settings | File Templates.
-*/
+ * TwitterJsonDeleteActivityConverter.
+ */
+//TODO: use class explicitly somewhere
 public class TwitterJsonDeleteActivityConverter implements 
ActivityConverter<Delete>, Serializable {
 
-    public static Class requiredClass = Delete.class;
+  // Class reported by requiredClass(); initialized to Delete.class.
+  public static Class requiredClass = Delete.class;
 
-    @Override
-    public Class requiredClass() {
-        return requiredClass;
-    }
+  /**
+   * Returns the class this converter accepts as input.
+   *
+   * @return the current value of the static requiredClass field
+   */
+  @Override
+  public Class requiredClass() {
+    return requiredClass;
+  }
 
-    private static TwitterJsonDeleteActivityConverter instance = new 
TwitterJsonDeleteActivityConverter();
+  // Shared instance handed out by getInstance().
+  private static TwitterJsonDeleteActivityConverter instance = new 
TwitterJsonDeleteActivityConverter();
 
-    public static TwitterJsonDeleteActivityConverter getInstance() {
-        return instance;
-    }
+  /**
+   * Returns the shared converter instance.
+   *
+   * @return the shared TwitterJsonDeleteActivityConverter
+   */
+  public static TwitterJsonDeleteActivityConverter getInstance() {
+    return instance;
+  }
 
-    @Override
-    public String serializationFormat() {
-        return null;
-    }
+  /**
+   * Always returns null.
+   *
+   * @return null
+   */
+  @Override
+  public String serializationFormat() {
+    return null;
+  }
 
-    @Override
-    public Delete fromActivity(Activity deserialized) throws 
ActivityConversionException {
-        throw new NotImplementedException();
-    }
+  /**
+   * Not implemented.
+   *
+   * @param deserialized unused
+   * @return never returns normally
+   * @throws NotImplementedException always
+   */
+  @Override
+  public Delete fromActivity(Activity deserialized) throws 
ActivityConversionException {
+    throw new NotImplementedException();
+  }
 
-    @Override
-    public List<Activity> toActivityList(List<Delete> serializedList) {
-        throw new NotImplementedException();
-    }
+  /**
+   * Not implemented.
+   *
+   * @param list unused
+   * @return never returns normally
+   * @throws NotImplementedException always
+   */
+  @Override
+  public List<Delete> fromActivityList(List<Activity> list) {
+    throw new NotImplementedException();
+  }
 
-    public List<Activity> toActivityList(Delete delete) throws 
ActivityConversionException {
+  /**
+   * Not implemented for lists; use the single-item overload instead.
+   *
+   * @param serializedList unused
+   * @return never returns normally
+   * @throws NotImplementedException always
+   */
+  @Override
+  public List<Activity> toActivityList(List<Delete> serializedList) {
+    throw new NotImplementedException();
+  }
 
-        Activity activity = new Activity();
-        updateActivity(delete, activity);
-        return Lists.newArrayList(activity);
-    }
+  /**
+   * Converts one Delete into a list holding one Activity.
+   *
+   * A new Activity is created and filled in by
+   * TwitterActivityUtil.updateActivity.
+   *
+   * @param delete Delete to convert
+   * @return a new list containing the one Activity that was built
+   */
+  @Override
+  public List<Activity> toActivityList(Delete delete) throws 
ActivityConversionException {
 
-    @Override
-    public List<Delete> fromActivityList(List<Activity> list) {
-        throw new NotImplementedException();
-    }
+    Activity activity = new Activity();
+    updateActivity(delete, activity);
+    return Lists.newArrayList(activity);
+  }
 
-    public ActivityObject buildTarget(Tweet tweet) {
-        return null;
-    }
+  /**
+   * Always returns null; the tweet argument is not used.
+   *
+   * @param tweet unused
+   * @return null
+   */
+  public ActivityObject buildTarget(Tweet tweet) {
+    return null;
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterJsonRetweetActivityConverter.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterJsonRetweetActivityConverter.java
 
b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterJsonRetweetActivityConverter.java
index 30a1916..13e2568 100644
--- 
a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterJsonRetweetActivityConverter.java
+++ 
b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterJsonRetweetActivityConverter.java
@@ -18,13 +18,14 @@
 
 package org.apache.streams.twitter.converter;
 
-import com.google.common.collect.Lists;
-import org.apache.commons.lang.NotImplementedException;
 import org.apache.streams.data.ActivityConverter;
 import org.apache.streams.exceptions.ActivityConversionException;
 import org.apache.streams.pojo.json.Activity;
 import org.apache.streams.twitter.pojo.Retweet;
 
+import com.google.common.collect.Lists;
+import org.apache.commons.lang.NotImplementedException;
+
 import java.io.Serializable;
 import java.util.List;
 
@@ -32,52 +33,54 @@ import static 
org.apache.streams.twitter.converter.util.TwitterActivityUtil.upda
 
 public class TwitterJsonRetweetActivityConverter implements 
ActivityConverter<Retweet>, Serializable {
 
-    public static Class requiredClass = Retweet.class;
-
-    @Override
-    public Class requiredClass() {
-        return requiredClass;
-    }
-
-    private static TwitterJsonRetweetActivityConverter instance = new 
TwitterJsonRetweetActivityConverter();
-
-    public static TwitterJsonRetweetActivityConverter getInstance() {
-        return instance;
-    }
-
-    @Override
-    public String serializationFormat() {
-        return null;
-    }
-
-    @Override
-    public Retweet fromActivity(Activity deserialized) throws 
ActivityConversionException {
-        throw new NotImplementedException();
-    }
-
-    @Override
-    public List<Activity> toActivityList(Retweet retweet) throws 
ActivityConversionException {
-
-        Activity activity = new Activity();
-        updateActivity(retweet, activity);
-
-        return Lists.newArrayList(activity);
-    }
-
-    @Override
-    public List<Retweet> fromActivityList(List<Activity> list) {
-        throw new NotImplementedException();
-    }
-
-    @Override
-    public List<Activity> toActivityList(List<Retweet> serializedList) {
-        List<Activity> result = Lists.newArrayList();
-        for( Retweet item : serializedList ) {
-            try {
-                List<Activity> activities = toActivityList(item);
-                result.addAll(activities);
-            } catch (ActivityConversionException e) {}
-        }
-        return result;
+  // Class reported by requiredClass(); initialized to Retweet.class.
+  public static Class requiredClass = Retweet.class;
+
+  /**
+   * Returns the class this converter accepts as input.
+   *
+   * @return the current value of the static requiredClass field
+   */
+  @Override
+  public Class requiredClass() {
+    return requiredClass;
+  }
+
+  // Shared instance handed out by getInstance().
+  private static TwitterJsonRetweetActivityConverter instance = new 
TwitterJsonRetweetActivityConverter();
+
+  /**
+   * Returns the shared converter instance.
+   *
+   * Static, so the shared instance can be obtained without constructing a
+   * converter first.
+   *
+   * @return the shared TwitterJsonRetweetActivityConverter
+   */
+  public static TwitterJsonRetweetActivityConverter getInstance() {
+    return instance;
+  }
+
+  /**
+   * Always returns null.
+   *
+   * @return null
+   */
+  @Override
+  public String serializationFormat() {
+    return null;
+  }
+
+  /**
+   * Not implemented.
+   *
+   * @param deserialized unused
+   * @return never returns normally
+   * @throws NotImplementedException always
+   */
+  @Override
+  public Retweet fromActivity(Activity deserialized) throws 
ActivityConversionException {
+    throw new NotImplementedException();
+  }
+
+  /**
+   * Not implemented.
+   *
+   * @param list unused
+   * @return never returns normally
+   * @throws NotImplementedException always
+   */
+  @Override
+  public List<Retweet> fromActivityList(List<Activity> list) {
+    throw new NotImplementedException();
+  }
+
+  /**
+   * Converts one Retweet into a list holding one Activity.
+   *
+   * A new Activity is created and filled in by
+   * TwitterActivityUtil.updateActivity.
+   *
+   * @param retweet Retweet to convert
+   * @return a new list containing the one Activity that was built
+   */
+  @Override
+  public List<Activity> toActivityList(Retweet retweet) throws 
ActivityConversionException {
+
+    Activity activity = new Activity();
+    updateActivity(retweet, activity);
+
+    return Lists.newArrayList(activity);
+  }
+
+  /**
+   * Converts each Retweet with the single-item overload and concatenates the
+   * results in input order.
+   *
+   * @param serializedList Retweets to convert
+   * @return the Activities of every item that converted successfully
+   */
+  @Override
+  public List<Activity> toActivityList(List<Retweet> serializedList) {
+    List<Activity> result = Lists.newArrayList();
+    for ( Retweet item : serializedList ) {
+      try {
+        List<Activity> activities = toActivityList(item);
+        result.addAll(activities);
+      } catch (ActivityConversionException ex) {
+        // Deliberate best effort: an item that cannot be converted is
+        // skipped so the remaining items are still returned.
+      }
     }
+    return result;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterJsonTweetActivityConverter.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterJsonTweetActivityConverter.java
 
b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterJsonTweetActivityConverter.java
index 0997a7f..c3b5b15 100644
--- 
a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterJsonTweetActivityConverter.java
+++ 
b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/TwitterJsonTweetActivityConverter.java
@@ -18,13 +18,14 @@
 
 package org.apache.streams.twitter.converter;
 
-import com.google.common.collect.Lists;
-import org.apache.commons.lang.NotImplementedException;
 import org.apache.streams.data.ActivityConverter;
 import org.apache.streams.exceptions.ActivityConversionException;
 import org.apache.streams.pojo.json.Activity;
 import org.apache.streams.twitter.pojo.Tweet;
 
+import com.google.common.collect.Lists;
+import org.apache.commons.lang.NotImplementedException;
+
 import java.io.Serializable;
 import java.util.List;
 
@@ -32,53 +33,55 @@ import static 
org.apache.streams.twitter.converter.util.TwitterActivityUtil.upda
 
 public class TwitterJsonTweetActivityConverter implements 
ActivityConverter<Tweet>, Serializable {
 
-    public static Class requiredClass = Tweet.class;
+  // Class reported by requiredClass(); initialized to Tweet.class.
+  public static Class requiredClass = Tweet.class;
 
-    @Override
-    public Class requiredClass() {
-        return requiredClass;
-    }
+  /**
+   * Returns the class this converter accepts as input.
+   *
+   * @return the current value of the static requiredClass field
+   */
+  @Override
+  public Class requiredClass() {
+    return requiredClass;
+  }
 
-    private static TwitterJsonTweetActivityConverter instance = new 
TwitterJsonTweetActivityConverter();
+  // Shared instance handed out by getInstance().
+  private static TwitterJsonTweetActivityConverter instance = new 
TwitterJsonTweetActivityConverter();
 
-    public static TwitterJsonTweetActivityConverter getInstance() {
-        return instance;
-    }
+  /**
+   * Returns the shared converter instance.
+   *
+   * @return the shared TwitterJsonTweetActivityConverter
+   */
+  public static TwitterJsonTweetActivityConverter getInstance() {
+    return instance;
+  }
 
-    @Override
-    public String serializationFormat() {
-        return null;
-    }
+  /**
+   * Always returns null.
+   *
+   * @return null
+   */
+  @Override
+  public String serializationFormat() {
+    return null;
+  }
 
-    @Override
-    public Tweet fromActivity(Activity deserialized) throws 
ActivityConversionException {
-        throw new NotImplementedException();
-    }
+  /**
+   * Not implemented.
+   *
+   * @param deserialized unused
+   * @return never returns normally
+   * @throws NotImplementedException always
+   */
+  @Override
+  public Tweet fromActivity(Activity deserialized) throws 
ActivityConversionException {
+    throw new NotImplementedException();
+  }
 
-    @Override
-    public List<Activity> toActivityList(Tweet tweet) throws 
ActivityConversionException {
+  /**
+   * Not implemented.
+   *
+   * @param list unused
+   * @return never returns normally
+   * @throws NotImplementedException always
+   */
+  @Override
+  public List<Tweet> fromActivityList(List<Activity> list) {
+    throw new NotImplementedException();
+  }
 
-        Activity activity = new Activity();
+  /**
+   * Converts one Tweet into a list holding one Activity.
+   *
+   * A new Activity is created and filled in by
+   * TwitterActivityUtil.updateActivity.
+   *
+   * @param tweet Tweet to convert
+   * @return a new list containing the one Activity that was built
+   */
+  @Override
+  public List<Activity> toActivityList(Tweet tweet) throws 
ActivityConversionException {
 
-        updateActivity(tweet, activity);
+    Activity activity = new Activity();
 
-        return Lists.newArrayList(activity);
-    }
+    updateActivity(tweet, activity);
 
-    @Override
-    public List<Tweet> fromActivityList(List<Activity> list) {
-        throw new NotImplementedException();
-    }
+    return Lists.newArrayList(activity);
+  }
 
-    @Override
-    public List<Activity> toActivityList(List<Tweet> serializedList) {
-        List<Activity> result = Lists.newArrayList();
-        for( Tweet item : serializedList ) {
-            try {
-                List<Activity> activities = toActivityList(item);
-                result.addAll(activities);
-            } catch (ActivityConversionException e) {}
-        }
-        return result;
+  /**
+   * Converts each Tweet with the single-item overload and concatenates the
+   * results in input order.
+   *
+   * @param serializedList Tweets to convert
+   * @return the Activities of every item that converted successfully
+   */
+  @Override
+  public List<Activity> toActivityList(List<Tweet> serializedList) {
+    List<Activity> result = Lists.newArrayList();
+    for ( Tweet item : serializedList ) {
+      try {
+        List<Activity> activities = toActivityList(item);
+        result.addAll(activities);
+      } catch (ActivityConversionException ex) {
+        // Deliberate best effort: an item that cannot be converted is
+        // skipped so the remaining items are still returned.
+      }
     }
+    return result;
+  }
 }


Reply via email to