This is an automated email from the ASF dual-hosted git repository.
jiajunwang pushed a commit to branch customizeView
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/customizeView by this push:
new db5bbd3 Use updater to update customized state for concurrency
control (#859)
db5bbd3 is described below
commit db5bbd3ae8179559c459a168046b5a17ea3b1985
Author: Molly Gao <[email protected]>
AuthorDate: Wed Mar 11 10:45:58 2020 -0700
Use updater to update customized state for concurrency control (#859)
Currently the update customized state method is made synchronized for
concurrency control. This commit modifies the implementation of update to leave
the responsibility of concurrency control to ZooKeeper by using updater to
update the customize state. With delete method already implemented with
updater, we can prevent unexpected change of the customize state data.
---
.../customizedstate/CustomizedStateProvider.java | 23 +++++-----------------
.../paticipant/TestCustomizedStateUpdate.java | 21 ++++++++++----------
2 files changed, 16 insertions(+), 28 deletions(-)
diff --git
a/helix-core/src/main/java/org/apache/helix/customizedstate/CustomizedStateProvider.java
b/helix-core/src/main/java/org/apache/helix/customizedstate/CustomizedStateProvider.java
index 3807ea6..80f4d91 100644
---
a/helix-core/src/main/java/org/apache/helix/customizedstate/CustomizedStateProvider.java
+++
b/helix-core/src/main/java/org/apache/helix/customizedstate/CustomizedStateProvider.java
@@ -50,7 +50,7 @@ public class CustomizedStateProvider {
* Update a specific customized state based on the resource name and
partition name. The
* customized state is input as a single string
*/
- public synchronized void updateCustomizedState(String customizedStateName,
String resourceName,
+ public void updateCustomizedState(String customizedStateName, String
resourceName,
String partitionName, String customizedState) {
Map<String, String> customizedStateMap = new HashMap<>();
customizedStateMap.put(CustomizedState.CustomizedStateProperty.CURRENT_STATE.name(),
customizedState);
@@ -61,29 +61,16 @@ public class CustomizedStateProvider {
* Update a specific customized state based on the resource name and
partition name. The
* customized state is input as a map
*/
- public synchronized void updateCustomizedState(String customizedStateName,
String resourceName,
+ public void updateCustomizedState(String customizedStateName, String
resourceName,
String partitionName, Map<String, String> customizedStateMap) {
PropertyKey.Builder keyBuilder = _helixDataAccessor.keyBuilder();
PropertyKey propertyKey =
keyBuilder.customizedState(_instanceName, customizedStateName,
resourceName);
ZNRecord record = new ZNRecord(resourceName);
- Map<String, Map<String, String>> mapFields = new HashMap<>();
- CustomizedState existingState = getCustomizedState(customizedStateName,
resourceName);
- if (existingState != null
- &&
existingState.getRecord().getMapFields().containsKey(partitionName)) {
- Map<String, String> existingMap = new HashMap<>();
- for (String key : customizedStateMap.keySet()) {
- existingMap.put(key, customizedStateMap.get(key));
- }
-
- mapFields.put(partitionName, existingMap);
- } else {
- mapFields.put(partitionName, customizedStateMap);
- }
- record.setMapFields(mapFields);
+ record.setMapField(partitionName, customizedStateMap);
if (!_helixDataAccessor.updateProperty(propertyKey, new
CustomizedState(record))) {
- throw new HelixException(
- String.format("Failed to persist customized state %s to zk for
instance %s, resource %s",
+ throw new HelixException(String
+ .format("Failed to persist customized state %s to zk for instance
%s, resource %s",
customizedStateName, _instanceName, record.getId()));
}
}
diff --git
a/helix-core/src/test/java/org/apache/helix/integration/paticipant/TestCustomizedStateUpdate.java
b/helix-core/src/test/java/org/apache/helix/integration/paticipant/TestCustomizedStateUpdate.java
index bc086ba..740f2fd 100644
---
a/helix-core/src/test/java/org/apache/helix/integration/paticipant/TestCustomizedStateUpdate.java
+++
b/helix-core/src/test/java/org/apache/helix/integration/paticipant/TestCustomizedStateUpdate.java
@@ -57,6 +57,8 @@ public class TestCustomizedStateUpdate extends
ZkStandAloneCMTestBase {
private final String PARTITION_STATE = "partitionState";
private static HelixManager _manager;
private static CustomizedStateProvider _mockProvider;
+ private PropertyKey _propertyKey;
+ private HelixDataAccessor _dataAccessor;
@BeforeClass
public void beforeClass() throws Exception {
@@ -67,6 +69,9 @@ public class TestCustomizedStateUpdate extends
ZkStandAloneCMTestBase {
_participants[0].connect();
_mockProvider = CustomizedStateProviderFactory.getInstance()
.buildCustomizedStateProvider(_manager,
_participants[0].getInstanceName());
+ _dataAccessor = _manager.getHelixDataAccessor();
+ _propertyKey = _dataAccessor.keyBuilder()
+ .customizedStates(_participants[0].getInstanceName(),
CUSTOMIZE_STATE_NAME);
}
@AfterClass
@@ -77,11 +82,8 @@ public class TestCustomizedStateUpdate extends
ZkStandAloneCMTestBase {
@BeforeMethod
public void beforeMethod() {
- HelixDataAccessor dataAccessor = _manager.getHelixDataAccessor();
- PropertyKey propertyKey = dataAccessor.keyBuilder()
- .customizedStates(_participants[0].getInstanceName(),
CUSTOMIZE_STATE_NAME);
- dataAccessor.removeProperty(propertyKey);
- CustomizedState customizedStates = dataAccessor.getProperty(propertyKey);
+ _dataAccessor.removeProperty(_propertyKey);
+ CustomizedState customizedStates = _dataAccessor.getProperty(_propertyKey);
Assert.assertNull(customizedStates);
}
@@ -278,15 +280,14 @@ public class TestCustomizedStateUpdate extends
ZkStandAloneCMTestBase {
@Test
public void testSimultaneousUpdateCustomizedState() {
- int n = 10;
-
List<Callable<Boolean>> threads = new ArrayList<>();
- for (int i = 0; i < n; i++) {
+ int threadCount = 10;
+ for (int i = 0; i < threadCount; i++) {
threads.add(new TestSimultaneousUpdate());
}
Map<String, Boolean> resultMap =
TestHelper.startThreadsConcurrently(threads, 1000);
- Assert.assertEquals(resultMap.size(), n);
- Boolean[] results = new Boolean[n];
+ Assert.assertEquals(resultMap.size(), threadCount);
+ Boolean[] results = new Boolean[threadCount];
Arrays.fill(results, true);
Assert.assertEqualsNoOrder(resultMap.values().toArray(), results);
}