http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-google/google-gmail/src/main/java/com/google/gmail/provider/GMailProvider.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-google/google-gmail/src/main/java/com/google/gmail/provider/GMailProvider.java
 
b/streams-contrib/streams-provider-google/google-gmail/src/main/java/com/google/gmail/provider/GMailProvider.java
index 5cc6fe7..e11628f 100644
--- 
a/streams-contrib/streams-provider-google/google-gmail/src/main/java/com/google/gmail/provider/GMailProvider.java
+++ 
b/streams-contrib/streams-provider-google/google-gmail/src/main/java/com/google/gmail/provider/GMailProvider.java
@@ -18,6 +18,13 @@
 
 package com.google.gmail.provider;
 
+import org.apache.streams.config.ComponentConfigurator;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.core.DatumStatusCounter;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProvider;
+import org.apache.streams.core.StreamsResultSet;
+
 import com.google.common.base.Preconditions;
 import com.google.gmail.GMailConfiguration;
 import com.googlecode.gmail4j.GmailClient;
@@ -26,12 +33,7 @@ import com.googlecode.gmail4j.http.HttpGmailConnection;
 import com.googlecode.gmail4j.javamail.ImapGmailClient;
 import com.googlecode.gmail4j.javamail.ImapGmailConnection;
 import com.googlecode.gmail4j.rss.RssGmailClient;
-import org.apache.streams.config.ComponentConfigurator;
-import org.apache.streams.config.StreamsConfigurator;
-import org.apache.streams.core.DatumStatusCounter;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.core.StreamsProvider;
-import org.apache.streams.core.StreamsResultSet;
+
 import org.joda.time.DateTime;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -50,141 +52,141 @@ import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
 /**
- * Created by sblackmon on 12/10/13.
+ * GMailProvider collects messages from GMail.
  */
 public class GMailProvider implements StreamsProvider, Serializable {
 
-    public final static String STREAMS_ID = "GMailProvider";
-
-    private final static Logger LOGGER = 
LoggerFactory.getLogger(GMailProvider.class);
+  public final static String STREAMS_ID = "GMailProvider";
 
-    private GMailConfiguration config;
+  private final static Logger LOGGER = 
LoggerFactory.getLogger(GMailProvider.class);
 
-    private Class klass;
+  private GMailConfiguration config;
 
-    public GMailConfiguration getConfig() {
-        return config;
-    }
+  private Class klass;
 
-    public void setConfig(GMailConfiguration config) {
-        this.config = config;
-    }
+  public GMailConfiguration getConfig() {
+    return config;
+  }
 
-    protected BlockingQueue inQueue = new LinkedBlockingQueue<>(10000);
+  public void setConfig(GMailConfiguration config) {
+    this.config = config;
+  }
 
-    protected volatile Queue<StreamsDatum> providerQueue = new 
ConcurrentLinkedQueue<>();
-    protected Future task;
+  protected BlockingQueue inQueue = new LinkedBlockingQueue<>(10000);
 
-    public BlockingQueue<Object> getInQueue() {
-        return inQueue;
-    }
+  protected volatile Queue<StreamsDatum> providerQueue = new 
ConcurrentLinkedQueue<>();
+  protected Future task;
 
-    protected GmailClient rssClient;
-    protected ImapGmailClient imapClient;
+  public BlockingQueue<Object> getInQueue() {
+    return inQueue;
+  }
 
-    private ExecutorService executor;
+  protected GmailClient rssClient;
+  protected ImapGmailClient imapClient;
 
-    private static ExecutorService newFixedThreadPoolWithQueueSize(int 
nThreads, int queueSize) {
-        return new ThreadPoolExecutor(nThreads, nThreads,
-                5000L, TimeUnit.MILLISECONDS,
-                new ArrayBlockingQueue<Runnable>(queueSize, true), new 
ThreadPoolExecutor.CallerRunsPolicy());
-    }
+  private ExecutorService executor;
 
-    public GMailProvider() {
-        this.config = new ComponentConfigurator<>(GMailConfiguration.class)
-          
.detectConfiguration(StreamsConfigurator.getConfig().getConfig("gmail"));
-    }
+  private static ExecutorService newFixedThreadPoolWithQueueSize(int nThreads, 
int queueSize) {
+    return new ThreadPoolExecutor(nThreads, nThreads,
+        5000L, TimeUnit.MILLISECONDS,
+        new ArrayBlockingQueue<Runnable>(queueSize, true), new 
ThreadPoolExecutor.CallerRunsPolicy());
+  }
 
-    public GMailProvider(GMailConfiguration config) {
-        this.config = config;
-    }
+  public GMailProvider() {
+    this.config = new ComponentConfigurator<>(GMailConfiguration.class)
+        
.detectConfiguration(StreamsConfigurator.getConfig().getConfig("gmail"));
+  }
 
-    public GMailProvider(Class klass) {
-        this.config = new ComponentConfigurator<>(GMailConfiguration.class)
-          
.detectConfiguration(StreamsConfigurator.getConfig().getConfig("gmail"));
-        this.klass = klass;
-    }
+  public GMailProvider(GMailConfiguration config) {
+    this.config = config;
+  }
 
-    public GMailProvider(GMailConfiguration config, Class klass) {
-        this.config = config;
-        this.klass = klass;
-    }
+  public GMailProvider(Class klass) {
+    this.config = new ComponentConfigurator<>(GMailConfiguration.class)
+        
.detectConfiguration(StreamsConfigurator.getConfig().getConfig("gmail"));
+    this.klass = klass;
+  }
 
-    protected DatumStatusCounter countersTotal = new DatumStatusCounter();
-    protected DatumStatusCounter countersCurrent = new DatumStatusCounter();
+  public GMailProvider(GMailConfiguration config, Class klass) {
+    this.config = config;
+    this.klass = klass;
+  }
 
-    @Override
-    public String getId() {
-        return "GMailProvider";
-    }
+  protected DatumStatusCounter countersTotal = new DatumStatusCounter();
+  protected DatumStatusCounter countersCurrent = new DatumStatusCounter();
 
-    @Override
-    public void startStream() {
+  @Override
+  public String getId() {
+    return "GMailProvider";
+  }
 
-        task = executor.submit(new GMailImapProviderTask(this));
+  @Override
+  public void startStream() {
 
-    }
+    task = executor.submit(new GMailImapProviderTask(this));
 
-    @Override
-    public StreamsResultSet readCurrent() {
+  }
 
-        StreamsResultSet current;
+  @Override
+  public StreamsResultSet readCurrent() {
 
-        synchronized( GMailProvider.class ) {
-            current = new StreamsResultSet(new 
ConcurrentLinkedQueue<>(providerQueue));
-            current.setCounter(new DatumStatusCounter());
-            current.getCounter().add(countersCurrent);
-            countersTotal.add(countersCurrent);
-            countersCurrent = new DatumStatusCounter();
-            providerQueue.clear();
-        }
+    StreamsResultSet current;
 
-        return current;
+    synchronized( GMailProvider.class ) {
+      current = new StreamsResultSet(new 
ConcurrentLinkedQueue<>(providerQueue));
+      current.setCounter(new DatumStatusCounter());
+      current.getCounter().add(countersCurrent);
+      countersTotal.add(countersCurrent);
+      countersCurrent = new DatumStatusCounter();
+      providerQueue.clear();
     }
 
-    @Override
-    public StreamsResultSet readNew(BigInteger sequence) {
-        return null;
-    }
+    return current;
+  }
 
-    @Override
-    public StreamsResultSet readRange(DateTime start, DateTime end) {
-        return null;
-    }
+  @Override
+  public StreamsResultSet readNew(BigInteger sequence) {
+    return null;
+  }
 
-    @Override
-    public boolean isRunning() {
-        return !task.isDone() && !task.isCancelled();
-    }
+  @Override
+  public StreamsResultSet readRange(DateTime start, DateTime end) {
+    return null;
+  }
 
-    @Override
-    public void prepare(Object configurationObject) {
+  @Override
+  public boolean isRunning() {
+    return !task.isDone() && !task.isCancelled();
+  }
 
-        Preconditions.checkNotNull(this.klass);
+  @Override
+  public void prepare(Object configurationObject) {
 
-        Preconditions.checkNotNull(config.getUserName());
-        Preconditions.checkNotNull(config.getPassword());
+    Preconditions.checkNotNull(this.klass);
 
-        rssClient = new RssGmailClient();
-        GmailConnection rssConnection = new 
HttpGmailConnection(config.getUserName(), config.getPassword().toCharArray());
-        rssClient.setConnection(rssConnection);
+    Preconditions.checkNotNull(config.getUserName());
+    Preconditions.checkNotNull(config.getPassword());
 
-        imapClient = new ImapGmailClient();
-        GmailConnection imapConnection = new ImapGmailConnection();
-        imapConnection.setLoginCredentials(config.getUserName(), 
config.getPassword().toCharArray());
-        imapClient.setConnection(imapConnection);
+    rssClient = new RssGmailClient();
+    GmailConnection rssConnection = new 
HttpGmailConnection(config.getUserName(), config.getPassword().toCharArray());
+    rssClient.setConnection(rssConnection);
 
-        executor = Executors.newSingleThreadExecutor();
+    imapClient = new ImapGmailClient();
+    GmailConnection imapConnection = new ImapGmailConnection();
+    imapConnection.setLoginCredentials(config.getUserName(), 
config.getPassword().toCharArray());
+    imapClient.setConnection(imapConnection);
 
-        startStream();
-    }
+    executor = Executors.newSingleThreadExecutor();
+
+    startStream();
+  }
 
-    @Override
-    public void cleanUp() {
-        try {
-            executor.awaitTermination(5, TimeUnit.SECONDS);
-        } catch (InterruptedException e) {
-            e.printStackTrace();
-        }
+  @Override
+  public void cleanUp() {
+    try {
+      executor.awaitTermination(5, TimeUnit.SECONDS);
+    } catch (InterruptedException e) {
+      e.printStackTrace();
     }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-google/google-gmail/src/main/java/com/google/gmail/provider/GMailRssProviderTask.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-google/google-gmail/src/main/java/com/google/gmail/provider/GMailRssProviderTask.java
 
b/streams-contrib/streams-provider-google/google-gmail/src/main/java/com/google/gmail/provider/GMailRssProviderTask.java
deleted file mode 100644
index 6fbfd83..0000000
--- 
a/streams-contrib/streams-provider-google/google-gmail/src/main/java/com/google/gmail/provider/GMailRssProviderTask.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package com.google.gmail.provider;
-
-import com.googlecode.gmail4j.GmailMessage;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.util.ComponentUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.List;
-
-/**
- * Created by sblackmon on 12/10/13.
- */
-public class GMailRssProviderTask implements Runnable {
-
-    private final static Logger LOGGER = 
LoggerFactory.getLogger(GMailRssProviderTask.class);
-
-    private GMailProvider provider;
-
-    public GMailRssProviderTask(GMailProvider provider) {
-        this.provider = provider;
-    }
-
-    @Override
-    public void run() {
-
-        final List<GmailMessage> messages = 
this.provider.rssClient.getUnreadMessages();
-        for (GmailMessage message : messages) {
-
-            StreamsDatum entry = new StreamsDatum(message);
-
-            ComponentUtils.offerUntilSuccess(entry, 
this.provider.providerQueue);
-        }
-
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-google/google-gmail/src/test/java/com/google/gmail/test/GMailMessageSerDeTest.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-google/google-gmail/src/test/java/com/google/gmail/test/GMailMessageSerDeTest.java
 
b/streams-contrib/streams-provider-google/google-gmail/src/test/java/com/google/gmail/test/GMailMessageSerDeTest.java
index 13fa25a..2da9e82 100644
--- 
a/streams-contrib/streams-provider-google/google-gmail/src/test/java/com/google/gmail/test/GMailMessageSerDeTest.java
+++ 
b/streams-contrib/streams-provider-google/google-gmail/src/test/java/com/google/gmail/test/GMailMessageSerDeTest.java
@@ -20,6 +20,7 @@ package com.google.gmail.test;
 
 import com.fasterxml.jackson.databind.DeserializationFeature;
 import com.fasterxml.jackson.databind.ObjectMapper;
+
 import org.junit.Assert;
 import org.junit.Ignore;
 import org.junit.Test;
@@ -31,37 +32,37 @@ import java.io.InputStream;
 import java.io.InputStreamReader;
 
 /**
- * Tests conversion of gplus inputs to Activity
+ * Tests conversion of gmail inputs to Activity
  */
 @Ignore("ignore until test resources are available.")
 public class GMailMessageSerDeTest {
 
-    private final static Logger LOGGER = 
LoggerFactory.getLogger(GMailMessageSerDeTest.class);
+  private final static Logger LOGGER = 
LoggerFactory.getLogger(GMailMessageSerDeTest.class);
 
-    private ObjectMapper mapper = new ObjectMapper();
+  private ObjectMapper mapper = new ObjectMapper();
 
-    @Ignore
-    @Test
-    public void Tests()
-    {
-        mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, 
Boolean.TRUE);
-        mapper.configure(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY, 
Boolean.TRUE);
-        
mapper.configure(DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT, 
Boolean.TRUE);
+  @Ignore
+  @Test
+  public void Tests()
+  {
+    mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, 
Boolean.TRUE);
+    mapper.configure(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY, 
Boolean.TRUE);
+    
mapper.configure(DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT, 
Boolean.TRUE);
 
-        InputStream is = 
GMailMessageSerDeTest.class.getResourceAsStream("/datasift_jsons.txt");
-        InputStreamReader isr = new InputStreamReader(is);
-        BufferedReader br = new BufferedReader(isr);
+    InputStream is = 
GMailMessageSerDeTest.class.getResourceAsStream("/datasift_jsons.txt");
+    InputStreamReader isr = new InputStreamReader(is);
+    BufferedReader br = new BufferedReader(isr);
 
-        try {
-            while (br.ready()) {
-                String line = br.readLine();
-                LOGGER.debug(line);
+    try {
+      while (br.ready()) {
+        String line = br.readLine();
+        LOGGER.debug(line);
 
-                // implement
-            }
-        } catch( Exception e ) {
-            e.printStackTrace();
-            Assert.fail();
-        }
+        // implement
+      }
+    } catch( Exception e ) {
+      e.printStackTrace();
+      Assert.fail();
     }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/processor/GooglePlusCommentProcessor.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/processor/GooglePlusCommentProcessor.java
 
b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/processor/GooglePlusCommentProcessor.java
index d926541..833fe23 100644
--- 
a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/processor/GooglePlusCommentProcessor.java
+++ 
b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/processor/GooglePlusCommentProcessor.java
@@ -19,71 +19,78 @@
 
 package com.google.gplus.processor;
 
-import com.google.api.services.plus.model.Comment;
-import com.google.gplus.serializer.util.GooglePlusActivityUtil;
-import java.util.ArrayList;
-import java.util.List;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsProcessor;
 import org.apache.streams.pojo.json.Activity;
+
+import com.google.api.services.plus.model.Comment;
+import com.google.gplus.serializer.util.GooglePlusActivityUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * GooglePlusCommentProcessor collects comments about a google plus activity.
+ */
 public class GooglePlusCommentProcessor implements StreamsProcessor {
-    private final static String STREAMS_ID = "GooglePlusCommentProcessor";
-    private final static Logger LOGGER = 
LoggerFactory.getLogger(GooglePlusCommentProcessor.class);
-    private GooglePlusActivityUtil googlePlusActivityUtil;
-    private int count;
-
-    @Override
-    public String getId() {
-        return STREAMS_ID;
-    }
 
-    @Override
-    public List<StreamsDatum> process(StreamsDatum entry) {
-        StreamsDatum result = null;
-
-        try {
-            Object item = entry.getDocument();
-            LOGGER.debug("{} processing {}", STREAMS_ID, item.getClass());
-
-            //Get G+ activity ID from our own activity ID
-            if (item instanceof Activity) {
-                Activity activity = (Activity) item;
-                String activityId = getGPlusID(activity.getId());
-
-                //Call Google Plus API to get list of comments for this 
activity ID
-                /* TODO: FILL ME OUT WITH THE API CALL **/
-                List<Comment> comments = new ArrayList<>();
-
-                googlePlusActivityUtil.updateActivity(comments, activity);
-                result = new StreamsDatum(activity);
-            }
-        } catch (Exception e) {
-            e.printStackTrace();
-            LOGGER.error("Exception while converting Comment to Activity: {}", 
e.getMessage());
-        }
-
-        if( result != null )
-            return com.google.common.collect.Lists.newArrayList(result);
-        else
-            return new ArrayList<>();
-    }
+  private static final String STREAMS_ID = "GooglePlusCommentProcessor";
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(GooglePlusCommentProcessor.class);
+  private GooglePlusActivityUtil googlePlusActivityUtil;
+  private int count;
 
-    @Override
-    public void prepare(Object configurationObject) {
-        googlePlusActivityUtil = new GooglePlusActivityUtil();
-        count = 0;
-    }
+  @Override
+  public String getId() {
+    return STREAMS_ID;
+  }
+
+  @Override
+  public List<StreamsDatum> process(StreamsDatum entry) {
+    StreamsDatum result = null;
+
+    try {
+      Object item = entry.getDocument();
+      LOGGER.debug("{} processing {}", STREAMS_ID, item.getClass());
 
-    @Override
-    public void cleanUp() {
+      //Get G+ activity ID from our own activity ID
+      if (item instanceof Activity) {
+        Activity activity = (Activity) item;
+        String activityId = getGPlusID(activity.getId());
 
+        //Call Google Plus API to get list of comments for this activity ID
+        /* TODO: FILL ME OUT WITH THE API CALL **/
+        List<Comment> comments = new ArrayList<>();
+
+        googlePlusActivityUtil.updateActivity(comments, activity);
+        result = new StreamsDatum(activity);
+      }
+    } catch (Exception ex) {
+      ex.printStackTrace();
+      LOGGER.error("Exception while converting Comment to Activity: {}", 
ex.getMessage());
     }
 
-    private String getGPlusID(String activityID) {
-        String[] activityParts = activityID.split(":");
-        return (activityParts.length > 0) ? activityParts[activityParts.length 
- 1] : "";
+    if ( result != null ) {
+      return com.google.common.collect.Lists.newArrayList(result);
+    } else {
+      return new ArrayList<>();
     }
+  }
+
+  @Override
+  public void prepare(Object configurationObject) {
+    googlePlusActivityUtil = new GooglePlusActivityUtil();
+    count = 0;
+  }
+
+  @Override
+  public void cleanUp() {
+
+  }
+
+  private String getGPlusID(String activityId) {
+    String[] activityParts = activityId.split(":");
+    return (activityParts.length > 0) ? activityParts[activityParts.length - 
1] : "";
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/processor/GooglePlusTypeConverter.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/processor/GooglePlusTypeConverter.java
 
b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/processor/GooglePlusTypeConverter.java
index d44a487..fe4d5da 100644
--- 
a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/processor/GooglePlusTypeConverter.java
+++ 
b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/processor/GooglePlusTypeConverter.java
@@ -18,7 +18,11 @@
 
 package com.google.gplus.processor;
 
-import com.fasterxml.jackson.databind.JsonNode;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProcessor;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.pojo.json.Activity;
+
 import com.fasterxml.jackson.databind.module.SimpleModule;
 import com.google.api.services.plus.model.Person;
 import com.google.common.collect.Lists;
@@ -26,112 +30,112 @@ import 
com.google.gplus.serializer.util.GPlusActivityDeserializer;
 import com.google.gplus.serializer.util.GPlusEventClassifier;
 import com.google.gplus.serializer.util.GPlusPersonDeserializer;
 import com.google.gplus.serializer.util.GooglePlusActivityUtil;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.core.StreamsProcessor;
-import org.apache.streams.jackson.StreamsJacksonMapper;
-import org.apache.streams.pojo.json.Activity;
-import org.codehaus.jackson.map.ObjectMapper;
-import org.codehaus.jackson.node.ObjectNode;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.List;
 import java.util.Queue;
 
+/**
+ * GooglePlusTypeConverter is a StreamsProcessor that converts gplus 
activities to activitystreams activities.
+ */
 public class GooglePlusTypeConverter implements StreamsProcessor {
-    public final static String STREAMS_ID = "GooglePlusTypeConverter";
 
-    private final static Logger LOGGER = 
LoggerFactory.getLogger(GooglePlusTypeConverter.class);
-    private StreamsJacksonMapper mapper;
-    private Queue<Person> inQueue;
-    private Queue<StreamsDatum> outQueue;
-    private GooglePlusActivityUtil googlePlusActivityUtil;
-    private int count = 0;
-
-    public GooglePlusTypeConverter() {}
-
-    public Queue<StreamsDatum> getProcessorOutputQueue() {
-        return outQueue;
+  public static final String STREAMS_ID = "GooglePlusTypeConverter";
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(GooglePlusTypeConverter.class);
+  private StreamsJacksonMapper mapper;
+  private Queue<Person> inQueue;
+  private Queue<StreamsDatum> outQueue;
+  private GooglePlusActivityUtil googlePlusActivityUtil;
+  private int count = 0;
+
+  public GooglePlusTypeConverter() {}
+
+  public Queue<StreamsDatum> getProcessorOutputQueue() {
+    return outQueue;
+  }
+
+  public void setProcessorInputQueue(Queue<Person> inputQueue) {
+    inQueue = inputQueue;
+  }
+
+  @Override
+  public String getId() {
+    return STREAMS_ID;
+  }
+
+  @Override
+  public List<StreamsDatum> process(StreamsDatum entry) {
+    StreamsDatum result = null;
+
+    try {
+      Object item = entry.getDocument();
+
+      LOGGER.debug("{} processing {}", STREAMS_ID, item.getClass());
+      Activity activity = null;
+
+      if (item instanceof String) {
+        item = deserializeItem(item);
+      }
+
+      if (item instanceof Person) {
+        activity = new Activity();
+        googlePlusActivityUtil.updateActivity((Person)item, activity);
+      } else if (item instanceof com.google.api.services.plus.model.Activity) {
+        activity = new Activity();
+        
googlePlusActivityUtil.updateActivity((com.google.api.services.plus.model.Activity)item,
 activity);
+      }
+
+      if (activity != null) {
+        result = new StreamsDatum(activity);
+        count++;
+      }
+    } catch (Exception ex) {
+      ex.printStackTrace();
+      LOGGER.error("Exception while converting Person to Activity: {}", 
ex.getMessage());
     }
 
-    public void setProcessorInputQueue(Queue<Person> inputQueue) {
-        inQueue = inputQueue;
+    if ( result != null ) {
+      return Lists.newArrayList(result);
+    } else {
+      return Lists.newArrayList();
     }
-
-    @Override
-    public String getId() {
-        return STREAMS_ID;
-    }
-
-    @Override
-    public List<StreamsDatum> process(StreamsDatum entry) {
-        StreamsDatum result = null;
-
-        try {
-            Object item = entry.getDocument();
-
-            LOGGER.debug("{} processing {}", STREAMS_ID, item.getClass());
-            Activity activity = null;
-
-            if(item instanceof String) {
-                item = deserializeItem(item);
-            }
-
-            if(item instanceof Person) {
-                activity = new Activity();
-                googlePlusActivityUtil.updateActivity((Person)item, activity);
-            } else if(item instanceof 
com.google.api.services.plus.model.Activity) {
-                activity = new Activity();
-                
googlePlusActivityUtil.updateActivity((com.google.api.services.plus.model.Activity)item,
 activity);
-            }
-
-            if(activity != null) {
-                result = new StreamsDatum(activity);
-                count++;
-            }
-        } catch (Exception e) {
-            e.printStackTrace();
-            LOGGER.error("Exception while converting Person to Activity: {}", 
e.getMessage());
-        }
-
-        if( result != null )
-            return Lists.newArrayList(result);
-        else
-            return Lists.newArrayList();
+  }
+
+  private Object deserializeItem(Object item) {
+    try {
+      Class klass = GPlusEventClassifier.detectClass((String) item);
+
+      if (klass.equals(Person.class)) {
+        item = mapper.readValue((String) item, Person.class);
+      } else if 
(klass.equals(com.google.api.services.plus.model.Activity.class)) {
+        item = mapper.readValue((String) item, 
com.google.api.services.plus.model.Activity.class);
+      }
+    } catch (Exception ex) {
+      LOGGER.error("Exception while trying to deserializeItem: {}", ex);
     }
 
-    private Object deserializeItem(Object item) {
-        try {
-            Class klass = GPlusEventClassifier.detectClass((String) item);
+    return item;
+  }
 
-            if (klass.equals(Person.class)) {
-                item = mapper.readValue((String) item, Person.class);
-            } else if 
(klass.equals(com.google.api.services.plus.model.Activity.class)) {
-                item = mapper.readValue((String) item, 
com.google.api.services.plus.model.Activity.class);
-            }
-        } catch (Exception e) {
-            LOGGER.error("Exception while trying to deserializeItem: {}", e);
-        }
+  @Override
+  public void prepare(Object configurationObject) {
+    googlePlusActivityUtil = new GooglePlusActivityUtil();
+    mapper = StreamsJacksonMapper.getInstance();
 
-        return item;
-    }
-
-    @Override
-    public void prepare(Object configurationObject) {
-        googlePlusActivityUtil = new GooglePlusActivityUtil();
-        mapper = StreamsJacksonMapper.getInstance();
+    SimpleModule simpleModule = new SimpleModule();
+    simpleModule.addDeserializer(Person.class, new GPlusPersonDeserializer());
+    mapper.registerModule(simpleModule);
 
-        SimpleModule simpleModule = new SimpleModule();
-        simpleModule.addDeserializer(Person.class, new 
GPlusPersonDeserializer());
-        mapper.registerModule(simpleModule);
+    simpleModule = new SimpleModule();
+    
simpleModule.addDeserializer(com.google.api.services.plus.model.Activity.class, 
new GPlusActivityDeserializer());
+    mapper.registerModule(simpleModule);
+  }
 
-        simpleModule = new SimpleModule();
-        
simpleModule.addDeserializer(com.google.api.services.plus.model.Activity.class, 
new GPlusActivityDeserializer());
-        mapper.registerModule(simpleModule);
-    }
-
-    @Override
-    public void cleanUp() {
-        //No-op
-    }
+  @Override
+  public void cleanUp() {
+    //No-op
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/AbstractGPlusProvider.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/AbstractGPlusProvider.java
 
b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/AbstractGPlusProvider.java
index 734e711..e08c571 100644
--- 
a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/AbstractGPlusProvider.java
+++ 
b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/AbstractGPlusProvider.java
@@ -18,6 +18,17 @@
 
 package com.google.gplus.provider;
 
+import org.apache.streams.config.ComponentConfigurator;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProvider;
+import org.apache.streams.core.StreamsResultSet;
+import org.apache.streams.google.gplus.GPlusConfiguration;
+import org.apache.streams.google.gplus.configuration.UserInfo;
+import org.apache.streams.util.ComponentUtils;
+import org.apache.streams.util.api.requests.backoff.BackOffStrategy;
+import 
org.apache.streams.util.api.requests.backoff.impl.ExponentialBackOffStrategy;
+
 import com.google.api.client.googleapis.auth.oauth2.GoogleClientSecrets;
 import com.google.api.client.googleapis.auth.oauth2.GoogleCredential;
 import com.google.api.client.http.HttpTransport;
@@ -31,16 +42,7 @@ import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.gson.Gson;
-import org.apache.streams.config.ComponentConfigurator;
-import org.apache.streams.config.StreamsConfigurator;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.core.StreamsProvider;
-import org.apache.streams.core.StreamsResultSet;
-import org.apache.streams.google.gplus.GPlusConfiguration;
-import org.apache.streams.google.gplus.configuration.UserInfo;
-import org.apache.streams.util.ComponentUtils;
-import org.apache.streams.util.api.requests.backoff.BackOffStrategy;
-import 
org.apache.streams.util.api.requests.backoff.impl.ExponentialBackOffStrategy;
+
 import org.joda.time.DateTime;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -61,198 +63,202 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
- * Provider that creates a GPlus client and will run task that queue data to 
an outing queue
+ * Provider that creates a GPlus client and will run task that queue data to 
an outing queue.
  */
 public abstract class AbstractGPlusProvider implements StreamsProvider {
 
-    public final static String STREAMS_ID = "AbstractGPlusProvider";
-
-    private final static Logger LOGGER = 
LoggerFactory.getLogger(AbstractGPlusProvider.class);
-    private final static Set<String> SCOPE = new HashSet<String>() {{ add("https://www.googleapis.com/auth/plus.login");}};
-    private final static int MAX_BATCH_SIZE = 1000;
+  public static final String STREAMS_ID = "AbstractGPlusProvider";
 
-    private static final HttpTransport TRANSPORT = new NetHttpTransport();
-    private static final JacksonFactory JSON_FACTORY = new JacksonFactory();
-    private static final Gson GSON = new Gson();
-
-    private GPlusConfiguration config;
-
-    List<ListenableFuture<Object>> futures = new ArrayList<>();
-
-    private ListeningExecutorService executor;
-
-    private BlockingQueue<StreamsDatum> datumQueue;
-    private BlockingQueue<Runnable> runnables;
-    private AtomicBoolean isComplete;
-    private boolean previousPullWasEmpty;
-
-    protected GoogleClientSecrets clientSecrets;
-    protected GoogleCredential credential;
-    protected Plus plus;
-
-    public AbstractGPlusProvider() {
-        this.config = new ComponentConfigurator<>(GPlusConfiguration.class)
-          
.detectConfiguration(StreamsConfigurator.getConfig().getConfig("gplus"));
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(AbstractGPlusProvider.class);
+  private static final Set<String> SCOPE = new HashSet<String>() {
+    {
+      add("https://www.googleapis.com/auth/plus.login");
     }
+  };
+  private static final int MAX_BATCH_SIZE = 1000;
 
-    public AbstractGPlusProvider(GPlusConfiguration config) {
-        this.config = config;
-    }
+  private static final HttpTransport TRANSPORT = new NetHttpTransport();
+  private static final JacksonFactory JSON_FACTORY = new JacksonFactory();
+  private static final Gson GSON = new Gson();
 
-    @Override
-    public void prepare(Object configurationObject) {
-
-        Preconditions.checkNotNull(config.getOauth().getPathToP12KeyFile());
-        Preconditions.checkNotNull(config.getOauth().getAppName());
-        
Preconditions.checkNotNull(config.getOauth().getServiceAccountEmailAddress());
-
-        try {
-            this.plus = createPlusClient();
-        } catch (IOException|GeneralSecurityException e) {
-            LOGGER.error("Failed to created oauth for GPlus : {}", e);
-            throw new RuntimeException(e);
-        }
-        // GPlus rate limits you to 5 calls per second, so there is not a need 
to execute more than one
-        // collector unless you have multiple oauth tokens
-        //TODO make this configurable based on the number of oauth tokens
-        this.executor = 
MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1));
-        this.datumQueue = new LinkedBlockingQueue<>(1000);
-        this.isComplete = new AtomicBoolean(false);
-        this.previousPullWasEmpty = false;
-    }
+  private GPlusConfiguration config;
 
-    @Override
-    public void startStream() {
-
-        BackOffStrategy backOffStrategy = new ExponentialBackOffStrategy(2);
-        for(UserInfo user : this.config.getGooglePlusUsers()) {
-            if(this.config.getDefaultAfterDate() != null && 
user.getAfterDate() == null) {
-                user.setAfterDate(this.config.getDefaultAfterDate());
-            }
-            if(this.config.getDefaultBeforeDate() != null && 
user.getBeforeDate() == null) {
-                user.setBeforeDate(this.config.getDefaultBeforeDate());
-            }
-            this.executor.submit(getDataCollector(backOffStrategy, 
this.datumQueue, this.plus, user));
-        }
-        this.executor.shutdown();
-    }
+  List<ListenableFuture<Object>> futures = new ArrayList<>();
 
-    protected abstract Runnable getDataCollector(BackOffStrategy strategy, 
BlockingQueue<StreamsDatum> queue, Plus plus, UserInfo userInfo);
-
-    @Override
-    public String getId() {
-        return STREAMS_ID;
-    }
+  private ListeningExecutorService executor;
 
-    @Override
-    public StreamsResultSet readCurrent() {
-        BlockingQueue<StreamsDatum> batch = new LinkedBlockingQueue<>();
-        int batchCount = 0;
-        while(!this.datumQueue.isEmpty() && batchCount < MAX_BATCH_SIZE) {
-            StreamsDatum datum = 
ComponentUtils.pollWhileNotEmpty(this.datumQueue);
-            if(datum != null) {
-                ++batchCount;
-                ComponentUtils.offerUntilSuccess(datum, batch);
-            }
-        }
-        boolean pullIsEmpty = batch.isEmpty() && this.datumQueue.isEmpty() 
&&this.executor.isTerminated();
-        this.isComplete.set(this.previousPullWasEmpty && pullIsEmpty);
-        this.previousPullWasEmpty = pullIsEmpty;
-        return new StreamsResultSet(batch);
-    }
+  private BlockingQueue<StreamsDatum> datumQueue;
+  private BlockingQueue<Runnable> runnables;
+  private AtomicBoolean isComplete;
+  private boolean previousPullWasEmpty;
 
-    @Override
-    public StreamsResultSet readNew(BigInteger sequence) {
-        return null;
-    }
+  protected GoogleClientSecrets clientSecrets;
+  protected GoogleCredential credential;
+  protected Plus plus;
 
-    @Override
-    public StreamsResultSet readRange(DateTime start, DateTime end) {
-        return null;
-    }
+  public AbstractGPlusProvider() {
+    this.config = new ComponentConfigurator<>(GPlusConfiguration.class)
+        
.detectConfiguration(StreamsConfigurator.getConfig().getConfig("gplus"));
+  }
 
-    @VisibleForTesting
-    protected Plus createPlusClient() throws IOException, 
GeneralSecurityException {
-        credential = new GoogleCredential.Builder()
-                .setJsonFactory(JSON_FACTORY)
-                .setTransport(TRANSPORT)
-                .setServiceAccountScopes(SCOPE)
-                
.setServiceAccountId(this.config.getOauth().getServiceAccountEmailAddress())
-                .setServiceAccountPrivateKeyFromP12File(new 
File(this.config.getOauth().getPathToP12KeyFile()))
-                .build();
-        return new Plus.Builder(TRANSPORT,JSON_FACTORY, 
credential).setApplicationName(this.config.getOauth().getAppName()).build();
-    }
+  public AbstractGPlusProvider(GPlusConfiguration config) {
+    this.config = config;
+  }
 
-    @Override
-    public void cleanUp() {
-        ComponentUtils.shutdownExecutor(this.executor, 10, 10);
-        this.executor = null;
-    }
+  @Override
+  public void prepare(Object configurationObject) {
 
-    public GPlusConfiguration getConfig() {
-        return config;
-    }
+    Preconditions.checkNotNull(config.getOauth().getPathToP12KeyFile());
+    Preconditions.checkNotNull(config.getOauth().getAppName());
+    
Preconditions.checkNotNull(config.getOauth().getServiceAccountEmailAddress());
 
-    public void setConfig(GPlusConfiguration config) {
-        this.config = config;
+    try {
+      this.plus = createPlusClient();
+    } catch (IOException | GeneralSecurityException ex) {
+      LOGGER.error("Failed to created oauth for GPlus : {}", ex);
+      throw new RuntimeException(ex);
     }
-
-    /**
-     * Set and overwrite the default before date that was read from the 
configuration file.
-     * @param defaultBeforeDate
-     */
-    public void setDefaultBeforeDate(DateTime defaultBeforeDate) {
-        this.config.setDefaultBeforeDate(defaultBeforeDate);
+    // GPlus rate limits you to 5 calls per second, so there is not a need to 
execute more than one
+    // collector unless you have multiple oauth tokens
+    //TODO make this configurable based on the number of oauth tokens
+    this.executor = 
MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1));
+    this.datumQueue = new LinkedBlockingQueue<>(1000);
+    this.isComplete = new AtomicBoolean(false);
+    this.previousPullWasEmpty = false;
+  }
+
+  @Override
+  public void startStream() {
+
+    BackOffStrategy backOffStrategy = new ExponentialBackOffStrategy(2);
+    for (UserInfo user : this.config.getGooglePlusUsers()) {
+      if (this.config.getDefaultAfterDate() != null && user.getAfterDate() == 
null) {
+        user.setAfterDate(this.config.getDefaultAfterDate());
+      }
+      if (this.config.getDefaultBeforeDate() != null && user.getBeforeDate() 
== null) {
+        user.setBeforeDate(this.config.getDefaultBeforeDate());
+      }
+      this.executor.submit(getDataCollector(backOffStrategy, this.datumQueue, 
this.plus, user));
     }
-
-    /**
-     * Set and overwrite the default after date that was read from teh 
configuration file.
-     * @param defaultAfterDate
-     */
-    public void setDefaultAfterDate(DateTime defaultAfterDate) {
-        this.config.setDefaultAfterDate(defaultAfterDate);
+    this.executor.shutdown();
+  }
+
+  protected abstract Runnable getDataCollector(BackOffStrategy strategy, 
BlockingQueue<StreamsDatum> queue, Plus plus, UserInfo userInfo);
+
+  @Override
+  public String getId() {
+    return STREAMS_ID;
+  }
+
+  @Override
+  public StreamsResultSet readCurrent() {
+    BlockingQueue<StreamsDatum> batch = new LinkedBlockingQueue<>();
+    int batchCount = 0;
+    while (!this.datumQueue.isEmpty() && batchCount < MAX_BATCH_SIZE) {
+      StreamsDatum datum = ComponentUtils.pollWhileNotEmpty(this.datumQueue);
+      if (datum != null) {
+        ++batchCount;
+        ComponentUtils.offerUntilSuccess(datum, batch);
+      }
     }
-
-    /**
-     * Sets and overwrite the user info from the configuaration file.  Uses 
the defaults before and after dates.
-     * @param userIds
-     */
-    public void setUserInfoWithDefaultDates(Set<String> userIds) {
-        List<UserInfo> gPlusUsers = new LinkedList<>();
-        for(String userId : userIds) {
-            UserInfo user = new UserInfo();
-            user.setUserId(userId);
-            user.setAfterDate(this.config.getDefaultAfterDate());
-            user.setBeforeDate(this.config.getDefaultBeforeDate());
-            gPlusUsers.add(user);
-        }
-        this.config.setGooglePlusUsers(gPlusUsers);
+    boolean pullIsEmpty = batch.isEmpty() && this.datumQueue.isEmpty() && 
this.executor.isTerminated();
+    this.isComplete.set(this.previousPullWasEmpty && pullIsEmpty);
+    this.previousPullWasEmpty = pullIsEmpty;
+    return new StreamsResultSet(batch);
+  }
+
+  @Override
+  public StreamsResultSet readNew(BigInteger sequence) {
+    return null;
+  }
+
+  @Override
+  public StreamsResultSet readRange(DateTime start, DateTime end) {
+    return null;
+  }
+
+  @VisibleForTesting
+  protected Plus createPlusClient() throws IOException, 
GeneralSecurityException {
+    credential = new GoogleCredential.Builder()
+        .setJsonFactory(JSON_FACTORY)
+        .setTransport(TRANSPORT)
+        .setServiceAccountScopes(SCOPE)
+        
.setServiceAccountId(this.config.getOauth().getServiceAccountEmailAddress())
+        .setServiceAccountPrivateKeyFromP12File(new 
File(this.config.getOauth().getPathToP12KeyFile()))
+        .build();
+    return new Plus.Builder(TRANSPORT,JSON_FACTORY, 
credential).setApplicationName(this.config.getOauth().getAppName()).build();
+  }
+
+  @Override
+  public void cleanUp() {
+    ComponentUtils.shutdownExecutor(this.executor, 10, 10);
+    this.executor = null;
+  }
+
+  public GPlusConfiguration getConfig() {
+    return config;
+  }
+
+  public void setConfig(GPlusConfiguration config) {
+    this.config = config;
+  }
+
+  /**
+   * Set and overwrite the default before date that was read from the 
configuration file.
+   * @param defaultBeforeDate defaultBeforeDate
+   */
+  public void setDefaultBeforeDate(DateTime defaultBeforeDate) {
+    this.config.setDefaultBeforeDate(defaultBeforeDate);
+  }
+
+  /**
+   * Set and overwrite the default after date that was read from the configuration file.
+   * @param defaultAfterDate defaultAfterDate
+   */
+  public void setDefaultAfterDate(DateTime defaultAfterDate) {
+    this.config.setDefaultAfterDate(defaultAfterDate);
+  }
+
+  /**
+   * Sets and overwrite the user info from the configuration file.  Uses the defaults before and after dates.
+   * @param userIds userIds
+   */
+  public void setUserInfoWithDefaultDates(Set<String> userIds) {
+    List<UserInfo> gplusUsers = new LinkedList<>();
+    for (String userId : userIds) {
+      UserInfo user = new UserInfo();
+      user.setUserId(userId);
+      user.setAfterDate(this.config.getDefaultAfterDate());
+      user.setBeforeDate(this.config.getDefaultBeforeDate());
+      gplusUsers.add(user);
     }
-
-    /**
-     * Set and overwrite user into from teh configuration file. Only sets 
after dater.
-     * @param usersAndAfterDates
-     */
-    public void setUserInfoWithAfterDate(Map<String, DateTime> 
usersAndAfterDates) {
-        List<UserInfo> gPlusUsers = new LinkedList<>();
-        for(String userId : usersAndAfterDates.keySet()) {
-            UserInfo user = new UserInfo();
-            user.setUserId(userId);
-            user.setAfterDate(usersAndAfterDates.get(userId));
-            gPlusUsers.add(user);
-        }
-        this.config.setGooglePlusUsers(gPlusUsers);
+    this.config.setGooglePlusUsers(gplusUsers);
+  }
+
+  /**
+   * Set and overwrite user into from the configuration file. Only sets after 
date.
+   * @param usersAndAfterDates usersAndAfterDates
+   */
+  public void setUserInfoWithAfterDate(Map<String, DateTime> 
usersAndAfterDates) {
+    List<UserInfo> gplusUsers = new LinkedList<>();
+    for (String userId : usersAndAfterDates.keySet()) {
+      UserInfo user = new UserInfo();
+      user.setUserId(userId);
+      user.setAfterDate(usersAndAfterDates.get(userId));
+      gplusUsers.add(user);
     }
-
-    @Override
-    public boolean isRunning() {
-       if (datumQueue.isEmpty() && executor.isTerminated() && 
Futures.allAsList(futures).isDone()) {
-            LOGGER.info("Completed");
-            isComplete.set(true);
-           LOGGER.info("Exiting");
-       }
-       return !isComplete.get();
+    this.config.setGooglePlusUsers(gplusUsers);
+  }
+
+  @Override
+  public boolean isRunning() {
+    if (datumQueue.isEmpty() && executor.isTerminated() && 
Futures.allAsList(futures).isDone()) {
+      LOGGER.info("Completed");
+      isComplete.set(true);
+      LOGGER.info("Exiting");
     }
+    return !isComplete.get();
+  }
 
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusActivitySerializer.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusActivitySerializer.java
 
b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusActivitySerializer.java
index 4991e94..20f5002 100644
--- 
a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusActivitySerializer.java
+++ 
b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusActivitySerializer.java
@@ -18,50 +18,54 @@
 
 package com.google.gplus.provider;
 
-import com.google.gplus.serializer.util.GooglePlusActivityUtil;
-import org.apache.commons.lang.NotImplementedException;
 import org.apache.streams.data.ActivitySerializer;
 import org.apache.streams.pojo.json.Activity;
+
+import com.google.gplus.serializer.util.GooglePlusActivityUtil;
+
+import org.apache.commons.lang.NotImplementedException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.List;
 
-
+/**
+ * GPlusActivitySerializer converts gplus activities to as1 activities.
+ */
 public class GPlusActivitySerializer implements 
ActivitySerializer<com.google.api.services.plus.model.Activity> {
 
-    private static final Logger LOGGER = 
LoggerFactory.getLogger(GPlusActivitySerializer.class);
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(GPlusActivitySerializer.class);
 
-    AbstractGPlusProvider provider;
+  AbstractGPlusProvider provider;
 
-    public GPlusActivitySerializer(AbstractGPlusProvider provider) {
+  public GPlusActivitySerializer(AbstractGPlusProvider provider) {
 
-        this.provider = provider;
-    }
+    this.provider = provider;
+  }
 
-    public GPlusActivitySerializer() {
-    }
+  public GPlusActivitySerializer() {
+  }
 
-    @Override
-    public String serializationFormat() {
-        return "gplus.v1";
-    }
+  @Override
+  public String serializationFormat() {
+    return "gplus.v1";
+  }
 
-    @Override
-    public com.google.api.services.plus.model.Activity serialize(Activity 
deserialized) {
-        throw new NotImplementedException("Not currently implemented");
-    }
+  @Override
+  public com.google.api.services.plus.model.Activity serialize(Activity 
deserialized) {
+    throw new NotImplementedException("Not currently implemented");
+  }
 
-    @Override
-    public Activity deserialize(com.google.api.services.plus.model.Activity 
gplusActivity) {
-        Activity activity = new Activity();
+  @Override
+  public Activity deserialize(com.google.api.services.plus.model.Activity 
gplusActivity) {
+    Activity activity = new Activity();
 
-        GooglePlusActivityUtil.updateActivity(gplusActivity, activity);
-        return activity;
-    }
+    GooglePlusActivityUtil.updateActivity(gplusActivity, activity);
+    return activity;
+  }
 
-    @Override
-    public List<Activity> 
deserializeAll(List<com.google.api.services.plus.model.Activity> 
serializedList) {
-        throw new NotImplementedException("Not currently implemented");
-    }
+  @Override
+  public List<Activity> 
deserializeAll(List<com.google.api.services.plus.model.Activity> 
serializedList) {
+    throw new NotImplementedException("Not currently implemented");
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusDataCollector.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusDataCollector.java
 
b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusDataCollector.java
index 5be2f9c..edbc663 100644
--- 
a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusDataCollector.java
+++ 
b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusDataCollector.java
@@ -18,51 +18,52 @@
 
 package com.google.gplus.provider;
 
-import com.google.api.client.googleapis.json.GoogleJsonResponseException;
 import org.apache.streams.util.api.requests.backoff.BackOffException;
 import org.apache.streams.util.api.requests.backoff.BackOffStrategy;
+
+import com.google.api.client.googleapis.json.GoogleJsonResponseException;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- *
+ * GPlusDataCollector collects GPlus Data on behalf of providers.
  */
 public abstract class GPlusDataCollector implements Runnable {
 
-    private static final Logger LOGGER = 
LoggerFactory.getLogger(GPlusDataCollector.class);
-
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(GPlusDataCollector.class);
 
-    /**
-     * Looks at the status code of the expception.  If the code indicates that 
the request should be retried,
-     * it executes the back off strategy and returns true.
-     * @param gjre
-     * @param backOff
-     * @return returns true if the error code of the exception indicates the 
request should be retried.
-     */
-    public boolean backoffAndIdentifyIfRetry(GoogleJsonResponseException gjre, 
BackOffStrategy backOff) throws BackOffException {
-        boolean tryAgain = false;
-        switch (gjre.getStatusCode()) {
-            case 400 :
-                LOGGER.warn("Bad Request  : {}",  gjre);
-                break;
-            case 401 :
-                LOGGER.warn("Invalid Credentials : {}", gjre);
-            case 403 :
-                LOGGER.warn("Possible rate limit exception. Retrying. : {}", 
gjre.getMessage());
-                backOff.backOff();
-                tryAgain = true;
-                break;
-            case 503 :
-                LOGGER.warn("Google Backend Service Error : {}", gjre);
-                break;
-            default:
-                LOGGER.warn("Google Service returned error : {}", gjre);
-                tryAgain = true;
-                backOff.backOff();
-                break;
-        }
-        return tryAgain;
+  /**
+   * Looks at the status code of the exception.  If the code indicates that 
the request should be retried,
+   * it executes the back off strategy and returns true.
+   * @param gjre GoogleJsonResponseException
+   * @param backOff BackOffStrategy
+   * @return returns true if the error code of the exception indicates the 
request should be retried.
+   */
+  public boolean backoffAndIdentifyIfRetry(GoogleJsonResponseException gjre, 
BackOffStrategy backOff) throws BackOffException {
+    boolean tryAgain = false;
+    switch (gjre.getStatusCode()) {
+      case 400 :
+        LOGGER.warn("Bad Request  : {}",  gjre);
+        break;
+      case 401 :
+        LOGGER.warn("Invalid Credentials : {}", gjre);
+        break;
+      case 403 :
+        LOGGER.warn("Possible rate limit exception. Retrying. : {}", 
gjre.getMessage());
+        backOff.backOff();
+        tryAgain = true;
+        break;
+      case 503 :
+        LOGGER.warn("Google Backend Service Error : {}", gjre);
+        break;
+      default:
+        LOGGER.warn("Google Service returned error : {}", gjre);
+        tryAgain = true;
+        backOff.backOff();
+        break;
     }
-
-
+    return tryAgain;
+  }
+  
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusEventProcessor.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusEventProcessor.java
 
b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusEventProcessor.java
deleted file mode 100644
index 1f1ee2f..0000000
--- 
a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusEventProcessor.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package com.google.gplus.provider;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.pojo.json.Activity;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Objects;
-import java.util.Queue;
-import java.util.Random;
-import java.util.concurrent.BlockingQueue;
-
-public class GPlusEventProcessor implements Runnable {
-
-    private final static Logger LOGGER = 
LoggerFactory.getLogger(GPlusEventProcessor.class);
-
-    private ObjectMapper mapper = new ObjectMapper();
-
-    private BlockingQueue<String> inQueue;
-    private Queue<StreamsDatum> outQueue;
-
-    private Class outClass;
-
-    private GPlusActivitySerializer gPlusActivitySerializer = new 
GPlusActivitySerializer();
-
-    private final static String TERMINATE = "TERMINATE";
-
-    public GPlusEventProcessor(BlockingQueue<String> inQueue, 
Queue<StreamsDatum> outQueue, Class inClass, Class outClass) {
-        this.inQueue = inQueue;
-        this.outQueue = outQueue;
-        this.outClass = outClass;
-    }
-
-    public GPlusEventProcessor(BlockingQueue<String> inQueue, 
Queue<StreamsDatum> outQueue, Class outClass) {
-        this.inQueue = inQueue;
-        this.outQueue = outQueue;
-        this.outClass = outClass;
-    }
-
-    @Override
-    public void run() {
-
-        while(true) {
-            try {
-                String item = inQueue.take();
-                Thread.sleep(new Random().nextInt(100));
-                if(Objects.equals(item, TERMINATE)) {
-                    LOGGER.info("Terminating!");
-                    break;
-                }
-
-                // first check for valid json
-                ObjectNode node = (ObjectNode)mapper.readTree(item);
-
-                // if the target is string, just pass-through
-                if( String.class.equals(outClass))
-                    outQueue.offer(new StreamsDatum(item));
-                else {
-                    // convert to desired format
-                    com.google.api.services.plus.model.Activity gplusActivity 
= mapper.readValue(item, com.google.api.services.plus.model.Activity.class);
-
-                    Activity streamsActivity = 
gPlusActivitySerializer.deserialize(gplusActivity);
-
-                    outQueue.offer(new StreamsDatum(streamsActivity));
-                }
-
-            } catch (Exception e) {
-                e.printStackTrace();
-            }
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusUserActivityCollector.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusUserActivityCollector.java
 
b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusUserActivityCollector.java
index f475e5d..5585bfc 100644
--- 
a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusUserActivityCollector.java
+++ 
b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusUserActivityCollector.java
@@ -18,6 +18,11 @@
 
 package com.google.gplus.provider;
 
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.google.gplus.configuration.UserInfo;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.util.api.requests.backoff.BackOffStrategy;
+
 import com.fasterxml.jackson.core.JsonGenerationException;
 import com.fasterxml.jackson.core.JsonGenerator;
 import com.fasterxml.jackson.databind.DeserializationFeature;
@@ -30,10 +35,7 @@ import com.google.api.services.plus.Plus;
 import com.google.api.services.plus.model.Activity;
 import com.google.api.services.plus.model.ActivityFeed;
 import com.google.gplus.serializer.util.GPlusActivityDeserializer;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.google.gplus.configuration.UserInfo;
-import org.apache.streams.jackson.StreamsJacksonMapper;
-import org.apache.streams.util.api.requests.backoff.BackOffStrategy;
+
 import org.joda.time.DateTime;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -46,90 +48,107 @@ import java.util.concurrent.BlockingQueue;
  */
 public class GPlusUserActivityCollector extends GPlusDataCollector {
 
-    /**
-     * Key for all public activities
-     * https://developers.google.com/+/api/latest/activities/list
-     */
-    private static final String PUBLIC_COLLECTION = "public";
-    /**
-     * Max results allowed per request
-     * https://developers.google.com/+/api/latest/activities/list
-     */
-    private static final long MAX_RESULTS = 100;
-    private static final int MAX_ATTEMPTS = 5;
-    private static final Logger LOGGER = 
LoggerFactory.getLogger(GPlusUserActivityCollector.class);
-    private static final ObjectMapper MAPPER = 
StreamsJacksonMapper.getInstance();
+  /**
+   * Key for all public activities
+   * https://developers.google.com/+/api/latest/activities/list
+   */
+  private static final String PUBLIC_COLLECTION = "public";
+  /**
+   * Max results allowed per request
+   * https://developers.google.com/+/api/latest/activities/list
+   */
+  private static final long MAX_RESULTS = 100;
+  private static final int MAX_ATTEMPTS = 5;
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(GPlusUserActivityCollector.class);
+  private static final ObjectMapper MAPPER = 
StreamsJacksonMapper.getInstance();
 
-    static { //set up mapper for Google Activity Object
-        SimpleModule simpleModule = new SimpleModule();
-        simpleModule.addDeserializer(Activity.class, new 
GPlusActivityDeserializer());
-        simpleModule.addSerializer(com.google.api.client.util.DateTime.class, 
new 
StdSerializer<com.google.api.client.util.DateTime>(com.google.api.client.util.DateTime.class)
 {
-            @Override
-            public void serialize(com.google.api.client.util.DateTime 
dateTime, JsonGenerator jsonGenerator, SerializerProvider serializerProvider) 
throws IOException, JsonGenerationException {
-                jsonGenerator.writeString(dateTime.toStringRfc3339());
-            }
+  static { //set up mapper for Google Activity Object
+    SimpleModule simpleModule = new SimpleModule();
+    simpleModule.addDeserializer(Activity.class, new 
GPlusActivityDeserializer());
+    simpleModule.addSerializer(
+        com.google.api.client.util.DateTime.class,
+        new 
StdSerializer<com.google.api.client.util.DateTime>(com.google.api.client.util.DateTime.class)
 {
+          @Override
+          public void serialize(
+              com.google.api.client.util.DateTime dateTime,
+              JsonGenerator jsonGenerator,
+              SerializerProvider serializerProvider)
+              throws IOException {
+            jsonGenerator.writeString(dateTime.toStringRfc3339());
+          }
         });
-        MAPPER.registerModule(simpleModule);
-        MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, 
false);
-    }
+    MAPPER.registerModule(simpleModule);
+    MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+  }
 
-    private BlockingQueue<StreamsDatum> datumQueue;
-    private BackOffStrategy backOff;
-    private Plus gPlus;
-    private UserInfo userInfo;
+  private BlockingQueue<StreamsDatum> datumQueue;
+  private BackOffStrategy backOff;
+  private Plus plus;
+  private UserInfo userInfo;
 
-    public GPlusUserActivityCollector(Plus gPlus, BlockingQueue<StreamsDatum> 
datumQueue, BackOffStrategy backOff, UserInfo userInfo) {
-        this.gPlus = gPlus;
-        this.datumQueue = datumQueue;
-        this.backOff = backOff;
-        this.userInfo = userInfo;
-    }
+  /**
+   * GPlusUserActivityCollector constructor.
+   * @param plus Plus
+   * @param datumQueue BlockingQueue<StreamsDatum>
+   * @param backOff BackOffStrategy
+   * @param userInfo UserInfo
+   */
+  public GPlusUserActivityCollector(Plus plus, BlockingQueue<StreamsDatum> 
datumQueue, BackOffStrategy backOff, UserInfo userInfo) {
+    this.plus = plus;
+    this.datumQueue = datumQueue;
+    this.backOff = backOff;
+    this.userInfo = userInfo;
+  }
 
-    @Override
-    public void run() {
-        collectActivityData();
-    }
+  @Override
+  public void run() {
+    collectActivityData();
+  }
 
-    protected void collectActivityData() {
+  protected void collectActivityData() {
+    try {
+      ActivityFeed feed = null;
+      boolean tryAgain = false;
+      int attempt = 0;
+      DateTime afterDate = userInfo.getAfterDate();
+      DateTime beforeDate = userInfo.getBeforeDate();
+      do {
         try {
-            ActivityFeed feed = null;
-            boolean tryAgain = false;
-            int attempt = 0;
-            DateTime afterDate = userInfo.getAfterDate();
-            DateTime beforeDate = userInfo.getBeforeDate();
-            do {
-                try {
-                    if(feed == null) {
-                        feed = 
this.gPlus.activities().list(this.userInfo.getUserId(), 
PUBLIC_COLLECTION).setMaxResults(MAX_RESULTS).execute();
-                    } else {
-                        feed = 
this.gPlus.activities().list(this.userInfo.getUserId(), 
PUBLIC_COLLECTION).setMaxResults(MAX_RESULTS).setPageToken(feed.getNextPageToken()).execute();
-                    }
-                    this.backOff.reset(); //successful pull reset api.
-                    for(com.google.api.services.plus.model.Activity activity : 
feed.getItems()) {
-                        DateTime published = new 
DateTime(activity.getPublished().getValue());
-                        if(        (afterDate == null && beforeDate == null)
-                                || (beforeDate == null && 
afterDate.isBefore(published))
-                                || (afterDate == null && 
beforeDate.isAfter(published))
-                                || ((afterDate != null && beforeDate != null) 
&& (afterDate.isBefore(published) && beforeDate.isAfter(published)))) {
-                            String json = MAPPER.writeValueAsString(activity);
-                            this.datumQueue.put(new StreamsDatum(json, 
activity.getId()));
-                        } else if(afterDate != null && 
afterDate.isAfter(published)) {
-                            feed.setNextPageToken(null); // do not fetch next 
page
-                            break;
-                        }
-                    }
-                } catch (GoogleJsonResponseException gjre) {
-                    tryAgain = backoffAndIdentifyIfRetry(gjre, this.backOff);
-                    ++attempt;
-                }
-            } while((tryAgain || (feed != null && feed.getNextPageToken() != 
null)) && attempt < MAX_ATTEMPTS);
-        } catch (Throwable t) {
-            if(t instanceof InterruptedException) {
-                Thread.currentThread().interrupt();
+          if (feed == null) {
+            feed = this.plus.activities().list(this.userInfo.getUserId(), 
PUBLIC_COLLECTION)
+                .setMaxResults(MAX_RESULTS).execute();
+          } else {
+            feed = this.plus.activities().list(this.userInfo.getUserId(), 
PUBLIC_COLLECTION)
+                .setMaxResults(MAX_RESULTS)
+                .setPageToken(feed.getNextPageToken()).execute();
+          }
+          this.backOff.reset(); //successful pull reset api.
+          for (com.google.api.services.plus.model.Activity activity : 
feed.getItems()) {
+            DateTime published = new 
DateTime(activity.getPublished().getValue());
+            if ((afterDate == null && beforeDate == null)
+                || (beforeDate == null && afterDate.isBefore(published))
+                || (afterDate == null && beforeDate.isAfter(published))
+                || ((afterDate != null && beforeDate != null) && 
(afterDate.isBefore(published) && beforeDate.isAfter(published)))) {
+              String json = MAPPER.writeValueAsString(activity);
+              this.datumQueue.put(new StreamsDatum(json, activity.getId()));
+            } else if (afterDate != null && afterDate.isAfter(published)) {
+              feed.setNextPageToken(null); // do not fetch next page
+              break;
             }
-            t.printStackTrace();
-            LOGGER.warn("Unable to pull Activities for user={} : 
{}",this.userInfo.getUserId(), t);
+          }
+        } catch (GoogleJsonResponseException gjre) {
+          tryAgain = backoffAndIdentifyIfRetry(gjre, this.backOff);
+          ++attempt;
         }
+      }
+      while ((tryAgain || (feed != null && feed.getNextPageToken() != null)) 
&& attempt < MAX_ATTEMPTS);
+    } catch (Throwable th) {
+      if (th instanceof InterruptedException) {
+        Thread.currentThread().interrupt();
+      }
+      th.printStackTrace();
+      LOGGER.warn("Unable to pull Activities for user={} : 
{}",this.userInfo.getUserId(), th);
     }
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusUserActivityProvider.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusUserActivityProvider.java
 
b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusUserActivityProvider.java
index e6b2223..97b08fd 100644
--- 
a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusUserActivityProvider.java
+++ 
b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusUserActivityProvider.java
@@ -18,13 +18,6 @@
 
 package com.google.gplus.provider;
 
-import com.google.api.services.plus.Plus;
-import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.Uninterruptibles;
-import com.google.gson.Gson;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import com.typesafe.config.ConfigParseOptions;
 import org.apache.streams.config.ComponentConfigurator;
 import org.apache.streams.config.StreamsConfiguration;
 import org.apache.streams.config.StreamsConfigurator;
@@ -33,6 +26,14 @@ import org.apache.streams.google.gplus.GPlusConfiguration;
 import org.apache.streams.google.gplus.configuration.UserInfo;
 import org.apache.streams.util.api.requests.backoff.BackOffStrategy;
 
+import com.google.api.services.plus.Plus;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.Uninterruptibles;
+import com.google.gson.Gson;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigParseOptions;
+
 import java.io.BufferedOutputStream;
 import java.io.File;
 import java.io.FileOutputStream;
@@ -43,76 +44,88 @@ import java.util.concurrent.TimeUnit;
 /**
  *  Retrieve recent activity from a list of accounts.
  *
+ *  <p/>
  *  To use from command line:
  *
+ *  <p/>
  *  Supply (at least) the following required configuration in application.conf:
  *
+ *  <p/>
  *  gplus.oauth.pathToP12KeyFile
  *  gplus.oauth.serviceAccountEmailAddress
  *  gplus.apiKey
  *  gplus.googlePlusUsers
  *
+ *  <p/>
  *  Launch using:
  *
+ *  <p/>
  *  mvn exec:java 
-Dexec.mainClass=com.google.gplus.provider.GPlusUserActivityProvider 
-Dexec.args="application.conf activity.json"
  */
-public class GPlusUserActivityProvider extends AbstractGPlusProvider{
-
-    private final static String STREAMS_ID = "GPlusUserActivityProvider";
-
-    public GPlusUserActivityProvider() {
-        super();
-    }
-
-    public GPlusUserActivityProvider(GPlusConfiguration config) {
-        super(config);
-    }
-
-    @Override
-    public String getId() {
-        return STREAMS_ID;
-    }
-
-    @Override
-    protected Runnable getDataCollector(BackOffStrategy strategy, 
BlockingQueue<StreamsDatum> queue, Plus plus, UserInfo userInfo) {
-        return new GPlusUserActivityCollector(plus, queue, strategy, userInfo);
-    }
-
-    public static void main(String[] args) throws Exception {
-
-        Preconditions.checkArgument(args.length >= 2);
-
-        String configfile = args[0];
-        String outfile = args[1];
-
-        Config reference = ConfigFactory.load();
-        File conf_file = new File(configfile);
-        assert(conf_file.exists());
-        Config testResourceConfig = 
ConfigFactory.parseFileAnySyntax(conf_file, 
ConfigParseOptions.defaults().setAllowMissing(false));
-
-        Config typesafe  = 
testResourceConfig.withFallback(reference).resolve();
-
-        StreamsConfiguration streamsConfiguration = 
StreamsConfigurator.detectConfiguration(typesafe);
-        GPlusConfiguration config = new 
ComponentConfigurator<>(GPlusConfiguration.class).detectConfiguration(typesafe, 
"gplus");
-        GPlusUserActivityProvider provider = new 
GPlusUserActivityProvider(config);
-
-        Gson gson = new Gson();
-
-        PrintStream outStream = new PrintStream(new BufferedOutputStream(new 
FileOutputStream(outfile)));
-        provider.prepare(config);
-        provider.startStream();
-        do {
-            
Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getBatchFrequencyMs(),
 TimeUnit.MILLISECONDS);
-            for (StreamsDatum datum : provider.readCurrent()) {
-                String json;
-                if (datum.getDocument() instanceof String)
-                    json = (String) datum.getDocument();
-                else
-                    json = gson.toJson(datum.getDocument());
-                outStream.println(json);
-            }
-        } while( provider.isRunning());
-        provider.cleanUp();
-        outStream.flush();
+public class GPlusUserActivityProvider extends AbstractGPlusProvider {
+
+  private static final String STREAMS_ID = "GPlusUserActivityProvider";
+
+  public GPlusUserActivityProvider() {
+    super();
+  }
+
+  public GPlusUserActivityProvider(GPlusConfiguration config) {
+    super(config);
+  }
+
+  @Override
+  public String getId() {
+    return STREAMS_ID;
+  }
+
+  @Override
+  protected Runnable getDataCollector(BackOffStrategy strategy, 
BlockingQueue<StreamsDatum> queue, Plus plus, UserInfo userInfo) {
+    return new GPlusUserActivityCollector(plus, queue, strategy, userInfo);
+  }
+
+  /**
+   * Retrieve recent activity from a list of accounts.
+   * @param args args
+   * @throws Exception Exception
+   */
+  public static void main(String[] args) throws Exception {
+
+    Preconditions.checkArgument(args.length >= 2);
+
+    String configfile = args[0];
+    String outfile = args[1];
+
+    Config reference = ConfigFactory.load();
+    File file = new File(configfile);
+    assert (file.exists());
+    Config testResourceConfig = ConfigFactory.parseFileAnySyntax(file, 
ConfigParseOptions.defaults().setAllowMissing(false));
+
+    Config typesafe  = testResourceConfig.withFallback(reference).resolve();
+
+    StreamsConfiguration streamsConfiguration = 
StreamsConfigurator.detectConfiguration(typesafe);
+    GPlusConfiguration config = new 
ComponentConfigurator<>(GPlusConfiguration.class).detectConfiguration(typesafe, 
"gplus");
+    GPlusUserActivityProvider provider = new GPlusUserActivityProvider(config);
+
+    Gson gson = new Gson();
+
+    PrintStream outStream = new PrintStream(new BufferedOutputStream(new 
FileOutputStream(outfile)));
+    provider.prepare(config);
+    provider.startStream();
+    do {
+      
Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getBatchFrequencyMs(),
 TimeUnit.MILLISECONDS);
+      for (StreamsDatum datum : provider.readCurrent()) {
+        String json;
+        if (datum.getDocument() instanceof String) {
+          json = (String) datum.getDocument();
+        } else {
+          json = gson.toJson(datum.getDocument());
+        }
+        outStream.println(json);
+      }
     }
+    while ( provider.isRunning());
+    provider.cleanUp();
+    outStream.flush();
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusUserDataCollector.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusUserDataCollector.java
 
b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusUserDataCollector.java
index 78a1649..3da3468 100644
--- 
a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusUserDataCollector.java
+++ 
b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusUserDataCollector.java
@@ -18,6 +18,11 @@
 
 package com.google.gplus.provider;
 
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.google.gplus.configuration.UserInfo;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.util.api.requests.backoff.BackOffStrategy;
+
 import com.fasterxml.jackson.databind.DeserializationFeature;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.module.SimpleModule;
@@ -25,73 +30,77 @@ import 
com.google.api.client.googleapis.json.GoogleJsonResponseException;
 import com.google.api.services.plus.Plus;
 import com.google.api.services.plus.model.Person;
 import com.google.gplus.serializer.util.GPlusPersonDeserializer;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.google.gplus.configuration.UserInfo;
-import org.apache.streams.jackson.StreamsJacksonMapper;
-import org.apache.streams.util.api.requests.backoff.BackOffStrategy;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.concurrent.BlockingQueue;
 
 /**
- * Collects user profile information for a specific GPlus user
+ * Collects user profile information for a specific GPlus user.
  */
 public  class GPlusUserDataCollector extends GPlusDataCollector {
 
-    private static final Logger LOGGER = 
LoggerFactory.getLogger(GPlusUserDataCollector.class);
-    private static final ObjectMapper MAPPER = 
StreamsJacksonMapper.getInstance();
-    private static final int MAX_ATTEMPTS = 5;
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(GPlusUserDataCollector.class);
+  private static final ObjectMapper MAPPER = 
StreamsJacksonMapper.getInstance();
+  private static final int MAX_ATTEMPTS = 5;
 
-    static { //set up Mapper for Person objects
-        SimpleModule simpleModule = new SimpleModule();
-        simpleModule.addDeserializer(Person.class, new 
GPlusPersonDeserializer());
-        MAPPER.registerModule(simpleModule);
-        MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, 
false);
-    }
+  static { //set up Mapper for Person objects
+    SimpleModule simpleModule = new SimpleModule();
+    simpleModule.addDeserializer(Person.class, new GPlusPersonDeserializer());
+    MAPPER.registerModule(simpleModule);
+    MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+  }
 
-    private BackOffStrategy backOffStrategy;
-    private Plus gPlus;
-    private BlockingQueue<StreamsDatum> datumQueue;
-    private UserInfo userInfo;
+  private BackOffStrategy backOffStrategy;
+  private Plus plus;
+  private BlockingQueue<StreamsDatum> datumQueue;
+  private UserInfo userInfo;
 
+  /**
+   * GPlusUserDataCollector constructor.
+   * @param plus Plus
+   * @param backOffStrategy BackOffStrategy
+   * @param datumQueue BlockingQueue of StreamsDatum
+   * @param userInfo UserInfo
+   */
+  public GPlusUserDataCollector(Plus plus, BackOffStrategy backOffStrategy, 
BlockingQueue<StreamsDatum> datumQueue, UserInfo userInfo) {
+    this.plus = plus;
+    this.backOffStrategy = backOffStrategy;
+    this.datumQueue = datumQueue;
+    this.userInfo = userInfo;
+  }
 
-    public GPlusUserDataCollector(Plus gPlus, BackOffStrategy backOffStrategy, 
BlockingQueue<StreamsDatum> datumQueue, UserInfo userInfo) {
-        this.gPlus = gPlus;
-        this.backOffStrategy = backOffStrategy;
-        this.datumQueue = datumQueue;
-        this.userInfo = userInfo;
-    }
-
-    protected void queueUserHistory() {
+  protected void queueUserHistory() {
+    try {
+      boolean tryAgain = false;
+      int attempts = 0;
+      com.google.api.services.plus.model.Person person = null;
+      do {
         try {
-            boolean tryAgain = false;
-            int attempts = 0;
-            com.google.api.services.plus.model.Person person = null;
-            do {
-                try {
-                    person = 
this.gPlus.people().get(userInfo.getUserId()).execute();
-                    this.backOffStrategy.reset();
-                    tryAgain = person == null;
-                } catch (GoogleJsonResponseException gjre) {
-                    tryAgain = backoffAndIdentifyIfRetry(gjre, 
this.backOffStrategy);
-                }
-                ++attempts;
-            } while(tryAgain && attempts < MAX_ATTEMPTS);
-            String json = MAPPER.writeValueAsString(person);
-            this.datumQueue.put(new StreamsDatum(json, person.getId()));
-        } catch (Throwable t) {
-            LOGGER.warn("Unable to pull user data for user={} : {}", 
userInfo.getUserId(), t);
-            if(t instanceof InterruptedException) {
-                Thread.currentThread().interrupt();
-            }
+          person = this.plus.people().get(userInfo.getUserId()).execute();
+          this.backOffStrategy.reset();
+          tryAgain = person == null;
+        } catch (GoogleJsonResponseException gjre) {
+          tryAgain = backoffAndIdentifyIfRetry(gjre, this.backOffStrategy);
         }
+        ++attempts;
+      }
+      while (tryAgain && attempts < MAX_ATTEMPTS);
+      String json = MAPPER.writeValueAsString(person);
+      this.datumQueue.put(new StreamsDatum(json, person.getId()));
+    } catch (Throwable throwable) {
+      LOGGER.warn("Unable to pull user data for user={} : {}", 
userInfo.getUserId(), throwable);
+      if (throwable instanceof InterruptedException) {
+        Thread.currentThread().interrupt();
+      }
     }
+  }
 
-    @Override
-    public void run() {
-        queueUserHistory();
-    }
+  @Override
+  public void run() {
+    queueUserHistory();
+  }
 
 
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusUserDataProvider.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusUserDataProvider.java
 
b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusUserDataProvider.java
index 1541818..28bcb55 100644
--- 
a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusUserDataProvider.java
+++ 
b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusUserDataProvider.java
@@ -18,25 +18,22 @@
 
 package com.google.gplus.provider;
 
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.api.services.plus.Plus;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.Uninterruptibles;
-import com.google.gson.Gson;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import com.typesafe.config.ConfigParseOptions;
 import org.apache.streams.config.ComponentConfigurator;
 import org.apache.streams.config.StreamsConfiguration;
 import org.apache.streams.config.StreamsConfigurator;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.google.gplus.GPlusConfiguration;
 import org.apache.streams.google.gplus.configuration.UserInfo;
-import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.apache.streams.util.api.requests.backoff.BackOffStrategy;
 
+import com.google.api.services.plus.Plus;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.Uninterruptibles;
+import com.google.gson.Gson;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigParseOptions;
+
 import java.io.BufferedOutputStream;
 import java.io.File;
 import java.io.FileOutputStream;
@@ -48,78 +45,90 @@ import java.util.concurrent.TimeUnit;
 /**
  *  Retrieve current profile status for a list of accounts.
  *
+ *  <p/>
  *  To use from command line:
  *
+ *  <p/>
  *  Supply (at least) the following required configuration in application.conf:
  *
+ *  <p/>
  *  gplus.oauth.pathToP12KeyFile
  *  gplus.oauth.serviceAccountEmailAddress
  *  gplus.apiKey
  *  gplus.googlePlusUsers
  *
+ *  <p/>
  *  Launch using:
  *
+ *  <p/>
  *  mvn exec:java 
-Dexec.mainClass=com.google.gplus.provider.GPlusUserDataProvider 
-Dexec.args="application.conf profiles.json"
  */
-public class GPlusUserDataProvider extends AbstractGPlusProvider{
-
-    public final static String STREAMS_ID = "GPlusUserDataProvider";
-
-    public GPlusUserDataProvider() {
-        super();
-    }
-
-    public GPlusUserDataProvider(GPlusConfiguration config) {
-        super(config);
-    }
-
-    @Override
-    public String getId() {
-        return STREAMS_ID;
-    }
-
-    @Override
-    protected Runnable getDataCollector(BackOffStrategy strategy, 
BlockingQueue<StreamsDatum> queue, Plus plus, UserInfo userInfo) {
-        return new GPlusUserDataCollector(plus, strategy, queue, userInfo);
-    }
-
-    public static void main(String[] args) throws Exception {
-
-        Preconditions.checkArgument(args.length >= 2);
-
-        String configfile = args[0];
-        String outfile = args[1];
-
-        Config reference = ConfigFactory.load();
-        File conf_file = new File(configfile);
-        assert(conf_file.exists());
-        Config testResourceConfig = 
ConfigFactory.parseFileAnySyntax(conf_file, 
ConfigParseOptions.defaults().setAllowMissing(false));
-
-        Config typesafe  = 
testResourceConfig.withFallback(reference).resolve();
-
-        StreamsConfiguration streamsConfiguration = 
StreamsConfigurator.detectConfiguration(typesafe);
-        GPlusConfiguration config = new 
ComponentConfigurator<>(GPlusConfiguration.class).detectConfiguration(typesafe, 
"gplus");
-        GPlusUserDataProvider provider = new GPlusUserDataProvider(config);
-
-        Gson gson = new Gson();
-
-        PrintStream outStream = new PrintStream(new BufferedOutputStream(new 
FileOutputStream(outfile)));
-        provider.prepare(config);
-        provider.startStream();
-        do {
-            
Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getBatchFrequencyMs(),
 TimeUnit.MILLISECONDS);
-            Iterator<StreamsDatum> iterator = 
provider.readCurrent().iterator();
-            while(iterator.hasNext()) {
-                StreamsDatum datum = iterator.next();
-                String json;
-                if (datum.getDocument() instanceof String)
-                    json = (String) datum.getDocument();
-                else
-                    json = gson.toJson(datum.getDocument());
-                outStream.println(json);
-            }
-        } while( provider.isRunning());
-        provider.cleanUp();
-        outStream.flush();
+public class GPlusUserDataProvider extends AbstractGPlusProvider {
+
+  public static final String STREAMS_ID = "GPlusUserDataProvider";
+
+  public GPlusUserDataProvider() {
+    super();
+  }
+
+  public GPlusUserDataProvider(GPlusConfiguration config) {
+    super(config);
+  }
+
+  @Override
+  public String getId() {
+    return STREAMS_ID;
+  }
+
+  @Override
+  protected Runnable getDataCollector(BackOffStrategy strategy, 
BlockingQueue<StreamsDatum> queue, Plus plus, UserInfo userInfo) {
+    return new GPlusUserDataCollector(plus, strategy, queue, userInfo);
+  }
+
+  /**
+   * Retrieve current profile status for a list of accounts.
+   * @param args args
+   * @throws Exception Exception
+   */
+  public static void main(String[] args) throws Exception {
+
+    Preconditions.checkArgument(args.length >= 2);
+
+    String configfile = args[0];
+    String outfile = args[1];
+
+    Config reference = ConfigFactory.load();
+    File file = new File(configfile);
+    assert (file.exists());
+    Config testResourceConfig = ConfigFactory.parseFileAnySyntax(file, 
ConfigParseOptions.defaults().setAllowMissing(false));
+
+    Config typesafe  = testResourceConfig.withFallback(reference).resolve();
+
+    StreamsConfiguration streamsConfiguration = 
StreamsConfigurator.detectConfiguration(typesafe);
+    GPlusConfiguration config = new 
ComponentConfigurator<>(GPlusConfiguration.class).detectConfiguration(typesafe, 
"gplus");
+    GPlusUserDataProvider provider = new GPlusUserDataProvider(config);
+
+    Gson gson = new Gson();
+
+    PrintStream outStream = new PrintStream(new BufferedOutputStream(new 
FileOutputStream(outfile)));
+    provider.prepare(config);
+    provider.startStream();
+    do {
+      
Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getBatchFrequencyMs(),
 TimeUnit.MILLISECONDS);
+      Iterator<StreamsDatum> iterator = provider.readCurrent().iterator();
+      while (iterator.hasNext()) {
+        StreamsDatum datum = iterator.next();
+        String json;
+        if (datum.getDocument() instanceof String) {
+          json = (String) datum.getDocument();
+        } else {
+          json = gson.toJson(datum.getDocument());
+        }
+        outStream.println(json);
+      }
     }
+    while ( provider.isRunning());
+    provider.cleanUp();
+    outStream.flush();
+  }
 }


Reply via email to