This is an automated email from the ASF dual-hosted git repository. kkloudas pushed a commit to branch executors in repository https://gitbox.apache.org/repos/asf/flink.git
commit c7ef9ed387334ca2bc9034d4fba82406e60a9fa9 Author: Kostas Kloudas <kklou...@gmail.com> AuthorDate: Mon Nov 11 14:58:45 2019 +0100 possible fix --- .../java/org/apache/flink/client/ClientUtils.java | 5 - .../org/apache/flink/client/cli/CliFrontend.java | 24 +++-- .../flink/client/cli/ExecutionConfigAccessor.java | 4 +- .../org/apache/flink/api/java/FlinkILoopTest.java | 111 ++------------------- 4 files changed, 21 insertions(+), 123 deletions(-) diff --git a/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java b/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java index 96a02ea..b5537e2 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java +++ b/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java @@ -29,10 +29,8 @@ import org.apache.flink.client.program.DetachedJobExecutionResult; import org.apache.flink.client.program.PackagedProgram; import org.apache.flink.client.program.ProgramInvocationException; import org.apache.flink.client.program.ProgramMissingJobException; -import org.apache.flink.configuration.ConfigUtils; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.ExecutionOptions; -import org.apache.flink.configuration.PipelineOptions; import org.apache.flink.core.execution.DefaultExecutorServiceLoader; import org.apache.flink.core.execution.ExecutorServiceLoader; import org.apache.flink.runtime.client.JobExecutionException; @@ -185,9 +183,6 @@ public enum ClientUtils { final AtomicReference<JobExecutionResult> jobExecutionResult = new AtomicReference<>(); - // TODO: 11.11.19 this should move to the appropriate place -// ConfigUtils.encodeStreamToConfig(configuration, PipelineOptions.JARS, program.getAllLibraries().stream(), URL::toString); - final ContextEnvironmentFactory factory = new ContextEnvironmentFactory( executorServiceLoader, configuration, diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java index f53095b..780eadd 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java +++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java @@ -35,10 +35,12 @@ import org.apache.flink.client.program.ProgramInvocationException; import org.apache.flink.client.program.ProgramMissingJobException; import org.apache.flink.client.program.ProgramParametrizationException; import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.ConfigUtils; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.CoreOptions; import org.apache.flink.configuration.GlobalConfiguration; import org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.configuration.PipelineOptions; import org.apache.flink.configuration.RestOptions; import org.apache.flink.core.fs.FileSystem; import org.apache.flink.core.plugin.PluginUtils; @@ -176,7 +178,6 @@ public class CliFrontend { final CommandLine commandLine = CliFrontendParser.parse(commandLineOptions, args, true); final ProgramOptions programOptions = new ProgramOptions(commandLine); - final ExecutionConfigAccessor executionParameters = ExecutionConfigAccessor.fromProgramOptions(programOptions); // evaluate help flag if (commandLine.hasOption(HELP_OPTION.getOpt())) { @@ -186,11 +187,12 @@ public class CliFrontend { if (!programOptions.isPython()) { // Java program should be specified a JAR file - if (executionParameters.getJarFilePaths().isEmpty()) { + if (programOptions.getJarFilePath() == null) { throw new CliArgsException("Java program should be specified a JAR file."); } } + final ExecutionConfigAccessor executionParameters = ExecutionConfigAccessor.fromProgramOptions(programOptions); final PackagedProgram program; try { LOG.info("Building program from JAR file"); @@ -201,8 +203,7 @@ public class CliFrontend { } final CustomCommandLine customCommandLine = getActiveCustomCommandLine(commandLine); - - final Configuration effectiveConfig = getEffectiveConfiguration(commandLine, executionParameters, customCommandLine); + final Configuration effectiveConfig = getEffectiveConfiguration(program, customCommandLine, commandLine, executionParameters); try { execute(effectiveConfig, program); } finally { @@ -214,13 +215,14 @@ public class CliFrontend { ClientUtils.runProgram(clusterClientServiceLoader, configuration, program); } - private Configuration getEffectiveConfiguration(CommandLine commandLine, ExecutionConfigAccessor executionParameters, CustomCommandLine customCommandLine) throws FlinkException { - // TODO: 01.11.19 all this should be merged nicely, e.g. executionParameters can - // take an already existing configuration as a parameter. - final Configuration executorConfig = customCommandLine.applyCommandLineOptionsToConfiguration(commandLine); - final Configuration executionConfig = executionParameters.getConfiguration(); - final Configuration effectiveConfig = new Configuration(executorConfig); - effectiveConfig.addAll(executionConfig); + private Configuration getEffectiveConfiguration( + final PackagedProgram program, + final CustomCommandLine customCommandLine, + final CommandLine commandLine, + final ExecutionConfigAccessor executionParameters) throws FlinkException { + final Configuration effectiveConfig = customCommandLine.applyCommandLineOptionsToConfiguration(commandLine); + executionParameters.applyToConfiguration(effectiveConfig); + ConfigUtils.encodeStreamToConfig(effectiveConfig, PipelineOptions.JARS, program.getAllLibraries().stream(), URL::toString); return effectiveConfig; } diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/ExecutionConfigAccessor.java b/flink-clients/src/main/java/org/apache/flink/client/cli/ExecutionConfigAccessor.java index e82dd36..05c3946 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/cli/ExecutionConfigAccessor.java +++ b/flink-clients/src/main/java/org/apache/flink/client/cli/ExecutionConfigAccessor.java @@ -95,8 +95,8 @@ public class ExecutionConfigAccessor { } } - public Configuration getConfiguration() { - return configuration; + void applyToConfiguration(final Configuration effectiveConfig) { + effectiveConfig.addAll(configuration); } public List<URL> getJarFilePaths() { diff --git a/flink-scala-shell/src/test/java/org/apache/flink/api/java/FlinkILoopTest.java b/flink-scala-shell/src/test/java/org/apache/flink/api/java/FlinkILoopTest.java index 19bc2a0..b86d5f4 100644 --- a/flink-scala-shell/src/test/java/org/apache/flink/api/java/FlinkILoopTest.java +++ b/flink-scala-shell/src/test/java/org/apache/flink/api/java/FlinkILoopTest.java @@ -18,10 +18,6 @@ package org.apache.flink.api.java; -import org.apache.flink.api.common.JobExecutionResult; -import org.apache.flink.api.common.PlanExecutor; -import org.apache.flink.api.dag.Pipeline; -import org.apache.flink.api.java.io.DiscardingOutputFormat; import org.apache.flink.api.scala.FlinkILoop; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.environment.RemoteStreamEnvironment; @@ -29,22 +25,8 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.util.TestLogger; import org.junit.Test; -import org.junit.runner.RunWith; -import org.mockito.BDDMockito; -import org.mockito.Matchers; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; -import org.powermock.api.mockito.PowerMockito; -import org.powermock.core.classloader.annotations.PowerMockIgnore; -import org.powermock.core.classloader.annotations.PrepareForTest; -import org.powermock.modules.junit4.PowerMockRunner; - -import java.net.URL; -import java.util.List; import scala.Option; -import scala.tools.nsc.Settings; -import scala.tools.nsc.settings.MutableSettings; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -52,49 +34,24 @@ import static org.junit.Assert.assertTrue; /** * Integration tests for {@link FlinkILoop}. */ -@RunWith(PowerMockRunner.class) -@PrepareForTest(PlanExecutor.class) -@PowerMockIgnore("javax.tools.*") public class FlinkILoopTest extends TestLogger { @Test - public void testConfigurationForwarding() throws Exception { + public void testConfigurationForwarding() { Configuration configuration = new Configuration(); configuration.setString("foobar", "foobar"); - FlinkILoop flinkILoop = new FlinkILoop("localhost", 6123, configuration, Option.<String[]>empty()); - final TestPlanExecutor testPlanExecutor = new TestPlanExecutor(); - - PowerMockito.mockStatic(PlanExecutor.class); - BDDMockito.given(PlanExecutor.createRemoteExecutor( - Matchers.anyString(), - Matchers.anyInt(), - Matchers.any(Configuration.class) - )).willAnswer(new Answer<PlanExecutor>() { - @Override - public PlanExecutor answer(InvocationOnMock invocation) throws Throwable { - testPlanExecutor.setHost((String) invocation.getArguments()[0]); - testPlanExecutor.setPort((Integer) invocation.getArguments()[1]); - testPlanExecutor.setConfiguration((Configuration) invocation.getArguments()[2]); - return testPlanExecutor; - } - }); - - Settings settings = new Settings(); - ((MutableSettings.BooleanSetting) settings.usejavacp()).value_$eq(true); - - flinkILoop.settings_$eq(settings); - flinkILoop.createInterpreter(); + FlinkILoop flinkILoop = new FlinkILoop("localhost", 6123, configuration, Option.<String[]>empty()); ExecutionEnvironment env = flinkILoop.scalaBenv().getJavaEnv(); - env.fromElements(1).output(new DiscardingOutputFormat<Integer>()); + assertTrue(env instanceof RemoteEnvironment); - env.execute("Test job"); + RemoteEnvironment remoteEnv = (RemoteEnvironment) env; - Configuration forwardedConfiguration = testPlanExecutor.getConfiguration(); + Configuration configUnderTest = remoteEnv.getExecutorConfiguration(); - assertEquals(configuration, forwardedConfiguration); + assertEquals(configuration, configUnderTest); } @Test @@ -114,60 +71,4 @@ public class FlinkILoopTest extends TestLogger { assertEquals(configuration, forwardedConfiguration); } - - static class TestPlanExecutor extends PlanExecutor { - - private String host; - private int port; - private Configuration configuration; - private List<String> jars; - private List<String> globalClasspaths; - - @Override - public JobExecutionResult executePlan( - Pipeline plan, List<URL> jarFiles, List<URL> globalClasspaths) throws Exception { - return null; - } - - public String getHost() { - return host; - } - - public void setHost(String host) { - this.host = host; - } - - public int getPort() { - return port; - } - - public void setPort(int port) { - this.port = port; - } - - public Configuration getConfiguration() { - return configuration; - } - - public void setConfiguration(Configuration configuration) { - this.configuration = configuration; - } - - public List<String> getJars() { - return jars; - } - - public void setJars(List<String> jars) { - this.jars = jars; - } - - public List<String> getGlobalClasspaths() { - return globalClasspaths; - } - - public void setGlobalClasspaths(List<String> globalClasspaths) { - this.globalClasspaths = globalClasspaths; - } - } - }