xueyongyang created FLINK-32011:
-----------------------------------
Summary: Flink 1.15.2 loads all the data of a MySQL 5.7 table into memory
Key: FLINK-32011
URL: https://issues.apache.org/jira/browse/FLINK-32011
Project: Flink
Issue Type: Improvement
Components: Connectors / JDBC
Affects Versions: 1.15.2
Environment: flink1.15.2
mysql5.7
Reporter: xueyongyang
Fix For: 1.15.2
-- Per-merchant aggregate table (MySQL DDL): one row per
-- (merchant, activity, rule, day), enforced by the unique key below.
-- NOTE(review): collation utf8mb4_0900_ai_ci exists only on MySQL 8.0+, while
-- the environment above says MySQL 5.7; on 5.7 this statement fails with
-- "Unknown collation". Confirm which server version this DDL targets.
CREATE TABLE `T_P_FILTER_MERCHANT_DAY_RES2` (
    `MERCHANT_NO` VARCHAR(200)  NOT NULL,
    `ACT_ID`      VARCHAR(50)   NOT NULL,
    `RULE_ID`     VARCHAR(50)   NOT NULL,
    `SUM_MONEY`   DECIMAL(25,5) NULL DEFAULT NULL,
    `SUM_NUM`     DECIMAL(25,5) NULL DEFAULT NULL,
    `DATE_DT`     INT           NOT NULL,           -- NOTE(review): integer-encoded date, presumably yyyyMMdd; confirm
    `DATE_TYPE`   VARCHAR(50)   NULL DEFAULT NULL,
    `BEGIN_DATE`  INT           NULL DEFAULT NULL,
    `END_DATE`    INT           NULL DEFAULT NULL,
    `ID`          BIGINT        NOT NULL AUTO_INCREMENT,  -- surrogate key
    PRIMARY KEY (`ID`),
    -- natural key: merchant + activity + rule + day
    UNIQUE KEY `T_P_FILTER_MERCHANT_DAY_RES2_UN` (`MERCHANT_NO`, `ACT_ID`, `RULE_ID`, `DATE_DT`)
)
ENGINE = InnoDB
AUTO_INCREMENT = 88
DEFAULT CHARACTER SET = utf8mb4
COLLATE = utf8mb4_0900_ai_ci;
-- Result table (MySQL DDL). Every column except WEIGHT, BEGIN_TIME, END_TIME,
-- CALCULATION_DATE and the surrogate ID is nullable.
-- NOTE(review): collation utf8mb4_0900_ai_ci exists only on MySQL 8.0+, while
-- the environment above says MySQL 5.7; on 5.7 this statement fails with
-- "Unknown collation". Confirm which server version this DDL targets.
CREATE TABLE `T_P_RED_STAND_RESULT` (
    `ACTIVITY_NO`         VARCHAR(50)   NULL,
    `RULE_ID`             VARCHAR(50)   NULL,
    `WEIGHT`              INT           NOT NULL,
    `DEAL_TYPE`           VARCHAR(50)   NULL,
    `DATA_TYPE`           VARCHAR(50)   NULL,
    `DATA_NO`             VARCHAR(50)   NULL,
    `USER_TYPE`           VARCHAR(50)   NULL,
    `USER_NO`             VARCHAR(50)   NULL,
    `SUM_MONEY`           DECIMAL(25,5) NULL,
    `SUM_NUM`             DECIMAL(25,5) NULL,
    `DATE_TYPE`           VARCHAR(50)   NULL,
    `BEGIN_TIME`          INT           NOT NULL,
    `END_TIME`            INT           NOT NULL,
    `TRADE_MONEY`         DECIMAL(25,5) NULL DEFAULT NULL,
    `EXTEND_DATA_MEAN`    VARCHAR(50)   NULL,
    `DATA1`               VARCHAR(50)   NULL,
    `DATA2`               VARCHAR(50)   NULL,
    `DATA3`               VARCHAR(50)   NULL,
    `DATA4`               VARCHAR(50)   NULL,
    `DATA5`               VARCHAR(50)   NULL,
    `CK_REACH`            VARCHAR(10)   NULL,
    `READ_STATUS`         VARCHAR(10)   NULL,
    `PROCESS_STATUS`      VARCHAR(50)   NULL,
    `PROCESS_STATUS_DESC` VARCHAR(200)  NULL,
    `RESULT_TYPE`         VARCHAR(50)   NULL,
    `MANAGER_NO`          VARCHAR(50)   NULL,
    `MANAGER_ORG_NO`      VARCHAR(50)   NULL,
    `CALCULATION_DATE`    INT           NOT NULL,
    `CALCULATION_TIME`    DATETIME      NULL DEFAULT NULL,
    `CREATE_TIME`         DATETIME      NULL DEFAULT NULL,
    `CREATE_USER_NO`      VARCHAR(50)   NULL,
    `MAINTENANCE_TIME`    VARCHAR(50)   NULL,  -- NOTE(review): a time stored as text, not DATETIME
    `MAINTENANCE_USER_NO` VARCHAR(50)   NULL,
    `ID`                  BIGINT        NOT NULL AUTO_INCREMENT,  -- surrogate key
    PRIMARY KEY (`ID`),
    -- ACTIVITY_NO and USER_NO are nullable; an InnoDB unique index allows
    -- multiple rows with NULL in a key column, so such rows are not deduplicated.
    UNIQUE KEY `T_P_RED_STAND_RESULT_UN` (`ACTIVITY_NO`, `USER_NO`, `BEGIN_TIME`, `END_TIME`)
)
ENGINE = InnoDB
AUTO_INCREMENT = 13
DEFAULT CHARACTER SET = utf8mb4
COLLATE = utf8mb4_0900_ai_ci;
-- Flink SQL job: for activity 'abc' / rule 'def', turn the DATE_DT = 20221028
-- rows of T_P_FILTER_MERCHANT_DAY_RES2 into 'merchantRed' rows of
-- T_P_RED_STAND_RESULT. A merchant is skipped when T_P_RED_STAND_RESULT already
-- holds a row for it under activity 'abc' that either has CALCULATION_DATE
-- 20221028 with WEIGHT >= 1, or has READ_STATUS '1'.
--
-- NOTE(review): as far as I know the JDBC source in Flink 1.15 does not push
-- WHERE predicates down to MySQL. Filter push-down arrived in a later
-- flink-connector-jdbc release. The r.* and u.* filters below are therefore
-- likely evaluated inside Flink after full-table scans, which would explain the
-- full scan and OOM reported here. Confirm against the connector docs for the
-- version in use. One possible workaround on 1.15 is to point the Flink JDBC
-- table at a MySQL view that already applies these filters.
INSERT INTO T_P_RED_STAND_RESULT (
    ACTIVITY_NO,
    RULE_ID,
    WEIGHT,
    DEAL_TYPE,
    DATA_TYPE,
    DATA_NO,
    USER_TYPE,
    USER_NO,
    SUM_MONEY,
    SUM_NUM,
    DATE_TYPE,
    BEGIN_TIME,
    END_TIME,
    TRADE_MONEY,
    EXTEND_DATA_MEAN,
    DATA1,
    DATA2,
    DATA3,
    DATA4,
    DATA5,
    CK_REACH,
    READ_STATUS,
    PROCESS_STATUS,
    PROCESS_STATUS_DESC,
    RESULT_TYPE,
    MANAGER_NO,
    MANAGER_ORG_NO,
    CALCULATION_DATE,
    CALCULATION_TIME,
    CREATE_TIME,
    CREATE_USER_NO,
    MAINTENANCE_TIME,
    MAINTENANCE_USER_NO
)
SELECT
    'abc'            AS ACTIVITY_NO,
    'def'            AS RULE_ID,
    1                AS WEIGHT,
    'red'            AS DEAL_TYPE,
    'gear'           AS DATA_TYPE,
    '001010102'      AS DATA_NO,
    'merchantRed'    AS USER_TYPE,
    -- Aliases name the TARGET column; INSERT maps columns by position.
    r.MERCHANT_NO    AS USER_NO,
    r.SUM_MONEY      AS SUM_MONEY,
    r.SUM_NUM        AS SUM_NUM,
    r.DATE_TYPE      AS DATE_TYPE,
    r.BEGIN_DATE     AS BEGIN_TIME,
    r.END_DATE       AS END_TIME,
    0                AS TRADE_MONEY,
    'other'          AS EXTEND_DATA_MEAN,
    -- 10% of SUM_MONEY clamped to [10, 100]; a NULL SUM_MONEY yields NULL.
    -- NOTE(review): this expression is numeric while the MySQL column DATA1 is
    -- VARCHAR(50); confirm the Flink sink schema accepts it or add a CAST.
    CASE
        WHEN 0.1 * r.SUM_MONEY < 10  THEN 10
        WHEN 0.1 * r.SUM_MONEY > 100 THEN 100
        ELSE 0.1 * r.SUM_MONEY
    END              AS DATA1,
    ''               AS DATA2,
    ''               AS DATA3,
    ''               AS DATA4,
    ''               AS DATA5,
    '1'              AS CK_REACH,
    '0'              AS READ_STATUS,
    ''               AS PROCESS_STATUS,
    ''               AS PROCESS_STATUS_DESC,
    'flink-batch'    AS RESULT_TYPE,
    ''               AS MANAGER_NO,
    ''               AS MANAGER_ORG_NO,
    r.DATE_DT        AS CALCULATION_DATE,
    LOCALTIMESTAMP   AS CALCULATION_TIME,
    LOCALTIMESTAMP   AS CREATE_TIME,
    'system'         AS CREATE_USER_NO,
    ''               AS MAINTENANCE_TIME,
    ''               AS MAINTENANCE_USER_NO
FROM T_P_FILTER_MERCHANT_DAY_RES2 AS r
WHERE r.ACT_ID = 'abc'
  AND r.RULE_ID = 'def'
  AND r.DATE_DT = 20221028
  -- NOT EXISTS instead of NOT IN: USER_NO is nullable. A single NULL USER_NO
  -- among the matching rows would make NOT IN evaluate to UNKNOWN for every
  -- merchant, so nothing would be inserted.
  -- Skip merchants already calculated for this activity on this date.
  AND NOT EXISTS (
      SELECT 1
      FROM T_P_RED_STAND_RESULT AS u
      WHERE u.USER_NO = r.MERCHANT_NO
        AND u.ACTIVITY_NO = 'abc'
        AND u.CALCULATION_DATE = 20221028
        AND u.WEIGHT >= 1
  )
  -- Skip merchants whose result for this activity has already been read.
  AND NOT EXISTS (
      SELECT 1
      FROM T_P_RED_STAND_RESULT AS u
      WHERE u.USER_NO = r.MERCHANT_NO
        AND u.ACTIVITY_NO = 'abc'
        AND u.READ_STATUS = '1'
  );
When we execute the Flink SQL above, Flink appears to load all the data in
T_P_FILTER_MERCHANT_DAY_RES2 into memory. It then seems to convert the WHERE
clause into a Java filter condition and apply that filter to the in-memory
rows, instead of sending the filter to MySQL. We draw this conclusion for two reasons:
1. The Flink job fails with an out-of-memory (OOM) error.
2. Our DBA reported that a full-table query was executed against the
T_P_FILTER_MERCHANT_DAY_RES2 table.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)