This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 03c125c41f12c1f27fe61a320c5a04af200e4ec0
Author: Kostas Kloudas <[email protected]>
AuthorDate: Wed Nov 13 16:23:03 2019 +0100

    [FLINK-14745] Wire the configuration to the ClientUtils.executeProgram
---
 .../java/org/apache/flink/client/ClientUtils.java  | 29 ++++++++++++++------
 .../org/apache/flink/client/cli/CliFrontend.java   | 12 +++-----
 .../flink/client/cli/CliFrontendRunTest.java       |  7 +++--
 .../apache/flink/client/program/ClientTest.java    | 32 ++++++++++++++++++----
 4 files changed, 55 insertions(+), 25 deletions(-)

diff --git a/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java b/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
index 043b740..02b73a9 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
@@ -20,6 +20,7 @@ package org.apache.flink.client;
 
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.JobSubmissionResult;
+import org.apache.flink.client.cli.ExecutionConfigAccessor;
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.client.program.ContextEnvironment;
 import org.apache.flink.client.program.ContextEnvironmentFactory;
@@ -27,9 +28,11 @@ import org.apache.flink.client.program.DetachedJobExecutionResult;
 import org.apache.flink.client.program.PackagedProgram;
 import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.client.program.ProgramMissingJobException;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
 import org.apache.flink.runtime.jobmaster.JobResult;
 import org.apache.flink.util.ExceptionUtils;
 
@@ -133,13 +136,23 @@ public enum ClientUtils {
        }
 
        public static JobSubmissionResult executeProgram(
+                       Configuration configuration,
                        ClusterClient<?> client,
-                       PackagedProgram program,
-                       int parallelism,
-                       boolean detached) throws ProgramMissingJobException, ProgramInvocationException {
+                       PackagedProgram program) throws ProgramMissingJobException, ProgramInvocationException {
+
+               final ExecutionConfigAccessor executionConfigAccessor = ExecutionConfigAccessor.fromConfiguration(configuration);
+
+               final List<URL> jobJars = executionConfigAccessor.getJars();
+               final List<URL> classpaths = executionConfigAccessor.getClasspaths();
+               final SavepointRestoreSettings savepointSettings = executionConfigAccessor.getSavepointRestoreSettings();
+               final int parallelism = executionConfigAccessor.getParallelism();
+               final boolean detached = executionConfigAccessor.getDetachedMode();
+
                final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
+               final ClassLoader userCodeClassLoader = ClientUtils.buildUserCodeClassLoader(jobJars, classpaths, contextClassLoader);
+
                try {
-                       Thread.currentThread().setContextClassLoader(program.getUserCodeClassLoader());
+                       Thread.currentThread().setContextClassLoader(userCodeClassLoader);
 
                        LOG.info("Starting program (detached: {})", detached);
 
@@ -147,12 +160,12 @@ public enum ClientUtils {
 
                        ContextEnvironmentFactory factory = new ContextEnvironmentFactory(
                                client,
-                               program.getJobJarAndDependencies(),
-                               program.getClasspaths(),
-                               program.getUserCodeClassLoader(),
+                               jobJars,
+                               classpaths,
+                               userCodeClassLoader,
                                parallelism,
                                detached,
-                               program.getSavepointSettings(),
+                               savepointSettings,
                                jobExecutionResult);
                        ContextEnvironment.setAsContext(factory);
 
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
index 9ff6d53..5d24aee 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
@@ -278,11 +278,8 @@ public class CliFrontend {
                                try {
                                        int userParallelism = executionParameters.getParallelism();
                                        LOG.debug("User parallelism is set to {}", userParallelism);
-                                       if (ExecutionConfig.PARALLELISM_DEFAULT == userParallelism) {
-                                               userParallelism = defaultParallelism;
-                                       }
 
-                                       executeProgram(program, client, userParallelism, executionParameters.getDetachedMode());
+                                       executeProgram(executionConfig, program, client);
                                } finally {
                                        if (clusterId == null && !executionParameters.getDetachedMode()) {
                                                // terminate the cluster only if we have started it before and if it's not detached
@@ -742,13 +739,12 @@ public class CliFrontend {
        // --------------------------------------------------------------------------------------------
 
        protected void executeProgram(
+                       Configuration configuration,
                        PackagedProgram program,
-                       ClusterClient<?> client,
-                       int parallelism,
-                       boolean detached) throws ProgramMissingJobException, ProgramInvocationException {
+                       ClusterClient<?> client) throws ProgramMissingJobException, ProgramInvocationException {
                logAndSysout("Starting execution of program");
 
-               JobSubmissionResult result = ClientUtils.executeProgram(client, program, parallelism, detached);
+               JobSubmissionResult result = ClientUtils.executeProgram(configuration, client, program);
 
                if (result.isJobExecutionResult()) {
                        logAndSysout("Program execution finished");
diff --git a/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendRunTest.java b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendRunTest.java
index 449e1b2..a0d551b 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendRunTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendRunTest.java
@@ -198,9 +198,10 @@ public class CliFrontendRunTest extends CliFrontendTestBase {
                }
 
                @Override
-               protected void executeProgram(PackagedProgram program, ClusterClient client, int parallelism, boolean detached) {
-                       assertEquals(isDetached, detached);
-                       assertEquals(expectedParallelism, parallelism);
+               protected void executeProgram(Configuration configuration, PackagedProgram program, ClusterClient client) {
+                       final ExecutionConfigAccessor executionConfigAccessor = ExecutionConfigAccessor.fromConfiguration(configuration);
+                       assertEquals(isDetached, executionConfigAccessor.getDetachedMode());
+                       assertEquals(expectedParallelism, executionConfigAccessor.getParallelism());
                }
        }
 }
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
index a1cc8a2..5845080 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
@@ -31,8 +31,12 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.client.ClientUtils;
 import org.apache.flink.client.FlinkPipelineTranslationUtil;
 import org.apache.flink.configuration.AkkaOptions;
+import org.apache.flink.configuration.ConfigUtils;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.configuration.DeploymentOptions;
 import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.PipelineOptions;
 import org.apache.flink.optimizer.DataStatistics;
 import org.apache.flink.optimizer.Optimizer;
 import org.apache.flink.optimizer.costs.DefaultCostEstimator;
@@ -52,6 +56,7 @@ import org.junit.experimental.categories.Category;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
+import java.net.URL;
 import java.util.Collections;
 
 import static org.junit.Assert.assertEquals;
@@ -92,6 +97,15 @@ public class ClientTest extends TestLogger {
                config.setString(AkkaOptions.ASK_TIMEOUT, AkkaOptions.ASK_TIMEOUT.defaultValue());
        }
 
+       private Configuration fromPackagedProgram(final PackagedProgram program, final int parallelism, final boolean detached) {
+               final Configuration configuration = new Configuration();
+               configuration.set(CoreOptions.DEFAULT_PARALLELISM, parallelism);
+               configuration.set(DeploymentOptions.ATTACHED, !detached);
+               ConfigUtils.encodeCollectionToConfig(configuration, PipelineOptions.CLASSPATHS, program.getClasspaths(), URL::toString);
+               ConfigUtils.encodeCollectionToConfig(configuration, PipelineOptions.JARS, program.getJobJarAndDependencies(), URL::toString);
+               return configuration;
+       }
+
        /**
         * Tests that invalid detached mode programs fail.
         */
@@ -100,7 +114,8 @@ public class ClientTest extends TestLogger {
                final ClusterClient<?> clusterClient = new MiniClusterClient(new Configuration(), MINI_CLUSTER_RESOURCE.getMiniCluster());
                try {
                        PackagedProgram prg = PackagedProgram.newBuilder().setEntryPointClassName(TestExecuteTwice.class.getName()).build();
-                       ClientUtils.executeProgram(clusterClient, prg, 1, true);
+                       final Configuration configuration = fromPackagedProgram(prg, 1, true);
+                       ClientUtils.executeProgram(configuration, clusterClient, prg);
                        fail(FAIL_MESSAGE);
                } catch (ProgramInvocationException e) {
                        assertEquals(
@@ -110,7 +125,8 @@ public class ClientTest extends TestLogger {
 
                try {
                        PackagedProgram prg = PackagedProgram.newBuilder().setEntryPointClassName(TestEager.class.getName()).build();
-                       ClientUtils.executeProgram(clusterClient, prg, 1, true);
+                       final Configuration configuration = fromPackagedProgram(prg, 1, true);
+                       ClientUtils.executeProgram(configuration, clusterClient, prg);
                        fail(FAIL_MESSAGE);
                } catch (ProgramInvocationException e) {
                        assertEquals(
@@ -120,7 +136,8 @@ public class ClientTest extends TestLogger {
 
                try {
                        PackagedProgram prg = PackagedProgram.newBuilder().setEntryPointClassName(TestGetRuntime.class.getName()).build();
-                       ClientUtils.executeProgram(clusterClient, prg, 1, true);
+                       final Configuration configuration = fromPackagedProgram(prg, 1, true);
+                       ClientUtils.executeProgram(configuration, clusterClient, prg);
                        fail(FAIL_MESSAGE);
                } catch (ProgramInvocationException e) {
                        assertEquals(
@@ -130,7 +147,8 @@ public class ClientTest extends TestLogger {
 
                try {
                        PackagedProgram prg = PackagedProgram.newBuilder().setEntryPointClassName(TestGetAccumulator.class.getName()).build();
-                       ClientUtils.executeProgram(clusterClient, prg, 1, true);
+                       final Configuration configuration = fromPackagedProgram(prg, 1, true);
+                       ClientUtils.executeProgram(configuration, clusterClient, prg);
                        fail(FAIL_MESSAGE);
                } catch (ProgramInvocationException e) {
                        assertEquals(
@@ -140,7 +158,8 @@ public class ClientTest extends TestLogger {
 
                try {
                        PackagedProgram prg = PackagedProgram.newBuilder().setEntryPointClassName(TestGetAllAccumulator.class.getName()).build();
-                       ClientUtils.executeProgram(clusterClient, prg, 1, true);
+                       final Configuration configuration = fromPackagedProgram(prg, 1, true);
+                       ClientUtils.executeProgram(configuration, clusterClient, prg);
                        fail(FAIL_MESSAGE);
                } catch (ProgramInvocationException e) {
                        assertEquals(
@@ -184,7 +203,8 @@ public class ClientTest extends TestLogger {
 
                try {
                        final ClusterClient<?> client = new MiniClusterClient(new Configuration(), MINI_CLUSTER_RESOURCE.getMiniCluster());
-                       ClientUtils.executeProgram(client, packagedProgramMock, 1, true);
+                       final Configuration configuration = fromPackagedProgram(packagedProgramMock, 1, true);
+                       ClientUtils.executeProgram(configuration, client, packagedProgramMock);
                        fail("Creating the local execution environment should not be possible");
                }
                catch (InvalidProgramException e) {

Reply via email to