This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch 4.2
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.2 by this push:
     new ebd46cc5a1c MINOR: Revert create local state Standbys on start 
(#16922)" (#20981)
ebd46cc5a1c is described below

commit ebd46cc5a1c37bbf8406392c3511389ccd470c21
Author: Bill Bejeck <[email protected]>
AuthorDate: Mon Nov 24 18:54:49 2025 -0500

    MINOR: Revert create local state Standbys on start (#16922)" (#20981)
    
    This reverts commit 571f5081
    (https://github.com/apache/kafka/pull/16922) of an incomplete feature.
    
    PR https://github.com/apache/kafka/pull/16922 is part of
    [KIP-1035](https://cwiki.apache.org/confluence/display/KAFKA/KIP-1035%3A+StateStore+managed+changelog+offsets).
    In particular, on starting a Kafka Streams instance, if it has
    pre-existing state, the state stores are initialized on the main
    thread. Part of this initialization registers the stateful metrics with
    the JMX thread-id tag of main. This breaks the KIP-1076 implementation,
    where we need to register metrics with thread-id tags of
    xxxStreamThread-N. This is necessary because the
    StreamsMetric is a singleton shared by all StreamThread instances, so
    we need to make sure we only add metrics for the current StreamThread;
    otherwise duplicate metrics are registered. This PR reverts the changes
    until a fix is implemented that allows the individual StreamThreads to
    register the metrics.
    
    Reviewers: Matthias Sax <[email protected]>
---
 checkstyle/suppressions.xml                        |   2 +-
 .../org/apache/kafka/streams/KafkaStreams.java     |   9 +-
 .../processor/internals/ProcessorStateManager.java |  43 +-----
 .../processor/internals/StateDirectory.java        | 126 +---------------
 .../streams/processor/internals/TaskManager.java   |  63 --------
 .../org/apache/kafka/streams/KafkaStreamsTest.java |  38 -----
 .../processor/internals/StateDirectoryTest.java    | 161 +--------------------
 .../processor/internals/StreamThreadTest.java      |   3 +-
 .../processor/internals/TaskManagerTest.java       |  78 ----------
 9 files changed, 16 insertions(+), 507 deletions(-)

diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 40b50d2b5ad..b37981b1b01 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -195,7 +195,7 @@
 
     <!-- Streams -->
     <suppress checks="ClassFanOutComplexity"
-              
files="(KafkaStreams|KStreamImpl|KTableImpl|InternalTopologyBuilder|StreamsPartitionAssignor|StreamThread|IQv2StoreIntegrationTest|KStreamImplTest|RocksDBStore|StreamTask|TaskManager).java"/>
+              
files="(KafkaStreams|KStreamImpl|KTableImpl|InternalTopologyBuilder|StreamsPartitionAssignor|StreamThread|IQv2StoreIntegrationTest|KStreamImplTest|RocksDBStore|StreamTask).java"/>
 
     <suppress checks="MethodLength"
               files="KTableImpl.java"/>
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java 
b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index e37634fe8dd..ae8131b4f5b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -183,7 +183,6 @@ public class KafkaStreams implements AutoCloseable {
     protected final TopologyMetadata topologyMetadata;
     private final QueryableStoreProvider queryableStoreProvider;
     private final DelegatingStandbyUpdateListener 
delegatingStandbyUpdateListener;
-    private final LogContext logContext;
 
     GlobalStreamThread globalStreamThread;
     protected StateDirectory stateDirectory = null;
@@ -636,9 +635,6 @@ public class KafkaStreams implements AutoCloseable {
                 return;
             }
 
-            // all (alive) threads have received their assignment, close any 
remaining startup tasks, they're not needed
-            stateDirectory.closeStartupTasks();
-
             setState(State.RUNNING);
         }
 
@@ -961,7 +957,7 @@ public class KafkaStreams implements AutoCloseable {
         } else {
             clientId = userClientId;
         }
-        logContext = new LogContext(String.format("stream-client [%s] ", 
clientId));
+        final LogContext logContext = new 
LogContext(String.format("stream-client [%s] ", clientId));
         this.log = logContext.logger(getClass());
         topologyMetadata.setLog(logContext);
 
@@ -1374,9 +1370,6 @@ public class KafkaStreams implements AutoCloseable {
      */
     public synchronized void start() throws IllegalStateException, 
StreamsException {
         if (setState(State.REBALANCING)) {
-            log.debug("Initializing STANDBY tasks for existing local state");
-            stateDirectory.initializeStartupTasks(topologyMetadata, 
streamsMetrics, logContext);
-
             log.debug("Starting Streams client");
 
             if (globalStreamThread != null) {
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
index 3506845d288..e708c317677 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -166,11 +166,11 @@ public class ProcessorStateManager implements 
StateManager {
 
     private static final String STATE_CHANGELOG_TOPIC_SUFFIX = "-changelog";
 
-    private String logPrefix;
+    private final String logPrefix;
 
     private final TaskId taskId;
     private final boolean eosEnabled;
-    private ChangelogRegister changelogReader;
+    private final ChangelogRegister changelogReader;
     private final Collection<TopicPartition> sourcePartitions;
     private final Map<String, String> storeToChangelogTopic;
 
@@ -222,39 +222,6 @@ public class ProcessorStateManager implements StateManager 
{
         log.debug("Created state store manager for task {}", taskId);
     }
 
-    /**
-     * Special constructor used by {@link StateDirectory} to partially 
initialize startup tasks for local state, before
-     * they're assigned to a thread. When the task is assigned to a thread, 
the initialization of this StateManager is
-     * completed in {@link #assignToStreamThread(LogContext, 
ChangelogRegister, Collection)}.
-     */
-    static ProcessorStateManager createStartupTaskStateManager(final TaskId 
taskId,
-                                                               final boolean 
eosEnabled,
-                                                               final 
LogContext logContext,
-                                                               final 
StateDirectory stateDirectory,
-                                                               final 
Map<String, String> storeToChangelogTopic,
-                                                               final 
Set<TopicPartition> sourcePartitions,
-                                                               final boolean 
stateUpdaterEnabled) {
-        return new ProcessorStateManager(taskId, TaskType.STANDBY, eosEnabled, 
logContext, stateDirectory, null, storeToChangelogTopic, sourcePartitions, 
stateUpdaterEnabled);
-    }
-
-    /**
-     * Standby tasks initialized for local state on-startup are only partially 
initialized, because they are not yet
-     * assigned to a StreamThread. Once assigned to a StreamThread, we 
complete their initialization here using the
-     * assigned StreamThread's context.
-     */
-    void assignToStreamThread(final LogContext logContext,
-                              final ChangelogRegister changelogReader,
-                              final Collection<TopicPartition> 
sourcePartitions) {
-        if (this.changelogReader != null) {
-            throw new IllegalStateException("Attempted to replace an existing 
changelogReader on a StateManager without closing it.");
-        }
-        this.sourcePartitions.clear();
-        this.log = logContext.logger(ProcessorStateManager.class);
-        this.logPrefix = logContext.logPrefix();
-        this.changelogReader = changelogReader;
-        this.sourcePartitions.addAll(sourcePartitions);
-    }
-
     void registerStateStores(final List<StateStore> allStores, final 
InternalProcessorContext<?, ?> processorContext) {
         processorContext.uninitialize();
         for (final StateStore store : allStores) {
@@ -347,7 +314,7 @@ public class ProcessorStateManager implements StateManager {
     }
 
     private void maybeRegisterStoreWithChangelogReader(final String storeName) 
{
-        if (isLoggingEnabled(storeName) && changelogReader != null) {
+        if (isLoggingEnabled(storeName)) {
             changelogReader.register(getStorePartition(storeName), this);
         }
     }
@@ -616,7 +583,7 @@ public class ProcessorStateManager implements StateManager {
     public void close() throws ProcessorStateException {
         log.debug("Closing its state manager and all the registered state 
stores: {}", stores);
 
-        if (!stateUpdaterEnabled && changelogReader != null) {
+        if (!stateUpdaterEnabled) {
             changelogReader.unregister(getAllChangelogTopicPartitions());
         }
 
@@ -664,7 +631,7 @@ public class ProcessorStateManager implements StateManager {
     void recycle() {
         log.debug("Recycling state for {} task {}.", taskType, taskId);
 
-        if (!stateUpdaterEnabled && changelogReader != null) {
+        if (!stateUpdaterEnabled) {
             final List<TopicPartition> allChangelogs = 
getAllChangelogTopicPartitions();
             changelogReader.unregister(allChangelogs);
         }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
index 8ea2d3ae65a..3504edc8e7a 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
@@ -16,18 +16,12 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.ProcessorStateException;
 import org.apache.kafka.streams.errors.StreamsException;
-import org.apache.kafka.streams.errors.TaskCorruptedException;
-import org.apache.kafka.streams.internals.StreamsConfigUtils;
 import org.apache.kafka.streams.processor.TaskId;
-import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
-import org.apache.kafka.streams.state.internals.ThreadCache;
 
 import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
 import com.fasterxml.jackson.annotation.JsonProperty;
@@ -48,19 +42,15 @@ import java.nio.file.StandardOpenOption;
 import java.nio.file.attribute.PosixFilePermission;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.EnumSet;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.Predicate;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 
@@ -106,13 +96,12 @@ public class StateDirectory implements AutoCloseable {
     private final boolean hasPersistentStores;
     private final boolean hasNamedTopologies;
 
-    private final ConcurrentMap<TaskId, Thread> lockedTasksToOwner = new 
ConcurrentHashMap<>();
+    private final Map<TaskId, Thread> lockedTasksToOwner = new 
ConcurrentHashMap<>();
 
     private FileChannel stateDirLockChannel;
     private FileLock stateDirLock;
 
     private final StreamsConfig config;
-    private final ConcurrentMap<TaskId, Task> tasksForLocalState = new 
ConcurrentHashMap<>();
 
     /**
      * Ensures that the state base directory as well as the application's 
sub-directory are created.
@@ -206,109 +195,6 @@ public class StateDirectory implements AutoCloseable {
         return stateDirLock != null;
     }
 
-    public void initializeStartupTasks(final TopologyMetadata topologyMetadata,
-                                       final StreamsMetricsImpl streamsMetrics,
-                                       final LogContext logContext) {
-        final List<TaskDirectory> nonEmptyTaskDirectories = 
listNonEmptyTaskDirectories();
-        if (hasPersistentStores && !nonEmptyTaskDirectories.isEmpty()) {
-            final ThreadCache dummyCache = new ThreadCache(logContext, 0, 
streamsMetrics);
-            final boolean eosEnabled = StreamsConfigUtils.eosEnabled(config);
-            final boolean stateUpdaterEnabled = 
StreamsConfig.InternalConfig.stateUpdaterEnabled(config.originals());
-
-            // discover all non-empty task directories in StateDirectory
-            for (final TaskDirectory taskDirectory : nonEmptyTaskDirectories) {
-                final String dirName = taskDirectory.file().getName();
-                final TaskId id = parseTaskDirectoryName(dirName, 
taskDirectory.namedTopology());
-                final ProcessorTopology subTopology = 
topologyMetadata.buildSubtopology(id);
-
-                // we still check if the task's sub-topology is stateful, even 
though we know its directory contains state,
-                // because it's possible that the topology has changed since 
that data was written, and is now stateless
-                // this therefore prevents us from creating unnecessary Tasks 
just because of some left-over state
-                if (subTopology.hasStateWithChangelogs()) {
-                    final Set<TopicPartition> inputPartitions = 
topologyMetadata.nodeToSourceTopics(id).values().stream()
-                            .flatMap(Collection::stream)
-                            .map(t -> new TopicPartition(t, id.partition()))
-                            .collect(Collectors.toSet());
-                    final ProcessorStateManager stateManager = 
ProcessorStateManager.createStartupTaskStateManager(
-                        id,
-                        eosEnabled,
-                        logContext,
-                        this,
-                        subTopology.storeToChangelogTopic(),
-                        inputPartitions,
-                        stateUpdaterEnabled
-                    );
-
-                    final InternalProcessorContext<Object, Object> context = 
new ProcessorContextImpl(
-                        id,
-                        config,
-                        stateManager,
-                        streamsMetrics,
-                        dummyCache
-                    );
-
-                    final Task task = new StandbyTask(
-                        id,
-                        inputPartitions,
-                        subTopology,
-                        topologyMetadata.taskConfig(id),
-                        streamsMetrics,
-                        stateManager,
-                        this,
-                        dummyCache,
-                        context
-                    );
-
-                    try {
-                        task.initializeIfNeeded();
-
-                        tasksForLocalState.put(id, task);
-                    } catch (final TaskCorruptedException e) {
-                        // Task is corrupt - wipe it out (under EOS) and don't 
initialize a Standby for it
-                        task.suspend();
-                        task.closeDirty();
-                    }
-                }
-            }
-        }
-    }
-
-    public boolean hasStartupTasks() {
-        return !tasksForLocalState.isEmpty();
-    }
-
-    public Task removeStartupTask(final TaskId taskId) {
-        final Task task = tasksForLocalState.remove(taskId);
-        if (task != null) {
-            lockedTasksToOwner.replace(taskId, Thread.currentThread());
-        }
-        return task;
-    }
-
-    public void closeStartupTasks() {
-        closeStartupTasks(t -> true);
-    }
-
-    private void closeStartupTasks(final Predicate<Task> predicate) {
-        if (!tasksForLocalState.isEmpty()) {
-            // "drain" Tasks first to ensure that we don't try to close Tasks 
that another thread is attempting to close
-            final Set<Task> drainedTasks = new 
HashSet<>(tasksForLocalState.size());
-            for (final Map.Entry<TaskId, Task> entry : 
tasksForLocalState.entrySet()) {
-                if (predicate.test(entry.getValue()) && 
removeStartupTask(entry.getKey()) != null) {
-                    // only add to our list of drained Tasks if we exclusively 
"claimed" a Task from tasksForLocalState
-                    // to ensure we don't accidentally try to drain the same 
Task multiple times from concurrent threads
-                    drainedTasks.add(entry.getValue());
-                }
-            }
-
-            // now that we have exclusive ownership of the drained tasks, 
close them
-            for (final Task task : drainedTasks) {
-                task.suspend();
-                task.closeClean();
-            }
-        }
-    }
-
     public UUID initializeProcessId() {
         if (!hasPersistentStores) {
             final UUID processId = UUID.randomUUID();
@@ -505,17 +391,9 @@ public class StateDirectory implements AutoCloseable {
         }
     }
 
-    /**
-     * Expose for tests.
-     */
-    Thread lockOwner(final TaskId taskId) {
-        return lockedTasksToOwner.get(taskId);
-    }
-
     @Override
     public void close() {
         if (hasPersistentStores) {
-            closeStartupTasks();
             try {
                 stateDirLock.release();
                 stateDirLockChannel.close();
@@ -633,7 +511,6 @@ public class StateDirectory implements AutoCloseable {
         );
         if (namedTopologyDirs != null) {
             for (final File namedTopologyDir : namedTopologyDirs) {
-                closeStartupTasks(task -> 
task.id().topologyName().equals(parseNamedTopologyFromDirectory(namedTopologyDir.getName())));
                 final File[] contents = namedTopologyDir.listFiles();
                 if (contents != null && contents.length == 0) {
                     try {
@@ -671,7 +548,6 @@ public class StateDirectory implements AutoCloseable {
             log.debug("Tried to clear out the local state for NamedTopology {} 
but none was found", topologyName);
         }
         try {
-            closeStartupTasks(task -> 
task.id().topologyName().equals(topologyName));
             Utils.delete(namedTopologyDir);
         } catch (final IOException e) {
             log.error("Hit an unexpected error while clearing local state for 
topology " + topologyName, e);
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index c1b1c06379e..064662ee01f 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -334,31 +334,6 @@ public class TaskManager {
         }
     }
 
-    private Map<Task, Set<TopicPartition>> assignStartupTasks(final 
Map<TaskId, Set<TopicPartition>> tasksToAssign,
-                                                              final String 
threadLogPrefix,
-                                                              final 
TopologyMetadata topologyMetadata,
-                                                              final 
ChangelogRegister changelogReader) {
-        if (stateDirectory.hasStartupTasks()) {
-            final Map<Task, Set<TopicPartition>> assignedTasks = new 
HashMap<>(tasksToAssign.size());
-            for (final Map.Entry<TaskId, Set<TopicPartition>> entry : 
tasksToAssign.entrySet()) {
-                final TaskId taskId = entry.getKey();
-                final Task task = stateDirectory.removeStartupTask(taskId);
-                if (task != null) {
-                    // replace our dummy values with the real ones, now we 
know our thread and assignment
-                    final Set<TopicPartition> inputPartitions = 
entry.getValue();
-                    task.stateManager().assignToStreamThread(new 
LogContext(threadLogPrefix), changelogReader, inputPartitions);
-                    updateInputPartitionsOfStandbyTaskIfTheyChanged(task, 
inputPartitions);
-
-                    assignedTasks.put(task, inputPartitions);
-                }
-            }
-
-            return assignedTasks;
-        } else {
-            return Collections.emptyMap();
-        }
-    }
-
     /**
      * @throws TaskMigratedException if the task producer got fenced (EOS only)
      * @throws StreamsException fatal error while creating / initializing the 
task
@@ -488,15 +463,6 @@ public class TaskManager {
                                                 final Map<TaskId, 
Set<TopicPartition>> standbyTasksToCreate,
                                                 final Map<Task, 
Set<TopicPartition>> tasksToRecycle,
                                                 final Set<Task> 
tasksToCloseClean) {
-        final Map<Task, Set<TopicPartition>> startupStandbyTasksToRecycle = 
assignStartupTasks(activeTasksToCreate, logPrefix, topologyMetadata, 
changelogReader);
-        final Map<Task, Set<TopicPartition>> startupStandbyTasksToUse = 
assignStartupTasks(standbyTasksToCreate, logPrefix, topologyMetadata, 
changelogReader);
-
-        // recycle the startup standbys to active
-        tasks.addStandbyTasks(startupStandbyTasksToRecycle.keySet());
-
-        // use startup Standbys as real Standby tasks
-        tasks.addStandbyTasks(startupStandbyTasksToUse.keySet());
-
         for (final Task task : tasks.allTasks()) {
             final TaskId taskId = task.id();
             if (activeTasksToCreate.containsKey(taskId)) {
@@ -551,7 +517,6 @@ public class TaskManager {
                                              final Set<Task> tasksToCloseClean,
                                              final Map<TaskId, 
RuntimeException> failedTasks) {
         handleTasksPendingInitialization();
-        handleStartupTaskReuse(activeTasksToCreate, standbyTasksToCreate, 
failedTasks);
         handleRestoringAndUpdatingTasks(activeTasksToCreate, 
standbyTasksToCreate, failedTasks);
         handleRunningAndSuspendedTasks(activeTasksToCreate, 
standbyTasksToCreate, tasksToRecycle, tasksToCloseClean);
     }
@@ -569,34 +534,6 @@ public class TaskManager {
         }
     }
 
-    private void handleStartupTaskReuse(final Map<TaskId, Set<TopicPartition>> 
activeTasksToCreate,
-                                        final Map<TaskId, Set<TopicPartition>> 
standbyTasksToCreate,
-                                        final Map<TaskId, RuntimeException> 
failedTasks) {
-        final Map<Task, Set<TopicPartition>> startupStandbyTasksToRecycle = 
assignStartupTasks(activeTasksToCreate, logPrefix, topologyMetadata, 
changelogReader);
-        final Map<Task, Set<TopicPartition>> startupStandbyTasksToUse = 
assignStartupTasks(standbyTasksToCreate, logPrefix, topologyMetadata, 
changelogReader);
-
-        // recycle the startup standbys to active, and remove them from the 
set of actives that need to be created
-        if (!startupStandbyTasksToRecycle.isEmpty()) {
-            final Set<Task> tasksToCloseDirty = new HashSet<>();
-            for (final Map.Entry<Task, Set<TopicPartition>> entry : 
startupStandbyTasksToRecycle.entrySet()) {
-                final Task task = entry.getKey();
-                recycleTaskFromStateUpdater(task, entry.getValue(), 
tasksToCloseDirty, failedTasks);
-                activeTasksToCreate.remove(task.id());
-            }
-
-            // if any standby tasks failed to recycle, close them dirty
-            tasksToCloseDirty.forEach(task ->
-                closeTaskDirty(task, false)
-            );
-        }
-
-        // use startup Standbys as real Standby tasks
-        if (!startupStandbyTasksToUse.isEmpty()) {
-            tasks.addPendingTasksToInit(startupStandbyTasksToUse.keySet());
-            startupStandbyTasksToUse.keySet().forEach(task -> 
standbyTasksToCreate.remove(task.id()));
-        }
-    }
-
     private void handleRunningAndSuspendedTasks(final Map<TaskId, 
Set<TopicPartition>> activeTasksToCreate,
                                                 final Map<TaskId, 
Set<TopicPartition>> standbyTasksToCreate,
                                                 final Map<Task, 
Set<TopicPartition>> tasksToRecycle,
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java 
b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
index 14d4cf8c21f..847be900366 100644
--- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
@@ -405,44 +405,6 @@ public class KafkaStreamsTest {
         }
     }
 
-    @Test
-    public void shouldInitializeTasksForLocalStateOnStart() {
-        prepareStreams();
-        prepareStreamThread(streamThreadOne, 1);
-        prepareStreamThread(streamThreadTwo, 2);
-
-        try (final MockedConstruction<StateDirectory> constructed = 
mockConstruction(StateDirectory.class,
-                (mock, context) -> 
when(mock.initializeProcessId()).thenReturn(UUID.randomUUID()))) {
-            try (final KafkaStreams streams = new 
KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
-                assertEquals(1, constructed.constructed().size());
-                final StateDirectory stateDirectory = 
constructed.constructed().get(0);
-                verify(stateDirectory, times(0)).initializeStartupTasks(any(), 
any(), any());
-                streams.start();
-                verify(stateDirectory, times(1)).initializeStartupTasks(any(), 
any(), any());
-            }
-        }
-    }
-
-    @Test
-    public void shouldCloseStartupTasksAfterFirstRebalance() throws Exception {
-        prepareStreams();
-        final AtomicReference<StreamThread.State> state1 = 
prepareStreamThread(streamThreadOne, 1);
-        final AtomicReference<StreamThread.State> state2 = 
prepareStreamThread(streamThreadTwo, 2);
-        prepareThreadState(streamThreadOne, state1);
-        prepareThreadState(streamThreadTwo, state2);
-        try (final MockedConstruction<StateDirectory> constructed = 
mockConstruction(StateDirectory.class,
-            (mock, context) -> 
when(mock.initializeProcessId()).thenReturn(UUID.randomUUID()))) {
-            try (final KafkaStreams streams = new 
KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
-                assertEquals(1, constructed.constructed().size());
-                final StateDirectory stateDirectory = 
constructed.constructed().get(0);
-                streams.setStateListener(streamsStateListener);
-                streams.start();
-                waitForCondition(() -> streams.state() == State.RUNNING, 
"Streams never started.");
-                verify(stateDirectory, times(1)).closeStartupTasks();
-            }
-        }
-    }
-
     @Test
     public void stateShouldTransitToRunningIfNonDeadThreadsBackToRunning() 
throws Exception {
         prepareStreams();
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
index 616c397d711..dadde62e92d 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
@@ -17,20 +17,14 @@
 package org.apache.kafka.streams.processor.internals;
 
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.utils.LogCaptureAppender;
-import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.TopologyConfig;
 import org.apache.kafka.streams.errors.ProcessorStateException;
-import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
 import 
org.apache.kafka.streams.processor.internals.StateDirectory.TaskDirectory;
-import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
-import org.apache.kafka.test.MockKeyValueStore;
 import org.apache.kafka.test.TestUtils;
 
 import com.fasterxml.jackson.annotation.JsonProperty;
@@ -39,8 +33,6 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
-import org.mockito.ArgumentMatchers;
-import org.mockito.Mockito;
 
 import java.io.BufferedWriter;
 import java.io.File;
@@ -80,7 +72,6 @@ import static org.hamcrest.CoreMatchers.containsString;
 import static org.hamcrest.CoreMatchers.endsWith;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.hasItem;
-import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.CoreMatchers.not;
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -97,7 +88,6 @@ public class StateDirectoryTest {
     private final MockTime time = new MockTime();
     private File stateDir;
     private final String applicationId = "applicationId";
-    private StreamsConfig config;
     private StateDirectory directory;
     private File appDir;
 
@@ -114,13 +104,14 @@ public class StateDirectoryTest {
         if (!createStateDirectory) {
             cleanup();
         }
-        config = new StreamsConfig(Map.of(
-                StreamsConfig.APPLICATION_ID_CONFIG, applicationId,
-                StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234",
-                StreamsConfig.STATE_DIR_CONFIG, stateDir.getPath(),
+        directory = new StateDirectory(
+            new StreamsConfig(Map.of(
+                    StreamsConfig.APPLICATION_ID_CONFIG, applicationId,
+                    StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234",
+                    StreamsConfig.STATE_DIR_CONFIG, stateDir.getPath(),
                 StreamsConfig.ALLOW_OS_GROUP_WRITE_ACCESS_CONFIG, 
allowOsGroupWriteAccess
-        ));
-        directory = new StateDirectory(config, time, createStateDirectory, 
hasNamedTopology);
+            )),
+            time, createStateDirectory, hasNamedTopology);
         appDir = new File(stateDir, applicationId);
     }
 
@@ -854,144 +845,6 @@ public class StateDirectoryTest {
         assertThat(directory.initializeProcessId(), equalTo(processId));
     }
 
-    @Test
-    public void shouldNotInitializeStandbyTasksWhenNoLocalState() {
-        final TaskId taskId = new TaskId(0, 0);
-        initializeStartupTasks(new TaskId(0, 0), false);
-        assertFalse(directory.hasStartupTasks());
-        assertNull(directory.removeStartupTask(taskId));
-        assertFalse(directory.hasStartupTasks());
-    }
-
-    @Test
-    public void shouldInitializeStandbyTasksForLocalState() {
-        final TaskId taskId = new TaskId(0, 0);
-        initializeStartupTasks(new TaskId(0, 0), true);
-        assertTrue(directory.hasStartupTasks());
-        assertNotNull(directory.removeStartupTask(taskId));
-        assertFalse(directory.hasStartupTasks());
-        assertNull(directory.removeStartupTask(taskId));
-    }
-
-    @Test
-    public void shouldNotAssignStartupTasksWeDontHave() {
-        final TaskId taskId = new TaskId(0, 0);
-        initializeStartupTasks(taskId, false);
-        final Task task = directory.removeStartupTask(taskId);
-        assertNull(task);
-    }
-
-    private class FakeStreamThread extends Thread {
-        private final TaskId taskId;
-        private final AtomicReference<Task> result;
-
-        private FakeStreamThread(final TaskId taskId, final AtomicReference<Task> result) {
-            this.taskId = taskId;
-            this.result = result;
-        }
-
-        @Override
-        public void run() {
-            result.set(directory.removeStartupTask(taskId));
-        }
-    }
-
-    @Test
-    public void shouldAssignStartupTaskToStreamThread() throws InterruptedException {
-        final TaskId taskId = new TaskId(0, 0);
-
-        initializeStartupTasks(taskId, true);
-
-        // main thread owns the newly initialized tasks
-        assertThat(directory.lockOwner(taskId), is(Thread.currentThread()));
-
-        // spawn off a "fake" StreamThread, so we can verify the lock was updated to the correct thread
-        final AtomicReference<Task> result = new AtomicReference<>();
-        final Thread streamThread = new FakeStreamThread(taskId, result);
-        streamThread.start();
-        streamThread.join();
-        final Task task = result.get();
-
-        assertNotNull(task);
-        assertThat(task, instanceOf(StandbyTask.class));
-
-        // verify the owner of the task directory lock has been shifted over to our assigned StreamThread
-        assertThat(directory.lockOwner(taskId), is(instanceOf(FakeStreamThread.class)));
-    }
-
-    @Test
-    public void shouldUnlockStartupTasksOnClose() {
-        final TaskId taskId = new TaskId(0, 0);
-        initializeStartupTasks(taskId, true);
-
-        assertEquals(Thread.currentThread(), directory.lockOwner(taskId));
-        directory.closeStartupTasks();
-        assertNull(directory.lockOwner(taskId));
-    }
-
-    @Test
-    public void shouldCloseStartupTasksOnDirectoryClose() {
-        final StateStore store = initializeStartupTasks(new TaskId(0, 0), true);
-
-        assertTrue(directory.hasStartupTasks());
-        assertTrue(store.isOpen());
-
-        directory.close();
-
-        assertFalse(directory.hasStartupTasks());
-        assertFalse(store.isOpen());
-    }
-
-    @Test
-    public void shouldNotCloseStartupTasksOnAutoCleanUp() {
-        // we need to set this because the auto-cleanup uses the last-modified time from the filesystem,
-        // which can't be mocked
-        time.setCurrentTimeMs(System.currentTimeMillis());
-
-        final StateStore store = initializeStartupTasks(new TaskId(0, 0), true);
-
-        assertTrue(directory.hasStartupTasks());
-        assertTrue(store.isOpen());
-
-        time.sleep(10000);
-
-        directory.cleanRemovedTasks(1000);
-
-        assertTrue(directory.hasStartupTasks());
-        assertTrue(store.isOpen());
-    }
-
-    private StateStore initializeStartupTasks(final TaskId taskId, final boolean createTaskDir) {
-        directory.initializeProcessId();
-        final TopologyMetadata metadata = Mockito.mock(TopologyMetadata.class);
-        final TopologyConfig topologyConfig = new TopologyConfig(config);
-
-        final StateStore store = new MockKeyValueStore("test", true);
-
-        if (createTaskDir) {
-            final File taskDir = directory.getOrCreateDirectoryForTask(taskId);
-            final File storeDir = new File(taskDir, store.name());
-            storeDir.mkdir();
-        }
-
-        final ProcessorTopology processorTopology = new ProcessorTopology(
-                Collections.emptyList(),
-                Collections.emptyMap(),
-                Collections.emptyMap(),
-                Collections.singletonList(store),
-                Collections.emptyList(),
-                Collections.singletonMap(store.name(), store.name() + "-changelog"),
-                Collections.emptySet(),
-                Collections.emptyMap()
-        );
-        Mockito.when(metadata.buildSubtopology(ArgumentMatchers.any())).thenReturn(processorTopology);
-        Mockito.when(metadata.taskConfig(ArgumentMatchers.any())).thenReturn(topologyConfig.getTaskConfig());
-
-        directory.initializeStartupTasks(metadata, new StreamsMetricsImpl(new Metrics(), "test", time), new LogContext("test"));
-
-        return store;
-    }
-
     private static class FutureStateDirectoryProcessFile {
 
         @JsonProperty
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 63783b59aca..f687f1be2a0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -1196,7 +1196,6 @@ public class StreamThreadTest {
             new StreamsMetricsImpl(metrics, CLIENT_ID, mockTime);
         final TopologyMetadata topologyMetadata = new TopologyMetadata(internalTopologyBuilder, config);
         topologyMetadata.buildAndRewriteTopology();
-        stateDirectory = new StateDirectory(config, mockTime, true, false);
 
         final TaskManager taskManager = new TaskManager(
             new MockTime(),
@@ -1208,7 +1207,7 @@ public class StreamThreadTest {
             new Tasks(new LogContext()),
             topologyMetadata,
             null,
-            stateDirectory,
+            null,
             stateUpdater,
             schedulingTaskManager
         ) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
index 3e87eebe733..4f13c41ba31 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
@@ -4817,84 +4817,6 @@ public class TaskManagerTest {
         assertEquals(0, taskManager.notPausedTasks().size());
     }
 
-    @Test
-    public void shouldRecycleStartupTasksFromStateDirectoryAsActive() {
-        final Tasks taskRegistry = new Tasks(new LogContext());
-        final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, taskRegistry);
-        final StandbyTask startupTask = standbyTask(taskId00, taskId00ChangelogPartitions).build();
-
-        final StreamTask activeTask = statefulTask(taskId00, taskId00ChangelogPartitions).build();
-        when(activeTaskCreator.createActiveTaskFromStandby(eq(startupTask), eq(taskId00Partitions), any()))
-                .thenReturn(activeTask);
-
-        when(stateDirectory.hasStartupTasks()).thenReturn(true, false);
-        when(stateDirectory.removeStartupTask(taskId00)).thenReturn(startupTask, (Task) null);
-
-        taskManager.handleAssignment(taskId00Assignment, Collections.emptyMap());
-
-        // ensure we used our existing startup Task directly as a Standby
-        assertTrue(taskRegistry.hasPendingTasksToInit());
-        assertEquals(Collections.singleton(activeTask), taskRegistry.drainPendingTasksToInit());
-
-        // we're using a mock StateUpdater here, so now that we've drained the task from the queue of startup tasks to init
-        // let's "add" it to our mock StateUpdater
-        when(stateUpdater.tasks()).thenReturn(Collections.singleton(activeTask));
-        when(stateUpdater.standbyTasks()).thenReturn(Collections.emptySet());
-
-        // ensure we recycled our existing startup Standby into an Active task
-        verify(activeTaskCreator).createActiveTaskFromStandby(eq(startupTask), eq(taskId00Partitions), any());
-
-        // ensure we didn't construct any new Tasks
-        verify(activeTaskCreator).createTasks(any(), eq(Collections.emptyMap()));
-        verify(standbyTaskCreator).createTasks(Collections.emptyMap());
-        verifyNoMoreInteractions(activeTaskCreator);
-        verifyNoMoreInteractions(standbyTaskCreator);
-
-        // verify the recycled task is now being used as an assigned Active
-        assertEquals(Collections.singletonMap(taskId00, activeTask), taskManager.activeTaskMap());
-        assertEquals(Collections.emptyMap(), taskManager.standbyTaskMap());
-    }
-
-    @Test
-    public void shouldUseStartupTasksFromStateDirectoryAsStandby() {
-        final Tasks taskRegistry = new Tasks(new LogContext());
-        final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, taskRegistry);
-        final StandbyTask startupTask = standbyTask(taskId00, taskId00ChangelogPartitions).build();
-
-        when(stateDirectory.hasStartupTasks()).thenReturn(true, true, false);
-        when(stateDirectory.removeStartupTask(taskId00)).thenReturn(startupTask, (Task) null);
-
-        assertFalse(taskRegistry.hasPendingTasksToInit());
-
-        taskManager.handleAssignment(Collections.emptyMap(), taskId00Assignment);
-
-        // ensure we used our existing startup Task directly as a Standby
-        assertTrue(taskRegistry.hasPendingTasksToInit());
-        assertEquals(Collections.singleton(startupTask), taskRegistry.drainPendingTasksToInit());
-
-        // we're using a mock StateUpdater here, so now that we've drained the task from the queue of startup tasks to init
-        // let's "add" it to our mock StateUpdater
-        when(stateUpdater.tasks()).thenReturn(Collections.singleton(startupTask));
-        when(stateUpdater.standbyTasks()).thenReturn(Collections.singleton(startupTask));
-
-        // ensure we didn't construct any new Tasks, or recycle an existing Task; we only used the one we already have
-        verify(activeTaskCreator).createTasks(any(), eq(Collections.emptyMap()));
-        verify(standbyTaskCreator).createTasks(Collections.emptyMap());
-        verifyNoMoreInteractions(activeTaskCreator);
-        verifyNoMoreInteractions(standbyTaskCreator);
-
-        // verify the startup Standby is now being used as an assigned Standby
-        assertEquals(Collections.emptyMap(), taskManager.activeTaskMap());
-        assertEquals(Collections.singletonMap(taskId00, startupTask), taskManager.standbyTaskMap());
-    }
-
-    @Test
-    public void shouldStartStateUpdaterOnInit() {
-        final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, null);
-        taskManager.init();
-        verify(stateUpdater).start();
-    }
-
     private static KafkaFutureImpl<DeletedRecords> completedFuture() {
         final KafkaFutureImpl<DeletedRecords> futureDeletedRecords = new KafkaFutureImpl<>();
         futureDeletedRecords.complete(null);


Reply via email to