Feifan Wang created FLINK-37318:
-----------------------------------
Summary: RocksDBKeyedStateBackend disposed before task exit
Key: FLINK-37318
URL: https://issues.apache.org/jira/browse/FLINK-37318
Project: Flink
Issue Type: Bug
Components: Runtime / State Backends
Affects Versions: 1.12.2
Reporter: Feifan Wang
We encountered two cases in the same job where the RocksDBKeyedStateBackend was disposed
before the task was canceled, as shown in the following logs:
{code:java}
2024-12-01 12:12:12,703 INFO org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend - Closed RocksDB State Backend. Cleaning up RocksDB working directory /data4/hadoop/yarn/nm-local-dir/usercache/hadoop-rt/appcache/application_1725359226161_2973711/flink-io-a9a5e6ba-f582-40e0-a6f1-f4221911de28/job_34a8565e51c2c2e22bae59f537e5775f_op_KeyedProcessOperator_174cfe0875f20e2c391c93234c3d8810__65_80__uuid_01262a91-5f77-4f57-9cc1-edf586a5d799.
2024-12-01 12:12:21,866 WARN org.apache.flink.runtime.taskmanager.Task - OverAggregate(partitionBy=[dpid, $18], orderBy=[stat_time ASC], window=[ ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], sele...id, imei, idfa, mac_address, client_ip, device_model, os_version, os, extension, stat_time, w0$o0 AS rnk, (w0$o0 - 1) AS $f19])) (65/80)#0 (3e1d0680c28b9bc1e58fc5eaf341e6a9) switched from RUNNING to FAILED.
org.apache.flink.util.FlinkRuntimeException: Error while deserializing the user key.
    at org.apache.flink.contrib.streaming.state.RocksDBMapState$RocksDBMapEntry.getKey(RocksDBMapState.java:486)
    at org.apache.flink.contrib.streaming.state.RocksDBMapState$1.next(RocksDBMapState.java:197)
    at org.apache.flink.table.runtime.operators.over.AbstractRowTimeUnboundedPrecedingOver.onTimer(AbstractRowTimeUnboundedPrecedingOver.java:203)
    at org.apache.flink.streaming.api.operators.KeyedProcessOperator.invokeUserFunction(KeyedProcessOperator.java:91)
    at org.apache.flink.streaming.api.operators.KeyedProcessOperator.onEventTime(KeyedProcessOperator.java:70)
    at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.advanceWatermark(InternalTimerServiceImpl.java:302)
    at org.apache.flink.streaming.api.operators.InternalTimeServiceManagerImpl.advanceWatermark(InternalTimeServiceManagerImpl.java:194)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:626)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitWatermark(OneInputStreamTask.java:219)
    at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:196)
    at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:105)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:206)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:174)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:400)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:629)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:585)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:767)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:578)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.EOFException
    at org.apache.flink.core.memory.DataInputDeserializer.readLong(DataInputDeserializer.java:233)
    at org.apache.flink.api.common.typeutils.base.LongSerializer.deserialize(LongSerializer.java:72)
    at org.apache.flink.api.common.typeutils.base.LongSerializer.deserialize(LongSerializer.java:30)
    at org.apache.flink.contrib.streaming.state.RocksDBMapState.deserializeUserKey(RocksDBMapState.java:380)
    at org.apache.flink.contrib.streaming.state.RocksDBMapState.access$000(RocksDBMapState.java:64)
    at org.apache.flink.contrib.streaming.state.RocksDBMapState$RocksDBMapEntry.getKey(RocksDBMapState.java:483)
    ... 20 more
2024-12-01 12:12:21,868 INFO org.apache.flink.runtime.taskmanager.Task - Freeing task resources for OverAggregate(partitionBy=[dpid, $18], orderBy=[stat_time ASC], window=[ ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], sele...id, imei, idfa, mac_address, client_ip, device_model, os_version, os, extension, stat_time, w0$o0 AS rnk, (w0$o0 - 1) AS $f19])) (65/80)#0 (3e1d0680c28b9bc1e58fc5eaf341e6a9).
2024-12-01 12:12:21,895 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Un-registering task and sending final execution state FAILED to JobManager for task OverAggregate(partitionBy=[dpid, $18], orderBy=[stat_time ASC], window=[ ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], sele...id, imei, idfa, mac_address, client_ip, device_model, os_version, os, extension, stat_time, w0$o0 AS rnk, (w0$o0 - 1) AS $f19])) (65/80)#0 3e1d0680c28b9bc1e58fc5eaf341e6a9.
2024-12-01 12:12:21,964 INFO org.apache.flink.runtime.taskmanager.Task - Attempting to cancel task IntervalJoin(joinType=[LeftOuterJoin], windowBounds=[isRowTime=true, leftLowerBound=-600000, leftUpperBound=600000, leftTimeInde...a, mac_address, client_ip, device_model, os_version, os, extension, stat_time, session_id0, event_timestamp0, page_identifier0]) (49/80)#0 (a533508520a4898f3658291d2eb7d509).
2024-12-01 12:12:21,964 INFO org.apache.flink.runtime.taskmanager.Task - IntervalJoin(joinType=[LeftOuterJoin], windowBounds=[isRowTime=true, leftLowerBound=-600000, leftUpperBound=600000, leftTimeInde...a, mac_address, client_ip, device_model, os_version, os, extension, stat_time, session_id0, event_timestamp0, page_identifier0]) (49/80)#0 (a533508520a4898f3658291d2eb7d509) switched from RUNNING to CANCELING.
2024-12-01 12:12:21,964 INFO org.apache.flink.runtime.taskmanager.Task - Triggering cancellation of task code IntervalJoin(joinType=[LeftOuterJoin], windowBounds=[isRowTime=true, leftLowerBound=-600000, leftUpperBound=600000, leftTimeInde...a, mac_address, client_ip, device_model, os_version, os, extension, stat_time, session_id0, event_timestamp0, page_identifier0]) (49/80)#0 (a533508520a4898f3658291d2eb7d509).
2024-12-01 12:12:21,971 INFO org.apache.flink.runtime.taskmanager.Task - Attempting to cancel task IntervalJoin(joinType=[LeftOuterJoin], windowBounds=[isRowTime=true, leftLowerBound=-600000, leftUpperBound=600000, leftTimeInde...a, mac_address, client_ip, device_model, os_version, os, extension, stat_time, session_id0, event_timestamp0, page_identifier0]) (65/80)#0 (54c43203bab937f639cf9564425bface).
2024-12-01 12:12:21,971 INFO org.apache.flink.runtime.taskmanager.Task - IntervalJoin(joinType=[LeftOuterJoin], windowBounds=[isRowTime=true, leftLowerBound=-600000, leftUpperBound=600000, leftTimeInde...a, mac_address, client_ip, device_model, os_version, os, extension, stat_time, session_id0, event_timestamp0, page_identifier0]) (65/80)#0 (54c43203bab937f639cf9564425bface) switched from RUNNING to CANCELING.
2024-12-01 12:12:21,971 INFO org.apache.flink.runtime.taskmanager.Task - Triggering cancellation of task code IntervalJoin(joinType=[LeftOuterJoin], windowBounds=[isRowTime=true, leftLowerBound=-600000, leftUpperBound=600000, leftTimeInde...a, mac_address, client_ip, device_model, os_version, os, extension, stat_time, session_id0, event_timestamp0, page_identifier0]) (65/80)#0 (54c43203bab937f639cf9564425bface).
2024-12-01 12:12:21,972 INFO org.apache.flink.runtime.taskmanager.Task - Attempting to cancel task IntervalJoin(joinType=[LeftOuterJoin], windowBounds=[isRowTime=true, leftLowerBound=-600000, leftUpperBound=600000, leftTimeInde...a, mac_address, client_ip, device_model, os_version, os, extension, stat_time, session_id0, event_timestamp0, page_identifier0]) (17/80)#0 (51c0a203e2dce0cceb2da1a1f992cd69).
2024-12-01 12:12:21,972 INFO org.apache.flink.runtime.taskmanager.Task - IntervalJoin(joinType=[LeftOuterJoin], windowBounds=[isRowTime=true, leftLowerBound=-600000, leftUpperBound=600000, leftTimeInde...a, mac_address, client_ip, device_model, os_version, os, extension, stat_time, session_id0, event_timestamp0, page_identifier0]) (17/80)#0 (51c0a203e2dce0cceb2da1a1f992cd69) switched from RUNNING to CANCELING.
2024-12-01 12:12:21,972 INFO org.apache.flink.runtime.taskmanager.Task - Triggering cancellation of task code IntervalJoin(joinType=[LeftOuterJoin], windowBounds=[isRowTime=true, leftLowerBound=-600000, leftUpperBound=600000, leftTimeInde...a, mac_address, client_ip, device_model, os_version, os, extension, stat_time, session_id0, event_timestamp0, page_identifier0]) (17/80)#0 (51c0a203e2dce0cceb2da1a1f992cd69).
2024-12-01 12:12:21,973 INFO org.apache.flink.runtime.taskmanager.Task - Attempting to cancel task IntervalJoin(joinType=[LeftOuterJoin], windowBounds=[isRowTime=true, leftLowerBound=-600000, leftUpperBound=600000, leftTimeInde...a, mac_address, client_ip, device_model, os_version, os, extension, stat_time, session_id0, event_timestamp0, page_identifier0]) (33/80)#0 (da848644a3859496193cbe46a3730bfd).
{code}
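The root cause in the trace above is an `EOFException` raised while `LongSerializer` reads the map's user key, meaning fewer than 8 bytes were left where a `long` was expected. The sketch below reproduces only that failure mode. It is an illustration under assumptions: the JDK's `DataInputStream` stands in for Flink's `DataInputDeserializer`, the class name `ShortKeyEofDemo` is hypothetical, and nothing here touches RocksDB. How the key bytes came to be short is not established; reading through an iterator of an already-closed RocksDB instance is one possibility consistent with the timeline, not a confirmed cause.

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;

// Hypothetical demo class: shows that reading a long from < 8 bytes ends in EOFException.
class ShortKeyEofDemo {

    // Reads one big-endian long, as a LongSerializer does for a Long user key.
    // DataInputStream is an assumed stand-in for Flink's DataInputDeserializer.
    static long readLongFrom(byte[] bytes) throws IOException {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes))) {
            return in.readLong();
        }
    }

    // Returns true if deserializing a long from these bytes runs out of input.
    static boolean failsWithEof(byte[] bytes) {
        try {
            readLongFrom(bytes);
            return false;
        } catch (EOFException e) {
            return true;
        } catch (IOException e) {
            throw new IllegalStateException("unexpected I/O failure", e);
        }
    }

    public static void main(String[] args) {
        byte[] intact = {0, 0, 0, 0, 0, 0, 0, 42}; // 8 bytes: one complete long
        byte[] truncated = {0, 0, 0, 42};           // only 4 bytes available
        System.out.println("intact    -> EOF? " + failsWithEof(intact));
        System.out.println("truncated -> EOF? " + failsWithEof(truncated));
    }
}
```

Running `main` prints `false` for the intact key and `true` for the truncated one, matching the `Caused by: java.io.EOFException` at `readLong` in the log.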
Please note that there is no task-cancellation log entry before the "Closed RocksDB State Backend"
message at 2024-12-01 12:12:12,703. The RocksDBMapState deserialization exception
then appears about 9 seconds later, at 12:12:21,866.
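The ordering described above can be checked mechanically when scanning other TaskManager logs for the same symptom. The following is a minimal sketch, assuming the log lines start with a `yyyy-MM-dd HH:mm:ss,SSS` timestamp as in the excerpt; the class `DisposeBeforeCancelCheck` and its marker strings are hypothetical helpers, not part of Flink. It reports whether a "Closed RocksDB State Backend" line appears before any "Attempting to cancel task" line, and computes the gap between two log lines.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.List;

// Hypothetical log-scanning helper (not a Flink API).
class DisposeBeforeCancelCheck {

    // Timestamp prefix format used in the TaskManager log excerpt.
    private static final DateTimeFormatter TS =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss,SSS");

    // The first 23 characters hold the timestamp, e.g. "2024-12-01 12:12:12,703".
    static LocalDateTime timestampOf(String logLine) {
        return LocalDateTime.parse(logLine.substring(0, 23), TS);
    }

    // True if the first line containing closeMarker comes before any line containing
    // cancelMarker; false if a cancellation comes first or the close is never logged.
    static boolean closedBeforeCancel(List<String> lines, String closeMarker, String cancelMarker) {
        for (String line : lines) {
            if (line.contains(cancelMarker)) {
                return false;
            }
            if (line.contains(closeMarker)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // Abbreviated copies of three lines from the log above.
        List<String> lines = List.of(
                "2024-12-01 12:12:12,703 INFO org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend - Closed RocksDB State Backend. Cleaning up RocksDB working directory",
                "2024-12-01 12:12:21,866 WARN org.apache.flink.runtime.taskmanager.Task - OverAggregate (65/80)#0 switched from RUNNING to FAILED.",
                "2024-12-01 12:12:21,964 INFO org.apache.flink.runtime.taskmanager.Task - Attempting to cancel task IntervalJoin (49/80)#0");
        boolean suspicious = closedBeforeCancel(
                lines, "Closed RocksDB State Backend", "Attempting to cancel task");
        Duration gap = Duration.between(timestampOf(lines.get(0)), timestampOf(lines.get(1)));
        System.out.println("closed before cancel: " + suspicious);
        System.out.println("gap to failure: " + gap.toMillis() + " ms");
    }
}
```

On these lines it prints `closed before cancel: true` and `gap to failure: 9163 ms`. In a real log with many tasks, the markers would need to be narrowed to one subtask, which this sketch leaves to the caller.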
This appears to be a bug. As a first step, I would like to understand why the
RocksDBKeyedStateBackend is disposed while the task is still running. Can anyone help?
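One way to find out who disposes the backend is a temporary debug patch that captures a stack trace (and thread name) at the moment `dispose()` runs, then chains it into any later access failure. The class below is a hypothetical, self-contained sketch of that idea; wiring it into `RocksDBKeyedStateBackend.dispose()` and the `RocksDBMapState` accessors in a custom 1.12.2 build is an assumption and is not shown.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical debugging aid (not a Flink class): remembers where a resource was
// disposed so that a later use-after-dispose reports the original disposal site.
class DisposeTracker {

    private final String name;
    // Stack trace captured at the first dispose, or null while still open.
    private final AtomicReference<Throwable> disposedAt = new AtomicReference<>();

    DisposeTracker(String name) {
        this.name = name;
    }

    // Call at the start of the real dispose(); returns false if already disposed.
    boolean markDisposed() {
        Throwable site = new Throwable(
                name + " disposed by thread " + Thread.currentThread().getName());
        return disposedAt.compareAndSet(null, site);
    }

    boolean isDisposed() {
        return disposedAt.get() != null;
    }

    // Call before each state access; fails fast with the disposal site as the cause.
    void checkNotDisposed() {
        Throwable site = disposedAt.get();
        if (site != null) {
            throw new IllegalStateException(name + " accessed after dispose", site);
        }
    }

    public static void main(String[] args) {
        DisposeTracker tracker = new DisposeTracker("RocksDBKeyedStateBackend (65/80)");
        tracker.checkNotDisposed(); // still open, so no exception
        tracker.markDisposed();
        try {
            tracker.checkNotDisposed();
        } catch (IllegalStateException e) {
            // The "Caused by" section shows the call path that invoked dispose().
            e.printStackTrace();
        }
    }
}
```

Capturing the trace eagerly costs little because dispose happens once per backend, and failing fast at access time turns a confusing `EOFException` into an error that names the disposing thread.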