This is an automated email from the ASF dual-hosted git repository. lokiore pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/master by this push: new e853980bc0 PHOENIX-7493 Graceful Failover with Phoenix HA (#2075) e853980bc0 is described below commit e853980bc04ae44e6670fd60857360d67831e607 Author: ritegarg <58840065+riteg...@users.noreply.github.com> AuthorDate: Fri Mar 28 09:31:22 2025 -0700 PHOENIX-7493 Graceful Failover with Phoenix HA (#2075) --- .../phoenix/cache/ServerMetadataCacheImpl.java | 3 - .../org/apache/phoenix/jdbc/ClusterRoleRecord.java | 4 +- .../jdbc/ClusterRoleRecordGeneratorTool.java | 2 +- .../apache/phoenix/jdbc/HAGroupStoreClient.java | 207 +++++++++ .../apache/phoenix/jdbc/HAGroupStoreManager.java | 80 ++++ .../apache/phoenix/jdbc/HighAvailabilityGroup.java | 2 - .../phoenix/jdbc/HighAvailabilityPolicy.java | 2 +- .../org/apache/phoenix/jdbc/PhoenixHAAdmin.java | 494 +++++++++++++++++++++ .../apache/phoenix/jdbc/PhoenixHAAdminTool.java | 466 +------------------ .../org/apache/phoenix/query/QueryServices.java | 2 + .../apache/phoenix/query/QueryServicesOptions.java | 6 +- .../protobuf/RegionServerEndpointService.proto | 9 + .../coprocessor/PhoenixRegionServerEndpoint.java | 20 + .../UngroupedAggregateRegionObserver.java | 18 + .../phoenix/hbase/index/IndexRegionObserver.java | 12 + phoenix-core/pom.xml | 4 + .../phoenix/cache/ServerMetadataCacheIT.java | 2 - .../end2end/PhoenixRegionServerEndpointIT.java | 13 +- .../apache/phoenix/jdbc/HAGroupStoreClientIT.java | 427 ++++++++++++++++++ .../apache/phoenix/jdbc/HAGroupStoreManagerIT.java | 88 ++++ .../jdbc/HighAvailabilityTestingUtility.java | 12 +- .../phoenix/jdbc/PhoenixHAAdminToolTest.java | 12 +- 22 files changed, 1409 insertions(+), 476 deletions(-) diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/cache/ServerMetadataCacheImpl.java b/phoenix-core-client/src/main/java/org/apache/phoenix/cache/ServerMetadataCacheImpl.java index 5f9aa10455..91b783628b 100644 --- 
a/phoenix-core-client/src/main/java/org/apache/phoenix/cache/ServerMetadataCacheImpl.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/cache/ServerMetadataCacheImpl.java @@ -20,8 +20,6 @@ package org.apache.phoenix.cache; import java.sql.Connection; import java.sql.SQLException; import java.util.Properties; -import java.util.concurrent.TimeUnit; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.util.Bytes; @@ -33,7 +31,6 @@ import org.apache.phoenix.schema.PTable; import org.apache.phoenix.thirdparty.com.google.common.cache.Cache; import org.apache.phoenix.thirdparty.com.google.common.cache.CacheBuilder; import org.apache.phoenix.thirdparty.com.google.common.cache.RemovalListener; -import org.apache.phoenix.util.PhoenixRuntime; import org.apache.phoenix.util.QueryUtil; import org.apache.phoenix.util.SchemaUtil; import org.slf4j.Logger; diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/ClusterRoleRecord.java b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/ClusterRoleRecord.java index aa28c6e85f..c2af18df8b 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/ClusterRoleRecord.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/ClusterRoleRecord.java @@ -59,13 +59,13 @@ public class ClusterRoleRecord { * take traffic, standby and offline do not, and unknown is used if the state cannot be determined. 
*/ public enum ClusterRole { - ACTIVE, STANDBY, OFFLINE, UNKNOWN; + ACTIVE, STANDBY, OFFLINE, UNKNOWN, ACTIVE_TO_STANDBY; /** * @return true if a cluster with this role can be connected, otherwise false */ public boolean canConnect() { - return this == ACTIVE || this == STANDBY; + return this == ACTIVE || this == STANDBY || this == ACTIVE_TO_STANDBY; } public static ClusterRole from(byte[] bytes) { diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/ClusterRoleRecordGeneratorTool.java b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/ClusterRoleRecordGeneratorTool.java index 93899f87a2..03c29320d4 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/ClusterRoleRecordGeneratorTool.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/ClusterRoleRecordGeneratorTool.java @@ -40,7 +40,7 @@ import java.util.ArrayList; import java.util.List; import static org.apache.phoenix.jdbc.HighAvailabilityGroup.PHOENIX_HA_ATTR_PREFIX; -import static org.apache.phoenix.jdbc.PhoenixHAAdminTool.getLocalZkUrl; +import static org.apache.phoenix.jdbc.PhoenixHAAdmin.getLocalZkUrl; /** diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreClient.java b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreClient.java new file mode 100644 index 0000000000..05f4247ace --- /dev/null +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreClient.java @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.jdbc; + + +import org.apache.curator.framework.recipes.cache.ChildData; +import org.apache.curator.framework.recipes.cache.PathChildrenCache; +import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener; +import org.apache.curator.utils.ZKPaths; +import org.apache.hadoop.conf.Configuration; +import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting; +import org.apache.phoenix.thirdparty.com.google.common.collect.ImmutableList; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Closeable; +import java.io.IOException; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + + +/** + * Write-through cache for HAGroupStore. + * Uses {@link PathChildrenCache} from {@link org.apache.curator.framework.CuratorFramework}. 
+ */ +public class HAGroupStoreClient implements Closeable { + + private static final long HA_GROUP_STORE_CLIENT_INITIALIZATION_TIMEOUT_MS = 30000L; + private static volatile HAGroupStoreClient haGroupStoreClientInstance; + private PhoenixHAAdmin phoenixHaAdmin; + private static final Logger LOGGER = LoggerFactory.getLogger(HAGroupStoreClient.class); + // Map contains <ClusterRole, Map<HAGroupName(String), ClusterRoleRecord>> + private final ConcurrentHashMap<ClusterRoleRecord.ClusterRole, ConcurrentHashMap<String, ClusterRoleRecord>> clusterRoleToCRRMap + = new ConcurrentHashMap<>(); + private PathChildrenCache pathChildrenCache; + private volatile boolean isHealthy; + + /** + * Creates/gets an instance of HAGroupStoreClient. + * Can return null instance if unable to initialize. + * + * @param conf configuration + * @return HAGroupStoreClient instance + */ + public static HAGroupStoreClient getInstance(Configuration conf) { + if (haGroupStoreClientInstance == null || !haGroupStoreClientInstance.isHealthy) { + synchronized (HAGroupStoreClient.class) { + if (haGroupStoreClientInstance == null || !haGroupStoreClientInstance.isHealthy) { + haGroupStoreClientInstance = new HAGroupStoreClient(conf, null); + if (!haGroupStoreClientInstance.isHealthy) { + haGroupStoreClientInstance.close(); + haGroupStoreClientInstance = null; + } + } + } + } + return haGroupStoreClientInstance; + } + + @VisibleForTesting + HAGroupStoreClient(final Configuration conf, final PathChildrenCacheListener pathChildrenCacheListener) { + try { + this.phoenixHaAdmin = new PhoenixHAAdmin(conf); + final PathChildrenCache pathChildrenCache; + pathChildrenCache = new PathChildrenCache(phoenixHaAdmin.getCurator(), ZKPaths.PATH_SEPARATOR, true); + final CountDownLatch latch = new CountDownLatch(1); + if (pathChildrenCacheListener != null) { + pathChildrenCache.getListenable().addListener(pathChildrenCacheListener); + } else { + pathChildrenCache.getListenable().addListener((client, event) -> { + 
LOGGER.info("HAGroupStoreClient PathChildrenCache Received event for type {}", event.getType()); + final ChildData childData = event.getData(); + ClusterRoleRecord eventCRR = extractCRROrNull(childData); + switch (event.getType()) { + case CHILD_ADDED: + case CHILD_UPDATED: + if (eventCRR != null && eventCRR.getHaGroupName() != null) { + updateClusterRoleRecordMap(eventCRR); + } + break; + case CHILD_REMOVED: + // In case of CHILD_REMOVED, we get the old version of data that was just deleted in event. + if (eventCRR != null && eventCRR.getHaGroupName() != null + && !eventCRR.getHaGroupName().isEmpty() + && eventCRR.getRole(phoenixHaAdmin.getZkUrl()) != null) { + LOGGER.info("Received CHILD_REMOVED event, Removing CRR {} from existing CRR Map {}", eventCRR, clusterRoleToCRRMap); + final ClusterRoleRecord.ClusterRole role = eventCRR.getRole(phoenixHaAdmin.getZkUrl()); + clusterRoleToCRRMap.putIfAbsent(role, new ConcurrentHashMap<>()); + clusterRoleToCRRMap.get(role).remove(eventCRR.getHaGroupName()); + } + break; + case INITIALIZED: + latch.countDown(); + break; + case CONNECTION_LOST: + case CONNECTION_SUSPENDED: + isHealthy = false; + break; + case CONNECTION_RECONNECTED: + isHealthy = true; + break; + default: + LOGGER.warn("Unexpected event type {}, complete event {}", event.getType(), event); + } + }); + } + pathChildrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT); + this.pathChildrenCache = pathChildrenCache; + isHealthy = latch.await(HA_GROUP_STORE_CLIENT_INITIALIZATION_TIMEOUT_MS, TimeUnit.MILLISECONDS); + buildClusterRoleToCRRMap(); + } catch (Exception e) { + isHealthy = false; + LOGGER.error("Unexpected error occurred while initializing HAGroupStoreClient, marking cache as unhealthy", e); + } + } + + private ClusterRoleRecord extractCRROrNull(final ChildData childData) { + if (childData != null) { + byte[] data = childData.getData(); + return ClusterRoleRecord.fromJson(data).orElse(null); + } + return null; + } + + private void 
updateClusterRoleRecordMap(final ClusterRoleRecord crr) { + if (crr != null && crr.getHaGroupName() != null && crr.getRole(phoenixHaAdmin.getZkUrl()) != null) { + ClusterRoleRecord.ClusterRole role = crr.getRole(phoenixHaAdmin.getZkUrl()); + LOGGER.info("Updating Existing CRR Map {} with new CRR {}", clusterRoleToCRRMap, crr); + clusterRoleToCRRMap.putIfAbsent(role, new ConcurrentHashMap<>()); + clusterRoleToCRRMap.get(role).put(crr.getHaGroupName(), crr); + LOGGER.info("Added new CRR {} to CRR Map", crr); + // Remove any pre-existing mapping with any other role for this HAGroupName + for (ClusterRoleRecord.ClusterRole mapRole : clusterRoleToCRRMap.keySet()) { + if (mapRole != role) { + ConcurrentHashMap<String, ClusterRoleRecord> roleWiseMap = clusterRoleToCRRMap.get(mapRole); + if (roleWiseMap.containsKey(crr.getHaGroupName())) { + LOGGER.info("Removing any pre-existing mapping with role {} for HAGroupName {}", mapRole, crr.getHaGroupName()); + roleWiseMap.remove(crr.getHaGroupName()); + } + } + } + LOGGER.info("Final Updated CRR Map {}", clusterRoleToCRRMap); + } + } + + private void buildClusterRoleToCRRMap() { + List<ClusterRoleRecord> clusterRoleRecords = pathChildrenCache.getCurrentData().stream().map(this::extractCRROrNull) + .filter(Objects::nonNull).collect(Collectors.toList()); + for (ClusterRoleRecord crr : clusterRoleRecords) { + updateClusterRoleRecordMap(crr); + } + } + + public void rebuild() throws Exception { + if (!isHealthy) { + throw new IOException("HAGroupStoreClient is not healthy"); + } + LOGGER.info("Rebuilding HAGroupStoreClient for HA groups"); + // NOTE: this is a BLOCKING method. + // Completely rebuild the internal cache by querying for all needed data + // WITHOUT generating any events to send to listeners. 
+ pathChildrenCache.rebuild(); + buildClusterRoleToCRRMap(); + LOGGER.info("Rebuild Complete for HAGroupStoreClient"); + } + + + @Override + public void close() { + try { + LOGGER.info("Closing HAGroupStoreClient"); + clusterRoleToCRRMap.clear(); + if (pathChildrenCache != null) { + pathChildrenCache.close(); + } + LOGGER.info("Closed HAGroupStoreClient"); + } catch (IOException e) { + LOGGER.error("Exception closing HAGroupStoreClient", e); + } + } + + public List<ClusterRoleRecord> getCRRsByClusterRole(ClusterRoleRecord.ClusterRole clusterRole) throws IOException { + if (!isHealthy) { + throw new IOException("HAGroupStoreClient is not healthy"); + } + return clusterRoleToCRRMap.getOrDefault(clusterRole, new ConcurrentHashMap<>()).values().stream().collect(ImmutableList.toImmutableList()); + } +} diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreManager.java b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreManager.java new file mode 100644 index 0000000000..690953a7b0 --- /dev/null +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreManager.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.phoenix.jdbc; + +import org.apache.hadoop.conf.Configuration; +import java.io.IOException; +import static org.apache.phoenix.query.QueryServices.CLUSTER_ROLE_BASED_MUTATION_BLOCK_ENABLED; +import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_CLUSTER_ROLE_BASED_MUTATION_BLOCK_ENABLED; + +public class HAGroupStoreManager { + private static volatile HAGroupStoreManager haGroupStoreManagerInstance; + private final boolean mutationBlockEnabled; + private final Configuration conf; + + /** + * Creates/gets an instance of HAGroupStoreManager. + * + * @param conf configuration + * @return HAGroupStoreManager instance + */ + public static HAGroupStoreManager getInstance(Configuration conf) { + if (haGroupStoreManagerInstance == null) { + synchronized (HAGroupStoreManager.class) { + if (haGroupStoreManagerInstance == null) { + haGroupStoreManagerInstance = new HAGroupStoreManager(conf); + } + } + } + return haGroupStoreManagerInstance; + } + + private HAGroupStoreManager(final Configuration conf) { + this.mutationBlockEnabled = conf.getBoolean(CLUSTER_ROLE_BASED_MUTATION_BLOCK_ENABLED, + DEFAULT_CLUSTER_ROLE_BASED_MUTATION_BLOCK_ENABLED); + this.conf = conf; + } + + /** + * Checks whether mutation is blocked or not. + * @throws IOException when HAGroupStoreClient is not healthy. 
+ */ + public boolean isMutationBlocked() throws IOException { + if (mutationBlockEnabled) { + HAGroupStoreClient haGroupStoreClient = HAGroupStoreClient.getInstance(conf); + if (haGroupStoreClient != null) { + return !haGroupStoreClient.getCRRsByClusterRole(ClusterRoleRecord.ClusterRole.ACTIVE_TO_STANDBY).isEmpty(); + } + throw new IOException("HAGroupStoreClient is not initialized"); + } + return false; + } + + /** + * Force rebuilds the HAGroupStoreClient + * @throws Exception + */ + public void invalidateHAGroupStoreClient() throws Exception { + HAGroupStoreClient haGroupStoreClient = HAGroupStoreClient.getInstance(conf); + if (haGroupStoreClient != null) { + haGroupStoreClient.rebuild(); + } else { + throw new IOException("HAGroupStoreClient is not initialized"); + } + } +} diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HighAvailabilityGroup.java b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HighAvailabilityGroup.java index 9f27a0998d..938b60275b 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HighAvailabilityGroup.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HighAvailabilityGroup.java @@ -49,9 +49,7 @@ import java.sql.Connection; import java.sql.Driver; import java.sql.DriverManager; import java.sql.SQLException; -import java.util.ArrayList; import java.util.HashSet; -import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Optional; diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HighAvailabilityPolicy.java b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HighAvailabilityPolicy.java index 7a82d47a93..ea026a6806 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HighAvailabilityPolicy.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HighAvailabilityPolicy.java @@ -36,7 +36,7 @@ import org.slf4j.LoggerFactory; /** * An HighAvailabilityGroup provides a JDBC connection from 
given connection string and properties. */ -enum HighAvailabilityPolicy { +public enum HighAvailabilityPolicy { FAILOVER { @Override public Connection provide(HighAvailabilityGroup haGroup, Properties info, diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixHAAdmin.java b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixHAAdmin.java new file mode 100644 index 0000000000..c9019b6c1a --- /dev/null +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixHAAdmin.java @@ -0,0 +1,494 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.phoenix.jdbc; + +import org.apache.commons.lang3.StringUtils; +import org.apache.curator.RetryPolicy; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.recipes.atomic.AtomicValue; +import org.apache.curator.framework.recipes.atomic.DistributedAtomicValue; +import org.apache.curator.utils.ZKPaths; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.util.PairOfSameType; +import org.apache.phoenix.thirdparty.com.google.common.base.Preconditions; +import org.apache.phoenix.util.JDBCUtil; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Closeable; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Properties; + +/** + * Helper class to update cluster role record for a ZK cluster. + * The ZK client this accessor has is confined to a single ZK cluster, but it can be used to operate + * multiple HA groups that are associated with this cluster. + * This is not thread-safe yet. Multiple threads can update CRRs at same time potentially causing inconsistency. 
+ */ +public class PhoenixHAAdmin implements Closeable { + + /** + * Wrapper class for static accessor + */ + public static class HighAvailibilityCuratorProvider { + + public static final HighAvailibilityCuratorProvider INSTANCE = new HighAvailibilityCuratorProvider(); + /** + * Gets curator blocking if necessary to create it + */ + public CuratorFramework getCurator(String zkUrl, Properties properties) throws IOException { + return HighAvailabilityGroup.getCurator(zkUrl, properties); + } + } + + private static final Logger LOG = LoggerFactory.getLogger(PhoenixHAAdmin.class); + + /** The fully qualified ZK URL for an HBase cluster in format host:port:/hbase */ + private final String zkUrl; + /** Configuration of this command line tool. */ + private final Configuration conf; + /** Client properties which has copies of configuration defining ZK timeouts / retries. */ + private final Properties properties = new Properties(); + /** Curator Provider **/ + private final HighAvailibilityCuratorProvider + highAvailibilityCuratorProvider; + + public PhoenixHAAdmin(Configuration conf) { + this(getLocalZkUrl(conf), conf, HighAvailibilityCuratorProvider.INSTANCE); + } + + public PhoenixHAAdmin(String zkUrl, Configuration conf) { + this(zkUrl, conf, HighAvailibilityCuratorProvider.INSTANCE); + } + + public PhoenixHAAdmin(String zkUrl, Configuration conf, + HighAvailibilityCuratorProvider highAvailibilityCuratorProvider) { + Preconditions.checkNotNull(zkUrl); + Preconditions.checkNotNull(conf); + Preconditions.checkNotNull(highAvailibilityCuratorProvider); + this.zkUrl = JDBCUtil.formatZookeeperUrl(zkUrl); + this.conf = conf; + conf.iterator().forEachRemaining(k -> properties.setProperty(k.getKey(), k.getValue())); + this.highAvailibilityCuratorProvider = highAvailibilityCuratorProvider; + } + + /** + * Helper method to get local ZK fully qualified URL (host:port:/hbase) from configuration. 
+ */ + public static String getLocalZkUrl(Configuration conf) { + String localZkQuorum = conf.get(HConstants.ZOOKEEPER_QUORUM); + if (StringUtils.isEmpty(localZkQuorum)) { + String msg = "ZK quorum not found by looking up key " + HConstants.ZOOKEEPER_QUORUM; + LOG.error(msg); + throw new IllegalArgumentException(msg); + } + + String portStr = conf.get(HConstants.ZOOKEEPER_CLIENT_PORT); + int port = HConstants.DEFAULT_ZOOKEPER_CLIENT_PORT; + if (portStr != null) { + try { + port = Integer.parseInt(portStr); + } catch (NumberFormatException e) { + String msg = String.format("Unrecognized ZK port '%s' in ZK quorum '%s'", + portStr, localZkQuorum); + LOG.error(msg, e); + throw new IllegalArgumentException(msg, e); + } + } + + String localZkRoot = conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT, + HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT); + return String.format("%s:%d:%s", localZkQuorum, port, localZkRoot); + } + + /** + * Gets curator from the cache if available otherwise calls into getCurator to make it. + */ + public CuratorFramework getCurator() throws IOException { + return highAvailibilityCuratorProvider.getCurator(zkUrl, properties); + } + + + /** + * Check if current cluster is ACTIVE role for the given HA group. + * In case of Exception when it fails to read cluster role data from the current cluster, it + * will assume current cluster is not ACTIVE. Callers should be aware of "false positive" + * possibility especially due to connectivity issue between this tool and remote ZK cluster. 
+ * @param haGroupName the HA group name; a cluster can be associated with multiple HA groups + * @return true if current cluster is ACTIVE role, otherwise false + */ + boolean isCurrentActiveCluster(String haGroupName) { + try { + byte[] data = getCurator().getData().forPath(toPath(haGroupName)); + + Optional<ClusterRoleRecord> record = ClusterRoleRecord.fromJson(data); + return record.isPresent() && record.get() + .getRole(zkUrl) == ClusterRoleRecord.ClusterRole.ACTIVE; + } catch (KeeperException.NoNodeException ne) { + LOG.info( + "No role record found for HA group {} on '{}', assuming it is not active", + haGroupName, zkUrl); + return false; + } catch (Exception e) { + LOG.warn("Got exception when reading record for {} on cluster {}", + haGroupName, zkUrl, e); + return false; + } + } + + /** + * This lists all cluster role records stored in the zookeeper nodes. + * This read-only operation and hence no side effect on the ZK cluster. + */ + public List<ClusterRoleRecord> listAllClusterRoleRecordsOnZookeeper() throws IOException { + List<String> haGroupNames; + try { + haGroupNames = getCurator().getChildren().forPath(ZKPaths.PATH_SEPARATOR); + } catch (Exception e) { + String msg = String.format("Got exception when listing all HA groups in %s", zkUrl); + LOG.error(msg); + throw new IOException(msg, e); + } + + List<ClusterRoleRecord> records = new ArrayList<>(); + List<String> failedHaGroups = new ArrayList<>(); + for (String haGroupName : haGroupNames) { + try { + byte[] data = getCurator().getData().forPath(ZKPaths.PATH_SEPARATOR + haGroupName); + Optional<ClusterRoleRecord> record = ClusterRoleRecord.fromJson(data); + if (record.isPresent()) { + records.add(record.get()); + } else { // fail to deserialize data from JSON + failedHaGroups.add(haGroupName); + } + } catch (Exception e) { + LOG.warn("Got exception when reading data for HA group {}", + haGroupName, e); + failedHaGroups.add(haGroupName); + } + } + + if (!failedHaGroups.isEmpty()) { + String + msg = 
+ String.format( + "Found following HA groups: %s. Fail to read cluster " + "role records for following HA groups: %s", + String.join(",", haGroupNames), String.join(",", failedHaGroups)); + LOG.error(msg); + throw new IOException(msg); + } + return records; + } + + /** + * Helper method to write the given cluster role records into the ZK clusters respectively. + * + * // TODO: add retry logics + * + * @param records The cluster role record list to save on ZK + * @param forceful if true, this method will ignore errors on other clusters; otherwise it will + * not update next cluster (in order) if there is any failure on current cluster + * @return a map of HA group name to list cluster's url for cluster role record failing to write + */ + public Map<String, List<String>> syncClusterRoleRecords(List<ClusterRoleRecord> records, + boolean forceful) throws IOException { + Map<String, List<String>> failedHaGroups = new HashMap<>(); + for (ClusterRoleRecord record : records) { + String haGroupName = record.getHaGroupName(); + try (PhoenixHAAdmin admin1 = new PhoenixHAAdmin(record.getZk1(), conf, HighAvailibilityCuratorProvider.INSTANCE); + PhoenixHAAdmin admin2 = new PhoenixHAAdmin(record.getZk2(), conf, HighAvailibilityCuratorProvider.INSTANCE)) { + // Update the cluster previously ACTIVE cluster first. + // It reduces the chances of split-brain between clients and clusters. + // If can not determine previous ACTIVE cluster, update new STANDBY cluster first. 
+ final PairOfSameType<PhoenixHAAdmin> pair; + if (admin1.isCurrentActiveCluster(haGroupName)) { + pair = new PairOfSameType<>(admin1, admin2); + } else if (admin2.isCurrentActiveCluster(haGroupName)) { + pair = new PairOfSameType<>(admin2, admin1); + } else if (record.getRole(admin1.getZkUrl()) == ClusterRoleRecord.ClusterRole.STANDBY) { + pair = new PairOfSameType<>(admin1, admin2); + } else { + pair = new PairOfSameType<>(admin2, admin1); + } + try { + pair.getFirst().createOrUpdateDataOnZookeeper(record); + } catch (IOException e) { + LOG.error("Error to create or update data on Zookeeper, cluster={}, record={}", + pair.getFirst(), record); + failedHaGroups.computeIfAbsent(haGroupName, (k) -> new ArrayList<>()) + .add(pair.getFirst().getZkUrl()); + if (!forceful) { + LOG.error("-forceful option is not enabled by command line options, " + + "skip writing record {} to ZK clusters", record); + // skip writing this record to second ZK cluster, so we should report that + failedHaGroups.computeIfAbsent(haGroupName, (k) -> new ArrayList<>()) + .add(pair.getSecond().getZkUrl()); + continue; // do not update this record on second cluster + } + } + try { + pair.getSecond().createOrUpdateDataOnZookeeper(record); + } catch (IOException e) { + LOG.error("Error to create or update data on Zookeeper, cluster={}, record={}", + pair.getFirst(), record); + failedHaGroups.computeIfAbsent(haGroupName, (k) -> new ArrayList<>()) + .add(pair.getSecond().getZkUrl()); + } + } + } + return failedHaGroups; + } + + /** + * Verify cluster role records stored in local ZK nodes, and repair with remote znodes for any + * inconsistency. + * @return a list of HA group names with inconsistent cluster role records, or empty list + */ + List<String> verifyAndRepairWithRemoteZnode() throws Exception { + List<String> inconsistentHaGroups = new ArrayList<>(); + for (ClusterRoleRecord record : listAllClusterRoleRecordsOnZookeeper()) { + // the remote znodes may be on different ZK clusters. 
+ if (record.getRole(zkUrl) == ClusterRoleRecord.ClusterRole.UNKNOWN) { + LOG.warn("Unknown cluster role for cluster '{}' in record {}", + zkUrl, record); + continue; + } + String remoteZkUrl = record.getZk1().equals(zkUrl) ? record.getZk2() : record.getZk1(); + try (PhoenixHAAdmin remoteAdmin = new PhoenixHAAdmin(remoteZkUrl, conf, + HighAvailibilityCuratorProvider.INSTANCE)) { + ClusterRoleRecord remoteRecord; + try { + String zPath = toPath(record.getHaGroupName()); + byte[] data = remoteAdmin.getCurator().getData().forPath(zPath); + Optional<ClusterRoleRecord> recordOptional = ClusterRoleRecord.fromJson(data); + if (!recordOptional.isPresent()) { + remoteAdmin.createOrUpdateDataOnZookeeper(record); + continue; + } + remoteRecord = recordOptional.get(); + } catch (KeeperException.NoNodeException ne) { + LOG.warn( + "No record znode yet, creating for HA group {} on {}", + record.getHaGroupName(), remoteAdmin); + remoteAdmin.createDataOnZookeeper(record); + LOG.info("Created znode on cluster {} with record {}", + remoteAdmin, record); + continue; + } catch (Exception e) { + LOG.error( + "Error to get data on remote cluster {} for HA group {}", remoteAdmin, + record.getHaGroupName(), e); + continue; + } + + if (!record.getHaGroupName().equals(remoteRecord.getHaGroupName())) { + inconsistentHaGroups.add(record.getHaGroupName()); + LOG.error( + "INTERNAL ERROR: got cluster role record for different HA groups." + " Local record: {}, remote record: {}", + record, remoteRecord); + } else if (remoteRecord.isNewerThan(record)) { + createOrUpdateDataOnZookeeper(remoteRecord); + } else if (record.isNewerThan(remoteRecord)) { + remoteAdmin.createOrUpdateDataOnZookeeper(record); + } else if (record.equals(remoteRecord)) { + LOG.info("Cluster role record {} is consistent", record); + } else { + inconsistentHaGroups.add(record.getHaGroupName()); + LOG.error( + "Cluster role record for HA group {} is inconsistent. 
On cluster " + "{} the record is {}; on cluster {} the record is {}", + record.getHaGroupName(), this, record, remoteAdmin, remoteRecord); + } + } + } + return inconsistentHaGroups; + } + + /** + * This updates the cluster role data on the zookeeper it connects to. + * To avoid conflicts, it does CAS (compare-and-set) when updating. The constraint is that the + * given record's version should be larger the existing record's version. This is a way to help + * avoiding manual update conflicts. If the given record can not meet version check, it will + * reject the update request and client (human operator or external system) should retry. + * @param record the new cluster role record to be saved on ZK + * @return true if the data on ZK is updated otherwise false + * @throws IOException if it fails to update the cluster role data on ZK + */ + public boolean createOrUpdateDataOnZookeeper(ClusterRoleRecord record) throws IOException { + if (!zkUrl.equals(record.getZk1()) && !zkUrl.equals(record.getZk2())) { + String + msg = + String.format( + "INTERNAL ERROR: " + "ZK cluster is not associated with cluster role record! " + "ZK cluster URL: '%s'. 
Cluster role record: %s", + zkUrl, record); + LOG.error(msg); + throw new IOException(msg); + } + + String haGroupName = record.getHaGroupName(); + byte[] data; + try { + data = getCurator().getData().forPath(toPath(haGroupName)); // Get initial data + } catch (KeeperException.NoNodeException ne) { + LOG.info("No record znode yet, creating for HA group {} on {}", + haGroupName, zkUrl); + createDataOnZookeeper(record); + LOG.info("Created znode for HA group {} with record data {} on {}", + haGroupName, record, zkUrl); + return true; + } catch (Exception e) { + String + msg = + String.format( + "Fail to read cluster role record data for HA group %s " + "on cluster '%s'", + haGroupName, zkUrl); + LOG.error(msg, e); + throw new IOException(msg, e); + } + + Optional<ClusterRoleRecord> existingRecordOptional = ClusterRoleRecord.fromJson(data); + if (!existingRecordOptional.isPresent()) { + String + msg = + String.format( + "Fail to parse existing cluster role record data for HA " + "group %s", + haGroupName); + LOG.error(msg); + throw new IOException(msg); + } + + ClusterRoleRecord existingRecord = existingRecordOptional.get(); + if (record.getVersion() < existingRecord.getVersion()) { + String + msg = + String.format( + "Invalid new cluster role record for HA group '%s' " + "because new record's version V%d is smaller than existing V%d. " + "Existing role record: %s. New role record fail to save: %s", + haGroupName, record.getVersion(), existingRecord.getVersion(), + existingRecord, record); + LOG.warn(msg); + return false; // return instead of error out to tolerate + } + + if (record.getVersion() == existingRecord.getVersion()) { + if (record.equals(existingRecord)) { + LOG.debug( + "Cluster role does not change since last update on ZK."); + return false; // no need to update iff they are the same. + } else { + String + msg = + String.format( + "Invalid new cluster role record for HA group '%s' " + "because it has the same version V%d but inconsistent data. 
" + "Existing role record: %s. New role record fail to save: %s", + haGroupName, record.getVersion(), existingRecord, record); + LOG.error(msg); + throw new IOException(msg); + } + } + + return updateDataOnZookeeper(existingRecord, record); + } + + /** + * Helper to create the znode on the ZK cluster. + */ + private void createDataOnZookeeper(ClusterRoleRecord record) throws IOException { + String haGroupName = record.getHaGroupName(); + // znode path for given haGroup name assuming namespace (prefix) has been set. + String haGroupPath = toPath(haGroupName); + try { + getCurator().create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT) + .forPath(haGroupPath, ClusterRoleRecord.toJson(record)); + } catch (KeeperException.NodeExistsException nee) { + //this method assumes that the znode doesn't exist yet, but it could have been + //created between now and the last time we checked. We swallow the exception and + //rely on our caller to check to make sure the znode that's saved is correct + LOG.warn("Znode for HA group {} already exists. ", haGroupPath, nee); + } catch (Exception e) { + LOG.error( + "Fail to initialize the znode for HA group {} with record data {}", haGroupPath, + record, e); + throw new IOException("Fail to initialize znode for HA group " + haGroupPath, e); + } + } + + /** + * Helper to update the znode on ZK cluster assuming current data is the given old record. + */ + private boolean updateDataOnZookeeper(ClusterRoleRecord oldRecord, ClusterRoleRecord newRecord) + throws IOException { + // znode path for given haGroup name assuming namespace (prefix) has been set. 
+ String haGroupPath = toPath(newRecord.getHaGroupName()); + RetryPolicy retryPolicy = HighAvailabilityGroup.createRetryPolicy(properties); + try { + DistributedAtomicValue + v = + new DistributedAtomicValue(getCurator(), haGroupPath, retryPolicy); + AtomicValue<byte[]> + result = + v.compareAndSet(ClusterRoleRecord.toJson(oldRecord), + ClusterRoleRecord.toJson(newRecord)); + LOG.info( + "Updated cluster role record ({}->{}) for HA group {} on cluster '{}': {}", + oldRecord.getVersion(), newRecord.getVersion(), newRecord.getHaGroupName(), + zkUrl, result.succeeded() ? "succeeded" : "failed"); + LOG.debug( + "Old DistributedAtomicValue: {}, New DistributedAtomicValue: {},", + new String(result.preValue(), StandardCharsets.UTF_8), + new String(result.postValue(), StandardCharsets.UTF_8)); + return result.succeeded(); + } catch (Exception e) { + String + msg = + String.format( + "Fail to update cluster role record to ZK for the HA " + "group %s due to '%s'." + "Existing role record: %s. New role record fail to save: %s", + haGroupPath, e.getMessage(), oldRecord, newRecord); + LOG.error(msg, e); + throw new IOException(msg, e); + } + } + + /** + * Helper method to get ZK path for an HA group given the HA group name. + * It assumes the ZK namespace (prefix) has been set. 
+ */ + public static String toPath(String haGroupName) { + return ZKPaths.PATH_SEPARATOR + haGroupName; + } + + public String getZkUrl() { + return zkUrl; + } + + @Override + public void close() { + LOG.debug("PhoenixHAAdmin for {} is now closed.", zkUrl); + } + + @Override + public String toString() { + return zkUrl; + } +} diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixHAAdminTool.java b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixHAAdminTool.java index 5da3ea1593..a59dc9fd8d 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixHAAdminTool.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixHAAdminTool.java @@ -17,18 +17,11 @@ */ package org.apache.phoenix.jdbc; -import java.io.Closeable; import java.io.FileReader; -import java.io.IOException; import java.io.Reader; -import java.nio.charset.StandardCharsets; -import java.util.ArrayList; import java.util.Arrays; -import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Optional; -import java.util.Properties; import org.apache.phoenix.thirdparty.org.apache.commons.cli.CommandLine; import org.apache.phoenix.thirdparty.org.apache.commons.cli.CommandLineParser; @@ -38,25 +31,14 @@ import org.apache.phoenix.thirdparty.org.apache.commons.cli.Options; import org.apache.phoenix.thirdparty.org.apache.commons.cli.PosixParser; import org.apache.commons.io.FilenameUtils; import org.apache.commons.lang3.StringUtils; -import org.apache.curator.RetryPolicy; -import org.apache.curator.framework.CuratorFramework; -import org.apache.curator.framework.recipes.atomic.AtomicValue; -import org.apache.curator.framework.recipes.atomic.DistributedAtomicValue; -import org.apache.curator.utils.ZKPaths; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.util.PairOfSameType; import 
org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.util.GenericOptionsParser; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; -import org.apache.phoenix.jdbc.ClusterRoleRecord.ClusterRole; -import org.apache.phoenix.util.JDBCUtil; +import org.apache.phoenix.jdbc.PhoenixHAAdmin.HighAvailibilityCuratorProvider; import org.apache.phoenix.util.JacksonUtil; -import org.apache.zookeeper.CreateMode; -import org.apache.zookeeper.KeeperException.NodeExistsException; -import org.apache.zookeeper.KeeperException.NoNodeException; import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting; import org.apache.phoenix.thirdparty.com.google.common.base.Preconditions; import org.slf4j.Logger; @@ -109,9 +91,10 @@ public class PhoenixHAAdminTool extends Configured implements Tool { if (commandLine.hasOption(HELP_OPT.getOpt())) { printUsageMessage(); return RET_SUCCESS; - } else if (commandLine.hasOption(LIST_OPT.getOpt())) { // list - String zkUrl = getLocalZkUrl(getConf()); // Admin is created against local ZK cluster - try (PhoenixHAAdminHelper admin = new PhoenixHAAdminHelper(zkUrl, getConf(), HighAvailibilityCuratorProvider.INSTANCE)) { + } + String zkUrl = PhoenixHAAdmin.getLocalZkUrl(getConf()); // Admin is created against local ZK cluster + if (commandLine.hasOption(LIST_OPT.getOpt())) { // list + try (PhoenixHAAdmin admin = new PhoenixHAAdmin(zkUrl, getConf(), HighAvailibilityCuratorProvider.INSTANCE)) { List<ClusterRoleRecord> records = admin.listAllClusterRoleRecordsOnZookeeper(); JacksonUtil.getObjectWriterPretty().writeValue(System.out, records); } @@ -119,16 +102,17 @@ public class PhoenixHAAdminTool extends Configured implements Tool { String fileName = commandLine.getOptionValue(MANIFEST_OPT.getOpt()); List<ClusterRoleRecord> records = readRecordsFromFile(fileName); boolean forceful = commandLine.hasOption(FORCEFUL_OPT.getOpt()); - Map<String, List<String>> failedHaGroups = 
syncClusterRoleRecords(records, forceful); - if (!failedHaGroups.isEmpty()) { - System.out.println("Found following HA groups are failing to write the clusters:"); - failedHaGroups.forEach((k, v) -> - System.out.printf("%s -> [%s]\n", k, String.join(",", v))); - return RET_SYNC_ERROR; + try (PhoenixHAAdmin admin = new PhoenixHAAdmin(zkUrl, getConf(), HighAvailibilityCuratorProvider.INSTANCE)) { + Map<String, List<String>> failedHaGroups = admin.syncClusterRoleRecords(records, forceful); + if (!failedHaGroups.isEmpty()) { + System.out.println("Found following HA groups are failing to write the clusters:"); + failedHaGroups.forEach((k, v) -> + System.out.printf("%s -> [%s]\n", k, String.join(",", v))); + return RET_SYNC_ERROR; + } } } else if (commandLine.hasOption(REPAIR_OPT.getOpt())) { // verify and repair - String zkUrl = getLocalZkUrl(getConf()); // Admin is created against local ZK cluster - try (PhoenixHAAdminHelper admin = new PhoenixHAAdminHelper(zkUrl, getConf(), HighAvailibilityCuratorProvider.INSTANCE)) { + try (PhoenixHAAdmin admin = new PhoenixHAAdmin(zkUrl, getConf(), HighAvailibilityCuratorProvider.INSTANCE)) { List<String> inconsistentRecord = admin.verifyAndRepairWithRemoteZnode(); if (!inconsistentRecord.isEmpty()) { System.out.println("Found following inconsistent cluster role records: "); @@ -171,64 +155,6 @@ public class PhoenixHAAdminTool extends Configured implements Tool { } } - /** - * Helper method to write the given cluster role records into the ZK clusters respectively. 
- * - * // TODO: add retry logics - * - * @param records The cluster role record list to save on ZK - * @param forceful if true, this method will ignore errors on other clusters; otherwise it will - * not update next cluster (in order) if there is any failure on current cluster - * @return a map of HA group name to list cluster's url for cluster role record failing to write - */ - private Map<String, List<String>> syncClusterRoleRecords(List<ClusterRoleRecord> records, - boolean forceful) throws IOException { - Map<String, List<String>> failedHaGroups = new HashMap<>(); - for (ClusterRoleRecord record : records) { - String haGroupName = record.getHaGroupName(); - try (PhoenixHAAdminHelper admin1 = new PhoenixHAAdminHelper(record.getZk1(), getConf(), HighAvailibilityCuratorProvider.INSTANCE); - PhoenixHAAdminHelper admin2 = new PhoenixHAAdminHelper(record.getZk2(), getConf(), HighAvailibilityCuratorProvider.INSTANCE)) { - // Update the cluster previously ACTIVE cluster first. - // It reduces the chances of split-brain between clients and clusters. - // If can not determine previous ACTIVE cluster, update new STANDBY cluster first. 
- final PairOfSameType<PhoenixHAAdminHelper> pair; - if (admin1.isCurrentActiveCluster(haGroupName)) { - pair = new PairOfSameType<>(admin1, admin2); - } else if (admin2.isCurrentActiveCluster(haGroupName)) { - pair = new PairOfSameType<>(admin2, admin1); - } else if (record.getRole(admin1.getZkUrl()) == ClusterRole.STANDBY) { - pair = new PairOfSameType<>(admin1, admin2); - } else { - pair = new PairOfSameType<>(admin2, admin1); - } - try { - pair.getFirst().createOrUpdateDataOnZookeeper(record); - } catch (IOException e) { - LOG.error("Error to create or update data on Zookeeper, cluster={}, record={}", - pair.getFirst(), record); - failedHaGroups.computeIfAbsent(haGroupName, (k) -> new ArrayList<>()) - .add(pair.getFirst().zkUrl); - if (!forceful) { - LOG.error("-forceful option is not enabled by command line options, " - + "skip writing record {} to ZK clusters", record); - // skip writing this record to second ZK cluster, so we should report that - failedHaGroups.computeIfAbsent(haGroupName, (k) -> new ArrayList<>()) - .add(pair.getSecond().zkUrl); - continue; // do not update this record on second cluster - } - } - try { - pair.getSecond().createOrUpdateDataOnZookeeper(record); - } catch (IOException e) { - LOG.error("Error to create or update data on Zookeeper, cluster={}, record={}", - pair.getFirst(), record); - failedHaGroups.computeIfAbsent(haGroupName, (k) -> new ArrayList<>()) - .add(pair.getSecond().zkUrl); - } - } - } - return failedHaGroups; - } /** * Parses the commandline arguments, throw exception if validation fails. @@ -268,370 +194,6 @@ public class PhoenixHAAdminTool extends Configured implements Tool { formatter.printHelp("help", OPTIONS); } - /** - * Helper method to get local ZK fully qualified URL (host:port:/hbase) from configuration. 
- */ - public static String getLocalZkUrl(Configuration conf) { - String localZkQuorum = conf.get(HConstants.ZOOKEEPER_QUORUM); - if (StringUtils.isEmpty(localZkQuorum)) { - String msg = "ZK quorum not found by looking up key " + HConstants.ZOOKEEPER_QUORUM; - LOG.error(msg); - throw new IllegalArgumentException(msg); - } - - String portStr = conf.get(HConstants.ZOOKEEPER_CLIENT_PORT); - int port = HConstants.DEFAULT_ZOOKEEPER_CLIENT_PORT; - if (portStr != null) { - try { - port = Integer.parseInt(portStr); - } catch (NumberFormatException e) { - String msg = String.format("Unrecognized ZK port '%s' in ZK quorum '%s'", - portStr, localZkQuorum); - LOG.error(msg, e); - throw new IllegalArgumentException(msg, e); - } - } - - String localZkRoot = conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT, - HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT); - - return String.format("%s:%d:%s", localZkQuorum, port, localZkRoot); - } - - /** - * Wrapper class for static accessor - */ - @VisibleForTesting - static class HighAvailibilityCuratorProvider { - - public static final HighAvailibilityCuratorProvider INSTANCE = new HighAvailibilityCuratorProvider(); - - /** - * Gets curator blocking if necessary to create it - */ - public CuratorFramework getCurator(String zkUrl, Properties properties) throws IOException { - return HighAvailabilityGroup.getCurator(zkUrl, properties); - } - } - - /** - * Helper class to update cluster role record for a ZK cluster. - * - * The ZK client this accessor has is confined to a single ZK cluster, but it can be used to - * operate multiple HA groups that are associated with this cluster. - */ - @VisibleForTesting - static class PhoenixHAAdminHelper implements Closeable { - /** The fully qualified ZK URL for an HBase cluster in format host:port:/hbase */ - private final String zkUrl; - /** Configuration of this command line tool. */ - private final Configuration conf; - /** Client properties which has copies of configuration defining ZK timeouts / retries. 
*/ - private final Properties properties = new Properties(); - /** Curator Provider **/ - private final HighAvailibilityCuratorProvider highAvailibilityCuratorProvider; - - PhoenixHAAdminHelper(String zkUrl, Configuration conf, HighAvailibilityCuratorProvider highAvailibilityCuratorProvider) { - Preconditions.checkNotNull(zkUrl); - Preconditions.checkNotNull(conf); - Preconditions.checkNotNull(highAvailibilityCuratorProvider); - this.zkUrl = JDBCUtil.formatZookeeperUrl(zkUrl); - this.conf = conf; - conf.iterator().forEachRemaining(k -> properties.setProperty(k.getKey(), k.getValue())); - this.highAvailibilityCuratorProvider = highAvailibilityCuratorProvider; - } - - /** - * Gets curator from the cache if available otherwise calls into getCurator to make it. - */ - private CuratorFramework getCurator() throws IOException { - return highAvailibilityCuratorProvider.getCurator(zkUrl, properties); - } - - /** - * Check if current cluster is ACTIVE role for the given HA group. - * - * In case of Exception when it fails to read cluster role data from the current cluster, it - * will assume current cluster is not ACTIVE. Callers should be aware of "false positive" - * possibility especially due to connectivity issue between this tool and remote ZK cluster. 
- * - * @param haGroupName the HA group name; a cluster can be associated with multiple HA groups - * @return true if current cluster is ACTIVE role, otherwise false - */ - private boolean isCurrentActiveCluster(String haGroupName) { - try { - byte[] data = getCurator().getData().forPath(toPath(haGroupName)); - - Optional<ClusterRoleRecord> record = ClusterRoleRecord.fromJson(data); - return record.isPresent() && record.get().getRole(zkUrl) == ClusterRole.ACTIVE; - } catch (NoNodeException ne) { - LOG.info("No role record found for HA group {} on '{}', assuming it is not active", - haGroupName, zkUrl); - return false; - } catch (Exception e) { - LOG.warn("Got exception when reading record for {} on cluster {}", - haGroupName, zkUrl, e); - return false; - } - } - - /** - * This lists all cluster role records stored in the zookeeper nodes. - * - * This read-only operation and hence no side effect on the ZK cluster. - */ - List<ClusterRoleRecord> listAllClusterRoleRecordsOnZookeeper() throws IOException { - List<String> haGroupNames; - try { - haGroupNames = getCurator().getChildren().forPath(ZKPaths.PATH_SEPARATOR); - } catch (Exception e) { - String msg = String.format("Got exception when listing all HA groups in %s", zkUrl); - LOG.error(msg); - throw new IOException(msg, e); - } - - List<ClusterRoleRecord> records = new ArrayList<>(); - List<String> failedHaGroups = new ArrayList<>(); - for (String haGroupName : haGroupNames) { - try { - byte[] data = getCurator().getData().forPath(ZKPaths.PATH_SEPARATOR + haGroupName); - Optional<ClusterRoleRecord> record = ClusterRoleRecord.fromJson(data); - if (record.isPresent()) { - records.add(record.get()); - } else { // fail to deserialize data from JSON - failedHaGroups.add(haGroupName); - } - } catch (Exception e) { - LOG.warn("Got exception when reading data for HA group {}", haGroupName, e); - failedHaGroups.add(haGroupName); - } - } - - if (!failedHaGroups.isEmpty()) { - String msg = String.format("Found following HA 
groups: %s. Fail to read cluster " - + "role records for following HA groups: %s", - String.join(",", haGroupNames), String.join(",", failedHaGroups)); - LOG.error(msg); - throw new IOException(msg); - } - return records; - } - - /** - * Verify cluster role records stored in local ZK nodes, and repair with remote znodes for - * any inconsistency. - * - * @return a list of HA group names with inconsistent cluster role records, or empty list - */ - List<String> verifyAndRepairWithRemoteZnode() throws Exception { - List<String> inconsistentHaGroups = new ArrayList<>(); - for (ClusterRoleRecord record : listAllClusterRoleRecordsOnZookeeper()) { - // the remote znodes may be on different ZK clusters. - if (record.getRole(zkUrl) == ClusterRole.UNKNOWN) { - LOG.warn("Unknown cluster role for cluster '{}' in record {}", zkUrl, record); - continue; - } - String remoteZkUrl = record.getZk1().equals(zkUrl) - ? record.getZk2() - : record.getZk1(); - try (PhoenixHAAdminHelper remoteAdmin = new PhoenixHAAdminHelper(remoteZkUrl, conf, HighAvailibilityCuratorProvider.INSTANCE)) { - ClusterRoleRecord remoteRecord; - try { - String zPath = toPath(record.getHaGroupName()); - byte[] data = remoteAdmin.getCurator().getData().forPath(zPath); - Optional<ClusterRoleRecord> recordOptional = ClusterRoleRecord.fromJson(data); - if (!recordOptional.isPresent()) { - remoteAdmin.createOrUpdateDataOnZookeeper(record); - continue; - } - remoteRecord = recordOptional.get(); - } catch (NoNodeException ne) { - LOG.warn("No record znode yet, creating for HA group {} on {}", - record.getHaGroupName(), remoteAdmin); - remoteAdmin.createDataOnZookeeper(record); - LOG.info("Created znode on cluster {} with record {}", remoteAdmin, record); - continue; - } catch (Exception e) { - LOG.error("Error to get data on remote cluster {} for HA group {}", - remoteAdmin, record.getHaGroupName(), e); - continue; - } - - if (!record.getHaGroupName().equals(remoteRecord.getHaGroupName())) { - 
inconsistentHaGroups.add(record.getHaGroupName()); - LOG.error("INTERNAL ERROR: got cluster role record for different HA groups." - + " Local record: {}, remote record: {}", record, remoteRecord); - } else if (remoteRecord.isNewerThan(record)) { - createOrUpdateDataOnZookeeper(remoteRecord); - } else if (record.isNewerThan(remoteRecord)) { - remoteAdmin.createOrUpdateDataOnZookeeper(record); - } else if (record.equals(remoteRecord)) { - LOG.info("Cluster role record {} is consistent", record); - } else { - inconsistentHaGroups.add(record.getHaGroupName()); - LOG.error("Cluster role record for HA group {} is inconsistent. On cluster " - + "{} the record is {}; on cluster {} the record is {}", - record.getHaGroupName(), this, record, remoteAdmin, remoteRecord); - } - } - } - return inconsistentHaGroups; - } - - /** - * This updates the cluster role data on the zookeeper it connects to. - * - * To avoid conflicts, it does CAS (compare-and-set) when updating. The constraint is that - * the given record's version should be larger the existing record's version. This is a way - * to help avoiding manual update conflicts. If the given record can not meet version - * check, it will reject the update request and client (human operator or external system) - * should retry. - * - * @param record the new cluster role record to be saved on ZK - * @throws IOException if it fails to update the cluster role data on ZK - * @return true if the data on ZK is updated otherwise false - */ - boolean createOrUpdateDataOnZookeeper(ClusterRoleRecord record) throws IOException { - if (!zkUrl.equals(record.getZk1()) && !zkUrl.equals(record.getZk2())) { - String msg = String.format("INTERNAL ERROR: " - + "ZK cluster is not associated with cluster role record! " - + "ZK cluster URL: '%s'. 
Cluster role record: %s", - zkUrl, record); - LOG.error(msg); - throw new IOException(msg); - } - - String haGroupName = record.getHaGroupName(); - byte[] data; - try { - data = getCurator().getData().forPath(toPath(haGroupName)); // Get initial data - } catch (NoNodeException ne) { - LOG.info("No record znode yet, creating for HA group {} on {}", haGroupName, zkUrl); - createDataOnZookeeper(record); - LOG.info("Created znode for HA group {} with record data {} on {}", haGroupName, - record, zkUrl); - return true; - } catch (Exception e) { - String msg = String.format("Fail to read cluster role record data for HA group %s " - + "on cluster '%s'", haGroupName, zkUrl); - LOG.error(msg, e); - throw new IOException(msg, e); - } - - Optional<ClusterRoleRecord> existingRecordOptional = ClusterRoleRecord.fromJson(data); - if (!existingRecordOptional.isPresent()) { - String msg = String.format("Fail to parse existing cluster role record data for HA " - + "group %s", haGroupName); - LOG.error(msg); - throw new IOException(msg); - } - - ClusterRoleRecord existingRecord = existingRecordOptional.get(); - if (record.getVersion() < existingRecord.getVersion()) { - String msg = String.format("Invalid new cluster role record for HA group '%s' " - + "because new record's version V%d is smaller than existing V%d. " - + "Existing role record: %s. New role record fail to save: %s", - haGroupName, record.getVersion(), existingRecord.getVersion(), - existingRecord, record); - LOG.warn(msg); - return false; // return instead of error out to tolerate - } - - if (record.getVersion() == existingRecord.getVersion()) { - if (record.equals(existingRecord)) { - LOG.debug("Cluster role does not change since last update on ZK."); - return false; // no need to update iff they are the same. - } else { - String msg = String.format("Invalid new cluster role record for HA group '%s' " - + "because it has the same version V%d but inconsistent data. " - + "Existing role record: %s. 
New role record fail to save: %s", - haGroupName, record.getVersion(), existingRecord, record); - LOG.error(msg); - throw new IOException(msg); - } - } - - return updateDataOnZookeeper(existingRecord, record); - } - - /** - * Helper to create the znode on the ZK cluster. - */ - private void createDataOnZookeeper(ClusterRoleRecord record) throws IOException { - String haGroupName = record.getHaGroupName(); - // znode path for given haGroup name assuming namespace (prefix) has been set. - String haGroupPath = toPath(haGroupName); - try { - getCurator().create() - .creatingParentsIfNeeded() - .withMode(CreateMode.PERSISTENT) - .forPath(haGroupPath, ClusterRoleRecord.toJson(record)); - } catch (NodeExistsException nee) { - //this method assumes that the znode doesn't exist yet, but it could have been - //created between now and the last time we checked. We swallow the exception and - //rely on our caller to check to make sure the znode that's saved is correct - LOG.warn("Znode for HA group {} already exists. ", - haGroupPath, nee); - } catch (Exception e) { - LOG.error("Fail to initialize the znode for HA group {} with record data {}", - haGroupPath, record, e); - throw new IOException("Fail to initialize znode for HA group " + haGroupPath, e); - } - } - - /** - * Helper to update the znode on ZK cluster assuming current data is the given old record. - */ - private boolean updateDataOnZookeeper(ClusterRoleRecord oldRecord, - ClusterRoleRecord newRecord) throws IOException { - // znode path for given haGroup name assuming namespace (prefix) has been set. 
- String haGroupPath = toPath(newRecord.getHaGroupName()); - RetryPolicy retryPolicy = HighAvailabilityGroup.createRetryPolicy(properties); - try { - DistributedAtomicValue v = new DistributedAtomicValue(getCurator(), haGroupPath, retryPolicy); - AtomicValue<byte[]> result = v.compareAndSet( - ClusterRoleRecord.toJson(oldRecord), ClusterRoleRecord.toJson(newRecord)); - LOG.info("Updated cluster role record ({}->{}) for HA group {} on cluster '{}': {}", - oldRecord.getVersion(), newRecord.getVersion(), newRecord.getHaGroupName(), - zkUrl, result.succeeded() ? "succeeded" : "failed"); - LOG.debug("Old DistributedAtomicValue: {}, New DistributedAtomicValue: {},", - new String(result.preValue(), StandardCharsets.UTF_8), - new String(result.postValue(), StandardCharsets.UTF_8)); - return result.succeeded(); - } catch (Exception e) { - String msg = String.format("Fail to update cluster role record to ZK for the HA " - + "group %s due to '%s'." - + "Existing role record: %s. New role record fail to save: %s", - haGroupPath, e.getMessage(), oldRecord, newRecord); - LOG.error(msg, e); - throw new IOException(msg, e); - } - } - - /** - * Helper method to get ZK path for an HA group given the HA group name. - * - * It assumes the ZK namespace (prefix) has been set. 
- */ - private static String toPath(String haGroupName) { - return ZKPaths.PATH_SEPARATOR + haGroupName; - } - - String getZkUrl() { - return zkUrl; - } - - @Override - public void close() { - LOG.debug("PhoenixHAAdmin for {} is now closed.", zkUrl); - } - - @Override - public String toString() { - return zkUrl; - } - } - public static void main(String[] args) throws Exception { Configuration conf = HBaseConfiguration.create(); int retCode = ToolRunner.run(conf, new PhoenixHAAdminTool(), args); diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java index 80e8bca6b1..be3801d118 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java @@ -382,6 +382,8 @@ public interface QueryServices extends SQLCloseable { public static final String PHOENIX_VIEW_TTL_ENABLED = "phoenix.view.ttl.enabled"; public static final String PHOENIX_VIEW_TTL_TENANT_VIEWS_PER_SCAN_LIMIT = "phoenix.view.ttl.tenant_views_per_scan.limit"; + // Block mutations based on cluster role record + public static final String CLUSTER_ROLE_BASED_MUTATION_BLOCK_ENABLED = "phoenix.cluster.role.based.mutation.block.enabled"; // Before 4.15 when we created a view we included the parent table column metadata in the view // metadata. 
After PHOENIX-3534 we allow SYSTEM.CATALOG to split and no longer store the parent diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java index c4f3812b51..9755572566 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java @@ -26,6 +26,7 @@ import static org.apache.phoenix.query.QueryServices.CALL_QUEUE_PRODUCER_ATTRIB_ import static org.apache.phoenix.query.QueryServices.CALL_QUEUE_ROUND_ROBIN_ATTRIB; import static org.apache.phoenix.query.QueryServices.CLIENT_METRICS_TAG; import static org.apache.phoenix.query.QueryServices.CLIENT_SPOOL_THRESHOLD_BYTES_ATTRIB; +import static org.apache.phoenix.query.QueryServices.CLUSTER_ROLE_BASED_MUTATION_BLOCK_ENABLED; import static org.apache.phoenix.query.QueryServices.COLLECT_REQUEST_LEVEL_METRICS; import static org.apache.phoenix.query.QueryServices.COMMIT_STATS_ASYNC; import static org.apache.phoenix.query.QueryServices.CONNECTION_ACTIVITY_LOGGING_ENABLED; @@ -450,6 +451,8 @@ public class QueryServicesOptions { public static final int DEFAULT_PHOENIX_STREAMS_GET_TABLE_REGIONS_TIMEOUT = 300000; // 5 minutes + public static final Boolean DEFAULT_CLUSTER_ROLE_BASED_MUTATION_BLOCK_ENABLED = false; + private final Configuration config; @@ -553,7 +556,8 @@ public class QueryServicesOptions { DEFAULT_SERVER_MERGE_FOR_UNCOVERED_INDEX) .setIfUnset(MAX_IN_LIST_SKIP_SCAN_SIZE, DEFAULT_MAX_IN_LIST_SKIP_SCAN_SIZE) .setIfUnset(CONNECTION_ACTIVITY_LOGGING_ENABLED, DEFAULT_CONNECTION_ACTIVITY_LOGGING_ENABLED) - .setIfUnset(CONNECTION_ACTIVITY_LOGGING_INTERVAL, DEFAULT_CONNECTION_ACTIVITY_LOGGING_INTERVAL_IN_MINS); + .setIfUnset(CONNECTION_ACTIVITY_LOGGING_INTERVAL, DEFAULT_CONNECTION_ACTIVITY_LOGGING_INTERVAL_IN_MINS) + .setIfUnset(CLUSTER_ROLE_BASED_MUTATION_BLOCK_ENABLED, 
DEFAULT_CLUSTER_ROLE_BASED_MUTATION_BLOCK_ENABLED); // HBase sets this to 1, so we reset it to something more appropriate. // Hopefully HBase will change this, because we can't know if a user set diff --git a/phoenix-core-client/src/main/protobuf/RegionServerEndpointService.proto b/phoenix-core-client/src/main/protobuf/RegionServerEndpointService.proto index 2d0da268ba..725f3fc763 100644 --- a/phoenix-core-client/src/main/protobuf/RegionServerEndpointService.proto +++ b/phoenix-core-client/src/main/protobuf/RegionServerEndpointService.proto @@ -50,10 +50,19 @@ message InvalidateServerMetadataCacheRequest { repeated InvalidateServerMetadataCache invalidateServerMetadataCacheRequests = 1; } +message InvalidateHAGroupStoreClientRequest { +} + +message InvalidateHAGroupStoreClientResponse { +} + service RegionServerEndpointService { rpc validateLastDDLTimestamp(ValidateLastDDLTimestampRequest) returns (ValidateLastDDLTimestampResponse); rpc invalidateServerMetadataCache(InvalidateServerMetadataCacheRequest) returns (InvalidateServerMetadataCacheResponse); + + rpc invalidateHAGroupStoreClient(InvalidateHAGroupStoreClientRequest) + returns (InvalidateHAGroupStoreClientResponse); } \ No newline at end of file diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixRegionServerEndpoint.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixRegionServerEndpoint.java index 59fd1209db..22fa84c4a6 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixRegionServerEndpoint.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixRegionServerEndpoint.java @@ -33,6 +33,7 @@ import org.apache.phoenix.cache.ServerMetadataCacheImpl; import org.apache.phoenix.coprocessor.generated.RegionServerEndpointProtos; import org.apache.phoenix.coprocessorclient.metrics.MetricsMetadataCachingSource; import 
org.apache.phoenix.coprocessorclient.metrics.MetricsPhoenixCoprocessorSourceFactory; +import org.apache.phoenix.jdbc.HAGroupStoreManager; import org.apache.phoenix.protobuf.ProtobufUtil; import org.apache.phoenix.util.ClientUtil; import org.apache.phoenix.util.SchemaUtil; @@ -106,6 +107,24 @@ public class PhoenixRegionServerEndpoint } } + @Override + public void invalidateHAGroupStoreClient(RpcController controller, + RegionServerEndpointProtos.InvalidateHAGroupStoreClientRequest request, + RpcCallback<RegionServerEndpointProtos.InvalidateHAGroupStoreClientResponse> done) { + LOGGER.info("PhoenixRegionServerEndpoint invalidating HAGroupStoreClient"); + HAGroupStoreManager haGroupStoreManager; + try { + haGroupStoreManager = HAGroupStoreManager.getInstance(conf); + haGroupStoreManager.invalidateHAGroupStoreClient(); + } catch (Throwable t) { + String errorMsg = "Invalidating HAGroupStoreClient FAILED, check exception for " + + "specific details"; + LOGGER.error(errorMsg, t); + IOException ioe = ClientUtil.createIOException(errorMsg, t); + ProtobufUtil.setControllerException(controller, ioe); + } + } + @Override public Iterable<Service> getServices() { return Collections.singletonList(this); @@ -114,4 +133,5 @@ public class PhoenixRegionServerEndpoint public ServerMetadataCache getServerMetadataCache() { return ServerMetadataCacheImpl.getInstance(conf); } + } \ No newline at end of file diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java index 74c965345a..915db903b0 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java @@ -61,6 +61,7 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import 
org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.ipc.controller.InterRegionServerIndexRpcControllerFactory; import org.apache.hadoop.hbase.regionserver.InternalScanner; +import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress; import org.apache.hadoop.hbase.regionserver.Region; import org.apache.hadoop.hbase.regionserver.RegionScanner; import org.apache.hadoop.hbase.regionserver.ScanType; @@ -88,6 +89,7 @@ import org.apache.phoenix.index.PhoenixIndexCodec; import org.apache.phoenix.index.PhoenixIndexFailurePolicyHelper; import org.apache.phoenix.index.PhoenixIndexFailurePolicyHelper.MutateCommand; import org.apache.phoenix.index.PhoenixIndexMetaData; +import org.apache.phoenix.jdbc.HAGroupStoreManager; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData; import org.apache.phoenix.join.HashJoinInfo; @@ -1055,4 +1057,20 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver protected boolean isRegionObserverFor(Scan scan) { return scan.getAttribute(BaseScannerRegionObserverConstants.UNGROUPED_AGG) != null; } + + @Override + public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c, + MiniBatchOperationInProgress<Mutation> miniBatchOp) + throws IOException { + final Configuration conf = c.getEnvironment().getConfiguration(); + try { + final HAGroupStoreManager haGroupStoreManager = HAGroupStoreManager.getInstance(conf); + if (haGroupStoreManager.isMutationBlocked()) { + throw new IOException("Blocking Mutation as Some CRRs are in ACTIVE_TO_STANDBY " + + "state and CLUSTER_ROLE_BASED_MUTATION_BLOCK_ENABLED is true"); + } + } catch (Exception e) { + throw new IOException(e); + } + } } diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java index 1223c3a980..72ef706566 100644 --- 
a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java @@ -47,6 +47,7 @@ import org.apache.phoenix.coprocessor.generated.PTableProtos; import org.apache.phoenix.execute.MutationState; import org.apache.phoenix.expression.CaseExpression; import org.apache.phoenix.index.PhoenixIndexBuilderHelper; +import org.apache.phoenix.jdbc.HAGroupStoreManager; import org.apache.phoenix.schema.types.PInteger; import org.apache.phoenix.thirdparty.com.google.common.base.Preconditions; import org.apache.phoenix.thirdparty.com.google.common.collect.ArrayListMultimap; @@ -160,6 +161,7 @@ public class IndexRegionObserver implements RegionCoprocessor, RegionObserver { private static final OperationStatus NOWRITE = new OperationStatus(SUCCESS); public static final String PHOENIX_APPEND_METADATA_TO_WAL = "phoenix.append.metadata.to.wal"; public static final boolean DEFAULT_PHOENIX_APPEND_METADATA_TO_WAL = false; + /** * Class to represent pending data table rows * */ @@ -512,6 +514,9 @@ public class IndexRegionObserver implements RegionCoprocessor, RegionObserver { } } + /* + * Also checks for mutationBlockEnabled if CLUSTER_ROLE_BASED_MUTATION_BLOCK_ENABLED is enabled. 
+ */ @Override public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c, MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException { @@ -519,6 +524,12 @@ public class IndexRegionObserver implements RegionCoprocessor, RegionObserver { return; } try { + final Configuration conf = c.getEnvironment().getConfiguration(); + final HAGroupStoreManager haGroupStoreManager = HAGroupStoreManager.getInstance(conf); + if (haGroupStoreManager.isMutationBlocked()) { + throw new IOException("Blocking Mutation as Some CRRs are in ACTIVE_TO_STANDBY " + + "state and CLUSTER_ROLE_BASED_MUTATION_BLOCK_ENABLED is true"); + } preBatchMutateWithExceptions(c, miniBatchOp); return; } catch (Throwable t) { @@ -1303,6 +1314,7 @@ public class IndexRegionObserver implements RegionCoprocessor, RegionObserver { return ts; } } + public void preBatchMutateWithExceptions(ObserverContext<RegionCoprocessorEnvironment> c, MiniBatchOperationInProgress<Mutation> miniBatchOp) throws Throwable { PhoenixIndexMetaData indexMetaData = getPhoenixIndexMetaData(c, miniBatchOp); diff --git a/phoenix-core/pom.xml b/phoenix-core/pom.xml index 691731627d..4987d477c1 100644 --- a/phoenix-core/pom.xml +++ b/phoenix-core/pom.xml @@ -364,6 +364,10 @@ <artifactId>curator-framework</artifactId> <scope>test</scope> </dependency> + <dependency> + <groupId>org.apache.curator</groupId> + <artifactId>curator-recipes</artifactId> + </dependency> <!-- Other test dependencies --> diff --git a/phoenix-core/src/it/java/org/apache/phoenix/cache/ServerMetadataCacheIT.java b/phoenix-core/src/it/java/org/apache/phoenix/cache/ServerMetadataCacheIT.java index fa051203ca..f92db070e2 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/cache/ServerMetadataCacheIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/cache/ServerMetadataCacheIT.java @@ -39,7 +39,6 @@ import org.apache.phoenix.schema.ColumnNotFoundException; import org.apache.phoenix.schema.PIndexState; import 
org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.PTableKey; -import org.apache.phoenix.schema.PTableType; import org.apache.phoenix.schema.TableNotFoundException; import org.apache.phoenix.schema.types.PVarchar; import org.apache.phoenix.thirdparty.com.google.common.collect.Maps; @@ -73,7 +72,6 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.LAST_DDL_TIMESTAMP import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_NAME; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_SCHEM; -import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_TYPE; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TENANT_ID; import static org.apache.phoenix.query.ConnectionQueryServicesImpl.INVALIDATE_SERVER_METADATA_CACHE_EX_MESSAGE; import static org.apache.phoenix.query.QueryServices.PHOENIX_METADATA_INVALIDATE_CACHE_ENABLED; diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/PhoenixRegionServerEndpointIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/PhoenixRegionServerEndpointIT.java index 8996939290..670aba7f15 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/PhoenixRegionServerEndpointIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/PhoenixRegionServerEndpointIT.java @@ -32,7 +32,6 @@ import org.apache.phoenix.util.ClientUtil; import org.apache.phoenix.util.PhoenixRuntime; import org.apache.phoenix.util.PropertiesUtil; import org.apache.phoenix.util.ReadOnlyProps; -import org.apache.phoenix.util.ServerUtil; import org.junit.BeforeClass; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -158,6 +157,18 @@ public class PhoenixRegionServerEndpointIT extends BaseTest { } } + @Test + public void testInvalidateHAGroupStoreClient() { + HRegionServer regionServer = utility.getMiniHBaseCluster().getRegionServer(0); + 
PhoenixRegionServerEndpoint coprocessor = getPhoenixRegionServerEndpoint(regionServer); + assertNotNull(coprocessor); + ServerRpcController controller = new ServerRpcController(); + RegionServerEndpointProtos.InvalidateHAGroupStoreClientRequest request + = RegionServerEndpointProtos.InvalidateHAGroupStoreClientRequest.newBuilder().build(); + coprocessor.invalidateHAGroupStoreClient(controller, request, null); + assertFalse(controller.failed()); + } + private String getCreateTableStmt(String tableName) { return "CREATE TABLE " + tableName + " (a_string varchar not null, col1 integer" + diff --git a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupStoreClientIT.java b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupStoreClientIT.java new file mode 100644 index 0000000000..cd0679e9c2 --- /dev/null +++ b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupStoreClientIT.java @@ -0,0 +1,427 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.phoenix.jdbc; + +import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener; +import org.apache.curator.utils.ZKPaths; +import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest; +import org.apache.phoenix.query.BaseTest; +import org.apache.phoenix.thirdparty.com.google.common.collect.Maps; +import org.apache.phoenix.util.ReadOnlyProps; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; + +import static org.apache.phoenix.jdbc.PhoenixHAAdmin.toPath; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertThrows; + +/** + * Integration tests for {@link HAGroupStoreClient} + */ +@Category(NeedsOwnMiniClusterTest.class) +public class HAGroupStoreClientIT extends BaseTest { + + private final PhoenixHAAdmin haAdmin = new PhoenixHAAdmin(config); + private static final Long ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS = 1000L; + + @BeforeClass + public static synchronized void doSetup() throws Exception { + Map<String, String> props = Maps.newHashMapWithExpectedSize(1); + setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator())); + } + + @Before + public void before() throws Exception { + // Clean up all the existing CRRs + List<ClusterRoleRecord> crrs = haAdmin.listAllClusterRoleRecordsOnZookeeper(); + for (ClusterRoleRecord crr : crrs) { + haAdmin.getCurator().delete().forPath(toPath(crr.getHaGroupName())); + } + } + + @Test + public void testHAGroupStoreClientWithSingleCRR() throws Exception { + HAGroupStoreClient haGroupStoreClient = 
HAGroupStoreClient.getInstance(config); + ClusterRoleRecord crr = new ClusterRoleRecord("failover", + HighAvailabilityPolicy.FAILOVER, haAdmin.getZkUrl(), ClusterRoleRecord.ClusterRole.ACTIVE, + "random-zk-url", ClusterRoleRecord.ClusterRole.STANDBY, 1L); + haAdmin.createOrUpdateDataOnZookeeper(crr); + Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS); + assert haGroupStoreClient.getCRRsByClusterRole(ClusterRoleRecord.ClusterRole.ACTIVE).size() == 1; + assert haGroupStoreClient.getCRRsByClusterRole(ClusterRoleRecord.ClusterRole.ACTIVE_TO_STANDBY).isEmpty(); + + // Now Update CRR so that current cluster has state ACTIVE_TO_STANDBY + crr = new ClusterRoleRecord("failover", + HighAvailabilityPolicy.FAILOVER, haAdmin.getZkUrl(), ClusterRoleRecord.ClusterRole.ACTIVE_TO_STANDBY, + "random-zk-url", ClusterRoleRecord.ClusterRole.STANDBY, 2L); + haAdmin.createOrUpdateDataOnZookeeper(crr); + + Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS); + // Check that now the cluster should be in ActiveToStandby + assert haGroupStoreClient.getCRRsByClusterRole(ClusterRoleRecord.ClusterRole.ACTIVE).isEmpty(); + assert haGroupStoreClient.getCRRsByClusterRole(ClusterRoleRecord.ClusterRole.ACTIVE_TO_STANDBY).size() == 1; + + + // Change it back to ACTIVE so that cluster is not in ACTIVE_TO_STANDBY state + crr = new ClusterRoleRecord("failover", + HighAvailabilityPolicy.FAILOVER, haAdmin.getZkUrl(), ClusterRoleRecord.ClusterRole.ACTIVE, + "random-zk-url", ClusterRoleRecord.ClusterRole.STANDBY, 3L); + haAdmin.createOrUpdateDataOnZookeeper(crr); + + Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS); + assert haGroupStoreClient.getCRRsByClusterRole(ClusterRoleRecord.ClusterRole.ACTIVE).size() == 1; + assert haGroupStoreClient.getCRRsByClusterRole(ClusterRoleRecord.ClusterRole.ACTIVE_TO_STANDBY).isEmpty(); + + + // Change it again to ACTIVE_TO_STANDBY so that we can validate watcher works repeatedly + crr = new ClusterRoleRecord("failover", + HighAvailabilityPolicy.FAILOVER, 
haAdmin.getZkUrl(), ClusterRoleRecord.ClusterRole.ACTIVE_TO_STANDBY, + "random-zk-url", ClusterRoleRecord.ClusterRole.STANDBY, 4L); + haAdmin.createOrUpdateDataOnZookeeper(crr); + + Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS); + assert haGroupStoreClient.getCRRsByClusterRole(ClusterRoleRecord.ClusterRole.ACTIVE).isEmpty(); + assert haGroupStoreClient.getCRRsByClusterRole(ClusterRoleRecord.ClusterRole.ACTIVE_TO_STANDBY).size() == 1; + + + // Change peer cluster to ACTIVE_TO_STANDBY so that we can still process mutation on this cluster + crr = new ClusterRoleRecord("failover", + HighAvailabilityPolicy.FAILOVER, haAdmin.getZkUrl(), ClusterRoleRecord.ClusterRole.ACTIVE, + "random-zk-url", ClusterRoleRecord.ClusterRole.ACTIVE_TO_STANDBY, 5L); + haAdmin.createOrUpdateDataOnZookeeper(crr); + + Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS); + assert haGroupStoreClient.getCRRsByClusterRole(ClusterRoleRecord.ClusterRole.ACTIVE).size() == 1; + assert haGroupStoreClient.getCRRsByClusterRole(ClusterRoleRecord.ClusterRole.ACTIVE_TO_STANDBY).isEmpty(); + } + + + @Test + public void testHAGroupStoreClientWithMultipleCRRs() throws Exception { + HAGroupStoreClient haGroupStoreClient = HAGroupStoreClient.getInstance(config); + // Setup initial CRRs + ClusterRoleRecord crr1 = new ClusterRoleRecord("failover", + HighAvailabilityPolicy.FAILOVER, haAdmin.getZkUrl(), ClusterRoleRecord.ClusterRole.ACTIVE, + "random-zk-url", ClusterRoleRecord.ClusterRole.STANDBY, 1L); + ClusterRoleRecord crr2 = new ClusterRoleRecord("parallel", + HighAvailabilityPolicy.PARALLEL, haAdmin.getZkUrl(), ClusterRoleRecord.ClusterRole.ACTIVE, + "random-zk-url", ClusterRoleRecord.ClusterRole.STANDBY, 1L); + haAdmin.createOrUpdateDataOnZookeeper(crr1); + haAdmin.createOrUpdateDataOnZookeeper(crr2); + + Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS); + assert haGroupStoreClient.getCRRsByClusterRole(ClusterRoleRecord.ClusterRole.ACTIVE).size() == 2; + assert 
haGroupStoreClient.getCRRsByClusterRole(ClusterRoleRecord.ClusterRole.ACTIVE_TO_STANDBY).isEmpty(); + + // Now Update CRR so that current cluster has state ACTIVE_TO_STANDBY for only 1 crr + crr1 = new ClusterRoleRecord("failover", + HighAvailabilityPolicy.FAILOVER, haAdmin.getZkUrl(), ClusterRoleRecord.ClusterRole.ACTIVE_TO_STANDBY, + "random-zk-url", ClusterRoleRecord.ClusterRole.STANDBY, 2L); + crr2 = new ClusterRoleRecord("parallel", + HighAvailabilityPolicy.PARALLEL, haAdmin.getZkUrl(), ClusterRoleRecord.ClusterRole.ACTIVE, + "random-zk-url", ClusterRoleRecord.ClusterRole.STANDBY, 2L); + haAdmin.createOrUpdateDataOnZookeeper(crr1); + haAdmin.createOrUpdateDataOnZookeeper(crr2); + + Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS); + // Check that now the cluster should be in ActiveToStandby + assert haGroupStoreClient.getCRRsByClusterRole(ClusterRoleRecord.ClusterRole.ACTIVE).size() == 1; + assert haGroupStoreClient.getCRRsByClusterRole(ClusterRoleRecord.ClusterRole.ACTIVE_TO_STANDBY).size() == 1; + + + // Change it back to ACTIVE so that cluster is not in ACTIVE_TO_STANDBY state + crr1 = new ClusterRoleRecord("failover", + HighAvailabilityPolicy.FAILOVER, haAdmin.getZkUrl(), ClusterRoleRecord.ClusterRole.ACTIVE, + "random-zk-url", ClusterRoleRecord.ClusterRole.STANDBY, 3L); + crr2 = new ClusterRoleRecord("parallel", + HighAvailabilityPolicy.PARALLEL, haAdmin.getZkUrl(), ClusterRoleRecord.ClusterRole.ACTIVE, + "random-zk-url", ClusterRoleRecord.ClusterRole.STANDBY, 3L); + haAdmin.createOrUpdateDataOnZookeeper(crr1); + haAdmin.createOrUpdateDataOnZookeeper(crr2); + + Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS); + assert haGroupStoreClient.getCRRsByClusterRole(ClusterRoleRecord.ClusterRole.ACTIVE).size() == 2; + assert haGroupStoreClient.getCRRsByClusterRole(ClusterRoleRecord.ClusterRole.ACTIVE_TO_STANDBY).isEmpty(); + + + // Change other crr to ACTIVE_TO_STANDBY and one in ACTIVE state so that we can validate watcher works repeatedly + crr1 = new 
ClusterRoleRecord("failover", + HighAvailabilityPolicy.FAILOVER, haAdmin.getZkUrl(), ClusterRoleRecord.ClusterRole.ACTIVE, + "random-zk-url", ClusterRoleRecord.ClusterRole.STANDBY, 4L); + crr2 = new ClusterRoleRecord("parallel", + HighAvailabilityPolicy.PARALLEL, haAdmin.getZkUrl(), ClusterRoleRecord.ClusterRole.ACTIVE_TO_STANDBY, + "random-zk-url", ClusterRoleRecord.ClusterRole.STANDBY, 4L); + haAdmin.createOrUpdateDataOnZookeeper(crr1); + haAdmin.createOrUpdateDataOnZookeeper(crr2); + + Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS); + assert haGroupStoreClient.getCRRsByClusterRole(ClusterRoleRecord.ClusterRole.ACTIVE).size() == 1; + assert haGroupStoreClient.getCRRsByClusterRole(ClusterRoleRecord.ClusterRole.ACTIVE_TO_STANDBY).size() == 1; + + + // Change peer cluster to ACTIVE_TO_STANDBY so that we can still process mutation on this cluster + crr1 = new ClusterRoleRecord("failover", + HighAvailabilityPolicy.FAILOVER, haAdmin.getZkUrl(), ClusterRoleRecord.ClusterRole.ACTIVE, + "random-zk-url", ClusterRoleRecord.ClusterRole.ACTIVE_TO_STANDBY, 5L); + crr2 = new ClusterRoleRecord("parallel", + HighAvailabilityPolicy.PARALLEL, haAdmin.getZkUrl(), ClusterRoleRecord.ClusterRole.ACTIVE, + "random-zk-url", ClusterRoleRecord.ClusterRole.ACTIVE_TO_STANDBY, 5L); + haAdmin.createOrUpdateDataOnZookeeper(crr1); + haAdmin.createOrUpdateDataOnZookeeper(crr2); + + Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS); + assert haGroupStoreClient.getCRRsByClusterRole(ClusterRoleRecord.ClusterRole.ACTIVE).size() == 2; + assert haGroupStoreClient.getCRRsByClusterRole(ClusterRoleRecord.ClusterRole.ACTIVE_TO_STANDBY).isEmpty(); + } + + @Test + public void testMultiThreadedAccessToHACache() throws Exception { + HAGroupStoreClient haGroupStoreClient = HAGroupStoreClient.getInstance(config); + // Setup initial CRRs + ClusterRoleRecord crr1 = new ClusterRoleRecord("failover", + HighAvailabilityPolicy.FAILOVER, haAdmin.getZkUrl(), ClusterRoleRecord.ClusterRole.ACTIVE, + 
"random-zk-url", ClusterRoleRecord.ClusterRole.STANDBY, 1L); + ClusterRoleRecord crr2 = new ClusterRoleRecord("parallel", + HighAvailabilityPolicy.PARALLEL, haAdmin.getZkUrl(), ClusterRoleRecord.ClusterRole.ACTIVE, + "random-zk-url", ClusterRoleRecord.ClusterRole.STANDBY, 1L); + haAdmin.createOrUpdateDataOnZookeeper(crr1); + haAdmin.createOrUpdateDataOnZookeeper(crr2); + + Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS); + + int threadCount = 10; + final CountDownLatch latch = new CountDownLatch(threadCount); + ExecutorService executor = Executors.newFixedThreadPool(threadCount); + for (int i = 0; i < threadCount; i++) { + executor.submit(() -> { + try { + assert haGroupStoreClient.getCRRsByClusterRole(ClusterRoleRecord.ClusterRole.ACTIVE).size() == 2; + assert haGroupStoreClient.getCRRsByClusterRole(ClusterRoleRecord.ClusterRole.ACTIVE_TO_STANDBY).isEmpty(); + latch.countDown(); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + } + assert latch.await(10, TimeUnit.SECONDS); + + // Update CRRs + crr1 = new ClusterRoleRecord("failover", + HighAvailabilityPolicy.FAILOVER, haAdmin.getZkUrl(), ClusterRoleRecord.ClusterRole.ACTIVE_TO_STANDBY, + "random-zk-url", ClusterRoleRecord.ClusterRole.STANDBY, 2L); + crr2 = new ClusterRoleRecord("parallel", + HighAvailabilityPolicy.PARALLEL, haAdmin.getZkUrl(), ClusterRoleRecord.ClusterRole.ACTIVE, + "random-zk-url", ClusterRoleRecord.ClusterRole.STANDBY, 2L); + haAdmin.createOrUpdateDataOnZookeeper(crr1); + haAdmin.createOrUpdateDataOnZookeeper(crr2); + + Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS); + + final CountDownLatch latch2 = new CountDownLatch(threadCount); + executor = Executors.newFixedThreadPool(threadCount); + for (int i = 0; i < threadCount; i++) { + executor.submit(() -> { + try { + assert haGroupStoreClient.getCRRsByClusterRole(ClusterRoleRecord.ClusterRole.ACTIVE).size() == 1; + assert 
haGroupStoreClient.getCRRsByClusterRole(ClusterRoleRecord.ClusterRole.ACTIVE_TO_STANDBY).size() == 1; + latch2.countDown(); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + } + assert latch2.await(10, TimeUnit.SECONDS); + } + + @Test + public void testHAGroupStoreClientWithRootPathDeletion() throws Exception { + HAGroupStoreClient haGroupStoreClient = HAGroupStoreClient.getInstance(config); + ClusterRoleRecord crr1 = new ClusterRoleRecord("failover", + HighAvailabilityPolicy.FAILOVER, haAdmin.getZkUrl(), ClusterRoleRecord.ClusterRole.ACTIVE_TO_STANDBY, + "random-zk-url", ClusterRoleRecord.ClusterRole.STANDBY, 1L); + ClusterRoleRecord crr2 = new ClusterRoleRecord("parallel", + HighAvailabilityPolicy.PARALLEL, haAdmin.getZkUrl(), ClusterRoleRecord.ClusterRole.ACTIVE, + "random-zk-url", ClusterRoleRecord.ClusterRole.STANDBY, 1L); + haAdmin.createOrUpdateDataOnZookeeper(crr1); + haAdmin.createOrUpdateDataOnZookeeper(crr2); + Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS); + assert haGroupStoreClient.getCRRsByClusterRole(ClusterRoleRecord.ClusterRole.ACTIVE).size() == 1; + assert haGroupStoreClient.getCRRsByClusterRole(ClusterRoleRecord.ClusterRole.ACTIVE_TO_STANDBY).size() == 1; + + haAdmin.getCurator().delete().deletingChildrenIfNeeded().forPath(ZKPaths.PATH_SEPARATOR); + Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS); + assert haGroupStoreClient.getCRRsByClusterRole(ClusterRoleRecord.ClusterRole.ACTIVE).isEmpty(); + assert haGroupStoreClient.getCRRsByClusterRole(ClusterRoleRecord.ClusterRole.ACTIVE_TO_STANDBY).isEmpty(); + + + crr1 = new ClusterRoleRecord("failover", + HighAvailabilityPolicy.FAILOVER, haAdmin.getZkUrl(), ClusterRoleRecord.ClusterRole.ACTIVE_TO_STANDBY, + "random-zk-url", ClusterRoleRecord.ClusterRole.STANDBY, 2L); + crr2 = new ClusterRoleRecord("parallel", + HighAvailabilityPolicy.PARALLEL, haAdmin.getZkUrl(), ClusterRoleRecord.ClusterRole.ACTIVE, + "random-zk-url", ClusterRoleRecord.ClusterRole.STANDBY, 2L); + 
haAdmin.createOrUpdateDataOnZookeeper(crr1); + haAdmin.createOrUpdateDataOnZookeeper(crr2); + Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS); + assert haGroupStoreClient.getCRRsByClusterRole(ClusterRoleRecord.ClusterRole.ACTIVE).size() == 1; + assert haGroupStoreClient.getCRRsByClusterRole(ClusterRoleRecord.ClusterRole.ACTIVE_TO_STANDBY).size() == 1; + } + + @Test + public void testThrowsExceptionWithZKDisconnectionAndThenConnection() throws Exception { + HAGroupStoreClient haGroupStoreClient = HAGroupStoreClient.getInstance(config); + // Setup initial CRRs + ClusterRoleRecord crr1 = new ClusterRoleRecord("failover", + HighAvailabilityPolicy.FAILOVER, haAdmin.getZkUrl(), ClusterRoleRecord.ClusterRole.ACTIVE, + "random-zk-url", ClusterRoleRecord.ClusterRole.STANDBY, 1L); + ClusterRoleRecord crr2 = new ClusterRoleRecord("parallel", + HighAvailabilityPolicy.PARALLEL, haAdmin.getZkUrl(), ClusterRoleRecord.ClusterRole.ACTIVE, + "random-zk-url", ClusterRoleRecord.ClusterRole.STANDBY, 1L); + haAdmin.createOrUpdateDataOnZookeeper(crr1); + haAdmin.createOrUpdateDataOnZookeeper(crr2); + + Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS); + assert haGroupStoreClient.getCRRsByClusterRole(ClusterRoleRecord.ClusterRole.ACTIVE).size() == 2; + + // Shutdown the ZK Cluster to simulate CONNECTION_SUSPENDED event + utility.shutdownMiniZKCluster(); + + Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS); + //Check that HAGroupStoreClient instance is not healthy and throws IOException + assertThrows(IOException.class, () -> haGroupStoreClient.getCRRsByClusterRole(ClusterRoleRecord.ClusterRole.ACTIVE)); + + // Start ZK on the same port to simulate CONNECTION_RECONNECTED event + utility.startMiniZKCluster(1,Integer.parseInt(getZKClientPort(config))); + + Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS); + //Check that HAGroupStoreClient instance is back to healthy and provides correct response + assert 
haGroupStoreClient.getCRRsByClusterRole(ClusterRoleRecord.ClusterRole.ACTIVE).size() == 2; + } + + + @Test + public void testHAGroupStoreClientWithDifferentZKURLFormats() throws Exception { + HAGroupStoreClient haGroupStoreClient = HAGroupStoreClient.getInstance(config); + final String zkClientPort = getZKClientPort(config); + // Setup initial CRRs + final String format1 = "127.0.0.1\\:"+zkClientPort+"::/hbase"; // 127.0.0.1\:53228::/hbase + final String format2 = "127.0.0.1:"+zkClientPort+"::/hbase"; // 127.0.0.1:53228::/hbase + final String format3 = "127.0.0.1\\:"+zkClientPort+":/hbase"; // 127.0.0.1\:53228:/hbase + + ClusterRoleRecord crr1 = new ClusterRoleRecord("parallel1", + HighAvailabilityPolicy.PARALLEL, format1, ClusterRoleRecord.ClusterRole.ACTIVE, + "random-zk-url", ClusterRoleRecord.ClusterRole.STANDBY, 1L); + haAdmin.createOrUpdateDataOnZookeeper(crr1); + + ClusterRoleRecord crr2 = new ClusterRoleRecord("parallel2", + HighAvailabilityPolicy.PARALLEL, format2, ClusterRoleRecord.ClusterRole.STANDBY, + "random-zk-url", ClusterRoleRecord.ClusterRole.STANDBY, 1L); + haAdmin.createOrUpdateDataOnZookeeper(crr2); + + ClusterRoleRecord crr3 = new ClusterRoleRecord("parallel3", + HighAvailabilityPolicy.PARALLEL, format3, ClusterRoleRecord.ClusterRole.ACTIVE_TO_STANDBY, + "random-zk-url", ClusterRoleRecord.ClusterRole.STANDBY, 1L); + haAdmin.createOrUpdateDataOnZookeeper(crr3); + + Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS); + assert haGroupStoreClient.getCRRsByClusterRole(ClusterRoleRecord.ClusterRole.ACTIVE).size() == 1; + assert haGroupStoreClient.getCRRsByClusterRole(ClusterRoleRecord.ClusterRole.ACTIVE_TO_STANDBY).size() == 1; + assert haGroupStoreClient.getCRRsByClusterRole(ClusterRoleRecord.ClusterRole.STANDBY).size() == 1; + } + + + /** + * This test verifies that the updates coming via PathChildrenCacheListener are in order in which updates are sent to ZK + * @throws Exception + */ + @Test + public void 
testHAGroupStoreClientWithMultiThreadedUpdates() throws Exception { + // Number of threads to execute + int threadCount = 15; + + // Capture versions of crr in a list(crrEventVersions) in order they are received. + List<Integer> crrEventVersions = new ArrayList<>(); + CountDownLatch eventsLatch = new CountDownLatch(threadCount); + PathChildrenCacheListener pathChildrenCacheListener = (client, event) -> { + if(event.getData() != null && event.getData().getData() != null && ClusterRoleRecord.fromJson(event.getData().getData()).isPresent()) { + ClusterRoleRecord crr = ClusterRoleRecord.fromJson(event.getData().getData()).get(); + crrEventVersions.add((int)crr.getVersion()); + eventsLatch.countDown(); + } + }; + + // Start a new HAGroupStoreClient. + new HAGroupStoreClient(config, pathChildrenCacheListener); + + // Create multiple threads for update to ZK. + final CountDownLatch updateLatch = new CountDownLatch(threadCount); + ExecutorService executor = Executors.newFixedThreadPool(threadCount); + + // List captures the order of events that are sent. + List<Integer> updateList = new ArrayList<>(); + + // Create a queue which can be polled to send updates to ZK. + ConcurrentLinkedQueue<ClusterRoleRecord> updateQueue = new ConcurrentLinkedQueue<>(); + for(int i = 0; i < threadCount; i++) { + updateQueue.add(createCRR(i+1)); + updateList.add(i+1); + } + + // Submit updates to ZK. + for (int i = 0; i < threadCount; i++) { + executor.submit(() -> { + try { + synchronized (HAGroupStoreClientIT.class) { + haAdmin.createOrUpdateDataOnZookeeper(Objects.requireNonNull(updateQueue.poll())); + } + updateLatch.countDown(); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + } + + // Check if updates are sent and updates are received. 
+ assert eventsLatch.await(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS*threadCount, TimeUnit.MILLISECONDS); + assert updateLatch.await(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS*threadCount, TimeUnit.MILLISECONDS); + + // Assert that the order of updates is same as order of events. + assert updateList.equals(crrEventVersions); + } + + private ClusterRoleRecord createCRR(Integer version) { + return new ClusterRoleRecord("parallel", + HighAvailabilityPolicy.PARALLEL, haAdmin.getZkUrl(), ClusterRoleRecord.ClusterRole.ACTIVE, + "random-zk-url", ClusterRoleRecord.ClusterRole.STANDBY, version); + } + + +} diff --git a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupStoreManagerIT.java b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupStoreManagerIT.java new file mode 100644 index 0000000000..9dfbb6cb4f --- /dev/null +++ b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupStoreManagerIT.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.phoenix.jdbc;
+
+import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
+import org.apache.phoenix.query.BaseTest;
+import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.phoenix.jdbc.PhoenixHAAdmin.toPath;
+import static org.apache.phoenix.query.QueryServices.CLUSTER_ROLE_BASED_MUTATION_BLOCK_ENABLED;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Integration test for {@link HAGroupStoreManager}: checks that
+ * {@link HAGroupStoreManager#isMutationBlocked()} follows the cluster role records written to
+ * ZooKeeper when {@code CLUSTER_ROLE_BASED_MUTATION_BLOCK_ENABLED} is turned on.
+ */
+@Category(NeedsOwnMiniClusterTest.class)
+public class HAGroupStoreManagerIT extends BaseTest {
+    /** Time allowed for a ZooKeeper write to be observed through the Curator listeners. */
+    private static final long ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS = 1000L;
+    /** Pause between successive checks while waiting for an expected state. */
+    private static final long POLL_INTERVAL_MS = 50L;
+
+    private final PhoenixHAAdmin haAdmin = new PhoenixHAAdmin(config);
+
+    /**
+     * Starts the test driver with cluster-role based mutation blocking enabled.
+     */
+    @BeforeClass
+    public static synchronized void doSetup() throws Exception {
+        Map<String, String> props = Maps.newHashMapWithExpectedSize(1);
+        props.put(CLUSTER_ROLE_BASED_MUTATION_BLOCK_ENABLED, "true");
+        setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+    }
+
+    /**
+     * Deletes every cluster role record currently in ZooKeeper so each test starts from an
+     * empty store.
+     */
+    @Before
+    public void before() throws Exception {
+        List<ClusterRoleRecord> crrs = haAdmin.listAllClusterRoleRecordsOnZookeeper();
+        for (ClusterRoleRecord crr : crrs) {
+            haAdmin.getCurator().delete().forPath(toPath(crr.getHaGroupName()));
+        }
+    }
+
+    /**
+     * Expects mutations to be allowed while the cluster at {@code haAdmin.getZkUrl()} is ACTIVE
+     * in every HA group, and blocked once the "parallel" group moves it to ACTIVE_TO_STANDBY.
+     * NOTE(review): despite the name, two records ("failover" and "parallel") are written.
+     */
+    @Test
+    public void testHAGroupStoreManagerWithSingleCRR() throws Exception {
+        HAGroupStoreManager haGroupStoreManager = HAGroupStoreManager.getInstance(config);
+        // Initial records: the cluster under test is ACTIVE in both groups.
+        ClusterRoleRecord crr1 = new ClusterRoleRecord("failover",
+                HighAvailabilityPolicy.FAILOVER, haAdmin.getZkUrl(), ClusterRoleRecord.ClusterRole.ACTIVE,
+                "random-zk-url", ClusterRoleRecord.ClusterRole.STANDBY, 1L);
+        ClusterRoleRecord crr2 = new ClusterRoleRecord("parallel",
+                HighAvailabilityPolicy.PARALLEL, haAdmin.getZkUrl(), ClusterRoleRecord.ClusterRole.ACTIVE,
+                "random-zk-url", ClusterRoleRecord.ClusterRole.STANDBY, 1L);
+        haAdmin.createOrUpdateDataOnZookeeper(crr1);
+        haAdmin.createOrUpdateDataOnZookeeper(crr2);
+
+        // The expected state here equals the initial state, so there is nothing to poll for:
+        // give the listeners a fixed window to observe the records before checking.
+        Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
+        assertFalse("Mutations should not be blocked while the cluster is ACTIVE in every group",
+                haGroupStoreManager.isMutationBlocked());
+
+        // Move the cluster under test to ACTIVE_TO_STANDBY in the "parallel" group only.
+        crr1 = new ClusterRoleRecord("failover",
+                HighAvailabilityPolicy.FAILOVER, haAdmin.getZkUrl(), ClusterRoleRecord.ClusterRole.ACTIVE,
+                "random-zk-url", ClusterRoleRecord.ClusterRole.STANDBY, 2L);
+        crr2 = new ClusterRoleRecord("parallel",
+                HighAvailabilityPolicy.PARALLEL, haAdmin.getZkUrl(), ClusterRoleRecord.ClusterRole.ACTIVE_TO_STANDBY,
+                "random-zk-url", ClusterRoleRecord.ClusterRole.STANDBY, 2L);
+        haAdmin.createOrUpdateDataOnZookeeper(crr1);
+        haAdmin.createOrUpdateDataOnZookeeper(crr2);
+
+        // A JUnit assertion, unlike the `assert` keyword, is checked regardless of the JVM's -ea flag.
+        assertTrue("Mutations should be blocked once the cluster is ACTIVE_TO_STANDBY",
+                waitForMutationBlocked(haGroupStoreManager, ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS));
+    }
+
+    /**
+     * Polls {@link HAGroupStoreManager#isMutationBlocked()} until it returns {@code true} or
+     * {@code timeoutMs} has elapsed. A final check is always made at or after the deadline, so
+     * this is never stricter than sleeping for {@code timeoutMs} and then checking once.
+     *
+     * @param manager   the manager to query
+     * @param timeoutMs how long to wait, in milliseconds
+     * @return the last observed value of {@code isMutationBlocked()}
+     */
+    private static boolean waitForMutationBlocked(HAGroupStoreManager manager, long timeoutMs)
+            throws Exception {
+        long deadline = System.currentTimeMillis() + timeoutMs;
+        boolean blocked = manager.isMutationBlocked();
+        while (!blocked && System.currentTimeMillis() < deadline) {
+            Thread.sleep(POLL_INTERVAL_MS);
+            blocked = manager.isMutationBlocked();
+        }
+        return blocked;
+    }
+}
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HighAvailabilityTestingUtility.java b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HighAvailabilityTestingUtility.java index 27b627753f..4a7c6bce91 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HighAvailabilityTestingUtility.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HighAvailabilityTestingUtility.java @@ -20,7 +20,6 @@ package org.apache.phoenix.jdbc; import org.apache.hadoop.hbase.StartMiniClusterOption; import org.apache.phoenix.end2end.PhoenixRegionServerEndpointTestImpl; import org.apache.phoenix.end2end.ServerMetadataCacheTestImpl; -import org.apache.phoenix.query.QueryServicesOptions; import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting; import org.apache.phoenix.thirdparty.com.google.common.collect.ImmutableList; import org.apache.commons.lang3.RandomUtils; @@ -35,7 +34,6 @@ import org.apache.hadoop.hbase.regionserver.RSRpcServices; import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; import
org.apache.phoenix.hbase.index.util.IndexManagementUtil; import org.apache.phoenix.jdbc.ClusterRoleRecord.ClusterRole; -import org.apache.phoenix.jdbc.PhoenixHAAdminTool.PhoenixHAAdminHelper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -63,6 +61,8 @@ import static org.apache.phoenix.hbase.index.write.AbstractParallelWriterIndexCo import static org.apache.phoenix.jdbc.ClusterRoleRecordGeneratorTool.PHOENIX_HA_GROUP_STORE_PEER_ID_DEFAULT; import static org.apache.phoenix.jdbc.FailoverPhoenixConnection.FAILOVER_TIMEOUT_MS_ATTR; import static org.apache.phoenix.jdbc.HighAvailabilityGroup.*; +import static org.apache.phoenix.jdbc.PhoenixHAAdmin.HighAvailibilityCuratorProvider; + import static org.apache.phoenix.query.QueryServices.COLLECT_REQUEST_LEVEL_METRICS; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; @@ -86,8 +86,8 @@ public class HighAvailabilityTestingUtility { private String url1; /** The host\:port::/hbase format of the JDBC string for HBase cluster 2. 
*/ private String url2; - private PhoenixHAAdminHelper haAdmin1; - private PhoenixHAAdminHelper haAdmin2; + private PhoenixHAAdmin haAdmin1; + private PhoenixHAAdmin haAdmin2; private Admin admin1; private Admin admin2; @VisibleForTesting @@ -121,8 +121,8 @@ public class HighAvailabilityTestingUtility { url1 = String.format("%s\\:%d::/hbase", confAddress1, hbaseCluster1.getZkCluster().getClientPort()); url2 = String.format("%s\\:%d::/hbase", confAddress2, hbaseCluster2.getZkCluster().getClientPort()); - haAdmin1 = new PhoenixHAAdminHelper(getUrl1(), hbaseCluster1.getConfiguration(), PhoenixHAAdminTool.HighAvailibilityCuratorProvider.INSTANCE); - haAdmin2 = new PhoenixHAAdminHelper(getUrl2(), hbaseCluster2.getConfiguration(), PhoenixHAAdminTool.HighAvailibilityCuratorProvider.INSTANCE); + haAdmin1 = new PhoenixHAAdmin(getUrl1(), hbaseCluster1.getConfiguration(), HighAvailibilityCuratorProvider.INSTANCE); + haAdmin2 = new PhoenixHAAdmin(getUrl2(), hbaseCluster2.getConfiguration(), HighAvailibilityCuratorProvider.INSTANCE); admin1 = hbaseCluster1.getConnection().getAdmin(); admin2 = hbaseCluster2.getConnection().getAdmin(); diff --git a/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixHAAdminToolTest.java b/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixHAAdminToolTest.java index eeafec80ba..e00d15c515 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixHAAdminToolTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixHAAdminToolTest.java @@ -17,8 +17,10 @@ */ package org.apache.phoenix.jdbc; +import static org.apache.phoenix.jdbc.PhoenixHAAdmin.getLocalZkUrl; import static org.apache.phoenix.jdbc.PhoenixHAAdminTool.RET_SUCCESS; -import static org.apache.phoenix.jdbc.PhoenixHAAdminTool.getLocalZkUrl; +import static org.apache.phoenix.jdbc.PhoenixHAAdmin.HighAvailibilityCuratorProvider; + import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static 
org.junit.Assert.assertTrue; @@ -46,7 +48,6 @@ import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.util.ToolRunner; import org.apache.phoenix.jdbc.ClusterRoleRecord.ClusterRole; -import org.apache.phoenix.jdbc.PhoenixHAAdminTool.PhoenixHAAdminHelper; import org.apache.zookeeper.KeeperException.NoNodeException; import org.junit.After; import org.junit.Before; @@ -59,7 +60,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * Unit test for {@link PhoenixHAAdminTool} including the helper class {@link PhoenixHAAdminHelper}. + * Unit test for {@link PhoenixHAAdminTool} including the helper class {@link PhoenixHAAdmin}. * * @see PhoenixHAAdminToolIT */ @@ -70,12 +71,13 @@ public class PhoenixHAAdminToolTest { private static final PrintStream STDOUT = System.out; private static final ByteArrayOutputStream STDOUT_CAPTURE = new ByteArrayOutputStream(); - private final PhoenixHAAdminTool.HighAvailibilityCuratorProvider mockHighAvailibilityCuratorProvider = Mockito.mock(PhoenixHAAdminTool.HighAvailibilityCuratorProvider.class); + private final HighAvailibilityCuratorProvider mockHighAvailibilityCuratorProvider = Mockito.mock(HighAvailibilityCuratorProvider.class); /** Use mocked curator since there is no mini-ZK cluster. */ private final CuratorFramework curator = Mockito.mock(CuratorFramework.class); /** HA admin to test for one test case. */ - private final PhoenixHAAdminHelper admin = new PhoenixHAAdminHelper(ZK1, new Configuration(), mockHighAvailibilityCuratorProvider); + private final PhoenixHAAdmin + admin = new PhoenixHAAdmin(ZK1, new Configuration(), mockHighAvailibilityCuratorProvider); private String haGroupName; private ClusterRoleRecord recordV1;