This is an automated email from the ASF dual-hosted git repository.
cadonna pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new c9b6e19b3b KAFKA-10199: Cleanup TaskManager and Task interfaces
(#12397)
c9b6e19b3b is described below
commit c9b6e19b3b37499de17da19e82b1b98e3b9f6b5c
Author: Guozhang Wang <[email protected]>
AuthorDate: Thu Jul 21 06:11:40 2022 -0700
KAFKA-10199: Cleanup TaskManager and Task interfaces (#12397)
In order to integrate with the state updater, we need to refactor the
TaskManager and Task interfaces. This PR does the following:
Separate active and standby tasks in the Tasks placeholder, and add
pendingActiveTasks and pendingStandbyTasks to Tasks. The exposed
active/standby tasks from the Tasks set would only be mutated by a single
thread, and the pending tasks hold those tasks that are assigned but cannot
be actively managed yet. For now they cover two scenarios: a) tasks from
unknown sub-topologies, which hence cannot be initialized, b) tasks that are
pending being recycled from active to s [...]
Extract any logic that mutates a task out of the Tasks / TaskCreators.
Tasks should only be a place for maintaining the set of tasks, not for
manipulating a task; and TaskCreators should only be used for creating
tasks, not for anything else. All of this logic is migrated into TaskManager.
While extracting the task-mutating logic out of the Tasks / TaskCreators, I
noticed a couple of minor issues in the code where we duplicate the closing
logic, so I also cleaned them up in the following way:
a) When closing a task, we first trigger the corresponding
closeClean/closeDirty function; then we remove the task from the Tasks
bookkeeping, and for an active task we also remove its task producer if
EOS-V1 is used.
b) When closing dirty, we swallow the exception from the close call and from
the remove-task-producer call; when closing clean, we store the exception
thrown from either the close call or the remove-task-producer call, and then
rethrow it at the end of the caller. The difference, though, is that for an
exception from the close call we need to retry by closing the task dirty,
whereas for an exception from the remove-task-producer call we do not need to
re-close it dirty.
Reviewer: Bruno Cadonna <[email protected]>
---
.../streams/processor/internals/AbstractTask.java | 17 +-
.../processor/internals/ActiveTaskCreator.java | 93 +++----
.../processor/internals/PartitionGroup.java | 3 +-
.../processor/internals/ProcessorStateManager.java | 7 +-
.../streams/processor/internals/StandbyTask.java | 41 ++-
.../processor/internals/StandbyTaskCreator.java | 45 +---
.../streams/processor/internals/StreamTask.java | 44 +++-
.../streams/processor/internals/StreamThread.java | 9 +-
.../processor/internals/TaskExecutionMetadata.java | 10 +
.../streams/processor/internals/TaskExecutor.java | 17 +-
.../streams/processor/internals/TaskManager.java | 179 +++++++------
.../kafka/streams/processor/internals/Tasks.java | 276 +++++++++++----------
.../processor/internals/StreamThreadTest.java | 33 +--
.../processor/internals/TaskExecutorTest.java | 4 +-
.../processor/internals/TaskManagerTest.java | 60 +++--
.../streams/processor/internals/TasksTest.java | 65 -----
16 files changed, 452 insertions(+), 451 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
index c64fadfe5c..f8476b3e8b 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
@@ -20,6 +20,7 @@ import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.TopologyConfig.TaskConfig;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TaskMigratedException;
import org.apache.kafka.streams.processor.StateStore;
@@ -27,6 +28,7 @@ import org.apache.kafka.streams.processor.TaskId;
import org.slf4j.Logger;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -55,25 +57,25 @@ public abstract class AbstractTask implements Task {
protected Map<TopicPartition, Long> offsetSnapshotSinceLastFlush = null;
protected final TaskId id;
+ protected final TaskConfig config;
protected final ProcessorTopology topology;
protected final StateDirectory stateDirectory;
protected final ProcessorStateManager stateMgr;
- private final long taskTimeoutMs;
AbstractTask(final TaskId id,
final ProcessorTopology topology,
final StateDirectory stateDirectory,
final ProcessorStateManager stateMgr,
final Set<TopicPartition> inputPartitions,
- final long taskTimeoutMs,
+ final TaskConfig config,
final String taskType,
final Class<? extends AbstractTask> clazz) {
this.id = id;
this.stateMgr = stateMgr;
this.topology = topology;
+ this.config = config;
this.inputPartitions = inputPartitions;
this.stateDirectory = stateDirectory;
- this.taskTimeoutMs = taskTimeoutMs;
final String threadIdPrefix = String.format("stream-thread [%s] ",
Thread.currentThread().getName());
logPrefix = threadIdPrefix + String.format("%s [%s] ", taskType, id);
@@ -106,7 +108,7 @@ public abstract class AbstractTask implements Task {
@Override
public Set<TopicPartition> inputPartitions() {
- return inputPartitions;
+ return Collections.unmodifiableSet(inputPartitions);
}
@Override
@@ -151,7 +153,8 @@ public abstract class AbstractTask implements Task {
@Override
public void updateInputPartitions(final Set<TopicPartition>
topicPartitions, final Map<String, List<String>>
allTopologyNodesToSourceTopics) {
- this.inputPartitions = topicPartitions;
+ this.inputPartitions.clear();
+ this.inputPartitions.addAll(topicPartitions);
topology.updateSourceTopics(allTopologyNodesToSourceTopics);
}
@@ -159,12 +162,12 @@ public abstract class AbstractTask implements Task {
public void maybeInitTaskTimeoutOrThrow(final long currentWallClockMs,
final Exception cause) {
if (deadlineMs == NO_DEADLINE) {
- deadlineMs = currentWallClockMs + taskTimeoutMs;
+ deadlineMs = currentWallClockMs + config.taskTimeoutMs;
} else if (currentWallClockMs > deadlineMs) {
final String errorMessage = String.format(
"Task %s did not make progress within %d ms. Adjust `%s` if
needed.",
id,
- currentWallClockMs - deadlineMs + taskTimeoutMs,
+ currentWallClockMs - deadlineMs + config.taskTimeoutMs,
StreamsConfig.TASK_TIMEOUT_MS_CONFIG
);
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java
index e7832df6b1..ef21a51c99 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java
@@ -28,7 +28,6 @@ import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.internals.StreamsConfigUtils.ProcessingMode;
import org.apache.kafka.streams.processor.TaskId;
-import org.apache.kafka.streams.processor.internals.Task.TaskType;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.processor.internals.metrics.ThreadMetrics;
import org.apache.kafka.streams.state.internals.ThreadCache;
@@ -44,7 +43,6 @@ import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
-import static org.apache.kafka.common.utils.Utils.filterMap;
import static
org.apache.kafka.streams.internals.StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_ALPHA;
import static
org.apache.kafka.streams.internals.StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_V2;
import static org.apache.kafka.streams.internals.StreamsConfigUtils.eosEnabled;
@@ -68,11 +66,6 @@ class ActiveTaskCreator {
private final Map<TaskId, StreamsProducer> taskProducers;
private final ProcessingMode processingMode;
- // Tasks may have been assigned for a NamedTopology that is not yet known
by this host. When that occurs we stash
- // these unknown tasks until either the corresponding NamedTopology is
added and we can create them at last, or
- // we receive a new assignment and they are revoked from the thread.
- private final Map<TaskId, Set<TopicPartition>> unknownTasksToBeCreated =
new HashMap<>();
-
ActiveTaskCreator(final TopologyMetadata topologyMetadata,
final StreamsConfig applicationConfig,
final StreamsMetricsImpl streamsMetrics,
@@ -142,33 +135,16 @@ class ActiveTaskCreator {
return threadProducer;
}
- void removeRevokedUnknownTasks(final Set<TaskId> assignedTasks) {
- unknownTasksToBeCreated.keySet().retainAll(assignedTasks);
- }
-
- Map<TaskId, Set<TopicPartition>> uncreatedTasksForTopologies(final
Set<String> currentTopologies) {
- return filterMap(unknownTasksToBeCreated, t ->
currentTopologies.contains(t.getKey().topologyName()));
- }
-
// TODO: change return type to `StreamTask`
public Collection<Task> createTasks(final Consumer<byte[], byte[]>
consumer,
final Map<TaskId, Set<TopicPartition>>
tasksToBeCreated) {
// TODO: change type to `StreamTask`
final List<Task> createdTasks = new ArrayList<>();
- final Map<TaskId, Set<TopicPartition>> newUnknownTasks = new
HashMap<>();
for (final Map.Entry<TaskId, Set<TopicPartition>> newTaskAndPartitions
: tasksToBeCreated.entrySet()) {
final TaskId taskId = newTaskAndPartitions.getKey();
- final Set<TopicPartition> partitions =
newTaskAndPartitions.getValue();
-
final LogContext logContext = getLogContext(taskId);
-
- // task belongs to a named topology that hasn't been added yet,
wait until it has to create this
- if (taskId.topologyName() != null &&
!topologyMetadata.namedTopologiesView().contains(taskId.topologyName())) {
- newUnknownTasks.put(taskId, partitions);
- continue;
- }
-
+ final Set<TopicPartition> partitions =
newTaskAndPartitions.getValue();
final ProcessorTopology topology =
topologyMetadata.buildSubtopology(taskId);
final ProcessorStateManager stateManager = new
ProcessorStateManager(
@@ -182,7 +158,7 @@ class ActiveTaskCreator {
partitions
);
- final InternalProcessorContext context = new ProcessorContextImpl(
+ final InternalProcessorContext<Object, Object> context = new
ProcessorContextImpl(
taskId,
applicationConfig,
stateManager,
@@ -201,44 +177,13 @@ class ActiveTaskCreator {
context
)
);
- unknownTasksToBeCreated.remove(taskId);
- }
- if (!newUnknownTasks.isEmpty()) {
- log.info("Delaying creation of tasks not yet known by this
instance: {}", newUnknownTasks.keySet());
- unknownTasksToBeCreated.putAll(newUnknownTasks);
}
return createdTasks;
}
-
- StreamTask createActiveTaskFromStandby(final StandbyTask standbyTask,
- final Set<TopicPartition>
inputPartitions,
- final Consumer<byte[], byte[]>
consumer) {
- final InternalProcessorContext context =
standbyTask.processorContext();
- final ProcessorStateManager stateManager = standbyTask.stateMgr;
- final LogContext logContext = getLogContext(standbyTask.id);
-
- standbyTask.closeCleanAndRecycleState();
- stateManager.transitionTaskType(TaskType.ACTIVE, logContext);
-
- return createActiveTask(
- standbyTask.id,
- inputPartitions,
- consumer,
- logContext,
- topologyMetadata.buildSubtopology(standbyTask.id),
- stateManager,
- context
- );
- }
-
- private StreamTask createActiveTask(final TaskId taskId,
- final Set<TopicPartition>
inputPartitions,
- final Consumer<byte[], byte[]>
consumer,
- final LogContext logContext,
- final ProcessorTopology topology,
- final ProcessorStateManager
stateManager,
- final InternalProcessorContext
context) {
+ private RecordCollector createRecordCollector(final TaskId taskId,
+ final LogContext logContext,
+ final ProcessorTopology
topology) {
final StreamsProducer streamsProducer;
if (processingMode == ProcessingMode.EXACTLY_ONCE_ALPHA) {
log.info("Creating producer client for task {}", taskId);
@@ -249,13 +194,14 @@ class ActiveTaskCreator {
taskId,
null,
logContext,
- time);
+ time
+ );
taskProducers.put(taskId, streamsProducer);
} else {
streamsProducer = threadProducer;
}
- final RecordCollector recordCollector = new RecordCollectorImpl(
+ return new RecordCollectorImpl(
logContext,
taskId,
streamsProducer,
@@ -263,6 +209,27 @@ class ActiveTaskCreator {
streamsMetrics,
topology
);
+ }
+
+ StreamTask createActiveTaskFromStandby(final StandbyTask standbyTask,
+ final Set<TopicPartition>
inputPartitions,
+ final Consumer<byte[], byte[]>
consumer) {
+ final RecordCollector recordCollector =
createRecordCollector(standbyTask.id, getLogContext(standbyTask.id),
standbyTask.topology);
+ final StreamTask task = standbyTask.recycle(time, cache,
recordCollector, inputPartitions, consumer);
+
+ log.trace("Created active task {} with assigned partitions {}",
task.id, inputPartitions);
+ createTaskSensor.record();
+ return task;
+ }
+
+ private StreamTask createActiveTask(final TaskId taskId,
+ final Set<TopicPartition>
inputPartitions,
+ final Consumer<byte[], byte[]>
consumer,
+ final LogContext logContext,
+ final ProcessorTopology topology,
+ final ProcessorStateManager
stateManager,
+ final InternalProcessorContext
context) {
+ final RecordCollector recordCollector = createRecordCollector(taskId,
logContext, topology);
final StreamTask task = new StreamTask(
taskId,
@@ -280,7 +247,7 @@ class ActiveTaskCreator {
logContext
);
- log.trace("Created task {} with assigned partitions {}", taskId,
inputPartitions);
+ log.trace("Created active task {} with assigned partitions {}",
taskId, inputPartitions);
createTaskSensor.record();
return task;
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
index f6b5d800fd..7ce538e66b 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
@@ -216,8 +216,9 @@ public class PartitionGroup {
}
// creates queues for new partitions, removes old queues, saves cached
records for previously assigned partitions
- void updatePartitions(final Set<TopicPartition> newInputPartitions, final
Function<TopicPartition, RecordQueue> recordQueueCreator) {
+ void updatePartitions(final Set<TopicPartition> inputPartitions, final
Function<TopicPartition, RecordQueue> recordQueueCreator) {
final Set<TopicPartition> removedPartitions = new HashSet<>();
+ final Set<TopicPartition> newInputPartitions = new
HashSet<>(inputPartitions);
final Iterator<Map.Entry<TopicPartition, RecordQueue>> queuesIterator
= partitionQueues.entrySet().iterator();
while (queuesIterator.hasNext()) {
final Map.Entry<TopicPartition, RecordQueue> queueEntry =
queuesIterator.next();
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
index 3c8c40fc32..6efd3124ef 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -576,17 +576,12 @@ public class ProcessorStateManager implements
StateManager {
changelogReader.unregister(allChangelogs);
}
- void transitionTaskType(final TaskType newType, final LogContext
logContext) {
+ void transitionTaskType(final TaskType newType) {
if (taskType.equals(newType)) {
throw new IllegalStateException("Tried to recycle state for task
type conversion but new type was the same.");
}
- final TaskType oldType = taskType;
taskType = newType;
- log = logContext.logger(ProcessorStateManager.class);
- logPrefix = logContext.logPrefix();
-
- log.debug("Transitioning state manager for {} task {} to {}", oldType,
taskId, newType);
}
@Override
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
index 670c0c4beb..102a767495 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
@@ -16,10 +16,12 @@
*/
package org.apache.kafka.streams.processor.internals;
+import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.StreamsMetrics;
import org.apache.kafka.streams.errors.StreamsException;
@@ -68,7 +70,7 @@ public class StandbyTask extends AbstractTask implements Task
{
stateDirectory,
stateMgr,
inputPartitions,
- config.taskTimeoutMs,
+ config,
"standby-task",
StandbyTask.class
);
@@ -234,6 +236,43 @@ public class StandbyTask extends AbstractTask implements
Task {
log.info("Closed clean and recycled state");
}
+ /**
+ * Create an active task from this standby task without closing and
re-initializing the state stores.
+ * The task should have been in suspended state when calling this function
+ *
+ * TODO: we should be able to not need the input partitions as input param
in future but always reuse
+ * the task's input partitions when we have fixed partitions ->
tasks mapping
+ */
+ public StreamTask recycle(final Time time,
+ final ThreadCache cache,
+ final RecordCollector recordCollector,
+ final Set<TopicPartition> inputPartitions,
+ final Consumer<byte[], byte[]> mainConsumer) {
+ if (!inputPartitions.equals(this.inputPartitions)) {
+ log.warn("Detected unmatched input partitions for task {} when
recycling it from active to standby", id);
+ }
+
+ stateMgr.transitionTaskType(TaskType.ACTIVE);
+
+ log.debug("Recycling standby task {} to active", id);
+
+ return new StreamTask(
+ id,
+ inputPartitions,
+ topology,
+ mainConsumer,
+ config,
+ streamsMetrics,
+ stateDirectory,
+ cache,
+ time,
+ stateMgr,
+ recordCollector,
+ processorContext,
+ logContext
+ );
+ }
+
private void close(final boolean clean) {
switch (state()) {
case SUSPENDED:
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTaskCreator.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTaskCreator.java
index 43ebd40a35..59e2aa0285 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTaskCreator.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTaskCreator.java
@@ -21,7 +21,6 @@ import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.TaskId;
-import org.apache.kafka.streams.processor.internals.Task.TaskType;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.processor.internals.metrics.ThreadMetrics;
import org.apache.kafka.streams.state.internals.ThreadCache;
@@ -29,12 +28,10 @@ import org.slf4j.Logger;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import static org.apache.kafka.common.utils.Utils.filterMap;
import static org.apache.kafka.streams.internals.StreamsConfigUtils.eosEnabled;
class StandbyTaskCreator {
@@ -47,9 +44,6 @@ class StandbyTaskCreator {
private final Logger log;
private final Sensor createTaskSensor;
- // tasks may be assigned for a NamedTopology that is not yet known by this
host, and saved for later creation
- private final Map<TaskId, Set<TopicPartition>> unknownTasksToBeCreated =
new HashMap<>();
-
StandbyTaskCreator(final TopologyMetadata topologyMetadata,
final StreamsConfig applicationConfig,
final StreamsMetricsImpl streamsMetrics,
@@ -73,30 +67,14 @@ class StandbyTaskCreator {
);
}
- void removeRevokedUnknownTasks(final Set<TaskId> assignedTasks) {
- unknownTasksToBeCreated.keySet().retainAll(assignedTasks);
- }
-
- Map<TaskId, Set<TopicPartition>> uncreatedTasksForTopologies(final
Set<String> currentTopologies) {
- return filterMap(unknownTasksToBeCreated, t ->
currentTopologies.contains(t.getKey().topologyName()));
- }
-
// TODO: change return type to `StandbyTask`
Collection<Task> createTasks(final Map<TaskId, Set<TopicPartition>>
tasksToBeCreated) {
// TODO: change type to `StandbyTask`
final List<Task> createdTasks = new ArrayList<>();
- final Map<TaskId, Set<TopicPartition>> newUnknownTasks = new
HashMap<>();
for (final Map.Entry<TaskId, Set<TopicPartition>> newTaskAndPartitions
: tasksToBeCreated.entrySet()) {
final TaskId taskId = newTaskAndPartitions.getKey();
final Set<TopicPartition> partitions =
newTaskAndPartitions.getValue();
-
- // task belongs to a named topology that hasn't been added yet,
wait until it has to create this
- if (taskId.topologyName() != null &&
!topologyMetadata.namedTopologiesView().contains(taskId.topologyName())) {
- newUnknownTasks.put(taskId, partitions);
- continue;
- }
-
final ProcessorTopology topology =
topologyMetadata.buildSubtopology(taskId);
if (topology.hasStateWithChangelogs()) {
@@ -111,7 +89,7 @@ class StandbyTaskCreator {
partitions
);
- final InternalProcessorContext context = new
ProcessorContextImpl(
+ final InternalProcessorContext<Object, Object> context = new
ProcessorContextImpl(
taskId,
applicationConfig,
stateManager,
@@ -127,30 +105,17 @@ class StandbyTaskCreator {
taskId, partitions
);
}
- unknownTasksToBeCreated.remove(taskId);
- }
- if (!newUnknownTasks.isEmpty()) {
- log.info("Delaying creation of tasks not yet known by this
instance: {}", newUnknownTasks.keySet());
- unknownTasksToBeCreated.putAll(newUnknownTasks);
}
return createdTasks;
}
StandbyTask createStandbyTaskFromActive(final StreamTask streamTask,
final Set<TopicPartition>
inputPartitions) {
- final InternalProcessorContext context = streamTask.processorContext();
- final ProcessorStateManager stateManager = streamTask.stateMgr;
-
- streamTask.closeCleanAndRecycleState();
- stateManager.transitionTaskType(TaskType.STANDBY,
getLogContext(streamTask.id()));
+ final StandbyTask task = streamTask.recycle(inputPartitions);
- return createStandbyTask(
- streamTask.id(),
- inputPartitions,
- topologyMetadata.buildSubtopology(streamTask.id),
- stateManager,
- context
- );
+ log.trace("Created task {} with assigned partitions {}", task.id,
inputPartitions);
+ createTaskSensor.record();
+ return task;
}
StandbyTask createStandbyTask(final TaskId taskId,
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 5b62e3f579..0c7ea49e76 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -127,7 +127,7 @@ public class StreamTask extends AbstractTask implements
ProcessorNodePunctuator,
stateDirectory,
stateMgr,
inputPartitions,
- config.taskTimeoutMs,
+ config,
"task",
StreamTask.class
);
@@ -292,7 +292,6 @@ public class StreamTask extends AbstractTask implements
ProcessorNodePunctuator,
partitionGroup.clear();
} finally {
transitToSuspend();
- log.info("Suspended running");
}
break;
@@ -573,6 +572,45 @@ public class StreamTask extends AbstractTask implements
ProcessorNodePunctuator,
log.info("Closed clean and recycled state");
}
+ /**
+ * Create a standby task from this active task without closing and
re-initializing the state stores.
+ * The task should have been in suspended state when calling this function
+ *
+ * TODO: we should be able to not need the input partitions as input param
in future but always reuse
+ * the task's input partitions when we have fixed partitions ->
tasks mapping
+ */
+ public StandbyTask recycle(final Set<TopicPartition> inputPartitions) {
+ if (state() != Task.State.CLOSED) {
+ throw new IllegalStateException("Attempted to convert an active
task that's not closed: " + id);
+ }
+
+ if (!inputPartitions.equals(this.inputPartitions)) {
+ log.warn("Detected unmatched input partitions for task {} when
recycling it from active to standby", id);
+ }
+
+ stateMgr.transitionTaskType(TaskType.STANDBY);
+
+ final ThreadCache dummyCache = new ThreadCache(
+ new LogContext(String.format("stream-thread [%s] ",
Thread.currentThread().getName())),
+ 0,
+ streamsMetrics
+ );
+
+ log.debug("Recycling active task {} to standby", id);
+
+ return new StandbyTask(
+ id,
+ inputPartitions,
+ topology,
+ config,
+ streamsMetrics,
+ stateMgr,
+ stateDirectory,
+ dummyCache,
+ processorContext
+ );
+ }
+
/**
* The following exceptions maybe thrown from the state manager flushing
call
*
@@ -1217,7 +1255,7 @@ public class StreamTask extends AbstractTask implements
ProcessorNodePunctuator,
}
private void transitToSuspend() {
- log.info("Suspended {}", state());
+ log.info("Suspended from {}", state());
transitionTo(State.SUSPENDED);
timeCurrentIdlingStarted = Optional.of(System.currentTimeMillis());
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 8b6820fc22..74a49a7e8e 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -389,7 +389,6 @@ public class StreamThread extends Thread {
changelogReader,
processId,
logPrefix,
- streamsMetrics,
activeTaskCreator,
standbyTaskCreator,
topologyMetadata,
@@ -864,7 +863,7 @@ public class StreamThread extends Thread {
if (taskManager.tryToCompleteRestoration(now, partitions ->
resetOffsets(partitions, null))) {
changelogReader.transitToUpdateStandby();
log.info("Restoration took {} ms for all tasks {}",
time.milliseconds() - lastPartitionAssignedMs,
- taskManager.tasks().keySet());
+ taskManager.allTasks().keySet());
setState(State.RUNNING);
}
@@ -1064,7 +1063,7 @@ public class StreamThread extends Thread {
}
committed = taskManager.commit(
- taskManager.tasks()
+ taskManager.allTasks()
.values()
.stream()
.filter(t -> t.state() == Task.State.RUNNING || t.state()
== Task.State.RESTORING)
@@ -1124,7 +1123,7 @@ public class StreamThread extends Thread {
// intentionally do not check the returned flag
setState(State.PENDING_SHUTDOWN);
- log.info("Shutting down");
+ log.info("Shutting down {}", cleanRun ? "clean" : "unclean");
try {
taskManager.shutdown(cleanRun);
@@ -1232,7 +1231,7 @@ public class StreamThread extends Thread {
}
public Map<TaskId, Task> allTasks() {
- return taskManager.tasks();
+ return taskManager.allTasks();
}
/**
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutionMetadata.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutionMetadata.java
index 310cdef66e..48ea76b84b 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutionMetadata.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutionMetadata.java
@@ -59,6 +59,16 @@ public class TaskExecutionMetadata {
}
}
+ public boolean canPunctuateTask(final Task task) {
+ final String topologyName = task.id().topologyName();
+
+ if (topologyName == null) {
+ return !pausedTopologies.contains(UNNAMED_TOPOLOGY);
+ } else {
+ return !pausedTopologies.contains(topologyName);
+ }
+ }
+
public void registerTaskError(final Task task, final Throwable t, final
long now) {
if (hasNamedTopologies) {
final String topologyName = task.id().topologyName();
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutor.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutor.java
index bab8a75149..5aa6eb1fe2 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutor.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutor.java
@@ -82,7 +82,7 @@ public class TaskExecutor {
}
} catch (final Throwable t) {
taskExecutionMetadata.registerTaskError(task, t, now);
-
tasks.removeTaskFromCuccessfullyProcessedBeforeClosing(lastProcessed);
+
tasks.removeTaskFromSuccessfullyProcessedBeforeClosing(lastProcessed);
commitSuccessfullyProcessedTasks();
throw t;
}
@@ -163,6 +163,7 @@ public class TaskExecutor {
task.postCommit(false);
}
}
+
return committed;
}
@@ -275,13 +276,15 @@ public class TaskExecutor {
int punctuate() {
int punctuated = 0;
- for (final Task task : tasks.notPausedActiveTasks()) {
+ for (final Task task : tasks.activeTasks()) {
try {
- if (task.maybePunctuateStreamTime()) {
- punctuated++;
- }
- if (task.maybePunctuateSystemTime()) {
- punctuated++;
+ if (taskExecutionMetadata.canPunctuateTask(task)) {
+ if (task.maybePunctuateStreamTime()) {
+ punctuated++;
+ }
+ if (task.maybePunctuateSystemTime()) {
+ punctuated++;
+ }
}
} catch (final TaskMigratedException e) {
log.info("Failed to punctuate stream task {} since it got
migrated to another thread already. " +
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index d5a0ee3d03..4ea419ab9c 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -38,7 +38,6 @@ import
org.apache.kafka.streams.internals.StreamsConfigUtils.ProcessingMode;
import org.apache.kafka.streams.processor.TaskId;
import
org.apache.kafka.streams.processor.internals.StateDirectory.TaskDirectory;
import org.apache.kafka.streams.processor.internals.Task.State;
-import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
import org.slf4j.Logger;
@@ -74,14 +73,15 @@ public class TaskManager {
// by QueryableState
private final Logger log;
private final Time time;
- private final ChangelogReader changelogReader;
+ private final Tasks tasks;
private final UUID processId;
private final String logPrefix;
- private final TopologyMetadata topologyMetadata;
private final Admin adminClient;
private final StateDirectory stateDirectory;
private final ProcessingMode processingMode;
- private final Tasks tasks;
+ private final ChangelogReader changelogReader;
+ private final TopologyMetadata topologyMetadata;
+
private final TaskExecutor taskExecutor;
private Consumer<byte[], byte[]> mainConsumer;
@@ -97,7 +97,6 @@ public class TaskManager {
final ChangelogReader changelogReader,
final UUID processId,
final String logPrefix,
- final StreamsMetricsImpl streamsMetrics,
final ActiveTaskCreator activeTaskCreator,
final StandbyTaskCreator standbyTaskCreator,
final TopologyMetadata topologyMetadata,
@@ -115,7 +114,7 @@ public class TaskManager {
final LogContext logContext = new LogContext(logPrefix);
this.log = logContext.logger(getClass());
- this.tasks = new Tasks(logContext, topologyMetadata,
activeTaskCreator, standbyTaskCreator);
+ this.tasks = new Tasks(logContext, activeTaskCreator,
standbyTaskCreator);
this.taskExecutor = new TaskExecutor(
tasks,
topologyMetadata.taskExecutionMetadata(),
@@ -186,7 +185,7 @@ public class TaskManager {
// We need to commit before closing the corrupted active tasks since
this will force the ongoing txn to abort
try {
- final Collection<Task> tasksToCommit = tasks()
+ final Collection<Task> tasksToCommit = allTasks()
.values()
.stream()
.filter(t -> t.state() == Task.State.RUNNING || t.state() ==
Task.State.RESTORING)
@@ -285,34 +284,52 @@ public class TaskManager {
final LinkedHashMap<TaskId, RuntimeException> taskCloseExceptions =
new LinkedHashMap<>();
final Map<TaskId, Set<TopicPartition>> activeTasksToCreate = new
HashMap<>(activeTasks);
final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate = new
HashMap<>(standbyTasks);
+ final Map<Task, Set<TopicPartition>> tasksToRecycle = new HashMap<>();
final Comparator<Task> byId = Comparator.comparing(Task::id);
- final Set<Task> tasksToRecycle = new TreeSet<>(byId);
final Set<Task> tasksToCloseClean = new TreeSet<>(byId);
final Set<Task> tasksToCloseDirty = new TreeSet<>(byId);
- // first rectify all existing tasks
+ tasks.purgePendingTasks(activeTasks.keySet(), standbyTasks.keySet());
+
+ // first rectify all existing tasks:
+ // 1. for tasks that are already owned, just update input partitions /
resume and skip re-creating them
+ // 2. for tasks that have changed active/standby status, just recycle
and skip re-creating them
+ // 3. otherwise, close them since they are no longer owned
for (final Task task : tasks.allTasks()) {
- if (activeTasks.containsKey(task.id()) && task.isActive()) {
- tasks.updateInputPartitionsAndResume(task,
activeTasks.get(task.id()));
- activeTasksToCreate.remove(task.id());
- } else if (standbyTasks.containsKey(task.id()) &&
!task.isActive()) {
- tasks.updateInputPartitionsAndResume(task,
standbyTasks.get(task.id()));
- standbyTasksToCreate.remove(task.id());
- } else if (activeTasks.containsKey(task.id()) ||
standbyTasks.containsKey(task.id())) {
- // check for tasks that were owned previously but have changed
active/standby status
- tasksToRecycle.add(task);
+ final TaskId taskId = task.id();
+ if (activeTasksToCreate.containsKey(taskId)) {
+ if (task.isActive()) {
+ final Set<TopicPartition> topicPartitions =
activeTasksToCreate.get(taskId);
+ if (tasks.updateActiveTaskInputPartitions(task,
topicPartitions)) {
+ task.updateInputPartitions(topicPartitions,
topologyMetadata.nodeToSourceTopics(task.id()));
+ }
+ task.resume();
+ } else {
+ tasksToRecycle.put(task, activeTasksToCreate.get(taskId));
+ }
+ activeTasksToCreate.remove(taskId);
+ } else if (standbyTasksToCreate.containsKey(taskId)) {
+ if (!task.isActive()) {
+ final Set<TopicPartition> topicPartitions =
standbyTasksToCreate.get(taskId);
+ task.updateInputPartitions(topicPartitions,
topologyMetadata.nodeToSourceTopics(task.id()));
+ task.resume();
+ } else {
+ tasksToRecycle.put(task, standbyTasksToCreate.get(taskId));
+ }
+ standbyTasksToCreate.remove(taskId);
} else {
tasksToCloseClean.add(task);
}
}
+ tasks.addActivePendingTasks(pendingTasksToCreate(activeTasksToCreate));
+
tasks.addStandbyPendingTasks(pendingTasksToCreate(standbyTasksToCreate));
+
// close and recycle those tasks
- handleCloseAndRecycle(
+ closeAndRecycleTasks(
tasksToRecycle,
tasksToCloseClean,
tasksToCloseDirty,
- activeTasksToCreate,
- standbyTasksToCreate,
taskCloseExceptions
);
@@ -346,22 +363,34 @@ public class TaskManager {
throw first.getValue();
}
- tasks.handleNewAssignmentAndCreateTasks(activeTasksToCreate,
standbyTasksToCreate, activeTasks.keySet(), standbyTasks.keySet());
+ tasks.createTasks(activeTasksToCreate, standbyTasksToCreate);
+ }
+
+ private Map<TaskId, Set<TopicPartition>> pendingTasksToCreate(final
Map<TaskId, Set<TopicPartition>> tasksToCreate) {
+ final Map<TaskId, Set<TopicPartition>> pendingTasks = new HashMap<>();
+ final Iterator<Map.Entry<TaskId, Set<TopicPartition>>> iter =
tasksToCreate.entrySet().iterator();
+ while (iter.hasNext()) {
+ final Map.Entry<TaskId, Set<TopicPartition>> entry = iter.next();
+ final TaskId taskId = entry.getKey();
+ if (taskId.topologyName() != null &&
!topologyMetadata.namedTopologiesView().contains(taskId.topologyName())) {
+ pendingTasks.put(taskId, entry.getValue());
+ iter.remove();
+ }
+ }
+ return pendingTasks;
}
- private void handleCloseAndRecycle(final Set<Task> tasksToRecycle,
- final Set<Task> tasksToCloseClean,
- final Set<Task> tasksToCloseDirty,
- final Map<TaskId, Set<TopicPartition>>
activeTasksToCreate,
- final Map<TaskId, Set<TopicPartition>>
standbyTasksToCreate,
- final LinkedHashMap<TaskId,
RuntimeException> taskCloseExceptions) {
+ private void closeAndRecycleTasks(final Map<Task, Set<TopicPartition>>
tasksToRecycle,
+ final Set<Task> tasksToCloseClean,
+ final Set<Task> tasksToCloseDirty,
+ final LinkedHashMap<TaskId,
RuntimeException> taskCloseExceptions) {
if (!tasksToCloseDirty.isEmpty()) {
throw new IllegalArgumentException("Tasks to close-dirty should be
empty");
}
// for all tasks to close or recycle, we should first write a
checkpoint as in post-commit
final List<Task> tasksToCheckpoint = new
ArrayList<>(tasksToCloseClean);
- tasksToCheckpoint.addAll(tasksToRecycle);
+ tasksToCheckpoint.addAll(tasksToRecycle.keySet());
for (final Task task : tasksToCheckpoint) {
try {
// Note that we are not actually committing here but just
check if we need to write checkpoint file:
@@ -399,29 +428,29 @@ public class TaskManager {
tasksToCloseClean.removeAll(tasksToCloseDirty);
for (final Task task : tasksToCloseClean) {
try {
- completeTaskCloseClean(task);
- if (task.isActive()) {
- tasks.cleanUpTaskProducerAndRemoveTask(task.id(),
taskCloseExceptions);
+ final RuntimeException removeTaskException =
completeTaskCloseClean(task);
+ if (removeTaskException != null) {
+ taskCloseExceptions.putIfAbsent(task.id(),
removeTaskException);
}
- } catch (final RuntimeException e) {
+ } catch (final RuntimeException closeTaskException) {
final String uncleanMessage = String.format(
"Failed to close task %s cleanly. Attempting to close
remaining tasks before re-throwing:",
task.id());
- log.error(uncleanMessage, e);
- taskCloseExceptions.putIfAbsent(task.id(), e);
+ log.error(uncleanMessage, closeTaskException);
+ taskCloseExceptions.putIfAbsent(task.id(), closeTaskException);
tasksToCloseDirty.add(task);
}
}
- tasksToRecycle.removeAll(tasksToCloseDirty);
- for (final Task oldTask : tasksToRecycle) {
+ tasksToRecycle.keySet().removeAll(tasksToCloseDirty);
+ for (final Map.Entry<Task, Set<TopicPartition>> entry :
tasksToRecycle.entrySet()) {
+ final Task oldTask = entry.getKey();
try {
+ oldTask.closeCleanAndRecycleState();
if (oldTask.isActive()) {
- final Set<TopicPartition> partitions =
standbyTasksToCreate.remove(oldTask.id());
- tasks.convertActiveToStandby((StreamTask) oldTask,
partitions, taskCloseExceptions);
+ tasks.convertActiveToStandby((StreamTask) oldTask,
entry.getValue(), taskCloseExceptions);
} else {
- final Set<TopicPartition> partitions =
activeTasksToCreate.remove(oldTask.id());
- tasks.convertStandbyToActive((StandbyTask) oldTask,
partitions);
+ tasks.convertStandbyToActive((StandbyTask) oldTask,
entry.getValue());
}
} catch (final RuntimeException e) {
final String uncleanMessage = String.format("Failed to recycle
task %s cleanly. Attempting to close remaining tasks before re-throwing:",
oldTask.id());
@@ -434,7 +463,6 @@ public class TaskManager {
// for tasks that cannot be cleanly closed or recycled, close them
dirty
for (final Task task : tasksToCloseDirty) {
closeTaskDirty(task);
- tasks.cleanUpTaskProducerAndRemoveTask(task.id(),
taskCloseExceptions);
}
}
@@ -646,14 +674,12 @@ public class TaskManager {
void handleLostAll() {
log.debug("Closing lost active tasks as zombies.");
- final Set<Task> allTask = new HashSet<>(tasks.allTasks());
+ final Set<Task> allTask = tasks.allTasks();
for (final Task task : allTask) {
// Even though we've apparently dropped out of the group, we can
continue safely to maintain our
// standby tasks while we rejoin.
if (task.isActive()) {
closeTaskDirty(task);
-
- tasks.cleanUpTaskProducerAndRemoveTask(task.id(), new
HashMap<>());
}
}
@@ -673,7 +699,7 @@ public class TaskManager {
// Not all tasks will create directories, and there may be directories
for tasks we don't currently own,
// so we consider all tasks that are either owned or on disk. This
includes stateless tasks, which should
// just have an empty changelogOffsets map.
- for (final TaskId id : union(HashSet::new, lockedTaskDirectories,
tasks.tasksPerId().keySet())) {
+ for (final TaskId id : union(HashSet::new, lockedTaskDirectories,
tasks.allTaskIds())) {
final Task task = tasks.owned(id) ? tasks.task(id) : null;
// Closed and uninitialized tasks don't have any offsets so we
should read directly from the checkpoint
if (task != null && task.state() != State.CREATED && task.state()
!= State.CLOSED) {
@@ -796,15 +822,27 @@ public class TaskManager {
try {
task.suspend();
} catch (final RuntimeException swallow) {
- log.error("Error suspending dirty task {} ", task.id(), swallow);
+ log.error("Error suspending dirty task {}: {}", task.id(),
swallow.getMessage());
}
- tasks.removeTaskBeforeClosing(task.id());
+
task.closeDirty();
+
+ try {
+ tasks.removeTask(task);
+ } catch (final RuntimeException swallow) {
+ log.error("Error removing dirty task {}: {}", task.id(),
swallow.getMessage());
+ }
}
- private void completeTaskCloseClean(final Task task) {
- tasks.removeTaskBeforeClosing(task.id());
+ /**
+ * Closes the given task cleanly and then removes it from the task bookkeeping.
+ * Exceptions thrown by closeClean() itself propagate to the caller.
+ *
+ * @return the exception thrown while removing the task, or null if removal succeeded
+ */
+ private RuntimeException completeTaskCloseClean(final Task task) {
task.closeClean();
+ try {
+ tasks.removeTask(task);
+ } catch (final RuntimeException e) {
+ // the task may be active or standby, so the message must not claim "active"
+ log.error("Error removing task {}: {}", task.id(),
e.getMessage());
+ return e;
+ }
+ return null;
}
void shutdown(final boolean clean) {
@@ -859,16 +897,6 @@ public class TaskManager {
closeTaskDirty(task);
}
- // TODO: change type to `StreamTask`
- for (final Task activeTask : activeTasks) {
- executeAndMaybeSwallow(
- clean,
- () -> tasks.closeAndRemoveTaskProducerIfNeeded(activeTask),
- e -> firstException.compareAndSet(null, e),
- e -> log.warn("Ignoring an exception while closing task " +
activeTask.id() + " producer.", e)
- );
- }
-
final RuntimeException exception = firstException.get();
if (exception != null) {
throw exception;
@@ -957,7 +985,10 @@ public class TaskManager {
for (final Task task : tasksToCloseClean) {
try {
task.suspend();
- completeTaskCloseClean(task);
+ final RuntimeException exception =
completeTaskCloseClean(task);
+ if (exception != null) {
+ firstException.compareAndSet(null, exception);
+ }
} catch (final StreamsException e) {
log.error("Exception caught while clean-closing task " +
task.id(), e);
e.setTaskId(task.id());
@@ -988,7 +1019,10 @@ public class TaskManager {
task.prepareCommit();
task.postCommit(true);
task.suspend();
- completeTaskCloseClean(task);
+ final RuntimeException exception =
completeTaskCloseClean(task);
+ if (exception != null) {
+ maybeWrapAndSetFirstException(firstException, exception,
task.id());
+ }
} catch (final TaskMigratedException e) {
// just ignore the exception as it doesn't matter during
shutdown
tasksToCloseDirty.add(task);
@@ -1012,14 +1046,16 @@ public class TaskManager {
.collect(Collectors.toSet());
}
- Map<TaskId, Task> tasks() {
+ Map<TaskId, Task> allTasks() {
// not bothering with an unmodifiable map, since the tasks themselves
are mutable, but
// if any outside code modifies the map or the tasks, it would be a
severe transgression.
- return tasks.tasksPerId();
+ return tasks.allTasksPerId();
}
Map<TaskId, Task> notPausedTasks() {
- return Collections.unmodifiableMap(tasks.notPausedTasks().stream()
+ return Collections.unmodifiableMap(tasks.allTasks()
+ .stream()
+ .filter(t -> !topologyMetadata.isPaused(t.id().topologyName()))
.collect(Collectors.toMap(Task::id, v -> v)));
}
@@ -1049,7 +1085,7 @@ public class TaskManager {
// For testing only.
int commitAll() {
- return commit(new HashSet<>(tasks.allTasks()));
+ return commit(tasks.allTasks());
}
/**
@@ -1135,7 +1171,7 @@ public class TaskManager {
*/
void handleTopologyUpdates() {
topologyMetadata.executeTopologyUpdatesAndBumpThreadVersion(
- tasks::maybeCreateTasksFromNewTopologies,
+ tasks::createPendingTasks,
this::maybeCloseTasksFromRemovedTopologies
);
@@ -1161,11 +1197,10 @@ public class TaskManager {
}
}
- final Set<TaskId> allRemovedTasks =
- union(HashSet::new, activeTasksToRemove,
standbyTasksToRemove).stream().map(Task::id).collect(Collectors.toSet());
+ final Set<Task> allTasksToRemove = union(HashSet::new,
activeTasksToRemove, standbyTasksToRemove);
closeAndCleanUpTasks(activeTasksToRemove, standbyTasksToRemove,
true);
- allRemovedTasks.forEach(tasks::removeTaskBeforeClosing);
- releaseLockedDirectoriesForTasks(allRemovedTasks);
+ allTasksToRemove.forEach(tasks::removeTask);
+
releaseLockedDirectoriesForTasks(allTasksToRemove.stream().map(Task::id).collect(Collectors.toSet()));
} catch (final Exception e) {
// TODO KAFKA-12648: for now just swallow the exception to avoid
interfering with the other topologies
// that are running alongside, but eventually we should be able
to rethrow up to the handler to inform
@@ -1300,7 +1335,7 @@ public class TaskManager {
}
boolean needsInitializationOrRestoration() {
- return
tasks().values().stream().anyMatch(Task::needsInitializationOrRestoration);
+ return
activeTaskIterable().stream().anyMatch(Task::needsInitializationOrRestoration);
}
// for testing only
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java
index ca5481b67b..fbb45c5940 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java
@@ -24,7 +24,6 @@ import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.processor.TaskId;
import org.slf4j.Logger;
-import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@@ -34,29 +33,40 @@ import java.util.Set;
import java.util.TreeMap;
import java.util.stream.Collectors;
+import static org.apache.kafka.common.utils.Utils.filterMap;
+import static org.apache.kafka.common.utils.Utils.union;
+
+/**
+ * All tasks contained by the Streams instance.
+ *
+ * Note that these tasks are shared between the TaskManager (stream thread)
and the StateUpdater (restore thread),
+ * i.e. all running active tasks are processed by the former and all restoring
active tasks and standby tasks are
+ * processed by the latter.
+ */
class Tasks {
private final Logger log;
- private final TopologyMetadata topologyMetadata;
-
- private final Map<TaskId, Task> allTasksPerId =
Collections.synchronizedSortedMap(new TreeMap<>());
- private final Map<TaskId, Task> readOnlyTasksPerId =
Collections.unmodifiableMap(allTasksPerId);
- private final Collection<Task> readOnlyTasks =
Collections.unmodifiableCollection(allTasksPerId.values());
// TODO: change type to `StreamTask`
private final Map<TaskId, Task> activeTasksPerId = new TreeMap<>();
+ // TODO: change type to `StandbyTask`
+ private final Map<TaskId, Task> standbyTasksPerId = new TreeMap<>();
+
+ // Tasks may have been assigned for a NamedTopology that is not yet known
by this host. When that occurs we stash
+ // these unknown tasks until either the corresponding NamedTopology is
added and we can create them at last, or
+ // we receive a new assignment and they are revoked from the thread.
+
+ // Tasks may have been assigned but not yet created because:
+ // 1. They are for a NamedTopology that is not yet known by this host.
+ // 2. They are to be recycled from an existing restoring task yet to be
returned from the state updater.
+ //
+ // When that occurs we stash these pending tasks until either they are
finally clear to be created,
+ // or they are revoked from a new assignment.
+ private final Map<TaskId, Set<TopicPartition>> pendingActiveTasks = new
HashMap<>();
+ private final Map<TaskId, Set<TopicPartition>> pendingStandbyTasks = new
HashMap<>();
+
// TODO: change type to `StreamTask`
private final Map<TopicPartition, Task> activeTasksPerPartition = new
HashMap<>();
- // TODO: change type to `StreamTask`
- private final Map<TaskId, Task> readOnlyActiveTasksPerId =
Collections.unmodifiableMap(activeTasksPerId);
- private final Set<TaskId> readOnlyActiveTaskIds =
Collections.unmodifiableSet(activeTasksPerId.keySet());
- // TODO: change type to `StreamTask`
- private final Collection<Task> readOnlyActiveTasks =
Collections.unmodifiableCollection(activeTasksPerId.values());
- // TODO: change type to `StandbyTask`
- private final Map<TaskId, Task> standbyTasksPerId = new TreeMap<>();
- // TODO: change type to `StandbyTask`
- private final Map<TaskId, Task> readOnlyStandbyTasksPerId =
Collections.unmodifiableMap(standbyTasksPerId);
- private final Set<TaskId> readOnlyStandbyTaskIds =
Collections.unmodifiableSet(standbyTasksPerId.keySet());
private final Collection<Task> successfullyProcessed = new HashSet<>();
private final ActiveTaskCreator activeTaskCreator;
@@ -65,13 +75,10 @@ class Tasks {
private Consumer<byte[], byte[]> mainConsumer;
Tasks(final LogContext logContext,
- final TopologyMetadata topologyMetadata,
final ActiveTaskCreator activeTaskCreator,
final StandbyTaskCreator standbyTaskCreator) {
- log = logContext.logger(getClass());
-
- this.topologyMetadata = topologyMetadata;
+ this.log = logContext.logger(getClass());
this.activeTaskCreator = activeTaskCreator;
this.standbyTaskCreator = standbyTaskCreator;
}
@@ -80,88 +87,131 @@ class Tasks {
this.mainConsumer = mainConsumer;
}
- void handleNewAssignmentAndCreateTasks(final Map<TaskId,
Set<TopicPartition>> activeTasksToCreate,
- final Map<TaskId,
Set<TopicPartition>> standbyTasksToCreate,
- final Set<TaskId>
assignedActiveTasks,
- final Set<TaskId>
assignedStandbyTasks) {
- activeTaskCreator.removeRevokedUnknownTasks(assignedActiveTasks);
- standbyTaskCreator.removeRevokedUnknownTasks(assignedStandbyTasks);
- createTasks(activeTasksToCreate, standbyTasksToCreate);
+ void purgePendingTasks(final Set<TaskId> assignedActiveTasks, final
Set<TaskId> assignedStandbyTasks) {
+ pendingActiveTasks.keySet().retainAll(assignedActiveTasks);
+ pendingStandbyTasks.keySet().retainAll(assignedStandbyTasks);
}
- void maybeCreateTasksFromNewTopologies(final Set<String>
currentNamedTopologies) {
+ void addActivePendingTasks(final Map<TaskId, Set<TopicPartition>>
pendingTasks) {
+ pendingActiveTasks.putAll(pendingTasks);
+ }
+
+ void addStandbyPendingTasks(final Map<TaskId, Set<TopicPartition>>
pendingTasks) {
+ pendingStandbyTasks.putAll(pendingTasks);
+ }
+
+ void createPendingTasks(final Set<String> currentNamedTopologies) {
createTasks(
-
activeTaskCreator.uncreatedTasksForTopologies(currentNamedTopologies),
-
standbyTaskCreator.uncreatedTasksForTopologies(currentNamedTopologies)
+ pendingActiveTasksForTopologies(currentNamedTopologies),
+ pendingStandbyTasksForTopologies(currentNamedTopologies)
);
}
- double totalProducerBlockedTime() {
- return activeTaskCreator.totalProducerBlockedTime();
+ private Map<TaskId, Set<TopicPartition>>
pendingActiveTasksForTopologies(final Set<String> currentTopologies) {
+ return filterMap(pendingActiveTasks, t ->
currentTopologies.contains(t.getKey().topologyName()));
+ }
+
+ private Map<TaskId, Set<TopicPartition>>
pendingStandbyTasksForTopologies(final Set<String> currentTopologies) {
+ return filterMap(pendingStandbyTasks, t ->
currentTopologies.contains(t.getKey().topologyName()));
}
void createTasks(final Map<TaskId, Set<TopicPartition>>
activeTasksToCreate,
final Map<TaskId, Set<TopicPartition>>
standbyTasksToCreate) {
+ createActiveTasks(activeTasksToCreate);
+ createStandbyTasks(standbyTasksToCreate);
+ }
+
+ private void createActiveTasks(final Map<TaskId, Set<TopicPartition>>
activeTasksToCreate) {
for (final Map.Entry<TaskId, Set<TopicPartition>> taskToBeCreated :
activeTasksToCreate.entrySet()) {
final TaskId taskId = taskToBeCreated.getKey();
if (activeTasksPerId.containsKey(taskId)) {
throw new IllegalStateException("Attempted to create an active
task that we already own: " + taskId);
}
- }
-
- for (final Map.Entry<TaskId, Set<TopicPartition>> taskToBeCreated :
standbyTasksToCreate.entrySet()) {
- final TaskId taskId = taskToBeCreated.getKey();
- if (standbyTasksPerId.containsKey(taskId)) {
- throw new IllegalStateException("Attempted to create a standby
task that we already own: " + taskId);
+ if (pendingStandbyTasks.containsKey(taskId)) {
+ throw new IllegalStateException("Attempted to create an active
task while we already own its standby: " + taskId);
}
}
- // keep this check to simplify testing (ie, no need to mock
`activeTaskCreator`)
if (!activeTasksToCreate.isEmpty()) {
- // TODO: change type to `StreamTask`
for (final Task activeTask :
activeTaskCreator.createTasks(mainConsumer, activeTasksToCreate)) {
activeTasksPerId.put(activeTask.id(), activeTask);
- allTasksPerId.put(activeTask.id(), activeTask);
+ pendingActiveTasks.remove(activeTask.id());
for (final TopicPartition topicPartition :
activeTask.inputPartitions()) {
activeTasksPerPartition.put(topicPartition, activeTask);
}
}
}
+ }
+
+ /**
+ * Creates standby tasks for the given assignment and registers them in
+ * standbyTasksPerId, clearing the matching pending standby entry for each.
+ *
+ * @throws IllegalStateException if a requested standby task is already owned,
+ * or if the same task id is still pending as an active task
+ */
+ private void createStandbyTasks(final Map<TaskId, Set<TopicPartition>>
standbyTasksToCreate) {
+ for (final Map.Entry<TaskId, Set<TopicPartition>> taskToBeCreated :
standbyTasksToCreate.entrySet()) {
+ final TaskId taskId = taskToBeCreated.getKey();
+
+ if (standbyTasksPerId.containsKey(taskId)) {
+ throw new IllegalStateException("Attempted to create a standby
task that we already own: " + taskId);
+ }
+
+ if (pendingActiveTasks.containsKey(taskId)) {
+ throw new IllegalStateException("Attempted to create a standby
task while we already own its active: " + taskId);
+ }
+ }
- // keep this check to simplify testing (ie, no need to mock
`standbyTaskCreator`)
if (!standbyTasksToCreate.isEmpty()) {
- // TODO: change type to `StandbyTask`
for (final Task standbyTask :
standbyTaskCreator.createTasks(standbyTasksToCreate)) {
standbyTasksPerId.put(standbyTask.id(), standbyTask);
- allTasksPerId.put(standbyTask.id(), standbyTask);
+ // the standby now exists, so drop it from the pending standby map
+ // (not the pending active map)
+ pendingStandbyTasks.remove(standbyTask.id());
}
}
}
+ /**
+ * Removes a closed task from the bookkeeping maps. For an active task this also
+ * drops its partition mappings and asks the active task creator to close and
+ * remove its task producer if needed.
+ *
+ * @throws IllegalStateException if the task is not in CLOSED state
+ * @throws IllegalArgumentException if the task is not currently owned
+ */
+ void removeTask(final Task taskToRemove) {
+ final TaskId taskId = taskToRemove.id();
+
+ if (taskToRemove.state() != Task.State.CLOSED) {
+ throw new IllegalStateException("Attempted to remove a task that
is not closed: " + taskId);
+ }
+
+ if (taskToRemove.isActive()) {
+ if (activeTasksPerId.remove(taskId) == null) {
+ throw new IllegalArgumentException("Attempted to remove an
active task that is not owned: " + taskId);
+ }
+ // finish the local bookkeeping first, so that a failure while closing the
+ // producer cannot leave stale entries behind for the already-removed task
+ removePartitionsForActiveTask(taskId);
+ pendingActiveTasks.remove(taskId);
+ activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(taskId);
+ } else {
+ if (standbyTasksPerId.remove(taskId) == null) {
+ throw new IllegalArgumentException("Attempted to remove a
standby task that is not owned: " + taskId);
+ }
+ pendingStandbyTasks.remove(taskId);
+ }
+ }
+
+
void convertActiveToStandby(final StreamTask activeTask,
final Set<TopicPartition> partitions,
final Map<TaskId, RuntimeException>
taskCloseExceptions) {
- if (activeTasksPerId.remove(activeTask.id()) == null) {
- throw new IllegalStateException("Attempted to convert unknown
active task to standby task: " + activeTask.id());
+ final TaskId taskId = activeTask.id();
+ if (activeTasksPerId.remove(taskId) == null) {
+ throw new IllegalStateException("Attempted to convert unknown
active task to standby task: " + taskId);
}
- final Set<TopicPartition> toBeRemoved =
activeTasksPerPartition.entrySet().stream()
- .filter(e -> e.getValue().id().equals(activeTask.id()))
- .map(Map.Entry::getKey)
- .collect(Collectors.toSet());
- toBeRemoved.forEach(activeTasksPerPartition::remove);
+ removePartitionsForActiveTask(taskId);
- cleanUpTaskProducerAndRemoveTask(activeTask.id(), taskCloseExceptions);
+ try {
+ activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(taskId);
+ } catch (final RuntimeException e) {
+ taskCloseExceptions.putIfAbsent(taskId, e);
+ }
final StandbyTask standbyTask =
standbyTaskCreator.createStandbyTaskFromActive(activeTask, partitions);
standbyTasksPerId.put(standbyTask.id(), standbyTask);
- allTasksPerId.put(standbyTask.id(), standbyTask);
}
- void convertStandbyToActive(final StandbyTask standbyTask, final
Set<TopicPartition> partitions) {
- if (standbyTasksPerId.remove(standbyTask.id()) == null) {
- throw new IllegalStateException("Attempted to convert unknown
standby task to stream task: " + standbyTask.id());
+ void convertStandbyToActive(final StandbyTask standbyTask,
+ final Set<TopicPartition> partitions) {
+ final TaskId taskId = standbyTask.id();
+ if (standbyTasksPerId.remove(taskId) == null) {
+ throw new IllegalStateException("Attempted to convert unknown
standby task to stream task: " + taskId);
}
final StreamTask activeTask =
activeTaskCreator.createActiveTaskFromStandby(standbyTask, partitions,
mainConsumer);
@@ -169,36 +219,23 @@ class Tasks {
for (final TopicPartition topicPartition :
activeTask.inputPartitions()) {
activeTasksPerPartition.put(topicPartition, activeTask);
}
- allTasksPerId.put(activeTask.id(), activeTask);
}
- void updateInputPartitionsAndResume(final Task task, final
Set<TopicPartition> topicPartitions) {
+ boolean updateActiveTaskInputPartitions(final Task task, final
Set<TopicPartition> topicPartitions) {
final boolean requiresUpdate =
!task.inputPartitions().equals(topicPartitions);
if (requiresUpdate) {
log.debug("Update task {} inputPartitions: current {}, new {}",
task, task.inputPartitions(), topicPartitions);
- for (final TopicPartition inputPartition : task.inputPartitions())
{
- activeTasksPerPartition.remove(inputPartition);
- }
if (task.isActive()) {
+ for (final TopicPartition inputPartition :
task.inputPartitions()) {
+ activeTasksPerPartition.remove(inputPartition);
+ }
for (final TopicPartition topicPartition : topicPartitions) {
activeTasksPerPartition.put(topicPartition, task);
}
}
- task.updateInputPartitions(topicPartitions,
topologyMetadata.nodeToSourceTopics(task.id()));
}
- task.resume();
- }
- void cleanUpTaskProducerAndRemoveTask(final TaskId taskId,
- final Map<TaskId, RuntimeException>
taskCloseExceptions) {
- try {
- activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(taskId);
- } catch (final RuntimeException e) {
- final String uncleanMessage = String.format("Failed to close task
%s cleanly. Attempting to close remaining tasks before re-throwing:", taskId);
- log.error(uncleanMessage, e);
- taskCloseExceptions.putIfAbsent(taskId, e);
- }
- removeTaskBeforeClosing(taskId);
+ return requiresUpdate;
}
void reInitializeThreadProducer() {
@@ -209,27 +246,18 @@ class Tasks {
activeTaskCreator.closeThreadProducerIfNeeded();
}
- // TODO: change type to `StreamTask`
- void closeAndRemoveTaskProducerIfNeeded(final Task activeTask) {
- activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(activeTask.id());
- }
-
- void removeTaskBeforeClosing(final TaskId taskId) {
- activeTasksPerId.remove(taskId);
+ private void removePartitionsForActiveTask(final TaskId taskId) {
final Set<TopicPartition> toBeRemoved =
activeTasksPerPartition.entrySet().stream()
.filter(e -> e.getValue().id().equals(taskId))
.map(Map.Entry::getKey)
.collect(Collectors.toSet());
toBeRemoved.forEach(activeTasksPerPartition::remove);
- standbyTasksPerId.remove(taskId);
- allTasksPerId.remove(taskId);
}
void clear() {
activeTasksPerId.clear();
- activeTasksPerPartition.clear();
standbyTasksPerId.clear();
- allTasksPerId.clear();
+ activeTasksPerPartition.clear();
}
// TODO: change return type to `StreamTask`
@@ -237,19 +265,23 @@ class Tasks {
return activeTasksPerPartition.get(partition);
}
- // TODO: change return type to `StandbyTask`
- Task standbyTask(final TaskId taskId) {
- if (!standbyTasksPerId.containsKey(taskId)) {
- throw new IllegalStateException("Standby task unknown: " + taskId);
+ private Task getTask(final TaskId taskId) {
+ if (activeTasksPerId.containsKey(taskId)) {
+ return activeTasksPerId.get(taskId);
+ }
+ if (standbyTasksPerId.containsKey(taskId)) {
+ return standbyTasksPerId.get(taskId);
}
- return standbyTasksPerId.get(taskId);
+ return null;
}
Task task(final TaskId taskId) {
- if (!allTasksPerId.containsKey(taskId)) {
+ final Task task = getTask(taskId);
+
+ if (task != null)
+ return task;
+ else
throw new IllegalStateException("Task unknown: " + taskId);
- }
- return allTasksPerId.get(taskId);
}
Collection<Task> tasks(final Collection<TaskId> taskIds) {
@@ -262,51 +294,30 @@ class Tasks {
// TODO: change return type to `StreamTask`
Collection<Task> activeTasks() {
- return readOnlyActiveTasks;
- }
-
- Collection<Task> allTasks() {
- return readOnlyTasks;
- }
-
- Collection<Task> notPausedActiveTasks() {
- return new ArrayList<>(readOnlyActiveTasks)
- .stream()
- .filter(t -> !topologyMetadata.isPaused(t.id().topologyName()))
- .collect(Collectors.toList());
+ return Collections.unmodifiableCollection(activeTasksPerId.values());
}
- Collection<Task> notPausedTasks() {
- return new ArrayList<>(readOnlyTasks)
- .stream()
- .filter(t -> !topologyMetadata.isPaused(t.id().topologyName()))
- .collect(Collectors.toList());
+ /**
+ * All tasks returned by any of the getters are read-only and should NOT
be modified;
+ * and the returned task could be modified by other threads concurrently
+ */
+ Set<Task> allTasks() {
+ return union(HashSet::new, new HashSet<>(activeTasksPerId.values()),
new HashSet<>(standbyTasksPerId.values()));
}
- Set<TaskId> activeTaskIds() {
- return readOnlyActiveTaskIds;
+ Set<TaskId> allTaskIds() {
+ return union(HashSet::new, activeTasksPerId.keySet(),
standbyTasksPerId.keySet());
}
- Set<TaskId> standbyTaskIds() {
- return readOnlyStandbyTaskIds;
- }
-
- // TODO: change return type to `StreamTask`
- Map<TaskId, Task> activeTaskMap() {
- return readOnlyActiveTasksPerId;
- }
-
- // TODO: change return type to `StandbyTask`
- Map<TaskId, Task> standbyTaskMap() {
- return readOnlyStandbyTasksPerId;
- }
-
- Map<TaskId, Task> tasksPerId() {
- return readOnlyTasksPerId;
+ Map<TaskId, Task> allTasksPerId() {
+ final Map<TaskId, Task> ret = new HashMap<>();
+ ret.putAll(activeTasksPerId);
+ ret.putAll(standbyTasksPerId);
+ return ret;
}
boolean owned(final TaskId taskId) {
- return allTasksPerId.containsKey(taskId);
+ return getTask(taskId) != null;
}
StreamsProducer streamsProducerForTask(final TaskId taskId) {
@@ -337,7 +348,7 @@ class Tasks {
successfullyProcessed.add(task);
}
- void removeTaskFromCuccessfullyProcessedBeforeClosing(final Task task) {
+ void removeTaskFromSuccessfullyProcessedBeforeClosing(final Task task) {
successfullyProcessed.remove(task);
}
@@ -345,6 +356,10 @@ class Tasks {
successfullyProcessed.clear();
}
+ double totalProducerBlockedTime() {
+ return activeTaskCreator.totalProducerBlockedTime();
+ }
+
// for testing only
void addTask(final Task task) {
if (task.isActive()) {
@@ -352,6 +367,5 @@ class Tasks {
} else {
standbyTasksPerId.put(task.id(), task);
}
- allTasksPerId.put(task.id(), task);
}
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index a43b0793a2..2c096789f2 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -754,7 +754,6 @@ public class StreamThreadTest {
null,
null,
null,
- null,
topologyMetadata,
null,
null
@@ -839,12 +838,8 @@ public class StreamThreadTest {
final ActiveTaskCreator activeTaskCreator =
mock(ActiveTaskCreator.class);
expect(activeTaskCreator.createTasks(anyObject(),
anyObject())).andStubReturn(Collections.singleton(task));
expect(activeTaskCreator.producerClientIds()).andStubReturn(Collections.singleton("producerClientId"));
-
expect(activeTaskCreator.uncreatedTasksForTopologies(anyObject())).andStubReturn(emptyMap());
- activeTaskCreator.removeRevokedUnknownTasks(singleton(task1));
final StandbyTaskCreator standbyTaskCreator =
mock(StandbyTaskCreator.class);
-
expect(standbyTaskCreator.uncreatedTasksForTopologies(anyObject())).andStubReturn(emptyMap());
- standbyTaskCreator.removeRevokedUnknownTasks(emptySet());
EasyMock.replay(consumer, consumerGroupMetadata, task,
activeTaskCreator, standbyTaskCreator);
@@ -858,7 +853,6 @@ public class StreamThreadTest {
null,
null,
null,
- null,
activeTaskCreator,
standbyTaskCreator,
topologyMetadata,
@@ -1047,19 +1041,11 @@ public class StreamThreadTest {
final StreamThread thread = createStreamThread(CLIENT_ID, new
StreamsConfig(configProps(true)), true);
- thread.start();
- TestUtils.waitForCondition(
- () -> thread.state() == StreamThread.State.STARTING,
- 10 * 1000,
- "Thread never started.");
-
-
thread.rebalanceListener().onPartitionsRevoked(Collections.emptyList());
thread.taskManager().handleRebalanceStart(Collections.singleton(topic1));
+ // assign single partition
final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
final List<TopicPartition> assignedPartitions = new ArrayList<>();
-
- // assign single partition
assignedPartitions.add(t1p1);
assignedPartitions.add(t1p2);
activeTasks.put(task1, Collections.singleton(t1p1));
@@ -1067,11 +1053,18 @@ public class StreamThreadTest {
thread.taskManager().handleAssignment(activeTasks, emptyMap());
+ thread.start();
+ TestUtils.waitForCondition(
+ () -> thread.state() == StreamThread.State.STARTING,
+ 10 * 1000,
+ "Thread never started.");
+
thread.shutdown();
// even if thread is no longer running, it should still be polling
// as long as the rebalance is still ongoing
assertFalse(thread.isRunning());
+ assertTrue(thread.isAlive());
Thread.sleep(1000);
assertEquals(Utils.mkSet(task1, task2),
thread.taskManager().activeTaskIds());
@@ -2657,7 +2650,7 @@ public class StreamThreadTest {
expect(task3.state()).andReturn(Task.State.CREATED).anyTimes();
expect(task3.id()).andReturn(taskId3).anyTimes();
- expect(taskManager.tasks()).andReturn(mkMap(
+ expect(taskManager.allTasks()).andReturn(mkMap(
mkEntry(taskId1, task1),
mkEntry(taskId2, task2),
mkEntry(taskId3, task3)
@@ -2922,7 +2915,7 @@ public class StreamThreadTest {
expect(runningTask.state()).andReturn(Task.State.RUNNING).anyTimes();
expect(runningTask.id()).andReturn(taskId).anyTimes();
- expect(taskManager.tasks())
+ expect(taskManager.allTasks())
.andReturn(Collections.singletonMap(taskId,
runningTask)).anyTimes();
expect(taskManager.commit(Collections.singleton(runningTask))).andReturn(1).anyTimes();
taskManager.maybePurgeCommittedRecords();
@@ -2940,7 +2933,7 @@ public class StreamThreadTest {
expect(runningTask.state()).andReturn(Task.State.RUNNING).anyTimes();
expect(runningTask.id()).andReturn(taskId).anyTimes();
- expect(taskManager.tasks())
+ expect(taskManager.allTasks())
.andReturn(Collections.singletonMap(taskId,
runningTask)).times(numberOfCommits);
expect(taskManager.commit(Collections.singleton(runningTask))).andReturn(commits).times(numberOfCommits);
EasyMock.replay(taskManager, runningTask);
@@ -2997,7 +2990,7 @@ public class StreamThreadTest {
}
StreamTask activeTask(final TaskManager taskManager, final TopicPartition
partition) {
- final Stream<Task> standbys =
taskManager.tasks().values().stream().filter(t -> t.isActive());
+ final Stream<Task> standbys =
taskManager.allTasks().values().stream().filter(Task::isActive);
for (final Task task : (Iterable<Task>) standbys::iterator) {
if (task.inputPartitions().contains(partition)) {
return (StreamTask) task;
@@ -3006,7 +2999,7 @@ public class StreamThreadTest {
return null;
}
StandbyTask standbyTask(final TaskManager taskManager, final
TopicPartition partition) {
- final Stream<Task> standbys =
taskManager.tasks().values().stream().filter(t -> !t.isActive());
+ final Stream<Task> standbys =
taskManager.allTasks().values().stream().filter(t -> !t.isActive());
for (final Task task : (Iterable<Task>) standbys::iterator) {
if (task.inputPartitions().contains(partition)) {
return (StandbyTask) task;
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskExecutorTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskExecutorTest.java
index a44970238a..88ee70e57e 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskExecutorTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskExecutorTest.java
@@ -22,7 +22,6 @@ import
org.apache.kafka.streams.internals.StreamsConfigUtils.ProcessingMode;
import org.junit.Test;
import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
public class TaskExecutorTest {
@@ -35,7 +34,6 @@ public class TaskExecutorTest {
new TaskExecutor(tasks, metadata, ProcessingMode.AT_LEAST_ONCE,
false, new LogContext());
taskExecutor.punctuate();
- verify(tasks).notPausedActiveTasks();
- verify(tasks, never()).notPausedTasks();
+ verify(tasks).activeTasks();
}
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
index 073dede23f..b3ffb29b1a 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
@@ -33,10 +33,9 @@ import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.Measurable;
-import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.TopologyConfig;
import org.apache.kafka.streams.errors.LockException;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TaskCorruptedException;
@@ -47,7 +46,6 @@ import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId;
import
org.apache.kafka.streams.processor.internals.StateDirectory.TaskDirectory;
import org.apache.kafka.streams.processor.internals.Task.State;
-import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import
org.apache.kafka.streams.processor.internals.testutil.DummyStreamsConfig;
import
org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
@@ -93,6 +91,7 @@ import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.common.utils.Utils.mkSet;
import static org.apache.kafka.common.utils.Utils.union;
+import static
org.apache.kafka.streams.processor.internals.TopologyMetadata.UNNAMED_TOPOLOGY;
import static org.easymock.EasyMock.anyObject;
import static org.easymock.EasyMock.anyString;
import static org.easymock.EasyMock.eq;
@@ -172,6 +171,7 @@ public class TaskManagerTest {
private Admin adminClient;
private TaskManager taskManager;
+ private TopologyMetadata topologyMetadata;
private final Time time = new MockTime();
@Rule
@@ -183,25 +183,22 @@ public class TaskManagerTest {
}
private void setUpTaskManager(final StreamsConfigUtils.ProcessingMode
processingMode) {
+ topologyMetadata = new TopologyMetadata(topologyBuilder, new
DummyStreamsConfig(processingMode));
taskManager = new TaskManager(
time,
changeLogReader,
UUID.randomUUID(),
"taskManagerTest",
- new StreamsMetricsImpl(new Metrics(), "clientId",
StreamsConfig.METRICS_LATEST, time),
activeTaskCreator,
standbyTaskCreator,
- new TopologyMetadata(topologyBuilder, new
DummyStreamsConfig(processingMode)),
+ topologyMetadata,
adminClient,
stateDirectory
);
taskManager.setMainConsumer(consumer);
reset(topologyBuilder);
expect(topologyBuilder.hasNamedTopology()).andStubReturn(false);
- activeTaskCreator.removeRevokedUnknownTasks(anyObject());
- expectLastCall().asStub();
- standbyTaskCreator.removeRevokedUnknownTasks(anyObject());
- expectLastCall().asStub();
+ expect(topologyBuilder.nodeToSourceTopics()).andStubReturn(emptyMap());
}
@Test
@@ -1192,7 +1189,9 @@ public class TaskManagerTest {
expectRestoreToBeCompleted(consumer, changeLogReader);
expect(activeTaskCreator.createTasks(anyObject(),
eq(taskId00Assignment))).andReturn(singletonList(task00));
expect(standbyTaskCreator.createTasks(eq(taskId01Assignment))).andReturn(singletonList(task01));
- replay(activeTaskCreator, standbyTaskCreator, consumer,
changeLogReader);
+ topologyBuilder.addSubscribedTopicsFromAssignment(eq(asList(t1p0)),
anyString());
+ expectLastCall().anyTimes();
+ replay(activeTaskCreator, standbyTaskCreator, consumer,
changeLogReader, topologyBuilder);
taskManager.handleAssignment(taskId00Assignment, taskId01Assignment);
assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(),
null), is(true));
@@ -1518,8 +1517,10 @@ public class TaskManagerTest {
expect(activeTaskCreator.createTasks(anyObject(),
eq(assignmentActive))).andReturn(singleton(task00));
expect(standbyTaskCreator.createTasks(eq(assignmentStandby))).andReturn(singletonList(task10));
+ topologyBuilder.addSubscribedTopicsFromAssignment(eq(asList(t1p0)),
anyString());
+ expectLastCall().anyTimes();
- replay(activeTaskCreator, standbyTaskCreator, consumer,
changeLogReader);
+ replay(activeTaskCreator, standbyTaskCreator, consumer,
changeLogReader, topologyBuilder);
taskManager.handleAssignment(assignmentActive, assignmentStandby);
assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(),
null), is(true));
@@ -1665,14 +1666,8 @@ public class TaskManagerTest {
expect(changeLogReader.completedChangelogs()).andReturn(emptySet());
expect(activeTaskCreator.createTasks(anyObject(), eq(assignment)))
.andStubReturn(asList(task00, task01, task02, task03));
- activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(eq(taskId00));
- expectLastCall();
- activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(eq(taskId01));
- expectLastCall();
- activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(eq(taskId02));
- expectLastCall();
- activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(eq(taskId03));
- expectLastCall();
+ activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(anyObject());
+ expectLastCall().times(4);
activeTaskCreator.closeThreadProducerIfNeeded();
expectLastCall();
expect(standbyTaskCreator.createTasks(eq(emptyMap()))).andStubReturn(emptyList());
@@ -1857,7 +1852,7 @@ public class TaskManagerTest {
assertThat(task01.state(), is(Task.State.CLOSED));
// All the tasks involving in the commit should already be removed.
- assertThat(taskManager.tasks(), is(Collections.singletonMap(taskId00,
task00)));
+ assertThat(taskManager.allTasks(),
is(Collections.singletonMap(taskId00, task00)));
}
@Test
@@ -1923,12 +1918,8 @@ public class TaskManagerTest {
resetToStrict(changeLogReader);
expect(changeLogReader.completedChangelogs()).andReturn(emptySet());
expect(activeTaskCreator.createTasks(anyObject(),
eq(assignment))).andStubReturn(asList(task00, task01, task02));
- activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(eq(taskId00));
- expectLastCall().andThrow(new RuntimeException("whatever 0"));
- activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(eq(taskId01));
- expectLastCall().andThrow(new RuntimeException("whatever 1"));
- activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(eq(taskId02));
- expectLastCall().andThrow(new RuntimeException("whatever 2"));
+ activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(anyObject());
+ expectLastCall().andThrow(new RuntimeException("whatever")).times(3);
activeTaskCreator.closeThreadProducerIfNeeded();
expectLastCall().andThrow(new RuntimeException("whatever all"));
expect(standbyTaskCreator.createTasks(eq(emptyMap()))).andStubReturn(emptyList());
@@ -3204,6 +3195,8 @@ public class TaskManagerTest {
.andReturn(singletonList(activeTask));
activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(taskId00);
expectLastCall().anyTimes();
+ activeTask.closeCleanAndRecycleState();
+ expectLastCall().once();
expect(standbyTaskCreator.createStandbyTaskFromActive(anyObject(),
eq(taskId00Partitions)))
.andReturn(standbyTask);
@@ -3226,6 +3219,8 @@ public class TaskManagerTest {
expectLastCall().anyTimes();
standbyTask.postCommit(true);
expectLastCall().anyTimes();
+ standbyTask.closeCleanAndRecycleState();
+ expectLastCall().once();
final StreamTask activeTask = mock(StreamTask.class);
expect(activeTask.id()).andStubReturn(taskId00);
@@ -3245,6 +3240,17 @@ public class TaskManagerTest {
verify(standbyTaskCreator, activeTaskCreator);
}
+ @Test
+ public void shouldListNotPausedTasks() {
+ handleAssignment(taskId00Assignment, taskId01Assignment, emptyMap());
+
+ assertEquals(taskManager.notPausedTasks().size(), 2);
+
+ topologyMetadata.pauseTopology(UNNAMED_TOPOLOGY);
+
+ assertEquals(taskManager.notPausedTasks().size(), 0);
+ }
+
private static void expectRestoreToBeCompleted(final Consumer<byte[],
byte[]> consumer,
final ChangelogReader
changeLogReader) {
expectRestoreToBeCompleted(consumer, changeLogReader, true);
@@ -3315,7 +3321,7 @@ public class TaskManagerTest {
final Set<TopicPartition> partitions,
final boolean active,
final ProcessorStateManager processorStateManager) {
- super(id, null, null, processorStateManager, partitions, 0L,
"test-task", StateMachineTask.class);
+ super(id, null, null, processorStateManager, partitions, (new
TopologyConfig(new DummyStreamsConfig())).getTaskConfig(), "test-task",
StateMachineTask.class);
this.active = active;
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TasksTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TasksTest.java
deleted file mode 100644
index ad701f8ca4..0000000000
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TasksTest.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.processor.internals;
-
-import org.apache.kafka.common.utils.LogContext;
-import org.apache.kafka.streams.processor.TaskId;
-import org.junit.Assert;
-import org.junit.Test;
-
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-public class TasksTest {
- @Test
- public void testNotPausedTasks() {
- final TopologyMetadata topologyMetadata = mock(TopologyMetadata.class);
- final String unnamedTopologyName = null;
- when(topologyMetadata.isPaused(unnamedTopologyName))
- .thenReturn(false)
- .thenReturn(false).thenReturn(false)
- .thenReturn(true)
- .thenReturn(true).thenReturn(true);
-
- final Tasks tasks = new Tasks(
- new LogContext(),
- topologyMetadata,
- mock(ActiveTaskCreator.class),
- mock(StandbyTaskCreator.class)
- );
-
- final TaskId taskId1 = new TaskId(0, 1);
- final TaskId taskId2 = new TaskId(0, 2);
-
- final StreamTask streamTask = mock(StreamTask.class);
- when(streamTask.isActive()).thenReturn(true);
- when(streamTask.id()).thenReturn(taskId1);
-
- final StandbyTask standbyTask1 = mock(StandbyTask.class);
- when(standbyTask1.isActive()).thenReturn(false);
- when(standbyTask1.id()).thenReturn(taskId2);
-
- tasks.addTask(streamTask);
- tasks.addTask(standbyTask1);
- Assert.assertEquals(tasks.notPausedActiveTasks().size(), 1);
- Assert.assertEquals(tasks.notPausedTasks().size(), 2);
-
- Assert.assertEquals(tasks.notPausedActiveTasks().size(), 0);
- Assert.assertEquals(tasks.notPausedTasks().size(), 0);
- }
-}