This is an automated email from the ASF dual-hosted git repository. xyuanlu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/master by this push: new f87d94859 Improve setInstanceOperation performance (#3017) f87d94859 is described below commit f87d948598b33de0788653c2438307bf089dece6 Author: Zachary Pinto <zapi...@linkedin.com> AuthorDate: Tue Apr 15 09:37:40 2025 -0700 Improve setInstanceOperation performance (#3017) This change will switch to parallel/async get on all instance configs, using HelixDataAccessor, and avoid calling findInstancesWithMatchingLogicalId for instance operation transitions where this check is not required. --- .../org/apache/helix/manager/zk/ZKHelixAdmin.java | 9 +- .../java/org/apache/helix/util/InstanceUtil.java | 179 ++++++++++++++++----- 2 files changed, 142 insertions(+), 46 deletions(-) diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java index ae914682b..0d72ac4aa 100644 --- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java +++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java @@ -211,7 +211,7 @@ public class ZKHelixAdmin implements HelixAdmin { } List<InstanceConfig> matchingLogicalIdInstances = - InstanceUtil.findInstancesWithMatchingLogicalId(_configAccessor, clusterName, + InstanceUtil.findInstancesWithMatchingLogicalId(_baseDataAccessor, clusterName, instanceConfig); if (matchingLogicalIdInstances.size() > 1) { throw new HelixException( @@ -224,7 +224,8 @@ public class ZKHelixAdmin implements HelixAdmin { InstanceConstants.InstanceOperation attemptedInstanceOperation = instanceConfig.getInstanceOperation().getOperation(); try { - InstanceUtil.validateInstanceOperationTransition(_configAccessor, clusterName, instanceConfig, + InstanceUtil.validateInstanceOperationTransition(_baseDataAccessor, clusterName, + instanceConfig, InstanceConstants.InstanceOperation.UNKNOWN, attemptedInstanceOperation); } catch (HelixException e) { instanceConfig.setInstanceOperation(InstanceConstants.InstanceOperation.UNKNOWN); @@ -616,7 +617,7 @@ public class ZKHelixAdmin implements HelixAdmin { } List<InstanceConfig> swappingInstances = - InstanceUtil.findInstancesWithMatchingLogicalId(_configAccessor, clusterName, + InstanceUtil.findInstancesWithMatchingLogicalId(_baseDataAccessor, clusterName, instanceConfig); if (swappingInstances.size() != 1) { logger.warn( @@ -655,7 +656,7 @@ public class ZKHelixAdmin implements HelixAdmin { } List<InstanceConfig> swappingInstances = - InstanceUtil.findInstancesWithMatchingLogicalId(_configAccessor, clusterName, + InstanceUtil.findInstancesWithMatchingLogicalId(_baseDataAccessor, clusterName, instanceConfig); if (swappingInstances.size() != 1) { logger.warn( diff --git a/helix-core/src/main/java/org/apache/helix/util/InstanceUtil.java b/helix-core/src/main/java/org/apache/helix/util/InstanceUtil.java index 967d561e7..e632f0857 100644 --- a/helix-core/src/main/java/org/apache/helix/util/InstanceUtil.java +++ b/helix-core/src/main/java/org/apache/helix/util/InstanceUtil.java @@ -20,23 +20,25 @@ package org.apache.helix.util; */ import java.util.List; -import java.util.Map; -import java.util.function.Function; import java.util.stream.Collectors; +import javax.annotation.Nullable; + import com.google.common.collect.ImmutableMap; import org.apache.helix.AccessOption; import org.apache.helix.BaseDataAccessor; import org.apache.helix.ConfigAccessor; +import org.apache.helix.HelixDataAccessor; import org.apache.helix.HelixException; import org.apache.helix.PropertyPathBuilder; import org.apache.helix.constants.InstanceConstants; +import org.apache.helix.manager.zk.ZKHelixDataAccessor; +import org.apache.helix.model.ClusterConfig; import org.apache.helix.model.ClusterTopologyConfig; import org.apache.helix.model.HelixConfigScope; import org.apache.helix.model.InstanceConfig; import org.apache.helix.model.builder.HelixConfigScopeBuilder; import org.apache.helix.zookeeper.datamodel.ZNRecord; -import org.apache.helix.zookeeper.zkclient.DataUpdater; public class InstanceUtil { @@ -45,27 +47,43 @@ public class InstanceUtil { } // Validators for instance operation transitions - private static final Function<List<InstanceConfig>, Boolean> ALWAYS_ALLOWED = - (matchingInstances) -> true; - private static final Function<List<InstanceConfig>, Boolean> ALL_MATCHES_ARE_UNKNOWN = - (matchingInstances) -> matchingInstances.isEmpty() || matchingInstances.stream().allMatch( - instance -> instance.getInstanceOperation().getOperation() - .equals(InstanceConstants.InstanceOperation.UNKNOWN)); - private static final Function<List<InstanceConfig>, Boolean> ALL_MATCHES_ARE_UNKNOWN_OR_EVACUATE = - (matchingInstances) -> matchingInstances.isEmpty() || matchingInstances.stream().allMatch( - instance -> instance.getInstanceOperation().getOperation() - .equals(InstanceConstants.InstanceOperation.UNKNOWN) - || instance.getInstanceOperation().getOperation() - .equals(InstanceConstants.InstanceOperation.EVACUATE)); - private static final Function<List<InstanceConfig>, Boolean> ANY_MATCH_ENABLE_OR_DISABLE = - (matchingInstances) -> !matchingInstances.isEmpty() && matchingInstances.stream().anyMatch( - instance -> instance.getInstanceOperation().getOperation() - .equals(InstanceConstants.InstanceOperation.ENABLE) || instance.getInstanceOperation() - .getOperation().equals(InstanceConstants.InstanceOperation.DISABLE)); + private static final InstanceOperationValidator ALWAYS_ALLOWED = + (baseDataAccessor, configAccessor, clusterName, instanceConfig) -> true; + private static final InstanceOperationValidator ALL_MATCHES_ARE_UNKNOWN = + (baseDataAccessor, configAccessor, clusterName, instanceConfig) -> { + List<InstanceConfig> matchingInstances = + findInstancesWithMatchingLogicalId(baseDataAccessor, configAccessor, clusterName, + instanceConfig); + return matchingInstances.isEmpty() || matchingInstances.stream().allMatch( + instance -> instance.getInstanceOperation().getOperation() + .equals(InstanceConstants.InstanceOperation.UNKNOWN)); + }; + private static final InstanceOperationValidator ALL_MATCHES_ARE_UNKNOWN_OR_EVACUATE = + (baseDataAccessor, configAccessor, clusterName, instanceConfig) -> { + List<InstanceConfig> matchingInstances = + findInstancesWithMatchingLogicalId(baseDataAccessor, configAccessor, clusterName, + instanceConfig); + return matchingInstances.isEmpty() || matchingInstances.stream().allMatch(instance -> + instance.getInstanceOperation().getOperation() + .equals(InstanceConstants.InstanceOperation.UNKNOWN) + || instance.getInstanceOperation().getOperation() + .equals(InstanceConstants.InstanceOperation.EVACUATE)); + }; + private static final InstanceOperationValidator ANY_MATCH_ENABLE_OR_DISABLE = + (baseDataAccessor, configAccessor, clusterName, instanceConfig) -> { + List<InstanceConfig> matchingInstances = + findInstancesWithMatchingLogicalId(baseDataAccessor, configAccessor, clusterName, + instanceConfig); + return !matchingInstances.isEmpty() && matchingInstances.stream().anyMatch(instance -> + instance.getInstanceOperation().getOperation() + .equals(InstanceConstants.InstanceOperation.ENABLE) + || instance.getInstanceOperation().getOperation() + .equals(InstanceConstants.InstanceOperation.DISABLE)); + }; // Validator map for valid instance operation transitions <currentOperation>:<targetOperation>:<validator> - private static final ImmutableMap<InstanceConstants.InstanceOperation, ImmutableMap<InstanceConstants.InstanceOperation, Function<List<InstanceConfig>, Boolean>>> - validInstanceOperationTransitions = + private static final ImmutableMap<InstanceConstants.InstanceOperation, ImmutableMap<InstanceConstants.InstanceOperation, InstanceOperationValidator>> + VALID_INSTANCE_OPERATION_TRANSITIONS = ImmutableMap.of(InstanceConstants.InstanceOperation.ENABLE, // ENABLE and DISABLE can be set to UNKNOWN when matching instance is in SWAP_IN and set to ENABLE in a transaction. ImmutableMap.of(InstanceConstants.InstanceOperation.ENABLE, ALWAYS_ALLOWED, @@ -100,22 +118,55 @@ public class InstanceUtil { * @param instanceConfig The current instance configuration * @param currentOperation The current operation * @param targetOperation The target operation + * @deprecated Use {@link #validateInstanceOperationTransition(BaseDataAccessor, String, InstanceConfig, InstanceConstants.InstanceOperation, InstanceConstants.InstanceOperation)} + * instead for better performance. */ + @Deprecated public static void validateInstanceOperationTransition(ConfigAccessor configAccessor, String clusterName, InstanceConfig instanceConfig, InstanceConstants.InstanceOperation currentOperation, InstanceConstants.InstanceOperation targetOperation) { - // Check if the current operation and target operation are in the valid transitions map - if (!validInstanceOperationTransitions.containsKey(currentOperation) - || !validInstanceOperationTransitions.get(currentOperation).containsKey(targetOperation)) { + + validateInstanceOperationTransition(null, configAccessor, clusterName, instanceConfig, + currentOperation, targetOperation); + } + + /** + * Validates if the transition from the current operation to the target operation is valid. + * + * @param baseDataAccessor The BaseDataAccessor instance + * @param clusterName The cluster name + * @param instanceConfig The current instance configuration + * @param currentOperation The current operation + * @param targetOperation The target operation + */ + public static void validateInstanceOperationTransition( + BaseDataAccessor<ZNRecord> baseDataAccessor, String clusterName, + InstanceConfig instanceConfig, + InstanceConstants.InstanceOperation currentOperation, + InstanceConstants.InstanceOperation targetOperation) { + + validateInstanceOperationTransition(baseDataAccessor, null, clusterName, instanceConfig, + currentOperation, targetOperation); + } + + private static void validateInstanceOperationTransition( + @Nullable BaseDataAccessor<ZNRecord> baseDataAccessor, + @Nullable ConfigAccessor configAccessor, String clusterName, InstanceConfig instanceConfig, + InstanceConstants.InstanceOperation currentOperation, + InstanceConstants.InstanceOperation targetOperation) { + ImmutableMap<InstanceConstants.InstanceOperation, InstanceOperationValidator> transitionMap = + VALID_INSTANCE_OPERATION_TRANSITIONS.get(currentOperation); + + if (transitionMap == null || !transitionMap.containsKey(targetOperation)) { throw new HelixException( "Invalid instance operation transition from " + currentOperation + " to " + targetOperation); } - // Throw exception if the validation fails - if (!validInstanceOperationTransitions.get(currentOperation).get(targetOperation) - .apply(findInstancesWithMatchingLogicalId(configAccessor, clusterName, instanceConfig))) { + InstanceOperationValidator validator = transitionMap.get(targetOperation); + if (validator == null || !validator.validate(baseDataAccessor, configAccessor, clusterName, + instanceConfig)) { throw new HelixException( "Failed validation for instance operation transition from " + currentOperation + " to " + targetOperation); @@ -130,6 +181,7 @@ public class InstanceUtil { * @param instanceConfig The instance configuration to match * @return A list of matching instances */ + @Deprecated public static List<InstanceConfig> findInstancesWithMatchingLogicalId( ConfigAccessor configAccessor, String clusterName, InstanceConfig instanceConfig) { String logicalIdKey = @@ -148,17 +200,57 @@ public class InstanceUtil { .collect(Collectors.toList()); } + /** + * Finds the instances that have a matching logical ID with the given instance. + * + * @param clusterName The cluster name + * @param instanceConfig The instance configuration to match + * @return A list of matching instances + */ + public static List<InstanceConfig> findInstancesWithMatchingLogicalId( + BaseDataAccessor<ZNRecord> baseDataAccessor, String clusterName, + InstanceConfig instanceConfig) { + HelixDataAccessor helixDataAccessor = new ZKHelixDataAccessor(clusterName, baseDataAccessor); + + ClusterConfig clusterConfig = + helixDataAccessor.getProperty(helixDataAccessor.keyBuilder().clusterConfig()); + String logicalIdKey = + ClusterTopologyConfig.createFromClusterConfig(clusterConfig).getEndNodeType(); + + List<InstanceConfig> instanceConfigs = + helixDataAccessor.getChildValues(helixDataAccessor.keyBuilder().instanceConfigs(), true); + + // Retrieve and filter instances with matching logical ID + return instanceConfigs.stream().filter(potentialInstanceConfig -> + !potentialInstanceConfig.getInstanceName().equals(instanceConfig.getInstanceName()) + && potentialInstanceConfig.getLogicalId(logicalIdKey) + .equals(instanceConfig.getLogicalId(logicalIdKey))).collect(Collectors.toList()); + } + + private static List<InstanceConfig> findInstancesWithMatchingLogicalId( + @Nullable BaseDataAccessor<ZNRecord> baseDataAccessor, + @Nullable ConfigAccessor configAccessor, String clusterName, InstanceConfig instanceConfig) { + if (baseDataAccessor == null && configAccessor == null) { + throw new HelixException( + "Both BaseDataAccessor and ConfigAccessor cannot be null at the same time"); + } + + return baseDataAccessor != null ? findInstancesWithMatchingLogicalId(baseDataAccessor, + clusterName, instanceConfig) + : findInstancesWithMatchingLogicalId(configAccessor, clusterName, instanceConfig); + } + /** * Sets the instance operation for the given instance. * * @param configAccessor The ConfigAccessor instance - * @param baseAccessor The BaseDataAccessor instance + * @param baseDataAccessor The BaseDataAccessor instance * @param clusterName The cluster name * @param instanceName The instance name * @param instanceOperation The instance operation to set */ public static void setInstanceOperation(ConfigAccessor configAccessor, - BaseDataAccessor<ZNRecord> baseAccessor, String clusterName, String instanceName, + BaseDataAccessor<ZNRecord> baseDataAccessor, String clusterName, String instanceName, InstanceConfig.InstanceOperation instanceOperation) { String path = PropertyPathBuilder.instanceConfig(clusterName, instanceName); @@ -170,24 +262,22 @@ public class InstanceUtil { } // Validate the instance operation transition - validateInstanceOperationTransition(configAccessor, clusterName, instanceConfig, + validateInstanceOperationTransition(baseDataAccessor, configAccessor, clusterName, + instanceConfig, instanceConfig.getInstanceOperation().getOperation(), instanceOperation == null ? InstanceConstants.InstanceOperation.ENABLE : instanceOperation.getOperation()); // Update the instance operation - boolean succeeded = baseAccessor.update(path, new DataUpdater<ZNRecord>() { - @Override - public ZNRecord update(ZNRecord currentData) { - if (currentData == null) { - throw new HelixException("Cluster: " + clusterName + ", instance: " + instanceName - + ", participant config is null"); - } - - InstanceConfig config = new InstanceConfig(currentData); - config.setInstanceOperation(instanceOperation); - return config.getRecord(); + boolean succeeded = baseDataAccessor.update(path, currentData -> { + if (currentData == null) { + throw new HelixException("Cluster: " + clusterName + ", instance: " + instanceName + + ", participant config is null"); } + + InstanceConfig config = new InstanceConfig(currentData); + config.setInstanceOperation(instanceOperation); + return config.getRecord(); }, AccessOption.PERSISTENT); if (!succeeded) { @@ -195,4 +285,9 @@ public class InstanceUtil { "Failed to update instance operation. Please check if instance is disabled."); } } + + private interface InstanceOperationValidator { + boolean validate(@Nullable BaseDataAccessor<ZNRecord> baseDataAccessor, + @Nullable ConfigAccessor configAccessor, String clusterName, InstanceConfig instanceConfig); + } }