http://git-wip-us.apache.org/repos/asf/flink/blob/f9b52a31/flink-clients/src/test/java/org/apache/flink/client/program/ExecutionPlanCreationTest.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ExecutionPlanCreationTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/ExecutionPlanCreationTest.java index be2caaf..c291ada 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/program/ExecutionPlanCreationTest.java +++ b/flink-clients/src/test/java/org/apache/flink/client/program/ExecutionPlanCreationTest.java @@ -54,7 +54,7 @@ public class ExecutionPlanCreationTest { config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, mockJmAddress.getPort()); Optimizer optimizer = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), config); - OptimizedPlan op = (OptimizedPlan) Client.getOptimizedPlan(optimizer, prg, -1); + OptimizedPlan op = (OptimizedPlan) ClusterClient.getOptimizedPlan(optimizer, prg, -1); assertNotNull(op); PlanJSONDumpGenerator dumper = new PlanJSONDumpGenerator();
http://git-wip-us.apache.org/repos/asf/flink/blob/f9b52a31/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java index 2541345..6ad250d 100644 --- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java +++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java @@ -36,9 +36,10 @@ import com.google.common.collect.Lists; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.JobID; -import org.apache.flink.client.program.Client; +import org.apache.flink.client.program.ClusterClient; import org.apache.flink.client.program.JobWithJars; import org.apache.flink.client.program.ProgramInvocationException; +import org.apache.flink.client.program.StandaloneClusterClient; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.GlobalConfiguration; @@ -205,9 +206,9 @@ public class FlinkClient { configuration.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, jobManagerHost); configuration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, jobManagerPort); - final Client client; + final ClusterClient client; try { - client = new Client(configuration); + client = new StandaloneClusterClient(configuration); } catch (final IOException e) { throw new RuntimeException("Could not establish a connection to the job manager", e); } @@ -245,9 +246,9 @@ public class FlinkClient { configuration.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, this.jobManagerHost); configuration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, this.jobManagerPort); - final Client client; + final ClusterClient client; try { - client = new Client(configuration); + client = new StandaloneClusterClient(configuration); } catch (final IOException e) { throw new RuntimeException("Could not establish a connection to the job manager", e); } http://git-wip-us.apache.org/repos/asf/flink/blob/f9b52a31/flink-core/src/main/java/org/apache/flink/api/common/JobExecutionResult.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/JobExecutionResult.java b/flink-core/src/main/java/org/apache/flink/api/common/JobExecutionResult.java index 7962fce..bc5ae09 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/JobExecutionResult.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/JobExecutionResult.java @@ -117,11 +117,14 @@ public class JobExecutionResult extends JobSubmissionResult { return (Integer) result; } + /** * Returns a dummy object for wrapping a JobSubmissionResult * @param result The SubmissionResult * @return a JobExecutionResult + * @deprecated Will be removed in future versions. */ + @Deprecated public static JobExecutionResult fromJobSubmissionResult(JobSubmissionResult result) { return new JobExecutionResult(result.getJobID(), -1, null); } http://git-wip-us.apache.org/repos/asf/flink/blob/f9b52a31/flink-core/src/main/java/org/apache/flink/api/common/JobSubmissionResult.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/JobSubmissionResult.java b/flink-core/src/main/java/org/apache/flink/api/common/JobSubmissionResult.java index 91a838b..4928b25 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/JobSubmissionResult.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/JobSubmissionResult.java @@ -25,7 +25,7 @@ import org.apache.flink.annotation.Public; */ @Public public class JobSubmissionResult { - + private JobID jobID; public JobSubmissionResult(JobID jobID) { @@ -40,4 +40,26 @@ public class JobSubmissionResult { public JobID getJobID() { return jobID; } + + /** + * Checks if this JobSubmissionResult is also a JobExecutionResult. + * See {@code getJobExecutionResult} to retrieve the JobExecutionResult. + * @return True if this is a JobExecutionResult, false otherwise + */ + public boolean isJobExecutionResults() { + return this instanceof JobExecutionResult; + } + + /** + * Returns the JobExecutionResult if available. + * @return The JobExecutionResult + * @throws ClassCastException if this is not a JobExecutionResult + */ + public JobExecutionResult getJobExecutionResult() { + if (isJobExecutionResults()) { + return (JobExecutionResult) this; + } else { + throw new ClassCastException("This JobSubmissionResult is not a JobExecutionResult."); + } + } } http://git-wip-us.apache.org/repos/asf/flink/blob/f9b52a31/flink-dist/src/main/flink-bin/conf/log4j-cli.properties ---------------------------------------------------------------------- diff --git a/flink-dist/src/main/flink-bin/conf/log4j-cli.properties b/flink-dist/src/main/flink-bin/conf/log4j-cli.properties index acb9d1a..2aba6af 100644 --- a/flink-dist/src/main/flink-bin/conf/log4j-cli.properties +++ b/flink-dist/src/main/flink-bin/conf/log4j-cli.properties @@ -29,7 +29,7 @@ log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %- # Log output from org.apache.flink.yarn to the console. This is used by the # CliFrontend class when using a per-job YARN cluster. log4j.logger.org.apache.flink.yarn=INFO, console -log4j.logger.org.apache.flink.client.FlinkYarnSessionCli=INFO, console +log4j.logger.org.apache.flink.yarn.cli.FlinkYarnSessionCli=INFO, console log4j.logger.org.apache.hadoop=INFO, console log4j.appender.console=org.apache.log4j.ConsoleAppender http://git-wip-us.apache.org/repos/asf/flink/blob/f9b52a31/flink-dist/src/main/flink-bin/yarn-bin/yarn-session.sh ---------------------------------------------------------------------- diff --git a/flink-dist/src/main/flink-bin/yarn-bin/yarn-session.sh b/flink-dist/src/main/flink-bin/yarn-bin/yarn-session.sh index aa87c89..16f8ab9 100755 --- a/flink-dist/src/main/flink-bin/yarn-bin/yarn-session.sh +++ b/flink-dist/src/main/flink-bin/yarn-bin/yarn-session.sh @@ -52,5 +52,5 @@ log_setting="-Dlog.file="$log" -Dlog4j.configuration=file:"$FLINK_CONF_DIR"/log4 export FLINK_CONF_DIR -$JAVA_RUN $JVM_ARGS -classpath $CC_CLASSPATH:$HADOOP_CLASSPATH:$HADOOP_CONF_DIR:$YARN_CONF_DIR $log_setting org.apache.flink.client.FlinkYarnSessionCli -ship $bin/../lib/ -j $FLINK_LIB_DIR/flink-dist*.jar "$@" +$JAVA_RUN $JVM_ARGS -classpath $CC_CLASSPATH:$HADOOP_CLASSPATH:$HADOOP_CONF_DIR:$YARN_CONF_DIR $log_setting org.apache.flink.yarn.cli.FlinkYarnSessionCli -ship $bin/../lib/ -j $FLINK_LIB_DIR/flink-dist*.jar "$@" http://git-wip-us.apache.org/repos/asf/flink/blob/f9b52a31/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/DegreesWithExceptionITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/DegreesWithExceptionITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/DegreesWithExceptionITCase.java index 955122f..d79768f 100644 --- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/DegreesWithExceptionITCase.java +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/DegreesWithExceptionITCase.java @@ -62,7 +62,7 @@ public class DegreesWithExceptionITCase { } catch (Throwable t) { t.printStackTrace(); - fail("Cluster shutdown caused an exception: " + t.getMessage()); + fail("ClusterClient shutdown caused an exception: " + t.getMessage()); } } http://git-wip-us.apache.org/repos/asf/flink/blob/f9b52a31/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnEdgesWithExceptionITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnEdgesWithExceptionITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnEdgesWithExceptionITCase.java index 61ef446..56a0a59 100644 --- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnEdgesWithExceptionITCase.java +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnEdgesWithExceptionITCase.java @@ -66,7 +66,7 @@ public class ReduceOnEdgesWithExceptionITCase { } catch (Throwable t) { t.printStackTrace(); - fail("Cluster shutdown caused an exception: " + t.getMessage()); + fail("ClusterClient shutdown caused an exception: " + t.getMessage()); } } http://git-wip-us.apache.org/repos/asf/flink/blob/f9b52a31/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnNeighborsWithExceptionITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnNeighborsWithExceptionITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnNeighborsWithExceptionITCase.java index 6cc0b6a..7458e08 100644 --- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnNeighborsWithExceptionITCase.java +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnNeighborsWithExceptionITCase.java @@ -67,7 +67,7 @@ public class ReduceOnNeighborsWithExceptionITCase { } catch (Throwable t) { t.printStackTrace(); - fail("Cluster shutdown caused an exception: " + t.getMessage()); + fail("ClusterClient shutdown caused an exception: " + t.getMessage()); } } http://git-wip-us.apache.org/repos/asf/flink/blob/f9b52a31/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarActionHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarActionHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarActionHandler.java index 03bae4e..9da54c1 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarActionHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarActionHandler.java @@ -20,7 +20,7 @@ package org.apache.flink.runtime.webmonitor.handlers; import com.fasterxml.jackson.core.JsonGenerator; import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.client.program.Client; +import org.apache.flink.client.program.ClusterClient; import org.apache.flink.client.program.PackagedProgram; import org.apache.flink.client.program.ProgramInvocationException; import org.apache.flink.configuration.Configuration; @@ -92,7 +92,7 @@ public abstract class JarActionHandler implements RequestHandler { ClassLoader classLoader = program.getUserCodeClassLoader(); Optimizer optimizer = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), new Configuration()); - FlinkPlan plan = Client.getOptimizedPlan(optimizer, program, parallelism); + FlinkPlan plan = ClusterClient.getOptimizedPlan(optimizer, program, parallelism); if (plan instanceof StreamingPlan) { graph = ((StreamingPlan) plan).getJobGraph(); http://git-wip-us.apache.org/repos/asf/flink/blob/f9b52a31/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java index cb95040..46a432e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java @@ -205,7 +205,16 @@ public class JobClient { checkNotNull(jobManagerGateway, "The jobManagerGateway must not be null."); checkNotNull(jobGraph, "The jobGraph must not be null."); checkNotNull(timeout, "The timeout must not be null."); - + + LOG.info("Checking and uploading JAR files"); + try { + JobClient.uploadJarFiles(jobGraph, jobManagerGateway, timeout); + } + catch (IOException e) { + throw new JobSubmissionException(jobGraph.getJobID(), + "Could not upload the program's JAR files to the JobManager.", e); + } + Object result; try { Future<Object> future = jobManagerGateway.ask( @@ -214,7 +223,7 @@ public class JobClient { ListeningBehaviour.DETACHED // only receive the Acknowledge for the job submission message ), timeout); - + result = Await.result(future, timeout); } catch (TimeoutException e) { @@ -225,10 +234,10 @@ public class JobClient { throw new JobExecutionException(jobGraph.getJobID(), "Failed to send job to JobManager: " + t.getMessage(), t.getCause()); } - + if (result instanceof JobManagerMessages.JobSubmitSuccess) { JobID respondedID = ((JobManagerMessages.JobSubmitSuccess) result).jobId(); - + // validate response if (!respondedID.equals(jobGraph.getJobID())) { throw new JobExecutionException(jobGraph.getJobID(), http://git-wip-us.apache.org/repos/asf/flink/blob/f9b52a31/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ApplicationStatus.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ApplicationStatus.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ApplicationStatus.java index 9746cef..b7bb84e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ApplicationStatus.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ApplicationStatus.java @@ -51,4 +51,5 @@ public enum ApplicationStatus { public int processExitCode() { return processExitCode; } + } http://git-wip-us.apache.org/repos/asf/flink/blob/f9b52a31/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/FlinkResourceManager.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/FlinkResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/FlinkResourceManager.java index 1bcb195..0aaf098 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/FlinkResourceManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/FlinkResourceManager.java @@ -538,7 +538,7 @@ public abstract class FlinkResourceManager<WorkerType extends ResourceIDRetrieva } // ------------------------------------------------------------------------ - // Cluster Shutdown + // ClusterClient Shutdown // ------------------------------------------------------------------------ private void shutdownCluster(ApplicationStatus status, String diagnostics) { http://git-wip-us.apache.org/repos/asf/flink/blob/f9b52a31/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/GetClusterStatusResponse.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/GetClusterStatusResponse.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/GetClusterStatusResponse.java index 5146d7b..9ad5ba5 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/GetClusterStatusResponse.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/GetClusterStatusResponse.java @@ -21,7 +21,7 @@ package org.apache.flink.runtime.clusterframework.messages; import java.io.Serializable; /** - * The reply to a {@code GetClusterStatus} message sent by the resource manager. Sends over the + * The reply to a {@code GetClusterStatus} message sent by the job manager. Sends over the * current number of task managers and the available task slots. */ public class GetClusterStatusResponse implements Serializable { http://git-wip-us.apache.org/repos/asf/flink/blob/f9b52a31/flink-runtime/src/main/java/org/apache/flink/runtime/yarn/AbstractFlinkYarnClient.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/yarn/AbstractFlinkYarnClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/yarn/AbstractFlinkYarnClient.java deleted file mode 100644 index c1498c5..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/yarn/AbstractFlinkYarnClient.java +++ /dev/null @@ -1,143 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.runtime.yarn; - -import org.apache.flink.configuration.Configuration; -import org.apache.hadoop.fs.Path; -import java.io.File; -import java.util.List; - -/** - * Abstract interface for an implementation of a Flink on YARN client to deploy. - * - * The Client describes the properties of the YARN application to create. - */ -public abstract class AbstractFlinkYarnClient { - - // ---- Setter for YARN Cluster properties ----- // - - /** - * @param memoryMB The amount of memory for the JobManager (in MB) - */ - public abstract void setJobManagerMemory(int memoryMB); - - /** - * @param memoryMB The memory per TaskManager (in MB) - */ - public abstract void setTaskManagerMemory(int memoryMB); - - /** - * Flink configuration - */ - public abstract void setFlinkConfiguration(org.apache.flink.configuration.Configuration conf); - - public abstract Configuration getFlinkConfiguration(); - - /** - * - * @param slots The number of TaskManager slots per TaskManager. - */ - public abstract void setTaskManagerSlots(int slots); - - /** - * @return the number of TaskManager processing slots per TaskManager. - */ - public abstract int getTaskManagerSlots(); - - /** - * @param queue Name of the YARN queue - */ - public abstract void setQueue(String queue); - - /** - * - * @param localJarPath Local Path to the Flink uberjar - */ - public abstract void setLocalJarPath(Path localJarPath); - - /** - * - * @param confPath local path to the Flink configuration file - */ - public abstract void setConfigurationFilePath(Path confPath); - - /** - * - * @param logConfPath local path to the flink logging configuration - */ - public abstract void setFlinkLoggingConfigurationPath(Path logConfPath); - public abstract Path getFlinkLoggingConfigurationPath(); - - /** - * - * @param tmCount number of TaskManagers to start - */ - public abstract void setTaskManagerCount(int tmCount); - public abstract int getTaskManagerCount(); - - /** - * @param confDirPath Path to config directory. - */ - public abstract void setConfigurationDirectory(String confDirPath); - - /** - * List of files to transfer to the YARN containers. - */ - public abstract void setShipFiles(List<File> shipFiles); - - /** - * - * @param dynamicPropertiesEncoded Encoded String of the dynamic properties (-D configuration values of the Flink configuration) - */ - public abstract void setDynamicPropertiesEncoded(String dynamicPropertiesEncoded); - public abstract String getDynamicPropertiesEncoded(); - - // --------------------------------------- Operations on the YARN cluster ----- // - - /** - * Returns a String containing details about the cluster (NodeManagers, available memory, ...) - * - */ - public abstract String getClusterDescription() throws Exception; - - /** - * Trigger the deployment to YARN. - * - */ - public abstract AbstractFlinkYarnCluster deploy() throws Exception; - - /** - * @param detachedMode If true, the Flink YARN client is non-blocking. That means it returns - * once Flink has been started successfully on YARN. - */ - public abstract void setDetachedMode(boolean detachedMode); - - public abstract boolean isDetached(); - - /** - * @return The string representation of the Path to the YARN session files. This is a temporary - * directory in HDFS that contains the jar files and configuration which is shipped to all the containers. - */ - public abstract String getSessionFilesDir(); - - /** - * Set a name for the YARN application - * @param name - */ - public abstract void setName(String name); -} http://git-wip-us.apache.org/repos/asf/flink/blob/f9b52a31/flink-runtime/src/main/java/org/apache/flink/runtime/yarn/AbstractFlinkYarnCluster.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/yarn/AbstractFlinkYarnCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/yarn/AbstractFlinkYarnCluster.java deleted file mode 100644 index af015c7..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/yarn/AbstractFlinkYarnCluster.java +++ /dev/null @@ -1,123 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.yarn; - -import org.apache.flink.api.common.JobID; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse; - -import java.io.IOException; -import java.net.InetSocketAddress; -import java.util.List; - -/** - * Abstract class for interacting with a running Flink cluster within YARN. - */ -public abstract class AbstractFlinkYarnCluster { - - /** - * Get hostname and port of the JobManager. - */ - public abstract InetSocketAddress getJobManagerAddress(); - - /** - * Returns an URL (as a string) to the JobManager web interface, running next to the - * ApplicationMaster and JobManager in a YARN container - */ - public abstract String getWebInterfaceURL(); - - /** - * Request the YARN cluster to shut down. - * - * @param failApplication If true, the application will be marked as failed in YARN - */ - public abstract void shutdown(boolean failApplication); - - /** - * Boolean indicating whether the cluster has been stopped already - */ - public abstract boolean hasBeenStopped(); - - /** - * Returns the latest cluster status, with number of Taskmanagers and slots - */ - public abstract GetClusterStatusResponse getClusterStatus(); - - /** - * Boolean indicating whether the Flink YARN cluster is in an erronous state. - */ - public abstract boolean hasFailed(); - - /** - * @return Diagnostics if the Cluster is in "failed" state. - */ - public abstract String getDiagnostics(); - - /** - * May return new messages from the cluster. - * Messages can be for example about failed containers or container launch requests. - */ - public abstract List<String> getNewMessages(); - - /** - * Returns a string representation of the ApplicationID assigned by YARN. - */ - public abstract String getApplicationId(); - - /** - * Flink's YARN cluster abstraction has two modes for connecting to the YARN AM. - * In the detached mode, the AM is launched and the Flink YARN client is disconnecting - * afterwards. - * In the non-detached mode, it maintains a connection with the AM to control the cluster. - * @return boolean indicating whether the cluster is a detached cluster - */ - public abstract boolean isDetached(); - - /** - * Connect the FlinkYarnCluster to the ApplicationMaster. - * - * Detached YARN sessions don't need to connect to the ApplicationMaster. - * Detached per job YARN sessions need to connect until the required number of TaskManagers have been started. - * - * @throws IOException - */ - public abstract void connectToCluster() throws IOException; - - /** - * Disconnect from the ApplicationMaster without stopping the session - * (therefore, use the {@link AbstractFlinkYarnCluster#shutdown(boolean)} method. - * - * @see AbstractFlinkYarnCluster#shutdown(boolean) - */ - public abstract void disconnect(); - - /** - * Tells the ApplicationMaster to monitor the status of JobId and stop itself once the specified - * job has finished. - * - * @param jobID Id of the job - */ - public abstract void stopAfterJob(JobID jobID); - - /** - * Return the Flink configuration object - * @return The Flink configuration object - */ - public abstract Configuration getFlinkConfiguration(); -} http://git-wip-us.apache.org/repos/asf/flink/blob/f9b52a31/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala ---------------------------------------------------------------------- diff --git a/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala b/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala index 7b80206..1a8870b 100644 --- a/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala +++ b/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala @@ -19,14 +19,14 @@ package org.apache.flink.api.scala import java.io._ -import java.util.Properties -import org.apache.flink.client.{CliFrontend, ClientUtils, FlinkYarnSessionCli} -import org.apache.flink.configuration.{ConfigConstants, Configuration, GlobalConfiguration} +import org.apache.flink.client.cli.CliFrontendParser +import org.apache.flink.client.program.ClusterClient +import org.apache.flink.client.CliFrontend +import org.apache.flink.configuration.{ConfigConstants, GlobalConfiguration} import org.apache.flink.runtime.minicluster.{FlinkMiniCluster, LocalFlinkMiniCluster} -import org.apache.flink.runtime.yarn.AbstractFlinkYarnCluster -import org.apache.hadoop.fs.Path +import scala.collection.mutable.ArrayBuffer import scala.tools.nsc.Settings import scala.tools.nsc.interpreter._ @@ -138,7 +138,7 @@ object FlinkShell { def fetchConnectionInfo( config: Config - ): (String, Int, Option[Either[FlinkMiniCluster, AbstractFlinkYarnCluster]]) = { + ): (String, Int, Option[Either[FlinkMiniCluster, ClusterClient]]) = { config.executionMode match { case ExecutionMode.LOCAL => // Local mode val config = GlobalConfiguration.getConfiguration() @@ -217,7 +217,7 @@ object FlinkShell { repl.closeInterpreter() cluster match { case Some(Left(miniCluster)) => miniCluster.stop() - case Some(Right(yarnCluster)) => yarnCluster.shutdown(false) + case Some(Right(yarnCluster)) => yarnCluster.shutdown() case _ => } } @@ -226,71 +226,49 @@ object FlinkShell { } def deployNewYarnCluster(yarnConfig: YarnConfig) = { - val yarnClient = FlinkYarnSessionCli.getFlinkYarnClient - - // use flink-dist.jar for scala shell - val jarPath = new Path("file://" + - s"${yarnClient.getClass.getProtectionDomain.getCodeSource.getLocation.getPath}") - yarnClient.setLocalJarPath(jarPath) - - val confDirPath = CliFrontend.getConfigurationDirectoryFromEnv - val flinkConfiguration = GlobalConfiguration.getConfiguration - val confFile = new File(confDirPath + File.separator + "flink-conf.yaml") - val confPath = new Path(confFile.getAbsolutePath) - yarnClient.setFlinkConfiguration(flinkConfiguration) - yarnClient.setConfigurationDirectory(confDirPath) - yarnClient.setConfigurationFilePath(confPath) + + val args = ArrayBuffer[String]( + "-m", "yarn-cluster" + ) // number of task managers is required. yarnConfig.containers match { - case Some(containers) => yarnClient.setTaskManagerCount(containers) + case Some(containers) => args ++= Seq("-yn", containers.toString) case None => throw new IllegalArgumentException("Number of taskmanagers must be specified.") } // set configuration from user input - yarnConfig.jobManagerMemory.foreach(yarnClient.setJobManagerMemory) - yarnConfig.name.foreach(yarnClient.setName) - yarnConfig.queue.foreach(yarnClient.setQueue) - yarnConfig.slots.foreach(yarnClient.setTaskManagerSlots) - yarnConfig.taskManagerMemory.foreach(yarnClient.setTaskManagerMemory) - - // deploy - val cluster = yarnClient.deploy() + yarnConfig.jobManagerMemory.foreach((jmMem) => args ++= Seq("-yjm", jmMem.toString)) + yarnConfig.slots.foreach((tmMem) => args ++= Seq("-ytm", tmMem.toString)) + yarnConfig.name.foreach((name) => args ++= Seq("-ynm", name.toString)) + yarnConfig.queue.foreach((queue) => args ++= Seq("-yqu", queue.toString)) + yarnConfig.slots.foreach((slots) => args ++= Seq("-ys", slots.toString)) + + val customCLI = CliFrontendParser.getAllCustomCommandLine.get("yarn-cluster") + + val options = CliFrontendParser.parseRunCommand(args.toArray) + + val cluster = customCLI.createClient("Flink Scala Shell", options.getCommandLine) + val address = cluster.getJobManagerAddress.getAddress.getHostAddress val port = cluster.getJobManagerAddress.getPort - cluster.connectToCluster() (address, port, Some(Right(cluster))) } def fetchDeployedYarnClusterInfo() = { + // load configuration val globalConfig = GlobalConfiguration.getConfiguration - val propertiesLocation = CliFrontend.getYarnPropertiesLocation(globalConfig) - val propertiesFile = new File(propertiesLocation) - - // read properties - val properties = if (propertiesFile.exists()) { - println("Found YARN properties file " + propertiesFile.getAbsolutePath) - val properties = new Properties() - val inputStream = new FileInputStream(propertiesFile) - - try { - properties.load(inputStream) - } finally { - inputStream.close() - } - properties - } else { - throw new IllegalArgumentException("Scala Shell cannot fetch YARN properties.") - } + val customCLI = CliFrontendParser.getAllCustomCommandLine.get("yarn-cluster") + + val cluster = customCLI.retrieveCluster(globalConfig) - val addressInStr = properties.getProperty(CliFrontend.YARN_PROPERTIES_JOBMANAGER_KEY) - val address = ClientUtils.parseHostPortAddress(addressInStr) + val jobManager = cluster.getJobManagerAddress - (address.getHostString, address.getPort, None) + (jobManager.getHostString, jobManager.getPort, None) } def ensureYarnConfig(config: Config) = config.yarnConfig match { http://git-wip-us.apache.org/repos/asf/flink/blob/f9b52a31/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala ---------------------------------------------------------------------- diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala index 4475bc8..f03cb84 100644 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala @@ -775,7 +775,7 @@ object ExecutionEnvironment { * configuration parameters for the Client only; Program parallelism can be set via * [[ExecutionEnvironment.setParallelism]]. * - * Cluster configuration has to be done in the remotely running Flink instance. + * ClusterClient configuration has to be done in the remotely running Flink instance. * * @param host The host name or address of the master (JobManager), where the program should be * executed. http://git-wip-us.apache.org/repos/asf/flink/blob/f9b52a31/flink-streaming-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSinkITCase.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSinkITCase.java b/flink-streaming-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSinkITCase.java index f37969d..bc9bedc 100644 --- a/flink-streaming-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSinkITCase.java +++ b/flink-streaming-connectors/flink-connector-elasticsearch2/src/test/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSinkITCase.java @@ -230,4 +230,4 @@ public class ElasticsearchSinkITCase extends StreamingMultipleProgramsTestBase { indexer.add(createIndexRequest(element)); } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/flink/blob/f9b52a31/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java index 03945a0..333f9c0 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java @@ -30,9 +30,10 @@ import org.apache.flink.annotation.Public; import org.apache.flink.api.common.InvalidProgramException; import org.apache.flink.api.common.JobExecutionResult; import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.client.program.Client; +import org.apache.flink.client.program.ClusterClient; import org.apache.flink.client.program.JobWithJars; import org.apache.flink.client.program.ProgramInvocationException; +import org.apache.flink.client.program.StandaloneClusterClient; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; @@ -195,9 +196,9 @@ public class RemoteStreamEnvironment extends StreamExecutionEnvironment { configuration.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, host); configuration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, port); - Client client; + ClusterClient client; try { - client = new Client(configuration); + client = new StandaloneClusterClient(configuration); client.setPrintStatusDuringExecution(getConfig().isSysoutLoggingEnabled()); } catch (Exception e) { @@ -205,7 +206,7 @@ public class RemoteStreamEnvironment extends StreamExecutionEnvironment { } try { - return client.runBlocking(streamGraph, jarFiles, globalClasspaths, usercodeClassLoader); + return client.run(streamGraph, jarFiles, globalClasspaths, usercodeClassLoader).getJobExecutionResult(); } catch (ProgramInvocationException e) { throw e; http://git-wip-us.apache.org/repos/asf/flink/blob/f9b52a31/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java index 5999143..0332684 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java @@ -63,7 +63,10 @@ public class StreamContextEnvironment extends StreamExecutionEnvironment { ((DetachedEnvironment) ctx).setDetachedPlan(streamGraph); return DetachedEnvironment.DetachedJobExecutionResult.INSTANCE; } else { - return ctx.getClient().runBlocking(streamGraph, ctx.getJars(), ctx.getClasspaths(), ctx.getUserCodeClassLoader(), ctx.getSavepointPath()); + return ctx + .getClient() + .run(streamGraph, ctx.getJars(), ctx.getClasspaths(), ctx.getUserCodeClassLoader(), ctx.getSavepointPath()) + .getJobExecutionResult(); } } } http://git-wip-us.apache.org/repos/asf/flink/blob/f9b52a31/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/RemoteEnvironmentITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/RemoteEnvironmentITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/RemoteEnvironmentITCase.java index b1768f0..34a7eed 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/RemoteEnvironmentITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/RemoteEnvironmentITCase.java @@ -78,7 +78,7 @@ public class RemoteEnvironmentITCase { } catch (Throwable t) { t.printStackTrace(); - fail("Cluster shutdown caused an exception: " + t.getMessage()); + fail("ClusterClient shutdown caused an exception: " + t.getMessage()); } } http://git-wip-us.apache.org/repos/asf/flink/blob/f9b52a31/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java b/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java index 7dccb7d..09b5e7e 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java @@ -69,7 +69,7 @@ public class AutoParallelismITCase { catch (Throwable t) { System.err.println("Error stopping cluster on shutdown"); t.printStackTrace(); - fail("Cluster shutdown caused an exception: " + t.getMessage()); + fail("ClusterClient shutdown caused an exception: " + t.getMessage()); } } http://git-wip-us.apache.org/repos/asf/flink/blob/f9b52a31/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCase.java index 4437db1..0a0f451 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCase.java @@ -69,7 +69,7 @@ public class SimpleRecoveryITCase { catch (Throwable t) { System.err.println("Error stopping cluster on shutdown"); t.printStackTrace(); - fail("Cluster shutdown caused an exception: " + t.getMessage()); + fail("ClusterClient shutdown caused an exception: " + t.getMessage()); } } http://git-wip-us.apache.org/repos/asf/flink/blob/f9b52a31/flink-yarn-tests/pom.xml ---------------------------------------------------------------------- diff --git a/flink-yarn-tests/pom.xml b/flink-yarn-tests/pom.xml index 239a85a..e849211 100644 --- a/flink-yarn-tests/pom.xml +++ b/flink-yarn-tests/pom.xml @@ -92,11 +92,16 @@ under the License. <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-surefire-plugin</artifactId> - <configuration> - <!-- Enforce single threaded execution due to port conflicts with the mini yarn cluster --> - <forkCount>1</forkCount> - <workingDirectory>../</workingDirectory> - </configuration> + <executions> + <execution> + <id>integration-tests</id> + <configuration> + <!-- Enforce single threaded execution due to port conflicts with the mini yarn cluster --> + <forkCount>1</forkCount> + <workingDirectory>../</workingDirectory> + </configuration> + </execution> + </executions> </plugin> <plugin> http://git-wip-us.apache.org/repos/asf/flink/blob/f9b52a31/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java ---------------------------------------------------------------------- diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java new file mode 100644 index 0000000..c6a1ade --- /dev/null +++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java @@ -0,0 +1,220 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.yarn; + +import org.apache.flink.client.CliFrontend; +import org.apache.flink.client.cli.CommandLineOptions; +import org.apache.flink.client.program.ClusterClient; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.GlobalConfiguration; +import org.junit.*; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.io.OutputStream; +import java.io.PrintStream; +import java.lang.reflect.Field; +import java.net.InetSocketAddress; +import java.nio.file.Files; +import java.nio.file.StandardOpenOption; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + * Tests that verify that the CLI client picks up the correct address for the JobManager + * from configuration and configs. + */ +public class CliFrontendYarnAddressConfigurationTest { + + @Rule + public TemporaryFolder temporaryFolder = new TemporaryFolder(); + + private final static PrintStream OUT = System.out; + private final static PrintStream ERR = System.err; + + @BeforeClass + public static void disableStdOutErr() { + class NullPrint extends OutputStream { + @Override + public void write(int b) {} + } + + PrintStream nullPrinter = new PrintStream(new NullPrint()); + System.setOut(nullPrinter); + System.setErr(nullPrinter); + } + + @AfterClass + public static void restoreAfterwards() { + System.setOut(OUT); + System.setErr(ERR); + } + + @Before + public void clearConfig() throws NoSuchFieldException, IllegalAccessException { + // reset GlobalConfiguration between tests + Field instance = GlobalConfiguration.class.getDeclaredField("SINGLETON"); + instance.setAccessible(true); + instance.set(null, null); + } + + private static final String TEST_YARN_JOB_MANAGER_ADDRESS = "22.33.44.55"; + private static final int TEST_YARN_JOB_MANAGER_PORT = 6655; + + private static final String propertiesFile = + "jobManager=" + TEST_YARN_JOB_MANAGER_ADDRESS + ":" + TEST_YARN_JOB_MANAGER_PORT; + + + private static final String TEST_JOB_MANAGER_ADDRESS = "192.168.1.33"; + private static final int TEST_JOB_MANAGER_PORT = 55443; + + private static final String flinkConf = + "jobmanager.rpc.address: " + TEST_JOB_MANAGER_ADDRESS + "\n" + + "jobmanager.rpc.port: " + TEST_JOB_MANAGER_PORT; + + + private static final String invalidPropertiesFile = + "jasfobManager=" + TEST_YARN_JOB_MANAGER_ADDRESS + ":asf" + TEST_YARN_JOB_MANAGER_PORT; + + + /** + * Test that the CliFrontend is able to pick up the .yarn-properties file from a specified location. + */ + @Test + public void testYarnConfig() { + try { + File tmpFolder = temporaryFolder.newFolder(); + String currentUser = System.getProperty("user.name"); + + // copy .yarn-properties-<username> + File testPropertiesFile = new File(tmpFolder, ".yarn-properties-"+currentUser); + Files.write(testPropertiesFile.toPath(), propertiesFile.getBytes(), StandardOpenOption.CREATE); + + // copy reference flink-conf.yaml to temporary test directory and append custom configuration path. + String confFile = flinkConf + "\nyarn.properties-file.location: " + tmpFolder; + File testConfFile = new File(tmpFolder.getAbsolutePath(), "flink-conf.yaml"); + Files.write(testConfFile.toPath(), confFile.getBytes(), StandardOpenOption.CREATE); + + // start CLI Frontend + TestCLI frontend = new TestCLI(tmpFolder.getAbsolutePath()); + + CommandLineOptions options = mock(CommandLineOptions.class); + + frontend.getClient(options, "Program name"); + + frontend.updateConfig(options); + Configuration config = frontend.getConfiguration(); + + checkJobManagerAddress( + config, + TEST_YARN_JOB_MANAGER_ADDRESS, + TEST_YARN_JOB_MANAGER_PORT); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + public static class TestCLI extends CliFrontend { + TestCLI(String configDir) throws Exception { + super(configDir); + } + + @Override + public ClusterClient getClient(CommandLineOptions options, String programName) throws Exception { + return super.getClient(options, programName); + } + + @Override + public void updateConfig(CommandLineOptions options) { + super.updateConfig(options); + } + } + + @Test + public void testInvalidYarnConfig() { + try { + File tmpFolder = temporaryFolder.newFolder(); + + // copy invalid .yarn-properties-<username> + File testPropertiesFile = new File(tmpFolder, ".yarn-properties"); + Files.write(testPropertiesFile.toPath(), invalidPropertiesFile.getBytes(), StandardOpenOption.CREATE); + + // copy reference flink-conf.yaml to temporary test directory and append custom configuration path. + String confFile = flinkConf + "\nyarn.properties-file.location: " + tmpFolder; + File testConfFile = new File(tmpFolder.getAbsolutePath(), "flink-conf.yaml"); + Files.write(testConfFile.toPath(), confFile.getBytes(), StandardOpenOption.CREATE); + + TestCLI cli = new TestCLI(tmpFolder.getAbsolutePath()); + + CommandLineOptions options = mock(CommandLineOptions.class); + + cli.updateConfig(options); + + Configuration config = cli.getConfiguration(); + + checkJobManagerAddress( + config, + TEST_JOB_MANAGER_ADDRESS, + TEST_JOB_MANAGER_PORT); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + + @Test + public void testManualOptionsOverridesYarn() { + try { + File emptyFolder = temporaryFolder.newFolder(); + TestCLI frontend = new TestCLI(emptyFolder.getAbsolutePath()); + + CommandLineOptions options = mock(CommandLineOptions.class); + when(options.getJobManagerAddress()).thenReturn("10.221.130.22:7788"); + + frontend.updateConfig(options); + + Configuration config = frontend.getConfiguration(); + + InetSocketAddress expectedAddress = new InetSocketAddress("10.221.130.22", 7788); + + checkJobManagerAddress(config, expectedAddress.getHostName(), expectedAddress.getPort()); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + + private static void checkJobManagerAddress(Configuration config, String expectedAddress, int expectedPort) { + String jobManagerAddress = config.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null); + int jobManagerPort = config.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, -1); + + assertEquals(expectedAddress, jobManagerAddress); + assertEquals(expectedPort, jobManagerPort); + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/f9b52a31/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java ---------------------------------------------------------------------- diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java index 7197b64..c842bdc 100644 --- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java +++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java @@ -24,8 +24,7 @@ import org.apache.commons.cli.Options; import org.apache.commons.cli.PosixParser; import org.apache.flink.client.CliFrontend; -import org.apache.flink.client.FlinkYarnSessionCli; -import org.apache.flink.runtime.yarn.AbstractFlinkYarnClient; +import org.apache.flink.yarn.cli.FlinkYarnSessionCli; import org.apache.flink.test.util.TestBaseUtils; import org.junit.Assert; @@ -38,6 +37,8 @@ import java.io.IOException; import java.util.HashMap; import java.util.Map; +import static org.apache.flink.yarn.cli.FlinkYarnSessionCli.getDynamicProperties; + public class FlinkYarnSessionCliTest { @Rule @@ -54,7 +55,7 @@ public class FlinkYarnSessionCliTest { TestBaseUtils.setEnv(map); Options options = new Options(); FlinkYarnSessionCli cli = new FlinkYarnSessionCli("", "", false); - cli.getYARNSessionCLIOptions(options); + cli.addOptions(options); CommandLineParser parser = new PosixParser(); CommandLine cmd = null; @@ -65,11 +66,12 @@ public class FlinkYarnSessionCliTest { Assert.fail("Parsing failed with " + e.getMessage()); } - AbstractFlinkYarnClient flinkYarnClient = cli.createFlinkYarnClient(cmd); + YarnClusterDescriptor flinkYarnDescriptor = cli.createDescriptor(null, cmd); - Assert.assertNotNull(flinkYarnClient); + Assert.assertNotNull(flinkYarnDescriptor); - Map<String, String> dynProperties = CliFrontend.getDynamicProperties(flinkYarnClient.getDynamicPropertiesEncoded()); + Map<String, String> dynProperties = + FlinkYarnSessionCli.getDynamicProperties(flinkYarnDescriptor.getDynamicPropertiesEncoded()); Assert.assertEquals(1, dynProperties.size()); Assert.assertEquals("5 min", dynProperties.get("akka.ask.timeout")); } http://git-wip-us.apache.org/repos/asf/flink/blob/f9b52a31/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingFlinkYarnClient.java ---------------------------------------------------------------------- diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingFlinkYarnClient.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingFlinkYarnClient.java deleted file mode 100644 index dbfbfe2..0000000 --- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingFlinkYarnClient.java +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.yarn; - -import org.apache.flink.util.Preconditions; - -import java.io.File; -import java.io.FilenameFilter; -import java.util.ArrayList; -import java.util.List; - -/** - * Yarn client which starts a {@link TestingApplicationMaster}. Additionally the client adds the - * flink-yarn-tests-XXX-tests.jar and the flink-runtime-XXX-tests.jar to the set of files which - * are shipped to the yarn cluster. This is necessary to load the testing classes. - */ -public class TestingFlinkYarnClient extends FlinkYarnClientBase { - - public TestingFlinkYarnClient() { - List<File> filesToShip = new ArrayList<>(); - - File testingJar = YarnTestBase.findFile("..", new TestJarFinder("flink-yarn-tests")); - Preconditions.checkNotNull(testingJar, "Could not find the flink-yarn-tests tests jar. " + - "Make sure to package the flink-yarn-tests module."); - - File testingRuntimeJar = YarnTestBase.findFile("..", new TestJarFinder("flink-runtime")); - Preconditions.checkNotNull(testingRuntimeJar, "Could not find the flink-runtime tests " + - "jar. Make sure to package the flink-runtime module."); - - filesToShip.add(testingJar); - filesToShip.add(testingRuntimeJar); - - setShipFiles(filesToShip); - } - - @Override - protected Class<?> getApplicationMasterClass() { - return TestingApplicationMaster.class; - } - - public static class TestJarFinder implements FilenameFilter { - - private final String jarName; - - public TestJarFinder(final String jarName) { - this.jarName = jarName; - } - - @Override - public boolean accept(File dir, String name) { - return name.startsWith(jarName) && name.endsWith("-tests.jar") && - dir.getAbsolutePath().contains(dir.separator + jarName + dir.separator); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/f9b52a31/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingYarnClusterDescriptor.java ---------------------------------------------------------------------- diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingYarnClusterDescriptor.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingYarnClusterDescriptor.java new file mode 100644 index 0000000..386f48f --- /dev/null +++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingYarnClusterDescriptor.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.yarn; + +import org.apache.flink.util.Preconditions; + +import java.io.File; +import java.io.FilenameFilter; +import java.util.ArrayList; +import java.util.List; + +/** + * Yarn client which starts a {@link TestingApplicationMaster}. Additionally the client adds the + * flink-yarn-tests-XXX-tests.jar and the flink-runtime-XXX-tests.jar to the set of files which + * are shipped to the yarn cluster. This is necessary to load the testing classes. + */ +public class TestingYarnClusterDescriptor extends AbstractYarnClusterDescriptor { + + public TestingYarnClusterDescriptor() { + List<File> filesToShip = new ArrayList<>(); + + File testingJar = YarnTestBase.findFile("..", new TestJarFinder("flink-yarn-tests")); + Preconditions.checkNotNull(testingJar, "Could not find the flink-yarn-tests tests jar. " + + "Make sure to package the flink-yarn-tests module."); + + File testingRuntimeJar = YarnTestBase.findFile("..", new TestJarFinder("flink-runtime")); + Preconditions.checkNotNull(testingRuntimeJar, "Could not find the flink-runtime tests " + + "jar. Make sure to package the flink-runtime module."); + + filesToShip.add(testingJar); + filesToShip.add(testingRuntimeJar); + + setShipFiles(filesToShip); + } + + @Override + protected Class<?> getApplicationMasterClass() { + return TestingApplicationMaster.class; + } + + public static class TestJarFinder implements FilenameFilter { + + private final String jarName; + + public TestJarFinder(final String jarName) { + this.jarName = jarName; + } + + @Override + public boolean accept(File dir, String name) { + return name.startsWith(jarName) && name.endsWith("-tests.jar") && + dir.getAbsolutePath().contains(dir.separator + jarName + dir.separator); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/f9b52a31/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java ---------------------------------------------------------------------- diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java index a93abf0..4d45f16 100644 --- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java +++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java @@ -22,6 +22,7 @@ import akka.actor.ActorSystem; import akka.actor.PoisonPill; import akka.testkit.JavaTestKit; import org.apache.curator.test.TestingServer; +import org.apache.flink.client.program.ClusterClient; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.GlobalConfiguration; @@ -33,7 +34,6 @@ import org.apache.flink.runtime.messages.Messages; import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory; import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages; import org.apache.flink.runtime.util.LeaderRetrievalUtils; -import org.apache.flink.runtime.yarn.AbstractFlinkYarnCluster; import org.apache.hadoop.fs.Path; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.junit.AfterClass; @@ -97,7 +97,7 @@ public class YARNHighAvailabilityITCase extends YarnTestBase { public void testMultipleAMKill() throws Exception { final int numberKillingAttempts = numberApplicationAttempts - 1; - TestingFlinkYarnClient flinkYarnClient = new TestingFlinkYarnClient(); + TestingYarnClusterDescriptor flinkYarnClient = new TestingYarnClusterDescriptor(); Assert.assertNotNull("unable to get yarn client", flinkYarnClient); flinkYarnClient.setTaskManagerCount(1); @@ -119,13 +119,12 @@ public class YARNHighAvailabilityITCase extends YarnTestBase { "@@" + ConfigConstants.ZOOKEEPER_RECOVERY_PATH + "=" + fsStateHandlePath + "/recovery"); flinkYarnClient.setConfigurationFilePath(new Path(confDirPath + File.separator + "flink-conf.yaml")); - AbstractFlinkYarnCluster yarnCluster = null; + ClusterClient yarnCluster = null; final FiniteDuration timeout = new FiniteDuration(2, TimeUnit.MINUTES); try { yarnCluster = flinkYarnClient.deploy(); - yarnCluster.connectToCluster(); final Configuration config = yarnCluster.getFlinkConfiguration(); new JavaTestKit(actorSystem) {{ @@ -169,7 +168,7 @@ public class YARNHighAvailabilityITCase extends YarnTestBase { }}; } finally { if (yarnCluster != null) { - yarnCluster.shutdown(false); + yarnCluster.shutdown(); } } } http://git-wip-us.apache.org/repos/asf/flink/blob/f9b52a31/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java ---------------------------------------------------------------------- diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java index 38e17a5..826a086 100644 --- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java +++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java @@ -328,7 +328,7 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase { @Test public void testNonexistingQueue() { LOG.info("Starting testNonexistingQueue()"); - addTestAppender(FlinkYarnClient.class, Level.WARN); + addTestAppender(YarnClusterDescriptor.class, Level.WARN); runWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(), "-t", flinkLibFolder.getAbsolutePath(), "-n", "1", @@ -432,7 +432,9 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase { "-yD", "yarn.heap-cutoff-ratio=0.5", // test if the cutoff is passed correctly "-ytm", "1024", "-ys", "2", // test requesting slots from YARN. - "--yarndetached", job, "--input", tmpInFile.getAbsoluteFile().toString(), "--output", tmpOutFolder.getAbsoluteFile().toString()}, + "--yarndetached", job, + "--input", tmpInFile.getAbsoluteFile().toString(), + "--output", tmpOutFolder.getAbsoluteFile().toString()}, "Job has been submitted with JobID", RunTypes.CLI_FRONTEND); http://git-wip-us.apache.org/repos/asf/flink/blob/f9b52a31/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java ---------------------------------------------------------------------- diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java index cb402a3..fe5400a 100644 --- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java +++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java @@ -18,11 +18,10 @@ package org.apache.flink.yarn; -import org.apache.flink.client.FlinkYarnSessionCli; +import org.apache.flink.client.program.ClusterClient; +import org.apache.flink.yarn.cli.FlinkYarnSessionCli; import org.apache.flink.configuration.GlobalConfiguration; import org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse; -import org.apache.flink.runtime.yarn.AbstractFlinkYarnClient; -import org.apache.flink.runtime.yarn.AbstractFlinkYarnCluster; import org.apache.hadoop.fs.Path; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -95,8 +94,6 @@ public class YARNSessionFIFOITCase extends YarnTestBase { checkForLogString("The Flink YARN client has been started in detached mode"); - Assert.assertFalse("The runner should detach.", runner.isAlive()); - LOG.info("Waiting until two containers are running"); // wait until two containers are running while(getRunningContainers() < 2) { @@ -171,7 +168,7 @@ public class YARNSessionFIFOITCase extends YarnTestBase { @Ignore("The test is too resource consuming (8.5 GB of memory)") @Test public void testResourceComputation() { - addTestAppender(FlinkYarnClient.class, Level.WARN); + addTestAppender(YarnClusterDescriptor.class, Level.WARN); LOG.info("Starting testResourceComputation()"); runWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(), "-t", flinkLibFolder.getAbsolutePath(), "-n", "5", @@ -199,7 +196,7 @@ public class YARNSessionFIFOITCase extends YarnTestBase { @Ignore("The test is too resource consuming (8 GB of memory)") @Test public void testfullAlloc() { - addTestAppender(FlinkYarnClient.class, Level.WARN); + addTestAppender(YarnClusterDescriptor.class, Level.WARN); LOG.info("Starting testfullAlloc()"); runWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(), "-t", flinkLibFolder.getAbsolutePath(), "-n", "2", @@ -218,7 +215,7 @@ public class YARNSessionFIFOITCase extends YarnTestBase { final int WAIT_TIME = 15; LOG.info("Starting testJavaAPI()"); - AbstractFlinkYarnClient flinkYarnClient = FlinkYarnSessionCli.getFlinkYarnClient(); + AbstractYarnClusterDescriptor flinkYarnClient = new YarnClusterDescriptor(); Assert.assertNotNull("unable to get yarn client", flinkYarnClient); flinkYarnClient.setTaskManagerCount(1); flinkYarnClient.setJobManagerMemory(768); @@ -231,10 +228,9 @@ public class YARNSessionFIFOITCase extends YarnTestBase { flinkYarnClient.setConfigurationFilePath(new Path(confDirPath + File.separator + "flink-conf.yaml")); // deploy - AbstractFlinkYarnCluster yarnCluster = null; + ClusterClient yarnCluster = null; try { yarnCluster = flinkYarnClient.deploy(); - yarnCluster.connectToCluster(); } catch (Exception e) { LOG.warn("Failing test", e); Assert.fail("Error while deploying YARN cluster: "+e.getMessage()); @@ -248,7 +244,7 @@ public class YARNSessionFIFOITCase extends YarnTestBase { } GetClusterStatusResponse status = yarnCluster.getClusterStatus(); if(status != null && status.equals(expectedStatus)) { - LOG.info("Cluster reached status " + status); + LOG.info("ClusterClient reached status " + status); break; // all good, cluster started } if(second > WAIT_TIME) { @@ -263,7 +259,7 @@ public class YARNSessionFIFOITCase extends YarnTestBase { LOG.info("Shutting down cluster. All tests passed"); // shutdown cluster - yarnCluster.shutdown(false); + yarnCluster.shutdown(); LOG.info("Finished testJavaAPI()"); } } http://git-wip-us.apache.org/repos/asf/flink/blob/f9b52a31/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java ---------------------------------------------------------------------- diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java index 03ab647..4de964a 100644 --- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java +++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java @@ -20,7 +20,7 @@ package org.apache.flink.yarn; import org.apache.commons.io.FileUtils; import org.apache.flink.client.CliFrontend; -import org.apache.flink.client.FlinkYarnSessionCli; +import org.apache.flink.yarn.cli.FlinkYarnSessionCli; import org.apache.flink.test.util.TestBaseUtils; import org.apache.flink.util.TestLogger; import org.apache.hadoop.conf.Configuration; @@ -368,7 +368,7 @@ public abstract class YarnTestBase extends TestLogger { File yarnConfFile = writeYarnSiteConfigXML(conf); map.put("YARN_CONF_DIR", yarnConfFile.getParentFile().getAbsolutePath()); - map.put("IN_TESTS", "yes we are in tests"); // see FlinkYarnClient() for more infos + map.put("IN_TESTS", "yes we are in tests"); // see YarnClusterDescriptor() for more infos TestBaseUtils.setEnv(map); Assert.assertTrue(yarnCluster.getServiceState() == Service.STATE.STARTED);
