This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new 79d9dbbe5 [server] TabletServer support controlled shutdown (#1159)
79d9dbbe5 is described below

commit 79d9dbbe54d6aec61f113e026cecbbf03287c588
Author: yunhong <[email protected]>
AuthorDate: Sun Sep 21 15:40:24 2025 +0800

    [server] TabletServer support controlled shutdown (#1159)
---
 .../TabletServerNotAvailableException.java         |  27 ++---
 .../fluss/rpc/gateway/CoordinatorGateway.java      |   7 ++
 .../org/apache/fluss/rpc/protocol/ApiKeys.java     |   3 +-
 fluss-rpc/src/main/proto/FlussApi.proto            |   8 ++
 .../java/org/apache/fluss/server/ServerBase.java   |   1 +
 .../server/coordinator/CoordinatorContext.java     |  24 ++++-
 .../coordinator/CoordinatorEventProcessor.java     |  78 ++++++++++++++
 .../coordinator/CoordinatorRequestBatch.java       |   2 +-
 .../server/coordinator/CoordinatorService.java     |  17 ++++
 .../coordinator/event/ControlledShutdownEvent.java |  50 +++++++++
 .../ReplicaLeaderElectionAlgorithms.java           |  85 +++++++++++++++-
 ...hms.java => ReplicaLeaderElectionStrategy.java} |  21 +---
 .../statemachine/ReplicaStateMachine.java          |   9 +-
 .../statemachine/TableBucketStateMachine.java      | 112 ++++++++++++---------
 .../apache/fluss/server/tablet/TabletServer.java   |  70 ++++++++++++-
 .../fluss/server/utils/ServerRpcMessageUtils.java  |  11 ++
 .../apache/fluss/server/zk/data/LeaderAndIsr.java  |  29 ++++--
 .../coordinator/CoordinatorEventProcessorTest.java |   9 +-
 .../server/coordinator/CoordinatorTestUtils.java   |   5 +-
 .../server/coordinator/TestCoordinatorGateway.java |   8 ++
 .../ReplicaLeaderElectionAlgorithmsTest.java       | 112 +++++++++++++++++++++
 .../statemachine/ReplicaStateMachineTest.java      |   4 +-
 .../statemachine/TableBucketStateMachineTest.java  |  79 ++++++++++++---
 ...ITCase.java => TabletServerShutdownITCase.java} |  52 ++++++++--
 24 files changed, 693 insertions(+), 130 deletions(-)

diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/statemachine/ReplicaLeaderElectionAlgorithms.java b/fluss-common/src/main/java/org/apache/fluss/exception/TabletServerNotAvailableException.java
similarity index 52%
copy from fluss-server/src/main/java/org/apache/fluss/server/coordinator/statemachine/ReplicaLeaderElectionAlgorithms.java
copy to fluss-common/src/main/java/org/apache/fluss/exception/TabletServerNotAvailableException.java
index d0f6b1835..320daeff6 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/statemachine/ReplicaLeaderElectionAlgorithms.java
+++ b/fluss-common/src/main/java/org/apache/fluss/exception/TabletServerNotAvailableException.java
@@ -15,23 +15,18 @@
  * limitations under the License.
  */
 
-package org.apache.fluss.server.coordinator.statemachine;
+package org.apache.fluss.exception;
 
-import java.util.List;
-import java.util.Optional;
+import org.apache.fluss.annotation.PublicEvolving;
 
-/** The algorithms to elect the replica leader. */
-public class ReplicaLeaderElectionAlgorithms {
-    public static Optional<Integer> defaultReplicaLeaderElection(
-            List<Integer> assignments, List<Integer> aliveReplicas, 
List<Integer> isr) {
-        // currently, we always use the first replica in assignment, which 
also in aliveReplicas and
-        // isr as the leader replica.
-        for (int assignment : assignments) {
-            if (aliveReplicas.contains(assignment) && 
isr.contains(assignment)) {
-                return Optional.of(assignment);
-            }
-        }
-
-        return Optional.empty();
+/**
+ * Thrown when the tabletServer is not available.
+ *
+ * @since 0.8
+ */
+@PublicEvolving
+public class TabletServerNotAvailableException extends ApiException {
+    public TabletServerNotAvailableException(String message) {
+        super(message);
     }
 }
diff --git a/fluss-rpc/src/main/java/org/apache/fluss/rpc/gateway/CoordinatorGateway.java b/fluss-rpc/src/main/java/org/apache/fluss/rpc/gateway/CoordinatorGateway.java
index a089a324e..a78c9bdbd 100644
--- a/fluss-rpc/src/main/java/org/apache/fluss/rpc/gateway/CoordinatorGateway.java
+++ b/fluss-rpc/src/main/java/org/apache/fluss/rpc/gateway/CoordinatorGateway.java
@@ -26,6 +26,8 @@ import org.apache.fluss.rpc.messages.CommitLakeTableSnapshotRequest;
 import org.apache.fluss.rpc.messages.CommitLakeTableSnapshotResponse;
 import org.apache.fluss.rpc.messages.CommitRemoteLogManifestRequest;
 import org.apache.fluss.rpc.messages.CommitRemoteLogManifestResponse;
+import org.apache.fluss.rpc.messages.ControlledShutdownRequest;
+import org.apache.fluss.rpc.messages.ControlledShutdownResponse;
 import org.apache.fluss.rpc.messages.LakeTieringHeartbeatRequest;
 import org.apache.fluss.rpc.messages.LakeTieringHeartbeatResponse;
 import org.apache.fluss.rpc.protocol.ApiKeys;
@@ -78,4 +80,9 @@ public interface CoordinatorGateway extends RpcGateway, AdminGateway {
     @RPC(api = ApiKeys.LAKE_TIERING_HEARTBEAT)
     CompletableFuture<LakeTieringHeartbeatResponse> lakeTieringHeartbeat(
             LakeTieringHeartbeatRequest request);
+
+    /** Try to controlled shutdown for tabletServer with specify 
tabletServerId. */
+    @RPC(api = ApiKeys.CONTROLLED_SHUTDOWN)
+    CompletableFuture<ControlledShutdownResponse> controlledShutdown(
+            ControlledShutdownRequest request);
 }
diff --git a/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/ApiKeys.java b/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/ApiKeys.java
index 0bc2d494a..8526581ae 100644
--- a/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/ApiKeys.java
+++ b/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/ApiKeys.java
@@ -70,7 +70,8 @@ public enum ApiKeys {
     CREATE_ACLS(1039, 0, 0, PUBLIC),
     LIST_ACLS(1040, 0, 0, PUBLIC),
     DROP_ACLS(1041, 0, 0, PUBLIC),
-    LAKE_TIERING_HEARTBEAT(1042, 0, 0, PRIVATE);
+    LAKE_TIERING_HEARTBEAT(1042, 0, 0, PRIVATE),
+    CONTROLLED_SHUTDOWN(1043, 0, 0, PRIVATE);
 
     private static final Map<Integer, ApiKeys> ID_TO_TYPE =
             Arrays.stream(ApiKeys.values())
diff --git a/fluss-rpc/src/main/proto/FlussApi.proto b/fluss-rpc/src/main/proto/FlussApi.proto
index 19e401095..200b8b520 100644
--- a/fluss-rpc/src/main/proto/FlussApi.proto
+++ b/fluss-rpc/src/main/proto/FlussApi.proto
@@ -532,6 +532,14 @@ message LakeTieringHeartbeatResponse {
   repeated PbHeartbeatRespForTable failed_table_resp = 5;
 }
 
+message ControlledShutdownRequest {
+  required int32 tablet_server_id = 1;
+  required int32 tablet_server_epoch = 2;
+}
+
+message ControlledShutdownResponse {
+  repeated PbTableBucket remaining_leader_buckets = 1;
+}
 
 // --------------- Inner classes ----------------
 message PbApiVersion {
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/ServerBase.java b/fluss-server/src/main/java/org/apache/fluss/server/ServerBase.java
index 99c1e105f..4acbafd09 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/ServerBase.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/ServerBase.java
@@ -115,6 +115,7 @@ public abstract class ServerBase implements AutoCloseableAsync, FatalErrorHandle
     public void start() throws Exception {
         try {
             addShutDownHook();
+
             // at first, we need to initialize the file system
             pluginManager = 
PluginUtils.createPluginManagerFromRootFolder(conf);
             FileSystem.initialize(conf, pluginManager);
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorContext.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorContext.java
index aa372f56a..4cb988967 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorContext.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorContext.java
@@ -67,6 +67,7 @@ public class CoordinatorContext {
     private final Map<TableBucketReplica, Integer> failDeleteNumbers = new 
HashMap<>();
 
     private final Map<Integer, ServerInfo> liveTabletServers = new HashMap<>();
+    private final Set<Integer> shuttingDownTabletServers = new HashSet<>();
 
     // a map from the table bucket to the state of the bucket.
     private final Map<TableBucket, BucketState> bucketStates = new HashMap<>();
@@ -114,6 +115,24 @@ public class CoordinatorContext {
         return liveTabletServers;
     }
 
+    public Set<Integer> liveTabletServerSet() {
+        Set<Integer> liveTabletServers = new HashSet<>();
+        for (Integer brokerId : this.liveTabletServers.keySet()) {
+            if (!shuttingDownTabletServers.contains(brokerId)) {
+                liveTabletServers.add(brokerId);
+            }
+        }
+        return liveTabletServers;
+    }
+
+    public Set<Integer> shuttingDownTabletServers() {
+        return shuttingDownTabletServers;
+    }
+
+    public Set<Integer> liveOrShuttingDownTabletServers() {
+        return liveTabletServers.keySet();
+    }
+
     @VisibleForTesting
     public void setLiveTabletServers(List<ServerInfo> servers) {
         liveTabletServers.clear();
@@ -136,8 +155,8 @@ public class CoordinatorContext {
         this.liveTabletServers.remove(serverId);
     }
 
-    public boolean isReplicaAndServerOnline(int serverId, TableBucket 
tableBucket) {
-        return liveTabletServers.containsKey(serverId)
+    public boolean isReplicaOnline(int serverId, TableBucket tableBucket) {
+        return liveTabletServerSet().contains(serverId)
                 && !replicasOnOffline
                         .getOrDefault(serverId, Collections.emptySet())
                         .contains(tableBucket);
@@ -636,5 +655,6 @@ public class CoordinatorContext {
         clearTablesState();
         // clear the live tablet servers
         liveTabletServers.clear();
+        shuttingDownTabletServers.clear();
     }
 }
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java
index 66d52d9fc..bab477d20 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java
@@ -27,6 +27,7 @@ import org.apache.fluss.exception.FencedLeaderEpochException;
 import org.apache.fluss.exception.FlussRuntimeException;
 import org.apache.fluss.exception.InvalidCoordinatorException;
 import org.apache.fluss.exception.InvalidUpdateVersionException;
+import org.apache.fluss.exception.TabletServerNotAvailableException;
 import org.apache.fluss.exception.UnknownTableOrBucketException;
 import org.apache.fluss.metadata.PhysicalTablePath;
 import org.apache.fluss.metadata.TableBucket;
@@ -38,6 +39,7 @@ import org.apache.fluss.rpc.messages.AdjustIsrResponse;
 import org.apache.fluss.rpc.messages.CommitKvSnapshotResponse;
 import org.apache.fluss.rpc.messages.CommitLakeTableSnapshotResponse;
 import org.apache.fluss.rpc.messages.CommitRemoteLogManifestResponse;
+import org.apache.fluss.rpc.messages.ControlledShutdownResponse;
 import org.apache.fluss.rpc.messages.PbCommitLakeTableSnapshotRespForTable;
 import org.apache.fluss.rpc.protocol.ApiError;
 import org.apache.fluss.server.coordinator.event.AccessContextEvent;
@@ -45,6 +47,7 @@ import org.apache.fluss.server.coordinator.event.AdjustIsrReceivedEvent;
 import org.apache.fluss.server.coordinator.event.CommitKvSnapshotEvent;
 import org.apache.fluss.server.coordinator.event.CommitLakeTableSnapshotEvent;
 import org.apache.fluss.server.coordinator.event.CommitRemoteLogManifestEvent;
+import org.apache.fluss.server.coordinator.event.ControlledShutdownEvent;
 import org.apache.fluss.server.coordinator.event.CoordinatorEvent;
 import org.apache.fluss.server.coordinator.event.CoordinatorEventManager;
 import org.apache.fluss.server.coordinator.event.CreatePartitionEvent;
@@ -72,6 +75,7 @@ import org.apache.fluss.server.kv.snapshot.CompletedSnapshotStore;
 import org.apache.fluss.server.metadata.CoordinatorMetadataCache;
 import org.apache.fluss.server.metadata.ServerInfo;
 import org.apache.fluss.server.metrics.group.CoordinatorMetricGroup;
+import org.apache.fluss.server.utils.ServerRpcMessageUtils;
 import org.apache.fluss.server.zk.ZooKeeperClient;
 import org.apache.fluss.server.zk.data.BucketAssignment;
 import org.apache.fluss.server.zk.data.LakeTableSnapshot;
@@ -104,6 +108,7 @@ import java.util.stream.Collectors;
 
 import static 
org.apache.fluss.server.coordinator.statemachine.BucketState.OfflineBucket;
 import static 
org.apache.fluss.server.coordinator.statemachine.BucketState.OnlineBucket;
+import static 
org.apache.fluss.server.coordinator.statemachine.ReplicaLeaderElectionStrategy.CONTROLLED_SHUTDOWN_ELECTION;
 import static 
org.apache.fluss.server.coordinator.statemachine.ReplicaState.OfflineReplica;
 import static 
org.apache.fluss.server.coordinator.statemachine.ReplicaState.OnlineReplica;
 import static 
org.apache.fluss.server.coordinator.statemachine.ReplicaState.ReplicaDeletionStarted;
@@ -529,6 +534,11 @@ public class CoordinatorEventProcessor implements EventProcessor {
             completeFromCallable(
                     commitLakeTableSnapshotEvent.getRespCallback(),
                     () -> 
tryProcessCommitLakeTableSnapshot(commitLakeTableSnapshotEvent));
+        } else if (event instanceof ControlledShutdownEvent) {
+            ControlledShutdownEvent controlledShutdownEvent = 
(ControlledShutdownEvent) event;
+            completeFromCallable(
+                    controlledShutdownEvent.getRespCallback(),
+                    () -> 
tryProcessControlledShutdown(controlledShutdownEvent));
         } else if (event instanceof AccessContextEvent) {
             AccessContextEvent<?> accessContextEvent = (AccessContextEvent<?>) 
event;
             processAccessContext(accessContextEvent);
@@ -865,6 +875,7 @@ public class CoordinatorEventProcessor implements EventProcessor {
         LOG.info("Tablet server failure callback for {}.", tabletServerId);
         coordinatorContext.removeOfflineBucketInServer(tabletServerId);
         coordinatorContext.removeLiveTabletServer(tabletServerId);
+        coordinatorContext.shuttingDownTabletServers().remove(tabletServerId);
         coordinatorChannelManager.removeTabletServer(tabletServerId);
 
         // Here, we will first update alive tabletServer info for all 
tabletServers and
@@ -1165,6 +1176,73 @@ public class CoordinatorEventProcessor implements EventProcessor {
         return response;
     }
 
+    private ControlledShutdownResponse tryProcessControlledShutdown(
+            ControlledShutdownEvent controlledShutdownEvent) {
+        ControlledShutdownResponse response = new ControlledShutdownResponse();
+
+        // TODO here we need to check tabletServerEpoch, avoid to receive 
controlled shutdown
+        // request from an old tabletServer. Trace by 
https://github.com/alibaba/fluss/issues/1153
+        int tabletServerEpoch = controlledShutdownEvent.getTabletServerEpoch();
+
+        int tabletServerId = controlledShutdownEvent.getTabletServerId();
+        LOG.info(
+                "Try to process controlled shutdown for tabletServer: {} of 
tabletServer epoch: {}",
+                controlledShutdownEvent.getTabletServerId(),
+                tabletServerEpoch);
+
+        if 
(!coordinatorContext.liveOrShuttingDownTabletServers().contains(tabletServerId))
 {
+            throw new TabletServerNotAvailableException(
+                    "TabletServer" + tabletServerId + " is not available.");
+        }
+
+        coordinatorContext.shuttingDownTabletServers().add(tabletServerId);
+        LOG.debug(
+                "All shutting down tabletServers: {}",
+                coordinatorContext.shuttingDownTabletServers());
+        LOG.debug("All live tabletServers: {}", 
coordinatorContext.liveTabletServerSet());
+
+        List<TableBucketReplica> replicasToActOn =
+                
coordinatorContext.replicasOnTabletServer(tabletServerId).stream()
+                        .filter(
+                                replica -> {
+                                    TableBucket tableBucket = 
replica.getTableBucket();
+                                    return 
!coordinatorContext.getAssignment(tableBucket).isEmpty()
+                                            && coordinatorContext
+                                                    
.getBucketLeaderAndIsr(tableBucket)
+                                                    .isPresent()
+                                            && 
!coordinatorContext.isToBeDeleted(tableBucket);
+                                })
+                        .collect(Collectors.toList());
+
+        Set<TableBucket> bucketsLedByServer = new HashSet<>();
+        Set<TableBucketReplica> replicasFollowedByServer = new HashSet<>();
+        for (TableBucketReplica replica : replicasToActOn) {
+            TableBucket tableBucket = replica.getTableBucket();
+            if (replica.getReplica()
+                    == 
coordinatorContext.getBucketLeaderAndIsr(tableBucket).get().leader()) {
+                bucketsLedByServer.add(tableBucket);
+            } else {
+                replicasFollowedByServer.add(replica);
+            }
+        }
+
+        tableBucketStateMachine.handleStateChange(
+                bucketsLedByServer, OnlineBucket, 
CONTROLLED_SHUTDOWN_ELECTION);
+
+        // TODO need send stop request to the leader?
+
+        // If the tabletServer is a follower, updates the isr in ZK and 
notifies the current leader.
+        replicaStateMachine.handleStateChanges(replicasFollowedByServer, 
OfflineReplica);
+
+        // Return the list of buckets that are still being managed by the 
controlled shutdown
+        // tabletServer after leader migration.
+        response.addAllRemainingLeaderBuckets(
+                
coordinatorContext.getBucketsWithLeaderIn(tabletServerId).stream()
+                        .map(ServerRpcMessageUtils::fromTableBucket)
+                        .collect(Collectors.toList()));
+        return response;
+    }
+
     private void validateFencedEvent(FencedCoordinatorEvent event) {
         TableBucket tb = event.getTableBucket();
         if (coordinatorContext.getTablePathById(tb.getTableId()) == null) {
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorRequestBatch.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorRequestBatch.java
index 504b2b439..de6778d35 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorRequestBatch.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorRequestBatch.java
@@ -211,7 +211,7 @@ public class CoordinatorRequestBatch {
             List<Integer> bucketReplicas,
             LeaderAndIsr leaderAndIsr) {
         tabletServers.stream()
-                .filter(s -> s >= 0)
+                .filter(s -> s >= 0 && 
!coordinatorContext.shuttingDownTabletServers().contains(s))
                 .forEach(
                         id -> {
                             Map<TableBucket, PbNotifyLeaderAndIsrReqForBucket>
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java
index 5f87937a2..dd3eb62ab 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java
@@ -49,6 +49,8 @@ import org.apache.fluss.rpc.messages.CommitLakeTableSnapshotRequest;
 import org.apache.fluss.rpc.messages.CommitLakeTableSnapshotResponse;
 import org.apache.fluss.rpc.messages.CommitRemoteLogManifestRequest;
 import org.apache.fluss.rpc.messages.CommitRemoteLogManifestResponse;
+import org.apache.fluss.rpc.messages.ControlledShutdownRequest;
+import org.apache.fluss.rpc.messages.ControlledShutdownResponse;
 import org.apache.fluss.rpc.messages.CreateAclsRequest;
 import org.apache.fluss.rpc.messages.CreateAclsResponse;
 import org.apache.fluss.rpc.messages.CreateDatabaseRequest;
@@ -86,6 +88,7 @@ import org.apache.fluss.server.coordinator.event.AdjustIsrReceivedEvent;
 import org.apache.fluss.server.coordinator.event.CommitKvSnapshotEvent;
 import org.apache.fluss.server.coordinator.event.CommitLakeTableSnapshotEvent;
 import org.apache.fluss.server.coordinator.event.CommitRemoteLogManifestEvent;
+import org.apache.fluss.server.coordinator.event.ControlledShutdownEvent;
 import org.apache.fluss.server.coordinator.event.EventManager;
 import org.apache.fluss.server.entity.CommitKvSnapshotData;
 import org.apache.fluss.server.entity.LakeTieringTableInfo;
@@ -575,6 +578,20 @@ public final class CoordinatorService extends RpcServiceBase implements Coordina
         return CompletableFuture.completedFuture(heartbeatResponse);
     }
 
+    @Override
+    public CompletableFuture<ControlledShutdownResponse> controlledShutdown(
+            ControlledShutdownRequest request) {
+        CompletableFuture<ControlledShutdownResponse> response = new 
CompletableFuture<>();
+        eventManagerSupplier
+                .get()
+                .put(
+                        new ControlledShutdownEvent(
+                                request.getTabletServerId(),
+                                request.getTabletServerEpoch(),
+                                response));
+        return response;
+    }
+
     private void validateHeartbeatRequest(
             PbHeartbeatReqForTable heartbeatReqForTable, int currentEpoch) {
         if (heartbeatReqForTable.getCoordinatorEpoch() != currentEpoch) {
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/ControlledShutdownEvent.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/ControlledShutdownEvent.java
new file mode 100644
index 000000000..7f93d0805
--- /dev/null
+++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/ControlledShutdownEvent.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.coordinator.event;
+
+import org.apache.fluss.rpc.messages.ControlledShutdownResponse;
+
+import java.util.concurrent.CompletableFuture;
+
+/** An event for controlled shutdown of TabletServer. */
+public class ControlledShutdownEvent implements CoordinatorEvent {
+    private final int tabletServerId;
+    private final int tabletServerEpoch;
+    private final CompletableFuture<ControlledShutdownResponse> respCallback;
+
+    public ControlledShutdownEvent(
+            int tabletServerId,
+            int tabletServerEpoch,
+            CompletableFuture<ControlledShutdownResponse> respCallback) {
+        this.tabletServerId = tabletServerId;
+        this.tabletServerEpoch = tabletServerEpoch;
+        this.respCallback = respCallback;
+    }
+
+    public int getTabletServerId() {
+        return tabletServerId;
+    }
+
+    public int getTabletServerEpoch() {
+        return tabletServerEpoch;
+    }
+
+    public CompletableFuture<ControlledShutdownResponse> getRespCallback() {
+        return respCallback;
+    }
+}
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/statemachine/ReplicaLeaderElectionAlgorithms.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/statemachine/ReplicaLeaderElectionAlgorithms.java
index d0f6b1835..c7c1aa07a 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/statemachine/ReplicaLeaderElectionAlgorithms.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/statemachine/ReplicaLeaderElectionAlgorithms.java
@@ -17,21 +17,100 @@
 
 package org.apache.fluss.server.coordinator.statemachine;
 
+import 
org.apache.fluss.server.coordinator.statemachine.TableBucketStateMachine.ElectionResult;
+import org.apache.fluss.server.zk.data.LeaderAndIsr;
+
+import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
 
 /** The algorithms to elect the replica leader. */
 public class ReplicaLeaderElectionAlgorithms {
-    public static Optional<Integer> defaultReplicaLeaderElection(
-            List<Integer> assignments, List<Integer> aliveReplicas, 
List<Integer> isr) {
+
+    /**
+     * Init replica leader election when the bucket is new created.
+     *
+     * @param assignments the assignments
+     * @param aliveReplicas the alive replicas
+     * @param coordinatorEpoch the coordinator epoch
+     * @return the election result
+     */
+    public static Optional<ElectionResult> initReplicaLeaderElection(
+            List<Integer> assignments, List<Integer> aliveReplicas, int 
coordinatorEpoch) {
+        // currently, we always use the first replica in assignment, which 
also in aliveReplicas and
+        // isr as the leader replica.
+        for (int assignment : assignments) {
+            if (aliveReplicas.contains(assignment)) {
+                return Optional.of(
+                        new ElectionResult(
+                                aliveReplicas,
+                                new LeaderAndIsr(
+                                        assignment, 0, aliveReplicas, 
coordinatorEpoch, 0)));
+            }
+        }
+
+        return Optional.empty();
+    }
+
+    /**
+     * Default replica leader election, like electing leader while leader 
offline.
+     *
+     * @param assignments the assignments
+     * @param aliveReplicas the alive replicas
+     * @param leaderAndIsr the original leaderAndIsr
+     * @return the election result
+     */
+    public static Optional<ElectionResult> defaultReplicaLeaderElection(
+            List<Integer> assignments, List<Integer> aliveReplicas, 
LeaderAndIsr leaderAndIsr) {
         // currently, we always use the first replica in assignment, which 
also in aliveReplicas and
         // isr as the leader replica.
+        List<Integer> isr = leaderAndIsr.isr();
         for (int assignment : assignments) {
             if (aliveReplicas.contains(assignment) && 
isr.contains(assignment)) {
-                return Optional.of(assignment);
+                return Optional.of(
+                        new ElectionResult(
+                                aliveReplicas, 
leaderAndIsr.newLeaderAndIsr(assignment, isr)));
             }
         }
 
         return Optional.empty();
     }
+
+    /**
+     * Controlled shutdown replica leader election.
+     *
+     * @param assignments the assignments
+     * @param aliveReplicas the alive replicas
+     * @param leaderAndIsr the original leaderAndIsr
+     * @param shutdownTabletServers the shutdown tabletServers
+     * @return the election result
+     */
+    public static Optional<ElectionResult> 
controlledShutdownReplicaLeaderElection(
+            List<Integer> assignments,
+            List<Integer> aliveReplicas,
+            LeaderAndIsr leaderAndIsr,
+            Set<Integer> shutdownTabletServers) {
+        List<Integer> originIsr = leaderAndIsr.isr();
+        Set<Integer> isrSet = new HashSet<>(originIsr);
+        for (Integer id : assignments) {
+            if (aliveReplicas.contains(id)
+                    && isrSet.contains(id)
+                    && !shutdownTabletServers.contains(id)) {
+                Set<Integer> newAliveReplicas = new HashSet<>(aliveReplicas);
+                newAliveReplicas.removeAll(shutdownTabletServers);
+                List<Integer> newIsr =
+                        originIsr.stream()
+                                .filter(replica -> 
!shutdownTabletServers.contains(replica))
+                                .collect(Collectors.toList());
+                return Optional.of(
+                        new ElectionResult(
+                                new ArrayList<>(newAliveReplicas),
+                                leaderAndIsr.newLeaderAndIsr(id, newIsr)));
+            }
+        }
+        return Optional.empty();
+    }
 }
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/statemachine/ReplicaLeaderElectionAlgorithms.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/statemachine/ReplicaLeaderElectionStrategy.java
similarity index 55%
copy from fluss-server/src/main/java/org/apache/fluss/server/coordinator/statemachine/ReplicaLeaderElectionAlgorithms.java
copy to fluss-server/src/main/java/org/apache/fluss/server/coordinator/statemachine/ReplicaLeaderElectionStrategy.java
index d0f6b1835..faff47a42 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/statemachine/ReplicaLeaderElectionAlgorithms.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/statemachine/ReplicaLeaderElectionStrategy.java
@@ -17,21 +17,8 @@
 
 package org.apache.fluss.server.coordinator.statemachine;
 
-import java.util.List;
-import java.util.Optional;
-
-/** The algorithms to elect the replica leader. */
-public class ReplicaLeaderElectionAlgorithms {
-    public static Optional<Integer> defaultReplicaLeaderElection(
-            List<Integer> assignments, List<Integer> aliveReplicas, 
List<Integer> isr) {
-        // currently, we always use the first replica in assignment, which 
also in aliveReplicas and
-        // isr as the leader replica.
-        for (int assignment : assignments) {
-            if (aliveReplicas.contains(assignment) && 
isr.contains(assignment)) {
-                return Optional.of(assignment);
-            }
-        }
-
-        return Optional.empty();
-    }
+/** The strategies to elect the replica leader. */
+public enum ReplicaLeaderElectionStrategy {
+    DEFAULT_ELECTION,
+    CONTROLLED_SHUTDOWN_ELECTION
 }
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/statemachine/ReplicaStateMachine.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/statemachine/ReplicaStateMachine.java
index be77a43e5..235f4151d 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/statemachine/ReplicaStateMachine.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/statemachine/ReplicaStateMachine.java
@@ -107,7 +107,7 @@ public class ReplicaStateMachine {
             for (Integer replica : replicas) {
                 TableBucketReplica tableBucketReplica =
                         new TableBucketReplica(tableBucket, replica);
-                if (coordinatorContext.isReplicaAndServerOnline(replica, 
tableBucket)) {
+                if (coordinatorContext.isReplicaOnline(replica, tableBucket)) {
                     coordinatorContext.putReplicaState(
                             tableBucketReplica, ReplicaState.OnlineReplica);
                     onlineReplicas.add(tableBucketReplica);
@@ -419,7 +419,7 @@ public class ReplicaStateMachine {
             TableBucket tableBucket = tableBucketReplica.getTableBucket();
             int replicaId = tableBucketReplica.getReplica();
 
-            LeaderAndIsr leaderAndIsr = null;
+            LeaderAndIsr leaderAndIsr;
             if (toUpdateLeaderAndIsrList.get(tableBucket) != null) {
                 leaderAndIsr = toUpdateLeaderAndIsrList.get(tableBucket);
             } else {
@@ -451,7 +451,10 @@ public class ReplicaStateMachine {
                             : leaderAndIsr.isr().stream()
                                     .filter(id -> id != replicaId)
                                     .collect(Collectors.toList());
-            LeaderAndIsr adjustLeaderAndIsr = 
leaderAndIsr.newLeaderAndIsr(newLeader, newIsr);
+            LeaderAndIsr adjustLeaderAndIsr =
+                    newLeader == LeaderAndIsr.NO_LEADER
+                            ? leaderAndIsr.newLeaderAndIsr(newLeader, newIsr)
+                            : leaderAndIsr.newLeaderAndIsr(newIsr);
             adjustedLeaderAndIsr.put(tableBucketReplica, adjustLeaderAndIsr);
             toUpdateLeaderAndIsrList.put(tableBucket, adjustLeaderAndIsr);
         }
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/statemachine/TableBucketStateMachine.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/statemachine/TableBucketStateMachine.java
index d1b9958cb..33e278f44 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/statemachine/TableBucketStateMachine.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/statemachine/TableBucketStateMachine.java
@@ -40,6 +40,12 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.stream.Collectors;
 
+import static 
org.apache.fluss.server.coordinator.statemachine.ReplicaLeaderElectionAlgorithms.controlledShutdownReplicaLeaderElection;
+import static 
org.apache.fluss.server.coordinator.statemachine.ReplicaLeaderElectionAlgorithms.defaultReplicaLeaderElection;
+import static 
org.apache.fluss.server.coordinator.statemachine.ReplicaLeaderElectionAlgorithms.initReplicaLeaderElection;
+import static 
org.apache.fluss.server.coordinator.statemachine.ReplicaLeaderElectionStrategy.CONTROLLED_SHUTDOWN_ELECTION;
+import static 
org.apache.fluss.server.coordinator.statemachine.ReplicaLeaderElectionStrategy.DEFAULT_ELECTION;
+
 /* This file is based on source code of Apache Kafka Project 
(https://kafka.apache.org/), licensed by the Apache
  * Software Foundation (ASF) under the Apache License, Version 2.0. See the 
NOTICE file distributed with this work for
  * additional information regarding copyright ownership. */
@@ -85,7 +91,7 @@ public class TableBucketStateMachine {
                             .map(
                                     leaderAndIsr -> {
                                         // ONLINE if the leader is alive, 
otherwise, it's OFFLINE
-                                        if 
(coordinatorContext.isReplicaAndServerOnline(
+                                        if (coordinatorContext.isReplicaOnline(
                                                 leaderAndIsr.leader(), 
tableBucket)) {
                                             return BucketState.OnlineBucket;
                                         } else {
@@ -115,6 +121,13 @@ public class TableBucketStateMachine {
     }
 
     public void handleStateChange(Set<TableBucket> tableBuckets, BucketState 
targetState) {
+        handleStateChange(tableBuckets, targetState, DEFAULT_ELECTION);
+    }
+
+    public void handleStateChange(
+            Set<TableBucket> tableBuckets,
+            BucketState targetState,
+            ReplicaLeaderElectionStrategy replicaLeaderElectionStrategy) {
         try {
             coordinatorRequestBatch.newBatch();
 
@@ -123,7 +136,7 @@ public class TableBucketStateMachine {
                 batchHandleOnlineChangeAndInitLeader(tableBuckets);
             } else {
                 for (TableBucket tableBucket : tableBuckets) {
-                    doHandleStateChange(tableBucket, targetState);
+                    doHandleStateChange(tableBucket, targetState, 
replicaLeaderElectionStrategy);
                 }
             }
             coordinatorRequestBatch.sendRequestToTabletServers(
@@ -175,8 +188,12 @@ public class TableBucketStateMachine {
      *
      * @param tableBucket The table bucket that is to do state change
      * @param targetState the target state that is to change to
+     * @param replicaLeaderElectionStrategy the strategy to choose a new leader
      */
-    private void doHandleStateChange(TableBucket tableBucket, BucketState 
targetState) {
+    private void doHandleStateChange(
+            TableBucket tableBucket,
+            BucketState targetState,
+            ReplicaLeaderElectionStrategy replicaLeaderElectionStrategy) {
         coordinatorContext.putBucketStateIfNotExists(tableBucket, 
BucketState.NonExistentBucket);
         if (!checkValidTableBucketStateChange(tableBucket, targetState)) {
             return;
@@ -224,7 +241,8 @@ public class TableBucketStateMachine {
                     // current state is Online or Offline
                     // not new bucket, we then need to update leader/epoch for 
the bucket
                     Optional<ElectionResult> optionalElectionResult =
-                            electNewLeaderForTableBuckets(tableBucket);
+                            electNewLeaderForTableBuckets(
+                                    tableBucket, 
replicaLeaderElectionStrategy);
                     if (!optionalElectionResult.isPresent()) {
                         logFailedStateChange(tableBucket, currentState, 
targetState);
                     } else {
@@ -389,10 +407,7 @@ public class TableBucketStateMachine {
         // filter out the live servers
         List<Integer> liveServers =
                 assignedServers.stream()
-                        .filter(
-                                (server) ->
-                                        
coordinatorContext.isReplicaAndServerOnline(
-                                                server, tableBucket))
+                        .filter((server) -> 
coordinatorContext.isReplicaOnline(server, tableBucket))
                         .collect(Collectors.toList());
         // todo, consider this case, may reassign with other servers?
         if (liveServers.isEmpty()) {
@@ -413,23 +428,16 @@ public class TableBucketStateMachine {
         }
         // For the case that the table bucket has been initialized, we use all 
the live assigned
         // servers as inSyncReplica set.
-        List<Integer> isr = liveServers;
-        Optional<Integer> leaderOpt =
-                ReplicaLeaderElectionAlgorithms.defaultReplicaLeaderElection(
-                        assignedServers, liveServers, isr);
-        if (!leaderOpt.isPresent()) {
+        Optional<ElectionResult> resultOpt =
+                initReplicaLeaderElection(
+                        assignedServers, liveServers, 
coordinatorContext.getCoordinatorEpoch());
+        if (!resultOpt.isPresent()) {
             LOG.error(
                     "The leader election for table bucket {} is empty.",
                     stringifyBucket(tableBucket));
             return Optional.empty();
         }
-        int leader = leaderOpt.get();
-
-        // Register the initial leader and isr.
-        LeaderAndIsr leaderAndIsr =
-                new LeaderAndIsr(leader, 0, isr, 
coordinatorContext.getCoordinatorEpoch(), 0);
-
-        return Optional.of(new ElectionResult(liveServers, leaderAndIsr));
+        return resultOpt;
     }
 
     private List<RegisterTableBucketLeadAndIsrInfo> 
tryRegisterLeaderAndIsrOneByOne(
@@ -449,7 +457,8 @@ public class TableBucketStateMachine {
         return registerSuccessList;
     }
 
-    private Optional<ElectionResult> electNewLeaderForTableBuckets(TableBucket 
tableBucket) {
+    private Optional<ElectionResult> electNewLeaderForTableBuckets(
+            TableBucket tableBucket, ReplicaLeaderElectionStrategy 
electionStrategy) {
         LeaderAndIsr leaderAndIsr;
         try {
             leaderAndIsr = zooKeeperClient.getLeaderAndIsr(tableBucket).get();
@@ -469,7 +478,7 @@ public class TableBucketStateMachine {
         }
         // re-election
         Optional<ElectionResult> optionalElectionResult =
-                leaderForOffline(tableBucket, leaderAndIsr);
+                electLeader(tableBucket, leaderAndIsr, electionStrategy);
         if (!optionalElectionResult.isPresent()) {
             LOG.error(
                     "The result of elect leader for table bucket {} is empty.",
@@ -564,19 +573,24 @@ public class TableBucketStateMachine {
     }
 
     /**
-     * Elect a new leader for new or offline bucket, it'll always elect one 
from the live replicas
-     * in isr set.
+     * Elect a new leader for bucket, it'll always elect one from the live 
replicas in isr set.
+     *
+     * <p>The elect cases including:
+     *
+     * <ol>
+     *   <li>new or offline bucket
+     *   <li>tabletServer controlled shutdown
+     * </ol>
      */
-    private Optional<ElectionResult> leaderForOffline(
-            TableBucket tableBucket, LeaderAndIsr leaderAndIsr) {
+    private Optional<ElectionResult> electLeader(
+            TableBucket tableBucket,
+            LeaderAndIsr leaderAndIsr,
+            ReplicaLeaderElectionStrategy electionStrategy) {
         List<Integer> assignment = 
coordinatorContext.getAssignment(tableBucket);
         // filter out the live servers
         List<Integer> liveReplicas =
                 assignment.stream()
-                        .filter(
-                                replica ->
-                                        
coordinatorContext.isReplicaAndServerOnline(
-                                                replica, tableBucket))
+                        .filter(replica -> 
coordinatorContext.isReplicaOnline(replica, tableBucket))
                         .collect(Collectors.toList());
         // we'd like use the first live replica as the new leader
         if (liveReplicas.isEmpty()) {
@@ -584,29 +598,27 @@ public class TableBucketStateMachine {
             return Optional.empty();
         }
 
-        Optional<Integer> leaderOpt =
-                ReplicaLeaderElectionAlgorithms.defaultReplicaLeaderElection(
-                        assignment, liveReplicas, leaderAndIsr.isr());
-        if (!leaderOpt.isPresent()) {
+        Optional<ElectionResult> resultOpt = Optional.empty();
+        if (electionStrategy == DEFAULT_ELECTION) {
+            resultOpt = defaultReplicaLeaderElection(assignment, liveReplicas, 
leaderAndIsr);
+        } else if (electionStrategy == CONTROLLED_SHUTDOWN_ELECTION) {
+            Set<Integer> shuttingDownTabletServers = 
coordinatorContext.shuttingDownTabletServers();
+            resultOpt =
+                    controlledShutdownReplicaLeaderElection(
+                            assignment, liveReplicas, leaderAndIsr, 
shuttingDownTabletServers);
+        }
+
+        if (!resultOpt.isPresent()) {
             LOG.error(
                     "The leader election for table bucket {} is empty.",
                     stringifyBucket(tableBucket));
             return Optional.empty();
         }
-
-        // get the updated leader and isr
-        LeaderAndIsr newLeaderAndIsr =
-                new LeaderAndIsr(
-                        leaderOpt.get(),
-                        leaderAndIsr.leaderEpoch() + 1,
-                        leaderAndIsr.isr(),
-                        coordinatorContext.getCoordinatorEpoch(),
-                        leaderAndIsr.bucketEpoch() + 1);
-
-        return Optional.of(new ElectionResult(liveReplicas, newLeaderAndIsr));
+        return resultOpt;
     }
 
-    private static class ElectionResult {
+    /** The result of leader election. */
+    public static class ElectionResult {
         private final List<Integer> liveReplicas;
         private final LeaderAndIsr leaderAndIsr;
 
@@ -614,5 +626,13 @@ public class TableBucketStateMachine {
             this.liveReplicas = liveReplicas;
             this.leaderAndIsr = leaderAndIsr;
         }
+
+        public List<Integer> getLiveReplicas() {
+            return liveReplicas;
+        }
+
+        public LeaderAndIsr getLeaderAndIsr() {
+            return leaderAndIsr;
+        }
     }
 }
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletServer.java 
b/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletServer.java
index 0e593c2c8..63121dbc3 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletServer.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletServer.java
@@ -24,11 +24,14 @@ import org.apache.fluss.config.ConfigOptions;
 import org.apache.fluss.config.Configuration;
 import org.apache.fluss.exception.IllegalConfigurationException;
 import org.apache.fluss.exception.InvalidServerRackInfoException;
+import org.apache.fluss.metadata.TableBucket;
 import org.apache.fluss.metrics.registry.MetricRegistry;
 import org.apache.fluss.rpc.GatewayClientProxy;
 import org.apache.fluss.rpc.RpcClient;
 import org.apache.fluss.rpc.RpcServer;
 import org.apache.fluss.rpc.gateway.CoordinatorGateway;
+import org.apache.fluss.rpc.messages.ControlledShutdownRequest;
+import org.apache.fluss.rpc.messages.ControlledShutdownResponse;
 import org.apache.fluss.rpc.metrics.ClientMetricGroup;
 import org.apache.fluss.rpc.netty.server.RequestsMetrics;
 import org.apache.fluss.server.ServerBase;
@@ -68,6 +71,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.apache.fluss.config.ConfigOptions.BACKGROUND_THREADS;
+import static 
org.apache.fluss.server.utils.ServerRpcMessageUtils.toTableBucket;
 
 /**
  * Tablet server implementation. The tablet server is responsible to manage 
the log tablet and kv
@@ -79,6 +83,10 @@ public class TabletServer extends ServerBase {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(TabletServer.class);
 
+    // TODO, maybe need to make it configurable
+    private static final int CONTROLLED_SHUTDOWN_MAX_RETRIES = 3;
+    private static final long CONTROLLED_SHUTDOWN_RETRY_INTERVAL_MS = 1000L;
+
     private final int serverId;
 
     /**
@@ -144,6 +152,9 @@ public class TabletServer extends ServerBase {
     @Nullable
     private Authorizer authorizer;
 
+    @GuardedBy("lock")
+    private CoordinatorGateway coordinatorGateway;
+
     public TabletServer(Configuration conf) {
         this(conf, SystemClock.getInstance());
     }
@@ -206,7 +217,7 @@ public class TabletServer extends ServerBase {
                     new ClientMetricGroup(metricRegistry, SERVER_NAME + "-" + 
serverId);
             this.rpcClient = RpcClient.create(conf, clientMetricGroup, true);
 
-            CoordinatorGateway coordinatorGateway =
+            this.coordinatorGateway =
                     GatewayClientProxy.createGatewayProxy(
                             () -> 
metadataCache.getCoordinatorServer(interListenerName),
                             rpcClient,
@@ -261,6 +272,9 @@ public class TabletServer extends ServerBase {
     @Override
     protected CompletableFuture<Result> closeAsync(Result result) {
         if (isShutDown.compareAndSet(false, true)) {
+
+            controlledShutDown();
+
             CompletableFuture<Void> serviceShutdownFuture = stopServices();
 
             serviceShutdownFuture.whenComplete(
@@ -408,6 +422,60 @@ public class TabletServer extends ServerBase {
         }
     }
 
+    private void controlledShutDown() {
+        LOG.info("Starting controlled shutdown.");
+
+        // We request the CoordinatorServer to do a controlled shutdown. On 
failure, we backoff for
+        // a period of time and try again for a number of retries. If all the 
attempt fails, we
+        // simply force the shutdown.
+        boolean shutdownSucceeded = false;
+        int remainingRetries = CONTROLLED_SHUTDOWN_MAX_RETRIES;
+        while (!shutdownSucceeded && remainingRetries > 0) {
+            remainingRetries--;
+
+            ControlledShutdownRequest controlledShutdownRequest =
+                    new ControlledShutdownRequest()
+                            .setTabletServerId(serverId)
+                            .setTabletServerEpoch(-1); // TODO, set correct 
tabletServer epoch.
+            try {
+                ControlledShutdownResponse response =
+                        
coordinatorGateway.controlledShutdown(controlledShutdownRequest).get();
+                if (response.getRemainingLeaderBucketsCount() > 0) {
+                    List<TableBucket> remainingLeaderBuckets = new 
ArrayList<>();
+                    response.getRemainingLeaderBucketsList()
+                            .forEach(
+                                    pbTableBucket ->
+                                            remainingLeaderBuckets.add(
+                                                    
toTableBucket(pbTableBucket)));
+                    LOG.warn(
+                            "TabletServer {} is still the leader for the 
following buckets: {} after Controlled Shutdown",
+                            serverId,
+                            remainingLeaderBuckets);
+                } else {
+                    shutdownSucceeded = true;
+                }
+            } catch (Exception e) {
+                LOG.warn("Failed to do controlled shutdown: {}", 
e.getMessage());
+                // do nothing and retry.
+            }
+
+            if (!shutdownSucceeded && remainingRetries > 0) {
+                try {
+                    Thread.sleep(CONTROLLED_SHUTDOWN_RETRY_INTERVAL_MS);
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                    break;
+                }
+                LOG.info("Retrying controlled shutdown ({} retries 
remaining).", remainingRetries);
+            }
+        }
+
+        if (!shutdownSucceeded) {
+            LOG.warn(
+                    "Proceeding to do an unclean shutdown as all the 
controlled shutdown attempts failed.");
+        }
+    }
+
     @Override
     protected String getServerName() {
         return SERVER_NAME;
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java
index 823c2bb52..88126f266 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java
@@ -218,6 +218,17 @@ public class ServerRpcMessageUtils {
                 protoTableBucket.getBucketId());
     }
 
+    public static PbTableBucket fromTableBucket(TableBucket tableBucket) {
+        PbTableBucket pbTableBucket =
+                new PbTableBucket()
+                        .setTableId(tableBucket.getTableId())
+                        .setBucketId(tableBucket.getBucket());
+        if (tableBucket.getPartitionId() != null) {
+            pbTableBucket.setPartitionId(tableBucket.getPartitionId());
+        }
+        return pbTableBucket;
+    }
+
     public static ServerNode toServerNode(PbServerNode pbServerNode, 
ServerType serverType) {
         return new ServerNode(
                 pbServerNode.getNodeId(),
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/zk/data/LeaderAndIsr.java 
b/fluss-server/src/main/java/org/apache/fluss/server/zk/data/LeaderAndIsr.java
index 6b6f8bc7a..251248138 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/zk/data/LeaderAndIsr.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/zk/data/LeaderAndIsr.java
@@ -70,8 +70,27 @@ public class LeaderAndIsr {
         this.bucketEpoch = bucketEpoch;
     }
 
+    /**
+     * Create a new LeaderAndIsr with the given leader and isr, which means 
the leader changes.
+     *
+     * @param newLeader the new leader replica id
+     * @param newIsr the new isr
+     * @return the new LeaderAndIsr
+     */
     public LeaderAndIsr newLeaderAndIsr(int newLeader, List<Integer> newIsr) {
-        return new LeaderAndIsr(newLeader, leaderEpoch, newIsr, 
coordinatorEpoch, bucketEpoch + 1);
+        return new LeaderAndIsr(
+                newLeader, leaderEpoch + 1, newIsr, coordinatorEpoch, 
bucketEpoch + 1);
+    }
+
+    /**
+     * Create a new LeaderAndIsr with the given isr, which means only the isr 
changes, but the
+     * leader remains the same.
+     *
+     * @param newIsr the new isr
+     * @return the new LeaderAndIsr
+     */
+    public LeaderAndIsr newLeaderAndIsr(List<Integer> newIsr) {
+        return new LeaderAndIsr(leader, leaderEpoch, newIsr, coordinatorEpoch, 
bucketEpoch + 1);
     }
 
     public int leader() {
@@ -98,14 +117,6 @@ public class LeaderAndIsr {
         return bucketEpoch;
     }
 
-    public boolean equalsAllowStalePartitionEpoch(LeaderAndIsr other) {
-        return leader == other.leader
-                && leaderEpoch == other.leaderEpoch
-                && coordinatorEpoch == other.coordinatorEpoch
-                && isr.equals(other.isr)
-                && bucketEpoch <= other.bucketEpoch;
-    }
-
     @Override
     public boolean equals(Object o) {
         if (this == o) {
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessorTest.java
 
b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessorTest.java
index 09a833e41..79047d8b7 100644
--- 
a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessorTest.java
+++ 
b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessorTest.java
@@ -315,8 +315,7 @@ class CoordinatorEventProcessorTest {
         client.registerTabletServer(newlyServerId, tabletServerRegistration);
 
         // retry until the tablet server register event is been handled
-        retryVerifyContext(
-                ctx -> 
assertThat(ctx.getLiveTabletServers()).containsKey(newlyServerId));
+        retryVerifyContext(ctx -> 
assertThat(ctx.liveTabletServerSet()).contains(newlyServerId));
 
         initCoordinatorChannel();
         // verify the context has the exact tablet server
@@ -360,7 +359,7 @@ class CoordinatorEventProcessorTest {
 
         // retry until the server has been removed from coordinator context
         retryVerifyContext(
-                ctx -> 
assertThat(ctx.getLiveTabletServers()).doesNotContainKey(newlyServerId));
+                ctx -> 
assertThat(ctx.liveTabletServerSet()).doesNotContain(newlyServerId));
 
         // check replica state
         // all replicas should be online but the replica in the down server
@@ -397,8 +396,7 @@ class CoordinatorEventProcessorTest {
         // assume the server that comes again
         zookeeperClient.registerTabletServer(newlyServerId, 
tabletServerRegistration);
         // retry until the server has been added to coordinator context
-        retryVerifyContext(
-                ctx -> 
assertThat(ctx.getLiveTabletServers()).containsKey(newlyServerId));
+        retryVerifyContext(ctx -> 
assertThat(ctx.liveTabletServerSet()).contains(newlyServerId));
 
         // make sure the bucket that remains in offline should be online again
         // since the server become online
@@ -812,7 +810,6 @@ class CoordinatorEventProcessorTest {
                                         
assertThat(ctx.getBucketLeaderAndIsr(tableBucket))
                                                 .contains(
                                                         
leaderAndIsr.newLeaderAndIsr(
-                                                                
leaderAndIsr.leader(),
                                                                 
leaderAndIsr.isr()))));
 
         // verify the response
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorTestUtils.java
 
b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorTestUtils.java
index 9d7337ae7..4636a341d 100644
--- 
a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorTestUtils.java
+++ 
b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorTestUtils.java
@@ -43,7 +43,7 @@ public class CoordinatorTestUtils {
             TestCoordinatorChannelManager testCoordinatorChannelManager) {
         Map<Integer, TabletServerGateway> gateways =
                 makeTabletServerGateways(
-                        coordinatorContext.getLiveTabletServers().keySet(), 
Collections.emptySet());
+                        coordinatorContext.liveTabletServerSet(), 
Collections.emptySet());
         testCoordinatorChannelManager.setGateways(gateways);
     }
 
@@ -52,8 +52,7 @@ public class CoordinatorTestUtils {
             TestCoordinatorChannelManager testCoordinatorChannelManager,
             Set<Integer> failServers) {
         Map<Integer, TabletServerGateway> gateways =
-                makeTabletServerGateways(
-                        coordinatorContext.getLiveTabletServers().keySet(), 
failServers);
+                
makeTabletServerGateways(coordinatorContext.liveTabletServerSet(), failServers);
         testCoordinatorChannelManager.setGateways(gateways);
     }
 
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TestCoordinatorGateway.java
 
b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TestCoordinatorGateway.java
index 1f7509738..38e3f9ea1 100644
--- 
a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TestCoordinatorGateway.java
+++ 
b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TestCoordinatorGateway.java
@@ -30,6 +30,8 @@ import 
org.apache.fluss.rpc.messages.CommitLakeTableSnapshotRequest;
 import org.apache.fluss.rpc.messages.CommitLakeTableSnapshotResponse;
 import org.apache.fluss.rpc.messages.CommitRemoteLogManifestRequest;
 import org.apache.fluss.rpc.messages.CommitRemoteLogManifestResponse;
+import org.apache.fluss.rpc.messages.ControlledShutdownRequest;
+import org.apache.fluss.rpc.messages.ControlledShutdownResponse;
 import org.apache.fluss.rpc.messages.CreateAclsRequest;
 import org.apache.fluss.rpc.messages.CreateAclsResponse;
 import org.apache.fluss.rpc.messages.CreateDatabaseRequest;
@@ -296,6 +298,12 @@ public class TestCoordinatorGateway implements 
CoordinatorGateway {
         throw new UnsupportedOperationException();
     }
 
+    @Override
+    public CompletableFuture<ControlledShutdownResponse> controlledShutdown(
+            ControlledShutdownRequest request) {
+        throw new UnsupportedOperationException();
+    }
+
     @Override
     public CompletableFuture<ListAclsResponse> listAcls(ListAclsRequest 
request) {
         throw new UnsupportedOperationException();
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/statemachine/ReplicaLeaderElectionAlgorithmsTest.java
 
b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/statemachine/ReplicaLeaderElectionAlgorithmsTest.java
new file mode 100644
index 000000000..a530a1929
--- /dev/null
+++ 
b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/statemachine/ReplicaLeaderElectionAlgorithmsTest.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.coordinator.statemachine;
+
+import 
org.apache.fluss.server.coordinator.statemachine.TableBucketStateMachine.ElectionResult;
+import org.apache.fluss.server.zk.data.LeaderAndIsr;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+
+import static 
org.apache.fluss.server.coordinator.statemachine.ReplicaLeaderElectionAlgorithms.controlledShutdownReplicaLeaderElection;
+import static 
org.apache.fluss.server.coordinator.statemachine.ReplicaLeaderElectionAlgorithms.defaultReplicaLeaderElection;
+import static 
org.apache.fluss.server.coordinator.statemachine.ReplicaLeaderElectionAlgorithms.initReplicaLeaderElection;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link ReplicaLeaderElectionAlgorithms}. */
+public class ReplicaLeaderElectionAlgorithmsTest {
+
+    @Test
+    void testInitReplicaLeaderElection() {
+        List<Integer> assignments = Arrays.asList(2, 4);
+        List<Integer> liveReplicas = Collections.singletonList(4);
+
+        Optional<ElectionResult> leaderElectionResultOpt =
+                initReplicaLeaderElection(assignments, liveReplicas, 0);
+        assertThat(leaderElectionResultOpt.isPresent()).isTrue();
+        ElectionResult leaderElectionResult = leaderElectionResultOpt.get();
+        
assertThat(leaderElectionResult.getLiveReplicas()).containsExactlyInAnyOrder(4);
+        
assertThat(leaderElectionResult.getLeaderAndIsr().leader()).isEqualTo(4);
+        
assertThat(leaderElectionResult.getLeaderAndIsr().isr()).containsExactlyInAnyOrder(4);
+    }
+
+    @Test
+    void testDefaultReplicaLeaderElection() {
+        List<Integer> assignments = Arrays.asList(2, 4);
+        List<Integer> liveReplicas = Arrays.asList(2, 4);
+        LeaderAndIsr originLeaderAndIsr = new LeaderAndIsr(4, 0, 
Arrays.asList(2, 4), 0, 0);
+
+        Optional<ElectionResult> leaderElectionResultOpt =
+                defaultReplicaLeaderElection(assignments, liveReplicas, 
originLeaderAndIsr);
+        assertThat(leaderElectionResultOpt.isPresent()).isTrue();
+        ElectionResult leaderElectionResult = leaderElectionResultOpt.get();
+        
assertThat(leaderElectionResult.getLiveReplicas()).containsExactlyInAnyOrder(2, 
4);
+        
assertThat(leaderElectionResult.getLeaderAndIsr().leader()).isEqualTo(2);
+        
assertThat(leaderElectionResult.getLeaderAndIsr().isr()).containsExactlyInAnyOrder(2,
 4);
+    }
+
+    @Test
+    void testControlledShutdownReplicaLeaderElection() {
+        List<Integer> assignments = Arrays.asList(2, 4);
+        List<Integer> liveReplicas = Arrays.asList(2, 4);
+        LeaderAndIsr originLeaderAndIsr = new LeaderAndIsr(2, 0, 
Arrays.asList(2, 4), 0, 0);
+        Set<Integer> shutdownTabletServers = Collections.singleton(2);
+
+        Optional<ElectionResult> leaderElectionResultOpt =
+                controlledShutdownReplicaLeaderElection(
+                        assignments, liveReplicas, originLeaderAndIsr, 
shutdownTabletServers);
+        assertThat(leaderElectionResultOpt.isPresent()).isTrue();
+        ElectionResult leaderElectionResult = leaderElectionResultOpt.get();
+        
assertThat(leaderElectionResult.getLiveReplicas()).containsExactlyInAnyOrder(4);
+        
assertThat(leaderElectionResult.getLeaderAndIsr().leader()).isEqualTo(4);
+        
assertThat(leaderElectionResult.getLeaderAndIsr().isr()).containsExactlyInAnyOrder(4);
+    }
+
+    @Test
+    void testControlledShutdownReplicaLeaderElectionLastIsrShuttingDown() {
+        List<Integer> assignments = Arrays.asList(2, 4);
+        List<Integer> liveReplicas = Arrays.asList(2, 4);
+        LeaderAndIsr originLeaderAndIsr =
+                new LeaderAndIsr(2, 0, Collections.singletonList(2), 0, 0);
+        Set<Integer> shutdownTabletServers = Collections.singleton(2);
+
+        Optional<ElectionResult> leaderElectionResultOpt =
+                controlledShutdownReplicaLeaderElection(
+                        assignments, liveReplicas, originLeaderAndIsr, 
shutdownTabletServers);
+        assertThat(leaderElectionResultOpt).isEmpty();
+    }
+
+    @Test
+    void 
testControlledShutdownPartitionLeaderElectionAllIsrSimultaneouslyShutdown() {
+        List<Integer> assignments = Arrays.asList(2, 4);
+        List<Integer> liveReplicas = Arrays.asList(2, 4);
+        LeaderAndIsr originLeaderAndIsr = new LeaderAndIsr(2, 0, 
Arrays.asList(2, 4), 0, 0);
+        Set<Integer> shutdownTabletServers = new HashSet<>(Arrays.asList(2, 
4));
+
+        Optional<ElectionResult> leaderElectionResultOpt =
+                controlledShutdownReplicaLeaderElection(
+                        assignments, liveReplicas, originLeaderAndIsr, 
shutdownTabletServers);
+        assertThat(leaderElectionResultOpt).isEmpty();
+    }
+}
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/statemachine/ReplicaStateMachineTest.java
 
b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/statemachine/ReplicaStateMachineTest.java
index 4c3cf1ab4..454ec5de4 100644
--- 
a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/statemachine/ReplicaStateMachineTest.java
+++ 
b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/statemachine/ReplicaStateMachineTest.java
@@ -223,7 +223,7 @@ class ReplicaStateMachineTest {
         replicaStateMachine.handleStateChanges(replicas, OfflineReplica);
         leaderAndIsr = 
coordinatorContext.getBucketLeaderAndIsr(tableBucket).get();
         assertThat(leaderAndIsr)
-                .isEqualTo(new LeaderAndIsr(LeaderAndIsr.NO_LEADER, 0, 
Arrays.asList(2), 0, 3));
+                .isEqualTo(new LeaderAndIsr(LeaderAndIsr.NO_LEADER, 3, 
Arrays.asList(2), 0, 3));
     }
 
     @Test
@@ -274,7 +274,7 @@ class ReplicaStateMachineTest {
         assertThat(leaderAndIsr)
                 .isEqualTo(
                         new LeaderAndIsr(
-                                LeaderAndIsr.NO_LEADER, 0, 
Collections.singletonList(0), 0, 3));
+                                LeaderAndIsr.NO_LEADER, 1, 
Collections.singletonList(0), 0, 3));
     }
 
     private void toReplicaDeletionStartedState(
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/statemachine/TableBucketStateMachineTest.java
 
b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/statemachine/TableBucketStateMachineTest.java
index 9009765e4..6cee33fbe 100644
--- 
a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/statemachine/TableBucketStateMachineTest.java
+++ 
b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/statemachine/TableBucketStateMachineTest.java
@@ -53,14 +53,19 @@ import java.io.IOException;
 import java.time.Duration;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.Executors;
+import java.util.stream.Collectors;
 
 import static org.apache.fluss.record.TestData.DATA1_TABLE_DESCRIPTOR;
+import static 
org.apache.fluss.server.coordinator.CoordinatorTestUtils.createServers;
+import static 
org.apache.fluss.server.coordinator.CoordinatorTestUtils.makeSendLeaderAndStopRequestAlwaysSuccess;
 import static 
org.apache.fluss.server.coordinator.statemachine.BucketState.NewBucket;
 import static 
org.apache.fluss.server.coordinator.statemachine.BucketState.NonExistentBucket;
 import static 
org.apache.fluss.server.coordinator.statemachine.BucketState.OfflineBucket;
 import static 
org.apache.fluss.server.coordinator.statemachine.BucketState.OnlineBucket;
+import static 
org.apache.fluss.server.coordinator.statemachine.ReplicaLeaderElectionStrategy.CONTROLLED_SHUTDOWN_ELECTION;
 import static org.apache.fluss.testutils.common.CommonTestUtils.retry;
 import static org.assertj.core.api.Assertions.assertThat;
 
@@ -121,9 +126,8 @@ class TableBucketStateMachineTest {
         coordinatorContext.putTablePath(t1Id, TablePath.of("db1", "t1"));
         coordinatorContext.putTablePath(t2Id, TablePath.of("db1", "t2"));
 
-        coordinatorContext.setLiveTabletServers(
-                CoordinatorTestUtils.createServers(Arrays.asList(0, 1, 3)));
-        CoordinatorTestUtils.makeSendLeaderAndStopRequestAlwaysSuccess(
+        coordinatorContext.setLiveTabletServers(createServers(Arrays.asList(0, 
1, 3)));
+        makeSendLeaderAndStopRequestAlwaysSuccess(
                 coordinatorContext, testCoordinatorChannelManager);
         // set assignments
         coordinatorContext.updateBucketReplicaAssignment(t1b0, 
Arrays.asList(0, 1));
@@ -203,9 +207,8 @@ class TableBucketStateMachineTest {
         
assertThat(coordinatorContext.getBucketState(tableBucket)).isEqualTo(NewBucket);
 
         // now, we set 3 live servers
-        coordinatorContext.setLiveTabletServers(
-                CoordinatorTestUtils.createServers(Arrays.asList(0, 1, 2)));
-        CoordinatorTestUtils.makeSendLeaderAndStopRequestAlwaysSuccess(
+        coordinatorContext.setLiveTabletServers(createServers(Arrays.asList(0, 
1, 2)));
+        makeSendLeaderAndStopRequestAlwaysSuccess(
                 coordinatorContext, testCoordinatorChannelManager);
 
         // change to online again
@@ -217,8 +220,7 @@ class TableBucketStateMachineTest {
 
         // case2: assuming the leader replica fail(we remove it to server 
list),
         // we need elect another replica,
-        coordinatorContext.setLiveTabletServers(
-                CoordinatorTestUtils.createServers(Arrays.asList(1, 2)));
+        coordinatorContext.setLiveTabletServers(createServers(Arrays.asList(1, 
2)));
 
         
tableBucketStateMachine.handleStateChange(Collections.singleton(tableBucket), 
OnlineBucket);
         // check state is online
@@ -230,8 +232,7 @@ class TableBucketStateMachineTest {
 
         // case4: the leader replica fail, but non replicas is available
         coordinatorContext.putBucketState(tableBucket, OfflineBucket);
-        coordinatorContext.setLiveTabletServers(
-                CoordinatorTestUtils.createServers(Collections.emptyList()));
+        
coordinatorContext.setLiveTabletServers(createServers(Collections.emptyList()));
         
tableBucketStateMachine.handleStateChange(Collections.singleton(tableBucket), 
OnlineBucket);
         // the state will still be offline
         
assertThat(coordinatorContext.getBucketState(tableBucket)).isEqualTo(OfflineBucket);
@@ -266,8 +267,7 @@ class TableBucketStateMachineTest {
                         coordinatorContext, coordinatorRequestBatch, 
zookeeperClient);
         eventManager.start();
 
-        coordinatorContext.setLiveTabletServers(
-                CoordinatorTestUtils.createServers(Arrays.asList(0, 1, 2)));
+        coordinatorContext.setLiveTabletServers(createServers(Arrays.asList(0, 
1, 2)));
         CoordinatorTestUtils.makeSendLeaderAndStopRequestFailContext(
                 coordinatorContext, testCoordinatorChannelManager, 
Sets.newHashSet(0, 2));
         // init a table bucket assignment to coordinator context
@@ -319,6 +319,61 @@ class TableBucketStateMachineTest {
         assertThat(coordinatorContext.getBucketState(tableBucket0)).isNull();
     }
 
+    @Test
+    void testStateChangeForTabletServerControlledShutdown() {
+        TableBucketStateMachine tableBucketStateMachine = createTableBucketStateMachine();
+        long tableId = 7;
+        TablePath fakeTablePath = TablePath.of("db1", "t2");
+        TableBucket tb = new TableBucket(tableId, 0);
+
+        // init coordinator context.
+        coordinatorContext.putTableInfo(
+                TableInfo.of(
+                        fakeTablePath,
+                        tableId,
+                        0,
+                        DATA1_TABLE_DESCRIPTOR,
+                        System.currentTimeMillis(),
+                        System.currentTimeMillis()));
+        coordinatorContext.putTablePath(tableId, fakeTablePath);
+        coordinatorContext.updateBucketReplicaAssignment(tb, Arrays.asList(0, 1, 2));
+        coordinatorContext.putBucketState(tb, NewBucket);
+
+        List<Integer> aliveServers = Arrays.asList(0, 1, 2);
+        coordinatorContext.setLiveTabletServers(createServers(aliveServers));
+        makeSendLeaderAndStopRequestAlwaysSuccess(
+                coordinatorContext, testCoordinatorChannelManager);
+
+        // check state is online.
+        tableBucketStateMachine.handleStateChange(Collections.singleton(tb), OnlineBucket);
+        assertThat(coordinatorContext.getBucketState(tb)).isEqualTo(OnlineBucket);
+        assertThat(coordinatorContext.liveTabletServerSet())
+                .containsExactlyInAnyOrderElementsOf(aliveServers);
+        assertThat(coordinatorContext.shuttingDownTabletServers()).isEmpty();
+        assertThat(coordinatorContext.liveOrShuttingDownTabletServers())
+                .containsExactlyInAnyOrderElementsOf(aliveServers);
+
+        int oldLeader = coordinatorContext.getBucketLeaderAndIsr(tb).get().leader();
+        // servers that stay eligible once the old leader starts shutting down.
+        List<Integer> remainingServers =
+                aliveServers.stream().filter(s -> s != oldLeader).collect(Collectors.toList());
+
+        // trigger controlled shutdown for oldLeader by adding it to the context's
+        // shutting-down set; the assertions below confirm the context observes the change.
+        coordinatorContext.shuttingDownTabletServers().add(oldLeader);
+        assertThat(coordinatorContext.liveTabletServerSet())
+                .containsExactlyInAnyOrderElementsOf(remainingServers);
+        assertThat(coordinatorContext.shuttingDownTabletServers())
+                .containsExactlyInAnyOrder(oldLeader);
+        assertThat(coordinatorContext.liveOrShuttingDownTabletServers())
+                .containsExactlyInAnyOrder(0, 1, 2);
+
+        // handle state change for controlled shutdown: the new leader must come from the
+        // remaining servers, and the shutting-down server must be dropped from the ISR.
+        tableBucketStateMachine.handleStateChange(
+                Collections.singleton(tb), OnlineBucket, CONTROLLED_SHUTDOWN_ELECTION);
+        assertThat(coordinatorContext.getBucketState(tb)).isEqualTo(OnlineBucket);
+        assertThat(coordinatorContext.getBucketLeaderAndIsr(tb).get().leader())
+                .isNotEqualTo(oldLeader)
+                .isIn(remainingServers);
+        assertThat(coordinatorContext.getBucketLeaderAndIsr(tb).get().isr())
+                .doesNotContain(oldLeader);
+    }
+
     private TableBucketStateMachine createTableBucketStateMachine() {
         return new TableBucketStateMachine(
                 coordinatorContext, coordinatorRequestBatch, zookeeperClient);
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/tablet/TabletServerFailOverITCase.java
 
b/fluss-server/src/test/java/org/apache/fluss/server/tablet/TabletServerShutdownITCase.java
similarity index 77%
rename from 
fluss-server/src/test/java/org/apache/fluss/server/tablet/TabletServerFailOverITCase.java
rename to 
fluss-server/src/test/java/org/apache/fluss/server/tablet/TabletServerShutdownITCase.java
index 8388259a5..2102df6d1 100644
--- 
a/fluss-server/src/test/java/org/apache/fluss/server/tablet/TabletServerFailOverITCase.java
+++ 
b/fluss-server/src/test/java/org/apache/fluss/server/tablet/TabletServerShutdownITCase.java
@@ -51,9 +51,8 @@ import static 
org.apache.fluss.testutils.common.CommonTestUtils.retry;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
-/** The ITCase for tablet server failover. */
-class TabletServerFailOverITCase {
-
+/** The ITCase for tabletServer shutdown (controlled shutdown). */
+public class TabletServerShutdownITCase {
     @RegisterExtension
     public static final FlussClusterExtension FLUSS_CLUSTER_EXTENSION =
             FlussClusterExtension.builder().setNumOfTabletServers(3).build();
@@ -114,7 +113,42 @@ class TabletServerFailOverITCase {
     }
 
     @Test
-    void testKillServers() throws Exception {
+    void testControlledShutdown() throws Exception {
+        FLUSS_CLUSTER_EXTENSION.assertHasTabletServerNumber(3);
+        TableDescriptor tableDescriptor =
+                TableDescriptor.builder()
+                        .schema(Schema.newBuilder().column("a", DataTypes.INT()).build())
+                        .distributedBy(1)
+                        .property(ConfigOptions.TABLE_REPLICATION_FACTOR, 3)
+                        .build();
+        TablePath tablePath = TablePath.of("test_shutdown", "test_controlled_shutdown");
+        long tableId = createTable(FLUSS_CLUSTER_EXTENSION, tablePath, tableDescriptor);
+        TableBucket tb = new TableBucket(tableId, 0);
+
+        LeaderAndIsr leaderAndIsr = FLUSS_CLUSTER_EXTENSION.waitLeaderAndIsrReady(tb);
+        int leader = leaderAndIsr.leader();
+
+        // stop the tablet server that hosts the bucket leader (goes through controlled shutdown).
+        FLUSS_CLUSTER_EXTENSION.stopTabletServer(leader);
+        ZooKeeperClient zkClient = FLUSS_CLUSTER_EXTENSION.getZooKeeperClient();
+
+        try {
+            // the leader should be removed from isr, and new leader should be elected.
+            // Unwrap the Optional first: asserting inequality between an Optional and a plain
+            // leader id would always pass.
+            retry(
+                    Duration.ofMinutes(1),
+                    () -> {
+                        LeaderAndIsr current = zkClient.getLeaderAndIsr(tb).get();
+                        assertThat(current.leader()).isNotEqualTo(leader);
+                        assertThat(current.isr()).doesNotContain(leader);
+                    });
+        } finally {
+            // restart the shutdown server even if the assertion fails, so that later tests
+            // sharing this cluster extension still see 3 tablet servers.
+            FLUSS_CLUSTER_EXTENSION.startTabletServer(leader, true);
+        }
+    }
+
+    @Test
+    void testControlledShutdownRetriesFailover() throws Exception {
+        // This case is to test the scenario that the controlled shutdown 
request is retried and
+        // failed by cannot elect any new leader. In this case the controlled 
shutdown will finally
+        // go uncontrolled shutdown.
         FLUSS_CLUSTER_EXTENSION.assertHasTabletServerNumber(3);
         TableDescriptor tableDescriptor =
                 TableDescriptor.builder()
@@ -122,7 +156,7 @@ class TabletServerFailOverITCase {
                         .distributedBy(1)
                         .property(ConfigOptions.TABLE_REPLICATION_FACTOR, 2)
                         .build();
-        TablePath tablePath = TablePath.of("test_failover", 
"test_kill_servers");
+        TablePath tablePath = TablePath.of("test_failover", 
"test_controlled_shutdown_failed");
         long tableId = createTable(FLUSS_CLUSTER_EXTENSION, tablePath, 
tableDescriptor);
         TableBucket tb = new TableBucket(tableId, 0);
 
@@ -132,20 +166,22 @@ class TabletServerFailOverITCase {
         isr.remove(Integer.valueOf(leader));
         int follower = isr.get(0);
 
-        // let's kil follower
+        // Let's kil follower. Will go controlled shutdown.
         FLUSS_CLUSTER_EXTENSION.stopTabletServer(follower);
         ZooKeeperClient zkClient = 
FLUSS_CLUSTER_EXTENSION.getZooKeeperClient();
 
         // the follower should be removed from isr
         LeaderAndIsr expectedLeaderAndIsr1 =
-                leaderAndIsr.newLeaderAndIsr(leader, 
Collections.singletonList(leader));
+                
leaderAndIsr.newLeaderAndIsr(Collections.singletonList(leader));
         retry(
                 Duration.ofMinutes(1),
                 () ->
                         assertThat(zkClient.getLeaderAndIsr(tb).get())
                                 .isEqualTo(expectedLeaderAndIsr1));
 
-        // kill the leader again
+        // kill the leader. As we only have 1 replica, no leader can be 
elected as we send the
+        // controlled shutdown request to the leader. So the controlled 
shutdown will finally go
+        // uncontrolled shutdown.
         FLUSS_CLUSTER_EXTENSION.stopTabletServer(leader);
 
         // should be no leader

Reply via email to