guozhangwang commented on code in PR #12439:
URL: https://github.com/apache/kafka/pull/12439#discussion_r930170868


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java:
##########
@@ -211,13 +209,36 @@ private RecordCollector createRecordCollector(final 
TaskId taskId,
         );
     }
 
+    /*
+     * TODO: we pass in the new input partitions to validate if they still 
match,
+     *       in the future we when we have fixed partitions -> tasks mapping,
+     *       we should always reuse the input partition and hence no need 
validations
+     */
     StreamTask createActiveTaskFromStandby(final StandbyTask standbyTask,
                                            final Set<TopicPartition> 
inputPartitions,
                                            final Consumer<byte[], byte[]> 
consumer) {
+        if (!inputPartitions.equals(standbyTask.inputPartitions)) {
+            log.warn("Detected unmatched input partitions for task {} when 
recycling it from standby to active", standbyTask.id);
+        }
+
         final RecordCollector recordCollector = 
createRecordCollector(standbyTask.id, getLogContext(standbyTask.id), 
standbyTask.topology);
-        final StreamTask task = standbyTask.recycle(time, cache, 
recordCollector, inputPartitions, consumer);
+        final StreamTask task = new StreamTask(
+            standbyTask.id,
+            inputPartitions,
+            standbyTask.topology,
+            consumer,
+            standbyTask.config,
+            streamsMetrics,
+            stateDirectory,
+            cache,
+            time,
+            standbyTask.stateMgr,
+            recordCollector,
+            standbyTask.processorContext,
+            standbyTask.logContext
+        );
 
-        log.trace("Created active task {} with assigned partitions {}", 
task.id, inputPartitions);
+        log.trace("Recycled active task {} from recycled standby with assigned 
partitions {}", task.id, inputPartitions);

Review Comment:
   ack.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to