cadonna commented on code in PR #12659:
URL: https://github.com/apache/kafka/pull/12659#discussion_r979883060
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java:
##########
@@ -500,23 +500,29 @@ public void remove(final TaskId taskId) {
}
}
- @Override
- public void pause(final TaskId taskId) {
+ public void pause(final TopologyMetadata topologyMetadata) {
tasksAndActionsLock.lock();
try {
- tasksAndActions.add(TaskAndAction.createPauseTask(taskId));
- tasksAndActionsCondition.signalAll();
+ for (final Task task : getUpdatingTasks()) {
+ if (topologyMetadata.isPaused(task.id().topologyName())) {
+ tasksAndActions.add(TaskAndAction.createPauseTask(task.id()));
+ tasksAndActionsCondition.signalAll();
+ }
+ }
Review Comment:
@guozhangwang I think we misunderstood each other. Sorry if I was not clear
enough.
My proposal was to store the names of the paused topologies in the state
updater thread, like this:
```
public class DefaultStateUpdater implements StateUpdater {
    ...
    private class StateUpdaterThread extends Thread {
        ...
        private Set<String> pausedTopologies = ConcurrentHashMap.newKeySet();

        private Set<String> getPausedTopologies() {
            return Collections.unmodifiableSet(pausedTopologies);
        }
```
The `StateUpdater` (and `DefaultStateUpdater`) then has the following method
that is called in `taskManager.checkStateUpdater()`:
```
public class DefaultStateUpdater implements StateUpdater {
    ...
    @Override
    public void syncPausedTopologies(final Set<String> pausedTopologies) {
        final Set<String> stateUpdaterPausedTopologies = getPausedTopologies();
        // Set#removeAll returns a boolean and mutates its receiver,
        // so compute both differences on copies.
        final Set<String> pausedTopologiesToAdd = new HashSet<>(pausedTopologies);
        pausedTopologiesToAdd.removeAll(stateUpdaterPausedTopologies);
        final Set<String> pausedTopologiesToRemove = new HashSet<>(stateUpdaterPausedTopologies);
        pausedTopologiesToRemove.removeAll(pausedTopologies);
        if (!pausedTopologiesToAdd.isEmpty()) {
            tasksAndActions.addPausedTopology(TaskAndAction.createAddPausedTopology(pausedTopologiesToAdd));
        }
        if (!pausedTopologiesToRemove.isEmpty()) {
            tasksAndActions.removePausedTopology(TaskAndAction.createRemovePausedTopology(pausedTopologiesToRemove));
        }
    }
```
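To make the diffing step concrete, here is a minimal, self-contained sketch of how the two sets could be computed. `PausedTopologiesDiff` and its members are hypothetical names used only for illustration; they are not part of the PR or of Kafka Streams. Note that `Set#removeAll` returns a `boolean` and mutates the receiver, so the differences have to be computed on copies.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Hypothetical helper (not part of the PR): computes which topologies
// need to be paused and which need to be resumed in the state updater.
final class PausedTopologiesDiff {
    final Set<String> toAdd;
    final Set<String> toRemove;

    private PausedTopologiesDiff(final Set<String> toAdd, final Set<String> toRemove) {
        this.toAdd = Collections.unmodifiableSet(toAdd);
        this.toRemove = Collections.unmodifiableSet(toRemove);
    }

    // requested: topologies currently paused according to the caller
    // current:   topologies the state updater already treats as paused
    static PausedTopologiesDiff between(final Set<String> requested, final Set<String> current) {
        // Work on copies so that neither input set is modified.
        final Set<String> toAdd = new HashSet<>(requested);
        toAdd.removeAll(current);
        final Set<String> toRemove = new HashSet<>(current);
        toRemove.removeAll(requested);
        return new PausedTopologiesDiff(toAdd, toRemove);
    }

    // True if the state updater is already in sync and no action is needed.
    boolean isEmpty() {
        return toAdd.isEmpty() && toRemove.isEmpty();
    }
}
```

With something like this, the sync method would only enqueue an action when `isEmpty()` is false, which is the point of the proposal: no work unless the set of paused topologies changed.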
In this way, we would not need to iterate over the tasks in the state
updater at regular intervals, but only when the set of paused topologies changes.
When we add a task to the state updater, the state updater thread needs to
check whether the topology of the task is paused.
```
public class DefaultStateUpdater implements StateUpdater {
    ...
    private class StateUpdaterThread extends Thread {
        ...
        private void addTask(final Task task) {
            if (isStateless(task)) {
                addToRestoredTasks((StreamTask) task);
                log.info("Stateless active task " + task.id()
                    + " was added to the restored tasks of the state updater");
            } else {
                if (pausedTopologies.contains(task.id().topologyName())) {
                    // add to paused tasks
                } else {
                    // add to updating tasks
                }
            }
        }
```
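To illustrate how the pieces could fit together on the state updater thread, here is a rough, single-threaded sketch. Everything in it is an assumption made for illustration: `PausedTopologyRouting`, its methods, and the use of plain strings for task IDs and topology names are hypothetical and do not mirror the actual `DefaultStateUpdater` internals.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

// Hypothetical, simplified model of the bookkeeping in the state updater thread.
// Tasks are represented as "task id -> topology name" entries.
final class PausedTopologyRouting {
    private final Set<String> pausedTopologies = new HashSet<>();
    private final Map<String, String> updatingTasks = new HashMap<>();
    private final Map<String, String> pausedTasks = new HashMap<>();

    // A newly added task is routed according to the paused state of its topology.
    void addTask(final String taskId, final String topologyName) {
        if (pausedTopologies.contains(topologyName)) {
            pausedTasks.put(taskId, topologyName);
        } else {
            updatingTasks.put(taskId, topologyName);
        }
    }

    // Tasks are only iterated when the set of paused topologies changes.
    void addPausedTopology(final String topologyName) {
        pausedTopologies.add(topologyName);
        moveTasks(updatingTasks, pausedTasks, topologyName);
    }

    void removePausedTopology(final String topologyName) {
        pausedTopologies.remove(topologyName);
        moveTasks(pausedTasks, updatingTasks, topologyName);
    }

    // Moves every task of the given topology from one map to the other.
    private static void moveTasks(final Map<String, String> from,
                                  final Map<String, String> to,
                                  final String topologyName) {
        from.entrySet().removeIf(entry -> {
            if (!Objects.equals(entry.getValue(), topologyName)) {
                return false;
            }
            to.put(entry.getKey(), entry.getValue());
            return true;
        });
    }

    Set<String> updatingTaskIds() {
        return new HashSet<>(updatingTasks.keySet());
    }

    Set<String> pausedTaskIds() {
        return new HashSet<>(pausedTasks.keySet());
    }
}
```

In the real state updater these operations would run on the state updater thread as it processes the queued actions, so the sketch leaves out locking entirely.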