lucasbru commented on code in PR #12795:
URL: https://github.com/apache/kafka/pull/12795#discussion_r1018903133


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java:
##########
@@ -121,21 +121,32 @@ private void runOnce() throws InterruptedException {
         }
 
         private void performActionsOnTasks() {
-            tasksAndActionsLock.lock();
-            try {
-                for (final TaskAndAction taskAndAction : getTasksAndActions()) 
{
+            while (!tasksAndActions.isEmpty()) {
+                Task toInitialize = null;
+                tasksAndActionsLock.lock();
+                try {
+                    final TaskAndAction taskAndAction = 
tasksAndActions.remove();
                     final Action action = taskAndAction.getAction();
                     switch (action) {
                         case ADD:
-                            addTask(taskAndAction.getTask());
+                            final Task task = taskAndAction.getTask();
+                            final Task existingTask = 
updatingTasks.putIfAbsent(task.id(), task);
+                            if (existingTask != null) {
+                                throw new 
IllegalStateException((existingTask.isActive() ? "Active" : "Standby") + " task 
" + task.id() + " already exist, " +
+                                    "should not try to add another " + 
(task.isActive() ? "active" : "standby") + " task with the same id. " + 
BUG_ERROR_MESSAGE);
+                            }
+                            toInitialize = task;

Review Comment:
   boolean won't quite work because then I don't have a reference to the task 
in the initialization block. 
   
   I introduced a function for adding the task though.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to