This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 4.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.0 by this push:
     new 0186534a992 Revert "KAFKA-17411: Create local state Standbys on start (#16922)" and "KAFKA-17978: Fix invalid topology on Task assignment (#17778)"
0186534a992 is described below

commit 0186534a992a123a7f53dd32860c6ba5787dbb18
Author: Matthias J. Sax <matth...@confluent.io>
AuthorDate: Mon Jan 13 10:51:03 2025 -0800

    Revert "KAFKA-17411: Create local state Standbys on start (#16922)" and "KAFKA-17978: Fix invalid topology on Task assignment (#17778)"

    This reverts commit 571f50817c0c3e81a8f767396e485bc23a0731ba.
    This reverts commit a696b4d6f4917ccd942a7ba21ef660f351e47b01.
---
 checkstyle/suppressions.xml                        |   2 +-
 .../org/apache/kafka/streams/KafkaStreams.java     |   9 +-
 .../processor/internals/ProcessorStateManager.java |  43 +----
 .../processor/internals/StateDirectory.java        | 129 ----------------
 .../streams/processor/internals/TaskManager.java   |  63 --------
 .../org/apache/kafka/streams/KafkaStreamsTest.java |  38 -----
 .../processor/internals/StateDirectoryTest.java    | 170 ++-------------------
 .../processor/internals/StreamThreadTest.java      |   3 +-
 .../processor/internals/TaskManagerTest.java       | 120 ---------------
 9 files changed, 18 insertions(+), 559 deletions(-)

diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 2d05fac1e70..460e14871c8 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -189,7 +189,7 @@
     <!-- Streams -->
     <suppress checks="ClassFanOutComplexity"
-              files="(KafkaStreams|KStreamImpl|KTableImpl|InternalTopologyBuilder|StreamsPartitionAssignor|StreamThread|IQv2StoreIntegrationTest|KStreamImplTest|RocksDBStore|StreamTask|TaskManager).java"/>
+              files="(KafkaStreams|KStreamImpl|KTableImpl|InternalTopologyBuilder|StreamsPartitionAssignor|StreamThread|IQv2StoreIntegrationTest|KStreamImplTest|RocksDBStore|StreamTask).java"/>

     <suppress checks="MethodLength"
               files="KTableImpl.java"/>
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 79e6af29be7..00e9ede261f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -184,7 +184,6 @@ public class KafkaStreams implements AutoCloseable {
     protected final TopologyMetadata topologyMetadata;
     private final QueryableStoreProvider queryableStoreProvider;
     private final DelegatingStandbyUpdateListener delegatingStandbyUpdateListener;
-    private final LogContext logContext;

     GlobalStreamThread globalStreamThread;
     protected StateDirectory stateDirectory = null;
@@ -636,9 +635,6 @@ public class KafkaStreams implements AutoCloseable {
                 return;
             }

-            // all (alive) threads have received their assignment, close any remaining startup tasks, they're not needed
-            stateDirectory.closeStartupTasks();
-
             setState(State.RUNNING);
         }

@@ -961,7 +957,7 @@ public class KafkaStreams implements AutoCloseable {
         } else {
            clientId = userClientId;
        }
-        logContext = new LogContext(String.format("stream-client [%s] ", clientId));
+        final LogContext logContext = new LogContext(String.format("stream-client [%s] ", clientId));
         this.log = logContext.logger(getClass());
         topologyMetadata.setLog(logContext);

@@ -1396,9 +1392,6 @@
      */
     public synchronized void start() throws IllegalStateException, StreamsException {
         if (setState(State.REBALANCING)) {
-            log.debug("Initializing STANDBY tasks for existing local state");
-            stateDirectory.initializeStartupTasks(topologyMetadata, streamsMetrics, logContext);
-
             log.debug("Starting Streams client");

             if (globalStreamThread != null) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
index 3506845d288..e708c317677 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -166,11 +166,11 @@ public class ProcessorStateManager implements StateManager {

     private static final String STATE_CHANGELOG_TOPIC_SUFFIX = "-changelog";

-    private String logPrefix;
+    private final String logPrefix;

     private final TaskId taskId;
     private final boolean eosEnabled;
-    private ChangelogRegister changelogReader;
+    private final ChangelogRegister changelogReader;
     private final Collection<TopicPartition> sourcePartitions;
     private final Map<String, String> storeToChangelogTopic;

@@ -222,39 +222,6 @@ public class ProcessorStateManager implements StateManager {
         log.debug("Created state store manager for task {}", taskId);
     }

-    /**
-     * Special constructor used by {@link StateDirectory} to partially initialize startup tasks for local state, before
-     * they're assigned to a thread. When the task is assigned to a thread, the initialization of this StateManager is
-     * completed in {@link #assignToStreamThread(LogContext, ChangelogRegister, Collection)}.
-     */
-    static ProcessorStateManager createStartupTaskStateManager(final TaskId taskId,
-                                                               final boolean eosEnabled,
-                                                               final LogContext logContext,
-                                                               final StateDirectory stateDirectory,
-                                                               final Map<String, String> storeToChangelogTopic,
-                                                               final Set<TopicPartition> sourcePartitions,
-                                                               final boolean stateUpdaterEnabled) {
-        return new ProcessorStateManager(taskId, TaskType.STANDBY, eosEnabled, logContext, stateDirectory, null, storeToChangelogTopic, sourcePartitions, stateUpdaterEnabled);
-    }
-
-    /**
-     * Standby tasks initialized for local state on-startup are only partially initialized, because they are not yet
-     * assigned to a StreamThread. Once assigned to a StreamThread, we complete their initialization here using the
-     * assigned StreamThread's context.
-     */
-    void assignToStreamThread(final LogContext logContext,
-                              final ChangelogRegister changelogReader,
-                              final Collection<TopicPartition> sourcePartitions) {
-        if (this.changelogReader != null) {
-            throw new IllegalStateException("Attempted to replace an existing changelogReader on a StateManager without closing it.");
-        }
-        this.sourcePartitions.clear();
-        this.log = logContext.logger(ProcessorStateManager.class);
-        this.logPrefix = logContext.logPrefix();
-        this.changelogReader = changelogReader;
-        this.sourcePartitions.addAll(sourcePartitions);
-    }
-
     void registerStateStores(final List<StateStore> allStores, final InternalProcessorContext<?, ?> processorContext) {
         processorContext.uninitialize();
         for (final StateStore store : allStores) {
@@ -347,7 +314,7 @@ public class ProcessorStateManager implements StateManager {
     }

     private void maybeRegisterStoreWithChangelogReader(final String storeName) {
-        if (isLoggingEnabled(storeName) && changelogReader != null) {
+        if (isLoggingEnabled(storeName)) {
             changelogReader.register(getStorePartition(storeName), this);
         }
     }
@@ -616,7 +583,7 @@ public class ProcessorStateManager implements StateManager {
     public void close() throws ProcessorStateException {
         log.debug("Closing its state manager and all the registered state stores: {}", stores);

-        if (!stateUpdaterEnabled && changelogReader != null) {
+        if (!stateUpdaterEnabled) {
             changelogReader.unregister(getAllChangelogTopicPartitions());
         }

@@ -664,7 +631,7 @@ public class ProcessorStateManager implements StateManager {
     void recycle() {
         log.debug("Recycling state for {} task {}.", taskType, taskId);

-        if (!stateUpdaterEnabled && changelogReader != null) {
+        if (!stateUpdaterEnabled) {
             final List<TopicPartition> allChangelogs = getAllChangelogTopicPartitions();
             changelogReader.unregister(allChangelogs);
         }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
index 04a62bad1bf..f892793168d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
@@ -16,18 +16,12 @@
  */
 package org.apache.kafka.streams.processor.internals;

-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.ProcessorStateException;
 import org.apache.kafka.streams.errors.StreamsException;
-import org.apache.kafka.streams.errors.TaskCorruptedException;
-import org.apache.kafka.streams.internals.StreamsConfigUtils;
 import org.apache.kafka.streams.processor.TaskId;
-import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
-import org.apache.kafka.streams.state.internals.ThreadCache;

 import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
 import com.fasterxml.jackson.annotation.JsonProperty;
@@ -49,19 +43,13 @@ import java.nio.file.attribute.PosixFilePermission;
 import java.nio.file.attribute.PosixFilePermissions;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
-import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
 import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.Predicate;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;

@@ -112,9 +100,6 @@ public class StateDirectory implements AutoCloseable {
     private FileChannel stateDirLockChannel;
     private FileLock stateDirLock;

-    private final StreamsConfig config;
-    private final ConcurrentMap<TaskId, Task> tasksForLocalState = new ConcurrentHashMap<>();
-
     /**
      * Ensures that the state base directory as well as the application's sub-directory are created.
      *
@@ -133,7 +118,6 @@
         this.hasPersistentStores = hasPersistentStores;
         this.hasNamedTopologies = hasNamedTopologies;
         this.appId = config.getString(StreamsConfig.APPLICATION_ID_CONFIG);
-        this.config = config;
         final String stateDirName = config.getString(StreamsConfig.STATE_DIR_CONFIG);
         final File baseDir = new File(stateDirName);
         stateDir = new File(baseDir, appId);
@@ -198,109 +182,6 @@
         return stateDirLock != null;
     }

-    public void initializeStartupTasks(final TopologyMetadata topologyMetadata,
-                                       final StreamsMetricsImpl streamsMetrics,
-                                       final LogContext logContext) {
-        final List<TaskDirectory> nonEmptyTaskDirectories = listNonEmptyTaskDirectories();
-        if (hasPersistentStores && !nonEmptyTaskDirectories.isEmpty()) {
-            final ThreadCache dummyCache = new ThreadCache(logContext, 0, streamsMetrics);
-            final boolean eosEnabled = StreamsConfigUtils.eosEnabled(config);
-            final boolean stateUpdaterEnabled = StreamsConfig.InternalConfig.stateUpdaterEnabled(config.originals());
-
-            // discover all non-empty task directories in StateDirectory
-            for (final TaskDirectory taskDirectory : nonEmptyTaskDirectories) {
-                final String dirName = taskDirectory.file().getName();
-                final TaskId id = parseTaskDirectoryName(dirName, taskDirectory.namedTopology());
-                final ProcessorTopology subTopology = topologyMetadata.buildSubtopology(id);
-
-                // we still check if the task's sub-topology is stateful, even though we know its directory contains state,
-                // because it's possible that the topology has changed since that data was written, and is now stateless
-                // this therefore prevents us from creating unnecessary Tasks just because of some left-over state
-                if (subTopology.hasStateWithChangelogs()) {
-                    final Set<TopicPartition> inputPartitions = topologyMetadata.nodeToSourceTopics(id).values().stream()
-                            .flatMap(Collection::stream)
-                            .map(t -> new TopicPartition(t, id.partition()))
-                            .collect(Collectors.toSet());
-                    final ProcessorStateManager stateManager = ProcessorStateManager.createStartupTaskStateManager(
-                            id,
-                            eosEnabled,
-                            logContext,
-                            this,
-                            subTopology.storeToChangelogTopic(),
-                            inputPartitions,
-                            stateUpdaterEnabled
-                    );
-
-                    final InternalProcessorContext<Object, Object> context = new ProcessorContextImpl(
-                            id,
-                            config,
-                            stateManager,
-                            streamsMetrics,
-                            dummyCache
-                    );
-
-                    final Task task = new StandbyTask(
-                            id,
-                            inputPartitions,
-                            subTopology,
-                            topologyMetadata.taskConfig(id),
-                            streamsMetrics,
-                            stateManager,
-                            this,
-                            dummyCache,
-                            context
-                    );
-
-                    try {
-                        task.initializeIfNeeded();
-
-                        tasksForLocalState.put(id, task);
-                    } catch (final TaskCorruptedException e) {
-                        // Task is corrupt - wipe it out (under EOS) and don't initialize a Standby for it
-                        task.suspend();
-                        task.closeDirty();
-                    }
-                }
-            }
-        }
-    }
-
-    public boolean hasStartupTasks() {
-        return !tasksForLocalState.isEmpty();
-    }
-
-    public Task removeStartupTask(final TaskId taskId) {
-        final Task task = tasksForLocalState.remove(taskId);
-        if (task != null) {
-            lockedTasksToOwner.replace(taskId, Thread.currentThread());
-        }
-        return task;
-    }
-
-    public void closeStartupTasks() {
-        closeStartupTasks(t -> true);
-    }
-
-    private void closeStartupTasks(final Predicate<Task> predicate) {
-        if (!tasksForLocalState.isEmpty()) {
-            // "drain" Tasks first to ensure that we don't try to close Tasks that another thread is attempting to close
-            final Set<Task> drainedTasks = new HashSet<>(tasksForLocalState.size());
-            for (final Map.Entry<TaskId, Task> entry : tasksForLocalState.entrySet()) {
-                if (predicate.test(entry.getValue()) && tasksForLocalState.remove(entry.getKey()) != null) {
-                    // only add to our list of drained Tasks if we exclusively "claimed" a Task from tasksForLocalState
-                    // to ensure we don't accidentally try to drain the same Task multiple times from concurrent threads
-                    drainedTasks.add(entry.getValue());
-                }
-            }
-
-            // now that we have exclusive ownership of the drained tasks, close them
-            for (final Task task : drainedTasks) {
-                task.suspend();
-                task.closeClean();
-            }
-        }
-    }
-
     public UUID initializeProcessId() {
         if (!hasPersistentStores) {
             final UUID processId = UUID.randomUUID();
@@ -498,17 +379,9 @@
         }
     }

-    /**
-     * Expose for tests.
-     */
-    Thread lockOwner(final TaskId taskId) {
-        return lockedTasksToOwner.get(taskId);
-    }
-
     @Override
     public void close() {
         if (hasPersistentStores) {
-            closeStartupTasks();
             try {
                 stateDirLock.release();
                 stateDirLockChannel.close();
@@ -626,7 +499,6 @@
         );
         if (namedTopologyDirs != null) {
             for (final File namedTopologyDir : namedTopologyDirs) {
-                closeStartupTasks(task -> task.id().topologyName().equals(parseNamedTopologyFromDirectory(namedTopologyDir.getName())));
                 final File[] contents = namedTopologyDir.listFiles();
                 if (contents != null && contents.length == 0) {
                     try {
@@ -664,7 +536,6 @@
             log.debug("Tried to clear out the local state for NamedTopology {} but none was found", topologyName);
         }
         try {
-            closeStartupTasks(task -> task.id().topologyName().equals(topologyName));
             Utils.delete(namedTopologyDir);
         } catch (final IOException e) {
             log.error("Hit an unexpected error while clearing local state for topology " + topologyName, e);
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index 037ff941105..e18cb2f94b4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -324,31 +324,6 @@ public class TaskManager {
         }
     }

-    private Map<Task, Set<TopicPartition>> assignStartupTasks(final Map<TaskId, Set<TopicPartition>> tasksToAssign,
-                                                              final String threadLogPrefix,
-                                                              final TopologyMetadata topologyMetadata,
-                                                              final ChangelogRegister changelogReader) {
-        if (stateDirectory.hasStartupTasks()) {
-            final Map<Task, Set<TopicPartition>> assignedTasks = new HashMap<>(tasksToAssign.size());
-            for (final Map.Entry<TaskId, Set<TopicPartition>> entry : tasksToAssign.entrySet()) {
-                final TaskId taskId = entry.getKey();
-                final Task task = stateDirectory.removeStartupTask(taskId);
-                if (task != null) {
-                    // replace our dummy values with the real ones, now we know our thread and assignment
-                    final Set<TopicPartition> inputPartitions = entry.getValue();
-                    task.stateManager().assignToStreamThread(new LogContext(threadLogPrefix), changelogReader, inputPartitions);
-                    updateInputPartitionsOfStandbyTaskIfTheyChanged(task, inputPartitions);
-
-                    assignedTasks.put(task, inputPartitions);
-                }
-            }
-
-            return assignedTasks;
-        } else {
-            return Collections.emptyMap();
-        }
-    }
-
     /**
      * @throws TaskMigratedException if the task producer got fenced (EOS only)
      * @throws StreamsException      fatal error while creating / initializing the task
@@ -478,15 +453,6 @@
                                    final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate,
                                    final Map<Task, Set<TopicPartition>> tasksToRecycle,
                                    final Set<Task> tasksToCloseClean) {
-        final Map<Task, Set<TopicPartition>> startupStandbyTasksToRecycle = assignStartupTasks(activeTasksToCreate, logPrefix, topologyMetadata, changelogReader);
-        final Map<Task, Set<TopicPartition>> startupStandbyTasksToUse = assignStartupTasks(standbyTasksToCreate, logPrefix, topologyMetadata, changelogReader);
-
-        // recycle the startup standbys to active
-        tasks.addStandbyTasks(startupStandbyTasksToRecycle.keySet());
-
-        // use startup Standbys as real Standby tasks
-        tasks.addStandbyTasks(startupStandbyTasksToUse.keySet());
-
         for (final Task task : tasks.allTasks()) {
             final TaskId taskId = task.id();
             if (activeTasksToCreate.containsKey(taskId)) {
@@ -541,7 +507,6 @@
                                  final Set<Task> tasksToCloseClean,
                                  final Map<TaskId, RuntimeException> failedTasks) {
         handleTasksPendingInitialization();
-        handleStartupTaskReuse(activeTasksToCreate, standbyTasksToCreate, failedTasks);
         handleRestoringAndUpdatingTasks(activeTasksToCreate, standbyTasksToCreate, failedTasks);
         handleRunningAndSuspendedTasks(activeTasksToCreate, standbyTasksToCreate, tasksToRecycle, tasksToCloseClean);
     }
@@ -553,34 +518,6 @@
         }
     }

-    private void handleStartupTaskReuse(final Map<TaskId, Set<TopicPartition>> activeTasksToCreate,
-                                        final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate,
-                                        final Map<TaskId, RuntimeException> failedTasks) {
-        final Map<Task, Set<TopicPartition>> startupStandbyTasksToRecycle = assignStartupTasks(activeTasksToCreate, logPrefix, topologyMetadata, changelogReader);
-        final Map<Task, Set<TopicPartition>> startupStandbyTasksToUse = assignStartupTasks(standbyTasksToCreate, logPrefix, topologyMetadata, changelogReader);
-
-        // recycle the startup standbys to active, and remove them from the set of actives that need to be created
-        if (!startupStandbyTasksToRecycle.isEmpty()) {
-            final Set<Task> tasksToCloseDirty = new HashSet<>();
-            for (final Map.Entry<Task, Set<TopicPartition>> entry : startupStandbyTasksToRecycle.entrySet()) {
-                final Task task = entry.getKey();
-                recycleTaskFromStateUpdater(task, entry.getValue(), tasksToCloseDirty, failedTasks);
-                activeTasksToCreate.remove(task.id());
-            }
-
-            // if any standby tasks failed to recycle, close them dirty
-            tasksToCloseDirty.forEach(task ->
-                closeTaskDirty(task, false)
-            );
-        }
-
-        // use startup Standbys as real Standby tasks
-        if (!startupStandbyTasksToUse.isEmpty()) {
-            tasks.addPendingTasksToInit(startupStandbyTasksToUse.keySet());
-            startupStandbyTasksToUse.keySet().forEach(task -> standbyTasksToCreate.remove(task.id()));
-        }
-    }
-
     private void handleRunningAndSuspendedTasks(final Map<TaskId, Set<TopicPartition>> activeTasksToCreate,
                                                 final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate,
                                                 final Map<Task, Set<TopicPartition>> tasksToRecycle,
diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
index ab35530abd1..26ac3a862c2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
@@ -363,44 +363,6 @@ public class KafkaStreamsTest {
         }
     }

-    @Test
-    public void shouldInitializeTasksForLocalStateOnStart() {
-        prepareStreams();
-        prepareStreamThread(streamThreadOne, 1);
-        prepareStreamThread(streamThreadTwo, 2);
-
-        try (final MockedConstruction<StateDirectory> constructed = mockConstruction(StateDirectory.class,
-                (mock, context) -> when(mock.initializeProcessId()).thenReturn(UUID.randomUUID()))) {
-            try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
-                assertEquals(1, constructed.constructed().size());
-                final StateDirectory stateDirectory = constructed.constructed().get(0);
-                verify(stateDirectory, times(0)).initializeStartupTasks(any(), any(), any());
-                streams.start();
-                verify(stateDirectory, times(1)).initializeStartupTasks(any(), any(), any());
-            }
-        }
-    }
-
-    @Test
-    public void shouldCloseStartupTasksAfterFirstRebalance() throws Exception {
-        prepareStreams();
-        final AtomicReference<StreamThread.State> state1 = prepareStreamThread(streamThreadOne, 1);
-        final AtomicReference<StreamThread.State> state2 = prepareStreamThread(streamThreadTwo, 2);
-        prepareThreadState(streamThreadOne, state1);
-        prepareThreadState(streamThreadTwo, state2);
-        try (final MockedConstruction<StateDirectory> constructed = mockConstruction(StateDirectory.class,
-                (mock, context) -> when(mock.initializeProcessId()).thenReturn(UUID.randomUUID()))) {
-            try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
-                assertEquals(1, constructed.constructed().size());
-                final StateDirectory stateDirectory = constructed.constructed().get(0);
-                streams.setStateListener(streamsStateListener);
-                streams.start();
-                waitForCondition(() -> streams.state() == State.RUNNING, "Streams never started.");
-                verify(stateDirectory, times(1)).closeStartupTasks();
-            }
-        }
-    }
-
     @Test
     public void stateShouldTransitToRunningIfNonDeadThreadsBackToRunning() throws Exception {
         prepareStreams();
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
index 8e7ae80af35..3f806a364d0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
@@ -17,20 +17,14 @@
 package org.apache.kafka.streams.processor.internals;

 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.utils.LogCaptureAppender;
-import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.TopologyConfig;
 import org.apache.kafka.streams.errors.ProcessorStateException;
-import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.internals.StateDirectory.TaskDirectory;
-import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
-import org.apache.kafka.test.MockKeyValueStore;
 import org.apache.kafka.test.TestUtils;

 import com.fasterxml.jackson.annotation.JsonProperty;
@@ -39,10 +33,6 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
-import org.mockito.ArgumentMatchers;
-import org.mockito.Mockito;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;

 import java.io.BufferedWriter;
 import java.io.File;
@@ -81,7 +71,6 @@ import static org.hamcrest.CoreMatchers.containsString;
 import static org.hamcrest.CoreMatchers.endsWith;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.hasItem;
-import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.CoreMatchers.not;
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -95,11 +84,9 @@ import static org.junit.jupiter.api.Assertions.fail;

 public class StateDirectoryTest {

-    private static final Logger log = LoggerFactory.getLogger(StateDirectoryTest.class);
     private final MockTime time = new MockTime();
     private File stateDir;
     private final String applicationId = "applicationId";
-    private StreamsConfig config;
     private StateDirectory directory;
     private File appDir;

@@ -108,14 +95,15 @@ public class StateDirectoryTest {
         if (!createStateDirectory) {
             cleanup();
         }
-        config = new StreamsConfig(new Properties() {
-            {
-                put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
-                put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
-                put(StreamsConfig.STATE_DIR_CONFIG, stateDir.getPath());
-            }
-        });
-        directory = new StateDirectory(config, time, createStateDirectory, hasNamedTopology);
+        directory = new StateDirectory(
+            new StreamsConfig(new Properties() {
+                {
+                    put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
+                    put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
+                    put(StreamsConfig.STATE_DIR_CONFIG, stateDir.getPath());
+                }
+            }),
+            time, createStateDirectory, hasNamedTopology);
         appDir = new File(stateDir, applicationId);
     }

@@ -824,144 +812,6 @@ public class StateDirectoryTest {
         assertThat(directory.initializeProcessId(), equalTo(processId));
     }

-    @Test
-    public void shouldNotInitializeStandbyTasksWhenNoLocalState() {
-        final TaskId taskId = new TaskId(0, 0);
-        initializeStartupTasks(new TaskId(0, 0), false);
-        assertFalse(directory.hasStartupTasks());
-        assertNull(directory.removeStartupTask(taskId));
-        assertFalse(directory.hasStartupTasks());
-    }
-
-    @Test
-    public void shouldInitializeStandbyTasksForLocalState() {
-        final TaskId taskId = new TaskId(0, 0);
-        initializeStartupTasks(new TaskId(0, 0), true);
-        assertTrue(directory.hasStartupTasks());
-        assertNotNull(directory.removeStartupTask(taskId));
-        assertFalse(directory.hasStartupTasks());
-        assertNull(directory.removeStartupTask(taskId));
-    }
-
-    @Test
-    public void shouldNotAssignStartupTasksWeDontHave() {
-        final TaskId taskId = new TaskId(0, 0);
-        initializeStartupTasks(taskId, false);
-        final Task task = directory.removeStartupTask(taskId);
-        assertNull(task);
-    }
-
-    private class FakeStreamThread extends Thread {
-        private final TaskId taskId;
-        private final AtomicReference<Task> result;
-
-        private FakeStreamThread(final TaskId taskId, final AtomicReference<Task> result) {
-            this.taskId = taskId;
-            this.result = result;
-        }
-
-        @Override
-        public void run() {
-            result.set(directory.removeStartupTask(taskId));
-        }
-    }
-
-    @Test
-    public void shouldAssignStartupTaskToStreamThread() throws InterruptedException {
-        final TaskId taskId = new TaskId(0, 0);
-
-        initializeStartupTasks(taskId, true);
-
-        // main thread owns the newly initialized tasks
-        assertThat(directory.lockOwner(taskId), is(Thread.currentThread()));
-
-        // spawn off a "fake" StreamThread, so we can verify the lock was updated to the correct thread
-        final AtomicReference<Task> result = new AtomicReference<>();
-        final Thread streamThread = new FakeStreamThread(taskId, result);
-        streamThread.start();
-        streamThread.join();
-        final Task task = result.get();
-
-        assertNotNull(task);
-        assertThat(task, instanceOf(StandbyTask.class));
-
-        // verify the owner of the task directory lock has been shifted over to our assigned StreamThread
-        assertThat(directory.lockOwner(taskId), is(instanceOf(FakeStreamThread.class)));
-    }
-
-    @Test
-    public void shouldUnlockStartupTasksOnClose() {
-        final TaskId taskId = new TaskId(0, 0);
-        initializeStartupTasks(taskId, true);
-
-        assertEquals(Thread.currentThread(), directory.lockOwner(taskId));
-        directory.closeStartupTasks();
-        assertNull(directory.lockOwner(taskId));
-    }
-
-    @Test
-    public void shouldCloseStartupTasksOnDirectoryClose() {
-        final StateStore store = initializeStartupTasks(new TaskId(0, 0), true);
-
-        assertTrue(directory.hasStartupTasks());
-        assertTrue(store.isOpen());
-
-        directory.close();
-
-        assertFalse(directory.hasStartupTasks());
-        assertFalse(store.isOpen());
-    }
-
-    @Test
-    public void shouldNotCloseStartupTasksOnAutoCleanUp() {
-        // we need to set this because the auto-cleanup uses the last-modified time from the filesystem,
-        // which can't be mocked
-        time.setCurrentTimeMs(System.currentTimeMillis());
-
-        final StateStore store = initializeStartupTasks(new TaskId(0, 0), true);
-
-        assertTrue(directory.hasStartupTasks());
-        assertTrue(store.isOpen());
-
-        time.sleep(10000);
-
-        directory.cleanRemovedTasks(1000);
-
-        assertTrue(directory.hasStartupTasks());
-        assertTrue(store.isOpen());
-    }
-
-    private StateStore initializeStartupTasks(final TaskId taskId, final boolean createTaskDir) {
-        directory.initializeProcessId();
-        final TopologyMetadata metadata = Mockito.mock(TopologyMetadata.class);
-        final TopologyConfig topologyConfig = new TopologyConfig(config);
-
-        final StateStore store = new MockKeyValueStore("test", true);
-
-        if (createTaskDir) {
-            final File taskDir = directory.getOrCreateDirectoryForTask(taskId);
-            final File storeDir = new File(taskDir, store.name());
-            storeDir.mkdir();
-        }
-
-        final ProcessorTopology processorTopology = new ProcessorTopology(
-            Collections.emptyList(),
-            Collections.emptyMap(),
-            Collections.emptyMap(),
-            Collections.singletonList(store),
-            Collections.emptyList(),
-            Collections.singletonMap(store.name(), store.name() + "-changelog"),
-            Collections.emptySet(),
-            Collections.emptyMap()
-        );
-        Mockito.when(metadata.buildSubtopology(ArgumentMatchers.any())).thenReturn(processorTopology);
-        Mockito.when(metadata.taskConfig(ArgumentMatchers.any())).thenReturn(topologyConfig.getTaskConfig());
-
-        directory.initializeStartupTasks(metadata, new StreamsMetricsImpl(new Metrics(), "test", "processId", time), new LogContext("test"));
-
-        return store;
-    }
-
     private static class FutureStateDirectoryProcessFile {

         @JsonProperty
@@ -1007,4 +857,4 @@ public class StateDirectoryTest {
             }
         }
     }
-}
\ No newline at end of file
+}
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index e48d9275b3a..4ca0e3e8e61 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -1150,7 +1150,6 @@ public class StreamThreadTest {
             new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(), mockTime);
         final TopologyMetadata topologyMetadata = new TopologyMetadata(internalTopologyBuilder, config);
         topologyMetadata.buildAndRewriteTopology();
-        stateDirectory = new StateDirectory(config, mockTime, true, false);

         final TaskManager taskManager = new TaskManager(
             new MockTime(),
@@ -1162,7 +1161,7 @@ public class StreamThreadTest {
             new Tasks(new LogContext()),
             topologyMetadata,
             null,
-            stateDirectory,
+            null,
             stateUpdater,
             schedulingTaskManager
         ) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
index 6d812e0119e..392e6ce5804 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
@@ -4652,126 +4652,6 @@ public class TaskManagerTest {
         assertEquals(taskManager.notPausedTasks().size(), 0);
     }

-    @Test
-    public void shouldRecycleStartupTasksFromStateDirectoryAsActive() {
-        final StandbyTask startupTask = standbyTask(taskId00, taskId00ChangelogPartitions).build();
-        final StreamTask activeTask = statefulTask(taskId00, taskId00ChangelogPartitions).build();
-        when(activeTaskCreator.createActiveTaskFromStandby(eq(startupTask), eq(taskId00Partitions), any()))
-                .thenReturn(activeTask);
-
-        when(stateDirectory.hasStartupTasks()).thenReturn(true, false);
-        when(stateDirectory.removeStartupTask(taskId00)).thenReturn(startupTask, (Task) null);
-
-        taskManager.handleAssignment(taskId00Assignment, Collections.emptyMap());
-
-        // ensure we recycled our existing startup Standby into an Active task
-        verify(activeTaskCreator).createActiveTaskFromStandby(eq(startupTask), eq(taskId00Partitions), any());
-
-        // ensure we didn't construct any new Tasks
-        verify(activeTaskCreator).createTasks(any(), eq(Collections.emptyMap()));
-        verify(standbyTaskCreator).createTasks(Collections.emptyMap());
-        verifyNoMoreInteractions(activeTaskCreator);
-        verifyNoMoreInteractions(standbyTaskCreator);
-
-        // verify the recycled task is now being used as an assiged Active
-        assertEquals(Collections.singletonMap(taskId00, activeTask), taskManager.activeTaskMap());
-        assertEquals(Collections.emptyMap(), taskManager.standbyTaskMap());
-    }
-
-    @Test
-    public void shouldUseStartupTasksFromStateDirectoryAsStandby() {
-        final StandbyTask startupTask = standbyTask(taskId00, taskId00ChangelogPartitions).build();
-
-        when(stateDirectory.hasStartupTasks()).thenReturn(true, true, false);
-        when(stateDirectory.removeStartupTask(taskId00)).thenReturn(startupTask, (Task) null);
-
-        taskManager.handleAssignment(Collections.emptyMap(), taskId00Assignment);
-
-        // ensure we used our existing startup Task directly as a Standby
-        verify(startupTask).resume();
-
-        // ensure we didn't construct any new Tasks, or recycle an existing Task; we only used the one we already have
-        verify(activeTaskCreator).createTasks(any(), eq(Collections.emptyMap()));
-        verify(standbyTaskCreator).createTasks(Collections.emptyMap());
-        verifyNoMoreInteractions(activeTaskCreator);
-        verifyNoMoreInteractions(standbyTaskCreator);
-
-        // verify the startup Standby is now being used as an assigned Standby
-        assertEquals(Collections.emptyMap(), taskManager.activeTaskMap());
-        assertEquals(Collections.singletonMap(taskId00, startupTask), taskManager.standbyTaskMap());
-    }
-
-    @Test
-    public void shouldRecycleStartupTasksFromStateDirectoryAsActiveWithStateUpdater() {
-        final Tasks taskRegistry = new Tasks(new LogContext());
-        final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, taskRegistry, true);
-        final StandbyTask startupTask = standbyTask(taskId00, taskId00ChangelogPartitions).build();
-
-        final StreamTask activeTask = statefulTask(taskId00, taskId00ChangelogPartitions).build();
-        when(activeTaskCreator.createActiveTaskFromStandby(eq(startupTask), eq(taskId00Partitions), any()))
-                .thenReturn(activeTask);
-
-        when(stateDirectory.hasStartupTasks()).thenReturn(true, false);
-        when(stateDirectory.removeStartupTask(taskId00)).thenReturn(startupTask, (Task) null);
-
-        taskManager.handleAssignment(taskId00Assignment, Collections.emptyMap());
-
-        // ensure we used our existing startup Task directly as a Standby
-        assertTrue(taskRegistry.hasPendingTasksToInit());
-        assertEquals(Collections.singleton(activeTask), taskRegistry.drainPendingTasksToInit());
-
-        // we're using a mock StateUpdater here, so now that we've drained the task from the queue of startup tasks to init
-        // let's "add" it to our mock StateUpdater
-        when(stateUpdater.tasks()).thenReturn(Collections.singleton(activeTask));
-        when(stateUpdater.standbyTasks()).thenReturn(Collections.emptySet());
-
-        // ensure we recycled our existing startup Standby into an Active task
-        verify(activeTaskCreator).createActiveTaskFromStandby(eq(startupTask), eq(taskId00Partitions), any());
-
-        // ensure we didn't construct any new Tasks
-        verify(activeTaskCreator).createTasks(any(), eq(Collections.emptyMap()));
-        verify(standbyTaskCreator).createTasks(Collections.emptyMap());
-        verifyNoMoreInteractions(activeTaskCreator);
-        verifyNoMoreInteractions(standbyTaskCreator);
-
-        // verify the recycled task is now being used as an assiged Active
-        assertEquals(Collections.singletonMap(taskId00, activeTask), taskManager.activeTaskMap());
-        assertEquals(Collections.emptyMap(), taskManager.standbyTaskMap());
-    }
-
-    @Test
-    public void shouldUseStartupTasksFromStateDirectoryAsStandbyWithStateUpdater() {
-        final Tasks taskRegistry = new Tasks(new LogContext());
-        final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, taskRegistry, true);
-        final StandbyTask startupTask = standbyTask(taskId00, taskId00ChangelogPartitions).build();
-
-        when(stateDirectory.hasStartupTasks()).thenReturn(true, true, false);
-        when(stateDirectory.removeStartupTask(taskId00)).thenReturn(startupTask, (Task) null);
-
-        assertFalse(taskRegistry.hasPendingTasksToInit());
-
-        taskManager.handleAssignment(Collections.emptyMap(), taskId00Assignment);
-
-        // ensure we used our existing startup Task directly as a Standby
-        assertTrue(taskRegistry.hasPendingTasksToInit());
-        assertEquals(Collections.singleton(startupTask), taskRegistry.drainPendingTasksToInit());
-
-        // we're using a mock StateUpdater here, so now that we've drained the task from the queue of startup tasks to init
-        // let's "add" it to our mock StateUpdater
-        when(stateUpdater.tasks()).thenReturn(Collections.singleton(startupTask));
-        when(stateUpdater.standbyTasks()).thenReturn(Collections.singleton(startupTask));
-
-        // ensure we didn't construct any new Tasks, or recycle an existing Task; we only used the one we already have
-        verify(activeTaskCreator).createTasks(any(), eq(Collections.emptyMap()));
-        verify(standbyTaskCreator).createTasks(Collections.emptyMap());
-        verifyNoMoreInteractions(activeTaskCreator);
-        verifyNoMoreInteractions(standbyTaskCreator);
-
-        // verify the startup Standby is now being used as an assigned Standby
-        assertEquals(Collections.emptyMap(), taskManager.activeTaskMap());
-        assertEquals(Collections.singletonMap(taskId00, startupTask), taskManager.standbyTaskMap());
-    }
-
     private static KafkaFutureImpl<DeletedRecords> completedFuture() {
         final KafkaFutureImpl<DeletedRecords> futureDeletedRecords = new KafkaFutureImpl<>();
         futureDeletedRecords.complete(null);