This is an automated email from the ASF dual-hosted git repository.

lindong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-ml.git


The following commit(s) were added to refs/heads/master by this push:
     new fa5f47ea [FLINK-31486] Get the KeySelector with UserClassLoader
fa5f47ea is described below

commit fa5f47ea2a09360143aab5f39b85b373675636ad
Author: JiangXin <[email protected]>
AuthorDate: Tue Mar 21 11:04:12 2023 +0800

    [FLINK-31486] Get the KeySelector with UserClassLoader
    
    This closes #226.
---
 .../java/org/apache/flink/iteration/operator/OperatorUtils.java     | 6 +++---
 .../operator/allround/AbstractAllRoundWrapperOperator.java          | 3 ++-
 .../operator/perround/AbstractPerRoundWrapperOperator.java          | 3 ++-
 3 files changed, 7 insertions(+), 5 deletions(-)

diff --git 
a/flink-ml-iteration/src/main/java/org/apache/flink/iteration/operator/OperatorUtils.java
 
b/flink-ml-iteration/src/main/java/org/apache/flink/iteration/operator/OperatorUtils.java
index 8e36a7bb..b3b629fe 100644
--- 
a/flink-ml-iteration/src/main/java/org/apache/flink/iteration/operator/OperatorUtils.java
+++ 
b/flink-ml-iteration/src/main/java/org/apache/flink/iteration/operator/OperatorUtils.java
@@ -89,11 +89,11 @@ public class OperatorUtils {
         }
     }
 
-    public static StreamConfig createWrappedOperatorConfig(StreamConfig 
wrapperConfig) {
+    public static StreamConfig createWrappedOperatorConfig(
+            StreamConfig wrapperConfig, ClassLoader cl) {
         StreamConfig wrappedConfig = new 
StreamConfig(wrapperConfig.getConfiguration().clone());
         for (int i = 0; i < wrappedConfig.getNumberOfNetworkInputs(); ++i) {
-            KeySelector keySelector =
-                    wrapperConfig.getStatePartitioner(i, 
OperatorUtils.class.getClassLoader());
+            KeySelector keySelector = wrapperConfig.getStatePartitioner(i, cl);
             if (keySelector != null) {
                 checkState(
                         keySelector instanceof ProxyKeySelector,
diff --git 
a/flink-ml-iteration/src/main/java/org/apache/flink/iteration/operator/allround/AbstractAllRoundWrapperOperator.java
 
b/flink-ml-iteration/src/main/java/org/apache/flink/iteration/operator/allround/AbstractAllRoundWrapperOperator.java
index 1c2d0b56..d3b46ea9 100644
--- 
a/flink-ml-iteration/src/main/java/org/apache/flink/iteration/operator/allround/AbstractAllRoundWrapperOperator.java
+++ 
b/flink-ml-iteration/src/main/java/org/apache/flink/iteration/operator/allround/AbstractAllRoundWrapperOperator.java
@@ -81,7 +81,8 @@ public abstract class AbstractAllRoundWrapperOperator<T, S 
extends StreamOperato
                                         operatorFactory,
                                         (StreamTask) 
parameters.getContainingTask(),
                                         
OperatorUtils.createWrappedOperatorConfig(
-                                                parameters.getStreamConfig()),
+                                                parameters.getStreamConfig(),
+                                                
containingTask.getUserCodeClassLoader()),
                                         proxyOutput,
                                         
parameters.getOperatorEventDispatcher())
                                 .f0;
diff --git 
a/flink-ml-iteration/src/main/java/org/apache/flink/iteration/operator/perround/AbstractPerRoundWrapperOperator.java
 
b/flink-ml-iteration/src/main/java/org/apache/flink/iteration/operator/perround/AbstractPerRoundWrapperOperator.java
index 0cb1724d..4ae88858 100644
--- 
a/flink-ml-iteration/src/main/java/org/apache/flink/iteration/operator/perround/AbstractPerRoundWrapperOperator.java
+++ 
b/flink-ml-iteration/src/main/java/org/apache/flink/iteration/operator/perround/AbstractPerRoundWrapperOperator.java
@@ -166,7 +166,8 @@ public abstract class AbstractPerRoundWrapperOperator<T, S 
extends StreamOperato
                                             clonedOperatorFactory,
                                             (StreamTask) 
parameters.getContainingTask(),
                                             
OperatorUtils.createWrappedOperatorConfig(
-                                                    
parameters.getStreamConfig()),
+                                                    
parameters.getStreamConfig(),
+                                                    
containingTask.getUserCodeClassLoader()),
                                             proxyOutput,
                                             
parameters.getOperatorEventDispatcher())
                                     .f0;

Reply via email to