Mateusz Jadczyk created KAFKA-9891:
--------------------------------------
Summary: Invalid state store content after task migration with
exactly_once and standby replicas
Key: KAFKA-9891
URL: https://issues.apache.org/jira/browse/KAFKA-9891
Project: Kafka
Issue Type: Bug
Components: streams
Affects Versions: 2.4.1, 2.3.1
Reporter: Mateusz Jadczyk
We have a simple command id deduplication mechanism (very similar to the one
from the Kafka Streams examples) based on Kafka Streams state stores. It stores
command ids from the past hour in a _persistentWindowStore_. We encountered a
problem with the store when an exception is thrown later in the same topology.
We run 3 nodes using Docker, each with multiple stream threads configured for
this particular Streams application.
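The deduplication check described above can be sketched roughly as follows. This is a minimal, hypothetical stdlib-only model, not the code from the SO question and not a Kafka Streams API: {{CommandIdDeduplicator}} and {{isDuplicate}} are illustrative names, and a plain map from command id to first-seen timestamp stands in for the one-hour _persistentWindowStore_.
{code:java}
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified stand-in for a window-store-based command id deduplicator.
// A real Kafka Streams implementation would keep this state in a WindowStore.
public class CommandIdDeduplicator {
    private final long windowMillis;
    // command id -> timestamp (ms) at which it was recorded
    private final Map<String, Long> seen = new HashMap<>();

    public CommandIdDeduplicator(Duration window) {
        this.windowMillis = window.toMillis();
    }

    /** Returns true if the id was seen within the window; otherwise records it and returns false. */
    public boolean isDuplicate(String commandId, long nowMillis) {
        Long firstSeen = seen.get(commandId);
        if (firstSeen != null && nowMillis - firstSeen < windowMillis) {
            return true;
        }
        // Not a duplicate (or the previous entry has expired): remember it.
        seen.put(commandId, nowMillis);
        return false;
    }

    public static void main(String[] args) {
        CommandIdDeduplicator dedup = new CommandIdDeduplicator(Duration.ofHours(1));
        System.out.println(dedup.isDuplicate("cmd-1", 0L));          // first occurrence
        System.out.println(dedup.isDuplicate("cmd-1", 1_000L));      // within the hour
        System.out.println(dedup.isDuplicate("cmd-1", 3_600_000L));  // window expired
    }
}
{code}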
The business flow is as follows:
* a valid command is sent with a command id
(_mnl_cmd_31bad7e5-35e7-490e-89f0-616fe967111f_). NODE 1 is running the active
task 1_2. The first node in the topology checks whether this is a duplicate by
looking it up in the state store (_COMMAND_ID_STORE_); it is not, so it puts the
command id into the state store and processes the command properly.
* an invalid command is sent with the same key but a new command id
(_mnl_cmd_9f1752da-45b7-4ef7-9ef8-209d826530bc_). Again, the duplicate check is
performed; it is not a duplicate, so the command id is put into the state store.
The next node in the topology throws an exception, which causes an error on
NODE 1 for task 1_2. As a result, the transaction is aborted and offsets are not
committed. I double-checked the changelog topic: the relevant messages are not
committed. Therefore, the changelog topic contains only the first command id
(_mnl_cmd_31bad7e5-35e7-490e-89f0-616fe967111f_), not the one which caused the
failure.
* in the meantime, standby task 1_2 running on NODE 3 replicated the command id
_mnl_cmd_31bad7e5-35e7-490e-89f0-616fe967111f_ into its local
_COMMAND_ID_STORE_.
* standby task 1_2 on NODE 3 Thread-2 takes over the task as an active one. It
checks whether this command id is a duplicate (it isn't), processes the faulty
command and throws an exception. Again, the transaction is aborted; all fine.
* NODE 3 Thread-1 takes over. It checks for the duplicate. To our surprise,
*it is a duplicate!* This happens even though the transaction was aborted and
the changelog doesn't contain this command id.
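The divergence in this flow can be illustrated with a small, hypothetical model (plain Java, no Kafka APIs; {{AbortedWriteModel}} and {{process}} are illustrative names). It assumes that a store write is applied locally as soon as the command id is put, while the matching changelog record only becomes visible when the transaction commits, and that nothing rolls back the local store when the transaction aborts.
{code:java}
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical model, no Kafka APIs: shows how an aborted transaction can leave
// the local store and the committed changelog out of sync.
public class AbortedWriteModel {
    final List<String> committedChangelog = new ArrayList<>(); // only committed records are visible
    final Set<String> localStore = new HashSet<>();            // e.g. store files in the task directory

    /** Dedup check + put, then downstream processing; returns true if flagged as a duplicate. */
    boolean process(String commandId, boolean downstreamThrows) {
        if (localStore.contains(commandId)) {
            return true;
        }
        localStore.add(commandId); // the local store is updated as soon as the id is put
        if (downstreamThrows) {
            // The transaction aborts: the changelog record is never committed,
            // but in this model nothing rolls back the local store.
            return false;
        }
        committedChangelog.add(commandId);
        return false;
    }

    public static void main(String[] args) {
        AbortedWriteModel task = new AbortedWriteModel();
        task.process("cmd-valid", false);  // valid command, committed
        task.process("cmd-invalid", true); // invalid command, transaction aborted
        System.out.println("changelog: " + task.committedChangelog);
        System.out.println("local store has aborted id: " + task.localStore.contains("cmd-invalid"));
        // A later owner reusing this local store sees the aborted id as a duplicate.
        System.out.println("retry flagged as duplicate: " + task.process("cmd-invalid", true));
    }
}
{code}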
After digging into the Streams logs and some discussion on [Stack
Overflow|https://stackoverflow.com/questions/61247789/invalid-state-store-content-after-aborted-transaction-with-exactly-once-and-stan],
we concluded that it has something to do with checkpoint files. Here are the
detailed logs relevant to checkpoint files:
{code:java}
NODE_3 2020-04-15 21:06:14.470 TRACE 1 --- [-StreamThread-2] o.a.k.s.p.i.ProcessorStateManager : stream-thread [XXXXProcessor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-2] task [1_2] Checkpointable offsets read from checkpoint: {}
NODE_3 2020-04-15 21:06:19.413 TRACE 1 --- [-StreamThread-2] o.a.k.s.p.i.ProcessorStateManager : stream-thread [XXXXProcessor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-2] task [1_2] Restoring state store COMMAND_ID_STORE from changelog topic XXXXProcessor-COMMAND_ID_STORE-changelog at checkpoint null
NODE_3 2020-04-15 21:06:28.470 TRACE 1 --- [-StreamThread-1] o.a.k.s.p.i.ProcessorStateManager : stream-thread [XXXXProcessor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-1] standby-task [1_2] Checkpointable offsets read from checkpoint: {}
NODE_3 2020-04-15 21:06:29.634 TRACE 1 --- [-StreamThread-2] o.a.k.s.s.internals.OffsetCheckpoint : Writing tmp checkpoint file /tmp/kafka-streams/XXXXProcessor/1_2/.checkpoint.tmp
NODE_3 2020-04-15 21:06:29.640 TRACE 1 --- [-StreamThread-2] o.a.k.s.s.internals.OffsetCheckpoint : Swapping tmp checkpoint file /tmp/kafka-streams/XXXXProcessor/1_2/.checkpoint.tmp /tmp/kafka-streams/XXXXProcessor/1_2/.checkpoint
NODE_3 2020-04-15 21:11:15.909 TRACE 1 --- [-StreamThread-1] o.a.k.s.s.internals.OffsetCheckpoint : Writing tmp checkpoint file /tmp/kafka-streams/XXXXProcessor/1_2/.checkpoint.tmp
NODE_3 2020-04-15 21:11:15.912 TRACE 1 --- [-StreamThread-1] o.a.k.s.s.internals.OffsetCheckpoint : Swapping tmp checkpoint file /tmp/kafka-streams/XXXXProcessor/1_2/.checkpoint.tmp /tmp/kafka-streams/XXXXProcessor/1_2/.checkpoint
NODE_1 log1:2020-04-15 21:11:33.942 DEBUG 1 --- [-StreamThread-2] c.g.f.c.s.validation.CommandIdValidator : CommandId: mnl_cmd_9f1752da-45b7-4ef7-9ef8-209d826530bc is not a duplicate.
NODE_3 2020-04-15 21:11:47.195 TRACE 1 --- [-StreamThread-1] o.a.k.s.s.internals.OffsetCheckpoint : Writing tmp checkpoint file /tmp/kafka-streams/XXXXProcessor/1_2/.checkpoint.tmp
NODE_3 2020-04-15 21:11:47.233 TRACE 1 --- [-StreamThread-1] o.a.k.s.s.internals.OffsetCheckpoint : Swapping tmp checkpoint file /tmp/kafka-streams/XXXXProcessor/1_2/.checkpoint.tmp /tmp/kafka-streams/XXXXProcessor/1_2/.checkpoint
NODE_3 2020-04-15 21:11:49.075 TRACE 1 --- [-StreamThread-2] o.a.k.s.p.i.ProcessorStateManager : stream-thread [XXXXProcessor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-2] task [1_2] Restoring state store COMMAND_ID_STORE from changelog topic XXXXProcessor-COMMAND_ID_STORE-changelog at checkpoint 1
NODE_3 2020-04-15 21:11:49.436 TRACE 1 --- [-StreamThread-2] o.a.k.s.p.i.StoreChangelogReader : stream-thread [XXXXProcessor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-2] Found checkpoint 1 from changelog XXXXProcessor-COMMAND_ID_STORE-changelog-2 for store COMMAND_ID_STORE.
NODE_3 2020-04-15 21:11:52.023 DEBUG 1 --- [-StreamThread-2] c.g.f.c.s.validation.CommandIdValidator : CommandId: mnl_cmd_9f1752da-45b7-4ef7-9ef8-209d826530bc is not a duplicate.
NODE_3 2020-04-15 21:11:53.683 ERROR 1 --- [-StreamThread-2] o.a.k.s.p.i.AssignedStreamsTasks : stream-thread [XXXXProcessor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-2] Failed to process stream task 1_2 due to the following error:
java.lang.RuntimeException
NODE_3 2020-04-15 21:12:05.346 TRACE 1 --- [-StreamThread-1] o.a.k.s.p.i.ProcessorStateManager : stream-thread [XXXXProcessor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-1] task [1_2] Restoring state store COMMAND_ID_STORE from changelog topic XXXXProcessor-COMMAND_ID_STORE-changelog at checkpoint 1
NODE_3 2020-04-15 21:12:05.562 TRACE 1 --- [-StreamThread-1] o.a.k.s.p.i.StoreChangelogReader : stream-thread [XXXXProcessor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-1] Found checkpoint 1 from changelog XXXXProcessor-COMMAND_ID_STORE-changelog-2 for store COMMAND_ID_STORE.
NODE_3 2020-04-15 21:12:06.424 WARN 1 --- [-StreamThread-1] c.g.f.c.s.validation.CommandIdValidator : Command duplicate detected. Command id mnl_cmd_9f1752da-45b7-4ef7-9ef8-209d826530bc
{code}
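The TRACE lines above show each checkpoint being written to {{.checkpoint.tmp}} first and then swapped into place. That write-then-rename pattern can be sketched as below. This is a hypothetical illustration ({{CheckpointWriter}} is an illustrative name), not Kafka's {{OffsetCheckpoint}} class, and the file layout (version line, entry count, one {{<changelog-partition> <offset>}} line per entry) is an assumption made for the example.
{code:java}
import java.io.BufferedWriter;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch of the "write tmp file, then swap" pattern seen in the
// OffsetCheckpoint TRACE lines. The text layout is an illustrative assumption.
public class CheckpointWriter {

    /** Writes offsets to <dir>/.checkpoint via <dir>/.checkpoint.tmp and an atomic rename. */
    public static void write(Path dir, Map<String, Long> offsets) throws IOException {
        Path tmp = dir.resolve(".checkpoint.tmp");
        Path target = dir.resolve(".checkpoint");
        try (BufferedWriter w = Files.newBufferedWriter(tmp, StandardCharsets.UTF_8)) {
            w.write("0\n");                    // format version (assumed)
            w.write(offsets.size() + "\n");    // number of entries
            for (Map.Entry<String, Long> e : offsets.entrySet()) {
                w.write(e.getKey() + " " + e.getValue() + "\n"); // "<changelog-partition> <offset>"
            }
        }
        // "Swapping tmp checkpoint file": replace the old checkpoint in one step,
        // so a reader never sees a half-written file.
        Files.move(tmp, target, StandardCopyOption.REPLACE_EXISTING, StandardCopyOption.ATOMIC_MOVE);
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("task-1_2");
        Map<String, Long> offsets = new LinkedHashMap<>();
        offsets.put("XXXXProcessor-COMMAND_ID_STORE-changelog-2", 1L);
        write(dir, offsets);
        System.out.print(Files.readString(dir.resolve(".checkpoint")));
    }
}
{code}
The rename makes the swap all-or-nothing, but note that it only protects the checkpoint file itself; it says nothing about whether the store contents next to it match the changelog.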
It seems that on NODE_3 a standby task 1_2 running on T-2 replicates the first
valid command, thus creating a checkpoint file. The invalid command causes an
error on NODE_1, and then NODE_3 T-2 takes over the task. It finds the checkpoint
file (which is fine) and starts to process the invalid command. It crashes, and
T-1 on the same node takes over. T-1 finds the checkpoint file (!), assumes the
state store is clean (apparently it is not, as it contains state modified
by T-2) and finds a duplicated command id.
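The takeover described above can be captured in a hypothetical restore model ({{CheckpointRestoreModel}} and {{restore}} are illustrative names, not Kafka Streams internals). It assumes that a present checkpoint is taken to mean "the local store already matches the changelog up to this offset", treats the checkpoint as the next changelog offset to read, and ignores transaction markers. Under those assumptions, checkpoint 1 plus a local store still holding T-2's aborted write keeps the aborted command id, while rebuilding from the committed changelog alone drops it.
{code:java}
import java.util.HashSet;
import java.util.List;
import java.util.OptionalLong;
import java.util.Set;

// Hypothetical model of restoring a store on task takeover. It is not the
// Kafka Streams implementation; it only captures the assumption that a
// checkpoint means "the local store already reflects the changelog up to here".
public class CheckpointRestoreModel {

    /**
     * @param localStore store contents left in the task directory by the previous owner
     * @param changelog  committed changelog records; list index = offset
     * @param checkpoint next offset to read, if a checkpoint file was found
     */
    static Set<String> restore(Set<String> localStore, List<String> changelog, OptionalLong checkpoint) {
        Set<String> store;
        long from;
        if (checkpoint.isPresent()) {
            store = new HashSet<>(localStore); // trust the local files as-is
            from = checkpoint.getAsLong();     // only replay records after the checkpoint
        } else {
            store = new HashSet<>();           // no checkpoint: start from an empty store
            from = 0;
        }
        for (long offset = from; offset < changelog.size(); offset++) {
            store.add(changelog.get((int) offset));
        }
        return store;
    }

    public static void main(String[] args) {
        List<String> changelog = List.of("cmd-valid");             // only the committed command
        Set<String> leftByT2 = Set.of("cmd-valid", "cmd-invalid"); // includes T-2's aborted write
        System.out.println(restore(leftByT2, changelog, OptionalLong.of(1)).contains("cmd-invalid"));
        System.out.println(restore(leftByT2, changelog, OptionalLong.empty()).contains("cmd-invalid"));
    }
}
{code}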
We use Java 11, kafka-clients 2.4.1 and spring-kafka 2.4.5. We temporarily
rolled back to kafka-clients 2.3.1, and the problem persisted.
*We performed more tests with configuration changes: after changing
{{num.standby.replicas = 1}} to {{num.standby.replicas = 0}}, the problem
disappeared. It is also resolved by changing the store to an
_inMemoryWindowStore_.*
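The standby-replica workaround can be expressed as plain {{java.util.Properties}}, sketched below. The property names {{processing.guarantee}} and {{num.standby.replicas}} are the standard Streams config keys; the application id and bootstrap servers are placeholders, and {{WorkaroundConfig}} is an illustrative name. The other workaround swaps {{Stores.persistentWindowStore(...)}} for {{Stores.inMemoryWindowStore(...)}}, which takes the same name, retention period, window size and retain-duplicates arguments.
{code:java}
import java.util.Properties;

// Hypothetical sketch of the Streams settings involved in the workaround.
// Keys are standard Streams property names; id and servers are placeholders.
public class WorkaroundConfig {
    static Properties streamsProps() {
        Properties props = new Properties();
        props.put("application.id", "XXXXProcessor");     // placeholder
        props.put("bootstrap.servers", "localhost:9092"); // placeholder
        props.put("processing.guarantee", "exactly_once");
        props.put("num.standby.replicas", "0");           // was 1; with 0 the problem disappeared
        return props;
    }

    public static void main(String[] args) {
        streamsProps().forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
{code}
With no standby replicas, a migrated task typically has to restore its store from the changelog on the new owner, so failover may take longer; this is a trade-off of the workaround rather than a fix.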
The relevant Java code can be found in the SO question. I don't have a sample
project that reproduces the problem to share at the moment, but it is easily
reproducible in our project.
Such behaviour can have serious implications for business logic; in our case,
messages were accidentally skipped without being properly processed.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)