1996fanrui commented on code in PR #23589:
URL: https://github.com/apache/flink/pull/23589#discussion_r1375972599


##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java:
##########
@@ -795,6 +795,20 @@ public boolean isGraphContainingLoops() {
         return config.getBoolean(GRAPH_CONTAINING_LOOPS, false);
     }
 
+    /**
+     * In general, we don't clear any configuration. However, the {@link 
#SERIALIZED_UDF} may be
+     * very large when operator includes some large objects, the 
SERIALIZED_UDF is used to create a
+     * StreamOperator and usually only needs to be called once. {@link 
#CHAINED_TASK_CONFIG} may be
+     * large as well due to the StreamConfig of all non-head operators in 
OperatorChain will be
+     * serialized and stored in CHAINED_TASK_CONFIG. They can be cleared to 
reduce the memory after
+     * StreamTask is initialized. If so, TM will have more memory during 
running. See FLINK-33315
+     * and FLINK-33317 for more information.
+     */
+    public void clearInitialConfigs() {
+        config.removeKey(SERIALIZED_UDF);
+        config.removeKey(CHAINED_TASK_CONFIG);

Review Comment:
   Thanks @pnowojski for the quick response!
   
   It's definitely a good suggestion, throwing a error is easy to let developer 
find this improvement if any developer get them in the future.
   
   How about adding a `private final transient Set<String> removedKeys` to save 
them? If so, we don't need to get bytes from config and check whether it's 
removedValue in advance.
   
   Of course, these 2 solution are similar, if you prefer store a special 
value, I can accept it.



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java:
##########
@@ -795,6 +795,20 @@ public boolean isGraphContainingLoops() {
         return config.getBoolean(GRAPH_CONTAINING_LOOPS, false);
     }
 
+    /**
+     * In general, we don't clear any configuration. However, the {@link 
#SERIALIZED_UDF} may be
+     * very large when operator includes some large objects, the 
SERIALIZED_UDF is used to create a
+     * StreamOperator and usually only needs to be called once. {@link 
#CHAINED_TASK_CONFIG} may be
+     * large as well due to the StreamConfig of all non-head operators in 
OperatorChain will be
+     * serialized and stored in CHAINED_TASK_CONFIG. They can be cleared to 
reduce the memory after
+     * StreamTask is initialized. If so, TM will have more memory during 
running. See FLINK-33315
+     * and FLINK-33317 for more information.
+     */
+    public void clearInitialConfigs() {
+        config.removeKey(SERIALIZED_UDF);
+        config.removeKey(CHAINED_TASK_CONFIG);
+    }

Review Comment:
   Thanks @RocMarshal for the quick response!
   
   > Do we want to release this part of memory faster here?
   
   If the heap memory isn't enough, the GC can be happened, and these objects 
can be released. So I'm not sure whether it's needed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to