This is an automated email from the ASF dual-hosted git repository. dwysakowicz pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git
commit a6557f66435973b65967af7dd1d893f748f7feae Author: Dawid Wysakowicz <[email protected]> AuthorDate: Thu Jun 11 12:08:15 2020 +0200 [FLINK-18241] Use correct user class loader in OptimizerPlanEnvironment & StreamPlanEnvironment This closes #12607 --- flink-clients/pom.xml | 8 + .../client/program/OptimizerPlanEnvironment.java | 4 +- .../flink/client/program/PackagedProgramUtils.java | 10 +- .../client/program/StreamPlanEnvironment.java | 4 +- .../program/PackagedProgramUtilsPipelineTest.java | 194 +++++++++++++++++++++ .../client/program/PackagedProgramUtilsTest.java | 97 +---------- .../flink/api/java/ExecutionEnvironment.java | 13 +- .../environment/StreamExecutionEnvironment.java | 15 +- 8 files changed, 245 insertions(+), 100 deletions(-) diff --git a/flink-clients/pom.xml b/flink-clients/pom.xml index 6505844..65a55fa 100644 --- a/flink-clients/pom.xml +++ b/flink-clients/pom.xml @@ -88,6 +88,14 @@ under the License. <artifactId>flink-streaming-java_${scala.binary.version}</artifactId> <version>${project.version}</version> </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-core</artifactId> + <version>${project.version}</version> + <scope>test</scope> + <type>test-jar</type> + </dependency> </dependencies> <!-- More information on this: diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/OptimizerPlanEnvironment.java b/flink-clients/src/main/java/org/apache/flink/client/program/OptimizerPlanEnvironment.java index 4f9b40e..20d2274 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/OptimizerPlanEnvironment.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/OptimizerPlanEnvironment.java @@ -35,8 +35,8 @@ public class OptimizerPlanEnvironment extends ExecutionEnvironment { return pipeline; } - public OptimizerPlanEnvironment(Configuration configuration, int parallelism) { - super(configuration); + public OptimizerPlanEnvironment(Configuration configuration, ClassLoader userClassloader, int parallelism) { + super(configuration, userClassloader); if (parallelism > 0) { setParallelism(parallelism); } diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgramUtils.java b/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgramUtils.java index 0db1686..2c4c7b6 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgramUtils.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgramUtils.java @@ -135,9 +135,15 @@ public enum PackagedProgramUtils { } // temporary hack to support the optimizer plan preview - OptimizerPlanEnvironment benv = new OptimizerPlanEnvironment(configuration, parallelism); + OptimizerPlanEnvironment benv = new OptimizerPlanEnvironment( + configuration, + program.getUserCodeClassLoader(), + parallelism); benv.setAsContext(); - StreamPlanEnvironment senv = new StreamPlanEnvironment(configuration, parallelism); + StreamPlanEnvironment senv = new StreamPlanEnvironment( + configuration, + program.getUserCodeClassLoader(), + parallelism); senv.setAsContext(); try { diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/StreamPlanEnvironment.java b/flink-clients/src/main/java/org/apache/flink/client/program/StreamPlanEnvironment.java index b2ce783..7f086f9 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/StreamPlanEnvironment.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/StreamPlanEnvironment.java @@ -38,8 +38,8 @@ public class StreamPlanEnvironment extends StreamExecutionEnvironment { return pipeline; } - public StreamPlanEnvironment(Configuration configuration, int parallelism) { - super(configuration); + public StreamPlanEnvironment(Configuration configuration, ClassLoader userClassLoader, int parallelism) { + super(configuration, userClassLoader); if (parallelism > 0) { setParallelism(parallelism); } diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/PackagedProgramUtilsPipelineTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/PackagedProgramUtilsPipelineTest.java new file mode 100644 index 0000000..4c87db5 --- /dev/null +++ b/flink-clients/src/test/java/org/apache/flink/client/program/PackagedProgramUtilsPipelineTest.java @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.client.program; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.Plan; +import org.apache.flink.api.dag.Pipeline; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.PipelineOptions; +import org.apache.flink.runtime.execution.Environment; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.graph.StreamGraph; +import org.apache.flink.testutils.ClassLoaderUtils; + +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.io.IOException; +import java.net.URL; +import java.net.URLClassLoader; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.function.Function; + +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertThat; + +/** + * Tests for {@link PackagedProgramUtils} methods that should be executed for + * {@link StreamExecutionEnvironment} and {@link Environment}. + */ +@RunWith(Parameterized.class) +public class PackagedProgramUtilsPipelineTest { + + @Parameterized.Parameter + public TestParameter testParameter; + + @Rule + public TemporaryFolder temporaryFolder = new TemporaryFolder(); + + @Parameterized.Parameters + public static Collection<TestParameter> parameters() { + return Arrays.asList( + TestParameter.of(DataSetTestProgram.class, pipeline -> ((Plan) pipeline).getExecutionConfig()), + TestParameter.of(DataStreamTestProgram.class, pipeline -> ((StreamGraph) pipeline).getExecutionConfig()) + ); + } + + /** + * This tests whether configuration forwarding from a {@link Configuration} to the environment + * works. + */ + @Test + public void testConfigurationForwarding() throws Exception { + // we want to test forwarding with this config, ensure that the default is what we expect. + assertThat( + ExecutionEnvironment.getExecutionEnvironment().getConfig().isAutoTypeRegistrationDisabled(), + is(false)); + + PackagedProgram packagedProgram = PackagedProgram.newBuilder() + .setEntryPointClassName(testParameter.entryClass().getName()) + .build(); + + Configuration config = new Configuration(); + config.set(PipelineOptions.AUTO_TYPE_REGISTRATION, false); + + Pipeline pipeline = PackagedProgramUtils.getPipelineFromProgram( + packagedProgram, + config, + 1 /* parallelism */, + false /* suppress output */); + + ExecutionConfig executionConfig = testParameter.extractExecutionConfig(pipeline); + + // we want to test forwarding with this config, ensure that the default is what we expect. + assertThat(executionConfig.isAutoTypeRegistrationDisabled(), is(true)); + } + + @Test + public void testUserClassloaderForConfiguration() throws Exception { + String userSerializerClassName = "UserSerializer"; + List<URL> userUrls = getClassUrls(userSerializerClassName); + + PackagedProgram packagedProgram = PackagedProgram.newBuilder() + .setUserClassPaths(userUrls) + .setEntryPointClassName(testParameter.entryClass().getName()) + .build(); + + Configuration config = new Configuration(); + config.set(PipelineOptions.KRYO_DEFAULT_SERIALIZERS, Collections.singletonList( + String.format( + "class:%s,serializer:%s", + PackagedProgramUtilsPipelineTest.class.getName(), + userSerializerClassName) + )); + + Pipeline pipeline = PackagedProgramUtils.getPipelineFromProgram( + packagedProgram, + config, + 1 /* parallelism */, + false /* suppress output */); + + ExecutionConfig executionConfig = testParameter.extractExecutionConfig(pipeline); + + assertThat( + executionConfig.getDefaultKryoSerializerClasses().get(PackagedProgramUtilsPipelineTest.class).getName(), + is(userSerializerClassName)); + } + + private List<URL> getClassUrls(String className) throws IOException { + URLClassLoader urlClassLoader = ClassLoaderUtils.compileAndLoadJava( + temporaryFolder.newFolder(), + className + ".java", + "import com.esotericsoftware.kryo.Kryo;\n" + + "import com.esotericsoftware.kryo.Serializer;\n" + + "import com.esotericsoftware.kryo.io.Input;\n" + + "import com.esotericsoftware.kryo.io.Output;\n" + + "public class " + className + " extends Serializer {\n" + + "\t@Override\n" + + "\tpublic void write(\n" + + "\t\tKryo kryo,\n" + + "\t\tOutput output,\n" + + "\t\tObject object) {\n" + + "\t}\n" + + "\n" + + "\t@Override\n" + + "\tpublic Object read(Kryo kryo, Input input, Class type) {\n" + + "\t\treturn null;\n" + + "\t}\n" + + "}"); + return Arrays.asList(urlClassLoader.getURLs()); + } + + private interface TestParameter { + Class<?> entryClass(); + + ExecutionConfig extractExecutionConfig(Pipeline pipeline); + + static TestParameter of(Class<?> entryClass, Function<Pipeline, ExecutionConfig> executionConfigExtractor) { + return new TestParameter() { + @Override + public Class<?> entryClass() { + return entryClass; + } + + @Override + public ExecutionConfig extractExecutionConfig(Pipeline pipeline) { + return executionConfigExtractor.apply(pipeline); + } + }; + } + } + + /** Test Program for the DataSet API. */ + public static class DataSetTestProgram { + public static void main(String[] args) throws Exception { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + env.fromElements("hello").print(); + env.execute(); + } + } + + /** Test Program for the DataStream API. */ + public static class DataStreamTestProgram { + public static void main(String[] args) throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.fromElements("hello").print(); + env.execute(); + } + } + +} diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/PackagedProgramUtilsTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/PackagedProgramUtilsTest.java index aa271c5..8c110a7 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/program/PackagedProgramUtilsTest.java +++ b/flink-clients/src/test/java/org/apache/flink/client/program/PackagedProgramUtilsTest.java @@ -18,14 +18,8 @@ package org.apache.flink.client.program; -import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.Plan; -import org.apache.flink.api.dag.Pipeline; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.PipelineOptions; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.graph.StreamGraph; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.streaming.api.datastream.DataStream; import org.junit.Test; @@ -38,59 +32,12 @@ import static org.junit.Assert.assertThat; /** * Tests {@link PackagedProgramUtils}. + * + * <p>See also {@link PackagedProgramUtilsPipelineTest} for tests that need to test behaviour of + * {@link DataStream} and {@link DataSet} programs. */ public class PackagedProgramUtilsTest { - /** - * This tests whether configuration forwarding from a {@link Configuration} to the environment - * works. - */ - @Test - public void testDataSetConfigurationForwarding() throws Exception { - assertPrecondition(ExecutionEnvironment.getExecutionEnvironment().getConfig()); - - PackagedProgram packagedProgram = PackagedProgram.newBuilder() - .setEntryPointClassName(DataSetTestProgram.class.getName()) - .build(); - - Configuration config = createConfigurationWithOption(); - - Pipeline pipeline = PackagedProgramUtils.getPipelineFromProgram( - packagedProgram, - config, - 1 /* parallelism */, - false /* suppress output */); - - ExecutionConfig executionConfig = ((Plan) pipeline).getExecutionConfig(); - - assertExpectedOption(executionConfig); - } - - /** - * This tests whether configuration forwarding from a {@link Configuration} to the environment - * works. - */ - @Test - public void testDataStreamConfigurationForwarding() throws Exception { - assertPrecondition(ExecutionEnvironment.getExecutionEnvironment().getConfig()); - - PackagedProgram packagedProgram = PackagedProgram.newBuilder() - .setEntryPointClassName(DataStreamTestProgram.class.getName()) - .build(); - - Configuration config = createConfigurationWithOption(); - - Pipeline pipeline = PackagedProgramUtils.getPipelineFromProgram( - packagedProgram, - config, - 1 /* parallelism */, - false /* suppress output */); - - ExecutionConfig executionConfig = ((StreamGraph) pipeline).getExecutionConfig(); - - assertExpectedOption(executionConfig); - } - @Test public void testResolveURI() throws URISyntaxException { final String relativeFile = "path/of/user.jar"; @@ -111,38 +58,4 @@ public class PackagedProgramUtilsTest { assertThat(resolveURI(localSchemaFile).getScheme(), is("local")); assertThat(resolveURI(localSchemaFile).toString(), is(localSchemaFile)); } - - private static void assertPrecondition(ExecutionConfig executionConfig) { - // we want to test forwarding with this config, ensure that the default is what we expect. - assertThat(executionConfig.isAutoTypeRegistrationDisabled(), is(false)); - } - - private static void assertExpectedOption(ExecutionConfig executionConfig) { - // we want to test forwarding with this config, ensure that the default is what we expect. - assertThat(executionConfig.isAutoTypeRegistrationDisabled(), is(true)); - } - - private static Configuration createConfigurationWithOption() { - Configuration config = new Configuration(); - config.set(PipelineOptions.AUTO_TYPE_REGISTRATION, false); - return config; - } - - /** Test Program for the DataSet API. */ - public static class DataSetTestProgram { - public static void main(String[] args) throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - env.fromElements("hello").print(); - env.execute(); - } - } - - /** Test Program for the DataStream API. */ - public static class DataStreamTestProgram { - public static void main(String[] args) throws Exception { - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.fromElements("hello").print(); - env.execute(); - } - } } diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java index 75c8dd1..9f0089d 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java @@ -146,7 +146,18 @@ public class ExecutionEnvironment { */ @PublicEvolving public ExecutionEnvironment(final Configuration configuration) { - this(DefaultExecutorServiceLoader.INSTANCE, configuration, null); + this(configuration, null); + } + + /** + * Creates a new {@link ExecutionEnvironment} that will use the given {@link Configuration} to + * configure the {@link PipelineExecutor}. + * + * <p>In addition, this constructor allows specifying the user code {@link ClassLoader}. + */ + @PublicEvolving + public ExecutionEnvironment(final Configuration configuration, final ClassLoader userClassloader) { + this(DefaultExecutorServiceLoader.INSTANCE, configuration, userClassloader); } /** diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java index 7ee04e7..c688506 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java @@ -190,7 +190,20 @@ public class StreamExecutionEnvironment { */ @PublicEvolving public StreamExecutionEnvironment(final Configuration configuration) { - this(DefaultExecutorServiceLoader.INSTANCE, configuration, null); + this(configuration, null); + } + + /** + * Creates a new {@link StreamExecutionEnvironment} that will use the given {@link + * Configuration} to configure the {@link PipelineExecutor}. + * + * <p>In addition, this constructor allows specifying the user code {@link ClassLoader}. + */ + @PublicEvolving + public StreamExecutionEnvironment( + final Configuration configuration, + final ClassLoader userClassloader) { + this(DefaultExecutorServiceLoader.INSTANCE, configuration, userClassloader); } /**
