This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.3 by this push:
new 0ac9c3498c KAFKA-13940; Return NOT_LEADER_OR_FOLLOWER if
DescribeQuorum sent to non-leader (#12517)
0ac9c3498c is described below
commit 0ac9c3498c761167db1ead46d89dea71b29981d6
Author: Jason Gustafson <[email protected]>
AuthorDate: Wed Aug 17 15:48:32 2022 -0700
KAFKA-13940; Return NOT_LEADER_OR_FOLLOWER if DescribeQuorum sent to
non-leader (#12517)
Currently the server returns `INVALID_REQUEST` if a `DescribeQuorum`
request is sent to a node that is not the current leader. Besides being
inconsistent with all of the other leader APIs in the raft layer, this error is
treated as fatal by both the forwarding manager and the admin client. Instead,
we should return `NOT_LEADER_OR_FOLLOWER`, as we do for the other APIs. This
error is retriable, so we can rely on the admin client to retry the request
when it receives this error.
Reviewers: David Jacot <[email protected]>
---
.../common/requests/DescribeQuorumResponse.java | 16 +++++++++++-
.../kafka/clients/admin/KafkaAdminClientTest.java | 23 +++++++++++++++++
.../org/apache/kafka/raft/KafkaRaftClient.java | 5 +++-
.../org/apache/kafka/raft/KafkaRaftClientTest.java | 29 ++++++++++++++++++++++
.../apache/kafka/raft/RaftClientTestContext.java | 27 ++++++++++----------
5 files changed, 85 insertions(+), 15 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/DescribeQuorumResponse.java
b/clients/src/main/java/org/apache/kafka/common/requests/DescribeQuorumResponse.java
index cbf945b704..06ae681bc5 100644
---
a/clients/src/main/java/org/apache/kafka/common/requests/DescribeQuorumResponse.java
+++
b/clients/src/main/java/org/apache/kafka/common/requests/DescribeQuorumResponse.java
@@ -37,7 +37,7 @@ import java.util.Map;
* - {@link Errors#BROKER_NOT_AVAILABLE}
*
* Partition level errors:
- * - {@link Errors#INVALID_REQUEST}
+ * - {@link Errors#NOT_LEADER_OR_FOLLOWER}
* - {@link Errors#UNKNOWN_TOPIC_OR_PARTITION}
*/
public class DescribeQuorumResponse extends AbstractResponse {
@@ -72,6 +72,19 @@ public class DescribeQuorumResponse extends AbstractResponse
{
return DEFAULT_THROTTLE_TIME;
}
+ public static DescribeQuorumResponseData singletonErrorResponse(
+ TopicPartition topicPartition,
+ Errors error
+ ) {
+ return new DescribeQuorumResponseData()
+ .setTopics(Collections.singletonList(new
DescribeQuorumResponseData.TopicData()
+ .setTopicName(topicPartition.topic())
+ .setPartitions(Collections.singletonList(new
DescribeQuorumResponseData.PartitionData()
+ .setPartitionIndex(topicPartition.partition())
+ .setErrorCode(error.code())))));
+ }
+
+
public static DescribeQuorumResponseData singletonResponse(TopicPartition
topicPartition,
int leaderId,
int leaderEpoch,
@@ -82,6 +95,7 @@ public class DescribeQuorumResponse extends AbstractResponse {
.setTopics(Collections.singletonList(new
DescribeQuorumResponseData.TopicData()
.setTopicName(topicPartition.topic())
.setPartitions(Collections.singletonList(new
DescribeQuorumResponseData.PartitionData()
+ .setPartitionIndex(topicPartition.partition())
.setErrorCode(Errors.NONE.code())
.setLeaderId(leaderId)
.setLeaderEpoch(leaderEpoch)
diff --git
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index de57813679..5faf53f075 100644
---
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -5192,6 +5192,29 @@ public class KafkaAdminClientTest {
}
}
+ @Test
+ public void testDescribeMetadataQuorumRetriableError() throws Exception {
+ try (final AdminClientUnitTestEnv env = mockClientEnv()) {
+
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create(ApiKeys.DESCRIBE_QUORUM.id,
+ ApiKeys.DESCRIBE_QUORUM.oldestVersion(),
+ ApiKeys.DESCRIBE_QUORUM.latestVersion()));
+
+ // First request fails with a NOT_LEADER_OR_FOLLOWER error (which
is retriable)
+ env.kafkaClient().prepareResponse(
+ body -> body instanceof DescribeQuorumRequest,
+ prepareDescribeQuorumResponse(Errors.NONE,
Errors.NOT_LEADER_OR_FOLLOWER, false, false, false, false, false));
+
+ // The second request succeeds
+ env.kafkaClient().prepareResponse(
+ body -> body instanceof DescribeQuorumRequest,
+ prepareDescribeQuorumResponse(Errors.NONE, Errors.NONE, false,
false, false, false, false));
+
+ KafkaFuture<QuorumInfo> future =
env.adminClient().describeMetadataQuorum().quorumInfo();
+ QuorumInfo quorumInfo = future.get();
+ assertEquals(defaultQuorumInfo(false), quorumInfo);
+ }
+ }
+
@Test
public void testDescribeMetadataQuorumFailure() {
try (final AdminClientUnitTestEnv env = mockClientEnv()) {
diff --git a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
index cac7a8a3cb..042a141a76 100644
--- a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
+++ b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
@@ -1171,7 +1171,10 @@ public class KafkaRaftClient<T> implements RaftClient<T>
{
}
if (!quorum.isLeader()) {
- return
DescribeQuorumRequest.getTopLevelErrorResponse(Errors.INVALID_REQUEST);
+ return DescribeQuorumResponse.singletonErrorResponse(
+ log.topicPartition(),
+ Errors.NOT_LEADER_OR_FOLLOWER
+ );
}
LeaderState<T> leaderState = quorum.leaderStateOrThrow();
diff --git a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
index 9b2771d2b3..a8a346e6db 100644
--- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
@@ -20,6 +20,7 @@ import
org.apache.kafka.common.errors.ClusterAuthorizationException;
import org.apache.kafka.common.errors.RecordBatchTooLargeException;
import org.apache.kafka.common.memory.MemoryPool;
import org.apache.kafka.common.message.BeginQuorumEpochResponseData;
+import org.apache.kafka.common.message.DescribeQuorumResponseData;
import org.apache.kafka.common.message.DescribeQuorumResponseData.ReplicaState;
import org.apache.kafka.common.message.EndQuorumEpochResponseData;
import org.apache.kafka.common.message.FetchResponseData;
@@ -1946,6 +1947,34 @@ public class KafkaRaftClientTest {
);
}
+ @Test
+ public void testDescribeQuorumNonLeader() throws Exception {
+ int localId = 0;
+ int voter2 = localId + 1;
+ int voter3 = localId + 2;
+ int epoch = 2;
+ Set<Integer> voters = Utils.mkSet(localId, voter2, voter3);
+
+ RaftClientTestContext context = new
RaftClientTestContext.Builder(localId, voters)
+ .withUnknownLeader(epoch)
+ .build();
+
+
context.deliverRequest(DescribeQuorumRequest.singletonRequest(context.metadataPartition));
+ context.pollUntilResponse();
+
+ DescribeQuorumResponseData responseData =
context.collectDescribeQuorumResponse();
+ assertEquals(Errors.NONE, Errors.forCode(responseData.errorCode()));
+
+ assertEquals(1, responseData.topics().size());
+ DescribeQuorumResponseData.TopicData topicData =
responseData.topics().get(0);
+ assertEquals(context.metadataPartition.topic(), topicData.topicName());
+
+ assertEquals(1, topicData.partitions().size());
+ DescribeQuorumResponseData.PartitionData partitionData =
topicData.partitions().get(0);
+ assertEquals(context.metadataPartition.partition(),
partitionData.partitionIndex());
+ assertEquals(Errors.NOT_LEADER_OR_FOLLOWER,
Errors.forCode(partitionData.errorCode()));
+ }
+
@Test
public void testDescribeQuorum() throws Exception {
int localId = 0;
diff --git
a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
index d48e41fb31..b825fc8867 100644
--- a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
+++ b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
@@ -16,7 +16,6 @@
*/
package org.apache.kafka.raft;
-import java.util.function.Consumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.memory.MemoryPool;
@@ -57,8 +56,8 @@ import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.raft.internals.BatchBuilder;
import org.apache.kafka.raft.internals.StringSerde;
import org.apache.kafka.server.common.serialization.RecordSerde;
-import org.apache.kafka.snapshot.SnapshotReader;
import org.apache.kafka.snapshot.RawSnapshotWriter;
+import org.apache.kafka.snapshot.SnapshotReader;
import org.apache.kafka.test.TestCondition;
import org.apache.kafka.test.TestUtils;
@@ -76,6 +75,7 @@ import java.util.Optional;
import java.util.OptionalInt;
import java.util.OptionalLong;
import java.util.Set;
+import java.util.function.Consumer;
import java.util.stream.Collectors;
import static org.apache.kafka.raft.RaftUtil.hasValidTopicPartition;
@@ -128,7 +128,7 @@ public final class RaftClientTestContext {
private final QuorumStateStore quorumStateStore = new
MockQuorumStateStore();
private final MockableRandom random = new MockableRandom(1L);
private final LogContext logContext = new LogContext();
- private final MockLog log = new MockLog(METADATA_PARTITION,
Uuid.METADATA_TOPIC_ID, logContext);
+ private final MockLog log = new MockLog(METADATA_PARTITION,
Uuid.METADATA_TOPIC_ID, logContext);
private final Set<Integer> voters;
private final OptionalInt localId;
@@ -440,21 +440,24 @@ public final class RaftClientTestContext {
assertEquals(ElectionState.withElectedLeader(epoch, leaderId, voters),
quorumStateStore.readElectionState());
}
- int assertSentDescribeQuorumResponse(
- int leaderId,
- int leaderEpoch,
- long highWatermark,
- List<ReplicaState> voterStates,
- List<ReplicaState> observerStates
- ) {
+ DescribeQuorumResponseData collectDescribeQuorumResponse() {
List<RaftResponse.Outbound> sentMessages =
drainSentResponses(ApiKeys.DESCRIBE_QUORUM);
assertEquals(1, sentMessages.size());
RaftResponse.Outbound raftMessage = sentMessages.get(0);
assertTrue(
raftMessage.data() instanceof DescribeQuorumResponseData,
"Unexpected request type " + raftMessage.data());
- DescribeQuorumResponseData response = (DescribeQuorumResponseData)
raftMessage.data();
+ return (DescribeQuorumResponseData) raftMessage.data();
+ }
+ void assertSentDescribeQuorumResponse(
+ int leaderId,
+ int leaderEpoch,
+ long highWatermark,
+ List<ReplicaState> voterStates,
+ List<ReplicaState> observerStates
+ ) {
+ DescribeQuorumResponseData response = collectDescribeQuorumResponse();
DescribeQuorumResponseData expectedResponse =
DescribeQuorumResponse.singletonResponse(
metadataPartition,
leaderId,
@@ -462,9 +465,7 @@ public final class RaftClientTestContext {
highWatermark,
voterStates,
observerStates);
-
assertEquals(expectedResponse, response);
- return raftMessage.correlationId();
}
int assertSentVoteRequest(int epoch, int lastEpoch, long lastEpochOffset,
int numVoteReceivers) {