Guozhang Wang created KAFKA-14847:
-------------------------------------
Summary: Separate the callers of commitAllTasks vs. commitTasks
for EOS(-v2) and ALOS
Key: KAFKA-14847
URL: https://issues.apache.org/jira/browse/KAFKA-14847
Project: Kafka
Issue Type: Improvement
Components: streams
Reporter: Guozhang Wang
Today, EOS-v2/v1 and ALOS share the same internal call path inside
TaskManager/TaskExecutor for committing tasks in various scenarios: the call
path {{commitTasksAndMaybeUpdateCommitableOffsets}} ->
{{commitOffsetsOrTransaction}} takes a list of tasks as its input, which can
be a subset of the tasks that the thread / task manager owns. For EOS-v1 / ALOS,
it is fine to commit just a subset of the tasks; however, for EOS-v2, since
all tasks participate in the same txn, committing only a subset could lead to
dangerous violations, and today we rely on every caller of the commit function
to make sure that the list of tasks it passes in does not violate the
semantics under EOS-v2. As Matthias summarized, that callee can currently be
triggered in the following cases:
1) Inside handleRevocation() -- this is a clean path, and we add all non-revoked
tasks with the commitNeeded() flag set to the commit -- so this seems to be fine.
2) tryCloseCleanAllActiveTasks() -- here we only call it if
tasksToCloseDirty.isEmpty() -- so it seems fine, too.
3) commit() with a list of tasks handed in -- we call commit() inside the TM
three times:
3.a) inside commitAll() as commit(tasks.values()) (passing in all tasks)
3.b) inside maybeCommitActiveTasksPerUserRequested as
commit(activeTaskIterable()); (passing in all tasks)
3.c) inside handleCorruption() -- here, we only consider RUNNING and RESTORING
tasks that are not corrupted -- note we only throw a TaskCorruptedException
during restore state initialization; thus, corrupted tasks have not processed
anything yet, and all other tasks should be safe to commit.
3.d) commitSuccessfullyProcessedTasks() -- this one is problematic under
EOS-v2: we commit the source offsets of only a subset of tasks, while the
outgoing records of the unsuccessful tasks, if there are any, are still
committed in the same txn.
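To make the 3.d concern concrete, here is a toy Java model. It assumes, as a simplification, that committing the shared EOS-v2 transaction makes every task's pending output visible, while source offsets are committed only for the tasks the caller passed in. `SharedTxnHazard`, `Task`, `CommittedState` and `commitSubset` are hypothetical illustration names, not Kafka Streams internals.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical toy model of one EOS-v2 txn shared by all tasks of a thread. */
public class SharedTxnHazard {

    /** Hypothetical stand-in for a stream task (not Kafka's real Task class). */
    static final class Task {
        final String id;
        final List<String> pendingOutput = new ArrayList<>(); // sent but not yet committed
        long consumedOffset = -1;                            // next source offset to commit

        Task(String id) { this.id = id; }
    }

    /** What becomes visible atomically when the shared txn commits. */
    static final class CommittedState {
        final List<String> visibleOutput = new ArrayList<>();
        final Map<String, Long> committedOffsets = new HashMap<>();
    }

    /**
     * Commits source offsets only for tasksToCommit, but because the txn is
     * shared, the output of every task in allTasks is committed as well.
     */
    static CommittedState commitSubset(List<Task> allTasks, List<Task> tasksToCommit) {
        CommittedState state = new CommittedState();
        for (Task t : allTasks) {
            state.visibleOutput.addAll(t.pendingOutput); // shared txn: all output goes out
            t.pendingOutput.clear();
        }
        for (Task t : tasksToCommit) {
            state.committedOffsets.put(t.id, t.consumedOffset); // offsets for the subset only
        }
        return state;
    }

    public static void main(String[] args) {
        Task ok = new Task("0_0");
        ok.pendingOutput.add("out-a");
        ok.consumedOffset = 10;

        Task failed = new Task("0_1");
        failed.pendingOutput.add("out-b"); // produced before the task failed
        failed.consumedOffset = 5;

        CommittedState s = commitSubset(List.of(ok, failed), List.of(ok));
        System.out.println(s.visibleOutput);    // [out-a, out-b]
        System.out.println(s.committedOffsets); // {0_0=10}
    }
}
```

In this model, `out-b` is visible but no offset was committed for `0_1`, so if `0_1` is reprocessed from its last committed offset it may emit `out-b` again: exactly the duplicate output that EOS is meant to prevent.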
Just going through this list of callers, as demonstrated above, is already
pretty complex and very vulnerable to bugs. It is better not to rely on the
callers, but on the callee, to guarantee this. More concretely, I think we can
introduce a new function called {{commitAllTasks}} such that under EOS-v2,
callers always call {{commitAllTasks}} instead, and if there are some tasks
that should not be committed because we know they have not processed any data,
the {{commitAllTasks}} callee itself would do some clever filtering
internally.
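A minimal sketch of the proposed split, assuming that "has not processed any data" can be approximated by a per-task commitNeeded() flag like the one mentioned in case 1). `TaskCommitter`, `ProcessingMode`, `Task` and `doCommit` are hypothetical names; `doCommit` merely returns the offsets that would be committed, standing in for {{commitOffsetsOrTransaction}}.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class CommitAllTasksSketch {

    enum ProcessingMode { AT_LEAST_ONCE, EXACTLY_ONCE_V2 }

    /** Hypothetical task stand-in; commitNeeded() = processed data since the last commit. */
    static final class Task {
        private final String id;
        private final boolean commitNeeded;
        private final long nextOffset;

        Task(String id, boolean commitNeeded, long nextOffset) {
            this.id = id;
            this.commitNeeded = commitNeeded;
            this.nextOffset = nextOffset;
        }

        String id() { return id; }
        boolean commitNeeded() { return commitNeeded; }
        long nextOffset() { return nextOffset; }
    }

    static final class TaskCommitter {
        private final ProcessingMode mode;
        private final List<Task> ownedTasks;

        TaskCommitter(ProcessingMode mode, List<Task> ownedTasks) {
            this.mode = mode;
            this.ownedTasks = List.copyOf(ownedTasks);
        }

        /** Caller-chosen subset: only allowed when tasks commit independently. */
        Map<String, Long> commitTasks(List<Task> subset) {
            if (mode == ProcessingMode.EXACTLY_ONCE_V2) {
                throw new IllegalStateException("under EOS-v2, use commitAllTasks()");
            }
            return doCommit(subset);
        }

        /** The callee starts from all owned tasks and filters out those with nothing to commit. */
        Map<String, Long> commitAllTasks() {
            List<Task> toCommit = new ArrayList<>();
            for (Task t : ownedTasks) {
                if (t.commitNeeded()) {
                    toCommit.add(t);
                }
            }
            return doCommit(toCommit);
        }

        // Stand-in for the real commit: returns the offsets that would be committed.
        private Map<String, Long> doCommit(List<Task> tasks) {
            Map<String, Long> offsets = new LinkedHashMap<>();
            for (Task t : tasks) {
                offsets.put(t.id(), t.nextOffset());
            }
            return offsets;
        }
    }

    public static void main(String[] args) {
        List<Task> tasks = List.of(new Task("0_0", true, 42), new Task("0_1", false, 7));
        TaskCommitter eos = new TaskCommitter(ProcessingMode.EXACTLY_ONCE_V2, tasks);
        System.out.println(eos.commitAllTasks()); // {0_0=42}
    }
}
```

The point of the sketch is that under EOS-v2 the subset-taking `commitTasks` refuses to run, so the decision of which tasks to skip lives in one place (the callee) instead of being re-derived by every caller listed above.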
Given its scope, I think it's better to do this refactoring after EOS-v1 is
removed.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)