This is an automated email from the ASF dual-hosted git repository.
kkloudas pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.11 by this push:
new 2a5f148 Revert "[FLINK-17935] Move set yarn.log-config-file to
YarnClusterClientFactory.createClusterDescriptor()"
2a5f148 is described below
commit 2a5f148c82590f9c1c4b0504b53a65eb42c93fe5
Author: Kostas Kloudas <[email protected]>
AuthorDate: Thu Jun 4 10:43:10 2020 +0200
Revert "[FLINK-17935] Move set yarn.log-config-file to
YarnClusterClientFactory.createClusterDescriptor()"
This reverts commit 32c65903
---
.../org/apache/flink/client/cli/CliFrontend.java | 2 +-
.../org/apache/flink/client/cli/ExecutorCLI.java | 7 +---
.../apache/flink/client/cli/ExecutorCLITest.java | 21 +++---------
.../configuration/DeploymentOptionsInternal.java | 37 ----------------------
.../flink/kubernetes/cli/KubernetesSessionCli.java | 13 +++-----
.../decorators/FlinkConfMountDecorator.java | 3 +-
.../parameters/AbstractKubernetesParameters.java | 13 ++------
.../parameters/KubernetesParameters.java | 2 --
.../kubernetes/cli/KubernetesSessionCliTest.java | 33 ++++---------------
.../org/apache/flink/api/scala/FlinkShell.scala | 11 +++++--
.../flink/yarn/YarnClusterClientFactory.java | 7 ----
.../apache/flink/yarn/YarnClusterDescriptor.java | 4 +++
.../apache/flink/yarn/cli/FlinkYarnSessionCli.java | 4 +--
.../yarn/configuration/YarnLogConfigUtil.java | 4 +--
14 files changed, 39 insertions(+), 122 deletions(-)
diff --git
a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
index a038511..1e2bae0 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
@@ -1045,7 +1045,7 @@ public class CliFrontend {
public static List<CustomCommandLine>
loadCustomCommandLines(Configuration configuration, String
configurationDirectory) {
List<CustomCommandLine> customCommandLines = new ArrayList<>();
- customCommandLines.add(new ExecutorCLI(configuration,
configurationDirectory));
+ customCommandLines.add(new ExecutorCLI(configuration));
// Command line interface of the YARN session, with a
special initialization here
// to prefix all options with y/yarn.
diff --git
a/flink-clients/src/main/java/org/apache/flink/client/cli/ExecutorCLI.java
b/flink-clients/src/main/java/org/apache/flink/client/cli/ExecutorCLI.java
index e88de9e..c1a3d0b 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/ExecutorCLI.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/ExecutorCLI.java
@@ -21,7 +21,6 @@ package org.apache.flink.client.cli;
import org.apache.flink.annotation.Internal;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
-import org.apache.flink.configuration.DeploymentOptionsInternal;
import org.apache.flink.configuration.UnmodifiableConfiguration;
import org.apache.flink.core.execution.DefaultExecutorServiceLoader;
import org.apache.flink.core.execution.PipelineExecutor;
@@ -73,11 +72,8 @@ public class ExecutorCLI implements CustomCommandLine {
private final Configuration baseConfiguration;
- private final String configurationDir;
-
- public ExecutorCLI(final Configuration configuration, final String
configDir) {
+ public ExecutorCLI(final Configuration configuration) {
this.baseConfiguration = new
UnmodifiableConfiguration(checkNotNull(configuration));
- this.configurationDir = checkNotNull(configDir);
}
@Override
@@ -119,7 +115,6 @@ public class ExecutorCLI implements CustomCommandLine {
}
encodeDynamicProperties(commandLine, effectiveConfiguration);
- effectiveConfiguration.set(DeploymentOptionsInternal.CONF_DIR,
configurationDir);
if (LOG.isDebugEnabled()) {
LOG.debug("Effective Configuration: {}",
effectiveConfiguration);
diff --git
a/flink-clients/src/test/java/org/apache/flink/client/cli/ExecutorCLITest.java
b/flink-clients/src/test/java/org/apache/flink/client/cli/ExecutorCLITest.java
index daa6cd3..6eb9a02 100644
---
a/flink-clients/src/test/java/org/apache/flink/client/cli/ExecutorCLITest.java
+++
b/flink-clients/src/test/java/org/apache/flink/client/cli/ExecutorCLITest.java
@@ -26,9 +26,7 @@ import org.apache.flink.configuration.DeploymentOptions;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Options;
import org.junit.Before;
-import org.junit.Rule;
import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
import java.util.Arrays;
import java.util.List;
@@ -43,18 +41,13 @@ import static org.junit.Assert.assertFalse;
*/
public class ExecutorCLITest {
- @Rule
- public TemporaryFolder tmp = new TemporaryFolder();
-
private Options testOptions;
@Before
public void initOptions() {
testOptions = new Options();
- final ExecutorCLI cliUnderTest = new ExecutorCLI(
- new Configuration(),
- tmp.getRoot().getAbsolutePath());
+ final ExecutorCLI cliUnderTest = new ExecutorCLI(new
Configuration());
cliUnderTest.addGeneralOptions(testOptions);
}
@@ -64,9 +57,7 @@ public class ExecutorCLITest {
final Configuration loadedConfig = new Configuration();
loadedConfig.set(DeploymentOptions.TARGET,
expectedExecutorName);
- final ExecutorCLI cliUnderTest = new ExecutorCLI(
- loadedConfig,
- tmp.getRoot().getAbsolutePath());
+ final ExecutorCLI cliUnderTest = new ExecutorCLI(loadedConfig);
final CommandLine emptyCommandLine =
CliFrontendParser.parse(testOptions, new String[0], true);
final Configuration configuration =
cliUnderTest.applyCommandLineOptionsToConfiguration(emptyCommandLine);
@@ -96,9 +87,7 @@ public class ExecutorCLITest {
"-D" + CoreOptions.DEFAULT_PARALLELISM.key() +
"=5"
};
- final ExecutorCLI cliUnderTest = new ExecutorCLI(
- loadedConfig,
- tmp.getRoot().getAbsolutePath());
+ final ExecutorCLI cliUnderTest = new ExecutorCLI(loadedConfig);
final CommandLine commandLine =
CliFrontendParser.parse(testOptions, args, true);
final Configuration configuration =
cliUnderTest.applyCommandLineOptionsToConfiguration(commandLine);
@@ -124,9 +113,7 @@ public class ExecutorCLITest {
final ConfigOption<Integer> configOption =
key("test.int").intType().noDefaultValue();
final int expectedValue = 42;
- final ExecutorCLI cliUnderTest = new ExecutorCLI(
- new Configuration(),
- tmp.getRoot().getAbsolutePath());
+ final ExecutorCLI cliUnderTest = new ExecutorCLI(new
Configuration());
final String[] args = {executorOption, expectedExecutorName,
"-D" + configOption.key() + "=" + expectedValue};
final CommandLine commandLine =
CliFrontendParser.parse(testOptions, args, true);
diff --git
a/flink-core/src/main/java/org/apache/flink/configuration/DeploymentOptionsInternal.java
b/flink-core/src/main/java/org/apache/flink/configuration/DeploymentOptionsInternal.java
deleted file mode 100644
index 22c6387..0000000
---
a/flink-core/src/main/java/org/apache/flink/configuration/DeploymentOptionsInternal.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.configuration;
-
-import org.apache.flink.annotation.Internal;
-
-import static org.apache.flink.configuration.ConfigOptions.key;
-
-/**
- * Internal options used during deployment.
- */
-@Internal
-public class DeploymentOptionsInternal {
-
- public static final ConfigOption<String> CONF_DIR =
- key("$internal.deployment.config-dir")
- .stringType()
- .noDefaultValue()
- .withDescription("**DO NOT USE** The
path to the configuration directory.");
-
-}
diff --git
a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/cli/KubernetesSessionCli.java
b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/cli/KubernetesSessionCli.java
index 36fc950..db1bc8c 100644
---
a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/cli/KubernetesSessionCli.java
+++
b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/cli/KubernetesSessionCli.java
@@ -22,7 +22,6 @@ import org.apache.flink.annotation.Internal;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.client.cli.AbstractCustomCommandLine;
import org.apache.flink.client.cli.CliArgsException;
-import org.apache.flink.client.cli.CliFrontend;
import org.apache.flink.client.cli.ExecutorCLI;
import org.apache.flink.client.deployment.ClusterClientFactory;
import org.apache.flink.client.deployment.ClusterClientServiceLoader;
@@ -69,14 +68,14 @@ public class KubernetesSessionCli {
private final ExecutorCLI cli;
private final ClusterClientServiceLoader clusterClientServiceLoader;
- public KubernetesSessionCli(Configuration configuration, String
configDir) {
- this(configuration, new DefaultClusterClientServiceLoader(),
configDir);
+ public KubernetesSessionCli(Configuration configuration) {
+ this(configuration, new DefaultClusterClientServiceLoader());
}
- public KubernetesSessionCli(Configuration configuration,
ClusterClientServiceLoader clusterClientServiceLoader, String configDir) {
+ public KubernetesSessionCli(Configuration configuration,
ClusterClientServiceLoader clusterClientServiceLoader) {
this.baseConfiguration = new
UnmodifiableConfiguration(checkNotNull(configuration));
this.clusterClientServiceLoader =
checkNotNull(clusterClientServiceLoader);
- this.cli = new ExecutorCLI(baseConfiguration, configDir);
+ this.cli = new ExecutorCLI(baseConfiguration);
}
public Configuration getEffectiveConfiguration(String[] args) throws
CliArgsException {
@@ -179,12 +178,10 @@ public class KubernetesSessionCli {
public static void main(String[] args) {
final Configuration configuration =
GlobalConfiguration.loadConfiguration();
- final String configDir =
CliFrontend.getConfigurationDirectoryFromEnv();
-
int retCode;
try {
- final KubernetesSessionCli cli = new
KubernetesSessionCli(configuration, configDir);
+ final KubernetesSessionCli cli = new
KubernetesSessionCli(configuration);
retCode =
SecurityUtils.getInstalledContext().runSecured(() -> cli.run(args));
} catch (CliArgsException e) {
retCode =
AbstractCustomCommandLine.handleCliArgsException(e, LOG);
diff --git
a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/FlinkConfMountDecorator.java
b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/FlinkConfMountDecorator.java
index 79eb7aa..07cf85e 100644
---
a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/FlinkConfMountDecorator.java
+++
b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/FlinkConfMountDecorator.java
@@ -19,6 +19,7 @@
package org.apache.flink.kubernetes.kubeclient.decorators;
import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.client.cli.CliFrontend;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.kubernetes.kubeclient.FlinkPod;
@@ -166,7 +167,7 @@ public class FlinkConfMountDecorator extends
AbstractKubernetesStepDecorator {
}
private List<File> getLocalLogConfFiles() {
- final String confDir =
kubernetesComponentConf.getConfigDirectory();
+ final String confDir =
CliFrontend.getConfigurationDirectoryFromEnv();
final File logbackFile = new File(confDir,
CONFIG_FILE_LOGBACK_NAME);
final File log4jFile = new File(confDir,
CONFIG_FILE_LOG4J_NAME);
diff --git
a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/AbstractKubernetesParameters.java
b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/AbstractKubernetesParameters.java
index c655b63..b3bfd6b 100644
---
a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/AbstractKubernetesParameters.java
+++
b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/AbstractKubernetesParameters.java
@@ -18,8 +18,8 @@
package org.apache.flink.kubernetes.kubeclient.parameters;
+import org.apache.flink.client.cli.CliFrontend;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.DeploymentOptionsInternal;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.kubernetes.utils.Constants;
@@ -55,13 +55,6 @@ public abstract class AbstractKubernetesParameters
implements KubernetesParamete
}
@Override
- public String getConfigDirectory() {
- final String configDir =
flinkConfig.get(DeploymentOptionsInternal.CONF_DIR);
- checkNotNull(configDir);
- return configDir;
- }
-
- @Override
public String getClusterId() {
final String clusterId =
flinkConfig.getString(KubernetesConfigOptions.CLUSTER_ID);
@@ -137,14 +130,14 @@ public abstract class AbstractKubernetesParameters
implements KubernetesParamete
@Override
public boolean hasLogback() {
- final String confDir = getConfigDirectory();
+ final String confDir =
CliFrontend.getConfigurationDirectoryFromEnv();
final File logbackFile = new File(confDir,
CONFIG_FILE_LOGBACK_NAME);
return logbackFile.exists();
}
@Override
public boolean hasLog4j() {
- final String confDir = getConfigDirectory();
+ final String confDir =
CliFrontend.getConfigurationDirectoryFromEnv();
final File log4jFile = new File(confDir,
CONFIG_FILE_LOG4J_NAME);
return log4jFile.exists();
}
diff --git
a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesParameters.java
b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesParameters.java
index 2e5ee0a..44443c4 100644
---
a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesParameters.java
+++
b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesParameters.java
@@ -32,8 +32,6 @@ import java.util.Optional;
*/
public interface KubernetesParameters {
- String getConfigDirectory();
-
String getClusterId();
String getNamespace();
diff --git
a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/cli/KubernetesSessionCliTest.java
b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/cli/KubernetesSessionCliTest.java
index 79953d7..df7fe6a 100644
---
a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/cli/KubernetesSessionCliTest.java
+++
b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/cli/KubernetesSessionCliTest.java
@@ -25,7 +25,6 @@ import
org.apache.flink.client.deployment.ClusterSpecification;
import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
-import org.apache.flink.configuration.DeploymentOptionsInternal;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.TaskManagerOptions;
@@ -33,9 +32,7 @@ import
org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.kubernetes.executors.KubernetesSessionClusterExecutor;
import org.junit.Assert;
-import org.junit.Rule;
import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
import java.util.Map;
@@ -49,14 +46,9 @@ import static org.junit.Assert.assertTrue;
*/
public class KubernetesSessionCliTest {
- @Rule
- public TemporaryFolder tmp = new TemporaryFolder();
-
@Test
public void testKubernetesSessionCliSetsDeploymentTargetCorrectly()
throws CliArgsException {
- final KubernetesSessionCli cli = new KubernetesSessionCli(
- new Configuration(),
- tmp.getRoot().getAbsolutePath());
+ final KubernetesSessionCli cli = new KubernetesSessionCli(new
Configuration());
final String[] args = {};
final Configuration configuration =
cli.getEffectiveConfiguration(args);
@@ -67,9 +59,7 @@ public class KubernetesSessionCliTest {
@Test
public void testDynamicProperties() throws Exception {
- final KubernetesSessionCli cli = new KubernetesSessionCli(
- new Configuration(),
- tmp.getRoot().getAbsolutePath());
+ final KubernetesSessionCli cli = new KubernetesSessionCli(new
Configuration());
final String[] args = new String[] {
"-e", KubernetesSessionClusterExecutor.NAME,
"-Dakka.ask.timeout=5 min",
@@ -82,10 +72,9 @@ public class KubernetesSessionCliTest {
Assert.assertNotNull(clientFactory);
final Map<String, String> executorConfigMap =
executorConfig.toMap();
- assertEquals(4, executorConfigMap.size());
+ assertEquals(3, executorConfigMap.size());
assertEquals("5 min",
executorConfigMap.get("akka.ask.timeout"));
assertEquals("-DappName=foobar",
executorConfigMap.get("env.java.opts"));
- assertEquals(tmp.getRoot().getAbsolutePath(),
executorConfig.get(DeploymentOptionsInternal.CONF_DIR));
assertTrue(executorConfigMap.containsKey(DeploymentOptions.TARGET.key()));
}
@@ -142,9 +131,7 @@ public class KubernetesSessionCliTest {
"-D" + TaskManagerOptions.NUM_TASK_SLOTS.key()
+ "=" + slotsPerTaskManager
};
- final KubernetesSessionCli cli = new KubernetesSessionCli(
- configuration,
- tmp.getRoot().getAbsolutePath());
+ final KubernetesSessionCli cli = new
KubernetesSessionCli(configuration);
Configuration executorConfig =
cli.getEffectiveConfiguration(args);
ClusterClientFactory<String> clientFactory =
getClusterClientFactory(executorConfig);
@@ -170,9 +157,7 @@ public class KubernetesSessionCliTest {
configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS,
slotsPerTaskManager);
final String[] args = {"-e",
KubernetesSessionClusterExecutor.NAME};
- final KubernetesSessionCli cli = new KubernetesSessionCli(
- configuration,
- tmp.getRoot().getAbsolutePath());
+ final KubernetesSessionCli cli = new
KubernetesSessionCli(configuration);
Configuration executorConfig =
cli.getEffectiveConfiguration(args);
ClusterClientFactory<String> clientFactory =
getClusterClientFactory(executorConfig);
@@ -235,9 +220,7 @@ public class KubernetesSessionCliTest {
configuration.setInteger(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY_MB, 2048);
configuration.setInteger(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY_MB, 4096);
- final KubernetesSessionCli cli = new KubernetesSessionCli(
- configuration,
- tmp.getRoot().getAbsolutePath());
+ final KubernetesSessionCli cli = new
KubernetesSessionCli(configuration);
final Configuration executorConfig =
cli.getEffectiveConfiguration(new String[]{});
final ClusterClientFactory<String> clientFactory =
getClusterClientFactory(executorConfig);
@@ -275,8 +258,6 @@ public class KubernetesSessionCliTest {
Configuration configuration = new Configuration();
configuration.set(JobManagerOptions.TOTAL_PROCESS_MEMORY,
MemorySize.ofMebiBytes(totalMemory));
configuration.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY,
MemorySize.ofMebiBytes(totalMemory));
- return new KubernetesSessionCli(
- configuration,
- tmp.getRoot().getAbsolutePath());
+ return new KubernetesSessionCli(configuration);
}
}
diff --git
a/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
b/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
index cfd91a5..d3f65c2 100644
---
a/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
+++
b/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
@@ -140,7 +140,10 @@ object FlinkShell {
}
private def getConfigDir(config: Config) = {
- config.configDir.getOrElse(CliFrontend.getConfigurationDirectoryFromEnv)
+ config.configDir match {
+ case Some(confDir) => confDir
+ case None => CliFrontend.getConfigurationDirectoryFromEnv
+ }
}
private def getGlobalConfig(config: Config) = {
@@ -229,7 +232,8 @@ object FlinkShell {
val effectiveConfig = new Configuration(flinkConfig)
val args = parseYarnArgList(config, "yarn-cluster")
- val configurationDirectory = getConfigDir(config)
+ val configurationDirectory =
+ config.configDir.getOrElse(CliFrontend.getConfigurationDirectoryFromEnv)
val frontend = new CliFrontend(
effectiveConfig,
@@ -267,7 +271,8 @@ object FlinkShell {
val effectiveConfig = new Configuration(flinkConfig)
val args = parseYarnArgList(config, mode)
- val configurationDirectory = getConfigDir(config)
+ val configurationDirectory =
+ config.configDir.getOrElse(CliFrontend.getConfigurationDirectoryFromEnv)
val frontend = new CliFrontend(
effectiveConfig,
diff --git
a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClientFactory.java
b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClientFactory.java
index 776172f..b7b8645 100644
---
a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClientFactory.java
+++
b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClientFactory.java
@@ -23,10 +23,8 @@ import
org.apache.flink.client.deployment.AbstractContainerizedClusterClientFact
import org.apache.flink.client.deployment.ClusterClientFactory;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
-import org.apache.flink.configuration.DeploymentOptionsInternal;
import org.apache.flink.yarn.configuration.YarnConfigOptions;
import org.apache.flink.yarn.configuration.YarnDeploymentTarget;
-import org.apache.flink.yarn.configuration.YarnLogConfigUtil;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.client.api.YarnClient;
@@ -53,11 +51,6 @@ public class YarnClusterClientFactory extends
AbstractContainerizedClusterClient
@Override
public YarnClusterDescriptor createClusterDescriptor(Configuration
configuration) {
checkNotNull(configuration);
-
- final String configurationDirectory =
-
configuration.get(DeploymentOptionsInternal.CONF_DIR);
- YarnLogConfigUtil.setLogConfigFileInConfig(configuration,
configurationDirectory);
-
return getClusterDescriptor(configuration);
}
diff --git
a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
index 25b1009..2d2103c 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
@@ -21,6 +21,7 @@ package org.apache.flink.yarn;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.cache.DistributedCache;
import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.client.cli.CliFrontend;
import org.apache.flink.client.deployment.ClusterDeploymentException;
import org.apache.flink.client.deployment.ClusterDescriptor;
import org.apache.flink.client.deployment.ClusterRetrieveException;
@@ -404,6 +405,9 @@ public class YarnClusterDescriptor implements
ClusterDescriptor<ApplicationId> {
final List<String> pipelineJars =
flinkConfiguration.getOptional(PipelineOptions.JARS).orElse(Collections.emptyList());
Preconditions.checkArgument(pipelineJars.size() == 1, "Should
only have one jar");
+ final String configurationDirectory =
CliFrontend.getConfigurationDirectoryFromEnv();
+ YarnLogConfigUtil.setLogConfigFileInConfig(flinkConfiguration,
configurationDirectory);
+
try {
return deployInternal(
clusterSpecification,
diff --git
a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
index 9507ad8..19f1373 100644
---
a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
+++
b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
@@ -30,7 +30,6 @@ import org.apache.flink.client.program.ClusterClientProvider;
import org.apache.flink.configuration.ConfigUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
-import org.apache.flink.configuration.DeploymentOptionsInternal;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.MemorySize;
@@ -45,6 +44,7 @@ import org.apache.flink.util.FlinkException;
import org.apache.flink.util.ShutdownHookUtil;
import org.apache.flink.yarn.YarnClusterDescriptor;
import org.apache.flink.yarn.configuration.YarnConfigOptions;
+import org.apache.flink.yarn.configuration.YarnLogConfigUtil;
import org.apache.flink.yarn.executors.YarnJobClusterExecutor;
import org.apache.flink.yarn.executors.YarnSessionClusterExecutor;
@@ -433,7 +433,7 @@ public class FlinkYarnSessionCli extends
AbstractCustomCommandLine {
configuration.setString(YarnConfigOptions.NODE_LABEL,
nodeLabelValue);
}
- configuration.set(DeploymentOptionsInternal.CONF_DIR,
configurationDirectory);
+ YarnLogConfigUtil.setLogConfigFileInConfig(configuration,
configurationDirectory);
}
private boolean isYarnPropertiesFileMode(CommandLine commandLine) {
diff --git
a/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnLogConfigUtil.java
b/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnLogConfigUtil.java
index 0a93dce..981cd4d 100644
---
a/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnLogConfigUtil.java
+++
b/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnLogConfigUtil.java
@@ -47,12 +47,12 @@ public class YarnLogConfigUtil {
final Configuration configuration,
final String configurationDirectory) {
- if
(configuration.get(YarnConfigOptionsInternal.APPLICATION_LOG_CONFIG_FILE) !=
null) {
+ if
(configuration.getString(YarnConfigOptionsInternal.APPLICATION_LOG_CONFIG_FILE)
!= null) {
return configuration;
}
discoverLogConfigFile(configurationDirectory).ifPresent(file ->
-
configuration.set(YarnConfigOptionsInternal.APPLICATION_LOG_CONFIG_FILE,
file.getPath()));
+
configuration.setString(YarnConfigOptionsInternal.APPLICATION_LOG_CONFIG_FILE,
file.getPath()));
return configuration;
}