swuferhong commented on code in PR #1159:
URL: https://github.com/apache/fluss/pull/1159#discussion_r2362241778
##########
fluss-server/src/main/java/org/apache/fluss/server/coordinator/statemachine/TableBucketStateMachine.java:
##########
@@ -564,49 +583,66 @@ private String stringifyBucket(TableBucket tableBucket) {
}
/**
- * Elect a new leader for new or offline bucket, it'll always elect one from the live replicas
- * in isr set.
+ * Elect a new leader for bucket, it'll always elect one from the live replicas in isr set.
+ *
+ * <p>The elect cases including:
+ *
+ * <ol>
+ * <li>new or offline bucket
+ * <li>tabletServer controlled shutdown
+ * </ol>
*/
- private Optional<ElectionResult> leaderForOffline(
- TableBucket tableBucket, LeaderAndIsr leaderAndIsr) {
+ private Optional<ElectionResult> electLeader(
+ TableBucket tableBucket,
+ LeaderAndIsr leaderAndIsr,
+ ReplicaLeaderElectionStrategy electionStrategy) {
List<Integer> assignment =
coordinatorContext.getAssignment(tableBucket);
// filter out the live servers
List<Integer> liveReplicas =
assignment.stream()
- .filter(
- replica ->
- coordinatorContext.isReplicaAndServerOnline(
- replica, tableBucket))
+ .filter(replica -> coordinatorContext.isReplicaOnline(replica, tableBucket))
.collect(Collectors.toList());
// we'd like use the first live replica as the new leader
if (liveReplicas.isEmpty()) {
LOG.warn("No any live replica for table bucket {}.",
stringifyBucket(tableBucket));
return Optional.empty();
}
- Optional<Integer> leaderOpt =
- ReplicaLeaderElectionAlgorithms.defaultReplicaLeaderElection(
- assignment, liveReplicas, leaderAndIsr.isr());
- if (!leaderOpt.isPresent()) {
+ Optional<LeaderElectionResult> resultOpt = Optional.empty();
+ if (electionStrategy == DEFAULT_ELECTION) {
+ resultOpt = defaultReplicaLeaderElection(assignment, liveReplicas, leaderAndIsr.isr());
+ } else if (electionStrategy == CONTROLLED_SHUTDOWN_ELECTION) {
+ Set<Integer> shuttingDownTabletServers = coordinatorContext.shuttingDownTabletServers();
+ resultOpt =
+ controlledShutdownReplicaLeaderElection(
+ assignment,
+ leaderAndIsr.isr(),
+ liveReplicas,
+ shuttingDownTabletServers);
+ }
+
+ if (!resultOpt.isPresent()) {
LOG.error(
"The leader election for table bucket {} is empty.",
stringifyBucket(tableBucket));
return Optional.empty();
}
// get the updated leader and isr
+ LeaderElectionResult leaderElectionResult = resultOpt.get();
LeaderAndIsr newLeaderAndIsr =
new LeaderAndIsr(
- leaderOpt.get(),
+ leaderElectionResult.getLeader(),
leaderAndIsr.leaderEpoch() + 1,
- leaderAndIsr.isr(),
+ leaderElectionResult.getNewIsr(),
Review Comment:
I've split `leaderAndIsr.newLeaderAndIsr(int newLeader, List newIsr)` into
two separate methods: `newLeaderAndIsr(int newLeader, List newIsr)`, which
indicates that the leader has changed, and `newLeaderAndIsr(List newIsr)`,
which indicates that the leader remains unchanged and only the ISR has been
updated.
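
For illustration, the split could look roughly like the sketch below. It is a simplified stand-in, not the actual Fluss class: `LeaderAndIsrSketch` is a hypothetical name, it keeps only the leader, leader epoch, and ISR fields, and it assumes the ISR is a `List<Integer>`. Bumping the leader epoch in the leader-change overload mirrors the `leaderEpoch() + 1` in the diff above; keeping the epoch unchanged in the ISR-only overload is an assumption.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical, reduced stand-in for LeaderAndIsr; the real class may carry more state. */
final class LeaderAndIsrSketch {
    private final int leader;
    private final int leaderEpoch;
    private final List<Integer> isr;

    LeaderAndIsrSketch(int leader, int leaderEpoch, List<Integer> isr) {
        this.leader = leader;
        this.leaderEpoch = leaderEpoch;
        // Defensive copy so callers cannot mutate the ISR afterwards.
        this.isr = Collections.unmodifiableList(new ArrayList<>(isr));
    }

    /** Leader has changed: adopt the new leader and ISR, and bump the leader epoch. */
    LeaderAndIsrSketch newLeaderAndIsr(int newLeader, List<Integer> newIsr) {
        return new LeaderAndIsrSketch(newLeader, leaderEpoch + 1, newIsr);
    }

    /** Leader unchanged: only the ISR is replaced (epoch kept as-is; an assumption). */
    LeaderAndIsrSketch newLeaderAndIsr(List<Integer> newIsr) {
        return new LeaderAndIsrSketch(leader, leaderEpoch, newIsr);
    }

    int leader() {
        return leader;
    }

    int leaderEpoch() {
        return leaderEpoch;
    }

    List<Integer> isr() {
        return isr;
    }
}
```

With two overloads, the call site states its intent: the election path in `electLeader` would use the two-argument form, while an ISR shrink or expand would use the single-argument form without having to pass the current leader back in.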