This is an automated email from the ASF dual-hosted git repository.

lucasbru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new da3f8752561 MINOR: Cleanup additional dead code missed during stateupdater refactoring (#21157)
da3f8752561 is described below

commit da3f8752561192f3bee24a543b09a5e72f94633d
Author: Shashank <[email protected]>
AuthorDate: Thu Dec 18 05:59:54 2025 -0800

    MINOR: Cleanup additional dead code missed during stateupdater refactoring (#21157)
    
    - Removed unused methods
    - Minor cleanup of method parameters
    - Removed unused variables and imports
    
    Reviewers: Lucas Brutschy <[email protected]>
---
 .../streams/processor/internals/StreamThread.java  | 35 +---------
 .../streams/processor/internals/TaskManager.java   | 81 ----------------------
 .../processor/internals/ActiveTaskCreatorTest.java |  2 -
 .../processor/internals/StreamThreadTest.java      |  1 -
 .../processor/internals/TaskManagerTest.java       |  2 -
 .../StreamThreadStateStoreProviderTest.java        |  7 --
 6 files changed, 1 insertion(+), 127 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 405ea720b6e..1bdf4d9ba68 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -497,7 +497,7 @@ public class StreamThread extends Thread implements 
ProcessingThread {
             consumerConfigs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 
"none");
         }
 
-        final MainConsumerSetup mainConsumerSetup = 
setupMainConsumer(topologyMetadata, config, clientSupplier, processId, log, 
threadId, consumerConfigs);
+        final MainConsumerSetup mainConsumerSetup = 
setupMainConsumer(topologyMetadata, config, clientSupplier, processId, 
consumerConfigs);
 
         taskManager.setMainConsumer(mainConsumerSetup.mainConsumer);
         referenceContainer.mainConsumer = mainConsumerSetup.mainConsumer;
@@ -537,8 +537,6 @@ public class StreamThread extends Thread implements 
ProcessingThread {
                                                        final StreamsConfig 
config,
                                                        final 
KafkaClientSupplier clientSupplier,
                                                        final UUID processId,
-                                                       final Logger log,
-                                                       final String threadId,
                                                        final Map<String, 
Object> consumerConfigs) {
         if 
(config.getString(StreamsConfig.GROUP_PROTOCOL_CONFIG).equalsIgnoreCase(GroupProtocol.STREAMS.name))
 {
             if (topologyMetadata.hasNamedTopologies()) {
@@ -1373,37 +1371,6 @@ public class StreamThread extends Thread implements 
ProcessingThread {
         }
     }
 
-    private void initializeAndRestorePhase() {
-        final java.util.function.Consumer<Set<TopicPartition>> offsetResetter 
= partitions -> resetOffsets(partitions, null);
-        final State stateSnapshot = state;
-        // only try to initialize the assigned tasks
-        // if the state is still in PARTITION_ASSIGNED after the poll call
-        if (stateSnapshot == State.PARTITIONS_ASSIGNED
-            || stateSnapshot == State.RUNNING && 
taskManager.needsInitializationOrRestoration()) {
-
-            log.debug("State is {}; initializing tasks if necessary", 
stateSnapshot);
-
-            if (taskManager.tryToCompleteRestoration(now, offsetResetter)) {
-                log.info("Restoration took {} ms for all active tasks {}", 
time.milliseconds() - lastPartitionAssignedMs,
-                    taskManager.activeTaskIds());
-                setState(State.RUNNING);
-            }
-
-            if (log.isDebugEnabled()) {
-                log.debug("Initialization call done. State is {}", state);
-            }
-        }
-
-        if (log.isDebugEnabled()) {
-            log.debug("Idempotently invoking restoration logic in state {}", 
state);
-        }
-        // we can always let changelog reader try restoring in order to 
initialize the changelogs;
-        // if there's no active restoring or standby updating it would not try 
to fetch any data
-        // After KAFKA-13873, we only restore the not paused tasks.
-        changelogReader.restore(taskManager.notPausedTasks());
-        log.debug("Idempotent restore call done. Thread state has not 
changed.");
-    }
-
     private void checkStateUpdater() {
         final java.util.function.Consumer<Set<TopicPartition>> offsetResetter 
= partitions -> resetOffsets(partitions, null);
         final State stateSnapshot = state;
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index 205f67374f7..5842a4067fc 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -58,7 +58,6 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.LinkedHashSet;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -844,77 +843,6 @@ public class TaskManager {
         return activeTaskCreator.createActiveTaskFromStandby(standbyTask, 
partitions, mainConsumer);
     }
 
-    /**
-     * Tries to initialize any new or still-uninitialized tasks, then checks 
if they can/have completed restoration.
-     *
-     * @throws IllegalStateException If store gets registered after 
initialized is already finished
-     * @throws StreamsException if the store's change log does not contain the 
partition
-     * @return {@code true} if all tasks are fully restored
-     */
-    boolean tryToCompleteRestoration(final long now,
-                                     final 
java.util.function.Consumer<Set<TopicPartition>> offsetResetter) {
-        boolean allRunning = true;
-
-        // transit to restore active is idempotent so we can call it multiple 
times
-        changelogReader.enforceRestoreActive();
-
-        final List<Task> activeTasks = new LinkedList<>();
-        for (final Task task : tasks.allTasks()) {
-            try {
-                task.initializeIfNeeded();
-                task.clearTaskTimeout();
-            } catch (final LockException lockException) {
-                // it is possible that if there are multiple threads within 
the instance that one thread
-                // trying to grab the task from the other, while the other has 
not released the lock since
-                // it did not participate in the rebalance. In this case we 
can just retry in the next iteration
-                log.debug("Could not initialize task {} since: {}; will 
retry", task.id(), lockException.getMessage());
-                allRunning = false;
-            } catch (final TimeoutException timeoutException) {
-                task.maybeInitTaskTimeoutOrThrow(now, timeoutException);
-                allRunning = false;
-            }
-
-            if (task.isActive()) {
-                activeTasks.add(task);
-            }
-        }
-
-        if (allRunning && !activeTasks.isEmpty()) {
-
-            final Set<TopicPartition> restored = 
changelogReader.completedChangelogs();
-
-            for (final Task task : activeTasks) {
-                if (restored.containsAll(task.changelogPartitions())) {
-                    try {
-                        task.completeRestoration(offsetResetter);
-                        task.clearTaskTimeout();
-                    } catch (final TimeoutException timeoutException) {
-                        task.maybeInitTaskTimeoutOrThrow(now, 
timeoutException);
-                        log.debug(
-                            String.format(
-                                "Could not complete restoration for %s due to 
the following exception; will retry",
-                                task.id()),
-                            timeoutException
-                        );
-
-                        allRunning = false;
-                    }
-                } else {
-                    // we found a restoring task that isn't done restoring, 
which is evidence that
-                    // not all tasks are running
-                    allRunning = false;
-                }
-            }
-        }
-        if (allRunning) {
-            // we can call resume multiple times since it is idempotent.
-            mainConsumer.resume(mainConsumer.assignment());
-            changelogReader.transitToUpdateStandby();
-        }
-
-        return allRunning;
-    }
-
     public boolean checkStateUpdater(final long now,
                                      final 
java.util.function.Consumer<Set<TopicPartition>> offsetResetter) {
         addTasksToStateUpdater();
@@ -2167,15 +2095,6 @@ public class TaskManager {
             e -> log.debug("Ignoring error in unclean {}", name));
     }
 
-    boolean needsInitializationOrRestoration() {
-        return 
activeTaskStream().anyMatch(Task::needsInitializationOrRestoration);
-    }
-
-    // for testing only
-    void addTask(final Task task) {
-        tasks.addTask(task);
-    }
-
     private boolean canTryInitializeTask(final TaskId taskId, final long 
nowMs) {
         return !taskIdToBackoffRecord.containsKey(taskId) || 
taskIdToBackoffRecord.get(taskId).canAttempt(nowMs);
     }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreatorTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreatorTest.java
index 9ddff25ff94..bc9449cebce 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreatorTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreatorTest.java
@@ -68,8 +68,6 @@ public class ActiveTaskCreatorTest {
     private InternalTopologyBuilder builder;
     @Mock
     private StateDirectory stateDirectory;
-    @Mock
-    private ChangelogReader changeLogReader;
 
     private final MockClientSupplier mockClientSupplier = new 
MockClientSupplier();
     private final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(new Metrics(), "clientId", new MockTime());
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 152b00918c1..947c1eedc18 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -247,7 +247,6 @@ public class StreamThreadTest {
     // task0 is unused
     private final TaskId task1 = new TaskId(0, 1);
     private final TaskId task2 = new TaskId(0, 2);
-    private final TaskId task3 = new TaskId(1, 1);
 
     private Properties configProps(final boolean enableEoS, final boolean 
processingThreadsEnabled) {
         return mkProperties(mkMap(
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
index 3dff3e9ef23..798d2d3406b 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
@@ -191,8 +191,6 @@ public class TaskManagerTest {
     private StandbyTaskCreator standbyTaskCreator;
     @Mock
     private Admin adminClient;
-    @Mock
-    private ProcessorStateManager stateManager;
     final StateUpdater stateUpdater = mock(StateUpdater.class);
     final DefaultTaskManager schedulingTaskManager = 
mock(DefaultTaskManager.class);
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
index 34d8d279778..8a8e892df00 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.streams.state.internals;
 
-import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.clients.admin.MockAdminClient;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
@@ -178,9 +177,7 @@ public class StreamThreadStateStoreProviderTest {
         taskOne = createStreamsTask(
             streamsConfig,
             mockConsumer,
-            mockRestoreConsumer,
             mockProducer,
-            mockAdminClient,
             processorTopology,
             new TaskId(0, 0));
         taskOne.initializeIfNeeded();
@@ -189,9 +186,7 @@ public class StreamThreadStateStoreProviderTest {
         final StreamTask taskTwo = createStreamsTask(
             streamsConfig,
             mockConsumer,
-            mockRestoreConsumer,
             mockProducer,
-            mockAdminClient,
             processorTopology,
             new TaskId(0, 1));
         taskTwo.initializeIfNeeded();
@@ -420,9 +415,7 @@ public class StreamThreadStateStoreProviderTest {
 
     private StreamTask createStreamsTask(final StreamsConfig streamsConfig,
                                          final Consumer<byte[], byte[]> 
consumer,
-                                         final Consumer<byte[], byte[]> 
restoreConsumer,
                                          final Producer<byte[], byte[]> 
producer,
-                                         final Admin adminClient,
                                          final ProcessorTopology topology,
                                          final TaskId taskId) {
         final Metrics metrics = new Metrics();

Reply via email to