This is an automated email from the ASF dual-hosted git repository.

jolshan pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new c8f687ac150 KAFKA-15661: KIP-951: protocol changes (#14627)
c8f687ac150 is described below

commit c8f687ac1505456cb568de2b60df235eb1ceb5f0
Author: Crispin Bernier <[email protected]>
AuthorDate: Tue Oct 31 20:16:11 2023 -0400

    KAFKA-15661: KIP-951: protocol changes (#14627)
    
    Separating out the protocol changes from #14444 in an effort to more 
quickly unblock the client side PR.
    
    This is the protocol changes to populate the fields in KIP-951. On 
NOT_LEADER_OR_FOLLOWER errors in both FETCH and PRODUCE the new leader ID and 
epoch are included in the response. The endpoint for the new leader is 
retrieved from the metadata cache. The new fields are all optional (tagged) and 
an IBP bump is required.
    
    
https://cwiki.apache.org/confluence/display/KAFKA/KIP-951%3A+Leader+discovery+optimisations+for+the+client
    
    Reviewers: Justine Olshan <[email protected]>, Mayank Shekhar Narula 
<[email protected]>
---
 .../resources/common/message/FetchRequest.json     |  4 +++-
 .../resources/common/message/FetchResponse.json    | 13 +++++++++++-
 .../resources/common/message/ProduceRequest.json   |  4 +++-
 .../resources/common/message/ProduceResponse.json  | 24 +++++++++++++++++++---
 .../kafka/server/common/MetadataVersion.java       |  4 +++-
 5 files changed, 42 insertions(+), 7 deletions(-)

diff --git a/clients/src/main/resources/common/message/FetchRequest.json 
b/clients/src/main/resources/common/message/FetchRequest.json
index 295cbf3aa82..4f1b7b17ad3 100644
--- a/clients/src/main/resources/common/message/FetchRequest.json
+++ b/clients/src/main/resources/common/message/FetchRequest.json
@@ -53,7 +53,9 @@
   //
   // Version 15 adds the ReplicaState which includes new field ReplicaEpoch 
and the ReplicaId. Also,
   // deprecate the old ReplicaId field and set its default value to -1. 
(KIP-903)
-  "validVersions": "0-15",
+  //
+  // Version 16 is the same as version 15 (KIP-951).
+  "validVersions": "0-16",
   "flexibleVersions": "12+",
   "fields": [
     { "name": "ClusterId", "type": "string", "versions": "12+", 
"nullableVersions": "12+", "default": "null",
diff --git a/clients/src/main/resources/common/message/FetchResponse.json 
b/clients/src/main/resources/common/message/FetchResponse.json
index 366e702cfbd..e5f49ba6fde 100644
--- a/clients/src/main/resources/common/message/FetchResponse.json
+++ b/clients/src/main/resources/common/message/FetchResponse.json
@@ -45,7 +45,9 @@
   // Version 14 is the same as version 13 but it also receives a new error 
called OffsetMovedToTieredStorageException (KIP-405)
   //
   // Version 15 is the same as version 14 (KIP-903).
-  "validVersions": "0-15",
+  //
+  // Version 16 adds the 'NodeEndpoints' field (KIP-951).
+  "validVersions": "0-16",
   "flexibleVersions": "12+",
   "fields": [
     { "name": "ThrottleTimeMs", "type": "int32", "versions": "1+", 
"ignorable": true,
@@ -102,6 +104,15 @@
           "about": "The preferred read replica for the consumer to use on its 
next fetch request"},
         { "name": "Records", "type": "records", "versions": "0+", 
"nullableVersions": "0+", "about": "The record data."}
       ]}
+    ]},
+    { "name": "NodeEndpoints", "type": "[]NodeEndpoint", "versions": "16+", 
"taggedVersions": "16+", "tag": 0,
+      "about": "Endpoints for all current-leaders enumerated in PartitionData, 
with errors NOT_LEADER_OR_FOLLOWER & FENCED_LEADER_EPOCH.", "fields": [
+      { "name": "NodeId", "type": "int32", "versions": "16+",
+        "mapKey": true, "entityType": "brokerId", "about": "The ID of the 
associated node."},
+      { "name": "Host", "type": "string", "versions": "16+", "about": "The 
node's hostname." },
+      { "name": "Port", "type": "int32", "versions": "16+", "about": "The 
node's port." },
+      { "name": "Rack", "type": "string", "versions": "16+", 
"nullableVersions": "16+", "default": "null",
+        "about": "The rack of the node, or null if it has not been assigned to 
a rack." }
     ]}
   ]
 }
diff --git a/clients/src/main/resources/common/message/ProduceRequest.json 
b/clients/src/main/resources/common/message/ProduceRequest.json
index 82a168e63cc..6b2d909ab84 100644
--- a/clients/src/main/resources/common/message/ProduceRequest.json
+++ b/clients/src/main/resources/common/message/ProduceRequest.json
@@ -33,7 +33,9 @@
   // Starting in Version 8, response has RecordErrors and ErrorMessage. See 
KIP-467.
   //
   // Version 9 enables flexible versions.
-  "validVersions": "0-9",
+  //
+  // Version 10 is the same as version 9 (KIP-951).
+  "validVersions": "0-10",
   "flexibleVersions": "9+",
   "fields": [
     { "name": "TransactionalId", "type": "string", "versions": "3+", 
"nullableVersions": "3+", "default": "null", "entityType": "transactionalId",
diff --git a/clients/src/main/resources/common/message/ProduceResponse.json 
b/clients/src/main/resources/common/message/ProduceResponse.json
index 0c47f6d938e..d294fb8aa2e 100644
--- a/clients/src/main/resources/common/message/ProduceResponse.json
+++ b/clients/src/main/resources/common/message/ProduceResponse.json
@@ -32,7 +32,9 @@
   // records that cause the whole batch to be dropped.  See KIP-467 for 
details.
   //
   // Version 9 enables flexible versions.
-  "validVersions": "0-9",
+  //
+  // Version 10 adds 'CurrentLeader' and 'NodeEndpoints' as tagged fields 
(KIP-951)
+  "validVersions": "0-10",
   "flexibleVersions": "9+",
   "fields": [
     { "name": "Responses", "type": "[]TopicProduceResponse", "versions": "0+",
@@ -59,10 +61,26 @@
             "about": "The error message of the record that caused the batch to 
be dropped"}
         ]},
         { "name":  "ErrorMessage", "type": "string", "default": "null", 
"versions": "8+", "nullableVersions": "8+", "ignorable":  true,
-          "about":  "The global error message summarizing the common root 
cause of the records that caused the batch to be dropped"}
+          "about":  "The global error message summarizing the common root 
cause of the records that caused the batch to be dropped"},
+        { "name": "CurrentLeader", "type": "LeaderIdAndEpoch", "versions": 
"10+", "taggedVersions": "10+", "tag": 0, "fields": [
+          { "name": "LeaderId", "type": "int32", "versions": "10+", "default": 
"-1", "entityType": "brokerId",
+            "about": "The ID of the current leader or -1 if the leader is 
unknown."},
+          { "name": "LeaderEpoch", "type": "int32", "versions": "10+", 
"default": "-1", "about": "The latest known leader epoch"}
+        ]}
       ]}
     ]},
     { "name": "ThrottleTimeMs", "type": "int32", "versions": "1+", 
"ignorable": true, "default": "0",
-      "about": "The duration in milliseconds for which the request was 
throttled due to a quota violation, or zero if the request did not violate any 
quota." }
+      "about": "The duration in milliseconds for which the request was 
throttled due to a quota violation, or zero if the request did not violate any 
quota." },
+    { "name": "NodeEndpoints", "type": "[]NodeEndpoint", "versions": "10+", 
"taggedVersions": "10+", "tag": 0,
+      "about": "Endpoints for all current-leaders enumerated in 
PartitionProduceResponses, with errors NOT_LEADER_OR_FOLLOWER.", "fields": [
+      { "name": "NodeId", "type": "int32", "versions": "10+",
+        "mapKey": true, "entityType": "brokerId", "about": "The ID of the 
associated node."},
+      { "name": "Host", "type": "string", "versions": "10+",
+        "about": "The node's hostname." },
+      { "name": "Port", "type": "int32", "versions": "10+",
+        "about": "The node's port." },
+      { "name": "Rack", "type": "string", "versions": "10+", 
"nullableVersions": "10+", "default": "null",
+        "about": "The rack of the node, or null if it has not been assigned to 
a rack." }
+    ]}
   ]
 }
diff --git 
a/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
 
b/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
index 8f22ed582a6..ee3c3fdd23c 100644
--- 
a/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
+++ 
b/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
@@ -360,7 +360,9 @@ public enum MetadataVersion {
     }
 
     public short fetchRequestVersion() {
-        if (this.isAtLeast(IBP_3_5_IV1)) {
+        if (this.isAtLeast(IBP_3_7_IV0)) {
+            return 16;
+        } else if (this.isAtLeast(IBP_3_5_IV1)) {
             return 15;
         } else if (this.isAtLeast(IBP_3_5_IV0)) {
             return 14;

Reply via email to