guozhangwang commented on code in PR #11433:
URL: https://github.com/apache/kafka/pull/11433#discussion_r1154747980


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -342,7 +342,18 @@ public void handleAssignment(final Map<TaskId, Set<TopicPartition>> activeTasks,
 
         maybeThrowTaskExceptions(taskCloseExceptions);
 
-        createNewTasks(activeTasksToCreate, standbyTasksToCreate);
+        final Collection<Task> newActiveTasks = createNewTasks(activeTasksToCreate, standbyTasksToCreate);
+        // If there are any transactions in flight and there are newly created active tasks, commit the tasks
+        // to avoid potential long restoration times.
+        if (processingMode == EXACTLY_ONCE_V2 && threadProducer().transactionInFlight() && !newActiveTasks.isEmpty()) {
+            log.info("New active tasks were added and there is an inflight transaction. Attempting to commit tasks.");
+            final int numCommitted = commitTasksAndMaybeUpdateCommittableOffsets(newActiveTasks, new HashMap<>());
+            if (numCommitted == -1) {
+                log.info("Couldn't commit any tasks since a rebalance is in progress");
+            } else {
+                log.info("Committed {} transactions", numCommitted);

Review Comment:
   nit: `Committed the ongoing V2 transaction at the assignment due to newly created active tasks`.



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -342,7 +342,18 @@ public void handleAssignment(final Map<TaskId, Set<TopicPartition>> activeTasks,
 
         maybeThrowTaskExceptions(taskCloseExceptions);
 
-        createNewTasks(activeTasksToCreate, standbyTasksToCreate);
+        final Collection<Task> newActiveTasks = createNewTasks(activeTasksToCreate, standbyTasksToCreate);
+        // If there are any transactions in flight and there are newly created active tasks, commit the tasks
+        // to avoid potential long restoration times.
+        if (processingMode == EXACTLY_ONCE_V2 && threadProducer().transactionInFlight() && !newActiveTasks.isEmpty()) {
+            log.info("New active tasks were added and there is an inflight transaction. Attempting to commit tasks.");

Review Comment:
   One caveat for EOS-v2: when we commit, we have to make sure we commit all
tasks that have processed any data, not just the active tasks. Sorry for not
making that clear earlier; it took me some time to get it straight myself, so I
filed https://issues.apache.org/jira/browse/KAFKA-14847, which you are welcome
to read for more details. In short, under EOS-v2 each commit has to include
every task even if we only want to commit a subset of them, so we should pass
all the tasks to the `commitTasksAndMaybeUpdateCommittableOffsets` call below.
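
   To illustrate the point, here is a minimal sketch, not the actual Streams
code. `ToyTask`, `hasProcessedData`, and `commitSet` are hypothetical names
used only for illustration. It assumes one transaction shared by every task on
the thread, and derives the commit set from all tasks rather than from the
newly created ones.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;

public class EosV2CommitSketch {

    /** Hypothetical stand-in for a Streams task; NOT the real Task interface. */
    public static final class ToyTask {
        final String id;
        final boolean hasProcessedData;

        public ToyTask(final String id, final boolean hasProcessedData) {
            this.id = id;
            this.hasProcessedData = hasProcessedData;
        }
    }

    /**
     * Assumption: all tasks share one transaction, so the commit set is every
     * task that has processed data, whichever tasks triggered the commit.
     */
    public static List<String> commitSet(final Collection<ToyTask> allTasks) {
        final List<String> ids = new ArrayList<>();
        for (final ToyTask task : allTasks) {
            if (task.hasProcessedData) {
                ids.add(task.id);
            }
        }
        return ids;
    }

    public static void main(final String[] args) {
        final List<ToyTask> allTasks = Arrays.asList(
            new ToyTask("0_0", true),   // existing task with processed data
            new ToyTask("0_1", true),   // existing task with processed data
            new ToyTask("0_2", false)); // newly created task, nothing processed yet
        System.out.println(commitSet(allTasks));
    }
}
```

   In the sketch the newly created task is not what decides the commit set; the
tasks that already wrote into the shared transaction are.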



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -342,7 +342,18 @@ public void handleAssignment(final Map<TaskId, Set<TopicPartition>> activeTasks,
 
         maybeThrowTaskExceptions(taskCloseExceptions);
 
-        createNewTasks(activeTasksToCreate, standbyTasksToCreate);
+        final Collection<Task> newActiveTasks = createNewTasks(activeTasksToCreate, standbyTasksToCreate);
+        // If there are any transactions in flight and there are newly created active tasks, commit the tasks
+        // to avoid potential long restoration times.
+        if (processingMode == EXACTLY_ONCE_V2 && threadProducer().transactionInFlight() && !newActiveTasks.isEmpty()) {
+            log.info("New active tasks were added and there is an inflight transaction. Attempting to commit tasks.");
+            final int numCommitted = commitTasksAndMaybeUpdateCommittableOffsets(newActiveTasks, new HashMap<>());
+            if (numCommitted == -1) {

Review Comment:
   This reminds me of one thing: we call `onAssignment` first and
`onPartitionsAssigned` later, and we only set `rebalanceInProgress` to false in
the latter. That means that during `onAssignment` we would always see
`rebalanceInProgress == true`, which logically would not allow a commit.
   
   I gave this some thought, and for now the quick (and somewhat dirty) fix
would be to move the line that sets `rebalanceInProgress` to false so that it
runs right before `createNewTasks` inside `handleAssignment` here. But we should
leave a TODO noting that, moving forward, we would rely only on `onAssignment`
as the rebalance-completion barrier and move the other logic out of
`onPartitionsAssigned` as well.
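
   A minimal sketch of the ordering issue, not the actual Streams code.
`RebalanceFlagSketch`, `tryCommit`, and `simulateRebalance` are hypothetical
names. The only things taken from the diff and the comment above are the
convention that -1 means "no commit because a rebalance is in progress" and the
order of the two callbacks.

```java
public class RebalanceFlagSketch {

    private boolean rebalanceInProgress;

    /** Mirrors the diff's convention: -1 means the commit was skipped. */
    private int tryCommit(final int tasksToCommit) {
        return rebalanceInProgress ? -1 : tasksToCommit;
    }

    /**
     * Simulates one rebalance and returns the result of the commit attempted
     * while the assignment is being handled.
     */
    public int simulateRebalance(final int tasksToCommit, final boolean clearFlagBeforeCommit) {
        rebalanceInProgress = true;          // the rebalance starts

        // Step 1: onAssignment -> handleAssignment
        if (clearFlagBeforeCommit) {
            rebalanceInProgress = false;     // the suggested quick fix
        }
        final int result = tryCommit(tasksToCommit);

        // Step 2: onPartitionsAssigned, where the flag is reset per the comment
        rebalanceInProgress = false;
        return result;
    }

    public static void main(final String[] args) {
        final RebalanceFlagSketch sketch = new RebalanceFlagSketch();
        System.out.println(sketch.simulateRebalance(2, false)); // commit skipped
        System.out.println(sketch.simulateRebalance(2, true));  // commit goes through
    }
}
```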



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
