This is an automated email from the ASF dual-hosted git repository.

cadonna pushed a commit to branch kip1071
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit f8ecdc9a716165d2c541a27ed3953d6118928380
Author: Lucas Brutschy <[email protected]>
AuthorDate: Fri Apr 26 13:12:15 2024 +0200

    Resolve merge conflict from 11/25 trunk rebase - StreamsInitialize RPC and 
some errors
    
    Initial RPCs for StreamsInitialize.
    
    See https://github.com/lucasbru/kafka/pull/7
---
 .../StreamsInconsistentTopologyException.java      | 23 ++++++
 .../errors/StreamsInvalidTopologyException.java    | 23 ++++++
 .../StreamsMissingInternalTopicsException.java     | 23 ++++++
 .../StreamsMissingSourceTopicsException.java       | 23 ++++++
 .../StreamsShutdownApplicationException.java       | 23 ++++++
 .../org/apache/kafka/common/protocol/ApiKeys.java  |  5 +-
 .../org/apache/kafka/common/protocol/Errors.java   | 17 ++++-
 .../common/requests/StreamsHeartbeatRequest.java   | 88 ++++++++++++++++++++++
 .../common/requests/StreamsHeartbeatResponse.java  | 78 +++++++++++++++++++
 .../common/requests/StreamsInitializeRequest.java  | 62 +++++++++++++++
 .../common/requests/StreamsInitializeResponse.java | 55 ++++++++++++++
 .../common/message/StreamsHeartbeatRequest.json    | 37 +++++----
 .../common/message/StreamsHeartbeatResponse.json   |  9 +--
 .../common/message/StreamsInitializeRequest.json   | 64 ++++++++++++++++
 .../common/message/StreamsInitializeResponse.json  | 37 +++++++++
 15 files changed, 547 insertions(+), 20 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/common/errors/StreamsInconsistentTopologyException.java
 
b/clients/src/main/java/org/apache/kafka/common/errors/StreamsInconsistentTopologyException.java
new file mode 100644
index 00000000000..d5d590ceb10
--- /dev/null
+++ 
b/clients/src/main/java/org/apache/kafka/common/errors/StreamsInconsistentTopologyException.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+public class StreamsInconsistentTopologyException extends ApiException {
+    public StreamsInconsistentTopologyException(String message) {
+        super(message);
+    }
+}
diff --git 
a/clients/src/main/java/org/apache/kafka/common/errors/StreamsInvalidTopologyException.java
 
b/clients/src/main/java/org/apache/kafka/common/errors/StreamsInvalidTopologyException.java
new file mode 100644
index 00000000000..28a5c8ab77d
--- /dev/null
+++ 
b/clients/src/main/java/org/apache/kafka/common/errors/StreamsInvalidTopologyException.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+public class StreamsInvalidTopologyException extends ApiException {
+    public StreamsInvalidTopologyException(String message) {
+        super(message);
+    }
+}
diff --git 
a/clients/src/main/java/org/apache/kafka/common/errors/StreamsMissingInternalTopicsException.java
 
b/clients/src/main/java/org/apache/kafka/common/errors/StreamsMissingInternalTopicsException.java
new file mode 100644
index 00000000000..ffd0b63d8e3
--- /dev/null
+++ 
b/clients/src/main/java/org/apache/kafka/common/errors/StreamsMissingInternalTopicsException.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+public class StreamsMissingInternalTopicsException extends ApiException {
+    public StreamsMissingInternalTopicsException(String message) {
+        super(message);
+    }
+}
diff --git 
a/clients/src/main/java/org/apache/kafka/common/errors/StreamsMissingSourceTopicsException.java
 
b/clients/src/main/java/org/apache/kafka/common/errors/StreamsMissingSourceTopicsException.java
new file mode 100644
index 00000000000..6a347837701
--- /dev/null
+++ 
b/clients/src/main/java/org/apache/kafka/common/errors/StreamsMissingSourceTopicsException.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+public class StreamsMissingSourceTopicsException extends ApiException {
+    public StreamsMissingSourceTopicsException(String message) {
+        super(message);
+    }
+}
diff --git 
a/clients/src/main/java/org/apache/kafka/common/errors/StreamsShutdownApplicationException.java
 
b/clients/src/main/java/org/apache/kafka/common/errors/StreamsShutdownApplicationException.java
new file mode 100644
index 00000000000..e578c985a68
--- /dev/null
+++ 
b/clients/src/main/java/org/apache/kafka/common/errors/StreamsShutdownApplicationException.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+public class StreamsShutdownApplicationException extends ApiException {
+    public StreamsShutdownApplicationException(String message) {
+        super(message);
+    }
+}
diff --git 
a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java 
b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
index e95882be699..fa9600d8536 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
@@ -130,7 +130,10 @@ public enum ApiKeys {
     READ_SHARE_GROUP_STATE(ApiMessageType.READ_SHARE_GROUP_STATE, true),
     WRITE_SHARE_GROUP_STATE(ApiMessageType.WRITE_SHARE_GROUP_STATE, true),
     DELETE_SHARE_GROUP_STATE(ApiMessageType.DELETE_SHARE_GROUP_STATE, true),
-    
READ_SHARE_GROUP_STATE_SUMMARY(ApiMessageType.READ_SHARE_GROUP_STATE_SUMMARY, 
true);
+    
READ_SHARE_GROUP_STATE_SUMMARY(ApiMessageType.READ_SHARE_GROUP_STATE_SUMMARY, 
true),
+    STREAMS_HEARTBEAT(ApiMessageType.STREAMS_HEARTBEAT),
+    STREAMS_INITIALIZE(ApiMessageType.STREAMS_INITIALIZE);
+    
 
     private static final Map<ApiMessageType.ListenerType, EnumSet<ApiKeys>> 
APIS_BY_LISTENER =
         new EnumMap<>(ApiMessageType.ListenerType.class);
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java 
b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
index 309ae7bc86a..6246c4b4d14 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
@@ -20,6 +20,11 @@ import org.apache.kafka.common.InvalidRecordException;
 import org.apache.kafka.common.errors.ApiException;
 import org.apache.kafka.common.errors.BrokerIdNotRegisteredException;
 import org.apache.kafka.common.errors.BrokerNotAvailableException;
+import org.apache.kafka.common.errors.StreamsInconsistentTopologyException;
+import org.apache.kafka.common.errors.StreamsInvalidTopologyException;
+import org.apache.kafka.common.errors.StreamsMissingInternalTopicsException;
+import org.apache.kafka.common.errors.StreamsMissingSourceTopicsException;
+import org.apache.kafka.common.errors.StreamsShutdownApplicationException;
 import org.apache.kafka.common.errors.ClusterAuthorizationException;
 import org.apache.kafka.common.errors.ConcurrentTransactionsException;
 import org.apache.kafka.common.errors.ControllerMovedException;
@@ -413,7 +418,17 @@ public enum Errors {
     DUPLICATE_VOTER(126, "The voter is already part of the set of voters.", 
DuplicateVoterException::new),
     VOTER_NOT_FOUND(127, "The voter is not part of the set of voters.", 
VoterNotFoundException::new),
     INVALID_REGULAR_EXPRESSION(128, "The regular expression is not valid.", 
InvalidRegularExpression::new),
-    REBOOTSTRAP_REQUIRED(129, "Client metadata is stale, client should 
rebootstrap to obtain new metadata.", RebootstrapRequiredException::new);
+    REBOOTSTRAP_REQUIRED(129, "Client metadata is stale, client should 
rebootstrap to obtain new metadata.", RebootstrapRequiredException::new),
+    STREAMS_INVALID_TOPOLOGY(130, "The supplied topology is invalid.",
+        StreamsInvalidTopologyException::new),
+    STREAMS_INCONSISTENT_TOPOLOGY(131, "The topology hash supplied is 
inconsistent with the topology for this consumer group.",
+        StreamsInconsistentTopologyException::new),
+    STREAMS_MISSING_SOURCE_TOPICS(132, "One or more source topics are 
missing.",
+        StreamsMissingSourceTopicsException::new),
+    STREAMS_MISSING_INTERNAL_TOPICS(133, "One or more internal topics are 
missing.",
+        StreamsMissingInternalTopicsException::new),
+    STREAMS_SHUTDOWN_APPLICATION(134, "A client requested the shutdown of the 
whole application.",
+        StreamsShutdownApplicationException::new);
 
     private static final Logger log = LoggerFactory.getLogger(Errors.class);
 
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/StreamsHeartbeatRequest.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/StreamsHeartbeatRequest.java
new file mode 100644
index 00000000000..0e552934c4f
--- /dev/null
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/StreamsHeartbeatRequest.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.message.StreamsHeartbeatRequestData;
+import org.apache.kafka.common.message.StreamsHeartbeatResponseData;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.ByteBufferAccessor;
+import org.apache.kafka.common.protocol.Errors;
+
+import java.nio.ByteBuffer;
+
+public class StreamsHeartbeatRequest extends AbstractRequest {
+
+    /**
+     * A member epoch of <code>-1</code> means that the member wants to leave 
the group.
+     */
+    public static final int LEAVE_GROUP_MEMBER_EPOCH = -1;
+    public static final int LEAVE_GROUP_STATIC_MEMBER_EPOCH = -2;
+
+    /**
+     * A member epoch of <code>0</code> means that the member wants to join 
the group.
+     */
+    public static final int JOIN_GROUP_MEMBER_EPOCH = 0;
+
+    public static class Builder extends 
AbstractRequest.Builder<StreamsHeartbeatRequest> {
+        private final StreamsHeartbeatRequestData data;
+
+        public Builder(StreamsHeartbeatRequestData data) {
+            this(data, false);
+        }
+
+        public Builder(StreamsHeartbeatRequestData data, boolean 
enableUnstableLastVersion) {
+            super(ApiKeys.STREAMS_HEARTBEAT, enableUnstableLastVersion);
+            this.data = data;
+        }
+
+        @Override
+        public StreamsHeartbeatRequest build(short version) {
+            return new StreamsHeartbeatRequest(data, version);
+        }
+
+        @Override
+        public String toString() {
+            return data.toString();
+        }
+    }
+
+    private final StreamsHeartbeatRequestData data;
+
+    public StreamsHeartbeatRequest(StreamsHeartbeatRequestData data, short 
version) {
+        super(ApiKeys.STREAMS_HEARTBEAT, version);
+        this.data = data;
+    }
+
+    @Override
+    public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
+        return new StreamsHeartbeatResponse(
+            new StreamsHeartbeatResponseData()
+                .setThrottleTimeMs(throttleTimeMs)
+                .setErrorCode(Errors.forException(e).code())
+        );
+    }
+
+    @Override
+    public StreamsHeartbeatRequestData data() {
+        return data;
+    }
+
+    public static StreamsHeartbeatRequest parse(ByteBuffer buffer, short 
version) {
+        return new StreamsHeartbeatRequest(new StreamsHeartbeatRequestData(
+            new ByteBufferAccessor(buffer), version), version);
+    }
+}
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/StreamsHeartbeatResponse.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/StreamsHeartbeatResponse.java
new file mode 100644
index 00000000000..1e8c7a5b7e9
--- /dev/null
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/StreamsHeartbeatResponse.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.message.StreamsHeartbeatResponseData;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.ByteBufferAccessor;
+import org.apache.kafka.common.protocol.Errors;
+
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.Map;
+
+/**
+ * Possible error codes.
+ *
+ * - {@link Errors#GROUP_AUTHORIZATION_FAILED}
+ * - {@link Errors#NOT_COORDINATOR}
+ * - {@link Errors#COORDINATOR_NOT_AVAILABLE}
+ * - {@link Errors#COORDINATOR_LOAD_IN_PROGRESS}
+ * - {@link Errors#INVALID_REQUEST}
+ * - {@link Errors#UNKNOWN_MEMBER_ID}
+ * - {@link Errors#FENCED_MEMBER_EPOCH}
+ * - {@link Errors#UNSUPPORTED_ASSIGNOR}
+ * - {@link Errors#UNRELEASED_INSTANCE_ID}
+ * - {@link Errors#GROUP_MAX_SIZE_REACHED}
+ * - {@link Errors#STREAMS_SHUTDOWN_APPLICATION}
+ * - {@link Errors#STREAMS_INCONSISTENT_TOPOLOGY}
+ * - {@link Errors#STREAMS_MISSING_SOURCE_TOPICS}
+ * - {@link Errors#STREAMS_MISSING_INTERNAL_TOPICS}
+ */
+public class StreamsHeartbeatResponse extends AbstractResponse {
+    private final StreamsHeartbeatResponseData data;
+
+    public StreamsHeartbeatResponse(StreamsHeartbeatResponseData data) {
+        super(ApiKeys.STREAMS_HEARTBEAT);
+        this.data = data;
+    }
+
+    @Override
+    public StreamsHeartbeatResponseData data() {
+        return data;
+    }
+
+    @Override
+    public Map<Errors, Integer> errorCounts() {
+        return Collections.singletonMap(Errors.forCode(data.errorCode()), 1);
+    }
+
+    @Override
+    public int throttleTimeMs() {
+        return data.throttleTimeMs();
+    }
+
+    @Override
+    public void maybeSetThrottleTimeMs(int throttleTimeMs) {
+        data.setThrottleTimeMs(throttleTimeMs);
+    }
+
+    public static StreamsHeartbeatResponse parse(ByteBuffer buffer, short 
version) {
+        return new StreamsHeartbeatResponse(new StreamsHeartbeatResponseData(
+            new ByteBufferAccessor(buffer), version));
+    }
+}
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/StreamsInitializeRequest.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/StreamsInitializeRequest.java
new file mode 100644
index 00000000000..04072dd8565
--- /dev/null
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/StreamsInitializeRequest.java
@@ -0,0 +1,62 @@
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.message.StreamsInitializeRequestData;
+import org.apache.kafka.common.message.StreamsInitializeResponseData;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.common.protocol.ByteBufferAccessor;
+import org.apache.kafka.common.protocol.Errors;
+
+import java.nio.ByteBuffer;
+
+public class StreamsInitializeRequest extends AbstractRequest {
+
+    public static class Builder extends 
AbstractRequest.Builder<StreamsInitializeRequest> {
+        private final StreamsInitializeRequestData data;
+
+        public Builder(StreamsInitializeRequestData data) {
+            this(data, false);
+        }
+
+        public Builder(StreamsInitializeRequestData data, boolean 
enableUnstableLastVersion) {
+            super(ApiKeys.STREAMS_INITIALIZE, enableUnstableLastVersion);
+            this.data = data;
+        }
+
+        @Override
+        public StreamsInitializeRequest build(short version) {
+            return new StreamsInitializeRequest(data, version);
+        }
+
+        @Override
+        public String toString() {
+            return data.toString();
+        }
+    }
+
+    private final StreamsInitializeRequestData data;
+
+    public StreamsInitializeRequest(StreamsInitializeRequestData data, short 
version) {
+        super(ApiKeys.STREAMS_INITIALIZE, version); // same key the Builder uses
+        this.data = data;
+    }
+
+    @Override
+    public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
+        return new StreamsInitializeResponse(
+            new StreamsInitializeResponseData()
+                .setThrottleTimeMs(throttleTimeMs)
+                .setErrorCode(Errors.forException(e).code())
+        );
+    }
+
+    @Override
+    public StreamsInitializeRequestData data() {
+        return data;
+    }
+
+    public static StreamsInitializeRequest parse(ByteBuffer buffer, short 
version) {
+        return new StreamsInitializeRequest(new StreamsInitializeRequestData(
+            new ByteBufferAccessor(buffer), version), version);
+    }
+}
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/StreamsInitializeResponse.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/StreamsInitializeResponse.java
new file mode 100644
index 00000000000..7b7d19961de
--- /dev/null
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/StreamsInitializeResponse.java
@@ -0,0 +1,55 @@
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.message.StreamsInitializeResponseData;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.ByteBufferAccessor;
+import org.apache.kafka.common.protocol.Errors;
+
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.Map;
+
+/**
+ * Possible error codes.
+ *
+ * - {@link Errors#GROUP_AUTHORIZATION_FAILED}
+ * - {@link Errors#NOT_COORDINATOR}
+ * - {@link Errors#COORDINATOR_NOT_AVAILABLE}
+ * - {@link Errors#COORDINATOR_LOAD_IN_PROGRESS}
+ * - {@link Errors#INVALID_REQUEST}
+ * - {@link Errors#STREAMS_INVALID_TOPOLOGY}
+ */
+public class StreamsInitializeResponse extends AbstractResponse {
+
+    private final StreamsInitializeResponseData data;
+
+    public StreamsInitializeResponse(StreamsInitializeResponseData data) {
+        super(ApiKeys.STREAMS_INITIALIZE); // response of the StreamsInitialize RPC
+        this.data = data;
+    }
+
+    @Override
+    public StreamsInitializeResponseData data() {
+        return data;
+    }
+
+    @Override
+    public Map<Errors, Integer> errorCounts() {
+        return Collections.singletonMap(Errors.forCode(data.errorCode()), 1);
+    }
+
+    @Override
+    public int throttleTimeMs() {
+        return data.throttleTimeMs();
+    }
+
+    @Override
+    public void maybeSetThrottleTimeMs(int throttleTimeMs) {
+        data.setThrottleTimeMs(throttleTimeMs);
+    }
+
+    public static StreamsInitializeResponse parse(ByteBuffer buffer, short 
version) {
+        return new StreamsInitializeResponse(new StreamsInitializeResponseData(
+            new ByteBufferAccessor(buffer), version));
+    }
+}
diff --git 
a/clients/src/main/resources/common/message/StreamsHeartbeatRequest.json 
b/clients/src/main/resources/common/message/StreamsHeartbeatRequest.json
index a5f52e262f2..e21762e7e10 100644
--- a/clients/src/main/resources/common/message/StreamsHeartbeatRequest.json
+++ b/clients/src/main/resources/common/message/StreamsHeartbeatRequest.json
@@ -48,9 +48,16 @@
 
     { "name": "ProcessId", "type": "string", "versions": "0+", 
"nullableVersions": "0+", "default": "null",
       "about": "Identity of the streams instance that may have multiple 
consumers. Null if unchanged since last heartbeat." },
-    { "name": "UserEndPoint", "type": "bytes", "versions": "0+", 
"nullableVersions": "0+", "default": "null",
-      "about": "End-point a user can use for IQ. Null if unchanged since last 
heartbeat." },
-    { "name": "ClientTags", "type": "[]string", "versions": "0+", 
"nullableVersions": "0+", "default": "null",
+    { "name": "HostInfo", "type": "HostInfo", "versions": "0+", 
"nullableVersions": "0+", "default": "null",
+      "about": "Host information for running interactive queries on this 
instance. Null if unchanged since last heartbeat.",
+      "fields": [
+        { "name": "Host", "type": "string", "versions": "0+",
+          "about": "Host for running interactive queries on this instance." },
+        { "name": "Port", "type": "int32", "versions": "0+",
+          "about": "Port for running interactive queries on this instance." }
+      ]
+    },
+    { "name": "ClientTags", "type": "[]KeyValue", "versions": "0+", 
"nullableVersions": "0+", "default": "null",
       "about": "Used for rack-aware assignment algorithm. Null if unchanged 
since last heartbeat." },
 
     { "name": "TaskLagUpdateReason", "type":  "int8", "versions": "0+", 
"default": 0,
@@ -64,15 +71,11 @@
           "about": "The cumulative lag for the task" }
       ]
     },
-    { "name": "AssignmentConfigs", "type": "[]AssignmentConfig", "versions": 
"0+","nullableVersions": "0+", "default": "null",
-      "about": "Generic array of assignment configuration strings. Null if 
unchanged since last heartbeat.",
-      "fields": [
-        { "name": "Key", "type": "string", "versions": "0+",
-          "about": "key of the config" },
-        { "name": "Value", "type": "string", "versions": "0+",
-          "about": "value of the config" }
-      ]
-    }
+    { "name": "AssignmentConfigs", "type": "[]KeyValue", "versions": "0+", 
"nullableVersions": "0+", "default": "null",
+      "about": "Generic array of assignment configuration strings. Null if 
unchanged since last heartbeat."
+    },
+    { "name": "ShutdownApplication", "type": "bool", "versions": "0+", 
"default": false,
+      "about": "Whether all Streams clients in the group should shut down." }
   ],
   "commonStructs": [
     { "name": "TaskId", "versions": "0+", "fields": [
@@ -82,6 +85,14 @@
         "about": "subtopology ID" },
       { "name": "Partitions", "type": "[]int32", "versions": "0+",
         "about": "partitions" }
-    ]}
+    ]},
+    { "name": "KeyValue", "versions": "0+",
+      "fields": [
+        { "name": "Key", "type": "string", "versions": "0+",
+          "about": "key of the config" },
+        { "name": "Value", "type": "string", "versions": "0+",
+          "about": "value of the config" }
+      ]
+    }
   ]
 }
diff --git 
a/clients/src/main/resources/common/message/StreamsHeartbeatResponse.json 
b/clients/src/main/resources/common/message/StreamsHeartbeatResponse.json
index 0ab1b0f0ea0..272a5a4cd51 100644
--- a/clients/src/main/resources/common/message/StreamsHeartbeatResponse.json
+++ b/clients/src/main/resources/common/message/StreamsHeartbeatResponse.json
@@ -30,11 +30,10 @@
   // - UNSUPPORTED_ASSIGNOR (version 0+)
   // - UNRELEASED_INSTANCE_ID (version 0+)
   // - GROUP_MAX_SIZE_REACHED (version 0+)
-
-  // Streams-specific errors:
-  // - INCONSISTENT_TOPOLOGY (version 0+)
-  // - MISSING_SOURCE_TOPICS (version 0+)
-  // - MISSING_INTERNAL_TOPICS (version 0+)
+  // - STREAMS_INCONSISTENT_TOPOLOGY (version 0+)
+  // - STREAMS_MISSING_SOURCE_TOPICS (version 0+)
+  // - STREAMS_MISSING_INTERNAL_TOPICS (version 0+)
+  // - STREAMS_SHUTDOWN_APPLICATION (version 0+)
   "fields": [
     // Same as consumer group heart beat
     { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
diff --git 
a/clients/src/main/resources/common/message/StreamsInitializeRequest.json 
b/clients/src/main/resources/common/message/StreamsInitializeRequest.json
new file mode 100644
index 00000000000..c031ba5a11f
--- /dev/null
+++ b/clients/src/main/resources/common/message/StreamsInitializeRequest.json
@@ -0,0 +1,64 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "apiKey": 77,
+  "type": "request",
+  "listeners": ["zkBroker", "broker"],
+  "name": "StreamsInitializeRequest",
+  "validVersions": "0",
+  "flexibleVersions": "0+",
+  "fields": [
+    { "name": "GroupId", "type": "string", "versions": "0+", "entityType": 
"groupId",
+      "about": "The group identifier." },
+    { "name":  "Topology", "type": "[]Subtopology", "versions": "0+",
+      "about": "The sub-topologies of the streams application.",
+      "fields": [
+        // We are using strings here, to avoid fixing a structure on task IDs 
which will
+        // allow introducing stable naming for subtopologies in the future.
+        { "name": "Subtopology", "type": "string", "versions": "0+",
+          "about": "String to uniquely identify the sub-topology. Deterministically 
generated from the topology." },
+        { "name": "Partitions", "type": "int32", "versions": "0+",
+          "about": "The number of partitions expected for all source topics 
and changelog topics." },
+        { "name": "SourceTopics", "type": "[]string", "versions": "0+",
+          "about": "The topics the topology reads from." },
+        { "name": "SinkTopics", "type": "[]string", "versions": "0+",
+          "about": "The topics the topology writes to." },
+        { "name": "StateChangelogTopics", "type": "[]TopicInfo", "versions": 
"0+",
+          "about": "The set of state changelog topics associated with this 
subtopology. Created automatically." },
+        { "name": "RepartitionSourceTopics", "type": "[]TopicInfo", 
"versions": "0+",
+          "about": "The set of source topics that are internally created 
repartition topics. Created automatically." }
+      ]
+    }
+  ],
+  "commonStructs": [
+    { "name": "TopicConfig", "versions": "0+", "fields": [
+        { "name": "key", "type": "string", "versions": "0+",
+          "about": "The key of the topic-level configuration." },
+        { "name": "value", "type": "string", "versions": "0+",
+          "about": "The value of the topic-level configuration." }
+      ]
+    },
+    { "name": "TopicInfo", "versions": "0+", "fields": [
+      { "name": "Name", "type": "string", "versions": "0+",
+        "about": "The name of the topic." },
+      { "name": "Partitions", "type": "int32", "versions": "0+",
+        "about": "The number of partitions in the topic. Can be 0 if no 
specific number of partitions is enforced. Always 0 for changelog topics." },
+      { "name": "TopicConfigs", "type": "[]TopicConfig", "versions": "0+", 
"nullableVersions": "0+", "default": "null",
+        "about": "Topic-level configurations as key-value pairs."
+      }
+    ]}
+  ]
+}
diff --git 
a/clients/src/main/resources/common/message/StreamsInitializeResponse.json 
b/clients/src/main/resources/common/message/StreamsInitializeResponse.json
new file mode 100644
index 00000000000..ff73577303e
--- /dev/null
+++ b/clients/src/main/resources/common/message/StreamsInitializeResponse.json
@@ -0,0 +1,37 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "apiKey": 77,
+  "type": "response",
+  "name": "StreamsInitializeResponse",
+  "validVersions": "0",
+  "flexibleVersions": "0+",
+  // Supported errors:
+  // - GROUP_AUTHORIZATION_FAILED (version 0+)
+  // - NOT_COORDINATOR (version 0+)
+  // - COORDINATOR_NOT_AVAILABLE (version 0+)
+  // - COORDINATOR_LOAD_IN_PROGRESS (version 0+)
+  // - INVALID_REQUEST (version 0+)
+  // - STREAMS_INVALID_TOPOLOGY (version 0+)
+  "fields": [
+    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
+      "about": "The duration in milliseconds for which the request was 
throttled due to a quota violation, or zero if the request did not violate any 
quota." },
+    { "name": "ErrorCode", "type": "int16", "versions": "0+",
+      "about": "The top-level error code, or 0 if there was no error" },
+    { "name": "ErrorMessage", "type": "string", "versions": "0+", 
"nullableVersions": "0+", "default": "null",
+      "about": "The top-level error message, or null if there was no error." }
+  ]
+}

Reply via email to