wuchong commented on code in PR #1159:
URL: https://github.com/apache/fluss/pull/1159#discussion_r2362038912
##########
fluss-server/src/main/java/org/apache/fluss/server/coordinator/statemachine/ReplicaLeaderElectionAlgorithms.java:
##########
@@ -17,21 +17,64 @@
package org.apache.fluss.server.coordinator.statemachine;
+import java.util.HashSet;
import java.util.List;
import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
/** The algorithms to elect the replica leader. */
public class ReplicaLeaderElectionAlgorithms {
- public static Optional<Integer> defaultReplicaLeaderElection(
+ public static Optional<LeaderElectionResult> defaultReplicaLeaderElection(
List<Integer> assignments, List<Integer> aliveReplicas, List<Integer> isr) {
// currently, we always use the first replica in assignment, which also in aliveReplicas and
// isr as the leader replica.
for (int assignment : assignments) {
if (aliveReplicas.contains(assignment) && isr.contains(assignment)) {
- return Optional.of(assignment);
+ return Optional.of(new LeaderElectionResult(assignment, isr));
}
}
return Optional.empty();
}
+
+ public static Optional<LeaderElectionResult> controlledShutdownReplicaLeaderElection(
Review Comment:
I think this method should return `TableBucketStateMachine.ElectionResult`,
which includes `List<Integer> liveReplicas`, because `liveReplicas` should
exclude the `shutdownTabletServers`.
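A rough sketch of the shape I have in mind. Everything here is a simplified stand-in rather than the actual Fluss classes: `ControlledShutdownElectionSketch`, `Result`, and `elect` are hypothetical names, and shrinking the ISR by the shutting-down servers is my assumption about the intended behavior.

```java
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

/** Illustrative only: a simplified stand-in, not the actual Fluss classes. */
public class ControlledShutdownElectionSketch {

    /** Hypothetical result type carrying the leader, the new ISR and the live replicas. */
    public static final class Result {
        public final int leader;
        public final List<Integer> newIsr;
        public final List<Integer> liveReplicas;

        Result(int leader, List<Integer> newIsr, List<Integer> liveReplicas) {
            this.leader = leader;
            this.newIsr = newIsr;
            this.liveReplicas = liveReplicas;
        }
    }

    public static Optional<Result> elect(
            List<Integer> assignment,
            List<Integer> isr,
            List<Integer> aliveReplicas,
            Set<Integer> shuttingDownTabletServers) {
        // Live replicas exclude servers that are in the middle of a controlled shutdown.
        List<Integer> liveReplicas =
                aliveReplicas.stream()
                        .filter(replica -> !shuttingDownTabletServers.contains(replica))
                        .collect(Collectors.toList());
        // Assumption: the new ISR also drops the shutting-down servers.
        List<Integer> newIsr =
                isr.stream()
                        .filter(replica -> !shuttingDownTabletServers.contains(replica))
                        .collect(Collectors.toList());
        // Pick the first assigned replica that is both live and still in the ISR.
        for (int replica : assignment) {
            if (liveReplicas.contains(replica) && newIsr.contains(replica)) {
                return Optional.of(new Result(replica, newIsr, liveReplicas));
            }
        }
        return Optional.empty();
    }
}
```

With assignment, ISR and alive replicas all equal to `[1, 2, 3]` and server `1` shutting down, this elects `2` and reports `[2, 3]` as the live replicas, so the caller never sends the shutting-down server as a live replica.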
##########
fluss-server/src/main/java/org/apache/fluss/server/coordinator/statemachine/TableBucketStateMachine.java:
##########
@@ -564,49 +583,66 @@ private String stringifyBucket(TableBucket tableBucket) {
}
/**
- * Elect a new leader for new or offline bucket, it'll always elect one from the live replicas
- * in isr set.
+ * Elect a new leader for bucket, it'll always elect one from the live replicas in isr set.
+ *
+ * <p>The elect cases including:
+ *
+ * <ol>
+ * <li>new or offline bucket
+ * <li>tabletServer controlled shutdown
+ * </ol>
*/
- private Optional<ElectionResult> leaderForOffline(
- TableBucket tableBucket, LeaderAndIsr leaderAndIsr) {
+ private Optional<ElectionResult> electLeader(
+ TableBucket tableBucket,
+ LeaderAndIsr leaderAndIsr,
+ ReplicaLeaderElectionStrategy electionStrategy) {
List<Integer> assignment = coordinatorContext.getAssignment(tableBucket);
// filter out the live servers
List<Integer> liveReplicas =
assignment.stream()
- .filter(
- replica ->
- coordinatorContext.isReplicaAndServerOnline(
- replica, tableBucket))
+ .filter(replica -> coordinatorContext.isReplicaOnline(replica, tableBucket))
.collect(Collectors.toList());
// we'd like use the first live replica as the new leader
if (liveReplicas.isEmpty()) {
LOG.warn("No any live replica for table bucket {}.", stringifyBucket(tableBucket));
return Optional.empty();
}
- Optional<Integer> leaderOpt =
- ReplicaLeaderElectionAlgorithms.defaultReplicaLeaderElection(
- assignment, liveReplicas, leaderAndIsr.isr());
- if (!leaderOpt.isPresent()) {
+ Optional<LeaderElectionResult> resultOpt = Optional.empty();
+ if (electionStrategy == DEFAULT_ELECTION) {
+ resultOpt = defaultReplicaLeaderElection(assignment, liveReplicas, leaderAndIsr.isr());
+ } else if (electionStrategy == CONTROLLED_SHUTDOWN_ELECTION) {
+ Set<Integer> shuttingDownTabletServers = coordinatorContext.shuttingDownTabletServers();
+ resultOpt =
+ controlledShutdownReplicaLeaderElection(
+ assignment,
+ leaderAndIsr.isr(),
+ liveReplicas,
+ shuttingDownTabletServers);
+ }
+
+ if (!resultOpt.isPresent()) {
LOG.error(
"The leader election for table bucket {} is empty.",
stringifyBucket(tableBucket));
return Optional.empty();
}
// get the updated leader and isr
+ LeaderElectionResult leaderElectionResult = resultOpt.get();
LeaderAndIsr newLeaderAndIsr =
new LeaderAndIsr(
- leaderOpt.get(),
+ leaderElectionResult.getLeader(),
leaderAndIsr.leaderEpoch() + 1,
- leaderAndIsr.isr(),
+ leaderElectionResult.getNewIsr(),
Review Comment:
Why not use the method `leaderAndIsr.newLeaderAndIsr(...)` here? (And why
doesn't that method increase the `leaderEpoch`?)
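For illustration, something along these lines is what I would expect from such a helper. This is a hypothetical, stripped-down class (`LeaderAndIsrSketch`), not the real `LeaderAndIsr`; the parameter list of `newLeaderAndIsr` is assumed, since the actual signature is not shown in this diff.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Illustrative only: a stripped-down stand-in for LeaderAndIsr. */
public class LeaderAndIsrSketch {
    private final int leader;
    private final int leaderEpoch;
    private final List<Integer> isr;

    public LeaderAndIsrSketch(int leader, int leaderEpoch, List<Integer> isr) {
        this.leader = leader;
        this.leaderEpoch = leaderEpoch;
        // Defensive copy so the instance stays immutable.
        this.isr = Collections.unmodifiableList(new ArrayList<>(isr));
    }

    /**
     * Hypothetical helper: returns a copy with the new leader and ISR, bumping the
     * leader epoch in one place so that callers cannot forget to do it.
     */
    public LeaderAndIsrSketch newLeaderAndIsr(int newLeader, List<Integer> newIsr) {
        return new LeaderAndIsrSketch(newLeader, leaderEpoch + 1, newIsr);
    }

    public int leader() {
        return leader;
    }

    public int leaderEpoch() {
        return leaderEpoch;
    }

    public List<Integer> isr() {
        return isr;
    }
}
```

Keeping the epoch bump inside the helper would let `electLeader` build the new state from the election result in a single call instead of repeating `leaderEpoch() + 1` at each call site.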