[FLINK-7375] Replace ActorGateway with JobManagerGateway in JobClient

In order to make the JobClient code independent of Akka, this PR replaces the
ActorGateway parameters by JobManagerGateway. AkkaJobManagerGateway is the
respective implementation of the JobManagerGateway for Akka. Moreover, this
PR introduces useful ExceptionUtils method for handling of Future exceptions.
Additionally, the SerializedThrowable has been moved to flink-core.

This closes #4486.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/dfaec337
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/dfaec337
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/dfaec337

Branch: refs/heads/master
Commit: dfaec337059cc59800d6c708d03a8194db487872
Parents: d52ccd2
Author: Till Rohrmann <trohrm...@apache.org>
Authored: Sun Aug 6 17:56:41 2017 +0200
Committer: Till Rohrmann <trohrm...@apache.org>
Committed: Thu Aug 10 10:59:10 2017 +0200

----------------------------------------------------------------------
 .../flink/client/program/ClusterClient.java     |  25 +--
 .../apache/flink/client/program/ClientTest.java |   2 +-
 .../org/apache/flink/util/ExceptionUtils.java   |  38 ++++
 .../apache/flink/util/SerializedThrowable.java  | 181 ++++++++++++++++++
 .../webmonitor/handlers/JarRunHandler.java      |  28 +--
 .../BackPressureStatsTrackerITCase.java         |   5 +-
 .../StackTraceSampleCoordinatorITCase.java      |   5 +-
 .../runtime/akka/AkkaJobManagerGateway.java     | 122 ++++++++++++
 .../apache/flink/runtime/client/JobClient.java  | 128 +++++--------
 .../flink/runtime/client/JobClientActor.java    |   2 +-
 .../runtime/client/JobListeningContext.java     |   7 +-
 .../client/JobSubmissionClientActor.java        |  12 +-
 .../messages/FatalErrorOccurred.java            |   2 +-
 .../flink/runtime/concurrent/FutureUtils.java   |  13 ++
 .../runtime/executiongraph/ExecutionGraph.java  |   2 +-
 .../executiongraph/StatusListenerMessenger.java |   2 +-
 .../partition/ProducerFailedException.java      |   2 +-
 .../apache/flink/runtime/jobgraph/JobGraph.java |   4 +-
 .../runtime/jobmaster/JobManagerGateway.java    |  66 +++++++
 .../flink/runtime/jobmaster/JobMaster.java      |   2 +-
 .../messages/checkpoint/DeclineCheckpoint.java  |   2 +-
 .../runtime/taskmanager/TaskExecutionState.java |   2 +-
 .../flink/runtime/util/SerializedThrowable.java | 184 -------------------
 .../flink/runtime/jobmanager/JobManager.scala   |   2 +-
 .../runtime/messages/JobManagerMessages.scala   |   2 +-
 .../runtime/minicluster/FlinkMiniCluster.scala  |   8 +-
 .../partition/ProducerFailedExceptionTest.java  |   2 +-
 .../runtime/util/SerializedThrowableTest.java   |   1 +
 28 files changed, 524 insertions(+), 327 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/dfaec337/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
----------------------------------------------------------------------
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
 
b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
index 3018a8c..7bc2655 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
@@ -35,11 +35,11 @@ import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plan.StreamingPlan;
 import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
 import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
+import org.apache.flink.runtime.akka.AkkaJobManagerGateway;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.client.JobClient;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.client.JobListeningContext;
-import org.apache.flink.runtime.client.JobRetrievalException;
 import 
org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse;
 import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
@@ -500,7 +500,12 @@ public abstract class ClusterClient {
 
                try {
                        logAndSysout("Submitting Job with JobID: " + 
jobGraph.getJobID() + ". Returning after job submission.");
-                       JobClient.submitJobDetached(jobManagerGateway, 
flinkConfig, jobGraph, timeout, classLoader);
+                       JobClient.submitJobDetached(
+                               new AkkaJobManagerGateway(jobManagerGateway),
+                               flinkConfig,
+                               jobGraph,
+                               Time.milliseconds(timeout.toMillis()),
+                               classLoader);
                        return new JobSubmissionResult(jobGraph.getJobID());
                } catch (JobExecutionException e) {
                        throw new ProgramInvocationException("The program 
execution failed: " + e.getMessage(), e);
@@ -525,16 +530,8 @@ public abstract class ClusterClient {
                                fe);
                }
 
-               ActorGateway jobManagerGateway;
-               try {
-                       jobManagerGateway = getJobManagerGateway();
-               } catch (Exception e) {
-                       throw new JobRetrievalException(jobID, "Could not 
retrieve the JobManager Gateway");
-               }
-
                final JobListeningContext listeningContext = 
JobClient.attachToRunningJob(
                        jobID,
-                       jobManagerGateway,
                        flinkConfig,
                        actorSystem,
                        highAvailabilityServices,
@@ -563,16 +560,8 @@ public abstract class ClusterClient {
                                fe);
                }
 
-               ActorGateway jobManagerGateway;
-               try {
-                       jobManagerGateway = getJobManagerGateway();
-               } catch (Exception e) {
-                       throw new JobRetrievalException(jobID, "Could not 
retrieve the JobManager Gateway", e);
-               }
-
                return JobClient.attachToRunningJob(
                        jobID,
-                       jobManagerGateway,
                        flinkConfig,
                        actorSystem,
                        highAvailabilityServices,

http://git-wip-us.apache.org/repos/asf/flink/blob/dfaec337/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
----------------------------------------------------------------------
diff --git 
a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java 
b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
index ba2fc94..99f51ad 100644
--- 
a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
+++ 
b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
@@ -43,8 +43,8 @@ import org.apache.flink.runtime.akka.FlinkUntypedActor;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.jobmaster.JobMaster;
 import org.apache.flink.runtime.messages.JobManagerMessages;
-import org.apache.flink.runtime.util.SerializedThrowable;
 import org.apache.flink.util.NetUtils;
+import org.apache.flink.util.SerializedThrowable;
 import org.apache.flink.util.TestLogger;
 
 import akka.actor.ActorSystem;

http://git-wip-us.apache.org/repos/asf/flink/blob/dfaec337/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java 
b/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
index ca81465..9c8907b 100644
--- a/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
@@ -30,6 +30,7 @@ import javax.annotation.Nullable;
 import java.io.IOException;
 import java.io.PrintWriter;
 import java.io.StringWriter;
+import java.util.concurrent.ExecutionException;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -298,6 +299,43 @@ public final class ExceptionUtils {
                return false;
        }
 
+       /**
+        * Unpacks an {@link ExecutionException} and returns its cause. 
Otherwise the given
+        * Throwable is returned.
+        *
+        * @param throwable to unpack if it is an ExecutionException
+        * @return Cause of ExecutionException or given Throwable
+        */
+       public static Throwable stripExecutionException(Throwable throwable) {
+               while (throwable instanceof ExecutionException && 
throwable.getCause() != null) {
+                       throwable = throwable.getCause();
+               }
+
+               return throwable;
+       }
+
+       /**
+        * Tries to find a {@link SerializedThrowable} as the cause of the 
given throwable and throws its
+        * deserialized value. If there is no such throwable, then the original 
throwable is thrown.
+        *
+        * @param throwable to check for a SerializedThrowable
+        * @param classLoader to be used for the deserialization of the 
SerializedThrowable
+        * @throws Throwable either the deserialized throwable or the given 
throwable
+        */
+       public static void tryDeserializeAndThrow(Throwable throwable, 
ClassLoader classLoader) throws Throwable {
+               Throwable current = throwable;
+
+               while (!(current instanceof SerializedThrowable) && 
current.getCause() != null) {
+                       current = current.getCause();
+               }
+
+               if (current instanceof SerializedThrowable) {
+                       throw ((SerializedThrowable) 
current).deserializeError(classLoader);
+               } else {
+                       throw throwable;
+               }
+       }
+
        // 
------------------------------------------------------------------------
 
        /** Private constructor to prevent instantiation. */

http://git-wip-us.apache.org/repos/asf/flink/blob/dfaec337/flink-core/src/main/java/org/apache/flink/util/SerializedThrowable.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/util/SerializedThrowable.java 
b/flink-core/src/main/java/org/apache/flink/util/SerializedThrowable.java
new file mode 100644
index 0000000..dab7cda
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/util/SerializedThrowable.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+import java.io.PrintStream;
+import java.io.PrintWriter;
+import java.io.Serializable;
+import java.lang.ref.WeakReference;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Utility class for dealing with user-defined Throwable types that are 
serialized (for
+ * example during RPC/Actor communication), but cannot be resolved with the 
default
+ * class loader.
+ * 
+ * <p>This exception mimics the original exception with respect to message and 
stack trace,
+ * and contains the original exception in serialized form. The original 
exception
+ * can be re-obtained by supplying the appropriate class loader.
+ */
+public class SerializedThrowable extends Exception implements Serializable {
+       
+       private static final long serialVersionUID = 7284183123441947635L;
+       
+       /** The original exception in serialized form */
+       private final byte[] serializedException;
+       
+       /** Name of the original error class */
+       private final String originalErrorClassName;
+       
+       /** The original stack trace, to be printed */
+       private final String fullStringifiedStackTrace;
+
+       /** The original exception, not transported via serialization, 
+        * because the class may not be part of the system class loader.
+        * In addition, we make sure our cached references to not prevent
+        * unloading the exception class. */
+       private transient WeakReference<Throwable> cachedException;
+
+
+       /**
+        * Create a new SerializedThrowable.
+        * 
+        * @param exception The exception to serialize.
+        */
+       public SerializedThrowable(Throwable exception) {
+               this(exception, new HashSet<Throwable>());
+       }
+
+       private SerializedThrowable(Throwable exception, Set<Throwable> 
alreadySeen) {
+               super(getMessageOrError(exception));
+
+               if (!(exception instanceof SerializedThrowable)) {
+                       // serialize and memoize the original message
+                       byte[] serialized;
+                       try {
+                               serialized = 
InstantiationUtil.serializeObject(exception);
+                       }
+                       catch (Throwable t) {
+                               serialized = null;
+                       }
+                       this.serializedException = serialized;
+                       this.cachedException = new 
WeakReference<Throwable>(exception);
+
+                       // record the original exception's properties (name, 
stack prints)
+                       this.originalErrorClassName = 
exception.getClass().getName();
+                       this.fullStringifiedStackTrace = 
ExceptionUtils.stringifyException(exception);
+
+                       // mimic the original exception's stack trace
+                       setStackTrace(exception.getStackTrace());
+
+                       // mimic the original exception's cause
+                       if (exception.getCause() == null) {
+                               initCause(null);
+                       }
+                       else {
+                               // exception causes may by cyclic, so we 
truncate the cycle when we find it 
+                               if (alreadySeen.add(exception)) {
+                                       // we are not in a cycle, yet
+                                       initCause(new 
SerializedThrowable(exception.getCause(), alreadySeen));
+                               }
+                       }
+
+               }
+               else {
+                       // copy from that serialized throwable
+                       SerializedThrowable other = (SerializedThrowable) 
exception;
+                       this.serializedException = other.serializedException;
+                       this.originalErrorClassName = 
other.originalErrorClassName;
+                       this.fullStringifiedStackTrace = 
other.fullStringifiedStackTrace;
+                       this.cachedException = other.cachedException;
+                       this.setStackTrace(other.getStackTrace());
+                       this.initCause(other.getCause());
+               }
+       }
+
+       public Throwable deserializeError(ClassLoader classloader) {
+               if (serializedException == null) {
+                       // failed to serialize the original exception
+                       // return this SerializedThrowable as a stand in
+                       return this;
+               }
+
+               Throwable cached = cachedException == null ? null : 
cachedException.get();
+               if (cached == null) {
+                       try {
+                               cached = 
InstantiationUtil.deserializeObject(serializedException, classloader);
+                               cachedException = new 
WeakReference<Throwable>(cached);
+                       }
+                       catch (Throwable t) {
+                               // something went wrong
+                               // return this SerializedThrowable as a stand in
+                               return this;
+                       }
+               }
+               return cached;
+       }
+
+       public String getOriginalErrorClassName() {
+               return originalErrorClassName;
+       }
+
+       // 
------------------------------------------------------------------------
+       //  Override the behavior of Throwable
+       // 
------------------------------------------------------------------------
+
+       @Override
+       public void printStackTrace(PrintStream s) {
+               s.print(fullStringifiedStackTrace);
+               s.flush();
+       }
+       
+       @Override
+       public void printStackTrace(PrintWriter s) {
+               s.print(fullStringifiedStackTrace);
+               s.flush();
+       }
+       
+       @Override
+       public String toString() {
+               String message = getLocalizedMessage();
+               return (message != null) ? (originalErrorClassName + ": " + 
message) : originalErrorClassName;
+       }
+
+       // 
------------------------------------------------------------------------
+       //  Static utilities
+       // 
------------------------------------------------------------------------
+
+       public static Throwable get(Throwable serThrowable, ClassLoader loader) 
{
+               if (serThrowable instanceof SerializedThrowable) {
+                       return 
((SerializedThrowable)serThrowable).deserializeError(loader);
+               } else {
+                       return serThrowable;
+               }
+       }
+
+       private static String getMessageOrError(Throwable error) {
+               try {
+                       return error.getMessage();
+               }
+               catch (Throwable t) {
+                       return "(failed to get message)";
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/dfaec337/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
index 303b180..282fea8 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
@@ -18,9 +18,11 @@
 
 package org.apache.flink.runtime.webmonitor.handlers;
 
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaJobManagerGateway;
 import org.apache.flink.runtime.client.JobClient;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.instance.ActorGateway;
@@ -29,12 +31,8 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
 import com.fasterxml.jackson.core.JsonGenerator;
 
 import java.io.File;
-import java.io.IOException;
 import java.io.StringWriter;
-import java.net.InetSocketAddress;
 import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
 
 import scala.concurrent.duration.FiniteDuration;
 
@@ -65,23 +63,13 @@ public class JarRunHandler extends JarActionHandler {
                        JarActionHandlerConfig config = 
JarActionHandlerConfig.fromParams(pathParams, queryParams);
                        Tuple2<JobGraph, ClassLoader> graph = 
getJobGraphAndClassLoader(config);
 
-                       final CompletableFuture<InetSocketAddress> 
blobServerAddressFuture = JobClient.retrieveBlobServerAddress(jobManager, 
timeout);
-                       final InetSocketAddress blobServerAddress;
-
-                       try {
-                               blobServerAddress = 
blobServerAddressFuture.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
-                       } catch (Exception e) {
-                               throw new ProgramInvocationException("Failed to 
retrieve BlobServer address.", e);
-                       }
-
-                       try {
-                               graph.f0.uploadUserJars(blobServerAddress, 
clientConfig);
-                       } catch (IOException e) {
-                               throw new ProgramInvocationException("Failed to 
upload jar files to the job manager", e);
-                       }
-
                        try {
-                               JobClient.submitJobDetached(jobManager, 
clientConfig, graph.f0, timeout, graph.f1);
+                               JobClient.submitJobDetached(
+                                       new AkkaJobManagerGateway(jobManager),
+                                       clientConfig,
+                                       graph.f0,
+                                       Time.milliseconds(timeout.toMillis()),
+                                       graph.f1);
                        } catch (JobExecutionException e) {
                                throw new ProgramInvocationException("Failed to 
submit the job to the job manager", e);
                        }

http://git-wip-us.apache.org/repos/asf/flink/blob/dfaec337/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java
 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java
index 4d80145..0e4734d 100644
--- 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java
+++ 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.memory.MemoryType;
+import org.apache.flink.runtime.akka.AkkaJobManagerGateway;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.client.JobClient;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
@@ -156,10 +157,10 @@ public class BackPressureStatsTrackerITCase extends 
TestLogger {
 
                                                        // Submit the job and 
wait until it is running
                                                        
JobClient.submitJobDetached(
-                                                                       jm,
+                                                                       new 
AkkaJobManagerGateway(jm),
                                                                        config,
                                                                        
jobGraph,
-                                                                       
deadline,
+                                                                       
Time.milliseconds(deadline.toMillis()),
                                                                        
ClassLoader.getSystemClassLoader());
 
                                                        jm.tell(new 
WaitForAllVerticesToBeRunning(jobGraph.getJobID()), testActor);

http://git-wip-us.apache.org/repos/asf/flink/blob/dfaec337/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java
 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java
index 47b43a5..bd12668 100644
--- 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java
+++ 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.webmonitor;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaJobManagerGateway;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.client.JobClient;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
@@ -131,10 +132,10 @@ public class StackTraceSampleCoordinatorITCase extends 
TestLogger {
                                                        for (int i = 0; i < 
maxAttempts; i++, sleepTime *= 2) {
                                                                // Submit the 
job and wait until it is running
                                                                
JobClient.submitJobDetached(
-                                                                               
jm,
+                                                                               
new AkkaJobManagerGateway(jm),
                                                                                
config,
                                                                                
jobGraph,
-                                                                               
deadline,
+                                                                               
Time.milliseconds(deadline.toMillis()),
                                                                                
ClassLoader.getSystemClassLoader());
 
                                                                jm.tell(new 
WaitForAllVerticesToBeRunning(jobGraph.getJobID()), testActor);

http://git-wip-us.apache.org/repos/asf/flink/blob/dfaec337/flink-runtime/src/main/java/org/apache/flink/runtime/akka/AkkaJobManagerGateway.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/akka/AkkaJobManagerGateway.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/AkkaJobManagerGateway.java
new file mode 100644
index 0000000..6ee78dd
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/AkkaJobManagerGateway.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.akka;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+
+import scala.Option;
+import scala.reflect.ClassTag$;
+
+/**
+ * Implementation of the {@link JobManagerGateway} for the {@link 
ActorGateway}.
+ */
+public class AkkaJobManagerGateway implements JobManagerGateway {
+
+       private final ActorGateway jobManagerGateway;
+       private final String hostname;
+
+       public AkkaJobManagerGateway(ActorGateway jobManagerGateway) {
+               this.jobManagerGateway = 
Preconditions.checkNotNull(jobManagerGateway);
+
+               final Option<String> optHostname = 
jobManagerGateway.actor().path().address().host();
+
+               hostname = optHostname.isDefined() ? optHostname.get() : 
"localhost";
+       }
+
+       @Override
+       public String getAddress() {
+               return jobManagerGateway.path();
+       }
+
+       @Override
+       public String getHostname() {
+               return hostname;
+       }
+
+       @Override
+       public 
CompletableFuture<Optional<JobManagerMessages.ClassloadingProps>> 
requestClassloadingProps(JobID jobId, Time timeout) {
+               return FutureUtils
+                       .toJava(jobManagerGateway
+                               .ask(
+                                       new 
JobManagerMessages.RequestClassloadingProps(jobId),
+                                       FutureUtils.toFiniteDuration(timeout)))
+                       .thenApply(
+                               (Object response) -> {
+                                       if (response instanceof 
JobManagerMessages.ClassloadingProps) {
+                                               return 
Optional.of(((JobManagerMessages.ClassloadingProps) response));
+                                       } else if (response instanceof 
JobManagerMessages.JobNotFound) {
+                                               return Optional.empty();
+                                       } else {
+                                               throw new 
FlinkFutureException("Unknown response: " + response + '.');
+                                       }
+                               });
+       }
+
+       @Override
+       public CompletableFuture<Integer> requestBlobServerPort(Time timeout) {
+               return FutureUtils.toJava(
+                       jobManagerGateway
+                               
.ask(JobManagerMessages.getRequestBlobManagerPort(), 
FutureUtils.toFiniteDuration(timeout))
+                               .mapTo(ClassTag$.MODULE$.apply(Integer.class)));
+       }
+
+       @Override
+       public CompletableFuture<Acknowledge> submitJob(JobGraph jobGraph, 
ListeningBehaviour listeningBehaviour, Time timeout) {
+               return FutureUtils
+                       .toJava(
+                               jobManagerGateway.ask(
+                                       new JobManagerMessages.SubmitJob(
+                                               jobGraph,
+                                               listeningBehaviour),
+                                       FutureUtils.toFiniteDuration(timeout)))
+                       .thenApply(
+                               (Object response) -> {
+                                       if (response instanceof 
JobManagerMessages.JobSubmitSuccess) {
+                                               
JobManagerMessages.JobSubmitSuccess success = 
((JobManagerMessages.JobSubmitSuccess) response);
+
+                                               if 
(Objects.equals(success.jobId(), jobGraph.getJobID())) {
+                                                       return 
Acknowledge.get();
+                                               } else {
+                                                       throw new 
FlinkFutureException("JobManager responded for wrong Job. This Job: " +
+                                                               
jobGraph.getJobID() + ", response: " + success.jobId());
+                                               }
+                                       } else if (response instanceof 
JobManagerMessages.JobResultFailure) {
+                                               
JobManagerMessages.JobResultFailure failure = 
((JobManagerMessages.JobResultFailure) response);
+
+                                               throw new 
FlinkFutureException("Job submission failed.", failure.cause());
+                                       } else {
+                                               throw new 
FlinkFutureException("Unknown response to SubmitJob message: " + response + 
'.');
+                                       }
+                               }
+                       );
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/dfaec337/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
index 01d09a1..562e697 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
@@ -28,19 +28,21 @@ import akka.pattern.Patterns;
 import akka.util.Timeout;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.akka.ListeningBehaviour;
 import org.apache.flink.runtime.blob.BlobCache;
 import org.apache.flink.runtime.blob.BlobKey;
-import org.apache.flink.runtime.concurrent.FutureUtils;
 import 
org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoader;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
+import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.messages.JobClientMessages;
 import org.apache.flink.runtime.messages.JobManagerMessages;
-import org.apache.flink.runtime.util.SerializedThrowable;
+import org.apache.flink.util.SerializedThrowable;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.NetUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -51,13 +53,13 @@ import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.Duration;
 import scala.concurrent.duration.FiniteDuration;
-import scala.reflect.ClassTag$;
 
 import java.io.IOException;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.URL;
 import java.util.Collection;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
@@ -144,7 +146,6 @@ public class JobClient {
         */
        public static JobListeningContext attachToRunningJob(
                        JobID jobID,
-                       ActorGateway jobManagerGateWay,
                        Configuration configuration,
                        ActorSystem actorSystem,
                        HighAvailabilityServices highAvailabilityServices,
@@ -152,7 +153,6 @@ public class JobClient {
                        boolean sysoutLogUpdates) {
 
                checkNotNull(jobID, "The jobID must not be null.");
-               checkNotNull(jobManagerGateWay, "The jobManagerGateWay must not 
be null.");
                checkNotNull(configuration, "The configuration must not be 
null.");
                checkNotNull(actorSystem, "The actorSystem must not be null.");
                checkNotNull(highAvailabilityServices, "The high availability 
services must not be null.");
@@ -193,36 +193,37 @@ public class JobClient {
         * @throws JobRetrievalException if anything goes wrong
         */
        public static ClassLoader retrieveClassLoader(
-               JobID jobID,
-               ActorGateway jobManager,
-               Configuration config,
-               HighAvailabilityServices highAvailabilityServices)
+                       JobID jobID,
+                       JobManagerGateway jobManager,
+                       Configuration config,
+                       HighAvailabilityServices highAvailabilityServices,
+                       Time timeout)
                throws JobRetrievalException {
 
-               final Object jmAnswer;
+               final 
CompletableFuture<Optional<JobManagerMessages.ClassloadingProps>> clPropsFuture 
= jobManager
+                       .requestClassloadingProps(jobID, timeout);
+
+               final Optional<JobManagerMessages.ClassloadingProps> optProps;
+
                try {
-                       jmAnswer = Await.result(
-                               jobManager.ask(
-                                       new 
JobManagerMessages.RequestClassloadingProps(jobID),
-                                       
AkkaUtils.getDefaultTimeoutAsFiniteDuration()),
-                               AkkaUtils.getDefaultTimeoutAsFiniteDuration());
+                       optProps = clPropsFuture.get(timeout.toMilliseconds(), 
TimeUnit.MILLISECONDS);
                } catch (Exception e) {
-                       throw new JobRetrievalException(jobID, "Couldn't 
retrieve class loading properties from JobManager.", e);
+                       throw new JobRetrievalException(jobID, "Could not 
retrieve the class loading properties from JobManager.", e);
                }
 
-               if (jmAnswer instanceof JobManagerMessages.ClassloadingProps) {
-                       JobManagerMessages.ClassloadingProps props = 
((JobManagerMessages.ClassloadingProps) jmAnswer);
+               if (optProps.isPresent()) {
+                       JobManagerMessages.ClassloadingProps props = 
optProps.get();
 
-                       Option<String> jmHost = 
jobManager.actor().path().address().host();
-                       String jmHostname = jmHost.isDefined() ? jmHost.get() : 
"localhost";
-                       InetSocketAddress serverAddress = new 
InetSocketAddress(jmHostname, props.blobManagerPort());
+                       InetSocketAddress serverAddress = new 
InetSocketAddress(jobManager.getHostname(), props.blobManagerPort());
                        final BlobCache blobClient;
                        try {
                                // TODO: Fix lifecycle of BlobCache to properly 
close it upon usage
                                blobClient = new BlobCache(serverAddress, 
config, highAvailabilityServices.createBlobStore());
                        } catch (IOException e) {
-                               throw new JobRetrievalException(jobID,
-                                       "Failed to setup blob cache", e);
+                               throw new JobRetrievalException(
+                                       jobID,
+                                       "Failed to setup BlobCache.",
+                                       e);
                        }
 
                        final Collection<BlobKey> requiredJarFiles = 
props.requiredJarFiles();
@@ -250,10 +251,8 @@ public class JobClient {
                        }
 
                        return new FlinkUserCodeClassLoader(allURLs, 
JobClient.class.getClassLoader());
-               } else if (jmAnswer instanceof JobManagerMessages.JobNotFound) {
-                       throw new JobRetrievalException(jobID, "Couldn't 
retrieve class loader. Job " + jobID + " not found");
                } else {
-                       throw new JobRetrievalException(jobID, "Unknown 
response from JobManager: " + jmAnswer);
+                       throw new JobRetrievalException(jobID, "Couldn't 
retrieve class loader. Job " + jobID + " not found");
                }
        }
 
@@ -407,13 +406,13 @@ public class JobClient {
         * @param jobManagerGateway Gateway to the JobManager which will 
execute the jobs
         * @param config The cluster wide configuration.
         * @param jobGraph The job
-        * @param timeout  Timeout in which the JobManager must have responded.
+        * @param timeout Timeout in which the JobManager must have responded.
         */
        public static void submitJobDetached(
-                       ActorGateway jobManagerGateway,
+                       JobManagerGateway jobManagerGateway,
                        Configuration config,
                        JobGraph jobGraph,
-                       FiniteDuration timeout,
+                       Time timeout,
                        ClassLoader classLoader) throws JobExecutionException {
 
                checkNotNull(jobManagerGateway, "The jobManagerGateway must not 
be null.");
@@ -427,7 +426,7 @@ public class JobClient {
                final InetSocketAddress blobServerAddress;
 
                try {
-                       blobServerAddress = 
blobServerAddressFuture.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
+                       blobServerAddress = 
blobServerAddressFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
                } catch (Exception e) {
                        throw new JobSubmissionException(jobGraph.getJobID(), 
"Could not retrieve BlobServer address.", e);
                }
@@ -440,52 +439,27 @@ public class JobClient {
                                "Could not upload the program's JAR files to 
the JobManager.", e);
                }
 
-               Object result;
+               CompletableFuture<Acknowledge> submissionFuture = 
jobManagerGateway.submitJob(jobGraph, ListeningBehaviour.DETACHED, timeout);
+
                try {
-                       Future<Object> future = jobManagerGateway.ask(
-                               new JobManagerMessages.SubmitJob(
-                                       jobGraph,
-                                       ListeningBehaviour.DETACHED // only 
receive the Acknowledge for the job submission message
-                               ),
-                               timeout);
-
-                       result = Await.result(future, timeout);
-               }
-               catch (TimeoutException e) {
+                       submissionFuture.get(timeout.toMilliseconds(), 
TimeUnit.MILLISECONDS);
+               } catch (TimeoutException e) {
                        throw new JobTimeoutException(jobGraph.getJobID(),
-                                       "JobManager did not respond within " + 
timeout.toString(), e);
-               }
-               catch (Throwable t) {
-                       throw new JobSubmissionException(jobGraph.getJobID(),
-                                       "Failed to send job to JobManager: " + 
t.getMessage(), t.getCause());
-               }
+                               "JobManager did not respond within " + timeout, 
e);
+               } catch (Throwable throwable) {
+                       Throwable stripped = 
ExceptionUtils.stripExecutionException(throwable);
 
-               if (result instanceof JobManagerMessages.JobSubmitSuccess) {
-                       JobID respondedID = 
((JobManagerMessages.JobSubmitSuccess) result).jobId();
-
-                       // validate response
-                       if (!respondedID.equals(jobGraph.getJobID())) {
-                               throw new 
JobExecutionException(jobGraph.getJobID(),
-                                               "JobManager responded for wrong 
Job. This Job: " +
-                                               jobGraph.getJobID() + ", 
response: " + respondedID);
-                       }
-               }
-               else if (result instanceof JobManagerMessages.JobResultFailure) 
{
                        try {
-                               SerializedThrowable t = 
((JobManagerMessages.JobResultFailure) result).cause();
-                               throw t.deserializeError(classLoader);
-                       }
-                       catch (JobExecutionException e) {
-                               throw e;
-                       }
-                       catch (Throwable t) {
-                               throw new 
JobExecutionException(jobGraph.getJobID(),
-                                               "JobSubmission failed: " + 
t.getMessage(), t);
+                               ExceptionUtils.tryDeserializeAndThrow(stripped, 
classLoader);
+                       } catch (JobExecutionException jee) {
+                               throw jee;
+                       } catch (Throwable t) {
+                               throw new JobExecutionException(
+                                       jobGraph.getJobID(),
+                                       "JobSubmission failed.",
+                                       t);
                        }
                }
-               else {
-                       throw new JobExecutionException(jobGraph.getJobID(), 
"Unexpected response from JobManager: " + result);
-               }
        }
 
        /**
@@ -496,16 +470,12 @@ public class JobClient {
         * @return CompletableFuture containing the BlobServer address
         */
        public static CompletableFuture<InetSocketAddress> 
retrieveBlobServerAddress(
-                       ActorGateway jobManagerGateway,
-                       FiniteDuration timeout) {
+                       JobManagerGateway jobManagerGateway,
+                       Time timeout) {
 
-               CompletableFuture<Integer> futureBlobPort = FutureUtils.toJava(
-                       jobManagerGateway
-                               
.ask(JobManagerMessages.getRequestBlobManagerPort(), timeout)
-                               .mapTo(ClassTag$.MODULE$.apply(Integer.class)));
+               CompletableFuture<Integer> futureBlobPort = 
jobManagerGateway.requestBlobServerPort(timeout);
 
-               final Option<String> jmHost = 
jobManagerGateway.actor().path().address().host();
-               final String jmHostname = jmHost.isDefined() ? jmHost.get() : 
"localhost";
+               final String jmHostname = jobManagerGateway.getHostname();
 
                return futureBlobPort.thenApply(
                        (Integer blobPort) -> new InetSocketAddress(jmHostname, 
blobPort));

http://git-wip-us.apache.org/repos/asf/flink/blob/dfaec337/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClientActor.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClientActor.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClientActor.java
index 793041f..ccf8b49 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClientActor.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClientActor.java
@@ -34,7 +34,7 @@ import org.apache.flink.runtime.messages.JobClientMessages;
 import org.apache.flink.runtime.messages.JobClientMessages.JobManagerActorRef;
 import 
org.apache.flink.runtime.messages.JobClientMessages.JobManagerLeaderAddress;
 import org.apache.flink.runtime.messages.JobManagerMessages;
-import org.apache.flink.runtime.util.SerializedThrowable;
+import org.apache.flink.util.SerializedThrowable;
 import org.apache.flink.util.Preconditions;
 import scala.concurrent.duration.FiniteDuration;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/dfaec337/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobListeningContext.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobListeningContext.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobListeningContext.java
index bb448be..eb045c0 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobListeningContext.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobListeningContext.java
@@ -20,7 +20,9 @@ package org.apache.flink.runtime.client;
 import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaJobManagerGateway;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.instance.ActorGateway;
@@ -136,9 +138,10 @@ public final class JobListeningContext {
                        // lazily initializes the class loader when it is needed
                        classLoader = JobClient.retrieveClassLoader(
                                jobID,
-                               getJobManager(),
+                               new AkkaJobManagerGateway(getJobManager()),
                                configuration,
-                               highAvailabilityServices);
+                               highAvailabilityServices,
+                               Time.milliseconds(timeout.toMillis()));
                        LOG.info("Reconstructed class loader for Job {}", 
jobID);
                }
                return classLoader;

http://git-wip-us.apache.org/repos/asf/flink/blob/dfaec337/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobSubmissionClientActor.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobSubmissionClientActor.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobSubmissionClientActor.java
index 7d9f452..4ca6e8b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobSubmissionClientActor.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobSubmissionClientActor.java
@@ -22,8 +22,11 @@ import akka.actor.ActorRef;
 import akka.actor.Props;
 import akka.actor.Status;
 import akka.dispatch.Futures;
+
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaJobManagerGateway;
 import org.apache.flink.runtime.akka.ListeningBehaviour;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.instance.AkkaActorGateway;
@@ -32,7 +35,7 @@ import 
org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.messages.JobClientMessages;
 import org.apache.flink.runtime.messages.JobClientMessages.SubmitJobAndWait;
 import org.apache.flink.runtime.messages.JobManagerMessages;
-import org.apache.flink.runtime.util.SerializedThrowable;
+import org.apache.flink.util.SerializedThrowable;
 import scala.concurrent.duration.FiniteDuration;
 
 import java.io.IOException;
@@ -143,11 +146,14 @@ public class JobSubmissionClientActor extends 
JobClientActor {
                Futures.future(new Callable<Object>() {
                        @Override
                        public Object call() throws Exception {
-                               ActorGateway jobManagerGateway = new 
AkkaActorGateway(jobManager, leaderSessionID);
+                               final ActorGateway jobManagerGateway = new 
AkkaActorGateway(jobManager, leaderSessionID);
+                               final AkkaJobManagerGateway 
akkaJobManagerGateway = new AkkaJobManagerGateway(jobManagerGateway);
 
                                LOG.info("Upload jar files to job manager {}.", 
jobManager.path());
 
-                               final CompletableFuture<InetSocketAddress> 
blobServerAddressFuture = 
JobClient.retrieveBlobServerAddress(jobManagerGateway, timeout);
+                               final CompletableFuture<InetSocketAddress> 
blobServerAddressFuture = JobClient.retrieveBlobServerAddress(
+                                       akkaJobManagerGateway,
+                                       Time.milliseconds(timeout.toMillis()));
                                final InetSocketAddress blobServerAddress;
 
                                try {

http://git-wip-us.apache.org/repos/asf/flink/blob/dfaec337/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/FatalErrorOccurred.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/FatalErrorOccurred.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/FatalErrorOccurred.java
index f91f3fe..ae6bb39 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/FatalErrorOccurred.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/FatalErrorOccurred.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.runtime.clusterframework.messages;
 
-import org.apache.flink.runtime.util.SerializedThrowable;
+import org.apache.flink.util.SerializedThrowable;
 
 import java.io.Serializable;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/dfaec337/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
index e0a9a0b..043c603 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.concurrent;
 
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.util.Preconditions;
 
 import akka.dispatch.OnComplete;
@@ -29,10 +30,12 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionStage;
 import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.BiFunction;
 
 import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -292,6 +295,16 @@ public class FutureUtils {
                return result;
        }
 
+       /**
+        * Converts Flink time into a {@link FiniteDuration}.
+        *
+        * @param time to convert into a FiniteDuration
+        * @return FiniteDuration with the length of the given time
+        */
+       public static FiniteDuration toFiniteDuration(Time time) {
+               return new FiniteDuration(time.toMilliseconds(), 
TimeUnit.MILLISECONDS);
+       }
+
        // 
------------------------------------------------------------------------
        //  Converting futures
        // 
------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/dfaec337/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index ae9b5f1..139f484 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -63,7 +63,7 @@ import 
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableExceptio
 import org.apache.flink.runtime.query.KvStateLocationRegistry;
 import org.apache.flink.runtime.state.StateBackend;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
-import org.apache.flink.runtime.util.SerializedThrowable;
+import org.apache.flink.util.SerializedThrowable;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.FlinkRuntimeException;

http://git-wip-us.apache.org/repos/asf/flink/blob/dfaec337/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/StatusListenerMessenger.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/StatusListenerMessenger.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/StatusListenerMessenger.java
index 01f1e75..ea69c44 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/StatusListenerMessenger.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/StatusListenerMessenger.java
@@ -26,7 +26,7 @@ import org.apache.flink.runtime.instance.AkkaActorGateway;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.messages.ExecutionGraphMessages;
-import org.apache.flink.runtime.util.SerializedThrowable;
+import org.apache.flink.util.SerializedThrowable;
 
 import java.util.UUID;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/dfaec337/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ProducerFailedException.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ProducerFailedException.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ProducerFailedException.java
index 934234d..0ffac2a 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ProducerFailedException.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ProducerFailedException.java
@@ -19,7 +19,7 @@
 package org.apache.flink.runtime.io.network.partition;
 
 import org.apache.flink.runtime.execution.CancelTaskException;
-import org.apache.flink.runtime.util.SerializedThrowable;
+import org.apache.flink.util.SerializedThrowable;
 
 /**
  * Network-stack level Exception to notify remote receiver about a failed

http://git-wip-us.apache.org/repos/asf/flink/blob/dfaec337/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
index f0327a3..1c68515 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
@@ -524,8 +524,8 @@ public class JobGraph implements Serializable {
 
        /**
         * Uploads the previously added user JAR files to the job manager 
through
-        * the job manager's BLOB server. The respective port is retrieved from 
the
-        * JobManager. This function issues a blocking call.
+        * the job manager's BLOB server. The BLOB servers' address is given as 
a
+        * parameter. This function issues a blocking call.
         *
         * @param blobServerAddress of the blob server to upload the jars to
         * @param blobClientConfig the blob client configuration

http://git-wip-us.apache.org/repos/asf/flink/blob/dfaec337/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerGateway.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerGateway.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerGateway.java
new file mode 100644
index 0000000..cba7b06
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerGateway.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.akka.ListeningBehaviour;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.rpc.RpcGateway;
+
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Public JobManager gateway.
+ *
+ * <p>This interface constitutes the operations an external component can
+ * trigger on the JobManager.
+ */
+public interface JobManagerGateway extends RpcGateway {
+
+       /**
+        * Requests the class loading properties for the given JobID.
+        *
+        * @param jobId for which the class loading properties are requested
+        * @param timeout for this operation
+        * @return Future containing the optional class loading properties if 
they could be retrieved from the JobManager.
+        */
+       CompletableFuture<Optional<JobManagerMessages.ClassloadingProps>> 
requestClassloadingProps(JobID jobId, Time timeout);
+
+       /**
+        * Requests the BlobServer port.
+        *
+        * @param timeout for this operation
+        * @return Future containing the BlobServer port
+        */
+       CompletableFuture<Integer> requestBlobServerPort(Time timeout);
+
+       /**
+        * Submits a job to the JobManager.
+        *
+        * @param jobGraph to submit
+        * @param listeningBehaviour of the client
+        * @param timeout for this operation
+        * @return Future containing an Acknowledge message if the submission 
succeeded
+        */
+       CompletableFuture<Acknowledge> submitJob(JobGraph jobGraph, 
ListeningBehaviour listeningBehaviour, Time timeout);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/dfaec337/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index e2e117a..cdae89f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -91,7 +91,7 @@ import 
org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
 import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
-import org.apache.flink.runtime.util.SerializedThrowable;
+import org.apache.flink.util.SerializedThrowable;
 import org.apache.flink.util.InstantiationUtil;
 
 import org.slf4j.Logger;

http://git-wip-us.apache.org/repos/asf/flink/blob/dfaec337/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/DeclineCheckpoint.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/DeclineCheckpoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/DeclineCheckpoint.java
index 830b751..7b0b55c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/DeclineCheckpoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/DeclineCheckpoint.java
@@ -26,7 +26,7 @@ import 
org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineTaskNotCheck
 import 
org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineTaskNotReadyException;
 import org.apache.flink.runtime.checkpoint.decline.InputEndOfStreamException;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.util.SerializedThrowable;
+import org.apache.flink.util.SerializedThrowable;
 
 /**
  * This message is sent from the {@link 
org.apache.flink.runtime.taskmanager.TaskManager} to the

http://git-wip-us.apache.org/repos/asf/flink/blob/dfaec337/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskExecutionState.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskExecutionState.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskExecutionState.java
index 9395435..bd66aa0 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskExecutionState.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskExecutionState.java
@@ -23,7 +23,7 @@ import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.executiongraph.IOMetrics;
-import org.apache.flink.runtime.util.SerializedThrowable;
+import org.apache.flink.util.SerializedThrowable;
 
 import java.io.Serializable;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/dfaec337/flink-runtime/src/main/java/org/apache/flink/runtime/util/SerializedThrowable.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/util/SerializedThrowable.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/SerializedThrowable.java
deleted file mode 100644
index 63f4363..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/util/SerializedThrowable.java
+++ /dev/null
@@ -1,184 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.util;
-
-import org.apache.flink.util.ExceptionUtils;
-import org.apache.flink.util.InstantiationUtil;
-
-import java.io.PrintStream;
-import java.io.PrintWriter;
-import java.io.Serializable;
-import java.lang.ref.WeakReference;
-import java.util.HashSet;
-import java.util.Set;
-
-/**
- * Utility class for dealing with user-defined Throwable types that are 
serialized (for
- * example during RPC/Actor communication), but cannot be resolved with the 
default
- * class loader.
- * 
- * <p>This exception mimics the original exception with respect to message and 
stack trace,
- * and contains the original exception in serialized form. The original 
exception
- * can be re-obtained by supplying the appropriate class loader.
- */
-public class SerializedThrowable extends Exception implements Serializable {
-       
-       private static final long serialVersionUID = 7284183123441947635L;
-       
-       /** The original exception in serialized form */
-       private final byte[] serializedException;
-       
-       /** Name of the original error class */
-       private final String originalErrorClassName;
-       
-       /** The original stack trace, to be printed */
-       private final String fullStringifiedStackTrace;
-
-       /** The original exception, not transported via serialization, 
-        * because the class may not be part of the system class loader.
-        * In addition, we make sure our cached references to not prevent
-        * unloading the exception class. */
-       private transient WeakReference<Throwable> cachedException;
-
-
-       /**
-        * Create a new SerializedThrowable.
-        * 
-        * @param exception The exception to serialize.
-        */
-       public SerializedThrowable(Throwable exception) {
-               this(exception, new HashSet<Throwable>());
-       }
-
-       private SerializedThrowable(Throwable exception, Set<Throwable> 
alreadySeen) {
-               super(getMessageOrError(exception));
-
-               if (!(exception instanceof SerializedThrowable)) {
-                       // serialize and memoize the original message
-                       byte[] serialized;
-                       try {
-                               serialized = 
InstantiationUtil.serializeObject(exception);
-                       }
-                       catch (Throwable t) {
-                               serialized = null;
-                       }
-                       this.serializedException = serialized;
-                       this.cachedException = new 
WeakReference<Throwable>(exception);
-
-                       // record the original exception's properties (name, 
stack prints)
-                       this.originalErrorClassName = 
exception.getClass().getName();
-                       this.fullStringifiedStackTrace = 
ExceptionUtils.stringifyException(exception);
-
-                       // mimic the original exception's stack trace
-                       setStackTrace(exception.getStackTrace());
-
-                       // mimic the original exception's cause
-                       if (exception.getCause() == null) {
-                               initCause(null);
-                       }
-                       else {
-                               // exception causes may by cyclic, so we 
truncate the cycle when we find it 
-                               if (alreadySeen.add(exception)) {
-                                       // we are not in a cycle, yet
-                                       initCause(new 
SerializedThrowable(exception.getCause(), alreadySeen));
-                               }
-                       }
-
-               }
-               else {
-                       // copy from that serialized throwable
-                       SerializedThrowable other = (SerializedThrowable) 
exception;
-                       this.serializedException = other.serializedException;
-                       this.originalErrorClassName = 
other.originalErrorClassName;
-                       this.fullStringifiedStackTrace = 
other.fullStringifiedStackTrace;
-                       this.cachedException = other.cachedException;
-                       this.setStackTrace(other.getStackTrace());
-                       this.initCause(other.getCause());
-               }
-       }
-
-       public Throwable deserializeError(ClassLoader classloader) {
-               if (serializedException == null) {
-                       // failed to serialize the original exception
-                       // return this SerializedThrowable as a stand in
-                       return this;
-               }
-
-               Throwable cached = cachedException == null ? null : 
cachedException.get();
-               if (cached == null) {
-                       try {
-                               cached = 
InstantiationUtil.deserializeObject(serializedException, classloader);
-                               cachedException = new 
WeakReference<Throwable>(cached);
-                       }
-                       catch (Throwable t) {
-                               // something went wrong
-                               // return this SerializedThrowable as a stand in
-                               return this;
-                       }
-               }
-               return cached;
-       }
-
-       public String getOriginalErrorClassName() {
-               return originalErrorClassName;
-       }
-
-       // 
------------------------------------------------------------------------
-       //  Override the behavior of Throwable
-       // 
------------------------------------------------------------------------
-
-       @Override
-       public void printStackTrace(PrintStream s) {
-               s.print(fullStringifiedStackTrace);
-               s.flush();
-       }
-       
-       @Override
-       public void printStackTrace(PrintWriter s) {
-               s.print(fullStringifiedStackTrace);
-               s.flush();
-       }
-       
-       @Override
-       public String toString() {
-               String message = getLocalizedMessage();
-               return (message != null) ? (originalErrorClassName + ": " + 
message) : originalErrorClassName;
-       }
-
-       // 
------------------------------------------------------------------------
-       //  Static utilities
-       // 
------------------------------------------------------------------------
-
-       public static Throwable get(Throwable serThrowable, ClassLoader loader) 
{
-               if (serThrowable instanceof SerializedThrowable) {
-                       return 
((SerializedThrowable)serThrowable).deserializeError(loader);
-               } else {
-                       return serThrowable;
-               }
-       }
-
-       private static String getMessageOrError(Throwable error) {
-               try {
-                       return error.getMessage();
-               }
-               catch (Throwable t) {
-                       return "(failed to get message)";
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/dfaec337/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 80fa506..fc668d6 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -84,7 +84,7 @@ import org.apache.flink.runtime.taskmanager.TaskManager
 import org.apache.flink.runtime.util._
 import org.apache.flink.runtime.webmonitor.{WebMonitor, WebMonitorUtils}
 import org.apache.flink.runtime.{FlinkActor, LeaderSessionMessageFilter, 
LogMessages}
-import org.apache.flink.util.{InstantiationUtil, NetUtils}
+import org.apache.flink.util.{InstantiationUtil, NetUtils, SerializedThrowable}
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable

http://git-wip-us.apache.org/repos/asf/flink/blob/dfaec337/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
index 4db2584..52f3777 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
@@ -32,7 +32,7 @@ import 
org.apache.flink.runtime.io.network.partition.ResultPartitionID
 import org.apache.flink.runtime.jobgraph.{IntermediateDataSetID, JobGraph, 
JobStatus, JobVertexID}
 import org.apache.flink.runtime.jobmanager.SubmittedJobGraph
 import org.apache.flink.runtime.messages.checkpoint.AbstractCheckpointMessage
-import org.apache.flink.runtime.util.SerializedThrowable
+import org.apache.flink.util.SerializedThrowable
 
 import scala.collection.JavaConverters._
 

http://git-wip-us.apache.org/repos/asf/flink/blob/dfaec337/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
index f0a96ca..c5c87ac 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
@@ -26,10 +26,11 @@ import akka.pattern.Patterns.gracefulStop
 import akka.pattern.ask
 import akka.actor.{ActorRef, ActorSystem}
 import com.typesafe.config.Config
+import org.apache.flink.api.common.time.Time
 import org.apache.flink.api.common.{JobExecutionResult, JobID, 
JobSubmissionResult}
 import org.apache.flink.configuration.{AkkaOptions, ConfigConstants, 
Configuration, JobManagerOptions, TaskManagerOptions}
 import org.apache.flink.core.fs.Path
-import org.apache.flink.runtime.akka.AkkaUtils
+import org.apache.flink.runtime.akka.{AkkaJobManagerGateway, AkkaUtils}
 import org.apache.flink.runtime.client.{JobClient, JobExecutionException}
 import org.apache.flink.runtime.concurrent.{Executors => FlinkExecutors}
 import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoader
@@ -582,10 +583,11 @@ abstract class FlinkMiniCluster(
           e)
       }
 
-    JobClient.submitJobDetached(jobManagerGateway,
+    JobClient.submitJobDetached(
+      new AkkaJobManagerGateway(jobManagerGateway),
       configuration,
       jobGraph,
-      timeout,
+      Time.milliseconds(timeout.toMillis),
       userCodeClassLoader)
 
     new JobSubmissionResult(jobGraph.getJobID)

http://git-wip-us.apache.org/repos/asf/flink/blob/dfaec337/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ProducerFailedExceptionTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ProducerFailedExceptionTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ProducerFailedExceptionTest.java
index ca2de0c..6bff0f6 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ProducerFailedExceptionTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ProducerFailedExceptionTest.java
@@ -19,7 +19,7 @@
 package org.apache.flink.runtime.io.network.partition;
 
 import org.apache.flink.runtime.execution.CancelTaskException;
-import org.apache.flink.runtime.util.SerializedThrowable;
+import org.apache.flink.util.SerializedThrowable;
 
 import org.junit.Test;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/dfaec337/flink-runtime/src/test/java/org/apache/flink/runtime/util/SerializedThrowableTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/util/SerializedThrowableTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/util/SerializedThrowableTest.java
index 45988f5..0f309b6 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/util/SerializedThrowableTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/util/SerializedThrowableTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.core.memory.MemoryUtils;
 import org.apache.flink.core.testutils.CommonTestUtils;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.SerializedThrowable;
 
 import org.junit.Test;
 

Reply via email to