[ https://issues.apache.org/jira/browse/FLINK-36787?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

zhengyuan updated FLINK-36787:
------------------------------
    Summary: In Flink SQL stream mode, joining a Kafka source with a JDBC dimension table caused a Yarn node to run out of memory (OOM).  (was: Flink SQL mode kafka join jdbc Dimension Table Yarn node OOM)

> In Flink SQL stream mode, joining a Kafka source with a JDBC dimension table caused a Yarn node to run out of memory (OOM).
> ---------------------------------------------------------------------------
>
>                 Key: FLINK-36787
>                 URL: https://issues.apache.org/jira/browse/FLINK-36787
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Runtime
>    Affects Versions: 1.17.2
>         Environment: Flink 1.17.2
> hadoop 3.2.3
> jdk:1.8
>            Reporter: zhengyuan
>            Priority: Critical
>         Attachments: flink-join-oom.log
>
>
> In Flink SQL stream mode, I used a Kafka source to join with a JDBC dimension
> table. When the join node processed over 10 million records, the task was
> repeatedly re-initialized until an OutOfMemoryError (OOM) occurred. After
> changing the join type to a Temporal Join, the task node's garbage collection
> (GC) behavior normalized.
>
> Inner join:
>
> CREATE TEMPORARY VIEW transform_tableJoin_effq515l0_ AS
> select `time`,`value`,`id`,`eng_name`, PROCTIME() as proc_time
> from (
>   SELECT source_kafka_rpPDIjxy5x.`time` AS `time`,
>          source_kafka_rpPDIjxy5x.`value` AS `value`,
>          source_kafka_rpPDIjxy5x.`id` AS `id`,
>          source_meta_9rzxhLIL4w.`eng_name` AS `eng_name`
>   FROM source_kafka_rpPDIjxy5x
>   inner join source_meta_9rzxhLIL4w
>     on source_kafka_rpPDIjxy5x.id=source_meta_9rzxhLIL4w.attr_id
> ) ;
>
> Changed to a Temporal Join:
>
> CREATE TEMPORARY VIEW transform_tableJoin_effq515l0_ AS
> select `time`,`value`,`id`,`attr_eng_name`, instance_eng_name, PROCTIME() as proc_time
> from (
>   SELECT source_kafka_rpPDIjxy5x.`time` AS `time`,
>          source_kafka_rpPDIjxy5x.`value` AS `value`,
>          source_kafka_rpPDIjxy5x.`id` AS `id`,
>          v_meta_attr_processing.`attr_eng_name` AS `attr_eng_name`,
>          v_meta_attr_processing.instance_eng_name as instance_eng_name
>   FROM source_kafka_rpPDIjxy5x
>   inner join v_meta_attr_processing
>     FOR SYSTEM_TIME AS OF source_kafka_rpPDIjxy5x.auto_row_time
>     on source_kafka_rpPDIjxy5x.id=v_meta_attr_processing.attr_id
> ) ;



--
This message was sent by Atlassian Jira
(v8.20.10#820010)