[
https://issues.apache.org/jira/browse/FLINK-32011?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17719803#comment-17719803
]
Benchao Li edited comment on FLINK-32011 at 5/5/23 12:01 PM:
-------------------------------------------------------------
JDBC filter pushdown was introduced via FLINK-16024, and it's released in
jdbc-3.1 with flink 1.16/1.17 versions. Unfortunately flink 1.15 does not
support this.
was (Author: libenchao):
JDBC filter pushdown was introduced via FLINK-16024, and it's released in
jdbc-3.1 with flink 1.6/1.7 versions. Unfortunately flink 1.5 does not support
this.
> flink1.15.2 loaded all the data in the table in mysql5.7
> --------------------------------------------------------
>
> Key: FLINK-32011
> URL: https://issues.apache.org/jira/browse/FLINK-32011
> Project: Flink
> Issue Type: Improvement
> Components: Connectors / JDBC
> Affects Versions: 1.15.2
> Environment: flink1.15.2
> mysql5.7
> Reporter: xueyongyang
> Priority: Minor
> Fix For: 1.15.2
>
>
> CREATE TABLE `T_P_FILTER_MERCHANT_DAY_RES2` (
> `MERCHANT_NO` varchar(200) NOT NULL,
> `ACT_ID` varchar(50) NOT NULL,
> `RULE_ID` varchar(50) NOT NULL,
> `SUM_MONEY` decimal(25,5) DEFAULT NULL,
> `SUM_NUM` decimal(25,5) DEFAULT NULL,
> `DATE_DT` int NOT NULL,
> `DATE_TYPE` varchar(50) DEFAULT NULL,
> `BEGIN_DATE` int DEFAULT NULL,
> `END_DATE` int DEFAULT NULL,
> `ID` bigint NOT NULL AUTO_INCREMENT,
> PRIMARY KEY (`ID`),
> UNIQUE KEY `T_P_FILTER_MERCHANT_DAY_RES2_UN`
> (`MERCHANT_NO`,`ACT_ID`,`RULE_ID`,`DATE_DT`)
> ) ENGINE=InnoDB AUTO_INCREMENT=88 DEFAULT CHARSET=utf8mb4
> COLLATE=utf8mb4_0900_ai_ci;
>
> CREATE TABLE `T_P_RED_STAND_RESULT` (
> `ACTIVITY_NO` varchar(50) ,
> `RULE_ID` varchar(50) ,
> `WEIGHT` int NOT NULL ,
> `DEAL_TYPE` varchar(50) ,
> `DATA_TYPE` varchar(50) ,
> `DATA_NO` varchar(50) ,
> `USER_TYPE` varchar(50) ,
> `USER_NO` varchar(50) ,
> `SUM_MONEY` decimal(25,5) ,
> `SUM_NUM` decimal(25,5) ,
> `DATE_TYPE` varchar(50) ,
> `BEGIN_TIME` int NOT NULL ,
> `END_TIME` int NOT NULL ,
> `TRADE_MONEY` decimal(25,5) DEFAULT NULL,
> `EXTEND_DATA_MEAN` varchar(50) ,
> `DATA1` varchar(50) ,
> `DATA2` varchar(50) ,
> `DATA3` varchar(50) ,
> `DATA4` varchar(50) ,
> `DATA5` varchar(50) ,
> `CK_REACH` varchar(10) ,
> `READ_STATUS` varchar(10) ,
> `PROCESS_STATUS` varchar(50) ,
> `PROCESS_STATUS_DESC` varchar(200) ,
> `RESULT_TYPE` varchar(50) ,
> `MANAGER_NO` varchar(50) ,
> `MANAGER_ORG_NO` varchar(50) ,
> `CALCULATION_DATE` int NOT NULL ,
> `CALCULATION_TIME` datetime DEFAULT NULL ,
> `CREATE_TIME` datetime DEFAULT NULL ,
> `CREATE_USER_NO` varchar(50) ,
> `MAINTENANCE_TIME` varchar(50) ,
> `MAINTENANCE_USER_NO` varchar(50) ,
> `ID` bigint NOT NULL AUTO_INCREMENT,
> PRIMARY KEY (`ID`),
> UNIQUE KEY `T_P_RED_STAND_RESULT_UN`
> (`ACTIVITY_NO`,`USER_NO`,`BEGIN_TIME`,`END_TIME`)
> ) ENGINE=InnoDB AUTO_INCREMENT=13 DEFAULT CHARSET=utf8mb4
> COLLATE=utf8mb4_0900_ai_ci;
>
> INSERT INTO
> T_P_RED_STAND_RESULT ( ACTIVITY_NO,
> RULE_ID,
> WEIGHT,
> DEAL_TYPE,
> DATA_TYPE,
> DATA_NO,
> USER_TYPE,
> USER_NO,
> SUM_MONEY,
> SUM_NUM,
> DATE_TYPE,
> BEGIN_TIME,
> END_TIME,
> TRADE_MONEY,
> EXTEND_DATA_MEAN,
> DATA1,
> DATA2,
> DATA3,
> DATA4,
> DATA5,
> CK_REACH,
> READ_STATUS,
> PROCESS_STATUS,
> PROCESS_STATUS_DESC,
> RESULT_TYPE,
> MANAGER_NO,
> MANAGER_ORG_NO,
> CALCULATION_DATE,
> CALCULATION_TIME,
> CREATE_TIME,
> CREATE_USER_NO,
> MAINTENANCE_TIME,
> MAINTENANCE_USER_NO)
> select
> 'abc' as ACTIVITY_NO,
> 'def' as RULE_ID,
> 1 as WEIGHT,
> 'red' as DEAL_TYPE,
> 'gear' as DATA_TYPE,
> '001010102' as DATA_NO,
> 'merchantRed' as USER_TYPE,
> r.MERCHANT_NO as MERCHANT_NO,
> r.SUM_MONEY as SUM_MONEY,
> r.SUM_NUM as SUM_NUM ,
> r.DATE_TYPE as DATE_TYPE ,
> r.BEGIN_DATE as BEGIN_DATE,
> r.END_DATE as END_DATE,
> 0 as TRADE_MONEY,
> 'other' as EXTEND_DATA_MEAN,
> if(0.1 * r.SUM_MONEY >= 10 and 0.1 * r.SUM_MONEY <= 100, 0.1 *
> r.SUM_MONEY, if(0.1 * r.SUM_MONEY<10, 10, if(0.1 * r.SUM_MONEY>100, 100, 0.1
> * r.SUM_MONEY))) as DATA1,
> '' as DATA2,
> '' as DATA3,
> '' as DATA4,
> '' as DATA5,
> '1' as CK_REACH,
> '0' as READ_STATUS,
> '' as PROCESS_STATUS,
> '' as PROCESS_STATUS_DESC,
> 'flink-batch' as RESULT_TYPE,
> '' as MANAGER_NO,
> '' as MANAGER_ORG_NO,
> r.DATE_DT as CALCULATION_DATE,
> LOCALTIMESTAMP as CALCULATION_TIME,
> LOCALTIMESTAMP as CREATE_TIME,
> 'system' as CREATE_USER_NO,
> '' as MAINTENANCE_TIME,
> '' as MAINTENANCE_USER_NO
> from
> T_P_FILTER_MERCHANT_OTHER_DATE_RES2 r
> where
> 1 = 1
> and r.ACT_ID = 'abc'
> and r.RULE_ID = 'def'
> and r.DATE_DT = 20221028
> and r.MERCHANT_NO not in ( select u.USER_NO from T_P_RED_STAND_RESULT u
> where u.ACTIVITY_NO = 'abc' and u.CALCULATION_DATE = 20221028 and u.WEIGHT>=1)
> and r.MERCHANT_NO not in ( select u.USER_NO from T_P_RED_STAND_RESULT u
> where u.ACTIVITY_NO = 'abc' and u.READ_STATUS = '1')
>
> We found that when executing the above flink sql, we feel that flink has
> loaded all the data in T_P_FILTER_MERCHANT_DAY_RES2 into the memory, and then
> converted the where statement into a java filter condition, and fetched the
> filtered data in the memory. We have this judgment for two reasons
> 1. The error message of flink is oom, memory overflow
> 2. The feedback from the dba said that we have done a full table query of the
> T_P_FILTER_MERCHANT_DAY_RES2 table
>
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)