isolate isRunning and readCurrent (STREAMS-425)

Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/a0fb1937
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/a0fb1937
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/a0fb1937

Branch: refs/heads/master
Commit: a0fb1937df887b789db7a57bfab1fc99f8db45b8
Parents: 3f80b0c
Author: Steve Blackmon @steveblackmon <[email protected]>
Authored: Fri Oct 14 15:49:54 2016 -0500
Committer: Steve Blackmon @steveblackmon <[email protected]>
Committed: Fri Oct 14 15:49:54 2016 -0500

----------------------------------------------------------------------
 .../gplus/provider/AbstractGPlusProvider.java   | 71 ++++++++++++--------
 1 file changed, 43 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/a0fb1937/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/AbstractGPlusProvider.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/AbstractGPlusProvider.java
 
b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/AbstractGPlusProvider.java
index 44e1b03..b9e9b2d 100644
--- 
a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/AbstractGPlusProvider.java
+++ 
b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/AbstractGPlusProvider.java
@@ -29,6 +29,10 @@ import com.google.api.services.plus.Plus;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
 import com.google.gson.Gson;
 import com.typesafe.config.Config;
 import org.apache.streams.config.StreamsConfigurator;
@@ -48,6 +52,7 @@ import java.io.File;
 import java.io.IOException;
 import java.math.BigInteger;
 import java.security.GeneralSecurityException;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
@@ -74,7 +79,11 @@ public abstract class AbstractGPlusProvider implements 
StreamsProvider {
     private static final Gson GSON = new Gson();
 
     private GPlusConfiguration config;
-    private ExecutorService executor;
+
+    List<ListenableFuture<Object>> futures = new ArrayList<>();
+
+    private ListeningExecutorService executor;
+
     private BlockingQueue<StreamsDatum> datumQueue;
     private BlockingQueue<Runnable> runnables;
     private AtomicBoolean isComplete;
@@ -94,6 +103,28 @@ public abstract class AbstractGPlusProvider implements 
StreamsProvider {
     }
 
     @Override
+    public void prepare(Object configurationObject) {
+
+        Preconditions.checkNotNull(config.getOauth().getPathToP12KeyFile());
+        Preconditions.checkNotNull(config.getOauth().getAppName());
+        
Preconditions.checkNotNull(config.getOauth().getServiceAccountEmailAddress());
+
+        try {
+            this.plus = createPlusClient();
+        } catch (IOException|GeneralSecurityException e) {
+            LOGGER.error("Failed to created oauth for GPlus : {}", e);
+            throw new RuntimeException(e);
+        }
+        // GPlus rate limits you to 5 calls per second, so there is not a need 
to execute more than one
+        // collector unless you have multiple oauth tokens
+        //TODO make this configurable based on the number of oauth tokens
+        this.executor = 
MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1));
+        this.datumQueue = new LinkedBlockingQueue<>(1000);
+        this.isComplete = new AtomicBoolean(false);
+        this.previousPullWasEmpty = false;
+    }
+
+    @Override
     public void startStream() {
 
         BackOffStrategy backOffStrategy = new ExponentialBackOffStrategy(2);
@@ -143,33 +174,6 @@ public abstract class AbstractGPlusProvider implements 
StreamsProvider {
         return null;
     }
 
-    @Override
-    public boolean isRunning() {
-        return !this.isComplete.get();
-    }
-
-    @Override
-    public void prepare(Object configurationObject) {
-
-        Preconditions.checkNotNull(config.getOauth().getPathToP12KeyFile());
-        Preconditions.checkNotNull(config.getOauth().getAppName());
-        
Preconditions.checkNotNull(config.getOauth().getServiceAccountEmailAddress());
-
-        try {
-            this.plus = createPlusClient();
-        } catch (IOException|GeneralSecurityException e) {
-            LOGGER.error("Failed to created oauth for GPlus : {}", e);
-            throw new RuntimeException(e);
-        }
-        // GPlus rate limits you to 5 calls per second, so there is not a need 
to execute more than one
-        // collector unless you have multiple oauth tokens
-        //TODO make this configurable based on the number of oauth tokens
-        this.executor = Executors.newFixedThreadPool(1);
-        this.datumQueue = new LinkedBlockingQueue<>(1000);
-        this.isComplete = new AtomicBoolean(false);
-        this.previousPullWasEmpty = false;
-    }
-
     @VisibleForTesting
     protected Plus createPlusClient() throws IOException, 
GeneralSecurityException {
         credential = new GoogleCredential.Builder()
@@ -243,4 +247,15 @@ public abstract class AbstractGPlusProvider implements 
StreamsProvider {
         this.config.setGooglePlusUsers(gPlusUsers);
     }
 
+    @Override
+    public boolean isRunning() {
+       if (datumQueue.isEmpty() && executor.isTerminated() && 
Futures.allAsList(futures).isDone()) {
+            LOGGER.info("Completed");
+            isComplete.set(true);
+           LOGGER.info("Exiting");
+       }
+       return !isComplete.get();
+    }
+
+
 }

Reply via email to