jeqo commented on code in PR #13561:
URL: https://github.com/apache/kafka/pull/13561#discussion_r1247799637
##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -152,19 +156,23 @@ public class RemoteLogManager implements Closeable {
     * @param time Time instance.
     * @param clusterId The cluster id.
     * @param fetchLog function to get UnifiedLog instance for a given topic.
+    * @param updateRemoteLogStartOffset function to update the log-start-offset for a given topic partition.
     */
    public RemoteLogManager(RemoteLogManagerConfig rlmConfig,
                            int brokerId,
                            String logDir,
                            String clusterId,
                            Time time,
-                           Function<TopicPartition, Optional<UnifiedLog>> fetchLog) {
+                           Function<TopicPartition, Optional<UnifiedLog>> fetchLog,
+                           BiConsumer<TopicPartition, Long> updateRemoteLogStartOffset) {
Review Comment:
nit:
```suggestion
                            BiConsumer<TopicPartition, Long> updateLogStartOffsetFromRemoteTier) {
```
##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -634,6 +642,241 @@ public void run() {
}
}
+    public void handleLogStartOffsetUpdate(TopicPartition topicPartition, long remoteLogStartOffset) {
+        if (isLeader()) {
+            logger.debug("Updating {} with remoteLogStartOffset: {}", topicPartition, remoteLogStartOffset);
+            updateRemoteLogStartOffset.accept(topicPartition, remoteLogStartOffset);
+        }
+    }
+
+    class RemoteLogRetentionHandler {
+
+        private final Optional<RetentionSizeData> retentionSizeData;
+        private final Optional<RetentionTimeData> retentionTimeData;
+
+        private long remainingBreachedSize;
+
+        private OptionalLong logStartOffset = OptionalLong.empty();
+
+        public RemoteLogRetentionHandler(Optional<RetentionSizeData> retentionSizeData, Optional<RetentionTimeData> retentionTimeData) {
+            this.retentionSizeData = retentionSizeData;
+            this.retentionTimeData = retentionTimeData;
+            remainingBreachedSize = retentionSizeData.map(sizeData -> sizeData.remainingBreachedSize).orElse(0L);
+        }
+
+        private boolean deleteRetentionSizeBreachedSegments(RemoteLogSegmentMetadata metadata)
+                throws RemoteStorageException, ExecutionException, InterruptedException {
+            if (!retentionSizeData.isPresent()) {
+                return false;
+            }
+
+            boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x -> {
+                // Assumption that segments contain size >= 0
+                if (remainingBreachedSize > 0) {
+                    long remainingBytes = remainingBreachedSize - x.segmentSizeInBytes();
+                    if (remainingBytes >= 0) {
+                        remainingBreachedSize = remainingBytes;
+                        return true;
+                    }
+                }
+
+                return false;
+            });
+            if (isSegmentDeleted) {
+                logStartOffset = OptionalLong.of(metadata.endOffset() + 1);
+                logger.info("Deleted remote log segment {} due to retention size {} breach. Log size after deletion will be {}.",
+                        metadata.remoteLogSegmentId(), retentionSizeData.get().retentionSize, remainingBreachedSize + retentionSizeData.get().retentionSize);
+            }
+            return isSegmentDeleted;
+        }
+
+        public boolean deleteRetentionTimeBreachedSegments(RemoteLogSegmentMetadata metadata)
+                throws RemoteStorageException, ExecutionException, InterruptedException {
+            if (!retentionTimeData.isPresent()) {
+                return false;
+            }
+
+            boolean isSegmentDeleted = deleteRemoteLogSegment(metadata,
+                    x -> x.maxTimestampMs() <= retentionTimeData.get().cleanupUntilMs);
+            if (isSegmentDeleted) {
+                remainingBreachedSize = Math.max(0, remainingBreachedSize - metadata.segmentSizeInBytes());
+                // It is fine to have logStartOffset as `metadata.endOffset() + 1` as the segment offset intervals
+                // are ascending within an epoch.
+                logStartOffset = OptionalLong.of(metadata.endOffset() + 1);
+                logger.info("Deleted remote log segment {} due to retention time {}ms breach based on the largest record timestamp in the segment",
+                        metadata.remoteLogSegmentId(), retentionTimeData.get().retentionMs);
+            }
+            return isSegmentDeleted;
+        }
+
+        private boolean deleteLogStartOffsetBreachedSegments(RemoteLogSegmentMetadata metadata, long startOffset)
+                throws RemoteStorageException, ExecutionException, InterruptedException {
+            boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x -> startOffset > x.endOffset());
+            if (isSegmentDeleted && retentionSizeData.isPresent()) {
+                remainingBreachedSize = Math.max(0, remainingBreachedSize - metadata.segmentSizeInBytes());
+                logger.info("Deleted remote log segment {} due to log start offset {} breach", metadata.remoteLogSegmentId(), startOffset);
+            }
+
+            // No need to update the logStartOffset.
+            return isSegmentDeleted;
+        }
+
+        // Removes segments beyond the current leader's earliest epoch. Those segments are considered
+        // unreferenced because they are not part of the current leader epoch lineage.
+        private boolean deleteLogSegmentsDueToLeaderEpochCacheTruncation(EpochEntry earliestEpochEntry, RemoteLogSegmentMetadata metadata)
+                throws RemoteStorageException, ExecutionException, InterruptedException {
+            boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x ->
+                    x.segmentLeaderEpochs().keySet().stream().allMatch(epoch -> epoch < earliestEpochEntry.epoch));
+            if (isSegmentDeleted) {
+                logger.info("Deleted remote log segment {} due to leader epoch cache truncation. Current earliest epoch: {}, segmentEndOffset: {} and segmentEpochs: {}",
+                        metadata.remoteLogSegmentId(), earliestEpochEntry, metadata.endOffset(), metadata.segmentLeaderEpochs().keySet());
+            }
+
+            // No need to update the log-start-offset as these epochs/offsets are earlier than that value.
+            return isSegmentDeleted;
+        }
+
+        private boolean deleteRemoteLogSegment(RemoteLogSegmentMetadata segmentMetadata, Predicate<RemoteLogSegmentMetadata> predicate)
+                throws RemoteStorageException, ExecutionException, InterruptedException {
+            if (predicate.test(segmentMetadata)) {
+                // Publish delete segment started event.
+                remoteLogMetadataManager.updateRemoteLogSegmentMetadata(
+                        new RemoteLogSegmentMetadataUpdate(segmentMetadata.remoteLogSegmentId(), time.milliseconds(),
+                                RemoteLogSegmentState.DELETE_SEGMENT_STARTED, brokerId)).get();
+
+                // Delete the segment in remote storage.
+                remoteLogStorageManager.deleteLogSegmentData(segmentMetadata);
+
+                // Publish delete segment finished event.
+                remoteLogMetadataManager.updateRemoteLogSegmentMetadata(
+                        new RemoteLogSegmentMetadataUpdate(segmentMetadata.remoteLogSegmentId(), time.milliseconds(),
+                                RemoteLogSegmentState.DELETE_SEGMENT_FINISHED, brokerId)).get();
+                return true;
+            }
+
+            return false;
+        }
+
+    }
+
+    private void cleanupExpiredRemoteLogSegments() throws RemoteStorageException, ExecutionException, InterruptedException {
+        if (isCancelled() || !isLeader()) {
+            logger.info("Returning from remote log segments cleanup as the task state is changed");
+            return;
+        }
+
+        // Cleanup remote log segments and update the log start offset if applicable.
+        final Iterator<RemoteLogSegmentMetadata> segmentMetadataIter = remoteLogMetadataManager.listRemoteLogSegments(topicIdPartition);
+        if (!segmentMetadataIter.hasNext()) {
+            logger.debug("No remote log segments available on remote storage for partition: {}", topicIdPartition);
+            return;
+        }
+
+        final Optional<UnifiedLog> logOptional = fetchLog.apply(topicIdPartition.topicPartition());
+        if (!logOptional.isPresent()) {
+            logger.debug("No UnifiedLog instance available for partition: {}", topicIdPartition);
+            return;
+        }
+
+        final UnifiedLog log = logOptional.get();
+        final Option<LeaderEpochFileCache> leaderEpochCacheOption = log.leaderEpochCache();
+        if (leaderEpochCacheOption.isEmpty()) {
+            return;
Review Comment:
Maybe worth adding a log message here stating why cleanup is not happening? Or maybe just a comment explaining why this scenario may never happen, given the low probability that recordVersion < 2 is used.
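A minimal sketch of what that could look like (the log level and message wording are just a suggestion, and the recordVersion rationale is my reading of why the cache may be absent):
```java
if (leaderEpochCacheOption.isEmpty()) {
    // The leader epoch cache is only absent for logs on a record version older than V2,
    // which should be unlikely for topics with remote storage enabled.
    logger.debug("No leader epoch cache available for partition {}, skipping remote log segment cleanup",
            topicIdPartition);
    return;
}
```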
##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -581,11 +588,18 @@ public void run() {
if (isLeader()) {
// Copy log segments to remote storage
copyLogSegmentsToRemote();
+ // Cleanup/delete expired remote log segments
+ cleanupExpiredRemoteLogSegments();
Review Comment:
@satishd, could you elaborate a bit more on what you mean by
> Respective garbage collectors in those storages will take care of deleting the data asynchronously.

Is this relying on some specific storage backend implementation?
##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -618,6 +625,230 @@ public void run() {
}
}
+    public void handleLogStartOffsetUpdate(TopicPartition topicPartition, long remoteLogStartOffset) {
+        if (isLeader()) {
Review Comment:
> even if we are not the leader at this stage, we have deleted the logs in remote.

If I'm reading the call path correctly, this is not the case. The `handleLogStartOffsetUpdate` function is called only at the end of `cleanupExpiredRemoteLogSegments`, which filters out calls from followers.
I guess we could either remove the `isLeader` validation here, or move this logic within the lambda itself?
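For illustration, the first option would look roughly like this (a sketch only, keeping the existing names):
```java
public void handleLogStartOffsetUpdate(TopicPartition topicPartition, long remoteLogStartOffset) {
    // Leadership is already checked by cleanupExpiredRemoteLogSegments(), the only caller,
    // so no isLeader() guard is needed here.
    logger.debug("Updating {} with remoteLogStartOffset: {}", topicPartition, remoteLogStartOffset);
    updateRemoteLogStartOffset.accept(topicPartition, remoteLogStartOffset);
}
```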
##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -634,6 +642,241 @@ public void run() {
}
}
+    public void handleLogStartOffsetUpdate(TopicPartition topicPartition, long remoteLogStartOffset) {
+        if (isLeader()) {
+            logger.debug("Updating {} with remoteLogStartOffset: {}", topicPartition, remoteLogStartOffset);
+            updateRemoteLogStartOffset.accept(topicPartition, remoteLogStartOffset);
+        }
+    }
+
+    class RemoteLogRetentionHandler {
+
+        private final Optional<RetentionSizeData> retentionSizeData;
+        private final Optional<RetentionTimeData> retentionTimeData;
+
+        private long remainingBreachedSize;
+
+        private OptionalLong logStartOffset = OptionalLong.empty();
+
+        public RemoteLogRetentionHandler(Optional<RetentionSizeData> retentionSizeData, Optional<RetentionTimeData> retentionTimeData) {
+            this.retentionSizeData = retentionSizeData;
+            this.retentionTimeData = retentionTimeData;
+            remainingBreachedSize = retentionSizeData.map(sizeData -> sizeData.remainingBreachedSize).orElse(0L);
+        }
+
+        private boolean deleteRetentionSizeBreachedSegments(RemoteLogSegmentMetadata metadata)
+                throws RemoteStorageException, ExecutionException, InterruptedException {
+            if (!retentionSizeData.isPresent()) {
+                return false;
+            }
+
+            boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x -> {
+                // Assumption that segments contain size >= 0
+                if (remainingBreachedSize > 0) {
+                    long remainingBytes = remainingBreachedSize - x.segmentSizeInBytes();
+                    if (remainingBytes >= 0) {
+                        remainingBreachedSize = remainingBytes;
+                        return true;
+                    }
+                }
+
+                return false;
+            });
+            if (isSegmentDeleted) {
+                logStartOffset = OptionalLong.of(metadata.endOffset() + 1);
+                logger.info("Deleted remote log segment {} due to retention size {} breach. Log size after deletion will be {}.",
+                        metadata.remoteLogSegmentId(), retentionSizeData.get().retentionSize, remainingBreachedSize + retentionSizeData.get().retentionSize);
+            }
+            return isSegmentDeleted;
+        }
+
+        public boolean deleteRetentionTimeBreachedSegments(RemoteLogSegmentMetadata metadata)
+                throws RemoteStorageException, ExecutionException, InterruptedException {
+            if (!retentionTimeData.isPresent()) {
+                return false;
+            }
+
+            boolean isSegmentDeleted = deleteRemoteLogSegment(metadata,
+                    x -> x.maxTimestampMs() <= retentionTimeData.get().cleanupUntilMs);
+            if (isSegmentDeleted) {
+                remainingBreachedSize = Math.max(0, remainingBreachedSize - metadata.segmentSizeInBytes());
+                // It is fine to have logStartOffset as `metadata.endOffset() + 1` as the segment offset intervals
+                // are ascending within an epoch.
+                logStartOffset = OptionalLong.of(metadata.endOffset() + 1);
+                logger.info("Deleted remote log segment {} due to retention time {}ms breach based on the largest record timestamp in the segment",
+                        metadata.remoteLogSegmentId(), retentionTimeData.get().retentionMs);
+            }
+            return isSegmentDeleted;
+        }
+
+        private boolean deleteLogStartOffsetBreachedSegments(RemoteLogSegmentMetadata metadata, long startOffset)
+                throws RemoteStorageException, ExecutionException, InterruptedException {
+            boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x -> startOffset > x.endOffset());
+            if (isSegmentDeleted && retentionSizeData.isPresent()) {
+                remainingBreachedSize = Math.max(0, remainingBreachedSize - metadata.segmentSizeInBytes());
+                logger.info("Deleted remote log segment {} due to log start offset {} breach", metadata.remoteLogSegmentId(), startOffset);
+            }
+
+            // No need to update the logStartOffset.
+            return isSegmentDeleted;
+        }
+
+        // Removes segments beyond the current leader's earliest epoch. Those segments are considered
+        // unreferenced because they are not part of the current leader epoch lineage.
+        private boolean deleteLogSegmentsDueToLeaderEpochCacheTruncation(EpochEntry earliestEpochEntry, RemoteLogSegmentMetadata metadata)
+                throws RemoteStorageException, ExecutionException, InterruptedException {
+            boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x ->
+                    x.segmentLeaderEpochs().keySet().stream().allMatch(epoch -> epoch < earliestEpochEntry.epoch));
+            if (isSegmentDeleted) {
+                logger.info("Deleted remote log segment {} due to leader epoch cache truncation. Current earliest epoch: {}, segmentEndOffset: {} and segmentEpochs: {}",
+                        metadata.remoteLogSegmentId(), earliestEpochEntry, metadata.endOffset(), metadata.segmentLeaderEpochs().keySet());
+            }
+
+            // No need to update the log-start-offset as these epochs/offsets are earlier than that value.
+            return isSegmentDeleted;
+        }
+
+        private boolean deleteRemoteLogSegment(RemoteLogSegmentMetadata segmentMetadata, Predicate<RemoteLogSegmentMetadata> predicate)
+                throws RemoteStorageException, ExecutionException, InterruptedException {
+            if (predicate.test(segmentMetadata)) {
Review Comment:
Could we add an info log here, similar to the copy path?
```suggestion
            if (predicate.test(segmentMetadata)) {
                logger.info("Deleting remote log segment {}", segmentMetadata.remoteLogSegmentId());
```
##########
core/src/main/scala/kafka/log/UnifiedLog.scala:
##########
@@ -922,6 +947,10 @@ class UnifiedLog(@volatile var logStartOffset: Long,
}
}
+  private def maybeIncrementLocalLogStartOffset(newLogStartOffset: Long, reason: LogStartOffsetIncrementReason): Unit = {
+    maybeIncrementLogStartOffset(newLogStartOffset, reason, onlyLocalLogStartOffsetUpdate = true)
+  }
Review Comment:
Found this a bit confusing. In the main operation, `onlyLocalLogStartOffsetUpdate` is false by default, but here we override it to true, and the method signatures are mostly the same. Wouldn't it be clearer to call the default method with `onlyLocalLogStartOffsetUpdate = true` instead of creating this private method?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]