This is an automated email from the ASF dual-hosted git repository.
tkhurana pushed a commit to branch PHOENIX-7562-feature-new
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/PHOENIX-7562-feature-new by
this push:
new 2fb17d8b37 PHOENIX-7566 Returning waitTime as part of state transition
response (#2314) (#2350)
2fb17d8b37 is described below
commit 2fb17d8b3733661cf21644c3b706cd4a341aa96b
Author: Lokesh Khurana <[email protected]>
AuthorDate: Tue Jan 20 14:58:46 2026 -0800
PHOENIX-7566 Returning waitTime as part of state transition response
(#2314) (#2350)
Co-authored-by: ritegarg <[email protected]>
---
.../apache/phoenix/jdbc/HAGroupStoreClient.java | 118 +++++++++++----------
.../apache/phoenix/jdbc/HAGroupStoreManager.java | 41 ++++---
.../phoenix/hbase/index/IndexRegionObserver.java | 19 ++--
.../apache/phoenix/jdbc/HAGroupStoreClientIT.java | 29 ++---
.../apache/phoenix/jdbc/HAGroupStoreManagerIT.java | 39 +++----
5 files changed, 134 insertions(+), 112 deletions(-)
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreClient.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreClient.java
index 957c803f31..83532a7447 100644
---
a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreClient.java
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreClient.java
@@ -291,30 +291,32 @@ public class HAGroupStoreClient implements Closeable {
/**
* Set the HA group status for the specified HA group name. Checks if the
status is needed to be
- * updated based on logic in isUpdateNeeded function.
+ * updated based on logic in validateTransitionAndGetWaitTime function.
* @param haGroupState the HA group state to set
* @throws IOException if the client is not
healthy or the operation
* fails
* @throws StaleHAGroupStoreRecordVersionException if the version is stale
* @throws InvalidClusterRoleTransitionException when transition is not
valid
*/
- public void setHAGroupStatusIfNeeded(HAGroupStoreRecord.HAGroupState
haGroupState)
+ public long setHAGroupStatusIfNeeded(HAGroupStoreRecord.HAGroupState
haGroupState)
throws IOException, InvalidClusterRoleTransitionException, SQLException,
StaleHAGroupStoreRecordVersionException {
- setHAGroupStatusIfNeeded(haGroupState, null);
+ return setHAGroupStatusIfNeeded(haGroupState, null);
}
/**
* Set the HA group status for the specified HA group name. Checks if the
status is needed to be
- * updated based on logic in isUpdateNeeded function.
+ * updated based on logic in validateTransitionAndGetWaitTime function.
* @param haGroupState the HA group state to set
* @param lastSyncTimeInMsNullable the last sync time in milliseconds, can
be null if not known.
* @throws IOException if the client is not
healthy or the operation
* fails
* @throws StaleHAGroupStoreRecordVersionException if the version is stale
* @throws InvalidClusterRoleTransitionException when transition is not
valid
+ * @return the remaining wait time in milliseconds before the transition is allowed; 0 means
+ * the HAGroupStoreRecord was updated.
*/
- public void setHAGroupStatusIfNeeded(HAGroupStoreRecord.HAGroupState
haGroupState,
+ public long setHAGroupStatusIfNeeded(HAGroupStoreRecord.HAGroupState
haGroupState,
Long lastSyncTimeInMsNullable) throws IOException,
InvalidClusterRoleTransitionException,
SQLException, StaleHAGroupStoreRecordVersionException {
Preconditions.checkNotNull(haGroupState, "haGroupState cannot be null");
@@ -330,58 +332,58 @@ public class HAGroupStoreClient implements Closeable {
+ "cannot update HAGroupStoreRecord, the record should be initialized "
+ "in System Table first" + haGroupName);
}
- if (
- isUpdateNeeded(currentHAGroupStoreRecord.getHAGroupState(),
- currentHAGroupStoreRecordStat.getMtime(), haGroupState)
- ) {
- // We maintain last sync time as the last time cluster was in sync state.
- // If state changes from ACTIVE_IN_SYNC to ACTIVE_NOT_IN_SYNC, record
that time
- // Once state changes back to ACTIVE_IN_SYNC or the role is
- // NOT ACTIVE or ACTIVE_TO_STANDBY
- // set the time to null to mark that we are current(or we don't have any
reader).
- long lastSyncTimeInMs = lastSyncTimeInMsNullable != null
- ? lastSyncTimeInMsNullable
- : currentHAGroupStoreRecord.getLastSyncStateTimeInMs();
- ClusterRole clusterRole = haGroupState.getClusterRole();
- if (
- currentHAGroupStoreRecord.getHAGroupState()
- == HAGroupStoreRecord.HAGroupState.ACTIVE_IN_SYNC
- && haGroupState == HAGroupStoreRecord.HAGroupState.ACTIVE_NOT_IN_SYNC
- ) {
- // We record the last round timestamp by subtracting the rotationTime
and then
- // taking the beginning of last round (floor) by first integer
division and then
- // multiplying again.
- lastSyncTimeInMs =
- ((System.currentTimeMillis() - rotationTimeMs) / rotationTimeMs) *
rotationTimeMs;
- }
- HAGroupStoreRecord newHAGroupStoreRecord =
- new HAGroupStoreRecord(currentHAGroupStoreRecord.getProtocolVersion(),
- currentHAGroupStoreRecord.getHaGroupName(), haGroupState,
lastSyncTimeInMs,
- currentHAGroupStoreRecord.getPolicy(),
currentHAGroupStoreRecord.getPeerZKUrl(),
- currentHAGroupStoreRecord.getClusterUrl(),
currentHAGroupStoreRecord.getPeerClusterUrl(),
- currentHAGroupStoreRecord.getAdminCRRVersion());
- phoenixHaAdmin.updateHAGroupStoreRecordInZooKeeper(haGroupName,
newHAGroupStoreRecord,
- currentHAGroupStoreRecordStat.getVersion());
- // If cluster role is changing, if so, we update,
- // the system table on best effort basis.
- // We also have a periodic job which syncs the ZK
- // state with System Table periodically.
- if (currentHAGroupStoreRecord.getClusterRole() != clusterRole) {
- HAGroupStoreRecord peerZkRecord = getHAGroupStoreRecordFromPeer();
- ClusterRoleRecord.ClusterRole peerClusterRole = peerZkRecord != null
- ? peerZkRecord.getClusterRole()
- : ClusterRoleRecord.ClusterRole.UNKNOWN;
- SystemTableHAGroupRecord systemTableRecord = new
SystemTableHAGroupRecord(
- HighAvailabilityPolicy.valueOf(newHAGroupStoreRecord.getPolicy()),
clusterRole,
- peerClusterRole, newHAGroupStoreRecord.getClusterUrl(),
- newHAGroupStoreRecord.getPeerClusterUrl(), this.zkUrl,
- newHAGroupStoreRecord.getPeerZKUrl(),
newHAGroupStoreRecord.getAdminCRRVersion());
- updateSystemTableHAGroupRecordSilently(haGroupName, systemTableRecord);
- }
- } else {
+ long stateTransitionWaitTime =
+
validateTransitionAndGetWaitTime(currentHAGroupStoreRecord.getHAGroupState(),
+ currentHAGroupStoreRecordStat.getMtime(), haGroupState);
+ if (stateTransitionWaitTime > 0) {
LOGGER.info("Not updating HAGroupStoreRecord for HA group {} with state
{}", haGroupName,
haGroupState);
+ return stateTransitionWaitTime;
+ }
+ // We maintain last sync time as the last time cluster was in sync state.
+ // If state changes from ACTIVE_IN_SYNC to ACTIVE_NOT_IN_SYNC, record that
time
+ // Once state changes back to ACTIVE_IN_SYNC or the role is
+ // NOT ACTIVE or ACTIVE_TO_STANDBY
+ // set the time to null to mark that we are current(or we don't have any
reader).
+ long lastSyncTimeInMs = lastSyncTimeInMsNullable != null
+ ? lastSyncTimeInMsNullable
+ : currentHAGroupStoreRecord.getLastSyncStateTimeInMs();
+ ClusterRole clusterRole = haGroupState.getClusterRole();
+ if (
+ currentHAGroupStoreRecord.getHAGroupState() ==
HAGroupStoreRecord.HAGroupState.ACTIVE_IN_SYNC
+ && haGroupState == HAGroupStoreRecord.HAGroupState.ACTIVE_NOT_IN_SYNC
+ ) {
+ // We record the last round timestamp by subtracting the rotationTime
and then
+ // taking the beginning of last round (floor) by first integer
+ // division and then multiplying again.
+ lastSyncTimeInMs =
+ ((System.currentTimeMillis() - rotationTimeMs) / rotationTimeMs) *
rotationTimeMs;
+ }
+ HAGroupStoreRecord newHAGroupStoreRecord =
+ new HAGroupStoreRecord(currentHAGroupStoreRecord.getProtocolVersion(),
+ currentHAGroupStoreRecord.getHaGroupName(), haGroupState,
lastSyncTimeInMs,
+ currentHAGroupStoreRecord.getPolicy(),
currentHAGroupStoreRecord.getPeerZKUrl(),
+ currentHAGroupStoreRecord.getClusterUrl(),
currentHAGroupStoreRecord.getPeerClusterUrl(),
+ currentHAGroupStoreRecord.getAdminCRRVersion());
+ phoenixHaAdmin.updateHAGroupStoreRecordInZooKeeper(haGroupName,
newHAGroupStoreRecord,
+ currentHAGroupStoreRecordStat.getVersion());
+ // If the cluster role is changing, we also update
+ // the system table on a best-effort basis.
+ // A periodic job additionally syncs the ZK
+ // state with the System Table.
+ if (currentHAGroupStoreRecord.getClusterRole() != clusterRole) {
+ HAGroupStoreRecord peerZkRecord = getHAGroupStoreRecordFromPeer();
+ ClusterRoleRecord.ClusterRole peerClusterRole = peerZkRecord != null
+ ? peerZkRecord.getClusterRole()
+ : ClusterRoleRecord.ClusterRole.UNKNOWN;
+ SystemTableHAGroupRecord systemTableRecord = new
SystemTableHAGroupRecord(
+ HighAvailabilityPolicy.valueOf(newHAGroupStoreRecord.getPolicy()),
clusterRole,
+ peerClusterRole, newHAGroupStoreRecord.getClusterUrl(),
+ newHAGroupStoreRecord.getPeerClusterUrl(), this.zkUrl,
newHAGroupStoreRecord.getPeerZKUrl(),
+ newHAGroupStoreRecord.getAdminCRRVersion());
+ updateSystemTableHAGroupRecordSilently(haGroupName, systemTableRecord);
}
+ return 0L;
}
/**
@@ -986,10 +988,11 @@ public class HAGroupStoreClient implements Closeable {
* @param currentHAGroupState the current HAGroupState of the
HAGroupStoreRecord
* @param currentHAGroupStoreRecordMtime the last modified time of the
current HAGroupStoreRecord
* @param newHAGroupState the cluster state to check
- * @return true if the HAGroupStoreRecord needs to be updated, false
otherwise
+ * @return the remaining wait time in milliseconds before the transition may be applied; 0
+ * means the transition can be applied immediately.
* @throws InvalidClusterRoleTransitionException if the cluster role
transition is invalid
*/
- private boolean isUpdateNeeded(HAGroupStoreRecord.HAGroupState
currentHAGroupState,
+ private long
validateTransitionAndGetWaitTime(HAGroupStoreRecord.HAGroupState
currentHAGroupState,
long currentHAGroupStoreRecordMtime, HAGroupStoreRecord.HAGroupState
newHAGroupState)
throws InvalidClusterRoleTransitionException {
long waitTime = 0L;
@@ -1006,7 +1009,8 @@ public class HAGroupStoreClient implements Closeable {
throw new InvalidClusterRoleTransitionException(
"Cannot transition from " + currentHAGroupState + " to " +
newHAGroupState);
}
- return ((System.currentTimeMillis() - currentHAGroupStoreRecordMtime) >
waitTime);
+ long remainingTime = currentHAGroupStoreRecordMtime + waitTime -
System.currentTimeMillis();
+ return Math.max(0, remainingTime);
}
// ========== HA Group State Change Subscription Methods ==========
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreManager.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreManager.java
index a51f730de1..3f2524a35d 100644
---
a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreManager.java
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreManager.java
@@ -304,6 +304,8 @@ public class HAGroupStoreManager {
/**
* Sets the HAGroupStoreRecord to StoreAndForward mode in local cluster.
* @param haGroupName name of the HA group
+ * @return the remaining wait time in milliseconds before the state transition is allowed; 0
+ * means the transition was applied.
* @throws StaleHAGroupStoreRecordVersionException if the cached version is
invalid, the state
* might have been updated
by some other RS, check
* the state again and retry
if the use case still
@@ -313,16 +315,19 @@ public class HAGroupStoreManager {
* state again and retry if
the use case still
* needs it.
*/
- public void setHAGroupStatusToStoreAndForward(final String haGroupName)
throws IOException,
+ public long setHAGroupStatusToStoreAndForward(final String haGroupName)
throws IOException,
StaleHAGroupStoreRecordVersionException,
InvalidClusterRoleTransitionException, SQLException {
HAGroupStoreClient haGroupStoreClient =
getHAGroupStoreClientAndSetupFailoverManagement(haGroupName);
-
haGroupStoreClient.setHAGroupStatusIfNeeded(HAGroupStoreRecord.HAGroupState.ACTIVE_NOT_IN_SYNC);
+ return haGroupStoreClient
+
.setHAGroupStatusIfNeeded(HAGroupStoreRecord.HAGroupState.ACTIVE_NOT_IN_SYNC);
}
/**
* Sets the HAGroupStoreRecord to Sync mode in local cluster.
* @param haGroupName name of the HA group
+ * @return the remaining wait time in milliseconds before the state transition is allowed; 0
+ * means the transition was applied.
* @throws IOException when HAGroupStoreClient
is not healthy.
* @throws StaleHAGroupStoreRecordVersionException if the cached version is
invalid, the state
* might have been updated
by some other RS, check
@@ -333,7 +338,7 @@ public class HAGroupStoreManager {
* state again and retry if
the use case still
* needs it.
*/
- public void setHAGroupStatusToSync(final String haGroupName) throws
IOException,
+ public long setHAGroupStatusToSync(final String haGroupName) throws
IOException,
StaleHAGroupStoreRecordVersionException,
InvalidClusterRoleTransitionException, SQLException {
HAGroupStoreClient haGroupStoreClient =
getHAGroupStoreClientAndSetupFailoverManagement(haGroupName);
@@ -343,7 +348,7 @@ public class HAGroupStoreManager {
haGroupStoreRecord.getHAGroupState() ==
HAGroupState.ACTIVE_NOT_IN_SYNC_TO_STANDBY
? ACTIVE_IN_SYNC_TO_STANDBY
: ACTIVE_IN_SYNC;
- haGroupStoreClient.setHAGroupStatusIfNeeded(targetHAGroupState);
+ return haGroupStoreClient.setHAGroupStatusIfNeeded(targetHAGroupState);
} else {
throw new IOException("Current HAGroupStoreRecord is null for HA group:
" + haGroupName);
}
@@ -354,6 +359,8 @@ public class HAGroupStoreManager {
* Checks current state and transitions to: - ACTIVE_IN_SYNC_TO_STANDBY if
currently
* ACTIVE_IN_SYNC - ACTIVE_NOT_IN_SYNC_TO_STANDBY if currently
ACTIVE_NOT_IN_SYNC
* @param haGroupName name of the HA group
+ * @return the remaining wait time in milliseconds before the state transition is allowed; 0
+ * means the transition was applied.
* @throws IOException when HAGroupStoreClient
is not healthy.
* @throws StaleHAGroupStoreRecordVersionException when the version is
stale, the state might have
* been updated by some
other RS, check the state
@@ -365,7 +372,7 @@ public class HAGroupStoreManager {
* @throws SQLException when there is an error
with the database
* operation
*/
- public void initiateFailoverOnActiveCluster(final String haGroupName) throws
IOException,
+ public long initiateFailoverOnActiveCluster(final String haGroupName) throws
IOException,
StaleHAGroupStoreRecordVersionException,
InvalidClusterRoleTransitionException, SQLException {
HAGroupStoreClient haGroupStoreClient =
getHAGroupStoreClientAndSetupFailoverManagement(haGroupName);
@@ -389,13 +396,15 @@ public class HAGroupStoreManager {
+ currentState + ". Cluster must be in ACTIVE_IN_SYNC or
ACTIVE_NOT_IN_SYNC state.");
}
- haGroupStoreClient.setHAGroupStatusIfNeeded(targetState);
+ return haGroupStoreClient.setHAGroupStatusIfNeeded(targetState);
}
/**
* Sets the HAGroupStoreRecord to abort failover and return to STANDBY in
local cluster. This
* aborts an ongoing failover process by moving the standby cluster to abort
state.
* @param haGroupName name of the HA group
+ * @return the remaining wait time in milliseconds before the state transition is allowed; 0
+ * means the transition was applied.
* @throws IOException when HAGroupStoreClient
is not healthy.
* @throws StaleHAGroupStoreRecordVersionException when the version is
stale, the state might have
* been updated by some
other RS, check the state
@@ -407,17 +416,20 @@ public class HAGroupStoreManager {
* @throws SQLException when there is an error
with the database
* operation
*/
- public void setHAGroupStatusToAbortToStandby(final String haGroupName)
throws IOException,
+ public long setHAGroupStatusToAbortToStandby(final String haGroupName)
throws IOException,
StaleHAGroupStoreRecordVersionException,
InvalidClusterRoleTransitionException, SQLException {
HAGroupStoreClient haGroupStoreClient =
getHAGroupStoreClientAndSetupFailoverManagement(haGroupName);
-
haGroupStoreClient.setHAGroupStatusIfNeeded(HAGroupStoreRecord.HAGroupState.ABORT_TO_STANDBY);
+ return haGroupStoreClient
+
.setHAGroupStatusIfNeeded(HAGroupStoreRecord.HAGroupState.ABORT_TO_STANDBY);
}
/**
* Sets the HAGroupStoreRecord to degrade reader functionality in local
cluster. Transitions from
* STANDBY to DEGRADED_STANDBY_FOR_READER or from
DEGRADED_STANDBY_FOR_WRITER to DEGRADED_STANDBY.
* @param haGroupName name of the HA group
+ * @return the remaining wait time in milliseconds before the state transition is allowed; 0
+ * means the transition was applied.
* @throws IOException when HAGroupStoreClient
is not healthy.
* @throws StaleHAGroupStoreRecordVersionException when the version is
stale, the state might have
* been updated by some
other RS, check the state
@@ -427,7 +439,7 @@ public class HAGroupStoreManager {
* the state again and retry
if the use case still
* needs it.
*/
- public void setReaderToDegraded(final String haGroupName) throws IOException,
+ public long setReaderToDegraded(final String haGroupName) throws IOException,
StaleHAGroupStoreRecordVersionException,
InvalidClusterRoleTransitionException, SQLException {
HAGroupStoreClient haGroupStoreClient =
getHAGroupStoreClientAndSetupFailoverManagement(haGroupName);
@@ -436,13 +448,16 @@ public class HAGroupStoreManager {
if (currentRecord == null) {
throw new IOException("Current HAGroupStoreRecord is null for HA group:
" + haGroupName);
}
-
haGroupStoreClient.setHAGroupStatusIfNeeded(HAGroupStoreRecord.HAGroupState.DEGRADED_STANDBY);
+ return haGroupStoreClient
+
.setHAGroupStatusIfNeeded(HAGroupStoreRecord.HAGroupState.DEGRADED_STANDBY);
}
/**
* Sets the HAGroupStoreRecord to restore reader functionality in local
cluster. Transitions from
* DEGRADED_STANDBY_FOR_READER to STANDBY or from DEGRADED_STANDBY to
DEGRADED_STANDBY_FOR_WRITER.
* @param haGroupName name of the HA group
+ * @return the remaining wait time in milliseconds before the state transition is allowed; 0
+ * means the transition was applied or the HA group was already in STANDBY.
* @throws IOException when HAGroupStoreClient
is not healthy.
* @throws StaleHAGroupStoreRecordVersionException when the version is
stale, the state might have
* been updated by some
other RS, check the state
@@ -452,7 +467,7 @@ public class HAGroupStoreManager {
* the state again and retry
if the use case still
* needs it.
*/
- public void setReaderToHealthy(final String haGroupName) throws IOException,
+ public long setReaderToHealthy(final String haGroupName) throws IOException,
StaleHAGroupStoreRecordVersionException,
InvalidClusterRoleTransitionException, SQLException {
HAGroupStoreClient haGroupStoreClient =
getHAGroupStoreClientAndSetupFailoverManagement(haGroupName);
@@ -462,10 +477,10 @@ public class HAGroupStoreManager {
throw new IOException("Current HAGroupStoreRecord is null " + "for HA
group: " + haGroupName);
} else if (currentRecord.getHAGroupState() == STANDBY) {
LOGGER.info("Current HAGroupStoreRecord is already STANDBY for HA group:
" + haGroupName);
- return;
+ return 0L;
}
-
haGroupStoreClient.setHAGroupStatusIfNeeded(HAGroupStoreRecord.HAGroupState.STANDBY);
+ return
haGroupStoreClient.setHAGroupStatusIfNeeded(HAGroupStoreRecord.HAGroupState.STANDBY);
}
/**
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
index 571bc848ad..3b2fed58c8 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
@@ -34,7 +34,6 @@ import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;
-import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -649,9 +648,9 @@ public class IndexRegionObserver implements
RegionCoprocessor, RegionObserver {
// We don't want to check for mutation blocking for the system ha group
table
if (!tableName.equals(SYSTEM_HA_GROUP_NAME)) {
// Extract HAGroupName from the mutations
- final Set<String> haGroupNames =
extractHAGroupNameAttribute(miniBatchOp);
+ final String haGroupName = extractHAGroupNameAttribute(miniBatchOp);
// Check if mutation is blocked for any of the HAGroupNames
- for (String haGroupName : haGroupNames) {
+ if (StringUtils.isNotBlank(haGroupName)) {
// TODO: Below approach might be slow need to figure out faster way,
// slower part is getting haGroupStoreClient We can also cache
// roleRecord (I tried it and still it's slow due to
haGroupStoreClient
@@ -660,8 +659,8 @@ public class IndexRegionObserver implements
RegionCoprocessor, RegionObserver {
boolean isHAGroupOnClientStale =
haGroupStoreManager.isHAGroupOnClientStale(haGroupName);
if (StringUtils.isNotBlank(haGroupName) && isHAGroupOnClientStale) {
- throw new StaleClusterRoleRecordException(
- String.format("HAGroupStoreRecord is stale for haGroup %s on
client", haGroupName));
+ throw new StaleClusterRoleRecordException(String
+ .format("HAGroupStoreRecord is stale for haGroup %s on " +
"client", haGroupName));
}
// Check if mutation's haGroup is stale
@@ -670,7 +669,7 @@ public class IndexRegionObserver implements
RegionCoprocessor, RegionObserver {
&& haGroupStoreManager.isMutationBlocked(haGroupName)
) {
throw new MutationBlockedIOException(
- "Blocking Mutation as Some CRRs are in " + "ACTIVE_TO_STANDBY
state and "
+ "Blocking Mutation as Some CRRs " + "are in ACTIVE_TO_STANDBY
state and "
+ "CLUSTER_ROLE_BASED_MUTATION_BLOCK_ENABLED is true");
}
}
@@ -684,17 +683,15 @@ public class IndexRegionObserver implements
RegionCoprocessor, RegionObserver {
"Somehow didn't return an index update but also didn't propagate the
failure to the client!");
}
- private Set<String>
- extractHAGroupNameAttribute(MiniBatchOperationInProgress<Mutation>
miniBatchOp) {
- Set<String> haGroupNames = new HashSet<>();
+ private String
extractHAGroupNameAttribute(MiniBatchOperationInProgress<Mutation> miniBatchOp)
{
for (int i = 0; i < miniBatchOp.size(); i++) {
Mutation m = miniBatchOp.getOperation(i);
byte[] haGroupName =
m.getAttribute(BaseScannerRegionObserverConstants.HA_GROUP_NAME_ATTRIB);
if (haGroupName != null) {
- haGroupNames.add(new String(haGroupName, StandardCharsets.UTF_8));
+ return Bytes.toString(haGroupName);
}
}
- return haGroupNames;
+ return null;
}
@Override
diff --git
a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupStoreClientIT.java
b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupStoreClientIT.java
index 3f59115503..e095c3d0a9 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupStoreClientIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupStoreClientIT.java
@@ -17,7 +17,6 @@
*/
package org.apache.phoenix.jdbc;
-import static org.apache.hadoop.hbase.HConstants.DEFAULT_ZK_SESSION_TIMEOUT;
import static
org.apache.phoenix.jdbc.HAGroupStoreClient.ZK_CONSISTENT_HA_GROUP_RECORD_NAMESPACE;
import static
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_HA_GROUP_NAME;
import static org.apache.phoenix.jdbc.PhoenixHAAdmin.getLocalZkUrl;
@@ -615,8 +614,8 @@ public class HAGroupStoreClientIT extends BaseTest {
assertEquals(HAGroupStoreRecord.HAGroupState.ACTIVE_IN_SYNC,
currentRecord.getHAGroupState());
// Update to STANDBY (this should succeed as it's a valid transition)
- haGroupStoreClient
-
.setHAGroupStatusIfNeeded(HAGroupStoreRecord.HAGroupState.ACTIVE_IN_SYNC_TO_STANDBY);
+ assertEquals(0L, haGroupStoreClient
+
.setHAGroupStatusIfNeeded(HAGroupStoreRecord.HAGroupState.ACTIVE_IN_SYNC_TO_STANDBY));
Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
// Verify the record was updated
@@ -647,8 +646,8 @@ public class HAGroupStoreClientIT extends BaseTest {
// Update to STANDBY (this should succeed as it's a valid transition)
long timestamp = System.currentTimeMillis();
- haGroupStoreClient.setHAGroupStatusIfNeeded(
- HAGroupStoreRecord.HAGroupState.ACTIVE_IN_SYNC_TO_STANDBY, timestamp);
+ assertEquals(0L, haGroupStoreClient.setHAGroupStatusIfNeeded(
+ HAGroupStoreRecord.HAGroupState.ACTIVE_IN_SYNC_TO_STANDBY, timestamp));
Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
// Verify the record was updated
@@ -675,7 +674,8 @@ public class HAGroupStoreClientIT extends BaseTest {
.getInstanceForZkUrl(CLUSTERS.getHBaseCluster1().getConfiguration(),
haGroupName, zkUrl);
// Try to set to ACTIVE_IN_SYNC immediately (should not update due to
timing)
-
haGroupStoreClient.setHAGroupStatusIfNeeded(HAGroupStoreRecord.HAGroupState.ACTIVE_IN_SYNC);
+ assert haGroupStoreClient
+
.setHAGroupStatusIfNeeded(HAGroupStoreRecord.HAGroupState.ACTIVE_IN_SYNC) > 0;
// Add sleep if due to any bug the update might have gone through and we
can assert below this.
Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
@@ -702,9 +702,13 @@ public class HAGroupStoreClientIT extends BaseTest {
HAGroupStoreClient haGroupStoreClient = HAGroupStoreClient
.getInstanceForZkUrl(CLUSTERS.getHBaseCluster1().getConfiguration(),
haGroupName, zkUrl);
- Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS + (long) Math
- .ceil(DEFAULT_ZK_SESSION_TIMEOUT *
HAGroupStoreClient.ZK_SESSION_TIMEOUT_MULTIPLIER));
-
haGroupStoreClient.setHAGroupStatusIfNeeded(HAGroupStoreRecord.HAGroupState.ACTIVE_IN_SYNC);
+ // Find wait time for ACTIVE_NOT_IN_SYNC to ACTIVE_IN_SYNC
+ long waitTime =
+
haGroupStoreClient.setHAGroupStatusIfNeeded(HAGroupStoreRecord.HAGroupState.ACTIVE_IN_SYNC);
+ assert waitTime > 0;
+ Thread.sleep(waitTime);
+ assertEquals(0L,
+
haGroupStoreClient.setHAGroupStatusIfNeeded(HAGroupStoreRecord.HAGroupState.ACTIVE_IN_SYNC));
Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
// Verify the record was updated
@@ -777,8 +781,8 @@ public class HAGroupStoreClientIT extends BaseTest {
.getInstanceForZkUrl(CLUSTERS.getHBaseCluster1().getConfiguration(),
haGroupName, zkUrl);
// First transition: ACTIVE -> ACTIVE_TO_STANDBY
- haGroupStoreClient
-
.setHAGroupStatusIfNeeded(HAGroupStoreRecord.HAGroupState.ACTIVE_IN_SYNC_TO_STANDBY);
+ assertEquals(0L, haGroupStoreClient
+
.setHAGroupStatusIfNeeded(HAGroupStoreRecord.HAGroupState.ACTIVE_IN_SYNC_TO_STANDBY));
Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
HAGroupStoreRecord afterFirst = haGroupStoreClient.getHAGroupStoreRecord();
@@ -787,7 +791,8 @@ public class HAGroupStoreClientIT extends BaseTest {
// Wait and make another transition: ACTIVE_TO_STANDBY -> STANDBY
Thread.sleep(100); // Small delay to ensure timestamp difference
-
haGroupStoreClient.setHAGroupStatusIfNeeded(HAGroupStoreRecord.HAGroupState.STANDBY);
+ assertEquals(0L,
+
haGroupStoreClient.setHAGroupStatusIfNeeded(HAGroupStoreRecord.HAGroupState.STANDBY));
Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
HAGroupStoreRecord afterSecond =
haGroupStoreClient.getHAGroupStoreRecord();
diff --git
a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupStoreManagerIT.java
b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupStoreManagerIT.java
index 801d779588..6c1c88d510 100644
---
a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupStoreManagerIT.java
+++
b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupStoreManagerIT.java
@@ -17,10 +17,8 @@
*/
package org.apache.phoenix.jdbc;
-import static org.apache.hadoop.hbase.HConstants.DEFAULT_ZK_SESSION_TIMEOUT;
import static org.apache.hadoop.hbase.HConstants.ZK_SESSION_TIMEOUT;
import static
org.apache.phoenix.jdbc.HAGroupStoreClient.ZK_CONSISTENT_HA_GROUP_RECORD_NAMESPACE;
-import static
org.apache.phoenix.jdbc.HAGroupStoreClient.ZK_SESSION_TIMEOUT_MULTIPLIER;
import static org.apache.phoenix.jdbc.PhoenixHAAdmin.getLocalZkUrl;
import static org.apache.phoenix.jdbc.PhoenixHAAdmin.toPath;
import static
org.apache.phoenix.query.QueryServices.CLUSTER_ROLE_BASED_MUTATION_BLOCK_ENABLED;
@@ -352,7 +350,7 @@ public class HAGroupStoreManagerIT extends BaseTest {
Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
// Set the HA group status to store and forward (ACTIVE_NOT_IN_SYNC)
- haGroupStoreManager.setHAGroupStatusToStoreAndForward(haGroupName);
+ assertEquals(0L,
haGroupStoreManager.setHAGroupStatusToStoreAndForward(haGroupName));
Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
// Verify the status was updated to ACTIVE_NOT_IN_SYNC
@@ -367,7 +365,7 @@ public class HAGroupStoreManagerIT extends BaseTest {
// Set the HA group status to store and forward again and verify
// that getLastSyncStateTimeInMs is same (ACTIVE_NOT_IN_SYNC)
// The time should only update when we move to AIS to ANIS
- haGroupStoreManager.setHAGroupStatusToStoreAndForward(haGroupName);
+ assertEquals(0L,
haGroupStoreManager.setHAGroupStatusToStoreAndForward(haGroupName));
Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
Optional<HAGroupStoreRecord> updatedRecordOpt2 =
haGroupStoreManager.getHAGroupStoreRecord(haGroupName);
@@ -392,10 +390,13 @@ public class HAGroupStoreManagerIT extends BaseTest {
initialRecord.getHAGroupState());
assertNotNull(initialRecord.getLastSyncStateTimeInMs());
+ // Setting it too soon should return wait time.
+ long transitionWaitTime =
haGroupStoreManager.setHAGroupStatusToSync(haGroupName);
+ assert (transitionWaitTime > 0);
+
// Set the HA group status to sync (ACTIVE), we need to wait for
ZK_SESSION_TIMEOUT * Multiplier
- Thread.sleep((long) Math.ceil(config.getLong(ZK_SESSION_TIMEOUT,
DEFAULT_ZK_SESSION_TIMEOUT)
- * ZK_SESSION_TIMEOUT_MULTIPLIER));
- haGroupStoreManager.setHAGroupStatusToSync(haGroupName);
+ Thread.sleep(transitionWaitTime);
+ assertEquals(0L, haGroupStoreManager.setHAGroupStatusToSync(haGroupName));
Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
// Verify the state was updated to ACTIVE_IN_SYNC
@@ -508,7 +509,7 @@ public class HAGroupStoreManagerIT extends BaseTest {
Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
// Call setReaderToDegraded
- haGroupStoreManager.setReaderToDegraded(haGroupName);
+ assertEquals(0L, haGroupStoreManager.setReaderToDegraded(haGroupName));
Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
// Verify the status was updated to DEGRADED_STANDBY
@@ -539,7 +540,7 @@ public class HAGroupStoreManagerIT extends BaseTest {
Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
// Call setReaderToHealthy
- haGroupStoreManager.setReaderToHealthy(haGroupName);
+ assertEquals(0L, haGroupStoreManager.setReaderToHealthy(haGroupName));
Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
// Verify the status was updated to STANDBY
@@ -571,7 +572,7 @@ public class HAGroupStoreManagerIT extends BaseTest {
// Test setReaderToDegraded with invalid state
try {
- haGroupStoreManager.setReaderToDegraded(haGroupName);
+ assertEquals(0L, haGroupStoreManager.setReaderToDegraded(haGroupName));
fail(
"Expected InvalidClusterRoleTransitionException for
setReaderToDegraded with ACTIVE_IN_SYNC state");
} catch (InvalidClusterRoleTransitionException e) {
@@ -625,10 +626,10 @@ public class HAGroupStoreManagerIT extends BaseTest {
// Move cluster1 from ACTIVE_NOT_IN_SYNC to ACTIVE_IN_SYNC,
// we can move after DEFAULT_ZK_SESSION_TIMEOUT *
ZK_SESSION_TIMEOUT_MULTIPLIER
Thread.sleep(20 * 1000);
- cluster1HAManager.setHAGroupStatusToSync(haGroupName);
+ assertEquals(0L, cluster1HAManager.setHAGroupStatusToSync(haGroupName));
Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
- cluster2HAManager.setReaderToHealthy(haGroupName);
+ assertEquals(0L, cluster2HAManager.setReaderToHealthy(haGroupName));
Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
// Simulates action taken by reader to complete the replay and become new
ACTIVE
@@ -636,7 +637,7 @@ public class HAGroupStoreManagerIT extends BaseTest {
(haGroupName1, fromState, toState, modifiedTime, clusterType,
lastSyncStateTimeInMs) -> {
try {
if (toState == HAGroupStoreRecord.HAGroupState.STANDBY_TO_ACTIVE) {
- cluster2HAManager.setHAGroupStatusToSync(haGroupName1);
+ assertEquals(0L,
cluster2HAManager.setHAGroupStatusToSync(haGroupName1));
}
} catch (Exception e) {
fail("Peer Cluster should be able to move to ACTIVE_IN_SYNC" +
e.getMessage());
@@ -716,10 +717,10 @@ public class HAGroupStoreManagerIT extends BaseTest {
// Move cluster1 from ACTIVE_NOT_IN_SYNC to ACTIVE_IN_SYNC,
// we can move after DEFAULT_ZK_SESSION_TIMEOUT *
ZK_SESSION_TIMEOUT_MULTIPLIER
Thread.sleep(20 * 1000);
- cluster1HAManager.setHAGroupStatusToSync(haGroupName);
+ assertEquals(0L, cluster1HAManager.setHAGroupStatusToSync(haGroupName));
Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
- cluster2HAManager.setReaderToHealthy(haGroupName);
+ assertEquals(0L, cluster2HAManager.setReaderToHealthy(haGroupName));
Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
// === INITIAL STATE VERIFICATION ===
@@ -758,7 +759,7 @@ public class HAGroupStoreManagerIT extends BaseTest {
// === STEP 3: Operator decides to abort failover ===
// Set cluster2 (which is in STANDBY_TO_ACTIVE) to ABORT_TO_STANDBY
- cluster2HAManager.setHAGroupStatusToAbortToStandby(haGroupName);
+ assertEquals(0L,
cluster2HAManager.setHAGroupStatusToAbortToStandby(haGroupName));
Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
// === STEP 4: Verify automatic cluster1 abort reaction ===
@@ -834,7 +835,7 @@ public class HAGroupStoreManagerIT extends BaseTest {
// Move cluster1 from ACTIVE_NOT_IN_SYNC to ACTIVE_IN_SYNC,
// we can move after DEFAULT_ZK_SESSION_TIMEOUT *
ZK_SESSION_TIMEOUT_MULTIPLIER
Thread.sleep(20 * 1000);
- cluster1HAManager.setHAGroupStatusToSync(haGroupName);
+ assertEquals(0L, cluster1HAManager.setHAGroupStatusToSync(haGroupName));
Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
// === INITIAL STATE VERIFICATION ===
@@ -854,7 +855,7 @@ public class HAGroupStoreManagerIT extends BaseTest {
// === STEP 1: Transition to store-and-forward mode ===
// Move cluster1 from ACTIVE_IN_SYNC to ACTIVE_NOT_IN_SYNC
(store-and-forward mode)
- cluster1HAManager.setHAGroupStatusToStoreAndForward(haGroupName);
+ assertEquals(0L,
cluster1HAManager.setHAGroupStatusToStoreAndForward(haGroupName));
Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
// Verify cluster1 is now in store-and-forward state
@@ -880,7 +881,7 @@ public class HAGroupStoreManagerIT extends BaseTest {
// Move cluster1 back from ACTIVE_NOT_IN_SYNC to ACTIVE_IN_SYNC
// Wait for the required time before transitioning back to sync
Thread.sleep(20 * 1000);
- cluster1HAManager.setHAGroupStatusToSync(haGroupName);
+ assertEquals(0L, cluster1HAManager.setHAGroupStatusToSync(haGroupName));
Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
// Verify cluster1 is back in sync state