This is an automated email from the ASF dual-hosted git repository. kkloudas pushed a commit to branch executors-clean in repository https://gitbox.apache.org/repos/asf/flink.git
commit e156e3942e033403ad4b64d97c7289a98e55a081 Author: Kostas Kloudas <kklou...@gmail.com> AuthorDate: Tue Nov 19 06:25:28 2019 +0100 [FLINK-XXXXX] Fix ClusterClientFactoryDiscovery --- .../main/java/org/apache/flink/client/cli/DefaultCLI.java | 5 +++-- .../flink/client/deployment/StandaloneClientFactory.java | 5 ++--- .../client/deployment/ClusterClientServiceLoaderTest.java | 3 ++- .../org/apache/flink/yarn/YarnClusterClientFactory.java | 8 +++++--- .../org/apache/flink/yarn/cli/FlinkYarnSessionCli.java | 8 +++++--- .../apache/flink/yarn/YarnClusterClientFactoryTest.java | 15 +++++++++++++-- 6 files changed, 30 insertions(+), 14 deletions(-) diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java b/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java index 397d5dd..1245688 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java +++ b/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java @@ -18,7 +18,6 @@ package org.apache.flink.client.cli; -import org.apache.flink.client.deployment.StandaloneClientFactory; import org.apache.flink.configuration.Configuration; import org.apache.commons.cli.CommandLine; @@ -29,6 +28,8 @@ import org.apache.commons.cli.Options; */ public class DefaultCLI extends AbstractCustomCommandLine { + public static final String ID = "default"; + public DefaultCLI(Configuration configuration) { super(configuration); } @@ -41,7 +42,7 @@ public class DefaultCLI extends AbstractCustomCommandLine { @Override public String getId() { - return StandaloneClientFactory.ID; + return ID; } @Override diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClientFactory.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClientFactory.java index 647f14f..e5ec6f4 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClientFactory.java +++ 
b/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClientFactory.java @@ -19,6 +19,7 @@ package org.apache.flink.client.deployment; import org.apache.flink.annotation.Internal; +import org.apache.flink.client.deployment.executors.StandaloneSessionClusterExecutor; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.DeploymentOptions; @@ -32,12 +33,10 @@ import static org.apache.flink.util.Preconditions.checkNotNull; @Internal public class StandaloneClientFactory implements ClusterClientFactory<StandaloneClusterId> { - public static final String ID = "default"; - @Override public boolean isCompatibleWith(Configuration configuration) { checkNotNull(configuration); - return ID.equalsIgnoreCase(configuration.getString(DeploymentOptions.TARGET)); + return StandaloneSessionClusterExecutor.NAME.equalsIgnoreCase(configuration.getString(DeploymentOptions.TARGET)); } @Override diff --git a/flink-clients/src/test/java/org/apache/flink/client/deployment/ClusterClientServiceLoaderTest.java b/flink-clients/src/test/java/org/apache/flink/client/deployment/ClusterClientServiceLoaderTest.java index a8e34ab..b7a9953 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/deployment/ClusterClientServiceLoaderTest.java +++ b/flink-clients/src/test/java/org/apache/flink/client/deployment/ClusterClientServiceLoaderTest.java @@ -18,6 +18,7 @@ package org.apache.flink.client.deployment; +import org.apache.flink.client.deployment.executors.StandaloneSessionClusterExecutor; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.DeploymentOptions; @@ -57,7 +58,7 @@ public class ClusterClientServiceLoaderTest { @Test public void testStandaloneClusterClientFactoryDiscovery() { final Configuration config = new Configuration(); - config.setString(DeploymentOptions.TARGET, StandaloneClientFactory.ID.toUpperCase()); + config.setString(DeploymentOptions.TARGET, 
StandaloneSessionClusterExecutor.NAME); ClusterClientFactory<StandaloneClusterId> factory = serviceLoaderUnderTest.getClusterClientFactory(config); assertTrue(factory instanceof StandaloneClientFactory); diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClientFactory.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClientFactory.java index fd23699..1b1d9de 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClientFactory.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClientFactory.java @@ -26,6 +26,8 @@ import org.apache.flink.configuration.ConfigurationUtils; import org.apache.flink.configuration.DeploymentOptions; import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.yarn.configuration.YarnConfigOptions; +import org.apache.flink.yarn.executors.YarnJobClusterExecutor; +import org.apache.flink.yarn.executors.YarnSessionClusterExecutor; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.client.api.YarnClient; @@ -42,12 +44,12 @@ import static org.apache.flink.util.Preconditions.checkNotNull; @Internal public class YarnClusterClientFactory implements ClusterClientFactory<ApplicationId> { - public static final String ID = "yarn-cluster"; - @Override public boolean isCompatibleWith(Configuration configuration) { checkNotNull(configuration); - return ID.equalsIgnoreCase(configuration.getString(DeploymentOptions.TARGET)); + final String deploymentTarget = configuration.getString(DeploymentOptions.TARGET); + return YarnJobClusterExecutor.NAME.equalsIgnoreCase(deploymentTarget) || + YarnSessionClusterExecutor.NAME.equalsIgnoreCase(deploymentTarget); } @Override diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java index 388dea0..7ef70dc 100644 --- 
a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java @@ -43,7 +43,6 @@ import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.ExecutorUtils; import org.apache.flink.util.FlinkException; import org.apache.flink.util.ShutdownHookUtil; -import org.apache.flink.yarn.YarnClusterClientFactory; import org.apache.flink.yarn.YarnClusterDescriptor; import org.apache.flink.yarn.configuration.YarnConfigOptions; import org.apache.flink.yarn.configuration.YarnConfigOptionsInternal; @@ -105,6 +104,9 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine { private static final long CLIENT_POLLING_INTERVAL_MS = 3000L; + /** The id for the CommandLine interface. */ + private static final String ID = "yarn-cluster"; + // YARN-session related constants private static final String YARN_PROPERTIES_FILE = ".yarn-properties-"; private static final String YARN_APPLICATION_ID_KEY = "applicationID"; @@ -319,14 +321,14 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine { @Override public boolean isActive(CommandLine commandLine) { String jobManagerOption = commandLine.getOptionValue(addressOption.getOpt(), null); - boolean yarnJobManager = YarnClusterClientFactory.ID.equals(jobManagerOption); + boolean yarnJobManager = ID.equals(jobManagerOption); boolean yarnAppId = commandLine.hasOption(applicationId.getOpt()); return yarnJobManager || yarnAppId || (isYarnPropertiesFileMode(commandLine) && yarnApplicationIdFromYarnProperties != null); } @Override public String getId() { - return YarnClusterClientFactory.ID; + return ID; } @Override diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterClientFactoryTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterClientFactoryTest.java index 6bdd920..508c11e 100644 --- a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterClientFactoryTest.java +++ 
b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterClientFactoryTest.java @@ -23,6 +23,8 @@ import org.apache.flink.client.deployment.ClusterClientServiceLoader; import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.DeploymentOptions; +import org.apache.flink.yarn.executors.YarnJobClusterExecutor; +import org.apache.flink.yarn.executors.YarnSessionClusterExecutor; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.junit.Test; @@ -35,9 +37,18 @@ import static org.junit.Assert.assertTrue; public class YarnClusterClientFactoryTest { @Test - public void testYarnClusterClientFactoryDiscovery() { + public void testYarnClusterClientFactoryDiscoveryWithPerJobExecutor() { + testYarnClusterClientFactoryDiscoveryHelper(YarnJobClusterExecutor.NAME); + } + + @Test + public void testYarnClusterClientFactoryDiscoveryWithSessionExecutor() { + testYarnClusterClientFactoryDiscoveryHelper(YarnSessionClusterExecutor.NAME); + } + + private void testYarnClusterClientFactoryDiscoveryHelper(final String targetName) { final Configuration configuration = new Configuration(); - configuration.setString(DeploymentOptions.TARGET, YarnClusterClientFactory.ID.toUpperCase()); + configuration.setString(DeploymentOptions.TARGET, targetName); final ClusterClientServiceLoader serviceLoader = new DefaultClusterClientServiceLoader(); final ClusterClientFactory<ApplicationId> factory = serviceLoader.getClusterClientFactory(configuration);