This is an automated email from the ASF dual-hosted git repository. kkloudas pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 03c125c41f12c1f27fe61a320c5a04af200e4ec0 Author: Kostas Kloudas <[email protected]> AuthorDate: Wed Nov 13 16:23:03 2019 +0100 [FLINK-14745] Wire the configuration to the ClientUtils.executeProgram --- .../java/org/apache/flink/client/ClientUtils.java | 29 ++++++++++++++------ .../org/apache/flink/client/cli/CliFrontend.java | 12 +++----- .../flink/client/cli/CliFrontendRunTest.java | 7 +++-- .../apache/flink/client/program/ClientTest.java | 32 ++++++++++++++++++---- 4 files changed, 55 insertions(+), 25 deletions(-) diff --git a/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java b/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java index 043b740..02b73a9 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java +++ b/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java @@ -20,6 +20,7 @@ package org.apache.flink.client; import org.apache.flink.api.common.JobExecutionResult; import org.apache.flink.api.common.JobSubmissionResult; +import org.apache.flink.client.cli.ExecutionConfigAccessor; import org.apache.flink.client.program.ClusterClient; import org.apache.flink.client.program.ContextEnvironment; import org.apache.flink.client.program.ContextEnvironmentFactory; @@ -27,9 +28,11 @@ import org.apache.flink.client.program.DetachedJobExecutionResult; import org.apache.flink.client.program.PackagedProgram; import org.apache.flink.client.program.ProgramInvocationException; import org.apache.flink.client.program.ProgramMissingJobException; +import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.client.JobExecutionException; import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders; import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; import org.apache.flink.runtime.jobmaster.JobResult; import org.apache.flink.util.ExceptionUtils; @@ -133,13 +136,23 @@ public enum ClientUtils { } public static JobSubmissionResult executeProgram( + Configuration configuration, ClusterClient<?> client, - PackagedProgram program, - int parallelism, - boolean detached) throws ProgramMissingJobException, ProgramInvocationException { + PackagedProgram program) throws ProgramMissingJobException, ProgramInvocationException { + + final ExecutionConfigAccessor executionConfigAccessor = ExecutionConfigAccessor.fromConfiguration(configuration); + + final List<URL> jobJars = executionConfigAccessor.getJars(); + final List<URL> classpaths = executionConfigAccessor.getClasspaths(); + final SavepointRestoreSettings savepointSettings = executionConfigAccessor.getSavepointRestoreSettings(); + final int parallelism = executionConfigAccessor.getParallelism(); + final boolean detached = executionConfigAccessor.getDetachedMode(); + final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader(); + final ClassLoader userCodeClassLoader = ClientUtils.buildUserCodeClassLoader(jobJars, classpaths, contextClassLoader); + try { - Thread.currentThread().setContextClassLoader(program.getUserCodeClassLoader()); + Thread.currentThread().setContextClassLoader(userCodeClassLoader); LOG.info("Starting program (detached: {})", detached); @@ -147,12 +160,12 @@ public enum ClientUtils { ContextEnvironmentFactory factory = new ContextEnvironmentFactory( client, - program.getJobJarAndDependencies(), - program.getClasspaths(), - program.getUserCodeClassLoader(), + jobJars, + classpaths, + userCodeClassLoader, parallelism, detached, - program.getSavepointSettings(), + savepointSettings, jobExecutionResult); ContextEnvironment.setAsContext(factory); diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java index 9ff6d53..5d24aee 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java +++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java @@ -278,11 +278,8 @@ public class CliFrontend { try { int userParallelism = executionParameters.getParallelism(); LOG.debug("User parallelism is set to {}", userParallelism); - if (ExecutionConfig.PARALLELISM_DEFAULT == userParallelism) { - userParallelism = defaultParallelism; - } - executeProgram(program, client, userParallelism, executionParameters.getDetachedMode()); + executeProgram(executionConfig, program, client); } finally { if (clusterId == null && !executionParameters.getDetachedMode()) { // terminate the cluster only if we have started it before and if it's not detached @@ -742,13 +739,12 @@ public class CliFrontend { // -------------------------------------------------------------------------------------------- protected void executeProgram( + Configuration configuration, PackagedProgram program, - ClusterClient<?> client, - int parallelism, - boolean detached) throws ProgramMissingJobException, ProgramInvocationException { + ClusterClient<?> client) throws ProgramMissingJobException, ProgramInvocationException { logAndSysout("Starting execution of program"); - JobSubmissionResult result = ClientUtils.executeProgram(client, program, parallelism, detached); + JobSubmissionResult result = ClientUtils.executeProgram(configuration, client, program); if (result.isJobExecutionResult()) { logAndSysout("Program execution finished"); diff --git a/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendRunTest.java b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendRunTest.java index 449e1b2..a0d551b 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendRunTest.java +++ b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendRunTest.java @@ -198,9 +198,10 @@ public class CliFrontendRunTest extends CliFrontendTestBase { } @Override - protected void executeProgram(PackagedProgram program, ClusterClient client, int parallelism, boolean detached) { - assertEquals(isDetached, detached); - assertEquals(expectedParallelism, parallelism); + protected void executeProgram(Configuration configuration, PackagedProgram program, ClusterClient client) { + final ExecutionConfigAccessor executionConfigAccessor = ExecutionConfigAccessor.fromConfiguration(configuration); + assertEquals(isDetached, executionConfigAccessor.getDetachedMode()); + assertEquals(expectedParallelism, executionConfigAccessor.getParallelism()); } } } diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java index a1cc8a2..5845080 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java +++ b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java @@ -31,8 +31,12 @@ import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.client.ClientUtils; import org.apache.flink.client.FlinkPipelineTranslationUtil; import org.apache.flink.configuration.AkkaOptions; +import org.apache.flink.configuration.ConfigUtils; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.CoreOptions; +import org.apache.flink.configuration.DeploymentOptions; import org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.configuration.PipelineOptions; import org.apache.flink.optimizer.DataStatistics; import org.apache.flink.optimizer.Optimizer; import org.apache.flink.optimizer.costs.DefaultCostEstimator; @@ -52,6 +56,7 @@ import org.junit.experimental.categories.Category; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; +import java.net.URL; import java.util.Collections; import static org.junit.Assert.assertEquals; @@ -92,6 +97,15 @@ public class ClientTest extends TestLogger { config.setString(AkkaOptions.ASK_TIMEOUT, AkkaOptions.ASK_TIMEOUT.defaultValue()); } + private Configuration fromPackagedProgram(final PackagedProgram program, final int parallelism, final boolean detached) { + final Configuration configuration = new Configuration(); + configuration.set(CoreOptions.DEFAULT_PARALLELISM, parallelism); + configuration.set(DeploymentOptions.ATTACHED, !detached); + ConfigUtils.encodeCollectionToConfig(configuration, PipelineOptions.CLASSPATHS, program.getClasspaths(), URL::toString); + ConfigUtils.encodeCollectionToConfig(configuration, PipelineOptions.JARS, program.getJobJarAndDependencies(), URL::toString); + return configuration; + } + /** * Tests that invalid detached mode programs fail. */ @@ -100,7 +114,8 @@ public class ClientTest extends TestLogger { final ClusterClient<?> clusterClient = new MiniClusterClient(new Configuration(), MINI_CLUSTER_RESOURCE.getMiniCluster()); try { PackagedProgram prg = PackagedProgram.newBuilder().setEntryPointClassName(TestExecuteTwice.class.getName()).build(); - ClientUtils.executeProgram(clusterClient, prg, 1, true); + final Configuration configuration = fromPackagedProgram(prg, 1, true); + ClientUtils.executeProgram(configuration, clusterClient, prg); fail(FAIL_MESSAGE); } catch (ProgramInvocationException e) { assertEquals( @@ -110,7 +125,8 @@ public class ClientTest extends TestLogger { try { PackagedProgram prg = PackagedProgram.newBuilder().setEntryPointClassName(TestEager.class.getName()).build(); - ClientUtils.executeProgram(clusterClient, prg, 1, true); + final Configuration configuration = fromPackagedProgram(prg, 1, true); + ClientUtils.executeProgram(configuration, clusterClient, prg); fail(FAIL_MESSAGE); } catch (ProgramInvocationException e) { assertEquals( @@ -120,7 +136,8 @@ public class ClientTest extends TestLogger { try { PackagedProgram prg = PackagedProgram.newBuilder().setEntryPointClassName(TestGetRuntime.class.getName()).build(); - ClientUtils.executeProgram(clusterClient, prg, 1, true); + final Configuration configuration = fromPackagedProgram(prg, 1, true); + ClientUtils.executeProgram(configuration, clusterClient, prg); fail(FAIL_MESSAGE); } catch (ProgramInvocationException e) { assertEquals( @@ -130,7 +147,8 @@ public class ClientTest extends TestLogger { try { PackagedProgram prg = PackagedProgram.newBuilder().setEntryPointClassName(TestGetAccumulator.class.getName()).build(); - ClientUtils.executeProgram(clusterClient, prg, 1, true); + final Configuration configuration = fromPackagedProgram(prg, 1, true); + ClientUtils.executeProgram(configuration, clusterClient, prg); fail(FAIL_MESSAGE); } catch (ProgramInvocationException e) { assertEquals( @@ -140,7 +158,8 @@ public class ClientTest extends TestLogger { try { PackagedProgram prg = PackagedProgram.newBuilder().setEntryPointClassName(TestGetAllAccumulator.class.getName()).build(); - ClientUtils.executeProgram(clusterClient, prg, 1, true); + final Configuration configuration = fromPackagedProgram(prg, 1, true); + ClientUtils.executeProgram(configuration, clusterClient, prg); fail(FAIL_MESSAGE); } catch (ProgramInvocationException e) { assertEquals( @@ -184,7 +203,8 @@ public class ClientTest extends TestLogger { try { final ClusterClient<?> client = new MiniClusterClient(new Configuration(), MINI_CLUSTER_RESOURCE.getMiniCluster()); - ClientUtils.executeProgram(client, packagedProgramMock, 1, true); + final Configuration configuration = fromPackagedProgram(packagedProgramMock, 1, true); + ClientUtils.executeProgram(configuration, client, packagedProgramMock); fail("Creating the local execution environment should not be possible"); } catch (InvalidProgramException e) {
