Fix a bug in caching bestpossible states in ClusterDataCache.

Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/a83e8d65
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/a83e8d65
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/a83e8d65

Branch: refs/heads/master
Commit: a83e8d65f037691847ea2925bb75edb3f915ce8c
Parents: de38a7d
Author: Lei Xia <l...@linkedin.com>
Authored: Mon Oct 2 10:38:48 2017 -0700
Committer: Junkai Xue <j...@linkedin.com>
Committed: Mon Nov 6 17:08:56 2017 -0800

----------------------------------------------------------------------
 .../controller/stages/ClusterDataCache.java     | 10 ++++-----
 .../controller/stages/TaskAssignmentStage.java  |  2 +-
 .../ZkHelixClusterVerifier.java                 | 12 ++++++++---
 .../apache/helix/integration/TestDisable.java   | 22 ++++++++------------
 .../common/ZkIntegrationTestBase.java           |  8 +++++++
 5 files changed, 31 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/a83e8d65/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
index 5048325..8ce614c 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
@@ -137,6 +137,7 @@ public class ClusterDataCache {
     if (_propertyDataChangedMap.get(ChangeType.IDEAL_STATE)) {
       long start = System.currentTimeMillis();
       _propertyDataChangedMap.put(ChangeType.IDEAL_STATE, Boolean.valueOf(false));
+      clearCachedResourceAssignments();
       _idealStateCacheMap = accessor.getChildValuesMap(keyBuilder.idealStates());
       if (LOG.isDebugEnabled()) {
         LOG.debug("Reload IdealStates: " + _idealStateCacheMap.keySet() + ". Takes " + (
@@ -146,6 +147,7 @@ public class ClusterDataCache {
 
     if (_propertyDataChangedMap.get(ChangeType.LIVE_INSTANCE)) {
       _propertyDataChangedMap.put(ChangeType.LIVE_INSTANCE, Boolean.valueOf(false));
+      clearCachedResourceAssignments();
       _liveInstanceCacheMap = accessor.getChildValuesMap(keyBuilder.liveInstances());
       _updateInstanceOfflineTime = true;
       LOG.debug("Reload LiveInstances: " + _liveInstanceCacheMap.keySet());
@@ -153,12 +155,14 @@ public class ClusterDataCache {
 
     if (_propertyDataChangedMap.get(ChangeType.INSTANCE_CONFIG)) {
       _propertyDataChangedMap.put(ChangeType.INSTANCE_CONFIG, Boolean.valueOf(false));
+      clearCachedResourceAssignments();
       _instanceConfigCacheMap = accessor.getChildValuesMap(keyBuilder.instanceConfigs());
       LOG.debug("Reload InstanceConfig: " + _instanceConfigCacheMap.keySet());
     }
 
     if (_propertyDataChangedMap.get(ChangeType.RESOURCE_CONFIG)) {
       _propertyDataChangedMap.put(ChangeType.RESOURCE_CONFIG, Boolean.valueOf(false));
+      clearCachedResourceAssignments();
       _resourceConfigCacheMap = accessor.getChildValuesMap(accessor.keyBuilder().resourceConfigs());
       LOG.debug("Reload ResourceConfigs: " + _resourceConfigCacheMap.size());
     }
@@ -623,11 +627,6 @@ public class ClusterDataCache {
    */
   public void notifyDataChange(ChangeType changeType) {
     _propertyDataChangedMap.put(changeType, Boolean.valueOf(true));
-
-    if (changeType.equals(ChangeType.IDEAL_STATE) || changeType.equals(ChangeType.INSTANCE_CONFIG)
-        || changeType.equals(ChangeType.LIVE_INSTANCE)) {
-      clearCachedResourceAssignments();
-    }
   }
 
   /**
@@ -867,7 +866,6 @@ public class ClusterDataCache {
     for(ChangeType type : ChangeType.values()) {
       _propertyDataChangedMap.put(type, Boolean.valueOf(true));
     }
-    clearCachedResourceAssignments();
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/helix/blob/a83e8d65/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java
index 39a6e76..7cc08df 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java
@@ -154,6 +154,6 @@ public class TaskAssignmentStage extends AbstractBaseStage {
       keys.add(keyBuilder.message(message.getTgtName(), message.getId()));
     }
 
-    dataAccessor.createChildren(keys, new ArrayList<Message>(messages));
+    dataAccessor.createChildren(keys, new ArrayList<>(messages));
   }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/a83e8d65/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/ZkHelixClusterVerifier.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/ZkHelixClusterVerifier.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/ZkHelixClusterVerifier.java
index 472157f..e1e6c0c 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/ZkHelixClusterVerifier.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/ZkHelixClusterVerifier.java
@@ -258,13 +258,17 @@ public abstract class ZkHelixClusterVerifier
 
   @Override
   public void handleDataChange(String dataPath, Object data) throws Exception {
-    _verifyTaskThreadPool.submit(new VerifyStateCallbackTask());
+    if (!_verifyTaskThreadPool.isShutdown()) {
+      _verifyTaskThreadPool.submit(new VerifyStateCallbackTask());
+    }
   }
 
   @Override
   public void handleDataDeleted(String dataPath) throws Exception {
     _zkClient.unsubscribeDataChanges(dataPath, this);
-    _verifyTaskThreadPool.submit(new VerifyStateCallbackTask());
+    if (!_verifyTaskThreadPool.isShutdown()) {
+      _verifyTaskThreadPool.submit(new VerifyStateCallbackTask());
+    }
   }
 
   @Override
@@ -273,7 +277,9 @@ public abstract class ZkHelixClusterVerifier
       String childPath = String.format("%s/%s", parentPath, child);
       _zkClient.subscribeDataChanges(childPath, this);
     }
-    _verifyTaskThreadPool.submit(new VerifyStateCallbackTask());
+    if (!_verifyTaskThreadPool.isShutdown()) {
+      _verifyTaskThreadPool.submit(new VerifyStateCallbackTask());
+    }
   }
 
   public ZkClient getZkClient() {

http://git-wip-us.apache.org/repos/asf/helix/blob/a83e8d65/helix-core/src/test/java/org/apache/helix/integration/TestDisable.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestDisable.java b/helix-core/src/test/java/org/apache/helix/integration/TestDisable.java
index 657dbc9..d512e9e 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestDisable.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestDisable.java
@@ -37,6 +37,8 @@ import org.apache.helix.model.IdealState.RebalanceMode;
 import org.apache.helix.tools.ClusterSetup;
 import org.apache.helix.tools.ClusterStateVerifier;
 import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
+import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
+import org.apache.helix.tools.ClusterVerifiers.HelixClusterVerifier;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
@@ -85,36 +87,30 @@ public class TestDisable extends ZkIntegrationTestBase {
       participants[i].syncStart();
     }
 
-    boolean result =
-        ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
-            clusterName));
-    Assert.assertTrue(result);
+    HelixClusterVerifier _clusterVerifier =
+        new BestPossibleExternalViewVerifier.Builder(clusterName).setZkAddr(ZK_ADDR).build();
+    Assert.assertTrue(_clusterVerifier.verify());
+
 
     // disable localhost_12918
     String command =
         "--zkSvr " + ZK_ADDR + " --enableInstance " + clusterName + " " + disableNode + " false";
     ClusterSetup.processCommandLineArgs(command.split("\\s+"));
-    result =
-        ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
-            clusterName));
-    Assert.assertTrue(result);
+    Assert.assertTrue(_clusterVerifier.verify());
 
     // make sure localhost_12918 is in OFFLINE state
     Map<String, Map<String, String>> expectStateMap = new HashMap<String, Map<String, String>>();
     Map<String, String> expectInstanceStateMap = new HashMap<String, String>();
     expectInstanceStateMap.put(disableNode, "OFFLINE");
     expectStateMap.put(".*", expectInstanceStateMap);
-    result = ZkTestHelper.verifyState(_gZkClient, clusterName, "TestDB0", expectStateMap, "==");
+    boolean result = ZkTestHelper.verifyState(_gZkClient, clusterName, "TestDB0", expectStateMap, "==");
     Assert.assertTrue(result, disableNode + " should be in OFFLINE");
 
     // re-enable localhost_12918
     command =
         "--zkSvr " + ZK_ADDR + " --enableInstance " + clusterName + " " + disableNode + " true";
     ClusterSetup.processCommandLineArgs(command.split("\\s+"));
-    result =
-        ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
-            clusterName));
-    Assert.assertTrue(result);
+    Assert.assertTrue(_clusterVerifier.verify());
 
     // make sure localhost_12918 is NOT in OFFLINE state
     result = ZkTestHelper.verifyState(_gZkClient, clusterName, "TestDB0", expectStateMap, "!=");

http://git-wip-us.apache.org/repos/asf/helix/blob/a83e8d65/helix-core/src/test/java/org/apache/helix/integration/common/ZkIntegrationTestBase.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/common/ZkIntegrationTestBase.java b/helix-core/src/test/java/org/apache/helix/integration/common/ZkIntegrationTestBase.java
index ac48001..4920471 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/common/ZkIntegrationTestBase.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/common/ZkIntegrationTestBase.java
@@ -123,6 +123,14 @@ public class ZkIntegrationTestBase {
     configAccessor.setClusterConfig(clusterName, clusterConfig);
   }
 
+  protected void enablePersistIntermediateAssignment(ZkClient zkClient, String clusterName,
+      Boolean enabled) {
+    ConfigAccessor configAccessor = new ConfigAccessor(zkClient);
+    ClusterConfig clusterConfig = configAccessor.getClusterConfig(clusterName);
+    clusterConfig.setPersistIntermediateAssignment(enabled);
+    configAccessor.setClusterConfig(clusterName, clusterConfig);
+  }
+
   protected void enableTopologyAwareRebalance(ZkClient zkClient, String clusterName,
       Boolean enabled) {
     ConfigAccessor configAccessor = new ConfigAccessor(zkClient);

Reply via email to