[ 
https://issues.apache.org/jira/browse/FLINK-36787?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhengyuan updated FLINK-36787:
------------------------------
    Summary: In Flink SQL stream mode, joining a Kafka source with a JDBC 
dimension table caused a Yarn node to run out of memory (OOM).  (was: Flink SQL 
mode  kafka join jdbc Dimension Table Yarn node OOM)

> In Flink SQL stream mode, joining a Kafka source with a JDBC dimension table 
> caused a Yarn node to run out of memory (OOM).
> ---------------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-36787
>                 URL: https://issues.apache.org/jira/browse/FLINK-36787
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Runtime
>    Affects Versions: 1.17.2
>         Environment: Flink 1.17.2
> Hadoop 3.2.3
> JDK 1.8
>            Reporter: zhengyuan
>            Priority: Critical
>         Attachments: flink-join-oom.log
>
>
> In Flink SQL stream mode, I used a Kafka source to join with a JDBC dimension 
> table. When the join node processed over 10 million records, the task was 
> repeatedly re-initialized until an OutOfMemoryError (OOM) occurred. After 
> changing the join type to a Temporal Join, the task node's garbage collection 
> (GC) behavior normalized.
>  
> Inner join:
> CREATE TEMPORARY VIEW transform_tableJoin_effq515l0_ AS
> select `time`,`value`,`id`,`eng_name`, PROCTIME() as proc_time
> from (
>   SELECT source_kafka_rpPDIjxy5x.`time` AS `time`,
>   source_kafka_rpPDIjxy5x.`value` AS `value`,
>   source_kafka_rpPDIjxy5x.`id` AS `id`,
>   source_meta_9rzxhLIL4w.`eng_name` AS `eng_name`
>  FROM source_kafka_rpPDIjxy5x
>  inner join  source_meta_9rzxhLIL4w
>   on source_kafka_rpPDIjxy5x.id=source_meta_9rzxhLIL4w.attr_id
>   ) ;
> Changed to a Temporal Join:
> CREATE TEMPORARY VIEW transform_tableJoin_effq515l0_ AS
> select `time`,`value`,`id`,`attr_eng_name`, instance_eng_name, PROCTIME() as 
> proc_time
> from (
>   SELECT source_kafka_rpPDIjxy5x.`time` AS `time`,
>   source_kafka_rpPDIjxy5x.`value` AS `value`,
>   source_kafka_rpPDIjxy5x.`id` AS `id`,
>   v_meta_attr_processing.`attr_eng_name` AS `attr_eng_name`,
>   v_meta_attr_processing.instance_eng_name as instance_eng_name
>  FROM source_kafka_rpPDIjxy5x  
>  inner join  v_meta_attr_processing
>  FOR SYSTEM_TIME AS OF source_kafka_rpPDIjxy5x.auto_row_time
>   on source_kafka_rpPDIjxy5x.id=v_meta_attr_processing.attr_id
>   ) ;
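
For context on the difference between the two queries above: in streaming mode a regular join keeps rows from both inputs in Flink state, and that state does not expire unless `table.exec.state.ttl` is set, whereas a lookup join (`FOR SYSTEM_TIME AS OF` on a processing-time attribute) queries the JDBC table per record and keeps no join state. This may be related to the memory growth described, though the report does not confirm the cause. The following is a minimal sketch of a lookup join with a bounded lookup cache. The column types, Kafka topic, broker address, group id, JDBC URL, JDBC table name, and cache sizes are all assumptions made for illustration; only the table names and join columns come from the report.

```sql
-- Sketch only: column types and all connection values below are assumptions.
CREATE TEMPORARY TABLE source_kafka_rpPDIjxy5x (
  `time`  TIMESTAMP(3),
  `value` DOUBLE,
  `id`    STRING,
  proc_time AS PROCTIME()  -- processing-time attribute used by the lookup join
) WITH (
  'connector' = 'kafka',
  'topic' = 'example_topic',                          -- hypothetical
  'properties.bootstrap.servers' = 'localhost:9092',  -- hypothetical
  'properties.group.id' = 'example_group',            -- hypothetical
  'scan.startup.mode' = 'latest-offset',
  'format' = 'json'
);

CREATE TEMPORARY TABLE source_meta_9rzxhLIL4w (
  attr_id  STRING,
  eng_name STRING
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://localhost:3306/example_db',   -- hypothetical
  'table-name' = 'meta_attr',                         -- hypothetical
  -- Bound the lookup cache so cached dimension rows cannot grow without limit.
  'lookup.cache' = 'PARTIAL',
  'lookup.partial-cache.max-rows' = '100000',
  'lookup.partial-cache.expire-after-write' = '10 min'
);

-- Lookup join: each Kafka record triggers a (cached) point query on attr_id.
SELECT k.`time`, k.`value`, k.`id`, m.eng_name
FROM source_kafka_rpPDIjxy5x AS k
JOIN source_meta_9rzxhLIL4w FOR SYSTEM_TIME AS OF k.proc_time AS m
  ON k.id = m.attr_id;
```

If the regular inner join has to stay, setting a state retention such as `SET 'table.exec.state.ttl' = '1 h';` (the value is an arbitrary example) limits how long join state is kept, at the cost of possibly missing matches for rows whose state has expired.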



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
