This is an automated email from the ASF dual-hosted git repository.

fanrui pushed a commit to branch release-1.18
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.18 by this push:
     new 0dd3b4ce9f0 [FLINK-33316][runtime] Avoid unnecessary heavy getStreamOperatorFactory
0dd3b4ce9f0 is described below

commit 0dd3b4ce9f0b9f193926445bf9c1f8579fa86161
Author: Rui Fan <[email protected]>
AuthorDate: Thu Oct 19 20:38:15 2023 +0800

    [FLINK-33316][runtime] Avoid unnecessary heavy getStreamOperatorFactory
    
    This closes #23550.
---
 .../apache/flink/streaming/api/graph/StreamConfig.java   | 16 +++++++++++++---
 .../flink/streaming/runtime/tasks/OperatorChain.java     |  6 ++----
 2 files changed, 15 insertions(+), 7 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
index 267289c181f..2fb5b81d4a5 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
@@ -78,7 +78,12 @@ public class StreamConfig implements Serializable {
     //  Config Keys
     // ------------------------------------------------------------------------
 
-    @VisibleForTesting public static final String SERIALIZEDUDF = 
"serializedUDF";
+    public static final String SERIALIZED_UDF = "serializedUDF";
+    /**
+     * Introduce serializedUdfClassName to avoid unnecessarily heavy {@link
+     * #getStreamOperatorFactory}.
+     */
+    public static final String SERIALIZED_UDF_CLASS_NAME = 
"serializedUdfClassName";
 
     private static final String NUMBER_OF_OUTPUTS = "numberOfOutputs";
     private static final String NUMBER_OF_NETWORK_INPUTS = 
"numberOfNetworkInputs";
@@ -368,7 +373,8 @@ public class StreamConfig implements Serializable {
 
     public void setStreamOperatorFactory(StreamOperatorFactory<?> factory) {
         if (factory != null) {
-            toBeSerializedConfigObjects.put(SERIALIZEDUDF, factory);
+            toBeSerializedConfigObjects.put(SERIALIZED_UDF, factory);
+            config.setString(SERIALIZED_UDF_CLASS_NAME, 
factory.getClass().getName());
         }
     }
 
@@ -380,7 +386,7 @@ public class StreamConfig implements Serializable {
 
     public <T extends StreamOperatorFactory<?>> T 
getStreamOperatorFactory(ClassLoader cl) {
         try {
-            return InstantiationUtil.readObjectFromConfig(this.config, 
SERIALIZEDUDF, cl);
+            return InstantiationUtil.readObjectFromConfig(this.config, 
SERIALIZED_UDF, cl);
         } catch (ClassNotFoundException e) {
             String classLoaderInfo = 
ClassLoaderUtil.getUserCodeClassLoaderInfo(cl);
             boolean loadableDoubleCheck = 
ClassLoaderUtil.validateClassLoadable(e, cl);
@@ -400,6 +406,10 @@ public class StreamConfig implements Serializable {
         }
     }
 
+    public String getStreamOperatorFactoryClassName() {
+        return config.getString(SERIALIZED_UDF_CLASS_NAME, null);
+    }
+
     public void setIterationId(String iterationId) {
         config.setString(ITERATION_ID, iterationId);
     }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
index b581990e8c8..1afee0f75b1 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
@@ -641,9 +641,7 @@ public abstract class OperatorChain<OUT, OP extends StreamOperator<OUT>>
     @Nullable
     private Counter getOperatorRecordsOutCounter(
             StreamTask<?, ?> containingTask, StreamConfig operatorConfig) {
-        ClassLoader userCodeClassloader = 
containingTask.getUserCodeClassLoader();
-        StreamOperatorFactory<?> operatorFactory =
-                operatorConfig.getStreamOperatorFactory(userCodeClassloader);
+        String streamOperatorFactoryClassName = 
operatorConfig.getStreamOperatorFactoryClassName();
         // Do not use the numRecordsOut counter on output if this operator is 
SinkWriterOperator.
         //
         // Metric "numRecordsOut" is defined as the total number of records 
written to the
@@ -651,7 +649,7 @@ public abstract class OperatorChain<OUT, OP extends StreamOperator<OUT>>
         // number of records sent to downstream operators, which is number of 
Committable batches
         // sent to SinkCommitter. So we skip registering this metric on output 
and leave this metric
         // to sink writer implementations to report.
-        if (operatorFactory instanceof SinkWriterOperatorFactory) {
+        if 
(SinkWriterOperatorFactory.class.getName().equals(streamOperatorFactoryClassName))
 {
             return null;
         }
 

Reply via email to