[FLINK-5141] [runtime] Add 'waitUntilTaskManagerRegistrationsComplete()' to 
MiniCluster


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/82c1fcfa
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/82c1fcfa
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/82c1fcfa

Branch: refs/heads/flip-6
Commit: 82c1fcfa1f34b963f45146830d51b1490b0dc1e3
Parents: c0086b5
Author: Stephan Ewen <[email protected]>
Authored: Wed Nov 30 17:35:47 2016 +0100
Committer: Stephan Ewen <[email protected]>
Committed: Mon Dec 5 02:49:43 2016 +0100

----------------------------------------------------------------------
 .../leaderelection/LeaderAddressAndId.java      | 73 +++++++++++++++++
 .../flink/runtime/minicluster/MiniCluster.java  | 58 ++++++++++++-
 .../minicluster/MiniClusterJobDispatcher.java   |  2 +-
 .../OneTimeLeaderListenerFuture.java            | 60 ++++++++++++++
 .../resourcemanager/ResourceManager.java        | 11 +++
 .../resourcemanager/ResourceManagerGateway.java |  8 ++
 .../runtime/minicluster/MiniClusterITCase.java  |  8 ++
 .../Flip6LocalStreamEnvironment.java            | 23 +++---
 .../LocalStreamEnvironmentITCase.java           | 81 +++++++++++++++++++
 .../flink/core/testutils/CheckedThread.java     | 85 ++++++++++++++++++++
 10 files changed, 392 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/82c1fcfa/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderAddressAndId.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderAddressAndId.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderAddressAndId.java
new file mode 100644
index 0000000..23cd34b
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderAddressAndId.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.leaderelection;
+
+import java.util.UUID;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A combination of a leader address and leader id.
+ */
+public class LeaderAddressAndId {
+
+       private final String leaderAddress;
+       private final UUID leaderId;
+
+       public LeaderAddressAndId(String leaderAddress, UUID leaderId) {
+               this.leaderAddress = checkNotNull(leaderAddress);
+               this.leaderId = checkNotNull(leaderId);
+       }
+
+       // 
------------------------------------------------------------------------
+
+       public String leaderAddress() {
+               return leaderAddress;
+       }
+
+       public UUID leaderId() {
+               return leaderId;
+       }
+
+       // 
------------------------------------------------------------------------
+
+       @Override
+       public int hashCode() {
+               return 31 * leaderAddress.hashCode()+ leaderId.hashCode();
+       }
+
+       @Override
+       public boolean equals(Object o) {
+               if (this == o) {
+                       return true;
+               }
+               else if (o != null && o.getClass() == LeaderAddressAndId.class) 
{
+                       final LeaderAddressAndId that = (LeaderAddressAndId) o;
+                       return this.leaderAddress.equals(that.leaderAddress) && 
this.leaderId.equals(that.leaderId);
+               }
+               else {
+                       return false;
+               }
+       }
+
+       @Override
+       public String toString() {
+               return "LeaderAddressAndId (" + leaderAddress + " / " + 
leaderId + ')';
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/82c1fcfa/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index 3ede5b5..1b9f265 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -27,11 +27,15 @@ import 
org.apache.flink.configuration.UnmodifiableConfiguration;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.leaderelection.LeaderAddressAndId;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerRunner;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
@@ -169,6 +173,7 @@ public class MiniCluster {
                        final boolean singleRpc = 
config.getUseSingleRpcSystem();
 
                        try {
+                               LOG.info("Starting Metrics Registry");
                                metricRegistry = 
createMetricRegistry(configuration);
 
                                RpcService[] jobManagerRpcServices = new 
RpcService[numJobManagers];
@@ -176,10 +181,12 @@ public class MiniCluster {
                                RpcService[] resourceManagerRpcServices = new 
RpcService[numResourceManagers];
 
                                // bring up all the RPC services
-                               if (singleRpc) {
-                                       // one common RPC for all
-                                       commonRpcService = 
createRpcService(configuration, rpcTimeout, false, null);
+                               LOG.info("Starting RPC Service(s)");
+
+                               // we always need the 'commonRpcService' for 
auxiliary calls
+                               commonRpcService = 
createRpcService(configuration, rpcTimeout, false, null);
 
+                               if (singleRpc) {
                                        // set that same RPC service for all 
JobManagers and TaskManagers
                                        for (int i = 0; i < numJobManagers; 
i++) {
                                                jobManagerRpcServices[i] = 
commonRpcService;
@@ -236,7 +243,7 @@ public class MiniCluster {
                                                configuration, haServices, 
metricRegistry, numTaskManagers, taskManagerRpcServices);
 
                                // bring up the dispatcher that launches 
JobManagers when jobs submitted
-                               LOG.info("Starting job dispatcher for {} 
JobManger(s)", numJobManagers);
+                               LOG.info("Starting job dispatcher(s) for {} 
JobManger(s)", numJobManagers);
                                jobDispatcher = new MiniClusterJobDispatcher(
                                                configuration, haServices, 
metricRegistry, numJobManagers, jobManagerRpcServices);
                        }
@@ -357,6 +364,49 @@ public class MiniCluster {
                }
        }
 
+       public void waitUntilTaskManagerRegistrationsComplete() throws 
Exception {
+               LeaderRetrievalService rmMasterListener = null;
+               Future<LeaderAddressAndId> addressAndIdFuture;
+
+               try {
+                       synchronized (lock) {
+                               checkState(running, "FlinkMiniCluster is not 
running");
+
+                               OneTimeLeaderListenerFuture listenerFuture = 
new OneTimeLeaderListenerFuture();
+                               rmMasterListener = 
haServices.getResourceManagerLeaderRetriever();
+                               rmMasterListener.start(listenerFuture);
+                               addressAndIdFuture = listenerFuture.future(); 
+                       }
+
+                       final LeaderAddressAndId addressAndId = 
addressAndIdFuture.get();
+
+                       final ResourceManagerGateway resourceManager = 
+                                       
commonRpcService.connect(addressAndId.leaderAddress(), 
ResourceManagerGateway.class).get();
+
+                       final int numTaskManagersToWaitFor = 
taskManagerRunners.length;
+
+                       // poll and wait until enough TaskManagers are available
+                       while (true) {
+                               int numTaskManagersAvailable = 
+                                               
resourceManager.getNumberOfRegisteredTaskManagers(addressAndId.leaderId()).get();
+
+                               if (numTaskManagersAvailable >= 
numTaskManagersToWaitFor) {
+                                       break;
+                               }
+                               Thread.sleep(2);
+                       }
+               }
+               finally {
+                       try {
+                               if (rmMasterListener != null) {
+                                       rmMasterListener.stop();
+                               }
+                       } catch (Exception e) {
+                               LOG.warn("Error shutting down leader listener 
for ResourceManager");
+                       }
+               }
+       }
+
        // 
------------------------------------------------------------------------
        //  running jobs
        // 
------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/82c1fcfa/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
index 8ac8eba..7fffaee 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
@@ -143,7 +143,7 @@ public class MiniClusterJobDispatcher {
                        if (!shutdown) {
                                shutdown = true;
 
-                               LOG.info("Shutting down the dispatcher");
+                               LOG.info("Shutting down the job dispatcher");
 
                                // in this shutdown code we copy the references 
to the stack first,
                                // to avoid concurrent modification

http://git-wip-us.apache.org/repos/asf/flink/blob/82c1fcfa/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/OneTimeLeaderListenerFuture.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/OneTimeLeaderListenerFuture.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/OneTimeLeaderListenerFuture.java
new file mode 100644
index 0000000..b0157d8
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/OneTimeLeaderListenerFuture.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.minicluster;
+
+import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
+import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
+import org.apache.flink.runtime.leaderelection.LeaderAddressAndId;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
+
+import java.util.UUID;
+
+/**
+ * A leader listener that exposes a future for the first leader notification.  
+ * 
+ * <p>The future can be obtained via the {@link #future()} method.
+ */
+public class OneTimeLeaderListenerFuture implements LeaderRetrievalListener {
+
+       private final FlinkCompletableFuture<LeaderAddressAndId> future;
+
+       public OneTimeLeaderListenerFuture() {
+               this.future = new FlinkCompletableFuture<>();
+       }
+
+       /**
+        * Gets the future that is completed with the leader address and ID. 
+        * @return The future.
+        */
+       public FlinkFuture<LeaderAddressAndId> future() {
+               return future;
+       }
+
+       // 
------------------------------------------------------------------------
+
+       @Override
+       public void notifyLeaderAddress(String leaderAddress, UUID 
leaderSessionID) {
+               future.complete(new LeaderAddressAndId(leaderAddress, 
leaderSessionID));
+       }
+
+       @Override
+       public void handleError(Exception exception) {
+               future.completeExceptionally(exception);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/82c1fcfa/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index 145cc40..76b4a86 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -31,6 +31,7 @@ import org.apache.flink.runtime.concurrent.BiFunction;
 import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.LeaderIdMismatchException;
 import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.jobmaster.JobMasterRegistrationSuccess;
 import org.apache.flink.runtime.leaderelection.LeaderContender;
@@ -502,6 +503,16 @@ public abstract class ResourceManager<WorkerType extends 
Serializable>
                shutDownApplication(finalStatus, optionalDiagnostics);
        }
 
+       @RpcMethod
+       public Integer getNumberOfRegisteredTaskManagers(UUID leaderSessionId) 
throws LeaderIdMismatchException {
+               if (this.leaderSessionId != null && 
this.leaderSessionId.equals(leaderSessionId)) {
+                       return taskExecutors.size();
+               }
+               else {
+                       throw new 
LeaderIdMismatchException(this.leaderSessionId, leaderSessionId);
+               }
+       }
+
        // 
------------------------------------------------------------------------
        //  Testing methods
        // 
------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/82c1fcfa/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
index 0a37bb9..8235ea7 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
@@ -122,4 +122,12 @@ public interface ResourceManagerGateway extends RpcGateway 
{
         * @param optionalDiagnostics
         */
        void shutDownCluster(final ApplicationStatus finalStatus, final String 
optionalDiagnostics);
+
+       /**
+        * Gets the currently registered number of TaskManagers.
+        * 
+        * @param leaderSessionId The leader session ID with which to address 
the ResourceManager.
+        * @return The future to the number of registered TaskManagers.
+        */
+       Future<Integer> getNumberOfRegisteredTaskManagers(UUID leaderSessionId);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/82c1fcfa/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
index 2cf2d4d..d9a1896 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
@@ -33,6 +33,10 @@ import org.junit.Test;
  */
 public class MiniClusterITCase extends TestLogger {
 
+       // 
------------------------------------------------------------------------
+       //  Simple Job Running Tests
+       // 
------------------------------------------------------------------------
+
        @Test
        public void runJobWithSingleRpcService() throws Exception {
                MiniClusterConfiguration cfg = new MiniClusterConfiguration();
@@ -63,6 +67,10 @@ public class MiniClusterITCase extends TestLogger {
                executeJob(miniCluster);
        }
 
+       // 
------------------------------------------------------------------------
+       //  Utilities
+       // 
------------------------------------------------------------------------
+
        private static void executeJob(MiniCluster miniCluster) throws 
Exception {
                miniCluster.start();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/82c1fcfa/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/Flip6LocalStreamEnvironment.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/Flip6LocalStreamEnvironment.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/Flip6LocalStreamEnvironment.java
index a0c128e..2007d35 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/Flip6LocalStreamEnvironment.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/Flip6LocalStreamEnvironment.java
@@ -18,7 +18,6 @@
 package org.apache.flink.streaming.api.environment;
 
 import org.apache.flink.annotation.Public;
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
@@ -30,6 +29,7 @@ import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.minicluster.MiniCluster;
 import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
 import org.apache.flink.streaming.api.graph.StreamGraph;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -67,8 +67,9 @@ public class Flip6LocalStreamEnvironment extends 
StreamExecutionEnvironment {
                                        "The Flip6LocalStreamEnvironment cannot 
be used when submitting a program through a client, " +
                                                        "or running in a 
TestEnvironment context.");
                }
-               
+
                this.conf = config == null ? new Configuration() : config;
+               setParallelism(1);
        }
 
        /**
@@ -85,17 +86,12 @@ public class Flip6LocalStreamEnvironment extends 
StreamExecutionEnvironment {
                StreamGraph streamGraph = getStreamGraph();
                streamGraph.setJobName(jobName);
 
-               JobGraph jobGraph = streamGraph.getJobGraph();
+               // TODO - temp fix to enforce restarts due to a bug in the 
allocation protocol
+               
streamGraph.getExecutionConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE,
 5));
 
+               JobGraph jobGraph = streamGraph.getJobGraph();
                jobGraph.setAllowQueuedScheduling(true);
 
-               // As jira FLINK-5140 described,
-               // we have to set restart strategy to handle 
NoResourceAvailableException.
-               ExecutionConfig executionConfig = new ExecutionConfig();
-               executionConfig.setRestartStrategy(
-                       RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 
1000));
-               jobGraph.setExecutionConfig(executionConfig);
-
                Configuration configuration = new Configuration();
                configuration.addAll(jobGraph.getJobConfiguration());
                
configuration.setLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, -1L);
@@ -105,7 +101,8 @@ public class Flip6LocalStreamEnvironment extends 
StreamExecutionEnvironment {
 
                MiniClusterConfiguration cfg = new 
MiniClusterConfiguration(configuration);
 
-               // Currently we do not reuse slot anymore, so we need to sum 
all parallelism of vertices up.
+               // Currently we do not reuse slot anymore,
+               // so we need to sum up the parallelism of all vertices
                int slotsCount = 0;
                for (JobVertex jobVertex : jobGraph.getVertices()) {
                        slotsCount += jobVertex.getParallelism();
@@ -119,8 +116,10 @@ public class Flip6LocalStreamEnvironment extends 
StreamExecutionEnvironment {
                MiniCluster miniCluster = new MiniCluster(cfg);
                try {
                        miniCluster.start();
+                       miniCluster.waitUntilTaskManagerRegistrationsComplete();
                        return miniCluster.runJobBlocking(jobGraph);
-               } finally {
+               }
+               finally {
                        transformations.clear();
                        miniCluster.shutdown();
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/82c1fcfa/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironmentITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironmentITCase.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironmentITCase.java
new file mode 100644
index 0000000..a360d0e
--- /dev/null
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironmentITCase.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.environment;
+
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.streaming.api.datastream.DataStream;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+@SuppressWarnings("serial")
+public class LocalStreamEnvironmentITCase {
+
+       /**
+        * Test test verifies that the execution environment can be used to 
execute a
+        * single job with multiple slots.
+        */
+       @Test
+       public void testRunIsolatedJob() throws Exception {
+               Flip6LocalStreamEnvironment env = new 
Flip6LocalStreamEnvironment();
+               assertEquals(1, env.getParallelism());
+
+               addSmallBoundedJob(env, 3);
+               env.execute();
+       }
+
+       /**
+        * Test test verifies that the execution environment can be used to 
execute multiple
+        * bounded streaming jobs after one another.
+        */
+       @Test
+       public void testMultipleJobsAfterAnother() throws Exception {
+               Flip6LocalStreamEnvironment env = new 
Flip6LocalStreamEnvironment();
+
+               addSmallBoundedJob(env, 3);
+               env.execute();
+
+               addSmallBoundedJob(env, 5);
+               env.execute();
+       }
+
+       // 
------------------------------------------------------------------------
+
+       private static void addSmallBoundedJob(StreamExecutionEnvironment env, 
int parallelism) {
+               DataStream<Long> stream = env
+                               .generateSequence(1, 100)
+                                       .setParallelism(parallelism)
+                                       .slotSharingGroup("group_1");
+
+               stream
+                               .filter(new FilterFunction<Long>() {
+                                       @Override
+                                       public boolean filter(Long value) {
+                                               return false;
+                                       }
+                               })
+                                       .setParallelism(parallelism)
+                                       .startNewChain()
+                                       .slotSharingGroup("group_2")
+
+                               .print()
+                                       .setParallelism(parallelism);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/82c1fcfa/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CheckedThread.java
----------------------------------------------------------------------
diff --git 
a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CheckedThread.java
 
b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CheckedThread.java
new file mode 100644
index 0000000..aedbb5c
--- /dev/null
+++ 
b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CheckedThread.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.testutils;
+
+/**
+ * A thread that additionally catches exceptions and offers a joining method 
that
+ * re-throws the exceptions.
+ * 
+ * <p>Rather than overriding {@link Thread#run()} (or supplying a {@link 
Runnable}), one
+ * needs to extends this class and implement the {@link #go()} method. That 
method may
+ * throw exceptions.
+ * 
+ * <p>Exception from the {@link #go()} method are caught and re-thrown when 
joining this
+ * thread via the {@link #sync()} method.
+ */
+public abstract class CheckedThread extends Thread {
+
+       /** The error thrown from the main work method */
+       private volatile Throwable error;
+
+       // 
------------------------------------------------------------------------
+
+       /**
+        * This method needs to be overwritten to contain the main work logic.
+        * It takes the role of {@link Thread#run()}, but should propagate 
exceptions.
+        * 
+        * @throws Exception The exceptions thrown here will be re-thrown in 
the {@link #sync()} method.
+        */
+       public abstract void go() throws Exception;
+
+       // 
------------------------------------------------------------------------
+
+       /**
+        * This method is final - thread work should go into the {@link #go()} 
method instead.
+        */
+       @Override
+       public final void run() {
+               try {
+                       go();
+               }
+               catch (Throwable t) {
+                       error = t;
+               }
+       }
+
+       /**
+        * Waits until the thread is completed and checks whether any error 
occurred during
+        * the execution.
+        * 
+        * <p>This method blocks like {@link #join()}, but performs an 
additional check for
+        * exceptions thrown from the {@link #go()} method.
+        */
+       public void sync() throws Exception {
+               super.join();
+
+               // propagate the error
+               if (error != null) {
+                       if (error instanceof Error) {
+                               throw (Error) error;
+                       }
+                       else if (error instanceof Exception) {
+                               throw (Exception) error;
+                       }
+                       else {
+                               throw new Exception(error.getMessage(), error);
+                       }
+               }
+       }
+}

Reply via email to