This is an automated email from the ASF dual-hosted git repository.
jxue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/master by this push:
new 6bdc88804 Expose addListener and support listening on single instance config (#2752)
6bdc88804 is described below
commit 6bdc888042daefa7e75d93df44270db6d4a26849
Author: Zachary Pinto <[email protected]>
AuthorDate: Mon Feb 26 17:47:22 2024 -0800
Expose addListener and support listening on single instance config (#2752)
Expose the addListener method in HelixManager so that users can register any
listener that implements one of the listener interfaces. Add support in
CallbackHandler for an InstanceConfigChangeListener to be used on a single
instance config.
---
.../main/java/org/apache/helix/HelixManager.java | 14 +++-
.../apache/helix/manager/zk/CallbackHandler.java | 10 ++-
.../apache/helix/manager/zk/ZKHelixManager.java | 3 +-
.../event/MockCloudEventAwareHelixManager.java | 7 ++
.../controller/stages/DummyClusterManager.java | 7 ++
.../rebalancer/TestInstanceOperation.java | 74 ++++++++++++++++++----
.../java/org/apache/helix/mock/MockManager.java | 8 ++-
.../helix/participant/MockZKHelixManager.java | 7 ++
8 files changed, 113 insertions(+), 17 deletions(-)
diff --git a/helix-core/src/main/java/org/apache/helix/HelixManager.java
b/helix-core/src/main/java/org/apache/helix/HelixManager.java
index b5c2a6448..c1d2ad18c 100644
--- a/helix-core/src/main/java/org/apache/helix/HelixManager.java
+++ b/helix-core/src/main/java/org/apache/helix/HelixManager.java
@@ -51,7 +51,7 @@ import org.apache.helix.participant.StateMachineEngine;
import org.apache.helix.spectator.RoutingTableProvider;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
-
+import org.apache.zookeeper.Watcher;
/**
* Class that represents the Helix Agent.
@@ -108,6 +108,18 @@ public interface HelixManager {
*/
void disconnect();
+ /**
+ * Add a change listener on the specified propertyKey for the specified
+ * changeType and eventTypes.
+ * @see org.apache.helix.api.listeners for the list of available listeners
+ * @param listener the listener to add
+ * @param propertyKey the property key to listen to
+ * @param changeType the type of change to listen to
+ * @param eventType the event type to listen for
+ */
+ void addListener(Object listener, PropertyKey propertyKey,
HelixConstants.ChangeType changeType,
+ Watcher.Event.EventType[] eventType);
+
/**
* @see IdealStateChangeListener#onIdealStateChange(List,
NotificationContext)
* @param listener
diff --git
a/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
b/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
index 2a19a7a4e..c2809620e 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
@@ -358,7 +358,15 @@ public class CallbackHandler implements IZkChildListener,
IZkDataListener {
configChangeListener.onConfigChange(configs, changeContext);
} else if (_listener instanceof InstanceConfigChangeListener) {
InstanceConfigChangeListener listener =
(InstanceConfigChangeListener) _listener;
- List<InstanceConfig> configs = preFetch(_propertyKey);
+ List<InstanceConfig> configs = Collections.emptyList();
+ if (_propertyKey.getParams().length > 2 && _preFetchEnabled) {
+ // If there are more than 2 params, that means the property key is
for a specific instance
+ // and will not have children.
+ InstanceConfig config = _accessor.getProperty(_propertyKey);
+ configs = config != null ? Collections.singletonList(config) :
Collections.emptyList();
+ } else {
+ configs = preFetch(_propertyKey);
+ }
listener.onInstanceConfigChange(configs, changeContext);
}
} else if (_changeType == RESOURCE_CONFIG) {
diff --git
a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
index 7a62d1923..3473bed08 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
@@ -419,7 +419,8 @@ public class ZKHelixManager implements HelixManager,
IZkStateListener {
}
}
- void addListener(Object listener, PropertyKey propertyKey, ChangeType
changeType,
+ @Override
+ public void addListener(Object listener, PropertyKey propertyKey, ChangeType
changeType,
EventType[] eventType) {
checkConnected(_waitForConnectedTimeout);
diff --git
a/helix-core/src/test/java/org/apache/helix/cloud/event/MockCloudEventAwareHelixManager.java
b/helix-core/src/test/java/org/apache/helix/cloud/event/MockCloudEventAwareHelixManager.java
index de46257d3..740c932fc 100644
---
a/helix-core/src/test/java/org/apache/helix/cloud/event/MockCloudEventAwareHelixManager.java
+++
b/helix-core/src/test/java/org/apache/helix/cloud/event/MockCloudEventAwareHelixManager.java
@@ -26,6 +26,7 @@ import org.apache.helix.ClusterMessagingService;
import org.apache.helix.ConfigAccessor;
import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixCloudProperty;
+import org.apache.helix.HelixConstants;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixManagerProperties;
@@ -59,6 +60,7 @@ import org.apache.helix.model.HelixConfigScope;
import org.apache.helix.participant.StateMachineEngine;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.zookeeper.Watcher;
public class MockCloudEventAwareHelixManager implements HelixManager {
private final HelixManagerProperty _helixManagerProperty;
@@ -110,6 +112,11 @@ public class MockCloudEventAwareHelixManager implements
HelixManager {
return false;
}
+ @Override
+ public void addListener(Object listener, PropertyKey propertyKey,
HelixConstants.ChangeType changeType,
+ Watcher.Event.EventType[] eventType) {
+ }
+
@Override
public void addIdealStateChangeListener(IdealStateChangeListener listener)
throws Exception {
diff --git
a/helix-core/src/test/java/org/apache/helix/controller/stages/DummyClusterManager.java
b/helix-core/src/test/java/org/apache/helix/controller/stages/DummyClusterManager.java
index 5d23a321c..4903a3d8b 100644
---
a/helix-core/src/test/java/org/apache/helix/controller/stages/DummyClusterManager.java
+++
b/helix-core/src/test/java/org/apache/helix/controller/stages/DummyClusterManager.java
@@ -24,6 +24,7 @@ import java.util.Set;
import org.apache.helix.ClusterMessagingService;
import org.apache.helix.ConfigAccessor;
import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixConstants;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixManagerProperties;
@@ -53,6 +54,7 @@ import
org.apache.helix.healthcheck.ParticipantHealthReportCollector;
import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
import org.apache.helix.participant.StateMachineEngine;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.apache.zookeeper.Watcher;
public class DummyClusterManager implements HelixManager {
HelixDataAccessor _accessor;
@@ -303,6 +305,11 @@ public class DummyClusterManager implements HelixManager {
return null;
}
+ @Override
+ public void addListener(Object listener, PropertyKey propertyKey,
HelixConstants.ChangeType changeType,
+ Watcher.Event.EventType[] eventType) {
+ }
+
@Override
public void addInstanceConfigChangeListener(InstanceConfigChangeListener
listener)
throws Exception {
diff --git
a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestInstanceOperation.java
b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestInstanceOperation.java
index 276757741..010153e64 100644
---
a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestInstanceOperation.java
+++
b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestInstanceOperation.java
@@ -18,6 +18,7 @@ import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.apache.helix.ConfigAccessor;
import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixConstants;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixException;
import org.apache.helix.HelixManager;
@@ -25,8 +26,10 @@ import org.apache.helix.HelixManagerFactory;
import org.apache.helix.HelixRollbackException;
import org.apache.helix.InstanceType;
import org.apache.helix.NotificationContext;
+import org.apache.helix.PropertyKey;
import org.apache.helix.PropertyType;
import org.apache.helix.TestHelper;
+import org.apache.helix.api.listeners.InstanceConfigChangeListener;
import org.apache.helix.common.ZkTestBase;
import org.apache.helix.constants.InstanceConstants;
import
org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy;
@@ -54,6 +57,7 @@ import org.apache.helix.spectator.RoutingTableProvider;
import
org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
import org.apache.helix.tools.ClusterVerifiers.StrictMatchExternalViewVerifier;
import org.apache.helix.tools.ClusterVerifiers.ZkHelixClusterVerifier;
+import org.apache.zookeeper.Watcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
@@ -546,12 +550,16 @@ public class TestInstanceOperation extends ZkTestBase {
validateEVsCorrect(getEVs(), originalEVs,
swapOutInstancesToSwapInInstances,
Collections.emptySet(), Collections.emptySet());
+ CustomIndividualInstanceConfigChangeListener
instanceToSwapInInstanceConfigListener =
+ new CustomIndividualInstanceConfigChangeListener();
// Add instance with InstanceOperation set to SWAP_IN
String instanceToSwapInName = PARTICIPANT_PREFIX + "_" + _nextStartPort;
swapOutInstancesToSwapInInstances.put(instanceToSwapOutName,
instanceToSwapInName);
addParticipant(instanceToSwapInName,
instanceToSwapOutInstanceConfig.getLogicalId(LOGICAL_ID),
instanceToSwapOutInstanceConfig.getDomainAsMap().get(ZONE),
- InstanceConstants.InstanceOperation.SWAP_IN, true, -1);
+ InstanceConstants.InstanceOperation.SWAP_IN, true, -1,
instanceToSwapInInstanceConfigListener);
+
+
Assert.assertFalse(instanceToSwapInInstanceConfigListener.isThrottlesEnabled());
// Validate that partitions on SWAP_OUT instance does not change after
setting the InstanceOperation to SWAP_OUT
// and adding the SWAP_IN instance to the cluster.
@@ -583,6 +591,9 @@ public class TestInstanceOperation extends ZkTestBase {
Assert.assertFalse(_gSetupTool.getClusterManagementTool()
.getInstanceConfig(CLUSTER_NAME,
instanceToSwapOutName).getInstanceEnabled());
+ // Check to make sure the throttle was enabled again after the swap was
completed.
+
Assert.assertTrue(instanceToSwapInInstanceConfigListener.isThrottlesEnabled());
+
// Validate that the SWAP_IN instance has the same partitions the SWAP_OUT
instance had before
// swap was completed.
verifier(() -> (validateEVsCorrect(getEVs(), originalEVs,
swapOutInstancesToSwapInInstances,
@@ -1095,7 +1106,7 @@ public class TestInstanceOperation extends ZkTestBase {
}
@Test(expectedExceptions = HelixException.class, dependsOnMethods =
"testNodeSwapWithSwapOutInstanceOffline")
- public void testNodeSwapAddSwapInFirstEnabledBeforeSwapOutSet() {
+ public void testNodeSwapAddSwapInFirstEnabledBeforeSwapOutSet() throws
Exception {
System.out.println(
"START
TestInstanceOperation.testNodeSwapAddSwapInFirstEnabledBeforeSwapOutSet() at "
+ new Date(System.currentTimeMillis()));
@@ -1113,7 +1124,7 @@ public class TestInstanceOperation extends ZkTestBase {
}
@Test(expectedExceptions = HelixException.class, dependsOnMethods =
"testNodeSwapAddSwapInFirstEnabledBeforeSwapOutSet")
- public void testNodeSwapAddSwapInFirstEnableBeforeSwapOutSet() {
+ public void testNodeSwapAddSwapInFirstEnableBeforeSwapOutSet() throws
Exception {
System.out.println(
"START
TestInstanceOperation.testNodeSwapAddSwapInFirstEnableBeforeSwapOutSet() at "
+ new Date(System.currentTimeMillis()));
@@ -1136,7 +1147,7 @@ public class TestInstanceOperation extends ZkTestBase {
}
@Test(expectedExceptions = HelixException.class, dependsOnMethods =
"testNodeSwapAddSwapInFirstEnableBeforeSwapOutSet")
- public void testUnsetInstanceOperationOnSwapInWhenAlreadyUnsetOnSwapOut() {
+ public void testUnsetInstanceOperationOnSwapInWhenAlreadyUnsetOnSwapOut()
throws Exception {
System.out.println(
"START
TestInstanceOperation.testUnsetInstanceOperationOnSwapInWhenAlreadyUnsetOnSwapOut()
at "
+ new Date(System.currentTimeMillis()));
@@ -1380,7 +1391,7 @@ public class TestInstanceOperation extends ZkTestBase {
}
@Test(expectedExceptions = HelixException.class, dependsOnMethods =
"testEvacuationWithOfflineInstancesInCluster")
- public void testSwapEvacuateAddRemoveEvacuate() {
+ public void testSwapEvacuateAddRemoveEvacuate() throws Exception {
System.out.println("START
TestInstanceOperation.testSwapEvacuateAddRemoveEvacuate() at " + new Date(
System.currentTimeMillis()));
removeOfflineOrDisabledOrSwapInInstances();
@@ -1426,9 +1437,33 @@ public class TestInstanceOperation extends ZkTestBase {
}, timeout));
}
- private MockParticipantManager createParticipant(String participantName) {
+ private static class CustomIndividualInstanceConfigChangeListener implements
InstanceConfigChangeListener {
+ private boolean throttlesEnabled;
+
+ public CustomIndividualInstanceConfigChangeListener() {
+ throttlesEnabled = true;
+ }
+
+ public boolean isThrottlesEnabled() {
+ return throttlesEnabled;
+ }
+
+ @Override
+ public void onInstanceConfigChange(List<InstanceConfig> instanceConfig,
+ NotificationContext context) {
+ if (instanceConfig.get(0).getInstanceOperation()
+ .equals(InstanceConstants.InstanceOperation.SWAP_IN.name())) {
+ throttlesEnabled = false;
+ } else if (instanceConfig.get(0).getInstanceOperation().isEmpty()) {
+ throttlesEnabled = true;
+ }
+ }
+ }
+
+ private MockParticipantManager createParticipant(String participantName)
throws Exception {
// start dummy participants
- MockParticipantManager participant = new MockParticipantManager(ZK_ADDR,
CLUSTER_NAME, participantName);
+ MockParticipantManager participant =
+ new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, participantName, 10,
null);
StateMachineEngine stateMachine = participant.getStateMachineEngine();
// Using a delayed state model
StDelayMSStateModelFactory delayFactory = new StDelayMSStateModelFactory();
@@ -1436,8 +1471,20 @@ public class TestInstanceOperation extends ZkTestBase {
return participant;
}
+ private void addParticipant(String participantName) throws Exception {
+ addParticipant(participantName, UUID.randomUUID().toString(),
+ "zone_" + _participants.size() % ZONE_COUNT, null, true, -1);
+ }
+
private void addParticipant(String participantName, String logicalId, String
zone,
- InstanceConstants.InstanceOperation instanceOperation, boolean enabled,
int capacity) {
+ InstanceConstants.InstanceOperation instanceOperation, boolean enabled,
int capacity)
+ throws Exception {
+ addParticipant(participantName, logicalId, zone, instanceOperation,
enabled, capacity, null);
+ }
+
+ private void addParticipant(String participantName, String logicalId, String
zone,
+ InstanceConstants.InstanceOperation instanceOperation, boolean enabled,
int capacity,
+ InstanceConfigChangeListener listener) throws Exception {
InstanceConfig config = new InstanceConfig.Builder().setDomain(
String.format("%s=%s, %s=%s, %s=%s", ZONE, zone, HOST,
participantName, LOGICAL_ID,
logicalId)).setInstanceEnabled(enabled).setInstanceOperation(instanceOperation)
@@ -1451,16 +1498,17 @@ public class TestInstanceOperation extends ZkTestBase {
MockParticipantManager participant = createParticipant(participantName);
participant.syncStart();
+ if (listener != null) {
+ participant.addListener(listener,
+ new
PropertyKey.Builder(CLUSTER_NAME).instanceConfig(participantName),
+ HelixConstants.ChangeType.INSTANCE_CONFIG,
+ new
Watcher.Event.EventType[]{Watcher.Event.EventType.NodeDataChanged});
+ }
_participants.add(participant);
_participantNames.add(participantName);
_nextStartPort++;
}
- private void addParticipant(String participantName) {
- addParticipant(participantName, UUID.randomUUID().toString(),
- "zone_" + _participants.size() % ZONE_COUNT, null, true, -1);
- }
-
private void createTestDBs(long delayTime) throws InterruptedException {
createResourceWithDelayedRebalance(CLUSTER_NAME, "TEST_DB0_CRUSHED",
BuiltInStateModelDefinitions.LeaderStandby.name(), PARTITIONS,
REPLICA, REPLICA - 1, -1,
diff --git a/helix-core/src/test/java/org/apache/helix/mock/MockManager.java
b/helix-core/src/test/java/org/apache/helix/mock/MockManager.java
index afa8d9880..b2d474a52 100644
--- a/helix-core/src/test/java/org/apache/helix/mock/MockManager.java
+++ b/helix-core/src/test/java/org/apache/helix/mock/MockManager.java
@@ -25,6 +25,7 @@ import java.util.UUID;
import org.apache.helix.ClusterMessagingService;
import org.apache.helix.ConfigAccessor;
import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixConstants;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixManagerProperties;
@@ -56,7 +57,7 @@ import org.apache.helix.model.HelixConfigScope;
import org.apache.helix.participant.HelixStateMachineEngine;
import org.apache.helix.participant.StateMachineEngine;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
-
+import org.apache.zookeeper.Watcher;
public class MockManager implements HelixManager {
MockAccessor accessor;
@@ -328,6 +329,11 @@ public class MockManager implements HelixManager {
return null;
}
+ @Override
+ public void addListener(Object listener, PropertyKey propertyKey,
HelixConstants.ChangeType changeType,
+ Watcher.Event.EventType[] eventType) {
+ }
+
@Override
public void addInstanceConfigChangeListener(InstanceConfigChangeListener
listener)
throws Exception {
diff --git
a/helix-core/src/test/java/org/apache/helix/participant/MockZKHelixManager.java
b/helix-core/src/test/java/org/apache/helix/participant/MockZKHelixManager.java
index 897d99dac..997216518 100644
---
a/helix-core/src/test/java/org/apache/helix/participant/MockZKHelixManager.java
+++
b/helix-core/src/test/java/org/apache/helix/participant/MockZKHelixManager.java
@@ -25,6 +25,7 @@ import java.util.UUID;
import org.apache.helix.ClusterMessagingService;
import org.apache.helix.ConfigAccessor;
import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixConstants;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixManagerProperties;
@@ -58,6 +59,7 @@ import org.apache.helix.messaging.DefaultMessagingService;
import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.helix.task.TaskConstants;
+import org.apache.zookeeper.Watcher;
import org.testng.collections.Lists;
public class MockZKHelixManager implements HelixManager {
@@ -324,6 +326,11 @@ public class MockZKHelixManager implements HelixManager {
(ZkBaseDataAccessor<ZNRecord>) _accessor.getBaseDataAccessor(),
TaskConstants.REBALANCER_CONTEXT_ROOT, Lists.<String>newArrayList());
}
+ @Override
+ public void addListener(Object listener, PropertyKey propertyKey,
HelixConstants.ChangeType changeType,
+ Watcher.Event.EventType[] eventType) {
+ }
+
@Override
public void addInstanceConfigChangeListener(InstanceConfigChangeListener
listener)
throws Exception {