This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 8d186bfb4fb KAFKA-16331: Remove task producers from Kafka Streams 
(#17344)
8d186bfb4fb is described below

commit 8d186bfb4fbffcc804476a170ad58361ec3b6d28
Author: Matthias J. Sax <[email protected]>
AuthorDate: Tue Oct 8 15:36:05 2024 -0700

    KAFKA-16331: Remove task producers from Kafka Streams (#17344)
    
    With EOSv1 removal, we don't have producer-per-task any longer,
    and thus can remove the corresponding code which handles task producers.
    
    Reviewers: Chia-Ping Tsai <[email protected]>, Bill Bejeck 
<[email protected]>
---
 .../processor/internals/ActiveTaskCreator.java     |  52 ++-----
 .../streams/processor/internals/StreamThread.java  |   2 +-
 .../streams/processor/internals/TaskExecutor.java  |  25 +---
 .../streams/processor/internals/TaskManager.java   |  28 +---
 .../processor/internals/ActiveTaskCreatorTest.java |  82 +++--------
 .../processor/internals/TaskExecutorTest.java      |   2 +-
 .../processor/internals/TaskManagerTest.java       | 154 ++++-----------------
 7 files changed, 62 insertions(+), 283 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java
index 1bbb4b921ba..16c864576dc 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java
@@ -44,7 +44,6 @@ import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 
-import static 
org.apache.kafka.streams.internals.StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_ALPHA;
 import static org.apache.kafka.streams.internals.StreamsConfigUtils.eosEnabled;
 import static 
org.apache.kafka.streams.internals.StreamsConfigUtils.processingMode;
 import static 
org.apache.kafka.streams.processor.internals.ClientUtils.producerClientId;
@@ -63,9 +62,7 @@ class ActiveTaskCreator {
     private final UUID processId;
     private final Logger log;
     private final Sensor createTaskSensor;
-    private final StreamsProducer threadProducer;
-    // TODO remove `taskProducers`
-    private final Map<TaskId, StreamsProducer> taskProducers = 
Collections.emptyMap();
+    private final StreamsProducer streamsProducer;
     private final ProcessingMode processingMode;
     private final boolean stateUpdaterEnabled;
     private final boolean processingThreadsEnabled;
@@ -105,7 +102,7 @@ class ActiveTaskCreator {
         final String threadIdPrefix = String.format("stream-thread [%s] ", 
Thread.currentThread().getName());
         final LogContext logContext = new LogContext(threadIdPrefix);
 
-        threadProducer = new StreamsProducer(
+        streamsProducer = new StreamsProducer(
             processingMode,
             producer(),
             logContext,
@@ -124,27 +121,12 @@ class ActiveTaskCreator {
         return clientSupplier.getProducer(producerConfig);
     }
 
-    public void reInitializeThreadProducer() {
-        threadProducer.resetProducer(producer());
+    public void reInitializeProducer() {
+        streamsProducer.resetProducer(producer());
     }
 
-    StreamsProducer streamsProducerForTask(final TaskId taskId) {
-        if (processingMode != EXACTLY_ONCE_ALPHA) {
-            throw new IllegalStateException("Expected EXACTLY_ONCE to be 
enabled, but the processing mode was " + processingMode);
-        }
-
-        final StreamsProducer taskProducer = taskProducers.get(taskId);
-        if (taskProducer == null) {
-            throw new IllegalStateException("Unknown TaskId: " + taskId);
-        }
-        return taskProducer;
-    }
-
-    StreamsProducer threadProducer() {
-        if (processingMode == EXACTLY_ONCE_ALPHA) {
-            throw new IllegalStateException("Expected AT_LEAST_ONCE or 
EXACTLY_ONCE_V2 to be enabled, but the processing mode was " + processingMode);
-        }
-        return threadProducer;
+    StreamsProducer streamsProducer() {
+        return streamsProducer;
     }
 
     // TODO: convert to StreamTask when we remove TaskManager#StateMachineTask 
with mocks
@@ -198,7 +180,7 @@ class ActiveTaskCreator {
         return new RecordCollectorImpl(
             logContext,
             taskId,
-            this.threadProducer,
+            streamsProducer,
             applicationConfig.productionExceptionHandler(),
             streamsMetrics,
             topology
@@ -274,28 +256,16 @@ class ActiveTaskCreator {
         return task;
     }
 
-    // TODO: rename and revisit test
-    void closeThreadProducerIfNeeded() {
+    void close() {
         try {
-            threadProducer.close();
+            streamsProducer.close();
         } catch (final RuntimeException e) {
             throw new StreamsException("Thread producer encounter error trying 
to close.", e);
         }
     }
 
-    void closeAndRemoveTaskProducerIfNeeded(final TaskId id) {
-        final StreamsProducer taskProducer = taskProducers.remove(id);
-        if (taskProducer != null) {
-            try {
-                taskProducer.close();
-            } catch (final RuntimeException e) {
-                throw new StreamsException("[" + id + "] task producer 
encounter error trying to close.", e, id);
-            }
-        }
-    }
-
     Map<MetricName, Metric> producerMetrics() {
-        return 
ClientUtils.producerMetrics(Collections.singleton(threadProducer));
+        return 
ClientUtils.producerMetrics(Collections.singleton(streamsProducer));
     }
 
     String producerClientIds() {
@@ -309,6 +279,6 @@ class ActiveTaskCreator {
     }
 
     public double totalProducerBlockedTime() {
-        return threadProducer.totalBlockedTime();
+        return streamsProducer.totalBlockedTime();
     }
 }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 5c58fce1e99..af008a6dbc0 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -812,7 +812,7 @@ public class StreamThread extends Thread implements 
ProcessingThread {
                 if (fetchDeadlineClientInstanceId >= time.milliseconds()) {
                     try {
                         threadProducerInstanceIdFuture.complete(
-                            
taskManager.threadProducer().kafkaProducer().clientInstanceId(Duration.ZERO)
+                            
taskManager.streamsProducer().kafkaProducer().clientInstanceId(Duration.ZERO)
                         );
                     } catch (final IllegalStateException disabledError) {
                         // if telemetry is disabled on a client, we swallow 
the error,
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutor.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutor.java
index e2145a94121..c993787503e 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutor.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutor.java
@@ -38,8 +38,6 @@ import java.util.Map.Entry;
 import java.util.Set;
 import java.util.stream.Collectors;
 
-import static java.util.Collections.emptyMap;
-import static 
org.apache.kafka.streams.internals.StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_ALPHA;
 import static 
org.apache.kafka.streams.internals.StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_V2;
 
 /**
@@ -177,30 +175,13 @@ public class TaskExecutor {
 
         final Set<TaskId> corruptedTasks = new HashSet<>();
 
-        if (executionMetadata.processingMode() == EXACTLY_ONCE_ALPHA) {
-            for (final Task task : taskManager.activeRunningTaskIterable()) {
-                final Map<TopicPartition, OffsetAndMetadata> 
taskOffsetsToCommit = offsetsPerTask.getOrDefault(task, emptyMap());
-                if (!taskOffsetsToCommit.isEmpty() || 
taskManager.streamsProducerForTask(task.id()).transactionInFlight()) {
-                    try {
-                        taskManager.streamsProducerForTask(task.id())
-                            .commitTransaction(taskOffsetsToCommit, 
taskManager.consumerGroupMetadata());
-                        updateTaskCommitMetadata(taskOffsetsToCommit);
-                    } catch (final TimeoutException timeoutException) {
-                        log.error(
-                            String.format("Committing task %s failed.", 
task.id()),
-                            timeoutException
-                        );
-                        corruptedTasks.add(task.id());
-                    }
-                }
-            }
-        } else if (executionMetadata.processingMode() == EXACTLY_ONCE_V2) {
-            if (!offsetsPerTask.isEmpty() || 
taskManager.threadProducer().transactionInFlight()) {
+        if (executionMetadata.processingMode() == EXACTLY_ONCE_V2) {
+            if (!offsetsPerTask.isEmpty() || 
taskManager.streamsProducer().transactionInFlight()) {
                 final Map<TopicPartition, OffsetAndMetadata> allOffsets = 
offsetsPerTask.values().stream()
                     .flatMap(e -> 
e.entrySet().stream()).collect(Collectors.toMap(Map.Entry::getKey, 
Map.Entry::getValue));
 
                 try {
-                    taskManager.threadProducer().commitTransaction(allOffsets, 
taskManager.consumerGroupMetadata());
+                    
taskManager.streamsProducer().commitTransaction(allOffsets, 
taskManager.consumerGroupMetadata());
                     updateTaskCommitMetadata(allOffsets);
                 } catch (final TimeoutException timeoutException) {
                     log.error(
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index 3ab8a42910e..0d222b61600 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -105,7 +105,7 @@ public class TaskManager {
     // includes assigned & initialized tasks and unassigned tasks we locked 
temporarily during rebalance
     private final Set<TaskId> lockedTaskDirectories = new HashSet<>();
 
-    private Map<TaskId, BackoffRecord> taskIdToBackoffRecord = new HashMap<>();
+    private final Map<TaskId, BackoffRecord> taskIdToBackoffRecord = new 
HashMap<>();
 
     private final ActiveTaskCreator activeTaskCreator;
     private final StandbyTaskCreator standbyTaskCreator;
@@ -173,12 +173,8 @@ public class TaskManager {
         mainConsumer.commitSync(offsets);
     }
 
-    StreamsProducer streamsProducerForTask(final TaskId taskId) {
-        return activeTaskCreator.streamsProducerForTask(taskId);
-    }
-
-    StreamsProducer threadProducer() {
-        return activeTaskCreator.threadProducer();
+    StreamsProducer streamsProducer() {
+        return activeTaskCreator.streamsProducer();
     }
 
     boolean rebalanceInProgress() {
@@ -829,9 +825,7 @@ public class TaskManager {
     }
 
     private StandbyTask convertActiveToStandby(final StreamTask activeTask, 
final Set<TopicPartition> partitions) {
-        final StandbyTask standbyTask = 
standbyTaskCreator.createStandbyTaskFromActive(activeTask, partitions);
-        activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(activeTask.id());
-        return standbyTask;
+        return standbyTaskCreator.createStandbyTaskFromActive(activeTask, 
partitions);
     }
 
     private StreamTask convertStandbyToActive(final StandbyTask standbyTask, 
final Set<TopicPartition> partitions) {
@@ -956,9 +950,6 @@ public class TaskManager {
         try {
             task.suspend();
             task.closeClean();
-            if (task.isActive()) {
-                
activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(task.id());
-            }
         } catch (final RuntimeException e) {
             final String uncleanMessage = String.format("Failed to close task 
%s cleanly. " +
                 "Attempting to close remaining tasks before re-throwing:", 
task.id());
@@ -1228,7 +1219,7 @@ public class TaskManager {
         removeLostActiveTasksFromStateUpdaterAndPendingTasksToInit();
 
         if (processingMode == EXACTLY_ONCE_V2) {
-            activeTaskCreator.reInitializeThreadProducer();
+            activeTaskCreator.reInitializeProducer();
         }
     }
 
@@ -1433,10 +1424,6 @@ public class TaskManager {
             if (removeFromTasksRegistry) {
                 tasks.removeTask(task);
             }
-
-            if (task.isActive()) {
-                
activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(task.id());
-            }
         } catch (final RuntimeException swallow) {
             log.error("Error removing dirty task {}: {}", task.id(), 
swallow.getMessage());
         }
@@ -1445,9 +1432,6 @@ public class TaskManager {
     private void closeTaskClean(final Task task) {
         task.closeClean();
         tasks.removeTask(task);
-        if (task.isActive()) {
-            activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(task.id());
-        }
     }
 
     void shutdown(final boolean clean) {
@@ -1469,7 +1453,7 @@ public class TaskManager {
 
         executeAndMaybeSwallow(
             clean,
-            activeTaskCreator::closeThreadProducerIfNeeded,
+            activeTaskCreator::close,
             e -> firstException.compareAndSet(null, e),
             e -> log.warn("Ignoring an exception while closing thread 
producer.", e)
         );
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreatorTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreatorTest.java
index e2d5bf74687..04371509fc1 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreatorTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreatorTest.java
@@ -87,7 +87,7 @@ public class ActiveTaskCreatorTest {
 
     @Test
     public void shouldConstructProducerMetricsWithEosDisabled() {
-        shouldConstructThreadProducerMetric();
+        shouldConstructStreamsProducerMetric();
     }
 
     @Test
@@ -100,26 +100,16 @@ public class ActiveTaskCreatorTest {
     }
 
     @Test
-    public void shouldCloseThreadProducerIfEosDisabled() {
+    public void shouldCloseIfEosDisabled() {
         createTasks();
 
-        activeTaskCreator.closeThreadProducerIfNeeded();
+        activeTaskCreator.close();
 
         assertThat(mockClientSupplier.producers.get(0).closed(), is(true));
     }
 
     @Test
-    public void shouldNoOpCloseTaskProducerIfEosDisabled() {
-        createTasks();
-
-        activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(new TaskId(0, 0));
-        activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(new TaskId(0, 1));
-
-        assertThat(mockClientSupplier.producers.get(0).closed(), is(false));
-    }
-
-    @Test
-    public void shouldReturnBlockedTimeWhenThreadProducer() {
+    public void shouldReturnBlockedTimeWhenStreamsProducer() {
         final double blockedTime = 123.0;
         createTasks();
         final MockProducer<?, ?> producer = 
mockClientSupplier.producers.get(0);
@@ -131,35 +121,23 @@ public class ActiveTaskCreatorTest {
     // error handling
 
     @Test
-    public void shouldFailOnStreamsProducerPerTaskIfEosDisabled() {
-        createTasks();
-
-        final IllegalStateException thrown = assertThrows(
-            IllegalStateException.class,
-            () -> activeTaskCreator.streamsProducerForTask(null)
-        );
-
-        assertThat(thrown.getMessage(), is("Expected EXACTLY_ONCE to be 
enabled, but the processing mode was AT_LEAST_ONCE"));
-    }
-
-    @Test
-    public void shouldReturnThreadProducerIfAtLeastOnceIsEnabled() {
+    public void shouldReturnStreamsProducerIfAtLeastOnceIsEnabled() {
         createTasks();
 
-        final StreamsProducer threadProducer = 
activeTaskCreator.threadProducer();
+        final StreamsProducer threadProducer = 
activeTaskCreator.streamsProducer();
 
         assertThat(mockClientSupplier.producers.size(), is(1));
         assertThat(threadProducer.kafkaProducer(), 
is(mockClientSupplier.producers.get(0)));
     }
 
     @Test
-    public void 
shouldThrowStreamsExceptionOnErrorCloseThreadProducerIfEosDisabled() {
+    public void shouldThrowStreamsExceptionOnErrorCloseIfEosDisabled() {
         createTasks();
         mockClientSupplier.producers.get(0).closeException = new 
RuntimeException("KABOOM!");
 
         final StreamsException thrown = assertThrows(
             StreamsException.class,
-            activeTaskCreator::closeThreadProducerIfNeeded
+            activeTaskCreator::close
         );
 
         assertThat(thrown.getMessage(), is("Thread producer encounter error 
trying to close."));
@@ -173,13 +151,13 @@ public class ActiveTaskCreatorTest {
     // functional test
 
     @Test
-    public void shouldReturnThreadProducerIfEosV2Enabled() {
+    public void shouldReturnStreamsProducerIfEosV2Enabled() {
         properties.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, 
StreamsConfig.EXACTLY_ONCE_V2);
         mockClientSupplier.setApplicationIdForProducer("appId");
 
         createTasks();
 
-        final StreamsProducer threadProducer = 
activeTaskCreator.threadProducer();
+        final StreamsProducer threadProducer = 
activeTaskCreator.streamsProducer();
 
         assertThat(mockClientSupplier.producers.size(), is(1));
         assertThat(threadProducer.kafkaProducer(), 
is(mockClientSupplier.producers.get(0)));
@@ -190,7 +168,7 @@ public class ActiveTaskCreatorTest {
         properties.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, 
StreamsConfig.EXACTLY_ONCE_V2);
         mockClientSupplier.setApplicationIdForProducer("appId");
 
-        shouldConstructThreadProducerMetric();
+        shouldConstructStreamsProducerMetric();
     }
 
     @Test
@@ -205,48 +183,20 @@ public class ActiveTaskCreatorTest {
     }
 
     @Test
-    public void shouldCloseThreadProducerIfEosV2Enabled() {
+    public void shouldCloseIfEosV2Enabled() {
         properties.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, 
StreamsConfig.EXACTLY_ONCE_V2);
         mockClientSupplier.setApplicationIdForProducer("appId");
         createTasks();
 
-        activeTaskCreator.closeThreadProducerIfNeeded();
+        activeTaskCreator.close();
 
         assertThat(mockClientSupplier.producers.get(0).closed(), is(true));
     }
 
-    @Test
-    public void shouldNoOpCloseTaskProducerIfEosV2Enabled() {
-        properties.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, 
StreamsConfig.EXACTLY_ONCE_V2);
-        mockClientSupplier.setApplicationIdForProducer("appId");
-
-        createTasks();
-
-        activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(new TaskId(0, 0));
-        activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(new TaskId(0, 1));
-
-        assertThat(mockClientSupplier.producers.get(0).closed(), is(false));
-    }
-
     // error handling
 
     @Test
-    public void shouldFailOnStreamsProducerPerTaskIfEosV2Enabled() {
-        properties.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, 
StreamsConfig.EXACTLY_ONCE_V2);
-        mockClientSupplier.setApplicationIdForProducer("appId");
-
-        createTasks();
-
-        final IllegalStateException thrown = assertThrows(
-            IllegalStateException.class,
-            () -> activeTaskCreator.streamsProducerForTask(null)
-        );
-
-        assertThat(thrown.getMessage(), is("Expected EXACTLY_ONCE to be 
enabled, but the processing mode was EXACTLY_ONCE_V2"));
-    }
-
-    @Test
-    public void 
shouldThrowStreamsExceptionOnErrorCloseThreadProducerIfEosV2Enabled() {
+    public void shouldThrowStreamsExceptionOnErrorCloseIfEosV2Enabled() {
         properties.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, 
StreamsConfig.EXACTLY_ONCE_V2);
         mockClientSupplier.setApplicationIdForProducer("appId");
         createTasks();
@@ -254,14 +204,14 @@ public class ActiveTaskCreatorTest {
 
         final StreamsException thrown = assertThrows(
             StreamsException.class,
-            activeTaskCreator::closeThreadProducerIfNeeded
+            activeTaskCreator::close
         );
 
         assertThat(thrown.getMessage(), is("Thread producer encounter error 
trying to close."));
         assertThat(thrown.getCause().getMessage(), is("KABOOM!"));
     }
 
-    private void shouldConstructThreadProducerMetric() {
+    private void shouldConstructStreamsProducerMetric() {
         createTasks();
 
         final MetricName testMetricName = new MetricName("test_metric", "", 
"", new HashMap<>());
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskExecutorTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskExecutorTest.java
index 18648277743..8d9ef70c5e3 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskExecutorTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskExecutorTest.java
@@ -51,7 +51,7 @@ public class TaskExecutorTest {
         final TaskExecutionMetadata metadata = 
mock(TaskExecutionMetadata.class);
         final StreamsProducer producer = mock(StreamsProducer.class);
         when(metadata.processingMode()).thenReturn(EXACTLY_ONCE_V2);
-        when(taskManager.threadProducer()).thenReturn(producer);
+        when(taskManager.streamsProducer()).thenReturn(producer);
         when(producer.transactionInFlight()).thenReturn(true);
 
         final TaskExecutor taskExecutor = new TaskExecutor(tasks, taskManager, 
metadata, new LogContext());
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
index fa5dca53bb1..6ee45c56d69 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
@@ -442,7 +442,6 @@ public class TaskManagerTest {
 
         verify(activeTaskToClose).suspend();
         verify(activeTaskToClose).closeClean();
-        
verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(activeTaskToClose.id());
         verify(activeTaskCreator).createTasks(consumer, 
Collections.emptyMap());
         verify(standbyTaskCreator).createTasks(Collections.emptyMap());
     }
@@ -464,7 +463,6 @@ public class TaskManagerTest {
         verify(activeTaskToClose).prepareCommit();
         verify(activeTaskToClose).suspend();
         verify(activeTaskToClose).closeDirty();
-        
verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(activeTaskToClose.id());
         verify(activeTaskCreator).createTasks(consumer, 
Collections.emptyMap());
         verify(standbyTaskCreator).createTasks(Collections.emptyMap());
     }
@@ -898,7 +896,6 @@ public class TaskManagerTest {
         
verify(tasks).addPendingTasksToInit(Collections.singleton(recycledActiveTask));
         verify(activeTaskToClose).suspend();
         verify(activeTaskToClose).closeClean();
-        
verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(activeTaskToClose.id());
         verify(standbyTaskCreator).createTasks(Collections.emptyMap());
         verify(activeTaskCreator).createTasks(consumer, 
Collections.emptyMap());
     }
@@ -985,7 +982,7 @@ public class TaskManagerTest {
     }
 
     @Test
-    public void 
shouldAddRecycledStandbyTaskfromActiveToPendingTasksToInitWithStateUpdaterEnabled()
 {
+    public void 
shouldAddRecycledStandbyTasksFromActiveToPendingTasksToInitWithStateUpdaterEnabled()
 {
         final StreamTask activeTaskToRecycle = statefulTask(taskId01, 
taskId01ChangelogPartitions)
             .withInputPartitions(taskId01Partitions)
             .inState(State.RUNNING).build();
@@ -1001,7 +998,6 @@ public class TaskManagerTest {
         taskManager.handleAssignment(emptyMap(), mkMap(mkEntry(taskId01, 
taskId01Partitions)));
 
         verify(activeTaskToRecycle).prepareCommit();
-        
verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(activeTaskToRecycle.id());
         verify(tasks).addPendingTasksToInit(mkSet(standbyTask));
         verify(tasks).removeTask(activeTaskToRecycle);
         verify(activeTaskCreator).createTasks(consumer, 
Collections.emptyMap());
@@ -1009,7 +1005,7 @@ public class TaskManagerTest {
     }
 
     @Test
-    public void 
shouldAddRecycledStandbyTaskfromActiveToTaskRegistryWithStateUpdaterDisabled() {
+    public void 
shouldAddRecycledStandbyTasksFromActiveToTaskRegistryWithStateUpdaterDisabled() 
{
         final StreamTask activeTaskToRecycle = statefulTask(taskId01, 
taskId01ChangelogPartitions)
             .withInputPartitions(taskId01Partitions)
             .inState(State.RUNNING).build();
@@ -1025,7 +1021,6 @@ public class TaskManagerTest {
         taskManager.handleAssignment(emptyMap(), mkMap(mkEntry(taskId01, 
taskId01Partitions)));
 
         verify(activeTaskToRecycle).prepareCommit();
-        
verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(activeTaskToRecycle.id());
         verify(tasks).replaceActiveWithStandby(standbyTask);
         verify(activeTaskCreator).createTasks(consumer, 
Collections.emptyMap());
         verify(standbyTaskCreator).createTasks(Collections.emptyMap());
@@ -1065,7 +1060,6 @@ public class TaskManagerTest {
         taskManager.handleAssignment(Collections.emptyMap(), 
Collections.emptyMap());
 
         verify(activeTaskCreator).createTasks(consumer, 
Collections.emptyMap());
-        
verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(activeTaskToClose.id());
         verify(activeTaskToClose).prepareCommit();
         verify(activeTaskToClose).closeClean();
         verify(tasks).removeTask(activeTaskToClose);
@@ -2129,7 +2123,6 @@ public class TaskManagerTest {
         assertThat(task00.state(), is(Task.State.CLOSED));
         assertThat(taskManager.activeTaskMap(), Matchers.anEmptyMap());
         assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap());
-        verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(taskId00);
     }
 
     @Test
@@ -2157,7 +2150,6 @@ public class TaskManagerTest {
             is("Encounter unexpected fatal error for task 0_0")
         );
         assertThat(thrown.getCause().getMessage(), is("KABOOM!"));
-        verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(taskId00);
     }
 
     @Test
@@ -2203,48 +2195,15 @@ public class TaskManagerTest {
         taskManager.handleRebalanceStart(emptySet());
 
         assertThat(taskManager.lockedTaskDirectories(), is(emptySet()));
-        verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(taskId00);
     }
 
     @Test
-    public void 
shouldReInitializeThreadProducerOnHandleLostAllIfEosV2Enabled() {
+    public void 
shouldReInitializeStreamsProducerOnHandleLostAllIfEosV2Enabled() {
         final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.EXACTLY_ONCE_V2, false);
 
         taskManager.handleLostAll();
 
-        verify(activeTaskCreator).reInitializeThreadProducer();
-    }
-
-    @Test
-    public void shouldThrowWhenHandlingClosingTasksOnProducerCloseError() {
-        final StateMachineTask task00 = new StateMachineTask(taskId00, 
taskId00Partitions, true, stateManager);
-        final Map<TopicPartition, OffsetAndMetadata> offsets = 
singletonMap(t1p0, new OffsetAndMetadata(0L, null));
-        task00.setCommittableOffsetsAndMetadata(offsets);
-
-        // `handleAssignment`
-        when(consumer.assignment()).thenReturn(assignment);
-        when(activeTaskCreator.createTasks(any(), 
eq(taskId00Assignment))).thenReturn(singletonList(task00));
-
-        // `handleAssignment`
-        doThrow(new 
RuntimeException("KABOOM!")).when(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(taskId00);
-
-        taskManager.handleAssignment(taskId00Assignment, emptyMap());
-        assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), 
null), is(true));
-        assertThat(task00.state(), is(Task.State.RUNNING));
-
-        taskManager.handleRevocation(taskId00Partitions);
-
-        final RuntimeException thrown = assertThrows(
-            RuntimeException.class,
-            () -> taskManager.handleAssignment(emptyMap(), emptyMap())
-        );
-
-        assertThat(
-            thrown.getMessage(),
-            is("Encounter unexpected fatal error for task 0_0")
-        );
-        assertThat(thrown.getCause(), instanceOf(RuntimeException.class));
-        assertThat(thrown.getCause().getMessage(), is("KABOOM!"));
+        verify(activeTaskCreator).reInitializeProducer();
     }
 
     @Test
@@ -2552,7 +2511,7 @@ public class TaskManagerTest {
     }
 
     @Test
-    public void 
shouldCloseAndReviveUncorruptedTasksWhenTimeoutExceptionThrownFromCommitWithALOS()
 {
+    public void 
shouldCloseAndReviveUncorruptedTasksWhenTimeoutExceptionThrownFromCommitWithAlos()
 {
         final ProcessorStateManager stateManager = 
mock(ProcessorStateManager.class);
 
         final StateMachineTask corruptedActive = new 
StateMachineTask(taskId00, taskId00Partitions, true, stateManager);
@@ -2613,7 +2572,7 @@ public class TaskManagerTest {
     public void 
shouldCloseAndReviveUncorruptedTasksWhenTimeoutExceptionThrownFromCommitDuringHandleCorruptedWithEOS()
 {
         final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.EXACTLY_ONCE_V2, false);
         final StreamsProducer producer = mock(StreamsProducer.class);
-        when(activeTaskCreator.threadProducer()).thenReturn(producer);
+        when(activeTaskCreator.streamsProducer()).thenReturn(producer);
         final ProcessorStateManager stateManager = 
mock(ProcessorStateManager.class);
 
         final AtomicBoolean corruptedTaskChangelogMarkedAsCorrupted = new 
AtomicBoolean(false);
@@ -2691,7 +2650,7 @@ public class TaskManagerTest {
     }
 
     @Test
-    public void 
shouldCloseAndReviveUncorruptedTasksWhenTimeoutExceptionThrownFromCommitDuringRevocationWithALOS()
 {
+    public void 
shouldCloseAndReviveUncorruptedTasksWhenTimeoutExceptionThrownFromCommitDuringRevocationWithAlos()
 {
         final StateMachineTask revokedActiveTask = new 
StateMachineTask(taskId00, taskId00Partitions, true, stateManager);
         final Map<TopicPartition, OffsetAndMetadata> offsets00 = 
singletonMap(t1p0, new OffsetAndMetadata(0L, null));
         revokedActiveTask.setCommittableOffsetsAndMetadata(offsets00);
@@ -2745,7 +2704,7 @@ public class TaskManagerTest {
     public void 
shouldCloseAndReviveUncorruptedTasksWhenTimeoutExceptionThrownFromCommitDuringRevocationWithEOS()
 {
         final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.EXACTLY_ONCE_V2, false);
         final StreamsProducer producer = mock(StreamsProducer.class);
-        when(activeTaskCreator.threadProducer()).thenReturn(producer);
+        when(activeTaskCreator.streamsProducer()).thenReturn(producer);
         final ProcessorStateManager stateManager = 
mock(ProcessorStateManager.class);
 
         final StateMachineTask revokedActiveTask = new 
StateMachineTask(taskId00, taskId00Partitions, true, stateManager);
@@ -3021,7 +2980,7 @@ public class TaskManagerTest {
         when(activeTaskCreator.createTasks(any(), eq(assignmentActive)))
             .thenReturn(asList(task00, task01, task02));
 
-        when(activeTaskCreator.threadProducer()).thenReturn(producer);
+        when(activeTaskCreator.streamsProducer()).thenReturn(producer);
         when(standbyTaskCreator.createTasks(assignmentStandby))
             .thenReturn(singletonList(task10));
 
@@ -3176,7 +3135,6 @@ public class TaskManagerTest {
 
         taskManager.handleAssignment(emptyMap(), emptyMap());
         assertThat(task00.state(), is(Task.State.CLOSED));
-        verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(taskId00);
     }
 
     @Test
@@ -3206,7 +3164,7 @@ public class TaskManagerTest {
 
     @Test
     public void 
shouldCloseActiveTasksAndPropagateExceptionsOnCleanShutdownWithExactlyOnceV2() {
-        
when(activeTaskCreator.threadProducer()).thenReturn(mock(StreamsProducer.class));
+        
when(activeTaskCreator.streamsProducer()).thenReturn(mock(StreamsProducer.class));
         
shouldCloseActiveTasksAndPropagateExceptionsOnCleanShutdown(ProcessingMode.EXACTLY_ONCE_V2);
     }
 
@@ -3315,62 +3273,12 @@ public class TaskManagerTest {
         assertThat(task03.state(), is(Task.State.CLOSED));
         assertThat(taskManager.activeTaskMap(), Matchers.anEmptyMap());
         assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap());
-        verify(activeTaskCreator, 
times(4)).closeAndRemoveTaskProducerIfNeeded(any());
         // the active task creator should also get closed (so that it closes 
the thread producer if applicable)
-        verify(activeTaskCreator).closeThreadProducerIfNeeded();
+        verify(activeTaskCreator).close();
     }
 
     @Test
-    public void 
shouldCloseActiveTasksAndPropagateTaskProducerExceptionsOnCleanShutdown() {
-        final TopicPartition changelog = new TopicPartition("changelog", 0);
-        final Map<TaskId, Set<TopicPartition>> assignment = mkMap(
-            mkEntry(taskId00, taskId00Partitions)
-        );
-        final StateMachineTask task00 = new StateMachineTask(taskId00, 
taskId00Partitions, true, stateManager) {
-            @Override
-            public Set<TopicPartition> changelogPartitions() {
-                return singleton(changelog);
-            }
-        };
-        final Map<TopicPartition, OffsetAndMetadata> offsets = 
singletonMap(t1p0, new OffsetAndMetadata(0L, null));
-        task00.setCommittableOffsetsAndMetadata(offsets);
-
-        when(activeTaskCreator.createTasks(any(), 
eq(assignment))).thenReturn(singletonList(task00));
-        doThrow(new RuntimeException("whatever"))
-            
.when(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(taskId00);
-
-        taskManager.handleAssignment(assignment, emptyMap());
-
-        assertThat(task00.state(), is(Task.State.CREATED));
-
-        taskManager.tryToCompleteRestoration(time.milliseconds(), null);
-
-        assertThat(task00.state(), is(Task.State.RESTORING));
-        assertThat(
-            taskManager.activeTaskMap(),
-            Matchers.equalTo(
-                mkMap(
-                    mkEntry(taskId00, task00)
-                )
-            )
-        );
-        assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap());
-        verify(changeLogReader).enforceRestoreActive();
-        verify(changeLogReader).completedChangelogs();
-
-        final RuntimeException exception = 
assertThrows(RuntimeException.class, () -> taskManager.shutdown(true));
-
-        assertThat(task00.state(), is(Task.State.CLOSED));
-        assertThat(exception.getCause().getMessage(), is("whatever"));
-        assertThat(taskManager.activeTaskMap(), Matchers.anEmptyMap());
-        assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap());
-        verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(taskId00);
-        // the active task creator should also get closed (so that it closes 
the thread producer if applicable)
-        verify(activeTaskCreator).closeThreadProducerIfNeeded();
-    }
-
-    @Test
-    public void 
shouldCloseActiveTasksAndPropagateThreadProducerExceptionsOnCleanShutdown() {
+    public void 
shouldCloseActiveTasksAndPropagateStreamsProducerExceptionsOnCleanShutdown() {
         final TopicPartition changelog = new TopicPartition("changelog", 0);
         final Map<TaskId, Set<TopicPartition>> assignment = mkMap(
             mkEntry(taskId00, taskId00Partitions)
@@ -3383,7 +3291,7 @@ public class TaskManagerTest {
         };
 
         when(activeTaskCreator.createTasks(any(), 
eq(assignment))).thenReturn(singletonList(task00));
-        doThrow(new 
RuntimeException("whatever")).when(activeTaskCreator).closeThreadProducerIfNeeded();
+        doThrow(new 
RuntimeException("whatever")).when(activeTaskCreator).close();
 
         taskManager.handleAssignment(assignment, emptyMap());
 
@@ -3411,7 +3319,7 @@ public class TaskManagerTest {
         assertThat(taskManager.activeTaskMap(), Matchers.anEmptyMap());
         assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap());
         // the active task creator should also get closed (so that it closes 
the thread producer if applicable)
-        verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(taskId00);
+        verify(activeTaskCreator).close();
     }
 
     @Test
@@ -3505,8 +3413,7 @@ public class TaskManagerTest {
         };
 
         when(activeTaskCreator.createTasks(any(), 
eq(assignment))).thenReturn(asList(task00, task01, task02));
-        doThrow(new 
RuntimeException("whatever")).when(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(any());
-        doThrow(new RuntimeException("whatever 
all")).when(activeTaskCreator).closeThreadProducerIfNeeded();
+        doThrow(new 
RuntimeException("whatever")).when(activeTaskCreator).close();
 
         taskManager.handleAssignment(assignment, emptyMap());
 
@@ -3540,9 +3447,8 @@ public class TaskManagerTest {
         assertThat(task02.state(), is(Task.State.CLOSED));
         assertThat(taskManager.activeTaskMap(), Matchers.anEmptyMap());
         assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap());
-        verify(activeTaskCreator, 
times(3)).closeAndRemoveTaskProducerIfNeeded(any());
         // the active task creator should also get closed (so that it closes 
the thread producer if applicable)
-        verify(activeTaskCreator).closeThreadProducerIfNeeded();
+        verify(activeTaskCreator).close();
     }
 
     @Test
@@ -3566,7 +3472,7 @@ public class TaskManagerTest {
         assertThat(taskManager.activeTaskMap(), Matchers.anEmptyMap());
         assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap());
         // the active task creator should also get closed (so that it closes 
the thread producer if applicable)
-        verify(activeTaskCreator).closeThreadProducerIfNeeded();
+        verify(activeTaskCreator).close();
         // `tryToCompleteRestoration`
         verify(consumer).assignment();
         verify(consumer).resume(eq(emptySet()));
@@ -3588,8 +3494,7 @@ public class TaskManagerTest {
 
         taskManager.shutdown(true);
 
-        
verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(failedStatefulTask.id());
-        verify(activeTaskCreator).closeThreadProducerIfNeeded();
+        verify(activeTaskCreator).close();
         verify(stateUpdater).shutdown(Duration.ofMillis(Long.MAX_VALUE));
         verify(failedStatefulTask).prepareCommit();
         verify(failedStatefulTask).suspend();
@@ -3668,19 +3573,15 @@ public class TaskManagerTest {
         verify(removedFailedStatefulTask).prepareCommit();
         verify(removedFailedStatefulTask).suspend();
         verify(removedFailedStatefulTask).closeDirty();
-        verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(taskId03);
         verify(removedFailedStandbyTask).prepareCommit();
         verify(removedFailedStandbyTask).suspend();
         verify(removedFailedStandbyTask).closeDirty();
-        verify(activeTaskCreator, 
never()).closeAndRemoveTaskProducerIfNeeded(taskId04);
         verify(removedFailedStatefulTaskDuringRemoval).prepareCommit();
         verify(removedFailedStatefulTaskDuringRemoval).suspend();
         verify(removedFailedStatefulTaskDuringRemoval).closeDirty();
-        verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(taskId05);
         verify(removedFailedStandbyTaskDuringRemoval).prepareCommit();
         verify(removedFailedStandbyTaskDuringRemoval).suspend();
         verify(removedFailedStandbyTaskDuringRemoval).closeDirty();
-        verify(activeTaskCreator, 
never()).closeAndRemoveTaskProducerIfNeeded(taskId00);
     }
 
     @Test
@@ -3873,7 +3774,7 @@ public class TaskManagerTest {
     @Test
     public void shouldCommitViaProducerIfEosV2Enabled() {
         final StreamsProducer producer = mock(StreamsProducer.class);
-        when(activeTaskCreator.threadProducer()).thenReturn(producer);
+        when(activeTaskCreator.streamsProducer()).thenReturn(producer);
 
         final Map<TopicPartition, OffsetAndMetadata> offsetsT01 = 
singletonMap(t1p1, new OffsetAndMetadata(0L, null));
         final Map<TopicPartition, OffsetAndMetadata> offsetsT02 = 
singletonMap(t1p2, new OffsetAndMetadata(1L, null));
@@ -3881,16 +3782,7 @@ public class TaskManagerTest {
         allOffsets.putAll(offsetsT01);
         allOffsets.putAll(offsetsT02);
 
-        shouldCommitViaProducerIfEosEnabled(ProcessingMode.EXACTLY_ONCE_V2, 
offsetsT01, offsetsT02);
-
-        verify(producer).commitTransaction(allOffsets, new 
ConsumerGroupMetadata("appId"));
-        verifyNoMoreInteractions(producer);
-    }
-
-    private void shouldCommitViaProducerIfEosEnabled(final ProcessingMode 
processingMode,
-                                                     final Map<TopicPartition, 
OffsetAndMetadata> offsetsT01,
-                                                     final Map<TopicPartition, 
OffsetAndMetadata> offsetsT02) {
-        final TaskManager taskManager = setUpTaskManager(processingMode, 
false);
+        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.EXACTLY_ONCE_V2, false);
 
         final StateMachineTask task01 = new StateMachineTask(taskId01, 
taskId01Partitions, true, stateManager);
         task01.setCommittableOffsetsAndMetadata(offsetsT01);
@@ -3904,6 +3796,9 @@ public class TaskManagerTest {
         when(consumer.groupMetadata()).thenReturn(new 
ConsumerGroupMetadata("appId"));
 
         taskManager.commitAll();
+
+        verify(producer).commitTransaction(allOffsets, new 
ConsumerGroupMetadata("appId"));
+        verifyNoMoreInteractions(producer);
     }
 
     @Test
@@ -4608,7 +4503,7 @@ public class TaskManagerTest {
         final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.EXACTLY_ONCE_V2, false);
 
         final StreamsProducer producer = mock(StreamsProducer.class);
-        when(activeTaskCreator.threadProducer()).thenReturn(producer);
+        when(activeTaskCreator.streamsProducer()).thenReturn(producer);
 
         final Map<TopicPartition, OffsetAndMetadata> offsetsT00 = 
singletonMap(t1p0, new OffsetAndMetadata(0L, null));
         final Map<TopicPartition, OffsetAndMetadata> offsetsT01 = 
singletonMap(t1p1, new OffsetAndMetadata(1L, null));
@@ -4720,7 +4615,6 @@ public class TaskManagerTest {
         taskManager.handleAssignment(taskId00Assignment, 
Collections.emptyMap());
         taskManager.handleAssignment(Collections.emptyMap(), 
taskId00Assignment);
 
-        verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(taskId00);
         verify(activeTaskCreator).createTasks(any(), eq(emptyMap()));
         verify(standbyTaskCreator, 
times(2)).createTasks(Collections.emptyMap());
         verifyNoInteractions(consumer);


Reply via email to