Fix a bug in caching bestpossible states in ClusterDataCache.
Project: http://git-wip-us.apache.org/repos/asf/helix/repo Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/a83e8d65 Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/a83e8d65 Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/a83e8d65 Branch: refs/heads/master Commit: a83e8d65f037691847ea2925bb75edb3f915ce8c Parents: de38a7d Author: Lei Xia <l...@linkedin.com> Authored: Mon Oct 2 10:38:48 2017 -0700 Committer: Junkai Xue <j...@linkedin.com> Committed: Mon Nov 6 17:08:56 2017 -0800 ---------------------------------------------------------------------- .../controller/stages/ClusterDataCache.java | 10 ++++----- .../controller/stages/TaskAssignmentStage.java | 2 +- .../ZkHelixClusterVerifier.java | 12 ++++++++--- .../apache/helix/integration/TestDisable.java | 22 ++++++++------------ .../common/ZkIntegrationTestBase.java | 8 +++++++ 5 files changed, 31 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/helix/blob/a83e8d65/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java index 5048325..8ce614c 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java @@ -137,6 +137,7 @@ public class ClusterDataCache { if (_propertyDataChangedMap.get(ChangeType.IDEAL_STATE)) { long start = System.currentTimeMillis(); _propertyDataChangedMap.put(ChangeType.IDEAL_STATE, Boolean.valueOf(false)); + clearCachedResourceAssignments(); _idealStateCacheMap = accessor.getChildValuesMap(keyBuilder.idealStates()); if (LOG.isDebugEnabled()) { LOG.debug("Reload IdealStates: " + _idealStateCacheMap.keySet() + ". Takes " + ( @@ -146,6 +147,7 @@ public class ClusterDataCache { if (_propertyDataChangedMap.get(ChangeType.LIVE_INSTANCE)) { _propertyDataChangedMap.put(ChangeType.LIVE_INSTANCE, Boolean.valueOf(false)); + clearCachedResourceAssignments(); _liveInstanceCacheMap = accessor.getChildValuesMap(keyBuilder.liveInstances()); _updateInstanceOfflineTime = true; LOG.debug("Reload LiveInstances: " + _liveInstanceCacheMap.keySet()); @@ -153,12 +155,14 @@ public class ClusterDataCache { if (_propertyDataChangedMap.get(ChangeType.INSTANCE_CONFIG)) { _propertyDataChangedMap.put(ChangeType.INSTANCE_CONFIG, Boolean.valueOf(false)); + clearCachedResourceAssignments(); _instanceConfigCacheMap = accessor.getChildValuesMap(keyBuilder.instanceConfigs()); LOG.debug("Reload InstanceConfig: " + _instanceConfigCacheMap.keySet()); } if (_propertyDataChangedMap.get(ChangeType.RESOURCE_CONFIG)) { _propertyDataChangedMap.put(ChangeType.RESOURCE_CONFIG, Boolean.valueOf(false)); + clearCachedResourceAssignments(); _resourceConfigCacheMap = accessor.getChildValuesMap(accessor.keyBuilder().resourceConfigs()); LOG.debug("Reload ResourceConfigs: " + _resourceConfigCacheMap.size()); } @@ -623,11 +627,6 @@ public class ClusterDataCache { */ public void notifyDataChange(ChangeType changeType) { _propertyDataChangedMap.put(changeType, Boolean.valueOf(true)); - - if (changeType.equals(ChangeType.IDEAL_STATE) || changeType.equals(ChangeType.INSTANCE_CONFIG) - || changeType.equals(ChangeType.LIVE_INSTANCE)) { - clearCachedResourceAssignments(); - } } /** @@ -867,7 +866,6 @@ public class ClusterDataCache { for(ChangeType type : ChangeType.values()) { _propertyDataChangedMap.put(type, Boolean.valueOf(true)); } - clearCachedResourceAssignments(); } /** http://git-wip-us.apache.org/repos/asf/helix/blob/a83e8d65/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java index 39a6e76..7cc08df 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java @@ -154,6 +154,6 @@ public class TaskAssignmentStage extends AbstractBaseStage { keys.add(keyBuilder.message(message.getTgtName(), message.getId())); } - dataAccessor.createChildren(keys, new ArrayList<Message>(messages)); + dataAccessor.createChildren(keys, new ArrayList<>(messages)); } } http://git-wip-us.apache.org/repos/asf/helix/blob/a83e8d65/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/ZkHelixClusterVerifier.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/ZkHelixClusterVerifier.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/ZkHelixClusterVerifier.java index 472157f..e1e6c0c 100644 --- a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/ZkHelixClusterVerifier.java +++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/ZkHelixClusterVerifier.java @@ -258,13 +258,17 @@ public abstract class ZkHelixClusterVerifier @Override public void handleDataChange(String dataPath, Object data) throws Exception { - _verifyTaskThreadPool.submit(new VerifyStateCallbackTask()); + if (!_verifyTaskThreadPool.isShutdown()) { + _verifyTaskThreadPool.submit(new VerifyStateCallbackTask()); + } } @Override public void handleDataDeleted(String dataPath) throws Exception { _zkClient.unsubscribeDataChanges(dataPath, this); - _verifyTaskThreadPool.submit(new VerifyStateCallbackTask()); + if (!_verifyTaskThreadPool.isShutdown()) { + _verifyTaskThreadPool.submit(new VerifyStateCallbackTask()); + } } @Override @@ -273,7 +277,9 @@ public abstract class ZkHelixClusterVerifier String childPath = String.format("%s/%s", parentPath, child); _zkClient.subscribeDataChanges(childPath, this); } - _verifyTaskThreadPool.submit(new VerifyStateCallbackTask()); + if (!_verifyTaskThreadPool.isShutdown()) { + _verifyTaskThreadPool.submit(new VerifyStateCallbackTask()); + } } public ZkClient getZkClient() { http://git-wip-us.apache.org/repos/asf/helix/blob/a83e8d65/helix-core/src/test/java/org/apache/helix/integration/TestDisable.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestDisable.java b/helix-core/src/test/java/org/apache/helix/integration/TestDisable.java index 657dbc9..d512e9e 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/TestDisable.java +++ b/helix-core/src/test/java/org/apache/helix/integration/TestDisable.java @@ -37,6 +37,8 @@ import org.apache.helix.model.IdealState.RebalanceMode; import org.apache.helix.tools.ClusterSetup; import org.apache.helix.tools.ClusterStateVerifier; import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier; +import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier; +import org.apache.helix.tools.ClusterVerifiers.HelixClusterVerifier; import org.testng.Assert; import org.testng.annotations.Test; @@ -85,36 +87,30 @@ public class TestDisable extends ZkIntegrationTestBase { participants[i].syncStart(); } - boolean result = - ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR, - clusterName)); - Assert.assertTrue(result); + HelixClusterVerifier _clusterVerifier = + new BestPossibleExternalViewVerifier.Builder(clusterName).setZkAddr(ZK_ADDR).build(); + Assert.assertTrue(_clusterVerifier.verify()); + // disable localhost_12918 String command = "--zkSvr " + ZK_ADDR + " --enableInstance " + clusterName + " " + disableNode + " false"; ClusterSetup.processCommandLineArgs(command.split("\\s+")); - result = - ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR, - clusterName)); - Assert.assertTrue(result); + Assert.assertTrue(_clusterVerifier.verify()); // make sure localhost_12918 is in OFFLINE state Map<String, Map<String, String>> expectStateMap = new HashMap<String, Map<String, String>>(); Map<String, String> expectInstanceStateMap = new HashMap<String, String>(); expectInstanceStateMap.put(disableNode, "OFFLINE"); expectStateMap.put(".*", expectInstanceStateMap); - result = ZkTestHelper.verifyState(_gZkClient, clusterName, "TestDB0", expectStateMap, "=="); + boolean result = ZkTestHelper.verifyState(_gZkClient, clusterName, "TestDB0", expectStateMap, "=="); Assert.assertTrue(result, disableNode + " should be in OFFLINE"); // re-enable localhost_12918 command = "--zkSvr " + ZK_ADDR + " --enableInstance " + clusterName + " " + disableNode + " true"; ClusterSetup.processCommandLineArgs(command.split("\\s+")); - result = - ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR, - clusterName)); - Assert.assertTrue(result); + Assert.assertTrue(_clusterVerifier.verify()); // make sure localhost_12918 is NOT in OFFLINE state result = ZkTestHelper.verifyState(_gZkClient, clusterName, "TestDB0", expectStateMap, "!="); http://git-wip-us.apache.org/repos/asf/helix/blob/a83e8d65/helix-core/src/test/java/org/apache/helix/integration/common/ZkIntegrationTestBase.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/common/ZkIntegrationTestBase.java b/helix-core/src/test/java/org/apache/helix/integration/common/ZkIntegrationTestBase.java index ac48001..4920471 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/common/ZkIntegrationTestBase.java +++ b/helix-core/src/test/java/org/apache/helix/integration/common/ZkIntegrationTestBase.java @@ -123,6 +123,14 @@ public class ZkIntegrationTestBase { configAccessor.setClusterConfig(clusterName, clusterConfig); } + protected void enablePersistIntermediateAssignment(ZkClient zkClient, String clusterName, + Boolean enabled) { + ConfigAccessor configAccessor = new ConfigAccessor(zkClient); + ClusterConfig clusterConfig = configAccessor.getClusterConfig(clusterName); + clusterConfig.setPersistIntermediateAssignment(enabled); + configAccessor.setClusterConfig(clusterName, clusterConfig); + } + protected void enableTopologyAwareRebalance(ZkClient zkClient, String clusterName, Boolean enabled) { ConfigAccessor configAccessor = new ConfigAccessor(zkClient);