This is an automated email from the ASF dual-hosted git repository.

jincheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 3eac419  [FLINK-11045][table] Set correct UserCodeClassLoader for RuntimeUDFContext in CollectionExecutor
3eac419 is described below

commit 3eac4193b31dd4b9bcf0bd09e2adc02bda4a2873
Author: hequn8128 <chenghe...@gmail.com>
AuthorDate: Sun Dec 2 19:05:49 2018 +0800

    [FLINK-11045][table] Set correct UserCodeClassLoader for RuntimeUDFContext in CollectionExecutor
    
    This closes #7213
---
 .../api/common/operators/CollectionExecutor.java     | 20 ++++++++++----------
 1 file changed, 10 insertions(+), 10 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java
index 55f3df7..3a0b84c 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java
@@ -83,7 +83,7 @@ public class CollectionExecutor {
        
        private final Map<String, Aggregator<?>> aggregators;
        
-       private final ClassLoader classLoader;
+       private final ClassLoader userCodeClassLoader;
        
        private final ExecutionConfig executionConfig;
 
@@ -99,7 +99,7 @@ public class CollectionExecutor {
                this.previousAggregates = new HashMap<String, Value>();
                this.aggregators = new HashMap<String, Aggregator<?>>();
                this.cachedFiles = new HashMap<String, Future<Path>>();
-               this.classLoader = getClass().getClassLoader();
+               this.userCodeClassLoader = 
Thread.currentThread().getContextClassLoader();
        }
        
        // 
--------------------------------------------------------------------------------------------
@@ -191,8 +191,8 @@ public class CollectionExecutor {
                MetricGroup metrics = new UnregisteredMetricsGroup();
                        
                if 
(RichOutputFormat.class.isAssignableFrom(typedSink.getUserCodeWrapper().getUserCodeClass()))
 {
-                       ctx = superStep == 0 ? new RuntimeUDFContext(taskInfo, 
classLoader, executionConfig, cachedFiles, accumulators, metrics) :
-                                       new 
IterationRuntimeUDFContext(taskInfo, classLoader, executionConfig, cachedFiles, 
accumulators, metrics);
+                       ctx = superStep == 0 ? new RuntimeUDFContext(taskInfo, 
userCodeClassLoader, executionConfig, cachedFiles, accumulators, metrics) :
+                                       new 
IterationRuntimeUDFContext(taskInfo, userCodeClassLoader, executionConfig, 
cachedFiles, accumulators, metrics);
                } else {
                        ctx = null;
                }
@@ -211,8 +211,8 @@ public class CollectionExecutor {
 
                MetricGroup metrics = new UnregisteredMetricsGroup();
                if 
(RichInputFormat.class.isAssignableFrom(typedSource.getUserCodeWrapper().getUserCodeClass()))
 {
-                       ctx = superStep == 0 ? new RuntimeUDFContext(taskInfo, 
classLoader, executionConfig, cachedFiles, accumulators, metrics) :
-                                       new 
IterationRuntimeUDFContext(taskInfo, classLoader, executionConfig, cachedFiles, 
accumulators, metrics);
+                       ctx = superStep == 0 ? new RuntimeUDFContext(taskInfo, 
userCodeClassLoader, executionConfig, cachedFiles, accumulators, metrics) :
+                                       new 
IterationRuntimeUDFContext(taskInfo, userCodeClassLoader, executionConfig, 
cachedFiles, accumulators, metrics);
                } else {
                        ctx = null;
                }
@@ -237,8 +237,8 @@ public class CollectionExecutor {
 
                MetricGroup metrics = new UnregisteredMetricsGroup();
                if 
(RichFunction.class.isAssignableFrom(typedOp.getUserCodeWrapper().getUserCodeClass()))
 {
-                       ctx = superStep == 0 ? new RuntimeUDFContext(taskInfo, 
classLoader, executionConfig, cachedFiles, accumulators, metrics) :
-                                       new 
IterationRuntimeUDFContext(taskInfo, classLoader, executionConfig, cachedFiles, 
accumulators, metrics);
+                       ctx = superStep == 0 ? new RuntimeUDFContext(taskInfo, 
userCodeClassLoader, executionConfig, cachedFiles, accumulators, metrics) :
+                                       new 
IterationRuntimeUDFContext(taskInfo, userCodeClassLoader, executionConfig, 
cachedFiles, accumulators, metrics);
                        
                        for (Map.Entry<String, Operator<?>> bcInputs : 
operator.getBroadcastInputs().entrySet()) {
                                List<?> bcData = execute(bcInputs.getValue());
@@ -278,8 +278,8 @@ public class CollectionExecutor {
                MetricGroup metrics = new UnregisteredMetricsGroup();
        
                if 
(RichFunction.class.isAssignableFrom(typedOp.getUserCodeWrapper().getUserCodeClass()))
 {
-                       ctx = superStep == 0 ? new RuntimeUDFContext(taskInfo, 
classLoader, executionConfig, cachedFiles, accumulators, metrics) :
-                               new IterationRuntimeUDFContext(taskInfo, 
classLoader, executionConfig, cachedFiles, accumulators, metrics);
+                       ctx = superStep == 0 ? new RuntimeUDFContext(taskInfo, 
userCodeClassLoader, executionConfig, cachedFiles, accumulators, metrics) :
+                               new IterationRuntimeUDFContext(taskInfo, 
userCodeClassLoader, executionConfig, cachedFiles, accumulators, metrics);
                        
                        for (Map.Entry<String, Operator<?>> bcInputs : 
operator.getBroadcastInputs().entrySet()) {
                                List<?> bcData = execute(bcInputs.getValue());

Reply via email to