This is an automated email from the ASF dual-hosted git repository.

cadonna pushed a commit to branch kip1071
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/kip1071 by this push:
     new 6f6cbef487e Follow-ups to the integration of Streams membership 
manager (#18019)
6f6cbef487e is described below

commit 6f6cbef487e94d67db7ed941799b42a6bfd1ba75
Author: Bruno Cadonna <[email protected]>
AuthorDate: Tue Dec 3 16:51:32 2024 +0100

    Follow-ups to the integration of Streams membership manager (#18019)
---
 .../internals/StreamsAssignmentInterface.java      | 25 +++++------
 .../events/ApplicationEventProcessor.java          |  6 +--
 .../StreamsGroupHeartbeatRequestManagerTest.java   | 25 ++---------
 .../streams/processor/internals/StreamThread.java  | 48 ++++++++++++----------
 .../processor/internals/StreamThreadTest.java      | 20 ++++-----
 5 files changed, 53 insertions(+), 71 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsAssignmentInterface.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsAssignmentInterface.java
index 8d5754b7c9c..821b42cca08 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsAssignmentInterface.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsAssignmentInterface.java
@@ -304,9 +304,9 @@ public class StreamsAssignmentInterface {
 
     private ApplicationEventHandler applicationEventHandler = null;
 
-    private Optional<Function<Set<StreamsAssignmentInterface.TaskId>, 
Optional<Exception>>> onTasksRevokedCallback = null;
-    private Optional<Function<Assignment, Optional<Exception>>> 
onTasksAssignedCallback = null;
-    private Optional<Supplier<Optional<Exception>>> onAllTasksLostCallback = 
null;
+    private Optional<Function<Set<StreamsAssignmentInterface.TaskId>, 
Optional<Exception>>> onTasksRevokedCallback = Optional.empty();
+    private Optional<Function<Assignment, Optional<Exception>>> 
onTasksAssignedCallback = Optional.empty();
+    private Optional<Supplier<Optional<Exception>>> onAllTasksLostCallback = 
Optional.empty();
 
     private final StreamsRebalanceEventProcessor 
streamsRebalanceEventProcessor;
 
@@ -316,19 +316,18 @@ public class StreamsAssignmentInterface {
         public void process(final BackgroundEvent event) {
             switch (event.type()) {
                 case ERROR:
-                    process((ErrorEvent) event);
-                    break;
+                    throw ((ErrorEvent) event).error();
 
                 case STREAMS_ON_TASKS_REVOKED_CALLBACK_NEEDED:
-                    process((StreamsOnTasksRevokedCallbackNeededEvent) event);
+                    
processStreamsOnTasksRevokedCallbackNeededEvent((StreamsOnTasksRevokedCallbackNeededEvent)
 event);
                     break;
 
                 case STREAMS_ON_TASKS_ASSIGNED_CALLBACK_NEEDED:
-                    process((StreamsOnTasksAssignedCallbackNeededEvent) event);
+                    
processStreamsOnTasksAssignedCallbackNeededEvent((StreamsOnTasksAssignedCallbackNeededEvent)
 event);
                     break;
 
                 case STREAMS_ON_ALL_TASKS_LOST_CALLBACK_NEEDED:
-                    process((StreamsOnAllTasksLostCallbackNeededEvent) event);
+                    
processStreamsOnAllTasksLostCallbackNeededEvent((StreamsOnAllTasksLostCallbackNeededEvent)
 event);
                     break;
 
                 default:
@@ -337,11 +336,7 @@ public class StreamsAssignmentInterface {
             }
         }
 
-        private void process(final ErrorEvent event) {
-            throw event.error();
-        }
-
-        private void process(final StreamsOnTasksRevokedCallbackNeededEvent 
event) {
+        private void processStreamsOnTasksRevokedCallbackNeededEvent(final 
StreamsOnTasksRevokedCallbackNeededEvent event) {
             StreamsOnTasksRevokedCallbackCompletedEvent invokedEvent = 
invokeOnTasksRevokedCallback(event.activeTasksToRevoke(), event.future());
             applicationEventHandler.add(invokedEvent);
             if (invokedEvent.error().isPresent()) {
@@ -349,7 +344,7 @@ public class StreamsAssignmentInterface {
             }
         }
 
-        private void process(final StreamsOnTasksAssignedCallbackNeededEvent 
event) {
+        private void processStreamsOnTasksAssignedCallbackNeededEvent(final 
StreamsOnTasksAssignedCallbackNeededEvent event) {
             StreamsOnTasksAssignedCallbackCompletedEvent invokedEvent = 
invokeOnTasksAssignedCallback(event.assignment(), event.future());
             applicationEventHandler.add(invokedEvent);
             if (invokedEvent.error().isPresent()) {
@@ -357,7 +352,7 @@ public class StreamsAssignmentInterface {
             }
         }
 
-        private void process(final StreamsOnAllTasksLostCallbackNeededEvent 
event) {
+        private void processStreamsOnAllTasksLostCallbackNeededEvent(final 
StreamsOnAllTasksLostCallbackNeededEvent event) {
             StreamsOnAllTasksLostCallbackCompletedEvent invokedEvent = 
invokeOnAllTasksLostCallback(event.future());
             applicationEventHandler.add(invokedEvent);
             if (invokedEvent.error().isPresent()) {
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
index f36852c091a..e2cf2526d38 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
@@ -572,7 +572,7 @@ public class ApplicationEventProcessor implements 
EventProcessor<ApplicationEven
     }
 
     private void process(final StreamsOnTasksRevokedCallbackCompletedEvent 
event) {
-        if (!requestManagers.streamsMembershipManager.isPresent()) {
+        if (requestManagers.streamsMembershipManager.isEmpty()) {
             log.warn("An internal error occurred; the Streams membership 
manager was not present, so the notification " +
                 "of the onTasksRevoked callback execution could not be sent");
             return;
@@ -581,7 +581,7 @@ public class ApplicationEventProcessor implements 
EventProcessor<ApplicationEven
     }
 
     private void process(final StreamsOnTasksAssignedCallbackCompletedEvent 
event) {
-        if (!requestManagers.streamsMembershipManager.isPresent()) {
+        if (requestManagers.streamsMembershipManager.isEmpty()) {
             log.warn("An internal error occurred; the Streams membership 
manager was not present, so the notification " +
                 "of the onTasksAssigned callback execution could not be sent");
             return;
@@ -590,7 +590,7 @@ public class ApplicationEventProcessor implements 
EventProcessor<ApplicationEven
     }
 
     private void process(final StreamsOnAllTasksLostCallbackCompletedEvent 
event) {
-        if (!requestManagers.streamsMembershipManager.isPresent()) {
+        if (requestManagers.streamsMembershipManager.isEmpty()) {
             log.warn("An internal error occurred; the Streams membership 
manager was not present, so the notification " +
                 "of the onAllTasksLost callback execution could not be sent");
             return;
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java
index 4f22ed64c3b..6006bcb286a 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java
@@ -349,28 +349,9 @@ class StreamsGroupHeartbeatRequestManagerTest {
         assertEquals(TEST_MEMBER_EPOCH, response.memberEpoch());
         assertEquals(TEST_THROTTLE_TIME_MS, response.throttleTimeMs());
         assertEquals(1000, response.heartbeatIntervalMs());
-//        final List<TopicPartitions> tps = response.assign.topicPartitions();
-//        assertEquals(2, tps.size());
-//        assertEquals(Set.of(uuid0, uuid1), 
tps.stream().map(TopicPartitions::topicId).collect(Collectors.toSet()));
-//        assertEquals(Collections.singletonList(0), tps.get(0).partitions());
-//        assertEquals(Collections.singletonList(0), tps.get(1).partitions());
-
-//        final Assignment targetAssignment = 
streamsAssignmentInterface.targetAssignment.get();
-//        assertEquals(1, targetAssignment.activeTasks.size());
-//        final TaskId activeTaskId = 
targetAssignment.activeTasks.stream().findFirst().get();
-//        assertEquals(activeTaskId.subtopologyId(), "0");
-//        assertEquals(activeTaskId.partitionId(), 0);
-//
-//        assertEquals(1, targetAssignment.standbyTasks.size());
-//        final TaskId standbyTaskId = 
targetAssignment.standbyTasks.stream().findFirst().get();
-//        assertEquals(standbyTaskId.subtopologyId(), "1");
-//        assertEquals(standbyTaskId.partitionId(), 1);
-//
-//        assertEquals(1, targetAssignment.warmupTasks.size());
-//        final TaskId warmupTaskId = 
targetAssignment.warmupTasks.stream().findFirst().get();
-//        assertEquals(warmupTaskId.subtopologyId(), "2");
-//        assertEquals(warmupTaskId.partitionId(), 2);
-
+        assertEquals(data.activeTasks(), response.activeTasks());
+        assertEquals(data.standbyTasks(), response.standbyTasks());
+        assertEquals(data.warmupTasks(), response.warmupTasks());
     }
 
     private void mockResponse(final StreamsGroupHeartbeatResponseData data) {
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index e5208bdea97..721eb5aabc7 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -352,7 +352,7 @@ public class StreamThread extends Thread implements 
ProcessingThread {
     private final Queue<StreamsException> nonFatalExceptionsToHandle;
 
     // These are used only with the Streams Rebalance Protocol client
-    private final StreamsAssignmentInterface streamsAssignmentInterface;
+    private final Optional<StreamsAssignmentInterface> 
streamsAssignmentInterface;
     private final StreamsMetadataState streamsMetadataState;
 
     // These are used to signal from outside the stream thread, but the 
variables themselves are internal to the thread
@@ -491,23 +491,28 @@ public class StreamThread extends Thread implements 
ProcessingThread {
         }
 
         final Consumer<byte[], byte[]> mainConsumer;
-        final StreamsAssignmentInterface streamsAssignmentInterface;
+        final Optional<StreamsAssignmentInterface> streamsAssignmentInterface;
         if 
(config.getString(StreamsConfig.GROUP_PROTOCOL_CONFIG).equalsIgnoreCase(GroupProtocol.STREAMS.name))
 {
             if (topologyMetadata.hasNamedTopologies()) {
                 throw new IllegalStateException("Named topologies and the 
CONSUMER protocol cannot be used at the same time.");
             }
             log.info("Streams rebalance protocol enabled");
 
-            streamsAssignmentInterface =
-                initAssignmentInterface(processId,
-                config,
-                
parseHostInfo(config.getString(StreamsConfig.APPLICATION_SERVER_CONFIG)),
-                topologyMetadata);
-
-            mainConsumer = 
clientSupplier.getStreamsRebalanceProtocolConsumer(consumerConfigs, 
streamsAssignmentInterface);
+            streamsAssignmentInterface = Optional.of(
+                initAssignmentInterface(
+                    processId,
+                    config,
+                    
parseHostInfo(config.getString(StreamsConfig.APPLICATION_SERVER_CONFIG)),
+                    topologyMetadata
+                )
+            );
+            mainConsumer = clientSupplier.getStreamsRebalanceProtocolConsumer(
+                consumerConfigs,
+                streamsAssignmentInterface.get()
+            );
         } else {
             mainConsumer = clientSupplier.getConsumer(consumerConfigs);
-            streamsAssignmentInterface = null;
+            streamsAssignmentInterface = Optional.empty();
         }
 
         taskManager.setMainConsumer(mainConsumer);
@@ -693,7 +698,7 @@ public class StreamThread extends Thread implements 
ProcessingThread {
                         final Runnable shutdownErrorHook,
                         final BiConsumer<Throwable, Boolean> 
streamsUncaughtExceptionHandler,
                         final java.util.function.Consumer<Long> cacheResizer,
-                        final StreamsAssignmentInterface 
streamsAssignmentInterface,
+                        final Optional<StreamsAssignmentInterface> 
streamsAssignmentInterface,
                         final StreamsMetadataState streamsMetadataState) {
         super(threadId);
         this.stateLock = new Object();
@@ -716,11 +721,11 @@ public class StreamThread extends Thread implements 
ProcessingThread {
         this.streamsUncaughtExceptionHandler = streamsUncaughtExceptionHandler;
         this.cacheResizer = cacheResizer;
         this.streamsAssignmentInterface = streamsAssignmentInterface;
-        if (streamsAssignmentInterface != null) {
-            
streamsAssignmentInterface.setOnTasksRevokedCallback(this::onTasksRevoked);
-            
streamsAssignmentInterface.setOnTasksAssignedCallback(this::onTasksAssigned);
-            
streamsAssignmentInterface.setOnAllTasksLostCallback(this::onAllTasksLost);
-        }
+        streamsAssignmentInterface.ifPresent(assignmentInterface -> {
+            
assignmentInterface.setOnTasksRevokedCallback(this::onTasksRevoked);
+            
assignmentInterface.setOnTasksAssignedCallback(this::onTasksAssigned);
+            
assignmentInterface.setOnAllTasksLostCallback(this::onAllTasksLost);
+        });
         this.streamsMetadataState = streamsMetadataState;
 
         // The following sensors are created here but their references are not 
stored in this object, since within
@@ -1391,14 +1396,15 @@ public class StreamThread extends Thread implements 
ProcessingThread {
     }
 
     public void maybeHandleAssignmentFromStreamsRebalanceProtocol() {
-        if (streamsAssignmentInterface != null) {
+        if (streamsAssignmentInterface.isPresent()) {
 
-            if (streamsAssignmentInterface.shutdownRequested()) {
+            if (streamsAssignmentInterface.get().shutdownRequested()) {
                 
assignmentErrorCode.set(AssignorError.SHUTDOWN_REQUESTED.code());
             }
 
             // Process metadata from Streams Rebalance Protocol
-            final Map<StreamsAssignmentInterface.HostInfo, 
List<TopicPartition>> partitionsByEndpoint = 
streamsAssignmentInterface.partitionsByHost.get();
+            final Map<StreamsAssignmentInterface.HostInfo, 
List<TopicPartition>> partitionsByEndpoint =
+                streamsAssignmentInterface.get().partitionsByHost.get();
             final Map<HostInfo, Set<TopicPartition>> convertedHostInfoMap = 
new HashMap<>();
             partitionsByEndpoint.forEach((hostInfo, topicPartitions) ->
                 convertedHostInfoMap.put(new HostInfo(hostInfo.host, 
hostInfo.port), new HashSet<>(topicPartitions)));
@@ -1409,7 +1415,7 @@ public class StreamThread extends Thread implements 
ProcessingThread {
             );
 
             // Process assignment from Streams Rebalance Protocol
-            streamsAssignmentInterface.processStreamsRebalanceEvents();
+            streamsAssignmentInterface.get().processStreamsRebalanceEvents();
         }
     }
 
@@ -1488,7 +1494,7 @@ public class StreamThread extends Thread implements 
ProcessingThread {
         return taskIdStream
             .collect(Collectors.toMap(
                 this::toTaskId,
-                task -> toTopicPartitions(task, 
streamsAssignmentInterface.subtopologyMap().get(task.subtopologyId()))
+                task -> toTopicPartitions(task, 
streamsAssignmentInterface.get().subtopologyMap().get(task.subtopologyId()))
             ));
     }
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 9efb5780732..b71fe0af407 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -1448,7 +1448,7 @@ public class StreamThreadTest {
             null,
             HANDLER,
             null,
-            null,
+            Optional.empty(),
             null).updateThreadMetadata(adminClientId(CLIENT_ID));
 
         final StreamsException thrown = assertThrows(StreamsException.class, 
thread::run);
@@ -2675,7 +2675,7 @@ public class StreamThreadTest {
             null,
             HANDLER,
             null,
-            null,
+            Optional.empty(),
             null) {
             @Override
             void runOnceWithProcessingThreads() {
@@ -2734,7 +2734,7 @@ public class StreamThreadTest {
             null,
             HANDLER,
             null,
-            null,
+            Optional.empty(),
             null) {
             @Override
             void runOnceWithProcessingThreads() {
@@ -2802,7 +2802,7 @@ public class StreamThreadTest {
             null,
             HANDLER,
             null,
-            null,
+            Optional.empty(),
             null) {
             @Override
             void runOnceWithProcessingThreads() {
@@ -2866,7 +2866,7 @@ public class StreamThreadTest {
             null,
             HANDLER,
             null,
-            null,
+            Optional.empty(),
             null) {
             @Override
             void runOnceWithProcessingThreads() {
@@ -2927,7 +2927,7 @@ public class StreamThreadTest {
             null,
             HANDLER,
             null,
-            null,
+            Optional.empty(),
             null) {
             @Override
             void runOnceWithProcessingThreads() {
@@ -3161,7 +3161,7 @@ public class StreamThreadTest {
             null,
             HANDLER,
             null,
-            null,
+            Optional.empty(),
             null);
         final MetricName testMetricName = new MetricName("test_metric", "", 
"", new HashMap<>());
         final Metric testMetric = new KafkaMetric(
@@ -3218,7 +3218,7 @@ public class StreamThreadTest {
             null,
             (e, b) -> { },
             null,
-            null,
+            Optional.empty(),
             null) {
             @Override
             void runOnceWithProcessingThreads() {
@@ -3598,7 +3598,7 @@ public class StreamThreadTest {
             null,
             null,
             null,
-            null,
+            Optional.empty(),
             null);
     }
 
@@ -3720,7 +3720,7 @@ public class StreamThreadTest {
             null,
             HANDLER,
             null,
-            null,
+            Optional.empty(),
             null);
     }
     

Reply via email to