gjacoby126 commented on code in PR #1430: URL: https://github.com/apache/phoenix/pull/1430#discussion_r875277029
########## phoenix-core/src/it/java/org/apache/phoenix/jdbc/ClusterRoleRecordGeneratorToolIT.java: ########## @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.jdbc; + +import static org.apache.phoenix.jdbc.ClusterRoleRecordGeneratorTool.PHOENIX_HA_GENERATOR_FILE_ATTR; +import static org.apache.phoenix.jdbc.ClusterRoleRecordGeneratorTool.PHOENIX_HA_GROUPS_ATTR; +import static org.apache.phoenix.jdbc.ClusterRoleRecordGeneratorTool.PHOENIX_HA_GROUP_POLICY_ATTR_FORMAT; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import java.io.File; +import java.util.List; + +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.util.ToolRunner; +import org.apache.phoenix.jdbc.ClusterRoleRecord.ClusterRole; +import org.apache.phoenix.jdbc.HighAvailabilityTestingUtility.HBaseTestingUtilityPair; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestName; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Integration test for {@link 
ClusterRoleRecordGeneratorTool}. + * + * @see ClusterRoleRecordGeneratorToolTest + */ +public class ClusterRoleRecordGeneratorToolIT { + private static final Logger LOG = LoggerFactory.getLogger(ClusterRoleRecordGeneratorToolIT.class); + private static final HBaseTestingUtilityPair CLUSTERS = new HBaseTestingUtilityPair(); + + @Rule + public final TestName testName = new TestName(); + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + CLUSTERS.start(); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + CLUSTERS.close(); + } + + @Test + public void testRun() throws Exception { + ClusterRoleRecordGeneratorTool generator = new ClusterRoleRecordGeneratorTool(); + Configuration conf = new Configuration(CLUSTERS.getHBaseCluster1().getConfiguration()); + String haGroupNames = String.format("%s,%s,%s", testName.getMethodName(), + testName.getMethodName() + 1, testName.getMethodName() + 2); + conf.set(PHOENIX_HA_GROUPS_ATTR, haGroupNames); + // create a temp file + File file = File.createTempFile("phoenix.ha.cluster.role.records", ".json"); + file.deleteOnExit(); + conf.set(PHOENIX_HA_GENERATOR_FILE_ATTR, file.getAbsolutePath()); + + generator.setConf(conf); + int ret = ToolRunner.run(conf, new ClusterRoleRecordGeneratorTool(), new String[]{}); + assertEquals(0, ret); + String recordJson = FileUtils.readFileToString(file); + LOG.info("The created file content is: \n{}", recordJson); + for (String haGroupName : haGroupNames.split(",")) { + assertTrue(recordJson.contains(haGroupName)); + } + } + + @Test + public void testListAllRecordsByGenerator1() throws Exception { + ClusterRoleRecordGeneratorTool generator1 = new ClusterRoleRecordGeneratorTool(); + Configuration conf1 = new Configuration(CLUSTERS.getHBaseCluster1().getConfiguration()); + conf1.set(PHOENIX_HA_GROUPS_ATTR, testName.getMethodName()); + // explicitly set the failover policy as FAILOVER + conf1.set(String.format(PHOENIX_HA_GROUP_POLICY_ATTR_FORMAT, 
testName.getMethodName()), + HighAvailabilityPolicy.FAILOVER.name()); + generator1.setConf(conf1); + + // created with cluster1's conf, so cluster1 is ACTIVE + List<ClusterRoleRecord> records = generator1.listAllRecordsByZk(); Review Comment: what's actually creating the records we're listing here? ########## phoenix-core/src/it/java/org/apache/phoenix/jdbc/FailoverPhoenixConnectionIT.java: ########## @@ -0,0 +1,879 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.phoenix.jdbc; + +import static org.apache.hadoop.hbase.HConstants.ZK_SESSION_TIMEOUT; +import static org.apache.hadoop.test.GenericTestUtils.waitFor; +import static org.apache.phoenix.exception.SQLExceptionCode.CANNOT_ESTABLISH_CONNECTION; +import static org.apache.phoenix.jdbc.FailoverPhoenixConnection.FAILOVER_TIMEOUT_MS_ATTR; +import static org.apache.phoenix.jdbc.HighAvailabilityGroup.PHOENIX_HA_TRANSITION_TIMEOUT_MS_KEY; +import static org.apache.phoenix.jdbc.HighAvailabilityGroup.PHOENIX_HA_ZK_RETRY_MAX_KEY; +import static org.apache.phoenix.jdbc.HighAvailabilityGroup.PHOENIX_HA_ZK_RETRY_MAX_SLEEP_MS_KEY; +import static org.apache.phoenix.jdbc.HighAvailabilityTestingUtility.HBaseTestingUtilityPair.doTestWhenOneZKDown; +import static org.apache.phoenix.jdbc.HighAvailabilityTestingUtility.doTestBasicOperationsWithConnection; +import static org.apache.phoenix.jdbc.HighAvailabilityTestingUtility.HBaseTestingUtilityPair; +import static org.apache.phoenix.jdbc.HighAvailabilityGroup.PHOENIX_HA_GROUP_ATTR; +import static org.apache.phoenix.jdbc.HighAvailabilityTestingUtility.doTestBasicOperationsWithStatement; +import static org.apache.phoenix.jdbc.HighAvailabilityTestingUtility.getHighAvailibilityGroup; +import static org.apache.phoenix.query.QueryServices.COLLECT_REQUEST_LEVEL_METRICS; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.CountDownLatch; +import 
java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest; +import org.apache.phoenix.exception.FailoverSQLException; +import org.apache.phoenix.jdbc.ClusterRoleRecord.ClusterRole; +import org.apache.phoenix.monitoring.MetricType; +import org.apache.phoenix.query.ConnectionQueryServices; +import org.apache.phoenix.query.ConnectionQueryServicesImpl; +import org.apache.phoenix.util.PhoenixRuntime; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; +import org.junit.rules.Timeout; +import org.mockito.Mockito; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Test failover basics for {@link FailoverPhoenixConnection}. + */ +@Category(NeedsOwnMiniClusterTest.class) +public class FailoverPhoenixConnectionIT { + private static final Logger LOG = LoggerFactory.getLogger(FailoverPhoenixConnectionIT.class); + private static final HBaseTestingUtilityPair CLUSTERS = new HBaseTestingUtilityPair(); + + @Rule + public final TestName testName = new TestName(); + @Rule + public final Timeout globalTimeout= new Timeout(300, TimeUnit.SECONDS); + + /** Client properties to create a connection per test. */ + private Properties clientProperties; + /** HA group for this test. */ + private HighAvailabilityGroup haGroup; + /** Table name per test case. */ + private String tableName; + /** HA Group name for this test. 
*/ + private String haGroupName; + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + CLUSTERS.start(); + DriverManager.registerDriver(PhoenixDriver.INSTANCE); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + DriverManager.deregisterDriver(PhoenixDriver.INSTANCE); + CLUSTERS.close(); + } + + @Before + public void setup() throws Exception { + haGroupName = testName.getMethodName(); + clientProperties = new Properties(); + clientProperties.setProperty(PHOENIX_HA_GROUP_ATTR, haGroupName); + // Set some client configurations to make test run faster + clientProperties.setProperty(COLLECT_REQUEST_LEVEL_METRICS, String.valueOf(true)); + clientProperties.setProperty(FAILOVER_TIMEOUT_MS_ATTR, "30000"); + clientProperties.setProperty(PHOENIX_HA_ZK_RETRY_MAX_KEY, "3"); + clientProperties.setProperty(PHOENIX_HA_ZK_RETRY_MAX_SLEEP_MS_KEY, "1000"); + clientProperties.setProperty(ZK_SESSION_TIMEOUT, "3000"); + clientProperties.setProperty(PHOENIX_HA_TRANSITION_TIMEOUT_MS_KEY, "3000"); + clientProperties.setProperty("zookeeper.recovery.retry.maxsleeptime", "1000"); + + // Make first cluster ACTIVE + CLUSTERS.initClusterRole(haGroupName, HighAvailabilityPolicy.FAILOVER); + + haGroup = getHighAvailibilityGroup(CLUSTERS.getJdbcUrl(), clientProperties); + LOG.info("Initialized haGroup {} with URL {}", haGroup, CLUSTERS.getJdbcUrl()); + tableName = testName.getMethodName().toUpperCase(); + CLUSTERS.createTableOnClusterPair(tableName); + } + + @After + public void tearDown() throws Exception { + try { + haGroup.close(); + PhoenixDriver.INSTANCE + .getConnectionQueryServices(CLUSTERS.getUrl1(), haGroup.getProperties()) + .close(); + PhoenixDriver.INSTANCE + .getConnectionQueryServices(CLUSTERS.getUrl2(), haGroup.getProperties()) + .close(); + } catch (Exception e) { + LOG.error("Fail to tear down the HA group and the CQS. 
Will ignore", e); + } + } + + /** + * Test Phoenix connection creation and basic operations with HBase cluster pair. + */ + @Test + public void testOperationUsingConnection() throws Exception { + try (Connection conn = createFailoverConnection()) { + doTestBasicOperationsWithConnection(conn, tableName, haGroupName); + } + } + + /** + * Test close() once more should not fail, as the second close should be a no-op. + */ + @Test + public void testCloseConnectionOnceMore() throws Exception { + Connection conn = createFailoverConnection(); + doTestBasicOperationsWithConnection(conn, tableName, haGroupName); + conn.close(); + conn.close(); // this is NOT duplicate code, but instead this is essential for this test. + } + + /** + * Tests that new Phoenix connections are not created during failover. + */ + @Test + public void testConnectionCreationFailsIfNoActiveCluster() throws Exception { + try (Connection conn = createFailoverConnection()) { + doTestBasicOperationsWithConnection(conn, tableName, haGroupName); + } + + CLUSTERS.transitClusterRole(haGroup, ClusterRole.STANDBY, ClusterRole.STANDBY); + + try { + createFailoverConnection(); + fail("Should have failed because neither cluster is ACTIVE"); + } catch (SQLException e) { + LOG.info("Got expected exception when creating new connection", e); + assertEquals(CANNOT_ESTABLISH_CONNECTION.getErrorCode(), e.getErrorCode()); + } // all other type of exception will fail this test. + } + + /** + * Tests new Phoenix connections are created if one cluster is OFFLINE and the other ACTIVE. + */ + @Test + public void testConnectionOneOfflineOneActive() throws Exception { + CLUSTERS.transitClusterRole(haGroup, ClusterRole.OFFLINE, ClusterRole.ACTIVE); + + try (Connection conn = createFailoverConnection()) { + doTestBasicOperationsWithConnection(conn, tableName, haGroupName); + } + } + + /** + * Test that failover can finish according to the policy. + * + * In this test case, there is no existing CQS or open connections to close. 
+ * + * @see #testFailoverCanFinishWhenOneZKDownWithCQS + */ + @Test + public void testFailoverCanFinishWhenOneZKDown() throws Exception { Review Comment: Didn't this one used to flap some in our internal CI? If so, has this been stabilized? We should make sure we're not introducing more flappers upstream ########## phoenix-core/src/it/java/org/apache/phoenix/jdbc/HighAvailabilityGroupIT.java: ########## @@ -0,0 +1,675 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.phoenix.jdbc; + +import static org.apache.hadoop.test.GenericTestUtils.waitFor; +import static org.apache.phoenix.exception.SQLExceptionCode.CANNOT_ESTABLISH_CONNECTION; +import static org.apache.phoenix.jdbc.HighAvailabilityGroup.PHOENIX_HA_GROUP_ATTR; +import static org.apache.phoenix.jdbc.HighAvailabilityGroup.PHOENIX_HA_SHOULD_FALLBACK_WHEN_MISSING_CRR_KEY; +import static org.apache.phoenix.jdbc.HighAvailabilityGroup.PHOENIX_HA_ZK_RETRY_MAX_KEY; +import static org.apache.phoenix.jdbc.HighAvailabilityGroup.PHOENIX_HA_ZK_RETRY_MAX_SLEEP_MS_KEY; +import static org.apache.phoenix.jdbc.HighAvailabilityTestingUtility.HBaseTestingUtilityPair.doTestWhenOneHBaseDown; +import static org.apache.phoenix.jdbc.HighAvailabilityTestingUtility.HBaseTestingUtilityPair.doTestWhenOneZKDown; +import static org.apache.phoenix.jdbc.HighAvailabilityTestingUtility.doTestBasicOperationsWithConnection; +import static org.apache.phoenix.jdbc.HighAvailabilityTestingUtility.getHighAvailibilityGroup; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNotSame; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.io.IOException; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.util.Optional; +import java.util.Properties; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.lang3.RandomUtils; +import org.apache.commons.lang3.RandomStringUtils; +import org.apache.curator.utils.ZKPaths; +import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest; +import org.apache.phoenix.exception.SQLExceptionCode; +import org.apache.phoenix.jdbc.ClusterRoleRecord.ClusterRole; +import org.apache.phoenix.jdbc.HighAvailabilityTestingUtility.HBaseTestingUtilityPair; +import 
org.apache.phoenix.jdbc.PhoenixHAAdminTool.PhoenixHAAdminHelper; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.experimental.categories.Category; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestName; +import org.junit.rules.Timeout; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Integration test for {@link HighAvailabilityGroup} with mini clusters. + * + * @see HighAvailabilityGroupTestIT + */ +@SuppressWarnings("UnstableApiUsage") +@Category(NeedsOwnMiniClusterTest.class) +public class HighAvailabilityGroupIT { + private static final Logger LOG = LoggerFactory.getLogger(HighAvailabilityGroupIT.class); + private static final HBaseTestingUtilityPair CLUSTERS = new HBaseTestingUtilityPair(); + + /** Client properties to create a connection per test. */ + private Properties clientProperties; + /** JDBC connection string for this test HA group. */ + private String jdbcUrl; + /** Failover HA group for to test. */ + private HighAvailabilityGroup haGroup; + /** HA Group name for this test. 
*/ + private String haGroupName; + + @Rule + public final TestName testName = new TestName(); + @Rule + public final Timeout globalTimeout = new Timeout(180, TimeUnit.SECONDS); + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + CLUSTERS.start(); + DriverManager.registerDriver(PhoenixDriver.INSTANCE); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + HighAvailabilityGroup.CURATOR_CACHE.invalidateAll(); + DriverManager.deregisterDriver(PhoenixDriver.INSTANCE); + CLUSTERS.close(); + } + + @Before + public void setup() throws Exception { + haGroupName = testName.getMethodName(); + clientProperties = new Properties(); + clientProperties.setProperty(PHOENIX_HA_GROUP_ATTR, haGroupName); + clientProperties.setProperty(PHOENIX_HA_ZK_RETRY_MAX_KEY, "3"); + clientProperties.setProperty(PHOENIX_HA_ZK_RETRY_MAX_SLEEP_MS_KEY, "2000"); + // Make first cluster ACTIVE + CLUSTERS.initClusterRole(haGroupName, HighAvailabilityPolicy.FAILOVER); + jdbcUrl = CLUSTERS.getJdbcUrl(); + haGroup = getHighAvailibilityGroup(jdbcUrl,clientProperties); + } + + @After + public void tearDown() throws Exception { + haGroup.close(); + try { + PhoenixDriver.INSTANCE + .getConnectionQueryServices(CLUSTERS.getUrl1(), haGroup.getProperties()) + .close(); + PhoenixDriver.INSTANCE + .getConnectionQueryServices(CLUSTERS.getUrl2(), haGroup.getProperties()) + .close(); + } catch (Exception e) { + LOG.error("Fail to tear down the HA group and the CQS. Will ignore", e); + } + } + + /** + * Test get static method. 
+ */ + @Test + public void testGet() throws Exception { + // Client will get the same HighAvailabilityGroup using the same information as key + Optional<HighAvailabilityGroup> haGroup2 = Optional.empty(); + try { + haGroup2 = HighAvailabilityGroup.get(jdbcUrl, clientProperties); + assertTrue(haGroup2.isPresent()); + assertSame(haGroup, haGroup2.get()); + } finally { + haGroup2.ifPresent(HighAvailabilityGroup::close); + } + + // Client will get a different HighAvailabilityGroup when group name is different + String haGroupName3 = testName.getMethodName() + RandomStringUtils.randomAlphabetic(3); + CLUSTERS.initClusterRole(haGroupName3, HighAvailabilityPolicy.FAILOVER); + clientProperties.setProperty(PHOENIX_HA_GROUP_ATTR, haGroupName3); + Optional<HighAvailabilityGroup> haGroup3 = Optional.empty(); + try { + haGroup3 = HighAvailabilityGroup.get(jdbcUrl, clientProperties); + assertTrue(haGroup3.isPresent()); + assertNotSame(haGroup, haGroup3.get()); + } finally { + haGroup3.ifPresent(HighAvailabilityGroup::close); + } + + // Client will get the same HighAvailabilityGroup using the same information as key again + clientProperties.setProperty(PHOENIX_HA_GROUP_ATTR, haGroup.getGroupInfo().getName()); + Optional<HighAvailabilityGroup> haGroup4 = Optional.empty(); + try { + haGroup4 = HighAvailabilityGroup.get(jdbcUrl, clientProperties); + assertTrue(haGroup4.isPresent()); + assertSame(haGroup, haGroup4.get()); + } finally { + haGroup4.ifPresent(HighAvailabilityGroup::close); + } + } + + /** + * Test that HA group should see latest version of cluster role record. 
+ */ + @Test + public void testGetWithDifferentRecordVersion() throws Exception { + String haGroupName2 = testName.getMethodName() + RandomStringUtils.randomAlphabetic(3); + clientProperties.setProperty(PHOENIX_HA_GROUP_ATTR, haGroupName2); + + // create cluster role records with different versions on two ZK clusters + final String zpath = ZKPaths.PATH_SEPARATOR + haGroupName2; + ClusterRoleRecord record1 = new ClusterRoleRecord( + haGroupName2, HighAvailabilityPolicy.FAILOVER, + CLUSTERS.getUrl1(), ClusterRole.ACTIVE, + CLUSTERS.getUrl2(), ClusterRole.STANDBY, + 1); + CLUSTERS.createCurator1().create().forPath(zpath, ClusterRoleRecord.toJson(record1)); + ClusterRoleRecord record2 = new ClusterRoleRecord( + record1.getHaGroupName(), record1.getPolicy(), + record1.getZk1(), record1.getRole1(), + record1.getZk2(), record1.getRole2(), + record1.getVersion() + 1); // record2 is newer + CLUSTERS.createCurator2().create().forPath(zpath, ClusterRoleRecord.toJson(record2)); + + Optional<HighAvailabilityGroup> haGroup2 = Optional.empty(); + try { + haGroup2 = HighAvailabilityGroup.get(jdbcUrl, clientProperties); + assertTrue(haGroup2.isPresent()); + assertNotSame(haGroup2.get(), haGroup); + // HA group should see latest version when both role managers are started + HighAvailabilityGroup finalHaGroup = haGroup2.get(); + waitFor(() -> record2.equals(finalHaGroup.getRoleRecord()), 100, 30_000); + } finally { + haGroup2.ifPresent(HighAvailabilityGroup::close); + } + } + + /** + * Test that client can get an HA group when ACTIVE ZK cluster is down. + * + * NOTE: we can not test with existing HA group because {@link HighAvailabilityGroup#get} would + * get the cached object, which has also been initialized. + * + * The reason this works is because getting an HA group depends on one ZK watcher connects to a + * ZK cluster to get the associated cluster role record. It does not depend on both ZK cluster + * being up and running. It does not actually create HBase connection either. 
+ */ + @Test + public void testCanGetHaGroupWhenActiveZKClusterDown() throws Exception { + String haGroupName2 = testName.getMethodName() + 2; + clientProperties.setProperty(PHOENIX_HA_GROUP_ATTR, haGroupName2); + CLUSTERS.initClusterRole(haGroupName2, HighAvailabilityPolicy.FAILOVER); + + doTestWhenOneZKDown(CLUSTERS.getHBaseCluster1(), () -> { + HighAvailabilityGroup haGroup2 = null; + try { + haGroup2 = getHighAvailibilityGroup(jdbcUrl, clientProperties); + LOG.info("Can get the new HA group {} after both ZK clusters restart", haGroup2); + } finally { + if (haGroup2 != null) { + haGroup2.close(); + } + } + }); + } + + /** + * Test that client can not get an HA group when both ZK clusters are down. + * + * NOTE: we can not test with existing HA group because {@link HighAvailabilityGroup#get} would + * get the cached object, which has also been initialized. + * + * What if two HBase clusters instead of two ZK clusters are down? We may still get a new HA + * group because creating and initializing HA group do not set up the HBase connection, which + * should indeed fail when ACTIVE HBase cluster is down. 
+ */ + @Test + public void testCanNotGetHaGroupWhenTwoZKClustersDown() throws Exception { + String haGroupName2 = testName.getMethodName() + 2; + clientProperties.setProperty(PHOENIX_HA_GROUP_ATTR, haGroupName2); + CLUSTERS.initClusterRole(haGroupName2, HighAvailabilityPolicy.FAILOVER); + + doTestWhenOneZKDown(CLUSTERS.getHBaseCluster1(), () -> + doTestWhenOneZKDown(CLUSTERS.getHBaseCluster2(), () -> { + try { + HighAvailabilityGroup.get(jdbcUrl, clientProperties); + fail("Should have failed because both ZK cluster were shutdown!"); + } catch (SQLException e) { + LOG.info("Got expected SQLException because both ZK clusters are down", e); + assertEquals(CANNOT_ESTABLISH_CONNECTION.getErrorCode(), e.getErrorCode()); + assertTrue(e.getCause() instanceof IOException); + } + }) + ); + + HighAvailabilityGroup haGroup2 = null; + try { + haGroup2 = getHighAvailibilityGroup(jdbcUrl, clientProperties); + LOG.info("Can get the new HA group {} after both ZK clusters restart", haGroup2); + } finally { + if (haGroup2 != null) { + haGroup2.close(); + } + } + } + + @Test + public void testGetHaGroupFailsFastWhenBothZKClusterDownFromBeginning() { + String haGroupName = testName.getMethodName(); + clientProperties.setProperty(PHOENIX_HA_GROUP_ATTR, haGroupName); + String badJdbcUrl = String.format("jdbc:phoenix:[%s|%s]", "127.0.0.1:0", "127.0.0.1:1"); + LOG.info("Start testing HighAvailabilityGroup::get() when both ZK clusters are down..."); + long startTime = System.currentTimeMillis(); + try { + HighAvailabilityGroup.get(badJdbcUrl, clientProperties); + fail("Should always throw an exception."); + } catch (SQLException e){ + LOG.info("Got expected exception", e); + assertEquals(CANNOT_ESTABLISH_CONNECTION.getErrorCode(), e.getErrorCode()); + } finally { + LOG.info("Stop testing HighAvailabilityGroup::get() when both ZK clusters are down..."); Review Comment: nit: couldn't this be done by just putting a short timeout on the JUnit test attribute? 
########## phoenix-core/src/it/java/org/apache/phoenix/jdbc/HighAvailabilityGroupIT.java: ########## @@ -0,0 +1,675 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.jdbc; + +import static org.apache.hadoop.test.GenericTestUtils.waitFor; +import static org.apache.phoenix.exception.SQLExceptionCode.CANNOT_ESTABLISH_CONNECTION; +import static org.apache.phoenix.jdbc.HighAvailabilityGroup.PHOENIX_HA_GROUP_ATTR; +import static org.apache.phoenix.jdbc.HighAvailabilityGroup.PHOENIX_HA_SHOULD_FALLBACK_WHEN_MISSING_CRR_KEY; +import static org.apache.phoenix.jdbc.HighAvailabilityGroup.PHOENIX_HA_ZK_RETRY_MAX_KEY; +import static org.apache.phoenix.jdbc.HighAvailabilityGroup.PHOENIX_HA_ZK_RETRY_MAX_SLEEP_MS_KEY; +import static org.apache.phoenix.jdbc.HighAvailabilityTestingUtility.HBaseTestingUtilityPair.doTestWhenOneHBaseDown; +import static org.apache.phoenix.jdbc.HighAvailabilityTestingUtility.HBaseTestingUtilityPair.doTestWhenOneZKDown; +import static org.apache.phoenix.jdbc.HighAvailabilityTestingUtility.doTestBasicOperationsWithConnection; +import static org.apache.phoenix.jdbc.HighAvailabilityTestingUtility.getHighAvailibilityGroup; +import static org.junit.Assert.assertEquals; +import 
static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNotSame; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.io.IOException; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.util.Optional; +import java.util.Properties; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.lang3.RandomUtils; +import org.apache.commons.lang3.RandomStringUtils; +import org.apache.curator.utils.ZKPaths; +import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest; +import org.apache.phoenix.exception.SQLExceptionCode; +import org.apache.phoenix.jdbc.ClusterRoleRecord.ClusterRole; +import org.apache.phoenix.jdbc.HighAvailabilityTestingUtility.HBaseTestingUtilityPair; +import org.apache.phoenix.jdbc.PhoenixHAAdminTool.PhoenixHAAdminHelper; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.experimental.categories.Category; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestName; +import org.junit.rules.Timeout; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Integration test for {@link HighAvailabilityGroup} with mini clusters. + * + * @see HighAvailabilityGroupTestIT + */ +@SuppressWarnings("UnstableApiUsage") +@Category(NeedsOwnMiniClusterTest.class) +public class HighAvailabilityGroupIT { + private static final Logger LOG = LoggerFactory.getLogger(HighAvailabilityGroupIT.class); + private static final HBaseTestingUtilityPair CLUSTERS = new HBaseTestingUtilityPair(); + + /** Client properties to create a connection per test. */ + private Properties clientProperties; + /** JDBC connection string for this test HA group. */ + private String jdbcUrl; + /** Failover HA group for to test. 
*/ + private HighAvailabilityGroup haGroup; + /** HA Group name for this test. */ + private String haGroupName; + + @Rule + public final TestName testName = new TestName(); + @Rule + public final Timeout globalTimeout = new Timeout(180, TimeUnit.SECONDS); + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + CLUSTERS.start(); + DriverManager.registerDriver(PhoenixDriver.INSTANCE); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + HighAvailabilityGroup.CURATOR_CACHE.invalidateAll(); + DriverManager.deregisterDriver(PhoenixDriver.INSTANCE); + CLUSTERS.close(); + } + + @Before + public void setup() throws Exception { + haGroupName = testName.getMethodName(); + clientProperties = new Properties(); + clientProperties.setProperty(PHOENIX_HA_GROUP_ATTR, haGroupName); + clientProperties.setProperty(PHOENIX_HA_ZK_RETRY_MAX_KEY, "3"); + clientProperties.setProperty(PHOENIX_HA_ZK_RETRY_MAX_SLEEP_MS_KEY, "2000"); + // Make first cluster ACTIVE + CLUSTERS.initClusterRole(haGroupName, HighAvailabilityPolicy.FAILOVER); + jdbcUrl = CLUSTERS.getJdbcUrl(); + haGroup = getHighAvailibilityGroup(jdbcUrl,clientProperties); + } + + @After + public void tearDown() throws Exception { + haGroup.close(); + try { + PhoenixDriver.INSTANCE + .getConnectionQueryServices(CLUSTERS.getUrl1(), haGroup.getProperties()) + .close(); + PhoenixDriver.INSTANCE + .getConnectionQueryServices(CLUSTERS.getUrl2(), haGroup.getProperties()) + .close(); + } catch (Exception e) { + LOG.error("Fail to tear down the HA group and the CQS. Will ignore", e); + } + } + + /** + * Test get static method. 
+ */ + @Test + public void testGet() throws Exception { + // Client will get the same HighAvailabilityGroup using the same information as key + Optional<HighAvailabilityGroup> haGroup2 = Optional.empty(); + try { + haGroup2 = HighAvailabilityGroup.get(jdbcUrl, clientProperties); + assertTrue(haGroup2.isPresent()); + assertSame(haGroup, haGroup2.get()); + } finally { + haGroup2.ifPresent(HighAvailabilityGroup::close); + } + + // Client will get a different HighAvailabilityGroup when group name is different + String haGroupName3 = testName.getMethodName() + RandomStringUtils.randomAlphabetic(3); + CLUSTERS.initClusterRole(haGroupName3, HighAvailabilityPolicy.FAILOVER); + clientProperties.setProperty(PHOENIX_HA_GROUP_ATTR, haGroupName3); + Optional<HighAvailabilityGroup> haGroup3 = Optional.empty(); + try { + haGroup3 = HighAvailabilityGroup.get(jdbcUrl, clientProperties); + assertTrue(haGroup3.isPresent()); + assertNotSame(haGroup, haGroup3.get()); + } finally { + haGroup3.ifPresent(HighAvailabilityGroup::close); + } + + // Client will get the same HighAvailabilityGroup using the same information as key again + clientProperties.setProperty(PHOENIX_HA_GROUP_ATTR, haGroup.getGroupInfo().getName()); + Optional<HighAvailabilityGroup> haGroup4 = Optional.empty(); + try { + haGroup4 = HighAvailabilityGroup.get(jdbcUrl, clientProperties); + assertTrue(haGroup4.isPresent()); + assertSame(haGroup, haGroup4.get()); + } finally { + haGroup4.ifPresent(HighAvailabilityGroup::close); + } + } + + /** + * Test that HA group should see latest version of cluster role record. 
+ */ + @Test + public void testGetWithDifferentRecordVersion() throws Exception { + String haGroupName2 = testName.getMethodName() + RandomStringUtils.randomAlphabetic(3); + clientProperties.setProperty(PHOENIX_HA_GROUP_ATTR, haGroupName2); + + // create cluster role records with different versions on two ZK clusters + final String zpath = ZKPaths.PATH_SEPARATOR + haGroupName2; + ClusterRoleRecord record1 = new ClusterRoleRecord( + haGroupName2, HighAvailabilityPolicy.FAILOVER, + CLUSTERS.getUrl1(), ClusterRole.ACTIVE, + CLUSTERS.getUrl2(), ClusterRole.STANDBY, + 1); + CLUSTERS.createCurator1().create().forPath(zpath, ClusterRoleRecord.toJson(record1)); + ClusterRoleRecord record2 = new ClusterRoleRecord( + record1.getHaGroupName(), record1.getPolicy(), + record1.getZk1(), record1.getRole1(), + record1.getZk2(), record1.getRole2(), + record1.getVersion() + 1); // record2 is newer + CLUSTERS.createCurator2().create().forPath(zpath, ClusterRoleRecord.toJson(record2)); + + Optional<HighAvailabilityGroup> haGroup2 = Optional.empty(); + try { + haGroup2 = HighAvailabilityGroup.get(jdbcUrl, clientProperties); + assertTrue(haGroup2.isPresent()); + assertNotSame(haGroup2.get(), haGroup); + // HA group should see latest version when both role managers are started + HighAvailabilityGroup finalHaGroup = haGroup2.get(); + waitFor(() -> record2.equals(finalHaGroup.getRoleRecord()), 100, 30_000); + } finally { + haGroup2.ifPresent(HighAvailabilityGroup::close); + } + } + + /** + * Test that client can get an HA group when ACTIVE ZK cluster is down. + * + * NOTE: we can not test with existing HA group because {@link HighAvailabilityGroup#get} would + * get the cached object, which has also been initialized. + * + * The reason this works is because getting an HA group depends on one ZK watcher connects to a + * ZK cluster to get the associated cluster role record. It does not depend on both ZK cluster + * being up and running. It does not actually create HBase connection either. 
+ */ + @Test + public void testCanGetHaGroupWhenActiveZKClusterDown() throws Exception { + String haGroupName2 = testName.getMethodName() + 2; + clientProperties.setProperty(PHOENIX_HA_GROUP_ATTR, haGroupName2); + CLUSTERS.initClusterRole(haGroupName2, HighAvailabilityPolicy.FAILOVER); + + doTestWhenOneZKDown(CLUSTERS.getHBaseCluster1(), () -> { + HighAvailabilityGroup haGroup2 = null; + try { + haGroup2 = getHighAvailibilityGroup(jdbcUrl, clientProperties); + LOG.info("Can get the new HA group {} after both ZK clusters restart", haGroup2); + } finally { + if (haGroup2 != null) { + haGroup2.close(); + } + } + }); + } + + /** + * Test that client can not get an HA group when both ZK clusters are down. + * + * NOTE: we can not test with existing HA group because {@link HighAvailabilityGroup#get} would + * get the cached object, which has also been initialized. + * + * What if two HBase clusters instead of two ZK clusters are down? We may still get a new HA + * group because creating and initializing HA group do not set up the HBase connection, which + * should indeed fail when ACTIVE HBase cluster is down. 
+ */ + @Test + public void testCanNotGetHaGroupWhenTwoZKClustersDown() throws Exception { + String haGroupName2 = testName.getMethodName() + 2; + clientProperties.setProperty(PHOENIX_HA_GROUP_ATTR, haGroupName2); + CLUSTERS.initClusterRole(haGroupName2, HighAvailabilityPolicy.FAILOVER); + + doTestWhenOneZKDown(CLUSTERS.getHBaseCluster1(), () -> + doTestWhenOneZKDown(CLUSTERS.getHBaseCluster2(), () -> { + try { + HighAvailabilityGroup.get(jdbcUrl, clientProperties); + fail("Should have failed because both ZK cluster were shutdown!"); + } catch (SQLException e) { + LOG.info("Got expected SQLException because both ZK clusters are down", e); + assertEquals(CANNOT_ESTABLISH_CONNECTION.getErrorCode(), e.getErrorCode()); + assertTrue(e.getCause() instanceof IOException); + } + }) + ); + + HighAvailabilityGroup haGroup2 = null; + try { + haGroup2 = getHighAvailibilityGroup(jdbcUrl, clientProperties); + LOG.info("Can get the new HA group {} after both ZK clusters restart", haGroup2); + } finally { + if (haGroup2 != null) { + haGroup2.close(); + } + } + } + + @Test + public void testGetHaGroupFailsFastWhenBothZKClusterDownFromBeginning() { + String haGroupName = testName.getMethodName(); + clientProperties.setProperty(PHOENIX_HA_GROUP_ATTR, haGroupName); + String badJdbcUrl = String.format("jdbc:phoenix:[%s|%s]", "127.0.0.1:0", "127.0.0.1:1"); + LOG.info("Start testing HighAvailabilityGroup::get() when both ZK clusters are down..."); + long startTime = System.currentTimeMillis(); + try { + HighAvailabilityGroup.get(badJdbcUrl, clientProperties); + fail("Should always throw an exception."); + } catch (SQLException e){ + LOG.info("Got expected exception", e); + assertEquals(CANNOT_ESTABLISH_CONNECTION.getErrorCode(), e.getErrorCode()); + } finally { + LOG.info("Stop testing HighAvailabilityGroup::get() when both ZK clusters are down..."); + long elapsedTime = System.currentTimeMillis() - startTime; + long maxTime = (4 * 
HighAvailabilityGroup.PHOENIX_HA_ZK_CONNECTION_TIMEOUT_MS_DEFAULT); + assertTrue(String.format("Expected elapsed time %d to be less than %d",elapsedTime,maxTime), elapsedTime <= maxTime); + } + } + + /** + * Test that it should fail fast to get HA group if the cluster role information is not there. + */ + @Test + public void testGetShouldFailWithoutClusterRoleData() throws SQLException { + String invalidHaGroupName = testName.getMethodName() + RandomStringUtils.randomAlphanumeric(3); + clientProperties.setProperty(PHOENIX_HA_GROUP_ATTR, invalidHaGroupName); + assertFalse(HighAvailabilityGroup.get(jdbcUrl, clientProperties).isPresent()); + } + + /** + * Test with invalid High Availability connection string. + */ + @Test + public void testGetShouldFailWithNonHAJdbcString() { + final String oldJdbcString = String.format("jdbc:phoenix:%s", CLUSTERS.getUrl1()); + try { + HighAvailabilityGroup.get(oldJdbcString, clientProperties); + fail("Should have failed with invalid connection string '" + oldJdbcString + "'"); + } catch (SQLException e) { + LOG.info("Got expected exception with invalid connection string {}", oldJdbcString, e); + assertEquals(SQLExceptionCode.MALFORMED_CONNECTION_URL.getErrorCode(), e.getErrorCode()); + } + } + + /** + * Test that we can connect to this HA group to get a JDBC connection. + */ + @Test + public void testConnect() throws SQLException { + Connection connection = haGroup.connect(clientProperties); + assertNotNull(connection); + assertNotNull(connection.unwrap(FailoverPhoenixConnection.class)); + } + + /** + * Test connect to one cluster and returns a Phoenix connection which can be wrapped. + */ + @Test + public void testConnectToOneCluster() throws SQLException { + final String url = CLUSTERS.getUrl1(); + PhoenixConnection connection = haGroup.connectToOneCluster(url, clientProperties); + // TODO: should we ignore the security info in url? Review Comment: nit: does this TODO need to be addressed? 
########## phoenix-core/src/it/java/org/apache/phoenix/jdbc/HighAvailabilityGroupIT.java: ########## @@ -0,0 +1,675 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.jdbc; + +import static org.apache.hadoop.test.GenericTestUtils.waitFor; +import static org.apache.phoenix.exception.SQLExceptionCode.CANNOT_ESTABLISH_CONNECTION; +import static org.apache.phoenix.jdbc.HighAvailabilityGroup.PHOENIX_HA_GROUP_ATTR; +import static org.apache.phoenix.jdbc.HighAvailabilityGroup.PHOENIX_HA_SHOULD_FALLBACK_WHEN_MISSING_CRR_KEY; +import static org.apache.phoenix.jdbc.HighAvailabilityGroup.PHOENIX_HA_ZK_RETRY_MAX_KEY; +import static org.apache.phoenix.jdbc.HighAvailabilityGroup.PHOENIX_HA_ZK_RETRY_MAX_SLEEP_MS_KEY; +import static org.apache.phoenix.jdbc.HighAvailabilityTestingUtility.HBaseTestingUtilityPair.doTestWhenOneHBaseDown; +import static org.apache.phoenix.jdbc.HighAvailabilityTestingUtility.HBaseTestingUtilityPair.doTestWhenOneZKDown; +import static org.apache.phoenix.jdbc.HighAvailabilityTestingUtility.doTestBasicOperationsWithConnection; +import static org.apache.phoenix.jdbc.HighAvailabilityTestingUtility.getHighAvailibilityGroup; +import static org.junit.Assert.assertEquals; +import 
static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNotSame; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.io.IOException; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.util.Optional; +import java.util.Properties; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.lang3.RandomUtils; +import org.apache.commons.lang3.RandomStringUtils; +import org.apache.curator.utils.ZKPaths; +import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest; +import org.apache.phoenix.exception.SQLExceptionCode; +import org.apache.phoenix.jdbc.ClusterRoleRecord.ClusterRole; +import org.apache.phoenix.jdbc.HighAvailabilityTestingUtility.HBaseTestingUtilityPair; +import org.apache.phoenix.jdbc.PhoenixHAAdminTool.PhoenixHAAdminHelper; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.experimental.categories.Category; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestName; +import org.junit.rules.Timeout; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Integration test for {@link HighAvailabilityGroup} with mini clusters. + * + * @see HighAvailabilityGroupTestIT + */ +@SuppressWarnings("UnstableApiUsage") +@Category(NeedsOwnMiniClusterTest.class) +public class HighAvailabilityGroupIT { + private static final Logger LOG = LoggerFactory.getLogger(HighAvailabilityGroupIT.class); + private static final HBaseTestingUtilityPair CLUSTERS = new HBaseTestingUtilityPair(); + + /** Client properties to create a connection per test. */ + private Properties clientProperties; + /** JDBC connection string for this test HA group. */ + private String jdbcUrl; + /** Failover HA group for to test. 
*/ + private HighAvailabilityGroup haGroup; + /** HA Group name for this test. */ + private String haGroupName; + + @Rule + public final TestName testName = new TestName(); + @Rule + public final Timeout globalTimeout = new Timeout(180, TimeUnit.SECONDS); + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + CLUSTERS.start(); + DriverManager.registerDriver(PhoenixDriver.INSTANCE); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + HighAvailabilityGroup.CURATOR_CACHE.invalidateAll(); + DriverManager.deregisterDriver(PhoenixDriver.INSTANCE); + CLUSTERS.close(); + } + + @Before + public void setup() throws Exception { + haGroupName = testName.getMethodName(); + clientProperties = new Properties(); + clientProperties.setProperty(PHOENIX_HA_GROUP_ATTR, haGroupName); + clientProperties.setProperty(PHOENIX_HA_ZK_RETRY_MAX_KEY, "3"); + clientProperties.setProperty(PHOENIX_HA_ZK_RETRY_MAX_SLEEP_MS_KEY, "2000"); + // Make first cluster ACTIVE + CLUSTERS.initClusterRole(haGroupName, HighAvailabilityPolicy.FAILOVER); + jdbcUrl = CLUSTERS.getJdbcUrl(); + haGroup = getHighAvailibilityGroup(jdbcUrl,clientProperties); + } + + @After + public void tearDown() throws Exception { + haGroup.close(); + try { + PhoenixDriver.INSTANCE + .getConnectionQueryServices(CLUSTERS.getUrl1(), haGroup.getProperties()) + .close(); + PhoenixDriver.INSTANCE + .getConnectionQueryServices(CLUSTERS.getUrl2(), haGroup.getProperties()) + .close(); + } catch (Exception e) { + LOG.error("Fail to tear down the HA group and the CQS. Will ignore", e); + } + } + + /** + * Test get static method. 
+ */ + @Test + public void testGet() throws Exception { + // Client will get the same HighAvailabilityGroup using the same information as key + Optional<HighAvailabilityGroup> haGroup2 = Optional.empty(); + try { + haGroup2 = HighAvailabilityGroup.get(jdbcUrl, clientProperties); + assertTrue(haGroup2.isPresent()); + assertSame(haGroup, haGroup2.get()); + } finally { + haGroup2.ifPresent(HighAvailabilityGroup::close); + } + + // Client will get a different HighAvailabilityGroup when group name is different + String haGroupName3 = testName.getMethodName() + RandomStringUtils.randomAlphabetic(3); + CLUSTERS.initClusterRole(haGroupName3, HighAvailabilityPolicy.FAILOVER); + clientProperties.setProperty(PHOENIX_HA_GROUP_ATTR, haGroupName3); + Optional<HighAvailabilityGroup> haGroup3 = Optional.empty(); + try { + haGroup3 = HighAvailabilityGroup.get(jdbcUrl, clientProperties); + assertTrue(haGroup3.isPresent()); + assertNotSame(haGroup, haGroup3.get()); + } finally { + haGroup3.ifPresent(HighAvailabilityGroup::close); + } + + // Client will get the same HighAvailabilityGroup using the same information as key again + clientProperties.setProperty(PHOENIX_HA_GROUP_ATTR, haGroup.getGroupInfo().getName()); + Optional<HighAvailabilityGroup> haGroup4 = Optional.empty(); + try { + haGroup4 = HighAvailabilityGroup.get(jdbcUrl, clientProperties); + assertTrue(haGroup4.isPresent()); + assertSame(haGroup, haGroup4.get()); + } finally { + haGroup4.ifPresent(HighAvailabilityGroup::close); + } + } + + /** + * Test that HA group should see latest version of cluster role record. 
+ */ + @Test + public void testGetWithDifferentRecordVersion() throws Exception { + String haGroupName2 = testName.getMethodName() + RandomStringUtils.randomAlphabetic(3); + clientProperties.setProperty(PHOENIX_HA_GROUP_ATTR, haGroupName2); + + // create cluster role records with different versions on two ZK clusters + final String zpath = ZKPaths.PATH_SEPARATOR + haGroupName2; + ClusterRoleRecord record1 = new ClusterRoleRecord( + haGroupName2, HighAvailabilityPolicy.FAILOVER, + CLUSTERS.getUrl1(), ClusterRole.ACTIVE, + CLUSTERS.getUrl2(), ClusterRole.STANDBY, + 1); + CLUSTERS.createCurator1().create().forPath(zpath, ClusterRoleRecord.toJson(record1)); + ClusterRoleRecord record2 = new ClusterRoleRecord( + record1.getHaGroupName(), record1.getPolicy(), + record1.getZk1(), record1.getRole1(), + record1.getZk2(), record1.getRole2(), + record1.getVersion() + 1); // record2 is newer + CLUSTERS.createCurator2().create().forPath(zpath, ClusterRoleRecord.toJson(record2)); + + Optional<HighAvailabilityGroup> haGroup2 = Optional.empty(); + try { + haGroup2 = HighAvailabilityGroup.get(jdbcUrl, clientProperties); + assertTrue(haGroup2.isPresent()); + assertNotSame(haGroup2.get(), haGroup); + // HA group should see latest version when both role managers are started + HighAvailabilityGroup finalHaGroup = haGroup2.get(); + waitFor(() -> record2.equals(finalHaGroup.getRoleRecord()), 100, 30_000); + } finally { + haGroup2.ifPresent(HighAvailabilityGroup::close); + } + } + + /** + * Test that client can get an HA group when ACTIVE ZK cluster is down. + * + * NOTE: we can not test with existing HA group because {@link HighAvailabilityGroup#get} would + * get the cached object, which has also been initialized. + * + * The reason this works is because getting an HA group depends on one ZK watcher connects to a + * ZK cluster to get the associated cluster role record. It does not depend on both ZK cluster + * being up and running. It does not actually create HBase connection either. 
+ */ + @Test + public void testCanGetHaGroupWhenActiveZKClusterDown() throws Exception { + String haGroupName2 = testName.getMethodName() + 2; + clientProperties.setProperty(PHOENIX_HA_GROUP_ATTR, haGroupName2); + CLUSTERS.initClusterRole(haGroupName2, HighAvailabilityPolicy.FAILOVER); + + doTestWhenOneZKDown(CLUSTERS.getHBaseCluster1(), () -> { + HighAvailabilityGroup haGroup2 = null; + try { + haGroup2 = getHighAvailibilityGroup(jdbcUrl, clientProperties); + LOG.info("Can get the new HA group {} after both ZK clusters restart", haGroup2); + } finally { + if (haGroup2 != null) { + haGroup2.close(); + } + } + }); + } + + /** + * Test that client can not get an HA group when both ZK clusters are down. + * + * NOTE: we can not test with existing HA group because {@link HighAvailabilityGroup#get} would + * get the cached object, which has also been initialized. + * + * What if two HBase clusters instead of two ZK clusters are down? We may still get a new HA + * group because creating and initializing HA group do not set up the HBase connection, which + * should indeed fail when ACTIVE HBase cluster is down. 
+ */ + @Test + public void testCanNotGetHaGroupWhenTwoZKClustersDown() throws Exception { + String haGroupName2 = testName.getMethodName() + 2; + clientProperties.setProperty(PHOENIX_HA_GROUP_ATTR, haGroupName2); + CLUSTERS.initClusterRole(haGroupName2, HighAvailabilityPolicy.FAILOVER); + + doTestWhenOneZKDown(CLUSTERS.getHBaseCluster1(), () -> + doTestWhenOneZKDown(CLUSTERS.getHBaseCluster2(), () -> { + try { + HighAvailabilityGroup.get(jdbcUrl, clientProperties); + fail("Should have failed because both ZK cluster were shutdown!"); + } catch (SQLException e) { + LOG.info("Got expected SQLException because both ZK clusters are down", e); + assertEquals(CANNOT_ESTABLISH_CONNECTION.getErrorCode(), e.getErrorCode()); + assertTrue(e.getCause() instanceof IOException); + } + }) + ); + + HighAvailabilityGroup haGroup2 = null; + try { + haGroup2 = getHighAvailibilityGroup(jdbcUrl, clientProperties); + LOG.info("Can get the new HA group {} after both ZK clusters restart", haGroup2); + } finally { + if (haGroup2 != null) { + haGroup2.close(); + } + } + } + + @Test + public void testGetHaGroupFailsFastWhenBothZKClusterDownFromBeginning() { + String haGroupName = testName.getMethodName(); + clientProperties.setProperty(PHOENIX_HA_GROUP_ATTR, haGroupName); + String badJdbcUrl = String.format("jdbc:phoenix:[%s|%s]", "127.0.0.1:0", "127.0.0.1:1"); + LOG.info("Start testing HighAvailabilityGroup::get() when both ZK clusters are down..."); + long startTime = System.currentTimeMillis(); + try { + HighAvailabilityGroup.get(badJdbcUrl, clientProperties); + fail("Should always throw an exception."); + } catch (SQLException e){ + LOG.info("Got expected exception", e); + assertEquals(CANNOT_ESTABLISH_CONNECTION.getErrorCode(), e.getErrorCode()); + } finally { + LOG.info("Stop testing HighAvailabilityGroup::get() when both ZK clusters are down..."); + long elapsedTime = System.currentTimeMillis() - startTime; + long maxTime = (4 * 
HighAvailabilityGroup.PHOENIX_HA_ZK_CONNECTION_TIMEOUT_MS_DEFAULT); + assertTrue(String.format("Expected elapsed time %d to be less than %d",elapsedTime,maxTime), elapsedTime <= maxTime); + } + } + + /** + * Test that it should fail fast to get HA group if the cluster role information is not there. + */ + @Test + public void testGetShouldFailWithoutClusterRoleData() throws SQLException { + String invalidHaGroupName = testName.getMethodName() + RandomStringUtils.randomAlphanumeric(3); + clientProperties.setProperty(PHOENIX_HA_GROUP_ATTR, invalidHaGroupName); + assertFalse(HighAvailabilityGroup.get(jdbcUrl, clientProperties).isPresent()); + } + + /** + * Test with invalid High Availability connection string. + */ + @Test + public void testGetShouldFailWithNonHAJdbcString() { + final String oldJdbcString = String.format("jdbc:phoenix:%s", CLUSTERS.getUrl1()); + try { + HighAvailabilityGroup.get(oldJdbcString, clientProperties); + fail("Should have failed with invalid connection string '" + oldJdbcString + "'"); + } catch (SQLException e) { + LOG.info("Got expected exception with invalid connection string {}", oldJdbcString, e); + assertEquals(SQLExceptionCode.MALFORMED_CONNECTION_URL.getErrorCode(), e.getErrorCode()); + } + } + + /** + * Test that we can connect to this HA group to get a JDBC connection. + */ + @Test + public void testConnect() throws SQLException { + Connection connection = haGroup.connect(clientProperties); + assertNotNull(connection); + assertNotNull(connection.unwrap(FailoverPhoenixConnection.class)); + } + + /** + * Test connect to one cluster and returns a Phoenix connection which can be wrapped. + */ + @Test + public void testConnectToOneCluster() throws SQLException { + final String url = CLUSTERS.getUrl1(); + PhoenixConnection connection = haGroup.connectToOneCluster(url, clientProperties); + // TODO: should we ignore the security info in url? 
+ assertEquals(url, connection.getURL()); + + try { + haGroup.connectToOneCluster(null, clientProperties); + fail("Should have failed since null is not in any HA group"); + } catch (Exception e) { + LOG.info("Got expected exception with invalid null host url", e); + } + + final String randomHostUrl = String.format("%s:%d", + RandomStringUtils.randomAlphabetic(4), RandomUtils.nextInt(0,65536)); + try { + haGroup.connectToOneCluster(randomHostUrl, clientProperties); + fail("Should have failed since '" + randomHostUrl + "' is not in HA group " + haGroup); + } catch (IllegalArgumentException e) { + LOG.info("Got expected exception with invalid host url '{}'", randomHostUrl, e); + } + } + + /** + * Test that it can connect to a given cluster in this HA group after ZK service restarts. + * + * Method {@link HighAvailabilityGroup#connectToOneCluster(String, Properties)} is used by + * Phoenix HA framework to connect to one specific HBase cluster in this HA group. The cluster + * may not necessarily be in ACTIVE role. For example, parallel HA connection needs to connect + * to both clusters. This tests that it can connect to a specific ZK cluster after ZK restarts. 
+ */ + @Test + public void testConnectToOneClusterAfterZKRestart() throws Exception { + final String tableName = testName.getMethodName(); + CLUSTERS.createTableOnClusterPair(tableName); + + final String url1 = CLUSTERS.getUrl1(); + final String jdbcUrlToCluster1 = "jdbc:phoenix:" + url1; + doTestWhenOneZKDown(CLUSTERS.getHBaseCluster1(), () -> { + try { + DriverManager.getConnection(jdbcUrlToCluster1); + } catch (SQLException e) { + LOG.info("Got expected IOException when creating Phoenix connection", e); + } + }); + + // test with plain JDBC connection after cluster restarts + try (Connection conn = DriverManager.getConnection(jdbcUrlToCluster1)) { + doTestBasicOperationsWithConnection(conn, tableName, null); + } + // test with HA group to get connection to one cluster + try (Connection conn = haGroup.connectToOneCluster(jdbcUrlToCluster1, clientProperties)) { + doTestBasicOperationsWithConnection(conn, tableName, haGroupName); + } + } + + /** + * Test {@link HighAvailabilityGroup#isActive(PhoenixConnection)}. + */ + @Test + public void testIsConnectionActive() throws Exception { + PhoenixConnection conn1 = haGroup.connectToOneCluster(CLUSTERS.getUrl1(), clientProperties); + assertTrue(haGroup.isActive(conn1)); + PhoenixConnection conn2 = haGroup.connectToOneCluster(CLUSTERS.getUrl2(), clientProperties); + assertFalse(haGroup.isActive(conn2)); + + CLUSTERS.transitClusterRole(haGroup, ClusterRole.STANDBY, ClusterRole.ACTIVE); + + assertFalse(haGroup.isActive(conn1)); + assertTrue(haGroup.isActive(conn2)); + } + + /** + * Test that when node changes, the high availability group will detect and issue state change. 
+ */ + @Test + public void testNodeChange() throws Exception { + assertEquals(ClusterRole.ACTIVE, haGroup.getRoleRecord().getRole(CLUSTERS.getUrl1())); + assertEquals(ClusterRole.STANDBY, haGroup.getRoleRecord().getRole(CLUSTERS.getUrl2())); + + CLUSTERS.transitClusterRole(haGroup, ClusterRole.STANDBY, ClusterRole.ACTIVE); + + assertEquals(ClusterRole.STANDBY, haGroup.getRoleRecord().getRole(CLUSTERS.getUrl1())); + assertEquals(ClusterRole.ACTIVE, haGroup.getRoleRecord().getRole(CLUSTERS.getUrl2())); + } + + /** + * Test that if STANDBY HBase cluster is down, the connect should work. + */ + @Test + public void testCanConnectWhenStandbyHBaseClusterDown() throws Exception { + doTestWhenOneHBaseDown(CLUSTERS.getHBaseCluster2(), () -> { + // HA group is already initialized + Connection connection = haGroup.connect(clientProperties); + assertNotNull(connection); + assertNotNull(connection.unwrap(FailoverPhoenixConnection.class)); + }); + } + + /** + * Test that if STANDBY ZK cluster is down, the connect should work. + * + * This differs from {@link #testCanConnectWhenStandbyHBaseClusterDown} because this stops the + * ZK cluster, not the HBase cluster. + */ + @Test + public void testCanConnectWhenStandbyZKClusterDown() throws Exception { + doTestWhenOneZKDown(CLUSTERS.getHBaseCluster2(), () -> { + // Clear the HA Cache + HighAvailabilityGroup.CURATOR_CACHE.invalidateAll(); + + // HA group is already initialized + Connection connection = haGroup.connect(clientProperties); + assertNotNull(connection); + assertNotNull(connection.unwrap(FailoverPhoenixConnection.class)); + }); + } + + /** + * Test that if STANDBY HBase cluster is down, connect to new HA group should work. + * + * This test covers only HBase cluster is down, and both ZK clusters are still healthy so + * clients will be able to get latest clusters role record from both clusters. This tests a new + * HA group which will get initialized during the STANDBY HBase cluster down time. 
+ */ + @Test + public void testCanConnectNewGroupWhenStandbyHBaseClusterDown() throws Exception { + doTestWhenOneHBaseDown(CLUSTERS.getHBaseCluster2(), () -> { + // get and initialize a new HA group when cluster2 is down + String haGroupName2 = testName.getMethodName() + RandomStringUtils.randomAlphabetic(3); + clientProperties.setProperty(PHOENIX_HA_GROUP_ATTR, haGroupName2); + CLUSTERS.initClusterRole(haGroupName2, HighAvailabilityPolicy.FAILOVER); + Optional<HighAvailabilityGroup> haGroup2 = Optional.empty(); + try { + haGroup2 = HighAvailabilityGroup.get(jdbcUrl, clientProperties); + assertTrue(haGroup2.isPresent()); + assertNotSame(haGroup2.get(), haGroup); + // get a new connection in this new HA group; should be pointing to ACTIVE cluster1 + try (Connection connection = haGroup2.get().connect(clientProperties)) { + assertNotNull(connection); + assertNotNull(connection.unwrap(FailoverPhoenixConnection.class)); + } + } finally { + haGroup2.ifPresent(HighAvailabilityGroup::close); + } + }); + } + + /** + * Test that if STANDBY cluster ZK service is down, connect to new HA group should work. + * + * This differs from {@link #testCanConnectNewGroupWhenStandbyHBaseClusterDown} because this is + * testing scenarios when STANDBY ZK cluster is down. 
+ */ + @Test + public void testCanConnectNewGroupWhenStandbyZKClusterDown() throws Exception { + // get and initialize a new HA group when cluster2 is down + String haGroupName2 = testName.getMethodName() + RandomStringUtils.randomAlphabetic(3); + clientProperties.setProperty(PHOENIX_HA_GROUP_ATTR, haGroupName2); + CLUSTERS.initClusterRole(haGroupName2, HighAvailabilityPolicy.FAILOVER); + + doTestWhenOneZKDown(CLUSTERS.getHBaseCluster2(), () -> { + Optional<HighAvailabilityGroup> haGroup2 = Optional.empty(); + try { + haGroup2 = HighAvailabilityGroup.get(jdbcUrl, clientProperties); + assertTrue(haGroup2.isPresent()); + assertNotSame(haGroup2.get(), haGroup); + // get a new connection in this new HA group; should be pointing to ACTIVE cluster1 + Connection connection = haGroup2.get().connect(clientProperties); + assertNotNull(connection); + assertNotNull(connection.unwrap(FailoverPhoenixConnection.class)); + } finally { + haGroup2.ifPresent(HighAvailabilityGroup::close); + } + }); + } + + /** + * Test it can not establish active connection to the ACTIVE HBase cluster if it is down. + */ + @Test + public void testCanNotEstablishConnectionWhenActiveHBaseClusterDown() throws Exception { + doTestWhenOneHBaseDown(CLUSTERS.getHBaseCluster1(), () -> { + try { + haGroup.connectActive(clientProperties); + fail("Should have failed because ACTIVE HBase cluster is down."); + } catch (SQLException e) { + LOG.info("Got expected exception when ACTIVE HBase cluster is down", e); + assertEquals(CANNOT_ESTABLISH_CONNECTION.getErrorCode(), e.getErrorCode()); + } + }); + } + + /** + * Test that client can not establish connection to when the ACTIVE ZK cluster is down, + * while client can establish active connection after active ZK cluster restarts. + * + * This differs from the {@link #testCanNotEstablishConnectionWhenActiveHBaseClusterDown()} + * because this is for ZK cluster down while the other is for HBase cluster down. 
+ */ + @Test + public void testConnectActiveWhenActiveZKClusterRestarts() throws Exception { + doTestWhenOneZKDown(CLUSTERS.getHBaseCluster1(), () -> { + try { + haGroup.connectActive(clientProperties); + fail("Should have failed because of ACTIVE ZK cluster is down."); + } catch (SQLException e) { + LOG.info("Got expected exception when ACTIVE ZK cluster is down", e); + assertEquals(CANNOT_ESTABLISH_CONNECTION.getErrorCode(), e.getErrorCode()); + } + }); + + try (Connection conn = haGroup.connectActive(clientProperties)) { + assertNotNull(conn); + LOG.info("Successfully connect to HA group {} after restarting ACTIVE ZK", haGroup); + } // all other exceptions will fail the test + } + + /** + * Test when one ZK starts after the HA group has been initialized. + * + * In this case, both cluster role managers will start and apply discovered cluster role record. + */ + @Test + public void testOneZKStartsAfterInit() throws Exception { + String haGroupName2 = testName.getMethodName() + RandomStringUtils.randomAlphabetic(3); + clientProperties.setProperty(PHOENIX_HA_GROUP_ATTR, haGroupName2); + + // create cluster role records with different versions on two ZK clusters + final String zpath = ZKPaths.PATH_SEPARATOR + haGroupName2; + ClusterRoleRecord record1 = new ClusterRoleRecord( + haGroupName2, HighAvailabilityPolicy.FAILOVER, + CLUSTERS.getUrl1(), ClusterRole.ACTIVE, + CLUSTERS.getUrl2(), ClusterRole.STANDBY, + 1); + CLUSTERS.createCurator1().create().forPath(zpath, ClusterRoleRecord.toJson(record1)); + ClusterRoleRecord record2 = new ClusterRoleRecord( + record1.getHaGroupName(), record1.getPolicy(), + record1.getZk1(), record1.getRole1(), + record1.getZk2(), record1.getRole2(), + record1.getVersion() + 1); // record2 is newer + CLUSTERS.createCurator2().create().forPath(zpath, ClusterRoleRecord.toJson(record2)); + + doTestWhenOneZKDown(CLUSTERS.getHBaseCluster2(), () -> { + Optional<HighAvailabilityGroup> haGroup2 = HighAvailabilityGroup.get(jdbcUrl, 
clientProperties); + assertTrue(haGroup2.isPresent()); + assertNotSame(haGroup2.get(), haGroup); + // apparently the HA group cluster role record should be from the healthy cluster + assertEquals(record1, haGroup2.get().getRoleRecord()); + }); + + // When ZK2 is connected, its cluster role manager should apply newer cluster role record + waitFor(() -> { + try { + Optional<HighAvailabilityGroup> haGroup2 = HighAvailabilityGroup.get(jdbcUrl, clientProperties); + return haGroup2.isPresent() && record2.equals(haGroup2.get().getRoleRecord()); + } catch (SQLException e) { + LOG.warn("Fail to get HA group {}", haGroupName2); + return false; + } + }, 100, 30_000); + + // clean up HA group 2 + HighAvailabilityGroup.get(jdbcUrl, clientProperties).ifPresent(HighAvailabilityGroup::close); + } + + /** + * Test that HA connection request will fall back to the first cluster when HA group fails + * to initialize due to missing cluster role record (CRR). + */ + @Test + public void testFallbackToSingleConnection() throws Exception { + final String tableName = testName.getMethodName(); + CLUSTERS.createTableOnClusterPair(tableName); + + String haGroupName2 = testName.getMethodName() + RandomStringUtils.randomAlphabetic(3); + clientProperties.setProperty(PHOENIX_HA_GROUP_ATTR, haGroupName2); + // no cluster role record for the HA group with name haGroupName2 + try (Connection conn = DriverManager.getConnection(jdbcUrl, clientProperties)) { + // connection is PhoenixConnection instead of HA connection (failover or parallel) Review Comment: Interesting (and good) fallback behavior. I'm sure it's somewhere below, but do we log anywhere that this fallback happened? (Seems like a WARN level sort of thing.) ########## phoenix-core/src/it/java/org/apache/phoenix/jdbc/FailoverPhoenixConnectionIT.java: ########## @@ -0,0 +1,879 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.jdbc; + +import static org.apache.hadoop.hbase.HConstants.ZK_SESSION_TIMEOUT; +import static org.apache.hadoop.test.GenericTestUtils.waitFor; +import static org.apache.phoenix.exception.SQLExceptionCode.CANNOT_ESTABLISH_CONNECTION; +import static org.apache.phoenix.jdbc.FailoverPhoenixConnection.FAILOVER_TIMEOUT_MS_ATTR; +import static org.apache.phoenix.jdbc.HighAvailabilityGroup.PHOENIX_HA_TRANSITION_TIMEOUT_MS_KEY; +import static org.apache.phoenix.jdbc.HighAvailabilityGroup.PHOENIX_HA_ZK_RETRY_MAX_KEY; +import static org.apache.phoenix.jdbc.HighAvailabilityGroup.PHOENIX_HA_ZK_RETRY_MAX_SLEEP_MS_KEY; +import static org.apache.phoenix.jdbc.HighAvailabilityTestingUtility.HBaseTestingUtilityPair.doTestWhenOneZKDown; +import static org.apache.phoenix.jdbc.HighAvailabilityTestingUtility.doTestBasicOperationsWithConnection; +import static org.apache.phoenix.jdbc.HighAvailabilityTestingUtility.HBaseTestingUtilityPair; +import static org.apache.phoenix.jdbc.HighAvailabilityGroup.PHOENIX_HA_GROUP_ATTR; +import static org.apache.phoenix.jdbc.HighAvailabilityTestingUtility.doTestBasicOperationsWithStatement; +import static org.apache.phoenix.jdbc.HighAvailabilityTestingUtility.getHighAvailibilityGroup; +import static 
org.apache.phoenix.query.QueryServices.COLLECT_REQUEST_LEVEL_METRICS; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest; +import org.apache.phoenix.exception.FailoverSQLException; +import org.apache.phoenix.jdbc.ClusterRoleRecord.ClusterRole; +import org.apache.phoenix.monitoring.MetricType; +import org.apache.phoenix.query.ConnectionQueryServices; +import org.apache.phoenix.query.ConnectionQueryServicesImpl; +import org.apache.phoenix.util.PhoenixRuntime; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; +import org.junit.rules.Timeout; +import org.mockito.Mockito; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Test failover basics for {@link FailoverPhoenixConnection}. 
+ */ +@Category(NeedsOwnMiniClusterTest.class) +public class FailoverPhoenixConnectionIT { + private static final Logger LOG = LoggerFactory.getLogger(FailoverPhoenixConnectionIT.class); + private static final HBaseTestingUtilityPair CLUSTERS = new HBaseTestingUtilityPair(); + + @Rule + public final TestName testName = new TestName(); + @Rule + public final Timeout globalTimeout= new Timeout(300, TimeUnit.SECONDS); + + /** Client properties to create a connection per test. */ + private Properties clientProperties; + /** HA group for this test. */ + private HighAvailabilityGroup haGroup; + /** Table name per test case. */ + private String tableName; + /** HA Group name for this test. */ + private String haGroupName; + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + CLUSTERS.start(); + DriverManager.registerDriver(PhoenixDriver.INSTANCE); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + DriverManager.deregisterDriver(PhoenixDriver.INSTANCE); + CLUSTERS.close(); + } + + @Before + public void setup() throws Exception { + haGroupName = testName.getMethodName(); + clientProperties = new Properties(); + clientProperties.setProperty(PHOENIX_HA_GROUP_ATTR, haGroupName); + // Set some client configurations to make test run faster + clientProperties.setProperty(COLLECT_REQUEST_LEVEL_METRICS, String.valueOf(true)); + clientProperties.setProperty(FAILOVER_TIMEOUT_MS_ATTR, "30000"); + clientProperties.setProperty(PHOENIX_HA_ZK_RETRY_MAX_KEY, "3"); + clientProperties.setProperty(PHOENIX_HA_ZK_RETRY_MAX_SLEEP_MS_KEY, "1000"); + clientProperties.setProperty(ZK_SESSION_TIMEOUT, "3000"); + clientProperties.setProperty(PHOENIX_HA_TRANSITION_TIMEOUT_MS_KEY, "3000"); + clientProperties.setProperty("zookeeper.recovery.retry.maxsleeptime", "1000"); + + // Make first cluster ACTIVE + CLUSTERS.initClusterRole(haGroupName, HighAvailabilityPolicy.FAILOVER); + + haGroup = getHighAvailibilityGroup(CLUSTERS.getJdbcUrl(), 
clientProperties); + LOG.info("Initialized haGroup {} with URL {}", haGroup, CLUSTERS.getJdbcUrl()); + tableName = testName.getMethodName().toUpperCase(); + CLUSTERS.createTableOnClusterPair(tableName); + } + + @After + public void tearDown() throws Exception { + try { + haGroup.close(); + PhoenixDriver.INSTANCE + .getConnectionQueryServices(CLUSTERS.getUrl1(), haGroup.getProperties()) + .close(); + PhoenixDriver.INSTANCE + .getConnectionQueryServices(CLUSTERS.getUrl2(), haGroup.getProperties()) + .close(); + } catch (Exception e) { + LOG.error("Fail to tear down the HA group and the CQS. Will ignore", e); + } + } + + /** + * Test Phoenix connection creation and basic operations with HBase cluster pair. + */ + @Test + public void testOperationUsingConnection() throws Exception { + try (Connection conn = createFailoverConnection()) { + doTestBasicOperationsWithConnection(conn, tableName, haGroupName); + } + } + + /** + * Test close() once more should not fail, as the second close should be a no-op. + */ + @Test + public void testCloseConnectionOnceMore() throws Exception { + Connection conn = createFailoverConnection(); + doTestBasicOperationsWithConnection(conn, tableName, haGroupName); + conn.close(); + conn.close(); // this is NOT duplicate code, but instead this is essential for this test. + } + + /** + * Tests that new Phoenix connections are not created during failover. 
+ */ + @Test + public void testConnectionCreationFailsIfNoActiveCluster() throws Exception { + try (Connection conn = createFailoverConnection()) { + doTestBasicOperationsWithConnection(conn, tableName, haGroupName); + } + + CLUSTERS.transitClusterRole(haGroup, ClusterRole.STANDBY, ClusterRole.STANDBY); + + try { + createFailoverConnection(); + fail("Should have failed because neither cluster is ACTIVE"); + } catch (SQLException e) { + LOG.info("Got expected exception when creating new connection", e); + assertEquals(CANNOT_ESTABLISH_CONNECTION.getErrorCode(), e.getErrorCode()); + } // all other type of exception will fail this test. + } + + /** + * Tests new Phoenix connections are created if one cluster is OFFLINE and the other ACTIVE. + */ + @Test + public void testConnectionOneOfflineOneActive() throws Exception { + CLUSTERS.transitClusterRole(haGroup, ClusterRole.OFFLINE, ClusterRole.ACTIVE); + + try (Connection conn = createFailoverConnection()) { + doTestBasicOperationsWithConnection(conn, tableName, haGroupName); + } + } + + /** + * Test that failover can finish according to the policy. + * + * In this test case, there is no existing CQS or open connections to close. + * + * @see #testFailoverCanFinishWhenOneZKDownWithCQS + */ + @Test + public void testFailoverCanFinishWhenOneZKDown() throws Exception { + doTestWhenOneZKDown(CLUSTERS.getHBaseCluster1(), () -> { + // Because cluster1 is down, any version record will only be updated in cluster2. + // Become ACTIVE cluster is still ACTIVE in current version record, no CQS to be closed. + CLUSTERS.transitClusterRole(haGroup, ClusterRole.ACTIVE, ClusterRole.OFFLINE); + + // Usually making this cluster STANDBY will close the CQS and all existing connections. + // The HA group was created in setup() but no CQS is yet opened for this ACTIVE cluster. + // As a result there is neither the CQS nor existing opened connections. + // In this case, the failover should finish instead of failing to get/close the CQS. 
+ CLUSTERS.transitClusterRole(haGroup, ClusterRole.STANDBY, ClusterRole.ACTIVE); + + try (Connection conn = createFailoverConnection()) { + FailoverPhoenixConnection failoverConn = (FailoverPhoenixConnection) conn; + assertEquals(CLUSTERS.getUrl2(), failoverConn.getWrappedConnection().getURL()); + doTestBasicOperationsWithConnection(conn, tableName, haGroupName); + } + }); + } + + /** + * Test that policy can close bad CQS and then finish failover according to the policy. + * + * Internally, the failover will get the CQS for this HA group and try to close all connections. + * However, the CQS may be in a bad state since the target ZK cluster is down. This test is for + * the scenario that when CQS is in bad state, failover should finish transition so HA is ready. + * + * @see #testFailoverCanFinishWhenOneZKDown + */ + @Test + public void testFailoverCanFinishWhenOneZKDownWithCQS() throws Exception { + doTestWhenOneZKDown(CLUSTERS.getHBaseCluster1(), () -> { + // Try to make a connection to current ACTIVE cluster, which should fail. + try { + createFailoverConnection(); + fail("Should have failed since ACTIVE ZK '" + CLUSTERS.getUrl1() + "' is down"); + } catch (SQLException e) { + LOG.info("Got expected exception when ACTIVE ZK cluster is down", e); + } + + // As the target ZK is down, now existing CQS if any is in a bad state. In this case, + // the failover should still finish. After all, this is the most important use case the + // failover is designed for. + CLUSTERS.transitClusterRole(haGroup, ClusterRole.STANDBY, ClusterRole.ACTIVE); + + try (Connection conn = createFailoverConnection()) { + FailoverPhoenixConnection failoverConn = (FailoverPhoenixConnection) conn; + assertEquals(CLUSTERS.getUrl2(), failoverConn.getWrappedConnection().getURL()); + doTestBasicOperationsWithConnection(conn, tableName, haGroupName); + } + }); + } + + /** + * Tests the scenario where ACTIVE ZK cluster restarts. 
+ * + * When ACTIVE ZK cluster shutdown, client can not connect to this HA group. + * After failover, client can connect to this HA group (against the new ACTIVE cluster2). + * After restarts and fails back, client can connect to this HA group (against cluster 1). + */ + @Test + public void testConnectionWhenActiveZKRestarts() throws Exception { + doTestWhenOneZKDown(CLUSTERS.getHBaseCluster1(), () -> { + try { + createFailoverConnection(); + fail("Should have failed since ACTIVE ZK cluster was shutdown"); + } catch (SQLException e) { + LOG.info("Got expected exception when ACTIVE ZK cluster is down", e); + } + + // So on-call engineer comes into play, and triggers a failover + // Because cluster1 is down, new version record will only be updated in cluster2 + CLUSTERS.transitClusterRole(haGroup, ClusterRole.STANDBY, ClusterRole.ACTIVE); + + // After failover, the FailoverPhoenixConnection should go to the second cluster + try (Connection conn = createFailoverConnection()) { + doTestBasicOperationsWithConnection(conn, tableName, haGroupName); + } + }); + + // After cluster1 comes back, new connection should still go to cluster2. + // This is the case where ZK1 believes itself is ACTIVE, with role record v1 + // while ZK2 believes itself is ACTIVE, with role record v2. + // Client should believe ZK2 is ACTIVE in this case because high version wins. 
+ LOG.info("Testing failover connection when both clusters are up and running"); + try (Connection conn = createFailoverConnection()) { + FailoverPhoenixConnection failoverConn = conn.unwrap(FailoverPhoenixConnection.class); + assertEquals(CLUSTERS.getUrl2(), failoverConn.getWrappedConnection().getURL()); + doTestBasicOperationsWithConnection(conn, tableName, haGroupName); + } + + LOG.info("Testing failover back to cluster1 when both clusters are up and running"); + // Failover back to the first cluster since it is healthy after restart + CLUSTERS.transitClusterRole(haGroup, ClusterRole.ACTIVE, ClusterRole.STANDBY); + // After ACTIVE ZK restarts and fails back, this should still work + try (Connection conn = createFailoverConnection()) { + doTestBasicOperationsWithConnection(conn, tableName, haGroupName); + } + } + + /** + * Tests the scenario where STANDBY ZK cluster restarts. + * + * When STANDBY ZK cluster shutdown, client can still connect to this HA group. + * After failover, client can not connect to this HA group (ACTIVE cluster 2 is down). + * After cluster 2 (ACTIVE) restarts, client can connect to this HA group. 
+ */ + @Test + public void testConnectionWhenStandbyZKRestarts() throws Exception { + doTestWhenOneZKDown(CLUSTERS.getHBaseCluster2(), () -> { + try (Connection conn = createFailoverConnection()) { + doTestBasicOperationsWithConnection(conn, tableName, haGroupName); + } + + // Innocent on-call engineer triggers a failover to second cluster + CLUSTERS.transitClusterRole(haGroup, ClusterRole.STANDBY, ClusterRole.ACTIVE); + + try { + createFailoverConnection(); + fail("Should have failed since ACTIVE ZK cluster was shutdown"); + } catch (SQLException e) { + LOG.info("Got expected exception when ACTIVE ZK cluster {} was shutdown", + CLUSTERS.getUrl2(), e); + } + }); + + // After the second cluster (ACTIVE) ZK restarts, this should still work + try (Connection conn = createFailoverConnection()) { + doTestBasicOperationsWithConnection(conn, tableName, haGroupName); + } + } + + /** + * Tests the scenario where two ZK clusters both restart. + * + * Client can not connect to this HA group when two ZK clusters are both down. + * When STANDBY ZK cluster2 first restarts, client still can not connect to this HA group. + * After failover, client can connect to this HA group because cluster 2 is ACTIVE and healthy. 
+ */ + @Test + public void testConnectionWhenTwoZKRestarts() throws Exception { + doTestWhenOneZKDown(CLUSTERS.getHBaseCluster1(), () -> { + doTestWhenOneZKDown(CLUSTERS.getHBaseCluster2(), () -> { + try { + createFailoverConnection(); + fail("Should have failed since ACTIVE ZK cluster was shutdown"); + } catch (SQLException e) { + LOG.info("Got expected exception when both clusters are down", e); + } + }); + + // Client cannot connect to HA group because cluster 2 is still STANDBY after restarted + try { + createFailoverConnection(); + fail("Should have failed since ACTIVE ZK cluster was shutdown"); + } catch (SQLException e) { + LOG.info("Got expected exception when ACTIVE ZK cluster {} was shutdown", + CLUSTERS.getUrl2(), e); + } + + // So on-call engineer comes into play, and triggers a failover + CLUSTERS.transitClusterRole(haGroup, ClusterRole.STANDBY, ClusterRole.ACTIVE); + + // After the second cluster (ACTIVE) ZK restarts, this should still work + try (Connection conn = createFailoverConnection()) { + doTestBasicOperationsWithConnection(conn, tableName, haGroupName); + } + }); + } + + /** + * Tests that new Phoenix connections are not created if both clusters are OFFLINE. + */ + @Test + public void testConnectionCreationFailsIfBothClustersOffline() throws Exception { + CLUSTERS.transitClusterRole(haGroup, ClusterRole.OFFLINE, ClusterRole.OFFLINE); + + try { + createFailoverConnection(); + fail("Should have failed because both clusters are OFFLINE"); + } catch (SQLException e) { + LOG.info("Got expected exception when creating new connection", e); + assertEquals(CANNOT_ESTABLISH_CONNECTION.getErrorCode(), e.getErrorCode()); + } // all other type of exception will fail this test. + } + + /** + * Tests that existing wrapped Phoenix connection is closed in the Failover event. 
+ */ + @Test + public void testWrappedConnectionClosedAfterStandby() throws Exception { + Connection conn = createFailoverConnection(); + doTestBasicOperationsWithConnection(conn, tableName, haGroupName); + + CLUSTERS.transitClusterRole(haGroup, ClusterRole.STANDBY, ClusterRole.ACTIVE); + + // The wrapped connection is still against the first cluster, and is closed + PhoenixConnection pc = ((FailoverPhoenixConnection)conn).getWrappedConnection(); + assertNotNull(pc); + assertEquals(CLUSTERS.getUrl1(), pc.getURL()); + assertTrue(pc.isClosed()); + doTestActionShouldFailBecauseOfFailover(conn::createStatement); + } + + /** + * Tests that existing Phoenix statement is closed when cluster transits into STANDBY. + */ + @Test + public void testStatementClosedAfterStandby() throws Exception { + Connection conn = createFailoverConnection(); + Statement stmt = conn.createStatement(); + doTestBasicOperationsWithStatement(conn, stmt, tableName); + + CLUSTERS.transitClusterRole(haGroup, ClusterRole.STANDBY, ClusterRole.ACTIVE); + + assertFalse(conn.isClosed()); + assertTrue(stmt.isClosed()); + doTestActionShouldFailBecauseOfFailover( + () -> stmt.executeQuery("SELECT * FROM " + tableName)); + } + + /** + * Tests non-HA connection (vanilla Phoenix connection) is intact when cluster role transits. + * + * The reason is that, high availability group has its own CQSI which tracks only those Phoenix + * connections that are wrapped by failover connections. + */ + @Test + public void testNonHAConnectionNotClosedAfterFailover() throws Exception { + String firstUrl = String.format("jdbc:phoenix:%s", CLUSTERS.getUrl1()); + // This is a vanilla Phoenix connection without using high availability (HA) feature. 
+ Connection phoenixConn = DriverManager.getConnection(firstUrl, new Properties()); + Connection failoverConn = createFailoverConnection(); + PhoenixConnection wrappedConn = ((FailoverPhoenixConnection) failoverConn) + .getWrappedConnection(); + + assertFalse(phoenixConn.isClosed()); + assertFalse(failoverConn.isClosed()); + assertFalse(wrappedConn.isClosed()); + + CLUSTERS.transitClusterRole(haGroup, ClusterRole.STANDBY, ClusterRole.ACTIVE); + + assertFalse(phoenixConn.isClosed()); // normal Phoenix connection is not closed + assertFalse(failoverConn.isClosed()); // failover connection is not closed by close() method + assertTrue(wrappedConn.isClosed()); + } + + /** + * Tests that one HA group cluster role transit will not affect connections in other HA groups. + */ + @Test + public void testOtherHAGroupConnectionUnchanged() throws Exception { + Connection conn = createFailoverConnection(); + PhoenixConnection wrappedConn = ((FailoverPhoenixConnection) conn).getWrappedConnection(); + // Following we create a new HA group and create a connection against this HA group + String haGroupName2 = haGroup.getGroupInfo().getName() + "2"; + CLUSTERS.initClusterRole(haGroupName2, HighAvailabilityPolicy.FAILOVER); + Properties clientProperties2 = new Properties(clientProperties); + clientProperties2.setProperty(PHOENIX_HA_GROUP_ATTR, haGroupName2); + Connection conn2 = DriverManager.getConnection(CLUSTERS.getJdbcUrl(), clientProperties2); + PhoenixConnection wrappedConn2 = ((FailoverPhoenixConnection) conn2).getWrappedConnection(); + + assertFalse(wrappedConn.isClosed()); + assertFalse(wrappedConn2.isClosed()); + + CLUSTERS.transitClusterRole(haGroup, ClusterRole.STANDBY, ClusterRole.ACTIVE); + + assertTrue(wrappedConn.isClosed()); + assertFalse(wrappedConn2.isClosed()); + } + + /** + * Test that failover can finish even if one connection can not be closed. 
+ * + * When one cluster becomes STANDBY from ACTIVE, all its connections and the associated CQS + * will get closed asynchronously. In case of errors when closing those connections and CQS, + * the HA group is still able to transit to target state after the maximum timeout. + * Closing the existing connections is guaranteed with best effort and timeout in favor of + * improved availability. + * + * @see #testFailoverTwice which fails over back to the first cluster + */ + @Test + public void testFailoverCanFinishWhenOneConnectionGotStuckClosing() throws Exception { + Connection conn = createFailoverConnection(); + doTestBasicOperationsWithConnection(conn, tableName, haGroupName); + assertEquals(CLUSTERS.getUrl1(), // active connection is against the first cluster + conn.unwrap(FailoverPhoenixConnection.class).getWrappedConnection().getURL()); + + // Spy the wrapped connection + Connection wrapped = conn.unwrap(FailoverPhoenixConnection.class).getWrappedConnection(); + Connection spy = Mockito.spy(wrapped); + final CountDownLatch latch = new CountDownLatch(1); + // Make close() stuck before closing + doAnswer((invocation) -> { + latch.await(); + invocation.callRealMethod(); + return null; + }).when(spy).close(); + ConnectionQueryServices cqs = PhoenixDriver.INSTANCE + .getConnectionQueryServices(CLUSTERS.getUrl1(), clientProperties); + // replace the wrapped connection with the spied connection in CQS + cqs.removeConnection(wrapped.unwrap(PhoenixConnection.class)); + cqs.addConnection(spy.unwrap(PhoenixConnection.class)); + + // (ACTIVE, STANDBY) -> (STANDBY, ACTIVE) + // The transition will finish as we set PHOENIX_HA_TRANSITION_TIMEOUT_MS_KEY for this class + // even though the spied connection is stuck at the latch when closing + CLUSTERS.transitClusterRole(haGroup, ClusterRole.STANDBY, ClusterRole.ACTIVE); + + // Verify the spied object has been called once + verify(spy, times(1)).close(); + // The spy is not closed because the real method was blocked by latch 
+ assertFalse(spy.isClosed()); + // connection is not closed as Phoenix HA does not close failover connections. + assertFalse(conn.isClosed()); + + try (Connection conn2 = createFailoverConnection()) { + doTestBasicOperationsWithConnection(conn2, tableName, haGroupName); + assertEquals(CLUSTERS.getUrl2(), // active connection is against the second cluster + conn2.unwrap(FailoverPhoenixConnection.class).getWrappedConnection().getURL()); + } + + latch.countDown(); + conn.close(); + // The CQS should be closed eventually. + waitFor(() -> { + try { + ((ConnectionQueryServicesImpl) cqs).checkClosed(); + return false; + } catch (IllegalStateException e) { + LOG.info("CQS got closed as we get expected exception.", e); + return true; + } + }, 100, 10_000); + } + + /** + * This is to make sure all Phoenix connections are closed when cluster becomes STANDBY. + * + * Test with many connections. + */ + @Test + public void testAllWrappedConnectionsClosedAfterStandby() throws Exception { + short numberOfConnections = 10; + List<Connection> connectionList = new ArrayList<>(numberOfConnections); + for (short i = 0; i < numberOfConnections; i++) { + connectionList.add(createFailoverConnection()); + } + + CLUSTERS.transitClusterRole(haGroup, ClusterRole.STANDBY, ClusterRole.ACTIVE); + + for (short i = 0; i < numberOfConnections; i++) { + LOG.info("Asserting connection number {}", i); + FailoverPhoenixConnection conn = ((FailoverPhoenixConnection) connectionList.get(i)); + assertFalse(conn.isClosed()); + assertTrue(conn.getWrappedConnection().isClosed()); + } + } + + /** + * This is to make sure all Phoenix connections are closed when cluster becomes STANDBY. + * + * Test with many connections. 
+ */ + @Test + public void testAllWrappedConnectionsClosedAfterStandbyAsync() throws Exception { + short numberOfThreads = 10; + // Test thread waits for half of connections to be created before triggering a failover + CountDownLatch latchToTransitRole = new CountDownLatch(numberOfThreads / 2); + // Clients wait for failover to finish before creating more connections + CountDownLatch latchToCreateMoreConnections = new CountDownLatch(1); + ExecutorService executor = Executors.newFixedThreadPool(numberOfThreads); + List<Future<Connection>> connections = new ArrayList<>(numberOfThreads); + for (short i = 0; i < numberOfThreads; i++) { + Future<Connection> future = executor.submit(() -> { + if (latchToTransitRole.getCount() <= 0) { + latchToCreateMoreConnections.await(); + } + Connection conn = createFailoverConnection(); + latchToTransitRole.countDown(); + return conn; + }); + connections.add(future); + } + + latchToTransitRole.await(); + CLUSTERS.transitClusterRole(haGroup, ClusterRole.STANDBY, ClusterRole.STANDBY); + latchToCreateMoreConnections.countDown(); + + waitFor(() -> { + for (Future<Connection> future : connections) { + if (!future.isDone()) { + return false; + } + try { + Connection conn = future.get(100, TimeUnit.MILLISECONDS); + FailoverPhoenixConnection failoverConn = (FailoverPhoenixConnection) conn; + if (!failoverConn.getWrappedConnection().isClosed()) { + return false; + } + } catch (Exception e) { + LOG.info("Got exception when getting client connection; ignored", e); + } + } + return true; + }, 100, 60_000); + } + + /** + * Test all Phoenix connections are closed when ZK is down and its role becomes STANDBY. + * + * This tests with many connections, as {@link #testAllWrappedConnectionsClosedAfterStandby()}. + * The difference is that, the ACTIVE cluster first shuts down and hence Review Comment: nit: and hence? (Comment seems to end in mid-sentence) -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
