cadonna commented on code in PR #12659:
URL: https://github.com/apache/kafka/pull/12659#discussion_r977372667
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java:
##########
@@ -500,23 +500,29 @@ public void remove(final TaskId taskId) {
}
}
- @Override
- public void pause(final TaskId taskId) {
+ public void pause(final TopologyMetadata topologyMetadata) {
tasksAndActionsLock.lock();
try {
- tasksAndActions.add(TaskAndAction.createPauseTask(taskId));
- tasksAndActionsCondition.signalAll();
+ for (final Task task : getUpdatingTasks()) {
+ if (topologyMetadata.isPaused(task.id().topologyName())) {
+ tasksAndActions.add(TaskAndAction.createPauseTask(task.id()));
+ tasksAndActionsCondition.signalAll();
+ }
+ }
Review Comment:
Wouldn't it be better to store paused topologies within the state updater
thread?
Consider the following scenario:
1. The state updater manages task A from topology X.
2. Topology X is paused.
3. Task A is revoked and task B from topology X is assigned.
When a topology is paused, we would pass the topology name to the state
updater thread through an input queue event (like add or remove). Once the
state updater thread processes the input queue event with the name of the
topology to pause, it stores the name of the paused topology and pauses all
tasks of that topology (i.e., task A). Once task B is assigned and added to the
state updater, the paused topologies are consulted and, since topology X is
paused, task B is paused directly. When a topology is resumed, the
corresponding tasks are resumed and the name of the resumed topology is removed
from the set of paused topologies kept by the state updater thread.
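A minimal sketch of this design, under loud assumptions: `PausedTopologiesSketch` and all of its methods are hypothetical and not the actual `DefaultStateUpdater` API, task ids and topology names are plain strings instead of `TaskId`/`Task`, and locking is omitted because the model is single-threaded. It shows the updater thread keeping its own set of paused topologies, pausing a newly added task directly when its topology is paused, and scanning existing tasks only when a topology is newly paused or resumed.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Queue;
import java.util.Set;

// Hypothetical single-threaded model of the state updater thread (no locking shown).
public class PausedTopologiesSketch {
    private enum Action { ADD_TASK, REMOVE_TASK, PAUSE_TOPOLOGY, RESUME_TOPOLOGY }

    private static final class Event {
        final Action action;
        final String taskId;       // null for topology-level events
        final String topologyName; // null for REMOVE_TASK
        Event(final Action action, final String taskId, final String topologyName) {
            this.action = action;
            this.taskId = taskId;
            this.topologyName = topologyName;
        }
    }

    // Input queue filled by callers and drained by the updater thread.
    private final Queue<Event> inputQueue = new ArrayDeque<>();
    // State owned by the updater thread: paused topology names and taskId -> topology maps.
    private final Set<String> pausedTopologies = new HashSet<>();
    private final Map<String, String> updatingTasks = new HashMap<>();
    private final Map<String, String> pausedTasks = new HashMap<>();

    public void addTask(final String taskId, final String topologyName) {
        inputQueue.add(new Event(Action.ADD_TASK, taskId, topologyName));
    }
    public void removeTask(final String taskId) {
        inputQueue.add(new Event(Action.REMOVE_TASK, taskId, null));
    }
    public void pauseTopology(final String topologyName) {
        inputQueue.add(new Event(Action.PAUSE_TOPOLOGY, null, topologyName));
    }
    public void resumeTopology(final String topologyName) {
        inputQueue.add(new Event(Action.RESUME_TOPOLOGY, null, topologyName));
    }

    // One pass of the updater loop: apply all queued events.
    public void runOnce() {
        Event event;
        while ((event = inputQueue.poll()) != null) {
            switch (event.action) {
                case ADD_TASK:
                    // A new task only consults the paused set; existing tasks are not scanned.
                    if (pausedTopologies.contains(event.topologyName)) {
                        pausedTasks.put(event.taskId, event.topologyName);
                    } else {
                        updatingTasks.put(event.taskId, event.topologyName);
                    }
                    break;
                case REMOVE_TASK:
                    updatingTasks.remove(event.taskId);
                    pausedTasks.remove(event.taskId);
                    break;
                case PAUSE_TOPOLOGY:
                    // Tasks are scanned only if the topology was not already paused.
                    if (pausedTopologies.add(event.topologyName)) {
                        moveTasksOf(event.topologyName, updatingTasks, pausedTasks);
                    }
                    break;
                case RESUME_TOPOLOGY:
                    if (pausedTopologies.remove(event.topologyName)) {
                        moveTasksOf(event.topologyName, pausedTasks, updatingTasks);
                    }
                    break;
            }
        }
    }

    private static void moveTasksOf(String topologyName, Map<String, String> from, Map<String, String> to) {
        final Iterator<Map.Entry<String, String>> it = from.entrySet().iterator();
        while (it.hasNext()) {
            final Map.Entry<String, String> entry = it.next();
            if (entry.getValue().equals(topologyName)) {
                to.put(entry.getKey(), entry.getValue());
                it.remove();
            }
        }
    }

    public boolean isUpdating(final String taskId) { return updatingTasks.containsKey(taskId); }
    public boolean isPaused(final String taskId) { return pausedTasks.containsKey(taskId); }

    public static void main(final String[] args) {
        final PausedTopologiesSketch updater = new PausedTopologiesSketch();
        updater.addTask("A", "X");
        updater.pauseTopology("X");
        updater.runOnce();                 // task A is moved to the paused tasks
        updater.removeTask("A");           // task A is revoked
        updater.addTask("B", "X");         // task B of the paused topology X is assigned
        updater.runOnce();
        System.out.println("B paused: " + updater.isPaused("B")); // prints "B paused: true"
    }
}
```

The `main` method replays the scenario above: task B ends up paused on insertion, without any per-call scan over the updating tasks.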
If we expose the names of the paused topologies from the state updater thread
to the default state updater, we can check in `pause(topology)` whether the
state updater thread already knows about the paused topology and avoid creating
an input queue event for topologies that are already known to be paused.
All this would avoid iterating over the tasks each time `checkStateUpdater`
is called. We would only check the paused topologies when a task is added, and
loop over the tasks only when a topology has just been paused.
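The caller-side check on exposed paused topologies could look roughly like the following hypothetical sketch. `PauseDedupSketch`, `pause(String)`, and the string-based queue are illustrative assumptions, not the actual `DefaultStateUpdater` code. The updater thread publishes the names it has already paused in a concurrent set, and `pause` returns early for those names, so repeated calls from `checkStateUpdater` stop producing events.

```java
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch: pause(topologyName) consults the topologies the updater
// thread has already paused before enqueuing a new event.
public class PauseDedupSketch {

    // Written only by the updater thread; safe to read from the calling thread.
    private final Set<String> knownPausedTopologies = ConcurrentHashMap.newKeySet();
    // Stand-in for the real input queue of tasks and actions.
    private final BlockingQueue<String> pauseEvents = new LinkedBlockingQueue<>();
    // Counts how often the updater thread had to loop over its tasks.
    private int taskScans = 0;

    // Called by the stream thread, e.g. on every checkStateUpdater() invocation.
    public void pause(final String topologyName) {
        if (knownPausedTopologies.contains(topologyName)) {
            return; // already known to the updater thread: no new event
        }
        // A duplicate may still be enqueued if the thread has not processed an
        // earlier event for this topology yet; processing below is idempotent.
        pauseEvents.add(topologyName);
    }

    // Runs on the updater thread when it drains its input queue.
    public void processPauseEvents() {
        String topologyName;
        while ((topologyName = pauseEvents.poll()) != null) {
            if (knownPausedTopologies.add(topologyName)) {
                taskScans++; // here the tasks of topologyName would be paused
            }
        }
    }

    public int pendingEvents() { return pauseEvents.size(); }
    public int taskScans() { return taskScans; }

    public static void main(final String[] args) {
        final PauseDedupSketch updater = new PauseDedupSketch();
        updater.pause("X");
        updater.pause("X");            // not yet processed: enqueued again
        updater.processPauseEvents();  // only the first event triggers a task scan
        updater.pause("X");            // now known: nothing is enqueued
        System.out.println(updater.pendingEvents() + " pending, " + updater.taskScans() + " scan(s)");
    }
}
```

A duplicate event can still slip through between enqueuing and processing; the sketch accepts that because applying a pause twice is a no-op, which keeps the caller free of extra locking.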
WDYT?
--
This is an automated message from the Apache Git Service.