Repository: helix
Updated Branches:
  refs/heads/master 74145e8ad -> 1ad490ec7


Selective update for Resource (including Workflow and Job) Config reads.

For task framework 2.0, we need to minimize the impact of data loading 
from cache. One of the targets is to reduce the read/write latency of the 
task framework. In this rb, read operations are minimized for resource 
configs. For workflow and job contexts, reads will be eliminated so that they 
become write-only operations; those changes are in another commit.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/fc868b34
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/fc868b34
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/fc868b34

Branch: refs/heads/master
Commit: fc868b34d60eaba82ad3f9e60004e8cb5db1c9f3
Parents: 74145e8
Author: Junkai Xue <[email protected]>
Authored: Mon Jul 30 10:14:33 2018 -0700
Committer: Lei Xia <[email protected]>
Committed: Mon Sep 17 15:29:52 2018 -0700

----------------------------------------------------------------------
 .../controller/stages/ClusterDataCache.java     | 48 ++++++++++++++++++--
 .../TestClusterDataCacheSelectiveUpdate.java    | 18 ++++++++
 .../apache/helix/tools/TestHelixAdminCli.java   | 18 ++++----
 3 files changed, 71 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/fc868b34/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
 
b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
index 577b2c7..f960603 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
@@ -19,11 +19,13 @@ package org.apache.helix.controller.stages;
  * under the License.
  */
 
+import com.google.common.collect.Maps;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -33,6 +35,7 @@ import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.PropertyKey.Builder;
 import org.apache.helix.ZNRecord;
+import org.apache.helix.common.caches.AbstractDataCache;
 import org.apache.helix.common.caches.CurrentStateCache;
 import org.apache.helix.common.caches.IdealStateCache;
 import org.apache.helix.common.caches.InstanceMessagesCache;
@@ -68,7 +71,7 @@ import static org.apache.helix.HelixConstants.ChangeType;
  * Reads the data from the cluster using data accessor. This output 
ClusterData which
  * provides useful methods to search/lookup properties
  */
-public class ClusterDataCache {
+public class ClusterDataCache extends AbstractDataCache {
   private static final Logger LOG = 
LoggerFactory.getLogger(ClusterDataCache.class.getName());
 
   private ClusterConfig _clusterConfig;
@@ -78,7 +81,7 @@ public class ClusterDataCache {
   private Map<String, InstanceConfig> _instanceConfigMap;
   private Map<String, InstanceConfig> _instanceConfigCacheMap;
   private Map<String, Long> _instanceOfflineTimeMap;
-  private Map<String, ResourceConfig> _resourceConfigMap;
+  private Map<String, ResourceConfig> _resourceConfigMap = new HashMap<>();
   private Map<String, ResourceConfig> _resourceConfigCacheMap;
   private Map<String, ClusterConstraints> _constraintMap;
   private Map<String, Map<String, String>> _idealStateRuleMap;
@@ -182,8 +185,8 @@ public class ClusterDataCache {
     if (_propertyDataChangedMap.get(ChangeType.RESOURCE_CONFIG)) {
       _propertyDataChangedMap.put(ChangeType.RESOURCE_CONFIG, false);
       clearCachedResourceAssignments();
-      _resourceConfigCacheMap =
-          accessor.getChildValuesMap(accessor.keyBuilder().resourceConfigs(), 
true);
+
+      _resourceConfigCacheMap = refreshResourceConfigs(accessor);
       LogUtil.logInfo(LOG, _eventId, "Reload ResourceConfigs: " + 
_resourceConfigCacheMap.keySet()
           + " for " + (_isTaskCache ? "TASK" : "DEFAULT") + "pipeline");
     }
@@ -942,6 +945,43 @@ public class ClusterDataCache {
     return change;
   }
 
+  private Map<String, ResourceConfig> refreshResourceConfigs(HelixDataAccessor 
accessor) {
+    Map<String, ResourceConfig> refreshedResourceConfigs = Maps.newHashMap();
+
+    long startTime = System.currentTimeMillis();
+    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+    Set<PropertyKey> currentResourceConfigKeys = new HashSet<>();
+    for (String resourceConfig : 
accessor.getChildNames(keyBuilder.resourceConfigs())) {
+      currentResourceConfigKeys.add(keyBuilder.resourceConfig(resourceConfig));
+    }
+
+    Set<PropertyKey> cachedKeys = new HashSet<>();
+    Map<PropertyKey, ResourceConfig> cachedResourceConfigMap = 
Maps.newHashMap();
+
+    for (String resourceConfig : _resourceConfigMap.keySet()) {
+      cachedKeys.add(keyBuilder.resourceConfig(resourceConfig));
+      cachedResourceConfigMap
+          .put(keyBuilder.resourceConfig(resourceConfig), 
_resourceConfigMap.get(resourceConfig));
+    }
+    cachedKeys.retainAll(currentResourceConfigKeys);
+
+    Set<PropertyKey> reloadKeys = new HashSet<>(currentResourceConfigKeys);
+    reloadKeys.removeAll(cachedKeys);
+
+    Map<PropertyKey, ResourceConfig> updatedMap =
+        refreshProperties(accessor, new LinkedList<>(reloadKeys), new 
ArrayList<>(cachedKeys),
+            cachedResourceConfigMap);
+    for (ResourceConfig resourceConfig : updatedMap.values()) {
+      refreshedResourceConfigs.put(resourceConfig.getResourceName(), 
resourceConfig);
+    }
+
+    long endTime = System.currentTimeMillis();
+    LogUtil.logInfo(LOG, getEventId(),
+        "Refresh " + refreshedResourceConfigs.size() + " resource configs for 
cluster "
+            + _clusterName + ", took " + (endTime - startTime) + " ms");
+    return refreshedResourceConfigs;
+  }
+
   /**
    * toString method to print the entire cluster state
    */

http://git-wip-us.apache.org/repos/asf/helix/blob/fc868b34/helix-core/src/test/java/org/apache/helix/integration/controller/TestClusterDataCacheSelectiveUpdate.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/test/java/org/apache/helix/integration/controller/TestClusterDataCacheSelectiveUpdate.java
 
b/helix-core/src/test/java/org/apache/helix/integration/controller/TestClusterDataCacheSelectiveUpdate.java
index a749265..11044b7 100644
--- 
a/helix-core/src/test/java/org/apache/helix/integration/controller/TestClusterDataCacheSelectiveUpdate.java
+++ 
b/helix-core/src/test/java/org/apache/helix/integration/controller/TestClusterDataCacheSelectiveUpdate.java
@@ -25,8 +25,12 @@ import org.apache.helix.TestHelper;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.controller.stages.ClusterDataCache;
 import org.apache.helix.integration.common.ZkStandAloneCMTestBase;
+import org.apache.helix.integration.task.WorkflowGenerator;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
 import org.apache.helix.mock.MockZkHelixDataAccessor;
+import org.apache.helix.task.JobConfig;
+import org.apache.helix.task.TaskDriver;
+import org.apache.helix.task.Workflow;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
@@ -124,5 +128,19 @@ public class TestClusterDataCacheSelectiveUpdate extends 
ZkStandAloneCMTestBase
     cache.notifyDataChange(HelixConstants.ChangeType.IDEAL_STATE);
     cache.refresh(accessor);
     Assert.assertEquals(accessor.getReadCount(PropertyType.IDEALSTATES), 2);
+
+    // Test WorkflowConfig/JobConfigs
+    TaskDriver driver = new TaskDriver(_manager);
+    Workflow.Builder workflow = 
WorkflowGenerator.generateSingleJobWorkflowBuilder("Job",
+        new 
JobConfig.Builder().setCommand("ReIndex").setTargetResource("TestDB_2"));
+    driver.start(workflow.build());
+
+    Thread.sleep(100);
+    accessor.clearReadCounters();
+
+    cache.notifyDataChange(HelixConstants.ChangeType.RESOURCE_CONFIG);
+    cache.refresh(accessor);
+    // 1 Cluster Config change + 2 Resource Config Changes
+    Assert.assertEquals(accessor.getReadCount(PropertyType.CONFIGS), 3);
   }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/fc868b34/helix-core/src/test/java/org/apache/helix/tools/TestHelixAdminCli.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/test/java/org/apache/helix/tools/TestHelixAdminCli.java 
b/helix-core/src/test/java/org/apache/helix/tools/TestHelixAdminCli.java
index fdab158..27d6dc5 100644
--- a/helix-core/src/test/java/org/apache/helix/tools/TestHelixAdminCli.java
+++ b/helix-core/src/test/java/org/apache/helix/tools/TestHelixAdminCli.java
@@ -105,7 +105,7 @@ public class TestHelixAdminCli extends ZkTestBase {
     Assert.assertFalse(_gZkClient.exists("/clusterTest1"));
   }
 
-  @Test
+  @Test (dependsOnMethods = "testAddCluster")
   public void testAddResource() throws Exception {
     String command = "-zkSvr localhost:2183 -addCluster " + clusterName;
     ClusterSetup.processCommandLineArgs(command.split("\\s+"));
@@ -130,7 +130,7 @@ public class TestHelixAdminCli extends ZkTestBase {
     ClusterSetup.processCommandLineArgs(command.split("\\s+"));
   }
 
-  @Test
+  @Test (dependsOnMethods = "testAddResource")
   public void testAddInstance() throws Exception {
     String command = "-zkSvr localhost:2183 -addCluster " + clusterName;
     ClusterSetup.processCommandLineArgs(command.split("\\s+"));
@@ -180,7 +180,7 @@ public class TestHelixAdminCli extends ZkTestBase {
     }
   }
 
-  @Test
+  @Test (dependsOnMethods = "testAddInstance")
   public void testRebalanceResource() throws Exception {
     String command = "-zkSvr localhost:2183 -addCluster " + clusterName;
     ClusterSetup.processCommandLineArgs(command.split("\\s+"));
@@ -211,7 +211,7 @@ public class TestHelixAdminCli extends ZkTestBase {
     ClusterSetup.processCommandLineArgs(command.split("\\s+"));
   }
 
-  @Test
+  @Test (dependsOnMethods = "testRebalanceResource")
   public void testStartCluster() throws Exception {
     final int n = 6;
 
@@ -294,7 +294,7 @@ public class TestHelixAdminCli extends ZkTestBase {
     }
   }
 
-  @Test
+  @Test (dependsOnMethods = "testStartCluster")
   public void testDropAddResource() throws Exception {
     final int n = 6;
 
@@ -398,7 +398,7 @@ public class TestHelixAdminCli extends ZkTestBase {
     Thread.sleep(100);
   }
 
-  @Test
+  @Test (dependsOnMethods = "testDropAddResource")
   public void testInstanceOperations() throws Exception {
     final int n = 6;
 
@@ -469,7 +469,7 @@ public class TestHelixAdminCli extends ZkTestBase {
     }
   }
 
-  @Test
+  @Test (dependsOnMethods = "testInstanceOperations")
   public void testExpandCluster() throws Exception {
     final int n = 6;
 
@@ -520,7 +520,7 @@ public class TestHelixAdminCli extends ZkTestBase {
     }
   }
 
-  @Test
+  @Test (dependsOnMethods = "testExpandCluster")
   public void testDeactivateCluster() throws Exception {
     final int n = 6;
 
@@ -577,7 +577,7 @@ public class TestHelixAdminCli extends ZkTestBase {
     ClusterSetup.processCommandLineArgs(command.split("\\s+"));
   }
 
-  @Test
+  @Test (dependsOnMethods = "testDeactivateCluster")
   public void testInstanceGroupTags() throws Exception {
     BaseDataAccessor<ZNRecord> baseAccessor = new 
ZkBaseDataAccessor<ZNRecord>(_gZkClient);
     HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, 
baseAccessor);

Reply via email to