guozhangwang commented on code in PR #12439: URL: https://github.com/apache/kafka/pull/12439#discussion_r932684021
########## streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java: ########## @@ -233,44 +234,7 @@ public void closeCleanAndRecycleState() { closeTaskSensor.record(); transitionTo(State.CLOSED); - log.info("Closed clean and recycled state"); - } - - /** - * Create an active task from this standby task without closing and re-initializing the state stores. - * The task should have been in suspended state when calling this function - * - * TODO: we should be able to not need the input partitions as input param in future but always reuse - * the task's input partitions when we have fixed partitions -> tasks mapping - */ - public StreamTask recycle(final Time time, - final ThreadCache cache, - final RecordCollector recordCollector, - final Set<TopicPartition> inputPartitions, - final Consumer<byte[], byte[]> mainConsumer) { - if (!inputPartitions.equals(this.inputPartitions)) { - log.warn("Detected unmatched input partitions for task {} when recycling it from active to standby", id); - } - - stateMgr.transitionTaskType(TaskType.ACTIVE); - - log.debug("Recycling standby task {} to active", id); - - return new StreamTask( - id, - inputPartitions, - topology, - mainConsumer, - config, - streamsMetrics, - stateDirectory, - cache, - time, - stateMgr, - recordCollector, - processorContext, - logContext - ); + log.info("Closed and recycled state, and converted type to active"); Review Comment: ack. ########## streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java: ########## @@ -566,49 +566,9 @@ public void closeCleanAndRecycleState() { } closeTaskSensor.record(); - transitionTo(State.CLOSED); - log.info("Closed clean and recycled state"); - } - - /** - * Create a standby task from this active task without closing and re-initializing the state stores. - * The task should have been in suspended state when calling this function - * - * TODO: we should be able to not need the input partitions as input param in future but always reuse - * the task's input partitions when we have fixed partitions -> tasks mapping - */ - public StandbyTask recycle(final Set<TopicPartition> inputPartitions) { - if (state() != Task.State.CLOSED) { - throw new IllegalStateException("Attempted to convert an active task that's not closed: " + id); - } - - if (!inputPartitions.equals(this.inputPartitions)) { - log.warn("Detected unmatched input partitions for task {} when recycling it from active to standby", id); - } - - stateMgr.transitionTaskType(TaskType.STANDBY); - - final ThreadCache dummyCache = new ThreadCache( - new LogContext(String.format("stream-thread [%s] ", Thread.currentThread().getName())), - 0, - streamsMetrics - ); - - log.debug("Recycling active task {} to standby", id); - - return new StandbyTask( - id, - inputPartitions, - topology, - config, - streamsMetrics, - stateMgr, - stateDirectory, - dummyCache, - processorContext - ); + log.info("Closed and recycled state, and converted type to standby"); Review Comment: ack. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org