This is an automated email from the ASF dual-hosted git repository. cadonna pushed a commit to branch kip1071 in repository https://gitbox.apache.org/repos/asf/kafka.git
commit 8f22f7a618193c11fcb5024ba8c382539831061e Author: Lucas Brutschy <[email protected]> AuthorDate: Tue May 28 18:17:03 2024 +0200 Minor: Test fixes Fixed some tests and some actual bugs. Added missing request parsing. See https://github.com/lucasbru/kafka/pull/13 --- .../internals/StreamsInitializeRequestManager.java | 10 ++--- .../kafka/common/requests/AbstractRequest.java | 4 ++ .../kafka/common/requests/AbstractResponse.java | 4 ++ .../common/requests/StreamsInitializeRequest.java | 2 +- .../common/requests/StreamsInitializeResponse.java | 2 +- .../StreamsInitializeRequestManagerTest.java | 4 +- .../kafka/common/requests/RequestResponseTest.java | 43 ++++++++++++++++++++++ 7 files changed, 60 insertions(+), 9 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsInitializeRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsInitializeRequestManager.java index 34f9fee96f7..5d19d8a8e50 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsInitializeRequestManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsInitializeRequestManager.java @@ -76,16 +76,16 @@ public class StreamsInitializeRequestManager implements RequestManager { } private List<StreamsInitializeRequestData.Subtopology> getTopologyFromStreams() { - final Map<String, StreamsAssignmentInterface.SubTopology> subTopologyMap = streamsAssignmentInterface.subtopologyMap(); + final Map<String, StreamsAssignmentInterface.Subtopology> subTopologyMap = streamsAssignmentInterface.subtopologyMap(); final List<StreamsInitializeRequestData.Subtopology> topology = new ArrayList<>(subTopologyMap.size()); - for (final Map.Entry<String, StreamsAssignmentInterface.SubTopology> subtopology : subTopologyMap.entrySet()) { + for (final Map.Entry<String, StreamsAssignmentInterface.Subtopology> subtopology : subTopologyMap.entrySet()) { topology.add(getSubtopologyFromStreams(subtopology.getKey(), subtopology.getValue())); } return topology; } private static StreamsInitializeRequestData.Subtopology getSubtopologyFromStreams(final String subtopologyName, - final StreamsAssignmentInterface.SubTopology subtopology) { + final StreamsAssignmentInterface.Subtopology subtopology) { final StreamsInitializeRequestData.Subtopology subtopologyData = new StreamsInitializeRequestData.Subtopology(); subtopologyData.setSubtopology(subtopologyName); subtopologyData.setSourceTopics(new ArrayList<>(subtopology.sourceTopics)); @@ -95,7 +95,7 @@ public class StreamsInitializeRequestManager implements RequestManager { return subtopologyData; } - private static List<StreamsInitializeRequestData.TopicInfo> getRepartitionTopicsInfoFromStreams(final StreamsAssignmentInterface.SubTopology subtopologyDataFromStreams) { + private static List<StreamsInitializeRequestData.TopicInfo> getRepartitionTopicsInfoFromStreams(final StreamsAssignmentInterface.Subtopology subtopologyDataFromStreams) { final List<StreamsInitializeRequestData.TopicInfo> repartitionTopicsInfo = new ArrayList<>(); for (final Map.Entry<String, StreamsAssignmentInterface.TopicInfo> repartitionTopic : subtopologyDataFromStreams.repartitionSourceTopics.entrySet()) { final StreamsInitializeRequestData.TopicInfo repartitionTopicInfo = new StreamsInitializeRequestData.TopicInfo(); @@ -106,7 +106,7 @@ public class StreamsInitializeRequestManager implements RequestManager { return repartitionTopicsInfo; } - private static List<StreamsInitializeRequestData.TopicInfo> getChangelogTopicsInfoFromStreams(final StreamsAssignmentInterface.SubTopology subtopologyDataFromStreams) { + private static List<StreamsInitializeRequestData.TopicInfo> getChangelogTopicsInfoFromStreams(final StreamsAssignmentInterface.Subtopology subtopologyDataFromStreams) { final List<StreamsInitializeRequestData.TopicInfo> changelogTopicsInfo = new ArrayList<>(); for (final Map.Entry<String, StreamsAssignmentInterface.TopicInfo> changelogTopic : subtopologyDataFromStreams.stateChangelogTopics.entrySet()) { final StreamsInitializeRequestData.TopicInfo changelogTopicInfo = new StreamsInitializeRequestData.TopicInfo(); diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java index 87a7a826869..e0cb5e97b5d 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java @@ -350,6 +350,10 @@ public abstract class AbstractRequest implements AbstractRequestResponse { return DeleteShareGroupStateRequest.parse(buffer, apiVersion); case READ_SHARE_GROUP_STATE_SUMMARY: return ReadShareGroupStateSummaryRequest.parse(buffer, apiVersion); + case STREAMS_HEARTBEAT: + return StreamsHeartbeatRequest.parse(buffer, apiVersion); + case STREAMS_INITIALIZE: + return StreamsInitializeRequest.parse(buffer, apiVersion); default: throw new AssertionError(String.format("ApiKey %s is not currently handled in `parseRequest`, the " + "code should be updated to do so.", apiKey)); diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java index 83f29471ba3..cc3d3384a27 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java @@ -287,6 +287,10 @@ public abstract class AbstractResponse implements AbstractRequestResponse { return DeleteShareGroupStateResponse.parse(responseBuffer, version); case READ_SHARE_GROUP_STATE_SUMMARY: return ReadShareGroupStateSummaryResponse.parse(responseBuffer, version); + case STREAMS_HEARTBEAT: + return StreamsHeartbeatResponse.parse(responseBuffer, version); + case STREAMS_INITIALIZE: + return StreamsInitializeResponse.parse(responseBuffer, version); default: throw new AssertionError(String.format("ApiKey %s is not currently handled in `parseResponse`, the " + "code should be updated to do so.", apiKey)); diff --git a/clients/src/main/java/org/apache/kafka/common/requests/StreamsInitializeRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/StreamsInitializeRequest.java index 83761d805de..5f150053798 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/StreamsInitializeRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/StreamsInitializeRequest.java @@ -52,7 +52,7 @@ public class StreamsInitializeRequest extends AbstractRequest { private final StreamsInitializeRequestData data; public StreamsInitializeRequest(StreamsInitializeRequestData data, short version) { - super(ApiKeys.STREAMS_HEARTBEAT, version); + super(ApiKeys.STREAMS_INITIALIZE, version); this.data = data; } diff --git a/clients/src/main/java/org/apache/kafka/common/requests/StreamsInitializeResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/StreamsInitializeResponse.java index 81c281aba98..050b28b02af 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/StreamsInitializeResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/StreamsInitializeResponse.java @@ -40,7 +40,7 @@ public class StreamsInitializeResponse extends AbstractResponse { private final StreamsInitializeResponseData data; public StreamsInitializeResponse(StreamsInitializeResponseData data) { - super(ApiKeys.STREAMS_HEARTBEAT); + super(ApiKeys.STREAMS_INITIALIZE); this.data = data; } diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsInitializeRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsInitializeRequestManagerTest.java index bb734a7b1f5..6aebd48efec 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsInitializeRequestManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsInitializeRequestManagerTest.java @@ -80,9 +80,9 @@ class StreamsInitializeRequestManagerTest { mkEntry("changelogTopic2", new StreamsAssignmentInterface.TopicInfo(Optional.empty(), Collections.emptyMap())), mkEntry("changelogTopic3", new StreamsAssignmentInterface.TopicInfo(Optional.empty(), Collections.emptyMap())) ); - final StreamsAssignmentInterface.SubTopology subtopology1 = new StreamsAssignmentInterface.SubTopology( - sinkTopics, + final StreamsAssignmentInterface.Subtopology subtopology1 = new StreamsAssignmentInterface.Subtopology( sourceTopics, + sinkTopics, repartitionTopics, changelogTopics ); diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java index cb6a2458261..da4ae96af47 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java @@ -234,6 +234,14 @@ import org.apache.kafka.common.message.ShareGroupHeartbeatResponseData; import org.apache.kafka.common.message.StopReplicaRequestData.StopReplicaPartitionState; import org.apache.kafka.common.message.StopReplicaRequestData.StopReplicaTopicState; import org.apache.kafka.common.message.StopReplicaResponseData; +import org.apache.kafka.common.message.StreamsHeartbeatRequestData; +import org.apache.kafka.common.message.StreamsHeartbeatResponseData; +import org.apache.kafka.common.message.StreamsInitializeRequestData; +import org.apache.kafka.common.message.StreamsInitializeResponseData; +import org.apache.kafka.common.message.StreamsInstallAssignmentRequestData; +import org.apache.kafka.common.message.StreamsInstallAssignmentResponseData; +import org.apache.kafka.common.message.StreamsPrepareAssignmentRequestData; +import org.apache.kafka.common.message.StreamsPrepareAssignmentResponseData; import org.apache.kafka.common.message.SyncGroupRequestData; import org.apache.kafka.common.message.SyncGroupRequestData.SyncGroupRequestAssignment; import org.apache.kafka.common.message.SyncGroupResponseData; @@ -1123,6 +1131,8 @@ public class RequestResponseTest { case WRITE_SHARE_GROUP_STATE: return createWriteShareGroupStateRequest(version); case DELETE_SHARE_GROUP_STATE: return createDeleteShareGroupStateRequest(version); case READ_SHARE_GROUP_STATE_SUMMARY: return createReadShareGroupStateSummaryRequest(version); + case STREAMS_HEARTBEAT: return createStreamsHeartbeatRequest(version); + case STREAMS_INITIALIZE: return createStreamsInitializeRequest(version); default: throw new IllegalArgumentException("Unknown API key " + apikey); } } @@ -1217,6 +1227,8 @@ public class RequestResponseTest { case WRITE_SHARE_GROUP_STATE: return createWriteShareGroupStateResponse(); case DELETE_SHARE_GROUP_STATE: return createDeleteShareGroupStateResponse(); case READ_SHARE_GROUP_STATE_SUMMARY: return createReadShareGroupStateSummaryResponse(); + case STREAMS_HEARTBEAT: return createStreamsHeartbeatResponse(); + case STREAMS_INITIALIZE: return createStreamsInitializeResponse(); default: throw new IllegalArgumentException("Unknown API key " + apikey); } } @@ -4015,6 +4027,37 @@ public class RequestResponseTest { return new ReadShareGroupStateSummaryResponse(data); } + private AbstractRequest createStreamsPrepareAssignmentRequest(final short version) { + return new StreamsPrepareAssignmentRequest.Builder(new StreamsPrepareAssignmentRequestData()).build(version); + } + + private AbstractRequest createStreamsInstallAssignmentRequest(final short version) { + return new StreamsInstallAssignmentRequest.Builder(new StreamsInstallAssignmentRequestData()).build(version); + } + + private AbstractRequest createStreamsInitializeRequest(final short version) { + return new StreamsInitializeRequest.Builder(new StreamsInitializeRequestData()).build(version); + } + + private AbstractRequest createStreamsHeartbeatRequest(final short version) { + return new StreamsHeartbeatRequest.Builder(new StreamsHeartbeatRequestData()).build(version); + } + + private AbstractResponse createStreamsPrepareAssignmentResponse() { + return new StreamsPrepareAssignmentResponse(new StreamsPrepareAssignmentResponseData()); + } + + private AbstractResponse createStreamsInstallAssignmentResponse() { + return new StreamsInstallAssignmentResponse(new StreamsInstallAssignmentResponseData()); + } + + private AbstractResponse createStreamsInitializeResponse() { + return new StreamsInitializeResponse(new StreamsInitializeResponseData()); + } + + private AbstractResponse createStreamsHeartbeatResponse() { + return new StreamsHeartbeatResponse(new StreamsHeartbeatResponseData()); + @Test public void testInvalidSaslHandShakeRequest() { AbstractRequest request = new SaslHandshakeRequest.Builder(
