This is an automated email from the ASF dual-hosted git repository.

yunhong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new bdb44c710 [server] Fix rebalance may cause remote file deleted error 
(#2570)
bdb44c710 is described below

commit bdb44c710ed4fc6d78f97c79e0f385507a69d025
Author: yunhong <[email protected]>
AuthorDate: Thu Feb 5 09:50:20 2026 +0800

    [server] Fix rebalance may cause remote file deleted error (#2570)
    
    * [server] Fix rebalance may cause remote file deleted error
    
    * address jark's comments
    
    * improve
    
    * change to set offline immediately after set to immigration started
    
    ---------
    
    Co-authored-by: Jark Wu <[email protected]>
---
 fluss-rpc/src/main/proto/FlussApi.proto            |   6 +
 .../coordinator/CoordinatorEventProcessor.java     |  88 ++++-----
 .../coordinator/CoordinatorRequestBatch.java       |  15 +-
 .../server/coordinator/CoordinatorServer.java      |   7 +-
 .../coordinator/statemachine/ReplicaState.java     |  10 +-
 .../statemachine/ReplicaStateMachine.java          |  22 ++-
 .../fluss/server/entity/StopReplicaData.java       |  20 +-
 .../fluss/server/log/remote/RemoteLogManager.java  |   9 +-
 .../fluss/server/replica/ReplicaManager.java       |  27 ++-
 .../fluss/server/utils/ServerRpcMessageUtils.java  |  22 ++-
 .../rebalance/RebalanceManagerITCase.java          | 215 ++++++++++++++++++++-
 .../server/log/remote/RemoteLogManagerTest.java    |  86 ++++++++-
 .../fluss/server/replica/ReplicaManagerTest.java   |   4 +-
 .../server/testutils/FlussClusterExtension.java    |   2 +-
 14 files changed, 451 insertions(+), 82 deletions(-)

diff --git a/fluss-rpc/src/main/proto/FlussApi.proto 
b/fluss-rpc/src/main/proto/FlussApi.proto
index b85487a50..ea2a832ba 100644
--- a/fluss-rpc/src/main/proto/FlussApi.proto
+++ b/fluss-rpc/src/main/proto/FlussApi.proto
@@ -899,7 +899,13 @@ message PbNotifyLeaderAndIsrRespForBucket {
 message PbStopReplicaReqForBucket {
   required PbTableBucket table_bucket = 1;
   required int32 leader_epoch = 2;
+  // delete means remove local replica data (i.e., data stored on the current 
node).
   required bool delete = 3;
+  // deleteRemote means remove remote replica data (e.g., data in object 
storage) and was introduced in v0.9.
+  // For backward compatibility, if a request does not include the 
deleteRemote flag, the system treats delete as
+  // deleteRemote (i.e., it falls back to remote deletion). This ensures older 
CoordinatorServer continues to function
+  // correctly with newer TabletServers.
+  optional bool deleteRemote = 4;
 }
 
 message PbStopReplicaRespForBucket {
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java
index aa0afbbcd..813c1f10e 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java
@@ -148,6 +148,7 @@ import static 
org.apache.fluss.server.coordinator.statemachine.ReplicaState.Offl
 import static 
org.apache.fluss.server.coordinator.statemachine.ReplicaState.OnlineReplica;
 import static 
org.apache.fluss.server.coordinator.statemachine.ReplicaState.ReplicaDeletionStarted;
 import static 
org.apache.fluss.server.coordinator.statemachine.ReplicaState.ReplicaDeletionSuccessful;
+import static 
org.apache.fluss.server.coordinator.statemachine.ReplicaState.ReplicaMigrationStarted;
 import static 
org.apache.fluss.server.utils.ServerRpcMessageUtils.makeAdjustIsrResponse;
 import static 
org.apache.fluss.server.utils.ServerRpcMessageUtils.makeListRebalanceProgressResponse;
 import static 
org.apache.fluss.server.utils.ServerRpcMessageUtils.makeRebalanceResponse;
@@ -910,31 +911,14 @@ public class CoordinatorEventProcessor implements 
EventProcessor {
                         
coordinatorContext.retryDeleteAndSuccessDeleteReplicas(failDeletedReplicas);
 
         // transmit to deletion started for retry delete replicas
-        Set<TableBucketReplica> needRetryDeleteReplicas = new HashSet<>();
-        retryDeleteAndSuccessDeleteReplicas.f0.forEach(
-                (replica) -> {
-                    // For rebalance case. the replica state already set to 
null in method
-                    // stopRemovedReplicasOfReassignedBucket. so we need not 
reset it again.
-                    if (coordinatorContext.getReplicaState(replica) != null) {
-                        needRetryDeleteReplicas.add(replica);
-                    }
-                });
-        replicaStateMachine.handleStateChanges(needRetryDeleteReplicas, 
ReplicaDeletionStarted);
+        replicaStateMachine.handleStateChanges(
+                retryDeleteAndSuccessDeleteReplicas.f0, 
ReplicaDeletionStarted);
 
         // add all the replicas that considered as success delete to success 
deleted replicas
         successDeletedReplicas.addAll(retryDeleteAndSuccessDeleteReplicas.f1);
-
-        Set<TableBucketReplica> newSuccessDeleteReplicas = new HashSet<>();
-        successDeletedReplicas.forEach(
-                (replica) -> {
-                    // For rebalance case. the replica state already set to 
null in method
-                    // stopRemovedReplicasOfReassignedBucket. so we need not 
reset it again.
-                    if (coordinatorContext.getReplicaState(replica) != null) {
-                        newSuccessDeleteReplicas.add(replica);
-                    }
-                });
         // transmit to deletion successful for success deleted replicas
-        replicaStateMachine.handleStateChanges(newSuccessDeleteReplicas, 
ReplicaDeletionSuccessful);
+        replicaStateMachine.handleStateChanges(successDeletedReplicas, 
ReplicaDeletionSuccessful);
+
         // if any success deletion, we can resume
         if (!successDeletedReplicas.isEmpty()) {
             tableManager.resumeDeletions();
@@ -1354,8 +1338,7 @@ public class CoordinatorEventProcessor implements 
EventProcessor {
         if (planForBucket != null) {
             ReplicaReassignment reassignment =
                     ReplicaReassignment.build(
-                            coordinatorContext.getAssignment(tableBucket),
-                            planForBucket.getNewReplicas());
+                            planForBucket.getOriginReplicas(), 
planForBucket.getNewReplicas());
             try {
                 if (planForBucket.isLeaderChanged() && 
!reassignment.isBeingReassigned()) {
                     LeaderAndIsr leaderAndIsr = 
zooKeeperClient.getLeaderAndIsr(tableBucket).get();
@@ -1413,20 +1396,20 @@ public class CoordinatorEventProcessor implements 
EventProcessor {
      *
      * <ul>
      *   <li>B1. Move all replicas in AR to OnlineReplica state.
-     *   <li>B2. Set RS = TRS, AR = [], RR = [] in memory.
-     *   <li>B3. Send a LeaderAndIsr request with RS = TRS. This will prevent 
the leader from adding
-     *       any replica in TRS - ORS back in the isr. If the current leader 
is not in TRS or isn't
-     *       alive, we move the leader to a new replica in TRS. We may send 
the LeaderAndIsr to more
-     *       than the TRS replicas due to the way the partition state machine 
works (it reads
-     *       replicas from ZK)
-     *   <li>B4. Move all replicas in RR to OfflineReplica state. As part of 
OfflineReplica state
+     *   <li>B2. Send a LeaderAndIsr request with RS = ORS + TRS. This will make 
the origin leader
+     *       change to the new leader. This request will be sent to every 
tabletServer in ORS + TRS.
+     *   <li>B3. Set RS = TRS, AR = [], RR = [] in memory.
+     *   <li>B4. Re-send LeaderAndIsr request with new leader and a new RS 
(using TRS) and same isr to
+     *       every tabletServer in TRS.
+     *   <li>B5. Move all replicas in RR to OfflineReplica state. As part of 
OfflineReplica state
      *       change, we shrink the isr to remove RR in ZooKeeper and send a 
LeaderAndIsr ONLY to the
      *       Leader to notify it of the shrunk isr. After that, we send a 
StopReplica (delete =
-     *       false) to the replicas in RR.
-     *   <li>B5. Move all replicas in RR to NonExistentReplica state. This 
will send a StopReplica
-     *       (delete = true) to he replicas in RR to physically delete the 
replicas on disk.
-     *   <li>B6. Update ZK with RS=TRS, AR=[], RR=[].
-     *   <li>B7. After electing leader, the replicas and isr information 
changes. So resend the
+     *       false and deleteRemote = false) to the replicas in RR.
+     *   <li>B6. Move all replicas in RR to ReplicaMigrationStarted state. 
This will send a
+     *       StopReplica (delete = true and deleteRemote = false) to the 
replicas in RR to physically
+     *       delete the replicas on disk but don't delete the data in remote 
storage.
+     *   <li>B7. Update ZK with RS=TRS, AR=[], RR=[].
+     *   <li>B8. After electing leader, the replicas and isr information 
changes. So resend the
      *       update metadata request to every tabletServer.
      *   <li>B8. Mark the ongoing rebalance task to finish.
      * </ul>
@@ -1491,17 +1474,22 @@ public class CoordinatorEventProcessor implements 
EventProcessor {
                                             new 
TableBucketReplica(tableBucket, replica)),
                                     OnlineReplica));
             List<Integer> targetReplicas = reassignment.getTargetReplicas();
-            // B2. Set RS = TRS, AR = [], RR = [] in memory.
-            coordinatorContext.updateBucketReplicaAssignment(tableBucket, 
targetReplicas);
-            // B3. Send LeaderAndIsr request with a potential new leader (if 
current leader not in
-            // TRS) and a new RS (using TRS) and same isr to every 
tabletServer in ORS + TRS or TRS
+            // B2. Send LeaderAndIsr request with a potential new leader (if 
current leader not in
+            // TRS) and a new RS (using ORS + TRS) and same isr to every 
tabletServer in ORS + TRS
             maybeReassignedBucketLeaderIfRequired(tableBucket, targetReplicas);
-            // B4. replicas in RR -> Offline (force those replicas out of isr)
-            // B5. replicas in RR -> NonExistentReplica (force those replicas 
to be deleted)
+            // B3. Set RS = TRS, AR = [], RR = [] in memory.
+            coordinatorContext.updateBucketReplicaAssignment(tableBucket, 
targetReplicas);
+            // B4. Re-send LeaderAndIsr request with new leader and a new RS 
(using TRS) and same
+            // isr to every tabletServer in TRS.
+            updateBucketEpochAndSendRequest(tableBucket, targetReplicas);
+
+            // B5. replicas in RR -> Offline (force those replicas out of isr)
+            // B6. replicas in RR -> ReplicaMigrationStarted (force those 
replicas to be migration
+            // started)
             stopRemovedReplicasOfReassignedBucket(tableBucket, 
removingReplicas);
-            // B6. Update ZK with RS = TRS, AR = [], RR = [].
+            // B7. Update ZK with RS = TRS, AR = [], RR = [].
             updateReplicaAssignmentForBucket(tableBucket, targetReplicas);
-            // B7. After electing a leader in B3, the replicas and isr 
information changes, so
+            // B8. After electing a leader in B2, the replicas and isr 
information changes, so
             // resend the update metadata request to every tabletServer.
             updateTabletServerMetadataCache(
                     new 
HashSet<>(coordinatorContext.getLiveTabletServers().values()),
@@ -1557,16 +1545,12 @@ public class CoordinatorEventProcessor implements 
EventProcessor {
 
     private void stopRemovedReplicasOfReassignedBucket(
             TableBucket tableBucket, List<Integer> removingReplicas) {
-        Set<TableBucketReplica> replicasToBeDeleted = new HashSet<>();
+        Set<TableBucketReplica> replicasToBeMoved = new HashSet<>();
         removingReplicas.forEach(
-                replica -> replicasToBeDeleted.add(new 
TableBucketReplica(tableBucket, replica)));
-        replicaStateMachine.handleStateChanges(replicasToBeDeleted, 
OfflineReplica);
-        // send stop replica command to the old replicas.
-        replicaStateMachine.handleStateChanges(replicasToBeDeleted, 
ReplicaDeletionStarted);
-        // TODO: Eventually bucket reassignment could use a callback that does 
retries if deletion
-        // failed
-        replicaStateMachine.handleStateChanges(replicasToBeDeleted, 
ReplicaDeletionSuccessful);
-        replicaStateMachine.handleStateChanges(replicasToBeDeleted, 
NonExistentReplica);
+                replica -> replicasToBeMoved.add(new 
TableBucketReplica(tableBucket, replica)));
+        replicaStateMachine.handleStateChanges(replicasToBeMoved, 
OfflineReplica);
+        replicaStateMachine.handleStateChanges(replicasToBeMoved, 
ReplicaMigrationStarted);
+        replicaStateMachine.handleStateChanges(replicasToBeMoved, 
NonExistentReplica);
     }
 
     private void updateReplicaAssignmentForBucket(
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorRequestBatch.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorRequestBatch.java
index 436cc5203..fe85ff391 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorRequestBatch.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorRequestBatch.java
@@ -240,7 +240,8 @@ public class CoordinatorRequestBatch {
     public void addStopReplicaRequestForTabletServers(
             Set<Integer> tabletServers,
             TableBucket tableBucket,
-            boolean isDelete,
+            boolean deleteLocal,
+            boolean deleteRemote,
             int leaderEpoch) {
         tabletServers.stream()
                 .filter(s -> s >= 0)
@@ -250,12 +251,18 @@ public class CoordinatorRequestBatch {
                                     stopReplicaRequestMap.computeIfAbsent(id, 
k -> new HashMap<>());
                             // reduce delete flag, if it has been marked as 
deleted,
                             // we will set it as delete replica
-                            boolean alreadyDelete =
+                            boolean alreadyDeleteLocal =
                                     stopBucketReplica.get(tableBucket) != null
                                             && 
stopBucketReplica.get(tableBucket).isDelete();
+                            boolean alreadyDeleteRemote =
+                                    stopBucketReplica.get(tableBucket) != null
+                                            && 
stopBucketReplica.get(tableBucket).isDeleteRemote();
                             PbStopReplicaReqForBucket 
protoStopReplicaForBucket =
                                     makeStopBucketReplica(
-                                            tableBucket, alreadyDelete || 
isDelete, leaderEpoch);
+                                            tableBucket,
+                                            alreadyDeleteLocal || deleteLocal,
+                                            alreadyDeleteRemote || 
deleteRemote,
+                                            leaderEpoch);
                             stopBucketReplica.put(tableBucket, 
protoStopReplicaForBucket);
                         });
     }
@@ -447,7 +454,7 @@ public class CoordinatorRequestBatch {
             // we collect the buckets whose replica is to be deleted
             Set<TableBucket> deletedReplicaBuckets =
                     stopReplicas.values().stream()
-                            .filter(PbStopReplicaReqForBucket::isDelete)
+                            .filter(pbBucket -> pbBucket.isDelete() && 
pbBucket.isDeleteRemote())
                             .map(t -> toTableBucket(t.getTableBucket()))
                             .collect(Collectors.toSet());
 
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorServer.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorServer.java
index 76a3fbd68..7e06f6109 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorServer.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorServer.java
@@ -333,7 +333,12 @@ public class CoordinatorServer extends ServerBase {
         }
     }
 
-    private CoordinatorEventProcessor getCoordinatorEventProcessor() {
+    /**
+     * Get the coordinator event processor. Don't call this method directly as 
the coordinator event
+     * processor uses a single-threaded model.
+     */
+    @VisibleForTesting
+    public CoordinatorEventProcessor getCoordinatorEventProcessor() {
         if (coordinatorEventProcessor != null) {
             return coordinatorEventProcessor;
         } else {
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/statemachine/ReplicaState.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/statemachine/ReplicaState.java
index be33ce9b6..5ed5732cf 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/statemachine/ReplicaState.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/statemachine/ReplicaState.java
@@ -27,7 +27,7 @@ public enum ReplicaState implements BaseState<ReplicaState> {
     NonExistentReplica {
         @Override
         public Set<ReplicaState> getValidPreviousStates() {
-            return EnumSet.of(ReplicaDeletionSuccessful);
+            return EnumSet.of(ReplicaMigrationStarted, 
ReplicaDeletionSuccessful);
         }
     },
     NewReplica {
@@ -48,6 +48,14 @@ public enum ReplicaState implements BaseState<ReplicaState> {
             return EnumSet.of(NewReplica, OnlineReplica, OfflineReplica);
         }
     },
+    ReplicaMigrationStarted {
+        @Override
+        public Set<ReplicaState> getValidPreviousStates() {
+            // ReplicaMigrationStarted as a valid previous state since
+            // we will retry migrating the replica when migration fails.
+            return EnumSet.of(OfflineReplica, ReplicaMigrationStarted);
+        }
+    },
     ReplicaDeletionStarted {
         @Override
         public Set<ReplicaState> getValidPreviousStates() {
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/statemachine/ReplicaStateMachine.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/statemachine/ReplicaStateMachine.java
index fef67b083..091c2cbc9 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/statemachine/ReplicaStateMachine.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/statemachine/ReplicaStateMachine.java
@@ -259,6 +259,7 @@ public class ReplicaStateMachine {
                                         
Collections.singleton(replica.getReplica()),
                                         replica.getTableBucket(),
                                         false,
+                                        false,
                                         
coordinatorContext.getBucketLeaderEpoch(
                                                 replica.getTableBucket())));
 
@@ -307,10 +308,28 @@ public class ReplicaStateMachine {
                         replica -> doStateChange(replica, 
ReplicaState.OfflineReplica));
 
                 break;
+            case ReplicaMigrationStarted:
+                validReplicas.forEach(
+                        replica -> doStateChange(replica, 
ReplicaState.ReplicaMigrationStarted));
+                validReplicas.forEach(
+                        tableBucketReplica -> {
+                            int replicaServer = 
tableBucketReplica.getReplica();
+                            // send stop replica request with deleteLocal = 
true and deleteRemote =
+                            // false indicates the replica is migrated.
+                            
coordinatorRequestBatch.addStopReplicaRequestForTabletServers(
+                                    Collections.singleton(replicaServer),
+                                    tableBucketReplica.getTableBucket(),
+                                    true,
+                                    false,
+                                    coordinatorContext.getBucketLeaderEpoch(
+                                            
tableBucketReplica.getTableBucket()));
+                        });
+                break;
             case ReplicaDeletionStarted:
                 validReplicas.forEach(
                         replica -> doStateChange(replica, 
ReplicaState.ReplicaDeletionStarted));
-                // send stop replica request with delete = true
+                // send stop replica request with deleteLocal = true and 
deleteRemote = true, which indicates the
+                // replica is deleted.
                 validReplicas.forEach(
                         tableBucketReplica -> {
                             int replicaServer = 
tableBucketReplica.getReplica();
@@ -318,6 +337,7 @@ public class ReplicaStateMachine {
                                     Collections.singleton(replicaServer),
                                     tableBucketReplica.getTableBucket(),
                                     true,
+                                    true,
                                     coordinatorContext.getBucketLeaderEpoch(
                                             
tableBucketReplica.getTableBucket()));
                         });
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/entity/StopReplicaData.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/entity/StopReplicaData.java
index 4db00ecc2..84381f9c0 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/entity/StopReplicaData.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/entity/StopReplicaData.java
@@ -23,14 +23,20 @@ import org.apache.fluss.rpc.messages.StopReplicaRequest;
 /** The data to stop replica for request {@link StopReplicaRequest}. */
 public class StopReplicaData {
     private final TableBucket tableBucket;
-    private final boolean delete;
+    private final boolean deleteLocal;
+    private final boolean deleteRemote;
     private final int coordinatorEpoch;
     private final int leaderEpoch;
 
     public StopReplicaData(
-            TableBucket tableBucket, boolean delete, int coordinatorEpoch, int 
leaderEpoch) {
+            TableBucket tableBucket,
+            boolean deleteLocal,
+            boolean deleteRemote,
+            int coordinatorEpoch,
+            int leaderEpoch) {
         this.tableBucket = tableBucket;
-        this.delete = delete;
+        this.deleteLocal = deleteLocal;
+        this.deleteRemote = deleteRemote;
         this.coordinatorEpoch = coordinatorEpoch;
         this.leaderEpoch = leaderEpoch;
     }
@@ -39,8 +45,12 @@ public class StopReplicaData {
         return tableBucket;
     }
 
-    public boolean isDelete() {
-        return delete;
+    public boolean isDeleteLocal() {
+        return deleteLocal;
+    }
+
+    public boolean isDeleteRemote() {
+        return deleteRemote;
     }
 
     public int getCoordinatorEpoch() {
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/log/remote/RemoteLogManager.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/log/remote/RemoteLogManager.java
index 9f3006747..c85d5ae65 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/log/remote/RemoteLogManager.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/log/remote/RemoteLogManager.java
@@ -38,6 +38,7 @@ import 
org.apache.fluss.utils.concurrent.ExecutorThreadFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
 import javax.annotation.concurrent.ThreadSafe;
 
 import java.io.Closeable;
@@ -305,7 +306,7 @@ public class RemoteLogManager implements Closeable {
     }
 
     @VisibleForTesting
-    RemoteLogTablet remoteLogTablet(TableBucket tableBucket) {
+    public RemoteLogTablet remoteLogTablet(TableBucket tableBucket) {
         RemoteLogTablet remoteLog = remoteLogs.get(tableBucket);
         if (remoteLog == null) {
             throw new IllegalStateException(
@@ -387,4 +388,10 @@ public class RemoteLogManager implements Closeable {
     public RemoteLogIndexCache getRemoteLogIndexCache() {
         return remoteLogIndexCache;
     }
+
+    @VisibleForTesting
+    @Nullable
+    TaskWithFuture getTaskWithFuture(TableBucket tableBucket) {
+        return rlmTasks.get(tableBucket);
+    }
 }
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java
index dda8cef8a..d8ddb92c4 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java
@@ -900,7 +900,8 @@ public class ReplicaManager {
                                     result.add(
                                             stopReplica(
                                                     tb,
-                                                    data.isDelete(),
+                                                    data.isDeleteLocal(),
+                                                    data.isDeleteRemote(),
                                                     deletedTableIds,
                                                     deletedPartitionIds));
                                 } catch (Exception e) {
@@ -1711,10 +1712,23 @@ public class ReplicaManager {
         }
     }
 
-    /** Stop the given replica. */
+    /**
+     * Stop the replica for the given table bucket.
+     *
+     * @param tb the table bucket
+     * @param deleteLocal whether to delete the local data, this will be true 
not only when the table or
+     *     partition of the table bucket is already deleted but also when the 
replica is migrated to another
+     *     server by rebalance
+     * @param deleteRemote whether to delete the remote data, like remote log 
and kv snapshot, this
+     *     means the table or partition of the table bucket is already deleted
+     * @param deletedTableIds the table ids that are deleted
+     * @param deletedPartitionIds the partition ids that are deleted
+     * @return the result of stop replica
+     */
     private StopReplicaResultForBucket stopReplica(
             TableBucket tb,
-            boolean delete,
+            boolean deleteLocal,
+            boolean deleteRemote,
             Map<Long, Path> deletedTableIds,
             Map<Long, Path> deletedPartitionIds) {
         // First stop fetchers for this table bucket.
@@ -1723,7 +1737,7 @@ public class ReplicaManager {
         HostedReplica replica = getReplica(tb);
         if (replica instanceof OnlineReplica) {
             Replica replicaToDelete = ((OnlineReplica) replica).getReplica();
-            if (delete) {
+            if (deleteLocal) {
                 if (allReplicas.remove(tb) != null) {
                     serverMetricGroup.removeTableBucketMetricGroup(
                             
replicaToDelete.getPhysicalTablePath().getTablePath(), tb);
@@ -1738,8 +1752,9 @@ public class ReplicaManager {
                 }
             }
 
-            remoteLogManager.stopReplica(replicaToDelete, delete && 
replicaToDelete.isLeader());
-            if (delete && replicaToDelete.isLeader()) {
+            remoteLogManager.stopReplica(
+                    replicaToDelete, deleteRemote && 
replicaToDelete.isLeader());
+            if (deleteRemote && replicaToDelete.isLeader()) {
                 kvManager.deleteRemoteKvSnapshot(
                         replicaToDelete.getPhysicalTablePath(), 
replicaToDelete.getTableBucket());
             }
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java
index 4e4e59908..a4bfa01c5 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java
@@ -715,12 +715,23 @@ public class ServerRpcMessageUtils {
         return notifyLeaderAndIsrResultForBuckets;
     }
 
+    /**
+     * make stop bucket replica request.
+     *
+     * @param tableBucket table bucket
+     * @param deleteLocal delete local log or kv data
+     * @param deleteRemote delete remote log or kv snapshot because of table 
or partition was
+     *     deleted.
+     * @param leaderEpoch leader epoch
+     * @return stop bucket replica request
+     */
     public static PbStopReplicaReqForBucket makeStopBucketReplica(
-            TableBucket tableBucket, boolean isDelete, int leaderEpoch) {
+            TableBucket tableBucket, boolean deleteLocal, boolean 
deleteRemote, int leaderEpoch) {
         PbStopReplicaReqForBucket stopBucketReplicaRequest = new 
PbStopReplicaReqForBucket();
         PbTableBucket pbTableBucket =
                 stopBucketReplicaRequest
-                        .setDelete(isDelete)
+                        .setDelete(deleteLocal)
+                        .setDeleteRemote(deleteRemote)
                         .setLeaderEpoch(leaderEpoch)
                         .setTableBucket()
                         .setBucketId(tableBucket.getBucket())
@@ -735,10 +746,17 @@ public class ServerRpcMessageUtils {
         List<StopReplicaData> stopReplicaDataList = new ArrayList<>();
         for (PbStopReplicaReqForBucket reqForBucket : 
request.getStopReplicasReqsList()) {
             PbTableBucket tableBucket = reqForBucket.getTableBucket();
+            // For backward compatibility, if a request does not include the 
deleteRemote flag, the
+            // system treats delete as deleteRemote (i.e., it falls back to 
remote deletion). This
+            // ensures older CoordinatorServer continues to function correctly 
with newer
+            // TabletServers.
             stopReplicaDataList.add(
                     new StopReplicaData(
                             toTableBucket(tableBucket),
                             reqForBucket.isDelete(),
+                            reqForBucket.hasDeleteRemote()
+                                    ? reqForBucket.isDeleteRemote()
+                                    : reqForBucket.isDelete(),
                             request.getCoordinatorEpoch(),
                             reqForBucket.getLeaderEpoch()));
         }
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/rebalance/RebalanceManagerITCase.java
 
b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/rebalance/RebalanceManagerITCase.java
index 422cb267b..16d06492c 100644
--- 
a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/rebalance/RebalanceManagerITCase.java
+++ 
b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/rebalance/RebalanceManagerITCase.java
@@ -17,28 +17,56 @@
 
 package org.apache.fluss.server.coordinator.rebalance;
 
+import org.apache.fluss.cluster.rebalance.RebalancePlanForBucket;
+import org.apache.fluss.cluster.rebalance.RebalanceStatus;
 import org.apache.fluss.cluster.rebalance.ServerTag;
 import org.apache.fluss.config.ConfigOptions;
 import org.apache.fluss.config.Configuration;
+import org.apache.fluss.config.MemorySize;
 import org.apache.fluss.metadata.PartitionSpec;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TableBucketReplica;
 import org.apache.fluss.metadata.TableDescriptor;
 import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.rpc.gateway.TabletServerGateway;
 import org.apache.fluss.rpc.messages.AddServerTagRequest;
+import org.apache.fluss.server.coordinator.CoordinatorEventProcessor;
+import org.apache.fluss.server.coordinator.event.AccessContextEvent;
 import org.apache.fluss.server.coordinator.rebalance.model.ClusterModel;
+import org.apache.fluss.server.coordinator.statemachine.ReplicaState;
+import org.apache.fluss.server.log.remote.RemoteLogManager;
+import org.apache.fluss.server.log.remote.RemoteLogManifest;
+import org.apache.fluss.server.log.remote.RemoteLogTablet;
+import org.apache.fluss.server.replica.ReplicaManager;
+import org.apache.fluss.server.tablet.TabletServer;
 import org.apache.fluss.server.testutils.FlussClusterExtension;
 import org.apache.fluss.server.zk.ZooKeeperClient;
+import org.apache.fluss.server.zk.data.BucketAssignment;
 
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.RegisterExtension;
 
+import java.time.Duration;
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
 
+import static org.apache.fluss.record.TestData.DATA1;
 import static org.apache.fluss.record.TestData.DATA1_SCHEMA;
 import static org.apache.fluss.record.TestData.DATA1_TABLE_DESCRIPTOR;
 import static org.apache.fluss.record.TestData.DATA1_TABLE_PATH;
+import static 
org.apache.fluss.server.coordinator.statemachine.ReplicaState.OnlineReplica;
+import static 
org.apache.fluss.server.testutils.RpcMessageTestUtils.assertProduceLogResponse;
 import static 
org.apache.fluss.server.testutils.RpcMessageTestUtils.createPartition;
 import static 
org.apache.fluss.server.testutils.RpcMessageTestUtils.createTable;
+import static 
org.apache.fluss.server.testutils.RpcMessageTestUtils.newProduceLogRequest;
+import static 
org.apache.fluss.testutils.DataTestUtils.genMemoryLogRecordsByObject;
+import static org.apache.fluss.testutils.common.CommonTestUtils.retry;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** IT test for {@link RebalanceManager}. */
@@ -47,7 +75,7 @@ public class RebalanceManagerITCase {
     @RegisterExtension
     public static final FlussClusterExtension FLUSS_CLUSTER_EXTENSION =
             FlussClusterExtension.builder()
-                    .setNumOfTabletServers(3)
+                    .setNumOfTabletServers(6)
                     .setClusterConf(initConfig())
                     .build();
 
@@ -84,8 +112,8 @@ public class RebalanceManagerITCase {
                 false);
 
         ClusterModel clusterModel = rebalanceManager.buildClusterModel();
-        assertThat(clusterModel.servers().size()).isEqualTo(3);
-        assertThat(clusterModel.aliveServers().size()).isEqualTo(3);
+        assertThat(clusterModel.servers().size()).isEqualTo(6);
+        assertThat(clusterModel.aliveServers().size()).isEqualTo(6);
         assertThat(clusterModel.offlineServers().size()).isEqualTo(0);
         assertThat(clusterModel.tables().size()).isEqualTo(2);
         assertThat(clusterModel.tables()).contains(tableId1, tableId2);
@@ -97,16 +125,193 @@ public class RebalanceManagerITCase {
         
FLUSS_CLUSTER_EXTENSION.newCoordinatorClient().addServerTag(request).get();
 
         clusterModel = rebalanceManager.buildClusterModel();
-        assertThat(clusterModel.servers().size()).isEqualTo(3);
-        assertThat(clusterModel.aliveServers().size()).isEqualTo(2);
+        assertThat(clusterModel.servers().size()).isEqualTo(6);
+        assertThat(clusterModel.aliveServers().size()).isEqualTo(5);
         assertThat(clusterModel.offlineServers().size()).isEqualTo(1);
         assertThat(clusterModel.tables().size()).isEqualTo(2);
         assertThat(clusterModel.tables()).contains(tableId1, tableId2);
     }
 
+    @Test
+    void testRebalanceWithRemoteLog() throws Exception {
+        TableBucket tb = setupTableBucket();
+        long tableId = tb.getTableId();
+
+        int leaderId = FLUSS_CLUSTER_EXTENSION.waitAndGetLeader(tb);
+        TabletServerGateway leaderGateway =
+                FLUSS_CLUSTER_EXTENSION.newTabletServerClientForNode(leaderId);
+
+        // produce test records
+        produceRecordsAndWaitRemoteLogCopy(leaderGateway, tb, 0L);
+        // test metadata updated: verify manifest in metadata
+        TabletServer tabletServer = 
FLUSS_CLUSTER_EXTENSION.getTabletServerById(leaderId);
+        RemoteLogManager remoteLogManager = 
tabletServer.getReplicaManager().getRemoteLogManager();
+        RemoteLogTablet remoteLogTablet = remoteLogManager.remoteLogTablet(tb);
+
+        RemoteLogManifest manifest = remoteLogTablet.currentManifest();
+        
assertThat(manifest.getPhysicalTablePath().getTablePath()).isEqualTo(DATA1_TABLE_PATH);
+        assertThat(manifest.getTableBucket()).isEqualTo(tb);
+        int remoteLogSize = manifest.getRemoteLogSegmentList().size();
+        assertThat(remoteLogSize).isGreaterThan(0);
+
+        // try to trigger rebalance. for example, if current assignment is [0, 
1, 2], try to trigger
+        // rebalance to [3, 4, 5]
+        RebalancePlanForBucket planForBucket = buildRebalancePlanForBucket(tb);
+
+        List<Integer> originReplicas = planForBucket.getOriginReplicas();
+        List<Integer> newReplicas = planForBucket.getNewReplicas();
+
+        List<ReplicaState> originReplicaStates = new ArrayList<>();
+        List<ReplicaState> newReplicaStates = new ArrayList<>();
+        fromCoordinatorContext(
+                tb, originReplicas, newReplicas, originReplicaStates, 
newReplicaStates);
+
+        // verify pre-rebalance states
+        assertThat(originReplicaStates).allMatch(replicaState -> replicaState 
== OnlineReplica);
+        assertThat(newReplicaStates).allMatch(Objects::isNull);
+
+        rebalanceManager.registerRebalance(
+                "test-rebalance-dsds",
+                Collections.singletonMap(tb, planForBucket),
+                RebalanceStatus.NOT_STARTED);
+
+        retry(
+                Duration.ofMinutes(2),
+                () -> {
+                    // assignment changed.
+                    Set<Integer> newReplicaSet = new 
HashSet<>(planForBucket.getNewReplicas());
+                    BucketAssignment bucketAssignment =
+                            zkClient.getTableAssignment(tableId)
+                                    .get()
+                                    .getBucketAssignment(tb.getBucket());
+                    List<Integer> replicas = bucketAssignment.getReplicas();
+                    
assertThat(newReplicaSet.size()).isEqualTo(replicas.size());
+                    assertThat(newReplicaSet.containsAll(replicas)).isTrue();
+
+                    // leader changed.
+                    int newLeader = planForBucket.getNewLeader();
+                    
assertThat(FLUSS_CLUSTER_EXTENSION.waitAndGetLeader(tb)).isEqualTo(newLeader);
+
+                    // origin replicas all set to offline.
+                    for (int originReplica : originReplicas) {
+                        TabletServer ts =
+                                
FLUSS_CLUSTER_EXTENSION.getTabletServerById(originReplica);
+                        ReplicaManager rm = ts.getReplicaManager();
+                        assertThat(rm.getReplica(tb))
+                                
.isInstanceOf(ReplicaManager.NoneReplica.class);
+                    }
+
+                    // replica state changes.
+                    originReplicaStates.clear();
+                    newReplicaStates.clear();
+                    fromCoordinatorContext(
+                            tb, originReplicas, newReplicas, 
originReplicaStates, newReplicaStates);
+                    assertThat(originReplicaStates).allMatch(Objects::isNull);
+                    assertThat(newReplicaStates)
+                            .allMatch(replicaState -> replicaState == 
OnlineReplica);
+                });
+        // remote log should not be deleted.
+        int newLeader = planForBucket.getNewLeader();
+        TabletServer leaderTs = 
FLUSS_CLUSTER_EXTENSION.getTabletServerById(newLeader);
+        RemoteLogManager leaderRm = 
leaderTs.getReplicaManager().getRemoteLogManager();
+        RemoteLogTablet leaderRlt = leaderRm.remoteLogTablet(tb);
+
+        RemoteLogManifest newManifest = leaderRlt.currentManifest();
+        
assertThat(newManifest.getPhysicalTablePath().getTablePath()).isEqualTo(DATA1_TABLE_PATH);
+        assertThat(newManifest.getTableBucket()).isEqualTo(tb);
+        
assertThat(newManifest.getRemoteLogSegmentList().size()).isEqualTo(remoteLogSize);
+    }
+
+    private void fromCoordinatorContext(
+            TableBucket tb,
+            List<Integer> originReplicas,
+            List<Integer> newReplicas,
+            List<ReplicaState> originalReplicaStates,
+            List<ReplicaState> newReplicaStates)
+            throws Exception {
+        AccessContextEvent<Void> event =
+                new AccessContextEvent<>(
+                        ctx -> {
+                            originReplicas.forEach(
+                                    replica -> {
+                                        originalReplicaStates.add(
+                                                ctx.getReplicaState(
+                                                        new 
TableBucketReplica(tb, replica)));
+                                    });
+                            newReplicas.forEach(
+                                    replica -> {
+                                        newReplicaStates.add(
+                                                ctx.getReplicaState(
+                                                        new 
TableBucketReplica(tb, replica)));
+                                    });
+                            return null;
+                        });
+        CoordinatorEventProcessor processor =
+                
FLUSS_CLUSTER_EXTENSION.getCoordinatorServer().getCoordinatorEventProcessor();
+        processor.getCoordinatorEventManager().put(event);
+        event.getResultFuture().get(30, TimeUnit.SECONDS);
+    }
+
+    private TableBucket setupTableBucket() throws Exception {
+        long tableId =
+                createTable(FLUSS_CLUSTER_EXTENSION, DATA1_TABLE_PATH, 
DATA1_TABLE_DESCRIPTOR);
+        TableBucket tb = new TableBucket(tableId, 0);
+        FLUSS_CLUSTER_EXTENSION.waitUntilAllReplicaReady(tb);
+        return tb;
+    }
+
+    private RebalancePlanForBucket buildRebalancePlanForBucket(TableBucket tb) 
throws Exception {
+        long tableId = tb.getTableId();
+        BucketAssignment bucketAssignment =
+                
zkClient.getTableAssignment(tableId).get().getBucketAssignment(tb.getBucket());
+        List<Integer> replicas = bucketAssignment.getReplicas();
+        Set<Integer> replicasSet = new 
HashSet<>(bucketAssignment.getReplicas());
+        int originLeader = zkClient.getLeaderAndIsr(tb).get().leader();
+        List<Integer> newReplicas = new ArrayList<>();
+        int newLeader = originLeader;
+        for (int i = 0; i < 6; i++) {
+            if (!replicasSet.contains(i)) {
+                newReplicas.add(i);
+                if (i != originLeader && newLeader == originLeader) {
+                    newLeader = i;
+                }
+            }
+        }
+
+        return new RebalancePlanForBucket(tb, originLeader, newLeader, 
replicas, newReplicas);
+    }
+
     private static Configuration initConfig() {
         Configuration conf = new Configuration();
+        conf.setInt(ConfigOptions.DEFAULT_REPLICATION_FACTOR, 1);
         conf.setInt(ConfigOptions.DEFAULT_REPLICATION_FACTOR, 3);
+
+        // set a shorter interval for testing purposes
+        conf.set(ConfigOptions.REMOTE_LOG_TASK_INTERVAL_DURATION, 
Duration.ofSeconds(1));
+        conf.set(ConfigOptions.LOG_SEGMENT_FILE_SIZE, MemorySize.parse("1kb"));
+
+        // set a shorter max log time to allow replicas to shrink from the ISR. Don't 
set it too low, otherwise
+        // normal follower synchronization will also be affected
+        conf.set(ConfigOptions.LOG_REPLICA_MAX_LAG_TIME, 
Duration.ofSeconds(5));
         return conf;
     }
+
+    private void produceRecordsAndWaitRemoteLogCopy(
+            TabletServerGateway leaderGateway, TableBucket tb, long 
baseOffset) throws Exception {
+        for (int i = 0; i < 10; i++) {
+            assertProduceLogResponse(
+                    leaderGateway
+                            .produceLog(
+                                    newProduceLogRequest(
+                                            tb.getTableId(),
+                                            0,
+                                            1,
+                                            
genMemoryLogRecordsByObject(DATA1)))
+                            .get(),
+                    0,
+                    baseOffset + i * 10L);
+        }
+        FLUSS_CLUSTER_EXTENSION.waitUntilSomeLogSegmentsCopyToRemote(
+                new TableBucket(tb.getTableId(), 0));
+    }
 }
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/log/remote/RemoteLogManagerTest.java
 
b/fluss-server/src/test/java/org/apache/fluss/server/log/remote/RemoteLogManagerTest.java
index e2e5be650..4d06411bc 100644
--- 
a/fluss-server/src/test/java/org/apache/fluss/server/log/remote/RemoteLogManagerTest.java
+++ 
b/fluss-server/src/test/java/org/apache/fluss/server/log/remote/RemoteLogManagerTest.java
@@ -26,12 +26,17 @@ import org.apache.fluss.rpc.entity.FetchLogResultForBucket;
 import org.apache.fluss.rpc.protocol.ApiError;
 import org.apache.fluss.server.coordinator.TestCoordinatorGateway;
 import org.apache.fluss.server.entity.FetchReqInfo;
+import org.apache.fluss.server.entity.StopReplicaData;
+import org.apache.fluss.server.entity.StopReplicaResultForBucket;
 import org.apache.fluss.server.log.FetchParams;
 import org.apache.fluss.server.log.LogTablet;
 import org.apache.fluss.server.replica.Replica;
+import org.apache.fluss.server.replica.ReplicaManager;
 
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
 import org.junit.jupiter.params.provider.ValueSource;
 
 import java.time.Duration;
@@ -40,8 +45,10 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import static org.apache.fluss.record.TestData.DATA1_PHYSICAL_TABLE_PATH;
+import static 
org.apache.fluss.record.TestData.DATA1_PHYSICAL_TABLE_PATH_PA_2024;
 import static org.apache.fluss.record.TestData.DATA1_SCHEMA;
 import static org.apache.fluss.record.TestData.DATA1_TABLE_ID;
 import static org.apache.fluss.record.TestData.DATA1_TABLE_PATH;
@@ -497,10 +504,79 @@ class RemoteLogManagerTest extends RemoteLogTestBase {
         assertThatThrownBy(() -> 
remoteLogManager.relevantRemoteLogSegments(tb, 0L))
                 .isInstanceOf(IllegalStateException.class)
                 .hasMessageContaining("RemoteLogTablet can't be found for 
table-bucket " + tb);
-        FsPath logTabletDir = remoteLogTabletDir(remoteLogDir(conf), 
DATA1_PHYSICAL_TABLE_PATH, tb);
+        FsPath logTabletDir =
+                remoteLogTabletDir(
+                        remoteLogDir(conf),
+                        partitionTable
+                                ? DATA1_PHYSICAL_TABLE_PATH_PA_2024
+                                : DATA1_PHYSICAL_TABLE_PATH,
+                        tb);
         
assertThat(logTabletDir.getFileSystem().exists(logTabletDir)).isFalse();
     }
 
+    @ParameterizedTest
+    @MethodSource("stopArgs")
+    void testStopReplicaDeleteRemoteLog(boolean partitionTable, boolean 
deleteRemote)
+            throws Exception {
+        TableBucket tb = makeTableBucket(partitionTable);
+        // Need to make leader by ReplicaManager.
+        makeLogTableAsLeader(tb, partitionTable);
+        LogTablet logTablet = 
replicaManager.getReplicaOrException(tb).getLogTablet();
+        addMultiSegmentsToLogTablet(logTablet, 5);
+        // trigger RLMTask copy local log segment to remote and update 
metadata.
+        remoteLogTaskScheduler.triggerPeriodicScheduledTasks();
+        List<RemoteLogSegment> remoteLogSegmentList =
+                remoteLogManager.relevantRemoteLogSegments(tb, 0L);
+        assertThat(remoteLogSegmentList.size()).isEqualTo(4);
+        assertThat(listRemoteLogFiles(tb))
+                .isEqualTo(
+                        remoteLogSegmentList.stream()
+                                .map(s -> s.remoteLogSegmentId().toString())
+                                .collect(Collectors.toSet()));
+        assertThat(remoteLogManager.getTaskWithFuture(tb)).isNotNull();
+
+        FsPath remoteLogTabletDir =
+                remoteLogTabletDir(
+                        remoteLogDir(conf),
+                        partitionTable
+                                ? DATA1_PHYSICAL_TABLE_PATH_PA_2024
+                                : DATA1_PHYSICAL_TABLE_PATH,
+                        tb);
+        
assertThat(remoteLogTabletDir.getFileSystem().exists(remoteLogTabletDir)).isTrue();
+        assertThat(logTablet.getLogDir().exists()).isTrue();
+
+        // stop with delete = false, deleteRemote = false, local and remote log 
should be kept,
+        // remote log task will be removed.
+        CompletableFuture<List<StopReplicaResultForBucket>> future1 = new 
CompletableFuture<>();
+        replicaManager.stopReplicas(
+                0,
+                Collections.singletonList(new StopReplicaData(tb, false, 
false, 0, 0)),
+                future1::complete);
+        assertThat(future1.get()).containsOnly(new 
StopReplicaResultForBucket(tb));
+        ReplicaManager.HostedReplica hostedReplica = 
replicaManager.getReplica(tb);
+        
assertThat(hostedReplica).isInstanceOf(ReplicaManager.OnlineReplica.class);
+        
assertThat(remoteLogTabletDir.getFileSystem().exists(remoteLogTabletDir)).isTrue();
+        assertThat(remoteLogManager.getTaskWithFuture(tb)).isNull();
+
+        CompletableFuture<List<StopReplicaResultForBucket>> future2 = new 
CompletableFuture<>();
+        replicaManager.stopReplicas(
+                0,
+                Collections.singletonList(new StopReplicaData(tb, true, 
deleteRemote, 0, 0)),
+                future2::complete);
+        assertThat(future2.get()).containsOnly(new 
StopReplicaResultForBucket(tb));
+        hostedReplica = replicaManager.getReplica(tb);
+        
assertThat(hostedReplica).isInstanceOf(ReplicaManager.NoneReplica.class);
+        assertThat(logTablet.getLogDir().exists()).isFalse();
+        if (!deleteRemote) {
+            // stop with delete = true, deleteRemote = false, local log should 
be deleted, remote log
+            // should be kept.
+            
assertThat(remoteLogTabletDir.getFileSystem().exists(remoteLogTabletDir)).isTrue();
+        } else {
+            // stop with delete = true, deleteRemote = true, local and remote 
log should be deleted
+            
assertThat(remoteLogTabletDir.getFileSystem().exists(remoteLogTabletDir)).isFalse();
+        }
+    }
+
     @ParameterizedTest
     @ValueSource(booleans = {true, false})
     void testLookupOffsetForTimestamp(boolean partitionTable) throws Exception 
{
@@ -540,4 +616,12 @@ class RemoteLogManagerTest extends RemoteLogTestBase {
             return new TableBucket(tableId, 0);
         }
     }
+
+    private static Stream<Arguments> stopArgs() {
+        return Stream.of(
+                Arguments.of(false, false),
+                Arguments.of(false, true),
+                Arguments.of(true, false),
+                Arguments.of(true, true));
+    }
 }
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaManagerTest.java
 
b/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaManagerTest.java
index 2fd268d5e..bd63f3cbf 100644
--- 
a/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaManagerTest.java
+++ 
b/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaManagerTest.java
@@ -1523,7 +1523,7 @@ class ReplicaManagerTest extends ReplicaTestBase {
         replicaManager.stopReplicas(
                 INITIAL_COORDINATOR_EPOCH,
                 Collections.singletonList(
-                        new StopReplicaData(tb, true, 
INITIAL_COORDINATOR_EPOCH, 1)),
+                        new StopReplicaData(tb, true, true, 
INITIAL_COORDINATOR_EPOCH, 1)),
                 future1::complete);
         assertThat(future1.get()).containsOnly(new 
StopReplicaResultForBucket(tb));
         ReplicaManager.HostedReplica hostedReplica = 
replicaManager.getReplica(tb);
@@ -1554,7 +1554,7 @@ class ReplicaManagerTest extends ReplicaTestBase {
         replicaManager.stopReplicas(
                 INITIAL_COORDINATOR_EPOCH,
                 Collections.singletonList(
-                        new StopReplicaData(tb, true, 
INITIAL_COORDINATOR_EPOCH, 1)),
+                        new StopReplicaData(tb, true, true, 
INITIAL_COORDINATOR_EPOCH, 1)),
                 future1::complete);
         assertThat(future1.get())
                 .containsOnly(
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/testutils/FlussClusterExtension.java
 
b/fluss-server/src/test/java/org/apache/fluss/server/testutils/FlussClusterExtension.java
index 2f2ee3735..8eaf29a0b 100644
--- 
a/fluss-server/src/test/java/org/apache/fluss/server/testutils/FlussClusterExtension.java
+++ 
b/fluss-server/src/test/java/org/apache/fluss/server/testutils/FlussClusterExtension.java
@@ -832,7 +832,7 @@ public final class FlussClusterExtension
                                 .addAllStopReplicasReqs(
                                         Collections.singleton(
                                                 makeStopBucketReplica(
-                                                        tableBucket, false, 
leaderEpoch))))
+                                                        tableBucket, false, 
false, leaderEpoch))))
                 .get();
     }
 

Reply via email to