swuferhong commented on code in PR #1159:
URL: https://github.com/apache/fluss/pull/1159#discussion_r2362241778


##########
fluss-server/src/main/java/org/apache/fluss/server/coordinator/statemachine/TableBucketStateMachine.java:
##########
@@ -564,49 +583,66 @@ private String stringifyBucket(TableBucket tableBucket) {
     }
 
     /**
-     * Elect a new leader for new or offline bucket, it'll always elect one 
from the live replicas
-     * in isr set.
+     * Elect a new leader for bucket, it'll always elect one from the live 
replicas in isr set.
+     *
+     * <p>The elect cases including:
+     *
+     * <ol>
+     *   <li>new or offline bucket
+     *   <li>tabletServer controlled shutdown
+     * </ol>
      */
-    private Optional<ElectionResult> leaderForOffline(
-            TableBucket tableBucket, LeaderAndIsr leaderAndIsr) {
+    private Optional<ElectionResult> electLeader(
+            TableBucket tableBucket,
+            LeaderAndIsr leaderAndIsr,
+            ReplicaLeaderElectionStrategy electionStrategy) {
         List<Integer> assignment = 
coordinatorContext.getAssignment(tableBucket);
         // filter out the live servers
         List<Integer> liveReplicas =
                 assignment.stream()
-                        .filter(
-                                replica ->
-                                        
coordinatorContext.isReplicaAndServerOnline(
-                                                replica, tableBucket))
+                        .filter(replica -> 
coordinatorContext.isReplicaOnline(replica, tableBucket))
                         .collect(Collectors.toList());
         // we'd like use the first live replica as the new leader
         if (liveReplicas.isEmpty()) {
             LOG.warn("No any live replica for table bucket {}.", 
stringifyBucket(tableBucket));
             return Optional.empty();
         }
 
-        Optional<Integer> leaderOpt =
-                ReplicaLeaderElectionAlgorithms.defaultReplicaLeaderElection(
-                        assignment, liveReplicas, leaderAndIsr.isr());
-        if (!leaderOpt.isPresent()) {
+        Optional<LeaderElectionResult> resultOpt = Optional.empty();
+        if (electionStrategy == DEFAULT_ELECTION) {
+            resultOpt = defaultReplicaLeaderElection(assignment, liveReplicas, 
leaderAndIsr.isr());
+        } else if (electionStrategy == CONTROLLED_SHUTDOWN_ELECTION) {
+            Set<Integer> shuttingDownTabletServers = 
coordinatorContext.shuttingDownTabletServers();
+            resultOpt =
+                    controlledShutdownReplicaLeaderElection(
+                            assignment,
+                            leaderAndIsr.isr(),
+                            liveReplicas,
+                            shuttingDownTabletServers);
+        }
+
+        if (!resultOpt.isPresent()) {
             LOG.error(
                     "The leader election for table bucket {} is empty.",
                     stringifyBucket(tableBucket));
             return Optional.empty();
         }
 
         // get the updated leader and isr
+        LeaderElectionResult leaderElectionResult = resultOpt.get();
         LeaderAndIsr newLeaderAndIsr =
                 new LeaderAndIsr(
-                        leaderOpt.get(),
+                        leaderElectionResult.getLeader(),
                         leaderAndIsr.leaderEpoch() + 1,
-                        leaderAndIsr.isr(),
+                        leaderElectionResult.getNewIsr(),

Review Comment:
   I've split `leaderAndIsr.newLeaderAndIsr(int newLeader, List newIsr)` into 
two separate methods: one is `newLeaderAndIsr(int newLeader, List newIsr)`, 
indicating that the leader has changed, and the other is `newLeaderAndIsr(List 
newIsr)`, indicating that the leader remains unchanged and only the ISR has 
been updated.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to