This is an automated email from the ASF dual-hosted git repository.

jsancio pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 44e613c4cd1 KAFKA-13884; Only voters flush on Fetch response (#13396)
44e613c4cd1 is described below

commit 44e613c4cd1a2b7303da7dd30d47bbe87090131f
Author: José Armando García Sancio <[email protected]>
AuthorDate: Wed Mar 15 12:06:41 2023 -0700

    KAFKA-13884; Only voters flush on Fetch response (#13396)
    
    The leader only requires that voters have flushed their log up to the fetch 
offset before sending a fetch request.
    
    This change flushes the log when handling the fetch response only if the 
follower is a voter. This should improve the disk performance on observers 
(brokers).
    
    Reviewers: Jason Gustafson <[email protected]>
---
 core/src/main/scala/kafka/log/UnifiedLog.scala     |  4 +--
 .../main/scala/kafka/raft/KafkaMetadataLog.scala   |  4 ---
 .../org/apache/kafka/raft/KafkaRaftClient.java     |  6 +++-
 .../java/org/apache/kafka/raft/ReplicatedLog.java  |  5 ---
 .../org/apache/kafka/raft/KafkaRaftClientTest.java |  7 ++--
 .../test/java/org/apache/kafka/raft/MockLog.java   | 25 +++++++++++----
 .../apache/kafka/raft/RaftClientTestContext.java   | 37 ++++++++++++++++++----
 7 files changed, 60 insertions(+), 28 deletions(-)

diff --git a/core/src/main/scala/kafka/log/UnifiedLog.scala 
b/core/src/main/scala/kafka/log/UnifiedLog.scala
index 4463c2f2096..dbfab74d3d0 100644
--- a/core/src/main/scala/kafka/log/UnifiedLog.scala
+++ b/core/src/main/scala/kafka/log/UnifiedLog.scala
@@ -670,7 +670,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
       interBrokerProtocolVersion = MetadataVersion.latest,
       validateAndAssignOffsets = false,
       leaderEpoch = -1,
-      None,
+      requestLocal = None,
       // disable to check the validation of record size since the record is 
already accepted by leader.
       ignoreRecordSize = true)
   }
@@ -686,7 +686,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
    * @param interBrokerProtocolVersion Inter-broker message protocol version
    * @param validateAndAssignOffsets Should the log assign offsets to this 
message set or blindly apply what it is given
    * @param leaderEpoch The partition's leader epoch which will be applied to 
messages when offsets are assigned on the leader
-   * @param requestLocal The request local instance if assignOffsets is true
+   * @param requestLocal The request local instance if 
validateAndAssignOffsets is true
    * @param ignoreRecordSize true to skip validation of record size.
    * @throws KafkaStorageException If the append fails due to an I/O error.
    * @throws OffsetsOutOfOrderException If out of order offsets found in 
'records'
diff --git a/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala 
b/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
index f505608c751..277b3e1c900 100644
--- a/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
+++ b/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
@@ -216,10 +216,6 @@ final class KafkaMetadataLog private (
     log.flush(forceFlushActiveSegment)
   }
 
-  override def lastFlushedOffset(): Long = {
-    log.recoveryPoint
-  }
-
   /**
    * Return the topic partition associated with the log.
    */
diff --git a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java 
b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
index 737b6326171..393909b390c 100644
--- a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
+++ b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
@@ -1141,7 +1141,11 @@ public class KafkaRaftClient<T> implements RaftClient<T> 
{
         Records records
     ) {
         LogAppendInfo info = log.appendAsFollower(records);
-        log.flush(false);
+        if (quorum.isVoter()) {
+            // the leader only requires that voters have flushed their log 
before sending
+            // a Fetch request
+            log.flush(false);
+        }
 
         OffsetAndEpoch endOffset = endOffset();
         kafkaRaftMetrics.updateFetchedRecords(info.lastOffset - 
info.firstOffset + 1);
diff --git a/raft/src/main/java/org/apache/kafka/raft/ReplicatedLog.java 
b/raft/src/main/java/org/apache/kafka/raft/ReplicatedLog.java
index 7e1bd02c1b5..a8bc65af3a8 100644
--- a/raft/src/main/java/org/apache/kafka/raft/ReplicatedLog.java
+++ b/raft/src/main/java/org/apache/kafka/raft/ReplicatedLog.java
@@ -194,11 +194,6 @@ public interface ReplicatedLog extends AutoCloseable {
      */
     boolean maybeClean();
 
-    /**
-     * Get the last offset which has been flushed to disk.
-     */
-    long lastFlushedOffset();
-
     /**
      * Return the topic partition associated with the log.
      */
diff --git a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java 
b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
index 678648505bf..b843b8a09b8 100644
--- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
@@ -616,7 +616,7 @@ public class KafkaRaftClientTest {
 
         // Leader change record appended
         assertEquals(1L, context.log.endOffset().offset);
-        assertEquals(1L, context.log.lastFlushedOffset());
+        assertEquals(1L, context.log.firstUnflushedOffset());
 
         // Send BeginQuorumEpoch to voters
         context.client.poll();
@@ -656,7 +656,7 @@ public class KafkaRaftClientTest {
 
         // Leader change record appended
         assertEquals(1L, context.log.endOffset().offset);
-        assertEquals(1L, context.log.lastFlushedOffset());
+        assertEquals(1L, context.log.firstUnflushedOffset());
 
         // Send BeginQuorumEpoch to voters
         context.client.poll();
@@ -2156,7 +2156,7 @@ public class KafkaRaftClientTest {
 
         context.client.poll();
         assertEquals(2L, context.log.endOffset().offset);
-        assertEquals(2L, context.log.lastFlushedOffset());
+        assertEquals(2L, context.log.firstUnflushedOffset());
     }
 
     @Test
@@ -2339,6 +2339,7 @@ public class KafkaRaftClientTest {
         // Poll again to complete truncation
         context.client.poll();
         assertEquals(2L, context.log.endOffset().offset);
+        assertEquals(2L, context.log.firstUnflushedOffset());
 
         // Now we should be fetching
         context.client.poll();
diff --git a/raft/src/test/java/org/apache/kafka/raft/MockLog.java 
b/raft/src/test/java/org/apache/kafka/raft/MockLog.java
index 85aaa598183..aa8c194c07a 100644
--- a/raft/src/test/java/org/apache/kafka/raft/MockLog.java
+++ b/raft/src/test/java/org/apache/kafka/raft/MockLog.java
@@ -65,7 +65,8 @@ public class MockLog implements ReplicatedLog {
 
     private long nextId = ID_GENERATOR.getAndIncrement();
     private LogOffsetMetadata highWatermark = new LogOffsetMetadata(0, 
Optional.empty());
-    private long lastFlushedOffset = 0;
+    private long firstUnflushedOffset = 0;
+    private boolean flushedSinceLastChecked = false;
 
     public MockLog(
         TopicPartition topicPartition,
@@ -86,6 +87,7 @@ public class MockLog implements ReplicatedLog {
 
         batches.removeIf(entry -> entry.lastOffset() >= offset);
         epochStartOffsets.removeIf(epochStartOffset -> 
epochStartOffset.startOffset >= offset);
+        firstUnflushedOffset = Math.min(firstUnflushedOffset, 
endOffset().offset);
     }
 
     @Override
@@ -329,7 +331,8 @@ public class MockLog implements ReplicatedLog {
 
     @Override
     public void flush(boolean forceFlushActiveSegment) {
-        lastFlushedOffset = endOffset().offset;
+        flushedSinceLastChecked = true;
+        firstUnflushedOffset = endOffset().offset;
     }
 
     @Override
@@ -337,17 +340,25 @@ public class MockLog implements ReplicatedLog {
         return false;
     }
 
-    @Override
-    public long lastFlushedOffset() {
-        return lastFlushedOffset;
+    public long firstUnflushedOffset() {
+        return firstUnflushedOffset;
+    }
+
+    /**
+     * Returns true if the log was flushed since the last time this method was 
called.
+     */
+    public boolean flushedSinceLastChecked() {
+        boolean oldValue = flushedSinceLastChecked;
+        flushedSinceLastChecked = false;
+        return oldValue;
     }
 
     /**
      * Reopening the log causes all unflushed data to be lost.
      */
     public void reopen() {
-        batches.removeIf(batch -> batch.firstOffset() >= lastFlushedOffset);
-        epochStartOffsets.removeIf(epochStartOffset -> 
epochStartOffset.startOffset >= lastFlushedOffset);
+        batches.removeIf(batch -> batch.firstOffset() >= firstUnflushedOffset);
+        epochStartOffsets.removeIf(epochStartOffset -> 
epochStartOffset.startOffset >= firstUnflushedOffset);
         highWatermark = new LogOffsetMetadata(0L, Optional.empty());
     }
 
diff --git 
a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java 
b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
index c3fac2ebe8f..b79eddf0029 100644
--- a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
+++ b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
@@ -81,6 +81,7 @@ import java.util.stream.Collectors;
 import static org.apache.kafka.raft.RaftUtil.hasValidTopicPartition;
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public final class RaftClientTestContext {
@@ -185,6 +186,14 @@ public final class RaftClientTestContext {
                 records
             );
             log.appendAsLeader(batch, epoch);
+            // Need to flush the log to update the last flushed offset. This 
is always correct
+            // because append operation was done in the Builder which 
represent the state of the
+            // log before the replica starts.
+            log.flush(false);
+
+            // Reset the value of this method since "flush" before the replica 
start should not
+            // count when checking for flushes by the KRaft client.
+            log.flushedSinceLastChecked();
             return this;
         }
 
@@ -654,10 +663,9 @@ public final class RaftClientTestContext {
         List<RaftRequest.Outbound> sentMessages = channel.drainSendQueue();
         assertEquals(1, sentMessages.size());
 
-        // TODO: Use more specific type
-        RaftMessage raftMessage = sentMessages.get(0);
-        assertFetchRequestData(raftMessage, epoch, fetchOffset, 
lastFetchedEpoch);
-        return raftMessage.correlationId();
+        RaftRequest.Outbound raftRequest = sentMessages.get(0);
+        assertFetchRequestData(raftRequest, epoch, fetchOffset, 
lastFetchedEpoch);
+        return raftRequest.correlationId();
     }
 
     FetchResponseData.PartitionData assertSentFetchPartitionResponse() {
@@ -955,13 +963,15 @@ public final class RaftClientTestContext {
     }
 
     void assertFetchRequestData(
-        RaftMessage message,
+        RaftRequest.Outbound message,
         int epoch,
         long fetchOffset,
         int lastFetchedEpoch
     ) {
         assertTrue(
-            message.data() instanceof FetchRequestData, "Unexpected request 
type " + message.data());
+            message.data() instanceof FetchRequestData,
+            "unexpected request type " + message.data()
+        );
         FetchRequestData request = (FetchRequestData) message.data();
         assertEquals(KafkaRaftClient.MAX_FETCH_SIZE_BYTES, request.maxBytes());
         assertEquals(fetchMaxWaitMs, request.maxWaitMs());
@@ -975,6 +985,21 @@ public final class RaftClientTestContext {
         assertEquals(fetchOffset, fetchPartition.fetchOffset());
         assertEquals(lastFetchedEpoch, fetchPartition.lastFetchedEpoch());
         assertEquals(localId.orElse(-1), request.replicaId());
+
+        // Assert that voters have flushed up to the fetch offset
+        if (localId.isPresent() && voters.contains(localId.getAsInt())) {
+            assertEquals(
+                log.firstUnflushedOffset(),
+                fetchOffset,
+                String.format(
+                    "expected voters have the fetch offset (%s) be the same as 
the unflushed offset (%s)",
+                    log.firstUnflushedOffset(),
+                    fetchOffset
+                )
+            );
+        } else {
+            assertFalse(log.flushedSinceLastChecked(), "KRaft client should 
not explicitly flush when it is an observer");
+        }
     }
 
     FetchRequestData fetchRequest(

Reply via email to