This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 8d186bfb4fb KAFKA-16331: Remove task producers from Kafka Streams
(#17344)
8d186bfb4fb is described below
commit 8d186bfb4fbffcc804476a170ad58361ec3b6d28
Author: Matthias J. Sax <[email protected]>
AuthorDate: Tue Oct 8 15:36:05 2024 -0700
KAFKA-16331: Remove task producers from Kafka Streams (#17344)
With the removal of EOSv1, we no longer have a producer per task,
and thus can remove the corresponding code that handles task producers.
Reviewers: Chia-Ping Tsai <[email protected]>, Bill Bejeck
<[email protected]>
---
.../processor/internals/ActiveTaskCreator.java | 52 ++-----
.../streams/processor/internals/StreamThread.java | 2 +-
.../streams/processor/internals/TaskExecutor.java | 25 +---
.../streams/processor/internals/TaskManager.java | 28 +---
.../processor/internals/ActiveTaskCreatorTest.java | 82 +++--------
.../processor/internals/TaskExecutorTest.java | 2 +-
.../processor/internals/TaskManagerTest.java | 154 ++++-----------------
7 files changed, 62 insertions(+), 283 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java
index 1bbb4b921ba..16c864576dc 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java
@@ -44,7 +44,6 @@ import java.util.Map;
import java.util.Set;
import java.util.UUID;
-import static
org.apache.kafka.streams.internals.StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_ALPHA;
import static org.apache.kafka.streams.internals.StreamsConfigUtils.eosEnabled;
import static
org.apache.kafka.streams.internals.StreamsConfigUtils.processingMode;
import static
org.apache.kafka.streams.processor.internals.ClientUtils.producerClientId;
@@ -63,9 +62,7 @@ class ActiveTaskCreator {
private final UUID processId;
private final Logger log;
private final Sensor createTaskSensor;
- private final StreamsProducer threadProducer;
- // TODO remove `taskProducers`
- private final Map<TaskId, StreamsProducer> taskProducers =
Collections.emptyMap();
+ private final StreamsProducer streamsProducer;
private final ProcessingMode processingMode;
private final boolean stateUpdaterEnabled;
private final boolean processingThreadsEnabled;
@@ -105,7 +102,7 @@ class ActiveTaskCreator {
final String threadIdPrefix = String.format("stream-thread [%s] ",
Thread.currentThread().getName());
final LogContext logContext = new LogContext(threadIdPrefix);
- threadProducer = new StreamsProducer(
+ streamsProducer = new StreamsProducer(
processingMode,
producer(),
logContext,
@@ -124,27 +121,12 @@ class ActiveTaskCreator {
return clientSupplier.getProducer(producerConfig);
}
- public void reInitializeThreadProducer() {
- threadProducer.resetProducer(producer());
+ public void reInitializeProducer() {
+ streamsProducer.resetProducer(producer());
}
- StreamsProducer streamsProducerForTask(final TaskId taskId) {
- if (processingMode != EXACTLY_ONCE_ALPHA) {
- throw new IllegalStateException("Expected EXACTLY_ONCE to be
enabled, but the processing mode was " + processingMode);
- }
-
- final StreamsProducer taskProducer = taskProducers.get(taskId);
- if (taskProducer == null) {
- throw new IllegalStateException("Unknown TaskId: " + taskId);
- }
- return taskProducer;
- }
-
- StreamsProducer threadProducer() {
- if (processingMode == EXACTLY_ONCE_ALPHA) {
- throw new IllegalStateException("Expected AT_LEAST_ONCE or
EXACTLY_ONCE_V2 to be enabled, but the processing mode was " + processingMode);
- }
- return threadProducer;
+ StreamsProducer streamsProducer() {
+ return streamsProducer;
}
// TODO: convert to StreamTask when we remove TaskManager#StateMachineTask
with mocks
@@ -198,7 +180,7 @@ class ActiveTaskCreator {
return new RecordCollectorImpl(
logContext,
taskId,
- this.threadProducer,
+ streamsProducer,
applicationConfig.productionExceptionHandler(),
streamsMetrics,
topology
@@ -274,28 +256,16 @@ class ActiveTaskCreator {
return task;
}
- // TODO: rename and revisit test
- void closeThreadProducerIfNeeded() {
+ void close() {
try {
- threadProducer.close();
+ streamsProducer.close();
} catch (final RuntimeException e) {
throw new StreamsException("Thread producer encounter error trying
to close.", e);
}
}
- void closeAndRemoveTaskProducerIfNeeded(final TaskId id) {
- final StreamsProducer taskProducer = taskProducers.remove(id);
- if (taskProducer != null) {
- try {
- taskProducer.close();
- } catch (final RuntimeException e) {
- throw new StreamsException("[" + id + "] task producer
encounter error trying to close.", e, id);
- }
- }
- }
-
Map<MetricName, Metric> producerMetrics() {
- return
ClientUtils.producerMetrics(Collections.singleton(threadProducer));
+ return
ClientUtils.producerMetrics(Collections.singleton(streamsProducer));
}
String producerClientIds() {
@@ -309,6 +279,6 @@ class ActiveTaskCreator {
}
public double totalProducerBlockedTime() {
- return threadProducer.totalBlockedTime();
+ return streamsProducer.totalBlockedTime();
}
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 5c58fce1e99..af008a6dbc0 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -812,7 +812,7 @@ public class StreamThread extends Thread implements
ProcessingThread {
if (fetchDeadlineClientInstanceId >= time.milliseconds()) {
try {
threadProducerInstanceIdFuture.complete(
-
taskManager.threadProducer().kafkaProducer().clientInstanceId(Duration.ZERO)
+
taskManager.streamsProducer().kafkaProducer().clientInstanceId(Duration.ZERO)
);
} catch (final IllegalStateException disabledError) {
// if telemetry is disabled on a client, we swallow
the error,
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutor.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutor.java
index e2145a94121..c993787503e 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutor.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutor.java
@@ -38,8 +38,6 @@ import java.util.Map.Entry;
import java.util.Set;
import java.util.stream.Collectors;
-import static java.util.Collections.emptyMap;
-import static
org.apache.kafka.streams.internals.StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_ALPHA;
import static
org.apache.kafka.streams.internals.StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_V2;
/**
@@ -177,30 +175,13 @@ public class TaskExecutor {
final Set<TaskId> corruptedTasks = new HashSet<>();
- if (executionMetadata.processingMode() == EXACTLY_ONCE_ALPHA) {
- for (final Task task : taskManager.activeRunningTaskIterable()) {
- final Map<TopicPartition, OffsetAndMetadata>
taskOffsetsToCommit = offsetsPerTask.getOrDefault(task, emptyMap());
- if (!taskOffsetsToCommit.isEmpty() ||
taskManager.streamsProducerForTask(task.id()).transactionInFlight()) {
- try {
- taskManager.streamsProducerForTask(task.id())
- .commitTransaction(taskOffsetsToCommit,
taskManager.consumerGroupMetadata());
- updateTaskCommitMetadata(taskOffsetsToCommit);
- } catch (final TimeoutException timeoutException) {
- log.error(
- String.format("Committing task %s failed.",
task.id()),
- timeoutException
- );
- corruptedTasks.add(task.id());
- }
- }
- }
- } else if (executionMetadata.processingMode() == EXACTLY_ONCE_V2) {
- if (!offsetsPerTask.isEmpty() ||
taskManager.threadProducer().transactionInFlight()) {
+ if (executionMetadata.processingMode() == EXACTLY_ONCE_V2) {
+ if (!offsetsPerTask.isEmpty() ||
taskManager.streamsProducer().transactionInFlight()) {
final Map<TopicPartition, OffsetAndMetadata> allOffsets =
offsetsPerTask.values().stream()
.flatMap(e ->
e.entrySet().stream()).collect(Collectors.toMap(Map.Entry::getKey,
Map.Entry::getValue));
try {
- taskManager.threadProducer().commitTransaction(allOffsets,
taskManager.consumerGroupMetadata());
+
taskManager.streamsProducer().commitTransaction(allOffsets,
taskManager.consumerGroupMetadata());
updateTaskCommitMetadata(allOffsets);
} catch (final TimeoutException timeoutException) {
log.error(
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index 3ab8a42910e..0d222b61600 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -105,7 +105,7 @@ public class TaskManager {
// includes assigned & initialized tasks and unassigned tasks we locked
temporarily during rebalance
private final Set<TaskId> lockedTaskDirectories = new HashSet<>();
- private Map<TaskId, BackoffRecord> taskIdToBackoffRecord = new HashMap<>();
+ private final Map<TaskId, BackoffRecord> taskIdToBackoffRecord = new
HashMap<>();
private final ActiveTaskCreator activeTaskCreator;
private final StandbyTaskCreator standbyTaskCreator;
@@ -173,12 +173,8 @@ public class TaskManager {
mainConsumer.commitSync(offsets);
}
- StreamsProducer streamsProducerForTask(final TaskId taskId) {
- return activeTaskCreator.streamsProducerForTask(taskId);
- }
-
- StreamsProducer threadProducer() {
- return activeTaskCreator.threadProducer();
+ StreamsProducer streamsProducer() {
+ return activeTaskCreator.streamsProducer();
}
boolean rebalanceInProgress() {
@@ -829,9 +825,7 @@ public class TaskManager {
}
private StandbyTask convertActiveToStandby(final StreamTask activeTask,
final Set<TopicPartition> partitions) {
- final StandbyTask standbyTask =
standbyTaskCreator.createStandbyTaskFromActive(activeTask, partitions);
- activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(activeTask.id());
- return standbyTask;
+ return standbyTaskCreator.createStandbyTaskFromActive(activeTask,
partitions);
}
private StreamTask convertStandbyToActive(final StandbyTask standbyTask,
final Set<TopicPartition> partitions) {
@@ -956,9 +950,6 @@ public class TaskManager {
try {
task.suspend();
task.closeClean();
- if (task.isActive()) {
-
activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(task.id());
- }
} catch (final RuntimeException e) {
final String uncleanMessage = String.format("Failed to close task
%s cleanly. " +
"Attempting to close remaining tasks before re-throwing:",
task.id());
@@ -1228,7 +1219,7 @@ public class TaskManager {
removeLostActiveTasksFromStateUpdaterAndPendingTasksToInit();
if (processingMode == EXACTLY_ONCE_V2) {
- activeTaskCreator.reInitializeThreadProducer();
+ activeTaskCreator.reInitializeProducer();
}
}
@@ -1433,10 +1424,6 @@ public class TaskManager {
if (removeFromTasksRegistry) {
tasks.removeTask(task);
}
-
- if (task.isActive()) {
-
activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(task.id());
- }
} catch (final RuntimeException swallow) {
log.error("Error removing dirty task {}: {}", task.id(),
swallow.getMessage());
}
@@ -1445,9 +1432,6 @@ public class TaskManager {
private void closeTaskClean(final Task task) {
task.closeClean();
tasks.removeTask(task);
- if (task.isActive()) {
- activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(task.id());
- }
}
void shutdown(final boolean clean) {
@@ -1469,7 +1453,7 @@ public class TaskManager {
executeAndMaybeSwallow(
clean,
- activeTaskCreator::closeThreadProducerIfNeeded,
+ activeTaskCreator::close,
e -> firstException.compareAndSet(null, e),
e -> log.warn("Ignoring an exception while closing thread
producer.", e)
);
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreatorTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreatorTest.java
index e2d5bf74687..04371509fc1 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreatorTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreatorTest.java
@@ -87,7 +87,7 @@ public class ActiveTaskCreatorTest {
@Test
public void shouldConstructProducerMetricsWithEosDisabled() {
- shouldConstructThreadProducerMetric();
+ shouldConstructStreamsProducerMetric();
}
@Test
@@ -100,26 +100,16 @@ public class ActiveTaskCreatorTest {
}
@Test
- public void shouldCloseThreadProducerIfEosDisabled() {
+ public void shouldCloseIfEosDisabled() {
createTasks();
- activeTaskCreator.closeThreadProducerIfNeeded();
+ activeTaskCreator.close();
assertThat(mockClientSupplier.producers.get(0).closed(), is(true));
}
@Test
- public void shouldNoOpCloseTaskProducerIfEosDisabled() {
- createTasks();
-
- activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(new TaskId(0, 0));
- activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(new TaskId(0, 1));
-
- assertThat(mockClientSupplier.producers.get(0).closed(), is(false));
- }
-
- @Test
- public void shouldReturnBlockedTimeWhenThreadProducer() {
+ public void shouldReturnBlockedTimeWhenStreamsProducer() {
final double blockedTime = 123.0;
createTasks();
final MockProducer<?, ?> producer =
mockClientSupplier.producers.get(0);
@@ -131,35 +121,23 @@ public class ActiveTaskCreatorTest {
// error handling
@Test
- public void shouldFailOnStreamsProducerPerTaskIfEosDisabled() {
- createTasks();
-
- final IllegalStateException thrown = assertThrows(
- IllegalStateException.class,
- () -> activeTaskCreator.streamsProducerForTask(null)
- );
-
- assertThat(thrown.getMessage(), is("Expected EXACTLY_ONCE to be
enabled, but the processing mode was AT_LEAST_ONCE"));
- }
-
- @Test
- public void shouldReturnThreadProducerIfAtLeastOnceIsEnabled() {
+ public void shouldReturnStreamsProducerIfAtLeastOnceIsEnabled() {
createTasks();
- final StreamsProducer threadProducer =
activeTaskCreator.threadProducer();
+ final StreamsProducer threadProducer =
activeTaskCreator.streamsProducer();
assertThat(mockClientSupplier.producers.size(), is(1));
assertThat(threadProducer.kafkaProducer(),
is(mockClientSupplier.producers.get(0)));
}
@Test
- public void
shouldThrowStreamsExceptionOnErrorCloseThreadProducerIfEosDisabled() {
+ public void shouldThrowStreamsExceptionOnErrorCloseIfEosDisabled() {
createTasks();
mockClientSupplier.producers.get(0).closeException = new
RuntimeException("KABOOM!");
final StreamsException thrown = assertThrows(
StreamsException.class,
- activeTaskCreator::closeThreadProducerIfNeeded
+ activeTaskCreator::close
);
assertThat(thrown.getMessage(), is("Thread producer encounter error
trying to close."));
@@ -173,13 +151,13 @@ public class ActiveTaskCreatorTest {
// functional test
@Test
- public void shouldReturnThreadProducerIfEosV2Enabled() {
+ public void shouldReturnStreamsProducerIfEosV2Enabled() {
properties.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG,
StreamsConfig.EXACTLY_ONCE_V2);
mockClientSupplier.setApplicationIdForProducer("appId");
createTasks();
- final StreamsProducer threadProducer =
activeTaskCreator.threadProducer();
+ final StreamsProducer threadProducer =
activeTaskCreator.streamsProducer();
assertThat(mockClientSupplier.producers.size(), is(1));
assertThat(threadProducer.kafkaProducer(),
is(mockClientSupplier.producers.get(0)));
@@ -190,7 +168,7 @@ public class ActiveTaskCreatorTest {
properties.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG,
StreamsConfig.EXACTLY_ONCE_V2);
mockClientSupplier.setApplicationIdForProducer("appId");
- shouldConstructThreadProducerMetric();
+ shouldConstructStreamsProducerMetric();
}
@Test
@@ -205,48 +183,20 @@ public class ActiveTaskCreatorTest {
}
@Test
- public void shouldCloseThreadProducerIfEosV2Enabled() {
+ public void shouldCloseIfEosV2Enabled() {
properties.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG,
StreamsConfig.EXACTLY_ONCE_V2);
mockClientSupplier.setApplicationIdForProducer("appId");
createTasks();
- activeTaskCreator.closeThreadProducerIfNeeded();
+ activeTaskCreator.close();
assertThat(mockClientSupplier.producers.get(0).closed(), is(true));
}
- @Test
- public void shouldNoOpCloseTaskProducerIfEosV2Enabled() {
- properties.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG,
StreamsConfig.EXACTLY_ONCE_V2);
- mockClientSupplier.setApplicationIdForProducer("appId");
-
- createTasks();
-
- activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(new TaskId(0, 0));
- activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(new TaskId(0, 1));
-
- assertThat(mockClientSupplier.producers.get(0).closed(), is(false));
- }
-
// error handling
@Test
- public void shouldFailOnStreamsProducerPerTaskIfEosV2Enabled() {
- properties.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG,
StreamsConfig.EXACTLY_ONCE_V2);
- mockClientSupplier.setApplicationIdForProducer("appId");
-
- createTasks();
-
- final IllegalStateException thrown = assertThrows(
- IllegalStateException.class,
- () -> activeTaskCreator.streamsProducerForTask(null)
- );
-
- assertThat(thrown.getMessage(), is("Expected EXACTLY_ONCE to be
enabled, but the processing mode was EXACTLY_ONCE_V2"));
- }
-
- @Test
- public void
shouldThrowStreamsExceptionOnErrorCloseThreadProducerIfEosV2Enabled() {
+ public void shouldThrowStreamsExceptionOnErrorCloseIfEosV2Enabled() {
properties.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG,
StreamsConfig.EXACTLY_ONCE_V2);
mockClientSupplier.setApplicationIdForProducer("appId");
createTasks();
@@ -254,14 +204,14 @@ public class ActiveTaskCreatorTest {
final StreamsException thrown = assertThrows(
StreamsException.class,
- activeTaskCreator::closeThreadProducerIfNeeded
+ activeTaskCreator::close
);
assertThat(thrown.getMessage(), is("Thread producer encounter error
trying to close."));
assertThat(thrown.getCause().getMessage(), is("KABOOM!"));
}
- private void shouldConstructThreadProducerMetric() {
+ private void shouldConstructStreamsProducerMetric() {
createTasks();
final MetricName testMetricName = new MetricName("test_metric", "",
"", new HashMap<>());
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskExecutorTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskExecutorTest.java
index 18648277743..8d9ef70c5e3 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskExecutorTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskExecutorTest.java
@@ -51,7 +51,7 @@ public class TaskExecutorTest {
final TaskExecutionMetadata metadata =
mock(TaskExecutionMetadata.class);
final StreamsProducer producer = mock(StreamsProducer.class);
when(metadata.processingMode()).thenReturn(EXACTLY_ONCE_V2);
- when(taskManager.threadProducer()).thenReturn(producer);
+ when(taskManager.streamsProducer()).thenReturn(producer);
when(producer.transactionInFlight()).thenReturn(true);
final TaskExecutor taskExecutor = new TaskExecutor(tasks, taskManager,
metadata, new LogContext());
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
index fa5dca53bb1..6ee45c56d69 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
@@ -442,7 +442,6 @@ public class TaskManagerTest {
verify(activeTaskToClose).suspend();
verify(activeTaskToClose).closeClean();
-
verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(activeTaskToClose.id());
verify(activeTaskCreator).createTasks(consumer,
Collections.emptyMap());
verify(standbyTaskCreator).createTasks(Collections.emptyMap());
}
@@ -464,7 +463,6 @@ public class TaskManagerTest {
verify(activeTaskToClose).prepareCommit();
verify(activeTaskToClose).suspend();
verify(activeTaskToClose).closeDirty();
-
verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(activeTaskToClose.id());
verify(activeTaskCreator).createTasks(consumer,
Collections.emptyMap());
verify(standbyTaskCreator).createTasks(Collections.emptyMap());
}
@@ -898,7 +896,6 @@ public class TaskManagerTest {
verify(tasks).addPendingTasksToInit(Collections.singleton(recycledActiveTask));
verify(activeTaskToClose).suspend();
verify(activeTaskToClose).closeClean();
-
verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(activeTaskToClose.id());
verify(standbyTaskCreator).createTasks(Collections.emptyMap());
verify(activeTaskCreator).createTasks(consumer,
Collections.emptyMap());
}
@@ -985,7 +982,7 @@ public class TaskManagerTest {
}
@Test
- public void
shouldAddRecycledStandbyTaskfromActiveToPendingTasksToInitWithStateUpdaterEnabled()
{
+ public void
shouldAddRecycledStandbyTasksFromActiveToPendingTasksToInitWithStateUpdaterEnabled()
{
final StreamTask activeTaskToRecycle = statefulTask(taskId01,
taskId01ChangelogPartitions)
.withInputPartitions(taskId01Partitions)
.inState(State.RUNNING).build();
@@ -1001,7 +998,6 @@ public class TaskManagerTest {
taskManager.handleAssignment(emptyMap(), mkMap(mkEntry(taskId01,
taskId01Partitions)));
verify(activeTaskToRecycle).prepareCommit();
-
verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(activeTaskToRecycle.id());
verify(tasks).addPendingTasksToInit(mkSet(standbyTask));
verify(tasks).removeTask(activeTaskToRecycle);
verify(activeTaskCreator).createTasks(consumer,
Collections.emptyMap());
@@ -1009,7 +1005,7 @@ public class TaskManagerTest {
}
@Test
- public void
shouldAddRecycledStandbyTaskfromActiveToTaskRegistryWithStateUpdaterDisabled() {
+ public void
shouldAddRecycledStandbyTasksFromActiveToTaskRegistryWithStateUpdaterDisabled()
{
final StreamTask activeTaskToRecycle = statefulTask(taskId01,
taskId01ChangelogPartitions)
.withInputPartitions(taskId01Partitions)
.inState(State.RUNNING).build();
@@ -1025,7 +1021,6 @@ public class TaskManagerTest {
taskManager.handleAssignment(emptyMap(), mkMap(mkEntry(taskId01,
taskId01Partitions)));
verify(activeTaskToRecycle).prepareCommit();
-
verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(activeTaskToRecycle.id());
verify(tasks).replaceActiveWithStandby(standbyTask);
verify(activeTaskCreator).createTasks(consumer,
Collections.emptyMap());
verify(standbyTaskCreator).createTasks(Collections.emptyMap());
@@ -1065,7 +1060,6 @@ public class TaskManagerTest {
taskManager.handleAssignment(Collections.emptyMap(),
Collections.emptyMap());
verify(activeTaskCreator).createTasks(consumer,
Collections.emptyMap());
-
verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(activeTaskToClose.id());
verify(activeTaskToClose).prepareCommit();
verify(activeTaskToClose).closeClean();
verify(tasks).removeTask(activeTaskToClose);
@@ -2129,7 +2123,6 @@ public class TaskManagerTest {
assertThat(task00.state(), is(Task.State.CLOSED));
assertThat(taskManager.activeTaskMap(), Matchers.anEmptyMap());
assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap());
- verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(taskId00);
}
@Test
@@ -2157,7 +2150,6 @@ public class TaskManagerTest {
is("Encounter unexpected fatal error for task 0_0")
);
assertThat(thrown.getCause().getMessage(), is("KABOOM!"));
- verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(taskId00);
}
@Test
@@ -2203,48 +2195,15 @@ public class TaskManagerTest {
taskManager.handleRebalanceStart(emptySet());
assertThat(taskManager.lockedTaskDirectories(), is(emptySet()));
- verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(taskId00);
}
@Test
- public void
shouldReInitializeThreadProducerOnHandleLostAllIfEosV2Enabled() {
+ public void
shouldReInitializeStreamsProducerOnHandleLostAllIfEosV2Enabled() {
final TaskManager taskManager =
setUpTaskManager(ProcessingMode.EXACTLY_ONCE_V2, false);
taskManager.handleLostAll();
- verify(activeTaskCreator).reInitializeThreadProducer();
- }
-
- @Test
- public void shouldThrowWhenHandlingClosingTasksOnProducerCloseError() {
- final StateMachineTask task00 = new StateMachineTask(taskId00,
taskId00Partitions, true, stateManager);
- final Map<TopicPartition, OffsetAndMetadata> offsets =
singletonMap(t1p0, new OffsetAndMetadata(0L, null));
- task00.setCommittableOffsetsAndMetadata(offsets);
-
- // `handleAssignment`
- when(consumer.assignment()).thenReturn(assignment);
- when(activeTaskCreator.createTasks(any(),
eq(taskId00Assignment))).thenReturn(singletonList(task00));
-
- // `handleAssignment`
- doThrow(new
RuntimeException("KABOOM!")).when(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(taskId00);
-
- taskManager.handleAssignment(taskId00Assignment, emptyMap());
- assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(),
null), is(true));
- assertThat(task00.state(), is(Task.State.RUNNING));
-
- taskManager.handleRevocation(taskId00Partitions);
-
- final RuntimeException thrown = assertThrows(
- RuntimeException.class,
- () -> taskManager.handleAssignment(emptyMap(), emptyMap())
- );
-
- assertThat(
- thrown.getMessage(),
- is("Encounter unexpected fatal error for task 0_0")
- );
- assertThat(thrown.getCause(), instanceOf(RuntimeException.class));
- assertThat(thrown.getCause().getMessage(), is("KABOOM!"));
+ verify(activeTaskCreator).reInitializeProducer();
}
@Test
@@ -2552,7 +2511,7 @@ public class TaskManagerTest {
}
@Test
- public void
shouldCloseAndReviveUncorruptedTasksWhenTimeoutExceptionThrownFromCommitWithALOS()
{
+ public void
shouldCloseAndReviveUncorruptedTasksWhenTimeoutExceptionThrownFromCommitWithAlos()
{
final ProcessorStateManager stateManager =
mock(ProcessorStateManager.class);
final StateMachineTask corruptedActive = new
StateMachineTask(taskId00, taskId00Partitions, true, stateManager);
@@ -2613,7 +2572,7 @@ public class TaskManagerTest {
public void
shouldCloseAndReviveUncorruptedTasksWhenTimeoutExceptionThrownFromCommitDuringHandleCorruptedWithEOS()
{
final TaskManager taskManager =
setUpTaskManager(ProcessingMode.EXACTLY_ONCE_V2, false);
final StreamsProducer producer = mock(StreamsProducer.class);
- when(activeTaskCreator.threadProducer()).thenReturn(producer);
+ when(activeTaskCreator.streamsProducer()).thenReturn(producer);
final ProcessorStateManager stateManager =
mock(ProcessorStateManager.class);
final AtomicBoolean corruptedTaskChangelogMarkedAsCorrupted = new
AtomicBoolean(false);
@@ -2691,7 +2650,7 @@ public class TaskManagerTest {
}
@Test
- public void
shouldCloseAndReviveUncorruptedTasksWhenTimeoutExceptionThrownFromCommitDuringRevocationWithALOS()
{
+ public void
shouldCloseAndReviveUncorruptedTasksWhenTimeoutExceptionThrownFromCommitDuringRevocationWithAlos()
{
final StateMachineTask revokedActiveTask = new
StateMachineTask(taskId00, taskId00Partitions, true, stateManager);
final Map<TopicPartition, OffsetAndMetadata> offsets00 =
singletonMap(t1p0, new OffsetAndMetadata(0L, null));
revokedActiveTask.setCommittableOffsetsAndMetadata(offsets00);
@@ -2745,7 +2704,7 @@ public class TaskManagerTest {
public void
shouldCloseAndReviveUncorruptedTasksWhenTimeoutExceptionThrownFromCommitDuringRevocationWithEOS()
{
final TaskManager taskManager =
setUpTaskManager(ProcessingMode.EXACTLY_ONCE_V2, false);
final StreamsProducer producer = mock(StreamsProducer.class);
- when(activeTaskCreator.threadProducer()).thenReturn(producer);
+ when(activeTaskCreator.streamsProducer()).thenReturn(producer);
final ProcessorStateManager stateManager =
mock(ProcessorStateManager.class);
final StateMachineTask revokedActiveTask = new
StateMachineTask(taskId00, taskId00Partitions, true, stateManager);
@@ -3021,7 +2980,7 @@ public class TaskManagerTest {
when(activeTaskCreator.createTasks(any(), eq(assignmentActive)))
.thenReturn(asList(task00, task01, task02));
- when(activeTaskCreator.threadProducer()).thenReturn(producer);
+ when(activeTaskCreator.streamsProducer()).thenReturn(producer);
when(standbyTaskCreator.createTasks(assignmentStandby))
.thenReturn(singletonList(task10));
@@ -3176,7 +3135,6 @@ public class TaskManagerTest {
taskManager.handleAssignment(emptyMap(), emptyMap());
assertThat(task00.state(), is(Task.State.CLOSED));
- verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(taskId00);
}
@Test
@@ -3206,7 +3164,7 @@ public class TaskManagerTest {
@Test
public void
shouldCloseActiveTasksAndPropagateExceptionsOnCleanShutdownWithExactlyOnceV2() {
-
when(activeTaskCreator.threadProducer()).thenReturn(mock(StreamsProducer.class));
+
when(activeTaskCreator.streamsProducer()).thenReturn(mock(StreamsProducer.class));
shouldCloseActiveTasksAndPropagateExceptionsOnCleanShutdown(ProcessingMode.EXACTLY_ONCE_V2);
}
@@ -3315,62 +3273,12 @@ public class TaskManagerTest {
assertThat(task03.state(), is(Task.State.CLOSED));
assertThat(taskManager.activeTaskMap(), Matchers.anEmptyMap());
assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap());
- verify(activeTaskCreator,
times(4)).closeAndRemoveTaskProducerIfNeeded(any());
// the active task creator should also get closed (so that it closes
the thread producer if applicable)
- verify(activeTaskCreator).closeThreadProducerIfNeeded();
+ verify(activeTaskCreator).close();
}
@Test
- public void
shouldCloseActiveTasksAndPropagateTaskProducerExceptionsOnCleanShutdown() {
- final TopicPartition changelog = new TopicPartition("changelog", 0);
- final Map<TaskId, Set<TopicPartition>> assignment = mkMap(
- mkEntry(taskId00, taskId00Partitions)
- );
- final StateMachineTask task00 = new StateMachineTask(taskId00,
taskId00Partitions, true, stateManager) {
- @Override
- public Set<TopicPartition> changelogPartitions() {
- return singleton(changelog);
- }
- };
- final Map<TopicPartition, OffsetAndMetadata> offsets =
singletonMap(t1p0, new OffsetAndMetadata(0L, null));
- task00.setCommittableOffsetsAndMetadata(offsets);
-
- when(activeTaskCreator.createTasks(any(),
eq(assignment))).thenReturn(singletonList(task00));
- doThrow(new RuntimeException("whatever"))
-
.when(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(taskId00);
-
- taskManager.handleAssignment(assignment, emptyMap());
-
- assertThat(task00.state(), is(Task.State.CREATED));
-
- taskManager.tryToCompleteRestoration(time.milliseconds(), null);
-
- assertThat(task00.state(), is(Task.State.RESTORING));
- assertThat(
- taskManager.activeTaskMap(),
- Matchers.equalTo(
- mkMap(
- mkEntry(taskId00, task00)
- )
- )
- );
- assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap());
- verify(changeLogReader).enforceRestoreActive();
- verify(changeLogReader).completedChangelogs();
-
- final RuntimeException exception =
assertThrows(RuntimeException.class, () -> taskManager.shutdown(true));
-
- assertThat(task00.state(), is(Task.State.CLOSED));
- assertThat(exception.getCause().getMessage(), is("whatever"));
- assertThat(taskManager.activeTaskMap(), Matchers.anEmptyMap());
- assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap());
- verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(taskId00);
- // the active task creator should also get closed (so that it closes
the thread producer if applicable)
- verify(activeTaskCreator).closeThreadProducerIfNeeded();
- }
-
- @Test
- public void
shouldCloseActiveTasksAndPropagateThreadProducerExceptionsOnCleanShutdown() {
+ public void
shouldCloseActiveTasksAndPropagateStreamsProducerExceptionsOnCleanShutdown() {
final TopicPartition changelog = new TopicPartition("changelog", 0);
final Map<TaskId, Set<TopicPartition>> assignment = mkMap(
mkEntry(taskId00, taskId00Partitions)
@@ -3383,7 +3291,7 @@ public class TaskManagerTest {
};
when(activeTaskCreator.createTasks(any(),
eq(assignment))).thenReturn(singletonList(task00));
- doThrow(new
RuntimeException("whatever")).when(activeTaskCreator).closeThreadProducerIfNeeded();
+ doThrow(new
RuntimeException("whatever")).when(activeTaskCreator).close();
taskManager.handleAssignment(assignment, emptyMap());
@@ -3411,7 +3319,7 @@ public class TaskManagerTest {
assertThat(taskManager.activeTaskMap(), Matchers.anEmptyMap());
assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap());
// the active task creator should also get closed (so that it closes the thread producer if applicable)
- verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(taskId00);
+ verify(activeTaskCreator).close();
}
@Test
@@ -3505,8 +3413,7 @@ public class TaskManagerTest {
};
when(activeTaskCreator.createTasks(any(),
eq(assignment))).thenReturn(asList(task00, task01, task02));
- doThrow(new
RuntimeException("whatever")).when(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(any());
- doThrow(new RuntimeException("whatever
all")).when(activeTaskCreator).closeThreadProducerIfNeeded();
+ doThrow(new
RuntimeException("whatever")).when(activeTaskCreator).close();
taskManager.handleAssignment(assignment, emptyMap());
@@ -3540,9 +3447,8 @@ public class TaskManagerTest {
assertThat(task02.state(), is(Task.State.CLOSED));
assertThat(taskManager.activeTaskMap(), Matchers.anEmptyMap());
assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap());
- verify(activeTaskCreator,
times(3)).closeAndRemoveTaskProducerIfNeeded(any());
// the active task creator should also get closed (so that it closes the thread producer if applicable)
- verify(activeTaskCreator).closeThreadProducerIfNeeded();
+ verify(activeTaskCreator).close();
}
@Test
@@ -3566,7 +3472,7 @@ public class TaskManagerTest {
assertThat(taskManager.activeTaskMap(), Matchers.anEmptyMap());
assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap());
// the active task creator should also get closed (so that it closes the thread producer if applicable)
- verify(activeTaskCreator).closeThreadProducerIfNeeded();
+ verify(activeTaskCreator).close();
// `tryToCompleteRestoration`
verify(consumer).assignment();
verify(consumer).resume(eq(emptySet()));
@@ -3588,8 +3494,7 @@ public class TaskManagerTest {
taskManager.shutdown(true);
-
verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(failedStatefulTask.id());
- verify(activeTaskCreator).closeThreadProducerIfNeeded();
+ verify(activeTaskCreator).close();
verify(stateUpdater).shutdown(Duration.ofMillis(Long.MAX_VALUE));
verify(failedStatefulTask).prepareCommit();
verify(failedStatefulTask).suspend();
@@ -3668,19 +3573,15 @@ public class TaskManagerTest {
verify(removedFailedStatefulTask).prepareCommit();
verify(removedFailedStatefulTask).suspend();
verify(removedFailedStatefulTask).closeDirty();
- verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(taskId03);
verify(removedFailedStandbyTask).prepareCommit();
verify(removedFailedStandbyTask).suspend();
verify(removedFailedStandbyTask).closeDirty();
- verify(activeTaskCreator,
never()).closeAndRemoveTaskProducerIfNeeded(taskId04);
verify(removedFailedStatefulTaskDuringRemoval).prepareCommit();
verify(removedFailedStatefulTaskDuringRemoval).suspend();
verify(removedFailedStatefulTaskDuringRemoval).closeDirty();
- verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(taskId05);
verify(removedFailedStandbyTaskDuringRemoval).prepareCommit();
verify(removedFailedStandbyTaskDuringRemoval).suspend();
verify(removedFailedStandbyTaskDuringRemoval).closeDirty();
- verify(activeTaskCreator,
never()).closeAndRemoveTaskProducerIfNeeded(taskId00);
}
@Test
@@ -3873,7 +3774,7 @@ public class TaskManagerTest {
@Test
public void shouldCommitViaProducerIfEosV2Enabled() {
final StreamsProducer producer = mock(StreamsProducer.class);
- when(activeTaskCreator.threadProducer()).thenReturn(producer);
+ when(activeTaskCreator.streamsProducer()).thenReturn(producer);
final Map<TopicPartition, OffsetAndMetadata> offsetsT01 =
singletonMap(t1p1, new OffsetAndMetadata(0L, null));
final Map<TopicPartition, OffsetAndMetadata> offsetsT02 =
singletonMap(t1p2, new OffsetAndMetadata(1L, null));
@@ -3881,16 +3782,7 @@ public class TaskManagerTest {
allOffsets.putAll(offsetsT01);
allOffsets.putAll(offsetsT02);
- shouldCommitViaProducerIfEosEnabled(ProcessingMode.EXACTLY_ONCE_V2,
offsetsT01, offsetsT02);
-
- verify(producer).commitTransaction(allOffsets, new
ConsumerGroupMetadata("appId"));
- verifyNoMoreInteractions(producer);
- }
-
- private void shouldCommitViaProducerIfEosEnabled(final ProcessingMode
processingMode,
- final Map<TopicPartition,
OffsetAndMetadata> offsetsT01,
- final Map<TopicPartition,
OffsetAndMetadata> offsetsT02) {
- final TaskManager taskManager = setUpTaskManager(processingMode,
false);
+ final TaskManager taskManager =
setUpTaskManager(ProcessingMode.EXACTLY_ONCE_V2, false);
final StateMachineTask task01 = new StateMachineTask(taskId01,
taskId01Partitions, true, stateManager);
task01.setCommittableOffsetsAndMetadata(offsetsT01);
@@ -3904,6 +3796,9 @@ public class TaskManagerTest {
when(consumer.groupMetadata()).thenReturn(new
ConsumerGroupMetadata("appId"));
taskManager.commitAll();
+
+ verify(producer).commitTransaction(allOffsets, new
ConsumerGroupMetadata("appId"));
+ verifyNoMoreInteractions(producer);
}
@Test
@@ -4608,7 +4503,7 @@ public class TaskManagerTest {
final TaskManager taskManager =
setUpTaskManager(ProcessingMode.EXACTLY_ONCE_V2, false);
final StreamsProducer producer = mock(StreamsProducer.class);
- when(activeTaskCreator.threadProducer()).thenReturn(producer);
+ when(activeTaskCreator.streamsProducer()).thenReturn(producer);
final Map<TopicPartition, OffsetAndMetadata> offsetsT00 =
singletonMap(t1p0, new OffsetAndMetadata(0L, null));
final Map<TopicPartition, OffsetAndMetadata> offsetsT01 =
singletonMap(t1p1, new OffsetAndMetadata(1L, null));
@@ -4720,7 +4615,6 @@ public class TaskManagerTest {
taskManager.handleAssignment(taskId00Assignment,
Collections.emptyMap());
taskManager.handleAssignment(Collections.emptyMap(),
taskId00Assignment);
- verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(taskId00);
verify(activeTaskCreator).createTasks(any(), eq(emptyMap()));
verify(standbyTaskCreator,
times(2)).createTasks(Collections.emptyMap());
verifyNoInteractions(consumer);