http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b71cce83/streams-contrib/streams-provider-google/google-gplus/src/test/java/com/google/gplus/GooglePlusPersonSerDeIT.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-google/google-gplus/src/test/java/com/google/gplus/GooglePlusPersonSerDeIT.java
 
b/streams-contrib/streams-provider-google/google-gplus/src/test/java/com/google/gplus/GooglePlusPersonSerDeIT.java
deleted file mode 100644
index 6bb2f57..0000000
--- 
a/streams-contrib/streams-provider-google/google-gplus/src/test/java/com/google/gplus/GooglePlusPersonSerDeIT.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package com.google.gplus;
-
-import org.apache.streams.jackson.StreamsJacksonMapper;
-import org.apache.streams.pojo.json.Activity;
-import org.apache.streams.pojo.json.ActivityObject;
-import org.apache.streams.pojo.json.Provider;
-
-import com.fasterxml.jackson.databind.DeserializationFeature;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.module.SimpleModule;
-import com.google.api.services.plus.model.Person;
-import com.google.gplus.serializer.util.GPlusPersonDeserializer;
-import com.google.gplus.serializer.util.GooglePlusActivityUtil;
-import org.apache.commons.lang.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.Test;
-
-import java.io.BufferedReader;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertNotNull;
-import static org.testng.Assert.assertTrue;
-
-/**
- * Tests conversion of gplus inputs to Activity.
- */
-public class GooglePlusPersonSerDeIT {
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(GooglePlusPersonSerDeIT.class);
-  private ObjectMapper objectMapper;
-  private GooglePlusActivityUtil googlePlusActivityUtil;
-
-  /**
-   * setup.
-   */
-  @BeforeClass
-  public void setup() {
-    objectMapper = StreamsJacksonMapper.getInstance();
-    SimpleModule simpleModule = new SimpleModule();
-    simpleModule.addDeserializer(Person.class, new GPlusPersonDeserializer());
-    objectMapper.registerModule(simpleModule);
-    objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, 
false);
-
-    googlePlusActivityUtil = new GooglePlusActivityUtil();
-  }
-
-  @Test
-  public void testPersonObjects() {
-    InputStream is = 
GooglePlusPersonSerDeIT.class.getResourceAsStream("/google_plus_person_jsons.txt");
-    InputStreamReader isr = new InputStreamReader(is);
-    BufferedReader br = new BufferedReader(isr);
-
-    try {
-      while (br.ready()) {
-        String line = br.readLine();
-        if (!StringUtils.isEmpty(line)) {
-          LOGGER.info("raw: {}", line);
-          Activity activity = new Activity();
-
-          Person person = objectMapper.readValue(line, Person.class);
-
-          GooglePlusActivityUtil.updateActivity(person, activity);
-          LOGGER.info("activity: {}", activity);
-
-          assertNotNull(activity);
-          assertTrue (activity.getId().contains("id:googleplus:update"));
-          assertEquals(activity.getVerb(), "update");
-
-          Provider provider = activity.getProvider();
-          assertEquals(provider.getId(), "id:providers:googleplus");
-          assertEquals(provider.getDisplayName(), "GooglePlus");
-
-          ActivityObject actor = activity.getActor();
-          assertNotNull(actor.getImage());
-          assertTrue (actor.getId().contains("id:googleplus:"));
-          assertNotNull(actor.getUrl());
-
-        }
-      }
-    } catch (Exception ex) {
-      LOGGER.error("Exception while testing serializability: {}", ex);
-    }
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b71cce83/streams-contrib/streams-provider-google/google-gplus/src/test/java/com/google/gplus/processor/GooglePlusActivitySerDeIT.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-google/google-gplus/src/test/java/com/google/gplus/processor/GooglePlusActivitySerDeIT.java
 
b/streams-contrib/streams-provider-google/google-gplus/src/test/java/com/google/gplus/processor/GooglePlusActivitySerDeIT.java
deleted file mode 100644
index b75021c..0000000
--- 
a/streams-contrib/streams-provider-google/google-gplus/src/test/java/com/google/gplus/processor/GooglePlusActivitySerDeIT.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package com.google.gplus.processor;
-
-import org.apache.streams.jackson.StreamsJacksonMapper;
-import org.apache.streams.pojo.extensions.ExtensionUtil;
-import org.apache.streams.pojo.json.Activity;
-import org.apache.streams.pojo.json.ActivityObject;
-import org.apache.streams.pojo.json.Provider;
-
-import com.fasterxml.jackson.databind.DeserializationFeature;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.module.SimpleModule;
-import com.google.gplus.serializer.util.GPlusActivityDeserializer;
-import com.google.gplus.serializer.util.GooglePlusActivityUtil;
-import org.apache.commons.lang.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.Test;
-
-import java.io.BufferedReader;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.util.Map;
-
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertNotNull;
-
-/**
- * Tests conversion of gplus inputs to Activity.
- */
-public class GooglePlusActivitySerDeIT {
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(GooglePlusActivitySerDeIT.class);
-  private ObjectMapper objectMapper;
-
-  /**
-   * setup.
-   */
-  @BeforeClass
-  public void setup() {
-    objectMapper = StreamsJacksonMapper.getInstance();
-    SimpleModule simpleModule = new SimpleModule();
-    
simpleModule.addDeserializer(com.google.api.services.plus.model.Activity.class, 
new GPlusActivityDeserializer());
-    objectMapper.registerModule(simpleModule);
-    objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, 
false);
-  }
-
-  @Test
-  @SuppressWarnings("unchecked")
-  public void testActivityObjects() {
-    InputStream is = 
GooglePlusActivitySerDeIT.class.getResourceAsStream("/google_plus_activity_jsons.txt");
-    InputStreamReader isr = new InputStreamReader(is);
-    BufferedReader br = new BufferedReader(isr);
-
-    try {
-      while (br.ready()) {
-        String line = br.readLine();
-        if (!StringUtils.isEmpty(line)) {
-          LOGGER.info("raw: {}", line);
-          Activity activity = new Activity();
-
-          com.google.api.services.plus.model.Activity googlePlusActivity =
-              objectMapper.readValue(line, 
com.google.api.services.plus.model.Activity.class);
-
-          GooglePlusActivityUtil.updateActivity(googlePlusActivity, activity);
-          LOGGER.info("activity: {}", activity);
-
-          assertNotNull(activity);
-          assert (activity.getId().contains("id:googleplus:post"));
-          assertEquals(activity.getVerb(), "post");
-
-          Provider provider = activity.getProvider();
-          assertEquals(provider.getId(), "id:providers:googleplus");
-          assertEquals(provider.getDisplayName(), "GooglePlus");
-
-          ActivityObject actor = activity.getActor();
-          assertNotNull(actor.getImage());
-          assert (actor.getId().contains("id:googleplus:"));
-          assertNotNull(actor.getUrl());
-
-          assertNotNull(activity.getPublished());
-          assertNotNull(activity.getTitle());
-          assertNotNull(activity.getUrl());
-
-          Map<String, Object> extensions = 
ExtensionUtil.getInstance().getExtensions(activity);
-          assertNotNull(extensions);
-
-          if (activity.getContent() != null) {
-            assertNotNull(extensions.get("rebroadcasts"));
-            assertNotNull(extensions.get("keywords"));
-            assertNotNull(extensions.get("likes"));
-            assert (((Map<String, Object>) 
extensions.get("rebroadcasts")).containsKey("count"));
-            assert (((Map<String, Object>) 
extensions.get("likes")).containsKey("count"));
-          }
-        }
-      }
-    } catch (Exception ex) {
-      LOGGER.error("Exception while testing serializability: {}", ex);
-    }
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b71cce83/streams-contrib/streams-provider-google/google-gplus/src/test/java/com/google/gplus/provider/TestAbstractGPlusProvider.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-google/google-gplus/src/test/java/com/google/gplus/provider/TestAbstractGPlusProvider.java
 
b/streams-contrib/streams-provider-google/google-gplus/src/test/java/com/google/gplus/provider/TestAbstractGPlusProvider.java
deleted file mode 100644
index b3590c4..0000000
--- 
a/streams-contrib/streams-provider-google/google-gplus/src/test/java/com/google/gplus/provider/TestAbstractGPlusProvider.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package com.google.gplus.provider;
-
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.google.gplus.GPlusConfiguration;
-import org.apache.streams.google.gplus.GPlusOAuthConfiguration;
-import org.apache.streams.google.gplus.configuration.UserInfo;
-import org.apache.streams.util.api.requests.backoff.BackOffStrategy;
-
-import com.carrotsearch.randomizedtesting.RandomizedTest;
-import com.carrotsearch.randomizedtesting.annotations.Repeat;
-import com.google.api.services.plus.Plus;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.concurrent.BlockingQueue;
-
-import static org.mockito.Mockito.mock;
-
-/**
- * Unit tests for {@link com.google.gplus.provider.AbstractGPlusProvider}
- */
-public class TestAbstractGPlusProvider extends RandomizedTest {
-
-  /**
-   * Test that every collector will be run and that data queued from the 
collectors will be processed.
-   */
-  @Test
-  @Repeat(iterations = 3)
-  public void testDataCollectorRunsPerUser() {
-    int numUsers = randomIntBetween(1, 1000);
-    List<UserInfo> userList = new LinkedList<>();
-    for (int i = 0; i < numUsers; ++i) {
-      userList.add(new UserInfo());
-    }
-    GPlusConfiguration config = new GPlusConfiguration();
-    GPlusOAuthConfiguration oauth = new GPlusOAuthConfiguration();
-    oauth.setAppName("a");
-    oauth.setPathToP12KeyFile("a");
-    oauth.setServiceAccountEmailAddress("a");
-    config.setOauth(oauth);
-    config.setGooglePlusUsers(userList);
-    AbstractGPlusProvider provider = new AbstractGPlusProvider(config) {
-
-      @Override
-      protected Plus createPlusClient() throws IOException {
-        return mock(Plus.class);
-      }
-
-      @Override
-      protected Runnable getDataCollector(BackOffStrategy strategy, 
BlockingQueue<StreamsDatum> queue, Plus plus, UserInfo userInfo) {
-        final BlockingQueue<StreamsDatum> q = queue;
-        return () -> {
-          try {
-            q.put(new StreamsDatum(null));
-          } catch (InterruptedException ie) {
-            fail("Test was interrupted");
-          }
-        };
-      }
-    };
-
-    try {
-      provider.prepare(null);
-      provider.startStream();
-      int datumCount = 0;
-      while (provider.isRunning()) {
-        datumCount += provider.readCurrent().size();
-      }
-      assertEquals(numUsers, datumCount);
-    } finally {
-      provider.cleanUp();
-    }
-  }
-
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b71cce83/streams-contrib/streams-provider-google/google-gplus/src/test/java/com/google/gplus/provider/TestGPlusUserActivityCollector.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-google/google-gplus/src/test/java/com/google/gplus/provider/TestGPlusUserActivityCollector.java
 
b/streams-contrib/streams-provider-google/google-gplus/src/test/java/com/google/gplus/provider/TestGPlusUserActivityCollector.java
deleted file mode 100644
index ffff3e0..0000000
--- 
a/streams-contrib/streams-provider-google/google-gplus/src/test/java/com/google/gplus/provider/TestGPlusUserActivityCollector.java
+++ /dev/null
@@ -1,308 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package com.google.gplus.provider;
-
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.google.gplus.configuration.UserInfo;
-import org.apache.streams.jackson.StreamsJacksonMapper;
-import org.apache.streams.util.api.requests.backoff.BackOffStrategy;
-import 
org.apache.streams.util.api.requests.backoff.impl.ConstantTimeBackOffStrategy;
-
-import com.carrotsearch.randomizedtesting.RandomizedTest;
-import com.carrotsearch.randomizedtesting.annotations.Repeat;
-import com.fasterxml.jackson.databind.DeserializationFeature;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.module.SimpleModule;
-import com.google.api.services.plus.Plus;
-import com.google.api.services.plus.model.Activity;
-import com.google.api.services.plus.model.ActivityFeed;
-import com.google.gplus.serializer.util.GPlusActivityDeserializer;
-import org.joda.time.DateTime;
-import org.junit.Test;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-
-import java.io.IOException;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-
-import static org.mockito.Matchers.anyLong;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-/**
- * Unit tests for {@link com.google.gplus.provider.GPlusUserActivityCollector}
- */
-public class TestGPlusUserActivityCollector extends RandomizedTest {
-
-  private static final String ACTIVITY_TEMPLATE = "{ \"kind\": 
\"plus#activity\", \"etag\": 
\"\\\"Vea_b94Y77GDGgRK7gFNPnolKQw/v1-6aVSBGT4qiStMoz7f2_AN2fM\\\"\", \"title\": 
\"\", \"published\": \"%s\", \"updated\": \"2014-10-27T06:26:33.927Z\", \"id\": 
\"z13twrlznpvtzz52w22mdt1y0k3of1djw04\", \"url\": 
\"https://plus.google.com/116771159471120611293/posts/GR7CGR8N5VL\";, \"actor\": 
{ \"id\": \"116771159471120611293\", \"displayName\": \"displayName\", \"url\": 
\"https://plus.google.com/116771159471120611293\";, \"image\": { \"url\": 
\"https://lh6.googleusercontent.com/-C0fiZBxdvw0/AAAAAAAAAAI/AAAAAAAAJ5k/K4pgR3_-_ms/photo.jpg?sz=50\";
 } }, \"verb\": \"share\", \"object\": { \"objectType\": \"activity\", \"id\": 
\"z13zgvtiurjgfti1v234iflghvq2c1dge04\", \"actor\": { \"id\": 
\"104954254300557350002\", \"displayName\": \"displayName\", \"url\": 
\"https://plus.google.com/104954254300557350002\";, \"image\": { \"url\": 
\"https://lh4.googleusercontent.com/-SO1scj4p2LA/AAAAAAAAAAI/AAAAAAAAI-s/efA
 9LBVe144/photo.jpg?sz=50\" } }, \"content\": \"\", \"url\": 
\"https://plus.google.com/104954254300557350002/posts/AwewXhtn7ws\";, 
\"replies\": { \"totalItems\": 0, \"selfLink\": 
\"https://content.googleapis.com/plus/v1/activities/z13twrlznpvtzz52w22mdt1y0k3of1djw04/comments\";
 }, \"plusoners\": { \"totalItems\": 9, \"selfLink\": 
\"https://content.googleapis.com/plus/v1/activities/z13twrlznpvtzz52w22mdt1y0k3of1djw04/people/plusoners\";
 }, \"resharers\": { \"totalItems\": 0, \"selfLink\": 
\"https://content.googleapis.com/plus/v1/activities/z13twrlznpvtzz52w22mdt1y0k3of1djw04/people/resharers\";
 }, \"attachments\": [ { \"objectType\": \"photo\", \"id\": 
\"104954254300557350002.6074732746360957410\", \"content\": \"26/10/2014 - 1\", 
\"url\": 
\"https://plus.google.com/photos/104954254300557350002/albums/6074732747132702225/6074732746360957410\";,
 \"image\": { \"url\": 
\"https://lh4.googleusercontent.com/-oO3fnARlDm0/VE3JP1xHKeI/AAAAAAAAeCY/-X2jzc6HruA/w506-h750/2014%2B-%2B1\";,
 \"type\": \"ima
 ge/jpeg\" }, \"fullImage\": { \"url\": 
\"https://lh4.googleusercontent.com/-oO3fnARlDm0/VE3JP1xHKeI/AAAAAAAAeCY/-X2jzc6HruA/w600-h1141/2014%2B-%2B1\";,
 \"type\": \"image/jpeg\", \"height\": 1141, \"width\": 600 } } ] }, 
\"annotation\": \"Truth 😜\", \"provider\": { \"title\": \"Reshared Post\" }, 
\"access\": { \"kind\": \"plus#acl\", \"description\": \"Public\", \"items\": [ 
{ \"type\": \"public\" } ] } }";
-  private static final ObjectMapper MAPPER = 
StreamsJacksonMapper.getInstance();
-  private static final String IN_RANGE_IDENTIFIER = "data in range";
-
-  static {
-    SimpleModule simpleModule = new SimpleModule();
-    simpleModule.addDeserializer(Activity.class, new 
GPlusActivityDeserializer());
-    MAPPER.registerModule(simpleModule);
-    MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
-  }
-
-  /**
-   * Creates a randomized activity and randomized date range.
-   *
-   * <p/>
-   * The activity feed is separated into three chunks,
-   * |. . . data too recent to be in date range . . .||. . . data in date 
range. . .||. . . data too old to be in date range|
-   * [index 0, 
.............................................................................................,
 index length-1]
-   *
-   * <p/>
-   * Inside of those chunks data has no order, but the list is ordered by 
those three chunks.
-   *
-   * <p/>
-   * The test will check to see if the num of data in the date range make onto 
the output queue.
-   */
-  @Test
-  @Repeat(iterations = 3)
-  public void testWithBeforeAndAfterDates() throws InterruptedException {
-    //initialize counts assuming no date ranges will be used
-    int numActivities = randomIntBetween(0, 1000);
-    int numActivitiesInDateRange = numActivities;
-    int numberOutOfRange = 0;
-    int numBeforeRange = 0;
-    int numAfterRange = 0;
-    //determine if date ranges will be used
-    DateTime beforeDate = null;
-    DateTime afterDate = null;
-    if (randomInt() % 2 == 0) {
-      beforeDate = DateTime.now().minusDays(randomIntBetween(1,5));
-    }
-    if (randomInt() % 2 == 0) {
-      if (beforeDate == null) {
-        afterDate = DateTime.now().minusDays(randomIntBetween(1, 10));
-      } else {
-        afterDate = beforeDate.minusDays(randomIntBetween(1, 10));
-      }
-    }
-    //update counts if date ranges are going to be used.
-    if (beforeDate != null || afterDate != null ) { //assign amount to be in 
range
-      numActivitiesInDateRange = randomIntBetween(0, numActivities);
-      numberOutOfRange = numActivities - numActivitiesInDateRange;
-    }
-    if (beforeDate == null && afterDate != null) { //assign all out of range 
to be before the start of the range
-      numBeforeRange = numberOutOfRange;
-    } else if (beforeDate != null && afterDate == null) { //assign all out of 
range to be after the start of the range
-      numAfterRange = numberOutOfRange;
-    } else if (beforeDate != null && afterDate != null) { //assign half before 
range and half after the range
-      numAfterRange = (numberOutOfRange / 2) + (numberOutOfRange % 2);
-      numBeforeRange = numberOutOfRange / 2;
-    }
-
-    Plus plus = createMockPlus(numBeforeRange, numAfterRange, 
numActivitiesInDateRange, afterDate, beforeDate);
-    BackOffStrategy strategy = new ConstantTimeBackOffStrategy(1);
-    BlockingQueue<StreamsDatum> datums = new LinkedBlockingQueue<>();
-    UserInfo userInfo = new UserInfo();
-    userInfo.setUserId("A");
-    userInfo.setAfterDate(afterDate);
-    userInfo.setBeforeDate(beforeDate);
-    GPlusUserActivityCollector collector = new 
GPlusUserActivityCollector(plus, datums, strategy, userInfo);
-    collector.run();
-
-    assertEquals(numActivitiesInDateRange, datums.size());
-    while (!datums.isEmpty()) {
-      StreamsDatum datum = datums.take();
-      assertNotNull(datum);
-      assertNotNull(datum.getDocument());
-      assertTrue(datum.getDocument() instanceof String);
-      assertTrue(((String)datum.getDocument()).contains(IN_RANGE_IDENTIFIER)); 
//only in range documents are on the out going queue.
-    }
-  }
-
-
-  private Plus createMockPlus(final int numBefore, final int numAfter, final 
int numInRange, final DateTime after, final DateTime before) {
-    Plus plus = mock(Plus.class);
-    final Plus.Activities activities = createMockPlusActivities(numBefore, 
numAfter, numInRange, after, before);
-    doAnswer(invocationOnMock -> activities).when(plus).activities();
-    return plus;
-  }
-
-  private Plus.Activities createMockPlusActivities(
-      final int numBefore,
-      final int numAfter,
-      final int numInRange,
-      final DateTime after,
-      final DateTime before) {
-    Plus.Activities activities = mock(Plus.Activities.class);
-    try {
-      Plus.Activities.List list = createMockPlusActivitiesList(numBefore, 
numAfter, numInRange, after, before);
-      when(activities.list(anyString(), anyString())).thenReturn(list);
-    } catch (IOException ioe) {
-      fail("Should not have thrown exception while creating mock. : " + 
ioe.getMessage());
-    }
-    return activities;
-  }
-
-  private Plus.Activities.List createMockPlusActivitiesList(
-      final int numBefore,
-      final int numAfter,
-      final int numInRange,
-      final DateTime after,
-      final DateTime before) {
-    Plus.Activities.List list = mock(Plus.Activities.List.class);
-    when(list.setMaxResults(anyLong())).thenReturn(list);
-    when(list.setPageToken(anyString())).thenReturn(list);
-    ActivityFeedAnswer answer = new ActivityFeedAnswer(numBefore, numAfter, 
numInRange, after, before);
-    try {
-      doAnswer(answer).when(list).execute();
-    } catch (IOException ioe) {
-      fail("Should not have thrown exception while creating mock. : " + 
ioe.getMessage());
-    }
-    return list;
-  }
-
-
-  private static ActivityFeed createMockActivityFeed(
-      int numBefore,
-      int numAfter,
-      int numInRange,
-      DateTime after,
-      DateTime before,
-      boolean page) {
-    ActivityFeed feed = new ActivityFeed();
-    List<Activity> list = new LinkedList<>();
-    for (int i = 0; i < numAfter; ++i) {
-      DateTime published = before.plus(randomIntBetween(0, Integer.MAX_VALUE));
-      Activity activity = createActivityWithPublishedDate(published);
-      list.add(activity);
-    }
-    for (int i = 0; i < numInRange; ++i) {
-      DateTime published;
-      if ((before == null && after == null) || before == null) {
-        published = DateTime.now(); // no date range or end time date range so 
just make the time now.
-      } else if (after == null) {
-        published = before.minusMillis(randomIntBetween(1, 
Integer.MAX_VALUE)); //no beginning to range
-      } else { // has to be in range
-        long range = before.getMillis() - after.getMillis();
-        published = after.plus(range / 2); //in the middle
-      }
-      Activity activity = createActivityWithPublishedDate(published);
-      activity.setTitle(IN_RANGE_IDENTIFIER);
-      list.add(activity);
-    }
-    for (int i = 0; i < numBefore; ++i) {
-      DateTime published = after.minusMillis(randomIntBetween(1, 
Integer.MAX_VALUE));
-      Activity activity = createActivityWithPublishedDate(published);
-      list.add(activity);
-    }
-    if (page) {
-      feed.setNextPageToken("A");
-    } else {
-      feed.setNextPageToken(null);
-    }
-    feed.setItems(list);
-    return feed;
-  }
-
-  private static Activity createActivityWithPublishedDate(DateTime dateTime) {
-    Activity activity = new Activity();
-    activity.setPublished(new 
com.google.api.client.util.DateTime(dateTime.getMillis()));
-    activity.setId("a");
-    return activity;
-  }
-
-  private static class ActivityFeedAnswer implements Answer<ActivityFeed> {
-    private int afterCount = 0;
-    private int beforeCount = 0;
-    private int inCount = 0;
-    private int maxBatch = 100;
-
-    private int numAfter;
-    private int numInRange;
-    private int numBefore;
-    private DateTime after;
-    private DateTime before;
-
-    private ActivityFeedAnswer(int numBefore, int numAfter, int numInRange, 
DateTime after, DateTime before) {
-      this.numBefore = numBefore;
-      this.numAfter = numAfter;
-      this.numInRange = numInRange;
-      this.after = after;
-      this.before = before;
-    }
-
-
-
-
-    @Override
-    public ActivityFeed answer(InvocationOnMock invocationOnMock) throws 
Throwable {
-      int totalCount = 0;
-      int batchAfter = 0;
-      int batchBefore = 0;
-      int batchIn = 0;
-      if (afterCount != numAfter) {
-        if (numAfter - afterCount >= maxBatch) {
-          afterCount += maxBatch;
-          batchAfter += maxBatch;
-          totalCount += batchAfter;
-        } else {
-          batchAfter += numAfter - afterCount;
-          totalCount += numAfter - afterCount;
-          afterCount = numAfter;
-        }
-      }
-      if (totalCount < maxBatch && inCount != numInRange) {
-        if (numInRange - inCount >= maxBatch - totalCount) {
-          inCount += maxBatch - totalCount;
-          batchIn += maxBatch - totalCount;
-          totalCount += batchIn;
-        } else {
-          batchIn += numInRange - inCount;
-          totalCount += numInRange - inCount;
-          inCount = numInRange;
-        }
-      }
-      if (totalCount < maxBatch && beforeCount != numBefore) {
-        if (numBefore - batchBefore >= maxBatch - totalCount) {
-          batchBefore += maxBatch - totalCount;
-          totalCount = maxBatch;
-          beforeCount += batchBefore;
-        } else {
-          batchBefore += numBefore - beforeCount;
-          totalCount += numBefore - beforeCount;
-          beforeCount = numBefore;
-        }
-      }
-
-      return createMockActivityFeed(
-          batchBefore,
-          batchAfter,
-          batchIn,
-          after,
-          before,
-          numAfter != afterCount || inCount != numInRange || beforeCount != 
numBefore);
-    }
-  }
-
-
-
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b71cce83/streams-contrib/streams-provider-google/google-gplus/src/test/java/com/google/gplus/provider/TestGPlusUserDataCollector.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-google/google-gplus/src/test/java/com/google/gplus/provider/TestGPlusUserDataCollector.java
 
b/streams-contrib/streams-provider-google/google-gplus/src/test/java/com/google/gplus/provider/TestGPlusUserDataCollector.java
deleted file mode 100644
index 8d30314..0000000
--- 
a/streams-contrib/streams-provider-google/google-gplus/src/test/java/com/google/gplus/provider/TestGPlusUserDataCollector.java
+++ /dev/null
@@ -1,137 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package com.google.gplus.provider;
-
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.google.gplus.configuration.UserInfo;
-import org.apache.streams.util.api.requests.backoff.BackOffStrategy;
-import 
org.apache.streams.util.api.requests.backoff.impl.ConstantTimeBackOffStrategy;
-
-import com.google.api.client.googleapis.json.GoogleJsonResponseException;
-import com.google.api.services.plus.Plus;
-import com.google.api.services.plus.model.Person;
-import org.junit.Test;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-
-import java.io.IOException;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-/**
- * Basic Units for {@link com.google.gplus.provider.GPlusUserDataCollector}.
- */
-public class TestGPlusUserDataCollector {
-
-  private static final String NO_ERROR = "no error";
-
-  /**
-   * Test that on success a datum will be added to the queue.
-   * @throws Exception Exception
-   */
-  @Test
-  public void testSucessfullPull() throws Exception {
-    Plus plus = createMockPlus(0, null);
-    BackOffStrategy backOff = new ConstantTimeBackOffStrategy(1);
-    BlockingQueue<StreamsDatum> datums = new LinkedBlockingQueue<>();
-    UserInfo user = new UserInfo();
-    user.setUserId("A");
-
-    GPlusUserDataCollector collector = new GPlusUserDataCollector(plus, 
backOff, datums, user);
-    collector.run();
-
-    assertEquals(1, datums.size());
-    StreamsDatum datum = datums.take();
-    assertNotNull(datum);
-    assertEquals(NO_ERROR, datum.getId());
-    assertNotNull(datum.getDocument());
-    assertTrue(datum.getDocument() instanceof String);
-  }
-
-  /**
-   * Test that on failure, no datums are output.
-   * @throws Exception Exception
-   */
-  @Test
-  public void testFail() throws Exception {
-    Plus plus = createMockPlus(3, mock(GoogleJsonResponseException.class));
-    UserInfo user = new UserInfo();
-    user.setUserId("A");
-    BlockingQueue<StreamsDatum> datums = new LinkedBlockingQueue<>();
-    BackOffStrategy backOffStrategy = new ConstantTimeBackOffStrategy(1);
-
-    GPlusUserDataCollector collector = new GPlusUserDataCollector(plus, 
backOffStrategy, datums, user);
-    collector.run();
-
-    assertEquals(0, datums.size());
-  }
-
-  private Plus createMockPlus(final int succedOnTry, final Throwable 
throwable) {
-    Plus plus = mock(Plus.class);
-    doAnswer(invocationOnMock -> createMockPeople(succedOnTry, 
throwable)).when(plus).people();
-    return plus;
-  }
-
-  private Plus.People createMockPeople(final int succedOnTry, final Throwable 
throwable) {
-    Plus.People people = mock(Plus.People.class);
-    try {
-      when(people.get(anyString())).thenAnswer(invocationOnMock -> 
createMockGetNoError(succedOnTry, throwable));
-    } catch (IOException ioe) {
-      fail("No Excpetion should have been thrown while creating mocks");
-    }
-    return people;
-  }
-
-  private Plus.People.Get createMockGetNoError(final int succedOnTry, final 
Throwable throwable) {
-    Plus.People.Get get = mock(Plus.People.Get.class);
-    try {
-      doAnswer(new Answer() {
-        private int counter = 0;
-
-        @Override
-        public Person answer(InvocationOnMock invocationOnMock) throws 
Throwable {
-          if (counter == succedOnTry) {
-            Person person = new Person();
-            person.setId(NO_ERROR);
-            return person;
-          } else {
-            ++counter;
-            throw throwable;
-          }
-        }
-      }).when(get).execute();
-    } catch (IOException ioe) {
-      fail("No Excpetion should have been thrown while creating mocks");
-    }
-    return get;
-  }
-
-
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b71cce83/streams-contrib/streams-provider-google/google-gplus/src/test/java/com/google/gplus/serializer/util/GPlusEventClassifierTest.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-google/google-gplus/src/test/java/com/google/gplus/serializer/util/GPlusEventClassifierTest.java
 
b/streams-contrib/streams-provider-google/google-gplus/src/test/java/com/google/gplus/serializer/util/GPlusEventClassifierTest.java
deleted file mode 100644
index 96a9d89..0000000
--- 
a/streams-contrib/streams-provider-google/google-gplus/src/test/java/com/google/gplus/serializer/util/GPlusEventClassifierTest.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package com.google.gplus.serializer.util;
-
-import org.apache.streams.jackson.StreamsJacksonMapper;
-
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.google.api.services.plus.model.Activity;
-import com.google.api.services.plus.model.Person;
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-
-/**
- * GPlusEventClassifierTest tests GPlusEventClassifier.
- */
-public class GPlusEventClassifierTest {
-
-  private static StreamsJacksonMapper mapper = 
StreamsJacksonMapper.getInstance();
-
-  @Test
-  public void classifyActivityTest() {
-    try {
-      Activity activity = new Activity();
-      activity.setKind("plus#activity");
-      Class retClass = 
GPlusEventClassifier.detectClass(mapper.writeValueAsString(activity));
-
-      assertEquals(retClass, Activity.class);
-    } catch (Exception ex) {
-      //
-    }
-  }
-
-  @Test
-  public void classifyPersonTest() {
-    try {
-      Person person = new Person();
-      person.setKind("plus#person");
-      Class retClass = 
GPlusEventClassifier.detectClass(mapper.writeValueAsString(person));
-
-      assertEquals(retClass, Person.class);
-    } catch (Exception ex) {
-      //
-    }
-  }
-
-  @Test
-  public void classifyObjectNodeTest() {
-    try {
-      Person person = new Person();
-      person.setKind("fake");
-      Class retClass = 
GPlusEventClassifier.detectClass(mapper.writeValueAsString(person));
-
-      assertEquals(retClass, ObjectNode.class);
-    } catch (Exception ex) {
-      //
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b71cce83/streams-contrib/streams-provider-google/google-gplus/src/test/java/org/apache/streams/gplus/GooglePlusCommentSerDeIT.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-google/google-gplus/src/test/java/org/apache/streams/gplus/GooglePlusCommentSerDeIT.java
 
b/streams-contrib/streams-provider-google/google-gplus/src/test/java/org/apache/streams/gplus/GooglePlusCommentSerDeIT.java
new file mode 100644
index 0000000..1fb79ca
--- /dev/null
+++ 
b/streams-contrib/streams-provider-google/google-gplus/src/test/java/org/apache/streams/gplus/GooglePlusCommentSerDeIT.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.gplus;
+
+import org.apache.streams.gplus.serializer.util.GPlusCommentDeserializer;
+import org.apache.streams.gplus.serializer.util.GooglePlusActivityUtil;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.pojo.json.Activity;
+
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import com.google.api.services.plus.model.Comment;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.io.BufferedReader;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
+
+/**
+ * Tests conversion of gplus inputs to Activity.
+ */
+public class GooglePlusCommentSerDeIT {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(GooglePlusCommentSerDeIT.class);
+  private ObjectMapper objectMapper;
+
+  /**
+   * setup.
+   */
+  @BeforeClass
+  public void setupTestCommentObjects() {
+    objectMapper = StreamsJacksonMapper.getInstance();
+    SimpleModule simpleModule = new SimpleModule();
+    simpleModule.addDeserializer(Comment.class, new 
GPlusCommentDeserializer());
+    objectMapper.registerModule(simpleModule);
+    objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, 
false);
+  }
+
+  @Test
+  public void testCommentObjects() {
+    InputStream is = 
GooglePlusCommentSerDeIT.class.getResourceAsStream("/google_plus_comments_jsons.txt");
+    InputStreamReader isr = new InputStreamReader(is);
+    BufferedReader br = new BufferedReader(isr);
+
+    Activity activity = new Activity();
+    List<Comment> comments = new ArrayList<>();
+
+    try {
+      while (br.ready()) {
+        String line = br.readLine();
+        if (!StringUtils.isEmpty(line)) {
+          LOGGER.info("raw: {}", line);
+          Comment comment = objectMapper.readValue(line, Comment.class);
+
+          LOGGER.info("comment: {}", comment);
+
+          assertNotNull(comment);
+          assertNotNull(comment.getEtag());
+          assertNotNull(comment.getId());
+          assertNotNull(comment.getInReplyTo());
+          assertNotNull(comment.getObject());
+          assertNotNull(comment.getPlusoners());
+          assertNotNull(comment.getPublished());
+          assertNotNull(comment.getUpdated());
+          assertNotNull(comment.getSelfLink());
+          assertEquals(comment.getVerb(), "post");
+
+          comments.add(comment);
+        }
+      }
+
+      assertEquals(comments.size(), 3);
+
+      GooglePlusActivityUtil.updateActivity(comments, activity);
+      assertNotNull(activity);
+      assertNotNull(activity.getObject());
+      assertEquals(activity.getObject().getAttachments().size(), 3);
+    } catch (Exception ex) {
+      LOGGER.error("Exception while testing serializability: {}", ex);
+    }
+  }
+
+  @Test
+  public void testEmptyComments() {
+    Activity activity = new Activity();
+    GooglePlusActivityUtil.updateActivity(new ArrayList<>(), activity);
+    assertNull(activity.getObject());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b71cce83/streams-contrib/streams-provider-google/google-gplus/src/test/java/org/apache/streams/gplus/GooglePlusPersonSerDeIT.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-google/google-gplus/src/test/java/org/apache/streams/gplus/GooglePlusPersonSerDeIT.java
 
b/streams-contrib/streams-provider-google/google-gplus/src/test/java/org/apache/streams/gplus/GooglePlusPersonSerDeIT.java
new file mode 100644
index 0000000..ce53bf3
--- /dev/null
+++ 
b/streams-contrib/streams-provider-google/google-gplus/src/test/java/org/apache/streams/gplus/GooglePlusPersonSerDeIT.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.gplus;
+
+import org.apache.streams.gplus.serializer.util.GPlusPersonDeserializer;
+import org.apache.streams.gplus.serializer.util.GooglePlusActivityUtil;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.pojo.json.Activity;
+import org.apache.streams.pojo.json.ActivityObject;
+import org.apache.streams.pojo.json.Provider;
+
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import com.google.api.services.plus.model.Person;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.io.BufferedReader;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+
+/**
+ * Tests conversion of gplus inputs to Activity.
+ */
+public class GooglePlusPersonSerDeIT {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(GooglePlusPersonSerDeIT.class);
+  private ObjectMapper objectMapper;
+
+  /**
+   * setup.
+   */
+  @BeforeClass
+  public void setup() {
+    objectMapper = StreamsJacksonMapper.getInstance();
+    SimpleModule simpleModule = new SimpleModule();
+    simpleModule.addDeserializer(Person.class, new GPlusPersonDeserializer());
+    objectMapper.registerModule(simpleModule);
+    objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, 
false);
+  }
+
+  @Test
+  public void testPersonObjects() {
+    InputStream is = 
GooglePlusPersonSerDeIT.class.getResourceAsStream("/google_plus_person_jsons.txt");
+    InputStreamReader isr = new InputStreamReader(is);
+    BufferedReader br = new BufferedReader(isr);
+
+    try {
+      while (br.ready()) {
+        String line = br.readLine();
+        if (!StringUtils.isEmpty(line)) {
+          LOGGER.info("raw: {}", line);
+          Activity activity = new Activity();
+
+          Person person = objectMapper.readValue(line, Person.class);
+
+          GooglePlusActivityUtil.updateActivity(person, activity);
+          LOGGER.info("activity: {}", activity);
+
+          assertNotNull(activity);
+          assertTrue(activity.getId().contains("id:googleplus:update"));
+          assertEquals(activity.getVerb(), "update");
+
+          Provider provider = activity.getProvider();
+          assertEquals(provider.getId(), "id:providers:googleplus");
+          assertEquals(provider.getDisplayName(), "GooglePlus");
+
+          ActivityObject actor = activity.getActor();
+          assertNotNull(actor.getImage());
+          assertTrue(actor.getId().contains("id:googleplus:"));
+          assertNotNull(actor.getUrl());
+
+        }
+      }
+    } catch (Exception ex) {
+      LOGGER.error("Exception while testing serializability: {}", ex);
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b71cce83/streams-contrib/streams-provider-google/google-gplus/src/test/java/org/apache/streams/gplus/processors/GooglePlusActivitySerDeIT.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-google/google-gplus/src/test/java/org/apache/streams/gplus/processors/GooglePlusActivitySerDeIT.java
 
b/streams-contrib/streams-provider-google/google-gplus/src/test/java/org/apache/streams/gplus/processors/GooglePlusActivitySerDeIT.java
new file mode 100644
index 0000000..b7bb3bf
--- /dev/null
+++ 
b/streams-contrib/streams-provider-google/google-gplus/src/test/java/org/apache/streams/gplus/processors/GooglePlusActivitySerDeIT.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.gplus.processors;
+
+import org.apache.streams.gplus.serializer.util.GPlusActivityDeserializer;
+import org.apache.streams.gplus.serializer.util.GooglePlusActivityUtil;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.pojo.extensions.ExtensionUtil;
+import org.apache.streams.pojo.json.Activity;
+import org.apache.streams.pojo.json.ActivityObject;
+import org.apache.streams.pojo.json.Provider;
+
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.io.BufferedReader;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.Map;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+
+/**
+ * Tests conversion of gplus inputs to Activity.
+ */
+public class GooglePlusActivitySerDeIT {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(GooglePlusActivitySerDeIT.class);
+  private ObjectMapper objectMapper;
+
+  /**
+   * setup.
+   */
+  @BeforeClass
+  public void setup() {
+    objectMapper = StreamsJacksonMapper.getInstance();
+    SimpleModule simpleModule = new SimpleModule();
+    
simpleModule.addDeserializer(com.google.api.services.plus.model.Activity.class, 
new GPlusActivityDeserializer());
+    objectMapper.registerModule(simpleModule);
+    objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, 
false);
+  }
+
+  @Test
+  @SuppressWarnings("unchecked")
+  public void testActivityObjects() {
+    InputStream is = 
GooglePlusActivitySerDeIT.class.getResourceAsStream("/google_plus_activity_jsons.txt");
+    InputStreamReader isr = new InputStreamReader(is);
+    BufferedReader br = new BufferedReader(isr);
+
+    try {
+      while (br.ready()) {
+        String line = br.readLine();
+        if (!StringUtils.isEmpty(line)) {
+          LOGGER.info("raw: {}", line);
+          Activity activity = new Activity();
+
+          com.google.api.services.plus.model.Activity googlePlusActivity =
+              objectMapper.readValue(line, 
com.google.api.services.plus.model.Activity.class);
+
+          GooglePlusActivityUtil.updateActivity(googlePlusActivity, activity);
+          LOGGER.info("activity: {}", activity);
+
+          assertNotNull(activity);
+          assert (activity.getId().contains("id:googleplus:post"));
+          assertEquals(activity.getVerb(), "post");
+
+          Provider provider = activity.getProvider();
+          assertEquals(provider.getId(), "id:providers:googleplus");
+          assertEquals(provider.getDisplayName(), "GooglePlus");
+
+          ActivityObject actor = activity.getActor();
+          assertNotNull(actor.getImage());
+          assert (actor.getId().contains("id:googleplus:"));
+          assertNotNull(actor.getUrl());
+
+          assertNotNull(activity.getPublished());
+          assertNotNull(activity.getTitle());
+          assertNotNull(activity.getUrl());
+
+          Map<String, Object> extensions = 
ExtensionUtil.getInstance().getExtensions(activity);
+          assertNotNull(extensions);
+
+          if (activity.getContent() != null) {
+            assertNotNull(extensions.get("rebroadcasts"));
+            assertNotNull(extensions.get("keywords"));
+            assertNotNull(extensions.get("likes"));
+            assert (((Map<String, Object>) 
extensions.get("rebroadcasts")).containsKey("count"));
+            assert (((Map<String, Object>) 
extensions.get("likes")).containsKey("count"));
+          }
+        }
+      }
+    } catch (Exception ex) {
+      LOGGER.error("Exception while testing serializability: {}", ex);
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b71cce83/streams-contrib/streams-provider-google/google-gplus/src/test/java/org/apache/streams/gplus/processors/GooglePlusTypeConverterIT.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-google/google-gplus/src/test/java/org/apache/streams/gplus/processors/GooglePlusTypeConverterIT.java
 
b/streams-contrib/streams-provider-google/google-gplus/src/test/java/org/apache/streams/gplus/processors/GooglePlusTypeConverterIT.java
new file mode 100644
index 0000000..d3472c4
--- /dev/null
+++ 
b/streams-contrib/streams-provider-google/google-gplus/src/test/java/org/apache/streams/gplus/processors/GooglePlusTypeConverterIT.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.gplus.processors;
+
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.exceptions.ActivitySerializerException;
+import org.apache.streams.gplus.processor.GooglePlusTypeConverter;
+import org.apache.streams.gplus.serializer.util.GPlusActivityDeserializer;
+import org.apache.streams.gplus.serializer.util.GPlusPersonDeserializer;
+import org.apache.streams.gplus.serializer.util.GooglePlusActivityUtil;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.pojo.json.Activity;
+
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import com.google.api.services.plus.model.Person;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.List;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+
+/**
+ * Tests conversion of gplus inputs to Activity
+ */
+public class GooglePlusTypeConverterIT {
+
+  private final static Logger LOGGER = 
LoggerFactory.getLogger(GooglePlusTypeConverterIT.class);
+  private GooglePlusTypeConverter googlePlusTypeConverter;
+  private ObjectMapper objectMapper;
+
+  @BeforeClass
+  public void setup() {
+    objectMapper = StreamsJacksonMapper.getInstance();
+    SimpleModule simpleModule = new SimpleModule();
+    simpleModule.addDeserializer(Person.class, new GPlusPersonDeserializer());
+    
simpleModule.addDeserializer(com.google.api.services.plus.model.Activity.class, 
new GPlusActivityDeserializer());
+    objectMapper.registerModule(simpleModule);
+    objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, 
false);
+
+    googlePlusTypeConverter = new GooglePlusTypeConverter();
+    googlePlusTypeConverter.prepare(null);
+  }
+
+  @Test(dependsOnGroups = {"testGPlusUserDataProvider"})
+  public void testProcessPerson() throws IOException, 
ActivitySerializerException {
+
+    File file = new 
File("target/test-classes/GPlusUserDataProviderIT.stdout.txt");
+    InputStream is = new FileInputStream(file);
+    InputStreamReader isr = new InputStreamReader(is);
+    BufferedReader br = new BufferedReader(isr);
+
+    while (br.ready()) {
+      String line = br.readLine();
+      if (!StringUtils.isEmpty(line)) {
+        LOGGER.info("raw: {}", line);
+        Activity activity = new Activity();
+
+        Person person = objectMapper.readValue(line, Person.class);
+        StreamsDatum streamsDatum = new StreamsDatum(person);
+
+        assertNotNull(streamsDatum.getDocument());
+
+        List<StreamsDatum> retList = 
googlePlusTypeConverter.process(streamsDatum);
+        GooglePlusActivityUtil.updateActivity(person, activity);
+
+        assertEquals(retList.size(), 1);
+        assert (retList.get(0).getDocument() instanceof Activity);
+        assertEquals(activity, retList.get(0).getDocument());
+      }
+    }
+  }
+
+  @Test(dependsOnGroups = {"testGPlusUserActivityProvider"})
+  public void testProcessActivity() throws IOException, 
ActivitySerializerException {
+
+    File file = new 
File("target/test-classes/GPlusUserActivityProviderIT.stdout.txt");
+    InputStream is = new FileInputStream(file);
+    InputStreamReader isr = new InputStreamReader(is);
+    BufferedReader br = new BufferedReader(isr);
+
+    while (br.ready()) {
+      String line = br.readLine();
+      if (!StringUtils.isEmpty(line)) {
+        LOGGER.info("raw: {}", line);
+        Activity activity = new Activity();
+
+        com.google.api.services.plus.model.Activity gPlusActivity = 
objectMapper.readValue(line, com.google.api.services.plus.model.Activity.class);
+        StreamsDatum streamsDatum = new StreamsDatum(gPlusActivity);
+
+        assertNotNull(streamsDatum.getDocument());
+
+        List<StreamsDatum> retList = 
googlePlusTypeConverter.process(streamsDatum);
+        GooglePlusActivityUtil.updateActivity(gPlusActivity, activity);
+
+        assertEquals(retList.size(), 1);
+        assertTrue(retList.get(0).getDocument() instanceof Activity);
+        assertEquals(activity, retList.get(0).getDocument());
+      }
+    }
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b71cce83/streams-contrib/streams-provider-google/google-gplus/src/test/java/org/apache/streams/gplus/providers/GPlusUserActivityProviderIT.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-google/google-gplus/src/test/java/org/apache/streams/gplus/providers/GPlusUserActivityProviderIT.java
 
b/streams-contrib/streams-provider-google/google-gplus/src/test/java/org/apache/streams/gplus/providers/GPlusUserActivityProviderIT.java
new file mode 100644
index 0000000..9f8524d
--- /dev/null
+++ 
b/streams-contrib/streams-provider-google/google-gplus/src/test/java/org/apache/streams/gplus/providers/GPlusUserActivityProviderIT.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.gplus.providers;
+
+import org.apache.streams.gplus.provider.GPlusUserActivityProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.Test;
+
+import java.io.File;
+import java.io.FileReader;
+import java.io.LineNumberReader;
+
+public class GPlusUserActivityProviderIT {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(GPlusUserActivityProviderIT.class);
+
+  @Test(groups={"testGPlusUserActivityProvider"})
+  public void testGPlusUserActivityProvider() throws Exception {
+
+    String configfile = 
"./target/test-classes/GPlusUserActivityProviderIT.conf";
+    String outfile = 
"./target/test-classes/GPlusUserActivityProviderIT.stdout.txt";
+
+    String[] args = new String[2];
+    args[0] = configfile;
+    args[1] = outfile;
+
+    Thread testThread = new Thread(() -> {
+      try {
+        GPlusUserActivityProvider.main(args);
+      } catch ( Exception ex ) {
+        LOGGER.error("Test Exception!", ex);
+      }
+    });
+    testThread.start();
+    testThread.join(60000);
+
+    File out = new File(outfile);
+    assert (out.exists());
+    assert (out.canRead());
+    assert (out.isFile());
+
+    FileReader outReader = new FileReader(out);
+    LineNumberReader outCounter = new LineNumberReader(outReader);
+
+    while (outCounter.readLine() != null) {}
+
+    assert (outCounter.getLineNumber() >= 1);
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b71cce83/streams-contrib/streams-provider-google/google-gplus/src/test/java/org/apache/streams/gplus/providers/GPlusUserDataProviderIT.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-google/google-gplus/src/test/java/org/apache/streams/gplus/providers/GPlusUserDataProviderIT.java
 
b/streams-contrib/streams-provider-google/google-gplus/src/test/java/org/apache/streams/gplus/providers/GPlusUserDataProviderIT.java
new file mode 100644
index 0000000..012cdc2
--- /dev/null
+++ 
b/streams-contrib/streams-provider-google/google-gplus/src/test/java/org/apache/streams/gplus/providers/GPlusUserDataProviderIT.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.gplus.providers;
+
+import org.apache.streams.gplus.provider.GPlusUserDataProvider;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.Test;
+
+import java.io.File;
+import java.io.FileReader;
+import java.io.LineNumberReader;
+
+public class GPlusUserDataProviderIT {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(GPlusUserDataProviderIT.class);
+
+  @Test(groups={"testGPlusUserDataProvider"})
+  public void testGPlusUserDataProvider() throws Exception {
+
+    String configfile = "./target/test-classes/GPlusUserDataProviderIT.conf";
+    String outfile = 
"./target/test-classes/GPlusUserDataProviderIT.stdout.txt";
+
+    String[] args = new String[2];
+    args[0] = configfile;
+    args[1] = outfile;
+
+    Thread testThread = new Thread(() -> {
+      try {
+        GPlusUserDataProvider.main(args);
+      } catch ( Exception ex ) {
+        LOGGER.error("Test Exception!", ex);
+      }
+    });
+    testThread.start();
+    testThread.join(60000);
+
+    GPlusUserDataProvider.main(new String[]{configfile, outfile});
+
+    File out = new File(outfile);
+    assert (out.exists());
+    assert (out.canRead());
+    assert (out.isFile());
+
+    FileReader outReader = new FileReader(out);
+    LineNumberReader outCounter = new LineNumberReader(outReader);
+
+    while (outCounter.readLine() != null) {}
+
+    assert (outCounter.getLineNumber() >= 1);
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b71cce83/streams-contrib/streams-provider-google/google-gplus/src/test/java/org/apache/streams/gplus/providers/TestAbstractGPlusProvider.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-google/google-gplus/src/test/java/org/apache/streams/gplus/providers/TestAbstractGPlusProvider.java
 
b/streams-contrib/streams-provider-google/google-gplus/src/test/java/org/apache/streams/gplus/providers/TestAbstractGPlusProvider.java
new file mode 100644
index 0000000..afd70fa
--- /dev/null
+++ 
b/streams-contrib/streams-provider-google/google-gplus/src/test/java/org/apache/streams/gplus/providers/TestAbstractGPlusProvider.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.gplus.providers;
+
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.google.gplus.GPlusConfiguration;
+import org.apache.streams.google.gplus.GPlusOAuthConfiguration;
+import org.apache.streams.google.gplus.configuration.UserInfo;
+import org.apache.streams.gplus.provider.AbstractGPlusProvider;
+import org.apache.streams.util.api.requests.backoff.BackOffStrategy;
+
+import com.carrotsearch.randomizedtesting.RandomizedTest;
+import com.carrotsearch.randomizedtesting.annotations.Repeat;
+import com.google.api.services.plus.Plus;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+
+import static org.mockito.Mockito.mock;
+
+/**
+ * Unit tests for {@link 
org.apache.streams.gplus.provider.AbstractGPlusProvider}
+ */
+public class TestAbstractGPlusProvider extends RandomizedTest {
+
+  /**
+   * Test that every collector will be run and that data queued from the 
collectors will be processed.
+   */
+  @Test
+  @Repeat(iterations = 3)
+  public void testDataCollectorRunsPerUser() {
+    int numUsers = randomIntBetween(1, 1000);
+    List<UserInfo> userList = new LinkedList<>();
+    for (int i = 0; i < numUsers; ++i) {
+      userList.add(new UserInfo());
+    }
+    GPlusConfiguration config = new GPlusConfiguration();
+    GPlusOAuthConfiguration oauth = new GPlusOAuthConfiguration();
+    oauth.setAppName("a");
+    oauth.setPathToP12KeyFile("a");
+    oauth.setServiceAccountEmailAddress("a");
+    config.setOauth(oauth);
+    config.setGooglePlusUsers(userList);
+    AbstractGPlusProvider provider = new AbstractGPlusProvider(config) {
+
+      @Override
+      protected Plus createPlusClient() throws IOException {
+        return mock(Plus.class);
+      }
+
+      @Override
+      protected Runnable getDataCollector(BackOffStrategy strategy, 
BlockingQueue<StreamsDatum> queue, Plus plus, UserInfo userInfo) {
+        final BlockingQueue<StreamsDatum> q = queue;
+        return () -> {
+          try {
+            q.put(new StreamsDatum(null));
+          } catch (InterruptedException ie) {
+            fail("Test was interrupted");
+          }
+        };
+      }
+    };
+
+    try {
+      provider.prepare(null);
+      provider.startStream();
+      int datumCount = 0;
+      while (provider.isRunning()) {
+        datumCount += provider.readCurrent().size();
+      }
+      assertEquals(numUsers, datumCount);
+    } finally {
+      provider.cleanUp();
+    }
+  }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b71cce83/streams-contrib/streams-provider-google/google-gplus/src/test/java/org/apache/streams/gplus/providers/TestGPlusUserActivityCollector.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-google/google-gplus/src/test/java/org/apache/streams/gplus/providers/TestGPlusUserActivityCollector.java
 
b/streams-contrib/streams-provider-google/google-gplus/src/test/java/org/apache/streams/gplus/providers/TestGPlusUserActivityCollector.java
new file mode 100644
index 0000000..ae0c4f7
--- /dev/null
+++ 
b/streams-contrib/streams-provider-google/google-gplus/src/test/java/org/apache/streams/gplus/providers/TestGPlusUserActivityCollector.java
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.gplus.providers;
+
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.google.gplus.configuration.UserInfo;
+import org.apache.streams.gplus.provider.GPlusUserActivityCollector;
+import org.apache.streams.gplus.serializer.util.GPlusActivityDeserializer;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.util.api.requests.backoff.BackOffStrategy;
+import 
org.apache.streams.util.api.requests.backoff.impl.ConstantTimeBackOffStrategy;
+
+import com.carrotsearch.randomizedtesting.RandomizedTest;
+import com.carrotsearch.randomizedtesting.annotations.Repeat;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import com.google.api.services.plus.Plus;
+import com.google.api.services.plus.model.Activity;
+import com.google.api.services.plus.model.ActivityFeed;
+import org.joda.time.DateTime;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit tests for {@link 
org.apache.streams.gplus.provider.GPlusUserActivityCollector}
+ */
+public class TestGPlusUserActivityCollector extends RandomizedTest {
+
+  private static final String ACTIVITY_TEMPLATE = "{ \"kind\": 
\"plus#activity\", \"etag\": 
\"\\\"Vea_b94Y77GDGgRK7gFNPnolKQw/v1-6aVSBGT4qiStMoz7f2_AN2fM\\\"\", \"title\": 
\"\", \"published\": \"%s\", \"updated\": \"2014-10-27T06:26:33.927Z\", \"id\": 
\"z13twrlznpvtzz52w22mdt1y0k3of1djw04\", \"url\": 
\"https://plus.google.com/116771159471120611293/posts/GR7CGR8N5VL\";, \"actor\": 
{ \"id\": \"116771159471120611293\", \"displayName\": \"displayName\", \"url\": 
\"https://plus.google.com/116771159471120611293\";, \"image\": { \"url\": 
\"https://lh6.googleusercontent.com/-C0fiZBxdvw0/AAAAAAAAAAI/AAAAAAAAJ5k/K4pgR3_-_ms/photo.jpg?sz=50\";
 } }, \"verb\": \"share\", \"object\": { \"objectType\": \"activity\", \"id\": 
\"z13zgvtiurjgfti1v234iflghvq2c1dge04\", \"actor\": { \"id\": 
\"104954254300557350002\", \"displayName\": \"displayName\", \"url\": 
\"https://plus.google.com/104954254300557350002\";, \"image\": { \"url\": 
\"https://lh4.googleusercontent.com/-SO1scj4p2LA/AAAAAAAAAAI/AAAAAAAAI-s/efA
 9LBVe144/photo.jpg?sz=50\" } }, \"content\": \"\", \"url\": 
\"https://plus.google.com/104954254300557350002/posts/AwewXhtn7ws\";, 
\"replies\": { \"totalItems\": 0, \"selfLink\": 
\"https://content.googleapis.com/plus/v1/activities/z13twrlznpvtzz52w22mdt1y0k3of1djw04/comments\";
 }, \"plusoners\": { \"totalItems\": 9, \"selfLink\": 
\"https://content.googleapis.com/plus/v1/activities/z13twrlznpvtzz52w22mdt1y0k3of1djw04/people/plusoners\";
 }, \"resharers\": { \"totalItems\": 0, \"selfLink\": 
\"https://content.googleapis.com/plus/v1/activities/z13twrlznpvtzz52w22mdt1y0k3of1djw04/people/resharers\";
 }, \"attachments\": [ { \"objectType\": \"photo\", \"id\": 
\"104954254300557350002.6074732746360957410\", \"content\": \"26/10/2014 - 1\", 
\"url\": 
\"https://plus.google.com/photos/104954254300557350002/albums/6074732747132702225/6074732746360957410\";,
 \"image\": { \"url\": 
\"https://lh4.googleusercontent.com/-oO3fnARlDm0/VE3JP1xHKeI/AAAAAAAAeCY/-X2jzc6HruA/w506-h750/2014%2B-%2B1\";,
 \"type\": \"ima
 ge/jpeg\" }, \"fullImage\": { \"url\": 
\"https://lh4.googleusercontent.com/-oO3fnARlDm0/VE3JP1xHKeI/AAAAAAAAeCY/-X2jzc6HruA/w600-h1141/2014%2B-%2B1\";,
 \"type\": \"image/jpeg\", \"height\": 1141, \"width\": 600 } } ] }, 
\"annotation\": \"Truth 😜\", \"provider\": { \"title\": \"Reshared Post\" }, 
\"access\": { \"kind\": \"plus#acl\", \"description\": \"Public\", \"items\": [ 
{ \"type\": \"public\" } ] } }";
+  private static final ObjectMapper MAPPER = 
StreamsJacksonMapper.getInstance();
+  private static final String IN_RANGE_IDENTIFIER = "data in range";
+
+  static {
+    SimpleModule simpleModule = new SimpleModule();
+    simpleModule.addDeserializer(Activity.class, new 
GPlusActivityDeserializer());
+    MAPPER.registerModule(simpleModule);
+    MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+  }
+
+  private static ActivityFeed createMockActivityFeed(
+      int numBefore,
+      int numAfter,
+      int numInRange,
+      DateTime after,
+      DateTime before,
+      boolean page) {
+    ActivityFeed feed = new ActivityFeed();
+    List<Activity> list = new LinkedList<>();
+    for (int i = 0; i < numAfter; ++i) {
+      DateTime published = before.plus(randomIntBetween(0, Integer.MAX_VALUE));
+      Activity activity = createActivityWithPublishedDate(published);
+      list.add(activity);
+    }
+    for (int i = 0; i < numInRange; ++i) {
+      DateTime published;
+      if ((before == null && after == null) || before == null) {
+        published = DateTime.now(); // no date range or end time date range so 
just make the time now.
+      } else if (after == null) {
+        published = before.minusMillis(randomIntBetween(1, 
Integer.MAX_VALUE)); //no beginning to range
+      } else { // has to be in range
+        long range = before.getMillis() - after.getMillis();
+        published = after.plus(range / 2); //in the middle
+      }
+      Activity activity = createActivityWithPublishedDate(published);
+      activity.setTitle(IN_RANGE_IDENTIFIER);
+      list.add(activity);
+    }
+    for (int i = 0; i < numBefore; ++i) {
+      DateTime published = after.minusMillis(randomIntBetween(1, 
Integer.MAX_VALUE));
+      Activity activity = createActivityWithPublishedDate(published);
+      list.add(activity);
+    }
+    if (page) {
+      feed.setNextPageToken("A");
+    } else {
+      feed.setNextPageToken(null);
+    }
+    feed.setItems(list);
+    return feed;
+  }
+
+  private static Activity createActivityWithPublishedDate(DateTime dateTime) {
+    Activity activity = new Activity();
+    activity.setPublished(new 
com.google.api.client.util.DateTime(dateTime.getMillis()));
+    activity.setId("a");
+    return activity;
+  }
+
+  /**
+   * Creates a randomized activity and randomized date range.
+   * <p/>
+   * The activity feed is separated into three chunks,
+   * |. . . data too recent to be in date range . . .||. . . data in date 
range. . .||. . . data too old to be in date range|
+   * [index 0, 
.............................................................................................,
 index length-1]
+   * <p/>
+   * Inside of those chunks data has no order, but the list is ordered by 
those three chunks.
+   * <p/>
+   * The test will check to see if the num of data in the date range make onto 
the output queue.
+   */
+  @Test
+  @Repeat(iterations = 3)
+  public void testWithBeforeAndAfterDates() throws InterruptedException {
+    //initialize counts assuming no date ranges will be used
+    int numActivities = randomIntBetween(0, 1000);
+    int numActivitiesInDateRange = numActivities;
+    int numberOutOfRange = 0;
+    int numBeforeRange = 0;
+    int numAfterRange = 0;
+    //determine if date ranges will be used
+    DateTime beforeDate = null;
+    DateTime afterDate = null;
+    if (randomInt() % 2 == 0) {
+      beforeDate = DateTime.now().minusDays(randomIntBetween(1, 5));
+    }
+    if (randomInt() % 2 == 0) {
+      if (beforeDate == null) {
+        afterDate = DateTime.now().minusDays(randomIntBetween(1, 10));
+      } else {
+        afterDate = beforeDate.minusDays(randomIntBetween(1, 10));
+      }
+    }
+    //update counts if date ranges are going to be used.
+    if (beforeDate != null || afterDate != null) { //assign amount to be in 
range
+      numActivitiesInDateRange = randomIntBetween(0, numActivities);
+      numberOutOfRange = numActivities - numActivitiesInDateRange;
+    }
+    if (beforeDate == null && afterDate != null) { //assign all out of range 
to be before the start of the range
+      numBeforeRange = numberOutOfRange;
+    } else if (beforeDate != null && afterDate == null) { //assign all out of 
range to be after the start of the range
+      numAfterRange = numberOutOfRange;
+    } else if (beforeDate != null && afterDate != null) { //assign half before 
range and half after the range
+      numAfterRange = (numberOutOfRange / 2) + (numberOutOfRange % 2);
+      numBeforeRange = numberOutOfRange / 2;
+    }
+
+    Plus plus = createMockPlus(numBeforeRange, numAfterRange, 
numActivitiesInDateRange, afterDate, beforeDate);
+    BackOffStrategy strategy = new ConstantTimeBackOffStrategy(1);
+    BlockingQueue<StreamsDatum> datums = new LinkedBlockingQueue<>();
+    UserInfo userInfo = new UserInfo();
+    userInfo.setUserId("A");
+    userInfo.setAfterDate(afterDate);
+    userInfo.setBeforeDate(beforeDate);
+    GPlusUserActivityCollector collector = new 
GPlusUserActivityCollector(plus, datums, strategy, userInfo);
+    collector.run();
+
+    assertEquals(numActivitiesInDateRange, datums.size());
+    while (!datums.isEmpty()) {
+      StreamsDatum datum = datums.take();
+      assertNotNull(datum);
+      assertNotNull(datum.getDocument());
+      assertTrue(datum.getDocument() instanceof String);
+      assertTrue(((String)datum.getDocument()).contains(IN_RANGE_IDENTIFIER)); 
//only in range documents are on the out going queue.
+    }
+  }
+
+  private Plus createMockPlus(final int numBefore, final int numAfter, final 
int numInRange, final DateTime after, final DateTime before) {
+    Plus plus = mock(Plus.class);
+    final Plus.Activities activities = createMockPlusActivities(numBefore, 
numAfter, numInRange, after, before);
+    doAnswer(invocationOnMock -> activities).when(plus).activities();
+    return plus;
+  }
+
+  private Plus.Activities createMockPlusActivities(
+      final int numBefore,
+      final int numAfter,
+      final int numInRange,
+      final DateTime after,
+      final DateTime before) {
+    Plus.Activities activities = mock(Plus.Activities.class);
+    try {
+      Plus.Activities.List list = createMockPlusActivitiesList(numBefore, 
numAfter, numInRange, after, before);
+      when(activities.list(anyString(), anyString())).thenReturn(list);
+    } catch (IOException ioe) {
+      fail("Should not have thrown exception while creating mock. : " + 
ioe.getMessage());
+    }
+    return activities;
+  }
+
+  private Plus.Activities.List createMockPlusActivitiesList(
+      final int numBefore,
+      final int numAfter,
+      final int numInRange,
+      final DateTime after,
+      final DateTime before) {
+    Plus.Activities.List list = mock(Plus.Activities.List.class);
+    when(list.setMaxResults(anyLong())).thenReturn(list);
+    when(list.setPageToken(anyString())).thenReturn(list);
+    ActivityFeedAnswer answer = new ActivityFeedAnswer(numBefore, numAfter, 
numInRange, after, before);
+    try {
+      doAnswer(answer).when(list).execute();
+    } catch (IOException ioe) {
+      fail("Should not have thrown exception while creating mock. : " + 
ioe.getMessage());
+    }
+    return list;
+  }
+
+  private static class ActivityFeedAnswer implements Answer<ActivityFeed> {
+    private int afterCount = 0;
+    private int beforeCount = 0;
+    private int inCount = 0;
+    private int maxBatch = 100;
+
+    private int numAfter;
+    private int numInRange;
+    private int numBefore;
+    private DateTime after;
+    private DateTime before;
+
+    private ActivityFeedAnswer(int numBefore, int numAfter, int numInRange, 
DateTime after, DateTime before) {
+      this.numBefore = numBefore;
+      this.numAfter = numAfter;
+      this.numInRange = numInRange;
+      this.after = after;
+      this.before = before;
+    }
+
+    @Override
+    public ActivityFeed answer(InvocationOnMock invocationOnMock) throws 
Throwable {
+      int totalCount = 0;
+      int batchAfter = 0;
+      int batchBefore = 0;
+      int batchIn = 0;
+      if (afterCount != numAfter) {
+        if (numAfter - afterCount >= maxBatch) {
+          afterCount += maxBatch;
+          batchAfter += maxBatch;
+          totalCount += batchAfter;
+        } else {
+          batchAfter += numAfter - afterCount;
+          totalCount += numAfter - afterCount;
+          afterCount = numAfter;
+        }
+      }
+      if (totalCount < maxBatch && inCount != numInRange) {
+        if (numInRange - inCount >= maxBatch - totalCount) {
+          inCount += maxBatch - totalCount;
+          batchIn += maxBatch - totalCount;
+          totalCount += batchIn;
+        } else {
+          batchIn += numInRange - inCount;
+          totalCount += numInRange - inCount;
+          inCount = numInRange;
+        }
+      }
+      if (totalCount < maxBatch && beforeCount != numBefore) {
+        if (numBefore - batchBefore >= maxBatch - totalCount) {
+          batchBefore += maxBatch - totalCount;
+          totalCount = maxBatch;
+          beforeCount += batchBefore;
+        } else {
+          batchBefore += numBefore - beforeCount;
+          totalCount += numBefore - beforeCount;
+          beforeCount = numBefore;
+        }
+      }
+
+      return createMockActivityFeed(
+          batchBefore,
+          batchAfter,
+          batchIn,
+          after,
+          before,
+          numAfter != afterCount || inCount != numInRange || beforeCount != 
numBefore);
+    }
+  }
+
+
+}

Reply via email to