This is an automated email from the ASF dual-hosted git repository.

showuon pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 31f9a54cba3 KAFKA-14850: introduce InMemoryLeaderEpochCheckpoint 
(#13456)
31f9a54cba3 is described below

commit 31f9a54cba38fbdc015590bd82c1f1d62839f09f
Author: Luke Chen <[email protected]>
AuthorDate: Wed Apr 5 20:11:32 2023 +0800

    KAFKA-14850: introduce InMemoryLeaderEpochCheckpoint (#13456)
    
    The motivation for introducing InMemoryLeaderEpochCheckpoint is to allow 
remote log manager to create the RemoteLogSegmentMetadata(RLSM) with the 
correct leader epoch info for a specific segment. To do that, we need to rely 
on the LeaderEpochCheckpointCache to truncate from start and end, to get the 
epoch info. However, we don't really want to truncate the epochs in cache (and 
write to checkpoint file in the end). So, we introduce this 
InMemoryLeaderEpochCheckpoint to feed into Leader [...]
    
    Reviewers: Divij Vaidya <[email protected]>, Satish Duggana 
<[email protected]>
---
 .../InMemoryLeaderEpochCheckpointTest.scala        | 58 +++++++++++++++++++
 .../apache/kafka/server/common/CheckpointFile.java | 27 +++++++++
 .../checkpoint/InMemoryLeaderEpochCheckpoint.java  | 65 ++++++++++++++++++++++
 .../internals/epoch/LeaderEpochFileCache.java      | 15 ++++-
 4 files changed, 163 insertions(+), 2 deletions(-)

diff --git 
a/core/src/test/scala/unit/kafka/server/checkpoints/InMemoryLeaderEpochCheckpointTest.scala
 
b/core/src/test/scala/unit/kafka/server/checkpoints/InMemoryLeaderEpochCheckpointTest.scala
new file mode 100644
index 00000000000..9ca09f2cceb
--- /dev/null
+++ 
b/core/src/test/scala/unit/kafka/server/checkpoints/InMemoryLeaderEpochCheckpointTest.scala
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server.checkpoints
+
+import 
org.apache.kafka.storage.internals.checkpoint.InMemoryLeaderEpochCheckpoint
+import org.apache.kafka.storage.internals.log.EpochEntry
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Test
+
+import java.io.{BufferedReader, ByteArrayInputStream, InputStreamReader}
+import java.nio.charset.StandardCharsets
+
+class InMemoryLeaderEpochCheckpointTest {
+
+  @Test
+  def shouldAppendNewEntry(): Unit = {
+    val checkpoint = new InMemoryLeaderEpochCheckpoint()
+    val epochs = java.util.Arrays.asList(new EpochEntry(0, 1L), new 
EpochEntry(1, 2L), new EpochEntry(2, 3L))
+    checkpoint.write(epochs)
+    assertEquals(epochs, checkpoint.read())
+
+    val epochs2 = java.util.Arrays.asList(new EpochEntry(3, 4L), new 
EpochEntry(4, 5L))
+    checkpoint.write(epochs2)
+
+    assertEquals(epochs2, checkpoint.read())
+  }
+
+  @Test
+  def testReadAsByteBuffer(): Unit = {
+    val checkpoint = new InMemoryLeaderEpochCheckpoint()
+    val expectedEpoch = 0
+    val expectedStartOffset = 1L
+    val expectedVersion = 0
+    val epochs = java.util.Arrays.asList(new EpochEntry(expectedEpoch, 
expectedStartOffset))
+    checkpoint.write(epochs)
+    assertEquals(epochs, checkpoint.read())
+    val ba = checkpoint.readAsByteBuffer()
+
+    val bufferedReader = new BufferedReader(new InputStreamReader(new 
ByteArrayInputStream(ba.array()), StandardCharsets.UTF_8))
+    assertEquals(expectedVersion.toString, bufferedReader.readLine())
+    assertEquals(epochs.size().toString, bufferedReader.readLine())
+    assertEquals(s"$expectedEpoch $expectedStartOffset", 
bufferedReader.readLine())
+  }
+}
diff --git 
a/server-common/src/main/java/org/apache/kafka/server/common/CheckpointFile.java
 
b/server-common/src/main/java/org/apache/kafka/server/common/CheckpointFile.java
index c44ad6ab28a..aca730baf6e 100644
--- 
a/server-common/src/main/java/org/apache/kafka/server/common/CheckpointFile.java
+++ 
b/server-common/src/main/java/org/apache/kafka/server/common/CheckpointFile.java
@@ -108,6 +108,33 @@ public class CheckpointFile<T> {
         }
     }
 
+    public static class CheckpointWriteBuffer<T> {
+        private BufferedWriter writer;
+        private int version;
+        private EntryFormatter<T> formatter;
+
+        public CheckpointWriteBuffer(BufferedWriter writer,
+                                     int version,
+                                     EntryFormatter<T> formatter) {
+            this.version = version;
+            this.writer = writer;
+            this.formatter = formatter;
+        }
+
+        public void write(List<T> entries) throws IOException {
+            writer.write(String.valueOf(version));
+            writer.newLine();
+
+            writer.write(String.valueOf(entries.size()));
+            writer.newLine();
+
+            for (T entry : entries) {
+                writer.write(formatter.toString(entry));
+                writer.newLine();
+            }
+        }
+    }
+
     public static class CheckpointReadBuffer<T> {
 
         private final String location;
diff --git 
a/storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/InMemoryLeaderEpochCheckpoint.java
 
b/storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/InMemoryLeaderEpochCheckpoint.java
new file mode 100644
index 00000000000..cd7fdc2f893
--- /dev/null
+++ 
b/storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/InMemoryLeaderEpochCheckpoint.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.storage.internals.checkpoint;
+
+import org.apache.kafka.server.common.CheckpointFile;
+import org.apache.kafka.storage.internals.log.EpochEntry;
+
+import java.io.BufferedWriter;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * This class stores a list of EpochEntry(LeaderEpoch + Offsets) in memory.
+ *
+ * The motivation for this class is to allow remote log manager to create the 
RemoteLogSegmentMetadata(RLSM)
+ * with the correct leader epoch info for a specific segment. To do that, we 
need to rely on the LeaderEpochCheckpointCache
+ * to truncate from start and end, to get the epoch info. However, we don't 
really want to truncate the epochs in cache
+ * (and write to checkpoint file in the end). So, we introduce this 
InMemoryLeaderEpochCheckpoint to feed into LeaderEpochCheckpointCache,
+ * and when we truncate the epoch for RLSM, we can do them in memory without 
affecting the checkpoint file, and without interacting with file system.
+ */
+public class InMemoryLeaderEpochCheckpoint implements LeaderEpochCheckpoint {
+    private List<EpochEntry> epochs = Collections.emptyList();
+
+    public void write(Collection<EpochEntry> epochs) {
+        this.epochs = new ArrayList<>(epochs);
+    }
+
+    public List<EpochEntry> read() {
+        return Collections.unmodifiableList(epochs);
+    }
+
+    public ByteBuffer readAsByteBuffer() throws IOException {
+        ByteArrayOutputStream stream = new ByteArrayOutputStream();
+        BufferedWriter writer = new BufferedWriter(new 
OutputStreamWriter(stream, StandardCharsets.UTF_8));
+        CheckpointFile.CheckpointWriteBuffer<EpochEntry> writeBuffer = new 
CheckpointFile.CheckpointWriteBuffer<>(writer, 0, 
LeaderEpochCheckpointFile.FORMATTER);
+        try {
+            writeBuffer.write(epochs);
+            writer.flush();
+            return ByteBuffer.wrap(stream.toByteArray());
+        } finally {
+            writer.close();
+        }
+    }
+}
\ No newline at end of file
diff --git 
a/storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java
 
b/storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java
index 7ba3e388fe2..1db53133578 100644
--- 
a/storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java
+++ 
b/storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java
@@ -49,12 +49,15 @@ public class LeaderEpochFileCache {
     private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
     private final TreeMap<Integer, EpochEntry> epochs = new TreeMap<>();
 
+    private final TopicPartition topicPartition;
+
     /**
      * @param topicPartition the associated topic partition
      * @param checkpoint     the checkpoint file
      */
     public LeaderEpochFileCache(TopicPartition topicPartition, 
LeaderEpochCheckpoint checkpoint) {
         this.checkpoint = checkpoint;
+        this.topicPartition = topicPartition;
         LogContext logContext = new LogContext("[LeaderEpochCache " + 
topicPartition + "] ");
         log = logContext.logger(LeaderEpochFileCache.class);
         checkpoint.read().forEach(this::assign);
@@ -147,6 +150,11 @@ public class LeaderEpochFileCache {
         return removedEpochs;
     }
 
+    public LeaderEpochFileCache 
cloneWithLeaderEpochCheckpoint(LeaderEpochCheckpoint leaderEpochCheckpoint) {
+        flushTo(leaderEpochCheckpoint);
+        return new LeaderEpochFileCache(this.topicPartition, 
leaderEpochCheckpoint);
+    }
+
     public boolean nonEmpty() {
         lock.readLock().lock();
         try {
@@ -390,13 +398,16 @@ public class LeaderEpochFileCache {
         }
     }
 
-    private void flush() {
+    private void flushTo(LeaderEpochCheckpoint leaderEpochCheckpoint) {
         lock.readLock().lock();
         try {
-            checkpoint.write(epochs.values());
+            leaderEpochCheckpoint.write(epochs.values());
         } finally {
             lock.readLock().unlock();
         }
     }
 
+    private void flush() {
+        flushTo(this.checkpoint);
+    }
 }

Reply via email to