dbwong commented on code in PR #1430:
URL: https://github.com/apache/phoenix/pull/1430#discussion_r881164572


##########
phoenix-core/src/it/java/org/apache/phoenix/jdbc/HighAvailabilityTestingUtility.java:
##########
@@ -0,0 +1,571 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.jdbc;
+
+import org.apache.phoenix.thirdparty.com.google.common.collect.ImmutableList;
+import org.apache.commons.lang3.RandomUtils;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.GenericTestUtils;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
+import org.apache.hadoop.hbase.ipc.PhoenixRpcSchedulerFactory;
+import org.apache.hadoop.hbase.regionserver.RSRpcServices;
+import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
+import org.apache.phoenix.hbase.index.util.IndexManagementUtil;
+import org.apache.phoenix.jdbc.ClusterRoleRecord.ClusterRole;
+import org.apache.phoenix.jdbc.PhoenixHAAdminTool.PhoenixHAAdminHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_KEY;
+import static org.apache.hadoop.test.GenericTestUtils.waitFor;
+import static 
org.apache.phoenix.hbase.index.write.AbstractParallelWriterIndexCommitter.NUM_CONCURRENT_INDEX_WRITER_THREADS_CONF_KEY;
+import static 
org.apache.phoenix.jdbc.ClusterRoleRecordGeneratorTool.PHOENIX_HA_GROUP_STORE_PEER_ID_DEFAULT;
+import static 
org.apache.phoenix.jdbc.HighAvailabilityGroup.PHOENIX_HA_GROUP_ATTR;
+import static 
org.apache.phoenix.jdbc.HighAvailabilityGroup.PHOENIX_HA_ZK_CONNECTION_TIMEOUT_MS_KEY;
+import static 
org.apache.phoenix.jdbc.HighAvailabilityGroup.PHOENIX_HA_ZK_RETRY_BASE_SLEEP_MS_KEY;
+import static 
org.apache.phoenix.jdbc.HighAvailabilityGroup.PHOENIX_HA_ZK_RETRY_MAX_KEY;
+import static 
org.apache.phoenix.jdbc.HighAvailabilityGroup.PHOENIX_HA_ZK_RETRY_MAX_SLEEP_MS_KEY;
+import static 
org.apache.phoenix.jdbc.HighAvailabilityGroup.PHOENIX_HA_ZK_SESSION_TIMEOUT_MS_KEY;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Utility class for testing HBase failover.
+ */
+public class HighAvailabilityTestingUtility {
+    private static final Logger LOG = 
LoggerFactory.getLogger(HighAvailabilityTestingUtility.class);
+
+    /**
+     * Utility class for creating and maintaining HBase DR cluster pair.
+     *
+     * TODO: @Bharath check if we can use utility from upstream HBase for a 
pair of mini clusters.
+     */
+    public static class HBaseTestingUtilityPair implements Closeable {
+        private final HBaseTestingUtility hbaseCluster1 = new 
HBaseTestingUtility();
+        private final HBaseTestingUtility hbaseCluster2 = new 
HBaseTestingUtility();
+        /** The host:port:/hbase format of the JDBC string for HBase cluster 
1. */
+        private String url1;
+        /** The host:port:/hbase format of the JDBC string for HBase cluster 
2. */
+        private String url2;
+        private PhoenixHAAdminHelper haAdmin1;
+        private PhoenixHAAdminHelper haAdmin2;
+        private Admin admin1;
+        private Admin admin2;
+
+        public HBaseTestingUtilityPair() {
+            Configuration conf1 = hbaseCluster1.getConfiguration();
+            Configuration conf2 = hbaseCluster2.getConfiguration();
+            setUpDefaultHBaseConfig(conf1);
+            setUpDefaultHBaseConfig(conf2);
+        }
+
+        /**
+         * Instantiates and starts the two mini HBase DR cluster. Enable 
peering if configured.
+         *
+         * @throws Exception if fails to start either cluster
+         */
+        public void start() throws Exception {
+            hbaseCluster1.startMiniCluster();
+            hbaseCluster2.startMiniCluster();
+
+            url1 = String.format("localhost:%d:/hbase", 
hbaseCluster1.getZkCluster().getClientPort());
+            url2 = String.format("localhost:%d:/hbase", 
hbaseCluster2.getZkCluster().getClientPort());
+
+            haAdmin1 = new PhoenixHAAdminHelper(getUrl1(), 
hbaseCluster1.getConfiguration(), 
PhoenixHAAdminTool.HighAvailibilityCuratorProvider.INSTANCE);
+            haAdmin2 = new PhoenixHAAdminHelper(getUrl2(), 
hbaseCluster2.getConfiguration(), 
PhoenixHAAdminTool.HighAvailibilityCuratorProvider.INSTANCE);
+
+            admin1 = hbaseCluster1.getHBaseAdmin();
+            admin2 = hbaseCluster2.getHBaseAdmin();
+
+            // Enable replication between the two HBase clusters.
+            ReplicationPeerConfig replicationPeerConfig1 = new 
ReplicationPeerConfig();
+            
replicationPeerConfig1.setClusterKey(hbaseCluster2.getClusterKey());
+            ReplicationPeerConfig replicationPeerConfig2 = new 
ReplicationPeerConfig();
+            
replicationPeerConfig2.setClusterKey(hbaseCluster1.getClusterKey());
+
+            ReplicationAdmin replicationAdmin1 = new ReplicationAdmin(
+                    hbaseCluster1.getConfiguration());
+            replicationAdmin1.addPeer(PHOENIX_HA_GROUP_STORE_PEER_ID_DEFAULT, 
replicationPeerConfig1);
+            replicationAdmin1.close();
+
+            ReplicationAdmin replicationAdmin2 = new ReplicationAdmin(
+                    hbaseCluster2.getConfiguration());
+            replicationAdmin2.addPeer(PHOENIX_HA_GROUP_STORE_PEER_ID_DEFAULT, 
replicationPeerConfig2);
+            replicationAdmin2.close();
+
+            LOG.info("MiniHBase DR cluster pair is ready for testing.  Cluster 
Urls [{},{}]",
+                    getUrl1(), getUrl2());
+            logClustersStates();
+        }
+
+        /** initialize two ZK clusters for cluster role znode. */
+        public void initClusterRole(String haGroupName, HighAvailabilityPolicy 
policy)
+                throws Exception {
+            ClusterRoleRecord record = new ClusterRoleRecord(
+                    haGroupName, policy,
+                    getUrl1(), ClusterRole.ACTIVE,
+                    getUrl2(), ClusterRole.STANDBY,
+                    1);
+            haAdmin1.createOrUpdateDataOnZookeeper(record);
+            haAdmin2.createOrUpdateDataOnZookeeper(record);
+        }
+
+        /**
+         * Set cluster roles for an HA group and wait the cluster role 
transition to happen.
+         *
+         * @param haGroup the HA group name
+         * @param role1 cluster role for the first cluster in the group
+         * @param role2 cluster role for the second cluster in the group
+         */
+        public void transitClusterRole(HighAvailabilityGroup haGroup, 
ClusterRole role1,
+                ClusterRole role2) throws Exception {
+            final ClusterRoleRecord newRoleRecord = new ClusterRoleRecord(
+                    haGroup.getGroupInfo().getName(), 
haGroup.getRoleRecord().getPolicy(),
+                    getUrl1(), role1,
+                    getUrl2(), role2,
+                    haGroup.getRoleRecord().getVersion() + 1); // always use a 
newer version
+            LOG.info("Transiting cluster role for HA group {} V{}->V{}, 
existing: {}, new: {}",
+                    haGroup.getGroupInfo().getName(), 
haGroup.getRoleRecord().getVersion(),
+                    newRoleRecord.getVersion(), haGroup.getRoleRecord(), 
newRoleRecord);
+            boolean successAtLeastOnce = false;
+            try {
+                haAdmin1.createOrUpdateDataOnZookeeper(newRoleRecord);
+                successAtLeastOnce = true;
+            } catch (IOException e) {
+                LOG.warn("Fail to update new record on {} because {}", 
getUrl1(), e.getMessage());
+            }
+            try {
+                haAdmin2.createOrUpdateDataOnZookeeper(newRoleRecord);
+                successAtLeastOnce = true;
+            } catch (IOException e) {
+                LOG.warn("Fail to update new record on {} because {}", 
getUrl2(), e.getMessage());
+            }
+
+            if (!successAtLeastOnce) {
+                throw new IOException("Failed to update the new role record on 
either cluster");
+            }
+            // wait for the cluster roles are populated into client side from 
ZK nodes.
+            waitFor(() -> newRoleRecord.equals(haGroup.getRoleRecord()), 1000, 
120_000);
+
+            LOG.info("Now the HA group {} should have detected and updated V{} 
cluster role record",
+                    haGroup, newRoleRecord.getVersion());
+        }
+
+        /**
+         * Log the state of both clusters
+         */
+        public void logClustersStates() {
+            String cluster1Status, cluster2Status;
+            try {
+                cluster1Status = admin1.getClusterStatus().toString();
+            } catch (IOException e) {
+                cluster1Status = "Unable to get cluster status.";
+            }
+            try {
+                cluster2Status = admin2.getClusterStatus().toString();
+            } catch (IOException e){
+                cluster2Status = "Unable to get cluster status.";
+            }
+            LOG.info("Cluster Status [\n{},\n{}\n]", cluster1Status, 
cluster2Status);
+        }
+
+        /**
+         * @return testing utility for cluster 1
+         */
+        public HBaseTestingUtility getHBaseCluster1() {
+            return hbaseCluster1;
+        }
+
+        /**
+         * @return testing utility for cluster 2
+         */
+        public HBaseTestingUtility getHBaseCluster2() {
+            return hbaseCluster2;
+        }
+
+        /**
+         * Returns a Phoenix Connection to a cluster
+         * @param clusterIndex 0 based
+         * @return a Phoenix Connection to the indexed cluster
+         */
+        public Connection getClusterConnection(int clusterIndex) throws 
SQLException {
+            String clusterUrl = clusterIndex == 0 ? getUrl1() : getUrl2();
+            Properties props = new Properties();
+            String url = String.format("jdbc:phoenix:%s", clusterUrl);
+            return DriverManager.getConnection(url, props);
+        }
+
+        /**
+         * Returns a Phoenix Connection to cluster 0
+         * @return a Phoenix Connection to the cluster
+         */
+        public Connection getCluster0Connection() throws SQLException {

Review Comment:
   Some of the classes were written in the hope of being generic enough to 
support an arbitrary number of clusters in the future.  If you feel strongly 
one way or the other, we can go with either approach.  



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to