[ 
https://issues.apache.org/jira/browse/FLINK-11045?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16709816#comment-16709816
 ] 

ASF GitHub Bot commented on FLINK-11045:
----------------------------------------

asfgit closed pull request #7213: [FLINK-11045][table] Set correct 
UserCodeClassLoader for RuntimeUDFContext in CollectionExecutor
URL: https://github.com/apache/flink/pull/7213
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java
index 55f3df7d31c..3a0b84c8449 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java
@@ -83,7 +83,7 @@
        
        private final Map<String, Aggregator<?>> aggregators;
        
-       private final ClassLoader classLoader;
+       private final ClassLoader userCodeClassLoader;
        
        private final ExecutionConfig executionConfig;
 
@@ -99,7 +99,7 @@ public CollectionExecutor(ExecutionConfig executionConfig) {
                this.previousAggregates = new HashMap<String, Value>();
                this.aggregators = new HashMap<String, Aggregator<?>>();
                this.cachedFiles = new HashMap<String, Future<Path>>();
-               this.classLoader = getClass().getClassLoader();
+               this.userCodeClassLoader = 
Thread.currentThread().getContextClassLoader();
        }
        
        // 
--------------------------------------------------------------------------------------------
@@ -191,8 +191,8 @@ else if (operator instanceof GenericDataSinkBase) {
                MetricGroup metrics = new UnregisteredMetricsGroup();
                        
                if 
(RichOutputFormat.class.isAssignableFrom(typedSink.getUserCodeWrapper().getUserCodeClass()))
 {
-                       ctx = superStep == 0 ? new RuntimeUDFContext(taskInfo, 
classLoader, executionConfig, cachedFiles, accumulators, metrics) :
-                                       new 
IterationRuntimeUDFContext(taskInfo, classLoader, executionConfig, cachedFiles, 
accumulators, metrics);
+                       ctx = superStep == 0 ? new RuntimeUDFContext(taskInfo, 
userCodeClassLoader, executionConfig, cachedFiles, accumulators, metrics) :
+                                       new 
IterationRuntimeUDFContext(taskInfo, userCodeClassLoader, executionConfig, 
cachedFiles, accumulators, metrics);
                } else {
                        ctx = null;
                }
@@ -211,8 +211,8 @@ else if (operator instanceof GenericDataSinkBase) {
 
                MetricGroup metrics = new UnregisteredMetricsGroup();
                if 
(RichInputFormat.class.isAssignableFrom(typedSource.getUserCodeWrapper().getUserCodeClass()))
 {
-                       ctx = superStep == 0 ? new RuntimeUDFContext(taskInfo, 
classLoader, executionConfig, cachedFiles, accumulators, metrics) :
-                                       new 
IterationRuntimeUDFContext(taskInfo, classLoader, executionConfig, cachedFiles, 
accumulators, metrics);
+                       ctx = superStep == 0 ? new RuntimeUDFContext(taskInfo, 
userCodeClassLoader, executionConfig, cachedFiles, accumulators, metrics) :
+                                       new 
IterationRuntimeUDFContext(taskInfo, userCodeClassLoader, executionConfig, 
cachedFiles, accumulators, metrics);
                } else {
                        ctx = null;
                }
@@ -237,8 +237,8 @@ else if (operator instanceof GenericDataSinkBase) {
 
                MetricGroup metrics = new UnregisteredMetricsGroup();
                if 
(RichFunction.class.isAssignableFrom(typedOp.getUserCodeWrapper().getUserCodeClass()))
 {
-                       ctx = superStep == 0 ? new RuntimeUDFContext(taskInfo, 
classLoader, executionConfig, cachedFiles, accumulators, metrics) :
-                                       new 
IterationRuntimeUDFContext(taskInfo, classLoader, executionConfig, cachedFiles, 
accumulators, metrics);
+                       ctx = superStep == 0 ? new RuntimeUDFContext(taskInfo, 
userCodeClassLoader, executionConfig, cachedFiles, accumulators, metrics) :
+                                       new 
IterationRuntimeUDFContext(taskInfo, userCodeClassLoader, executionConfig, 
cachedFiles, accumulators, metrics);
                        
                        for (Map.Entry<String, Operator<?>> bcInputs : 
operator.getBroadcastInputs().entrySet()) {
                                List<?> bcData = execute(bcInputs.getValue());
@@ -278,8 +278,8 @@ else if (operator instanceof GenericDataSinkBase) {
                MetricGroup metrics = new UnregisteredMetricsGroup();
        
                if 
(RichFunction.class.isAssignableFrom(typedOp.getUserCodeWrapper().getUserCodeClass()))
 {
-                       ctx = superStep == 0 ? new RuntimeUDFContext(taskInfo, 
classLoader, executionConfig, cachedFiles, accumulators, metrics) :
-                               new IterationRuntimeUDFContext(taskInfo, 
classLoader, executionConfig, cachedFiles, accumulators, metrics);
+                       ctx = superStep == 0 ? new RuntimeUDFContext(taskInfo, 
userCodeClassLoader, executionConfig, cachedFiles, accumulators, metrics) :
+                               new IterationRuntimeUDFContext(taskInfo, 
userCodeClassLoader, executionConfig, cachedFiles, accumulators, metrics);
                        
                        for (Map.Entry<String, Operator<?>> bcInputs : 
operator.getBroadcastInputs().entrySet()) {
                                List<?> bcData = execute(bcInputs.getValue());


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> UserCodeClassLoader has not been set correctly for RuntimeUDFContext in 
> CollectionExecutor
> ------------------------------------------------------------------------------------------
>
>                 Key: FLINK-11045
>                 URL: https://issues.apache.org/jira/browse/FLINK-11045
>             Project: Flink
>          Issue Type: Bug
>          Components: Table API & SQL
>            Reporter: Hequn Cheng
>            Assignee: Hequn Cheng
>            Priority: Major
>              Labels: pull-request-available
>
> We should use {{UserCodeClassLoader}} when creating a new {{RuntimeUDFContext}},
> i.e., change the code from
> {code:java}
> this.classLoader = getClass().getClassLoader();
> new RuntimeUDFContext(taskInfo, classLoader, executionConfig, cachedFiles, 
> accumulators, metrics)
> {code}
> to
> {code:java}
> new RuntimeUDFContext(taskInfo, Thread.currentThread().getContextClassLoader(), 
> executionConfig, cachedFiles, accumulators, metrics)
> {code}
> in {{CollectionExecutor}}.
> FYI: this problem was originally reported on the [user mailing 
> list|http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Table-API-example-Table-program-cannot-be-compiled-This-is-a-bug-Please-file-an-issue-td24852.html].



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to