This is an automated email from the ASF dual-hosted git repository.
fanrui pushed a commit to branch release-1.18
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.18 by this push:
new 0dd3b4ce9f0 [FLINK-33316][runtime] Avoid unnecessary heavy getStreamOperatorFactory
0dd3b4ce9f0 is described below
commit 0dd3b4ce9f0b9f193926445bf9c1f8579fa86161
Author: Rui Fan <[email protected]>
AuthorDate: Thu Oct 19 20:38:15 2023 +0800
[FLINK-33316][runtime] Avoid unnecessary heavy getStreamOperatorFactory
This closes #23550.
---
.../apache/flink/streaming/api/graph/StreamConfig.java | 16 +++++++++++++---
.../flink/streaming/runtime/tasks/OperatorChain.java | 6 ++----
2 files changed, 15 insertions(+), 7 deletions(-)
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
index 267289c181f..2fb5b81d4a5 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
@@ -78,7 +78,12 @@ public class StreamConfig implements Serializable {
// Config Keys
// ------------------------------------------------------------------------
- @VisibleForTesting public static final String SERIALIZEDUDF =
"serializedUDF";
+ public static final String SERIALIZED_UDF = "serializedUDF";
+ /**
+ * Introduce serializedUdfClassName to avoid unnecessarily heavy {@link
+ * #getStreamOperatorFactory}.
+ */
+ public static final String SERIALIZED_UDF_CLASS_NAME =
"serializedUdfClassName";
private static final String NUMBER_OF_OUTPUTS = "numberOfOutputs";
private static final String NUMBER_OF_NETWORK_INPUTS =
"numberOfNetworkInputs";
@@ -368,7 +373,8 @@ public class StreamConfig implements Serializable {
public void setStreamOperatorFactory(StreamOperatorFactory<?> factory) {
if (factory != null) {
- toBeSerializedConfigObjects.put(SERIALIZEDUDF, factory);
+ toBeSerializedConfigObjects.put(SERIALIZED_UDF, factory);
+ config.setString(SERIALIZED_UDF_CLASS_NAME,
factory.getClass().getName());
}
}
@@ -380,7 +386,7 @@ public class StreamConfig implements Serializable {
public <T extends StreamOperatorFactory<?>> T
getStreamOperatorFactory(ClassLoader cl) {
try {
- return InstantiationUtil.readObjectFromConfig(this.config,
SERIALIZEDUDF, cl);
+ return InstantiationUtil.readObjectFromConfig(this.config,
SERIALIZED_UDF, cl);
} catch (ClassNotFoundException e) {
String classLoaderInfo =
ClassLoaderUtil.getUserCodeClassLoaderInfo(cl);
boolean loadableDoubleCheck =
ClassLoaderUtil.validateClassLoadable(e, cl);
@@ -400,6 +406,10 @@ public class StreamConfig implements Serializable {
}
}
+ public String getStreamOperatorFactoryClassName() {
+ return config.getString(SERIALIZED_UDF_CLASS_NAME, null);
+ }
+
public void setIterationId(String iterationId) {
config.setString(ITERATION_ID, iterationId);
}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
index b581990e8c8..1afee0f75b1 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
@@ -641,9 +641,7 @@ public abstract class OperatorChain<OUT, OP extends StreamOperator<OUT>>
@Nullable
private Counter getOperatorRecordsOutCounter(
StreamTask<?, ?> containingTask, StreamConfig operatorConfig) {
- ClassLoader userCodeClassloader =
containingTask.getUserCodeClassLoader();
- StreamOperatorFactory<?> operatorFactory =
- operatorConfig.getStreamOperatorFactory(userCodeClassloader);
+ String streamOperatorFactoryClassName =
operatorConfig.getStreamOperatorFactoryClassName();
// Do not use the numRecordsOut counter on output if this operator is
SinkWriterOperator.
//
// Metric "numRecordsOut" is defined as the total number of records
written to the
@@ -651,7 +649,7 @@ public abstract class OperatorChain<OUT, OP extends StreamOperator<OUT>>
// number of records sent to downstream operators, which is number of
Committable batches
// sent to SinkCommitter. So we skip registering this metric on output
and leave this metric
// to sink writer implementations to report.
- if (operatorFactory instanceof SinkWriterOperatorFactory) {
+ if
(SinkWriterOperatorFactory.class.getName().equals(streamOperatorFactoryClassName))
{
return null;
}