dbwong commented on code in PR #1430:
URL: https://github.com/apache/phoenix/pull/1430#discussion_r898611015


##########
phoenix-core/src/main/java/org/apache/phoenix/jdbc/HighAvailabilityGroup.java:
##########
@@ -0,0 +1,871 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.jdbc;
+
+import com.sun.istack.NotNull;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.curator.RetryPolicy;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.recipes.cache.NodeCache;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.curator.utils.ZKPaths;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.util.PairOfSameType;
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.exception.SQLExceptionInfo;
+import org.apache.phoenix.jdbc.ClusterRoleRecord.ClusterRole;
+import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.phoenix.thirdparty.com.google.common.base.Preconditions;
+import org.apache.phoenix.thirdparty.com.google.common.cache.Cache;
+import org.apache.phoenix.thirdparty.com.google.common.cache.CacheBuilder;
+import org.apache.phoenix.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.phoenix.util.JDBCUtil;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.Driver;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_CLIENT_CONNECTION_CACHE_MAX_DURATION;
+
+/**
+ * A high availability (HA) group is an association between a pair of HBase clusters, a group of
+ * clients, and an HA policy.
+ * <p>
+ * This class is thread safe. Multiple threads may access an instance of this class, including
+ * multiple clients that call init in order to create a connection, and the two cluster role
+ * managers that watch node changes in ZooKeeper.
+ * <p>
+ * The lifecycle of an HA group is confined in the global cache, meaning clients can get an
+ * instance from the cache but cannot construct or close an HA group instance.  The reason is
+ * that an HA group is a resource shared by many clients.  Closing it intentionally or
+ * accidentally by a client will impact other connections in this group with unexpected behavior.
+ */
+@SuppressWarnings("UnstableApiUsage")
+public class HighAvailabilityGroup {
+    public static final String PHOENIX_HA_ATTR_PREFIX = "phoenix.ha.";
+    public static final String PHOENIX_HA_GROUP_ATTR = PHOENIX_HA_ATTR_PREFIX + "group.name";
+    /**
+     * Should we fall back to single cluster when cluster role record is missing?
+     */
+    public static final String PHOENIX_HA_SHOULD_FALLBACK_WHEN_MISSING_CRR_KEY =
+            PHOENIX_HA_ATTR_PREFIX + "fallback.enabled";
+    public static final String PHOENIX_HA_SHOULD_FALLBACK_WHEN_MISSING_CRR_DEFAULT =
+            String.valueOf(Boolean.TRUE);
+    /**
+     * The single-cluster connection URL when it needs to fall back.
+     */
+    public static final String PHOENIX_HA_FALLBACK_CLUSTER_KEY =
+            PHOENIX_HA_ATTR_PREFIX + "fallback.cluster";
+    public static final String PHOENIX_HA_ZOOKEEPER_ZNODE_NAMESPACE =
+            "phoenix" + ZKPaths.PATH_SEPARATOR + "ha";
+
+    public static final String PHOENIX_HA_ZK_CONNECTION_TIMEOUT_MS_KEY =
+            PHOENIX_HA_ATTR_PREFIX + "zk.connection.timeout.ms";
+    public static final int PHOENIX_HA_ZK_CONNECTION_TIMEOUT_MS_DEFAULT = 4_000;
+    public static final String PHOENIX_HA_ZK_SESSION_TIMEOUT_MS_KEY =
+            PHOENIX_HA_ATTR_PREFIX + "zk.session.timeout.ms";
+    public static final int PHOENIX_HA_ZK_SESSION_TIMEOUT_MS_DEFAULT = 4_000;
+    public static final String PHOENIX_HA_ZK_RETRY_BASE_SLEEP_MS_KEY =
+            PHOENIX_HA_ATTR_PREFIX + "zk.retry.base.sleep.ms";
+
+    public static final int PHOENIX_HA_ZK_RETRY_BASE_SLEEP_MS_DEFAULT = 1000;
+    public static final String PHOENIX_HA_ZK_RETRY_MAX_KEY =
+            PHOENIX_HA_ATTR_PREFIX + "zk.retry.max";
+    public static final int PHOENIX_HA_ZK_RETRY_MAX_DEFAULT = 5;
+    public static final String PHOENIX_HA_ZK_RETRY_MAX_SLEEP_MS_KEY =
+            PHOENIX_HA_ATTR_PREFIX + "zk.retry.max.sleep.ms";
+    public static final int PHOENIX_HA_ZK_RETRY_MAX_SLEEP_MS_DEFAULT = 10_000;
+    public static final RetryPolicy RETRY_POLICY = new ExponentialBackoffRetry(
+            PHOENIX_HA_ZK_RETRY_BASE_SLEEP_MS_DEFAULT,
+            PHOENIX_HA_ZK_RETRY_MAX_DEFAULT,
+            PHOENIX_HA_ZK_RETRY_MAX_SLEEP_MS_DEFAULT);
+
+    public static final String PHOENIX_HA_TRANSITION_TIMEOUT_MS_KEY =
+            PHOENIX_HA_ATTR_PREFIX + "transition.timeout.ms";
+    public static final long PHOENIX_HA_TRANSITION_TIMEOUT_MS_DEFAULT = 5 * 60 * 1000; // 5 mins
+
+    static final Logger LOG = LoggerFactory.getLogger(HighAvailabilityGroup.class);
+    @VisibleForTesting
+    static final Map<HAGroupInfo, HighAvailabilityGroup> GROUPS = new ConcurrentHashMap<>();
+    @VisibleForTesting
+    static final Cache<HAGroupInfo, Boolean> MISSING_CRR_GROUPS_CACHE = CacheBuilder.newBuilder()
+            .expireAfterWrite(PHOENIX_HA_TRANSITION_TIMEOUT_MS_DEFAULT, TimeUnit.MILLISECONDS)
+            .build();
+    /**
+     * The Curator client cache, one client instance per cluster.
+     */
+    @VisibleForTesting
+    static final Cache<String, CuratorFramework> CURATOR_CACHE = CacheBuilder.newBuilder()
+            .expireAfterAccess(DEFAULT_CLIENT_CONNECTION_CACHE_MAX_DURATION, TimeUnit.MILLISECONDS)
+            .removalListener((notification) ->
+                    ((CuratorFramework) Objects.requireNonNull(notification.getValue())).close())
+            .build();
+    /**
+     * High availability group info.
+     */
+    private final HAGroupInfo info;
+    /**
+     * Client properties used to initialize this HA group.
+     */
+    private final Properties properties;
+    /**
+     * Executor service for the two role managers.
+     */
+    private final ExecutorService roleManagerExecutor = Executors.newFixedThreadPool(2,
+            new ThreadFactoryBuilder().setDaemon(true).setNameFormat("phoenixHAGroup-%d").build());
+    /**
+     * The count down latch to make sure at least one role manager has pulled data from ZK.
+     */
+    private final CountDownLatch roleManagerLatch = new CountDownLatch(1);
+    /**
+     * Pair of role managers for watching cluster role records from the two ZK clusters.
+     */
+    private final AtomicReference<PairOfSameType<HAClusterRoleManager>> roleManagers
+            = new AtomicReference<>();
+    /**
+     * Executor for applying the cluster role to this HA group.
+     */
+    private final ExecutorService nodeChangedExecutor = Executors.newFixedThreadPool(1);
+    /**
+     * Current cluster role record for this HA group.
+     */
+    private volatile ClusterRoleRecord roleRecord;
+    /**
+     * State of this HA group.
+     */
+    private volatile State state = State.UNINITIALIZED;
+
+    /**
+     * Private constructor.
+     * <p>
+     * To get an instance, please call {@link HighAvailabilityGroup#get(String, Properties)}.
+     */
+    private HighAvailabilityGroup(HAGroupInfo info, Properties properties) {
+        this.info = info;
+        this.properties = properties;
+    }
+    /**
+     * This is for test usage only. In production, the record should be retrieved from ZooKeeper.
+     */
+    @VisibleForTesting
+    HighAvailabilityGroup(HAGroupInfo info, Properties properties, ClusterRoleRecord record,
+                          State state) {
+        this.info = info;
+        this.properties = properties;
+        this.roleRecord = record;
+        this.state = state;
+    }
+
+    public static HAGroupInfo getHAGroupInfo(String url, Properties properties)
+            throws SQLException {
+        if (url.startsWith(PhoenixRuntime.JDBC_PROTOCOL)) {
+            url = url.substring(PhoenixRuntime.JDBC_PROTOCOL.length() + 1);
+        }
+        if (!(url.contains("[") && url.contains("|") && url.contains("]"))) {
+            throw new SQLExceptionInfo.Builder(SQLExceptionCode.MALFORMED_CONNECTION_URL)
+                    .setMessage(String.format("URL %s is not a valid HA connection string", url))
+                    .build()
+                    .buildException();
+        }
+        url = url.substring(url.indexOf("[") + 1, url.indexOf("]"));
+        String[] urls = url.split("\\|");
+
+        String name = properties.getProperty(PHOENIX_HA_GROUP_ATTR);
+        if (StringUtils.isEmpty(name)) {
+            throw new SQLExceptionInfo.Builder(SQLExceptionCode.HA_INVALID_PROPERTIES)
+                    .setMessage(String.format("HA group name cannot be empty for HA URL %s", url))
+                    .build()
+                    .buildException();
+        }
+        return new HAGroupInfo(name, urls[0], urls[1]);
+    }
+
+    /**
+     * Get an instance of HA group given the HA connection URL (with "|") and client properties.
+     * <p>
+     * The HA group does not have a public constructor. This method is the only public way of
+     * getting an HA group instance. The reason is that an HA group is considered expensive to
+     * create and maintain. Caching it makes it reusable for all connection requests to this group.
+     * <p>
+     * It will return the cached instance, if any, for the target HA group. The HA group creation
+     * and initialization are blocking operations. Upon initialization failure, the HA group
+     * information may be saved in a negative cache iff the cause is missing cluster role records.
+     * When an empty (not null, no exception) value is returned, the client may choose to fall
+     * back to a single-cluster connection to compensate for the missing cluster role records.
+     *
+     * @return Optional of the target HA group (initialized), or empty if cluster role records
+     *         are missing
+     * @throws SQLException if it fails to get or initialize an HA group
+     */
+    public static Optional<HighAvailabilityGroup> get(String url, Properties properties)
+            throws SQLException {
+        HAGroupInfo info = getHAGroupInfo(url, properties);
+        if (MISSING_CRR_GROUPS_CACHE.getIfPresent(info) != null) {
+            return Optional.empty();
+        }
+
+        HighAvailabilityGroup haGroup = GROUPS.computeIfAbsent(info,
+                haGroupInfo -> new HighAvailabilityGroup(haGroupInfo, properties));
+        try {
+            haGroup.init();
+        } catch (Exception e) {
+            GROUPS.remove(info);
+            haGroup.close();
+            try {
+                CuratorFramework curator1 = CURATOR_CACHE.getIfPresent(info.getUrl1());
+                CuratorFramework curator2 = CURATOR_CACHE.getIfPresent(info.getUrl2());
+                if (curator1 != null && curator2 != null) {
+                    Stat node1 = curator1.checkExists().forPath(info.getZkPath());
+                    Stat node2 = curator2.checkExists().forPath(info.getZkPath());
+                    if (node1 == null && node2 == null) {
+                        // The HA group fails to initialize due to missing cluster role records on
+                        // both ZK clusters. We will put this HA group into negative cache.
+                        MISSING_CRR_GROUPS_CACHE.put(info, true);
+                        return Optional.empty();
+                    }
+                }
+            } catch (Exception e2) {
+                LOG.error("HA group {} failed to initialized. Got exception 
when checking if znode"
+                        + " exists on the two ZK clusters.", info, e2);
+            }
+            throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_ESTABLISH_CONNECTION)
+                    .setMessage(String.format("Cannot start HA group %s for URL %s", haGroup, url))
+                    .setRootCause(e)
+                    .build()
+                    .buildException();
+        }
+        return Optional.of(haGroup);
+    }
+
+    /**
+     * This method helps the client get the single cluster to fall back to.
+     * <p>
+     * When getting an HA group using {@link #get(String, Properties)}, it may return an empty
+     * (not null, no exception) value. In that case the client may choose to fall back to a
+     * single-cluster connection to compensate for missing cluster role records instead of
+     * throwing errors.
+     *
+     * @param url        The HA connection URL
+     * @param properties The client connection properties
+     * @return The connection URL of the single cluster to fall back to; empty optional if the
+     *         properties disable fallback
+     * @throws SQLException if it fails to get HA information and/or invalid properties are seen
+     */
+    static Optional<String> getFallbackCluster(String url, Properties properties)
+            throws SQLException {
+        HAGroupInfo haGroupInfo = getHAGroupInfo(url, properties);
+
+        String fallback = properties.getProperty(PHOENIX_HA_SHOULD_FALLBACK_WHEN_MISSING_CRR_KEY,
+                PHOENIX_HA_SHOULD_FALLBACK_WHEN_MISSING_CRR_DEFAULT);
+        if (!Boolean.parseBoolean(fallback)) {
+            LOG.info("Fallback to single cluster not enabled for the HA group 
{} per configuration."
+                    + " HA url: '{}'.", haGroupInfo.getName(), url);
+            return Optional.empty();
+        }
+        String fallbackCluster = properties.getProperty(PHOENIX_HA_FALLBACK_CLUSTER_KEY);
+        if (StringUtils.isEmpty(fallbackCluster)) {
+            fallbackCluster = haGroupInfo.getUrl1();
+        }
+        LOG.info("Falling back to single cluster '{}' for the HA group {} to 
serve HA connection "
+                        + "request against url '{}'.",
+                fallbackCluster, haGroupInfo.getName(), url);
+        return Optional.of(fallbackCluster);
+    }
+
+    /**
+     * Get an active Curator ZK client for the given properties and ZK endpoint.
+     * <p>
+     * This may come from a cached object, since a Curator client should be shared per cluster.
+     *
+     * @param jdbcUrl    the ZK endpoint host:port or the JDBC connection string host:port:/hbase
+     * @param properties the properties defining timeout values and retry count
+     * @return an active Curator framework client, possibly from the cache
+     */
+    @SuppressWarnings("UnstableApiUsage")
+    public static CuratorFramework getCurator(String jdbcUrl, Properties properties)
+            throws IOException {
+        try {
+            return CURATOR_CACHE.get(jdbcUrl, () -> {
+                CuratorFramework curator = createCurator(jdbcUrl, properties);
+                if (!curator.blockUntilConnected(PHOENIX_HA_ZK_CONNECTION_TIMEOUT_MS_DEFAULT,
+                        TimeUnit.MILLISECONDS)) {
+                    throw new RuntimeException("Failed to connect to the CuratorFramework in "
+                            + "timeout " + PHOENIX_HA_ZK_CONNECTION_TIMEOUT_MS_DEFAULT + " ms");
+                }
+                return curator;
+            });
+        } catch (Exception e) {
+            LOG.error("Fail to get an active curator for url {}", jdbcUrl, e);
+            // invalidate the cache when getting/creating throws exception
+            CURATOR_CACHE.invalidate(jdbcUrl);
+            throw new IOException(e);
+        }
+    }
+
+    /**
+     * Create a curator ZK client for the given properties and ZK endpoint.
+     * <p>
+     * Unless the caller needs a new curator, it should use {@link #getCurator(String, Properties)}.
+     */
+    private static CuratorFramework createCurator(String jdbcUrl, Properties properties) {
+        // Get the ZK endpoint in host:port format by removing JDBC protocol and HBase root node
+        final String zkUrl;
+        if (jdbcUrl.startsWith(PhoenixRuntime.JDBC_PROTOCOL)) {
+            jdbcUrl = jdbcUrl.substring(PhoenixRuntime.JDBC_PROTOCOL.length() + 1);
+        }
+        Preconditions.checkArgument(!StringUtils.isEmpty(jdbcUrl), "JDBC url is empty!");
+        String[] urls = jdbcUrl.split(":");
+        if (urls.length == 1) {
+            zkUrl = String.format("%s:%s", urls[0], HConstants.DEFAULT_ZOOKEPER_CLIENT_PORT);
+        } else if (urls.length == 2 || urls.length == 3) {
+            zkUrl = String.format("%s:%s", urls[0], urls[1]);
+        } else {
+            throw new IllegalArgumentException("Invalid JDBC url! " + jdbcUrl);
+        }
+
+        // Get timeout and retry counts
+        String connectionTimeoutMsProp = properties.getProperty(
+                PHOENIX_HA_ZK_CONNECTION_TIMEOUT_MS_KEY);
+        final int connectionTimeoutMs = !StringUtils.isEmpty(connectionTimeoutMsProp)
+                ? Integer.parseInt(connectionTimeoutMsProp)
+                : PHOENIX_HA_ZK_CONNECTION_TIMEOUT_MS_DEFAULT;
+        String sessionTimeoutMsProps = properties.getProperty(PHOENIX_HA_ZK_SESSION_TIMEOUT_MS_KEY);
+        final int sessionTimeoutMs = !StringUtils.isEmpty(sessionTimeoutMsProps)
+                ? Integer.parseInt(sessionTimeoutMsProps)
+                : PHOENIX_HA_ZK_SESSION_TIMEOUT_MS_DEFAULT;
+        final RetryPolicy retryPolicy = createRetryPolicy(properties);
+
+        CuratorFramework curator = CuratorFrameworkFactory
+                .builder()
+                .connectString(zkUrl)
+                .namespace(PHOENIX_HA_ZOOKEEPER_ZNODE_NAMESPACE)
+                .connectionTimeoutMs(connectionTimeoutMs)
+                .sessionTimeoutMs(sessionTimeoutMs)
+                .retryPolicy(retryPolicy)
+                .canBeReadOnly(true)
+                .build();
+        curator.start();
+        return curator;
+    }
+
+    /**
+     * Create a Curator retry policy from properties.
+     * <p>
+     * If properties is null, return a default retry policy.
+     *
+     * @param properties properties defining timeout and max retries
+     * @return a retry policy which can be used for Curator operations
+     */
+    public static RetryPolicy createRetryPolicy(Properties properties) {
+        if (properties == null) {
+            return RETRY_POLICY;
+        }
+        String baseSleepTimeMsProp = properties.getProperty(PHOENIX_HA_ZK_RETRY_BASE_SLEEP_MS_KEY);
+        int baseSleepTimeMs = StringUtils.isNotEmpty(baseSleepTimeMsProp)
+                ? Integer.parseInt(baseSleepTimeMsProp)
+                : PHOENIX_HA_ZK_RETRY_BASE_SLEEP_MS_DEFAULT;
+        String maxRetriesProp = properties.getProperty(PHOENIX_HA_ZK_RETRY_MAX_KEY);
+        int maxRetries = StringUtils.isNotEmpty(maxRetriesProp)
+                ? Integer.parseInt(maxRetriesProp)
+                : PHOENIX_HA_ZK_RETRY_MAX_DEFAULT;
+        String maxSleepTimeMsProp = properties.getProperty(PHOENIX_HA_ZK_RETRY_MAX_SLEEP_MS_KEY);
+        int maxSleepTimeMs = StringUtils.isNotEmpty(maxSleepTimeMsProp)
+                ? Integer.parseInt(maxSleepTimeMsProp)
+                : PHOENIX_HA_ZK_RETRY_MAX_SLEEP_MS_DEFAULT;
+        return new ExponentialBackoffRetry(baseSleepTimeMs, maxRetries, maxSleepTimeMs);
+    }
+
+    /**
+     * Initialize this HA group by registering ZK watchers and getting the initial cluster role
+     * record.
+     * <p>
+     * If this is already initialized, calling this method is a no-op. This method is lock free,
+     * as the current thread will either return fast or wait for the in-progress initialization
+     * until timeout.
+     */
+    public void init() throws IOException {
+        if (state != State.UNINITIALIZED) {
+            return;
+        }
+
+        PairOfSameType<HAClusterRoleManager> newRoleManagers = new PairOfSameType<>(
+                new HAClusterRoleManager(info.urls.getFirst(), properties),
+                new HAClusterRoleManager(info.urls.getSecond(), properties));
+        if (!roleManagers.compareAndSet(null, newRoleManagers)) {
+            LOG.info("Someone already started role managers; waiting for that 
one...");
+            waitForInitialization(properties);
+            return;
+        }
+
+        Future<?> f1 = roleManagerExecutor.submit(newRoleManagers.getFirst());
+        Future<?> f2 = roleManagerExecutor.submit(newRoleManagers.getSecond());
+        try {
+            waitForInitialization(properties);
+        } catch (IOException e) {
+            // An HA group that fails to initialize will not be kept in the global cache.
+            // The next connection request will create and initialize a new HA group.
+            // Before returning in case of exception, the following code will cancel the futures.
+            f1.cancel(true);
+            f2.cancel(true);
+            throw e;
+        }
+
+        assert roleRecord != null;
+        LOG.info("Initial cluster role for HA group {} is {}", info, 
roleRecord);
+    }
+
+    /**
+     * Helper method that blocks the current thread until the HA group is initialized.
+     * <p>
+     * After returning, the HA group might not be in READY state. That is possible when a new ZK
+     * node change is detected, triggering the HA group to become IN_TRANSIT.
+     *
+     * @param properties the connection properties
+     * @throws IOException when the current HA group is not initialized before timeout
+     */
+    private void waitForInitialization(Properties properties) throws IOException {
+        String connectionTimeoutMsProp = properties.getProperty(
+                PHOENIX_HA_ZK_CONNECTION_TIMEOUT_MS_KEY);
+        int timeout = !StringUtils.isEmpty(connectionTimeoutMsProp)
+                ? Integer.parseInt(connectionTimeoutMsProp)
+                : PHOENIX_HA_ZK_CONNECTION_TIMEOUT_MS_DEFAULT;
+        boolean started = false;
+        try {
+            started = roleManagerLatch.await(timeout, TimeUnit.MILLISECONDS);
+        } catch (InterruptedException e) {
+            LOG.warn("Got interrupted when waiting for cluster role managers 
to start", e);
+            Thread.currentThread().interrupt();
+        }
+        if (!started) {
+            LOG.warn("Timed out {}ms waiting for HA group '{}' to be 
initialized.", timeout, info);
+            throw new IOException("Fail to initialize HA group " + info);
+        }
+    }
+
+    /**
+     * Create a JDBC connection in this high availability group.
+     *
+     * @param properties connection properties
+     * @return a JDBC connection implementation
+     * @throws SQLException if it fails to establish a JDBC connection
+     */
+    public Connection connect(Properties properties) throws SQLException {
+        if (state != State.READY) {
+            throw new SQLExceptionInfo
+                    .Builder(SQLExceptionCode.CANNOT_ESTABLISH_CONNECTION)
+                    .setMessage("HA group is not ready!")
+                    .setHaGroupInfo(info.toString())
+                    .build()
+                    .buildException();
+        }
+        return roleRecord.getPolicy().provide(this, properties);
+    }
+
+    /**
+     * Get a Phoenix connection against the current active HBase cluster.
+     * <p>
+     * If there is no active cluster, it will throw an exception instead of blocking or retrying.
+     *
+     * @param properties connection properties
+     * @return a Phoenix connection to the current active HBase cluster
+     * @throws SQLException if it fails to get a connection
+     */
+    PhoenixConnection connectActive(final Properties properties) throws SQLException {
+        try {
+            Optional<String> url = roleRecord.getActiveUrl();
+            if (state == State.READY && url.isPresent()) {
+                PhoenixConnection conn = connectToOneCluster(url.get(), properties);
+                // After connection is created, double check if the cluster is still ACTIVE
+                // This is to make sure the newly created connection will not be returned to client
+                // if the target cluster is not active any more. This can happen during failover.
+                if (state == State.READY && isActive(conn)) {
+                    return conn;
+                } else {
+                    conn.close();

Review Comment:
   I think a leak here is unlikely, but I can add some extra protection on the isActive check.
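
   A minimal sketch of the extra protection I have in mind, reusing the existing
   `connectToOneCluster`/`isActive` helpers in this class (the exact control flow and exception
   handling here are illustrative, not final):

   ```java
   PhoenixConnection conn = connectToOneCluster(url.get(), properties);
   boolean giveConnToClient = false;
   try {
       // Double check the cluster is still ACTIVE after the connection is created;
       // the role may have flipped during failover while we were connecting.
       giveConnToClient = (state == State.READY && isActive(conn));
   } finally {
       // Close the connection on every non-happy path (role changed, or isActive
       // threw), so the connection can never leak.
       if (!giveConnToClient) {
           conn.close();
       }
   }
   if (giveConnToClient) {
       return conn;
   }
   ```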



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
