This is an automated email from the ASF dual-hosted git repository.
kadir pushed a commit to branch PHOENIX-7562-feature
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/PHOENIX-7562-feature by this
push:
new fe2147a3d4 PHOENIX-7566 Returning waitTime as part of state transition
response (#2314)
fe2147a3d4 is described below
commit fe2147a3d451a0122ebea70199d151268238d2a2
Author: ritegarg <[email protected]>
AuthorDate: Thu Nov 13 17:38:22 2025 -0800
PHOENIX-7566 Returning waitTime as part of state transition response (#2314)
Co-authored-by: Ritesh Garg
<[email protected]>
---
.../apache/phoenix/jdbc/HAGroupStoreClient.java | 142 +++++++++++----------
.../apache/phoenix/jdbc/HAGroupStoreManager.java | 38 ++++--
.../phoenix/hbase/index/IndexRegionObserver.java | 74 ++++++-----
.../apache/phoenix/jdbc/HAGroupStoreClientIT.java | 23 ++--
.../apache/phoenix/jdbc/HAGroupStoreManagerIT.java | 39 +++---
5 files changed, 172 insertions(+), 144 deletions(-)
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreClient.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreClient.java
index c24d63a96f..f031ee7550 100644
---
a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreClient.java
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreClient.java
@@ -298,7 +298,8 @@ public class HAGroupStoreClient implements Closeable {
/**
* Set the HA group status for the specified HA group name.
- * Checks if the status is needed to be updated based on logic in
isUpdateNeeded function.
+ * Checks if the status is needed to be updated based on logic in
+ * validateTransitionAndGetWaitTime function.
*
* @param haGroupState the HA group state to set
* @throws IOException if the client is not healthy or the operation fails
@@ -306,25 +307,28 @@ public class HAGroupStoreClient implements Closeable {
* @throws InvalidClusterRoleTransitionException when transition is not
valid
* @throws SQLException
*/
- public void setHAGroupStatusIfNeeded(HAGroupStoreRecord.HAGroupState
haGroupState)
+ public long setHAGroupStatusIfNeeded(HAGroupStoreRecord.HAGroupState
haGroupState)
throws IOException,
InvalidClusterRoleTransitionException,
SQLException, StaleHAGroupStoreRecordVersionException {
- setHAGroupStatusIfNeeded(haGroupState, null);
+ return setHAGroupStatusIfNeeded(haGroupState, null);
}
/**
* Set the HA group status for the specified HA group name.
- * Checks if the status is needed to be updated based on logic in
isUpdateNeeded function.
+ * Checks if the status is needed to be updated based on logic in
+ * validateTransitionAndGetWaitTime function.
*
* @param haGroupState the HA group state to set
* @param lastSyncTimeInMsNullable the last sync time in milliseconds, can
be null if not known.
* @throws IOException if the client is not healthy or the operation fails
* @throws StaleHAGroupStoreRecordVersionException if the version is stale
* @throws InvalidClusterRoleTransitionException when transition is not
valid
+ * @return the wait time in milliseconds for state transition,
+ * if 0 then the state transition is successful.
* @throws SQLException
*/
- public void setHAGroupStatusIfNeeded(HAGroupStoreRecord.HAGroupState
haGroupState,
+ public long setHAGroupStatusIfNeeded(HAGroupStoreRecord.HAGroupState
haGroupState,
Long lastSyncTimeInMsNullable)
throws IOException,
InvalidClusterRoleTransitionException,
@@ -342,63 +346,69 @@ public class HAGroupStoreClient implements Closeable {
+ "cannot update HAGroupStoreRecord, the record should be
initialized "
+ "in System Table first" + haGroupName);
}
- if (isUpdateNeeded(currentHAGroupStoreRecord.getHAGroupState(),
- currentHAGroupStoreRecordStat.getMtime(), haGroupState)) {
- // We maintain last sync time as the last time cluster was in
sync state.
- // If state changes from ACTIVE_IN_SYNC to ACTIVE_NOT_IN_SYNC,
record that time
- // Once state changes back to ACTIVE_IN_SYNC or the role is
- // NOT ACTIVE or ACTIVE_TO_STANDBY
- // set the time to null to mark that we are current(or we
don't have any reader).
- long lastSyncTimeInMs
- = lastSyncTimeInMsNullable != null
- ? lastSyncTimeInMsNullable
- : currentHAGroupStoreRecord.getLastSyncStateTimeInMs();
- ClusterRole clusterRole = haGroupState.getClusterRole();
- if (currentHAGroupStoreRecord.getHAGroupState()
- == HAGroupStoreRecord.HAGroupState.ACTIVE_IN_SYNC
- && haGroupState ==
HAGroupStoreRecord.HAGroupState.ACTIVE_NOT_IN_SYNC) {
- // We record the last round timestamp by subtracting the
rotationTime and then
- // taking the beginning of last round (floor) by first
integer division and then multiplying again.
- lastSyncTimeInMs = ((System.currentTimeMillis() -
rotationTimeMs)/rotationTimeMs) * (rotationTimeMs);
- }
- HAGroupStoreRecord newHAGroupStoreRecord = new
HAGroupStoreRecord(
- currentHAGroupStoreRecord.getProtocolVersion(),
- currentHAGroupStoreRecord.getHaGroupName(),
- haGroupState,
- lastSyncTimeInMs,
- currentHAGroupStoreRecord.getPolicy(),
- currentHAGroupStoreRecord.getPeerZKUrl(),
- currentHAGroupStoreRecord.getClusterUrl(),
- currentHAGroupStoreRecord.getPeerClusterUrl(),
- currentHAGroupStoreRecord.getAdminCRRVersion()
- );
- phoenixHaAdmin.updateHAGroupStoreRecordInZooKeeper(haGroupName,
- newHAGroupStoreRecord,
currentHAGroupStoreRecordStat.getVersion());
- // If cluster role is changing, if so, we update,
- // the system table on best effort basis.
- // We also have a periodic job which syncs the ZK
- // state with System Table periodically.
- if (currentHAGroupStoreRecord.getClusterRole() != clusterRole)
{
- HAGroupStoreRecord peerZkRecord =
getHAGroupStoreRecordFromPeer();
- ClusterRoleRecord.ClusterRole peerClusterRole =
peerZkRecord != null
- ? peerZkRecord.getClusterRole()
- : ClusterRoleRecord.ClusterRole.UNKNOWN;
- SystemTableHAGroupRecord systemTableRecord = new
SystemTableHAGroupRecord(
-
HighAvailabilityPolicy.valueOf(newHAGroupStoreRecord.getPolicy()),
- clusterRole,
- peerClusterRole,
- newHAGroupStoreRecord.getClusterUrl(),
- newHAGroupStoreRecord.getPeerClusterUrl(),
- this.zkUrl,
- newHAGroupStoreRecord.getPeerZKUrl(),
- newHAGroupStoreRecord.getAdminCRRVersion()
- );
- updateSystemTableHAGroupRecordSilently(haGroupName,
systemTableRecord);
- }
- } else {
+ long stateTransitionWaitTime
+ =
validateTransitionAndGetWaitTime(currentHAGroupStoreRecord.getHAGroupState(),
+ currentHAGroupStoreRecordStat.getMtime(), haGroupState);
+ if (stateTransitionWaitTime > 0) {
LOGGER.info("Not updating HAGroupStoreRecord for HA group {} with
state {}",
haGroupName, haGroupState);
+ return stateTransitionWaitTime;
+ }
+ // We maintain last sync time as the last time cluster was in sync
state.
+ // If state changes from ACTIVE_IN_SYNC to ACTIVE_NOT_IN_SYNC, record
that time
+ // Once state changes back to ACTIVE_IN_SYNC or the role is
+ // NOT ACTIVE or ACTIVE_TO_STANDBY
+ // set the time to null to mark that we are current(or we don't have
any reader).
+ long lastSyncTimeInMs
+ = lastSyncTimeInMsNullable != null
+ ? lastSyncTimeInMsNullable
+ : currentHAGroupStoreRecord.getLastSyncStateTimeInMs();
+ ClusterRole clusterRole = haGroupState.getClusterRole();
+ if (currentHAGroupStoreRecord.getHAGroupState()
+ == HAGroupStoreRecord.HAGroupState.ACTIVE_IN_SYNC
+ && haGroupState ==
HAGroupStoreRecord.HAGroupState.ACTIVE_NOT_IN_SYNC) {
+ // We record the last round timestamp by subtracting the
rotationTime and then
+ // taking the beginning of last round (floor) by first integer
+ // division and then multiplying again.
+ lastSyncTimeInMs
+ = ((System.currentTimeMillis() - rotationTimeMs) /
rotationTimeMs)
+ * rotationTimeMs;
+ }
+ HAGroupStoreRecord newHAGroupStoreRecord = new HAGroupStoreRecord(
+ currentHAGroupStoreRecord.getProtocolVersion(),
+ currentHAGroupStoreRecord.getHaGroupName(),
+ haGroupState,
+ lastSyncTimeInMs,
+ currentHAGroupStoreRecord.getPolicy(),
+ currentHAGroupStoreRecord.getPeerZKUrl(),
+ currentHAGroupStoreRecord.getClusterUrl(),
+ currentHAGroupStoreRecord.getPeerClusterUrl(),
+ currentHAGroupStoreRecord.getAdminCRRVersion()
+ );
+ phoenixHaAdmin.updateHAGroupStoreRecordInZooKeeper(haGroupName,
+ newHAGroupStoreRecord,
currentHAGroupStoreRecordStat.getVersion());
+ // If cluster role is changing, if so, we update,
+ // the system table on best effort basis.
+ // We also have a periodic job which syncs the ZK
+ // state with System Table periodically.
+ if (currentHAGroupStoreRecord.getClusterRole() != clusterRole) {
+ HAGroupStoreRecord peerZkRecord = getHAGroupStoreRecordFromPeer();
+ ClusterRoleRecord.ClusterRole peerClusterRole = peerZkRecord !=
null
+ ? peerZkRecord.getClusterRole()
+ : ClusterRoleRecord.ClusterRole.UNKNOWN;
+ SystemTableHAGroupRecord systemTableRecord = new
SystemTableHAGroupRecord(
+
HighAvailabilityPolicy.valueOf(newHAGroupStoreRecord.getPolicy()),
+ clusterRole,
+ peerClusterRole,
+ newHAGroupStoreRecord.getClusterUrl(),
+ newHAGroupStoreRecord.getPeerClusterUrl(),
+ this.zkUrl,
+ newHAGroupStoreRecord.getPeerZKUrl(),
+ newHAGroupStoreRecord.getAdminCRRVersion()
+ );
+ updateSystemTableHAGroupRecordSilently(haGroupName,
systemTableRecord);
}
+ return 0L;
}
/**
@@ -1050,12 +1060,14 @@ public class HAGroupStoreClient implements Closeable {
* @param currentHAGroupStoreRecordMtime the last modified time of the
current
* HAGroupStoreRecord
* @param newHAGroupState the cluster state to check
- * @return true if the HAGroupStoreRecord needs to be updated, false
otherwise
+ * @return the wait time in milliseconds for state transition,
+ * if 0 then the state transition is valid.
* @throws InvalidClusterRoleTransitionException if the cluster role
transition is invalid
*/
- private boolean isUpdateNeeded(HAGroupStoreRecord.HAGroupState
currentHAGroupState,
- long currentHAGroupStoreRecordMtime,
- HAGroupStoreRecord.HAGroupState
newHAGroupState)
+ private long validateTransitionAndGetWaitTime(
+ HAGroupStoreRecord.HAGroupState currentHAGroupState,
+ long currentHAGroupStoreRecordMtime,
+ HAGroupStoreRecord.HAGroupState newHAGroupState)
throws InvalidClusterRoleTransitionException {
long waitTime = 0L;
if (currentHAGroupState.isTransitionAllowed(newHAGroupState)) {
@@ -1069,7 +1081,9 @@ public class HAGroupStoreClient implements Closeable {
throw new InvalidClusterRoleTransitionException("Cannot transition
from "
+ currentHAGroupState + " to " + newHAGroupState);
}
- return ((System.currentTimeMillis() - currentHAGroupStoreRecordMtime)
> waitTime);
+ long remainingTime
+ = currentHAGroupStoreRecordMtime + waitTime -
System.currentTimeMillis();
+ return Math.max(0, remainingTime);
}
// ========== HA Group State Change Subscription Methods ==========
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreManager.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreManager.java
index b358ab5b88..fb3d7d5ad2 100644
---
a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreManager.java
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreManager.java
@@ -326,6 +326,8 @@ public class HAGroupStoreManager {
* Sets the HAGroupStoreRecord to StoreAndForward mode in local cluster.
*
* @param haGroupName name of the HA group
+ * @return the wait time in milliseconds for state transition,
+ * if 0 then the state transition is successful.
* @throws StaleHAGroupStoreRecordVersionException if the cached version
is invalid,
* the state might have been updated by some other RS,
* check the state again and retry if the use case still needs it.
@@ -333,12 +335,12 @@ public class HAGroupStoreManager {
* the state might have been updated by some other RS,
* check the state again and retry if the use case still needs it.
*/
- public void setHAGroupStatusToStoreAndForward(final String haGroupName)
+ public long setHAGroupStatusToStoreAndForward(final String haGroupName)
throws IOException, StaleHAGroupStoreRecordVersionException,
InvalidClusterRoleTransitionException, SQLException {
HAGroupStoreClient haGroupStoreClient
= getHAGroupStoreClientAndSetupFailoverManagement(haGroupName);
- haGroupStoreClient.setHAGroupStatusIfNeeded(
+ return haGroupStoreClient.setHAGroupStatusIfNeeded(
HAGroupStoreRecord.HAGroupState.ACTIVE_NOT_IN_SYNC);
}
@@ -346,6 +348,8 @@ public class HAGroupStoreManager {
* Sets the HAGroupStoreRecord to Sync mode in local cluster.
*
* @param haGroupName name of the HA group
+ * @return the wait time in milliseconds for state transition,
+ * if 0 then the state transition is successful.
* @throws IOException when HAGroupStoreClient is not healthy.
* @throws StaleHAGroupStoreRecordVersionException if the cached version
is invalid,
* the state might have been updated by some other RS,
@@ -354,7 +358,7 @@ public class HAGroupStoreManager {
* the state might have been updated by some other RS,
* check the state again and retry if the use case still needs it.
*/
- public void setHAGroupStatusToSync(final String haGroupName)
+ public long setHAGroupStatusToSync(final String haGroupName)
throws IOException, StaleHAGroupStoreRecordVersionException,
InvalidClusterRoleTransitionException, SQLException {
HAGroupStoreClient haGroupStoreClient
@@ -365,7 +369,7 @@ public class HAGroupStoreManager {
== HAGroupState.ACTIVE_NOT_IN_SYNC_TO_STANDBY
? ACTIVE_IN_SYNC_TO_STANDBY
: ACTIVE_IN_SYNC;
- haGroupStoreClient.setHAGroupStatusIfNeeded(targetHAGroupState);
+ return
haGroupStoreClient.setHAGroupStatusIfNeeded(targetHAGroupState);
} else {
throw new IOException("Current HAGroupStoreRecord is null for HA
group: "
+ haGroupName);
@@ -379,6 +383,8 @@ public class HAGroupStoreManager {
* - ACTIVE_NOT_IN_SYNC_TO_STANDBY if currently ACTIVE_NOT_IN_SYNC
*
* @param haGroupName name of the HA group
+ * @return the wait time in milliseconds for state transition,
+ * if 0 then the state transition is successful.
* @throws IOException when HAGroupStoreClient is not healthy.
* @throws StaleHAGroupStoreRecordVersionException when the version is
stale,
* the state might have been updated by some other RS,
@@ -388,7 +394,7 @@ public class HAGroupStoreManager {
* check the state again and retry if the use case still needs it.
* @throws SQLException when there is an error with the database operation
*/
- public void initiateFailoverOnActiveCluster(final String haGroupName)
+ public long initiateFailoverOnActiveCluster(final String haGroupName)
throws IOException, StaleHAGroupStoreRecordVersionException,
InvalidClusterRoleTransitionException, SQLException {
HAGroupStoreClient haGroupStoreClient
@@ -414,7 +420,7 @@ public class HAGroupStoreManager {
". Cluster must be in ACTIVE_IN_SYNC or ACTIVE_NOT_IN_SYNC
state.");
}
- haGroupStoreClient.setHAGroupStatusIfNeeded(targetState);
+ return haGroupStoreClient.setHAGroupStatusIfNeeded(targetState);
}
/**
@@ -422,6 +428,8 @@ public class HAGroupStoreManager {
* This aborts an ongoing failover process by moving the standby cluster
to abort state.
*
* @param haGroupName name of the HA group
+ * @return the wait time in milliseconds for state transition,
+ * if 0 then the state transition is successful.
* @throws IOException when HAGroupStoreClient is not healthy.
* @throws StaleHAGroupStoreRecordVersionException when the version is
stale,
* the state might have been updated by some other RS,
@@ -431,12 +439,12 @@ public class HAGroupStoreManager {
* check the state again and retry if the use case still needs it.
* @throws SQLException when there is an error with the database operation
*/
- public void setHAGroupStatusToAbortToStandby(final String haGroupName)
+ public long setHAGroupStatusToAbortToStandby(final String haGroupName)
throws IOException, StaleHAGroupStoreRecordVersionException,
InvalidClusterRoleTransitionException, SQLException {
HAGroupStoreClient haGroupStoreClient
= getHAGroupStoreClientAndSetupFailoverManagement(haGroupName);
- haGroupStoreClient.setHAGroupStatusIfNeeded(
+ return haGroupStoreClient.setHAGroupStatusIfNeeded(
HAGroupStoreRecord.HAGroupState.ABORT_TO_STANDBY);
}
@@ -446,6 +454,8 @@ public class HAGroupStoreManager {
* DEGRADED_STANDBY_FOR_WRITER to DEGRADED_STANDBY.
*
* @param haGroupName name of the HA group
+ * @return the wait time in milliseconds for state transition,
+ * if 0 then the state transition is successful.
* @throws IOException when HAGroupStoreClient is not healthy.
* @throws StaleHAGroupStoreRecordVersionException when the version is
stale,
* the state might have been updated by some other RS,
@@ -454,7 +464,7 @@ public class HAGroupStoreManager {
* the state might have been updated by some other RS,
* check the state again and retry if the use case still needs it.
*/
- public void setReaderToDegraded(final String haGroupName)
+ public long setReaderToDegraded(final String haGroupName)
throws IOException, StaleHAGroupStoreRecordVersionException,
InvalidClusterRoleTransitionException, SQLException {
HAGroupStoreClient haGroupStoreClient
@@ -466,7 +476,7 @@ public class HAGroupStoreManager {
throw new IOException("Current HAGroupStoreRecord is null for HA
group: "
+ haGroupName);
}
- haGroupStoreClient.setHAGroupStatusIfNeeded(
+ return haGroupStoreClient.setHAGroupStatusIfNeeded(
HAGroupStoreRecord.HAGroupState.DEGRADED_STANDBY);
}
@@ -476,6 +486,8 @@ public class HAGroupStoreManager {
* DEGRADED_STANDBY to DEGRADED_STANDBY_FOR_WRITER.
*
* @param haGroupName name of the HA group
+ * @return the wait time in milliseconds for state transition,
+ * if 0 then the state transition is successful.
* @throws IOException when HAGroupStoreClient is not healthy.
* @throws StaleHAGroupStoreRecordVersionException when the version is
stale,
* the state might have been updated by some other RS,
@@ -484,7 +496,7 @@ public class HAGroupStoreManager {
* the state might have been updated by some other RS,
* check the state again and retry if the use case still needs it.
*/
- public void setReaderToHealthy(final String haGroupName)
+ public long setReaderToHealthy(final String haGroupName)
throws IOException, StaleHAGroupStoreRecordVersionException,
InvalidClusterRoleTransitionException, SQLException {
HAGroupStoreClient haGroupStoreClient
@@ -497,10 +509,10 @@ public class HAGroupStoreManager {
+ "for HA group: " + haGroupName);
} else if (currentRecord.getHAGroupState() == STANDBY) {
LOGGER.info("Current HAGroupStoreRecord is already STANDBY for HA
group: " + haGroupName);
- return;
+ return 0L;
}
-
haGroupStoreClient.setHAGroupStatusIfNeeded(HAGroupStoreRecord.HAGroupState.STANDBY);
+ return
haGroupStoreClient.setHAGroupStatusIfNeeded(HAGroupStoreRecord.HAGroupState.STANDBY);
}
/**
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
index e6b91d5bae..f11f15d3ab 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
@@ -129,7 +129,6 @@ import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;
-import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -636,32 +635,32 @@ public class IndexRegionObserver implements
RegionCoprocessor, RegionObserver {
=
c.getEnvironment().getRegion().getRegionInfo().getTable().getNameAsString();
// We don't want to check for mutation blocking for the system ha
group table
if (!tableName.equals(SYSTEM_HA_GROUP_NAME)) {
- // Extract HAGroupName from the mutations
- final Set<String> haGroupNames =
extractHAGroupNameAttribute(miniBatchOp);
- // Check if mutation is blocked for any of the HAGroupNames
- for (String haGroupName : haGroupNames) {
- //TODO: Below approach might be slow need to figure out
faster way,
- // slower part is getting haGroupStoreClient We can also
cache
- // roleRecord (I tried it and still it's slow due to
haGroupStoreClient
- // initialization) and caching will give us old result in
case one cluster
- // is unreachable instead of UNKNOWN.
-
- boolean isHAGroupOnClientStale = haGroupStoreManager
- .isHAGroupOnClientStale(haGroupName);
- if (StringUtils.isNotBlank(haGroupName) &&
isHAGroupOnClientStale) {
- throw new StaleClusterRoleRecordException(
- String.format("HAGroupStoreRecord is stale for
haGroup %s on client"
- , haGroupName));
- }
+ // Extract HAGroupName from the mutations
+ final String haGroupName =
extractHAGroupNameAttribute(miniBatchOp);
+ // Check if mutation is blocked for any of the HAGroupNames
+ if (StringUtils.isNotBlank(haGroupName)) {
+ //TODO: Below approach might be slow need to figure out
faster way,
+ // slower part is getting haGroupStoreClient We can also
cache
+ // roleRecord (I tried it and still it's slow due to
haGroupStoreClient
+ // initialization) and caching will give us old result in
case one cluster
+ // is unreachable instead of UNKNOWN.
+
+ boolean isHAGroupOnClientStale = haGroupStoreManager
+ .isHAGroupOnClientStale(haGroupName);
+ if (StringUtils.isNotBlank(haGroupName) &&
isHAGroupOnClientStale) {
+ throw new StaleClusterRoleRecordException(
+ String.format("HAGroupStoreRecord is stale for
haGroup %s on "
+ + "client", haGroupName));
+ }
- //Check if mutation's haGroup is stale
- if (StringUtils.isNotBlank(haGroupName)
- &&
haGroupStoreManager.isMutationBlocked(haGroupName)) {
- throw new MutationBlockedIOException("Blocking Mutation
as Some CRRs are in "
- + "ACTIVE_TO_STANDBY state and "
- + "CLUSTER_ROLE_BASED_MUTATION_BLOCK_ENABLED is
true");
- }
- }
+ //Check if mutation's haGroup is stale
+ if (StringUtils.isNotBlank(haGroupName)
+ &&
haGroupStoreManager.isMutationBlocked(haGroupName)) {
+ throw new MutationBlockedIOException("Blocking
Mutation as Some CRRs "
+ + "are in ACTIVE_TO_STANDBY state and "
+ + "CLUSTER_ROLE_BASED_MUTATION_BLOCK_ENABLED
is true");
+ }
+ }
}
preBatchMutateWithExceptions(c, miniBatchOp);
return;
@@ -672,18 +671,17 @@ public class IndexRegionObserver implements
RegionCoprocessor, RegionObserver {
"Somehow didn't return an index update but also didn't propagate the
failure to the client!");
}
- private Set<String> extractHAGroupNameAttribute(
- MiniBatchOperationInProgress<Mutation> miniBatchOp) {
- Set<String> haGroupNames = new HashSet<>();
- for (int i = 0; i < miniBatchOp.size(); i++) {
- Mutation m = miniBatchOp.getOperation(i);
- byte[] haGroupName = m.getAttribute(
- BaseScannerRegionObserverConstants.HA_GROUP_NAME_ATTRIB);
- if (haGroupName != null) {
- haGroupNames.add(new String(haGroupName,
StandardCharsets.UTF_8));
- }
- }
- return haGroupNames;
+ private String extractHAGroupNameAttribute( // first HA group name attribute in the batch, or null
+ MiniBatchOperationInProgress<Mutation> miniBatchOp) {
+ for (int i = 0; i < miniBatchOp.size(); i++) {
+ Mutation m = miniBatchOp.getOperation(i);
+ byte[] haGroupName = m.getAttribute(
+ BaseScannerRegionObserverConstants.HA_GROUP_NAME_ATTRIB);
+ if (haGroupName != null) {
+ return new String(haGroupName, java.nio.charset.StandardCharsets.UTF_8); // decode bytes; String.valueOf(byte[]) yields "[B@..." identity text
+ }
+ }
+ return null;
}
@Override
diff --git
a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupStoreClientIT.java
b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupStoreClientIT.java
index cf3ed2c045..ae5d5d456c 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupStoreClientIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupStoreClientIT.java
@@ -550,7 +550,7 @@ public class HAGroupStoreClientIT extends BaseTest {
assertEquals(HAGroupStoreRecord.HAGroupState.ACTIVE_IN_SYNC,
currentRecord.getHAGroupState());
// Update to STANDBY (this should succeed as it's a valid transition)
-
haGroupStoreClient.setHAGroupStatusIfNeeded(HAGroupStoreRecord.HAGroupState.ACTIVE_IN_SYNC_TO_STANDBY);
+ assertEquals(0L,
haGroupStoreClient.setHAGroupStatusIfNeeded(HAGroupStoreRecord.HAGroupState.ACTIVE_IN_SYNC_TO_STANDBY));
Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
// Verify the record was updated
@@ -577,7 +577,7 @@ public class HAGroupStoreClientIT extends BaseTest {
// Update to STANDBY (this should succeed as it's a valid transition)
long timestamp = System.currentTimeMillis();
-
haGroupStoreClient.setHAGroupStatusIfNeeded(HAGroupStoreRecord.HAGroupState.ACTIVE_IN_SYNC_TO_STANDBY,
timestamp);
+ assertEquals(0L,
haGroupStoreClient.setHAGroupStatusIfNeeded(HAGroupStoreRecord.HAGroupState.ACTIVE_IN_SYNC_TO_STANDBY,
timestamp));
Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
// Verify the record was updated
@@ -585,8 +585,8 @@ public class HAGroupStoreClientIT extends BaseTest {
assertEquals(HAGroupStoreRecord.HAGroupState.ACTIVE_IN_SYNC_TO_STANDBY,
currentRecord.getHAGroupState());
assertEquals(timestamp, (long)
currentRecord.getLastSyncStateTimeInMs());
}
-
-
+
+
@Test
public void testSetHAGroupStatusIfNeededNoUpdateWhenNotNeeded() throws
Exception {
@@ -603,7 +603,7 @@ public class HAGroupStoreClientIT extends BaseTest {
HAGroupStoreClient haGroupStoreClient =
HAGroupStoreClient.getInstanceForZkUrl(CLUSTERS.getHBaseCluster1().getConfiguration(),
haGroupName, zkUrl);
// Try to set to ACTIVE_IN_SYNC immediately (should not update due to
timing)
-
haGroupStoreClient.setHAGroupStatusIfNeeded(HAGroupStoreRecord.HAGroupState.ACTIVE_IN_SYNC);
+
assert(haGroupStoreClient.setHAGroupStatusIfNeeded(HAGroupStoreRecord.HAGroupState.ACTIVE_IN_SYNC)
> 0);
// Add sleep if due to any bug the update might have gone through and
we can assert below this.
Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
@@ -628,10 +628,11 @@ public class HAGroupStoreClientIT extends BaseTest {
HAGroupStoreClient haGroupStoreClient
=
HAGroupStoreClient.getInstanceForZkUrl(CLUSTERS.getHBaseCluster1().getConfiguration(),
haGroupName, zkUrl);
- Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS
- + (long) Math.ceil(DEFAULT_ZK_SESSION_TIMEOUT
- * HAGroupStoreClient.ZK_SESSION_TIMEOUT_MULTIPLIER));
-
haGroupStoreClient.setHAGroupStatusIfNeeded(HAGroupStoreRecord.HAGroupState.ACTIVE_IN_SYNC);
+ // Find wait time for ACTIVE_NOT_IN_SYNC to ACTIVE_IN_SYNC
+ long waitTime =
haGroupStoreClient.setHAGroupStatusIfNeeded(HAGroupStoreRecord.HAGroupState.ACTIVE_IN_SYNC);
+ assert waitTime > 0;
+ Thread.sleep(waitTime);
+ assertEquals(0L,
haGroupStoreClient.setHAGroupStatusIfNeeded(HAGroupStoreRecord.HAGroupState.ACTIVE_IN_SYNC));
Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
// Verify the record was updated
@@ -698,7 +699,7 @@ public class HAGroupStoreClientIT extends BaseTest {
HAGroupStoreClient haGroupStoreClient =
HAGroupStoreClient.getInstanceForZkUrl(CLUSTERS.getHBaseCluster1().getConfiguration(),
haGroupName, zkUrl);
// First transition: ACTIVE -> ACTIVE_TO_STANDBY
-
haGroupStoreClient.setHAGroupStatusIfNeeded(HAGroupStoreRecord.HAGroupState.ACTIVE_IN_SYNC_TO_STANDBY);
+ assertEquals(0L,
haGroupStoreClient.setHAGroupStatusIfNeeded(HAGroupStoreRecord.HAGroupState.ACTIVE_IN_SYNC_TO_STANDBY));
Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
HAGroupStoreRecord afterFirst =
haGroupStoreClient.getHAGroupStoreRecord();
@@ -706,7 +707,7 @@ public class HAGroupStoreClientIT extends BaseTest {
// Wait and make another transition: ACTIVE_TO_STANDBY -> STANDBY
Thread.sleep(100); // Small delay to ensure timestamp difference
-
haGroupStoreClient.setHAGroupStatusIfNeeded(HAGroupStoreRecord.HAGroupState.STANDBY);
+ assertEquals(0L,
haGroupStoreClient.setHAGroupStatusIfNeeded(HAGroupStoreRecord.HAGroupState.STANDBY));
Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
HAGroupStoreRecord afterSecond =
haGroupStoreClient.getHAGroupStoreRecord();
diff --git
a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupStoreManagerIT.java
b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupStoreManagerIT.java
index f5729b49f7..8aaf4b4957 100644
---
a/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupStoreManagerIT.java
+++
b/phoenix-core/src/it/java/org/apache/phoenix/jdbc/HAGroupStoreManagerIT.java
@@ -351,7 +351,7 @@ public class HAGroupStoreManagerIT extends BaseTest {
Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
// Set the HA group status to store and forward (ACTIVE_NOT_IN_SYNC)
- haGroupStoreManager.setHAGroupStatusToStoreAndForward(haGroupName);
+ assertEquals(0L,
haGroupStoreManager.setHAGroupStatusToStoreAndForward(haGroupName));
Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
// Verify the status was updated to ACTIVE_NOT_IN_SYNC
@@ -364,7 +364,7 @@ public class HAGroupStoreManagerIT extends BaseTest {
// Set the HA group status to store and forward again and verify
// that getLastSyncStateTimeInMs is same (ACTIVE_NOT_IN_SYNC)
// The time should only update when we move to AIS to ANIS
- haGroupStoreManager.setHAGroupStatusToStoreAndForward(haGroupName);
+ assertEquals(0L,
haGroupStoreManager.setHAGroupStatusToStoreAndForward(haGroupName));
Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
Optional<HAGroupStoreRecord> updatedRecordOpt2 =
haGroupStoreManager.getHAGroupStoreRecord(haGroupName);
assertTrue(updatedRecordOpt2.isPresent());
@@ -384,10 +384,13 @@ public class HAGroupStoreManagerIT extends BaseTest {
assertEquals(HAGroupStoreRecord.HAGroupState.ACTIVE_NOT_IN_SYNC,
initialRecord.getHAGroupState());
assertNotNull(initialRecord.getLastSyncStateTimeInMs());
+ // Setting it too soon should return wait time.
+ long transitionWaitTime =
haGroupStoreManager.setHAGroupStatusToSync(haGroupName);
+ assert(transitionWaitTime > 0);
+
// Set the HA group status to sync (ACTIVE), we need to wait for
ZK_SESSION_TIMEOUT * Multiplier
- Thread.sleep((long) Math.ceil(config.getLong(ZK_SESSION_TIMEOUT,
DEFAULT_ZK_SESSION_TIMEOUT)
- * ZK_SESSION_TIMEOUT_MULTIPLIER));
- haGroupStoreManager.setHAGroupStatusToSync(haGroupName);
+ Thread.sleep(transitionWaitTime);
+ assertEquals(0L,
haGroupStoreManager.setHAGroupStatusToSync(haGroupName));
Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
// Verify the state was updated to ACTIVE_IN_SYNC
@@ -495,7 +498,7 @@ public class HAGroupStoreManagerIT extends BaseTest {
Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
// Call setReaderToDegraded
- haGroupStoreManager.setReaderToDegraded(haGroupName);
+ assertEquals(0L, haGroupStoreManager.setReaderToDegraded(haGroupName));
Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
// Verify the status was updated to DEGRADED_STANDBY
@@ -524,7 +527,7 @@ public class HAGroupStoreManagerIT extends BaseTest {
Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
// Call setReaderToHealthy
- haGroupStoreManager.setReaderToHealthy(haGroupName);
+ assertEquals(0L, haGroupStoreManager.setReaderToHealthy(haGroupName));
Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
// Verify the status was updated to STANDBY
@@ -554,7 +557,7 @@ public class HAGroupStoreManagerIT extends BaseTest {
// Test setReaderToDegraded with invalid state
try {
- haGroupStoreManager.setReaderToDegraded(haGroupName);
+ assertEquals(0L,
haGroupStoreManager.setReaderToDegraded(haGroupName));
fail("Expected InvalidClusterRoleTransitionException for
setReaderToDegraded with ACTIVE_IN_SYNC state");
} catch (InvalidClusterRoleTransitionException e) {
// Expected behavior
@@ -564,7 +567,7 @@ public class HAGroupStoreManagerIT extends BaseTest {
// Test setReaderToHealthy with invalid state
try {
- haGroupStoreManager.setReaderToHealthy(haGroupName);
+ assertEquals(0L,
haGroupStoreManager.setReaderToHealthy(haGroupName));
fail("Expected InvalidClusterRoleTransitionException for
setReaderToHealthy with ACTIVE_IN_SYNC state");
} catch (InvalidClusterRoleTransitionException e) {
// Expected behavior
@@ -602,10 +605,10 @@ public class HAGroupStoreManagerIT extends BaseTest {
// Move cluster1 from ACTIVE_NOT_IN_SYNC to ACTIVE_IN_SYNC,
// we can move after DEFAULT_ZK_SESSION_TIMEOUT *
ZK_SESSION_TIMEOUT_MULTIPLIER
Thread.sleep(20 * 1000);
- cluster1HAManager.setHAGroupStatusToSync(haGroupName);
+ assertEquals(0L,
cluster1HAManager.setHAGroupStatusToSync(haGroupName));
Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
- cluster2HAManager.setReaderToHealthy(haGroupName);
+ assertEquals(0L, cluster2HAManager.setReaderToHealthy(haGroupName));
Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
// Simulates action taken by reader to complete the replay and become
new ACTIVE
@@ -617,7 +620,7 @@ public class HAGroupStoreManagerIT extends BaseTest {
lastSyncStateTimeInMs) -> {
try {
if (toState ==
HAGroupStoreRecord.HAGroupState.STANDBY_TO_ACTIVE) {
- cluster2HAManager.setHAGroupStatusToSync(haGroupName1);
+ assertEquals(0L,
cluster2HAManager.setHAGroupStatusToSync(haGroupName1));
}
} catch (Exception e) {
fail("Peer Cluster should be able to move to ACTIVE_IN_SYNC" +
e.getMessage());
@@ -694,10 +697,10 @@ public class HAGroupStoreManagerIT extends BaseTest {
// Move cluster1 from ACTIVE_NOT_IN_SYNC to ACTIVE_IN_SYNC,
// we can move after DEFAULT_ZK_SESSION_TIMEOUT *
ZK_SESSION_TIMEOUT_MULTIPLIER
Thread.sleep(20 * 1000);
- cluster1HAManager.setHAGroupStatusToSync(haGroupName);
+ assertEquals(0L,
cluster1HAManager.setHAGroupStatusToSync(haGroupName));
Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
- cluster2HAManager.setReaderToHealthy(haGroupName);
+ assertEquals(0L, cluster2HAManager.setReaderToHealthy(haGroupName));
Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
// === INITIAL STATE VERIFICATION ===
@@ -733,7 +736,7 @@ public class HAGroupStoreManagerIT extends BaseTest {
// === STEP 3: Operator decides to abort failover ===
// Set cluster2 (which is in STANDBY_TO_ACTIVE) to ABORT_TO_STANDBY
- cluster2HAManager.setHAGroupStatusToAbortToStandby(haGroupName);
+ assertEquals(0L,
cluster2HAManager.setHAGroupStatusToAbortToStandby(haGroupName));
Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
// === STEP 4: Verify automatic cluster1 abort reaction ===
@@ -805,7 +808,7 @@ public class HAGroupStoreManagerIT extends BaseTest {
// Move cluster1 from ACTIVE_NOT_IN_SYNC to ACTIVE_IN_SYNC,
// we can move after DEFAULT_ZK_SESSION_TIMEOUT *
ZK_SESSION_TIMEOUT_MULTIPLIER
Thread.sleep(20 * 1000);
- cluster1HAManager.setHAGroupStatusToSync(haGroupName);
+ assertEquals(0L,
cluster1HAManager.setHAGroupStatusToSync(haGroupName));
Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
// === INITIAL STATE VERIFICATION ===
@@ -824,7 +827,7 @@ public class HAGroupStoreManagerIT extends BaseTest {
// === STEP 1: Transition to store-and-forward mode ===
// Move cluster1 from ACTIVE_IN_SYNC to ACTIVE_NOT_IN_SYNC
(store-and-forward mode)
- cluster1HAManager.setHAGroupStatusToStoreAndForward(haGroupName);
+ assertEquals(0L,
cluster1HAManager.setHAGroupStatusToStoreAndForward(haGroupName));
Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
// Verify cluster1 is now in store-and-forward state
@@ -851,7 +854,7 @@ public class HAGroupStoreManagerIT extends BaseTest {
// Move cluster1 back from ACTIVE_NOT_IN_SYNC to ACTIVE_IN_SYNC
// Wait for the required time before transitioning back to sync
Thread.sleep(20 * 1000);
- cluster1HAManager.setHAGroupStatusToSync(haGroupName);
+ assertEquals(0L,
cluster1HAManager.setHAGroupStatusToSync(haGroupName));
Thread.sleep(ZK_CURATOR_EVENT_PROPAGATION_TIMEOUT_MS);
// Verify cluster1 is back in sync state