This is an automated email from the ASF dual-hosted git repository. jark pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/fluss.git
commit bbc445a0a7b9b0a4c5380e5b8864a6e9206ee02a Author: whenzhou.zc <[email protected]> AuthorDate: Mon Mar 2 17:03:52 2026 +0800 [server] Support Coordinator High Availability --- .../exception/NotCoordinatorLeaderException.java | 32 ++ .../java/org/apache/fluss/metrics/MetricNames.java | 1 + .../java/org/apache/fluss/rpc/protocol/Errors.java | 7 +- .../server/coordinator/CoordinatorContext.java | 18 + .../coordinator/CoordinatorEventProcessor.java | 44 ++- .../coordinator/CoordinatorLeaderElection.java | 210 +++++++++++ .../server/coordinator/CoordinatorServer.java | 196 +++++++++- .../server/coordinator/CoordinatorService.java | 79 +++- .../coordinator/RequireCoordinatorLeader.java | 48 +++ .../coordinator/event/CoordinatorEventManager.java | 9 + .../event/DeadCoordinatorEvent.java} | 39 +- .../event/NewCoordinatorEvent.java} | 39 +- .../event/watcher/CoordinatorChangeWatcher.java | 89 +++++ .../event/watcher/ServerBaseChangeWatcher.java | 66 ++++ .../event/watcher/TabletServerChangeWatcher.java | 45 +-- .../apache/fluss/server/zk/ZooKeeperClient.java | 33 +- .../fluss/server/zk/data/CoordinatorAddress.java | 2 +- .../org/apache/fluss/server/zk/data/ZkData.java | 48 ++- .../org/apache/fluss/server/ServerTestBase.java | 15 +- .../coordinator/CoordinatorEventProcessorTest.java | 2 +- .../server/coordinator/CoordinatorHAITCase.java | 419 +++++++++++++++++++++ .../coordinator/CoordinatorServerElectionTest.java | 153 ++++++++ .../coordinator/CoordinatorServerITCase.java | 1 + .../server/coordinator/CoordinatorServerTest.java | 19 +- ...Test.java => CoordinatorChangeWatcherTest.java} | 43 +-- .../watcher/TabletServerChangeWatcherTest.java | 2 +- .../server/testutils/FlussClusterExtension.java | 11 + .../fluss/server/zk/ZooKeeperClientTest.java | 4 +- .../maintenance/observability/monitor-metrics.md | 11 +- 29 files changed, 1536 insertions(+), 149 deletions(-) diff --git a/fluss-common/src/main/java/org/apache/fluss/exception/NotCoordinatorLeaderException.java 
b/fluss-common/src/main/java/org/apache/fluss/exception/NotCoordinatorLeaderException.java new file mode 100644 index 000000000..a3f65aaf4 --- /dev/null +++ b/fluss-common/src/main/java/org/apache/fluss/exception/NotCoordinatorLeaderException.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.exception; + +/** Exception thrown when a request is sent to a stand by coordinator server. 
since: 0.9 */ +public class NotCoordinatorLeaderException extends ApiException { + + private static final long serialVersionUID = 1L; + + public NotCoordinatorLeaderException(String message) { + super(message); + } + + public NotCoordinatorLeaderException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java b/fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java index bf6440dec..b0fcaeecb 100644 --- a/fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java +++ b/fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java @@ -36,6 +36,7 @@ public class MetricNames { // metrics for coordinator server // -------------------------------------------------------------------------------------------- public static final String ACTIVE_COORDINATOR_COUNT = "activeCoordinatorCount"; + public static final String ALIVE_COORDINATOR_COUNT = "aliveCoordinatorCount"; public static final String ACTIVE_TABLET_SERVER_COUNT = "activeTabletServerCount"; public static final String OFFLINE_BUCKET_COUNT = "offlineBucketCount"; public static final String TABLE_COUNT = "tableCount"; diff --git a/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/Errors.java b/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/Errors.java index dd16d10e2..0ab13e6b7 100644 --- a/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/Errors.java +++ b/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/Errors.java @@ -56,6 +56,7 @@ import org.apache.fluss.exception.LogStorageException; import org.apache.fluss.exception.NetworkException; import org.apache.fluss.exception.NoRebalanceInProgressException; import org.apache.fluss.exception.NonPrimaryKeyTableException; +import org.apache.fluss.exception.NotCoordinatorLeaderException; import org.apache.fluss.exception.NotEnoughReplicasAfterAppendException; import org.apache.fluss.exception.NotEnoughReplicasException; import 
org.apache.fluss.exception.NotLeaderOrFollowerException; @@ -247,7 +248,11 @@ public enum Errors { 63, "The client has attempted to perform an operation with an invalid producer ID.", InvalidProducerIdException::new), - CONFIG_EXCEPTION(64, "A configuration error occurred.", ConfigException::new); + CONFIG_EXCEPTION(64, "A configuration error occurred.", ConfigException::new), + NOT_COORDINATOR_LEADER_EXCEPTION( + 65, + "The coordinator is not a leader and cannot process request.", + NotCoordinatorLeaderException::new); private static final Logger LOG = LoggerFactory.getLogger(Errors.class); diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorContext.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorContext.java index bece7d9dc..2b821b02d 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorContext.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorContext.java @@ -67,6 +67,7 @@ public class CoordinatorContext { // a success deletion. 
private final Map<TableBucketReplica, Integer> failDeleteNumbers = new HashMap<>(); + private final Set<String> liveCoordinatorServers = new HashSet<>(); private final Map<Integer, ServerInfo> liveTabletServers = new HashMap<>(); private final Set<Integer> shuttingDownTabletServers = new HashSet<>(); @@ -115,6 +116,23 @@ public class CoordinatorContext { return coordinatorEpoch; } + public Set<String> getLiveCoordinatorServers() { + return liveCoordinatorServers; + } + + public void setLiveCoordinators(Set<String> servers) { + liveCoordinatorServers.clear(); + liveCoordinatorServers.addAll(servers); + } + + public void addLiveCoordinator(String serverId) { + this.liveCoordinatorServers.add(serverId); + } + + public void removeLiveCoordinator(String serverId) { + this.liveCoordinatorServers.remove(serverId); + } + public Map<Integer, ServerInfo> getLiveTabletServers() { return liveTabletServers; } diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java index ce6f7f336..4668593ea 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java @@ -71,6 +71,7 @@ import org.apache.fluss.server.coordinator.event.CoordinatorEvent; import org.apache.fluss.server.coordinator.event.CoordinatorEventManager; import org.apache.fluss.server.coordinator.event.CreatePartitionEvent; import org.apache.fluss.server.coordinator.event.CreateTableEvent; +import org.apache.fluss.server.coordinator.event.DeadCoordinatorEvent; import org.apache.fluss.server.coordinator.event.DeadTabletServerEvent; import org.apache.fluss.server.coordinator.event.DeleteReplicaResponseReceivedEvent; import org.apache.fluss.server.coordinator.event.DropPartitionEvent; @@ -78,6 +79,7 @@ import 
org.apache.fluss.server.coordinator.event.DropTableEvent; import org.apache.fluss.server.coordinator.event.EventProcessor; import org.apache.fluss.server.coordinator.event.FencedCoordinatorEvent; import org.apache.fluss.server.coordinator.event.ListRebalanceProgressEvent; +import org.apache.fluss.server.coordinator.event.NewCoordinatorEvent; import org.apache.fluss.server.coordinator.event.NewTabletServerEvent; import org.apache.fluss.server.coordinator.event.NotifyKvSnapshotOffsetEvent; import org.apache.fluss.server.coordinator.event.NotifyLakeTableOffsetEvent; @@ -86,6 +88,7 @@ import org.apache.fluss.server.coordinator.event.RebalanceEvent; import org.apache.fluss.server.coordinator.event.RemoveServerTagEvent; import org.apache.fluss.server.coordinator.event.SchemaChangeEvent; import org.apache.fluss.server.coordinator.event.TableRegistrationChangeEvent; +import org.apache.fluss.server.coordinator.event.watcher.CoordinatorChangeWatcher; import org.apache.fluss.server.coordinator.event.watcher.TableChangeWatcher; import org.apache.fluss.server.coordinator.event.watcher.TabletServerChangeWatcher; import org.apache.fluss.server.coordinator.lease.KvSnapshotLeaseManager; @@ -172,6 +175,7 @@ public class CoordinatorEventProcessor implements EventProcessor { private final LakeTableTieringManager lakeTableTieringManager; private final TableChangeWatcher tableChangeWatcher; private final CoordinatorChannelManager coordinatorChannelManager; + private final CoordinatorChangeWatcher coordinatorChangeWatcher; private final TabletServerChangeWatcher tabletServerChangeWatcher; private final CoordinatorMetadataCache serverMetadataCache; private final CoordinatorRequestBatch coordinatorRequestBatch; @@ -224,6 +228,8 @@ public class CoordinatorEventProcessor implements EventProcessor { tableBucketStateMachine, new RemoteStorageCleaner(conf, ioExecutor), ioExecutor); + this.coordinatorChangeWatcher = + new CoordinatorChangeWatcher(zooKeeperClient, coordinatorEventManager); 
this.tableChangeWatcher = new TableChangeWatcher(zooKeeperClient, coordinatorEventManager); this.tabletServerChangeWatcher = new TabletServerChangeWatcher(zooKeeperClient, coordinatorEventManager); @@ -263,6 +269,7 @@ public class CoordinatorEventProcessor implements EventProcessor { public void startup() { coordinatorContext.setCoordinatorServerInfo(getCoordinatorServerInfo()); // start watchers first so that we won't miss node in zk; + coordinatorChangeWatcher.start(); tabletServerChangeWatcher.start(); tableChangeWatcher.start(); LOG.info("Initializing coordinator context."); @@ -306,12 +313,9 @@ public class CoordinatorEventProcessor implements EventProcessor { private ServerInfo getCoordinatorServerInfo() { try { return zooKeeperClient - .getCoordinatorAddress() + .getCoordinatorLeaderAddress() .map( coordinatorAddress -> - // TODO we set id to 0 as that CoordinatorServer don't support - // HA, if we support HA, we need to set id to the config - // CoordinatorServer id to avoid node drift. 
new ServerInfo( 0, null, // For coordinatorServer, no rack info @@ -334,6 +338,11 @@ public class CoordinatorEventProcessor implements EventProcessor { private void initCoordinatorContext() throws Exception { long start = System.currentTimeMillis(); + // get all coordinator servers + List<String> currentCoordinatorServers = zooKeeperClient.getCoordinatorServerList(); + coordinatorContext.setLiveCoordinators(new HashSet<>(currentCoordinatorServers)); + LOG.info("Load coordinator servers success when initializing coordinator context."); + // get all tablet server's int[] currentServers = zooKeeperClient.getSortedTabletServerList(); List<ServerInfo> tabletServerInfos = new ArrayList<>(); @@ -548,6 +557,7 @@ public class CoordinatorEventProcessor implements EventProcessor { tableManager.shutdown(); // then stop watchers + coordinatorChangeWatcher.stop(); tableChangeWatcher.stop(); tabletServerChangeWatcher.stop(); } @@ -572,6 +582,10 @@ public class CoordinatorEventProcessor implements EventProcessor { (NotifyLeaderAndIsrResponseReceivedEvent) event); } else if (event instanceof DeleteReplicaResponseReceivedEvent) { processDeleteReplicaResponseReceived((DeleteReplicaResponseReceivedEvent) event); + } else if (event instanceof NewCoordinatorEvent) { + processNewCoordinator((NewCoordinatorEvent) event); + } else if (event instanceof DeadCoordinatorEvent) { + processDeadCoordinator((DeadCoordinatorEvent) event); } else if (event instanceof NewTabletServerEvent) { processNewTabletServer((NewTabletServerEvent) event); } else if (event instanceof DeadTabletServerEvent) { @@ -984,6 +998,28 @@ public class CoordinatorEventProcessor implements EventProcessor { replicaStateMachine.handleStateChanges(offlineReplicas, OfflineReplica); } + private void processNewCoordinator(NewCoordinatorEvent newCoordinatorEvent) { + String coordinatorServerId = newCoordinatorEvent.getServerId(); + if (coordinatorContext.getLiveCoordinatorServers().contains(coordinatorServerId)) { + return; + } + 
+ // process new coordinator server + LOG.info("New coordinator server callback for coordinator server {}", coordinatorServerId); + + coordinatorContext.addLiveCoordinator(coordinatorServerId); + } + + private void processDeadCoordinator(DeadCoordinatorEvent deadCoordinatorEvent) { + String coordinatorServerId = deadCoordinatorEvent.getServerId(); + if (!coordinatorContext.getLiveCoordinatorServers().contains(coordinatorServerId)) { + return; + } + // process dead coordinator server + LOG.info("Coordinator server failure callback for {}.", coordinatorServerId); + coordinatorContext.removeLiveCoordinator(coordinatorServerId); + } + private void processNewTabletServer(NewTabletServerEvent newTabletServerEvent) { // NOTE: we won't need to detect bounced tablet servers like Kafka as we won't // miss the event of tablet server un-register and register again since we can diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorLeaderElection.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorLeaderElection.java new file mode 100644 index 000000000..9f09df6af --- /dev/null +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorLeaderElection.java @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.server.coordinator; + +import org.apache.fluss.server.zk.ZooKeeperClient; +import org.apache.fluss.server.zk.data.ZkData; +import org.apache.fluss.shaded.curator5.org.apache.curator.framework.recipes.leader.LeaderLatch; +import org.apache.fluss.shaded.curator5.org.apache.curator.framework.recipes.leader.LeaderLatchListener; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; + +/** + * Using by coordinator server. Coordinator servers listen ZK node and elect leadership. + * + * <p>This class manages the leader election lifecycle: + * + * <ul> + * <li>Start election and participate as a candidate + * <li>When elected as leader, invoke the initialization callback + * <li>When losing leadership, clean up leader resources but continue participating in election + * <li>Can be re-elected as leader multiple times + * </ul> + */ +public class CoordinatorLeaderElection implements AutoCloseable { + private static final Logger LOG = LoggerFactory.getLogger(CoordinatorLeaderElection.class); + + private final String serverId; + private final LeaderLatch leaderLatch; + private final AtomicBoolean isLeader = new AtomicBoolean(false); + private final CompletableFuture<Void> leaderReadyFuture = new CompletableFuture<>(); + // Single-threaded executor to run leader init callbacks outside Curator's EventThread. 
+ // Curator's LeaderLatchListener callbacks run on its internal EventThread; performing + // synchronous ZK operations there causes deadlock because ZK response dispatch also + // needs that same thread. + private final ExecutorService leaderCallbackExecutor; + // Tracks the pending cleanup task so that init can wait for it to complete. + private final AtomicReference<CompletableFuture<Void>> pendingCleanup = + new AtomicReference<>(CompletableFuture.completedFuture(null)); + private volatile Runnable initLeaderServices; + private volatile Consumer<Throwable> cleanupLeaderServices; + + public CoordinatorLeaderElection(ZooKeeperClient zkClient, String serverId) { + this.serverId = serverId; + this.leaderLatch = + new LeaderLatch( + zkClient.getCuratorClient(), + ZkData.CoordinatorElectionZNode.path(), + String.valueOf(serverId)); + this.leaderCallbackExecutor = + Executors.newSingleThreadExecutor( + r -> { + Thread t = new Thread(r, "coordinator-leader-callback-" + serverId); + t.setDaemon(true); + return t; + }); + } + + /** + * Starts the leader election process asynchronously. The returned future completes when this + * server becomes the leader for the first time and initializes the leader services. + * + * <p>After the first election, the server will continue to participate in future elections. + * When re-elected as leader, the initLeaderServices callback will be invoked again. 
+ * + * @param initLeaderServices the callback to initialize leader services once elected + * @param cleanupLeaderServices the callback to clean up leader services when losing leadership + * @return a CompletableFuture that completes when this server becomes leader for the first time + */ + public CompletableFuture<Void> startElectLeaderAsync( + Runnable initLeaderServices, Consumer<Throwable> cleanupLeaderServices) { + this.initLeaderServices = initLeaderServices; + this.cleanupLeaderServices = cleanupLeaderServices; + + leaderLatch.addListener( + new LeaderLatchListener() { + @Override + public void isLeader() { + LOG.info("Coordinator server {} has become the leader.", serverId); + // Capture the pending cleanup future at this point so that + // init waits for it before proceeding. + CompletableFuture<Void> cleanup = pendingCleanup.get(); + // Run init on a separate thread to avoid deadlock with + // Curator's EventThread when performing ZK operations. + leaderCallbackExecutor.execute( + () -> { + // Wait for any pending cleanup to finish first. + try { + cleanup.get(60, TimeUnit.SECONDS); + } catch (TimeoutException e) { + LOG.warn( + "Pending cleanup for server {} did not complete within 60s, proceeding with init.", + serverId); + } catch (Exception e) { + LOG.warn( + "Error waiting for pending cleanup for server {}", + serverId, + e); + } + try { + initLeaderServices.run(); + leaderReadyFuture.complete(null); + } catch (Exception e) { + LOG.error( + "Failed to initialize leader services for server {}", + serverId, + e); + leaderReadyFuture.completeExceptionally(e); + } + }); + isLeader.set(true); + } + + @Override + public void notLeader() { + if (isLeader.compareAndSet(true, false)) { + LOG.warn( + "Coordinator server {} has lost the leadership, cleaning up leader services.", + serverId); + // Run cleanup on a separate daemon thread (NOT on the + // leaderCallbackExecutor) to avoid blocking init tasks. 
+ // The cleanup completion is tracked via pendingCleanup so + // that subsequent init waits for it. + CompletableFuture<Void> cleanupFuture = new CompletableFuture<>(); + pendingCleanup.set(cleanupFuture); + Thread cleanupThread = + new Thread( + () -> { + try { + if (cleanupLeaderServices != null) { + cleanupLeaderServices.accept(null); + } + } catch (Exception e) { + LOG.error( + "Failed to cleanup leader services for server {}", + serverId, + e); + } finally { + cleanupFuture.complete(null); + } + }, + "coordinator-leader-cleanup-" + serverId); + cleanupThread.setDaemon(true); + cleanupThread.start(); + } + } + }); + + try { + leaderLatch.start(); + LOG.info("Coordinator server {} started leader election.", serverId); + } catch (Exception e) { + LOG.error("Failed to start LeaderLatch for server {}", serverId, e); + leaderReadyFuture.completeExceptionally( + new RuntimeException("Leader election start failed", e)); + } + + return leaderReadyFuture; + } + + @Override + public void close() { + LOG.info("Closing LeaderLatch for server {}.", serverId); + + if (leaderLatch != null) { + try { + leaderLatch.close(); + } catch (Exception e) { + LOG.error("Failed to close LeaderLatch for server {}.", serverId, e); + } + } + + leaderCallbackExecutor.shutdownNow(); + + // Complete the future exceptionally if it hasn't been completed yet + if (!leaderReadyFuture.isDone()) { + leaderReadyFuture.completeExceptionally( + new RuntimeException("Leader election closed for server " + serverId)); + } + } + + public boolean isLeader() { + return this.isLeader.get(); + } +} diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorServer.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorServer.java index 1f9f3306e..2262b971b 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorServer.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorServer.java @@ -92,8 
+92,7 @@ public class CoordinatorServer extends ServerBase { private final AtomicBoolean isShutDown = new AtomicBoolean(false); private final Clock clock; - @GuardedBy("lock") - private String serverId; + private final String serverId; @GuardedBy("lock") private MetricRegistry metricRegistry; @@ -116,6 +115,9 @@ public class CoordinatorServer extends ServerBase { @GuardedBy("lock") private CoordinatorMetadataCache metadataCache; + @GuardedBy("lock") + private MetadataManager metadataManager; + @GuardedBy("lock") private CoordinatorChannelManager coordinatorChannelManager; @@ -147,6 +149,12 @@ public class CoordinatorServer extends ServerBase { @GuardedBy("lock") private LakeCatalogDynamicLoader lakeCatalogDynamicLoader; + @GuardedBy("lock") + private CoordinatorLeaderElection coordinatorLeaderElection; + + @GuardedBy("lock") + private CompletableFuture<Void> leaderElectionFuture; + @GuardedBy("lock") private KvSnapshotLeaseManager kvSnapshotLeaseManager; @@ -158,6 +166,7 @@ public class CoordinatorServer extends ServerBase { super(conf); validateCoordinatorConfigs(conf); this.terminationFuture = new CompletableFuture<>(); + this.serverId = UUID.randomUUID().toString(); this.clock = clock; } @@ -170,10 +179,41 @@ public class CoordinatorServer extends ServerBase { @Override protected void startServices() throws Exception { + electCoordinatorLeaderAsync(); + } + + private CompletableFuture<Void> electCoordinatorLeaderAsync() throws Exception { + initCoordinatorStandby(); + + // start election (coordinatorLeaderElection is created inside initCoordinatorStandby + // after zkClient is initialized) + this.leaderElectionFuture = + coordinatorLeaderElection.startElectLeaderAsync( + () -> { + try { + initCoordinatorLeader(); + } catch (Exception e) { + throw new RuntimeException(e); + } + }, + (Throwable t) -> { + try { + cleanupCoordinatorLeader(); + } catch (Exception e) { + LOG.error("Failed to cleanup coordinator leader services", e); + } + }); + return 
leaderElectionFuture; + } + + protected void initCoordinatorStandby() throws Exception { + // When a coordinator server starts, it first becomes a standby. + // This method execute initialization for standby, opening necessary rpc server port. + // Corresponding rpc methods will reject requests from clients + // and just serve for health check. synchronized (lock) { - LOG.info("Initializing Coordinator services."); + LOG.info("Initializing Coordinator services as standby."); List<Endpoint> endpoints = Endpoint.loadBindEndpoints(conf, ServerType.COORDINATOR); - this.serverId = UUID.randomUUID().toString(); // for metrics this.metricRegistry = MetricRegistry.create(conf, pluginManager); @@ -186,6 +226,9 @@ public class CoordinatorServer extends ServerBase { this.zkClient = ZooKeeperUtils.startZookeeperClient(conf, this); + // CoordinatorLeaderElection must be created after zkClient is initialized. + this.coordinatorLeaderElection = new CoordinatorLeaderElection(zkClient, serverId); + this.lakeCatalogDynamicLoader = new LakeCatalogDynamicLoader(conf, pluginManager, true); this.dynamicConfigManager = new DynamicConfigManager(zkClient, conf, true); @@ -206,8 +249,7 @@ public class CoordinatorServer extends ServerBase { new LakeTableTieringManager( new LakeTieringMetricGroup(metricRegistry, serverMetricGroup)); - MetadataManager metadataManager = - new MetadataManager(zkClient, conf, lakeCatalogDynamicLoader); + this.metadataManager = new MetadataManager(zkClient, conf, lakeCatalogDynamicLoader); this.ioExecutor = Executors.newFixedThreadPool( conf.get(ConfigOptions.SERVER_IO_POOL_SIZE), @@ -237,7 +279,8 @@ public class CoordinatorServer extends ServerBase { lakeTableTieringManager, dynamicConfigManager, ioExecutor, - kvSnapshotLeaseManager); + kvSnapshotLeaseManager, + coordinatorLeaderElection); this.rpcServer = RpcServer.create( @@ -249,11 +292,15 @@ public class CoordinatorServer extends ServerBase { serverMetricGroup)); rpcServer.start(); - 
registerCoordinatorLeader(); - // when init session, register coordinator server again + registerCoordinatorServer(); ZooKeeperUtils.registerZookeeperClientReInitSessionListener( - zkClient, this::registerCoordinatorLeader, this); + zkClient, this::registerCoordinatorServer, this); + } + } + + protected void initCoordinatorLeader() throws Exception { + synchronized (lock) { this.clientMetricGroup = new ClientMetricGroup(metricRegistry, SERVER_NAME); this.rpcClient = RpcClient.create(conf, clientMetricGroup, true); @@ -263,6 +310,7 @@ public class CoordinatorServer extends ServerBase { new AutoPartitionManager(metadataCache, metadataManager, conf); autoPartitionManager.start(); + registerCoordinatorLeader(); // start coordinator event processor after we register coordinator leader to zk // so that the event processor can get the coordinator leader node from zk during start // up. @@ -287,6 +335,72 @@ public class CoordinatorServer extends ServerBase { } } + /** + * Cleans up leader-specific resources when this server loses leadership. + * + * <p>This method is called by {@link CoordinatorLeaderElection} when the server transitions + * from leader to standby. It cleans up leader-only resources while keeping the server running + * as a standby, ready to participate in future elections. 
+ */ + protected void cleanupCoordinatorLeader() throws Exception { + synchronized (lock) { + LOG.info("Cleaning up coordinator leader services."); + + // Clean up leader-specific resources in reverse order of initialization + try { + if (coordinatorEventProcessor != null) { + coordinatorEventProcessor.shutdown(); + coordinatorEventProcessor = null; + } + } catch (Throwable t) { + LOG.warn("Failed to shutdown coordinator event processor", t); + } + + try { + if (coordinatorChannelManager != null) { + coordinatorChannelManager.close(); + coordinatorChannelManager = null; + } + } catch (Throwable t) { + LOG.warn("Failed to close coordinator channel manager", t); + } + + try { + if (autoPartitionManager != null) { + autoPartitionManager.close(); + autoPartitionManager = null; + } + } catch (Throwable t) { + LOG.warn("Failed to close auto partition manager", t); + } + + try { + if (rpcClient != null) { + rpcClient.close(); + rpcClient = null; + } + } catch (Throwable t) { + LOG.warn("Failed to close RPC client", t); + } + + try { + if (clientMetricGroup != null) { + clientMetricGroup.close(); + clientMetricGroup = null; + } + } catch (Throwable t) { + LOG.warn("Failed to close client metric group", t); + } + + // Reset coordinator context for next election + if (coordinatorContext != null) { + coordinatorContext.resetContext(); + } + + LOG.info("Coordinator leader services cleaned up successfully."); + } + } + @Override protected CompletableFuture<Result> closeAsync(Result result) { if (isShutDown.compareAndSet(false, true)) { @@ -306,6 +420,45 @@ public class CoordinatorServer extends ServerBase { return terminationFuture; } + private void registerCoordinatorServer() throws Exception { + long startTime = System.currentTimeMillis(); + List<Endpoint> bindEndpoints = rpcServer.getBindEndpoints(); + CoordinatorAddress coordinatorAddress = + new CoordinatorAddress( + this.serverId, Endpoint.loadAdvertisedEndpoints(bindEndpoints, conf)); + + // we need to retry to register 
since although + // zkClient reconnect, the ephemeral node may still exist + // for a while time, retry to wait the ephemeral node removed + // see ZOOKEEPER-2985 + while (true) { + try { + zkClient.registerCoordinatorServer(coordinatorAddress); + break; + } catch (KeeperException.NodeExistsException nodeExistsException) { + long elapsedTime = System.currentTimeMillis() - startTime; + if (elapsedTime >= ZOOKEEPER_REGISTER_TOTAL_WAIT_TIME_MS) { + LOG.error( + "Coordinator Server register to Zookeeper exceeded total retry time of {} ms. " + + "Aborting registration attempts.", + ZOOKEEPER_REGISTER_TOTAL_WAIT_TIME_MS); + throw nodeExistsException; + } + + LOG.warn( + "Coordinator server already registered in Zookeeper. " + + "retrying register after {} ms....", + ZOOKEEPER_REGISTER_RETRY_INTERVAL_MS); + try { + Thread.sleep(ZOOKEEPER_REGISTER_RETRY_INTERVAL_MS); + } catch (InterruptedException interruptedException) { + Thread.currentThread().interrupt(); + break; + } + } + } + } + private void registerCoordinatorLeader() throws Exception { long startTime = System.currentTimeMillis(); List<Endpoint> bindEndpoints = rpcServer.getBindEndpoints(); @@ -488,6 +641,14 @@ public class CoordinatorServer extends ServerBase { exception = ExceptionUtils.firstOrSuppressed(t, exception); } + try { + if (coordinatorLeaderElection != null) { + coordinatorLeaderElection.close(); + } + } catch (Throwable t) { + exception = ExceptionUtils.firstOrSuppressed(t, exception); + } + try { if (zkClient != null) { zkClient.close(); @@ -525,6 +686,11 @@ public class CoordinatorServer extends ServerBase { return coordinatorService; } + @VisibleForTesting + public CompletableFuture<Void> getLeaderElectionFuture() { + return leaderElectionFuture; + } + @Override protected String getServerName() { return SERVER_NAME; @@ -535,6 +701,11 @@ public class CoordinatorServer extends ServerBase { return rpcServer; } + @VisibleForTesting + public String getServerId() { + return serverId; + } + 
@VisibleForTesting public ServerMetadataCache getMetadataCache() { return metadataCache; @@ -553,4 +724,9 @@ public class CoordinatorServer extends ServerBase { public RebalanceManager getRebalanceManager() { return coordinatorEventProcessor.getRebalanceManager(); } + + @VisibleForTesting + public ZooKeeperClient getZooKeeperClient() { + return zkClient; + } } diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java index 1cce283af..7f9d92c77 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java @@ -35,6 +35,7 @@ import org.apache.fluss.exception.InvalidDatabaseException; import org.apache.fluss.exception.InvalidTableException; import org.apache.fluss.exception.LakeTableAlreadyExistException; import org.apache.fluss.exception.NonPrimaryKeyTableException; +import org.apache.fluss.exception.NotCoordinatorLeaderException; import org.apache.fluss.exception.SecurityDisabledException; import org.apache.fluss.exception.TableAlreadyExistException; import org.apache.fluss.exception.TableNotPartitionedException; @@ -241,6 +242,7 @@ public final class CoordinatorService extends RpcServiceBase implements Coordina private final LakeTableHelper lakeTableHelper; private final ProducerOffsetsManager producerOffsetsManager; private final KvSnapshotLeaseManager kvSnapshotLeaseManager; + private final CoordinatorLeaderElection coordinatorLeaderElection; public CoordinatorService( Configuration conf, @@ -254,7 +256,8 @@ public final class CoordinatorService extends RpcServiceBase implements Coordina LakeTableTieringManager lakeTableTieringManager, DynamicConfigManager dynamicConfigManager, ExecutorService ioExecutor, - KvSnapshotLeaseManager kvSnapshotLeaseManager) { + KvSnapshotLeaseManager kvSnapshotLeaseManager, + 
CoordinatorLeaderElection coordinatorLeaderElection) { super( remoteFileSystem, ServerType.COORDINATOR, @@ -283,6 +286,7 @@ public final class CoordinatorService extends RpcServiceBase implements Coordina this.producerOffsetsManager.start(); this.kvSnapshotLeaseManager = kvSnapshotLeaseManager; + this.coordinatorLeaderElection = coordinatorLeaderElection; } @Override @@ -290,6 +294,21 @@ public final class CoordinatorService extends RpcServiceBase implements Coordina return "coordinator"; } + /** + * Checks whether this coordinator server is the current leader. + * + * <p>This method should be called at the beginning of every method annotated with {@link + * RequireCoordinatorLeader} to guard against requests being processed by a standby coordinator. + * + * @throws NotCoordinatorLeaderException if this server is not the current coordinator leader + */ + private void checkLeader() { + if (!coordinatorLeaderElection.isLeader()) { + throw new NotCoordinatorLeaderException( + "This coordinator server is not the current leader."); + } + } + @Override public void shutdown() { IOUtils.closeQuietly(producerOffsetsManager, "producer snapshot manager"); @@ -348,8 +367,10 @@ public final class CoordinatorService extends RpcServiceBase implements Coordina } } + @RequireCoordinatorLeader @Override public CompletableFuture<CreateDatabaseResponse> createDatabase(CreateDatabaseRequest request) { + checkLeader(); if (authorizer != null) { authorizer.authorize(currentSession(), OperationType.CREATE, Resource.cluster()); } @@ -372,6 +393,7 @@ public final class CoordinatorService extends RpcServiceBase implements Coordina return CompletableFuture.completedFuture(response); } + @RequireCoordinatorLeader @Override public CompletableFuture<AlterDatabaseResponse> alterDatabase(AlterDatabaseRequest request) { String databaseName = request.getDatabaseName(); @@ -416,6 +438,7 @@ public final class CoordinatorService extends RpcServiceBase implements Coordina @Override public 
CompletableFuture<DropDatabaseResponse> dropDatabase(DropDatabaseRequest request) { + checkLeader(); authorizeDatabase(OperationType.DROP, request.getDatabaseName()); DropDatabaseResponse response = new DropDatabaseResponse(); metadataManager.dropDatabase( @@ -423,8 +446,10 @@ public final class CoordinatorService extends RpcServiceBase implements Coordina return CompletableFuture.completedFuture(response); } + @RequireCoordinatorLeader @Override public CompletableFuture<CreateTableResponse> createTable(CreateTableRequest request) { + checkLeader(); TablePath tablePath = toTablePath(request.getTablePath()); tablePath.validate(); authorizeDatabase(OperationType.CREATE, tablePath.getDatabaseName()); @@ -493,8 +518,10 @@ public final class CoordinatorService extends RpcServiceBase implements Coordina return CompletableFuture.completedFuture(new CreateTableResponse()); } + @RequireCoordinatorLeader @Override public CompletableFuture<AlterTableResponse> alterTable(AlterTableRequest request) { + checkLeader(); TablePath tablePath = toTablePath(request.getTablePath()); tablePath.validate(); authorizeTable(OperationType.ALTER, tablePath); @@ -653,8 +680,10 @@ public final class CoordinatorService extends RpcServiceBase implements Coordina return Boolean.parseBoolean(dataLakeEnabledValue); } + @RequireCoordinatorLeader @Override public CompletableFuture<DropTableResponse> dropTable(DropTableRequest request) { + checkLeader(); TablePath tablePath = toTablePath(request.getTablePath()); authorizeTable(OperationType.DROP, tablePath); @@ -663,9 +692,11 @@ public final class CoordinatorService extends RpcServiceBase implements Coordina return CompletableFuture.completedFuture(response); } + @RequireCoordinatorLeader @Override public CompletableFuture<CreatePartitionResponse> createPartition( CreatePartitionRequest request) { + checkLeader(); TablePath tablePath = toTablePath(request.getTablePath()); authorizeTable(OperationType.WRITE, tablePath); @@ -707,8 +738,10 @@ public final 
class CoordinatorService extends RpcServiceBase implements Coordina return CompletableFuture.completedFuture(response); } + @RequireCoordinatorLeader @Override public CompletableFuture<DropPartitionResponse> dropPartition(DropPartitionRequest request) { + checkLeader(); TablePath tablePath = toTablePath(request.getTablePath()); authorizeTable(OperationType.WRITE, tablePath); @@ -729,8 +762,10 @@ public final class CoordinatorService extends RpcServiceBase implements Coordina return CompletableFuture.completedFuture(response); } + @RequireCoordinatorLeader @Override public CompletableFuture<MetadataResponse> metadata(MetadataRequest request) { + checkLeader(); String listenerName = currentListenerName(); Session session = currentSession(); @@ -749,7 +784,9 @@ public final class CoordinatorService extends RpcServiceBase implements Coordina return metadataResponseAccessContextEvent.getResultFuture(); } + @RequireCoordinatorLeader public CompletableFuture<AdjustIsrResponse> adjustIsr(AdjustIsrRequest request) { + checkLeader(); CompletableFuture<AdjustIsrResponse> response = new CompletableFuture<>(); eventManagerSupplier .get() @@ -757,9 +794,11 @@ public final class CoordinatorService extends RpcServiceBase implements Coordina return response; } + @RequireCoordinatorLeader @Override public CompletableFuture<CommitKvSnapshotResponse> commitKvSnapshot( CommitKvSnapshotRequest request) { + checkLeader(); CompletableFuture<CommitKvSnapshotResponse> response = new CompletableFuture<>(); // parse completed snapshot from request byte[] completedSnapshotBytes = request.getCompletedSnapshot(); @@ -774,9 +813,11 @@ public final class CoordinatorService extends RpcServiceBase implements Coordina return response; } + @RequireCoordinatorLeader @Override public CompletableFuture<CommitRemoteLogManifestResponse> commitRemoteLogManifest( CommitRemoteLogManifestRequest request) { + checkLeader(); CompletableFuture<CommitRemoteLogManifestResponse> response = new CompletableFuture<>(); 
eventManagerSupplier .get() @@ -786,8 +827,10 @@ public final class CoordinatorService extends RpcServiceBase implements Coordina return response; } + @RequireCoordinatorLeader @Override public CompletableFuture<CreateAclsResponse> createAcls(CreateAclsRequest request) { + checkLeader(); if (authorizer == null) { throw new SecurityDisabledException("No Authorizer is configured."); } @@ -796,8 +839,10 @@ public final class CoordinatorService extends RpcServiceBase implements Coordina return CompletableFuture.completedFuture(makeCreateAclsResponse(aclCreateResults)); } + @RequireCoordinatorLeader @Override public CompletableFuture<DropAclsResponse> dropAcls(DropAclsRequest request) { + checkLeader(); if (authorizer == null) { throw new SecurityDisabledException("No Authorizer is configured."); } @@ -806,9 +851,11 @@ public final class CoordinatorService extends RpcServiceBase implements Coordina return CompletableFuture.completedFuture(makeDropAclsResponse(aclDeleteResults)); } + @RequireCoordinatorLeader @Override public CompletableFuture<PrepareLakeTableSnapshotResponse> prepareLakeTableSnapshot( PrepareLakeTableSnapshotRequest request) { + checkLeader(); CompletableFuture<PrepareLakeTableSnapshotResponse> future = new CompletableFuture<>(); boolean ignorePreviousBucketOffsets = request.hasIgnorePreviousTableOffsets() && request.isIgnorePreviousTableOffsets(); @@ -857,9 +904,11 @@ public final class CoordinatorService extends RpcServiceBase implements Coordina return future; } + @RequireCoordinatorLeader @Override public CompletableFuture<CommitLakeTableSnapshotResponse> commitLakeTableSnapshot( CommitLakeTableSnapshotRequest request) { + checkLeader(); CompletableFuture<CommitLakeTableSnapshotResponse> response = new CompletableFuture<>(); eventManagerSupplier .get() @@ -869,9 +918,11 @@ public final class CoordinatorService extends RpcServiceBase implements Coordina return response; } + @RequireCoordinatorLeader @Override public 
CompletableFuture<LakeTieringHeartbeatResponse> lakeTieringHeartbeat( LakeTieringHeartbeatRequest request) { + checkLeader(); LakeTieringHeartbeatResponse heartbeatResponse = new LakeTieringHeartbeatResponse(); int currentCoordinatorEpoch = coordinatorEpochSupplier.get(); heartbeatResponse.setCoordinatorEpoch(currentCoordinatorEpoch); @@ -945,9 +996,11 @@ public final class CoordinatorService extends RpcServiceBase implements Coordina return CompletableFuture.completedFuture(heartbeatResponse); } + @RequireCoordinatorLeader @Override public CompletableFuture<ControlledShutdownResponse> controlledShutdown( ControlledShutdownRequest request) { + checkLeader(); if (authorizer != null) { authorizer.authorize(currentSession(), OperationType.ALTER, Resource.cluster()); } @@ -963,9 +1016,11 @@ public final class CoordinatorService extends RpcServiceBase implements Coordina return response; } + @RequireCoordinatorLeader @Override public CompletableFuture<AcquireKvSnapshotLeaseResponse> acquireKvSnapshotLease( AcquireKvSnapshotLeaseRequest request) { + checkLeader(); for (PbKvSnapshotLeaseForTable kvSnapshotLeaseForTable : request.getSnapshotsToLeasesList()) { long tableId = kvSnapshotLeaseForTable.getTableId(); @@ -997,9 +1052,11 @@ public final class CoordinatorService extends RpcServiceBase implements Coordina ioExecutor); } + @RequireCoordinatorLeader @Override public CompletableFuture<ReleaseKvSnapshotLeaseResponse> releaseKvSnapshotLease( ReleaseKvSnapshotLeaseRequest request) { + checkLeader(); for (PbTableBucket tableBucket : request.getBucketsToReleasesList()) { long tableId = tableBucket.getTableId(); if (authorizer != null) { @@ -1030,9 +1087,11 @@ public final class CoordinatorService extends RpcServiceBase implements Coordina ioExecutor); } + @RequireCoordinatorLeader @Override public CompletableFuture<DropKvSnapshotLeaseResponse> dropKvSnapshotLease( DropKvSnapshotLeaseRequest request) { + checkLeader(); String leaseId = request.getLeaseId(); // Capture 
session before entering async block since currentSession() is thread-local Session session = authorizer != null ? currentSession() : null; @@ -1069,9 +1128,11 @@ public final class CoordinatorService extends RpcServiceBase implements Coordina ioExecutor); } + @RequireCoordinatorLeader @Override public CompletableFuture<AlterClusterConfigsResponse> alterClusterConfigs( AlterClusterConfigsRequest request) { + checkLeader(); CompletableFuture<AlterClusterConfigsResponse> future = new CompletableFuture<>(); List<PbAlterConfig> infos = request.getAlterConfigsList(); if (infos.isEmpty()) { @@ -1112,8 +1173,10 @@ public final class CoordinatorService extends RpcServiceBase implements Coordina return future; } + @RequireCoordinatorLeader @Override public CompletableFuture<AddServerTagResponse> addServerTag(AddServerTagRequest request) { + checkLeader(); if (authorizer != null) { authorizer.authorize(currentSession(), OperationType.ALTER, Resource.cluster()); } @@ -1131,9 +1194,11 @@ public final class CoordinatorService extends RpcServiceBase implements Coordina return response; } + @RequireCoordinatorLeader @Override public CompletableFuture<RemoveServerTagResponse> removeServerTag( RemoveServerTagRequest request) { + checkLeader(); if (authorizer != null) { authorizer.authorize(currentSession(), OperationType.ALTER, Resource.cluster()); } @@ -1151,8 +1216,10 @@ public final class CoordinatorService extends RpcServiceBase implements Coordina return response; } + @RequireCoordinatorLeader @Override public CompletableFuture<RebalanceResponse> rebalance(RebalanceRequest request) { + checkLeader(); if (authorizer != null) { authorizer.authorize(currentSession(), OperationType.WRITE, Resource.cluster()); } @@ -1166,9 +1233,11 @@ public final class CoordinatorService extends RpcServiceBase implements Coordina return response; } + @RequireCoordinatorLeader @Override public CompletableFuture<ListRebalanceProgressResponse> listRebalanceProgress( ListRebalanceProgressRequest 
request) { + checkLeader(); if (authorizer != null) { authorizer.authorize(currentSession(), OperationType.DESCRIBE, Resource.cluster()); } @@ -1183,9 +1252,11 @@ public final class CoordinatorService extends RpcServiceBase implements Coordina return response; } + @RequireCoordinatorLeader @Override public CompletableFuture<CancelRebalanceResponse> cancelRebalance( CancelRebalanceRequest request) { + checkLeader(); if (authorizer != null) { authorizer.authorize(currentSession(), OperationType.WRITE, Resource.cluster()); } @@ -1296,9 +1367,11 @@ public final class CoordinatorService extends RpcServiceBase implements Coordina // Producer Offset Management APIs (for Exactly-Once Semantics) // ================================================================================== + @RequireCoordinatorLeader @Override public CompletableFuture<RegisterProducerOffsetsResponse> registerProducerOffsets( RegisterProducerOffsetsRequest request) { + checkLeader(); // Authorization: require WRITE permission on all tables in the request if (authorizer != null) { for (PbProducerTableOffsets tableOffsets : request.getTableOffsetsList()) { @@ -1342,9 +1415,11 @@ public final class CoordinatorService extends RpcServiceBase implements Coordina ioExecutor); } + @RequireCoordinatorLeader @Override public CompletableFuture<GetProducerOffsetsResponse> getProducerOffsets( GetProducerOffsetsRequest request) { + checkLeader(); String producerId = request.getProducerId(); // Capture session before entering async block since currentSession() is thread-local Session session = authorizer != null ? 
currentSession() : null; @@ -1401,9 +1476,11 @@ public final class CoordinatorService extends RpcServiceBase implements Coordina ioExecutor); } + @RequireCoordinatorLeader @Override public CompletableFuture<DeleteProducerOffsetsResponse> deleteProducerOffsets( DeleteProducerOffsetsRequest request) { + checkLeader(); // Capture session before entering async block since currentSession() is thread-local Session session = authorizer != null ? currentSession() : null; return CompletableFuture.supplyAsync( diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/RequireCoordinatorLeader.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/RequireCoordinatorLeader.java new file mode 100644 index 000000000..cbf0178a3 --- /dev/null +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/RequireCoordinatorLeader.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.fluss.server.coordinator; + +import java.lang.annotation.Documented; +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + * Annotation to mark RPC methods that require the coordinator server to be the current leader + * before execution. + * + * <p>When a method annotated with {@code @RequireCoordinatorLeader} is invoked, it will first check + * whether this coordinator server is the current leader via {@link + * CoordinatorLeaderElection#isLeader()}. If the server is not the leader, a {@link + * org.apache.fluss.exception.NotCoordinatorLeaderException} will be thrown immediately, and the + * actual method body will not be executed. + * + * <p>Usage example: + * + * <pre>{@code + * @RequireCoordinatorLeader + * public CompletableFuture<CreateTableResponse> createTable(CreateTableRequest request) { + * // method body only executes when this server is the coordinator leader + * } + * }</pre> + */ +@Documented +@Target(ElementType.METHOD) +@Retention(RetentionPolicy.RUNTIME) +public @interface RequireCoordinatorLeader {} diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/CoordinatorEventManager.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/CoordinatorEventManager.java index ad22f6d1c..74a49a0da 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/CoordinatorEventManager.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/CoordinatorEventManager.java @@ -62,6 +62,7 @@ public final class CoordinatorEventManager implements EventManager { private Histogram eventQueueTime; // Coordinator metrics moved from CoordinatorEventProcessor + private volatile int aliveCoordinatorServerCount; private volatile int tabletServerCount; private volatile int offlineBucketCount; private volatile int tableCount; @@ -88,6 +89,8 @@ 
public final class CoordinatorEventManager implements EventManager { // Register coordinator metrics coordinatorMetricGroup.gauge(MetricNames.ACTIVE_COORDINATOR_COUNT, () -> 1); + coordinatorMetricGroup.gauge( + MetricNames.ALIVE_COORDINATOR_COUNT, () -> aliveCoordinatorServerCount); coordinatorMetricGroup.gauge( MetricNames.ACTIVE_TABLET_SERVER_COUNT, () -> tabletServerCount); coordinatorMetricGroup.gauge(MetricNames.OFFLINE_BUCKET_COUNT, () -> offlineBucketCount); @@ -105,6 +108,7 @@ public final class CoordinatorEventManager implements EventManager { AccessContextEvent<MetricsData> accessContextEvent = new AccessContextEvent<>( context -> { + int coordinatorServerCount = context.getLiveCoordinatorServers().size(); int tabletServerCount = context.getLiveTabletServers().size(); int tableCount = context.allTables().size(); int lakeTableCount = context.getLakeTableCount(); @@ -138,6 +142,7 @@ public final class CoordinatorEventManager implements EventManager { } return new MetricsData( + coordinatorServerCount, tabletServerCount, tableCount, lakeTableCount, @@ -152,6 +157,7 @@ public final class CoordinatorEventManager implements EventManager { // Wait for the result and update local metrics try { MetricsData metricsData = accessContextEvent.getResultFuture().get(); + this.aliveCoordinatorServerCount = metricsData.coordinatorServerCount; this.tabletServerCount = metricsData.tabletServerCount; this.tableCount = metricsData.tableCount; this.lakeTableCount = metricsData.lakeTableCount; @@ -275,6 +281,7 @@ public final class CoordinatorEventManager implements EventManager { } private static class MetricsData { + private final int coordinatorServerCount; private final int tabletServerCount; private final int tableCount; private final int lakeTableCount; @@ -284,6 +291,7 @@ public final class CoordinatorEventManager implements EventManager { private final int replicasToDeleteCount; public MetricsData( + int coordinatorServerCount, int tabletServerCount, int tableCount, 
int lakeTableCount, @@ -291,6 +299,7 @@ public final class CoordinatorEventManager implements EventManager { int partitionCount, int offlineBucketCount, int replicasToDeleteCount) { + this.coordinatorServerCount = coordinatorServerCount; this.tabletServerCount = tabletServerCount; this.tableCount = tableCount; this.lakeTableCount = lakeTableCount; diff --git a/fluss-server/src/main/java/org/apache/fluss/server/zk/data/CoordinatorAddress.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/DeadCoordinatorEvent.java similarity index 54% copy from fluss-server/src/main/java/org/apache/fluss/server/zk/data/CoordinatorAddress.java copy to fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/DeadCoordinatorEvent.java index 5953f3366..17d17b0bc 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/zk/data/CoordinatorAddress.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/DeadCoordinatorEvent.java @@ -15,51 +15,42 @@ * limitations under the License. */ -package org.apache.fluss.server.zk.data; +package org.apache.fluss.server.coordinator.event; -import org.apache.fluss.cluster.Endpoint; - -import java.util.List; import java.util.Objects; -/** - * The address information of an active coordinator stored in {@link ZkData.CoordinatorZNode}. - * - * @see CoordinatorAddressJsonSerde for json serialization and deserialization. - */ -public class CoordinatorAddress { - private final String id; - private final List<Endpoint> endpoints; +/** An event indicating that a coordinator server became dead.
*/ +public class DeadCoordinatorEvent implements CoordinatorEvent { - public CoordinatorAddress(String id, List<Endpoint> endpoints) { - this.id = id; - this.endpoints = endpoints; - } + private final String serverId; - public String getId() { - return id; + public DeadCoordinatorEvent(String serverId) { + this.serverId = serverId; } - public List<Endpoint> getEndpoints() { - return endpoints; + public String getServerId() { + return serverId; } @Override public boolean equals(Object o) { + if (this == o) { + return true; + } if (o == null || getClass() != o.getClass()) { return false; } - CoordinatorAddress that = (CoordinatorAddress) o; - return Objects.equals(id, that.id) && Objects.equals(endpoints, that.endpoints); + DeadCoordinatorEvent that = (DeadCoordinatorEvent) o; + return Objects.equals(serverId, that.serverId); } @Override public int hashCode() { - return Objects.hash(id, endpoints); + return Objects.hash(serverId); } @Override public String toString() { - return "CoordinatorAddress{" + "id='" + id + '\'' + ", endpoints=" + endpoints + '}'; + return "DeadCoordinatorEvent{" + "serverId=" + serverId + '}'; } } diff --git a/fluss-server/src/main/java/org/apache/fluss/server/zk/data/CoordinatorAddress.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/NewCoordinatorEvent.java similarity index 54% copy from fluss-server/src/main/java/org/apache/fluss/server/zk/data/CoordinatorAddress.java copy to fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/NewCoordinatorEvent.java index 5953f3366..da6dcf2b5 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/zk/data/CoordinatorAddress.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/NewCoordinatorEvent.java @@ -15,51 +15,42 @@ * limitations under the License. 
*/ -package org.apache.fluss.server.zk.data; +package org.apache.fluss.server.coordinator.event; -import org.apache.fluss.cluster.Endpoint; - -import java.util.List; import java.util.Objects; -/** - * The address information of an active coordinator stored in {@link ZkData.CoordinatorZNode}. - * - * @see CoordinatorAddressJsonSerde for json serialization and deserialization. - */ -public class CoordinatorAddress { - private final String id; - private final List<Endpoint> endpoints; +/** An event for new coordinator server. */ +public class NewCoordinatorEvent implements CoordinatorEvent { - public CoordinatorAddress(String id, List<Endpoint> endpoints) { - this.id = id; - this.endpoints = endpoints; - } + private final String serverId; - public String getId() { - return id; + public NewCoordinatorEvent(String serverId) { + this.serverId = serverId; } - public List<Endpoint> getEndpoints() { - return endpoints; + public String getServerId() { + return serverId; } @Override public boolean equals(Object o) { + if (this == o) { + return true; + } if (o == null || getClass() != o.getClass()) { return false; } - CoordinatorAddress that = (CoordinatorAddress) o; - return Objects.equals(id, that.id) && Objects.equals(endpoints, that.endpoints); + NewCoordinatorEvent that = (NewCoordinatorEvent) o; + return Objects.equals(serverId, that.serverId); } @Override public int hashCode() { - return Objects.hash(id, endpoints); + return Objects.hash(serverId); } @Override public String toString() { - return "CoordinatorAddress{" + "id='" + id + '\'' + ", endpoints=" + endpoints + '}'; + return "NewCoordinatorEvent{" + "serverId=" + serverId + '}'; } } diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/watcher/CoordinatorChangeWatcher.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/watcher/CoordinatorChangeWatcher.java new file mode 100644 index 000000000..2f30f3ac0 --- /dev/null +++ 
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/watcher/CoordinatorChangeWatcher.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.server.coordinator.event.watcher; + +import org.apache.fluss.server.coordinator.event.DeadCoordinatorEvent; +import org.apache.fluss.server.coordinator.event.EventManager; +import org.apache.fluss.server.coordinator.event.NewCoordinatorEvent; +import org.apache.fluss.server.zk.ZooKeeperClient; +import org.apache.fluss.server.zk.data.ZkData; +import org.apache.fluss.shaded.curator5.org.apache.curator.framework.recipes.cache.ChildData; +import org.apache.fluss.shaded.curator5.org.apache.curator.framework.recipes.cache.CuratorCacheListener; +import org.apache.fluss.shaded.curator5.org.apache.curator.utils.ZKPaths; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** A watcher to watch the coordinator server changes(new/delete) in zookeeper. 
*/ +public class CoordinatorChangeWatcher extends ServerBaseChangeWatcher { + + private static final Logger LOG = LoggerFactory.getLogger(CoordinatorChangeWatcher.class); + + public CoordinatorChangeWatcher(ZooKeeperClient zooKeeperClient, EventManager eventManager) { + super(zooKeeperClient, eventManager, ZkData.CoordinatorIdsZNode.path()); + } + + @Override + protected CuratorCacheListener createListener() { + return new CoordinatorChangeListener(); + } + + @Override + protected String getWatcherName() { + return "CoordinatorChangeWatcher"; + } + + protected String getServerIdFromEvent(ChildData data) { + return ZKPaths.getNodeFromPath(data.getPath()); + } + + private final class CoordinatorChangeListener implements CuratorCacheListener { + + @Override + public void event(Type type, ChildData oldData, ChildData newData) { + if (newData != null) { + LOG.debug("Received {} event (path: {})", type, newData.getPath()); + } else { + LOG.debug("Received {} event", type); + } + + switch (type) { + case NODE_CREATED: + { + if (newData != null && newData.getData().length > 0) { + String serverId = getServerIdFromEvent(newData); + LOG.info("Received CHILD_ADDED event for server {}.", serverId); + eventManager.put(new NewCoordinatorEvent(serverId)); + } + break; + } + case NODE_DELETED: + { + if (oldData != null && oldData.getData().length > 0) { + String serverId = getServerIdFromEvent(oldData); + LOG.info("Received CHILD_REMOVED event for server {}.", serverId); + eventManager.put(new DeadCoordinatorEvent(serverId)); + } + break; + } + default: + break; + } + } + } +} diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/watcher/ServerBaseChangeWatcher.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/watcher/ServerBaseChangeWatcher.java new file mode 100644 index 000000000..e9f82c95b --- /dev/null +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/watcher/ServerBaseChangeWatcher.java @@ -0,0 
+1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.server.coordinator.event.watcher; + +import org.apache.fluss.server.coordinator.event.EventManager; +import org.apache.fluss.server.zk.ZooKeeperClient; +import org.apache.fluss.shaded.curator5.org.apache.curator.framework.recipes.cache.CuratorCache; +import org.apache.fluss.shaded.curator5.org.apache.curator.framework.recipes.cache.CuratorCacheListener; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * An abstract base server class for {@link CoordinatorChangeWatcher} and {@link + * TabletServerChangeWatcher}. 
+ */ +public abstract class ServerBaseChangeWatcher { + + private static final Logger LOG = LoggerFactory.getLogger(ServerBaseChangeWatcher.class); + + protected final CuratorCache curatorCache; + protected final EventManager eventManager; + protected volatile boolean running; + + public ServerBaseChangeWatcher( + ZooKeeperClient zooKeeperClient, EventManager eventManager, String zkPath) { + this.curatorCache = CuratorCache.build(zooKeeperClient.getCuratorClient(), zkPath); + this.eventManager = eventManager; + this.curatorCache.listenable().addListener(createListener()); + } + + /** Creates the listener for server change events. */ + protected abstract CuratorCacheListener createListener(); + + /** Returns the watcher name for logging. */ + protected abstract String getWatcherName(); + + public void start() { + running = true; + curatorCache.start(); + } + + public void stop() { + if (!running) { + return; + } + running = false; + LOG.info("Stopping {}", getWatcherName()); + curatorCache.close(); + } +} diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/watcher/TabletServerChangeWatcher.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/watcher/TabletServerChangeWatcher.java index a8f4dcb15..569d5117a 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/watcher/TabletServerChangeWatcher.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/watcher/TabletServerChangeWatcher.java @@ -28,7 +28,6 @@ import org.apache.fluss.server.zk.data.TabletServerRegistration; import org.apache.fluss.server.zk.data.ZkData.ServerIdZNode; import org.apache.fluss.server.zk.data.ZkData.ServerIdsZNode; import org.apache.fluss.shaded.curator5.org.apache.curator.framework.recipes.cache.ChildData; -import org.apache.fluss.shaded.curator5.org.apache.curator.framework.recipes.cache.CuratorCache; import 
org.apache.fluss.shaded.curator5.org.apache.curator.framework.recipes.cache.CuratorCacheListener; import org.apache.fluss.shaded.curator5.org.apache.curator.utils.ZKPaths; @@ -36,34 +35,31 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** A watcher to watch the tablet server changes(new/delete) in zookeeper. */ -public class TabletServerChangeWatcher { +public class TabletServerChangeWatcher extends ServerBaseChangeWatcher { private static final Logger LOG = LoggerFactory.getLogger(TabletServerChangeWatcher.class); - private final CuratorCache curatorCache; - - private volatile boolean running; - - private final EventManager eventManager; public TabletServerChangeWatcher(ZooKeeperClient zooKeeperClient, EventManager eventManager) { - this.curatorCache = - CuratorCache.build(zooKeeperClient.getCuratorClient(), ServerIdsZNode.path()); - this.eventManager = eventManager; - this.curatorCache.listenable().addListener(new TabletServerChangeListener()); + super(zooKeeperClient, eventManager, ServerIdsZNode.path()); } - public void start() { - running = true; - curatorCache.start(); + @Override + protected CuratorCacheListener createListener() { + return new TabletServerChangeListener(); } - public void stop() { - if (!running) { - return; + @Override + protected String getWatcherName() { + return "TabletServerChangeWatcher"; + } + + protected int getServerIdFromEvent(ChildData data) { + try { + return Integer.parseInt(ZKPaths.getNodeFromPath(data.getPath())); + } catch (NumberFormatException e) { + throw new FlussRuntimeException( + "Invalid server id in zookeeper path: " + data.getPath(), e); } - running = false; - LOG.info("Stopping TableChangeWatcher"); - curatorCache.close(); } private final class TabletServerChangeListener implements CuratorCacheListener { @@ -108,13 +104,4 @@ public class TabletServerChangeWatcher { } } } - - private int getServerIdFromEvent(ChildData data) { - try { - return Integer.parseInt(ZKPaths.getNodeFromPath(data.getPath())); - 
} catch (NumberFormatException e) { - throw new FlussRuntimeException( - "Invalid server id in zookeeper path: " + data.getPath(), e); - } - } } diff --git a/fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java b/fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java index 749257409..78667ca48 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java @@ -62,7 +62,6 @@ import org.apache.fluss.server.zk.data.ZkData.BucketRemoteLogsZNode; import org.apache.fluss.server.zk.data.ZkData.BucketSnapshotIdZNode; import org.apache.fluss.server.zk.data.ZkData.BucketSnapshotsZNode; import org.apache.fluss.server.zk.data.ZkData.ConfigEntityZNode; -import org.apache.fluss.server.zk.data.ZkData.CoordinatorZNode; import org.apache.fluss.server.zk.data.ZkData.DatabaseZNode; import org.apache.fluss.server.zk.data.ZkData.DatabasesZNode; import org.apache.fluss.server.zk.data.ZkData.KvSnapshotLeaseZNode; @@ -188,20 +187,38 @@ public class ZooKeeperClient implements AutoCloseable { // Coordinator server // -------------------------------------------------------------------------------------------- - /** Register a coordinator leader server to ZK. */ + /** Register a coordinator server to ZK. */ + public void registerCoordinatorServer(CoordinatorAddress coordinatorAddress) throws Exception { + String path = ZkData.CoordinatorIdZNode.path(coordinatorAddress.getId()); + zkClient.create() + .creatingParentsIfNeeded() + .withMode(CreateMode.EPHEMERAL) + .forPath(path, ZkData.CoordinatorIdZNode.encode(coordinatorAddress)); + LOG.info("Registered Coordinator server {} at path {}.", coordinatorAddress, path); + } + + /** Register a coordinator leader to ZK. 
*/ public void registerCoordinatorLeader(CoordinatorAddress coordinatorAddress) throws Exception { - String path = CoordinatorZNode.path(); + String path = ZkData.CoordinatorLeaderZNode.path(); zkClient.create() .creatingParentsIfNeeded() .withMode(CreateMode.EPHEMERAL) - .forPath(path, CoordinatorZNode.encode(coordinatorAddress)); - LOG.info("Registered leader {} at path {}.", coordinatorAddress, path); + .forPath(path, ZkData.CoordinatorLeaderZNode.encode(coordinatorAddress)); + LOG.info("Registered Coordinator leader {} at path {}.", coordinatorAddress, path); } /** Get the leader address registered in ZK. */ - public Optional<CoordinatorAddress> getCoordinatorAddress() throws Exception { - Optional<byte[]> bytes = getOrEmpty(CoordinatorZNode.path()); - return bytes.map(CoordinatorZNode::decode); + public Optional<CoordinatorAddress> getCoordinatorLeaderAddress() throws Exception { + Optional<byte[]> bytes = getOrEmpty(ZkData.CoordinatorLeaderZNode.path()); + return bytes.map( + data -> + // maybe an empty node when a leader is elected but not registered + data.length == 0 ? null : ZkData.CoordinatorLeaderZNode.decode(data)); + } + + /** Gets the list of coordinator server Ids. */ + public List<String> getCoordinatorServerList() throws Exception { + return getChildren(ZkData.CoordinatorIdsZNode.path()); } // -------------------------------------------------------------------------------------------- diff --git a/fluss-server/src/main/java/org/apache/fluss/server/zk/data/CoordinatorAddress.java b/fluss-server/src/main/java/org/apache/fluss/server/zk/data/CoordinatorAddress.java index 5953f3366..7759e66b8 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/zk/data/CoordinatorAddress.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/zk/data/CoordinatorAddress.java @@ -23,7 +23,7 @@ import java.util.List; import java.util.Objects; /** - * The address information of an active coordinator stored in {@link ZkData.CoordinatorZNode}. 
+ * The address information of an active coordinator stored in {@link ZkData.CoordinatorLeaderZNode}. * * @see CoordinatorAddressJsonSerde for json serialization and deserialization. */ diff --git a/fluss-server/src/main/java/org/apache/fluss/server/zk/data/ZkData.java b/fluss-server/src/main/java/org/apache/fluss/server/zk/data/ZkData.java index aed5de0b3..6e007fdfc 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/zk/data/ZkData.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/zk/data/ZkData.java @@ -305,13 +305,53 @@ public final class ZkData { // ------------------------------------------------------------------------------------------ /** - * The znode for the active coordinator. The znode path is: + * The znode for alive coordinators. The znode path is: * - * <p>/coordinators/active + * <p>/coordinators/ids + */ + public static final class CoordinatorIdsZNode { + public static String path() { + return "/coordinators/ids"; + } + } + + /** + * The znode for a registered Coordinator information. The znode path is: + * + * <p>/coordinators/ids/[serverId] + */ + public static final class CoordinatorIdZNode { + public static String path(String serverId) { + return CoordinatorIdsZNode.path() + "/" + serverId; + } + + public static byte[] encode(CoordinatorAddress coordinatorAddress) { + return JsonSerdeUtils.writeValueAsBytes( + coordinatorAddress, CoordinatorAddressJsonSerde.INSTANCE); + } + + public static CoordinatorAddress decode(byte[] json) { + return JsonSerdeUtils.readValue(json, CoordinatorAddressJsonSerde.INSTANCE); + } + } + + /** + * The znode for the coordinator leader election. The znode path is: + * + * <p>/coordinators/election + */ + public static final class CoordinatorElectionZNode { + public static String path() { + return "/coordinators/election"; + } + } + + /** + * The znode for the active coordinator leader. 
The znode path is: * - * <p>Note: introduce standby coordinators in the future for znode "/coordinators/standby/". + * <p>/coordinators/active */ -public static final class CoordinatorZNode { +public static final class CoordinatorLeaderZNode { public static String path() { return "/coordinators/active"; } } diff --git a/fluss-server/src/test/java/org/apache/fluss/server/ServerTestBase.java b/fluss-server/src/test/java/org/apache/fluss/server/ServerTestBase.java index 2504aebed..c71f0f96c 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/ServerTestBase.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/ServerTestBase.java @@ -25,7 +25,7 @@ import org.apache.fluss.server.coordinator.CoordinatorServer; import org.apache.fluss.server.zk.NOPErrorHandler; import org.apache.fluss.server.zk.ZooKeeperClient; import org.apache.fluss.server.zk.ZooKeeperExtension; -import org.apache.fluss.server.zk.data.ZkData.CoordinatorZNode; +import org.apache.fluss.server.zk.data.ZkData; import org.apache.fluss.server.zk.data.ZkData.ServerIdZNode; import org.apache.fluss.shaded.zookeeper3.org.apache.zookeeper.data.Stat; import org.apache.fluss.testutils.common.AllCallbackWrapper; @@ -82,10 +82,13 @@ public abstract class ServerTestBase { @Test void testExceptionWhenRunServer() throws Exception { ServerBase server = getStartFailServer(); - assertThatThrownBy(server::start) - .isInstanceOf(FlussException.class) - .hasMessage(String.format("Failed to start the %s.", server.getServerName())); - server.close(); + try { + assertThatThrownBy(server::start) + .isInstanceOf(FlussException.class) + .hasMessage(String.format("Failed to start the %s.", server.getServerName())); + } finally { + server.close(); + } } @Test @@ -94,7 +97,7 @@ public abstract class ServerTestBase { // get the EPHEMERAL node of server String path = server instanceof CoordinatorServer - ? CoordinatorZNode.path() + ? 
ZkData.CoordinatorIdZNode.path(((CoordinatorServer) server).getServerId()) : ServerIdZNode.path(server.conf.getInt(ConfigOptions.TABLET_SERVER_ID)); long oldNodeCtime = zookeeperClient.getStat(path).get().getCtime(); diff --git a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessorTest.java b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessorTest.java index 0d304df1e..997a8738f 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessorTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessorTest.java @@ -417,7 +417,7 @@ class CoordinatorEventProcessorTest { BucketState t1Bucket0State = fromCtx(ctx -> ctx.getBucketState(t1Bucket0)); assertThat(t1Bucket0State).isEqualTo(OnlineBucket); // t1 bucket 1 should reelect a leader since the leader is not alive - // the bucket whose leader is in the server should be online a again, but the leadership + // the bucket whose leader is in the server should be online again, but the leadership // should change the leader for bucket2 of t1 should change since the leader fail BucketState t1Bucket1State = fromCtx(ctx -> ctx.getBucketState(t1Bucket1)); assertThat(t1Bucket1State).isEqualTo(OnlineBucket); diff --git a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorHAITCase.java b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorHAITCase.java new file mode 100644 index 000000000..66cc39332 --- /dev/null +++ b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorHAITCase.java @@ -0,0 +1,419 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.server.coordinator; + +import org.apache.fluss.cluster.Endpoint; +import org.apache.fluss.cluster.ServerNode; +import org.apache.fluss.cluster.ServerType; +import org.apache.fluss.config.ConfigOptions; +import org.apache.fluss.config.Configuration; +import org.apache.fluss.exception.NotCoordinatorLeaderException; +import org.apache.fluss.rpc.GatewayClientProxy; +import org.apache.fluss.rpc.RpcClient; +import org.apache.fluss.rpc.gateway.CoordinatorGateway; +import org.apache.fluss.rpc.messages.CreateDatabaseRequest; +import org.apache.fluss.rpc.messages.MetadataRequest; +import org.apache.fluss.rpc.metrics.TestingClientMetricGroup; +import org.apache.fluss.server.zk.NOPErrorHandler; +import org.apache.fluss.server.zk.ZooKeeperClient; +import org.apache.fluss.server.zk.ZooKeeperExtension; +import org.apache.fluss.server.zk.data.CoordinatorAddress; +import org.apache.fluss.shaded.curator5.org.apache.curator.framework.CuratorFramework; +import org.apache.fluss.shaded.zookeeper3.org.apache.zookeeper.Watcher; +import org.apache.fluss.shaded.zookeeper3.org.apache.zookeeper.ZooKeeper; +import org.apache.fluss.testutils.common.AllCallbackWrapper; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + 
+import java.time.Duration; +import java.util.Arrays; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.CompletionException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; + +import static org.apache.fluss.testutils.common.CommonTestUtils.waitUntil; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** + * Integration test for Coordinator Server High Availability. + * + * <p>This test verifies that when multiple coordinator servers are started, only the leader + * processes RPC requests while standby servers return {@link NotCoordinatorLeaderException}. + * + * <p>It also tests the complete leader election lifecycle: participate in election -> become leader + * (initialize leader services) -> lose leadership (become standby) -> re-participate in election -> + * become leader again. + */ +class CoordinatorHAITCase { + + @RegisterExtension + public static final AllCallbackWrapper<ZooKeeperExtension> ZOO_KEEPER_EXTENSION_WRAPPER = + new AllCallbackWrapper<>(new ZooKeeperExtension()); + + private static ZooKeeperClient zookeeperClient; + + private CoordinatorServer coordinatorServer1; + private CoordinatorServer coordinatorServer2; + private RpcClient rpcClient; + + @BeforeAll + static void baseBeforeAll() { + zookeeperClient = + ZOO_KEEPER_EXTENSION_WRAPPER + .getCustomExtension() + .getZooKeeperClient(NOPErrorHandler.INSTANCE); + } + + @BeforeEach + void setUp() throws Exception { + Configuration clientConf = new Configuration(); + rpcClient = RpcClient.create(clientConf, TestingClientMetricGroup.newInstance(), false); + } + + @AfterEach + void tearDown() throws Exception { + if (coordinatorServer1 != null) { + coordinatorServer1.close(); + } + if (coordinatorServer2 != null) { + coordinatorServer2.close(); + } + if (rpcClient != null) { + rpcClient.close(); + } + } + + @Test 
+ void testLeaderAndStandbyRpcBehavior() throws Exception { + coordinatorServer1 = new CoordinatorServer(createConfiguration()); + coordinatorServer2 = new CoordinatorServer(createConfiguration()); + + List<CoordinatorServer> coordinatorServerList = + Arrays.asList(coordinatorServer1, coordinatorServer2); + + // start 2 coordinator servers + for (CoordinatorServer server : coordinatorServerList) { + server.start(); + } + + // wait until one coordinator becomes leader + waitUntilCoordinatorServerElected(); + + CoordinatorAddress leaderAddress = zookeeperClient.getCoordinatorLeaderAddress().get(); + + // find the leader and standby servers + CoordinatorServer leaderServer = null; + CoordinatorServer standbyServer = null; + for (CoordinatorServer coordinatorServer : coordinatorServerList) { + if (Objects.equals(coordinatorServer.getServerId(), leaderAddress.getId())) { + leaderServer = coordinatorServer; + } else { + standbyServer = coordinatorServer; + } + } + + assertThat(leaderServer).isNotNull(); + assertThat(standbyServer).isNotNull(); + + // create gateways for both servers + CoordinatorGateway leaderGateway = createGatewayForServer(leaderServer); + CoordinatorGateway standbyGateway = createGatewayForServer(standbyServer); + + // test: leader should process request normally (no exception means success) + String testDbName = "test_ha_db_" + System.currentTimeMillis(); + CreateDatabaseRequest createDbRequest = + new CreateDatabaseRequest().setDatabaseName(testDbName).setIgnoreIfExists(false); + + // leader should process request successfully without throwing exception + leaderGateway.createDatabase(createDbRequest).get(); + + // test: standby should throw NotCoordinatorLeaderException + String testDbName2 = "test_ha_db2_" + System.currentTimeMillis(); + CreateDatabaseRequest createDbRequest2 = + new CreateDatabaseRequest().setDatabaseName(testDbName2).setIgnoreIfExists(false); + + assertThatThrownBy(() -> standbyGateway.createDatabase(createDbRequest2).get()) + 
.isInstanceOf(ExecutionException.class) + .hasCauseInstanceOf(NotCoordinatorLeaderException.class) + .hasMessageContaining("not the current leader"); + } + + @Test + void testLeaderLosesLeadershipAndReElected() throws Exception { + coordinatorServer1 = new CoordinatorServer(createConfiguration()); + coordinatorServer2 = new CoordinatorServer(createConfiguration()); + + coordinatorServer1.start(); + coordinatorServer2.start(); + + // Step 1: Initial leader election + waitUntilCoordinatorServerElected(); + CoordinatorAddress firstLeaderAddr = zookeeperClient.getCoordinatorLeaderAddress().get(); + + CoordinatorServer leader = findServerById(firstLeaderAddr.getId()); + CoordinatorServer standby = findServerByNotId(firstLeaderAddr.getId()); + assertThat(leader).isNotNull(); + assertThat(standby).isNotNull(); + + verifyServerIsLeader(leader, "test_reentrant_db_1"); + verifyServerIsStandby(standby); + + // Step 2: Kill leader's ZK session → standby becomes the new leader + killZkSession(leader); + waitUntilNewLeaderElected(leader.getServerId()); + assertThat(zookeeperClient.getCoordinatorLeaderAddress().get().getId()) + .as("After killing leader, standby should become leader") + .isEqualTo(standby.getServerId()); + + verifyServerIsLeader(standby, "test_reentrant_db_2"); + verifyServerIsStandby(leader); + + // Step 3: Wait for the killed leader to fully reconnect and re-join election, + // then kill the current leader → the original leader should become leader again. + // This proves LeaderLatch election is re-entrant. 
+ waitUntilServerRegistered(leader); + killZkSession(standby); + waitUntilNewLeaderElected(standby.getServerId()); + assertThat(zookeeperClient.getCoordinatorLeaderAddress().get().getId()) + .as("Original leader should be re-elected, proving re-entrant election") + .isEqualTo(leader.getServerId()); + + verifyServerIsLeader(leader, "test_reentrant_db_3"); + createGatewayForServer(leader).metadata(new MetadataRequest()).get(); + } + + @Test + void testMultipleLeadershipLossAndRecovery() throws Exception { + coordinatorServer1 = new CoordinatorServer(createConfiguration()); + coordinatorServer2 = new CoordinatorServer(createConfiguration()); + + coordinatorServer1.start(); + coordinatorServer2.start(); + + // Perform multiple leadership loss and recovery cycles. + // Each iteration kills the current leader and waits for the other server to take over. + for (int i = 0; i < 3; i++) { + waitUntilCoordinatorServerElected(); + CoordinatorAddress currentLeaderAddress = + zookeeperClient.getCoordinatorLeaderAddress().get(); + + CoordinatorServer currentLeader = findServerById(currentLeaderAddress.getId()); + CoordinatorServer otherServer = findServerByNotId(currentLeaderAddress.getId()); + assertThat(currentLeader).isNotNull(); + + verifyServerIsLeader(currentLeader, "test_cycle_db_" + i); + + // Ensure the other server has reconnected and re-joined election + // (it may have been killed in a previous iteration) + waitUntilServerRegistered(otherServer); + + killZkSession(currentLeader); + waitUntilNewLeaderElected(currentLeaderAddress.getId()); + } + + // Final verification: ensure cluster is still functional + CoordinatorAddress finalLeaderAddress = zookeeperClient.getCoordinatorLeaderAddress().get(); + CoordinatorServer finalLeader = findServerById(finalLeaderAddress.getId()); + assertThat(finalLeader).isNotNull(); + createGatewayForServer(finalLeader).metadata(new MetadataRequest()).get(); + } + + private CoordinatorGateway createGatewayForServer(CoordinatorServer server) 
{ + List<Endpoint> endpoints = server.getRpcServer().getBindEndpoints(); + Endpoint endpoint = endpoints.get(0); + // Use server.hashCode() as id since serverId is a UUID string + ServerNode serverNode = + new ServerNode( + server.hashCode(), + endpoint.getHost(), + endpoint.getPort(), + ServerType.COORDINATOR); + + return GatewayClientProxy.createGatewayProxy( + () -> serverNode, rpcClient, CoordinatorGateway.class); + } + + private static Configuration createConfiguration() { + Configuration configuration = new Configuration(); + configuration.setString( + ConfigOptions.ZOOKEEPER_ADDRESS, + ZOO_KEEPER_EXTENSION_WRAPPER.getCustomExtension().getConnectString()); + configuration.setString( + ConfigOptions.BIND_LISTENERS, "CLIENT://localhost:0,FLUSS://localhost:0"); + configuration.setString(ConfigOptions.ADVERTISED_LISTENERS, "CLIENT://198.168.0.1:100"); + configuration.set(ConfigOptions.REMOTE_DATA_DIR, "/tmp/fluss/remote-data"); + // Use shorter timeout for faster test execution, but not too short to avoid false session + // timeouts + // 5 seconds is enough for testing while avoiding spurious session expirations + configuration.set(ConfigOptions.ZOOKEEPER_SESSION_TIMEOUT, Duration.ofSeconds(5)); + configuration.set(ConfigOptions.ZOOKEEPER_CONNECTION_TIMEOUT, Duration.ofSeconds(5)); + configuration.set(ConfigOptions.ZOOKEEPER_RETRY_WAIT, Duration.ofMillis(500)); + return configuration; + } + + private void waitUntilCoordinatorServerElected() { + waitUntil( + () -> zookeeperClient.getCoordinatorLeaderAddress().isPresent(), + Duration.ofMinutes(1), + "Fail to wait coordinator server elected"); + } + + private CoordinatorServer findServerById(String serverId) { + if (coordinatorServer1 != null && coordinatorServer1.getServerId().equals(serverId)) { + return coordinatorServer1; + } + if (coordinatorServer2 != null && coordinatorServer2.getServerId().equals(serverId)) { + return coordinatorServer2; + } + return null; + } + + private CoordinatorServer 
findServerByNotId(String serverId) { + if (coordinatorServer1 != null && !coordinatorServer1.getServerId().equals(serverId)) { + return coordinatorServer1; + } + if (coordinatorServer2 != null && !coordinatorServer2.getServerId().equals(serverId)) { + return coordinatorServer2; + } + return null; + } + + private static Throwable getRootCause(Throwable t) { + if (t instanceof ExecutionException || t instanceof CompletionException) { + return t.getCause() != null ? t.getCause() : t; + } + return t; + } + + /** + * Kills the ZK session of a CoordinatorServer to simulate real session timeout. + * + * <p>This creates a duplicate ZK connection with the same session ID and immediately closes it, + * which forces the ZK server to expire the original session. This causes: + * + * <ol> + * <li>All ephemeral nodes for that session to be deleted (LeaderLatch node, leader address) + * <li>The original client receives SESSION_EXPIRED event + * <li>Curator fires ConnectionState.LOST + * <li>LeaderLatch calls notLeader() callback + * <li>Curator reconnects with a new session and re-participates in election + * </ol> + */ + private void killZkSession(CoordinatorServer server) throws Exception { + CuratorFramework curatorClient = server.getZooKeeperClient().getCuratorClient(); + ZooKeeper zk = curatorClient.getZookeeperClient().getZooKeeper(); + long sessionId = zk.getSessionId(); + byte[] sessionPasswd = zk.getSessionPasswd(); + String connectString = ZOO_KEEPER_EXTENSION_WRAPPER.getCustomExtension().getConnectString(); + + // We must wait for the duplicate connection to be fully established before + // closing it. Otherwise, close() may happen before the TCP connection reaches + // the ZK server, meaning the server never sees the duplicate session and the + // original session stays alive (making the test flaky). 
+ CountDownLatch connectedLatch = new CountDownLatch(1); + ZooKeeper dupZk = + new ZooKeeper( + connectString, + 1000, + event -> { + if (event.getState() == Watcher.Event.KeeperState.SyncConnected) { + connectedLatch.countDown(); + } + }, + sessionId, + sessionPasswd); + if (!connectedLatch.await(10, TimeUnit.SECONDS)) { + dupZk.close(); + throw new RuntimeException( + "Failed to establish duplicate ZK connection for session kill"); + } + dupZk.close(); + } + + /** Waits until a new coordinator leader is elected that is different from the given old one. */ + private void waitUntilNewLeaderElected(String oldLeaderId) { + waitUntil( + () -> { + try { + return zookeeperClient + .getCoordinatorLeaderAddress() + .map(addr -> !addr.getId().equals(oldLeaderId)) + .orElse(false); + } catch (Exception e) { + return false; + } + }, + Duration.ofMinutes(1), + "Fail to wait for new coordinator leader to be elected"); + } + + /** Verifies that the given server can process requests as a leader. */ + private void verifyServerIsLeader(CoordinatorServer server, String dbName) throws Exception { + createGatewayForServer(server) + .createDatabase( + new CreateDatabaseRequest().setDatabaseName(dbName).setIgnoreIfExists(true)) + .get(); + } + + /** Verifies that the given server rejects requests as a standby (non-leader). */ + private void verifyServerIsStandby(CoordinatorServer server) { + assertThatThrownBy( + () -> + createGatewayForServer(server) + .createDatabase( + new CreateDatabaseRequest() + .setDatabaseName( + "standby_check_" + + System.nanoTime()) + .setIgnoreIfExists(false)) + .get()) + .satisfies( + t -> + assertThat(getRootCause(t)) + .isInstanceOf(NotCoordinatorLeaderException.class)); + } + + /** + * Waits until a server is registered in ZK (its ephemeral node at /coordinators/ids/[serverId] + * exists). This ensures the server has reconnected after a session loss and re-joined the + * election, so it can become leader again when needed. 
+ */ + private void waitUntilServerRegistered(CoordinatorServer server) { + String path = "/coordinators/ids/" + server.getServerId(); + waitUntil( + () -> { + try { + return zookeeperClient.getCuratorClient().checkExists().forPath(path) + != null; + } catch (Exception e) { + return false; + } + }, + Duration.ofSeconds(30), + "Server " + server.getServerId() + " did not re-register in ZK"); + } +} diff --git a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorServerElectionTest.java b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorServerElectionTest.java new file mode 100644 index 000000000..ad89f57ad --- /dev/null +++ b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorServerElectionTest.java @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.fluss.server.coordinator; + +import org.apache.fluss.config.ConfigOptions; +import org.apache.fluss.config.Configuration; +import org.apache.fluss.server.zk.NOPErrorHandler; +import org.apache.fluss.server.zk.ZooKeeperClient; +import org.apache.fluss.server.zk.ZooKeeperExtension; +import org.apache.fluss.server.zk.data.CoordinatorAddress; +import org.apache.fluss.testutils.common.AllCallbackWrapper; + +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import java.time.Duration; +import java.util.Arrays; +import java.util.List; +import java.util.Objects; + +import static org.apache.fluss.testutils.common.CommonTestUtils.waitUntil; +import static org.assertj.core.api.Assertions.assertThat; + +class CoordinatorServerElectionTest { + @RegisterExtension + public static final AllCallbackWrapper<ZooKeeperExtension> ZOO_KEEPER_EXTENSION_WRAPPER = + new AllCallbackWrapper<>(new ZooKeeperExtension()); + + protected static ZooKeeperClient zookeeperClient; + + @BeforeAll + static void baseBeforeAll() { + zookeeperClient = + ZOO_KEEPER_EXTENSION_WRAPPER + .getCustomExtension() + .getZooKeeperClient(NOPErrorHandler.INSTANCE); + } + + @Test + void testCoordinatorServerElection() throws Exception { + CoordinatorServer coordinatorServer1 = new CoordinatorServer(createConfiguration()); + CoordinatorServer coordinatorServer2 = new CoordinatorServer(createConfiguration()); + CoordinatorServer coordinatorServer3 = new CoordinatorServer(createConfiguration()); + + List<CoordinatorServer> coordinatorServerList = + Arrays.asList(coordinatorServer1, coordinatorServer2, coordinatorServer3); + + // start 3 coordinator servers + for (int i = 0; i < 3; i++) { + CoordinatorServer server = coordinatorServerList.get(i); + server.start(); + } + + // a random coordinator becomes leader + waitUntilCoordinatorServerElected(); + + CoordinatorAddress firstLeaderAddress = 
zookeeperClient.getCoordinatorLeaderAddress().get(); + + // Find the leader and try to restart it. + CoordinatorServer firstLeader = null; + for (CoordinatorServer coordinatorServer : coordinatorServerList) { + if (Objects.equals(coordinatorServer.getServerId(), firstLeaderAddress.getId())) { + firstLeader = coordinatorServer; + break; + } + } + assertThat(firstLeader).isNotNull(); + firstLeader.close(); + firstLeader.start(); + + // Then we should get another Coordinator server leader elected + waitUntilCoordinatorServerReelected(firstLeaderAddress); + CoordinatorAddress secondLeaderAddress = + zookeeperClient.getCoordinatorLeaderAddress().get(); + assertThat(secondLeaderAddress).isNotEqualTo(firstLeaderAddress); + + CoordinatorServer secondLeader = null; + for (CoordinatorServer coordinatorServer : coordinatorServerList) { + if (Objects.equals(coordinatorServer.getServerId(), secondLeaderAddress.getId())) { + secondLeader = coordinatorServer; + break; + } + } + CoordinatorServer nonLeader = null; + for (CoordinatorServer coordinatorServer : coordinatorServerList) { + if (!Objects.equals(coordinatorServer.getServerId(), firstLeaderAddress.getId()) + && !Objects.equals( + coordinatorServer.getServerId(), secondLeaderAddress.getId())) { + nonLeader = coordinatorServer; + break; + } + } + // kill other 2 coordinator servers except the first one + nonLeader.close(); + secondLeader.close(); + + // the origin coordinator server should become leader again + waitUntilCoordinatorServerReelected(secondLeaderAddress); + CoordinatorAddress thirdLeaderAddress = zookeeperClient.getCoordinatorLeaderAddress().get(); + + assertThat(thirdLeaderAddress.getId()).isEqualTo(firstLeaderAddress.getId()); + } + + /** Create a configuration with Zookeeper address setting. 
*/ + protected static Configuration createConfiguration() { + Configuration configuration = new Configuration(); + configuration.setString( + ConfigOptions.ZOOKEEPER_ADDRESS, + ZOO_KEEPER_EXTENSION_WRAPPER.getCustomExtension().getConnectString()); + configuration.setString( + ConfigOptions.BIND_LISTENERS, "CLIENT://localhost:0,FLUSS://localhost:0"); + configuration.setString(ConfigOptions.ADVERTISED_LISTENERS, "CLIENT://198.168.0.1:100"); + configuration.set(ConfigOptions.REMOTE_DATA_DIR, "/tmp/fluss/remote-data"); + + return configuration; + } + + public void waitUntilCoordinatorServerElected() { + waitUntil( + () -> zookeeperClient.getCoordinatorLeaderAddress().isPresent(), + Duration.ofMinutes(1), + "Fail to wait coordinator server elected"); + } + + public void waitUntilCoordinatorServerReelected(CoordinatorAddress originAddress) { + waitUntil( + () -> + zookeeperClient.getCoordinatorLeaderAddress().isPresent() + && !zookeeperClient + .getCoordinatorLeaderAddress() + .get() + .equals(originAddress), + Duration.ofMinutes(1), + "Fail to wait coordinator server reelected"); + } +} diff --git a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorServerITCase.java b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorServerITCase.java index bd060d6ee..ae39bd185 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorServerITCase.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorServerITCase.java @@ -63,6 +63,7 @@ class CoordinatorServerITCase extends ServerITCaseBase { ConfigOptions.BIND_LISTENERS, String.format("%s://%s:%d", DEFAULT_LISTENER_NAME, HOSTNAME, getPort())); conf.set(ConfigOptions.REMOTE_DATA_DIR, "/tmp/fluss/remote-data"); + return conf; } diff --git a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorServerTest.java b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorServerTest.java index 
aee7f0d47..ecdc4e3dc 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorServerTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorServerTest.java @@ -26,8 +26,10 @@ import org.apache.fluss.server.zk.data.CoordinatorAddress; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; +import java.time.Duration; import java.util.Optional; +import static org.apache.fluss.testutils.common.CommonTestUtils.waitUntil; import static org.assertj.core.api.Assertions.assertThat; /** Test for {@link CoordinatorServer} . */ @@ -38,6 +40,7 @@ class CoordinatorServerTest extends ServerTestBase { @BeforeEach void beforeEach() throws Exception { coordinatorServer = startCoordinatorServer(createConfiguration()); + waitUntilCoordinatorServerElected(); } @AfterEach @@ -55,7 +58,11 @@ class CoordinatorServerTest extends ServerTestBase { @Override protected ServerBase getStartFailServer() { Configuration configuration = createConfiguration(); - configuration.set(ConfigOptions.BIND_LISTENERS, "CLIENT://localhost:-12"); + // CoordinatorServer starts leader services asynchronously in a separate election + // thread. An invalid port wouldn't cause start() to throw because the port binding + // happens asynchronously. Instead, use an empty ZK address to cause a synchronous + // failure in startZookeeperClient() during start(). 
+ configuration.setString(ConfigOptions.ZOOKEEPER_ADDRESS, ""); return new CoordinatorServer(configuration); } @@ -63,10 +70,18 @@ class CoordinatorServerTest extends ServerTestBase { protected void checkAfterStartServer() throws Exception { assertThat(coordinatorServer.getRpcServer()).isNotNull(); // check the data put in zk after coordinator server start - Optional<CoordinatorAddress> optCoordinatorAddr = zookeeperClient.getCoordinatorAddress(); + Optional<CoordinatorAddress> optCoordinatorAddr = + zookeeperClient.getCoordinatorLeaderAddress(); assertThat(optCoordinatorAddr).isNotEmpty(); verifyEndpoint( optCoordinatorAddr.get().getEndpoints(), coordinatorServer.getRpcServer().getBindEndpoints()); } + + public void waitUntilCoordinatorServerElected() { + waitUntil( + () -> zookeeperClient.getCoordinatorLeaderAddress().isPresent(), + Duration.ofSeconds(5), + "Fail to wait coordinator server elected"); + } } diff --git a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/event/watcher/TabletServerChangeWatcherTest.java b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/event/watcher/CoordinatorChangeWatcherTest.java similarity index 64% copy from fluss-server/src/test/java/org/apache/fluss/server/coordinator/event/watcher/TabletServerChangeWatcherTest.java copy to fluss-server/src/test/java/org/apache/fluss/server/coordinator/event/watcher/CoordinatorChangeWatcherTest.java index d7739028d..a9f181915 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/event/watcher/TabletServerChangeWatcherTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/event/watcher/CoordinatorChangeWatcherTest.java @@ -17,17 +17,14 @@ package org.apache.fluss.server.coordinator.event.watcher; -import org.apache.fluss.cluster.Endpoint; -import org.apache.fluss.cluster.ServerType; import org.apache.fluss.server.coordinator.event.CoordinatorEvent; -import org.apache.fluss.server.coordinator.event.DeadTabletServerEvent; 
-import org.apache.fluss.server.coordinator.event.NewTabletServerEvent; +import org.apache.fluss.server.coordinator.event.DeadCoordinatorEvent; +import org.apache.fluss.server.coordinator.event.NewCoordinatorEvent; import org.apache.fluss.server.coordinator.event.TestingEventManager; -import org.apache.fluss.server.metadata.ServerInfo; import org.apache.fluss.server.zk.NOPErrorHandler; import org.apache.fluss.server.zk.ZooKeeperClient; import org.apache.fluss.server.zk.ZooKeeperExtension; -import org.apache.fluss.server.zk.data.TabletServerRegistration; +import org.apache.fluss.server.zk.data.CoordinatorAddress; import org.apache.fluss.testutils.common.AllCallbackWrapper; import org.junit.jupiter.api.Test; @@ -35,46 +32,36 @@ import org.junit.jupiter.api.extension.RegisterExtension; import java.time.Duration; import java.util.ArrayList; -import java.util.Collections; import java.util.List; import static org.apache.fluss.testutils.common.CommonTestUtils.retry; import static org.assertj.core.api.Assertions.assertThat; -/** Test for {@link TabletServerChangeWatcher} . */ -class TabletServerChangeWatcherTest { +/** Test for {@link CoordinatorChangeWatcher} . 
*/ +class CoordinatorChangeWatcherTest { @RegisterExtension public static final AllCallbackWrapper<ZooKeeperExtension> ZOO_KEEPER_EXTENSION_WRAPPER = new AllCallbackWrapper<>(new ZooKeeperExtension()); @Test - void testServetChanges() throws Exception { + void testServerChanges() throws Exception { ZooKeeperClient zookeeperClient = ZOO_KEEPER_EXTENSION_WRAPPER .getCustomExtension() .getZooKeeperClient(NOPErrorHandler.INSTANCE); TestingEventManager eventManager = new TestingEventManager(); - TabletServerChangeWatcher tabletServerChangeWatcher = - new TabletServerChangeWatcher(zookeeperClient, eventManager); - tabletServerChangeWatcher.start(); + CoordinatorChangeWatcher coordinatorChangeWatcher = + new CoordinatorChangeWatcher(zookeeperClient, eventManager); + coordinatorChangeWatcher.start(); // register new servers List<CoordinatorEvent> expectedEvents = new ArrayList<>(); for (int i = 0; i < 10; i++) { - TabletServerRegistration tabletServerRegistration = - new TabletServerRegistration( - "rack" + i, - Collections.singletonList(new Endpoint("host" + i, 1234, "CLIENT")), - System.currentTimeMillis()); - expectedEvents.add( - new NewTabletServerEvent( - new ServerInfo( - i, - tabletServerRegistration.getRack(), - tabletServerRegistration.getEndpoints(), - ServerType.TABLET_SERVER))); - zookeeperClient.registerTabletServer(i, tabletServerRegistration); + expectedEvents.add(new NewCoordinatorEvent(String.valueOf(i))); + CoordinatorAddress coordinatorAddress = + new CoordinatorAddress(String.valueOf(i), new ArrayList<>()); + zookeeperClient.registerCoordinatorServer(coordinatorAddress); } retry( @@ -88,7 +75,7 @@ class TabletServerChangeWatcherTest { // unregister servers for (int i = 0; i < 10; i++) { - expectedEvents.add(new DeadTabletServerEvent(i)); + expectedEvents.add(new DeadCoordinatorEvent(String.valueOf(i))); } retry( @@ -97,6 +84,6 @@ class TabletServerChangeWatcherTest { assertThat(eventManager.getEvents()) 
.containsExactlyInAnyOrderElementsOf(expectedEvents)); - tabletServerChangeWatcher.stop(); + coordinatorChangeWatcher.stop(); } } diff --git a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/event/watcher/TabletServerChangeWatcherTest.java b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/event/watcher/TabletServerChangeWatcherTest.java index d7739028d..6da8dfacd 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/event/watcher/TabletServerChangeWatcherTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/event/watcher/TabletServerChangeWatcherTest.java @@ -49,7 +49,7 @@ class TabletServerChangeWatcherTest { new AllCallbackWrapper<>(new ZooKeeperExtension()); @Test - void testServetChanges() throws Exception { + void testServerChanges() throws Exception { ZooKeeperClient zookeeperClient = ZOO_KEEPER_EXTENSION_WRAPPER .getCustomExtension() diff --git a/fluss-server/src/test/java/org/apache/fluss/server/testutils/FlussClusterExtension.java b/fluss-server/src/test/java/org/apache/fluss/server/testutils/FlussClusterExtension.java index baabd58cb..8ce9f6a7c 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/testutils/FlussClusterExtension.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/testutils/FlussClusterExtension.java @@ -266,6 +266,7 @@ public final class FlussClusterExtension setRemoteDataDir(conf); coordinatorServer = new CoordinatorServer(conf, clock); coordinatorServer.start(); + waitUntilCoordinatorServerElected(); coordinatorServerInfo = // TODO, Currently, we use 0 as coordinator server id. 
new ServerInfo( @@ -276,6 +277,7 @@ public final class FlussClusterExtension } else { // start the existing coordinator server coordinatorServer.start(); + waitUntilCoordinatorServerElected(); coordinatorServerInfo = new ServerInfo( 0, @@ -941,6 +943,15 @@ public final class FlussClusterExtension return coordinatorServer; } + public void waitUntilCoordinatorServerElected() throws Exception { + coordinatorServer.getLeaderElectionFuture().get(); + + waitUntil( + () -> zooKeeperClient.getCoordinatorLeaderAddress().isPresent(), + Duration.ofSeconds(10), + "Fail to wait coordinator server elected"); + } + // -------------------------------------------------------------------------------------------- /** Builder for {@link FlussClusterExtension}. */ diff --git a/fluss-server/src/test/java/org/apache/fluss/server/zk/ZooKeeperClientTest.java b/fluss-server/src/test/java/org/apache/fluss/server/zk/ZooKeeperClientTest.java index ac8bf705a..c867b8dce 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/zk/ZooKeeperClientTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/zk/ZooKeeperClientTest.java @@ -109,14 +109,14 @@ class ZooKeeperClientTest { void testCoordinatorLeader() throws Exception { // try to get leader address, should return empty since node leader address stored in // zk - assertThat(zookeeperClient.getCoordinatorAddress()).isEmpty(); + assertThat(zookeeperClient.getCoordinatorLeaderAddress()).isEmpty(); CoordinatorAddress coordinatorAddress = new CoordinatorAddress( "2", Endpoint.fromListenersString("CLIENT://localhost1:10012")); // register leader address zookeeperClient.registerCoordinatorLeader(coordinatorAddress); // check get leader address - CoordinatorAddress gottenAddress = zookeeperClient.getCoordinatorAddress().get(); + CoordinatorAddress gottenAddress = zookeeperClient.getCoordinatorLeaderAddress().get(); assertThat(gottenAddress).isEqualTo(coordinatorAddress); } diff --git 
a/website/docs/maintenance/observability/monitor-metrics.md b/website/docs/maintenance/observability/monitor-metrics.md
index 97240938b..5e9e668be 100644
--- a/website/docs/maintenance/observability/monitor-metrics.md
+++ b/website/docs/maintenance/observability/monitor-metrics.md
@@ -294,10 +294,15 @@ Some metrics might not be exposed when using other JVM implementations (e.g. IBM
 </thead>
 <tbody>
 <tr>
-        <th rowspan="24"><strong>coordinator</strong></th>
-        <td style={{textAlign: 'center', verticalAlign: 'middle' }} rowspan="9">-</td>
+        <th rowspan="25"><strong>coordinator</strong></th>
+        <td style={{textAlign: 'center', verticalAlign: 'middle' }} rowspan="10">-</td>
         <td>activeCoordinatorCount</td>
-        <td>The number of active CoordinatorServer in this cluster.</td>
+        <td>The number of active CoordinatorServers (only the leader) in this cluster.</td>
+        <td>Gauge</td>
+    </tr>
+    <tr>
+        <td>aliveCoordinatorCount</td>
+        <td>The number of alive CoordinatorServers (including the leader and standbys) in this cluster.</td>
         <td>Gauge</td>
     </tr>
     <tr>
