This is an automated email from the ASF dual-hosted git repository.

jiajunwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/master by this push:
     new c188ec7  Fix the race condition while Helix refresh cluster status 
cache. (#363)
c188ec7 is described below

commit c188ec7ff2fbac22ac677fdf8c64ab3f6b40f404
Author: jiajunwang <[email protected]>
AuthorDate: Tue Jul 30 14:41:32 2019 -0700

    Fix the race condition while Helix refresh cluster status cache. (#363)
    
    * Fix the race condition while Helix refresh cluster status cache.
    
    This change fixes issue #331.
    The design ensures only one read, to avoid locking during change 
notification. However, a later update introduced an additional read. As a result, 
the two reads may return different results because notification is lock-free. 
This leaves the cache in an inconsistent state. The impact is that the 
expected rebalance might not happen.
---
 .../dataproviders/BaseControllerDataProvider.java  | 86 ++++++++++++++--------
 .../ResourceControllerDataProvider.java            | 29 ++++----
 .../WorkflowControllerDataProvider.java            | 30 ++++----
 3 files changed, 83 insertions(+), 62 deletions(-)

diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java
 
b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java
index a45813e..5eb6077 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java
@@ -19,17 +19,6 @@ package org.apache.helix.controller.dataproviders;
  * under the License.
  */
 
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
 import org.apache.helix.HelixConstants;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixProperty;
@@ -54,6 +43,19 @@ import org.apache.helix.model.StateModelDefinition;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicBoolean;
+
 /**
  * Common building block for controller to cache their data. This common 
building block contains
  * information about cluster config, instance config, resource config, ideal 
states, current state,
@@ -82,7 +84,7 @@ public class BaseControllerDataProvider implements 
ControlContextProvider {
   private ExecutorService _asyncTasksThreadPool;
 
   // A map recording what data has changed
-  protected Map<HelixConstants.ChangeType, Boolean> _propertyDataChangedMap;
+  protected Map<HelixConstants.ChangeType, AtomicBoolean> 
_propertyDataChangedMap;
 
   // Property caches
   private final PropertyCache<ResourceConfig> _resourceConfigCache;
@@ -112,7 +114,8 @@ public class BaseControllerDataProvider implements 
ControlContextProvider {
     _propertyDataChangedMap = new ConcurrentHashMap<>();
     for (HelixConstants.ChangeType type : HelixConstants.ChangeType.values()) {
       // refresh every type when it is initialized
-      _propertyDataChangedMap.put(type, true);
+      _propertyDataChangedMap
+          .put(type, new AtomicBoolean(true));
     }
 
     // initialize caches
@@ -217,11 +220,11 @@ public class BaseControllerDataProvider implements 
ControlContextProvider {
     _instanceMessagesCache = new InstanceMessagesCache(_clusterName);
   }
 
-  private void refreshIdealState(final HelixDataAccessor accessor) {
-    if (_propertyDataChangedMap.get(HelixConstants.ChangeType.IDEAL_STATE)) {
-      _propertyDataChangedMap.put(HelixConstants.ChangeType.IDEAL_STATE, 
false);
-
+  private void refreshIdealState(final HelixDataAccessor accessor,
+      Set<HelixConstants.ChangeType> refreshedType) {
+    if 
(_propertyDataChangedMap.get(HelixConstants.ChangeType.IDEAL_STATE).getAndSet(false))
 {
       _idealStateCache.refresh(accessor);
+      refreshedType.add(HelixConstants.ChangeType.IDEAL_STATE);
     } else {
       LogUtil.logInfo(logger, getClusterEventId(), String
           .format("No ideal state change for %s cluster, %s pipeline", 
_clusterName,
@@ -229,11 +232,12 @@ public class BaseControllerDataProvider implements 
ControlContextProvider {
     }
   }
 
-  private void refreshLiveInstances(final HelixDataAccessor accessor) {
-    if (_propertyDataChangedMap.get(HelixConstants.ChangeType.LIVE_INSTANCE)) {
-      _propertyDataChangedMap.put(HelixConstants.ChangeType.LIVE_INSTANCE, 
false);
+  private void refreshLiveInstances(final HelixDataAccessor accessor,
+      Set<HelixConstants.ChangeType> refreshedType) {
+    if 
(_propertyDataChangedMap.get(HelixConstants.ChangeType.LIVE_INSTANCE).getAndSet(false))
 {
       _liveInstanceCache.refresh(accessor);
       _updateInstanceOfflineTime = true;
+      refreshedType.add(HelixConstants.ChangeType.LIVE_INSTANCE);
     } else {
       LogUtil.logInfo(logger, getClusterEventId(), String
           .format("No live instance change for %s cluster, %s pipeline", 
_clusterName,
@@ -241,13 +245,14 @@ public class BaseControllerDataProvider implements 
ControlContextProvider {
     }
   }
 
-  private void refreshInstanceConfigs(final HelixDataAccessor accessor) {
-    if 
(_propertyDataChangedMap.get(HelixConstants.ChangeType.INSTANCE_CONFIG)) {
-      _propertyDataChangedMap.put(HelixConstants.ChangeType.INSTANCE_CONFIG, 
false);
+  private void refreshInstanceConfigs(final HelixDataAccessor accessor,
+      Set<HelixConstants.ChangeType> refreshedType) {
+    if 
(_propertyDataChangedMap.get(HelixConstants.ChangeType.INSTANCE_CONFIG).getAndSet(false))
 {
       _instanceConfigCache.refresh(accessor);
       LogUtil.logInfo(logger, getClusterEventId(), String
           .format("Reloaded InstanceConfig for cluster %s, %s pipeline. Keys: 
%s", _clusterName,
               getPipelineName(), 
_instanceConfigCache.getPropertyMap().keySet()));
+      refreshedType.add(HelixConstants.ChangeType.INSTANCE_CONFIG);
     } else {
       LogUtil.logInfo(logger, getClusterEventId(), String
           .format("No instance config change for %s cluster, %s pipeline", 
_clusterName,
@@ -255,13 +260,14 @@ public class BaseControllerDataProvider implements 
ControlContextProvider {
     }
   }
 
-  private void refreshResourceConfig(final HelixDataAccessor accessor) {
-    if 
(_propertyDataChangedMap.get(HelixConstants.ChangeType.RESOURCE_CONFIG)) {
-      _propertyDataChangedMap.put(HelixConstants.ChangeType.RESOURCE_CONFIG, 
false);
+  private void refreshResourceConfig(final HelixDataAccessor accessor,
+      Set<HelixConstants.ChangeType> refreshedType) {
+    if 
(_propertyDataChangedMap.get(HelixConstants.ChangeType.RESOURCE_CONFIG).getAndSet(false))
 {
       _resourceConfigCache.refresh(accessor);
       LogUtil.logInfo(logger, getClusterEventId(), String
           .format("Reloaded ResourceConfig for cluster %s, %s pipeline. Cnt: 
%s", _clusterName,
               getPipelineName(), 
_resourceConfigCache.getPropertyMap().keySet().size()));
+      refreshedType.add(HelixConstants.ChangeType.RESOURCE_CONFIG);
     } else {
       LogUtil.logInfo(logger, getClusterEventId(), String
           .format("No resource config change for %s cluster, %s pipeline", 
_clusterName,
@@ -290,12 +296,22 @@ public class BaseControllerDataProvider implements 
ControlContextProvider {
   }
 
   public synchronized void refresh(HelixDataAccessor accessor) {
+    doRefresh(accessor);
+  }
+
+  /**
+   * @param accessor
+   * @return The types that have been updated during the refresh.
+   */
+  protected synchronized Set<HelixConstants.ChangeType> 
doRefresh(HelixDataAccessor accessor) {
+    Set<HelixConstants.ChangeType> refreshedTypes = new HashSet<>();
+
     // Refresh raw data
     _clusterConfig = 
accessor.getProperty(accessor.keyBuilder().clusterConfig());
-    refreshIdealState(accessor);
-    refreshLiveInstances(accessor);
-    refreshInstanceConfigs(accessor);
-    refreshResourceConfig(accessor);
+    refreshIdealState(accessor, refreshedTypes);
+    refreshLiveInstances(accessor, refreshedTypes);
+    refreshInstanceConfigs(accessor, refreshedTypes);
+    refreshResourceConfig(accessor, refreshedTypes);
     _stateModelDefinitionCache.refresh(accessor);
     _clusterConstraintsCache.refresh(accessor);
     updateMaintenanceInfo(accessor);
@@ -314,6 +330,8 @@ public class BaseControllerDataProvider implements 
ControlContextProvider {
 
     updateIdealRuleMap();
     updateDisabledInstances();
+
+    return refreshedTypes;
   }
 
   protected void dumpDebugInfo() {
@@ -595,9 +613,13 @@ public class BaseControllerDataProvider implements 
ControlContextProvider {
 
   /**
    * Notify the cache that some part of the cluster data has been changed.
+   *
+   * Don't lock the propertyDataChangedMap here because the refresh process, 
which also reads this map,
+   * may take a long time to finish. If a lock were required, the notification 
might be blocked by refresh.
+   * In that case, the delayed notification processing might cause performance 
issues.
    */
   public void notifyDataChange(HelixConstants.ChangeType changeType) {
-    _propertyDataChangedMap.put(changeType, true);
+    _propertyDataChangedMap.get(changeType).set(true);
   }
 
   /**
@@ -669,7 +691,7 @@ public class BaseControllerDataProvider implements 
ControlContextProvider {
   public void requireFullRefresh() {
     for (HelixConstants.ChangeType type : HelixConstants.ChangeType.values()) {
       if (!_noFullRefreshProperty.contains(type)) {
-        _propertyDataChangedMap.put(type, true);
+        _propertyDataChangedMap.get(type).set(true);
       }
     }
   }
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/ResourceControllerDataProvider.java
 
b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/ResourceControllerDataProvider.java
index cf21230..59c973b 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/ResourceControllerDataProvider.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/ResourceControllerDataProvider.java
@@ -21,8 +21,11 @@ package org.apache.helix.controller.dataproviders;
 
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
+
 import org.apache.helix.HelixConstants;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.PropertyKey;
@@ -109,17 +112,17 @@ public class ResourceControllerDataProvider extends 
BaseControllerDataProvider {
   public synchronized void refresh(HelixDataAccessor accessor) {
     long startTime = System.currentTimeMillis();
 
-    // Invalidate cached information
-    if (_propertyDataChangedMap.get(HelixConstants.ChangeType.IDEAL_STATE)
-        || _propertyDataChangedMap.get(HelixConstants.ChangeType.LIVE_INSTANCE)
-        || 
_propertyDataChangedMap.get(HelixConstants.ChangeType.INSTANCE_CONFIG)
-        || 
_propertyDataChangedMap.get(HelixConstants.ChangeType.RESOURCE_CONFIG)) {
+    // Refresh base
+    Set<HelixConstants.ChangeType> propertyRefreshed = 
super.doRefresh(accessor);
+
+    // Invalidate cached information if any of the important data has been 
refreshed
+    if (propertyRefreshed.contains(HelixConstants.ChangeType.IDEAL_STATE)
+        || propertyRefreshed.contains(HelixConstants.ChangeType.LIVE_INSTANCE)
+        || 
propertyRefreshed.contains(HelixConstants.ChangeType.INSTANCE_CONFIG)
+        || 
propertyRefreshed.contains(HelixConstants.ChangeType.RESOURCE_CONFIG)) {
       clearCachedResourceAssignments();
     }
 
-    // Refresh base
-    super.refresh(accessor);
-
     // Refresh resource controller specific property caches
     refreshExternalViews(accessor);
     refreshTargetExternalViews(accessor);
@@ -140,20 +143,16 @@ public class ResourceControllerDataProvider extends 
BaseControllerDataProvider {
   private void refreshExternalViews(final HelixDataAccessor accessor) {
     // As we are not listening on external view change, external view will be
     // refreshed once during the cache's first refresh() call, or when full 
refresh is required
-    if (_propertyDataChangedMap.get(HelixConstants.ChangeType.EXTERNAL_VIEW)) {
+    if 
(_propertyDataChangedMap.get(HelixConstants.ChangeType.EXTERNAL_VIEW).getAndSet(false))
 {
       _externalViewCache.refresh(accessor);
-      _propertyDataChangedMap.put(HelixConstants.ChangeType.EXTERNAL_VIEW, 
false);
     }
   }
 
   private void refreshTargetExternalViews(final HelixDataAccessor accessor) {
-    if 
(_propertyDataChangedMap.get(HelixConstants.ChangeType.TARGET_EXTERNAL_VIEW)) {
+    if 
(_propertyDataChangedMap.get(HelixConstants.ChangeType.TARGET_EXTERNAL_VIEW).getAndSet(false))
 {
       if (getClusterConfig() != null && 
getClusterConfig().isTargetExternalViewEnabled()) {
+        // Only refresh with data accessor for the first time
         _targetExternalViewCache.refresh(accessor);
-
-        // Only set the change type back once we get refreshed with data 
accessor for the
-        // first time
-        
_propertyDataChangedMap.put(HelixConstants.ChangeType.TARGET_EXTERNAL_VIEW, 
false);
       }
     }
   }
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/WorkflowControllerDataProvider.java
 
b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/WorkflowControllerDataProvider.java
index 2eee09e..e637e3d 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/WorkflowControllerDataProvider.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/WorkflowControllerDataProvider.java
@@ -19,9 +19,6 @@ package org.apache.helix.controller.dataproviders;
  * under the License.
  */
 
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
 import org.apache.helix.HelixConstants;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.ZNRecord;
@@ -41,6 +38,11 @@ import org.apache.helix.task.WorkflowContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 
 /**
  * Data provider for workflow controller.
@@ -69,24 +71,22 @@ public class WorkflowControllerDataProvider extends 
BaseControllerDataProvider {
     _taskDataCache = new TaskDataCache(this);
   }
 
-  private void refreshClusterStateChangeFlags() {
+  private void refreshClusterStateChangeFlags(Set<HelixConstants.ChangeType> 
propertyRefreshed) {
     // This is for targeted jobs' task assignment. It needs to watch for 
current state changes for
     // when targeted resources' state transitions complete
-    if (_propertyDataChangedMap.get(HelixConstants.ChangeType.CURRENT_STATE)
-        || 
_propertyDataChangedMap.get(HelixConstants.ChangeType.LIVE_INSTANCE)) {
-      _existsLiveInstanceOrCurrentStateChange = true;
-
-      // BaseControllerDataProvider will take care of marking live instance 
change
-      _propertyDataChangedMap.put(HelixConstants.ChangeType.CURRENT_STATE, 
false);
-    } else {
-      _existsLiveInstanceOrCurrentStateChange = false;
-    }
+    _existsLiveInstanceOrCurrentStateChange =
+        // TODO read and update CURRENT_STATE in the 
BaseControllerDataProvider as well.
+        // This check (and set) is necessary for now since the current state 
flag in _propertyDataChangedMap is not used by the BaseControllerDataProvider 
for now.
+        
_propertyDataChangedMap.get(HelixConstants.ChangeType.CURRENT_STATE).getAndSet(false)
+            || 
propertyRefreshed.contains(HelixConstants.ChangeType.CURRENT_STATE)
+            || 
propertyRefreshed.contains(HelixConstants.ChangeType.LIVE_INSTANCE);
   }
 
   public synchronized void refresh(HelixDataAccessor accessor) {
     long startTime = System.currentTimeMillis();
-    refreshClusterStateChangeFlags();
-    super.refresh(accessor);
+    Set<HelixConstants.ChangeType> propertyRefreshed = 
super.doRefresh(accessor);
+
+    refreshClusterStateChangeFlags(propertyRefreshed);
 
     // Refresh TaskCache
     _taskDataCache.refresh(accessor, getResourceConfigMap());

Reply via email to