dbwong commented on code in PR #1430:
URL: https://github.com/apache/phoenix/pull/1430#discussion_r898611015
##########
phoenix-core/src/main/java/org/apache/phoenix/jdbc/HighAvailabilityGroup.java:
##########
@@ -0,0 +1,871 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.jdbc;
+
+import com.sun.istack.NotNull;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.curator.RetryPolicy;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.recipes.cache.NodeCache;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.curator.utils.ZKPaths;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.util.PairOfSameType;
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.exception.SQLExceptionInfo;
+import org.apache.phoenix.jdbc.ClusterRoleRecord.ClusterRole;
+import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.phoenix.thirdparty.com.google.common.base.Preconditions;
+import org.apache.phoenix.thirdparty.com.google.common.cache.Cache;
+import org.apache.phoenix.thirdparty.com.google.common.cache.CacheBuilder;
+import org.apache.phoenix.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.phoenix.util.JDBCUtil;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.Driver;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_CLIENT_CONNECTION_CACHE_MAX_DURATION;
+
+/**
+ * A high availability (HA) group is an association between a pair of HBase clusters, a group of
+ * clients, and an HA policy.
+ * <p>
+ * This class is thread safe. Multiple threads may access an instance of this class, including
+ * multiple clients that call init in order to create a connection, and two cluster role managers
+ * that watch node changes in ZooKeeper.
+ * <p>
+ * The lifecycle of an HA group is confined to the global cache, meaning clients can get an
+ * instance from the cache but cannot construct or close an HA group instance. The reason is that
+ * an HA group is a resource shared by many clients. Closing it, intentionally or accidentally,
+ * would impact the other connections in this group with unexpected behavior.
+ */
+@SuppressWarnings("UnstableApiUsage")
+public class HighAvailabilityGroup {
+    public static final String PHOENIX_HA_ATTR_PREFIX = "phoenix.ha.";
+    public static final String PHOENIX_HA_GROUP_ATTR = PHOENIX_HA_ATTR_PREFIX + "group.name";
+    /**
+     * Should we fall back to a single cluster when the cluster role record is missing?
+     */
+    public static final String PHOENIX_HA_SHOULD_FALLBACK_WHEN_MISSING_CRR_KEY =
+            PHOENIX_HA_ATTR_PREFIX + "fallback.enabled";
+    public static final String PHOENIX_HA_SHOULD_FALLBACK_WHEN_MISSING_CRR_DEFAULT =
+            String.valueOf(Boolean.TRUE);
+    /**
+     * The single-cluster connection URL used when it needs to fall back.
+     */
+    public static final String PHOENIX_HA_FALLBACK_CLUSTER_KEY =
+            PHOENIX_HA_ATTR_PREFIX + "fallback.cluster";
+    public static final String PHOENIX_HA_ZOOKEEPER_ZNODE_NAMESPACE =
+            "phoenix" + ZKPaths.PATH_SEPARATOR + "ha";
+
+    public static final String PHOENIX_HA_ZK_CONNECTION_TIMEOUT_MS_KEY =
+            PHOENIX_HA_ATTR_PREFIX + "zk.connection.timeout.ms";
+    public static final int PHOENIX_HA_ZK_CONNECTION_TIMEOUT_MS_DEFAULT = 4_000;
+    public static final String PHOENIX_HA_ZK_SESSION_TIMEOUT_MS_KEY =
+            PHOENIX_HA_ATTR_PREFIX + "zk.session.timeout.ms";
+    public static final int PHOENIX_HA_ZK_SESSION_TIMEOUT_MS_DEFAULT = 4_000;
+    public static final String PHOENIX_HA_ZK_RETRY_BASE_SLEEP_MS_KEY =
+            PHOENIX_HA_ATTR_PREFIX + "zk.retry.base.sleep.ms";
+    public static final int PHOENIX_HA_ZK_RETRY_BASE_SLEEP_MS_DEFAULT = 1000;
+    public static final String PHOENIX_HA_ZK_RETRY_MAX_KEY =
+            PHOENIX_HA_ATTR_PREFIX + "zk.retry.max";
+    public static final int PHOENIX_HA_ZK_RETRY_MAX_DEFAULT = 5;
+    public static final String PHOENIX_HA_ZK_RETRY_MAX_SLEEP_MS_KEY =
+            PHOENIX_HA_ATTR_PREFIX + "zk.retry.max.sleep.ms";
+    public static final int PHOENIX_HA_ZK_RETRY_MAX_SLEEP_MS_DEFAULT = 10_000;
+    public static final RetryPolicy RETRY_POLICY = new ExponentialBackoffRetry(
+            PHOENIX_HA_ZK_RETRY_BASE_SLEEP_MS_DEFAULT,
+            PHOENIX_HA_ZK_RETRY_MAX_DEFAULT,
+            PHOENIX_HA_ZK_RETRY_MAX_SLEEP_MS_DEFAULT);
+
+    public static final String PHOENIX_HA_TRANSITION_TIMEOUT_MS_KEY =
+            PHOENIX_HA_ATTR_PREFIX + "transition.timeout.ms";
+    public static final long PHOENIX_HA_TRANSITION_TIMEOUT_MS_DEFAULT = 5 * 60 * 1000; // 5 mins
+
+    static final Logger LOG = LoggerFactory.getLogger(HighAvailabilityGroup.class);
+    @VisibleForTesting
+    static final Map<HAGroupInfo, HighAvailabilityGroup> GROUPS = new ConcurrentHashMap<>();
+    @VisibleForTesting
+    static final Cache<HAGroupInfo, Boolean> MISSING_CRR_GROUPS_CACHE = CacheBuilder.newBuilder()
+            .expireAfterWrite(PHOENIX_HA_TRANSITION_TIMEOUT_MS_DEFAULT, TimeUnit.MILLISECONDS)
+            .build();
+    /**
+     * The Curator client cache, one client instance per cluster.
+     */
+    @VisibleForTesting
+    static final Cache<String, CuratorFramework> CURATOR_CACHE = CacheBuilder.newBuilder()
+            .expireAfterAccess(DEFAULT_CLIENT_CONNECTION_CACHE_MAX_DURATION, TimeUnit.MILLISECONDS)
+            .removalListener((notification) ->
+                    ((CuratorFramework) Objects.requireNonNull(notification.getValue())).close())
+            .build();
+    /**
+     * High availability group info.
+     */
+    private final HAGroupInfo info;
+    /**
+     * Client properties used to initialize this HA group.
+     */
+    private final Properties properties;
+    /**
+     * Executor service for the two role managers.
+     */
+    private final ExecutorService roleManagerExecutor = Executors.newFixedThreadPool(2,
+            new ThreadFactoryBuilder().setDaemon(true).setNameFormat("phoenixHAGroup-%d").build());
+    /**
+     * The count down latch to make sure at least one role manager has pulled data from ZK.
+     */
+    private final CountDownLatch roleManagerLatch = new CountDownLatch(1);
+    /**
+     * Pair of role managers watching cluster role records on the two ZK clusters.
+     */
+    private final AtomicReference<PairOfSameType<HAClusterRoleManager>> roleManagers
+            = new AtomicReference<>();
+    /**
+     * Executor for applying the cluster role to this HA group.
+     */
+    private final ExecutorService nodeChangedExecutor = Executors.newFixedThreadPool(1);
+    /**
+     * Current cluster role record for this HA group.
+     */
+    private volatile ClusterRoleRecord roleRecord;
+    /**
+     * State of this HA group.
+     */
+    private volatile State state = State.UNINITIALIZED;
+
+    /**
+     * Private constructor.
+     * <p>
+     * To get an instance, please call {@link HighAvailabilityGroup#get(String, Properties)}.
+     */
+    private HighAvailabilityGroup(HAGroupInfo info, Properties properties) {
+        this.info = info;
+        this.properties = properties;
+    }
+
+    /**
+     * This is for test usage only. In production, the record should be retrieved from ZooKeeper.
+     */
+    @VisibleForTesting
+    HighAvailabilityGroup(HAGroupInfo info, Properties properties, ClusterRoleRecord record,
+                          State state) {
+        this.info = info;
+        this.properties = properties;
+        this.roleRecord = record;
+        this.state = state;
+    }
+
+    public static HAGroupInfo getHAGroupInfo(String url, Properties properties)
+            throws SQLException {
+        if (url.startsWith(PhoenixRuntime.JDBC_PROTOCOL)) {
+            url = url.substring(PhoenixRuntime.JDBC_PROTOCOL.length() + 1);
+        }
+        if (!(url.contains("[") && url.contains("|") && url.contains("]"))) {
+            throw new SQLExceptionInfo.Builder(SQLExceptionCode.MALFORMED_CONNECTION_URL)
+                    .setMessage(String.format("URL %s is not a valid HA connection string", url))
+                    .build()
+                    .buildException();
+        }
+        url = url.substring(url.indexOf("[") + 1, url.indexOf("]"));
+        String[] urls = url.split("\\|");
+
+        String name = properties.getProperty(PHOENIX_HA_GROUP_ATTR);
+        if (StringUtils.isEmpty(name)) {
+            throw new SQLExceptionInfo.Builder(SQLExceptionCode.HA_INVALID_PROPERTIES)
+                    .setMessage(String.format("HA group name cannot be empty for HA URL %s", url))
+                    .build()
+                    .buildException();
+        }
+        return new HAGroupInfo(name, urls[0], urls[1]);
+    }
+
+    /**
+     * Get an instance of HA group given the HA connection URL (with "|") and client properties.
+     * <p>
+     * The HA group does not have a public constructor. This method is the only public one for
+     * getting an HA group instance. The reason is that an HA group is considered expensive to
+     * create and maintain. Caching it makes it reusable for all connection requests to this
+     * group.
+     * <p>
+     * It will return the cached instance, if any, for the target HA group. The HA group creation
+     * and initialization are blocking operations. Upon initialization failure, the HA group
+     * information may be saved in a negative cache iff the cause is missing cluster role records.
+     * Given an empty (not null or exception) return value, the client may choose to fall back to
+     * a single-cluster connection to compensate for missing cluster role records.
+     *
+     * @return Optional of target HA group (initialized), or empty if missing cluster role records
+     * @throws SQLException if it fails to get or initialize an HA group
+     */
+    public static Optional<HighAvailabilityGroup> get(String url, Properties properties)
+            throws SQLException {
+        HAGroupInfo info = getHAGroupInfo(url, properties);
+        if (MISSING_CRR_GROUPS_CACHE.getIfPresent(info) != null) {
+            return Optional.empty();
+        }
+
+        HighAvailabilityGroup haGroup = GROUPS.computeIfAbsent(info,
+                haGroupInfo -> new HighAvailabilityGroup(haGroupInfo, properties));
+        try {
+            haGroup.init();
+        } catch (Exception e) {
+            GROUPS.remove(info);
+            haGroup.close();
+            try {
+                CuratorFramework curator1 = CURATOR_CACHE.getIfPresent(info.getUrl1());
+                CuratorFramework curator2 = CURATOR_CACHE.getIfPresent(info.getUrl2());
+                if (curator1 != null && curator2 != null) {
+                    Stat node1 = curator1.checkExists().forPath(info.getZkPath());
+                    Stat node2 = curator2.checkExists().forPath(info.getZkPath());
+                    if (node1 == null && node2 == null) {
+                        // The HA group failed to initialize due to missing cluster role records
+                        // on both ZK clusters. We will put this HA group into the negative cache.
+                        MISSING_CRR_GROUPS_CACHE.put(info, true);
+                        return Optional.empty();
+                    }
+                }
+            } catch (Exception e2) {
+                LOG.error("HA group {} failed to initialize. Got exception when checking if znode"
+                        + " exists on the two ZK clusters.", info, e2);
+            }
+            throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_ESTABLISH_CONNECTION)
+                    .setMessage(String.format("Cannot start HA group %s for URL %s", haGroup, url))
+                    .setRootCause(e)
+                    .build()
+                    .buildException();
+        }
+        return Optional.of(haGroup);
+    }
+
+    /**
+     * This method helps the client get the single cluster to fall back to.
+     * <p>
+     * When getting an HA group using {@link #get(String, Properties)}, it may return an empty
+     * (not null or exception) value. In that case the client may choose to fall back to a
+     * single-cluster connection to compensate for missing cluster role records instead of
+     * throwing errors.
+     *
+     * @param url The HA connection url
+     * @param properties The client connection properties
+     * @return The connection url of the single cluster to fall back to; an empty optional if
+     *         properties disable fallback
+     * @throws SQLException if it fails to get HA information and/or invalid properties are seen
+     */
+    static Optional<String> getFallbackCluster(String url, Properties properties)
+            throws SQLException {
+        HAGroupInfo haGroupInfo = getHAGroupInfo(url, properties);
+
+        String fallback = properties.getProperty(PHOENIX_HA_SHOULD_FALLBACK_WHEN_MISSING_CRR_KEY,
+                PHOENIX_HA_SHOULD_FALLBACK_WHEN_MISSING_CRR_DEFAULT);
+        if (!Boolean.parseBoolean(fallback)) {
+            LOG.info("Fallback to single cluster not enabled for the HA group {} per"
+                    + " configuration. HA url: '{}'.", haGroupInfo.getName(), url);
+            return Optional.empty();
+        }
+        String fallbackCluster = properties.getProperty(PHOENIX_HA_FALLBACK_CLUSTER_KEY);
+        if (StringUtils.isEmpty(fallbackCluster)) {
+            fallbackCluster = haGroupInfo.getUrl1();
+        }
+        LOG.info("Falling back to single cluster '{}' for the HA group {} to serve HA connection"
+                + " request against url '{}'.",
+                fallbackCluster, haGroupInfo.getName(), url);
+        return Optional.of(fallbackCluster);
+    }
+
+    /**
+     * Get an active curator ZK client for the given properties and ZK endpoint.
+     * <p>
+     * This can come from the cache since a Curator client should be shared per cluster.
+     *
+     * @param jdbcUrl the ZK endpoint host:port or the JDBC connection string host:port:/hbase
+     * @param properties the properties defining timeout values and retry count
+     * @return an active Curator framework client, possibly cached
+     */
+    @SuppressWarnings("UnstableApiUsage")
+    public static CuratorFramework getCurator(String jdbcUrl, Properties properties)
+            throws IOException {
+        try {
+            return CURATOR_CACHE.get(jdbcUrl, () -> {
+                CuratorFramework curator = createCurator(jdbcUrl, properties);
+                if (!curator.blockUntilConnected(PHOENIX_HA_ZK_CONNECTION_TIMEOUT_MS_DEFAULT,
+                        TimeUnit.MILLISECONDS)) {
+                    throw new RuntimeException("Failed to connect to the CuratorFramework in "
+                            + "timeout " + PHOENIX_HA_ZK_CONNECTION_TIMEOUT_MS_DEFAULT + " ms");
+                }
+                return curator;
+            });
+        } catch (Exception e) {
+            LOG.error("Failed to get an active curator for url {}", jdbcUrl, e);
+            // invalidate the cache when getting/creating throws exception
+            CURATOR_CACHE.invalidate(jdbcUrl);
+            throw new IOException(e);
+        }
+    }
+
+    /**
+     * Create a curator ZK client for the given properties and ZK endpoint.
+     * <p>
+     * Unless the caller needs a new curator, it should use
+     * {@link #getCurator(String, Properties)}.
+     */
+    private static CuratorFramework createCurator(String jdbcUrl, Properties properties) {
+        // Get the ZK endpoint in host:port format by removing JDBC protocol and HBase root node
+        final String zkUrl;
+        if (jdbcUrl.startsWith(PhoenixRuntime.JDBC_PROTOCOL)) {
+            jdbcUrl = jdbcUrl.substring(PhoenixRuntime.JDBC_PROTOCOL.length() + 1);
+        }
+        Preconditions.checkArgument(!StringUtils.isEmpty(jdbcUrl), "JDBC url is empty!");
+        String[] urls = jdbcUrl.split(":");
+        if (urls.length == 1) {
+            zkUrl = String.format("%s:%s", urls[0], HConstants.DEFAULT_ZOOKEPER_CLIENT_PORT);
+        } else if (urls.length == 2 || urls.length == 3) {
+            zkUrl = String.format("%s:%s", urls[0], urls[1]);
+        } else {
+            throw new IllegalArgumentException("Invalid JDBC url! " + jdbcUrl);
+        }
+
+        // Get timeout and retry counts
+        String connectionTimeoutMsProp = properties.getProperty(
+                PHOENIX_HA_ZK_CONNECTION_TIMEOUT_MS_KEY);
+        final int connectionTimeoutMs = !StringUtils.isEmpty(connectionTimeoutMsProp)
+                ? Integer.parseInt(connectionTimeoutMsProp)
+                : PHOENIX_HA_ZK_CONNECTION_TIMEOUT_MS_DEFAULT;
+        String sessionTimeoutMsProps = properties.getProperty(
+                PHOENIX_HA_ZK_SESSION_TIMEOUT_MS_KEY);
+        final int sessionTimeoutMs = !StringUtils.isEmpty(sessionTimeoutMsProps)
+                ? Integer.parseInt(sessionTimeoutMsProps)
+                : PHOENIX_HA_ZK_SESSION_TIMEOUT_MS_DEFAULT;
+        final RetryPolicy retryPolicy = createRetryPolicy(properties);
+
+        CuratorFramework curator = CuratorFrameworkFactory
+                .builder()
+                .connectString(zkUrl)
+                .namespace(PHOENIX_HA_ZOOKEEPER_ZNODE_NAMESPACE)
+                .connectionTimeoutMs(connectionTimeoutMs)
+                .sessionTimeoutMs(sessionTimeoutMs)
+                .retryPolicy(retryPolicy)
+                .canBeReadOnly(true)
+                .build();
+        curator.start();
+        return curator;
+    }
+
+    /**
+     * Create a Curator retry policy from properties.
+     * <p>
+     * If properties is null, return a default retry policy.
+     *
+     * @param properties properties defining timeout and max retries
+     * @return a retry policy which can be used for Curator operations
+     */
+    public static RetryPolicy createRetryPolicy(Properties properties) {
+        if (properties == null) {
+            return RETRY_POLICY;
+        }
+        String baseSleepTimeMsProp = properties.getProperty(PHOENIX_HA_ZK_RETRY_BASE_SLEEP_MS_KEY);
+        int baseSleepTimeMs = StringUtils.isNotEmpty(baseSleepTimeMsProp)
+                ? Integer.parseInt(baseSleepTimeMsProp)
+                : PHOENIX_HA_ZK_RETRY_BASE_SLEEP_MS_DEFAULT;
+        String maxRetriesProp = properties.getProperty(PHOENIX_HA_ZK_RETRY_MAX_KEY);
+        int maxRetries = StringUtils.isNotEmpty(maxRetriesProp)
+                ? Integer.parseInt(maxRetriesProp)
+                : PHOENIX_HA_ZK_RETRY_MAX_DEFAULT;
+        String maxSleepTimeMsProp = properties.getProperty(PHOENIX_HA_ZK_RETRY_MAX_SLEEP_MS_KEY);
+        int maxSleepTimeMs = StringUtils.isNotEmpty(maxSleepTimeMsProp)
+                ? Integer.parseInt(maxSleepTimeMsProp)
+                : PHOENIX_HA_ZK_RETRY_MAX_SLEEP_MS_DEFAULT;
+        return new ExponentialBackoffRetry(baseSleepTimeMs, maxRetries, maxSleepTimeMs);
+    }
+
+    /**
+     * Initialize this HA group by registering ZK watchers and getting the initial cluster role
+     * record.
+     * <p>
+     * If this is already initialized, calling this method is a no-op. This method is lock free:
+     * the current thread will either return fast, or wait for the in-progress initialization, or
+     * time out.
+     */
+    public void init() throws IOException {
+        if (state != State.UNINITIALIZED) {
+            return;
+        }
+
+        PairOfSameType<HAClusterRoleManager> newRoleManagers = new PairOfSameType<>(
+                new HAClusterRoleManager(info.urls.getFirst(), properties),
+                new HAClusterRoleManager(info.urls.getSecond(), properties));
+        if (!roleManagers.compareAndSet(null, newRoleManagers)) {
+            LOG.info("Someone already started role managers; waiting for that one...");
+            waitForInitialization(properties);
+            return;
+        }
+
+        Future<?> f1 = roleManagerExecutor.submit(newRoleManagers.getFirst());
+        Future<?> f2 = roleManagerExecutor.submit(newRoleManagers.getSecond());
+        try {
+            waitForInitialization(properties);
+        } catch (IOException e) {
+            // An HA group that fails to initialize will not be kept in the global cache.
+            // The next connection request will create and initialize a new HA group.
+            // Before returning in case of exception, the following code will cancel the futures.
+            f1.cancel(true);
+            f2.cancel(true);
+            throw e;
+        }
+
+        assert roleRecord != null;
+        LOG.info("Initial cluster role for HA group {} is {}", info, roleRecord);
+    }
+
+    /**
+     * Helper method that will block the current thread until the HA group is initialized.
+     * <p>
+     * After returning, the HA group might not be in READY state. That is possible when a new ZK
+     * node change is detected, triggering the HA group to become IN_TRANSIT.
+     *
+     * @param properties the connection properties
+     * @throws IOException when the current HA group is not initialized before timeout
+     */
+    private void waitForInitialization(Properties properties) throws IOException {
+        String connectionTimeoutMsProp = properties.getProperty(
+                PHOENIX_HA_ZK_CONNECTION_TIMEOUT_MS_KEY);
+        int timeout = !StringUtils.isEmpty(connectionTimeoutMsProp)
+                ? Integer.parseInt(connectionTimeoutMsProp)
+                : PHOENIX_HA_ZK_CONNECTION_TIMEOUT_MS_DEFAULT;
+        boolean started = false;
+        try {
+            started = roleManagerLatch.await(timeout, TimeUnit.MILLISECONDS);
+        } catch (InterruptedException e) {
+            LOG.warn("Got interrupted when waiting for cluster role managers to start", e);
+            Thread.currentThread().interrupt();
+        }
+        if (!started) {
+            LOG.warn("Timed out {}ms waiting for HA group '{}' to be initialized.", timeout, info);
+            throw new IOException("Failed to initialize HA group " + info);
+        }
+    }
+
+    /**
+     * Create a JDBC connection in this high availability group.
+     *
+     * @param properties connection properties
+     * @return a JDBC connection implementation
+     * @throws SQLException if it fails to create a JDBC connection
+     */
+    public Connection connect(Properties properties) throws SQLException {
+        if (state != State.READY) {
+            throw new SQLExceptionInfo
+                    .Builder(SQLExceptionCode.CANNOT_ESTABLISH_CONNECTION)
+                    .setMessage("HA group is not ready!")
+                    .setHaGroupInfo(info.toString())
+                    .build()
+                    .buildException();
+        }
+        return roleRecord.getPolicy().provide(this, properties);
+    }
+
+    /**
+     * Get a Phoenix connection against the current active HBase cluster.
+     * <p>
+     * If there is no active cluster, it will throw an exception instead of blocking or retrying.
+     *
+     * @param properties connection properties
+     * @return a Phoenix connection to the current active HBase cluster
+     * @throws SQLException if it fails to get a connection
+     */
+    PhoenixConnection connectActive(final Properties properties) throws SQLException {
+        try {
+            Optional<String> url = roleRecord.getActiveUrl();
+            if (state == State.READY && url.isPresent()) {
+                PhoenixConnection conn = connectToOneCluster(url.get(), properties);
+                // After the connection is created, double check that the cluster is still ACTIVE.
+                // This is to make sure the newly created connection will not be returned to the
+                // client if the target cluster is no longer active. This can happen during
+                // failover.
+                if (state == State.READY && isActive(conn)) {
+                    return conn;
+                } else {
+                    conn.close();

Review Comment:
   I think a leak here is unlikely, but I can add some extra protection on the isActive check.
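   For illustration only, the extra protection could look roughly like the sketch below (not the actual change in the PR; `connectToOneCluster`, `isActive`, and `state` are the members shown in this class). The idea is to close the new connection on every non-returning path, including an exception thrown by `isActive` itself:

   ```java
   PhoenixConnection conn = connectToOneCluster(url.get(), properties);
   boolean returning = false;
   try {
       if (state == State.READY && isActive(conn)) {
           returning = true;
           return conn;
       }
   } finally {
       if (!returning) {
           conn.close(); // closed on a failed check or on an exception from isActive
       }
   }
   ```

   One caveat with this shape: `close()` can itself throw SQLException inside the finally block and mask the original exception; wrapping the close in its own try/catch would avoid that.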

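For context, a hypothetical client-side sketch of how this code path is reached, inferred from the HA URL grammar parsed by `getHAGroupInfo()` and the `phoenix.ha.group.name` property defined above. The hosts, port, group name, and query are placeholder values, and the assumption that the registered Phoenix driver dispatches bracketed HA URLs to `HighAvailabilityGroup` is this sketch's, not the PR's:

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.Properties;

public class HaUsageSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // The HA group name is required; an empty name fails with HA_INVALID_PROPERTIES.
        props.setProperty("phoenix.ha.group.name", "group1");
        // Optional: disable the single-cluster fallback used when role records are missing.
        // props.setProperty("phoenix.ha.fallback.enabled", "false");

        // The two ZK quorums are wrapped in brackets and separated by '|'.
        String haUrl = "jdbc:phoenix:[zk1.example.com:2181|zk2.example.com:2181]";
        try (Connection conn = DriverManager.getConnection(haUrl, props);
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery(
                     "SELECT TABLE_NAME FROM SYSTEM.CATALOG LIMIT 1")) {
            while (rs.next()) {
                System.out.println(rs.getString(1));
            }
        }
    }
}
```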