This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 2023aed KAFKA-10427: Fetch snapshot API (#9553)
2023aed is described below
commit 2023aed59d863278a6302e03066d387f994f085c
Author: José Armando García Sancio <[email protected]>
AuthorDate: Mon Dec 28 18:37:08 2020 -0800
KAFKA-10427: Fetch snapshot API (#9553)
Implements the code necessary for the leader to respond to fetch snapshot
requests and for the follower to fetch snapshots. This API is described in more
detail in KIP-630:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-630%3A+Kafka+Raft+Snapshot.
More specifically, this patch includes the following changes:
Leader Changes:
1. Raft leader responds to FetchSnapshot requests by reading the local
snapshot and sending the requested bytes in the response. This implementation
currently copies the bytes to memory. This will be fixed in a future PR.
Follower Changes:
1. Raft followers will start fetching a snapshot if the leader sends a Fetch
response that includes a SnapshotId.
2. Raft followers send FetchSnapshot requests if there is a pending
download. The same timer is used for both Fetch and FetchSnapshot requests.
3. Raft followers handle FetchSnapshot responses by copying the bytes to the
pending SnapshotWriter. This implementation doesn't fix the replicated log
after the snapshot has been downloaded. This will be implemented in a future PR.
Reviewers: Jason Gustafson <[email protected]>
---
.../common/errors/PositionOutOfRangeException.java | 27 +-
.../common/errors/SnapshotNotFoundException.java | 27 +-
.../org/apache/kafka/common/protocol/ApiKeys.java | 3 +-
.../org/apache/kafka/common/protocol/Errors.java | 15 +-
.../kafka/common/requests/AbstractRequest.java | 2 +
.../kafka/common/requests/AbstractResponse.java | 2 +
.../common/requests/FetchSnapshotRequest.java | 122 +++
.../common/requests/FetchSnapshotResponse.java | 130 +++
.../resources/common/message/FetchResponse.json | 7 +
.../common/message/FetchSnapshotRequest.json | 50 +
.../common/message/FetchSnapshotResponse.json | 59 +
.../scala/kafka/network/RequestConvertToJson.scala | 2 +
.../scala/kafka/raft/KafkaNetworkChannel.scala | 3 +
core/src/main/scala/kafka/server/KafkaApis.scala | 3 +-
.../scala/kafka/tools/TestRaftRequestHandler.scala | 10 +-
.../java/org/apache/kafka/raft/CandidateState.java | 3 +
.../java/org/apache/kafka/raft/EpochState.java | 3 +-
.../java/org/apache/kafka/raft/FollowerState.java | 26 +
.../org/apache/kafka/raft/KafkaRaftClient.java | 292 ++++-
.../java/org/apache/kafka/raft/LeaderState.java | 3 +
.../java/org/apache/kafka/raft/QuorumState.java | 4 +
.../java/org/apache/kafka/raft/ResignedState.java | 3 +
.../org/apache/kafka/raft/UnattachedState.java | 2 +
.../java/org/apache/kafka/raft/VotedState.java | 2 +
.../kafka/raft/KafkaRaftClientSnapshotTest.java | 1153 ++++++++++++++++++++
.../org/apache/kafka/raft/KafkaRaftClientTest.java | 2 +-
.../test/java/org/apache/kafka/raft/MockLog.java | 4 +
.../apache/kafka/raft/RaftClientTestContext.java | 46 +-
.../apache/kafka/snapshot/SnapshotWriterTest.java | 2 +-
29 files changed, 1934 insertions(+), 73 deletions(-)
diff --git a/raft/src/main/java/org/apache/kafka/raft/EpochState.java
b/clients/src/main/java/org/apache/kafka/common/errors/PositionOutOfRangeException.java
similarity index 63%
copy from raft/src/main/java/org/apache/kafka/raft/EpochState.java
copy to
clients/src/main/java/org/apache/kafka/common/errors/PositionOutOfRangeException.java
index 626657b..c502d19 100644
--- a/raft/src/main/java/org/apache/kafka/raft/EpochState.java
+++
b/clients/src/main/java/org/apache/kafka/common/errors/PositionOutOfRangeException.java
@@ -14,29 +14,18 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.kafka.raft;
+package org.apache.kafka.common.errors;
-import java.util.Optional;
+public class PositionOutOfRangeException extends ApiException {
-public interface EpochState {
+ private static final long serialVersionUID = 1;
- default Optional<LogOffsetMetadata> highWatermark() {
- return Optional.empty();
+ public PositionOutOfRangeException(String s) {
+ super(s);
}
- /**
- * Get the current election state, which is guaranteed to be immutable.
- */
- ElectionState election();
-
- /**
- * Get the current (immutable) epoch.
- */
- int epoch();
-
- /**
- * User-friendly description of the state
- */
- String name();
+ public PositionOutOfRangeException(String message, Throwable cause) {
+ super(message, cause);
+ }
}
diff --git a/raft/src/main/java/org/apache/kafka/raft/EpochState.java
b/clients/src/main/java/org/apache/kafka/common/errors/SnapshotNotFoundException.java
similarity index 63%
copy from raft/src/main/java/org/apache/kafka/raft/EpochState.java
copy to
clients/src/main/java/org/apache/kafka/common/errors/SnapshotNotFoundException.java
index 626657b..5b3e7ed 100644
--- a/raft/src/main/java/org/apache/kafka/raft/EpochState.java
+++
b/clients/src/main/java/org/apache/kafka/common/errors/SnapshotNotFoundException.java
@@ -14,29 +14,18 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.kafka.raft;
+package org.apache.kafka.common.errors;
-import java.util.Optional;
+public class SnapshotNotFoundException extends ApiException {
-public interface EpochState {
+ private static final long serialVersionUID = 1;
- default Optional<LogOffsetMetadata> highWatermark() {
- return Optional.empty();
+ public SnapshotNotFoundException(String s) {
+ super(s);
}
- /**
- * Get the current election state, which is guaranteed to be immutable.
- */
- ElectionState election();
-
- /**
- * Get the current (immutable) epoch.
- */
- int epoch();
-
- /**
- * User-friendly description of the state
- */
- String name();
+ public SnapshotNotFoundException(String message, Throwable cause) {
+ super(message, cause);
+ }
}
diff --git
a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
index 3fe016e..790f397 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
@@ -96,7 +96,8 @@ public enum ApiKeys {
DESCRIBE_QUORUM(ApiMessageType.DESCRIBE_QUORUM, true,
RecordBatch.MAGIC_VALUE_V0, false, false),
ALTER_ISR(ApiMessageType.ALTER_ISR, true),
UPDATE_FEATURES(ApiMessageType.UPDATE_FEATURES, false, true),
- ENVELOPE(ApiMessageType.ENVELOPE, true, RecordBatch.MAGIC_VALUE_V0, false,
false);
+ ENVELOPE(ApiMessageType.ENVELOPE, true, RecordBatch.MAGIC_VALUE_V0, false,
false),
+ FETCH_SNAPSHOT(ApiMessageType.FETCH_SNAPSHOT, false,
RecordBatch.MAGIC_VALUE_V0, false, false);
// The generator ensures every `ApiMessageType` has a unique id
private static final Map<Integer, ApiKeys> ID_TO_TYPE =
Arrays.stream(ApiKeys.values())
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
index a0d5669..ed505f7 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
@@ -34,10 +34,9 @@ import
org.apache.kafka.common.errors.DuplicateResourceException;
import org.apache.kafka.common.errors.DuplicateSequenceException;
import org.apache.kafka.common.errors.ElectionNotNeededException;
import org.apache.kafka.common.errors.EligibleLeadersNotAvailableException;
-import org.apache.kafka.common.errors.FencedInstanceIdException;
import org.apache.kafka.common.errors.FeatureUpdateFailedException;
+import org.apache.kafka.common.errors.FencedInstanceIdException;
import org.apache.kafka.common.errors.FencedLeaderEpochException;
-import org.apache.kafka.common.errors.InvalidUpdateVersionException;
import org.apache.kafka.common.errors.FetchSessionIdNotFoundException;
import org.apache.kafka.common.errors.GroupAuthorizationException;
import org.apache.kafka.common.errors.GroupIdNotFoundException;
@@ -56,6 +55,7 @@ import org.apache.kafka.common.errors.InvalidGroupIdException;
import org.apache.kafka.common.errors.InvalidPartitionsException;
import org.apache.kafka.common.errors.InvalidPidMappingException;
import org.apache.kafka.common.errors.InvalidPrincipalTypeException;
+import org.apache.kafka.common.errors.InvalidProducerEpochException;
import org.apache.kafka.common.errors.InvalidReplicaAssignmentException;
import org.apache.kafka.common.errors.InvalidReplicationFactorException;
import org.apache.kafka.common.errors.InvalidRequestException;
@@ -65,6 +65,7 @@ import
org.apache.kafka.common.errors.InvalidTimestampException;
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.errors.InvalidTxnStateException;
import org.apache.kafka.common.errors.InvalidTxnTimeoutException;
+import org.apache.kafka.common.errors.InvalidUpdateVersionException;
import org.apache.kafka.common.errors.KafkaStorageException;
import org.apache.kafka.common.errors.LeaderNotAvailableException;
import org.apache.kafka.common.errors.ListenerNotFoundException;
@@ -83,6 +84,7 @@ import
org.apache.kafka.common.errors.OffsetOutOfRangeException;
import org.apache.kafka.common.errors.OperationNotAttemptedException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.PolicyViolationException;
+import org.apache.kafka.common.errors.PositionOutOfRangeException;
import org.apache.kafka.common.errors.PreferredLeaderNotAvailableException;
import org.apache.kafka.common.errors.PrincipalDeserializationException;
import org.apache.kafka.common.errors.ProducerFencedException;
@@ -95,6 +97,7 @@ import
org.apache.kafka.common.errors.ResourceNotFoundException;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.errors.SaslAuthenticationException;
import org.apache.kafka.common.errors.SecurityDisabledException;
+import org.apache.kafka.common.errors.SnapshotNotFoundException;
import org.apache.kafka.common.errors.StaleBrokerEpochException;
import org.apache.kafka.common.errors.ThrottlingQuotaExceededException;
import org.apache.kafka.common.errors.TimeoutException;
@@ -115,7 +118,6 @@ import
org.apache.kafka.common.errors.UnsupportedCompressionTypeException;
import org.apache.kafka.common.errors.UnsupportedForMessageFormatException;
import org.apache.kafka.common.errors.UnsupportedSaslMechanismException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
-import org.apache.kafka.common.errors.InvalidProducerEpochException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -343,7 +345,12 @@ public enum Errors {
INVALID_UPDATE_VERSION(95, "The given update version was invalid.",
InvalidUpdateVersionException::new),
FEATURE_UPDATE_FAILED(96, "Unable to update finalized features due to an
unexpected server error.", FeatureUpdateFailedException::new),
PRINCIPAL_DESERIALIZATION_FAILURE(97, "Request principal deserialization
failed during forwarding. " +
- "This indicates an internal error on the broker cluster security
setup.", PrincipalDeserializationException::new);
+ "This indicates an internal error on the broker cluster security
setup.", PrincipalDeserializationException::new),
+ SNAPSHOT_NOT_FOUND(98, "Requested snapshot was not found",
SnapshotNotFoundException::new),
+ POSITION_OUT_OF_RANGE(
+ 99,
+ "Requested position is not greater than or equal to zero, and less
than the size of the snapshot.",
+ PositionOutOfRangeException::new);
private static final Logger log = LoggerFactory.getLogger(Errors.class);
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
index ca58e82..6dbd3d6 100644
---
a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
+++
b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
@@ -272,6 +272,8 @@ public abstract class AbstractRequest implements
AbstractRequestResponse {
return UpdateFeaturesRequest.parse(buffer, apiVersion);
case ENVELOPE:
return EnvelopeRequest.parse(buffer, apiVersion);
+ case FETCH_SNAPSHOT:
+ return FetchSnapshotRequest.parse(buffer, apiVersion);
default:
throw new AssertionError(String.format("ApiKey %s is not
currently handled in `parseRequest`, the " +
"code should be updated to do so.", apiKey));
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
index 26eda33..f41b68f 100644
---
a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
+++
b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
@@ -229,6 +229,8 @@ public abstract class AbstractResponse implements
AbstractRequestResponse {
return UpdateFeaturesResponse.parse(responseBuffer, version);
case ENVELOPE:
return EnvelopeResponse.parse(responseBuffer, version);
+ case FETCH_SNAPSHOT:
+ return FetchSnapshotResponse.parse(responseBuffer, version);
default:
throw new AssertionError(String.format("ApiKey %s is not
currently handled in `parseResponse`, the " +
"code should be updated to do so.", apiKey));
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/FetchSnapshotRequest.java
b/clients/src/main/java/org/apache/kafka/common/requests/FetchSnapshotRequest.java
new file mode 100644
index 0000000..30952ac
--- /dev/null
+++
b/clients/src/main/java/org/apache/kafka/common/requests/FetchSnapshotRequest.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.Optional;
+import java.util.function.UnaryOperator;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.message.FetchSnapshotRequestData;
+import org.apache.kafka.common.message.FetchSnapshotResponseData;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.common.protocol.ByteBufferAccessor;
+import org.apache.kafka.common.protocol.Errors;
+
+final public class FetchSnapshotRequest extends AbstractRequest {
+ public final FetchSnapshotRequestData data;
+
+ public FetchSnapshotRequest(FetchSnapshotRequestData data, short version) {
+ super(ApiKeys.FETCH_SNAPSHOT, (short)
(FetchSnapshotRequestData.SCHEMAS.length - 1));
+ this.data = data;
+ }
+
+ @Override
+ public FetchSnapshotResponse getErrorResponse(int throttleTimeMs,
Throwable e) {
+ return new FetchSnapshotResponse(
+ new FetchSnapshotResponseData()
+ .setThrottleTimeMs(throttleTimeMs)
+ .setErrorCode(Errors.forException(e).code())
+ );
+ }
+
+ @Override
+ public ApiMessage data() {
+ return data;
+ }
+
+ /**
+ * Creates a FetchSnapshotRequestData with a single PartitionSnapshot for
the topic partition.
+ *
+ * The partition index will already be populated when calling operator.
+ *
+ * @param topicPartition the topic partition to include
+ * @param operator unary operator responsible for populating all the
appropriate fields
+ * @return the created fetch snapshot request data
+ */
+ public static FetchSnapshotRequestData singleton(
+ TopicPartition topicPartition,
+ UnaryOperator<FetchSnapshotRequestData.PartitionSnapshot> operator
+ ) {
+ FetchSnapshotRequestData.PartitionSnapshot partitionSnapshot =
operator.apply(
+ new
FetchSnapshotRequestData.PartitionSnapshot().setPartition(topicPartition.partition())
+ );
+
+ return new FetchSnapshotRequestData()
+ .setTopics(
+ Collections.singletonList(
+ new FetchSnapshotRequestData.TopicSnapshot()
+ .setName(topicPartition.topic())
+
.setPartitions(Collections.singletonList(partitionSnapshot))
+ )
+ );
+ }
+
+ /**
+ * Finds the PartitionSnapshot for a given topic partition.
+ *
+ * @param data the fetch snapshot request data
+ * @param topicPartition the topic partition to find
+ * @return the request partition snapshot if found, otherwise an empty
Optional
+ */
+ public static Optional<FetchSnapshotRequestData.PartitionSnapshot>
forTopicPartition(
+ FetchSnapshotRequestData data,
+ TopicPartition topicPartition
+ ) {
+ return data
+ .topics()
+ .stream()
+ .filter(topic -> topic.name().equals(topicPartition.topic()))
+ .flatMap(topic -> topic.partitions().stream())
+ .filter(partition -> partition.partition() ==
topicPartition.partition())
+ .findAny();
+ }
+
+ public static FetchSnapshotRequest parse(ByteBuffer buffer, short version)
{
+ return new FetchSnapshotRequest(new FetchSnapshotRequestData(new
ByteBufferAccessor(buffer), version), version);
+ }
+
+ public static class Builder extends
AbstractRequest.Builder<FetchSnapshotRequest> {
+ private final FetchSnapshotRequestData data;
+
+ public Builder(FetchSnapshotRequestData data) {
+ super(ApiKeys.FETCH_SNAPSHOT);
+ this.data = data;
+ }
+
+ @Override
+ public FetchSnapshotRequest build(short version) {
+ return new FetchSnapshotRequest(data, version);
+ }
+
+ @Override
+ public String toString() {
+ return data.toString();
+ }
+ }
+}
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/FetchSnapshotResponse.java
b/clients/src/main/java/org/apache/kafka/common/requests/FetchSnapshotResponse.java
new file mode 100644
index 0000000..22e1f81
--- /dev/null
+++
b/clients/src/main/java/org/apache/kafka/common/requests/FetchSnapshotResponse.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.UnaryOperator;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.message.FetchSnapshotResponseData;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.common.protocol.ByteBufferAccessor;
+import org.apache.kafka.common.protocol.Errors;
+
+final public class FetchSnapshotResponse extends AbstractResponse {
+ public final FetchSnapshotResponseData data;
+
+ public FetchSnapshotResponse(FetchSnapshotResponseData data) {
+ super(ApiKeys.FETCH_SNAPSHOT);
+
+ this.data = data;
+ }
+
+ @Override
+ public Map<Errors, Integer> errorCounts() {
+ Map<Errors, Integer> errors = new HashMap<>();
+
+ Errors topLevelError = Errors.forCode(data.errorCode());
+ if (topLevelError != Errors.NONE) {
+ errors.put(topLevelError, 1);
+ }
+
+ for (FetchSnapshotResponseData.TopicSnapshot topicResponse :
data.topics()) {
+ for (FetchSnapshotResponseData.PartitionSnapshot partitionResponse
: topicResponse.partitions()) {
+ errors.compute(Errors.forCode(partitionResponse.errorCode()),
+ (error, count) -> count == null ? 1 : count + 1);
+ }
+ }
+
+ return errors;
+ }
+
+ @Override
+ public int throttleTimeMs() {
+ return data.throttleTimeMs();
+ }
+
+ @Override
+ public ApiMessage data() {
+ return data;
+ }
+
+ /**
+ * Creates a FetchSnapshotResponseData with a top level error.
+ *
+ * @param error the top level error
+ * @return the created fetch snapshot response data
+ */
+ public static FetchSnapshotResponseData withTopLevelError(Errors error) {
+ return new FetchSnapshotResponseData().setErrorCode(error.code());
+ }
+
+ /**
+ * Creates a FetchSnapshotResponseData with a single PartitionSnapshot for
the topic partition.
+ *
+ * The partition index will already by populated when calling operator.
+ *
+ * @param topicPartition the topic partition to include
+ * @param operator unary operator responsible for populating all of the
appropriate fields
+ * @return the created fetch snapshot response data
+ */
+ public static FetchSnapshotResponseData singleton(
+ TopicPartition topicPartition,
+ UnaryOperator<FetchSnapshotResponseData.PartitionSnapshot> operator
+ ) {
+ FetchSnapshotResponseData.PartitionSnapshot partitionSnapshot =
operator.apply(
+ new
FetchSnapshotResponseData.PartitionSnapshot().setIndex(topicPartition.partition())
+ );
+
+ return new FetchSnapshotResponseData()
+ .setTopics(
+ Collections.singletonList(
+ new FetchSnapshotResponseData.TopicSnapshot()
+ .setName(topicPartition.topic())
+
.setPartitions(Collections.singletonList(partitionSnapshot))
+ )
+ );
+ }
+
+ /**
+ * Finds the PartitionSnapshot for a given topic partition.
+ *
+ * @param data the fetch snapshot response data
+ * @param topicPartition the topic partition to find
+ * @return the response partition snapshot if found, otherwise an empty
Optional
+ */
+ public static Optional<FetchSnapshotResponseData.PartitionSnapshot>
forTopicPartition(
+ FetchSnapshotResponseData data,
+ TopicPartition topicPartition
+ ) {
+ return data
+ .topics()
+ .stream()
+ .filter(topic -> topic.name().equals(topicPartition.topic()))
+ .flatMap(topic -> topic.partitions().stream())
+ .filter(parition -> parition.index() == topicPartition.partition())
+ .findAny();
+ }
+
+ public static FetchSnapshotResponse parse(ByteBuffer buffer, short
version) {
+ return new FetchSnapshotResponse(new FetchSnapshotResponseData(new
ByteBufferAccessor(buffer), version));
+ }
+}
diff --git a/clients/src/main/resources/common/message/FetchResponse.json
b/clients/src/main/resources/common/message/FetchResponse.json
index 3db4359..0aa9a3a 100644
--- a/clients/src/main/resources/common/message/FetchResponse.json
+++ b/clients/src/main/resources/common/message/FetchResponse.json
@@ -78,6 +78,13 @@
{ "name": "LeaderEpoch", "type": "int32", "versions": "12+",
"default": "-1",
"about": "The latest known leader epoch"}
]},
+ { "name": "SnapshotId", "type": "SnapshotId",
+ "versions": "12+", "taggedVersions": "12+", "tag": 2,
+ "about": "In the case of fetching an offset less than the
LogStartOffset, this is the end offset and epoch that should be used in the
FetchSnapshot request.",
+ "fields": [
+ { "name": "EndOffset", "type": "int64", "versions": "0+",
"default": "-1" },
+ { "name": "Epoch", "type": "int32", "versions": "0+", "default":
"-1" }
+ ]},
{ "name": "AbortedTransactions", "type": "[]AbortedTransaction",
"versions": "4+", "nullableVersions": "4+", "ignorable": true,
"about": "The aborted transactions.", "fields": [
{ "name": "ProducerId", "type": "int64", "versions": "4+",
"entityType": "producerId",
diff --git
a/clients/src/main/resources/common/message/FetchSnapshotRequest.json
b/clients/src/main/resources/common/message/FetchSnapshotRequest.json
new file mode 100644
index 0000000..c3518f4
--- /dev/null
+++ b/clients/src/main/resources/common/message/FetchSnapshotRequest.json
@@ -0,0 +1,50 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+ "apiKey": 59,
+ "type": "request",
+ "name": "FetchSnapshotRequest",
+ "validVersions": "0",
+ "flexibleVersions": "0+",
+ "fields": [
+ { "name": "ClusterId", "type": "string", "versions": "0+",
"nullableVersions": "0+", "default": "null", "taggedVersions": "0+", "tag": 0,
+ "about": "The clusterId if known, this is used to validate metadata
fetches prior to broker registration" },
+ { "name": "ReplicaId", "type": "int32", "versions": "0+", "default": "-1",
+ "about": "The broker ID of the follower" },
+ { "name": "MaxBytes", "type": "int32", "versions": "0+", "default":
"0x7fffffff",
+ "about": "The maximum bytes to fetch from all of the snapshots" },
+ { "name": "Topics", "type": "[]TopicSnapshot", "versions": "0+",
+ "about": "The topics to fetch", "fields": [
+ { "name": "Name", "type": "string", "versions": "0+", "entityType":
"topicName",
+ "about": "The name of the topic to fetch" },
+ { "name": "Partitions", "type": "[]PartitionSnapshot", "versions": "0+",
+ "about": "The partitions to fetch", "fields": [
+ { "name": "Partition", "type": "int32", "versions": "0+",
+ "about": "The partition index" },
+ { "name": "CurrentLeaderEpoch", "type": "int32", "versions": "0+",
+ "about": "The current leader epoch of the partition, -1 for unknown
leader epoch" },
+ { "name": "SnapshotId", "type": "SnapshotId", "versions": "0+",
+ "about": "The snapshot endOffset and epoch to fetch",
+ "fields": [
+ { "name": "EndOffset", "type": "int64", "versions": "0+" },
+ { "name": "Epoch", "type": "int32", "versions": "0+" }
+ ]},
+ { "name": "Position", "type": "int64", "versions": "0+",
+ "about": "The byte position within the snapshot to start fetching
from" }
+ ]}
+ ]}
+ ]
+}
diff --git
a/clients/src/main/resources/common/message/FetchSnapshotResponse.json
b/clients/src/main/resources/common/message/FetchSnapshotResponse.json
new file mode 100644
index 0000000..711b536
--- /dev/null
+++ b/clients/src/main/resources/common/message/FetchSnapshotResponse.json
@@ -0,0 +1,59 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+ "apiKey": 59,
+ "type": "response",
+ "name": "FetchSnapshotResponse",
+ "validVersions": "0",
+ "flexibleVersions": "0+",
+ "fields": [
+ { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
"ignorable": true,
+ "about": "The duration in milliseconds for which the request was
throttled due to a quota violation, or zero if the request did not violate any
quota." },
+ { "name": "ErrorCode", "type": "int16", "versions": "0+", "ignorable":
false,
+ "about": "The top level response error code." },
+ { "name": "Topics", "type": "[]TopicSnapshot", "versions": "0+",
+ "about": "The topics to fetch.", "fields": [
+ { "name": "Name", "type": "string", "versions": "0+", "entityType":
"topicName",
+ "about": "The name of the topic to fetch." },
+ { "name": "Partitions", "type": "[]PartitionSnapshot", "versions": "0+",
+ "about": "The partitions to fetch.", "fields": [
+ { "name": "Index", "type": "int32", "versions": "0+",
+ "about": "The partition index." },
+ { "name": "ErrorCode", "type": "int16", "versions": "0+",
+ "about": "The error code, or 0 if there was no fetch error." },
+ { "name": "SnapshotId", "type": "SnapshotId", "versions": "0+",
+ "about": "The snapshot endOffset and epoch fetched",
+ "fields": [
+ { "name": "EndOffset", "type": "int64", "versions": "0+" },
+ { "name": "Epoch", "type": "int32", "versions": "0+" }
+ ]},
+ { "name": "CurrentLeader", "type": "LeaderIdAndEpoch",
+ "versions": "0+", "taggedVersions": "0+", "tag": 0, "fields": [
+ { "name": "LeaderId", "type": "int32", "versions": "0+",
+ "about": "The ID of the current leader or -1 if the leader is
unknown."},
+ { "name": "LeaderEpoch", "type": "int32", "versions": "0+",
+ "about": "The latest known leader epoch"}
+ ]},
+ { "name": "Size", "type": "int64", "versions": "0+",
+ "about": "The total size of the snapshot." },
+ { "name": "Position", "type": "int64", "versions": "0+",
+ "about": "The starting byte position within the snapshot included in
the Bytes field." },
+ { "name": "Bytes", "type": "bytes", "versions": "0+", "zeroCopy": true,
+ "about": "Snapshot data." }
+ ]}
+ ]}
+ ]
+}
diff --git a/core/src/main/scala/kafka/network/RequestConvertToJson.scala
b/core/src/main/scala/kafka/network/RequestConvertToJson.scala
index f610df9..db3a11b 100644
--- a/core/src/main/scala/kafka/network/RequestConvertToJson.scala
+++ b/core/src/main/scala/kafka/network/RequestConvertToJson.scala
@@ -86,6 +86,7 @@ object RequestConvertToJson {
case req: UpdateMetadataRequest =>
UpdateMetadataRequestDataJsonConverter.write(req.data, request.version)
case req: VoteRequest => VoteRequestDataJsonConverter.write(req.data,
request.version)
case req: WriteTxnMarkersRequest =>
WriteTxnMarkersRequestDataJsonConverter.write(req.data, request.version)
+ case req: FetchSnapshotRequest =>
FetchSnapshotRequestDataJsonConverter.write(req.data, request.version)
case _ => throw new IllegalStateException(s"ApiKey ${request.apiKey} is
not currently handled in `request`, the " +
"code should be updated to do so.");
}
@@ -152,6 +153,7 @@ object RequestConvertToJson {
case res: UpdateMetadataResponse =>
UpdateMetadataResponseDataJsonConverter.write(res.data, version)
case res: WriteTxnMarkersResponse =>
WriteTxnMarkersResponseDataJsonConverter.write(res.data, version)
case res: VoteResponse => VoteResponseDataJsonConverter.write(res.data,
version)
+ case res: FetchSnapshotResponse =>
FetchSnapshotResponseDataJsonConverter.write(res.data, version)
case _ => throw new IllegalStateException(s"ApiKey ${response.apiKey} is
not currently handled in `response`, the " +
"code should be updated to do so.");
}
diff --git a/core/src/main/scala/kafka/raft/KafkaNetworkChannel.scala
b/core/src/main/scala/kafka/raft/KafkaNetworkChannel.scala
index 4351672..8c121b0 100644
--- a/core/src/main/scala/kafka/raft/KafkaNetworkChannel.scala
+++ b/core/src/main/scala/kafka/raft/KafkaNetworkChannel.scala
@@ -46,7 +46,10 @@ object KafkaNetworkChannel {
// Since we already have the request, we go through a simplified
builder
new AbstractRequest.Builder[FetchRequest](ApiKeys.FETCH) {
override def build(version: Short): FetchRequest = new
FetchRequest(fetchRequest, version)
+ override def toString(): String = fetchRequest.toString
}
+ case fetchSnapshotRequest: FetchSnapshotRequestData =>
+ new FetchSnapshotRequest.Builder(fetchSnapshotRequest)
case _ =>
throw new IllegalArgumentException(s"Unexpected type for requestData:
$requestData")
}
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala
b/core/src/main/scala/kafka/server/KafkaApis.scala
index 183fd39..0477490 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -255,6 +255,7 @@ class KafkaApis(val requestChannel: RequestChannel,
case ApiKeys.BEGIN_QUORUM_EPOCH => closeConnection(request,
util.Collections.emptyMap())
case ApiKeys.END_QUORUM_EPOCH => closeConnection(request,
util.Collections.emptyMap())
case ApiKeys.DESCRIBE_QUORUM => closeConnection(request,
util.Collections.emptyMap())
+ case ApiKeys.FETCH_SNAPSHOT => closeConnection(request,
util.Collections.emptyMap())
}
}
} catch {
@@ -3508,4 +3509,4 @@ object KafkaApis {
FetchResponse.sizeOf(versionId, unconvertedResponse.responseData.entrySet
.iterator.asScala.filter(element =>
quota.isThrottled(element.getKey)).asJava)
}
-}
\ No newline at end of file
+}
diff --git a/core/src/main/scala/kafka/tools/TestRaftRequestHandler.scala
b/core/src/main/scala/kafka/tools/TestRaftRequestHandler.scala
index f8ab30c..298baf8 100644
--- a/core/src/main/scala/kafka/tools/TestRaftRequestHandler.scala
+++ b/core/src/main/scala/kafka/tools/TestRaftRequestHandler.scala
@@ -22,10 +22,10 @@ import kafka.network.RequestConvertToJson
import kafka.server.ApiRequestHandler
import kafka.utils.Logging
import org.apache.kafka.common.internals.FatalExitError
-import org.apache.kafka.common.message.{BeginQuorumEpochResponseData,
EndQuorumEpochResponseData, FetchResponseData, VoteResponseData}
+import org.apache.kafka.common.message.{BeginQuorumEpochResponseData,
EndQuorumEpochResponseData, FetchResponseData, FetchSnapshotResponseData,
VoteResponseData}
import org.apache.kafka.common.protocol.{ApiKeys, ApiMessage, Errors}
import org.apache.kafka.common.record.BaseRecords
-import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse,
BeginQuorumEpochResponse, EndQuorumEpochResponse, FetchResponse, VoteResponse}
+import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse,
BeginQuorumEpochResponse, EndQuorumEpochResponse, FetchResponse,
FetchSnapshotResponse, VoteResponse}
import org.apache.kafka.common.utils.Time
import org.apache.kafka.raft.{KafkaRaftClient, RaftRequest}
@@ -49,7 +49,7 @@ class TestRaftRequestHandler(
case ApiKeys.BEGIN_QUORUM_EPOCH => handleBeginQuorumEpoch(request)
case ApiKeys.END_QUORUM_EPOCH => handleEndQuorumEpoch(request)
case ApiKeys.FETCH => handleFetch(request)
-
+ case ApiKeys.FETCH_SNAPSHOT => handleFetchSnapshot(request)
case _ => throw new IllegalArgumentException(s"Unsupported api key:
${request.header.apiKey}")
}
} catch {
@@ -78,6 +78,10 @@ class TestRaftRequestHandler(
handle(request, response => new
FetchResponse[BaseRecords](response.asInstanceOf[FetchResponseData]))
}
// Wraps the raft client's FetchSnapshotResponseData in its response object
// before handing it to the request channel.
private def handleFetchSnapshot(request: RequestChannel.Request): Unit = {
  handle(request, responseData => new FetchSnapshotResponse(responseData.asInstanceOf[FetchSnapshotResponseData]))
}
+
private def handle(
request: RequestChannel.Request,
buildResponse: ApiMessage => AbstractResponse
diff --git a/raft/src/main/java/org/apache/kafka/raft/CandidateState.java
b/raft/src/main/java/org/apache/kafka/raft/CandidateState.java
index eb58b32..08f6906 100644
--- a/raft/src/main/java/org/apache/kafka/raft/CandidateState.java
+++ b/raft/src/main/java/org/apache/kafka/raft/CandidateState.java
@@ -250,6 +250,9 @@ public class CandidateState implements EpochState {
return "Candidate";
}
+ @Override
+ public void close() {}
+
private enum State {
UNRECORDED,
GRANTED,
diff --git a/raft/src/main/java/org/apache/kafka/raft/EpochState.java
b/raft/src/main/java/org/apache/kafka/raft/EpochState.java
index 626657b..b32a200 100644
--- a/raft/src/main/java/org/apache/kafka/raft/EpochState.java
+++ b/raft/src/main/java/org/apache/kafka/raft/EpochState.java
@@ -16,9 +16,10 @@
*/
package org.apache.kafka.raft;
+import java.io.Closeable;
import java.util.Optional;
-public interface EpochState {
+public interface EpochState extends Closeable {
default Optional<LogOffsetMetadata> highWatermark() {
return Optional.empty();
diff --git a/raft/src/main/java/org/apache/kafka/raft/FollowerState.java
b/raft/src/main/java/org/apache/kafka/raft/FollowerState.java
index 5cbe45be..e1ef7aa 100644
--- a/raft/src/main/java/org/apache/kafka/raft/FollowerState.java
+++ b/raft/src/main/java/org/apache/kafka/raft/FollowerState.java
@@ -18,7 +18,9 @@ package org.apache.kafka.raft;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Timer;
+import org.apache.kafka.snapshot.RawSnapshotWriter;
+import java.io.IOException;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.OptionalLong;
@@ -29,8 +31,13 @@ public class FollowerState implements EpochState {
private final int epoch;
private final int leaderId;
private final Set<Integer> voters;
+ // Used for tracking the expiration of both the Fetch and FetchSnapshot
requests
private final Timer fetchTimer;
private Optional<LogOffsetMetadata> highWatermark;
+ /* Used to track the currently fetching snapshot. When fetching snapshot
regular
+ * Fetch request are paused
+ */
+ private Optional<RawSnapshotWriter> fetchingSnapshot;
public FollowerState(
Time time,
@@ -46,6 +53,7 @@ public class FollowerState implements EpochState {
this.voters = voters;
this.fetchTimer = time.timer(fetchTimeoutMs);
this.highWatermark = highWatermark;
+ this.fetchingSnapshot = Optional.empty();
}
@Override
@@ -120,6 +128,17 @@ public class FollowerState implements EpochState {
return highWatermark;
}
+ public Optional<RawSnapshotWriter> fetchingSnapshot() {
+ return fetchingSnapshot;
+ }
+
+ public void setFetchingSnapshot(Optional<RawSnapshotWriter>
fetchingSnapshot) throws IOException {
+ if (fetchingSnapshot.isPresent()) {
+ fetchingSnapshot.get().close();
+ }
+ this.fetchingSnapshot = fetchingSnapshot;
+ }
+
@Override
public String toString() {
return "FollowerState(" +
@@ -129,4 +148,11 @@ public class FollowerState implements EpochState {
", voters=" + voters +
')';
}
+
+ @Override
+ public void close() throws IOException {
+ if (fetchingSnapshot.isPresent()) {
+ fetchingSnapshot.get().close();
+ }
+ }
}
diff --git a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
index 2abbebf..241e08c 100644
--- a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
+++ b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
@@ -17,20 +17,23 @@
package org.apache.kafka.raft;
import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.ClusterAuthorizationException;
import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
import org.apache.kafka.common.memory.MemoryPool;
import org.apache.kafka.common.message.BeginQuorumEpochRequestData;
import org.apache.kafka.common.message.BeginQuorumEpochResponseData;
import org.apache.kafka.common.message.DescribeQuorumRequestData;
-import org.apache.kafka.common.message.DescribeQuorumResponseData;
import org.apache.kafka.common.message.DescribeQuorumResponseData.ReplicaState;
+import org.apache.kafka.common.message.DescribeQuorumResponseData;
import org.apache.kafka.common.message.EndQuorumEpochRequestData;
import org.apache.kafka.common.message.EndQuorumEpochResponseData;
import org.apache.kafka.common.message.FetchRequestData;
import org.apache.kafka.common.message.FetchResponseData;
-import org.apache.kafka.common.message.LeaderChangeMessage;
+import org.apache.kafka.common.message.FetchSnapshotRequestData;
+import org.apache.kafka.common.message.FetchSnapshotResponseData;
import org.apache.kafka.common.message.LeaderChangeMessage.Voter;
+import org.apache.kafka.common.message.LeaderChangeMessage;
import org.apache.kafka.common.message.VoteRequestData;
import org.apache.kafka.common.message.VoteResponseData;
import org.apache.kafka.common.metrics.Metrics;
@@ -47,6 +50,8 @@ import org.apache.kafka.common.requests.DescribeQuorumRequest;
import org.apache.kafka.common.requests.DescribeQuorumResponse;
import org.apache.kafka.common.requests.EndQuorumEpochRequest;
import org.apache.kafka.common.requests.EndQuorumEpochResponse;
+import org.apache.kafka.common.requests.FetchSnapshotRequest;
+import org.apache.kafka.common.requests.FetchSnapshotResponse;
import org.apache.kafka.common.requests.VoteRequest;
import org.apache.kafka.common.requests.VoteResponse;
import org.apache.kafka.common.utils.LogContext;
@@ -62,11 +67,14 @@ import org.apache.kafka.raft.internals.KafkaRaftMetrics;
import org.apache.kafka.raft.internals.MemoryBatchReader;
import org.apache.kafka.raft.internals.RecordsBatchReader;
import org.apache.kafka.raft.internals.ThresholdPurgatory;
+import org.apache.kafka.snapshot.RawSnapshotReader;
+import org.apache.kafka.snapshot.RawSnapshotWriter;
import org.apache.kafka.snapshot.SnapshotWriter;
import org.slf4j.Logger;
import java.io.IOException;
import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
@@ -999,7 +1007,7 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
}
}
- private OptionalInt optionalLeaderId(int leaderIdOrNil) {
+ private static OptionalInt optionalLeaderId(int leaderIdOrNil) {
if (leaderIdOrNil < 0)
return OptionalInt.empty();
return OptionalInt.of(leaderIdOrNil);
@@ -1053,6 +1061,32 @@ public class KafkaRaftClient<T> implements RaftClient<T>
{
logger.info("Truncated to offset {} from Fetch response
from leader {}",
truncationOffset, quorum.leaderIdOrNil());
});
+ } else if (partitionResponse.snapshotId().epoch() >= 0 ||
+ partitionResponse.snapshotId().endOffset() >= 0) {
+ // The leader is asking us to fetch a snapshot
+
+ if (partitionResponse.snapshotId().epoch() < 0) {
+ logger.error(
+ "The leader sent a snapshot id with a valid end offset
{} but with an invalid epoch {}",
+ partitionResponse.snapshotId().endOffset(),
+ partitionResponse.snapshotId().epoch()
+ );
+ return false;
+ } else if (partitionResponse.snapshotId().endOffset() < 0) {
+ logger.error(
+ "The leader sent a snapshot id with a valid epoch {}
but with an invalid end offset {}",
+ partitionResponse.snapshotId().epoch(),
+ partitionResponse.snapshotId().endOffset()
+ );
+ return false;
+ } else {
+ OffsetAndEpoch snapshotId = new OffsetAndEpoch(
+ partitionResponse.snapshotId().endOffset(),
+ partitionResponse.snapshotId().epoch()
+ );
+
+
state.setFetchingSnapshot(Optional.of(log.createSnapshot(snapshotId)));
+ }
} else {
Records records = (Records) partitionResponse.recordSet();
if (records.sizeInBytes() > 0) {
@@ -1117,6 +1151,187 @@ public class KafkaRaftClient<T> implements
RaftClient<T> {
);
}
+ private FetchSnapshotResponseData handleFetchSnapshotRequest(
+ RaftRequest.Inbound requestMetadata
+ ) throws IOException {
+ FetchSnapshotRequestData data = (FetchSnapshotRequestData)
requestMetadata.data;
+
+ if (data.topics().size() != 1 &&
data.topics().get(0).partitions().size() != 1) {
+ return
FetchSnapshotResponse.withTopLevelError(Errors.INVALID_REQUEST);
+ }
+
+ Optional<FetchSnapshotRequestData.PartitionSnapshot>
partitionSnapshotOpt = FetchSnapshotRequest
+ .forTopicPartition(data, log.topicPartition());
+ if (!partitionSnapshotOpt.isPresent()) {
+ // The Raft client assumes that there is only one topic partition.
+ TopicPartition unknownTopicPartition = new TopicPartition(
+ data.topics().get(0).name(),
+ data.topics().get(0).partitions().get(0).partition()
+ );
+
+ return FetchSnapshotResponse.singleton(
+ unknownTopicPartition,
+ responsePartitionSnapshot -> {
+ return responsePartitionSnapshot
+
.setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code());
+ }
+ );
+ }
+
+ FetchSnapshotRequestData.PartitionSnapshot partitionSnapshot =
partitionSnapshotOpt.get();
+ Optional<Errors> leaderValidation = validateLeaderOnlyRequest(
+ partitionSnapshot.currentLeaderEpoch()
+ );
+ if (leaderValidation.isPresent()) {
+ return FetchSnapshotResponse.singleton(
+ log.topicPartition(),
+ responsePartitionSnapshot -> {
+ return addQuorumLeader(responsePartitionSnapshot)
+ .setErrorCode(leaderValidation.get().code());
+ }
+ );
+ }
+
+ OffsetAndEpoch snapshotId = new OffsetAndEpoch(
+ partitionSnapshot.snapshotId().endOffset(),
+ partitionSnapshot.snapshotId().epoch()
+ );
+ Optional<RawSnapshotReader> snapshotOpt = log.readSnapshot(snapshotId);
+ if (!snapshotOpt.isPresent()) {
+ return FetchSnapshotResponse.singleton(
+ log.topicPartition(),
+ responsePartitionSnapshot -> {
+ return addQuorumLeader(responsePartitionSnapshot)
+ .setErrorCode(Errors.SNAPSHOT_NOT_FOUND.code());
+ }
+ );
+ }
+
+ try (RawSnapshotReader snapshot = snapshotOpt.get()) {
+ if (partitionSnapshot.position() < 0 ||
partitionSnapshot.position() >= snapshot.sizeInBytes()) {
+ return FetchSnapshotResponse.singleton(
+ log.topicPartition(),
+ responsePartitionSnapshot -> {
+ return addQuorumLeader(responsePartitionSnapshot)
+ .setErrorCode(Errors.POSITION_OUT_OF_RANGE.code());
+ }
+ );
+ }
+
+ int maxSnapshotSize;
+ try {
+ maxSnapshotSize = Math.toIntExact(snapshot.sizeInBytes());
+ } catch (ArithmeticException e) {
+ maxSnapshotSize = Integer.MAX_VALUE;
+ }
+
+ ByteBuffer buffer = ByteBuffer.allocate(Math.min(data.maxBytes(),
maxSnapshotSize));
+ snapshot.read(buffer, partitionSnapshot.position());
+ buffer.flip();
+
+ long snapshotSize = snapshot.sizeInBytes();
+
+ return FetchSnapshotResponse.singleton(
+ log.topicPartition(),
+ responsePartitionSnapshot -> {
+ addQuorumLeader(responsePartitionSnapshot)
+ .snapshotId()
+ .setEndOffset(snapshotId.offset)
+ .setEpoch(snapshotId.epoch);
+
+ return responsePartitionSnapshot
+ .setSize(snapshotSize)
+ .setPosition(partitionSnapshot.position())
+ .setBytes(buffer);
+ }
+ );
+ }
+ }
+
+ private boolean handleFetchSnapshotResponse(
+ RaftResponse.Inbound responseMetadata,
+ long currentTimeMs
+ ) throws IOException {
+ FetchSnapshotResponseData data = (FetchSnapshotResponseData)
responseMetadata.data;
+ Errors topLevelError = Errors.forCode(data.errorCode());
+ if (topLevelError != Errors.NONE) {
+ return handleTopLevelError(topLevelError, responseMetadata);
+ }
+
+ if (data.topics().size() != 1 &&
data.topics().get(0).partitions().size() != 1) {
+ return false;
+ }
+
+ Optional<FetchSnapshotResponseData.PartitionSnapshot>
partitionSnapshotOpt = FetchSnapshotResponse
+ .forTopicPartition(data, log.topicPartition());
+ if (!partitionSnapshotOpt.isPresent()) {
+ return false;
+ }
+
+ FetchSnapshotResponseData.PartitionSnapshot partitionSnapshot =
partitionSnapshotOpt.get();
+
+ FetchSnapshotResponseData.LeaderIdAndEpoch currentLeaderIdAndEpoch =
partitionSnapshot.currentLeader();
+ OptionalInt responseLeaderId =
optionalLeaderId(currentLeaderIdAndEpoch.leaderId());
+ int responseEpoch = currentLeaderIdAndEpoch.leaderEpoch();
+ Errors error = Errors.forCode(partitionSnapshot.errorCode());
+
+ Optional<Boolean> handled = maybeHandleCommonResponse(
+ error, responseLeaderId, responseEpoch, currentTimeMs);
+ if (handled.isPresent()) {
+ return handled.get();
+ }
+
+ FollowerState state = quorum.followerStateOrThrow();
+
+ if (Errors.forCode(partitionSnapshot.errorCode()) ==
Errors.SNAPSHOT_NOT_FOUND ||
+ partitionSnapshot.snapshotId().endOffset() < 0 ||
+ partitionSnapshot.snapshotId().epoch() < 0) {
+
+ /* The leader deleted the snapshot before the follower could
download it. Start over by
+ * reseting the fetching snapshot state and sending another fetch
request.
+ */
+ logger.trace(
+ "Leader doesn't know about snapshot id {}, returned error {}
and snapshot id {}",
+ state.fetchingSnapshot(),
+ partitionSnapshot.errorCode(),
+ partitionSnapshot.snapshotId()
+ );
+ state.setFetchingSnapshot(Optional.empty());
+ state.resetFetchTimeout(currentTimeMs);
+ return true;
+ }
+
+ OffsetAndEpoch snapshotId = new OffsetAndEpoch(
+ partitionSnapshot.snapshotId().endOffset(),
+ partitionSnapshot.snapshotId().epoch()
+ );
+
+ RawSnapshotWriter snapshot;
+ if (state.fetchingSnapshot().isPresent()) {
+ snapshot = state.fetchingSnapshot().get();
+ } else {
+ throw new IllegalStateException(String.format("Received unexpected
fetch snapshot response: %s", partitionSnapshot));
+ }
+
+ if (!snapshot.snapshotId().equals(snapshotId)) {
+ throw new IllegalStateException(String.format("Received fetch
snapshot response with an invalid id. Expected %s; Received %s",
snapshot.snapshotId(), snapshotId));
+ }
+ if (snapshot.sizeInBytes() != partitionSnapshot.position()) {
+ throw new IllegalStateException(String.format("Received fetch
snapshot response with an invalid position. Expected %s; Received %s",
snapshot.sizeInBytes(), partitionSnapshot.position()));
+ }
+
+ snapshot.append(partitionSnapshot.bytes());
+
+ if (snapshot.sizeInBytes() == partitionSnapshot.size()) {
+ // Finished fetching the snapshot.
+ snapshot.freeze();
+ state.setFetchingSnapshot(Optional.empty());
+ }
+
+ state.resetFetchTimeout(currentTimeMs);
+ return true;
+ }
+
List<ReplicaState> convertToReplicaStates(Map<Integer, Long>
replicaEndOffsets) {
return replicaEndOffsets.entrySet().stream()
.map(entry -> new ReplicaState()
@@ -1263,6 +1478,10 @@ public class KafkaRaftClient<T> implements RaftClient<T>
{
handledSuccessfully = handleEndQuorumEpochResponse(response,
currentTimeMs);
break;
+ case FETCH_SNAPSHOT:
+ handledSuccessfully = handleFetchSnapshotResponse(response,
currentTimeMs);
+ break;
+
default:
throw new IllegalArgumentException("Received unexpected
response type: " + apiKey);
}
@@ -1338,6 +1557,10 @@ public class KafkaRaftClient<T> implements RaftClient<T>
{
responseFuture =
completedFuture(handleDescribeQuorumRequest(request, currentTimeMs));
break;
+ case FETCH_SNAPSHOT:
+ responseFuture =
completedFuture(handleFetchSnapshotRequest(request));
+ break;
+
default:
throw new IllegalArgumentException("Unexpected request type "
+ apiKey);
}
@@ -1386,7 +1609,9 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
ConnectionState connection = requestManager.getOrCreate(destinationId);
if (connection.isBackingOff(currentTimeMs)) {
- return connection.remainingBackoffMs(currentTimeMs);
+ long remainingBackoffMs =
connection.remainingBackoffMs(currentTimeMs);
+ logger.debug("Connection for {} is backing off for {} ms",
destinationId, remainingBackoffMs);
+ return remainingBackoffMs;
}
if (connection.isReady(currentTimeMs)) {
@@ -1495,6 +1720,34 @@ public class KafkaRaftClient<T> implements RaftClient<T>
{
}
}
+ private FetchSnapshotRequestData buildFetchSnapshotRequest(OffsetAndEpoch
snapshotId, long snapshotSize) {
+ FetchSnapshotRequestData.SnapshotId requestSnapshotId = new
FetchSnapshotRequestData.SnapshotId()
+ .setEpoch(snapshotId.epoch)
+ .setEndOffset(snapshotId.offset);
+
+ FetchSnapshotRequestData request = FetchSnapshotRequest.singleton(
+ log.topicPartition(),
+ snapshotPartition -> {
+ return snapshotPartition
+ .setCurrentLeaderEpoch(quorum.epoch())
+ .setSnapshotId(requestSnapshotId)
+ .setPosition(snapshotSize);
+ }
+ );
+
+ return request.setReplicaId(quorum.localId);
+ }
+
+ private FetchSnapshotResponseData.PartitionSnapshot addQuorumLeader(
+ FetchSnapshotResponseData.PartitionSnapshot partitionSnapshot
+ ) {
+ partitionSnapshot.currentLeader()
+ .setLeaderEpoch(quorum.epoch())
+ .setLeaderId(quorum.leaderIdOrNil());
+
+ return partitionSnapshot;
+ }
+
public boolean isRunning() {
GracefulShutdown gracefulShutdown = shutdown.get();
return gracefulShutdown == null || !gracefulShutdown.isFinished();
@@ -1673,16 +1926,13 @@ public class KafkaRaftClient<T> implements
RaftClient<T> {
transitionToCandidate(currentTimeMs);
return 0L;
} else {
- long backoffMs = maybeSendRequest(
- currentTimeMs,
- state.leaderId(),
- this::buildFetchRequest
- );
+ long backoffMs = maybeSendFetchOrFetchSnapshot(state,
currentTimeMs);
+
return Math.min(backoffMs,
state.remainingFetchTimeMs(currentTimeMs));
}
}
- private long pollFollowerAsObserver(FollowerState state, long
currentTimeMs) {
+ private long pollFollowerAsObserver(FollowerState state, long
currentTimeMs) throws IOException {
if (state.hasFetchTimeoutExpired(currentTimeMs)) {
return maybeSendAnyVoterFetch(currentTimeMs);
} else {
@@ -1698,16 +1948,28 @@ public class KafkaRaftClient<T> implements
RaftClient<T> {
} else if (connection.isBackingOff(currentTimeMs)) {
backoffMs = maybeSendAnyVoterFetch(currentTimeMs);
} else {
- backoffMs = maybeSendRequest(
- currentTimeMs,
- state.leaderId(),
- this::buildFetchRequest
- );
+ backoffMs = maybeSendFetchOrFetchSnapshot(state,
currentTimeMs);
}
+
return Math.min(backoffMs,
state.remainingFetchTimeMs(currentTimeMs));
}
}
+ private long maybeSendFetchOrFetchSnapshot(FollowerState state, long
currentTimeMs) throws IOException {
+ final Supplier<ApiMessage> requestSupplier;
+
+ if (state.fetchingSnapshot().isPresent()) {
+ RawSnapshotWriter snapshot = state.fetchingSnapshot().get();
+ long snapshotSize = snapshot.sizeInBytes();
+
+ requestSupplier = () ->
buildFetchSnapshotRequest(snapshot.snapshotId(), snapshotSize);
+ } else {
+ requestSupplier = this::buildFetchRequest;
+ }
+
+ return maybeSendRequest(currentTimeMs, state.leaderId(),
requestSupplier);
+ }
+
private long pollVoted(long currentTimeMs) throws IOException {
VotedState state = quorum.votedStateOrThrow();
GracefulShutdown shutdown = this.shutdown.get();
diff --git a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java
b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java
index c35abf1..0f0f728 100644
--- a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java
+++ b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java
@@ -301,4 +301,7 @@ public class LeaderState implements EpochState {
return "Leader";
}
+ @Override
+ public void close() {}
+
}
diff --git a/raft/src/main/java/org/apache/kafka/raft/QuorumState.java
b/raft/src/main/java/org/apache/kafka/raft/QuorumState.java
index b9157c4..dc4f81f 100644
--- a/raft/src/main/java/org/apache/kafka/raft/QuorumState.java
+++ b/raft/src/main/java/org/apache/kafka/raft/QuorumState.java
@@ -426,6 +426,10 @@ public class QuorumState {
}
private void transitionTo(EpochState state) throws IOException {
+ if (this.state != null) {
+ this.state.close();
+ }
+
this.store.writeElectionState(state.election());
this.state = state;
log.info("Completed transition to {}", state);
diff --git a/raft/src/main/java/org/apache/kafka/raft/ResignedState.java
b/raft/src/main/java/org/apache/kafka/raft/ResignedState.java
index 0dcb719..c1608aa 100644
--- a/raft/src/main/java/org/apache/kafka/raft/ResignedState.java
+++ b/raft/src/main/java/org/apache/kafka/raft/ResignedState.java
@@ -141,4 +141,7 @@ public class ResignedState implements EpochState {
", preferredSuccessors=" + preferredSuccessors +
')';
}
+
+ @Override
+ public void close() {}
}
diff --git a/raft/src/main/java/org/apache/kafka/raft/UnattachedState.java
b/raft/src/main/java/org/apache/kafka/raft/UnattachedState.java
index ca69f59..62b82e2 100644
--- a/raft/src/main/java/org/apache/kafka/raft/UnattachedState.java
+++ b/raft/src/main/java/org/apache/kafka/raft/UnattachedState.java
@@ -97,4 +97,6 @@ public class UnattachedState implements EpochState {
')';
}
+ @Override
+ public void close() {}
}
diff --git a/raft/src/main/java/org/apache/kafka/raft/VotedState.java
b/raft/src/main/java/org/apache/kafka/raft/VotedState.java
index 5b47c3a..4138176 100644
--- a/raft/src/main/java/org/apache/kafka/raft/VotedState.java
+++ b/raft/src/main/java/org/apache/kafka/raft/VotedState.java
@@ -107,4 +107,6 @@ public class VotedState implements EpochState {
')';
}
+ @Override
+ public void close() {}
}
diff --git
a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java
b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java
new file mode 100644
index 0000000..61fb5b9
--- /dev/null
+++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java
@@ -0,0 +1,1153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.raft;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.memory.MemoryPool;
+import org.apache.kafka.common.message.FetchResponseData;
+import org.apache.kafka.common.message.FetchSnapshotRequestData;
+import org.apache.kafka.common.message.FetchSnapshotResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.requests.FetchSnapshotRequest;
+import org.apache.kafka.common.requests.FetchSnapshotResponse;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.raft.internals.StringSerde;
+import org.apache.kafka.snapshot.RawSnapshotReader;
+import org.apache.kafka.snapshot.RawSnapshotWriter;
+import org.apache.kafka.snapshot.SnapshotWriter;
+import org.apache.kafka.snapshot.SnapshotWriterTest;
+import org.junit.jupiter.api.Test;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+final public class KafkaRaftClientSnapshotTest {
+ @Test
+ public void testMissingFetchSnapshotRequest() throws Exception {
+ int localId = 0;
+ int epoch = 2;
+ Set<Integer> voters = Utils.mkSet(localId, localId + 1);
+
+ RaftClientTestContext context =
RaftClientTestContext.initializeAsLeader(localId, voters, epoch);
+
+ context.deliverRequest(
+ fetchSnapshotRequest(
+ context.metadataPartition,
+ epoch,
+ new OffsetAndEpoch(0, 0),
+ Integer.MAX_VALUE,
+ 0
+ )
+ );
+
+ context.client.poll();
+
+ FetchSnapshotResponseData.PartitionSnapshot response =
context.assertSentFetchSnapshotResponse(context.metadataPartition).get();
+ assertEquals(Errors.SNAPSHOT_NOT_FOUND,
Errors.forCode(response.errorCode()));
+ }
+
+ @Test
+ public void testUnknownFetchSnapshotRequest() throws Exception {
+ int localId = 0;
+ Set<Integer> voters = Utils.mkSet(localId, localId + 1);
+ int epoch = 2;
+ TopicPartition topicPartition = new TopicPartition("unknown", 0);
+
+ RaftClientTestContext context =
RaftClientTestContext.initializeAsLeader(localId, voters, epoch);
+
+ context.deliverRequest(
+ fetchSnapshotRequest(
+ topicPartition,
+ epoch,
+ new OffsetAndEpoch(0, 0),
+ Integer.MAX_VALUE,
+ 0
+ )
+ );
+
+ context.client.poll();
+
+ FetchSnapshotResponseData.PartitionSnapshot response =
context.assertSentFetchSnapshotResponse(topicPartition).get();
+ assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION,
Errors.forCode(response.errorCode()));
+ }
+
+ @Test
+ public void testFetchSnapshotRequestAsLeader() throws Exception {
+ int localId = 0;
+ Set<Integer> voters = Utils.mkSet(localId, localId + 1);
+ int epoch = 2;
+ OffsetAndEpoch snapshotId = new OffsetAndEpoch(0, 0);
+ List<String> records = Arrays.asList("foo", "bar");
+
+ RaftClientTestContext context =
RaftClientTestContext.initializeAsLeader(localId, voters, epoch);
+
+ try (SnapshotWriter<String> snapshot =
context.client.createSnapshot(snapshotId)) {
+ snapshot.append(records);
+ snapshot.freeze();
+ }
+
+ try (RawSnapshotReader snapshot =
context.log.readSnapshot(snapshotId).get()) {
+ context.deliverRequest(
+ fetchSnapshotRequest(
+ context.metadataPartition,
+ epoch,
+ snapshotId,
+ Integer.MAX_VALUE,
+ 0
+ )
+ );
+
+ context.client.poll();
+
+ FetchSnapshotResponseData.PartitionSnapshot response = context
+ .assertSentFetchSnapshotResponse(context.metadataPartition)
+ .get();
+
+ assertEquals(Errors.NONE, Errors.forCode(response.errorCode()));
+ assertEquals(snapshot.sizeInBytes(), response.size());
+ assertEquals(0, response.position());
+ assertEquals(snapshot.sizeInBytes(), response.bytes().remaining());
+
+ ByteBuffer buffer =
ByteBuffer.allocate(Math.toIntExact(snapshot.sizeInBytes()));
+ snapshot.read(buffer, 0);
+ buffer.flip();
+
+ assertEquals(buffer.slice(), response.bytes());
+ }
+ }
+
+ @Test
+ public void testPartialFetchSnapshotRequestAsLeader() throws Exception {
+ int localId = 0;
+ Set<Integer> voters = Utils.mkSet(localId, localId + 1);
+ int epoch = 2;
+ OffsetAndEpoch snapshotId = new OffsetAndEpoch(0, 0);
+ List<String> records = Arrays.asList("foo", "bar");
+
+ RaftClientTestContext context =
RaftClientTestContext.initializeAsLeader(localId, voters, epoch);
+
+ try (SnapshotWriter<String> snapshot =
context.client.createSnapshot(snapshotId)) {
+ snapshot.append(records);
+ snapshot.freeze();
+ }
+
+ try (RawSnapshotReader snapshot =
context.log.readSnapshot(snapshotId).get()) {
+ // Fetch half of the snapshot
+ context.deliverRequest(
+ fetchSnapshotRequest(
+ context.metadataPartition,
+ epoch,
+ snapshotId,
+ Math.toIntExact(snapshot.sizeInBytes() / 2),
+ 0
+ )
+ );
+
+ context.client.poll();
+
+ FetchSnapshotResponseData.PartitionSnapshot response = context
+ .assertSentFetchSnapshotResponse(context.metadataPartition)
+ .get();
+
+ assertEquals(Errors.NONE, Errors.forCode(response.errorCode()));
+ assertEquals(snapshot.sizeInBytes(), response.size());
+ assertEquals(0, response.position());
+ assertEquals(snapshot.sizeInBytes() / 2,
response.bytes().remaining());
+
+ ByteBuffer snapshotBuffer =
ByteBuffer.allocate(Math.toIntExact(snapshot.sizeInBytes()));
+ snapshot.read(snapshotBuffer, 0);
+ snapshotBuffer.flip();
+
+ ByteBuffer responseBuffer =
ByteBuffer.allocate(Math.toIntExact(snapshot.sizeInBytes()));
+ responseBuffer.put(response.bytes());
+
+ ByteBuffer expectedBytes = snapshotBuffer.duplicate();
+ expectedBytes.limit(Math.toIntExact(snapshot.sizeInBytes() / 2));
+
+ assertEquals(expectedBytes, responseBuffer.duplicate().flip());
+
+ // Fetch the remainder of the snapshot
+ context.deliverRequest(
+ fetchSnapshotRequest(
+ context.metadataPartition,
+ epoch,
+ snapshotId,
+ Integer.MAX_VALUE,
+ responseBuffer.position()
+ )
+ );
+
+ context.client.poll();
+
+ response =
context.assertSentFetchSnapshotResponse(context.metadataPartition).get();
+ assertEquals(Errors.NONE, Errors.forCode(response.errorCode()));
+ assertEquals(snapshot.sizeInBytes(), response.size());
+ assertEquals(responseBuffer.position(), response.position());
+ assertEquals(snapshot.sizeInBytes() - (snapshot.sizeInBytes() /
2), response.bytes().remaining());
+
+ responseBuffer.put(response.bytes());
+ assertEquals(snapshotBuffer, responseBuffer.flip());
+ }
+ }
+
+    // A replica in the follower state must reject FetchSnapshot requests with
+    // NOT_LEADER_OR_FOLLOWER and redirect the caller to the current leader.
+    @Test
+    public void testFetchSnapshotRequestAsFollower() throws IOException {
+        int localId = 0;
+        int leaderId = localId + 1;
+        Set<Integer> voters = Utils.mkSet(localId, leaderId);
+        int epoch = 2;
+        OffsetAndEpoch snapshotId = new OffsetAndEpoch(0, 0);
+
+        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
+            .withElectedLeader(epoch, leaderId)
+            .build();
+
+        context.deliverRequest(
+            fetchSnapshotRequest(
+                context.metadataPartition,
+                epoch,
+                snapshotId,
+                Integer.MAX_VALUE,
+                0
+            )
+        );
+
+        context.client.poll();
+
+        // Partition-level error identifies the leader so the caller can retry there
+        FetchSnapshotResponseData.PartitionSnapshot response = context
+            .assertSentFetchSnapshotResponse(context.metadataPartition)
+            .get();
+
+        assertEquals(Errors.NOT_LEADER_OR_FOLLOWER, Errors.forCode(response.errorCode()));
+        assertEquals(epoch, response.currentLeader().leaderEpoch());
+        assertEquals(leaderId, response.currentLeader().leaderId());
+    }
+
+    // The leader must answer POSITION_OUT_OF_RANGE both for a negative position
+    // and for a position at (or past) the end of the snapshot.
+    @Test
+    public void testFetchSnapshotRequestWithInvalidPosition() throws Exception {
+        int localId = 0;
+        Set<Integer> voters = Utils.mkSet(localId, localId + 1);
+        int epoch = 2;
+        OffsetAndEpoch snapshotId = new OffsetAndEpoch(0, 0);
+        List<String> records = Arrays.asList("foo", "bar");
+
+        RaftClientTestContext context = RaftClientTestContext.initializeAsLeader(localId, voters, epoch);
+
+        try (SnapshotWriter<String> snapshot = context.client.createSnapshot(snapshotId)) {
+            snapshot.append(records);
+            snapshot.freeze();
+        }
+
+        // Negative position is never valid
+        context.deliverRequest(
+            fetchSnapshotRequest(
+                context.metadataPartition,
+                epoch,
+                snapshotId,
+                Integer.MAX_VALUE,
+                -1
+            )
+        );
+
+        context.client.poll();
+
+        FetchSnapshotResponseData.PartitionSnapshot response = context.assertSentFetchSnapshotResponse(context.metadataPartition).get();
+        assertEquals(Errors.POSITION_OUT_OF_RANGE, Errors.forCode(response.errorCode()));
+        assertEquals(epoch, response.currentLeader().leaderEpoch());
+        assertEquals(localId, response.currentLeader().leaderId());
+
+        // A position equal to the snapshot size is one past the last valid byte
+        try (RawSnapshotReader snapshot = context.log.readSnapshot(snapshotId).get()) {
+            context.deliverRequest(
+                fetchSnapshotRequest(
+                    context.metadataPartition,
+                    epoch,
+                    snapshotId,
+                    Integer.MAX_VALUE,
+                    snapshot.sizeInBytes()
+                )
+            );
+
+            context.client.poll();
+
+            response = context.assertSentFetchSnapshotResponse(context.metadataPartition).get();
+            assertEquals(Errors.POSITION_OUT_OF_RANGE, Errors.forCode(response.errorCode()));
+            assertEquals(epoch, response.currentLeader().leaderEpoch());
+            assertEquals(localId, response.currentLeader().leaderId());
+        }
+    }
+
+    // A FetchSnapshot request carrying an epoch older than the leader's must be
+    // rejected with FENCED_LEADER_EPOCH and the current leader's identity.
+    @Test
+    public void testFetchSnapshotRequestWithOlderEpoch() throws Exception {
+        int localId = 0;
+        Set<Integer> voters = Utils.mkSet(localId, localId + 1);
+        int epoch = 2;
+        OffsetAndEpoch snapshotId = new OffsetAndEpoch(0, 0);
+
+        RaftClientTestContext context = RaftClientTestContext.initializeAsLeader(localId, voters, epoch);
+
+        context.deliverRequest(
+            fetchSnapshotRequest(
+                context.metadataPartition,
+                epoch - 1,
+                snapshotId,
+                Integer.MAX_VALUE,
+                0
+            )
+        );
+
+        context.client.poll();
+
+        FetchSnapshotResponseData.PartitionSnapshot response = context.assertSentFetchSnapshotResponse(context.metadataPartition).get();
+        assertEquals(Errors.FENCED_LEADER_EPOCH, Errors.forCode(response.errorCode()));
+        assertEquals(epoch, response.currentLeader().leaderEpoch());
+        assertEquals(localId, response.currentLeader().leaderId());
+    }
+
+    // A FetchSnapshot request carrying an epoch newer than the leader's own must
+    // be rejected with UNKNOWN_LEADER_EPOCH.
+    @Test
+    public void testFetchSnapshotRequestWithNewerEpoch() throws Exception {
+        int localId = 0;
+        Set<Integer> voters = Utils.mkSet(localId, localId + 1);
+        int epoch = 2;
+        OffsetAndEpoch snapshotId = new OffsetAndEpoch(0, 0);
+
+        RaftClientTestContext context = RaftClientTestContext.initializeAsLeader(localId, voters, epoch);
+
+        context.deliverRequest(
+            fetchSnapshotRequest(
+                context.metadataPartition,
+                epoch + 1,
+                snapshotId,
+                Integer.MAX_VALUE,
+                0
+            )
+        );
+
+        context.client.poll();
+
+        FetchSnapshotResponseData.PartitionSnapshot response = context.assertSentFetchSnapshotResponse(context.metadataPartition).get();
+        assertEquals(Errors.UNKNOWN_LEADER_EPOCH, Errors.forCode(response.errorCode()));
+        assertEquals(epoch, response.currentLeader().leaderEpoch());
+        assertEquals(localId, response.currentLeader().leaderId());
+    }
+
+    // A Fetch response advertising a malformed snapshot id (negative epoch or
+    // negative end offset) is ignored: the follower keeps retrying Fetch after
+    // backoff, and since the fetch timer is never reset it eventually becomes
+    // a candidate.
+    @Test
+    public void testFetchResponseWithInvalidSnapshotId() throws Exception {
+        int localId = 0;
+        int leaderId = localId + 1;
+        Set<Integer> voters = Utils.mkSet(localId, leaderId);
+        int epoch = 2;
+        OffsetAndEpoch invalidEpoch = new OffsetAndEpoch(100L, -1);
+        OffsetAndEpoch invalidEndOffset = new OffsetAndEpoch(-1L, 1);
+        // Tracks total time slept so the final sleep lands exactly on the fetch timeout
+        int slept = 0;
+
+        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
+            .withElectedLeader(epoch, leaderId)
+            .build();
+
+        context.pollUntilRequest();
+        RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
+        context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
+
+        context.deliverResponse(
+            fetchRequest.correlationId,
+            fetchRequest.destinationId(),
+            snapshotFetchResponse(context.metadataPartition, epoch, leaderId, invalidEpoch, 200L)
+        );
+
+        // Handle the invalid response
+        context.client.poll();
+
+        // Expect another fetch request after backoff has expired
+        context.time.sleep(context.retryBackoffMs);
+        slept += context.retryBackoffMs;
+
+        context.pollUntilRequest();
+        fetchRequest = context.assertSentFetchRequest();
+        context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
+
+        context.deliverResponse(
+            fetchRequest.correlationId,
+            fetchRequest.destinationId(),
+            snapshotFetchResponse(context.metadataPartition, epoch, leaderId, invalidEndOffset, 200L)
+        );
+
+        // Handle the invalid response
+        context.client.poll();
+
+        // Expect another fetch request after backoff has expired
+        context.time.sleep(context.retryBackoffMs);
+        slept += context.retryBackoffMs;
+
+        context.pollUntilRequest();
+        fetchRequest = context.assertSentFetchRequest();
+        context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
+
+        // Fetch timer is not reset; sleeping for remainder should transition to candidate
+        context.time.sleep(context.fetchTimeoutMs - slept);
+
+        context.pollUntilRequest();
+
+        context.assertSentVoteRequest(epoch + 1, 0, 0L, 1);
+        context.assertVotedCandidate(epoch + 1, context.localId);
+    }
+
+    // Happy path: a Fetch response containing a snapshot id makes the follower
+    // issue a FetchSnapshot request from position 0, and the full snapshot bytes
+    // delivered in one response are written to the follower's local log.
+    @Test
+    public void testFetchResponseWithSnapshotId() throws Exception {
+        int localId = 0;
+        int leaderId = localId + 1;
+        Set<Integer> voters = Utils.mkSet(localId, leaderId);
+        int epoch = 2;
+        OffsetAndEpoch snapshotId = new OffsetAndEpoch(100L, 1);
+
+        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
+            .withElectedLeader(epoch, leaderId)
+            .build();
+
+        context.pollUntilRequest();
+        RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
+        context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
+
+        context.deliverResponse(
+            fetchRequest.correlationId,
+            fetchRequest.destinationId(),
+            snapshotFetchResponse(context.metadataPartition, epoch, leaderId, snapshotId, 200L)
+        );
+
+        // The follower should now ask for the advertised snapshot starting at byte 0
+        context.pollUntilRequest();
+        RaftRequest.Outbound snapshotRequest = context.assertSentFetchSnapshotRequest();
+        FetchSnapshotRequestData.PartitionSnapshot request = assertFetchSnapshotRequest(
+            snapshotRequest,
+            context.metadataPartition,
+            localId,
+            Integer.MAX_VALUE
+        ).get();
+        assertEquals(snapshotId.offset, request.snapshotId().endOffset());
+        assertEquals(snapshotId.epoch, request.snapshotId().epoch());
+        assertEquals(0, request.position());
+
+        // Materialize the snapshot content the "leader" will serve
+        List<String> records = Arrays.asList("foo", "bar");
+        MemorySnapshotWriter memorySnapshot = new MemorySnapshotWriter(snapshotId);
+        try (SnapshotWriter<String> snapshotWriter = snapshotWriter(context, memorySnapshot)) {
+            snapshotWriter.append(records);
+            snapshotWriter.freeze();
+        }
+
+        context.deliverResponse(
+            snapshotRequest.correlationId,
+            snapshotRequest.destinationId(),
+            fetchSnapshotResponse(
+                context.metadataPartition,
+                epoch,
+                leaderId,
+                snapshotId,
+                memorySnapshot.buffer().remaining(),
+                0L,
+                memorySnapshot.buffer().slice()
+            )
+        );
+
+        context.pollUntilRequest();
+
+        // The downloaded snapshot must be readable locally and match the original records
+        try (RawSnapshotReader snapshot = context.log.readSnapshot(snapshotId).get()) {
+            assertEquals(memorySnapshot.buffer().remaining(), snapshot.sizeInBytes());
+            SnapshotWriterTest.assertSnapshot(Arrays.asList(records), snapshot);
+        }
+    }
+
+    // When a FetchSnapshot response contains only part of the snapshot, the
+    // follower must issue a follow-up request at the next unread position and
+    // assemble the complete snapshot from the two chunks.
+    @Test
+    public void testFetchSnapshotResponsePartialData() throws Exception {
+        int localId = 0;
+        int leaderId = localId + 1;
+        Set<Integer> voters = Utils.mkSet(localId, leaderId);
+        int epoch = 2;
+        OffsetAndEpoch snapshotId = new OffsetAndEpoch(100L, 1);
+
+        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
+            .withElectedLeader(epoch, leaderId)
+            .build();
+
+        context.pollUntilRequest();
+        RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
+        context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
+
+        context.deliverResponse(
+            fetchRequest.correlationId,
+            fetchRequest.destinationId(),
+            snapshotFetchResponse(context.metadataPartition, epoch, leaderId, snapshotId, 200L)
+        );
+
+        context.pollUntilRequest();
+        RaftRequest.Outbound snapshotRequest = context.assertSentFetchSnapshotRequest();
+        FetchSnapshotRequestData.PartitionSnapshot request = assertFetchSnapshotRequest(
+            snapshotRequest,
+            context.metadataPartition,
+            localId,
+            Integer.MAX_VALUE
+        ).get();
+        assertEquals(snapshotId.offset, request.snapshotId().endOffset());
+        assertEquals(snapshotId.epoch, request.snapshotId().epoch());
+        assertEquals(0, request.position());
+
+        List<String> records = Arrays.asList("foo", "bar");
+        MemorySnapshotWriter memorySnapshot = new MemorySnapshotWriter(snapshotId);
+        try (SnapshotWriter<String> snapshotWriter = snapshotWriter(context, memorySnapshot)) {
+            snapshotWriter.append(records);
+            snapshotWriter.freeze();
+        }
+
+        // Serve only the first half of the snapshot bytes
+        ByteBuffer sendingBuffer = memorySnapshot.buffer().slice();
+        sendingBuffer.limit(sendingBuffer.limit() / 2);
+
+        context.deliverResponse(
+            snapshotRequest.correlationId,
+            snapshotRequest.destinationId(),
+            fetchSnapshotResponse(
+                context.metadataPartition,
+                epoch,
+                leaderId,
+                snapshotId,
+                memorySnapshot.buffer().remaining(),
+                0L,
+                sendingBuffer
+            )
+        );
+
+        // The follower must resume at the position just past the bytes received
+        context.pollUntilRequest();
+        snapshotRequest = context.assertSentFetchSnapshotRequest();
+        request = assertFetchSnapshotRequest(
+            snapshotRequest,
+            context.metadataPartition,
+            localId,
+            Integer.MAX_VALUE
+        ).get();
+        assertEquals(snapshotId.offset, request.snapshotId().endOffset());
+        assertEquals(snapshotId.epoch, request.snapshotId().epoch());
+        assertEquals(sendingBuffer.limit(), request.position());
+
+        // Serve the remaining half starting from the requested position
+        sendingBuffer = memorySnapshot.buffer().slice();
+        sendingBuffer.position(Math.toIntExact(request.position()));
+
+        context.deliverResponse(
+            snapshotRequest.correlationId,
+            snapshotRequest.destinationId(),
+            fetchSnapshotResponse(
+                context.metadataPartition,
+                epoch,
+                leaderId,
+                snapshotId,
+                memorySnapshot.buffer().remaining(),
+                request.position(),
+                sendingBuffer
+            )
+        );
+
+        context.pollUntilRequest();
+
+        // Both chunks together must reconstruct the full snapshot
+        try (RawSnapshotReader snapshot = context.log.readSnapshot(snapshotId).get()) {
+            assertEquals(memorySnapshot.buffer().remaining(), snapshot.sizeInBytes());
+            SnapshotWriterTest.assertSnapshot(Arrays.asList(records), snapshot);
+        }
+    }
+
+    // If the leader answers SNAPSHOT_NOT_FOUND, the follower abandons the
+    // snapshot download and falls back to regular Fetch requests.
+    @Test
+    public void testFetchSnapshotResponseMissingSnapshot() throws Exception {
+        int localId = 0;
+        int leaderId = localId + 1;
+        Set<Integer> voters = Utils.mkSet(localId, leaderId);
+        int epoch = 2;
+        OffsetAndEpoch snapshotId = new OffsetAndEpoch(100L, 1);
+
+        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
+            .withElectedLeader(epoch, leaderId)
+            .build();
+
+        context.pollUntilRequest();
+        RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
+        context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
+
+        context.deliverResponse(
+            fetchRequest.correlationId,
+            fetchRequest.destinationId(),
+            snapshotFetchResponse(context.metadataPartition, epoch, leaderId, snapshotId, 200L)
+        );
+
+        context.pollUntilRequest();
+        RaftRequest.Outbound snapshotRequest = context.assertSentFetchSnapshotRequest();
+        FetchSnapshotRequestData.PartitionSnapshot request = assertFetchSnapshotRequest(
+            snapshotRequest,
+            context.metadataPartition,
+            localId,
+            Integer.MAX_VALUE
+        ).get();
+        assertEquals(snapshotId.offset, request.snapshotId().endOffset());
+        assertEquals(snapshotId.epoch, request.snapshotId().epoch());
+        assertEquals(0, request.position());
+
+        // Reply with a snapshot not found error
+        context.deliverResponse(
+            snapshotRequest.correlationId,
+            snapshotRequest.destinationId(),
+            FetchSnapshotResponse.singleton(
+                context.metadataPartition,
+                responsePartitionSnapshot -> {
+                    responsePartitionSnapshot
+                        .currentLeader()
+                        .setLeaderEpoch(epoch)
+                        .setLeaderId(leaderId);
+
+                    return responsePartitionSnapshot
+                        .setErrorCode(Errors.SNAPSHOT_NOT_FOUND.code());
+                }
+            )
+        );
+
+        // Follower should go back to a normal Fetch in the same epoch
+        context.pollUntilRequest();
+        fetchRequest = context.assertSentFetchRequest();
+        context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
+    }
+
+    // A FENCED_LEADER_EPOCH response advertising a different leader at a newer
+    // epoch makes the follower drop the snapshot download and fetch from the
+    // new leader at the new epoch.
+    @Test
+    public void testFetchSnapshotResponseFromNewerEpochNotLeader() throws Exception {
+        int localId = 0;
+        int firstLeaderId = localId + 1;
+        int secondLeaderId = firstLeaderId + 1;
+        Set<Integer> voters = Utils.mkSet(localId, firstLeaderId, secondLeaderId);
+        int epoch = 2;
+        OffsetAndEpoch snapshotId = new OffsetAndEpoch(100L, 1);
+
+        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
+            .withElectedLeader(epoch, firstLeaderId)
+            .build();
+
+        context.pollUntilRequest();
+        RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
+        context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
+
+        context.deliverResponse(
+            fetchRequest.correlationId,
+            fetchRequest.destinationId(),
+            snapshotFetchResponse(context.metadataPartition, epoch, firstLeaderId, snapshotId, 200L)
+        );
+
+        context.pollUntilRequest();
+        RaftRequest.Outbound snapshotRequest = context.assertSentFetchSnapshotRequest();
+        FetchSnapshotRequestData.PartitionSnapshot request = assertFetchSnapshotRequest(
+            snapshotRequest,
+            context.metadataPartition,
+            localId,
+            Integer.MAX_VALUE
+        ).get();
+        assertEquals(snapshotId.offset, request.snapshotId().endOffset());
+        assertEquals(snapshotId.epoch, request.snapshotId().epoch());
+        assertEquals(0, request.position());
+
+        // Reply with new leader response
+        context.deliverResponse(
+            snapshotRequest.correlationId,
+            snapshotRequest.destinationId(),
+            FetchSnapshotResponse.singleton(
+                context.metadataPartition,
+                responsePartitionSnapshot -> {
+                    responsePartitionSnapshot
+                        .currentLeader()
+                        .setLeaderEpoch(epoch + 1)
+                        .setLeaderId(secondLeaderId);
+
+                    return responsePartitionSnapshot
+                        .setErrorCode(Errors.FENCED_LEADER_EPOCH.code());
+                }
+            )
+        );
+
+        // Follower fetches again at the new epoch from the new leader
+        context.pollUntilRequest();
+        fetchRequest = context.assertSentFetchRequest();
+        context.assertFetchRequestData(fetchRequest, epoch + 1, 0L, 0);
+    }
+
+    // A FENCED_LEADER_EPOCH response where the SAME node remains leader at a
+    // newer epoch makes the follower restart fetching at the bumped epoch.
+    @Test
+    public void testFetchSnapshotResponseFromNewerEpochLeader() throws Exception {
+        int localId = 0;
+        int leaderId = localId + 1;
+        Set<Integer> voters = Utils.mkSet(localId, leaderId);
+        int epoch = 2;
+        OffsetAndEpoch snapshotId = new OffsetAndEpoch(100L, 1);
+
+        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
+            .withElectedLeader(epoch, leaderId)
+            .build();
+
+        context.pollUntilRequest();
+        RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
+        context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
+
+        context.deliverResponse(
+            fetchRequest.correlationId,
+            fetchRequest.destinationId(),
+            snapshotFetchResponse(context.metadataPartition, epoch, leaderId, snapshotId, 200L)
+        );
+
+        context.pollUntilRequest();
+        RaftRequest.Outbound snapshotRequest = context.assertSentFetchSnapshotRequest();
+        FetchSnapshotRequestData.PartitionSnapshot request = assertFetchSnapshotRequest(
+            snapshotRequest,
+            context.metadataPartition,
+            localId,
+            Integer.MAX_VALUE
+        ).get();
+        assertEquals(snapshotId.offset, request.snapshotId().endOffset());
+        assertEquals(snapshotId.epoch, request.snapshotId().epoch());
+        assertEquals(0, request.position());
+
+        // Reply with new leader epoch
+        context.deliverResponse(
+            snapshotRequest.correlationId,
+            snapshotRequest.destinationId(),
+            FetchSnapshotResponse.singleton(
+                context.metadataPartition,
+                responsePartitionSnapshot -> {
+                    responsePartitionSnapshot
+                        .currentLeader()
+                        .setLeaderEpoch(epoch + 1)
+                        .setLeaderId(leaderId);
+
+                    return responsePartitionSnapshot
+                        .setErrorCode(Errors.FENCED_LEADER_EPOCH.code());
+                }
+            )
+        );
+
+        // Follower fetches again at the new epoch
+        context.pollUntilRequest();
+        fetchRequest = context.assertSentFetchRequest();
+        context.assertFetchRequestData(fetchRequest, epoch + 1, 0L, 0);
+    }
+
+    // An UNKNOWN_LEADER_EPOCH response (sender is behind, reporting an older
+    // epoch) does not abort the download: the follower simply retries the same
+    // FetchSnapshot request at its current epoch.
+    @Test
+    public void testFetchSnapshotResponseFromOlderEpoch() throws Exception {
+        int localId = 0;
+        int leaderId = localId + 1;
+        Set<Integer> voters = Utils.mkSet(localId, leaderId);
+        int epoch = 2;
+        OffsetAndEpoch snapshotId = new OffsetAndEpoch(100L, 1);
+
+        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
+            .withElectedLeader(epoch, leaderId)
+            .build();
+
+        context.pollUntilRequest();
+        RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
+        context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
+
+        context.deliverResponse(
+            fetchRequest.correlationId,
+            fetchRequest.destinationId(),
+            snapshotFetchResponse(context.metadataPartition, epoch, leaderId, snapshotId, 200L)
+        );
+
+        context.pollUntilRequest();
+        RaftRequest.Outbound snapshotRequest = context.assertSentFetchSnapshotRequest();
+        FetchSnapshotRequestData.PartitionSnapshot request = assertFetchSnapshotRequest(
+            snapshotRequest,
+            context.metadataPartition,
+            localId,
+            Integer.MAX_VALUE
+        ).get();
+        assertEquals(snapshotId.offset, request.snapshotId().endOffset());
+        assertEquals(snapshotId.epoch, request.snapshotId().epoch());
+        assertEquals(0, request.position());
+
+        // Reply with unknown leader epoch (sender reports an older epoch than ours)
+        context.deliverResponse(
+            snapshotRequest.correlationId,
+            snapshotRequest.destinationId(),
+            FetchSnapshotResponse.singleton(
+                context.metadataPartition,
+                responsePartitionSnapshot -> {
+                    responsePartitionSnapshot
+                        .currentLeader()
+                        .setLeaderEpoch(epoch - 1)
+                        .setLeaderId(leaderId + 1);
+
+                    return responsePartitionSnapshot
+                        .setErrorCode(Errors.UNKNOWN_LEADER_EPOCH.code());
+                }
+            )
+        );
+
+        context.pollUntilRequest();
+
+        // Follower should resend the fetch snapshot request
+        snapshotRequest = context.assertSentFetchSnapshotRequest();
+        request = assertFetchSnapshotRequest(
+            snapshotRequest,
+            context.metadataPartition,
+            localId,
+            Integer.MAX_VALUE
+        ).get();
+        assertEquals(snapshotId.offset, request.snapshotId().endOffset());
+        assertEquals(snapshotId.epoch, request.snapshotId().epoch());
+        assertEquals(0, request.position());
+    }
+
+    // A FetchSnapshot response carrying a malformed snapshot id (negative end
+    // offset or negative epoch) makes the follower abandon the download and
+    // fall back to a regular Fetch request.
+    @Test
+    public void testFetchSnapshotResponseWithInvalidId() throws Exception {
+        int localId = 0;
+        int leaderId = localId + 1;
+        Set<Integer> voters = Utils.mkSet(localId, leaderId);
+        int epoch = 2;
+        OffsetAndEpoch snapshotId = new OffsetAndEpoch(100L, 1);
+
+        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
+            .withElectedLeader(epoch, leaderId)
+            .build();
+
+        context.pollUntilRequest();
+        RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
+        context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
+
+        context.deliverResponse(
+            fetchRequest.correlationId,
+            fetchRequest.destinationId(),
+            snapshotFetchResponse(context.metadataPartition, epoch, leaderId, snapshotId, 200L)
+        );
+
+        context.pollUntilRequest();
+        RaftRequest.Outbound snapshotRequest = context.assertSentFetchSnapshotRequest();
+        FetchSnapshotRequestData.PartitionSnapshot request = assertFetchSnapshotRequest(
+            snapshotRequest,
+            context.metadataPartition,
+            localId,
+            Integer.MAX_VALUE
+        ).get();
+        assertEquals(snapshotId.offset, request.snapshotId().endOffset());
+        assertEquals(snapshotId.epoch, request.snapshotId().epoch());
+        assertEquals(0, request.position());
+
+        // Reply with an invalid snapshot id endOffset
+        context.deliverResponse(
+            snapshotRequest.correlationId,
+            snapshotRequest.destinationId(),
+            FetchSnapshotResponse.singleton(
+                context.metadataPartition,
+                responsePartitionSnapshot -> {
+                    responsePartitionSnapshot
+                        .currentLeader()
+                        .setLeaderEpoch(epoch)
+                        .setLeaderId(leaderId);
+
+                    responsePartitionSnapshot
+                        .snapshotId()
+                        .setEndOffset(-1)
+                        .setEpoch(snapshotId.epoch);
+
+                    return responsePartitionSnapshot;
+                }
+            )
+        );
+
+        context.pollUntilRequest();
+
+        // Follower should send a fetch request
+        fetchRequest = context.assertSentFetchRequest();
+        context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
+
+        // Re-advertise the snapshot to restart the download
+        context.deliverResponse(
+            fetchRequest.correlationId,
+            fetchRequest.destinationId(),
+            snapshotFetchResponse(context.metadataPartition, epoch, leaderId, snapshotId, 200L)
+        );
+
+        context.pollUntilRequest();
+
+        snapshotRequest = context.assertSentFetchSnapshotRequest();
+        request = assertFetchSnapshotRequest(
+            snapshotRequest,
+            context.metadataPartition,
+            localId,
+            Integer.MAX_VALUE
+        ).get();
+        assertEquals(snapshotId.offset, request.snapshotId().endOffset());
+        assertEquals(snapshotId.epoch, request.snapshotId().epoch());
+        assertEquals(0, request.position());
+
+        // Reply with an invalid snapshot id epoch
+        context.deliverResponse(
+            snapshotRequest.correlationId,
+            snapshotRequest.destinationId(),
+            FetchSnapshotResponse.singleton(
+                context.metadataPartition,
+                responsePartitionSnapshot -> {
+                    responsePartitionSnapshot
+                        .currentLeader()
+                        .setLeaderEpoch(epoch)
+                        .setLeaderId(leaderId);
+
+                    responsePartitionSnapshot
+                        .snapshotId()
+                        .setEndOffset(snapshotId.offset)
+                        .setEpoch(-1);
+
+                    return responsePartitionSnapshot;
+                }
+            )
+        );
+
+        context.pollUntilRequest();
+
+        // Follower should send a fetch request
+        fetchRequest = context.assertSentFetchRequest();
+        context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
+    }
+
+    // A FetchSnapshot response that arrives after the replica has already left
+    // the follower state (here: became a candidate after fetch timeout) must be
+    // ignored and not disturb the candidate state.
+    @Test
+    public void testFetchSnapshotResponseToNotFollower() throws Exception {
+        int localId = 0;
+        int leaderId = localId + 1;
+        Set<Integer> voters = Utils.mkSet(localId, leaderId);
+        int epoch = 2;
+        OffsetAndEpoch snapshotId = new OffsetAndEpoch(100L, 1);
+
+        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
+            .withElectedLeader(epoch, leaderId)
+            .build();
+
+        context.pollUntilRequest();
+        RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
+        context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
+
+        context.deliverResponse(
+            fetchRequest.correlationId,
+            fetchRequest.destinationId(),
+            snapshotFetchResponse(context.metadataPartition, epoch, leaderId, snapshotId, 200L)
+        );
+
+        context.pollUntilRequest();
+
+        RaftRequest.Outbound snapshotRequest = context.assertSentFetchSnapshotRequest();
+        FetchSnapshotRequestData.PartitionSnapshot request = assertFetchSnapshotRequest(
+            snapshotRequest,
+            context.metadataPartition,
+            localId,
+            Integer.MAX_VALUE
+        ).get();
+        assertEquals(snapshotId.offset, request.snapshotId().endOffset());
+        assertEquals(snapshotId.epoch, request.snapshotId().epoch());
+        assertEquals(0, request.position());
+
+        // Sleeping for fetch timeout should transition to candidate
+        context.time.sleep(context.fetchTimeoutMs);
+
+        context.pollUntilRequest();
+
+        context.assertSentVoteRequest(epoch + 1, 0, 0L, 1);
+        context.assertVotedCandidate(epoch + 1, context.localId);
+
+        // Send the response late
+        context.deliverResponse(
+            snapshotRequest.correlationId,
+            snapshotRequest.destinationId(),
+            FetchSnapshotResponse.singleton(
+                context.metadataPartition,
+                responsePartitionSnapshot -> {
+                    responsePartitionSnapshot
+                        .currentLeader()
+                        .setLeaderEpoch(epoch)
+                        .setLeaderId(leaderId);
+
+                    responsePartitionSnapshot
+                        .snapshotId()
+                        .setEndOffset(snapshotId.offset)
+                        .setEpoch(-1);
+
+                    return responsePartitionSnapshot;
+                }
+            )
+        );
+
+        // Assert that the response is ignored and the replica stays as a candidate
+        context.client.poll();
+        context.assertVotedCandidate(epoch + 1, context.localId);
+    }
+
+    /**
+     * Builds a FetchSnapshot request for a single topic partition.
+     *
+     * @param topicPartition the partition whose snapshot is requested
+     * @param epoch the requester's current leader epoch
+     * @param offsetAndEpoch identifies the snapshot (end offset and epoch)
+     * @param maxBytes maximum number of snapshot bytes to return
+     * @param position byte offset within the snapshot to read from
+     */
+    private static FetchSnapshotRequestData fetchSnapshotRequest(
+        TopicPartition topicPartition,
+        int epoch,
+        OffsetAndEpoch offsetAndEpoch,
+        int maxBytes,
+        long position
+    ) {
+        FetchSnapshotRequestData.SnapshotId snapshotId = new FetchSnapshotRequestData.SnapshotId()
+            .setEndOffset(offsetAndEpoch.offset)
+            .setEpoch(offsetAndEpoch.epoch);
+
+        FetchSnapshotRequestData request = FetchSnapshotRequest.singleton(
+            topicPartition,
+            snapshotPartition -> {
+                return snapshotPartition
+                    .setCurrentLeaderEpoch(epoch)
+                    .setSnapshotId(snapshotId)
+                    .setPosition(position);
+            }
+        );
+
+        // maxBytes is a top-level (request-wide) field, not per-partition
+        return request.setMaxBytes(maxBytes);
+    }
+
+    /**
+     * Builds a successful FetchSnapshot response for a single topic partition,
+     * carrying the given chunk of snapshot bytes.
+     *
+     * @param size total size of the snapshot in bytes
+     * @param position byte offset within the snapshot where {@code buffer} starts
+     * @param buffer the snapshot bytes being returned in this response
+     */
+    private static FetchSnapshotResponseData fetchSnapshotResponse(
+        TopicPartition topicPartition,
+        int leaderEpoch,
+        int leaderId,
+        OffsetAndEpoch snapshotId,
+        long size,
+        long position,
+        ByteBuffer buffer
+    ) {
+        return FetchSnapshotResponse.singleton(
+            topicPartition,
+            partitionSnapshot -> {
+                partitionSnapshot.currentLeader()
+                    .setLeaderEpoch(leaderEpoch)
+                    .setLeaderId(leaderId);
+
+                partitionSnapshot.snapshotId()
+                    .setEndOffset(snapshotId.offset)
+                    .setEpoch(snapshotId.epoch);
+
+                return partitionSnapshot
+                    .setSize(size)
+                    .setPosition(position)
+                    .setBytes(buffer);
+            }
+        );
+    }
+
+    /**
+     * Builds a Fetch (not FetchSnapshot) response that advertises a snapshot id,
+     * which should trigger the follower to start downloading that snapshot.
+     */
+    private static FetchResponseData snapshotFetchResponse(
+        TopicPartition topicPartition,
+        int epoch,
+        int leaderId,
+        OffsetAndEpoch snapshotId,
+        long highWatermark
+    ) {
+        return RaftUtil.singletonFetchResponse(topicPartition, Errors.NONE, partitionData -> {
+            partitionData
+                .setErrorCode(Errors.NONE.code())
+                .setHighWatermark(highWatermark);
+
+            partitionData.currentLeader()
+                .setLeaderEpoch(epoch)
+                .setLeaderId(leaderId);
+
+            // Advertising a snapshot id tells the fetcher its log is too far behind
+            partitionData.snapshotId()
+                .setEpoch(snapshotId.epoch)
+                .setEndOffset(snapshotId.offset);
+        });
+    }
+
+    /**
+     * Asserts that the outbound request is a FetchSnapshot request with the
+     * expected replica id and max bytes, and returns its partition data for
+     * the given topic partition (empty if the partition is not present).
+     */
+    private static Optional<FetchSnapshotRequestData.PartitionSnapshot> assertFetchSnapshotRequest(
+        RaftRequest.Outbound request,
+        TopicPartition topicPartition,
+        int replicaId,
+        int maxBytes
+    ) {
+        assertTrue(request.data() instanceof FetchSnapshotRequestData);
+
+        FetchSnapshotRequestData data = (FetchSnapshotRequestData) request.data();
+
+        assertEquals(replicaId, data.replicaId());
+        assertEquals(maxBytes, data.maxBytes());
+
+        return FetchSnapshotRequest.forTopicPartition(data, topicPartition);
+    }
+
+    /**
+     * Wraps a raw snapshot writer in a typed SnapshotWriter that serializes
+     * String records (4 KiB batches, no compression, heap allocation).
+     */
+    private static SnapshotWriter<String> snapshotWriter(RaftClientTestContext context, RawSnapshotWriter snapshot) {
+        return new SnapshotWriter<>(
+            snapshot,
+            4 * 1024,
+            MemoryPool.NONE,
+            context.time,
+            CompressionType.NONE,
+            new StringSerde()
+        );
+    }
+
+    /**
+     * An in-memory {@link RawSnapshotWriter} used to fabricate snapshot bytes
+     * for test responses. Data accumulates in a growable ByteBuffer; freezing
+     * flips the buffer so it can be read via {@link #buffer()}.
+     */
+    private final static class MemorySnapshotWriter implements RawSnapshotWriter {
+        private final OffsetAndEpoch snapshotId;
+        // Accumulation buffer; reallocated (doubled) when an append would overflow
+        private ByteBuffer data;
+        // Once true, no further appends are allowed and data is in read mode
+        private boolean frozen;
+
+        public MemorySnapshotWriter(OffsetAndEpoch snapshotId) {
+            this.snapshotId = snapshotId;
+            this.data = ByteBuffer.allocate(0);
+            this.frozen = false;
+        }
+
+        @Override
+        public OffsetAndEpoch snapshotId() {
+            return snapshotId;
+        }
+
+        @Override
+        public long sizeInBytes() {
+            // Deliberately throws once frozen (data.position() would be 0 after
+            // the flip); mirrors the MockLog snapshot writer's behavior
+            if (frozen) {
+                throw new RuntimeException("Snapshot is already frozen " + snapshotId);
+            }
+
+            return data.position();
+        }
+
+        @Override
+        public void append(ByteBuffer buffer) {
+            if (frozen) {
+                throw new RuntimeException("Snapshot is already frozen " + snapshotId);
+            }
+
+            // Grow the buffer when the incoming bytes do not fit
+            if (!(data.remaining() >= buffer.remaining())) {
+                ByteBuffer old = data;
+                old.flip();
+
+                // At least double the capacity to keep appends amortized O(1)
+                int newSize = Math.max(data.capacity() * 2, data.capacity() + buffer.remaining());
+                data = ByteBuffer.allocate(newSize);
+
+                data.put(old);
+            }
+            data.put(buffer);
+        }
+
+        @Override
+        public boolean isFrozen() {
+            return frozen;
+        }
+
+        @Override
+        public void freeze() {
+            if (frozen) {
+                throw new RuntimeException("Snapshot is already frozen " + snapshotId);
+            }
+
+            frozen = true;
+            // Switch the buffer to read mode for subsequent buffer() callers
+            data.flip();
+        }
+
+        @Override
+        public void close() {}
+
+        // Exposes the internal buffer; only meaningful to read after freeze()
+        public ByteBuffer buffer() {
+            return data;
+        }
+    }
+}
diff --git a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
index 60f4b98..86a41e3 100644
--- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
@@ -1792,7 +1792,7 @@ public class KafkaRaftClientTest {
int correlationId = context.assertSentFetchRequest(epoch, 3L,
lastEpoch);
- FetchResponseData response =
context.outOfRangeFetchRecordsResponse(epoch, otherNodeId, 2L,
+ FetchResponseData response = context.divergingFetchResponse(epoch,
otherNodeId, 2L,
lastEpoch, 1L);
context.deliverResponse(correlationId, otherNodeId, response);
diff --git a/raft/src/test/java/org/apache/kafka/raft/MockLog.java
b/raft/src/test/java/org/apache/kafka/raft/MockLog.java
index f0be8dd..f338ed5 100644
--- a/raft/src/test/java/org/apache/kafka/raft/MockLog.java
+++ b/raft/src/test/java/org/apache/kafka/raft/MockLog.java
@@ -514,6 +514,10 @@ public class MockLog implements ReplicatedLog {
@Override
public long sizeInBytes() {
+ if (frozen) {
+ throw new RuntimeException("Snapshot is already frozen " +
snapshotId);
+ }
+
return data.position();
}
diff --git
a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
index 2b69ac8..fc342c1 100644
--- a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
+++ b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
@@ -20,14 +20,15 @@ import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.memory.MemoryPool;
import org.apache.kafka.common.message.BeginQuorumEpochRequestData;
import org.apache.kafka.common.message.BeginQuorumEpochResponseData;
-import org.apache.kafka.common.message.DescribeQuorumResponseData;
import org.apache.kafka.common.message.DescribeQuorumResponseData.ReplicaState;
+import org.apache.kafka.common.message.DescribeQuorumResponseData;
import org.apache.kafka.common.message.EndQuorumEpochRequestData;
import org.apache.kafka.common.message.EndQuorumEpochResponseData;
import org.apache.kafka.common.message.FetchRequestData;
import org.apache.kafka.common.message.FetchResponseData;
-import org.apache.kafka.common.message.LeaderChangeMessage;
+import org.apache.kafka.common.message.FetchSnapshotResponseData;
import org.apache.kafka.common.message.LeaderChangeMessage.Voter;
+import org.apache.kafka.common.message.LeaderChangeMessage;
import org.apache.kafka.common.message.VoteRequestData;
import org.apache.kafka.common.message.VoteResponseData;
import org.apache.kafka.common.metrics.Metrics;
@@ -45,6 +46,7 @@ import
org.apache.kafka.common.requests.BeginQuorumEpochResponse;
import org.apache.kafka.common.requests.DescribeQuorumResponse;
import org.apache.kafka.common.requests.EndQuorumEpochRequest;
import org.apache.kafka.common.requests.EndQuorumEpochResponse;
+import org.apache.kafka.common.requests.FetchSnapshotResponse;
import org.apache.kafka.common.requests.VoteRequest;
import org.apache.kafka.common.requests.VoteResponse;
import org.apache.kafka.common.utils.LogContext;
@@ -85,12 +87,13 @@ public final class RaftClientTestContext {
final TopicPartition metadataPartition = Builder.METADATA_PARTITION;
final int electionBackoffMaxMs = Builder.ELECTION_BACKOFF_MAX_MS;
- final int electionTimeoutMs = Builder.DEFAULT_ELECTION_TIMEOUT_MS;
final int electionFetchMaxWaitMs = Builder.FETCH_MAX_WAIT_MS;
final int fetchTimeoutMs = Builder.FETCH_TIMEOUT_MS;
- final int requestTimeoutMs = Builder.DEFAULT_REQUEST_TIMEOUT_MS;
final int retryBackoffMs = Builder.RETRY_BACKOFF_MS;
+ final int electionTimeoutMs;
+ final int requestTimeoutMs;
+
private final QuorumStateStore quorumStateStore;
private final QuorumState quorum;
final int localId;
@@ -230,7 +233,9 @@ public final class RaftClientTestContext {
quorum,
voters,
metrics,
- listener
+ listener,
+ electionTimeoutMs,
+ requestTimeoutMs
);
}
}
@@ -246,7 +251,9 @@ public final class RaftClientTestContext {
QuorumState quorum,
Set<Integer> voters,
Metrics metrics,
- MockListener listener
+ MockListener listener,
+ int electionTimeoutMs,
+ int requestTimeoutMs
) {
this.localId = localId;
this.client = client;
@@ -259,6 +266,9 @@ public final class RaftClientTestContext {
this.voters = voters;
this.metrics = metrics;
this.listener = listener;
+
+ this.electionTimeoutMs = electionTimeoutMs;
+ this.requestTimeoutMs = requestTimeoutMs;
}
MemoryRecords buildBatch(
@@ -364,7 +374,7 @@ public final class RaftClientTestContext {
TestUtils.waitForCondition(() -> {
client.poll();
return condition.conditionMet();
- }, 500000000, "Condition failed to be satisfied before timeout");
+ }, 5000, "Condition failed to be satisfied before timeout");
}
void pollUntilResponse() throws InterruptedException {
@@ -595,6 +605,26 @@ public final class RaftClientTestContext {
return (MemoryRecords) partitionResponse.recordSet();
}
+ RaftRequest.Outbound assertSentFetchSnapshotRequest() {
+ List<RaftRequest.Outbound> sentRequests = channel.drainSentRequests(Optional.of(ApiKeys.FETCH_SNAPSHOT));
+ assertEquals(1, sentRequests.size());
+
+ return sentRequests.get(0);
+ }
+
+ Optional<FetchSnapshotResponseData.PartitionSnapshot> assertSentFetchSnapshotResponse(TopicPartition topicPartition) {
+ List<RaftResponse.Outbound> sentMessages = drainSentResponses(ApiKeys.FETCH_SNAPSHOT);
+ assertEquals(1, sentMessages.size());
+
+ RaftMessage message = sentMessages.get(0);
+ assertTrue(message.data() instanceof FetchSnapshotResponseData);
+
+ FetchSnapshotResponseData response = (FetchSnapshotResponseData) message.data();
+ assertEquals(Errors.NONE, Errors.forCode(response.errorCode()));
+
+ return FetchSnapshotResponse.forTopicPartition(response, topicPartition);
+ }
+
void buildFollowerSet(
int epoch,
int closeFollower,
@@ -846,7 +876,7 @@ public final class RaftClientTestContext {
});
}
- FetchResponseData outOfRangeFetchRecordsResponse(
+ FetchResponseData divergingFetchResponse(
int epoch,
int leaderId,
long divergingEpochEndOffset,
diff --git a/raft/src/test/java/org/apache/kafka/snapshot/SnapshotWriterTest.java b/raft/src/test/java/org/apache/kafka/snapshot/SnapshotWriterTest.java
index 3957640..35652c7 100644
--- a/raft/src/test/java/org/apache/kafka/snapshot/SnapshotWriterTest.java
+++ b/raft/src/test/java/org/apache/kafka/snapshot/SnapshotWriterTest.java
@@ -100,7 +100,7 @@ final public class SnapshotWriterTest {
return result;
}
- private void assertSnapshot(List<List<String>> batches, RawSnapshotReader reader) {
+ public static void assertSnapshot(List<List<String>> batches, RawSnapshotReader reader) {
List<String> expected = new ArrayList<>();
batches.forEach(expected::addAll);