Repository: helix
Updated Branches:
  refs/heads/master 675904095 -> 0af6e8c19


[HELIX-745] Make AssignableInstanceManager listen on data changes to update 
AssignableInstances

Previously, although AssignableInstanceManager provided an API for updating its 
AssignableInstances, this API was never called. This RB fixes that.

Changelist:
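1. Add boolean flags in ClusterDataCache for ClusterConfig changes and for 
LiveInstance/InstanceConfig changes
2. If the ClusterDataCache is a task cache, call 
AssignableInstanceManager.buildAssignableInstances() when the ClusterConfig 
flag is set, or AssignableInstanceManager.updateAssignableInstances() when 
only the LiveInstance/InstanceConfig flag is set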
3. Use thread-safe map in AssignableInstanceManager
4. Address the issue of targeted tasks having null taskIds (use pName 
convention instead)
5. Address the issue of LiveInstanceChange not notifying the caches by 
explicitly using setLiveInstance() function
6. Fix bug in restoreTaskAssignResult where tasks with null quota type were not 
being restored properly


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/0af6e8c1
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/0af6e8c1
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/0af6e8c1

Branch: refs/heads/master
Commit: 0af6e8c19af5ee916f93acd8582e53b776e9c712
Parents: 6759040
Author: Hunter Lee <[email protected]>
Authored: Tue Jul 24 14:10:01 2018 -0700
Committer: Hunter Lee <[email protected]>
Committed: Tue Jul 24 17:47:50 2018 -0700

----------------------------------------------------------------------
 .../helix/common/caches/TaskDataCache.java      | 24 -----------
 .../controller/stages/ClusterDataCache.java     | 39 +++++++++++++++--
 .../helix/task/AssignableInstanceManager.java   | 36 +++++++++-------
 .../task/TestQuotaBasedScheduling.java          | 44 ++++++++++++++++++++
 4 files changed, 99 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/0af6e8c1/helix-core/src/main/java/org/apache/helix/common/caches/TaskDataCache.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/common/caches/TaskDataCache.java 
b/helix-core/src/main/java/org/apache/helix/common/caches/TaskDataCache.java
index 478b03a..8892d2e 100644
--- a/helix-core/src/main/java/org/apache/helix/common/caches/TaskDataCache.java
+++ b/helix-core/src/main/java/org/apache/helix/common/caches/TaskDataCache.java
@@ -72,7 +72,6 @@ public class TaskDataCache extends AbstractDataCache {
   public synchronized boolean refresh(HelixDataAccessor accessor,
       Map<String, ResourceConfig> resourceConfigMap) {
     refreshJobContexts(accessor);
-
     // update workflow and job configs.
     _workflowConfigMap.clear();
     _jobConfigMap.clear();
@@ -85,32 +84,9 @@ public class TaskDataCache extends AbstractDataCache {
         _jobConfigMap.put(entry.getKey(), new JobConfig(entry.getValue()));
       }
     }
-
     return true;
   }
 
-  /**
-   * Refreshes Task Framework contexts and configs from ZooKeeper. This method 
also re-instantiates
-   * AssignableInstanceManager.
-   * @param accessor
-   * @param resourceConfigMap
-   * @param liveInstanceMap
-   * @param instanceConfigMap
-   * @return
-   */
-  public synchronized boolean refresh(HelixDataAccessor accessor,
-      Map<String, ResourceConfig> resourceConfigMap, ClusterConfig 
clusterConfig,
-      Map<String, LiveInstance> liveInstanceMap, Map<String, InstanceConfig> 
instanceConfigMap) {
-    // First, call the original refresh for contexts and configs
-    if (refresh(accessor, resourceConfigMap)) {
-      // Upon refresh success, re-instantiate AssignableInstanceManager from 
scratch
-      _assignableInstanceManager.buildAssignableInstances(clusterConfig, this, 
liveInstanceMap,
-          instanceConfigMap);
-      return true;
-    }
-    return false;
-  }
-
   private void refreshJobContexts(HelixDataAccessor accessor) {
     // TODO: Need an optimize for reading context only if the refresh is 
needed.
     long start = System.currentTimeMillis();

http://git-wip-us.apache.org/repos/asf/helix/blob/0af6e8c1/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
 
b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
index 1f32c04..3e6bd86 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
@@ -114,8 +114,13 @@ public class ClusterDataCache {
   private String _clusterName;
 
   // For detecting liveinstance and target resource partition state change in 
task assignment
+  // Used in AbstractTaskDispatcher
   private boolean _existsLiveInstanceOrCurrentStateChange = false;
 
+  // These two flags are used to detect ClusterConfig change or 
LiveInstance/InstanceConfig change
+  private boolean _existsClusterConfigChange = false;
+  private boolean _existsInstanceChange = false;
+
   public ClusterDataCache() {
     this(null);
   }
@@ -153,7 +158,6 @@ public class ClusterDataCache {
     }
 
     if (_propertyDataChangedMap.get(ChangeType.LIVE_INSTANCE)) {
-      _existsLiveInstanceOrCurrentStateChange = true;
       startTime = System.currentTimeMillis();
       _propertyDataChangedMap.put(ChangeType.LIVE_INSTANCE, false);
       clearCachedResourceAssignments();
@@ -166,6 +170,7 @@ public class ClusterDataCache {
     }
 
     if (_propertyDataChangedMap.get(ChangeType.INSTANCE_CONFIG)) {
+      _existsInstanceChange = true;
       _propertyDataChangedMap.put(ChangeType.INSTANCE_CONFIG, false);
       clearCachedResourceAssignments();
       _instanceConfigCacheMap = 
accessor.getChildValuesMap(keyBuilder.instanceConfigs(), true);
@@ -182,13 +187,20 @@ public class ClusterDataCache {
           + " for " + (_isTaskCache ? "TASK" : "DEFAULT") + "pipeline");
     }
 
-    // This is for target jobs' task assignment. It needs to watch for current 
state changes for
+    // This is for targeted jobs' task assignment. It needs to watch for 
current state changes for
     // when targeted resources' state transitions complete
     if (_propertyDataChangedMap.get(ChangeType.CURRENT_STATE)) {
       _existsLiveInstanceOrCurrentStateChange = true;
       _propertyDataChangedMap.put(ChangeType.CURRENT_STATE, false);
     }
 
+    // This is for AssignableInstances. Whenever there is a quota config 
change in ClusterConfig, we
+    // must trigger an update to AssignableInstanceManager
+    if (_propertyDataChangedMap.get(ChangeType.CLUSTER_CONFIG)) {
+      _existsClusterConfigChange = true;
+      _propertyDataChangedMap.put(ChangeType.CLUSTER_CONFIG, false);
+    }
+
     _liveInstanceMap = new HashMap<>(_liveInstanceCacheMap);
     _liveInstanceMap = new HashMap<>(_liveInstanceCacheMap);
     _instanceConfigMap = new ConcurrentHashMap<>(_instanceConfigCacheMap);
@@ -205,8 +217,23 @@ public class ClusterDataCache {
     _clusterConfig = accessor.getProperty(keyBuilder.clusterConfig());
 
     if (_isTaskCache) {
-      _taskDataCache.refresh(accessor, _resourceConfigMap, _clusterConfig, 
_liveInstanceMap,
-          _instanceConfigMap);
+      // Refresh TaskCache
+      _taskDataCache.refresh(accessor, _resourceConfigMap);
+
+      // Refresh AssignableInstanceManager
+      AssignableInstanceManager assignableInstanceManager =
+          _taskDataCache.getAssignableInstanceManager();
+      if (_existsClusterConfigChange) {
+        // Update both flags since buildAssignableInstances includes 
updateAssignableInstances
+        _existsClusterConfigChange = false;
+        _existsInstanceChange = false;
+        assignableInstanceManager.buildAssignableInstances(_clusterConfig, 
_taskDataCache,
+            _liveInstanceMap, _instanceConfigMap);
+      } else if (_existsInstanceChange) {
+        _existsInstanceChange = false;
+        assignableInstanceManager.updateAssignableInstances(_clusterConfig, 
_liveInstanceMap,
+            _instanceConfigMap);
+      }
     }
 
     _instanceMessagesCache.refresh(accessor, _liveInstanceMap);
@@ -423,6 +450,10 @@ public class ClusterDataCache {
     }
     _liveInstanceCacheMap = liveInstanceMap;
     _updateInstanceOfflineTime = true;
+
+    // TODO: Move this when listener for LiveInstance is being refactored
+    _existsInstanceChange = true;
+    _existsLiveInstanceOrCurrentStateChange = true;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/helix/blob/0af6e8c1/helix-core/src/main/java/org/apache/helix/task/AssignableInstanceManager.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/task/AssignableInstanceManager.java 
b/helix-core/src/main/java/org/apache/helix/task/AssignableInstanceManager.java
index e25e23a..abe5f1c 100644
--- 
a/helix-core/src/main/java/org/apache/helix/task/AssignableInstanceManager.java
+++ 
b/helix-core/src/main/java/org/apache/helix/task/AssignableInstanceManager.java
@@ -25,6 +25,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import org.apache.helix.common.caches.TaskDataCache;
 import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.InstanceConfig;
@@ -41,21 +42,19 @@ public class AssignableInstanceManager {
   private Map<String, AssignableInstance> _assignableInstanceMap;
   // TaskID -> TaskAssignResult TODO: Hunter: Move this if not needed
   private Map<String, TaskAssignResult> _taskAssignResultMap;
-  private boolean _hasBeenBuilt; // Flag for whether AssignableInstances have 
been built
 
   /**
    * Basic constructor for AssignableInstanceManager to allow an empty 
instantiation.
    * buildAssignableInstances() must be explicitly called after instantiation.
    */
   public AssignableInstanceManager() {
-    _assignableInstanceMap = new HashMap<>();
-    _taskAssignResultMap = new HashMap<>();
-    _hasBeenBuilt = false; // AssignableInstances haven't been built
+    _assignableInstanceMap = new ConcurrentHashMap<>();
+    _taskAssignResultMap = new ConcurrentHashMap<>();
   }
 
   /**
    * Builds AssignableInstances and restores TaskAssignResults from scratch by 
reading from
-   * TaskDataCache.
+   * TaskDataCache. It re-computes current quota profile for each 
AssignableInstance.
    * @param clusterConfig
    * @param taskDataCache
    * @param liveInstances
@@ -63,13 +62,9 @@ public class AssignableInstanceManager {
    */
   public void buildAssignableInstances(ClusterConfig clusterConfig, 
TaskDataCache taskDataCache,
       Map<String, LiveInstance> liveInstances, Map<String, InstanceConfig> 
instanceConfigs) {
-    // Only need to build from scratch during Controller switch, etc.
-    // This keeps the pipeline from building from scratch every cache refresh
-    if (_hasBeenBuilt) {
-      // If it has been already built, just update (configs and LiveInstance 
changes may be present)
-      updateAssignableInstances(clusterConfig, liveInstances, instanceConfigs);
-      return;
-    }
+    // Reset all cached information
+    _assignableInstanceMap.clear();
+    _taskAssignResultMap.clear();
 
     // Create all AssignableInstance objects based on what's in liveInstances
     for (Map.Entry<String, LiveInstance> liveInstanceEntry : 
liveInstances.entrySet()) {
@@ -100,6 +95,9 @@ public class AssignableInstanceManager {
         continue; // Ignore this job if either the config or context is null
       }
       String quotaType = jobConfig.getJobType();
+      if (quotaType == null) {
+        quotaType = AssignableInstance.DEFAULT_QUOTA_TYPE;
+      }
       Set<Integer> taskIndices = jobContext.getPartitionSet(); // Each integer 
represents a task in
       // this job (this is NOT taskId)
       for (int taskIndex : taskIndices) {
@@ -111,6 +109,11 @@ public class AssignableInstanceManager {
 
           String assignedInstance = 
jobContext.getAssignedParticipant(taskIndex);
           String taskId = jobContext.getTaskIdForPartition(taskIndex);
+          if (taskId == null) {
+            // For targeted tasks, taskId will be null
+            // We instead use pName (see FixedTargetTaskAssignmentCalculator)
+            taskId = String.format("%s_%s", jobConfig.getJobId(), taskIndex);
+          }
           if (assignedInstance == null) {
             LOG.warn(
                 "This task's TaskContext does not have an assigned instance! 
Task will be ignored. "
@@ -138,13 +141,13 @@ public class AssignableInstanceManager {
         }
       }
     }
-    _hasBeenBuilt = true; // Set the flag so that it's not re-building from 
cache every pipeline
-    // iteration
   }
 
   /**
-   * Updates AssignableInstances when there are any config changes. This 
update will be based on the
-   * list of LiveInstances provided.
+   * Updates AssignableInstances when there are changes in LiveInstances or 
InstanceConfig. This
+   * update only keeps an up-to-date count of AssignableInstances and does NOT 
re-build tasks
+   * (because it's costly).
+   * Call this when there is only LiveInstance/InstanceConfig change.
    * @param clusterConfig
    * @param liveInstances
    * @param instanceConfigs
@@ -156,6 +159,7 @@ public class AssignableInstanceManager {
     Collection<AssignableInstance> staleAssignableInstances =
         new HashSet<>(_assignableInstanceMap.values());
 
+    // Loop over new LiveInstances
     for (Map.Entry<String, LiveInstance> liveInstanceEntry : 
liveInstances.entrySet()) {
       // Prepare instance-specific metadata
       String instanceName = liveInstanceEntry.getKey();

http://git-wip-us.apache.org/repos/asf/helix/blob/0af6e8c1/helix-core/src/test/java/org/apache/helix/integration/task/TestQuotaBasedScheduling.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/test/java/org/apache/helix/integration/task/TestQuotaBasedScheduling.java
 
b/helix-core/src/test/java/org/apache/helix/integration/task/TestQuotaBasedScheduling.java
index db471ff..abbcf75 100644
--- 
a/helix-core/src/test/java/org/apache/helix/integration/task/TestQuotaBasedScheduling.java
+++ 
b/helix-core/src/test/java/org/apache/helix/integration/task/TestQuotaBasedScheduling.java
@@ -35,6 +35,7 @@ import 
org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.participant.StateMachineEngine;
 import org.apache.helix.task.JobConfig;
+import org.apache.helix.task.JobContext;
 import org.apache.helix.task.JobQueue;
 import org.apache.helix.task.Task;
 import org.apache.helix.task.TaskCallbackContext;
@@ -260,6 +261,49 @@ public class TestQuotaBasedScheduling extends TaskTestBase 
{
     
Assert.assertFalse(_quotaTypeExecutionCount.containsKey(DEFAULT_QUOTA_TYPE));
   }
 
+  @Test(dependsOnMethods = "testSchedulingWithoutQuota")
+  public void testQuotaConfigChange() throws InterruptedException {
+    ClusterConfig clusterConfig = 
_manager.getConfigAccessor().getClusterConfig(CLUSTER_NAME);
+    clusterConfig.resetTaskQuotaRatioMap();
+    clusterConfig.setTaskQuotaRatio(DEFAULT_QUOTA_TYPE, 38);
+    clusterConfig.setTaskQuotaRatio("A", 1);
+    clusterConfig.setTaskQuotaRatio("B", 1);
+    _manager.getConfigAccessor().setClusterConfig(CLUSTER_NAME, clusterConfig);
+
+    // 2 nodes - create 4 workflows with LongTask so that only 2 would get 
scheduled and run
+    for (int i = 0; i < 4; i++) {
+      String workflowName = TestHelper.getTestMethodName() + "_" + i;
+      _driver.start(createWorkflow(workflowName, true, "A", 1, 1, "LongTask"));
+      Thread.sleep(500L);
+    }
+    // Test that only 2 of the workflows are executed
+    for (int i = 0; i < 2; i++) {
+      String workflowName = TestHelper.getTestMethodName() + "_" + i;
+      _driver.pollForWorkflowState(workflowName, TaskState.IN_PROGRESS);
+    }
+
+    // Test that the next two are not executing
+    JobContext context_2 = 
_driver.getJobContext("testQuotaConfigChange_2_testQuotaConfigChange_2_0");
+    JobContext context_3 = 
_driver.getJobContext("testQuotaConfigChange_3_testQuotaConfigChange_3_0");
+    Assert.assertNull(context_2.getPartitionState(0));
+    Assert.assertNull(context_3.getPartitionState(0));
+
+    // Change the quota config so that the rest of the workflows are in 
progress
+    clusterConfig = 
_manager.getConfigAccessor().getClusterConfig(CLUSTER_NAME);
+    clusterConfig.resetTaskQuotaRatioMap();
+    clusterConfig.setTaskQuotaRatio(DEFAULT_QUOTA_TYPE, 1);
+    clusterConfig.setTaskQuotaRatio("A", 38);
+    clusterConfig.setTaskQuotaRatio("B", 1);
+    _manager.getConfigAccessor().setClusterConfig(CLUSTER_NAME, clusterConfig);
+
+    // Wait for the change to propagate through and test that the next two are 
now executing
+    Thread.sleep(1000L);
+    context_2 = 
_driver.getJobContext("testQuotaConfigChange_2_testQuotaConfigChange_2_0");
+    context_3 = 
_driver.getJobContext("testQuotaConfigChange_3_testQuotaConfigChange_3_0");
+    Assert.assertNotNull(context_2.getPartitionState(0));
+    Assert.assertNotNull(context_3.getPartitionState(0));
+  }
+
   /**
    * Tests that quota ratios are being observed. This is done by creating 
short tasks for some quota
    * types and long tasks for some quota types.

Reply via email to