This is an automated email from the ASF dual-hosted git repository.
jiajunwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/master by this push:
new 3a494a4 Add concurrent control to
DistClusterControllerStateModel._controller access to avoid NPE. (#1753)
3a494a4 is described below
commit 3a494a4573ebe60f76e98a475446e8f52626befd
Author: Jiajun Wang <[email protected]>
AuthorDate: Wed May 26 13:54:50 2021 -0700
Add concurrent control to DistClusterControllerStateModel._controller
access to avoid NPE. (#1753)
This PR fixes a potential NullPointerException (NPE) that may be thrown in the
DistClusterControllerStateModel state transition methods.
When this error happens, a standby-to-leader state transition might be
interrupted. The controller instance's partition is then set to the ERROR state,
even though the controller instance may already have connected to ZooKeeper. This
causes an inconsistency and a resource leak, since the controller instance is not
properly cleaned up when it is dropped directly from the ERROR state.
---
.../DistClusterControllerStateModel.java | 47 +++++++++++-----------
.../TestClusterStatusMonitorLifecycle.java | 13 +++---
2 files changed, 29 insertions(+), 31 deletions(-)
diff --git
a/helix-core/src/main/java/org/apache/helix/participant/DistClusterControllerStateModel.java
b/helix-core/src/main/java/org/apache/helix/participant/DistClusterControllerStateModel.java
index 6877dc5..430f47f 100644
---
a/helix-core/src/main/java/org/apache/helix/participant/DistClusterControllerStateModel.java
+++
b/helix-core/src/main/java/org/apache/helix/participant/DistClusterControllerStateModel.java
@@ -19,6 +19,7 @@ package org.apache.helix.participant;
* under the License.
*/
+import java.util.Optional;
import java.util.Set;
import com.google.common.collect.Sets;
@@ -32,20 +33,18 @@ import
org.apache.helix.participant.statemachine.StateModelInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-@StateModelInfo(initialState = "OFFLINE", states = {
- "LEADER", "STANDBY"
-})
+
+@StateModelInfo(initialState = "OFFLINE", states = {"LEADER", "STANDBY"})
public class DistClusterControllerStateModel extends
AbstractHelixLeaderStandbyStateModel {
private static Logger logger =
LoggerFactory.getLogger(DistClusterControllerStateModel.class);
- protected HelixManager _controller = null;
+ protected Optional<HelixManager> _controllerOpt = Optional.empty();
private final Set<Pipeline.Type> _enabledPipelineTypes;
public DistClusterControllerStateModel(String zkAddr) {
this(zkAddr, Sets.newHashSet(Pipeline.Type.DEFAULT, Pipeline.Type.TASK));
}
- public DistClusterControllerStateModel(String zkAddr,
- Set<Pipeline.Type> enabledPipelineTypes) {
+ public DistClusterControllerStateModel(String zkAddr, Set<Pipeline.Type>
enabledPipelineTypes) {
super(zkAddr);
_enabledPipelineTypes = enabledPipelineTypes;
}
@@ -63,19 +62,20 @@ public class DistClusterControllerStateModel extends
AbstractHelixLeaderStandbyS
logger.info(controllerName + " becoming leader from standby for " +
clusterName);
- if (_controller == null) {
- _controller =
- HelixManagerFactory.getZKHelixManager(clusterName, controllerName,
- InstanceType.CONTROLLER, _zkAddr);
- _controller.setEnabledControlPipelineTypes(_enabledPipelineTypes);
- _controller.connect();
- _controller.startTimerTasks();
- logStateTransition("STANDBY", "LEADER", clusterName, controllerName);
- } else {
- logger.error("controller already exists:" +
_controller.getInstanceName() + " for "
- + clusterName);
+ synchronized (_controllerOpt) {
+ if (!_controllerOpt.isPresent()) {
+ HelixManager newController = HelixManagerFactory
+ .getZKHelixManager(clusterName, controllerName,
InstanceType.CONTROLLER, _zkAddr);
+ newController.setEnabledControlPipelineTypes(_enabledPipelineTypes);
+ newController.connect();
+ newController.startTimerTasks();
+ _controllerOpt = Optional.of(newController);
+ logStateTransition("STANDBY", "LEADER", clusterName, controllerName);
+ } else {
+ logger.error("controller already exists:" +
_controllerOpt.get().getInstanceName() + " for "
+ + clusterName);
+ }
}
-
}
@Override
@@ -85,7 +85,7 @@ public class DistClusterControllerStateModel extends
AbstractHelixLeaderStandbyS
logger.info(controllerName + " becoming standby from leader for " +
clusterName);
- if (_controller != null) {
+ if (_controllerOpt.isPresent()) {
reset();
logStateTransition("LEADER", "STANDBY", clusterName, controllerName);
} else {
@@ -112,10 +112,11 @@ public class DistClusterControllerStateModel extends
AbstractHelixLeaderStandbyS
@Override
public void reset() {
- if (_controller != null) {
- _controller.disconnect();
- _controller = null;
+ synchronized (_controllerOpt) {
+ if (_controllerOpt.isPresent()) {
+ _controllerOpt.get().disconnect();
+ _controllerOpt = Optional.empty();
+ }
}
-
}
}
diff --git
a/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterStatusMonitorLifecycle.java
b/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterStatusMonitorLifecycle.java
index 1eab81b..6a55264 100644
---
a/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterStatusMonitorLifecycle.java
+++
b/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterStatusMonitorLifecycle.java
@@ -288,16 +288,13 @@ public class TestClusterStatusMonitorLifecycle extends
ZkTestBase {
cleanupControllers();
// Check if any MBeans leftover.
// Note that MessageQueueStatus is not bound with controller only. So it
will still exist.
- final QueryExp exp2 = Query.and(
- Query.not(Query.match(Query.attr("SensorName"),
Query.value("MessageQueueStatus.*"))),
- exp1);
+ final QueryExp exp2 = Query
+ .and(Query.not(Query.match(Query.attr("SensorName"),
Query.value("MessageQueueStatus.*"))),
+ exp1);
- // Note, the _asyncTasksThreadPool shutting down logic in
GenericHelixController is best effort
- // there is not guarantee that all threads in the pool is gone. Mossstly
they will, but not always.
- // see https://github.com/apache/helix/issues/1280
boolean result = TestHelper.verify(() ->
ManagementFactory.getPlatformMBeanServer()
.queryMBeans(new ObjectName("ClusterStatus:*"), exp2).isEmpty(),
TestHelper.WAIT_DURATION);
- Assert.assertTrue(result,
- "A small chance this may fail due to _asyncThread pool in controller
may not shutdown in time. Please check issue 1280 to verify if this is the
case.");
+ Assert.assertTrue(result, "Remaining MBeans: " +
ManagementFactory.getPlatformMBeanServer()
+ .queryMBeans(new ObjectName("ClusterStatus:*"), exp2).toString());
}
}