This is an automated email from the ASF dual-hosted git repository.
yunhong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new bdb44c710 [server] Fix rebalance may cause remote file deleted error
(#2570)
bdb44c710 is described below
commit bdb44c710ed4fc6d78f97c79e0f385507a69d025
Author: yunhong <[email protected]>
AuthorDate: Thu Feb 5 09:50:20 2026 +0800
[server] Fix rebalance may cause remote file deleted error (#2570)
* [server] Fix rebalance may cause remote file deleted error
* address jark's comments
* improve
* change to set offline immediately after set to migration started
---------
Co-authored-by: Jark Wu <[email protected]>
---
fluss-rpc/src/main/proto/FlussApi.proto | 6 +
.../coordinator/CoordinatorEventProcessor.java | 88 ++++-----
.../coordinator/CoordinatorRequestBatch.java | 15 +-
.../server/coordinator/CoordinatorServer.java | 7 +-
.../coordinator/statemachine/ReplicaState.java | 10 +-
.../statemachine/ReplicaStateMachine.java | 22 ++-
.../fluss/server/entity/StopReplicaData.java | 20 +-
.../fluss/server/log/remote/RemoteLogManager.java | 9 +-
.../fluss/server/replica/ReplicaManager.java | 27 ++-
.../fluss/server/utils/ServerRpcMessageUtils.java | 22 ++-
.../rebalance/RebalanceManagerITCase.java | 215 ++++++++++++++++++++-
.../server/log/remote/RemoteLogManagerTest.java | 86 ++++++++-
.../fluss/server/replica/ReplicaManagerTest.java | 4 +-
.../server/testutils/FlussClusterExtension.java | 2 +-
14 files changed, 451 insertions(+), 82 deletions(-)
diff --git a/fluss-rpc/src/main/proto/FlussApi.proto
b/fluss-rpc/src/main/proto/FlussApi.proto
index b85487a50..ea2a832ba 100644
--- a/fluss-rpc/src/main/proto/FlussApi.proto
+++ b/fluss-rpc/src/main/proto/FlussApi.proto
@@ -899,7 +899,13 @@ message PbNotifyLeaderAndIsrRespForBucket {
message PbStopReplicaReqForBucket {
required PbTableBucket table_bucket = 1;
required int32 leader_epoch = 2;
+ // delete means remove local replica data (i.e., data stored on the current
node).
required bool delete = 3;
+ // deleteRemote means remove remote replica data (e.g., data in object
storage) and was introduced in v0.9.
+ // For backward compatibility, if a request does not include the
deleteRemote flag, the system treats delete as
+ // deleteRemote (i.e., it falls back to remote deletion). This ensures older
CoordinatorServer continues to function
+ // correctly with newer TabletServers.
+ optional bool deleteRemote = 4;
}
message PbStopReplicaRespForBucket {
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java
index aa0afbbcd..813c1f10e 100644
---
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java
+++
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java
@@ -148,6 +148,7 @@ import static
org.apache.fluss.server.coordinator.statemachine.ReplicaState.Offl
import static
org.apache.fluss.server.coordinator.statemachine.ReplicaState.OnlineReplica;
import static
org.apache.fluss.server.coordinator.statemachine.ReplicaState.ReplicaDeletionStarted;
import static
org.apache.fluss.server.coordinator.statemachine.ReplicaState.ReplicaDeletionSuccessful;
+import static
org.apache.fluss.server.coordinator.statemachine.ReplicaState.ReplicaMigrationStarted;
import static
org.apache.fluss.server.utils.ServerRpcMessageUtils.makeAdjustIsrResponse;
import static
org.apache.fluss.server.utils.ServerRpcMessageUtils.makeListRebalanceProgressResponse;
import static
org.apache.fluss.server.utils.ServerRpcMessageUtils.makeRebalanceResponse;
@@ -910,31 +911,14 @@ public class CoordinatorEventProcessor implements
EventProcessor {
coordinatorContext.retryDeleteAndSuccessDeleteReplicas(failDeletedReplicas);
// transmit to deletion started for retry delete replicas
- Set<TableBucketReplica> needRetryDeleteReplicas = new HashSet<>();
- retryDeleteAndSuccessDeleteReplicas.f0.forEach(
- (replica) -> {
- // For rebalance case. the replica state already set to
null in method
- // stopRemovedReplicasOfReassignedBucket. so we need not
reset it again.
- if (coordinatorContext.getReplicaState(replica) != null) {
- needRetryDeleteReplicas.add(replica);
- }
- });
- replicaStateMachine.handleStateChanges(needRetryDeleteReplicas,
ReplicaDeletionStarted);
+ replicaStateMachine.handleStateChanges(
+ retryDeleteAndSuccessDeleteReplicas.f0,
ReplicaDeletionStarted);
// add all the replicas that considered as success delete to success
deleted replicas
successDeletedReplicas.addAll(retryDeleteAndSuccessDeleteReplicas.f1);
-
- Set<TableBucketReplica> newSuccessDeleteReplicas = new HashSet<>();
- successDeletedReplicas.forEach(
- (replica) -> {
- // For rebalance case. the replica state already set to
null in method
- // stopRemovedReplicasOfReassignedBucket. so we need not
reset it again.
- if (coordinatorContext.getReplicaState(replica) != null) {
- newSuccessDeleteReplicas.add(replica);
- }
- });
// transmit to deletion successful for success deleted replicas
- replicaStateMachine.handleStateChanges(newSuccessDeleteReplicas,
ReplicaDeletionSuccessful);
+ replicaStateMachine.handleStateChanges(successDeletedReplicas,
ReplicaDeletionSuccessful);
+
// if any success deletion, we can resume
if (!successDeletedReplicas.isEmpty()) {
tableManager.resumeDeletions();
@@ -1354,8 +1338,7 @@ public class CoordinatorEventProcessor implements
EventProcessor {
if (planForBucket != null) {
ReplicaReassignment reassignment =
ReplicaReassignment.build(
- coordinatorContext.getAssignment(tableBucket),
- planForBucket.getNewReplicas());
+ planForBucket.getOriginReplicas(),
planForBucket.getNewReplicas());
try {
if (planForBucket.isLeaderChanged() &&
!reassignment.isBeingReassigned()) {
LeaderAndIsr leaderAndIsr =
zooKeeperClient.getLeaderAndIsr(tableBucket).get();
@@ -1413,20 +1396,20 @@ public class CoordinatorEventProcessor implements
EventProcessor {
*
* <ul>
* <li>B1. Move all replicas in AR to OnlineReplica state.
- * <li>B2. Set RS = TRS, AR = [], RR = [] in memory.
- * <li>B3. Send a LeaderAndIsr request with RS = TRS. This will prevent
the leader from adding
- * any replica in TRS - ORS back in the isr. If the current leader
is not in TRS or isn't
- * alive, we move the leader to a new replica in TRS. We may send
the LeaderAndIsr to more
- * than the TRS replicas due to the way the partition state machine
works (it reads
- * replicas from ZK)
- * <li>B4. Move all replicas in RR to OfflineReplica state. As part of
OfflineReplica state
+ * <li>B2. Send a LeaderAndIsr request with RS = ORS + TRS. This will make
the origin leader
+ * change to the new leader. This request will be sent to every
tabletServer in ORS + TRS.
+ * <li>B3. Set RS = TRS, AR = [], RR = [] in memory.
+ * <li>B4. Re-send LeaderAndIsr request with new leader and a new RS (using
TRS) and same isr to
+ * every tabletServer in TRS.
+ * <li>B5. Move all replicas in RR to OfflineReplica state. As part of
OfflineReplica state
* change, we shrink the isr to remove RR in ZooKeeper and send a
LeaderAndIsr ONLY to the
* Leader to notify it of the shrunk isr. After that, we send a
StopReplica (delete =
- * false) to the replicas in RR.
- * <li>B5. Move all replicas in RR to NonExistentReplica state. This
will send a StopReplica
- * (delete = true) to he replicas in RR to physically delete the
replicas on disk.
- * <li>B6. Update ZK with RS=TRS, AR=[], RR=[].
- * <li>B7. After electing leader, the replicas and isr information
changes. So resend the
+ * false and deleteRemote = false) to the replicas in RR.
+ * <li>B6. Move all replicas in RR to ReplicaMigrationStarted state.
This will send a
+ * StopReplica (delete = true and deleteRemote = false) to the
replicas in RR to physically
+ * delete the replicas on disk but don't delete the data in remote
storage.
+ * <li>B7. Update ZK with RS=TRS, AR=[], RR=[].
+ * <li>B8. After electing leader, the replicas and isr information
changes. So resend the
* update metadata request to every tabletServer.
* <li>B8. Mark the ongoing rebalance task to finish.
* </ul>
@@ -1491,17 +1474,22 @@ public class CoordinatorEventProcessor implements
EventProcessor {
new
TableBucketReplica(tableBucket, replica)),
OnlineReplica));
List<Integer> targetReplicas = reassignment.getTargetReplicas();
- // B2. Set RS = TRS, AR = [], RR = [] in memory.
- coordinatorContext.updateBucketReplicaAssignment(tableBucket,
targetReplicas);
- // B3. Send LeaderAndIsr request with a potential new leader (if
current leader not in
- // TRS) and a new RS (using TRS) and same isr to every
tabletServer in ORS + TRS or TRS
+ // B2. Send LeaderAndIsr request with a potential new leader (if
current leader not in
+ // TRS) and a new RS (using ORS + TRS) and same isr to every
tabletServer in ORS + TRS
maybeReassignedBucketLeaderIfRequired(tableBucket, targetReplicas);
- // B4. replicas in RR -> Offline (force those replicas out of isr)
- // B5. replicas in RR -> NonExistentReplica (force those replicas
to be deleted)
+ // B3. Set RS = TRS, AR = [], RR = [] in memory.
+ coordinatorContext.updateBucketReplicaAssignment(tableBucket,
targetReplicas);
+ // B4. Re-send LeaderAndIsr request with new leader and a new RS
(using TRS) and same
+ // isr to every tabletServer in TRS.
+ updateBucketEpochAndSendRequest(tableBucket, targetReplicas);
+
+ // B5. replicas in RR -> Offline (force those replicas out of isr)
+ // B6. replicas in RR -> ReplicaMigrationStarted (force those
replicas to be migration
+ // started)
stopRemovedReplicasOfReassignedBucket(tableBucket,
removingReplicas);
- // B6. Update ZK with RS = TRS, AR = [], RR = [].
+ // B7. Update ZK with RS = TRS, AR = [], RR = [].
updateReplicaAssignmentForBucket(tableBucket, targetReplicas);
- // B7. After electing a leader in B3, the replicas and isr
information changes, so
+ // B8. After electing a leader in B3, the replicas and isr
information changes, so
// resend the update metadata request to every tabletServer.
updateTabletServerMetadataCache(
new
HashSet<>(coordinatorContext.getLiveTabletServers().values()),
@@ -1557,16 +1545,12 @@ public class CoordinatorEventProcessor implements
EventProcessor {
private void stopRemovedReplicasOfReassignedBucket(
TableBucket tableBucket, List<Integer> removingReplicas) {
- Set<TableBucketReplica> replicasToBeDeleted = new HashSet<>();
+ Set<TableBucketReplica> replicasToBeMoved = new HashSet<>();
removingReplicas.forEach(
- replica -> replicasToBeDeleted.add(new
TableBucketReplica(tableBucket, replica)));
- replicaStateMachine.handleStateChanges(replicasToBeDeleted,
OfflineReplica);
- // send stop replica command to the old replicas.
- replicaStateMachine.handleStateChanges(replicasToBeDeleted,
ReplicaDeletionStarted);
- // TODO: Eventually bucket reassignment could use a callback that does
retries if deletion
- // failed
- replicaStateMachine.handleStateChanges(replicasToBeDeleted,
ReplicaDeletionSuccessful);
- replicaStateMachine.handleStateChanges(replicasToBeDeleted,
NonExistentReplica);
+ replica -> replicasToBeMoved.add(new
TableBucketReplica(tableBucket, replica)));
+ replicaStateMachine.handleStateChanges(replicasToBeMoved,
OfflineReplica);
+ replicaStateMachine.handleStateChanges(replicasToBeMoved,
ReplicaMigrationStarted);
+ replicaStateMachine.handleStateChanges(replicasToBeMoved,
NonExistentReplica);
}
private void updateReplicaAssignmentForBucket(
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorRequestBatch.java
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorRequestBatch.java
index 436cc5203..fe85ff391 100644
---
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorRequestBatch.java
+++
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorRequestBatch.java
@@ -240,7 +240,8 @@ public class CoordinatorRequestBatch {
public void addStopReplicaRequestForTabletServers(
Set<Integer> tabletServers,
TableBucket tableBucket,
- boolean isDelete,
+ boolean deleteLocal,
+ boolean deleteRemote,
int leaderEpoch) {
tabletServers.stream()
.filter(s -> s >= 0)
@@ -250,12 +251,18 @@ public class CoordinatorRequestBatch {
stopReplicaRequestMap.computeIfAbsent(id,
k -> new HashMap<>());
// reduce delete flag, if it has been marked as
deleted,
// we will set it as delete replica
- boolean alreadyDelete =
+ boolean alreadyDeleteLocal =
stopBucketReplica.get(tableBucket) != null
&&
stopBucketReplica.get(tableBucket).isDelete();
+ boolean alreadyDeleteRemote =
+ stopBucketReplica.get(tableBucket) != null
+ &&
stopBucketReplica.get(tableBucket).isDeleteRemote();
PbStopReplicaReqForBucket
protoStopReplicaForBucket =
makeStopBucketReplica(
- tableBucket, alreadyDelete ||
isDelete, leaderEpoch);
+ tableBucket,
+ alreadyDeleteLocal || deleteLocal,
+ alreadyDeleteRemote ||
deleteRemote,
+ leaderEpoch);
stopBucketReplica.put(tableBucket,
protoStopReplicaForBucket);
});
}
@@ -447,7 +454,7 @@ public class CoordinatorRequestBatch {
// we collect the buckets whose replica is to be deleted
Set<TableBucket> deletedReplicaBuckets =
stopReplicas.values().stream()
- .filter(PbStopReplicaReqForBucket::isDelete)
+ .filter(pbBucket -> pbBucket.isDelete() &&
pbBucket.isDeleteRemote())
.map(t -> toTableBucket(t.getTableBucket()))
.collect(Collectors.toSet());
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorServer.java
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorServer.java
index 76a3fbd68..7e06f6109 100644
---
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorServer.java
+++
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorServer.java
@@ -333,7 +333,12 @@ public class CoordinatorServer extends ServerBase {
}
}
- private CoordinatorEventProcessor getCoordinatorEventProcessor() {
+ /**
+ * Get the coordinator event processor. Don't call this method directly as
the coordinator event
+ * processor is single threaded model.
+ */
+ @VisibleForTesting
+ public CoordinatorEventProcessor getCoordinatorEventProcessor() {
if (coordinatorEventProcessor != null) {
return coordinatorEventProcessor;
} else {
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/statemachine/ReplicaState.java
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/statemachine/ReplicaState.java
index be33ce9b6..5ed5732cf 100644
---
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/statemachine/ReplicaState.java
+++
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/statemachine/ReplicaState.java
@@ -27,7 +27,7 @@ public enum ReplicaState implements BaseState<ReplicaState> {
NonExistentReplica {
@Override
public Set<ReplicaState> getValidPreviousStates() {
- return EnumSet.of(ReplicaDeletionSuccessful);
+ return EnumSet.of(ReplicaMigrationStarted,
ReplicaDeletionSuccessful);
}
},
NewReplica {
@@ -48,6 +48,14 @@ public enum ReplicaState implements BaseState<ReplicaState> {
return EnumSet.of(NewReplica, OnlineReplica, OfflineReplica);
}
},
+ ReplicaMigrationStarted {
+ @Override
+ public Set<ReplicaState> getValidPreviousStates() {
+ // ReplicaMigrationStarted is a valid previous state since
+ // we will retry migrating the replica when the migration fails.
+ return EnumSet.of(OfflineReplica, ReplicaMigrationStarted);
+ }
+ },
ReplicaDeletionStarted {
@Override
public Set<ReplicaState> getValidPreviousStates() {
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/statemachine/ReplicaStateMachine.java
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/statemachine/ReplicaStateMachine.java
index fef67b083..091c2cbc9 100644
---
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/statemachine/ReplicaStateMachine.java
+++
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/statemachine/ReplicaStateMachine.java
@@ -259,6 +259,7 @@ public class ReplicaStateMachine {
Collections.singleton(replica.getReplica()),
replica.getTableBucket(),
false,
+ false,
coordinatorContext.getBucketLeaderEpoch(
replica.getTableBucket())));
@@ -307,10 +308,28 @@ public class ReplicaStateMachine {
replica -> doStateChange(replica,
ReplicaState.OfflineReplica));
break;
+ case ReplicaMigrationStarted:
+ validReplicas.forEach(
+ replica -> doStateChange(replica,
ReplicaState.ReplicaMigrationStarted));
+ validReplicas.forEach(
+ tableBucketReplica -> {
+ int replicaServer =
tableBucketReplica.getReplica();
+ // send stop replica request with deleteLocal =
true and deleteRemote =
+ // false to indicate that the replica is migrated.
+
coordinatorRequestBatch.addStopReplicaRequestForTabletServers(
+ Collections.singleton(replicaServer),
+ tableBucketReplica.getTableBucket(),
+ true,
+ false,
+ coordinatorContext.getBucketLeaderEpoch(
+
tableBucketReplica.getTableBucket()));
+ });
+ break;
case ReplicaDeletionStarted:
validReplicas.forEach(
replica -> doStateChange(replica,
ReplicaState.ReplicaDeletionStarted));
- // send stop replica request with delete = true
+ // send stop replica request with deleteLocal = true and
deleteRemote = true to indicate that the
+ // replica is deleted.
validReplicas.forEach(
tableBucketReplica -> {
int replicaServer =
tableBucketReplica.getReplica();
@@ -318,6 +337,7 @@ public class ReplicaStateMachine {
Collections.singleton(replicaServer),
tableBucketReplica.getTableBucket(),
true,
+ true,
coordinatorContext.getBucketLeaderEpoch(
tableBucketReplica.getTableBucket()));
});
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/entity/StopReplicaData.java
b/fluss-server/src/main/java/org/apache/fluss/server/entity/StopReplicaData.java
index 4db00ecc2..84381f9c0 100644
---
a/fluss-server/src/main/java/org/apache/fluss/server/entity/StopReplicaData.java
+++
b/fluss-server/src/main/java/org/apache/fluss/server/entity/StopReplicaData.java
@@ -23,14 +23,20 @@ import org.apache.fluss.rpc.messages.StopReplicaRequest;
/** The data to stop replica for request {@link StopReplicaRequest}. */
public class StopReplicaData {
private final TableBucket tableBucket;
- private final boolean delete;
+ private final boolean deleteLocal;
+ private final boolean deleteRemote;
private final int coordinatorEpoch;
private final int leaderEpoch;
public StopReplicaData(
- TableBucket tableBucket, boolean delete, int coordinatorEpoch, int
leaderEpoch) {
+ TableBucket tableBucket,
+ boolean deleteLocal,
+ boolean deleteRemote,
+ int coordinatorEpoch,
+ int leaderEpoch) {
this.tableBucket = tableBucket;
- this.delete = delete;
+ this.deleteLocal = deleteLocal;
+ this.deleteRemote = deleteRemote;
this.coordinatorEpoch = coordinatorEpoch;
this.leaderEpoch = leaderEpoch;
}
@@ -39,8 +45,12 @@ public class StopReplicaData {
return tableBucket;
}
- public boolean isDelete() {
- return delete;
+ public boolean isDeleteLocal() {
+ return deleteLocal;
+ }
+
+ public boolean isDeleteRemote() {
+ return deleteRemote;
}
public int getCoordinatorEpoch() {
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/log/remote/RemoteLogManager.java
b/fluss-server/src/main/java/org/apache/fluss/server/log/remote/RemoteLogManager.java
index 9f3006747..c85d5ae65 100644
---
a/fluss-server/src/main/java/org/apache/fluss/server/log/remote/RemoteLogManager.java
+++
b/fluss-server/src/main/java/org/apache/fluss/server/log/remote/RemoteLogManager.java
@@ -38,6 +38,7 @@ import
org.apache.fluss.utils.concurrent.ExecutorThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
import java.io.Closeable;
@@ -305,7 +306,7 @@ public class RemoteLogManager implements Closeable {
}
@VisibleForTesting
- RemoteLogTablet remoteLogTablet(TableBucket tableBucket) {
+ public RemoteLogTablet remoteLogTablet(TableBucket tableBucket) {
RemoteLogTablet remoteLog = remoteLogs.get(tableBucket);
if (remoteLog == null) {
throw new IllegalStateException(
@@ -387,4 +388,10 @@ public class RemoteLogManager implements Closeable {
public RemoteLogIndexCache getRemoteLogIndexCache() {
return remoteLogIndexCache;
}
+
+ @VisibleForTesting
+ @Nullable
+ TaskWithFuture getTaskWithFuture(TableBucket tableBucket) {
+ return rlmTasks.get(tableBucket);
+ }
}
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java
b/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java
index dda8cef8a..d8ddb92c4 100644
---
a/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java
+++
b/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java
@@ -900,7 +900,8 @@ public class ReplicaManager {
result.add(
stopReplica(
tb,
- data.isDelete(),
+ data.isDeleteLocal(),
+ data.isDeleteRemote(),
deletedTableIds,
deletedPartitionIds));
} catch (Exception e) {
@@ -1711,10 +1712,23 @@ public class ReplicaManager {
}
}
- /** Stop the given replica. */
+ /**
+ * Stop the replica for the given table bucket.
+ *
+ * @param tb the table bucket
+ * @param deleteLocal whether to delete the local data, this will be true
not only when the table or
+ * partition of the table bucket is already deleted but also when the
replica is migrated to another
+ * server by rebalance
+ * @param deleteRemote whether to delete the remote data, like remote log
and kv snapshot, this
+ * means the table or partition of the table bucket already deleted
+ * @param deletedTableIds the table ids that are deleted
+ * @param deletedPartitionIds the partition ids that are deleted
+ * @return the result of stop replica
+ */
private StopReplicaResultForBucket stopReplica(
TableBucket tb,
- boolean delete,
+ boolean deleteLocal,
+ boolean deleteRemote,
Map<Long, Path> deletedTableIds,
Map<Long, Path> deletedPartitionIds) {
// First stop fetchers for this table bucket.
@@ -1723,7 +1737,7 @@ public class ReplicaManager {
HostedReplica replica = getReplica(tb);
if (replica instanceof OnlineReplica) {
Replica replicaToDelete = ((OnlineReplica) replica).getReplica();
- if (delete) {
+ if (deleteLocal) {
if (allReplicas.remove(tb) != null) {
serverMetricGroup.removeTableBucketMetricGroup(
replicaToDelete.getPhysicalTablePath().getTablePath(), tb);
@@ -1738,8 +1752,9 @@ public class ReplicaManager {
}
}
- remoteLogManager.stopReplica(replicaToDelete, delete &&
replicaToDelete.isLeader());
- if (delete && replicaToDelete.isLeader()) {
+ remoteLogManager.stopReplica(
+ replicaToDelete, deleteRemote &&
replicaToDelete.isLeader());
+ if (deleteRemote && replicaToDelete.isLeader()) {
kvManager.deleteRemoteKvSnapshot(
replicaToDelete.getPhysicalTablePath(),
replicaToDelete.getTableBucket());
}
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java
b/fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java
index 4e4e59908..a4bfa01c5 100644
---
a/fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java
+++
b/fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java
@@ -715,12 +715,23 @@ public class ServerRpcMessageUtils {
return notifyLeaderAndIsrResultForBuckets;
}
+ /**
+ * Make a stop bucket replica request.
+ *
+ * @param tableBucket table bucket
+ * @param deleteLocal delete local log or kv data
+ * @param deleteRemote delete remote log or kv snapshot because of table
or partition was
+ * deleted.
+ * @param leaderEpoch leader epoch
+ * @return stop bucket replica request
+ */
public static PbStopReplicaReqForBucket makeStopBucketReplica(
- TableBucket tableBucket, boolean isDelete, int leaderEpoch) {
+ TableBucket tableBucket, boolean deleteLocal, boolean
deleteRemote, int leaderEpoch) {
PbStopReplicaReqForBucket stopBucketReplicaRequest = new
PbStopReplicaReqForBucket();
PbTableBucket pbTableBucket =
stopBucketReplicaRequest
- .setDelete(isDelete)
+ .setDelete(deleteLocal)
+ .setDeleteRemote(deleteRemote)
.setLeaderEpoch(leaderEpoch)
.setTableBucket()
.setBucketId(tableBucket.getBucket())
@@ -735,10 +746,17 @@ public class ServerRpcMessageUtils {
List<StopReplicaData> stopReplicaDataList = new ArrayList<>();
for (PbStopReplicaReqForBucket reqForBucket :
request.getStopReplicasReqsList()) {
PbTableBucket tableBucket = reqForBucket.getTableBucket();
+ // For backward compatibility, if a request does not include the
deleteRemote flag, the
+ // system treats delete as deleteRemote (i.e., it falls back to
remote deletion). This
+ // ensures older CoordinatorServer continues to function correctly
with newer
+ // TabletServers.
stopReplicaDataList.add(
new StopReplicaData(
toTableBucket(tableBucket),
reqForBucket.isDelete(),
+ reqForBucket.hasDeleteRemote()
+ ? reqForBucket.isDeleteRemote()
+ : reqForBucket.isDelete(),
request.getCoordinatorEpoch(),
reqForBucket.getLeaderEpoch()));
}
diff --git
a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/rebalance/RebalanceManagerITCase.java
b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/rebalance/RebalanceManagerITCase.java
index 422cb267b..16d06492c 100644
---
a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/rebalance/RebalanceManagerITCase.java
+++
b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/rebalance/RebalanceManagerITCase.java
@@ -17,28 +17,56 @@
package org.apache.fluss.server.coordinator.rebalance;
+import org.apache.fluss.cluster.rebalance.RebalancePlanForBucket;
+import org.apache.fluss.cluster.rebalance.RebalanceStatus;
import org.apache.fluss.cluster.rebalance.ServerTag;
import org.apache.fluss.config.ConfigOptions;
import org.apache.fluss.config.Configuration;
+import org.apache.fluss.config.MemorySize;
import org.apache.fluss.metadata.PartitionSpec;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TableBucketReplica;
import org.apache.fluss.metadata.TableDescriptor;
import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.rpc.gateway.TabletServerGateway;
import org.apache.fluss.rpc.messages.AddServerTagRequest;
+import org.apache.fluss.server.coordinator.CoordinatorEventProcessor;
+import org.apache.fluss.server.coordinator.event.AccessContextEvent;
import org.apache.fluss.server.coordinator.rebalance.model.ClusterModel;
+import org.apache.fluss.server.coordinator.statemachine.ReplicaState;
+import org.apache.fluss.server.log.remote.RemoteLogManager;
+import org.apache.fluss.server.log.remote.RemoteLogManifest;
+import org.apache.fluss.server.log.remote.RemoteLogTablet;
+import org.apache.fluss.server.replica.ReplicaManager;
+import org.apache.fluss.server.tablet.TabletServer;
import org.apache.fluss.server.testutils.FlussClusterExtension;
import org.apache.fluss.server.zk.ZooKeeperClient;
+import org.apache.fluss.server.zk.data.BucketAssignment;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
+import java.time.Duration;
+import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import static org.apache.fluss.record.TestData.DATA1;
import static org.apache.fluss.record.TestData.DATA1_SCHEMA;
import static org.apache.fluss.record.TestData.DATA1_TABLE_DESCRIPTOR;
import static org.apache.fluss.record.TestData.DATA1_TABLE_PATH;
+import static
org.apache.fluss.server.coordinator.statemachine.ReplicaState.OnlineReplica;
+import static
org.apache.fluss.server.testutils.RpcMessageTestUtils.assertProduceLogResponse;
import static
org.apache.fluss.server.testutils.RpcMessageTestUtils.createPartition;
import static
org.apache.fluss.server.testutils.RpcMessageTestUtils.createTable;
+import static
org.apache.fluss.server.testutils.RpcMessageTestUtils.newProduceLogRequest;
+import static
org.apache.fluss.testutils.DataTestUtils.genMemoryLogRecordsByObject;
+import static org.apache.fluss.testutils.common.CommonTestUtils.retry;
import static org.assertj.core.api.Assertions.assertThat;
/** IT test for {@link RebalanceManager}. */
@@ -47,7 +75,7 @@ public class RebalanceManagerITCase {
@RegisterExtension
public static final FlussClusterExtension FLUSS_CLUSTER_EXTENSION =
FlussClusterExtension.builder()
- .setNumOfTabletServers(3)
+ .setNumOfTabletServers(6)
.setClusterConf(initConfig())
.build();
@@ -84,8 +112,8 @@ public class RebalanceManagerITCase {
false);
ClusterModel clusterModel = rebalanceManager.buildClusterModel();
- assertThat(clusterModel.servers().size()).isEqualTo(3);
- assertThat(clusterModel.aliveServers().size()).isEqualTo(3);
+ assertThat(clusterModel.servers().size()).isEqualTo(6);
+ assertThat(clusterModel.aliveServers().size()).isEqualTo(6);
assertThat(clusterModel.offlineServers().size()).isEqualTo(0);
assertThat(clusterModel.tables().size()).isEqualTo(2);
assertThat(clusterModel.tables()).contains(tableId1, tableId2);
@@ -97,16 +125,193 @@ public class RebalanceManagerITCase {
FLUSS_CLUSTER_EXTENSION.newCoordinatorClient().addServerTag(request).get();
clusterModel = rebalanceManager.buildClusterModel();
- assertThat(clusterModel.servers().size()).isEqualTo(3);
- assertThat(clusterModel.aliveServers().size()).isEqualTo(2);
+ assertThat(clusterModel.servers().size()).isEqualTo(6);
+ assertThat(clusterModel.aliveServers().size()).isEqualTo(5);
assertThat(clusterModel.offlineServers().size()).isEqualTo(1);
assertThat(clusterModel.tables().size()).isEqualTo(2);
assertThat(clusterModel.tables()).contains(tableId1, tableId2);
}
    /**
     * Verifies that rebalancing a bucket to a completely new replica set does not delete its
     * remote log segments: after the plan [0,1,2] -> [3,4,5] finishes, the new leader must still
     * see the same remote log manifest that existed before the rebalance.
     */
    @Test
    void testRebalanceWithRemoteLog() throws Exception {
        TableBucket tb = setupTableBucket();
        long tableId = tb.getTableId();

        int leaderId = FLUSS_CLUSTER_EXTENSION.waitAndGetLeader(tb);
        TabletServerGateway leaderGateway =
                FLUSS_CLUSTER_EXTENSION.newTabletServerClientForNode(leaderId);

        // produce test records until some log segments have been tiered to remote storage
        produceRecordsAndWaitRemoteLogCopy(leaderGateway, tb, 0L);
        // test metadata updated: verify manifest in metadata
        TabletServer tabletServer = FLUSS_CLUSTER_EXTENSION.getTabletServerById(leaderId);
        RemoteLogManager remoteLogManager = tabletServer.getReplicaManager().getRemoteLogManager();
        RemoteLogTablet remoteLogTablet = remoteLogManager.remoteLogTablet(tb);

        RemoteLogManifest manifest = remoteLogTablet.currentManifest();
        assertThat(manifest.getPhysicalTablePath().getTablePath()).isEqualTo(DATA1_TABLE_PATH);
        assertThat(manifest.getTableBucket()).isEqualTo(tb);
        // remember the pre-rebalance segment count to compare against after the move
        int remoteLogSize = manifest.getRemoteLogSegmentList().size();
        assertThat(remoteLogSize).isGreaterThan(0);

        // try to trigger rebalance. for example, if current assignment is [0, 1, 2], try to
        // trigger rebalance to [3, 4, 5]
        RebalancePlanForBucket planForBucket = buildRebalancePlanForBucket(tb);

        List<Integer> originReplicas = planForBucket.getOriginReplicas();
        List<Integer> newReplicas = planForBucket.getNewReplicas();

        List<ReplicaState> originReplicaStates = new ArrayList<>();
        List<ReplicaState> newReplicaStates = new ArrayList<>();
        fromCoordinatorContext(
                tb, originReplicas, newReplicas, originReplicaStates, newReplicaStates);

        // verify pre-rebalance states: origin replicas online, target replicas not yet known
        assertThat(originReplicaStates).allMatch(replicaState -> replicaState == OnlineReplica);
        assertThat(newReplicaStates).allMatch(Objects::isNull);

        rebalanceManager.registerRebalance(
                "test-rebalance-dsds",
                Collections.singletonMap(tb, planForBucket),
                RebalanceStatus.NOT_STARTED);

        retry(
                Duration.ofMinutes(2),
                () -> {
                    // assignment changed to exactly the planned new replica set.
                    Set<Integer> newReplicaSet = new HashSet<>(planForBucket.getNewReplicas());
                    BucketAssignment bucketAssignment =
                            zkClient.getTableAssignment(tableId)
                                    .get()
                                    .getBucketAssignment(tb.getBucket());
                    List<Integer> replicas = bucketAssignment.getReplicas();
                    assertThat(newReplicaSet.size()).isEqualTo(replicas.size());
                    assertThat(newReplicaSet.containsAll(replicas)).isTrue();

                    // leader changed to the planned new leader.
                    int newLeader = planForBucket.getNewLeader();
                    assertThat(FLUSS_CLUSTER_EXTENSION.waitAndGetLeader(tb)).isEqualTo(newLeader);

                    // origin replicas all removed from their servers (hosted replica is None).
                    for (int originReplica : originReplicas) {
                        TabletServer ts =
                                FLUSS_CLUSTER_EXTENSION.getTabletServerById(originReplica);
                        ReplicaManager rm = ts.getReplicaManager();
                        assertThat(rm.getReplica(tb))
                                .isInstanceOf(ReplicaManager.NoneReplica.class);
                    }

                    // replica states in coordinator context flipped: origin gone, new online.
                    originReplicaStates.clear();
                    newReplicaStates.clear();
                    fromCoordinatorContext(
                            tb, originReplicas, newReplicas,
                            originReplicaStates, newReplicaStates);
                    assertThat(originReplicaStates).allMatch(Objects::isNull);
                    assertThat(newReplicaStates)
                            .allMatch(replicaState -> replicaState == OnlineReplica);
                });
        // remote log must NOT have been deleted by the rebalance: the new leader still serves
        // the same manifest with the same number of remote segments.
        int newLeader = planForBucket.getNewLeader();
        TabletServer leaderTs = FLUSS_CLUSTER_EXTENSION.getTabletServerById(newLeader);
        RemoteLogManager leaderRm = leaderTs.getReplicaManager().getRemoteLogManager();
        RemoteLogTablet leaderRlt = leaderRm.remoteLogTablet(tb);

        RemoteLogManifest newManifest = leaderRlt.currentManifest();
        assertThat(newManifest.getPhysicalTablePath().getTablePath()).isEqualTo(DATA1_TABLE_PATH);
        assertThat(newManifest.getTableBucket()).isEqualTo(tb);
        assertThat(newManifest.getRemoteLogSegmentList().size()).isEqualTo(remoteLogSize);
    }
+
+ private void fromCoordinatorContext(
+ TableBucket tb,
+ List<Integer> originReplicas,
+ List<Integer> newReplicas,
+ List<ReplicaState> originalReplicaStates,
+ List<ReplicaState> newReplicaStates)
+ throws Exception {
+ AccessContextEvent<Void> event =
+ new AccessContextEvent<>(
+ ctx -> {
+ originReplicas.forEach(
+ replica -> {
+ originalReplicaStates.add(
+ ctx.getReplicaState(
+ new
TableBucketReplica(tb, replica)));
+ });
+ newReplicas.forEach(
+ replica -> {
+ newReplicaStates.add(
+ ctx.getReplicaState(
+ new
TableBucketReplica(tb, replica)));
+ });
+ return null;
+ });
+ CoordinatorEventProcessor processor =
+
FLUSS_CLUSTER_EXTENSION.getCoordinatorServer().getCoordinatorEventProcessor();
+ processor.getCoordinatorEventManager().put(event);
+ event.getResultFuture().get(30, TimeUnit.SECONDS);
+ }
+
+ private TableBucket setupTableBucket() throws Exception {
+ long tableId =
+ createTable(FLUSS_CLUSTER_EXTENSION, DATA1_TABLE_PATH,
DATA1_TABLE_DESCRIPTOR);
+ TableBucket tb = new TableBucket(tableId, 0);
+ FLUSS_CLUSTER_EXTENSION.waitUntilAllReplicaReady(tb);
+ return tb;
+ }
+
+ private RebalancePlanForBucket buildRebalancePlanForBucket(TableBucket tb)
throws Exception {
+ long tableId = tb.getTableId();
+ BucketAssignment bucketAssignment =
+
zkClient.getTableAssignment(tableId).get().getBucketAssignment(tb.getBucket());
+ List<Integer> replicas = bucketAssignment.getReplicas();
+ Set<Integer> replicasSet = new
HashSet<>(bucketAssignment.getReplicas());
+ int originLeader = zkClient.getLeaderAndIsr(tb).get().leader();
+ List<Integer> newReplicas = new ArrayList<>();
+ int newLeader = originLeader;
+ for (int i = 0; i < 6; i++) {
+ if (!replicasSet.contains(i)) {
+ newReplicas.add(i);
+ if (i != originLeader && newLeader == originLeader) {
+ newLeader = i;
+ }
+ }
+ }
+
+ return new RebalancePlanForBucket(tb, originLeader, newLeader,
replicas, newReplicas);
+ }
+
private static Configuration initConfig() {
Configuration conf = new Configuration();
+ conf.setInt(ConfigOptions.DEFAULT_REPLICATION_FACTOR, 1);
conf.setInt(ConfigOptions.DEFAULT_REPLICATION_FACTOR, 3);
+
+ // set a shorter interval for testing purpose
+ conf.set(ConfigOptions.REMOTE_LOG_TASK_INTERVAL_DURATION,
Duration.ofSeconds(1));
+ conf.set(ConfigOptions.LOG_SEGMENT_FILE_SIZE, MemorySize.parse("1kb"));
+
+ // set a shorter max log time to allow replica shrink from isr. Don't
be too low, otherwise
+ // normal follower synchronization will also be affected
+ conf.set(ConfigOptions.LOG_REPLICA_MAX_LAG_TIME,
Duration.ofSeconds(5));
return conf;
}
+
+ private void produceRecordsAndWaitRemoteLogCopy(
+ TabletServerGateway leaderGateway, TableBucket tb, long
baseOffset) throws Exception {
+ for (int i = 0; i < 10; i++) {
+ assertProduceLogResponse(
+ leaderGateway
+ .produceLog(
+ newProduceLogRequest(
+ tb.getTableId(),
+ 0,
+ 1,
+
genMemoryLogRecordsByObject(DATA1)))
+ .get(),
+ 0,
+ baseOffset + i * 10L);
+ }
+ FLUSS_CLUSTER_EXTENSION.waitUntilSomeLogSegmentsCopyToRemote(
+ new TableBucket(tb.getTableId(), 0));
+ }
}
diff --git
a/fluss-server/src/test/java/org/apache/fluss/server/log/remote/RemoteLogManagerTest.java
b/fluss-server/src/test/java/org/apache/fluss/server/log/remote/RemoteLogManagerTest.java
index e2e5be650..4d06411bc 100644
---
a/fluss-server/src/test/java/org/apache/fluss/server/log/remote/RemoteLogManagerTest.java
+++
b/fluss-server/src/test/java/org/apache/fluss/server/log/remote/RemoteLogManagerTest.java
@@ -26,12 +26,17 @@ import org.apache.fluss.rpc.entity.FetchLogResultForBucket;
import org.apache.fluss.rpc.protocol.ApiError;
import org.apache.fluss.server.coordinator.TestCoordinatorGateway;
import org.apache.fluss.server.entity.FetchReqInfo;
+import org.apache.fluss.server.entity.StopReplicaData;
+import org.apache.fluss.server.entity.StopReplicaResultForBucket;
import org.apache.fluss.server.log.FetchParams;
import org.apache.fluss.server.log.LogTablet;
import org.apache.fluss.server.replica.Replica;
+import org.apache.fluss.server.replica.ReplicaManager;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;
import java.time.Duration;
@@ -40,8 +45,10 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
+import java.util.stream.Stream;
import static org.apache.fluss.record.TestData.DATA1_PHYSICAL_TABLE_PATH;
+import static
org.apache.fluss.record.TestData.DATA1_PHYSICAL_TABLE_PATH_PA_2024;
import static org.apache.fluss.record.TestData.DATA1_SCHEMA;
import static org.apache.fluss.record.TestData.DATA1_TABLE_ID;
import static org.apache.fluss.record.TestData.DATA1_TABLE_PATH;
@@ -497,10 +504,79 @@ class RemoteLogManagerTest extends RemoteLogTestBase {
assertThatThrownBy(() ->
remoteLogManager.relevantRemoteLogSegments(tb, 0L))
.isInstanceOf(IllegalStateException.class)
.hasMessageContaining("RemoteLogTablet can't be found for
table-bucket " + tb);
- FsPath logTabletDir = remoteLogTabletDir(remoteLogDir(conf),
DATA1_PHYSICAL_TABLE_PATH, tb);
+ FsPath logTabletDir =
+ remoteLogTabletDir(
+ remoteLogDir(conf),
+ partitionTable
+ ? DATA1_PHYSICAL_TABLE_PATH_PA_2024
+ : DATA1_PHYSICAL_TABLE_PATH,
+ tb);
assertThat(logTabletDir.getFileSystem().exists(logTabletDir)).isFalse();
}
    /**
     * Verifies the new {@code deleteRemote} flag on stop-replica: local data deletion and remote
     * log deletion are controlled independently, so a rebalance (delete local only) never removes
     * remote segments, while a real table drop ({@code deleteRemote = true}) does.
     */
    @ParameterizedTest
    @MethodSource("stopArgs")
    void testStopReplicaDeleteRemoteLog(boolean partitionTable, boolean deleteRemote)
            throws Exception {
        TableBucket tb = makeTableBucket(partitionTable);
        // Need to make leader by ReplicaManager.
        makeLogTableAsLeader(tb, partitionTable);
        LogTablet logTablet = replicaManager.getReplicaOrException(tb).getLogTablet();
        addMultiSegmentsToLogTablet(logTablet, 5);
        // trigger RLMTask copy local log segment to remote and update metadata.
        remoteLogTaskScheduler.triggerPeriodicScheduledTasks();
        List<RemoteLogSegment> remoteLogSegmentList =
                remoteLogManager.relevantRemoteLogSegments(tb, 0L);
        // 5 segments were added; the active (last) segment is never tiered, so 4 are remote
        assertThat(remoteLogSegmentList.size()).isEqualTo(4);
        assertThat(listRemoteLogFiles(tb))
                .isEqualTo(
                        remoteLogSegmentList.stream()
                                .map(s -> s.remoteLogSegmentId().toString())
                                .collect(Collectors.toSet()));
        assertThat(remoteLogManager.getTaskWithFuture(tb)).isNotNull();

        FsPath remoteLogTabletDir =
                remoteLogTabletDir(
                        remoteLogDir(conf),
                        partitionTable
                                ? DATA1_PHYSICAL_TABLE_PATH_PA_2024
                                : DATA1_PHYSICAL_TABLE_PATH,
                        tb);
        // precondition: both the remote tablet dir and the local log dir exist
        assertThat(remoteLogTabletDir.getFileSystem().exists(remoteLogTabletDir)).isTrue();
        assertThat(logTablet.getLogDir().exists()).isTrue();

        // stop with delete = false, deleteRemote = false: local and remote log should be kept,
        // only the remote log tiering task will be removed.
        CompletableFuture<List<StopReplicaResultForBucket>> future1 = new CompletableFuture<>();
        replicaManager.stopReplicas(
                0,
                Collections.singletonList(new StopReplicaData(tb, false, false, 0, 0)),
                future1::complete);
        assertThat(future1.get()).containsOnly(new StopReplicaResultForBucket(tb));
        ReplicaManager.HostedReplica hostedReplica = replicaManager.getReplica(tb);
        assertThat(hostedReplica).isInstanceOf(ReplicaManager.OnlineReplica.class);
        assertThat(remoteLogTabletDir.getFileSystem().exists(remoteLogTabletDir)).isTrue();
        assertThat(remoteLogManager.getTaskWithFuture(tb)).isNull();

        // now stop with delete = true and the parameterized deleteRemote flag
        CompletableFuture<List<StopReplicaResultForBucket>> future2 = new CompletableFuture<>();
        replicaManager.stopReplicas(
                0,
                Collections.singletonList(new StopReplicaData(tb, true, deleteRemote, 0, 0)),
                future2::complete);
        assertThat(future2.get()).containsOnly(new StopReplicaResultForBucket(tb));
        hostedReplica = replicaManager.getReplica(tb);
        assertThat(hostedReplica).isInstanceOf(ReplicaManager.NoneReplica.class);
        // delete = true always removes the local log dir
        assertThat(logTablet.getLogDir().exists()).isFalse();
        if (!deleteRemote) {
            // stop with delete = true, deleteRemote = false: local log should be deleted,
            // remote log should be kept.
            assertThat(remoteLogTabletDir.getFileSystem().exists(remoteLogTabletDir)).isTrue();
        } else {
            // stop with delete = true, deleteRemote = true: local and remote log should be
            // deleted
            assertThat(remoteLogTabletDir.getFileSystem().exists(remoteLogTabletDir)).isFalse();
        }
    }
+
@ParameterizedTest
@ValueSource(booleans = {true, false})
void testLookupOffsetForTimestamp(boolean partitionTable) throws Exception
{
@@ -540,4 +616,12 @@ class RemoteLogManagerTest extends RemoteLogTestBase {
return new TableBucket(tableId, 0);
}
}
+
+ private static Stream<Arguments> stopArgs() {
+ return Stream.of(
+ Arguments.of(false, false),
+ Arguments.of(false, true),
+ Arguments.of(true, false),
+ Arguments.of(true, true));
+ }
}
diff --git
a/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaManagerTest.java
b/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaManagerTest.java
index 2fd268d5e..bd63f3cbf 100644
---
a/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaManagerTest.java
+++
b/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaManagerTest.java
@@ -1523,7 +1523,7 @@ class ReplicaManagerTest extends ReplicaTestBase {
replicaManager.stopReplicas(
INITIAL_COORDINATOR_EPOCH,
Collections.singletonList(
- new StopReplicaData(tb, true,
INITIAL_COORDINATOR_EPOCH, 1)),
+ new StopReplicaData(tb, true, true,
INITIAL_COORDINATOR_EPOCH, 1)),
future1::complete);
assertThat(future1.get()).containsOnly(new
StopReplicaResultForBucket(tb));
ReplicaManager.HostedReplica hostedReplica =
replicaManager.getReplica(tb);
@@ -1554,7 +1554,7 @@ class ReplicaManagerTest extends ReplicaTestBase {
replicaManager.stopReplicas(
INITIAL_COORDINATOR_EPOCH,
Collections.singletonList(
- new StopReplicaData(tb, true,
INITIAL_COORDINATOR_EPOCH, 1)),
+ new StopReplicaData(tb, true, true,
INITIAL_COORDINATOR_EPOCH, 1)),
future1::complete);
assertThat(future1.get())
.containsOnly(
diff --git
a/fluss-server/src/test/java/org/apache/fluss/server/testutils/FlussClusterExtension.java
b/fluss-server/src/test/java/org/apache/fluss/server/testutils/FlussClusterExtension.java
index 2f2ee3735..8eaf29a0b 100644
---
a/fluss-server/src/test/java/org/apache/fluss/server/testutils/FlussClusterExtension.java
+++
b/fluss-server/src/test/java/org/apache/fluss/server/testutils/FlussClusterExtension.java
@@ -832,7 +832,7 @@ public final class FlussClusterExtension
.addAllStopReplicasReqs(
Collections.singleton(
makeStopBucketReplica(
- tableBucket, false,
leaderEpoch))))
+ tableBucket, false,
false, leaderEpoch))))
.get();
}