This is an automated email from the ASF dual-hosted git repository. jincheng pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 3eac419 [FLINK-11045][table] Set correct UserCodeClassLoader for RuntimeUDFContext in CollectionExecutor 3eac419 is described below commit 3eac4193b31dd4b9bcf0bd09e2adc02bda4a2873 Author: hequn8128 <chenghe...@gmail.com> AuthorDate: Sun Dec 2 19:05:49 2018 +0800 [FLINK-11045][table] Set correct UserCodeClassLoader for RuntimeUDFContext in CollectionExecutor This closes #7213 --- .../api/common/operators/CollectionExecutor.java | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java index 55f3df7..3a0b84c 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java @@ -83,7 +83,7 @@ public class CollectionExecutor { private final Map<String, Aggregator<?>> aggregators; - private final ClassLoader classLoader; + private final ClassLoader userCodeClassLoader; private final ExecutionConfig executionConfig; @@ -99,7 +99,7 @@ public class CollectionExecutor { this.previousAggregates = new HashMap<String, Value>(); this.aggregators = new HashMap<String, Aggregator<?>>(); this.cachedFiles = new HashMap<String, Future<Path>>(); - this.classLoader = getClass().getClassLoader(); + this.userCodeClassLoader = Thread.currentThread().getContextClassLoader(); } // -------------------------------------------------------------------------------------------- @@ -191,8 +191,8 @@ public class CollectionExecutor { MetricGroup metrics = new UnregisteredMetricsGroup(); if (RichOutputFormat.class.isAssignableFrom(typedSink.getUserCodeWrapper().getUserCodeClass())) { - ctx = superStep == 0 ? new RuntimeUDFContext(taskInfo, classLoader, executionConfig, cachedFiles, accumulators, metrics) : - new IterationRuntimeUDFContext(taskInfo, classLoader, executionConfig, cachedFiles, accumulators, metrics); + ctx = superStep == 0 ? new RuntimeUDFContext(taskInfo, userCodeClassLoader, executionConfig, cachedFiles, accumulators, metrics) : + new IterationRuntimeUDFContext(taskInfo, userCodeClassLoader, executionConfig, cachedFiles, accumulators, metrics); } else { ctx = null; } @@ -211,8 +211,8 @@ public class CollectionExecutor { MetricGroup metrics = new UnregisteredMetricsGroup(); if (RichInputFormat.class.isAssignableFrom(typedSource.getUserCodeWrapper().getUserCodeClass())) { - ctx = superStep == 0 ? new RuntimeUDFContext(taskInfo, classLoader, executionConfig, cachedFiles, accumulators, metrics) : - new IterationRuntimeUDFContext(taskInfo, classLoader, executionConfig, cachedFiles, accumulators, metrics); + ctx = superStep == 0 ? new RuntimeUDFContext(taskInfo, userCodeClassLoader, executionConfig, cachedFiles, accumulators, metrics) : + new IterationRuntimeUDFContext(taskInfo, userCodeClassLoader, executionConfig, cachedFiles, accumulators, metrics); } else { ctx = null; } @@ -237,8 +237,8 @@ public class CollectionExecutor { MetricGroup metrics = new UnregisteredMetricsGroup(); if (RichFunction.class.isAssignableFrom(typedOp.getUserCodeWrapper().getUserCodeClass())) { - ctx = superStep == 0 ? new RuntimeUDFContext(taskInfo, classLoader, executionConfig, cachedFiles, accumulators, metrics) : - new IterationRuntimeUDFContext(taskInfo, classLoader, executionConfig, cachedFiles, accumulators, metrics); + ctx = superStep == 0 ? new RuntimeUDFContext(taskInfo, userCodeClassLoader, executionConfig, cachedFiles, accumulators, metrics) : + new IterationRuntimeUDFContext(taskInfo, userCodeClassLoader, executionConfig, cachedFiles, accumulators, metrics); for (Map.Entry<String, Operator<?>> bcInputs : operator.getBroadcastInputs().entrySet()) { List<?> bcData = execute(bcInputs.getValue()); @@ -278,8 +278,8 @@ public class CollectionExecutor { MetricGroup metrics = new UnregisteredMetricsGroup(); if (RichFunction.class.isAssignableFrom(typedOp.getUserCodeWrapper().getUserCodeClass())) { - ctx = superStep == 0 ? new RuntimeUDFContext(taskInfo, classLoader, executionConfig, cachedFiles, accumulators, metrics) : - new IterationRuntimeUDFContext(taskInfo, classLoader, executionConfig, cachedFiles, accumulators, metrics); + ctx = superStep == 0 ? new RuntimeUDFContext(taskInfo, userCodeClassLoader, executionConfig, cachedFiles, accumulators, metrics) : + new IterationRuntimeUDFContext(taskInfo, userCodeClassLoader, executionConfig, cachedFiles, accumulators, metrics); for (Map.Entry<String, Operator<?>> bcInputs : operator.getBroadcastInputs().entrySet()) { List<?> bcData = execute(bcInputs.getValue());