This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new 640f40c6d [server] Coordinator should reject the AdjustIsr request if
the adjusted isr contains shutdown TabletServers (#1734)
640f40c6d is described below
commit 640f40c6d7cda1da80e9d5a25ce37a5d12e66190
Author: yunhong <[email protected]>
AuthorDate: Tue Sep 23 11:25:48 2025 +0800
[server] Coordinator should reject the AdjustIsr request if the adjusted
isr contains shutdown TabletServers (#1734)
---
.../exception/IneligibleReplicaException.java | 28 ++++++++++++++++++
.../java/org/apache/fluss/rpc/protocol/Errors.java | 7 ++++-
.../coordinator/CoordinatorEventProcessor.java | 15 ++++++++++
.../org/apache/fluss/server/replica/Replica.java | 1 +
.../apache/fluss/server/zk/ZooKeeperClient.java | 5 ++++
.../server/coordinator/TestCoordinatorGateway.java | 29 ++++++++++++++++++-
.../apache/fluss/server/replica/AdjustIsrTest.java | 33 +++++++++++++++++++++-
7 files changed, 115 insertions(+), 3 deletions(-)
diff --git
a/fluss-common/src/main/java/org/apache/fluss/exception/IneligibleReplicaException.java
b/fluss-common/src/main/java/org/apache/fluss/exception/IneligibleReplicaException.java
new file mode 100644
index 000000000..e21736b3a
--- /dev/null
+++
b/fluss-common/src/main/java/org/apache/fluss/exception/IneligibleReplicaException.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.exception;
+
+/** Thrown when a request specifies an ineligible replica, e.g. one on a non-live tablet server. */
+public class IneligibleReplicaException extends ApiException {
+
+ private static final long serialVersionUID = 1L;
+
+ public IneligibleReplicaException(String message) {
+ super(message);
+ }
+}
diff --git a/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/Errors.java
b/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/Errors.java
index 631047eac..5ec892a69 100644
--- a/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/Errors.java
+++ b/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/Errors.java
@@ -28,6 +28,7 @@ import org.apache.fluss.exception.DatabaseNotExistException;
import org.apache.fluss.exception.DuplicateSequenceException;
import org.apache.fluss.exception.FencedLeaderEpochException;
import org.apache.fluss.exception.FencedTieringEpochException;
+import org.apache.fluss.exception.IneligibleReplicaException;
import org.apache.fluss.exception.InvalidColumnProjectionException;
import org.apache.fluss.exception.InvalidConfigException;
import org.apache.fluss.exception.InvalidCoordinatorException;
@@ -217,7 +218,11 @@ public enum Errors {
LAKE_SNAPSHOT_NOT_EXIST(
53, "The lake snapshot is not exist.",
LakeTableSnapshotNotExistException::new),
LAKE_TABLE_ALREADY_EXIST(
- 54, "The lake table already exists.",
LakeTableAlreadyExistException::new);
+ 54, "The lake table already exists.",
LakeTableAlreadyExistException::new),
+ INELIGIBLE_REPLICA_EXCEPTION(
+ 55,
+ "The new ISR contains at least one ineligible replica.",
+ IneligibleReplicaException::new);
private static final Logger LOG = LoggerFactory.getLogger(Errors.class);
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java
index bab477d20..3f0ee3cbc 100644
---
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java
+++
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java
@@ -25,6 +25,7 @@ import org.apache.fluss.config.ConfigOptions;
import org.apache.fluss.config.Configuration;
import org.apache.fluss.exception.FencedLeaderEpochException;
import org.apache.fluss.exception.FlussRuntimeException;
+import org.apache.fluss.exception.IneligibleReplicaException;
import org.apache.fluss.exception.InvalidCoordinatorException;
import org.apache.fluss.exception.InvalidUpdateVersionException;
import org.apache.fluss.exception.TabletServerNotAvailableException;
@@ -1026,6 +1027,20 @@ public class CoordinatorEventProcessor implements
EventProcessor {
// that this node is not the leader.
throw new InvalidUpdateVersionException(
"The request bucket epoch in adjust isr request is
lower than current bucket epoch in coordinator.");
+ } else {
+ // Reject the request if the new ISR contains any ineligible replica, i.e. a
+ // replica whose tablet server is not in the coordinator's live tablet server set.
+ Set<Integer> ineligibleReplicas = new
HashSet<>(newLeaderAndIsr.isr());
+
ineligibleReplicas.removeAll(coordinatorContext.liveTabletServerSet());
+ if (!ineligibleReplicas.isEmpty()) {
+ String errorMsg =
+ String.format(
+ "Rejecting adjustIsr request for table
bucket %s because it "
+ + "specified ineligible replicas
%s in the new ISR %s",
+ tableBucket, ineligibleReplicas,
newLeaderAndIsr.isr());
+ LOG.info(errorMsg);
+ throw new IneligibleReplicaException(errorMsg);
+ }
}
}
}
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java
b/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java
index 90c1630f8..e4e25de80 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java
@@ -1649,6 +1649,7 @@ public final class Replica {
failedIsrUpdates.inc();
switch (error) {
case OPERATION_NOT_ATTEMPTED_EXCEPTION:
+ case INELIGIBLE_REPLICA_EXCEPTION:
// Care must be taken when resetting to the last committed
state since we may not
// know in general whether the request was applied or not
taking into account
// retries and controller changes which might have occurred
before we received the
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java
b/fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java
index 98654972a..7173d8cdf 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java
@@ -408,6 +408,10 @@ public class ZooKeeperClient implements AutoCloseable {
.forPath(bucketsParentPath);
for (RegisterTableBucketLeadAndIsrInfo info : registerList) {
+ LOG.info(
+ "Batch Register {} for bucket {} in Zookeeper.",
+ info.getLeaderAndIsr(),
+ info.getTableBucket());
byte[] data = LeaderAndIsrZNode.encode(info.getLeaderAndIsr());
// create direct parent node
CuratorOp parentNodeCreate =
@@ -487,6 +491,7 @@ public class ZooKeeperClient implements AutoCloseable {
TableBucket tableBucket = entry.getKey();
LeaderAndIsr leaderAndIsr = entry.getValue();
+ LOG.info("Batch Update {} for bucket {} in Zookeeper.",
leaderAndIsr, tableBucket);
String path = LeaderAndIsrZNode.path(tableBucket);
byte[] data = LeaderAndIsrZNode.encode(leaderAndIsr);
CuratorOp updateOp =
zkClient.transactionOp().setData().forPath(path, data);
diff --git
a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TestCoordinatorGateway.java
b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TestCoordinatorGateway.java
index 38e3f9ea1..5ef93cd71 100644
---
a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TestCoordinatorGateway.java
+++
b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TestCoordinatorGateway.java
@@ -18,6 +18,7 @@
package org.apache.fluss.server.coordinator;
import org.apache.fluss.exception.FencedLeaderEpochException;
+import org.apache.fluss.exception.IneligibleReplicaException;
import org.apache.fluss.metadata.TableBucket;
import org.apache.fluss.rpc.gateway.CoordinatorGateway;
import org.apache.fluss.rpc.messages.AdjustIsrRequest;
@@ -89,8 +90,10 @@ import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -105,6 +108,7 @@ public class TestCoordinatorGateway implements
CoordinatorGateway {
private final @Nullable ZooKeeperClient zkClient;
public final AtomicBoolean commitRemoteLogManifestFail = new
AtomicBoolean(false);
public final Map<TableBucket, Integer> currentLeaderEpoch = new
HashMap<>();
+ private Set<Integer> shutdownTabletServers;
public TestCoordinatorGateway() {
this(null);
@@ -112,6 +116,7 @@ public class TestCoordinatorGateway implements
CoordinatorGateway {
public TestCoordinatorGateway(ZooKeeperClient zkClient) {
this.zkClient = zkClient;
+ this.shutdownTabletServers = new HashSet<>();
}
@Override
@@ -230,7 +235,12 @@ public class TestCoordinatorGateway implements
CoordinatorGateway {
(tb, leaderAndIsr) -> {
Integer currentLeaderEpoch =
this.currentLeaderEpoch.getOrDefault(tb, 0);
int requestLeaderEpoch = leaderAndIsr.leaderEpoch();
-
+ Set<Integer> ineligibleReplicas = new HashSet<>();
+ for (int replica : leaderAndIsr.isr()) {
+ if (shutdownTabletServers.contains(replica)) {
+ ineligibleReplicas.add(replica);
+ }
+ }
AdjustIsrResultForBucket adjustIsrResultForBucket;
if (requestLeaderEpoch < currentLeaderEpoch) {
adjustIsrResultForBucket =
@@ -239,6 +249,19 @@ public class TestCoordinatorGateway implements
CoordinatorGateway {
ApiError.fromThrowable(
new FencedLeaderEpochException(
"request leader epoch
is fenced.")));
+ } else if (!ineligibleReplicas.isEmpty()) {
+ adjustIsrResultForBucket =
+ new AdjustIsrResultForBucket(
+ tb,
+ ApiError.fromThrowable(
+ new IneligibleReplicaException(
+ String.format(
+ "Rejecting
adjustIsr request for table bucket %s because it "
+ +
"specified ineligible replicas %s in the new ISR %s",
+ tb,
+
ineligibleReplicas,
+
leaderAndIsr))));
+
} else {
adjustIsrResultForBucket =
new AdjustIsrResultForBucket(
@@ -322,4 +345,8 @@ public class TestCoordinatorGateway implements
CoordinatorGateway {
public void setCurrentLeaderEpoch(TableBucket tableBucket, int
leaderEpoch) {
currentLeaderEpoch.put(tableBucket, leaderEpoch);
}
+
+ public void setShutdownTabletServers(Set<Integer> shutdownTabletServers) {
+ this.shutdownTabletServers = shutdownTabletServers;
+ }
}
diff --git
a/fluss-server/src/test/java/org/apache/fluss/server/replica/AdjustIsrTest.java
b/fluss-server/src/test/java/org/apache/fluss/server/replica/AdjustIsrTest.java
index f90561485..64eade999 100644
---
a/fluss-server/src/test/java/org/apache/fluss/server/replica/AdjustIsrTest.java
+++
b/fluss-server/src/test/java/org/apache/fluss/server/replica/AdjustIsrTest.java
@@ -20,6 +20,7 @@ package org.apache.fluss.server.replica;
import org.apache.fluss.config.ConfigOptions;
import org.apache.fluss.config.Configuration;
import org.apache.fluss.exception.FencedLeaderEpochException;
+import org.apache.fluss.exception.IneligibleReplicaException;
import org.apache.fluss.metadata.TableBucket;
import org.apache.fluss.rpc.entity.ProduceLogResultForBucket;
import org.apache.fluss.server.entity.FetchReqInfo;
@@ -118,7 +119,7 @@ public class AdjustIsrTest extends ReplicaTestBase {
}
@Test
- void testSubmitShrinkIsrAsLeaderFenced() throws Exception {
+ void testSubmitShrinkIsrAsLeaderFenced() {
// replica set is 1,2,3 , isr set is 1,2,3.
TableBucket tb = new TableBucket(DATA1_TABLE_ID, 1);
makeLogTableAsLeader(tb, Arrays.asList(1, 2, 3), Arrays.asList(1, 2,
3), false);
@@ -144,4 +145,34 @@ public class AdjustIsrTest extends ReplicaTestBase {
.isInstanceOf(FencedLeaderEpochException.class)
.hasMessageContaining("request leader epoch is fenced.");
}
+
+ @Test
+ void testSubmitShrinkIsrAsServerAlreadyShutdown() {
+ // Replica set is [1, 2, 3] and ISR is [1, 2, 3].
+ TableBucket tb = new TableBucket(DATA1_TABLE_ID, 1);
+ makeLogTableAsLeader(tb, Arrays.asList(1, 2, 3), Arrays.asList(1, 2,
3), false);
+
+ Replica replica = replicaManager.getReplicaOrException(tb);
+ assertThat(replica.getIsr()).containsExactlyInAnyOrder(1, 2, 3);
+
+ // Mimic the ISR shrink prepared in Replica#maybeShrinkIsr(): shrink the ISR to [1, 2].
+ IsrState.PendingShrinkIsrState pendingShrinkIsrState =
+ replica.prepareIsrShrink(
+ new IsrState.CommittedIsrState(Arrays.asList(1, 2, 3)),
+ Arrays.asList(1, 2),
+ Collections.singletonList(3));
+
+ // Mark tablet server 2 as shut down so the new ISR [1, 2] contains an ineligible replica.
+
testCoordinatorGateway.setShutdownTabletServers(Collections.singleton(2));
+ assertThatThrownBy(
+ () ->
+ replica.submitAdjustIsr(pendingShrinkIsrState)
+ .get(1, TimeUnit.MINUTES))
+ .rootCause()
+ .isInstanceOf(IneligibleReplicaException.class)
+ .hasMessage(
+ "Rejecting adjustIsr request for table bucket "
+ + "TableBucket{tableId=150001, bucket=1}
because it specified ineligible replicas [2] "
+ + "in the new ISR LeaderAndIsr{leader=1,
leaderEpoch=0, isr=[1, 2], coordinatorEpoch=0, bucketEpoch=0}");
+ }
}