This is an automated email from the ASF dual-hosted git repository.
bbejeck pushed a commit to branch 4.2
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.2 by this push:
new ebd46cc5a1c MINOR: Revert "create local state Standbys on start
(#16922)" (#20981)
ebd46cc5a1c is described below
commit ebd46cc5a1c37bbf8406392c3511389ccd470c21
Author: Bill Bejeck <[email protected]>
AuthorDate: Mon Nov 24 18:54:49 2025 -0500
MINOR: Revert "create local state Standbys on start (#16922)" (#20981)
This reverts commit 571f5081
(https://github.com/apache/kafka/pull/16922), which introduced an incomplete feature.
PR https://github.com/apache/kafka/pull/16922 is part of
[KIP-1035](https://cwiki.apache.org/confluence/display/KAFKA/KIP-1035%3A+StateStore+managed+changelog+offsets).
The problem is that when a Kafka Streams instance starts with
pre-existing local state, the state stores are initialized on the main
thread. Part of this initialization registers the stateful metrics with
the JMX thread-id tag of main. This breaks the KIP-1076 implementation,
where we need to register metrics with thread-id tags of
xxxStreamThread-N. This is necessary because the StreamsMetrics
instance is a singleton shared by all StreamThread instances, so
we need to make sure we only add metrics for the current StreamThread;
otherwise, duplicate metrics are registered. This PR reverts the changes
until a fix is implemented, allowing the individual StreamThreads to
register the metrics.
Reviewers: Matthias Sax <[email protected]>
---
checkstyle/suppressions.xml | 2 +-
.../org/apache/kafka/streams/KafkaStreams.java | 9 +-
.../processor/internals/ProcessorStateManager.java | 43 +-----
.../processor/internals/StateDirectory.java | 126 +---------------
.../streams/processor/internals/TaskManager.java | 63 --------
.../org/apache/kafka/streams/KafkaStreamsTest.java | 38 -----
.../processor/internals/StateDirectoryTest.java | 161 +--------------------
.../processor/internals/StreamThreadTest.java | 3 +-
.../processor/internals/TaskManagerTest.java | 78 ----------
9 files changed, 16 insertions(+), 507 deletions(-)
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 40b50d2b5ad..b37981b1b01 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -195,7 +195,7 @@
<!-- Streams -->
<suppress checks="ClassFanOutComplexity"
-
files="(KafkaStreams|KStreamImpl|KTableImpl|InternalTopologyBuilder|StreamsPartitionAssignor|StreamThread|IQv2StoreIntegrationTest|KStreamImplTest|RocksDBStore|StreamTask|TaskManager).java"/>
+
files="(KafkaStreams|KStreamImpl|KTableImpl|InternalTopologyBuilder|StreamsPartitionAssignor|StreamThread|IQv2StoreIntegrationTest|KStreamImplTest|RocksDBStore|StreamTask).java"/>
<suppress checks="MethodLength"
files="KTableImpl.java"/>
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index e37634fe8dd..ae8131b4f5b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -183,7 +183,6 @@ public class KafkaStreams implements AutoCloseable {
protected final TopologyMetadata topologyMetadata;
private final QueryableStoreProvider queryableStoreProvider;
private final DelegatingStandbyUpdateListener
delegatingStandbyUpdateListener;
- private final LogContext logContext;
GlobalStreamThread globalStreamThread;
protected StateDirectory stateDirectory = null;
@@ -636,9 +635,6 @@ public class KafkaStreams implements AutoCloseable {
return;
}
- // all (alive) threads have received their assignment, close any
remaining startup tasks, they're not needed
- stateDirectory.closeStartupTasks();
-
setState(State.RUNNING);
}
@@ -961,7 +957,7 @@ public class KafkaStreams implements AutoCloseable {
} else {
clientId = userClientId;
}
- logContext = new LogContext(String.format("stream-client [%s] ",
clientId));
+ final LogContext logContext = new
LogContext(String.format("stream-client [%s] ", clientId));
this.log = logContext.logger(getClass());
topologyMetadata.setLog(logContext);
@@ -1374,9 +1370,6 @@ public class KafkaStreams implements AutoCloseable {
*/
public synchronized void start() throws IllegalStateException,
StreamsException {
if (setState(State.REBALANCING)) {
- log.debug("Initializing STANDBY tasks for existing local state");
- stateDirectory.initializeStartupTasks(topologyMetadata,
streamsMetrics, logContext);
-
log.debug("Starting Streams client");
if (globalStreamThread != null) {
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
index 3506845d288..e708c317677 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -166,11 +166,11 @@ public class ProcessorStateManager implements
StateManager {
private static final String STATE_CHANGELOG_TOPIC_SUFFIX = "-changelog";
- private String logPrefix;
+ private final String logPrefix;
private final TaskId taskId;
private final boolean eosEnabled;
- private ChangelogRegister changelogReader;
+ private final ChangelogRegister changelogReader;
private final Collection<TopicPartition> sourcePartitions;
private final Map<String, String> storeToChangelogTopic;
@@ -222,39 +222,6 @@ public class ProcessorStateManager implements StateManager
{
log.debug("Created state store manager for task {}", taskId);
}
- /**
- * Special constructor used by {@link StateDirectory} to partially
initialize startup tasks for local state, before
- * they're assigned to a thread. When the task is assigned to a thread,
the initialization of this StateManager is
- * completed in {@link #assignToStreamThread(LogContext,
ChangelogRegister, Collection)}.
- */
- static ProcessorStateManager createStartupTaskStateManager(final TaskId
taskId,
- final boolean
eosEnabled,
- final
LogContext logContext,
- final
StateDirectory stateDirectory,
- final
Map<String, String> storeToChangelogTopic,
- final
Set<TopicPartition> sourcePartitions,
- final boolean
stateUpdaterEnabled) {
- return new ProcessorStateManager(taskId, TaskType.STANDBY, eosEnabled,
logContext, stateDirectory, null, storeToChangelogTopic, sourcePartitions,
stateUpdaterEnabled);
- }
-
- /**
- * Standby tasks initialized for local state on-startup are only partially
initialized, because they are not yet
- * assigned to a StreamThread. Once assigned to a StreamThread, we
complete their initialization here using the
- * assigned StreamThread's context.
- */
- void assignToStreamThread(final LogContext logContext,
- final ChangelogRegister changelogReader,
- final Collection<TopicPartition>
sourcePartitions) {
- if (this.changelogReader != null) {
- throw new IllegalStateException("Attempted to replace an existing
changelogReader on a StateManager without closing it.");
- }
- this.sourcePartitions.clear();
- this.log = logContext.logger(ProcessorStateManager.class);
- this.logPrefix = logContext.logPrefix();
- this.changelogReader = changelogReader;
- this.sourcePartitions.addAll(sourcePartitions);
- }
-
void registerStateStores(final List<StateStore> allStores, final
InternalProcessorContext<?, ?> processorContext) {
processorContext.uninitialize();
for (final StateStore store : allStores) {
@@ -347,7 +314,7 @@ public class ProcessorStateManager implements StateManager {
}
private void maybeRegisterStoreWithChangelogReader(final String storeName)
{
- if (isLoggingEnabled(storeName) && changelogReader != null) {
+ if (isLoggingEnabled(storeName)) {
changelogReader.register(getStorePartition(storeName), this);
}
}
@@ -616,7 +583,7 @@ public class ProcessorStateManager implements StateManager {
public void close() throws ProcessorStateException {
log.debug("Closing its state manager and all the registered state
stores: {}", stores);
- if (!stateUpdaterEnabled && changelogReader != null) {
+ if (!stateUpdaterEnabled) {
changelogReader.unregister(getAllChangelogTopicPartitions());
}
@@ -664,7 +631,7 @@ public class ProcessorStateManager implements StateManager {
void recycle() {
log.debug("Recycling state for {} task {}.", taskType, taskId);
- if (!stateUpdaterEnabled && changelogReader != null) {
+ if (!stateUpdaterEnabled) {
final List<TopicPartition> allChangelogs =
getAllChangelogTopicPartitions();
changelogReader.unregister(allChangelogs);
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
index 8ea2d3ae65a..3504edc8e7a 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
@@ -16,18 +16,12 @@
*/
package org.apache.kafka.streams.processor.internals;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.errors.StreamsException;
-import org.apache.kafka.streams.errors.TaskCorruptedException;
-import org.apache.kafka.streams.internals.StreamsConfigUtils;
import org.apache.kafka.streams.processor.TaskId;
-import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
-import org.apache.kafka.streams.state.internals.ThreadCache;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
@@ -48,19 +42,15 @@ import java.nio.file.StandardOpenOption;
import java.nio.file.attribute.PosixFilePermission;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.Predicate;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
@@ -106,13 +96,12 @@ public class StateDirectory implements AutoCloseable {
private final boolean hasPersistentStores;
private final boolean hasNamedTopologies;
- private final ConcurrentMap<TaskId, Thread> lockedTasksToOwner = new
ConcurrentHashMap<>();
+ private final Map<TaskId, Thread> lockedTasksToOwner = new
ConcurrentHashMap<>();
private FileChannel stateDirLockChannel;
private FileLock stateDirLock;
private final StreamsConfig config;
- private final ConcurrentMap<TaskId, Task> tasksForLocalState = new
ConcurrentHashMap<>();
/**
* Ensures that the state base directory as well as the application's
sub-directory are created.
@@ -206,109 +195,6 @@ public class StateDirectory implements AutoCloseable {
return stateDirLock != null;
}
- public void initializeStartupTasks(final TopologyMetadata topologyMetadata,
- final StreamsMetricsImpl streamsMetrics,
- final LogContext logContext) {
- final List<TaskDirectory> nonEmptyTaskDirectories =
listNonEmptyTaskDirectories();
- if (hasPersistentStores && !nonEmptyTaskDirectories.isEmpty()) {
- final ThreadCache dummyCache = new ThreadCache(logContext, 0,
streamsMetrics);
- final boolean eosEnabled = StreamsConfigUtils.eosEnabled(config);
- final boolean stateUpdaterEnabled =
StreamsConfig.InternalConfig.stateUpdaterEnabled(config.originals());
-
- // discover all non-empty task directories in StateDirectory
- for (final TaskDirectory taskDirectory : nonEmptyTaskDirectories) {
- final String dirName = taskDirectory.file().getName();
- final TaskId id = parseTaskDirectoryName(dirName,
taskDirectory.namedTopology());
- final ProcessorTopology subTopology =
topologyMetadata.buildSubtopology(id);
-
- // we still check if the task's sub-topology is stateful, even
though we know its directory contains state,
- // because it's possible that the topology has changed since
that data was written, and is now stateless
- // this therefore prevents us from creating unnecessary Tasks
just because of some left-over state
- if (subTopology.hasStateWithChangelogs()) {
- final Set<TopicPartition> inputPartitions =
topologyMetadata.nodeToSourceTopics(id).values().stream()
- .flatMap(Collection::stream)
- .map(t -> new TopicPartition(t, id.partition()))
- .collect(Collectors.toSet());
- final ProcessorStateManager stateManager =
ProcessorStateManager.createStartupTaskStateManager(
- id,
- eosEnabled,
- logContext,
- this,
- subTopology.storeToChangelogTopic(),
- inputPartitions,
- stateUpdaterEnabled
- );
-
- final InternalProcessorContext<Object, Object> context =
new ProcessorContextImpl(
- id,
- config,
- stateManager,
- streamsMetrics,
- dummyCache
- );
-
- final Task task = new StandbyTask(
- id,
- inputPartitions,
- subTopology,
- topologyMetadata.taskConfig(id),
- streamsMetrics,
- stateManager,
- this,
- dummyCache,
- context
- );
-
- try {
- task.initializeIfNeeded();
-
- tasksForLocalState.put(id, task);
- } catch (final TaskCorruptedException e) {
- // Task is corrupt - wipe it out (under EOS) and don't
initialize a Standby for it
- task.suspend();
- task.closeDirty();
- }
- }
- }
- }
- }
-
- public boolean hasStartupTasks() {
- return !tasksForLocalState.isEmpty();
- }
-
- public Task removeStartupTask(final TaskId taskId) {
- final Task task = tasksForLocalState.remove(taskId);
- if (task != null) {
- lockedTasksToOwner.replace(taskId, Thread.currentThread());
- }
- return task;
- }
-
- public void closeStartupTasks() {
- closeStartupTasks(t -> true);
- }
-
- private void closeStartupTasks(final Predicate<Task> predicate) {
- if (!tasksForLocalState.isEmpty()) {
- // "drain" Tasks first to ensure that we don't try to close Tasks
that another thread is attempting to close
- final Set<Task> drainedTasks = new
HashSet<>(tasksForLocalState.size());
- for (final Map.Entry<TaskId, Task> entry :
tasksForLocalState.entrySet()) {
- if (predicate.test(entry.getValue()) &&
removeStartupTask(entry.getKey()) != null) {
- // only add to our list of drained Tasks if we exclusively
"claimed" a Task from tasksForLocalState
- // to ensure we don't accidentally try to drain the same
Task multiple times from concurrent threads
- drainedTasks.add(entry.getValue());
- }
- }
-
- // now that we have exclusive ownership of the drained tasks,
close them
- for (final Task task : drainedTasks) {
- task.suspend();
- task.closeClean();
- }
- }
- }
-
public UUID initializeProcessId() {
if (!hasPersistentStores) {
final UUID processId = UUID.randomUUID();
@@ -505,17 +391,9 @@ public class StateDirectory implements AutoCloseable {
}
}
- /**
- * Expose for tests.
- */
- Thread lockOwner(final TaskId taskId) {
- return lockedTasksToOwner.get(taskId);
- }
-
@Override
public void close() {
if (hasPersistentStores) {
- closeStartupTasks();
try {
stateDirLock.release();
stateDirLockChannel.close();
@@ -633,7 +511,6 @@ public class StateDirectory implements AutoCloseable {
);
if (namedTopologyDirs != null) {
for (final File namedTopologyDir : namedTopologyDirs) {
- closeStartupTasks(task ->
task.id().topologyName().equals(parseNamedTopologyFromDirectory(namedTopologyDir.getName())));
final File[] contents = namedTopologyDir.listFiles();
if (contents != null && contents.length == 0) {
try {
@@ -671,7 +548,6 @@ public class StateDirectory implements AutoCloseable {
log.debug("Tried to clear out the local state for NamedTopology {}
but none was found", topologyName);
}
try {
- closeStartupTasks(task ->
task.id().topologyName().equals(topologyName));
Utils.delete(namedTopologyDir);
} catch (final IOException e) {
log.error("Hit an unexpected error while clearing local state for
topology " + topologyName, e);
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index c1b1c06379e..064662ee01f 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -334,31 +334,6 @@ public class TaskManager {
}
}
- private Map<Task, Set<TopicPartition>> assignStartupTasks(final
Map<TaskId, Set<TopicPartition>> tasksToAssign,
- final String
threadLogPrefix,
- final
TopologyMetadata topologyMetadata,
- final
ChangelogRegister changelogReader) {
- if (stateDirectory.hasStartupTasks()) {
- final Map<Task, Set<TopicPartition>> assignedTasks = new
HashMap<>(tasksToAssign.size());
- for (final Map.Entry<TaskId, Set<TopicPartition>> entry :
tasksToAssign.entrySet()) {
- final TaskId taskId = entry.getKey();
- final Task task = stateDirectory.removeStartupTask(taskId);
- if (task != null) {
- // replace our dummy values with the real ones, now we
know our thread and assignment
- final Set<TopicPartition> inputPartitions =
entry.getValue();
- task.stateManager().assignToStreamThread(new
LogContext(threadLogPrefix), changelogReader, inputPartitions);
- updateInputPartitionsOfStandbyTaskIfTheyChanged(task,
inputPartitions);
-
- assignedTasks.put(task, inputPartitions);
- }
- }
-
- return assignedTasks;
- } else {
- return Collections.emptyMap();
- }
- }
-
/**
* @throws TaskMigratedException if the task producer got fenced (EOS only)
* @throws StreamsException fatal error while creating / initializing the
task
@@ -488,15 +463,6 @@ public class TaskManager {
final Map<TaskId,
Set<TopicPartition>> standbyTasksToCreate,
final Map<Task,
Set<TopicPartition>> tasksToRecycle,
final Set<Task>
tasksToCloseClean) {
- final Map<Task, Set<TopicPartition>> startupStandbyTasksToRecycle =
assignStartupTasks(activeTasksToCreate, logPrefix, topologyMetadata,
changelogReader);
- final Map<Task, Set<TopicPartition>> startupStandbyTasksToUse =
assignStartupTasks(standbyTasksToCreate, logPrefix, topologyMetadata,
changelogReader);
-
- // recycle the startup standbys to active
- tasks.addStandbyTasks(startupStandbyTasksToRecycle.keySet());
-
- // use startup Standbys as real Standby tasks
- tasks.addStandbyTasks(startupStandbyTasksToUse.keySet());
-
for (final Task task : tasks.allTasks()) {
final TaskId taskId = task.id();
if (activeTasksToCreate.containsKey(taskId)) {
@@ -551,7 +517,6 @@ public class TaskManager {
final Set<Task> tasksToCloseClean,
final Map<TaskId,
RuntimeException> failedTasks) {
handleTasksPendingInitialization();
- handleStartupTaskReuse(activeTasksToCreate, standbyTasksToCreate,
failedTasks);
handleRestoringAndUpdatingTasks(activeTasksToCreate,
standbyTasksToCreate, failedTasks);
handleRunningAndSuspendedTasks(activeTasksToCreate,
standbyTasksToCreate, tasksToRecycle, tasksToCloseClean);
}
@@ -569,34 +534,6 @@ public class TaskManager {
}
}
- private void handleStartupTaskReuse(final Map<TaskId, Set<TopicPartition>>
activeTasksToCreate,
- final Map<TaskId, Set<TopicPartition>>
standbyTasksToCreate,
- final Map<TaskId, RuntimeException>
failedTasks) {
- final Map<Task, Set<TopicPartition>> startupStandbyTasksToRecycle =
assignStartupTasks(activeTasksToCreate, logPrefix, topologyMetadata,
changelogReader);
- final Map<Task, Set<TopicPartition>> startupStandbyTasksToUse =
assignStartupTasks(standbyTasksToCreate, logPrefix, topologyMetadata,
changelogReader);
-
- // recycle the startup standbys to active, and remove them from the
set of actives that need to be created
- if (!startupStandbyTasksToRecycle.isEmpty()) {
- final Set<Task> tasksToCloseDirty = new HashSet<>();
- for (final Map.Entry<Task, Set<TopicPartition>> entry :
startupStandbyTasksToRecycle.entrySet()) {
- final Task task = entry.getKey();
- recycleTaskFromStateUpdater(task, entry.getValue(),
tasksToCloseDirty, failedTasks);
- activeTasksToCreate.remove(task.id());
- }
-
- // if any standby tasks failed to recycle, close them dirty
- tasksToCloseDirty.forEach(task ->
- closeTaskDirty(task, false)
- );
- }
-
- // use startup Standbys as real Standby tasks
- if (!startupStandbyTasksToUse.isEmpty()) {
- tasks.addPendingTasksToInit(startupStandbyTasksToUse.keySet());
- startupStandbyTasksToUse.keySet().forEach(task ->
standbyTasksToCreate.remove(task.id()));
- }
- }
-
private void handleRunningAndSuspendedTasks(final Map<TaskId,
Set<TopicPartition>> activeTasksToCreate,
final Map<TaskId,
Set<TopicPartition>> standbyTasksToCreate,
final Map<Task,
Set<TopicPartition>> tasksToRecycle,
diff --git
a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
index 14d4cf8c21f..847be900366 100644
--- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
@@ -405,44 +405,6 @@ public class KafkaStreamsTest {
}
}
- @Test
- public void shouldInitializeTasksForLocalStateOnStart() {
- prepareStreams();
- prepareStreamThread(streamThreadOne, 1);
- prepareStreamThread(streamThreadTwo, 2);
-
- try (final MockedConstruction<StateDirectory> constructed =
mockConstruction(StateDirectory.class,
- (mock, context) ->
when(mock.initializeProcessId()).thenReturn(UUID.randomUUID()))) {
- try (final KafkaStreams streams = new
KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
- assertEquals(1, constructed.constructed().size());
- final StateDirectory stateDirectory =
constructed.constructed().get(0);
- verify(stateDirectory, times(0)).initializeStartupTasks(any(),
any(), any());
- streams.start();
- verify(stateDirectory, times(1)).initializeStartupTasks(any(),
any(), any());
- }
- }
- }
-
- @Test
- public void shouldCloseStartupTasksAfterFirstRebalance() throws Exception {
- prepareStreams();
- final AtomicReference<StreamThread.State> state1 =
prepareStreamThread(streamThreadOne, 1);
- final AtomicReference<StreamThread.State> state2 =
prepareStreamThread(streamThreadTwo, 2);
- prepareThreadState(streamThreadOne, state1);
- prepareThreadState(streamThreadTwo, state2);
- try (final MockedConstruction<StateDirectory> constructed =
mockConstruction(StateDirectory.class,
- (mock, context) ->
when(mock.initializeProcessId()).thenReturn(UUID.randomUUID()))) {
- try (final KafkaStreams streams = new
KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
- assertEquals(1, constructed.constructed().size());
- final StateDirectory stateDirectory =
constructed.constructed().get(0);
- streams.setStateListener(streamsStateListener);
- streams.start();
- waitForCondition(() -> streams.state() == State.RUNNING,
"Streams never started.");
- verify(stateDirectory, times(1)).closeStartupTasks();
- }
- }
- }
-
@Test
public void stateShouldTransitToRunningIfNonDeadThreadsBackToRunning()
throws Exception {
prepareStreams();
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
index 616c397d711..dadde62e92d 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
@@ -17,20 +17,14 @@
package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.LogCaptureAppender;
-import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.TopologyConfig;
import org.apache.kafka.streams.errors.ProcessorStateException;
-import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId;
import
org.apache.kafka.streams.processor.internals.StateDirectory.TaskDirectory;
-import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
-import org.apache.kafka.test.MockKeyValueStore;
import org.apache.kafka.test.TestUtils;
import com.fasterxml.jackson.annotation.JsonProperty;
@@ -39,8 +33,6 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
-import org.mockito.ArgumentMatchers;
-import org.mockito.Mockito;
import java.io.BufferedWriter;
import java.io.File;
@@ -80,7 +72,6 @@ import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.CoreMatchers.endsWith;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.hasItem;
-import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.not;
import static org.hamcrest.MatcherAssert.assertThat;
@@ -97,7 +88,6 @@ public class StateDirectoryTest {
private final MockTime time = new MockTime();
private File stateDir;
private final String applicationId = "applicationId";
- private StreamsConfig config;
private StateDirectory directory;
private File appDir;
@@ -114,13 +104,14 @@ public class StateDirectoryTest {
if (!createStateDirectory) {
cleanup();
}
- config = new StreamsConfig(Map.of(
- StreamsConfig.APPLICATION_ID_CONFIG, applicationId,
- StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234",
- StreamsConfig.STATE_DIR_CONFIG, stateDir.getPath(),
+ directory = new StateDirectory(
+ new StreamsConfig(Map.of(
+ StreamsConfig.APPLICATION_ID_CONFIG, applicationId,
+ StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234",
+ StreamsConfig.STATE_DIR_CONFIG, stateDir.getPath(),
StreamsConfig.ALLOW_OS_GROUP_WRITE_ACCESS_CONFIG,
allowOsGroupWriteAccess
- ));
- directory = new StateDirectory(config, time, createStateDirectory,
hasNamedTopology);
+ )),
+ time, createStateDirectory, hasNamedTopology);
appDir = new File(stateDir, applicationId);
}
@@ -854,144 +845,6 @@ public class StateDirectoryTest {
assertThat(directory.initializeProcessId(), equalTo(processId));
}
- @Test
- public void shouldNotInitializeStandbyTasksWhenNoLocalState() {
- final TaskId taskId = new TaskId(0, 0);
- initializeStartupTasks(new TaskId(0, 0), false);
- assertFalse(directory.hasStartupTasks());
- assertNull(directory.removeStartupTask(taskId));
- assertFalse(directory.hasStartupTasks());
- }
-
- @Test
- public void shouldInitializeStandbyTasksForLocalState() {
- final TaskId taskId = new TaskId(0, 0);
- initializeStartupTasks(new TaskId(0, 0), true);
- assertTrue(directory.hasStartupTasks());
- assertNotNull(directory.removeStartupTask(taskId));
- assertFalse(directory.hasStartupTasks());
- assertNull(directory.removeStartupTask(taskId));
- }
-
- @Test
- public void shouldNotAssignStartupTasksWeDontHave() {
- final TaskId taskId = new TaskId(0, 0);
- initializeStartupTasks(taskId, false);
- final Task task = directory.removeStartupTask(taskId);
- assertNull(task);
- }
-
- private class FakeStreamThread extends Thread {
- private final TaskId taskId;
- private final AtomicReference<Task> result;
-
- private FakeStreamThread(final TaskId taskId, final
AtomicReference<Task> result) {
- this.taskId = taskId;
- this.result = result;
- }
-
- @Override
- public void run() {
- result.set(directory.removeStartupTask(taskId));
- }
- }
-
- @Test
- public void shouldAssignStartupTaskToStreamThread() throws
InterruptedException {
- final TaskId taskId = new TaskId(0, 0);
-
- initializeStartupTasks(taskId, true);
-
- // main thread owns the newly initialized tasks
- assertThat(directory.lockOwner(taskId), is(Thread.currentThread()));
-
- // spawn off a "fake" StreamThread, so we can verify the lock was
updated to the correct thread
- final AtomicReference<Task> result = new AtomicReference<>();
- final Thread streamThread = new FakeStreamThread(taskId, result);
- streamThread.start();
- streamThread.join();
- final Task task = result.get();
-
- assertNotNull(task);
- assertThat(task, instanceOf(StandbyTask.class));
-
- // verify the owner of the task directory lock has been shifted over
to our assigned StreamThread
- assertThat(directory.lockOwner(taskId),
is(instanceOf(FakeStreamThread.class)));
- }
-
- @Test
- public void shouldUnlockStartupTasksOnClose() {
- final TaskId taskId = new TaskId(0, 0);
- initializeStartupTasks(taskId, true);
-
- assertEquals(Thread.currentThread(), directory.lockOwner(taskId));
- directory.closeStartupTasks();
- assertNull(directory.lockOwner(taskId));
- }
-
- @Test
- public void shouldCloseStartupTasksOnDirectoryClose() {
- final StateStore store = initializeStartupTasks(new TaskId(0, 0),
true);
-
- assertTrue(directory.hasStartupTasks());
- assertTrue(store.isOpen());
-
- directory.close();
-
- assertFalse(directory.hasStartupTasks());
- assertFalse(store.isOpen());
- }
-
- @Test
- public void shouldNotCloseStartupTasksOnAutoCleanUp() {
- // we need to set this because the auto-cleanup uses the last-modified
time from the filesystem,
- // which can't be mocked
- time.setCurrentTimeMs(System.currentTimeMillis());
-
- final StateStore store = initializeStartupTasks(new TaskId(0, 0),
true);
-
- assertTrue(directory.hasStartupTasks());
- assertTrue(store.isOpen());
-
- time.sleep(10000);
-
- directory.cleanRemovedTasks(1000);
-
- assertTrue(directory.hasStartupTasks());
- assertTrue(store.isOpen());
- }
-
- private StateStore initializeStartupTasks(final TaskId taskId, final
boolean createTaskDir) {
- directory.initializeProcessId();
- final TopologyMetadata metadata = Mockito.mock(TopologyMetadata.class);
- final TopologyConfig topologyConfig = new TopologyConfig(config);
-
- final StateStore store = new MockKeyValueStore("test", true);
-
- if (createTaskDir) {
- final File taskDir = directory.getOrCreateDirectoryForTask(taskId);
- final File storeDir = new File(taskDir, store.name());
- storeDir.mkdir();
- }
-
- final ProcessorTopology processorTopology = new ProcessorTopology(
- Collections.emptyList(),
- Collections.emptyMap(),
- Collections.emptyMap(),
- Collections.singletonList(store),
- Collections.emptyList(),
- Collections.singletonMap(store.name(), store.name() +
"-changelog"),
- Collections.emptySet(),
- Collections.emptyMap()
- );
-
Mockito.when(metadata.buildSubtopology(ArgumentMatchers.any())).thenReturn(processorTopology);
-
Mockito.when(metadata.taskConfig(ArgumentMatchers.any())).thenReturn(topologyConfig.getTaskConfig());
-
- directory.initializeStartupTasks(metadata, new StreamsMetricsImpl(new
Metrics(), "test", time), new LogContext("test"));
-
- return store;
- }
-
private static class FutureStateDirectoryProcessFile {
@JsonProperty
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 63783b59aca..f687f1be2a0 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -1196,7 +1196,6 @@ public class StreamThreadTest {
new StreamsMetricsImpl(metrics, CLIENT_ID, mockTime);
final TopologyMetadata topologyMetadata = new
TopologyMetadata(internalTopologyBuilder, config);
topologyMetadata.buildAndRewriteTopology();
- stateDirectory = new StateDirectory(config, mockTime, true, false);
final TaskManager taskManager = new TaskManager(
new MockTime(),
@@ -1208,7 +1207,7 @@ public class StreamThreadTest {
new Tasks(new LogContext()),
topologyMetadata,
null,
- stateDirectory,
+ null,
stateUpdater,
schedulingTaskManager
) {
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
index 3e87eebe733..4f13c41ba31 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
@@ -4817,84 +4817,6 @@ public class TaskManagerTest {
assertEquals(0, taskManager.notPausedTasks().size());
}
- @Test
- public void shouldRecycleStartupTasksFromStateDirectoryAsActive() {
- final Tasks taskRegistry = new Tasks(new LogContext());
- final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, taskRegistry);
- final StandbyTask startupTask = standbyTask(taskId00,
taskId00ChangelogPartitions).build();
-
- final StreamTask activeTask = statefulTask(taskId00,
taskId00ChangelogPartitions).build();
- when(activeTaskCreator.createActiveTaskFromStandby(eq(startupTask),
eq(taskId00Partitions), any()))
- .thenReturn(activeTask);
-
- when(stateDirectory.hasStartupTasks()).thenReturn(true, false);
-
when(stateDirectory.removeStartupTask(taskId00)).thenReturn(startupTask, (Task)
null);
-
- taskManager.handleAssignment(taskId00Assignment,
Collections.emptyMap());
-
- // ensure we used our existing startup Task directly as a Standby
- assertTrue(taskRegistry.hasPendingTasksToInit());
- assertEquals(Collections.singleton(activeTask),
taskRegistry.drainPendingTasksToInit());
-
- // we're using a mock StateUpdater here, so now that we've drained the
task from the queue of startup tasks to init
- // let's "add" it to our mock StateUpdater
-
when(stateUpdater.tasks()).thenReturn(Collections.singleton(activeTask));
- when(stateUpdater.standbyTasks()).thenReturn(Collections.emptySet());
-
- // ensure we recycled our existing startup Standby into an Active task
- verify(activeTaskCreator).createActiveTaskFromStandby(eq(startupTask),
eq(taskId00Partitions), any());
-
- // ensure we didn't construct any new Tasks
- verify(activeTaskCreator).createTasks(any(),
eq(Collections.emptyMap()));
- verify(standbyTaskCreator).createTasks(Collections.emptyMap());
- verifyNoMoreInteractions(activeTaskCreator);
- verifyNoMoreInteractions(standbyTaskCreator);
-
- // verify the recycled task is now being used as an assigned Active
- assertEquals(Collections.singletonMap(taskId00, activeTask),
taskManager.activeTaskMap());
- assertEquals(Collections.emptyMap(), taskManager.standbyTaskMap());
- }
-
- @Test
- public void shouldUseStartupTasksFromStateDirectoryAsStandby() {
- final Tasks taskRegistry = new Tasks(new LogContext());
- final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, taskRegistry);
- final StandbyTask startupTask = standbyTask(taskId00,
taskId00ChangelogPartitions).build();
-
- when(stateDirectory.hasStartupTasks()).thenReturn(true, true, false);
-
when(stateDirectory.removeStartupTask(taskId00)).thenReturn(startupTask, (Task)
null);
-
- assertFalse(taskRegistry.hasPendingTasksToInit());
-
- taskManager.handleAssignment(Collections.emptyMap(),
taskId00Assignment);
-
- // ensure we used our existing startup Task directly as a Standby
- assertTrue(taskRegistry.hasPendingTasksToInit());
- assertEquals(Collections.singleton(startupTask),
taskRegistry.drainPendingTasksToInit());
-
- // we're using a mock StateUpdater here, so now that we've drained the
task from the queue of startup tasks to init
- // let's "add" it to our mock StateUpdater
-
when(stateUpdater.tasks()).thenReturn(Collections.singleton(startupTask));
-
when(stateUpdater.standbyTasks()).thenReturn(Collections.singleton(startupTask));
-
- // ensure we didn't construct any new Tasks, or recycle an existing
Task; we only used the one we already have
- verify(activeTaskCreator).createTasks(any(),
eq(Collections.emptyMap()));
- verify(standbyTaskCreator).createTasks(Collections.emptyMap());
- verifyNoMoreInteractions(activeTaskCreator);
- verifyNoMoreInteractions(standbyTaskCreator);
-
- // verify the startup Standby is now being used as an assigned Standby
- assertEquals(Collections.emptyMap(), taskManager.activeTaskMap());
- assertEquals(Collections.singletonMap(taskId00, startupTask),
taskManager.standbyTaskMap());
- }
-
- @Test
- public void shouldStartStateUpdaterOnInit() {
- final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, null);
- taskManager.init();
- verify(stateUpdater).start();
- }
-
private static KafkaFutureImpl<DeletedRecords> completedFuture() {
final KafkaFutureImpl<DeletedRecords> futureDeletedRecords = new
KafkaFutureImpl<>();
futureDeletedRecords.complete(null);