This is an automated email from the ASF dual-hosted git repository.
yunhong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new e28e95496 [server] Servers with PERMANENT_OFFLINE tag are no longer
assigned replicas (#1538)
e28e95496 is described below
commit e28e9549608186f87ad2685700158576dae1b924
Author: Liebing <[email protected]>
AuthorDate: Wed Jan 7 15:43:48 2026 +0800
[server] Servers with PERMANENT_OFFLINE tag are no longer assigned replicas
(#1538)
* [server] Servers with PERMANENT_OFFLINE tag are no longer assigned
replicas
* fix comment
* fix comment
---
.../coordinator/CoordinatorEventProcessor.java | 21 +++++++++--
.../server/metadata/CoordinatorMetadataCache.java | 42 +++++++++++++++++++---
.../metadata/CoordinatorMetadataCacheTest.java | 14 +++++++-
3 files changed, 69 insertions(+), 8 deletions(-)
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java
index 471c62f7d..bd709262c 100644
---
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java
+++
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java
@@ -248,7 +248,9 @@ public class CoordinatorEventProcessor implements
EventProcessor {
HashSet<ServerInfo> tabletServerInfoList =
new
HashSet<>(coordinatorContext.getLiveTabletServers().values());
serverMetadataCache.updateMetadata(
- coordinatorContext.getCoordinatorServerInfo(),
tabletServerInfoList);
+ coordinatorContext.getCoordinatorServerInfo(),
+ tabletServerInfoList,
+ coordinatorContext.getServerTags());
updateTabletServerMetadataCacheWhenStartup(tabletServerInfoList);
// start table manager
@@ -907,7 +909,8 @@ public class CoordinatorEventProcessor implements
EventProcessor {
// update coordinatorServer metadata cache for the new added table
server.
serverMetadataCache.updateMetadata(
coordinatorContext.getCoordinatorServerInfo(),
- new
HashSet<>(coordinatorContext.getLiveTabletServers().values()));
+ new
HashSet<>(coordinatorContext.getLiveTabletServers().values()),
+ coordinatorContext.getServerTags());
// update server info for all tablet servers.
updateTabletServerMetadataCache(
new
HashSet<>(coordinatorContext.getLiveTabletServers().values()),
@@ -964,7 +967,9 @@ public class CoordinatorEventProcessor implements
EventProcessor {
new
HashSet<>(coordinatorContext.getLiveTabletServers().values());
// update coordinatorServer metadata cache.
serverMetadataCache.updateMetadata(
- coordinatorContext.getCoordinatorServerInfo(), serverInfos);
+ coordinatorContext.getCoordinatorServerInfo(),
+ serverInfos,
+ coordinatorContext.getServerTags());
updateTabletServerMetadataCache(serverInfos, null, null,
Collections.emptySet());
TableBucketStateMachine tableBucketStateMachine =
tableManager.getTableBucketStateMachine();
@@ -1044,6 +1049,11 @@ public class CoordinatorEventProcessor implements
EventProcessor {
// Then update coordinatorContext.
serverIds.forEach(serverId ->
coordinatorContext.putServerTag(serverId, serverTag));
+ // update coordinatorServer metadata cache for the newly added serverTag
+ serverMetadataCache.updateMetadata(
+ coordinatorContext.getCoordinatorServerInfo(),
+ new
HashSet<>(coordinatorContext.getLiveTabletServers().values()),
+ coordinatorContext.getServerTags());
return addServerTagResponse;
}
@@ -1093,6 +1103,11 @@ public class CoordinatorEventProcessor implements
EventProcessor {
// Then update coordinatorContext.
serverIds.forEach(coordinatorContext::removeServerTag);
+ // update coordinatorServer metadata cache for the newly removed
serverTag
+ serverMetadataCache.updateMetadata(
+ coordinatorContext.getCoordinatorServerInfo(),
+ new
HashSet<>(coordinatorContext.getLiveTabletServers().values()),
+ coordinatorContext.getServerTags());
return removeServerTagResponse;
}
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/metadata/CoordinatorMetadataCache.java
b/fluss-server/src/main/java/org/apache/fluss/server/metadata/CoordinatorMetadataCache.java
index c90710e53..013b8759f 100644
---
a/fluss-server/src/main/java/org/apache/fluss/server/metadata/CoordinatorMetadataCache.java
+++
b/fluss-server/src/main/java/org/apache/fluss/server/metadata/CoordinatorMetadataCache.java
@@ -19,6 +19,7 @@ package org.apache.fluss.server.metadata;
import org.apache.fluss.cluster.ServerNode;
import org.apache.fluss.cluster.TabletServerInfo;
+import org.apache.fluss.cluster.rebalance.ServerTag;
import org.apache.fluss.server.coordinator.CoordinatorServer;
import javax.annotation.Nullable;
@@ -27,11 +28,13 @@ import javax.annotation.concurrent.GuardedBy;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
+import java.util.stream.Collectors;
import static org.apache.fluss.utils.concurrent.LockUtils.inLock;
@@ -42,7 +45,7 @@ public class CoordinatorMetadataCache implements
ServerMetadataCache {
@GuardedBy("metadataLock")
private volatile NodeMetadataSnapshot metadataSnapshot =
- new NodeMetadataSnapshot(null, Collections.emptyMap());
+ new NodeMetadataSnapshot(null, Collections.emptyMap(),
Collections.emptyMap());
public CoordinatorMetadataCache() {}
@@ -91,7 +94,34 @@ public class CoordinatorMetadataCache implements
ServerMetadataCache {
return Collections.unmodifiableSet(tabletServerInfos);
}
- public void updateMetadata(ServerInfo coordinatorServer, Set<ServerInfo>
serverInfoSet) {
+ /**
+ * Servers with {@code PERMANENT_OFFLINE} tags are not returned here, so
that no new
+ * replicas will be assigned to these servers.
+ */
+ @Override
+ public TabletServerInfo[] getLiveServers() {
+ Set<TabletServerInfo> aliveTabletServerInfosWithoutOfflineServerTag =
+ getAliveTabletServerInfos().stream()
+ .filter(
+ info -> {
+ ServerTag tag =
metadataSnapshot.serverTags.get(info.getId());
+ return tag != ServerTag.PERMANENT_OFFLINE;
+ })
+ .collect(Collectors.toSet());
+ TabletServerInfo[] server =
+ new
TabletServerInfo[aliveTabletServerInfosWithoutOfflineServerTag.size()];
+ Iterator<TabletServerInfo> iterator =
+ aliveTabletServerInfosWithoutOfflineServerTag.iterator();
+ for (int i = 0; i <
aliveTabletServerInfosWithoutOfflineServerTag.size(); i++) {
+ server[i] = iterator.next();
+ }
+ return server;
+ }
+
+ public void updateMetadata(
+ ServerInfo coordinatorServer,
+ Set<ServerInfo> serverInfoSet,
+ Map<Integer, ServerTag> serverTagMap) {
inLock(
metadataLock,
() -> {
@@ -101,19 +131,23 @@ public class CoordinatorMetadataCache implements
ServerMetadataCache {
}
this.metadataSnapshot =
- new NodeMetadataSnapshot(coordinatorServer,
newAliveTableServers);
+ new NodeMetadataSnapshot(
+ coordinatorServer, newAliveTableServers,
serverTagMap);
});
}
private static class NodeMetadataSnapshot {
final @Nullable ServerInfo coordinatorServer;
final Map<Integer, ServerInfo> aliveTabletServers;
+ final Map<Integer, ServerTag> serverTags;
private NodeMetadataSnapshot(
@Nullable ServerInfo coordinatorServer,
- Map<Integer, ServerInfo> aliveTabletServers) {
+ Map<Integer, ServerInfo> aliveTabletServers,
+ Map<Integer, ServerTag> serverTags) {
this.coordinatorServer = coordinatorServer;
this.aliveTabletServers = aliveTabletServers;
+ this.serverTags = serverTags;
}
}
}
diff --git
a/fluss-server/src/test/java/org/apache/fluss/server/metadata/CoordinatorMetadataCacheTest.java
b/fluss-server/src/test/java/org/apache/fluss/server/metadata/CoordinatorMetadataCacheTest.java
index 13a7b4f26..046f3396d 100644
---
a/fluss-server/src/test/java/org/apache/fluss/server/metadata/CoordinatorMetadataCacheTest.java
+++
b/fluss-server/src/test/java/org/apache/fluss/server/metadata/CoordinatorMetadataCacheTest.java
@@ -20,12 +20,15 @@ package org.apache.fluss.server.metadata;
import org.apache.fluss.cluster.Endpoint;
import org.apache.fluss.cluster.ServerType;
import org.apache.fluss.cluster.TabletServerInfo;
+import org.apache.fluss.cluster.rebalance.ServerTag;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.util.Arrays;
+import java.util.HashMap;
import java.util.HashSet;
+import java.util.Map;
import java.util.Set;
import static org.assertj.core.api.Assertions.assertThat;
@@ -36,6 +39,7 @@ public class CoordinatorMetadataCacheTest {
private ServerInfo coordinatorServer;
private Set<ServerInfo> aliveTableServers;
+ private Map<Integer, ServerTag> serverTagMap;
@BeforeEach
public void setup() {
@@ -68,11 +72,15 @@ public class CoordinatorMetadataCacheTest {
"rack2",
Endpoint.fromListenersString("INTERNAL://localhost:104"),
ServerType.TABLET_SERVER)));
+
+ serverTagMap = new HashMap<>();
+ serverTagMap.put(0, ServerTag.PERMANENT_OFFLINE);
+ serverTagMap.put(1, ServerTag.TEMPORARY_OFFLINE);
}
@Test
void testCoordinatorServerMetadataCache() {
- serverMetadataCache.updateMetadata(coordinatorServer,
aliveTableServers);
+ serverMetadataCache.updateMetadata(coordinatorServer,
aliveTableServers, serverTagMap);
assertThat(serverMetadataCache.getCoordinatorServer("CLIENT"))
.isEqualTo(coordinatorServer.node("CLIENT"));
assertThat(serverMetadataCache.getCoordinatorServer("INTERNAL"))
@@ -85,5 +93,9 @@ public class CoordinatorMetadataCacheTest {
new TabletServerInfo(0, "rack0"),
new TabletServerInfo(1, "rack1"),
new TabletServerInfo(2, "rack2"));
+ // server 0 has the PERMANENT_OFFLINE tag, so it is excluded from the live servers
+ assertThat(serverMetadataCache.getLiveServers())
+ .containsExactlyInAnyOrder(
+ new TabletServerInfo(1, "rack1"), new
TabletServerInfo(2, "rack2"));
}
}