wuchong commented on code in PR #1081:
URL: https://github.com/apache/fluss/pull/1081#discussion_r2296132559
##########
fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/statemachine/ReplicaStateMachine.java:
##########
@@ -438,25 +439,21 @@ private Map<TableBucketReplica, LeaderAndIsr> doRemoveReplicaFromIsr(
                 List<Integer> newIsr =
                         leaderAndIsr.isr().size() == 1
                                 // don't remove the replica id from isr when isr size is 1,
-                                // if isr is empty, we can't elect leader any more
+                                // if isr is empty, we can't elect leader anymore
                                 ? leaderAndIsr.isr()
                                 : leaderAndIsr.isr().stream()
                                         .filter(id -> id != replicaId)
                                         .collect(Collectors.toList());
                 LeaderAndIsr adjustLeaderAndIsr =
                         leaderAndIsr.newLeaderAndIsr(newLeader, newIsr);
-                try {
-                    zooKeeperClient.updateLeaderAndIsr(tableBucket, adjustLeaderAndIsr);
-                } catch (Exception e) {
-                    LOG.error(
-                            "Fail to update bucket LeaderAndIsr for table bucket {} of table {}.",
-                            tableBucket,
-                            coordinatorContext.getTablePathById(tableBucket.getTableId()),
-                            e);
-                    continue;
-                }
-                // update leader and isr
-                coordinatorContext.putBucketLeaderAndIsr(tableBucket, adjustLeaderAndIsr);
                 adjustedLeaderAndIsr.put(tableBucketReplica, adjustLeaderAndIsr);
+                toUpdateLeaderAndIsrList.put(tableBucket, adjustLeaderAndIsr);
+            }
+            try {
+                zooKeeperClient.batchUpdateLeaderAndIsr(toUpdateLeaderAndIsrList);
+                toUpdateLeaderAndIsrList.forEach(coordinatorContext::putBucketLeaderAndIsr);
Review Comment:
Since we moved the `coordinatorContext` updates to the end, this changes the logic of this method, because the loop always reads the current `leaderAndIsr` from `coordinatorContext`.

For example, suppose we have `(leader=1, isr=[1,2,3])` in ZK and in `coordinatorContext`, and the replicas to take offline are `[1,2]`. When we process offline replica `1`, it puts `(leader=-1, isr=[2,3])` into the temporary map. When we process offline replica `2`, the `leaderAndIsr` in `coordinatorContext` is still `(leader=1, isr=[1,2,3])`, so it puts `(leader=1, isr=[1,3])` into the temporary map, overriding the previous `(leader=-1, isr=[2,3])`. In the end, we get the wrong state in both ZK and `coordinatorContext`.

We need to look up the current `leaderAndIsr` in `toUpdateLeaderAndIsrList` first, and only fall back to `coordinatorContext` if it is not found there.

Please also add a unit test in `ReplicaStateMachineTest` to cover this case.
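The lookup-with-fallback fix described above can be sketched as follows. This is a minimal standalone sketch: the `currentLeaderAndIsr` helper, the simplified `LeaderAndIsr` record, and the `Integer`-keyed maps are illustrative stand-ins, not the actual Fluss classes.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class LeaderAndIsrLookupSketch {
    // Hypothetical simplified stand-in for Fluss's LeaderAndIsr.
    record LeaderAndIsr(int leader, List<Integer> isr) {}

    // Prefer the state already staged in the pending-update map; fall back
    // to the coordinator context's view only if nothing is staged yet.
    static LeaderAndIsr currentLeaderAndIsr(
            Integer bucket,
            Map<Integer, LeaderAndIsr> toUpdateLeaderAndIsrList,
            Map<Integer, LeaderAndIsr> contextView) {
        LeaderAndIsr pending = toUpdateLeaderAndIsrList.get(bucket);
        return pending != null ? pending : contextView.get(bucket);
    }

    public static void main(String[] args) {
        Map<Integer, LeaderAndIsr> context = new HashMap<>();
        context.put(0, new LeaderAndIsr(1, List.of(1, 2, 3)));
        Map<Integer, LeaderAndIsr> toUpdate = new HashMap<>();

        // Offline replica 1: stage (leader=-1, isr=[2,3]) for bucket 0.
        toUpdate.put(0, new LeaderAndIsr(-1, List.of(2, 3)));

        // Offline replica 2: must see the staged state, not the stale context.
        LeaderAndIsr current = currentLeaderAndIsr(0, toUpdate, context);
        System.out.println(current.isr()); // prints [2, 3], not [1, 2, 3]

        // Now removing replica 2 from the staged isr yields the correct [3].
        toUpdate.put(0, new LeaderAndIsr(-1, List.of(3)));
        System.out.println(toUpdate.get(0).isr()); // prints [3]
    }
}
```

Without the fallback order, the second read would return `[1, 2, 3]` from the context and silently discard the first replica's removal, which is exactly the override bug described above.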
##########
fluss-server/src/test/java/com/alibaba/fluss/server/zk/ZooKeeperClientTest.java:
##########
@@ -211,14 +242,44 @@ void testBatchCreateLeaderAndIsr() throws Exception {
                             new RegisterTableBucketLeadAndIsrInfo(
                                     tableBucket, leaderAndIsr, "partition" + i, null));
         }
-
+        // batch create
         zookeeperClient.batchRegisterLeaderAndIsrForTablePartition(partitionTableBucket);
         for (int i = 0; i < 100; i++) {
+            // each should register successful
             Optional<LeaderAndIsr> optionalLeaderAndIsr =
                     zookeeperClient.getLeaderAndIsr(partitionTableBucket.get(i).getTableBucket());
             assertThat(optionalLeaderAndIsr.isPresent()).isTrue();
             assertThat(optionalLeaderAndIsr.get()).isIn(partitionleaderAndIsrList);
         }
+
+        Map<TableBucket, LeaderAndIsr> partitionUpdateMap =
Review Comment:
This method is quite long. Please consider moving the partitioned-table test into a new method, since nothing is shared with the non-partitioned test. If you can unify the two with `@ParameterizedTest`, that would be great.
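The `@ParameterizedTest` unification suggested here could be structured as below. Since this sketch must run standalone, the JUnit 5 annotations are shown in a comment and a plain `main` drives both cases; all names (`registerAndVerify`, the path strings) are hypothetical, not the actual test code.

```java
import java.util.List;

// Sketch: with JUnit 5 the loop body would become a single test method, e.g.
//
//     @ParameterizedTest
//     @ValueSource(booleans = {false, true})
//     void testBatchRegisterLeaderAndIsr(boolean partitionedTable) { ... }
//
// so the partitioned and non-partitioned cases share one body.
public class BatchLeaderAndIsrTestSketch {

    // Shared body: derive where a bucket would be registered for either a
    // partitioned or a non-partitioned table (stand-in for the real
    // register-then-verify logic against ZooKeeper).
    static String registerAndVerify(boolean partitionedTable, int bucketId) {
        String parent = partitionedTable ? "partition-0" : "table-1";
        return parent + "/bucket-" + bucketId;
    }

    public static void main(String[] args) {
        // A @ParameterizedTest runner would invoke the body once per value.
        for (boolean partitionedTable : List.of(false, true)) {
            for (int i = 0; i < 3; i++) {
                System.out.println(registerAndVerify(partitionedTable, i));
            }
        }
    }
}
```

The benefit is that the register, read-back, and assertion steps are written once, and the only per-case difference (which registration API is called) is selected by the boolean parameter.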
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]