This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new e5b865d6bf KAFKA-13940; Return NOT_LEADER_OR_FOLLOWER if 
DescribeQuorum sent to non-leader (#12517)
e5b865d6bf is described below

commit e5b865d6bf37495e9949878c8206b9459aa5e1f4
Author: Jason Gustafson <[email protected]>
AuthorDate: Wed Aug 17 15:48:32 2022 -0700

    KAFKA-13940; Return NOT_LEADER_OR_FOLLOWER if DescribeQuorum sent to 
non-leader (#12517)
    
    Currently the server returns `INVALID_REQUEST` if a `DescribeQuorum` 
request is sent to a node that is not the current leader. In addition to being 
inconsistent with all of the other leader APIs in the raft layer, this error is 
treated as fatal by both the forwarding manager and the admin client. Instead, 
we should return `NOT_LEADER_OR_FOLLOWER` as we do with the other APIs. This 
error is retriable, so we can rely on the admin client to retry the request 
after receiving it.
    
    Reviewers: David Jacot <[email protected]>
---
 .../common/requests/DescribeQuorumResponse.java    | 16 +++++++++++-
 .../kafka/clients/admin/KafkaAdminClientTest.java  | 23 +++++++++++++++++
 .../org/apache/kafka/raft/KafkaRaftClient.java     |  5 +++-
 .../org/apache/kafka/raft/KafkaRaftClientTest.java | 29 ++++++++++++++++++++++
 .../apache/kafka/raft/RaftClientTestContext.java   | 27 ++++++++++----------
 5 files changed, 85 insertions(+), 15 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/DescribeQuorumResponse.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/DescribeQuorumResponse.java
index cbf945b704..06ae681bc5 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/DescribeQuorumResponse.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/DescribeQuorumResponse.java
@@ -37,7 +37,7 @@ import java.util.Map;
  * - {@link Errors#BROKER_NOT_AVAILABLE}
  *
  * Partition level errors:
- * - {@link Errors#INVALID_REQUEST}
+ * - {@link Errors#NOT_LEADER_OR_FOLLOWER}
  * - {@link Errors#UNKNOWN_TOPIC_OR_PARTITION}
  */
 public class DescribeQuorumResponse extends AbstractResponse {
@@ -72,6 +72,19 @@ public class DescribeQuorumResponse extends AbstractResponse 
{
         return DEFAULT_THROTTLE_TIME;
     }
 
+    public static DescribeQuorumResponseData singletonErrorResponse(
+        TopicPartition topicPartition,
+        Errors error
+    ) {
+        return new DescribeQuorumResponseData()
+            .setTopics(Collections.singletonList(new 
DescribeQuorumResponseData.TopicData()
+                .setTopicName(topicPartition.topic())
+                .setPartitions(Collections.singletonList(new 
DescribeQuorumResponseData.PartitionData()
+                    .setPartitionIndex(topicPartition.partition())
+                    .setErrorCode(error.code())))));
+    }
+
+
     public static DescribeQuorumResponseData singletonResponse(TopicPartition 
topicPartition,
                                                                int leaderId,
                                                                int leaderEpoch,
@@ -82,6 +95,7 @@ public class DescribeQuorumResponse extends AbstractResponse {
             .setTopics(Collections.singletonList(new 
DescribeQuorumResponseData.TopicData()
                 .setTopicName(topicPartition.topic())
                 .setPartitions(Collections.singletonList(new 
DescribeQuorumResponseData.PartitionData()
+                    .setPartitionIndex(topicPartition.partition())
                     .setErrorCode(Errors.NONE.code())
                     .setLeaderId(leaderId)
                     .setLeaderEpoch(leaderEpoch)
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index de57813679..5faf53f075 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -5192,6 +5192,29 @@ public class KafkaAdminClientTest {
         }
     }
 
+    @Test
+    public void testDescribeMetadataQuorumRetriableError() throws Exception {
+        try (final AdminClientUnitTestEnv env = mockClientEnv()) {
+            
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create(ApiKeys.DESCRIBE_QUORUM.id,
+                ApiKeys.DESCRIBE_QUORUM.oldestVersion(),
+                ApiKeys.DESCRIBE_QUORUM.latestVersion()));
+
+            // First request fails with a NOT_LEADER_OR_FOLLOWER error (which 
is retriable)
+            env.kafkaClient().prepareResponse(
+                body -> body instanceof DescribeQuorumRequest,
+                prepareDescribeQuorumResponse(Errors.NONE, 
Errors.NOT_LEADER_OR_FOLLOWER, false, false, false, false, false));
+
+            // The second request succeeds
+            env.kafkaClient().prepareResponse(
+                body -> body instanceof DescribeQuorumRequest,
+                prepareDescribeQuorumResponse(Errors.NONE, Errors.NONE, false, 
false, false, false, false));
+
+            KafkaFuture<QuorumInfo> future = 
env.adminClient().describeMetadataQuorum().quorumInfo();
+            QuorumInfo quorumInfo = future.get();
+            assertEquals(defaultQuorumInfo(false), quorumInfo);
+        }
+    }
+
     @Test
     public void testDescribeMetadataQuorumFailure() {
         try (final AdminClientUnitTestEnv env = mockClientEnv()) {
diff --git a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java 
b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
index cac7a8a3cb..042a141a76 100644
--- a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
+++ b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
@@ -1171,7 +1171,10 @@ public class KafkaRaftClient<T> implements RaftClient<T> 
{
         }
 
         if (!quorum.isLeader()) {
-            return 
DescribeQuorumRequest.getTopLevelErrorResponse(Errors.INVALID_REQUEST);
+            return DescribeQuorumResponse.singletonErrorResponse(
+                log.topicPartition(),
+                Errors.NOT_LEADER_OR_FOLLOWER
+            );
         }
 
         LeaderState<T> leaderState = quorum.leaderStateOrThrow();
diff --git a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java 
b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
index 9b2771d2b3..a8a346e6db 100644
--- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
@@ -20,6 +20,7 @@ import 
org.apache.kafka.common.errors.ClusterAuthorizationException;
 import org.apache.kafka.common.errors.RecordBatchTooLargeException;
 import org.apache.kafka.common.memory.MemoryPool;
 import org.apache.kafka.common.message.BeginQuorumEpochResponseData;
+import org.apache.kafka.common.message.DescribeQuorumResponseData;
 import org.apache.kafka.common.message.DescribeQuorumResponseData.ReplicaState;
 import org.apache.kafka.common.message.EndQuorumEpochResponseData;
 import org.apache.kafka.common.message.FetchResponseData;
@@ -1946,6 +1947,34 @@ public class KafkaRaftClientTest {
         );
     }
 
+    @Test
+    public void testDescribeQuorumNonLeader() throws Exception {
+        int localId = 0;
+        int voter2 = localId + 1;
+        int voter3 = localId + 2;
+        int epoch = 2;
+        Set<Integer> voters = Utils.mkSet(localId, voter2, voter3);
+
+        RaftClientTestContext context = new 
RaftClientTestContext.Builder(localId, voters)
+            .withUnknownLeader(epoch)
+            .build();
+
+        
context.deliverRequest(DescribeQuorumRequest.singletonRequest(context.metadataPartition));
+        context.pollUntilResponse();
+
+        DescribeQuorumResponseData responseData = 
context.collectDescribeQuorumResponse();
+        assertEquals(Errors.NONE, Errors.forCode(responseData.errorCode()));
+
+        assertEquals(1, responseData.topics().size());
+        DescribeQuorumResponseData.TopicData topicData = 
responseData.topics().get(0);
+        assertEquals(context.metadataPartition.topic(), topicData.topicName());
+
+        assertEquals(1, topicData.partitions().size());
+        DescribeQuorumResponseData.PartitionData partitionData = 
topicData.partitions().get(0);
+        assertEquals(context.metadataPartition.partition(), 
partitionData.partitionIndex());
+        assertEquals(Errors.NOT_LEADER_OR_FOLLOWER, 
Errors.forCode(partitionData.errorCode()));
+    }
+
     @Test
     public void testDescribeQuorum() throws Exception {
         int localId = 0;
diff --git 
a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java 
b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
index d48e41fb31..b825fc8867 100644
--- a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
+++ b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.raft;
 
-import java.util.function.Consumer;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.memory.MemoryPool;
@@ -57,8 +56,8 @@ import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.raft.internals.BatchBuilder;
 import org.apache.kafka.raft.internals.StringSerde;
 import org.apache.kafka.server.common.serialization.RecordSerde;
-import org.apache.kafka.snapshot.SnapshotReader;
 import org.apache.kafka.snapshot.RawSnapshotWriter;
+import org.apache.kafka.snapshot.SnapshotReader;
 import org.apache.kafka.test.TestCondition;
 import org.apache.kafka.test.TestUtils;
 
@@ -76,6 +75,7 @@ import java.util.Optional;
 import java.util.OptionalInt;
 import java.util.OptionalLong;
 import java.util.Set;
+import java.util.function.Consumer;
 import java.util.stream.Collectors;
 
 import static org.apache.kafka.raft.RaftUtil.hasValidTopicPartition;
@@ -128,7 +128,7 @@ public final class RaftClientTestContext {
         private final QuorumStateStore quorumStateStore = new 
MockQuorumStateStore();
         private final MockableRandom random = new MockableRandom(1L);
         private final LogContext logContext = new LogContext();
-        private final MockLog log = new MockLog(METADATA_PARTITION,  
Uuid.METADATA_TOPIC_ID, logContext);
+        private final MockLog log = new MockLog(METADATA_PARTITION, 
Uuid.METADATA_TOPIC_ID, logContext);
         private final Set<Integer> voters;
         private final OptionalInt localId;
 
@@ -440,21 +440,24 @@ public final class RaftClientTestContext {
         assertEquals(ElectionState.withElectedLeader(epoch, leaderId, voters), 
quorumStateStore.readElectionState());
     }
 
-    int assertSentDescribeQuorumResponse(
-        int leaderId,
-        int leaderEpoch,
-        long highWatermark,
-        List<ReplicaState> voterStates,
-        List<ReplicaState> observerStates
-    ) {
+    DescribeQuorumResponseData collectDescribeQuorumResponse() {
         List<RaftResponse.Outbound> sentMessages = 
drainSentResponses(ApiKeys.DESCRIBE_QUORUM);
         assertEquals(1, sentMessages.size());
         RaftResponse.Outbound raftMessage = sentMessages.get(0);
         assertTrue(
             raftMessage.data() instanceof DescribeQuorumResponseData,
             "Unexpected request type " + raftMessage.data());
-        DescribeQuorumResponseData response = (DescribeQuorumResponseData) 
raftMessage.data();
+        return (DescribeQuorumResponseData) raftMessage.data();
+    }
 
+    void assertSentDescribeQuorumResponse(
+        int leaderId,
+        int leaderEpoch,
+        long highWatermark,
+        List<ReplicaState> voterStates,
+        List<ReplicaState> observerStates
+    ) {
+        DescribeQuorumResponseData response = collectDescribeQuorumResponse();
         DescribeQuorumResponseData expectedResponse = 
DescribeQuorumResponse.singletonResponse(
             metadataPartition,
             leaderId,
@@ -462,9 +465,7 @@ public final class RaftClientTestContext {
             highWatermark,
             voterStates,
             observerStates);
-
         assertEquals(expectedResponse, response);
-        return raftMessage.correlationId();
     }
 
     int assertSentVoteRequest(int epoch, int lastEpoch, long lastEpochOffset, 
int numVoteReceivers) {

Reply via email to