http://git-wip-us.apache.org/repos/asf/flink/blob/f9b52a31/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
deleted file mode 100644
index dcf542a..0000000
--- a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
+++ /dev/null
@@ -1,624 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.client.program;
-
-import java.io.IOException;
-import java.net.URISyntaxException;
-import java.net.URL;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.JobSubmissionResult;
-import org.apache.flink.api.common.accumulators.AccumulatorHelper;
-import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.optimizer.CompilerException;
-import org.apache.flink.optimizer.DataStatistics;
-import org.apache.flink.optimizer.Optimizer;
-import org.apache.flink.optimizer.costs.DefaultCostEstimator;
-import org.apache.flink.optimizer.plan.FlinkPlan;
-import org.apache.flink.optimizer.plan.OptimizedPlan;
-import org.apache.flink.optimizer.plan.StreamingPlan;
-import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
-import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.client.JobClient;
-import org.apache.flink.runtime.client.JobExecutionException;
-import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
-import org.apache.flink.runtime.messages.accumulators.AccumulatorResultsErroneous;
-import org.apache.flink.runtime.messages.accumulators.AccumulatorResultsFound;
-import org.apache.flink.runtime.messages.accumulators.RequestAccumulatorResults;
-import org.apache.flink.runtime.messages.JobManagerMessages;
-import org.apache.flink.runtime.util.LeaderRetrievalUtils;
-import org.apache.flink.util.Preconditions;
-import org.apache.flink.util.SerializedValue;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
-import akka.actor.ActorSystem;
-
-/**
- * Encapsulates the functionality necessary to submit a program to a remote cluster.
- */
-public class Client {
-
-       private static final Logger LOG = LoggerFactory.getLogger(Client.class);
-
-       /** The optimizer used in the optimization of batch programs */
-       final Optimizer compiler;
-
-       /** The actor system used to communicate with the JobManager */
-       private final ActorSystem actorSystem;
-
-       /** Configuration of the client */
-       private final Configuration config;
-
-       /** Timeout for futures */
-       private final FiniteDuration timeout;
-
-       /** Lookup timeout for the job manager retrieval service */
-       private final FiniteDuration lookupTimeout;
-
-       /**
-        * If != -1, this field specifies the total number of available slots on the cluster
-        * connected to the client.
-        */
-       private final int maxSlots;
-
-       /** Flag indicating whether to sysout print execution updates */
-       private boolean printStatusDuringExecution = true;
-
-       /**
-        * For interactive invocations, the Job ID is only available after the ContextEnvironment has
-        * been run inside the user JAR. We pass the Client to every instance of the ContextEnvironment
-        * which lets us access the last JobID here.
-        */
-       private JobID lastJobID;
-
-       // ------------------------------------------------------------------------
-       //                            Construction
-       // ------------------------------------------------------------------------
-
-       /**
-        * Creates a instance that submits the programs to the JobManager defined in the
-        * configuration. This method will try to resolve the JobManager hostname and throw an exception
-        * if that is not possible.
-        *
-        * @param config The config used to obtain the job-manager's address, and used to configure the optimizer.
-        *
-        * @throws java.io.IOException Thrown, if the client's actor system could not be started.
-        * @throws java.net.UnknownHostException Thrown, if the JobManager's hostname could not be resolved.
-        */
-       public Client(Configuration config) throws IOException {
-               this(config, -1);
-       }
-
-       /**
-        * Creates a new instance of the class that submits the jobs to a job-manager.
-        * at the given address using the default port.
-        *
-        * @param config The configuration for the client-side processes, like the optimizer.
-        * @param maxSlots maxSlots The number of maxSlots on the cluster if != -1.
-        *
-        * @throws java.io.IOException Thrown, if the client's actor system could not be started.
-        * @throws java.net.UnknownHostException Thrown, if the JobManager's hostname could not be resolved.
-        */
-       public Client(Configuration config, int maxSlots) throws IOException {
-               this.config = Preconditions.checkNotNull(config);
-               this.compiler = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), config);
-               this.maxSlots = maxSlots;
-
-               LOG.info("Starting client actor system");
-
-               try {
-                       this.actorSystem = JobClient.startJobClientActorSystem(config);
-               } catch (Exception e) {
-                       throw new IOException("Could start client actor system.", e);
-               }
-
-               timeout = AkkaUtils.getClientTimeout(config);
-               lookupTimeout = AkkaUtils.getLookupTimeout(config);
-       }
-
-       // ------------------------------------------------------------------------
-       //  Startup & Shutdown
-       // ------------------------------------------------------------------------
-
-       /**
-        * Shuts down the client. This stops the internal actor system and actors.
-        */
-       public void shutdown() {
-               if (!this.actorSystem.isTerminated()) {
-                       this.actorSystem.shutdown();
-                       this.actorSystem.awaitTermination();
-               }
-       }
-
-       // ------------------------------------------------------------------------
-       //  Configuration
-       // ------------------------------------------------------------------------
-
-       /**
-        * Configures whether the client should print progress updates during the execution to {@code System.out}.
-        * All updates are logged via the SLF4J loggers regardless of this setting.
-        *
-        * @param print True to print updates to standard out during execution, false to not print them.
-        */
-       public void setPrintStatusDuringExecution(boolean print) {
-               this.printStatusDuringExecution = print;
-       }
-
-       /**
-        * @return whether the client will print progress updates during the execution to {@code System.out}
-        */
-       public boolean getPrintStatusDuringExecution() {
-               return this.printStatusDuringExecution;
-       }
-
-       /**
-        * @return -1 if unknown. The maximum number of available processing slots at the Flink cluster
-        * connected to this client.
-        */
-       public int getMaxSlots() {
-               return this.maxSlots;
-       }
-
-       // ------------------------------------------------------------------------
-       //  Access to the Program's Plan
-       // ------------------------------------------------------------------------
-
-       public static String getOptimizedPlanAsJson(Optimizer compiler, PackagedProgram prog, int parallelism)
-                       throws CompilerException, ProgramInvocationException
-       {
-               PlanJSONDumpGenerator jsonGen = new PlanJSONDumpGenerator();
-               return jsonGen.getOptimizerPlanAsJSON((OptimizedPlan) getOptimizedPlan(compiler, prog, parallelism));
-       }
-
-       public static FlinkPlan getOptimizedPlan(Optimizer compiler, PackagedProgram prog, int parallelism)
-                       throws CompilerException, ProgramInvocationException
-       {
-               Thread.currentThread().setContextClassLoader(prog.getUserCodeClassLoader());
-               if (prog.isUsingProgramEntryPoint()) {
-                       return getOptimizedPlan(compiler, prog.getPlanWithJars(), parallelism);
-               } else if (prog.isUsingInteractiveMode()) {
-                       // temporary hack to support the optimizer plan preview
-                       OptimizerPlanEnvironment env = new OptimizerPlanEnvironment(compiler);
-                       if (parallelism > 0) {
-                               env.setParallelism(parallelism);
-                       }
-
-                       return env.getOptimizedPlan(prog);
-               } else {
-                       throw new RuntimeException("Couldn't determine program mode.");
-               }
-       }
-
-       public static OptimizedPlan getOptimizedPlan(Optimizer compiler, Plan p, int parallelism) throws CompilerException {
-               if (parallelism > 0 && p.getDefaultParallelism() <= 0) {
-                       LOG.debug("Changing plan default parallelism from {} to {}", p.getDefaultParallelism(), parallelism);
-                       p.setDefaultParallelism(parallelism);
-               }
-               LOG.debug("Set parallelism {}, plan default parallelism {}", parallelism, p.getDefaultParallelism());
-
-               return compiler.compile(p);
-       }
-
-       // ------------------------------------------------------------------------
-       //  Program submission / execution
-       // ------------------------------------------------------------------------
-
-       public JobSubmissionResult runBlocking(PackagedProgram prog, int parallelism) throws ProgramInvocationException {
-               Thread.currentThread().setContextClassLoader(prog.getUserCodeClassLoader());
-               if (prog.isUsingProgramEntryPoint()) {
-                       return runBlocking(prog.getPlanWithJars(), parallelism, prog.getSavepointPath());
-               }
-               else if (prog.isUsingInteractiveMode()) {
-                       LOG.info("Starting program in interactive mode");
-                       ContextEnvironment.setAsContext(new ContextEnvironmentFactory(this, prog.getAllLibraries(),
-                                       prog.getClasspaths(), prog.getUserCodeClassLoader(), parallelism, true,
-                                       prog.getSavepointPath()));
-
-                       // invoke here
-                       try {
-                               prog.invokeInteractiveModeForExecution();
-                       }
-                       finally {
-                               ContextEnvironment.unsetContext();
-                       }
-
-                       return new JobSubmissionResult(lastJobID);
-               }
-               else {
-                       throw new RuntimeException();
-               }
-       }
-
-       public JobSubmissionResult runDetached(PackagedProgram prog, int parallelism)
-                       throws ProgramInvocationException
-       {
-               Thread.currentThread().setContextClassLoader(prog.getUserCodeClassLoader());
-               if (prog.isUsingProgramEntryPoint()) {
-                       return runDetached(prog.getPlanWithJars(), parallelism, prog.getSavepointPath());
-               }
-               else if (prog.isUsingInteractiveMode()) {
-                       LOG.info("Starting program in interactive mode");
-                       ContextEnvironmentFactory factory = new ContextEnvironmentFactory(this, prog.getAllLibraries(),
-                                       prog.getClasspaths(), prog.getUserCodeClassLoader(), parallelism, false,
-                                       prog.getSavepointPath());
-                       ContextEnvironment.setAsContext(factory);
-
-                       // invoke here
-                       try {
-                               prog.invokeInteractiveModeForExecution();
-                               return ((DetachedEnvironment) factory.getLastEnvCreated()).finalizeExecute();
-                       }
-                       finally {
-                               ContextEnvironment.unsetContext();
-                       }
-               }
-               else {
-                       throw new RuntimeException("PackagedProgram does not have a valid invocation mode.");
-               }
-       }
-
-       public JobExecutionResult runBlocking(JobWithJars program, int parallelism) throws ProgramInvocationException {
-               return runBlocking(program, parallelism, null);
-       }
-
-       /**
-        * Runs a program on the Flink cluster to which this client is connected. The call blocks until the
-        * execution is complete, and returns afterwards.
-        *
-        * @param program The program to be executed.
-        * @param parallelism The default parallelism to use when running the program. The default parallelism is used
-        *                    when the program does not set a parallelism by itself.
-        *
-        * @throws CompilerException Thrown, if the compiler encounters an illegal situation.
-        * @throws ProgramInvocationException Thrown, if the program could not be instantiated from its jar file,
-        *                                    or if the submission failed. That might be either due to an I/O problem,
-        *                                    i.e. the job-manager is unreachable, or due to the fact that the
-        *                                    parallel execution failed.
-        */
-       public JobExecutionResult runBlocking(JobWithJars program, int parallelism, String savepointPath)
-                       throws CompilerException, ProgramInvocationException {
-               ClassLoader classLoader = program.getUserCodeClassLoader();
-               if (classLoader == null) {
-                       throw new IllegalArgumentException("The given JobWithJars does not provide a usercode class loader.");
-               }
-
-               OptimizedPlan optPlan = getOptimizedPlan(compiler, program, parallelism);
-               return runBlocking(optPlan, program.getJarFiles(), program.getClasspaths(), classLoader, savepointPath);
-       }
-
-       public JobSubmissionResult runDetached(JobWithJars program, int parallelism) throws ProgramInvocationException {
-               return runDetached(program, parallelism, null);
-       }
-
-       /**
-        * Submits a program to the Flink cluster to which this client is connected. The call returns after the
-        * program was submitted and does not wait for the program to complete.
-        *
-        * @param program The program to be executed.
-        * @param parallelism The default parallelism to use when running the program. The default parallelism is used
-        *                    when the program does not set a parallelism by itself.
-        *
-        * @throws CompilerException Thrown, if the compiler encounters an illegal situation.
-        * @throws ProgramInvocationException Thrown, if the program could not be instantiated from its jar file,
-        *                                    or if the submission failed. That might be either due to an I/O problem,
-        *                                    i.e. the job-manager is unreachable.
-        */
-       public JobSubmissionResult runDetached(JobWithJars program, int parallelism, String savepointPath)
-                       throws CompilerException, ProgramInvocationException {
-               ClassLoader classLoader = program.getUserCodeClassLoader();
-               if (classLoader == null) {
-                       throw new IllegalArgumentException("The given JobWithJars does not provide a usercode class loader.");
-               }
-
-               OptimizedPlan optimizedPlan = getOptimizedPlan(compiler, program, parallelism);
-               return runDetached(optimizedPlan, program.getJarFiles(), program.getClasspaths(), classLoader, savepointPath);
-       }
-
-       public JobExecutionResult runBlocking(
-                       FlinkPlan compiledPlan, List<URL> libraries, List<URL> classpaths, ClassLoader classLoader) throws ProgramInvocationException {
-               return runBlocking(compiledPlan, libraries, classpaths, classLoader, null);
-       }
-
-       public JobExecutionResult runBlocking(FlinkPlan compiledPlan, List<URL> libraries, List<URL> classpaths,
-                       ClassLoader classLoader, String savepointPath) throws ProgramInvocationException
-       {
-               JobGraph job = getJobGraph(compiledPlan, libraries, classpaths, savepointPath);
-               return runBlocking(job, classLoader);
-       }
-
-       public JobSubmissionResult runDetached(FlinkPlan compiledPlan, List<URL> libraries, List<URL> classpaths, ClassLoader classLoader) throws ProgramInvocationException {
-               return runDetached(compiledPlan, libraries, classpaths, classLoader, null);
-       }
-
-       public JobSubmissionResult runDetached(FlinkPlan compiledPlan, List<URL> libraries, List<URL> classpaths,
-                       ClassLoader classLoader, String savepointPath) throws ProgramInvocationException
-       {
-               JobGraph job = getJobGraph(compiledPlan, libraries, classpaths, savepointPath);
-               return runDetached(job, classLoader);
-       }
-
-       public JobExecutionResult runBlocking(JobGraph jobGraph, ClassLoader classLoader) throws ProgramInvocationException {
-               LeaderRetrievalService leaderRetrievalService;
-               try {
-                       leaderRetrievalService = LeaderRetrievalUtils.createLeaderRetrievalService(config);
-               } catch (Exception e) {
-                       throw new ProgramInvocationException("Could not create the leader retrieval service.", e);
-               }
-
-               try {
-                       this.lastJobID = jobGraph.getJobID();
-                       return JobClient.submitJobAndWait(actorSystem, leaderRetrievalService, jobGraph, timeout, printStatusDuringExecution, classLoader);
-               } catch (JobExecutionException e) {
-                       throw new ProgramInvocationException("The program execution failed: " + e.getMessage(), e);
-               }
-       }
-
-       public JobSubmissionResult runDetached(JobGraph jobGraph, ClassLoader classLoader) throws ProgramInvocationException {
-               ActorGateway jobManagerGateway;
-
-               try {
-                       jobManagerGateway = getJobManagerGateway();
-               } catch (Exception e) {
-                       throw new ProgramInvocationException("Failed to retrieve the JobManager gateway.", e);
-               }
-
-               LOG.info("Checking and uploading JAR files");
-               try {
-                       JobClient.uploadJarFiles(jobGraph, jobManagerGateway, timeout);
-               }
-               catch (IOException e) {
-                       throw new ProgramInvocationException("Could not upload the program's JAR files to the JobManager.", e);
-               }
-               try {
-                       this.lastJobID = jobGraph.getJobID();
-                       JobClient.submitJobDetached(jobManagerGateway, jobGraph, timeout, classLoader);
-                       return new JobSubmissionResult(jobGraph.getJobID());
-               } catch (JobExecutionException e) {
-                               throw new ProgramInvocationException("The program execution failed: " + e.getMessage(), e);
-               }
-       }
-
-       /**
-        * Cancels a job identified by the job id.
-        * @param jobId the job id
-        * @throws Exception In case an error occurred.
-        */
-       public void cancel(JobID jobId) throws Exception {
-               final ActorGateway jobManagerGateway = getJobManagerGateway();
-
-               final Future<Object> response;
-               try {
-                       response = jobManagerGateway.ask(new JobManagerMessages.CancelJob(jobId), timeout);
-               } catch (final Exception e) {
-                       throw new ProgramInvocationException("Failed to query the job manager gateway.", e);
-               }
-
-               final Object result = Await.result(response, timeout);
-
-               if (result instanceof JobManagerMessages.CancellationSuccess) {
-                       LOG.info("Job cancellation with ID " + jobId + " succeeded.");
-               } else if (result instanceof JobManagerMessages.CancellationFailure) {
-                       final Throwable t = ((JobManagerMessages.CancellationFailure) result).cause();
-                       LOG.info("Job cancellation with ID " + jobId + " failed.", t);
-                       throw new Exception("Failed to cancel the job because of \n" + t.getMessage());
-               } else {
-                       throw new Exception("Unknown message received while cancelling: " + result.getClass().getName());
-               }
-       }
-
-       /**
-        * Stops a program on Flink cluster whose job-manager is configured in this client's configuration.
-        * Stopping works only for streaming programs. Be aware, that the program might continue to run for
-        * a while after sending the stop command, because after sources stopped to emit data all operators
-        * need to finish processing.
-        * 
-        * @param jobId
-        *            the job ID of the streaming program to stop
-        * @throws Exception
-        *             If the job ID is invalid (ie, is unknown or refers to a batch job) or if sending the stop signal
-        *             failed. That might be due to an I/O problem, ie, the job-manager is unreachable.
-        */
-       public void stop(final JobID jobId) throws Exception {
-               final ActorGateway jobManagerGateway = getJobManagerGateway();
-
-               final Future<Object> response;
-               try {
-                       response = jobManagerGateway.ask(new JobManagerMessages.StopJob(jobId), timeout);
-               } catch (final Exception e) {
-                       throw new ProgramInvocationException("Failed to query the job manager gateway.", e);
-               }
-
-               final Object result = Await.result(response, timeout);
-
-               if (result instanceof JobManagerMessages.StoppingSuccess) {
-                       LOG.info("Job stopping with ID " + jobId + " succeeded.");
-               } else if (result instanceof JobManagerMessages.StoppingFailure) {
-                       final Throwable t = ((JobManagerMessages.StoppingFailure) result).cause();
-                       LOG.info("Job stopping with ID " + jobId + " failed.", t);
-                       throw new Exception("Failed to stop the job because of \n" + t.getMessage());
-               } else {
-                       throw new Exception("Unknown message received while stopping: " + result.getClass().getName());
-               }
-       }
-
-       /**
-        * Requests and returns the accumulators for the given job identifier. Accumulators can be
-        * requested while a is running or after it has finished. The default class loader is used
-        * to deserialize the incoming accumulator results.
-        * @param jobID The job identifier of a job.
-        * @return A Map containing the accumulator's name and its value.
-        */
-       public Map<String, Object> getAccumulators(JobID jobID) throws Exception {
-               return getAccumulators(jobID, ClassLoader.getSystemClassLoader());
-       }
-
-       /**
-        * Requests and returns the accumulators for the given job identifier. Accumulators can be
-        * requested while a is running or after it has finished.
-        * @param jobID The job identifier of a job.
-        * @param loader The class loader for deserializing the accumulator results.
-        * @return A Map containing the accumulator's name and its value.
-        */
-       public Map<String, Object> getAccumulators(JobID jobID, ClassLoader loader) throws Exception {
-               ActorGateway jobManagerGateway = getJobManagerGateway();
-
-               Future<Object> response;
-               try {
-                       response = jobManagerGateway.ask(new RequestAccumulatorResults(jobID), timeout);
-               } catch (Exception e) {
-                       throw new Exception("Failed to query the job manager gateway for accumulators.", e);
-               }
-
-               Object result = Await.result(response, timeout);
-
-               if (result instanceof AccumulatorResultsFound) {
-                       Map<String, SerializedValue<Object>> serializedAccumulators =
-                                       ((AccumulatorResultsFound) result).result();
-
-                       return AccumulatorHelper.deserializeAccumulators(serializedAccumulators, loader);
-
-               } else if (result instanceof AccumulatorResultsErroneous) {
-                       throw ((AccumulatorResultsErroneous) result).cause();
-               } else {
-                       throw new Exception("Failed to fetch accumulators for the job " + jobID + ".");
-               }
-       }
-
-
-       // ------------------------------------------------------------------------
-       //  Sessions
-       // ------------------------------------------------------------------------
-
-       /**
-        * Tells the JobManager to finish the session (job) defined by the given ID.
-        * 
-        * @param jobId The ID that identifies the session.
-        */
-       public void endSession(JobID jobId) throws Exception {
-               if (jobId == null) {
-                       throw new IllegalArgumentException("The JobID must not be null.");
-               }
-               endSessions(Collections.singletonList(jobId));
-       }
-
-       /**
-        * Tells the JobManager to finish the sessions (jobs) defined by the given IDs.
-        *
-        * @param jobIds The IDs that identify the sessions.
-        */
-       public void endSessions(List<JobID> jobIds) throws Exception {
-               if (jobIds == null) {
-                       throw new IllegalArgumentException("The JobIDs must not be null");
-               }
-
-               ActorGateway jobManagerGateway = getJobManagerGateway();
-               
-               for (JobID jid : jobIds) {
-                       if (jid != null) {
-                               LOG.info("Telling job manager to end the session {}.", jid);
-                               jobManagerGateway.tell(new JobManagerMessages.RemoveCachedJob(jid));
-                       }
-               }
-       }
-
-       // ------------------------------------------------------------------------
-       //  Internal translation methods
-       // ------------------------------------------------------------------------
-
-       /**
-        * Creates the optimized plan for a given program, using this client's compiler.
-        *
-        * @param prog The program to be compiled.
-        * @return The compiled and optimized plan, as returned by the compiler.
-        * @throws CompilerException Thrown, if the compiler encounters an illegal situation.
-        * @throws ProgramInvocationException Thrown, if the program could not be instantiated from its jar file.
-        */
-       private static OptimizedPlan getOptimizedPlan(Optimizer compiler, JobWithJars prog, int parallelism)
-                       throws CompilerException, ProgramInvocationException {
-               return getOptimizedPlan(compiler, prog.getPlan(), parallelism);
-       }
-
-       public JobGraph getJobGraph(PackagedProgram prog, FlinkPlan optPlan) throws ProgramInvocationException {
-               return getJobGraph(optPlan, prog.getAllLibraries(), prog.getClasspaths(), null);
-       }
-
-       public JobGraph getJobGraph(PackagedProgram prog, FlinkPlan optPlan, String savepointPath) throws ProgramInvocationException {
-               return getJobGraph(optPlan, prog.getAllLibraries(), prog.getClasspaths(), savepointPath);
-       }
-
-       private JobGraph getJobGraph(FlinkPlan optPlan, List<URL> jarFiles, List<URL> classpaths, String savepointPath) {
-               JobGraph job;
-               if (optPlan instanceof StreamingPlan) {
-                       job = ((StreamingPlan) optPlan).getJobGraph();
-                       job.setSavepointPath(savepointPath);
-               } else {
-                       JobGraphGenerator gen = new JobGraphGenerator(this.config);
-                       job = gen.compileJobGraph((OptimizedPlan) optPlan);
-               }
-
-               for (URL jar : jarFiles) {
-                       try {
-                               job.addJar(new Path(jar.toURI()));
-                       } catch (URISyntaxException e) {
-                               throw new RuntimeException("URL is invalid. This should not happen.", e);
-                       }
-               }
- 
-               job.setClasspaths(classpaths);
-
-               return job;
-       }
-
-       // ------------------------------------------------------------------------
-       //  Helper methods
-       // ------------------------------------------------------------------------
-
-       /**
-        * Returns the {@link ActorGateway} of the current job manager leader using
-        * the {@link LeaderRetrievalService}.
-        *
-        * @return ActorGateway of the current job manager leader
-        * @throws Exception
-        */
-       private ActorGateway getJobManagerGateway() throws Exception {
-               LOG.info("Looking up JobManager");
-               LeaderRetrievalService leaderRetrievalService;
-
-               leaderRetrievalService = LeaderRetrievalUtils.createLeaderRetrievalService(config);
-
-               return LeaderRetrievalUtils.retrieveLeaderGateway(
-                       leaderRetrievalService,
-                       actorSystem,
-                       lookupTimeout);
-       }
-
-}

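The Client class removed above is reintroduced below as the abstract ClusterClient: the separate runBlocking()/runDetached() entry points collapse into run() overloads whose blocking or detached behaviour follows the client's detached-submission switch. A minimal usage sketch, assuming a StandaloneClusterClient subclass and an already-built PackagedProgram named program (both assumptions, not shown in this part of the commit; exception handling omitted):

    // Sketch only: StandaloneClusterClient and program are assumed, not part of this diff.
    Configuration config = new Configuration();
    config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost");
    config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 6123);

    ClusterClient client = new StandaloneClusterClient(config);
    try {
        client.setPrintStatusDuringExecution(true);
        // run() blocks until job completion unless detached submission is enabled on the client
        JobSubmissionResult result = client.run(program, 4);
    } finally {
        client.shutdown();
    }
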
http://git-wip-us.apache.org/repos/asf/flink/blob/f9b52a31/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
new file mode 100644
index 0000000..b56428d
--- /dev/null
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
@@ -0,0 +1,695 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.program;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import akka.actor.ActorRef;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobSubmissionResult;
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.common.accumulators.AccumulatorHelper;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.optimizer.CompilerException;
+import org.apache.flink.optimizer.DataStatistics;
+import org.apache.flink.optimizer.Optimizer;
+import org.apache.flink.optimizer.costs.DefaultCostEstimator;
+import org.apache.flink.optimizer.plan.FlinkPlan;
+import org.apache.flink.optimizer.plan.OptimizedPlan;
+import org.apache.flink.optimizer.plan.StreamingPlan;
+import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
+import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.client.JobClient;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.messages.accumulators.AccumulatorResultsErroneous;
+import org.apache.flink.runtime.messages.accumulators.AccumulatorResultsFound;
+import org.apache.flink.runtime.messages.accumulators.RequestAccumulatorResults;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.net.ConnectionUtils;
+import org.apache.flink.runtime.util.LeaderRetrievalUtils;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.SerializedValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import scala.Some;
+import scala.Tuple2;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+import akka.actor.ActorSystem;
+
+
+/**
+ * Encapsulates the functionality necessary to submit a program to a remote cluster.
+ */
+public abstract class ClusterClient {
+
+       private static final Logger LOG = LoggerFactory.getLogger(ClusterClient.class);
+
+       /** The optimizer used in the optimization of batch programs */
+       final Optimizer compiler;
+
+       /** The actor system used to communicate with the JobManager */
+       protected final ActorSystem actorSystem;
+
+       /** Configuration of the client */
+       protected final Configuration flinkConfig;
+
+       /** Timeout for futures */
+       protected final FiniteDuration timeout;
+
+       /** Lookup timeout for the job manager retrieval service */
+       private final FiniteDuration lookupTimeout;
+
+       /** Flag indicating whether to sysout print execution updates */
+       private boolean printStatusDuringExecution = true;
+
+       /**
+        * For interactive invocations, the Job ID is only available after the ContextEnvironment has
+        * been run inside the user JAR. We pass the Client to every instance of the ContextEnvironment
+        * which lets us access the last JobID here.
+        */
+       private JobID lastJobID;
+
+       /** Switch for blocking/detached job submission of the client */
+       private boolean detachedJobSubmission = false;
+
+       // ------------------------------------------------------------------------
+       //                            Construction
+       // ------------------------------------------------------------------------
+
+       /**
+        * Creates a instance that submits the programs to the JobManager defined in the
+        * configuration. This method will try to resolve the JobManager hostname and throw an exception
+        * if that is not possible.
+        *
+        * @param flinkConfig The config used to obtain the job-manager's address, and used to configure the optimizer.
+        *
+        * @throws java.io.IOException Thrown, if the client's actor system could not be started.
+        */
+       public ClusterClient(Configuration flinkConfig) throws IOException {
+
+               this.flinkConfig = Preconditions.checkNotNull(flinkConfig);
+               this.compiler = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), flinkConfig);
+
+               this.timeout = AkkaUtils.getClientTimeout(flinkConfig);
+               this.lookupTimeout = AkkaUtils.getLookupTimeout(flinkConfig);
+
+               this.actorSystem = createActorSystem();
+       }
+
+       // ------------------------------------------------------------------------
+       //  Startup & Shutdown
+       // ------------------------------------------------------------------------
+
+       /**
+        * Method to create the ActorSystem of the Client. May be overriden in subclasses.
+        * @return ActorSystem
+        * @throws IOException
+        */
+       protected ActorSystem createActorSystem() throws IOException {
+
+               if (actorSystem != null) {
+                       throw new RuntimeException("This method may only be called once.");
+               }
+
+               // start actor system
+               LOG.info("Starting client actor system.");
+
+               String hostName = flinkConfig.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null);
+               int port = flinkConfig.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, -1);
+               if (hostName == null || port == -1) {
+                       throw new IOException("The initial JobManager address has not been set correctly.");
+               }
+               InetSocketAddress initialJobManagerAddress = new InetSocketAddress(hostName, port);
+
+               // find name of own public interface, able to connect to the JM
+               // try to find address for 2 seconds. log after 400 ms.
+               InetAddress ownHostname = ConnectionUtils.findConnectingAddress(initialJobManagerAddress, 2000, 400);
+               return AkkaUtils.createActorSystem(flinkConfig,
+                       new Some<>(new Tuple2<String, Object>(ownHostname.getCanonicalHostName(), 0)));
+       }
+
+       /**
+        * Shuts down the client. This stops the internal actor system and actors.
+        */
+       public void shutdown() {
+               try {
+                       finalizeCluster();
+               } finally {
+                       if (!this.actorSystem.isTerminated()) {
+                               this.actorSystem.shutdown();
+                               this.actorSystem.awaitTermination();
+                       }
+               }
+       }
+
+       // ------------------------------------------------------------------------
+       //  Configuration
+       // ------------------------------------------------------------------------
+
+       /**
+        * Configures whether the client should print progress updates during the execution to {@code System.out}.
+        * All updates are logged via the SLF4J loggers regardless of this setting.
+        *
+        * @param print True to print updates to standard out during execution, false to not print them.
+        */
+       public void setPrintStatusDuringExecution(boolean print) {
+               this.printStatusDuringExecution = print;
+       }
+
+       /**
+        * @return whether the client will print progress updates during the execution to {@code System.out}
+        */
+       public boolean getPrintStatusDuringExecution() {
+               return this.printStatusDuringExecution;
+       }
+
+       /**
+        * Gets the current JobManager address from the Flink configuration (may change in case of a HA setup).
+        * @return The address (host and port) of the leading JobManager
+        */
+       public InetSocketAddress getJobManagerAddressFromConfig() {
+               try {
+               String hostName = flinkConfig.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null);
+               int port = flinkConfig.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, -1);
+               return new InetSocketAddress(hostName, port);
+               } catch (Exception e) {
+                       throw new RuntimeException("Failed to retrieve JobManager address", e);
+               }
+       }
+
+       /**
+        * Gets the current JobManager address (may change in case of a HA setup).
+        * @return The address (host and port) of the leading JobManager
+        */
+       public InetSocketAddress getJobManagerAddress() {
+               try {
+                       final ActorRef jmActor = getJobManagerGateway().actor();
+                       return AkkaUtils.getInetSockeAddressFromAkkaURL(jmActor.path().toSerializationFormat());
+               } catch (Exception e) {
+                       throw new RuntimeException("Failed to retrieve JobManager address", e);
+               }
+       }
+
+       // ------------------------------------------------------------------------
+       //  Access to the Program's Plan
+       // ------------------------------------------------------------------------
+
+       public static String getOptimizedPlanAsJson(Optimizer compiler, PackagedProgram prog, int parallelism)
+                       throws CompilerException, ProgramInvocationException
+       {
+               PlanJSONDumpGenerator jsonGen = new PlanJSONDumpGenerator();
+               return jsonGen.getOptimizerPlanAsJSON((OptimizedPlan) getOptimizedPlan(compiler, prog, parallelism));
+       }
+
+       public static FlinkPlan getOptimizedPlan(Optimizer compiler, PackagedProgram prog, int parallelism)
+                       throws CompilerException, ProgramInvocationException
+       {
+               Thread.currentThread().setContextClassLoader(prog.getUserCodeClassLoader());
+               if (prog.isUsingProgramEntryPoint()) {
+                       return getOptimizedPlan(compiler, prog.getPlanWithJars(), parallelism);
+               } else if (prog.isUsingInteractiveMode()) {
+                       // temporary hack to support the optimizer plan preview
+                       OptimizerPlanEnvironment env = new OptimizerPlanEnvironment(compiler);
+                       if (parallelism > 0) {
+                               env.setParallelism(parallelism);
+                       }
+
+                       return env.getOptimizedPlan(prog);
+               } else {
+                       throw new RuntimeException("Couldn't determine program mode.");
+               }
+       }
+
+       public static OptimizedPlan getOptimizedPlan(Optimizer compiler, Plan p, int parallelism) throws CompilerException {
+               if (parallelism > 0 && p.getDefaultParallelism() <= 0) {
+                       LOG.debug("Changing plan default parallelism from {} to {}", p.getDefaultParallelism(), parallelism);
+                       p.setDefaultParallelism(parallelism);
+               }
+               LOG.debug("Set parallelism {}, plan default parallelism {}", parallelism, p.getDefaultParallelism());
+
+               return compiler.compile(p);
+       }
+
+       // ------------------------------------------------------------------------
+       //  Program submission / execution
+       // ------------------------------------------------------------------------
+
+       /**
+        * General purpose method to run a user jar from the CliFrontend in either blocking or detached mode, depending
+        * on whether {@code setDetached(true)} or {@code setDetached(false)}.
+        * @param prog the packaged program
+        * @param parallelism the parallelism to execute the contained Flink job
+        * @return The result of the execution
+        * @throws ProgramInvocationException
+        */
+       public JobSubmissionResult run(PackagedProgram prog, int parallelism)
+                       throws ProgramInvocationException
+       {
+               Thread.currentThread().setContextClassLoader(prog.getUserCodeClassLoader());
+               if (prog.isUsingProgramEntryPoint()) {
+                       return run(prog.getPlanWithJars(), parallelism, prog.getSavepointPath());
+               }
+               else if (prog.isUsingInteractiveMode()) {
+                       LOG.info("Starting program in interactive mode");
+                       ContextEnvironmentFactory factory = new ContextEnvironmentFactory(this, prog.getAllLibraries(),
+                                       prog.getClasspaths(), prog.getUserCodeClassLoader(), parallelism, isDetached(),
+                                       prog.getSavepointPath());
+                       ContextEnvironment.setAsContext(factory);
+
+                       try {
+                               // invoke main method
+                               prog.invokeInteractiveModeForExecution();
+                               if (isDetached()) {
+                                       // in detached mode, we execute the whole user code to extract the Flink job, afterwards we run it here
+                                       return ((DetachedEnvironment) factory.getLastEnvCreated()).finalizeExecute();
+                               }
+                               else {
+                                       // in blocking mode, we execute all Flink jobs contained in the user code and then return here
+                                       return new JobSubmissionResult(lastJobID);
+                               }
+                       }
+                       finally {
+                               ContextEnvironment.unsetContext();
+                       }
+               }
+               else {
+                       throw new RuntimeException("PackagedProgram does not have a valid invocation mode.");
+               }
+       }
+
+       public JobSubmissionResult run(JobWithJars program, int parallelism) throws ProgramInvocationException {
+               return run(program, parallelism, null);
+       }
+
+       /**
+        * Runs a program on the Flink cluster to which this client is connected. The call blocks until the
+        * execution is complete, and returns afterwards.
+        *
+        * @param program The program to be executed.
+        * @param parallelism The default parallelism to use when running the program. The default parallelism is used
+        *                    when the program does not set a parallelism by itself.
+        *
+        * @throws CompilerException Thrown, if the compiler encounters an illegal situation.
+        * @throws ProgramInvocationException Thrown, if the program could not be instantiated from its jar file,
+        *                                    or if the submission failed. That might be either due to an I/O problem,
+        *                                    i.e. the job-manager is unreachable, or due to the fact that the
+        *                                    parallel execution failed.
+        */
+       public JobSubmissionResult run(JobWithJars program, int parallelism, String savepointPath)
+                       throws CompilerException, ProgramInvocationException {
+               ClassLoader classLoader = program.getUserCodeClassLoader();
+               if (classLoader == null) {
+                       throw new IllegalArgumentException("The given JobWithJars does not provide a usercode class loader.");
+               }
+
+               OptimizedPlan optPlan = getOptimizedPlan(compiler, program, parallelism);
+               return run(optPlan, program.getJarFiles(), program.getClasspaths(), classLoader, savepointPath);
+       }
+
+       public JobSubmissionResult run(
+                       FlinkPlan compiledPlan, List<URL> libraries, List<URL> classpaths, ClassLoader classLoader) throws ProgramInvocationException {
+               return run(compiledPlan, libraries, classpaths, classLoader, null);
+       }
+
+       public JobSubmissionResult run(FlinkPlan compiledPlan,
+                       List<URL> libraries, List<URL> classpaths, ClassLoader classLoader, String savepointPath)
+               throws ProgramInvocationException
+       {
+               JobGraph job = getJobGraph(compiledPlan, libraries, classpaths, savepointPath);
+               return submitJob(job, classLoader);
+       }
+
+       /**
+        * Submits a JobGraph blocking.
+        * @param jobGraph The JobGraph
+        * @param classLoader User code class loader to deserialize the results and errors (may contain custom classes).
+        * @return JobExecutionResult
+        * @throws ProgramInvocationException
+        */
+       public JobExecutionResult run(JobGraph jobGraph, ClassLoader classLoader) throws ProgramInvocationException {
+               LeaderRetrievalService leaderRetrievalService;
+               try {
+                       leaderRetrievalService = LeaderRetrievalUtils.createLeaderRetrievalService(flinkConfig);
+               } catch (Exception e) {
+                       throw new ProgramInvocationException("Could not create the leader retrieval service", e);
+               }
+
+               try {
+                       this.lastJobID = jobGraph.getJobID();
+                       return JobClient.submitJobAndWait(actorSystem,
+                               leaderRetrievalService, jobGraph, timeout, printStatusDuringExecution, classLoader);
+               } catch (JobExecutionException e) {
+                       throw new ProgramInvocationException("The program execution failed: " + e.getMessage(), e);
+               }
+       }
+
+       /**
+        * Submits a JobGraph detached.
+        * @param jobGraph The JobGraph
+        * @param classLoader User code class loader to deserialize the results and errors (may contain custom classes).
+        * @return JobSubmissionResult
+        * @throws ProgramInvocationException
+        */
+       public JobSubmissionResult runDetached(JobGraph jobGraph, ClassLoader classLoader) throws ProgramInvocationException {
+               ActorGateway jobManagerGateway;
+
+               try {
+                       jobManagerGateway = getJobManagerGateway();
+               } catch (Exception e) {
+                       throw new ProgramInvocationException("Failed to retrieve the JobManager gateway.", e);
+               }
+
+               try {
+                       JobClient.submitJobDetached(jobManagerGateway, jobGraph, timeout, classLoader);
+                       return new JobSubmissionResult(jobGraph.getJobID());
+               } catch (JobExecutionException e) {
+                       throw new ProgramInvocationException("The program execution failed: " + e.getMessage(), e);
+               }
+       }
+
+       /**
+        * Cancels a job identified by the job id.
+        * @param jobId the job id
+        * @throws Exception In case an error occurred.
+        */
+       public void cancel(JobID jobId) throws Exception {
+               final ActorGateway jobManagerGateway = getJobManagerGateway();
+
+               final Future<Object> response;
+               try {
+                       response = jobManagerGateway.ask(new JobManagerMessages.CancelJob(jobId), timeout);
+               } catch (final Exception e) {
+                       throw new ProgramInvocationException("Failed to query the job manager gateway.", e);
+               }
+
+               final Object result = Await.result(response, timeout);
+
+               if (result instanceof JobManagerMessages.CancellationSuccess) {
+                       LOG.info("Job cancellation with ID " + jobId + " succeeded.");
+               } else if (result instanceof JobManagerMessages.CancellationFailure) {
+                       final Throwable t = ((JobManagerMessages.CancellationFailure) result).cause();
+                       LOG.info("Job cancellation with ID " + jobId + " failed.", t);
+                       throw new Exception("Failed to cancel the job because of \n" + t.getMessage());
+               } else {
+                       throw new Exception("Unknown message received while cancelling: " + result.getClass().getName());
+               }
+       }
+
+       /**
+        * Stops a program on Flink cluster whose job-manager is configured in this client's configuration.
+        * Stopping works only for streaming programs. Be aware, that the program might continue to run for
+        * a while after sending the stop command, because after sources stopped to emit data all operators
+        * need to finish processing.
+        * 
+        * @param jobId
+        *            the job ID of the streaming program to stop
+        * @throws Exception
+        *             If the job ID is invalid (ie, is unknown or refers to a batch job) or if sending the stop signal
+        *             failed. That might be due to an I/O problem, ie, the job-manager is unreachable.
+        */
+       public void stop(final JobID jobId) throws Exception {
+               final ActorGateway jobManagerGateway = getJobManagerGateway();
+
+               final Future<Object> response;
+               try {
+                       response = jobManagerGateway.ask(new JobManagerMessages.StopJob(jobId), timeout);
+               } catch (final Exception e) {
+                       throw new ProgramInvocationException("Failed to query the job manager gateway.", e);
+               }
+
+               final Object result = Await.result(response, timeout);
+
+               if (result instanceof JobManagerMessages.StoppingSuccess) {
+                       LOG.info("Job stopping with ID " + jobId + " succeeded.");
+               } else if (result instanceof JobManagerMessages.StoppingFailure) {
+                       final Throwable t = ((JobManagerMessages.StoppingFailure) result).cause();
+                       LOG.info("Job stopping with ID " + jobId + " failed.", t);
+                       throw new Exception("Failed to stop the job because of \n" + t.getMessage());
+               } else {
+                       throw new Exception("Unknown message received while stopping: " + result.getClass().getName());
+               }
+       }
+
+       /**
+        * Requests and returns the accumulators for the given job identifier. Accumulators can be
+        * requested while a is running or after it has finished. The default class loader is used
+        * to deserialize the incoming accumulator results.
+        * @param jobID The job identifier of a job.
+        * @return A Map containing the accumulator's name and its value.
+        */
+       public Map<String, Object> getAccumulators(JobID jobID) throws Exception {
+               return getAccumulators(jobID, ClassLoader.getSystemClassLoader());
+       }
+
+       /**
+        * Requests and returns the accumulators for the given job identifier. Accumulators can be
+        * requested while a is running or after it has finished.
+        * @param jobID The job identifier of a job.
+        * @param loader The class loader for deserializing the accumulator results.
+        * @return A Map containing the accumulator's name and its value.
+        */
+       public Map<String, Object> getAccumulators(JobID jobID, ClassLoader loader) throws Exception {
+               ActorGateway jobManagerGateway = getJobManagerGateway();
+
+               Future<Object> response;
+               try {
+                       response = jobManagerGateway.ask(new RequestAccumulatorResults(jobID), timeout);
+               } catch (Exception e) {
+                       throw new Exception("Failed to query the job manager gateway for accumulators.", e);
+               }
+
+               Object result = Await.result(response, timeout);
+
+               if (result instanceof AccumulatorResultsFound) {
+                       Map<String, SerializedValue<Object>> serializedAccumulators =
+                                       ((AccumulatorResultsFound) result).result();
+
+                       return AccumulatorHelper.deserializeAccumulators(serializedAccumulators, loader);
+
+               } else if (result instanceof AccumulatorResultsErroneous) {
+                       throw ((AccumulatorResultsErroneous) result).cause();
+               } else {
+                       throw new Exception("Failed to fetch accumulators for the job " + jobID + ".");
+               }
+       }
+
+
+       // ------------------------------------------------------------------------
+       //  Sessions
+       // ------------------------------------------------------------------------
+
+       /**
+        * Tells the JobManager to finish the session (job) defined by the given ID.
+        * 
+        * @param jobId The ID that identifies the session.
+        */
+       public void endSession(JobID jobId) throws Exception {
+               if (jobId == null) {
+                       throw new IllegalArgumentException("The JobID must not be null.");
+               }
+               endSessions(Collections.singletonList(jobId));
+       }
+
+       /**
+        * Tells the JobManager to finish the sessions (jobs) defined by the given IDs.
+        *
+        * @param jobIds The IDs that identify the sessions.
+        */
+       public void endSessions(List<JobID> jobIds) throws Exception {
+               if (jobIds == null) {
+                       throw new IllegalArgumentException("The JobIDs must not be null");
+               }
+
+               ActorGateway jobManagerGateway = getJobManagerGateway();
+               
+               for (JobID jid : jobIds) {
+                       if (jid != null) {
+                               LOG.info("Telling job manager to end the session {}.", jid);
+                               jobManagerGateway.tell(new JobManagerMessages.RemoveCachedJob(jid));
+                       }
+               }
+       }
+
+       // ------------------------------------------------------------------------
+       //  Internal translation methods
+       // ------------------------------------------------------------------------
+
+       /**
+        * Creates the optimized plan for a given program, using this client's compiler.
+        *
+        * @param prog The program to be compiled.
+        * @return The compiled and optimized plan, as returned by the compiler.
+        * @throws CompilerException Thrown, if the compiler encounters an illegal situation.
+        * @throws ProgramInvocationException Thrown, if the program could not be instantiated from its jar file.
+        */
+       private static OptimizedPlan getOptimizedPlan(Optimizer compiler, JobWithJars prog, int parallelism)
+                       throws CompilerException, ProgramInvocationException {
+               return getOptimizedPlan(compiler, prog.getPlan(), parallelism);
+       }
+
+       public JobGraph getJobGraph(PackagedProgram prog, FlinkPlan optPlan) throws ProgramInvocationException {
+               return getJobGraph(optPlan, prog.getAllLibraries(), prog.getClasspaths(), null);
+       }
+
+       public JobGraph getJobGraph(PackagedProgram prog, FlinkPlan optPlan, String savepointPath) throws ProgramInvocationException {
+               return getJobGraph(optPlan, prog.getAllLibraries(), prog.getClasspaths(), savepointPath);
+       }
+
+       private JobGraph getJobGraph(FlinkPlan optPlan, List<URL> jarFiles, List<URL> classpaths, String savepointPath) {
+               JobGraph job;
+               if (optPlan instanceof StreamingPlan) {
+                       job = ((StreamingPlan) optPlan).getJobGraph();
+                       job.setSavepointPath(savepointPath);
+               } else {
+                       JobGraphGenerator gen = new JobGraphGenerator(this.flinkConfig);
+                       job = gen.compileJobGraph((OptimizedPlan) optPlan);
+               }
+
+               for (URL jar : jarFiles) {
+                       try {
+                               job.addJar(new Path(jar.toURI()));
+                       } catch (URISyntaxException e) {
+                               throw new RuntimeException("URL is invalid. This should not happen.", e);
+                       }
+               }
+ 
+               job.setClasspaths(classpaths);
+
+               return job;
+       }
+
+       // ------------------------------------------------------------------------
+       //  Helper methods
+       // ------------------------------------------------------------------------
+
+       /**
+        * Returns the {@link ActorGateway} of the current job manager leader using
+        * the {@link LeaderRetrievalService}.
+        *
+        * @return ActorGateway of the current job manager leader
+        * @throws Exception
+        */
+       protected ActorGateway getJobManagerGateway() throws Exception {
+               LOG.info("Looking up JobManager");
+
+               return LeaderRetrievalUtils.retrieveLeaderGateway(
+                       LeaderRetrievalUtils.createLeaderRetrievalService(flinkConfig),
+                       actorSystem,
+                       lookupTimeout);
+       }
+
+       /**
+        * Logs and prints to sysout if printing to stdout is enabled.
+        * @param message The message to log/print
+        */
+       protected void logAndSysout(String message) {
+               LOG.info(message);
+               if (printStatusDuringExecution) {
+                       System.out.println(message);
+               }
+       }
+
+       // ------------------------------------------------------------------------
+       //  Abstract methods to be implemented by the cluster specific Client
+       // ------------------------------------------------------------------------
+
+       /**
+        * Returns a URL (as a string) to the JobManager web interface.
+       public abstract String getWebInterfaceURL();
+
+       /**
+        * Returns the latest cluster status, with the number of TaskManagers and slots.
+        */
+       public abstract GetClusterStatusResponse getClusterStatus();
+
+       /**
+        * May return new messages from the cluster.
+        * Messages can be for example about failed containers or container launch requests.
+        */
+       protected abstract List<String> getNewMessages();
+
+       /**
+        * Returns a string representation of the cluster.
+        */
+       protected abstract String getClusterIdentifier();
+
+       /**
+        * Request the cluster to shut down or disconnect.
+        */
+       protected abstract void finalizeCluster();
+
+       /**
+        * Set the mode of this client (detached or blocking job execution).
+        * @param isDetached If true, the client will submit programs detached via the {@code run} method
+        */
+       public void setDetached(boolean isDetached) {
+               this.detachedJobSubmission = isDetached;
+       }
+
+       /**
+        * A flag to indicate whether this client submits jobs detached.
+        * @return True if the Client submits detached, false otherwise
+        */
+       public boolean isDetached() {
+               return detachedJobSubmission;
+       }
+
+       /**
+        * Return the Flink configuration object
+        * @return The Flink configuration object
+        */
+       public Configuration getFlinkConfiguration() {
+               return flinkConfig.clone();
+       }
+
+       /**
+        * The client may define an upper limit on the number of slots to use
+        * @return -1 if unknown
+        */
+       public abstract int getMaxSlots();
+
+       /**
+        * Calls the subclasses' submitJob method. It may decide to simply call one of the run methods or it may perform
+        * some custom job submission logic.
+        * @param jobGraph The JobGraph to be submitted
+        * @return JobSubmissionResult
+        */
+       protected abstract JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader)
+               throws ProgramInvocationException;
+
+}
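
For orientation, a minimal hypothetical sketch (not part of this commit) of how the job-control methods shown above (stop, getAccumulators) might be driven from user code. The class name and argument handling are placeholders, the JobID parsing helper is an assumption, and the Configuration is assumed to already point at a reachable JobManager.

import java.util.Map;

import org.apache.flink.api.common.JobID;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.StandaloneClusterClient;
import org.apache.flink.configuration.Configuration;

public class JobControlSketch {

        public static void main(String[] args) throws Exception {
                // Assumption: this Configuration carries a valid jobmanager.rpc.address / port.
                ClusterClient client = new StandaloneClusterClient(new Configuration());

                // Assumption: the job ID arrives as a hex string; build the JobID as your version requires.
                JobID jobId = JobID.fromHexString(args[0]);

                client.stop(jobId); // streaming jobs only; cancel(jobId) would be the batch counterpart

                Map<String, Object> accumulators = client.getAccumulators(jobId);
                System.out.println("Accumulators: " + accumulators);
        }
}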

http://git-wip-us.apache.org/repos/asf/flink/blob/f9b52a31/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
index dfb5f2e..fe2d7e0 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
@@ -34,7 +34,7 @@ import java.util.List;
  */
 public class ContextEnvironment extends ExecutionEnvironment {
 
-       protected final Client client;
+       protected final ClusterClient client;
 
        protected final List<URL> jarFilesToAttach;
 
@@ -44,8 +44,8 @@ public class ContextEnvironment extends ExecutionEnvironment {
 
        protected final String savepointPath;
        
-       public ContextEnvironment(Client remoteConnection, List<URL> jarFiles, List<URL> classpaths,
-                       ClassLoader userCodeClassLoader, String savepointPath) {
+       public ContextEnvironment(ClusterClient remoteConnection, List<URL> jarFiles, List<URL> classpaths,
+                               ClassLoader userCodeClassLoader, String savepointPath) {
                this.client = remoteConnection;
                this.jarFilesToAttach = jarFiles;
                this.classpathsToAttach = classpaths;
@@ -58,7 +58,7 @@ public class ContextEnvironment extends ExecutionEnvironment {
                Plan p = createProgramPlan(jobName);
                JobWithJars toRun = new JobWithJars(p, this.jarFilesToAttach, this.classpathsToAttach,
                                this.userCodeClassLoader);
-               this.lastJobExecutionResult = client.runBlocking(toRun, getParallelism(), savepointPath);
+               this.lastJobExecutionResult = client.run(toRun, getParallelism(), savepointPath).getJobExecutionResult();
                return this.lastJobExecutionResult;
        }
 
@@ -66,7 +66,7 @@ public class ContextEnvironment extends ExecutionEnvironment {
        public String getExecutionPlan() throws Exception {
                Plan plan = createProgramPlan("unnamed job");
 
-               OptimizedPlan op = Client.getOptimizedPlan(client.compiler, plan, getParallelism());
+               OptimizedPlan op = ClusterClient.getOptimizedPlan(client.compiler, plan, getParallelism());
                PlanJSONDumpGenerator gen = new PlanJSONDumpGenerator();
                return gen.getOptimizerPlanAsJSON(op);
        }
@@ -83,7 +83,7 @@ public class ContextEnvironment extends ExecutionEnvironment {
                                + ") : " + getIdString();
        }
        
-       public Client getClient() {
+       public ClusterClient getClient() {
                return this.client;
        }
        

http://git-wip-us.apache.org/repos/asf/flink/blob/f9b52a31/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironmentFactory.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironmentFactory.java b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironmentFactory.java
index e820bad..f9b1fc2 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironmentFactory.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironmentFactory.java
@@ -32,7 +32,7 @@ import java.util.List;
  */
 public class ContextEnvironmentFactory implements ExecutionEnvironmentFactory {
 
-       private final Client client;
+       private final ClusterClient client;
 
        private final List<URL> jarFilesToAttach;
 
@@ -42,34 +42,34 @@ public class ContextEnvironmentFactory implements ExecutionEnvironmentFactory {
 
        private final int defaultParallelism;
 
-       private final boolean wait;
+       private final boolean isDetached;
 
        private ExecutionEnvironment lastEnvCreated;
 
        private String savepointPath;
 
-       public ContextEnvironmentFactory(Client client, List<URL> jarFilesToAttach,
+       public ContextEnvironmentFactory(ClusterClient client, List<URL> jarFilesToAttach,
                        List<URL> classpathsToAttach, ClassLoader userCodeClassLoader, int defaultParallelism,
-                       boolean wait, String savepointPath)
+                       boolean isDetached, String savepointPath)
        {
                this.client = client;
                this.jarFilesToAttach = jarFilesToAttach;
                this.classpathsToAttach = classpathsToAttach;
                this.userCodeClassLoader = userCodeClassLoader;
                this.defaultParallelism = defaultParallelism;
-               this.wait = wait;
+               this.isDetached = isDetached;
                this.savepointPath = savepointPath;
        }
 
        @Override
        public ExecutionEnvironment createExecutionEnvironment() {
-               if (!wait && lastEnvCreated != null) {
+               if (isDetached && lastEnvCreated != null) {
                        throw new InvalidProgramException("Multiple enviornments cannot be created in detached mode");
                }
 
-               lastEnvCreated = wait ?
-                               new ContextEnvironment(client, jarFilesToAttach, classpathsToAttach, userCodeClassLoader, savepointPath) :
-                               new DetachedEnvironment(client, jarFilesToAttach, classpathsToAttach, userCodeClassLoader, savepointPath);
+               lastEnvCreated = isDetached ?
+                               new DetachedEnvironment(client, jarFilesToAttach, classpathsToAttach, userCodeClassLoader, savepointPath):
+                               new ContextEnvironment(client, jarFilesToAttach, classpathsToAttach, userCodeClassLoader, savepointPath);
                if (defaultParallelism > 0) {
                        lastEnvCreated.setParallelism(defaultParallelism);
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/f9b52a31/flink-clients/src/main/java/org/apache/flink/client/program/DetachedEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/DetachedEnvironment.java b/flink-clients/src/main/java/org/apache/flink/client/program/DetachedEnvironment.java
index 037c36b..8298933 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/DetachedEnvironment.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/DetachedEnvironment.java
@@ -42,7 +42,7 @@ public class DetachedEnvironment extends ContextEnvironment {
        private static final Logger LOG = LoggerFactory.getLogger(DetachedEnvironment.class);
 
        public DetachedEnvironment(
-                       Client remoteConnection,
+                       ClusterClient remoteConnection,
                        List<URL> jarFiles,
                        List<URL> classpaths,
                        ClassLoader userCodeClassLoader,
@@ -53,7 +53,7 @@ public class DetachedEnvironment extends ContextEnvironment {
        @Override
        public JobExecutionResult execute(String jobName) throws Exception {
                Plan p = createProgramPlan(jobName);
-               setDetachedPlan(Client.getOptimizedPlan(client.compiler, p, getParallelism()));
+               setDetachedPlan(ClusterClient.getOptimizedPlan(client.compiler, p, getParallelism()));
                LOG.warn("Job was executed in detached mode, the results will be available on completion.");
                this.lastJobExecutionResult = DetachedJobExecutionResult.INSTANCE;
                return this.lastJobExecutionResult;
@@ -72,7 +72,7 @@ public class DetachedEnvironment extends ContextEnvironment {
         * Finishes this Context Environment's execution by explicitly running the plan constructed.
         */
        JobSubmissionResult finalizeExecute() throws ProgramInvocationException {
-               return client.runDetached(detachedPlan, jarFilesToAttach, classpathsToAttach, userCodeClassLoader, savepointPath);
+               return client.run(detachedPlan, jarFilesToAttach, classpathsToAttach, userCodeClassLoader, savepointPath);
        }
 
        public static final class DetachedJobExecutionResult extends JobExecutionResult {

http://git-wip-us.apache.org/repos/asf/flink/blob/f9b52a31/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java
new file mode 100644
index 0000000..82f350a
--- /dev/null
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.client.program;
+
+import org.apache.flink.api.common.JobSubmissionResult;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.clusterframework.messages.GetClusterStatus;
+import org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Cluster client for communication with a standalone (on-premise) cluster or an existing cluster that has been
+ * brought up independently of a specific job.
+ */
+public class StandaloneClusterClient extends ClusterClient {
+
+       public StandaloneClusterClient(Configuration config) throws IOException {
+               super(config);
+       }
+
+
+       @Override
+       public String getWebInterfaceURL() {
+               String host = this.getJobManagerAddress().getHostName();
+               int port = getFlinkConfiguration().getInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY,
+                       ConfigConstants.DEFAULT_JOB_MANAGER_WEB_FRONTEND_PORT);
+               return "http://" + host + ":" + port;
+       }
+
+       @Override
+       public GetClusterStatusResponse getClusterStatus() {
+               ActorGateway jmGateway;
+               try {
+                       jmGateway = getJobManagerGateway();
+                       Future<Object> future = jmGateway.ask(GetClusterStatus.getInstance(), timeout);
+                       Object result = Await.result(future, timeout);
+                       if (result instanceof GetClusterStatusResponse) {
+                               return (GetClusterStatusResponse) result;
+                       } else {
+                               throw new RuntimeException("Received the wrong reply " + result + " from cluster.");
+                       }
+               } catch (Exception e) {
+                       throw new RuntimeException("Couldn't retrieve the Cluster status.", e);
+               }
+       }
+
+       @Override
+       public List<String> getNewMessages() {
+               return Collections.emptyList();
+       }
+
+       @Override
+       public String getClusterIdentifier() {
+               return "Standalone cluster with JobManager running at " + this.getJobManagerAddress();
+       }
+
+       @Override
+       public int getMaxSlots() {
+               return -1;
+       }
+
+       @Override
+       protected JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader)
+                       throws ProgramInvocationException {
+               if (isDetached()) {
+                       return super.runDetached(jobGraph, classLoader);
+               } else {
+                       return super.run(jobGraph, classLoader);
+               }
+       }
+
+       @Override
+       protected void finalizeCluster() {}
+
+}
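
For orientation, a hypothetical sketch (not part of this commit) of submitting a packaged user jar through the new StandaloneClusterClient, switching between detached and blocking submission with setDetached() as submitJob() above does. The jar path, parallelism, and Configuration contents are placeholders; the Configuration is assumed to name a reachable JobManager.

import java.io.File;

import org.apache.flink.api.common.JobSubmissionResult;
import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.client.program.StandaloneClusterClient;
import org.apache.flink.configuration.Configuration;

public class SubmitSketch {

        public static void main(String[] args) throws Exception {
                Configuration config = new Configuration(); // assumption: populated with the JobManager address
                StandaloneClusterClient client = new StandaloneClusterClient(config);
                System.out.println("Web UI: " + client.getWebInterfaceURL());

                PackagedProgram program = new PackagedProgram(new File(args[0])); // user jar path (placeholder)

                client.setDetached(true); // true: fire-and-forget submission; false: block until the job finishes
                JobSubmissionResult result = client.run(program, 1);
                System.out.println("Submitted job " + result.getJobID());
        }
}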

http://git-wip-us.apache.org/repos/asf/flink/blob/f9b52a31/flink-clients/src/test/java/org/apache/flink/client/CliFrontendAddressConfigurationTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendAddressConfigurationTest.java b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendAddressConfigurationTest.java
index 9d0b691..de85ca8 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendAddressConfigurationTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendAddressConfigurationTest.java
@@ -18,17 +18,13 @@
 
 package org.apache.flink.client;
 
-import static org.junit.Assert.assertEquals;
+import static org.apache.flink.client.CliFrontendTestUtils.checkJobManagerAddress;
 import static org.junit.Assert.fail;
 
 import static org.mockito.Mockito.*;
 
-import java.io.File;
-import java.nio.file.Files;
-import java.nio.file.StandardOpenOption;
 import org.apache.flink.client.cli.CommandLineOptions;
 
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.junit.Before;
 import org.junit.BeforeClass;
@@ -46,57 +42,18 @@ public class CliFrontendAddressConfigurationTest {
 
        @Rule
        public TemporaryFolder folder = new TemporaryFolder();
-       
+
        @BeforeClass
        public static void init() {
                CliFrontendTestUtils.pipeSystemOutToNull();
        }
-       
+
        @Before
        public void clearConfig() {
                CliFrontendTestUtils.clearGlobalConfiguration();
        }
 
        @Test
-       public void testInvalidConfigAndNoOption() {
-               try {
-                       CliFrontend frontend = new CliFrontend(CliFrontendTestUtils.getInvalidConfigDir());
-                       CommandLineOptions options = mock(CommandLineOptions.class);
-
-                       frontend.updateConfig(options);
-                       Configuration config = frontend.getConfiguration();
-
-                       checkJobManagerAddress(config, null, -1);
-
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-
-       @Test
-       public void testInvalidConfigAndOption() {
-               try {
-                       CliFrontend frontend = new CliFrontend(CliFrontendTestUtils.getInvalidConfigDir());
-
-                       CommandLineOptions options = mock(CommandLineOptions.class);
-                       when(options.getJobManagerAddress()).thenReturn("10.221.130.22:7788");
-
-                       frontend.updateConfig(options);
-                       Configuration config = frontend.getConfiguration();
-
-                       InetSocketAddress expectedAddress = new InetSocketAddress("10.221.130.22", 7788);
-
-                       checkJobManagerAddress(config, expectedAddress.getHostName(), expectedAddress.getPort());
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-       
-       @Test
        public void testValidConfig() {
                try {
                        CliFrontend frontend = new CliFrontend(CliFrontendTestUtils.getConfigDir());
@@ -112,83 +69,38 @@ public class CliFrontendAddressConfigurationTest {
                                        CliFrontendTestUtils.TEST_JOB_MANAGER_PORT);
                }
                catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
+                               e.printStackTrace();
+                               fail(e.getMessage());
+                       }
        }
 
-       /**
-        * Test that the CliFrontent is able to pick up the .yarn-properties file from a specified location.
-        */
        @Test
-       public void testYarnConfig() {
+       public void testInvalidConfigAndNoOption() {
                try {
-                       File tmpFolder = folder.newFolder();
-                       String currentUser = System.getProperty("user.name");
-
-                       // copy reference flink-conf.yaml to temporary test directory and append custom configuration path.
-                       File confFile = new File(CliFrontendRunTest.class.getResource("/testconfigwithyarn/flink-conf.yaml").getFile());
-                       File testConfFile = new File(tmpFolder, "flink-conf.yaml");
-                       org.apache.commons.io.FileUtils.copyFile(confFile, testConfFile);
-                       String toAppend = "\nyarn.properties-file.location: " + tmpFolder;
-                       // append to flink-conf.yaml
-                       Files.write(testConfFile.toPath(), toAppend.getBytes(), StandardOpenOption.APPEND);
-                       // copy .yarn-properties-<username>
-                       File propertiesFile = new File(CliFrontendRunTest.class.getResource("/testconfigwithyarn/.yarn-properties").getFile());
-                       File testPropertiesFile = new File(tmpFolder, ".yarn-properties-"+currentUser);
-                       org.apache.commons.io.FileUtils.copyFile(propertiesFile, testPropertiesFile);
-
-                       // start CLI Frontend
-                       CliFrontend frontend = new CliFrontend(tmpFolder.getAbsolutePath());
-
+                       CliFrontend frontend = new CliFrontend(CliFrontendTestUtils.getInvalidConfigDir());
                        CommandLineOptions options = mock(CommandLineOptions.class);
 
                        frontend.updateConfig(options);
                        Configuration config = frontend.getConfiguration();
 
-                       checkJobManagerAddress(
-                                       config,
-                                       CliFrontendTestUtils.TEST_YARN_JOB_MANAGER_ADDRESS,
-                                       CliFrontendTestUtils.TEST_YARN_JOB_MANAGER_PORT);
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-       
-       @Test
-       public void testInvalidYarnConfig() {
-               try {
-                       CliFrontend cli = new CliFrontend(CliFrontendTestUtils.getConfigDirWithInvalidYarnFile());
-
-                       CommandLineOptions options = mock(CommandLineOptions.class);
-
-                       cli.updateConfig(options);
-
-                       Configuration config = cli.getConfiguration();
+                       checkJobManagerAddress(config, null, -1);
 
-                       checkJobManagerAddress(
-                               config,
-                               CliFrontendTestUtils.TEST_JOB_MANAGER_ADDRESS,
-                               CliFrontendTestUtils.TEST_JOB_MANAGER_PORT);
                }
                catch (Exception e) {
                        e.printStackTrace();
                        fail(e.getMessage());
                }
        }
-       
+
        @Test
-       public void testManualOptionsOverridesConfig() {
+       public void testInvalidConfigAndOption() {
                try {
-                       CliFrontend frontend = new CliFrontend(CliFrontendTestUtils.getConfigDir());
+                       CliFrontend frontend = new CliFrontend(CliFrontendTestUtils.getInvalidConfigDir());
 
                        CommandLineOptions options = mock(CommandLineOptions.class);
                        when(options.getJobManagerAddress()).thenReturn("10.221.130.22:7788");
 
                        frontend.updateConfig(options);
-
                        Configuration config = frontend.getConfiguration();
 
                        InetSocketAddress expectedAddress = new InetSocketAddress("10.221.130.22", 7788);
@@ -200,11 +112,11 @@ public class CliFrontendAddressConfigurationTest {
                        fail(e.getMessage());
                }
        }
-       
+
        @Test
-       public void testManualOptionsOverridesYarn() {
+       public void testManualOptionsOverridesConfig() {
                try {
-                       CliFrontend frontend = new CliFrontend(CliFrontendTestUtils.getConfigDirWithYarnFile());
+                       CliFrontend frontend = new CliFrontend(CliFrontendTestUtils.getConfigDir());
 
                        CommandLineOptions options = mock(CommandLineOptions.class);
                        when(options.getJobManagerAddress()).thenReturn("10.221.130.22:7788");
@@ -223,11 +135,4 @@ public class CliFrontendAddressConfigurationTest {
                }
        }
 
-       public void checkJobManagerAddress(Configuration config, String expectedAddress, int expectedPort) {
-               String jobManagerAddress = config.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null);
-               int jobManagerPort = config.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, -1);
-
-               assertEquals(expectedAddress, jobManagerAddress);
-               assertEquals(expectedPort, jobManagerPort);
-       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f9b52a31/flink-clients/src/test/java/org/apache/flink/client/CliFrontendPackageProgramTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendPackageProgramTest.java b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendPackageProgramTest.java
index 5439742..f47ca69 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendPackageProgramTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendPackageProgramTest.java
@@ -30,11 +30,10 @@ import static org.mockito.Mockito.*;
 import org.apache.flink.client.cli.CliFrontendParser;
 import org.apache.flink.client.cli.ProgramOptions;
 import org.apache.flink.client.cli.RunOptions;
-import org.apache.flink.client.program.Client;
+import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.client.program.PackagedProgram;
 import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.optimizer.CompilerException;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 
 import org.apache.flink.optimizer.DataStatistics;
@@ -328,7 +327,7 @@ public class CliFrontendPackageProgramTest {
                        Optimizer compiler = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), c);
 
                        // we expect this to fail with a "ClassNotFoundException"
-                       Client.getOptimizedPlanAsJson(compiler, prog, 666);
+                       ClusterClient.getOptimizedPlanAsJson(compiler, prog, 666);
                        fail("Should have failed with a ClassNotFoundException");
                }
                catch (ProgramInvocationException e) {

http://git-wip-us.apache.org/repos/asf/flink/blob/f9b52a31/flink-clients/src/test/java/org/apache/flink/client/CliFrontendRunTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendRunTest.java b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendRunTest.java
index 56173bd..ceba6cb 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendRunTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendRunTest.java
@@ -25,11 +25,10 @@ import static org.junit.Assert.*;
 import org.apache.flink.client.cli.CliFrontendParser;
 import org.apache.flink.client.cli.CommandLineOptions;
 import org.apache.flink.client.cli.RunOptions;
-import org.apache.flink.client.program.Client;
+import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.client.program.PackagedProgram;
 import org.junit.BeforeClass;
 import org.junit.Test;
-import org.mockito.Mockito;
 
 
 public class CliFrontendRunTest {
@@ -75,7 +74,7 @@ public class CliFrontendRunTest {
                        // test detached mode
                        {
                                String[] parameters = {"-p", "2", "-d", getTestJarPath()};
-                               RunTestingCliFrontend testFrontend = new RunTestingCliFrontend(2, false, true);
+                               RunTestingCliFrontend testFrontend = new RunTestingCliFrontend(2, true, true);
                                assertEquals(0, testFrontend.run(parameters));
                        }
 
@@ -96,9 +95,6 @@ public class CliFrontendRunTest {
                        // test configure savepoint path
                        {
                                String[] parameters = {"-s", "expectedSavepointPath", getTestJarPath()};
-                               RunTestingCliFrontend testFrontend = new RunTestingCliFrontend(1, false, false);
-                               assertEquals(0, testFrontend.run(parameters));
-
                                RunOptions options = 
CliFrontendParser.parseRunCommand(parameters);
                                assertEquals("expectedSavepointPath", 
options.getSavepointPath());
                        }
@@ -125,22 +121,16 @@ public class CliFrontendRunTest {
                }
 
                @Override
-               protected int executeProgramDetached(PackagedProgram program, Client client, int parallelism) {
-                       assertTrue(isDetached);
-                       assertEquals(this.expectedParallelism, parallelism);
-                       assertEquals(this.sysoutLogging, client.getPrintStatusDuringExecution());
-                       return 0;
-               }
-
-               @Override
-               protected int executeProgramBlocking(PackagedProgram program, Client client, int parallelism) {
-                       assertTrue(!isDetached);
+               protected int executeProgram(PackagedProgram program, ClusterClient client, int parallelism) {
+                       assertEquals(isDetached, client.isDetached());
+                       assertEquals(sysoutLogging, client.getPrintStatusDuringExecution());
+                       assertEquals(expectedParallelism, parallelism);
                        return 0;
                }
 
                @Override
-               protected Client getClient(CommandLineOptions options, String programName, int userParallelism, boolean detached) throws Exception {
-                       return Mockito.mock(Client.class);
+               protected ClusterClient getClient(CommandLineOptions options, String programName) throws Exception {
+                       return TestingClusterClientWithoutActorSystem.create();
                }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f9b52a31/flink-clients/src/test/java/org/apache/flink/client/CliFrontendTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendTestUtils.java b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendTestUtils.java
index 7d01ab6..1872133 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendTestUtils.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendTestUtils.java
@@ -25,9 +25,12 @@ import java.io.PrintStream;
 import java.lang.reflect.Field;
 import java.net.MalformedURLException;
 import java.util.Map;
+
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.GlobalConfiguration;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 
 public class CliFrontendTestUtils {
@@ -35,16 +38,11 @@ public class CliFrontendTestUtils {
        public static final String TEST_JAR_MAIN_CLASS = "org.apache.flink.client.testjar.WordCount";
        
        public static final String TEST_JAR_CLASSLOADERTEST_CLASS = "org.apache.flink.client.testjar.JobWithExternalDependency";
-       
-       
+
        public static final String TEST_JOB_MANAGER_ADDRESS = "192.168.1.33";
-       
+
        public static final int TEST_JOB_MANAGER_PORT = 55443;
        
-       public static final String TEST_YARN_JOB_MANAGER_ADDRESS = "22.33.44.55";
-       
-       public static final int TEST_YARN_JOB_MANAGER_PORT = 6655;
-       
        
        public static String getTestJarPath() throws FileNotFoundException, MalformedURLException {
                File f = new File("target/maven-test-jar.jar");
@@ -68,17 +66,7 @@ public class CliFrontendTestUtils {
                String confFile = CliFrontendRunTest.class.getResource("/invalidtestconfig/flink-conf.yaml").getFile();
                return new File(confFile).getAbsoluteFile().getParent();
        }
-       
-       public static String getConfigDirWithYarnFile() {
-               String confFile = CliFrontendRunTest.class.getResource("/testconfigwithyarn/flink-conf.yaml").getFile();
-               return new File(confFile).getAbsoluteFile().getParent();
-       }
-       
-       public static String getConfigDirWithInvalidYarnFile() {
-               String confFile = CliFrontendRunTest.class.getResource("/testconfigwithinvalidyarn/flink-conf.yaml").getFile();
-               return new File(confFile).getAbsoluteFile().getParent();
-       }
-       
+
        public static void pipeSystemOutToNull() {
                System.setOut(new PrintStream(new BlackholeOutputSteam()));
                System.setErr(new PrintStream(new BlackholeOutputSteam()));
@@ -114,6 +102,14 @@ public class CliFrontendTestUtils {
                @Override
                public void write(int b){}
        }
+
+       public static void checkJobManagerAddress(Configuration config, String expectedAddress, int expectedPort) {
+               String jobManagerAddress = config.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null);
+               int jobManagerPort = config.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, -1);
+
+               assertEquals(expectedAddress, jobManagerAddress);
+               assertEquals(expectedPort, jobManagerPort);
+       }
        
        // --------------------------------------------------------------------------------------------
        

http://git-wip-us.apache.org/repos/asf/flink/blob/f9b52a31/flink-clients/src/test/java/org/apache/flink/client/TestingClusterClientWithoutActorSystem.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/TestingClusterClientWithoutActorSystem.java b/flink-clients/src/test/java/org/apache/flink/client/TestingClusterClientWithoutActorSystem.java
new file mode 100644
index 0000000..ab608cb
--- /dev/null
+++ b/flink-clients/src/test/java/org/apache/flink/client/TestingClusterClientWithoutActorSystem.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.client;
+
+import akka.actor.ActorSystem;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.client.program.StandaloneClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.mockito.Mockito;
+
+import java.io.IOException;
+
+/**
+ * A client to use in tests which does not instantiate an ActorSystem.
+ */
+public class TestingClusterClientWithoutActorSystem extends StandaloneClusterClient {
+
+       private TestingClusterClientWithoutActorSystem() throws IOException {
+               super(new Configuration());
+       }
+
+       /**
+        * Do not instantiate the Actor System to save resources.
+        * @return Mocked ActorSystem
+        * @throws IOException
+        */
+       @Override
+       protected ActorSystem createActorSystem() throws IOException {
+               return Mockito.mock(ActorSystem.class);
+       }
+
+       public static ClusterClient create() {
+               try {
+                       return new TestingClusterClientWithoutActorSystem();
+               } catch (IOException e) {
+                       throw new RuntimeException("Could not create TestingClientWithoutActorSystem.", e);
+               }
+       }
+
+}
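
For orientation, a hypothetical test sketch (not part of this commit) showing how the mocked client above might be obtained via create() and configured, mirroring what the reworked CliFrontendRunTest does; the test class name and assertion are illustrative only.

import static org.junit.Assert.assertTrue;

import org.apache.flink.client.program.ClusterClient;
import org.junit.Test;

public class TestingClientUsageSketch {

        @Test
        public void createsClientWithoutActorSystem() throws Exception {
                // No real ActorSystem is started; createActorSystem() is mocked in the class above.
                ClusterClient client = TestingClusterClientWithoutActorSystem.create();
                client.setDetached(true); // configure like the CLI would before executeProgram(...)
                assertTrue(client.isDetached());
        }
}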

http://git-wip-us.apache.org/repos/asf/flink/blob/f9b52a31/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java
index 14a1fff..4eb5269 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java
@@ -98,7 +98,7 @@ public class ClientConnectionTest {
                                @Override
                                public void run() {
                                        try {
-                                               new Client(config);
+                                               new StandaloneClusterClient(config);
                                                fail("This should fail with an exception since the JobManager is unreachable.");
                                        }
                                        catch (Throwable t) {

http://git-wip-us.apache.org/repos/asf/flink/blob/f9b52a31/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
index 4f9b367..96785f4 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
@@ -66,7 +66,7 @@ import static org.mockito.Mockito.when;
 
 
 /**
- * Simple and maybe stupid test to check the {@link Client} class.
+ * Simple and maybe stupid test to check the {@link ClusterClient} class.
  */
 public class ClientTest {
 
@@ -127,11 +127,12 @@ public class ClientTest {
        @Test
        public void testDetachedMode() throws Exception{
                jobManagerSystem.actorOf(Props.create(SuccessReturningActor.class), JobManager.JOB_MANAGER_NAME());
-               Client out = new Client(config);
+               ClusterClient out = new StandaloneClusterClient(config);
+               out.setDetached(true);
 
                try {
                        PackagedProgram prg = new PackagedProgram(TestExecuteTwice.class);
-                       out.runDetached(prg, 1);
+                       out.run(prg, 1);
                        fail(FAIL_MESSAGE);
                } catch (ProgramInvocationException e) {
                        assertEquals(
@@ -141,7 +142,7 @@ public class ClientTest {
 
                try {
                        PackagedProgram prg = new PackagedProgram(TestEager.class);
-                       out.runDetached(prg, 1);
+                       out.run(prg, 1);
                        fail(FAIL_MESSAGE);
                } catch (ProgramInvocationException e) {
                        assertEquals(
@@ -151,7 +152,7 @@ public class ClientTest {
 
                try {
                        PackagedProgram prg = new PackagedProgram(TestGetRuntime.class);
-                       out.runDetached(prg, 1);
+                       out.run(prg, 1);
                        fail(FAIL_MESSAGE);
                } catch (ProgramInvocationException e) {
                        assertEquals(
@@ -161,7 +162,7 @@ public class ClientTest {
 
                try {
                        PackagedProgram prg = new PackagedProgram(TestGetJobID.class);
-                       out.runDetached(prg, 1);
+                       out.run(prg, 1);
                        fail(FAIL_MESSAGE);
                } catch (ProgramInvocationException e) {
                        assertEquals(
@@ -171,7 +172,7 @@ public class ClientTest {
 
                try {
                        PackagedProgram prg = new PackagedProgram(TestGetAccumulator.class);
-                       out.runDetached(prg, 1);
+                       out.run(prg, 1);
                        fail(FAIL_MESSAGE);
                } catch (ProgramInvocationException e) {
                        assertEquals(
@@ -181,7 +182,7 @@ public class ClientTest {
 
                try {
                        PackagedProgram prg = new PackagedProgram(TestGetAllAccumulator.class);
-                       out.runDetached(prg, 1);
+                       out.run(prg, 1);
                        fail(FAIL_MESSAGE);
                } catch (ProgramInvocationException e) {
                        assertEquals(
@@ -198,8 +199,9 @@ public class ClientTest {
                try {
                        jobManagerSystem.actorOf(Props.create(SuccessReturningActor.class), JobManager.JOB_MANAGER_NAME());
 
-                       Client out = new Client(config);
-                       JobSubmissionResult result = out.runDetached(program.getPlanWithJars(), 1);
+                       ClusterClient out = new StandaloneClusterClient(config);
+                       out.setDetached(true);
+                       JobSubmissionResult result = out.run(program.getPlanWithJars(), 1);
 
                        assertNotNull(result);
 
@@ -219,10 +221,11 @@ public class ClientTest {
                try {
                        jobManagerSystem.actorOf(Props.create(FailureReturningActor.class), JobManager.JOB_MANAGER_NAME());
 
-                       Client out = new Client(config);
+                       ClusterClient out = new StandaloneClusterClient(config);
+                       out.setDetached(true);
 
                        try {
-                               out.runDetached(program.getPlanWithJars(), 1);
+                               out.run(program.getPlanWithJars(), 1);
                                fail("This should fail with an exception");
                        }
                        catch (ProgramInvocationException e) {
@@ -258,7 +261,9 @@ public class ClientTest {
                        }).when(packagedProgramMock).invokeInteractiveModeForExecution();
 
                        try {
-                               new Client(config).runBlocking(packagedProgramMock, 1);
+                               ClusterClient client = new StandaloneClusterClient(config);
+                               client.setDetached(true);
+                               client.run(packagedProgramMock, 1);
                                fail("Creating the local execution environment should not be possible");
                        }
                        catch (InvalidProgramException e) {
@@ -280,7 +285,7 @@ public class ClientTest {
                        assertNotNull(prg.getPreviewPlan());
                        
                        Optimizer optimizer = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), config);
-                       OptimizedPlan op = (OptimizedPlan) Client.getOptimizedPlan(optimizer, prg, 1);
+                       OptimizedPlan op = (OptimizedPlan) ClusterClient.getOptimizedPlan(optimizer, prg, 1);
                        assertNotNull(op);
 
                        PlanJSONDumpGenerator dumper = new PlanJSONDumpGenerator();
