This is an automated email from the ASF dual-hosted git repository. kkloudas pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit cffa2e8647f74e0332cad011d937015ac9ac031e Author: Kostas Kloudas <[email protected]> AuthorDate: Tue Nov 12 09:12:37 2019 +0100 [hotfix] CliFrontend.buildProgram() uses only ProgramOptions --- .../org/apache/flink/client/cli/CliFrontend.java | 21 +++++---- .../flink/client/cli/ExecutionConfigAccessor.java | 26 ----------- .../client/cli/CliFrontendPackageProgramTest.java | 50 +++++++++------------- 3 files changed, 31 insertions(+), 66 deletions(-) diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java index 9552978..6e9b2f9 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java +++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java @@ -185,7 +185,6 @@ public class CliFrontend { final CommandLine commandLine = CliFrontendParser.parse(commandLineOptions, args, true); final ProgramOptions programOptions = new ProgramOptions(commandLine); - final ExecutionConfigAccessor executionParameters = ExecutionConfigAccessor.fromProgramOptions(programOptions); // evaluate help flag if (commandLine.hasOption(HELP_OPTION.getOpt())) { @@ -195,7 +194,7 @@ public class CliFrontend { if (!programOptions.isPython()) { // Java program should be specified a JAR file - if (executionParameters.getJarFilePath() == null) { + if (programOptions.getJarFilePath() == null) { throw new CliArgsException("Java program should be specified a JAR file."); } } @@ -203,7 +202,7 @@ public class CliFrontend { final PackagedProgram program; try { LOG.info("Building program from JAR file"); - program = buildProgram(programOptions, executionParameters); + program = buildProgram(programOptions); } catch (FileNotFoundException e) { throw new CliArgsException("Could not build the program from JAR file.", e); @@ -211,7 +210,10 @@ public class CliFrontend { final CustomCommandLine customCommandLine = getActiveCustomCommandLine(commandLine); final Configuration executorConfig = customCommandLine.applyCommandLineOptionsToConfiguration(commandLine); + + final ExecutionConfigAccessor executionParameters = ExecutionConfigAccessor.fromProgramOptions(programOptions); final Configuration executionConfig = executionParameters.getConfiguration(); + try { runProgram(executorConfig, executionConfig, program); } finally { @@ -322,7 +324,6 @@ public class CliFrontend { final CommandLine commandLine = CliFrontendParser.parse(commandOptions, args, true); final ProgramOptions programOptions = new ProgramOptions(commandLine); - final ExecutionConfigAccessor executionParameters = ExecutionConfigAccessor.fromProgramOptions(programOptions); // evaluate help flag if (commandLine.hasOption(HELP_OPTION.getOpt())) { @@ -337,7 +338,7 @@ public class CliFrontend { // -------- build the packaged program ------------- LOG.info("Building program from JAR file"); - final PackagedProgram program = buildProgram(programOptions, executionParameters); + final PackagedProgram program = buildProgram(programOptions); try { int parallelism = programOptions.getParallelism(); @@ -768,12 +769,10 @@ public class CliFrontend { * * @return A PackagedProgram (upon success) */ - PackagedProgram buildProgram( - final ProgramOptions runOptions, - final ExecutionConfigAccessor executionParameters) throws FileNotFoundException, ProgramInvocationException { + PackagedProgram buildProgram(final ProgramOptions runOptions) throws FileNotFoundException, ProgramInvocationException { String[] programArgs = runOptions.getProgramArgs(); - String jarFilePath = executionParameters.getJarFilePath(); - List<URL> classpaths = executionParameters.getClasspaths(); + String jarFilePath = runOptions.getJarFilePath(); + List<URL> classpaths = runOptions.getClasspaths(); // Get assembler class String entryPointClass = runOptions.getEntryPointClassName(); @@ -803,7 +802,7 @@ public class CliFrontend { .setArguments(programArgs) .build(); - program.setSavepointRestoreSettings(executionParameters.getSavepointRestoreSettings()); + program.setSavepointRestoreSettings(runOptions.getSavepointRestoreSettings()); return program; } diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/ExecutionConfigAccessor.java b/flink-clients/src/main/java/org/apache/flink/client/cli/ExecutionConfigAccessor.java index 77c0440..ee32449 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/cli/ExecutionConfigAccessor.java +++ b/flink-clients/src/main/java/org/apache/flink/client/cli/ExecutionConfigAccessor.java @@ -27,10 +27,8 @@ import org.apache.flink.configuration.DeploymentOptions; import org.apache.flink.configuration.PipelineOptions; import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; -import java.io.File; import java.net.MalformedURLException; import java.net.URL; -import java.util.Collections; import java.util.List; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -67,39 +65,15 @@ public class ExecutionConfigAccessor { ConfigUtils.encodeCollectionToConfig(configuration, PipelineOptions.CLASSPATHS, options.getClasspaths(), URL::toString); - parseJarURLToConfig(options.getJarFilePath(), configuration); - SavepointRestoreSettings.toConfiguration(options.getSavepointRestoreSettings(), configuration); return new ExecutionConfigAccessor(configuration); } - private static void parseJarURLToConfig(final String jarFile, final Configuration configuration) { - if (jarFile == null) { - return; - } - - try { - final URL jarUrl = new File(jarFile).getAbsoluteFile().toURI().toURL(); - final List<URL> jarUrlSingleton = Collections.singletonList(jarUrl); - ConfigUtils.encodeCollectionToConfig(configuration, PipelineOptions.JARS, jarUrlSingleton, URL::toString); - } catch (MalformedURLException e) { - throw new IllegalArgumentException("JAR file path invalid", e); - } - } - public Configuration getConfiguration() { return configuration; } - public String getJarFilePath() { - final List<URL> jarURL = decodeUrlList(configuration, PipelineOptions.JARS); - if (jarURL != null && !jarURL.isEmpty()) { - return jarURL.get(0).getPath(); - } - return null; - } - public List<URL> getClasspaths() { return decodeUrlList(configuration, PipelineOptions.CLASSPATHS); } diff --git a/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendPackageProgramTest.java b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendPackageProgramTest.java index 2b64c82..75771cc 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendPackageProgramTest.java +++ b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendPackageProgramTest.java @@ -80,11 +80,10 @@ public class CliFrontendPackageProgramTest extends TestLogger { @Test public void testNonExistingJarFile() throws Exception { ProgramOptions programOptions = mock(ProgramOptions.class); - ExecutionConfigAccessor executionOptions = mock(ExecutionConfigAccessor.class); - when(executionOptions.getJarFilePath()).thenReturn("/some/none/existing/path"); + when(programOptions.getJarFilePath()).thenReturn("/some/none/existing/path"); try { - frontend.buildProgram(programOptions, executionOptions); + frontend.buildProgram(programOptions); fail("should throw an exception"); } catch (FileNotFoundException e) { @@ -95,12 +94,11 @@ public class CliFrontendPackageProgramTest extends TestLogger { @Test public void testFileNotJarFile() throws Exception { ProgramOptions programOptions = mock(ProgramOptions.class); - ExecutionConfigAccessor executionOptions = mock(ExecutionConfigAccessor.class); - when(executionOptions.getJarFilePath()).thenReturn(getNonJarFilePath()); + when(programOptions.getJarFilePath()).thenReturn(getNonJarFilePath()); when(programOptions.getProgramArgs()).thenReturn(new String[0]); try { - frontend.buildProgram(programOptions, executionOptions); + frontend.buildProgram(programOptions); fail("should throw an exception"); } catch (ProgramInvocationException e) { @@ -120,13 +118,12 @@ public class CliFrontendPackageProgramTest extends TestLogger { CommandLine commandLine = CliFrontendParser.parse(CliFrontendParser.RUN_OPTIONS, arguments, true); ProgramOptions programOptions = new ProgramOptions(commandLine); - ExecutionConfigAccessor executionOptions = ExecutionConfigAccessor.fromProgramOptions(programOptions); - assertEquals(getTestJarPath(), executionOptions.getJarFilePath()); - assertArrayEquals(classpath, executionOptions.getClasspaths().toArray()); + assertEquals(getTestJarPath(), programOptions.getJarFilePath()); + assertArrayEquals(classpath, programOptions.getClasspaths().toArray()); assertArrayEquals(reducedArguments, programOptions.getProgramArgs()); - PackagedProgram prog = frontend.buildProgram(programOptions, executionOptions); + PackagedProgram prog = frontend.buildProgram(programOptions); Assert.assertArrayEquals(reducedArguments, prog.getArguments()); Assert.assertEquals(TEST_JAR_MAIN_CLASS, prog.getMainClassName()); @@ -144,13 +141,12 @@ public class CliFrontendPackageProgramTest extends TestLogger { CommandLine commandLine = CliFrontendParser.parse(CliFrontendParser.RUN_OPTIONS, arguments, true); ProgramOptions programOptions = new ProgramOptions(commandLine); - ExecutionConfigAccessor executionOptions = ExecutionConfigAccessor.fromProgramOptions(programOptions); - assertEquals(getTestJarPath(), executionOptions.getJarFilePath()); - assertArrayEquals(classpath, executionOptions.getClasspaths().toArray()); + assertEquals(getTestJarPath(), programOptions.getJarFilePath()); + assertArrayEquals(classpath, programOptions.getClasspaths().toArray()); assertArrayEquals(reducedArguments, programOptions.getProgramArgs()); - PackagedProgram prog = frontend.buildProgram(programOptions, executionOptions); + PackagedProgram prog = frontend.buildProgram(programOptions); Assert.assertArrayEquals(reducedArguments, prog.getArguments()); Assert.assertEquals(TEST_JAR_MAIN_CLASS, prog.getMainClassName()); @@ -168,13 +164,12 @@ public class CliFrontendPackageProgramTest extends TestLogger { CommandLine commandLine = CliFrontendParser.parse(CliFrontendParser.RUN_OPTIONS, arguments, true); ProgramOptions programOptions = new ProgramOptions(commandLine); - ExecutionConfigAccessor executionOptions = ExecutionConfigAccessor.fromProgramOptions(programOptions); - assertEquals(getTestJarPath(), executionOptions.getJarFilePath()); - assertArrayEquals(classpath, executionOptions.getClasspaths().toArray()); + assertEquals(getTestJarPath(), programOptions.getJarFilePath()); + assertArrayEquals(classpath, programOptions.getClasspaths().toArray()); assertArrayEquals(reducedArguments, programOptions.getProgramArgs()); - PackagedProgram prog = frontend.buildProgram(programOptions, executionOptions); + PackagedProgram prog = frontend.buildProgram(programOptions); Assert.assertArrayEquals(reducedArguments, prog.getArguments()); Assert.assertEquals(TEST_JAR_MAIN_CLASS, prog.getMainClassName()); @@ -197,14 +192,13 @@ public class CliFrontendPackageProgramTest extends TestLogger { CommandLine commandLine = CliFrontendParser.parse(CliFrontendParser.RUN_OPTIONS, arguments, true); ProgramOptions programOptions = new ProgramOptions(commandLine); - ExecutionConfigAccessor executionOptions = ExecutionConfigAccessor.fromProgramOptions(programOptions); - assertEquals(arguments[4], executionOptions.getJarFilePath()); - assertArrayEquals(classpath, executionOptions.getClasspaths().toArray()); + assertEquals(arguments[4], programOptions.getJarFilePath()); + assertArrayEquals(classpath, programOptions.getClasspaths().toArray()); assertArrayEquals(reducedArguments, programOptions.getProgramArgs()); try { - frontend.buildProgram(programOptions, executionOptions); + frontend.buildProgram(programOptions); fail("Should fail with an exception"); } catch (FileNotFoundException e) { @@ -218,13 +212,12 @@ public class CliFrontendPackageProgramTest extends TestLogger { CommandLine commandLine = CliFrontendParser.parse(CliFrontendParser.RUN_OPTIONS, arguments, true); ProgramOptions programOptions = new ProgramOptions(commandLine); - ExecutionConfigAccessor executionOptions = ExecutionConfigAccessor.fromProgramOptions(programOptions); - assertEquals(arguments[0], executionOptions.getJarFilePath()); + assertEquals(arguments[0], programOptions.getJarFilePath()); assertArrayEquals(new String[0], programOptions.getProgramArgs()); try { - frontend.buildProgram(programOptions, executionOptions); + frontend.buildProgram(programOptions); } catch (FileNotFoundException e) { // that's what we want @@ -279,14 +272,13 @@ public class CliFrontendPackageProgramTest extends TestLogger { CommandLine commandLine = CliFrontendParser.parse(CliFrontendParser.RUN_OPTIONS, arguments, true); ProgramOptions programOptions = new ProgramOptions(commandLine); - ExecutionConfigAccessor executionOptions = ExecutionConfigAccessor.fromProgramOptions(programOptions); - assertEquals(getTestJarPath(), executionOptions.getJarFilePath()); - assertArrayEquals(classpath, executionOptions.getClasspaths().toArray()); + assertEquals(getTestJarPath(), programOptions.getJarFilePath()); + assertArrayEquals(classpath, programOptions.getClasspaths().toArray()); assertEquals(TEST_JAR_CLASSLOADERTEST_CLASS, programOptions.getEntryPointClassName()); assertArrayEquals(reducedArguments, programOptions.getProgramArgs()); - PackagedProgram prog = spy(frontend.buildProgram(programOptions, executionOptions)); + PackagedProgram prog = spy(frontend.buildProgram(programOptions)); ClassLoader testClassLoader = new ClassLoader(prog.getUserCodeClassLoader()) { @Override
