/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.client.program;

import java.io.IOException;
import java.net.URISyntaxException;
import java.net.URL;
import java.util.Collections;
import java.util.List;
import java.util.Map;

import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobSubmissionResult;
import org.apache.flink.api.common.accumulators.AccumulatorHelper;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.Plan;
import org.apache.flink.optimizer.CompilerException;
import org.apache.flink.optimizer.DataStatistics;
import org.apache.flink.optimizer.Optimizer;
import org.apache.flink.optimizer.costs.DefaultCostEstimator;
import org.apache.flink.optimizer.plan.FlinkPlan;
import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.plan.StreamingPlan;
import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.client.JobClient;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.messages.accumulators.AccumulatorResultsErroneous;
import org.apache.flink.runtime.messages.accumulators.AccumulatorResultsFound;
import org.apache.flink.runtime.messages.accumulators.RequestAccumulatorResults;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.util.LeaderRetrievalUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;
import akka.actor.ActorSystem;

/**
 * Encapsulates the functionality necessary to submit a program to a remote cluster.
 *
 * <p>The client owns an actor system (started in the constructor, stopped by {@link #shutdown()})
 * and looks up the leading JobManager through a {@link LeaderRetrievalService} created from its
 * configuration whenever it needs to talk to the cluster.
 */
public class Client {

	private static final Logger LOG = LoggerFactory.getLogger(Client.class);

	/** The optimizer used in the optimization of batch programs */
	final Optimizer compiler;

	/** The actor system used to communicate with the JobManager */
	private final ActorSystem actorSystem;

	/** Configuration of the client */
	private final Configuration config;

	/** Timeout for futures */
	private final FiniteDuration timeout;

	/** Lookup timeout for the job manager retrieval service */
	private final FiniteDuration lookupTimeout;

	/**
	 * If != -1, this field specifies the total number of available slots on the cluster
	 * connected to the client.
	 */
	private final int maxSlots;

	/** Flag indicating whether to sysout print execution updates */
	private boolean printStatusDuringExecution = true;

	/**
	 * For interactive invocations, the Job ID is only available after the ContextEnvironment has
	 * been run inside the user JAR. We pass the Client to every instance of the ContextEnvironment
	 * which lets us access the last JobID here.
	 */
	private JobID lastJobID;

	// ------------------------------------------------------------------------
	//                            Construction
	// ------------------------------------------------------------------------

	/**
	 * Creates an instance that submits the programs to the JobManager defined in the
	 * configuration. The number of available slots is treated as unknown (-1).
	 *
	 * @param config The config used to obtain the job-manager's address, and used to configure the optimizer.
	 *
	 * @throws java.io.IOException Thrown, if the client's actor system could not be started.
	 */
	public Client(Configuration config) throws IOException {
		this(config, -1);
	}

	/**
	 * Creates a new instance of the class that submits the jobs to the job-manager
	 * defined in the configuration.
	 *
	 * @param config The configuration for the client-side processes, like the optimizer.
	 * @param maxSlots The number of slots on the cluster if != -1.
	 *
	 * @throws java.io.IOException Thrown, if the client's actor system could not be started.
	 *                             The original failure is attached as the cause.
	 */
	public Client(Configuration config, int maxSlots) throws IOException {
		this.config = Preconditions.checkNotNull(config);
		this.compiler = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), config);
		this.maxSlots = maxSlots;

		LOG.info("Starting client actor system");

		try {
			this.actorSystem = JobClient.startJobClientActorSystem(config);
		} catch (Exception e) {
			// Every startup failure is reported as an IOException with the original cause attached.
			throw new IOException("Could not start client actor system.", e);
		}

		timeout = AkkaUtils.getClientTimeout(config);
		lookupTimeout = AkkaUtils.getLookupTimeout(config);
	}

	// ------------------------------------------------------------------------
	//  Startup & Shutdown
	// ------------------------------------------------------------------------

	/**
	 * Shuts down the client. This stops the internal actor system and actors, and blocks
	 * until the actor system has terminated. Does nothing if it is already terminated.
	 */
	public void shutdown() {
		if (!this.actorSystem.isTerminated()) {
			this.actorSystem.shutdown();
			this.actorSystem.awaitTermination();
		}
	}

	// ------------------------------------------------------------------------
	//  Configuration
	// ------------------------------------------------------------------------

	/**
	 * Configures whether the client should print progress updates during the execution to {@code System.out}.
	 * All updates are logged via the SLF4J loggers regardless of this setting.
	 *
	 * @param print True to print updates to standard out during execution, false to not print them.
	 */
	public void setPrintStatusDuringExecution(boolean print) {
		this.printStatusDuringExecution = print;
	}

	/**
	 * @return whether the client will print progress updates during the execution to {@code System.out}
	 */
	public boolean getPrintStatusDuringExecution() {
		return this.printStatusDuringExecution;
	}

	/**
	 * @return -1 if unknown. The maximum number of available processing slots at the Flink cluster
	 * connected to this client.
	 */
	public int getMaxSlots() {
		return this.maxSlots;
	}

	// ------------------------------------------------------------------------
	//  Access to the Program's Plan
	// ------------------------------------------------------------------------

	/**
	 * Optimizes the given program and renders the resulting plan as JSON.
	 *
	 * <p>NOTE(review): the optimized plan is cast to {@link OptimizedPlan}; a program whose plan
	 * is of another {@link FlinkPlan} type (e.g. a {@link StreamingPlan}) would presumably fail
	 * with a {@link ClassCastException} here - confirm against callers.
	 *
	 * @param compiler The optimizer to use.
	 * @param prog The program to optimize.
	 * @param parallelism The default parallelism; only applied if greater than zero.
	 * @return The JSON dump of the optimized plan.
	 *
	 * @throws CompilerException Thrown, if the compiler encounters an illegal situation.
	 * @throws ProgramInvocationException Thrown, if the program could not be invoked.
	 */
	public static String getOptimizedPlanAsJson(Optimizer compiler, PackagedProgram prog, int parallelism)
			throws CompilerException, ProgramInvocationException
	{
		PlanJSONDumpGenerator jsonGen = new PlanJSONDumpGenerator();
		return jsonGen.getOptimizerPlanAsJSON((OptimizedPlan) getOptimizedPlan(compiler, prog, parallelism));
	}

	/**
	 * Creates the optimized plan for a packaged program.
	 *
	 * <p>Sets the calling thread's context class loader to the program's user code class loader
	 * (it is not restored afterwards). Programs with a program entry point are compiled from
	 * their plan; interactive programs are run inside an {@link OptimizerPlanEnvironment}.
	 *
	 * @param compiler The optimizer to use.
	 * @param prog The program to optimize.
	 * @param parallelism The default parallelism; only applied if greater than zero.
	 * @return The optimized plan.
	 *
	 * @throws CompilerException Thrown, if the compiler encounters an illegal situation.
	 * @throws ProgramInvocationException Thrown, if the program could not be invoked.
	 * @throws RuntimeException Thrown, if the program uses neither a program entry point nor interactive mode.
	 */
	public static FlinkPlan getOptimizedPlan(Optimizer compiler, PackagedProgram prog, int parallelism)
			throws CompilerException, ProgramInvocationException
	{
		Thread.currentThread().setContextClassLoader(prog.getUserCodeClassLoader());
		if (prog.isUsingProgramEntryPoint()) {
			return getOptimizedPlan(compiler, prog.getPlanWithJars(), parallelism);
		} else if (prog.isUsingInteractiveMode()) {
			// temporary hack to support the optimizer plan preview
			OptimizerPlanEnvironment env = new OptimizerPlanEnvironment(compiler);
			if (parallelism > 0) {
				env.setParallelism(parallelism);
			}

			return env.getOptimizedPlan(prog);
		} else {
			throw new RuntimeException("Couldn't determine program mode.");
		}
	}

	/**
	 * Compiles the given plan with the given optimizer.
	 *
	 * <p>The given parallelism is only written into the plan if it is positive and the plan
	 * does not yet define a positive default parallelism of its own.
	 *
	 * @param compiler The optimizer to use.
	 * @param p The plan to compile; its default parallelism may be modified.
	 * @param parallelism The default parallelism to fall back to.
	 * @return The optimized plan.
	 *
	 * @throws CompilerException Thrown, if the compiler encounters an illegal situation.
	 */
	public static OptimizedPlan getOptimizedPlan(Optimizer compiler, Plan p, int parallelism) throws CompilerException {
		if (parallelism > 0 && p.getDefaultParallelism() <= 0) {
			LOG.debug("Changing plan default parallelism from {} to {}", p.getDefaultParallelism(), parallelism);
			p.setDefaultParallelism(parallelism);
		}
		LOG.debug("Set parallelism {}, plan default parallelism {}", parallelism, p.getDefaultParallelism());

		return compiler.compile(p);
	}

	// ------------------------------------------------------------------------
	//  Program submission / execution
	// ------------------------------------------------------------------------

	/**
	 * Runs a packaged program and blocks until the call into the program returns.
	 *
	 * <p>Sets the calling thread's context class loader to the program's user code class loader
	 * (it is not restored afterwards). In interactive mode the program's main method is invoked
	 * with a {@link ContextEnvironment} installed as context, and the result only carries the
	 * ID of the last job that was submitted through this client.
	 *
	 * @param prog The program to run.
	 * @param parallelism The default parallelism to use when running the program.
	 * @return The result of the submission.
	 *
	 * @throws ProgramInvocationException Thrown, if the program could not be invoked or the execution failed.
	 * @throws RuntimeException Thrown, if the program uses neither a program entry point nor interactive mode.
	 */
	public JobSubmissionResult runBlocking(PackagedProgram prog, int parallelism) throws ProgramInvocationException {
		Thread.currentThread().setContextClassLoader(prog.getUserCodeClassLoader());
		if (prog.isUsingProgramEntryPoint()) {
			return runBlocking(prog.getPlanWithJars(), parallelism, prog.getSavepointPath());
		}
		else if (prog.isUsingInteractiveMode()) {
			LOG.info("Starting program in interactive mode");
			// NOTE(review): the 'true' flag presumably selects blocking execution - confirm
			// against ContextEnvironmentFactory.
			ContextEnvironment.setAsContext(new ContextEnvironmentFactory(this, prog.getAllLibraries(),
					prog.getClasspaths(), prog.getUserCodeClassLoader(), parallelism, true,
					prog.getSavepointPath()));

			// invoke here
			try {
				prog.invokeInteractiveModeForExecution();
			}
			finally {
				// always remove the context, even if the user program failed
				ContextEnvironment.unsetContext();
			}

			return new JobSubmissionResult(lastJobID);
		}
		else {
			throw new RuntimeException("PackagedProgram does not have a valid invocation mode.");
		}
	}

	/**
	 * Submits a packaged program in detached mode.
	 *
	 * <p>Sets the calling thread's context class loader to the program's user code class loader
	 * (it is not restored afterwards). In interactive mode the program's main method is invoked
	 * with a {@link ContextEnvironment} installed as context, and the last environment created
	 * by the factory is asked to finalize the execution.
	 *
	 * @param prog The program to submit.
	 * @param parallelism The default parallelism to use when running the program.
	 * @return The result of the submission.
	 *
	 * @throws ProgramInvocationException Thrown, if the program could not be invoked or the submission failed.
	 * @throws RuntimeException Thrown, if the program uses neither a program entry point nor interactive mode.
	 */
	public JobSubmissionResult runDetached(PackagedProgram prog, int parallelism)
			throws ProgramInvocationException
	{
		Thread.currentThread().setContextClassLoader(prog.getUserCodeClassLoader());
		if (prog.isUsingProgramEntryPoint()) {
			return runDetached(prog.getPlanWithJars(), parallelism, prog.getSavepointPath());
		}
		else if (prog.isUsingInteractiveMode()) {
			LOG.info("Starting program in interactive mode");
			ContextEnvironmentFactory factory = new ContextEnvironmentFactory(this, prog.getAllLibraries(),
					prog.getClasspaths(), prog.getUserCodeClassLoader(), parallelism, false,
					prog.getSavepointPath());
			ContextEnvironment.setAsContext(factory);

			// invoke here
			try {
				prog.invokeInteractiveModeForExecution();
				return ((DetachedEnvironment) factory.getLastEnvCreated()).finalizeExecute();
			}
			finally {
				// always remove the context, even if the user program failed
				ContextEnvironment.unsetContext();
			}
		}
		else {
			throw new RuntimeException("PackagedProgram does not have a valid invocation mode.");
		}
	}

	/**
	 * Runs a program on the Flink cluster to which this client is connected, without a
	 * savepoint path. See {@link #runBlocking(JobWithJars, int, String)}.
	 *
	 * @param program The program to be executed.
	 * @param parallelism The default parallelism to use when running the program.
	 * @return The result of the execution.
	 *
	 * @throws ProgramInvocationException Thrown, if the program could not be instantiated or the execution failed.
	 */
	public JobExecutionResult runBlocking(JobWithJars program, int parallelism) throws ProgramInvocationException {
		return runBlocking(program, parallelism, null);
	}

	/**
	 * Runs a program on the Flink cluster to which this client is connected. The call blocks until the
	 * execution is complete, and returns afterwards.
	 *
	 * @param program The program to be executed.
	 * @param parallelism The default parallelism to use when running the program. The default parallelism is used
	 *                    when the program does not set a parallelism by itself.
	 * @param savepointPath The savepoint path that is handed to the job graph generation; may be null.
	 * @return The result of the execution.
	 *
	 * @throws IllegalArgumentException Thrown, if the program does not provide a user code class loader.
	 * @throws CompilerException Thrown, if the compiler encounters an illegal situation.
	 * @throws ProgramInvocationException Thrown, if the program could not be instantiated from its jar file,
	 *                                    or if the submission failed. That might be either due to an I/O problem,
	 *                                    i.e. the job-manager is unreachable, or due to the fact that the
	 *                                    parallel execution failed.
	 */
	public JobExecutionResult runBlocking(JobWithJars program, int parallelism, String savepointPath)
			throws CompilerException, ProgramInvocationException {
		ClassLoader classLoader = program.getUserCodeClassLoader();
		if (classLoader == null) {
			throw new IllegalArgumentException("The given JobWithJars does not provide a usercode class loader.");
		}

		OptimizedPlan optPlan = getOptimizedPlan(compiler, program, parallelism);
		return runBlocking(optPlan, program.getJarFiles(), program.getClasspaths(), classLoader, savepointPath);
	}

	/**
	 * Submits a program to the Flink cluster to which this client is connected, without a
	 * savepoint path. See {@link #runDetached(JobWithJars, int, String)}.
	 *
	 * @param program The program to be executed.
	 * @param parallelism The default parallelism to use when running the program.
	 * @return The result of the submission.
	 *
	 * @throws ProgramInvocationException Thrown, if the program could not be instantiated or the submission failed.
	 */
	public JobSubmissionResult runDetached(JobWithJars program, int parallelism) throws ProgramInvocationException {
		return runDetached(program, parallelism, null);
	}

	/**
	 * Submits a program to the Flink cluster to which this client is connected. The call returns after the
	 * program was submitted and does not wait for the program to complete.
	 *
	 * @param program The program to be executed.
	 * @param parallelism The default parallelism to use when running the program. The default parallelism is used
	 *                    when the program does not set a parallelism by itself.
	 * @param savepointPath The savepoint path that is handed to the job graph generation; may be null.
	 * @return The result of the submission.
	 *
	 * @throws IllegalArgumentException Thrown, if the program does not provide a user code class loader.
	 * @throws CompilerException Thrown, if the compiler encounters an illegal situation.
	 * @throws ProgramInvocationException Thrown, if the program could not be instantiated from its jar file,
	 *                                    or if the submission failed. That might be due to an I/O problem,
	 *                                    i.e. the job-manager is unreachable.
	 */
	public JobSubmissionResult runDetached(JobWithJars program, int parallelism, String savepointPath)
			throws CompilerException, ProgramInvocationException {
		ClassLoader classLoader = program.getUserCodeClassLoader();
		if (classLoader == null) {
			throw new IllegalArgumentException("The given JobWithJars does not provide a usercode class loader.");
		}

		OptimizedPlan optimizedPlan = getOptimizedPlan(compiler, program, parallelism);
		return runDetached(optimizedPlan, program.getJarFiles(), program.getClasspaths(), classLoader, savepointPath);
	}

	/**
	 * Translates the compiled plan into a job graph and runs it blocking, without a savepoint path.
	 *
	 * @param compiledPlan The compiled plan to execute.
	 * @param libraries The jar files to attach to the job.
	 * @param classpaths The classpaths to set on the job.
	 * @param classLoader The class loader passed on to the job submission.
	 * @return The result of the execution.
	 *
	 * @throws ProgramInvocationException Thrown, if the execution failed.
	 */
	public JobExecutionResult runBlocking(
			FlinkPlan compiledPlan, List<URL> libraries, List<URL> classpaths, ClassLoader classLoader) throws ProgramInvocationException {
		return runBlocking(compiledPlan, libraries, classpaths, classLoader, null);
	}

	/**
	 * Translates the compiled plan into a job graph and runs it blocking.
	 *
	 * @param compiledPlan The compiled plan to execute.
	 * @param libraries The jar files to attach to the job.
	 * @param classpaths The classpaths to set on the job.
	 * @param classLoader The class loader passed on to the job submission.
	 * @param savepointPath The savepoint path that is handed to the job graph generation; may be null.
	 * @return The result of the execution.
	 *
	 * @throws ProgramInvocationException Thrown, if the execution failed.
	 */
	public JobExecutionResult runBlocking(FlinkPlan compiledPlan, List<URL> libraries, List<URL> classpaths,
			ClassLoader classLoader, String savepointPath) throws ProgramInvocationException
	{
		JobGraph job = getJobGraph(compiledPlan, libraries, classpaths, savepointPath);
		return runBlocking(job, classLoader);
	}

	/**
	 * Translates the compiled plan into a job graph and submits it detached, without a savepoint path.
	 *
	 * @param compiledPlan The compiled plan to execute.
	 * @param libraries The jar files to attach to the job.
	 * @param classpaths The classpaths to set on the job.
	 * @param classLoader The class loader passed on to the job submission.
	 * @return The result of the submission.
	 *
	 * @throws ProgramInvocationException Thrown, if the submission failed.
	 */
	public JobSubmissionResult runDetached(FlinkPlan compiledPlan, List<URL> libraries, List<URL> classpaths, ClassLoader classLoader) throws ProgramInvocationException {
		return runDetached(compiledPlan, libraries, classpaths, classLoader, null);
	}

	/**
	 * Translates the compiled plan into a job graph and submits it detached.
	 *
	 * @param compiledPlan The compiled plan to execute.
	 * @param libraries The jar files to attach to the job.
	 * @param classpaths The classpaths to set on the job.
	 * @param classLoader The class loader passed on to the job submission.
	 * @param savepointPath The savepoint path that is handed to the job graph generation; may be null.
	 * @return The result of the submission.
	 *
	 * @throws ProgramInvocationException Thrown, if the submission failed.
	 */
	public JobSubmissionResult runDetached(FlinkPlan compiledPlan, List<URL> libraries, List<URL> classpaths,
			ClassLoader classLoader, String savepointPath) throws ProgramInvocationException
	{
		JobGraph job = getJobGraph(compiledPlan, libraries, classpaths, savepointPath);
		return runDetached(job, classLoader);
	}

	/**
	 * Submits the job graph via {@link JobClient#submitJobAndWait} and returns its result.
	 * The job's ID is remembered as the last job ID of this client before the submission.
	 *
	 * @param jobGraph The job graph to execute.
	 * @param classLoader The class loader passed on to the job submission.
	 * @return The result of the execution.
	 *
	 * @throws ProgramInvocationException Thrown, if the leader retrieval service could not be created
	 *                                    or the job execution failed.
	 */
	public JobExecutionResult runBlocking(JobGraph jobGraph, ClassLoader classLoader) throws ProgramInvocationException {
		LeaderRetrievalService leaderRetrievalService;
		try {
			leaderRetrievalService = LeaderRetrievalUtils.createLeaderRetrievalService(config);
		} catch (Exception e) {
			throw new ProgramInvocationException("Could not create the leader retrieval service.", e);
		}

		try {
			this.lastJobID = jobGraph.getJobID();
			return JobClient.submitJobAndWait(actorSystem, leaderRetrievalService, jobGraph, timeout, printStatusDuringExecution, classLoader);
		} catch (JobExecutionException e) {
			throw new ProgramInvocationException("The program execution failed: " + e.getMessage(), e);
		}
	}

	/**
	 * Uploads the job's JAR files to the JobManager and submits the job graph in detached mode.
	 * The job's ID is remembered as the last job ID of this client before the submission.
	 *
	 * @param jobGraph The job graph to submit.
	 * @param classLoader The class loader passed on to the job submission.
	 * @return The submission result carrying the job's ID.
	 *
	 * @throws ProgramInvocationException Thrown, if the JobManager gateway could not be retrieved,
	 *                                    the JAR upload failed, or the submission failed.
	 */
	public JobSubmissionResult runDetached(JobGraph jobGraph, ClassLoader classLoader) throws ProgramInvocationException {
		ActorGateway jobManagerGateway;

		try {
			jobManagerGateway = getJobManagerGateway();
		} catch (Exception e) {
			throw new ProgramInvocationException("Failed to retrieve the JobManager gateway.", e);
		}

		LOG.info("Checking and uploading JAR files");
		try {
			JobClient.uploadJarFiles(jobGraph, jobManagerGateway, timeout);
		}
		catch (IOException e) {
			throw new ProgramInvocationException("Could not upload the program's JAR files to the JobManager.", e);
		}
		try {
			this.lastJobID = jobGraph.getJobID();
			JobClient.submitJobDetached(jobManagerGateway, jobGraph, timeout, classLoader);
			return new JobSubmissionResult(jobGraph.getJobID());
		} catch (JobExecutionException e) {
			throw new ProgramInvocationException("The program execution failed: " + e.getMessage(), e);
		}
	}

	/**
	 * Cancels a job identified by the job id.
	 *
	 * @param jobId the job id
	 * @throws Exception In case an error occurred. If the JobManager reports a cancellation failure,
	 *                   the reported failure is attached as the cause.
	 */
	public void cancel(JobID jobId) throws Exception {
		final ActorGateway jobManagerGateway = getJobManagerGateway();

		final Future<Object> response;
		try {
			response = jobManagerGateway.ask(new JobManagerMessages.CancelJob(jobId), timeout);
		} catch (final Exception e) {
			throw new ProgramInvocationException("Failed to query the job manager gateway.", e);
		}

		final Object result = Await.result(response, timeout);

		if (result instanceof JobManagerMessages.CancellationSuccess) {
			LOG.info("Job cancellation with ID {} succeeded.", jobId);
		} else if (result instanceof JobManagerMessages.CancellationFailure) {
			final Throwable t = ((JobManagerMessages.CancellationFailure) result).cause();
			// trailing Throwable is logged with its stack trace by SLF4J
			LOG.info("Job cancellation with ID {} failed.", jobId, t);
			// keep the original failure as cause so callers can inspect it
			throw new Exception("Failed to cancel the job because of \n" + t.getMessage(), t);
		} else {
			throw new Exception("Unknown message received while cancelling: " + result.getClass().getName());
		}
	}

	/**
	 * Stops a program on Flink cluster whose job-manager is configured in this client's configuration.
	 * Stopping works only for streaming programs. Be aware, that the program might continue to run for
	 * a while after sending the stop command, because after sources stopped to emit data all operators
	 * need to finish processing.
	 *
	 * @param jobId
	 *            the job ID of the streaming program to stop
	 * @throws Exception
	 *             If the job ID is invalid (ie, is unknown or refers to a batch job) or if sending the stop signal
	 *             failed. That might be due to an I/O problem, ie, the job-manager is unreachable. If the
	 *             JobManager reports a stopping failure, the reported failure is attached as the cause.
	 */
	public void stop(final JobID jobId) throws Exception {
		final ActorGateway jobManagerGateway = getJobManagerGateway();

		final Future<Object> response;
		try {
			response = jobManagerGateway.ask(new JobManagerMessages.StopJob(jobId), timeout);
		} catch (final Exception e) {
			throw new ProgramInvocationException("Failed to query the job manager gateway.", e);
		}

		final Object result = Await.result(response, timeout);

		if (result instanceof JobManagerMessages.StoppingSuccess) {
			LOG.info("Job stopping with ID {} succeeded.", jobId);
		} else if (result instanceof JobManagerMessages.StoppingFailure) {
			final Throwable t = ((JobManagerMessages.StoppingFailure) result).cause();
			// trailing Throwable is logged with its stack trace by SLF4J
			LOG.info("Job stopping with ID {} failed.", jobId, t);
			// keep the original failure as cause so callers can inspect it
			throw new Exception("Failed to stop the job because of \n" + t.getMessage(), t);
		} else {
			throw new Exception("Unknown message received while stopping: " + result.getClass().getName());
		}
	}

	/**
	 * Requests and returns the accumulators for the given job identifier. Accumulators can be
	 * requested while a job is running or after it has finished. The system class loader is used
	 * to deserialize the incoming accumulator results.
	 *
	 * @param jobID The job identifier of a job.
	 * @return A Map containing the accumulator's name and its value.
	 * @throws Exception Thrown, if the accumulators could not be requested or deserialized.
	 */
	public Map<String, Object> getAccumulators(JobID jobID) throws Exception {
		return getAccumulators(jobID, ClassLoader.getSystemClassLoader());
	}

	/**
	 * Requests and returns the accumulators for the given job identifier. Accumulators can be
	 * requested while a job is running or after it has finished.
	 *
	 * @param jobID The job identifier of a job.
	 * @param loader The class loader for deserializing the accumulator results.
	 * @return A Map containing the accumulator's name and its value.
	 * @throws Exception Thrown, if the request failed, if the JobManager reported an erroneous result
	 *                   (its cause is rethrown as is), or if an unexpected response was received.
	 */
	public Map<String, Object> getAccumulators(JobID jobID, ClassLoader loader) throws Exception {
		ActorGateway jobManagerGateway = getJobManagerGateway();

		Future<Object> response;
		try {
			response = jobManagerGateway.ask(new RequestAccumulatorResults(jobID), timeout);
		} catch (Exception e) {
			throw new Exception("Failed to query the job manager gateway for accumulators.", e);
		}

		Object result = Await.result(response, timeout);

		if (result instanceof AccumulatorResultsFound) {
			Map<String, SerializedValue<Object>> serializedAccumulators =
					((AccumulatorResultsFound) result).result();

			return AccumulatorHelper.deserializeAccumulators(serializedAccumulators, loader);

		} else if (result instanceof AccumulatorResultsErroneous) {
			throw ((AccumulatorResultsErroneous) result).cause();
		} else {
			throw new Exception("Failed to fetch accumulators for the job " + jobID + ".");
		}
	}


	// ------------------------------------------------------------------------
	//  Sessions
	// ------------------------------------------------------------------------

	/**
	 * Tells the JobManager to finish the session (job) defined by the given ID.
	 *
	 * @param jobId The ID that identifies the session.
	 * @throws IllegalArgumentException Thrown, if the job ID is null.
	 * @throws Exception Thrown, if the JobManager gateway could not be retrieved.
	 */
	public void endSession(JobID jobId) throws Exception {
		if (jobId == null) {
			throw new IllegalArgumentException("The JobID must not be null.");
		}
		endSessions(Collections.singletonList(jobId));
	}

	/**
	 * Tells the JobManager to finish the sessions (jobs) defined by the given IDs.
	 * Null entries in the list are skipped.
	 *
	 * @param jobIds The IDs that identify the sessions.
	 * @throws IllegalArgumentException Thrown, if the list is null.
	 * @throws Exception Thrown, if the JobManager gateway could not be retrieved.
	 */
	public void endSessions(List<JobID> jobIds) throws Exception {
		if (jobIds == null) {
			throw new IllegalArgumentException("The JobIDs must not be null");
		}

		ActorGateway jobManagerGateway = getJobManagerGateway();

		for (JobID jid : jobIds) {
			if (jid != null) {
				LOG.info("Telling job manager to end the session {}.", jid);
				// tell(...) is used here, so no reply is awaited
				jobManagerGateway.tell(new JobManagerMessages.RemoveCachedJob(jid));
			}
		}
	}

	// ------------------------------------------------------------------------
	//  Internal translation methods
	// ------------------------------------------------------------------------

	/**
	 * Creates the optimized plan for a given program, using the given compiler.
	 *
	 * @param compiler The optimizer to use.
	 * @param prog The program to be compiled.
	 * @param parallelism The default parallelism to fall back to.
	 * @return The compiled and optimized plan, as returned by the compiler.
	 * @throws CompilerException Thrown, if the compiler encounters an illegal situation.
	 * @throws ProgramInvocationException Thrown, if the program could not be instantiated from its jar file.
	 */
	private static OptimizedPlan getOptimizedPlan(Optimizer compiler, JobWithJars prog, int parallelism)
			throws CompilerException, ProgramInvocationException {
		return getOptimizedPlan(compiler, prog.getPlan(), parallelism);
	}

	/**
	 * Creates the job graph for the given optimized plan, using the program's libraries and
	 * classpaths and no savepoint path.
	 *
	 * @param prog The program providing libraries and classpaths.
	 * @param optPlan The optimized plan to translate.
	 * @return The job graph.
	 * @throws ProgramInvocationException Declared for callers; see {@link #getJobGraph(PackagedProgram, FlinkPlan, String)}.
	 */
	public JobGraph getJobGraph(PackagedProgram prog, FlinkPlan optPlan) throws ProgramInvocationException {
		return getJobGraph(optPlan, prog.getAllLibraries(), prog.getClasspaths(), null);
	}

	/**
	 * Creates the job graph for the given optimized plan, using the program's libraries and
	 * classpaths.
	 *
	 * @param prog The program providing libraries and classpaths.
	 * @param optPlan The optimized plan to translate.
	 * @param savepointPath The savepoint path; only applied to streaming plans. May be null.
	 * @return The job graph.
	 * @throws ProgramInvocationException Declared for callers.
	 */
	public JobGraph getJobGraph(PackagedProgram prog, FlinkPlan optPlan, String savepointPath) throws ProgramInvocationException {
		return getJobGraph(optPlan, prog.getAllLibraries(), prog.getClasspaths(), savepointPath);
	}

	/**
	 * Translates a plan into a job graph and attaches jar files and classpaths to it.
	 *
	 * <p>A {@link StreamingPlan} supplies its own job graph, on which the savepoint path is set.
	 * Any other plan is treated as an {@link OptimizedPlan} and compiled with a
	 * {@link JobGraphGenerator}; the savepoint path is not applied in that case.
	 *
	 * @param optPlan The plan to translate.
	 * @param jarFiles The jar files to add to the job.
	 * @param classpaths The classpaths to set on the job.
	 * @param savepointPath The savepoint path for streaming plans; may be null.
	 * @return The job graph.
	 * @throws RuntimeException Thrown, if one of the jar URLs cannot be converted to a URI.
	 */
	private JobGraph getJobGraph(FlinkPlan optPlan, List<URL> jarFiles, List<URL> classpaths, String savepointPath) {
		JobGraph job;
		if (optPlan instanceof StreamingPlan) {
			job = ((StreamingPlan) optPlan).getJobGraph();
			job.setSavepointPath(savepointPath);
		} else {
			JobGraphGenerator gen = new JobGraphGenerator(this.config);
			job = gen.compileJobGraph((OptimizedPlan) optPlan);
		}

		for (URL jar : jarFiles) {
			try {
				job.addJar(new Path(jar.toURI()));
			} catch (URISyntaxException e) {
				throw new RuntimeException("URL is invalid. This should not happen.", e);
			}
		}

		job.setClasspaths(classpaths);

		return job;
	}

	// ------------------------------------------------------------------------
	//  Helper methods
	// ------------------------------------------------------------------------

	/**
	 * Returns the {@link ActorGateway} of the current job manager leader using
	 * the {@link LeaderRetrievalService}. A new retrieval service is created from the
	 * client configuration on every call.
	 *
	 * @return ActorGateway of the current job manager leader
	 * @throws Exception Thrown, if the retrieval service could not be created or the leader
	 *                   gateway could not be retrieved.
	 */
	private ActorGateway getJobManagerGateway() throws Exception {
		LOG.info("Looking up JobManager");
		LeaderRetrievalService leaderRetrievalService =
				LeaderRetrievalUtils.createLeaderRetrievalService(config);

		return LeaderRetrievalUtils.retrieveLeaderGateway(
				leaderRetrievalService,
				actorSystem,
				lookupTimeout);
	}

}
http://git-wip-us.apache.org/repos/asf/flink/blob/f9b52a31/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java new file mode 100644 index 0000000..b56428d --- /dev/null +++ b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java @@ -0,0 +1,695 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.client.program; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.URISyntaxException; +import java.net.URL; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import akka.actor.ActorRef; +import org.apache.flink.api.common.JobExecutionResult; +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.JobSubmissionResult; +import org.apache.flink.api.common.Plan; +import org.apache.flink.api.common.accumulators.AccumulatorHelper; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.optimizer.CompilerException; +import org.apache.flink.optimizer.DataStatistics; +import org.apache.flink.optimizer.Optimizer; +import org.apache.flink.optimizer.costs.DefaultCostEstimator; +import org.apache.flink.optimizer.plan.FlinkPlan; +import org.apache.flink.optimizer.plan.OptimizedPlan; +import org.apache.flink.optimizer.plan.StreamingPlan; +import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator; +import org.apache.flink.optimizer.plantranslate.JobGraphGenerator; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.client.JobClient; +import org.apache.flink.runtime.client.JobExecutionException; +import org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse; +import org.apache.flink.runtime.instance.ActorGateway; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; +import org.apache.flink.runtime.messages.accumulators.AccumulatorResultsErroneous; +import org.apache.flink.runtime.messages.accumulators.AccumulatorResultsFound; +import org.apache.flink.runtime.messages.accumulators.RequestAccumulatorResults; +import org.apache.flink.runtime.messages.JobManagerMessages; +import 
org.apache.flink.runtime.net.ConnectionUtils; +import org.apache.flink.runtime.util.LeaderRetrievalUtils; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.SerializedValue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import scala.Some; +import scala.Tuple2; +import scala.concurrent.Await; +import scala.concurrent.Future; +import scala.concurrent.duration.FiniteDuration; +import akka.actor.ActorSystem; + + +/** + * Encapsulates the functionality necessary to submit a program to a remote cluster. + */ +public abstract class ClusterClient { + + private static final Logger LOG = LoggerFactory.getLogger(ClusterClient.class); + + /** The optimizer used in the optimization of batch programs */ + final Optimizer compiler; + + /** The actor system used to communicate with the JobManager */ + protected final ActorSystem actorSystem; + + /** Configuration of the client */ + protected final Configuration flinkConfig; + + /** Timeout for futures */ + protected final FiniteDuration timeout; + + /** Lookup timeout for the job manager retrieval service */ + private final FiniteDuration lookupTimeout; + + /** Flag indicating whether to sysout print execution updates */ + private boolean printStatusDuringExecution = true; + + /** + * For interactive invocations, the Job ID is only available after the ContextEnvironment has + * been run inside the user JAR. We pass the Client to every instance of the ContextEnvironment + * which lets us access the last JobID here. + */ + private JobID lastJobID; + + /** Switch for blocking/detached job submission of the client */ + private boolean detachedJobSubmission = false; + + // ------------------------------------------------------------------------ + // Construction + // ------------------------------------------------------------------------ + + /** + * Creates a instance that submits the programs to the JobManager defined in the + * configuration. 
This method will try to resolve the JobManager hostname and throw an exception + * if that is not possible. + * + * @param flinkConfig The config used to obtain the job-manager's address, and used to configure the optimizer. + * + * @throws java.io.IOException Thrown, if the client's actor system could not be started. + */ + public ClusterClient(Configuration flinkConfig) throws IOException { + + this.flinkConfig = Preconditions.checkNotNull(flinkConfig); + this.compiler = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), flinkConfig); + + this.timeout = AkkaUtils.getClientTimeout(flinkConfig); + this.lookupTimeout = AkkaUtils.getLookupTimeout(flinkConfig); + + this.actorSystem = createActorSystem(); + } + + // ------------------------------------------------------------------------ + // Startup & Shutdown + // ------------------------------------------------------------------------ + + /** + * Method to create the ActorSystem of the Client. May be overridden in subclasses. + * @return ActorSystem + * @throws IOException if the JobManager host name or port is missing from the configuration + */ + protected ActorSystem createActorSystem() throws IOException { + + if (actorSystem != null) { + throw new RuntimeException("This method may only be called once."); + } + + // start actor system + LOG.info("Starting client actor system."); + + String hostName = flinkConfig.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null); + int port = flinkConfig.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, -1); + if (hostName == null || port == -1) { + throw new IOException("The initial JobManager address has not been set correctly."); + } + InetSocketAddress initialJobManagerAddress = new InetSocketAddress(hostName, port); + + // find name of own public interface, able to connect to the JM + // try to find address for 2 seconds. log after 400 ms.
+ InetAddress ownHostname = ConnectionUtils.findConnectingAddress(initialJobManagerAddress, 2000, 400); + return AkkaUtils.createActorSystem(flinkConfig, + new Some<>(new Tuple2<String, Object>(ownHostname.getCanonicalHostName(), 0))); + } + + /** + * Shuts down the client: first finalizes the cluster, then stops the internal actor system if it is still running. + */ + public void shutdown() { + try { + finalizeCluster(); + } finally { + if (!this.actorSystem.isTerminated()) { + this.actorSystem.shutdown(); + this.actorSystem.awaitTermination(); + } + } + } + + // ------------------------------------------------------------------------ + // Configuration + // ------------------------------------------------------------------------ + + /** + * Configures whether the client should print progress updates during the execution to {@code System.out}. + * All updates are logged via the SLF4J loggers regardless of this setting. + * + * @param print True to print updates to standard out during execution, false to not print them. + */ + public void setPrintStatusDuringExecution(boolean print) { + this.printStatusDuringExecution = print; + } + + /** + * @return whether the client will print progress updates during the execution to {@code System.out} + */ + public boolean getPrintStatusDuringExecution() { + return this.printStatusDuringExecution; + } + + /** + * Gets the current JobManager address from the Flink configuration (may change in case of a HA setup). + * @return The address (host and port) of the leading JobManager + */ + public InetSocketAddress getJobManagerAddressFromConfig() { + try { + String hostName = flinkConfig.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null); + int port = flinkConfig.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, -1); + return new InetSocketAddress(hostName, port); + } catch (Exception e) { + throw new RuntimeException("Failed to retrieve JobManager address", e); + } + } + + /** + * Gets the current JobManager address (may change in case of a HA setup).
+ * @return The address (host and port) of the leading JobManager + */ + public InetSocketAddress getJobManagerAddress() { + try { + final ActorRef jmActor = getJobManagerGateway().actor(); + return AkkaUtils.getInetSockeAddressFromAkkaURL(jmActor.path().toSerializationFormat()); + } catch (Exception e) { + throw new RuntimeException("Failed to retrieve JobManager address", e); + } + } + + // ------------------------------------------------------------------------ + // Access to the Program's Plan + // ------------------------------------------------------------------------ + + public static String getOptimizedPlanAsJson(Optimizer compiler, PackagedProgram prog, int parallelism) + throws CompilerException, ProgramInvocationException + { + PlanJSONDumpGenerator jsonGen = new PlanJSONDumpGenerator(); + return jsonGen.getOptimizerPlanAsJSON((OptimizedPlan) getOptimizedPlan(compiler, prog, parallelism)); + } + + public static FlinkPlan getOptimizedPlan(Optimizer compiler, PackagedProgram prog, int parallelism) + throws CompilerException, ProgramInvocationException + { + Thread.currentThread().setContextClassLoader(prog.getUserCodeClassLoader()); + if (prog.isUsingProgramEntryPoint()) { + return getOptimizedPlan(compiler, prog.getPlanWithJars(), parallelism); + } else if (prog.isUsingInteractiveMode()) { + // temporary hack to support the optimizer plan preview + OptimizerPlanEnvironment env = new OptimizerPlanEnvironment(compiler); + if (parallelism > 0) { + env.setParallelism(parallelism); + } + + return env.getOptimizedPlan(prog); + } else { + throw new RuntimeException("Couldn't determine program mode."); + } + } + + public static OptimizedPlan getOptimizedPlan(Optimizer compiler, Plan p, int parallelism) throws CompilerException { + if (parallelism > 0 && p.getDefaultParallelism() <= 0) { + LOG.debug("Changing plan default parallelism from {} to {}", p.getDefaultParallelism(), parallelism); + p.setDefaultParallelism(parallelism); + } + LOG.debug("Set parallelism
{}, plan default parallelism {}", parallelism, p.getDefaultParallelism()); + + return compiler.compile(p); + } + + // ------------------------------------------------------------------------ + // Program submission / execution + // ------------------------------------------------------------------------ + + /** + * General purpose method to run a user jar from the CliFrontend in either blocking or detached mode, depending + * on whether {@code setDetached(true)} or {@code setDetached(false)} was called. + * @param prog the packaged program + * @param parallelism the parallelism to execute the contained Flink job + * @return The result of the execution + * @throws ProgramInvocationException + */ + public JobSubmissionResult run(PackagedProgram prog, int parallelism) + throws ProgramInvocationException + { + Thread.currentThread().setContextClassLoader(prog.getUserCodeClassLoader()); + if (prog.isUsingProgramEntryPoint()) { + return run(prog.getPlanWithJars(), parallelism, prog.getSavepointPath()); + } + else if (prog.isUsingInteractiveMode()) { + LOG.info("Starting program in interactive mode"); + ContextEnvironmentFactory factory = new ContextEnvironmentFactory(this, prog.getAllLibraries(), + prog.getClasspaths(), prog.getUserCodeClassLoader(), parallelism, isDetached(), + prog.getSavepointPath()); + ContextEnvironment.setAsContext(factory); + + try { + // invoke main method + prog.invokeInteractiveModeForExecution(); + if (isDetached()) { + // in detached mode, we execute the whole user code to extract the Flink job, afterwards we run it here + return ((DetachedEnvironment) factory.getLastEnvCreated()).finalizeExecute(); + } + else { + // in blocking mode, we execute all Flink jobs contained in the user code and then return here + return new JobSubmissionResult(lastJobID); + } + } + finally { + ContextEnvironment.unsetContext(); + } + } + else { + throw new RuntimeException("PackagedProgram does not have a valid invocation mode."); + } + } + + public JobSubmissionResult
run(JobWithJars program, int parallelism) throws ProgramInvocationException { + return run(program, parallelism, null); + } + + /** + * Runs a program on the Flink cluster to which this client is connected. Whether the call blocks until the + * execution is complete depends on the {@code submitJob} implementation of the concrete client. + * + * @param program The program to be executed. + * @param parallelism The default parallelism to use when running the program. The default parallelism is used + * when the program does not set a parallelism by itself. + * @param savepointPath Path of the savepoint to pass along with the job; may be {@code null}. + * @throws CompilerException Thrown, if the compiler encounters an illegal situation. + * @throws ProgramInvocationException Thrown, if the program could not be instantiated from its jar file, + * or if the submission failed. That might be either due to an I/O problem, + * i.e. the job-manager is unreachable, or due to the fact that the + * parallel execution failed. + */ + public JobSubmissionResult run(JobWithJars program, int parallelism, String savepointPath) + throws CompilerException, ProgramInvocationException { + ClassLoader classLoader = program.getUserCodeClassLoader(); + if (classLoader == null) { + throw new IllegalArgumentException("The given JobWithJars does not provide a usercode class loader."); + } + + OptimizedPlan optPlan = getOptimizedPlan(compiler, program, parallelism); + return run(optPlan, program.getJarFiles(), program.getClasspaths(), classLoader, savepointPath); + } + + public JobSubmissionResult run( + FlinkPlan compiledPlan, List<URL> libraries, List<URL> classpaths, ClassLoader classLoader) throws ProgramInvocationException { + return run(compiledPlan, libraries, classpaths, classLoader, null); + } + + public JobSubmissionResult run(FlinkPlan compiledPlan, + List<URL> libraries, List<URL> classpaths, ClassLoader classLoader, String savepointPath) + throws ProgramInvocationException + { + JobGraph job = getJobGraph(compiledPlan, libraries, classpaths, savepointPath); + return submitJob(job, classLoader); + } + + /** + * Submits a
JobGraph blocking. + * @param jobGraph The JobGraph + * @param classLoader User code class loader to deserialize the results and errors (may contain custom classes). + * @return JobExecutionResult + * @throws ProgramInvocationException + */ + public JobExecutionResult run(JobGraph jobGraph, ClassLoader classLoader) throws ProgramInvocationException { + LeaderRetrievalService leaderRetrievalService; + try { + leaderRetrievalService = LeaderRetrievalUtils.createLeaderRetrievalService(flinkConfig); + } catch (Exception e) { + throw new ProgramInvocationException("Could not create the leader retrieval service", e); + } + + try { + this.lastJobID = jobGraph.getJobID(); + return JobClient.submitJobAndWait(actorSystem, + leaderRetrievalService, jobGraph, timeout, printStatusDuringExecution, classLoader); + } catch (JobExecutionException e) { + throw new ProgramInvocationException("The program execution failed: " + e.getMessage(), e); + } + } + + /** + * Submits a JobGraph detached. + * @param jobGraph The JobGraph + * @param classLoader User code class loader to deserialize the results and errors (may contain custom classes). + * @return JobSubmissionResult + * @throws ProgramInvocationException + */ + public JobSubmissionResult runDetached(JobGraph jobGraph, ClassLoader classLoader) throws ProgramInvocationException { + ActorGateway jobManagerGateway; + + try { + jobManagerGateway = getJobManagerGateway(); + } catch (Exception e) { + throw new ProgramInvocationException("Failed to retrieve the JobManager gateway.", e); + } + + try { + JobClient.submitJobDetached(jobManagerGateway, jobGraph, timeout, classLoader); + return new JobSubmissionResult(jobGraph.getJobID()); + } catch (JobExecutionException e) { + throw new ProgramInvocationException("The program execution failed: " + e.getMessage(), e); + } + } + + /** + * Cancels a job identified by the job id. + * @param jobId the job id + * @throws Exception In case an error occurred. 
+ */ + public void cancel(JobID jobId) throws Exception { + final ActorGateway jobManagerGateway = getJobManagerGateway(); + + final Future<Object> response; + try { + response = jobManagerGateway.ask(new JobManagerMessages.CancelJob(jobId), timeout); + } catch (final Exception e) { + throw new ProgramInvocationException("Failed to query the job manager gateway.", e); + } + + final Object result = Await.result(response, timeout); + + if (result instanceof JobManagerMessages.CancellationSuccess) { + LOG.info("Job cancellation with ID " + jobId + " succeeded."); + } else if (result instanceof JobManagerMessages.CancellationFailure) { + final Throwable t = ((JobManagerMessages.CancellationFailure) result).cause(); + LOG.info("Job cancellation with ID " + jobId + " failed.", t); + throw new Exception("Failed to cancel the job because of \n" + t.getMessage(), t); + } else { + throw new Exception("Unknown message received while cancelling: " + result.getClass().getName()); + } + } + + /** + * Stops a program on the Flink cluster whose job-manager is configured in this client's configuration. + * Stopping works only for streaming programs. Be aware that the program might continue to run for + * a while after sending the stop command, because after the sources have stopped emitting data, all operators + * still need to finish processing. + * + * @param jobId + * the job ID of the streaming program to stop + * @throws Exception + * If the job ID is invalid (ie, is unknown or refers to a batch job) or if sending the stop signal + * failed. That might be due to an I/O problem, ie, the job-manager is unreachable. A failure reported by the job-manager is attached as the cause.
+ */ + public void stop(final JobID jobId) throws Exception { + final ActorGateway jobManagerGateway = getJobManagerGateway(); + + final Future<Object> response; + try { + response = jobManagerGateway.ask(new JobManagerMessages.StopJob(jobId), timeout); + } catch (final Exception e) { + throw new ProgramInvocationException("Failed to query the job manager gateway.", e); + } + + final Object result = Await.result(response, timeout); + + if (result instanceof JobManagerMessages.StoppingSuccess) { + LOG.info("Job stopping with ID " + jobId + " succeeded."); + } else if (result instanceof JobManagerMessages.StoppingFailure) { + final Throwable t = ((JobManagerMessages.StoppingFailure) result).cause(); + LOG.info("Job stopping with ID " + jobId + " failed.", t); + throw new Exception("Failed to stop the job because of \n" + t.getMessage(), t); + } else { + throw new Exception("Unknown message received while stopping: " + result.getClass().getName()); + } + } + + /** + * Requests and returns the accumulators for the given job identifier. Accumulators can be + * requested while a job is running or after it has finished. The default class loader is used + * to deserialize the incoming accumulator results. + * @param jobID The job identifier of a job. + * @return A Map containing the accumulator's name and its value. + */ + public Map<String, Object> getAccumulators(JobID jobID) throws Exception { + return getAccumulators(jobID, ClassLoader.getSystemClassLoader()); + } + + /** + * Requests and returns the accumulators for the given job identifier. Accumulators can be + * requested while a job is running or after it has finished. + * @param jobID The job identifier of a job. + * @param loader The class loader for deserializing the accumulator results. + * @return A Map containing the accumulator's name and its value.
+ */ + public Map<String, Object> getAccumulators(JobID jobID, ClassLoader loader) throws Exception { + ActorGateway jobManagerGateway = getJobManagerGateway(); + + Future<Object> response; + try { + response = jobManagerGateway.ask(new RequestAccumulatorResults(jobID), timeout); + } catch (Exception e) { + throw new Exception("Failed to query the job manager gateway for accumulators.", e); + } + + Object result = Await.result(response, timeout); + + if (result instanceof AccumulatorResultsFound) { + Map<String, SerializedValue<Object>> serializedAccumulators = + ((AccumulatorResultsFound) result).result(); + + return AccumulatorHelper.deserializeAccumulators(serializedAccumulators, loader); + + } else if (result instanceof AccumulatorResultsErroneous) { + throw ((AccumulatorResultsErroneous) result).cause(); + } else { + throw new Exception("Failed to fetch accumulators for the job " + jobID + "."); + } + } + + + // ------------------------------------------------------------------------ + // Sessions + // ------------------------------------------------------------------------ + + /** + * Tells the JobManager to finish the session (job) defined by the given ID. + * + * @param jobId The ID that identifies the session. + */ + public void endSession(JobID jobId) throws Exception { + if (jobId == null) { + throw new IllegalArgumentException("The JobID must not be null."); + } + endSessions(Collections.singletonList(jobId)); + } + + /** + * Tells the JobManager to finish the sessions (jobs) defined by the given IDs. + * + * @param jobIds The IDs that identify the sessions. 
+ */ + public void endSessions(List<JobID> jobIds) throws Exception { + if (jobIds == null) { + throw new IllegalArgumentException("The JobIDs must not be null"); + } + + ActorGateway jobManagerGateway = getJobManagerGateway(); + + for (JobID jid : jobIds) { + if (jid != null) { + LOG.info("Telling job manager to end the session {}.", jid); + jobManagerGateway.tell(new JobManagerMessages.RemoveCachedJob(jid)); + } + } + } + + // ------------------------------------------------------------------------ + // Internal translation methods + // ------------------------------------------------------------------------ + + /** + * Creates the optimized plan for a given program, using this client's compiler. + * + * @param prog The program to be compiled. + * @return The compiled and optimized plan, as returned by the compiler. + * @throws CompilerException Thrown, if the compiler encounters an illegal situation. + * @throws ProgramInvocationException Thrown, if the program could not be instantiated from its jar file. 
+ */ + private static OptimizedPlan getOptimizedPlan(Optimizer compiler, JobWithJars prog, int parallelism) + throws CompilerException, ProgramInvocationException { + return getOptimizedPlan(compiler, prog.getPlan(), parallelism); + } + + public JobGraph getJobGraph(PackagedProgram prog, FlinkPlan optPlan) throws ProgramInvocationException { + return getJobGraph(optPlan, prog.getAllLibraries(), prog.getClasspaths(), null); + } + + public JobGraph getJobGraph(PackagedProgram prog, FlinkPlan optPlan, String savepointPath) throws ProgramInvocationException { + return getJobGraph(optPlan, prog.getAllLibraries(), prog.getClasspaths(), savepointPath); + } + + private JobGraph getJobGraph(FlinkPlan optPlan, List<URL> jarFiles, List<URL> classpaths, String savepointPath) { + JobGraph job; + if (optPlan instanceof StreamingPlan) { + job = ((StreamingPlan) optPlan).getJobGraph(); + job.setSavepointPath(savepointPath); + } else { + JobGraphGenerator gen = new JobGraphGenerator(this.flinkConfig); + job = gen.compileJobGraph((OptimizedPlan) optPlan); + } + + for (URL jar : jarFiles) { + try { + job.addJar(new Path(jar.toURI())); + } catch (URISyntaxException e) { + throw new RuntimeException("URL is invalid. This should not happen.", e); + } + } + + job.setClasspaths(classpaths); + + return job; + } + + // ------------------------------------------------------------------------ + // Helper methods + // ------------------------------------------------------------------------ + + /** + * Returns the {@link ActorGateway} of the current job manager leader using + * the {@link LeaderRetrievalService}. 
+ * + * @return ActorGateway of the current job manager leader + * @throws Exception + */ + protected ActorGateway getJobManagerGateway() throws Exception { + LOG.info("Looking up JobManager"); + + return LeaderRetrievalUtils.retrieveLeaderGateway( + LeaderRetrievalUtils.createLeaderRetrievalService(flinkConfig), + actorSystem, + lookupTimeout); + } + + /** + * Logs and prints to sysout if printing to stdout is enabled. + * @param message The message to log/print + */ + protected void logAndSysout(String message) { + LOG.info(message); + if (printStatusDuringExecution) { + System.out.println(message); + } + } + + // ------------------------------------------------------------------------ + // Abstract methods to be implemented by the cluster specific Client + // ------------------------------------------------------------------------ + + /** + * Returns a URL (as a string) of the JobManager web interface + */ + public abstract String getWebInterfaceURL(); + + /** + * Returns the latest cluster status, with number of Taskmanagers and slots + */ + public abstract GetClusterStatusResponse getClusterStatus(); + + /** + * May return new messages from the cluster. + * Messages can be for example about failed containers or container launch requests. + */ + protected abstract List<String> getNewMessages(); + + /** + * Returns a string representation of the cluster. + */ + protected abstract String getClusterIdentifier(); + + /** + * Request the cluster to shut down or disconnect. + */ + protected abstract void finalizeCluster(); + + /** + * Set the mode of this client (detached or blocking job execution). + * @param isDetached If true, the client will submit programs detached via the {@code run} method + */ + public void setDetached(boolean isDetached) { + this.detachedJobSubmission = isDetached; + } + + /** + * A flag to indicate whether this client submits jobs detached.
+ * @return True if the Client submits detached, false otherwise + */ + public boolean isDetached() { + return detachedJobSubmission; + } + + /** + * Return the Flink configuration object + * @return The Flink configuration object + */ + public Configuration getFlinkConfiguration() { + return flinkConfig.clone(); + } + + /** + * The client may define an upper limit on the number of slots to use + * @return -1 if unknown + */ + public abstract int getMaxSlots(); + + /** + * Calls the subclasses' submitJob method. It may decide to simply call one of the run methods or it may perform + * some custom job submission logic. + * @param jobGraph The JobGraph to be submitted + * @return JobSubmissionResult + */ + protected abstract JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader) + throws ProgramInvocationException; + +} http://git-wip-us.apache.org/repos/asf/flink/blob/f9b52a31/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java index dfb5f2e..fe2d7e0 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java @@ -34,7 +34,7 @@ import java.util.List; */ public class ContextEnvironment extends ExecutionEnvironment { - protected final Client client; + protected final ClusterClient client; protected final List<URL> jarFilesToAttach; @@ -44,8 +44,8 @@ public class ContextEnvironment extends ExecutionEnvironment { protected final String savepointPath; - public ContextEnvironment(Client remoteConnection, List<URL> jarFiles, List<URL> classpaths, - ClassLoader userCodeClassLoader, String savepointPath) { + public ContextEnvironment(ClusterClient 
remoteConnection, List<URL> jarFiles, List<URL> classpaths, + ClassLoader userCodeClassLoader, String savepointPath) { this.client = remoteConnection; this.jarFilesToAttach = jarFiles; this.classpathsToAttach = classpaths; @@ -58,7 +58,7 @@ public class ContextEnvironment extends ExecutionEnvironment { Plan p = createProgramPlan(jobName); JobWithJars toRun = new JobWithJars(p, this.jarFilesToAttach, this.classpathsToAttach, this.userCodeClassLoader); - this.lastJobExecutionResult = client.runBlocking(toRun, getParallelism(), savepointPath); + this.lastJobExecutionResult = client.run(toRun, getParallelism(), savepointPath).getJobExecutionResult(); return this.lastJobExecutionResult; } @@ -66,7 +66,7 @@ public class ContextEnvironment extends ExecutionEnvironment { public String getExecutionPlan() throws Exception { Plan plan = createProgramPlan("unnamed job"); - OptimizedPlan op = Client.getOptimizedPlan(client.compiler, plan, getParallelism()); + OptimizedPlan op = ClusterClient.getOptimizedPlan(client.compiler, plan, getParallelism()); PlanJSONDumpGenerator gen = new PlanJSONDumpGenerator(); return gen.getOptimizerPlanAsJSON(op); } @@ -83,7 +83,7 @@ public class ContextEnvironment extends ExecutionEnvironment { + ") : " + getIdString(); } - public Client getClient() { + public ClusterClient getClient() { return this.client; } http://git-wip-us.apache.org/repos/asf/flink/blob/f9b52a31/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironmentFactory.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironmentFactory.java b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironmentFactory.java index e820bad..f9b1fc2 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironmentFactory.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironmentFactory.java @@ -32,7 
+32,7 @@ import java.util.List; */ public class ContextEnvironmentFactory implements ExecutionEnvironmentFactory { - private final Client client; + private final ClusterClient client; private final List<URL> jarFilesToAttach; @@ -42,34 +42,34 @@ public class ContextEnvironmentFactory implements ExecutionEnvironmentFactory { private final int defaultParallelism; - private final boolean wait; + private final boolean isDetached; private ExecutionEnvironment lastEnvCreated; private String savepointPath; - public ContextEnvironmentFactory(Client client, List<URL> jarFilesToAttach, + public ContextEnvironmentFactory(ClusterClient client, List<URL> jarFilesToAttach, List<URL> classpathsToAttach, ClassLoader userCodeClassLoader, int defaultParallelism, - boolean wait, String savepointPath) + boolean isDetached, String savepointPath) { this.client = client; this.jarFilesToAttach = jarFilesToAttach; this.classpathsToAttach = classpathsToAttach; this.userCodeClassLoader = userCodeClassLoader; this.defaultParallelism = defaultParallelism; - this.wait = wait; + this.isDetached = isDetached; this.savepointPath = savepointPath; } @Override public ExecutionEnvironment createExecutionEnvironment() { - if (!wait && lastEnvCreated != null) { + if (isDetached && lastEnvCreated != null) { throw new InvalidProgramException("Multiple enviornments cannot be created in detached mode"); } - lastEnvCreated = wait ? - new ContextEnvironment(client, jarFilesToAttach, classpathsToAttach, userCodeClassLoader, savepointPath) : - new DetachedEnvironment(client, jarFilesToAttach, classpathsToAttach, userCodeClassLoader, savepointPath); + lastEnvCreated = isDetached ? 
+ new DetachedEnvironment(client, jarFilesToAttach, classpathsToAttach, userCodeClassLoader, savepointPath): + new ContextEnvironment(client, jarFilesToAttach, classpathsToAttach, userCodeClassLoader, savepointPath); if (defaultParallelism > 0) { lastEnvCreated.setParallelism(defaultParallelism); } http://git-wip-us.apache.org/repos/asf/flink/blob/f9b52a31/flink-clients/src/main/java/org/apache/flink/client/program/DetachedEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/DetachedEnvironment.java b/flink-clients/src/main/java/org/apache/flink/client/program/DetachedEnvironment.java index 037c36b..8298933 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/DetachedEnvironment.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/DetachedEnvironment.java @@ -42,7 +42,7 @@ public class DetachedEnvironment extends ContextEnvironment { private static final Logger LOG = LoggerFactory.getLogger(DetachedEnvironment.class); public DetachedEnvironment( - Client remoteConnection, + ClusterClient remoteConnection, List<URL> jarFiles, List<URL> classpaths, ClassLoader userCodeClassLoader, @@ -53,7 +53,7 @@ public class DetachedEnvironment extends ContextEnvironment { @Override public JobExecutionResult execute(String jobName) throws Exception { Plan p = createProgramPlan(jobName); - setDetachedPlan(Client.getOptimizedPlan(client.compiler, p, getParallelism())); + setDetachedPlan(ClusterClient.getOptimizedPlan(client.compiler, p, getParallelism())); LOG.warn("Job was executed in detached mode, the results will be available on completion."); this.lastJobExecutionResult = DetachedJobExecutionResult.INSTANCE; return this.lastJobExecutionResult; @@ -72,7 +72,7 @@ public class DetachedEnvironment extends ContextEnvironment { * Finishes this Context Environment's execution by explicitly running the plan constructed. 
*/ JobSubmissionResult finalizeExecute() throws ProgramInvocationException { - return client.runDetached(detachedPlan, jarFilesToAttach, classpathsToAttach, userCodeClassLoader, savepointPath); + return client.run(detachedPlan, jarFilesToAttach, classpathsToAttach, userCodeClassLoader, savepointPath); } public static final class DetachedJobExecutionResult extends JobExecutionResult { http://git-wip-us.apache.org/repos/asf/flink/blob/f9b52a31/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java new file mode 100644 index 0000000..82f350a --- /dev/null +++ b/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.client.program; + +import org.apache.flink.api.common.JobSubmissionResult; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.clusterframework.messages.GetClusterStatus; +import org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse; +import org.apache.flink.runtime.instance.ActorGateway; +import org.apache.flink.runtime.jobgraph.JobGraph; +import scala.concurrent.Await; +import scala.concurrent.Future; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; + +/** + * Cluster client for communication with a standalone (on-premise) cluster or an existing cluster that has been + * brought up independently of a specific job. + */ +public class StandaloneClusterClient extends ClusterClient { + + public StandaloneClusterClient(Configuration config) throws IOException { + super(config); + } + + + @Override + public String getWebInterfaceURL() { + String host = this.getJobManagerAddress().getHostName(); + int port = getFlinkConfiguration().getInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, + ConfigConstants.DEFAULT_JOB_MANAGER_WEB_FRONTEND_PORT); + return "http://" + host + ":" + port; + } + + @Override + public GetClusterStatusResponse getClusterStatus() { + ActorGateway jmGateway; + try { + jmGateway = getJobManagerGateway(); + Future<Object> future = jmGateway.ask(GetClusterStatus.getInstance(), timeout); + Object result = Await.result(future, timeout); + if (result instanceof GetClusterStatusResponse) { + return (GetClusterStatusResponse) result; + } else { + throw new RuntimeException("Received the wrong reply " + result + " from cluster."); + } + } catch (Exception e) { + throw new RuntimeException("Couldn't retrieve the Cluster status.", e); + } + } + + @Override + public List<String> getNewMessages() { + return Collections.emptyList(); + } + + @Override + public String
getClusterIdentifier() { + return "Standalone cluster with JobManager running at " + this.getJobManagerAddress(); + } + + @Override + public int getMaxSlots() { + return -1; + } + + @Override + protected JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader) + throws ProgramInvocationException { + if (isDetached()) { + return super.runDetached(jobGraph, classLoader); + } else { + return super.run(jobGraph, classLoader); + } + } + + @Override + protected void finalizeCluster() {} + +} http://git-wip-us.apache.org/repos/asf/flink/blob/f9b52a31/flink-clients/src/test/java/org/apache/flink/client/CliFrontendAddressConfigurationTest.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendAddressConfigurationTest.java b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendAddressConfigurationTest.java index 9d0b691..de85ca8 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendAddressConfigurationTest.java +++ b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendAddressConfigurationTest.java @@ -18,17 +18,13 @@ package org.apache.flink.client; -import static org.junit.Assert.assertEquals; +import static org.apache.flink.client.CliFrontendTestUtils.checkJobManagerAddress; import static org.junit.Assert.fail; import static org.mockito.Mockito.*; -import java.io.File; -import java.nio.file.Files; -import java.nio.file.StandardOpenOption; import org.apache.flink.client.cli.CommandLineOptions; -import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.junit.Before; import org.junit.BeforeClass; @@ -46,57 +42,18 @@ public class CliFrontendAddressConfigurationTest { @Rule public TemporaryFolder folder = new TemporaryFolder(); - + @BeforeClass public static void init() { CliFrontendTestUtils.pipeSystemOutToNull(); } - + @Before public void clearConfig() { 
CliFrontendTestUtils.clearGlobalConfiguration(); } @Test - public void testInvalidConfigAndNoOption() { - try { - CliFrontend frontend = new CliFrontend(CliFrontendTestUtils.getInvalidConfigDir()); - CommandLineOptions options = mock(CommandLineOptions.class); - - frontend.updateConfig(options); - Configuration config = frontend.getConfiguration(); - - checkJobManagerAddress(config, null, -1); - - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } - - @Test - public void testInvalidConfigAndOption() { - try { - CliFrontend frontend = new CliFrontend(CliFrontendTestUtils.getInvalidConfigDir()); - - CommandLineOptions options = mock(CommandLineOptions.class); - when(options.getJobManagerAddress()).thenReturn("10.221.130.22:7788"); - - frontend.updateConfig(options); - Configuration config = frontend.getConfiguration(); - - InetSocketAddress expectedAddress = new InetSocketAddress("10.221.130.22", 7788); - - checkJobManagerAddress(config, expectedAddress.getHostName(), expectedAddress.getPort()); - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } - - @Test public void testValidConfig() { try { CliFrontend frontend = new CliFrontend(CliFrontendTestUtils.getConfigDir()); @@ -112,83 +69,38 @@ public class CliFrontendAddressConfigurationTest { CliFrontendTestUtils.TEST_JOB_MANAGER_PORT); } catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } + e.printStackTrace(); + fail(e.getMessage()); + } } - /** - * Test that the CliFrontent is able to pick up the .yarn-properties file from a specified location. - */ @Test - public void testYarnConfig() { + public void testInvalidConfigAndNoOption() { try { - File tmpFolder = folder.newFolder(); - String currentUser = System.getProperty("user.name"); - - // copy reference flink-conf.yaml to temporary test directory and append custom configuration path. 
- File confFile = new File(CliFrontendRunTest.class.getResource("/testconfigwithyarn/flink-conf.yaml").getFile()); - File testConfFile = new File(tmpFolder, "flink-conf.yaml"); - org.apache.commons.io.FileUtils.copyFile(confFile, testConfFile); - String toAppend = "\nyarn.properties-file.location: " + tmpFolder; - // append to flink-conf.yaml - Files.write(testConfFile.toPath(), toAppend.getBytes(), StandardOpenOption.APPEND); - // copy .yarn-properties-<username> - File propertiesFile = new File(CliFrontendRunTest.class.getResource("/testconfigwithyarn/.yarn-properties").getFile()); - File testPropertiesFile = new File(tmpFolder, ".yarn-properties-"+currentUser); - org.apache.commons.io.FileUtils.copyFile(propertiesFile, testPropertiesFile); - - // start CLI Frontend - CliFrontend frontend = new CliFrontend(tmpFolder.getAbsolutePath()); - + CliFrontend frontend = new CliFrontend(CliFrontendTestUtils.getInvalidConfigDir()); CommandLineOptions options = mock(CommandLineOptions.class); frontend.updateConfig(options); Configuration config = frontend.getConfiguration(); - checkJobManagerAddress( - config, - CliFrontendTestUtils.TEST_YARN_JOB_MANAGER_ADDRESS, - CliFrontendTestUtils.TEST_YARN_JOB_MANAGER_PORT); - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } - - @Test - public void testInvalidYarnConfig() { - try { - CliFrontend cli = new CliFrontend(CliFrontendTestUtils.getConfigDirWithInvalidYarnFile()); - - CommandLineOptions options = mock(CommandLineOptions.class); - - cli.updateConfig(options); - - Configuration config = cli.getConfiguration(); + checkJobManagerAddress(config, null, -1); - checkJobManagerAddress( - config, - CliFrontendTestUtils.TEST_JOB_MANAGER_ADDRESS, - CliFrontendTestUtils.TEST_JOB_MANAGER_PORT); } catch (Exception e) { e.printStackTrace(); fail(e.getMessage()); } } - + @Test - public void testManualOptionsOverridesConfig() { + public void testInvalidConfigAndOption() { try { - CliFrontend frontend = new 
CliFrontend(CliFrontendTestUtils.getConfigDir()); + CliFrontend frontend = new CliFrontend(CliFrontendTestUtils.getInvalidConfigDir()); CommandLineOptions options = mock(CommandLineOptions.class); when(options.getJobManagerAddress()).thenReturn("10.221.130.22:7788"); frontend.updateConfig(options); - Configuration config = frontend.getConfiguration(); InetSocketAddress expectedAddress = new InetSocketAddress("10.221.130.22", 7788); @@ -200,11 +112,11 @@ public class CliFrontendAddressConfigurationTest { fail(e.getMessage()); } } - + @Test - public void testManualOptionsOverridesYarn() { + public void testManualOptionsOverridesConfig() { try { - CliFrontend frontend = new CliFrontend(CliFrontendTestUtils.getConfigDirWithYarnFile()); + CliFrontend frontend = new CliFrontend(CliFrontendTestUtils.getConfigDir()); CommandLineOptions options = mock(CommandLineOptions.class); when(options.getJobManagerAddress()).thenReturn("10.221.130.22:7788"); @@ -223,11 +135,4 @@ public class CliFrontendAddressConfigurationTest { } } - public void checkJobManagerAddress(Configuration config, String expectedAddress, int expectedPort) { - String jobManagerAddress = config.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null); - int jobManagerPort = config.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, -1); - - assertEquals(expectedAddress, jobManagerAddress); - assertEquals(expectedPort, jobManagerPort); - } } http://git-wip-us.apache.org/repos/asf/flink/blob/f9b52a31/flink-clients/src/test/java/org/apache/flink/client/CliFrontendPackageProgramTest.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendPackageProgramTest.java b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendPackageProgramTest.java index 5439742..f47ca69 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendPackageProgramTest.java +++ 
b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendPackageProgramTest.java @@ -30,11 +30,10 @@ import static org.mockito.Mockito.*; import org.apache.flink.client.cli.CliFrontendParser; import org.apache.flink.client.cli.ProgramOptions; import org.apache.flink.client.cli.RunOptions; -import org.apache.flink.client.program.Client; +import org.apache.flink.client.program.ClusterClient; import org.apache.flink.client.program.PackagedProgram; import org.apache.flink.client.program.ProgramInvocationException; import org.apache.flink.optimizer.CompilerException; -import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.optimizer.DataStatistics; @@ -328,7 +327,7 @@ public class CliFrontendPackageProgramTest { Optimizer compiler = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), c); // we expect this to fail with a "ClassNotFoundException" - Client.getOptimizedPlanAsJson(compiler, prog, 666); + ClusterClient.getOptimizedPlanAsJson(compiler, prog, 666); fail("Should have failed with a ClassNotFoundException"); } catch (ProgramInvocationException e) { http://git-wip-us.apache.org/repos/asf/flink/blob/f9b52a31/flink-clients/src/test/java/org/apache/flink/client/CliFrontendRunTest.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendRunTest.java b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendRunTest.java index 56173bd..ceba6cb 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendRunTest.java +++ b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendRunTest.java @@ -25,11 +25,10 @@ import static org.junit.Assert.*; import org.apache.flink.client.cli.CliFrontendParser; import org.apache.flink.client.cli.CommandLineOptions; import org.apache.flink.client.cli.RunOptions; -import org.apache.flink.client.program.Client; +import 
org.apache.flink.client.program.ClusterClient; import org.apache.flink.client.program.PackagedProgram; import org.junit.BeforeClass; import org.junit.Test; -import org.mockito.Mockito; public class CliFrontendRunTest { @@ -75,7 +74,7 @@ public class CliFrontendRunTest { // test detached mode { String[] parameters = {"-p", "2", "-d", getTestJarPath()}; - RunTestingCliFrontend testFrontend = new RunTestingCliFrontend(2, false, true); + RunTestingCliFrontend testFrontend = new RunTestingCliFrontend(2, true, true); assertEquals(0, testFrontend.run(parameters)); } @@ -96,9 +95,6 @@ public class CliFrontendRunTest { // test configure savepoint path { String[] parameters = {"-s", "expectedSavepointPath", getTestJarPath()}; - RunTestingCliFrontend testFrontend = new RunTestingCliFrontend(1, false, false); - assertEquals(0, testFrontend.run(parameters)); - RunOptions options = CliFrontendParser.parseRunCommand(parameters); assertEquals("expectedSavepointPath", options.getSavepointPath()); } @@ -125,22 +121,16 @@ public class CliFrontendRunTest { } @Override - protected int executeProgramDetached(PackagedProgram program, Client client, int parallelism) { - assertTrue(isDetached); - assertEquals(this.expectedParallelism, parallelism); - assertEquals(this.sysoutLogging, client.getPrintStatusDuringExecution()); - return 0; - } - - @Override - protected int executeProgramBlocking(PackagedProgram program, Client client, int parallelism) { - assertTrue(!isDetached); + protected int executeProgram(PackagedProgram program, ClusterClient client, int parallelism) { + assertEquals(isDetached, client.isDetached()); + assertEquals(sysoutLogging, client.getPrintStatusDuringExecution()); + assertEquals(expectedParallelism, parallelism); return 0; } @Override - protected Client getClient(CommandLineOptions options, String programName, int userParallelism, boolean detached) throws Exception { - return Mockito.mock(Client.class); + protected ClusterClient getClient(CommandLineOptions options, 
String programName) throws Exception { + return TestingClusterClientWithoutActorSystem.create(); } } } http://git-wip-us.apache.org/repos/asf/flink/blob/f9b52a31/flink-clients/src/test/java/org/apache/flink/client/CliFrontendTestUtils.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendTestUtils.java b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendTestUtils.java index 7d01ab6..1872133 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendTestUtils.java +++ b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendTestUtils.java @@ -25,9 +25,12 @@ import java.io.PrintStream; import java.lang.reflect.Field; import java.net.MalformedURLException; import java.util.Map; + +import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.GlobalConfiguration; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; public class CliFrontendTestUtils { @@ -35,16 +38,11 @@ public class CliFrontendTestUtils { public static final String TEST_JAR_MAIN_CLASS = "org.apache.flink.client.testjar.WordCount"; public static final String TEST_JAR_CLASSLOADERTEST_CLASS = "org.apache.flink.client.testjar.JobWithExternalDependency"; - - + public static final String TEST_JOB_MANAGER_ADDRESS = "192.168.1.33"; - + public static final int TEST_JOB_MANAGER_PORT = 55443; - public static final String TEST_YARN_JOB_MANAGER_ADDRESS = "22.33.44.55"; - - public static final int TEST_YARN_JOB_MANAGER_PORT = 6655; - public static String getTestJarPath() throws FileNotFoundException, MalformedURLException { File f = new File("target/maven-test-jar.jar"); @@ -68,17 +66,7 @@ public class CliFrontendTestUtils { String confFile = CliFrontendRunTest.class.getResource("/invalidtestconfig/flink-conf.yaml").getFile(); return new 
File(confFile).getAbsoluteFile().getParent(); } - - public static String getConfigDirWithYarnFile() { - String confFile = CliFrontendRunTest.class.getResource("/testconfigwithyarn/flink-conf.yaml").getFile(); - return new File(confFile).getAbsoluteFile().getParent(); - } - - public static String getConfigDirWithInvalidYarnFile() { - String confFile = CliFrontendRunTest.class.getResource("/testconfigwithinvalidyarn/flink-conf.yaml").getFile(); - return new File(confFile).getAbsoluteFile().getParent(); - } - + public static void pipeSystemOutToNull() { System.setOut(new PrintStream(new BlackholeOutputSteam())); System.setErr(new PrintStream(new BlackholeOutputSteam())); @@ -114,6 +102,14 @@ public class CliFrontendTestUtils { @Override public void write(int b){} } + + public static void checkJobManagerAddress(Configuration config, String expectedAddress, int expectedPort) { + String jobManagerAddress = config.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null); + int jobManagerPort = config.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, -1); + + assertEquals(expectedAddress, jobManagerAddress); + assertEquals(expectedPort, jobManagerPort); + } // -------------------------------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/f9b52a31/flink-clients/src/test/java/org/apache/flink/client/TestingClusterClientWithoutActorSystem.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/test/java/org/apache/flink/client/TestingClusterClientWithoutActorSystem.java b/flink-clients/src/test/java/org/apache/flink/client/TestingClusterClientWithoutActorSystem.java new file mode 100644 index 0000000..ab608cb --- /dev/null +++ b/flink-clients/src/test/java/org/apache/flink/client/TestingClusterClientWithoutActorSystem.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.client; + +import akka.actor.ActorSystem; +import org.apache.flink.client.program.ClusterClient; +import org.apache.flink.client.program.StandaloneClusterClient; +import org.apache.flink.configuration.Configuration; +import org.mockito.Mockito; + +import java.io.IOException; + +/** + * A client to use in tests which does not instantiate an ActorSystem. + */ +public class TestingClusterClientWithoutActorSystem extends StandaloneClusterClient { + + private TestingClusterClientWithoutActorSystem() throws IOException { + super(new Configuration()); + } + + /** + * Do not instantiate the Actor System to save resources. 
+ * @return Mocked ActorSystem + * @throws IOException + */ + @Override + protected ActorSystem createActorSystem() throws IOException { + return Mockito.mock(ActorSystem.class); + } + + public static ClusterClient create() { + try { + return new TestingClusterClientWithoutActorSystem(); + } catch (IOException e) { + throw new RuntimeException("Could not create TestingClientWithoutActorSystem.", e); + } + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/f9b52a31/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java index 14a1fff..4eb5269 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java +++ b/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java @@ -98,7 +98,7 @@ public class ClientConnectionTest { @Override public void run() { try { - new Client(config); + new StandaloneClusterClient(config); fail("This should fail with an exception since the JobManager is unreachable."); } catch (Throwable t) { http://git-wip-us.apache.org/repos/asf/flink/blob/f9b52a31/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java index 4f9b367..96785f4 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java +++ b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java @@ -66,7 +66,7 @@ import static org.mockito.Mockito.when; /** - * Simple and maybe stupid test to check the {@link Client} class. 
+ * Simple and maybe stupid test to check the {@link ClusterClient} class. */ public class ClientTest { @@ -127,11 +127,12 @@ public class ClientTest { @Test public void testDetachedMode() throws Exception{ jobManagerSystem.actorOf(Props.create(SuccessReturningActor.class), JobManager.JOB_MANAGER_NAME()); - Client out = new Client(config); + ClusterClient out = new StandaloneClusterClient(config); + out.setDetached(true); try { PackagedProgram prg = new PackagedProgram(TestExecuteTwice.class); - out.runDetached(prg, 1); + out.run(prg, 1); fail(FAIL_MESSAGE); } catch (ProgramInvocationException e) { assertEquals( @@ -141,7 +142,7 @@ public class ClientTest { try { PackagedProgram prg = new PackagedProgram(TestEager.class); - out.runDetached(prg, 1); + out.run(prg, 1); fail(FAIL_MESSAGE); } catch (ProgramInvocationException e) { assertEquals( @@ -151,7 +152,7 @@ public class ClientTest { try { PackagedProgram prg = new PackagedProgram(TestGetRuntime.class); - out.runDetached(prg, 1); + out.run(prg, 1); fail(FAIL_MESSAGE); } catch (ProgramInvocationException e) { assertEquals( @@ -161,7 +162,7 @@ public class ClientTest { try { PackagedProgram prg = new PackagedProgram(TestGetJobID.class); - out.runDetached(prg, 1); + out.run(prg, 1); fail(FAIL_MESSAGE); } catch (ProgramInvocationException e) { assertEquals( @@ -171,7 +172,7 @@ public class ClientTest { try { PackagedProgram prg = new PackagedProgram(TestGetAccumulator.class); - out.runDetached(prg, 1); + out.run(prg, 1); fail(FAIL_MESSAGE); } catch (ProgramInvocationException e) { assertEquals( @@ -181,7 +182,7 @@ public class ClientTest { try { PackagedProgram prg = new PackagedProgram(TestGetAllAccumulator.class); - out.runDetached(prg, 1); + out.run(prg, 1); fail(FAIL_MESSAGE); } catch (ProgramInvocationException e) { assertEquals( @@ -198,8 +199,9 @@ public class ClientTest { try { jobManagerSystem.actorOf(Props.create(SuccessReturningActor.class), JobManager.JOB_MANAGER_NAME()); - Client out = new 
Client(config); - JobSubmissionResult result = out.runDetached(program.getPlanWithJars(), 1); + ClusterClient out = new StandaloneClusterClient(config); + out.setDetached(true); + JobSubmissionResult result = out.run(program.getPlanWithJars(), 1); assertNotNull(result); @@ -219,10 +221,11 @@ public class ClientTest { try { jobManagerSystem.actorOf(Props.create(FailureReturningActor.class), JobManager.JOB_MANAGER_NAME()); - Client out = new Client(config); + ClusterClient out = new StandaloneClusterClient(config); + out.setDetached(true); try { - out.runDetached(program.getPlanWithJars(), 1); + out.run(program.getPlanWithJars(), 1); fail("This should fail with an exception"); } catch (ProgramInvocationException e) { @@ -258,7 +261,9 @@ public class ClientTest { }).when(packagedProgramMock).invokeInteractiveModeForExecution(); try { - new Client(config).runBlocking(packagedProgramMock, 1); + ClusterClient client = new StandaloneClusterClient(config); + client.setDetached(true); + client.run(packagedProgramMock, 1); fail("Creating the local execution environment should not be possible"); } catch (InvalidProgramException e) { @@ -280,7 +285,7 @@ public class ClientTest { assertNotNull(prg.getPreviewPlan()); Optimizer optimizer = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), config); - OptimizedPlan op = (OptimizedPlan) Client.getOptimizedPlan(optimizer, prg, 1); + OptimizedPlan op = (OptimizedPlan) ClusterClient.getOptimizedPlan(optimizer, prg, 1); assertNotNull(op); PlanJSONDumpGenerator dumper = new PlanJSONDumpGenerator();
