cadonna commented on code in PR #12795:
URL: https://github.com/apache/kafka/pull/12795#discussion_r1018831974


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java:
##########
@@ -121,21 +121,32 @@ private void runOnce() throws InterruptedException {
         }
 
         private void performActionsOnTasks() {
-            tasksAndActionsLock.lock();
-            try {
-                for (final TaskAndAction taskAndAction : getTasksAndActions()) {
+            while (!tasksAndActions.isEmpty()) {
+                Task toInitialize = null;
+                tasksAndActionsLock.lock();
+                try {
+                    final TaskAndAction taskAndAction = tasksAndActions.remove();
                     final Action action = taskAndAction.getAction();
                     switch (action) {
                         case ADD:
-                            addTask(taskAndAction.getTask());
+                            final Task task = taskAndAction.getTask();
+                            final Task existingTask = updatingTasks.putIfAbsent(task.id(), task);
+                            if (existingTask != null) {
+                                throw new IllegalStateException((existingTask.isActive() ? "Active" : "Standby") + " task " + task.id() + " already exist, " +
+                                    "should not try to add another " + (task.isActive() ? "active" : "standby") + " task with the same id. " + BUG_ERROR_MESSAGE);
+                            }
+                            toInitialize = task;

Review Comment:
   Could you extract this code to a method that returns the task, so that the
result can be assigned to `toInitialize`? I think that makes the code more
readable.
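
A minimal sketch of what such an extraction could look like. Because `toInitialize` is declared as a `Task` in the hunk above, the sketch returns the added task. `SketchTask` and `AddTaskSketch` are hypothetical stand-ins, not Kafka classes; the map is keyed by a plain `String` instead of `TaskId`, and `BUG_ERROR_MESSAGE` is left out.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for Kafka's Task, reduced to the members used here.
interface SketchTask {
    String id();
    boolean isActive();
}

class AddTaskSketch {
    private final Map<String, SketchTask> updatingTasks = new HashMap<>();

    // Registers the task and returns it, so the caller can write
    // `toInitialize = addTask(taskAndAction.getTask());` in the ADD case.
    SketchTask addTask(final SketchTask task) {
        final SketchTask existingTask = updatingTasks.putIfAbsent(task.id(), task);
        if (existingTask != null) {
            throw new IllegalStateException(
                (existingTask.isActive() ? "Active" : "Standby") + " task " + task.id() + " already exist, "
                    + "should not try to add another " + (task.isActive() ? "active" : "standby")
                    + " task with the same id.");
        }
        return task;
    }

    // Small factory for illustration only.
    static SketchTask task(final String id, final boolean active) {
        return new SketchTask() {
            @Override public String id() { return id; }
            @Override public boolean isActive() { return active; }
        };
    }
}
```

With this shape the `switch` branch shrinks to a single assignment, and the duplicate-id check lives in one named place.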



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java:
##########
@@ -250,30 +261,31 @@ private void removeUpdatingAndPausedTasks() {
             pausedTasks.clear();
         }
 
-        private List<TaskAndAction> getTasksAndActions() {
-            final List<TaskAndAction> tasksAndActionsToProcess = new ArrayList<>(tasksAndActions);
-            tasksAndActions.clear();
-            return tasksAndActionsToProcess;
-        }
-
-        private void addTask(final Task task) {
-            if (isStateless(task)) {
-                addToRestoredTasks((StreamTask) task);
-                log.info("Stateless active task " + task.id() + " was added to the restored tasks of the state updater");
-            } else {
-                final Task existingTask = updatingTasks.putIfAbsent(task.id(), task);
-                if (existingTask != null) {
-                    throw new IllegalStateException((existingTask.isActive() ? "Active" : "Standby") + " task " + task.id() + " already exist, " +
-                        "should not try to add another " + (task.isActive() ? "active" : "standby") + " task with the same id. " + BUG_ERROR_MESSAGE);
+        private void initializeTask(final Task task) {
+            boolean initializationFailed = false;
+            if (task.state() == Task.State.CREATED) {
+                try {
+                    task.initializeIfNeeded();
+                } catch (final StreamsException streamsException) {
+                    addToExceptionsAndFailedTasksThenRemoveFromUpdatingTasks(new ExceptionAndTasks(Collections.singleton(task), streamsException));
+                    initializationFailed = true;
                 }
-                changelogReader.register(task.changelogPartitions(), task.stateManager());
-                if (task.isActive()) {
-                    log.info("Stateful active task " + task.id() + " was added to the state updater");
-                    changelogReader.enforceRestoreActive();
+            }
+            if (!initializationFailed) {

Review Comment:
   Why do we need this flag? Moving the code in the `if (!initializationFailed)`
branch into the `try`-block after `task.initializeIfNeeded();` should have the
same effect without needing a flag.
   For readability, I would extract the code that starts with `if
(isStateless(task))` to a separate method named `handleInitializedTask()`,
`postInitializeTask()`, or similar.
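
A rough sketch of the flag-free shape, under stated assumptions: `SketchTask` and `SketchStreamsException` are hypothetical stand-ins for Kafka's `Task` and `StreamsException`, the two lists replace the real bookkeeping, and the state check is placed inside the `try` so that tasks that are no longer in `CREATED` still reach the post-initialization step.

```java
import java.util.ArrayList;
import java.util.List;

class InitializeTaskSketch {
    /** Hypothetical stand-in for StreamsException. */
    static class SketchStreamsException extends RuntimeException {
        SketchStreamsException(final String message) {
            super(message);
        }
    }

    /** Hypothetical stand-in for Kafka's Task, reduced to what the sketch needs. */
    static class SketchTask {
        final String id;
        final boolean failOnInit;
        boolean created = true;

        SketchTask(final String id, final boolean failOnInit) {
            this.id = id;
            this.failOnInit = failOnInit;
        }

        void initializeIfNeeded() {
            if (failOnInit) {
                throw new SketchStreamsException("init failed for " + id);
            }
            created = false;
        }
    }

    final List<String> failedTaskIds = new ArrayList<>();
    final List<String> handledTaskIds = new ArrayList<>();

    void initializeTask(final SketchTask task) {
        try {
            if (task.created) {
                task.initializeIfNeeded();
            }
            // Reached only if initialization did not throw, so no flag is needed.
            postInitializeTask(task);
        } catch (final SketchStreamsException streamsException) {
            // Stands in for addToExceptionsAndFailedTasksThenRemoveFromUpdatingTasks(...).
            failedTaskIds.add(task.id);
        }
    }

    // Stands in for the code that starts with `if (isStateless(task))`.
    private void postInitializeTask(final SketchTask task) {
        handledTaskIds.add(task.id);
    }
}
```

One difference from the flag version is worth checking before adopting this shape: an exception of the caught type thrown by the post-initialization step would now also land in the `catch` block.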



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.