This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
     new d22874e  [FLINK-17935] Move set yarn.log-config-file to YarnClusterClientFactory.createClusterDescriptor()
d22874e is described below

commit d22874ef6322bfb774d60b09c022c3dbbe295d70
Author: Kostas Kloudas <[email protected]>
AuthorDate: Fri May 29 15:08:51 2020 +0200

    [FLINK-17935] Move set yarn.log-config-file to YarnClusterClientFactory.createClusterDescriptor()
    
    This closes #12455.
---
 .../org/apache/flink/client/cli/CliFrontend.java   |  2 +-
 .../org/apache/flink/client/cli/ExecutorCLI.java   |  7 +++-
 .../apache/flink/client/cli/ExecutorCLITest.java   | 21 +++++++++---
 .../configuration/DeploymentOptionsInternal.java   | 37 ++++++++++++++++++++++
 .../flink/kubernetes/cli/KubernetesSessionCli.java | 13 +++++---
 .../decorators/FlinkConfMountDecorator.java        |  3 +-
 .../parameters/AbstractKubernetesParameters.java   | 13 ++++++--
 .../parameters/KubernetesParameters.java           |  2 ++
 .../flink/kubernetes/KubernetesTestBase.java       | 11 ++-----
 .../kubernetes/cli/KubernetesSessionCliTest.java   | 33 +++++++++++++++----
 .../org/apache/flink/api/scala/FlinkShell.scala    | 11 ++-----
 .../flink/yarn/YarnClusterClientFactory.java       |  7 ++++
 .../apache/flink/yarn/YarnClusterDescriptor.java   |  4 ---
 .../apache/flink/yarn/cli/FlinkYarnSessionCli.java |  4 +--
 .../yarn/configuration/YarnLogConfigUtil.java      |  4 +--
 15 files changed, 125 insertions(+), 47 deletions(-)

diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java 
b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
index 1e2bae0..a038511 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
@@ -1045,7 +1045,7 @@ public class CliFrontend {
 
        public static List<CustomCommandLine> 
loadCustomCommandLines(Configuration configuration, String 
configurationDirectory) {
                List<CustomCommandLine> customCommandLines = new ArrayList<>();
-               customCommandLines.add(new ExecutorCLI(configuration));
+               customCommandLines.add(new ExecutorCLI(configuration, 
configurationDirectory));
 
                //      Command line interface of the YARN session, with a 
special initialization here
                //      to prefix all options with y/yarn.
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/cli/ExecutorCLI.java 
b/flink-clients/src/main/java/org/apache/flink/client/cli/ExecutorCLI.java
index c1a3d0b..e88de9e 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/ExecutorCLI.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/ExecutorCLI.java
@@ -21,6 +21,7 @@ package org.apache.flink.client.cli;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.DeploymentOptions;
+import org.apache.flink.configuration.DeploymentOptionsInternal;
 import org.apache.flink.configuration.UnmodifiableConfiguration;
 import org.apache.flink.core.execution.DefaultExecutorServiceLoader;
 import org.apache.flink.core.execution.PipelineExecutor;
@@ -72,8 +73,11 @@ public class ExecutorCLI implements CustomCommandLine {
 
        private final Configuration baseConfiguration;
 
-       public ExecutorCLI(final Configuration configuration) {
+       private final String configurationDir;
+
+       public ExecutorCLI(final Configuration configuration, final String 
configDir) {
                this.baseConfiguration = new 
UnmodifiableConfiguration(checkNotNull(configuration));
+               this.configurationDir =  checkNotNull(configDir);
        }
 
        @Override
@@ -115,6 +119,7 @@ public class ExecutorCLI implements CustomCommandLine {
                }
 
                encodeDynamicProperties(commandLine, effectiveConfiguration);
+               effectiveConfiguration.set(DeploymentOptionsInternal.CONF_DIR, 
configurationDir);
 
                if (LOG.isDebugEnabled()) {
                        LOG.debug("Effective Configuration: {}", 
effectiveConfiguration);
diff --git 
a/flink-clients/src/test/java/org/apache/flink/client/cli/ExecutorCLITest.java 
b/flink-clients/src/test/java/org/apache/flink/client/cli/ExecutorCLITest.java
index 6eb9a02..daa6cd3 100644
--- 
a/flink-clients/src/test/java/org/apache/flink/client/cli/ExecutorCLITest.java
+++ 
b/flink-clients/src/test/java/org/apache/flink/client/cli/ExecutorCLITest.java
@@ -26,7 +26,9 @@ import org.apache.flink.configuration.DeploymentOptions;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.Options;
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 
 import java.util.Arrays;
 import java.util.List;
@@ -41,13 +43,18 @@ import static org.junit.Assert.assertFalse;
  */
 public class ExecutorCLITest {
 
+       @Rule
+       public TemporaryFolder tmp = new TemporaryFolder();
+
        private Options testOptions;
 
        @Before
        public void initOptions() {
                testOptions = new Options();
 
-               final ExecutorCLI cliUnderTest = new ExecutorCLI(new 
Configuration());
+               final ExecutorCLI cliUnderTest = new ExecutorCLI(
+                               new Configuration(),
+                               tmp.getRoot().getAbsolutePath());
                cliUnderTest.addGeneralOptions(testOptions);
        }
 
@@ -57,7 +64,9 @@ public class ExecutorCLITest {
                final Configuration loadedConfig = new Configuration();
                loadedConfig.set(DeploymentOptions.TARGET, 
expectedExecutorName);
 
-               final ExecutorCLI cliUnderTest = new ExecutorCLI(loadedConfig);
+               final ExecutorCLI cliUnderTest = new ExecutorCLI(
+                               loadedConfig,
+                               tmp.getRoot().getAbsolutePath());
                final CommandLine emptyCommandLine = 
CliFrontendParser.parse(testOptions, new String[0], true);
 
                final Configuration configuration = 
cliUnderTest.applyCommandLineOptionsToConfiguration(emptyCommandLine);
@@ -87,7 +96,9 @@ public class ExecutorCLITest {
                                "-D" + CoreOptions.DEFAULT_PARALLELISM.key() + 
"=5"
                };
 
-               final ExecutorCLI cliUnderTest = new ExecutorCLI(loadedConfig);
+               final ExecutorCLI cliUnderTest = new ExecutorCLI(
+                               loadedConfig,
+                               tmp.getRoot().getAbsolutePath());
                final CommandLine commandLine = 
CliFrontendParser.parse(testOptions, args, true);
 
                final Configuration configuration = 
cliUnderTest.applyCommandLineOptionsToConfiguration(commandLine);
@@ -113,7 +124,9 @@ public class ExecutorCLITest {
                final ConfigOption<Integer> configOption = 
key("test.int").intType().noDefaultValue();
                final int expectedValue = 42;
 
-               final ExecutorCLI cliUnderTest = new ExecutorCLI(new 
Configuration());
+               final ExecutorCLI cliUnderTest = new ExecutorCLI(
+                               new Configuration(),
+                               tmp.getRoot().getAbsolutePath());
 
                final String[] args = {executorOption, expectedExecutorName, 
"-D" + configOption.key() + "=" + expectedValue};
                final CommandLine commandLine = 
CliFrontendParser.parse(testOptions, args, true);
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/DeploymentOptionsInternal.java
 
b/flink-core/src/main/java/org/apache/flink/configuration/DeploymentOptionsInternal.java
new file mode 100644
index 0000000..22c6387
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/DeploymentOptionsInternal.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.configuration;
+
+import org.apache.flink.annotation.Internal;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+
+/**
+ * Internal options used during deployment.
+ */
+@Internal
+public class DeploymentOptionsInternal {
+
+       public static final ConfigOption<String> CONF_DIR =
+                       key("$internal.deployment.config-dir")
+                                       .stringType()
+                                       .noDefaultValue()
+                                       .withDescription("**DO NOT USE** The 
path to the configuration directory.");
+
+}
diff --git 
a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/cli/KubernetesSessionCli.java
 
b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/cli/KubernetesSessionCli.java
index db1bc8c..36fc950 100644
--- 
a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/cli/KubernetesSessionCli.java
+++ 
b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/cli/KubernetesSessionCli.java
@@ -22,6 +22,7 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.client.cli.AbstractCustomCommandLine;
 import org.apache.flink.client.cli.CliArgsException;
+import org.apache.flink.client.cli.CliFrontend;
 import org.apache.flink.client.cli.ExecutorCLI;
 import org.apache.flink.client.deployment.ClusterClientFactory;
 import org.apache.flink.client.deployment.ClusterClientServiceLoader;
@@ -68,14 +69,14 @@ public class KubernetesSessionCli {
        private final ExecutorCLI cli;
        private final ClusterClientServiceLoader clusterClientServiceLoader;
 
-       public KubernetesSessionCli(Configuration configuration) {
-               this(configuration, new DefaultClusterClientServiceLoader());
+       public KubernetesSessionCli(Configuration configuration, String 
configDir) {
+               this(configuration, new DefaultClusterClientServiceLoader(), 
configDir);
        }
 
-       public KubernetesSessionCli(Configuration configuration, 
ClusterClientServiceLoader clusterClientServiceLoader) {
+       public KubernetesSessionCli(Configuration configuration, 
ClusterClientServiceLoader clusterClientServiceLoader, String configDir) {
                this.baseConfiguration = new 
UnmodifiableConfiguration(checkNotNull(configuration));
                this.clusterClientServiceLoader = 
checkNotNull(clusterClientServiceLoader);
-               this.cli = new ExecutorCLI(baseConfiguration);
+               this.cli = new ExecutorCLI(baseConfiguration, configDir);
        }
 
        public Configuration getEffectiveConfiguration(String[] args) throws 
CliArgsException {
@@ -178,10 +179,12 @@ public class KubernetesSessionCli {
        public static void main(String[] args) {
                final Configuration configuration = 
GlobalConfiguration.loadConfiguration();
 
+               final String configDir = 
CliFrontend.getConfigurationDirectoryFromEnv();
+
                int retCode;
 
                try {
-                       final KubernetesSessionCli cli = new 
KubernetesSessionCli(configuration);
+                       final KubernetesSessionCli cli = new 
KubernetesSessionCli(configuration, configDir);
                        retCode = 
SecurityUtils.getInstalledContext().runSecured(() -> cli.run(args));
                } catch (CliArgsException e) {
                        retCode = 
AbstractCustomCommandLine.handleCliArgsException(e, LOG);
diff --git 
a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/FlinkConfMountDecorator.java
 
b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/FlinkConfMountDecorator.java
index 07cf85e..79eb7aa 100644
--- 
a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/FlinkConfMountDecorator.java
+++ 
b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/FlinkConfMountDecorator.java
@@ -19,7 +19,6 @@
 package org.apache.flink.kubernetes.kubeclient.decorators;
 
 import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.client.cli.CliFrontend;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
 import org.apache.flink.kubernetes.kubeclient.FlinkPod;
@@ -167,7 +166,7 @@ public class FlinkConfMountDecorator extends 
AbstractKubernetesStepDecorator {
        }
 
        private List<File> getLocalLogConfFiles() {
-               final String confDir = 
CliFrontend.getConfigurationDirectoryFromEnv();
+               final String confDir = 
kubernetesComponentConf.getConfigDirectory();
                final File logbackFile = new File(confDir, 
CONFIG_FILE_LOGBACK_NAME);
                final File log4jFile = new File(confDir, 
CONFIG_FILE_LOG4J_NAME);
 
diff --git 
a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/AbstractKubernetesParameters.java
 
b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/AbstractKubernetesParameters.java
index b3bfd6b..c655b63 100644
--- 
a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/AbstractKubernetesParameters.java
+++ 
b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/AbstractKubernetesParameters.java
@@ -18,8 +18,8 @@
 
 package org.apache.flink.kubernetes.kubeclient.parameters;
 
-import org.apache.flink.client.cli.CliFrontend;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.DeploymentOptionsInternal;
 import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
 import org.apache.flink.kubernetes.utils.Constants;
 
@@ -55,6 +55,13 @@ public abstract class AbstractKubernetesParameters 
implements KubernetesParamete
        }
 
        @Override
+       public String getConfigDirectory() {
+               final String configDir = 
flinkConfig.get(DeploymentOptionsInternal.CONF_DIR);
+               checkNotNull(configDir);
+               return configDir;
+       }
+
+       @Override
        public String getClusterId() {
                final String clusterId = 
flinkConfig.getString(KubernetesConfigOptions.CLUSTER_ID);
 
@@ -130,14 +137,14 @@ public abstract class AbstractKubernetesParameters 
implements KubernetesParamete
 
        @Override
        public boolean hasLogback() {
-               final String confDir = 
CliFrontend.getConfigurationDirectoryFromEnv();
+               final String confDir = getConfigDirectory();
                final File logbackFile = new File(confDir, 
CONFIG_FILE_LOGBACK_NAME);
                return logbackFile.exists();
        }
 
        @Override
        public boolean hasLog4j() {
-               final String confDir = 
CliFrontend.getConfigurationDirectoryFromEnv();
+               final String confDir = getConfigDirectory();
                final File log4jFile = new File(confDir, 
CONFIG_FILE_LOG4J_NAME);
                return log4jFile.exists();
        }
diff --git 
a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesParameters.java
 
b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesParameters.java
index 44443c4..2e5ee0a 100644
--- 
a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesParameters.java
+++ 
b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesParameters.java
@@ -32,6 +32,8 @@ import java.util.Optional;
  */
 public interface KubernetesParameters {
 
+       String getConfigDirectory();
+
        String getClusterId();
 
        String getNamespace();
diff --git 
a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesTestBase.java
 
b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesTestBase.java
index dc08852..a7b0286 100644
--- 
a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesTestBase.java
+++ 
b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesTestBase.java
@@ -18,8 +18,8 @@
 
 package org.apache.flink.kubernetes;
 
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.DeploymentOptionsInternal;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.core.testutils.CommonTestUtils;
@@ -29,7 +29,6 @@ import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
 import org.apache.flink.kubernetes.utils.Constants;
 import org.apache.flink.runtime.clusterframework.BootstrapTools;
 import org.apache.flink.runtime.concurrent.Executors;
-import org.apache.flink.test.util.TestBaseUtils;
 import org.apache.flink.util.TestLogger;
 
 import io.fabric8.kubernetes.client.KubernetesClient;
@@ -77,6 +76,7 @@ public class KubernetesTestBase extends TestLogger {
                flinkConfig.setString(KubernetesConfigOptions.CONTAINER_IMAGE, 
CONTAINER_IMAGE);
                
flinkConfig.set(KubernetesConfigOptions.CONTAINER_IMAGE_PULL_POLICY, 
CONTAINER_IMAGE_PULL_POLICY);
                flinkConfig.set(JobManagerOptions.TOTAL_PROCESS_MEMORY, 
MemorySize.ofMebiBytes(JOB_MANAGER_MEMORY));
+               flinkConfig.set(DeploymentOptionsInternal.CONF_DIR, 
flinkConfDir.toString());
        }
 
        protected void onSetup() throws Exception {
@@ -84,17 +84,12 @@ public class KubernetesTestBase extends TestLogger {
 
        @Before
        public final void setup() throws Exception {
-               setupFlinkConfig();
-
                flinkConfDir = temporaryFolder.newFolder().getAbsoluteFile();
                hadoopConfDir = temporaryFolder.newFolder().getAbsoluteFile();
 
+               setupFlinkConfig();
                writeFlinkConfiguration();
 
-               Map<String, String> map = new HashMap<>();
-               map.put(ConfigConstants.ENV_FLINK_CONF_DIR, 
flinkConfDir.toString());
-               TestBaseUtils.setEnv(map);
-
                kubeClient = server.getClient().inNamespace(NAMESPACE);
                flinkKubeClient = new Fabric8FlinkKubeClient(flinkConfig, 
kubeClient, Executors::newDirectExecutorService);
 
diff --git 
a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/cli/KubernetesSessionCliTest.java
 
b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/cli/KubernetesSessionCliTest.java
index df7fe6a..79953d7 100644
--- 
a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/cli/KubernetesSessionCliTest.java
+++ 
b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/cli/KubernetesSessionCliTest.java
@@ -25,6 +25,7 @@ import 
org.apache.flink.client.deployment.ClusterSpecification;
 import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.DeploymentOptions;
+import org.apache.flink.configuration.DeploymentOptionsInternal;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.configuration.TaskManagerOptions;
@@ -32,7 +33,9 @@ import 
org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
 import org.apache.flink.kubernetes.executors.KubernetesSessionClusterExecutor;
 
 import org.junit.Assert;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 
 import java.util.Map;
 
@@ -46,9 +49,14 @@ import static org.junit.Assert.assertTrue;
  */
 public class KubernetesSessionCliTest {
 
+       @Rule
+       public TemporaryFolder tmp = new TemporaryFolder();
+
        @Test
        public void testKubernetesSessionCliSetsDeploymentTargetCorrectly() 
throws CliArgsException {
-               final KubernetesSessionCli cli = new KubernetesSessionCli(new 
Configuration());
+               final KubernetesSessionCli cli = new KubernetesSessionCli(
+                               new Configuration(),
+                               tmp.getRoot().getAbsolutePath());
 
                final String[] args = {};
                final Configuration configuration = 
cli.getEffectiveConfiguration(args);
@@ -59,7 +67,9 @@ public class KubernetesSessionCliTest {
        @Test
        public void testDynamicProperties() throws Exception {
 
-               final KubernetesSessionCli cli = new KubernetesSessionCli(new 
Configuration());
+               final KubernetesSessionCli cli = new KubernetesSessionCli(
+                               new Configuration(),
+                               tmp.getRoot().getAbsolutePath());
                final String[] args = new String[] {
                        "-e", KubernetesSessionClusterExecutor.NAME,
                        "-Dakka.ask.timeout=5 min",
@@ -72,9 +82,10 @@ public class KubernetesSessionCliTest {
                Assert.assertNotNull(clientFactory);
 
                final Map<String, String> executorConfigMap = 
executorConfig.toMap();
-               assertEquals(3, executorConfigMap.size());
+               assertEquals(4, executorConfigMap.size());
                assertEquals("5 min", 
executorConfigMap.get("akka.ask.timeout"));
                assertEquals("-DappName=foobar", 
executorConfigMap.get("env.java.opts"));
+               assertEquals(tmp.getRoot().getAbsolutePath(), 
executorConfig.get(DeploymentOptionsInternal.CONF_DIR));
                
assertTrue(executorConfigMap.containsKey(DeploymentOptions.TARGET.key()));
        }
 
@@ -131,7 +142,9 @@ public class KubernetesSessionCliTest {
                                "-D" + TaskManagerOptions.NUM_TASK_SLOTS.key() 
+ "=" + slotsPerTaskManager
                };
 
-               final KubernetesSessionCli cli = new 
KubernetesSessionCli(configuration);
+               final KubernetesSessionCli cli = new KubernetesSessionCli(
+                               configuration,
+                               tmp.getRoot().getAbsolutePath());
 
                Configuration executorConfig = 
cli.getEffectiveConfiguration(args);
                ClusterClientFactory<String> clientFactory = 
getClusterClientFactory(executorConfig);
@@ -157,7 +170,9 @@ public class KubernetesSessionCliTest {
                configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 
slotsPerTaskManager);
 
                final String[] args = {"-e", 
KubernetesSessionClusterExecutor.NAME};
-               final KubernetesSessionCli cli = new 
KubernetesSessionCli(configuration);
+               final KubernetesSessionCli cli = new KubernetesSessionCli(
+                               configuration,
+                               tmp.getRoot().getAbsolutePath());
 
                Configuration executorConfig = 
cli.getEffectiveConfiguration(args);
                ClusterClientFactory<String> clientFactory = 
getClusterClientFactory(executorConfig);
@@ -220,7 +235,9 @@ public class KubernetesSessionCliTest {
                
configuration.setInteger(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY_MB, 2048);
                
configuration.setInteger(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY_MB, 4096);
 
-               final KubernetesSessionCli cli = new 
KubernetesSessionCli(configuration);
+               final KubernetesSessionCli cli = new KubernetesSessionCli(
+                               configuration,
+                               tmp.getRoot().getAbsolutePath());
 
                final Configuration executorConfig = 
cli.getEffectiveConfiguration(new String[]{});
                final ClusterClientFactory<String> clientFactory = 
getClusterClientFactory(executorConfig);
@@ -258,6 +275,8 @@ public class KubernetesSessionCliTest {
                Configuration configuration = new Configuration();
                configuration.set(JobManagerOptions.TOTAL_PROCESS_MEMORY, 
MemorySize.ofMebiBytes(totalMemory));
                configuration.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, 
MemorySize.ofMebiBytes(totalMemory));
-               return new KubernetesSessionCli(configuration);
+               return new KubernetesSessionCli(
+                               configuration,
+                               tmp.getRoot().getAbsolutePath());
        }
 }
diff --git 
a/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala 
b/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
index d3f65c2..cfd91a5 100644
--- 
a/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
+++ 
b/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
@@ -140,10 +140,7 @@ object FlinkShell {
   }
 
   private def getConfigDir(config: Config) = {
-    config.configDir match {
-      case Some(confDir) => confDir
-      case None => CliFrontend.getConfigurationDirectoryFromEnv
-    }
+    config.configDir.getOrElse(CliFrontend.getConfigurationDirectoryFromEnv)
   }
 
   private def getGlobalConfig(config: Config) = {
@@ -232,8 +229,7 @@ object FlinkShell {
     val effectiveConfig = new Configuration(flinkConfig)
     val args = parseYarnArgList(config, "yarn-cluster")
 
-    val configurationDirectory =
-      config.configDir.getOrElse(CliFrontend.getConfigurationDirectoryFromEnv)
+    val configurationDirectory = getConfigDir(config)
 
     val frontend = new CliFrontend(
       effectiveConfig,
@@ -271,8 +267,7 @@ object FlinkShell {
     val effectiveConfig = new Configuration(flinkConfig)
     val args = parseYarnArgList(config, mode)
 
-    val configurationDirectory =
-      config.configDir.getOrElse(CliFrontend.getConfigurationDirectoryFromEnv)
+    val configurationDirectory = getConfigDir(config)
 
     val frontend = new CliFrontend(
       effectiveConfig,
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClientFactory.java 
b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClientFactory.java
index b7b8645..776172f 100644
--- 
a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClientFactory.java
+++ 
b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClientFactory.java
@@ -23,8 +23,10 @@ import 
org.apache.flink.client.deployment.AbstractContainerizedClusterClientFact
 import org.apache.flink.client.deployment.ClusterClientFactory;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.DeploymentOptions;
+import org.apache.flink.configuration.DeploymentOptionsInternal;
 import org.apache.flink.yarn.configuration.YarnConfigOptions;
 import org.apache.flink.yarn.configuration.YarnDeploymentTarget;
+import org.apache.flink.yarn.configuration.YarnLogConfigUtil;
 
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.client.api.YarnClient;
@@ -51,6 +53,11 @@ public class YarnClusterClientFactory extends 
AbstractContainerizedClusterClient
        @Override
        public YarnClusterDescriptor createClusterDescriptor(Configuration 
configuration) {
                checkNotNull(configuration);
+
+               final String configurationDirectory =
+                               
configuration.get(DeploymentOptionsInternal.CONF_DIR);
+               YarnLogConfigUtil.setLogConfigFileInConfig(configuration, 
configurationDirectory);
+
                return getClusterDescriptor(configuration);
        }
 
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java 
b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
index 8e15e56..6acd05d 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
@@ -21,7 +21,6 @@ package org.apache.flink.yarn;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.cache.DistributedCache;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.client.cli.CliFrontend;
 import org.apache.flink.client.deployment.ClusterDeploymentException;
 import org.apache.flink.client.deployment.ClusterDescriptor;
 import org.apache.flink.client.deployment.ClusterRetrieveException;
@@ -405,9 +404,6 @@ public class YarnClusterDescriptor implements 
ClusterDescriptor<ApplicationId> {
                final List<String> pipelineJars = 
flinkConfiguration.getOptional(PipelineOptions.JARS).orElse(Collections.emptyList());
                Preconditions.checkArgument(pipelineJars.size() == 1, "Should 
only have one jar");
 
-               final String configurationDirectory = 
CliFrontend.getConfigurationDirectoryFromEnv();
-               YarnLogConfigUtil.setLogConfigFileInConfig(flinkConfiguration, 
configurationDirectory);
-
                try {
                        return deployInternal(
                                        clusterSpecification,
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java 
b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
index 19f1373..9507ad8 100644
--- 
a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
+++ 
b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
@@ -30,6 +30,7 @@ import org.apache.flink.client.program.ClusterClientProvider;
 import org.apache.flink.configuration.ConfigUtils;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.DeploymentOptions;
+import org.apache.flink.configuration.DeploymentOptionsInternal;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.MemorySize;
@@ -44,7 +45,6 @@ import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.ShutdownHookUtil;
 import org.apache.flink.yarn.YarnClusterDescriptor;
 import org.apache.flink.yarn.configuration.YarnConfigOptions;
-import org.apache.flink.yarn.configuration.YarnLogConfigUtil;
 import org.apache.flink.yarn.executors.YarnJobClusterExecutor;
 import org.apache.flink.yarn.executors.YarnSessionClusterExecutor;
 
@@ -433,7 +433,7 @@ public class FlinkYarnSessionCli extends 
AbstractCustomCommandLine {
                        configuration.setString(YarnConfigOptions.NODE_LABEL, 
nodeLabelValue);
                }
 
-               YarnLogConfigUtil.setLogConfigFileInConfig(configuration, 
configurationDirectory);
+               configuration.set(DeploymentOptionsInternal.CONF_DIR, 
configurationDirectory);
        }
 
        private boolean isYarnPropertiesFileMode(CommandLine commandLine) {
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnLogConfigUtil.java
 
b/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnLogConfigUtil.java
index 981cd4d..0a93dce 100644
--- 
a/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnLogConfigUtil.java
+++ 
b/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnLogConfigUtil.java
@@ -47,12 +47,12 @@ public class YarnLogConfigUtil {
                        final Configuration configuration,
                        final String configurationDirectory) {
 
-               if 
(configuration.getString(YarnConfigOptionsInternal.APPLICATION_LOG_CONFIG_FILE) 
!= null) {
+               if 
(configuration.get(YarnConfigOptionsInternal.APPLICATION_LOG_CONFIG_FILE) != 
null) {
                        return configuration;
                }
 
                discoverLogConfigFile(configurationDirectory).ifPresent(file ->
-                               
configuration.setString(YarnConfigOptionsInternal.APPLICATION_LOG_CONFIG_FILE, 
file.getPath()));
+                               
configuration.set(YarnConfigOptionsInternal.APPLICATION_LOG_CONFIG_FILE, 
file.getPath()));
                return configuration;
        }
 

Reply via email to