Repository: flink Updated Branches: refs/heads/master 85c55dcbb -> 58b9a3772
[FLINK-2087] Add streaming mode switch to YARN This closes #788 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/58b9a377 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/58b9a377 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/58b9a377 Branch: refs/heads/master Commit: 58b9a3772f5027f58335fb299b122e8ecb9db218 Parents: 85c55dc Author: Robert Metzger <rmetz...@apache.org> Authored: Thu Jun 4 21:50:19 2015 +0200 Committer: Robert Metzger <rmetz...@apache.org> Committed: Mon Jun 8 00:58:13 2015 +0200 ---------------------------------------------------------------------- docs/setup/yarn_setup.md | 1 + .../apache/flink/client/FlinkYarnSessionCli.java | 8 ++++++++ .../flink/runtime/yarn/AbstractFlinkYarnClient.java | 6 ++++++ .../flink/runtime/taskmanager/TaskManager.scala | 2 +- .../streaming/connectors/kafka/KafkaITCase.java | 2 ++ .../java/org/apache/flink/yarn/FlinkYarnClient.java | 8 ++++++++ .../org/apache/flink/yarn/ApplicationMaster.scala | 16 +++++++++++++++- .../apache/flink/yarn/ApplicationMasterActor.scala | 14 ++++++++++++-- 8 files changed, 53 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/58b9a377/docs/setup/yarn_setup.md ---------------------------------------------------------------------- diff --git a/docs/setup/yarn_setup.md b/docs/setup/yarn_setup.md index cf9f6f8..71a217f 100644 --- a/docs/setup/yarn_setup.md +++ b/docs/setup/yarn_setup.md @@ -99,6 +99,7 @@ Usage: -q,--query Display available YARN resources (memory, cores) -qu,--queue <arg> Specify YARN queue. -s,--slots <arg> Number of slots per TaskManager + -st,--streaming Start Flink in streaming mode -tm,--taskManagerMemory <arg> Memory per TaskManager Container [in MB] ~~~ http://git-wip-us.apache.org/repos/asf/flink/blob/58b9a377/flink-clients/src/main/java/org/apache/flink/client/FlinkYarnSessionCli.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/main/java/org/apache/flink/client/FlinkYarnSessionCli.java b/flink-clients/src/main/java/org/apache/flink/client/FlinkYarnSessionCli.java index 7352457..0fa7173 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/FlinkYarnSessionCli.java +++ b/flink-clients/src/main/java/org/apache/flink/client/FlinkYarnSessionCli.java @@ -73,6 +73,7 @@ public class FlinkYarnSessionCli { private final Option CONTAINER; private final Option SLOTS; private final Option DETACHED; + private final Option STREAMING; /** * Dynamic properties allow the user to specify additional configuration values with -D, such as @@ -95,6 +96,7 @@ public class FlinkYarnSessionCli { SLOTS = new Option(shortPrefix + "s", longPrefix + "slots", true, "Number of slots per TaskManager"); DYNAMIC_PROPERTIES = new Option(shortPrefix + "D", true, "Dynamic properties"); DETACHED = new Option(shortPrefix + "d", longPrefix + "detached", false, "Start detached"); + STREAMING = new Option(shortPrefix + "st", longPrefix + "streaming", false, "Start Flink in streaming mode"); } public AbstractFlinkYarnClient createFlinkYarnClient(CommandLine cmd) { @@ -214,6 +216,10 @@ public class FlinkYarnSessionCli { detachedMode = true; flinkYarnClient.setDetachedMode(detachedMode); } + + if (cmd.hasOption(STREAMING.getOpt())) { + flinkYarnClient.setStreamingMode(true); + } return flinkYarnClient; } @@ -237,6 +243,7 @@ public class FlinkYarnSessionCli { opt.addOption(SLOTS); opt.addOption(DYNAMIC_PROPERTIES); opt.addOption(DETACHED); + opt.addOption(STREAMING); formatter.printHelp(" ", opt); } @@ -342,6 +349,7 @@ public class FlinkYarnSessionCli { options.addOption(SLOTS); options.addOption(DYNAMIC_PROPERTIES); options.addOption(DETACHED); + options.addOption(STREAMING); } public int run(String[] args) { http://git-wip-us.apache.org/repos/asf/flink/blob/58b9a377/flink-runtime/src/main/java/org/apache/flink/runtime/yarn/AbstractFlinkYarnClient.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/yarn/AbstractFlinkYarnClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/yarn/AbstractFlinkYarnClient.java index 039e926..0e307e0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/yarn/AbstractFlinkYarnClient.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/yarn/AbstractFlinkYarnClient.java @@ -132,4 +132,10 @@ public abstract class AbstractFlinkYarnClient { * directory in HDFS that contains the jar files and configuration which is shipped to all the containers. */ public abstract String getSessionFilesDir(); + + /** + * Instruct Flink to start in streaming mode + * @param streamingMode + */ + public abstract void setStreamingMode(boolean streamingMode); } http://git-wip-us.apache.org/repos/asf/flink/blob/58b9a377/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala index 7bf5bc5..0b76bda 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala @@ -1248,7 +1248,7 @@ object TaskManager { streamingMode: StreamingMode, taskManagerClass: Class[_ <: TaskManager]) : Unit = { - LOG.info("Starting TaskManager") + LOG.info(s"Starting TaskManager in streaming mode $streamingMode") // Bring up the TaskManager actor system first, bind it to the given address. http://git-wip-us.apache.org/repos/asf/flink/blob/58b9a377/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java index 0a12b07..43e3cc3 100644 --- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java +++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java @@ -74,6 +74,7 @@ import org.junit.AfterClass; import org.junit.Assert; import org.junit.BeforeClass; import org.junit.ClassRule; +import org.junit.Ignore; import org.junit.Test; import org.junit.rules.TemporaryFolder; import org.slf4j.Logger; @@ -281,6 +282,7 @@ public class KafkaITCase { * */ @Test + @Ignore public void testPersistentSourceWithOffsetUpdates() throws Exception { LOG.info("Starting testPersistentSourceWithOffsetUpdates()"); http://git-wip-us.apache.org/repos/asf/flink/blob/58b9a377/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClient.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClient.java b/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClient.java index 88b2987..118f4ad 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClient.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClient.java @@ -93,6 +93,7 @@ public class FlinkYarnClient extends AbstractFlinkYarnClient { public static final String ENV_CLIENT_USERNAME = "_CLIENT_USERNAME"; public static final String ENV_SLOTS = "_SLOTS"; public static final String ENV_DETACHED = "_DETACHED"; + public static final String ENV_STREAMING_MODE = "_STREAMING_MODE"; public static final String ENV_DYNAMIC_PROPERTIES = "_DYNAMIC_PROPERTIES"; @@ -140,6 +141,7 @@ public class FlinkYarnClient extends AbstractFlinkYarnClient { private org.apache.flink.configuration.Configuration flinkConfiguration; private boolean detached; + private boolean streamingMode; public FlinkYarnClient() { @@ -576,6 +578,7 @@ public class FlinkYarnClient extends AbstractFlinkYarnClient { appMasterEnv.put(FlinkYarnClient.ENV_CLIENT_USERNAME, UserGroupInformation.getCurrentUser().getShortUserName()); appMasterEnv.put(FlinkYarnClient.ENV_SLOTS, String.valueOf(slots)); appMasterEnv.put(FlinkYarnClient.ENV_DETACHED, String.valueOf(detached)); + appMasterEnv.put(FlinkYarnClient.ENV_STREAMING_MODE, String.valueOf(streamingMode)); if(dynamicPropertiesEncoded != null) { appMasterEnv.put(FlinkYarnClient.ENV_DYNAMIC_PROPERTIES, dynamicPropertiesEncoded); @@ -726,6 +729,11 @@ public class FlinkYarnClient extends AbstractFlinkYarnClient { return sessionFilesDir.toString(); } + @Override + public void setStreamingMode(boolean streamingMode) { + this.streamingMode = streamingMode; + } + public static class YarnDeploymentException extends RuntimeException { public YarnDeploymentException() { } http://git-wip-us.apache.org/repos/asf/flink/blob/58b9a377/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala index c1a937e..5dd197d 100644 --- a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala +++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala @@ -56,7 +56,7 @@ object ApplicationMaster { EnvironmentInformation.checkJavaVersion() org.apache.flink.runtime.util.SignalHandler.register(LOG.logger) - val streamingMode = StreamingMode.BATCH_ONLY + var streamingMode = StreamingMode.BATCH_ONLY val ugi = UserGroupInformation.createRemoteUser(yarnClientUsername) @@ -84,6 +84,11 @@ object ApplicationMaster { val logDirs = env.get(Environment.LOG_DIRS.key()) + if(hasStreamingMode(env)) { + LOG.info("Starting ApplicationMaster/JobManager in streaming mode") + streamingMode = StreamingMode.STREAMING + } + // Note that we use the "ownHostname" given by YARN here, to make sure // we use the hostnames given by YARN consistently throughout akka. // for akka "localhost" and "localhost.localdomain" are different actors. @@ -246,4 +251,13 @@ object ApplicationMaster { (configuration, jobManagerSystem, jobManager, archiver) } + + + def hasStreamingMode(env: java.util.Map[String, String]): Boolean = { + val sModeString = env.get(FlinkYarnClient.ENV_STREAMING_MODE) + if(sModeString != null) { + return sModeString.toBoolean + } + false + } } http://git-wip-us.apache.org/repos/asf/flink/blob/58b9a377/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterActor.scala ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterActor.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterActor.scala index 26d1f69..999610f 100644 --- a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterActor.scala +++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterActor.scala @@ -461,8 +461,10 @@ trait ApplicationMasterActor extends ActorLogMessages { runningContainers = 0 failedContainers = 0 + val hs = ApplicationMaster.hasStreamingMode(env) containerLaunchContext = Some(createContainerLaunchContext(heapLimit, hasLogback, hasLog4j, - yarnClientUsername, conf, taskManagerLocalResources)) + yarnClientUsername, conf, taskManagerLocalResources, hs)) + context.system.scheduler.scheduleOnce(FAST_YARN_HEARTBEAT_DELAY, self, HeartbeatWithYarn) } recover { @@ -499,7 +501,8 @@ trait ApplicationMasterActor extends ActorLogMessages { private def createContainerLaunchContext(heapLimit: Int, hasLogback: Boolean, hasLog4j: Boolean, yarnClientUsername: String, yarnConf: Configuration, - taskManagerLocalResources: Map[String, LocalResource]): + taskManagerLocalResources: Map[String, LocalResource], + streamingMode: Boolean): ContainerLaunchContext = { log.info("Create container launch context.") val ctx = Records.newRecord(classOf[ContainerLaunchContext]) @@ -525,6 +528,13 @@ trait ApplicationMasterActor extends ActorLogMessages { s"${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/taskmanager-stdout.log 2> " + s"${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/taskmanager-stderr.log" + tmCommand ++= " --streamingMode" + if(streamingMode) { + tmCommand ++= " streaming" + } else { + tmCommand ++= " batch" + } + ctx.setCommands(Collections.singletonList(tmCommand.toString())) log.info(s"Starting TM with command=${tmCommand.toString()}")