showuon commented on code in PR #13561:
URL: https://github.com/apache/kafka/pull/13561#discussion_r1281657244
##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -667,11 +675,323 @@ public void run() {
}
}
+        public void handleLogStartOffsetUpdate(TopicPartition topicPartition, long remoteLogStartOffset) {
+            if (isLeader()) {
+                logger.debug("Updating {} with remoteLogStartOffset: {}", topicPartition, remoteLogStartOffset);
+                updateRemoteLogStartOffset.accept(topicPartition, remoteLogStartOffset);
+            }
+        }
+
+        class RemoteLogRetentionHandler {
+
+            private final Optional<RetentionSizeData> retentionSizeData;
+            private final Optional<RetentionTimeData> retentionTimeData;
+
+            private long remainingBreachedSize;
+
+            private OptionalLong logStartOffset = OptionalLong.empty();
+
+            public RemoteLogRetentionHandler(Optional<RetentionSizeData> retentionSizeData, Optional<RetentionTimeData> retentionTimeData) {
+                this.retentionSizeData = retentionSizeData;
+                this.retentionTimeData = retentionTimeData;
+                remainingBreachedSize = retentionSizeData.map(sizeData -> sizeData.remainingBreachedSize).orElse(0L);
+            }
+
+            private boolean deleteRetentionSizeBreachedSegments(RemoteLogSegmentMetadata metadata) throws RemoteStorageException, ExecutionException, InterruptedException {
+                if (!retentionSizeData.isPresent()) {
+                    return false;
+                }
+
+                boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x -> {
+                    // Assumption that segments contain size >= 0
+                    if (remainingBreachedSize > 0) {
+                        long remainingBytes = remainingBreachedSize - x.segmentSizeInBytes();
+                        if (remainingBytes >= 0) {
+                            remainingBreachedSize = remainingBytes;
+                            return true;
+                        }
+                    }
+
+                    return false;
+                });
+                if (isSegmentDeleted) {
+                    logStartOffset = OptionalLong.of(metadata.endOffset() + 1);
+                    logger.info("Deleted remote log segment {} due to retention size {} breach. Log size after deletion will be {}.",
+                            metadata.remoteLogSegmentId(), retentionSizeData.get().retentionSize, remainingBreachedSize + retentionSizeData.get().retentionSize);
+                }
+                return isSegmentDeleted;
+            }
+
+            public boolean deleteRetentionTimeBreachedSegments(RemoteLogSegmentMetadata metadata)
+                    throws RemoteStorageException, ExecutionException, InterruptedException {
+                if (!retentionTimeData.isPresent()) {
+                    return false;
+                }
+
+                boolean isSegmentDeleted = deleteRemoteLogSegment(metadata,
+                        x -> x.maxTimestampMs() <= retentionTimeData.get().cleanupUntilMs);
+                if (isSegmentDeleted) {
+                    remainingBreachedSize = Math.max(0, remainingBreachedSize - metadata.segmentSizeInBytes());
+                    // It is fine to have logStartOffset as `metadata.endOffset() + 1` as the segment offset intervals
+                    // are ascending with in an epoch.
+                    logStartOffset = OptionalLong.of(metadata.endOffset() + 1);
+                    logger.info("Deleted remote log segment {} due to retention time {}ms breach based on the largest record timestamp in the segment",
+                            metadata.remoteLogSegmentId(), retentionTimeData.get().retentionMs);
+                }
+                return isSegmentDeleted;
+            }
+
+            private boolean deleteLogStartOffsetBreachedSegments(RemoteLogSegmentMetadata metadata, long startOffset)
+                    throws RemoteStorageException, ExecutionException, InterruptedException {
+                boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x -> startOffset > x.endOffset());
+                if (isSegmentDeleted && retentionSizeData.isPresent()) {
+                    remainingBreachedSize = Math.max(0, remainingBreachedSize - metadata.segmentSizeInBytes());
+                    logger.info("Deleted remote log segment {} due to log start offset {} breach", metadata.remoteLogSegmentId(), startOffset);
+                }
+
+                // No need to update the logStartOffset.
+                return isSegmentDeleted;
+            }
+
+            // It removes the segments beyond the current leader's earliest epoch. Those segments are considered as
+            // unreferenced because they are not part of the current leader epoch lineage.
+            private boolean deleteLogSegmentsDueToLeaderEpochCacheTruncation(EpochEntry earliestEpochEntry, RemoteLogSegmentMetadata metadata) throws RemoteStorageException, ExecutionException, InterruptedException {
+                boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x ->
+                        x.segmentLeaderEpochs().keySet().stream().allMatch(epoch -> epoch < earliestEpochEntry.epoch));
+                if (isSegmentDeleted) {
+                    logger.info("Deleted remote log segment {} due to leader epoch cache truncation. Current earliest epoch: {}, segmentEndOffset: {} and segmentEpochs: {}",
+                            metadata.remoteLogSegmentId(), earliestEpochEntry, metadata.endOffset(), metadata.segmentLeaderEpochs().keySet());
+                }
+
+                // No need to update the log-start-offset as these epochs/offsets are earlier to that value.
+                return isSegmentDeleted;
+            }
+
+            private boolean deleteRemoteLogSegment(RemoteLogSegmentMetadata segmentMetadata, Predicate<RemoteLogSegmentMetadata> predicate)
+                    throws RemoteStorageException, ExecutionException, InterruptedException {
+                if (predicate.test(segmentMetadata)) {
+                    // Publish delete segment started event.
+                    remoteLogMetadataManager.updateRemoteLogSegmentMetadata(
+                            new RemoteLogSegmentMetadataUpdate(segmentMetadata.remoteLogSegmentId(), time.milliseconds(),
+                                    RemoteLogSegmentState.DELETE_SEGMENT_STARTED, brokerId)).get();
+
+                    // Delete the segment in remote storage.
+                    remoteLogStorageManager.deleteLogSegmentData(segmentMetadata);
+
+                    // Publish delete segment finished event.
+                    remoteLogMetadataManager.updateRemoteLogSegmentMetadata(
+                            new RemoteLogSegmentMetadataUpdate(segmentMetadata.remoteLogSegmentId(), time.milliseconds(),
+                                    RemoteLogSegmentState.DELETE_SEGMENT_FINISHED, brokerId)).get();
+                    return true;
+                }
+
+                return false;
+            }
+
+        }
+
+        private void cleanupExpiredRemoteLogSegments() throws RemoteStorageException, ExecutionException, InterruptedException {
+            if (isCancelled() || !isLeader()) {
+                logger.info("Returning from remote log segments cleanup as the task state is changed");
+                return;
+            }
+
+            // Cleanup remote log segments and update the log start offset if applicable.
+            final Iterator<RemoteLogSegmentMetadata> segmentMetadataIter = remoteLogMetadataManager.listRemoteLogSegments(topicIdPartition);
+            if (!segmentMetadataIter.hasNext()) {
+                logger.debug("No remote log segments available on remote storage for partition: {}", topicIdPartition);
+                return;
+            }
+
+            final Optional<UnifiedLog> logOptional = fetchLog.apply(topicIdPartition.topicPartition());
+            if (!logOptional.isPresent()) {
+                logger.debug("No UnifiedLog instance available for partition: {}", topicIdPartition);
+                return;
+            }
+
+            final UnifiedLog log = logOptional.get();
+            final Option<LeaderEpochFileCache> leaderEpochCacheOption = log.leaderEpochCache();
+            if (leaderEpochCacheOption.isEmpty()) {
+                logger.debug("No leader epoch cache available for partition: {}", topicIdPartition);
+                return;
+            }
+
+            final Set<Integer> epochsSet = new HashSet<>();
+            while (segmentMetadataIter.hasNext()) {
+                RemoteLogSegmentMetadata segmentMetadata = segmentMetadataIter.next();
+                epochsSet.addAll(segmentMetadata.segmentLeaderEpochs().keySet());
+            }
+
+            // All the leader epochs in sorted order that exists in remote storage
+            final List<Integer> remoteLeaderEpochs = new ArrayList<>(epochsSet);
+            Collections.sort(remoteLeaderEpochs);
+
+            LeaderEpochFileCache leaderEpochCache = leaderEpochCacheOption.get();
+            NavigableMap<Integer, Long> epochWithOffsets = leaderEpochCache.epochWithOffsets();
+            Optional<EpochEntry> earliestEpochEntryOptional = leaderEpochCache.earliestEntry();
+
+            Optional<RetentionSizeData> retentionSizeData = buildRetentionSizeData(log.config().retentionSize,
+                    log.onlyLocalLogSegmentsSize(), log.logEndOffset(), epochWithOffsets);
+            Optional<RetentionTimeData> retentionTimeData = buildRetentionTimeData(log.config().retentionMs);
+
+            RemoteLogRetentionHandler remoteLogRetentionHandler = new RemoteLogRetentionHandler(retentionSizeData, retentionTimeData);
+            Iterator<Integer> epochIterator = epochWithOffsets.navigableKeySet().iterator();
+            boolean isSegmentDeleted = true;
+            while (isSegmentDeleted && epochIterator.hasNext()) {
+                Integer epoch = epochIterator.next();
+                Iterator<RemoteLogSegmentMetadata> segmentsIterator = remoteLogMetadataManager.listRemoteLogSegments(topicIdPartition, epoch);
+                while (isSegmentDeleted && segmentsIterator.hasNext()) {
+                    if (isCancelled() || !isLeader()) {
+                        logger.info("Returning from remote log segments cleanup for the remaining segments as the task state is changed.");
+                        return;
+                    }
+                    RemoteLogSegmentMetadata metadata = segmentsIterator.next();
+
+                    // check whether the segment contains the required epoch range with in the current leader epoch lineage.
+                    if (isRemoteSegmentWithinLeaderEpochs(metadata, log.logEndOffset(), epochWithOffsets)) {
+                        isSegmentDeleted =
+                                remoteLogRetentionHandler.deleteRetentionTimeBreachedSegments(metadata) ||
+                                remoteLogRetentionHandler.deleteRetentionSizeBreachedSegments(metadata) ||
+                                remoteLogRetentionHandler.deleteLogStartOffsetBreachedSegments(metadata, log.logStartOffset());
+                    }
+                }
+            }
+
+            // Remove the remote log segments whose segment-leader-epochs are lesser than the earliest-epoch known
+            // to the leader. This will remove the unreferenced segments in the remote storage.
+            if (earliestEpochEntryOptional.isPresent()) {
+                EpochEntry earliestEpochEntry = earliestEpochEntryOptional.get();
+                Iterator<Integer> epochsToClean = remoteLeaderEpochs.stream().filter(x -> x < earliestEpochEntry.epoch).iterator();
+                while (epochsToClean.hasNext()) {
+                    int epoch = epochsToClean.next();
+                    Iterator<RemoteLogSegmentMetadata> segmentsToBeCleaned = remoteLogMetadataManager.listRemoteLogSegments(topicIdPartition, epoch);
+                    while (segmentsToBeCleaned.hasNext()) {
+                        if (isCancelled() || !isLeader()) {
+                            return;
+                        }
+                        // No need to update the log-start-offset even though the segment is deleted as these epochs/offsets are earlier to that value.
+                        remoteLogRetentionHandler.deleteLogSegmentsDueToLeaderEpochCacheTruncation(earliestEpochEntry, segmentsToBeCleaned.next());
+                    }
+                }
+            }
+
+            // Update log start offset with the computed value after retention cleanup is done
+            remoteLogRetentionHandler.logStartOffset.ifPresent(offset -> handleLogStartOffsetUpdate(topicIdPartition.topicPartition(), offset));
+        }
+
+        private Optional<RetentionTimeData> buildRetentionTimeData(long retentionMs) {
+            return retentionMs > -1
+                    ? Optional.of(new RetentionTimeData(retentionMs, time.milliseconds() - retentionMs))
+                    : Optional.empty();
+        }
+
+        private Optional<RetentionSizeData> buildRetentionSizeData(long retentionSize,
+                                                                   long onlyLocalLogSegmentsSize,
+                                                                   long logEndOffset,
+                                                                   NavigableMap<Integer, Long> epochEntries) throws RemoteStorageException {
+            if (retentionSize > -1) {
+                long remoteLogSizeBytes = 0L;
+                for (Integer epoch : epochEntries.navigableKeySet()) {
+                    // remoteLogSize(topicIdPartition, epochEntry.epoch) may not be completely accurate as the remote
+                    // log size may be computed for all the segments but not for segments with in the current
+                    // partition's leader epoch lineage. Better to revisit this API.
+                    // remoteLogSizeBytes += remoteLogMetadataManager.remoteLogSize(topicIdPartition, epochEntry.epoch);
+                    Iterator<RemoteLogSegmentMetadata> segmentsIterator = remoteLogMetadataManager.listRemoteLogSegments(topicIdPartition, epoch);
+                    while (segmentsIterator.hasNext()) {
+                        RemoteLogSegmentMetadata segmentMetadata = segmentsIterator.next();
+                        if (isRemoteSegmentWithinLeaderEpochs(segmentMetadata, logEndOffset, epochEntries)) {
+                            remoteLogSizeBytes += segmentMetadata.segmentSizeInBytes();
+                        }
+                    }
+                }
+
+                // This is the total size of segments in local log that have their base-offset > local-log-start-offset
+                // and size of the segments in remote storage which have their end-offset < local-log-start-offset.
+                long totalSize = onlyLocalLogSegmentsSize + remoteLogSizeBytes;
+                if (totalSize > retentionSize) {
+                    long remainingBreachedSize = totalSize - retentionSize;
+                    RetentionSizeData retentionSizeData = new RetentionSizeData(retentionSize, remainingBreachedSize);
+                    return Optional.of(retentionSizeData);
+                }
+            }
+
+            return Optional.empty();
+        }
+
+        public String toString() {
             return this.getClass().toString() + "[" + topicIdPartition + "]";
         }
     }
+    /**
+     * Returns true if the remote segment's epoch/offsets are within the leader epoch lineage of the partition.
+     * The constraints here are as follows:
+     * - The segment's first epoch's offset should be more than or equal to the respective leader epoch's offset in the partition leader epoch lineage.
+     * - The segment's end offset should be less than or equal to the respective leader epoch's offset in the partition leader epoch lineage.
+     * - The segment's epoch lineage(epoch and offset) should be same as leader epoch lineage((epoch and offset)) except
+     *   for the first and the last epochs in the segment.
+     *
+     * @param segmentMetadata The remote segment metadata to be validated.
+     * @param logEndOffset The log end offset of the partition.
+     * @param leaderEpochs The leader epoch lineage of the partition.
+     * @return true if the remote segment's epoch/offsets are within the leader epoch lineage of the partition.
+     */
+    public static boolean isRemoteSegmentWithinLeaderEpochs(RemoteLogSegmentMetadata segmentMetadata,
+                                                            long logEndOffset,
+                                                            NavigableMap<Integer, Long> leaderEpochs) {
+        long segmentEndOffset = segmentMetadata.endOffset();
+        NavigableMap<Integer, Long> segmentLeaderEpochs = segmentMetadata.segmentLeaderEpochs();
+        // Check for out of bound epochs between segment epochs and current leader epochs.
+        Integer segmentFirstEpoch = segmentLeaderEpochs.firstKey();
+        Integer segmentLastEpoch = segmentLeaderEpochs.lastKey();
+        if (segmentFirstEpoch < leaderEpochs.firstKey() || segmentLastEpoch > leaderEpochs.lastKey()) {
+            LOGGER.debug("Remote segment {} is not within the partition leader epoch lineage. Remote segment epochs: {} and partition leader epochs: {}",
Review Comment:
Should we also log the partition info here, like the debug calls below do?
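Something like the following is what I have in mind, reusing the `[{}]` partition prefix that the later debug calls already use (just a sketch, not tested):

    // sketch: carry the partition in the first debug call as well
    LOGGER.debug("[{}] Remote segment {} is not within the partition leader epoch lineage. Remote segment epochs: {} and partition leader epochs: {}",
            segmentMetadata.topicIdPartition(), segmentMetadata.remoteLogSegmentId(), segmentLeaderEpochs, leaderEpochs);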
##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -667,11 +675,323 @@ public void run() {
}
}
+        RemoteLogRetentionHandler remoteLogRetentionHandler = new RemoteLogRetentionHandler(retentionSizeData, retentionTimeData);
+        Iterator<Integer> epochIterator = epochWithOffsets.navigableKeySet().iterator();
+        boolean isSegmentDeleted = true;
+        while (isSegmentDeleted && epochIterator.hasNext()) {
+            Integer epoch = epochIterator.next();
+            Iterator<RemoteLogSegmentMetadata> segmentsIterator = remoteLogMetadataManager.listRemoteLogSegments(topicIdPartition, epoch);
+            while (isSegmentDeleted && segmentsIterator.hasNext()) {
+                if (isCancelled() || !isLeader()) {
+                    logger.info("Returning from remote log segments cleanup for the remaining segments as the task state is changed.");
Review Comment:
As commented above, there is a chance that the leadership changes during segment deletion. I think we should update the log start offset before exiting the `cleanupExpiredRemoteLogSegments` method; if no deletion happened, the `logStartOffset` will be empty anyway. WDYT?
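For example, something along these lines (just a sketch against the quoted method; it assumes it is fine to apply the partially computed offset on the early-return path, and it reuses the existing `logStartOffset` / `handleLogStartOffsetUpdate` plumbing):

    if (isCancelled() || !isLeader()) {
        logger.info("Returning from remote log segments cleanup for the remaining segments as the task state is changed.");
        // apply whatever log start offset was already computed for the segments deleted so far
        remoteLogRetentionHandler.logStartOffset.ifPresent(offset ->
                handleLogStartOffsetUpdate(topicIdPartition.topicPartition(), offset));
        return;
    }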
##########
core/src/main/scala/kafka/log/UnifiedLog.scala:
##########
@@ -979,13 +1010,19 @@ class UnifiedLog(@volatile var logStartOffset: Long,
     localLog.checkIfMemoryMappedBufferClosed()
     if (newLogStartOffset > logStartOffset) {
-      updatedLogStartOffset = true
-      updateLogStartOffset(newLogStartOffset)
-      _localLogStartOffset = newLogStartOffset
-      info(s"Incremented log start offset to $newLogStartOffset due to $reason")
-      leaderEpochCache.foreach(_.truncateFromStart(logStartOffset))
-      producerStateManager.onLogStartOffsetIncremented(newLogStartOffset)
-      maybeIncrementFirstUnstableOffset()
+      _localLogStartOffset = math.max(newLogStartOffset, localLogStartOffset())
+
+      // it should always get updated if tiered-storage is not enabled.
+      if (!onlyLocalLogStartOffsetUpdate || !remoteLogEnabled()) {
+        updatedLogStartOffset = true
+        updateLogStartOffset(newLogStartOffset)
+        info(s"Incremented log start offset to $newLogStartOffset due to $reason")
+        leaderEpochCache.foreach(_.truncateFromStart(logStartOffset))
+        producerStateManager.onLogStartOffsetIncremented(newLogStartOffset)
+        maybeIncrementFirstUnstableOffset()
+      } else {
+        info(s"Incrementing local log start offset to ${localLogStartOffset()}")
Review Comment:
Why don't we log `reason` here?
##########
core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java:
##########
@@ -823,6 +826,108 @@ public RemoteLogMetadataManager createRemoteLogMetadataManager() {
}
}
+    private static RemoteLogSegmentMetadata createRemoteLogSegmentMetadata(long startOffset, long endOffset, Map<Integer, Long> segmentEpochs) {
+ return new RemoteLogSegmentMetadata(
+ new RemoteLogSegmentId(new TopicIdPartition(Uuid.randomUuid(),
+ new TopicPartition("topic", 0)), Uuid.randomUuid()),
+ startOffset, endOffset,
+ 100000L,
+ 1,
+ 100000L,
+ 1000,
+ RemoteLogSegmentState.COPY_SEGMENT_FINISHED, segmentEpochs);
+ }
+
+ @Test
+ public void testRemoteSegmentWithinLeaderEpochs() {
Review Comment:
Nice test!
##########
core/src/main/scala/kafka/log/UnifiedLog.scala:
##########
@@ -2207,7 +2267,7 @@ case class RetentionSizeBreach(log: UnifiedLog) extends SegmentDeletionReason {
     var size = log.size
     toDelete.foreach { segment =>
       size -= segment.size
-      log.info(s"Deleting segment $segment due to retention size ${log.config.retentionSize} breach. Log size " +
+      log.info(s"Deleting segment $segment due to local log retention size ${UnifiedLog.localRetentionSize(log.config)} breach. Local log size " +
Review Comment:
nit: It's weird to see `local log retention size` when the user has not enabled tiered storage. Could we add an if check for whether remote storage is enabled and print the log message accordingly?
##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -667,11 +675,323 @@ public void run() {
}
}
+    /**
+     * Returns true if the remote segment's epoch/offsets are within the leader epoch lineage of the partition.
+     * The constraints here are as follows:
+     * - The segment's first epoch's offset should be more than or equal to the respective leader epoch's offset in the partition leader epoch lineage.
+     * - The segment's end offset should be less than or equal to the respective leader epoch's offset in the partition leader epoch lineage.
+     * - The segment's epoch lineage(epoch and offset) should be same as leader epoch lineage((epoch and offset)) except
+     *   for the first and the last epochs in the segment.
+     *
+     * @param segmentMetadata The remote segment metadata to be validated.
+     * @param logEndOffset The log end offset of the partition.
+     * @param leaderEpochs The leader epoch lineage of the partition.
+     * @return true if the remote segment's epoch/offsets are within the leader epoch lineage of the partition.
+     */
+    public static boolean isRemoteSegmentWithinLeaderEpochs(RemoteLogSegmentMetadata segmentMetadata,
+                                                            long logEndOffset,
+                                                            NavigableMap<Integer, Long> leaderEpochs) {
+        long segmentEndOffset = segmentMetadata.endOffset();
+        NavigableMap<Integer, Long> segmentLeaderEpochs = segmentMetadata.segmentLeaderEpochs();
+        // Check for out of bound epochs between segment epochs and current leader epochs.
+        Integer segmentFirstEpoch = segmentLeaderEpochs.firstKey();
+        Integer segmentLastEpoch = segmentLeaderEpochs.lastKey();
+        if (segmentFirstEpoch < leaderEpochs.firstKey() || segmentLastEpoch > leaderEpochs.lastKey()) {
+            LOGGER.debug("Remote segment {} is not within the partition leader epoch lineage. Remote segment epochs: {} and partition leader epochs: {}",
+                    segmentMetadata.remoteLogSegmentId(), segmentLeaderEpochs, leaderEpochs);
+            return false;
+        }
+
+        for (Map.Entry<Integer, Long> entry : segmentLeaderEpochs.entrySet()) {
+            int epoch = entry.getKey();
+            long offset = entry.getValue();
+
+            // If segment's epoch does not exist in the leader epoch lineage then it is not a valid segment.
+            if (!leaderEpochs.containsKey(epoch)) {
+                LOGGER.debug("[{}] Remote segment {}'s epoch {} is not within the leader epoch lineage. Remote segment epochs: {} and partition leader epochs: {}",
+                        segmentMetadata.topicIdPartition(), segmentMetadata.remoteLogSegmentId(), epoch, segmentLeaderEpochs, leaderEpochs);
+                return false;
+            }
+
+            // Segment's first epoch's offset can be more than or equal to the respective leader epoch's offset.
+            if (epoch == segmentFirstEpoch && offset < leaderEpochs.get(epoch)) {
+                LOGGER.debug("[{}] Remote segment {}'s first epoch {}'s offset is more than leader epoch's offset {}.",
+                        segmentMetadata.topicIdPartition(), segmentMetadata.remoteLogSegmentId(), epoch, leaderEpochs.get(epoch));
Review Comment:
The comment is not clear:
`// Segment's first epoch's offset [should] be more than or equal to the respective leader epoch's offset.`
The log message is not correct:
`"[{}] Remote segment {}'s first epoch {}'s offset is [less] than leader epoch's offset {}.",`
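Concretely, the two lines could read something like this (sketch only; the trailing `return false;` is assumed from the pattern of the preceding check, since the quoted hunk is cut off here):

    // Segment's first epoch's offset should be more than or equal to the respective leader epoch's offset.
    if (epoch == segmentFirstEpoch && offset < leaderEpochs.get(epoch)) {
        LOGGER.debug("[{}] Remote segment {}'s first epoch {}'s offset is less than leader epoch's offset {}.",
                segmentMetadata.topicIdPartition(), segmentMetadata.remoteLogSegmentId(), epoch, leaderEpochs.get(epoch));
        return false;
    }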
##########
core/src/main/scala/kafka/log/UnifiedLog.scala:
##########
@@ -152,16 +152,42 @@ class UnifiedLog(@volatile var logStartOffset: Long,
locally {
initializePartitionMetadata()
updateLogStartOffset(logStartOffset)
+    updateLocalLogStartOffset(math.max(logStartOffset, localLog.segments.firstSegmentBaseOffset.getOrElse(0L)))
+    if (!remoteLogEnabled())
+      logStartOffset = localLogStartOffset()
     maybeIncrementFirstUnstableOffset()
     initializeTopicId()
     logOffsetsListener.onHighWatermarkUpdated(highWatermarkMetadata.messageOffset)
+
+    info(s"Completed load of log with ${localLog.segments.numberOfSegments} segments, local log start offset ${localLogStartOffset()} and " +
+      s"log end offset $logEndOffset")
}
def setLogOffsetsListener(listener: LogOffsetsListener): Unit = {
logOffsetsListener = listener
}
+ private def updateLocalLogStartOffset(offset: Long): Unit = {
Review Comment:
+1
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.