[FLINK-5091] Formalize the Mesos AppMaster environment for docker compatibility
- introduced ContainerSpecification. - reworked how the TM container environment is constructed; eliminated - special-case environment variables, file layout. - added dynamic configuration support to GlobalConfiguration. - integrated the SecurityContext into AM/TM runners. - added config setting for Mesos framework user. - support DCOS side-channel authentication. - set the FS default scheme. - made the artifact server more generic (no assumption about existence - of dispatcher, Path-based). - moved some test code related to overriding the JVM's env. - moved the Mesos containerizer config code to the MesosTaskManagerParameters. This closes #2915. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/230bf17b Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/230bf17b Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/230bf17b Branch: refs/heads/master Commit: 230bf17bac3d76959a5cb6aa73ac685757c51cab Parents: 3b85f42 Author: wrighe3 <[email protected]> Authored: Thu Dec 1 00:21:28 2016 -0800 Committer: Maximilian Michels <[email protected]> Committed: Tue Dec 6 00:29:25 2016 +0100 ---------------------------------------------------------------------- .../flink/configuration/ConfigConstants.java | 15 + .../configuration/GlobalConfiguration.java | 27 +- flink-dist/src/main/assemblies/bin.xml | 6 + .../main/flink-bin/mesos-bin/mesos-appmaster.sh | 51 +++ .../flink-bin/mesos-bin/mesos-taskmanager.sh | 60 +++ .../main/java/org/apache/flink/mesos/Utils.java | 21 + .../clusterframework/LaunchableMesosWorker.java | 106 ++++- .../MesosApplicationMasterRunner.java | 456 ++++++------------- .../clusterframework/MesosConfigKeys.java | 25 +- .../MesosFlinkResourceManager.java | 45 +- .../MesosTaskManagerParameters.java | 106 ++++- .../MesosTaskManagerRunner.java | 73 ++- .../flink/mesos/util/MesosArtifactResolver.java | 31 ++ .../flink/mesos/util/MesosArtifactServer.java | 146 ++++-- 
.../MesosFlinkResourceManagerTest.java | 19 +- .../clusterframework/BootstrapTools.java | 36 ++ .../ContainerSpecification.java | 206 +++++++++ .../overlays/AbstractContainerOverlay.java | 72 +++ .../overlays/CompositeContainerOverlay.java | 49 ++ .../overlays/ContainerOverlay.java | 37 ++ .../overlays/FlinkDistributionOverlay.java | 126 +++++ .../overlays/HadoopConfOverlay.java | 147 ++++++ .../overlays/HadoopUserOverlay.java | 83 ++++ .../overlays/KeytabOverlay.java | 102 +++++ .../overlays/Krb5ConfOverlay.java | 111 +++++ .../overlays/SSLStoreOverlay.java | 124 +++++ .../flink/runtime/security/SecurityUtils.java | 4 +- .../overlays/ContainerOverlayTestBase.java | 73 +++ .../overlays/FlinkDistributionOverlayTest.java | 117 +++++ .../overlays/HadoopConfOverlayTest.java | 119 +++++ .../overlays/HadoopUserOverlayTest.java | 73 +++ .../overlays/KeytabOverlayTest.java | 71 +++ .../overlays/Krb5ConfOverlayTest.java | 59 +++ .../overlays/SSLStoreOverlayTest.java | 78 ++++ .../flink/core/testutils/CommonTestUtils.java | 39 ++ .../apache/flink/test/util/TestBaseUtils.java | 38 +- 36 files changed, 2450 insertions(+), 501 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/230bf17b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java index 6bc5e2e..a515c33 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java @@ -440,6 +440,11 @@ public final class ConfigConstants { // ------------------------ Mesos Configuration ------------------------ /** + * The initial number of Mesos tasks to allocate. 
+ */ + public static final String MESOS_INITIAL_TASKS = "mesos.initial-tasks"; + + /** * The maximum number of failed Mesos tasks before entirely stopping * the Mesos session / job on Mesos. * @@ -484,6 +489,8 @@ public final class ConfigConstants { public static final String MESOS_RESOURCEMANAGER_FRAMEWORK_SECRET = "mesos.resourcemanager.framework.secret"; + public static final String MESOS_RESOURCEMANAGER_FRAMEWORK_USER = "mesos.resourcemanager.framework.user"; + /** * The cpus to acquire from Mesos. * @@ -1186,6 +1193,8 @@ public final class ConfigConstants { public static final String DEFAULT_MESOS_RESOURCEMANAGER_FRAMEWORK_ROLE = "*"; + public static final String DEFAULT_MESOS_RESOURCEMANAGER_FRAMEWORK_USER = ""; + /** Default value to override SSL support for the Artifact Server */ public static final boolean DEFAULT_MESOS_ARTIFACT_SERVER_SSL_ENABLED = true; @@ -1405,6 +1414,12 @@ public final class ConfigConstants { /** The environment variable name which contains the location of the lib folder */ public static final String ENV_FLINK_LIB_DIR = "FLINK_LIB_DIR"; + /** The environment variable name which contains the location of the bin directory */ + public static final String ENV_FLINK_BIN_DIR = "FLINK_BIN_DIR"; + + /** The environment variable name which contains the Flink installation root directory */ + public static final String ENV_FLINK_HOME_DIR = "FLINK_HOME"; + // -------------------------------- Security ------------------------------- /** http://git-wip-us.apache.org/repos/asf/flink/blob/230bf17b/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java b/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java index ecfbc72..dca6307 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java +++ 
b/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java @@ -39,12 +39,31 @@ public final class GlobalConfiguration { public static final String FLINK_CONF_FILENAME = "flink-conf.yaml"; + // -------------------------------------------------------------------------------------------- private GlobalConfiguration() {} // -------------------------------------------------------------------------------------------- + private static Configuration dynamicProperties = null; + + /** + * Set the process-wide dynamic properties to be merged with the loaded configuration. + */ + public static void setDynamicProperties(Configuration dynamicProperties) { + GlobalConfiguration.dynamicProperties = new Configuration(dynamicProperties); + } + + /** + * Get the dynamic properties. + */ + public static Configuration getDynamicProperties() { + return GlobalConfiguration.dynamicProperties; + } + + // -------------------------------------------------------------------------------------------- + /** * Loads the global configuration from the environment. Fails if an error occurs during loading. Returns an * empty configuration object if the environment variable is not set. In production this variable is set but @@ -90,7 +109,13 @@ public final class GlobalConfiguration { "' (" + confDirFile.getAbsolutePath() + ") does not exist."); } - return loadYAMLResource(yamlConfigFile); + Configuration conf = loadYAMLResource(yamlConfigFile); + + if(dynamicProperties != null) { + conf.addAll(dynamicProperties); + } + + return conf; } /** http://git-wip-us.apache.org/repos/asf/flink/blob/230bf17b/flink-dist/src/main/assemblies/bin.xml ---------------------------------------------------------------------- diff --git a/flink-dist/src/main/assemblies/bin.xml b/flink-dist/src/main/assemblies/bin.xml index b4291d3..901cac9 100644 --- a/flink-dist/src/main/assemblies/bin.xml +++ b/flink-dist/src/main/assemblies/bin.xml @@ -82,6 +82,12 @@ under the License. 
<outputDirectory>bin</outputDirectory> <fileMode>0755</fileMode> </fileSet> + <!-- copy Mesos start scripts --> + <fileSet> + <directory>src/main/flink-bin/mesos-bin</directory> + <outputDirectory>bin</outputDirectory> + <fileMode>0755</fileMode> + </fileSet> <!-- copy default configuration --> <fileSet> http://git-wip-us.apache.org/repos/asf/flink/blob/230bf17b/flink-dist/src/main/flink-bin/mesos-bin/mesos-appmaster.sh ---------------------------------------------------------------------- diff --git a/flink-dist/src/main/flink-bin/mesos-bin/mesos-appmaster.sh b/flink-dist/src/main/flink-bin/mesos-bin/mesos-appmaster.sh new file mode 100755 index 0000000..d65c6b0 --- /dev/null +++ b/flink-dist/src/main/flink-bin/mesos-bin/mesos-appmaster.sh @@ -0,0 +1,51 @@ +#!/usr/bin/env bash +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +bin=`dirname "$0"` +bin=`cd "$bin"; pwd` + +# get Flink config +. 
"$bin"/config.sh + +# auxilliary function to construct a lightweight classpath for the +# Flink AppMaster +constructAppMasterClassPath() { + + while read -d '' -r jarfile ; do + if [[ $CC_CLASSPATH = "" ]]; then + CC_CLASSPATH="$jarfile"; + else + CC_CLASSPATH="$CC_CLASSPATH":"$jarfile" + fi + done < <(find "$FLINK_LIB_DIR" ! -type d -name '*.jar' -print0) + + echo $CC_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS +} + +CC_CLASSPATH=`manglePathList $(constructAppMasterClassPath)` + +log=flink-appmaster.log +log_setting="-Dlog.file="$log" -Dlog4j.configuration=file:"$FLINK_CONF_DIR"/log4j.properties -Dlogback.configurationFile=file:"$FLINK_CONF_DIR"/logback.xml" + +export FLINK_CONF_DIR +export FLINK_BIN_DIR +export FLINK_LIB_DIR + +$JAVA_RUN $JVM_ARGS -classpath "$CC_CLASSPATH" $log_setting org.apache.flink.mesos.runtime.clusterframework.MesosApplicationMasterRunner "$@" + http://git-wip-us.apache.org/repos/asf/flink/blob/230bf17b/flink-dist/src/main/flink-bin/mesos-bin/mesos-taskmanager.sh ---------------------------------------------------------------------- diff --git a/flink-dist/src/main/flink-bin/mesos-bin/mesos-taskmanager.sh b/flink-dist/src/main/flink-bin/mesos-bin/mesos-taskmanager.sh new file mode 100755 index 0000000..ff03abd --- /dev/null +++ b/flink-dist/src/main/flink-bin/mesos-bin/mesos-taskmanager.sh @@ -0,0 +1,60 @@ +#!/usr/bin/env bash +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +bin=`dirname "$0"` +bin=`cd "$bin"; pwd` + +# get Flink config +. "$bin"/config.sh + +# auxilliary function to construct a lightweight classpath for the +# Flink TaskManager +constructTaskManagerClassPath() { + + while read -d '' -r jarfile ; do + if [[ $CC_CLASSPATH = "" ]]; then + CC_CLASSPATH="$jarfile"; + else + CC_CLASSPATH="$CC_CLASSPATH":"$jarfile" + fi + done < <(find "$FLINK_LIB_DIR" ! -type d -name '*.jar' -print0) + + echo $CC_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS +} + +CC_CLASSPATH=`manglePathList $(constructTaskManagerClassPath)` + +log=flink-taskmanager.log +log_setting="-Dlog.file="$log" -Dlog4j.configuration=file:"$FLINK_CONF_DIR"/log4j.properties -Dlogback.configurationFile=file:"$FLINK_CONF_DIR"/logback.xml" + +# Add precomputed memory JVM options +if [ -z "${FLINK_ENV_JAVA_OPTS_MEM}" ]; then + FLINK_ENV_JAVA_OPTS_MEM="" +fi +export FLINK_ENV_JAVA_OPTS="${FLINK_ENV_JAVA_OPTS} ${FLINK_ENV_JAVA_OPTS_MEM}" + +# Add TaskManager-specific JVM options +export FLINK_ENV_JAVA_OPTS="${FLINK_ENV_JAVA_OPTS} ${FLINK_ENV_JAVA_OPTS_TM}" + +export FLINK_CONF_DIR +export FLINK_BIN_DIR +export FLINK_LIB_DIR + +$JAVA_RUN $JVM_ARGS ${FLINK_ENV_JAVA_OPTS} -classpath "$CC_CLASSPATH" $log_setting org.apache.flink.mesos.runtime.clusterframework.MesosTaskManager "$@" + http://git-wip-us.apache.org/repos/asf/flink/blob/230bf17b/flink-mesos/src/main/java/org/apache/flink/mesos/Utils.java ---------------------------------------------------------------------- diff --git 
a/flink-mesos/src/main/java/org/apache/flink/mesos/Utils.java b/flink-mesos/src/main/java/org/apache/flink/mesos/Utils.java index 173ae33..7787e40 100644 --- a/flink-mesos/src/main/java/org/apache/flink/mesos/Utils.java +++ b/flink-mesos/src/main/java/org/apache/flink/mesos/Utils.java @@ -18,7 +18,10 @@ package org.apache.flink.mesos; +import org.apache.flink.mesos.util.MesosArtifactResolver; +import org.apache.flink.runtime.clusterframework.ContainerSpecification; import org.apache.mesos.Protos; +import scala.Option; import java.net.URL; import java.util.Arrays; @@ -46,6 +49,24 @@ public class Utils { } /** + * Construct a Mesos URI. + */ + public static Protos.CommandInfo.URI uri(MesosArtifactResolver resolver, ContainerSpecification.Artifact artifact) { + Option<URL> url = resolver.resolve(artifact.dest); + if(url.isEmpty()) { + throw new IllegalArgumentException("Unresolvable artifact: " + artifact.dest); + } + + return Protos.CommandInfo.URI.newBuilder() + .setValue(url.get().toExternalForm()) + .setOutputFile(artifact.dest.toString()) + .setExtract(artifact.extract) + .setCache(artifact.cachable) + .setExecutable(artifact.executable) + .build(); + } + + /** * Construct a scalar resource value. 
*/ public static Protos.Resource scalar(String name, double value) { http://git-wip-us.apache.org/repos/asf/flink/blob/230bf17b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java ---------------------------------------------------------------------- diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java index 5f940b5..c6e51f1 100644 --- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java +++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java @@ -23,8 +23,11 @@ import com.netflix.fenzo.TaskAssignmentResult; import com.netflix.fenzo.TaskRequest; import com.netflix.fenzo.VMTaskFitnessCalculator; import org.apache.flink.configuration.Configuration; -import org.apache.flink.mesos.cli.FlinkMesosSessionCli; +import org.apache.flink.mesos.Utils; import org.apache.flink.mesos.scheduler.LaunchableTask; +import org.apache.flink.mesos.util.MesosArtifactResolver; +import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters; +import org.apache.flink.runtime.clusterframework.ContainerSpecification; import org.apache.mesos.Protos; import java.util.Collections; @@ -38,7 +41,10 @@ import static org.apache.flink.mesos.Utils.ranges; import static org.apache.flink.mesos.Utils.scalar; /** - * Specifies how to launch a Mesos worker. + * Implements the launch of a Mesos worker. + * + * Translates the abstract {@link ContainerSpecification} into a concrete + * Mesos-specific {@link org.apache.mesos.Protos.TaskInfo}. 
*/ public class LaunchableMesosWorker implements LaunchableTask { @@ -49,20 +55,24 @@ public class LaunchableMesosWorker implements LaunchableTask { "taskmanager.rpc.port", "taskmanager.data.port" }; + private final MesosArtifactResolver resolver; + private final ContainerSpecification containerSpec; private final MesosTaskManagerParameters params; - private final Protos.TaskInfo.Builder template; private final Protos.TaskID taskID; private final Request taskRequest; /** * Construct a launchable Mesos worker. * @param params the TM parameters such as memory, cpu to acquire. - * @param template a template for the TaskInfo to be constructed at launch time. + * @param containerSpec an abstract container specification for launch time. * @param taskID the taskID for this worker. */ - public LaunchableMesosWorker(MesosTaskManagerParameters params, Protos.TaskInfo.Builder template, Protos.TaskID taskID) { + public LaunchableMesosWorker( + MesosArtifactResolver resolver, MesosTaskManagerParameters params, + ContainerSpecification containerSpec, Protos.TaskID taskID) { + this.resolver = resolver; this.params = params; - this.template = template; + this.containerSpec = containerSpec; this.taskID = taskID; this.taskRequest = new Request(); } @@ -157,17 +167,25 @@ public class LaunchableMesosWorker implements LaunchableTask { @Override public Protos.TaskInfo launch(Protos.SlaveID slaveId, TaskAssignmentResult assignment) { + ContaineredTaskManagerParameters tmParams = params.containeredParameters(); + final Configuration dynamicProperties = new Configuration(); - // specialize the TaskInfo template with assigned resources, environment variables, etc - final Protos.TaskInfo.Builder taskInfo = template - .clone() + // incorporate the dynamic properties set by the template + dynamicProperties.addAll(containerSpec.getDynamicConfiguration()); + + // build a TaskInfo with assigned resources, environment variables, etc + final Protos.TaskInfo.Builder taskInfo = 
Protos.TaskInfo.newBuilder() .setSlaveId(slaveId) .setTaskId(taskID) .setName(taskID.getValue()) .addResources(scalar("cpus", assignment.getRequest().getCPUs())) .addResources(scalar("mem", assignment.getRequest().getMemory())); + final Protos.CommandInfo.Builder cmd = taskInfo.getCommandBuilder(); + final Protos.Environment.Builder env = cmd.getEnvironmentBuilder(); + final StringBuilder jvmArgs = new StringBuilder(); + // use the assigned ports for the TM if (assignment.getAssignedPorts().size() < TM_PORT_KEYS.length) { throw new IllegalArgumentException("unsufficient # of ports assigned"); @@ -179,17 +197,69 @@ public class LaunchableMesosWorker implements LaunchableTask { dynamicProperties.setInteger(key, port); } - // finalize environment variables - final Protos.Environment.Builder environmentBuilder = taskInfo.getCommandBuilder().getEnvironmentBuilder(); + // ship additional files + for(ContainerSpecification.Artifact artifact : containerSpec.getArtifacts()) { + cmd.addUris(Utils.uri(resolver, artifact)); + } - // propagate the Mesos task ID to the TM - environmentBuilder - .addVariables(variable(MesosConfigKeys.ENV_FLINK_CONTAINER_ID, taskInfo.getTaskId().getValue())); + // propagate environment variables + for (Map.Entry<String, String> entry : params.containeredParameters().taskManagerEnv().entrySet()) { + env.addVariables(variable(entry.getKey(), entry.getValue())); + } + for (Map.Entry<String, String> entry : containerSpec.getEnvironmentVariables().entrySet()) { + env.addVariables(variable(entry.getKey(), entry.getValue())); + } - // propagate the dynamic configuration properties to the TM - String dynamicPropertiesEncoded = FlinkMesosSessionCli.encodeDynamicProperties(dynamicProperties); - environmentBuilder - .addVariables(variable(MesosConfigKeys.ENV_DYNAMIC_PROPERTIES, dynamicPropertiesEncoded)); + // propagate the Mesos task ID to the TM + env.addVariables(variable(MesosConfigKeys.ENV_FLINK_CONTAINER_ID, taskInfo.getTaskId().getValue())); + + // 
finalize the memory parameters + jvmArgs.append(" -Xms").append(tmParams.taskManagerHeapSizeMB()).append("m"); + jvmArgs.append(" -Xmx").append(tmParams.taskManagerHeapSizeMB()).append("m"); + jvmArgs.append(" -XX:MaxDirectMemorySize=").append(tmParams.taskManagerDirectMemoryLimitMB()).append("m"); + + // pass dynamic system properties + jvmArgs.append(' ').append( + ContainerSpecification.formatSystemProperties(containerSpec.getSystemProperties())); + + // finalize JVM args + env.addVariables(variable(MesosConfigKeys.ENV_JVM_ARGS, jvmArgs.toString())); + + // build the launch command w/ dynamic application properties + StringBuilder launchCommand = new StringBuilder("$FLINK_HOME/bin/mesos-taskmanager.sh "); + launchCommand.append(ContainerSpecification.formatSystemProperties(dynamicProperties)); + cmd.setValue(launchCommand.toString()); + + // build the container info + Protos.ContainerInfo.Builder containerInfo = null; + switch(params.containerType()) { + case MESOS: + if(params.containerImageName().isDefined()) { + containerInfo = Protos.ContainerInfo.newBuilder() + .setType(Protos.ContainerInfo.Type.MESOS) + .setMesos(Protos.ContainerInfo.MesosInfo.newBuilder() + .setImage(Protos.Image.newBuilder() + .setType(Protos.Image.Type.DOCKER) + .setDocker(Protos.Image.Docker.newBuilder() + .setName(params.containerImageName().get())))); + } + break; + + case DOCKER: + assert(params.containerImageName().isDefined()); + containerInfo = Protos.ContainerInfo.newBuilder() + .setType(Protos.ContainerInfo.Type.DOCKER) + .setDocker(Protos.ContainerInfo.DockerInfo.newBuilder() + .setNetwork(Protos.ContainerInfo.DockerInfo.Network.HOST) + .setImage(params.containerImageName().get())); + break; + + default: + throw new IllegalStateException("unsupported container type"); + } + if(containerInfo != null) { + taskInfo.setContainer(containerInfo); + } return taskInfo.build(); } 
http://git-wip-us.apache.org/repos/asf/flink/blob/230bf17b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java ---------------------------------------------------------------------- diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java index ef58250..4b9bd82 100644 --- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java +++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java @@ -22,14 +22,17 @@ import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.actor.Address; import akka.actor.Props; - +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.CommandLineParser; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.PosixParser; import org.apache.curator.framework.CuratorFramework; import org.apache.flink.api.java.hadoop.mapred.utils.HadoopUtils; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.GlobalConfiguration; import org.apache.flink.configuration.IllegalConfigurationException; -import org.apache.flink.mesos.cli.FlinkMesosSessionCli; +import org.apache.flink.core.fs.FileSystem; import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore; import org.apache.flink.mesos.runtime.clusterframework.store.StandaloneMesosWorkerStore; import org.apache.flink.mesos.runtime.clusterframework.store.ZooKeeperMesosWorkerStore; @@ -38,14 +41,20 @@ import org.apache.flink.mesos.util.MesosConfiguration; import org.apache.flink.mesos.util.ZooKeeperUtils; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.clusterframework.BootstrapTools; -import 
org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters; +import org.apache.flink.runtime.clusterframework.ContainerSpecification; +import org.apache.flink.runtime.clusterframework.overlays.CompositeContainerOverlay; +import org.apache.flink.runtime.clusterframework.overlays.FlinkDistributionOverlay; +import org.apache.flink.runtime.clusterframework.overlays.HadoopConfOverlay; +import org.apache.flink.runtime.clusterframework.overlays.HadoopUserOverlay; +import org.apache.flink.runtime.clusterframework.overlays.KeytabOverlay; +import org.apache.flink.runtime.clusterframework.overlays.Krb5ConfOverlay; +import org.apache.flink.runtime.clusterframework.overlays.SSLStoreOverlay; import org.apache.flink.runtime.jobmanager.HighAvailabilityMode; import org.apache.flink.runtime.jobmanager.JobManager; import org.apache.flink.runtime.jobmanager.MemoryArchivist; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; import org.apache.flink.runtime.process.ProcessReaper; import org.apache.flink.runtime.security.SecurityUtils; -import org.apache.flink.runtime.taskmanager.TaskManager; import org.apache.flink.runtime.util.EnvironmentInformation; import org.apache.flink.runtime.util.Hardware; import org.apache.flink.runtime.util.JvmShutdownSafeguard; @@ -53,21 +62,15 @@ import org.apache.flink.runtime.util.LeaderRetrievalUtils; import org.apache.flink.runtime.util.NamedThreadFactory; import org.apache.flink.runtime.util.SignalHandler; import org.apache.flink.runtime.webmonitor.WebMonitor; - -import org.apache.hadoop.security.UserGroupInformation; - import org.apache.mesos.Protos; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - -import scala.Option; import scala.concurrent.duration.Duration; import scala.concurrent.duration.FiniteDuration; -import java.io.File; +import java.io.IOException; import java.net.InetAddress; import java.net.URL; -import java.net.UnknownHostException; import java.util.Map; import java.util.UUID; import 
java.util.concurrent.Callable; @@ -75,9 +78,6 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; -import static org.apache.flink.mesos.Utils.uri; -import static org.apache.flink.mesos.Utils.variable; - import static org.apache.flink.util.Preconditions.checkState; /** @@ -106,6 +106,18 @@ public class MesosApplicationMasterRunner { private static final int ACTOR_DIED_EXIT_CODE = 32; // ------------------------------------------------------------------------ + // Command-line options + // ------------------------------------------------------------------------ + + private static final Options ALL_OPTIONS; + + static { + ALL_OPTIONS = + new Options() + .addOption(BootstrapTools.newDynamicPropertiesOption()); + } + + // ------------------------------------------------------------------------ // Program entry point // ------------------------------------------------------------------------ @@ -126,36 +138,44 @@ public class MesosApplicationMasterRunner { /** * The instance entry point for the Mesos AppMaster. Obtains user group - * information and calls the main work method {@link #runPrivileged(Configuration)} as a + * information and calls the main work method {@link #runPrivileged(Configuration,Configuration)} as a * privileged action. * * @param args The command line arguments. * @return The process exit code. 
*/ - protected int run(String[] args) { + protected int run(final String[] args) { try { LOG.debug("All environment variables: {}", ENV); - final String workingDir = ENV.get(MesosConfigKeys.ENV_MESOS_SANDBOX); - checkState(workingDir != null, "Sandbox directory variable (%s) not set", MesosConfigKeys.ENV_MESOS_SANDBOX); + // loading all config values here has the advantage that the program fails fast, if any + // configuration problem occurs - // Flink configuration - final Configuration dynamicProperties = - FlinkMesosSessionCli.decodeDynamicProperties(ENV.get(MesosConfigKeys.ENV_DYNAMIC_PROPERTIES)); - LOG.debug("Mesos dynamic properties: {}", dynamicProperties); + CommandLineParser parser = new PosixParser(); + CommandLine cmd = parser.parse(ALL_OPTIONS, args); - final Configuration configuration = createConfiguration(workingDir, dynamicProperties); + final Configuration dynamicProperties = BootstrapTools.parseDynamicProperties(cmd); + GlobalConfiguration.setDynamicProperties(dynamicProperties); + final Configuration config = GlobalConfiguration.loadConfiguration(); - SecurityUtils.SecurityConfiguration sc = new SecurityUtils.SecurityConfiguration(configuration); + // configure the default filesystem + try { + FileSystem.setDefaultScheme(config); + } catch (IOException e) { + throw new IOException("Error while setting the default " + + "filesystem scheme from configuration.", e); + } + + // configure security + SecurityUtils.SecurityConfiguration sc = new SecurityUtils.SecurityConfiguration(config); sc.setHadoopConfiguration(HadoopUtils.getHadoopConfiguration()); SecurityUtils.install(sc); - LOG.info("Running Flink as user {}", UserGroupInformation.getCurrentUser().getShortUserName()); - + // run the actual work in the installed security context return SecurityUtils.getInstalledContext().runSecured(new Callable<Integer>() { @Override - public Integer call() { - return runPrivileged(configuration); + public Integer call() throws Exception { + return 
runPrivileged(config, dynamicProperties); } }); } @@ -175,78 +195,38 @@ public class MesosApplicationMasterRunner { * * @return The return code for the Java process. */ - protected int runPrivileged(Configuration config) { + protected int runPrivileged(Configuration config, Configuration dynamicProperties) { ActorSystem actorSystem = null; WebMonitor webMonitor = null; MesosArtifactServer artifactServer = null; - - // ------- (1) load and parse / validate all configurations ------- - - // loading all config values here has the advantage that the program fails fast, if any - // configuration problem occurs - - final String workingDir = ENV.get(MesosConfigKeys.ENV_MESOS_SANDBOX); - - final String sessionID = ENV.get(MesosConfigKeys.ENV_SESSION_ID); - checkState(sessionID != null, "Session ID (%s) not set", MesosConfigKeys.ENV_SESSION_ID); - - // Note that we use the "appMasterHostname" given by the system, to make sure - // we use the hostnames consistently throughout akka. - // for akka "localhost" and "localhost.localdomain" are different actors. - final String appMasterHostname; + ExecutorService futureExecutor = null; + ExecutorService ioExecutor = null; try { - appMasterHostname = InetAddress.getLocalHost().getHostName(); - } catch (UnknownHostException uhe) { - LOG.error("Could not retrieve the local hostname.", uhe); + // ------- (1) load and parse / validate all configurations ------- - return INIT_ERROR_EXIT_CODE; - } + // Note that we use the "appMasterHostname" given by the system, to make sure + // we use the hostnames consistently throughout akka. + // for akka "localhost" and "localhost.localdomain" are different actors. 
+ final String appMasterHostname = InetAddress.getLocalHost().getHostName(); - // Mesos configuration - final MesosConfiguration mesosConfig = createMesosConfig(config, appMasterHostname); + // Mesos configuration + final MesosConfiguration mesosConfig = createMesosConfig(config, appMasterHostname); - int numberProcessors = Hardware.getNumberCPUCores(); + // JM configuration + int numberProcessors = Hardware.getNumberCPUCores(); - final ExecutorService futureExecutor = Executors.newFixedThreadPool( - numberProcessors, - new NamedThreadFactory("mesos-jobmanager-future-", "-thread-")); + futureExecutor = Executors.newFixedThreadPool( + numberProcessors, + new NamedThreadFactory("mesos-jobmanager-future-", "-thread-")); - final ExecutorService ioExecutor = Executors.newFixedThreadPool( - numberProcessors, - new NamedThreadFactory("mesos-jobmanager-io-", "-thread-")); + ioExecutor = Executors.newFixedThreadPool( + numberProcessors, + new NamedThreadFactory("mesos-jobmanager-io-", "-thread-")); - try { - // environment values related to TM - final int taskManagerContainerMemory; - final int numInitialTaskManagers; - final int slotsPerTaskManager; - - try { - taskManagerContainerMemory = Integer.parseInt(ENV.get(MesosConfigKeys.ENV_TM_MEMORY)); - } catch (NumberFormatException e) { - throw new RuntimeException("Invalid value for " + MesosConfigKeys.ENV_TM_MEMORY + " : " - + e.getMessage()); - } - try { - numInitialTaskManagers = Integer.parseInt(ENV.get(MesosConfigKeys.ENV_TM_COUNT)); - } catch (NumberFormatException e) { - throw new RuntimeException("Invalid value for " + MesosConfigKeys.ENV_TM_COUNT + " : " - + e.getMessage()); - } - try { - slotsPerTaskManager = Integer.parseInt(ENV.get(MesosConfigKeys.ENV_SLOTS)); - } catch (NumberFormatException e) { - throw new RuntimeException("Invalid value for " + MesosConfigKeys.ENV_SLOTS + " : " - + e.getMessage()); - } - - final ContaineredTaskManagerParameters containeredParameters = - 
ContaineredTaskManagerParameters.create(config, taskManagerContainerMemory, slotsPerTaskManager); - - final MesosTaskManagerParameters taskManagerParameters = - MesosTaskManagerParameters.create(config, containeredParameters); + // TM configuration + final MesosTaskManagerParameters taskManagerParameters = MesosTaskManagerParameters.create(config); LOG.info("TaskManagers will be created with {} task slots", taskManagerParameters.containeredParameters().numSlots()); @@ -257,7 +237,7 @@ public class MesosApplicationMasterRunner { taskManagerParameters.containeredParameters().taskManagerDirectMemoryLimitMB(), taskManagerParameters.cpus()); - // JM endpoint, which should be explicitly configured by the dispatcher (based on acquired net resources) + // JM endpoint, which should be explicitly configured based on acquired net resources final int listeningPort = config.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT); checkState(listeningPort >= 0 && listeningPort <= 65536, "Config parameter \"" + @@ -279,18 +259,28 @@ public class MesosApplicationMasterRunner { LOG.debug("Starting Artifact Server"); final int artifactServerPort = config.getInteger(ConfigConstants.MESOS_ARTIFACT_SERVER_PORT_KEY, ConfigConstants.DEFAULT_MESOS_ARTIFACT_SERVER_PORT); - artifactServer = new MesosArtifactServer(sessionID, akkaHostname, artifactServerPort, config); + final String artifactServerPrefix = UUID.randomUUID().toString(); + artifactServer = new MesosArtifactServer(artifactServerPrefix, akkaHostname, artifactServerPort, config); // ----------------- (3) Generate the configuration for the TaskManagers ------------------- + // generate a container spec which conveys the artifacts/vars needed to launch a TM + ContainerSpecification taskManagerContainerSpec = new ContainerSpecification(); + + // propagate the AM dynamic configuration to the TM + taskManagerContainerSpec.getDynamicConfiguration().addAll(dynamicProperties); + + // propagate 
newly-generated configuration elements final Configuration taskManagerConfig = BootstrapTools.generateTaskManagerConfiguration( - config, akkaHostname, akkaPort, slotsPerTaskManager, TASKMANAGER_REGISTRATION_TIMEOUT); - LOG.debug("TaskManager configuration: {}", taskManagerConfig); + new Configuration(), akkaHostname, akkaPort, taskManagerParameters.containeredParameters().numSlots(), + TASKMANAGER_REGISTRATION_TIMEOUT); + taskManagerContainerSpec.getDynamicConfiguration().addAll(taskManagerConfig); + + // apply the overlays + applyOverlays(config, taskManagerContainerSpec); - final Protos.TaskInfo.Builder taskManagerContext = createTaskManagerContext( - config, ENV, - taskManagerParameters, taskManagerConfig, - workingDir, getTaskManagerClass(), artifactServer, LOG); + // configure the artifact server to serve the specified artifacts + configureArtifactServer(artifactServer, taskManagerContainerSpec); // ----------------- (4) start the actors ------------------- @@ -341,8 +331,8 @@ public class MesosApplicationMasterRunner { workerStore, leaderRetriever, taskManagerParameters, - taskManagerContext, - numInitialTaskManagers, + taskManagerContainerSpec, + artifactServer, LOG); ActorRef resourceMaster = actorSystem.actorOf(resourceMasterProps, "Mesos_Resource_Master"); @@ -389,8 +379,21 @@ public class MesosApplicationMasterRunner { } } - futureExecutor.shutdownNow(); - ioExecutor.shutdownNow(); + if(futureExecutor != null) { + try { + futureExecutor.shutdownNow(); + } catch (Throwable tt) { + LOG.error("Error shutting down future executor", tt); + } + } + + if(ioExecutor != null) { + try { + ioExecutor.shutdownNow(); + } catch (Throwable tt) { + LOG.error("Error shutting down io executor", tt); + } + } return INIT_ERROR_EXIT_CODE; } @@ -442,35 +445,12 @@ public class MesosApplicationMasterRunner { return MemoryArchivist.class; } - protected Class<? 
extends TaskManager> getTaskManagerClass() { - return MesosTaskManager.class; - } - - /** - * - * @param baseDirectory - * @param additional - * - * @return The configuration to be used by the TaskManagers. - */ - private static Configuration createConfiguration(String baseDirectory, Configuration additional) { - LOG.info("Loading config from directory {}", baseDirectory); - - Configuration configuration = GlobalConfiguration.loadConfiguration(); - - // add dynamic properties to JobManager configuration. - configuration.addAll(additional); - - return configuration; - } - /** * Loads and validates the ResourceManager Mesos configuration from the given Flink configuration. */ public static MesosConfiguration createMesosConfig(Configuration flinkConfig, String hostname) { Protos.FrameworkInfo.Builder frameworkInfo = Protos.FrameworkInfo.newBuilder() - .setUser("") .setHostname(hostname); Protos.Credential.Builder credential = null; @@ -494,6 +474,10 @@ public class MesosApplicationMasterRunner { ConfigConstants.MESOS_RESOURCEMANAGER_FRAMEWORK_ROLE, ConfigConstants.DEFAULT_MESOS_RESOURCEMANAGER_FRAMEWORK_ROLE)); + frameworkInfo.setUser(flinkConfig.getString( + ConfigConstants.MESOS_RESOURCEMANAGER_FRAMEWORK_USER, + ConfigConstants.DEFAULT_MESOS_RESOURCEMANAGER_FRAMEWORK_USER)); + if(flinkConfig.containsKey(ConfigConstants.MESOS_RESOURCEMANAGER_FRAMEWORK_PRINCIPAL)) { frameworkInfo.setPrincipal(flinkConfig.getString( ConfigConstants.MESOS_RESOURCEMANAGER_FRAMEWORK_PRINCIPAL, null)); @@ -501,15 +485,16 @@ public class MesosApplicationMasterRunner { credential = Protos.Credential.newBuilder(); credential.setPrincipal(frameworkInfo.getPrincipal()); - if(!flinkConfig.containsKey(ConfigConstants.MESOS_RESOURCEMANAGER_FRAMEWORK_SECRET)) { - throw new IllegalConfigurationException(ConfigConstants.MESOS_RESOURCEMANAGER_FRAMEWORK_SECRET + " must be configured."); + // some environments use a side-channel to communicate the secret to Mesos, + // and thus don't set the 'secret' 
configuration setting + if(flinkConfig.containsKey(ConfigConstants.MESOS_RESOURCEMANAGER_FRAMEWORK_SECRET)) { + credential.setSecret(flinkConfig.getString( + ConfigConstants.MESOS_RESOURCEMANAGER_FRAMEWORK_SECRET, null)); } - credential.setSecret(flinkConfig.getString( - ConfigConstants.MESOS_RESOURCEMANAGER_FRAMEWORK_SECRET, null)); } MesosConfiguration mesos = - new MesosConfiguration(masterUrl, frameworkInfo, Option.apply(credential)); + new MesosConfiguration(masterUrl, frameworkInfo, scala.Option.apply(credential)); return mesos; } @@ -533,203 +518,34 @@ public class MesosApplicationMasterRunner { } /** - * Creates a Mesos task info template, which describes how to bring up a TaskManager process as - * a Mesos task. + * Generate a container specification as a TaskManager template. * * <p>This code is extremely Mesos-specific and registers all the artifacts that the TaskManager - * needs (such as JAR file, config file, ...) and all environment variables in a task info record. + * needs (such as JAR file, config file, ...) and all environment variables into a container specification. * The Mesos fetcher then ensures that those artifacts will be copied into the task's sandbox directory. * A lightweight HTTP server serves the artifacts to the fetcher. - * - * <p>We do this work before we start the ResourceManager actor in order to fail early if - * any of the operations here fail. - * - * @param flinkConfig - * The Flink configuration object. - * @param env - * The environment variables. - * @param tmParams - * The TaskManager container memory parameters. - * @param taskManagerConfig - * The configuration for the TaskManagers. - * @param workingDirectory - * The current application master container's working directory. - * @param taskManagerMainClass - * The class with the main method. - * @param artifactServer - * The artifact server. - * @param log - * The logger. - * - * @return The task info template for the TaskManager processes. 
- * - * @throws Exception Thrown if the task info could not be created, for example if - * the resources could not be copied. - */ - public static Protos.TaskInfo.Builder createTaskManagerContext( - Configuration flinkConfig, - Map<String, String> env, - MesosTaskManagerParameters tmParams, - Configuration taskManagerConfig, - String workingDirectory, - Class<?> taskManagerMainClass, - MesosArtifactServer artifactServer, - Logger log) throws Exception { - - - Protos.TaskInfo.Builder info = Protos.TaskInfo.newBuilder(); - Protos.CommandInfo.Builder cmd = Protos.CommandInfo.newBuilder(); - - log.info("Setting up artifacts for TaskManagers"); - - String shipListString = env.get(MesosConfigKeys.ENV_CLIENT_SHIP_FILES); - checkState(shipListString != null, "Environment variable %s not set", MesosConfigKeys.ENV_CLIENT_SHIP_FILES); - - String classPathString = env.get(MesosConfigKeys.ENV_FLINK_CLASSPATH); - checkState(classPathString != null, "Environment variable %s not set", MesosConfigKeys.ENV_FLINK_CLASSPATH); - - // register the Flink jar - final File flinkJarFile = new File(workingDirectory, "flink.jar"); - cmd.addUris(uri(artifactServer.addFile(flinkJarFile, "flink.jar"), true)); - - String hadoopConfDir = env.get("HADOOP_CONF_DIR"); - LOG.debug("ENV: hadoopConfDir = {}", hadoopConfDir); - - //upload Hadoop configurations to artifact server - boolean hadoopConf = false; - if(hadoopConfDir != null && hadoopConfDir.length() != 0) { - File source = new File(hadoopConfDir); - if(source.exists() && source.isDirectory()) { - hadoopConf = true; - File[] fileList = source.listFiles(); - for(File file: fileList) { - if(file.getName().equals("core-site.xml") || file.getName().equals("hdfs-site.xml")) { - LOG.debug("Adding local file: [{}] to artifact server", file); - File f = new File(hadoopConfDir, file.getName()); - cmd.addUris(uri(artifactServer.addFile(f, file.getName()), true)); - } - } - } - } - - //upload keytab to the artifact server - String keytabFileName = null; - 
String keytab = flinkConfig.getString(ConfigConstants.SECURITY_KEYTAB_KEY, null); - if(keytab != null) { - File source = new File(keytab); - if(source.exists()) { - LOG.debug("Adding keytab file: [{}] to artifact server", source); - keytabFileName = source.getName(); - cmd.addUris(uri(artifactServer.addFile(source, source.getName()), true)); - } - } - - String principal = flinkConfig.getString(ConfigConstants.SECURITY_PRINCIPAL_KEY, null); - if(keytabFileName != null && principal != null) { - //reset the configurations since we will use in-memory reference from within the TM instance - taskManagerConfig.setString(ConfigConstants.SECURITY_KEYTAB_KEY,""); - taskManagerConfig.setString(ConfigConstants.SECURITY_PRINCIPAL_KEY,""); - } - - // register the TaskManager configuration - final File taskManagerConfigFile = - new File(workingDirectory, UUID.randomUUID() + "-taskmanager-conf.yaml"); - LOG.debug("Writing TaskManager configuration to {}", taskManagerConfigFile.getAbsolutePath()); - BootstrapTools.writeConfiguration(taskManagerConfig, taskManagerConfigFile); - cmd.addUris(uri(artifactServer.addFile(taskManagerConfigFile, GlobalConfiguration.FLINK_CONF_FILENAME), true)); - - // prepare additional files to be shipped - for (String pathStr : shipListString.split(",")) { - if (!pathStr.isEmpty()) { - File shipFile = new File(workingDirectory, pathStr); - cmd.addUris(uri(artifactServer.addFile(shipFile, shipFile.getName()), true)); - } - } - - log.info("Creating task info for TaskManagers"); - - // build the launch command - boolean hasLogback = new File(workingDirectory, "logback.xml").exists(); - boolean hasLog4j = new File(workingDirectory, "log4j.properties").exists(); - boolean hasKrb5 = false; - - String launchCommand = BootstrapTools.getTaskManagerShellCommand( - flinkConfig, tmParams.containeredParameters(), ".", ".", - hasLogback, hasLog4j, hasKrb5, taskManagerMainClass); - cmd.setValue(launchCommand); - - // build the environment variables - 
Protos.Environment.Builder envBuilder = Protos.Environment.newBuilder(); - for (Map.Entry<String, String> entry : tmParams.containeredParameters().taskManagerEnv().entrySet()) { - envBuilder.addVariables(variable(entry.getKey(), entry.getValue())); - } - envBuilder.addVariables(variable(MesosConfigKeys.ENV_CLASSPATH, classPathString)); - - //add hadoop config directory to the environment - if(hadoopConf) { - envBuilder.addVariables(variable(MesosConfigKeys.ENV_HADOOP_CONF_DIR, ".")); - } - - //add keytab and principal to environment - if(keytabFileName != null && principal != null) { - envBuilder.addVariables(variable(MesosConfigKeys.ENV_KEYTAB, keytabFileName)); - envBuilder.addVariables(variable(MesosConfigKeys.ENV_KEYTAB_PRINCIPAL, principal)); - } - - envBuilder.addVariables(variable(MesosConfigKeys.ENV_HADOOP_USER_NAME, - UserGroupInformation.getCurrentUser().getUserName())); - - cmd.setEnvironment(envBuilder); - - info.setCommand(cmd); - - // Set container for task manager if specified in configs. 
- String tmImageName = flinkConfig.getString( - ConfigConstants.MESOS_RESOURCEMANAGER_TASKS_CONTAINER_IMAGE_NAME, ""); - - if (tmImageName.length() > 0) { - String taskManagerContainerType = flinkConfig.getString( - ConfigConstants.MESOS_RESOURCEMANAGER_TASKS_CONTAINER_TYPE, - ConfigConstants.DEFAULT_MESOS_RESOURCEMANAGER_TASKS_CONTAINER_IMAGE_TYPE); - - Protos.ContainerInfo.Builder containerInfo; - - switch (taskManagerContainerType) { - case ConfigConstants.MESOS_RESOURCEMANAGER_TASKS_CONTAINER_TYPE_MESOS: - containerInfo = Protos.ContainerInfo.newBuilder() - .setType(Protos.ContainerInfo.Type.MESOS) - .setMesos(Protos.ContainerInfo.MesosInfo.newBuilder() - .setImage(Protos.Image.newBuilder() - .setType(Protos.Image.Type.DOCKER) - .setDocker(Protos.Image.Docker.newBuilder() - .setName(tmImageName)))); - break; - case ConfigConstants.MESOS_RESOURCEMANAGER_TASKS_CONTAINER_TYPE_DOCKER: - containerInfo = Protos.ContainerInfo.newBuilder() - .setType(Protos.ContainerInfo.Type.DOCKER) - .setDocker(Protos.ContainerInfo.DockerInfo.newBuilder() - .setNetwork(Protos.ContainerInfo.DockerInfo.Network.HOST) - .setImage(tmImageName)); - break; - default: - LOG.warn( - "Invalid container type '{}' provided for setting {}. Valid values are '{}' or '{}'. 
" + - "Starting task managers now without container.", - taskManagerContainerType, - ConfigConstants.MESOS_RESOURCEMANAGER_TASKS_CONTAINER_TYPE, - ConfigConstants.MESOS_RESOURCEMANAGER_TASKS_CONTAINER_TYPE_MESOS, - ConfigConstants.MESOS_RESOURCEMANAGER_TASKS_CONTAINER_TYPE_DOCKER); - - containerInfo = null; - - break; - } + */ + private static void applyOverlays( + Configuration globalConfiguration, ContainerSpecification containerSpec) throws IOException { + + // create the overlays that will produce the specification + CompositeContainerOverlay overlay = new CompositeContainerOverlay( + FlinkDistributionOverlay.newBuilder().fromEnvironment(globalConfiguration).build(), + HadoopConfOverlay.newBuilder().fromEnvironment(globalConfiguration).build(), + HadoopUserOverlay.newBuilder().fromEnvironment(globalConfiguration).build(), + KeytabOverlay.newBuilder().fromEnvironment(globalConfiguration).build(), + Krb5ConfOverlay.newBuilder().fromEnvironment(globalConfiguration).build(), + SSLStoreOverlay.newBuilder().fromEnvironment(globalConfiguration).build() + ); + + // apply the overlays + overlay.configure(containerSpec); + } - if (containerInfo != null) { - info.setContainer(containerInfo); - } + private static void configureArtifactServer(MesosArtifactServer server, ContainerSpecification container) throws IOException { + // serve the artifacts associated with the container environment + for(ContainerSpecification.Artifact artifact : container.getArtifacts()) { + server.addPath(artifact.source, artifact.dest); } - - return info; } } http://git-wip-us.apache.org/repos/asf/flink/blob/230bf17b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosConfigKeys.java ---------------------------------------------------------------------- diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosConfigKeys.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosConfigKeys.java index 
bc6dde4..ebd9af5 100644 --- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosConfigKeys.java +++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosConfigKeys.java @@ -26,21 +26,20 @@ public class MesosConfigKeys { // Environment variable names // ------------------------------------------------------------------------ - public static final String ENV_TM_MEMORY = "_CLIENT_TM_MEMORY"; - public static final String ENV_TM_COUNT = "_CLIENT_TM_COUNT"; - public static final String ENV_SLOTS = "_SLOTS"; - public static final String ENV_CLIENT_SHIP_FILES = "_CLIENT_SHIP_FILES"; - public static final String ENV_HADOOP_USER_NAME = "HADOOP_USER_NAME"; - public static final String ENV_DYNAMIC_PROPERTIES = "_DYNAMIC_PROPERTIES"; + /** + * The Mesos task ID, used by the TM for informational purposes + */ public static final String ENV_FLINK_CONTAINER_ID = "_FLINK_CONTAINER_ID"; + + /** + * Reserved for future enhancement + */ public static final String ENV_FLINK_TMP_DIR = "_FLINK_TMP_DIR"; - public static final String ENV_FLINK_CLASSPATH = "_FLINK_CLASSPATH"; - public static final String ENV_CLASSPATH = "CLASSPATH"; - public static final String ENV_MESOS_SANDBOX = "MESOS_SANDBOX"; - public static final String ENV_SESSION_ID = "_CLIENT_SESSION_ID"; - public static final String ENV_HADOOP_CONF_DIR = "HADOOP_CONF_DIR"; - public static final String ENV_KEYTAB = "_KEYTAB_FILE"; - public static final String ENV_KEYTAB_PRINCIPAL = "_KEYTAB_PRINCIPAL"; + + /** + * JVM arguments, used by the JM and TM + */ + public static final String ENV_JVM_ARGS = "JVM_ARGS"; /** Private constructor to prevent instantiation */ private MesosConfigKeys() {} http://git-wip-us.apache.org/repos/asf/flink/blob/230bf17b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java ---------------------------------------------------------------------- diff --git 
a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java index 6b24ee8..a7321a3 100644 --- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java +++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java @@ -27,6 +27,7 @@ import com.netflix.fenzo.functions.Action1; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.IllegalConfigurationException; import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore; import org.apache.flink.mesos.scheduler.ConnectionMonitor; import org.apache.flink.mesos.scheduler.LaunchableTask; @@ -44,9 +45,11 @@ import org.apache.flink.mesos.scheduler.messages.ReRegistered; import org.apache.flink.mesos.scheduler.messages.Registered; import org.apache.flink.mesos.scheduler.messages.ResourceOffers; import org.apache.flink.mesos.scheduler.messages.StatusUpdate; +import org.apache.flink.mesos.util.MesosArtifactResolver; import org.apache.flink.mesos.util.MesosConfiguration; import org.apache.flink.runtime.clusterframework.ApplicationStatus; import org.apache.flink.runtime.clusterframework.FlinkResourceManager; +import org.apache.flink.runtime.clusterframework.ContainerSpecification; import org.apache.flink.runtime.clusterframework.messages.FatalErrorOccurred; import org.apache.flink.runtime.clusterframework.messages.StopCluster; import org.apache.flink.runtime.clusterframework.types.ResourceID; @@ -76,8 +79,11 @@ public class MesosFlinkResourceManager extends FlinkResourceManager<RegisteredMe /** The TaskManager container parameters (like container memory size) */ private final MesosTaskManagerParameters taskManagerParameters; - 
/** Context information used to start a TaskManager Java process */ - private final Protos.TaskInfo.Builder taskManagerLaunchContext; + /** Container specification for launching a TM */ + private final ContainerSpecification taskManagerContainerSpec; + + /** Resolver for HTTP artifacts **/ + private final MesosArtifactResolver artifactResolver; /** Number of failed Mesos tasks before stopping the application. -1 means infinite. */ private final int maxFailedTasks; @@ -112,7 +118,8 @@ public class MesosFlinkResourceManager extends FlinkResourceManager<RegisteredMe MesosWorkerStore workerStore, LeaderRetrievalService leaderRetrievalService, MesosTaskManagerParameters taskManagerParameters, - Protos.TaskInfo.Builder taskManagerLaunchContext, + ContainerSpecification taskManagerContainerSpec, + MesosArtifactResolver artifactResolver, int maxFailedTasks, int numInitialTaskManagers) { @@ -121,9 +128,10 @@ public class MesosFlinkResourceManager extends FlinkResourceManager<RegisteredMe this.mesosConfig = requireNonNull(mesosConfig); this.workerStore = requireNonNull(workerStore); + this.artifactResolver = requireNonNull(artifactResolver); this.taskManagerParameters = requireNonNull(taskManagerParameters); - this.taskManagerLaunchContext = requireNonNull(taskManagerLaunchContext); + this.taskManagerContainerSpec = requireNonNull(taskManagerContainerSpec); this.maxFailedTasks = maxFailedTasks; this.workersInNew = new HashMap<>(); @@ -661,7 +669,7 @@ public class MesosFlinkResourceManager extends FlinkResourceManager<RegisteredMe private LaunchableMesosWorker createLaunchableMesosWorker(Protos.TaskID taskID) { LaunchableMesosWorker launchable = - new LaunchableMesosWorker(taskManagerParameters, taskManagerLaunchContext, taskID); + new LaunchableMesosWorker(artifactResolver, taskManagerParameters, taskManagerContainerSpec, taskID); return launchable; } @@ -723,10 +731,10 @@ public class MesosFlinkResourceManager extends FlinkResourceManager<RegisteredMe * The Flink 
configuration object. * @param taskManagerParameters * The parameters for launching TaskManager containers. - * @param taskManagerLaunchContext - * The parameters for launching the TaskManager processes in the TaskManager containers. - * @param numInitialTaskManagers - * The initial number of TaskManagers to allocate. + * @param taskManagerContainerSpec + * The container specification. + * @param artifactResolver + * The artifact resolver to locate artifacts * @param log * The logger to log to. * @@ -738,10 +746,22 @@ public class MesosFlinkResourceManager extends FlinkResourceManager<RegisteredMe MesosWorkerStore workerStore, LeaderRetrievalService leaderRetrievalService, MesosTaskManagerParameters taskManagerParameters, - Protos.TaskInfo.Builder taskManagerLaunchContext, - int numInitialTaskManagers, + ContainerSpecification taskManagerContainerSpec, + MesosArtifactResolver artifactResolver, Logger log) { + + final int numInitialTaskManagers = flinkConfig.getInteger( + ConfigConstants.MESOS_INITIAL_TASKS, 0); + if (numInitialTaskManagers >= 0) { + log.info("Mesos framework to allocate {} initial tasks", + numInitialTaskManagers); + } + else { + throw new IllegalConfigurationException("Invalid value for " + + ConfigConstants.MESOS_INITIAL_TASKS + ", which must be at least zero."); + } + final int maxFailedTasks = flinkConfig.getInteger( ConfigConstants.MESOS_MAX_FAILED_TASKS, numInitialTaskManagers); if (maxFailedTasks >= 0) { @@ -755,7 +775,8 @@ public class MesosFlinkResourceManager extends FlinkResourceManager<RegisteredMe workerStore, leaderRetrievalService, taskManagerParameters, - taskManagerLaunchContext, + taskManagerContainerSpec, + artifactResolver, maxFailedTasks, numInitialTaskManagers); } http://git-wip-us.apache.org/repos/asf/flink/blob/230bf17b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java ---------------------------------------------------------------------- diff --git 
a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java index 1b19d08..7fae58c 100644 --- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java +++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java @@ -19,10 +19,14 @@ package org.apache.flink.mesos.runtime.clusterframework; import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.IllegalConfigurationException; import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters; +import scala.Option; import static java.util.Objects.requireNonNull; +import static org.apache.flink.configuration.ConfigOptions.key; /** * This class describes the Mesos-specific parameters for launching a TaskManager process. 
@@ -32,13 +36,43 @@ import static java.util.Objects.requireNonNull; */ public class MesosTaskManagerParameters { - private double cpus; + public static final ConfigOption<Integer> MESOS_RM_TASKS_SLOTS = + key(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS) + .defaultValue(1); - private ContaineredTaskManagerParameters containeredParameters; + public static final ConfigOption<Integer> MESOS_RM_TASKS_MEMORY_MB = + key("mesos.resourcemanager.tasks.mem") + .defaultValue(1024); - public MesosTaskManagerParameters(double cpus, ContaineredTaskManagerParameters containeredParameters) { + public static final ConfigOption<Double> MESOS_RM_TASKS_CPUS = + key("mesos.resourcemanager.tasks.cpus") + .defaultValue(0.0); + + public static final ConfigOption<String> MESOS_RM_CONTAINER_TYPE = + key("mesos.resourcemanager.tasks.container.type") + .defaultValue("mesos"); + + public static final ConfigOption<String> MESOS_RM_CONTAINER_IMAGE_NAME = + key("mesos.resourcemanager.tasks.container.image.name") + .noDefaultValue(); + + private final double cpus; + + private final ContainerType containerType; + + private final Option<String> containerImageName; + + private final ContaineredTaskManagerParameters containeredParameters; + + public MesosTaskManagerParameters( + double cpus, + ContainerType containerType, + Option<String> containerImageName, + ContaineredTaskManagerParameters containeredParameters) { requireNonNull(containeredParameters); this.cpus = cpus; + this.containerType = containerType; + this.containerImageName = containerImageName; this.containeredParameters = containeredParameters; } @@ -50,6 +84,22 @@ public class MesosTaskManagerParameters { } /** + * Get the container type (Mesos or Docker). The default is Mesos. + * + * Mesos provides a facility for a framework to specify which containerizer to use. + */ + public ContainerType containerType() { + return containerType; + } + + /** + * Get the container image name. 
+ */ + public Option<String> containerImageName() { + return containerImageName; + } + + /** * Get the common containered parameters. */ public ContaineredTaskManagerParameters containeredParameters() { @@ -60,6 +110,8 @@ public class MesosTaskManagerParameters { public String toString() { return "MesosTaskManagerParameters{" + "cpus=" + cpus + + ", containerType=" + containerType + + ", containerImageName=" + containerImageName + ", containeredParameters=" + containeredParameters + '}'; } @@ -67,15 +119,49 @@ public class MesosTaskManagerParameters { /** * Create the Mesos TaskManager parameters. * @param flinkConfig the TM configuration. - * @param containeredParameters additional containered parameters. */ - public static MesosTaskManagerParameters create( - Configuration flinkConfig, - ContaineredTaskManagerParameters containeredParameters) { + public static MesosTaskManagerParameters create(Configuration flinkConfig) { - double cpus = flinkConfig.getDouble(ConfigConstants.MESOS_RESOURCEMANAGER_TASKS_CPUS, - Math.max(containeredParameters.numSlots(), 1.0)); + // parse the common parameters + ContaineredTaskManagerParameters containeredParameters = ContaineredTaskManagerParameters.create( + flinkConfig, + flinkConfig.getInteger(MESOS_RM_TASKS_MEMORY_MB), + flinkConfig.getInteger(MESOS_RM_TASKS_SLOTS)); + + double cpus = flinkConfig.getDouble(MESOS_RM_TASKS_CPUS); + if(cpus <= 0.0) { + cpus = Math.max(containeredParameters.numSlots(), 1.0); + } + + // parse the containerization parameters + String imageName = flinkConfig.getString(MESOS_RM_CONTAINER_IMAGE_NAME); + + ContainerType containerType; + String containerTypeString = flinkConfig.getString(MESOS_RM_CONTAINER_TYPE); + switch(containerTypeString) { + case ConfigConstants.MESOS_RESOURCEMANAGER_TASKS_CONTAINER_TYPE_MESOS: + containerType = ContainerType.MESOS; + break; + case ConfigConstants.MESOS_RESOURCEMANAGER_TASKS_CONTAINER_TYPE_DOCKER: + containerType = ContainerType.DOCKER; + if(imageName == null || 
imageName.length() == 0) { + throw new IllegalConfigurationException(MESOS_RM_CONTAINER_IMAGE_NAME.key() + + " must be specified for docker container type"); + } + break; + default: + throw new IllegalConfigurationException("invalid container type: " + containerTypeString); + } + + return new MesosTaskManagerParameters( + cpus, + containerType, + Option.apply(imageName), + containeredParameters); + } - return new MesosTaskManagerParameters(cpus, containeredParameters); + public enum ContainerType { + MESOS, + DOCKER } } http://git-wip-us.apache.org/repos/asf/flink/blob/230bf17b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java ---------------------------------------------------------------------- diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java index 5100deb..75b5043 100644 --- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java +++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java @@ -18,15 +18,20 @@ package org.apache.flink.mesos.runtime.clusterframework; -import java.io.File; import java.io.IOException; import java.util.Map; import java.util.concurrent.Callable; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.CommandLineParser; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.PosixParser; import org.apache.flink.api.java.hadoop.mapred.utils.HadoopUtils; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; -import org.apache.flink.mesos.cli.FlinkMesosSessionCli; +import org.apache.flink.configuration.GlobalConfiguration; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.runtime.clusterframework.BootstrapTools; import 
org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.security.SecurityUtils; import org.apache.flink.runtime.taskmanager.TaskManager; @@ -35,7 +40,6 @@ import org.apache.flink.runtime.util.EnvironmentInformation; import org.apache.flink.runtime.util.JvmShutdownSafeguard; import org.apache.flink.runtime.util.SignalHandler; import org.apache.flink.util.Preconditions; -import org.apache.hadoop.security.UserGroupInformation; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -47,24 +51,33 @@ public class MesosTaskManagerRunner { private static final Logger LOG = LoggerFactory.getLogger(MesosTaskManagerRunner.class); + private static final Options ALL_OPTIONS; + + static { + ALL_OPTIONS = + new Options() + .addOption(BootstrapTools.newDynamicPropertiesOption()); + } + /** The process environment variables */ private static final Map<String, String> ENV = System.getenv(); - public static void runTaskManager(String[] args, final Class<? extends TaskManager> taskManager) throws IOException { + public static void runTaskManager(String[] args, final Class<? extends TaskManager> taskManager) throws Exception { EnvironmentInformation.logEnvironmentInfo(LOG, taskManager.getSimpleName(), args); SignalHandler.register(LOG); JvmShutdownSafeguard.installAsShutdownHook(LOG); // try to parse the command line arguments + CommandLineParser parser = new PosixParser(); + CommandLine cmd = parser.parse(ALL_OPTIONS, args); + final Configuration configuration; try { - configuration = TaskManager.parseArgsAndLoadConfig(args); - - // add dynamic properties to TaskManager configuration. 
- final Configuration dynamicProperties = - FlinkMesosSessionCli.decodeDynamicProperties(ENV.get(MesosConfigKeys.ENV_DYNAMIC_PROPERTIES)); + final Configuration dynamicProperties = BootstrapTools.parseDynamicProperties(cmd); + GlobalConfiguration.setDynamicProperties(dynamicProperties); LOG.debug("Mesos dynamic properties: {}", dynamicProperties); - configuration.addAll(dynamicProperties); + + configuration = GlobalConfiguration.loadConfiguration(); } catch (Throwable t) { LOG.error("Failed to load the TaskManager configuration and dynamic properties.", t); @@ -74,7 +87,6 @@ public class MesosTaskManagerRunner { // read the environment variables final Map<String, String> envs = System.getenv(); - final String effectiveUsername = envs.get(MesosConfigKeys.ENV_HADOOP_USER_NAME); final String tmpDirs = envs.get(MesosConfigKeys.ENV_FLINK_TMP_DIR); // configure local directory @@ -88,20 +100,12 @@ public class MesosTaskManagerRunner { configuration.setString(ConfigConstants.TASK_MANAGER_TMP_DIR_KEY, tmpDirs); } - final String keytab = envs.get(MesosConfigKeys.ENV_KEYTAB); - LOG.info("Keytab file:{}", keytab); - - final String principal = envs.get(MesosConfigKeys.ENV_KEYTAB_PRINCIPAL); - LOG.info("Keytab principal:{}", principal); - - if(keytab != null && keytab.length() != 0) { - File f = new File(".", keytab); - if(!f.exists()) { - LOG.error("Could not locate keytab file:[" + keytab + "]"); - System.exit(TaskManager.STARTUP_FAILURE_RETURN_CODE()); - } - configuration.setString(ConfigConstants.SECURITY_KEYTAB_KEY, keytab); - configuration.setString(ConfigConstants.SECURITY_PRINCIPAL_KEY, principal); + // configure the default filesystem + try { + FileSystem.setDefaultScheme(configuration); + } catch (IOException e) { + throw new IOException("Error while setting the default " + + "filesystem scheme from configuration.", e); } // tell akka to die in case of an error @@ -112,23 +116,17 @@ public class MesosTaskManagerRunner { final ResourceID resourceId = new 
ResourceID(containerID); LOG.info("ResourceID assigned for this container: {}", resourceId); - String hadoopConfDir = envs.get(MesosConfigKeys.ENV_HADOOP_CONF_DIR); - LOG.info("hadoopConfDir: {}", hadoopConfDir); - + // Run the TM in the security context SecurityUtils.SecurityConfiguration sc = new SecurityUtils.SecurityConfiguration(configuration); - if(hadoopConfDir != null && hadoopConfDir.length() != 0) { - sc.setHadoopConfiguration(HadoopUtils.getHadoopConfiguration()); - } + sc.setHadoopConfiguration(HadoopUtils.getHadoopConfiguration()); + SecurityUtils.install(sc); try { - SecurityUtils.install(sc); - LOG.info("Mesos task runs as '{}', setting user to execute Flink TaskManager to '{}'", - UserGroupInformation.getCurrentUser().getShortUserName(), effectiveUsername); - SecurityUtils.getInstalledContext().runSecured(new Callable<Object>() { + SecurityUtils.getInstalledContext().runSecured(new Callable<Integer>() { @Override - public Object call() throws Exception { + public Integer call() throws Exception { TaskManager.selectNetworkInterfaceAndRunTaskManager(configuration, resourceId, taskManager); - return null; + return 0; } }); } @@ -136,6 +134,5 @@ public class MesosTaskManagerRunner { LOG.error("Error while starting the TaskManager", t); System.exit(TaskManager.STARTUP_FAILURE_RETURN_CODE()); } - } } http://git-wip-us.apache.org/repos/asf/flink/blob/230bf17b/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactResolver.java ---------------------------------------------------------------------- diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactResolver.java b/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactResolver.java new file mode 100644 index 0000000..a6a26dc --- /dev/null +++ b/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactResolver.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.mesos.util; + +import org.apache.flink.core.fs.Path; +import scala.Option; + +import java.net.URL; + +/** + * An interface for resolving artifact URIs. + */ +public interface MesosArtifactResolver { + Option<URL> resolve(Path remoteFile); +} http://git-wip-us.apache.org/repos/asf/flink/blob/230bf17b/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactServer.java ---------------------------------------------------------------------- diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactServer.java b/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactServer.java index fbf61ac..37cb260 100644 --- a/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactServer.java +++ b/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactServer.java @@ -26,7 +26,6 @@ import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInitializer; -import io.netty.channel.DefaultFileRegion; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; @@ -43,24 +42,32 @@ import 
io.netty.handler.codec.http.LastHttpContent; import io.netty.handler.codec.http.router.Handler; import io.netty.handler.codec.http.router.Routed; import io.netty.handler.codec.http.router.Router; +import io.netty.handler.stream.ChunkedStream; + +import io.netty.handler.stream.ChunkedWriteHandler; import io.netty.handler.ssl.SslHandler; import io.netty.util.CharsetUtil; +import org.apache.flink.core.fs.FSDataInputStream; +import org.apache.flink.core.fs.FileStatus; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.net.SSLUtils; import org.jets3t.service.utils.Mimetypes; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import scala.Option; import javax.net.ssl.SSLContext; import javax.net.ssl.SSLEngine; import java.io.File; -import java.io.FileNotFoundException; import java.io.IOException; -import java.io.RandomAccessFile; import java.net.InetSocketAddress; import java.net.MalformedURLException; import java.net.URL; +import java.util.HashMap; +import java.util.Map; import static io.netty.handler.codec.http.HttpHeaders.Names.CACHE_CONTROL; import static io.netty.handler.codec.http.HttpHeaders.Names.CONNECTION; @@ -82,7 +89,7 @@ import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1; * http://mesos.apache.org/documentation/latest/fetcher/ * http://mesos.apache.org/documentation/latest/fetcher-cache-internals/ */ -public class MesosArtifactServer { +public class MesosArtifactServer implements MesosArtifactResolver { private static final Logger LOG = LoggerFactory.getLogger(MesosArtifactServer.class); @@ -92,17 +99,19 @@ public class MesosArtifactServer { private Channel serverChannel; - private URL baseURL; + private final URL baseURL; + + private final Map<Path,URL> paths = new HashMap<>(); private final SSLContext serverSSLContext; - public MesosArtifactServer(String sessionID, 
String serverHostname, int configuredPort, Configuration config) - throws Exception { + public MesosArtifactServer(String prefix, String serverHostname, int configuredPort, Configuration config) + throws Exception { if (configuredPort < 0 || configuredPort > 0xFFFF) { throw new IllegalArgumentException("File server port is invalid: " + configuredPort); } - // Config to enable https access to the web-ui + // Config to enable https access to the artifact server boolean enableSSL = config.getBoolean( ConfigConstants.MESOS_ARTIFACT_SERVER_SSL_ENABLED, ConfigConstants.DEFAULT_MESOS_ARTIFACT_SERVER_SSL_ENABLED) && @@ -136,6 +145,7 @@ public class MesosArtifactServer { ch.pipeline() .addLast(new HttpServerCodec()) + .addLast(new ChunkedWriteHandler()) .addLast(handler.name(), handler) .addLast(new UnknownFileHandler()); } @@ -159,11 +169,15 @@ public class MesosArtifactServer { String httpProtocol = (serverSSLContext != null) ? "https": "http"; - baseURL = new URL(httpProtocol, serverHostname, port, "/" + sessionID + "/"); + baseURL = new URL(httpProtocol, serverHostname, port, "/" + prefix + "/"); LOG.info("Mesos Artifact Server Base URL: {}, listening at {}:{}", baseURL, address, port); } + public URL baseURL() { + return baseURL; + } + /** * Get the server port on which the artifact server is listening. */ @@ -185,13 +199,51 @@ public class MesosArtifactServer { * @param remoteFile the remote path with which to locate the file. * @return the fully-qualified remote path to the file. * @throws MalformedURLException if the remote path is invalid. + */ + public synchronized URL addFile(File localFile, String remoteFile) throws IOException, MalformedURLException { + return addPath(new Path(localFile.toURI()), new Path(remoteFile)); + } + + /** + * Adds a path to the artifact server. + * @param path the qualified FS path to serve (local, hdfs, etc). + * @param remoteFile the remote path with which to locate the file. + * @return the fully-qualified remote path to the file. 
+ * @throws MalformedURLException if the remote path is invalid. */ - public synchronized URL addFile(File localFile, String remoteFile) throws MalformedURLException { - URL fileURL = new URL(baseURL, remoteFile); - router.ANY(fileURL.getPath(), new VirtualFileServerHandler(localFile)); + public synchronized URL addPath(Path path, Path remoteFile) throws IOException, MalformedURLException { + if(paths.containsKey(remoteFile)) { + throw new IllegalArgumentException("duplicate path registered"); + } + if(remoteFile.isAbsolute()) { + throw new IllegalArgumentException("not expecting an absolute path"); + } + URL fileURL = new URL(baseURL, remoteFile.toString()); + router.ANY(fileURL.getPath(), new VirtualFileServerHandler(path)); + + paths.put(remoteFile, fileURL); + return fileURL; } + public synchronized void removePath(Path remoteFile) { + if(paths.containsKey(remoteFile)) { + URL fileURL = null; + try { + fileURL = new URL(baseURL, remoteFile.toString()); + } catch (MalformedURLException e) { + throw new RuntimeException(e); + } + router.removePath(fileURL.getPath()); + } + } + + @Override + public synchronized Option<URL> resolve(Path remoteFile) { + Option<URL> resolved = Option.apply(paths.get(remoteFile)); + return resolved; + } + /** * Stops the artifact server. 
* @throws Exception @@ -215,12 +267,17 @@ public class MesosArtifactServer { @ChannelHandler.Sharable public static class VirtualFileServerHandler extends SimpleChannelInboundHandler<Routed> { - private final File file; + private FileSystem fs; + private Path path; - public VirtualFileServerHandler(File file) { - this.file = file; - if(!file.exists()) { - throw new IllegalArgumentException("no such file: " + file.getAbsolutePath()); + public VirtualFileServerHandler(Path path) throws IOException { + this.path = path; + if(!path.isAbsolute()) { + throw new IllegalArgumentException("path must be absolute: " + path.toString()); + } + this.fs = path.getFileSystem(); + if(!fs.exists(path) || fs.getFileStatus(path).isDir()) { + throw new IllegalArgumentException("no such file: " + path.toString()); } } @@ -230,7 +287,7 @@ public class MesosArtifactServer { HttpRequest request = routed.request(); if (LOG.isDebugEnabled()) { - LOG.debug("{} request for file '{}'", request.getMethod(), file.getAbsolutePath()); + LOG.debug("{} request for file '{}'", request.getMethod(), path); } if(!(request.getMethod() == GET || request.getMethod() == HEAD)) { @@ -238,47 +295,40 @@ public class MesosArtifactServer { return; } - final RandomAccessFile raf; + + final FileStatus status; try { - raf = new RandomAccessFile(file, "r"); + status = fs.getFileStatus(path); } - catch (FileNotFoundException e) { + catch (IOException e) { + LOG.error("unable to stat file", e); sendError(ctx, GONE); return; } - try { - long fileLength = raf.length(); - // compose the response - HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK); - if (HttpHeaders.isKeepAlive(request)) { - response.headers().set(CONNECTION, HttpHeaders.Values.KEEP_ALIVE); - } - HttpHeaders.setHeader(response, CACHE_CONTROL, "private"); - HttpHeaders.setHeader(response, CONTENT_TYPE, Mimetypes.MIMETYPE_OCTET_STREAM); - HttpHeaders.setContentLength(response, fileLength); + // compose the response + HttpResponse response = new 
DefaultHttpResponse(HTTP_1_1, OK); + HttpHeaders.setHeader(response, CONNECTION, HttpHeaders.Values.CLOSE); + HttpHeaders.setHeader(response, CACHE_CONTROL, "private"); + HttpHeaders.setHeader(response, CONTENT_TYPE, Mimetypes.MIMETYPE_OCTET_STREAM); + HttpHeaders.setContentLength(response, status.getLen()); - ctx.write(response); + ctx.write(response); - if (request.getMethod() == GET) { - // write the content. Netty's DefaultFileRegion will close the file. - ctx.write(new DefaultFileRegion(raf.getChannel(), 0, fileLength), ctx.newProgressivePromise()); + if (request.getMethod() == GET) { + // write the content. Netty will close the stream. + final FSDataInputStream stream = fs.open(path); + try { + ctx.write(new ChunkedStream(stream)); } - else { - // close the file immediately in HEAD case - raf.close(); + catch(Exception e) { + stream.close(); + throw e; } - ChannelFuture lastContentFuture = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT); - - // close the connection, if no keep-alive is needed - if (!HttpHeaders.isKeepAlive(request)) { - lastContentFuture.addListener(ChannelFutureListener.CLOSE); - } - } - catch(Exception ex) { - raf.close(); - throw ex; } + + ChannelFuture lastContentFuture = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT); + lastContentFuture.addListener(ChannelFutureListener.CLOSE); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/230bf17b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java ---------------------------------------------------------------------- diff --git a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java index f287e13..93ccf68 100644 --- a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java +++ 
b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java @@ -33,10 +33,12 @@ import org.apache.flink.mesos.scheduler.LaunchCoordinator; import org.apache.flink.mesos.scheduler.TaskMonitor; import org.apache.flink.mesos.scheduler.messages.*; import org.apache.flink.mesos.scheduler.messages.Error; +import org.apache.flink.mesos.util.MesosArtifactResolver; import org.apache.flink.mesos.util.MesosConfiguration; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.clusterframework.ApplicationStatus; import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters; +import org.apache.flink.runtime.clusterframework.ContainerSpecification; import org.apache.flink.runtime.clusterframework.messages.*; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.instance.ActorGateway; @@ -79,6 +81,7 @@ public class MesosFlinkResourceManagerTest { private static Configuration config = new Configuration() {{ setInteger(ConfigConstants.MESOS_MAX_FAILED_TASKS, -1); + setInteger(ConfigConstants.MESOS_INITIAL_TASKS, 0); }}; @BeforeClass @@ -107,12 +110,13 @@ public class MesosFlinkResourceManagerTest { MesosWorkerStore workerStore, LeaderRetrievalService leaderRetrievalService, MesosTaskManagerParameters taskManagerParameters, - Protos.TaskInfo.Builder taskManagerLaunchContext, + ContainerSpecification taskManagerContainerSpec, + MesosArtifactResolver artifactResolver, int maxFailedTasks, int numInitialTaskManagers) { super(flinkConfig, mesosConfig, workerStore, leaderRetrievalService, taskManagerParameters, - taskManagerLaunchContext, maxFailedTasks, numInitialTaskManagers); + taskManagerContainerSpec, artifactResolver, maxFailedTasks, numInitialTaskManagers); } @Override @@ -141,6 +145,7 @@ public class MesosFlinkResourceManagerTest { public LeaderRetrievalService retrievalService; public MesosConfiguration mesosConfig; public MesosWorkerStore 
workerStore; + public MesosArtifactResolver artifactResolver; public SchedulerDriver schedulerDriver; public TestingMesosFlinkResourceManager resourceManagerInstance; public ActorGateway resourceManager; @@ -176,6 +181,9 @@ public class MesosFlinkResourceManagerTest { // worker store workerStore = mock(MesosWorkerStore.class); when(workerStore.getFrameworkID()).thenReturn(Option.<Protos.FrameworkID>empty()); + + // artifact + artifactResolver = mock(MesosArtifactResolver.class); } catch (Exception ex) { throw new RuntimeException(ex); } @@ -185,15 +193,16 @@ public class MesosFlinkResourceManagerTest { * Initialize the resource manager. */ public void initialize() { + ContainerSpecification containerSpecification = new ContainerSpecification(); ContaineredTaskManagerParameters containeredParams = new ContaineredTaskManagerParameters(1024, 768, 256, 4, new HashMap<String, String>()); - MesosTaskManagerParameters tmParams = new MesosTaskManagerParameters(1.0, containeredParams); - Protos.TaskInfo.Builder taskInfo = Protos.TaskInfo.newBuilder(); + MesosTaskManagerParameters tmParams = new MesosTaskManagerParameters( + 1.0, MesosTaskManagerParameters.ContainerType.MESOS, Option.<String>empty(), containeredParams); TestActorRef<TestingMesosFlinkResourceManager> resourceManagerRef = TestActorRef.create(system, MesosFlinkResourceManager.createActorProps( TestingMesosFlinkResourceManager.class, - config, mesosConfig, workerStore, retrievalService, tmParams, taskInfo, 0, LOG)); + config, mesosConfig, workerStore, retrievalService, tmParams, containerSpecification, artifactResolver, LOG)); resourceManagerInstance = resourceManagerRef.underlyingActor(); resourceManager = new AkkaActorGateway(resourceManagerRef, null);
