This is an automated email from the ASF dual-hosted git repository.

kadir pushed a commit to branch PHOENIX-7562-feature
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/PHOENIX-7562-feature by this push:
     new 5ffd6490ed PHOENIX-7566 HAGroupState subscription feature and state management for ReplicationLogReader (#2274)
5ffd6490ed is described below

commit 5ffd6490ede889ab65f8a2de5a8b27cdb968150e
Author: ritegarg <58840065+riteg...@users.noreply.github.com>
AuthorDate: Tue Sep 16 22:44:00 2025 -0700

    PHOENIX-7566 HAGroupState subscription feature and state management for ReplicationLogReader (#2274)

    Co-authored-by: Ritesh Garg <ritesh.g...@riteshg-ltmd34g.internal.salesforce.com>
---
 .../java/org/apache/phoenix/jdbc/ClusterType.java  |  35 ++
 .../apache/phoenix/jdbc/HAGroupStateListener.java  |  57 ++
 .../apache/phoenix/jdbc/HAGroupStoreClient.java    | 250 ++++++--
 .../apache/phoenix/jdbc/HAGroupStoreManager.java   | 211 +++++--
 .../apache/phoenix/jdbc/HAGroupStoreRecord.java    |  33 +-
 ...gionServerEndpointITWithConsistentFailover.java |   4 +-
 .../IndexRegionObserverMutationBlockingIT.java     |   4 +-
 .../phoenix/jdbc/HAGroupStateSubscriptionIT.java   | 644 +++++++++++++++++++++
 .../apache/phoenix/jdbc/HAGroupStoreClientIT.java  |   9 +-
 .../apache/phoenix/jdbc/HAGroupStoreManagerIT.java | 277 ++++++++-
 .../org/apache/phoenix/jdbc/PhoenixHAAdminIT.java  |   6 +-
 .../apache/phoenix/util/HAGroupStoreTestUtil.java  |  17 +-
 12 files changed, 1423 insertions(+), 124 deletions(-)

diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/ClusterType.java b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/ClusterType.java
new file mode 100644
index 0000000000..e194c23e35
--- /dev/null
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/ClusterType.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.jdbc;
+
+/**
+ * Enumeration representing the type of cluster in an HA group configuration.
+ * Used to distinguish between local and peer clusters when subscribing to
+ * HA group state change notifications.
+ */
+public enum ClusterType {
+    /**
+     * Represents the local cluster where the client is running.
+     */
+    LOCAL,
+
+    /**
+     * Represents the peer cluster in the HA group configuration.
+     */
+    PEER
+}
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStateListener.java b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStateListener.java
new file mode 100644
index 0000000000..634f50e0d0
--- /dev/null
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStateListener.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.jdbc; + +import org.apache.phoenix.jdbc.HAGroupStoreRecord.HAGroupState; + +/** + * Interface for external clients who want to be notified of HA group state transitions. + * + * <p>Listeners can subscribe to be notified when:</p> + * <ul> + * <li>Specific state transitions occur (from one state to another)</li> + * <li>Any transition to a target state occurs (from any state to a specific state)</li> + * </ul> + * + * <p>Notifications are provided for both local and peer cluster state changes, + * distinguished by the {@link ClusterType} parameter.</p> + * + * @see HAGroupStoreManager#subscribeToTargetState + */ +public interface HAGroupStateListener { + + /** + * Called when an HA group state transition occurs. + * + * <p>Implementations should be fast and non-blocking to avoid impacting + * the HA group state management system. If heavy processing is required, + * consider delegating to a separate thread.</p> + * + * @param haGroupName the name of the HA group that transitioned + * @param toState the new state after the transition + * @param modifiedTime the time the state transition occurred + * @param clusterType whether this transition occurred on the local or peer cluster + * + * @throws Exception implementations may throw exceptions, but they will be + * logged and will not prevent other listeners from being notified + */ + void onStateChange(String haGroupName, + HAGroupState toState, + long modifiedTime, + ClusterType clusterType); +} diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreClient.java b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreClient.java index 123dc28d5c..e4743bc7fd 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreClient.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreClient.java @@ -25,10 +25,13 @@ import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; import java.util.ArrayList; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -76,14 +79,14 @@ import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_ZK; */ public class HAGroupStoreClient implements Closeable { - public static final String ZK_CONSISTENT_HA_NAMESPACE = - "phoenix" + ZKPaths.PATH_SEPARATOR + "consistentHA"; + public static final String ZK_CONSISTENT_HA_GROUP_STATE_NAMESPACE = "phoenix" + + ZKPaths.PATH_SEPARATOR + "consistentHA" + + ZKPaths.PATH_SEPARATOR + "groupState"; private static final long HA_GROUP_STORE_CLIENT_INITIALIZATION_TIMEOUT_MS = 30000L; // Multiplier for ZK session timeout to account for time it will take for HMaster to abort // the region server in case ZK 
connection is lost from the region server. - private static final double ZK_SESSION_TIMEOUT_MULTIPLIER = 1.1; - private static final String CACHE_TYPE_LOCAL = "LOCAL"; - private static final String CACHE_TYPE_PEER = "PEER"; + @VisibleForTesting + static final double ZK_SESSION_TIMEOUT_MULTIPLIER = 1.1; private PhoenixHAAdmin phoenixHaAdmin; private PhoenixHAAdmin peerPhoenixHaAdmin; private static final Logger LOGGER = LoggerFactory.getLogger(HAGroupStoreClient.class); @@ -108,6 +111,14 @@ public class HAGroupStoreClient implements Closeable { private final PathChildrenCacheListener peerCustomPathChildrenCacheListener; // Wait time for sync mode private final long waitTimeForSyncModeInMs; + // State tracking for transition detection + private volatile HAGroupState lastKnownLocalState; + private volatile HAGroupState lastKnownPeerState; + + // Subscription storage for HA group state change notifications per client instance + // Map key format: "clusterType:targetState" -> Set<Listeners> + private final ConcurrentHashMap<String, CopyOnWriteArraySet<HAGroupStateListener>> + targetStateSubscribers = new ConcurrentHashMap<>(); // Policy for the HA group private HighAvailabilityPolicy policy; private ClusterRole clusterRole; @@ -131,7 +142,7 @@ public class HAGroupStoreClient implements Closeable { * @return HAGroupStoreClient instance */ public static HAGroupStoreClient getInstanceForZkUrl(Configuration conf, String haGroupName, - String zkUrl) throws SQLException { + String zkUrl) { Preconditions.checkNotNull(haGroupName, "haGroupName cannot be null"); String localZkUrl = Objects.toString(zkUrl, getLocalZkUrl(conf)); Preconditions.checkNotNull(localZkUrl, "zkUrl cannot be null"); @@ -159,7 +170,7 @@ public class HAGroupStoreClient implements Closeable { /** * Get the list of HAGroupNames from system table. * We can also get the list of HAGroupNames from the system table by providing the zkUrl in - * where clause but we need to match the formatted zkUrl with the zkUrl in the system table so + * where clause, but we need to match the formatted zkUrl with the zkUrl in the system table so * that matching is done correctly. 
* * @param zkUrl for connecting to Table @@ -206,10 +217,11 @@ public class HAGroupStoreClient implements Closeable { // Initialize HAGroupStoreClient attributes initializeHAGroupStoreClientAttributes(haGroupName); // Initialize Phoenix HA Admin - this.phoenixHaAdmin = new PhoenixHAAdmin(this.zkUrl, conf, ZK_CONSISTENT_HA_NAMESPACE); + this.phoenixHaAdmin = new PhoenixHAAdmin(this.zkUrl, + conf, ZK_CONSISTENT_HA_GROUP_STATE_NAMESPACE); // Initialize local cache this.pathChildrenCache = initializePathChildrenCache(phoenixHaAdmin, - pathChildrenCacheListener, CACHE_TYPE_LOCAL); + pathChildrenCacheListener, ClusterType.LOCAL); // Initialize ZNode if not present in ZK initializeZNodeIfNeeded(); if (this.pathChildrenCache != null) { @@ -263,7 +275,7 @@ public class HAGroupStoreClient implements Closeable { if (!isHealthy) { throw new IOException("HAGroupStoreClient is not healthy"); } - return fetchCacheRecord(this.pathChildrenCache, CACHE_TYPE_LOCAL).getLeft(); + return fetchCacheRecord(this.pathChildrenCache, ClusterType.LOCAL).getLeft(); } /** @@ -283,7 +295,7 @@ public class HAGroupStoreClient implements Closeable { throw new IOException("HAGroupStoreClient is not healthy"); } Pair<HAGroupStoreRecord, Stat> cacheRecord = fetchCacheRecord( - this.pathChildrenCache, CACHE_TYPE_LOCAL); + this.pathChildrenCache, ClusterType.LOCAL); HAGroupStoreRecord currentHAGroupStoreRecord = cacheRecord.getLeft(); Stat currentHAGroupStoreRecordStat = cacheRecord.getRight(); if (currentHAGroupStoreRecord == null) { @@ -293,10 +305,29 @@ public class HAGroupStoreClient implements Closeable { } if (isUpdateNeeded(currentHAGroupStoreRecord.getHAGroupState(), currentHAGroupStoreRecordStat.getMtime(), haGroupState)) { + // We maintain last sync time as the last time cluster was in sync state. + // If state changes from ACTIVE_IN_SYNC to ACTIVE_NOT_IN_SYNC, record that time + // Once state changes back to ACTIVE_IN_SYNC or the role is + // NOT ACTIVE or ACTIVE_TO_STANDBY + // set the time to null to mark that we are current(or we don't have any reader). + // TODO: Verify that for reader this is the correct approach. 
+ Long lastSyncTimeInMs = currentHAGroupStoreRecord + .getLastSyncStateTimeInMs(); + ClusterRole clusterRole = haGroupState.getClusterRole(); + if (currentHAGroupStoreRecord.getHAGroupState() + == HAGroupStoreRecord.HAGroupState.ACTIVE_IN_SYNC + && haGroupState == HAGroupStoreRecord.HAGroupState.ACTIVE_NOT_IN_SYNC) { + lastSyncTimeInMs = System.currentTimeMillis(); + } else if (haGroupState == HAGroupState.ACTIVE_IN_SYNC + || !(ClusterRole.ACTIVE.equals(clusterRole) + || ClusterRole.ACTIVE_TO_STANDBY.equals(clusterRole))) { + lastSyncTimeInMs = null; + } HAGroupStoreRecord newHAGroupStoreRecord = new HAGroupStoreRecord( currentHAGroupStoreRecord.getProtocolVersion(), currentHAGroupStoreRecord.getHaGroupName(), - haGroupState + haGroupState, + lastSyncTimeInMs ); // TODO: Check if cluster role is changing, if so, we need to update // the system table first @@ -339,11 +370,11 @@ public class HAGroupStoreClient implements Closeable { * @return HAGroupStoreRecord for the specified HA group name, or null if not found * @throws IOException if the client is not healthy */ - private HAGroupStoreRecord getHAGroupStoreRecordFromPeer() throws IOException { + public HAGroupStoreRecord getHAGroupStoreRecordFromPeer() throws IOException { if (!isHealthy) { throw new IOException("HAGroupStoreClient is not healthy"); } - return fetchCacheRecord(this.peerPathChildrenCache, CACHE_TYPE_PEER).getLeft(); + return fetchCacheRecord(this.peerPathChildrenCache, ClusterType.PEER).getLeft(); } private void initializeZNodeIfNeeded() throws IOException, @@ -353,13 +384,21 @@ public class HAGroupStoreClient implements Closeable { Pair<HAGroupStoreRecord, Stat> cacheRecordFromZK = phoenixHaAdmin.getHAGroupStoreRecordInZooKeeper(this.haGroupName); HAGroupStoreRecord haGroupStoreRecord = cacheRecordFromZK.getLeft(); + HAGroupState defaultHAGroupState = this.clusterRole.getDefaultHAGroupState(); + // Initialize lastSyncTimeInMs only if we start in ACTIVE_NOT_IN_SYNC state + // and ZNode is not already present + Long lastSyncTimeInMs = defaultHAGroupState.equals(HAGroupState.ACTIVE_NOT_IN_SYNC) + ? System.currentTimeMillis() + : null; HAGroupStoreRecord newHAGroupStoreRecord = new HAGroupStoreRecord( HAGroupStoreRecord.DEFAULT_PROTOCOL_VERSION, haGroupName, - this.clusterRole.getDefaultHAGroupState() + this.clusterRole.getDefaultHAGroupState(), + lastSyncTimeInMs ); // Only update current ZNode if it doesn't have the same role as present in System Table. // If not exists, then create ZNode + // TODO: Discuss if this approach is what reader needs. 
if (haGroupStoreRecord != null && !haGroupStoreRecord.getClusterRole().equals(this.clusterRole)) { phoenixHaAdmin.updateHAGroupStoreRecordInZooKeeper(haGroupName, @@ -431,16 +470,17 @@ public class HAGroupStoreClient implements Closeable { try { // Setup peer connection if needed (first time or ZK Url changed) if (peerPathChildrenCache == null - || (peerPhoenixHaAdmin != null && - !StringUtils.equals(this.peerZKUrl, peerPhoenixHaAdmin.getZkUrl()))) { + || peerPhoenixHaAdmin != null + && !StringUtils.equals(this.peerZKUrl, peerPhoenixHaAdmin.getZkUrl())) { // Clean up existing peer connection if it exists closePeerConnection(); // Setup new peer connection this.peerPhoenixHaAdmin - = new PhoenixHAAdmin(this.peerZKUrl, conf, ZK_CONSISTENT_HA_NAMESPACE); + = new PhoenixHAAdmin(this.peerZKUrl, conf, + ZK_CONSISTENT_HA_GROUP_STATE_NAMESPACE); // Create new PeerPathChildrenCache this.peerPathChildrenCache = initializePathChildrenCache(peerPhoenixHaAdmin, - this.peerCustomPathChildrenCacheListener, CACHE_TYPE_PEER); + this.peerCustomPathChildrenCacheListener, ClusterType.PEER); } } catch (Exception e) { closePeerConnection(); @@ -459,7 +499,8 @@ public class HAGroupStoreClient implements Closeable { } private PathChildrenCache initializePathChildrenCache(PhoenixHAAdmin admin, - PathChildrenCacheListener customListener, String cacheType) { + PathChildrenCacheListener customListener, + ClusterType cacheType) { LOGGER.info("Initializing {} PathChildrenCache with URL {}", cacheType, admin.getZkUrl()); PathChildrenCache newPathChildrenCache = null; try { @@ -487,30 +528,38 @@ public class HAGroupStoreClient implements Closeable { } } - private PathChildrenCacheListener createCacheListener(CountDownLatch latch, String cacheType) { + private PathChildrenCacheListener createCacheListener(CountDownLatch latch, + ClusterType cacheType) { return (client, event) -> { final ChildData childData = event.getData(); - HAGroupStoreRecord eventRecord = extractHAGroupStoreRecordOrNull(childData); + Pair<HAGroupStoreRecord, Stat> eventRecordAndStat + = extractHAGroupStoreRecordOrNull(childData); + HAGroupStoreRecord eventRecord = eventRecordAndStat.getLeft(); + Stat eventStat = eventRecordAndStat.getRight(); LOGGER.info("HAGroupStoreClient Cache {} received event {} type {} at {}", cacheType, eventRecord, event.getType(), System.currentTimeMillis()); switch (event.getType()) { - // TODO: Add support for event watcher for HAGroupStoreRecord - // case CHILD_ADDED: - // case CHILD_UPDATED: - // case CHILD_REMOVED: + case CHILD_ADDED: + case CHILD_UPDATED: + if (eventRecord != null) { + handleStateChange(eventRecord, eventStat, cacheType); + } + break; + case CHILD_REMOVED: + break; case INITIALIZED: latch.countDown(); break; case CONNECTION_LOST: case CONNECTION_SUSPENDED: - if (CACHE_TYPE_LOCAL.equals(cacheType)) { + if (ClusterType.LOCAL.equals(cacheType)) { isHealthy = false; } LOGGER.warn("{} HAGroupStoreClient cache connection lost/suspended", cacheType); break; case CONNECTION_RECONNECTED: - if (CACHE_TYPE_LOCAL.equals(cacheType)) { + if (ClusterType.LOCAL.equals(cacheType)) { isHealthy = true; } LOGGER.info("{} HAGroupStoreClient cache connection reconnected", cacheType); @@ -524,7 +573,7 @@ public class HAGroupStoreClient implements Closeable { private Pair<HAGroupStoreRecord, Stat> fetchCacheRecord(PathChildrenCache cache, - String cacheType) { + ClusterType cacheType) { if (cache == null) { LOGGER.warn("{} HAGroupStoreClient cache is null, returning null", cacheType); return Pair.of(null, null); @@ -537,7 
+586,7 @@ public class HAGroupStoreClient implements Closeable { return result; } - if (cacheType.equals(CACHE_TYPE_PEER)) { + if (cacheType.equals(ClusterType.PEER)) { return Pair.of(null, null); } // If no record found, try to rebuild and fetch again @@ -555,23 +604,25 @@ public class HAGroupStoreClient implements Closeable { } private Pair<HAGroupStoreRecord, Stat> extractRecordAndStat(PathChildrenCache cache, - String targetPath, String cacheType) { + String targetPath, + ClusterType cacheType) { ChildData childData = cache.getCurrentData(targetPath); if (childData != null) { - HAGroupStoreRecord record = extractHAGroupStoreRecordOrNull(childData); - Stat currentStat = childData.getStat(); - LOGGER.info("Built {} cluster record: {}", cacheType, record); - return Pair.of(record, currentStat); + Pair<HAGroupStoreRecord, Stat> recordAndStat + = extractHAGroupStoreRecordOrNull(childData); + LOGGER.info("Built {} cluster record: {}", cacheType, recordAndStat.getLeft()); + return recordAndStat; } return Pair.of(null, null); } - private HAGroupStoreRecord extractHAGroupStoreRecordOrNull(final ChildData childData) { + private Pair<HAGroupStoreRecord, Stat> extractHAGroupStoreRecordOrNull( + final ChildData childData) { if (childData != null) { byte[] data = childData.getData(); - return HAGroupStoreRecord.fromJson(data).orElse(null); + return Pair.of(HAGroupStoreRecord.fromJson(data).orElse(null), childData.getStat()); } - return null; + return Pair.of(null, null); } @@ -638,4 +689,127 @@ public class HAGroupStoreClient implements Closeable { return ((System.currentTimeMillis() - currentHAGroupStoreRecordMtime) > waitTime); } + // ========== HA Group State Change Subscription Methods ========== + + /** + * Subscribe to be notified when any transition to a target state occurs. + * + * @param targetState the target state to watch for + * @param clusterType whether to monitor local or peer cluster + * @param listener the listener to notify when any transition to the target state occurs + */ + public void subscribeToTargetState(HAGroupState targetState, + ClusterType clusterType, HAGroupStateListener listener) { + String key = buildTargetStateKey(clusterType, targetState); + targetStateSubscribers.computeIfAbsent(key, k -> new CopyOnWriteArraySet<>()).add(listener); + LOGGER.info("Subscribed listener to target state {} for HA group {} on {} cluster", + targetState, haGroupName, clusterType); + } + + /** + * Unsubscribe from target state notifications. + * + * @param targetState the target state + * @param clusterType whether monitoring local or peer cluster + * @param listener the listener to remove + */ + public void unsubscribeFromTargetState(HAGroupState targetState, + ClusterType clusterType, HAGroupStateListener listener) { + String key = buildTargetStateKey(clusterType, targetState); + CopyOnWriteArraySet<HAGroupStateListener> listeners = targetStateSubscribers.get(key); + if (listeners != null && listeners.remove(listener)) { + if (listeners.isEmpty()) { + targetStateSubscribers.remove(key); + } + LOGGER.info("Unsubscribed listener from target state {} for HA group {} on {} cluster", + targetState, haGroupName, clusterType); + } + } + + /** + * Handle state change detection and notify subscribers if a transition occurred. 
+ * + * @param newRecord the new HA group store record + * @param cacheType the type of cache (LOCAL or PEER) + */ + private void handleStateChange(HAGroupStoreRecord newRecord, + Stat newStat, ClusterType cacheType) { + HAGroupState newState = newRecord.getHAGroupState(); + HAGroupState oldState; + ClusterType clusterType; + + if (ClusterType.LOCAL.equals(cacheType)) { + oldState = lastKnownLocalState; + lastKnownLocalState = newState; + clusterType = ClusterType.LOCAL; + } else { + oldState = lastKnownPeerState; + lastKnownPeerState = newState; + clusterType = ClusterType.PEER; + } + + // Only notify if there's an actual state transition or initial state + if (oldState == null || !oldState.equals(newState)) { + LOGGER.info("Detected state transition for HA group {} from {} to {} on {} cluster", + haGroupName, oldState, newState, clusterType); + notifySubscribers(oldState, newState, newStat.getMtime(), clusterType); + } + } + + /** + * Notify all relevant subscribers of a state transition. + * + * @param fromState the state transitioned from + * @param toState the state transitioned to + * @param clusterType the cluster type where the transition occurred + */ + private void notifySubscribers(HAGroupState fromState, + HAGroupState toState, + long modifiedTime, + ClusterType clusterType) { + LOGGER.debug("Notifying subscribers of state transition " + + "for HA group {} from {} to {} on {} cluster", + haGroupName, fromState, toState, clusterType); + String targetStateKey = buildTargetStateKey(clusterType, toState); + + // Collect all listeners that need to be notified + Set<HAGroupStateListener> listenersToNotify = new HashSet<>(); + + // Find target state subscribers + CopyOnWriteArraySet<HAGroupStateListener> targetListeners + = targetStateSubscribers.get(targetStateKey); + if (targetListeners != null) { + listenersToNotify.addAll(targetListeners); + } + + // Notify all listeners with error isolation + if (!listenersToNotify.isEmpty()) { + LOGGER.info("Notifying {} listeners of state transition" + + "for HA group {} from {} to {} on {} cluster", + listenersToNotify.size(), haGroupName, fromState, toState, clusterType); + + for (HAGroupStateListener listener : listenersToNotify) { + try { + listener.onStateChange(haGroupName, + toState, modifiedTime, clusterType); + } catch (Exception e) { + LOGGER.error("Error notifying listener of state transition " + + "for HA group {} from {} to {} on {} cluster", + haGroupName, fromState, toState, clusterType, e); + // Continue notifying other listeners + } + } + } + } + + // ========== Helper Methods ========== + + /** + * Build key for target state subscriptions. + */ + private String buildTargetStateKey(ClusterType clusterType, + HAGroupState targetState) { + return clusterType + ":" + targetState; + } + } \ No newline at end of file diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreManager.java b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreManager.java index e04d1a2dac..7045a301f3 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreManager.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreManager.java @@ -36,7 +36,8 @@ import static org.apache.phoenix.query.QueryServicesOptions /** * Implementation of HAGroupStoreManager that uses HAGroupStoreClient. - * Manages all HAGroupStoreClient instances. + * Manages all HAGroupStoreClient instances and provides passthrough + * functionality for HA group state change notifications. 
*/ public class HAGroupStoreManager { private static volatile HAGroupStoreManager haGroupStoreManagerInstance; @@ -78,7 +79,6 @@ public class HAGroupStoreManager { return HAGroupStoreClient.getHAGroupNames(this.zkUrl); } - /** * Checks whether mutation is blocked or not for a specific HA group. * @@ -87,17 +87,15 @@ public class HAGroupStoreManager { * @return true if mutation is blocked, false otherwise. * @throws IOException when HAGroupStoreClient is not healthy. */ - public boolean isMutationBlocked(String haGroupName) throws IOException, SQLException { + public boolean isMutationBlocked(String haGroupName) throws IOException { if (mutationBlockEnabled) { - HAGroupStoreClient haGroupStoreClient = HAGroupStoreClient.getInstanceForZkUrl(conf, - haGroupName, zkUrl); - if (haGroupStoreClient != null) { - return haGroupStoreClient.getHAGroupStoreRecord() != null - && haGroupStoreClient.getHAGroupStoreRecord().getClusterRole() != null - && haGroupStoreClient.getHAGroupStoreRecord().getClusterRole() - .isMutationBlocked(); - } - throw new IOException("HAGroupStoreClient is not initialized"); + HAGroupStoreClient haGroupStoreClient = getHAGroupStoreClient(haGroupName); + HAGroupStoreRecord recordWithMetadata + = haGroupStoreClient.getHAGroupStoreRecord(); + return recordWithMetadata != null + && recordWithMetadata.getClusterRole() != null + && recordWithMetadata.getClusterRole() + .isMutationBlocked(); } return false; } @@ -138,13 +136,8 @@ public class HAGroupStoreManager { */ public void invalidateHAGroupStoreClient(final String haGroupName, boolean broadcastUpdate) throws Exception { - HAGroupStoreClient haGroupStoreClient = HAGroupStoreClient.getInstanceForZkUrl(conf, - haGroupName, zkUrl); - if (haGroupStoreClient != null) { - haGroupStoreClient.rebuild(); - } else { - throw new IOException("HAGroupStoreClient is not initialized"); - } + HAGroupStoreClient haGroupStoreClient = getHAGroupStoreClient(haGroupName); + haGroupStoreClient.rebuild(); } /** @@ -156,13 +149,23 @@ public class HAGroupStoreManager { * @throws IOException when HAGroupStoreClient is not healthy. */ public Optional<HAGroupStoreRecord> getHAGroupStoreRecord(final String haGroupName) - throws IOException, SQLException { - HAGroupStoreClient haGroupStoreClient = HAGroupStoreClient.getInstanceForZkUrl(conf, - haGroupName, zkUrl); - if (haGroupStoreClient != null) { - return Optional.ofNullable(haGroupStoreClient.getHAGroupStoreRecord()); - } - throw new IOException("HAGroupStoreClient is not initialized"); + throws IOException { + HAGroupStoreClient haGroupStoreClient = getHAGroupStoreClient(haGroupName); + return Optional.ofNullable(haGroupStoreClient.getHAGroupStoreRecord()); + } + + /** + * Returns the HAGroupStoreRecord for a specific HA group from peer cluster. + * + * @param haGroupName name of the HA group + * @return Optional HAGroupStoreRecord for the HA group from peer cluster can be empty if + * the HA group is not found or peer cluster is not available. + * @throws IOException when HAGroupStoreClient is not healthy. 
+ */ + public Optional<HAGroupStoreRecord> getPeerHAGroupStoreRecord(final String haGroupName) + throws IOException { + HAGroupStoreClient haGroupStoreClient = getHAGroupStoreClient(haGroupName); + return Optional.ofNullable(haGroupStoreClient.getHAGroupStoreRecordFromPeer()); } /** @@ -173,15 +176,10 @@ public class HAGroupStoreManager { */ public void setHAGroupStatusToStoreAndForward(final String haGroupName) throws IOException, StaleHAGroupStoreRecordVersionException, - InvalidClusterRoleTransitionException, SQLException { - HAGroupStoreClient haGroupStoreClient = HAGroupStoreClient.getInstanceForZkUrl(conf, - haGroupName, zkUrl); - if (haGroupStoreClient != null) { - haGroupStoreClient.setHAGroupStatusIfNeeded( - HAGroupStoreRecord.HAGroupState.ACTIVE_NOT_IN_SYNC); - } else { - throw new IOException("HAGroupStoreClient is not initialized"); - } + InvalidClusterRoleTransitionException { + HAGroupStoreClient haGroupStoreClient = getHAGroupStoreClient(haGroupName); + haGroupStoreClient.setHAGroupStatusIfNeeded( + HAGroupStoreRecord.HAGroupState.ACTIVE_NOT_IN_SYNC); } /** @@ -190,19 +188,78 @@ public class HAGroupStoreManager { * @param haGroupName name of the HA group * @throws IOException when HAGroupStoreClient is not healthy. */ - public void setHAGroupStatusRecordToSync(final String haGroupName) + public void setHAGroupStatusToSync(final String haGroupName) throws IOException, StaleHAGroupStoreRecordVersionException, - InvalidClusterRoleTransitionException, SQLException { - HAGroupStoreClient haGroupStoreClient = HAGroupStoreClient.getInstanceForZkUrl(conf, - haGroupName, zkUrl); - if (haGroupStoreClient != null) { - haGroupStoreClient.setHAGroupStatusIfNeeded( - HAGroupStoreRecord.HAGroupState.ACTIVE_IN_SYNC); - } else { - throw new IOException("HAGroupStoreClient is not initialized"); + InvalidClusterRoleTransitionException { + HAGroupStoreClient haGroupStoreClient = getHAGroupStoreClient(haGroupName); + haGroupStoreClient.setHAGroupStatusIfNeeded( + HAGroupStoreRecord.HAGroupState.ACTIVE_IN_SYNC); + } + + /** + * Sets the HAGroupStoreRecord to degrade reader functionality in local cluster. + * Transitions from STANDBY to DEGRADED_STANDBY_FOR_READER or from + * DEGRADED_STANDBY_FOR_WRITER to DEGRADED_STANDBY. + * + * @param haGroupName name of the HA group + * @throws IOException when HAGroupStoreClient is not healthy. + * @throws InvalidClusterRoleTransitionException when the current state + * cannot transition to a degraded reader state + */ + public void setReaderToDegraded(final String haGroupName) + throws IOException, StaleHAGroupStoreRecordVersionException, + InvalidClusterRoleTransitionException { + HAGroupStoreClient haGroupStoreClient = getHAGroupStoreClient(haGroupName); + HAGroupStoreRecord currentRecord + = haGroupStoreClient.getHAGroupStoreRecord(); + + if (currentRecord == null) { + throw new IOException("Current HAGroupStoreRecord is null for HA group: " + + haGroupName); + } + + HAGroupStoreRecord.HAGroupState currentState = currentRecord.getHAGroupState(); + HAGroupStoreRecord.HAGroupState targetState + = HAGroupStoreRecord.HAGroupState.DEGRADED_STANDBY_FOR_READER; + + if (currentState == HAGroupStoreRecord.HAGroupState.DEGRADED_STANDBY_FOR_WRITER) { + targetState = HAGroupStoreRecord.HAGroupState.DEGRADED_STANDBY; } + + haGroupStoreClient.setHAGroupStatusIfNeeded(targetState); } + /** + * Sets the HAGroupStoreRecord to restore reader functionality in local cluster. 
+ * Transitions from DEGRADED_STANDBY_FOR_READER to STANDBY or from + * DEGRADED_STANDBY to DEGRADED_STANDBY_FOR_WRITER. + * + * @param haGroupName name of the HA group + * @throws IOException when HAGroupStoreClient is not healthy. + * @throws InvalidClusterRoleTransitionException when the current state + * cannot transition to a healthy reader state + */ + public void setReaderToHealthy(final String haGroupName) + throws IOException, StaleHAGroupStoreRecordVersionException, + InvalidClusterRoleTransitionException { + HAGroupStoreClient haGroupStoreClient = getHAGroupStoreClient(haGroupName); + HAGroupStoreRecord currentRecord + = haGroupStoreClient.getHAGroupStoreRecord(); + + if (currentRecord == null) { + throw new IOException("Current HAGroupStoreRecord is null " + + "for HA group: " + haGroupName); + } + + HAGroupStoreRecord.HAGroupState currentState = currentRecord.getHAGroupState(); + HAGroupStoreRecord.HAGroupState targetState = HAGroupStoreRecord.HAGroupState.STANDBY; + + if (currentState == HAGroupStoreRecord.HAGroupState.DEGRADED_STANDBY) { + targetState = HAGroupStoreRecord.HAGroupState.DEGRADED_STANDBY_FOR_WRITER; + } + + haGroupStoreClient.setHAGroupStatusIfNeeded(targetState); + } /** * Returns the ClusterRoleRecord for the cluster pair. @@ -214,12 +271,70 @@ public class HAGroupStoreManager { * @throws IOException when HAGroupStoreClient is not healthy. */ public ClusterRoleRecord getClusterRoleRecord(String haGroupName) - throws IOException, SQLException { + throws IOException { + HAGroupStoreClient haGroupStoreClient = getHAGroupStoreClient(haGroupName); + return haGroupStoreClient.getClusterRoleRecord(); + } + + /** + * Subscribe to be notified when any transition to a target state occurs. + * + * @param haGroupName the name of the HA group to monitor + * @param targetState the target state to watch for + * @param clusterType whether to monitor local or peer cluster + * @param listener the listener to notify when any transition to the target state occurs + * @throws IOException if unable to get HAGroupStoreClient instance + */ + public void subscribeToTargetState(String haGroupName, + HAGroupStoreRecord.HAGroupState targetState, + ClusterType clusterType, + HAGroupStateListener listener) throws IOException { + HAGroupStoreClient client = getHAGroupStoreClient(haGroupName); + client.subscribeToTargetState(targetState, clusterType, listener); + LOGGER.debug("Delegated subscription to target state {} " + + "for HA group {} on {} cluster to client", + targetState, haGroupName, clusterType); + } + + /** + * Unsubscribe from target state notifications. + * + * @param haGroupName the name of the HA group + * @param targetState the target state + * @param clusterType whether monitoring local or peer cluster + * @param listener the listener to remove + */ + public void unsubscribeFromTargetState(String haGroupName, + HAGroupStoreRecord.HAGroupState targetState, + ClusterType clusterType, + HAGroupStateListener listener) { + try { + HAGroupStoreClient client = getHAGroupStoreClient(haGroupName); + client.unsubscribeFromTargetState(targetState, clusterType, listener); + LOGGER.debug("Delegated unsubscription from target state {} " + + "for HA group {} on {} cluster to client", + targetState, haGroupName, clusterType); + } catch (IOException e) { + LOGGER.warn("HAGroupStoreClient not found for HA group: {} - cannot unsubscribe: {}", + haGroupName, e.getMessage()); + } + } + + /** + * Helper method to get HAGroupStoreClient instance with consistent error handling. 
+ * + * @param haGroupName name of the HA group + * @return HAGroupStoreClient instance for the specified HA group + * @throws IOException when HAGroupStoreClient is not initialized + */ + private HAGroupStoreClient getHAGroupStoreClient(final String haGroupName) + throws IOException { HAGroupStoreClient haGroupStoreClient = HAGroupStoreClient.getInstanceForZkUrl(conf, haGroupName, zkUrl); - if (haGroupStoreClient != null) { - return haGroupStoreClient.getClusterRoleRecord(); + if (haGroupStoreClient == null) { + throw new IOException("HAGroupStoreClient is not initialized " + + "for HA group: " + haGroupName); } - throw new IOException("HAGroupStoreClient is not initialized"); + return haGroupStoreClient; } } \ No newline at end of file diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreRecord.java b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreRecord.java index aa49a22eb2..6891d93c46 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreRecord.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreRecord.java @@ -118,9 +118,11 @@ public class HAGroupStoreRecord { OFFLINE.allowedTransitions = ImmutableSet.of(); // This needs to be manually recovered by operator UNKNOWN.allowedTransitions = ImmutableSet.of(); - ACTIVE_NOT_IN_SYNC_TO_STANDBY.allowedTransitions = ImmutableSet.of(ABORT_TO_ACTIVE_NOT_IN_SYNC, + ACTIVE_NOT_IN_SYNC_TO_STANDBY.allowedTransitions + = ImmutableSet.of(ABORT_TO_ACTIVE_NOT_IN_SYNC, ACTIVE_IN_SYNC_TO_STANDBY); - ACTIVE_IN_SYNC_TO_STANDBY.allowedTransitions = ImmutableSet.of(ABORT_TO_ACTIVE_IN_SYNC, STANDBY); + ACTIVE_IN_SYNC_TO_STANDBY.allowedTransitions + = ImmutableSet.of(ABORT_TO_ACTIVE_IN_SYNC, STANDBY); STANDBY_TO_ACTIVE.allowedTransitions = ImmutableSet.of(ABORT_TO_STANDBY, ACTIVE_IN_SYNC); DEGRADED_STANDBY.allowedTransitions @@ -161,17 +163,28 @@ public class HAGroupStoreRecord { private final String protocolVersion; private final String haGroupName; private final HAGroupState haGroupState; + private final Long lastSyncStateTimeInMs; @JsonCreator public HAGroupStoreRecord(@JsonProperty("protocolVersion") String protocolVersion, @JsonProperty("haGroupName") String haGroupName, - @JsonProperty("haGroupState") HAGroupState haGroupState) { + @JsonProperty("haGroupState") HAGroupState haGroupState, + @JsonProperty("lastSyncStateTimeInMs") Long lastSyncStateTimeInMs) { Preconditions.checkNotNull(haGroupName, "HA group name cannot be null!"); Preconditions.checkNotNull(haGroupState, "HA group state cannot be null!"); this.protocolVersion = Objects.toString(protocolVersion, DEFAULT_PROTOCOL_VERSION); this.haGroupName = haGroupName; this.haGroupState = haGroupState; + this.lastSyncStateTimeInMs = lastSyncStateTimeInMs; + } + + /** + * Convenience constructor for backward compatibility without lastSyncStateTimeInMs. 
+ */ + public HAGroupStoreRecord(String protocolVersion, + String haGroupName, HAGroupState haGroupState) { + this(protocolVersion, haGroupName, haGroupState, null); } public static Optional<HAGroupStoreRecord> fromJson(byte[] bytes) { @@ -193,9 +206,10 @@ public class HAGroupStoreRecord { } public boolean hasSameInfo(HAGroupStoreRecord other) { - return haGroupName.equals(other.haGroupName) && - haGroupState.equals(other.haGroupState) && - protocolVersion.equals(other.protocolVersion); + return haGroupName.equals(other.haGroupName) + && haGroupState.equals(other.haGroupState) + && protocolVersion.equals(other.protocolVersion) + && Objects.equals(lastSyncStateTimeInMs, other.lastSyncStateTimeInMs); } public String getProtocolVersion() { @@ -211,6 +225,10 @@ public class HAGroupStoreRecord { return haGroupState; } + public Long getLastSyncStateTimeInMs() { + return lastSyncStateTimeInMs; + } + @JsonIgnore public ClusterRoleRecord.ClusterRole getClusterRole() { return haGroupState.getClusterRole(); @@ -222,6 +240,7 @@ public class HAGroupStoreRecord { .append(protocolVersion) .append(haGroupName) .append(haGroupState) + .append(lastSyncStateTimeInMs) .hashCode(); } @@ -239,6 +258,7 @@ public class HAGroupStoreRecord { .append(protocolVersion, otherRecord.protocolVersion) .append(haGroupName, otherRecord.haGroupName) .append(haGroupState, otherRecord.haGroupState) + .append(lastSyncStateTimeInMs, otherRecord.lastSyncStateTimeInMs) .isEquals(); } } @@ -249,6 +269,7 @@ public class HAGroupStoreRecord { + "protocolVersion='" + protocolVersion + '\'' + ", haGroupName='" + haGroupName + '\'' + ", haGroupState=" + haGroupState + + ", lastSyncStateTimeInMs=" + lastSyncStateTimeInMs + '}'; } diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/PhoenixRegionServerEndpointITWithConsistentFailover.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/PhoenixRegionServerEndpointITWithConsistentFailover.java index 9aa6173fd1..e66da0829b 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/PhoenixRegionServerEndpointITWithConsistentFailover.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/PhoenixRegionServerEndpointITWithConsistentFailover.java @@ -43,7 +43,7 @@ import org.junit.rules.TestName; import java.util.Map; -import static org.apache.phoenix.jdbc.HAGroupStoreClient.ZK_CONSISTENT_HA_NAMESPACE; +import static org.apache.phoenix.jdbc.HAGroupStoreClient.ZK_CONSISTENT_HA_GROUP_STATE_NAMESPACE; import static org.apache.phoenix.jdbc.PhoenixHAAdmin.getLocalZkUrl; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -83,7 +83,7 @@ public class PhoenixRegionServerEndpointITWithConsistentFailover extends BaseTes assertNotNull(coprocessor); ServerRpcController controller = new ServerRpcController(); - try (PhoenixHAAdmin peerHAAdmin = new PhoenixHAAdmin(CLUSTERS.getHBaseCluster2().getConfiguration(), ZK_CONSISTENT_HA_NAMESPACE)) { + try (PhoenixHAAdmin peerHAAdmin = new PhoenixHAAdmin(CLUSTERS.getHBaseCluster2().getConfiguration(), ZK_CONSISTENT_HA_GROUP_STATE_NAMESPACE)) { HAGroupStoreRecord peerHAGroupStoreRecord = new HAGroupStoreRecord(HAGroupStoreRecord.DEFAULT_PROTOCOL_VERSION, haGroupName, HAGroupState.STANDBY); peerHAAdmin.createHAGroupStoreRecordInZooKeeper(peerHAGroupStoreRecord); } diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexRegionObserverMutationBlockingIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexRegionObserverMutationBlockingIT.java index 
fdf64dd2df..377bdf69d7 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexRegionObserverMutationBlockingIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexRegionObserverMutationBlockingIT.java @@ -17,7 +17,7 @@ */ package org.apache.phoenix.end2end.index; -import static org.apache.phoenix.jdbc.HAGroupStoreClient.ZK_CONSISTENT_HA_NAMESPACE; +import static org.apache.phoenix.jdbc.HAGroupStoreClient.ZK_CONSISTENT_HA_GROUP_STATE_NAMESPACE; import static org.apache.phoenix.jdbc.PhoenixHAAdmin.getLocalZkUrl; import static org.apache.phoenix.query.QueryServices.CLUSTER_ROLE_BASED_MUTATION_BLOCK_ENABLED; import static org.junit.Assert.assertEquals; @@ -75,7 +75,7 @@ public class IndexRegionObserverMutationBlockingIT extends BaseTest { @Before public void setUp() throws Exception { - haAdmin = new PhoenixHAAdmin(config, ZK_CONSISTENT_HA_NAMESPACE); + haAdmin = new PhoenixHAAdmin(config, ZK_CONSISTENT_HA_GROUP_STATE_NAMESPACE); Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS); String zkUrl = getLocalZkUrl(config); String peerZkUrl = CLUSTERS.getZkUrl2(); diff --git a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupStateSubscriptionIT.java b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupStateSubscriptionIT.java new file mode 100644 index 0000000000..32239e3887 --- /dev/null +++ b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupStateSubscriptionIT.java @@ -0,0 +1,644 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.phoenix.jdbc; + + +import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest; +import org.apache.phoenix.jdbc.HAGroupStoreRecord.HAGroupState; +import org.apache.phoenix.query.BaseTest; +import org.apache.phoenix.thirdparty.com.google.common.collect.Maps; +import org.apache.phoenix.util.HAGroupStoreTestUtil; +import org.apache.phoenix.util.ReadOnlyProps; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.lang.reflect.Field; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +import static org.apache.phoenix.jdbc.HAGroupStoreClient.ZK_CONSISTENT_HA_GROUP_STATE_NAMESPACE; +import static org.apache.phoenix.jdbc.PhoenixHAAdmin.getLocalZkUrl; +import static org.apache.phoenix.jdbc.PhoenixHAAdmin.toPath; +import static org.apache.phoenix.query.QueryServices.CLUSTER_ROLE_BASED_MUTATION_BLOCK_ENABLED; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +/** + * Integration tests for HA Group State Change subscription functionality. + * Tests the new subscription system where HAGroupStoreClient directly manages + * subscriptions and HAGroupStoreManager acts as a passthrough. 
+ */ +@Category(NeedsOwnMiniClusterTest.class) +public class HAGroupStateSubscriptionIT extends BaseTest { + + private static final Logger LOGGER = LoggerFactory.getLogger(HAGroupStateSubscriptionIT.class); + + @Rule + public TestName testName = new TestName(); + + private PhoenixHAAdmin haAdmin; + private PhoenixHAAdmin peerHaAdmin; + private static final Long ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS = 2000L; + private String zkUrl; + private String peerZKUrl; + private static final HighAvailabilityTestingUtility.HBaseTestingUtilityPair CLUSTERS = new HighAvailabilityTestingUtility.HBaseTestingUtilityPair(); + + @BeforeClass + public static synchronized void doSetup() throws Exception { + Map<String, String> props = Maps.newHashMapWithExpectedSize(2); + props.put(CLUSTER_ROLE_BASED_MUTATION_BLOCK_ENABLED, "true"); + setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator())); + CLUSTERS.start(); + } + + @Before + public void before() throws Exception { + haAdmin = new PhoenixHAAdmin(config, ZK_CONSISTENT_HA_GROUP_STATE_NAMESPACE); + zkUrl = getLocalZkUrl(config); + this.peerZKUrl = CLUSTERS.getZkUrl2(); + peerHaAdmin = new PhoenixHAAdmin(peerZKUrl, config, ZK_CONSISTENT_HA_GROUP_STATE_NAMESPACE); + + // Clean up existing HAGroupStoreRecords + try { + List<String> haGroupNames = HAGroupStoreClient.getHAGroupNames(zkUrl); + for (String haGroupName : haGroupNames) { + haAdmin.getCurator().delete().quietly().forPath(toPath(haGroupName)); + peerHaAdmin.getCurator().delete().quietly().forPath(toPath(haGroupName)); + } + + } catch (Exception e) { + // Ignore cleanup errors + } + // Remove any existing entries in the system table + HAGroupStoreTestUtil.deleteAllHAGroupRecordsInSystemTable(zkUrl); + + // Insert a HAGroupStoreRecord into the system table + HAGroupStoreTestUtil.upsertHAGroupRecordInSystemTable(testName.getMethodName(), zkUrl, peerZKUrl, + ClusterRoleRecord.ClusterRole.ACTIVE, ClusterRoleRecord.ClusterRole.STANDBY, null); + } + + // ========== Multi-Cluster & Basic Subscription Tests ========== + + @Test + public void testDifferentTargetStatesPerCluster() throws Exception { + String haGroupName = testName.getMethodName(); + HAGroupStoreManager manager = HAGroupStoreManager.getInstance(config); + + // Track notifications + AtomicInteger localNotifications = new AtomicInteger(0); + AtomicInteger peerNotifications = new AtomicInteger(0); + AtomicReference<ClusterType> lastLocalClusterType = new AtomicReference<>(); + AtomicReference<ClusterType> lastPeerClusterType = new AtomicReference<>(); + + // Create listeners for different target states + HAGroupStateListener localListener = (groupName, toState, modifiedTime, clusterType) -> { + if (toState == HAGroupState.STANDBY_TO_ACTIVE && clusterType == ClusterType.LOCAL) { + localNotifications.incrementAndGet(); + lastLocalClusterType.set(clusterType); + LOGGER.info("Local target state listener called: {} on {}", toState, clusterType); + } + }; + + HAGroupStateListener peerListener = (groupName, toState, modifiedTime, clusterType) -> { + if (toState == HAGroupState.ACTIVE_NOT_IN_SYNC && clusterType == ClusterType.PEER) { + peerNotifications.incrementAndGet(); + lastPeerClusterType.set(clusterType); + LOGGER.info("Peer target state listener called: {} on {}", toState, clusterType); + } + }; + + // Subscribe to different target states on different clusters + manager.subscribeToTargetState(haGroupName, HAGroupState.STANDBY_TO_ACTIVE, ClusterType.LOCAL, localListener); + manager.subscribeToTargetState(haGroupName, 
HAGroupState.ACTIVE_NOT_IN_SYNC, ClusterType.PEER, peerListener); + + // Trigger transition to STANDBY_TO_ACTIVE on LOCAL cluster + HAGroupStoreRecord localRecord = new HAGroupStoreRecord("1.0", haGroupName, HAGroupState.STANDBY_TO_ACTIVE); + haAdmin.updateHAGroupStoreRecordInZooKeeper(haGroupName, localRecord, 0); + Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS); + + // Trigger transition to STANDBY on PEER cluster + HAGroupStoreRecord peerRecord = new HAGroupStoreRecord("1.0", haGroupName, HAGroupState.ACTIVE_NOT_IN_SYNC); + peerHaAdmin.createHAGroupStoreRecordInZooKeeper(peerRecord); + Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS); + + // Verify no cross-cluster triggering + assertEquals("Local cluster should receive its target state notification", 1, localNotifications.get()); + assertEquals("Peer cluster should receive its target state notification", 1, peerNotifications.get()); + assertEquals("Local notification should have LOCAL cluster type", ClusterType.LOCAL, lastLocalClusterType.get()); + assertEquals("Peer notification should have PEER cluster type", ClusterType.PEER, lastPeerClusterType.get()); + + } + + @Test + public void testUnsubscribeSpecificCluster() throws Exception { + String haGroupName = testName.getMethodName(); + HAGroupStoreManager manager = HAGroupStoreManager.getInstance(config); + + // Track notifications + AtomicInteger totalNotifications = new AtomicInteger(0); + AtomicReference<ClusterType> lastClusterType = new AtomicReference<>(); + + HAGroupStateListener listener = (groupName, toState, modifiedTime, clusterType) -> { + if (toState == HAGroupState.STANDBY) { + totalNotifications.incrementAndGet(); + lastClusterType.set(clusterType); + LOGGER.info("Listener called: {} on {}", toState, clusterType); + } + }; + + // Subscribe to same target state on both clusters + manager.subscribeToTargetState(haGroupName, HAGroupState.STANDBY, ClusterType.LOCAL, listener); + manager.subscribeToTargetState(haGroupName, HAGroupState.STANDBY, ClusterType.PEER, listener); + + // Unsubscribe from LOCAL cluster only + manager.unsubscribeFromTargetState(haGroupName, HAGroupState.STANDBY, ClusterType.LOCAL, listener); + + // Trigger transition to STANDBY on LOCAL → should NOT call listener + HAGroupStoreRecord localRecord = new HAGroupStoreRecord("1.0", haGroupName, HAGroupState.STANDBY); + haAdmin.updateHAGroupStoreRecordInZooKeeper(haGroupName, localRecord, 0); + Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS); + + assertEquals("Should receive no notifications from LOCAL cluster", 0, totalNotifications.get()); + + // Trigger transition to STANDBY on PEER → should call listener + HAGroupStoreRecord peerRecord = new HAGroupStoreRecord("1.0", haGroupName, HAGroupState.STANDBY); + peerHaAdmin.createHAGroupStoreRecordInZooKeeper(peerRecord); + Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS); + + assertEquals("Should receive notification only from PEER cluster", 1, totalNotifications.get()); + assertEquals("Notification should be from PEER cluster", ClusterType.PEER, lastClusterType.get()); + + } + + // ========== Multiple Listeners Tests ========== + + @Test + public void testMultipleListenersMultipleClusters() throws Exception { + String haGroupName = testName.getMethodName(); + HAGroupStoreManager manager = HAGroupStoreManager.getInstance(config); + + // Track notifications from multiple listeners + AtomicInteger listener1LocalNotifications = new AtomicInteger(0); + AtomicInteger listener2LocalNotifications = new AtomicInteger(0); + AtomicInteger 
listener1PeerNotifications = new AtomicInteger(0); + AtomicInteger listener2PeerNotifications = new AtomicInteger(0); + + HAGroupStateListener listener1 = (groupName, toState, modifiedTime, clusterType) -> { + if (toState == HAGroupState.DEGRADED_STANDBY) { + if (clusterType == ClusterType.LOCAL) { + listener1LocalNotifications.incrementAndGet(); + } else { + listener1PeerNotifications.incrementAndGet(); + } + LOGGER.info("Listener1 called: {} on {}", toState, clusterType); + } + }; + + HAGroupStateListener listener2 = (groupName, toState, modifiedTime, clusterType) -> { + if (toState == HAGroupState.DEGRADED_STANDBY) { + if (clusterType == ClusterType.LOCAL) { + listener2LocalNotifications.incrementAndGet(); + } else { + listener2PeerNotifications.incrementAndGet(); + } + LOGGER.info("Listener2 called: {} on {}", toState, clusterType); + } + }; + + // Register multiple listeners for same target state on both clusters + manager.subscribeToTargetState(haGroupName, HAGroupState.DEGRADED_STANDBY, ClusterType.LOCAL, listener1); + manager.subscribeToTargetState(haGroupName, HAGroupState.DEGRADED_STANDBY, ClusterType.LOCAL, listener2); + manager.subscribeToTargetState(haGroupName, HAGroupState.DEGRADED_STANDBY, ClusterType.PEER, listener1); + manager.subscribeToTargetState(haGroupName, HAGroupState.DEGRADED_STANDBY, ClusterType.PEER, listener2); + + // Trigger transition to DEGRADED_STANDBY on LOCAL + HAGroupStoreRecord localRecord = new HAGroupStoreRecord("1.0", haGroupName, HAGroupState.DEGRADED_STANDBY); + haAdmin.updateHAGroupStoreRecordInZooKeeper(haGroupName, localRecord, 0); + Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS); + + // Trigger transition to DEGRADED_STANDBY on PEER + HAGroupStoreRecord peerRecord = new HAGroupStoreRecord("1.0", haGroupName, HAGroupState.DEGRADED_STANDBY); + peerHaAdmin.createHAGroupStoreRecordInZooKeeper(peerRecord); + Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS); + + // Verify all listeners called for each cluster + assertEquals("Listener1 should receive LOCAL notification", 1, listener1LocalNotifications.get()); + assertEquals("Listener2 should receive LOCAL notification", 1, listener2LocalNotifications.get()); + assertEquals("Listener1 should receive PEER notification", 1, listener1PeerNotifications.get()); + assertEquals("Listener2 should receive PEER notification", 1, listener2PeerNotifications.get()); + + } + + @Test + public void testSameListenerDifferentTargetStates() throws Exception { + String haGroupName = testName.getMethodName(); + HAGroupStoreManager manager = HAGroupStoreManager.getInstance(config); + + // Track which target states were reached + AtomicInteger stateANotifications = new AtomicInteger(0); + AtomicInteger stateBNotifications = new AtomicInteger(0); + AtomicReference<ClusterType> lastStateAClusterType = new AtomicReference<>(); + AtomicReference<ClusterType> lastStateBClusterType = new AtomicReference<>(); + + HAGroupStateListener sharedListener = (groupName, toState, modifiedTime, clusterType) -> { + if (toState == HAGroupState.ACTIVE_IN_SYNC_TO_STANDBY && clusterType == ClusterType.LOCAL) { + stateANotifications.incrementAndGet(); + lastStateAClusterType.set(clusterType); + LOGGER.info("Shared listener - Target State A: {} on {}", toState, clusterType); + } else if (toState == HAGroupState.ACTIVE_IN_SYNC && clusterType == ClusterType.PEER) { + stateBNotifications.incrementAndGet(); + lastStateBClusterType.set(clusterType); + LOGGER.info("Shared listener - Target State B: {} on {}", toState, clusterType); + } + }; + 
+ // Register same listener for different target states on different clusters + manager.subscribeToTargetState(haGroupName, HAGroupState.ACTIVE_IN_SYNC_TO_STANDBY, ClusterType.LOCAL, sharedListener); + manager.subscribeToTargetState(haGroupName, HAGroupState.ACTIVE_IN_SYNC, ClusterType.PEER, sharedListener); + + // Trigger target state A on LOCAL + HAGroupStoreRecord localRecord = new HAGroupStoreRecord("1.0", haGroupName, HAGroupState.ACTIVE_IN_SYNC_TO_STANDBY); + haAdmin.updateHAGroupStoreRecordInZooKeeper(haGroupName, localRecord, 0); + Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS); + + // Trigger target state B on PEER + HAGroupStoreRecord peerRecord = new HAGroupStoreRecord("1.0", haGroupName, HAGroupState.ACTIVE_IN_SYNC); + peerHaAdmin.createHAGroupStoreRecordInZooKeeper(peerRecord); + Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS); + + // Verify listener called for each appropriate target state/cluster combination + assertEquals("Should receive target state A notification", 1, stateANotifications.get()); + assertEquals("Should receive target state B notification", 1, stateBNotifications.get()); + assertEquals("Target state A should be from LOCAL cluster", ClusterType.LOCAL, lastStateAClusterType.get()); + assertEquals("Target state B should be from PEER cluster", ClusterType.PEER, lastStateBClusterType.get()); + + } + + // ========== Edge Cases & Error Handling ========== + + @Test + public void testSubscriptionToNonExistentHAGroup() throws Exception { + String nonExistentHAGroup = "nonExistentGroup_" + testName.getMethodName(); + HAGroupStoreManager manager = HAGroupStoreManager.getInstance(config); + + HAGroupStateListener listener = (groupName, toState, modifiedTime, clusterType) -> { + // Should not be called + }; + + // Try to subscribe to non-existent HA group + try { + manager.subscribeToTargetState(nonExistentHAGroup, HAGroupState.STANDBY, ClusterType.LOCAL, listener); + fail("Expected IOException for non-existent HA group"); + } catch (IOException e) { + assertTrue("Exception should mention the HA group name", e.getMessage().contains(nonExistentHAGroup)); + LOGGER.info("Correctly caught exception for non-existent HA group: {}", e.getMessage()); + } + } + + @Test + public void testListenerExceptionIsolation() throws Exception { + String haGroupName = testName.getMethodName(); + HAGroupStoreManager manager = HAGroupStoreManager.getInstance(config); + + // Track notifications + AtomicInteger goodListener1Notifications = new AtomicInteger(0); + AtomicInteger goodListener2Notifications = new AtomicInteger(0); + AtomicInteger badListenerCalls = new AtomicInteger(0); + + HAGroupStateListener goodListener1 = (groupName, toState, modifiedTime, clusterType) -> { + if (toState == HAGroupState.ACTIVE_IN_SYNC) { + goodListener1Notifications.incrementAndGet(); + LOGGER.info("Good listener 1 called: {} on {}", toState, clusterType); + } + }; + + HAGroupStateListener badListener = (groupName, toState, modifiedTime, clusterType) -> { + if (toState == HAGroupState.ACTIVE_IN_SYNC) { + badListenerCalls.incrementAndGet(); + LOGGER.info("Bad listener called, about to throw exception"); + throw new RuntimeException("Test exception from bad listener"); + } + }; + + HAGroupStateListener goodListener2 = (groupName, toState, modifiedTime, clusterType) -> { + if (toState == HAGroupState.ACTIVE_IN_SYNC) { + goodListener2Notifications.incrementAndGet(); + LOGGER.info("Good listener 2 called: {} on {}", toState, clusterType); + } + }; + + // Register listeners - bad listener in the middle 
+ manager.subscribeToTargetState(haGroupName, HAGroupState.ACTIVE_IN_SYNC, ClusterType.LOCAL, goodListener1); + manager.subscribeToTargetState(haGroupName, HAGroupState.ACTIVE_IN_SYNC, ClusterType.LOCAL, badListener); + manager.subscribeToTargetState(haGroupName, HAGroupState.ACTIVE_IN_SYNC, ClusterType.LOCAL, goodListener2); + + // Trigger transition to target state + HAGroupStoreRecord transitionRecord = new HAGroupStoreRecord("1.0", haGroupName, HAGroupState.ACTIVE_IN_SYNC); + haAdmin.updateHAGroupStoreRecordInZooKeeper(haGroupName, transitionRecord, 0); + Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS); + + // Verify all listeners were called despite exception in bad listener + assertEquals("Good listener 1 should be called", 1, goodListener1Notifications.get()); + assertEquals("Good listener 2 should be called", 1, goodListener2Notifications.get()); + assertEquals("Bad listener should be called", 1, badListenerCalls.get()); + } + + // ========== Performance & Concurrency Tests ========== + + @Test + public void testConcurrentMultiClusterSubscriptions() throws Exception { + String haGroupName = testName.getMethodName(); + HAGroupStoreManager manager = HAGroupStoreManager.getInstance(config); + + final int threadCount = 10; + final CountDownLatch startLatch = new CountDownLatch(1); + final CountDownLatch completionLatch = new CountDownLatch(threadCount); + final AtomicInteger successfulSubscriptions = new AtomicInteger(0); + + ExecutorService executor = Executors.newFixedThreadPool(threadCount); + + // Create concurrent subscription tasks + for (int i = 0; i < threadCount; i++) { + final int threadIndex = i; + executor.submit(() -> { + try { + startLatch.await(); // Wait for all threads to be ready + + HAGroupStateListener listener = (groupName, toState, modifiedTime, clusterType) -> { + LOGGER.debug("Thread {} listener called: {} on {}", threadIndex, toState, clusterType); + }; + + // Half subscribe to LOCAL, half to PEER + ClusterType clusterType = (threadIndex % 2 == 0) ? 
ClusterType.LOCAL : ClusterType.PEER; + + manager.subscribeToTargetState(haGroupName, HAGroupState.ACTIVE_IN_SYNC, clusterType, listener); + successfulSubscriptions.incrementAndGet(); + + // Also test unsubscribe + manager.unsubscribeFromTargetState(haGroupName, HAGroupState.ACTIVE_IN_SYNC, clusterType, listener); + + } catch (Exception e) { + LOGGER.error("Thread {} failed", threadIndex, e); + } finally { + completionLatch.countDown(); + } + }); + } + + // Start all threads simultaneously + startLatch.countDown(); + + // Wait for completion + assertTrue("All threads should complete within timeout", completionLatch.await(30, TimeUnit.SECONDS)); + assertEquals("All threads should successfully subscribe", threadCount, successfulSubscriptions.get()); + + executor.shutdown(); + } + + @Test + public void testHighFrequencyMultiClusterChanges() throws Exception { + String haGroupName = testName.getMethodName(); + HAGroupStoreManager manager = HAGroupStoreManager.getInstance(config); + + // Track notifications + AtomicInteger localNotifications = new AtomicInteger(0); + AtomicInteger peerNotifications = new AtomicInteger(0); + + HAGroupStateListener listener = (groupName, toState, modifiedTime, clusterType) -> { + if (clusterType == ClusterType.LOCAL) { + localNotifications.incrementAndGet(); + } else { + peerNotifications.incrementAndGet(); + } + LOGGER.debug("High frequency listener: {} on {}", toState, clusterType); + }; + + // Subscribe to target state on both clusters + manager.subscribeToTargetState(haGroupName, HAGroupState.STANDBY, ClusterType.LOCAL, listener); + manager.subscribeToTargetState(haGroupName, HAGroupState.STANDBY, ClusterType.PEER, listener); + + // Rapidly alternate state changes on both clusters + final int changeCount = 5; + HAGroupStoreRecord initialPeerRecord = new HAGroupStoreRecord("1.0", haGroupName,HAGroupState.DEGRADED_STANDBY_FOR_WRITER); + peerHaAdmin.createHAGroupStoreRecordInZooKeeper(initialPeerRecord); + + for (int i = 0; i < changeCount; i++) { + // Change local cluster + HAGroupStoreRecord localRecord = new HAGroupStoreRecord("1.0", haGroupName, + (i % 2 == 0) ? HAGroupState.STANDBY : HAGroupState.DEGRADED_STANDBY_FOR_READER); + haAdmin.updateHAGroupStoreRecordInZooKeeper(haGroupName, localRecord, -1); + + // Change peer cluster + HAGroupStoreRecord peerRecord = new HAGroupStoreRecord("1.0", haGroupName, + (i % 2 == 0) ? 
HAGroupState.STANDBY : HAGroupState.DEGRADED_STANDBY_FOR_WRITER); + peerHaAdmin.updateHAGroupStoreRecordInZooKeeper(haGroupName, peerRecord, -1); + + // Small delay between changes + Thread.sleep(500); + } + + // Final wait for all events to propagate + Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS); + + // Verify transitions detected on both clusters + // Expected: 3 transitions to STANDBY state (i=0,2,4 → STANDBY) + assertEquals("Should detect exactly 3 local cluster transitions to STANDBY", 3, localNotifications.get()); + assertEquals("Should detect exactly 3 peer cluster transitions to STANDBY", 3, peerNotifications.get()); + + LOGGER.info("Detected {} local and {} peer notifications", localNotifications.get(), peerNotifications.get()); + + } + + // ========== Cleanup & Resource Management Tests ========== + + @Test + public void testSubscriptionCleanupPerCluster() throws Exception { + String haGroupName = testName.getMethodName(); + HAGroupStoreManager manager = HAGroupStoreManager.getInstance(config); + + // Track notifications to verify functionality + AtomicInteger localActiveNotifications = new AtomicInteger(0); + AtomicInteger peerActiveNotifications = new AtomicInteger(0); + AtomicInteger localStandbyNotifications = new AtomicInteger(0); + AtomicInteger peerStandbyNotifications = new AtomicInteger(0); + + // Create listeners that track which ones are called + HAGroupStateListener listener1 = (groupName, toState, modifiedTime, clusterType) -> { + if (toState == HAGroupState.ACTIVE_IN_SYNC && clusterType == ClusterType.LOCAL) { + localActiveNotifications.incrementAndGet(); + LOGGER.info("Listener1 LOCAL ACTIVE_IN_SYNC: {}", toState); + } + }; + + HAGroupStateListener listener2 = (groupName, toState, modifiedTime, clusterType) -> { + if (toState == HAGroupState.ACTIVE_IN_SYNC && clusterType == ClusterType.LOCAL) { + localActiveNotifications.incrementAndGet(); + LOGGER.info("Listener2 LOCAL ACTIVE_IN_SYNC: {}", toState); + } else if (toState == HAGroupState.STANDBY_TO_ACTIVE && clusterType == ClusterType.PEER) { + peerActiveNotifications.incrementAndGet(); + LOGGER.info("Listener2 PEER STANDBY_TO_ACTIVE: {}", toState); + } + }; + + HAGroupStateListener listener3 = (groupName, toState, modifiedTime, clusterType) -> { + if (toState == HAGroupState.STANDBY_TO_ACTIVE && clusterType == ClusterType.PEER) { + peerActiveNotifications.incrementAndGet(); + LOGGER.info("Listener3 PEER STANDBY_TO_ACTIVE: {}", toState); + } + }; + + HAGroupStateListener listener4 = (groupName, toState, modifiedTime, clusterType) -> { + if (toState == HAGroupState.ACTIVE_IN_SYNC && clusterType == ClusterType.LOCAL) { + localStandbyNotifications.incrementAndGet(); + LOGGER.info("Listener4 LOCAL ACTIVE_IN_SYNC: {}", toState); + } else if (toState == HAGroupState.STANDBY_TO_ACTIVE && clusterType == ClusterType.PEER) { + peerStandbyNotifications.incrementAndGet(); + LOGGER.info("Listener4 PEER STANDBY_TO_ACTIVE: {}", toState); + } + }; + + // Subscribe listeners to both clusters for target states + manager.subscribeToTargetState(haGroupName, HAGroupState.ACTIVE_IN_SYNC, ClusterType.LOCAL, listener1); + manager.subscribeToTargetState(haGroupName, HAGroupState.ACTIVE_IN_SYNC, ClusterType.LOCAL, listener2); + manager.subscribeToTargetState(haGroupName, HAGroupState.STANDBY_TO_ACTIVE, ClusterType.PEER, listener2); + manager.subscribeToTargetState(haGroupName, HAGroupState.STANDBY_TO_ACTIVE, ClusterType.PEER, listener3); + manager.subscribeToTargetState(haGroupName, HAGroupState.ACTIVE_IN_SYNC, ClusterType.LOCAL, 
listener4); + manager.subscribeToTargetState(haGroupName, HAGroupState.STANDBY_TO_ACTIVE, ClusterType.PEER, listener4); + + // Test initial functionality - trigger ACTIVE_IN_SYNC on LOCAL + HAGroupStoreRecord localActiveRecord = new HAGroupStoreRecord("1.0", haGroupName, HAGroupState.ACTIVE_IN_SYNC); + haAdmin.updateHAGroupStoreRecordInZooKeeper(haGroupName, localActiveRecord, 0); + Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS); + + // Should have 2 notifications for LOCAL ACTIVE_IN_SYNC (listener1 + listener2) + assertEquals("Should have 2 LOCAL ACTIVE_IN_SYNC notifications initially", 2, localActiveNotifications.get()); + + // Test initial functionality - trigger STANDBY_TO_ACTIVE on PEER + HAGroupStoreRecord peerActiveRecord = new HAGroupStoreRecord("1.0", haGroupName, HAGroupState.STANDBY_TO_ACTIVE); + peerHaAdmin.createHAGroupStoreRecordInZooKeeper(peerActiveRecord); + Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS); + + // Should have 2 notifications for PEER STANDBY_TO_ACTIVE (listener2 + listener3) + assertEquals("Should have 2 PEER STANDBY_TO_ACTIVE notifications initially", 2, peerActiveNotifications.get()); + + // Reset counters for cleanup testing + localActiveNotifications.set(0); + peerActiveNotifications.set(0); + + // Unsubscribe selectively + manager.unsubscribeFromTargetState(haGroupName, HAGroupState.ACTIVE_IN_SYNC, ClusterType.LOCAL, listener1); + manager.unsubscribeFromTargetState(haGroupName, HAGroupState.STANDBY_TO_ACTIVE, ClusterType.PEER, listener2); + manager.unsubscribeFromTargetState(haGroupName, HAGroupState.ACTIVE_IN_SYNC, ClusterType.LOCAL, listener4); + + // Test after partial unsubscribe - trigger ACTIVE_IN_SYNC on LOCAL again by first changing to some other state. + HAGroupStoreRecord localActiveRecord2 = new HAGroupStoreRecord("1.0", haGroupName, HAGroupState.ACTIVE_NOT_IN_SYNC); + haAdmin.updateHAGroupStoreRecordInZooKeeper(haGroupName, localActiveRecord2, 1); + Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS); + + HAGroupStoreRecord localActiveRecord3 = new HAGroupStoreRecord("1.0", haGroupName, HAGroupState.ACTIVE_IN_SYNC); + haAdmin.updateHAGroupStoreRecordInZooKeeper(haGroupName, localActiveRecord3, 2); + Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS); + + // Should have only 1 notification for LOCAL ACTIVE_IN_SYNC (only listener2 remains) + assertEquals("Should have 1 LOCAL ACTIVE_IN_SYNC notification after partial unsubscribe", 1, localActiveNotifications.get()); + + // Test after partial unsubscribe - trigger STANDBY_TO_ACTIVE on PEER again + HAGroupStoreRecord peerActiveRecord2 = new HAGroupStoreRecord("1.0", haGroupName, HAGroupState.STANDBY); + peerHaAdmin.updateHAGroupStoreRecordInZooKeeper(haGroupName, peerActiveRecord2, 0); + Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS); + + HAGroupStoreRecord peerActiveRecord3 = new HAGroupStoreRecord("1.0", haGroupName, HAGroupState.STANDBY_TO_ACTIVE); + peerHaAdmin.updateHAGroupStoreRecordInZooKeeper(haGroupName, peerActiveRecord3, 1); + Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS); + + // Should have only 1 notification for PEER STANDBY_TO_ACTIVE (only listener3 updates this counter now) + assertEquals("Should have 1 PEER STANDBY_TO_ACTIVE notification after partial unsubscribe", 1, peerActiveNotifications.get()); + + // Reset counters again + localActiveNotifications.set(0); + peerActiveNotifications.set(0); + + // Unsubscribe all remaining listeners + manager.unsubscribeFromTargetState(haGroupName, HAGroupState.ACTIVE_IN_SYNC, ClusterType.LOCAL, listener2); +
manager.unsubscribeFromTargetState(haGroupName, HAGroupState.STANDBY_TO_ACTIVE, ClusterType.PEER, listener3); + manager.unsubscribeFromTargetState(haGroupName, HAGroupState.STANDBY_TO_ACTIVE, ClusterType.PEER, listener4); + + // Test after complete unsubscribe - trigger ACTIVE_NOT_IN_SYNC on both clusters + HAGroupStoreRecord localActiveRecord4 = new HAGroupStoreRecord("1.0", haGroupName, HAGroupState.ACTIVE_NOT_IN_SYNC); + haAdmin.updateHAGroupStoreRecordInZooKeeper(haGroupName, localActiveRecord4, 3); + Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS); + + HAGroupStoreRecord peerActiveRecord4 = new HAGroupStoreRecord("1.0", haGroupName, HAGroupState.ACTIVE_NOT_IN_SYNC); + peerHaAdmin.updateHAGroupStoreRecordInZooKeeper(haGroupName, peerActiveRecord4, 2); + Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS); + + // Should have no notifications after complete unsubscribe + assertEquals("Should have 0 LOCAL ACTIVE_NOT_IN_SYNC notifications after complete unsubscribe", 0, localActiveNotifications.get()); + assertEquals("Should have 0 PEER ACTIVE_NOT_IN_SYNC notifications after complete unsubscribe", 0, peerActiveNotifications.get()); + + // Test that new subscriptions still work properly + AtomicInteger newSubscriptionNotifications = new AtomicInteger(0); + HAGroupStateListener newTestListener = (groupName, toState, modifiedTime, clusterType) -> { + if (toState == HAGroupState.STANDBY && clusterType == ClusterType.LOCAL) { + newSubscriptionNotifications.incrementAndGet(); + LOGGER.info("New subscription triggered: {} on {} at {}", toState, clusterType, modifiedTime); + } + }; + + // Subscribe with new test listener + manager.subscribeToTargetState(haGroupName, HAGroupState.STANDBY, ClusterType.LOCAL, newTestListener); + + // Trigger STANDBY state and verify new subscription works + HAGroupStoreRecord standbyRecord = new HAGroupStoreRecord("1.0", haGroupName, HAGroupState.STANDBY); + haAdmin.updateHAGroupStoreRecordInZooKeeper(haGroupName, standbyRecord, 4); + Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS); + + // Expected: exactly 1 notification for the new subscription + assertEquals("New subscription should receive exactly 1 notification", 1, newSubscriptionNotifications.get()); + + LOGGER.info("Subscription cleanup test completed successfully with {} notifications from new subscription", + newSubscriptionNotifications.get()); + + } + + /** + * Helper method to access private subscription maps via reflection + */ + @SuppressWarnings("unchecked") + private ConcurrentHashMap<String, CopyOnWriteArraySet<HAGroupStateListener>> getSubscriptionMap(HAGroupStoreClient client, String fieldName) throws Exception { + Field field = HAGroupStoreClient.class.getDeclaredField(fieldName); + field.setAccessible(true); + return (ConcurrentHashMap<String, CopyOnWriteArraySet<HAGroupStateListener>>) field.get(client); + } +} diff --git a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupStoreClientIT.java b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupStoreClientIT.java index 5a062b3305..9e8087a57f 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupStoreClientIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupStoreClientIT.java @@ -40,6 +40,7 @@ import java.sql.DriverManager; import java.sql.SQLException; import java.sql.Statement; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.Objects; @@ -52,7 +53,7 @@ import java.util.concurrent.TimeUnit; import
org.apache.curator.framework.recipes.cache.PathChildrenCacheListener; import static org.apache.hadoop.hbase.HConstants.DEFAULT_ZK_SESSION_TIMEOUT; -import static org.apache.phoenix.jdbc.HAGroupStoreClient.ZK_CONSISTENT_HA_NAMESPACE; +import static org.apache.phoenix.jdbc.HAGroupStoreClient.ZK_CONSISTENT_HA_GROUP_STATE_NAMESPACE; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_HA_GROUP_NAME; import static org.apache.phoenix.jdbc.PhoenixHAAdmin.getLocalZkUrl; import static org.apache.phoenix.jdbc.PhoenixHAAdmin.toPath; @@ -91,8 +92,8 @@ public class HAGroupStoreClientIT extends BaseTest { @Before public void before() throws Exception { - haAdmin = new PhoenixHAAdmin(CLUSTERS.getHBaseCluster1().getConfiguration(), ZK_CONSISTENT_HA_NAMESPACE); - peerHaAdmin = new PhoenixHAAdmin(CLUSTERS.getHBaseCluster2().getConfiguration(), ZK_CONSISTENT_HA_NAMESPACE); + haAdmin = new PhoenixHAAdmin(CLUSTERS.getHBaseCluster1().getConfiguration(), ZK_CONSISTENT_HA_GROUP_STATE_NAMESPACE); + peerHaAdmin = new PhoenixHAAdmin(CLUSTERS.getHBaseCluster2().getConfiguration(), ZK_CONSISTENT_HA_GROUP_STATE_NAMESPACE); haAdmin.getCurator().delete().quietly().forPath(toPath(testName.getMethodName())); peerHaAdmin.getCurator().delete().quietly().forPath(toPath(testName.getMethodName())); zkUrl = getLocalZkUrl(CLUSTERS.getHBaseCluster1().getConfiguration()); @@ -429,6 +430,8 @@ public class HAGroupStoreClientIT extends BaseTest { // The record should be automatically rebuilt from System Table as it is not in ZK assertNotNull(currentRecord); assertEquals(HAGroupStoreRecord.HAGroupState.ACTIVE_NOT_IN_SYNC, currentRecord.getHAGroupState()); + // The record should have a timestamp + assertNotNull(currentRecord.getLastSyncStateTimeInMs()); record1 = new HAGroupStoreRecord("v1.0", haGroupName, HAGroupStoreRecord.HAGroupState.ACTIVE_NOT_IN_SYNC_TO_STANDBY); diff --git a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupStoreManagerIT.java b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupStoreManagerIT.java index 34d143e5e6..c9e17294c5 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupStoreManagerIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupStoreManagerIT.java @@ -17,13 +17,16 @@ */ package org.apache.phoenix.jdbc; +import org.apache.commons.lang3.tuple.Pair; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HConstants; import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest; import org.apache.phoenix.query.BaseTest; +import org.apache.phoenix.exception.InvalidClusterRoleTransitionException; import org.apache.phoenix.thirdparty.com.google.common.collect.Maps; import org.apache.phoenix.util.HAGroupStoreTestUtil; import org.apache.phoenix.util.ReadOnlyProps; +import org.apache.zookeeper.data.Stat; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Rule; @@ -31,18 +34,25 @@ import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.rules.TestName; +import java.lang.reflect.Field; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.stream.Collectors; -import static org.apache.phoenix.jdbc.HAGroupStoreClient.ZK_CONSISTENT_HA_NAMESPACE; +import static org.apache.hadoop.hbase.HConstants.DEFAULT_ZK_SESSION_TIMEOUT; +import static org.apache.hadoop.hbase.HConstants.ZK_SESSION_TIMEOUT; +import static org.apache.phoenix.jdbc.HAGroupStoreClient.ZK_CONSISTENT_HA_GROUP_STATE_NAMESPACE; +import static 
org.apache.phoenix.jdbc.HAGroupStoreClient.ZK_SESSION_TIMEOUT_MULTIPLIER; import static org.apache.phoenix.jdbc.PhoenixHAAdmin.getLocalZkUrl; import static org.apache.phoenix.jdbc.PhoenixHAAdmin.toPath; import static org.apache.phoenix.query.QueryServices.CLUSTER_ROLE_BASED_MUTATION_BLOCK_ENABLED; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; /** * Integration tests for {@link HAGroupStoreManager}. @@ -63,13 +73,14 @@ public class HAGroupStoreManagerIT extends BaseTest { public static synchronized void doSetup() throws Exception { Map<String, String> props = Maps.newHashMapWithExpectedSize(2); props.put(CLUSTER_ROLE_BASED_MUTATION_BLOCK_ENABLED, "true"); + props.put(ZK_SESSION_TIMEOUT, String.valueOf(30*1000)); setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator())); CLUSTERS.start(); } @Before public void before() throws Exception { - haAdmin = new PhoenixHAAdmin(config, ZK_CONSISTENT_HA_NAMESPACE); + haAdmin = new PhoenixHAAdmin(config, ZK_CONSISTENT_HA_GROUP_STATE_NAMESPACE); zkUrl = getLocalZkUrl(config); this.peerZKUrl = CLUSTERS.getZkUrl2(); @@ -147,7 +158,6 @@ public class HAGroupStoreManagerIT extends BaseTest { // Should be present assertTrue(recordOpt.isPresent()); - // Delete record from System Table HAGroupStoreTestUtil.deleteHAGroupRecordInSystemTable(haGroupName, zkUrl); // Delete record from ZK @@ -166,12 +176,90 @@ public class HAGroupStoreManagerIT extends BaseTest { Optional<HAGroupStoreRecord> retrievedOpt = haGroupStoreManager.getHAGroupStoreRecord(haGroupName); assertTrue(retrievedOpt.isPresent()); - // Record for comparison - HAGroupStoreRecord record = new HAGroupStoreRecord( - HAGroupStoreRecord.DEFAULT_PROTOCOL_VERSION, haGroupName, HAGroupStoreRecord.HAGroupState.ACTIVE_NOT_IN_SYNC); + // Get MTime from HAAdmin for equality verification below. 
+ Pair<HAGroupStoreRecord, Stat> currentRecordAndStat = haAdmin.getHAGroupStoreRecordInZooKeeper(haGroupName); + + // Complete object comparison field-by-field + assertEquals(haGroupName, retrievedOpt.get().getHaGroupName()); + assertEquals(HAGroupStoreRecord.HAGroupState.ACTIVE_NOT_IN_SYNC, retrievedOpt.get().getHAGroupState()); + Long lastSyncStateTimeInMs = retrievedOpt.get().getLastSyncStateTimeInMs(); + Long mtime = currentRecordAndStat.getRight().getMtime(); + // Allow a small margin of error + assertTrue(Math.abs(lastSyncStateTimeInMs - mtime) <= 1); + assertEquals(HAGroupStoreRecord.DEFAULT_PROTOCOL_VERSION, retrievedOpt.get().getProtocolVersion()); + } + + @Test + public void testGetPeerHAGroupStoreRecord() throws Exception { + String haGroupName = testName.getMethodName(); + HAGroupStoreManager haGroupStoreManager = HAGroupStoreManager.getInstance(config); + + // Initially, peer record should not be present + Optional<HAGroupStoreRecord> peerRecordOpt = haGroupStoreManager.getPeerHAGroupStoreRecord(haGroupName); + assertFalse(peerRecordOpt.isPresent()); + + // Create a peer HAAdmin to create records in peer cluster + PhoenixHAAdmin peerHaAdmin = new PhoenixHAAdmin(CLUSTERS.getHBaseCluster2().getConfiguration(), + ZK_CONSISTENT_HA_GROUP_STATE_NAMESPACE); + + try { + // Create a HAGroupStoreRecord in the peer cluster + HAGroupStoreRecord peerRecord = new HAGroupStoreRecord( + "1.0", haGroupName, HAGroupStoreRecord.HAGroupState.STANDBY); + + peerHaAdmin.createHAGroupStoreRecordInZooKeeper(peerRecord); + Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS); + + // Now peer record should be present + peerRecordOpt = haGroupStoreManager.getPeerHAGroupStoreRecord(haGroupName); + assertTrue(peerRecordOpt.isPresent()); + + // Verify the peer record details + HAGroupStoreRecord retrievedPeerRecord = peerRecordOpt.get(); + assertEquals(haGroupName, retrievedPeerRecord.getHaGroupName()); + assertEquals(HAGroupStoreRecord.HAGroupState.STANDBY, retrievedPeerRecord.getHAGroupState()); + assertEquals(HAGroupStoreRecord.DEFAULT_PROTOCOL_VERSION, retrievedPeerRecord.getProtocolVersion()); + + // Delete peer record + peerHaAdmin.deleteHAGroupStoreRecordInZooKeeper(haGroupName); + Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS); + + // Peer record should no longer be present + peerRecordOpt = haGroupStoreManager.getPeerHAGroupStoreRecord(haGroupName); + assertFalse(peerRecordOpt.isPresent()); + + // Create peer record again with different state + HAGroupStoreRecord newPeerRecord = new HAGroupStoreRecord( + "1.0", haGroupName, HAGroupStoreRecord.HAGroupState.DEGRADED_STANDBY_FOR_READER); + + peerHaAdmin.createHAGroupStoreRecordInZooKeeper(newPeerRecord); + Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS); + + // Verify the updated peer record + peerRecordOpt = haGroupStoreManager.getPeerHAGroupStoreRecord(haGroupName); + assertTrue(peerRecordOpt.isPresent()); + assertEquals(HAGroupStoreRecord.HAGroupState.DEGRADED_STANDBY_FOR_READER, + peerRecordOpt.get().getHAGroupState()); + + } finally { + // Clean up peer record + try { + peerHaAdmin.deleteHAGroupStoreRecordInZooKeeper(haGroupName); + } catch (Exception e) { + // Ignore cleanup errors + } + peerHaAdmin.close(); + } + } - // Complete object comparison instead of field-by-field - assertEquals(record, retrievedOpt.get()); + @Test + public void testGetPeerHAGroupStoreRecordWhenHAGroupNotInSystemTable() throws Exception { + String haGroupName = testName.getMethodName(); + HAGroupStoreManager haGroupStoreManager = 
HAGroupStoreManager.getInstance(config); + + // Try to get peer record for an HA group that doesn't exist in system table + Optional<HAGroupStoreRecord> peerRecordOpt = haGroupStoreManager.getPeerHAGroupStoreRecord(haGroupName); + assertFalse("Peer record should not be present for non-existent HA group", peerRecordOpt.isPresent()); } @Test @@ -214,9 +302,13 @@ public class HAGroupStoreManagerIT extends BaseTest { conf.set(CLUSTER_ROLE_BASED_MUTATION_BLOCK_ENABLED, "false"); conf.set(HConstants.ZOOKEEPER_QUORUM, getLocalZkUrl(config)); - HAGroupStoreManager haGroupStoreManager = HAGroupStoreManager.getInstance(config); + // Set the HAGroupStoreManager instance to null via reflection to force recreation + Field field = HAGroupStoreManager.class.getDeclaredField("haGroupStoreManagerInstance"); + field.setAccessible(true); + field.set(null, null); - // Create HAGroupStoreRecord with ACTIVE_TO_STANDBY role + HAGroupStoreManager haGroupStoreManager = HAGroupStoreManager.getInstance(config); + // Create HAGroupStoreRecord with ACTIVE_IN_SYNC_TO_STANDBY role HAGroupStoreRecord transitionRecord = new HAGroupStoreRecord( "1.0", haGroupName, HAGroupStoreRecord.HAGroupState.ACTIVE_IN_SYNC_TO_STANDBY); @@ -225,6 +317,9 @@ public class HAGroupStoreManagerIT extends BaseTest { // Mutations should not be blocked even with ACTIVE_TO_STANDBY role assertFalse(haGroupStoreManager.isMutationBlocked(haGroupName)); + + // Set the HAGroupStoreManager instance back to null via reflection to force recreation for other tests + field.set(null, null); } @Test @@ -248,29 +343,43 @@ public class HAGroupStoreManagerIT extends BaseTest { assertTrue(updatedRecordOpt.isPresent()); HAGroupStoreRecord updatedRecord = updatedRecordOpt.get(); assertEquals(HAGroupStoreRecord.HAGroupState.ACTIVE_NOT_IN_SYNC, updatedRecord.getHAGroupState()); + assertNotNull(updatedRecord.getLastSyncStateTimeInMs()); + + // Set the HA group status to store and forward again and verify + // that getLastSyncStateTimeInMs is the same (still ACTIVE_NOT_IN_SYNC) + // The time should only update when we move from ACTIVE_IN_SYNC to ACTIVE_NOT_IN_SYNC + haGroupStoreManager.setHAGroupStatusToStoreAndForward(haGroupName); + Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS); + Optional<HAGroupStoreRecord> updatedRecordOpt2 = haGroupStoreManager.getHAGroupStoreRecord(haGroupName); + assertTrue(updatedRecordOpt2.isPresent()); + HAGroupStoreRecord updatedRecord2 = updatedRecordOpt2.get(); + assertEquals(HAGroupStoreRecord.HAGroupState.ACTIVE_NOT_IN_SYNC, updatedRecord2.getHAGroupState()); + assertEquals(updatedRecord.getLastSyncStateTimeInMs(), updatedRecord2.getLastSyncStateTimeInMs()); } @Test - public void testSetHAGroupStatusRecordToSync() throws Exception { + public void testSetHAGroupStatusToSync() throws Exception { String haGroupName = testName.getMethodName(); HAGroupStoreManager haGroupStoreManager = HAGroupStoreManager.getInstance(config); - // Create an initial HAGroupStoreRecord with ACTIVE_NOT_IN_SYNC status - HAGroupStoreRecord initialRecord = new HAGroupStoreRecord( - "1.0", haGroupName, HAGroupStoreRecord.HAGroupState.ACTIVE_NOT_IN_SYNC); - - haAdmin.createHAGroupStoreRecordInZooKeeper(initialRecord); - Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS); + // Initial record should be present in ACTIVE_NOT_IN_SYNC status + HAGroupStoreRecord initialRecord = haGroupStoreManager.getHAGroupStoreRecord(haGroupName).orElse(null); + assertNotNull(initialRecord); + assertEquals(HAGroupStoreRecord.HAGroupState.ACTIVE_NOT_IN_SYNC, initialRecord.getHAGroupState()); +
assertNotNull(initialRecord.getLastSyncStateTimeInMs()); - // Set the HA group status to sync (ACTIVE) - haGroupStoreManager.setHAGroupStatusRecordToSync(haGroupName); + // Set the HA group status to sync (ACTIVE), we need to wait for ZK_SESSION_TIMEOUT * Multiplier + Thread.sleep((long) Math.ceil(config.getLong(ZK_SESSION_TIMEOUT, DEFAULT_ZK_SESSION_TIMEOUT) + * ZK_SESSION_TIMEOUT_MULTIPLIER)); + haGroupStoreManager.setHAGroupStatusToSync(haGroupName); Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS); - // Verify the status was updated to ACTIVE + // Verify the state was updated to ACTIVE_IN_SYNC Optional<HAGroupStoreRecord> updatedRecordOpt = haGroupStoreManager.getHAGroupStoreRecord(haGroupName); assertTrue(updatedRecordOpt.isPresent()); HAGroupStoreRecord updatedRecord = updatedRecordOpt.get(); - assertEquals(ClusterRoleRecord.ClusterRole.ACTIVE, updatedRecord.getClusterRole()); + assertEquals(HAGroupStoreRecord.HAGroupState.ACTIVE_IN_SYNC, updatedRecord.getHAGroupState()); + assertNull(updatedRecord.getLastSyncStateTimeInMs()); } @Test @@ -350,4 +459,130 @@ public class HAGroupStoreManagerIT extends BaseTest { } + @Test + public void testSetReaderToDegraded() throws Exception { + String haGroupName = testName.getMethodName(); + HAGroupStoreManager haGroupStoreManager = HAGroupStoreManager.getInstance(config); + + // Update the auto-created record to STANDBY state for testing + HAGroupStoreRecord standbyRecord = new HAGroupStoreRecord( + "1.0", haGroupName, HAGroupStoreRecord.HAGroupState.STANDBY); + + // Get the record to initialize ZNode from HAGroup so that we can artificially update it via HAAdmin + Optional<HAGroupStoreRecord> currentRecord = haGroupStoreManager.getHAGroupStoreRecord(haGroupName); + assertTrue(currentRecord.isPresent()); + + // Update via HAAdmin + haAdmin.updateHAGroupStoreRecordInZooKeeper(haGroupName, standbyRecord, 0); + Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS); + + // Call setReaderToDegraded + haGroupStoreManager.setReaderToDegraded(haGroupName); + Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS); + + // Verify the status was updated to DEGRADED_STANDBY_FOR_READER + Optional<HAGroupStoreRecord> updatedRecordOpt = haGroupStoreManager.getHAGroupStoreRecord(haGroupName); + assertTrue(updatedRecordOpt.isPresent()); + HAGroupStoreRecord updatedRecord = updatedRecordOpt.get(); + assertEquals(HAGroupStoreRecord.HAGroupState.DEGRADED_STANDBY_FOR_READER, updatedRecord.getHAGroupState()); + + // Test transition from DEGRADED_STANDBY_FOR_WRITER to DEGRADED_STANDBY + HAGroupStoreRecord degradedWriterRecord = new HAGroupStoreRecord( + "1.0", haGroupName, HAGroupStoreRecord.HAGroupState.DEGRADED_STANDBY_FOR_WRITER); + + haAdmin.updateHAGroupStoreRecordInZooKeeper(haGroupName, degradedWriterRecord, 2); + Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS); + + // Call setReaderToDegraded again + haGroupStoreManager.setReaderToDegraded(haGroupName); + Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS); + + // Verify the status was updated to DEGRADED_STANDBY + updatedRecordOpt = haGroupStoreManager.getHAGroupStoreRecord(haGroupName); + assertTrue(updatedRecordOpt.isPresent()); + updatedRecord = updatedRecordOpt.get(); + assertEquals(HAGroupStoreRecord.HAGroupState.DEGRADED_STANDBY, updatedRecord.getHAGroupState()); + } + + @Test + public void testSetReaderToHealthy() throws Exception { + String haGroupName = testName.getMethodName(); + HAGroupStoreManager haGroupStoreManager = HAGroupStoreManager.getInstance(config); + + // Get the record to 
initialize ZNode from HAGroup so that we can artificially update it via HAAdmin + Optional<HAGroupStoreRecord> currentRecord = haGroupStoreManager.getHAGroupStoreRecord(haGroupName); + assertTrue(currentRecord.isPresent()); + + // Update the auto-created record to DEGRADED_STANDBY_FOR_READER state for testing + HAGroupStoreRecord degradedReaderRecord = new HAGroupStoreRecord( + "1.0", haGroupName, HAGroupStoreRecord.HAGroupState.DEGRADED_STANDBY_FOR_READER); + + haAdmin.updateHAGroupStoreRecordInZooKeeper(haGroupName, degradedReaderRecord, 0); + Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS); + + // Call setReaderToHealthy + haGroupStoreManager.setReaderToHealthy(haGroupName); + Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS); + + // Verify the status was updated to STANDBY + Optional<HAGroupStoreRecord> updatedRecordOpt = haGroupStoreManager.getHAGroupStoreRecord(haGroupName); + assertTrue(updatedRecordOpt.isPresent()); + HAGroupStoreRecord updatedRecord = updatedRecordOpt.get(); + assertEquals(HAGroupStoreRecord.HAGroupState.STANDBY, updatedRecord.getHAGroupState()); + + // Test transition from DEGRADED_STANDBY to DEGRADED_STANDBY_FOR_WRITER + HAGroupStoreRecord degradedRecord = new HAGroupStoreRecord( + "1.0", haGroupName, HAGroupStoreRecord.HAGroupState.DEGRADED_STANDBY); + + haAdmin.updateHAGroupStoreRecordInZooKeeper(haGroupName, degradedRecord, 2); + Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS); + + // Call setReaderToHealthy again + haGroupStoreManager.setReaderToHealthy(haGroupName); + Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS); + + // Verify the status was updated to DEGRADED_STANDBY_FOR_WRITER + updatedRecordOpt = haGroupStoreManager.getHAGroupStoreRecord(haGroupName); + assertTrue(updatedRecordOpt.isPresent()); + updatedRecord = updatedRecordOpt.get(); + assertEquals(HAGroupStoreRecord.HAGroupState.DEGRADED_STANDBY_FOR_WRITER, updatedRecord.getHAGroupState()); + } + + @Test + public void testReaderStateTransitionInvalidStates() throws Exception { + String haGroupName = testName.getMethodName(); + HAGroupStoreManager haGroupStoreManager = HAGroupStoreManager.getInstance(config); + + // Get the record to initialize ZNode from HAGroup so that we can artificially update it via HAAdmin + Optional<HAGroupStoreRecord> currentRecord = haGroupStoreManager.getHAGroupStoreRecord(haGroupName); + assertTrue(currentRecord.isPresent()); + + // Update the auto-created record to ACTIVE_IN_SYNC state (invalid for both operations) + HAGroupStoreRecord activeRecord = new HAGroupStoreRecord( + "1.0", haGroupName, HAGroupStoreRecord.HAGroupState.ACTIVE_IN_SYNC); + + haAdmin.updateHAGroupStoreRecordInZooKeeper(haGroupName, activeRecord, 0); + Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS); + + // Test setReaderToDegraded with invalid state + try { + haGroupStoreManager.setReaderToDegraded(haGroupName); + fail("Expected InvalidClusterRoleTransitionException for setReaderToDegraded with ACTIVE_IN_SYNC state"); + } catch (InvalidClusterRoleTransitionException e) { + // Expected behavior + assertTrue("Exception should mention the invalid transition", + e.getMessage().contains("ACTIVE_IN_SYNC") && e.getMessage().contains("DEGRADED_STANDBY_FOR_READER")); + } + + // Test setReaderToHealthy with invalid state + try { + haGroupStoreManager.setReaderToHealthy(haGroupName); + fail("Expected InvalidClusterRoleTransitionException for setReaderToHealthy with ACTIVE_IN_SYNC state"); + } catch (InvalidClusterRoleTransitionException e) { + // Expected behavior + 
assertTrue("Exception should mention the invalid transition", + e.getMessage().contains("ACTIVE_IN_SYNC") && e.getMessage().contains("STANDBY")); + } + } + } \ No newline at end of file diff --git a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/PhoenixHAAdminIT.java b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/PhoenixHAAdminIT.java index 5e2685d03a..044a8a4925 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/PhoenixHAAdminIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/PhoenixHAAdminIT.java @@ -41,7 +41,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; -import static org.apache.phoenix.jdbc.HAGroupStoreClient.ZK_CONSISTENT_HA_NAMESPACE; +import static org.apache.phoenix.jdbc.HAGroupStoreClient.ZK_CONSISTENT_HA_GROUP_STATE_NAMESPACE; import static org.apache.phoenix.jdbc.PhoenixHAAdmin.toPath; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; @@ -71,8 +71,8 @@ public class PhoenixHAAdminIT extends BaseTest { @Before public void before() throws Exception { - haAdmin = new PhoenixHAAdmin(CLUSTERS.getHBaseCluster1().getConfiguration(), ZK_CONSISTENT_HA_NAMESPACE); - peerHaAdmin = new PhoenixHAAdmin(CLUSTERS.getHBaseCluster2().getConfiguration(), ZK_CONSISTENT_HA_NAMESPACE); + haAdmin = new PhoenixHAAdmin(CLUSTERS.getHBaseCluster1().getConfiguration(), ZK_CONSISTENT_HA_GROUP_STATE_NAMESPACE); + peerHaAdmin = new PhoenixHAAdmin(CLUSTERS.getHBaseCluster2().getConfiguration(), ZK_CONSISTENT_HA_GROUP_STATE_NAMESPACE); cleanupTestZnodes(); } diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/HAGroupStoreTestUtil.java b/phoenix-core/src/test/java/org/apache/phoenix/util/HAGroupStoreTestUtil.java index d15e73aca2..abc4d02a50 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/util/HAGroupStoreTestUtil.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/util/HAGroupStoreTestUtil.java @@ -110,4 +110,19 @@ public class HAGroupStoreTestUtil { conn.commit(); } } -} \ No newline at end of file + + /** + * Deletes all HA group records from the system table for testing purposes. + * + * @param zkUrl the ZooKeeper URL to connect to + * @throws SQLException if the database operation fails + */ + public static void deleteAllHAGroupRecordsInSystemTable(String zkUrl) throws SQLException { + // Delete all records from System Table + try (PhoenixConnection conn = (PhoenixConnection) DriverManager.getConnection(JDBC_PROTOCOL_ZK + JDBC_PROTOCOL_SEPARATOR + zkUrl); + Statement stmt = conn.createStatement()) { + stmt.execute("DELETE FROM " + SYSTEM_HA_GROUP_NAME); + conn.commit(); + } + } +} \ No newline at end of file
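For readers who want to try the subscription API that this commit adds (and that the integration tests above exercise), here is a minimal usage sketch. It is not part of the commit: the group name "myHAGroup" and the System.out logging are illustrative assumptions, config is assumed to be the cluster's Hadoop Configuration, and the calls and parameter order simply mirror what the tests use (HAGroupStoreManager.getInstance, subscribeToTargetState, unsubscribeFromTargetState, and the four-argument HAGroupStateListener lambda).

    // Obtain the manager for this cluster's configuration, as the tests do.
    HAGroupStoreManager manager = HAGroupStoreManager.getInstance(config);

    // The listener receives the group name, the state that was reached, the
    // modification time, and whether the change came from the LOCAL or PEER cluster.
    HAGroupStateListener listener = (groupName, toState, modifiedTime, clusterType) ->
        System.out.println(groupName + " reached " + toState + " on " + clusterType + " at " + modifiedTime);

    // Subscribe to a target state on the local cluster; per
    // testSubscriptionToNonExistentHAGroup this throws IOException when the
    // HA group is not present in the system table. "myHAGroup" is a placeholder.
    manager.subscribeToTargetState("myHAGroup",
        HAGroupStoreRecord.HAGroupState.ACTIVE_IN_SYNC, ClusterType.LOCAL, listener);

    // Later, remove the subscription with the same (group, state, cluster, listener) combination.
    manager.unsubscribeFromTargetState("myHAGroup",
        HAGroupStoreRecord.HAGroupState.ACTIVE_IN_SYNC, ClusterType.LOCAL, listener);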