This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new e8b30e4  MINOR: MetadataResponse#toStruct should serialize null leaders correctly. (#4449)
e8b30e4 is described below

commit e8b30e4d255dce455632d809f7bd54b04e6bc6fd
Author: Colin Patrick McCabe <[email protected]>
AuthorDate: Fri Jan 26 16:44:49 2018 -0800

    MINOR: MetadataResponse#toStruct should serialize null leaders correctly. (#4449)
    
    In MetadataResponse deserialization, if the partition leader key is set
    to -1, the leader is set to null.  The MetadataResponse#toStruct code
    should handle this correctly as well.
    
    Also fix a case in KafkaApis where we were not taking into account the
    possibility of the leader being null.
    
    RequestResponseTest should test this as well.
    
    Reviewers: Ismael Juma <[email protected]>, Jason Gustafson <[email protected]>
---
 .../java/org/apache/kafka/common/requests/MetadataResponse.java   | 8 ++++++--
 .../org/apache/kafka/common/requests/RequestResponseTest.java     | 3 +++
 core/src/main/scala/kafka/server/KafkaApis.scala                  | 1 +
 3 files changed, 10 insertions(+), 2 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
index 99c4ffb..cda3c07 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
@@ -464,6 +464,10 @@ public class MetadataResponse extends AbstractResponse {
             return partition;
         }
 
+        public int leaderId() {
+            return leader == null ? -1 : leader.id();
+        }
+
         public Node leader() {
             return leader;
         }
@@ -482,7 +486,7 @@ public class MetadataResponse extends AbstractResponse {
 
         @Override
         public String toString() {
-            return "(type=PartitionMetadata," +
+            return "(type=PartitionMetadata" +
                     ", error=" + error +
                     ", partition=" + partition +
                     ", leader=" + leader +
@@ -531,7 +535,7 @@ public class MetadataResponse extends AbstractResponse {
                 Struct partitionData = topicData.instance(PARTITION_METADATA_KEY_NAME);
                 partitionData.set(ERROR_CODE, partitionMetadata.error.code());
                 partitionData.set(PARTITION_ID, partitionMetadata.partition);
-                partitionData.set(LEADER_KEY_NAME, partitionMetadata.leader.id());
+                partitionData.set(LEADER_KEY_NAME, partitionMetadata.leaderId());
                 ArrayList<Integer> replicas = new ArrayList<>(partitionMetadata.replicas.size());
                 for (Node node : partitionMetadata.replicas)
                     replicas.add(node.id());
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index c18f5c2..2740616 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -702,6 +702,9 @@ public class RequestResponseTest {
                 asList(new MetadataResponse.PartitionMetadata(Errors.NONE, 1, node, replicas, isr, offlineReplicas))));
         allTopicMetadata.add(new MetadataResponse.TopicMetadata(Errors.LEADER_NOT_AVAILABLE, "topic2", false,
                 Collections.<MetadataResponse.PartitionMetadata>emptyList()));
+        allTopicMetadata.add(new MetadataResponse.TopicMetadata(Errors.NONE, "topic3", false,
+            asList(new MetadataResponse.PartitionMetadata(Errors.LEADER_NOT_AVAILABLE, 0, null,
+                replicas, isr, offlineReplicas))));
 
         return new MetadataResponse(asList(node), null, MetadataResponse.NO_CONTROLLER_ID, allTopicMetadata);
     }
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 1ff75c0..13f5164 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -1105,6 +1105,7 @@ class KafkaApis(val requestChannel: RequestChannel,
           val coordinatorEndpoint = topicMetadata.partitionMetadata.asScala
             .find(_.partition == partition)
             .map(_.leader)
+            .flatMap(p => Option(p))
 
           coordinatorEndpoint match {
             case Some(endpoint) if !endpoint.isEmpty =>

-- 
To stop receiving notification emails like this one, please contact
[email protected].

Reply via email to