Repository: incubator-streams Updated Branches: refs/heads/master e40e6287e -> 7810361d2
Replace Guava APIs with Java 8, this closes apache/incubator-streams#347 Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/7810361d Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/7810361d Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/7810361d Branch: refs/heads/master Commit: 7810361d2e5a2f236967b3208053ec80b0e5885a Parents: e40e628 Author: smarthi <smar...@apache.org> Authored: Tue Jan 3 23:00:20 2017 -0500 Committer: smarthi <smar...@apache.org> Committed: Tue Jan 3 23:00:20 2017 -0500 ---------------------------------------------------------------------- .../filters/test/VerbDefinitionFilterTest.java | 23 ++++++++++---------- .../amazon/kinesis/KinesisPersistReader.java | 7 +++--- .../amazon/kinesis/KinesisPersistWriter.java | 4 ++-- .../org/apache/streams/s3/S3PersistReader.java | 7 +++--- .../apache/streams/s3/S3PersistReaderTask.java | 4 ++-- .../test/TestMetadataFromDocumentProcessor.java | 4 ++-- .../streams/hdfs/WebHdfsPersistReader.java | 4 ++-- .../streams/hdfs/WebHdfsPersistReaderTask.java | 6 ++--- .../streams/hdfs/WebHdfsPersistWriter.java | 4 ++-- .../regex/RegexHashtagExtractorTest.java | 17 +++++++++------ .../regex/RegexMentionExtractorTest.java | 23 +++++++++++--------- .../streams/regex/RegexUrlExtractorTest.java | 21 ++++++++++-------- .../provider/FacebookDataCollector.java | 4 ++-- .../provider/FacebookFriendFeedProvider.java | 7 +++--- .../provider/FacebookFriendUpdatesProvider.java | 14 ++++++------ .../facebook/provider/FacebookProvider.java | 10 ++++----- .../provider/FacebookUserstreamProvider.java | 21 +++++++++--------- .../streams/moreover/MoreoverProvider.java | 3 +-- .../streams/moreover/MoreoverProviderTask.java | 4 +--- .../sysomos/provider/SysomosProvider.java | 15 +++++-------- .../streams/sysomos/util/SysomosUtils.java | 4 ++-- .../test/providers/EmptyResultSetProvider.java | 9 ++++---- 
.../test/providers/NumericMessageProvider.java | 7 +++--- .../test/component/FileReaderProvider.java | 4 ++-- 24 files changed, 114 insertions(+), 112 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7810361d/streams-components/streams-filters/src/test/java/org/apache/streams/filters/test/VerbDefinitionFilterTest.java ---------------------------------------------------------------------- diff --git a/streams-components/streams-filters/src/test/java/org/apache/streams/filters/test/VerbDefinitionFilterTest.java b/streams-components/streams-filters/src/test/java/org/apache/streams/filters/test/VerbDefinitionFilterTest.java index 09cb2e6..48b9204 100644 --- a/streams-components/streams-filters/src/test/java/org/apache/streams/filters/test/VerbDefinitionFilterTest.java +++ b/streams-components/streams-filters/src/test/java/org/apache/streams/filters/test/VerbDefinitionFilterTest.java @@ -26,10 +26,11 @@ import org.apache.streams.pojo.json.Activity; import org.apache.streams.verbs.VerbDefinition; import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.collect.Sets; import org.junit.Test; import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.Stream; /** * Tests for {$link: org.apache.streams.verbs.VerbDefinitionResolver} @@ -44,7 +45,7 @@ public class VerbDefinitionFilterTest { @Test public void testVerbMatchFilter() throws Exception { VerbDefinition definition = mapper.readValue(VerbDefinitionFilterTest.class.getResourceAsStream("/post.json"), VerbDefinition.class); - VerbDefinitionKeepFilter filter = new VerbDefinitionKeepFilter(Sets.newHashSet(definition)); + VerbDefinitionKeepFilter filter = new VerbDefinitionKeepFilter(Stream.of(definition).collect(Collectors.toSet())); filter.prepare(null); StreamsDatum datum1 = new StreamsDatum(mapper.readValue("{\"id\":\"1\",\"verb\":\"notpost\"}\n", Activity.class)); 
List<StreamsDatum> result1 = filter.process(datum1); @@ -60,7 +61,7 @@ public class VerbDefinitionFilterTest { @Test public void testProviderFilter() throws Exception { VerbDefinition definition = mapper.readValue(VerbDefinitionFilterTest.class.getResourceAsStream("/provider.json"), VerbDefinition.class); - VerbDefinitionKeepFilter filter = new VerbDefinitionKeepFilter(Sets.newHashSet(definition)); + VerbDefinitionKeepFilter filter = new VerbDefinitionKeepFilter(Stream.of(definition).collect(Collectors.toSet())); filter.prepare(null); StreamsDatum datum1 = new StreamsDatum(mapper.readValue("{\"id\":\"1\",\"verb\":\"post\",\"provider\":{\"id\":\"providerId\",\"objectType\":\"product\"}}\n", Activity.class)); List<StreamsDatum> result1 = filter.process(datum1); @@ -76,7 +77,7 @@ public class VerbDefinitionFilterTest { @Test public void testActorFilter() throws Exception { VerbDefinition definition = mapper.readValue(VerbDefinitionFilterTest.class.getResourceAsStream("/actor.json"), VerbDefinition.class); - VerbDefinitionKeepFilter filter = new VerbDefinitionKeepFilter(Sets.newHashSet(definition)); + VerbDefinitionKeepFilter filter = new VerbDefinitionKeepFilter(Stream.of(definition).collect(Collectors.toSet())); filter.prepare(null); StreamsDatum datum1 = new StreamsDatum(mapper.readValue("{\"id\":\"1\",\"verb\":\"post\",\"actor\":{\"id\":\"actorId\",\"objectType\":\"page\"}}\n", Activity.class)); List<StreamsDatum> result1 = filter.process(datum1); @@ -93,7 +94,7 @@ public class VerbDefinitionFilterTest { @Test public void testObjectFilter() throws Exception { VerbDefinition definition = mapper.readValue(VerbDefinitionFilterTest.class.getResourceAsStream("/object.json"), VerbDefinition.class); - VerbDefinitionKeepFilter filter = new VerbDefinitionKeepFilter(Sets.newHashSet(definition)); + VerbDefinitionKeepFilter filter = new VerbDefinitionKeepFilter(Stream.of(definition).collect(Collectors.toSet())); filter.prepare(null); StreamsDatum datum1 = new 
StreamsDatum(mapper.readValue("{\"id\":\"1\",\"verb\":\"post\",\"object\":{\"id\":\"objectId\"}}\n", Activity.class)); List<StreamsDatum> result1 = filter.process(datum1); @@ -109,7 +110,7 @@ public class VerbDefinitionFilterTest { @Test public void testMultiFilter() throws Exception { VerbDefinition definition = mapper.readValue(VerbDefinitionFilterTest.class.getResourceAsStream("/follow.json"), VerbDefinition.class); - VerbDefinitionKeepFilter filter = new VerbDefinitionKeepFilter(Sets.newHashSet(definition)); + VerbDefinitionKeepFilter filter = new VerbDefinitionKeepFilter(Stream.of(definition).collect(Collectors.toSet())); filter.prepare(null); StreamsDatum datum1 = new StreamsDatum(mapper.readValue("{\"id\":\"1\",\"verb\":\"follow\",\"actor\":{\"id\":\"actorId\",\"objectType\":\"page\"}}}\n", Activity.class)); List<StreamsDatum> result1 = filter.process(datum1); @@ -129,7 +130,7 @@ public class VerbDefinitionFilterTest { @Test public void testTargetRequired() throws Exception { VerbDefinition definition = mapper.readValue(VerbDefinitionFilterTest.class.getResourceAsStream("/targetrequired.json"), VerbDefinition.class); - VerbDefinitionKeepFilter filter = new VerbDefinitionKeepFilter(Sets.newHashSet(definition)); + VerbDefinitionKeepFilter filter = new VerbDefinitionKeepFilter(Stream.of(definition).collect(Collectors.toSet())); filter.prepare(null); StreamsDatum datum1 = new StreamsDatum(mapper.readValue("{\"id\":\"id\",\"verb\":\"post\",\"object\":{\"id\":\"objectId\",\"objectType\":\"task\"}}\n", Activity.class)); List<StreamsDatum> result1 = filter.process(datum1); @@ -145,7 +146,7 @@ public class VerbDefinitionFilterTest { @Test public void testAllWildcard() throws Exception { VerbDefinition definition = mapper.readValue(VerbDefinitionFilterTest.class.getResourceAsStream("/post.json"), VerbDefinition.class); - VerbDefinitionKeepFilter filter = new VerbDefinitionKeepFilter(Sets.newHashSet(definition)); + VerbDefinitionKeepFilter filter = new 
VerbDefinitionKeepFilter(Stream.of(definition).collect(Collectors.toSet())); filter.prepare(null); StreamsDatum datum1 = new StreamsDatum(mapper.readValue("{\"id\":\"1\",\"verb\":\"notpost\"}\n", Activity.class)); List<StreamsDatum> result1 = filter.process(datum1); @@ -180,7 +181,7 @@ public class VerbDefinitionFilterTest { VerbDefinition object = mapper.readValue(VerbDefinitionFilterTest.class.getResourceAsStream("/object.json"), VerbDefinition.class); VerbDefinition target = mapper.readValue(VerbDefinitionFilterTest.class.getResourceAsStream("/targetrequired.json"), VerbDefinition.class); VerbDefinition follow = mapper.readValue(VerbDefinitionFilterTest.class.getResourceAsStream("/follow.json"), VerbDefinition.class); - VerbDefinitionKeepFilter filter = new VerbDefinitionKeepFilter(Sets.newHashSet(provider,actor,object,target,follow)); + VerbDefinitionKeepFilter filter = new VerbDefinitionKeepFilter(Stream.of(provider,actor,object,target,follow).collect(Collectors.toSet())); filter.prepare(null); StreamsDatum datum1 = new StreamsDatum(mapper.readValue("{\"id\":\"1\",\"verb\":\"notpost\"}\n", Activity.class)); List<StreamsDatum> result1 = filter.process(datum1); @@ -214,7 +215,7 @@ public class VerbDefinitionFilterTest { @Test public void testVerbDropFilter() throws Exception { VerbDefinition definition = mapper.readValue(VerbDefinitionFilterTest.class.getResourceAsStream("/post.json"), VerbDefinition.class); - VerbDefinitionDropFilter filter = new VerbDefinitionDropFilter(Sets.newHashSet(definition)); + VerbDefinitionDropFilter filter = new VerbDefinitionDropFilter(Stream.of(definition).collect(Collectors.toSet())); filter.prepare(null); StreamsDatum datum1 = new StreamsDatum(mapper.readValue("{\"id\":\"1\",\"verb\":\"notpost\"}\n", Activity.class)); List<StreamsDatum> result1 = filter.process(datum1); @@ -230,7 +231,7 @@ public class VerbDefinitionFilterTest { @Test public void testDropAllWildcard() throws Exception { VerbDefinition definition = 
mapper.readValue(VerbDefinitionFilterTest.class.getResourceAsStream("/post.json"), VerbDefinition.class); - VerbDefinitionDropFilter filter = new VerbDefinitionDropFilter(Sets.newHashSet(definition)); + VerbDefinitionDropFilter filter = new VerbDefinitionDropFilter(Stream.of(definition).collect(Collectors.toSet())); filter.prepare(null); StreamsDatum datum1 = new StreamsDatum(mapper.readValue("{\"id\":\"1\",\"verb\":\"notpost\"}\n", Activity.class)); List<StreamsDatum> result1 = filter.process(datum1); http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7810361d/streams-contrib/streams-amazon-aws/streams-persist-kinesis/src/main/java/org/apache/streams/amazon/kinesis/KinesisPersistReader.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-amazon-aws/streams-persist-kinesis/src/main/java/org/apache/streams/amazon/kinesis/KinesisPersistReader.java b/streams-contrib/streams-amazon-aws/streams-persist-kinesis/src/main/java/org/apache/streams/amazon/kinesis/KinesisPersistReader.java index 42cce8a..5b18a91 100644 --- a/streams-contrib/streams-amazon-aws/streams-persist-kinesis/src/main/java/org/apache/streams/amazon/kinesis/KinesisPersistReader.java +++ b/streams-contrib/streams-amazon-aws/streams-persist-kinesis/src/main/java/org/apache/streams/amazon/kinesis/KinesisPersistReader.java @@ -34,9 +34,8 @@ import com.amazonaws.services.kinesis.AmazonKinesisClient; import com.amazonaws.services.kinesis.model.DescribeStreamResult; import com.amazonaws.services.kinesis.model.Shard; import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.base.Strings; -import com.google.common.collect.Queues; import com.typesafe.config.Config; +import org.apache.commons.lang3.StringUtils; import org.joda.time.DateTime; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -130,7 +129,7 @@ public class KinesisPersistReader implements StreamsPersistReader, Serializable StreamsResultSet current; 
synchronized( KinesisPersistReader.class ) { - current = new StreamsResultSet(Queues.newConcurrentLinkedQueue(persistQueue)); + current = new StreamsResultSet(new ConcurrentLinkedQueue<>(persistQueue)); persistQueue.clear(); } return current; @@ -162,7 +161,7 @@ public class KinesisPersistReader implements StreamsPersistReader, Serializable clientConfig.setProtocol(Protocol.valueOf(config.getProtocol().toString())); this.client = new AmazonKinesisClient(credentials, clientConfig); - if (!Strings.isNullOrEmpty(config.getRegion())) + if (StringUtils.isNotEmpty(config.getRegion())) this.client.setRegion(Region.getRegion(Regions.fromName(config.getRegion()))); } streamNames = this.config.getStreams(); http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7810361d/streams-contrib/streams-amazon-aws/streams-persist-kinesis/src/main/java/org/apache/streams/amazon/kinesis/KinesisPersistWriter.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-amazon-aws/streams-persist-kinesis/src/main/java/org/apache/streams/amazon/kinesis/KinesisPersistWriter.java b/streams-contrib/streams-amazon-aws/streams-persist-kinesis/src/main/java/org/apache/streams/amazon/kinesis/KinesisPersistWriter.java index 6e2db0f..d528f15 100644 --- a/streams-contrib/streams-amazon-aws/streams-persist-kinesis/src/main/java/org/apache/streams/amazon/kinesis/KinesisPersistWriter.java +++ b/streams-contrib/streams-amazon-aws/streams-persist-kinesis/src/main/java/org/apache/streams/amazon/kinesis/KinesisPersistWriter.java @@ -35,8 +35,8 @@ import com.amazonaws.services.kinesis.AmazonKinesisClient; import com.amazonaws.services.kinesis.model.PutRecordRequest; import com.amazonaws.services.kinesis.model.PutRecordResult; import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.base.Strings; import com.typesafe.config.Config; +import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import 
org.slf4j.LoggerFactory; @@ -124,7 +124,7 @@ public class KinesisPersistWriter implements StreamsPersistWriter { clientConfig.setProtocol(Protocol.valueOf(config.getProtocol().toString())); this.client = new AmazonKinesisClient(credentials, clientConfig); - if (!Strings.isNullOrEmpty(config.getRegion())) { + if (StringUtils.isNotEmpty(config.getRegion())) { this.client.setRegion(Region.getRegion(Regions.fromName(config.getRegion()))); } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7810361d/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReader.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReader.java b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReader.java index da1a00e..3a2cf3b 100644 --- a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReader.java +++ b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReader.java @@ -37,8 +37,8 @@ import com.amazonaws.services.s3.model.ListObjectsRequest; import com.amazonaws.services.s3.model.ObjectListing; import com.amazonaws.services.s3.model.S3ObjectSummary; import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.base.Strings; import com.google.common.collect.Queues; +import org.apache.commons.lang3.StringUtils; import org.joda.time.DateTime; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -47,6 +47,7 @@ import java.math.BigInteger; import java.util.ArrayList; import java.util.Collection; import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; @@ -132,7 +133,7 @@ public class S3PersistReader 
implements StreamsPersistReader, DatumStatusCountab clientOptions.setPathStyleAccess(false); this.amazonS3Client = new AmazonS3Client(credentials, clientConfig); - if ( !Strings.isNullOrEmpty(s3ReaderConfiguration.getRegion())) { + if (StringUtils.isNotEmpty(s3ReaderConfiguration.getRegion())) { this.amazonS3Client.setRegion(Region.getRegion(Regions.fromName(s3ReaderConfiguration.getRegion()))); } this.amazonS3Client.setS3ClientOptions(clientOptions); @@ -206,7 +207,7 @@ public class S3PersistReader implements StreamsPersistReader, DatumStatusCountab StreamsResultSet current; synchronized ( S3PersistReader.class ) { - current = new StreamsResultSet(Queues.newConcurrentLinkedQueue(persistQueue)); + current = new StreamsResultSet(new ConcurrentLinkedQueue<>(persistQueue)); current.setCounter(new DatumStatusCounter()); current.getCounter().add(countersCurrent); countersTotal.add(countersCurrent); http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7810361d/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReaderTask.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReaderTask.java b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReaderTask.java index 82bcba7..775e7e0 100644 --- a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReaderTask.java +++ b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReaderTask.java @@ -22,7 +22,7 @@ import org.apache.streams.core.DatumStatus; import org.apache.streams.core.StreamsDatum; import org.apache.streams.util.ComponentUtils; -import com.google.common.base.Strings; +import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -57,7 +57,7 @@ 
public class S3PersistReaderTask implements Runnable { String line; try { while ((line = bufferedReader.readLine()) != null) { - if ( !Strings.isNullOrEmpty(line) ) { + if (StringUtils.isNotEmpty(line) ) { reader.countersCurrent.incrementAttempt(); StreamsDatum entry = reader.lineReaderUtil.processLine(line); ComponentUtils.offerUntilSuccess(entry, reader.persistQueue); http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7810361d/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/TestMetadataFromDocumentProcessor.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/TestMetadataFromDocumentProcessor.java b/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/TestMetadataFromDocumentProcessor.java index 8b45eb2..a0b483f 100644 --- a/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/TestMetadataFromDocumentProcessor.java +++ b/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/TestMetadataFromDocumentProcessor.java @@ -25,7 +25,6 @@ import org.apache.streams.pojo.json.Activity; import org.apache.streams.pojo.json.ActivityObject; import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.collect.Sets; import org.apache.commons.io.IOUtils; import org.apache.commons.lang.SerializationUtils; import org.junit.Before; @@ -35,6 +34,7 @@ import org.slf4j.LoggerFactory; import java.io.InputStream; import java.nio.charset.StandardCharsets; +import java.util.HashSet; import java.util.List; import java.util.Set; @@ -74,7 +74,7 @@ public class TestMetadataFromDocumentProcessor { .getResourceAsStream("activities"); List<String> files = IOUtils.readLines(testActivityFolderStream, StandardCharsets.UTF_8); - Set<ActivityObject> objects = 
Sets.newHashSet(); + Set<ActivityObject> objects = new HashSet<>(); for( String file : files) { LOGGER.info("File: " + file ); http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7810361d/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java index 0036fea..a673d8f 100644 --- a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java +++ b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java @@ -53,6 +53,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; @@ -217,7 +218,6 @@ public class WebHdfsPersistReader implements StreamsPersistReader, DatumStatusCo } streamsConfiguration = StreamsConfigurator.detectConfiguration(); persistQueue = Queues.synchronizedQueue(new LinkedBlockingQueue<StreamsDatum>(streamsConfiguration.getBatchSize().intValue())); - //persistQueue = Queues.synchronizedQueue(new ConcurrentLinkedQueue()); executor = Executors.newSingleThreadExecutor(); mapper = StreamsJacksonMapper.getInstance(); } @@ -252,7 +252,7 @@ public class WebHdfsPersistReader implements StreamsPersistReader, DatumStatusCo StreamsResultSet current; synchronized ( WebHdfsPersistReader.class ) { - current = new StreamsResultSet(Queues.newConcurrentLinkedQueue(persistQueue)); + current = new StreamsResultSet(new ConcurrentLinkedQueue<>(persistQueue)); current.setCounter(new DatumStatusCounter()); 
current.getCounter().add(countersCurrent); countersTotal.add(countersCurrent); http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7810361d/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java index d18bda9..fa287eb 100644 --- a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java +++ b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java @@ -21,8 +21,8 @@ package org.apache.streams.hdfs; import org.apache.streams.core.DatumStatus; import org.apache.streams.core.StreamsDatum; -import com.google.common.base.Strings; import com.google.common.util.concurrent.Uninterruptibles; +import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.fs.FileStatus; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -82,7 +82,7 @@ public class WebHdfsPersistReaderTask implements Runnable { do { try { line = bufferedReader.readLine(); - if ( !Strings.isNullOrEmpty(line) ) { + if (StringUtils.isNotEmpty(line)) { reader.countersCurrent.incrementAttempt(); StreamsDatum entry = reader.lineReaderUtil.processLine(line); if ( entry != null ) { @@ -98,7 +98,7 @@ public class WebHdfsPersistReaderTask implements Runnable { reader.countersCurrent.incrementStatus(DatumStatus.FAIL); } } - while ( !Strings.isNullOrEmpty(line) ); + while (StringUtils.isNotEmpty(line)); LOGGER.info("Finished Processing " + fileStatus.getPath().getName()); try { bufferedReader.close(); 
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7810361d/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java index 29a6b73..9079c7e 100644 --- a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java +++ b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java @@ -29,7 +29,7 @@ import org.apache.streams.core.StreamsPersistWriter; import org.apache.streams.jackson.StreamsJacksonMapper; import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.base.Strings; +import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.fs.FileSystem; @@ -101,7 +101,7 @@ public class WebHdfsPersistWriter implements StreamsPersistWriter, Flushable, Cl StringBuilder uriBuilder = new StringBuilder(); uriBuilder.append(hdfsConfiguration.getScheme()); uriBuilder.append("://"); - if ( !Strings.isNullOrEmpty(hdfsConfiguration.getHost())) { + if (StringUtils.isNotEmpty(hdfsConfiguration.getHost())) { uriBuilder.append(hdfsConfiguration.getHost() + ":" + hdfsConfiguration.getPort()); } else { uriBuilder.append("/"); http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7810361d/streams-contrib/streams-processor-regex/src/test/java/org/apache/streams/regex/RegexHashtagExtractorTest.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-processor-regex/src/test/java/org/apache/streams/regex/RegexHashtagExtractorTest.java 
b/streams-contrib/streams-processor-regex/src/test/java/org/apache/streams/regex/RegexHashtagExtractorTest.java index 1912ff0..b17df37 100644 --- a/streams-contrib/streams-processor-regex/src/test/java/org/apache/streams/regex/RegexHashtagExtractorTest.java +++ b/streams-contrib/streams-processor-regex/src/test/java/org/apache/streams/regex/RegexHashtagExtractorTest.java @@ -23,15 +23,17 @@ import org.apache.streams.core.StreamsDatum; import org.apache.streams.pojo.extensions.ExtensionUtil; import org.apache.streams.pojo.json.Activity; -import com.google.common.collect.Sets; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; import java.util.Arrays; import java.util.Collection; +import java.util.HashSet; import java.util.List; import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.is; @@ -52,11 +54,12 @@ public class RegexHashtagExtractorTest { @Parameterized.Parameters public static Collection<Object[]> params() { return Arrays.asList(new Object[][]{ - {"This is the #content of a standard tweet", Sets.newHashSet("content")}, - {"This is the content of a standard tweet", Sets.newHashSet()}, - {"This is the #content of a standard #tweet", Sets.newHashSet("content", "tweet")}, - {"UNIX 时间1400000000 秒…… (该睡觉了,各位夜猫子)#程序员#", Sets.newHashSet("程序员")}, - {"This is the body of a #fbpost. 
It can have multiple lines of #content, as well as much more detailed and flowery #language.", Sets.newHashSet("content", "fbpost", "language")} + {"This is the #content of a standard tweet", Stream.of("content").collect(Collectors.toSet())}, + {"This is the content of a standard tweet", new HashSet<>()}, + {"This is the #content of a standard #tweet", Stream.of("content", "tweet").collect(Collectors.toSet())}, + {"UNIX 时间1400000000 秒…… (该睡觉了,各位夜猫子)#程序员#", Stream.of("程序员").collect(Collectors.toSet())}, + {"This is the body of a #fbpost. It can have multiple lines of #content, as well as much more detailed and flowery #language.", + Stream.of("content", "fbpost", "language").collect(Collectors.toSet())} }); } @@ -68,7 +71,7 @@ public class RegexHashtagExtractorTest { assertThat(result.size(), is(equalTo(1))); Activity output = (Activity)result.get(0).getDocument(); Set<String> extracted = (Set) ExtensionUtil.getInstance().ensureExtensions(output).get(RegexHashtagExtractor.EXTENSION_KEY); - Sets.SetView<String> diff = Sets.difference(extracted, hashtags); + Set<String> diff = extracted.stream().filter((x) -> !hashtags.contains(x)).collect(Collectors.toSet()); assertThat(diff.size(), is(equalTo(0))); } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7810361d/streams-contrib/streams-processor-regex/src/test/java/org/apache/streams/regex/RegexMentionExtractorTest.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-processor-regex/src/test/java/org/apache/streams/regex/RegexMentionExtractorTest.java b/streams-contrib/streams-processor-regex/src/test/java/org/apache/streams/regex/RegexMentionExtractorTest.java index bb0e95d..3706b15 100644 --- a/streams-contrib/streams-processor-regex/src/test/java/org/apache/streams/regex/RegexMentionExtractorTest.java +++ b/streams-contrib/streams-processor-regex/src/test/java/org/apache/streams/regex/RegexMentionExtractorTest.java @@ -34,6 
+34,8 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.is; @@ -54,23 +56,24 @@ public class RegexMentionExtractorTest { @Parameterized.Parameters public static Collection<Object[]> params() { return Arrays.asList(new Object[][]{ - {"This is the @content of a standard tweet", Sets.newHashSet(new HashMap<String, Object>() {{ - put("displayName", "content"); - }})}, - {"This is the content of a standard tweet", Sets.newHashSet(new HashMap<String, Object>())}, - {"This is the @content of a standard @tweet", Sets.newHashSet(new HashMap<String, Object>() {{ + {"This is the @content of a standard tweet", Stream.of(new HashMap<String, Object>() {{ + put("displayName", "content"); + }}).collect(Collectors.toSet())}, + {"This is the content of a standard tweet", Stream.of(new HashMap<String, Object>()).collect(Collectors.toSet())}, + {"This is the @content of a standard @tweet", Stream.of(new HashMap<String, Object>() {{ put("displayName", "content"); }},new HashMap<String, Object>() {{ put("displayName", "tweet"); - }})}, - {"UNIX 时间1400000000 秒…… (该睡觉了,各位夜猫子)@程序员#", Sets.newHashSet(new HashMap<String, Object>() {{ + }}).collect(Collectors.toSet())}, + {"UNIX 时间1400000000 秒…… (该睡觉了,各位夜猫子)@程序员#", Stream.of(new HashMap<String, Object>() {{ put("displayName", "程序员"); - }})}, - {"This is the body of a @fbpost. 
It can have multiple lines of #content, as well as much more detailed and flowery @language.", + Stream.of(new HashMap<String, Object>() {{ put("displayName", "fbpost"); }},new HashMap<String, Object>() {{ put("displayName", "language"); - }})} + }}).collect(Collectors.toSet())} }); } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7810361d/streams-contrib/streams-processor-regex/src/test/java/org/apache/streams/regex/RegexUrlExtractorTest.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-processor-regex/src/test/java/org/apache/streams/regex/RegexUrlExtractorTest.java b/streams-contrib/streams-processor-regex/src/test/java/org/apache/streams/regex/RegexUrlExtractorTest.java index 64e8599..7c7948b 100644 --- a/streams-contrib/streams-processor-regex/src/test/java/org/apache/streams/regex/RegexUrlExtractorTest.java +++ b/streams-contrib/streams-processor-regex/src/test/java/org/apache/streams/regex/RegexUrlExtractorTest.java @@ -22,15 +22,17 @@ package org.apache.streams.regex; import org.apache.streams.core.StreamsDatum; import org.apache.streams.pojo.json.Activity; -import com.google.common.collect.Sets; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; import java.util.Arrays; import java.util.Collection; +import java.util.HashSet; import java.util.List; import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.is; @@ -51,12 +53,13 @@ public class RegexUrlExtractorTest { @Parameterized.Parameters public static Collection<Object[]> params() { return Arrays.asList(new Object[][]{ - {"This is the http://t.co/foo of a standard tweet", Sets.newHashSet("http://t.co/foo")}, - {"This is the https://t.co/foo of a standard tweet", Sets.newHashSet("https://t.co/foo")}, - {"This is the https://t.co/foo of a standard tweet 
https://t.co/foo", Sets.newHashSet("https://t.co/foo")}, - {"This is the http://amd.com/test of a standard tweet", Sets.newHashSet("http://amd.com/test")}, - {"This is the content of a standard tweet", Sets.newHashSet()}, - {"This is the http://www.google.com/articles/awesome?with=query&params=true of a standard @tweet", Sets.newHashSet("http://www.google.com/articles/awesome?with=query&params=true")} + {"This is the http://t.co/foo of a standard tweet", Stream.of("http://t.co/foo").collect(Collectors.toSet())}, + {"This is the https://t.co/foo of a standard tweet", Stream.of("https://t.co/foo").collect(Collectors.toSet())}, + {"This is the https://t.co/foo of a standard tweet https://t.co/foo", Stream.of("https://t.co/foo").collect(Collectors.toSet())}, + {"This is the http://amd.com/test of a standard tweet", Stream.of("http://amd.com/test").collect(Collectors.toSet())}, + {"This is the content of a standard tweet", new HashSet<>()}, + {"This is the http://www.google.com/articles/awesome?with=query&params=true of a standard @tweet", + Stream.of("http://www.google.com/articles/awesome?with=query&params=true").collect(Collectors.toSet())} }); } @@ -67,8 +70,8 @@ public class RegexUrlExtractorTest { List<StreamsDatum> result = new RegexUrlExtractor().process(datum); assertThat(result.size(), is(equalTo(1))); Activity output = (Activity)result.get(0).getDocument(); - Set<String> extracted = Sets.newHashSet(output.getLinks()); - Sets.SetView<String> diff = Sets.difference(links, extracted); + Set<String> extracted = new HashSet<>(output.getLinks()); + Set<String> diff = links.stream().filter((x) -> !extracted.contains(x)).collect(Collectors.toSet()); assertThat(diff.size(), is(equalTo(0))); } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7810361d/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookDataCollector.java ---------------------------------------------------------------------- diff --git 
a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookDataCollector.java b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookDataCollector.java index 52ec222..0a13b64 100644 --- a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookDataCollector.java +++ b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookDataCollector.java @@ -27,10 +27,10 @@ import org.apache.streams.util.oauth.tokens.tokenmanager.SimpleTokenManager; import org.apache.streams.util.oauth.tokens.tokenmanager.impl.BasicTokenManager; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Strings; import facebook4j.Facebook; import facebook4j.FacebookFactory; import facebook4j.conf.ConfigurationBuilder; +import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -112,7 +112,7 @@ public abstract class FacebookDataCollector implements Runnable { LOGGER.debug("appAccessToken : {}", this.config.getOauth().getAppAccessToken()); } cb.setJSONStoreEnabled(true); - if (!Strings.isNullOrEmpty(config.getVersion())) { + if (StringUtils.isNotEmpty(config.getVersion())) { cb.setRestBaseURL("https://graph.facebook.com/" + config.getVersion() + "/"); } LOGGER.debug("appId : {}", this.config.getOauth().getAppId()); http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7810361d/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookFriendFeedProvider.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookFriendFeedProvider.java b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookFriendFeedProvider.java index 
14d5a64..e256418 100644 --- a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookFriendFeedProvider.java +++ b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookFriendFeedProvider.java @@ -29,8 +29,6 @@ import org.apache.streams.jackson.StreamsJacksonMapper; import org.apache.streams.util.ComponentUtils; import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.collect.Queues; -import com.google.common.util.concurrent.MoreExecutors; import com.typesafe.config.Config; import com.typesafe.config.ConfigRenderOptions; import facebook4j.Facebook; @@ -54,6 +52,7 @@ import java.util.Iterator; import java.util.Objects; import java.util.Queue; import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; @@ -166,7 +165,7 @@ public class FacebookFriendFeedProvider implements StreamsProvider, Serializable StreamsResultSet current; synchronized (FacebookUserstreamProvider.class) { - current = new StreamsResultSet(Queues.newConcurrentLinkedQueue(providerQueue)); + current = new StreamsResultSet(new ConcurrentLinkedQueue<>(providerQueue)); current.setCounter(new DatumStatusCounter()); current.getCounter().add(countersCurrent); countersTotal.add(countersCurrent); @@ -220,7 +219,7 @@ public class FacebookFriendFeedProvider implements StreamsProvider, Serializable @Override public void prepare(Object configurationObject) { - executor = MoreExecutors.listeningDecorator(newFixedThreadPoolWithQueueSize(5, 20)); + executor = newFixedThreadPoolWithQueueSize(5, 20); Objects.requireNonNull(providerQueue); Objects.requireNonNull(this.klass); 
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7810361d/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookFriendUpdatesProvider.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookFriendUpdatesProvider.java b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookFriendUpdatesProvider.java index c973863..39b7771 100644 --- a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookFriendUpdatesProvider.java +++ b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookFriendUpdatesProvider.java @@ -30,8 +30,6 @@ import org.apache.streams.util.ComponentUtils; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Preconditions; -import com.google.common.collect.Sets; -import com.google.common.util.concurrent.MoreExecutors; import com.typesafe.config.Config; import com.typesafe.config.ConfigRenderOptions; import facebook4j.Facebook; @@ -51,6 +49,7 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.io.Serializable; import java.math.BigInteger; +import java.util.HashSet; import java.util.Iterator; import java.util.Objects; import java.util.Queue; @@ -63,6 +62,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.stream.Collectors; /** * FacebookFriendUpdatesProvider provides updates from friend feed. 
@@ -237,7 +237,7 @@ public class FacebookFriendUpdatesProvider implements StreamsProvider, Serializa @Override public void prepare(Object configurationObject) { - executor = MoreExecutors.listeningDecorator(newFixedThreadPoolWithQueueSize(5, 20)); + executor = newFixedThreadPoolWithQueueSize(5, 20); Objects.requireNonNull(providerQueue); Objects.requireNonNull(this.klass); @@ -289,7 +289,7 @@ public class FacebookFriendUpdatesProvider implements StreamsProvider, Serializa FacebookUserstreamProvider provider; Facebook client; - private Set<Post> priorPollResult = Sets.newHashSet(); + private Set<Post> priorPollResult = new HashSet<>(); public FacebookFeedPollingTask(FacebookUserstreamProvider facebookUserstreamProvider) { provider = facebookUserstreamProvider; @@ -301,9 +301,9 @@ public class FacebookFriendUpdatesProvider implements StreamsProvider, Serializa while (provider.isRunning()) { try { ResponseList<Post> postResponseList = client.getHome(); - Set<Post> update = Sets.newHashSet(postResponseList); - Set<Post> repeats = Sets.intersection(priorPollResult, Sets.newHashSet(update)); - Set<Post> entrySet = Sets.difference(update, repeats); + Set<Post> update = new HashSet<>(postResponseList); + Set<Post> repeats = priorPollResult.stream().filter(update::contains).collect(Collectors.toSet()); + Set<Post> entrySet = update.stream().filter((x) -> !repeats.contains(x)).collect(Collectors.toSet()); for (Post item : entrySet) { String json = DataObjectFactory.getRawJSON(item); org.apache.streams.facebook.Post post = mapper.readValue(json, org.apache.streams.facebook.Post.class); http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7810361d/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookProvider.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookProvider.java 
b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookProvider.java index a665023..70e0a65 100644 --- a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookProvider.java +++ b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookProvider.java @@ -29,8 +29,6 @@ import org.apache.streams.util.ComponentUtils; import org.apache.streams.util.SerializationUtil; import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.collect.Queues; -import com.google.common.collect.Sets; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; @@ -43,11 +41,13 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.math.BigInteger; import java.util.ArrayList; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; /** @@ -105,7 +105,7 @@ public abstract class FacebookProvider implements StreamsProvider { @Override public StreamsResultSet readCurrent() { int batchSize = 0; - BlockingQueue<StreamsDatum> batch = Queues.newLinkedBlockingQueue(); + BlockingQueue<StreamsDatum> batch = new LinkedBlockingQueue<>(); while (!this.datums.isEmpty() && batchSize < MAX_BATCH_SIZE) { ComponentUtils.offerUntilSuccess(ComponentUtils.pollWhileNotEmpty(this.datums), batch); ++batchSize; @@ -125,7 +125,7 @@ public abstract class FacebookProvider implements StreamsProvider { @Override public void prepare(Object configurationObject) { - this.datums = Queues.newLinkedBlockingQueue(); + this.datums = new LinkedBlockingQueue<>(); this.isComplete = new AtomicBoolean(false); this.executor = 
MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1)); } @@ -141,7 +141,7 @@ public abstract class FacebookProvider implements StreamsProvider { * @param idsToAfterDate idsToAfterDate */ public void overrideIds(Map<String, DateTime> idsToAfterDate) { - Set<IdConfig> ids = Sets.newHashSet(); + Set<IdConfig> ids = new HashSet<>(); for (String id : idsToAfterDate.keySet()) { IdConfig idConfig = new IdConfig(); idConfig.setId(id); http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7810361d/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookUserstreamProvider.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookUserstreamProvider.java b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookUserstreamProvider.java index d198e43..c502ada 100644 --- a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookUserstreamProvider.java +++ b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookUserstreamProvider.java @@ -29,10 +29,6 @@ import org.apache.streams.jackson.StreamsJacksonMapper; import org.apache.streams.util.ComponentUtils; import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.collect.Queues; -import com.google.common.collect.Sets; -import com.google.common.util.concurrent.ListeningExecutorService; -import com.google.common.util.concurrent.MoreExecutors; import com.typesafe.config.Config; import com.typesafe.config.ConfigRenderOptions; import facebook4j.Facebook; @@ -51,11 +47,13 @@ import java.io.IOException; import java.io.Serializable; import java.math.BigInteger; import java.util.ArrayList; +import java.util.HashSet; import java.util.List; import java.util.Objects; import java.util.Queue; import 
java.util.Set; import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; @@ -63,6 +61,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.stream.Collectors; public class FacebookUserstreamProvider implements StreamsProvider, Serializable { @@ -88,7 +87,7 @@ public class FacebookUserstreamProvider implements StreamsProvider, Serializable this.configuration = config; } - protected ListeningExecutorService executor; + protected ExecutorService executor; protected DateTime start; protected DateTime end; @@ -187,7 +186,7 @@ public class FacebookUserstreamProvider implements StreamsProvider, Serializable StreamsResultSet current; synchronized (FacebookUserstreamProvider.class) { - current = new StreamsResultSet(Queues.newConcurrentLinkedQueue(providerQueue)); + current = new StreamsResultSet(new ConcurrentLinkedQueue<>(providerQueue)); current.setCounter(new DatumStatusCounter()); current.getCounter().add(countersCurrent); countersTotal.add(countersCurrent); @@ -241,7 +240,7 @@ public class FacebookUserstreamProvider implements StreamsProvider, Serializable @Override public void prepare(Object configurationObject) { - executor = MoreExecutors.listeningDecorator(newFixedThreadPoolWithQueueSize(5, 20)); + executor = newFixedThreadPoolWithQueueSize(5, 20); Objects.requireNonNull(providerQueue); Objects.requireNonNull(this.klass); @@ -299,7 +298,7 @@ public class FacebookUserstreamProvider implements StreamsProvider, Serializable Facebook client; String id; - private Set<Post> priorPollResult = Sets.newHashSet(); + private Set<Post> priorPollResult = new HashSet<>(); public FacebookFeedPollingTask(FacebookUserstreamProvider 
facebookUserstreamProvider) { this.provider = facebookUserstreamProvider; @@ -318,9 +317,9 @@ public class FacebookUserstreamProvider implements StreamsProvider, Serializable try { postResponseList = client.getFeed(id); - Set<Post> update = Sets.newHashSet(postResponseList); - Set<Post> repeats = Sets.intersection(priorPollResult, Sets.newHashSet(update)); - Set<Post> entrySet = Sets.difference(update, repeats); + Set<Post> update = new HashSet<>(postResponseList); + Set<Post> repeats = priorPollResult.stream().filter(update::contains).collect(Collectors.toSet()); + Set<Post> entrySet = update.stream().filter((x) -> !repeats.contains(x)).collect(Collectors.toSet()); LOGGER.debug(this.id + " response: " + update.size() + " previous: " + repeats.size() + " new: " + entrySet.size()); for (Post item : entrySet) { String json = DataObjectFactory.getRawJSON(item); http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7810361d/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverProvider.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverProvider.java b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverProvider.java index f7b9d88..a295898 100644 --- a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverProvider.java +++ b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverProvider.java @@ -30,7 +30,6 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Preconditions; import com.google.common.collect.Iterators; -import com.google.common.collect.Queues; import com.google.common.util.concurrent.Uninterruptibles; import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; @@ 
-106,7 +105,7 @@ public class MoreoverProvider implements StreamsProvider { Collection<StreamsDatum> currentIterator = new ArrayList<>(); Iterators.addAll(currentIterator, providerQueue.iterator()); - StreamsResultSet current = new StreamsResultSet(Queues.newConcurrentLinkedQueue(currentIterator)); + StreamsResultSet current = new StreamsResultSet(new ConcurrentLinkedQueue<>(currentIterator)); providerQueue.clear(); http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7810361d/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverProviderTask.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverProviderTask.java b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverProviderTask.java index 88aec81..595fb4f 100644 --- a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverProviderTask.java +++ b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverProviderTask.java @@ -37,7 +37,6 @@ public class MoreoverProviderTask implements Runnable { private String lastSequence; private final String apiKey; - private final String apiId; private final Queue<StreamsDatum> results; private final MoreoverClient moClient; private boolean started = false; @@ -51,11 +50,10 @@ public class MoreoverProviderTask implements Runnable { */ public MoreoverProviderTask(String apiId, String apiKey, Queue<StreamsDatum> results, String lastSequence) { //logger.info("Constructed new task {} for {} {} {}", UUID.randomUUID().toString(), apiId, apiKey, lastSequence); - this.apiId = apiId; this.apiKey = apiKey; this.results = results; this.lastSequence = lastSequence; - this.moClient = new MoreoverClient(this.apiId, this.apiKey, this.lastSequence); + this.moClient = new MoreoverClient(apiId, this.apiKey, 
this.lastSequence); initializeClient(moClient); } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7810361d/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosProvider.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosProvider.java b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosProvider.java index ec1f317..fffe7a1 100644 --- a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosProvider.java +++ b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/provider/SysomosProvider.java @@ -32,8 +32,6 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Preconditions; import com.google.common.base.Splitter; -import com.google.common.collect.Queues; -import com.google.common.collect.Sets; import com.google.common.util.concurrent.Uninterruptibles; import com.sysomos.SysomosConfiguration; import com.typesafe.config.Config; @@ -49,10 +47,11 @@ import java.io.File; import java.io.FileOutputStream; import java.io.PrintStream; import java.math.BigInteger; -import java.util.Iterator; +import java.util.HashSet; import java.util.Map; import java.util.Queue; import java.util.Set; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -76,7 +75,7 @@ public class SysomosProvider implements StreamsProvider { public static final String STREAMS_ID = "SysomosProvider"; - public static enum Mode { CONTINUOUS, BACKFILL_AND_TERMINATE } + public enum Mode { CONTINUOUS, BACKFILL_AND_TERMINATE } private static final Logger LOGGER = 
LoggerFactory.getLogger(SysomosProvider.class); @@ -91,7 +90,7 @@ public class SysomosProvider implements StreamsProvider { protected volatile Queue<StreamsDatum> providerQueue; private final ReadWriteLock lock = new ReentrantReadWriteLock(); - private final Set<String> completedHeartbeats = Sets.newHashSet(); + private final Set<String> completedHeartbeats = new HashSet<>(); private final long maxQueued; private final long minLatency; private final long scheduledLatency; @@ -332,7 +331,7 @@ public class SysomosProvider implements StreamsProvider { } private Queue<StreamsDatum> constructQueue() { - return Queues.newConcurrentLinkedQueue(); + return new ConcurrentLinkedQueue<>(); } public int getCount() { @@ -379,9 +378,7 @@ public class SysomosProvider implements StreamsProvider { provider.startStream(); do { Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getBatchFrequencyMs(), TimeUnit.MILLISECONDS); - Iterator<StreamsDatum> iterator = provider.readCurrent().iterator(); - while (iterator.hasNext()) { - StreamsDatum datum = iterator.next(); + for (StreamsDatum datum : provider.readCurrent()) { String json; try { json = mapper.writeValueAsString(datum.getDocument()); http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7810361d/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/util/SysomosUtils.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/util/SysomosUtils.java b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/util/SysomosUtils.java index 82d538d..0acc81f 100644 --- a/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/util/SysomosUtils.java +++ b/streams-contrib/streams-provider-sysomos/src/main/java/org/apache/streams/sysomos/util/SysomosUtils.java @@ -21,8 +21,8 @@ package org.apache.streams.sysomos.util; import 
org.apache.streams.sysomos.SysomosException; -import com.google.common.base.Strings; import org.apache.commons.io.IOUtils; +import org.apache.commons.lang3.StringUtils; import org.joda.time.format.DateTimeFormat; import org.joda.time.format.DateTimeFormatter; import org.slf4j.Logger; @@ -66,7 +66,7 @@ public class SysomosUtils { writer.flush(); String xmlResponse = writer.toString(); - if (Strings.isNullOrEmpty(xmlResponse)) { + if (StringUtils.isEmpty(xmlResponse)) { throw new SysomosException("XML Response from Sysomos was empty : " + xmlResponse + "\n" http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7810361d/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/providers/EmptyResultSetProvider.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/providers/EmptyResultSetProvider.java b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/providers/EmptyResultSetProvider.java index 571c0fc..b85ad4a 100644 --- a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/providers/EmptyResultSetProvider.java +++ b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/providers/EmptyResultSetProvider.java @@ -19,14 +19,13 @@ package org.apache.streams.local.test.providers; -import org.apache.streams.core.StreamsDatum; import org.apache.streams.core.StreamsProvider; import org.apache.streams.core.StreamsResultSet; -import com.google.common.collect.Queues; import org.joda.time.DateTime; import java.math.BigInteger; +import java.util.concurrent.LinkedBlockingQueue; /** * Provides new, empty instances of result set. 
@@ -45,17 +44,17 @@ public class EmptyResultSetProvider implements StreamsProvider { @Override public StreamsResultSet readCurrent() { - return new StreamsResultSet(Queues.<StreamsDatum>newLinkedBlockingQueue()); + return new StreamsResultSet(new LinkedBlockingQueue<>()); } @Override public StreamsResultSet readNew(BigInteger sequence) { - return new StreamsResultSet(Queues.<StreamsDatum>newLinkedBlockingQueue()); + return new StreamsResultSet(new LinkedBlockingQueue<>()); } @Override public StreamsResultSet readRange(DateTime start, DateTime end) { - return new StreamsResultSet(Queues.<StreamsDatum>newLinkedBlockingQueue()); + return new StreamsResultSet(new LinkedBlockingQueue<>()); } @Override http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7810361d/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/providers/NumericMessageProvider.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/providers/NumericMessageProvider.java b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/providers/NumericMessageProvider.java index 88494a8..21f37ba 100644 --- a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/providers/NumericMessageProvider.java +++ b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/providers/NumericMessageProvider.java @@ -22,12 +22,13 @@ import org.apache.streams.core.StreamsDatum; import org.apache.streams.core.StreamsProvider; import org.apache.streams.core.StreamsResultSet; -import com.google.common.collect.Queues; import org.joda.time.DateTime; import java.math.BigInteger; import java.util.Queue; +import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; /** * Test StreamsProvider that sends out StreamsDatums 
numbered from 0 to numMessages. @@ -57,7 +58,7 @@ public class NumericMessageProvider implements StreamsProvider { @Override public StreamsResultSet readCurrent() { int batchSize = 0; - Queue<StreamsDatum> batch = Queues.newLinkedBlockingQueue(); + Queue<StreamsDatum> batch = new LinkedBlockingQueue<>(); try { while (!this.data.isEmpty() && batchSize < DEFAULT_BATCH_SIZE) { batch.add(this.data.take()); @@ -97,7 +98,7 @@ public class NumericMessageProvider implements StreamsProvider { } private BlockingQueue<StreamsDatum> constructQueue() { - BlockingQueue<StreamsDatum> datums = Queues.newArrayBlockingQueue(numMessages); + BlockingQueue<StreamsDatum> datums = new ArrayBlockingQueue<>(numMessages); for(int i=0;i<numMessages;i++) { datums.add(new StreamsDatum(i)); } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7810361d/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/test/component/FileReaderProvider.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/test/component/FileReaderProvider.java b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/test/component/FileReaderProvider.java index 0fbfae9..632d079 100644 --- a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/test/component/FileReaderProvider.java +++ b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/test/component/FileReaderProvider.java @@ -22,12 +22,12 @@ import org.apache.streams.core.StreamsDatum; import org.apache.streams.core.StreamsProvider; import org.apache.streams.core.StreamsResultSet; -import com.google.common.collect.Queues; import org.joda.time.DateTime; import java.math.BigInteger; import java.util.Queue; import java.util.Scanner; +import java.util.concurrent.LinkedBlockingQueue; /** * FOR TESTING PURPOSES ONLY. 
@@ -91,7 +91,7 @@ public class FileReaderProvider implements StreamsProvider { } private Queue<StreamsDatum> constructQueue(Scanner scanner) { - Queue<StreamsDatum> data = Queues.newLinkedBlockingQueue(); + Queue<StreamsDatum> data = new LinkedBlockingQueue<>(); while(scanner.hasNextLine()) { data.add(converter.convert(scanner.nextLine())); }