This is an automated email from the ASF dual-hosted git repository.
satishd pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 24ed31739e2 KAFKA-16853: Split RemoteLogManagerScheduledThreadPool (#16502)
24ed31739e2 is described below
commit 24ed31739e2efc337f53040366c38dedae543308
Author: Abhijeet Kumar <[email protected]>
AuthorDate: Wed Jul 17 16:43:23 2024 +0530
KAFKA-16853: Split RemoteLogManagerScheduledThreadPool (#16502)
As part of KIP-950, we want to split the RemoteLogManagerScheduledThreadPool
into separate thread pools, one for copy and another for expiration. This
change splits it into three thread pools: one for copy, one for expiration,
and one for follower tasks. All three pools reuse the existing thread pool
configuration; new user-facing configurations can be introduced later.
Reviewers: Kamal Chandraprakash <[email protected]>, Luke Chen
<[email protected]>, Christo Lolov <[email protected]>, Satish Duggana
<[email protected]>
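
For illustration only, here is a minimal, standalone Java sketch of the idea
(not the actual RemoteLogManager code): three independently named scheduled
pools that reuse a single size setting. The class name and pool size constant
are hypothetical; only the thread-name prefixes mirror the change below.

    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.ThreadFactory;
    import java.util.concurrent.atomic.AtomicInteger;

    public class ThreadPoolSplitSketch {
        // Build a daemon-thread scheduled pool whose threads carry the given name prefix.
        private static ScheduledExecutorService newPool(int size, String namePrefix) {
            ThreadFactory factory = new ThreadFactory() {
                private final AtomicInteger sequence = new AtomicInteger();
                @Override
                public Thread newThread(Runnable r) {
                    Thread t = new Thread(r, namePrefix + sequence.incrementAndGet());
                    t.setDaemon(true);
                    return t;
                }
            };
            return Executors.newScheduledThreadPool(size, factory);
        }

        public static void main(String[] args) {
            int poolSize = 10; // assumption: stands in for the shared thread pool size config
            ScheduledExecutorService copyPool = newPool(poolSize, "kafka-rlm-copy-thread-pool-");
            ScheduledExecutorService expirationPool = newPool(poolSize, "kafka-rlm-expiration-thread-pool-");
            ScheduledExecutorService followerPool = newPool(poolSize, "kafka-rlm-follower-thread-pool-");
            // Each concern can now be scheduled, sized, and shut down independently.
            copyPool.shutdown();
            expirationPool.shutdown();
            followerPool.shutdown();
        }
    }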
---
checkstyle/suppressions.xml | 1 +
.../java/kafka/log/remote/RemoteLogManager.java | 346 +++++++++++++--------
.../kafka/log/remote/RemoteLogManagerTest.java | 289 +++++++++++------
3 files changed, 415 insertions(+), 221 deletions(-)
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index a36d66bd6fd..231b18c97a4 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -40,6 +40,7 @@
files="core[\\/]src[\\/](generated|generated-test)[\\/].+.java$"/>
<suppress checks="NPathComplexity"
files="(ClusterTestExtensions|KafkaApisBuilder|SharePartition).java"/>
<suppress
checks="NPathComplexity|ClassFanOutComplexity|ClassDataAbstractionCoupling"
files="(RemoteLogManager|RemoteLogManagerTest).java"/>
+ <suppress checks="MethodLength" files="RemoteLogManager.java"/>
<suppress checks="ClassFanOutComplexity"
files="RemoteLogManagerTest.java"/>
<suppress checks="MethodLength"
files="(KafkaClusterTestKit).java"/>
diff --git a/core/src/main/java/kafka/log/remote/RemoteLogManager.java
b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
index 8beb6971aeb..11c34a0f535 100644
--- a/core/src/main/java/kafka/log/remote/RemoteLogManager.java
+++ b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
@@ -179,11 +179,16 @@ public class RemoteLogManager implements Closeable {
private final RemoteIndexCache indexCache;
private final RemoteStorageThreadPool remoteStorageReaderThreadPool;
- private final RLMScheduledThreadPool rlmScheduledThreadPool;
+ private final RLMScheduledThreadPool rlmCopyThreadPool;
+ private final RLMScheduledThreadPool rlmExpirationThreadPool;
+ private final RLMScheduledThreadPool followerThreadPool;
private final long delayInMs;
- private final ConcurrentHashMap<TopicIdPartition, RLMTaskWithFuture>
leaderOrFollowerTasks = new ConcurrentHashMap<>();
+ private final ConcurrentHashMap<TopicIdPartition, RLMTaskWithFuture>
leaderCopyRLMTasks = new ConcurrentHashMap<>();
+ private final ConcurrentHashMap<TopicIdPartition, RLMTaskWithFuture>
leaderExpirationRLMTasks = new ConcurrentHashMap<>();
+ private final ConcurrentHashMap<TopicIdPartition, RLMTaskWithFuture>
followerRLMTasks = new ConcurrentHashMap<>();
+ private final Set<RemoteLogSegmentId> segmentIdsBeingCopied =
ConcurrentHashMap.newKeySet();
// topic ids that are received on leadership changes, this map is cleared
on stop partitions
private final ConcurrentMap<TopicPartition, Uuid> topicIdByPartitionMap =
new ConcurrentHashMap<>();
@@ -241,12 +246,17 @@ public class RemoteLogManager implements Closeable {
indexCache = new
RemoteIndexCache(rlmConfig.remoteLogIndexFileCacheTotalSizeBytes(),
remoteLogStorageManager, logDir);
delayInMs = rlmConfig.remoteLogManagerTaskIntervalMs();
- rlmScheduledThreadPool = new
RLMScheduledThreadPool(rlmConfig.remoteLogManagerThreadPoolSize());
+ rlmCopyThreadPool = new
RLMScheduledThreadPool(rlmConfig.remoteLogManagerCopierThreadPoolSize(),
+ "RLMCopyThreadPool", "kafka-rlm-copy-thread-pool-");
+ rlmExpirationThreadPool = new
RLMScheduledThreadPool(rlmConfig.remoteLogManagerExpirationThreadPoolSize(),
+ "RLMExpirationThreadPool", "kafka-rlm-expiration-thread-pool-");
+ followerThreadPool = new
RLMScheduledThreadPool(rlmConfig.remoteLogManagerThreadPoolSize(),
+ "RLMFollowerScheduledThreadPool",
"kafka-rlm-follower-thread-pool-");
metricsGroup.newGauge(REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT_METRIC, new
Gauge<Double>() {
@Override
public Double value() {
- return rlmScheduledThreadPool.getIdlePercent();
+ return rlmCopyThreadPool.getIdlePercent();
}
});
remoteReadTimer =
metricsGroup.newTimer(REMOTE_LOG_READER_FETCH_RATE_AND_TIME_METRIC,
@@ -433,11 +443,8 @@ public class RemoteLogManager implements Closeable {
throw new KafkaException("RemoteLogManager is not configured when
remote storage system is enabled");
}
- Map<TopicIdPartition, Integer> leaderPartitionsWithLeaderEpoch =
filterPartitions(partitionsBecomeLeader)
- .collect(Collectors.toMap(
- partition -> new
TopicIdPartition(topicIds.get(partition.topic()), partition.topicPartition()),
- Partition::getLeaderEpoch));
- Set<TopicIdPartition> leaderPartitions =
leaderPartitionsWithLeaderEpoch.keySet();
+ Set<TopicIdPartition> leaderPartitions =
filterPartitions(partitionsBecomeLeader)
+ .map(p -> new TopicIdPartition(topicIds.get(p.topic()),
p.topicPartition())).collect(Collectors.toSet());
Set<TopicIdPartition> followerPartitions =
filterPartitions(partitionsBecomeFollower)
.map(p -> new TopicIdPartition(topicIds.get(p.topic()),
p.topicPartition())).collect(Collectors.toSet());
@@ -450,16 +457,13 @@ public class RemoteLogManager implements Closeable {
followerPartitions.forEach(this::cacheTopicPartitionIds);
remoteLogMetadataManager.onPartitionLeadershipChanges(leaderPartitions,
followerPartitions);
- followerPartitions.forEach(topicIdPartition ->
- doHandleLeaderOrFollowerPartitions(topicIdPartition,
RLMTask::convertToFollower));
+ followerPartitions.forEach(this::doHandleFollowerPartition);
// If this node was the previous leader for the partition, then
the RLMTask might be running in the
// background thread and might emit metrics. So, removing the
metrics after marking this node as follower.
followerPartitions.forEach(this::removeRemoteTopicPartitionMetrics);
- leaderPartitionsWithLeaderEpoch.forEach((topicIdPartition,
leaderEpoch) ->
- doHandleLeaderOrFollowerPartitions(topicIdPartition,
- rlmTask -> rlmTask.convertToLeader(leaderEpoch)));
+ leaderPartitions.forEach(this::doHandleLeaderPartition);
}
}
@@ -479,11 +483,21 @@ public class RemoteLogManager implements Closeable {
try {
if (topicIdByPartitionMap.containsKey(tp)) {
TopicIdPartition tpId = new
TopicIdPartition(topicIdByPartitionMap.get(tp), tp);
- RLMTaskWithFuture task =
leaderOrFollowerTasks.remove(tpId);
- if (task != null) {
- LOGGER.info("Cancelling the RLM task for tpId: {}",
tpId);
+ leaderCopyRLMTasks.computeIfPresent(tpId,
(topicIdPartition, task) -> {
+ LOGGER.info("Cancelling the copy RLM task for tpId:
{}", tpId);
task.cancel();
- }
+ return null;
+ });
+ leaderExpirationRLMTasks.computeIfPresent(tpId,
(topicIdPartition, task) -> {
+ LOGGER.info("Cancelling the expiration RLM task for
tpId: {}", tpId);
+ task.cancel();
+ return null;
+ });
+ followerRLMTasks.computeIfPresent(tpId, (topicIdPartition,
task) -> {
+ LOGGER.info("Cancelling the follower RLM task for
tpId: {}", tpId);
+ task.cancel();
+ return null;
+ });
removeRemoteTopicPartitionMetrics(tpId);
@@ -686,62 +700,95 @@ public class RemoteLogManager implements Closeable {
}
// VisibleForTesting
- RLMTask rlmTask(TopicIdPartition topicIdPartition) {
- RLMTaskWithFuture task = leaderOrFollowerTasks.get(topicIdPartition);
+ RLMTask rlmCopyTask(TopicIdPartition topicIdPartition) {
+ RLMTaskWithFuture task = leaderCopyRLMTasks.get(topicIdPartition);
if (task != null) {
return task.rlmTask;
}
return null;
}
- class RLMTask extends CancellableRunnable {
+ abstract class RLMTask extends CancellableRunnable {
- private final TopicIdPartition topicIdPartition;
- private final int customMetadataSizeLimit;
+ protected final TopicIdPartition topicIdPartition;
private final Logger logger;
- private volatile int leaderEpoch = -1;
-
- public RLMTask(TopicIdPartition topicIdPartition, int
customMetadataSizeLimit) {
+ public RLMTask(TopicIdPartition topicIdPartition) {
this.topicIdPartition = topicIdPartition;
- this.customMetadataSizeLimit = customMetadataSizeLimit;
- LogContext logContext = new LogContext("[RemoteLogManager=" +
brokerId + " partition=" + topicIdPartition + "] ");
- logger = logContext.logger(RLMTask.class);
+ this.logger = getLogContext().logger(RLMTask.class);
}
- boolean isLeader() {
- return leaderEpoch >= 0;
+ protected LogContext getLogContext() {
+ return new LogContext("[RemoteLogManager=" + brokerId + "
partition=" + topicIdPartition + "] ");
}
- // The copied and log-start offset is empty initially for a new leader
RLMTask, and needs to be fetched inside
+ public void run() {
+ if (isCancelled())
+ return;
+
+ try {
+ Optional<UnifiedLog> unifiedLogOptional =
fetchLog.apply(topicIdPartition.topicPartition());
+
+ if (!unifiedLogOptional.isPresent()) {
+ return;
+ }
+
+ execute(unifiedLogOptional.get());
+ } catch (InterruptedException ex) {
+ if (!isCancelled()) {
+ logger.warn("Current thread for topic-partition-id {} is
interrupted", topicIdPartition, ex);
+ }
+ } catch (RetriableException ex) {
+ logger.debug("Encountered a retryable error while executing
current task for topic-partition {}", topicIdPartition, ex);
+ } catch (Exception ex) {
+ if (!isCancelled()) {
+ logger.warn("Current task for topic-partition {} received
error but it will be scheduled", topicIdPartition, ex);
+ }
+ }
+ }
+
+ protected abstract void execute(UnifiedLog log) throws
InterruptedException, RemoteStorageException, ExecutionException;
+
+ public String toString() {
+ return this.getClass() + "[" + topicIdPartition + "]";
+ }
+ }
+
+ class RLMCopyTask extends RLMTask {
+ private final int customMetadataSizeLimit;
+ private final Logger logger;
+
+ // The copied and log-start offset is empty initially for a new
RLMCopyTask, and needs to be fetched inside
// the task's run() method.
private volatile Optional<OffsetAndEpoch> copiedOffsetOption =
Optional.empty();
- private volatile boolean isLogStartOffsetUpdatedOnBecomingLeader =
false;
+ private volatile boolean isLogStartOffsetUpdated = false;
private volatile Optional<String> logDirectory = Optional.empty();
- public void convertToLeader(int leaderEpochVal) {
- if (leaderEpochVal < 0) {
- throw new KafkaException("leaderEpoch value for topic
partition " + topicIdPartition + " can not be negative");
- }
- if (this.leaderEpoch != leaderEpochVal) {
- leaderEpoch = leaderEpochVal;
- }
- // Reset copied and log-start offset, so that it is set in next
run of RLMTask
- copiedOffsetOption = Optional.empty();
- isLogStartOffsetUpdatedOnBecomingLeader = false;
+ public RLMCopyTask(TopicIdPartition topicIdPartition, int
customMetadataSizeLimit) {
+ super(topicIdPartition);
+ this.customMetadataSizeLimit = customMetadataSizeLimit;
+ this.logger = getLogContext().logger(RLMCopyTask.class);
}
- public void convertToFollower() {
- leaderEpoch = -1;
+ @Override
+ protected void execute(UnifiedLog log) throws InterruptedException {
+ // In the first run after completing altering logDir within
broker, we should make sure the state is reset. (KAFKA-16711)
+ if (!log.parentDir().equals(logDirectory.orElse(null))) {
+ copiedOffsetOption = Optional.empty();
+ isLogStartOffsetUpdated = false;
+ logDirectory = Optional.of(log.parentDir());
+ }
+
+ copyLogSegmentsToRemote(log);
}
private void maybeUpdateLogStartOffsetOnBecomingLeader(UnifiedLog log)
throws RemoteStorageException {
- if (!isLogStartOffsetUpdatedOnBecomingLeader) {
+ if (!isLogStartOffsetUpdated) {
long logStartOffset = findLogStartOffset(topicIdPartition,
log);
updateRemoteLogStartOffset.accept(topicIdPartition.topicPartition(),
logStartOffset);
- isLogStartOffsetUpdatedOnBecomingLeader = true;
- logger.info("Found the logStartOffset: {} for partition: {}
after becoming leader, leaderEpoch: {}",
- logStartOffset, topicIdPartition, leaderEpoch);
+ isLogStartOffsetUpdated = true;
+ logger.info("Found the logStartOffset: {} for partition: {}
after becoming leader",
+ logStartOffset, topicIdPartition);
}
}
@@ -752,8 +799,7 @@ public class RemoteLogManager implements Closeable {
// previous leader epoch till it finds an entry, If there are
no entries till the earliest leader epoch in leader
// epoch cache then it starts copying the segments from the
earliest epoch entry's offset.
copiedOffsetOption =
Optional.of(findHighestRemoteOffset(topicIdPartition, log));
- logger.info("Found the highest copiedRemoteOffset: {} for
partition: {} after becoming leader, " +
- "leaderEpoch: {}", copiedOffsetOption,
topicIdPartition, leaderEpoch);
+ logger.info("Found the highest copiedRemoteOffset: {} for
partition: {} after becoming leader", copiedOffsetOption, topicIdPartition);
copiedOffsetOption.ifPresent(offsetAndEpoch ->
log.updateHighestOffsetInRemoteStorage(offsetAndEpoch.offset()));
}
}
@@ -810,9 +856,9 @@ public class RemoteLogManager implements Closeable {
topicIdPartition, copiedOffset,
log.activeSegment().baseOffset());
} else {
for (EnrichedLogSegment candidateLogSegment :
candidateLogSegments) {
- if (isCancelled() || !isLeader()) {
- logger.info("Skipping copying log segments as
the current task state is changed, cancelled: {} leader:{}",
- isCancelled(), isLeader());
+ if (isCancelled()) {
+ logger.info("Skipping copying log segments as
the current task state is changed, cancelled: {}",
+ isCancelled());
return;
}
@@ -835,7 +881,14 @@ public class RemoteLogManager implements Closeable {
} finally {
copyQuotaManagerLock.unlock();
}
- copyLogSegment(log,
candidateLogSegment.logSegment, candidateLogSegment.nextSegmentOffset);
+
+ RemoteLogSegmentId segmentId =
RemoteLogSegmentId.generateNew(topicIdPartition);
+ segmentIdsBeingCopied.add(segmentId);
+ try {
+ copyLogSegment(log,
candidateLogSegment.logSegment, segmentId,
candidateLogSegment.nextSegmentOffset);
+ } finally {
+ segmentIdsBeingCopied.remove(segmentId);
+ }
}
}
} else {
@@ -857,14 +910,13 @@ public class RemoteLogManager implements Closeable {
}
}
- private void copyLogSegment(UnifiedLog log, LogSegment segment, long
nextSegmentBaseOffset)
+ private void copyLogSegment(UnifiedLog log, LogSegment segment,
RemoteLogSegmentId segmentId, long nextSegmentBaseOffset)
throws InterruptedException, ExecutionException,
RemoteStorageException, IOException,
CustomMetadataSizeLimitExceededException {
File logFile = segment.log().file();
String logFileName = logFile.getName();
logger.info("Copying {} to remote storage.", logFileName);
- RemoteLogSegmentId id =
RemoteLogSegmentId.generateNew(topicIdPartition);
long endOffset = nextSegmentBaseOffset - 1;
int tieredEpoch = 0;
@@ -874,7 +926,7 @@ public class RemoteLogManager implements Closeable {
Map<Integer, Long> segmentLeaderEpochs = new
HashMap<>(epochEntries.size());
epochEntries.forEach(entry -> segmentLeaderEpochs.put(entry.epoch,
entry.startOffset));
- RemoteLogSegmentMetadata copySegmentStartedRlsm = new
RemoteLogSegmentMetadata(id, segment.baseOffset(), endOffset,
+ RemoteLogSegmentMetadata copySegmentStartedRlsm = new
RemoteLogSegmentMetadata(segmentId, segment.baseOffset(), endOffset,
segment.largestTimestamp(), brokerId, time.milliseconds(),
segment.log().sizeInBytes(),
segmentLeaderEpochs, tieredEpoch);
@@ -888,7 +940,7 @@ public class RemoteLogManager implements Closeable {
brokerTopicStats.allTopicsStats().remoteCopyRequestRate().mark();
Optional<CustomMetadata> customMetadata =
remoteLogStorageManager.copyLogSegmentData(copySegmentStartedRlsm, segmentData);
- RemoteLogSegmentMetadataUpdate copySegmentFinishedRlsm = new
RemoteLogSegmentMetadataUpdate(id, time.milliseconds(),
+ RemoteLogSegmentMetadataUpdate copySegmentFinishedRlsm = new
RemoteLogSegmentMetadataUpdate(segmentId, time.milliseconds(),
customMetadata,
RemoteLogSegmentState.COPY_SEGMENT_FINISHED, brokerId);
if (customMetadata.isPresent()) {
@@ -932,7 +984,7 @@ public class RemoteLogManager implements Closeable {
// VisibleForTesting
void recordLagStats(long bytesLag, long segmentsLag) {
- if (isLeader()) {
+ if (!isCancelled()) {
String topic = topicIdPartition.topic();
int partition = topicIdPartition.partition();
brokerTopicStats.recordRemoteCopyLagBytes(topic, partition,
bytesLag);
@@ -943,55 +995,25 @@ public class RemoteLogManager implements Closeable {
private Path toPathIfExists(File file) {
return file.exists() ? file.toPath() : null;
}
+ }
- public void run() {
- if (isCancelled())
- return;
-
- try {
- Optional<UnifiedLog> unifiedLogOptional =
fetchLog.apply(topicIdPartition.topicPartition());
-
- if (!unifiedLogOptional.isPresent()) {
- return;
- }
+ class RLMExpirationTask extends RLMTask {
+ private final Logger logger;
- UnifiedLog log = unifiedLogOptional.get();
- // In the first run after completing altering logDir within
broker, we should make sure the state is reset. (KAFKA-16711)
- if (!log.parentDir().equals(logDirectory.orElse(null))) {
- copiedOffsetOption = Optional.empty();
- isLogStartOffsetUpdatedOnBecomingLeader = false;
- logDirectory = Optional.of(log.parentDir());
- }
+ public RLMExpirationTask(TopicIdPartition topicIdPartition) {
+ super(topicIdPartition);
+ this.logger = getLogContext().logger(RLMExpirationTask.class);
+ }
- if (isLeader()) {
- // Copy log segments to remote storage
- copyLogSegmentsToRemote(log);
- // Cleanup/delete expired remote log segments
- cleanupExpiredRemoteLogSegments();
- } else {
- OffsetAndEpoch offsetAndEpoch =
findHighestRemoteOffset(topicIdPartition, log);
- // Update the highest offset in remote storage for this
partition's log so that the local log segments
- // are not deleted before they are copied to remote
storage.
-
log.updateHighestOffsetInRemoteStorage(offsetAndEpoch.offset());
- }
- } catch (InterruptedException ex) {
- if (!isCancelled()) {
- logger.warn("Current thread for topic-partition-id {} is
interrupted", topicIdPartition, ex);
- }
- } catch (RetriableException ex) {
- logger.debug("Encountered a retryable error while executing
current task for topic-partition {}", topicIdPartition, ex);
- } catch (Exception ex) {
- if (!isCancelled()) {
- logger.warn("Current task for topic-partition {} received
error but it will be scheduled", topicIdPartition, ex);
- }
- }
+ @Override
+ protected void execute(UnifiedLog log) throws InterruptedException,
RemoteStorageException, ExecutionException {
+ // Cleanup/delete expired remote log segments
+ cleanupExpiredRemoteLogSegments();
}
public void handleLogStartOffsetUpdate(TopicPartition topicPartition,
long remoteLogStartOffset) {
- if (isLeader()) {
- logger.debug("Updating {} with remoteLogStartOffset: {}",
topicPartition, remoteLogStartOffset);
- updateRemoteLogStartOffset.accept(topicPartition,
remoteLogStartOffset);
- }
+ logger.debug("Updating {} with remoteLogStartOffset: {}",
topicPartition, remoteLogStartOffset);
+ updateRemoteLogStartOffset.accept(topicPartition,
remoteLogStartOffset);
}
class RemoteLogRetentionHandler {
@@ -1137,7 +1159,7 @@ public class RemoteLogManager implements Closeable {
}
void cleanupExpiredRemoteLogSegments() throws RemoteStorageException,
ExecutionException, InterruptedException {
- if (isCancelled() || !isLeader()) {
+ if (isCancelled()) {
logger.info("Returning from remote log segments cleanup as the
task state is changed");
return;
}
@@ -1200,12 +1222,18 @@ public class RemoteLogManager implements Closeable {
Integer epoch = epochIterator.next();
Iterator<RemoteLogSegmentMetadata> segmentsIterator =
remoteLogMetadataManager.listRemoteLogSegments(topicIdPartition, epoch);
while (canProcess && segmentsIterator.hasNext()) {
- if (isCancelled() || !isLeader()) {
+ if (isCancelled()) {
logger.info("Returning from remote log segments
cleanup for the remaining segments as the task state is changed.");
return;
}
RemoteLogSegmentMetadata metadata =
segmentsIterator.next();
+ if
(segmentIdsBeingCopied.contains(metadata.remoteLogSegmentId())) {
+ logger.debug("Copy for the segment {} is currently in
process. Skipping cleanup for it and the remaining segments",
+ metadata.remoteLogSegmentId());
+ canProcess = false;
+ continue;
+ }
if
(RemoteLogSegmentState.DELETE_SEGMENT_FINISHED.equals(metadata.state())) {
continue;
}
@@ -1252,7 +1280,7 @@ public class RemoteLogManager implements Closeable {
updateRemoteDeleteLagWith(segmentsLeftToDelete,
sizeOfDeletableSegmentsBytes);
List<String> undeletedSegments = new ArrayList<>();
for (RemoteLogSegmentMetadata segmentMetadata : segmentsToDelete) {
- if
(!remoteLogRetentionHandler.deleteRemoteLogSegment(segmentMetadata, x ->
!isCancelled() && isLeader())) {
+ if
(!remoteLogRetentionHandler.deleteRemoteLogSegment(segmentMetadata, x ->
!isCancelled())) {
undeletedSegments.add(segmentMetadata.remoteLogSegmentId().toString());
} else {
sizeOfDeletableSegmentsBytes -=
segmentMetadata.segmentSizeInBytes();
@@ -1281,7 +1309,7 @@ public class RemoteLogManager implements Closeable {
int epoch = epochsToClean.next();
Iterator<RemoteLogSegmentMetadata> segmentsToBeCleaned =
remoteLogMetadataManager.listRemoteLogSegments(topicIdPartition, epoch);
while (segmentsToBeCleaned.hasNext()) {
- if (!isCancelled() && isLeader()) {
+ if (!isCancelled()) {
RemoteLogSegmentMetadata nextSegmentMetadata =
segmentsToBeCleaned.next();
sizeOfDeletableSegmentsBytes +=
nextSegmentMetadata.segmentSizeInBytes();
listOfSegmentsToBeCleaned.add(nextSegmentMetadata);
@@ -1292,7 +1320,7 @@ public class RemoteLogManager implements Closeable {
segmentsLeftToDelete += listOfSegmentsToBeCleaned.size();
updateRemoteDeleteLagWith(segmentsLeftToDelete,
sizeOfDeletableSegmentsBytes);
for (RemoteLogSegmentMetadata segmentMetadata :
listOfSegmentsToBeCleaned) {
- if (!isCancelled() && isLeader()) {
+ if (!isCancelled()) {
// No need to update the log-start-offset even though
the segment is deleted as these epochs/offsets are earlier to that value.
if
(remoteLogRetentionHandler.deleteLogSegmentsDueToLeaderEpochCacheTruncation(earliestEpochEntry,
segmentMetadata)) {
sizeOfDeletableSegmentsBytes -=
segmentMetadata.segmentSizeInBytes();
@@ -1348,9 +1376,20 @@ public class RemoteLogManager implements Closeable {
return Optional.empty();
}
+ }
- public String toString() {
- return this.getClass() + "[" + topicIdPartition + "]";
+ class RLMFollowerTask extends RLMTask {
+
+ public RLMFollowerTask(TopicIdPartition topicIdPartition) {
+ super(topicIdPartition);
+ }
+
+ @Override
+ protected void execute(UnifiedLog log) throws InterruptedException,
RemoteStorageException, ExecutionException {
+ OffsetAndEpoch offsetAndEpoch =
findHighestRemoteOffset(topicIdPartition, log);
+ // Update the highest offset in remote storage for this
partition's log so that the local log segments
+ // are not deleted before they are copied to remote storage.
+ log.updateHighestOffsetInRemoteStorage(offsetAndEpoch.offset());
}
}
@@ -1761,19 +1800,48 @@ public class RemoteLogManager implements Closeable {
new RemoteLogReader(fetchInfo, this, callback,
brokerTopicStats, rlmFetchQuotaManager, remoteReadTimer));
}
- void doHandleLeaderOrFollowerPartitions(TopicIdPartition topicPartition,
- Consumer<RLMTask>
convertToLeaderOrFollower) {
- RLMTaskWithFuture rlmTaskWithFuture =
leaderOrFollowerTasks.computeIfAbsent(topicPartition,
- topicIdPartition -> {
- RLMTask task = new RLMTask(topicIdPartition,
rlmConfig.remoteLogMetadataCustomMetadataMaxBytes());
- // set this upfront when it is getting initialized instead
of doing it after scheduling.
- convertToLeaderOrFollower.accept(task);
- LOGGER.info("Created a new task: {} and getting
scheduled", task);
- ScheduledFuture<?> future =
rlmScheduledThreadPool.scheduleWithFixedDelay(task, 0, delayInMs,
TimeUnit.MILLISECONDS);
- return new RLMTaskWithFuture(task, future);
- }
- );
- convertToLeaderOrFollower.accept(rlmTaskWithFuture.rlmTask);
+ void doHandleLeaderPartition(TopicIdPartition topicPartition) {
+ RLMTaskWithFuture followerRLMTaskWithFuture =
followerRLMTasks.remove(topicPartition);
+ if (followerRLMTaskWithFuture != null) {
+ LOGGER.info("Cancelling the follower task: {}",
followerRLMTaskWithFuture.rlmTask);
+ followerRLMTaskWithFuture.cancel();
+ }
+
+ leaderCopyRLMTasks.computeIfAbsent(topicPartition, topicIdPartition ->
{
+ RLMCopyTask task = new RLMCopyTask(topicIdPartition,
this.rlmConfig.remoteLogMetadataCustomMetadataMaxBytes());
+ // set this upfront when it is getting initialized instead of
doing it after scheduling.
+ LOGGER.info("Created a new copy task: {} and getting scheduled",
task);
+ ScheduledFuture<?> future =
rlmCopyThreadPool.scheduleWithFixedDelay(task, 0, delayInMs,
TimeUnit.MILLISECONDS);
+ return new RLMTaskWithFuture(task, future);
+ });
+
+ leaderExpirationRLMTasks.computeIfAbsent(topicPartition,
topicIdPartition -> {
+ RLMExpirationTask task = new RLMExpirationTask(topicIdPartition);
+ LOGGER.info("Created a new expiration task: {} and getting
scheduled", task);
+ ScheduledFuture<?> future =
rlmExpirationThreadPool.scheduleWithFixedDelay(task, 0, delayInMs,
TimeUnit.MILLISECONDS);
+ return new RLMTaskWithFuture(task, future);
+ });
+ }
+
+ void doHandleFollowerPartition(TopicIdPartition topicPartition) {
+ RLMTaskWithFuture copyRLMTaskWithFuture =
leaderCopyRLMTasks.remove(topicPartition);
+ if (copyRLMTaskWithFuture != null) {
+ LOGGER.info("Cancelling the copy task: {}",
copyRLMTaskWithFuture.rlmTask);
+ copyRLMTaskWithFuture.cancel();
+ }
+
+ RLMTaskWithFuture expirationRLMTaskWithFuture =
leaderExpirationRLMTasks.remove(topicPartition);
+ if (expirationRLMTaskWithFuture != null) {
+ LOGGER.info("Cancelling the expiration task: {}",
expirationRLMTaskWithFuture.rlmTask);
+ expirationRLMTaskWithFuture.cancel();
+ }
+
+ followerRLMTasks.computeIfAbsent(topicPartition, topicIdPartition -> {
+ RLMFollowerTask task = new RLMFollowerTask(topicIdPartition);
+ LOGGER.info("Created a new follower task: {} and getting
scheduled", task);
+ ScheduledFuture<?> future =
followerThreadPool.scheduleWithFixedDelay(task, 0, delayInMs,
TimeUnit.MILLISECONDS);
+ return new RLMTaskWithFuture(task, future);
+ });
}
static class RLMTaskWithFuture {
@@ -1803,19 +1871,25 @@ public class RemoteLogManager implements Closeable {
public void close() {
synchronized (this) {
if (!closed) {
-
leaderOrFollowerTasks.values().forEach(RLMTaskWithFuture::cancel);
+ leaderCopyRLMTasks.values().forEach(RLMTaskWithFuture::cancel);
+
leaderExpirationRLMTasks.values().forEach(RLMTaskWithFuture::cancel);
+ followerRLMTasks.values().forEach(RLMTaskWithFuture::cancel);
Utils.closeQuietly(remoteLogStorageManager,
"RemoteLogStorageManager");
Utils.closeQuietly(remoteLogMetadataManager,
"RemoteLogMetadataManager");
Utils.closeQuietly(indexCache, "RemoteIndexCache");
- rlmScheduledThreadPool.close();
+ rlmCopyThreadPool.close();
+ rlmExpirationThreadPool.close();
+ followerThreadPool.close();
try {
shutdownAndAwaitTermination(remoteStorageReaderThreadPool,
"RemoteStorageReaderThreadPool", 10, TimeUnit.SECONDS);
} finally {
removeMetrics();
}
- leaderOrFollowerTasks.clear();
+ leaderCopyRLMTasks.clear();
+ leaderExpirationRLMTasks.clear();
+ followerRLMTasks.clear();
closed = true;
}
}
@@ -1883,18 +1957,28 @@ public class RemoteLogManager implements Closeable {
}
//Visible for testing
- RLMTaskWithFuture task(TopicIdPartition partition) {
- return leaderOrFollowerTasks.get(partition);
+ RLMTaskWithFuture leaderCopyTask(TopicIdPartition partition) {
+ return leaderCopyRLMTasks.get(partition);
+ }
+ RLMTaskWithFuture leaderExpirationTask(TopicIdPartition partition) {
+ return leaderExpirationRLMTasks.get(partition);
+ }
+ RLMTaskWithFuture followerTask(TopicIdPartition partition) {
+ return followerRLMTasks.get(partition);
}
static class RLMScheduledThreadPool {
private static final Logger LOGGER =
LoggerFactory.getLogger(RLMScheduledThreadPool.class);
private final int poolSize;
+ private final String threadPoolName;
+ private final String threadNamePrefix;
private final ScheduledThreadPoolExecutor scheduledThreadPool;
- public RLMScheduledThreadPool(int poolSize) {
+ public RLMScheduledThreadPool(int poolSize, String threadPoolName,
String threadNamePrefix) {
this.poolSize = poolSize;
+ this.threadPoolName = threadPoolName;
+ this.threadNamePrefix = threadNamePrefix;
scheduledThreadPool = createPool();
}
@@ -1907,7 +1991,7 @@ public class RemoteLogManager implements Closeable {
private final AtomicInteger sequence = new AtomicInteger();
public Thread newThread(Runnable r) {
- return KafkaThread.daemon("kafka-rlm-thread-pool-" +
sequence.incrementAndGet(), r);
+ return KafkaThread.daemon(threadNamePrefix +
sequence.incrementAndGet(), r);
}
});
@@ -1924,7 +2008,7 @@ public class RemoteLogManager implements Closeable {
}
public void close() {
- shutdownAndAwaitTermination(scheduledThreadPool,
"RLMScheduledThreadPool", 10, TimeUnit.SECONDS);
+ shutdownAndAwaitTermination(scheduledThreadPool, threadPoolName,
10, TimeUnit.SECONDS);
}
}
diff --git a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
index c9c37de929b..4e5f70f9337 100644
--- a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
+++ b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
@@ -550,8 +550,7 @@ public class RemoteLogManagerTest {
assertEquals(0,
brokerTopicStats.allTopicsStats().remoteCopyBytesRate().count());
assertEquals(0,
brokerTopicStats.allTopicsStats().failedRemoteCopyRequestRate().count());
- RemoteLogManager.RLMTask task = remoteLogManager.new
RLMTask(leaderTopicIdPartition, 128);
- task.convertToLeader(2);
+ RemoteLogManager.RLMCopyTask task = remoteLogManager.new
RLMCopyTask(leaderTopicIdPartition, 128);
task.copyLogSegmentsToRemote(mockLog);
// verify remoteLogMetadataManager did add the expected
RemoteLogSegmentMetadata
@@ -658,8 +657,7 @@ public class RemoteLogManagerTest {
.thenReturn(Optional.of(customMetadata));
when(rlmCopyQuotaManager.getThrottleTimeMs()).thenReturn(quotaAvailableThrottleTime);
- RemoteLogManager.RLMTask task = remoteLogManager.new
RLMTask(leaderTopicIdPartition, customMetadataSizeLimit);
- task.convertToLeader(2);
+ RemoteLogManager.RLMCopyTask task = remoteLogManager.new
RLMCopyTask(leaderTopicIdPartition, customMetadataSizeLimit);
task.copyLogSegmentsToRemote(mockLog);
ArgumentCaptor<RemoteLogSegmentMetadata> remoteLogSegmentMetadataArg =
ArgumentCaptor.forClass(RemoteLogSegmentMetadata.class);
@@ -1032,7 +1030,9 @@ public class RemoteLogManagerTest {
Partition mockLeaderPartition = mockPartition(leaderTopicIdPartition);
- when(mockLog.onlyLocalLogSegmentsSize()).thenReturn(175L, 100L);
+ // This method is called by both the Copy and Expiration tasks. On the
first call, both tasks should see 175 bytes as
+ // the local log segments size
+ when(mockLog.onlyLocalLogSegmentsSize()).thenReturn(175L, 175L, 100L);
when(activeSegment.size()).thenReturn(100);
when(mockLog.onlyLocalLogSegmentsCount()).thenReturn(2L).thenReturn(1L);
@@ -1154,8 +1154,7 @@ public class RemoteLogManagerTest {
// Verify aggregate metrics
assertEquals(0,
brokerTopicStats.allTopicsStats().remoteCopyRequestRate().count());
assertEquals(0,
brokerTopicStats.allTopicsStats().failedRemoteCopyRequestRate().count());
- RemoteLogManager.RLMTask task = remoteLogManager.new
RLMTask(leaderTopicIdPartition, 128);
- task.convertToLeader(2);
+ RemoteLogManager.RLMCopyTask task = remoteLogManager.new
RLMCopyTask(leaderTopicIdPartition, 128);
task.copyLogSegmentsToRemote(mockLog);
// Verify we attempted to copy log segment metadata to remote storage
@@ -1171,40 +1170,6 @@ public class RemoteLogManagerTest {
assertEquals(1,
brokerTopicStats.allTopicsStats().failedRemoteCopyRequestRate().count());
}
- @Test
- void testCopyLogSegmentsToRemoteShouldNotCopySegmentForFollower() throws
Exception {
- long oldSegmentStartOffset = 0L;
- long nextSegmentStartOffset = 150L;
-
- // leader epoch preparation
- checkpoint.write(totalEpochEntries);
- LeaderEpochFileCache cache = new
LeaderEpochFileCache(leaderTopicIdPartition.topicPartition(), checkpoint,
scheduler);
- when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
-
when(remoteLogMetadataManager.highestOffsetForEpoch(any(TopicIdPartition.class),
anyInt())).thenReturn(Optional.of(0L));
-
- // create 2 log segments, with 0 and 150 as log start offset
- LogSegment oldSegment = mock(LogSegment.class);
- LogSegment activeSegment = mock(LogSegment.class);
-
- when(oldSegment.baseOffset()).thenReturn(oldSegmentStartOffset);
- when(activeSegment.baseOffset()).thenReturn(nextSegmentStartOffset);
-
- when(mockLog.activeSegment()).thenReturn(activeSegment);
- when(mockLog.logStartOffset()).thenReturn(oldSegmentStartOffset);
- when(mockLog.logSegments(anyLong(),
anyLong())).thenReturn(JavaConverters.collectionAsScalaIterable(Arrays.asList(oldSegment,
activeSegment)));
- when(mockLog.lastStableOffset()).thenReturn(250L);
-
- RemoteLogManager.RLMTask task = remoteLogManager.new
RLMTask(leaderTopicIdPartition, 128);
- task.convertToFollower();
- task.copyLogSegmentsToRemote(mockLog);
-
- // verify the remoteLogMetadataManager never add any metadata and
remoteStorageManager never copy log segments
- verify(remoteLogMetadataManager,
never()).addRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadata.class));
- verify(remoteStorageManager,
never()).copyLogSegmentData(any(RemoteLogSegmentMetadata.class),
any(LogSegmentData.class));
- verify(remoteLogMetadataManager,
never()).updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class));
- verify(mockLog).updateHighestOffsetInRemoteStorage(anyLong());
- }
-
@Test
void
testRLMTaskDoesNotUploadSegmentsWhenRemoteLogMetadataManagerIsNotInitialized()
throws Exception {
long oldSegmentStartOffset = 0L;
@@ -1240,8 +1205,7 @@ public class RemoteLogManagerTest {
assertEquals(0,
brokerTopicStats.allTopicsStats().remoteCopyRequestRate().count());
assertEquals(0,
brokerTopicStats.allTopicsStats().failedRemoteCopyRequestRate().count());
- RemoteLogManager.RLMTask task = remoteLogManager.new
RLMTask(leaderTopicIdPartition, 128);
- task.convertToLeader(0);
+ RemoteLogManager.RLMCopyTask task = remoteLogManager.new
RLMCopyTask(leaderTopicIdPartition, 128);
task.run();
// verify the remoteLogMetadataManager never add any metadata and
remoteStorageManager never copy log segments
@@ -1374,13 +1338,13 @@ public class RemoteLogManagerTest {
RemoteLogManager spyRemoteLogManager = spy(remoteLogManager);
spyRemoteLogManager.onLeadershipChange(
Collections.emptySet(),
Collections.singleton(mockPartition(followerTopicIdPartition)), topicIds);
-
verify(spyRemoteLogManager).doHandleLeaderOrFollowerPartitions(eq(followerTopicIdPartition),
any(java.util.function.Consumer.class));
+
verify(spyRemoteLogManager).doHandleFollowerPartition(eq(followerTopicIdPartition));
Mockito.reset(spyRemoteLogManager);
spyRemoteLogManager.onLeadershipChange(
Collections.singleton(mockPartition(leaderTopicIdPartition)),
Collections.emptySet(), topicIds);
-
verify(spyRemoteLogManager).doHandleLeaderOrFollowerPartitions(eq(leaderTopicIdPartition),
any(java.util.function.Consumer.class));
+
verify(spyRemoteLogManager).doHandleLeaderPartition(eq(leaderTopicIdPartition));
}
private MemoryRecords records(long timestamp,
@@ -1393,16 +1357,6 @@ public class RemoteLogManagerTest {
);
}
- @Test
- void testRLMTaskShouldSetLeaderEpochCorrectly() {
- RemoteLogManager.RLMTask task = remoteLogManager.new
RLMTask(leaderTopicIdPartition, 128);
- assertFalse(task.isLeader());
- task.convertToLeader(1);
- assertTrue(task.isLeader());
- task.convertToFollower();
- assertFalse(task.isLeader());
- }
-
@Test
void testFindOffsetByTimestamp() throws IOException,
RemoteStorageException {
remoteLogManager.startup();
@@ -1813,7 +1767,7 @@ public class RemoteLogManagerTest {
when(log.logSegments(5L, Long.MAX_VALUE))
.thenReturn(JavaConverters.collectionAsScalaIterable(Arrays.asList(segment1,
segment2, activeSegment)));
- RemoteLogManager.RLMTask task = remoteLogManager.new
RLMTask(leaderTopicIdPartition, 128);
+ RemoteLogManager.RLMCopyTask task = remoteLogManager.new
RLMCopyTask(leaderTopicIdPartition, 128);
List<RemoteLogManager.EnrichedLogSegment> expected =
Arrays.asList(
new RemoteLogManager.EnrichedLogSegment(segment1, 10L),
@@ -1839,7 +1793,7 @@ public class RemoteLogManagerTest {
when(log.logSegments(5L, Long.MAX_VALUE))
.thenReturn(JavaConverters.collectionAsScalaIterable(Arrays.asList(segment1,
segment2, segment3, activeSegment)));
- RemoteLogManager.RLMTask task = remoteLogManager.new
RLMTask(leaderTopicIdPartition, 128);
+ RemoteLogManager.RLMCopyTask task = remoteLogManager.new
RLMCopyTask(leaderTopicIdPartition, 128);
List<RemoteLogManager.EnrichedLogSegment> expected =
Arrays.asList(
new RemoteLogManager.EnrichedLogSegment(segment1, 10L),
@@ -1887,12 +1841,14 @@ public class RemoteLogManagerTest {
partitions.add(new
StopPartition(followerTopicIdPartition.topicPartition(), true, false));
remoteLogManager.onLeadershipChange(Collections.singleton(mockPartition(leaderTopicIdPartition)),
Collections.singleton(mockPartition(followerTopicIdPartition)), topicIds);
- assertNotNull(remoteLogManager.task(leaderTopicIdPartition));
- assertNotNull(remoteLogManager.task(followerTopicIdPartition));
+ assertNotNull(remoteLogManager.leaderCopyTask(leaderTopicIdPartition));
+
assertNotNull(remoteLogManager.leaderExpirationTask(leaderTopicIdPartition));
+ assertNotNull(remoteLogManager.followerTask(followerTopicIdPartition));
remoteLogManager.stopPartitions(partitions, errorHandler);
- assertNull(remoteLogManager.task(leaderTopicIdPartition));
- assertNull(remoteLogManager.task(followerTopicIdPartition));
+ assertNull(remoteLogManager.leaderCopyTask(leaderTopicIdPartition));
+
assertNull(remoteLogManager.leaderExpirationTask(leaderTopicIdPartition));
+ assertNull(remoteLogManager.followerTask(followerTopicIdPartition));
verify(remoteLogMetadataManager, times(1)).onStopPartitions(any());
verify(remoteStorageManager, times(0)).deleteLogSegmentData(any());
verify(remoteLogMetadataManager,
times(0)).updateRemoteLogSegmentMetadata(any());
@@ -1908,8 +1864,9 @@ public class RemoteLogManagerTest {
partitions.add(new
StopPartition(followerTopicIdPartition.topicPartition(), true, true));
remoteLogManager.onLeadershipChange(Collections.singleton(mockPartition(leaderTopicIdPartition)),
Collections.singleton(mockPartition(followerTopicIdPartition)), topicIds);
- assertNotNull(remoteLogManager.task(leaderTopicIdPartition));
- assertNotNull(remoteLogManager.task(followerTopicIdPartition));
+ assertNotNull(remoteLogManager.leaderCopyTask(leaderTopicIdPartition));
+
assertNotNull(remoteLogManager.leaderExpirationTask(leaderTopicIdPartition));
+ assertNotNull(remoteLogManager.followerTask(followerTopicIdPartition));
when(remoteLogMetadataManager.listRemoteLogSegments(eq(leaderTopicIdPartition)))
.thenReturn(listRemoteLogSegmentMetadata(leaderTopicIdPartition, 5, 100, 1024,
RemoteLogSegmentState.DELETE_SEGMENT_FINISHED).iterator());
@@ -1921,8 +1878,9 @@ public class RemoteLogManagerTest {
.thenReturn(dummyFuture);
remoteLogManager.stopPartitions(partitions, errorHandler);
- assertNull(remoteLogManager.task(leaderTopicIdPartition));
- assertNull(remoteLogManager.task(followerTopicIdPartition));
+ assertNull(remoteLogManager.leaderCopyTask(leaderTopicIdPartition));
+
assertNull(remoteLogManager.leaderExpirationTask(leaderTopicIdPartition));
+ assertNull(remoteLogManager.followerTask(followerTopicIdPartition));
verify(remoteLogMetadataManager, times(1)).onStopPartitions(any());
verify(remoteStorageManager, times(8)).deleteLogSegmentData(any());
verify(remoteLogMetadataManager,
times(16)).updateRemoteLogSegmentMetadata(any());
@@ -2027,13 +1985,174 @@ public class RemoteLogManagerTest {
return remoteLogMetadataManager;
}
}) {
- RemoteLogManager.RLMTask task = remoteLogManager.new
RLMTask(leaderTopicIdPartition, 128);
- task.convertToLeader(4);
+ RemoteLogManager.RLMCopyTask task = remoteLogManager.new
RLMCopyTask(leaderTopicIdPartition, 128);
task.copyLogSegmentsToRemote(mockLog);
assertEquals(600L, logStartOffset.get());
}
}
+ @Test
+ public void testDeletionSkippedForSegmentsBeingCopied() throws
RemoteStorageException, IOException, InterruptedException, ExecutionException {
+ RemoteLogMetadataManager remoteLogMetadataManager = new
NoOpRemoteLogMetadataManager() {
+ List<RemoteLogSegmentMetadata> metadataList = new ArrayList<>();
+
+ @Override
+ public synchronized CompletableFuture<Void>
addRemoteLogSegmentMetadata(RemoteLogSegmentMetadata remoteLogSegmentMetadata) {
+ metadataList.add(remoteLogSegmentMetadata);
+ return CompletableFuture.runAsync(() -> { });
+ }
+
+ @Override
+ public synchronized CompletableFuture<Void>
updateRemoteLogSegmentMetadata(RemoteLogSegmentMetadataUpdate
remoteLogSegmentMetadataUpdate) {
+ metadataList = metadataList.stream()
+ .map(m -> {
+ if
(m.remoteLogSegmentId().equals(remoteLogSegmentMetadataUpdate.remoteLogSegmentId()))
{
+ return
m.createWithUpdates(remoteLogSegmentMetadataUpdate);
+ }
+ return m;
+ })
+ .collect(Collectors.toList());
+ return CompletableFuture.runAsync(() -> { });
+ }
+
+ @Override
+ public Optional<Long> highestOffsetForEpoch(TopicIdPartition
topicIdPartition, int leaderEpoch) {
+ return Optional.of(-1L);
+ }
+
+ @Override
+ public synchronized Iterator<RemoteLogSegmentMetadata>
listRemoteLogSegments(TopicIdPartition topicIdPartition) {
+ return metadataList.iterator();
+ }
+
+ @Override
+ public synchronized Iterator<RemoteLogSegmentMetadata>
listRemoteLogSegments(TopicIdPartition topicIdPartition, int leaderEpoch) {
+ return metadataList.iterator();
+ }
+ };
+
+ remoteLogManager = new
RemoteLogManager(config.remoteLogManagerConfig(), brokerId, logDir, clusterId,
time,
+ tp -> Optional.of(mockLog),
+ (topicPartition, offset) -> currentLogStartOffset.set(offset),
+ brokerTopicStats, metrics) {
+ public RemoteStorageManager createRemoteStorageManager() {
+ return remoteStorageManager;
+ }
+ public RemoteLogMetadataManager createRemoteLogMetadataManager() {
+ return remoteLogMetadataManager;
+ }
+ public RLMQuotaManager createRLMCopyQuotaManager() {
+ return rlmCopyQuotaManager;
+ }
+ public Duration quotaTimeout() {
+ return Duration.ofMillis(100);
+ }
+ @Override
+ long findLogStartOffset(TopicIdPartition topicIdPartition,
UnifiedLog log) {
+ return 0L;
+ }
+ };
+
+ long oldSegmentStartOffset = 0L;
+ long nextSegmentStartOffset = 150L;
+ long lastStableOffset = 150L;
+ long logEndOffset = 150L;
+
+
when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition());
+
+ // leader epoch preparation
+ checkpoint.write(Collections.singletonList(epochEntry0));
+ LeaderEpochFileCache cache = new
LeaderEpochFileCache(leaderTopicIdPartition.topicPartition(), checkpoint,
scheduler);
+ when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
+
+ // create 2 log segments, with 0 and 150 as log start offset
+ LogSegment oldSegment = mock(LogSegment.class);
+ LogSegment activeSegment = mock(LogSegment.class);
+ when(oldSegment.baseOffset()).thenReturn(oldSegmentStartOffset);
+ when(activeSegment.baseOffset()).thenReturn(nextSegmentStartOffset);
+
+ File tempFile = TestUtils.tempFile();
+ FileRecords fileRecords = mock(FileRecords.class);
+ when(fileRecords.file()).thenReturn(tempFile);
+ when(fileRecords.sizeInBytes()).thenReturn(10);
+
+ when(oldSegment.log()).thenReturn(fileRecords);
+ when(oldSegment.readNextOffset()).thenReturn(nextSegmentStartOffset);
+
+ when(mockLog.activeSegment()).thenReturn(activeSegment);
+ when(mockLog.logStartOffset()).thenReturn(oldSegmentStartOffset);
+ when(mockLog.logSegments(anyLong(),
anyLong())).thenReturn(JavaConverters.collectionAsScalaIterable(Arrays.asList(oldSegment,
activeSegment)));
+
+ File mockProducerSnapshotIndex = TestUtils.tempFile();
+ ProducerStateManager mockStateManager =
mock(ProducerStateManager.class);
+ when(mockLog.producerStateManager()).thenReturn(mockStateManager);
+
when(mockStateManager.fetchSnapshot(anyLong())).thenReturn(Optional.of(mockProducerSnapshotIndex));
+ when(mockLog.lastStableOffset()).thenReturn(lastStableOffset);
+ when(mockLog.logEndOffset()).thenReturn(logEndOffset);
+
+ File tempDir = TestUtils.tempDirectory();
+ OffsetIndex idx =
LazyIndex.forOffset(LogFileUtils.offsetIndexFile(tempDir,
oldSegmentStartOffset, ""), oldSegmentStartOffset, 1000).get();
+ TimeIndex timeIdx =
LazyIndex.forTime(LogFileUtils.timeIndexFile(tempDir, oldSegmentStartOffset,
""), oldSegmentStartOffset, 1500).get();
+ File txnFile = UnifiedLog.transactionIndexFile(tempDir,
oldSegmentStartOffset, "");
+ txnFile.createNewFile();
+ TransactionIndex txnIndex = new
TransactionIndex(oldSegmentStartOffset, txnFile);
+ when(oldSegment.timeIndex()).thenReturn(timeIdx);
+ when(oldSegment.offsetIndex()).thenReturn(idx);
+ when(oldSegment.txnIndex()).thenReturn(txnIndex);
+
+ CountDownLatch copyLogSegmentLatch = new CountDownLatch(1);
+ CountDownLatch copySegmentDataLatch = new CountDownLatch(1);
+ doAnswer(ans -> {
+ // unblock the expiration thread
+ copySegmentDataLatch.countDown();
+ // introduce a delay in copying segment data
+ copyLogSegmentLatch.await(5000, TimeUnit.MILLISECONDS);
+ return Optional.empty();
+
}).when(remoteStorageManager).copyLogSegmentData(any(RemoteLogSegmentMetadata.class),
any(LogSegmentData.class));
+
when(rlmCopyQuotaManager.getThrottleTimeMs()).thenReturn(quotaAvailableThrottleTime);
+
+ // Set up expiration behaviour
+ Map<String, Long> logProps = new HashMap<>();
+ logProps.put("retention.bytes", 0L);
+ logProps.put("retention.ms", -1L);
+ LogConfig mockLogConfig = new LogConfig(logProps);
+ when(mockLog.config()).thenReturn(mockLogConfig);
+
+ RemoteLogManager.RLMCopyTask copyTask = remoteLogManager.new
RLMCopyTask(leaderTopicIdPartition, 128);
+ Thread copyThread = new Thread(() -> {
+ try {
+ copyTask.copyLogSegmentsToRemote(mockLog);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ });
+
+ RemoteLogManager.RLMExpirationTask expirationTask =
remoteLogManager.new RLMExpirationTask(leaderTopicIdPartition);
+ Thread expirationThread = new Thread(() -> {
+ try {
+ // wait until copy thread has started copying segment data
+ copySegmentDataLatch.await();
+ expirationTask.cleanupExpiredRemoteLogSegments();
+ } catch (RemoteStorageException | ExecutionException |
InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ });
+
+ copyThread.start();
+ expirationThread.start();
+
+ copyThread.join(10_000);
+ expirationThread.join(1_000);
+
+ // Verify no segments were deleted
+ verify(remoteStorageManager,
times(0)).deleteLogSegmentData(any(RemoteLogSegmentMetadata.class));
+
+ // Run expiration task again and verify the copied segment was deleted
+ RemoteLogSegmentMetadata remoteLogSegmentMetadata =
remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition).next();
+ expirationTask.cleanupExpiredRemoteLogSegments();
+ verify(remoteStorageManager,
times(1)).deleteLogSegmentData(remoteLogSegmentMetadata);
+ }
+
@ParameterizedTest(name = "testDeletionOnRetentionBreachedSegments
retentionSize={0} retentionMs={1}")
@CsvSource(value = {"0, -1", "-1, 0"})
public void testDeletionOnRetentionBreachedSegments(long retentionSize,
@@ -2070,8 +2189,7 @@ public class RemoteLogManagerTest {
assertEquals(0,
brokerTopicStats.allTopicsStats().failedRemoteDeleteRequestRate().count());
- RemoteLogManager.RLMTask task = remoteLogManager.new
RLMTask(leaderTopicIdPartition, 128);
- task.convertToLeader(0);
+ RemoteLogManager.RLMExpirationTask task = remoteLogManager.new
RLMExpirationTask(leaderTopicIdPartition);
task.cleanupExpiredRemoteLogSegments();
assertEquals(200L, currentLogStartOffset.get());
@@ -2139,8 +2257,7 @@ public class RemoteLogManagerTest {
assertEquals(0,
brokerTopicStats.allTopicsStats().remoteDeleteRequestRate().count());
assertEquals(0,
brokerTopicStats.allTopicsStats().failedRemoteDeleteRequestRate().count());
- RemoteLogManager.RLMTask task = remoteLogManager.new
RLMTask(leaderTopicIdPartition, 128);
- task.convertToLeader(0);
+ RemoteLogManager.RLMExpirationTask task = remoteLogManager.new
RLMExpirationTask(leaderTopicIdPartition);
task.cleanupExpiredRemoteLogSegments();
assertEquals(metadata2.endOffset() + 1, currentLogStartOffset.get());
@@ -2206,7 +2323,7 @@ public class RemoteLogManagerTest {
return Optional.empty();
}).when(remoteStorageManager).deleteLogSegmentData(any(RemoteLogSegmentMetadata.class));
- RemoteLogManager.RLMTask task = remoteLogManager.new
RLMTask(leaderTopicIdPartition, 128);
+ RemoteLogManager.RLMExpirationTask task = remoteLogManager.new
RLMExpirationTask(leaderTopicIdPartition);
assertEquals(0L, yammerMetricValue("RemoteDeleteLagBytes"));
assertEquals(0L, yammerMetricValue("RemoteDeleteLagSegments"));
@@ -2214,7 +2331,6 @@ public class RemoteLogManagerTest {
assertEquals(0L,
safeLongYammerMetricValue("RemoteDeleteLagBytes,topic=" + leaderTopic));
assertEquals(0L,
safeLongYammerMetricValue("RemoteDeleteLagSegments,topic=" + leaderTopic));
- task.convertToLeader(0);
task.cleanupExpiredRemoteLogSegments();
assertEquals(200L, currentLogStartOffset.get());
@@ -2224,8 +2340,7 @@ public class RemoteLogManagerTest {
@Test
public void testDeleteRetentionMsBeingCancelledBeforeSecondDelete() throws
RemoteStorageException, ExecutionException, InterruptedException {
- RemoteLogManager.RLMTask leaderTask = remoteLogManager.new
RLMTask(leaderTopicIdPartition, 128);
- leaderTask.convertToLeader(0);
+ RemoteLogManager.RLMExpirationTask leaderTask = remoteLogManager.new
RLMExpirationTask(leaderTopicIdPartition);
when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition());
when(mockLog.logEndOffset()).thenReturn(200L);
@@ -2264,8 +2379,7 @@ public class RemoteLogManagerTest {
verify(remoteStorageManager,
never()).deleteLogSegmentData(metadataList.get(1));
// test that the 2nd log segment will be deleted by the new leader
- RemoteLogManager.RLMTask newLeaderTask = remoteLogManager.new
RLMTask(followerTopicIdPartition, 128);
- newLeaderTask.convertToLeader(1);
+ RemoteLogManager.RLMExpirationTask newLeaderTask =
remoteLogManager.new RLMExpirationTask(followerTopicIdPartition);
Iterator<RemoteLogSegmentMetadata> firstIterator =
metadataList.iterator();
firstIterator.next();
@@ -2324,8 +2438,7 @@ public class RemoteLogManagerTest {
assertEquals(0,
brokerTopicStats.allTopicsStats().remoteDeleteRequestRate().count());
assertEquals(0,
brokerTopicStats.allTopicsStats().failedRemoteDeleteRequestRate().count());
- RemoteLogManager.RLMTask task = remoteLogManager.new
RLMTask(leaderTopicIdPartition, 128);
- task.convertToLeader(0);
+ RemoteLogManager.RLMExpirationTask task = remoteLogManager.new
RLMExpirationTask(leaderTopicIdPartition);
doThrow(new RemoteStorageException("Failed to delete
segment")).when(remoteStorageManager).deleteLogSegmentData(any());
assertThrows(RemoteStorageException.class,
task::cleanupExpiredRemoteLogSegments);
@@ -2432,8 +2545,7 @@ public class RemoteLogManagerTest {
});
when(remoteLogMetadataManager.updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class)))
.thenAnswer(answer -> CompletableFuture.runAsync(() -> { }));
- RemoteLogManager.RLMTask task = remoteLogManager.new
RLMTask(leaderTopicIdPartition, 128);
- task.convertToLeader(currentLeaderEpoch);
+ RemoteLogManager.RLMExpirationTask task = remoteLogManager.new
RLMExpirationTask(leaderTopicIdPartition);
task.cleanupExpiredRemoteLogSegments();
ArgumentCaptor<RemoteLogSegmentMetadata> deletedMetadataCapture =
ArgumentCaptor.forClass(RemoteLogSegmentMetadata.class);
@@ -2462,8 +2574,7 @@ public class RemoteLogManagerTest {
return remoteLogMetadataManager;
}
}) {
- RemoteLogManager.RLMTask task = remoteLogManager.new
RLMTask(leaderTopicIdPartition, 128);
- task.convertToLeader(0);
+ RemoteLogManager.RLMExpirationTask task = remoteLogManager.new
RLMExpirationTask(leaderTopicIdPartition);
when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition());
when(mockLog.logEndOffset()).thenReturn(200L);
@@ -2862,7 +2973,7 @@ public class RemoteLogManagerTest {
@ParameterizedTest
@ValueSource(booleans = {false, true})
public void testCopyQuota(boolean quotaExceeded) throws Exception {
- RemoteLogManager.RLMTask task = setupRLMTask(quotaExceeded);
+ RemoteLogManager.RLMCopyTask task = setupRLMTask(quotaExceeded);
if (quotaExceeded) {
// Verify that the copy operation times out, since no segments can
be copied due to quota being exceeded
@@ -2921,7 +3032,7 @@ public class RemoteLogManagerTest {
}
// helper method to set up a RemoteLogManager.RLMTask for testing copy
quota behaviour
- private RemoteLogManager.RLMTask setupRLMTask(boolean quotaExceeded)
throws RemoteStorageException, IOException {
+ private RemoteLogManager.RLMCopyTask setupRLMTask(boolean quotaExceeded)
throws RemoteStorageException, IOException {
long oldSegmentStartOffset = 0L;
long nextSegmentStartOffset = 150L;
@@ -2981,8 +3092,7 @@ public class RemoteLogManagerTest {
when(rlmCopyQuotaManager.getThrottleTimeMs()).thenReturn(quotaExceeded
? 1000L : 0L);
doNothing().when(rlmCopyQuotaManager).record(anyInt());
- RemoteLogManager.RLMTask task = remoteLogManager.new
RLMTask(leaderTopicIdPartition, 128);
- task.convertToLeader(2);
+ RemoteLogManager.RLMCopyTask task = remoteLogManager.new
RLMCopyTask(leaderTopicIdPartition, 128);
return task;
}
@@ -3053,8 +3163,7 @@ public class RemoteLogManagerTest {
when(rlmCopyQuotaManager.getThrottleTimeMs()).thenReturn(0L, 1000L);
doNothing().when(rlmCopyQuotaManager).record(anyInt());
- RemoteLogManager.RLMTask task = remoteLogManager.new
RLMTask(leaderTopicIdPartition, 128);
- task.convertToLeader(2);
+ RemoteLogManager.RLMCopyTask task = remoteLogManager.new
RLMCopyTask(leaderTopicIdPartition, 128);
// Verify that the copy operation times out, since the second segment
cannot be copied due to quota being exceeded
assertThrows(AssertionFailedError.class, () ->
assertTimeoutPreemptively(Duration.ofMillis(200), () ->
task.copyLogSegmentsToRemote(mockLog)));
@@ -3074,7 +3183,7 @@ public class RemoteLogManagerTest {
remoteLogManager.startup();
remoteLogManager.onLeadershipChange(
Collections.singleton(mockPartition(leaderTopicIdPartition)),
Collections.emptySet(), topicIds);
- RemoteLogManager.RLMTask rlmTask =
remoteLogManager.rlmTask(leaderTopicIdPartition);
+ RemoteLogManager.RLMCopyTask rlmTask = (RemoteLogManager.RLMCopyTask)
remoteLogManager.rlmCopyTask(leaderTopicIdPartition);
assertNotNull(rlmTask);
rlmTask.recordLagStats(1024, 2);
assertEquals(1024,
brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).remoteCopyLagBytes());