This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit a6557f66435973b65967af7dd1d893f748f7feae
Author: Dawid Wysakowicz <[email protected]>
AuthorDate: Thu Jun 11 12:08:15 2020 +0200

    [FLINK-18241] Use correct user class loader in OptimizerPlanEnvironment & StreamPlanEnvironment
    
    This closes #12607
---
 flink-clients/pom.xml                              |   8 +
 .../client/program/OptimizerPlanEnvironment.java   |   4 +-
 .../flink/client/program/PackagedProgramUtils.java |  10 +-
 .../client/program/StreamPlanEnvironment.java      |   4 +-
 .../program/PackagedProgramUtilsPipelineTest.java  | 194 +++++++++++++++++++++
 .../client/program/PackagedProgramUtilsTest.java   |  97 +----------
 .../flink/api/java/ExecutionEnvironment.java       |  13 +-
 .../environment/StreamExecutionEnvironment.java    |  15 +-
 8 files changed, 245 insertions(+), 100 deletions(-)

diff --git a/flink-clients/pom.xml b/flink-clients/pom.xml
index 6505844..65a55fa 100644
--- a/flink-clients/pom.xml
+++ b/flink-clients/pom.xml
@@ -88,6 +88,14 @@ under the License.
                        
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
                        <version>${project.version}</version>
                </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-core</artifactId>
+                       <version>${project.version}</version>
+                       <scope>test</scope>
+                       <type>test-jar</type>
+               </dependency>
        </dependencies>
 
        <!-- More information on this:
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/program/OptimizerPlanEnvironment.java
 
b/flink-clients/src/main/java/org/apache/flink/client/program/OptimizerPlanEnvironment.java
index 4f9b40e..20d2274 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/program/OptimizerPlanEnvironment.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/program/OptimizerPlanEnvironment.java
@@ -35,8 +35,8 @@ public class OptimizerPlanEnvironment extends 
ExecutionEnvironment {
                return pipeline;
        }
 
-       public OptimizerPlanEnvironment(Configuration configuration, int 
parallelism) {
-               super(configuration);
+       public OptimizerPlanEnvironment(Configuration configuration, 
ClassLoader userClassloader, int parallelism) {
+               super(configuration, userClassloader);
                if (parallelism > 0) {
                        setParallelism(parallelism);
                }
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgramUtils.java
 
b/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgramUtils.java
index 0db1686..2c4c7b6 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgramUtils.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgramUtils.java
@@ -135,9 +135,15 @@ public enum PackagedProgramUtils {
                }
 
                // temporary hack to support the optimizer plan preview
-               OptimizerPlanEnvironment benv = new 
OptimizerPlanEnvironment(configuration, parallelism);
+               OptimizerPlanEnvironment benv = new OptimizerPlanEnvironment(
+                       configuration,
+                       program.getUserCodeClassLoader(),
+                       parallelism);
                benv.setAsContext();
-               StreamPlanEnvironment senv = new 
StreamPlanEnvironment(configuration, parallelism);
+               StreamPlanEnvironment senv = new StreamPlanEnvironment(
+                       configuration,
+                       program.getUserCodeClassLoader(),
+                       parallelism);
                senv.setAsContext();
 
                try {
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/program/StreamPlanEnvironment.java
 
b/flink-clients/src/main/java/org/apache/flink/client/program/StreamPlanEnvironment.java
index b2ce783..7f086f9 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/program/StreamPlanEnvironment.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/program/StreamPlanEnvironment.java
@@ -38,8 +38,8 @@ public class StreamPlanEnvironment extends 
StreamExecutionEnvironment {
                return pipeline;
        }
 
-       public StreamPlanEnvironment(Configuration configuration, int 
parallelism) {
-               super(configuration);
+       public StreamPlanEnvironment(Configuration configuration, ClassLoader 
userClassLoader, int parallelism) {
+               super(configuration, userClassLoader);
                if (parallelism > 0) {
                        setParallelism(parallelism);
                }
diff --git 
a/flink-clients/src/test/java/org/apache/flink/client/program/PackagedProgramUtilsPipelineTest.java
 
b/flink-clients/src/test/java/org/apache/flink/client/program/PackagedProgramUtilsPipelineTest.java
new file mode 100644
index 0000000..4c87db5
--- /dev/null
+++ 
b/flink-clients/src/test/java/org/apache/flink/client/program/PackagedProgramUtilsPipelineTest.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.program;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.dag.Pipeline;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.PipelineOptions;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.graph.StreamGraph;
+import org.apache.flink.testutils.ClassLoaderUtils;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.IOException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.function.Function;
+
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for {@link PackagedProgramUtils} methods that should be executed for
+ * {@link StreamExecutionEnvironment} and {@link ExecutionEnvironment}.
+ */
+@RunWith(Parameterized.class)
+public class PackagedProgramUtilsPipelineTest {
+
+       @Parameterized.Parameter
+       public TestParameter testParameter;
+
+       @Rule
+       public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+       @Parameterized.Parameters
+       public static Collection<TestParameter> parameters() {
+               return Arrays.asList(
+                       TestParameter.of(DataSetTestProgram.class, pipeline -> 
((Plan) pipeline).getExecutionConfig()),
+                       TestParameter.of(DataStreamTestProgram.class, pipeline 
-> ((StreamGraph) pipeline).getExecutionConfig())
+               );
+       }
+
+       /**
+        * This tests whether configuration forwarding from a {@link 
Configuration} to the environment
+        * works.
+        */
+       @Test
+       public void testConfigurationForwarding() throws Exception {
+               // we want to test forwarding with this config, ensure that the 
default is what we expect.
+               assertThat(
+                       
ExecutionEnvironment.getExecutionEnvironment().getConfig().isAutoTypeRegistrationDisabled(),
+                       is(false));
+
+               PackagedProgram packagedProgram = PackagedProgram.newBuilder()
+                       
.setEntryPointClassName(testParameter.entryClass().getName())
+                       .build();
+
+               Configuration config = new Configuration();
+               config.set(PipelineOptions.AUTO_TYPE_REGISTRATION, false);
+
+               Pipeline pipeline = PackagedProgramUtils.getPipelineFromProgram(
+                       packagedProgram,
+                       config,
+                       1 /* parallelism */,
+                       false /* suppress output */);
+
+               ExecutionConfig executionConfig = 
testParameter.extractExecutionConfig(pipeline);
+
+               // the option set in the Configuration above must have been forwarded to the ExecutionConfig.
+               assertThat(executionConfig.isAutoTypeRegistrationDisabled(), 
is(true));
+       }
+
+       @Test
+       public void testUserClassloaderForConfiguration() throws Exception {
+               String userSerializerClassName = "UserSerializer";
+               List<URL> userUrls = getClassUrls(userSerializerClassName);
+
+               PackagedProgram packagedProgram = PackagedProgram.newBuilder()
+                       .setUserClassPaths(userUrls)
+                       
.setEntryPointClassName(testParameter.entryClass().getName())
+                       .build();
+
+               Configuration config = new Configuration();
+               config.set(PipelineOptions.KRYO_DEFAULT_SERIALIZERS, 
Collections.singletonList(
+                       String.format(
+                               "class:%s,serializer:%s",
+                               
PackagedProgramUtilsPipelineTest.class.getName(),
+                               userSerializerClassName)
+               ));
+
+               Pipeline pipeline = PackagedProgramUtils.getPipelineFromProgram(
+                       packagedProgram,
+                       config,
+                       1 /* parallelism */,
+                       false /* suppress output */);
+
+               ExecutionConfig executionConfig = 
testParameter.extractExecutionConfig(pipeline);
+
+               assertThat(
+                       
executionConfig.getDefaultKryoSerializerClasses().get(PackagedProgramUtilsPipelineTest.class).getName(),
+                       is(userSerializerClassName));
+       }
+
+       private List<URL> getClassUrls(String className) throws IOException {
+               URLClassLoader urlClassLoader = 
ClassLoaderUtils.compileAndLoadJava(
+                       temporaryFolder.newFolder(),
+                       className + ".java",
+                       "import com.esotericsoftware.kryo.Kryo;\n" +
+                               "import 
com.esotericsoftware.kryo.Serializer;\n" +
+                               "import com.esotericsoftware.kryo.io.Input;\n" +
+                               "import com.esotericsoftware.kryo.io.Output;\n"
+                               + "public class " + className + " extends 
Serializer {\n" +
+                               "\t@Override\n" +
+                               "\tpublic void write(\n" +
+                               "\t\tKryo kryo,\n" +
+                               "\t\tOutput output,\n" +
+                               "\t\tObject object) {\n" +
+                               "\t}\n" +
+                               "\n" +
+                               "\t@Override\n" +
+                               "\tpublic Object read(Kryo kryo, Input input, 
Class type) {\n" +
+                               "\t\treturn null;\n" +
+                               "\t}\n" +
+                               "}");
+               return Arrays.asList(urlClassLoader.getURLs());
+       }
+
+       private interface TestParameter {
+               Class<?> entryClass();
+
+               ExecutionConfig extractExecutionConfig(Pipeline pipeline);
+
+               static TestParameter of(Class<?> entryClass, Function<Pipeline, 
ExecutionConfig> executionConfigExtractor) {
+                       return new TestParameter() {
+                               @Override
+                               public Class<?> entryClass() {
+                                       return entryClass;
+                               }
+
+                               @Override
+                               public ExecutionConfig 
extractExecutionConfig(Pipeline pipeline) {
+                                       return 
executionConfigExtractor.apply(pipeline);
+                               }
+                       };
+               }
+       }
+
+       /** Test Program for the DataSet API. */
+       public static class DataSetTestProgram {
+               public static void main(String[] args) throws Exception {
+                       ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+                       env.fromElements("hello").print();
+                       env.execute();
+               }
+       }
+
+       /** Test Program for the DataStream API. */
+       public static class DataStreamTestProgram {
+               public static void main(String[] args) throws Exception {
+                       StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+                       env.fromElements("hello").print();
+                       env.execute();
+               }
+       }
+
+}
diff --git 
a/flink-clients/src/test/java/org/apache/flink/client/program/PackagedProgramUtilsTest.java
 
b/flink-clients/src/test/java/org/apache/flink/client/program/PackagedProgramUtilsTest.java
index aa271c5..8c110a7 100644
--- 
a/flink-clients/src/test/java/org/apache/flink/client/program/PackagedProgramUtilsTest.java
+++ 
b/flink-clients/src/test/java/org/apache/flink/client/program/PackagedProgramUtilsTest.java
@@ -18,14 +18,8 @@
 
 package org.apache.flink.client.program;
 
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.dag.Pipeline;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.PipelineOptions;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.graph.StreamGraph;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.streaming.api.datastream.DataStream;
 
 import org.junit.Test;
 
@@ -38,59 +32,12 @@ import static org.junit.Assert.assertThat;
 
 /**
  * Tests {@link PackagedProgramUtils}.
+ *
+ * <p>See also {@link PackagedProgramUtilsPipelineTest} for tests that need to 
test behaviour of
+ * {@link DataStream} and {@link DataSet} programs.
  */
 public class PackagedProgramUtilsTest {
 
-       /**
-        * This tests whether configuration forwarding from a {@link 
Configuration} to the environment
-        * works.
-        */
-       @Test
-       public void testDataSetConfigurationForwarding() throws Exception {
-               
assertPrecondition(ExecutionEnvironment.getExecutionEnvironment().getConfig());
-
-               PackagedProgram packagedProgram = PackagedProgram.newBuilder()
-                               
.setEntryPointClassName(DataSetTestProgram.class.getName())
-                               .build();
-
-               Configuration config = createConfigurationWithOption();
-
-               Pipeline pipeline = PackagedProgramUtils.getPipelineFromProgram(
-                               packagedProgram,
-                               config,
-                               1 /* parallelism */,
-                               false /* suppress output */);
-
-               ExecutionConfig executionConfig = ((Plan) 
pipeline).getExecutionConfig();
-
-               assertExpectedOption(executionConfig);
-       }
-
-       /**
-        * This tests whether configuration forwarding from a {@link 
Configuration} to the environment
-        * works.
-        */
-       @Test
-       public void testDataStreamConfigurationForwarding() throws Exception {
-               
assertPrecondition(ExecutionEnvironment.getExecutionEnvironment().getConfig());
-
-               PackagedProgram packagedProgram = PackagedProgram.newBuilder()
-                               
.setEntryPointClassName(DataStreamTestProgram.class.getName())
-                               .build();
-
-               Configuration config = createConfigurationWithOption();
-
-               Pipeline pipeline = PackagedProgramUtils.getPipelineFromProgram(
-                               packagedProgram,
-                               config,
-                               1 /* parallelism */,
-                               false /* suppress output */);
-
-               ExecutionConfig executionConfig = ((StreamGraph) 
pipeline).getExecutionConfig();
-
-               assertExpectedOption(executionConfig);
-       }
-
        @Test
        public void testResolveURI() throws URISyntaxException {
                final String relativeFile = "path/of/user.jar";
@@ -111,38 +58,4 @@ public class PackagedProgramUtilsTest {
                assertThat(resolveURI(localSchemaFile).getScheme(), 
is("local"));
                assertThat(resolveURI(localSchemaFile).toString(), 
is(localSchemaFile));
        }
-
-       private static void assertPrecondition(ExecutionConfig executionConfig) 
{
-               // we want to test forwarding with this config, ensure that the 
default is what we expect.
-               assertThat(executionConfig.isAutoTypeRegistrationDisabled(), 
is(false));
-       }
-
-       private static void assertExpectedOption(ExecutionConfig 
executionConfig) {
-               // we want to test forwarding with this config, ensure that the 
default is what we expect.
-               assertThat(executionConfig.isAutoTypeRegistrationDisabled(), 
is(true));
-       }
-
-       private static Configuration createConfigurationWithOption() {
-               Configuration config = new Configuration();
-               config.set(PipelineOptions.AUTO_TYPE_REGISTRATION, false);
-               return config;
-       }
-
-       /** Test Program for the DataSet API. */
-       public static class DataSetTestProgram {
-               public static void main(String[] args) throws Exception {
-                       ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-                       env.fromElements("hello").print();
-                       env.execute();
-               }
-       }
-
-       /** Test Program for the DataStream API. */
-       public static class DataStreamTestProgram {
-               public static void main(String[] args) throws Exception {
-                       StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-                       env.fromElements("hello").print();
-                       env.execute();
-               }
-       }
 }
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java 
b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
index 75c8dd1..9f0089d 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
@@ -146,7 +146,18 @@ public class ExecutionEnvironment {
         */
        @PublicEvolving
        public ExecutionEnvironment(final Configuration configuration) {
-               this(DefaultExecutorServiceLoader.INSTANCE, configuration, 
null);
+               this(configuration, null);
+       }
+
+       /**
+        * Creates a new {@link ExecutionEnvironment} that will use the given 
{@link Configuration} to
+        * configure the {@link PipelineExecutor}.
+        *
+        * <p>In addition, this constructor allows specifying the user code 
{@link ClassLoader}.
+        */
+       @PublicEvolving
+       public ExecutionEnvironment(final Configuration configuration, final 
ClassLoader userClassloader) {
+               this(DefaultExecutorServiceLoader.INSTANCE, configuration, 
userClassloader);
        }
 
        /**
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 7ee04e7..c688506 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -190,7 +190,20 @@ public class StreamExecutionEnvironment {
         */
        @PublicEvolving
        public StreamExecutionEnvironment(final Configuration configuration) {
-               this(DefaultExecutorServiceLoader.INSTANCE, configuration, 
null);
+               this(configuration, null);
+       }
+
+       /**
+        * Creates a new {@link StreamExecutionEnvironment} that will use the 
given {@link
+        * Configuration} to configure the {@link PipelineExecutor}.
+        *
+        * <p>In addition, this constructor allows specifying the user code 
{@link ClassLoader}.
+        */
+       @PublicEvolving
+       public StreamExecutionEnvironment(
+                       final Configuration configuration,
+                       final ClassLoader userClassloader) {
+               this(DefaultExecutorServiceLoader.INSTANCE, configuration, 
userClassloader);
        }
 
        /**

Reply via email to