gjacoby126 commented on code in PR #1430:
URL: https://github.com/apache/phoenix/pull/1430#discussion_r877208201
########## phoenix-core/src/it/java/org/apache/phoenix/jdbc/HighAvailabilityGroupIT.java: ##########
@@ -0,0 +1,675 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.jdbc;
+
+import static org.apache.hadoop.test.GenericTestUtils.waitFor;
+import static org.apache.phoenix.exception.SQLExceptionCode.CANNOT_ESTABLISH_CONNECTION;
+import static org.apache.phoenix.jdbc.HighAvailabilityGroup.PHOENIX_HA_GROUP_ATTR;
+import static org.apache.phoenix.jdbc.HighAvailabilityGroup.PHOENIX_HA_SHOULD_FALLBACK_WHEN_MISSING_CRR_KEY;
+import static org.apache.phoenix.jdbc.HighAvailabilityGroup.PHOENIX_HA_ZK_RETRY_MAX_KEY;
+import static org.apache.phoenix.jdbc.HighAvailabilityGroup.PHOENIX_HA_ZK_RETRY_MAX_SLEEP_MS_KEY;
+import static org.apache.phoenix.jdbc.HighAvailabilityTestingUtility.HBaseTestingUtilityPair.doTestWhenOneHBaseDown;
+import static org.apache.phoenix.jdbc.HighAvailabilityTestingUtility.HBaseTestingUtilityPair.doTestWhenOneZKDown;
+import static org.apache.phoenix.jdbc.HighAvailabilityTestingUtility.doTestBasicOperationsWithConnection;
+import static org.apache.phoenix.jdbc.HighAvailabilityTestingUtility.getHighAvailibilityGroup;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.lang3.RandomUtils;
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.curator.utils.ZKPaths;
+import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.jdbc.ClusterRoleRecord.ClusterRole;
+import org.apache.phoenix.jdbc.HighAvailabilityTestingUtility.HBaseTestingUtilityPair;
+import org.apache.phoenix.jdbc.PhoenixHAAdminTool.PhoenixHAAdminHelper;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.experimental.categories.Category;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+import org.junit.rules.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Integration test for {@link HighAvailabilityGroup} with mini clusters.
+ *
+ * @see HighAvailabilityGroupTestIT
+ */
+@SuppressWarnings("UnstableApiUsage")
+@Category(NeedsOwnMiniClusterTest.class)
+public class HighAvailabilityGroupIT {
+    private static final Logger LOG = LoggerFactory.getLogger(HighAvailabilityGroupIT.class);
+    private static final HBaseTestingUtilityPair CLUSTERS = new HBaseTestingUtilityPair();
+
+    /** Client properties to create a connection per test. */
+    private Properties clientProperties;
+    /** JDBC connection string for this test HA group. */
+    private String jdbcUrl;
+    /** Failover HA group to test. */
+    private HighAvailabilityGroup haGroup;
+    /** HA group name for this test. */
+    private String haGroupName;
+
+    @Rule
+    public final TestName testName = new TestName();
+    @Rule
+    public final Timeout globalTimeout = new Timeout(180, TimeUnit.SECONDS);
+
+    @BeforeClass
+    public static void setUpBeforeClass() throws Exception {
+        CLUSTERS.start();
+        DriverManager.registerDriver(PhoenixDriver.INSTANCE);
+    }
+
+    @AfterClass
+    public static void tearDownAfterClass() throws Exception {
+        HighAvailabilityGroup.CURATOR_CACHE.invalidateAll();
+        DriverManager.deregisterDriver(PhoenixDriver.INSTANCE);
+        CLUSTERS.close();
+    }
+
+    @Before
+    public void setup() throws Exception {
+        haGroupName = testName.getMethodName();
+        clientProperties = new Properties();
+        clientProperties.setProperty(PHOENIX_HA_GROUP_ATTR, haGroupName);
+        clientProperties.setProperty(PHOENIX_HA_ZK_RETRY_MAX_KEY, "3");
+        clientProperties.setProperty(PHOENIX_HA_ZK_RETRY_MAX_SLEEP_MS_KEY, "2000");
+        // Make the first cluster ACTIVE
+        CLUSTERS.initClusterRole(haGroupName, HighAvailabilityPolicy.FAILOVER);
+        jdbcUrl = CLUSTERS.getJdbcUrl();
+        haGroup = getHighAvailibilityGroup(jdbcUrl, clientProperties);
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        haGroup.close();
+        try {
+            PhoenixDriver.INSTANCE
+                    .getConnectionQueryServices(CLUSTERS.getUrl1(), haGroup.getProperties())
+                    .close();
+            PhoenixDriver.INSTANCE
+                    .getConnectionQueryServices(CLUSTERS.getUrl2(), haGroup.getProperties())
+                    .close();
+        } catch (Exception e) {
+            LOG.error("Failed to tear down the HA group and the CQS. Will ignore", e);
+        }
+    }
+
+    /**
+     * Test the static get() method.
+     */
+    @Test
+    public void testGet() throws Exception {
+        // Client will get the same HighAvailabilityGroup using the same information as key
+        Optional<HighAvailabilityGroup> haGroup2 = Optional.empty();
+        try {
+            haGroup2 = HighAvailabilityGroup.get(jdbcUrl, clientProperties);
+            assertTrue(haGroup2.isPresent());
+            assertSame(haGroup, haGroup2.get());
+        } finally {
+            haGroup2.ifPresent(HighAvailabilityGroup::close);
+        }
+
+        // Client will get a different HighAvailabilityGroup when the group name is different
+        String haGroupName3 = testName.getMethodName() + RandomStringUtils.randomAlphabetic(3);
+        CLUSTERS.initClusterRole(haGroupName3, HighAvailabilityPolicy.FAILOVER);
+        clientProperties.setProperty(PHOENIX_HA_GROUP_ATTR, haGroupName3);
+        Optional<HighAvailabilityGroup> haGroup3 = Optional.empty();
+        try {
+            haGroup3 = HighAvailabilityGroup.get(jdbcUrl, clientProperties);
+            assertTrue(haGroup3.isPresent());
+            assertNotSame(haGroup, haGroup3.get());
+        } finally {
+            haGroup3.ifPresent(HighAvailabilityGroup::close);
+        }
+
+        // Client will get the same HighAvailabilityGroup using the same information as key again
+        clientProperties.setProperty(PHOENIX_HA_GROUP_ATTR, haGroup.getGroupInfo().getName());
+        Optional<HighAvailabilityGroup> haGroup4 = Optional.empty();
+        try {
+            haGroup4 = HighAvailabilityGroup.get(jdbcUrl, clientProperties);
+            assertTrue(haGroup4.isPresent());
+            assertSame(haGroup, haGroup4.get());
+        } finally {
+            haGroup4.ifPresent(HighAvailabilityGroup::close);
+        }
+    }
+
+    /**
+     * Test that the HA group should see the latest version of the cluster role record.
+     */
+    @Test
+    public void testGetWithDifferentRecordVersion() throws Exception {
+        String haGroupName2 = testName.getMethodName() + RandomStringUtils.randomAlphabetic(3);
+        clientProperties.setProperty(PHOENIX_HA_GROUP_ATTR, haGroupName2);
+
+        // create cluster role records with different versions on the two ZK clusters
+        final String zpath = ZKPaths.PATH_SEPARATOR + haGroupName2;
+        ClusterRoleRecord record1 = new ClusterRoleRecord(
+                haGroupName2, HighAvailabilityPolicy.FAILOVER,
+                CLUSTERS.getUrl1(), ClusterRole.ACTIVE,
+                CLUSTERS.getUrl2(), ClusterRole.STANDBY,
+                1);
+        CLUSTERS.createCurator1().create().forPath(zpath, ClusterRoleRecord.toJson(record1));
+        ClusterRoleRecord record2 = new ClusterRoleRecord(
+                record1.getHaGroupName(), record1.getPolicy(),
+                record1.getZk1(), record1.getRole1(),
+                record1.getZk2(), record1.getRole2(),
+                record1.getVersion() + 1); // record2 is newer
+        CLUSTERS.createCurator2().create().forPath(zpath, ClusterRoleRecord.toJson(record2));
+
+        Optional<HighAvailabilityGroup> haGroup2 = Optional.empty();
+        try {
+            haGroup2 = HighAvailabilityGroup.get(jdbcUrl, clientProperties);
+            assertTrue(haGroup2.isPresent());
+            assertNotSame(haGroup2.get(), haGroup);
+            // The HA group should see the latest version when both role managers are started
+            HighAvailabilityGroup finalHaGroup = haGroup2.get();
+            waitFor(() -> record2.equals(finalHaGroup.getRoleRecord()), 100, 30_000);
+        } finally {
+            haGroup2.ifPresent(HighAvailabilityGroup::close);
+        }
+    }
+
+    /**
+     * Test that a client can get an HA group when the ACTIVE ZK cluster is down.
+     *
+     * NOTE: we cannot test with the existing HA group because {@link HighAvailabilityGroup#get} would
+     * get the cached object, which has also been initialized.
+     *
+     * The reason this works is that getting an HA group only depends on one ZK watcher connecting to a
+     * ZK cluster to get the associated cluster role record. It does not depend on both ZK clusters
+     * being up and running. It does not actually create an HBase connection either.
+     */
+    @Test
+    public void testCanGetHaGroupWhenActiveZKClusterDown() throws Exception {
+        String haGroupName2 = testName.getMethodName() + 2;
+        clientProperties.setProperty(PHOENIX_HA_GROUP_ATTR, haGroupName2);
+        CLUSTERS.initClusterRole(haGroupName2, HighAvailabilityPolicy.FAILOVER);
+
+        doTestWhenOneZKDown(CLUSTERS.getHBaseCluster1(), () -> {
+            HighAvailabilityGroup haGroup2 = null;
+            try {
+                haGroup2 = getHighAvailibilityGroup(jdbcUrl, clientProperties);
+                LOG.info("Can get the new HA group {} after both ZK clusters restart", haGroup2);
+            } finally {
+                if (haGroup2 != null) {
+                    haGroup2.close();
+                }
+            }
+        });
+    }
+
+    /**
+     * Test that a client cannot get an HA group when both ZK clusters are down.
+     *
+     * NOTE: we cannot test with the existing HA group because {@link HighAvailabilityGroup#get} would
+     * get the cached object, which has also been initialized.
+     *
+     * What if the two HBase clusters instead of the two ZK clusters are down? We may still get a new HA
+     * group because creating and initializing an HA group does not set up the HBase connection, which
+     * should indeed fail when the ACTIVE HBase cluster is down.
+     */
+    @Test
+    public void testCanNotGetHaGroupWhenTwoZKClustersDown() throws Exception {
+        String haGroupName2 = testName.getMethodName() + 2;
+        clientProperties.setProperty(PHOENIX_HA_GROUP_ATTR, haGroupName2);
+        CLUSTERS.initClusterRole(haGroupName2, HighAvailabilityPolicy.FAILOVER);
+
+        doTestWhenOneZKDown(CLUSTERS.getHBaseCluster1(), () ->
+            doTestWhenOneZKDown(CLUSTERS.getHBaseCluster2(), () -> {
+                try {
+                    HighAvailabilityGroup.get(jdbcUrl, clientProperties);
+                    fail("Should have failed because both ZK clusters were shut down!");
+                } catch (SQLException e) {
+                    LOG.info("Got expected SQLException because both ZK clusters are down", e);
+                    assertEquals(CANNOT_ESTABLISH_CONNECTION.getErrorCode(), e.getErrorCode());
+                    assertTrue(e.getCause() instanceof IOException);
+                }
+            })
+        );
+
+        HighAvailabilityGroup haGroup2 = null;
+        try {
+            haGroup2 = getHighAvailibilityGroup(jdbcUrl, clientProperties);
+            LOG.info("Can get the new HA group {} after both ZK clusters restart", haGroup2);
+        } finally {
+            if (haGroup2 != null) {
+                haGroup2.close();
+            }
+        }
+    }
+
+    @Test
+    public void testGetHaGroupFailsFastWhenBothZKClusterDownFromBeginning() {
+        String haGroupName = testName.getMethodName();
+        clientProperties.setProperty(PHOENIX_HA_GROUP_ATTR, haGroupName);
+        String badJdbcUrl = String.format("jdbc:phoenix:[%s|%s]", "127.0.0.1:0", "127.0.0.1:1");
+        LOG.info("Start testing HighAvailabilityGroup::get() when both ZK clusters are down...");
+        long startTime = System.currentTimeMillis();
+        try {
+            HighAvailabilityGroup.get(badJdbcUrl, clientProperties);
+            fail("Should always throw an exception.");
+        } catch (SQLException e) {
+            LOG.info("Got expected exception", e);
+            assertEquals(CANNOT_ESTABLISH_CONNECTION.getErrorCode(), e.getErrorCode());
+        } finally {
+            LOG.info("Stop testing HighAvailabilityGroup::get() when both ZK clusters are down...");
+            long elapsedTime = System.currentTimeMillis() - startTime;
+            long maxTime = (4 * HighAvailabilityGroup.PHOENIX_HA_ZK_CONNECTION_TIMEOUT_MS_DEFAULT);
+            assertTrue(String.format("Expected elapsed time %d to be less than %d", elapsedTime, maxTime),
+                elapsedTime <= maxTime);
+        }
+    }
+
+    /**
+     * Test that getting an HA group should fail fast if the cluster role information is not there.
+     */
+    @Test
+    public void testGetShouldFailWithoutClusterRoleData() throws SQLException {
+        String invalidHaGroupName = testName.getMethodName() + RandomStringUtils.randomAlphanumeric(3);
+        clientProperties.setProperty(PHOENIX_HA_GROUP_ATTR, invalidHaGroupName);
+        assertFalse(HighAvailabilityGroup.get(jdbcUrl, clientProperties).isPresent());
+    }
+
+    /**
+     * Test with an invalid High Availability connection string.
+     */
+    @Test
+    public void testGetShouldFailWithNonHAJdbcString() {
+        final String oldJdbcString = String.format("jdbc:phoenix:%s", CLUSTERS.getUrl1());
+        try {
+            HighAvailabilityGroup.get(oldJdbcString, clientProperties);
+            fail("Should have failed with invalid connection string '" + oldJdbcString + "'");
+        } catch (SQLException e) {
+            LOG.info("Got expected exception with invalid connection string {}", oldJdbcString, e);
+            assertEquals(SQLExceptionCode.MALFORMED_CONNECTION_URL.getErrorCode(), e.getErrorCode());
+        }
+    }
+
+    /**
+     * Test that we can connect to this HA group to get a JDBC connection.
+     */
+    @Test
+    public void testConnect() throws SQLException {
+        Connection connection = haGroup.connect(clientProperties);
+        assertNotNull(connection);
+        assertNotNull(connection.unwrap(FailoverPhoenixConnection.class));
+    }
+
+    /**
+     * Test that connecting to one cluster returns a Phoenix connection which can be wrapped.
+     */
+    @Test
+    public void testConnectToOneCluster() throws SQLException {
+        final String url = CLUSTERS.getUrl1();
+        PhoenixConnection connection = haGroup.connectToOneCluster(url, clientProperties);
+        // TODO: should we ignore the security info in url?

Review Comment:
   @dbwong - makes sense that supporting multiple-and-different users can be future work. It would be good, though, to flesh out the comment a bit.
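
For context, the TODO that this comment refers to asks whether connectToOneCluster should ignore the security portion of the url (principal/keytab) when deciding which cluster a url points at. Below is a minimal, self-contained sketch of the kind of normalization being discussed, assuming the common jdbc:phoenix:quorum:port:rootNode[:principal[:keytab]] url shape; the helper name and the example urls are hypothetical and not taken from this PR.

```java
import java.util.Arrays;

public class UrlSecurityInfoSketch {

    /**
     * Hypothetical helper (not part of the PR): drop the principal/keytab suffix so that
     * two urls which differ only in security info resolve to the same cluster key.
     */
    static String stripSecurityInfo(String jdbcUrl) {
        String[] parts = jdbcUrl.split(":");
        // Keep at most: jdbc : phoenix : quorum : port : rootNode
        int keep = Math.min(parts.length, 5);
        return String.join(":", Arrays.copyOfRange(parts, 0, keep));
    }

    public static void main(String[] args) {
        String secured = "jdbc:phoenix:zk1,zk2:2181:/hbase:user@EXAMPLE.COM:/etc/security/user.keytab";
        String plain = "jdbc:phoenix:zk1,zk2:2181:/hbase";
        // Both urls identify the same cluster once the security info is ignored.
        System.out.println(stripSecurityInfo(secured).equals(stripSecurityInfo(plain))); // true
    }
}
```

Spelling out an assumption like this in the TODO (or in the method's javadoc) would make the future multi-user work easier to pick up.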
