This is an automated email from the ASF dual-hosted git repository.
xyuanlu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/master by this push:
new bf8aa6254 Extend CustomizedStateProvider APIs (#2724)
bf8aa6254 is described below
commit bf8aa62541861eaaf414af53e90bd3a93503d1df
Author: Grant Paláu Spencer <[email protected]>
AuthorDate: Wed Jan 24 21:30:33 2024 -0800
Extend CustomizedStateProvider APIs (#2724)
Extend CustomizedStateProvider APIs
---
.../customizedstate/CustomizedStateProvider.java | 27 ++++
.../paticipant/TestCustomizedStateUpdate.java | 167 ++++++++++++++++-----
2 files changed, 157 insertions(+), 37 deletions(-)
diff --git
a/helix-core/src/main/java/org/apache/helix/customizedstate/CustomizedStateProvider.java
b/helix-core/src/main/java/org/apache/helix/customizedstate/CustomizedStateProvider.java
index 55117d05a..69afc07c2 100644
---
a/helix-core/src/main/java/org/apache/helix/customizedstate/CustomizedStateProvider.java
+++
b/helix-core/src/main/java/org/apache/helix/customizedstate/CustomizedStateProvider.java
@@ -125,4 +125,31 @@ public class CustomizedStateProvider {
+ "partition %s", customizedStateName, _instanceName,
resourceName, partitionName));
}
}
+
+ /**
+ * Deletes the specified customized state for an entire resource on this instance
+ */
+ public void deleteResourceCustomizedState(String customizedStateName, String
resourceName) {
+ HelixDataAccessor accessor = _helixManager.getHelixDataAccessor();
+ PropertyKey resourceCustomizedStateKey =
+ accessor.keyBuilder().customizedState(_instanceName,
customizedStateName, resourceName);
+ if (!accessor.removeProperty(resourceCustomizedStateKey)) {
+ throw new HelixException(String.format(
+ "Failed to delete customized state %s in ZooKeeper for instance %s, resource %s",
+ customizedStateName, _instanceName, resourceName));
+ }
+ }
+
+ /**
+ * Deletes the specified customized state for all resources on this instance
+ */
+ public void deleteAllResourcesCustomizedStates(String customizedStateName) {
+ HelixDataAccessor accessor = _helixManager.getHelixDataAccessor();
+ PropertyKey customizedStateKey =
+ accessor.keyBuilder().customizedStates(_instanceName,
customizedStateName);
+ List<String> resourceNames = accessor.getChildNames(customizedStateKey);
+ for (String resourceName : resourceNames) {
+ deleteResourceCustomizedState(customizedStateName, resourceName);
+ }
+ }
}
diff --git
a/helix-core/src/test/java/org/apache/helix/integration/paticipant/TestCustomizedStateUpdate.java
b/helix-core/src/test/java/org/apache/helix/integration/paticipant/TestCustomizedStateUpdate.java
index c9b741c7f..8753a2115 100644
---
a/helix-core/src/test/java/org/apache/helix/integration/paticipant/TestCustomizedStateUpdate.java
+++
b/helix-core/src/test/java/org/apache/helix/integration/paticipant/TestCustomizedStateUpdate.java
@@ -46,7 +46,8 @@ public class TestCustomizedStateUpdate extends
ZkStandAloneCMTestBase {
private final String CUSTOMIZE_STATE_NAME = "testState1";
private final String PARTITION_NAME1 = "testPartition1";
private final String PARTITION_NAME2 = "testPartition2";
- private final String RESOURCE_NAME = "testResource1";
+ private final String RESOURCE_NAME1 = "testResource1";
+ private final String RESOURCE_NAME2 = "testResource2";
private final String PARTITION_STATE = "partitionState";
private static CustomizedStateProvider _mockProvider;
private PropertyKey _propertyKey;
@@ -77,13 +78,13 @@ public class TestCustomizedStateUpdate extends
ZkStandAloneCMTestBase {
Map<String, String> customizedStateMap = new HashMap<>();
customizedStateMap.put("PREVIOUS_STATE", "STARTED");
customizedStateMap.put("CURRENT_STATE", "END_OF_PUSH_RECEIVED");
- _mockProvider.updateCustomizedState(CUSTOMIZE_STATE_NAME, RESOURCE_NAME,
PARTITION_NAME1,
+ _mockProvider.updateCustomizedState(CUSTOMIZE_STATE_NAME, RESOURCE_NAME1,
PARTITION_NAME1,
customizedStateMap);
CustomizedState customizedState =
- _mockProvider.getCustomizedState(CUSTOMIZE_STATE_NAME, RESOURCE_NAME);
+ _mockProvider.getCustomizedState(CUSTOMIZE_STATE_NAME, RESOURCE_NAME1);
Assert.assertNotNull(customizedState);
- Assert.assertEquals(customizedState.getId(), RESOURCE_NAME);
+ Assert.assertEquals(customizedState.getId(), RESOURCE_NAME1);
Map<String, Map<String, String>> mapView =
customizedState.getRecord().getMapFields();
Assert.assertEquals(mapView.keySet().size(), 1);
Assert.assertEquals(mapView.keySet().iterator().next(), PARTITION_NAME1);
@@ -96,11 +97,11 @@ public class TestCustomizedStateUpdate extends
ZkStandAloneCMTestBase {
Map<String, String> stateMap1 = new HashMap<>();
stateMap1.put("PREVIOUS_STATE", "END_OF_PUSH_RECEIVED");
_mockProvider
- .updateCustomizedState(CUSTOMIZE_STATE_NAME, RESOURCE_NAME,
PARTITION_NAME1, stateMap1);
+ .updateCustomizedState(CUSTOMIZE_STATE_NAME, RESOURCE_NAME1,
PARTITION_NAME1, stateMap1);
- customizedState = _mockProvider.getCustomizedState(CUSTOMIZE_STATE_NAME,
RESOURCE_NAME);
+ customizedState = _mockProvider.getCustomizedState(CUSTOMIZE_STATE_NAME,
RESOURCE_NAME1);
Assert.assertNotNull(customizedState);
- Assert.assertEquals(customizedState.getId(), RESOURCE_NAME);
+ Assert.assertEquals(customizedState.getId(), RESOURCE_NAME1);
mapView = customizedState.getRecord().getMapFields();
Assert.assertEquals(mapView.keySet().size(), 1);
Assert.assertEquals(mapView.keySet().iterator().next(), PARTITION_NAME1);
@@ -113,11 +114,11 @@ public class TestCustomizedStateUpdate extends
ZkStandAloneCMTestBase {
stateMap1.put("PREVIOUS_STATE", "END_OF_PUSH_RECEIVED");
stateMap1.put("CURRENT_STATE", "COMPLETED");
_mockProvider
- .updateCustomizedState(CUSTOMIZE_STATE_NAME, RESOURCE_NAME,
PARTITION_NAME1, stateMap1);
+ .updateCustomizedState(CUSTOMIZE_STATE_NAME, RESOURCE_NAME1,
PARTITION_NAME1, stateMap1);
- customizedState = _mockProvider.getCustomizedState(CUSTOMIZE_STATE_NAME,
RESOURCE_NAME);
+ customizedState = _mockProvider.getCustomizedState(CUSTOMIZE_STATE_NAME,
RESOURCE_NAME1);
Assert.assertNotNull(customizedState);
- Assert.assertEquals(customizedState.getId(), RESOURCE_NAME);
+ Assert.assertEquals(customizedState.getId(), RESOURCE_NAME1);
mapView = customizedState.getRecord().getMapFields();
Assert.assertEquals(mapView.keySet().size(), 1);
Assert.assertEquals(mapView.keySet().iterator().next(), PARTITION_NAME1);
@@ -130,52 +131,144 @@ public class TestCustomizedStateUpdate extends
ZkStandAloneCMTestBase {
stateMap2.put("PREVIOUS_STATE", "STARTED");
stateMap2.put("CURRENT_STATE", "END_OF_PUSH_RECEIVED");
_mockProvider
- .updateCustomizedState(CUSTOMIZE_STATE_NAME, RESOURCE_NAME,
PARTITION_NAME2, stateMap2);
+ .updateCustomizedState(CUSTOMIZE_STATE_NAME, RESOURCE_NAME1,
PARTITION_NAME2, stateMap2);
- customizedState = _mockProvider.getCustomizedState(CUSTOMIZE_STATE_NAME,
RESOURCE_NAME);
+ customizedState = _mockProvider.getCustomizedState(CUSTOMIZE_STATE_NAME,
RESOURCE_NAME1);
Assert.assertNotNull(customizedState);
- Assert.assertEquals(customizedState.getId(), RESOURCE_NAME);
+ Assert.assertEquals(customizedState.getId(), RESOURCE_NAME1);
mapView = customizedState.getRecord().getMapFields();
Assert.assertEquals(mapView.keySet().size(), 2);
Assert.assertEqualsNoOrder(mapView.keySet().toArray(),
new String[] { PARTITION_NAME1, PARTITION_NAME2 });
Map<String, String> partitionMap1 = _mockProvider
- .getPerPartitionCustomizedState(CUSTOMIZE_STATE_NAME, RESOURCE_NAME,
PARTITION_NAME1);
+ .getPerPartitionCustomizedState(CUSTOMIZE_STATE_NAME, RESOURCE_NAME1,
PARTITION_NAME1);
Assert.assertEquals(partitionMap1.keySet().size(), 3);
Assert.assertEquals(partitionMap1.get("PREVIOUS_STATE"),
"END_OF_PUSH_RECEIVED");
Assert.assertEquals(partitionMap1.get("CURRENT_STATE"), "COMPLETED");
Map<String, String> partitionMap2 = _mockProvider
- .getPerPartitionCustomizedState(CUSTOMIZE_STATE_NAME, RESOURCE_NAME,
PARTITION_NAME2);
+ .getPerPartitionCustomizedState(CUSTOMIZE_STATE_NAME, RESOURCE_NAME1,
PARTITION_NAME2);
Assert.assertEquals(partitionMap2.keySet().size(), 3);
Assert.assertEquals(partitionMap2.get("PREVIOUS_STATE"), "STARTED");
Assert.assertEquals(partitionMap2.get("CURRENT_STATE"),
"END_OF_PUSH_RECEIVED");
- // test delete customized state for a partition
+ // test deletePerPartitionCustomizedState -- operates on a per partition level
_mockProvider
- .deletePerPartitionCustomizedState(CUSTOMIZE_STATE_NAME,
RESOURCE_NAME, PARTITION_NAME1);
- customizedState = _mockProvider.getCustomizedState(CUSTOMIZE_STATE_NAME,
RESOURCE_NAME);
+ .deletePerPartitionCustomizedState(CUSTOMIZE_STATE_NAME,
RESOURCE_NAME1, PARTITION_NAME1);
+ customizedState = _mockProvider.getCustomizedState(CUSTOMIZE_STATE_NAME,
RESOURCE_NAME1);
Assert.assertNotNull(customizedState);
- Assert.assertEquals(customizedState.getId(), RESOURCE_NAME);
+ Assert.assertEquals(customizedState.getId(), RESOURCE_NAME1);
mapView = customizedState.getRecord().getMapFields();
Assert.assertEquals(mapView.keySet().size(), 1);
Assert.assertEquals(mapView.keySet().iterator().next(), PARTITION_NAME2);
_mockProvider
- .deletePerPartitionCustomizedState(CUSTOMIZE_STATE_NAME,
RESOURCE_NAME, PARTITION_NAME2);
- customizedState = _mockProvider.getCustomizedState(CUSTOMIZE_STATE_NAME,
RESOURCE_NAME);
+ .deletePerPartitionCustomizedState(CUSTOMIZE_STATE_NAME,
RESOURCE_NAME1, PARTITION_NAME2);
+ customizedState = _mockProvider.getCustomizedState(CUSTOMIZE_STATE_NAME,
RESOURCE_NAME1);
Assert.assertNull(customizedState);
}
+ // Testing deleteResourceCustomizedState -- operates on the resource level
+ @Test
+ public void testDeleteResourceCustomizedState() {
+ // Set up tests by adding customized states
+ Map<String, String> customizedStateMap = new HashMap<>();
+ customizedStateMap.put("PREVIOUS_STATE", "STARTED");
+ customizedStateMap.put("CURRENT_STATE", "END_OF_PUSH_RECEIVED");
+ _mockProvider.updateCustomizedState(CUSTOMIZE_STATE_NAME, RESOURCE_NAME1,
PARTITION_NAME1,
+ customizedStateMap);
+
+ Map<String, String> stateMap2 = new HashMap<>();
+ stateMap2.put("PREVIOUS_STATE", "STARTED");
+ stateMap2.put("CURRENT_STATE", "END_OF_PUSH_RECEIVED");
+ _mockProvider
+ .updateCustomizedState(CUSTOMIZE_STATE_NAME, RESOURCE_NAME1,
PARTITION_NAME2, stateMap2);
+ Assert.assertNotNull(_mockProvider.getCustomizedState(CUSTOMIZE_STATE_NAME, RESOURCE_NAME1));
+
+ // test deleting customized states for entire resource
+ _mockProvider.deleteResourceCustomizedState(CUSTOMIZE_STATE_NAME,
RESOURCE_NAME1);
+ Assert.assertNull(_mockProvider.getCustomizedState(CUSTOMIZE_STATE_NAME,
RESOURCE_NAME1));
+
+ // test delete for resource that does not exist - should not throw error
+ String mock_resource_name = "foobar_resource";
+ String mock_customized_state_name = "foobar_state";
+ try {
+ Assert.assertNull(_mockProvider.getCustomizedState(CUSTOMIZE_STATE_NAME,
mock_resource_name));
+ _mockProvider.deleteResourceCustomizedState(CUSTOMIZE_STATE_NAME,
mock_resource_name);
+ } catch (Exception e) {
+ Assert.fail(String.format("Exception should not have been thrown - %s",
e));
+ }
+
+ // test delete for customizedState that does not exist - should not throw error
+ try {
+ PropertyKey key = _dataAccessor.keyBuilder()
+ .customizedStates(_participants[0].getInstanceName(),
mock_customized_state_name);
+ Assert.assertNull(_dataAccessor.getPropertyStat(key));
+ _mockProvider.deleteResourceCustomizedState(mock_customized_state_name,
mock_resource_name);
+ } catch (Exception e) {
+ Assert.fail(String.format("Exception should not have been thrown - %s",
e));
+ }
+ }
+
+ // Testing deleteAllResourcesCustomizedStates method -- operates on the customizedState level
+ @Test
+ public void testDeleteAllResourcesCustomizedState() {
+ // Set up test by adding customized states for both resources
+ Map<String, String> customizedStateMap = new HashMap<>();
+ customizedStateMap.put("PREVIOUS_STATE", "STARTED");
+ customizedStateMap.put("CURRENT_STATE", "END_OF_PUSH_RECEIVED");
+ _mockProvider.updateCustomizedState(CUSTOMIZE_STATE_NAME, RESOURCE_NAME1,
PARTITION_NAME1,
+ customizedStateMap);
+ _mockProvider.updateCustomizedState(CUSTOMIZE_STATE_NAME, RESOURCE_NAME2,
PARTITION_NAME1,
+ customizedStateMap);
+
+ Map<String, String> stateMap2 = new HashMap<>();
+ stateMap2.put("PREVIOUS_STATE", "STARTED");
+ stateMap2.put("CURRENT_STATE", "END_OF_PUSH_RECEIVED");
+ _mockProvider
+ .updateCustomizedState(CUSTOMIZE_STATE_NAME, RESOURCE_NAME1,
PARTITION_NAME2, stateMap2);
+ _mockProvider
+ .updateCustomizedState(CUSTOMIZE_STATE_NAME, RESOURCE_NAME2,
PARTITION_NAME2, stateMap2);
+ Assert.assertNotNull(_mockProvider.getCustomizedState(CUSTOMIZE_STATE_NAME, RESOURCE_NAME1));
+ Assert.assertNotNull(_mockProvider.getCustomizedState(CUSTOMIZE_STATE_NAME, RESOURCE_NAME2));
+
+ // Test deleting customized states for all resources
+ _mockProvider.deleteAllResourcesCustomizedStates(CUSTOMIZE_STATE_NAME);
+ Assert.assertNull(_mockProvider.getCustomizedState(CUSTOMIZE_STATE_NAME,
RESOURCE_NAME1));
+ Assert.assertNull(_mockProvider.getCustomizedState(CUSTOMIZE_STATE_NAME,
RESOURCE_NAME2));
+
+ // test delete when no resources exist - should not throw error
+ try {
+ PropertyKey key = _dataAccessor.keyBuilder()
+ .customizedStates(_participants[0].getInstanceName(),
CUSTOMIZE_STATE_NAME);
+ Assert.assertTrue(_dataAccessor.getChildNames(key).isEmpty());
+ _mockProvider.deleteAllResourcesCustomizedStates(CUSTOMIZE_STATE_NAME);
+ } catch (Exception e) {
+ Assert.fail(String.format("Exception should not have been thrown - %s",
e));
+ }
+
+ // test delete for customizedState that does not exist - should not throw error
+ String mock_customized_state_name = "foobar_state";
+ try {
+ PropertyKey key = _dataAccessor.keyBuilder()
+ .customizedStates(_participants[0].getInstanceName(),
mock_customized_state_name);
+ Assert.assertNull(_dataAccessor.getPropertyStat(key));
+ _mockProvider.deleteAllResourcesCustomizedStates(mock_customized_state_name);
+ } catch (Exception e) {
+ Assert.fail(String.format("Exception should not have been thrown - %s",
e));
+ }
+
+ }
+
@Test
public void testUpdateSinglePartitionCustomizedState() {
- _mockProvider.updateCustomizedState(CUSTOMIZE_STATE_NAME, RESOURCE_NAME,
PARTITION_NAME1,
+ _mockProvider.updateCustomizedState(CUSTOMIZE_STATE_NAME, RESOURCE_NAME1,
PARTITION_NAME1,
PARTITION_STATE);
// get customized state
CustomizedState customizedState =
- _mockProvider.getCustomizedState(CUSTOMIZE_STATE_NAME, RESOURCE_NAME);
+ _mockProvider.getCustomizedState(CUSTOMIZE_STATE_NAME, RESOURCE_NAME1);
// START_TIME field is automatically updated for monitoring
Assert.assertEquals(
customizedState.getPartitionStateMap(CustomizedState.CustomizedStateProperty.CURRENT_STATE)
@@ -198,21 +291,21 @@ public class TestCustomizedStateUpdate extends
ZkStandAloneCMTestBase {
map = new HashMap<>();
map.put(CustomizedState.CustomizedStateProperty.CURRENT_STATE.name(),
PARTITION_STATE);
Map<String, String> partitionCustomizedState = _mockProvider
- .getPerPartitionCustomizedState(CUSTOMIZE_STATE_NAME, RESOURCE_NAME,
PARTITION_NAME1);
+ .getPerPartitionCustomizedState(CUSTOMIZE_STATE_NAME, RESOURCE_NAME1,
PARTITION_NAME1);
partitionCustomizedState.remove(CustomizedState.CustomizedStateProperty.START_TIME.name());
Assert.assertEquals(partitionCustomizedState, map);
Assert.assertNull(_mockProvider
- .getPerPartitionCustomizedState(CUSTOMIZE_STATE_NAME, RESOURCE_NAME,
PARTITION_NAME2));
+ .getPerPartitionCustomizedState(CUSTOMIZE_STATE_NAME, RESOURCE_NAME1,
PARTITION_NAME2));
}
@Test
public void testUpdateSinglePartitionCustomizedStateWithNullField() {
_mockProvider
- .updateCustomizedState(CUSTOMIZE_STATE_NAME, RESOURCE_NAME,
PARTITION_NAME1, (String) null);
+ .updateCustomizedState(CUSTOMIZE_STATE_NAME, RESOURCE_NAME1,
PARTITION_NAME1, (String) null);
// get customized state
CustomizedState customizedState =
- _mockProvider.getCustomizedState(CUSTOMIZE_STATE_NAME, RESOURCE_NAME);
+ _mockProvider.getCustomizedState(CUSTOMIZE_STATE_NAME, RESOURCE_NAME1);
Map<String, String> map = new HashMap<>();
map.put(PARTITION_NAME1, null);
Assert.assertEquals(
@@ -225,21 +318,21 @@ public class TestCustomizedStateUpdate extends
ZkStandAloneCMTestBase {
map = new HashMap<>();
map.put(CustomizedState.CustomizedStateProperty.CURRENT_STATE.name(),
null);
Map<String, String> partitionCustomizedState = _mockProvider
- .getPerPartitionCustomizedState(CUSTOMIZE_STATE_NAME, RESOURCE_NAME,
PARTITION_NAME1);
+ .getPerPartitionCustomizedState(CUSTOMIZE_STATE_NAME, RESOURCE_NAME1,
PARTITION_NAME1);
partitionCustomizedState.remove(CustomizedState.CustomizedStateProperty.START_TIME.name());
Assert.assertEquals(partitionCustomizedState, map);
Assert.assertNull(_mockProvider
- .getPerPartitionCustomizedState(CUSTOMIZE_STATE_NAME, RESOURCE_NAME,
PARTITION_NAME2));
+ .getPerPartitionCustomizedState(CUSTOMIZE_STATE_NAME, RESOURCE_NAME1,
PARTITION_NAME2));
}
@Test
public void testUpdateCustomizedStateWithEmptyMap() {
- _mockProvider.updateCustomizedState(CUSTOMIZE_STATE_NAME, RESOURCE_NAME,
PARTITION_NAME1,
+ _mockProvider.updateCustomizedState(CUSTOMIZE_STATE_NAME, RESOURCE_NAME1,
PARTITION_NAME1,
new HashMap<>());
// get customized state
CustomizedState customizedState =
- _mockProvider.getCustomizedState(CUSTOMIZE_STATE_NAME, RESOURCE_NAME);
+ _mockProvider.getCustomizedState(CUSTOMIZE_STATE_NAME, RESOURCE_NAME1);
Assert.assertNull(customizedState.getState(PARTITION_NAME1));
Map<String, String> partitionStateMap =
customizedState.getPartitionStateMap(CustomizedState.CustomizedStateProperty.CURRENT_STATE);
@@ -252,23 +345,23 @@ public class TestCustomizedStateUpdate extends
ZkStandAloneCMTestBase {
// get per partition customized state
Map<String, String> partitionCustomizedState = _mockProvider
- .getPerPartitionCustomizedState(CUSTOMIZE_STATE_NAME, RESOURCE_NAME,
PARTITION_NAME1);
+ .getPerPartitionCustomizedState(CUSTOMIZE_STATE_NAME, RESOURCE_NAME1,
PARTITION_NAME1);
// START_TIME field is automatically updated for monitoring
Assert.assertEquals(partitionCustomizedState.size(), 1);
Assert.assertNull(_mockProvider
- .getPerPartitionCustomizedState(CUSTOMIZE_STATE_NAME, RESOURCE_NAME,
PARTITION_NAME2));
+ .getPerPartitionCustomizedState(CUSTOMIZE_STATE_NAME, RESOURCE_NAME1,
PARTITION_NAME2));
}
@Test
public void testDeleteNonExistingPerPartitionCustomizedState() {
- _mockProvider.updateCustomizedState(CUSTOMIZE_STATE_NAME, RESOURCE_NAME,
PARTITION_NAME1,
+ _mockProvider.updateCustomizedState(CUSTOMIZE_STATE_NAME, RESOURCE_NAME1,
PARTITION_NAME1,
PARTITION_STATE);
_mockProvider
- .deletePerPartitionCustomizedState(CUSTOMIZE_STATE_NAME,
RESOURCE_NAME, PARTITION_NAME2);
+ .deletePerPartitionCustomizedState(CUSTOMIZE_STATE_NAME,
RESOURCE_NAME1, PARTITION_NAME2);
Assert.assertNotNull(_mockProvider
- .getPerPartitionCustomizedState(CUSTOMIZE_STATE_NAME, RESOURCE_NAME,
PARTITION_NAME1));
+ .getPerPartitionCustomizedState(CUSTOMIZE_STATE_NAME, RESOURCE_NAME1,
PARTITION_NAME1));
Assert.assertNull(_mockProvider
- .getPerPartitionCustomizedState(CUSTOMIZE_STATE_NAME, RESOURCE_NAME,
PARTITION_NAME2));
+ .getPerPartitionCustomizedState(CUSTOMIZE_STATE_NAME, RESOURCE_NAME1,
PARTITION_NAME2));
}
@Test