isolate isRunning and readCurrent (STREAMS-425)
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/a0fb1937 Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/a0fb1937 Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/a0fb1937 Branch: refs/heads/master Commit: a0fb1937df887b789db7a57bfab1fc99f8db45b8 Parents: 3f80b0c Author: Steve Blackmon @steveblackmon <[email protected]> Authored: Fri Oct 14 15:49:54 2016 -0500 Committer: Steve Blackmon @steveblackmon <[email protected]> Committed: Fri Oct 14 15:49:54 2016 -0500 ---------------------------------------------------------------------- .../gplus/provider/AbstractGPlusProvider.java | 71 ++++++++++++-------- 1 file changed, 43 insertions(+), 28 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/a0fb1937/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/AbstractGPlusProvider.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/AbstractGPlusProvider.java b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/AbstractGPlusProvider.java index 44e1b03..b9e9b2d 100644 --- a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/AbstractGPlusProvider.java +++ b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/AbstractGPlusProvider.java @@ -29,6 +29,10 @@ import com.google.api.services.plus.Plus; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; import com.google.gson.Gson; import com.typesafe.config.Config; import org.apache.streams.config.StreamsConfigurator; @@ -48,6 +52,7 @@ import java.io.File; import java.io.IOException; import java.math.BigInteger; import java.security.GeneralSecurityException; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashSet; @@ -74,7 +79,11 @@ public abstract class AbstractGPlusProvider implements StreamsProvider { private static final Gson GSON = new Gson(); private GPlusConfiguration config; - private ExecutorService executor; + + List<ListenableFuture<Object>> futures = new ArrayList<>(); + + private ListeningExecutorService executor; + private BlockingQueue<StreamsDatum> datumQueue; private BlockingQueue<Runnable> runnables; private AtomicBoolean isComplete; @@ -94,6 +103,28 @@ public abstract class AbstractGPlusProvider implements StreamsProvider { } @Override + public void prepare(Object configurationObject) { + + Preconditions.checkNotNull(config.getOauth().getPathToP12KeyFile()); + Preconditions.checkNotNull(config.getOauth().getAppName()); + Preconditions.checkNotNull(config.getOauth().getServiceAccountEmailAddress()); + + try { + this.plus = createPlusClient(); + } catch (IOException|GeneralSecurityException e) { + LOGGER.error("Failed to created oauth for GPlus : {}", e); + throw new RuntimeException(e); + } + // GPlus rate limits you to 5 calls per second, so there is not a need to execute more than one + // collector unless you have multiple oauth tokens + //TODO make this configurable based on the number of oauth tokens + this.executor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1)); + this.datumQueue = new LinkedBlockingQueue<>(1000); + this.isComplete = new AtomicBoolean(false); + this.previousPullWasEmpty = false; + } + + @Override public void startStream() { BackOffStrategy backOffStrategy = new ExponentialBackOffStrategy(2); @@ -143,33 +174,6 @@ public abstract class AbstractGPlusProvider implements StreamsProvider { return null; } - @Override - public boolean isRunning() { - return !this.isComplete.get(); - } - - @Override - public void prepare(Object configurationObject) { - - Preconditions.checkNotNull(config.getOauth().getPathToP12KeyFile()); - Preconditions.checkNotNull(config.getOauth().getAppName()); - Preconditions.checkNotNull(config.getOauth().getServiceAccountEmailAddress()); - - try { - this.plus = createPlusClient(); - } catch (IOException|GeneralSecurityException e) { - LOGGER.error("Failed to created oauth for GPlus : {}", e); - throw new RuntimeException(e); - } - // GPlus rate limits you to 5 calls per second, so there is not a need to execute more than one - // collector unless you have multiple oauth tokens - //TODO make this configurable based on the number of oauth tokens - this.executor = Executors.newFixedThreadPool(1); - this.datumQueue = new LinkedBlockingQueue<>(1000); - this.isComplete = new AtomicBoolean(false); - this.previousPullWasEmpty = false; - } - @VisibleForTesting protected Plus createPlusClient() throws IOException, GeneralSecurityException { credential = new GoogleCredential.Builder() @@ -243,4 +247,15 @@ public abstract class AbstractGPlusProvider implements StreamsProvider { this.config.setGooglePlusUsers(gPlusUsers); } + @Override + public boolean isRunning() { + if (datumQueue.isEmpty() && executor.isTerminated() && Futures.allAsList(futures).isDone()) { + LOGGER.info("Completed"); + isComplete.set(true); + LOGGER.info("Exiting"); + } + return !isComplete.get(); + } + + }
