jpisaac commented on code in PR #2224: URL: https://github.com/apache/phoenix/pull/2224#discussion_r2208457525
########## phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java: ########## @@ -621,10 +623,16 @@ public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c, } try { final Configuration conf = c.getEnvironment().getConfiguration(); - final HAGroupStoreManager haGroupStoreManager = HAGroupStoreManager.getInstance(conf); - if (haGroupStoreManager.isMutationBlocked()) { - throw new IOException("Blocking Mutation as Some CRRs are in ACTIVE_TO_STANDBY " - + "state and CLUSTER_ROLE_BASED_MUTATION_BLOCK_ENABLED is true"); + final Optional<HAGroupStoreManager> haGroupStoreManagerOptional + = HAGroupStoreManagerFactory.getInstance(conf); + if (!haGroupStoreManagerOptional.isPresent()) { + throw new IOException("HAGroupStoreManager is null " + + "for current cluster, check configuration"); + } + if (haGroupStoreManagerOptional.get().isMutationBlocked(conf)) { + throw new MutationBlockedIOException("Blocking Mutation as Some CRRs are in " Review Comment: nit: lowercase "some" ########## phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/ClusterRoleRecord.java: ########## @@ -55,18 +65,136 @@ public class ClusterRoleRecord { private static final Logger LOG = LoggerFactory.getLogger(ClusterRoleRecord.class); + private static final Set<ClusterRole> CAN_CONNECT + = ImmutableSet.of(ClusterRole.ACTIVE, ClusterRole.ACTIVE_TO_STANDBY, + ClusterRole.STANDBY, ClusterRole.ACTIVE_NOT_IN_SYNC, + ClusterRole.ABORT_TO_ACTIVE, ClusterRole.ABORT_TO_STANDBY, + ClusterRole.DEGRADED_STANDBY, ClusterRole.DEGRADED_STANDBY_FOR_READER, + ClusterRole.DEGRADED_STANDBY_FOR_WRITER, ClusterRole.ACTIVE_WITH_OFFLINE_PEER, + ClusterRole.ACTIVE_NOT_IN_SYNC_TO_STANDBY, ClusterRole.STANDBY_TO_ACTIVE, + ClusterRole.ACTIVE_NOT_IN_SYNC_WITH_OFFLINE_PEER); + private static final Set<ClusterRole> IS_ACTIVE = ImmutableSet.of(ClusterRole.ACTIVE, + ClusterRole.ACTIVE_TO_STANDBY, ClusterRole.ACTIVE_NOT_IN_SYNC, + ClusterRole.ACTIVE_NOT_IN_SYNC_TO_STANDBY, 
ClusterRole.ACTIVE_WITH_OFFLINE_PEER, + ClusterRole.ACTIVE_NOT_IN_SYNC_WITH_OFFLINE_PEER); + private static final Set<ClusterRole> IS_STANDBY = ImmutableSet.of(ClusterRole.STANDBY, + ClusterRole.DEGRADED_STANDBY, ClusterRole.DEGRADED_STANDBY_FOR_READER, + ClusterRole.DEGRADED_STANDBY_FOR_WRITER, ClusterRole.STANDBY_TO_ACTIVE); + private static final Set<ClusterRole> IS_MUTATION_BLOCKED = ImmutableSet.of(ClusterRole.ACTIVE_TO_STANDBY, + ClusterRole.ACTIVE_NOT_IN_SYNC_TO_STANDBY); + /** * Enum for the current state of the cluster. Exact meaning depends on the Policy but in general Active clusters * take traffic, standby and offline do not, and unknown is used if the state cannot be determined. */ public enum ClusterRole { - ACTIVE, STANDBY, OFFLINE, UNKNOWN, ACTIVE_TO_STANDBY; + ABORT_TO_ACTIVE, + ABORT_TO_STANDBY, + ACTIVE, + ACTIVE_NOT_IN_SYNC, + ACTIVE_NOT_IN_SYNC_TO_STANDBY, + ACTIVE_NOT_IN_SYNC_WITH_OFFLINE_PEER, + ACTIVE_TO_STANDBY, + ACTIVE_WITH_OFFLINE_PEER, + DEGRADED_STANDBY, + DEGRADED_STANDBY_FOR_READER, + DEGRADED_STANDBY_FOR_WRITER, + OFFLINE, + STANDBY, + STANDBY_TO_ACTIVE, + UNKNOWN; + + private Set<ClusterRole> allowedTransitions; + + static { + // Initialize allowed transitions + ACTIVE_NOT_IN_SYNC.allowedTransitions = ImmutableSet.of( + ACTIVE_NOT_IN_SYNC, ACTIVE, + ACTIVE_NOT_IN_SYNC_TO_STANDBY, ACTIVE_NOT_IN_SYNC_WITH_OFFLINE_PEER + ); + + ACTIVE.allowedTransitions = ImmutableSet.of( + ACTIVE_NOT_IN_SYNC, ACTIVE_WITH_OFFLINE_PEER, ACTIVE_TO_STANDBY + ); + + STANDBY.allowedTransitions = ImmutableSet.of(STANDBY_TO_ACTIVE, + DEGRADED_STANDBY_FOR_READER, DEGRADED_STANDBY_FOR_WRITER); + // This needs to be manually recovered by operator + OFFLINE.allowedTransitions = ImmutableSet.of(); + // This needs to be manually recovered by operator + UNKNOWN.allowedTransitions = ImmutableSet.of(); + ACTIVE_TO_STANDBY.allowedTransitions = ImmutableSet.of(ABORT_TO_ACTIVE, STANDBY); + STANDBY_TO_ACTIVE.allowedTransitions = ImmutableSet.of(ABORT_TO_STANDBY, ACTIVE); 
+ DEGRADED_STANDBY.allowedTransitions + = ImmutableSet.of(DEGRADED_STANDBY_FOR_READER, DEGRADED_STANDBY_FOR_WRITER); + DEGRADED_STANDBY_FOR_WRITER.allowedTransitions = ImmutableSet.of(STANDBY, DEGRADED_STANDBY); + DEGRADED_STANDBY_FOR_READER.allowedTransitions = ImmutableSet.of(STANDBY, DEGRADED_STANDBY); + ACTIVE_WITH_OFFLINE_PEER.allowedTransitions = ImmutableSet.of(ACTIVE); + ABORT_TO_ACTIVE.allowedTransitions = ImmutableSet.of(ACTIVE, ACTIVE_NOT_IN_SYNC); + ABORT_TO_STANDBY.allowedTransitions = ImmutableSet.of(STANDBY); + ACTIVE_NOT_IN_SYNC_TO_STANDBY.allowedTransitions = ImmutableSet.of(ACTIVE_TO_STANDBY, ACTIVE_NOT_IN_SYNC); + ACTIVE_NOT_IN_SYNC_WITH_OFFLINE_PEER.allowedTransitions = ImmutableSet.of(ACTIVE_NOT_IN_SYNC); + } + + /** + * Get the wait time required to transition from this role to the target role, + * reading from configuration. + * @param targetRole the role to transition to + * @param conf configuration to read from + * @return wait time in milliseconds, or 0 if transition is not allowed + */ + public long checkTransitionAndGetWaitTime(ClusterRole targetRole, Configuration conf) + throws InvalidClusterRoleTransitionException { + if (!allowedTransitions.contains(targetRole)) { + throw new InvalidClusterRoleTransitionException("Cannot transition from " + this + " to " + targetRole); + } + + // Read wait times from configuration based on the transition + switch (this) { + case ACTIVE_NOT_IN_SYNC: + if (targetRole == ACTIVE) { + return conf.getLong(HA_SYNC_MODE_REFRESH_INTERVAL_MS, DEFAULT_HA_SYNC_MODE_REFRESH_INTERVAL_MS); Review Comment: Does it need to check the conf object every time?
########## phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HighAvailabilityGroup.java: ########## @@ -435,9 +448,28 @@ static Optional<String> getFallbackCluster(String url, Properties properties) th @SuppressWarnings("UnstableApiUsage") public static CuratorFramework getCurator(String jdbcUrl, Properties properties) throws IOException { + return getCurator(jdbcUrl, properties, PHOENIX_HA_ZOOKEEPER_ZNODE_NAMESPACE); + } + + /** + * Get an active curator ZK client for the given properties, ZK endpoint and namespace. + * <p> + * This can be from cached object since Curator should be shared per cluster. + * + * @param jdbcUrl the ZK endpoint host:port or the JDBC connection String host:port:/hbase + * @param properties the properties defining time out values and retry count + * @param namespace the ZooKeeper namespace to use, defaults to PHOENIX_HA_ZOOKEEPER_ZNODE_NAMESPACE if null + * @return a new Curator framework client + */ + @SuppressWarnings("UnstableApiUsage") + public static CuratorFramework getCurator(String jdbcUrl, Properties properties, String namespace) + throws IOException { + // Use namespace as part of cache key to avoid conflicts between different namespaces + String effectiveNamespace = namespace != null ? namespace : PHOENIX_HA_ZOOKEEPER_ZNODE_NAMESPACE; Review Comment: Why is the null check on namespace needed? Can we require the namespace to be passed in, and throw an exception if it is not?
########## phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixRegionServerEndpoint.java: ########## @@ -119,10 +121,15 @@ public void invalidateHAGroupStoreClient(RpcController controller, RegionServerEndpointProtos.InvalidateHAGroupStoreClientRequest request, RpcCallback<RegionServerEndpointProtos.InvalidateHAGroupStoreClientResponse> done) { LOGGER.info("PhoenixRegionServerEndpoint invalidating HAGroupStoreClient"); - HAGroupStoreManager haGroupStoreManager; try { - haGroupStoreManager = HAGroupStoreManager.getInstance(conf); - haGroupStoreManager.invalidateHAGroupStoreClient(); + Optional<HAGroupStoreManager> haGroupStoreManagerOptional + = HAGroupStoreManagerFactory.getInstance(conf); Review Comment: Can we try and reduce the conf lookups? The necessary parameters can be looked up once and stored at startup. ########## phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HighAvailabilityGroup.java: ########## @@ -498,7 +535,7 @@ private static CuratorFramework createCurator(String jdbcUrl, Properties propert CuratorFramework curator = CuratorFrameworkFactory .builder() .connectString(zkUrl) - .namespace(PHOENIX_HA_ZOOKEEPER_ZNODE_NAMESPACE) + .namespace(namespace != null ? namespace : PHOENIX_HA_ZOOKEEPER_ZNODE_NAMESPACE) Review Comment: Same thought here: the namespace argument should not be null. ########## phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HAGroupStoreRecord.java: ########## @@ -0,0 +1,192 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.phoenix.jdbc; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.builder.EqualsBuilder; +import org.apache.commons.lang3.builder.HashCodeBuilder; +import org.apache.phoenix.thirdparty.com.google.common.base.Preconditions; +import org.apache.phoenix.util.JacksonUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Objects; +import java.util.Optional; + +/** + * Immutable class representing an HA group store record with simplified fields. + * + * This is a simplified version of ClusterRoleRecord that contains essential + * information about an HA group only for a single cluster. + * + * This class is immutable. 
+ */ +public class HAGroupStoreRecord { + private static final Logger LOG = LoggerFactory.getLogger(HAGroupStoreRecord.class); + public static final String DEFAULT_PROTOCOL_VERSION = "1.0"; + + private final String protocolVersion; + private final String haGroupName; + private final ClusterRoleRecord.ClusterRole clusterRole; + private final long version; + private final String policy; + private final long lastUpdatedTimeInMs; + private final String peerZKUrl; + + @JsonCreator + public HAGroupStoreRecord(@JsonProperty("protocolVersion") String protocolVersion, + @JsonProperty("haGroupName") String haGroupName, + @JsonProperty("clusterRole") ClusterRoleRecord.ClusterRole clusterRole, + @JsonProperty("version") long version, + @JsonProperty("policy") String policy, + @JsonProperty("lastUpdatedTimeInMs") long lastUpdatedTimeInMs, + @JsonProperty("peerZKUrl") String peerZKUrl) { + Preconditions.checkNotNull(haGroupName, "HA group name cannot be null!"); + Preconditions.checkNotNull(clusterRole, "Cluster role cannot be null!"); + Preconditions.checkNotNull(policy, "Policy cannot be null!"); + + this.protocolVersion = Objects.toString(protocolVersion, DEFAULT_PROTOCOL_VERSION); + this.haGroupName = haGroupName; + this.clusterRole = clusterRole; + this.version = version; + this.policy = policy; + this.lastUpdatedTimeInMs = lastUpdatedTimeInMs; + this.peerZKUrl = peerZKUrl; + } + + public static Optional<HAGroupStoreRecord> fromJson(byte[] bytes) { + if (bytes == null) { + return Optional.empty(); + } + try { + return Optional.of(JacksonUtil.getObjectReader(HAGroupStoreRecord.class).readValue(bytes)); + } catch (Exception e) { + LOG.error("Fail to deserialize data to an HA group store record", e); + return Optional.empty(); + } + } + + public static byte[] toJson(HAGroupStoreRecord record) throws IOException { + return JacksonUtil.getObjectWriter().writeValueAsBytes(record); + } + + /** + * @return true if this record is newer than the given record. 
+ */ + public boolean isNewerThan(HAGroupStoreRecord other) { Review Comment: Would it make sense to implement the Comparable interface? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@phoenix.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org