This is an automated email from the ASF dual-hosted git repository. cadonna pushed a commit to branch kip1071 in repository https://gitbox.apache.org/repos/asf/kafka.git
commit f8ecdc9a716165d2c541a27ed3953d6118928380 Author: Lucas Brutschy <[email protected]> AuthorDate: Fri Apr 26 13:12:15 2024 +0200 Reslove merge conflict from 11/25 trunk rebase - StreamsInitialize RPC and some errors Initial RPCs for StreamsInitialize. See https://github.com/lucasbru/kafka/pull/7 --- .../StreamsInconsistentTopologyException.java | 23 ++++++ .../errors/StreamsInvalidTopologyException.java | 23 ++++++ .../StreamsMissingInternalTopicsException.java | 23 ++++++ .../StreamsMissingSourceTopicsException.java | 23 ++++++ .../StreamsShutdownApplicationException.java | 23 ++++++ .../org/apache/kafka/common/protocol/ApiKeys.java | 5 +- .../org/apache/kafka/common/protocol/Errors.java | 17 ++++- .../common/requests/StreamsHeartbeatRequest.java | 88 ++++++++++++++++++++++ .../common/requests/StreamsHeartbeatResponse.java | 78 +++++++++++++++++++ .../common/requests/StreamsInitializeRequest.java | 62 +++++++++++++++ .../common/requests/StreamsInitializeResponse.java | 55 ++++++++++++++ .../common/message/StreamsHeartbeatRequest.json | 37 +++++---- .../common/message/StreamsHeartbeatResponse.json | 9 +-- .../common/message/StreamsInitializeRequest.json | 64 ++++++++++++++++ .../common/message/StreamsInitializeResponse.json | 37 +++++++++ 15 files changed, 547 insertions(+), 20 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/errors/StreamsInconsistentTopologyException.java b/clients/src/main/java/org/apache/kafka/common/errors/StreamsInconsistentTopologyException.java new file mode 100644 index 00000000000..d5d590ceb10 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/errors/StreamsInconsistentTopologyException.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.errors; + +public class StreamsInconsistentTopologyException extends ApiException { + public StreamsInconsistentTopologyException(String message) { + super(message); + } +} diff --git a/clients/src/main/java/org/apache/kafka/common/errors/StreamsInvalidTopologyException.java b/clients/src/main/java/org/apache/kafka/common/errors/StreamsInvalidTopologyException.java new file mode 100644 index 00000000000..28a5c8ab77d --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/errors/StreamsInvalidTopologyException.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.errors; + +public class StreamsInvalidTopologyException extends ApiException { + public StreamsInvalidTopologyException(String message) { + super(message); + } +} diff --git a/clients/src/main/java/org/apache/kafka/common/errors/StreamsMissingInternalTopicsException.java b/clients/src/main/java/org/apache/kafka/common/errors/StreamsMissingInternalTopicsException.java new file mode 100644 index 00000000000..ffd0b63d8e3 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/errors/StreamsMissingInternalTopicsException.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.errors; + +public class StreamsMissingInternalTopicsException extends ApiException { + public StreamsMissingInternalTopicsException(String message) { + super(message); + } +} diff --git a/clients/src/main/java/org/apache/kafka/common/errors/StreamsMissingSourceTopicsException.java b/clients/src/main/java/org/apache/kafka/common/errors/StreamsMissingSourceTopicsException.java new file mode 100644 index 00000000000..6a347837701 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/errors/StreamsMissingSourceTopicsException.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.errors; + +public class StreamsMissingSourceTopicsException extends ApiException { + public StreamsMissingSourceTopicsException(String message) { + super(message); + } +} diff --git a/clients/src/main/java/org/apache/kafka/common/errors/StreamsShutdownApplicationException.java b/clients/src/main/java/org/apache/kafka/common/errors/StreamsShutdownApplicationException.java new file mode 100644 index 00000000000..e578c985a68 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/errors/StreamsShutdownApplicationException.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.errors; + +public class StreamsShutdownApplicationException extends ApiException { + public StreamsShutdownApplicationException(String message) { + super(message); + } +} diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java index e95882be699..fa9600d8536 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java @@ -130,7 +130,10 @@ public enum ApiKeys { READ_SHARE_GROUP_STATE(ApiMessageType.READ_SHARE_GROUP_STATE, true), WRITE_SHARE_GROUP_STATE(ApiMessageType.WRITE_SHARE_GROUP_STATE, true), DELETE_SHARE_GROUP_STATE(ApiMessageType.DELETE_SHARE_GROUP_STATE, true), - READ_SHARE_GROUP_STATE_SUMMARY(ApiMessageType.READ_SHARE_GROUP_STATE_SUMMARY, true); + READ_SHARE_GROUP_STATE_SUMMARY(ApiMessageType.READ_SHARE_GROUP_STATE_SUMMARY, true), + STREAMS_HEARTBEAT(ApiMessageType.STREAMS_HEARTBEAT), + STREAMS_INITIALIZE(ApiMessageType.STREAMS_INITIALIZE); + private static final Map<ApiMessageType.ListenerType, EnumSet<ApiKeys>> APIS_BY_LISTENER = new EnumMap<>(ApiMessageType.ListenerType.class); diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java index 309ae7bc86a..6246c4b4d14 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java @@ -20,6 +20,11 @@ import org.apache.kafka.common.InvalidRecordException; import org.apache.kafka.common.errors.ApiException; import org.apache.kafka.common.errors.BrokerIdNotRegisteredException; import org.apache.kafka.common.errors.BrokerNotAvailableException; +import org.apache.kafka.common.errors.StreamsInconsistentTopologyException; +import org.apache.kafka.common.errors.StreamsInvalidTopologyException; +import org.apache.kafka.common.errors.StreamsMissingInternalTopicsException; +import org.apache.kafka.common.errors.StreamsMissingSourceTopicsException; +import org.apache.kafka.common.errors.StreamsShutdownApplicationException; import org.apache.kafka.common.errors.ClusterAuthorizationException; import org.apache.kafka.common.errors.ConcurrentTransactionsException; import org.apache.kafka.common.errors.ControllerMovedException; @@ -413,7 +418,17 @@ public enum Errors { DUPLICATE_VOTER(126, "The voter is already part of the set of voters.", DuplicateVoterException::new), VOTER_NOT_FOUND(127, "The voter is not part of the set of voters.", VoterNotFoundException::new), INVALID_REGULAR_EXPRESSION(128, "The regular expression is not valid.", InvalidRegularExpression::new), - REBOOTSTRAP_REQUIRED(129, "Client metadata is stale, client should rebootstrap to obtain new metadata.", RebootstrapRequiredException::new); + REBOOTSTRAP_REQUIRED(129, "Client metadata is stale, client should rebootstrap to obtain new metadata.", RebootstrapRequiredException::new), + STREAMS_INVALID_TOPOLOGY(130, "The supplied topology is invalid.", + StreamsInvalidTopologyException::new), + STREAMS_INCONSISTENT_TOPOLOGY(131, "The topology hash supplied is inconsistent with the topology for this consumer group.", + StreamsInconsistentTopologyException::new), + STREAMS_MISSING_SOURCE_TOPICS(132, "One or more source topics are missing.", + StreamsMissingSourceTopicsException::new), + STREAMS_MISSING_INTERNAL_TOPICS(133, "One or more internal topics are missing.", + StreamsMissingInternalTopicsException::new), + STREAMS_SHUTDOWN_APPLICATION(134, "A client requested the shutdown of the whole application.", + StreamsShutdownApplicationException::new); private static final Logger log = LoggerFactory.getLogger(Errors.class); diff --git a/clients/src/main/java/org/apache/kafka/common/requests/StreamsHeartbeatRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/StreamsHeartbeatRequest.java new file mode 100644 index 00000000000..0e552934c4f --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/requests/StreamsHeartbeatRequest.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.requests; + +import org.apache.kafka.common.message.StreamsHeartbeatRequestData; +import org.apache.kafka.common.message.StreamsHeartbeatResponseData; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.ByteBufferAccessor; +import org.apache.kafka.common.protocol.Errors; + +import java.nio.ByteBuffer; + +public class StreamsHeartbeatRequest extends AbstractRequest { + + /** + * A member epoch of <code>-1</code> means that the member wants to leave the group. + */ + public static final int LEAVE_GROUP_MEMBER_EPOCH = -1; + public static final int LEAVE_GROUP_STATIC_MEMBER_EPOCH = -2; + + /** + * A member epoch of <code>0</code> means that the member wants to join the group. + */ + public static final int JOIN_GROUP_MEMBER_EPOCH = 0; + + public static class Builder extends AbstractRequest.Builder<StreamsHeartbeatRequest> { + private final StreamsHeartbeatRequestData data; + + public Builder(StreamsHeartbeatRequestData data) { + this(data, false); + } + + public Builder(StreamsHeartbeatRequestData data, boolean enableUnstableLastVersion) { + super(ApiKeys.STREAMS_HEARTBEAT, enableUnstableLastVersion); + this.data = data; + } + + @Override + public StreamsHeartbeatRequest build(short version) { + return new StreamsHeartbeatRequest(data, version); + } + + @Override + public String toString() { + return data.toString(); + } + } + + private final StreamsHeartbeatRequestData data; + + public StreamsHeartbeatRequest(StreamsHeartbeatRequestData data, short version) { + super(ApiKeys.STREAMS_HEARTBEAT, version); + this.data = data; + } + + @Override + public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) { + return new StreamsHeartbeatResponse( + new StreamsHeartbeatResponseData() + .setThrottleTimeMs(throttleTimeMs) + .setErrorCode(Errors.forException(e).code()) + ); + } + + @Override + public StreamsHeartbeatRequestData data() { + return data; + } + + public static StreamsHeartbeatRequest parse(ByteBuffer buffer, short version) { + return new StreamsHeartbeatRequest(new StreamsHeartbeatRequestData( + new ByteBufferAccessor(buffer), version), version); + } +} diff --git a/clients/src/main/java/org/apache/kafka/common/requests/StreamsHeartbeatResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/StreamsHeartbeatResponse.java new file mode 100644 index 00000000000..1e8c7a5b7e9 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/requests/StreamsHeartbeatResponse.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.requests; + +import org.apache.kafka.common.message.StreamsHeartbeatResponseData; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.ByteBufferAccessor; +import org.apache.kafka.common.protocol.Errors; + +import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.Map; + +/** + * Possible error codes. + * + * - {@link Errors#GROUP_AUTHORIZATION_FAILED} + * - {@link Errors#NOT_COORDINATOR} + * - {@link Errors#COORDINATOR_NOT_AVAILABLE} + * - {@link Errors#COORDINATOR_LOAD_IN_PROGRESS} + * - {@link Errors#INVALID_REQUEST} + * - {@link Errors#UNKNOWN_MEMBER_ID} + * - {@link Errors#FENCED_MEMBER_EPOCH} + * - {@link Errors#UNSUPPORTED_ASSIGNOR} + * - {@link Errors#UNRELEASED_INSTANCE_ID} + * - {@link Errors#GROUP_MAX_SIZE_REACHED} + * - {@link Errors#STREAMS_SHUTDOWN_APPLICATION} + * - {@link Errors#STREAMS_INCONSISTENT_TOPOLOGY} + * - {@link Errors#STREAMS_MISSING_SOURCE_TOPICS} + * - {@link Errors#STREAMS_MISSING_INTERNAL_TOPICS} + */ +public class StreamsHeartbeatResponse extends AbstractResponse { + private final StreamsHeartbeatResponseData data; + + public StreamsHeartbeatResponse(StreamsHeartbeatResponseData data) { + super(ApiKeys.STREAMS_HEARTBEAT); + this.data = data; + } + + @Override + public StreamsHeartbeatResponseData data() { + return data; + } + + @Override + public Map<Errors, Integer> errorCounts() { + return Collections.singletonMap(Errors.forCode(data.errorCode()), 1); + } + + @Override + public int throttleTimeMs() { + return data.throttleTimeMs(); + } + + @Override + public void maybeSetThrottleTimeMs(int throttleTimeMs) { + data.setThrottleTimeMs(throttleTimeMs); + } + + public static StreamsHeartbeatResponse parse(ByteBuffer buffer, short version) { + return new StreamsHeartbeatResponse(new StreamsHeartbeatResponseData( + new ByteBufferAccessor(buffer), version)); + } +} diff --git a/clients/src/main/java/org/apache/kafka/common/requests/StreamsInitializeRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/StreamsInitializeRequest.java new file mode 100644 index 00000000000..04072dd8565 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/requests/StreamsInitializeRequest.java @@ -0,0 +1,62 @@ +package org.apache.kafka.common.requests; + +import org.apache.kafka.common.message.StreamsInitializeRequestData; +import org.apache.kafka.common.message.StreamsInitializeResponseData; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.ApiMessage; +import org.apache.kafka.common.protocol.ByteBufferAccessor; +import org.apache.kafka.common.protocol.Errors; + +import java.nio.ByteBuffer; + +public class StreamsInitializeRequest extends AbstractRequest { + + public static class Builder extends AbstractRequest.Builder<StreamsInitializeRequest> { + private final StreamsInitializeRequestData data; + + public Builder(StreamsInitializeRequestData data) { + this(data, false); + } + + public Builder(StreamsInitializeRequestData data, boolean enableUnstableLastVersion) { + super(ApiKeys.STREAMS_INITIALIZE, enableUnstableLastVersion); + this.data = data; + } + + @Override + public StreamsInitializeRequest build(short version) { + return new StreamsInitializeRequest(data, version); + } + + @Override + public String toString() { + return data.toString(); + } + } + + private final StreamsInitializeRequestData data; + + public StreamsInitializeRequest(StreamsInitializeRequestData data, short version) { + super(ApiKeys.STREAMS_HEARTBEAT, version); + this.data = data; + } + + @Override + public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) { + return new StreamsInitializeResponse( + new StreamsInitializeResponseData() + .setThrottleTimeMs(throttleTimeMs) + .setErrorCode(Errors.forException(e).code()) + ); + } + + @Override + public StreamsInitializeRequestData data() { + return data; + } + + public static StreamsInitializeRequest parse(ByteBuffer buffer, short version) { + return new StreamsInitializeRequest(new StreamsInitializeRequestData( + new ByteBufferAccessor(buffer), version), version); + } +} diff --git a/clients/src/main/java/org/apache/kafka/common/requests/StreamsInitializeResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/StreamsInitializeResponse.java new file mode 100644 index 00000000000..7b7d19961de --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/requests/StreamsInitializeResponse.java @@ -0,0 +1,55 @@ +package org.apache.kafka.common.requests; + +import org.apache.kafka.common.message.StreamsInitializeResponseData; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.ByteBufferAccessor; +import org.apache.kafka.common.protocol.Errors; + +import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.Map; + +/** + * Possible error codes. + * + * - {@link Errors#GROUP_AUTHORIZATION_FAILED} + * - {@link Errors#NOT_COORDINATOR} + * - {@link Errors#COORDINATOR_NOT_AVAILABLE} + * - {@link Errors#COORDINATOR_LOAD_IN_PROGRESS} + * - {@link Errors#INVALID_REQUEST} + * - {@link Errors#STREAMS_INVALID_TOPOLOGY} + */ +public class StreamsInitializeResponse extends AbstractResponse { + + private final StreamsInitializeResponseData data; + + public StreamsInitializeResponse(StreamsInitializeResponseData data) { + super(ApiKeys.STREAMS_HEARTBEAT); + this.data = data; + } + + @Override + public StreamsInitializeResponseData data() { + return data; + } + + @Override + public Map<Errors, Integer> errorCounts() { + return Collections.singletonMap(Errors.forCode(data.errorCode()), 1); + } + + @Override + public int throttleTimeMs() { + return data.throttleTimeMs(); + } + + @Override + public void maybeSetThrottleTimeMs(int throttleTimeMs) { + data.setThrottleTimeMs(throttleTimeMs); + } + + public static StreamsInitializeResponse parse(ByteBuffer buffer, short version) { + return new StreamsInitializeResponse(new StreamsInitializeResponseData( + new ByteBufferAccessor(buffer), version)); + } +} diff --git a/clients/src/main/resources/common/message/StreamsHeartbeatRequest.json b/clients/src/main/resources/common/message/StreamsHeartbeatRequest.json index a5f52e262f2..e21762e7e10 100644 --- a/clients/src/main/resources/common/message/StreamsHeartbeatRequest.json +++ b/clients/src/main/resources/common/message/StreamsHeartbeatRequest.json @@ -48,9 +48,16 @@ { "name": "ProcessId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null", "about": "Identity of the streams instance that may have multiple consumers. Null if unchanged since last heartbeat." }, - { "name": "UserEndPoint", "type": "bytes", "versions": "0+", "nullableVersions": "0+", "default": "null", - "about": "End-point a user can use for IQ. Null if unchanged since last heartbeat." }, - { "name": "ClientTags", "type": "[]string", "versions": "0+", "nullableVersions": "0+", "default": "null", + { "name": "HostInfo", "type": "HostInfo", "versions": "0+", "nullableVersions": "0+", "default": "null", + "about": "Host information for running interactive queries on this instance. Null if unchanged since last heartbeat.", + "fields": [ + { "name": "Host", "type": "string", "versions": "0+", + "about": "Host for running interactive queries on this instance." }, + { "name": "Port", "type": "int32", "versions": "0+", + "about": "Port for running interactive queries on this instance." } + ] + }, + { "name": "ClientTags", "type": "[]KeyValue", "versions": "0+", "nullableVersions": "0+", "default": "null", "about": "Used for rack-aware assignment algorithm. Null if unchanged since last heartbeat." }, { "name": "TaskLagUpdateReason", "type": "int8", "versions": "0+", "default": 0, @@ -64,15 +71,11 @@ "about": "The cumulative lag for the task" } ] }, - { "name": "AssignmentConfigs", "type": "[]AssignmentConfig", "versions": "0+","nullableVersions": "0+", "default": "null", - "about": "Generic array of assignment configuration strings. Null if unchanged since last heartbeat.", - "fields": [ - { "name": "Key", "type": "string", "versions": "0+", - "about": "key of the config" }, - { "name": "Value", "type": "string", "versions": "0+", - "about": "value of the config" } - ] - } + { "name": "AssignmentConfigs", "type": "[]KeyValue", "versions": "0+", "nullableVersions": "0+", "default": "null", + "about": "Generic array of assignment configuration strings. Null if unchanged since last heartbeat." + }, + { "name": "ShutdownApplication", "type": "bool", "versions": "0+", "default": false, + "about": "Whether all Streams clients in the group should shut down." } ], "commonStructs": [ { "name": "TaskId", "versions": "0+", "fields": [ @@ -82,6 +85,14 @@ "about": "subtopology ID" }, { "name": "Partitions", "type": "[]int32", "versions": "0+", "about": "partitions" } - ]} + ]}, + { "name": "KeyValue", "versions": "0+", + "fields": [ + { "name": "Key", "type": "string", "versions": "0+", + "about": "key of the config" }, + { "name": "Value", "type": "string", "versions": "0+", + "about": "value of the config" } + ] + } ] } diff --git a/clients/src/main/resources/common/message/StreamsHeartbeatResponse.json b/clients/src/main/resources/common/message/StreamsHeartbeatResponse.json index 0ab1b0f0ea0..272a5a4cd51 100644 --- a/clients/src/main/resources/common/message/StreamsHeartbeatResponse.json +++ b/clients/src/main/resources/common/message/StreamsHeartbeatResponse.json @@ -30,11 +30,10 @@ // - UNSUPPORTED_ASSIGNOR (version 0+) // - UNRELEASED_INSTANCE_ID (version 0+) // - GROUP_MAX_SIZE_REACHED (version 0+) - - // Streams-specific errors: - // - INCONSISTENT_TOPOLOGY (version 0+) - // - MISSING_SOURCE_TOPICS (version 0+) - // - MISSING_INTERNAL_TOPICS (version 0+) + // - STREAMS_INCONSISTENT_TOPOLOGY (version 0+) + // - STREAMS_MISSING_SOURCE_TOPICS (version 0+) + // - STREAMS_MISSING_INTERNAL_TOPICS (version 0+) + // - STREAMS_SHUTDOWN_APPLICATION (version 0+) "fields": [ // Same as consumer group heart beat { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", diff --git a/clients/src/main/resources/common/message/StreamsInitializeRequest.json b/clients/src/main/resources/common/message/StreamsInitializeRequest.json new file mode 100644 index 00000000000..c031ba5a11f --- /dev/null +++ b/clients/src/main/resources/common/message/StreamsInitializeRequest.json @@ -0,0 +1,64 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +{ + "apiKey": 77, + "type": "request", + "listeners": ["zkBroker", "broker"], + "name": "StreamsInitializeRequest", + "validVersions": "0", + "flexibleVersions": "0+", + "fields": [ + { "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId", + "about": "The group identifier." }, + { "name": "Topology", "type": "[]Subtopology", "versions": "0+", + "about": "The sub-topologies of the streams application.", + "fields": [ + // We are using strings here, to avoid fixing a structure on task IDs which will + // allow introducing stable naming for subtopologies in the future. + { "name": "Subtopology", "type": "string", "versions": "0+", + "about": "String to uniquely the sub-topology. Deterministically generated from the topology" }, + { "name": "Partitions", "type": "int32", "versions": "0+", + "about": "The number of partitions expected for all source topics and changelog topics." }, + { "name": "SourceTopics", "type": "[]string", "versions": "0+", + "about": "The topics the topology reads from." }, + { "name": "SinkTopics", "type": "[]string", "versions": "0+", + "about": "The topics the topology writes to." }, + { "name": "StateChangelogTopics", "type": "[]TopicInfo", "versions": "0+", + "about": "The set of state changelog topics associated with this subtopology. Created automatically." }, + { "name": "RepartitionSourceTopics", "type": "[]TopicInfo", "versions": "0+", + "about": "The set of source topics that are internally created repartition topics. Created automatically." } + ] + } + ], + "commonStructs": [ + { "name": "TopicConfig", "versions": "0+", "fields": [ + { "name": "key", "type": "string", "versions": "0+", + "about": "The key of the topic-level configuration." }, + { "name": "value", "type": "string", "versions": "0+", + "about": "The value of the topic-level configuration," } + ] + }, + { "name": "TopicInfo", "versions": "0+", "fields": [ + { "name": "Name", "type": "string", "versions": "0+", + "about": "The name of the topic." }, + { "name": "Partitions", "type": "int32", "versions": "0+", + "about": "The number of partitions in the topic. Can be 0 if no specific number of partitions is enforced. Always 0 for changelog topics." }, + { "name": "TopicConfigs", "type": "[]TopicConfig", "versions": "0+", "nullableVersions": "0+", "default": "null", + "about": "Topic-level configurations as key-value pairs." + } + ]} + ] +} diff --git a/clients/src/main/resources/common/message/StreamsInitializeResponse.json b/clients/src/main/resources/common/message/StreamsInitializeResponse.json new file mode 100644 index 00000000000..ff73577303e --- /dev/null +++ b/clients/src/main/resources/common/message/StreamsInitializeResponse.json @@ -0,0 +1,37 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +{ + "apiKey": 77, + "type": "response", + "name": "StreamsInitializeResponse", + "validVersions": "0", + "flexibleVersions": "0+", + // Supported errors: + // - GROUP_AUTHORIZATION_FAILED (version 0+) + // - NOT_COORDINATOR (version 0+) + // - COORDINATOR_NOT_AVAILABLE (version 0+) + // - COORDINATOR_LOAD_IN_PROGRESS (version 0+) + // - INVALID_REQUEST (version 0+) + // - STREAMS_INVALID_TOPOLOGY (version 0+) + "fields": [ + { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", + "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." }, + { "name": "ErrorCode", "type": "int16", "versions": "0+", + "about": "The top-level error code, or 0 if there was no error" }, + { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null", + "about": "The top-level error message, or null if there was no error." } + ] +}
