gjacoby126 commented on code in PR #1430: URL: https://github.com/apache/phoenix/pull/1430#discussion_r877435588
########## phoenix-core/src/it/java/org/apache/phoenix/jdbc/HighAvailabilityTestingUtility.java: ##########
@@ -0,0 +1,571 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.jdbc;
+
+import org.apache.phoenix.thirdparty.com.google.common.collect.ImmutableList;
+import org.apache.commons.lang3.RandomUtils;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.GenericTestUtils;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
+import org.apache.hadoop.hbase.ipc.PhoenixRpcSchedulerFactory;
+import org.apache.hadoop.hbase.regionserver.RSRpcServices;
+import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
+import org.apache.phoenix.hbase.index.util.IndexManagementUtil;
+import org.apache.phoenix.jdbc.ClusterRoleRecord.ClusterRole;
+import org.apache.phoenix.jdbc.PhoenixHAAdminTool.PhoenixHAAdminHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_KEY;
+import static org.apache.hadoop.test.GenericTestUtils.waitFor;
+import static org.apache.phoenix.hbase.index.write.AbstractParallelWriterIndexCommitter.NUM_CONCURRENT_INDEX_WRITER_THREADS_CONF_KEY;
+import static org.apache.phoenix.jdbc.ClusterRoleRecordGeneratorTool.PHOENIX_HA_GROUP_STORE_PEER_ID_DEFAULT;
+import static org.apache.phoenix.jdbc.HighAvailabilityGroup.PHOENIX_HA_GROUP_ATTR;
+import static org.apache.phoenix.jdbc.HighAvailabilityGroup.PHOENIX_HA_ZK_CONNECTION_TIMEOUT_MS_KEY;
+import static org.apache.phoenix.jdbc.HighAvailabilityGroup.PHOENIX_HA_ZK_RETRY_BASE_SLEEP_MS_KEY;
+import static org.apache.phoenix.jdbc.HighAvailabilityGroup.PHOENIX_HA_ZK_RETRY_MAX_KEY;
+import static org.apache.phoenix.jdbc.HighAvailabilityGroup.PHOENIX_HA_ZK_RETRY_MAX_SLEEP_MS_KEY;
+import static org.apache.phoenix.jdbc.HighAvailabilityGroup.PHOENIX_HA_ZK_SESSION_TIMEOUT_MS_KEY;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Utility class for testing HBase failover.
+ */
+public class HighAvailabilityTestingUtility {
+    private static final Logger LOG = LoggerFactory.getLogger(HighAvailabilityTestingUtility.class);
+
+    /**
+     * Utility class for creating and maintaining HBase DR cluster pair.
+     *
+     * TODO: @Bharath check if we can use utility from upstream HBase for a pair of mini clusters.
+     */
+    public static class HBaseTestingUtilityPair implements Closeable {
+        private final HBaseTestingUtility hbaseCluster1 = new HBaseTestingUtility();
+        private final HBaseTestingUtility hbaseCluster2 = new HBaseTestingUtility();
+        /** The host:port:/hbase format of the JDBC string for HBase cluster 1. */
+        private String url1;
+        /** The host:port:/hbase format of the JDBC string for HBase cluster 2. */
+        private String url2;
+        private PhoenixHAAdminHelper haAdmin1;
+        private PhoenixHAAdminHelper haAdmin2;
+        private Admin admin1;
+        private Admin admin2;
+
+        public HBaseTestingUtilityPair() {
+            Configuration conf1 = hbaseCluster1.getConfiguration();
+            Configuration conf2 = hbaseCluster2.getConfiguration();
+            setUpDefaultHBaseConfig(conf1);
+            setUpDefaultHBaseConfig(conf2);
+        }
+
+        /**
+         * Instantiates and starts the two mini HBase DR cluster. Enable peering if configured.
+         *
+         * @throws Exception if fails to start either cluster
+         */
+        public void start() throws Exception {
+            hbaseCluster1.startMiniCluster();
+            hbaseCluster2.startMiniCluster();
+
+            url1 = String.format("localhost:%d:/hbase", hbaseCluster1.getZkCluster().getClientPort());
+            url2 = String.format("localhost:%d:/hbase", hbaseCluster2.getZkCluster().getClientPort());
+
+            haAdmin1 = new PhoenixHAAdminHelper(getUrl1(), hbaseCluster1.getConfiguration(), PhoenixHAAdminTool.HighAvailibilityCuratorProvider.INSTANCE);
+            haAdmin2 = new PhoenixHAAdminHelper(getUrl2(), hbaseCluster2.getConfiguration(), PhoenixHAAdminTool.HighAvailibilityCuratorProvider.INSTANCE);
+
+            admin1 = hbaseCluster1.getHBaseAdmin();
+            admin2 = hbaseCluster2.getHBaseAdmin();
+
+            // Enable replication between the two HBase clusters.
+            ReplicationPeerConfig replicationPeerConfig1 = new ReplicationPeerConfig();
+            replicationPeerConfig1.setClusterKey(hbaseCluster2.getClusterKey());
+            ReplicationPeerConfig replicationPeerConfig2 = new ReplicationPeerConfig();
+            replicationPeerConfig2.setClusterKey(hbaseCluster1.getClusterKey());
+
+            ReplicationAdmin replicationAdmin1 = new ReplicationAdmin(
+                    hbaseCluster1.getConfiguration());
+            replicationAdmin1.addPeer(PHOENIX_HA_GROUP_STORE_PEER_ID_DEFAULT, replicationPeerConfig1);
+            replicationAdmin1.close();
+
+            ReplicationAdmin replicationAdmin2 = new ReplicationAdmin(
+                    hbaseCluster2.getConfiguration());
+            replicationAdmin2.addPeer(PHOENIX_HA_GROUP_STORE_PEER_ID_DEFAULT, replicationPeerConfig2);
+            replicationAdmin2.close();
+
+            LOG.info("MiniHBase DR cluster pair is ready for testing. Cluster Urls [{},{}]",
+                    getUrl1(), getUrl2());
+            logClustersStates();
+        }
+
+        /** initialize two ZK clusters for cluster role znode. */
+        public void initClusterRole(String haGroupName, HighAvailabilityPolicy policy)
+                throws Exception {
+            ClusterRoleRecord record = new ClusterRoleRecord(
+                    haGroupName, policy,
+                    getUrl1(), ClusterRole.ACTIVE,
+                    getUrl2(), ClusterRole.STANDBY,
+                    1);
+            haAdmin1.createOrUpdateDataOnZookeeper(record);
+            haAdmin2.createOrUpdateDataOnZookeeper(record);
+        }
+
+        /**
+         * Set cluster roles for an HA group and wait the cluster role transition to happen.
+         *
+         * @param haGroup the HA group name
+         * @param role1 cluster role for the first cluster in the group
+         * @param role2 cluster role for the second cluster in the group
+         */
+        public void transitClusterRole(HighAvailabilityGroup haGroup, ClusterRole role1,
+                ClusterRole role2) throws Exception {
+            final ClusterRoleRecord newRoleRecord = new ClusterRoleRecord(
+                    haGroup.getGroupInfo().getName(), haGroup.getRoleRecord().getPolicy(),
+                    getUrl1(), role1,
+                    getUrl2(), role2,
+                    haGroup.getRoleRecord().getVersion() + 1); // always use a newer version
+            LOG.info("Transiting cluster role for HA group {} V{}->V{}, existing: {}, new: {}",
+                    haGroup.getGroupInfo().getName(), haGroup.getRoleRecord().getVersion(),
+                    newRoleRecord.getVersion(), haGroup.getRoleRecord(), newRoleRecord);
+            boolean successAtLeastOnce = false;
+            try {
+                haAdmin1.createOrUpdateDataOnZookeeper(newRoleRecord);
+                successAtLeastOnce = true;
+            } catch (IOException e) {
+                LOG.warn("Fail to update new record on {} because {}", getUrl1(), e.getMessage());
+            }
+            try {
+                haAdmin2.createOrUpdateDataOnZookeeper(newRoleRecord);
+                successAtLeastOnce = true;
+            } catch (IOException e) {
+                LOG.warn("Fail to update new record on {} because {}", getUrl2(), e.getMessage());
+            }
+
+            if (!successAtLeastOnce) {
+                throw new IOException("Failed to update the new role record on either cluster");
+            }
+            // wait for the cluster roles are populated into client side from ZK nodes.
+            waitFor(() -> newRoleRecord.equals(haGroup.getRoleRecord()), 1000, 120_000);

Review Comment: will a 120s timeout lead to slow failing tests if the mini ZK is unhealthy? What's the expected time to wait?
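For illustration, one way to keep a failing wait bounded — a minimal sketch, where the system property name, the 30s default, and the `waitForRoleTransition` helper are assumptions, not part of this PR:

```java
// Sketch only: bound the propagation wait with an overridable timeout instead
// of a hard-coded 120s, so an unhealthy mini ZK fails the test sooner.
// Property name and 30s default are assumed, not taken from the PR.
private static final int TRANSITION_TIMEOUT_MS =
        Integer.getInteger("phoenix.test.ha.transition.timeout.ms", 30_000);

// Hypothetical helper wrapping the wait the PR performs inline.
private static void waitForRoleTransition(HighAvailabilityGroup haGroup,
        ClusterRoleRecord newRoleRecord) throws Exception {
    // Poll every second, as the PR does, but give up sooner by default.
    waitFor(() -> newRoleRecord.equals(haGroup.getRoleRecord()), 1000, TRANSITION_TIMEOUT_MS);
}
```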
########## phoenix-core/src/it/java/org/apache/phoenix/jdbc/HighAvailabilityGroupTestIT.java: ##########
@@ -0,0 +1,310 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.jdbc;
+
+import static org.apache.phoenix.jdbc.HighAvailabilityGroup.HAGroupInfo;
+import static org.apache.phoenix.jdbc.HighAvailabilityGroup.PHOENIX_HA_GROUP_ATTR;
+import static org.apache.phoenix.jdbc.HighAvailabilityGroup.State.READY;
+import static org.apache.phoenix.jdbc.HighAvailabilityGroup.State.UNINITIALIZED;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.startsWith;
+import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.api.ExistsBuilder;
+import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.jdbc.ClusterRoleRecord.ClusterRole;
+import org.apache.phoenix.query.ConnectionQueryServices;
+import org.apache.phoenix.query.ConnectionQueryServicesImpl;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+import org.junit.rules.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Unit test for {@link HighAvailabilityGroup}.
+ *
+ * This does not require a mini HBase cluster. Instead, it uses mocked components.
+ *
+ * @see HighAvailabilityGroupIT
+ */
+@Category(NeedsOwnMiniClusterTest.class)
+public class HighAvailabilityGroupTestIT {
+
+    // TODO: This is not an IT but can't run in parallel with other UTs, refactor
+    // This test cannot be run in parallel since it registers/deregisters driver
+
+    private static final Logger LOG = LoggerFactory.getLogger(HighAvailabilityGroupTestIT.class);
+    private static final String ZK1 = "zk1-1,zk1-2:2181:/hbase";
+    private static final String ZK2 = "zk2-1,zk2-2:2181:/hbase";
+    private static final PhoenixEmbeddedDriver DRIVER = mock(PhoenixEmbeddedDriver.class);
+
+    /** The client properties to create a JDBC connection. */
+    private final Properties clientProperties = new Properties();
+    /** The mocked cluster role record of the HA group. */
+    private final ClusterRoleRecord record = mock(ClusterRoleRecord.class);
+    /** The HA group to test. This is spied but not mocked. */
+    private HighAvailabilityGroup haGroup;
+
+    @Rule
+    public final TestName testName = new TestName();
+    @Rule
+    public final Timeout globalTimeout= new Timeout(300, TimeUnit.SECONDS);
+
+    @BeforeClass
+    public static void setupBeforeClass() throws SQLException {
+        // Mock a connection
+        PhoenixConnection connection = mock(PhoenixConnection.class);
+        when(connection.getURL()).thenReturn(ZK1);
+
+        // Mock a CQS
+        ConnectionQueryServices cqs = mock(ConnectionQueryServicesImpl.class);
+        when(cqs.connect(anyString(), any(Properties.class))).thenReturn(connection);
+
+        // Register the mocked PhoenixEmbeddedDriver
+        when(DRIVER.acceptsURL(startsWith(PhoenixRuntime.JDBC_PROTOCOL))).thenReturn(true);
+        when(DRIVER.getConnectionQueryServices(anyString(), any(Properties.class))).thenReturn(cqs);
+        DriverManager.registerDriver(DRIVER);
+
+        // Unregister the PhoenixDriver so that all Phoenix JDBC requests will get mocked
+        DriverManager.deregisterDriver(PhoenixDriver.INSTANCE);
+    }
+
+    @Before
+    public void init() {
+        final String haGroupName = testName.getMethodName();
+        // By default the HA policy is FAILOVER
+        when(record.getPolicy()).thenReturn(HighAvailabilityPolicy.FAILOVER);
+        when(record.getHaGroupName()).thenReturn(haGroupName);
+        // Make ZK1 ACTIVE
+        when(record.getActiveUrl()).thenReturn(Optional.of(ZK1));
+        when(record.getRole(eq(ZK1))).thenReturn(ClusterRole.ACTIVE);
+
+        clientProperties.setProperty(PHOENIX_HA_GROUP_ATTR, haGroupName);
+
+        HAGroupInfo haGroupInfo = new HAGroupInfo(haGroupName, ZK1, ZK2);
+        haGroup = spy(new HighAvailabilityGroup(haGroupInfo, clientProperties, record, READY));
+    }
+
+    /**
+     * Test that the HA group can be connected to get a JDBC connection.
+     *
+     * The JDBC connection is from the shim layer, which wraps Phoenix Connections.
+     */
+    @Test
+    public void testConnect() throws SQLException {
+        final Connection conn = haGroup.connect(clientProperties);
+        assertTrue(conn instanceof FailoverPhoenixConnection);
+        FailoverPhoenixConnection failoverConnection = conn.unwrap(FailoverPhoenixConnection.class);
+        assertNotNull(failoverConnection);
+        // Verify that the failover should have connected to ACTIVE cluster once
+        verify(haGroup, times(1)).connectActive(any(Properties.class));
+        verify(haGroup, times(1)).connectToOneCluster(anyString(), eq(clientProperties));
+        verify(DRIVER, atLeastOnce()).getConnectionQueryServices(anyString(), eq(clientProperties));
+
+        when(record.getPolicy()).thenReturn(HighAvailabilityPolicy.PARALLEL);
+        // get a new connection from this HA group
+        final Connection conn2 = haGroup.connect(clientProperties);
+        assertTrue(conn2 instanceof ParallelPhoenixConnection);
+    }
+
+    /**
+     * Test that the HA group can not be connected when it is not ready.
+     */
+    @Test
+    public void testConnectShouldFailWhenNotReady() throws SQLException {
+        final HAGroupInfo info = haGroup.getGroupInfo();
+        haGroup = spy(new HighAvailabilityGroup(info, clientProperties, record, UNINITIALIZED));
+        try {
+            haGroup.connect(clientProperties);
+            fail("Should have failed since HA group is not READY!");
+        } catch (SQLException e) {
+            LOG.info("Got expected exception", e);
+            assertEquals(SQLExceptionCode.CANNOT_ESTABLISH_CONNECTION.getErrorCode(),
+                    e.getErrorCode());
+            verify(DRIVER, never()).getConnectionQueryServices(anyString(), eq(clientProperties));
+        }
+    }
+
+    /**
+     * Test that it can connect to a cluster in the HA group to get a PhoenixConnection.
+     */
+    @Test
+    public void testConnectToOneCluster() throws SQLException {
+        // test with JDBC string
+        final String jdbcString = String.format("jdbc:phoenix:%s", ZK1);
+        haGroup.connectToOneCluster(jdbcString, clientProperties);
+        verify(DRIVER, times(1)).getConnectionQueryServices(anyString(), eq(clientProperties));
+
+        // test with only ZK string
+        haGroup.connectToOneCluster(ZK1, clientProperties);
+        verify(DRIVER, times(2)).getConnectionQueryServices(anyString(), eq(clientProperties));
+    }
+
+    /**
+     * Test that when cluster role is not connectable (e.g OFFLINE or UNKNOWN), can not connect it.
+     */
+    @Test
+    public void testConnectToOneClusterShouldFailIfNotConnectable() throws SQLException {
+        when(record.getRole(eq(ZK1))).thenReturn(ClusterRole.UNKNOWN);
+        // test with JDBC string and UNKNOWN cluster role
+        final String jdbcString = String.format("jdbc:phoenix:%s", ZK1);
+        try {
+            haGroup.connectToOneCluster(jdbcString, clientProperties);
+            fail("Should have failed because cluster is in UNKNOWN role");
+        } catch (SQLException e) { // expected exception
+            assertEquals(SQLExceptionCode.HA_CLUSTER_CAN_NOT_CONNECT.getErrorCode(),
+                    e.getErrorCode());
+        }
+        verify(DRIVER, never()).getConnectionQueryServices(anyString(), eq(clientProperties));
+
+        // test with only ZK string and OFFLINE cluster role
+        when(record.getRole(eq(ZK1))).thenReturn(ClusterRole.OFFLINE);
+        try {
+            haGroup.connectToOneCluster(jdbcString, clientProperties);
+            fail("Should have failed because cluster is in OFFLINE role");
+        } catch (SQLException e) { // expected exception
+            assertEquals(SQLExceptionCode.HA_CLUSTER_CAN_NOT_CONNECT.getErrorCode(),
+                    e.getErrorCode());
+        }
+        verify(DRIVER, never()).getConnectionQueryServices(anyString(), eq(clientProperties));
+    }
+
+    /**
+     * Test {@link HighAvailabilityGroup#connectToOneCluster} with invalid connection string.
+     */
+    @Test (expected = IllegalArgumentException.class)
+    public void testConnectToOneClusterShouldFailWithNonHAJdbcString() throws SQLException {
+        final String jdbcString = "jdbc:phoenix:dummyhost";
+        haGroup.connectToOneCluster(jdbcString, clientProperties);
+        verify(DRIVER, never()).getConnectionQueryServices(anyString(), eq(clientProperties));
+    }
+
+    /**
+     * Test {@link HighAvailabilityGroup#connectToOneCluster} with a connection string that doesn't match.
+     */
+    @Test
+    public void testConnectToOneClusterShouldNotFailWithDifferentHostOrderJdbcString() throws SQLException {
+        // test with JDBC string
+        final String hosts = "zk1-2,zk1-1:2181:/hbase";
+        final String jdbcString = String.format("jdbc:phoenix:%s", hosts);
+        haGroup.connectToOneCluster(jdbcString, clientProperties);
+        verify(DRIVER, times(1)).getConnectionQueryServices(eq(String.format("jdbc:phoenix:%s",ZK1)), eq(clientProperties));
+    }
+
+    /**
+     * Test {@link HighAvailabilityGroup#get} should fail with empty High Availability group name.
+     */
+    @Test
+    public void testGetShouldFailWithoutHAGroupName() throws SQLException {
+        String jdbcString = String.format("jdbc:phoenix:[%s|%s]", ZK1, ZK2);
+        Properties properties = new Properties(); // without HA group name
+        try {
+            HighAvailabilityGroup.get(jdbcString, properties);
+            fail("Should have fail because the HA group name is not set");
+        } catch (SQLException e) {
+            LOG.info("Got expected exception when HA group name is empty", e);
+            assertEquals(SQLExceptionCode.HA_INVALID_PROPERTIES.getErrorCode(), e.getErrorCode());
+        } // all other exceptions should fail this test
+        verify(DRIVER, never()).getConnectionQueryServices(anyString(), eq(properties));
+    }
+
+    /**
+     * Test that the HA group knows a phoenix connection is connected to ACTIVE cluster.
+     */
+    @Test
+    public void testIsConnectionActive() throws SQLException {
+        assertFalse(haGroup.isActive(null));
+        PhoenixConnection connection = haGroup.connectToOneCluster(ZK1, clientProperties);
+        assertTrue(haGroup.isActive(connection));
+    }
+
+    /**
+     * Test that when missing cluster role records, the HA connection request will fall back to the
+     * single cluster connection.
+     */
+    @SuppressWarnings("UnstableApiUsage")
+    @Test
+    public void testNegativeCacheWhenMissingClusterRoleRecords() throws Exception {
+        String haGroupName2 = testName.getMethodName() + RandomStringUtils.randomAlphabetic(3);
+        clientProperties.setProperty(PHOENIX_HA_GROUP_ATTR, haGroupName2);
+        HAGroupInfo haGroupInfo2 = new HAGroupInfo(haGroupName2, ZK1, ZK2);
+        HighAvailabilityGroup haGroup2 = spy(new HighAvailabilityGroup(haGroupInfo2, clientProperties, null, READY));
+        doThrow(new RuntimeException("Mocked Exception when init HA group 2"))
+                .when(haGroup2).init();
+        HighAvailabilityGroup.GROUPS.put(haGroupInfo2, haGroup2);
+
+        String jdbcString = String.format("jdbc:phoenix:[%s|%s]", ZK1, ZK2);
+        // Getting HA group will get exception due to (mocked) ZK connection error
+        try {
+            HighAvailabilityGroup.get(jdbcString, clientProperties);
+            fail("Should have fail because the HA group fails to init and ZK is not connectable");
+        } catch (Exception e) {

Review Comment: Shouldn't this catch the actual exception you're expecting?
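A sketch of the narrower catch this comment suggests, assuming the init failure surfaces as a SQLException; the asserted error code is an assumption borrowed from the other negative tests in this class:

```java
// Sketch: catch the expected exception type rather than the blanket Exception,
// so an unrelated failure cannot accidentally pass the test. The error code
// asserted here (CANNOT_ESTABLISH_CONNECTION) mirrors testConnectShouldFailWhenNotReady
// and is an assumption, not confirmed by the PR.
try {
    HighAvailabilityGroup.get(jdbcString, clientProperties);
    fail("Should have failed because the HA group fails to init");
} catch (SQLException e) {
    assertEquals(SQLExceptionCode.CANNOT_ESTABLISH_CONNECTION.getErrorCode(),
            e.getErrorCode());
}
```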
########## phoenix-core/src/it/java/org/apache/phoenix/jdbc/HighAvailabilityTestingUtilityIT.java: ##########
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.jdbc;
+
+import static org.apache.phoenix.jdbc.HighAvailabilityGroup.PHOENIX_HA_GROUP_ATTR;
+import static org.apache.phoenix.jdbc.HighAvailabilityTestingUtility.HBaseTestingUtilityPair;
+import static org.apache.phoenix.jdbc.HighAvailabilityTestingUtility.HBaseTestingUtilityPair.doTestWhenOneHBaseDown;
+import static org.apache.phoenix.jdbc.HighAvailabilityTestingUtility.HBaseTestingUtilityPair.doTestWhenOneZKDown;
+import static org.apache.phoenix.jdbc.HighAvailabilityTestingUtility.doTestBasicOperationsWithConnection;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.util.Properties;
+
+import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
+import org.apache.phoenix.query.ConnectionQueryServices;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Test failover basics for {@link HighAvailabilityTestingUtility}.
+ */
+@Category(NeedsOwnMiniClusterTest.class)
+public class HighAvailabilityTestingUtilityIT {
+    private static final Logger LOG = LoggerFactory.getLogger(
+            HighAvailabilityTestingUtilityIT.class);
+    private static final HBaseTestingUtilityPair CLUSTERS = new HBaseTestingUtilityPair();
+
+    @Rule
+    public TestName testName = new TestName();
+
+    /** Table name per test case. */
+    private String tableName;
+
+    @BeforeClass
+    public static void setUpBeforeClass() throws Exception {
+        CLUSTERS.start();
+        DriverManager.registerDriver(PhoenixDriver.INSTANCE);
+    }
+
+    @AfterClass
+    public static void tearDownAfterClass() throws Exception {
+        DriverManager.deregisterDriver(PhoenixDriver.INSTANCE);
+        CLUSTERS.close();
+    }
+
+    @Before
+    public void setup() throws Exception {
+        String haGroupName = testName.getMethodName();
+
+        // Make first cluster ACTIVE
+        CLUSTERS.initClusterRole(haGroupName,HighAvailabilityPolicy.FAILOVER);
+
+        tableName = testName.getMethodName();
+        CLUSTERS.createTableOnClusterPair(tableName);
+    }
+
+    /**
+     * Test Phoenix connection creation and basic operations with HBase cluster(s) unavail.
+     */
+    @Test
+    public void testClusterUnavailableNormalConnection() throws Exception {
+        doTestWhenOneHBaseDown(CLUSTERS.getHBaseCluster2(), () -> {
+            CLUSTERS.logClustersStates();
+
+            //current guess is this is not working due to some

Review Comment: is this comment correct or does it need to be altered/removed?
########## phoenix-core/src/it/java/org/apache/phoenix/jdbc/HighAvailabilityTestingUtility.java: ##########
@@ -0,0 +1,571 @@
+            // wait for the cluster roles are populated into client side from ZK nodes.
+            waitFor(() -> newRoleRecord.equals(haGroup.getRoleRecord()), 1000, 120_000);
+
+            LOG.info("Now the HA group {} should have detected and updated V{} cluster role record",
+                    haGroup, newRoleRecord.getVersion());
+        }
+
+        /**
+         * Log the state of both clusters
+         */
+        public void logClustersStates() {
+            String cluster1Status, cluster2Status;
+            try {
+                cluster1Status = admin1.getClusterStatus().toString();
+            } catch (IOException e) {
+                cluster1Status = "Unable to get cluster status.";
+            }
+            try {
+                cluster2Status = admin2.getClusterStatus().toString();
+            } catch (IOException e){
+                cluster2Status = "Unable to get cluster status.";
+            }
+            LOG.info("Cluster Status [\n{},\n{}\n]", cluster1Status, cluster2Status);
+        }
+
+        /**
+         * @return testing utility for cluster 1
+         */
+        public HBaseTestingUtility getHBaseCluster1() {
+            return hbaseCluster1;
+        }
+
+        /**
+         * @return testing utility for cluster 2
+         */
+        public HBaseTestingUtility getHBaseCluster2() {
+            return hbaseCluster2;
+        }
+
+        /**
+         * Returns a Phoenix Connection to a cluster
+         * @param clusterIndex 0 based
+         * @return a Phoenix Connection to the indexed cluster
+         */
+        public Connection getClusterConnection(int clusterIndex) throws SQLException {
+            String clusterUrl = clusterIndex == 0 ? getUrl1() : getUrl2();
+            Properties props = new Properties();
+            String url = String.format("jdbc:phoenix:%s", clusterUrl);
+            return DriverManager.getConnection(url, props);
+        }
+
+        /**
+         * Returns a Phoenix Connection to cluster 0
+         * @return a Phoenix Connection to the cluster
+         */
+        public Connection getCluster0Connection() throws SQLException {
+            return getClusterConnection(0);
+        }
+
+        /**
+         * Returns a Phoenix Connection to cluster 1
+         * @return a Phoenix Connection to the cluster
+         */
+        public Connection getCluster1Connection() throws SQLException {
+            return getClusterConnection(1);
+        }
+
+        //TODO: Replace with a real check for replication complete
+        /**
+         * Checks/waits with timeout if replication is done
+         * @return true if replication is done else false
+         */
+        public boolean checkReplicationComplete() {
+            try {
+                Thread.sleep(5000);
+            } catch (InterruptedException e) {
+                throw new RuntimeException(e);
+            }
+            return true;
+        }
+
+        /** A Testable interface that can throw checked exception. */
+        @FunctionalInterface
+        public interface Testable {
+            void test() throws Exception;
+        }
+
+        /**
+         * Shutdown mini HBase cluster, do the test, and restart the HBase cluster.
+         *
+         * Please note the ZK cluster and DFS cluster are untouched.
+         *
+         * @param cluster the HBase cluster facility whose HBase cluster to restart
+         * @param testable testing logic that is runnable
+         * @throws Exception if fails to stop, test or restart HBase cluster
+         */
+        public static void doTestWhenOneHBaseDown(HBaseTestingUtility cluster, Testable testable)
+                throws Exception {
+            final int zkClientPort = cluster.getZkCluster().getClientPort();
+            try {
+                LOG.info("Shutting down HBase cluster using ZK localhost:{}", zkClientPort);
+                cluster.shutdownMiniHBaseCluster();

Review Comment: starting and stopping clusters can be slow and these doTestWhen* methods are used quite a bit above -- what's the perf for the tests like?
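One way to put numbers on that concern — a sketch that times each down/up cycle; the timing log line is instrumentation only and not part of the PR:

```java
// Sketch: measure how long each shutdown/test/restart cycle takes so that
// slow doTestWhen* usages show up directly in the test logs.
public static void doTestWhenOneHBaseDown(HBaseTestingUtility cluster, Testable testable)
        throws Exception {
    final long start = System.currentTimeMillis();
    cluster.shutdownMiniHBaseCluster();
    try {
        testable.test();
    } finally {
        // Same restart call the PR uses elsewhere in this class.
        cluster.startMiniHBaseCluster(1, 1);
        LOG.info("HBase down/up cycle took {} ms", System.currentTimeMillis() - start);
    }
}
```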
########## phoenix-core/src/it/java/org/apache/phoenix/jdbc/HighAvailabilityTestingUtilityIT.java: ##########
@@ -0,0 +1,148 @@
+    @Test
+    public void testClusterUnavailableNormalConnection() throws Exception {
+        doTestWhenOneHBaseDown(CLUSTERS.getHBaseCluster2(), () -> {
+            CLUSTERS.logClustersStates();
+
+            //current guess is this is not working due to some
+            //saw that we dont' even hit the RegionServer on this cluster
+            try (Connection conn = CLUSTERS.getCluster0Connection()) {
+                doTestBasicOperationsWithConnection(conn, tableName, null);
+            }
+        });
+        doTestWhenOneHBaseDown(CLUSTERS.getHBaseCluster1(), () -> {
+            CLUSTERS.logClustersStates();
+
+            //current guess is this is not working due to some

Review Comment: ditto
########## phoenix-core/src/it/java/org/apache/phoenix/jdbc/HighAvailabilityTestingUtility.java: ##########
@@ -0,0 +1,571 @@
+        /**
+         * Returns a Phoenix Connection to a cluster
+         * @param clusterIndex 0 based
+         * @return a Phoenix Connection to the indexed cluster
+         */
+        public Connection getClusterConnection(int clusterIndex) throws SQLException {
+            String clusterUrl = clusterIndex == 0 ? getUrl1() : getUrl2();
+            Properties props = new Properties();
+            String url = String.format("jdbc:phoenix:%s", clusterUrl);
+            return DriverManager.getConnection(url, props);
+        }
+
+        /**
+         * Returns a Phoenix Connection to cluster 0
+         * @return a Phoenix Connection to the cluster
+         */
+        public Connection getCluster0Connection() throws SQLException {

Review Comment: why 0-based count when in other APIs in this class (such as getUrl) we use 1 and 2 to represent the two clusters?
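For consistency, a sketch of the 1-based shape this comment implies — the method names are suggestions, not the PR's API:

```java
// Sketch: match the 1/2 numbering already used by getUrl1()/getUrl2().
public Connection getCluster1Connection() throws SQLException {
    return getClusterConnection(1);
}

public Connection getCluster2Connection() throws SQLException {
    return getClusterConnection(2);
}

// getClusterConnection adjusted to take a 1-based index.
public Connection getClusterConnection(int clusterIndex) throws SQLException {
    String clusterUrl = clusterIndex == 1 ? getUrl1() : getUrl2();
    return DriverManager.getConnection(String.format("jdbc:phoenix:%s", clusterUrl),
            new Properties());
}
```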
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.jdbc; + +import org.apache.phoenix.thirdparty.com.google.common.collect.ImmutableList; +import org.apache.commons.lang3.RandomUtils; +import org.apache.curator.framework.CuratorFramework; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.GenericTestUtils; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.replication.ReplicationAdmin; +import org.apache.hadoop.hbase.ipc.PhoenixRpcSchedulerFactory; +import org.apache.hadoop.hbase.regionserver.RSRpcServices; +import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; +import org.apache.phoenix.hbase.index.util.IndexManagementUtil; +import org.apache.phoenix.jdbc.ClusterRoleRecord.ClusterRole; +import org.apache.phoenix.jdbc.PhoenixHAAdminTool.PhoenixHAAdminHelper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Closeable; +import java.io.IOException; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.Arrays; +import java.util.List; +import java.util.Optional; +import java.util.Properties; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicReference; + +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_KEY; +import static org.apache.hadoop.test.GenericTestUtils.waitFor; +import static org.apache.phoenix.hbase.index.write.AbstractParallelWriterIndexCommitter.NUM_CONCURRENT_INDEX_WRITER_THREADS_CONF_KEY; +import static org.apache.phoenix.jdbc.ClusterRoleRecordGeneratorTool.PHOENIX_HA_GROUP_STORE_PEER_ID_DEFAULT; +import static org.apache.phoenix.jdbc.HighAvailabilityGroup.PHOENIX_HA_GROUP_ATTR; +import static org.apache.phoenix.jdbc.HighAvailabilityGroup.PHOENIX_HA_ZK_CONNECTION_TIMEOUT_MS_KEY; +import static org.apache.phoenix.jdbc.HighAvailabilityGroup.PHOENIX_HA_ZK_RETRY_BASE_SLEEP_MS_KEY; +import static org.apache.phoenix.jdbc.HighAvailabilityGroup.PHOENIX_HA_ZK_RETRY_MAX_KEY; +import static org.apache.phoenix.jdbc.HighAvailabilityGroup.PHOENIX_HA_ZK_RETRY_MAX_SLEEP_MS_KEY; +import static org.apache.phoenix.jdbc.HighAvailabilityGroup.PHOENIX_HA_ZK_SESSION_TIMEOUT_MS_KEY; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +/** + * Utility class for testing HBase failover. + */ +public class HighAvailabilityTestingUtility { + private static final Logger LOG = LoggerFactory.getLogger(HighAvailabilityTestingUtility.class); + + /** + * Utility class for creating and maintaining HBase DR cluster pair. + * + * TODO: @Bharath check if we can use utility from upstream HBase for a pair of mini clusters. 
+ */ + public static class HBaseTestingUtilityPair implements Closeable { + private final HBaseTestingUtility hbaseCluster1 = new HBaseTestingUtility(); + private final HBaseTestingUtility hbaseCluster2 = new HBaseTestingUtility(); + /** The host:port:/hbase format of the JDBC string for HBase cluster 1. */ + private String url1; + /** The host:port:/hbase format of the JDBC string for HBase cluster 2. */ + private String url2; + private PhoenixHAAdminHelper haAdmin1; + private PhoenixHAAdminHelper haAdmin2; + private Admin admin1; + private Admin admin2; + + public HBaseTestingUtilityPair() { + Configuration conf1 = hbaseCluster1.getConfiguration(); + Configuration conf2 = hbaseCluster2.getConfiguration(); + setUpDefaultHBaseConfig(conf1); + setUpDefaultHBaseConfig(conf2); + } + + /** + * Instantiates and starts the two mini HBase DR cluster. Enable peering if configured. + * + * @throws Exception if fails to start either cluster + */ + public void start() throws Exception { + hbaseCluster1.startMiniCluster(); + hbaseCluster2.startMiniCluster(); + + url1 = String.format("localhost:%d:/hbase", hbaseCluster1.getZkCluster().getClientPort()); + url2 = String.format("localhost:%d:/hbase", hbaseCluster2.getZkCluster().getClientPort()); + + haAdmin1 = new PhoenixHAAdminHelper(getUrl1(), hbaseCluster1.getConfiguration(), PhoenixHAAdminTool.HighAvailibilityCuratorProvider.INSTANCE); + haAdmin2 = new PhoenixHAAdminHelper(getUrl2(), hbaseCluster2.getConfiguration(), PhoenixHAAdminTool.HighAvailibilityCuratorProvider.INSTANCE); + + admin1 = hbaseCluster1.getHBaseAdmin(); + admin2 = hbaseCluster2.getHBaseAdmin(); + + // Enable replication between the two HBase clusters. + ReplicationPeerConfig replicationPeerConfig1 = new ReplicationPeerConfig(); + replicationPeerConfig1.setClusterKey(hbaseCluster2.getClusterKey()); + ReplicationPeerConfig replicationPeerConfig2 = new ReplicationPeerConfig(); + replicationPeerConfig2.setClusterKey(hbaseCluster1.getClusterKey()); + + ReplicationAdmin replicationAdmin1 = new ReplicationAdmin( + hbaseCluster1.getConfiguration()); + replicationAdmin1.addPeer(PHOENIX_HA_GROUP_STORE_PEER_ID_DEFAULT, replicationPeerConfig1); + replicationAdmin1.close(); + + ReplicationAdmin replicationAdmin2 = new ReplicationAdmin( + hbaseCluster2.getConfiguration()); + replicationAdmin2.addPeer(PHOENIX_HA_GROUP_STORE_PEER_ID_DEFAULT, replicationPeerConfig2); + replicationAdmin2.close(); + + LOG.info("MiniHBase DR cluster pair is ready for testing. Cluster Urls [{},{}]", + getUrl1(), getUrl2()); + logClustersStates(); + } + + /** initialize two ZK clusters for cluster role znode. */ + public void initClusterRole(String haGroupName, HighAvailabilityPolicy policy) + throws Exception { + ClusterRoleRecord record = new ClusterRoleRecord( + haGroupName, policy, + getUrl1(), ClusterRole.ACTIVE, + getUrl2(), ClusterRole.STANDBY, + 1); + haAdmin1.createOrUpdateDataOnZookeeper(record); + haAdmin2.createOrUpdateDataOnZookeeper(record); + } + + /** + * Set cluster roles for an HA group and wait the cluster role transition to happen. 
+         *
+         * @param haGroup the HA group
+         * @param role1 cluster role for the first cluster in the group
+         * @param role2 cluster role for the second cluster in the group
+         */
+        public void transitClusterRole(HighAvailabilityGroup haGroup, ClusterRole role1,
+                ClusterRole role2) throws Exception {
+            final ClusterRoleRecord newRoleRecord = new ClusterRoleRecord(
+                    haGroup.getGroupInfo().getName(), haGroup.getRoleRecord().getPolicy(),
+                    getUrl1(), role1,
+                    getUrl2(), role2,
+                    haGroup.getRoleRecord().getVersion() + 1); // always use a newer version
+            LOG.info("Transitioning cluster role for HA group {} V{}->V{}, existing: {}, new: {}",
+                    haGroup.getGroupInfo().getName(), haGroup.getRoleRecord().getVersion(),
+                    newRoleRecord.getVersion(), haGroup.getRoleRecord(), newRoleRecord);
+            boolean successAtLeastOnce = false;
+            try {
+                haAdmin1.createOrUpdateDataOnZookeeper(newRoleRecord);
+                successAtLeastOnce = true;
+            } catch (IOException e) {
+                LOG.warn("Failed to update new record on {} because {}", getUrl1(), e.getMessage());
+            }
+            try {
+                haAdmin2.createOrUpdateDataOnZookeeper(newRoleRecord);
+                successAtLeastOnce = true;
+            } catch (IOException e) {
+                LOG.warn("Failed to update new record on {} because {}", getUrl2(), e.getMessage());
+            }
+
+            if (!successAtLeastOnce) {
+                throw new IOException("Failed to update the new role record on either cluster");
+            }
+            // Wait until the new cluster roles are propagated from the ZK nodes to the client side.
+            waitFor(() -> newRoleRecord.equals(haGroup.getRoleRecord()), 1000, 120_000);
+
+            LOG.info("Now the HA group {} should have detected and updated the V{} cluster role record",
+                    haGroup, newRoleRecord.getVersion());
+        }
+
+        /**
+         * Logs the state of both clusters.
+         */
+        public void logClustersStates() {
+            String cluster1Status, cluster2Status;
+            try {
+                cluster1Status = admin1.getClusterStatus().toString();
+            } catch (IOException e) {
+                cluster1Status = "Unable to get cluster status.";
+            }
+            try {
+                cluster2Status = admin2.getClusterStatus().toString();
+            } catch (IOException e) {
+                cluster2Status = "Unable to get cluster status.";
+            }
+            LOG.info("Cluster Status [\n{},\n{}\n]", cluster1Status, cluster2Status);
+        }
+
+        /**
+         * @return testing utility for cluster 1
+         */
+        public HBaseTestingUtility getHBaseCluster1() {
+            return hbaseCluster1;
+        }
+
+        /**
+         * @return testing utility for cluster 2
+         */
+        public HBaseTestingUtility getHBaseCluster2() {
+            return hbaseCluster2;
+        }
+
+        /**
+         * Returns a Phoenix Connection to a cluster.
+         * @param clusterIndex 0-based cluster index
+         * @return a Phoenix Connection to the indexed cluster
+         */
+        public Connection getClusterConnection(int clusterIndex) throws SQLException {
+            String clusterUrl = clusterIndex == 0 ?
getUrl1() : getUrl2();
+            Properties props = new Properties();
+            String url = String.format("jdbc:phoenix:%s", clusterUrl);
+            return DriverManager.getConnection(url, props);
+        }
+
+        /**
+         * Returns a Phoenix Connection to cluster 0.
+         * @return a Phoenix Connection to the cluster
+         */
+        public Connection getCluster0Connection() throws SQLException {
+            return getClusterConnection(0);
+        }
+
+        /**
+         * Returns a Phoenix Connection to cluster 1.
+         * @return a Phoenix Connection to the cluster
+         */
+        public Connection getCluster1Connection() throws SQLException {
+            return getClusterConnection(1);
+        }
+
+        // TODO: Replace with a real check for replication complete
+        /**
+         * Waits a fixed interval intended to give replication time to complete.
+         * @return true unconditionally once the wait has elapsed
+         */
+        public boolean checkReplicationComplete() {
+            try {
+                Thread.sleep(5000);
+            } catch (InterruptedException e) {
+                throw new RuntimeException(e);
+            }
+            return true;
+        }
+
+        /** A Testable interface that can throw a checked exception. */
+        @FunctionalInterface
+        public interface Testable {
+            void test() throws Exception;
+        }
+
+        /**
+         * Shuts down the mini HBase cluster, runs the test, and restarts the HBase cluster.
+         *
+         * Please note the ZK cluster and DFS cluster are untouched.
+         *
+         * @param cluster the HBase cluster facility whose HBase cluster to restart
+         * @param testable testing logic that is runnable
+         * @throws Exception if it fails to stop, test, or restart the HBase cluster
+         */
+        public static void doTestWhenOneHBaseDown(HBaseTestingUtility cluster, Testable testable)
+                throws Exception {
+            final int zkClientPort = cluster.getZkCluster().getClientPort();
+            try {
+                LOG.info("Shutting down HBase cluster using ZK localhost:{}", zkClientPort);
+                cluster.shutdownMiniHBaseCluster();
+                LOG.info("Start testing when HBase is down using ZK localhost:{}", zkClientPort);
+                testable.test();
+                LOG.info("Test succeeded when HBase is down using ZK localhost:{}", zkClientPort);
+            } finally {
+                LOG.info("Finished testing when HBase is down using ZK localhost:{}", zkClientPort);
+                cluster.startMiniHBaseCluster(1, 1);
+                LOG.info("Restarted HBase cluster using ZK localhost:{}", zkClientPort);
+            }
+        }
+
+        /**
+         * Shuts down the mini ZK and HBase clusters, runs the test, and restarts both HBase and ZK clusters.
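+         * Note that the DFS cluster is untouched.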
+         *
+         * @param cluster the HBase cluster facility that has mini ZK, DFS and HBase clusters
+         * @param testable testing logic that is runnable
+         * @throws Exception if it fails to stop, test, or restart
+         */
+        public static void doTestWhenOneZKDown(HBaseTestingUtility cluster, Testable testable)
+                throws Exception {
+            final int zkClientPort = cluster.getZkCluster().getClientPort();
+            try {
+                LOG.info("Shutting down HBase cluster using ZK localhost:{}", zkClientPort);
+                cluster.shutdownMiniHBaseCluster();
+                LOG.info("Shutting down ZK cluster at localhost:{}", zkClientPort);
+                cluster.shutdownMiniZKCluster();
+                LOG.info("Start testing when ZK & HBase are down at localhost:{}", zkClientPort);
+                testable.test();
+                LOG.info("Test succeeded when ZK & HBase are down at localhost:{}", zkClientPort);
+            } finally {
+                LOG.info("Finished testing when ZK & HBase are down at localhost:{}", zkClientPort);
+                cluster.startMiniZKCluster(1, zkClientPort);
+                LOG.info("Restarted ZK cluster at localhost:{}", zkClientPort);
+                cluster.startMiniHBaseCluster(1, 1);
+                LOG.info("Restarted HBase cluster using ZK localhost:{}", zkClientPort);
+            }
+        }
+
+        /**
+         * @return the JDBC connection URL for this pair of HBase clusters in the HA format
+         */
+        public String getJdbcUrl() {
+            return String.format("jdbc:phoenix:[%s|%s]", url1, url2);
+        }
+
+        public String getUrl1() {
+            return url1;
+        }
+
+        public String getUrl2() {
+            return url2;
+        }
+
+        /**
+         * @return a ZK client created by the Curator framework for cluster 1
+         */
+        public CuratorFramework createCurator1() throws IOException {
+            Properties properties = new Properties();
+            getHBaseCluster1().getConfiguration()
+                    .iterator()
+                    .forEachRemaining(k -> properties.setProperty(k.getKey(), k.getValue()));
+            return HighAvailabilityGroup.getCurator(getUrl1(), properties);
+        }
+
+        /**
+         * @return a ZK client created by the Curator framework for cluster 2
+         */
+        public CuratorFramework createCurator2() throws IOException {
+            Properties properties = new Properties();
+            getHBaseCluster2().getConfiguration()
+                    .iterator()
+                    .forEachRemaining(k -> properties.setProperty(k.getKey(), k.getValue()));
+            return HighAvailabilityGroup.getCurator(getUrl2(), properties);
+        }
+
+        /**
+         * Creates a table on both clusters and enables replication.
+         *
+         * @param tableName the table name
+         * @throws SQLException if an error occurs
+         */
+        public void createTableOnClusterPair(String tableName) throws SQLException {
+            createTableOnClusterPair(tableName, true);
+        }
+
+        /**
+         * Creates a table on both clusters.
+         *
+         * If the replication scope is true, enables replication for this table.
+         *
+         * @param tableName the table name
+         * @param replicationScope the table replication scope (true=1, false=0)
+         * @throws SQLException if an error occurs
+         */
+        public void createTableOnClusterPair(String tableName, boolean replicationScope)

Review Comment:
   Do these methods that hard-code a particular schema need to be in the testing utility, as opposed to being part of the tests (or an associated util) that use them? It seems like it would be cleaner to have a method that takes in DDL SQL and applies it to both clusters; that way, future tests can use it with whatever DDL they need.
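   To make the suggestion concrete, here is a minimal sketch of such a generic helper, assuming it lives on HBaseTestingUtilityPair and reuses the existing getClusterConnection(int) method; the method name applyDdlOnClusterPair is hypothetical and not part of this PR:

       // Hypothetical sketch, not part of this PR: applies caller-supplied DDL to
       // both clusters so that each test can create whatever schema it needs.
       public void applyDdlOnClusterPair(String ddl) throws SQLException {
           for (int clusterIndex = 0; clusterIndex < 2; clusterIndex++) {
               try (Connection conn = getClusterConnection(clusterIndex);
                       Statement stmt = conn.createStatement()) {
                   stmt.execute(ddl);
                   conn.commit();
               }
           }
       }

   A test could then call it with whatever DDL it needs, e.g. pair.applyDdlOnClusterPair("CREATE TABLE " + tableName + " (id BIGINT PRIMARY KEY, v VARCHAR) REPLICATION_SCOPE=1"), rather than relying on a schema hard-coded in the utility.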
