This is an automated email from the ASF dual-hosted git repository.

jxue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/master by this push:
     new 6bdc88804 Expose addListener and support listening on single instance config (#2752)
6bdc88804 is described below

commit 6bdc888042daefa7e75d93df44270db6d4a26849
Author: Zachary Pinto <[email protected]>
AuthorDate: Mon Feb 26 17:47:22 2024 -0800

    Expose addListener and support listening on single instance config (#2752)
    
    Expose addListener method in HelixManager to allow users to register any
    listeners that implement one of the Listener interfaces. Add support in
    CallbackHandler for InstanceConfigChangeListener to be used on a single
    instance config.
---
 .../main/java/org/apache/helix/HelixManager.java   | 14 +++-
 .../apache/helix/manager/zk/CallbackHandler.java   | 10 ++-
 .../apache/helix/manager/zk/ZKHelixManager.java    |  3 +-
 .../event/MockCloudEventAwareHelixManager.java     |  7 ++
 .../controller/stages/DummyClusterManager.java     |  7 ++
 .../rebalancer/TestInstanceOperation.java          | 74 ++++++++++++++++++----
 .../java/org/apache/helix/mock/MockManager.java    |  8 ++-
 .../helix/participant/MockZKHelixManager.java      |  7 ++
 8 files changed, 113 insertions(+), 17 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/HelixManager.java 
b/helix-core/src/main/java/org/apache/helix/HelixManager.java
index b5c2a6448..c1d2ad18c 100644
--- a/helix-core/src/main/java/org/apache/helix/HelixManager.java
+++ b/helix-core/src/main/java/org/apache/helix/HelixManager.java
@@ -51,7 +51,7 @@ import org.apache.helix.participant.StateMachineEngine;
 import org.apache.helix.spectator.RoutingTableProvider;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
-
+import org.apache.zookeeper.Watcher;
 
 /**
  * Class that represents the Helix Agent.
@@ -108,6 +108,18 @@ public interface HelixManager {
    */
   void disconnect();
 
+  /**
+   * Add a change listener on the specified propertyKey for the specified
+   * changeType and eventTypes.
+   * @see org.apache.helix.api.listeners for the list of available listeners
+   * @param listener the listener to add
+   * @param propertyKey the property key to listen to
+   * @param changeType the type of change to listen to
+   * @param eventType the event type to listen for
+   */
+  void addListener(Object listener, PropertyKey propertyKey, 
HelixConstants.ChangeType changeType,
+      Watcher.Event.EventType[] eventType);
+
   /**
    * @see IdealStateChangeListener#onIdealStateChange(List, 
NotificationContext)
    * @param listener
diff --git 
a/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java 
b/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
index 2a19a7a4e..c2809620e 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
@@ -358,7 +358,15 @@ public class CallbackHandler implements IZkChildListener, 
IZkDataListener {
           configChangeListener.onConfigChange(configs, changeContext);
         } else if (_listener instanceof InstanceConfigChangeListener) {
           InstanceConfigChangeListener listener = 
(InstanceConfigChangeListener) _listener;
-          List<InstanceConfig> configs = preFetch(_propertyKey);
+          List<InstanceConfig> configs = Collections.emptyList();
+          if (_propertyKey.getParams().length > 2 && _preFetchEnabled) {
+            // If there are more than 2 params, that means the property key is 
for a specific instance
+            // and will not have children.
+            InstanceConfig config = _accessor.getProperty(_propertyKey);
+            configs = config != null ? Collections.singletonList(config) : 
Collections.emptyList();
+          } else {
+            configs = preFetch(_propertyKey);
+          }
           listener.onInstanceConfigChange(configs, changeContext);
         }
       } else if (_changeType == RESOURCE_CONFIG) {
diff --git 
a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java 
b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
index 7a62d1923..3473bed08 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
@@ -419,7 +419,8 @@ public class ZKHelixManager implements HelixManager, 
IZkStateListener {
     }
   }
 
-  void addListener(Object listener, PropertyKey propertyKey, ChangeType 
changeType,
+  @Override
+  public void addListener(Object listener, PropertyKey propertyKey, ChangeType 
changeType,
       EventType[] eventType) {
     checkConnected(_waitForConnectedTimeout);
 
diff --git 
a/helix-core/src/test/java/org/apache/helix/cloud/event/MockCloudEventAwareHelixManager.java
 
b/helix-core/src/test/java/org/apache/helix/cloud/event/MockCloudEventAwareHelixManager.java
index de46257d3..740c932fc 100644
--- 
a/helix-core/src/test/java/org/apache/helix/cloud/event/MockCloudEventAwareHelixManager.java
+++ 
b/helix-core/src/test/java/org/apache/helix/cloud/event/MockCloudEventAwareHelixManager.java
@@ -26,6 +26,7 @@ import org.apache.helix.ClusterMessagingService;
 import org.apache.helix.ConfigAccessor;
 import org.apache.helix.HelixAdmin;
 import org.apache.helix.HelixCloudProperty;
+import org.apache.helix.HelixConstants;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixManager;
 import org.apache.helix.HelixManagerProperties;
@@ -59,6 +60,7 @@ import org.apache.helix.model.HelixConfigScope;
 import org.apache.helix.participant.StateMachineEngine;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.zookeeper.Watcher;
 
 public class MockCloudEventAwareHelixManager implements HelixManager {
   private final HelixManagerProperty _helixManagerProperty;
@@ -110,6 +112,11 @@ public class MockCloudEventAwareHelixManager implements 
HelixManager {
     return false;
   }
 
+  @Override
+  public void addListener(Object listener, PropertyKey propertyKey, 
HelixConstants.ChangeType changeType,
+      Watcher.Event.EventType[] eventType) {
+  }
+
   @Override
   public void addIdealStateChangeListener(IdealStateChangeListener listener) 
throws Exception {
 
diff --git 
a/helix-core/src/test/java/org/apache/helix/controller/stages/DummyClusterManager.java
 
b/helix-core/src/test/java/org/apache/helix/controller/stages/DummyClusterManager.java
index 5d23a321c..4903a3d8b 100644
--- 
a/helix-core/src/test/java/org/apache/helix/controller/stages/DummyClusterManager.java
+++ 
b/helix-core/src/test/java/org/apache/helix/controller/stages/DummyClusterManager.java
@@ -24,6 +24,7 @@ import java.util.Set;
 import org.apache.helix.ClusterMessagingService;
 import org.apache.helix.ConfigAccessor;
 import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixConstants;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixManager;
 import org.apache.helix.HelixManagerProperties;
@@ -53,6 +54,7 @@ import 
org.apache.helix.healthcheck.ParticipantHealthReportCollector;
 import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
 import org.apache.helix.participant.StateMachineEngine;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.apache.zookeeper.Watcher;
 
 public class DummyClusterManager implements HelixManager {
   HelixDataAccessor _accessor;
@@ -303,6 +305,11 @@ public class DummyClusterManager implements HelixManager {
     return null;
   }
 
+  @Override
+  public void addListener(Object listener, PropertyKey propertyKey, 
HelixConstants.ChangeType changeType,
+      Watcher.Event.EventType[] eventType) {
+  }
+
   @Override
   public void addInstanceConfigChangeListener(InstanceConfigChangeListener 
listener)
       throws Exception {
diff --git 
a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestInstanceOperation.java
 
b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestInstanceOperation.java
index 276757741..010153e64 100644
--- 
a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestInstanceOperation.java
+++ 
b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestInstanceOperation.java
@@ -18,6 +18,7 @@ import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import org.apache.helix.ConfigAccessor;
 import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixConstants;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixException;
 import org.apache.helix.HelixManager;
@@ -25,8 +26,10 @@ import org.apache.helix.HelixManagerFactory;
 import org.apache.helix.HelixRollbackException;
 import org.apache.helix.InstanceType;
 import org.apache.helix.NotificationContext;
+import org.apache.helix.PropertyKey;
 import org.apache.helix.PropertyType;
 import org.apache.helix.TestHelper;
+import org.apache.helix.api.listeners.InstanceConfigChangeListener;
 import org.apache.helix.common.ZkTestBase;
 import org.apache.helix.constants.InstanceConstants;
 import 
org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy;
@@ -54,6 +57,7 @@ import org.apache.helix.spectator.RoutingTableProvider;
 import 
org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
 import org.apache.helix.tools.ClusterVerifiers.StrictMatchExternalViewVerifier;
 import org.apache.helix.tools.ClusterVerifiers.ZkHelixClusterVerifier;
+import org.apache.zookeeper.Watcher;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.Assert;
@@ -546,12 +550,16 @@ public class TestInstanceOperation extends ZkTestBase {
     validateEVsCorrect(getEVs(), originalEVs, 
swapOutInstancesToSwapInInstances,
         Collections.emptySet(), Collections.emptySet());
 
+    CustomIndividualInstanceConfigChangeListener 
instanceToSwapInInstanceConfigListener =
+        new CustomIndividualInstanceConfigChangeListener();
     // Add instance with InstanceOperation set to SWAP_IN
     String instanceToSwapInName = PARTICIPANT_PREFIX + "_" + _nextStartPort;
     swapOutInstancesToSwapInInstances.put(instanceToSwapOutName, 
instanceToSwapInName);
     addParticipant(instanceToSwapInName, 
instanceToSwapOutInstanceConfig.getLogicalId(LOGICAL_ID),
         instanceToSwapOutInstanceConfig.getDomainAsMap().get(ZONE),
-        InstanceConstants.InstanceOperation.SWAP_IN, true, -1);
+        InstanceConstants.InstanceOperation.SWAP_IN, true, -1, 
instanceToSwapInInstanceConfigListener);
+
+    
Assert.assertFalse(instanceToSwapInInstanceConfigListener.isThrottlesEnabled());
 
     // Validate that partitions on SWAP_OUT instance does not change after 
setting the InstanceOperation to SWAP_OUT
     // and adding the SWAP_IN instance to the cluster.
@@ -583,6 +591,9 @@ public class TestInstanceOperation extends ZkTestBase {
     Assert.assertFalse(_gSetupTool.getClusterManagementTool()
         .getInstanceConfig(CLUSTER_NAME, 
instanceToSwapOutName).getInstanceEnabled());
 
+    // Check to make sure the throttle was enabled again after the swap was 
completed.
+    
Assert.assertTrue(instanceToSwapInInstanceConfigListener.isThrottlesEnabled());
+
     // Validate that the SWAP_IN instance has the same partitions the SWAP_OUT 
instance had before
     // swap was completed.
     verifier(() -> (validateEVsCorrect(getEVs(), originalEVs, 
swapOutInstancesToSwapInInstances,
@@ -1095,7 +1106,7 @@ public class TestInstanceOperation extends ZkTestBase {
   }
 
   @Test(expectedExceptions = HelixException.class, dependsOnMethods = 
"testNodeSwapWithSwapOutInstanceOffline")
-  public void testNodeSwapAddSwapInFirstEnabledBeforeSwapOutSet() {
+  public void testNodeSwapAddSwapInFirstEnabledBeforeSwapOutSet() throws 
Exception {
     System.out.println(
         "START 
TestInstanceOperation.testNodeSwapAddSwapInFirstEnabledBeforeSwapOutSet() at "
             + new Date(System.currentTimeMillis()));
@@ -1113,7 +1124,7 @@ public class TestInstanceOperation extends ZkTestBase {
   }
 
   @Test(expectedExceptions = HelixException.class, dependsOnMethods = 
"testNodeSwapAddSwapInFirstEnabledBeforeSwapOutSet")
-  public void testNodeSwapAddSwapInFirstEnableBeforeSwapOutSet() {
+  public void testNodeSwapAddSwapInFirstEnableBeforeSwapOutSet() throws 
Exception {
     System.out.println(
         "START 
TestInstanceOperation.testNodeSwapAddSwapInFirstEnableBeforeSwapOutSet() at "
             + new Date(System.currentTimeMillis()));
@@ -1136,7 +1147,7 @@ public class TestInstanceOperation extends ZkTestBase {
   }
 
   @Test(expectedExceptions = HelixException.class, dependsOnMethods = 
"testNodeSwapAddSwapInFirstEnableBeforeSwapOutSet")
-  public void testUnsetInstanceOperationOnSwapInWhenAlreadyUnsetOnSwapOut() {
+  public void testUnsetInstanceOperationOnSwapInWhenAlreadyUnsetOnSwapOut() 
throws Exception {
     System.out.println(
         "START 
TestInstanceOperation.testUnsetInstanceOperationOnSwapInWhenAlreadyUnsetOnSwapOut()
 at "
             + new Date(System.currentTimeMillis()));
@@ -1380,7 +1391,7 @@ public class TestInstanceOperation extends ZkTestBase {
   }
 
   @Test(expectedExceptions = HelixException.class, dependsOnMethods = 
"testEvacuationWithOfflineInstancesInCluster")
-  public void testSwapEvacuateAddRemoveEvacuate() {
+  public void testSwapEvacuateAddRemoveEvacuate() throws Exception {
     System.out.println("START 
TestInstanceOperation.testSwapEvacuateAddRemoveEvacuate() at " + new Date(
         System.currentTimeMillis()));
     removeOfflineOrDisabledOrSwapInInstances();
@@ -1426,9 +1437,33 @@ public class TestInstanceOperation extends ZkTestBase {
     }, timeout));
   }
 
-  private MockParticipantManager createParticipant(String participantName) {
+  private static class CustomIndividualInstanceConfigChangeListener implements 
InstanceConfigChangeListener {
+    private boolean throttlesEnabled;
+
+    public CustomIndividualInstanceConfigChangeListener() {
+      throttlesEnabled = true;
+    }
+
+    public boolean isThrottlesEnabled() {
+      return throttlesEnabled;
+    }
+
+    @Override
+    public void onInstanceConfigChange(List<InstanceConfig> instanceConfig,
+        NotificationContext context) {
+      if (instanceConfig.get(0).getInstanceOperation()
+          .equals(InstanceConstants.InstanceOperation.SWAP_IN.name())) {
+        throttlesEnabled = false;
+      } else if (instanceConfig.get(0).getInstanceOperation().isEmpty()) {
+        throttlesEnabled = true;
+      }
+    }
+  }
+
+  private MockParticipantManager createParticipant(String participantName) 
throws Exception {
     // start dummy participants
-    MockParticipantManager participant = new MockParticipantManager(ZK_ADDR, 
CLUSTER_NAME, participantName);
+    MockParticipantManager participant =
+        new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, participantName, 10, 
null);
     StateMachineEngine stateMachine = participant.getStateMachineEngine();
     // Using a delayed state model
     StDelayMSStateModelFactory delayFactory = new StDelayMSStateModelFactory();
@@ -1436,8 +1471,20 @@ public class TestInstanceOperation extends ZkTestBase {
     return participant;
   }
 
+  private void addParticipant(String participantName) throws Exception {
+    addParticipant(participantName, UUID.randomUUID().toString(),
+        "zone_" + _participants.size() % ZONE_COUNT, null, true, -1);
+  }
+
   private void addParticipant(String participantName, String logicalId, String 
zone,
-      InstanceConstants.InstanceOperation instanceOperation, boolean enabled, 
int capacity) {
+      InstanceConstants.InstanceOperation instanceOperation, boolean enabled, 
int capacity)
+      throws Exception {
+    addParticipant(participantName, logicalId, zone, instanceOperation, 
enabled, capacity, null);
+  }
+
+  private void addParticipant(String participantName, String logicalId, String 
zone,
+      InstanceConstants.InstanceOperation instanceOperation, boolean enabled, 
int capacity,
+      InstanceConfigChangeListener listener) throws Exception {
     InstanceConfig config = new InstanceConfig.Builder().setDomain(
             String.format("%s=%s, %s=%s, %s=%s", ZONE, zone, HOST, 
participantName, LOGICAL_ID,
                 
logicalId)).setInstanceEnabled(enabled).setInstanceOperation(instanceOperation)
@@ -1451,16 +1498,17 @@ public class TestInstanceOperation extends ZkTestBase {
     MockParticipantManager participant = createParticipant(participantName);
 
     participant.syncStart();
+    if (listener != null) {
+      participant.addListener(listener,
+          new 
PropertyKey.Builder(CLUSTER_NAME).instanceConfig(participantName),
+          HelixConstants.ChangeType.INSTANCE_CONFIG,
+          new 
Watcher.Event.EventType[]{Watcher.Event.EventType.NodeDataChanged});
+    }
     _participants.add(participant);
     _participantNames.add(participantName);
     _nextStartPort++;
   }
 
-  private void addParticipant(String participantName) {
-    addParticipant(participantName, UUID.randomUUID().toString(),
-        "zone_" + _participants.size() % ZONE_COUNT, null, true, -1);
-  }
-
    private void createTestDBs(long delayTime) throws InterruptedException {
      createResourceWithDelayedRebalance(CLUSTER_NAME, "TEST_DB0_CRUSHED",
          BuiltInStateModelDefinitions.LeaderStandby.name(), PARTITIONS, 
REPLICA, REPLICA - 1, -1,
diff --git a/helix-core/src/test/java/org/apache/helix/mock/MockManager.java 
b/helix-core/src/test/java/org/apache/helix/mock/MockManager.java
index afa8d9880..b2d474a52 100644
--- a/helix-core/src/test/java/org/apache/helix/mock/MockManager.java
+++ b/helix-core/src/test/java/org/apache/helix/mock/MockManager.java
@@ -25,6 +25,7 @@ import java.util.UUID;
 import org.apache.helix.ClusterMessagingService;
 import org.apache.helix.ConfigAccessor;
 import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixConstants;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixManager;
 import org.apache.helix.HelixManagerProperties;
@@ -56,7 +57,7 @@ import org.apache.helix.model.HelixConfigScope;
 import org.apache.helix.participant.HelixStateMachineEngine;
 import org.apache.helix.participant.StateMachineEngine;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
-
+import org.apache.zookeeper.Watcher;
 
 public class MockManager implements HelixManager {
   MockAccessor accessor;
@@ -328,6 +329,11 @@ public class MockManager implements HelixManager {
     return null;
   }
 
+  @Override
+  public void addListener(Object listener, PropertyKey propertyKey, 
HelixConstants.ChangeType changeType,
+      Watcher.Event.EventType[] eventType) {
+  }
+
   @Override
   public void addInstanceConfigChangeListener(InstanceConfigChangeListener 
listener)
       throws Exception {
diff --git 
a/helix-core/src/test/java/org/apache/helix/participant/MockZKHelixManager.java 
b/helix-core/src/test/java/org/apache/helix/participant/MockZKHelixManager.java
index 897d99dac..997216518 100644
--- 
a/helix-core/src/test/java/org/apache/helix/participant/MockZKHelixManager.java
+++ 
b/helix-core/src/test/java/org/apache/helix/participant/MockZKHelixManager.java
@@ -25,6 +25,7 @@ import java.util.UUID;
 import org.apache.helix.ClusterMessagingService;
 import org.apache.helix.ConfigAccessor;
 import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixConstants;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixManager;
 import org.apache.helix.HelixManagerProperties;
@@ -58,6 +59,7 @@ import org.apache.helix.messaging.DefaultMessagingService;
 import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.apache.helix.task.TaskConstants;
+import org.apache.zookeeper.Watcher;
 import org.testng.collections.Lists;
 
 public class MockZKHelixManager implements HelixManager {
@@ -324,6 +326,11 @@ public class MockZKHelixManager implements HelixManager {
         (ZkBaseDataAccessor<ZNRecord>) _accessor.getBaseDataAccessor(), 
TaskConstants.REBALANCER_CONTEXT_ROOT, Lists.<String>newArrayList());
   }
 
+  @Override
+  public void addListener(Object listener, PropertyKey propertyKey, 
HelixConstants.ChangeType changeType,
+      Watcher.Event.EventType[] eventType) {
+  }
+
   @Override
   public void addInstanceConfigChangeListener(InstanceConfigChangeListener 
listener)
       throws Exception {

Reply via email to