guozhangwang commented on code in PR #11433:
URL: https://github.com/apache/kafka/pull/11433#discussion_r1147064683


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -342,7 +379,18 @@ public void handleAssignment(final Map<TaskId, Set<TopicPartition>> activeTasks,
 
         maybeThrowTaskExceptions(taskCloseExceptions);
 
-        createNewTasks(activeTasksToCreate, standbyTasksToCreate);
+        final Collection<Task> newActiveTasks = createNewTasks(activeTasksToCreate, standbyTasksToCreate);
+        final Set<Task> activeTasksNeedCommit;
+        // Find new active tasks which need commit.
+        if (newActiveTasks == null || newActiveTasks.isEmpty()) {
+            activeTasksNeedCommit = new HashSet<>();
+        } else {
+            activeTasksNeedCommit = newActiveTasks.stream().filter(Task::commitNeeded).collect(Collectors.toSet());

Review Comment:
   Newly created tasks should not have the `commitNeeded` flag set, so 
filtering the new tasks on it would presumably never match anything. I think 
@ableegoldman's comment was that the conditions are (a) EOS is used (since 
this is a txn-based issue to begin with) and (b) we have newly added active 
tasks (as discussed previously), plus maybe (c) there is already a txn 
ongoing.
   
   Note that condition (c) is different for EOS-v1 and EOS-v2. That's why I 
was originally thinking of waiting until we only have EOS-v2, but after some 
thinking I'm leaning towards narrowing the scope of the fix to EOS-v2, since 
it is the commonly used config now and fixing EOS-v1 may incur unnecessary 
complexity for little value. For EOS-v2, we just need to check 
`threadProducer.hasInflighTxn`.
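   
   To make the three conditions concrete, here is a rough, standalone sketch 
of the check narrowed to EOS-v2. Everything in it is hypothetical and only for 
illustration: `CommitOnAssignmentSketch`, `ProcessingMode`, and 
`needsCommitBeforeNewTasks` are not existing Kafka Streams names, task ids are 
plain strings, and the `transactionInFlight` argument stands in for whatever 
the thread producer actually exposes.

```java
import java.util.Collection;

// Hypothetical sketch: none of these names exist in Kafka Streams.
final class CommitOnAssignmentSketch {

    // Stand-in for the processing guarantee configured on the application.
    enum ProcessingMode { AT_LEAST_ONCE, EOS_V1, EOS_V2 }

    // Returns true only when all three conditions from the comment hold:
    // (a) EOS is used (narrowed here to EOS-v2),
    // (b) the assignment added at least one new active task,
    // (c) a transaction is already in flight on the thread producer.
    static boolean needsCommitBeforeNewTasks(final ProcessingMode mode,
                                             final Collection<String> newActiveTaskIds,
                                             final boolean transactionInFlight) {
        final boolean eosV2 = mode == ProcessingMode.EOS_V2;
        final boolean hasNewActiveTasks =
            newActiveTaskIds != null && !newActiveTaskIds.isEmpty();
        return eosV2 && hasNewActiveTasks && transactionInFlight;
    }
}
```

   Note that the sketch never looks at `commitNeeded` on the new tasks; the 
decision depends only on the config, the assignment, and the producer state.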



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -298,6 +299,42 @@ private void closeDirtyAndRevive(final Collection<Task> taskWithChangelogs, fina
         }
     }
 
+    private void commitActiveTasks(final Set<Task> activeTasksNeedCommit, final AtomicReference<RuntimeException> activeTasksCommitException) {

Review Comment:
   Could we reuse `commitTasksAndMaybeUpdateCommittableOffsets` here instead, 
to avoid duplicating code?


