This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 4.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.0 by this push:
     new 0186534a992 Revert "KAFKA-17411: Create local state Standbys on start (#16922)" and "KAFKA-17978: Fix invalid topology on Task assignment (#17778)"
0186534a992 is described below

commit 0186534a992a123a7f53dd32860c6ba5787dbb18
Author: Matthias J. Sax <matth...@confluent.io>
AuthorDate: Mon Jan 13 10:51:03 2025 -0800

    Revert "KAFKA-17411: Create local state Standbys on start (#16922)" and "KAFKA-17978: Fix invalid topology on Task assignment (#17778)"
    
    This reverts commit 571f50817c0c3e81a8f767396e485bc23a0731ba.
    This reverts commit a696b4d6f4917ccd942a7ba21ef660f351e47b01.
---
 checkstyle/suppressions.xml                        |   2 +-
 .../org/apache/kafka/streams/KafkaStreams.java     |   9 +-
 .../processor/internals/ProcessorStateManager.java |  43 +-----
 .../processor/internals/StateDirectory.java        | 129 ----------------
 .../streams/processor/internals/TaskManager.java   |  63 --------
 .../org/apache/kafka/streams/KafkaStreamsTest.java |  38 -----
 .../processor/internals/StateDirectoryTest.java    | 170 ++-------------------
 .../processor/internals/StreamThreadTest.java      |   3 +-
 .../processor/internals/TaskManagerTest.java       | 120 ---------------
 9 files changed, 18 insertions(+), 559 deletions(-)

diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 2d05fac1e70..460e14871c8 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -189,7 +189,7 @@
 
     <!-- Streams -->
     <suppress checks="ClassFanOutComplexity"
-              files="(KafkaStreams|KStreamImpl|KTableImpl|InternalTopologyBuilder|StreamsPartitionAssignor|StreamThread|IQv2StoreIntegrationTest|KStreamImplTest|RocksDBStore|StreamTask|TaskManager).java"/>
+              files="(KafkaStreams|KStreamImpl|KTableImpl|InternalTopologyBuilder|StreamsPartitionAssignor|StreamThread|IQv2StoreIntegrationTest|KStreamImplTest|RocksDBStore|StreamTask).java"/>
 
     <suppress checks="MethodLength"
               files="KTableImpl.java"/>
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java 
b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 79e6af29be7..00e9ede261f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -184,7 +184,6 @@ public class KafkaStreams implements AutoCloseable {
     protected final TopologyMetadata topologyMetadata;
     private final QueryableStoreProvider queryableStoreProvider;
     private final DelegatingStandbyUpdateListener 
delegatingStandbyUpdateListener;
-    private final LogContext logContext;
 
     GlobalStreamThread globalStreamThread;
     protected StateDirectory stateDirectory = null;
@@ -636,9 +635,6 @@ public class KafkaStreams implements AutoCloseable {
                 return;
             }
 
-            // all (alive) threads have received their assignment, close any 
remaining startup tasks, they're not needed
-            stateDirectory.closeStartupTasks();
-
             setState(State.RUNNING);
         }
 
@@ -961,7 +957,7 @@ public class KafkaStreams implements AutoCloseable {
         } else {
             clientId = userClientId;
         }
-        logContext = new LogContext(String.format("stream-client [%s] ", 
clientId));
+        final LogContext logContext = new 
LogContext(String.format("stream-client [%s] ", clientId));
         this.log = logContext.logger(getClass());
         topologyMetadata.setLog(logContext);
 
@@ -1396,9 +1392,6 @@ public class KafkaStreams implements AutoCloseable {
      */
     public synchronized void start() throws IllegalStateException, 
StreamsException {
         if (setState(State.REBALANCING)) {
-            log.debug("Initializing STANDBY tasks for existing local state");
-            stateDirectory.initializeStartupTasks(topologyMetadata, 
streamsMetrics, logContext);
-
             log.debug("Starting Streams client");
 
             if (globalStreamThread != null) {
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
index 3506845d288..e708c317677 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -166,11 +166,11 @@ public class ProcessorStateManager implements 
StateManager {
 
     private static final String STATE_CHANGELOG_TOPIC_SUFFIX = "-changelog";
 
-    private String logPrefix;
+    private final String logPrefix;
 
     private final TaskId taskId;
     private final boolean eosEnabled;
-    private ChangelogRegister changelogReader;
+    private final ChangelogRegister changelogReader;
     private final Collection<TopicPartition> sourcePartitions;
     private final Map<String, String> storeToChangelogTopic;
 
@@ -222,39 +222,6 @@ public class ProcessorStateManager implements StateManager 
{
         log.debug("Created state store manager for task {}", taskId);
     }
 
-    /**
-     * Special constructor used by {@link StateDirectory} to partially 
initialize startup tasks for local state, before
-     * they're assigned to a thread. When the task is assigned to a thread, 
the initialization of this StateManager is
-     * completed in {@link #assignToStreamThread(LogContext, 
ChangelogRegister, Collection)}.
-     */
-    static ProcessorStateManager createStartupTaskStateManager(final TaskId 
taskId,
-                                                               final boolean 
eosEnabled,
-                                                               final 
LogContext logContext,
-                                                               final 
StateDirectory stateDirectory,
-                                                               final 
Map<String, String> storeToChangelogTopic,
-                                                               final 
Set<TopicPartition> sourcePartitions,
-                                                               final boolean 
stateUpdaterEnabled) {
-        return new ProcessorStateManager(taskId, TaskType.STANDBY, eosEnabled, 
logContext, stateDirectory, null, storeToChangelogTopic, sourcePartitions, 
stateUpdaterEnabled);
-    }
-
-    /**
-     * Standby tasks initialized for local state on-startup are only partially 
initialized, because they are not yet
-     * assigned to a StreamThread. Once assigned to a StreamThread, we 
complete their initialization here using the
-     * assigned StreamThread's context.
-     */
-    void assignToStreamThread(final LogContext logContext,
-                              final ChangelogRegister changelogReader,
-                              final Collection<TopicPartition> 
sourcePartitions) {
-        if (this.changelogReader != null) {
-            throw new IllegalStateException("Attempted to replace an existing 
changelogReader on a StateManager without closing it.");
-        }
-        this.sourcePartitions.clear();
-        this.log = logContext.logger(ProcessorStateManager.class);
-        this.logPrefix = logContext.logPrefix();
-        this.changelogReader = changelogReader;
-        this.sourcePartitions.addAll(sourcePartitions);
-    }
-
     void registerStateStores(final List<StateStore> allStores, final 
InternalProcessorContext<?, ?> processorContext) {
         processorContext.uninitialize();
         for (final StateStore store : allStores) {
@@ -347,7 +314,7 @@ public class ProcessorStateManager implements StateManager {
     }
 
     private void maybeRegisterStoreWithChangelogReader(final String storeName) 
{
-        if (isLoggingEnabled(storeName) && changelogReader != null) {
+        if (isLoggingEnabled(storeName)) {
             changelogReader.register(getStorePartition(storeName), this);
         }
     }
@@ -616,7 +583,7 @@ public class ProcessorStateManager implements StateManager {
     public void close() throws ProcessorStateException {
         log.debug("Closing its state manager and all the registered state 
stores: {}", stores);
 
-        if (!stateUpdaterEnabled && changelogReader != null) {
+        if (!stateUpdaterEnabled) {
             changelogReader.unregister(getAllChangelogTopicPartitions());
         }
 
@@ -664,7 +631,7 @@ public class ProcessorStateManager implements StateManager {
     void recycle() {
         log.debug("Recycling state for {} task {}.", taskType, taskId);
 
-        if (!stateUpdaterEnabled && changelogReader != null) {
+        if (!stateUpdaterEnabled) {
             final List<TopicPartition> allChangelogs = 
getAllChangelogTopicPartitions();
             changelogReader.unregister(allChangelogs);
         }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
index 04a62bad1bf..f892793168d 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
@@ -16,18 +16,12 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.ProcessorStateException;
 import org.apache.kafka.streams.errors.StreamsException;
-import org.apache.kafka.streams.errors.TaskCorruptedException;
-import org.apache.kafka.streams.internals.StreamsConfigUtils;
 import org.apache.kafka.streams.processor.TaskId;
-import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
-import org.apache.kafka.streams.state.internals.ThreadCache;
 
 import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
 import com.fasterxml.jackson.annotation.JsonProperty;
@@ -49,19 +43,13 @@ import java.nio.file.attribute.PosixFilePermission;
 import java.nio.file.attribute.PosixFilePermissions;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
-import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
 import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.Predicate;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 
@@ -112,9 +100,6 @@ public class StateDirectory implements AutoCloseable {
     private FileChannel stateDirLockChannel;
     private FileLock stateDirLock;
 
-    private final StreamsConfig config;
-    private final ConcurrentMap<TaskId, Task> tasksForLocalState = new 
ConcurrentHashMap<>();
-
     /**
      * Ensures that the state base directory as well as the application's 
sub-directory are created.
      *
@@ -133,7 +118,6 @@ public class StateDirectory implements AutoCloseable {
         this.hasPersistentStores = hasPersistentStores;
         this.hasNamedTopologies = hasNamedTopologies;
         this.appId = config.getString(StreamsConfig.APPLICATION_ID_CONFIG);
-        this.config = config;
         final String stateDirName = 
config.getString(StreamsConfig.STATE_DIR_CONFIG);
         final File baseDir = new File(stateDirName);
         stateDir = new File(baseDir, appId);
@@ -198,109 +182,6 @@ public class StateDirectory implements AutoCloseable {
         return stateDirLock != null;
     }
 
-    public void initializeStartupTasks(final TopologyMetadata topologyMetadata,
-                                       final StreamsMetricsImpl streamsMetrics,
-                                       final LogContext logContext) {
-        final List<TaskDirectory> nonEmptyTaskDirectories = 
listNonEmptyTaskDirectories();
-        if (hasPersistentStores && !nonEmptyTaskDirectories.isEmpty()) {
-            final ThreadCache dummyCache = new ThreadCache(logContext, 0, 
streamsMetrics);
-            final boolean eosEnabled = StreamsConfigUtils.eosEnabled(config);
-            final boolean stateUpdaterEnabled = 
StreamsConfig.InternalConfig.stateUpdaterEnabled(config.originals());
-
-            // discover all non-empty task directories in StateDirectory
-            for (final TaskDirectory taskDirectory : nonEmptyTaskDirectories) {
-                final String dirName = taskDirectory.file().getName();
-                final TaskId id = parseTaskDirectoryName(dirName, 
taskDirectory.namedTopology());
-                final ProcessorTopology subTopology = 
topologyMetadata.buildSubtopology(id);
-
-                // we still check if the task's sub-topology is stateful, even 
though we know its directory contains state,
-                // because it's possible that the topology has changed since 
that data was written, and is now stateless
-                // this therefore prevents us from creating unnecessary Tasks 
just because of some left-over state
-                if (subTopology.hasStateWithChangelogs()) {
-                    final Set<TopicPartition> inputPartitions = 
topologyMetadata.nodeToSourceTopics(id).values().stream()
-                            .flatMap(Collection::stream)
-                            .map(t -> new TopicPartition(t, id.partition()))
-                            .collect(Collectors.toSet());
-                    final ProcessorStateManager stateManager = 
ProcessorStateManager.createStartupTaskStateManager(
-                        id,
-                        eosEnabled,
-                        logContext,
-                        this,
-                        subTopology.storeToChangelogTopic(),
-                        inputPartitions,
-                        stateUpdaterEnabled
-                    );
-
-                    final InternalProcessorContext<Object, Object> context = 
new ProcessorContextImpl(
-                        id,
-                        config,
-                        stateManager,
-                        streamsMetrics,
-                        dummyCache
-                    );
-
-                    final Task task = new StandbyTask(
-                        id,
-                        inputPartitions,
-                        subTopology,
-                        topologyMetadata.taskConfig(id),
-                        streamsMetrics,
-                        stateManager,
-                        this,
-                        dummyCache,
-                        context
-                    );
-
-                    try {
-                        task.initializeIfNeeded();
-
-                        tasksForLocalState.put(id, task);
-                    } catch (final TaskCorruptedException e) {
-                        // Task is corrupt - wipe it out (under EOS) and don't 
initialize a Standby for it
-                        task.suspend();
-                        task.closeDirty();
-                    }
-                }
-            }
-        }
-    }
-
-    public boolean hasStartupTasks() {
-        return !tasksForLocalState.isEmpty();
-    }
-
-    public Task removeStartupTask(final TaskId taskId) {
-        final Task task = tasksForLocalState.remove(taskId);
-        if (task != null) {
-            lockedTasksToOwner.replace(taskId, Thread.currentThread());
-        }
-        return task;
-    }
-
-    public void closeStartupTasks() {
-        closeStartupTasks(t -> true);
-    }
-
-    private void closeStartupTasks(final Predicate<Task> predicate) {
-        if (!tasksForLocalState.isEmpty()) {
-            // "drain" Tasks first to ensure that we don't try to close Tasks 
that another thread is attempting to close
-            final Set<Task> drainedTasks = new 
HashSet<>(tasksForLocalState.size());
-            for (final Map.Entry<TaskId, Task> entry : 
tasksForLocalState.entrySet()) {
-                if (predicate.test(entry.getValue()) && 
tasksForLocalState.remove(entry.getKey()) != null) {
-                    // only add to our list of drained Tasks if we exclusively 
"claimed" a Task from tasksForLocalState
-                    // to ensure we don't accidentally try to drain the same 
Task multiple times from concurrent threads
-                    drainedTasks.add(entry.getValue());
-                }
-            }
-
-            // now that we have exclusive ownership of the drained tasks, 
close them
-            for (final Task task : drainedTasks) {
-                task.suspend();
-                task.closeClean();
-            }
-        }
-    }
-
     public UUID initializeProcessId() {
         if (!hasPersistentStores) {
             final UUID processId = UUID.randomUUID();
@@ -498,17 +379,9 @@ public class StateDirectory implements AutoCloseable {
         }
     }
 
-    /**
-     * Expose for tests.
-     */
-    Thread lockOwner(final TaskId taskId) {
-        return lockedTasksToOwner.get(taskId);
-    }
-
     @Override
     public void close() {
         if (hasPersistentStores) {
-            closeStartupTasks();
             try {
                 stateDirLock.release();
                 stateDirLockChannel.close();
@@ -626,7 +499,6 @@ public class StateDirectory implements AutoCloseable {
         );
         if (namedTopologyDirs != null) {
             for (final File namedTopologyDir : namedTopologyDirs) {
-                closeStartupTasks(task -> 
task.id().topologyName().equals(parseNamedTopologyFromDirectory(namedTopologyDir.getName())));
                 final File[] contents = namedTopologyDir.listFiles();
                 if (contents != null && contents.length == 0) {
                     try {
@@ -664,7 +536,6 @@ public class StateDirectory implements AutoCloseable {
             log.debug("Tried to clear out the local state for NamedTopology {} 
but none was found", topologyName);
         }
         try {
-            closeStartupTasks(task -> 
task.id().topologyName().equals(topologyName));
             Utils.delete(namedTopologyDir);
         } catch (final IOException e) {
             log.error("Hit an unexpected error while clearing local state for 
topology " + topologyName, e);
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index 037ff941105..e18cb2f94b4 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -324,31 +324,6 @@ public class TaskManager {
         }
     }
 
-    private Map<Task, Set<TopicPartition>> assignStartupTasks(final 
Map<TaskId, Set<TopicPartition>> tasksToAssign,
-                                                              final String 
threadLogPrefix,
-                                                              final 
TopologyMetadata topologyMetadata,
-                                                              final 
ChangelogRegister changelogReader) {
-        if (stateDirectory.hasStartupTasks()) {
-            final Map<Task, Set<TopicPartition>> assignedTasks = new 
HashMap<>(tasksToAssign.size());
-            for (final Map.Entry<TaskId, Set<TopicPartition>> entry : 
tasksToAssign.entrySet()) {
-                final TaskId taskId = entry.getKey();
-                final Task task = stateDirectory.removeStartupTask(taskId);
-                if (task != null) {
-                    // replace our dummy values with the real ones, now we 
know our thread and assignment
-                    final Set<TopicPartition> inputPartitions = 
entry.getValue();
-                    task.stateManager().assignToStreamThread(new 
LogContext(threadLogPrefix), changelogReader, inputPartitions);
-                    updateInputPartitionsOfStandbyTaskIfTheyChanged(task, 
inputPartitions);
-
-                    assignedTasks.put(task, inputPartitions);
-                }
-            }
-
-            return assignedTasks;
-        } else {
-            return Collections.emptyMap();
-        }
-    }
-
     /**
      * @throws TaskMigratedException if the task producer got fenced (EOS only)
      * @throws StreamsException fatal error while creating / initializing the 
task
@@ -478,15 +453,6 @@ public class TaskManager {
                                                 final Map<TaskId, 
Set<TopicPartition>> standbyTasksToCreate,
                                                 final Map<Task, 
Set<TopicPartition>> tasksToRecycle,
                                                 final Set<Task> 
tasksToCloseClean) {
-        final Map<Task, Set<TopicPartition>> startupStandbyTasksToRecycle = 
assignStartupTasks(activeTasksToCreate, logPrefix, topologyMetadata, 
changelogReader);
-        final Map<Task, Set<TopicPartition>> startupStandbyTasksToUse = 
assignStartupTasks(standbyTasksToCreate, logPrefix, topologyMetadata, 
changelogReader);
-
-        // recycle the startup standbys to active
-        tasks.addStandbyTasks(startupStandbyTasksToRecycle.keySet());
-
-        // use startup Standbys as real Standby tasks
-        tasks.addStandbyTasks(startupStandbyTasksToUse.keySet());
-
         for (final Task task : tasks.allTasks()) {
             final TaskId taskId = task.id();
             if (activeTasksToCreate.containsKey(taskId)) {
@@ -541,7 +507,6 @@ public class TaskManager {
                                              final Set<Task> tasksToCloseClean,
                                              final Map<TaskId, 
RuntimeException> failedTasks) {
         handleTasksPendingInitialization();
-        handleStartupTaskReuse(activeTasksToCreate, standbyTasksToCreate, 
failedTasks);
         handleRestoringAndUpdatingTasks(activeTasksToCreate, 
standbyTasksToCreate, failedTasks);
         handleRunningAndSuspendedTasks(activeTasksToCreate, 
standbyTasksToCreate, tasksToRecycle, tasksToCloseClean);
     }
@@ -553,34 +518,6 @@ public class TaskManager {
         }
     }
 
-    private void handleStartupTaskReuse(final Map<TaskId, Set<TopicPartition>> 
activeTasksToCreate,
-                                        final Map<TaskId, Set<TopicPartition>> 
standbyTasksToCreate,
-                                        final Map<TaskId, RuntimeException> 
failedTasks) {
-        final Map<Task, Set<TopicPartition>> startupStandbyTasksToRecycle = 
assignStartupTasks(activeTasksToCreate, logPrefix, topologyMetadata, 
changelogReader);
-        final Map<Task, Set<TopicPartition>> startupStandbyTasksToUse = 
assignStartupTasks(standbyTasksToCreate, logPrefix, topologyMetadata, 
changelogReader);
-
-        // recycle the startup standbys to active, and remove them from the 
set of actives that need to be created
-        if (!startupStandbyTasksToRecycle.isEmpty()) {
-            final Set<Task> tasksToCloseDirty = new HashSet<>();
-            for (final Map.Entry<Task, Set<TopicPartition>> entry : 
startupStandbyTasksToRecycle.entrySet()) {
-                final Task task = entry.getKey();
-                recycleTaskFromStateUpdater(task, entry.getValue(), 
tasksToCloseDirty, failedTasks);
-                activeTasksToCreate.remove(task.id());
-            }
-
-            // if any standby tasks failed to recycle, close them dirty
-            tasksToCloseDirty.forEach(task ->
-                closeTaskDirty(task, false)
-            );
-        }
-
-        // use startup Standbys as real Standby tasks
-        if (!startupStandbyTasksToUse.isEmpty()) {
-            tasks.addPendingTasksToInit(startupStandbyTasksToUse.keySet());
-            startupStandbyTasksToUse.keySet().forEach(task -> 
standbyTasksToCreate.remove(task.id()));
-        }
-    }
-
     private void handleRunningAndSuspendedTasks(final Map<TaskId, 
Set<TopicPartition>> activeTasksToCreate,
                                                 final Map<TaskId, 
Set<TopicPartition>> standbyTasksToCreate,
                                                 final Map<Task, 
Set<TopicPartition>> tasksToRecycle,
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java 
b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
index ab35530abd1..26ac3a862c2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
@@ -363,44 +363,6 @@ public class KafkaStreamsTest {
         }
     }
 
-    @Test
-    public void shouldInitializeTasksForLocalStateOnStart() {
-        prepareStreams();
-        prepareStreamThread(streamThreadOne, 1);
-        prepareStreamThread(streamThreadTwo, 2);
-
-        try (final MockedConstruction<StateDirectory> constructed = 
mockConstruction(StateDirectory.class,
-                (mock, context) -> 
when(mock.initializeProcessId()).thenReturn(UUID.randomUUID()))) {
-            try (final KafkaStreams streams = new 
KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
-                assertEquals(1, constructed.constructed().size());
-                final StateDirectory stateDirectory = 
constructed.constructed().get(0);
-                verify(stateDirectory, times(0)).initializeStartupTasks(any(), 
any(), any());
-                streams.start();
-                verify(stateDirectory, times(1)).initializeStartupTasks(any(), 
any(), any());
-            }
-        }
-    }
-
-    @Test
-    public void shouldCloseStartupTasksAfterFirstRebalance() throws Exception {
-        prepareStreams();
-        final AtomicReference<StreamThread.State> state1 = 
prepareStreamThread(streamThreadOne, 1);
-        final AtomicReference<StreamThread.State> state2 = 
prepareStreamThread(streamThreadTwo, 2);
-        prepareThreadState(streamThreadOne, state1);
-        prepareThreadState(streamThreadTwo, state2);
-        try (final MockedConstruction<StateDirectory> constructed = 
mockConstruction(StateDirectory.class,
-            (mock, context) -> 
when(mock.initializeProcessId()).thenReturn(UUID.randomUUID()))) {
-            try (final KafkaStreams streams = new 
KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
-                assertEquals(1, constructed.constructed().size());
-                final StateDirectory stateDirectory = 
constructed.constructed().get(0);
-                streams.setStateListener(streamsStateListener);
-                streams.start();
-                waitForCondition(() -> streams.state() == State.RUNNING, 
"Streams never started.");
-                verify(stateDirectory, times(1)).closeStartupTasks();
-            }
-        }
-    }
-
     @Test
     public void stateShouldTransitToRunningIfNonDeadThreadsBackToRunning() 
throws Exception {
         prepareStreams();
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
index 8e7ae80af35..3f806a364d0 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
@@ -17,20 +17,14 @@
 package org.apache.kafka.streams.processor.internals;
 
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.utils.LogCaptureAppender;
-import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.TopologyConfig;
 import org.apache.kafka.streams.errors.ProcessorStateException;
-import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
 import 
org.apache.kafka.streams.processor.internals.StateDirectory.TaskDirectory;
-import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
-import org.apache.kafka.test.MockKeyValueStore;
 import org.apache.kafka.test.TestUtils;
 
 import com.fasterxml.jackson.annotation.JsonProperty;
@@ -39,10 +33,6 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
-import org.mockito.ArgumentMatchers;
-import org.mockito.Mockito;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.io.BufferedWriter;
 import java.io.File;
@@ -81,7 +71,6 @@ import static org.hamcrest.CoreMatchers.containsString;
 import static org.hamcrest.CoreMatchers.endsWith;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.hasItem;
-import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.CoreMatchers.not;
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -95,11 +84,9 @@ import static org.junit.jupiter.api.Assertions.fail;
 
 public class StateDirectoryTest {
 
-    private static final Logger log = 
LoggerFactory.getLogger(StateDirectoryTest.class);
     private final MockTime time = new MockTime();
     private File stateDir;
     private final String applicationId = "applicationId";
-    private StreamsConfig config;
     private StateDirectory directory;
     private File appDir;
 
@@ -108,14 +95,15 @@ public class StateDirectoryTest {
         if (!createStateDirectory) {
             cleanup();
         }
-        config = new StreamsConfig(new Properties() {
-            {
-                put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
-                put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
-                put(StreamsConfig.STATE_DIR_CONFIG, stateDir.getPath());
-            }
-        });
-        directory = new StateDirectory(config, time, createStateDirectory, 
hasNamedTopology);
+        directory = new StateDirectory(
+            new StreamsConfig(new Properties() {
+                {
+                    put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
+                    put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
+                    put(StreamsConfig.STATE_DIR_CONFIG, stateDir.getPath());
+                }
+            }),
+            time, createStateDirectory, hasNamedTopology);
         appDir = new File(stateDir, applicationId);
     }
 
@@ -824,144 +812,6 @@ public class StateDirectoryTest {
         assertThat(directory.initializeProcessId(), equalTo(processId));
     }
 
-    @Test
-    public void shouldNotInitializeStandbyTasksWhenNoLocalState() {
-        final TaskId taskId = new TaskId(0, 0);
-        initializeStartupTasks(new TaskId(0, 0), false);
-        assertFalse(directory.hasStartupTasks());
-        assertNull(directory.removeStartupTask(taskId));
-        assertFalse(directory.hasStartupTasks());
-    }
-
-    @Test
-    public void shouldInitializeStandbyTasksForLocalState() {
-        final TaskId taskId = new TaskId(0, 0);
-        initializeStartupTasks(new TaskId(0, 0), true);
-        assertTrue(directory.hasStartupTasks());
-        assertNotNull(directory.removeStartupTask(taskId));
-        assertFalse(directory.hasStartupTasks());
-        assertNull(directory.removeStartupTask(taskId));
-    }
-
-    @Test
-    public void shouldNotAssignStartupTasksWeDontHave() {
-        final TaskId taskId = new TaskId(0, 0);
-        initializeStartupTasks(taskId, false);
-        final Task task = directory.removeStartupTask(taskId);
-        assertNull(task);
-    }
-
-    private class FakeStreamThread extends Thread {
-        private final TaskId taskId;
-        private final AtomicReference<Task> result;
-
-        private FakeStreamThread(final TaskId taskId, final 
AtomicReference<Task> result) {
-            this.taskId = taskId;
-            this.result = result;
-        }
-
-        @Override
-        public void run() {
-            result.set(directory.removeStartupTask(taskId));
-        }
-    }
-
-    @Test
-    public void shouldAssignStartupTaskToStreamThread() throws 
InterruptedException {
-        final TaskId taskId = new TaskId(0, 0);
-
-        initializeStartupTasks(taskId, true);
-
-        // main thread owns the newly initialized tasks
-        assertThat(directory.lockOwner(taskId), is(Thread.currentThread()));
-
-        // spawn off a "fake" StreamThread, so we can verify the lock was 
updated to the correct thread
-        final AtomicReference<Task> result = new AtomicReference<>();
-        final Thread streamThread = new FakeStreamThread(taskId, result);
-        streamThread.start();
-        streamThread.join();
-        final Task task = result.get();
-
-        assertNotNull(task);
-        assertThat(task, instanceOf(StandbyTask.class));
-
-        // verify the owner of the task directory lock has been shifted over 
to our assigned StreamThread
-        assertThat(directory.lockOwner(taskId), 
is(instanceOf(FakeStreamThread.class)));
-    }
-
-    @Test
-    public void shouldUnlockStartupTasksOnClose() {
-        final TaskId taskId = new TaskId(0, 0);
-        initializeStartupTasks(taskId, true);
-
-        assertEquals(Thread.currentThread(), directory.lockOwner(taskId));
-        directory.closeStartupTasks();
-        assertNull(directory.lockOwner(taskId));
-    }
-
-    @Test
-    public void shouldCloseStartupTasksOnDirectoryClose() {
-        final StateStore store = initializeStartupTasks(new TaskId(0, 0), 
true);
-
-        assertTrue(directory.hasStartupTasks());
-        assertTrue(store.isOpen());
-
-        directory.close();
-
-        assertFalse(directory.hasStartupTasks());
-        assertFalse(store.isOpen());
-    }
-
-    @Test
-    public void shouldNotCloseStartupTasksOnAutoCleanUp() {
-        // we need to set this because the auto-cleanup uses the last-modified 
time from the filesystem,
-        // which can't be mocked
-        time.setCurrentTimeMs(System.currentTimeMillis());
-
-        final StateStore store = initializeStartupTasks(new TaskId(0, 0), 
true);
-
-        assertTrue(directory.hasStartupTasks());
-        assertTrue(store.isOpen());
-
-        time.sleep(10000);
-
-        directory.cleanRemovedTasks(1000);
-
-        assertTrue(directory.hasStartupTasks());
-        assertTrue(store.isOpen());
-    }
-
-    private StateStore initializeStartupTasks(final TaskId taskId, final 
boolean createTaskDir) {
-        directory.initializeProcessId();
-        final TopologyMetadata metadata = Mockito.mock(TopologyMetadata.class);
-        final TopologyConfig topologyConfig = new TopologyConfig(config);
-
-        final StateStore store = new MockKeyValueStore("test", true);
-
-        if (createTaskDir) {
-            final File taskDir = directory.getOrCreateDirectoryForTask(taskId);
-            final File storeDir = new File(taskDir, store.name());
-            storeDir.mkdir();
-        }
-
-        final ProcessorTopology processorTopology = new ProcessorTopology(
-                Collections.emptyList(),
-                Collections.emptyMap(),
-                Collections.emptyMap(),
-                Collections.singletonList(store),
-                Collections.emptyList(),
-                Collections.singletonMap(store.name(), store.name() + 
"-changelog"),
-                Collections.emptySet(),
-                Collections.emptyMap()
-        );
-        Mockito.when(metadata.buildSubtopology(ArgumentMatchers.any())).thenReturn(processorTopology);
-        Mockito.when(metadata.taskConfig(ArgumentMatchers.any())).thenReturn(topologyConfig.getTaskConfig());
-
-        directory.initializeStartupTasks(metadata, new StreamsMetricsImpl(new Metrics(), "test", "processId", time), new LogContext("test"));
-
-        return store;
-    }
-
     private static class FutureStateDirectoryProcessFile {
 
         @JsonProperty
@@ -1007,4 +857,4 @@ public class StateDirectoryTest {
             }
         }
     }
-}
\ No newline at end of file
+}
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index e48d9275b3a..4ca0e3e8e61 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -1150,7 +1150,6 @@ public class StreamThreadTest {
             new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(), mockTime);
         final TopologyMetadata topologyMetadata = new TopologyMetadata(internalTopologyBuilder, config);
         topologyMetadata.buildAndRewriteTopology();
-        stateDirectory = new StateDirectory(config, mockTime, true, false);
 
         final TaskManager taskManager = new TaskManager(
             new MockTime(),
@@ -1162,7 +1161,7 @@ public class StreamThreadTest {
             new Tasks(new LogContext()),
             topologyMetadata,
             null,
-            stateDirectory,
+            null,
             stateUpdater,
             schedulingTaskManager
         ) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
index 6d812e0119e..392e6ce5804 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
@@ -4652,126 +4652,6 @@ public class TaskManagerTest {
         assertEquals(taskManager.notPausedTasks().size(), 0);
     }
 
-    @Test
-    public void shouldRecycleStartupTasksFromStateDirectoryAsActive() {
-        final StandbyTask startupTask = standbyTask(taskId00, 
taskId00ChangelogPartitions).build();
-        final StreamTask activeTask = statefulTask(taskId00, 
taskId00ChangelogPartitions).build();
-        when(activeTaskCreator.createActiveTaskFromStandby(eq(startupTask), 
eq(taskId00Partitions), any()))
-            .thenReturn(activeTask);
-
-        when(stateDirectory.hasStartupTasks()).thenReturn(true, false);
-        
when(stateDirectory.removeStartupTask(taskId00)).thenReturn(startupTask, (Task) 
null);
-
-        taskManager.handleAssignment(taskId00Assignment, 
Collections.emptyMap());
-
-        // ensure we recycled our existing startup Standby into an Active task
-        verify(activeTaskCreator).createActiveTaskFromStandby(eq(startupTask), 
eq(taskId00Partitions), any());
-
-        // ensure we didn't construct any new Tasks
-        verify(activeTaskCreator).createTasks(any(), 
eq(Collections.emptyMap()));
-        verify(standbyTaskCreator).createTasks(Collections.emptyMap());
-        verifyNoMoreInteractions(activeTaskCreator);
-        verifyNoMoreInteractions(standbyTaskCreator);
-
-        // verify the recycled task is now being used as an assiged Active
-        assertEquals(Collections.singletonMap(taskId00, activeTask), 
taskManager.activeTaskMap());
-        assertEquals(Collections.emptyMap(), taskManager.standbyTaskMap());
-    }
-
-    @Test
-    public void shouldUseStartupTasksFromStateDirectoryAsStandby() {
-        final StandbyTask startupTask = standbyTask(taskId00, 
taskId00ChangelogPartitions).build();
-
-        when(stateDirectory.hasStartupTasks()).thenReturn(true, true, false);
-        
when(stateDirectory.removeStartupTask(taskId00)).thenReturn(startupTask, (Task) 
null);
-
-        taskManager.handleAssignment(Collections.emptyMap(), 
taskId00Assignment);
-
-        // ensure we used our existing startup Task directly as a Standby
-        verify(startupTask).resume();
-
-        // ensure we didn't construct any new Tasks, or recycle an existing 
Task; we only used the one we already have
-        verify(activeTaskCreator).createTasks(any(), 
eq(Collections.emptyMap()));
-        verify(standbyTaskCreator).createTasks(Collections.emptyMap());
-        verifyNoMoreInteractions(activeTaskCreator);
-        verifyNoMoreInteractions(standbyTaskCreator);
-
-        // verify the startup Standby is now being used as an assigned Standby
-        assertEquals(Collections.emptyMap(), taskManager.activeTaskMap());
-        assertEquals(Collections.singletonMap(taskId00, startupTask), 
taskManager.standbyTaskMap());
-    }
-
-    @Test
-    public void 
shouldRecycleStartupTasksFromStateDirectoryAsActiveWithStateUpdater() {
-        final Tasks taskRegistry = new Tasks(new LogContext());
-        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, taskRegistry, true);
-        final StandbyTask startupTask = standbyTask(taskId00, 
taskId00ChangelogPartitions).build();
-
-        final StreamTask activeTask = statefulTask(taskId00, 
taskId00ChangelogPartitions).build();
-        when(activeTaskCreator.createActiveTaskFromStandby(eq(startupTask), 
eq(taskId00Partitions), any()))
-                .thenReturn(activeTask);
-
-        when(stateDirectory.hasStartupTasks()).thenReturn(true, false);
-        
when(stateDirectory.removeStartupTask(taskId00)).thenReturn(startupTask, (Task) 
null);
-
-        taskManager.handleAssignment(taskId00Assignment, 
Collections.emptyMap());
-
-        // ensure we used our existing startup Task directly as a Standby
-        assertTrue(taskRegistry.hasPendingTasksToInit());
-        assertEquals(Collections.singleton(activeTask), 
taskRegistry.drainPendingTasksToInit());
-
-        // we're using a mock StateUpdater here, so now that we've drained the 
task from the queue of startup tasks to init
-        // let's "add" it to our mock StateUpdater
-        
when(stateUpdater.tasks()).thenReturn(Collections.singleton(activeTask));
-        when(stateUpdater.standbyTasks()).thenReturn(Collections.emptySet());
-
-        // ensure we recycled our existing startup Standby into an Active task
-        verify(activeTaskCreator).createActiveTaskFromStandby(eq(startupTask), 
eq(taskId00Partitions), any());
-
-        // ensure we didn't construct any new Tasks
-        verify(activeTaskCreator).createTasks(any(), 
eq(Collections.emptyMap()));
-        verify(standbyTaskCreator).createTasks(Collections.emptyMap());
-        verifyNoMoreInteractions(activeTaskCreator);
-        verifyNoMoreInteractions(standbyTaskCreator);
-
-        // verify the recycled task is now being used as an assiged Active
-        assertEquals(Collections.singletonMap(taskId00, activeTask), 
taskManager.activeTaskMap());
-        assertEquals(Collections.emptyMap(), taskManager.standbyTaskMap());
-    }
-
-    @Test
-    public void 
shouldUseStartupTasksFromStateDirectoryAsStandbyWithStateUpdater() {
-        final Tasks taskRegistry = new Tasks(new LogContext());
-        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, taskRegistry, true);
-        final StandbyTask startupTask = standbyTask(taskId00, 
taskId00ChangelogPartitions).build();
-
-        when(stateDirectory.hasStartupTasks()).thenReturn(true, true, false);
-        
when(stateDirectory.removeStartupTask(taskId00)).thenReturn(startupTask, (Task) 
null);
-
-        assertFalse(taskRegistry.hasPendingTasksToInit());
-
-        taskManager.handleAssignment(Collections.emptyMap(), 
taskId00Assignment);
-
-        // ensure we used our existing startup Task directly as a Standby
-        assertTrue(taskRegistry.hasPendingTasksToInit());
-        assertEquals(Collections.singleton(startupTask), 
taskRegistry.drainPendingTasksToInit());
-
-        // we're using a mock StateUpdater here, so now that we've drained the 
task from the queue of startup tasks to init
-        // let's "add" it to our mock StateUpdater
-        
when(stateUpdater.tasks()).thenReturn(Collections.singleton(startupTask));
-        
when(stateUpdater.standbyTasks()).thenReturn(Collections.singleton(startupTask));
-
-        // ensure we didn't construct any new Tasks, or recycle an existing 
Task; we only used the one we already have
-        verify(activeTaskCreator).createTasks(any(), 
eq(Collections.emptyMap()));
-        verify(standbyTaskCreator).createTasks(Collections.emptyMap());
-        verifyNoMoreInteractions(activeTaskCreator);
-        verifyNoMoreInteractions(standbyTaskCreator);
-
-        // verify the startup Standby is now being used as an assigned Standby
-        assertEquals(Collections.emptyMap(), taskManager.activeTaskMap());
-        assertEquals(Collections.singletonMap(taskId00, startupTask), 
taskManager.standbyTaskMap());
-    }
-
     private static KafkaFutureImpl<DeletedRecords> completedFuture() {
         final KafkaFutureImpl<DeletedRecords> futureDeletedRecords = new KafkaFutureImpl<>();
         futureDeletedRecords.complete(null);


Reply via email to