satishd commented on code in PR #13561:
URL: https://github.com/apache/kafka/pull/13561#discussion_r1289577043
##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -696,11 +704,327 @@ public void run() {
}
}
+ public void handleLogStartOffsetUpdate(TopicPartition topicPartition,
long remoteLogStartOffset) {
+ if (isLeader()) {
+ logger.debug("Updating {} with remoteLogStartOffset: {}",
topicPartition, remoteLogStartOffset);
+ updateRemoteLogStartOffset.accept(topicPartition,
remoteLogStartOffset);
+ }
+ }
+
+ class RemoteLogRetentionHandler {
+
+ private final Optional<RetentionSizeData> retentionSizeData;
+ private final Optional<RetentionTimeData> retentionTimeData;
+
+ private long remainingBreachedSize;
+
+ private OptionalLong logStartOffset = OptionalLong.empty();
+
+ public RemoteLogRetentionHandler(Optional<RetentionSizeData>
retentionSizeData, Optional<RetentionTimeData> retentionTimeData) {
+ this.retentionSizeData = retentionSizeData;
+ this.retentionTimeData = retentionTimeData;
+ remainingBreachedSize = retentionSizeData.map(sizeData ->
sizeData.remainingBreachedSize).orElse(0L);
+ }
+
+ private boolean
deleteRetentionSizeBreachedSegments(RemoteLogSegmentMetadata metadata) throws
RemoteStorageException, ExecutionException, InterruptedException {
+ if (!retentionSizeData.isPresent()) {
+ return false;
+ }
+
+ boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x
-> {
+ // Assumption that segments contain size >= 0
+ if (remainingBreachedSize > 0) {
+ long remainingBytes = remainingBreachedSize -
x.segmentSizeInBytes();
+ if (remainingBytes >= 0) {
+ remainingBreachedSize = remainingBytes;
+ return true;
+ }
+ }
+
+ return false;
+ });
+ if (isSegmentDeleted) {
+ logStartOffset = OptionalLong.of(metadata.endOffset() + 1);
+ logger.info("Deleted remote log segment {} due to
retention size {} breach. Log size after deletion will be {}.",
+ metadata.remoteLogSegmentId(),
retentionSizeData.get().retentionSize, remainingBreachedSize +
retentionSizeData.get().retentionSize);
+ }
+ return isSegmentDeleted;
+ }
+
+ public boolean
deleteRetentionTimeBreachedSegments(RemoteLogSegmentMetadata metadata)
+ throws RemoteStorageException, ExecutionException,
InterruptedException {
+ if (!retentionTimeData.isPresent()) {
+ return false;
+ }
+
+ boolean isSegmentDeleted = deleteRemoteLogSegment(metadata,
+ x -> x.maxTimestampMs() <=
retentionTimeData.get().cleanupUntilMs);
+ if (isSegmentDeleted) {
+ remainingBreachedSize = Math.max(0, remainingBreachedSize
- metadata.segmentSizeInBytes());
+ // It is fine to have logStartOffset as
`metadata.endOffset() + 1` as the segment offset intervals
+ // are ascending with in an epoch.
+ logStartOffset = OptionalLong.of(metadata.endOffset() + 1);
+ logger.info("Deleted remote log segment {} due to
retention time {}ms breach based on the largest record timestamp in the
segment",
+ metadata.remoteLogSegmentId(),
retentionTimeData.get().retentionMs);
+ }
+ return isSegmentDeleted;
+ }
+
+ private boolean
deleteLogStartOffsetBreachedSegments(RemoteLogSegmentMetadata metadata, long
startOffset)
+ throws RemoteStorageException, ExecutionException,
InterruptedException {
+ boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x
-> startOffset > x.endOffset());
+ if (isSegmentDeleted && retentionSizeData.isPresent()) {
+ remainingBreachedSize = Math.max(0, remainingBreachedSize
- metadata.segmentSizeInBytes());
+ logger.info("Deleted remote log segment {} due to log
start offset {} breach", metadata.remoteLogSegmentId(), startOffset);
+ }
+
+ return isSegmentDeleted;
+ }
+
+ // It removes the segments beyond the current leader's earliest
epoch. Those segments are considered as
+ // unreferenced because they are not part of the current leader
epoch lineage.
+ private boolean
deleteLogSegmentsDueToLeaderEpochCacheTruncation(EpochEntry earliestEpochEntry,
RemoteLogSegmentMetadata metadata) throws RemoteStorageException,
ExecutionException, InterruptedException {
+ boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x
->
+
x.segmentLeaderEpochs().keySet().stream().allMatch(epoch -> epoch <
earliestEpochEntry.epoch));
+ if (isSegmentDeleted) {
+ logger.info("Deleted remote log segment {} due to leader
epoch cache truncation. Current earliest epoch: {}, segmentEndOffset: {} and
segmentEpochs: {}",
+ metadata.remoteLogSegmentId(), earliestEpochEntry,
metadata.endOffset(), metadata.segmentLeaderEpochs().keySet());
+ }
+
+ // No need to update the log-start-offset as these
epochs/offsets are earlier to that value.
+ return isSegmentDeleted;
+ }
+
+ private boolean deleteRemoteLogSegment(RemoteLogSegmentMetadata
segmentMetadata, Predicate<RemoteLogSegmentMetadata> predicate)
+ throws RemoteStorageException, ExecutionException,
InterruptedException {
+ if (predicate.test(segmentMetadata)) {
+ logger.info("Deleting remote log segment {}",
segmentMetadata.remoteLogSegmentId());
+ // Publish delete segment started event.
+ remoteLogMetadataManager.updateRemoteLogSegmentMetadata(
+ new
RemoteLogSegmentMetadataUpdate(segmentMetadata.remoteLogSegmentId(),
time.milliseconds(),
+ segmentMetadata.customMetadata(),
RemoteLogSegmentState.DELETE_SEGMENT_STARTED, brokerId)).get();
+
+ // Delete the segment in remote storage.
+
remoteLogStorageManager.deleteLogSegmentData(segmentMetadata);
+
+ // Publish delete segment finished event.
+ remoteLogMetadataManager.updateRemoteLogSegmentMetadata(
+ new
RemoteLogSegmentMetadataUpdate(segmentMetadata.remoteLogSegmentId(),
time.milliseconds(),
+ segmentMetadata.customMetadata(),
RemoteLogSegmentState.DELETE_SEGMENT_FINISHED, brokerId)).get();
+ logger.info("Deleted remote log segment {}",
segmentMetadata.remoteLogSegmentId());
+ return true;
+ }
+
+ return false;
+ }
+
+ }
+
+ private void cleanupExpiredRemoteLogSegments() throws
RemoteStorageException, ExecutionException, InterruptedException {
+ if (isCancelled() || !isLeader()) {
+ logger.info("Returning from remote log segments cleanup as the
task state is changed");
+ return;
+ }
+
+ // Cleanup remote log segments and update the log start offset if
applicable.
+ final Iterator<RemoteLogSegmentMetadata> segmentMetadataIter =
remoteLogMetadataManager.listRemoteLogSegments(topicIdPartition);
+ if (!segmentMetadataIter.hasNext()) {
+ logger.debug("No remote log segments available on remote
storage for partition: {}", topicIdPartition);
+ return;
+ }
+
+ final Optional<UnifiedLog> logOptional =
fetchLog.apply(topicIdPartition.topicPartition());
+ if (!logOptional.isPresent()) {
+ logger.debug("No UnifiedLog instance available for partition:
{}", topicIdPartition);
+ return;
+ }
+
+ final UnifiedLog log = logOptional.get();
+ final Option<LeaderEpochFileCache> leaderEpochCacheOption =
log.leaderEpochCache();
+ if (leaderEpochCacheOption.isEmpty()) {
+ logger.debug("No leader epoch cache available for partition:
{}", topicIdPartition);
+ return;
+ }
+
+ final Set<Integer> epochsSet = new HashSet<>();
+ // Good to have an API from RLMM to get all the remote leader
epochs of all the segments of a partition
+ // instead of going through all the segments and building it here.
+ while (segmentMetadataIter.hasNext()) {
+ RemoteLogSegmentMetadata segmentMetadata =
segmentMetadataIter.next();
+
epochsSet.addAll(segmentMetadata.segmentLeaderEpochs().keySet());
+ }
+
+ // All the leader epochs in sorted order that exists in remote
storage
+ final List<Integer> remoteLeaderEpochs = new
ArrayList<>(epochsSet);
+ Collections.sort(remoteLeaderEpochs);
+
+ LeaderEpochFileCache leaderEpochCache =
leaderEpochCacheOption.get();
+ NavigableMap<Integer, Long> epochWithOffsets =
leaderEpochCache.epochWithOffsets();
+ Optional<EpochEntry> earliestEpochEntryOptional =
leaderEpochCache.earliestEntry();
+
+ Optional<RetentionSizeData> retentionSizeData =
buildRetentionSizeData(log.config().retentionSize,
+ log.onlyLocalLogSegmentsSize(), log.logEndOffset(),
epochWithOffsets);
+ Optional<RetentionTimeData> retentionTimeData =
buildRetentionTimeData(log.config().retentionMs);
+
+ RemoteLogRetentionHandler remoteLogRetentionHandler = new
RemoteLogRetentionHandler(retentionSizeData, retentionTimeData);
+ Iterator<Integer> epochIterator =
epochWithOffsets.navigableKeySet().iterator();
+ boolean isSegmentDeleted = true;
+ while (isSegmentDeleted && epochIterator.hasNext()) {
+ Integer epoch = epochIterator.next();
+ Iterator<RemoteLogSegmentMetadata> segmentsIterator =
remoteLogMetadataManager.listRemoteLogSegments(topicIdPartition, epoch);
+ while (isSegmentDeleted && segmentsIterator.hasNext()) {
+ if (isCancelled() || !isLeader()) {
+ logger.info("Returning from remote log segments
cleanup for the remaining segments as the task state is changed.");
+ return;
+ }
+ RemoteLogSegmentMetadata metadata =
segmentsIterator.next();
+
+ // check whether the segment contains the required epoch
range with in the current leader epoch lineage.
+ if (isRemoteSegmentWithinLeaderEpochs(metadata,
log.logEndOffset(), epochWithOffsets)) {
+ isSegmentDeleted =
+
remoteLogRetentionHandler.deleteRetentionTimeBreachedSegments(metadata) ||
+
remoteLogRetentionHandler.deleteRetentionSizeBreachedSegments(metadata) ||
+
remoteLogRetentionHandler.deleteLogStartOffsetBreachedSegments(metadata,
log.logStartOffset());
+ }
+ }
+ }
+
+ // Remove the remote log segments whose segment-leader-epochs are
less than the earliest-epoch known
Review Comment:
This covers scenarios where an unclean leader election happens and the remote
storage contains segments that are earlier than the current leader's
leader-epoch lineage.
For example, suppose the current leader has the following leader-epoch cache:
```
-----------------------------
leader-epoch | start-offset |
-----------------------------
3 700
4 780
6 900
7 990
-----------------------------
```
However, the earlier leader, which was replaced by a broker carrying the
current leader's leader-epoch lineage, had the following leader-epoch cache:
```
-----------------------------
leader-epoch | start-offset |
-----------------------------
0 0
1 180
2 400
-----------------------------
```
The segments written under those earlier epochs had not yet breached retention,
so they were not deleted from remote storage. However, these leader epochs do
not appear in the current leader's leader-epoch cache, because that leader was
chosen through an unclean leader election. In this case, we need to remove the
segments that fall outside the current leader's epoch lineage. Otherwise, they
will never be cleaned up and will keep accumulating in remote storage.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]