[ https://issues.apache.org/jira/browse/FLINK-33315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17777174#comment-17777174 ]
Rui Fan commented on FLINK-33315:
---------------------------------
Hi [~pnowojski] , you may be interested in this optimization. :)
> Optimize memory usage of large StreamOperator
> ---------------------------------------------
>
> Key: FLINK-33315
> URL: https://issues.apache.org/jira/browse/FLINK-33315
> Project: Flink
> Issue Type: Improvement
> Components: Runtime / Configuration
> Affects Versions: 1.17.0, 1.18.0
> Reporter: Rui Fan
> Assignee: Rui Fan
> Priority: Major
> Attachments: image-2023-10-19-16-28-16-077.png
>
>
> Some of our batch jobs were upgraded from flink-1.15 to flink-1.17, and their TMs
> always fail with java.lang.OutOfMemoryError: Java heap space.
>
> Here is an example: a Hive table with a lot of data, where
> HiveSource#partitionBytes is 281MB.
> After analysis, the root cause is that the TM keeps 3 replicas of this big
> object:
> * Replica_1: SourceOperatorFactory (it's necessary for running task)
> * Replica_2: A temporarily generated duplicate SourceOperatorFactory object.
> ** It was introduced in FLINK-30536 (1.17) and is not necessary. ([code
> link|https://github.com/apache/flink/blob/c2e14ff411e806f9ccf176c85eb8249b8ff12e56/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java#L646])
> ** When creating a successor operator to a SourceOperator, the call stack is:
> *** OperatorChain#createOperatorChain ->
> *** wrapOperatorIntoOutput ->
> *** getOperatorRecordsOutCounter ->
> *** operatorConfig.getStreamOperatorFactory(userCodeClassloader)
> ** It deserializes a temporary SourceOperatorFactory just to check whether
> it is a SinkWriterOperatorFactory.
> * Replica_3: The value of StreamConfig#SERIALIZEDUDF
> ** It is used to generate the SourceOperatorFactory.
> ** Currently the value is always kept in heap memory.
> ** However, once the factory has been generated, the value could be released,
> or stored on disk if needed.
> *** We could define a threshold: when the value is smaller than the
> threshold, the release strategy does not take effect.
> ** This would save a lot of heap memory.
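The threshold-based release idea for Replica_3 can be sketched roughly as follows. This is a minimal, hypothetical illustration in plain Java, not Flink's StreamConfig API: the class ReleasableSerializedUdf, its methods and the 1 KiB threshold are assumptions, and Java serialization stands in for Flink's own serialization utilities. It only covers releasing the bytes; storing them on disk is left out.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Hypothetical holder for a serialized UDF such as an operator factory.
// NOT Flink's StreamConfig API; it only illustrates threshold-based release.
public class ReleasableSerializedUdf {
    private final long releaseThresholdBytes;
    private byte[] serializedUdf; // set to null once released
    private Object udf;           // the single deserialized instance used by the task
    private boolean deserialized;

    public ReleasableSerializedUdf(byte[] serializedUdf, long releaseThresholdBytes) {
        this.serializedUdf = serializedUdf;
        this.releaseThresholdBytes = releaseThresholdBytes;
    }

    // Deserializes at most once; afterwards drops the bytes if they reach the threshold.
    public synchronized Object getOrDeserialize() {
        if (!deserialized) {
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(serializedUdf))) {
                udf = in.readObject();
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            } catch (ClassNotFoundException e) {
                throw new IllegalStateException(e);
            }
            deserialized = true;
            // Small values stay in memory; large ones become eligible for GC.
            if (serializedUdf.length >= releaseThresholdBytes) {
                serializedUdf = null;
            }
        }
        return udf;
    }

    public synchronized boolean isReleased() {
        return serializedUdf == null;
    }

    // Java serialization as a stand-in for Flink's own serialization utilities.
    public static byte[] serialize(Serializable obj) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    public static void main(String[] args) {
        long threshold = 1024; // hypothetical threshold: 1 KiB
        ReleasableSerializedUdf large =
                new ReleasableSerializedUdf(serialize(new byte[64 * 1024]), threshold);
        ReleasableSerializedUdf small = new ReleasableSerializedUdf(serialize("tiny"), threshold);
        large.getOrDeserialize();
        small.getOrDeserialize();
        System.out.println("large released: " + large.isReleased());
        System.out.println("small released: " + small.isReleased());
    }
}
```

Running main prints "large released: true" and "small released: false". Later calls to getOrDeserialize return the same instance, so no duplicate object is created after the bytes are gone.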
> These three replicas use about 800MB of memory. Note that this is just one
> subtask. Since each TM has 4 slots, it runs 4 HiveSources at the same
> time, so 12 replicas are kept in TM memory, about 3.3 GB in total.
> These large objects cannot be garbage collected by the JVM, which causes the
> TM to OOM frequently.
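As a back-of-the-envelope check of the numbers above, assuming each replica is roughly the size of HiveSource#partitionBytes (281 MB) and that GB means 1024 MB, the totals work out as follows (class and method names are illustrative only):

```java
import java.util.Locale;

public class ReplicaMemoryEstimate {
    // Assumption: every replica is roughly as large as HiveSource#partitionBytes.
    static long perSubtaskMb(long replicaMb, int replicasPerSubtask) {
        return replicaMb * replicasPerSubtask;
    }

    static long perTmMb(long replicaMb, int replicasPerSubtask, int slotsPerTm) {
        return perSubtaskMb(replicaMb, replicasPerSubtask) * slotsPerTm;
    }

    public static void main(String[] args) {
        long partitionBytesMb = 281; // from the example above
        int replicas = 3;            // Replica_1, Replica_2, Replica_3
        int slots = 4;               // one HiveSource subtask per slot
        long tmMb = perTmMb(partitionBytesMb, replicas, slots);
        System.out.printf(Locale.ROOT, "per subtask: %d MB%n",
                perSubtaskMb(partitionBytesMb, replicas));
        System.out.printf(Locale.ROOT, "per TM: %d replicas, %d MB (~%.1f GB)%n",
                replicas * slots, tmMb, tmMb / 1024.0);
    }
}
```

This prints "per subtask: 843 MB" and "per TM: 12 replicas, 3372 MB (~3.3 GB)", in line with the "about 800MB" and "about 3.3 GB" figures.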
> This JIRA focuses on optimizing Replica_2 and Replica_3.
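For Replica_2, one possible direction is to record the factory's class name next to its serialized bytes, so that a type check such as "is this a SinkWriterOperatorFactory?" can be answered without deserializing a duplicate factory. The sketch below is hypothetical: the Config class, the "serializedUdfClassName" key and the stand-in factory types are assumptions, not Flink's actual classes or the fix adopted for this issue.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;

public class FactoryTypeCheck {

    // Hypothetical stand-ins for Flink's operator factory types (not the real classes).
    public interface OperatorFactory extends Serializable {}

    public static class SourceFactory implements OperatorFactory {
        // Simulates a large factory, e.g. one carrying big partition metadata.
        final byte[] partitionBytes;

        public SourceFactory(int size) {
            this.partitionBytes = new byte[size];
        }
    }

    public static class SinkWriterFactory implements OperatorFactory {}

    // Hypothetical config that stores the factory class name next to its serialized bytes.
    public static class Config {
        private final Map<String, Object> entries = new HashMap<>();

        public void setFactory(OperatorFactory factory) {
            entries.put("serializedUDF", serialize(factory));
            entries.put("serializedUdfClassName", factory.getClass().getName());
        }

        // Resolves the factory class without deserializing the factory itself.
        public Class<?> getFactoryClass(ClassLoader classLoader) {
            String className = (String) entries.get("serializedUdfClassName");
            try {
                // initialize = false: load the class only, do not run static initializers.
                return Class.forName(className, false, classLoader);
            } catch (ClassNotFoundException e) {
                throw new IllegalStateException(e);
            }
        }
    }

    // Type check that never materializes a duplicate factory object.
    public static boolean isSinkWriterFactory(Config config, ClassLoader classLoader) {
        return SinkWriterFactory.class.isAssignableFrom(config.getFactoryClass(classLoader));
    }

    static byte[] serialize(Serializable obj) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    public static void main(String[] args) {
        ClassLoader cl = FactoryTypeCheck.class.getClassLoader();
        Config source = new Config();
        source.setFactory(new SourceFactory(1024 * 1024));
        Config sink = new Config();
        sink.setFactory(new SinkWriterFactory());
        System.out.println(isSinkWriterFactory(source, cl));
        System.out.println(isSinkWriterFactory(sink, cl));
    }
}
```

main prints false for the source config and true for the sink config; the 1 MiB SourceFactory is serialized once when stored but never deserialized during the check.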
>
> !image-2023-10-19-16-28-16-077.png!
>
> !https://f.haiserve.com/download/5366d5f07c07a00116b148c6fa1ebff00b010200001cc3da0438a0860702016976849360726a?userid=146850&token=d4a7e7d617dc71ea28bf02977333e1a8|width=1935,height=1127!
>
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)