junrao commented on code in PR #11390:
URL: https://github.com/apache/kafka/pull/11390#discussion_r1050008020
##########
core/src/main/scala/kafka/log/UnifiedLog.scala:
##########
@@ -574,6 +594,12 @@ class UnifiedLog(@volatile var logStartOffset: Long,
explicitMetricName(pkgStr, "Log", name, tags)
}
+ def loadProducerState(lastOffset: Long): Unit = lock synchronized {
+ rebuildProducerState(lastOffset, producerStateManager)
+ maybeIncrementFirstUnstableOffset()
+ updateHighWatermark(lastOffset)
Review Comment:
It's better to use localLog.logEndOffsetMetadata since it has the log
metadata in addition to the offset.
##########
core/src/main/scala/kafka/log/remote/RemoteLogManager.scala:
##########
@@ -0,0 +1,293 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.log.remote
+
+import kafka.cluster.Partition
+import kafka.metrics.KafkaMetricsGroup
+import kafka.server.KafkaConfig
+import kafka.server.epoch.LeaderEpochFileCache
+import kafka.utils.Logging
+import org.apache.kafka.common._
+import org.apache.kafka.common.record.FileRecords.TimestampAndOffset
+import org.apache.kafka.common.record.{RecordBatch, RemoteLogInputStream}
+import org.apache.kafka.common.utils.{ChildFirstClassLoader, Utils}
+import
org.apache.kafka.server.log.remote.metadata.storage.ClassLoaderAwareRemoteLogMetadataManager
+import org.apache.kafka.server.log.remote.storage.{RemoteLogManagerConfig,
RemoteLogMetadataManager, RemoteLogSegmentMetadata, RemoteStorageManager}
+
+import java.io.{Closeable, InputStream}
+import java.security.{AccessController, PrivilegedAction}
+import java.util
+import java.util.Optional
+import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap}
+import scala.collection.Set
+import scala.jdk.CollectionConverters._
+
+/**
+ * This class is responsible for
+ * - initializing `RemoteStorageManager` and `RemoteLogMetadataManager`
instances.
+ * - receiving any leader and follower replica events and partition stop
events and acting on them
+ * - providing APIs to fetch indexes and metadata about remote log segments.
+ *
+ * @param rlmConfig Configuration required for the remote logging subsystem
(tiered storage) at the broker level.
+ * @param brokerId id of the current broker.
+ * @param logDir directory of Kafka log segments.
+ */
+class RemoteLogManager(rlmConfig: RemoteLogManagerConfig,
+ brokerId: Int,
+ logDir: String) extends Logging with Closeable with
KafkaMetricsGroup {
+
+ // topic ids received on leadership changes
+ private val topicPartitionIds: ConcurrentMap[TopicPartition, Uuid] = new
ConcurrentHashMap[TopicPartition, Uuid]()
+
+ private val remoteLogStorageManager: RemoteStorageManager =
createRemoteStorageManager()
+ private val remoteLogMetadataManager: RemoteLogMetadataManager =
createRemoteLogMetadataManager()
+
+ private val indexCache = new RemoteIndexCache(remoteStorageManager =
remoteLogStorageManager, logDir = logDir)
+
+ private var closed = false
+
+ private[remote] def createRemoteStorageManager(): RemoteStorageManager = {
+ def createDelegate(classLoader: ClassLoader): RemoteStorageManager = {
+ classLoader.loadClass(rlmConfig.remoteStorageManagerClassName())
+
.getDeclaredConstructor().newInstance().asInstanceOf[RemoteStorageManager]
+ }
+
+ AccessController.doPrivileged(new PrivilegedAction[RemoteStorageManager] {
+ private val classPath = rlmConfig.remoteStorageManagerClassPath()
+
+ override def run(): RemoteStorageManager = {
+ if (classPath != null && classPath.trim.nonEmpty) {
+ val classLoader = new ChildFirstClassLoader(classPath,
this.getClass.getClassLoader)
+ val delegate = createDelegate(classLoader)
+ new ClassLoaderAwareRemoteStorageManager(delegate, classLoader)
+ } else {
+ createDelegate(this.getClass.getClassLoader)
+ }
+ }
+ })
+ }
+
+ private def configureRSM(): Unit = {
+ val rsmProps = new util.HashMap[String, Any]()
+ rlmConfig.remoteStorageManagerProps().asScala.foreach { case (k, v) =>
rsmProps.put(k, v) }
+ rsmProps.put(KafkaConfig.BrokerIdProp, brokerId)
+ remoteLogStorageManager.configure(rsmProps)
+ }
+
+ private[remote] def createRemoteLogMetadataManager():
RemoteLogMetadataManager = {
+ def createDelegate(classLoader: ClassLoader) = {
+ classLoader.loadClass(rlmConfig.remoteLogMetadataManagerClassName())
+ .getDeclaredConstructor()
+ .newInstance()
+ .asInstanceOf[RemoteLogMetadataManager]
+ }
+
+ AccessController.doPrivileged(new
PrivilegedAction[RemoteLogMetadataManager] {
+ private val classPath = rlmConfig.remoteLogMetadataManagerClassPath
+
+ override def run(): RemoteLogMetadataManager = {
+ if (classPath != null && classPath.trim.nonEmpty) {
+ val classLoader = new ChildFirstClassLoader(classPath,
this.getClass.getClassLoader)
+ val delegate = createDelegate(classLoader)
+ new ClassLoaderAwareRemoteLogMetadataManager(delegate, classLoader)
+ } else {
+ createDelegate(this.getClass.getClassLoader)
+ }
+ }
+ })
+ }
+
+ private def configureRLMM(): Unit = {
+ val rlmmProps = new util.HashMap[String, Any]()
+ rlmConfig.remoteLogMetadataManagerProps().asScala.foreach { case (k, v) =>
rlmmProps.put(k, v) }
+ rlmmProps.put(KafkaConfig.BrokerIdProp, brokerId)
+ rlmmProps.put(KafkaConfig.LogDirProp, logDir)
+ remoteLogMetadataManager.configure(rlmmProps)
+ }
+
+ def startup(): Unit = {
+ // Initialize and configure RSM and RLMM. This will start RSM and RLMM
resources, which may need to connect
+ // to the brokers or remote storages.
+ configureRSM()
+ configureRLMM()
+ }
+
+ def storageManager(): RemoteStorageManager = {
+ remoteLogStorageManager
+ }
+
+ /**
+ * Callback to receive any leadership changes for the topic partitions
assigned to this broker. If there are no
+ * existing tasks for a given topic partition then it will assign new leader
or follower task else it will convert the
+ * task to respective target state(leader or follower).
+ *
+ * @param partitionsBecomeLeader partitions that have become leaders on
this broker.
+ * @param partitionsBecomeFollower partitions that have become followers on
this broker.
+ * @param topicIds topic name to topic id mappings.
+ */
+ def onLeadershipChange(partitionsBecomeLeader: Set[Partition],
+ partitionsBecomeFollower: Set[Partition],
+ topicIds: util.Map[String, Uuid]): Unit = {
+ debug(s"Received leadership changes for leaders: $partitionsBecomeLeader
and followers: $partitionsBecomeFollower")
+
+ // Partitions logs are available when this callback is invoked.
+ // Compact topics and internal topics are filtered here as they are not
supported with tiered storage.
+ def filterPartitions(partitions: Set[Partition]): Set[TopicIdPartition] = {
+ // We are not specifically checking for internal topics etc here as
`log.remoteLogEnabled()` already handles that.
+ partitions.filter(partition => partition.log.exists(log =>
log.remoteLogEnabled()))
+ .map(partition => new TopicIdPartition(topicIds.get(partition.topic),
partition.topicPartition))
+ }
+
+ val followerTopicPartitions = filterPartitions(partitionsBecomeFollower)
+ val leaderTopicPartitions = filterPartitions(partitionsBecomeLeader)
+ debug(s"Effective topic partitions after filtering compact and internal
topics, leaders: $leaderTopicPartitions " +
+ s"and followers: $followerTopicPartitions")
+
+ if (leaderTopicPartitions.nonEmpty || followerTopicPartitions.nonEmpty) {
+ leaderTopicPartitions.foreach(x =>
topicPartitionIds.put(x.topicPartition(), x.topicId()))
+ followerTopicPartitions.foreach(x =>
topicPartitionIds.put(x.topicPartition(), x.topicId()))
+
+
remoteLogMetadataManager.onPartitionLeadershipChanges(leaderTopicPartitions.asJava,
followerTopicPartitions.asJava)
+ }
+ }
+
+ /**
+ * Deletes the internal topic partition info if delete flag is set as true.
+ *
+ * @param topicPartition topic partition to be stopped.
+ * @param delete flag to indicate whether the given topic partition is
to be deleted or not.
+ */
+ def stopPartitions(topicPartition: TopicPartition, delete: Boolean): Unit = {
+ if (delete) {
+ // Delete from internal data structures only if it is to be deleted.
+ val topicIdPartition = topicPartitionIds.remove(topicPartition)
+ debug(s"Removed partition: $topicIdPartition from topicPartitionIds")
+ }
+ }
+
+ def fetchRemoteLogSegmentMetadata(topicPartition: TopicPartition,
+ epochForOffset: Int,
+ offset: Long):
Optional[RemoteLogSegmentMetadata] = {
+ val topicId = topicPartitionIds.get(topicPartition)
+
+ if (topicId == null) {
+ throw new KafkaException("No topic id registered for topic partition: "
+ topicPartition)
+ }
+
+ remoteLogMetadataManager.remoteLogSegmentMetadata(new
TopicIdPartition(topicId, topicPartition), epochForOffset, offset)
+ }
+
+ private def lookupTimestamp(rlsMetadata: RemoteLogSegmentMetadata,
timestamp: Long, startingOffset: Long): Option[TimestampAndOffset] = {
+ val startPos = indexCache.lookupTimestamp(rlsMetadata, timestamp,
startingOffset)
+
+ var remoteSegInputStream: InputStream = null
+ try {
+ // Search forward for the position of the last offset that is greater
than or equal to the startingOffset
+ remoteSegInputStream =
remoteLogStorageManager.fetchLogSegment(rlsMetadata, startPos)
+ val remoteLogInputStream = new RemoteLogInputStream(remoteSegInputStream)
+ var batch: RecordBatch = null
+
+ def nextBatch(): RecordBatch = {
+ batch = remoteLogInputStream.nextBatch()
+ batch
+ }
+
+ while (nextBatch() != null) {
+ if (batch.maxTimestamp >= timestamp && batch.lastOffset >=
startingOffset) {
+ batch.iterator.asScala.foreach(record => {
+ if (record.timestamp >= timestamp && record.offset >=
startingOffset)
+ return Some(new TimestampAndOffset(record.timestamp,
record.offset, maybeLeaderEpoch(batch.partitionLeaderEpoch)))
+ })
+ }
+ }
+ None
+ } finally {
+ Utils.closeQuietly(remoteSegInputStream, "RemoteLogSegmentInputStream")
+ }
+ }
+
+ private def maybeLeaderEpoch(leaderEpoch: Int): Optional[Integer] = {
+ if (leaderEpoch == RecordBatch.NO_PARTITION_LEADER_EPOCH)
+ Optional.empty()
+ else
+ Optional.of(leaderEpoch)
+ }
+
+ /**
+ * Search the message offset in the remote storage based on timestamp and
offset.
+ *
+ * This method returns an option of TimestampOffset. The returned value is
determined using the following ordered list of rules:
+ *
+ * - If there are no messages in the remote storage, return None
+ * - If all the messages in the remote storage have smaller offsets, return
None
+ * - If all the messages in the remote storage have smaller timestamps,
return None
+ * - no message in the remote storage has the given timestamp,
Review Comment:
It seems that the "no message in the remote storage has the given timestamp"
case is covered in the otherwise clause too?
##########
core/src/main/scala/kafka/server/KafkaConfig.scala:
##########
@@ -1861,6 +1861,24 @@ class KafkaConfig private(doLog: Boolean, val props:
java.util.Map[_, _], dynami
MetadataVersion.MINIMUM_KRAFT_VERSION
}
+ val offsetForLeaderEpochRequestVersion: Short =
Review Comment:
It seems that both offsetForLeaderEpochRequestVersion and
listOffsetRequestVersion are moved to MetadataVersion and can be removed here?
##########
core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala:
##########
@@ -0,0 +1,229 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.log.remote
+
+import kafka.log.{OffsetIndex, OffsetPosition, TimeIndex, UnifiedLog}
+import kafka.utils.MockTime
+import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid}
+import
org.apache.kafka.server.log.remote.storage.RemoteStorageManager.IndexType
+import org.apache.kafka.server.log.remote.storage.{RemoteLogSegmentId,
RemoteLogSegmentMetadata, RemoteStorageManager}
+import org.apache.kafka.test.TestUtils
+import org.junit.jupiter.api.Assertions._
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+import org.mockito.ArgumentMatchers
+import org.mockito.ArgumentMatchers.any
+import org.mockito.Mockito._
+
+import java.io.{File, FileInputStream}
+import java.nio.file.Files
+import java.util.Collections
+import scala.collection.mutable
+
+class RemoteIndexCacheTest {
+
+ val time = new MockTime()
+ val partition = new TopicPartition("foo", 0)
+ val idPartition = new TopicIdPartition(Uuid.randomUuid(), partition)
+ val logDir: File = TestUtils.tempDirectory("kafka-logs")
+ val tpDir: File = new File(logDir, partition.toString)
+ val brokerId = 1
+ val baseOffset = 45L
+ val lastOffset = 75L
+ val segmentSize = 1024
+
+ val rsm: RemoteStorageManager = mock(classOf[RemoteStorageManager])
+ val cache: RemoteIndexCache = new RemoteIndexCache(remoteStorageManager =
rsm, logDir = logDir.toString)
+ val remoteLogSegmentId = new RemoteLogSegmentId(idPartition,
Uuid.randomUuid())
+ val rlsMetadata: RemoteLogSegmentMetadata = new
RemoteLogSegmentMetadata(remoteLogSegmentId, baseOffset, lastOffset,
+ time.milliseconds(), brokerId, time.milliseconds(), segmentSize,
Collections.singletonMap(0, 0L))
+
+ @BeforeEach
+ def setup(): Unit = {
+ Files.createDirectory(tpDir.toPath)
+ val txnIdxFile = new File(tpDir, "txn-index" +
UnifiedLog.TxnIndexFileSuffix)
+ txnIdxFile.createNewFile()
+ when(rsm.fetchIndex(any(classOf[RemoteLogSegmentMetadata]),
any(classOf[IndexType])))
+ .thenAnswer(ans => {
+ val metadata = ans.getArgument[RemoteLogSegmentMetadata](0)
+ val indexType = ans.getArgument[IndexType](1)
+ val maxEntries = (metadata.endOffset() -
metadata.startOffset()).asInstanceOf[Int]
+ val offsetIdx = new OffsetIndex(new File(tpDir,
String.valueOf(metadata.startOffset()) + UnifiedLog.IndexFileSuffix),
+ metadata.startOffset(), maxIndexSize = maxEntries * 8)
+ val timeIdx = new TimeIndex(new File(tpDir,
String.valueOf(metadata.startOffset()) + UnifiedLog.TimeIndexFileSuffix),
+ metadata.startOffset(), maxIndexSize = maxEntries * 12)
+ maybeAppendIndexEntries(offsetIdx, timeIdx)
+ indexType match {
+ case IndexType.OFFSET => new FileInputStream(offsetIdx.file)
+ case IndexType.TIMESTAMP => new FileInputStream(timeIdx.file)
+ case IndexType.TRANSACTION => new FileInputStream(txnIdxFile)
+ case IndexType.LEADER_EPOCH => // leader-epoch-cache is not accessed.
+ case IndexType.PRODUCER_SNAPSHOT => // producer-snapshot is not
accessed.
+ }
+ })
+ }
+
+ @AfterEach
+ def cleanup(): Unit = {
+ reset(rsm)
+ cache.entries.forEach((_, v) => v.cleanup())
+ cache.close()
+ }
+
+ @Test
+ def testFetchIndexFromRemoteStorage(): Unit = {
+ val offsetIndex = cache.getIndexEntry(rlsMetadata).offsetIndex.get
+ val offsetPosition1 = offsetIndex.entry(1)
+ // this call should have invoked fetchOffsetIndex, fetchTimestampIndex once
+ val resultPosition = cache.lookupOffset(rlsMetadata,
offsetPosition1.offset)
+ assertEquals(offsetPosition1.position, resultPosition)
+ verifyFetchIndexInvocation(count = 1, Seq(IndexType.OFFSET,
IndexType.TIMESTAMP))
+
+ // this should not cause fetching index from RemoteStorageManager as it is
already fetched earlier
+ reset(rsm)
+ val offsetPosition2 = offsetIndex.entry(2)
+ val resultPosition2 = cache.lookupOffset(rlsMetadata,
offsetPosition2.offset)
+ assertEquals(offsetPosition2.position, resultPosition2)
+ assertNotNull(cache.getIndexEntry(rlsMetadata))
+ verifyNoInteractions(rsm)
+ }
+
+ @Test
+ def testPositionForNonExistingIndexFromRemoteStorage(): Unit = {
+ val offsetIndex = cache.getIndexEntry(rlsMetadata).offsetIndex.get
+ val lastOffsetPosition = cache.lookupOffset(rlsMetadata,
offsetIndex.lastOffset)
+ val greaterOffsetThanLastOffset = offsetIndex.lastOffset + 1
+ assertEquals(lastOffsetPosition, cache.lookupOffset(rlsMetadata,
greaterOffsetThanLastOffset))
+
+ // offsetIndex.lookup() returns OffsetPosition(baseOffset, 0) for offsets
smaller than least entry in the offset index.
+ val nonExistentOffsetPosition = OffsetPosition(baseOffset, 0)
+ val lowerOffsetThanBaseOffset = offsetIndex.baseOffset - 1
+ assertEquals(nonExistentOffsetPosition.position,
cache.lookupOffset(rlsMetadata, lowerOffsetThanBaseOffset))
+ }
+
+ @Test
+ def testCacheEntryExpiry(): Unit = {
+ val cache = new RemoteIndexCache(maxSize = 2, rsm, logDir =
logDir.toString)
+ val tpId = new TopicIdPartition(Uuid.randomUuid(), new
TopicPartition("foo", 0))
+ val metadataList = generateRemoteLogSegmentMetadata(size = 3, tpId)
+
+ // getIndex for first time will call rsm#fetchIndex
+ cache.getIndexEntry(metadataList.head)
+ // Calling getIndex on the same entry should not call rsm#fetchIndex
again, but it should retrieve from cache
+ cache.getIndexEntry(metadataList.head)
+ assertEquals(1, cache.entries.size())
+ verifyFetchIndexInvocation(count = 1)
+
+ // Here a new key metadataList(1) is invoked, that should call
rsm#fetchIndex, making the count to 2
+ cache.getIndexEntry(metadataList.head)
+ cache.getIndexEntry(metadataList(1))
+ assertEquals(2, cache.entries.size())
+ verifyFetchIndexInvocation(count = 2)
+
+ // getting index for metadataList.last should call rsm#fetchIndex, but
metadataList(1) is already in cache.
+ cache.getIndexEntry(metadataList.last)
+ cache.getIndexEntry(metadataList(1))
+ assertEquals(2, cache.entries.size())
+
assertTrue(cache.entries.containsKey(metadataList.last.remoteLogSegmentId().id()))
+
assertTrue(cache.entries.containsKey(metadataList(1).remoteLogSegmentId().id()))
+ verifyFetchIndexInvocation(count = 3)
+
+ // getting index for metadataList.head should call rsm#fetchIndex as that
entry was expired earlier,
+ // but metadataList(1) is already in cache.
+ cache.getIndexEntry(metadataList(1))
+ cache.getIndexEntry(metadataList.head)
+ assertEquals(2, cache.entries.size())
+
assertFalse(cache.entries.containsKey(metadataList.last.remoteLogSegmentId().id()))
+ verifyFetchIndexInvocation(count = 4)
+ }
+
+ @Test
+ def testGetIndexAfterCacheClose(): Unit = {
+ val cache = new RemoteIndexCache(maxSize = 2, rsm, logDir =
logDir.toString)
+ val tpId = new TopicIdPartition(Uuid.randomUuid(), new
TopicPartition("foo", 0))
+ val metadataList = generateRemoteLogSegmentMetadata(size = 3, tpId)
+
+ cache.getIndexEntry(metadataList.head)
+ assertEquals(1, cache.entries.size())
+ verifyFetchIndexInvocation(count = 1)
+
+ cache.close()
+
+ // Check IllegalStateException is thrown when index is accessed after it
is closed.
+ assertThrows(classOf[IllegalStateException], () =>
cache.getIndexEntry(metadataList.head))
+ }
+
+ @Test
+ def testReloadCacheAfterClose(): Unit = {
+ val cache = new RemoteIndexCache(maxSize = 2, rsm, logDir =
logDir.toString)
+ val tpId = new TopicIdPartition(Uuid.randomUuid(), new
TopicPartition("foo", 0))
+ val metadataList = generateRemoteLogSegmentMetadata(size = 3, tpId)
+
+ cache.getIndexEntry(metadataList.head)
+ cache.getIndexEntry(metadataList.head)
Review Comment:
Could we add a similar comment as in line 125?
##########
clients/src/main/java/org/apache/kafka/common/record/RemoteLogInputStream.java:
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.record;
+
+import org.apache.kafka.common.errors.CorruptRecordException;
+import org.apache.kafka.common.utils.Utils;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+import static org.apache.kafka.common.record.Records.HEADER_SIZE_UP_TO_MAGIC;
+import static org.apache.kafka.common.record.Records.LOG_OVERHEAD;
+import static org.apache.kafka.common.record.Records.MAGIC_OFFSET;
+import static org.apache.kafka.common.record.Records.SIZE_OFFSET;
+
+public class RemoteLogInputStream implements LogInputStream<RecordBatch> {
+ private final InputStream inputStream;
+ // LogHeader buffer up to magic.
+ private final ByteBuffer logHeaderBuffer =
ByteBuffer.allocate(HEADER_SIZE_UP_TO_MAGIC);
+
+ public RemoteLogInputStream(InputStream inputStream) {
+ this.inputStream = inputStream;
+ }
+
+ @Override
+ public RecordBatch nextBatch() throws IOException {
+ logHeaderBuffer.clear();
+ Utils.readFully(inputStream, logHeaderBuffer);
+
+ if (logHeaderBuffer.position() < HEADER_SIZE_UP_TO_MAGIC)
+ return null;
+
+ logHeaderBuffer.rewind();
+ int size = logHeaderBuffer.getInt(SIZE_OFFSET);
+
+ // V0 has the smallest overhead, stricter checking is done later
+ if (size < LegacyRecord.RECORD_OVERHEAD_V0)
+ throw new CorruptRecordException(String.format("Found record size
%d smaller than minimum record " +
+ "overhead
(%d).", size, LegacyRecord.RECORD_OVERHEAD_V0));
+
+ // 'size' = 4 bytes + magic + sizeOf(batch-records).
Review Comment:
This is still not very precise since size includes other fields like CRC,
attributes, etc. So, we could probably just omit it and say the total size of a
batch is LOG_OVERHEAD + the size of the rest of the content.
##########
core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala:
##########
@@ -52,14 +52,22 @@ class LeaderEpochFileCache(topicPartition: TopicPartition,
* Assigns the supplied Leader Epoch to the supplied Offset
* Once the epoch is assigned it cannot be reassigned
*/
- def assign(epoch: Int, startOffset: Long): Unit = {
+ def assign(epoch: Int, startOffset: Long, flushToFile: Boolean = true): Unit
= {
Review Comment:
Then, could we just remove this param?
##########
core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala:
##########
@@ -741,6 +740,35 @@ class ProducerStateManagerTest {
assertEquals(Set(1), currentSnapshotOffsets)
}
+ @Test
+ def testReloadSnapshots(): Unit = {
+ val epoch = 0.toShort
+ append(stateManager, producerId, epoch, 1, 1L)
+ append(stateManager, producerId, epoch, 2, 2L)
+ stateManager.takeSnapshot()
+ val pathAndDataList = logDir.listFiles().map(file => (file.toPath,
Files.readAllBytes(file.toPath)))
+
+ append(stateManager, producerId, epoch, 3, 3L)
+ append(stateManager, producerId, epoch, 4, 4L)
+ stateManager.takeSnapshot()
+ assertEquals(2, logDir.listFiles().length)
+ assertEquals(Set(3, 5), currentSnapshotOffsets)
+
+ // Truncate to the range (3, 5), this will delete the earlier snapshot
until offset 3.
+ stateManager.truncateAndReload(3, 5, time.milliseconds())
+ assertEquals(1, logDir.listFiles().length)
+ assertEquals(Set(5), currentSnapshotOffsets)
+
+ // Add the snapshot files until offset 3 to the log dir.
+ pathAndDataList.foreach { case (path, data) => Files.write(path, data) }
+ // Cleanup the inmemory snapshots and reload the snapshots from log dir.
Review Comment:
inmemory => in memory
##########
core/src/main/scala/kafka/log/UnifiedLog.scala:
##########
@@ -1280,6 +1309,15 @@ class UnifiedLog(@volatile var logStartOffset: Long,
case _ => Optional.empty[Integer]()
}
Some(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, logStartOffset,
epochOpt))
+ } else if (targetTimestamp ==
ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP) {
+ val offset = _localLogStartOffset
Review Comment:
Change to curlocalLogStartOffset = localLogStartOffset ?
##########
core/src/main/scala/kafka/server/ReplicaFetcherThread.scala:
##########
@@ -192,4 +204,140 @@ class ReplicaFetcherThread(name: String,
partition.truncateFullyAndStartAt(offset, isFuture = false)
}
+ private def buildProducerSnapshotFile(snapshotFile: File,
remoteLogSegmentMetadata: RemoteLogSegmentMetadata, rlm: RemoteLogManager):
Unit = {
+ val tmpSnapshotFile = new File(snapshotFile.getAbsolutePath + ".tmp")
+ // Copy it to snapshot file in atomic manner.
+ Files.copy(rlm.storageManager().fetchIndex(remoteLogSegmentMetadata,
RemoteStorageManager.IndexType.PRODUCER_SNAPSHOT),
+ tmpSnapshotFile.toPath, StandardCopyOption.REPLACE_EXISTING)
+ Utils.atomicMoveWithFallback(tmpSnapshotFile.toPath, snapshotFile.toPath,
false)
+ }
+
+ /**
+ * It tries to build the required state for this partition from leader and
remote storage so that it can start
+ * fetching records from the leader.
+ */
+ override protected def buildRemoteLogAuxState(partition: TopicPartition,
+ currentLeaderEpoch: Int,
+ leaderLocalLogStartOffset:
Long,
+
epochForLeaderLocalLogStartOffset: Int,
+ leaderLogStartOffset: Long):
Long = {
+
+ def fetchEarlierEpochEndOffset(epoch: Int): EpochEndOffset = {
+ val previousEpoch = epoch - 1
+ // Find the end-offset for the epoch earlier to the given epoch from the
leader
+ val partitionsWithEpochs = Map(partition -> new
EpochData().setPartition(partition.partition())
+ .setCurrentLeaderEpoch(currentLeaderEpoch)
+ .setLeaderEpoch(previousEpoch))
+ val maybeEpochEndOffset =
leader.fetchEpochEndOffsets(partitionsWithEpochs).get(partition)
+ if (maybeEpochEndOffset.isEmpty) {
+ throw new KafkaException("No response received for partition: " +
partition);
+ }
+
+ val epochEndOffset = maybeEpochEndOffset.get
+ if (epochEndOffset.errorCode() != Errors.NONE.code()) {
+ throw Errors.forCode(epochEndOffset.errorCode()).exception()
+ }
+
+ epochEndOffset
+ }
+
+ val log = replicaMgr.localLogOrException(partition)
+ val nextOffset = {
+ if (log.remoteStorageSystemEnable &&
log.config.remoteLogConfig.remoteStorageEnable) {
+ if (replicaMgr.remoteLogManager.isEmpty) throw new
IllegalStateException("RemoteLogManager is not yet instantiated")
+
+ val rlm = replicaMgr.remoteLogManager.get
+
+ // Find the respective leader epoch for (leaderLocalLogStartOffset -
1). We need to build the leader epoch cache
+ // until that offset
+ val previousOffsetToLeaderLocalLogStartOffset =
leaderLocalLogStartOffset - 1
+ val targetEpoch: Int = {
+ // If the existing epoch is 0, no need to fetch from earlier epoch
as the desired offset(leaderLogStartOffset - 1)
+ // will have the same epoch.
+ if (epochForLeaderLocalLogStartOffset == 0) {
+ epochForLeaderLocalLogStartOffset
+ } else {
+ // Fetch the earlier epoch/end-offset(exclusive) from the leader.
+ val earlierEpochEndOffset =
fetchEarlierEpochEndOffset(epochForLeaderLocalLogStartOffset)
+ // Check if the target offset lies within the range of the earlier
epoch. Here, the epoch's end-offset is exclusive.
+ if (earlierEpochEndOffset.endOffset >
previousOffsetToLeaderLocalLogStartOffset) {
+ // Always use the leader epoch from returned
earlierEpochEndOffset.
+ // This gives the respective leader epoch, that will handle any
gaps in epochs.
+ // For ex, leader epoch cache contains:
+ // leader-epoch start-offset
+ // 0 20
+ // 1 85
+ // <2> - gap no messages were appended in this leader epoch.
+ // 3 90
+ // 4 98
+ // There is a gap in leader epoch. For leaderLocalLogStartOffset
as 90, leader-epoch is 3.
+ // fetchEarlierEpochEndOffset(2) will return leader-epoch as 1,
end-offset as 90.
+ // So, for offset 89, we should return leader epoch as 1 like
below.
+ earlierEpochEndOffset.leaderEpoch()
+ } else epochForLeaderLocalLogStartOffset
+ }
+ }
+
+ val maybeRlsm = rlm.fetchRemoteLogSegmentMetadata(partition,
targetEpoch, previousOffsetToLeaderLocalLogStartOffset)
+
+ if (maybeRlsm.isPresent) {
+ val remoteLogSegmentMetadata = maybeRlsm.get()
+ // Build leader epoch cache, producer snapshots until
remoteLogSegmentMetadata.endOffset() and start
+ // segments from (remoteLogSegmentMetadata.endOffset() + 1)
+ val nextOffset = remoteLogSegmentMetadata.endOffset() + 1
+
+ // Truncate the existing local log before restoring the leader epoch
cache and producer snapshots.
+ truncateFullyAndStartAt(partition, nextOffset)
+
+ // Build leader epoch cache.
+ log.maybeIncrementLogStartOffset(leaderLogStartOffset,
LeaderOffsetIncremented)
+ val epochs = readLeaderEpochCheckpoint(rlm, remoteLogSegmentMetadata)
+ log.leaderEpochCache.foreach { cache =>
+ cache.assign(epochs)
+ }
+
+ debug(s"Updated the epoch cache from remote tier till offset:
$leaderLocalLogStartOffset " +
+ s"with size: ${epochs.size} for $partition")
+
+ // Restore producer snapshot
+ val snapshotFile = UnifiedLog.producerSnapshotFile(log.dir,
nextOffset)
+ buildProducerSnapshotFile(snapshotFile, remoteLogSegmentMetadata,
rlm)
+
+ // Reload producer snapshots.
+ log.producerStateManager.truncateFullyAndReloadSnapshots()
+ log.loadProducerState(nextOffset)
Review Comment:
Should we add a test that explicitly tests the producer state after handling
OFFSET_MOVED_TO_TIERED_STORAGE error? This can be done in a separate PR.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]