Repository: flink
Updated Branches:
  refs/heads/master e0d8c147e -> dfaec3370


[FLINK-7372] [JobGraph] Remove ActorGateway from JobGraph

The JobGraph has an unnecessary dependency on the ActorGateway via its
JobGraph#uploadUserJars method. In order to get rid of this dependency
for future Flip-6 changes, this commit retrieves the BlobServer's
address beforehand and directly passes it to this method.

This closes #4483.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d52ccd29
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d52ccd29
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d52ccd29

Branch: refs/heads/master
Commit: d52ccd2941ff25c3c61146b25c52df1ddc09d8da
Parents: e0d8c14
Author: Till Rohrmann <trohrm...@apache.org>
Authored: Sat Aug 5 00:28:15 2017 +0200
Committer: Till Rohrmann <trohrm...@apache.org>
Committed: Thu Aug 10 10:56:48 2017 +0200

----------------------------------------------------------------------
 .../apache/flink/client/program/ClientTest.java |  5 +-
 .../webmonitor/handlers/JarRunHandler.java      | 15 ++++-
 .../apache/flink/runtime/blob/BlobClient.java   | 56 ++---------------
 .../apache/flink/runtime/client/JobClient.java  | 39 +++++++++++-
 .../client/JobSubmissionClientActor.java        | 30 +++++++++-
 .../apache/flink/runtime/jobgraph/JobGraph.java | 63 +++-----------------
 .../runtime/client/JobClientActorTest.java      |  7 ++-
 7 files changed, 101 insertions(+), 114 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d52ccd29/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
----------------------------------------------------------------------
diff --git 
a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java 
b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
index 97794dd..ba2fc94 100644
--- 
a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
+++ 
b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
@@ -322,8 +322,9 @@ public class ClientTest extends TestLogger {
                                getSender().tell(
                                                decorateMessage(new 
JobManagerMessages.ResponseLeaderSessionID(leaderSessionID)),
                                                getSelf());
-                       }
-                       else {
+                       } else if (message instanceof 
JobManagerMessages.RequestBlobManagerPort$) {
+                               getSender().tell(1337, getSelf());
+                       } else {
                                getSender().tell(
                                                decorateMessage(new 
Status.Failure(new Exception("Unknown message " + message))),
                                                getSelf());

http://git-wip-us.apache.org/repos/asf/flink/blob/d52ccd29/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
index 9a7cabe..303b180 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
@@ -31,7 +31,10 @@ import com.fasterxml.jackson.core.JsonGenerator;
 import java.io.File;
 import java.io.IOException;
 import java.io.StringWriter;
+import java.net.InetSocketAddress;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
 
 import scala.concurrent.duration.FiniteDuration;
 
@@ -61,8 +64,18 @@ public class JarRunHandler extends JarActionHandler {
                try {
                        JarActionHandlerConfig config = 
JarActionHandlerConfig.fromParams(pathParams, queryParams);
                        Tuple2<JobGraph, ClassLoader> graph = 
getJobGraphAndClassLoader(config);
+
+                       final CompletableFuture<InetSocketAddress> 
blobServerAddressFuture = JobClient.retrieveBlobServerAddress(jobManager, 
timeout);
+                       final InetSocketAddress blobServerAddress;
+
+                       try {
+                               blobServerAddress = 
blobServerAddressFuture.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
+                       } catch (Exception e) {
+                               throw new ProgramInvocationException("Failed to 
retrieve BlobServer address.", e);
+                       }
+
                        try {
-                               graph.f0.uploadUserJars(jobManager, timeout, 
clientConfig);
+                               graph.f0.uploadUserJars(blobServerAddress, 
clientConfig);
                        } catch (IOException e) {
                                throw new ProgramInvocationException("Failed to 
upload jar files to the job manager", e);
                        }

http://git-wip-us.apache.org/repos/asf/flink/blob/d52ccd29/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java
index ce59d75..0882ec3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java
@@ -24,16 +24,10 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.runtime.net.SSLUtils;
 import org.apache.flink.util.InstantiationUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import scala.Option;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
 
 import javax.net.ssl.SSLContext;
 import javax.net.ssl.SSLParameters;
@@ -484,50 +478,6 @@ public final class BlobClient implements Closeable {
        }
 
        /**
-        * Retrieves the {@link BlobServer} address from the JobManager and 
uploads
-        * the JAR files to it.
-        *
-        * @param jobManager   Server address of the {@link BlobServer}
-        * @param askTimeout   Ask timeout for blob server address retrieval
-        * @param clientConfig Any additional configuration for the blob client
-        * @param jars         List of JAR files to upload
-        * @throws IOException Thrown if the address retrieval or upload fails
-        */
-       public static List<BlobKey> uploadJarFiles(
-                       ActorGateway jobManager,
-                       FiniteDuration askTimeout,
-                       Configuration clientConfig,
-                       List<Path> jars) throws IOException {
-
-               if (jars.isEmpty()) {
-                       return Collections.emptyList();
-               } else {
-                       Object msg = 
JobManagerMessages.getRequestBlobManagerPort();
-                       Future<Object> futureBlobPort = jobManager.ask(msg, 
askTimeout);
-
-                       try {
-                               // Retrieve address
-                               Object result = Await.result(futureBlobPort, 
askTimeout);
-                               if (result instanceof Integer) {
-                                       int port = (Integer) result;
-                                       LOG.info("Blob client connecting to " + 
jobManager.path());
-
-                                       Option<String> jmHost = 
jobManager.actor().path().address().host();
-                                       String jmHostname = jmHost.isDefined() 
? jmHost.get() : "localhost";
-                                       InetSocketAddress serverAddress = new 
InetSocketAddress(jmHostname, port);
-
-                                       // Now, upload
-                                       return uploadJarFiles(serverAddress, 
clientConfig, jars);
-                               } else {
-                                       throw new Exception("Expected port 
number (int) as answer, received " + result);
-                               }
-                       } catch (Exception e) {
-                               throw new IOException("Could not retrieve the 
JobManager's blob port.", e);
-                       }
-               }
-       }
-
-       /**
         * Uploads the JAR files to a {@link BlobServer} at the given address.
         *
         * @param serverAddress Server address of the {@link BlobServer}
@@ -535,8 +485,10 @@ public final class BlobClient implements Closeable {
         * @param jars List of JAR files to upload
         * @throws IOException Thrown if the upload fails
         */
-       public static List<BlobKey> uploadJarFiles(InetSocketAddress 
serverAddress,
-                       Configuration clientConfig, List<Path> jars) throws 
IOException {
+       public static List<BlobKey> uploadJarFiles(
+                       InetSocketAddress serverAddress,
+                       Configuration clientConfig,
+                       List<Path> jars) throws IOException {
                if (jars.isEmpty()) {
                        return Collections.emptyList();
                } else {

http://git-wip-us.apache.org/repos/asf/flink/blob/d52ccd29/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
index 6a49564..01d09a1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
@@ -33,6 +33,7 @@ import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.akka.ListeningBehaviour;
 import org.apache.flink.runtime.blob.BlobCache;
 import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.concurrent.FutureUtils;
 import 
org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoader;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.instance.ActorGateway;
@@ -50,12 +51,15 @@ import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.Duration;
 import scala.concurrent.duration.FiniteDuration;
+import scala.reflect.ClassTag$;
 
 import java.io.IOException;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.URL;
 import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -417,8 +421,19 @@ public class JobClient {
                checkNotNull(timeout, "The timeout must not be null.");
 
                LOG.info("Checking and uploading JAR files");
+
+               final CompletableFuture<InetSocketAddress> 
blobServerAddressFuture = retrieveBlobServerAddress(jobManagerGateway, timeout);
+
+               final InetSocketAddress blobServerAddress;
+
+               try {
+                       blobServerAddress = 
blobServerAddressFuture.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
+               } catch (Exception e) {
+                       throw new JobSubmissionException(jobGraph.getJobID(), 
"Could not retrieve BlobServer address.", e);
+               }
+
                try {
-                       jobGraph.uploadUserJars(jobManagerGateway, timeout, 
config);
+                       jobGraph.uploadUserJars(blobServerAddress, config);
                }
                catch (IOException e) {
                        throw new JobSubmissionException(jobGraph.getJobID(),
@@ -473,4 +488,26 @@ public class JobClient {
                }
        }
 
+       /**
+        * Utility method to retrieve the BlobServer address from the given 
JobManager gateway.
+        *
+        * @param jobManagerGateway to obtain the BlobServer address from
+        * @param timeout for this operation
+        * @return CompletableFuture containing the BlobServer address
+        */
+       public static CompletableFuture<InetSocketAddress> 
retrieveBlobServerAddress(
+                       ActorGateway jobManagerGateway,
+                       FiniteDuration timeout) {
+
+               CompletableFuture<Integer> futureBlobPort = FutureUtils.toJava(
+                       jobManagerGateway
+                               
.ask(JobManagerMessages.getRequestBlobManagerPort(), timeout)
+                               .mapTo(ClassTag$.MODULE$.apply(Integer.class)));
+
+               final Option<String> jmHost = 
jobManagerGateway.actor().path().address().host();
+               final String jmHostname = jmHost.isDefined() ? jmHost.get() : 
"localhost";
+
+               return futureBlobPort.thenApply(
+                       (Integer blobPort) -> new InetSocketAddress(jmHostname, 
blobPort));
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d52ccd29/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobSubmissionClientActor.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobSubmissionClientActor.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobSubmissionClientActor.java
index babb0f6..7d9f452 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobSubmissionClientActor.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobSubmissionClientActor.java
@@ -36,7 +36,10 @@ import org.apache.flink.runtime.util.SerializedThrowable;
 import scala.concurrent.duration.FiniteDuration;
 
 import java.io.IOException;
+import java.net.InetSocketAddress;
 import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
 
 
 /**
@@ -144,8 +147,28 @@ public class JobSubmissionClientActor extends 
JobClientActor {
 
                                LOG.info("Upload jar files to job manager {}.", 
jobManager.path());
 
+                               final CompletableFuture<InetSocketAddress> 
blobServerAddressFuture = 
JobClient.retrieveBlobServerAddress(jobManagerGateway, timeout);
+                               final InetSocketAddress blobServerAddress;
+
                                try {
-                                       
jobGraph.uploadUserJars(jobManagerGateway, timeout, clientConfig);
+                                       blobServerAddress = 
blobServerAddressFuture.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
+                               } catch (Exception e) {
+                                       getSelf().tell(
+                                               decorateMessage(new 
JobManagerMessages.JobResultFailure(
+                                                       new SerializedThrowable(
+                                                               new 
JobSubmissionException(
+                                                                       
jobGraph.getJobID(),
+                                                                       "Could 
not retrieve BlobServer address.",
+                                                                       e)
+                                                       )
+                                               )),
+                                               ActorRef.noSender());
+
+                                       return null;
+                               }
+
+                               try {
+                                       
jobGraph.uploadUserJars(blobServerAddress, clientConfig);
                                } catch (IOException exception) {
                                        getSelf().tell(
                                                decorateMessage(new 
JobManagerMessages.JobResultFailure(
@@ -156,8 +179,9 @@ public class JobSubmissionClientActor extends 
JobClientActor {
                                                                        
exception)
                                                        )
                                                )),
-                                               ActorRef.noSender()
-                                       );
+                                               ActorRef.noSender());
+
+                                       return null;
                                }
 
                                LOG.info("Submit job to the job manager {}.", 
jobManager.path());

http://git-wip-us.apache.org/repos/asf/flink/blob/d52ccd29/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
index 2f5cd25..f0327a3 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
@@ -22,15 +22,11 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.fs.FSDataInputStream;
-import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.blob.BlobClient;
 import org.apache.flink.runtime.blob.BlobKey;
-import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
 import org.apache.flink.util.SerializedValue;
-import scala.concurrent.duration.FiniteDuration;
 
 import java.io.IOException;
 import java.io.Serializable;
@@ -527,65 +523,24 @@ public class JobGraph implements Serializable {
        }
 
        /**
-        * Uploads the previously added user jar file to the job manager 
through the job manager's BLOB server.
-        *
-        * @param serverAddress
-        *        the network address of the BLOB server
-        * @param blobClientConfig
-        *        the blob client configuration
-        * @throws IOException
-        *         thrown if an I/O error occurs during the upload
-        */
-       public void uploadRequiredJarFiles(InetSocketAddress serverAddress,
-                       Configuration blobClientConfig) throws IOException {
-               if (this.userJars.isEmpty()) {
-                       return;
-               }
-
-               BlobClient bc = null;
-               try {
-                       bc = new BlobClient(serverAddress, blobClientConfig);
-
-                       for (final Path jar : this.userJars) {
-
-                               final FileSystem fs = jar.getFileSystem();
-                               FSDataInputStream is = null;
-                               try {
-                                       is = fs.open(jar);
-                                       final BlobKey key = bc.put(is);
-                                       this.userJarBlobKeys.add(key);
-                               }
-                               finally {
-                                       if (is != null) {
-                                               is.close();
-                                       }
-                               }
-                       }
-               }
-               finally {
-                       if (bc != null) {
-                               bc.close();
-                       }
-               }
-       }
-
-       /**
         * Uploads the previously added user JAR files to the job manager 
through
         * the job manager's BLOB server. The respective port is retrieved from 
the
         * JobManager. This function issues a blocking call.
         *
-        * @param jobManager JobManager actor gateway
-        * @param askTimeout Ask timeout
+        * @param blobServerAddress of the blob server to upload the jars to
         * @param blobClientConfig the blob client configuration
         * @throws IOException Thrown, if the file upload to the JobManager 
failed.
         */
-       public void uploadUserJars(ActorGateway jobManager, FiniteDuration 
askTimeout,
+       public void uploadUserJars(
+                       InetSocketAddress blobServerAddress,
                        Configuration blobClientConfig) throws IOException {
-               List<BlobKey> blobKeys = BlobClient.uploadJarFiles(jobManager, 
askTimeout, blobClientConfig, userJars);
+               if (!userJars.isEmpty()) {
+                       List<BlobKey> blobKeys = 
BlobClient.uploadJarFiles(blobServerAddress, blobClientConfig, userJars);
 
-               for (BlobKey blobKey : blobKeys) {
-                       if (!userJarBlobKeys.contains(blobKey)) {
-                               userJarBlobKeys.add(blobKey);
+                       for (BlobKey blobKey : blobKeys) {
+                               if (!userJarBlobKeys.contains(blobKey)) {
+                                       userJarBlobKeys.add(blobKey);
+                               }
                        }
                }
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/d52ccd29/flink-runtime/src/test/java/org/apache/flink/runtime/client/JobClientActorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/client/JobClientActorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/client/JobClientActorTest.java
index 8530b0f..919a784 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/client/JobClientActorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/client/JobClientActorTest.java
@@ -347,6 +347,9 @@ public class JobClientActorTest extends TestLogger {
 
                @Override
                protected void handleMessage(Object message) throws Exception {
+                       if (message instanceof RequestBlobManagerPort$) {
+                               getSender().tell(1337, getSelf());
+                       }
                }
 
                @Override
@@ -388,7 +391,9 @@ public class JobClientActorTest extends TestLogger {
                                        testFuture.tell(Acknowledge.get(), 
getSelf());
                                }
                        }
-                       else if (message instanceof RegisterTest) {
+                       else if (message instanceof RequestBlobManagerPort$) {
+                               getSender().tell(1337, getSelf());
+                       } else if (message instanceof RegisterTest) {
                                testFuture = getSender();
 
                                if (jobAccepted) {

Reply via email to