This is an automated email from the ASF dual-hosted git repository.

satishd pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 4f88fb28f38 KAFKA-15130: Delete remote segments when deleting a topic 
(#13947)
4f88fb28f38 is described below

commit 4f88fb28f38f3e461377bf688520b28a3f209b5d
Author: DL1231 <[email protected]>
AuthorDate: Fri Aug 18 20:51:09 2023 +0800

    KAFKA-15130: Delete remote segments when deleting a topic (#13947)
    
    * Delete remote segments when deleting a topic
    
    Co-authored-by: Kamal Chandraprakash <[email protected]>
    Co-authored-by: d00791190 <[email protected]>
---
 .../java/kafka/log/remote/RemoteLogManager.java    |  81 ++++++++--
 .../main/scala/kafka/server/ReplicaManager.scala   |  69 ++++++--
 .../kafka/log/remote/RemoteLogManagerTest.java     |  75 ++++++++-
 .../kafka/admin/RemoteTopicCrudTest.scala          |  87 +++++++++--
 .../kafka/log/remote/RemoteIndexCacheTest.scala    |  56 ++++++-
 .../unit/kafka/server/ReplicaManagerTest.scala     | 173 ++++++++++++++-------
 .../storage/NoOpRemoteLogMetadataManager.java      |   4 +-
 .../storage/internals/log/RemoteIndexCache.java    |  18 +++
 8 files changed, 467 insertions(+), 96 deletions(-)

diff --git a/core/src/main/java/kafka/log/remote/RemoteLogManager.java 
b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
index 82c0da8b584..a9372a80fa0 100644
--- a/core/src/main/java/kafka/log/remote/RemoteLogManager.java
+++ b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
@@ -84,6 +84,7 @@ import java.nio.file.Path;
 import java.security.AccessController;
 import java.security.PrivilegedAction;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -95,6 +96,7 @@ import java.util.Optional;
 import java.util.OptionalInt;
 import java.util.OptionalLong;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutionException;
@@ -105,6 +107,7 @@ import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BiConsumer;
 import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.stream.Collectors;
@@ -144,7 +147,7 @@ public class RemoteLogManager implements Closeable {
     private final ConcurrentHashMap<TopicIdPartition, RLMTaskWithFuture> 
leaderOrFollowerTasks = new ConcurrentHashMap<>();
 
     // topic ids that are received on leadership changes, this map is cleared 
on stop partitions
-    private final ConcurrentMap<TopicPartition, Uuid> topicPartitionIds = new 
ConcurrentHashMap<>();
+    private final ConcurrentMap<TopicPartition, Uuid> topicIdByPartitionMap = 
new ConcurrentHashMap<>();
     private final String clusterId;
 
     // The endpoint for remote log metadata manager to connect to
@@ -288,7 +291,7 @@ public class RemoteLogManager implements Closeable {
     }
 
     private void cacheTopicPartitionIds(TopicIdPartition topicIdPartition) {
-        Uuid previousTopicId = 
topicPartitionIds.put(topicIdPartition.topicPartition(), 
topicIdPartition.topicId());
+        Uuid previousTopicId = 
topicIdByPartitionMap.put(topicIdPartition.topicPartition(), 
topicIdPartition.topicId());
         if (previousTopicId != null && previousTopicId != 
topicIdPartition.topicId()) {
             LOGGER.info("Previous cached topic id {} for {} does not match 
updated topic id {}",
                     previousTopicId, topicIdPartition.topicPartition(), 
topicIdPartition.topicId());
@@ -343,21 +346,81 @@ public class RemoteLogManager implements Closeable {
     /**
      * Deletes the internal topic partition info if delete flag is set as true.
      *
-     * @param topicPartition topic partition to be stopped.
+     * @param topicPartitions topic partitions that needs to be stopped.
      * @param delete         flag to indicate whether the given topic 
partitions to be deleted or not.
+     * @param errorHandler   callback to handle any errors while stopping the 
partitions.
      */
-    public void stopPartitions(TopicPartition topicPartition, boolean delete) {
+    public void stopPartitions(Set<TopicPartition> topicPartitions,
+                               boolean delete,
+                               BiConsumer<TopicPartition, Throwable> 
errorHandler) {
+        LOGGER.debug("Stopping {} partitions, delete: {}", 
topicPartitions.size(), delete);
+        Set<TopicIdPartition> topicIdPartitions = topicPartitions.stream()
+                .filter(topicIdByPartitionMap::containsKey)
+                .map(tp -> new TopicIdPartition(topicIdByPartitionMap.get(tp), 
tp))
+                .collect(Collectors.toSet());
+
+        topicIdPartitions.forEach(tpId -> {
+            try {
+                RLMTaskWithFuture task = leaderOrFollowerTasks.remove(tpId);
+                if (task != null) {
+                    LOGGER.info("Cancelling the RLM task for tpId: {}", tpId);
+                    task.cancel();
+                }
+                if (delete) {
+                    LOGGER.info("Deleting the remote log segments task for 
partition: {}", tpId);
+                    deleteRemoteLogPartition(tpId);
+                }
+            } catch (Exception ex) {
+                errorHandler.accept(tpId.topicPartition(), ex);
+                LOGGER.error("Error while stopping the partition: {}, delete: 
{}", tpId.topicPartition(), delete, ex);
+            }
+        });
+        remoteLogMetadataManager.onStopPartitions(topicIdPartitions);
         if (delete) {
-            // Delete from internal datastructures only if it is to be deleted.
-            Uuid topicIdPartition = topicPartitionIds.remove(topicPartition);
-            LOGGER.debug("Removed partition: {} from topicPartitionIds", 
topicIdPartition);
+            // NOTE: this#stopPartitions method is called when Replica state 
changes to Offline and ReplicaDeletionStarted
+            topicPartitions.forEach(topicIdByPartitionMap::remove);
+        }
+    }
+
+    private void deleteRemoteLogPartition(TopicIdPartition partition) throws 
RemoteStorageException, ExecutionException, InterruptedException {
+        List<RemoteLogSegmentMetadata> metadataList = new ArrayList<>();
+        
remoteLogMetadataManager.listRemoteLogSegments(partition).forEachRemaining(metadataList::add);
+
+        List<RemoteLogSegmentMetadataUpdate> deleteSegmentStartedEvents = 
metadataList.stream()
+                .map(metadata ->
+                        new 
RemoteLogSegmentMetadataUpdate(metadata.remoteLogSegmentId(), 
time.milliseconds(),
+                                metadata.customMetadata(), 
RemoteLogSegmentState.DELETE_SEGMENT_STARTED, brokerId))
+                .collect(Collectors.toList());
+        publishEvents(deleteSegmentStartedEvents).get();
+
+        // KAFKA-15313: Delete remote log segments partition asynchronously 
when a partition is deleted.
+        Collection<Uuid> deletedSegmentIds = new ArrayList<>();
+        for (RemoteLogSegmentMetadata metadata: metadataList) {
+            deletedSegmentIds.add(metadata.remoteLogSegmentId().id());
+            remoteLogStorageManager.deleteLogSegmentData(metadata);
+        }
+        indexCache.removeAll(deletedSegmentIds);
+
+        List<RemoteLogSegmentMetadataUpdate> deleteSegmentFinishedEvents = 
metadataList.stream()
+                .map(metadata ->
+                        new 
RemoteLogSegmentMetadataUpdate(metadata.remoteLogSegmentId(), 
time.milliseconds(),
+                                metadata.customMetadata(), 
RemoteLogSegmentState.DELETE_SEGMENT_FINISHED, brokerId))
+                .collect(Collectors.toList());
+        publishEvents(deleteSegmentFinishedEvents).get();
+    }
+
+    private CompletableFuture<Void> 
publishEvents(List<RemoteLogSegmentMetadataUpdate> events) throws 
RemoteStorageException {
+        List<CompletableFuture<Void>> result = new ArrayList<>();
+        for (RemoteLogSegmentMetadataUpdate event : events) {
+            
result.add(remoteLogMetadataManager.updateRemoteLogSegmentMetadata(event));
         }
+        return CompletableFuture.allOf(result.toArray(new 
CompletableFuture[0]));
     }
 
     public Optional<RemoteLogSegmentMetadata> 
fetchRemoteLogSegmentMetadata(TopicPartition topicPartition,
                                                                             
int epochForOffset,
                                                                             
long offset) throws RemoteStorageException {
-        Uuid topicId = topicPartitionIds.get(topicPartition);
+        Uuid topicId = topicIdByPartitionMap.get(topicPartition);
 
         if (topicId == null) {
             throw new KafkaException("No topic id registered for topic 
partition: " + topicPartition);
@@ -418,7 +481,7 @@ public class RemoteLogManager implements Closeable {
                                                                           long 
timestamp,
                                                                           long 
startingOffset,
                                                                           
LeaderEpochFileCache leaderEpochCache) throws RemoteStorageException, 
IOException {
-        Uuid topicId = topicPartitionIds.get(tp);
+        Uuid topicId = topicIdByPartitionMap.get(tp);
         if (topicId == null) {
             throw new KafkaException("Topic id does not exist for topic 
partition: " + tp);
         }
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala 
b/core/src/main/scala/kafka/server/ReplicaManager.scala
index d56762a151d..52a3cc0341b 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -89,6 +89,8 @@ case class LogDeleteRecordsResult(requestedOffset: Long, 
lowWatermark: Long, exc
   }
 }
 
+case class StopPartition(topicPartition: TopicPartition, deleteLocalLog: 
Boolean, deleteRemoteLog: Boolean = false)
+
 /**
  * Result metadata of a log read operation on the log
  * @param info @FetchDataInfo returned by the @Log read
@@ -456,7 +458,7 @@ class ReplicaManager(val config: KafkaConfig,
       } else {
         this.controllerEpoch = controllerEpoch
 
-        val stoppedPartitions = mutable.Map.empty[TopicPartition, Boolean]
+        val stoppedPartitions = mutable.Buffer.empty[StopPartition]
         partitionStates.forKeyValue { (topicPartition, partitionState) =>
           val deletePartition = partitionState.deletePartition()
 
@@ -478,7 +480,8 @@ class ReplicaManager(val config: KafkaConfig,
               if (requestLeaderEpoch == LeaderAndIsr.EpochDuringDelete ||
                   requestLeaderEpoch == LeaderAndIsr.NoEpoch ||
                   requestLeaderEpoch >= currentLeaderEpoch) {
-                stoppedPartitions += topicPartition -> deletePartition
+                stoppedPartitions += StopPartition(topicPartition, 
deletePartition,
+                  deletePartition && partition.isLeader && requestLeaderEpoch 
== LeaderAndIsr.EpochDuringDelete)
                 // Assume that everything will go right. It is overwritten in 
case of an error.
                 responseMap.put(topicPartition, Errors.NONE)
               } else if (requestLeaderEpoch < currentLeaderEpoch) {
@@ -499,12 +502,12 @@ class ReplicaManager(val config: KafkaConfig,
             case HostedPartition.None =>
               // Delete log and corresponding folders in case replica manager 
doesn't hold them anymore.
               // This could happen when topic is being deleted while broker is 
down and recovers.
-              stoppedPartitions += topicPartition -> deletePartition
+              stoppedPartitions += StopPartition(topicPartition, 
deletePartition)
               responseMap.put(topicPartition, Errors.NONE)
           }
         }
 
-        stopPartitions(stoppedPartitions).foreach { case (topicPartition, e) =>
+        stopPartitions(stoppedPartitions.toSet).foreach { case 
(topicPartition, e) =>
           if (e.isInstanceOf[KafkaStorageException]) {
               stateChangeLogger.error(s"Ignoring StopReplica request 
(delete=true) from " +
                 s"controller $controllerId with correlation id $correlationId 
" +
@@ -526,25 +529,39 @@ class ReplicaManager(val config: KafkaConfig,
   /**
    * Stop the given partitions.
    *
-   * @param partitionsToStop    A map from a topic partition to a boolean 
indicating
-   *                            whether the partition should be deleted.
+   * @param partitionsToStop A map from a topic partition to a boolean 
indicating
+   *                         whether the partition should be deleted.
+   * @return A map from partitions to exceptions which occurred.
+   *         If no errors occurred, the map will be empty.
+   */
+  private def stopPartitions(partitionsToStop: Map[TopicPartition, Boolean]): 
Map[TopicPartition, Throwable] = {
+    stopPartitions(partitionsToStop.map {
+      case (topicPartition, deleteLocalLog) => StopPartition(topicPartition, 
deleteLocalLog)
+    }.toSet)
+  }
+
+  /**
+   * Stop the given partitions.
+   *
+   * @param partitionsToStop set of topic-partitions to be stopped which also 
indicates whether to remove the
+   *                         partition data from the local and remote log 
storage.
    *
-   * @return                    A map from partitions to exceptions which 
occurred.
-   *                            If no errors occurred, the map will be empty.
+   * @return                 A map from partitions to exceptions which 
occurred.
+   *                         If no errors occurred, the map will be empty.
    */
-  protected def stopPartitions(
-    partitionsToStop: Map[TopicPartition, Boolean]
-  ): Map[TopicPartition, Throwable] = {
+  private def stopPartitions(partitionsToStop: Set[StopPartition]): 
Map[TopicPartition, Throwable] = {
     // First stop fetchers for all partitions.
-    val partitions = partitionsToStop.keySet
+    val partitions = partitionsToStop.map(_.topicPartition)
     replicaFetcherManager.removeFetcherForPartitions(partitions)
     replicaAlterLogDirsManager.removeFetcherForPartitions(partitions)
 
     // Second remove deleted partitions from the partition map. Fetchers rely 
on the
     // ReplicaManager to get Partition's information so they must be stopped 
first.
     val partitionsToDelete = mutable.Set.empty[TopicPartition]
-    partitionsToStop.forKeyValue { (topicPartition, shouldDelete) =>
-      if (shouldDelete) {
+    val remotePartitionsToDelete = mutable.Set.empty[TopicPartition]
+    partitionsToStop.foreach { stopPartition =>
+      val topicPartition = stopPartition.topicPartition
+      if (stopPartition.deleteLocalLog) {
         getPartition(topicPartition) match {
           case hostedPartition: HostedPartition.Online =>
             if (allPartitions.remove(topicPartition, hostedPartition)) {
@@ -558,6 +575,9 @@ class ReplicaManager(val config: KafkaConfig,
         }
         partitionsToDelete += topicPartition
       }
+      if (stopPartition.deleteRemoteLog)
+        remotePartitionsToDelete += topicPartition
+
       // If we were the leader, we may have some operations still waiting for 
completion.
       // We force completion to prevent them from timing out.
       completeDelayedFetchOrProduceRequests(topicPartition)
@@ -569,6 +589,17 @@ class ReplicaManager(val config: KafkaConfig,
       // Delete the logs and checkpoint.
       logManager.asyncDelete(partitionsToDelete, (tp, e) => errorMap.put(tp, 
e))
     }
+    remoteLogManager.foreach { rlm =>
+      // exclude the partitions with offline/error state
+      errorMap.keySet.foreach(remotePartitionsToDelete.remove)
+      if (remotePartitionsToDelete.nonEmpty) {
+        rlm.stopPartitions(remotePartitionsToDelete.asJava, true, (tp, e) => 
errorMap.put(tp, e))
+      }
+      val remotePartitionsToNotDelete = 
partitions.diff(remotePartitionsToDelete)
+      if (remotePartitionsToNotDelete.nonEmpty) {
+        rlm.stopPartitions(remotePartitionsToNotDelete.asJava, false, (tp, e) 
=> errorMap.put(tp, e))
+      }
+    }
     errorMap
   }
 
@@ -2418,7 +2449,15 @@ class ReplicaManager(val config: KafkaConfig,
       // Handle deleted partitions. We need to do this first because we might 
subsequently
       // create new partitions with the same names as the ones we are deleting 
here.
       if (!localChanges.deletes.isEmpty) {
-        val deletes = localChanges.deletes.asScala.map(tp => (tp, true)).toMap
+        val deletes = localChanges.deletes.asScala
+          .map { tp =>
+            val isCurrentLeader = Option(delta.image().getTopic(tp.topic()))
+              .map(image => image.partitions().get(tp.partition()))
+              .exists(partition => partition.leader == config.nodeId)
+            val deleteRemoteLog = delta.topicWasDeleted(tp.topic()) && 
isCurrentLeader
+            StopPartition(tp, deleteLocalLog = true, deleteRemoteLog = 
deleteRemoteLog)
+          }
+          .toSet
         stateChangeLogger.info(s"Deleting ${deletes.size} partition(s).")
         stopPartitions(deletes).forKeyValue { (topicPartition, e) =>
           if (e.isInstanceOf[KafkaStorageException]) {
diff --git a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java 
b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
index e22caf6f593..f0b89aca855 100644
--- a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
+++ b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
@@ -81,6 +81,7 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -91,6 +92,8 @@ import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
+import java.util.function.BiConsumer;
+import java.util.stream.Collectors;
 
 import static 
org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX;
 import static 
org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_CONSUMER_PREFIX;
@@ -105,6 +108,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyLong;
@@ -824,12 +828,12 @@ public class RemoteLogManagerTest {
         verifyInCache(followerTopicIdPartition, leaderTopicIdPartition);
 
         // Evicts from topicId cache
-        
remoteLogManager.stopPartitions(leaderTopicIdPartition.topicPartition(), true);
+        
remoteLogManager.stopPartitions(Collections.singleton(leaderTopicIdPartition.topicPartition()),
 true, (tp, ex) -> { });
         verifyNotInCache(leaderTopicIdPartition);
         verifyInCache(followerTopicIdPartition);
 
         // Evicts from topicId cache
-        
remoteLogManager.stopPartitions(followerTopicIdPartition.topicPartition(), 
true);
+        
remoteLogManager.stopPartitions(Collections.singleton(followerTopicIdPartition.topicPartition()),
 true, (tp, ex) -> { });
         verifyNotInCache(leaderTopicIdPartition, followerTopicIdPartition);
     }
 
@@ -1053,6 +1057,73 @@ public class RemoteLogManagerTest {
         assertEquals(expected, actual);
     }
 
+    @Test
+    public void testStopPartitionsWithoutDeletion() throws 
RemoteStorageException {
+        BiConsumer<TopicPartition, Throwable> errorHandler = (topicPartition, 
throwable) -> fail("shouldn't be called");
+        Set<TopicPartition> partitions = new HashSet<>();
+        partitions.add(leaderTopicIdPartition.topicPartition());
+        partitions.add(followerTopicIdPartition.topicPartition());
+        
remoteLogManager.onLeadershipChange(Collections.singleton(mockPartition(leaderTopicIdPartition)),
+                
Collections.singleton(mockPartition(followerTopicIdPartition)), topicIds);
+
+        remoteLogManager.stopPartitions(partitions, false, errorHandler);
+        verify(remoteLogMetadataManager, times(1)).onStopPartitions(any());
+        verify(remoteStorageManager, times(0)).deleteLogSegmentData(any());
+        verify(remoteLogMetadataManager, 
times(0)).updateRemoteLogSegmentMetadata(any());
+    }
+
+    @Test
+    public void testStopPartitionsWithDeletion() throws RemoteStorageException 
{
+        BiConsumer<TopicPartition, Throwable> errorHandler =
+                (topicPartition, ex) -> fail("shouldn't be called: " + ex);
+        Set<TopicPartition> partitions = new HashSet<>();
+        partitions.add(leaderTopicIdPartition.topicPartition());
+        partitions.add(followerTopicIdPartition.topicPartition());
+        
remoteLogManager.onLeadershipChange(Collections.singleton(mockPartition(leaderTopicIdPartition)),
+                
Collections.singleton(mockPartition(followerTopicIdPartition)), topicIds);
+        
when(remoteLogMetadataManager.listRemoteLogSegments(eq(leaderTopicIdPartition)))
+                
.thenReturn(listRemoteLogSegmentMetadata(leaderTopicIdPartition, 5, 100, 
1024).iterator());
+        
when(remoteLogMetadataManager.listRemoteLogSegments(eq(followerTopicIdPartition)))
+                
.thenReturn(listRemoteLogSegmentMetadata(followerTopicIdPartition, 3, 100, 
1024).iterator());
+        CompletableFuture<Void> dummyFuture = new CompletableFuture<>();
+        dummyFuture.complete(null);
+        when(remoteLogMetadataManager.updateRemoteLogSegmentMetadata(any()))
+                .thenReturn(dummyFuture);
+
+        remoteLogManager.stopPartitions(partitions, true, errorHandler);
+        verify(remoteLogMetadataManager, times(1)).onStopPartitions(any());
+        verify(remoteStorageManager, times(8)).deleteLogSegmentData(any());
+        verify(remoteLogMetadataManager, 
times(16)).updateRemoteLogSegmentMetadata(any());
+    }
+
+    private List<RemoteLogSegmentMetadata> 
listRemoteLogSegmentMetadata(TopicIdPartition topicIdPartition,
+                                                                        int 
segmentCount,
+                                                                        int 
recordsPerSegment,
+                                                                        int 
segmentSize) {
+        List<RemoteLogSegmentMetadata> segmentMetadataList = new ArrayList<>();
+        for (int idx = 0; idx < segmentCount; idx++) {
+            long timestamp = time.milliseconds();
+            long startOffset = (long) idx * recordsPerSegment;
+            long endOffset = startOffset + recordsPerSegment - 1;
+            Map<Integer, Long> segmentLeaderEpochs = 
truncateAndGetLeaderEpochs(totalEpochEntries, startOffset, endOffset);
+            segmentMetadataList.add(new RemoteLogSegmentMetadata(new 
RemoteLogSegmentId(topicIdPartition,
+                    Uuid.randomUuid()), startOffset, endOffset, timestamp, 
brokerId, timestamp, segmentSize,
+                    segmentLeaderEpochs));
+        }
+        return segmentMetadataList;
+    }
+
+    private Map<Integer, Long> truncateAndGetLeaderEpochs(List<EpochEntry> 
entries,
+                                                          Long startOffset,
+                                                          Long endOffset) {
+        InMemoryLeaderEpochCheckpoint myCheckpoint = new 
InMemoryLeaderEpochCheckpoint();
+        myCheckpoint.write(entries);
+        LeaderEpochFileCache cache = new LeaderEpochFileCache(null, 
myCheckpoint);
+        cache.truncateFromStart(startOffset);
+        cache.truncateFromEnd(endOffset);
+        return myCheckpoint.read().stream().collect(Collectors.toMap(e -> 
e.epoch, e -> e.startOffset));
+    }
+
     private Partition mockPartition(TopicIdPartition topicIdPartition) {
         TopicPartition tp = topicIdPartition.topicPartition();
         Partition partition = mock(Partition.class);
diff --git 
a/core/src/test/scala/integration/kafka/admin/RemoteTopicCrudTest.scala 
b/core/src/test/scala/integration/kafka/admin/RemoteTopicCrudTest.scala
index 59adcf722bf..bd81b40fa8e 100644
--- a/core/src/test/scala/integration/kafka/admin/RemoteTopicCrudTest.scala
+++ b/core/src/test/scala/integration/kafka/admin/RemoteTopicCrudTest.scala
@@ -18,12 +18,14 @@ package kafka.admin
 
 import kafka.api.IntegrationTestHarness
 import kafka.server.KafkaConfig
-import kafka.utils.{TestInfoUtils, TestUtils}
+import kafka.utils.{Logging, TestInfoUtils, TestUtils}
 import org.apache.kafka.clients.admin.{AlterConfigOp, ConfigEntry}
-import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid}
 import org.apache.kafka.common.config.{ConfigResource, TopicConfig}
-import org.apache.kafka.common.errors.InvalidConfigurationException
-import 
org.apache.kafka.server.log.remote.storage.{NoOpRemoteLogMetadataManager, 
NoOpRemoteStorageManager, RemoteLogManagerConfig}
+import org.apache.kafka.common.errors.{InvalidConfigurationException, 
UnknownTopicOrPartitionException}
+import org.apache.kafka.common.utils.MockTime
+import 
org.apache.kafka.server.log.remote.storage.{NoOpRemoteLogMetadataManager, 
NoOpRemoteStorageManager,
+  RemoteLogManagerConfig, RemoteLogSegmentId, RemoteLogSegmentMetadata, 
RemoteLogSegmentState}
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.function.Executable
 import org.junit.jupiter.api.{BeforeEach, Tag, TestInfo}
@@ -31,7 +33,8 @@ import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.ValueSource
 
 import java.util
-import java.util.{Collections, Properties}
+import java.util.concurrent.atomic.AtomicInteger
+import java.util.{Collections, Optional, Properties}
 import scala.collection.Seq
 import scala.concurrent.ExecutionException
 import scala.util.Random
@@ -41,8 +44,11 @@ class RemoteTopicCrudTest extends IntegrationTestHarness {
 
   val numPartitions = 2
   val numReplicationFactor = 2
+
   var testTopicName: String = _
   var sysRemoteStorageEnabled = true
+  var storageManagerClassName: String = 
classOf[NoOpRemoteStorageManager].getName
+  var metadataManagerClassName: String = 
classOf[NoOpRemoteLogMetadataManager].getName
 
   override protected def brokerCount: Int = 2
 
@@ -59,6 +65,10 @@ class RemoteTopicCrudTest extends IntegrationTestHarness {
     if 
(info.getTestMethod.get().getName.endsWith("SystemRemoteStorageIsDisabled")) {
       sysRemoteStorageEnabled = false
     }
+    if (info.getTestMethod.get().getName.equals("testTopicDeletion")) {
+      storageManagerClassName = classOf[MyRemoteStorageManager].getName
+      metadataManagerClassName = classOf[MyRemoteLogMetadataManager].getName
+    }
     super.setUp(info)
     testTopicName = 
s"${info.getTestMethod.get().getName}-${Random.alphanumeric.take(10).mkString}"
   }
@@ -270,6 +280,27 @@ class RemoteTopicCrudTest extends IntegrationTestHarness {
       () => admin.incrementalAlterConfigs(configs).all().get(), "Invalid local 
retention size")
   }
 
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testTopicDeletion(quorum: String): Unit = {
+    val numPartitions = 2
+    val topicConfig = new Properties()
+    topicConfig.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
+    topicConfig.put(TopicConfig.RETENTION_MS_CONFIG, "200")
+    topicConfig.put(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG, "100")
+    TestUtils.createTopicWithAdmin(createAdminClient(), testTopicName, 
brokers, numPartitions, brokerCount,
+      topicConfig = topicConfig)
+    TestUtils.deleteTopicWithAdmin(createAdminClient(), testTopicName, brokers)
+    assertThrowsException(classOf[UnknownTopicOrPartitionException],
+      () => TestUtils.describeTopic(createAdminClient(), testTopicName), 
"Topic should be deleted")
+
+    // FIXME: It seems the storage manager is being instantiated in different 
class loader so couldn't verify the value
+    //  but ensured it by adding a log statement in the storage manager 
(manually).
+    //    assertEquals(numPartitions * MyRemoteLogMetadataManager.segmentCount,
+    //      MyRemoteStorageManager.deleteSegmentEventCounter.get(),
+    //      "Remote log segments should be deleted only once by the leader")
+  }
+
   private def assertThrowsException(exceptionType: Class[_ <: Throwable],
                                     executable: Executable,
                                     message: String = ""): Throwable = {
@@ -320,11 +351,8 @@ class RemoteTopicCrudTest extends IntegrationTestHarness {
   private def overrideProps(): Properties = {
     val props = new Properties()
     props.put(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, 
sysRemoteStorageEnabled.toString)
-    props.put(RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP,
-      classOf[NoOpRemoteStorageManager].getName)
-    
props.put(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP,
-      classOf[NoOpRemoteLogMetadataManager].getName)
-
+    props.put(RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP, 
storageManagerClassName)
+    
props.put(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP, 
metadataManagerClassName)
     props.put(KafkaConfig.LogRetentionTimeMillisProp, "2000")
     props.put(RemoteLogManagerConfig.LOG_LOCAL_RETENTION_MS_PROP, "1000")
     props.put(KafkaConfig.LogRetentionBytesProp, "2048")
@@ -332,3 +360,42 @@ class RemoteTopicCrudTest extends IntegrationTestHarness {
     props
   }
 }
+
+object MyRemoteStorageManager {
+  val deleteSegmentEventCounter = new AtomicInteger(0)
+}
+
+class MyRemoteStorageManager extends NoOpRemoteStorageManager with Logging {
+  import MyRemoteStorageManager._
+
+  override def deleteLogSegmentData(remoteLogSegmentMetadata: 
RemoteLogSegmentMetadata): Unit = {
+    deleteSegmentEventCounter.incrementAndGet()
+    info(s"Deleted the remote log segment: $remoteLogSegmentMetadata, counter: 
${deleteSegmentEventCounter.get()}")
+  }
+}
+
+class MyRemoteLogMetadataManager extends NoOpRemoteLogMetadataManager {
+
+  import MyRemoteLogMetadataManager._
+  val time = new MockTime()
+
+  override def listRemoteLogSegments(topicIdPartition: TopicIdPartition): 
util.Iterator[RemoteLogSegmentMetadata] = {
+    val segmentMetadataList = new util.ArrayList[RemoteLogSegmentMetadata]()
+    for (idx <- 0 until segmentCount) {
+      val timestamp = time.milliseconds()
+      val startOffset = idx * recordsPerSegment
+      val endOffset = startOffset + recordsPerSegment - 1
+      val segmentLeaderEpochs: util.Map[Integer, java.lang.Long] = 
Collections.singletonMap(0, 0L)
+      segmentMetadataList.add(new RemoteLogSegmentMetadata(new 
RemoteLogSegmentId(topicIdPartition, Uuid.randomUuid()),
+        startOffset, endOffset, timestamp, 0, timestamp, segmentSize, 
Optional.empty(),
+        RemoteLogSegmentState.COPY_SEGMENT_FINISHED, segmentLeaderEpochs))
+    }
+    segmentMetadataList.iterator()
+  }
+}
+
+object MyRemoteLogMetadataManager {
+  val segmentCount = 10
+  val recordsPerSegment = 100
+  val segmentSize = 1024
+}
diff --git 
a/core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala 
b/core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala
index b087676bed2..4b92da4007e 100644
--- a/core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala
+++ b/core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala
@@ -34,6 +34,7 @@ import org.slf4j.{Logger, LoggerFactory}
 
 import java.io.{File, FileInputStream}
 import java.nio.file.Files
+import java.util
 import java.util.Collections
 import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
 import scala.collection.mutable
@@ -443,8 +444,59 @@ class RemoteIndexCacheTest {
     verifyNoMoreInteractions(rsm)
   }
 
-  private def generateSpyCacheEntry(): RemoteIndexCache.Entry = {
-    val remoteLogSegmentId = RemoteLogSegmentId.generateNew(idPartition)
+  @Test
+  def testRemoveItem(): Unit = {
+    val segmentId = rlsMetadata.remoteLogSegmentId()
+    val segmentUuid = segmentId.id()
+    // generate and add entry to cache
+    val spyEntry = generateSpyCacheEntry(segmentId)
+    cache.internalCache.put(segmentUuid, spyEntry)
+    assertTrue(cache.internalCache().asMap().containsKey(segmentUuid))
+    assertFalse(spyEntry.isMarkedForCleanup)
+
+    cache.remove(segmentId.id())
+    assertFalse(cache.internalCache().asMap().containsKey(segmentUuid))
+    TestUtils.waitUntilTrue(() => spyEntry.isMarkedForCleanup, "Failed to mark 
cache entry for cleanup after invalidation")
+  }
+
+  @Test
+  def testRemoveNonExistentItem(): Unit = {
+    // generate and add entry to cache
+    val segmentId = rlsMetadata.remoteLogSegmentId()
+    val segmentUuid = segmentId.id()
+    // generate and add entry to cache
+    val spyEntry = generateSpyCacheEntry(segmentId)
+    cache.internalCache.put(segmentUuid, spyEntry)
+    assertTrue(cache.internalCache().asMap().containsKey(segmentUuid))
+
+    // remove a random Uuid
+    cache.remove(Uuid.randomUuid())
+    assertTrue(cache.internalCache().asMap().containsKey(segmentUuid))
+    assertFalse(spyEntry.isMarkedForCleanup)
+  }
+
+  @Test
+  def testRemoveMultipleItems(): Unit = {
+    // generate and add entry to cache
+    val uuidAndEntryList = new util.HashMap[Uuid, RemoteIndexCache.Entry]()
+    for (_ <- 0 until 10) {
+      val segmentId = RemoteLogSegmentId.generateNew(idPartition)
+      val segmentUuid = segmentId.id()
+      val spyEntry = generateSpyCacheEntry(segmentId)
+      uuidAndEntryList.put(segmentUuid, spyEntry)
+
+      cache.internalCache.put(segmentUuid, spyEntry)
+      assertTrue(cache.internalCache().asMap().containsKey(segmentUuid))
+      assertFalse(spyEntry.isMarkedForCleanup)
+    }
+    cache.removeAll(uuidAndEntryList.keySet())
+    uuidAndEntryList.values().forEach { entry =>
+      TestUtils.waitUntilTrue(() => entry.isMarkedForCleanup, "Failed to mark 
cache entry for cleanup after invalidation")
+    }
+  }
+
+  private def generateSpyCacheEntry(remoteLogSegmentId: RemoteLogSegmentId
+                                    = 
RemoteLogSegmentId.generateNew(idPartition)): RemoteIndexCache.Entry = {
     val rlsMetadata = new RemoteLogSegmentMetadata(remoteLogSegmentId, 
baseOffset, lastOffset,
       time.milliseconds(), brokerId, time.milliseconds(), segmentSize, 
Collections.singletonMap(0, 0L))
     val timeIndex = spy(createTimeIndexForSegmentMetadata(rlsMetadata))
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala 
b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 92533d0e746..e8c3c81173c 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -3283,10 +3283,12 @@ class ReplicaManagerTest {
     (rm0, rm1)
   }
 
-  @Test
-  def testStopReplicaWithStaleControllerEpoch(): Unit = {
+  @ParameterizedTest
+  @ValueSource(booleans = Array(true, false))
+  def testStopReplicaWithStaleControllerEpoch(enableRemoteStorage: Boolean): 
Unit = {
     val mockTimer = new MockTimer(time)
-    val replicaManager = setupReplicaManagerWithMockedPurgatories(mockTimer, 
aliveBrokerIds = Seq(0, 1))
+    val replicaManager = setupReplicaManagerWithMockedPurgatories(mockTimer, 
aliveBrokerIds = Seq(0, 1),
+      enableRemoteStorage = enableRemoteStorage)
 
     try {
       val tp0 = new TopicPartition(topic, 0)
@@ -3309,15 +3311,20 @@ class ReplicaManagerTest {
 
       val (_, error) = replicaManager.stopReplicas(1, 0, 0, partitionStates)
       assertEquals(Errors.STALE_CONTROLLER_EPOCH, error)
+      if (enableRemoteStorage) {
+        verify(mockRemoteLogManager, times(0)).stopPartitions(any(), any(), 
any())
+      }
     } finally {
       replicaManager.shutdown(checkpointHW = false)
     }
   }
 
-  @Test
-  def testStopReplicaWithOfflinePartition(): Unit = {
+  @ParameterizedTest
+  @ValueSource(booleans = Array(true, false))
+  def testStopReplicaWithOfflinePartition(enableRemoteStorage: Boolean): Unit 
= {
     val mockTimer = new MockTimer(time)
-    val replicaManager = setupReplicaManagerWithMockedPurgatories(mockTimer, 
aliveBrokerIds = Seq(0, 1))
+    val replicaManager = setupReplicaManagerWithMockedPurgatories(mockTimer, 
aliveBrokerIds = Seq(0, 1),
+      enableRemoteStorage = enableRemoteStorage)
 
     try {
       val tp0 = new TopicPartition(topic, 0)
@@ -3342,29 +3349,38 @@ class ReplicaManagerTest {
       val (result, error) = replicaManager.stopReplicas(1, 0, 0, 
partitionStates)
       assertEquals(Errors.NONE, error)
       assertEquals(Map(tp0 -> Errors.KAFKA_STORAGE_ERROR), result)
+      if (enableRemoteStorage) {
+        verify(mockRemoteLogManager, times(0)).stopPartitions(any(), any(), 
any())
+      }
     } finally {
       replicaManager.shutdown(checkpointHW = false)
     }
   }
 
-  @Test
-  def testStopReplicaWithInexistentPartition(): Unit = {
-    testStopReplicaWithInexistentPartition(false, false)
+  @ParameterizedTest
+  @ValueSource(booleans = Array(true, false))
+  def testStopReplicaWithInexistentPartition(enableRemoteStorage: Boolean): 
Unit = {
+    testStopReplicaWithInexistentPartition(false, false, enableRemoteStorage)
   }
 
-  @Test
-  def testStopReplicaWithInexistentPartitionAndPartitionsDelete(): Unit = {
-    testStopReplicaWithInexistentPartition(true, false)
+  @ParameterizedTest
+  @ValueSource(booleans = Array(true, false))
+  def 
testStopReplicaWithInexistentPartitionAndPartitionsDelete(enableRemoteStorage: 
Boolean): Unit = {
+    testStopReplicaWithInexistentPartition(true, false, enableRemoteStorage)
   }
 
-  @Test
-  def 
testStopReplicaWithInexistentPartitionAndPartitionsDeleteAndIOException(): Unit 
= {
-    testStopReplicaWithInexistentPartition(true, true)
+  @ParameterizedTest
+  @ValueSource(booleans = Array(true, false))
+  def 
testStopReplicaWithInexistentPartitionAndPartitionsDeleteAndIOException(enableRemoteStorage:
 Boolean): Unit = {
+    testStopReplicaWithInexistentPartition(true, true, enableRemoteStorage)
   }
 
-  private def testStopReplicaWithInexistentPartition(deletePartitions: 
Boolean, throwIOException: Boolean): Unit = {
+  private def testStopReplicaWithInexistentPartition(deletePartitions: Boolean,
+                                                     throwIOException: Boolean,
+                                                     enableRemoteStorage: 
Boolean): Unit = {
     val mockTimer = new MockTimer(time)
-    val replicaManager = setupReplicaManagerWithMockedPurgatories(mockTimer, 
aliveBrokerIds = Seq(0, 1))
+    val replicaManager = setupReplicaManagerWithMockedPurgatories(mockTimer, 
aliveBrokerIds = Seq(0, 1),
+      enableRemoteStorage = enableRemoteStorage)
 
     try {
       val tp0 = new TopicPartition(topic, 0)
@@ -3396,64 +3412,79 @@ class ReplicaManagerTest {
         assertEquals(Map(tp0 -> Errors.NONE), result)
         assertTrue(replicaManager.logManager.getLog(tp0).isDefined)
       }
+      if (enableRemoteStorage) {
+        verify(mockRemoteLogManager, times(1))
+          .stopPartitions(ArgumentMatchers.eq(Collections.singleton(tp0)), 
ArgumentMatchers.eq(false), any())
+      }
     } finally {
       replicaManager.shutdown(checkpointHW = false)
     }
   }
 
-  @Test
-  def testStopReplicaWithExistingPartitionAndNewerLeaderEpoch(): Unit = {
-    testStopReplicaWithExistingPartition(2, false, false, Errors.NONE)
+  @ParameterizedTest
+  @ValueSource(booleans = Array(true, false))
+  def 
testStopReplicaWithExistingPartitionAndNewerLeaderEpoch(enableRemoteStorage: 
Boolean): Unit = {
+    testStopReplicaWithExistingPartition(2, false, false, Errors.NONE, 
enableRemoteStorage)
   }
 
-  @Test
-  def testStopReplicaWithExistingPartitionAndOlderLeaderEpoch(): Unit = {
-    testStopReplicaWithExistingPartition(0, false, false, 
Errors.FENCED_LEADER_EPOCH)
+  @ParameterizedTest
+  @ValueSource(booleans = Array(true, false))
+  def 
testStopReplicaWithExistingPartitionAndOlderLeaderEpoch(enableRemoteStorage: 
Boolean): Unit = {
+    testStopReplicaWithExistingPartition(0, false, false, 
Errors.FENCED_LEADER_EPOCH, enableRemoteStorage)
   }
 
-  @Test
-  def testStopReplicaWithExistingPartitionAndEqualLeaderEpoch(): Unit = {
-    testStopReplicaWithExistingPartition(1, false, false, Errors.NONE)
+  @ParameterizedTest
+  @ValueSource(booleans = Array(true, false))
+  def 
testStopReplicaWithExistingPartitionAndEqualLeaderEpoch(enableRemoteStorage: 
Boolean): Unit = {
+    testStopReplicaWithExistingPartition(1, false, false, Errors.NONE, 
enableRemoteStorage)
   }
 
-  @Test
-  def testStopReplicaWithExistingPartitionAndDeleteSentinel(): Unit = {
-    testStopReplicaWithExistingPartition(LeaderAndIsr.EpochDuringDelete, 
false, false, Errors.NONE)
+  @ParameterizedTest
+  @ValueSource(booleans = Array(true, false))
+  def 
testStopReplicaWithExistingPartitionAndDeleteSentinel(enableRemoteStorage: 
Boolean): Unit = {
+    testStopReplicaWithExistingPartition(LeaderAndIsr.EpochDuringDelete, 
false, false, Errors.NONE, enableRemoteStorage)
   }
 
-  @Test
-  def testStopReplicaWithExistingPartitionAndLeaderEpochNotProvided(): Unit = {
-    testStopReplicaWithExistingPartition(LeaderAndIsr.NoEpoch, false, false, 
Errors.NONE)
+  @ParameterizedTest
+  @ValueSource(booleans = Array(true, false))
+  def 
testStopReplicaWithExistingPartitionAndLeaderEpochNotProvided(enableRemoteStorage:
 Boolean): Unit = {
+    testStopReplicaWithExistingPartition(LeaderAndIsr.NoEpoch, false, false, 
Errors.NONE, enableRemoteStorage)
   }
 
-  @Test
-  def 
testStopReplicaWithDeletePartitionAndExistingPartitionAndNewerLeaderEpoch(): 
Unit = {
-    testStopReplicaWithExistingPartition(2, true, false, Errors.NONE)
+  @ParameterizedTest
+  @ValueSource(booleans = Array(true, false))
+  def 
testStopReplicaWithDeletePartitionAndExistingPartitionAndNewerLeaderEpoch(enableRemoteStorage:
 Boolean): Unit = {
+    testStopReplicaWithExistingPartition(2, true, false, Errors.NONE, 
enableRemoteStorage)
   }
 
-  @Test
-  def 
testStopReplicaWithDeletePartitionAndExistingPartitionAndNewerLeaderEpochAndIOException():
 Unit = {
-    testStopReplicaWithExistingPartition(2, true, true, 
Errors.KAFKA_STORAGE_ERROR)
+  @ParameterizedTest
+  @ValueSource(booleans = Array(true, false))
+  def 
testStopReplicaWithDeletePartitionAndExistingPartitionAndNewerLeaderEpochAndIOException(enableRemoteStorage:
 Boolean): Unit = {
+    testStopReplicaWithExistingPartition(2, true, true, 
Errors.KAFKA_STORAGE_ERROR, enableRemoteStorage)
   }
 
-  @Test
-  def 
testStopReplicaWithDeletePartitionAndExistingPartitionAndOlderLeaderEpoch(): 
Unit = {
-    testStopReplicaWithExistingPartition(0, true, false, 
Errors.FENCED_LEADER_EPOCH)
+  @ParameterizedTest
+  @ValueSource(booleans = Array(true, false))
+  def 
testStopReplicaWithDeletePartitionAndExistingPartitionAndOlderLeaderEpoch(enableRemoteStorage:
 Boolean): Unit = {
+    testStopReplicaWithExistingPartition(0, true, false, 
Errors.FENCED_LEADER_EPOCH, enableRemoteStorage)
   }
 
-  @Test
-  def 
testStopReplicaWithDeletePartitionAndExistingPartitionAndEqualLeaderEpoch(): 
Unit = {
-    testStopReplicaWithExistingPartition(1, true, false, Errors.NONE)
+  @ParameterizedTest
+  @ValueSource(booleans = Array(true, false))
+  def 
testStopReplicaWithDeletePartitionAndExistingPartitionAndEqualLeaderEpoch(enableRemoteStorage:
 Boolean): Unit = {
+    testStopReplicaWithExistingPartition(1, true, false, Errors.NONE, 
enableRemoteStorage)
   }
 
-  @Test
-  def 
testStopReplicaWithDeletePartitionAndExistingPartitionAndDeleteSentinel(): Unit 
= {
-    testStopReplicaWithExistingPartition(LeaderAndIsr.EpochDuringDelete, true, 
false, Errors.NONE)
+  @ParameterizedTest
+  @ValueSource(booleans = Array(true, false))
+  def 
testStopReplicaWithDeletePartitionAndExistingPartitionAndDeleteSentinel(enableRemoteStorage:
 Boolean): Unit = {
+    testStopReplicaWithExistingPartition(LeaderAndIsr.EpochDuringDelete, true, 
false, Errors.NONE, enableRemoteStorage)
   }
 
-  @Test
-  def 
testStopReplicaWithDeletePartitionAndExistingPartitionAndLeaderEpochNotProvided():
 Unit = {
-    testStopReplicaWithExistingPartition(LeaderAndIsr.NoEpoch, true, false, 
Errors.NONE)
+  @ParameterizedTest
+  @ValueSource(booleans = Array(true, false))
+  def 
testStopReplicaWithDeletePartitionAndExistingPartitionAndLeaderEpochNotProvided(enableRemoteStorage:
 Boolean): Unit = {
+    testStopReplicaWithExistingPartition(LeaderAndIsr.NoEpoch, true, false, 
Errors.NONE, enableRemoteStorage)
   }
 
   @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@@ -3698,9 +3729,11 @@ class ReplicaManagerTest {
   private def testStopReplicaWithExistingPartition(leaderEpoch: Int,
                                                    deletePartition: Boolean,
                                                    throwIOException: Boolean,
-                                                   expectedOutput: Errors): 
Unit = {
+                                                   expectedOutput: Errors,
+                                                   enableRemoteStorage: 
Boolean): Unit = {
     val mockTimer = new MockTimer(time)
-    val replicaManager = setupReplicaManagerWithMockedPurgatories(mockTimer, 
aliveBrokerIds = Seq(0, 1))
+    val replicaManager = setupReplicaManagerWithMockedPurgatories(mockTimer, 
aliveBrokerIds = Seq(0, 1),
+      enableRemoteStorage = enableRemoteStorage)
 
     try {
       val tp0 = new TopicPartition(topic, 0)
@@ -3762,6 +3795,15 @@ class ReplicaManagerTest {
         assertEquals(HostedPartition.None, replicaManager.getPartition(tp0))
         assertFalse(readRecoveryPointCheckpoint().contains(tp0))
         assertFalse(readLogStartOffsetCheckpoint().contains(tp0))
+        if (enableRemoteStorage) {
+          
verify(mockRemoteLogManager).stopPartitions(ArgumentMatchers.eq(Collections.singleton(tp0)),
+            ArgumentMatchers.eq(leaderEpoch == 
LeaderAndIsr.EpochDuringDelete), any())
+        }
+      }
+
+      if (expectedOutput == Errors.NONE && !deletePartition && 
enableRemoteStorage) {
+        
verify(mockRemoteLogManager).stopPartitions(ArgumentMatchers.eq(Collections.singleton(tp0)),
+          ArgumentMatchers.eq(false), any())
       }
     } finally {
       replicaManager.shutdown(checkpointHW = false)
@@ -4409,7 +4451,11 @@ class ReplicaManagerTest {
       val notReplicaMetadataImage = 
imageFromTopics(notReplicaTopicsDelta.apply())
       replicaManager.applyDelta(notReplicaTopicsDelta, notReplicaMetadataImage)
 
-      verify(mockRemoteLogManager, never()).onLeadershipChange(anySet(), 
anySet(), anyMap())
+      if (enableRemoteStorage) {
+        verify(mockRemoteLogManager, never()).onLeadershipChange(anySet(), 
anySet(), anyMap())
+        verify(mockRemoteLogManager, times(1)).stopPartitions(
+          ArgumentMatchers.eq(Collections.singleton(topicPartition)), 
ArgumentMatchers.eq(false), any())
+      }
 
       // Check that the partition was removed
       assertEquals(HostedPartition.None, 
replicaManager.getPartition(topicPartition))
@@ -4451,7 +4497,12 @@ class ReplicaManagerTest {
       val removeTopicsDelta = topicsDeleteDelta(followerMetadataImage.topics())
       val removeMetadataImage = imageFromTopics(removeTopicsDelta.apply())
       replicaManager.applyDelta(removeTopicsDelta, removeMetadataImage)
-      verify(mockRemoteLogManager, never()).onLeadershipChange(anySet(), 
anySet(), anyMap())
+
+      if (enableRemoteStorage) {
+        verify(mockRemoteLogManager, never()).onLeadershipChange(anySet(), 
anySet(), anyMap())
+        verify(mockRemoteLogManager, times(1)).stopPartitions(
+          ArgumentMatchers.eq(Collections.singleton(topicPartition)), 
ArgumentMatchers.eq(false), any())
+      }
 
       // Check that the partition was removed
       assertEquals(HostedPartition.None, 
replicaManager.getPartition(topicPartition))
@@ -4493,7 +4544,12 @@ class ReplicaManagerTest {
       val notReplicaTopicsDelta = 
topicsChangeDelta(leaderMetadataImage.topics(), otherId, true)
       val notReplicaMetadataImage = 
imageFromTopics(notReplicaTopicsDelta.apply())
       replicaManager.applyDelta(notReplicaTopicsDelta, notReplicaMetadataImage)
-      verify(mockRemoteLogManager, never()).onLeadershipChange(anySet(), 
anySet(), anyMap())
+
+      if (enableRemoteStorage) {
+        verify(mockRemoteLogManager, never()).onLeadershipChange(anySet(), 
anySet(), anyMap())
+        verify(mockRemoteLogManager, times(1)).stopPartitions(
+          ArgumentMatchers.eq(Collections.singleton(topicPartition)), 
ArgumentMatchers.eq(false), any())
+      }
 
       // Check that the partition was removed
       assertEquals(HostedPartition.None, 
replicaManager.getPartition(topicPartition))
@@ -4535,7 +4591,12 @@ class ReplicaManagerTest {
       val removeTopicsDelta = topicsDeleteDelta(leaderMetadataImage.topics())
       val removeMetadataImage = imageFromTopics(removeTopicsDelta.apply())
       replicaManager.applyDelta(removeTopicsDelta, removeMetadataImage)
-      verify(mockRemoteLogManager, never()).onLeadershipChange(anySet(), 
anySet(), anyMap())
+
+      if (enableRemoteStorage) {
+        verify(mockRemoteLogManager, never()).onLeadershipChange(anySet(), 
anySet(), anyMap())
+        verify(mockRemoteLogManager, times(1)).stopPartitions(
+          ArgumentMatchers.eq(Collections.singleton(topicPartition)), 
ArgumentMatchers.eq(true), any())
+      }
 
       // Check that the partition was removed
       assertEquals(HostedPartition.None, 
replicaManager.getPartition(topicPartition))
diff --git 
a/storage/api/src/test/java/org/apache/kafka/server/log/remote/storage/NoOpRemoteLogMetadataManager.java
 
b/storage/api/src/test/java/org/apache/kafka/server/log/remote/storage/NoOpRemoteLogMetadataManager.java
index 71881517182..6ffcf85b9b0 100644
--- 
a/storage/api/src/test/java/org/apache/kafka/server/log/remote/storage/NoOpRemoteLogMetadataManager.java
+++ 
b/storage/api/src/test/java/org/apache/kafka/server/log/remote/storage/NoOpRemoteLogMetadataManager.java
@@ -29,12 +29,12 @@ import java.util.concurrent.CompletableFuture;
 public class NoOpRemoteLogMetadataManager implements RemoteLogMetadataManager {
     @Override
     public CompletableFuture<Void> 
addRemoteLogSegmentMetadata(RemoteLogSegmentMetadata remoteLogSegmentMetadata) {
-        return null;
+        return CompletableFuture.completedFuture(null);
     }
 
     @Override
     public CompletableFuture<Void> 
updateRemoteLogSegmentMetadata(RemoteLogSegmentMetadataUpdate 
remoteLogSegmentMetadataUpdate) {
-        return null;
+        return CompletableFuture.completedFuture(null);
     }
 
     @Override
diff --git 
a/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java
 
b/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java
index d730eeee31e..30c4da4ce44 100644
--- 
a/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java
+++ 
b/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java
@@ -166,6 +166,24 @@ public class RemoteIndexCache implements Closeable {
         return internalCache;
     }
 
+    public void remove(Uuid key) {
+        lock.writeLock().lock();
+        try {
+            internalCache.invalidate(key);
+        } finally {
+            lock.writeLock().unlock();
+        }
+    }
+
+    public void removeAll(Collection<Uuid> keys) {
+        lock.writeLock().lock();
+        try {
+            internalCache.invalidateAll(keys);
+        } finally {
+            lock.writeLock().unlock();
+        }
+    }
+
     // Visible for testing
     public ShutdownableThread cleanerThread() {
         return cleanerThread;

Reply via email to