Repository: incubator-streams Updated Branches: refs/heads/master 11e3a0f1b -> 48d54c290
wrap up STREAMS-403 and STREAMS-425 Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/48d54c29 Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/48d54c29 Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/48d54c29 Branch: refs/heads/master Commit: 48d54c2902050e2f76f0ea1618da7cc1cadbd574 Parents: 11e3a0f Author: Steve Blackmon @steveblackmon <[email protected]> Authored: Fri Oct 21 11:25:57 2016 -0500 Committer: Steve Blackmon @steveblackmon <[email protected]> Committed: Fri Oct 21 11:25:57 2016 -0500 ---------------------------------------------------------------------- .../provider/InstagramAbstractProvider.java | 42 +++++++++++++++----- 1 file changed, 32 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/48d54c29/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramAbstractProvider.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramAbstractProvider.java b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramAbstractProvider.java index 8bbd900..fe4b8da 100644 --- a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramAbstractProvider.java +++ b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramAbstractProvider.java @@ -16,6 +16,10 @@ package org.apache.streams.instagram.provider; import com.google.common.collect.Queues; import com.google.common.collect.Sets; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; import org.apache.streams.config.StreamsConfigurator; import org.apache.streams.core.StreamsDatum; import org.apache.streams.core.StreamsProvider; @@ -31,12 +35,16 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.math.BigInteger; +import java.util.ArrayList; import java.util.Collection; +import java.util.List; import java.util.Map; import java.util.Queue; import java.util.Set; +import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -53,8 +61,11 @@ public abstract class InstagramAbstractProvider implements StreamsProvider { protected InstagramConfiguration config; private InstagramDataCollector dataCollector; - protected Queue<StreamsDatum> dataQueue; //exposed for testing - private ExecutorService executorService; + protected Queue<StreamsDatum> dataQueue; + private ListeningExecutorService executorService; + + List<ListenableFuture<Object>> futures = new ArrayList<>(); + private AtomicBoolean isCompleted; public InstagramAbstractProvider() { @@ -65,6 +76,12 @@ public abstract class InstagramAbstractProvider implements StreamsProvider { this.config = SerializationUtil.cloneBySerialization(config); } + public static ExecutorService newFixedThreadPoolWithQueueSize(int nThreads, int queueSize) { + return new ThreadPoolExecutor(nThreads, nThreads, + 5000L, TimeUnit.MILLISECONDS, + new ArrayBlockingQueue<Runnable>(queueSize, true), new ThreadPoolExecutor.CallerRunsPolicy()); + } + @Override public String getId() { return STREAMS_ID; @@ -73,8 +90,9 @@ public abstract class InstagramAbstractProvider implements StreamsProvider { @Override public void startStream() { this.dataCollector = getInstagramDataCollector(); - this.executorService = Executors.newSingleThreadExecutor(); - this.executorService.submit(this.dataCollector); + this.executorService = MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor()); + ListenableFuture future = this.executorService.submit(this.dataCollector); + this.futures.add(future); } /** @@ -92,7 +110,6 @@ public abstract class InstagramAbstractProvider implements StreamsProvider { ComponentUtils.offerUntilSuccess(ComponentUtils.pollWhileNotEmpty(this.dataQueue), batch); ++count; } - this.isCompleted.set(batch.size() == 0 && this.dataQueue.isEmpty() && this.dataCollector.isCompleted()); return new StreamsResultSet(batch); } @@ -107,11 +124,6 @@ public abstract class InstagramAbstractProvider implements StreamsProvider { } @Override - public boolean isRunning() { - return !this.isCompleted.get(); - } - - @Override public void prepare(Object configurationObject) { this.dataQueue = Queues.newConcurrentLinkedQueue(); this.isCompleted = new AtomicBoolean(false); @@ -204,4 +216,14 @@ public abstract class InstagramAbstractProvider implements StreamsProvider { return usersInfo; } + @Override + public boolean isRunning() { + if (dataQueue.isEmpty() && executorService.isTerminated() && Futures.allAsList(futures).isDone()) { + LOGGER.info("Completed"); + isCompleted.set(true); + LOGGER.info("Exiting"); + } + return !isCompleted.get(); + } + }
