[
https://issues.apache.org/jira/browse/FLINK-33315?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Rui Fan updated FLINK-33315:
----------------------------
Attachment:
130f436613b52b321bd9bd0211dd109f0b0102000020e860f292a13c0702016976850466192b.png
> Optimize memory usage of large StreamOperator
> ---------------------------------------------
>
> Key: FLINK-33315
> URL: https://issues.apache.org/jira/browse/FLINK-33315
> Project: Flink
> Issue Type: Improvement
> Components: Runtime / Configuration
> Affects Versions: 1.17.0, 1.18.0
> Reporter: Rui Fan
> Assignee: Rui Fan
> Priority: Major
> Attachments:
> 130f436613b52b321bd9bd0211dd109f0b0102000020e860f292a13c0702016976850466192b.png,
> image-2023-10-19-16-28-16-077.png
>
>
> Some of our batch jobs were upgraded from flink-1.15 to flink-1.17, and since
> then TMs always fail with java.lang.OutOfMemoryError: Java heap space.
>
> Here is an example: a Hive table with a lot of data, where the
> HiveSource#partitionBytes is 281 MB.
> After analysis, the root cause is that the TM maintains 3 replicas of this
> big object:
> * Replica_1: SourceOperatorFactory (it's necessary for running task)
> * Replica_2: A duplicate SourceOperatorFactory object that is generated
> temporarily.
> ** It was introduced in FLINK-30536 (1.17) and is not necessary. ([code
> link|https://github.com/apache/flink/blob/c2e14ff411e806f9ccf176c85eb8249b8ff12e56/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java#L646])
> ** When creating a successor operator to a SourceOperator, the call stack is:
> *** OperatorChain#createOperatorChain ->
> *** wrapOperatorIntoOutput ->
> *** getOperatorRecordsOutCounter ->
> *** operatorConfig.getStreamOperatorFactory(userCodeClassloader)
> ** This generates the SourceOperatorFactory temporarily, just to check
> whether it is a SinkWriterOperatorFactory.
> * Replica_3: The value of StreamConfig#SERIALIZEDUDF
> ** It is used to generate the SourceOperatorFactory.
> ** Currently, the value is always kept in heap memory.
> ** However, after the factory is generated, we could release the value or
> store it on disk if it is needed again.
> *** We could define a threshold: when the value size is less than the
> threshold, the release strategy does not take effect.
> ** If so, we can save a lot of heap memory.
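The Replica_2 issue can be sketched in plain Java. Everything below is illustrative (OperatorConfigSketch, FakeSourceOperatorFactory, etc. are hypothetical stand-ins, not Flink's actual API): the point is that a type check against serialized config can compare a stored class name instead of deserializing the whole factory, so no temporary copy is ever materialized.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch, not Flink code: a config that stores serialized
// factories together with their class names, so type checks are cheap.
class OperatorConfigSketch {
    private final Map<String, byte[]> serialized = new HashMap<>();
    private final Map<String, String> factoryClassNames = new HashMap<>();

    void setFactory(String key, Serializable factory) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(factory);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        serialized.put(key, bos.toByteArray());
        // Remember the concrete class name so later type checks are cheap.
        factoryClassNames.put(key, factory.getClass().getName());
    }

    // Cheap path: compares class names, never materializes the factory.
    boolean isFactoryOfType(String key, Class<?> type) {
        return type.getName().equals(factoryClassNames.get(key));
    }

    // Expensive path: only used when the factory is really needed.
    Object getFactory(String key) {
        try (ObjectInputStream ois = new ObjectInputStream(
                new ByteArrayInputStream(serialized.get(key)))) {
            return ois.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}

// Stand-ins for operator factories (purely illustrative).
class FakeSourceOperatorFactory implements Serializable {}
class FakeSinkWriterOperatorFactory implements Serializable {}
```

With this shape, getOperatorRecordsOutCounter-style checks would call isFactoryOfType and never deserialize a second factory copy.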
> These three replicas use about 800MB of memory. Please note that this is just
> one subtask. Since each TM has 4 slots, it runs 4 HiveSources at the same
> time, so 12 replicas are maintained in TM memory, about 3.3 GB in total.
> These large objects cannot be reclaimed by the JVM, causing the TM to OOM
> frequently.
> This JIRA focuses on optimizing Replica_2 and Replica_3.
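A minimal sketch of the Replica_3 release strategy, under the assumption that the serialized bytes can be spilled to a temp file and re-read on demand. SpillableBytes and all names here are hypothetical, not Flink code; the threshold behavior matches the proposal above (small values stay on the heap, large ones become collectible after use).

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical sketch of the proposed release strategy: keep small serialized
// payloads on the heap, but spill payloads at or above a threshold to disk
// after first use, so the large heap copy can be garbage-collected.
class SpillableBytes {
    private final long spillThreshold;
    private byte[] heapCopy;   // null once spilled
    private Path spillFile;    // null until spilled

    SpillableBytes(byte[] payload, long spillThreshold) {
        this.heapCopy = payload;
        this.spillThreshold = spillThreshold;
    }

    // Called after the bytes were deserialized into the operator factory.
    void releaseHeapCopyIfLarge() {
        if (heapCopy == null || heapCopy.length < spillThreshold) {
            return; // below the threshold, the release strategy is off
        }
        try {
            spillFile = Files.createTempFile("serialized-udf-", ".bin");
            spillFile.toFile().deleteOnExit();
            Files.write(spillFile, heapCopy);
            heapCopy = null; // the large array becomes collectible
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    boolean isOnHeap() {
        return heapCopy != null;
    }

    // Re-read from disk if the bytes are needed again (e.g. re-chaining).
    byte[] get() {
        if (heapCopy != null) {
            return heapCopy;
        }
        try {
            return Files.readAllBytes(spillFile);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```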
>
> !image-2023-10-19-16-28-16-077.png!
>
> !https://f.haiserve.com/download/130f436613b52b321bd9bd0211dd109f0b0102000020e860f292a13c0702016976850466192b?userid=146850&token=4e7b7352b30d6e5d2dd2bb7a7479fc93!
>
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)