This is an automated email from the ASF dual-hosted git repository.
lindong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-ml.git
The following commit(s) were added to refs/heads/master by this push:
new fa5f47ea [FLINK-31486] Get the KeySelector with UserClassLoader
fa5f47ea is described below
commit fa5f47ea2a09360143aab5f39b85b373675636ad
Author: JiangXin <[email protected]>
AuthorDate: Tue Mar 21 11:04:12 2023 +0800
[FLINK-31486] Get the KeySelector with UserClassLoader
This closes #226.
---
.../java/org/apache/flink/iteration/operator/OperatorUtils.java | 6 +++---
.../operator/allround/AbstractAllRoundWrapperOperator.java | 3 ++-
.../operator/perround/AbstractPerRoundWrapperOperator.java | 3 ++-
3 files changed, 7 insertions(+), 5 deletions(-)
diff --git a/flink-ml-iteration/src/main/java/org/apache/flink/iteration/operator/OperatorUtils.java b/flink-ml-iteration/src/main/java/org/apache/flink/iteration/operator/OperatorUtils.java
index 8e36a7bb..b3b629fe 100644
--- a/flink-ml-iteration/src/main/java/org/apache/flink/iteration/operator/OperatorUtils.java
+++ b/flink-ml-iteration/src/main/java/org/apache/flink/iteration/operator/OperatorUtils.java
@@ -89,11 +89,11 @@ public class OperatorUtils {
}
}
- public static StreamConfig createWrappedOperatorConfig(StreamConfig wrapperConfig) {
+ public static StreamConfig createWrappedOperatorConfig(
+ StreamConfig wrapperConfig, ClassLoader cl) {
StreamConfig wrappedConfig = new StreamConfig(wrapperConfig.getConfiguration().clone());
for (int i = 0; i < wrappedConfig.getNumberOfNetworkInputs(); ++i) {
- KeySelector keySelector =
- wrapperConfig.getStatePartitioner(i, OperatorUtils.class.getClassLoader());
+ KeySelector keySelector = wrapperConfig.getStatePartitioner(i, cl);
if (keySelector != null) {
checkState(
keySelector instanceof ProxyKeySelector,
diff --git a/flink-ml-iteration/src/main/java/org/apache/flink/iteration/operator/allround/AbstractAllRoundWrapperOperator.java b/flink-ml-iteration/src/main/java/org/apache/flink/iteration/operator/allround/AbstractAllRoundWrapperOperator.java
index 1c2d0b56..d3b46ea9 100644
--- a/flink-ml-iteration/src/main/java/org/apache/flink/iteration/operator/allround/AbstractAllRoundWrapperOperator.java
+++ b/flink-ml-iteration/src/main/java/org/apache/flink/iteration/operator/allround/AbstractAllRoundWrapperOperator.java
@@ -81,7 +81,8 @@ public abstract class AbstractAllRoundWrapperOperator<T, S extends StreamOperato
operatorFactory,
(StreamTask) parameters.getContainingTask(),
OperatorUtils.createWrappedOperatorConfig(
- parameters.getStreamConfig()),
+ parameters.getStreamConfig(),
+ containingTask.getUserCodeClassLoader()),
proxyOutput,
parameters.getOperatorEventDispatcher())
.f0;
diff --git a/flink-ml-iteration/src/main/java/org/apache/flink/iteration/operator/perround/AbstractPerRoundWrapperOperator.java b/flink-ml-iteration/src/main/java/org/apache/flink/iteration/operator/perround/AbstractPerRoundWrapperOperator.java
index 0cb1724d..4ae88858 100644
--- a/flink-ml-iteration/src/main/java/org/apache/flink/iteration/operator/perround/AbstractPerRoundWrapperOperator.java
+++ b/flink-ml-iteration/src/main/java/org/apache/flink/iteration/operator/perround/AbstractPerRoundWrapperOperator.java
@@ -166,7 +166,8 @@ public abstract class AbstractPerRoundWrapperOperator<T, S extends StreamOperato
clonedOperatorFactory,
(StreamTask) parameters.getContainingTask(),
OperatorUtils.createWrappedOperatorConfig(
- parameters.getStreamConfig()),
+ parameters.getStreamConfig(),
+ containingTask.getUserCodeClassLoader()),
proxyOutput,
parameters.getOperatorEventDispatcher())
.f0;