This is an automated email from the ASF dual-hosted git repository.

jqin pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 1db174ef6de4a1f281cfda05e399c67cbb7fb05a
Author: Stephan Ewen <[email protected]>
AuthorDate: Sat Nov 21 17:39:46 2020 +0100

    [FLINK-20223][runtime] (part 1) Add the user code classloader to the 
Operator Coordinator Context
---
 .../runtime/operators/coordination/OperatorCoordinator.java  |  6 ++++++
 .../operators/coordination/OperatorCoordinatorHolder.java    | 12 +++++++++++-
 .../coordination/RecreateOnResetOperatorCoordinator.java     |  5 +++++
 .../coordination/MockOperatorCoordinatorContext.java         |  5 +++++
 .../coordination/OperatorCoordinatorHolderTest.java          |  1 +
 5 files changed, 28 insertions(+), 1 deletion(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinator.java
index b40e4f6..21b67f2 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinator.java
@@ -175,6 +175,12 @@ public interface OperatorCoordinator extends 
CheckpointListener, AutoCloseable {
                 * Gets the current parallelism with which this operator is 
executed.
                 */
                int currentParallelism();
+
+               /**
+                * Gets the classloader that contains the additional 
dependencies, which are not
+                * part of the JVM's classpath.
+                */
+               ClassLoader getUserCodeClassloader();
        }
 
        // 
------------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java
index a4c43a8..fa86de0 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java
@@ -319,6 +319,7 @@ public class OperatorCoordinatorHolder implements 
OperatorCoordinator, OperatorC
                                        provider,
                                        eventSender,
                                        jobVertex.getName(),
+                                       
jobVertex.getGraph().getUserClassLoader(),
                                        jobVertex.getParallelism(),
                                        jobVertex.getMaxParallelism());
                }
@@ -330,13 +331,14 @@ public class OperatorCoordinatorHolder implements 
OperatorCoordinator, OperatorC
                        final OperatorCoordinator.Provider coordinatorProvider,
                        final BiFunction<SerializedValue<OperatorEvent>, 
Integer, CompletableFuture<Acknowledge>> eventSender,
                        final String operatorName,
+                       final ClassLoader userCodeClassLoader,
                        final int operatorParallelism,
                        final int operatorMaxParallelism) throws Exception {
 
                final OperatorEventValve valve = new 
OperatorEventValve(eventSender);
 
                final LazyInitializedCoordinatorContext context = new 
LazyInitializedCoordinatorContext(
-                               opId, valve, operatorName, operatorParallelism);
+                               opId, valve, operatorName, userCodeClassLoader, 
operatorParallelism);
 
                final OperatorCoordinator coordinator = 
coordinatorProvider.create(context);
 
@@ -367,6 +369,7 @@ public class OperatorCoordinatorHolder implements 
OperatorCoordinator, OperatorC
                private final OperatorID operatorId;
                private final OperatorEventValve eventValve;
                private final String operatorName;
+               private final ClassLoader userCodeClassLoader;
                private final int operatorParallelism;
 
                private Consumer<Throwable> globalFailureHandler;
@@ -376,10 +379,12 @@ public class OperatorCoordinatorHolder implements 
OperatorCoordinator, OperatorC
                                final OperatorID operatorId,
                                final OperatorEventValve eventValve,
                                final String operatorName,
+                               final ClassLoader userCodeClassLoader,
                                final int operatorParallelism) {
                        this.operatorId = checkNotNull(operatorId);
                        this.eventValve = checkNotNull(eventValve);
                        this.operatorName = checkNotNull(operatorName);
+                       this.userCodeClassLoader = 
checkNotNull(userCodeClassLoader);
                        this.operatorParallelism = operatorParallelism;
                }
 
@@ -442,5 +447,10 @@ public class OperatorCoordinatorHolder implements 
OperatorCoordinator, OperatorC
                public int currentParallelism() {
                        return operatorParallelism;
                }
+
+               @Override
+               public ClassLoader getUserCodeClassloader() {
+                       return userCodeClassLoader;
+               }
        }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java
index 5b02c6f..ae67868 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java
@@ -227,6 +227,11 @@ public class RecreateOnResetOperatorCoordinator implements 
OperatorCoordinator {
                        return context.currentParallelism();
                }
 
+               @Override
+               public ClassLoader getUserCodeClassloader() {
+                       return context.getUserCodeClassloader();
+               }
+
                @VisibleForTesting
                synchronized void quiesce() {
                        quiesced = true;
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/MockOperatorCoordinatorContext.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/MockOperatorCoordinatorContext.java
index 6b50c2b..ab8a56b 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/MockOperatorCoordinatorContext.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/MockOperatorCoordinatorContext.java
@@ -77,6 +77,11 @@ public class MockOperatorCoordinatorContext implements 
OperatorCoordinator.Conte
                return numSubtasks;
        }
 
+       @Override
+       public ClassLoader getUserCodeClassloader() {
+               return getClass().getClassLoader();
+       }
+
        // -------------------------------
 
        public List<OperatorEvent> getEventsToOperatorBySubtaskId(int 
subtaskId) {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolderTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolderTest.java
index 2c34de6..5962afa 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolderTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolderTest.java
@@ -401,6 +401,7 @@ public class OperatorCoordinatorHolderTest extends 
TestLogger {
                                provider,
                                eventSender,
                                "test-coordinator-name",
+                               getClass().getClassLoader(),
                                3,
                                1775);
 

Reply via email to