This is an automated email from the ASF dual-hosted git repository.
jxue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/master by this push:
new adb2e1d Fix invoke rebalance by "touching" IdealState/ResourceConfig
adb2e1d is described below
commit adb2e1dece415000f14189684f84601ad5a309d6
Author: Junkai Xue <[email protected]>
AuthorDate: Mon Jul 15 18:32:57 2019 -0700
Fix invoke rebalance by "touching" IdealState/ResourceConfig
The current HelixDataAccessor updateProperty uses ZNRecordUpdater. Its merge
logic simply adds all elements when merging ZNRecords, which can
cause a lot of duplication in listFields.
This impacts invokeRebalanceForResourceConfig. The fix is to
implement a customized updater.
In this commit:
1. Fix invoke rebalance with customized updater.
2. Add comments for ZNRecord merge.
3. Add checks in TaskUtil to only trigger the Workflow Config "touch" when
purging jobs.
4. Add a test for RebalanceScheduler.
---
.../src/main/java/org/apache/helix/ZNRecord.java | 3 +
.../rebalancer/util/RebalanceScheduler.java | 10 ++-
.../rebalancer/TestZeroReplicaAvoidance.java | 2 +-
.../handling/TestResourceThreadpoolSize.java | 6 +-
.../apache/helix/util/TestRebalanceScheduler.java | 72 ++++++++++++++++++++++
5 files changed, 87 insertions(+), 6 deletions(-)
diff --git a/helix-core/src/main/java/org/apache/helix/ZNRecord.java
b/helix-core/src/main/java/org/apache/helix/ZNRecord.java
index 9dc1d8d..bd9acbb 100644
--- a/helix-core/src/main/java/org/apache/helix/ZNRecord.java
+++ b/helix-core/src/main/java/org/apache/helix/ZNRecord.java
@@ -506,6 +506,9 @@ public class ZNRecord {
}
}
for (String key : record.listFields.keySet()) {
+ // Default merge logic could introduce duplicated values. For example,
old Record has list field
+ // with value [1, 2, 3]. New Record is exactly same as previous one.
Merged result will be
+ // [1, 2, 3, 1, 2, 3].
List<String> list = listFields.get(key);
if (list != null) {
list.addAll(record.listFields.get(key));
diff --git
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/RebalanceScheduler.java
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/RebalanceScheduler.java
index ef2dc8d..e26de5b 100644
---
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/RebalanceScheduler.java
+++
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/RebalanceScheduler.java
@@ -1,8 +1,10 @@
package org.apache.helix.controller.rebalancer.util;
+import org.I0Itec.zkclient.DataUpdater;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixManager;
import org.apache.helix.PropertyKey;
+import org.apache.helix.ZNRecord;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.ResourceConfig;
@@ -138,7 +140,9 @@ public class RebalanceScheduler {
PropertyKey key = accessor.keyBuilder().idealStates(resource);
IdealState is = accessor.getProperty(key);
if (is != null) {
- if (!accessor.updateProperty(key, is)) {
+ // Here it uses the updateProperty function with no-op DataUpdater.
Otherwise, it will use default
+ // ZNRecordUpdater which will duplicate elements for listFields.
+ if (!accessor.updateProperty(key, znRecord -> znRecord, is)) {
LOG.warn("Failed to invoke rebalance on resource {}", resource);
}
} else {
@@ -156,7 +160,9 @@ public class RebalanceScheduler {
PropertyKey key = accessor.keyBuilder().resourceConfig(resource);
ResourceConfig cfg = accessor.getProperty(key);
if (cfg != null) {
- if (!accessor.updateProperty(key, cfg)) {
+ // Here it uses the updateProperty function with no-op DataUpdater.
Otherwise, it will use default
+ // ZNRecordUpdater which will duplicate elements for listFields.
+ if (!accessor.updateProperty(key, znRecord -> znRecord, cfg)) {
LOG.warn("Failed to invoke rebalance on resource config {}", resource);
}
} else {
diff --git
a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestZeroReplicaAvoidance.java
b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestZeroReplicaAvoidance.java
index 9ee4738..ab4a263 100644
---
a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestZeroReplicaAvoidance.java
+++
b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestZeroReplicaAvoidance.java
@@ -123,7 +123,7 @@ public class TestZeroReplicaAvoidance extends ZkTestBase
createResourceWithDelayedRebalance(CLUSTER_NAME, db, stateModel,
partition, replica, replica,
0);
}
- Assert.assertTrue(_clusterVerifier.verify(50000L));
+ Assert.assertTrue(_clusterVerifier.verifyByPolling(50000L, 100L));
_startListen = true;
DelayedTransition.setDelay(5);
diff --git
a/helix-core/src/test/java/org/apache/helix/messaging/handling/TestResourceThreadpoolSize.java
b/helix-core/src/test/java/org/apache/helix/messaging/handling/TestResourceThreadpoolSize.java
index 46d67bb..9eeb6f5 100644
---
a/helix-core/src/test/java/org/apache/helix/messaging/handling/TestResourceThreadpoolSize.java
+++
b/helix-core/src/test/java/org/apache/helix/messaging/handling/TestResourceThreadpoolSize.java
@@ -81,7 +81,7 @@ public class TestResourceThreadpoolSize extends
ZkStandAloneCMTestBase {
Assert.assertTrue(taskcount >= numPartition * (numReplica + 1));
}
- @Test
+ @Test (dependsOnMethods = "TestThreadPoolSizeConfig")
public void TestCustomizedResourceThreadPool() {
int customizedPoolSize = 7;
int configuredPoolSize = 9;
@@ -134,7 +134,7 @@ public class TestResourceThreadpoolSize extends
ZkStandAloneCMTestBase {
}
}
- @Test
+ @Test (dependsOnMethods = "TestCustomizedResourceThreadPool")
public void TestPerStateTransitionTypeThreadPool() throws
InterruptedException {
String MASTER_SLAVE = "MasterSlave";
@@ -173,7 +173,7 @@ public class TestResourceThreadpoolSize extends
ZkStandAloneCMTestBase {
}
}
- @Test
+ @Test (dependsOnMethods = "TestPerStateTransitionTypeThreadPool")
public void testBatchMessageThreadPoolSize() throws InterruptedException {
int customizedPoolSize = 5;
_participants[0].getStateMachineEngine().registerStateModelFactory("OnlineOffline",
diff --git
a/helix-core/src/test/java/org/apache/helix/util/TestRebalanceScheduler.java
b/helix-core/src/test/java/org/apache/helix/util/TestRebalanceScheduler.java
new file mode 100644
index 0000000..2c73c5b
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/util/TestRebalanceScheduler.java
@@ -0,0 +1,72 @@
+package org.apache.helix.util;
+
+import java.util.Arrays;
+import java.util.Collections;
+import org.apache.commons.math.stat.inference.TestUtils;
+import org.apache.helix.ConfigAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.ZkUnitTestBase;
+import org.apache.helix.common.ZkTestBase;
+import org.apache.helix.controller.rebalancer.util.RebalanceScheduler;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.MasterSlaveSMD;
+import org.apache.helix.model.ResourceConfig;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+public class TestRebalanceScheduler extends ZkTestBase {
+ private final String CLASS_NAME = getShortClassName();
+ private final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + CLASS_NAME;
+ private HelixManager _manager;
+ private ConfigAccessor _configAccessor;
+ private final int NUM_ATTEMPTS = 10;
+
+ @BeforeClass
+ public void beforeClass() throws Exception {
+ _gSetupTool.addCluster(CLUSTER_NAME, true);
+ _manager = HelixManagerFactory
+ .getZKHelixManager(CLUSTER_NAME, "Test", InstanceType.ADMINISTRATOR,
ZK_ADDR);
+ _manager.connect();
+ _configAccessor = new ConfigAccessor(_gZkClient);
+ }
+
+ @Test
+ public void testInvokeRebalanceAndInvokeRebalanceForResource() {
+ String resourceName = "ResourceToInvoke";
+ _gSetupTool.getClusterManagementTool()
+ .addResource(CLUSTER_NAME, resourceName, 5, MasterSlaveSMD.name);
+ IdealState idealState =
+
_gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME,
resourceName);
+
+ // Add listfields for ResourceConfig
+ ResourceConfig resourceConfig = new ResourceConfig(resourceName);
+ resourceConfig.setPreferenceLists(Collections.singletonMap("0",
Arrays.asList("1", "2", "3")));
+ _configAccessor.setResourceConfig(CLUSTER_NAME, resourceName,
resourceConfig);
+
+ int i = 0;
+ while (i++ < NUM_ATTEMPTS) {
+ RebalanceScheduler.invokeRebalance(_manager.getHelixDataAccessor(),
resourceName);
+ RebalanceScheduler
+ .invokeRebalanceForResourceConfig(_manager.getHelixDataAccessor(),
resourceName);
+ }
+
+ IdealState newIdealState =
+
_gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME,
resourceName);
+ ResourceConfig newResourceConfig =
+ _configAccessor.getResourceConfig(CLUSTER_NAME, resourceName);
+
+ // Starting version should be 0 and finally the version should be same as
NUM_ATTEMPTS
+
Assert.assertTrue(idealState.getRecord().equals(newIdealState.getRecord()));
+ Assert.assertEquals(idealState.getStat().getVersion(), 0);
+ Assert.assertEquals(newIdealState.getStat().getVersion(), NUM_ATTEMPTS);
+
+
Assert.assertTrue(resourceConfig.getRecord().equals(newResourceConfig.getRecord()));
+ Assert.assertEquals(
+ resourceConfig.getStat().getVersion(), 0);
+ Assert.assertEquals(newResourceConfig.getStat().getVersion(),
NUM_ATTEMPTS);
+
+ }
+}