This is an automated email from the ASF dual-hosted git repository. jqin pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 1db174ef6de4a1f281cfda05e399c67cbb7fb05a Author: Stephan Ewen <[email protected]> AuthorDate: Sat Nov 21 17:39:46 2020 +0100 [FLINK-20223][runtime] (part 1) Add the user code classloader to the Operator Coordinator Context --- .../runtime/operators/coordination/OperatorCoordinator.java | 6 ++++++ .../operators/coordination/OperatorCoordinatorHolder.java | 12 +++++++++++- .../coordination/RecreateOnResetOperatorCoordinator.java | 5 +++++ .../coordination/MockOperatorCoordinatorContext.java | 5 +++++ .../coordination/OperatorCoordinatorHolderTest.java | 1 + 5 files changed, 28 insertions(+), 1 deletion(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinator.java index b40e4f6..21b67f2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinator.java @@ -175,6 +175,12 @@ public interface OperatorCoordinator extends CheckpointListener, AutoCloseable { * Gets the current parallelism with which this operator is executed. */ int currentParallelism(); + + /** + * Gets the classloader that contains the additional dependencies, which are not + * part of the JVM's classpath. + */ + ClassLoader getUserCodeClassloader(); } // ------------------------------------------------------------------------ diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java index a4c43a8..fa86de0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java @@ -319,6 +319,7 @@ public class OperatorCoordinatorHolder implements OperatorCoordinator, OperatorC provider, eventSender, jobVertex.getName(), + jobVertex.getGraph().getUserClassLoader(), jobVertex.getParallelism(), jobVertex.getMaxParallelism()); } @@ -330,13 +331,14 @@ public class OperatorCoordinatorHolder implements OperatorCoordinator, OperatorC final OperatorCoordinator.Provider coordinatorProvider, final BiFunction<SerializedValue<OperatorEvent>, Integer, CompletableFuture<Acknowledge>> eventSender, final String operatorName, + final ClassLoader userCodeClassLoader, final int operatorParallelism, final int operatorMaxParallelism) throws Exception { final OperatorEventValve valve = new OperatorEventValve(eventSender); final LazyInitializedCoordinatorContext context = new LazyInitializedCoordinatorContext( - opId, valve, operatorName, operatorParallelism); + opId, valve, operatorName, userCodeClassLoader, operatorParallelism); final OperatorCoordinator coordinator = coordinatorProvider.create(context); @@ -367,6 +369,7 @@ public class OperatorCoordinatorHolder implements OperatorCoordinator, OperatorC private final OperatorID operatorId; private final OperatorEventValve eventValve; private final String operatorName; + private final ClassLoader userCodeClassLoader; private final int operatorParallelism; private Consumer<Throwable> globalFailureHandler; @@ -376,10 +379,12 @@ public class OperatorCoordinatorHolder implements OperatorCoordinator, OperatorC final OperatorID operatorId, final OperatorEventValve eventValve, final String operatorName, + final ClassLoader userCodeClassLoader, final int operatorParallelism) { this.operatorId = checkNotNull(operatorId); this.eventValve = checkNotNull(eventValve); this.operatorName = checkNotNull(operatorName); + this.userCodeClassLoader = checkNotNull(userCodeClassLoader); this.operatorParallelism = operatorParallelism; } @@ -442,5 +447,10 @@ public class OperatorCoordinatorHolder implements OperatorCoordinator, OperatorC public int currentParallelism() { return operatorParallelism; } + + @Override + public ClassLoader getUserCodeClassloader() { + return userCodeClassLoader; + } } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java index 5b02c6f..ae67868 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java @@ -227,6 +227,11 @@ public class RecreateOnResetOperatorCoordinator implements OperatorCoordinator { return context.currentParallelism(); } + @Override + public ClassLoader getUserCodeClassloader() { + return context.getUserCodeClassloader(); + } + @VisibleForTesting synchronized void quiesce() { quiesced = true; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/MockOperatorCoordinatorContext.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/MockOperatorCoordinatorContext.java index 6b50c2b..ab8a56b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/MockOperatorCoordinatorContext.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/MockOperatorCoordinatorContext.java @@ -77,6 +77,11 @@ public class MockOperatorCoordinatorContext implements OperatorCoordinator.Conte return numSubtasks; } + @Override + public ClassLoader getUserCodeClassloader() { + return getClass().getClassLoader(); + } + // ------------------------------- public List<OperatorEvent> getEventsToOperatorBySubtaskId(int subtaskId) { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolderTest.java index 2c34de6..5962afa 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolderTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolderTest.java @@ -401,6 +401,7 @@ public class OperatorCoordinatorHolderTest extends TestLogger { provider, eventSender, "test-coordinator-name", + getClass().getClassLoader(), 3, 1775);
