This is an automated email from the ASF dual-hosted git repository.

jxue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/master by this push:
     new adb2e1d  Fix invoke rebalance by "touching" IdealState/ResourceConfig
adb2e1d is described below

commit adb2e1dece415000f14189684f84601ad5a309d6
Author: Junkai Xue <[email protected]>
AuthorDate: Mon Jul 15 18:32:57 2019 -0700

    Fix invoke rebalance by "touching" IdealState/ResourceConfig
    
    The current HelixDataAccessor updateProperty uses ZNRecordUpdater, whose merge
    logic simply appends every list element when merging a ZNRecord. That can
    duplicate values in listFields (e.g. [1, 2, 3] merged with an identical record
    becomes [1, 2, 3, 1, 2, 3]).
    This affects invokeRebalance and invokeRebalanceForResourceConfig. The fix is
    to pass a customized no-op DataUpdater instead.
    
    In this commit:
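    1. Fix invoke rebalance with a customized (no-op) updater.
    2. Add comments for ZNRecord merge.
    3. Add checks in TaskUtil to only trigger the Workflow Config "touch" when
    purging jobs. (Note: TaskUtil does not appear in the file list below.)
    4. Add a test for RebalanceScheduler.
    5. Make TestZeroReplicaAvoidance verify by polling and order the
    TestResourceThreadpoolSize tests via dependsOnMethods.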
---
 .../src/main/java/org/apache/helix/ZNRecord.java   |  3 +
 .../rebalancer/util/RebalanceScheduler.java        | 10 ++-
 .../rebalancer/TestZeroReplicaAvoidance.java       |  2 +-
 .../handling/TestResourceThreadpoolSize.java       |  6 +-
 .../apache/helix/util/TestRebalanceScheduler.java  | 72 ++++++++++++++++++++++
 5 files changed, 87 insertions(+), 6 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/ZNRecord.java b/helix-core/src/main/java/org/apache/helix/ZNRecord.java
index 9dc1d8d..bd9acbb 100644
--- a/helix-core/src/main/java/org/apache/helix/ZNRecord.java
+++ b/helix-core/src/main/java/org/apache/helix/ZNRecord.java
@@ -506,6 +506,9 @@ public class ZNRecord {
       }
     }
     for (String key : record.listFields.keySet()) {
+      // Default merge logic could introduce duplicated values. For example, old Record has list field
+      // with value [1, 2, 3]. New Record is exactly same as previous one. Merged result will be
+      // [1, 2, 3, 1, 2, 3].
       List<String> list = listFields.get(key);
       if (list != null) {
         list.addAll(record.listFields.get(key));
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/RebalanceScheduler.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/RebalanceScheduler.java
index ef2dc8d..e26de5b 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/RebalanceScheduler.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/RebalanceScheduler.java
@@ -1,8 +1,10 @@
 package org.apache.helix.controller.rebalancer.util;
 
+import org.I0Itec.zkclient.DataUpdater;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixManager;
 import org.apache.helix.PropertyKey;
+import org.apache.helix.ZNRecord;
 import org.apache.helix.model.IdealState;
 
 import org.apache.helix.model.ResourceConfig;
@@ -138,7 +140,9 @@ public class RebalanceScheduler {
     PropertyKey key = accessor.keyBuilder().idealStates(resource);
     IdealState is = accessor.getProperty(key);
     if (is != null) {
-      if (!accessor.updateProperty(key, is)) {
+      // Here it uses the updateProperty function with no-op DataUpdater. Otherwise, it will use default
+      // ZNRecordUpdater which will duplicate elements for listFields.
+      if (!accessor.updateProperty(key, znRecord -> znRecord, is)) {
         LOG.warn("Failed to invoke rebalance on resource {}", resource);
       }
     } else {
@@ -156,7 +160,9 @@ public class RebalanceScheduler {
     PropertyKey key = accessor.keyBuilder().resourceConfig(resource);
     ResourceConfig cfg = accessor.getProperty(key);
     if (cfg != null) {
-      if (!accessor.updateProperty(key, cfg)) {
+      // Here it uses the updateProperty function with no-op DataUpdater. Otherwise, it will use default
+      // ZNRecordUpdater which will duplicate elements for listFields.
+      if (!accessor.updateProperty(key, znRecord -> znRecord, cfg)) {
         LOG.warn("Failed to invoke rebalance on resource config {}", resource);
       }
     } else {
diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestZeroReplicaAvoidance.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestZeroReplicaAvoidance.java
index 9ee4738..ab4a263 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestZeroReplicaAvoidance.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestZeroReplicaAvoidance.java
@@ -123,7 +123,7 @@ public class TestZeroReplicaAvoidance extends ZkTestBase
       createResourceWithDelayedRebalance(CLUSTER_NAME, db, stateModel, partition, replica, replica,
           0);
     }
-    Assert.assertTrue(_clusterVerifier.verify(50000L));
+    Assert.assertTrue(_clusterVerifier.verifyByPolling(50000L, 100L));
 
     _startListen = true;
     DelayedTransition.setDelay(5);
diff --git a/helix-core/src/test/java/org/apache/helix/messaging/handling/TestResourceThreadpoolSize.java b/helix-core/src/test/java/org/apache/helix/messaging/handling/TestResourceThreadpoolSize.java
index 46d67bb..9eeb6f5 100644
--- a/helix-core/src/test/java/org/apache/helix/messaging/handling/TestResourceThreadpoolSize.java
+++ b/helix-core/src/test/java/org/apache/helix/messaging/handling/TestResourceThreadpoolSize.java
@@ -81,7 +81,7 @@ public class TestResourceThreadpoolSize extends ZkStandAloneCMTestBase {
     Assert.assertTrue(taskcount >= numPartition * (numReplica + 1));
   }
 
-  @Test
+  @Test (dependsOnMethods = "TestThreadPoolSizeConfig")
   public void TestCustomizedResourceThreadPool() {
     int customizedPoolSize = 7;
     int configuredPoolSize = 9;
@@ -134,7 +134,7 @@ public class TestResourceThreadpoolSize extends ZkStandAloneCMTestBase {
     }
   }
 
-  @Test
+  @Test (dependsOnMethods = "TestCustomizedResourceThreadPool")
   public void TestPerStateTransitionTypeThreadPool() throws InterruptedException {
     String MASTER_SLAVE = "MasterSlave";
 
@@ -173,7 +173,7 @@ public class TestResourceThreadpoolSize extends ZkStandAloneCMTestBase {
     }
   }
 
-  @Test
+  @Test (dependsOnMethods = "TestPerStateTransitionTypeThreadPool")
   public void testBatchMessageThreadPoolSize() throws InterruptedException {
     int customizedPoolSize = 5;
     _participants[0].getStateMachineEngine().registerStateModelFactory("OnlineOffline",
diff --git a/helix-core/src/test/java/org/apache/helix/util/TestRebalanceScheduler.java b/helix-core/src/test/java/org/apache/helix/util/TestRebalanceScheduler.java
new file mode 100644
index 0000000..2c73c5b
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/util/TestRebalanceScheduler.java
@@ -0,0 +1,72 @@
+package org.apache.helix.util;
+
+import java.util.Arrays;
+import java.util.Collections;
+import org.apache.commons.math.stat.inference.TestUtils;
+import org.apache.helix.ConfigAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.ZkUnitTestBase;
+import org.apache.helix.common.ZkTestBase;
+import org.apache.helix.controller.rebalancer.util.RebalanceScheduler;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.MasterSlaveSMD;
+import org.apache.helix.model.ResourceConfig;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+public class TestRebalanceScheduler extends ZkTestBase {
+  private final String CLASS_NAME = getShortClassName();
+  private final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + CLASS_NAME;
+  private HelixManager _manager;
+  private ConfigAccessor _configAccessor;
+  private final int NUM_ATTEMPTS = 10;
+
+  @BeforeClass
+  public void beforeClass() throws Exception {
+    _gSetupTool.addCluster(CLUSTER_NAME, true);
+    _manager = HelixManagerFactory
+        .getZKHelixManager(CLUSTER_NAME, "Test", InstanceType.ADMINISTRATOR, ZK_ADDR);
+    _manager.connect();
+    _configAccessor = new ConfigAccessor(_gZkClient);
+  }
+  // NOTE(review): no @AfterClass; _manager is never disconnected and the cluster is never dropped.
+  @Test
+  public void testInvokeRebalanceAndInvokeRebalanceForResource() {
+    String resourceName = "ResourceToInvoke";
+    _gSetupTool.getClusterManagementTool()
+        .addResource(CLUSTER_NAME, resourceName, 5, MasterSlaveSMD.name);
+    IdealState idealState =
+        _gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, resourceName);
+
+    // Add list fields to the ResourceConfig; a merging updater would duplicate them on every touch.
+    ResourceConfig resourceConfig = new ResourceConfig(resourceName);
+    resourceConfig.setPreferenceLists(Collections.singletonMap("0", Arrays.asList("1", "2", "3")));
+    _configAccessor.setResourceConfig(CLUSTER_NAME, resourceName, resourceConfig);
+
+    int i = 0;
+    while (i++ < NUM_ATTEMPTS) {
+      RebalanceScheduler.invokeRebalance(_manager.getHelixDataAccessor(), resourceName);
+      RebalanceScheduler
+          .invokeRebalanceForResourceConfig(_manager.getHelixDataAccessor(), resourceName);
+    }
+
+    IdealState newIdealState =
+        _gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, resourceName);
+    ResourceConfig newResourceConfig =
+        _configAccessor.getResourceConfig(CLUSTER_NAME, resourceName);
+    // Each touch should rewrite the node unchanged: records stay equal, and the version should
+    // start at 0 and end at NUM_ATTEMPTS.
+    Assert.assertTrue(idealState.getRecord().equals(newIdealState.getRecord()));
+    Assert.assertEquals(idealState.getStat().getVersion(), 0);
+    Assert.assertEquals(newIdealState.getStat().getVersion(), NUM_ATTEMPTS);
+    // NOTE(review): resourceConfig is the local object, not re-read from ZK; its stat may not reflect ZK.
+    Assert.assertTrue(resourceConfig.getRecord().equals(newResourceConfig.getRecord()));
+    Assert.assertEquals(
+        resourceConfig.getStat().getVersion(), 0);
+    Assert.assertEquals(newResourceConfig.getStat().getVersion(), NUM_ATTEMPTS);
+
+  }
+}

Reply via email to