Rui Fan created FLINK-33315:
-------------------------------
Summary: Optimize memory usage of large StreamOperator
Key: FLINK-33315
URL: https://issues.apache.org/jira/browse/FLINK-33315
Project: Flink
Issue Type: Improvement
Components: Runtime / Configuration
Affects Versions: 1.17.0, 1.18.0
Reporter: Rui Fan
Assignee: Rui Fan
Attachments: image-2023-10-19-16-28-16-077.png
After upgrading some of our batch jobs from flink-1.15 to flink-1.17, the TMs
always fail with java.lang.OutOfMemoryError: Java heap space.
Here is an example: a hive table with a lot of data, where
HiveSource#partitionBytes is 281MB.
After analysis, the root cause is that the TM keeps 3 replicas of this big
object:
* Replica_1: SourceOperatorFactory (it's necessary for running task)
* Replica_2: A duplicate SourceOperatorFactory object that is generated temporarily.
** It was introduced in FLINK-30536 (1.17) and is not necessary. ([code link|https://github.com/apache/flink/blob/c2e14ff411e806f9ccf176c85eb8249b8ff12e56/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java#L646])
** When creating a successor operator to a SourceOperator, the call stack is:
*** OperatorChain#createOperatorChain ->
*** wrapOperatorIntoOutput ->
*** getOperatorRecordsOutCounter ->
*** operatorConfig.getStreamOperatorFactory(userCodeClassloader)
** It generates the SourceOperatorFactory temporarily, only to check
whether it is a SinkWriterOperatorFactory.
* Replica_3: The value of StreamConfig#SERIALIZEDUDF
** It is used to generate SourceOperatorFactory.
** Currently the value is always kept in heap memory.
** However, once the factory has been generated, we can release the value or
store it on disk if needed.
*** We can define a threshold: when the value size is less than the threshold,
the release strategy doesn't take effect.
** If so, we can save a lot of heap memory.
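The release idea for Replica_3 could look roughly like the sketch below. It is only an illustration under stated assumptions: ReleasableSerializedValue, releaseThresholdBytes and deserializeAndMaybeRelease are hypothetical names, not Flink classes or options, and plain Java serialization stands in for whatever StreamConfig actually uses. Spilling to disk is left out.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

/** Hypothetical holder (not a Flink class): keeps serialized bytes only while needed. */
final class ReleasableSerializedValue<T extends Serializable> {
    private final int releaseThresholdBytes; // assumed, user-defined threshold
    private byte[] serialized;               // set to null once released

    ReleasableSerializedValue(T value, int releaseThresholdBytes) {
        this.releaseThresholdBytes = releaseThresholdBytes;
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        this.serialized = bytes.toByteArray();
    }

    /** Deserializes the value, then drops the bytes if they reach the threshold. */
    @SuppressWarnings("unchecked")
    T deserializeAndMaybeRelease() {
        if (serialized == null) {
            throw new IllegalStateException("Serialized bytes were already released");
        }
        T value;
        try (ObjectInputStream in =
                new ObjectInputStream(new ByteArrayInputStream(serialized))) {
            value = (T) in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
        // Below the threshold the release strategy does not take effect.
        if (serialized.length >= releaseThresholdBytes) {
            serialized = null; // the large byte[] becomes eligible for GC
        }
        return value;
    }

    boolean isReleased() {
        return serialized == null;
    }
}
```

In this sketch a second call after a release fails fast. A real change would have to decide whether the value may be needed again, which is where storing it on disk would come in.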
These three replicas use about 800MB of memory, and that is for just one
subtask. Since each TM has 4 slots, it runs 4 HiveSources at the same time,
so 12 replicas are kept in TM memory, which is about 3.3 GB.
These large objects cannot be garbage collected by the JVM, causing the TM to OOM frequently.
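The numbers above can be reproduced with a back-of-the-envelope calculation. The sketch assumes that every replica is roughly as large as HiveSource#partitionBytes (281MB); ReplicaFootprint and totalMegabytes are made-up names for illustration.

```java
/** Rough heap estimate using the figures from this report (illustrative only). */
final class ReplicaFootprint {
    /** Assumes each replica is about as large as the serialized object itself. */
    static long totalMegabytes(long objectMb, int replicasPerSubtask, int slotsPerTaskManager) {
        return objectMb * replicasPerSubtask * slotsPerTaskManager;
    }

    public static void main(String[] args) {
        long perSubtask = totalMegabytes(281, 3, 1);     // 843 MB, i.e. "about 800MB"
        long perTaskManager = totalMegabytes(281, 3, 4); // 3372 MB, i.e. about 3.3 GB
        System.out.println("per subtask MB: " + perSubtask);
        System.out.println("per TM MB: " + perTaskManager);
    }
}
```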
This JIRA is focused on optimizing Replica_2 and Replica_3.
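For Replica_2, one possible direction is to answer the "is it a SinkWriterOperatorFactory?" question without deserializing the factory at all. The sketch below records the factory's class name when the config is built and later checks the type by class name only. OperatorConfigSketch, recordFactoryClass and isFactoryOfType are hypothetical names, not the StreamConfig API, and this is not necessarily how the issue will be fixed.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for an operator config; none of these names are Flink APIs. */
final class OperatorConfigSketch {
    private static final String FACTORY_CLASS_KEY = "factoryClassName";
    private final Map<String, String> settings = new HashMap<>();

    /** Called once while the config is built and the factory instance already exists. */
    void recordFactoryClass(Object factory) {
        settings.put(FACTORY_CLASS_KEY, factory.getClass().getName());
    }

    /** Type check that loads the class by name instead of deserializing the factory. */
    boolean isFactoryOfType(Class<?> expected, ClassLoader userCodeClassLoader) {
        String name = settings.get(FACTORY_CLASS_KEY);
        if (name == null) {
            return false;
        }
        try {
            // initialize=false: only resolve the class, do not run static initializers
            Class<?> actual = Class.forName(name, false, userCodeClassLoader);
            return expected.isAssignableFrom(actual);
        } catch (ClassNotFoundException e) {
            return false;
        }
    }
}
```

The class name is only a short string, so the check no longer creates a temporary copy of the large factory object.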
!image-2023-10-19-16-28-16-077.png!
!https://f.haiserve.com/download/5366d5f07c07a00116b148c6fa1ebff00b010200001cc3da0438a0860702016976849360726a?userid=146850&token=d4a7e7d617dc71ea28bf02977333e1a8|width=1935,height=1127!