This is an automated email from the ASF dual-hosted git repository.

yunhong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new e28e95496 [server] Servers with PERMANENT_OFFLINE tag are no longer 
assigned replicas (#1538)
e28e95496 is described below

commit e28e9549608186f87ad2685700158576dae1b924
Author: Liebing <[email protected]>
AuthorDate: Wed Jan 7 15:43:48 2026 +0800

    [server] Servers with PERMANENT_OFFLINE tag are no longer assigned replicas 
(#1538)
    
    * [server] Servers with PERMANENT_OFFLINE tag are no longer assigned 
replicas
    
    * fix comment
    
    * fix comment
---
 .../coordinator/CoordinatorEventProcessor.java     | 21 +++++++++--
 .../server/metadata/CoordinatorMetadataCache.java  | 42 +++++++++++++++++++---
 .../metadata/CoordinatorMetadataCacheTest.java     | 14 +++++++-
 3 files changed, 69 insertions(+), 8 deletions(-)

diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java
index 471c62f7d..bd709262c 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java
@@ -248,7 +248,9 @@ public class CoordinatorEventProcessor implements 
EventProcessor {
         HashSet<ServerInfo> tabletServerInfoList =
                 new 
HashSet<>(coordinatorContext.getLiveTabletServers().values());
         serverMetadataCache.updateMetadata(
-                coordinatorContext.getCoordinatorServerInfo(), 
tabletServerInfoList);
+                coordinatorContext.getCoordinatorServerInfo(),
+                tabletServerInfoList,
+                coordinatorContext.getServerTags());
         updateTabletServerMetadataCacheWhenStartup(tabletServerInfoList);
 
         // start table manager
@@ -907,7 +909,8 @@ public class CoordinatorEventProcessor implements 
EventProcessor {
         // update coordinatorServer metadata cache for the new added table 
server.
         serverMetadataCache.updateMetadata(
                 coordinatorContext.getCoordinatorServerInfo(),
-                new 
HashSet<>(coordinatorContext.getLiveTabletServers().values()));
+                new 
HashSet<>(coordinatorContext.getLiveTabletServers().values()),
+                coordinatorContext.getServerTags());
         // update server info for all tablet servers.
         updateTabletServerMetadataCache(
                 new 
HashSet<>(coordinatorContext.getLiveTabletServers().values()),
@@ -964,7 +967,9 @@ public class CoordinatorEventProcessor implements 
EventProcessor {
                 new 
HashSet<>(coordinatorContext.getLiveTabletServers().values());
         // update coordinatorServer metadata cache.
         serverMetadataCache.updateMetadata(
-                coordinatorContext.getCoordinatorServerInfo(), serverInfos);
+                coordinatorContext.getCoordinatorServerInfo(),
+                serverInfos,
+                coordinatorContext.getServerTags());
         updateTabletServerMetadataCache(serverInfos, null, null, 
Collections.emptySet());
 
         TableBucketStateMachine tableBucketStateMachine = 
tableManager.getTableBucketStateMachine();
@@ -1044,6 +1049,11 @@ public class CoordinatorEventProcessor implements 
EventProcessor {
 
         // Then update coordinatorContext.
         serverIds.forEach(serverId -> 
coordinatorContext.putServerTag(serverId, serverTag));
+        // update coordinatorServer metadata cache for the newly added serverTag
+        serverMetadataCache.updateMetadata(
+                coordinatorContext.getCoordinatorServerInfo(),
+                new 
HashSet<>(coordinatorContext.getLiveTabletServers().values()),
+                coordinatorContext.getServerTags());
 
         return addServerTagResponse;
     }
@@ -1093,6 +1103,11 @@ public class CoordinatorEventProcessor implements 
EventProcessor {
 
         // Then update coordinatorContext.
         serverIds.forEach(coordinatorContext::removeServerTag);
+        // update coordinatorServer metadata cache for the newly removed 
serverTag
+        serverMetadataCache.updateMetadata(
+                coordinatorContext.getCoordinatorServerInfo(),
+                new 
HashSet<>(coordinatorContext.getLiveTabletServers().values()),
+                coordinatorContext.getServerTags());
 
         return removeServerTagResponse;
     }
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/metadata/CoordinatorMetadataCache.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/metadata/CoordinatorMetadataCache.java
index c90710e53..013b8759f 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/metadata/CoordinatorMetadataCache.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/metadata/CoordinatorMetadataCache.java
@@ -19,6 +19,7 @@ package org.apache.fluss.server.metadata;
 
 import org.apache.fluss.cluster.ServerNode;
 import org.apache.fluss.cluster.TabletServerInfo;
+import org.apache.fluss.cluster.rebalance.ServerTag;
 import org.apache.fluss.server.coordinator.CoordinatorServer;
 
 import javax.annotation.Nullable;
@@ -27,11 +28,13 @@ import javax.annotation.concurrent.GuardedBy;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
+import java.util.stream.Collectors;
 
 import static org.apache.fluss.utils.concurrent.LockUtils.inLock;
 
@@ -42,7 +45,7 @@ public class CoordinatorMetadataCache implements 
ServerMetadataCache {
 
     @GuardedBy("metadataLock")
     private volatile NodeMetadataSnapshot metadataSnapshot =
-            new NodeMetadataSnapshot(null, Collections.emptyMap());
+            new NodeMetadataSnapshot(null, Collections.emptyMap(), 
Collections.emptyMap());
 
     public CoordinatorMetadataCache() {}
 
@@ -91,7 +94,34 @@ public class CoordinatorMetadataCache implements 
ServerMetadataCache {
         return Collections.unmodifiableSet(tabletServerInfos);
     }
 
-    public void updateMetadata(ServerInfo coordinatorServer, Set<ServerInfo> 
serverInfoSet) {
+    /**
+     * Servers with {@code PERMANENT_OFFLINE} tags are no longer returned 
here, so that no new
+     * replicas will be assigned to these servers.
+     */
+    @Override
+    public TabletServerInfo[] getLiveServers() {
+        Set<TabletServerInfo> aliveTabletServerInfosWithoutOfflineServerTag =
+                getAliveTabletServerInfos().stream()
+                        .filter(
+                                info -> {
+                                    ServerTag tag = 
metadataSnapshot.serverTags.get(info.getId());
+                                    return tag != ServerTag.PERMANENT_OFFLINE;
+                                })
+                        .collect(Collectors.toSet());
+        TabletServerInfo[] server =
+                new 
TabletServerInfo[aliveTabletServerInfosWithoutOfflineServerTag.size()];
+        Iterator<TabletServerInfo> iterator =
+                aliveTabletServerInfosWithoutOfflineServerTag.iterator();
+        for (int i = 0; i < 
aliveTabletServerInfosWithoutOfflineServerTag.size(); i++) {
+            server[i] = iterator.next();
+        }
+        return server;
+    }
+
+    public void updateMetadata(
+            ServerInfo coordinatorServer,
+            Set<ServerInfo> serverInfoSet,
+            Map<Integer, ServerTag> serverTagMap) {
         inLock(
                 metadataLock,
                 () -> {
@@ -101,19 +131,23 @@ public class CoordinatorMetadataCache implements 
ServerMetadataCache {
                     }
 
                     this.metadataSnapshot =
-                            new NodeMetadataSnapshot(coordinatorServer, 
newAliveTableServers);
+                            new NodeMetadataSnapshot(
+                                    coordinatorServer, newAliveTableServers, 
serverTagMap);
                 });
     }
 
     private static class NodeMetadataSnapshot {
         final @Nullable ServerInfo coordinatorServer;
         final Map<Integer, ServerInfo> aliveTabletServers;
+        final Map<Integer, ServerTag> serverTags;
 
         private NodeMetadataSnapshot(
                 @Nullable ServerInfo coordinatorServer,
-                Map<Integer, ServerInfo> aliveTabletServers) {
+                Map<Integer, ServerInfo> aliveTabletServers,
+                Map<Integer, ServerTag> serverTags) {
             this.coordinatorServer = coordinatorServer;
             this.aliveTabletServers = aliveTabletServers;
+            this.serverTags = serverTags;
         }
     }
 }
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/metadata/CoordinatorMetadataCacheTest.java
 
b/fluss-server/src/test/java/org/apache/fluss/server/metadata/CoordinatorMetadataCacheTest.java
index 13a7b4f26..046f3396d 100644
--- 
a/fluss-server/src/test/java/org/apache/fluss/server/metadata/CoordinatorMetadataCacheTest.java
+++ 
b/fluss-server/src/test/java/org/apache/fluss/server/metadata/CoordinatorMetadataCacheTest.java
@@ -20,12 +20,15 @@ package org.apache.fluss.server.metadata;
 import org.apache.fluss.cluster.Endpoint;
 import org.apache.fluss.cluster.ServerType;
 import org.apache.fluss.cluster.TabletServerInfo;
+import org.apache.fluss.cluster.rebalance.ServerTag;
 
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Map;
 import java.util.Set;
 
 import static org.assertj.core.api.Assertions.assertThat;
@@ -36,6 +39,7 @@ public class CoordinatorMetadataCacheTest {
 
     private ServerInfo coordinatorServer;
     private Set<ServerInfo> aliveTableServers;
+    private Map<Integer, ServerTag> serverTagMap;
 
     @BeforeEach
     public void setup() {
@@ -68,11 +72,15 @@ public class CoordinatorMetadataCacheTest {
                                         "rack2",
                                         
Endpoint.fromListenersString("INTERNAL://localhost:104"),
                                         ServerType.TABLET_SERVER)));
+
+        serverTagMap = new HashMap<>();
+        serverTagMap.put(0, ServerTag.PERMANENT_OFFLINE);
+        serverTagMap.put(1, ServerTag.TEMPORARY_OFFLINE);
     }
 
     @Test
     void testCoordinatorServerMetadataCache() {
-        serverMetadataCache.updateMetadata(coordinatorServer, 
aliveTableServers);
+        serverMetadataCache.updateMetadata(coordinatorServer, 
aliveTableServers, serverTagMap);
         assertThat(serverMetadataCache.getCoordinatorServer("CLIENT"))
                 .isEqualTo(coordinatorServer.node("CLIENT"));
         assertThat(serverMetadataCache.getCoordinatorServer("INTERNAL"))
@@ -85,5 +93,9 @@ public class CoordinatorMetadataCacheTest {
                         new TabletServerInfo(0, "rack0"),
                         new TabletServerInfo(1, "rack1"),
                         new TabletServerInfo(2, "rack2"));
+        // server 0 with PERMANENT_OFFLINE tag is no longer considered alive
+        assertThat(serverMetadataCache.getLiveServers())
+                .containsExactlyInAnyOrder(
+                        new TabletServerInfo(1, "rack1"), new 
TabletServerInfo(2, "rack2"));
     }
 }

Reply via email to