lucasbru commented on code in PR #12795:
URL: https://github.com/apache/kafka/pull/12795#discussion_r1018881878


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java:
##########
@@ -250,30 +261,31 @@ private void removeUpdatingAndPausedTasks() {
             pausedTasks.clear();
         }
 
-        private List<TaskAndAction> getTasksAndActions() {
-            final List<TaskAndAction> tasksAndActionsToProcess = new 
ArrayList<>(tasksAndActions);
-            tasksAndActions.clear();
-            return tasksAndActionsToProcess;
-        }
-
-        private void addTask(final Task task) {
-            if (isStateless(task)) {
-                addToRestoredTasks((StreamTask) task);
-                log.info("Stateless active task " + task.id() + " was added to 
the restored tasks of the state updater");
-            } else {
-                final Task existingTask = updatingTasks.putIfAbsent(task.id(), 
task);
-                if (existingTask != null) {
-                    throw new IllegalStateException((existingTask.isActive() ? 
"Active" : "Standby") + " task " + task.id() + " already exist, " +
-                        "should not try to add another " + (task.isActive() ? 
"active" : "standby") + " task with the same id. " + BUG_ERROR_MESSAGE);
+        private void initializeTask(final Task task) {
+            boolean initializationFailed = false;
+            if (task.state() == Task.State.CREATED) {
+                try {
+                    task.initializeIfNeeded();
+                } catch (final StreamsException streamsException) {
+                    
addToExceptionsAndFailedTasksThenRemoveFromUpdatingTasks(new 
ExceptionAndTasks(Collections.singleton(task), streamsException));
+                    initializationFailed = true;
                 }
-                changelogReader.register(task.changelogPartitions(), 
task.stateManager());
-                if (task.isActive()) {
-                    log.info("Stateful active task " + task.id() + " was added 
to the state updater");
-                    changelogReader.enforceRestoreActive();
+            }
+            if (!initializationFailed) {

Review Comment:
   I introduced the flag to make the try-block smaller. For me, it's easier to 
reason about this code this way, because I know that the exception can only be 
thrown inside `initializeIfNeeded` and the task is in a well-defined state in 
the whole try-block (it's inside `updatingTasks` and no other queue). If I move 
the code starting with `if (isStateless(task))` into the `try`-block, I have to 
reason about things like - what if the exception happens somewhere inside the 
stateless task handling? Is the task maybe already removed from 
`updatingTasks`? But you are right, since in the current code no 
`StreamsException` is produced anywhere the in the code starting with 
`if(isStateless(task))` it does not make a difference at runtime. So let me 
know if you'd prefer me to change it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to