This is an automated email from the ASF dual-hosted git repository.
jsancio pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 44e613c4cd1 KAFKA-13884; Only voters flush on Fetch response (#13396)
44e613c4cd1 is described below
commit 44e613c4cd1a2b7303da7dd30d47bbe87090131f
Author: José Armando García Sancio <[email protected]>
AuthorDate: Wed Mar 15 12:06:41 2023 -0700
KAFKA-13884; Only voters flush on Fetch response (#13396)
The leader only requires that voters have flushed their log up to the fetch
offset before sending a fetch request.
This change flushes the log when handling the fetch response only if the
follower is a voter. This should improve the disk performance on observers
(brokers).
Reviewers: Jason Gustafson <[email protected]>
---
core/src/main/scala/kafka/log/UnifiedLog.scala | 4 +--
.../main/scala/kafka/raft/KafkaMetadataLog.scala | 4 ---
.../org/apache/kafka/raft/KafkaRaftClient.java | 6 +++-
.../java/org/apache/kafka/raft/ReplicatedLog.java | 5 ---
.../org/apache/kafka/raft/KafkaRaftClientTest.java | 7 ++--
.../test/java/org/apache/kafka/raft/MockLog.java | 25 +++++++++++----
.../apache/kafka/raft/RaftClientTestContext.java | 37 ++++++++++++++++++----
7 files changed, 60 insertions(+), 28 deletions(-)
diff --git a/core/src/main/scala/kafka/log/UnifiedLog.scala
b/core/src/main/scala/kafka/log/UnifiedLog.scala
index 4463c2f2096..dbfab74d3d0 100644
--- a/core/src/main/scala/kafka/log/UnifiedLog.scala
+++ b/core/src/main/scala/kafka/log/UnifiedLog.scala
@@ -670,7 +670,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
interBrokerProtocolVersion = MetadataVersion.latest,
validateAndAssignOffsets = false,
leaderEpoch = -1,
- None,
+ requestLocal = None,
// disable to check the validation of record size since the record is
already accepted by leader.
ignoreRecordSize = true)
}
@@ -686,7 +686,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
* @param interBrokerProtocolVersion Inter-broker message protocol version
* @param validateAndAssignOffsets Should the log assign offsets to this
message set or blindly apply what it is given
* @param leaderEpoch The partition's leader epoch which will be applied to
messages when offsets are assigned on the leader
- * @param requestLocal The request local instance if assignOffsets is true
+ * @param requestLocal The request local instance if
validateAndAssignOffsets is true
* @param ignoreRecordSize true to skip validation of record size.
* @throws KafkaStorageException If the append fails due to an I/O error.
* @throws OffsetsOutOfOrderException If out of order offsets found in
'records'
diff --git a/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
b/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
index f505608c751..277b3e1c900 100644
--- a/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
+++ b/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
@@ -216,10 +216,6 @@ final class KafkaMetadataLog private (
log.flush(forceFlushActiveSegment)
}
- override def lastFlushedOffset(): Long = {
- log.recoveryPoint
- }
-
/**
* Return the topic partition associated with the log.
*/
diff --git a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
index 737b6326171..393909b390c 100644
--- a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
+++ b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
@@ -1141,7 +1141,11 @@ public class KafkaRaftClient<T> implements RaftClient<T>
{
Records records
) {
LogAppendInfo info = log.appendAsFollower(records);
- log.flush(false);
+ if (quorum.isVoter()) {
+ // the leader only requires that voters have flushed their log
before sending
+ // a Fetch request
+ log.flush(false);
+ }
OffsetAndEpoch endOffset = endOffset();
kafkaRaftMetrics.updateFetchedRecords(info.lastOffset -
info.firstOffset + 1);
diff --git a/raft/src/main/java/org/apache/kafka/raft/ReplicatedLog.java
b/raft/src/main/java/org/apache/kafka/raft/ReplicatedLog.java
index 7e1bd02c1b5..a8bc65af3a8 100644
--- a/raft/src/main/java/org/apache/kafka/raft/ReplicatedLog.java
+++ b/raft/src/main/java/org/apache/kafka/raft/ReplicatedLog.java
@@ -194,11 +194,6 @@ public interface ReplicatedLog extends AutoCloseable {
*/
boolean maybeClean();
- /**
- * Get the last offset which has been flushed to disk.
- */
- long lastFlushedOffset();
-
/**
* Return the topic partition associated with the log.
*/
diff --git a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
index 678648505bf..b843b8a09b8 100644
--- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
@@ -616,7 +616,7 @@ public class KafkaRaftClientTest {
// Leader change record appended
assertEquals(1L, context.log.endOffset().offset);
- assertEquals(1L, context.log.lastFlushedOffset());
+ assertEquals(1L, context.log.firstUnflushedOffset());
// Send BeginQuorumEpoch to voters
context.client.poll();
@@ -656,7 +656,7 @@ public class KafkaRaftClientTest {
// Leader change record appended
assertEquals(1L, context.log.endOffset().offset);
- assertEquals(1L, context.log.lastFlushedOffset());
+ assertEquals(1L, context.log.firstUnflushedOffset());
// Send BeginQuorumEpoch to voters
context.client.poll();
@@ -2156,7 +2156,7 @@ public class KafkaRaftClientTest {
context.client.poll();
assertEquals(2L, context.log.endOffset().offset);
- assertEquals(2L, context.log.lastFlushedOffset());
+ assertEquals(2L, context.log.firstUnflushedOffset());
}
@Test
@@ -2339,6 +2339,7 @@ public class KafkaRaftClientTest {
// Poll again to complete truncation
context.client.poll();
assertEquals(2L, context.log.endOffset().offset);
+ assertEquals(2L, context.log.firstUnflushedOffset());
// Now we should be fetching
context.client.poll();
diff --git a/raft/src/test/java/org/apache/kafka/raft/MockLog.java
b/raft/src/test/java/org/apache/kafka/raft/MockLog.java
index 85aaa598183..aa8c194c07a 100644
--- a/raft/src/test/java/org/apache/kafka/raft/MockLog.java
+++ b/raft/src/test/java/org/apache/kafka/raft/MockLog.java
@@ -65,7 +65,8 @@ public class MockLog implements ReplicatedLog {
private long nextId = ID_GENERATOR.getAndIncrement();
private LogOffsetMetadata highWatermark = new LogOffsetMetadata(0,
Optional.empty());
- private long lastFlushedOffset = 0;
+ private long firstUnflushedOffset = 0;
+ private boolean flushedSinceLastChecked = false;
public MockLog(
TopicPartition topicPartition,
@@ -86,6 +87,7 @@ public class MockLog implements ReplicatedLog {
batches.removeIf(entry -> entry.lastOffset() >= offset);
epochStartOffsets.removeIf(epochStartOffset ->
epochStartOffset.startOffset >= offset);
+ firstUnflushedOffset = Math.min(firstUnflushedOffset,
endOffset().offset);
}
@Override
@@ -329,7 +331,8 @@ public class MockLog implements ReplicatedLog {
@Override
public void flush(boolean forceFlushActiveSegment) {
- lastFlushedOffset = endOffset().offset;
+ flushedSinceLastChecked = true;
+ firstUnflushedOffset = endOffset().offset;
}
@Override
@@ -337,17 +340,25 @@ public class MockLog implements ReplicatedLog {
return false;
}
- @Override
- public long lastFlushedOffset() {
- return lastFlushedOffset;
+ public long firstUnflushedOffset() {
+ return firstUnflushedOffset;
+ }
+
+ /**
+ * Returns true if the log was flushed since the last time this method was
called.
+ */
+ public boolean flushedSinceLastChecked() {
+ boolean oldValue = flushedSinceLastChecked;
+ flushedSinceLastChecked = false;
+ return oldValue;
}
/**
* Reopening the log causes all unflushed data to be lost.
*/
public void reopen() {
- batches.removeIf(batch -> batch.firstOffset() >= lastFlushedOffset);
- epochStartOffsets.removeIf(epochStartOffset ->
epochStartOffset.startOffset >= lastFlushedOffset);
+ batches.removeIf(batch -> batch.firstOffset() >= firstUnflushedOffset);
+ epochStartOffsets.removeIf(epochStartOffset ->
epochStartOffset.startOffset >= firstUnflushedOffset);
highWatermark = new LogOffsetMetadata(0L, Optional.empty());
}
diff --git
a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
index c3fac2ebe8f..b79eddf0029 100644
--- a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
+++ b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
@@ -81,6 +81,7 @@ import java.util.stream.Collectors;
import static org.apache.kafka.raft.RaftUtil.hasValidTopicPartition;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
public final class RaftClientTestContext {
@@ -185,6 +186,14 @@ public final class RaftClientTestContext {
records
);
log.appendAsLeader(batch, epoch);
+ // Need to flush the log to update the last flushed offset. This
is always correct
+ // because append operation was done in the Builder which
represent the state of the
+ // log before the replica starts.
+ log.flush(false);
+
+ // Reset the value of this method since "flush" before the replica
start should not
+ // count when checking for flushes by the KRaft client.
+ log.flushedSinceLastChecked();
return this;
}
@@ -654,10 +663,9 @@ public final class RaftClientTestContext {
List<RaftRequest.Outbound> sentMessages = channel.drainSendQueue();
assertEquals(1, sentMessages.size());
- // TODO: Use more specific type
- RaftMessage raftMessage = sentMessages.get(0);
- assertFetchRequestData(raftMessage, epoch, fetchOffset,
lastFetchedEpoch);
- return raftMessage.correlationId();
+ RaftRequest.Outbound raftRequest = sentMessages.get(0);
+ assertFetchRequestData(raftRequest, epoch, fetchOffset,
lastFetchedEpoch);
+ return raftRequest.correlationId();
}
FetchResponseData.PartitionData assertSentFetchPartitionResponse() {
@@ -955,13 +963,15 @@ public final class RaftClientTestContext {
}
void assertFetchRequestData(
- RaftMessage message,
+ RaftRequest.Outbound message,
int epoch,
long fetchOffset,
int lastFetchedEpoch
) {
assertTrue(
- message.data() instanceof FetchRequestData, "Unexpected request
type " + message.data());
+ message.data() instanceof FetchRequestData,
+ "unexpected request type " + message.data()
+ );
FetchRequestData request = (FetchRequestData) message.data();
assertEquals(KafkaRaftClient.MAX_FETCH_SIZE_BYTES, request.maxBytes());
assertEquals(fetchMaxWaitMs, request.maxWaitMs());
@@ -975,6 +985,21 @@ public final class RaftClientTestContext {
assertEquals(fetchOffset, fetchPartition.fetchOffset());
assertEquals(lastFetchedEpoch, fetchPartition.lastFetchedEpoch());
assertEquals(localId.orElse(-1), request.replicaId());
+
+ // Assert that voters have flushed up to the fetch offset
+ if (localId.isPresent() && voters.contains(localId.getAsInt())) {
+ assertEquals(
+ log.firstUnflushedOffset(),
+ fetchOffset,
+ String.format(
+ "expected voters have the fetch offset (%s) be the same as
the unflushed offset (%s)",
+ log.firstUnflushedOffset(),
+ fetchOffset
+ )
+ );
+ } else {
+ assertFalse(log.flushedSinceLastChecked(), "KRaft client should
not explicitly flush when it is an observer");
+ }
}
FetchRequestData fetchRequest(