This is an automated email from the ASF dual-hosted git repository.
satishd pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 4f88fb28f38 KAFKA-15130: Delete remote segments when deleting a topic
(#13947)
4f88fb28f38 is described below
commit 4f88fb28f38f3e461377bf688520b28a3f209b5d
Author: DL1231 <[email protected]>
AuthorDate: Fri Aug 18 20:51:09 2023 +0800
KAFKA-15130: Delete remote segments when deleting a topic (#13947)
* Delete remote segments when deleting a topic
Co-authored-by: Kamal Chandraprakash <[email protected]>
Co-authored-by: d00791190 <[email protected]>
---
.../java/kafka/log/remote/RemoteLogManager.java | 81 ++++++++--
.../main/scala/kafka/server/ReplicaManager.scala | 69 ++++++--
.../kafka/log/remote/RemoteLogManagerTest.java | 75 ++++++++-
.../kafka/admin/RemoteTopicCrudTest.scala | 87 +++++++++--
.../kafka/log/remote/RemoteIndexCacheTest.scala | 56 ++++++-
.../unit/kafka/server/ReplicaManagerTest.scala | 173 ++++++++++++++-------
.../storage/NoOpRemoteLogMetadataManager.java | 4 +-
.../storage/internals/log/RemoteIndexCache.java | 18 +++
8 files changed, 467 insertions(+), 96 deletions(-)
diff --git a/core/src/main/java/kafka/log/remote/RemoteLogManager.java
b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
index 82c0da8b584..a9372a80fa0 100644
--- a/core/src/main/java/kafka/log/remote/RemoteLogManager.java
+++ b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
@@ -84,6 +84,7 @@ import java.nio.file.Path;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -95,6 +96,7 @@ import java.util.Optional;
import java.util.OptionalInt;
import java.util.OptionalLong;
import java.util.Set;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
@@ -105,6 +107,7 @@ import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -144,7 +147,7 @@ public class RemoteLogManager implements Closeable {
private final ConcurrentHashMap<TopicIdPartition, RLMTaskWithFuture>
leaderOrFollowerTasks = new ConcurrentHashMap<>();
// topic ids that are received on leadership changes, this map is cleared
on stop partitions
- private final ConcurrentMap<TopicPartition, Uuid> topicPartitionIds = new
ConcurrentHashMap<>();
+ private final ConcurrentMap<TopicPartition, Uuid> topicIdByPartitionMap =
new ConcurrentHashMap<>();
private final String clusterId;
// The endpoint for remote log metadata manager to connect to
@@ -288,7 +291,7 @@ public class RemoteLogManager implements Closeable {
}
private void cacheTopicPartitionIds(TopicIdPartition topicIdPartition) {
- Uuid previousTopicId =
topicPartitionIds.put(topicIdPartition.topicPartition(),
topicIdPartition.topicId());
+ Uuid previousTopicId =
topicIdByPartitionMap.put(topicIdPartition.topicPartition(),
topicIdPartition.topicId());
if (previousTopicId != null && previousTopicId !=
topicIdPartition.topicId()) {
LOGGER.info("Previous cached topic id {} for {} does not match
updated topic id {}",
previousTopicId, topicIdPartition.topicPartition(),
topicIdPartition.topicId());
@@ -343,21 +346,81 @@ public class RemoteLogManager implements Closeable {
/**
* Deletes the internal topic partition info if delete flag is set as true.
*
- * @param topicPartition topic partition to be stopped.
+ * @param topicPartitions topic partitions that needs to be stopped.
* @param delete flag to indicate whether the given topic
partitions to be deleted or not.
+ * @param errorHandler callback to handle any errors while stopping the
partitions.
*/
- public void stopPartitions(TopicPartition topicPartition, boolean delete) {
+ public void stopPartitions(Set<TopicPartition> topicPartitions,
+ boolean delete,
+ BiConsumer<TopicPartition, Throwable>
errorHandler) {
+ LOGGER.debug("Stopping {} partitions, delete: {}",
topicPartitions.size(), delete);
+ Set<TopicIdPartition> topicIdPartitions = topicPartitions.stream()
+ .filter(topicIdByPartitionMap::containsKey)
+ .map(tp -> new TopicIdPartition(topicIdByPartitionMap.get(tp),
tp))
+ .collect(Collectors.toSet());
+
+ topicIdPartitions.forEach(tpId -> {
+ try {
+ RLMTaskWithFuture task = leaderOrFollowerTasks.remove(tpId);
+ if (task != null) {
+ LOGGER.info("Cancelling the RLM task for tpId: {}", tpId);
+ task.cancel();
+ }
+ if (delete) {
+ LOGGER.info("Deleting the remote log segments task for
partition: {}", tpId);
+ deleteRemoteLogPartition(tpId);
+ }
+ } catch (Exception ex) {
+ errorHandler.accept(tpId.topicPartition(), ex);
+ LOGGER.error("Error while stopping the partition: {}, delete:
{}", tpId.topicPartition(), delete, ex);
+ }
+ });
+ remoteLogMetadataManager.onStopPartitions(topicIdPartitions);
if (delete) {
- // Delete from internal datastructures only if it is to be deleted.
- Uuid topicIdPartition = topicPartitionIds.remove(topicPartition);
- LOGGER.debug("Removed partition: {} from topicPartitionIds",
topicIdPartition);
+ // NOTE: this#stopPartitions method is called when Replica state
changes to Offline and ReplicaDeletionStarted
+ topicPartitions.forEach(topicIdByPartitionMap::remove);
+ }
+ }
+
+ private void deleteRemoteLogPartition(TopicIdPartition partition) throws
RemoteStorageException, ExecutionException, InterruptedException {
+ List<RemoteLogSegmentMetadata> metadataList = new ArrayList<>();
+
remoteLogMetadataManager.listRemoteLogSegments(partition).forEachRemaining(metadataList::add);
+
+ List<RemoteLogSegmentMetadataUpdate> deleteSegmentStartedEvents =
metadataList.stream()
+ .map(metadata ->
+ new
RemoteLogSegmentMetadataUpdate(metadata.remoteLogSegmentId(),
time.milliseconds(),
+ metadata.customMetadata(),
RemoteLogSegmentState.DELETE_SEGMENT_STARTED, brokerId))
+ .collect(Collectors.toList());
+ publishEvents(deleteSegmentStartedEvents).get();
+
+ // KAFKA-15313: Delete remote log segments partition asynchronously
when a partition is deleted.
+ Collection<Uuid> deletedSegmentIds = new ArrayList<>();
+ for (RemoteLogSegmentMetadata metadata: metadataList) {
+ deletedSegmentIds.add(metadata.remoteLogSegmentId().id());
+ remoteLogStorageManager.deleteLogSegmentData(metadata);
+ }
+ indexCache.removeAll(deletedSegmentIds);
+
+ List<RemoteLogSegmentMetadataUpdate> deleteSegmentFinishedEvents =
metadataList.stream()
+ .map(metadata ->
+ new
RemoteLogSegmentMetadataUpdate(metadata.remoteLogSegmentId(),
time.milliseconds(),
+ metadata.customMetadata(),
RemoteLogSegmentState.DELETE_SEGMENT_FINISHED, brokerId))
+ .collect(Collectors.toList());
+ publishEvents(deleteSegmentFinishedEvents).get();
+ }
+
+ private CompletableFuture<Void>
publishEvents(List<RemoteLogSegmentMetadataUpdate> events) throws
RemoteStorageException {
+ List<CompletableFuture<Void>> result = new ArrayList<>();
+ for (RemoteLogSegmentMetadataUpdate event : events) {
+
result.add(remoteLogMetadataManager.updateRemoteLogSegmentMetadata(event));
}
+ return CompletableFuture.allOf(result.toArray(new
CompletableFuture[0]));
}
public Optional<RemoteLogSegmentMetadata>
fetchRemoteLogSegmentMetadata(TopicPartition topicPartition,
int epochForOffset,
long offset) throws RemoteStorageException {
- Uuid topicId = topicPartitionIds.get(topicPartition);
+ Uuid topicId = topicIdByPartitionMap.get(topicPartition);
if (topicId == null) {
throw new KafkaException("No topic id registered for topic
partition: " + topicPartition);
@@ -418,7 +481,7 @@ public class RemoteLogManager implements Closeable {
long
timestamp,
long
startingOffset,
LeaderEpochFileCache leaderEpochCache) throws RemoteStorageException,
IOException {
- Uuid topicId = topicPartitionIds.get(tp);
+ Uuid topicId = topicIdByPartitionMap.get(tp);
if (topicId == null) {
throw new KafkaException("Topic id does not exist for topic
partition: " + tp);
}
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala
b/core/src/main/scala/kafka/server/ReplicaManager.scala
index d56762a151d..52a3cc0341b 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -89,6 +89,8 @@ case class LogDeleteRecordsResult(requestedOffset: Long,
lowWatermark: Long, exc
}
}
+case class StopPartition(topicPartition: TopicPartition, deleteLocalLog:
Boolean, deleteRemoteLog: Boolean = false)
+
/**
* Result metadata of a log read operation on the log
* @param info @FetchDataInfo returned by the @Log read
@@ -456,7 +458,7 @@ class ReplicaManager(val config: KafkaConfig,
} else {
this.controllerEpoch = controllerEpoch
- val stoppedPartitions = mutable.Map.empty[TopicPartition, Boolean]
+ val stoppedPartitions = mutable.Buffer.empty[StopPartition]
partitionStates.forKeyValue { (topicPartition, partitionState) =>
val deletePartition = partitionState.deletePartition()
@@ -478,7 +480,8 @@ class ReplicaManager(val config: KafkaConfig,
if (requestLeaderEpoch == LeaderAndIsr.EpochDuringDelete ||
requestLeaderEpoch == LeaderAndIsr.NoEpoch ||
requestLeaderEpoch >= currentLeaderEpoch) {
- stoppedPartitions += topicPartition -> deletePartition
+ stoppedPartitions += StopPartition(topicPartition,
deletePartition,
+ deletePartition && partition.isLeader && requestLeaderEpoch
== LeaderAndIsr.EpochDuringDelete)
// Assume that everything will go right. It is overwritten in
case of an error.
responseMap.put(topicPartition, Errors.NONE)
} else if (requestLeaderEpoch < currentLeaderEpoch) {
@@ -499,12 +502,12 @@ class ReplicaManager(val config: KafkaConfig,
case HostedPartition.None =>
// Delete log and corresponding folders in case replica manager
doesn't hold them anymore.
// This could happen when topic is being deleted while broker is
down and recovers.
- stoppedPartitions += topicPartition -> deletePartition
+ stoppedPartitions += StopPartition(topicPartition,
deletePartition)
responseMap.put(topicPartition, Errors.NONE)
}
}
- stopPartitions(stoppedPartitions).foreach { case (topicPartition, e) =>
+ stopPartitions(stoppedPartitions.toSet).foreach { case
(topicPartition, e) =>
if (e.isInstanceOf[KafkaStorageException]) {
stateChangeLogger.error(s"Ignoring StopReplica request
(delete=true) from " +
s"controller $controllerId with correlation id $correlationId
" +
@@ -526,25 +529,39 @@ class ReplicaManager(val config: KafkaConfig,
/**
* Stop the given partitions.
*
- * @param partitionsToStop A map from a topic partition to a boolean
indicating
- * whether the partition should be deleted.
+ * @param partitionsToStop A map from a topic partition to a boolean
indicating
+ * whether the partition should be deleted.
+ * @return A map from partitions to exceptions which occurred.
+ * If no errors occurred, the map will be empty.
+ */
+ private def stopPartitions(partitionsToStop: Map[TopicPartition, Boolean]):
Map[TopicPartition, Throwable] = {
+ stopPartitions(partitionsToStop.map {
+ case (topicPartition, deleteLocalLog) => StopPartition(topicPartition,
deleteLocalLog)
+ }.toSet)
+ }
+
+ /**
+ * Stop the given partitions.
+ *
+ * @param partitionsToStop set of topic-partitions to be stopped which also
indicates whether to remove the
+ * partition data from the local and remote log
storage.
*
- * @return A map from partitions to exceptions which
occurred.
- * If no errors occurred, the map will be empty.
+ * @return A map from partitions to exceptions which
occurred.
+ * If no errors occurred, the map will be empty.
*/
- protected def stopPartitions(
- partitionsToStop: Map[TopicPartition, Boolean]
- ): Map[TopicPartition, Throwable] = {
+ private def stopPartitions(partitionsToStop: Set[StopPartition]):
Map[TopicPartition, Throwable] = {
// First stop fetchers for all partitions.
- val partitions = partitionsToStop.keySet
+ val partitions = partitionsToStop.map(_.topicPartition)
replicaFetcherManager.removeFetcherForPartitions(partitions)
replicaAlterLogDirsManager.removeFetcherForPartitions(partitions)
// Second remove deleted partitions from the partition map. Fetchers rely
on the
// ReplicaManager to get Partition's information so they must be stopped
first.
val partitionsToDelete = mutable.Set.empty[TopicPartition]
- partitionsToStop.forKeyValue { (topicPartition, shouldDelete) =>
- if (shouldDelete) {
+ val remotePartitionsToDelete = mutable.Set.empty[TopicPartition]
+ partitionsToStop.foreach { stopPartition =>
+ val topicPartition = stopPartition.topicPartition
+ if (stopPartition.deleteLocalLog) {
getPartition(topicPartition) match {
case hostedPartition: HostedPartition.Online =>
if (allPartitions.remove(topicPartition, hostedPartition)) {
@@ -558,6 +575,9 @@ class ReplicaManager(val config: KafkaConfig,
}
partitionsToDelete += topicPartition
}
+ if (stopPartition.deleteRemoteLog)
+ remotePartitionsToDelete += topicPartition
+
// If we were the leader, we may have some operations still waiting for
completion.
// We force completion to prevent them from timing out.
completeDelayedFetchOrProduceRequests(topicPartition)
@@ -569,6 +589,17 @@ class ReplicaManager(val config: KafkaConfig,
// Delete the logs and checkpoint.
logManager.asyncDelete(partitionsToDelete, (tp, e) => errorMap.put(tp,
e))
}
+ remoteLogManager.foreach { rlm =>
+ // exclude the partitions with offline/error state
+ errorMap.keySet.foreach(remotePartitionsToDelete.remove)
+ if (remotePartitionsToDelete.nonEmpty) {
+ rlm.stopPartitions(remotePartitionsToDelete.asJava, true, (tp, e) =>
errorMap.put(tp, e))
+ }
+ val remotePartitionsToNotDelete =
partitions.diff(remotePartitionsToDelete)
+ if (remotePartitionsToNotDelete.nonEmpty) {
+ rlm.stopPartitions(remotePartitionsToNotDelete.asJava, false, (tp, e)
=> errorMap.put(tp, e))
+ }
+ }
errorMap
}
@@ -2418,7 +2449,15 @@ class ReplicaManager(val config: KafkaConfig,
// Handle deleted partitions. We need to do this first because we might
subsequently
// create new partitions with the same names as the ones we are deleting
here.
if (!localChanges.deletes.isEmpty) {
- val deletes = localChanges.deletes.asScala.map(tp => (tp, true)).toMap
+ val deletes = localChanges.deletes.asScala
+ .map { tp =>
+ val isCurrentLeader = Option(delta.image().getTopic(tp.topic()))
+ .map(image => image.partitions().get(tp.partition()))
+ .exists(partition => partition.leader == config.nodeId)
+ val deleteRemoteLog = delta.topicWasDeleted(tp.topic()) &&
isCurrentLeader
+ StopPartition(tp, deleteLocalLog = true, deleteRemoteLog =
deleteRemoteLog)
+ }
+ .toSet
stateChangeLogger.info(s"Deleting ${deletes.size} partition(s).")
stopPartitions(deletes).forKeyValue { (topicPartition, e) =>
if (e.isInstanceOf[KafkaStorageException]) {
diff --git a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
index e22caf6f593..f0b89aca855 100644
--- a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
+++ b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
@@ -81,6 +81,7 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -91,6 +92,8 @@ import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
+import java.util.function.BiConsumer;
+import java.util.stream.Collectors;
import static
org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX;
import static
org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_CONSUMER_PREFIX;
@@ -105,6 +108,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
@@ -824,12 +828,12 @@ public class RemoteLogManagerTest {
verifyInCache(followerTopicIdPartition, leaderTopicIdPartition);
// Evicts from topicId cache
-
remoteLogManager.stopPartitions(leaderTopicIdPartition.topicPartition(), true);
+
remoteLogManager.stopPartitions(Collections.singleton(leaderTopicIdPartition.topicPartition()),
true, (tp, ex) -> { });
verifyNotInCache(leaderTopicIdPartition);
verifyInCache(followerTopicIdPartition);
// Evicts from topicId cache
-
remoteLogManager.stopPartitions(followerTopicIdPartition.topicPartition(),
true);
+
remoteLogManager.stopPartitions(Collections.singleton(followerTopicIdPartition.topicPartition()),
true, (tp, ex) -> { });
verifyNotInCache(leaderTopicIdPartition, followerTopicIdPartition);
}
@@ -1053,6 +1057,73 @@ public class RemoteLogManagerTest {
assertEquals(expected, actual);
}
+ @Test
+ public void testStopPartitionsWithoutDeletion() throws
RemoteStorageException {
+ BiConsumer<TopicPartition, Throwable> errorHandler = (topicPartition,
throwable) -> fail("shouldn't be called");
+ Set<TopicPartition> partitions = new HashSet<>();
+ partitions.add(leaderTopicIdPartition.topicPartition());
+ partitions.add(followerTopicIdPartition.topicPartition());
+
remoteLogManager.onLeadershipChange(Collections.singleton(mockPartition(leaderTopicIdPartition)),
+
Collections.singleton(mockPartition(followerTopicIdPartition)), topicIds);
+
+ remoteLogManager.stopPartitions(partitions, false, errorHandler);
+ verify(remoteLogMetadataManager, times(1)).onStopPartitions(any());
+ verify(remoteStorageManager, times(0)).deleteLogSegmentData(any());
+ verify(remoteLogMetadataManager,
times(0)).updateRemoteLogSegmentMetadata(any());
+ }
+
+ @Test
+ public void testStopPartitionsWithDeletion() throws RemoteStorageException
{
+ BiConsumer<TopicPartition, Throwable> errorHandler =
+ (topicPartition, ex) -> fail("shouldn't be called: " + ex);
+ Set<TopicPartition> partitions = new HashSet<>();
+ partitions.add(leaderTopicIdPartition.topicPartition());
+ partitions.add(followerTopicIdPartition.topicPartition());
+
remoteLogManager.onLeadershipChange(Collections.singleton(mockPartition(leaderTopicIdPartition)),
+
Collections.singleton(mockPartition(followerTopicIdPartition)), topicIds);
+
when(remoteLogMetadataManager.listRemoteLogSegments(eq(leaderTopicIdPartition)))
+
.thenReturn(listRemoteLogSegmentMetadata(leaderTopicIdPartition, 5, 100,
1024).iterator());
+
when(remoteLogMetadataManager.listRemoteLogSegments(eq(followerTopicIdPartition)))
+
.thenReturn(listRemoteLogSegmentMetadata(followerTopicIdPartition, 3, 100,
1024).iterator());
+ CompletableFuture<Void> dummyFuture = new CompletableFuture<>();
+ dummyFuture.complete(null);
+ when(remoteLogMetadataManager.updateRemoteLogSegmentMetadata(any()))
+ .thenReturn(dummyFuture);
+
+ remoteLogManager.stopPartitions(partitions, true, errorHandler);
+ verify(remoteLogMetadataManager, times(1)).onStopPartitions(any());
+ verify(remoteStorageManager, times(8)).deleteLogSegmentData(any());
+ verify(remoteLogMetadataManager,
times(16)).updateRemoteLogSegmentMetadata(any());
+ }
+
+ private List<RemoteLogSegmentMetadata>
listRemoteLogSegmentMetadata(TopicIdPartition topicIdPartition,
+ int
segmentCount,
+ int
recordsPerSegment,
+ int
segmentSize) {
+ List<RemoteLogSegmentMetadata> segmentMetadataList = new ArrayList<>();
+ for (int idx = 0; idx < segmentCount; idx++) {
+ long timestamp = time.milliseconds();
+ long startOffset = (long) idx * recordsPerSegment;
+ long endOffset = startOffset + recordsPerSegment - 1;
+ Map<Integer, Long> segmentLeaderEpochs =
truncateAndGetLeaderEpochs(totalEpochEntries, startOffset, endOffset);
+ segmentMetadataList.add(new RemoteLogSegmentMetadata(new
RemoteLogSegmentId(topicIdPartition,
+ Uuid.randomUuid()), startOffset, endOffset, timestamp,
brokerId, timestamp, segmentSize,
+ segmentLeaderEpochs));
+ }
+ return segmentMetadataList;
+ }
+
+ private Map<Integer, Long> truncateAndGetLeaderEpochs(List<EpochEntry>
entries,
+ Long startOffset,
+ Long endOffset) {
+ InMemoryLeaderEpochCheckpoint myCheckpoint = new
InMemoryLeaderEpochCheckpoint();
+ myCheckpoint.write(entries);
+ LeaderEpochFileCache cache = new LeaderEpochFileCache(null,
myCheckpoint);
+ cache.truncateFromStart(startOffset);
+ cache.truncateFromEnd(endOffset);
+ return myCheckpoint.read().stream().collect(Collectors.toMap(e ->
e.epoch, e -> e.startOffset));
+ }
+
private Partition mockPartition(TopicIdPartition topicIdPartition) {
TopicPartition tp = topicIdPartition.topicPartition();
Partition partition = mock(Partition.class);
diff --git
a/core/src/test/scala/integration/kafka/admin/RemoteTopicCrudTest.scala
b/core/src/test/scala/integration/kafka/admin/RemoteTopicCrudTest.scala
index 59adcf722bf..bd81b40fa8e 100644
--- a/core/src/test/scala/integration/kafka/admin/RemoteTopicCrudTest.scala
+++ b/core/src/test/scala/integration/kafka/admin/RemoteTopicCrudTest.scala
@@ -18,12 +18,14 @@ package kafka.admin
import kafka.api.IntegrationTestHarness
import kafka.server.KafkaConfig
-import kafka.utils.{TestInfoUtils, TestUtils}
+import kafka.utils.{Logging, TestInfoUtils, TestUtils}
import org.apache.kafka.clients.admin.{AlterConfigOp, ConfigEntry}
-import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid}
import org.apache.kafka.common.config.{ConfigResource, TopicConfig}
-import org.apache.kafka.common.errors.InvalidConfigurationException
-import
org.apache.kafka.server.log.remote.storage.{NoOpRemoteLogMetadataManager,
NoOpRemoteStorageManager, RemoteLogManagerConfig}
+import org.apache.kafka.common.errors.{InvalidConfigurationException,
UnknownTopicOrPartitionException}
+import org.apache.kafka.common.utils.MockTime
+import
org.apache.kafka.server.log.remote.storage.{NoOpRemoteLogMetadataManager,
NoOpRemoteStorageManager,
+ RemoteLogManagerConfig, RemoteLogSegmentId, RemoteLogSegmentMetadata,
RemoteLogSegmentState}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.function.Executable
import org.junit.jupiter.api.{BeforeEach, Tag, TestInfo}
@@ -31,7 +33,8 @@ import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
import java.util
-import java.util.{Collections, Properties}
+import java.util.concurrent.atomic.AtomicInteger
+import java.util.{Collections, Optional, Properties}
import scala.collection.Seq
import scala.concurrent.ExecutionException
import scala.util.Random
@@ -41,8 +44,11 @@ class RemoteTopicCrudTest extends IntegrationTestHarness {
val numPartitions = 2
val numReplicationFactor = 2
+
var testTopicName: String = _
var sysRemoteStorageEnabled = true
+ var storageManagerClassName: String =
classOf[NoOpRemoteStorageManager].getName
+ var metadataManagerClassName: String =
classOf[NoOpRemoteLogMetadataManager].getName
override protected def brokerCount: Int = 2
@@ -59,6 +65,10 @@ class RemoteTopicCrudTest extends IntegrationTestHarness {
if
(info.getTestMethod.get().getName.endsWith("SystemRemoteStorageIsDisabled")) {
sysRemoteStorageEnabled = false
}
+ if (info.getTestMethod.get().getName.equals("testTopicDeletion")) {
+ storageManagerClassName = classOf[MyRemoteStorageManager].getName
+ metadataManagerClassName = classOf[MyRemoteLogMetadataManager].getName
+ }
super.setUp(info)
testTopicName =
s"${info.getTestMethod.get().getName}-${Random.alphanumeric.take(10).mkString}"
}
@@ -270,6 +280,27 @@ class RemoteTopicCrudTest extends IntegrationTestHarness {
() => admin.incrementalAlterConfigs(configs).all().get(), "Invalid local
retention size")
}
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testTopicDeletion(quorum: String): Unit = {
+ val numPartitions = 2
+ val topicConfig = new Properties()
+ topicConfig.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
+ topicConfig.put(TopicConfig.RETENTION_MS_CONFIG, "200")
+ topicConfig.put(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG, "100")
+ TestUtils.createTopicWithAdmin(createAdminClient(), testTopicName,
brokers, numPartitions, brokerCount,
+ topicConfig = topicConfig)
+ TestUtils.deleteTopicWithAdmin(createAdminClient(), testTopicName, brokers)
+ assertThrowsException(classOf[UnknownTopicOrPartitionException],
+ () => TestUtils.describeTopic(createAdminClient(), testTopicName),
"Topic should be deleted")
+
+ // FIXME: It seems the storage manager is being instantiated in different
class loader so couldn't verify the value
+ // but ensured it by adding a log statement in the storage manager
(manually).
+ // assertEquals(numPartitions * MyRemoteLogMetadataManager.segmentCount,
+ // MyRemoteStorageManager.deleteSegmentEventCounter.get(),
+ // "Remote log segments should be deleted only once by the leader")
+ }
+
private def assertThrowsException(exceptionType: Class[_ <: Throwable],
executable: Executable,
message: String = ""): Throwable = {
@@ -320,11 +351,8 @@ class RemoteTopicCrudTest extends IntegrationTestHarness {
private def overrideProps(): Properties = {
val props = new Properties()
props.put(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP,
sysRemoteStorageEnabled.toString)
- props.put(RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP,
- classOf[NoOpRemoteStorageManager].getName)
-
props.put(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP,
- classOf[NoOpRemoteLogMetadataManager].getName)
-
+ props.put(RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP,
storageManagerClassName)
+
props.put(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP,
metadataManagerClassName)
props.put(KafkaConfig.LogRetentionTimeMillisProp, "2000")
props.put(RemoteLogManagerConfig.LOG_LOCAL_RETENTION_MS_PROP, "1000")
props.put(KafkaConfig.LogRetentionBytesProp, "2048")
@@ -332,3 +360,42 @@ class RemoteTopicCrudTest extends IntegrationTestHarness {
props
}
}
+
+object MyRemoteStorageManager {
+ val deleteSegmentEventCounter = new AtomicInteger(0)
+}
+
+class MyRemoteStorageManager extends NoOpRemoteStorageManager with Logging {
+ import MyRemoteStorageManager._
+
+ override def deleteLogSegmentData(remoteLogSegmentMetadata:
RemoteLogSegmentMetadata): Unit = {
+ deleteSegmentEventCounter.incrementAndGet()
+ info(s"Deleted the remote log segment: $remoteLogSegmentMetadata, counter:
${deleteSegmentEventCounter.get()}")
+ }
+}
+
+class MyRemoteLogMetadataManager extends NoOpRemoteLogMetadataManager {
+
+ import MyRemoteLogMetadataManager._
+ val time = new MockTime()
+
+ override def listRemoteLogSegments(topicIdPartition: TopicIdPartition):
util.Iterator[RemoteLogSegmentMetadata] = {
+ val segmentMetadataList = new util.ArrayList[RemoteLogSegmentMetadata]()
+ for (idx <- 0 until segmentCount) {
+ val timestamp = time.milliseconds()
+ val startOffset = idx * recordsPerSegment
+ val endOffset = startOffset + recordsPerSegment - 1
+ val segmentLeaderEpochs: util.Map[Integer, java.lang.Long] =
Collections.singletonMap(0, 0L)
+ segmentMetadataList.add(new RemoteLogSegmentMetadata(new
RemoteLogSegmentId(topicIdPartition, Uuid.randomUuid()),
+ startOffset, endOffset, timestamp, 0, timestamp, segmentSize,
Optional.empty(),
+ RemoteLogSegmentState.COPY_SEGMENT_FINISHED, segmentLeaderEpochs))
+ }
+ segmentMetadataList.iterator()
+ }
+}
+
+object MyRemoteLogMetadataManager {
+ val segmentCount = 10
+ val recordsPerSegment = 100
+ val segmentSize = 1024
+}
diff --git
a/core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala
b/core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala
index b087676bed2..4b92da4007e 100644
--- a/core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala
+++ b/core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala
@@ -34,6 +34,7 @@ import org.slf4j.{Logger, LoggerFactory}
import java.io.{File, FileInputStream}
import java.nio.file.Files
+import java.util
import java.util.Collections
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
import scala.collection.mutable
@@ -443,8 +444,59 @@ class RemoteIndexCacheTest {
verifyNoMoreInteractions(rsm)
}
- private def generateSpyCacheEntry(): RemoteIndexCache.Entry = {
- val remoteLogSegmentId = RemoteLogSegmentId.generateNew(idPartition)
+ @Test
+ def testRemoveItem(): Unit = {
+ val segmentId = rlsMetadata.remoteLogSegmentId()
+ val segmentUuid = segmentId.id()
+ // generate and add entry to cache
+ val spyEntry = generateSpyCacheEntry(segmentId)
+ cache.internalCache.put(segmentUuid, spyEntry)
+ assertTrue(cache.internalCache().asMap().containsKey(segmentUuid))
+ assertFalse(spyEntry.isMarkedForCleanup)
+
+ cache.remove(segmentId.id())
+ assertFalse(cache.internalCache().asMap().containsKey(segmentUuid))
+ TestUtils.waitUntilTrue(() => spyEntry.isMarkedForCleanup, "Failed to mark
cache entry for cleanup after invalidation")
+ }
+
+ @Test
+ def testRemoveNonExistentItem(): Unit = {
+ // generate and add entry to cache
+ val segmentId = rlsMetadata.remoteLogSegmentId()
+ val segmentUuid = segmentId.id()
+ // generate and add entry to cache
+ val spyEntry = generateSpyCacheEntry(segmentId)
+ cache.internalCache.put(segmentUuid, spyEntry)
+ assertTrue(cache.internalCache().asMap().containsKey(segmentUuid))
+
+ // remove a random Uuid
+ cache.remove(Uuid.randomUuid())
+ assertTrue(cache.internalCache().asMap().containsKey(segmentUuid))
+ assertFalse(spyEntry.isMarkedForCleanup)
+ }
+
+ @Test
+ def testRemoveMultipleItems(): Unit = {
+ // generate and add entry to cache
+ val uuidAndEntryList = new util.HashMap[Uuid, RemoteIndexCache.Entry]()
+ for (_ <- 0 until 10) {
+ val segmentId = RemoteLogSegmentId.generateNew(idPartition)
+ val segmentUuid = segmentId.id()
+ val spyEntry = generateSpyCacheEntry(segmentId)
+ uuidAndEntryList.put(segmentUuid, spyEntry)
+
+ cache.internalCache.put(segmentUuid, spyEntry)
+ assertTrue(cache.internalCache().asMap().containsKey(segmentUuid))
+ assertFalse(spyEntry.isMarkedForCleanup)
+ }
+ cache.removeAll(uuidAndEntryList.keySet())
+ uuidAndEntryList.values().forEach { entry =>
+ TestUtils.waitUntilTrue(() => entry.isMarkedForCleanup, "Failed to mark
cache entry for cleanup after invalidation")
+ }
+ }
+
+ private def generateSpyCacheEntry(remoteLogSegmentId: RemoteLogSegmentId
+ =
RemoteLogSegmentId.generateNew(idPartition)): RemoteIndexCache.Entry = {
val rlsMetadata = new RemoteLogSegmentMetadata(remoteLogSegmentId,
baseOffset, lastOffset,
time.milliseconds(), brokerId, time.milliseconds(), segmentSize,
Collections.singletonMap(0, 0L))
val timeIndex = spy(createTimeIndexForSegmentMetadata(rlsMetadata))
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 92533d0e746..e8c3c81173c 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -3283,10 +3283,12 @@ class ReplicaManagerTest {
(rm0, rm1)
}
- @Test
- def testStopReplicaWithStaleControllerEpoch(): Unit = {
+ @ParameterizedTest
+ @ValueSource(booleans = Array(true, false))
+ def testStopReplicaWithStaleControllerEpoch(enableRemoteStorage: Boolean):
Unit = {
val mockTimer = new MockTimer(time)
- val replicaManager = setupReplicaManagerWithMockedPurgatories(mockTimer,
aliveBrokerIds = Seq(0, 1))
+ val replicaManager = setupReplicaManagerWithMockedPurgatories(mockTimer,
aliveBrokerIds = Seq(0, 1),
+ enableRemoteStorage = enableRemoteStorage)
try {
val tp0 = new TopicPartition(topic, 0)
@@ -3309,15 +3311,20 @@ class ReplicaManagerTest {
val (_, error) = replicaManager.stopReplicas(1, 0, 0, partitionStates)
assertEquals(Errors.STALE_CONTROLLER_EPOCH, error)
+ if (enableRemoteStorage) {
+ verify(mockRemoteLogManager, times(0)).stopPartitions(any(), any(),
any())
+ }
} finally {
replicaManager.shutdown(checkpointHW = false)
}
}
- @Test
- def testStopReplicaWithOfflinePartition(): Unit = {
+ @ParameterizedTest
+ @ValueSource(booleans = Array(true, false))
+ def testStopReplicaWithOfflinePartition(enableRemoteStorage: Boolean): Unit
= {
val mockTimer = new MockTimer(time)
- val replicaManager = setupReplicaManagerWithMockedPurgatories(mockTimer,
aliveBrokerIds = Seq(0, 1))
+ val replicaManager = setupReplicaManagerWithMockedPurgatories(mockTimer,
aliveBrokerIds = Seq(0, 1),
+ enableRemoteStorage = enableRemoteStorage)
try {
val tp0 = new TopicPartition(topic, 0)
@@ -3342,29 +3349,38 @@ class ReplicaManagerTest {
val (result, error) = replicaManager.stopReplicas(1, 0, 0,
partitionStates)
assertEquals(Errors.NONE, error)
assertEquals(Map(tp0 -> Errors.KAFKA_STORAGE_ERROR), result)
+ if (enableRemoteStorage) {
+ verify(mockRemoteLogManager, times(0)).stopPartitions(any(), any(),
any())
+ }
} finally {
replicaManager.shutdown(checkpointHW = false)
}
}
- @Test
- def testStopReplicaWithInexistentPartition(): Unit = {
- testStopReplicaWithInexistentPartition(false, false)
+ @ParameterizedTest
+ @ValueSource(booleans = Array(true, false))
+ def testStopReplicaWithInexistentPartition(enableRemoteStorage: Boolean):
Unit = {
+ testStopReplicaWithInexistentPartition(false, false, enableRemoteStorage)
}
- @Test
- def testStopReplicaWithInexistentPartitionAndPartitionsDelete(): Unit = {
- testStopReplicaWithInexistentPartition(true, false)
+ @ParameterizedTest
+ @ValueSource(booleans = Array(true, false))
+ def
testStopReplicaWithInexistentPartitionAndPartitionsDelete(enableRemoteStorage:
Boolean): Unit = {
+ testStopReplicaWithInexistentPartition(true, false, enableRemoteStorage)
}
- @Test
- def
testStopReplicaWithInexistentPartitionAndPartitionsDeleteAndIOException(): Unit
= {
- testStopReplicaWithInexistentPartition(true, true)
+ @ParameterizedTest
+ @ValueSource(booleans = Array(true, false))
+ def
testStopReplicaWithInexistentPartitionAndPartitionsDeleteAndIOException(enableRemoteStorage:
Boolean): Unit = {
+ testStopReplicaWithInexistentPartition(true, true, enableRemoteStorage)
}
- private def testStopReplicaWithInexistentPartition(deletePartitions:
Boolean, throwIOException: Boolean): Unit = {
+ private def testStopReplicaWithInexistentPartition(deletePartitions: Boolean,
+ throwIOException: Boolean,
+ enableRemoteStorage:
Boolean): Unit = {
val mockTimer = new MockTimer(time)
- val replicaManager = setupReplicaManagerWithMockedPurgatories(mockTimer,
aliveBrokerIds = Seq(0, 1))
+ val replicaManager = setupReplicaManagerWithMockedPurgatories(mockTimer,
aliveBrokerIds = Seq(0, 1),
+ enableRemoteStorage = enableRemoteStorage)
try {
val tp0 = new TopicPartition(topic, 0)
@@ -3396,64 +3412,79 @@ class ReplicaManagerTest {
assertEquals(Map(tp0 -> Errors.NONE), result)
assertTrue(replicaManager.logManager.getLog(tp0).isDefined)
}
+ if (enableRemoteStorage) {
+ verify(mockRemoteLogManager, times(1))
+ .stopPartitions(ArgumentMatchers.eq(Collections.singleton(tp0)),
ArgumentMatchers.eq(false), any())
+ }
} finally {
replicaManager.shutdown(checkpointHW = false)
}
}
- @Test
- def testStopReplicaWithExistingPartitionAndNewerLeaderEpoch(): Unit = {
- testStopReplicaWithExistingPartition(2, false, false, Errors.NONE)
+ @ParameterizedTest
+ @ValueSource(booleans = Array(true, false))
+ def
testStopReplicaWithExistingPartitionAndNewerLeaderEpoch(enableRemoteStorage:
Boolean): Unit = {
+ testStopReplicaWithExistingPartition(2, false, false, Errors.NONE,
enableRemoteStorage)
}
- @Test
- def testStopReplicaWithExistingPartitionAndOlderLeaderEpoch(): Unit = {
- testStopReplicaWithExistingPartition(0, false, false,
Errors.FENCED_LEADER_EPOCH)
+ @ParameterizedTest
+ @ValueSource(booleans = Array(true, false))
+ def
testStopReplicaWithExistingPartitionAndOlderLeaderEpoch(enableRemoteStorage:
Boolean): Unit = {
+ testStopReplicaWithExistingPartition(0, false, false,
Errors.FENCED_LEADER_EPOCH, enableRemoteStorage)
}
- @Test
- def testStopReplicaWithExistingPartitionAndEqualLeaderEpoch(): Unit = {
- testStopReplicaWithExistingPartition(1, false, false, Errors.NONE)
+ @ParameterizedTest
+ @ValueSource(booleans = Array(true, false))
+ def
testStopReplicaWithExistingPartitionAndEqualLeaderEpoch(enableRemoteStorage:
Boolean): Unit = {
+ testStopReplicaWithExistingPartition(1, false, false, Errors.NONE,
enableRemoteStorage)
}
- @Test
- def testStopReplicaWithExistingPartitionAndDeleteSentinel(): Unit = {
- testStopReplicaWithExistingPartition(LeaderAndIsr.EpochDuringDelete,
false, false, Errors.NONE)
+ @ParameterizedTest
+ @ValueSource(booleans = Array(true, false))
+ def
testStopReplicaWithExistingPartitionAndDeleteSentinel(enableRemoteStorage:
Boolean): Unit = {
+ testStopReplicaWithExistingPartition(LeaderAndIsr.EpochDuringDelete,
false, false, Errors.NONE, enableRemoteStorage)
}
- @Test
- def testStopReplicaWithExistingPartitionAndLeaderEpochNotProvided(): Unit = {
- testStopReplicaWithExistingPartition(LeaderAndIsr.NoEpoch, false, false,
Errors.NONE)
+ @ParameterizedTest
+ @ValueSource(booleans = Array(true, false))
+ def
testStopReplicaWithExistingPartitionAndLeaderEpochNotProvided(enableRemoteStorage:
Boolean): Unit = {
+ testStopReplicaWithExistingPartition(LeaderAndIsr.NoEpoch, false, false,
Errors.NONE, enableRemoteStorage)
}
- @Test
- def
testStopReplicaWithDeletePartitionAndExistingPartitionAndNewerLeaderEpoch():
Unit = {
- testStopReplicaWithExistingPartition(2, true, false, Errors.NONE)
+ @ParameterizedTest
+ @ValueSource(booleans = Array(true, false))
+ def
testStopReplicaWithDeletePartitionAndExistingPartitionAndNewerLeaderEpoch(enableRemoteStorage:
Boolean): Unit = {
+ testStopReplicaWithExistingPartition(2, true, false, Errors.NONE,
enableRemoteStorage)
}
- @Test
- def
testStopReplicaWithDeletePartitionAndExistingPartitionAndNewerLeaderEpochAndIOException():
Unit = {
- testStopReplicaWithExistingPartition(2, true, true,
Errors.KAFKA_STORAGE_ERROR)
+ @ParameterizedTest
+ @ValueSource(booleans = Array(true, false))
+ def
testStopReplicaWithDeletePartitionAndExistingPartitionAndNewerLeaderEpochAndIOException(enableRemoteStorage:
Boolean): Unit = {
+ testStopReplicaWithExistingPartition(2, true, true,
Errors.KAFKA_STORAGE_ERROR, enableRemoteStorage)
}
- @Test
- def
testStopReplicaWithDeletePartitionAndExistingPartitionAndOlderLeaderEpoch():
Unit = {
- testStopReplicaWithExistingPartition(0, true, false,
Errors.FENCED_LEADER_EPOCH)
+ @ParameterizedTest
+ @ValueSource(booleans = Array(true, false))
+ def
testStopReplicaWithDeletePartitionAndExistingPartitionAndOlderLeaderEpoch(enableRemoteStorage:
Boolean): Unit = {
+ testStopReplicaWithExistingPartition(0, true, false,
Errors.FENCED_LEADER_EPOCH, enableRemoteStorage)
}
- @Test
- def
testStopReplicaWithDeletePartitionAndExistingPartitionAndEqualLeaderEpoch():
Unit = {
- testStopReplicaWithExistingPartition(1, true, false, Errors.NONE)
+ @ParameterizedTest
+ @ValueSource(booleans = Array(true, false))
+ def
testStopReplicaWithDeletePartitionAndExistingPartitionAndEqualLeaderEpoch(enableRemoteStorage:
Boolean): Unit = {
+ testStopReplicaWithExistingPartition(1, true, false, Errors.NONE,
enableRemoteStorage)
}
- @Test
- def
testStopReplicaWithDeletePartitionAndExistingPartitionAndDeleteSentinel(): Unit
= {
- testStopReplicaWithExistingPartition(LeaderAndIsr.EpochDuringDelete, true,
false, Errors.NONE)
+ @ParameterizedTest
+ @ValueSource(booleans = Array(true, false))
+ def
testStopReplicaWithDeletePartitionAndExistingPartitionAndDeleteSentinel(enableRemoteStorage:
Boolean): Unit = {
+ testStopReplicaWithExistingPartition(LeaderAndIsr.EpochDuringDelete, true,
false, Errors.NONE, enableRemoteStorage)
}
- @Test
- def
testStopReplicaWithDeletePartitionAndExistingPartitionAndLeaderEpochNotProvided():
Unit = {
- testStopReplicaWithExistingPartition(LeaderAndIsr.NoEpoch, true, false,
Errors.NONE)
+ @ParameterizedTest
+ @ValueSource(booleans = Array(true, false))
+ def
testStopReplicaWithDeletePartitionAndExistingPartitionAndLeaderEpochNotProvided(enableRemoteStorage:
Boolean): Unit = {
+ testStopReplicaWithExistingPartition(LeaderAndIsr.NoEpoch, true, false,
Errors.NONE, enableRemoteStorage)
}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@@ -3698,9 +3729,11 @@ class ReplicaManagerTest {
private def testStopReplicaWithExistingPartition(leaderEpoch: Int,
deletePartition: Boolean,
throwIOException: Boolean,
- expectedOutput: Errors):
Unit = {
+ expectedOutput: Errors,
+ enableRemoteStorage:
Boolean): Unit = {
val mockTimer = new MockTimer(time)
- val replicaManager = setupReplicaManagerWithMockedPurgatories(mockTimer,
aliveBrokerIds = Seq(0, 1))
+ val replicaManager = setupReplicaManagerWithMockedPurgatories(mockTimer,
aliveBrokerIds = Seq(0, 1),
+ enableRemoteStorage = enableRemoteStorage)
try {
val tp0 = new TopicPartition(topic, 0)
@@ -3762,6 +3795,15 @@ class ReplicaManagerTest {
assertEquals(HostedPartition.None, replicaManager.getPartition(tp0))
assertFalse(readRecoveryPointCheckpoint().contains(tp0))
assertFalse(readLogStartOffsetCheckpoint().contains(tp0))
+ if (enableRemoteStorage) {
+
verify(mockRemoteLogManager).stopPartitions(ArgumentMatchers.eq(Collections.singleton(tp0)),
+ ArgumentMatchers.eq(leaderEpoch ==
LeaderAndIsr.EpochDuringDelete), any())
+ }
+ }
+
+ if (expectedOutput == Errors.NONE && !deletePartition &&
enableRemoteStorage) {
+
verify(mockRemoteLogManager).stopPartitions(ArgumentMatchers.eq(Collections.singleton(tp0)),
+ ArgumentMatchers.eq(false), any())
}
} finally {
replicaManager.shutdown(checkpointHW = false)
@@ -4409,7 +4451,11 @@ class ReplicaManagerTest {
val notReplicaMetadataImage =
imageFromTopics(notReplicaTopicsDelta.apply())
replicaManager.applyDelta(notReplicaTopicsDelta, notReplicaMetadataImage)
- verify(mockRemoteLogManager, never()).onLeadershipChange(anySet(),
anySet(), anyMap())
+ if (enableRemoteStorage) {
+ verify(mockRemoteLogManager, never()).onLeadershipChange(anySet(),
anySet(), anyMap())
+ verify(mockRemoteLogManager, times(1)).stopPartitions(
+ ArgumentMatchers.eq(Collections.singleton(topicPartition)),
ArgumentMatchers.eq(false), any())
+ }
// Check that the partition was removed
assertEquals(HostedPartition.None,
replicaManager.getPartition(topicPartition))
@@ -4451,7 +4497,12 @@ class ReplicaManagerTest {
val removeTopicsDelta = topicsDeleteDelta(followerMetadataImage.topics())
val removeMetadataImage = imageFromTopics(removeTopicsDelta.apply())
replicaManager.applyDelta(removeTopicsDelta, removeMetadataImage)
- verify(mockRemoteLogManager, never()).onLeadershipChange(anySet(),
anySet(), anyMap())
+
+ if (enableRemoteStorage) {
+ verify(mockRemoteLogManager, never()).onLeadershipChange(anySet(),
anySet(), anyMap())
+ verify(mockRemoteLogManager, times(1)).stopPartitions(
+ ArgumentMatchers.eq(Collections.singleton(topicPartition)),
ArgumentMatchers.eq(false), any())
+ }
// Check that the partition was removed
assertEquals(HostedPartition.None,
replicaManager.getPartition(topicPartition))
@@ -4493,7 +4544,12 @@ class ReplicaManagerTest {
val notReplicaTopicsDelta =
topicsChangeDelta(leaderMetadataImage.topics(), otherId, true)
val notReplicaMetadataImage =
imageFromTopics(notReplicaTopicsDelta.apply())
replicaManager.applyDelta(notReplicaTopicsDelta, notReplicaMetadataImage)
- verify(mockRemoteLogManager, never()).onLeadershipChange(anySet(),
anySet(), anyMap())
+
+ if (enableRemoteStorage) {
+ verify(mockRemoteLogManager, never()).onLeadershipChange(anySet(),
anySet(), anyMap())
+ verify(mockRemoteLogManager, times(1)).stopPartitions(
+ ArgumentMatchers.eq(Collections.singleton(topicPartition)),
ArgumentMatchers.eq(false), any())
+ }
// Check that the partition was removed
assertEquals(HostedPartition.None,
replicaManager.getPartition(topicPartition))
@@ -4535,7 +4591,12 @@ class ReplicaManagerTest {
val removeTopicsDelta = topicsDeleteDelta(leaderMetadataImage.topics())
val removeMetadataImage = imageFromTopics(removeTopicsDelta.apply())
replicaManager.applyDelta(removeTopicsDelta, removeMetadataImage)
- verify(mockRemoteLogManager, never()).onLeadershipChange(anySet(),
anySet(), anyMap())
+
+ if (enableRemoteStorage) {
+ verify(mockRemoteLogManager, never()).onLeadershipChange(anySet(),
anySet(), anyMap())
+ verify(mockRemoteLogManager, times(1)).stopPartitions(
+ ArgumentMatchers.eq(Collections.singleton(topicPartition)),
ArgumentMatchers.eq(true), any())
+ }
// Check that the partition was removed
assertEquals(HostedPartition.None,
replicaManager.getPartition(topicPartition))
diff --git
a/storage/api/src/test/java/org/apache/kafka/server/log/remote/storage/NoOpRemoteLogMetadataManager.java
b/storage/api/src/test/java/org/apache/kafka/server/log/remote/storage/NoOpRemoteLogMetadataManager.java
index 71881517182..6ffcf85b9b0 100644
---
a/storage/api/src/test/java/org/apache/kafka/server/log/remote/storage/NoOpRemoteLogMetadataManager.java
+++
b/storage/api/src/test/java/org/apache/kafka/server/log/remote/storage/NoOpRemoteLogMetadataManager.java
@@ -29,12 +29,12 @@ import java.util.concurrent.CompletableFuture;
public class NoOpRemoteLogMetadataManager implements RemoteLogMetadataManager {
@Override
public CompletableFuture<Void>
addRemoteLogSegmentMetadata(RemoteLogSegmentMetadata remoteLogSegmentMetadata) {
- return null;
+ return CompletableFuture.completedFuture(null);
}
@Override
public CompletableFuture<Void>
updateRemoteLogSegmentMetadata(RemoteLogSegmentMetadataUpdate
remoteLogSegmentMetadataUpdate) {
- return null;
+ return CompletableFuture.completedFuture(null);
}
@Override
diff --git
a/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java
b/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java
index d730eeee31e..30c4da4ce44 100644
---
a/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java
+++
b/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java
@@ -166,6 +166,24 @@ public class RemoteIndexCache implements Closeable {
return internalCache;
}
+ public void remove(Uuid key) {
+ lock.writeLock().lock();
+ try {
+ internalCache.invalidate(key);
+ } finally {
+ lock.writeLock().unlock();
+ }
+ }
+
+ public void removeAll(Collection<Uuid> keys) {
+ lock.writeLock().lock();
+ try {
+ internalCache.invalidateAll(keys);
+ } finally {
+ lock.writeLock().unlock();
+ }
+ }
+
// Visible for testing
public ShutdownableThread cleanerThread() {
return cleanerThread;