This is an automated email from the ASF dual-hosted git repository.

cadonna pushed a commit to branch kip1071
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit a19bc7d97b77edaacbfafe56b69256214b5c648b
Author: Bruno Cadonna <[email protected]>
AuthorDate: Mon May 27 10:51:54 2024 +0200

    Implement StreamsInitialize request manager
    
    See https://github.com/lucasbru/kafka/pull/11
---
 .../consumer/internals/RequestManagers.java        |  16 +++
 .../internals/StreamsInitializeRequestManager.java |  98 ++++++++++++++-
 .../StreamsInitializeRequestManagerTest.java       | 131 +++++++++++++++++++++
 3 files changed, 241 insertions(+), 4 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java
index 5045743848d..65dec12140b 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java
@@ -251,6 +251,22 @@ public class RequestManagers implements Closeable {
                     
membershipManager.registerStateListener(commitRequestManager);
                     
membershipManager.registerStateListener(applicationThreadMemberStateListener);
 
+                    if (streamsInstanceMetadata.isPresent()) {
+                        streamsInitializeRequestManager = new 
StreamsInitializeRequestManager(
+                            logContext,
+                            groupRebalanceConfig.groupId,
+                            streamsInstanceMetadata.get(),
+                            coordinator);
+                        streamsPrepareAssignmentRequestManager = new 
StreamsPrepareAssignmentRequestManager(
+                            streamsInstanceMetadata.get());
+                        streamsInstallAssignmentRequestManager = new 
StreamsInstallAssignmentRequestManager(
+                            streamsInstanceMetadata.get());
+                        streamsHeartbeatRequestManager = new 
StreamsHeartbeatRequestManager(
+                            streamsInitializeRequestManager,
+                            streamsPrepareAssignmentRequestManager,
+                            streamsInstanceMetadata.get());
+                    } else {
+                        heartbeatRequestManager = new HeartbeatRequestManager(
                             logContext,
                             time,
                             config,
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsInitializeRequestManager.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsInitializeRequestManager.java
index 73d2955ef2f..34f9fee96f7 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsInitializeRequestManager.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsInitializeRequestManager.java
@@ -16,23 +16,113 @@
  */
 package org.apache.kafka.clients.consumer.internals;
 
+import org.apache.kafka.clients.ClientResponse;
 import 
org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult;
+import org.apache.kafka.common.message.StreamsInitializeRequestData;
+import org.apache.kafka.common.requests.StreamsInitializeRequest;
+import org.apache.kafka.common.utils.LogContext;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
 
 public class StreamsInitializeRequestManager implements RequestManager {
 
+    private final Logger logger;
+    private final String groupId;
     private final StreamsAssignmentInterface streamsAssignmentInterface;
+    private final CoordinatorRequestManager coordinatorRequestManager;
+
+    private Optional<NetworkClientDelegate.UnsentRequest> unsentRequest = 
Optional.empty();
 
-    StreamsInitializeRequestManager(final StreamsAssignmentInterface 
streamsAssignmentInterface) {
+
+    StreamsInitializeRequestManager(final LogContext logContext,
+                                    final String groupId,
+                                    final StreamsAssignmentInterface 
streamsAssignmentInterface,
+                                    final CoordinatorRequestManager 
coordinatorRequestManager) {
+        this.logger = logContext.logger(getClass());
+        this.groupId = groupId;
         this.streamsAssignmentInterface = streamsAssignmentInterface;
+        this.coordinatorRequestManager = coordinatorRequestManager;
     }
 
     @Override
     public PollResult poll(final long currentTimeMs) {
-        // TODO: Return an initalize request
-        return PollResult.EMPTY;
+        return unsentRequest.map(PollResult::new).orElse(PollResult.EMPTY);
     }
 
     public void initialize() {
-        // TODO: Create a `StreamsInitializeRequest`
+        final NetworkClientDelegate.UnsentRequest unsentRequest = 
makeRequest();
+
+        unsentRequest.whenComplete(this::onResponse);
+
+        this.unsentRequest = Optional.of(unsentRequest);
+    }
+
+    private NetworkClientDelegate.UnsentRequest makeRequest() {
+        final StreamsInitializeRequestData streamsInitializeRequestData = new 
StreamsInitializeRequestData();
+        streamsInitializeRequestData.setGroupId(groupId);
+        final List<StreamsInitializeRequestData.Subtopology> topology = 
getTopologyFromStreams();
+        streamsInitializeRequestData.setTopology(topology);
+        final StreamsInitializeRequest.Builder streamsInitializeRequestBuilder 
= new StreamsInitializeRequest.Builder(
+            streamsInitializeRequestData
+        );
+        return new NetworkClientDelegate.UnsentRequest(
+            streamsInitializeRequestBuilder,
+            coordinatorRequestManager.coordinator()
+        );
+    }
+
+    private List<StreamsInitializeRequestData.Subtopology> 
getTopologyFromStreams() {
+        final Map<String, StreamsAssignmentInterface.SubTopology> 
subTopologyMap = streamsAssignmentInterface.subtopologyMap();
+        final List<StreamsInitializeRequestData.Subtopology> topology = new 
ArrayList<>(subTopologyMap.size());
+        for (final Map.Entry<String, StreamsAssignmentInterface.SubTopology> 
subtopology : subTopologyMap.entrySet()) {
+            topology.add(getSubtopologyFromStreams(subtopology.getKey(), 
subtopology.getValue()));
+        }
+        return topology;
+    }
+
+    private static StreamsInitializeRequestData.Subtopology 
getSubtopologyFromStreams(final String subtopologyName,
+                                                                               
       final StreamsAssignmentInterface.SubTopology subtopology) {
+        final StreamsInitializeRequestData.Subtopology subtopologyData = new 
StreamsInitializeRequestData.Subtopology();
+        subtopologyData.setSubtopology(subtopologyName);
+        subtopologyData.setSourceTopics(new 
ArrayList<>(subtopology.sourceTopics));
+        subtopologyData.setSinkTopics(new ArrayList<>(subtopology.sinkTopics));
+        
subtopologyData.setRepartitionSourceTopics(getRepartitionTopicsInfoFromStreams(subtopology));
+        
subtopologyData.setStateChangelogTopics(getChangelogTopicsInfoFromStreams(subtopology));
+        return subtopologyData;
+    }
+
+    private static List<StreamsInitializeRequestData.TopicInfo> 
getRepartitionTopicsInfoFromStreams(final 
StreamsAssignmentInterface.SubTopology subtopologyDataFromStreams) {
+        final List<StreamsInitializeRequestData.TopicInfo> 
repartitionTopicsInfo = new ArrayList<>();
+        for (final Map.Entry<String, StreamsAssignmentInterface.TopicInfo> 
repartitionTopic : 
subtopologyDataFromStreams.repartitionSourceTopics.entrySet()) {
+            final StreamsInitializeRequestData.TopicInfo repartitionTopicInfo 
= new StreamsInitializeRequestData.TopicInfo();
+            repartitionTopicInfo.setName(repartitionTopic.getKey());
+            
repartitionTopic.getValue().numPartitions.ifPresent(repartitionTopicInfo::setPartitions);
+            repartitionTopicsInfo.add(repartitionTopicInfo);
+        }
+        return repartitionTopicsInfo;
+    }
+
+    private static List<StreamsInitializeRequestData.TopicInfo> 
getChangelogTopicsInfoFromStreams(final StreamsAssignmentInterface.SubTopology 
subtopologyDataFromStreams) {
+        final List<StreamsInitializeRequestData.TopicInfo> changelogTopicsInfo 
= new ArrayList<>();
+        for (final Map.Entry<String, StreamsAssignmentInterface.TopicInfo> 
changelogTopic : subtopologyDataFromStreams.stateChangelogTopics.entrySet()) {
+            final StreamsInitializeRequestData.TopicInfo changelogTopicInfo = 
new StreamsInitializeRequestData.TopicInfo();
+            changelogTopicInfo.setName(changelogTopic.getKey());
+            changelogTopicsInfo.add(changelogTopicInfo);
+        }
+        return changelogTopicsInfo;
+    }
+    
+    private void onResponse(final ClientResponse response, final Throwable 
exception) {
+        if (exception != null) {
+            // todo: handle error
+            logger.error("Error during Streams initialization: ", exception);
+        } else {
+            // todo: handle success
+            logger.info("Streams initialization successful", exception);
+        }
     }
 }
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsInitializeRequestManagerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsInitializeRequestManagerTest.java
new file mode 100644
index 00000000000..bb734a7b1f5
--- /dev/null
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsInitializeRequestManagerTest.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.message.StreamsInitializeRequestData;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.requests.StreamsInitializeRequest;
+import org.apache.kafka.common.utils.LogContext;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.common.utils.Utils.mkSet;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+@ExtendWith(MockitoExtension.class)
+class StreamsInitializeRequestManagerTest {
+
+    final private String groupId = "groupId";
+    final private LogContext logContext = new LogContext("test");
+
+    @Test
+    public void shouldPollEmptyResult() {
+        final CoordinatorRequestManager coordinatorRequestManager = 
mock(CoordinatorRequestManager.class);
+        final StreamsAssignmentInterface streamsAssignmentInterface = 
mock(StreamsAssignmentInterface.class);
+        final StreamsInitializeRequestManager streamsInitializeRequestManager 
= new StreamsInitializeRequestManager(
+            logContext,
+            groupId,
+            streamsAssignmentInterface,
+            coordinatorRequestManager
+        );
+
+        final NetworkClientDelegate.PollResult pollResult = 
streamsInitializeRequestManager.poll(0);
+
+        assertEquals(NetworkClientDelegate.PollResult.EMPTY, pollResult);
+    }
+
+    @Test
+    public void shouldPollStreamsInitializeRequest() {
+        final Node node = mock(Node.class);
+        final CoordinatorRequestManager coordinatorRequestManager = 
mock(CoordinatorRequestManager.class);
+        
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(node));
+        final StreamsAssignmentInterface streamsAssignmentInterface = 
mock(StreamsAssignmentInterface.class);
+        final Set<String> sourceTopics = mkSet("sourceTopic1", "sourceTopic2");
+        final Set<String> sinkTopics = mkSet("sinkTopic1", "sinkTopic2", 
"sinkTopic3");
+        final Map<String, StreamsAssignmentInterface.TopicInfo> 
repartitionTopics = mkMap(
+            mkEntry("repartitionTopic1", new 
StreamsAssignmentInterface.TopicInfo(Optional.of(2), Collections.emptyMap())),
+            mkEntry("repartitionTopic2", new 
StreamsAssignmentInterface.TopicInfo(Optional.of(3), Collections.emptyMap()))
+        );
+        final Map<String, StreamsAssignmentInterface.TopicInfo> 
changelogTopics = mkMap(
+            mkEntry("changelogTopic1", new 
StreamsAssignmentInterface.TopicInfo(Optional.empty(), Collections.emptyMap())),
+            mkEntry("changelogTopic2", new 
StreamsAssignmentInterface.TopicInfo(Optional.empty(), Collections.emptyMap())),
+            mkEntry("changelogTopic3", new 
StreamsAssignmentInterface.TopicInfo(Optional.empty(), Collections.emptyMap()))
+        );
+        final StreamsAssignmentInterface.SubTopology subtopology1 = new 
StreamsAssignmentInterface.SubTopology(
+            sinkTopics,
+            sourceTopics,
+            repartitionTopics,
+            changelogTopics
+        );
+        final String subtopologyName1 = "subtopology1";
+        when(streamsAssignmentInterface.subtopologyMap()).thenReturn(
+            mkMap(mkEntry(subtopologyName1, subtopology1))
+        );
+        final StreamsInitializeRequestManager streamsInitializeRequestManager 
= new StreamsInitializeRequestManager(
+            logContext,
+            groupId,
+            streamsAssignmentInterface,
+            coordinatorRequestManager
+        );
+
+        streamsInitializeRequestManager.initialize();
+        final NetworkClientDelegate.PollResult pollResult = 
streamsInitializeRequestManager.poll(0);
+
+        assertEquals(1, pollResult.unsentRequests.size());
+        final NetworkClientDelegate.UnsentRequest unsentRequest = 
pollResult.unsentRequests.get(0);
+        assertTrue(unsentRequest.node().isPresent());
+        assertEquals(node, unsentRequest.node().get());
+        assertEquals(ApiKeys.STREAMS_INITIALIZE, 
unsentRequest.requestBuilder().apiKey());
+        final StreamsInitializeRequest.Builder streamsInitializeRequestBuilder 
= (StreamsInitializeRequest.Builder) unsentRequest.requestBuilder();
+        final StreamsInitializeRequest streamsInitializeRequest = 
streamsInitializeRequestBuilder.build();
+        final StreamsInitializeRequestData streamsInitializeRequestData = 
streamsInitializeRequest.data();
+        assertEquals(ApiKeys.STREAMS_INITIALIZE.id, 
streamsInitializeRequestData.apiKey());
+        assertEquals(groupId, streamsInitializeRequestData.groupId());
+        assertNotNull(streamsInitializeRequestData.topology());
+        final List<StreamsInitializeRequestData.Subtopology> subtopologies = 
streamsInitializeRequestData.topology();
+        assertEquals(1, subtopologies.size());
+        final StreamsInitializeRequestData.Subtopology subtopology = 
subtopologies.get(0);
+        assertEquals(subtopologyName1, subtopology.subtopology());
+        assertEquals(new ArrayList<>(sourceTopics), 
subtopology.sourceTopics());
+        assertEquals(new ArrayList<>(sinkTopics), subtopology.sinkTopics());
+        assertEquals(repartitionTopics.size(), 
subtopology.repartitionSourceTopics().size());
+        subtopology.repartitionSourceTopics().forEach(topicInfo -> {
+            final StreamsAssignmentInterface.TopicInfo repartitionTopic = 
repartitionTopics.get(topicInfo.name());
+            assertEquals(repartitionTopic.numPartitions.get(), 
topicInfo.partitions());
+        });
+        assertEquals(changelogTopics.size(), 
subtopology.stateChangelogTopics().size());
+        subtopology.stateChangelogTopics().forEach(topicInfo -> {
+            assertTrue(changelogTopics.containsKey(topicInfo.name()));
+            assertEquals(0, topicInfo.partitions());
+        });
+    }
+}
\ No newline at end of file

Reply via email to