This is an automated email from the ASF dual-hosted git repository.
jolshan pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new c8f687ac150 KAFKA-15661: KIP-951: protocol changes (#14627)
c8f687ac150 is described below
commit c8f687ac1505456cb568de2b60df235eb1ceb5f0
Author: Crispin Bernier <[email protected]>
AuthorDate: Tue Oct 31 20:16:11 2023 -0400
KAFKA-15661: KIP-951: protocol changes (#14627)
This separates out the protocol changes from #14444 in an effort to unblock
the client-side PR more quickly.
These are the protocol changes to populate the fields in KIP-951. On
NOT_LEADER_OR_FOLLOWER errors in both FETCH and PRODUCE, the new leader ID and
epoch are included in the response. The endpoint for the new leader is
retrieved from the metadata cache. The new fields are all optional (tagged), and
an IBP bump is required.
https://cwiki.apache.org/confluence/display/KAFKA/KIP-951%3A+Leader+discovery+optimisations+for+the+client
Reviewers: Justine Olshan <[email protected]>, Mayank Shekhar Narula
<[email protected]>
---
.../resources/common/message/FetchRequest.json | 4 +++-
.../resources/common/message/FetchResponse.json | 13 +++++++++++-
.../resources/common/message/ProduceRequest.json | 4 +++-
.../resources/common/message/ProduceResponse.json | 24 +++++++++++++++++++---
.../kafka/server/common/MetadataVersion.java | 4 +++-
5 files changed, 42 insertions(+), 7 deletions(-)
diff --git a/clients/src/main/resources/common/message/FetchRequest.json
b/clients/src/main/resources/common/message/FetchRequest.json
index 295cbf3aa82..4f1b7b17ad3 100644
--- a/clients/src/main/resources/common/message/FetchRequest.json
+++ b/clients/src/main/resources/common/message/FetchRequest.json
@@ -53,7 +53,9 @@
//
// Version 15 adds the ReplicaState which includes new field ReplicaEpoch
and the ReplicaId. Also,
// deprecate the old ReplicaId field and set its default value to -1.
(KIP-903)
- "validVersions": "0-15",
+ //
+ // Version 16 is the same as version 15 (KIP-951).
+ "validVersions": "0-16",
"flexibleVersions": "12+",
"fields": [
{ "name": "ClusterId", "type": "string", "versions": "12+",
"nullableVersions": "12+", "default": "null",
diff --git a/clients/src/main/resources/common/message/FetchResponse.json
b/clients/src/main/resources/common/message/FetchResponse.json
index 366e702cfbd..e5f49ba6fde 100644
--- a/clients/src/main/resources/common/message/FetchResponse.json
+++ b/clients/src/main/resources/common/message/FetchResponse.json
@@ -45,7 +45,9 @@
// Version 14 is the same as version 13 but it also receives a new error
called OffsetMovedToTieredStorageException (KIP-405)
//
// Version 15 is the same as version 14 (KIP-903).
- "validVersions": "0-15",
+ //
+ // Version 16 adds the 'NodeEndpoints' field (KIP-951).
+ "validVersions": "0-16",
"flexibleVersions": "12+",
"fields": [
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "1+",
"ignorable": true,
@@ -102,6 +104,15 @@
"about": "The preferred read replica for the consumer to use on its
next fetch request"},
{ "name": "Records", "type": "records", "versions": "0+",
"nullableVersions": "0+", "about": "The record data."}
]}
+ ]},
+ { "name": "NodeEndpoints", "type": "[]NodeEndpoint", "versions": "16+",
"taggedVersions": "16+", "tag": 0,
+ "about": "Endpoints for all current-leaders enumerated in PartitionData,
with errors NOT_LEADER_OR_FOLLOWER & FENCED_LEADER_EPOCH.", "fields": [
+ { "name": "NodeId", "type": "int32", "versions": "16+",
+ "mapKey": true, "entityType": "brokerId", "about": "The ID of the
associated node."},
+ { "name": "Host", "type": "string", "versions": "16+", "about": "The
node's hostname." },
+ { "name": "Port", "type": "int32", "versions": "16+", "about": "The
node's port." },
+ { "name": "Rack", "type": "string", "versions": "16+",
"nullableVersions": "16+", "default": "null",
+ "about": "The rack of the node, or null if it has not been assigned to
a rack." }
]}
]
}
diff --git a/clients/src/main/resources/common/message/ProduceRequest.json
b/clients/src/main/resources/common/message/ProduceRequest.json
index 82a168e63cc..6b2d909ab84 100644
--- a/clients/src/main/resources/common/message/ProduceRequest.json
+++ b/clients/src/main/resources/common/message/ProduceRequest.json
@@ -33,7 +33,9 @@
// Starting in Version 8, response has RecordErrors and ErrorMessage. See
KIP-467.
//
// Version 9 enables flexible versions.
- "validVersions": "0-9",
+ //
+ // Version 10 is the same as version 9 (KIP-951).
+ "validVersions": "0-10",
"flexibleVersions": "9+",
"fields": [
{ "name": "TransactionalId", "type": "string", "versions": "3+",
"nullableVersions": "3+", "default": "null", "entityType": "transactionalId",
diff --git a/clients/src/main/resources/common/message/ProduceResponse.json
b/clients/src/main/resources/common/message/ProduceResponse.json
index 0c47f6d938e..d294fb8aa2e 100644
--- a/clients/src/main/resources/common/message/ProduceResponse.json
+++ b/clients/src/main/resources/common/message/ProduceResponse.json
@@ -32,7 +32,9 @@
// records that cause the whole batch to be dropped. See KIP-467 for
details.
//
// Version 9 enables flexible versions.
- "validVersions": "0-9",
+ //
+ // Version 10 adds 'CurrentLeader' and 'NodeEndpoints' as tagged fields
(KIP-951)
+ "validVersions": "0-10",
"flexibleVersions": "9+",
"fields": [
{ "name": "Responses", "type": "[]TopicProduceResponse", "versions": "0+",
@@ -59,10 +61,26 @@
"about": "The error message of the record that caused the batch to
be dropped"}
]},
{ "name": "ErrorMessage", "type": "string", "default": "null",
"versions": "8+", "nullableVersions": "8+", "ignorable": true,
- "about": "The global error message summarizing the common root
cause of the records that caused the batch to be dropped"}
+ "about": "The global error message summarizing the common root
cause of the records that caused the batch to be dropped"},
+ { "name": "CurrentLeader", "type": "LeaderIdAndEpoch", "versions":
"10+", "taggedVersions": "10+", "tag": 0, "fields": [
+ { "name": "LeaderId", "type": "int32", "versions": "10+", "default":
"-1", "entityType": "brokerId",
+ "about": "The ID of the current leader or -1 if the leader is
unknown."},
+ { "name": "LeaderEpoch", "type": "int32", "versions": "10+",
"default": "-1", "about": "The latest known leader epoch"}
+ ]}
]}
]},
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "1+",
"ignorable": true, "default": "0",
- "about": "The duration in milliseconds for which the request was
throttled due to a quota violation, or zero if the request did not violate any
quota." }
+ "about": "The duration in milliseconds for which the request was
throttled due to a quota violation, or zero if the request did not violate any
quota." },
+ { "name": "NodeEndpoints", "type": "[]NodeEndpoint", "versions": "10+",
"taggedVersions": "10+", "tag": 0,
+ "about": "Endpoints for all current-leaders enumerated in
PartitionProduceResponses, with errors NOT_LEADER_OR_FOLLOWER.", "fields": [
+ { "name": "NodeId", "type": "int32", "versions": "10+",
+ "mapKey": true, "entityType": "brokerId", "about": "The ID of the
associated node."},
+ { "name": "Host", "type": "string", "versions": "10+",
+ "about": "The node's hostname." },
+ { "name": "Port", "type": "int32", "versions": "10+",
+ "about": "The node's port." },
+ { "name": "Rack", "type": "string", "versions": "10+",
"nullableVersions": "10+", "default": "null",
+ "about": "The rack of the node, or null if it has not been assigned to
a rack." }
+ ]}
]
}
diff --git
a/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
b/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
index 8f22ed582a6..ee3c3fdd23c 100644
---
a/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
+++
b/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
@@ -360,7 +360,9 @@ public enum MetadataVersion {
}
public short fetchRequestVersion() {
- if (this.isAtLeast(IBP_3_5_IV1)) {
+ if (this.isAtLeast(IBP_3_7_IV0)) {
+ return 16;
+ } else if (this.isAtLeast(IBP_3_5_IV1)) {
return 15;
} else if (this.isAtLeast(IBP_3_5_IV0)) {
return 14;