Repository: helix Updated Branches: refs/heads/master 74145e8ad -> 1ad490ec7
Selective update for Resource (including Workflow and Job) Config reads. For task framework 2.0, we need to minimize the impact of data loading from the cache. One of the targets is to reduce the read/write latency of the task framework. In this change, read operations are minimized for resource configs. For workflow and job context reads, it will be a write-only operation, and those changes are in another commit. Project: http://git-wip-us.apache.org/repos/asf/helix/repo Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/fc868b34 Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/fc868b34 Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/fc868b34 Branch: refs/heads/master Commit: fc868b34d60eaba82ad3f9e60004e8cb5db1c9f3 Parents: 74145e8 Author: Junkai Xue <[email protected]> Authored: Mon Jul 30 10:14:33 2018 -0700 Committer: Lei Xia <[email protected]> Committed: Mon Sep 17 15:29:52 2018 -0700 ---------------------------------------------------------------------- .../controller/stages/ClusterDataCache.java | 48 ++++++++++++++++++-- .../TestClusterDataCacheSelectiveUpdate.java | 18 ++++++++ .../apache/helix/tools/TestHelixAdminCli.java | 18 ++++---- 3 files changed, 71 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/helix/blob/fc868b34/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java index 577b2c7..f960603 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java @@ -19,11 +19,13 @@ package org.apache.helix.controller.stages; * under the License. 
*/ +import com.google.common.collect.Maps; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Set; @@ -33,6 +35,7 @@ import org.apache.helix.HelixDataAccessor; import org.apache.helix.PropertyKey; import org.apache.helix.PropertyKey.Builder; import org.apache.helix.ZNRecord; +import org.apache.helix.common.caches.AbstractDataCache; import org.apache.helix.common.caches.CurrentStateCache; import org.apache.helix.common.caches.IdealStateCache; import org.apache.helix.common.caches.InstanceMessagesCache; @@ -68,7 +71,7 @@ import static org.apache.helix.HelixConstants.ChangeType; * Reads the data from the cluster using data accessor. This output ClusterData which * provides useful methods to search/lookup properties */ -public class ClusterDataCache { +public class ClusterDataCache extends AbstractDataCache { private static final Logger LOG = LoggerFactory.getLogger(ClusterDataCache.class.getName()); private ClusterConfig _clusterConfig; @@ -78,7 +81,7 @@ public class ClusterDataCache { private Map<String, InstanceConfig> _instanceConfigMap; private Map<String, InstanceConfig> _instanceConfigCacheMap; private Map<String, Long> _instanceOfflineTimeMap; - private Map<String, ResourceConfig> _resourceConfigMap; + private Map<String, ResourceConfig> _resourceConfigMap = new HashMap<>(); private Map<String, ResourceConfig> _resourceConfigCacheMap; private Map<String, ClusterConstraints> _constraintMap; private Map<String, Map<String, String>> _idealStateRuleMap; @@ -182,8 +185,8 @@ public class ClusterDataCache { if (_propertyDataChangedMap.get(ChangeType.RESOURCE_CONFIG)) { _propertyDataChangedMap.put(ChangeType.RESOURCE_CONFIG, false); clearCachedResourceAssignments(); - _resourceConfigCacheMap = - accessor.getChildValuesMap(accessor.keyBuilder().resourceConfigs(), true); + + 
_resourceConfigCacheMap = refreshResourceConfigs(accessor); LogUtil.logInfo(LOG, _eventId, "Reload ResourceConfigs: " + _resourceConfigCacheMap.keySet() + " for " + (_isTaskCache ? "TASK" : "DEFAULT") + "pipeline"); } @@ -942,6 +945,43 @@ public class ClusterDataCache { return change; } + private Map<String, ResourceConfig> refreshResourceConfigs(HelixDataAccessor accessor) { + Map<String, ResourceConfig> refreshedResourceConfigs = Maps.newHashMap(); + + long startTime = System.currentTimeMillis(); + PropertyKey.Builder keyBuilder = accessor.keyBuilder(); + Set<PropertyKey> currentResourceConfigKeys = new HashSet<>(); + for (String resourceConfig : accessor.getChildNames(keyBuilder.resourceConfigs())) { + currentResourceConfigKeys.add(keyBuilder.resourceConfig(resourceConfig)); + } + + Set<PropertyKey> cachedKeys = new HashSet<>(); + Map<PropertyKey, ResourceConfig> cachedResourceConfigMap = Maps.newHashMap(); + + for (String resourceConfig : _resourceConfigMap.keySet()) { + cachedKeys.add(keyBuilder.resourceConfig(resourceConfig)); + cachedResourceConfigMap + .put(keyBuilder.resourceConfig(resourceConfig), _resourceConfigMap.get(resourceConfig)); + } + cachedKeys.retainAll(currentResourceConfigKeys); + + Set<PropertyKey> reloadKeys = new HashSet<>(currentResourceConfigKeys); + reloadKeys.removeAll(cachedKeys); + + Map<PropertyKey, ResourceConfig> updatedMap = + refreshProperties(accessor, new LinkedList<>(reloadKeys), new ArrayList<>(cachedKeys), + cachedResourceConfigMap); + for (ResourceConfig resourceConfig : updatedMap.values()) { + refreshedResourceConfigs.put(resourceConfig.getResourceName(), resourceConfig); + } + + long endTime = System.currentTimeMillis(); + LogUtil.logInfo(LOG, getEventId(), + "Refresh " + refreshedResourceConfigs.size() + " resource configs for cluster " + + _clusterName + ", took " + (endTime - startTime) + " ms"); + return refreshedResourceConfigs; + } + /** * toString method to print the entire cluster state */ 
http://git-wip-us.apache.org/repos/asf/helix/blob/fc868b34/helix-core/src/test/java/org/apache/helix/integration/controller/TestClusterDataCacheSelectiveUpdate.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/controller/TestClusterDataCacheSelectiveUpdate.java b/helix-core/src/test/java/org/apache/helix/integration/controller/TestClusterDataCacheSelectiveUpdate.java index a749265..11044b7 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/controller/TestClusterDataCacheSelectiveUpdate.java +++ b/helix-core/src/test/java/org/apache/helix/integration/controller/TestClusterDataCacheSelectiveUpdate.java @@ -25,8 +25,12 @@ import org.apache.helix.TestHelper; import org.apache.helix.ZNRecord; import org.apache.helix.controller.stages.ClusterDataCache; import org.apache.helix.integration.common.ZkStandAloneCMTestBase; +import org.apache.helix.integration.task.WorkflowGenerator; import org.apache.helix.manager.zk.ZkBaseDataAccessor; import org.apache.helix.mock.MockZkHelixDataAccessor; +import org.apache.helix.task.JobConfig; +import org.apache.helix.task.TaskDriver; +import org.apache.helix.task.Workflow; import org.testng.Assert; import org.testng.annotations.Test; @@ -124,5 +128,19 @@ public class TestClusterDataCacheSelectiveUpdate extends ZkStandAloneCMTestBase cache.notifyDataChange(HelixConstants.ChangeType.IDEAL_STATE); cache.refresh(accessor); Assert.assertEquals(accessor.getReadCount(PropertyType.IDEALSTATES), 2); + + // Test WorkflowConfig/JobConfigs + TaskDriver driver = new TaskDriver(_manager); + Workflow.Builder workflow = WorkflowGenerator.generateSingleJobWorkflowBuilder("Job", + new JobConfig.Builder().setCommand("ReIndex").setTargetResource("TestDB_2")); + driver.start(workflow.build()); + + Thread.sleep(100); + accessor.clearReadCounters(); + + cache.notifyDataChange(HelixConstants.ChangeType.RESOURCE_CONFIG); + cache.refresh(accessor); + // 1 
Cluster Config change + 2 Resource Config Changes + Assert.assertEquals(accessor.getReadCount(PropertyType.CONFIGS), 3); } } http://git-wip-us.apache.org/repos/asf/helix/blob/fc868b34/helix-core/src/test/java/org/apache/helix/tools/TestHelixAdminCli.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/tools/TestHelixAdminCli.java b/helix-core/src/test/java/org/apache/helix/tools/TestHelixAdminCli.java index fdab158..27d6dc5 100644 --- a/helix-core/src/test/java/org/apache/helix/tools/TestHelixAdminCli.java +++ b/helix-core/src/test/java/org/apache/helix/tools/TestHelixAdminCli.java @@ -105,7 +105,7 @@ public class TestHelixAdminCli extends ZkTestBase { Assert.assertFalse(_gZkClient.exists("/clusterTest1")); } - @Test + @Test (dependsOnMethods = "testAddCluster") public void testAddResource() throws Exception { String command = "-zkSvr localhost:2183 -addCluster " + clusterName; ClusterSetup.processCommandLineArgs(command.split("\\s+")); @@ -130,7 +130,7 @@ public class TestHelixAdminCli extends ZkTestBase { ClusterSetup.processCommandLineArgs(command.split("\\s+")); } - @Test + @Test (dependsOnMethods = "testAddResource") public void testAddInstance() throws Exception { String command = "-zkSvr localhost:2183 -addCluster " + clusterName; ClusterSetup.processCommandLineArgs(command.split("\\s+")); @@ -180,7 +180,7 @@ public class TestHelixAdminCli extends ZkTestBase { } } - @Test + @Test (dependsOnMethods = "testAddInstance") public void testRebalanceResource() throws Exception { String command = "-zkSvr localhost:2183 -addCluster " + clusterName; ClusterSetup.processCommandLineArgs(command.split("\\s+")); @@ -211,7 +211,7 @@ public class TestHelixAdminCli extends ZkTestBase { ClusterSetup.processCommandLineArgs(command.split("\\s+")); } - @Test + @Test (dependsOnMethods = "testRebalanceResource") public void testStartCluster() throws Exception { final int n = 6; @@ -294,7 +294,7 @@ public 
class TestHelixAdminCli extends ZkTestBase { } } - @Test + @Test (dependsOnMethods = "testStartCluster") public void testDropAddResource() throws Exception { final int n = 6; @@ -398,7 +398,7 @@ public class TestHelixAdminCli extends ZkTestBase { Thread.sleep(100); } - @Test + @Test (dependsOnMethods = "testDropAddResource") public void testInstanceOperations() throws Exception { final int n = 6; @@ -469,7 +469,7 @@ public class TestHelixAdminCli extends ZkTestBase { } } - @Test + @Test (dependsOnMethods = "testInstanceOperations") public void testExpandCluster() throws Exception { final int n = 6; @@ -520,7 +520,7 @@ public class TestHelixAdminCli extends ZkTestBase { } } - @Test + @Test (dependsOnMethods = "testExpandCluster") public void testDeactivateCluster() throws Exception { final int n = 6; @@ -577,7 +577,7 @@ public class TestHelixAdminCli extends ZkTestBase { ClusterSetup.processCommandLineArgs(command.split("\\s+")); } - @Test + @Test (dependsOnMethods = "testDeactivateCluster") public void testInstanceGroupTags() throws Exception { BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_gZkClient); HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, baseAccessor);
