This is an automated email from the ASF dual-hosted git repository. cadonna pushed a commit to branch kip1071 in repository https://gitbox.apache.org/repos/asf/kafka.git
commit 286314d72aefe139192e2b28b7351a75872079b0 Author: Lucas Brutschy <[email protected]> AuthorDate: Wed May 29 16:33:54 2024 +0200 Define initial record types for the consumer offset topic The goal is to define initial record types for storing group metadata. The aim is to roughly match the data represented in the RPCs and the KIP-848 records. There is an extra record for the topology that is written when StreamsInitialize is called. See https://github.com/lucasbru/kafka/pull/14 target assignment and topology current assignment member metadata names and version initial copy --- .../internals/StreamsInitializeRequestManager.java | 2 +- .../common/message/StreamsInitializeRequest.json | 6 +- .../StreamsGroupCurrentMemberAssignmentKey.json | 28 +++++++++ .../StreamsGroupCurrentMemberAssignmentValue.json | 46 +++++++++++++++ .../message/StreamsGroupMemberMetadataKey.json | 28 +++++++++ .../message/StreamsGroupMemberMetadataValue.json | 69 ++++++++++++++++++++++ .../common/message/StreamsGroupMetadataKey.json | 26 ++++++++ .../common/message/StreamsGroupMetadataValue.json | 26 ++++++++ .../message/StreamsGroupPartitionMetadataKey.json | 26 ++++++++ .../StreamsGroupPartitionMetadataValue.json | 40 +++++++++++++ .../StreamsGroupTargetAssignmentMemberKey.json | 28 +++++++++ .../StreamsGroupTargetAssignmentMemberValue.json | 38 ++++++++++++ .../StreamsGroupTargetAssignmentMetadataKey.json | 26 ++++++++ .../StreamsGroupTargetAssignmentMetadataValue.json | 26 ++++++++ .../common/message/StreamsGroupTopologyKey.json | 26 ++++++++ .../common/message/StreamsGroupTopologyValue.json | 31 ++++------ 16 files changed, 448 insertions(+), 24 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsInitializeRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsInitializeRequestManager.java index 5d19d8a8e50..2bc4f79f548 100644 --- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsInitializeRequestManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsInitializeRequestManager.java @@ -122,7 +122,7 @@ public class StreamsInitializeRequestManager implements RequestManager { logger.error("Error during Streams initialization: ", exception); } else { // todo: handle success - logger.info("Streams initialization successful", exception); + logger.info("Streams initialization successful"); } } } diff --git a/clients/src/main/resources/common/message/StreamsInitializeRequest.json b/clients/src/main/resources/common/message/StreamsInitializeRequest.json index c031ba5a11f..7f18cd455d0 100644 --- a/clients/src/main/resources/common/message/StreamsInitializeRequest.json +++ b/clients/src/main/resources/common/message/StreamsInitializeRequest.json @@ -26,12 +26,8 @@ { "name": "Topology", "type": "[]Subtopology", "versions": "0+", "about": "The sub-topologies of the streams application.", "fields": [ - // We are using strings here, to avoid fixing a structure on task IDs which will - // allow introducing stable naming for subtopologies in the future. { "name": "Subtopology", "type": "string", "versions": "0+", - "about": "String to uniquely the sub-topology. Deterministically generated from the topology" }, - { "name": "Partitions", "type": "int32", "versions": "0+", - "about": "The number of partitions expected for all source topics and changelog topics." }, + "about": "String to uniquely identify the subtopology." }, { "name": "SourceTopics", "type": "[]string", "versions": "0+", "about": "The topics the topology reads from." 
}, { "name": "SinkTopics", "type": "[]string", "versions": "0+", diff --git a/group-coordinator/src/main/resources/common/message/StreamsGroupCurrentMemberAssignmentKey.json b/group-coordinator/src/main/resources/common/message/StreamsGroupCurrentMemberAssignmentKey.json new file mode 100644 index 00000000000..6d8cb6b906c --- /dev/null +++ b/group-coordinator/src/main/resources/common/message/StreamsGroupCurrentMemberAssignmentKey.json @@ -0,0 +1,28 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// The streams rebalance protocol is in development. This schema is subject to non-backwards-compatible changes. +{ + "type": "data", + "name": "StreamsGroupCurrentMemberAssignmentKey", + "validVersions": "14", + "flexibleVersions": "none", + "fields": [ + { "name": "GroupId", "type": "string", "versions": "8", + "about": "The group id." }, + { "name": "MemberId", "type": "string", "versions": "8", + "about": "The member id." 
} + ] +} diff --git a/group-coordinator/src/main/resources/common/message/StreamsGroupCurrentMemberAssignmentValue.json b/group-coordinator/src/main/resources/common/message/StreamsGroupCurrentMemberAssignmentValue.json new file mode 100644 index 00000000000..d79e58cba75 --- /dev/null +++ b/group-coordinator/src/main/resources/common/message/StreamsGroupCurrentMemberAssignmentValue.json @@ -0,0 +1,46 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// The streams rebalance protocol is in development. This schema is subject to non-backwards-compatible changes. +{ + "type": "data", + "name": "StreamsGroupCurrentMemberAssignmentValue", + "validVersions": "0", + "flexibleVersions": "0+", + "fields": [ + { "name": "MemberEpoch", "versions": "0+", "type": "int32", + "about": "The current member epoch that is expected from the member in the heartbeat request." }, + { "name": "PreviousMemberEpoch", "versions": "0+", "type": "int32", + "about": "If the last epoch bump is lost before reaching the member, the member will retry with the previous epoch." }, + { "name": "State", "versions": "0+", "type": "int8", + "about": "The member state. See StreamsGroupMember.MemberState for the possible values." 
}, + { "name": "ActiveTasks", "versions": "0+", "type": "[]TaskId", + "about": "Currently assigned active tasks for this streams client." }, + { "name": "StandbyTasks", "versions": "0+", "type": "[]TaskId", + "about": "Currently assigned standby tasks for this streams client." }, + { "name": "WarmupTasks", "versions": "0+", "type": "[]TaskId", + "about": "Currently assigned warming up tasks for this streams client." }, + { "name": "ActiveTasksPendingRevocation", "versions": "0+", "type": "[]TaskId", + "about": "The active tasks that must be revoked by this member." } + ], + "commonStructs": [ + { "name": "TaskId", "versions": "0+", "fields": [ + { "name": "Subtopology", "type": "string", "versions": "0+", + "about": "The subtopology identifier." }, + { "name": "Partitions", "type": "[]int32", "versions": "0+", + "about": "The partitions of the input topics processed by this member." } + ]} + ] +} diff --git a/group-coordinator/src/main/resources/common/message/StreamsGroupMemberMetadataKey.json b/group-coordinator/src/main/resources/common/message/StreamsGroupMemberMetadataKey.json new file mode 100644 index 00000000000..bc54bc8010b --- /dev/null +++ b/group-coordinator/src/main/resources/common/message/StreamsGroupMemberMetadataKey.json @@ -0,0 +1,28 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. + +// The streams rebalance protocol is in development. This schema is subject to non-backwards-compatible changes. +{ + "type": "data", + "name": "StreamsGroupMemberMetadataKey", + "validVersions": "11", + "flexibleVersions": "none", + "fields": [ + { "name": "GroupId", "type": "string", "versions": "5", + "about": "The group id." }, + { "name": "MemberId", "type": "string", "versions": "5", + "about": "The member id." } + ] +} diff --git a/group-coordinator/src/main/resources/common/message/StreamsGroupMemberMetadataValue.json b/group-coordinator/src/main/resources/common/message/StreamsGroupMemberMetadataValue.json new file mode 100644 index 00000000000..eab6f18310e --- /dev/null +++ b/group-coordinator/src/main/resources/common/message/StreamsGroupMemberMetadataValue.json @@ -0,0 +1,69 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// The streams rebalance protocol is in development. This schema is subject to non-backwards-compatible changes. 
+{ + "type": "data", + "name": "StreamsGroupMemberMetadataValue", + "validVersions": "0", + "flexibleVersions": "0+", + "fields": [ + { "name": "InstanceId", "versions": "0+", "nullableVersions": "0+", "type": "string", + "about": "The (optional) instance id." }, + { "name": "RackId", "versions": "0+", "nullableVersions": "0+", "type": "string", + "about": "The (optional) rack id." }, + { "name": "ClientId", "versions": "0+", "type": "string", + "about": "The client id." }, + { "name": "ClientHost", "versions": "0+", "type": "string", + "about": "The client host." }, + { "name": "RebalanceTimeoutMs", "type": "int32", "versions": "0+", "default": -1, + "about": "The rebalance timeout." }, + + { "name": "TopologyHash", "type": "bytes", "versions": "0+", + "about": "The hash of the topology. " }, + { "name": "Assignor", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": null, + "about": "The desired assignor. If set to null, the vote goes towards the broker taking a pick." }, + + { "name": "ProcessId", "type": "string", "versions": "0+", + "about": "Identity of the streams instance that may have multiple consumers." }, + { "name": "HostInfo", "type": "HostInfo", "versions": "0+", + "about": "Host information for running interactive queries on this instance.", + "fields": [ + { "name": "Host", "type": "string", "versions": "0+", + "about": "Host for running interactive queries on this instance." }, + { "name": "Port", "type": "int32", "versions": "0+", + "about": "Port for running interactive queries on this instance." } + ] + }, + // TODO: maybe it would be nice to just stuff this info into "AssignmentConfigs", since it's quite specific + { "name": "ClientTags", "type": "[]KeyValue", "versions": "0+", + "about": "Used for rack-aware assignment algorithm." }, + + { "name": "UserData", "type": "bytes", "versions": "0+", "nullableVersions": "0+", "default": "null", + "about": "Opaque user data to be passed to the client-side assignor. 
Null for server-side assignors." }, + { "name": "AssignmentConfigs", "type": "[]KeyValue", "versions": "0+", + "about": "Generic array of assignment configuration strings." } + ], + "commonStructs": [ + { "name": "KeyValue", "versions": "0+", + "fields": [ + { "name": "Key", "type": "string", "versions": "0+", + "about": "key of the config" }, + { "name": "Value", "type": "string", "versions": "0+", + "about": "value of the config" } + ] + } + ] +} diff --git a/group-coordinator/src/main/resources/common/message/StreamsGroupMetadataKey.json b/group-coordinator/src/main/resources/common/message/StreamsGroupMetadataKey.json new file mode 100644 index 00000000000..0ad8fbce88b --- /dev/null +++ b/group-coordinator/src/main/resources/common/message/StreamsGroupMetadataKey.json @@ -0,0 +1,26 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// The streams rebalance protocol is in development. This schema is subject to non-backwards-compatible changes. +{ + "type": "data", + "name": "StreamsGroupMetadataKey", + "validVersions": "9", + "flexibleVersions": "none", + "fields": [ + { "name": "GroupId", "type": "string", "versions": "3", + "about": "The group id." 
} + ] +} diff --git a/group-coordinator/src/main/resources/common/message/StreamsGroupMetadataValue.json b/group-coordinator/src/main/resources/common/message/StreamsGroupMetadataValue.json new file mode 100644 index 00000000000..7d63be55eff --- /dev/null +++ b/group-coordinator/src/main/resources/common/message/StreamsGroupMetadataValue.json @@ -0,0 +1,26 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// The streams rebalance protocol is in development. This schema is subject to non-backwards-compatible changes. +{ + "type": "data", + "name": "StreamsGroupMetadataValue", + "validVersions": "0", + "flexibleVersions": "0+", + "fields": [ + { "name": "Epoch", "versions": "0+", "type": "int32", + "about": "The group epoch." } + ] +} diff --git a/group-coordinator/src/main/resources/common/message/StreamsGroupPartitionMetadataKey.json b/group-coordinator/src/main/resources/common/message/StreamsGroupPartitionMetadataKey.json new file mode 100644 index 00000000000..52b879dbcd1 --- /dev/null +++ b/group-coordinator/src/main/resources/common/message/StreamsGroupPartitionMetadataKey.json @@ -0,0 +1,26 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. 
See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// The streams rebalance protocol is in development. This schema is subject to non-backwards-compatible changes. +{ + "type": "data", + "name": "StreamsGroupPartitionMetadataKey", + "validVersions": "10", + "flexibleVersions": "none", + "fields": [ + { "name": "GroupId", "type": "string", "versions": "4", + "about": "The group id." } + ] +} diff --git a/group-coordinator/src/main/resources/common/message/StreamsGroupPartitionMetadataValue.json b/group-coordinator/src/main/resources/common/message/StreamsGroupPartitionMetadataValue.json new file mode 100644 index 00000000000..a208aa8769c --- /dev/null +++ b/group-coordinator/src/main/resources/common/message/StreamsGroupPartitionMetadataValue.json @@ -0,0 +1,40 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// The streams rebalance protocol is in development. This schema is subject to non-backwards-compatible changes. +{ + "type": "data", + "name": "StreamsGroupPartitionMetadataValue", + "validVersions": "0", + "flexibleVersions": "0+", + "fields": [ + { "name": "Topics", "versions": "0+", "type": "[]TopicMetadata", + "about": "The list of topic metadata.", "fields": [ + { "name": "TopicId", "versions": "0+", "type": "uuid", + "about": "The topic id." }, + { "name": "TopicName", "versions": "0+", "type": "string", + "about": "The topic name." }, + { "name": "NumPartitions", "versions": "0+", "type": "int32", + "about": "The number of partitions of the topic." }, + { "name": "PartitionMetadata", "versions": "0+", "type": "[]PartitionMetadata", + "about": "Partitions mapped to a set of racks. If the rack information is unavailable for all the partitions, an empty list is stored", "fields": [ + { "name": "Partition", "versions": "0+", "type": "int32", + "about": "The partition number." }, + { "name": "Racks", "versions": "0+", "type": "[]string", + "about": "The set of racks that the partition is mapped to." 
} + ]} + ]} + ] +} diff --git a/group-coordinator/src/main/resources/common/message/StreamsGroupTargetAssignmentMemberKey.json b/group-coordinator/src/main/resources/common/message/StreamsGroupTargetAssignmentMemberKey.json new file mode 100644 index 00000000000..6faac03a9e2 --- /dev/null +++ b/group-coordinator/src/main/resources/common/message/StreamsGroupTargetAssignmentMemberKey.json @@ -0,0 +1,28 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// The streams rebalance protocol is in development. This schema is subject to non-backwards-compatible changes. +{ + "type": "data", + "name": "StreamsGroupTargetAssignmentMemberKey", + "validVersions": "13", + "flexibleVersions": "none", + "fields": [ + { "name": "GroupId", "type": "string", "versions": "7", + "about": "The group id." }, + { "name": "MemberId", "type": "string", "versions": "7", + "about": "The member id." 
} + ] +} diff --git a/group-coordinator/src/main/resources/common/message/StreamsGroupTargetAssignmentMemberValue.json b/group-coordinator/src/main/resources/common/message/StreamsGroupTargetAssignmentMemberValue.json new file mode 100644 index 00000000000..38526e4381c --- /dev/null +++ b/group-coordinator/src/main/resources/common/message/StreamsGroupTargetAssignmentMemberValue.json @@ -0,0 +1,38 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// The streams rebalance protocol is in development. This schema is subject to non-backwards-compatible changes. +{ + "type": "data", + "name": "StreamsGroupTargetAssignmentMemberValue", + "validVersions": "0", + "flexibleVersions": "0+", + "fields": [ + { "name": "ActiveTasks", "versions": "0+", "type": "[]TaskId", + "about": "Currently assigned active tasks for this streams client." }, + { "name": "StandbyTasks", "versions": "0+", "type": "[]TaskId", + "about": "Currently assigned standby tasks for this streams client." }, + { "name": "WarmupTasks", "versions": "0+", "type": "[]TaskId", + "about": "Currently assigned warming up tasks for this streams client." 
} + ], + "commonStructs": [ + { "name": "TaskId", "versions": "0+", "fields": [ + { "name": "Subtopology", "type": "string", "versions": "0+", + "about": "The subtopology identifier." }, + { "name": "Partitions", "type": "[]int32", "versions": "0+", + "about": "The partitions of the input topics processed by this member." } + ]} + ] +} diff --git a/group-coordinator/src/main/resources/common/message/StreamsGroupTargetAssignmentMetadataKey.json b/group-coordinator/src/main/resources/common/message/StreamsGroupTargetAssignmentMetadataKey.json new file mode 100644 index 00000000000..91f58c07f64 --- /dev/null +++ b/group-coordinator/src/main/resources/common/message/StreamsGroupTargetAssignmentMetadataKey.json @@ -0,0 +1,26 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// The streams rebalance protocol is in development. This schema is subject to non-backwards-compatible changes. +{ + "type": "data", + "name": "StreamsGroupTargetAssignmentMetadataKey", + "validVersions": "12", + "flexibleVersions": "none", + "fields": [ + { "name": "GroupId", "type": "string", "versions": "6", + "about": "The group id." 
} + ] +} diff --git a/group-coordinator/src/main/resources/common/message/StreamsGroupTargetAssignmentMetadataValue.json b/group-coordinator/src/main/resources/common/message/StreamsGroupTargetAssignmentMetadataValue.json new file mode 100644 index 00000000000..b35fc7e1dda --- /dev/null +++ b/group-coordinator/src/main/resources/common/message/StreamsGroupTargetAssignmentMetadataValue.json @@ -0,0 +1,26 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// The streams rebalance protocol is in development. This schema is subject to non-backwards-compatible changes. +{ + "type": "data", + "name": "StreamsGroupTargetAssignmentMetadataValue", + "validVersions": "0", + "flexibleVersions": "0+", + "fields": [ + { "name": "AssignmentEpoch", "versions": "0+", "type": "int32", + "about": "The assignment epoch." 
} + ] +} diff --git a/group-coordinator/src/main/resources/common/message/StreamsGroupTopologyKey.json b/group-coordinator/src/main/resources/common/message/StreamsGroupTopologyKey.json new file mode 100644 index 00000000000..0e72d8c3372 --- /dev/null +++ b/group-coordinator/src/main/resources/common/message/StreamsGroupTopologyKey.json @@ -0,0 +1,26 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// The streams rebalance protocol is in development. This schema is subject to non-backwards-compatible changes. +{ + "type": "data", + "name": "StreamsGroupTopologyKey", + "validVersions": "15", + "flexibleVersions": "none", + "fields": [ + { "name": "GroupId", "type": "string", "versions": "3", + "about": "The group id." 
} + ] +} diff --git a/clients/src/main/resources/common/message/StreamsInitializeRequest.json b/group-coordinator/src/main/resources/common/message/StreamsGroupTopologyValue.json similarity index 68% copy from clients/src/main/resources/common/message/StreamsInitializeRequest.json copy to group-coordinator/src/main/resources/common/message/StreamsGroupTopologyValue.json index c031ba5a11f..74ced92af39 100644 --- a/clients/src/main/resources/common/message/StreamsInitializeRequest.json +++ b/group-coordinator/src/main/resources/common/message/StreamsGroupTopologyValue.json @@ -13,43 +13,38 @@ // See the License for the specific language governing permissions and // limitations under the License. +// The streams rebalance protocol is in development. This schema is subject to non-backwards-compatible changes. { - "apiKey": 77, - "type": "request", - "listeners": ["zkBroker", "broker"], - "name": "StreamsInitializeRequest", + "type": "data", + "name": "StreamsGroupTopologyValue", "validVersions": "0", "flexibleVersions": "0+", "fields": [ - { "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId", - "about": "The group identifier." }, + { "name": "TopologyHash", "type": "bytes", "versions": "0+", + "about": "The hash of the topology. " }, { "name": "Topology", "type": "[]Subtopology", "versions": "0+", "about": "The sub-topologies of the streams application.", "fields": [ - // We are using strings here, to avoid fixing a structure on task IDs which will - // allow introducing stable naming for subtopologies in the future. { "name": "Subtopology", "type": "string", "versions": "0+", - "about": "String to uniquely the sub-topology. Deterministically generated from the topology" }, - { "name": "Partitions", "type": "int32", "versions": "0+", - "about": "The number of partitions expected for all source topics and changelog topics." }, + "about": "String to uniquely identify the subtopology." 
}, { "name": "SourceTopics", "type": "[]string", "versions": "0+", "about": "The topics the topology reads from." }, { "name": "SinkTopics", "type": "[]string", "versions": "0+", "about": "The topics the topology writes to." }, { "name": "StateChangelogTopics", "type": "[]TopicInfo", "versions": "0+", - "about": "The set of state changelog topics associated with this subtopology. Created automatically." }, + "about": "The set of state changelog topics associated with this subtopology. " }, { "name": "RepartitionSourceTopics", "type": "[]TopicInfo", "versions": "0+", - "about": "The set of source topics that are internally created repartition topics. Created automatically." } + "about": "The set of source topics that are internally created repartition topics. " } ] } ], "commonStructs": [ { "name": "TopicConfig", "versions": "0+", "fields": [ - { "name": "key", "type": "string", "versions": "0+", - "about": "The key of the topic-level configuration." }, - { "name": "value", "type": "string", "versions": "0+", - "about": "The value of the topic-level configuration," } - ] + { "name": "key", "type": "string", "versions": "0+", + "about": "The key of the topic-level configuration." }, + { "name": "value", "type": "string", "versions": "0+", + "about": "The value of the topic-level configuration," } + ] }, { "name": "TopicInfo", "versions": "0+", "fields": [ { "name": "Name", "type": "string", "versions": "0+",
