This is an automated email from the ASF dual-hosted git repository.
lucasbru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new da3f8752561 MINOR: Cleanup additional dead code missed during
stateupdater refactoring (#21157)
da3f8752561 is described below
commit da3f8752561192f3bee24a543b09a5e72f94633d
Author: Shashank <[email protected]>
AuthorDate: Thu Dec 18 05:59:54 2025 -0800
MINOR: Cleanup additional dead code missed during stateupdater refactoring
(#21157)
- Removed unused methods
- Minor cleanup of method parameters
- Removed unused variables and imports
Reviewers: Lucas Brutschy <[email protected]>
---
.../streams/processor/internals/StreamThread.java | 35 +---------
.../streams/processor/internals/TaskManager.java | 81 ----------------------
.../processor/internals/ActiveTaskCreatorTest.java | 2 -
.../processor/internals/StreamThreadTest.java | 1 -
.../processor/internals/TaskManagerTest.java | 2 -
.../StreamThreadStateStoreProviderTest.java | 7 --
6 files changed, 1 insertion(+), 127 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 405ea720b6e..1bdf4d9ba68 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -497,7 +497,7 @@ public class StreamThread extends Thread implements
ProcessingThread {
consumerConfigs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
"none");
}
- final MainConsumerSetup mainConsumerSetup =
setupMainConsumer(topologyMetadata, config, clientSupplier, processId, log,
threadId, consumerConfigs);
+ final MainConsumerSetup mainConsumerSetup =
setupMainConsumer(topologyMetadata, config, clientSupplier, processId,
consumerConfigs);
taskManager.setMainConsumer(mainConsumerSetup.mainConsumer);
referenceContainer.mainConsumer = mainConsumerSetup.mainConsumer;
@@ -537,8 +537,6 @@ public class StreamThread extends Thread implements
ProcessingThread {
final StreamsConfig
config,
final
KafkaClientSupplier clientSupplier,
final UUID processId,
- final Logger log,
- final String threadId,
final Map<String,
Object> consumerConfigs) {
if
(config.getString(StreamsConfig.GROUP_PROTOCOL_CONFIG).equalsIgnoreCase(GroupProtocol.STREAMS.name))
{
if (topologyMetadata.hasNamedTopologies()) {
@@ -1373,37 +1371,6 @@ public class StreamThread extends Thread implements
ProcessingThread {
}
}
- private void initializeAndRestorePhase() {
- final java.util.function.Consumer<Set<TopicPartition>> offsetResetter
= partitions -> resetOffsets(partitions, null);
- final State stateSnapshot = state;
- // only try to initialize the assigned tasks
- // if the state is still in PARTITION_ASSIGNED after the poll call
- if (stateSnapshot == State.PARTITIONS_ASSIGNED
- || stateSnapshot == State.RUNNING &&
taskManager.needsInitializationOrRestoration()) {
-
- log.debug("State is {}; initializing tasks if necessary",
stateSnapshot);
-
- if (taskManager.tryToCompleteRestoration(now, offsetResetter)) {
- log.info("Restoration took {} ms for all active tasks {}",
time.milliseconds() - lastPartitionAssignedMs,
- taskManager.activeTaskIds());
- setState(State.RUNNING);
- }
-
- if (log.isDebugEnabled()) {
- log.debug("Initialization call done. State is {}", state);
- }
- }
-
- if (log.isDebugEnabled()) {
- log.debug("Idempotently invoking restoration logic in state {}",
state);
- }
- // we can always let changelog reader try restoring in order to
initialize the changelogs;
- // if there's no active restoring or standby updating it would not try
to fetch any data
- // After KAFKA-13873, we only restore the not paused tasks.
- changelogReader.restore(taskManager.notPausedTasks());
- log.debug("Idempotent restore call done. Thread state has not
changed.");
- }
-
private void checkStateUpdater() {
final java.util.function.Consumer<Set<TopicPartition>> offsetResetter
= partitions -> resetOffsets(partitions, null);
final State stateSnapshot = state;
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index 205f67374f7..5842a4067fc 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -58,7 +58,6 @@ import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
-import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -844,77 +843,6 @@ public class TaskManager {
return activeTaskCreator.createActiveTaskFromStandby(standbyTask,
partitions, mainConsumer);
}
- /**
- * Tries to initialize any new or still-uninitialized tasks, then checks
if they can/have completed restoration.
- *
- * @throws IllegalStateException If store gets registered after
initialized is already finished
- * @throws StreamsException if the store's change log does not contain the
partition
- * @return {@code true} if all tasks are fully restored
- */
- boolean tryToCompleteRestoration(final long now,
- final
java.util.function.Consumer<Set<TopicPartition>> offsetResetter) {
- boolean allRunning = true;
-
- // transit to restore active is idempotent so we can call it multiple
times
- changelogReader.enforceRestoreActive();
-
- final List<Task> activeTasks = new LinkedList<>();
- for (final Task task : tasks.allTasks()) {
- try {
- task.initializeIfNeeded();
- task.clearTaskTimeout();
- } catch (final LockException lockException) {
- // it is possible that if there are multiple threads within
the instance that one thread
- // trying to grab the task from the other, while the other has
not released the lock since
- // it did not participate in the rebalance. In this case we
can just retry in the next iteration
- log.debug("Could not initialize task {} since: {}; will
retry", task.id(), lockException.getMessage());
- allRunning = false;
- } catch (final TimeoutException timeoutException) {
- task.maybeInitTaskTimeoutOrThrow(now, timeoutException);
- allRunning = false;
- }
-
- if (task.isActive()) {
- activeTasks.add(task);
- }
- }
-
- if (allRunning && !activeTasks.isEmpty()) {
-
- final Set<TopicPartition> restored =
changelogReader.completedChangelogs();
-
- for (final Task task : activeTasks) {
- if (restored.containsAll(task.changelogPartitions())) {
- try {
- task.completeRestoration(offsetResetter);
- task.clearTaskTimeout();
- } catch (final TimeoutException timeoutException) {
- task.maybeInitTaskTimeoutOrThrow(now,
timeoutException);
- log.debug(
- String.format(
- "Could not complete restoration for %s due to
the following exception; will retry",
- task.id()),
- timeoutException
- );
-
- allRunning = false;
- }
- } else {
- // we found a restoring task that isn't done restoring,
which is evidence that
- // not all tasks are running
- allRunning = false;
- }
- }
- }
- if (allRunning) {
- // we can call resume multiple times since it is idempotent.
- mainConsumer.resume(mainConsumer.assignment());
- changelogReader.transitToUpdateStandby();
- }
-
- return allRunning;
- }
-
public boolean checkStateUpdater(final long now,
final
java.util.function.Consumer<Set<TopicPartition>> offsetResetter) {
addTasksToStateUpdater();
@@ -2167,15 +2095,6 @@ public class TaskManager {
e -> log.debug("Ignoring error in unclean {}", name));
}
- boolean needsInitializationOrRestoration() {
- return
activeTaskStream().anyMatch(Task::needsInitializationOrRestoration);
- }
-
- // for testing only
- void addTask(final Task task) {
- tasks.addTask(task);
- }
-
private boolean canTryInitializeTask(final TaskId taskId, final long
nowMs) {
return !taskIdToBackoffRecord.containsKey(taskId) ||
taskIdToBackoffRecord.get(taskId).canAttempt(nowMs);
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreatorTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreatorTest.java
index 9ddff25ff94..bc9449cebce 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreatorTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreatorTest.java
@@ -68,8 +68,6 @@ public class ActiveTaskCreatorTest {
private InternalTopologyBuilder builder;
@Mock
private StateDirectory stateDirectory;
- @Mock
- private ChangelogReader changeLogReader;
private final MockClientSupplier mockClientSupplier = new
MockClientSupplier();
private final StreamsMetricsImpl streamsMetrics = new
StreamsMetricsImpl(new Metrics(), "clientId", new MockTime());
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 152b00918c1..947c1eedc18 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -247,7 +247,6 @@ public class StreamThreadTest {
// task0 is unused
private final TaskId task1 = new TaskId(0, 1);
private final TaskId task2 = new TaskId(0, 2);
- private final TaskId task3 = new TaskId(1, 1);
private Properties configProps(final boolean enableEoS, final boolean
processingThreadsEnabled) {
return mkProperties(mkMap(
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
index 3dff3e9ef23..798d2d3406b 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
@@ -191,8 +191,6 @@ public class TaskManagerTest {
private StandbyTaskCreator standbyTaskCreator;
@Mock
private Admin adminClient;
- @Mock
- private ProcessorStateManager stateManager;
final StateUpdater stateUpdater = mock(StateUpdater.class);
final DefaultTaskManager schedulingTaskManager =
mock(DefaultTaskManager.class);
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
index 34d8d279778..8a8e892df00 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
@@ -16,7 +16,6 @@
*/
package org.apache.kafka.streams.state.internals;
-import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.MockAdminClient;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
@@ -178,9 +177,7 @@ public class StreamThreadStateStoreProviderTest {
taskOne = createStreamsTask(
streamsConfig,
mockConsumer,
- mockRestoreConsumer,
mockProducer,
- mockAdminClient,
processorTopology,
new TaskId(0, 0));
taskOne.initializeIfNeeded();
@@ -189,9 +186,7 @@ public class StreamThreadStateStoreProviderTest {
final StreamTask taskTwo = createStreamsTask(
streamsConfig,
mockConsumer,
- mockRestoreConsumer,
mockProducer,
- mockAdminClient,
processorTopology,
new TaskId(0, 1));
taskTwo.initializeIfNeeded();
@@ -420,9 +415,7 @@ public class StreamThreadStateStoreProviderTest {
private StreamTask createStreamsTask(final StreamsConfig streamsConfig,
final Consumer<byte[], byte[]>
consumer,
- final Consumer<byte[], byte[]>
restoreConsumer,
final Producer<byte[], byte[]>
producer,
- final Admin adminClient,
final ProcessorTopology topology,
final TaskId taskId) {
final Metrics metrics = new Metrics();