This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch executors
in repository https://gitbox.apache.org/repos/asf/flink.git

commit c7ef9ed387334ca2bc9034d4fba82406e60a9fa9
Author: Kostas Kloudas <kklou...@gmail.com>
AuthorDate: Mon Nov 11 14:58:45 2019 +0100

    possible fix
---
 .../java/org/apache/flink/client/ClientUtils.java  |   5 -
 .../org/apache/flink/client/cli/CliFrontend.java   |  24 +++--
 .../flink/client/cli/ExecutionConfigAccessor.java  |   4 +-
 .../org/apache/flink/api/java/FlinkILoopTest.java  | 111 ++-------------------
 4 files changed, 21 insertions(+), 123 deletions(-)

diff --git a/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java b/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
index 96a02ea..b5537e2 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
@@ -29,10 +29,8 @@ import org.apache.flink.client.program.DetachedJobExecutionResult;
 import org.apache.flink.client.program.PackagedProgram;
 import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.client.program.ProgramMissingJobException;
-import org.apache.flink.configuration.ConfigUtils;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ExecutionOptions;
-import org.apache.flink.configuration.PipelineOptions;
 import org.apache.flink.core.execution.DefaultExecutorServiceLoader;
 import org.apache.flink.core.execution.ExecutorServiceLoader;
 import org.apache.flink.runtime.client.JobExecutionException;
@@ -185,9 +183,6 @@ public enum ClientUtils {
 
                        final AtomicReference<JobExecutionResult> 
jobExecutionResult = new AtomicReference<>();
 
-                       // TODO: 11.11.19 this should move to the appropriate 
place
-//                     ConfigUtils.encodeStreamToConfig(configuration, 
PipelineOptions.JARS, program.getAllLibraries().stream(), URL::toString);
-
                        final ContextEnvironmentFactory factory = new 
ContextEnvironmentFactory(
                                        executorServiceLoader,
                                        configuration,
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
index f53095b..780eadd 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
@@ -35,10 +35,12 @@ import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.client.program.ProgramMissingJobException;
 import org.apache.flink.client.program.ProgramParametrizationException;
 import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.ConfigUtils;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.PipelineOptions;
 import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.plugin.PluginUtils;
@@ -176,7 +178,6 @@ public class CliFrontend {
                final CommandLine commandLine = 
CliFrontendParser.parse(commandLineOptions, args, true);
 
                final ProgramOptions programOptions = new 
ProgramOptions(commandLine);
-               final ExecutionConfigAccessor executionParameters = 
ExecutionConfigAccessor.fromProgramOptions(programOptions);
 
                // evaluate help flag
                if (commandLine.hasOption(HELP_OPTION.getOpt())) {
@@ -186,11 +187,12 @@ public class CliFrontend {
 
                if (!programOptions.isPython()) {
                        // Java program should be specified a JAR file
-                       if (executionParameters.getJarFilePaths().isEmpty()) {
+                       if (programOptions.getJarFilePath() == null) {
                                throw new CliArgsException("Java program should 
be specified a JAR file.");
                        }
                }
 
+               final ExecutionConfigAccessor executionParameters = 
ExecutionConfigAccessor.fromProgramOptions(programOptions);
                final PackagedProgram program;
                try {
                        LOG.info("Building program from JAR file");
@@ -201,8 +203,7 @@ public class CliFrontend {
                }
 
                final CustomCommandLine customCommandLine = 
getActiveCustomCommandLine(commandLine);
-
-               final Configuration effectiveConfig = 
getEffectiveConfiguration(commandLine, executionParameters, customCommandLine);
+               final Configuration effectiveConfig = 
getEffectiveConfiguration(program, customCommandLine, commandLine, 
executionParameters);
                try {
                        execute(effectiveConfig, program);
                } finally {
@@ -214,13 +215,14 @@ public class CliFrontend {
                ClientUtils.runProgram(clusterClientServiceLoader, 
configuration, program);
        }
 
-       private Configuration getEffectiveConfiguration(CommandLine 
commandLine, ExecutionConfigAccessor executionParameters, CustomCommandLine 
customCommandLine) throws FlinkException {
-               // TODO: 01.11.19 all this should be merged nicely, e.g. 
executionParameters can
-               // take an already existing configuration as a parameter.
-               final Configuration executorConfig = 
customCommandLine.applyCommandLineOptionsToConfiguration(commandLine);
-               final Configuration executionConfig = 
executionParameters.getConfiguration();
-               final Configuration effectiveConfig = new 
Configuration(executorConfig);
-               effectiveConfig.addAll(executionConfig);
+       private Configuration getEffectiveConfiguration(
+                       final PackagedProgram program,
+                       final CustomCommandLine customCommandLine,
+                       final CommandLine commandLine,
+                       final ExecutionConfigAccessor executionParameters) 
throws FlinkException {
+               final Configuration effectiveConfig = 
customCommandLine.applyCommandLineOptionsToConfiguration(commandLine);
+               executionParameters.applyToConfiguration(effectiveConfig);
+               ConfigUtils.encodeStreamToConfig(effectiveConfig, 
PipelineOptions.JARS, program.getAllLibraries().stream(), URL::toString);
                return effectiveConfig;
        }
 
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/ExecutionConfigAccessor.java b/flink-clients/src/main/java/org/apache/flink/client/cli/ExecutionConfigAccessor.java
index e82dd36..05c3946 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/ExecutionConfigAccessor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/ExecutionConfigAccessor.java
@@ -95,8 +95,8 @@ public class ExecutionConfigAccessor {
                }
        }
 
-       public Configuration getConfiguration() {
-               return configuration;
+       void applyToConfiguration(final Configuration effectiveConfig) {
+               effectiveConfig.addAll(configuration);
        }
 
        public List<URL> getJarFilePaths() {
diff --git a/flink-scala-shell/src/test/java/org/apache/flink/api/java/FlinkILoopTest.java b/flink-scala-shell/src/test/java/org/apache/flink/api/java/FlinkILoopTest.java
index 19bc2a0..b86d5f4 100644
--- a/flink-scala-shell/src/test/java/org/apache/flink/api/java/FlinkILoopTest.java
+++ b/flink-scala-shell/src/test/java/org/apache/flink/api/java/FlinkILoopTest.java
@@ -18,10 +18,6 @@
 
 package org.apache.flink.api.java;
 
-import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.api.common.PlanExecutor;
-import org.apache.flink.api.dag.Pipeline;
-import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.api.scala.FlinkILoop;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.environment.RemoteStreamEnvironment;
@@ -29,22 +25,8 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.BDDMockito;
-import org.mockito.Matchers;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-import org.powermock.api.mockito.PowerMockito;
-import org.powermock.core.classloader.annotations.PowerMockIgnore;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-
-import java.net.URL;
-import java.util.List;
 
 import scala.Option;
-import scala.tools.nsc.Settings;
-import scala.tools.nsc.settings.MutableSettings;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
@@ -52,49 +34,24 @@ import static org.junit.Assert.assertTrue;
 /**
  * Integration tests for {@link FlinkILoop}.
  */
-@RunWith(PowerMockRunner.class)
-@PrepareForTest(PlanExecutor.class)
-@PowerMockIgnore("javax.tools.*")
 public class FlinkILoopTest extends TestLogger {
 
        @Test
-       public void testConfigurationForwarding() throws Exception {
+       public void testConfigurationForwarding() {
                Configuration configuration = new Configuration();
                configuration.setString("foobar", "foobar");
-               FlinkILoop flinkILoop = new FlinkILoop("localhost", 6123, 
configuration, Option.<String[]>empty());
 
-               final TestPlanExecutor testPlanExecutor = new 
TestPlanExecutor();
-
-               PowerMockito.mockStatic(PlanExecutor.class);
-               BDDMockito.given(PlanExecutor.createRemoteExecutor(
-                       Matchers.anyString(),
-                       Matchers.anyInt(),
-                       Matchers.any(Configuration.class)
-               )).willAnswer(new Answer<PlanExecutor>() {
-                       @Override
-                       public PlanExecutor answer(InvocationOnMock invocation) 
throws Throwable {
-                               testPlanExecutor.setHost((String) 
invocation.getArguments()[0]);
-                               testPlanExecutor.setPort((Integer) 
invocation.getArguments()[1]);
-                               
testPlanExecutor.setConfiguration((Configuration) invocation.getArguments()[2]);
-                               return testPlanExecutor;
-                       }
-               });
-
-               Settings settings = new Settings();
-               ((MutableSettings.BooleanSetting) 
settings.usejavacp()).value_$eq(true);
-
-               flinkILoop.settings_$eq(settings);
-               flinkILoop.createInterpreter();
+               FlinkILoop flinkILoop = new FlinkILoop("localhost", 6123, 
configuration, Option.<String[]>empty());
 
                ExecutionEnvironment env = flinkILoop.scalaBenv().getJavaEnv();
 
-               env.fromElements(1).output(new 
DiscardingOutputFormat<Integer>());
+               assertTrue(env instanceof RemoteEnvironment);
 
-               env.execute("Test job");
+               RemoteEnvironment remoteEnv = (RemoteEnvironment) env;
 
-               Configuration forwardedConfiguration = 
testPlanExecutor.getConfiguration();
+               Configuration configUnderTest = 
remoteEnv.getExecutorConfiguration();
 
-               assertEquals(configuration, forwardedConfiguration);
+               assertEquals(configuration, configUnderTest);
        }
 
        @Test
@@ -114,60 +71,4 @@ public class FlinkILoopTest extends TestLogger {
 
                assertEquals(configuration, forwardedConfiguration);
        }
-
-       static class TestPlanExecutor extends PlanExecutor {
-
-               private String host;
-               private int port;
-               private Configuration configuration;
-               private List<String> jars;
-               private List<String> globalClasspaths;
-
-               @Override
-               public JobExecutionResult executePlan(
-                               Pipeline plan, List<URL> jarFiles, List<URL> 
globalClasspaths) throws Exception {
-                       return null;
-               }
-
-               public String getHost() {
-                       return host;
-               }
-
-               public void setHost(String host) {
-                       this.host = host;
-               }
-
-               public int getPort() {
-                       return port;
-               }
-
-               public void setPort(int port) {
-                       this.port = port;
-               }
-
-               public Configuration getConfiguration() {
-                       return configuration;
-               }
-
-               public void setConfiguration(Configuration configuration) {
-                       this.configuration = configuration;
-               }
-
-               public List<String> getJars() {
-                       return jars;
-               }
-
-               public void setJars(List<String> jars) {
-                       this.jars = jars;
-               }
-
-               public List<String> getGlobalClasspaths() {
-                       return globalClasspaths;
-               }
-
-               public void setGlobalClasspaths(List<String> globalClasspaths) {
-                       this.globalClasspaths = globalClasspaths;
-               }
-       }
-
 }

Reply via email to