This is an automated email from the ASF dual-hosted git repository.

jiajunwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/master by this push:
     new 3a494a4  Add concurrent control to 
DistClusterControllerStateModel._controller access to avoid NPE. (#1753)
3a494a4 is described below

commit 3a494a4573ebe60f76e98a475446e8f52626befd
Author: Jiajun Wang <[email protected]>
AuthorDate: Wed May 26 13:54:50 2021 -0700

    Add concurrent control to DistClusterControllerStateModel._controller 
access to avoid NPE. (#1753)
    
    This PR fixes a potential NullPointerException (NPE) that may be thrown in the 
DistClusterControllerStateModel state transition methods.
    When this error happens, a follower-to-leader state transition might be 
interrupted. The controller instance partition will be set to the ERROR state, 
but the controller instance may have already connected to ZooKeeper. This 
causes inconsistency and a resource leak, since the controller instance won't be 
properly cleaned up when it is dropped directly from the ERROR state.
---
 .../DistClusterControllerStateModel.java           | 47 +++++++++++-----------
 .../TestClusterStatusMonitorLifecycle.java         | 13 +++---
 2 files changed, 29 insertions(+), 31 deletions(-)

diff --git 
a/helix-core/src/main/java/org/apache/helix/participant/DistClusterControllerStateModel.java
 
b/helix-core/src/main/java/org/apache/helix/participant/DistClusterControllerStateModel.java
index 6877dc5..430f47f 100644
--- 
a/helix-core/src/main/java/org/apache/helix/participant/DistClusterControllerStateModel.java
+++ 
b/helix-core/src/main/java/org/apache/helix/participant/DistClusterControllerStateModel.java
@@ -19,6 +19,7 @@ package org.apache.helix.participant;
  * under the License.
  */
 
+import java.util.Optional;
 import java.util.Set;
 
 import com.google.common.collect.Sets;
@@ -32,20 +33,18 @@ import 
org.apache.helix.participant.statemachine.StateModelInfo;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-@StateModelInfo(initialState = "OFFLINE", states = {
-    "LEADER", "STANDBY"
-})
+
+@StateModelInfo(initialState = "OFFLINE", states = {"LEADER", "STANDBY"})
 public class DistClusterControllerStateModel extends 
AbstractHelixLeaderStandbyStateModel {
   private static Logger logger = 
LoggerFactory.getLogger(DistClusterControllerStateModel.class);
-  protected HelixManager _controller = null;
+  protected Optional<HelixManager> _controllerOpt = Optional.empty();
   private final Set<Pipeline.Type> _enabledPipelineTypes;
 
   public DistClusterControllerStateModel(String zkAddr) {
     this(zkAddr, Sets.newHashSet(Pipeline.Type.DEFAULT, Pipeline.Type.TASK));
   }
 
-  public DistClusterControllerStateModel(String zkAddr,
-      Set<Pipeline.Type> enabledPipelineTypes) {
+  public DistClusterControllerStateModel(String zkAddr, Set<Pipeline.Type> 
enabledPipelineTypes) {
     super(zkAddr);
     _enabledPipelineTypes = enabledPipelineTypes;
   }
@@ -63,19 +62,20 @@ public class DistClusterControllerStateModel extends 
AbstractHelixLeaderStandbyS
 
     logger.info(controllerName + " becoming leader from standby for " + 
clusterName);
 
-    if (_controller == null) {
-      _controller =
-          HelixManagerFactory.getZKHelixManager(clusterName, controllerName,
-              InstanceType.CONTROLLER, _zkAddr);
-      _controller.setEnabledControlPipelineTypes(_enabledPipelineTypes);
-      _controller.connect();
-      _controller.startTimerTasks();
-      logStateTransition("STANDBY", "LEADER", clusterName, controllerName);
-    } else {
-      logger.error("controller already exists:" + 
_controller.getInstanceName() + " for "
-          + clusterName);
+    synchronized (_controllerOpt) {
+      if (!_controllerOpt.isPresent()) {
+        HelixManager newController = HelixManagerFactory
+            .getZKHelixManager(clusterName, controllerName, 
InstanceType.CONTROLLER, _zkAddr);
+        newController.setEnabledControlPipelineTypes(_enabledPipelineTypes);
+        newController.connect();
+        newController.startTimerTasks();
+        _controllerOpt = Optional.of(newController);
+        logStateTransition("STANDBY", "LEADER", clusterName, controllerName);
+      } else {
+        logger.error("controller already exists:" + 
_controllerOpt.get().getInstanceName() + " for "
+            + clusterName);
+      }
     }
-
   }
 
   @Override
@@ -85,7 +85,7 @@ public class DistClusterControllerStateModel extends 
AbstractHelixLeaderStandbyS
 
     logger.info(controllerName + " becoming standby from leader for " + 
clusterName);
 
-    if (_controller != null) {
+    if (_controllerOpt.isPresent()) {
       reset();
       logStateTransition("LEADER", "STANDBY", clusterName, controllerName);
     } else {
@@ -112,10 +112,11 @@ public class DistClusterControllerStateModel extends 
AbstractHelixLeaderStandbyS
 
   @Override
   public void reset() {
-    if (_controller != null) {
-      _controller.disconnect();
-      _controller = null;
+    synchronized (_controllerOpt) {
+      if (_controllerOpt.isPresent()) {
+        _controllerOpt.get().disconnect();
+        _controllerOpt = Optional.empty();
+      }
     }
-
   }
 }
diff --git 
a/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterStatusMonitorLifecycle.java
 
b/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterStatusMonitorLifecycle.java
index 1eab81b..6a55264 100644
--- 
a/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterStatusMonitorLifecycle.java
+++ 
b/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterStatusMonitorLifecycle.java
@@ -288,16 +288,13 @@ public class TestClusterStatusMonitorLifecycle extends 
ZkTestBase {
     cleanupControllers();
     // Check if any MBeans leftover.
     // Note that MessageQueueStatus is not bound with controller only. So it 
will still exist.
-    final QueryExp exp2 = Query.and(
-        Query.not(Query.match(Query.attr("SensorName"), 
Query.value("MessageQueueStatus.*"))),
-        exp1);
+    final QueryExp exp2 = Query
+        .and(Query.not(Query.match(Query.attr("SensorName"), 
Query.value("MessageQueueStatus.*"))),
+            exp1);
 
-    // Note, the _asyncTasksThreadPool shutting down logic in 
GenericHelixController is best effort
-    // there is not guarantee that all threads in the pool is gone. Mossstly 
they will, but not always.
-    // see https://github.com/apache/helix/issues/1280
     boolean result = TestHelper.verify(() -> 
ManagementFactory.getPlatformMBeanServer()
         .queryMBeans(new ObjectName("ClusterStatus:*"), exp2).isEmpty(), 
TestHelper.WAIT_DURATION);
-    Assert.assertTrue(result,
-        "A small chance this may fail due to _asyncThread pool in controller 
may not shutdown in time. Please check issue 1280 to verify if this is the 
case.");
+    Assert.assertTrue(result, "Remaining MBeans: " + 
ManagementFactory.getPlatformMBeanServer()
+        .queryMBeans(new ObjectName("ClusterStatus:*"), exp2).toString());
   }
 }

Reply via email to