This is an automated email from the ASF dual-hosted git repository.

cadonna pushed a commit to branch kip1071
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 8f22f7a618193c11fcb5024ba8c382539831061e
Author: Lucas Brutschy <[email protected]>
AuthorDate: Tue May 28 18:17:03 2024 +0200

    Minor: Test fixes
    
    Fixed some tests and some actual bugs. Added missing request parsing.
    
    See https://github.com/lucasbru/kafka/pull/13
---
 .../internals/StreamsInitializeRequestManager.java | 10 ++---
 .../kafka/common/requests/AbstractRequest.java     |  4 ++
 .../kafka/common/requests/AbstractResponse.java    |  4 ++
 .../common/requests/StreamsInitializeRequest.java  |  2 +-
 .../common/requests/StreamsInitializeResponse.java |  2 +-
 .../StreamsInitializeRequestManagerTest.java       |  4 +-
 .../kafka/common/requests/RequestResponseTest.java | 43 ++++++++++++++++++++++
 7 files changed, 60 insertions(+), 9 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsInitializeRequestManager.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsInitializeRequestManager.java
index 34f9fee96f7..5d19d8a8e50 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsInitializeRequestManager.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsInitializeRequestManager.java
@@ -76,16 +76,16 @@ public class StreamsInitializeRequestManager implements 
RequestManager {
     }
 
     private List<StreamsInitializeRequestData.Subtopology> 
getTopologyFromStreams() {
-        final Map<String, StreamsAssignmentInterface.SubTopology> 
subTopologyMap = streamsAssignmentInterface.subtopologyMap();
+        final Map<String, StreamsAssignmentInterface.Subtopology> 
subTopologyMap = streamsAssignmentInterface.subtopologyMap();
         final List<StreamsInitializeRequestData.Subtopology> topology = new 
ArrayList<>(subTopologyMap.size());
-        for (final Map.Entry<String, StreamsAssignmentInterface.SubTopology> 
subtopology : subTopologyMap.entrySet()) {
+        for (final Map.Entry<String, StreamsAssignmentInterface.Subtopology> 
subtopology : subTopologyMap.entrySet()) {
             topology.add(getSubtopologyFromStreams(subtopology.getKey(), 
subtopology.getValue()));
         }
         return topology;
     }
 
     private static StreamsInitializeRequestData.Subtopology 
getSubtopologyFromStreams(final String subtopologyName,
-                                                                               
       final StreamsAssignmentInterface.SubTopology subtopology) {
+                                                                               
       final StreamsAssignmentInterface.Subtopology subtopology) {
         final StreamsInitializeRequestData.Subtopology subtopologyData = new 
StreamsInitializeRequestData.Subtopology();
         subtopologyData.setSubtopology(subtopologyName);
         subtopologyData.setSourceTopics(new 
ArrayList<>(subtopology.sourceTopics));
@@ -95,7 +95,7 @@ public class StreamsInitializeRequestManager implements 
RequestManager {
         return subtopologyData;
     }
 
-    private static List<StreamsInitializeRequestData.TopicInfo> 
getRepartitionTopicsInfoFromStreams(final 
StreamsAssignmentInterface.SubTopology subtopologyDataFromStreams) {
+    private static List<StreamsInitializeRequestData.TopicInfo> 
getRepartitionTopicsInfoFromStreams(final 
StreamsAssignmentInterface.Subtopology subtopologyDataFromStreams) {
         final List<StreamsInitializeRequestData.TopicInfo> 
repartitionTopicsInfo = new ArrayList<>();
         for (final Map.Entry<String, StreamsAssignmentInterface.TopicInfo> 
repartitionTopic : 
subtopologyDataFromStreams.repartitionSourceTopics.entrySet()) {
             final StreamsInitializeRequestData.TopicInfo repartitionTopicInfo 
= new StreamsInitializeRequestData.TopicInfo();
@@ -106,7 +106,7 @@ public class StreamsInitializeRequestManager implements 
RequestManager {
         return repartitionTopicsInfo;
     }
 
-    private static List<StreamsInitializeRequestData.TopicInfo> 
getChangelogTopicsInfoFromStreams(final StreamsAssignmentInterface.SubTopology 
subtopologyDataFromStreams) {
+    private static List<StreamsInitializeRequestData.TopicInfo> 
getChangelogTopicsInfoFromStreams(final StreamsAssignmentInterface.Subtopology 
subtopologyDataFromStreams) {
         final List<StreamsInitializeRequestData.TopicInfo> changelogTopicsInfo 
= new ArrayList<>();
         for (final Map.Entry<String, StreamsAssignmentInterface.TopicInfo> 
changelogTopic : subtopologyDataFromStreams.stateChangelogTopics.entrySet()) {
             final StreamsInitializeRequestData.TopicInfo changelogTopicInfo = 
new StreamsInitializeRequestData.TopicInfo();
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java 
b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
index 87a7a826869..e0cb5e97b5d 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
@@ -350,6 +350,10 @@ public abstract class AbstractRequest implements 
AbstractRequestResponse {
                 return DeleteShareGroupStateRequest.parse(buffer, apiVersion);
             case READ_SHARE_GROUP_STATE_SUMMARY:
                 return ReadShareGroupStateSummaryRequest.parse(buffer, 
apiVersion);
+            case STREAMS_HEARTBEAT:
+                return StreamsHeartbeatRequest.parse(buffer, apiVersion);
+            case STREAMS_INITIALIZE:
+                return StreamsInitializeRequest.parse(buffer, apiVersion);
             default:
                 throw new AssertionError(String.format("ApiKey %s is not 
currently handled in `parseRequest`, the " +
                         "code should be updated to do so.", apiKey));
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java 
b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
index 83f29471ba3..cc3d3384a27 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
@@ -287,6 +287,10 @@ public abstract class AbstractResponse implements 
AbstractRequestResponse {
                 return DeleteShareGroupStateResponse.parse(responseBuffer, 
version);
             case READ_SHARE_GROUP_STATE_SUMMARY:
                 return 
ReadShareGroupStateSummaryResponse.parse(responseBuffer, version);
+            case STREAMS_HEARTBEAT:
+                return StreamsHeartbeatResponse.parse(responseBuffer, version);
+            case STREAMS_INITIALIZE:
+                return StreamsInitializeResponse.parse(responseBuffer, 
version);
             default:
                 throw new AssertionError(String.format("ApiKey %s is not 
currently handled in `parseResponse`, the " +
                         "code should be updated to do so.", apiKey));
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/StreamsInitializeRequest.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/StreamsInitializeRequest.java
index 83761d805de..5f150053798 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/StreamsInitializeRequest.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/StreamsInitializeRequest.java
@@ -52,7 +52,7 @@ public class StreamsInitializeRequest extends AbstractRequest 
{
     private final StreamsInitializeRequestData data;
 
     public StreamsInitializeRequest(StreamsInitializeRequestData data, short 
version) {
-        super(ApiKeys.STREAMS_HEARTBEAT, version);
+        super(ApiKeys.STREAMS_INITIALIZE, version);
         this.data = data;
     }
 
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/StreamsInitializeResponse.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/StreamsInitializeResponse.java
index 81c281aba98..050b28b02af 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/StreamsInitializeResponse.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/StreamsInitializeResponse.java
@@ -40,7 +40,7 @@ public class StreamsInitializeResponse extends 
AbstractResponse {
     private final StreamsInitializeResponseData data;
 
     public StreamsInitializeResponse(StreamsInitializeResponseData data) {
-        super(ApiKeys.STREAMS_HEARTBEAT);
+        super(ApiKeys.STREAMS_INITIALIZE);
         this.data = data;
     }
 
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsInitializeRequestManagerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsInitializeRequestManagerTest.java
index bb734a7b1f5..6aebd48efec 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsInitializeRequestManagerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsInitializeRequestManagerTest.java
@@ -80,9 +80,9 @@ class StreamsInitializeRequestManagerTest {
             mkEntry("changelogTopic2", new 
StreamsAssignmentInterface.TopicInfo(Optional.empty(), Collections.emptyMap())),
             mkEntry("changelogTopic3", new 
StreamsAssignmentInterface.TopicInfo(Optional.empty(), Collections.emptyMap()))
         );
-        final StreamsAssignmentInterface.SubTopology subtopology1 = new 
StreamsAssignmentInterface.SubTopology(
-            sinkTopics,
+        final StreamsAssignmentInterface.Subtopology subtopology1 = new 
StreamsAssignmentInterface.Subtopology(
             sourceTopics,
+            sinkTopics,
             repartitionTopics,
             changelogTopics
         );
diff --git 
a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
 
b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index cb6a2458261..da4ae96af47 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -234,6 +234,14 @@ import 
org.apache.kafka.common.message.ShareGroupHeartbeatResponseData;
 import 
org.apache.kafka.common.message.StopReplicaRequestData.StopReplicaPartitionState;
 import 
org.apache.kafka.common.message.StopReplicaRequestData.StopReplicaTopicState;
 import org.apache.kafka.common.message.StopReplicaResponseData;
+import org.apache.kafka.common.message.StreamsHeartbeatRequestData;
+import org.apache.kafka.common.message.StreamsHeartbeatResponseData;
+import org.apache.kafka.common.message.StreamsInitializeRequestData;
+import org.apache.kafka.common.message.StreamsInitializeResponseData;
+import org.apache.kafka.common.message.StreamsInstallAssignmentRequestData;
+import org.apache.kafka.common.message.StreamsInstallAssignmentResponseData;
+import org.apache.kafka.common.message.StreamsPrepareAssignmentRequestData;
+import org.apache.kafka.common.message.StreamsPrepareAssignmentResponseData;
 import org.apache.kafka.common.message.SyncGroupRequestData;
 import 
org.apache.kafka.common.message.SyncGroupRequestData.SyncGroupRequestAssignment;
 import org.apache.kafka.common.message.SyncGroupResponseData;
@@ -1123,6 +1131,8 @@ public class RequestResponseTest {
             case WRITE_SHARE_GROUP_STATE: return 
createWriteShareGroupStateRequest(version);
             case DELETE_SHARE_GROUP_STATE: return 
createDeleteShareGroupStateRequest(version);
             case READ_SHARE_GROUP_STATE_SUMMARY: return 
createReadShareGroupStateSummaryRequest(version);
+            case STREAMS_HEARTBEAT: return 
createStreamsHeartbeatRequest(version);
+            case STREAMS_INITIALIZE: return 
createStreamsInitializeRequest(version);
             default: throw new IllegalArgumentException("Unknown API key " + 
apikey);
         }
     }
@@ -1217,6 +1227,8 @@ public class RequestResponseTest {
             case WRITE_SHARE_GROUP_STATE: return 
createWriteShareGroupStateResponse();
             case DELETE_SHARE_GROUP_STATE: return 
createDeleteShareGroupStateResponse();
             case READ_SHARE_GROUP_STATE_SUMMARY: return 
createReadShareGroupStateSummaryResponse();
+            case STREAMS_HEARTBEAT: return createStreamsHeartbeatResponse();
+            case STREAMS_INITIALIZE: return createStreamsInitializeResponse();
             default: throw new IllegalArgumentException("Unknown API key " + 
apikey);
         }
     }
@@ -4015,6 +4027,37 @@ public class RequestResponseTest {
         return new ReadShareGroupStateSummaryResponse(data);
     }
 
+    private AbstractRequest createStreamsPrepareAssignmentRequest(final short 
version) {
+        return new StreamsPrepareAssignmentRequest.Builder(new 
StreamsPrepareAssignmentRequestData()).build(version);
+    }
+
+    private AbstractRequest createStreamsInstallAssignmentRequest(final short 
version) {
+        return new StreamsInstallAssignmentRequest.Builder(new 
StreamsInstallAssignmentRequestData()).build(version);
+    }
+
+    private AbstractRequest createStreamsInitializeRequest(final short 
version) {
+        return new StreamsInitializeRequest.Builder(new 
StreamsInitializeRequestData()).build(version);
+    }
+
+    private AbstractRequest createStreamsHeartbeatRequest(final short version) 
{
+        return new StreamsHeartbeatRequest.Builder(new 
StreamsHeartbeatRequestData()).build(version);
+    }
+
+    private AbstractResponse createStreamsPrepareAssignmentResponse() {
+        return new StreamsPrepareAssignmentResponse(new 
StreamsPrepareAssignmentResponseData());
+    }
+
+    private AbstractResponse createStreamsInstallAssignmentResponse() {
+        return new StreamsInstallAssignmentResponse(new 
StreamsInstallAssignmentResponseData());
+    }
+
+    private AbstractResponse createStreamsInitializeResponse() {
+        return new StreamsInitializeResponse(new 
StreamsInitializeResponseData());
+    }
+
+    private AbstractResponse createStreamsHeartbeatResponse() {
+        return new StreamsHeartbeatResponse(new 
StreamsHeartbeatResponseData());
+
     @Test
     public void testInvalidSaslHandShakeRequest() {
         AbstractRequest request = new SaslHandshakeRequest.Builder(

Reply via email to