jpisaac commented on code in PR #2224:
URL: https://github.com/apache/phoenix/pull/2224#discussion_r2208457525


##########
phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java:
##########
@@ -621,10 +623,16 @@ public void 
preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
       }
       try {
           final Configuration conf = c.getEnvironment().getConfiguration();
-          final HAGroupStoreManager haGroupStoreManager = 
HAGroupStoreManager.getInstance(conf);
-          if (haGroupStoreManager.isMutationBlocked()) {
-              throw new IOException("Blocking Mutation as Some CRRs are in 
ACTIVE_TO_STANDBY "
-                      + "state and CLUSTER_ROLE_BASED_MUTATION_BLOCK_ENABLED 
is true");
+          final Optional<HAGroupStoreManager> haGroupStoreManagerOptional
+                  = HAGroupStoreManagerFactory.getInstance(conf);
+          if (!haGroupStoreManagerOptional.isPresent()) {
+              throw new IOException("HAGroupStoreManager is null "
+                      + "for current cluster, check configuration");
+          }
+          if (haGroupStoreManagerOptional.get().isMutationBlocked(conf)) {
+              throw new MutationBlockedIOException("Blocking Mutation as Some 
CRRs are in "

Review Comment:
   nit: lowercase "some"



##########
phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/ClusterRoleRecord.java:
##########
@@ -55,18 +65,136 @@
 public class ClusterRoleRecord {
     private static final Logger LOG = 
LoggerFactory.getLogger(ClusterRoleRecord.class);
 
+    private static final Set<ClusterRole> CAN_CONNECT
+            = ImmutableSet.of(ClusterRole.ACTIVE, 
ClusterRole.ACTIVE_TO_STANDBY,
+            ClusterRole.STANDBY, ClusterRole.ACTIVE_NOT_IN_SYNC,
+            ClusterRole.ABORT_TO_ACTIVE, ClusterRole.ABORT_TO_STANDBY,
+            ClusterRole.DEGRADED_STANDBY, 
ClusterRole.DEGRADED_STANDBY_FOR_READER,
+            ClusterRole.DEGRADED_STANDBY_FOR_WRITER, 
ClusterRole.ACTIVE_WITH_OFFLINE_PEER,
+            ClusterRole.ACTIVE_NOT_IN_SYNC_TO_STANDBY, 
ClusterRole.STANDBY_TO_ACTIVE,
+            ClusterRole.ACTIVE_NOT_IN_SYNC_WITH_OFFLINE_PEER);
+    private static final Set<ClusterRole> IS_ACTIVE = 
ImmutableSet.of(ClusterRole.ACTIVE,
+            ClusterRole.ACTIVE_TO_STANDBY, ClusterRole.ACTIVE_NOT_IN_SYNC,
+            ClusterRole.ACTIVE_NOT_IN_SYNC_TO_STANDBY, 
ClusterRole.ACTIVE_WITH_OFFLINE_PEER,
+            ClusterRole.ACTIVE_NOT_IN_SYNC_WITH_OFFLINE_PEER);
+    private static final Set<ClusterRole> IS_STANDBY = 
ImmutableSet.of(ClusterRole.STANDBY,
+            ClusterRole.DEGRADED_STANDBY, 
ClusterRole.DEGRADED_STANDBY_FOR_READER,
+            ClusterRole.DEGRADED_STANDBY_FOR_WRITER, 
ClusterRole.STANDBY_TO_ACTIVE);
+    private static final Set<ClusterRole> IS_MUTATION_BLOCKED = 
ImmutableSet.of(ClusterRole.ACTIVE_TO_STANDBY,
+            ClusterRole.ACTIVE_NOT_IN_SYNC_TO_STANDBY);
+
     /**
      * Enum for the current state of the cluster.  Exact meaning depends on 
the Policy but in general Active clusters
      * take traffic, standby and offline do not, and unknown is used if the 
state cannot be determined.
      */
     public enum ClusterRole {
-        ACTIVE, STANDBY, OFFLINE, UNKNOWN, ACTIVE_TO_STANDBY;
+        ABORT_TO_ACTIVE,
+        ABORT_TO_STANDBY,
+        ACTIVE,
+        ACTIVE_NOT_IN_SYNC,
+        ACTIVE_NOT_IN_SYNC_TO_STANDBY,
+        ACTIVE_NOT_IN_SYNC_WITH_OFFLINE_PEER,
+        ACTIVE_TO_STANDBY,
+        ACTIVE_WITH_OFFLINE_PEER,
+        DEGRADED_STANDBY,
+        DEGRADED_STANDBY_FOR_READER,
+        DEGRADED_STANDBY_FOR_WRITER,
+        OFFLINE,
+        STANDBY,
+        STANDBY_TO_ACTIVE,
+        UNKNOWN;
+
+        private Set<ClusterRole> allowedTransitions;
+
+        static {
+            // Initialize allowed transitions
+            ACTIVE_NOT_IN_SYNC.allowedTransitions = ImmutableSet.of(
+                    ACTIVE_NOT_IN_SYNC, ACTIVE,
+                    ACTIVE_NOT_IN_SYNC_TO_STANDBY, 
ACTIVE_NOT_IN_SYNC_WITH_OFFLINE_PEER
+            );
+
+            ACTIVE.allowedTransitions = ImmutableSet.of(
+                    ACTIVE_NOT_IN_SYNC, ACTIVE_WITH_OFFLINE_PEER, 
ACTIVE_TO_STANDBY
+            );
+
+            STANDBY.allowedTransitions = ImmutableSet.of(STANDBY_TO_ACTIVE,
+                    DEGRADED_STANDBY_FOR_READER, DEGRADED_STANDBY_FOR_WRITER);
+            // This needs to be manually recovered by operator
+            OFFLINE.allowedTransitions = ImmutableSet.of();
+            // This needs to be manually recovered by operator
+            UNKNOWN.allowedTransitions = ImmutableSet.of();
+            ACTIVE_TO_STANDBY.allowedTransitions = 
ImmutableSet.of(ABORT_TO_ACTIVE, STANDBY);
+            STANDBY_TO_ACTIVE.allowedTransitions = 
ImmutableSet.of(ABORT_TO_STANDBY, ACTIVE);
+            DEGRADED_STANDBY.allowedTransitions
+                    = ImmutableSet.of(DEGRADED_STANDBY_FOR_READER, 
DEGRADED_STANDBY_FOR_WRITER);
+            DEGRADED_STANDBY_FOR_WRITER.allowedTransitions = 
ImmutableSet.of(STANDBY, DEGRADED_STANDBY);
+            DEGRADED_STANDBY_FOR_READER.allowedTransitions = 
ImmutableSet.of(STANDBY, DEGRADED_STANDBY);
+            ACTIVE_WITH_OFFLINE_PEER.allowedTransitions = 
ImmutableSet.of(ACTIVE);
+            ABORT_TO_ACTIVE.allowedTransitions = ImmutableSet.of(ACTIVE, 
ACTIVE_NOT_IN_SYNC);
+            ABORT_TO_STANDBY.allowedTransitions = ImmutableSet.of(STANDBY);
+            ACTIVE_NOT_IN_SYNC_TO_STANDBY.allowedTransitions = 
ImmutableSet.of(ACTIVE_TO_STANDBY, ACTIVE_NOT_IN_SYNC);
+            ACTIVE_NOT_IN_SYNC_WITH_OFFLINE_PEER.allowedTransitions = 
ImmutableSet.of(ACTIVE_NOT_IN_SYNC);
+        }
+
+        /**
+         * Get the wait time required to transition from this role to the 
target role,
+         * reading from configuration.
+         * @param targetRole the role to transition to
+         * @param conf configuration to read from
+         * @return wait time in milliseconds, or 0 if transition is not allowed
+         */
+        public long checkTransitionAndGetWaitTime(ClusterRole targetRole, 
Configuration conf)
+                throws InvalidClusterRoleTransitionException {
+            if (!allowedTransitions.contains(targetRole)) {
+                throw new InvalidClusterRoleTransitionException("Cannot 
transition from " + this + " to " + targetRole);
+            }
+
+            // Read wait times from configuration based on the transition
+            switch (this) {
+                case ACTIVE_NOT_IN_SYNC:
+                    if (targetRole == ACTIVE) {
+                        return conf.getLong(HA_SYNC_MODE_REFRESH_INTERVAL_MS, 
DEFAULT_HA_SYNC_MODE_REFRESH_INTERVAL_MS);

Review Comment:
   Does it need to check the conf object everytime?



##########
phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HighAvailabilityGroup.java:
##########
@@ -435,9 +448,28 @@ static Optional<String> getFallbackCluster(String url, 
Properties properties) th
     @SuppressWarnings("UnstableApiUsage")
     public static CuratorFramework getCurator(String jdbcUrl, Properties 
properties)
             throws IOException {
+        return getCurator(jdbcUrl, properties, 
PHOENIX_HA_ZOOKEEPER_ZNODE_NAMESPACE);
+    }
+
+    /**
+     * Get an active curator ZK client for the given properties, ZK endpoint 
and namespace.
+     * <p>
+     * This can be from cached object since Curator should be shared per 
cluster.
+     *
+     * @param jdbcUrl    the ZK endpoint host:port or the JDBC connection 
String host:port:/hbase
+     * @param properties the properties defining time out values and retry 
count
+     * @param namespace  the ZooKeeper namespace to use, defaults to 
PHOENIX_HA_ZOOKEEPER_ZNODE_NAMESPACE if null
+     * @return a new Curator framework client
+     */
+    @SuppressWarnings("UnstableApiUsage")
+    public static CuratorFramework getCurator(String jdbcUrl, Properties 
properties, String namespace)
+            throws IOException {
+        // Use namespace as part of cache key to avoid conflicts between 
different namespaces
+        String effectiveNamespace = namespace != null ? namespace : 
PHOENIX_HA_ZOOKEEPER_ZNODE_NAMESPACE;

Review Comment:
   Why the need for null check on namespace, can we require and expect 
namespace to be passed in if not throw an exception?



##########
phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixRegionServerEndpoint.java:
##########
@@ -119,10 +121,15 @@ public void invalidateHAGroupStoreClient(RpcController 
controller,
             RegionServerEndpointProtos.InvalidateHAGroupStoreClientRequest 
request,
             
RpcCallback<RegionServerEndpointProtos.InvalidateHAGroupStoreClientResponse> 
done) {
         LOGGER.info("PhoenixRegionServerEndpoint invalidating 
HAGroupStoreClient");
-        HAGroupStoreManager haGroupStoreManager;
         try {
-            haGroupStoreManager = HAGroupStoreManager.getInstance(conf);
-            haGroupStoreManager.invalidateHAGroupStoreClient();
+            Optional<HAGroupStoreManager> haGroupStoreManagerOptional
+                    = HAGroupStoreManagerFactory.getInstance(conf);

Review Comment:
   Can we try and reduce the conf lookups? The necessary parameters can be 
looked up once and stored during start



##########
phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HighAvailabilityGroup.java:
##########
@@ -498,7 +535,7 @@ private static CuratorFramework createCurator(String 
jdbcUrl, Properties propert
         CuratorFramework curator = CuratorFrameworkFactory
                 .builder()
                 .connectString(zkUrl)
-                .namespace(PHOENIX_HA_ZOOKEEPER_ZNODE_NAMESPACE)
+                .namespace(namespace != null ? namespace : 
PHOENIX_HA_ZOOKEEPER_ZNODE_NAMESPACE)

Review Comment:
   Same thoughts here, namespace argument should not be null



##########
phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreRecord.java:
##########
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.jdbc;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.phoenix.thirdparty.com.google.common.base.Preconditions;
+import org.apache.phoenix.util.JacksonUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * Immutable class representing an HA group store record with simplified 
fields.
+ *
+ * This is a simplified version of ClusterRoleRecord that contains essential
+ * information about an HA group only for a single cluster.
+ *
+ * This class is immutable.
+ */
+public class HAGroupStoreRecord {
+    private static final Logger LOG = 
LoggerFactory.getLogger(HAGroupStoreRecord.class);
+    public static final String DEFAULT_PROTOCOL_VERSION = "1.0";
+
+    private final String protocolVersion;
+    private final String haGroupName;
+    private final ClusterRoleRecord.ClusterRole clusterRole;
+    private final long version;
+    private final String policy;
+    private final long lastUpdatedTimeInMs;
+    private final String peerZKUrl;
+
+    @JsonCreator
+    public HAGroupStoreRecord(@JsonProperty("protocolVersion") String 
protocolVersion,
+                              @JsonProperty("haGroupName") String haGroupName,
+                              @JsonProperty("clusterRole") 
ClusterRoleRecord.ClusterRole clusterRole,
+                              @JsonProperty("version") long version,
+                              @JsonProperty("policy") String policy,
+                              @JsonProperty("lastUpdatedTimeInMs") long 
lastUpdatedTimeInMs,
+                              @JsonProperty("peerZKUrl") String peerZKUrl) {
+        Preconditions.checkNotNull(haGroupName, "HA group name cannot be 
null!");
+        Preconditions.checkNotNull(clusterRole, "Cluster role cannot be 
null!");
+        Preconditions.checkNotNull(policy, "Policy cannot be null!");
+
+        this.protocolVersion = Objects.toString(protocolVersion, 
DEFAULT_PROTOCOL_VERSION);
+        this.haGroupName = haGroupName;
+        this.clusterRole = clusterRole;
+        this.version = version;
+        this.policy = policy;
+        this.lastUpdatedTimeInMs = lastUpdatedTimeInMs;
+        this.peerZKUrl = peerZKUrl;
+    }
+
+    public static Optional<HAGroupStoreRecord> fromJson(byte[] bytes) {
+        if (bytes == null) {
+            return Optional.empty();
+        }
+        try {
+            return 
Optional.of(JacksonUtil.getObjectReader(HAGroupStoreRecord.class).readValue(bytes));
+        } catch (Exception e) {
+            LOG.error("Fail to deserialize data to an HA group store record", 
e);
+            return Optional.empty();
+        }
+    }
+
+    public static byte[] toJson(HAGroupStoreRecord record) throws IOException {
+        return JacksonUtil.getObjectWriter().writeValueAsBytes(record);
+    }
+
+    /**
+     * @return true if this record is newer than the given record.
+     */
+    public boolean isNewerThan(HAGroupStoreRecord other) {

Review Comment:
   Would it make sense to implement the Comparable interface?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@phoenix.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to