This is an automated email from the ASF dual-hosted git repository.

jlli pushed a commit to branch add-logic-for-lead-controller-resource
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit d8c51fdbd0fffe3db199d5e9ce4db1e0b4a05113
Author: jackjlli <[email protected]>
AuthorDate: Fri Jun 14 16:35:14 2019 -0700

    Add logic for lead controller resource on controller side
---
 .../pinot/common/utils/helix/HelixHelper.java      |   9 ++
 .../controller/ControllerLeadershipManager.java    | 117 ---------------------
 .../apache/pinot/controller/ControllerStarter.java |  69 ++++++------
 .../pinot/controller/LeadControllerManager.java    |  74 +++++++++++++
 .../controller/LeadershipChangeSubscriber.java     |  35 ------
 .../PinotSegmentUploadRestletResource.java         |   7 +-
 .../controller/api/upload/SegmentValidator.java    |  10 +-
 .../controller/helix/SegmentStatusChecker.java     |   8 +-
 .../helix/core/PinotHelixResourceManager.java      |  28 ++++-
 .../helix/core/minion/PinotTaskManager.java        |   8 +-
 .../core/periodictask/ControllerPeriodicTask.java  |  10 +-
 .../ControllerPeriodicTaskScheduler.java           |  60 -----------
 .../realtime/PinotLLCRealtimeSegmentManager.java   |  32 +++---
 .../core/realtime/PinotRealtimeSegmentManager.java |  43 +++-----
 .../core/realtime/SegmentCompletionManager.java    |  54 +++++-----
 .../core/relocation/RealtimeSegmentRelocator.java  |   8 +-
 .../helix/core/retention/RetentionManager.java     |   8 +-
 .../core/statemodel/LeadControllerChecker.java     |  57 ++++++++++
 ...rollerResourceMasterSlaveStateModelFactory.java |  64 +++++++++++
 .../BrokerResourceValidationManager.java           |   5 +-
 .../validation/OfflineSegmentIntervalChecker.java  |  13 ++-
 .../RealtimeSegmentValidationManager.java          |   8 +-
 .../controller/validation/StorageQuotaChecker.java |  14 +--
 .../controller/helix/PinotControllerModeTest.java  |  16 ++-
 .../controller/helix/SegmentStatusCheckerTest.java |  73 +++++++++++--
 .../periodictask/ControllerPeriodicTaskTest.java   |   7 +-
 .../PinotLLCRealtimeSegmentManagerTest.java        |   7 +-
 .../helix/core/realtime/SegmentCompletionTest.java |  10 +-
 .../relocation/RealtimeSegmentRelocatorTest.java   |  14 ++-
 .../helix/core/retention/RetentionManagerTest.java |  12 ++-
 .../validation/StorageQuotaCheckerTest.java        |  27 +++--
 .../server/realtime/ControllerLeaderLocator.java   |  17 +--
 32 files changed, 509 insertions(+), 415 deletions(-)

diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/HelixHelper.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/HelixHelper.java
index 21f2153..b000246 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/HelixHelper.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/HelixHelper.java
@@ -500,4 +500,13 @@ public class HelixHelper {
   public static Set<String> getBrokerInstancesForTenant(List<InstanceConfig> 
instanceConfigs, String tenant) {
     return new HashSet<>(HelixHelper.getInstancesWithTag(instanceConfigs, 
TagNameUtils.getBrokerTagForTenant(tenant)));
   }
+
+  /**
+   * Gets hash code for table: the {@link String#hashCode()} of the given name, which may be negative.
+   * @param rawTableName table name; must not be null
+   * @return hash code (callers taking a modulo must handle negative values)
+   */
+  public static int getHashCodeForTable(String rawTableName) {
+    return rawTableName.hashCode();
+  }
 }
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerLeadershipManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerLeadershipManager.java
deleted file mode 100644
index 7d4705e..0000000
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerLeadershipManager.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.controller;
-
-import java.util.HashMap;
-import java.util.Map;
-import javax.annotation.concurrent.ThreadSafe;
-import org.apache.helix.HelixManager;
-import org.apache.pinot.common.metrics.ControllerGauge;
-import org.apache.pinot.common.metrics.ControllerMetrics;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-/**
- * Single place for listening on controller changes.
- * This should be created at controller startup and everyone who wants to 
listen to controller changes should subscribe.
- */
-@ThreadSafe
-public class ControllerLeadershipManager {
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(ControllerLeadershipManager.class);
-
-  private final HelixManager _helixControllerManager;
-  private final ControllerMetrics _controllerMetrics;
-
-  private Map<String, LeadershipChangeSubscriber> _subscribers = new 
HashMap<>();
-  private boolean _amILeader = false;
-
-  public ControllerLeadershipManager(HelixManager helixControllerManager, 
ControllerMetrics controllerMetrics) {
-    _helixControllerManager = helixControllerManager;
-    _controllerMetrics = controllerMetrics;
-    
_controllerMetrics.setValueOfGlobalGauge(ControllerGauge.PINOT_CONTROLLER_LEADER,
 0L);
-  }
-
-  /**
-   * Subscribes to changes in the controller leadership.
-   * <p>If controller is already leader, invoke {@link 
LeadershipChangeSubscriber#onBecomingLeader()}
-   */
-  public synchronized void subscribe(String name, LeadershipChangeSubscriber 
subscriber) {
-    LOGGER.info("{} subscribing to leadership changes", name);
-    _subscribers.put(name, subscriber);
-    if (_amILeader) {
-      subscriber.onBecomingLeader();
-    }
-  }
-
-  public boolean isLeader() {
-    return _amILeader;
-  }
-
-  /**
-   * Stops the service.
-   * <p>If controller is leader, invoke {@link 
ControllerLeadershipManager#onBecomingNonLeader()}
-   */
-  public synchronized void stop() {
-    if (_amILeader) {
-      onBecomingNonLeader();
-    }
-  }
-
-  /**
-   * Callback on changes in the controller. Should be registered to the 
controller callback.
-   */
-  synchronized void onControllerChange() {
-    if (_helixControllerManager.isLeader()) {
-      if (!_amILeader) {
-        _amILeader = true;
-        LOGGER.info("Became leader");
-        onBecomingLeader();
-      } else {
-        LOGGER.info("Already leader. Duplicate notification");
-      }
-    } else {
-      if (_amILeader) {
-        _amILeader = false;
-        LOGGER.info("Lost leadership");
-        onBecomingNonLeader();
-      } else {
-        LOGGER.info("Already not leader. Duplicate notification");
-      }
-    }
-  }
-
-  private void onBecomingLeader() {
-    long startTimeMs = System.currentTimeMillis();
-    
_controllerMetrics.setValueOfGlobalGauge(ControllerGauge.PINOT_CONTROLLER_LEADER,
 1L);
-    for (LeadershipChangeSubscriber subscriber : _subscribers.values()) {
-      subscriber.onBecomingLeader();
-    }
-    LOGGER.info("Finished on becoming leader in {}ms", 
System.currentTimeMillis() - startTimeMs);
-  }
-
-  private void onBecomingNonLeader() {
-    long startTimeMs = System.currentTimeMillis();
-    
_controllerMetrics.setValueOfGlobalGauge(ControllerGauge.PINOT_CONTROLLER_LEADER,
 0L);
-    for (LeadershipChangeSubscriber subscriber : _subscribers.values()) {
-      subscriber.onBecomingNonLeader();
-    }
-    LOGGER.info("Finished on becoming non-leader in {}ms", 
System.currentTimeMillis() - startTimeMs);
-  }
-}
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerStarter.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerStarter.java
index c313616..f7eb2a0 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerStarter.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerStarter.java
@@ -37,7 +37,6 @@ import 
org.apache.commons.httpclient.MultiThreadedHttpConnectionManager;
 import org.apache.commons.io.FileUtils;
 import org.apache.helix.HelixManager;
 import org.apache.helix.SystemPropertyKeys;
-import org.apache.helix.api.listeners.ControllerChangeListener;
 import org.apache.helix.task.TaskDriver;
 import org.apache.pinot.common.Utils;
 import org.apache.pinot.common.metrics.ControllerMeter;
@@ -54,7 +53,6 @@ import org.apache.pinot.controller.helix.SegmentStatusChecker;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import 
org.apache.pinot.controller.helix.core.minion.PinotHelixTaskResourceManager;
 import org.apache.pinot.controller.helix.core.minion.PinotTaskManager;
-import 
org.apache.pinot.controller.helix.core.periodictask.ControllerPeriodicTaskScheduler;
 import 
org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager;
 import 
org.apache.pinot.controller.helix.core.realtime.PinotRealtimeSegmentManager;
 import 
org.apache.pinot.controller.helix.core.realtime.SegmentCompletionManager;
@@ -68,6 +66,7 @@ import 
org.apache.pinot.controller.validation.OfflineSegmentIntervalChecker;
 import org.apache.pinot.controller.validation.RealtimeSegmentValidationManager;
 import org.apache.pinot.core.crypt.PinotCrypterFactory;
 import org.apache.pinot.core.periodictask.PeriodicTask;
+import org.apache.pinot.core.periodictask.PeriodicTaskScheduler;
 import org.apache.pinot.filesystem.PinotFSFactory;
 import org.glassfish.hk2.utilities.binding.AbstractBinder;
 import org.slf4j.Logger;
@@ -107,12 +106,12 @@ public class ControllerStarter {
   private RetentionManager _retentionManager;
   private SegmentStatusChecker _segmentStatusChecker;
   private PinotTaskManager _taskManager;
-  private ControllerPeriodicTaskScheduler _controllerPeriodicTaskScheduler;
+  private PeriodicTaskScheduler _periodicTaskScheduler;
   private PinotHelixTaskResourceManager _helixTaskResourceManager;
   private PinotRealtimeSegmentManager _realtimeSegmentsManager;
   private PinotLLCRealtimeSegmentManager _pinotLLCRealtimeSegmentManager;
   private SegmentCompletionManager _segmentCompletionManager;
-  private ControllerLeadershipManager _controllerLeadershipManager;
+  private LeadControllerManager _leadControllerManager;
   private List<ServiceStatus.ServiceStatusCallback> _serviceStatusCallbackList;
 
   public ControllerStarter(ControllerConf conf) {
@@ -184,10 +183,6 @@ public class ControllerStarter {
     return _taskManager;
   }
 
-  public ControllerLeadershipManager getControllerLeadershipManager() {
-    return _controllerLeadershipManager;
-  }
-
   public void start() {
     LOGGER.info("Starting Pinot controller in mode: {}.", 
_controllerMode.name());
     Utils.logVersions();
@@ -230,9 +225,6 @@ public class ControllerStarter {
         () -> 
_controllerMetrics.addMeteredGlobalValue(ControllerMeter.HELIX_ZOOKEEPER_RECONNECTS,
 1L));
 
     
_serviceStatusCallbackList.add(generateServiceStatusCallback(_helixControllerManager));
-
-    LOGGER.info("Initializing controller leadership manager");
-    _controllerLeadershipManager = new 
ControllerLeadershipManager(_helixControllerManager, _controllerMetrics);
   }
 
   private void setUpPinotController() {
@@ -255,30 +247,25 @@ public class ControllerStarter {
     _helixResourceManager.start();
     HelixManager helixParticipantManager = 
_helixResourceManager.getHelixZkManager();
 
-    LOGGER.info("Registering controller leadership manager");
-    // TODO: when Helix separation is completed, leadership only depends on 
the master in leadControllerResource, remove
-    //       ControllerLeadershipManager and this callback.
-    helixParticipantManager.addControllerListener(
-        (ControllerChangeListener) changeContext -> 
_controllerLeadershipManager.onControllerChange());
+    // Get lead controller manager from resource manager.
+    _leadControllerManager = _helixResourceManager.getLeadControllerManager();
 
     LOGGER.info("Starting task resource manager");
     _helixTaskResourceManager = new PinotHelixTaskResourceManager(new 
TaskDriver(helixParticipantManager));
 
     // Helix resource manager must be started in order to create 
PinotLLCRealtimeSegmentManager
     LOGGER.info("Starting realtime segment manager");
-
     _pinotLLCRealtimeSegmentManager =
-        new PinotLLCRealtimeSegmentManager(_helixResourceManager, _config, 
_controllerMetrics,
-            _controllerLeadershipManager);
-    // TODO: Need to put this inside HelixResourceManager when 
ControllerLeadershipManager is removed.
+        new PinotLLCRealtimeSegmentManager(_helixResourceManager, _config, 
_controllerMetrics, _leadControllerManager);
+    // TODO: Need to put this inside HelixResourceManager when 
HelixControllerLeadershipManager is removed.
     
_helixResourceManager.registerPinotLLCRealtimeSegmentManager(_pinotLLCRealtimeSegmentManager);
     _segmentCompletionManager =
         new SegmentCompletionManager(helixParticipantManager, 
_pinotLLCRealtimeSegmentManager, _controllerMetrics,
-            _controllerLeadershipManager, 
_config.getSegmentCommitTimeoutSeconds());
+            _leadControllerManager, _config.getSegmentCommitTimeoutSeconds());
 
     if (_config.getHLCTablesAllowed()) {
       LOGGER.info("Realtime tables with High Level consumers will be 
supported");
-      _realtimeSegmentsManager = new 
PinotRealtimeSegmentManager(_helixResourceManager, 
_controllerLeadershipManager);
+      _realtimeSegmentsManager = new 
PinotRealtimeSegmentManager(_helixResourceManager, _leadControllerManager);
       _realtimeSegmentsManager.start(_controllerMetrics);
     } else {
       LOGGER.info("Realtime tables with High Level consumers will NOT be 
supported");
@@ -288,8 +275,10 @@ public class ControllerStarter {
     // Setting up periodic tasks
     List<PeriodicTask> controllerPeriodicTasks = 
setupControllerPeriodicTasks();
     LOGGER.info("Init controller periodic tasks scheduler");
-    _controllerPeriodicTaskScheduler = new ControllerPeriodicTaskScheduler();
-    _controllerPeriodicTaskScheduler.init(controllerPeriodicTasks, 
_controllerLeadershipManager);
+    _periodicTaskScheduler = new PeriodicTaskScheduler();
+    _periodicTaskScheduler.init(controllerPeriodicTasks);
+
+    _periodicTaskScheduler.start();
 
     LOGGER.info("Registering rebalance segments factory");
     _helixResourceManager
@@ -327,7 +316,7 @@ public class ControllerStarter {
         bind(_controllerMetrics).to(ControllerMetrics.class);
         bind(accessControlFactory).to(AccessControlFactory.class);
         
bind(metadataEventNotifierFactory).to(MetadataEventNotifierFactory.class);
-        
bind(_controllerLeadershipManager).to(ControllerLeadershipManager.class);
+        bind(_leadControllerManager).to(LeadControllerManager.class);
       }
     });
 
@@ -433,24 +422,29 @@ public class ControllerStarter {
   protected List<PeriodicTask> setupControllerPeriodicTasks() {
     LOGGER.info("Setting up periodic tasks");
     List<PeriodicTask> periodicTasks = new ArrayList<>();
-    _taskManager = new PinotTaskManager(_helixTaskResourceManager, 
_helixResourceManager, _config, _controllerMetrics);
+    _taskManager =
+        new PinotTaskManager(_helixTaskResourceManager, _helixResourceManager, 
_leadControllerManager, _config,
+            _controllerMetrics);
     periodicTasks.add(_taskManager);
-    _retentionManager = new RetentionManager(_helixResourceManager, _config, 
_controllerMetrics);
+    _retentionManager =
+        new RetentionManager(_helixResourceManager, _leadControllerManager, 
_config, _controllerMetrics);
     periodicTasks.add(_retentionManager);
     _offlineSegmentIntervalChecker =
-        new OfflineSegmentIntervalChecker(_config, _helixResourceManager, new 
ValidationMetrics(_metricsRegistry),
-            _controllerMetrics);
+        new OfflineSegmentIntervalChecker(_config, _helixResourceManager, 
_leadControllerManager,
+            new ValidationMetrics(_metricsRegistry), _controllerMetrics);
     periodicTasks.add(_offlineSegmentIntervalChecker);
     _realtimeSegmentValidationManager =
-        new RealtimeSegmentValidationManager(_config, _helixResourceManager, 
_pinotLLCRealtimeSegmentManager,
-            new ValidationMetrics(_metricsRegistry), _controllerMetrics);
+        new RealtimeSegmentValidationManager(_config, _helixResourceManager, 
_leadControllerManager,
+            _pinotLLCRealtimeSegmentManager, new 
ValidationMetrics(_metricsRegistry), _controllerMetrics);
     periodicTasks.add(_realtimeSegmentValidationManager);
     _brokerResourceValidationManager =
-        new BrokerResourceValidationManager(_config, _helixResourceManager, 
_controllerMetrics);
+        new BrokerResourceValidationManager(_config, _helixResourceManager, 
_leadControllerManager, _controllerMetrics);
     periodicTasks.add(_brokerResourceValidationManager);
-    _segmentStatusChecker = new SegmentStatusChecker(_helixResourceManager, 
_config, _controllerMetrics);
+    _segmentStatusChecker =
+        new SegmentStatusChecker(_helixResourceManager, 
_leadControllerManager, _config, _controllerMetrics);
     periodicTasks.add(_segmentStatusChecker);
-    _realtimeSegmentRelocator = new 
RealtimeSegmentRelocator(_helixResourceManager, _config, _controllerMetrics);
+    _realtimeSegmentRelocator =
+        new RealtimeSegmentRelocator(_helixResourceManager, 
_leadControllerManager, _config, _controllerMetrics);
     periodicTasks.add(_realtimeSegmentRelocator);
 
     return periodicTasks;
@@ -478,9 +472,10 @@ public class ControllerStarter {
 
   private void stopPinotController() {
     try {
-      // Stopping ControllerLeadershipManager has to be done before stopping 
HelixResourceManager.
-      LOGGER.info("Stopping controller leadership manager");
-      _controllerLeadershipManager.stop();
+      // Stopping periodic tasks has to be done before stopping 
HelixResourceManager.
+      // Stop controller periodic task.
+      LOGGER.info("Stopping controller periodic tasks");
+      _periodicTaskScheduler.stop();
 
       // Stop PinotLLCSegmentManager before stopping Jersey API. It is 
possible that stopping Jersey API
       // may interrupt the handlers waiting on an I/O.
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/LeadControllerManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/LeadControllerManager.java
new file mode 100644
index 0000000..3ec507e
--- /dev/null
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/LeadControllerManager.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller;
+
+import org.apache.pinot.common.config.TableNameBuilder;
+import org.apache.pinot.common.utils.helix.HelixHelper;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.controller.helix.core.statemodel.LeadControllerChecker;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static 
org.apache.pinot.common.utils.CommonConstants.Helix.NUMBER_OF_PARTITIONS_IN_LEAD_CONTROLLER_RESOURCE;
+
+
+/**
+ * Answers per-table leadership queries from tracked partition leadership, else Helix leadership.
+ */
+public class LeadControllerManager {
+  private static final Logger LOGGER = LoggerFactory.getLogger(LeadControllerManager.class);
+
+  private final LeadControllerChecker _leadControllerChecker;
+  private PinotHelixResourceManager _pinotHelixResourceManager;
+
+  public LeadControllerManager() {
+    _leadControllerChecker = new LeadControllerChecker();
+  }
+
+  public void registerResourceManager(PinotHelixResourceManager pinotHelixResourceManager) {
+    _pinotHelixResourceManager = pinotHelixResourceManager;
+  }
+
+  /**
+   * Checks whether the current controller is the leader for the given table. Returns true if it leads
+   * the partition the table hashes to; otherwise returns whether it is the Helix leader.
+   * Math.floorMod is used because the table hash code can be negative and % would keep that sign.
+   * @param tableName table name with/without table type.
+   */
+  public boolean isLeaderForTable(String tableName) {
+    int tableHash = HelixHelper.getHashCodeForTable(TableNameBuilder.extractRawTableName(tableName));
+    int partitionIndex = Math.floorMod(tableHash, NUMBER_OF_PARTITIONS_IN_LEAD_CONTROLLER_RESOURCE);
+    return _leadControllerChecker.isPartitionLeader(partitionIndex) || isHelixLeader();
+  }
+
+  public synchronized void addPartitionLeader(String partitionName) {
+    _leadControllerChecker.addPartitionLeader(partitionName);
+  }
+
+  public synchronized void removePartitionLeader(String partitionName) {
+    _leadControllerChecker.removePartitionLeader(partitionName);
+  }
+
+  private boolean isHelixLeader() {
+    return _pinotHelixResourceManager != null && _pinotHelixResourceManager.isHelixLeader();
+  }
+
+  public void onLeadControllerChange() {
+  }
+}
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/LeadershipChangeSubscriber.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/LeadershipChangeSubscriber.java
deleted file mode 100644
index d2c646f..0000000
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/LeadershipChangeSubscriber.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.controller;
-
-/**
- * Interface for a subscriber to the {@link ControllerLeadershipManager}
- */
-public interface LeadershipChangeSubscriber {
-
-  /**
-   * Callback to invoke on becoming leader
-   */
-  void onBecomingLeader();
-
-  /**
-   * Callback to invoke on losing leadership
-   */
-  void onBecomingNonLeader();
-}
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadRestletResource.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadRestletResource.java
index 5259d9f..bd4b8a0 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadRestletResource.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadRestletResource.java
@@ -57,7 +57,6 @@ import javax.ws.rs.core.Response;
 import org.apache.commons.httpclient.HttpConnectionManager;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.IOUtils;
-import org.apache.helix.ZNRecord;
 import org.apache.helix.model.IdealState;
 import org.apache.pinot.common.config.TableNameBuilder;
 import org.apache.pinot.common.metrics.ControllerMeter;
@@ -70,7 +69,7 @@ import org.apache.pinot.common.utils.JsonUtils;
 import org.apache.pinot.common.utils.StringUtil;
 import org.apache.pinot.common.utils.helix.HelixHelper;
 import org.apache.pinot.controller.ControllerConf;
-import org.apache.pinot.controller.ControllerLeadershipManager;
+import org.apache.pinot.controller.LeadControllerManager;
 import org.apache.pinot.controller.api.access.AccessControl;
 import org.apache.pinot.controller.api.access.AccessControlFactory;
 import org.apache.pinot.controller.api.upload.SegmentValidator;
@@ -117,7 +116,7 @@ public class PinotSegmentUploadRestletResource {
   AccessControlFactory _accessControlFactory;
 
   @Inject
-  ControllerLeadershipManager _controllerLeadershipManager;
+  LeadControllerManager _leadControllerManager;
 
   @GET
   @Produces(MediaType.APPLICATION_JSON)
@@ -325,7 +324,7 @@ public class PinotSegmentUploadRestletResource {
       // Validate segment
       SegmentValidatorResponse segmentValidatorResponse =
           new SegmentValidator(_pinotHelixResourceManager, _controllerConf, 
_executor, _connectionManager,
-              _controllerMetrics, _controllerLeadershipManager)
+              _controllerMetrics, _leadControllerManager)
               .validateSegment(rawTableName, segmentMetadata, tempSegmentDir);
 
       // Zk operations
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/SegmentValidator.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/SegmentValidator.java
index 39a9657..42c7cf8 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/SegmentValidator.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/SegmentValidator.java
@@ -34,7 +34,7 @@ import org.apache.pinot.common.metrics.ControllerMetrics;
 import org.apache.pinot.common.segment.SegmentMetadata;
 import org.apache.pinot.common.utils.time.TimeUtils;
 import org.apache.pinot.controller.ControllerConf;
-import org.apache.pinot.controller.ControllerLeadershipManager;
+import org.apache.pinot.controller.LeadControllerManager;
 import 
org.apache.pinot.controller.api.resources.ControllerApplicationException;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import org.apache.pinot.controller.util.TableSizeReader;
@@ -55,17 +55,17 @@ public class SegmentValidator {
   private final Executor _executor;
   private final HttpConnectionManager _connectionManager;
   private final ControllerMetrics _controllerMetrics;
-  private final ControllerLeadershipManager _controllerLeadershipManager;
+  private final LeadControllerManager _leadControllerManager;
 
   public SegmentValidator(PinotHelixResourceManager pinotHelixResourceManager, 
ControllerConf controllerConf,
       Executor executor, HttpConnectionManager connectionManager, 
ControllerMetrics controllerMetrics,
-      ControllerLeadershipManager controllerLeadershipManager) {
+      LeadControllerManager leadControllerManager) {
     _pinotHelixResourceManager = pinotHelixResourceManager;
     _controllerConf = controllerConf;
     _executor = executor;
     _connectionManager = connectionManager;
     _controllerMetrics = controllerMetrics;
-    _controllerLeadershipManager = controllerLeadershipManager;
+    _leadControllerManager = leadControllerManager;
   }
 
   public SegmentValidatorResponse validateSegment(String rawTableName, 
SegmentMetadata segmentMetadata,
@@ -135,7 +135,7 @@ public class SegmentValidator {
         new TableSizeReader(_executor, _connectionManager, _controllerMetrics, 
_pinotHelixResourceManager);
     StorageQuotaChecker quotaChecker =
         new StorageQuotaChecker(offlineTableConfig, tableSizeReader, 
_controllerMetrics, _pinotHelixResourceManager,
-            _controllerLeadershipManager);
+            _leadControllerManager);
     return quotaChecker.isSegmentStorageWithinQuota(segmentFile, 
metadata.getName(),
         _controllerConf.getServerAdminRequestTimeoutSeconds() * 1000);
   }
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java
index f40e2b3..9261420 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java
@@ -29,6 +29,7 @@ import org.apache.pinot.common.metrics.ControllerGauge;
 import org.apache.pinot.common.metrics.ControllerMetrics;
 import org.apache.pinot.common.utils.CommonConstants.Helix.TableType;
 import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.controller.LeadControllerManager;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import 
org.apache.pinot.controller.helix.core.periodictask.ControllerPeriodicTask;
 import org.slf4j.Logger;
@@ -56,10 +57,11 @@ public class SegmentStatusChecker extends 
ControllerPeriodicTask<SegmentStatusCh
    * @param pinotHelixResourceManager The resource checker used to interact 
with Helix
    * @param config The controller configuration object
    */
-  public SegmentStatusChecker(PinotHelixResourceManager 
pinotHelixResourceManager, ControllerConf config,
-      ControllerMetrics controllerMetrics) {
+  public SegmentStatusChecker(PinotHelixResourceManager 
pinotHelixResourceManager,
+      LeadControllerManager leadControllerManager, ControllerConf config, 
ControllerMetrics controllerMetrics) {
     super("SegmentStatusChecker", config.getStatusCheckerFrequencyInSeconds(),
-        config.getStatusCheckerInitialDelayInSeconds(), 
pinotHelixResourceManager, controllerMetrics);
+        config.getStatusCheckerInitialDelayInSeconds(), 
pinotHelixResourceManager, leadControllerManager,
+        controllerMetrics);
 
     _waitForPushTimeSeconds = 
config.getStatusCheckerWaitForPushTimeInSeconds();
   }
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index a4d4d0b..cb68304 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -53,7 +53,6 @@ import org.apache.helix.PropertyKey;
 import org.apache.helix.PropertyKey.Builder;
 import org.apache.helix.PropertyPathBuilder;
 import org.apache.helix.ZNRecord;
-import org.apache.helix.examples.MasterSlaveStateModelFactory;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
 import org.apache.helix.manager.zk.ZkCacheBaseDataAccessor;
 import org.apache.helix.model.CurrentState;
@@ -99,6 +98,7 @@ import 
org.apache.pinot.common.utils.helix.PinotHelixPropertyStoreZnRecordProvid
 import org.apache.pinot.common.utils.retry.RetryPolicies;
 import org.apache.pinot.common.utils.retry.RetryPolicy;
 import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.controller.LeadControllerManager;
 import org.apache.pinot.controller.api.pojos.Instance;
 import 
org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager;
 import 
org.apache.pinot.controller.helix.core.rebalance.RebalanceSegmentStrategy;
@@ -106,6 +106,7 @@ import 
org.apache.pinot.controller.helix.core.rebalance.RebalanceSegmentStrategy
 import 
org.apache.pinot.controller.helix.core.sharding.SegmentAssignmentStrategy;
 import 
org.apache.pinot.controller.helix.core.sharding.SegmentAssignmentStrategyEnum;
 import 
org.apache.pinot.controller.helix.core.sharding.SegmentAssignmentStrategyFactory;
+import 
org.apache.pinot.controller.helix.core.statemodel.LeadControllerResourceMasterSlaveStateModelFactory;
 import org.apache.pinot.controller.helix.core.util.ZKMetadataUtils;
 import org.apache.pinot.controller.helix.starter.HelixConfig;
 import org.apache.pinot.core.realtime.stream.StreamConfig;
@@ -144,6 +145,7 @@ public class PinotHelixResourceManager {
   private PinotLLCRealtimeSegmentManager _pinotLLCRealtimeSegmentManager;
   private RebalanceSegmentStrategyFactory _rebalanceSegmentStrategyFactory;
   private TableRebalancer _tableRebalancer;
+  private LeadControllerManager _leadControllerManager;
 
   public PinotHelixResourceManager(@Nonnull String zkURL, @Nonnull String 
helixClusterName,
       @Nonnull String controllerInstanceId, String dataDir, long 
externalViewOnlineToOfflineTimeoutMillis,
@@ -170,6 +172,8 @@ public class PinotHelixResourceManager {
    * Create Helix cluster if needed, and then start a Pinot controller 
instance.
    */
   public synchronized void start() {
+    // LeadControllerManager needs to be initialized before registering as a Helix participant.
+    _leadControllerManager = new LeadControllerManager();
     _helixZkManager = registerAndConnectAsHelixParticipant();
     _helixAdmin = _helixZkManager.getClusterManagmentTool();
     _propertyStore = _helixZkManager.getHelixPropertyStore();
@@ -189,6 +193,7 @@ public class PinotHelixResourceManager {
     _segmentDeletionManager = new SegmentDeletionManager(_dataDir, 
_helixAdmin, _helixClusterName, _propertyStore);
     ZKMetadataProvider.setClusterTenantIsolationEnabled(_propertyStore, 
_isSingleTenantCluster);
     _tableRebalancer = new TableRebalancer(_helixZkManager, _helixAdmin, 
_helixClusterName);
+    _leadControllerManager.registerResourceManager(this);
   }
 
   /**
@@ -255,6 +260,16 @@ public class PinotHelixResourceManager {
     return _propertyStore;
   }
 
+
+  /**
+   * Returns the lead controller manager, which is instantiated in {@link #start()}.
+   *
+   * @return lead controller manager
+   */
+  public LeadControllerManager getLeadControllerManager() {
+    return _leadControllerManager;
+  }
+
   /**
    * Register and connect to Helix cluster as PARTICIPANT role.
    */
@@ -263,8 +278,8 @@ public class PinotHelixResourceManager {
         HelixManagerFactory.getZKHelixManager(_helixClusterName, _instanceId, 
InstanceType.PARTICIPANT, _helixZkURL);
 
     // Registers Master-Slave state model to state machine engine, which is 
for calculating participant assignment in lead controller resource.
-    helixManager.getStateMachineEngine()
-        .registerStateModelFactory(MasterSlaveSMD.name, new 
MasterSlaveStateModelFactory());
+    
helixManager.getStateMachineEngine().registerStateModelFactory(MasterSlaveSMD.name,
+        new 
LeadControllerResourceMasterSlaveStateModelFactory(_leadControllerManager));
 
     try {
       helixManager.connect();
@@ -2372,6 +2387,23 @@ public class PinotHelixResourceManager {
     return endpointToInstance;
   }
 
+  /**
+   * Checks whether this controller instance is the current Helix leader of the cluster.
+   *
+   * @return {@code true} if the controller leader record read from Helix, prefixed with the controller instance
+   *         prefix, equals this instance's id; {@code false} otherwise, including when no leader record exists
+   */
+  public boolean isHelixLeader() {
+    PropertyKey propertyKey = _keyBuilder.controllerLeader();
+    LiveInstance liveInstance = _helixDataAccessor.getProperty(propertyKey);
+    if (liveInstance == null) {
+      // No leader record is present (e.g. leader election has not completed), so this instance is not the leader.
+      return false;
+    }
+    String helixLeaderInstanceId = liveInstance.getInstanceName();
+    return _instanceId.equals(CommonConstants.Helix.PREFIX_OF_CONTROLLER_INSTANCE + helixLeaderInstanceId);
+  }
+
   /*
    * Uncomment and use for testing on a real cluster
   public static void main(String[] args) throws Exception {
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
index 7a09c00..649b966 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
@@ -29,6 +29,7 @@ import org.apache.pinot.common.config.TableTaskConfig;
 import org.apache.pinot.common.metrics.ControllerMeter;
 import org.apache.pinot.common.metrics.ControllerMetrics;
 import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.controller.LeadControllerManager;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import 
org.apache.pinot.controller.helix.core.minion.generator.PinotTaskGenerator;
 import 
org.apache.pinot.controller.helix.core.minion.generator.TaskGeneratorRegistry;
@@ -51,10 +52,11 @@ public class PinotTaskManager extends 
ControllerPeriodicTask<Void> {
   private final TaskGeneratorRegistry _taskGeneratorRegistry;
 
   public PinotTaskManager(PinotHelixTaskResourceManager 
helixTaskResourceManager,
-      PinotHelixResourceManager helixResourceManager, ControllerConf 
controllerConf,
-      ControllerMetrics controllerMetrics) {
+      PinotHelixResourceManager helixResourceManager, LeadControllerManager 
leadControllerManager,
+      ControllerConf controllerConf, ControllerMetrics controllerMetrics) {
     super("PinotTaskManager", 
controllerConf.getTaskManagerFrequencyInSeconds(),
-        controllerConf.getPinotTaskManagerInitialDelaySeconds(), 
helixResourceManager, controllerMetrics);
+        controllerConf.getPinotTaskManagerInitialDelaySeconds(), 
helixResourceManager, leadControllerManager,
+        controllerMetrics);
     _helixTaskResourceManager = helixTaskResourceManager;
     _clusterInfoProvider = new ClusterInfoProvider(helixResourceManager, 
helixTaskResourceManager, controllerConf);
     _taskGeneratorRegistry = new TaskGeneratorRegistry(_clusterInfoProvider);
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java
index 7e764f3..a782385 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java
@@ -23,6 +23,7 @@ import javax.annotation.concurrent.ThreadSafe;
 import org.apache.pinot.common.metrics.ControllerGauge;
 import org.apache.pinot.common.metrics.ControllerMeter;
 import org.apache.pinot.common.metrics.ControllerMetrics;
+import org.apache.pinot.controller.LeadControllerManager;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import org.apache.pinot.core.periodictask.BasePeriodicTask;
 import org.slf4j.Logger;
@@ -40,12 +41,15 @@ public abstract class ControllerPeriodicTask<C> extends 
BasePeriodicTask {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(ControllerPeriodicTask.class);
 
   protected final PinotHelixResourceManager _pinotHelixResourceManager;
+  protected final LeadControllerManager _leadControllerManager;
   protected final ControllerMetrics _controllerMetrics;
 
   public ControllerPeriodicTask(String taskName, long runFrequencyInSeconds, 
long initialDelayInSeconds,
-      PinotHelixResourceManager pinotHelixResourceManager, ControllerMetrics 
controllerMetrics) {
+      PinotHelixResourceManager pinotHelixResourceManager, 
LeadControllerManager leadControllerManager,
+      ControllerMetrics controllerMetrics) {
     super(taskName, runFrequencyInSeconds, initialDelayInSeconds);
     _pinotHelixResourceManager = pinotHelixResourceManager;
+    _leadControllerManager = leadControllerManager;
     _controllerMetrics = controllerMetrics;
   }
 
@@ -75,6 +79,10 @@ public abstract class ControllerPeriodicTask<C> extends 
BasePeriodicTask {
         LOGGER.info("Task: {} is stopped, early terminate the task", 
_taskName);
         break;
       }
+      // Check if current controller is the leader for this table.
+      if (!_leadControllerManager.isLeaderForTable(tableNameWithType)) {
+        continue;
+      }
       try {
         processTable(tableNameWithType, context);
       } catch (Exception e) {
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskScheduler.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskScheduler.java
deleted file mode 100644
index 8a2b63c..0000000
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskScheduler.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.controller.helix.core.periodictask;
-
-import java.util.List;
-import org.apache.pinot.controller.ControllerLeadershipManager;
-import org.apache.pinot.controller.LeadershipChangeSubscriber;
-import org.apache.pinot.core.periodictask.PeriodicTask;
-import org.apache.pinot.core.periodictask.PeriodicTaskScheduler;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-/**
- * A {@link PeriodicTaskScheduler} for scheduling {@link 
ControllerPeriodicTask} according to controller leadership changes.
- * Any controllerPeriodicTasks provided during initialization, will run only 
on leadership, and stop when leadership lost
- */
-public class ControllerPeriodicTaskScheduler extends PeriodicTaskScheduler 
implements LeadershipChangeSubscriber {
-
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(ControllerPeriodicTaskScheduler.class);
-
-  /**
-   * Initialize the {@link ControllerPeriodicTaskScheduler} with the list of 
{@link ControllerPeriodicTask} created at startup
-   * This is called only once during controller startup
-   * @param controllerPeriodicTasks
-   * @param controllerLeadershipManager
-   */
-  public void init(List<PeriodicTask> controllerPeriodicTasks, 
ControllerLeadershipManager controllerLeadershipManager) {
-    super.init(controllerPeriodicTasks);
-    
controllerLeadershipManager.subscribe(ControllerPeriodicTaskScheduler.class.getName(),
 this);
-  }
-
-  @Override
-  public void onBecomingLeader() {
-    LOGGER.info("Received callback for controller leadership gain. Starting 
PeriodicTaskScheduler.");
-    start();
-  }
-
-  @Override
-  public void onBecomingNonLeader() {
-    LOGGER.info("Received callback for controller leadership loss. Stopping 
PeriodicTaskScheduler.");
-    stop();
-  }
-}
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index c833261..0982c30 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -66,7 +66,7 @@ import org.apache.pinot.common.utils.StringUtil;
 import org.apache.pinot.common.utils.helix.HelixHelper;
 import org.apache.pinot.common.utils.retry.RetryPolicies;
 import org.apache.pinot.controller.ControllerConf;
-import org.apache.pinot.controller.ControllerLeadershipManager;
+import org.apache.pinot.controller.LeadControllerManager;
 import org.apache.pinot.controller.api.events.MetadataEventNotifierFactory;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import 
org.apache.pinot.controller.helix.core.PinotHelixSegmentOnlineOfflineStateModelGenerator;
@@ -123,13 +123,13 @@ public class PinotLLCRealtimeSegmentManager {
   private final TableConfigCache _tableConfigCache;
   private final StreamPartitionAssignmentGenerator 
_streamPartitionAssignmentGenerator;
   private final FlushThresholdUpdateManager _flushThresholdUpdateManager;
-  private final ControllerLeadershipManager _controllerLeadershipManager;
+  private final LeadControllerManager _leadControllerManager;
 
   private volatile boolean _isStopping = false;
   private AtomicInteger _numCompletingSegments = new AtomicInteger(0);
 
   public PinotLLCRealtimeSegmentManager(PinotHelixResourceManager 
helixResourceManager, ControllerConf controllerConf,
-      ControllerMetrics controllerMetrics, ControllerLeadershipManager 
controllerLeadershipManager) {
+      ControllerMetrics controllerMetrics, LeadControllerManager 
leadControllerManager) {
     _helixAdmin = helixResourceManager.getHelixAdmin();
     _helixManager = helixResourceManager.getHelixZkManager();
     _propertyStore = helixResourceManager.getPropertyStore();
@@ -145,7 +145,7 @@ public class PinotLLCRealtimeSegmentManager {
     _tableConfigCache = new TableConfigCache(_propertyStore);
     _streamPartitionAssignmentGenerator = new 
StreamPartitionAssignmentGenerator(_helixManager);
     _flushThresholdUpdateManager = new FlushThresholdUpdateManager();
-    _controllerLeadershipManager = controllerLeadershipManager;
+    _leadControllerManager = leadControllerManager;
   }
 
 
@@ -182,8 +182,8 @@ public class PinotLLCRealtimeSegmentManager {
     LOGGER.info("Wait completed: Number of completing segments = {}", 
_numCompletingSegments.get());
   }
 
-  protected boolean isLeader() {
-    return _controllerLeadershipManager.isLeader();
+  protected boolean isLeader(String tableName) {
+    return _leadControllerManager.isLeaderForTable(tableName);
   }
 
   protected boolean isConnected() {
@@ -341,10 +341,10 @@ public class PinotLLCRealtimeSegmentManager {
     URI uriToMoveTo = ControllerConf.getUriFromPath(StringUtil.join("/", 
tableDirURI.toString(), segmentName));
     PinotFS pinotFS = PinotFSFactory.create(baseDirURI.getScheme());
 
-    if (!isConnected() || !isLeader()) {
+    if (!isConnected() || !isLeader(tableName)) {
       // We can potentially log a different value than what we saw ....
       LOGGER.warn("Lost leadership while committing segment file {}, {} for 
table {}: isLeader={}, isConnected={}",
-          segmentName, segmentLocation, tableName, isLeader(), isConnected());
+          segmentName, segmentLocation, tableName, isLeader(tableName), 
isConnected());
       
_controllerMetrics.addMeteredGlobalValue(ControllerMeter.CONTROLLER_NOT_LEADER, 
1L);
       return false;
     }
@@ -540,17 +540,17 @@ public class PinotLLCRealtimeSegmentManager {
     final String oldZnodePath =
         
ZKMetadataProvider.constructPropertyStorePathForSegment(realtimeTableName, 
committingSegmentNameStr);
 
-    if (!isConnected() || !isLeader()) {
+    if (!isConnected() || !isLeader(realtimeTableName)) {
       // We can potentially log a different value than what we saw ....
       LOGGER.warn("Lost leadership while committing segment metadata for {} 
for table {}: isLeader={}, isConnected={}",
-          committingSegmentNameStr, realtimeTableName, isLeader(), 
isConnected());
+          committingSegmentNameStr, realtimeTableName, 
isLeader(realtimeTableName), isConnected());
       
_controllerMetrics.addMeteredGlobalValue(ControllerMeter.CONTROLLER_NOT_LEADER, 
1L);
       return false;
     }
     boolean success = writeSegmentToPropertyStore(oldZnodePath, oldZnRecord, 
realtimeTableName, stat.getVersion());
     if (!success) {
       LOGGER.warn("Fail to write old segment to property store for {} for 
table {}: isLeader={}, isConnected={}",
-          committingSegmentNameStr, realtimeTableName, isLeader(), 
isConnected());
+          committingSegmentNameStr, realtimeTableName, 
isLeader(realtimeTableName), isConnected());
     }
     return success;
   }
@@ -599,11 +599,11 @@ public class PinotLLCRealtimeSegmentManager {
         
ZKMetadataProvider.constructPropertyStorePathForSegment(realtimeTableName, 
newSegmentNameStr);
 
     if (!isNewTableSetup) {
-      if (!isLeader() || !isConnected()) {
+      if (!isLeader(realtimeTableName) || !isConnected()) {
         // We can potentially log a different value than what we saw ....
         LOGGER.warn(
             "Lost leadership while committing new segment metadata for {} for 
table {}: isLeader={}, isConnected={}",
-            newSegmentNameStr, rawTableName, isLeader(), isConnected());
+            newSegmentNameStr, rawTableName, isLeader(realtimeTableName), 
isConnected());
         
_controllerMetrics.addMeteredGlobalValue(ControllerMeter.CONTROLLER_NOT_LEADER, 
1L);
         return false;
       }
@@ -612,7 +612,7 @@ public class PinotLLCRealtimeSegmentManager {
     boolean success = writeSegmentToPropertyStore(newZnodePath, newZnRecord, 
realtimeTableName);
     if (!success) {
       LOGGER.warn("Fail to write new segment to property store for {} for 
table {}: isLeader={}, isConnected={}",
-          newSegmentNameStr, rawTableName, isLeader(), isConnected());
+          newSegmentNameStr, rawTableName, isLeader(realtimeTableName), 
isConnected());
     }
     return success;
   }
@@ -1347,8 +1347,4 @@ public class PinotLLCRealtimeSegmentManager {
 
     return idealState;
   }
-
-  public ControllerLeadershipManager getControllerLeadershipManager() {
-    return _controllerLeadershipManager;
-  }
 }
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotRealtimeSegmentManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotRealtimeSegmentManager.java
index 331a4a0..1f115fe 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotRealtimeSegmentManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotRealtimeSegmentManager.java
@@ -52,8 +52,7 @@ import org.apache.pinot.common.utils.HLCSegmentName;
 import org.apache.pinot.common.utils.SegmentName;
 import org.apache.pinot.common.utils.helix.HelixHelper;
 import org.apache.pinot.common.utils.retry.RetryPolicies;
-import org.apache.pinot.controller.ControllerLeadershipManager;
-import org.apache.pinot.controller.LeadershipChangeSubscriber;
+import org.apache.pinot.controller.LeadControllerManager;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import org.apache.pinot.controller.helix.core.PinotTableIdealStateBuilder;
 import org.apache.pinot.core.query.utils.Pair;
@@ -66,7 +65,7 @@ import org.slf4j.LoggerFactory;
 /**
  * Realtime segment manager, which assigns realtime segments to server 
instances so that they can consume from the stream.
  */
-public class PinotRealtimeSegmentManager implements HelixPropertyListener, 
IZkChildListener, IZkDataListener, LeadershipChangeSubscriber {
+public class PinotRealtimeSegmentManager implements HelixPropertyListener, 
IZkChildListener, IZkDataListener {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(PinotRealtimeSegmentManager.class);
   private static final String TABLE_CONFIG = "/CONFIGS/TABLE";
   private static final String SEGMENTS_PATH = "/SEGMENTS";
@@ -80,12 +79,12 @@ public class PinotRealtimeSegmentManager implements 
HelixPropertyListener, IZkCh
   private final PinotHelixResourceManager _pinotHelixResourceManager;
   private ZkClient _zkClient;
   private ControllerMetrics _controllerMetrics;
-  private final ControllerLeadershipManager _controllerLeadershipManager;
+  private final LeadControllerManager _leadControllerManager;
 
   public PinotRealtimeSegmentManager(PinotHelixResourceManager pinotManager,
-      ControllerLeadershipManager controllerLeadershipManager) {
+      LeadControllerManager leadControllerManager) {
     _pinotHelixResourceManager = pinotManager;
-    _controllerLeadershipManager = controllerLeadershipManager;
+    _leadControllerManager = leadControllerManager;
     String clusterName = _pinotHelixResourceManager.getHelixClusterName();
     _propertyStorePath = 
PropertyPathConfig.getPath(PropertyType.PROPERTYSTORE, clusterName);
     _tableConfigPath = _propertyStorePath + TABLE_CONFIG;
@@ -104,9 +103,6 @@ public class PinotRealtimeSegmentManager implements 
HelixPropertyListener, IZkCh
     _zkClient.subscribeChildChanges(_tableConfigPath, this);
     _zkClient.subscribeDataChanges(_tableConfigPath, this);
 
-    // Subscribe to leadership changes
-    
_controllerLeadershipManager.subscribe(PinotLLCRealtimeSegmentManager.class.getName(),
 this);
-
     // Setup change listeners for already existing tables, if any.
     processPropertyStoreChange(_tableConfigPath);
   }
@@ -128,6 +124,11 @@ public class PinotRealtimeSegmentManager implements 
HelixPropertyListener, IZkCh
         continue;
       }
 
+      // Skip if the current controller isn't the leader of this table
+      if (!_leadControllerManager.isLeaderForTable(realtimeTableName)) {
+        continue;
+      }
+
       StreamConfig metadata = new 
StreamConfig(tableConfig.getIndexingConfig().getStreamConfigs());
       if (metadata.hasHighLevelConsumerType()) {
         idealStateMap.put(realtimeTableName, 
_pinotHelixResourceManager.getHelixAdmin()
@@ -273,10 +274,6 @@ public class PinotRealtimeSegmentManager implements 
HelixPropertyListener, IZkCh
     }
   }
 
-  private boolean isLeader() {
-    return _controllerLeadershipManager.isLeader();
-  }
-
   @Override
   public synchronized void onDataChange(String path) {
     LOGGER.info("PinotRealtimeSegmentManager.onDataChange: {}", path);
@@ -300,13 +297,9 @@ public class PinotRealtimeSegmentManager implements 
HelixPropertyListener, IZkCh
       LOGGER.info("Processing change notification for path: {}", path);
       refreshWatchers(path);
 
-      if (isLeader()) {
-        if (path.matches(REALTIME_SEGMENT_PROPERTY_STORE_PATH_PATTERN) || path
-            .matches(REALTIME_TABLE_CONFIG_PROPERTY_STORE_PATH_PATTERN) || 
path.equals(CONTROLLER_LEADER_CHANGE)) {
-          assignRealtimeSegmentsToServerInstancesIfNecessary();
-        }
-      } else {
-        LOGGER.info("Not the leader of this cluster, ignoring realtime segment 
property store change.");
+      if (path.matches(REALTIME_SEGMENT_PROPERTY_STORE_PATH_PATTERN) || path
+          .matches(REALTIME_TABLE_CONFIG_PROPERTY_STORE_PATH_PATTERN) || 
path.equals(CONTROLLER_LEADER_CHANGE)) {
+        assignRealtimeSegmentsToServerInstancesIfNecessary();
       }
     } catch (Exception e) {
       LOGGER.error("Caught exception while processing change for path {}", 
path, e);
@@ -415,14 +408,4 @@ public class PinotRealtimeSegmentManager implements 
HelixPropertyListener, IZkCh
     LOGGER.info("PinotRealtimeSegmentManager.handleDataDeleted: {}", dataPath);
     processPropertyStoreChange(dataPath);
   }
-
-  @Override
-  public void onBecomingLeader() {
-    processPropertyStoreChange(CONTROLLER_LEADER_CHANGE);
-  }
-
-  @Override
-  public void onBecomingNonLeader() {
-    processPropertyStoreChange(CONTROLLER_LEADER_CHANGE);
-  }
 }
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java
index a4da6b4..8935c6b 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java
@@ -19,7 +19,6 @@
 package org.apache.pinot.controller.helix.core.realtime;
 
 import com.google.common.annotations.VisibleForTesting;
-import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
@@ -29,7 +28,6 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 import org.apache.helix.HelixManager;
-import org.apache.helix.ZNRecord;
 import org.apache.pinot.common.config.TableNameBuilder;
 import org.apache.pinot.common.metadata.segment.LLCRealtimeSegmentZKMetadata;
 import org.apache.pinot.common.metrics.ControllerMeter;
@@ -37,12 +35,13 @@ import org.apache.pinot.common.metrics.ControllerMetrics;
 import org.apache.pinot.common.protocols.SegmentCompletionProtocol;
 import org.apache.pinot.common.utils.CommonConstants;
 import org.apache.pinot.common.utils.LLCSegmentName;
-import org.apache.pinot.controller.ControllerConf;
-import org.apache.pinot.controller.ControllerLeadershipManager;
+import org.apache.pinot.controller.LeadControllerManager;
 import 
org.apache.pinot.controller.helix.core.realtime.segment.CommittingSegmentDescriptor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.pinot.common.utils.SegmentName.SEPARATOR;
+
 
 /**
  * This is a singleton class in the controller that drives the state machines 
for segments that are in the
@@ -73,7 +72,7 @@ public class SegmentCompletionManager {
   private final Map<String, Long> _commitTimeMap = new ConcurrentHashMap<>();
   private final PinotLLCRealtimeSegmentManager _segmentManager;
   private final ControllerMetrics _controllerMetrics;
-  private final ControllerLeadershipManager _controllerLeadershipManager;
+  private final LeadControllerManager _leadControllerManager;
   private final Lock[] _fsmLocks;
   private static final int NUM_FSM_LOCKS = 20;
 
@@ -87,12 +86,12 @@ public class SegmentCompletionManager {
   // TODO keep some history of past committed segments so that we can avoid 
looking up PROPERTYSTORE if some server comes in late.
 
   public SegmentCompletionManager(HelixManager helixManager, 
PinotLLCRealtimeSegmentManager segmentManager,
-      ControllerMetrics controllerMetrics, ControllerLeadershipManager 
controllerLeadershipManager,
+      ControllerMetrics controllerMetrics, LeadControllerManager 
leadControllerManager,
       int segmentCommitTimeoutSeconds) {
     _helixManager = helixManager;
     _segmentManager = segmentManager;
     _controllerMetrics = controllerMetrics;
-    _controllerLeadershipManager = controllerLeadershipManager;
+    _leadControllerManager = leadControllerManager;
     SegmentCompletionProtocol
         
.setMaxSegmentCommitTimeMs(TimeUnit.MILLISECONDS.convert(segmentCommitTimeoutSeconds,
 TimeUnit.SECONDS));
     _fsmLocks = new Lock[NUM_FSM_LOCKS];
@@ -163,11 +162,12 @@ public class SegmentCompletionManager {
    * that it currently has (i.e. next offset that it will consume, if it 
continues to consume).
    */
   public SegmentCompletionProtocol.Response 
segmentConsumed(SegmentCompletionProtocol.Request.Params reqParams) {
-    if (!isLeader() || !_helixManager.isConnected()) {
+    final String segmentNameStr = reqParams.getSegmentName();
+    final String tableName = segmentNameStr.split(SEPARATOR)[0];
+    if (!isLeader(tableName) || !_helixManager.isConnected()) {
       
_controllerMetrics.addMeteredGlobalValue(ControllerMeter.CONTROLLER_NOT_LEADER, 
1L);
       return SegmentCompletionProtocol.RESP_NOT_LEADER;
     }
-    final String segmentNameStr = reqParams.getSegmentName();
     final String instanceId = reqParams.getInstanceId();
     final String stopReason = reqParams.getReason();
     final long offset = reqParams.getOffset();
@@ -201,11 +201,12 @@ public class SegmentCompletionManager {
    */
   public SegmentCompletionProtocol.Response segmentCommitStart(
       final SegmentCompletionProtocol.Request.Params reqParams) {
-    if (!isLeader() || !_helixManager.isConnected()) {
+    final String segmentNameStr = reqParams.getSegmentName();
+    final String tableName = segmentNameStr.split(SEPARATOR)[0];
+    if (!isLeader(tableName) || !_helixManager.isConnected()) {
       
_controllerMetrics.addMeteredGlobalValue(ControllerMeter.CONTROLLER_NOT_LEADER, 
1L);
       return SegmentCompletionProtocol.RESP_NOT_LEADER;
     }
-    final String segmentNameStr = reqParams.getSegmentName();
     final String instanceId = reqParams.getInstanceId();
     final long offset = reqParams.getOffset();
     LLCSegmentName segmentName = new LLCSegmentName(segmentNameStr);
@@ -225,11 +226,12 @@ public class SegmentCompletionManager {
   }
 
   public SegmentCompletionProtocol.Response extendBuildTime(final 
SegmentCompletionProtocol.Request.Params reqParams) {
-    if (!isLeader() || !_helixManager.isConnected()) {
+    final String segmentNameStr = reqParams.getSegmentName();
+    final String tableName = segmentNameStr.split(SEPARATOR)[0];
+    if (!isLeader(tableName) || !_helixManager.isConnected()) {
       
_controllerMetrics.addMeteredGlobalValue(ControllerMeter.CONTROLLER_NOT_LEADER, 
1L);
       return SegmentCompletionProtocol.RESP_NOT_LEADER;
     }
-    final String segmentNameStr = reqParams.getSegmentName();
     final String instanceId = reqParams.getInstanceId();
     final long offset = reqParams.getOffset();
     final int extTimeSec = reqParams.getExtraTimeSec();
@@ -256,11 +258,12 @@ public class SegmentCompletionManager {
    */
   public SegmentCompletionProtocol.Response segmentStoppedConsuming(
       SegmentCompletionProtocol.Request.Params reqParams) {
-    if (!isLeader() || !_helixManager.isConnected()) {
+    final String segmentNameStr = reqParams.getSegmentName();
+    final String tableName = segmentNameStr.split(SEPARATOR)[0];
+    if (!isLeader(tableName) || !_helixManager.isConnected()) {
       
_controllerMetrics.addMeteredGlobalValue(ControllerMeter.CONTROLLER_NOT_LEADER, 
1L);
       return SegmentCompletionProtocol.RESP_NOT_LEADER;
     }
-    final String segmentNameStr = reqParams.getSegmentName();
     final String instanceId = reqParams.getInstanceId();
     final long offset = reqParams.getOffset();
     final String reason = reqParams.getReason();
@@ -292,11 +295,12 @@ public class SegmentCompletionManager {
    */
   public SegmentCompletionProtocol.Response 
segmentCommitEnd(SegmentCompletionProtocol.Request.Params reqParams,
       boolean success, boolean isSplitCommit, CommittingSegmentDescriptor 
committingSegmentDescriptor) {
-    if (!isLeader() || !_helixManager.isConnected()) {
+    final String segmentNameStr = reqParams.getSegmentName();
+    final String tableName = segmentNameStr.split(SEPARATOR)[0];
+    if (!isLeader(tableName) || !_helixManager.isConnected()) {
       
_controllerMetrics.addMeteredGlobalValue(ControllerMeter.CONTROLLER_NOT_LEADER, 
1L);
       return SegmentCompletionProtocol.RESP_NOT_LEADER;
     }
-    final String segmentNameStr = reqParams.getSegmentName();
     LLCSegmentName segmentName = new LLCSegmentName(segmentNameStr);
     SegmentCompletionFSM fsm = null;
     SegmentCompletionProtocol.Response response = 
SegmentCompletionProtocol.RESP_FAILED;
@@ -352,6 +356,7 @@ public class SegmentCompletionManager {
     State _state = State.HOLDING;   // Typically start off in HOLDING state.
     final long _startTimeMs;
     private final LLCSegmentName _segmentName;
+    private final String _realtimeTableName;
     private final int _numReplicas;
     private final Set<String> _excludedServerStateMap;
     private final Map<String, Long> _commitStateMap;
@@ -394,6 +399,7 @@ public class SegmentCompletionManager {
     private SegmentCompletionFSM(PinotLLCRealtimeSegmentManager segmentManager,
         SegmentCompletionManager segmentCompletionManager, LLCSegmentName 
segmentName, int numReplicas) {
       _segmentName = segmentName;
+      _realtimeTableName = _segmentName.getTableName();
       _numReplicas = numReplicas;
       _segmentManager = segmentManager;
       _commitStateMap = new HashMap<>(_numReplicas);
@@ -403,8 +409,8 @@ public class SegmentCompletionManager {
       _maxTimeToPickWinnerMs = _startTimeMs + MAX_TIME_TO_PICK_WINNER_MS;
       _maxTimeToNotifyWinnerMs = _startTimeMs + MAX_TIME_TO_NOTIFY_WINNER_MS;
       long initialCommitTimeMs =
-          MAX_TIME_TO_NOTIFY_WINNER_MS + 
_segmentManager.getCommitTimeoutMS(_segmentName.getTableName());
-      Long savedCommitTime = 
_segmentCompletionManager._commitTimeMap.get(segmentName.getTableName());
+          MAX_TIME_TO_NOTIFY_WINNER_MS + 
_segmentManager.getCommitTimeoutMS(_realtimeTableName);
+      Long savedCommitTime = 
_segmentCompletionManager._commitTimeMap.get(_realtimeTableName);
       if (savedCommitTime != null && savedCommitTime > initialCommitTimeMs) {
         initialCommitTimeMs = savedCommitTime;
       }
@@ -413,7 +419,7 @@ public class SegmentCompletionManager {
         // The table has a really high value configured for max commit time. 
Set it to a higher value than default
         // and go from there.
         LOGGER.info("Configured max commit time {}s too high for table {}, 
changing to {}s", initialCommitTimeMs / 1000,
-            segmentName.getTableName(), 
MAX_COMMIT_TIME_FOR_ALL_SEGMENTS_SECONDS);
+            _realtimeTableName, MAX_COMMIT_TIME_FOR_ALL_SEGMENTS_SECONDS);
         initialCommitTimeMs = MAX_COMMIT_TIME_FOR_ALL_SEGMENTS_SECONDS * 1000;
       }
       _initialCommitTimeMs = initialCommitTimeMs;
@@ -681,14 +687,14 @@ public class SegmentCompletionManager {
     private SegmentCompletionProtocol.Response abortAndReturnHold(long now, 
String instanceId, long offset) {
       _state = State.ABORTED;
       _segmentCompletionManager._controllerMetrics
-          .addMeteredTableValue(_segmentName.getTableName(), 
ControllerMeter.LLC_STATE_MACHINE_ABORTS, 1);
+          .addMeteredTableValue(_realtimeTableName, 
ControllerMeter.LLC_STATE_MACHINE_ABORTS, 1);
       return hold(instanceId, offset);
     }
 
     private SegmentCompletionProtocol.Response abortAndReturnFailed() {
       _state = State.ABORTED;
       _segmentCompletionManager._controllerMetrics
-          .addMeteredTableValue(_segmentName.getTableName(), 
ControllerMeter.LLC_STATE_MACHINE_ABORTS, 1);
+          .addMeteredTableValue(_realtimeTableName, 
ControllerMeter.LLC_STATE_MACHINE_ABORTS, 1);
       return SegmentCompletionProtocol.RESP_FAILED;
     }
 
@@ -1117,7 +1123,7 @@ public class SegmentCompletionManager {
   }
 
   @VisibleForTesting
-  protected boolean isLeader() {
-    return _controllerLeadershipManager.isLeader();
+  protected boolean isLeader(String tableName) {
+    return _leadControllerManager.isLeaderForTable(tableName);
   }
 }
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/relocation/RealtimeSegmentRelocator.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/relocation/RealtimeSegmentRelocator.java
index ea5b05b..2c2c017 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/relocation/RealtimeSegmentRelocator.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/relocation/RealtimeSegmentRelocator.java
@@ -38,6 +38,7 @@ import org.apache.pinot.common.utils.helix.HelixHelper;
 import org.apache.pinot.common.utils.retry.RetryPolicies;
 import org.apache.pinot.common.utils.time.TimeUtils;
 import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.controller.LeadControllerManager;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import 
org.apache.pinot.controller.helix.core.PinotHelixSegmentOnlineOfflineStateModelGenerator;
 import 
org.apache.pinot.controller.helix.core.periodictask.ControllerPeriodicTask;
@@ -55,10 +56,11 @@ import org.slf4j.LoggerFactory;
 public class RealtimeSegmentRelocator extends ControllerPeriodicTask<Void> {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(RealtimeSegmentRelocator.class);
 
-  public RealtimeSegmentRelocator(PinotHelixResourceManager 
pinotHelixResourceManager, ControllerConf config,
-      ControllerMetrics controllerMetrics) {
+  public RealtimeSegmentRelocator(PinotHelixResourceManager 
pinotHelixResourceManager,
+      LeadControllerManager leadControllerManager, ControllerConf config, 
ControllerMetrics controllerMetrics) {
     super("RealtimeSegmentRelocator", 
getRunFrequencySeconds(config.getRealtimeSegmentRelocatorFrequency()),
-        config.getRealtimeSegmentRelocationInitialDelayInSeconds(), 
pinotHelixResourceManager, controllerMetrics);
+        config.getRealtimeSegmentRelocationInitialDelayInSeconds(), 
pinotHelixResourceManager, leadControllerManager,
+        controllerMetrics);
   }
 
   @Override
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
index 7d5182d..fecb4ac 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
@@ -35,6 +35,7 @@ import org.apache.pinot.common.utils.CommonConstants;
 import org.apache.pinot.common.utils.CommonConstants.Segment.Realtime.Status;
 import org.apache.pinot.common.utils.SegmentName;
 import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.controller.LeadControllerManager;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import 
org.apache.pinot.controller.helix.core.periodictask.ControllerPeriodicTask;
 import 
org.apache.pinot.controller.helix.core.retention.strategy.RetentionStrategy;
@@ -54,10 +55,11 @@ public class RetentionManager extends 
ControllerPeriodicTask<Void> {
 
   private final int _deletedSegmentsRetentionInDays;
 
-  public RetentionManager(PinotHelixResourceManager pinotHelixResourceManager, 
ControllerConf config,
-      ControllerMetrics controllerMetrics) {
+  public RetentionManager(PinotHelixResourceManager pinotHelixResourceManager,
+      LeadControllerManager leadControllerManager, ControllerConf config, 
ControllerMetrics controllerMetrics) {
     super("RetentionManager", 
config.getRetentionControllerFrequencyInSeconds(),
-        config.getRetentionManagerInitialDelayInSeconds(), 
pinotHelixResourceManager, controllerMetrics);
+        config.getRetentionManagerInitialDelayInSeconds(), 
pinotHelixResourceManager, leadControllerManager,
+        controllerMetrics);
     _deletedSegmentsRetentionInDays = 
config.getDeletedSegmentsRetentionInDays();
 
     LOGGER.info("Starting RetentionManager with runFrequencyInSeconds: {}, 
deletedSegmentsRetentionInDays: {}",
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/statemodel/LeadControllerChecker.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/statemodel/LeadControllerChecker.java
new file mode 100644
index 0000000..d2888ed
--- /dev/null
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/statemodel/LeadControllerChecker.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.statemodel;
+
+import com.google.common.base.Preconditions;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.pinot.common.utils.CommonConstants.Helix.NUMBER_OF_PARTITIONS_IN_LEAD_CONTROLLER_RESOURCE;
+
+
+/**
+ * Tracks the partition indexes registered through {@link #addPartitionLeader(String)} and not yet
+ * dropped through {@link #removePartitionLeader(String)}.
+ *
+ * <p>The indexes are held in a concurrent set, so single add/remove/lookup calls are safe across threads.
+ */
+public class LeadControllerChecker {
+  private static final Logger LOGGER = LoggerFactory.getLogger(LeadControllerChecker.class);
+
+  // Partition indexes that are currently registered.
+  private final Set<Integer> _leadPartitions = ConcurrentHashMap.newKeySet();
+
+  /**
+   * Registers the partition identified by {@code partitionName}.
+   *
+   * @param partitionName partition name; the text after its last '_' is parsed as the partition index
+   * @throws NumberFormatException if that suffix is not a valid integer
+   */
+  public void addPartitionLeader(String partitionName) {
+    LOGGER.info("Add Partition: {} to LeadControllerChecker", partitionName);
+    _leadPartitions.add(extractPartitionIndex(partitionName));
+  }
+
+  /**
+   * Unregisters the partition identified by {@code partitionName}; does nothing if it was not registered.
+   *
+   * @param partitionName partition name; the text after its last '_' is parsed as the partition index
+   * @throws NumberFormatException if that suffix is not a valid integer
+   */
+  public void removePartitionLeader(String partitionName) {
+    LOGGER.info("Remove Partition: {} from LeadControllerChecker", partitionName);
+    _leadPartitions.remove(extractPartitionIndex(partitionName));
+  }
+
+  /**
+   * Tells whether the given partition index is currently registered.
+   *
+   * @param partitionIndex index in the range [0, NUMBER_OF_PARTITIONS_IN_LEAD_CONTROLLER_RESOURCE)
+   * @return {@code true} if the index was added and has not been removed since
+   * @throws IllegalArgumentException if the index lies outside that range
+   */
+  public boolean isPartitionLeader(int partitionIndex) {
+    Preconditions
+        .checkArgument(partitionIndex >= 0 && partitionIndex < NUMBER_OF_PARTITIONS_IN_LEAD_CONTROLLER_RESOURCE,
+            "Invalid partition index: " + partitionIndex);
+    return _leadPartitions.contains(partitionIndex);
+  }
+
+  // Parses the integer that follows the last '_' of a partition name.
+  private static int extractPartitionIndex(String partitionName) {
+    return Integer.parseInt(partitionName.substring(partitionName.lastIndexOf('_') + 1));
+  }
+}
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/statemodel/LeadControllerResourceMasterSlaveStateModelFactory.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/statemodel/LeadControllerResourceMasterSlaveStateModelFactory.java
new file mode 100644
index 0000000..d063c58
--- /dev/null
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/statemodel/LeadControllerResourceMasterSlaveStateModelFactory.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.statemodel;
+
+import org.apache.helix.NotificationContext;
+import org.apache.helix.examples.MasterSlaveStateModelFactory;
+import org.apache.helix.model.Message;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.pinot.controller.LeadControllerManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * State model factory for the lead controller resource. The state models it creates keep the
+ * {@link LeadControllerManager} in step with the MASTER/SLAVE transitions delivered for each partition.
+ */
+public class LeadControllerResourceMasterSlaveStateModelFactory extends MasterSlaveStateModelFactory {
+  private static final Logger LOGGER =
+      LoggerFactory.getLogger(LeadControllerResourceMasterSlaveStateModelFactory.class);
+
+  private final LeadControllerManager _leadControllerManager;
+
+  /**
+   * @param leadControllerManager manager to update when a partition gains or loses mastership on this instance
+   */
+  public LeadControllerResourceMasterSlaveStateModelFactory(LeadControllerManager leadControllerManager) {
+    super();
+    _leadControllerManager = leadControllerManager;
+  }
+
+  /**
+   * Creates the state model for one partition and sets the partition name on it.
+   */
+  @Override
+  public StateModel createNewStateModel(String resourceName, String partitionName) {
+    MasterSlaveStateModel stateModel = new LeadControllerResourceMasterSlaveStateModel();
+    stateModel.setPartitionName(partitionName);
+    return stateModel;
+  }
+
+  /**
+   * State model that registers a partition with the manager when this instance becomes its MASTER and
+   * unregisters it when the instance falls back to SLAVE.
+   */
+  public class LeadControllerResourceMasterSlaveStateModel extends MasterSlaveStateModel {
+    /**
+     * MASTER to SLAVE: this instance no longer leads the partition, so it is removed from the manager.
+     */
+    @Override
+    public void onBecomeSlaveFromMaster(Message message, NotificationContext context) {
+      super.onBecomeSlaveFromMaster(message, context);
+      String partitionName = message.getPartitionName();
+      _leadControllerManager.removePartitionLeader(partitionName);
+      _leadControllerManager.onLeadControllerChange();
+    }
+
+    /**
+     * SLAVE to MASTER: this instance now leads the partition, so it is added to the manager.
+     */
+    @Override
+    public void onBecomeMasterFromSlave(Message message, NotificationContext context) {
+      super.onBecomeMasterFromSlave(message, context);
+      String partitionName = message.getPartitionName();
+      _leadControllerManager.addPartitionLeader(partitionName);
+      _leadControllerManager.onLeadControllerChange();
+    }
+  }
+}
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/BrokerResourceValidationManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/BrokerResourceValidationManager.java
index 5748d3c..a82a161 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/BrokerResourceValidationManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/BrokerResourceValidationManager.java
@@ -24,6 +24,7 @@ import org.apache.helix.model.InstanceConfig;
 import org.apache.pinot.common.config.TableConfig;
 import org.apache.pinot.common.metrics.ControllerMetrics;
 import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.controller.LeadControllerManager;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import 
org.apache.pinot.controller.helix.core.periodictask.ControllerPeriodicTask;
 import org.slf4j.Logger;
@@ -37,9 +38,9 @@ public class BrokerResourceValidationManager extends 
ControllerPeriodicTask<Brok
   private static final Logger LOGGER = 
LoggerFactory.getLogger(BrokerResourceValidationManager.class);
 
   public BrokerResourceValidationManager(ControllerConf config, 
PinotHelixResourceManager pinotHelixResourceManager,
-      ControllerMetrics controllerMetrics) {
+      LeadControllerManager leadControllerManager, ControllerMetrics 
controllerMetrics) {
     super("BrokerResourceValidationManager", 
config.getBrokerResourceValidationFrequencyInSeconds(),
-        config.getBrokerResourceValidationInitialDelayInSeconds(), 
pinotHelixResourceManager, controllerMetrics);
+        config.getBrokerResourceValidationInitialDelayInSeconds(), 
pinotHelixResourceManager, leadControllerManager, controllerMetrics);
   }
 
   @Override
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/OfflineSegmentIntervalChecker.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/OfflineSegmentIntervalChecker.java
index 7f19395..0eeadc7 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/OfflineSegmentIntervalChecker.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/OfflineSegmentIntervalChecker.java
@@ -30,6 +30,7 @@ import org.apache.pinot.common.metrics.ControllerMetrics;
 import org.apache.pinot.common.metrics.ValidationMetrics;
 import org.apache.pinot.common.utils.CommonConstants;
 import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.controller.LeadControllerManager;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import 
org.apache.pinot.controller.helix.core.periodictask.ControllerPeriodicTask;
 import org.apache.pinot.controller.util.SegmentIntervalUtils;
@@ -50,9 +51,11 @@ public class OfflineSegmentIntervalChecker extends 
ControllerPeriodicTask<Void>
   private final ValidationMetrics _validationMetrics;
 
   public OfflineSegmentIntervalChecker(ControllerConf config, 
PinotHelixResourceManager pinotHelixResourceManager,
-      ValidationMetrics validationMetrics, ControllerMetrics 
controllerMetrics) {
+      LeadControllerManager leadControllerManager, ValidationMetrics 
validationMetrics,
+      ControllerMetrics controllerMetrics) {
     super("OfflineSegmentIntervalChecker", 
config.getOfflineSegmentIntervalCheckerFrequencyInSeconds(),
-        config.getOfflineSegmentIntervalCheckerInitialDelayInSeconds(), 
pinotHelixResourceManager, controllerMetrics);
+        config.getOfflineSegmentIntervalCheckerInitialDelayInSeconds(), 
pinotHelixResourceManager,
+        leadControllerManager, controllerMetrics);
     _validationMetrics = validationMetrics;
   }
 
@@ -88,12 +91,12 @@ public class OfflineSegmentIntervalChecker extends 
ControllerPeriodicTask<Void>
         if (SegmentIntervalUtils.isValidInterval(timeInterval)) {
           segmentIntervals.add(timeInterval);
         } else {
-          numSegmentsWithInvalidIntervals ++;
+          numSegmentsWithInvalidIntervals++;
         }
       }
       if (numSegmentsWithInvalidIntervals > 0) {
-        LOGGER.warn("Table: {} has {} segments with invalid interval", 
offlineTableName,
-            numSegmentsWithInvalidIntervals);
+        LOGGER
+            .warn("Table: {} has {} segments with invalid interval", 
offlineTableName, numSegmentsWithInvalidIntervals);
       }
       Duration frequency = 
SegmentIntervalUtils.convertToDuration(validationConfig.getSegmentPushFrequency());
       numMissingSegments = computeNumMissingSegments(segmentIntervals, 
frequency);
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
index 5eb5a6f..88ad642 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
@@ -32,6 +32,7 @@ import org.apache.pinot.common.utils.CommonConstants;
 import org.apache.pinot.common.utils.HLCSegmentName;
 import org.apache.pinot.common.utils.SegmentName;
 import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.controller.LeadControllerManager;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import 
org.apache.pinot.controller.helix.core.periodictask.ControllerPeriodicTask;
 import 
org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager;
@@ -53,10 +54,11 @@ public class RealtimeSegmentValidationManager extends 
ControllerPeriodicTask<Rea
   private long _lastUpdateRealtimeDocumentCountTimeMs = 0L;
 
   public RealtimeSegmentValidationManager(ControllerConf config, 
PinotHelixResourceManager pinotHelixResourceManager,
-      PinotLLCRealtimeSegmentManager llcRealtimeSegmentManager, 
ValidationMetrics validationMetrics,
-      ControllerMetrics controllerMetrics) {
+      LeadControllerManager leadControllerManager, 
PinotLLCRealtimeSegmentManager llcRealtimeSegmentManager,
+      ValidationMetrics validationMetrics, ControllerMetrics 
controllerMetrics) {
     super("RealtimeSegmentValidationManager", 
config.getRealtimeSegmentValidationFrequencyInSeconds(),
-        config.getRealtimeSegmentValidationManagerInitialDelaySeconds(), 
pinotHelixResourceManager, controllerMetrics);
+        config.getRealtimeSegmentValidationManagerInitialDelaySeconds(), 
pinotHelixResourceManager,
+        leadControllerManager, controllerMetrics);
     _llcRealtimeSegmentManager = llcRealtimeSegmentManager;
     _validationMetrics = validationMetrics;
 
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/StorageQuotaChecker.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/StorageQuotaChecker.java
index db23301..c78c756 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/StorageQuotaChecker.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/StorageQuotaChecker.java
@@ -30,7 +30,7 @@ import 
org.apache.pinot.common.exception.InvalidConfigException;
 import org.apache.pinot.common.metrics.ControllerGauge;
 import org.apache.pinot.common.metrics.ControllerMetrics;
 import org.apache.pinot.common.utils.DataSize;
-import org.apache.pinot.controller.ControllerLeadershipManager;
+import org.apache.pinot.controller.LeadControllerManager;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import org.apache.pinot.controller.util.TableSizeReader;
 import org.slf4j.Logger;
@@ -48,16 +48,16 @@ public class StorageQuotaChecker {
   private final TableConfig _tableConfig;
   private final ControllerMetrics _controllerMetrics;
   private final PinotHelixResourceManager _pinotHelixResourceManager;
-  private final ControllerLeadershipManager _controllerLeadershipManager;
+  private final LeadControllerManager _leadControllerManager;
 
   public StorageQuotaChecker(TableConfig tableConfig, TableSizeReader 
tableSizeReader,
       ControllerMetrics controllerMetrics, PinotHelixResourceManager 
pinotHelixResourceManager,
-      ControllerLeadershipManager controllerLeadershipManager) {
+      LeadControllerManager leadControllerManager) {
     _tableConfig = tableConfig;
     _tableSizeReader = tableSizeReader;
     _controllerMetrics = controllerMetrics;
     _pinotHelixResourceManager = pinotHelixResourceManager;
-    _controllerLeadershipManager = controllerLeadershipManager;
+    _leadControllerManager = leadControllerManager;
   }
 
   public static class QuotaCheckerResponse {
@@ -157,7 +157,7 @@ public class StorageQuotaChecker {
         tableNameWithType, tableSubtypeSize.estimatedSizeInBytes, 
tableSubtypeSize.reportedSizeInBytes);
 
     // Only emit the real percentage of storage quota usage by lead 
controller, otherwise emit 0L.
-    if (isLeader() && allowedStorageBytes != 0L) {
+    if (isLeader(tableNameWithType) && allowedStorageBytes != 0L) {
       long existingStorageQuotaUtilization = 
tableSubtypeSize.estimatedSizeInBytes * 100 / allowedStorageBytes;
       _controllerMetrics.setValueOfTableGauge(tableNameWithType, 
ControllerGauge.TABLE_STORAGE_QUOTA_UTILIZATION,
           existingStorageQuotaUtilization);
@@ -213,7 +213,7 @@ public class StorageQuotaChecker {
     }
   }
 
-  protected boolean isLeader() {
-    return _controllerLeadershipManager.isLeader();
+  protected boolean isLeader(String tableName) {
+    return _leadControllerManager.isLeaderForTable(tableName);
   }
 }
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/PinotControllerModeTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/PinotControllerModeTest.java
index d91e612..511c349 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/PinotControllerModeTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/PinotControllerModeTest.java
@@ -95,9 +95,9 @@ public class PinotControllerModeTest extends ControllerTest {
     _controllerStarter = null;
   }
 
-  // TODO: enable it after removing ControllerLeadershipManager which requires 
both CONTROLLER and PARTICIPANT
+  // TODO: enable it after removing HelixControllerLeadershipManager which 
requires both CONTROLLER and PARTICIPANT
   //       HelixManager
-  @Test(enabled = false)
+  @Test
   public void testPinotOnlyController()
       throws Exception {
     config.setControllerMode(ControllerConf.ControllerMode.PINOT_ONLY);
@@ -123,8 +123,13 @@ public class PinotControllerModeTest extends 
ControllerTest {
     TestUtils.waitForCondition(aVoid -> helixControllerManager.isConnected(), 
TIMEOUT_IN_MS,
         "Failed to start " + config2.getControllerMode() + " controller in " + 
TIMEOUT_IN_MS + "ms.");
 
-    // Enable the lead controller resource.
-    helixAdmin.enableResource(getHelixClusterName(), 
CommonConstants.Helix.LEAD_CONTROLLER_RESOURCE_NAME, true);
+    try {
+      // Enable the lead controller resource.
+      helixAdmin.enableResource(getHelixClusterName(), 
CommonConstants.Helix.LEAD_CONTROLLER_RESOURCE_NAME, true);
+      Assert.fail("Enabling resource before starting the 1st Pinot controller 
should fail.");
+    } catch (Exception e) {
+      // Expected.
+    }
 
     // Starting a pinot only controller.
     ControllerConf config3 = getDefaultControllerConfiguration();
@@ -141,6 +146,9 @@ public class PinotControllerModeTest extends ControllerTest 
{
         TIMEOUT_IN_MS, "Failed to start " + config.getControllerMode() + " 
controller in " + TIMEOUT_IN_MS + "ms.");
     Assert.assertEquals(firstPinotOnlyController.getControllerMode(), 
ControllerConf.ControllerMode.PINOT_ONLY);
 
+    // Enable the lead controller resource.
+    helixAdmin.enableResource(getHelixClusterName(), 
CommonConstants.Helix.LEAD_CONTROLLER_RESOURCE_NAME, true);
+
     // Start a second Pinot only controller.
     ControllerConf config4 = getDefaultControllerConfiguration();
     config4.setHelixClusterName(getHelixClusterName());
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/SegmentStatusCheckerTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/SegmentStatusCheckerTest.java
index c235483..066040e 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/SegmentStatusCheckerTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/SegmentStatusCheckerTest.java
@@ -35,10 +35,12 @@ import org.apache.pinot.common.metrics.ControllerMetrics;
 import org.apache.pinot.common.utils.CommonConstants;
 import org.apache.pinot.common.utils.LLCSegmentName;
 import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.controller.LeadControllerManager;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
+import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -46,6 +48,7 @@ import static org.mockito.Mockito.when;
 public class SegmentStatusCheckerTest {
   private SegmentStatusChecker segmentStatusChecker;
   private PinotHelixResourceManager helixResourceManager;
+  private LeadControllerManager leadControllerManager;
   private MetricsRegistry metricsRegistry;
   private ControllerMetrics controllerMetrics;
   private ControllerConf config;
@@ -84,9 +87,14 @@ public class SegmentStatusCheckerTest {
       when(config.getStatusCheckerFrequencyInSeconds()).thenReturn(300);
       when(config.getStatusCheckerWaitForPushTimeInSeconds()).thenReturn(300);
     }
+    {
+      leadControllerManager = mock(LeadControllerManager.class);
+      
when(leadControllerManager.isLeaderForTable(anyString())).thenReturn(true);
+    }
     metricsRegistry = new MetricsRegistry();
     controllerMetrics = new ControllerMetrics(metricsRegistry);
-    segmentStatusChecker = new SegmentStatusChecker(helixResourceManager, 
config, controllerMetrics);
+    segmentStatusChecker =
+        new SegmentStatusChecker(helixResourceManager, leadControllerManager, 
config, controllerMetrics);
     segmentStatusChecker.start();
     segmentStatusChecker.run();
     Assert.assertEquals(
@@ -146,9 +154,14 @@ public class SegmentStatusCheckerTest {
       when(config.getStatusCheckerFrequencyInSeconds()).thenReturn(300);
       when(config.getStatusCheckerWaitForPushTimeInSeconds()).thenReturn(300);
     }
+    {
+      leadControllerManager = mock(LeadControllerManager.class);
+      
when(leadControllerManager.isLeaderForTable(anyString())).thenReturn(true);
+    }
     metricsRegistry = new MetricsRegistry();
     controllerMetrics = new ControllerMetrics(metricsRegistry);
-    segmentStatusChecker = new SegmentStatusChecker(helixResourceManager, 
config, controllerMetrics);
+    segmentStatusChecker =
+        new SegmentStatusChecker(helixResourceManager, leadControllerManager, 
config, controllerMetrics);
     segmentStatusChecker.start();
     segmentStatusChecker.run();
     Assert.assertEquals(
@@ -222,9 +235,14 @@ public class SegmentStatusCheckerTest {
       when(config.getStatusCheckerFrequencyInSeconds()).thenReturn(300);
       when(config.getStatusCheckerWaitForPushTimeInSeconds()).thenReturn(0);
     }
+    {
+      leadControllerManager = mock(LeadControllerManager.class);
+      
when(leadControllerManager.isLeaderForTable(anyString())).thenReturn(true);
+    }
     metricsRegistry = new MetricsRegistry();
     controllerMetrics = new ControllerMetrics(metricsRegistry);
-    segmentStatusChecker = new SegmentStatusChecker(helixResourceManager, 
config, controllerMetrics);
+    segmentStatusChecker =
+        new SegmentStatusChecker(helixResourceManager, leadControllerManager, 
config, controllerMetrics);
     segmentStatusChecker.start();
     segmentStatusChecker.run();
     Assert.assertEquals(
@@ -264,9 +282,14 @@ public class SegmentStatusCheckerTest {
       when(config.getStatusCheckerFrequencyInSeconds()).thenReturn(300);
       when(config.getStatusCheckerWaitForPushTimeInSeconds()).thenReturn(300);
     }
+    {
+      leadControllerManager = mock(LeadControllerManager.class);
+      
when(leadControllerManager.isLeaderForTable(anyString())).thenReturn(true);
+    }
     metricsRegistry = new MetricsRegistry();
     controllerMetrics = new ControllerMetrics(metricsRegistry);
-    segmentStatusChecker = new SegmentStatusChecker(helixResourceManager, 
config, controllerMetrics);
+    segmentStatusChecker =
+        new SegmentStatusChecker(helixResourceManager, leadControllerManager, 
config, controllerMetrics);
     segmentStatusChecker.start();
     segmentStatusChecker.run();
     Assert.assertEquals(controllerMetrics.getValueOfTableGauge(tableName, 
ControllerGauge.SEGMENTS_IN_ERROR_STATE), 0);
@@ -291,9 +314,14 @@ public class SegmentStatusCheckerTest {
       when(config.getStatusCheckerFrequencyInSeconds()).thenReturn(300);
       when(config.getStatusCheckerWaitForPushTimeInSeconds()).thenReturn(300);
     }
+    {
+      leadControllerManager = mock(LeadControllerManager.class);
+      
when(leadControllerManager.isLeaderForTable(anyString())).thenReturn(true);
+    }
     metricsRegistry = new MetricsRegistry();
     controllerMetrics = new ControllerMetrics(metricsRegistry);
-    segmentStatusChecker = new SegmentStatusChecker(helixResourceManager, 
config, controllerMetrics);
+    segmentStatusChecker =
+        new SegmentStatusChecker(helixResourceManager, leadControllerManager, 
config, controllerMetrics);
     segmentStatusChecker.start();
     segmentStatusChecker.run();
     Assert.assertEquals(controllerMetrics.getValueOfTableGauge(tableName, 
ControllerGauge.SEGMENTS_IN_ERROR_STATE),
@@ -349,9 +377,14 @@ public class SegmentStatusCheckerTest {
       when(config.getStatusCheckerFrequencyInSeconds()).thenReturn(300);
       when(config.getStatusCheckerWaitForPushTimeInSeconds()).thenReturn(300);
     }
+    {
+      leadControllerManager = mock(LeadControllerManager.class);
+      
when(leadControllerManager.isLeaderForTable(anyString())).thenReturn(true);
+    }
     metricsRegistry = new MetricsRegistry();
     controllerMetrics = new ControllerMetrics(metricsRegistry);
-    segmentStatusChecker = new SegmentStatusChecker(helixResourceManager, 
config, controllerMetrics);
+    segmentStatusChecker =
+        new SegmentStatusChecker(helixResourceManager, leadControllerManager, 
config, controllerMetrics);
     segmentStatusChecker.start();
     segmentStatusChecker.run();
     Assert.assertEquals(
@@ -390,9 +423,14 @@ public class SegmentStatusCheckerTest {
       when(config.getStatusCheckerFrequencyInSeconds()).thenReturn(300);
       when(config.getStatusCheckerWaitForPushTimeInSeconds()).thenReturn(300);
     }
+    {
+      leadControllerManager = mock(LeadControllerManager.class);
+      
when(leadControllerManager.isLeaderForTable(anyString())).thenReturn(true);
+    }
     metricsRegistry = new MetricsRegistry();
     controllerMetrics = new ControllerMetrics(metricsRegistry);
-    segmentStatusChecker = new SegmentStatusChecker(helixResourceManager, 
config, controllerMetrics);
+    segmentStatusChecker =
+        new SegmentStatusChecker(helixResourceManager, leadControllerManager, 
config, controllerMetrics);
     segmentStatusChecker.start();
     segmentStatusChecker.run();
     Assert.assertEquals(controllerMetrics.getValueOfTableGauge(tableName, 
ControllerGauge.SEGMENTS_IN_ERROR_STATE), 0);
@@ -429,9 +467,14 @@ public class SegmentStatusCheckerTest {
       when(config.getStatusCheckerFrequencyInSeconds()).thenReturn(300);
       when(config.getStatusCheckerWaitForPushTimeInSeconds()).thenReturn(300);
     }
+    {
+      leadControllerManager = mock(LeadControllerManager.class);
+      
when(leadControllerManager.isLeaderForTable(anyString())).thenReturn(true);
+    }
     metricsRegistry = new MetricsRegistry();
     controllerMetrics = new ControllerMetrics(metricsRegistry);
-    segmentStatusChecker = new SegmentStatusChecker(helixResourceManager, 
config, controllerMetrics);
+    segmentStatusChecker =
+        new SegmentStatusChecker(helixResourceManager, leadControllerManager, 
config, controllerMetrics);
     // verify state before test
     
Assert.assertEquals(controllerMetrics.getValueOfGlobalGauge(ControllerGauge.DISABLED_TABLE_COUNT),
 0);
     // update metrics
@@ -463,9 +506,14 @@ public class SegmentStatusCheckerTest {
       when(config.getStatusCheckerFrequencyInSeconds()).thenReturn(300);
       when(config.getStatusCheckerWaitForPushTimeInSeconds()).thenReturn(300);
     }
+    {
+      leadControllerManager = mock(LeadControllerManager.class);
+      
when(leadControllerManager.isLeaderForTable(anyString())).thenReturn(true);
+    }
     metricsRegistry = new MetricsRegistry();
     controllerMetrics = new ControllerMetrics(metricsRegistry);
-    segmentStatusChecker = new SegmentStatusChecker(helixResourceManager, 
config, controllerMetrics);
+    segmentStatusChecker =
+        new SegmentStatusChecker(helixResourceManager, leadControllerManager, 
config, controllerMetrics);
     // verify state before test
     
Assert.assertEquals(controllerMetrics.getValueOfGlobalGauge(ControllerGauge.DISABLED_TABLE_COUNT),
 0);
     // update metrics
@@ -508,9 +556,14 @@ public class SegmentStatusCheckerTest {
       when(config.getStatusCheckerFrequencyInSeconds()).thenReturn(300);
       when(config.getStatusCheckerWaitForPushTimeInSeconds()).thenReturn(300);
     }
+    {
+      leadControllerManager = mock(LeadControllerManager.class);
+      
when(leadControllerManager.isLeaderForTable(anyString())).thenReturn(true);
+    }
     metricsRegistry = new MetricsRegistry();
     controllerMetrics = new ControllerMetrics(metricsRegistry);
-    segmentStatusChecker = new SegmentStatusChecker(helixResourceManager, 
config, controllerMetrics);
+    segmentStatusChecker =
+        new SegmentStatusChecker(helixResourceManager, leadControllerManager, 
config, controllerMetrics);
     segmentStatusChecker.start();
     segmentStatusChecker.run();
     Assert.assertEquals(controllerMetrics.getValueOfTableGauge(tableName, 
ControllerGauge.SEGMENTS_IN_ERROR_STATE),
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskTest.java
index a928922..dcd5469 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskTest.java
@@ -27,10 +27,12 @@ import java.util.stream.IntStream;
 import org.apache.pinot.common.metrics.ControllerGauge;
 import org.apache.pinot.common.metrics.ControllerMetrics;
 import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.controller.LeadControllerManager;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import org.testng.annotations.BeforeTest;
 import org.testng.annotations.Test;
 
+import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
@@ -43,6 +45,7 @@ public class ControllerPeriodicTaskTest {
   private final ControllerConf _controllerConf = new ControllerConf();
 
   private final PinotHelixResourceManager _resourceManager = 
mock(PinotHelixResourceManager.class);
+  private final LeadControllerManager _leadControllerManager = 
mock(LeadControllerManager.class);
   private final ControllerMetrics _controllerMetrics = new 
ControllerMetrics(new MetricsRegistry());
   private final AtomicBoolean _startTaskCalled = new AtomicBoolean();
   private final AtomicBoolean _stopTaskCalled = new AtomicBoolean();
@@ -52,7 +55,8 @@ public class ControllerPeriodicTaskTest {
   private static final String TASK_NAME = "TestTask";
 
   private final ControllerPeriodicTask _task = new 
ControllerPeriodicTask<Void>(TASK_NAME, RUN_FREQUENCY_IN_SECONDS,
-      _controllerConf.getPeriodicTaskInitialDelayInSeconds(), 
_resourceManager, _controllerMetrics) {
+      _controllerConf.getPeriodicTaskInitialDelayInSeconds(), 
_resourceManager, _leadControllerManager,
+      _controllerMetrics) {
 
     @Override
     protected void setUpTask() {
@@ -81,6 +85,7 @@ public class ControllerPeriodicTaskTest {
     List<String> tables = new ArrayList<>(_numTables);
     IntStream.range(0, _numTables).forEach(i -> tables.add("table_" + i + " 
_OFFLINE"));
     when(_resourceManager.getAllTables()).thenReturn(tables);
+    
when(_leadControllerManager.isLeaderForTable(anyString())).thenReturn(true);
   }
 
   private void resetState() {
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
index 04e2b7b..af35420 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
@@ -56,7 +56,7 @@ import 
org.apache.pinot.common.protocols.SegmentCompletionProtocol;
 import org.apache.pinot.common.utils.CommonConstants;
 import org.apache.pinot.common.utils.LLCSegmentName;
 import org.apache.pinot.controller.ControllerConf;
-import org.apache.pinot.controller.ControllerLeadershipManager;
+import org.apache.pinot.controller.LeadControllerManager;
 import org.apache.pinot.controller.api.resources.LLCSegmentCompletionHandlers;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import org.apache.pinot.controller.helix.core.PinotTableIdealStateBuilder;
@@ -66,7 +66,6 @@ import 
org.apache.pinot.core.indexsegment.generator.SegmentVersion;
 import org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConfigUtils;
 import org.apache.pinot.core.realtime.stream.OffsetCriteria;
 import org.apache.pinot.core.realtime.stream.StreamConfig;
-import org.apache.pinot.core.realtime.stream.StreamConfigProperties;
 import org.apache.pinot.core.segment.index.SegmentMetadataImpl;
 import org.apache.pinot.filesystem.PinotFSFactory;
 import org.apache.zookeeper.data.Stat;
@@ -1323,7 +1322,7 @@ public class PinotLLCRealtimeSegmentManagerTest {
     protected FakePinotLLCRealtimeSegmentManager(PinotHelixResourceManager 
pinotHelixResourceManager,
         List<String> existingLLCSegments, ControllerMetrics controllerMetrics) 
{
       super(pinotHelixResourceManager, CONTROLLER_CONF, controllerMetrics,
-          new 
ControllerLeadershipManager(pinotHelixResourceManager.getHelixZkManager(), 
controllerMetrics));
+          new LeadControllerManager());
 
       try {
         TableConfigCache mockCache = mock(TableConfigCache.class);
@@ -1513,7 +1512,7 @@ public class PinotLLCRealtimeSegmentManagerTest {
     }
 
     @Override
-    protected boolean isLeader() {
+    protected boolean isLeader(String tableName) {
       return IS_LEADER;
     }
 
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionTest.java
index b05f9bf..22e317f 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionTest.java
@@ -30,7 +30,7 @@ import 
org.apache.pinot.common.protocols.SegmentCompletionProtocol;
 import org.apache.pinot.common.utils.CommonConstants;
 import org.apache.pinot.common.utils.LLCSegmentName;
 import org.apache.pinot.controller.ControllerConf;
-import org.apache.pinot.controller.ControllerLeadershipManager;
+import org.apache.pinot.controller.LeadControllerManager;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import 
org.apache.pinot.controller.helix.core.realtime.segment.CommittingSegmentDescriptor;
 import org.apache.zookeeper.data.Stat;
@@ -40,7 +40,6 @@ import org.testng.annotations.Test;
 
 import static 
org.apache.pinot.common.protocols.SegmentCompletionProtocol.ControllerResponseStatus;
 import static 
org.apache.pinot.common.protocols.SegmentCompletionProtocol.Request;
-import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -1151,8 +1150,7 @@ public class SegmentCompletionTest {
 
     protected MockPinotLLCRealtimeSegmentManager(PinotHelixResourceManager 
pinotHelixResourceManager,
         ControllerMetrics controllerMetrics) {
-      super(pinotHelixResourceManager, CONTROLLER_CONF, controllerMetrics,
-          new 
ControllerLeadershipManager(pinotHelixResourceManager.getHelixZkManager(), 
controllerMetrics));
+      super(pinotHelixResourceManager, CONTROLLER_CONF, controllerMetrics, new 
LeadControllerManager());
     }
 
     @Override
@@ -1210,7 +1208,7 @@ public class SegmentCompletionTest {
     protected MockSegmentCompletionManager(HelixManager helixManager, 
PinotLLCRealtimeSegmentManager segmentManager,
         boolean isLeader, ControllerMetrics controllerMetrics) {
       super(helixManager, segmentManager, controllerMetrics,
-          new ControllerLeadershipManager(helixManager, controllerMetrics),
+          new LeadControllerManager(),
           SegmentCompletionProtocol.getDefaultMaxSegmentCommitTimeSeconds());
       _isLeader = isLeader;
     }
@@ -1221,7 +1219,7 @@ public class SegmentCompletionTest {
     }
 
     @Override
-    protected boolean isLeader() {
+    protected boolean isLeader(String tableName) {
       return _isLeader;
     }
   }
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/relocation/RealtimeSegmentRelocatorTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/relocation/RealtimeSegmentRelocatorTest.java
index 8c5c8ec..9316062 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/relocation/RealtimeSegmentRelocatorTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/relocation/RealtimeSegmentRelocatorTest.java
@@ -34,12 +34,14 @@ import org.apache.helix.model.builder.CustomModeISBuilder;
 import org.apache.pinot.common.config.RealtimeTagConfig;
 import org.apache.pinot.common.metrics.ControllerMetrics;
 import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.controller.LeadControllerManager;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import 
org.apache.pinot.controller.helix.core.PinotHelixSegmentOnlineOfflineStateModelGenerator;
 import org.testng.Assert;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
+import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -48,6 +50,7 @@ public class RealtimeSegmentRelocatorTest {
 
   private TestRealtimeSegmentRelocator _realtimeSegmentRelocator;
   private HelixManager _mockHelixManager;
+  private LeadControllerManager _leadControllerManager;
 
   private String[] serverNames;
   private String[] consumingServerNames;
@@ -69,10 +72,13 @@ public class RealtimeSegmentRelocatorTest {
     PinotHelixResourceManager mockPinotHelixResourceManager = 
mock(PinotHelixResourceManager.class);
     _mockHelixManager = mock(HelixManager.class);
     
when(mockPinotHelixResourceManager.getHelixZkManager()).thenReturn(_mockHelixManager);
+    LeadControllerManager mockLeadControllerManager = 
mock(LeadControllerManager.class);
+    
when(mockLeadControllerManager.isLeaderForTable(anyString())).thenReturn(true);
     ControllerConf controllerConfig = new ControllerConf();
     ControllerMetrics controllerMetrics = new ControllerMetrics(new 
MetricsRegistry());
     _realtimeSegmentRelocator =
-        new TestRealtimeSegmentRelocator(mockPinotHelixResourceManager, 
controllerConfig, controllerMetrics);
+        new TestRealtimeSegmentRelocator(mockPinotHelixResourceManager, 
mockLeadControllerManager, controllerConfig,
+            controllerMetrics);
 
     final int maxInstances = 20;
     serverNames = new String[maxInstances];
@@ -268,9 +274,9 @@ public class RealtimeSegmentRelocatorTest {
 
     private Map<String, List<String>> tagToInstances;
 
-    public TestRealtimeSegmentRelocator(PinotHelixResourceManager 
pinotHelixResourceManager, ControllerConf config,
-        ControllerMetrics controllerMetrics) {
-      super(pinotHelixResourceManager, config, controllerMetrics);
+    public TestRealtimeSegmentRelocator(PinotHelixResourceManager 
pinotHelixResourceManager,
+        LeadControllerManager leadControllerManager, ControllerConf config, 
ControllerMetrics controllerMetrics) {
+      super(pinotHelixResourceManager, leadControllerManager, config, 
controllerMetrics);
       tagToInstances = new HashedMap();
     }
 
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java
index d4adfe1..465a9af 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java
@@ -35,6 +35,7 @@ import org.apache.pinot.common.segment.SegmentMetadata;
 import org.apache.pinot.common.utils.CommonConstants;
 import org.apache.pinot.common.utils.LLCSegmentName;
 import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.controller.LeadControllerManager;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import org.apache.pinot.controller.helix.core.PinotTableIdealStateBuilder;
 import org.apache.pinot.controller.helix.core.SegmentDeletionManager;
@@ -84,6 +85,9 @@ public class RetentionManagerTest {
     PinotHelixResourceManager pinotHelixResourceManager = 
mock(PinotHelixResourceManager.class);
     setupPinotHelixResourceManager(tableConfig, removedSegments, 
pinotHelixResourceManager);
 
+    LeadControllerManager leadControllerManager = 
mock(LeadControllerManager.class);
+    when(leadControllerManager.isLeaderForTable(anyString())).thenReturn(true);
+
     
when(pinotHelixResourceManager.getTableConfig(OFFLINE_TABLE_NAME)).thenReturn(tableConfig);
     
when(pinotHelixResourceManager.getOfflineSegmentMetadata(OFFLINE_TABLE_NAME)).thenReturn(metadataList);
 
@@ -91,7 +95,8 @@ public class RetentionManagerTest {
     ControllerMetrics controllerMetrics = new ControllerMetrics(new 
MetricsRegistry());
     conf.setRetentionControllerFrequencyInSeconds(0);
     conf.setDeletedSegmentsRetentionInDays(0);
-    RetentionManager retentionManager = new 
RetentionManager(pinotHelixResourceManager, conf, controllerMetrics);
+    RetentionManager retentionManager =
+        new RetentionManager(pinotHelixResourceManager, leadControllerManager, 
conf, controllerMetrics);
     retentionManager.start();
     retentionManager.run();
 
@@ -210,11 +215,14 @@ public class RetentionManagerTest {
         setupSegmentMetadata(tableConfig, now, initialNumSegments, 
removedSegments);
     setupPinotHelixResourceManager(tableConfig, removedSegments, 
pinotHelixResourceManager);
 
+    LeadControllerManager leadControllerManager = 
mock(LeadControllerManager.class);
+    when(leadControllerManager.isLeaderForTable(anyString())).thenReturn(true);
+
     ControllerConf conf = new ControllerConf();
     ControllerMetrics controllerMetrics = new ControllerMetrics(new 
MetricsRegistry());
     conf.setRetentionControllerFrequencyInSeconds(0);
     conf.setDeletedSegmentsRetentionInDays(0);
-    RetentionManager retentionManager = new 
RetentionManager(pinotHelixResourceManager, conf, controllerMetrics);
+    RetentionManager retentionManager = new 
RetentionManager(pinotHelixResourceManager, leadControllerManager, conf, 
controllerMetrics);
     retentionManager.start();
     retentionManager.run();
 
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/validation/StorageQuotaCheckerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/validation/StorageQuotaCheckerTest.java
index e84a947..f8fb6ca 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/validation/StorageQuotaCheckerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/validation/StorageQuotaCheckerTest.java
@@ -30,7 +30,7 @@ import org.apache.pinot.common.config.TableConfig;
 import org.apache.pinot.common.exception.InvalidConfigException;
 import org.apache.pinot.common.metrics.ControllerGauge;
 import org.apache.pinot.common.metrics.ControllerMetrics;
-import org.apache.pinot.controller.ControllerLeadershipManager;
+import org.apache.pinot.controller.LeadControllerManager;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import org.apache.pinot.controller.util.TableSizeReader;
 import org.mockito.invocation.InvocationOnMock;
@@ -49,7 +49,7 @@ public class StorageQuotaCheckerTest {
   private TableConfig _tableConfig;
   private ControllerMetrics _controllerMetrics;
   private PinotHelixResourceManager _pinotHelixResourceManager;
-  private ControllerLeadershipManager _controllerLeadershipManager;
+  private LeadControllerManager _leadControllerManager;
   private QuotaConfig _quotaConfig;
   private SegmentsValidationAndRetentionConfig _validationConfig;
   private static final File TEST_DIR = new 
File(StorageQuotaCheckerTest.class.getName());
@@ -62,7 +62,7 @@ public class StorageQuotaCheckerTest {
     _controllerMetrics = new ControllerMetrics(new MetricsRegistry());
     _validationConfig = mock(SegmentsValidationAndRetentionConfig.class);
     _pinotHelixResourceManager = mock(PinotHelixResourceManager.class);
-    _controllerLeadershipManager = mock(ControllerLeadershipManager.class);
+    _leadControllerManager = mock(LeadControllerManager.class);
     when(_tableConfig.getValidationConfig()).thenReturn(_validationConfig);
     when(_validationConfig.getReplicationNumber()).thenReturn(2);
     TEST_DIR.mkdirs();
@@ -78,10 +78,9 @@ public class StorageQuotaCheckerTest {
       throws InvalidConfigException {
     StorageQuotaChecker checker =
         new MockStorageQuotaChecker(_tableConfig, _tableSizeReader, 
_controllerMetrics, _pinotHelixResourceManager,
-            _controllerLeadershipManager);
+            _leadControllerManager);
     when(_tableConfig.getQuotaConfig()).thenReturn(null);
-    StorageQuotaChecker.QuotaCheckerResponse res =
-        checker.isSegmentStorageWithinQuota(TEST_DIR, "segment", 1000);
+    StorageQuotaChecker.QuotaCheckerResponse res = 
checker.isSegmentStorageWithinQuota(TEST_DIR, "segment", 1000);
     Assert.assertTrue(res.isSegmentWithinQuota);
   }
 
@@ -90,11 +89,10 @@ public class StorageQuotaCheckerTest {
       throws InvalidConfigException {
     StorageQuotaChecker checker =
         new MockStorageQuotaChecker(_tableConfig, _tableSizeReader, 
_controllerMetrics, _pinotHelixResourceManager,
-            _controllerLeadershipManager);
+            _leadControllerManager);
     when(_tableConfig.getQuotaConfig()).thenReturn(_quotaConfig);
     when(_quotaConfig.storageSizeBytes()).thenReturn(-1L);
-    StorageQuotaChecker.QuotaCheckerResponse res =
-        checker.isSegmentStorageWithinQuota(TEST_DIR, "segment", 1000);
+    StorageQuotaChecker.QuotaCheckerResponse res = 
checker.isSegmentStorageWithinQuota(TEST_DIR, "segment", 1000);
     Assert.assertTrue(res.isSegmentWithinQuota);
   }
 
@@ -134,9 +132,8 @@ public class StorageQuotaCheckerTest {
     when(_quotaConfig.getStorage()).thenReturn("3K");
     StorageQuotaChecker checker =
         new MockStorageQuotaChecker(_tableConfig, _tableSizeReader, 
_controllerMetrics, _pinotHelixResourceManager,
-            _controllerLeadershipManager);
-    StorageQuotaChecker.QuotaCheckerResponse response =
-        checker.isSegmentStorageWithinQuota(TEST_DIR, "segment1", 1000);
+            _leadControllerManager);
+    StorageQuotaChecker.QuotaCheckerResponse response = 
checker.isSegmentStorageWithinQuota(TEST_DIR, "segment1", 1000);
     Assert.assertTrue(response.isSegmentWithinQuota);
     Assert.assertEquals(
         _controllerMetrics.getValueOfTableGauge(tableName, 
ControllerGauge.TABLE_STORAGE_QUOTA_UTILIZATION), 80L);
@@ -184,12 +181,12 @@ public class StorageQuotaCheckerTest {
 
     public MockStorageQuotaChecker(TableConfig tableConfig, TableSizeReader 
tableSizeReader,
         ControllerMetrics controllerMetrics, PinotHelixResourceManager 
pinotHelixResourceManager,
-        ControllerLeadershipManager controllerLeadershipManager) {
-      super(tableConfig, tableSizeReader, controllerMetrics, 
pinotHelixResourceManager, controllerLeadershipManager);
+        LeadControllerManager leadControllerManager) {
+      super(tableConfig, tableSizeReader, controllerMetrics, 
pinotHelixResourceManager, leadControllerManager);
     }
 
     @Override
-    protected boolean isLeader() {
+    protected boolean isLeader(String tableName) {
       return true;
     }
   }
diff --git a/pinot-core/src/main/java/org/apache/pinot/server/realtime/ControllerLeaderLocator.java b/pinot-core/src/main/java/org/apache/pinot/server/realtime/ControllerLeaderLocator.java
index 07b0087..9217259 100644
--- a/pinot-core/src/main/java/org/apache/pinot/server/realtime/ControllerLeaderLocator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/server/realtime/ControllerLeaderLocator.java
@@ -26,6 +26,7 @@ import org.apache.helix.BaseDataAccessor;
 import org.apache.helix.HelixManager;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.model.ExternalView;
+import org.apache.pinot.common.utils.helix.HelixHelper;
 import org.apache.pinot.core.query.utils.Pair;
 import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
@@ -113,7 +114,8 @@ public class ControllerLeaderLocator {
    */
   private String getLeaderForTable(String rawTableName) {
     String leaderForTable;
-    ExternalView leadControllerResourceExternalView = 
_helixManager.getClusterManagmentTool().getResourceExternalView(_clusterName, 
LEAD_CONTROLLER_RESOURCE_NAME);
+    ExternalView leadControllerResourceExternalView =
+        
_helixManager.getClusterManagmentTool().getResourceExternalView(_clusterName, 
LEAD_CONTROLLER_RESOURCE_NAME);
     String partitionLeader = 
getPartitionLeader(leadControllerResourceExternalView, rawTableName);
     if (partitionLeader != null) {
       leaderForTable = partitionLeader;
@@ -146,11 +148,12 @@ public class ControllerLeaderLocator {
       return null;
     }
     int numPartitions = partitionSet.size();
-    int partitionIndex = rawTableName.hashCode() % numPartitions;
+    int partitionIndex = HelixHelper.getHashCodeForTable(rawTableName) % 
numPartitions;
     String partitionName = LEAD_CONTROLLER_RESOURCE_NAME + "_" + 
partitionIndex;
-    Map<String, String> stateMap = 
leadControllerResourceExternalView.getStateMap(partitionName);
+    Map<String, String> partitionStateMap = 
leadControllerResourceExternalView.getStateMap(partitionName);
 
-    for (Map.Entry<String, String> entry : stateMap.entrySet()) {
+    // Get master host from partition map. Return null if no master found.
+    for (Map.Entry<String, String> entry : partitionStateMap.entrySet()) {
       if ("MASTER".equals(entry.getValue())) {
         return entry.getKey();
       }
@@ -166,9 +169,11 @@ public class ControllerLeaderLocator {
     BaseDataAccessor<ZNRecord> dataAccessor = 
_helixManager.getHelixDataAccessor().getBaseDataAccessor();
     Stat stat = new Stat();
     try {
-      ZNRecord znRecord = dataAccessor.get("/" + _clusterName + 
"/CONTROLLER/LEADER", stat, AccessOption.THROW_EXCEPTION_IFNOTEXIST);
+      ZNRecord znRecord =
+          dataAccessor.get("/" + _clusterName + "/CONTROLLER/LEADER", stat, 
AccessOption.THROW_EXCEPTION_IFNOTEXIST);
       String helixLeader = znRecord.getId();
-      LOGGER.info("Getting Helix leader: {} as per znode version {}, mtime 
{}", helixLeader, stat.getVersion(), stat.getMtime());
+      LOGGER.info("Getting Helix leader: {} as per znode version {}, mtime 
{}", helixLeader, stat.getVersion(),
+          stat.getMtime());
       return helixLeader;
     } catch (Exception e) {
       LOGGER.warn("Could not locate Helix leader", e);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to