[
https://issues.apache.org/jira/browse/FLINK-29772?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
shizhengchao updated FLINK-29772:
---------------------------------
Description:
{code:java}
//
"Source: TableSourceScan(table=[[ostream, user_mart, dwd_ads_isms_msgmiddle,
watermark=[-(toTimeStamps($2), 10000:INTERVAL SECOND)]]], fields=[data_type,
cluster_name, server_time, server_time_s, client_time, client_time_s, imei,
request_id, owner_id, service_id, content_id, sign_id, receiver_type, msg_type,
handle_type, reach_type, source_type, create_time, msg_id, imsi,
array_info_imei, phone, channel_id, process_time, code, msg, receiver,
content_type, android_version, apk_version]) -> Calc(select=[data_type,
server_time, client_time, msg_id, array_info_imei, code, PROCTIME() AS
proctime, Reinterpret(toTimeStamps(server_time)) AS rowtime]) ->
Calc(select=[array_info_imei AS imei, REPLACE(msg_id, _UTF-16LE'#',
_UTF-16LE'') AS msg_id, CASE((SEARCH(server_time, Sarg[(-∞.._UTF-16LE'NULL'),
(_UTF-16LE'NULL'.._UTF-16LE'null'), (_UTF-16LE'null'..+∞)]:CHAR(4) CHARACTER
SET "UTF-16LE") AND server_time IS NOT NULL), server_time,
CAST(FROM_UNIXTIME(CAST(SUBSTRING(CAST(PROCTIME_MATERIALIZE(proctime)), 0,
10))))) AS server_time, CASE((SEARCH(client_time, Sarg[(-∞.._UTF-16LE'NULL'),
(_UTF-16LE'NULL'.._UTF-16LE'null'), (_UTF-16LE'null'..+∞)]:CHAR(4) CHARACTER
SET "UTF-16LE") AND client_time IS NOT NULL), client_time,
CAST(FROM_UNIXTIME(CAST(SUBSTRING(CAST(PROCTIME_MATERIALIZE(proctime)), 0,
10))))) AS client_time, IF(((data_type =
_UTF-16LE'sms-netmsg-send':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND
(code = _UTF-16LE'0':VARCHAR(2147483647) CHARACTER SET "UTF-16LE")), 1, 0) AS
send_cnt, IF(((data_type = _UTF-16LE'sms-netmsg-callback':VARCHAR(2147483647)
CHARACTER SET "UTF-16LE") AND (code = _UTF-16LE'0':VARCHAR(2147483647)
CHARACTER SET "UTF-16LE")), 1, 0) AS reach_cnt, rowtime],
where=[SEARCH(data_type, Sarg[_UTF-16LE'sms-netmsg-callback':VARCHAR(19)
CHARACTER SET "UTF-16LE", _UTF-16LE'sms-netmsg-send':VARCHAR(19) CHARACTER SET
"UTF-16LE"]:VARCHAR(19) CHARACTER SET "UTF-16LE")]) (22/24)#0" Id=77
{color:red}BLOCKED on java.lang.Object@4aa3fe44 owned by{color} "Legacy Source
Thread - Source: TableSourceScan(table=[[ostream, user_mart,
dwd_ads_isms_msgmiddle, watermark=[-(toTimeStamps($2), 10000:INTERVAL
SECOND)]]], fields=[data_type, cluster_name, server_time, server_time_s,
client_time, client_time_s, imei, request_id, owner_id, service_id, content_id,
sign_id, receiver_type, msg_type, handle_type, reach_type, source_type,
create_time, msg_id, imsi, array_info_imei, phone, channel_id, process_time,
code, msg, receiver, content_type, android_version, apk_version]) ->
Calc(select=[data_type, server_time, client_time, msg_id, array_info_imei,
code, PROCTIME() AS proctime, Reinterpret(toTimeStamps(server_time)) AS
rowtime]) -> Calc(select=[array_info_imei AS imei, REPLACE(msg_id,
_UTF-16LE'#', _UTF-16LE'') AS msg_id, CASE((SEARCH(server_time,
Sarg[(-∞.._UTF-16LE'NULL'), (_UTF-16LE'NULL'.._UTF-16LE'null'),
(_UTF-16LE'null'..+∞)]:CHAR(4) CHARACTER SET "UTF-16LE") AND server_time IS NOT
NULL), server_time,
CAST(FROM_UNIXTIME(CAST(SUBSTRING(CAST(PROCTIME_MATERIALIZE(proctime)), 0,
10))))) AS server_time, CASE((SEARCH(client_time, Sarg[(-∞.._UTF-16LE'NULL'),
(_UTF-16LE'NULL'.._UTF-16LE'null'), (_UTF-16LE'null'..+∞)]:CHAR(4) CHARACTER
SET "UTF-16LE") AND client_time IS NOT NULL), client_time,
CAST(FROM_UNIXTIME(CAST(SUBSTRING(CAST(PROCTIME_MATERIALIZE(proctime)), 0,
10))))) AS client_time, IF(((data_type =
_UTF-16LE'sms-netmsg-send':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND
(code = _UTF-16LE'0':VARCHAR(2147483647) CHARACTER SET "UTF-16LE")), 1, 0) AS
send_cnt, IF(((data_type = _UTF-16LE'sms-netmsg-callback':VARCHAR(2147483647)
CHARACTER SET "UTF-16LE") AND (code = _UTF-16LE'0':VARCHAR(2147483647)
CHARACTER SET "UTF-16LE")), 1, 0) AS reach_cnt, rowtime],
where=[SEARCH(data_type, Sarg[_UTF-16LE'sms-netmsg-callback':VARCHAR(19)
CHARACTER SET "UTF-16LE", _UTF-16LE'sms-netmsg-send':VARCHAR(19) CHARACTER SET
"UTF-16LE"]:VARCHAR(19) CHARACTER SET "UTF-16LE")]) (22/24)#0" Id=87
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
-{color:red} blocked on java.lang.Object@4aa3fe44{color}
at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:344)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:330)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:202)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:684)
at org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:639)
at org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$556/1624913200.run(Unknown Source)
{code}
was:
{code:java}
//
"Source: TableSourceScan(table=[[ostream, user_mart, dwd_ads_isms_msgmiddle,
watermark=[-(toTimeStamps($2), 10000:INTERVAL SECOND)]]], fields=[data_type,
cluster_name, server_time, server_time_s, client_time, client_time_s, imei,
request_id, owner_id, service_id, content_id, sign_id, receiver_type, msg_type,
handle_type, reach_type, source_type, create_time, msg_id, imsi,
array_info_imei, phone, channel_id, process_time, code, msg, receiver,
content_type, android_version, apk_version]) -> Calc(select=[data_type,
server_time, client_time, msg_id, array_info_imei, code, PROCTIME() AS
proctime, Reinterpret(toTimeStamps(server_time)) AS rowtime]) ->
Calc(select=[array_info_imei AS imei, REPLACE(msg_id, _UTF-16LE'#',
_UTF-16LE'') AS msg_id, CASE((SEARCH(server_time, Sarg[(-∞.._UTF-16LE'NULL'),
(_UTF-16LE'NULL'.._UTF-16LE'null'), (_UTF-16LE'null'..+∞)]:CHAR(4) CHARACTER
SET "UTF-16LE") AND server_time IS NOT NULL), server_time,
CAST(FROM_UNIXTIME(CAST(SUBSTRING(CAST(PROCTIME_MATERIALIZE(proctime)), 0,
10))))) AS server_time, CASE((SEARCH(client_time, Sarg[(-∞.._UTF-16LE'NULL'),
(_UTF-16LE'NULL'.._UTF-16LE'null'), (_UTF-16LE'null'..+∞)]:CHAR(4) CHARACTER
SET "UTF-16LE") AND client_time IS NOT NULL), client_time,
CAST(FROM_UNIXTIME(CAST(SUBSTRING(CAST(PROCTIME_MATERIALIZE(proctime)), 0,
10))))) AS client_time, IF(((data_type =
_UTF-16LE'sms-netmsg-send':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND
(code = _UTF-16LE'0':VARCHAR(2147483647) CHARACTER SET "UTF-16LE")), 1, 0) AS
send_cnt, IF(((data_type = _UTF-16LE'sms-netmsg-callback':VARCHAR(2147483647)
CHARACTER SET "UTF-16LE") AND (code = _UTF-16LE'0':VARCHAR(2147483647)
CHARACTER SET "UTF-16LE")), 1, 0) AS reach_cnt, rowtime],
where=[SEARCH(data_type, Sarg[_UTF-16LE'sms-netmsg-callback':VARCHAR(19)
CHARACTER SET "UTF-16LE", _UTF-16LE'sms-netmsg-send':VARCHAR(19) CHARACTER SET
"UTF-16LE"]:VARCHAR(19) CHARACTER SET "UTF-16LE")]) (22/24)#0" Id=77 BLOCKED on
java.lang.Object@4aa3fe44 owned by "Legacy Source Thread - Source:
TableSourceScan(table=[[ostream, user_mart, dwd_ads_isms_msgmiddle,
watermark=[-(toTimeStamps($2), 10000:INTERVAL SECOND)]]], fields=[data_type,
cluster_name, server_time, server_time_s, client_time, client_time_s, imei,
request_id, owner_id, service_id, content_id, sign_id, receiver_type, msg_type,
handle_type, reach_type, source_type, create_time, msg_id, imsi,
array_info_imei, phone, channel_id, process_time, code, msg, receiver,
content_type, android_version, apk_version]) -> Calc(select=[data_type,
server_time, client_time, msg_id, array_info_imei, code, PROCTIME() AS
proctime, Reinterpret(toTimeStamps(server_time)) AS rowtime]) ->
Calc(select=[array_info_imei AS imei, REPLACE(msg_id, _UTF-16LE'#',
_UTF-16LE'') AS msg_id, CASE((SEARCH(server_time, Sarg[(-∞.._UTF-16LE'NULL'),
(_UTF-16LE'NULL'.._UTF-16LE'null'), (_UTF-16LE'null'..+∞)]:CHAR(4) CHARACTER
SET "UTF-16LE") AND server_time IS NOT NULL), server_time,
CAST(FROM_UNIXTIME(CAST(SUBSTRING(CAST(PROCTIME_MATERIALIZE(proctime)), 0,
10))))) AS server_time, CASE((SEARCH(client_time, Sarg[(-∞.._UTF-16LE'NULL'),
(_UTF-16LE'NULL'.._UTF-16LE'null'), (_UTF-16LE'null'..+∞)]:CHAR(4) CHARACTER
SET "UTF-16LE") AND client_time IS NOT NULL), client_time,
CAST(FROM_UNIXTIME(CAST(SUBSTRING(CAST(PROCTIME_MATERIALIZE(proctime)), 0,
10))))) AS client_time, IF(((data_type =
_UTF-16LE'sms-netmsg-send':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND
(code = _UTF-16LE'0':VARCHAR(2147483647) CHARACTER SET "UTF-16LE")), 1, 0) AS
send_cnt, IF(((data_type = _UTF-16LE'sms-netmsg-callback':VARCHAR(2147483647)
CHARACTER SET "UTF-16LE") AND (code = _UTF-16LE'0':VARCHAR(2147483647)
CHARACTER SET "UTF-16LE")), 1, 0) AS reach_cnt, rowtime],
where=[SEARCH(data_type, Sarg[_UTF-16LE'sms-netmsg-callback':VARCHAR(19)
CHARACTER SET "UTF-16LE", _UTF-16LE'sms-netmsg-send':VARCHAR(19) CHARACTER SET
"UTF-16LE"]:VARCHAR(19) CHARACTER SET "UTF-16LE")]) (22/24)#0" Id=87
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
- blocked on java.lang.Object@4aa3fe44
at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:344)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:330)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:202)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:684)
at org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:639)
at org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$556/1624913200.run(Unknown Source)
{code}
> Kafka table source scan blocked
> -------------------------------
>
> Key: FLINK-29772
> URL: https://issues.apache.org/jira/browse/FLINK-29772
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Kafka
> Affects Versions: 1.13.2
> Reporter: shizhengchao
> Priority: Major
>
> {code:java}
> //
> "Source: TableSourceScan(table=[[ostream, user_mart, dwd_ads_isms_msgmiddle,
> watermark=[-(toTimeStamps($2), 10000:INTERVAL SECOND)]]], fields=[data_type,
> cluster_name, server_time, server_time_s, client_time, client_time_s, imei,
> request_id, owner_id, service_id, content_id, sign_id, receiver_type,
> msg_type, handle_type, reach_type, source_type, create_time, msg_id, imsi,
> array_info_imei, phone, channel_id, process_time, code, msg, receiver,
> content_type, android_version, apk_version]) -> Calc(select=[data_type,
> server_time, client_time, msg_id, array_info_imei, code, PROCTIME() AS
> proctime, Reinterpret(toTimeStamps(server_time)) AS rowtime]) ->
> Calc(select=[array_info_imei AS imei, REPLACE(msg_id, _UTF-16LE'#',
> _UTF-16LE'') AS msg_id, CASE((SEARCH(server_time, Sarg[(-∞.._UTF-16LE'NULL'),
> (_UTF-16LE'NULL'.._UTF-16LE'null'), (_UTF-16LE'null'..+∞)]:CHAR(4) CHARACTER
> SET "UTF-16LE") AND server_time IS NOT NULL), server_time,
> CAST(FROM_UNIXTIME(CAST(SUBSTRING(CAST(PROCTIME_MATERIALIZE(proctime)), 0,
> 10))))) AS server_time, CASE((SEARCH(client_time, Sarg[(-∞.._UTF-16LE'NULL'),
> (_UTF-16LE'NULL'.._UTF-16LE'null'), (_UTF-16LE'null'..+∞)]:CHAR(4) CHARACTER
> SET "UTF-16LE") AND client_time IS NOT NULL), client_time,
> CAST(FROM_UNIXTIME(CAST(SUBSTRING(CAST(PROCTIME_MATERIALIZE(proctime)), 0,
> 10))))) AS client_time, IF(((data_type =
> _UTF-16LE'sms-netmsg-send':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND
> (code = _UTF-16LE'0':VARCHAR(2147483647) CHARACTER SET "UTF-16LE")), 1, 0) AS
> send_cnt, IF(((data_type = _UTF-16LE'sms-netmsg-callback':VARCHAR(2147483647)
> CHARACTER SET "UTF-16LE") AND (code = _UTF-16LE'0':VARCHAR(2147483647)
> CHARACTER SET "UTF-16LE")), 1, 0) AS reach_cnt, rowtime],
> where=[SEARCH(data_type, Sarg[_UTF-16LE'sms-netmsg-callback':VARCHAR(19)
> CHARACTER SET "UTF-16LE", _UTF-16LE'sms-netmsg-send':VARCHAR(19) CHARACTER
> SET "UTF-16LE"]:VARCHAR(19) CHARACTER SET "UTF-16LE")]) (22/24)#0" Id=77
> {color:red}BLOCKED on java.lang.Object@4aa3fe44 owned by{color} "Legacy
> Source Thread - Source: TableSourceScan(table=[[ostream, user_mart,
> dwd_ads_isms_msgmiddle, watermark=[-(toTimeStamps($2), 10000:INTERVAL
> SECOND)]]], fields=[data_type, cluster_name, server_time, server_time_s,
> client_time, client_time_s, imei, request_id, owner_id, service_id,
> content_id, sign_id, receiver_type, msg_type, handle_type, reach_type,
> source_type, create_time, msg_id, imsi, array_info_imei, phone, channel_id,
> process_time, code, msg, receiver, content_type, android_version,
> apk_version]) -> Calc(select=[data_type, server_time, client_time, msg_id,
> array_info_imei, code, PROCTIME() AS proctime,
> Reinterpret(toTimeStamps(server_time)) AS rowtime]) ->
> Calc(select=[array_info_imei AS imei, REPLACE(msg_id, _UTF-16LE'#',
> _UTF-16LE'') AS msg_id, CASE((SEARCH(server_time, Sarg[(-∞.._UTF-16LE'NULL'),
> (_UTF-16LE'NULL'.._UTF-16LE'null'), (_UTF-16LE'null'..+∞)]:CHAR(4) CHARACTER
> SET "UTF-16LE") AND server_time IS NOT NULL), server_time,
> CAST(FROM_UNIXTIME(CAST(SUBSTRING(CAST(PROCTIME_MATERIALIZE(proctime)), 0,
> 10))))) AS server_time, CASE((SEARCH(client_time, Sarg[(-∞.._UTF-16LE'NULL'),
> (_UTF-16LE'NULL'.._UTF-16LE'null'), (_UTF-16LE'null'..+∞)]:CHAR(4) CHARACTER
> SET "UTF-16LE") AND client_time IS NOT NULL), client_time,
> CAST(FROM_UNIXTIME(CAST(SUBSTRING(CAST(PROCTIME_MATERIALIZE(proctime)), 0,
> 10))))) AS client_time, IF(((data_type =
> _UTF-16LE'sms-netmsg-send':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND
> (code = _UTF-16LE'0':VARCHAR(2147483647) CHARACTER SET "UTF-16LE")), 1, 0) AS
> send_cnt, IF(((data_type = _UTF-16LE'sms-netmsg-callback':VARCHAR(2147483647)
> CHARACTER SET "UTF-16LE") AND (code = _UTF-16LE'0':VARCHAR(2147483647)
> CHARACTER SET "UTF-16LE")), 1, 0) AS reach_cnt, rowtime],
> where=[SEARCH(data_type, Sarg[_UTF-16LE'sms-netmsg-callback':VARCHAR(19)
> CHARACTER SET "UTF-16LE", _UTF-16LE'sms-netmsg-send':VARCHAR(19) CHARACTER
> SET "UTF-16LE"]:VARCHAR(19) CHARACTER SET "UTF-16LE")]) (22/24)#0" Id=87
> at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
> -{color:red} blocked on java.lang.Object@4aa3fe44{color}
> at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
> at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:344)
> at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:330)
> at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:202)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:684)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:639)
> at org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$556/1624913200.run(Unknown Source)
> {code}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)