guozhangwang commented on code in PR #12439: URL: https://github.com/apache/kafka/pull/12439#discussion_r930170868
########## streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java: ########## @@ -211,13 +209,36 @@ private RecordCollector createRecordCollector(final TaskId taskId, ); } + /* + * TODO: we pass in the new input partitions to validate if they still match, + * in the future we when we have fixed partitions -> tasks mapping, + * we should always reuse the input partition and hence no need validations + */ StreamTask createActiveTaskFromStandby(final StandbyTask standbyTask, final Set<TopicPartition> inputPartitions, final Consumer<byte[], byte[]> consumer) { + if (!inputPartitions.equals(standbyTask.inputPartitions)) { + log.warn("Detected unmatched input partitions for task {} when recycling it from standby to active", standbyTask.id); + } + final RecordCollector recordCollector = createRecordCollector(standbyTask.id, getLogContext(standbyTask.id), standbyTask.topology); - final StreamTask task = standbyTask.recycle(time, cache, recordCollector, inputPartitions, consumer); + final StreamTask task = new StreamTask( + standbyTask.id, + inputPartitions, + standbyTask.topology, + consumer, + standbyTask.config, + streamsMetrics, + stateDirectory, + cache, + time, + standbyTask.stateMgr, + recordCollector, + standbyTask.processorContext, + standbyTask.logContext + ); - log.trace("Created active task {} with assigned partitions {}", task.id, inputPartitions); + log.trace("Recycled active task {} from recycled standby with assigned partitions {}", task.id, inputPartitions); Review Comment: ack. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org