This is an automated email from the ASF dual-hosted git repository. jackie pushed a commit to branch leader_callback in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit 2568a63818f0dde543b979223ad453d8ca559251 Author: Jackie (Xiaotian) Jiang <[email protected]> AuthorDate: Tue May 28 10:54:10 2019 -0700 Make ControllerLeadershipManager thread-safe and register it to PARTICIPANT HelixManager ControllerLeadershipManager should be registered to PARTICIPANT HelixManager instead of CONTROLLER HelixManager so that the Helix callbacks and custom callbacks won't affect each other. --- .../controller/ControllerLeadershipManager.java | 72 +++++++++++----------- .../apache/pinot/controller/ControllerStarter.java | 13 ++-- 2 files changed, 43 insertions(+), 42 deletions(-) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerLeadershipManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerLeadershipManager.java index 035b22d..9e244d8 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerLeadershipManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerLeadershipManager.java @@ -18,10 +18,10 @@ */ package org.apache.pinot.controller; +import java.util.HashMap; import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; +import javax.annotation.concurrent.ThreadSafe; import org.apache.helix.HelixManager; -import org.apache.helix.api.listeners.ControllerChangeListener; import org.apache.pinot.common.metrics.ControllerGauge; import org.apache.pinot.common.metrics.ControllerMetrics; import org.slf4j.Logger; @@ -29,39 +29,55 @@ import org.slf4j.LoggerFactory; /** - * Single place for listening on controller changes - * This should be created at controller startup and everyone who wants to listen to controller changes should subscribe + * Single place for listening on controller changes. + * This should be created at controller startup and everyone who wants to listen to controller changes should subscribe. */ +@ThreadSafe public class ControllerLeadershipManager { - private static final Logger LOGGER = LoggerFactory.getLogger(ControllerLeadershipManager.class); - private HelixManager _helixManager; - private ControllerMetrics _controllerMetrics; - private volatile boolean _amILeader = false; + private final HelixManager _helixManager; + private final ControllerMetrics _controllerMetrics; - private Map<String, LeadershipChangeSubscriber> _subscribers = new ConcurrentHashMap<>(); + private Map<String, LeadershipChangeSubscriber> _subscribers = new HashMap<>(); + private boolean _amILeader = false; public ControllerLeadershipManager(HelixManager helixManager, ControllerMetrics controllerMetrics) { _helixManager = helixManager; _controllerMetrics = controllerMetrics; - _helixManager.addControllerListener((ControllerChangeListener) notificationContext -> onControllerChange()); _controllerMetrics.setValueOfGlobalGauge(ControllerGauge.PINOT_CONTROLLER_LEADER, 0L); } /** - * When stopping this service, if the controller is leader, invoke {@link ControllerLeadershipManager#onBecomingNonLeader()} + * Subscribes to changes in the controller leadership. + * <p>If controller is already leader, invoke {@link LeadershipChangeSubscriber#onBecomingLeader()} + */ + public synchronized void subscribe(String name, LeadershipChangeSubscriber subscriber) { + LOGGER.info("{} subscribing to leadership changes", name); + _subscribers.put(name, subscriber); + if (_amILeader) { + subscriber.onBecomingLeader(); + } + } + + public boolean isLeader() { + return _amILeader; + } + + /** + * Stops the service. + * <p>If controller is leader, invoke {@link ControllerLeadershipManager#onBecomingNonLeader()} */ - public void stop() { + public synchronized void stop() { if (_amILeader) { onBecomingNonLeader(); } } /** - * Callback on changes in the controller + * Callback on changes in the controller. Should be registered to the controller callback. */ - protected void onControllerChange() { + synchronized void onControllerChange() { if (_helixManager.isLeader()) { if (!_amILeader) { _amILeader = true; @@ -81,35 +97,21 @@ public class ControllerLeadershipManager { } } - public boolean isLeader() { - return _amILeader; - } - private void onBecomingLeader() { long startTimeMs = System.currentTimeMillis(); _controllerMetrics.setValueOfGlobalGauge(ControllerGauge.PINOT_CONTROLLER_LEADER, 1L); - _subscribers.forEach((k, v) -> v.onBecomingLeader()); - LOGGER.info("Finished on becoming leader in {}ms", (System.currentTimeMillis() - startTimeMs)); + for (LeadershipChangeSubscriber subscriber : _subscribers.values()) { + subscriber.onBecomingLeader(); + } + LOGGER.info("Finished on becoming leader in {}ms", System.currentTimeMillis() - startTimeMs); } private void onBecomingNonLeader() { long startTimeMs = System.currentTimeMillis(); _controllerMetrics.setValueOfGlobalGauge(ControllerGauge.PINOT_CONTROLLER_LEADER, 0L); - _subscribers.forEach((k, v) -> v.onBecomingNonLeader()); - LOGGER.info("Finished on becoming non-leader in {}ms", (System.currentTimeMillis() - startTimeMs)); - } - - /** - * Subscribe to changes in the controller leadership - * If controller is already leader, invoke {@link LeadershipChangeSubscriber#onBecomingLeader()} - * @param name - * @param subscriber - */ - public void subscribe(String name, LeadershipChangeSubscriber subscriber) { - LOGGER.info("{} subscribing to leadership changes", name); - _subscribers.put(name, subscriber); - if (_amILeader) { - subscriber.onBecomingLeader(); + for (LeadershipChangeSubscriber subscriber : _subscribers.values()) { + subscriber.onBecomingNonLeader(); } + LOGGER.info("Finished on becoming non-leader in {}ms", System.currentTimeMillis() - startTimeMs); } } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerStarter.java b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerStarter.java index 7e09e7a..a3f507d 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerStarter.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerStarter.java @@ -37,6 +37,7 @@ import org.apache.commons.httpclient.MultiThreadedHttpConnectionManager; import org.apache.commons.io.FileUtils; import org.apache.helix.HelixManager; import org.apache.helix.SystemPropertyKeys; +import org.apache.helix.api.listeners.ControllerChangeListener; import org.apache.helix.task.TaskDriver; import org.apache.pinot.common.Utils; import org.apache.pinot.common.metrics.ControllerMeter; @@ -229,6 +230,7 @@ public class ControllerStarter { _helixControllerManager.addPreConnectCallback( () -> _controllerMetrics.addMeteredGlobalValue(ControllerMeter.HELIX_ZOOKEEPER_RECONNECTS, 1L)); + _controllerLeadershipManager = new ControllerLeadershipManager(_helixControllerManager, _controllerMetrics); _serviceStatusCallbackList.add(generateServiceStatusCallback(_helixControllerManager)); } @@ -250,13 +252,10 @@ public class ControllerStarter { HelixManager helixParticipantManager = _helixResourceManager.getHelixZkManager(); LOGGER.info("Init controller leadership manager"); - // Note: Currently leadership depends on helix controller, thus assign helixControllerManager to ControllerLeadershipManager. - // TODO: In the future when Helix separation is completed, leadership only depends on the master in leadControllerResource, and ControllerLeadershipManager will be removed. - if (_helixControllerManager != null) { - _controllerLeadershipManager = new ControllerLeadershipManager(_helixControllerManager, _controllerMetrics); - } else { - _controllerLeadershipManager = new ControllerLeadershipManager(helixParticipantManager, _controllerMetrics); - } + // TODO: when Helix separation is completed, leadership only depends on the master in leadControllerResource, remove + // ControllerLeadershipManager and this callback. + helixParticipantManager.addControllerListener( + (ControllerChangeListener) changeContext -> _controllerLeadershipManager.onControllerChange()); LOGGER.info("Starting task resource manager"); _helixTaskResourceManager = new PinotHelixTaskResourceManager(new TaskDriver(helixParticipantManager)); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
