[FLINK-8811] [flip6] Add initial implementation of the MiniClusterClient

The MiniClusterClient directly talks to the MiniCluster avoiding polling
latencies of th RestClusterClient.

This closes #5600.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f30ca210
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f30ca210
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f30ca210

Branch: refs/heads/release-1.5
Commit: f30ca2101f2151214b138f37f472f886fbdfd9f0
Parents: 02cb9e3
Author: Till Rohrmann <trohrm...@apache.org>
Authored: Wed Feb 28 17:36:39 2018 +0100
Committer: Till Rohrmann <trohrm...@apache.org>
Committed: Fri Mar 2 08:53:48 2018 +0100

----------------------------------------------------------------------
 .../flink/client/program/MiniClusterClient.java | 171 +++++++++++++++++++
 .../flink/runtime/minicluster/MiniCluster.java  |   7 +
 .../flink/test/util/MiniClusterResource.java    |  53 ++++--
 3 files changed, 216 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f30ca210/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java
----------------------------------------------------------------------
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java
 
b/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java
new file mode 100644
index 0000000..5baae5b
--- /dev/null
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.program;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobSubmissionResult;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.client.JobStatusMessage;
+import 
org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalException;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.util.LeaderConnectionInfo;
+import org.apache.flink.runtime.util.LeaderRetrievalUtils;
+import org.apache.flink.util.FlinkException;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.net.URL;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Client to interact with a {@link MiniCluster}.
+ */
+public class MiniClusterClient extends 
ClusterClient<MiniClusterClient.MiniClusterId> {
+
+       private final MiniCluster miniCluster;
+
+       public MiniClusterClient(@Nonnull Configuration configuration, @Nonnull 
MiniCluster miniCluster) throws Exception {
+               super(configuration, miniCluster.getHighAvailabilityServices());
+
+               this.miniCluster = miniCluster;
+       }
+
+       @Override
+       protected JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader 
classLoader) throws ProgramInvocationException {
+               if (isDetached()) {
+                       try {
+                               miniCluster.runDetached(jobGraph);
+                       } catch (JobExecutionException | InterruptedException 
e) {
+                               throw new ProgramInvocationException(
+                                       String.format("Could not run job %s in 
detached mode.", jobGraph.getJobID()),
+                                       e);
+                       }
+
+                       return new JobSubmissionResult(jobGraph.getJobID());
+               } else {
+                       try {
+                               return miniCluster.executeJobBlocking(jobGraph);
+                       } catch (JobExecutionException | InterruptedException 
e) {
+                               throw new ProgramInvocationException(
+                                       String.format("Could not run job %s.", 
jobGraph.getJobID()),
+                                       e);
+                       }
+               }
+       }
+
+       @Override
+       public void cancel(JobID jobId) throws Exception {
+               throw new UnsupportedOperationException("MiniClusterClient does 
not yet support this operation.");
+       }
+
+       @Override
+       public String cancelWithSavepoint(JobID jobId, @Nullable String 
savepointDirectory) throws Exception {
+               throw new UnsupportedOperationException("MiniClusterClient does 
not yet support this operation.");
+       }
+
+       @Override
+       public void stop(JobID jobId) throws Exception {
+               throw new UnsupportedOperationException("MiniClusterClient does 
not yet support this operation.");
+       }
+
+       @Override
+       public CompletableFuture<String> triggerSavepoint(JobID jobId, 
@Nullable String savepointDirectory) throws FlinkException {
+               throw new UnsupportedOperationException("MiniClusterClient does 
not yet support this operation.");
+       }
+
+       @Override
+       public CompletableFuture<Acknowledge> disposeSavepoint(String 
savepointPath, Time timeout) throws FlinkException {
+               throw new UnsupportedOperationException("MiniClusterClient does 
not yet support this operation.");
+       }
+
+       @Override
+       public CompletableFuture<Collection<JobStatusMessage>> listJobs() 
throws Exception {
+               throw new UnsupportedOperationException("MiniClusterClient does 
not yet support this operation.");
+       }
+
+       @Override
+       public Map<String, Object> getAccumulators(JobID jobID) throws 
Exception {
+               throw new UnsupportedOperationException("MiniClusterClient does 
not yet support this operation.");
+       }
+
+       @Override
+       public Map<String, Object> getAccumulators(JobID jobID, ClassLoader 
loader) throws Exception {
+               throw new UnsupportedOperationException("MiniClusterClient does 
not yet support this operation.");
+       }
+
+       @Override
+       public MiniClusterClient.MiniClusterId getClusterId() {
+               return MiniClusterId.INSTANCE;
+       }
+
+       @Override
+       public LeaderConnectionInfo getClusterConnectionInfo() throws 
LeaderRetrievalException {
+               return LeaderRetrievalUtils.retrieveLeaderConnectionInfo(
+                       highAvailabilityServices.getDispatcherLeaderRetriever(),
+                       timeout);
+       }
+
+       // ======================================
+       // Legacy methods
+       // ======================================
+
+       @Override
+       public void waitForClusterToBeReady() {
+               // no op
+       }
+
+       @Override
+       public String getWebInterfaceURL() {
+               return miniCluster.getRestAddress().toString();
+       }
+
+       @Override
+       public GetClusterStatusResponse getClusterStatus() {
+               return null;
+       }
+
+       @Override
+       public List<String> getNewMessages() {
+               return Collections.emptyList();
+       }
+
+       @Override
+       public int getMaxSlots() {
+               return 0;
+       }
+
+       @Override
+       public boolean hasUserJarsInClassPath(List<URL> userJarFiles) {
+               return false;
+       }
+
+       enum MiniClusterId {
+               INSTANCE
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f30ca210/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index 3bd17d1..cbfb266 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -178,6 +178,13 @@ public class MiniCluster implements JobExecutorService, 
AutoCloseableAsync {
                }
        }
 
+       public HighAvailabilityServices getHighAvailabilityServices() {
+               synchronized (lock) {
+                       checkState(running, "MiniCluster is not yet running.");
+                       return haServices;
+               }
+       }
+
        // 
------------------------------------------------------------------------
        //  life cycle
        // 
------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/f30ca210/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
----------------------------------------------------------------------
diff --git 
a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
 
b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
index 1c5da62..a1ce647 100644
--- 
a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
+++ 
b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
@@ -19,15 +19,20 @@
 package org.apache.flink.test.util;
 
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.client.program.MiniClusterClient;
+import org.apache.flink.client.program.StandaloneClusterClient;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.minicluster.JobExecutorService;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.runtime.minicluster.MiniCluster;
 import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
 import org.apache.flink.streaming.util.TestStreamEnvironment;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.Preconditions;
 
@@ -35,8 +40,6 @@ import org.junit.rules.ExternalResource;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.annotation.Nonnull;
-
 import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
@@ -59,6 +62,8 @@ public class MiniClusterResource extends ExternalResource {
 
        private JobExecutorService jobExecutorService;
 
+       private ClusterClient<?> clusterClient;
+
        private int numberSlots = -1;
 
        private TestEnvironment executionEnvironment;
@@ -80,6 +85,10 @@ public class MiniClusterResource extends ExternalResource {
                return numberSlots;
        }
 
+       public ClusterClient<?> getClusterClient() {
+               return clusterClient;
+       }
+
        public TestEnvironment getTestEnvironment() {
                return executionEnvironment;
        }
@@ -87,7 +96,7 @@ public class MiniClusterResource extends ExternalResource {
        @Override
        public void before() throws Exception {
 
-               jobExecutorService = startJobExecutorService(miniClusterType);
+               startJobExecutorService(miniClusterType);
 
                numberSlots = 
miniClusterResourceConfiguration.getNumberSlotsPerTaskManager() * 
miniClusterResourceConfiguration.getNumberTaskManagers();
 
@@ -102,6 +111,16 @@ public class MiniClusterResource extends ExternalResource {
                TestStreamEnvironment.unsetAsContext();
                TestEnvironment.unsetAsContext();
 
+               Exception exception = null;
+
+               try {
+                       clusterClient.shutdown();
+               } catch (Exception e) {
+                       exception = e;
+               }
+
+               clusterClient = null;
+
                final CompletableFuture<?> terminationFuture = 
jobExecutorService.closeAsync();
 
                try {
@@ -109,40 +128,43 @@ public class MiniClusterResource extends ExternalResource 
{
                                
miniClusterResourceConfiguration.getShutdownTimeout().toMilliseconds(),
                                TimeUnit.MILLISECONDS);
                } catch (Exception e) {
-                       LOG.warn("Could not properly shut down the 
MiniClusterResource.", e);
+                       exception = ExceptionUtils.firstOrSuppressed(e, 
exception);
                }
 
                jobExecutorService = null;
+
+               if (exception != null) {
+                       LOG.warn("Could not properly shut down the 
MiniClusterResource.", exception);
+               }
        }
 
-       private JobExecutorService startJobExecutorService(MiniClusterType 
miniClusterType) throws Exception {
-               final JobExecutorService jobExecutorService;
+       private void startJobExecutorService(MiniClusterType miniClusterType) 
throws Exception {
                switch (miniClusterType) {
                        case OLD:
-                               jobExecutorService = startOldMiniCluster();
+                               startOldMiniCluster();
                                break;
                        case FLIP6:
-                               jobExecutorService = startFlip6MiniCluster();
+                               startFlip6MiniCluster();
                                break;
                        default:
                                throw new FlinkRuntimeException("Unknown 
MiniClusterType "  + miniClusterType + '.');
                }
-
-               return jobExecutorService;
        }
 
-       private JobExecutorService startOldMiniCluster() throws Exception {
+       private void startOldMiniCluster() throws Exception {
                final Configuration configuration = new 
Configuration(miniClusterResourceConfiguration.getConfiguration());
                
configuration.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 
miniClusterResourceConfiguration.getNumberTaskManagers());
                
configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 
miniClusterResourceConfiguration.getNumberSlotsPerTaskManager());
 
-               return TestBaseUtils.startCluster(
+               final LocalFlinkMiniCluster flinkMiniCluster = 
TestBaseUtils.startCluster(
                        configuration,
                        true);
+
+               jobExecutorService = flinkMiniCluster;
+               clusterClient = new StandaloneClusterClient(configuration, 
flinkMiniCluster.highAvailabilityServices());
        }
 
-       @Nonnull
-       private JobExecutorService startFlip6MiniCluster() throws Exception {
+       private void startFlip6MiniCluster() throws Exception {
                final Configuration configuration = 
miniClusterResourceConfiguration.getConfiguration();
 
                // we need to set this since a lot of test expect this because 
TestBaseUtils.startCluster()
@@ -165,7 +187,8 @@ public class MiniClusterResource extends ExternalResource {
                // update the port of the rest endpoint
                configuration.setInteger(RestOptions.REST_PORT, 
miniCluster.getRestAddress().getPort());
 
-               return miniCluster;
+               jobExecutorService = miniCluster;
+               clusterClient = new MiniClusterClient(configuration, 
miniCluster);
        }
 
        /**

Reply via email to