This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
     new 2a5f148  Revert "[FLINK-17935] Move set yarn.log-config-file to YarnClusterClientFactory.createClusterDescriptor()"
2a5f148 is described below

commit 2a5f148c82590f9c1c4b0504b53a65eb42c93fe5
Author: Kostas Kloudas <[email protected]>
AuthorDate: Thu Jun 4 10:43:10 2020 +0200

    Revert "[FLINK-17935] Move set yarn.log-config-file to YarnClusterClientFactory.createClusterDescriptor()"
    
    This reverts commit 32c65903
---
 .../org/apache/flink/client/cli/CliFrontend.java   |  2 +-
 .../org/apache/flink/client/cli/ExecutorCLI.java   |  7 +---
 .../apache/flink/client/cli/ExecutorCLITest.java   | 21 +++---------
 .../configuration/DeploymentOptionsInternal.java   | 37 ----------------------
 .../flink/kubernetes/cli/KubernetesSessionCli.java | 13 +++-----
 .../decorators/FlinkConfMountDecorator.java        |  3 +-
 .../parameters/AbstractKubernetesParameters.java   | 13 ++------
 .../parameters/KubernetesParameters.java           |  2 --
 .../kubernetes/cli/KubernetesSessionCliTest.java   | 33 ++++---------------
 .../org/apache/flink/api/scala/FlinkShell.scala    | 11 +++++--
 .../flink/yarn/YarnClusterClientFactory.java       |  7 ----
 .../apache/flink/yarn/YarnClusterDescriptor.java   |  4 +++
 .../apache/flink/yarn/cli/FlinkYarnSessionCli.java |  4 +--
 .../yarn/configuration/YarnLogConfigUtil.java      |  4 +--
 14 files changed, 39 insertions(+), 122 deletions(-)

diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java 
b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
index a038511..1e2bae0 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
@@ -1045,7 +1045,7 @@ public class CliFrontend {
 
        public static List<CustomCommandLine> 
loadCustomCommandLines(Configuration configuration, String 
configurationDirectory) {
                List<CustomCommandLine> customCommandLines = new ArrayList<>();
-               customCommandLines.add(new ExecutorCLI(configuration, 
configurationDirectory));
+               customCommandLines.add(new ExecutorCLI(configuration));
 
                //      Command line interface of the YARN session, with a 
special initialization here
                //      to prefix all options with y/yarn.
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/cli/ExecutorCLI.java 
b/flink-clients/src/main/java/org/apache/flink/client/cli/ExecutorCLI.java
index e88de9e..c1a3d0b 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/ExecutorCLI.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/ExecutorCLI.java
@@ -21,7 +21,6 @@ package org.apache.flink.client.cli;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.DeploymentOptions;
-import org.apache.flink.configuration.DeploymentOptionsInternal;
 import org.apache.flink.configuration.UnmodifiableConfiguration;
 import org.apache.flink.core.execution.DefaultExecutorServiceLoader;
 import org.apache.flink.core.execution.PipelineExecutor;
@@ -73,11 +72,8 @@ public class ExecutorCLI implements CustomCommandLine {
 
        private final Configuration baseConfiguration;
 
-       private final String configurationDir;
-
-       public ExecutorCLI(final Configuration configuration, final String 
configDir) {
+       public ExecutorCLI(final Configuration configuration) {
                this.baseConfiguration = new 
UnmodifiableConfiguration(checkNotNull(configuration));
-               this.configurationDir =  checkNotNull(configDir);
        }
 
        @Override
@@ -119,7 +115,6 @@ public class ExecutorCLI implements CustomCommandLine {
                }
 
                encodeDynamicProperties(commandLine, effectiveConfiguration);
-               effectiveConfiguration.set(DeploymentOptionsInternal.CONF_DIR, 
configurationDir);
 
                if (LOG.isDebugEnabled()) {
                        LOG.debug("Effective Configuration: {}", 
effectiveConfiguration);
diff --git 
a/flink-clients/src/test/java/org/apache/flink/client/cli/ExecutorCLITest.java 
b/flink-clients/src/test/java/org/apache/flink/client/cli/ExecutorCLITest.java
index daa6cd3..6eb9a02 100644
--- 
a/flink-clients/src/test/java/org/apache/flink/client/cli/ExecutorCLITest.java
+++ 
b/flink-clients/src/test/java/org/apache/flink/client/cli/ExecutorCLITest.java
@@ -26,9 +26,7 @@ import org.apache.flink.configuration.DeploymentOptions;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.Options;
 import org.junit.Before;
-import org.junit.Rule;
 import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
 
 import java.util.Arrays;
 import java.util.List;
@@ -43,18 +41,13 @@ import static org.junit.Assert.assertFalse;
  */
 public class ExecutorCLITest {
 
-       @Rule
-       public TemporaryFolder tmp = new TemporaryFolder();
-
        private Options testOptions;
 
        @Before
        public void initOptions() {
                testOptions = new Options();
 
-               final ExecutorCLI cliUnderTest = new ExecutorCLI(
-                               new Configuration(),
-                               tmp.getRoot().getAbsolutePath());
+               final ExecutorCLI cliUnderTest = new ExecutorCLI(new 
Configuration());
                cliUnderTest.addGeneralOptions(testOptions);
        }
 
@@ -64,9 +57,7 @@ public class ExecutorCLITest {
                final Configuration loadedConfig = new Configuration();
                loadedConfig.set(DeploymentOptions.TARGET, 
expectedExecutorName);
 
-               final ExecutorCLI cliUnderTest = new ExecutorCLI(
-                               loadedConfig,
-                               tmp.getRoot().getAbsolutePath());
+               final ExecutorCLI cliUnderTest = new ExecutorCLI(loadedConfig);
                final CommandLine emptyCommandLine = 
CliFrontendParser.parse(testOptions, new String[0], true);
 
                final Configuration configuration = 
cliUnderTest.applyCommandLineOptionsToConfiguration(emptyCommandLine);
@@ -96,9 +87,7 @@ public class ExecutorCLITest {
                                "-D" + CoreOptions.DEFAULT_PARALLELISM.key() + 
"=5"
                };
 
-               final ExecutorCLI cliUnderTest = new ExecutorCLI(
-                               loadedConfig,
-                               tmp.getRoot().getAbsolutePath());
+               final ExecutorCLI cliUnderTest = new ExecutorCLI(loadedConfig);
                final CommandLine commandLine = 
CliFrontendParser.parse(testOptions, args, true);
 
                final Configuration configuration = 
cliUnderTest.applyCommandLineOptionsToConfiguration(commandLine);
@@ -124,9 +113,7 @@ public class ExecutorCLITest {
                final ConfigOption<Integer> configOption = 
key("test.int").intType().noDefaultValue();
                final int expectedValue = 42;
 
-               final ExecutorCLI cliUnderTest = new ExecutorCLI(
-                               new Configuration(),
-                               tmp.getRoot().getAbsolutePath());
+               final ExecutorCLI cliUnderTest = new ExecutorCLI(new 
Configuration());
 
                final String[] args = {executorOption, expectedExecutorName, 
"-D" + configOption.key() + "=" + expectedValue};
                final CommandLine commandLine = 
CliFrontendParser.parse(testOptions, args, true);
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/DeploymentOptionsInternal.java
 
b/flink-core/src/main/java/org/apache/flink/configuration/DeploymentOptionsInternal.java
deleted file mode 100644
index 22c6387..0000000
--- 
a/flink-core/src/main/java/org/apache/flink/configuration/DeploymentOptionsInternal.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.configuration;
-
-import org.apache.flink.annotation.Internal;
-
-import static org.apache.flink.configuration.ConfigOptions.key;
-
-/**
- * Internal options used during deployment.
- */
-@Internal
-public class DeploymentOptionsInternal {
-
-       public static final ConfigOption<String> CONF_DIR =
-                       key("$internal.deployment.config-dir")
-                                       .stringType()
-                                       .noDefaultValue()
-                                       .withDescription("**DO NOT USE** The 
path to the configuration directory.");
-
-}
diff --git 
a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/cli/KubernetesSessionCli.java
 
b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/cli/KubernetesSessionCli.java
index 36fc950..db1bc8c 100644
--- 
a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/cli/KubernetesSessionCli.java
+++ 
b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/cli/KubernetesSessionCli.java
@@ -22,7 +22,6 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.client.cli.AbstractCustomCommandLine;
 import org.apache.flink.client.cli.CliArgsException;
-import org.apache.flink.client.cli.CliFrontend;
 import org.apache.flink.client.cli.ExecutorCLI;
 import org.apache.flink.client.deployment.ClusterClientFactory;
 import org.apache.flink.client.deployment.ClusterClientServiceLoader;
@@ -69,14 +68,14 @@ public class KubernetesSessionCli {
        private final ExecutorCLI cli;
        private final ClusterClientServiceLoader clusterClientServiceLoader;
 
-       public KubernetesSessionCli(Configuration configuration, String 
configDir) {
-               this(configuration, new DefaultClusterClientServiceLoader(), 
configDir);
+       public KubernetesSessionCli(Configuration configuration) {
+               this(configuration, new DefaultClusterClientServiceLoader());
        }
 
-       public KubernetesSessionCli(Configuration configuration, 
ClusterClientServiceLoader clusterClientServiceLoader, String configDir) {
+       public KubernetesSessionCli(Configuration configuration, 
ClusterClientServiceLoader clusterClientServiceLoader) {
                this.baseConfiguration = new 
UnmodifiableConfiguration(checkNotNull(configuration));
                this.clusterClientServiceLoader = 
checkNotNull(clusterClientServiceLoader);
-               this.cli = new ExecutorCLI(baseConfiguration, configDir);
+               this.cli = new ExecutorCLI(baseConfiguration);
        }
 
        public Configuration getEffectiveConfiguration(String[] args) throws 
CliArgsException {
@@ -179,12 +178,10 @@ public class KubernetesSessionCli {
        public static void main(String[] args) {
                final Configuration configuration = 
GlobalConfiguration.loadConfiguration();
 
-               final String configDir = 
CliFrontend.getConfigurationDirectoryFromEnv();
-
                int retCode;
 
                try {
-                       final KubernetesSessionCli cli = new 
KubernetesSessionCli(configuration, configDir);
+                       final KubernetesSessionCli cli = new 
KubernetesSessionCli(configuration);
                        retCode = 
SecurityUtils.getInstalledContext().runSecured(() -> cli.run(args));
                } catch (CliArgsException e) {
                        retCode = 
AbstractCustomCommandLine.handleCliArgsException(e, LOG);
diff --git 
a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/FlinkConfMountDecorator.java
 
b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/FlinkConfMountDecorator.java
index 79eb7aa..07cf85e 100644
--- 
a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/FlinkConfMountDecorator.java
+++ 
b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/FlinkConfMountDecorator.java
@@ -19,6 +19,7 @@
 package org.apache.flink.kubernetes.kubeclient.decorators;
 
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.client.cli.CliFrontend;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
 import org.apache.flink.kubernetes.kubeclient.FlinkPod;
@@ -166,7 +167,7 @@ public class FlinkConfMountDecorator extends 
AbstractKubernetesStepDecorator {
        }
 
        private List<File> getLocalLogConfFiles() {
-               final String confDir = 
kubernetesComponentConf.getConfigDirectory();
+               final String confDir = 
CliFrontend.getConfigurationDirectoryFromEnv();
                final File logbackFile = new File(confDir, 
CONFIG_FILE_LOGBACK_NAME);
                final File log4jFile = new File(confDir, 
CONFIG_FILE_LOG4J_NAME);
 
diff --git 
a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/AbstractKubernetesParameters.java
 
b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/AbstractKubernetesParameters.java
index c655b63..b3bfd6b 100644
--- 
a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/AbstractKubernetesParameters.java
+++ 
b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/AbstractKubernetesParameters.java
@@ -18,8 +18,8 @@
 
 package org.apache.flink.kubernetes.kubeclient.parameters;
 
+import org.apache.flink.client.cli.CliFrontend;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.DeploymentOptionsInternal;
 import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
 import org.apache.flink.kubernetes.utils.Constants;
 
@@ -55,13 +55,6 @@ public abstract class AbstractKubernetesParameters 
implements KubernetesParamete
        }
 
        @Override
-       public String getConfigDirectory() {
-               final String configDir = 
flinkConfig.get(DeploymentOptionsInternal.CONF_DIR);
-               checkNotNull(configDir);
-               return configDir;
-       }
-
-       @Override
        public String getClusterId() {
                final String clusterId = 
flinkConfig.getString(KubernetesConfigOptions.CLUSTER_ID);
 
@@ -137,14 +130,14 @@ public abstract class AbstractKubernetesParameters 
implements KubernetesParamete
 
        @Override
        public boolean hasLogback() {
-               final String confDir = getConfigDirectory();
+               final String confDir = 
CliFrontend.getConfigurationDirectoryFromEnv();
                final File logbackFile = new File(confDir, 
CONFIG_FILE_LOGBACK_NAME);
                return logbackFile.exists();
        }
 
        @Override
        public boolean hasLog4j() {
-               final String confDir = getConfigDirectory();
+               final String confDir = 
CliFrontend.getConfigurationDirectoryFromEnv();
                final File log4jFile = new File(confDir, 
CONFIG_FILE_LOG4J_NAME);
                return log4jFile.exists();
        }
diff --git 
a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesParameters.java
 
b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesParameters.java
index 2e5ee0a..44443c4 100644
--- 
a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesParameters.java
+++ 
b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesParameters.java
@@ -32,8 +32,6 @@ import java.util.Optional;
  */
 public interface KubernetesParameters {
 
-       String getConfigDirectory();
-
        String getClusterId();
 
        String getNamespace();
diff --git 
a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/cli/KubernetesSessionCliTest.java
 
b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/cli/KubernetesSessionCliTest.java
index 79953d7..df7fe6a 100644
--- 
a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/cli/KubernetesSessionCliTest.java
+++ 
b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/cli/KubernetesSessionCliTest.java
@@ -25,7 +25,6 @@ import 
org.apache.flink.client.deployment.ClusterSpecification;
 import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.DeploymentOptions;
-import org.apache.flink.configuration.DeploymentOptionsInternal;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.configuration.TaskManagerOptions;
@@ -33,9 +32,7 @@ import 
org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
 import org.apache.flink.kubernetes.executors.KubernetesSessionClusterExecutor;
 
 import org.junit.Assert;
-import org.junit.Rule;
 import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
 
 import java.util.Map;
 
@@ -49,14 +46,9 @@ import static org.junit.Assert.assertTrue;
  */
 public class KubernetesSessionCliTest {
 
-       @Rule
-       public TemporaryFolder tmp = new TemporaryFolder();
-
        @Test
        public void testKubernetesSessionCliSetsDeploymentTargetCorrectly() 
throws CliArgsException {
-               final KubernetesSessionCli cli = new KubernetesSessionCli(
-                               new Configuration(),
-                               tmp.getRoot().getAbsolutePath());
+               final KubernetesSessionCli cli = new KubernetesSessionCli(new 
Configuration());
 
                final String[] args = {};
                final Configuration configuration = 
cli.getEffectiveConfiguration(args);
@@ -67,9 +59,7 @@ public class KubernetesSessionCliTest {
        @Test
        public void testDynamicProperties() throws Exception {
 
-               final KubernetesSessionCli cli = new KubernetesSessionCli(
-                               new Configuration(),
-                               tmp.getRoot().getAbsolutePath());
+               final KubernetesSessionCli cli = new KubernetesSessionCli(new 
Configuration());
                final String[] args = new String[] {
                        "-e", KubernetesSessionClusterExecutor.NAME,
                        "-Dakka.ask.timeout=5 min",
@@ -82,10 +72,9 @@ public class KubernetesSessionCliTest {
                Assert.assertNotNull(clientFactory);
 
                final Map<String, String> executorConfigMap = 
executorConfig.toMap();
-               assertEquals(4, executorConfigMap.size());
+               assertEquals(3, executorConfigMap.size());
                assertEquals("5 min", 
executorConfigMap.get("akka.ask.timeout"));
                assertEquals("-DappName=foobar", 
executorConfigMap.get("env.java.opts"));
-               assertEquals(tmp.getRoot().getAbsolutePath(), 
executorConfig.get(DeploymentOptionsInternal.CONF_DIR));
                
assertTrue(executorConfigMap.containsKey(DeploymentOptions.TARGET.key()));
        }
 
@@ -142,9 +131,7 @@ public class KubernetesSessionCliTest {
                                "-D" + TaskManagerOptions.NUM_TASK_SLOTS.key() 
+ "=" + slotsPerTaskManager
                };
 
-               final KubernetesSessionCli cli = new KubernetesSessionCli(
-                               configuration,
-                               tmp.getRoot().getAbsolutePath());
+               final KubernetesSessionCli cli = new 
KubernetesSessionCli(configuration);
 
                Configuration executorConfig = 
cli.getEffectiveConfiguration(args);
                ClusterClientFactory<String> clientFactory = 
getClusterClientFactory(executorConfig);
@@ -170,9 +157,7 @@ public class KubernetesSessionCliTest {
                configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 
slotsPerTaskManager);
 
                final String[] args = {"-e", 
KubernetesSessionClusterExecutor.NAME};
-               final KubernetesSessionCli cli = new KubernetesSessionCli(
-                               configuration,
-                               tmp.getRoot().getAbsolutePath());
+               final KubernetesSessionCli cli = new 
KubernetesSessionCli(configuration);
 
                Configuration executorConfig = 
cli.getEffectiveConfiguration(args);
                ClusterClientFactory<String> clientFactory = 
getClusterClientFactory(executorConfig);
@@ -235,9 +220,7 @@ public class KubernetesSessionCliTest {
                
configuration.setInteger(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY_MB, 2048);
                
configuration.setInteger(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY_MB, 4096);
 
-               final KubernetesSessionCli cli = new KubernetesSessionCli(
-                               configuration,
-                               tmp.getRoot().getAbsolutePath());
+               final KubernetesSessionCli cli = new 
KubernetesSessionCli(configuration);
 
                final Configuration executorConfig = 
cli.getEffectiveConfiguration(new String[]{});
                final ClusterClientFactory<String> clientFactory = 
getClusterClientFactory(executorConfig);
@@ -275,8 +258,6 @@ public class KubernetesSessionCliTest {
                Configuration configuration = new Configuration();
                configuration.set(JobManagerOptions.TOTAL_PROCESS_MEMORY, 
MemorySize.ofMebiBytes(totalMemory));
                configuration.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, 
MemorySize.ofMebiBytes(totalMemory));
-               return new KubernetesSessionCli(
-                               configuration,
-                               tmp.getRoot().getAbsolutePath());
+               return new KubernetesSessionCli(configuration);
        }
 }
diff --git 
a/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala 
b/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
index cfd91a5..d3f65c2 100644
--- 
a/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
+++ 
b/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
@@ -140,7 +140,10 @@ object FlinkShell {
   }
 
   private def getConfigDir(config: Config) = {
-    config.configDir.getOrElse(CliFrontend.getConfigurationDirectoryFromEnv)
+    config.configDir match {
+      case Some(confDir) => confDir
+      case None => CliFrontend.getConfigurationDirectoryFromEnv
+    }
   }
 
   private def getGlobalConfig(config: Config) = {
@@ -229,7 +232,8 @@ object FlinkShell {
     val effectiveConfig = new Configuration(flinkConfig)
     val args = parseYarnArgList(config, "yarn-cluster")
 
-    val configurationDirectory = getConfigDir(config)
+    val configurationDirectory =
+      config.configDir.getOrElse(CliFrontend.getConfigurationDirectoryFromEnv)
 
     val frontend = new CliFrontend(
       effectiveConfig,
@@ -267,7 +271,8 @@ object FlinkShell {
     val effectiveConfig = new Configuration(flinkConfig)
     val args = parseYarnArgList(config, mode)
 
-    val configurationDirectory = getConfigDir(config)
+    val configurationDirectory =
+      config.configDir.getOrElse(CliFrontend.getConfigurationDirectoryFromEnv)
 
     val frontend = new CliFrontend(
       effectiveConfig,
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClientFactory.java 
b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClientFactory.java
index 776172f..b7b8645 100644
--- 
a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClientFactory.java
+++ 
b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClientFactory.java
@@ -23,10 +23,8 @@ import 
org.apache.flink.client.deployment.AbstractContainerizedClusterClientFact
 import org.apache.flink.client.deployment.ClusterClientFactory;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.DeploymentOptions;
-import org.apache.flink.configuration.DeploymentOptionsInternal;
 import org.apache.flink.yarn.configuration.YarnConfigOptions;
 import org.apache.flink.yarn.configuration.YarnDeploymentTarget;
-import org.apache.flink.yarn.configuration.YarnLogConfigUtil;
 
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.client.api.YarnClient;
@@ -53,11 +51,6 @@ public class YarnClusterClientFactory extends 
AbstractContainerizedClusterClient
        @Override
        public YarnClusterDescriptor createClusterDescriptor(Configuration 
configuration) {
                checkNotNull(configuration);
-
-               final String configurationDirectory =
-                               
configuration.get(DeploymentOptionsInternal.CONF_DIR);
-               YarnLogConfigUtil.setLogConfigFileInConfig(configuration, 
configurationDirectory);
-
                return getClusterDescriptor(configuration);
        }
 
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java 
b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
index 25b1009..2d2103c 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
@@ -21,6 +21,7 @@ package org.apache.flink.yarn;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.cache.DistributedCache;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.client.cli.CliFrontend;
 import org.apache.flink.client.deployment.ClusterDeploymentException;
 import org.apache.flink.client.deployment.ClusterDescriptor;
 import org.apache.flink.client.deployment.ClusterRetrieveException;
@@ -404,6 +405,9 @@ public class YarnClusterDescriptor implements 
ClusterDescriptor<ApplicationId> {
                final List<String> pipelineJars = 
flinkConfiguration.getOptional(PipelineOptions.JARS).orElse(Collections.emptyList());
                Preconditions.checkArgument(pipelineJars.size() == 1, "Should 
only have one jar");
 
+               final String configurationDirectory = 
CliFrontend.getConfigurationDirectoryFromEnv();
+               YarnLogConfigUtil.setLogConfigFileInConfig(flinkConfiguration, 
configurationDirectory);
+
                try {
                        return deployInternal(
                                        clusterSpecification,
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java 
b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
index 9507ad8..19f1373 100644
--- 
a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
+++ 
b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
@@ -30,7 +30,6 @@ import org.apache.flink.client.program.ClusterClientProvider;
 import org.apache.flink.configuration.ConfigUtils;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.DeploymentOptions;
-import org.apache.flink.configuration.DeploymentOptionsInternal;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.MemorySize;
@@ -45,6 +44,7 @@ import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.ShutdownHookUtil;
 import org.apache.flink.yarn.YarnClusterDescriptor;
 import org.apache.flink.yarn.configuration.YarnConfigOptions;
+import org.apache.flink.yarn.configuration.YarnLogConfigUtil;
 import org.apache.flink.yarn.executors.YarnJobClusterExecutor;
 import org.apache.flink.yarn.executors.YarnSessionClusterExecutor;
 
@@ -433,7 +433,7 @@ public class FlinkYarnSessionCli extends 
AbstractCustomCommandLine {
                        configuration.setString(YarnConfigOptions.NODE_LABEL, 
nodeLabelValue);
                }
 
-               configuration.set(DeploymentOptionsInternal.CONF_DIR, 
configurationDirectory);
+               YarnLogConfigUtil.setLogConfigFileInConfig(configuration, 
configurationDirectory);
        }
 
        private boolean isYarnPropertiesFileMode(CommandLine commandLine) {
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnLogConfigUtil.java
 
b/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnLogConfigUtil.java
index 0a93dce..981cd4d 100644
--- 
a/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnLogConfigUtil.java
+++ 
b/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnLogConfigUtil.java
@@ -47,12 +47,12 @@ public class YarnLogConfigUtil {
                        final Configuration configuration,
                        final String configurationDirectory) {
 
-               if 
(configuration.get(YarnConfigOptionsInternal.APPLICATION_LOG_CONFIG_FILE) != 
null) {
+               if 
(configuration.getString(YarnConfigOptionsInternal.APPLICATION_LOG_CONFIG_FILE) 
!= null) {
                        return configuration;
                }
 
                discoverLogConfigFile(configurationDirectory).ifPresent(file ->
-                               
configuration.set(YarnConfigOptionsInternal.APPLICATION_LOG_CONFIG_FILE, 
file.getPath()));
+                               
configuration.setString(YarnConfigOptionsInternal.APPLICATION_LOG_CONFIG_FILE, 
file.getPath()));
                return configuration;
        }
 

Reply via email to