This is an automated email from the ASF dual-hosted git repository.
kadir pushed a commit to branch PHOENIX-7562-feature
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/PHOENIX-7562-feature by this
push:
new d63f2bcae5 PHOENIX-7719 Prewarm HAGroupStore Client (#2313)
d63f2bcae5 is described below
commit d63f2bcae553b0a02e736c415b9ad75689d160e2
Author: ritegarg <[email protected]>
AuthorDate: Fri Nov 14 15:35:47 2025 -0800
PHOENIX-7719 Prewarm HAGroupStore Client (#2313)
Co-authored-by: Ritesh Garg <[email protected]>
---
.../apache/phoenix/jdbc/HAGroupStoreClient.java | 1 -
.../org/apache/phoenix/query/QueryServices.java | 4 +
.../apache/phoenix/query/QueryServicesOptions.java | 6 +-
.../coprocessor/PhoenixRegionServerEndpoint.java | 111 ++++++++++++-
...gionServerEndpointWithConsistentFailoverIT.java | 183 ++++++++++++++++++++-
5 files changed, 290 insertions(+), 15 deletions(-)
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreClient.java b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreClient.java
index f031ee7550..60603dcf16 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreClient.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreClient.java
@@ -57,7 +57,6 @@ import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTes
 import org.apache.phoenix.thirdparty.com.google.common.base.Preconditions;
 import org.apache.phoenix.util.JDBCUtil;
 import org.apache.zookeeper.data.Stat;
-import org.joda.time.DateTime;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java
index 6e00b7499f..b3ec1cbabd 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java
@@ -389,6 +389,10 @@ public interface QueryServices extends SQLCloseable {
// Check HAGroup is Stale for mutations
public static final String HA_GROUP_STALE_FOR_MUTATION_CHECK_ENABLED =
"phoenix.ha.group.stale.for.mutation.check.enabled";
+ // Enable prewarming of HAGroupStoreClients at RegionServer startup
+ String HA_GROUP_STORE_CLIENT_PREWARM_ENABLED
+ = "phoenix.ha.group.store.client.prewarm.enabled";
+
//Enable Thread Pool Creation in CQSI to be used for HBase Client.
String CQSI_THREAD_POOL_ENABLED = "phoenix.cqsi.thread.pool.enabled";
//CQSI Thread Pool Related Configuration.
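
For illustration, not part of this commit: a minimal sketch of how the new
flag could be toggled on a Configuration before the coprocessor starts. The
class name PrewarmConfigExample is hypothetical; the constant is the one
added above.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.phoenix.query.QueryServices;

    public class PrewarmConfigExample {
      public static void main(String[] args) {
        Configuration conf = HBaseConfiguration.create();
        // Prewarming defaults to enabled; turn it off explicitly.
        conf.setBoolean(QueryServices.HA_GROUP_STORE_CLIENT_PREWARM_ENABLED, false);
        System.out.println("prewarm enabled: " + conf.getBoolean(
            QueryServices.HA_GROUP_STORE_CLIENT_PREWARM_ENABLED, true));
      }
    }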
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
index 5326b583b4..6853f5d1fa 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
@@ -125,6 +125,7 @@ import static org.apache.phoenix.query.QueryServices.PHOENIX_TTL_SERVER_SIDE_MAS
 import static org.apache.phoenix.query.QueryServices.MAX_IN_LIST_SKIP_SCAN_SIZE;
 import static org.apache.phoenix.query.QueryServices.WAL_EDIT_CODEC_ATTRIB;
 import static org.apache.phoenix.query.QueryServices.HA_GROUP_STORE_SYNC_INTERVAL_SECONDS;
+import static org.apache.phoenix.query.QueryServices.HA_GROUP_STORE_CLIENT_PREWARM_ENABLED;
import java.util.Map.Entry;
@@ -465,6 +466,7 @@ public class QueryServicesOptions {
  public static final Boolean DEFAULT_CLUSTER_ROLE_BASED_MUTATION_BLOCK_ENABLED = false;
  public static final Boolean DEFAULT_HA_GROUP_STALE_FOR_MUTATION_CHECK_ENABLED = true;
+  public static final Boolean DEFAULT_HA_GROUP_STORE_CLIENT_PREWARM_ENABLED = true;
public static final Boolean DEFAULT_CQSI_THREAD_POOL_ENABLED = false;
public static final int DEFAULT_CQSI_THREAD_POOL_KEEP_ALIVE_SECONDS = 60;
public static final int DEFAULT_CQSI_THREAD_POOL_CORE_POOL_SIZE = 25;
@@ -597,7 +599,9 @@ public class QueryServicesOptions {
        .setIfUnset(CQSI_THREAD_POOL_METRICS_ENABLED, DEFAULT_CQSI_THREAD_POOL_METRICS_ENABLED)
        .setIfUnset(REPLICATION_LOG_ROTATION_TIME_MS_KEY, DEFAULT_REPLICATION_LOG_ROTATION_TIME_MS)
        .setIfUnset(HA_GROUP_STORE_SYNC_INTERVAL_SECONDS,
-            DEFAULT_HA_GROUP_STORE_SYNC_INTERVAL_SECONDS);
+            DEFAULT_HA_GROUP_STORE_SYNC_INTERVAL_SECONDS)
+        .setIfUnset(HA_GROUP_STORE_CLIENT_PREWARM_ENABLED,
+            DEFAULT_HA_GROUP_STORE_CLIENT_PREWARM_ENABLED);
        // HBase sets this to 1, so we reset it to something more appropriate.
        // Hopefully HBase will change this, because we can't know if a user set
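
For illustration, not part of this commit: setIfUnset applies the default
only when the key is absent, which is what lets the new default of true be
overridden from hbase-site.xml. A plain-Configuration sketch of the same
semantics (SetIfUnsetSketch and its helper are hypothetical):

    import org.apache.hadoop.conf.Configuration;

    public final class SetIfUnsetSketch {
      // The default wins only when the key was never set.
      static void setIfUnset(Configuration conf, String key, boolean dflt) {
        if (conf.get(key) == null) {
          conf.setBoolean(key, dflt);
        }
      }

      public static void main(String[] args) {
        Configuration conf = new Configuration(false);
        conf.setBoolean("phoenix.ha.group.store.client.prewarm.enabled", false);
        setIfUnset(conf, "phoenix.ha.group.store.client.prewarm.enabled", true);
        // Prints false: the explicit setting is preserved.
        System.out.println(conf.getBoolean(
            "phoenix.ha.group.store.client.prewarm.enabled", true));
      }
    }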
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixRegionServerEndpoint.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixRegionServerEndpoint.java
index 8deaf473b9..8a04e6f2ad 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixRegionServerEndpoint.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixRegionServerEndpoint.java
@@ -23,7 +23,12 @@ import com.google.protobuf.RpcController;
import com.google.protobuf.Service;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
@@ -37,6 +42,8 @@ import org.apache.phoenix.coprocessorclient.metrics.MetricsPhoenixCoprocessorSou
import org.apache.phoenix.jdbc.ClusterRoleRecord;
import org.apache.phoenix.jdbc.HAGroupStoreManager;
import org.apache.phoenix.protobuf.ProtobufUtil;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.replication.reader.ReplicationLogReplayService;
import org.apache.phoenix.util.ClientUtil;
import org.apache.phoenix.util.SchemaUtil;
@@ -44,8 +51,6 @@ import org.apache.phoenix.util.ServerUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static org.apache.phoenix.jdbc.PhoenixHAAdmin.getLocalZkUrl;
-
/**
 * This is first implementation of RegionServer coprocessor introduced by Phoenix.
*/
@@ -55,14 +60,22 @@ public class PhoenixRegionServerEndpoint
private static final Logger LOGGER =
LoggerFactory.getLogger(PhoenixRegionServerEndpoint.class);
private MetricsMetadataCachingSource metricsSource;
protected Configuration conf;
- private String zkUrl;
+ private ExecutorService prewarmExecutor;
@Override
public void start(CoprocessorEnvironment env) throws IOException {
this.conf = env.getConfiguration();
this.metricsSource = MetricsPhoenixCoprocessorSourceFactory
.getInstance().getMetadataCachingSource();
- this.zkUrl = getLocalZkUrl(conf);
+ // Start async prewarming of HAGroupStoreClients if enabled
+ if (conf.getBoolean(
+ QueryServices.HA_GROUP_STORE_CLIENT_PREWARM_ENABLED,
+ QueryServicesOptions
+ .DEFAULT_HA_GROUP_STORE_CLIENT_PREWARM_ENABLED)) {
+ startHAGroupStoreClientPrewarming();
+ } else {
+ LOGGER.info("HAGroupStoreClient prewarming is disabled");
+ }
// Start replication log replay
ReplicationLogReplayService.getInstance(conf).start();
}
@@ -73,6 +86,10 @@ public class PhoenixRegionServerEndpoint
ReplicationLogReplayService.getInstance(conf).stop();
RegionServerCoprocessor.super.stop(env);
ServerUtil.ConnectionFactory.shutdown();
+ // Stop prewarming executor
+ if (prewarmExecutor != null) {
+ prewarmExecutor.shutdownNow();
+ }
}
@Override
@@ -195,4 +212,90 @@ public class PhoenixRegionServerEndpoint
return ServerMetadataCacheImpl.getInstance(conf);
}
+  /**
+   * Prewarms HAGroupStoreClients in a background thread with retries.
+   * Initializes all HA group clients asynchronously at startup.
+   * <p>
+   * Phase 1: Retry indefinitely until HAGroupStoreManager is initialized
+   * and the HAGroupNames are retrieved. If the SYSTEM.HA_GROUP table region
+   * is not yet ready, manager.getHAGroupNames() throws an exception, so we
+   * retry until the region is ready and then retrieve the HAGroupNames for
+   * prewarming.
+   * <p>
+   * Phase 2: Prewarm individual HAGroupStoreClients with retries.
+   * If an HAGroupStoreClient is not yet ready/initialized,
+   * manager.getClusterRoleRecord(haGroup) throws an exception, so we retry
+   * until the HAGroupStoreClient is ready/initialized.
+   */
+  private void startHAGroupStoreClientPrewarming() {
+    prewarmExecutor = Executors.newSingleThreadExecutor(r -> {
+      Thread t = new Thread(r, "HAGroupStoreClient-Prewarm");
+      t.setDaemon(true);
+      return t;
+    });
+
+    prewarmExecutor.submit(() -> {
+      HAGroupStoreManager manager = null;
+      List<String> pending = null;
+      // Phase 1: Retry indefinitely until HAGroupStoreManager is initialized
+      // and HAGroupNames are retrieved.
+      while (pending == null) {
+        try {
+          manager = HAGroupStoreManager.getInstance(conf);
+          if (manager != null) {
+            pending = new ArrayList<>(manager.getHAGroupNames());
+            LOGGER.info("Starting prewarming for {} HAGroupStoreClients",
+                pending.size());
+          } else {
+            LOGGER.debug("HAGroupStoreManager is null, retrying in 2s...");
+            Thread.sleep(2000);
+          }
+        } catch (InterruptedException e) {
+          LOGGER.info("HAGroupStoreClient prewarming interrupted during "
+              + "initialization");
+          Thread.currentThread().interrupt();
+          return;
+        } catch (Exception e) {
+          LOGGER.debug("Failed to initialize HAGroupStoreManager, retrying in "
+              + "2s...", e);
+          try {
+            Thread.sleep(2000);
+          } catch (InterruptedException ie) {
+            LOGGER.info("HAGroupStoreClient prewarming interrupted");
+            Thread.currentThread().interrupt();
+            return;
+          }
+        }
+      }
+
+      // Phase 2: Prewarm individual HAGroupStoreClients with retry
+      try {
+        while (!pending.isEmpty()) {
+          Iterator<String> iterator = pending.iterator();
+          while (iterator.hasNext()) {
+            String haGroup = iterator.next();
+            try {
+              manager.getClusterRoleRecord(haGroup);
+              iterator.remove();
+              LOGGER.info("Prewarmed HAGroupStoreClient: {} ({} remaining)",
+                  haGroup, pending.size());
+            } catch (Exception e) {
+              LOGGER.debug("Failed to prewarm {}, will retry", haGroup, e);
+            }
+          }
+
+          if (!pending.isEmpty()) {
+            Thread.sleep(2000);
+          }
+        }
+
+        LOGGER.info("Completed prewarming all HAGroupStoreClients");
+      } catch (InterruptedException e) {
+        LOGGER.info("HAGroupStoreClient prewarming interrupted during warmup");
+        Thread.currentThread().interrupt();
+      }
+    });
+  }
+
}
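
For illustration, not part of this commit: the two phases above are an
instance of a retry-until-ready pattern. A stripped-down sketch, with
fetchNames() and warmOne() as hypothetical stand-ins for getHAGroupNames()
and getClusterRoleRecord():

    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;

    final class RetryUntilReadySketch {
      // Phase 1: block until the name list can be fetched at all.
      static List<String> fetchNamesWithRetry() throws InterruptedException {
        while (true) {
          try {
            return new ArrayList<>(fetchNames()); // throws until the region is ready
          } catch (Exception e) {
            Thread.sleep(2000); // back off, then retry indefinitely
          }
        }
      }

      // Phase 2: sweep the pending list until every entry warms up.
      static void warmAll(List<String> pending) throws InterruptedException {
        while (!pending.isEmpty()) {
          Iterator<String> it = pending.iterator();
          while (it.hasNext()) {
            try {
              warmOne(it.next()); // throws until the client is initialized
              it.remove();
            } catch (Exception e) {
              // keep it in the list; retried on the next sweep
            }
          }
          if (!pending.isEmpty()) {
            Thread.sleep(2000);
          }
        }
      }

      // Hypothetical stand-ins for the Phoenix calls in the patch.
      static List<String> fetchNames() { return new ArrayList<>(); }
      static void warmOne(String name) { }
    }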
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/PhoenixRegionServerEndpointWithConsistentFailoverIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/PhoenixRegionServerEndpointWithConsistentFailoverIT.java
index fbd8b0c921..6ab1cde5d2 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/PhoenixRegionServerEndpointWithConsistentFailoverIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/PhoenixRegionServerEndpointWithConsistentFailoverIT.java
@@ -18,8 +18,15 @@
package org.apache.phoenix.end2end;
import com.google.protobuf.RpcCallback;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessor;
+import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessorEnvironment;
import org.apache.hadoop.hbase.ipc.ServerRpcController;
+import org.apache.hadoop.hbase.metrics.MetricRegistry;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.OnlineRegions;
import org.apache.hadoop.hbase.util.ByteStringer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.coprocessor.PhoenixRegionServerEndpoint;
@@ -31,20 +38,24 @@ import org.apache.phoenix.jdbc.HighAvailabilityTestingUtility;
import org.apache.phoenix.jdbc.PhoenixHAAdmin;
import org.apache.phoenix.jdbc.HAGroupStoreRecord.HAGroupState;
import org.apache.phoenix.query.BaseTest;
+import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
import org.apache.phoenix.util.HAGroupStoreTestUtil;
import org.apache.phoenix.util.ReadOnlyProps;
+import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.io.IOException;
import java.util.Map;
import static org.apache.phoenix.jdbc.HAGroupStoreClient.ZK_CONSISTENT_HA_GROUP_RECORD_NAMESPACE;
-import static org.apache.phoenix.jdbc.PhoenixHAAdmin.getLocalZkUrl;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
@@ -52,9 +63,11 @@ import static org.junit.Assert.assertNotNull;
@Category({NeedsOwnMiniClusterTest.class })
public class PhoenixRegionServerEndpointWithConsistentFailoverIT extends BaseTest {
+  private static final Logger LOGGER
+      = LoggerFactory.getLogger(PhoenixRegionServerEndpointWithConsistentFailoverIT.class);
private static final Long ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS = 5000L;
-  private static final HighAvailabilityTestingUtility.HBaseTestingUtilityPair CLUSTERS = new HighAvailabilityTestingUtility.HBaseTestingUtilityPair();
-  private String zkUrl;
+  private static final HighAvailabilityTestingUtility.HBaseTestingUtilityPair CLUSTERS
+      = new HighAvailabilityTestingUtility.HBaseTestingUtilityPair();
private String peerZkUrl;
@Rule
@@ -64,23 +77,110 @@ public class PhoenixRegionServerEndpointWithConsistentFailoverIT extends BaseTes
public static synchronized void doSetup() throws Exception {
Map<String, String> props = Maps.newHashMapWithExpectedSize(1);
setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+    // Set prewarm enabled to true for cluster 1 and false for cluster 2 for comparison.
+ CLUSTERS.getHBaseCluster1().getConfiguration().setBoolean(
+ QueryServices.HA_GROUP_STORE_CLIENT_PREWARM_ENABLED, true);
+ CLUSTERS.getHBaseCluster2().getConfiguration().setBoolean(
+ QueryServices.HA_GROUP_STORE_CLIENT_PREWARM_ENABLED, false);
CLUSTERS.start();
}
+ @AfterClass
+ public static synchronized void doTeardown() throws Exception {
+ CLUSTERS.close();
+ }
+
@Before
public void setUp() throws Exception {
- zkUrl = getLocalZkUrl(config);
peerZkUrl = CLUSTERS.getZkUrl2();
-    HAGroupStoreTestUtil.upsertHAGroupRecordInSystemTable(testName.getMethodName(), zkUrl, peerZkUrl,
+    HAGroupStoreTestUtil.upsertHAGroupRecordInSystemTable(testName.getMethodName(), CLUSTERS.getZkUrl1(), CLUSTERS.getZkUrl2(),
        CLUSTERS.getMasterAddress1(), CLUSTERS.getMasterAddress2(),
        ClusterRoleRecord.ClusterRole.ACTIVE, ClusterRoleRecord.ClusterRole.STANDBY,
        null);
+
+    HAGroupStoreTestUtil.upsertHAGroupRecordInSystemTable(testName.getMethodName(), CLUSTERS.getZkUrl2(), CLUSTERS.getZkUrl1(),
+        CLUSTERS.getMasterAddress2(), CLUSTERS.getMasterAddress1(),
+        ClusterRoleRecord.ClusterRole.STANDBY, ClusterRoleRecord.ClusterRole.ACTIVE,
+        null);
+
+ }
+
+ @Test
+ public void testHAGroupStoreClientPrewarming() throws Exception {
+    // Use a different HA group name to avoid interference with setUp() method
+ String haGroupName = testName.getMethodName() + "_test";
+
+    // There is a race between when the RegionServerEndpoint coproc starts and
+    // when the HAGroupStoreRecord is inserted into the system table.
+    // For predictable results, insert the HAGroupStoreRecord first and then
+    // start the RegionServerEndpoint coproc again, so the coproc always starts
+    // after the record exists in the system table.
+    HAGroupStoreTestUtil.upsertHAGroupRecordInSystemTable(haGroupName, CLUSTERS.getZkUrl1(), CLUSTERS.getZkUrl2(),
+        CLUSTERS.getMasterAddress1(), CLUSTERS.getMasterAddress2(),
+        ClusterRoleRecord.ClusterRole.ACTIVE, ClusterRoleRecord.ClusterRole.STANDBY,
+        null);
+
+    HAGroupStoreTestUtil.upsertHAGroupRecordInSystemTable(haGroupName, CLUSTERS.getZkUrl1(), CLUSTERS.getZkUrl2(),
+        CLUSTERS.getMasterAddress1(), CLUSTERS.getMasterAddress2(),
+        ClusterRoleRecord.ClusterRole.ACTIVE, ClusterRoleRecord.ClusterRole.STANDBY,
+        CLUSTERS.getZkUrl2());
+
+ // Get RegionServer instances from both clusters
+    HRegionServer regionServer1 = CLUSTERS.getHBaseCluster1().getHBaseCluster().getRegionServer(0);
+    PhoenixRegionServerEndpoint coprocessor1 = getPhoenixRegionServerEndpoint(regionServer1);
+
+    // Start the RegionServerEndpoint Coproc for cluster 1
+    coprocessor1.start(getTestCoprocessorEnvironment(CLUSTERS.getHBaseCluster1().getConfiguration()));
+
+    HRegionServer regionServer2 = CLUSTERS.getHBaseCluster2().getHBaseCluster().getRegionServer(0);
+    PhoenixRegionServerEndpoint coprocessor2 = getPhoenixRegionServerEndpoint(regionServer2);
+
+    // Start the RegionServerEndpoint Coproc for cluster 2
+    coprocessor2.start(getTestCoprocessorEnvironment(CLUSTERS.getHBaseCluster2().getConfiguration()));
+
+    // Wait for prewarming to complete on cluster 1 (cluster 2 won't prewarm)
+ Thread.sleep(5000);
+
+ // Expected records for each cluster
+    ClusterRoleRecord expectedRecord1 = buildExpectedClusterRoleRecord(haGroupName,
+        ClusterRoleRecord.ClusterRole.ACTIVE, ClusterRoleRecord.ClusterRole.UNKNOWN);
+    ClusterRoleRecord expectedRecord2 = buildExpectedClusterRoleRecord(haGroupName,
+        ClusterRoleRecord.ClusterRole.ACTIVE, ClusterRoleRecord.ClusterRole.STANDBY);
+
+ // Test Cluster 1 WITH prewarming
+ ServerRpcController controller1 = new ServerRpcController();
+ long startTimeCluster1 = System.currentTimeMillis();
+ executeGetClusterRoleRecordAndVerify(coprocessor1, controller1,
+ haGroupName, expectedRecord1, false);
+ long timeCluster1 = System.currentTimeMillis() - startTimeCluster1;
+ LOGGER.info("Cluster 1 WITH prewarming (after restart, prewarmed at
startup): {} ms for {}",
+ timeCluster1, CLUSTERS.getZkUrl1());
+
+ // Test Cluster 2 WITHOUT prewarming
+ ServerRpcController controller2 = new ServerRpcController();
+ long startTimeCluster2 = System.currentTimeMillis();
+ executeGetClusterRoleRecordAndVerify(coprocessor2, controller2,
+ haGroupName, expectedRecord2, false);
+ long timeCluster2 = System.currentTimeMillis() - startTimeCluster2;
+ LOGGER.info("Cluster 2 WITHOUT prewarming (after restart, cold start):
{} ms for {}",
+ timeCluster2, CLUSTERS.getZkUrl2());
+
+ // Compare performance
+ LOGGER.info("Performance comparison: Cluster 1 (prewarmed) took {} ms,
" +
+ "Cluster 2 (not prewarmed) took {} ms",
+ timeCluster1, timeCluster2);
+ LOGGER.info("Performance improvement: {} ms faster with prewarming",
+ (timeCluster2 - timeCluster1));
+
+ // Prewarmed cluster should be faster than non-prewarmed cluster
+    assert(timeCluster1 < timeCluster2) :
+        String.format("Prewarmed cluster (cluster 1: %d ms) should be faster than " +
+            "non-prewarmed cluster (cluster 2: %d ms)", timeCluster1, timeCluster2);
}
@Test
public void testGetClusterRoleRecordAndInvalidate() throws Exception {
String haGroupName = testName.getMethodName();
-    HRegionServer regionServer = utility.getMiniHBaseCluster().getRegionServer(0);
+    HRegionServer regionServer = CLUSTERS.getHBaseCluster1().getHBaseCluster().getRegionServer(0);
    PhoenixRegionServerEndpoint coprocessor = getPhoenixRegionServerEndpoint(regionServer);
assertNotNull(coprocessor);
ServerRpcController controller = new ServerRpcController();
@@ -98,19 +198,19 @@ public class PhoenixRegionServerEndpointWithConsistentFailoverIT extends BaseTes
executeGetClusterRoleRecordAndVerify(coprocessor, controller,
haGroupName, expectedRecord, false);
// Delete the HAGroupStoreRecord from ZK
-    try (PhoenixHAAdmin haAdmin = new PhoenixHAAdmin(config, ZK_CONSISTENT_HA_GROUP_RECORD_NAMESPACE)) {
+    try (PhoenixHAAdmin haAdmin = new PhoenixHAAdmin(CLUSTERS.getHBaseCluster1().getConfiguration(), ZK_CONSISTENT_HA_GROUP_RECORD_NAMESPACE)) {
haAdmin.deleteHAGroupStoreRecordInZooKeeper(haGroupName);
}
Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
// Delete the row from System Table
-    HAGroupStoreTestUtil.deleteHAGroupRecordInSystemTable(haGroupName, zkUrl);
+    HAGroupStoreTestUtil.deleteHAGroupRecordInSystemTable(haGroupName, CLUSTERS.getZkUrl1());
    // Expect exception when getting ClusterRoleRecord because the HAGroupStoreRecord is not found in ZK
controller = new ServerRpcController();
executeGetClusterRoleRecordAndVerify(coprocessor, controller,
haGroupName, expectedRecord, true);
// Update the row
-    HAGroupStoreTestUtil.upsertHAGroupRecordInSystemTable(testName.getMethodName(), zkUrl, peerZkUrl,
+    HAGroupStoreTestUtil.upsertHAGroupRecordInSystemTable(testName.getMethodName(), CLUSTERS.getZkUrl1(), peerZkUrl,
CLUSTERS.getMasterAddress1(), CLUSTERS.getMasterAddress2(),
ClusterRoleRecord.ClusterRole.ACTIVE_TO_STANDBY,
ClusterRoleRecord.ClusterRole.STANDBY, null);
@@ -170,4 +270,69 @@ public class PhoenixRegionServerEndpointWithConsistentFailoverIT extends BaseTes
requestBuilder.setHaGroupName(ByteStringer.wrap(Bytes.toBytes(haGroupName)));
return requestBuilder.build();
}
+
+  private RegionServerCoprocessorEnvironment getTestCoprocessorEnvironment(Configuration conf) {
+ return new RegionServerCoprocessorEnvironment() {
+
+ @Override
+ public int getVersion() {
+ return 0;
+ }
+
+ @Override
+ public String getHBaseVersion() {
+ return "";
+ }
+
+ @Override
+ public RegionServerCoprocessor getInstance() {
+ return null;
+ }
+
+ @Override
+ public int getPriority() {
+ return 0;
+ }
+
+ @Override
+ public int getLoadSequence() {
+ return 0;
+ }
+
+ @Override
+ public Configuration getConfiguration() {
+ return conf;
+ }
+
+ @Override
+ public ClassLoader getClassLoader() {
+ return null;
+ }
+
+ @Override
+ public ServerName getServerName() {
+ return null;
+ }
+
+ @Override
+ public OnlineRegions getOnlineRegions() {
+ return null;
+ }
+
+ @Override
+ public Connection getConnection() {
+ return null;
+ }
+
+ @Override
+      public Connection createConnection(Configuration conf) throws IOException {
+ return null;
+ }
+
+ @Override
+ public MetricRegistry getMetricRegistryForRegionServer() {
+ return null;
+ }
+ };
+ }
}