Repository: incubator-streams
Updated Branches:
  refs/heads/master e40e6287e -> 7810361d2


Replace Guava APIs with Java 8, this closes apache/incubator-streams#347


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/7810361d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/7810361d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/7810361d

Branch: refs/heads/master
Commit: 7810361d2e5a2f236967b3208053ec80b0e5885a
Parents: e40e628
Author: smarthi <smar...@apache.org>
Authored: Tue Jan 3 23:00:20 2017 -0500
Committer: smarthi <smar...@apache.org>
Committed: Tue Jan 3 23:00:20 2017 -0500

----------------------------------------------------------------------
 .../filters/test/VerbDefinitionFilterTest.java  | 23 ++++++++++----------
 .../amazon/kinesis/KinesisPersistReader.java    |  7 +++---
 .../amazon/kinesis/KinesisPersistWriter.java    |  4 ++--
 .../org/apache/streams/s3/S3PersistReader.java  |  7 +++---
 .../apache/streams/s3/S3PersistReaderTask.java  |  4 ++--
 .../test/TestMetadataFromDocumentProcessor.java |  4 ++--
 .../streams/hdfs/WebHdfsPersistReader.java      |  4 ++--
 .../streams/hdfs/WebHdfsPersistReaderTask.java  |  6 ++---
 .../streams/hdfs/WebHdfsPersistWriter.java      |  4 ++--
 .../regex/RegexHashtagExtractorTest.java        | 17 +++++++++------
 .../regex/RegexMentionExtractorTest.java        | 23 +++++++++++---------
 .../streams/regex/RegexUrlExtractorTest.java    | 21 ++++++++++--------
 .../provider/FacebookDataCollector.java         |  4 ++--
 .../provider/FacebookFriendFeedProvider.java    |  7 +++---
 .../provider/FacebookFriendUpdatesProvider.java | 14 ++++++------
 .../facebook/provider/FacebookProvider.java     | 10 ++++-----
 .../provider/FacebookUserstreamProvider.java    | 21 +++++++++---------
 .../streams/moreover/MoreoverProvider.java      |  3 +--
 .../streams/moreover/MoreoverProviderTask.java  |  4 +---
 .../sysomos/provider/SysomosProvider.java       | 15 +++++--------
 .../streams/sysomos/util/SysomosUtils.java      |  4 ++--
 .../test/providers/EmptyResultSetProvider.java  |  9 ++++----
 .../test/providers/NumericMessageProvider.java  |  7 +++---
 .../test/component/FileReaderProvider.java      |  4 ++--
 24 files changed, 114 insertions(+), 112 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7810361d/streams-components/streams-filters/src/test/java/org/apache/streams/filters/test/VerbDefinitionFilterTest.java
----------------------------------------------------------------------
diff --git 
a/streams-components/streams-filters/src/test/java/org/apache/streams/filters/test/VerbDefinitionFilterTest.java
 
b/streams-components/streams-filters/src/test/java/org/apache/streams/filters/test/VerbDefinitionFilterTest.java
index 09cb2e6..48b9204 100644
--- 
a/streams-components/streams-filters/src/test/java/org/apache/streams/filters/test/VerbDefinitionFilterTest.java
+++ 
b/streams-components/streams-filters/src/test/java/org/apache/streams/filters/test/VerbDefinitionFilterTest.java
@@ -26,10 +26,11 @@ import org.apache.streams.pojo.json.Activity;
 import org.apache.streams.verbs.VerbDefinition;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.collect.Sets;
 import org.junit.Test;
 
 import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 /**
  * Tests for {$link: org.apache.streams.verbs.VerbDefinitionResolver}
@@ -44,7 +45,7 @@ public class VerbDefinitionFilterTest {
     @Test
     public void testVerbMatchFilter() throws Exception {
         VerbDefinition definition = 
mapper.readValue(VerbDefinitionFilterTest.class.getResourceAsStream("/post.json"),
 VerbDefinition.class);
-        VerbDefinitionKeepFilter filter = new 
VerbDefinitionKeepFilter(Sets.newHashSet(definition));
+        VerbDefinitionKeepFilter filter = new 
VerbDefinitionKeepFilter(Stream.of(definition).collect(Collectors.toSet()));
         filter.prepare(null);
         StreamsDatum datum1 = new 
StreamsDatum(mapper.readValue("{\"id\":\"1\",\"verb\":\"notpost\"}\n", 
Activity.class));
         List<StreamsDatum> result1 = filter.process(datum1);
@@ -60,7 +61,7 @@ public class VerbDefinitionFilterTest {
     @Test
     public void testProviderFilter() throws Exception {
         VerbDefinition definition = 
mapper.readValue(VerbDefinitionFilterTest.class.getResourceAsStream("/provider.json"),
 VerbDefinition.class);
-        VerbDefinitionKeepFilter filter = new 
VerbDefinitionKeepFilter(Sets.newHashSet(definition));
+        VerbDefinitionKeepFilter filter = new 
VerbDefinitionKeepFilter(Stream.of(definition).collect(Collectors.toSet()));
         filter.prepare(null);
         StreamsDatum datum1 = new 
StreamsDatum(mapper.readValue("{\"id\":\"1\",\"verb\":\"post\",\"provider\":{\"id\":\"providerId\",\"objectType\":\"product\"}}\n",
 Activity.class));
         List<StreamsDatum> result1 = filter.process(datum1);
@@ -76,7 +77,7 @@ public class VerbDefinitionFilterTest {
     @Test
     public void testActorFilter() throws Exception {
         VerbDefinition definition = 
mapper.readValue(VerbDefinitionFilterTest.class.getResourceAsStream("/actor.json"),
 VerbDefinition.class);
-        VerbDefinitionKeepFilter filter = new 
VerbDefinitionKeepFilter(Sets.newHashSet(definition));
+        VerbDefinitionKeepFilter filter = new 
VerbDefinitionKeepFilter(Stream.of(definition).collect(Collectors.toSet()));
         filter.prepare(null);
         StreamsDatum datum1 = new 
StreamsDatum(mapper.readValue("{\"id\":\"1\",\"verb\":\"post\",\"actor\":{\"id\":\"actorId\",\"objectType\":\"page\"}}\n",
 Activity.class));
         List<StreamsDatum> result1 = filter.process(datum1);
@@ -93,7 +94,7 @@ public class VerbDefinitionFilterTest {
     @Test
     public void testObjectFilter() throws Exception {
         VerbDefinition definition = 
mapper.readValue(VerbDefinitionFilterTest.class.getResourceAsStream("/object.json"),
 VerbDefinition.class);
-        VerbDefinitionKeepFilter filter = new 
VerbDefinitionKeepFilter(Sets.newHashSet(definition));
+        VerbDefinitionKeepFilter filter = new 
VerbDefinitionKeepFilter(Stream.of(definition).collect(Collectors.toSet()));
         filter.prepare(null);
         StreamsDatum datum1 = new 
StreamsDatum(mapper.readValue("{\"id\":\"1\",\"verb\":\"post\",\"object\":{\"id\":\"objectId\"}}\n",
 Activity.class));
         List<StreamsDatum> result1 = filter.process(datum1);
@@ -109,7 +110,7 @@ public class VerbDefinitionFilterTest {
     @Test
     public void testMultiFilter() throws Exception {
         VerbDefinition definition = 
mapper.readValue(VerbDefinitionFilterTest.class.getResourceAsStream("/follow.json"),
 VerbDefinition.class);
-        VerbDefinitionKeepFilter filter = new 
VerbDefinitionKeepFilter(Sets.newHashSet(definition));
+        VerbDefinitionKeepFilter filter = new 
VerbDefinitionKeepFilter(Stream.of(definition).collect(Collectors.toSet()));
         filter.prepare(null);
         StreamsDatum datum1 = new 
StreamsDatum(mapper.readValue("{\"id\":\"1\",\"verb\":\"follow\",\"actor\":{\"id\":\"actorId\",\"objectType\":\"page\"}}}\n",
 Activity.class));
         List<StreamsDatum> result1 = filter.process(datum1);
@@ -129,7 +130,7 @@ public class VerbDefinitionFilterTest {
     @Test
     public void testTargetRequired() throws Exception {
         VerbDefinition definition = 
mapper.readValue(VerbDefinitionFilterTest.class.getResourceAsStream("/targetrequired.json"),
 VerbDefinition.class);
-        VerbDefinitionKeepFilter filter = new 
VerbDefinitionKeepFilter(Sets.newHashSet(definition));
+        VerbDefinitionKeepFilter filter = new 
VerbDefinitionKeepFilter(Stream.of(definition).collect(Collectors.toSet()));
         filter.prepare(null);
         StreamsDatum datum1 = new 
StreamsDatum(mapper.readValue("{\"id\":\"id\",\"verb\":\"post\",\"object\":{\"id\":\"objectId\",\"objectType\":\"task\"}}\n",
 Activity.class));
         List<StreamsDatum> result1 = filter.process(datum1);
@@ -145,7 +146,7 @@ public class VerbDefinitionFilterTest {
     @Test
     public void testAllWildcard() throws Exception {
         VerbDefinition definition = 
mapper.readValue(VerbDefinitionFilterTest.class.getResourceAsStream("/post.json"),
 VerbDefinition.class);
-        VerbDefinitionKeepFilter filter = new 
VerbDefinitionKeepFilter(Sets.newHashSet(definition));
+        VerbDefinitionKeepFilter filter = new 
VerbDefinitionKeepFilter(Stream.of(definition).collect(Collectors.toSet()));
         filter.prepare(null);
         StreamsDatum datum1 = new 
StreamsDatum(mapper.readValue("{\"id\":\"1\",\"verb\":\"notpost\"}\n", 
Activity.class));
         List<StreamsDatum> result1 = filter.process(datum1);
@@ -180,7 +181,7 @@ public class VerbDefinitionFilterTest {
         VerbDefinition object = 
mapper.readValue(VerbDefinitionFilterTest.class.getResourceAsStream("/object.json"),
 VerbDefinition.class);
         VerbDefinition target = 
mapper.readValue(VerbDefinitionFilterTest.class.getResourceAsStream("/targetrequired.json"),
 VerbDefinition.class);
         VerbDefinition follow = 
mapper.readValue(VerbDefinitionFilterTest.class.getResourceAsStream("/follow.json"),
 VerbDefinition.class);
-        VerbDefinitionKeepFilter filter = new 
VerbDefinitionKeepFilter(Sets.newHashSet(provider,actor,object,target,follow));
+        VerbDefinitionKeepFilter filter = new 
VerbDefinitionKeepFilter(Stream.of(provider,actor,object,target,follow).collect(Collectors.toSet()));
         filter.prepare(null);
         StreamsDatum datum1 = new 
StreamsDatum(mapper.readValue("{\"id\":\"1\",\"verb\":\"notpost\"}\n", 
Activity.class));
         List<StreamsDatum> result1 = filter.process(datum1);
@@ -214,7 +215,7 @@ public class VerbDefinitionFilterTest {
     @Test
     public void testVerbDropFilter() throws Exception {
         VerbDefinition definition = 
mapper.readValue(VerbDefinitionFilterTest.class.getResourceAsStream("/post.json"),
 VerbDefinition.class);
-        VerbDefinitionDropFilter filter = new 
VerbDefinitionDropFilter(Sets.newHashSet(definition));
+        VerbDefinitionDropFilter filter = new 
VerbDefinitionDropFilter(Stream.of(definition).collect(Collectors.toSet()));
         filter.prepare(null);
         StreamsDatum datum1 = new 
StreamsDatum(mapper.readValue("{\"id\":\"1\",\"verb\":\"notpost\"}\n", 
Activity.class));
         List<StreamsDatum> result1 = filter.process(datum1);
@@ -230,7 +231,7 @@ public class VerbDefinitionFilterTest {
     @Test
     public void testDropAllWildcard() throws Exception {
         VerbDefinition definition = 
mapper.readValue(VerbDefinitionFilterTest.class.getResourceAsStream("/post.json"),
 VerbDefinition.class);
-        VerbDefinitionDropFilter filter = new 
VerbDefinitionDropFilter(Sets.newHashSet(definition));
+        VerbDefinitionDropFilter filter = new 
VerbDefinitionDropFilter(Stream.of(definition).collect(Collectors.toSet()));
         filter.prepare(null);
         StreamsDatum datum1 = new 
StreamsDatum(mapper.readValue("{\"id\":\"1\",\"verb\":\"notpost\"}\n", 
Activity.class));
         List<StreamsDatum> result1 = filter.process(datum1);

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7810361d/streams-contrib/streams-amazon-aws/streams-persist-kinesis/src/main/java/org/apache/streams/amazon/kinesis/KinesisPersistReader.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-amazon-aws/streams-persist-kinesis/src/main/java/org/apache/streams/amazon/kinesis/KinesisPersistReader.java
 
b/streams-contrib/streams-amazon-aws/streams-persist-kinesis/src/main/java/org/apache/streams/amazon/kinesis/KinesisPersistReader.java
index 42cce8a..5b18a91 100644
--- 
a/streams-contrib/streams-amazon-aws/streams-persist-kinesis/src/main/java/org/apache/streams/amazon/kinesis/KinesisPersistReader.java
+++ 
b/streams-contrib/streams-amazon-aws/streams-persist-kinesis/src/main/java/org/apache/streams/amazon/kinesis/KinesisPersistReader.java
@@ -34,9 +34,8 @@ import com.amazonaws.services.kinesis.AmazonKinesisClient;
 import com.amazonaws.services.kinesis.model.DescribeStreamResult;
 import com.amazonaws.services.kinesis.model.Shard;
 import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.base.Strings;
-import com.google.common.collect.Queues;
 import com.typesafe.config.Config;
+import org.apache.commons.lang3.StringUtils;
 import org.joda.time.DateTime;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -130,7 +129,7 @@ public class KinesisPersistReader implements 
StreamsPersistReader, Serializable
 
     StreamsResultSet current;
     synchronized( KinesisPersistReader.class ) {
-      current = new 
StreamsResultSet(Queues.newConcurrentLinkedQueue(persistQueue));
+      current = new StreamsResultSet(new 
ConcurrentLinkedQueue<>(persistQueue));
       persistQueue.clear();
     }
     return current;
@@ -162,7 +161,7 @@ public class KinesisPersistReader implements 
StreamsPersistReader, Serializable
       
clientConfig.setProtocol(Protocol.valueOf(config.getProtocol().toString()));
 
       this.client = new AmazonKinesisClient(credentials, clientConfig);
-      if (!Strings.isNullOrEmpty(config.getRegion()))
+      if (StringUtils.isNotEmpty(config.getRegion()))
         
this.client.setRegion(Region.getRegion(Regions.fromName(config.getRegion())));
     }
     streamNames = this.config.getStreams();

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7810361d/streams-contrib/streams-amazon-aws/streams-persist-kinesis/src/main/java/org/apache/streams/amazon/kinesis/KinesisPersistWriter.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-amazon-aws/streams-persist-kinesis/src/main/java/org/apache/streams/amazon/kinesis/KinesisPersistWriter.java
 
b/streams-contrib/streams-amazon-aws/streams-persist-kinesis/src/main/java/org/apache/streams/amazon/kinesis/KinesisPersistWriter.java
index 6e2db0f..d528f15 100644
--- 
a/streams-contrib/streams-amazon-aws/streams-persist-kinesis/src/main/java/org/apache/streams/amazon/kinesis/KinesisPersistWriter.java
+++ 
b/streams-contrib/streams-amazon-aws/streams-persist-kinesis/src/main/java/org/apache/streams/amazon/kinesis/KinesisPersistWriter.java
@@ -35,8 +35,8 @@ import com.amazonaws.services.kinesis.AmazonKinesisClient;
 import com.amazonaws.services.kinesis.model.PutRecordRequest;
 import com.amazonaws.services.kinesis.model.PutRecordResult;
 import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.base.Strings;
 import com.typesafe.config.Config;
+import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -124,7 +124,7 @@ public class KinesisPersistWriter implements 
StreamsPersistWriter {
       
clientConfig.setProtocol(Protocol.valueOf(config.getProtocol().toString()));
 
       this.client = new AmazonKinesisClient(credentials, clientConfig);
-      if (!Strings.isNullOrEmpty(config.getRegion())) {
+      if (StringUtils.isNotEmpty(config.getRegion())) {
         
this.client.setRegion(Region.getRegion(Regions.fromName(config.getRegion())));
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7810361d/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReader.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReader.java
 
b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReader.java
index da1a00e..3a2cf3b 100644
--- 
a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReader.java
+++ 
b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReader.java
@@ -37,8 +37,8 @@ import com.amazonaws.services.s3.model.ListObjectsRequest;
 import com.amazonaws.services.s3.model.ObjectListing;
 import com.amazonaws.services.s3.model.S3ObjectSummary;
 import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.base.Strings;
 import com.google.common.collect.Queues;
+import org.apache.commons.lang3.StringUtils;
 import org.joda.time.DateTime;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -47,6 +47,7 @@ import java.math.BigInteger;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
@@ -132,7 +133,7 @@ public class S3PersistReader implements 
StreamsPersistReader, DatumStatusCountab
       clientOptions.setPathStyleAccess(false);
 
       this.amazonS3Client = new AmazonS3Client(credentials, clientConfig);
-      if ( !Strings.isNullOrEmpty(s3ReaderConfiguration.getRegion())) {
+      if (StringUtils.isNotEmpty(s3ReaderConfiguration.getRegion())) {
         
this.amazonS3Client.setRegion(Region.getRegion(Regions.fromName(s3ReaderConfiguration.getRegion())));
       }
       this.amazonS3Client.setS3ClientOptions(clientOptions);
@@ -206,7 +207,7 @@ public class S3PersistReader implements 
StreamsPersistReader, DatumStatusCountab
     StreamsResultSet current;
 
     synchronized ( S3PersistReader.class ) {
-      current = new 
StreamsResultSet(Queues.newConcurrentLinkedQueue(persistQueue));
+      current = new StreamsResultSet(new 
ConcurrentLinkedQueue<>(persistQueue));
       current.setCounter(new DatumStatusCounter());
       current.getCounter().add(countersCurrent);
       countersTotal.add(countersCurrent);

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7810361d/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReaderTask.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReaderTask.java
 
b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReaderTask.java
index 82bcba7..775e7e0 100644
--- 
a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReaderTask.java
+++ 
b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReaderTask.java
@@ -22,7 +22,7 @@ import org.apache.streams.core.DatumStatus;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.util.ComponentUtils;
 
-import com.google.common.base.Strings;
+import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -57,7 +57,7 @@ public class S3PersistReaderTask implements Runnable {
       String line;
       try {
         while ((line = bufferedReader.readLine()) != null) {
-          if ( !Strings.isNullOrEmpty(line) ) {
+          if (StringUtils.isNotEmpty(line) ) {
             reader.countersCurrent.incrementAttempt();
             StreamsDatum entry = reader.lineReaderUtil.processLine(line);
             ComponentUtils.offerUntilSuccess(entry, reader.persistQueue);

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7810361d/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/TestMetadataFromDocumentProcessor.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/TestMetadataFromDocumentProcessor.java
 
b/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/TestMetadataFromDocumentProcessor.java
index 8b45eb2..a0b483f 100644
--- 
a/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/TestMetadataFromDocumentProcessor.java
+++ 
b/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/TestMetadataFromDocumentProcessor.java
@@ -25,7 +25,6 @@ import org.apache.streams.pojo.json.Activity;
 import org.apache.streams.pojo.json.ActivityObject;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.collect.Sets;
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang.SerializationUtils;
 import org.junit.Before;
@@ -35,6 +34,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.InputStream;
 import java.nio.charset.StandardCharsets;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 
@@ -74,7 +74,7 @@ public class TestMetadataFromDocumentProcessor {
         .getResourceAsStream("activities");
     List<String> files = IOUtils.readLines(testActivityFolderStream, 
StandardCharsets.UTF_8);
 
-    Set<ActivityObject> objects = Sets.newHashSet();
+    Set<ActivityObject> objects = new HashSet<>();
 
     for( String file : files) {
       LOGGER.info("File: " + file );

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7810361d/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java
 
b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java
index 0036fea..a673d8f 100644
--- 
a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java
+++ 
b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java
@@ -53,6 +53,7 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
@@ -217,7 +218,6 @@ public class WebHdfsPersistReader implements 
StreamsPersistReader, DatumStatusCo
     }
     streamsConfiguration = StreamsConfigurator.detectConfiguration();
     persistQueue = Queues.synchronizedQueue(new 
LinkedBlockingQueue<StreamsDatum>(streamsConfiguration.getBatchSize().intValue()));
-    //persistQueue = Queues.synchronizedQueue(new ConcurrentLinkedQueue());
     executor = Executors.newSingleThreadExecutor();
     mapper = StreamsJacksonMapper.getInstance();
   }
@@ -252,7 +252,7 @@ public class WebHdfsPersistReader implements 
StreamsPersistReader, DatumStatusCo
     StreamsResultSet current;
 
     synchronized ( WebHdfsPersistReader.class ) {
-      current = new 
StreamsResultSet(Queues.newConcurrentLinkedQueue(persistQueue));
+      current = new StreamsResultSet(new 
ConcurrentLinkedQueue<>(persistQueue));
       current.setCounter(new DatumStatusCounter());
       current.getCounter().add(countersCurrent);
       countersTotal.add(countersCurrent);

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7810361d/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java
 
b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java
index d18bda9..fa287eb 100644
--- 
a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java
+++ 
b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java
@@ -21,8 +21,8 @@ package org.apache.streams.hdfs;
 import org.apache.streams.core.DatumStatus;
 import org.apache.streams.core.StreamsDatum;
 
-import com.google.common.base.Strings;
 import com.google.common.util.concurrent.Uninterruptibles;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.fs.FileStatus;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -82,7 +82,7 @@ public class WebHdfsPersistReaderTask implements Runnable {
         do {
           try {
             line = bufferedReader.readLine();
-            if ( !Strings.isNullOrEmpty(line) ) {
+            if (StringUtils.isNotEmpty(line)) {
               reader.countersCurrent.incrementAttempt();
               StreamsDatum entry = reader.lineReaderUtil.processLine(line);
               if ( entry != null ) {
@@ -98,7 +98,7 @@ public class WebHdfsPersistReaderTask implements Runnable {
             reader.countersCurrent.incrementStatus(DatumStatus.FAIL);
           }
         }
-        while ( !Strings.isNullOrEmpty(line) );
+        while (StringUtils.isNotEmpty(line));
         LOGGER.info("Finished Processing " + fileStatus.getPath().getName());
         try {
           bufferedReader.close();

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7810361d/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java
 
b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java
index 29a6b73..9079c7e 100644
--- 
a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java
+++ 
b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java
@@ -29,7 +29,7 @@ import org.apache.streams.core.StreamsPersistWriter;
 import org.apache.streams.jackson.StreamsJacksonMapper;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.base.Strings;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.FileSystem;
@@ -101,7 +101,7 @@ public class WebHdfsPersistWriter implements 
StreamsPersistWriter, Flushable, Cl
     StringBuilder uriBuilder = new StringBuilder();
     uriBuilder.append(hdfsConfiguration.getScheme());
     uriBuilder.append("://");
-    if ( !Strings.isNullOrEmpty(hdfsConfiguration.getHost())) {
+    if (StringUtils.isNotEmpty(hdfsConfiguration.getHost())) {
       uriBuilder.append(hdfsConfiguration.getHost() + ":" + 
hdfsConfiguration.getPort());
     } else {
       uriBuilder.append("/");

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7810361d/streams-contrib/streams-processor-regex/src/test/java/org/apache/streams/regex/RegexHashtagExtractorTest.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-processor-regex/src/test/java/org/apache/streams/regex/RegexHashtagExtractorTest.java
 
b/streams-contrib/streams-processor-regex/src/test/java/org/apache/streams/regex/RegexHashtagExtractorTest.java
index 1912ff0..b17df37 100644
--- 
a/streams-contrib/streams-processor-regex/src/test/java/org/apache/streams/regex/RegexHashtagExtractorTest.java
+++ 
b/streams-contrib/streams-processor-regex/src/test/java/org/apache/streams/regex/RegexHashtagExtractorTest.java
@@ -23,15 +23,17 @@ import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.pojo.extensions.ExtensionUtil;
 import org.apache.streams.pojo.json.Activity;
 
-import com.google.common.collect.Sets;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.is;
@@ -52,11 +54,12 @@ public class RegexHashtagExtractorTest {
     @Parameterized.Parameters
     public static Collection<Object[]> params() {
         return Arrays.asList(new Object[][]{
-                {"This is the #content of a standard tweet", 
Sets.newHashSet("content")},
-                {"This is the content of a standard tweet", Sets.newHashSet()},
-                {"This is the #content of a standard #tweet", 
Sets.newHashSet("content", "tweet")},
-                {"UNIX 时间1400000000 秒…… 
(该睡觉了,各位夜猫子)#程序员#", Sets.newHashSet("程序员")},
-                {"This is the body of a #fbpost. It can have multiple lines of 
#content, as well as much more detailed and flowery #language.", 
Sets.newHashSet("content", "fbpost", "language")}
+                {"This is the #content of a standard tweet", 
Stream.of("content").collect(Collectors.toSet())},
+                {"This is the content of a standard tweet", new HashSet<>()},
+                {"This is the #content of a standard #tweet", 
Stream.of("content", "tweet").collect(Collectors.toSet())},
+                {"UNIX 时间1400000000 秒…… 
(该睡觉了,各位夜猫子)#程序员#", 
Stream.of("程序员").collect(Collectors.toSet())},
+                {"This is the body of a #fbpost. It can have multiple lines of 
#content, as well as much more detailed and flowery #language.",
+                    Stream.of("content", "fbpost", 
"language").collect(Collectors.toSet())}
         });
     }
 
@@ -68,7 +71,7 @@ public class RegexHashtagExtractorTest {
         assertThat(result.size(), is(equalTo(1)));
         Activity output = (Activity)result.get(0).getDocument();
         Set<String> extracted = (Set) 
ExtensionUtil.getInstance().ensureExtensions(output).get(RegexHashtagExtractor.EXTENSION_KEY);
-        Sets.SetView<String> diff = Sets.difference(extracted, hashtags);
+        Set<String> diff = extracted.stream().filter((x) -> 
!hashtags.contains(x)).collect(Collectors.toSet());
         assertThat(diff.size(), is(equalTo(0)));
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7810361d/streams-contrib/streams-processor-regex/src/test/java/org/apache/streams/regex/RegexMentionExtractorTest.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-processor-regex/src/test/java/org/apache/streams/regex/RegexMentionExtractorTest.java
 
b/streams-contrib/streams-processor-regex/src/test/java/org/apache/streams/regex/RegexMentionExtractorTest.java
index bb0e95d..3706b15 100644
--- 
a/streams-contrib/streams-processor-regex/src/test/java/org/apache/streams/regex/RegexMentionExtractorTest.java
+++ 
b/streams-contrib/streams-processor-regex/src/test/java/org/apache/streams/regex/RegexMentionExtractorTest.java
@@ -34,6 +34,8 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.is;
@@ -54,23 +56,24 @@ public class RegexMentionExtractorTest {
     @Parameterized.Parameters
     public static Collection<Object[]> params() {
         return Arrays.asList(new Object[][]{
-                {"This is the @content of a standard tweet", 
Sets.newHashSet(new HashMap<String, Object>() {{
-                    put("displayName", "content");
-                }})},
-                {"This is the content of a standard tweet", 
Sets.newHashSet(new HashMap<String, Object>())},
-                {"This is the @content of a standard @tweet",  
Sets.newHashSet(new HashMap<String, Object>() {{
+                {"This is the @content of a standard tweet", Stream.of(new 
HashMap<String, Object>() {{
+                  put("displayName", "content");
+                }}).collect(Collectors.toSet())},
+                {"This is the content of a standard tweet", Stream.of(new 
HashMap<String, Object>()).collect(Collectors.toSet())},
+                {"This is the @content of a standard @tweet",  Stream.of(new 
HashMap<String, Object>() {{
                     put("displayName", "content");
                 }},new HashMap<String, Object>() {{
                     put("displayName", "tweet");
-                }})},
-                {"UNIX 时间1400000000 秒…… 
(该睡觉了,各位夜猫子)@程序员#", Sets.newHashSet(new 
HashMap<String, Object>() {{
+                }}).collect(Collectors.toSet())},
+                {"UNIX 时间1400000000 秒…… 
(该睡觉了,各位夜猫子)@程序员#", Stream.of(new HashMap<String, 
Object>() {{
                     put("displayName", "程序员");
-                }})},
-                {"This is the body of a @fbpost. It can have multiple lines of 
#content, as well as much more detailed and flowery @language.",  
Sets.newHashSet(new HashMap<String, Object>() {{
+                }}).collect(Collectors.toSet())},
+                {"This is the body of a @fbpost. It can have multiple lines of 
#content, as well as much more detailed and flowery @language.",
+                    Stream.of(new HashMap<String, Object>() {{
                     put("displayName", "fbpost");
                 }},new HashMap<String, Object>() {{
                     put("displayName", "language");
-                }})}
+                }}).collect(Collectors.toSet())}
         });
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7810361d/streams-contrib/streams-processor-regex/src/test/java/org/apache/streams/regex/RegexUrlExtractorTest.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-processor-regex/src/test/java/org/apache/streams/regex/RegexUrlExtractorTest.java
 
b/streams-contrib/streams-processor-regex/src/test/java/org/apache/streams/regex/RegexUrlExtractorTest.java
index 64e8599..7c7948b 100644
--- 
a/streams-contrib/streams-processor-regex/src/test/java/org/apache/streams/regex/RegexUrlExtractorTest.java
+++ 
b/streams-contrib/streams-processor-regex/src/test/java/org/apache/streams/regex/RegexUrlExtractorTest.java
@@ -22,15 +22,17 @@ package org.apache.streams.regex;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.pojo.json.Activity;
 
-import com.google.common.collect.Sets;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.is;
@@ -51,12 +53,13 @@ public class RegexUrlExtractorTest {
     @Parameterized.Parameters
     public static Collection<Object[]> params() {
         return Arrays.asList(new Object[][]{
-                {"This is the http://t.co/foo of a standard tweet", 
Sets.newHashSet("http://t.co/foo";)},
-                {"This is the https://t.co/foo of a standard tweet", 
Sets.newHashSet("https://t.co/foo";)},
-                {"This is the https://t.co/foo of a standard tweet 
https://t.co/foo";, Sets.newHashSet("https://t.co/foo";)},
-                {"This is the http://amd.com/test of a standard tweet", 
Sets.newHashSet("http://amd.com/test";)},
-                {"This is the content of a standard tweet", Sets.newHashSet()},
-                {"This is the 
http://www.google.com/articles/awesome?with=query&params=true of a standard 
@tweet",  
Sets.newHashSet("http://www.google.com/articles/awesome?with=query&params=true";)}
+                {"This is the http://t.co/foo of a standard tweet", 
Stream.of("http://t.co/foo";).collect(Collectors.toSet())},
+                {"This is the https://t.co/foo of a standard tweet", 
Stream.of("https://t.co/foo";).collect(Collectors.toSet())},
+                {"This is the https://t.co/foo of a standard tweet 
https://t.co/foo";, Stream.of("https://t.co/foo";).collect(Collectors.toSet())},
+                {"This is the http://amd.com/test of a standard tweet", 
Stream.of("http://amd.com/test";).collect(Collectors.toSet())},
+                {"This is the content of a standard tweet", new HashSet<>()},
+                {"This is the 
http://www.google.com/articles/awesome?with=query&params=true of a standard 
@tweet",
+                    
Stream.of("http://www.google.com/articles/awesome?with=query&params=true";).collect(Collectors.toSet())}
         });
     }
 
@@ -67,8 +70,8 @@ public class RegexUrlExtractorTest {
         List<StreamsDatum> result = new RegexUrlExtractor().process(datum);
         assertThat(result.size(), is(equalTo(1)));
         Activity output = (Activity)result.get(0).getDocument();
-        Set<String> extracted = Sets.newHashSet(output.getLinks());
-        Sets.SetView<String> diff = Sets.difference(links, extracted);
+        Set<String> extracted = new HashSet<>(output.getLinks());
+        Set<String> diff = links.stream().filter((x) -> 
!extracted.contains(x)).collect(Collectors.toSet());
         assertThat(diff.size(), is(equalTo(0)));
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7810361d/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookDataCollector.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookDataCollector.java
 
b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookDataCollector.java
index 52ec222..0a13b64 100644
--- 
a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookDataCollector.java
+++ 
b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookDataCollector.java
@@ -27,10 +27,10 @@ import 
org.apache.streams.util.oauth.tokens.tokenmanager.SimpleTokenManager;
 import 
org.apache.streams.util.oauth.tokens.tokenmanager.impl.BasicTokenManager;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Strings;
 import facebook4j.Facebook;
 import facebook4j.FacebookFactory;
 import facebook4j.conf.ConfigurationBuilder;
+import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -112,7 +112,7 @@ public abstract class FacebookDataCollector implements 
Runnable {
       LOGGER.debug("appAccessToken : {}", 
this.config.getOauth().getAppAccessToken());
     }
     cb.setJSONStoreEnabled(true);
-    if (!Strings.isNullOrEmpty(config.getVersion())) {
+    if (StringUtils.isNotEmpty(config.getVersion())) {
       cb.setRestBaseURL("https://graph.facebook.com/"; + config.getVersion() + 
"/");
     }
     LOGGER.debug("appId : {}", this.config.getOauth().getAppId());

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7810361d/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookFriendFeedProvider.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookFriendFeedProvider.java
 
b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookFriendFeedProvider.java
index 14d5a64..e256418 100644
--- 
a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookFriendFeedProvider.java
+++ 
b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookFriendFeedProvider.java
@@ -29,8 +29,6 @@ import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.apache.streams.util.ComponentUtils;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.collect.Queues;
-import com.google.common.util.concurrent.MoreExecutors;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigRenderOptions;
 import facebook4j.Facebook;
@@ -54,6 +52,7 @@ import java.util.Iterator;
 import java.util.Objects;
 import java.util.Queue;
 import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
@@ -166,7 +165,7 @@ public class FacebookFriendFeedProvider implements 
StreamsProvider, Serializable
     StreamsResultSet current;
 
     synchronized (FacebookUserstreamProvider.class) {
-      current = new 
StreamsResultSet(Queues.newConcurrentLinkedQueue(providerQueue));
+      current = new StreamsResultSet(new 
ConcurrentLinkedQueue<>(providerQueue));
       current.setCounter(new DatumStatusCounter());
       current.getCounter().add(countersCurrent);
       countersTotal.add(countersCurrent);
@@ -220,7 +219,7 @@ public class FacebookFriendFeedProvider implements 
StreamsProvider, Serializable
   @Override
   public void prepare(Object configurationObject) {
 
-    executor = 
MoreExecutors.listeningDecorator(newFixedThreadPoolWithQueueSize(5, 20));
+    executor = newFixedThreadPoolWithQueueSize(5, 20);
 
     Objects.requireNonNull(providerQueue);
     Objects.requireNonNull(this.klass);

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7810361d/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookFriendUpdatesProvider.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookFriendUpdatesProvider.java
 
b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookFriendUpdatesProvider.java
index c973863..39b7771 100644
--- 
a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookFriendUpdatesProvider.java
+++ 
b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookFriendUpdatesProvider.java
@@ -30,8 +30,6 @@ import org.apache.streams.util.ComponentUtils;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Preconditions;
-import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.MoreExecutors;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigRenderOptions;
 import facebook4j.Facebook;
@@ -51,6 +49,7 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.io.Serializable;
 import java.math.BigInteger;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Objects;
 import java.util.Queue;
@@ -63,6 +62,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
 
 /**
  * FacebookFriendUpdatesProvider provides updates from friend feed.
@@ -237,7 +237,7 @@ public class FacebookFriendUpdatesProvider implements 
StreamsProvider, Serializa
   @Override
   public void prepare(Object configurationObject) {
 
-    executor = 
MoreExecutors.listeningDecorator(newFixedThreadPoolWithQueueSize(5, 20));
+    executor = newFixedThreadPoolWithQueueSize(5, 20);
 
     Objects.requireNonNull(providerQueue);
     Objects.requireNonNull(this.klass);
@@ -289,7 +289,7 @@ public class FacebookFriendUpdatesProvider implements 
StreamsProvider, Serializa
     FacebookUserstreamProvider provider;
     Facebook client;
 
-    private Set<Post> priorPollResult = Sets.newHashSet();
+    private Set<Post> priorPollResult = new HashSet<>();
 
     public FacebookFeedPollingTask(FacebookUserstreamProvider 
facebookUserstreamProvider) {
       provider = facebookUserstreamProvider;
@@ -301,9 +301,9 @@ public class FacebookFriendUpdatesProvider implements 
StreamsProvider, Serializa
       while (provider.isRunning()) {
         try {
           ResponseList<Post> postResponseList = client.getHome();
-          Set<Post> update = Sets.newHashSet(postResponseList);
-          Set<Post> repeats = Sets.intersection(priorPollResult, 
Sets.newHashSet(update));
-          Set<Post> entrySet = Sets.difference(update, repeats);
+          Set<Post> update = new HashSet<>(postResponseList);
+          Set<Post> repeats = 
priorPollResult.stream().filter(update::contains).collect(Collectors.toSet());
+          Set<Post> entrySet = update.stream().filter((x) -> 
!repeats.contains(x)).collect(Collectors.toSet());
           for (Post item : entrySet) {
             String json = DataObjectFactory.getRawJSON(item);
             org.apache.streams.facebook.Post post = mapper.readValue(json, 
org.apache.streams.facebook.Post.class);

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7810361d/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookProvider.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookProvider.java
 
b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookProvider.java
index a665023..70e0a65 100644
--- 
a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookProvider.java
+++ 
b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookProvider.java
@@ -29,8 +29,6 @@ import org.apache.streams.util.ComponentUtils;
 import org.apache.streams.util.SerializationUtil;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.collect.Queues;
-import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
@@ -43,11 +41,13 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.math.BigInteger;
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
@@ -105,7 +105,7 @@ public abstract class FacebookProvider implements 
StreamsProvider {
   @Override
   public StreamsResultSet readCurrent() {
     int batchSize = 0;
-    BlockingQueue<StreamsDatum> batch = Queues.newLinkedBlockingQueue();
+    BlockingQueue<StreamsDatum> batch = new LinkedBlockingQueue<>();
     while (!this.datums.isEmpty() && batchSize < MAX_BATCH_SIZE) {
       
ComponentUtils.offerUntilSuccess(ComponentUtils.pollWhileNotEmpty(this.datums), 
batch);
       ++batchSize;
@@ -125,7 +125,7 @@ public abstract class FacebookProvider implements 
StreamsProvider {
 
   @Override
   public void prepare(Object configurationObject) {
-    this.datums = Queues.newLinkedBlockingQueue();
+    this.datums = new LinkedBlockingQueue<>();
     this.isComplete = new AtomicBoolean(false);
     this.executor = 
MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1));
   }
@@ -141,7 +141,7 @@ public abstract class FacebookProvider implements 
StreamsProvider {
    * @param idsToAfterDate idsToAfterDate
    */
   public void overrideIds(Map<String, DateTime> idsToAfterDate) {
-    Set<IdConfig> ids = Sets.newHashSet();
+    Set<IdConfig> ids = new HashSet<>();
     for (String id : idsToAfterDate.keySet()) {
       IdConfig idConfig = new IdConfig();
       idConfig.setId(id);

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7810361d/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookUserstreamProvider.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookUserstreamProvider.java
 
b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookUserstreamProvider.java
index d198e43..c502ada 100644
--- 
a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookUserstreamProvider.java
+++ 
b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookUserstreamProvider.java
@@ -29,10 +29,6 @@ import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.apache.streams.util.ComponentUtils;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.collect.Queues;
-import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigRenderOptions;
 import facebook4j.Facebook;
@@ -51,11 +47,13 @@ import java.io.IOException;
 import java.io.Serializable;
 import java.math.BigInteger;
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Objects;
 import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
@@ -63,6 +61,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
 
 public class FacebookUserstreamProvider implements StreamsProvider, 
Serializable {
 
@@ -88,7 +87,7 @@ public class FacebookUserstreamProvider implements 
StreamsProvider, Serializable
     this.configuration = config;
   }
 
-  protected ListeningExecutorService executor;
+  protected ExecutorService executor;
 
   protected DateTime start;
   protected DateTime end;
@@ -187,7 +186,7 @@ public class FacebookUserstreamProvider implements 
StreamsProvider, Serializable
     StreamsResultSet current;
 
     synchronized (FacebookUserstreamProvider.class) {
-      current = new 
StreamsResultSet(Queues.newConcurrentLinkedQueue(providerQueue));
+      current = new StreamsResultSet(new 
ConcurrentLinkedQueue<>(providerQueue));
       current.setCounter(new DatumStatusCounter());
       current.getCounter().add(countersCurrent);
       countersTotal.add(countersCurrent);
@@ -241,7 +240,7 @@ public class FacebookUserstreamProvider implements 
StreamsProvider, Serializable
   @Override
   public void prepare(Object configurationObject) {
 
-    executor = 
MoreExecutors.listeningDecorator(newFixedThreadPoolWithQueueSize(5, 20));
+    executor = newFixedThreadPoolWithQueueSize(5, 20);
 
     Objects.requireNonNull(providerQueue);
     Objects.requireNonNull(this.klass);
@@ -299,7 +298,7 @@ public class FacebookUserstreamProvider implements 
StreamsProvider, Serializable
     Facebook client;
     String id;
 
-    private Set<Post> priorPollResult = Sets.newHashSet();
+    private Set<Post> priorPollResult = new HashSet<>();
 
     public FacebookFeedPollingTask(FacebookUserstreamProvider 
facebookUserstreamProvider) {
       this.provider = facebookUserstreamProvider;
@@ -318,9 +317,9 @@ public class FacebookUserstreamProvider implements 
StreamsProvider, Serializable
         try {
           postResponseList = client.getFeed(id);
 
-          Set<Post> update = Sets.newHashSet(postResponseList);
-          Set<Post> repeats = Sets.intersection(priorPollResult, 
Sets.newHashSet(update));
-          Set<Post> entrySet = Sets.difference(update, repeats);
+          Set<Post> update = new HashSet<>(postResponseList);
+          Set<Post> repeats = 
priorPollResult.stream().filter(update::contains).collect(Collectors.toSet());
+          Set<Post> entrySet = update.stream().filter((x) -> 
!repeats.contains(x)).collect(Collectors.toSet());
           LOGGER.debug(this.id + " response: " + update.size() + " previous: " 
+ repeats.size() + " new: " + entrySet.size());
           for (Post item : entrySet) {
             String json = DataObjectFactory.getRawJSON(item);

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7810361d/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverProvider.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverProvider.java
 
b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverProvider.java
index f7b9d88..a295898 100644
--- 
a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverProvider.java
+++ 
b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverProvider.java
@@ -30,7 +30,6 @@ import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Iterators;
-import com.google.common.collect.Queues;
 import com.google.common.util.concurrent.Uninterruptibles;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
@@ -106,7 +105,7 @@ public class MoreoverProvider implements StreamsProvider {
     Collection<StreamsDatum> currentIterator = new ArrayList<>();
     Iterators.addAll(currentIterator, providerQueue.iterator());
 
-    StreamsResultSet current = new 
StreamsResultSet(Queues.newConcurrentLinkedQueue(currentIterator));
+    StreamsResultSet current = new StreamsResultSet(new 
ConcurrentLinkedQueue<>(currentIterator));
 
     providerQueue.clear();
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7810361d/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverProviderTask.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverProviderTask.java
 
b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverProviderTask.java
index 88aec81..595fb4f 100644
--- 
a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverProviderTask.java
+++ 
b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverProviderTask.java
@@ -37,7 +37,6 @@ public class MoreoverProviderTask implements Runnable {
 
   private String lastSequence;
   private final String apiKey;
-  private final String apiId;
   private final Queue<StreamsDatum> results;
   private final MoreoverClient moClient;
   private boolean started = false;
@@ -51,11 +50,10 @@ public class MoreoverProviderTask implements Runnable {
    */
   public MoreoverProviderTask(String apiId, String apiKey, Queue<StreamsDatum> 
results, String lastSequence) {
     //logger.info("Constructed new task {} for {} {} {}", 
UUID.randomUUID().toString(), apiId, apiKey, lastSequence);
-    this.apiId = apiId;
     this.apiKey = apiKey;
     this.results = results;
     this.lastSequence = lastSequence;
-    this.moClient = new MoreoverClient(this.apiId, this.apiKey, 
this.lastSequence);
+    this.moClient = new MoreoverClient(apiId, this.apiKey, this.lastSequence);
     initializeClient(moClient);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7810361d/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosProvider.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosProvider.java
 
b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosProvider.java
index ec1f317..fffe7a1 100644
--- 
a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosProvider.java
+++ 
b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosProvider.java
@@ -32,8 +32,6 @@ import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Splitter;
-import com.google.common.collect.Queues;
-import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.Uninterruptibles;
 import com.sysomos.SysomosConfiguration;
 import com.typesafe.config.Config;
@@ -49,10 +47,11 @@ import java.io.File;
 import java.io.FileOutputStream;
 import java.io.PrintStream;
 import java.math.BigInteger;
-import java.util.Iterator;
+import java.util.HashSet;
 import java.util.Map;
 import java.util.Queue;
 import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -76,7 +75,7 @@ public class SysomosProvider implements StreamsProvider {
 
   public static final String STREAMS_ID = "SysomosProvider";
 
-  public static enum Mode { CONTINUOUS, BACKFILL_AND_TERMINATE }
+  public enum Mode { CONTINUOUS, BACKFILL_AND_TERMINATE }
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(SysomosProvider.class);
 
@@ -91,7 +90,7 @@ public class SysomosProvider implements StreamsProvider {
   protected volatile Queue<StreamsDatum> providerQueue;
 
   private final ReadWriteLock lock = new ReentrantReadWriteLock();
-  private final Set<String> completedHeartbeats = Sets.newHashSet();
+  private final Set<String> completedHeartbeats = new HashSet<>();
   private final long maxQueued;
   private final long minLatency;
   private final long scheduledLatency;
@@ -332,7 +331,7 @@ public class SysomosProvider implements StreamsProvider {
   }
 
   private Queue<StreamsDatum> constructQueue() {
-    return Queues.newConcurrentLinkedQueue();
+    return new ConcurrentLinkedQueue<>();
   }
 
   public int getCount() {
@@ -379,9 +378,7 @@ public class SysomosProvider implements StreamsProvider {
     provider.startStream();
     do {
       
Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getBatchFrequencyMs(),
 TimeUnit.MILLISECONDS);
-      Iterator<StreamsDatum> iterator = provider.readCurrent().iterator();
-      while (iterator.hasNext()) {
-        StreamsDatum datum = iterator.next();
+      for (StreamsDatum datum : provider.readCurrent()) {
         String json;
         try {
           json = mapper.writeValueAsString(datum.getDocument());

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7810361d/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/util/SysomosUtils.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/util/SysomosUtils.java
 
b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/util/SysomosUtils.java
index 82d538d..0acc81f 100644
--- 
a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/util/SysomosUtils.java
+++ 
b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/util/SysomosUtils.java
@@ -21,8 +21,8 @@ package org.apache.streams.sysomos.util;
 
 import org.apache.streams.sysomos.SysomosException;
 
-import com.google.common.base.Strings;
 import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.joda.time.format.DateTimeFormat;
 import org.joda.time.format.DateTimeFormatter;
 import org.slf4j.Logger;
@@ -66,7 +66,7 @@ public class SysomosUtils {
       writer.flush();
 
       String xmlResponse = writer.toString();
-      if (Strings.isNullOrEmpty(xmlResponse)) {
+      if (StringUtils.isEmpty(xmlResponse)) {
         throw new SysomosException("XML Response from Sysomos was empty : "
             + xmlResponse
             + "\n"

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7810361d/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/providers/EmptyResultSetProvider.java
----------------------------------------------------------------------
diff --git 
a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/providers/EmptyResultSetProvider.java
 
b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/providers/EmptyResultSetProvider.java
index 571c0fc..b85ad4a 100644
--- 
a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/providers/EmptyResultSetProvider.java
+++ 
b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/providers/EmptyResultSetProvider.java
@@ -19,14 +19,13 @@
 
 package org.apache.streams.local.test.providers;
 
-import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsProvider;
 import org.apache.streams.core.StreamsResultSet;
 
-import com.google.common.collect.Queues;
 import org.joda.time.DateTime;
 
 import java.math.BigInteger;
+import java.util.concurrent.LinkedBlockingQueue;
 
 /**
  * Provides new, empty instances of result set.
@@ -45,17 +44,17 @@ public class EmptyResultSetProvider implements 
StreamsProvider {
 
   @Override
   public StreamsResultSet readCurrent() {
-    return new StreamsResultSet(Queues.<StreamsDatum>newLinkedBlockingQueue());
+    return new StreamsResultSet(new LinkedBlockingQueue<>());
   }
 
   @Override
   public StreamsResultSet readNew(BigInteger sequence) {
-    return new StreamsResultSet(Queues.<StreamsDatum>newLinkedBlockingQueue());
+    return new StreamsResultSet(new LinkedBlockingQueue<>());
   }
 
   @Override
   public StreamsResultSet readRange(DateTime start, DateTime end) {
-    return new StreamsResultSet(Queues.<StreamsDatum>newLinkedBlockingQueue());
+    return new StreamsResultSet(new LinkedBlockingQueue<>());
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7810361d/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/providers/NumericMessageProvider.java
----------------------------------------------------------------------
diff --git 
a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/providers/NumericMessageProvider.java
 
b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/providers/NumericMessageProvider.java
index 88494a8..21f37ba 100644
--- 
a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/providers/NumericMessageProvider.java
+++ 
b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/providers/NumericMessageProvider.java
@@ -22,12 +22,13 @@ import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsProvider;
 import org.apache.streams.core.StreamsResultSet;
 
-import com.google.common.collect.Queues;
 import org.joda.time.DateTime;
 
 import java.math.BigInteger;
 import java.util.Queue;
+import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
 
 /**
  * Test StreamsProvider that sends out StreamsDatums numbered from 0 to 
numMessages.
@@ -57,7 +58,7 @@ public class NumericMessageProvider implements 
StreamsProvider {
   @Override
   public StreamsResultSet readCurrent() {
     int batchSize = 0;
-    Queue<StreamsDatum> batch = Queues.newLinkedBlockingQueue();
+    Queue<StreamsDatum> batch = new LinkedBlockingQueue<>();
     try {
       while (!this.data.isEmpty() && batchSize < DEFAULT_BATCH_SIZE) {
         batch.add(this.data.take());
@@ -97,7 +98,7 @@ public class NumericMessageProvider implements 
StreamsProvider {
   }
 
   private BlockingQueue<StreamsDatum> constructQueue() {
-    BlockingQueue<StreamsDatum> datums = 
Queues.newArrayBlockingQueue(numMessages);
+    BlockingQueue<StreamsDatum> datums = new ArrayBlockingQueue<>(numMessages);
     for(int i=0;i<numMessages;i++) {
       datums.add(new StreamsDatum(i));
     }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7810361d/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/test/component/FileReaderProvider.java
----------------------------------------------------------------------
diff --git 
a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/test/component/FileReaderProvider.java
 
b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/test/component/FileReaderProvider.java
index 0fbfae9..632d079 100644
--- 
a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/test/component/FileReaderProvider.java
+++ 
b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/test/component/FileReaderProvider.java
@@ -22,12 +22,12 @@ import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsProvider;
 import org.apache.streams.core.StreamsResultSet;
 
-import com.google.common.collect.Queues;
 import org.joda.time.DateTime;
 
 import java.math.BigInteger;
 import java.util.Queue;
 import java.util.Scanner;
+import java.util.concurrent.LinkedBlockingQueue;
 
 /**
  * FOR TESTING PURPOSES ONLY.
@@ -91,7 +91,7 @@ public class FileReaderProvider implements StreamsProvider {
   }
 
   private Queue<StreamsDatum> constructQueue(Scanner scanner) {
-    Queue<StreamsDatum> data = Queues.newLinkedBlockingQueue();
+    Queue<StreamsDatum> data = new LinkedBlockingQueue<>();
     while(scanner.hasNextLine()) {
       data.add(converter.convert(scanner.nextLine()));
     }

Reply via email to