This is an automated email from the ASF dual-hosted git repository.
kkloudas pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.11 by this push:
new 3d08530 [FLINK-17788][scala-shell] Fix yarn session support in scala
shell
3d08530 is described below
commit 3d0853035c1a70522bf239ced4b17c0f67c5e025
Author: Kostas Kloudas <[email protected]>
AuthorDate: Wed Jun 10 15:25:39 2020 +0200
[FLINK-17788][scala-shell] Fix yarn session support in scala shell
---
.../src/main/scala/org/apache/flink/api/scala/FlinkShell.scala | 9 ++++++---
1 file changed, 6 insertions(+), 3 deletions(-)
diff --git
a/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
b/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
index cfd91a5..f77b938 100644
---
a/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
+++
b/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
@@ -222,12 +222,14 @@ object FlinkShell {
case None => fetchDeployedYarnClusterInfo(config, clusterConfig,
"default")
}
+ println("Configuration: " + effectiveConfig)
+
(effectiveConfig, clusterClient)
}
private def deployNewYarnCluster(config: Config, flinkConfig: Configuration)
= {
val effectiveConfig = new Configuration(flinkConfig)
- val args = parseYarnArgList(config, "yarn-cluster")
+ val args = parseArgList(config, "yarn-cluster")
val configurationDirectory = getConfigDir(config)
@@ -253,6 +255,7 @@ object FlinkShell {
.deploySessionCluster(clusterSpecification)
.getClusterClient
} finally {
+ executorConfig.set(DeploymentOptions.TARGET, "yarn-session")
clusterDescriptor.close()
}
@@ -265,7 +268,7 @@ object FlinkShell {
mode: String) = {
val effectiveConfig = new Configuration(flinkConfig)
- val args = parseYarnArgList(config, mode)
+ val args = parseArgList(config, mode)
val configurationDirectory = getConfigDir(config)
@@ -284,7 +287,7 @@ object FlinkShell {
(executorConfig, None)
}
- def parseYarnArgList(config: Config, mode: String): Array[String] = {
+ def parseArgList(config: Config, mode: String): Array[String] = {
val args = if (mode == "default") {
ArrayBuffer[String]()
} else {