Switched everything over to ComponentUtils.offerUntilSuccess per @mFranklin's request.
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/ae27541e Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/ae27541e Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/ae27541e Branch: refs/heads/master Commit: ae27541e08674f4db6996e065516b32b8fe0f45d Parents: d1018e9 Author: Matthew Hager <matthew.ha...@gmail.com> Authored: Mon May 5 18:05:09 2014 -0500 Committer: Matthew Hager <matthew.ha...@gmail.com> Committed: Mon May 5 18:05:09 2014 -0500 ---------------------------------------------------------------------- .../org/apache/streams/s3/S3PersistReaderTask.java | 16 ++-------------- .../java/org/apache/streams/s3/S3PersistWriter.java | 1 - .../twitter/provider/TwitterTimelineProvider.java | 14 ++++++-------- .../provider/TwitterUserInformationProvider.java | 3 ++- 4 files changed, 10 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ae27541e/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReaderTask.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReaderTask.java b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReaderTask.java index 9967216..73763e6 100644 --- a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReaderTask.java +++ b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReaderTask.java @@ -20,6 +20,7 @@ package org.apache.streams.s3; import com.google.common.base.Strings; import org.apache.streams.core.DatumStatus; import org.apache.streams.core.StreamsDatum; +import org.apache.streams.util.ComponentUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -54,7 +55,7 @@ public class S3PersistReaderTask implements Runnable { reader.countersCurrent.incrementAttempt(); String[] fields = line.split(Character.toString(reader.DELIMITER)); StreamsDatum entry = new StreamsDatum(fields[3], fields[0]); - write( entry ); + ComponentUtils.offerUntilSuccess(entry, reader.persistQueue); reader.countersCurrent.incrementStatus(DatumStatus.SUCCESS); } } @@ -81,17 +82,4 @@ public class S3PersistReaderTask implements Runnable { LOGGER.error("There was an issue closing file: {}", file); } } - - - private void write( StreamsDatum entry ) { - boolean success; - do { - synchronized( S3PersistReader.class ) { - success = reader.persistQueue.offer(entry); - } - Thread.yield(); - } - while( !success ); - } - } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ae27541e/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriter.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriter.java b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriter.java index 98671ba..058f748 100644 --- a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriter.java +++ b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriter.java @@ -62,7 +62,6 @@ public class S3PersistWriter implements StreamsPersistWriter, DatumStatusCountab }}; private OutputStreamWriter currentWriter = null; - protected volatile Queue<StreamsDatum> persistQueue; public AmazonS3Client getAmazonS3Client() { return this.amazonS3Client; http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ae27541e/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java index 2c39cf9..b456fa4 100644 --- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java +++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java @@ -2,7 +2,6 @@ package org.apache.streams.twitter.provider; import com.google.common.base.Preconditions; import com.google.common.collect.Queues; -import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; import com.typesafe.config.Config; @@ -11,18 +10,21 @@ import org.apache.streams.core.StreamsDatum; import org.apache.streams.core.StreamsProvider; import org.apache.streams.core.StreamsResultSet; import org.apache.streams.twitter.TwitterStreamConfiguration; +import org.apache.streams.util.ComponentUtils; import org.joda.time.DateTime; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import sun.reflect.generics.reflectiveObjects.NotImplementedException; import twitter4j.*; import twitter4j.conf.ConfigurationBuilder; -import twitter4j.json.DataObjectFactory; import java.io.Serializable; import java.math.BigInteger; import java.util.*; -import java.util.concurrent.*; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; /** * Created by sblackmon on 12/10/13. @@ -105,17 +107,13 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable { //while (keepTrying < 10) while (keepTrying < 1) { - try { statuses = client.getUserTimeline(currentId, paging); for (Status tStat : statuses) { String json = TwitterObjectFactory.getRawJSON(tStat); - - while(!providerQueue.offer(new StreamsDatum(json))) { - sleep(); - } + ComponentUtils.offerUntilSuccess(new StreamsDatum(json), providerQueue); } paging.setPage(paging.getPage() + 1); http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ae27541e/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java index 04aa1fe..049c3bb 100644 --- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java +++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java @@ -26,6 +26,7 @@ import org.apache.streams.core.StreamsDatum; import org.apache.streams.core.StreamsProvider; import org.apache.streams.core.StreamsResultSet; import org.apache.streams.twitter.TwitterUserInformationConfiguration; +import org.apache.streams.util.ComponentUtils; import org.joda.time.DateTime; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -122,7 +123,7 @@ public class TwitterUserInformationProvider implements StreamsProvider, Serializ for (User tStat : client.lookupUsers(toQuery)) { String json = DataObjectFactory.getRawJSON(tStat); - providerQueue.offer(new StreamsDatum(json)); + ComponentUtils.offerUntilSuccess(new StreamsDatum(json), providerQueue); } keepTrying = 10; }