This is an automated email from the ASF dual-hosted git repository.
showuon pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 31f9a54cba3 KAFKA-14850: introduce InMemoryLeaderEpochCheckpoint
(#13456)
31f9a54cba3 is described below
commit 31f9a54cba38fbdc015590bd82c1f1d62839f09f
Author: Luke Chen <[email protected]>
AuthorDate: Wed Apr 5 20:11:32 2023 +0800
KAFKA-14850: introduce InMemoryLeaderEpochCheckpoint (#13456)
The motivation for introducing InMemoryLeaderEpochCheckpoint is to allow the
remote log manager to create the RemoteLogSegmentMetadata (RLSM) with the
correct leader epoch info for a specific segment. To do that, we need to rely
on the LeaderEpochFileCache to truncate from start and end, to get the
epoch info. However, we don't really want to truncate the epochs in the cache
(and write to the checkpoint file in the end). So, we introduce this
InMemoryLeaderEpochCheckpoint to feed into LeaderEpochFileCache, so that when
we truncate the epochs for RLSM, we can do it in memory without affecting the
checkpoint file and without interacting with the file system.
Reviewers: Divij Vaidya <[email protected]>, Satish Duggana
<[email protected]>
---
.../InMemoryLeaderEpochCheckpointTest.scala | 58 +++++++++++++++++++
.../apache/kafka/server/common/CheckpointFile.java | 27 +++++++++
.../checkpoint/InMemoryLeaderEpochCheckpoint.java | 65 ++++++++++++++++++++++
.../internals/epoch/LeaderEpochFileCache.java | 15 ++++-
4 files changed, 163 insertions(+), 2 deletions(-)
diff --git
a/core/src/test/scala/unit/kafka/server/checkpoints/InMemoryLeaderEpochCheckpointTest.scala
b/core/src/test/scala/unit/kafka/server/checkpoints/InMemoryLeaderEpochCheckpointTest.scala
new file mode 100644
index 00000000000..9ca09f2cceb
--- /dev/null
+++
b/core/src/test/scala/unit/kafka/server/checkpoints/InMemoryLeaderEpochCheckpointTest.scala
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server.checkpoints
+
+import
org.apache.kafka.storage.internals.checkpoint.InMemoryLeaderEpochCheckpoint
+import org.apache.kafka.storage.internals.log.EpochEntry
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Test
+
+import java.io.{BufferedReader, ByteArrayInputStream, InputStreamReader}
+import java.nio.charset.StandardCharsets
+
+class InMemoryLeaderEpochCheckpointTest { // Unit tests for InMemoryLeaderEpochCheckpoint: in-memory write/read and byte-buffer serialization.
+
+ @Test
+ def shouldAppendNewEntry(): Unit = { // NOTE(review): despite the name, this verifies that a second write() replaces the earlier entries.
+ val checkpoint = new InMemoryLeaderEpochCheckpoint()
+ val epochs = java.util.Arrays.asList(new EpochEntry(0, 1L), new
EpochEntry(1, 2L), new EpochEntry(2, 3L))
+ checkpoint.write(epochs)
+ assertEquals(epochs, checkpoint.read()) // read() returns exactly what was written
+
+ val epochs2 = java.util.Arrays.asList(new EpochEntry(3, 4L), new
EpochEntry(4, 5L))
+ checkpoint.write(epochs2)
+
+ assertEquals(epochs2, checkpoint.read()) // only the second batch remains
+ }
+
+ @Test
+ def testReadAsByteBuffer(): Unit = { // Checks the text layout produced by readAsByteBuffer() for a single entry.
+ val checkpoint = new InMemoryLeaderEpochCheckpoint()
+ val expectedEpoch = 0
+ val expectedStartOffset = 1L
+ val expectedVersion = 0
+ val epochs = java.util.Arrays.asList(new EpochEntry(expectedEpoch,
expectedStartOffset))
+ checkpoint.write(epochs)
+ assertEquals(epochs, checkpoint.read())
+ val ba = checkpoint.readAsByteBuffer()
+
+ val bufferedReader = new BufferedReader(new InputStreamReader(new
ByteArrayInputStream(ba.array()), StandardCharsets.UTF_8)) // decode the serialized bytes as UTF-8 text lines
+ assertEquals(expectedVersion.toString, bufferedReader.readLine()) // line 1: format version
+ assertEquals(epochs.size().toString, bufferedReader.readLine()) // line 2: number of entries
+ assertEquals(s"$expectedEpoch $expectedStartOffset",
bufferedReader.readLine()) // line 3: the entry rendered as "<epoch> <startOffset>"
+ }
+}
diff --git
a/server-common/src/main/java/org/apache/kafka/server/common/CheckpointFile.java
b/server-common/src/main/java/org/apache/kafka/server/common/CheckpointFile.java
index c44ad6ab28a..aca730baf6e 100644
---
a/server-common/src/main/java/org/apache/kafka/server/common/CheckpointFile.java
+++
b/server-common/src/main/java/org/apache/kafka/server/common/CheckpointFile.java
@@ -108,6 +108,33 @@ public class CheckpointFile<T> {
}
}
+ public static class CheckpointWriteBuffer<T> { // Writes checkpoint entries as text lines to a caller-supplied BufferedWriter.
+ private BufferedWriter writer; // destination for the serialized text
+ private int version; // value emitted as the first line of the output
+ private EntryFormatter<T> formatter; // renders one entry as one line of text
+
+ public CheckpointWriteBuffer(BufferedWriter writer,
+ int version,
+ EntryFormatter<T> formatter) {
+ this.version = version;
+ this.writer = writer;
+ this.formatter = formatter;
+ }
+
+ public void write(List<T> entries) throws IOException { // Emits version, entry count, then one line per entry; does not flush or close the writer.
+ writer.write(String.valueOf(version)); // line 1: version
+ writer.newLine();
+
+ writer.write(String.valueOf(entries.size())); // line 2: number of entries that follow
+ writer.newLine();
+
+ for (T entry : entries) {
+ writer.write(formatter.toString(entry)); // one formatted entry per line, in list order
+ writer.newLine();
+ }
+ }
+ }
+
public static class CheckpointReadBuffer<T> {
private final String location;
diff --git
a/storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/InMemoryLeaderEpochCheckpoint.java
b/storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/InMemoryLeaderEpochCheckpoint.java
new file mode 100644
index 00000000000..cd7fdc2f893
--- /dev/null
+++
b/storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/InMemoryLeaderEpochCheckpoint.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.storage.internals.checkpoint;
+
+import org.apache.kafka.server.common.CheckpointFile;
+import org.apache.kafka.storage.internals.log.EpochEntry;
+
+import java.io.BufferedWriter;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * This class stores a list of EpochEntry (leader epoch + start offset) in memory.
+ *
+ * The motivation for this class is to allow the remote log manager to create the
RemoteLogSegmentMetadata (RLSM)
+ * with the correct leader epoch info for a specific segment. To do that, we
need to rely on the LeaderEpochFileCache
+ * to truncate from start and end, to get the epoch info. However, we don't
really want to truncate the epochs in the cache
+ * (and write to the checkpoint file in the end). So, we introduce this
InMemoryLeaderEpochCheckpoint to feed into LeaderEpochFileCache,
+ * and when we truncate the epochs for RLSM, we can do so in memory without
affecting the checkpoint file, and without interacting with the file system.
+ */
+public class InMemoryLeaderEpochCheckpoint implements LeaderEpochCheckpoint {
+ private List<EpochEntry> epochs = Collections.emptyList(); // current entries; empty until the first write()
+
+ public void write(Collection<EpochEntry> epochs) { // Replaces the stored entries with a copy of the given collection.
+ this.epochs = new ArrayList<>(epochs);
+ }
+
+ public List<EpochEntry> read() { // Returns an unmodifiable view of the stored entries.
+ return Collections.unmodifiableList(epochs);
+ }
+
+ public ByteBuffer readAsByteBuffer() throws IOException { // Serializes the stored entries as UTF-8 checkpoint text and wraps the bytes in a ByteBuffer.
+ ByteArrayOutputStream stream = new ByteArrayOutputStream();
+ BufferedWriter writer = new BufferedWriter(new
OutputStreamWriter(stream, StandardCharsets.UTF_8));
+ CheckpointFile.CheckpointWriteBuffer<EpochEntry> writeBuffer = new
CheckpointFile.CheckpointWriteBuffer<>(writer, 0,
LeaderEpochCheckpointFile.FORMATTER); // 0 is the version number written as the first line
+ try {
+ writeBuffer.write(epochs);
+ writer.flush(); // push buffered characters into the byte stream before copying it
+ return ByteBuffer.wrap(stream.toByteArray());
+ } finally {
+ writer.close();
+ }
+ }
+}
\ No newline at end of file
diff --git
a/storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java
b/storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java
index 7ba3e388fe2..1db53133578 100644
---
a/storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java
+++
b/storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java
@@ -49,12 +49,15 @@ public class LeaderEpochFileCache {
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
private final TreeMap<Integer, EpochEntry> epochs = new TreeMap<>();
+ private final TopicPartition topicPartition;
+
/**
* @param topicPartition the associated topic partition
* @param checkpoint the checkpoint file
*/
public LeaderEpochFileCache(TopicPartition topicPartition,
LeaderEpochCheckpoint checkpoint) {
this.checkpoint = checkpoint;
+ this.topicPartition = topicPartition;
LogContext logContext = new LogContext("[LeaderEpochCache " +
topicPartition + "] ");
log = logContext.logger(LeaderEpochFileCache.class);
checkpoint.read().forEach(this::assign);
@@ -147,6 +150,11 @@ public class LeaderEpochFileCache {
return removedEpochs;
}
+ public LeaderEpochFileCache
cloneWithLeaderEpochCheckpoint(LeaderEpochCheckpoint leaderEpochCheckpoint) { // Returns a new cache for the same topic partition, backed by the given checkpoint.
+ flushTo(leaderEpochCheckpoint); // write this cache's current entries into the given checkpoint first
+ return new LeaderEpochFileCache(this.topicPartition,
leaderEpochCheckpoint); // the constructor reads its entries back from that checkpoint
+ }
+
public boolean nonEmpty() {
lock.readLock().lock();
try {
@@ -390,13 +398,16 @@ public class LeaderEpochFileCache {
}
}
- private void flush() {
+ private void flushTo(LeaderEpochCheckpoint leaderEpochCheckpoint) { // Writes the current epoch entries to the given checkpoint while holding the read lock.
lock.readLock().lock();
try {
- checkpoint.write(epochs.values());
+ leaderEpochCheckpoint.write(epochs.values()); // target is now a parameter instead of always this.checkpoint
} finally {
lock.readLock().unlock();
}
}
+ private void flush() { // Writes the entries to this cache's own checkpoint, as the old flush() did.
+ flushTo(this.checkpoint);
+ }
}