This is an automated email from the ASF dual-hosted git repository.

xyuanlu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/master by this push:
     new f87d94859 Improve setInstanceOperation performance (#3017)
f87d94859 is described below

commit f87d948598b33de0788653c2438307bf089dece6
Author: Zachary Pinto <zapi...@linkedin.com>
AuthorDate: Tue Apr 15 09:37:40 2025 -0700

    Improve setInstanceOperation performance (#3017)
    
    This change switches to a parallel/async get of all instance configs
    using HelixDataAccessor, and avoids calling
    findInstancesWithMatchingLogicalId for instance operation transitions
    that do not require this check.
---
 .../org/apache/helix/manager/zk/ZKHelixAdmin.java  |   9 +-
 .../java/org/apache/helix/util/InstanceUtil.java   | 179 ++++++++++++++++-----
 2 files changed, 142 insertions(+), 46 deletions(-)

diff --git 
a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java 
b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
index ae914682b..0d72ac4aa 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
@@ -211,7 +211,7 @@ public class ZKHelixAdmin implements HelixAdmin {
     }
 
     List<InstanceConfig> matchingLogicalIdInstances =
-        InstanceUtil.findInstancesWithMatchingLogicalId(_configAccessor, 
clusterName,
+        InstanceUtil.findInstancesWithMatchingLogicalId(_baseDataAccessor, 
clusterName,
             instanceConfig);
     if (matchingLogicalIdInstances.size() > 1) {
       throw new HelixException(
@@ -224,7 +224,8 @@ public class ZKHelixAdmin implements HelixAdmin {
     InstanceConstants.InstanceOperation attemptedInstanceOperation =
         instanceConfig.getInstanceOperation().getOperation();
     try {
-      InstanceUtil.validateInstanceOperationTransition(_configAccessor, 
clusterName, instanceConfig,
+      InstanceUtil.validateInstanceOperationTransition(_baseDataAccessor, 
clusterName,
+          instanceConfig,
           InstanceConstants.InstanceOperation.UNKNOWN, 
attemptedInstanceOperation);
     } catch (HelixException e) {
       
instanceConfig.setInstanceOperation(InstanceConstants.InstanceOperation.UNKNOWN);
@@ -616,7 +617,7 @@ public class ZKHelixAdmin implements HelixAdmin {
     }
 
     List<InstanceConfig> swappingInstances =
-        InstanceUtil.findInstancesWithMatchingLogicalId(_configAccessor, 
clusterName,
+        InstanceUtil.findInstancesWithMatchingLogicalId(_baseDataAccessor, 
clusterName,
             instanceConfig);
     if (swappingInstances.size() != 1) {
       logger.warn(
@@ -655,7 +656,7 @@ public class ZKHelixAdmin implements HelixAdmin {
     }
 
     List<InstanceConfig> swappingInstances =
-        InstanceUtil.findInstancesWithMatchingLogicalId(_configAccessor, 
clusterName,
+        InstanceUtil.findInstancesWithMatchingLogicalId(_baseDataAccessor, 
clusterName,
             instanceConfig);
     if (swappingInstances.size() != 1) {
       logger.warn(
diff --git a/helix-core/src/main/java/org/apache/helix/util/InstanceUtil.java 
b/helix-core/src/main/java/org/apache/helix/util/InstanceUtil.java
index 967d561e7..e632f0857 100644
--- a/helix-core/src/main/java/org/apache/helix/util/InstanceUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/util/InstanceUtil.java
@@ -20,23 +20,25 @@ package org.apache.helix.util;
  */
 
 import java.util.List;
-import java.util.Map;
-import java.util.function.Function;
 import java.util.stream.Collectors;
 
+import javax.annotation.Nullable;
+
 import com.google.common.collect.ImmutableMap;
 import org.apache.helix.AccessOption;
 import org.apache.helix.BaseDataAccessor;
 import org.apache.helix.ConfigAccessor;
+import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixException;
 import org.apache.helix.PropertyPathBuilder;
 import org.apache.helix.constants.InstanceConstants;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.ClusterTopologyConfig;
 import org.apache.helix.model.HelixConfigScope;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.model.builder.HelixConfigScopeBuilder;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
-import org.apache.helix.zookeeper.zkclient.DataUpdater;
 
 public class InstanceUtil {
 
@@ -45,27 +47,43 @@ public class InstanceUtil {
   }
 
   // Validators for instance operation transitions
-  private static final Function<List<InstanceConfig>, Boolean> ALWAYS_ALLOWED =
-      (matchingInstances) -> true;
-  private static final Function<List<InstanceConfig>, Boolean> 
ALL_MATCHES_ARE_UNKNOWN =
-      (matchingInstances) -> matchingInstances.isEmpty() || 
matchingInstances.stream().allMatch(
-          instance -> instance.getInstanceOperation().getOperation()
-              .equals(InstanceConstants.InstanceOperation.UNKNOWN));
-  private static final Function<List<InstanceConfig>, Boolean> 
ALL_MATCHES_ARE_UNKNOWN_OR_EVACUATE =
-      (matchingInstances) -> matchingInstances.isEmpty() || 
matchingInstances.stream().allMatch(
-          instance -> instance.getInstanceOperation().getOperation()
-              .equals(InstanceConstants.InstanceOperation.UNKNOWN)
-              || instance.getInstanceOperation().getOperation()
-              .equals(InstanceConstants.InstanceOperation.EVACUATE));
-  private static final Function<List<InstanceConfig>, Boolean> 
ANY_MATCH_ENABLE_OR_DISABLE =
-      (matchingInstances) -> !matchingInstances.isEmpty() && 
matchingInstances.stream().anyMatch(
-          instance -> instance.getInstanceOperation().getOperation()
-              .equals(InstanceConstants.InstanceOperation.ENABLE) || 
instance.getInstanceOperation()
-              
.getOperation().equals(InstanceConstants.InstanceOperation.DISABLE));
+  private static final InstanceOperationValidator ALWAYS_ALLOWED =
+      (baseDataAccessor, configAccessor, clusterName, instanceConfig) -> true;
+  private static final InstanceOperationValidator ALL_MATCHES_ARE_UNKNOWN =
+      (baseDataAccessor, configAccessor, clusterName, instanceConfig) -> {
+        List<InstanceConfig> matchingInstances =
+            findInstancesWithMatchingLogicalId(baseDataAccessor, 
configAccessor, clusterName,
+                instanceConfig);
+        return matchingInstances.isEmpty() || 
matchingInstances.stream().allMatch(
+            instance -> instance.getInstanceOperation().getOperation()
+                .equals(InstanceConstants.InstanceOperation.UNKNOWN));
+      };
+  private static final InstanceOperationValidator 
ALL_MATCHES_ARE_UNKNOWN_OR_EVACUATE =
+      (baseDataAccessor, configAccessor, clusterName, instanceConfig) -> {
+        List<InstanceConfig> matchingInstances =
+            findInstancesWithMatchingLogicalId(baseDataAccessor, 
configAccessor, clusterName,
+                instanceConfig);
+        return matchingInstances.isEmpty() || 
matchingInstances.stream().allMatch(instance ->
+            instance.getInstanceOperation().getOperation()
+                .equals(InstanceConstants.InstanceOperation.UNKNOWN)
+                || instance.getInstanceOperation().getOperation()
+                .equals(InstanceConstants.InstanceOperation.EVACUATE));
+      };
+  private static final InstanceOperationValidator ANY_MATCH_ENABLE_OR_DISABLE =
+      (baseDataAccessor, configAccessor, clusterName, instanceConfig) -> {
+        List<InstanceConfig> matchingInstances =
+            findInstancesWithMatchingLogicalId(baseDataAccessor, 
configAccessor, clusterName,
+                instanceConfig);
+        return !matchingInstances.isEmpty() && 
matchingInstances.stream().anyMatch(instance ->
+            instance.getInstanceOperation().getOperation()
+                .equals(InstanceConstants.InstanceOperation.ENABLE)
+                || instance.getInstanceOperation().getOperation()
+                .equals(InstanceConstants.InstanceOperation.DISABLE));
+      };
 
   // Validator map for valid instance operation transitions 
<currentOperation>:<targetOperation>:<validator>
-  private static final ImmutableMap<InstanceConstants.InstanceOperation, 
ImmutableMap<InstanceConstants.InstanceOperation, 
Function<List<InstanceConfig>, Boolean>>>
-      validInstanceOperationTransitions =
+  private static final ImmutableMap<InstanceConstants.InstanceOperation, 
ImmutableMap<InstanceConstants.InstanceOperation, InstanceOperationValidator>>
+      VALID_INSTANCE_OPERATION_TRANSITIONS =
       ImmutableMap.of(InstanceConstants.InstanceOperation.ENABLE,
       // ENABLE and DISABLE can be set to UNKNOWN when matching instance is in 
SWAP_IN and set to ENABLE in a transaction.
           ImmutableMap.of(InstanceConstants.InstanceOperation.ENABLE, 
ALWAYS_ALLOWED,
@@ -100,22 +118,55 @@ public class InstanceUtil {
    * @param instanceConfig   The current instance configuration
    * @param currentOperation The current operation
    * @param targetOperation  The target operation
+   * @deprecated Use {@link 
#validateInstanceOperationTransition(BaseDataAccessor, String, InstanceConfig, 
InstanceConstants.InstanceOperation, InstanceConstants.InstanceOperation)}
+   *        instead for better performance.
    */
+  @Deprecated
   public static void validateInstanceOperationTransition(ConfigAccessor 
configAccessor,
       String clusterName, InstanceConfig instanceConfig,
       InstanceConstants.InstanceOperation currentOperation,
       InstanceConstants.InstanceOperation targetOperation) {
-    // Check if the current operation and target operation are in the valid 
transitions map
-    if (!validInstanceOperationTransitions.containsKey(currentOperation)
-        || 
!validInstanceOperationTransitions.get(currentOperation).containsKey(targetOperation))
 {
+
+    validateInstanceOperationTransition(null, configAccessor, clusterName, 
instanceConfig,
+        currentOperation, targetOperation);
+  }
+
+  /**
+   * Validates if the transition from the current operation to the target 
operation is valid.
+   *
+   * @param baseDataAccessor The BaseDataAccessor instance
+   * @param clusterName      The cluster name
+   * @param instanceConfig   The current instance configuration
+   * @param currentOperation The current operation
+   * @param targetOperation  The target operation
+   */
+  public static void validateInstanceOperationTransition(
+      BaseDataAccessor<ZNRecord> baseDataAccessor, String clusterName,
+      InstanceConfig instanceConfig,
+      InstanceConstants.InstanceOperation currentOperation,
+      InstanceConstants.InstanceOperation targetOperation) {
+
+    validateInstanceOperationTransition(baseDataAccessor, null, clusterName, 
instanceConfig,
+        currentOperation, targetOperation);
+  }
+
+  private static void validateInstanceOperationTransition(
+      @Nullable BaseDataAccessor<ZNRecord> baseDataAccessor,
+      @Nullable ConfigAccessor configAccessor, String clusterName, 
InstanceConfig instanceConfig,
+      InstanceConstants.InstanceOperation currentOperation,
+      InstanceConstants.InstanceOperation targetOperation) {
+    ImmutableMap<InstanceConstants.InstanceOperation, 
InstanceOperationValidator> transitionMap =
+        VALID_INSTANCE_OPERATION_TRANSITIONS.get(currentOperation);
+
+    if (transitionMap == null || !transitionMap.containsKey(targetOperation)) {
       throw new HelixException(
           "Invalid instance operation transition from " + currentOperation + " 
to "
               + targetOperation);
     }
 
-    // Throw exception if the validation fails
-    if 
(!validInstanceOperationTransitions.get(currentOperation).get(targetOperation)
-        .apply(findInstancesWithMatchingLogicalId(configAccessor, clusterName, 
instanceConfig))) {
+    InstanceOperationValidator validator = transitionMap.get(targetOperation);
+    if (validator == null || !validator.validate(baseDataAccessor, 
configAccessor, clusterName,
+        instanceConfig)) {
       throw new HelixException(
           "Failed validation for instance operation transition from " + 
currentOperation + " to "
               + targetOperation);
@@ -130,6 +181,7 @@ public class InstanceUtil {
    * @param instanceConfig  The instance configuration to match
    * @return A list of matching instances
    */
+  @Deprecated
   public static List<InstanceConfig> findInstancesWithMatchingLogicalId(
       ConfigAccessor configAccessor, String clusterName, InstanceConfig 
instanceConfig) {
     String logicalIdKey =
@@ -148,17 +200,57 @@ public class InstanceUtil {
         .collect(Collectors.toList());
   }
 
+  /**
+   * Finds the instances that have a matching logical ID with the given 
instance.
+   *
+   * @param clusterName    The cluster name
+   * @param instanceConfig The instance configuration to match
+   * @return A list of matching instances
+   */
+  public static List<InstanceConfig> findInstancesWithMatchingLogicalId(
+      BaseDataAccessor<ZNRecord> baseDataAccessor, String clusterName,
+      InstanceConfig instanceConfig) {
+    HelixDataAccessor helixDataAccessor = new ZKHelixDataAccessor(clusterName, 
baseDataAccessor);
+
+    ClusterConfig clusterConfig =
+        
helixDataAccessor.getProperty(helixDataAccessor.keyBuilder().clusterConfig());
+    String logicalIdKey =
+        
ClusterTopologyConfig.createFromClusterConfig(clusterConfig).getEndNodeType();
+
+    List<InstanceConfig> instanceConfigs =
+        
helixDataAccessor.getChildValues(helixDataAccessor.keyBuilder().instanceConfigs(),
 true);
+
+    // Retrieve and filter instances with matching logical ID
+    return instanceConfigs.stream().filter(potentialInstanceConfig ->
+        
!potentialInstanceConfig.getInstanceName().equals(instanceConfig.getInstanceName())
+            && potentialInstanceConfig.getLogicalId(logicalIdKey)
+            
.equals(instanceConfig.getLogicalId(logicalIdKey))).collect(Collectors.toList());
+  }
+
+  private static List<InstanceConfig> findInstancesWithMatchingLogicalId(
+      @Nullable BaseDataAccessor<ZNRecord> baseDataAccessor,
+      @Nullable ConfigAccessor configAccessor, String clusterName, 
InstanceConfig instanceConfig) {
+    if (baseDataAccessor == null && configAccessor == null) {
+      throw new HelixException(
+          "Both BaseDataAccessor and ConfigAccessor cannot be null at the same 
time");
+    }
+
+    return baseDataAccessor != null ? 
findInstancesWithMatchingLogicalId(baseDataAccessor,
+        clusterName, instanceConfig)
+        : findInstancesWithMatchingLogicalId(configAccessor, clusterName, 
instanceConfig);
+  }
+
   /**
    * Sets the instance operation for the given instance.
    *
    * @param configAccessor      The ConfigAccessor instance
-   * @param baseAccessor        The BaseDataAccessor instance
+   * @param baseDataAccessor    The BaseDataAccessor instance
    * @param clusterName         The cluster name
    * @param instanceName        The instance name
    * @param instanceOperation   The instance operation to set
    */
   public static void setInstanceOperation(ConfigAccessor configAccessor,
-      BaseDataAccessor<ZNRecord> baseAccessor, String clusterName, String 
instanceName,
+      BaseDataAccessor<ZNRecord> baseDataAccessor, String clusterName, String 
instanceName,
       InstanceConfig.InstanceOperation instanceOperation) {
     String path = PropertyPathBuilder.instanceConfig(clusterName, 
instanceName);
 
@@ -170,24 +262,22 @@ public class InstanceUtil {
     }
 
     // Validate the instance operation transition
-    validateInstanceOperationTransition(configAccessor, clusterName, 
instanceConfig,
+    validateInstanceOperationTransition(baseDataAccessor, configAccessor, 
clusterName,
+        instanceConfig,
         instanceConfig.getInstanceOperation().getOperation(),
         instanceOperation == null ? InstanceConstants.InstanceOperation.ENABLE
             : instanceOperation.getOperation());
 
     // Update the instance operation
-    boolean succeeded = baseAccessor.update(path, new DataUpdater<ZNRecord>() {
-      @Override
-      public ZNRecord update(ZNRecord currentData) {
-        if (currentData == null) {
-          throw new HelixException("Cluster: " + clusterName + ", instance: " 
+ instanceName
-              + ", participant config is null");
-        }
-
-        InstanceConfig config = new InstanceConfig(currentData);
-        config.setInstanceOperation(instanceOperation);
-        return config.getRecord();
+    boolean succeeded = baseDataAccessor.update(path, currentData -> {
+      if (currentData == null) {
+        throw new HelixException("Cluster: " + clusterName + ", instance: " + 
instanceName
+            + ", participant config is null");
       }
+
+      InstanceConfig config = new InstanceConfig(currentData);
+      config.setInstanceOperation(instanceOperation);
+      return config.getRecord();
     }, AccessOption.PERSISTENT);
 
     if (!succeeded) {
@@ -195,4 +285,9 @@ public class InstanceUtil {
           "Failed to update instance operation. Please check if instance is 
disabled.");
     }
   }
+
+  private interface InstanceOperationValidator {
+    boolean validate(@Nullable BaseDataAccessor<ZNRecord> baseDataAccessor,
+        @Nullable ConfigAccessor configAccessor, String clusterName, 
InstanceConfig instanceConfig);
+  }
 }

Reply via email to