This is an automated email from the ASF dual-hosted git repository.

nehapawar pushed a commit to branch run_validation_manager_on_leadership_change
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit b2a761a3c4aed4ac039fd59a0cb13becd191599d
Author: Neha Pawar <[email protected]>
AuthorDate: Mon Dec 10 19:00:06 2018 -0800

    Add ControllerLeadershipManager as single place to check controller 
leadership changes
---
 .../controller/ControllerChangeSubscriber.java     |  32 ++++++
 .../controller/ControllerLeadershipManager.java    | 116 +++++++++++++++++++++
 .../pinot/controller/ControllerStarter.java        |   4 +-
 .../helix/core/PinotHelixResourceManager.java      |   9 --
 .../core/periodictask/ControllerPeriodicTask.java  |   9 +-
 .../realtime/PinotLLCRealtimeSegmentManager.java   |  23 +---
 .../core/realtime/PinotRealtimeSegmentManager.java |  30 +++---
 .../core/realtime/SegmentCompletionManager.java    |  17 ++-
 .../controller/validation/StorageQuotaChecker.java |   7 +-
 .../controller/helix/SegmentStatusCheckerTest.java |  43 ++++----
 .../periodictask/ControllerPeriodicTaskTest.java   |  33 +++++-
 .../helix/core/realtime/SegmentCompletionTest.java |   7 ++
 .../helix/core/retention/RetentionManagerTest.java |  18 +++-
 .../validation/StorageQuotaCheckerTest.java        |  20 +++-
 14 files changed, 287 insertions(+), 81 deletions(-)

diff --git 
a/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerChangeSubscriber.java
 
b/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerChangeSubscriber.java
new file mode 100644
index 0000000..21e2945
--- /dev/null
+++ 
b/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerChangeSubscriber.java
@@ -0,0 +1,32 @@
+/**
+ * Copyright (C) 2014-2018 LinkedIn Corp. ([email protected])
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.linkedin.pinot.controller;
+
+/**
+ * Interface for a subscriber to the {@link ControllerLeadershipManager}
+ */
+public interface ControllerChangeSubscriber {
+
+  /**
+   * Callback to invoke on becoming leader
+   */
+  void onBecomingLeader();
+
+  /**
+   * Callback to invoke on losing leadership
+   */
+  void onBecomingNonLeader();
+}
diff --git 
a/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerLeadershipManager.java
 
b/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerLeadershipManager.java
new file mode 100644
index 0000000..99a8a9a
--- /dev/null
+++ 
b/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerLeadershipManager.java
@@ -0,0 +1,116 @@
+/**
+ * Copyright (C) 2014-2018 LinkedIn Corp. ([email protected])
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.linkedin.pinot.controller;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.helix.HelixManager;
+import org.apache.helix.api.listeners.ControllerChangeListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Single place for listening on controller changes
+ * This should be created at controller startup and everyone who wants to 
listen to controller changes should subscribe
+ */
+public class ControllerLeadershipManager {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(ControllerLeadershipManager.class);
+
+  private static ControllerLeadershipManager INSTANCE = null;
+
+  private HelixManager _helixManager;
+  private volatile boolean _amILeader = false;
+
+  private Map<String, ControllerChangeSubscriber> _subscribers = new 
ConcurrentHashMap<>();
+
+  private ControllerLeadershipManager(HelixManager helixManager) {
+    _helixManager = helixManager;
+    _helixManager.addControllerListener((ControllerChangeListener) 
notificationContext -> onControllerChange());
+  }
+
+  /**
+   * Create an instance of ControllerLeadershipManager
+   * @param helixManager
+   */
+  public static synchronized void createInstance(HelixManager helixManager) {
+    if (INSTANCE != null) {
+      throw new RuntimeException("Instance of ControllerLeadershipManager 
already created");
+    }
+    INSTANCE = new ControllerLeadershipManager(helixManager);
+  }
+
+  /**
+   * Get the instance of ControllerLeadershipManager
+   * @return
+   */
+  public static ControllerLeadershipManager getInstance() {
+    if (INSTANCE == null) {
+      throw new RuntimeException("Instance of ControllerLeadershipManager not 
yet created");
+    }
+    return INSTANCE;
+  }
+
+  /**
+   * Callback on changes in the controller
+   */
+  protected void onControllerChange() {
+    if (_helixManager.isLeader()) {
+      if (!_amILeader) {
+        _amILeader = true;
+        LOGGER.info("Became leader");
+        onBecomingLeader();
+      } else {
+        LOGGER.info("Already leader. Duplicate notification");
+      }
+    } else {
+      _amILeader = false;
+      LOGGER.info("Lost leadership");
+      onBecomingNonLeader();
+    }
+  }
+
+  public boolean isLeader() {
+    return _amILeader;
+  }
+
+  private void onBecomingLeader() {
+    _subscribers.forEach((k, v) -> v.onBecomingLeader());
+  }
+
+  private void onBecomingNonLeader() {
+    _subscribers.forEach((k, v) -> v.onBecomingNonLeader());
+  }
+
+  /**
+   * Subscribe to changes in the controller leadership
+   * @param name
+   * @param subscriber
+   */
+  public void subscribe(String name, ControllerChangeSubscriber subscriber) {
+    _subscribers.put(name, subscriber);
+  }
+
+  /**
+   * Unsubscribe from changes in controller leadership
+   * @param name
+   */
+  public void unsubscribe(String name) {
+    _subscribers.remove(name);
+  }
+}
diff --git 
a/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerStarter.java
 
b/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerStarter.java
index bf25cef..8f64b6b 100644
--- 
a/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerStarter.java
+++ 
b/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerStarter.java
@@ -161,13 +161,15 @@ public class ControllerStarter {
     _helixResourceManager.start();
     final HelixManager helixManager = 
_helixResourceManager.getHelixZkManager();
 
+    LOGGER.info("Creating ControllerLeadershipManager instance");
+    ControllerLeadershipManager.createInstance(helixManager);
+
     LOGGER.info("Starting task resource manager");
     _helixTaskResourceManager = new PinotHelixTaskResourceManager(new 
TaskDriver(helixManager));
 
     // Helix resource manager must be started in order to create 
PinotLLCRealtimeSegmentManager
     LOGGER.info("Starting realtime segment manager");
     PinotLLCRealtimeSegmentManager.create(_helixResourceManager, _config, 
_controllerMetrics);
-    PinotLLCRealtimeSegmentManager.getInstance().start();
     _realtimeSegmentsManager.start(_controllerMetrics);
 
     // Setting up periodic tasks
diff --git 
a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/PinotHelixResourceManager.java
 
b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/PinotHelixResourceManager.java
index c12f70e..f695e51 100644
--- 
a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ 
b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -218,15 +218,6 @@ public class PinotHelixResourceManager {
   }
 
   /**
-   * Check whether the Helix manager is the leader.
-   *
-   * @return Whether the Helix manager is the leader
-   */
-  public boolean isLeader() {
-    return _helixZkManager.isLeader();
-  }
-
-  /**
    * Get the Helix admin.
    *
    * @return Helix admin
diff --git 
a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java
 
b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java
index 3d1ef6a..3b12c0f 100644
--- 
a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java
+++ 
b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java
@@ -15,6 +15,8 @@
  */
 package com.linkedin.pinot.controller.helix.core.periodictask;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.linkedin.pinot.controller.ControllerLeadershipManager;
 import com.linkedin.pinot.controller.helix.core.PinotHelixResourceManager;
 import com.linkedin.pinot.core.periodictask.BasePeriodicTask;
 import java.util.List;
@@ -59,7 +61,7 @@ public abstract class ControllerPeriodicTask extends 
BasePeriodicTask {
 
   @Override
   public void run() {
-    if (!_pinotHelixResourceManager.isLeader()) {
+    if (!isLeader()) {
       skipLeaderTask();
     } else {
       List<String> allTableNames = _pinotHelixResourceManager.getAllTables();
@@ -109,4 +111,9 @@ public abstract class ControllerPeriodicTask extends 
BasePeriodicTask {
    * @param tables List of table names
    */
   public abstract void process(List<String> tables);
+
+  @VisibleForTesting
+  protected boolean isLeader() {
+    return ControllerLeadershipManager.getInstance().isLeader();
+  }
 }
diff --git 
a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
 
b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index b1ff634..6eeaf8e 100644
--- 
a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ 
b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -42,6 +42,7 @@ import com.linkedin.pinot.common.utils.TarGzCompressionUtils;
 import com.linkedin.pinot.common.utils.helix.HelixHelper;
 import com.linkedin.pinot.common.utils.retry.RetryPolicies;
 import com.linkedin.pinot.controller.ControllerConf;
+import com.linkedin.pinot.controller.ControllerLeadershipManager;
 import com.linkedin.pinot.controller.api.events.MetadataEventNotifierFactory;
 import com.linkedin.pinot.controller.helix.core.PinotHelixResourceManager;
 import 
com.linkedin.pinot.controller.helix.core.PinotHelixSegmentOnlineOfflineStateModelGenerator;
@@ -119,7 +120,6 @@ public class PinotLLCRealtimeSegmentManager {
   private final ZkHelixPropertyStore<ZNRecord> _propertyStore;
   private final PinotHelixResourceManager _helixResourceManager;
   private final String _clusterName;
-  private boolean _amILeader = false;
   private final ControllerConf _controllerConf;
   private final ControllerMetrics _controllerMetrics;
   private final int _numIdealStateUpdateLocks;
@@ -155,10 +155,6 @@ public class PinotLLCRealtimeSegmentManager {
     SegmentCompletionManager.create(helixManager, INSTANCE, controllerConf, 
controllerMetrics);
   }
 
-  public void start() {
-    _helixManager.addControllerListener(changeContext -> onBecomeLeader());
-  }
-
   protected PinotLLCRealtimeSegmentManager(HelixAdmin helixAdmin, String 
clusterName, HelixManager helixManager,
       ZkHelixPropertyStore propertyStore, PinotHelixResourceManager 
helixResourceManager, ControllerConf controllerConf,
       ControllerMetrics controllerMetrics) {
@@ -186,24 +182,9 @@ public class PinotLLCRealtimeSegmentManager {
     return INSTANCE;
   }
 
-  private void onBecomeLeader() {
-    if (isLeader()) {
-      if (!_amILeader) {
-        // We were not leader before, now we are.
-        _amILeader = true;
-        LOGGER.info("Became leader");
-      } else {
-        // We already had leadership, nothing to do.
-        LOGGER.info("Already leader. Duplicate notification");
-      }
-    } else {
-      _amILeader = false;
-      LOGGER.info("Lost leadership");
-    }
-  }
 
   protected boolean isLeader() {
-    return _helixManager.isLeader();
+    return ControllerLeadershipManager.getInstance().isLeader();
   }
 
   protected boolean isConnected() {
diff --git 
a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/realtime/PinotRealtimeSegmentManager.java
 
b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/realtime/PinotRealtimeSegmentManager.java
index 07c411f..03958b9 100644
--- 
a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/realtime/PinotRealtimeSegmentManager.java
+++ 
b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/realtime/PinotRealtimeSegmentManager.java
@@ -22,10 +22,9 @@ import com.linkedin.pinot.common.config.TableNameBuilder;
 import com.linkedin.pinot.common.metadata.ZKMetadataProvider;
 import com.linkedin.pinot.common.metadata.instance.InstanceZKMetadata;
 import com.linkedin.pinot.common.metadata.segment.RealtimeSegmentZKMetadata;
-import com.linkedin.pinot.common.utils.CommonConstants;
-import com.linkedin.pinot.core.realtime.stream.StreamConfig;
 import com.linkedin.pinot.common.metrics.ControllerMeter;
 import com.linkedin.pinot.common.metrics.ControllerMetrics;
+import com.linkedin.pinot.common.utils.CommonConstants;
 import com.linkedin.pinot.common.utils.CommonConstants.Helix.TableType;
 import com.linkedin.pinot.common.utils.CommonConstants.Segment.Realtime.Status;
 import com.linkedin.pinot.common.utils.CommonConstants.Segment.SegmentType;
@@ -33,9 +32,12 @@ import com.linkedin.pinot.common.utils.HLCSegmentName;
 import com.linkedin.pinot.common.utils.SegmentName;
 import com.linkedin.pinot.common.utils.helix.HelixHelper;
 import com.linkedin.pinot.common.utils.retry.RetryPolicies;
+import com.linkedin.pinot.controller.ControllerChangeSubscriber;
+import com.linkedin.pinot.controller.ControllerLeadershipManager;
 import com.linkedin.pinot.controller.helix.core.PinotHelixResourceManager;
 import com.linkedin.pinot.controller.helix.core.PinotTableIdealStateBuilder;
 import com.linkedin.pinot.core.query.utils.Pair;
+import com.linkedin.pinot.core.realtime.stream.StreamConfig;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -47,8 +49,6 @@ import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import org.I0Itec.zkclient.IZkChildListener;
 import org.I0Itec.zkclient.IZkDataListener;
-import org.apache.helix.ControllerChangeListener;
-import org.apache.helix.NotificationContext;
 import org.apache.helix.PropertyPathConfig;
 import org.apache.helix.PropertyType;
 import org.apache.helix.ZNRecord;
@@ -65,7 +65,7 @@ import org.slf4j.LoggerFactory;
 /**
  * Realtime segment manager, which assigns realtime segments to server 
instances so that they can consume from the stream.
  */
-public class PinotRealtimeSegmentManager implements HelixPropertyListener, 
IZkChildListener, IZkDataListener {
+public class PinotRealtimeSegmentManager implements HelixPropertyListener, 
IZkChildListener, IZkDataListener, ControllerChangeSubscriber {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(PinotRealtimeSegmentManager.class);
   private static final String TABLE_CONFIG = "/CONFIGS/TABLE";
   private static final String SEGMENTS_PATH = "/SEGMENTS";
@@ -101,12 +101,7 @@ public class PinotRealtimeSegmentManager implements 
HelixPropertyListener, IZkCh
     _zkClient.subscribeDataChanges(_tableConfigPath, this);
 
     // Subscribe to leadership changes
-    _pinotHelixResourceManager.getHelixZkManager().addControllerListener(new 
ControllerChangeListener() {
-      @Override
-      public void onControllerChange(NotificationContext changeContext) {
-        processPropertyStoreChange(CONTROLLER_LEADER_CHANGE);
-      }
-    });
+    
ControllerLeadershipManager.getInstance().subscribe(PinotLLCRealtimeSegmentManager.class.getName(),
 this);
 
     // Setup change listeners for already existing tables, if any.
     processPropertyStoreChange(_tableConfigPath);
@@ -115,6 +110,7 @@ public class PinotRealtimeSegmentManager implements 
HelixPropertyListener, IZkCh
   public void stop() {
     LOGGER.info("Stopping realtime segments manager, stopping property 
store.");
     _pinotHelixResourceManager.getPropertyStore().stop();
+    
ControllerLeadershipManager.getInstance().unsubscribe(PinotRealtimeSegmentManager.class.getName());
   }
 
   private synchronized void 
assignRealtimeSegmentsToServerInstancesIfNecessary()
@@ -265,7 +261,7 @@ public class PinotRealtimeSegmentManager implements 
HelixPropertyListener, IZkCh
   }
 
   private boolean isLeader() {
-    return _pinotHelixResourceManager.isLeader();
+    return ControllerLeadershipManager.getInstance().isLeader();
   }
 
   @Override
@@ -408,4 +404,14 @@ public class PinotRealtimeSegmentManager implements 
HelixPropertyListener, IZkCh
     LOGGER.info("PinotRealtimeSegmentManager.handleDataDeleted: {}", dataPath);
     processPropertyStoreChange(dataPath);
   }
+
+  @Override
+  public void onBecomingLeader() {
+    processPropertyStoreChange(CONTROLLER_LEADER_CHANGE);
+  }
+
+  @Override
+  public void onBecomingNonLeader() {
+    processPropertyStoreChange(CONTROLLER_LEADER_CHANGE);
+  }
 }
diff --git 
a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/realtime/SegmentCompletionManager.java
 
b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/realtime/SegmentCompletionManager.java
index 1ba51bc..6d47ca7 100644
--- 
a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/realtime/SegmentCompletionManager.java
+++ 
b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/realtime/SegmentCompletionManager.java
@@ -15,6 +15,7 @@
  */
 package com.linkedin.pinot.controller.helix.core.realtime;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.linkedin.pinot.common.config.TableNameBuilder;
 import com.linkedin.pinot.common.metadata.segment.LLCRealtimeSegmentZKMetadata;
 import com.linkedin.pinot.common.metrics.ControllerMeter;
@@ -23,6 +24,7 @@ import 
com.linkedin.pinot.common.protocols.SegmentCompletionProtocol;
 import com.linkedin.pinot.common.utils.CommonConstants;
 import com.linkedin.pinot.common.utils.LLCSegmentName;
 import com.linkedin.pinot.controller.ControllerConf;
+import com.linkedin.pinot.controller.ControllerLeadershipManager;
 import 
com.linkedin.pinot.controller.helix.core.realtime.segment.CommittingSegmentDescriptor;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -153,7 +155,7 @@ public class SegmentCompletionManager {
    * that it currently has (i.e. next offset that it will consume, if it 
continues to consume).
    */
   public SegmentCompletionProtocol.Response 
segmentConsumed(SegmentCompletionProtocol.Request.Params reqParams) {
-    if (!_helixManager.isLeader() || !_helixManager.isConnected()) {
+    if (!isLeader() || !_helixManager.isConnected()) {
       
_controllerMetrics.addMeteredGlobalValue(ControllerMeter.CONTROLLER_NOT_LEADER, 
1L);
       return SegmentCompletionProtocol.RESP_NOT_LEADER;
     }
@@ -190,7 +192,7 @@ public class SegmentCompletionManager {
    * incoming segment).
    */
   public SegmentCompletionProtocol.Response segmentCommitStart(final 
SegmentCompletionProtocol.Request.Params reqParams) {
-    if (!_helixManager.isLeader() || !_helixManager.isConnected()) {
+    if (!isLeader() || !_helixManager.isConnected()) {
       
_controllerMetrics.addMeteredGlobalValue(ControllerMeter.CONTROLLER_NOT_LEADER, 
1L);
       return SegmentCompletionProtocol.RESP_NOT_LEADER;
     }
@@ -214,7 +216,7 @@ public class SegmentCompletionManager {
   }
 
   public SegmentCompletionProtocol.Response extendBuildTime(final 
SegmentCompletionProtocol.Request.Params reqParams) {
-    if (!_helixManager.isLeader() || !_helixManager.isConnected()) {
+    if (!isLeader() || !_helixManager.isConnected()) {
       
_controllerMetrics.addMeteredGlobalValue(ControllerMeter.CONTROLLER_NOT_LEADER, 
1L);
       return SegmentCompletionProtocol.RESP_NOT_LEADER;
     }
@@ -245,7 +247,7 @@ public class SegmentCompletionManager {
    * @return
    */
   public SegmentCompletionProtocol.Response 
segmentStoppedConsuming(SegmentCompletionProtocol.Request.Params reqParams) {
-    if (!_helixManager.isLeader() || !_helixManager.isConnected()) {
+    if (!isLeader() || !_helixManager.isConnected()) {
       
_controllerMetrics.addMeteredGlobalValue(ControllerMeter.CONTROLLER_NOT_LEADER, 
1L);
       return SegmentCompletionProtocol.RESP_NOT_LEADER;
     }
@@ -280,7 +282,7 @@ public class SegmentCompletionManager {
    * @return
    */
   public SegmentCompletionProtocol.Response 
segmentCommitEnd(SegmentCompletionProtocol.Request.Params reqParams, boolean 
success, boolean isSplitCommit) {
-    if (!_helixManager.isLeader() || !_helixManager.isConnected()) {
+    if (!isLeader() || !_helixManager.isConnected()) {
       
_controllerMetrics.addMeteredGlobalValue(ControllerMeter.CONTROLLER_NOT_LEADER, 
1L);
       return SegmentCompletionProtocol.RESP_NOT_LEADER;
     }
@@ -1091,4 +1093,9 @@ public class SegmentCompletionManager {
       return false;
     }
   }
+
+  @VisibleForTesting
+  protected boolean isLeader() {
+    return ControllerLeadershipManager.getInstance().isLeader();
+  }
 }
diff --git 
a/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/StorageQuotaChecker.java
 
b/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/StorageQuotaChecker.java
index 7d6be94..2614c2d 100644
--- 
a/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/StorageQuotaChecker.java
+++ 
b/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/StorageQuotaChecker.java
@@ -23,6 +23,7 @@ import 
com.linkedin.pinot.common.exception.InvalidConfigException;
 import com.linkedin.pinot.common.metrics.ControllerGauge;
 import com.linkedin.pinot.common.metrics.ControllerMetrics;
 import com.linkedin.pinot.common.utils.DataSize;
+import com.linkedin.pinot.controller.ControllerLeadershipManager;
 import com.linkedin.pinot.controller.helix.core.PinotHelixResourceManager;
 import com.linkedin.pinot.controller.util.TableSizeReader;
 import java.io.File;
@@ -150,7 +151,7 @@ public class StorageQuotaChecker {
         tableName, tableSubtypeSize.estimatedSizeInBytes, 
tableSubtypeSize.reportedSizeInBytes);
 
     // Only emit the real percentage of storage quota usage by lead 
controller, otherwise emit 0L.
-    if (_pinotHelixResourceManager.isLeader() && allowedStorageBytes != 0L) {
+    if (isLeader() && allowedStorageBytes != 0L) {
       long existingStorageQuotaUtilization = 
tableSubtypeSize.estimatedSizeInBytes  * 100 / allowedStorageBytes;
       _controllerMetrics.setValueOfTableGauge(tableName, 
ControllerGauge.TABLE_STORAGE_QUOTA_UTILIZATION,
           existingStorageQuotaUtilization);
@@ -193,4 +194,8 @@ public class StorageQuotaChecker {
       return failure(message);
     }
   }
+
+  protected boolean isLeader() {
+    return ControllerLeadershipManager.getInstance().isLeader();
+  }
 }
diff --git 
a/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/SegmentStatusCheckerTest.java
 
b/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/SegmentStatusCheckerTest.java
index de6eca9..4d4c324 100644
--- 
a/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/SegmentStatusCheckerTest.java
+++ 
b/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/SegmentStatusCheckerTest.java
@@ -75,7 +75,6 @@ public class SegmentStatusCheckerTest {
     }
     {
       helixResourceManager = mock(PinotHelixResourceManager.class);
-      when(helixResourceManager.isLeader()).thenReturn(true);
       when(helixResourceManager.getAllTables()).thenReturn(allTableNames);
       
when(helixResourceManager.getHelixClusterName()).thenReturn("StatusChecker");
       when(helixResourceManager.getHelixAdmin()).thenReturn(helixAdmin);
@@ -87,7 +86,7 @@ public class SegmentStatusCheckerTest {
     }
     metricsRegistry = new MetricsRegistry();
     controllerMetrics = new ControllerMetrics(metricsRegistry);
-    segmentStatusChecker = new SegmentStatusChecker(helixResourceManager, 
config, controllerMetrics);
+    segmentStatusChecker = new MockSegmentStatusChecker(helixResourceManager, 
config, controllerMetrics);
     segmentStatusChecker.init();
     segmentStatusChecker.run();
     
Assert.assertEquals(controllerMetrics.getValueOfTableGauge(externalView.getId(),
@@ -141,7 +140,6 @@ public class SegmentStatusCheckerTest {
     }
     {
       helixResourceManager = mock(PinotHelixResourceManager.class);
-      when(helixResourceManager.isLeader()).thenReturn(true);
       when(helixResourceManager.getAllTables()).thenReturn(allTableNames);
       
when(helixResourceManager.getHelixClusterName()).thenReturn("StatusChecker");
       when(helixResourceManager.getHelixAdmin()).thenReturn(helixAdmin);
@@ -153,7 +151,7 @@ public class SegmentStatusCheckerTest {
     }
     metricsRegistry = new MetricsRegistry();
     controllerMetrics = new ControllerMetrics(metricsRegistry);
-    segmentStatusChecker = new SegmentStatusChecker(helixResourceManager, 
config, controllerMetrics);
+    segmentStatusChecker = new MockSegmentStatusChecker(helixResourceManager, 
config, controllerMetrics);
     segmentStatusChecker.init();
     segmentStatusChecker.run();
     
Assert.assertEquals(controllerMetrics.getValueOfTableGauge(externalView.getId(),
@@ -218,7 +216,6 @@ public class SegmentStatusCheckerTest {
     }
     {
       helixResourceManager = mock(PinotHelixResourceManager.class);
-      when(helixResourceManager.isLeader()).thenReturn(true);
       when(helixResourceManager.getAllTables()).thenReturn(allTableNames);
       
when(helixResourceManager.getHelixClusterName()).thenReturn("StatusChecker");
       when(helixResourceManager.getHelixAdmin()).thenReturn(helixAdmin);
@@ -231,7 +228,7 @@ public class SegmentStatusCheckerTest {
     }
     metricsRegistry = new MetricsRegistry();
     controllerMetrics = new ControllerMetrics(metricsRegistry);
-    segmentStatusChecker = new SegmentStatusChecker(helixResourceManager, 
config, controllerMetrics);
+    segmentStatusChecker = new MockSegmentStatusChecker(helixResourceManager, 
config, controllerMetrics);
     segmentStatusChecker.init();
     segmentStatusChecker.run();
     
Assert.assertEquals(controllerMetrics.getValueOfTableGauge(externalView.getId(),
@@ -266,7 +263,6 @@ public class SegmentStatusCheckerTest {
     }
     {
       helixResourceManager = mock(PinotHelixResourceManager.class);
-      when(helixResourceManager.isLeader()).thenReturn(true);
       when(helixResourceManager.getAllTables()).thenReturn(allTableNames);
       
when(helixResourceManager.getHelixClusterName()).thenReturn("StatusChecker");
       when(helixResourceManager.getHelixAdmin()).thenReturn(helixAdmin);
@@ -278,7 +274,7 @@ public class SegmentStatusCheckerTest {
     }
     metricsRegistry = new MetricsRegistry();
     controllerMetrics = new ControllerMetrics(metricsRegistry);
-    segmentStatusChecker = new SegmentStatusChecker(helixResourceManager, 
config, controllerMetrics);
+    segmentStatusChecker = new MockSegmentStatusChecker(helixResourceManager, 
config, controllerMetrics);
     segmentStatusChecker.init();
     segmentStatusChecker.run();
     Assert.assertEquals(controllerMetrics.getValueOfTableGauge(tableName,
@@ -301,7 +297,6 @@ public class SegmentStatusCheckerTest {
     }
     {
       helixResourceManager = mock(PinotHelixResourceManager.class);
-      when(helixResourceManager.isLeader()).thenReturn(true);
       when(helixResourceManager.getAllTables()).thenReturn(allTableNames);
       
when(helixResourceManager.getHelixClusterName()).thenReturn("StatusChecker");
       when(helixResourceManager.getHelixAdmin()).thenReturn(helixAdmin);
@@ -313,7 +308,7 @@ public class SegmentStatusCheckerTest {
     }
     metricsRegistry = new MetricsRegistry();
     controllerMetrics = new ControllerMetrics(metricsRegistry);
-    segmentStatusChecker = new SegmentStatusChecker(helixResourceManager, 
config, controllerMetrics);
+    segmentStatusChecker = new MockSegmentStatusChecker(helixResourceManager, 
config, controllerMetrics);
     segmentStatusChecker.init();
     segmentStatusChecker.run();
     Assert.assertEquals(controllerMetrics.getValueOfTableGauge(tableName,
@@ -369,7 +364,6 @@ public class SegmentStatusCheckerTest {
 
     {
       helixResourceManager = mock(PinotHelixResourceManager.class);
-      when(helixResourceManager.isLeader()).thenReturn(true);
       when(helixResourceManager.getAllTables()).thenReturn(allTableNames);
       
when(helixResourceManager.getHelixClusterName()).thenReturn("StatusChecker");
       when(helixResourceManager.getHelixAdmin()).thenReturn(helixAdmin);
@@ -382,7 +376,7 @@ public class SegmentStatusCheckerTest {
     }
     metricsRegistry = new MetricsRegistry();
     controllerMetrics = new ControllerMetrics(metricsRegistry);
-    segmentStatusChecker = new SegmentStatusChecker(helixResourceManager, 
config, controllerMetrics);
+    segmentStatusChecker = new MockSegmentStatusChecker(helixResourceManager, 
config, controllerMetrics);
     segmentStatusChecker.init();
     segmentStatusChecker.run();
     
Assert.assertEquals(controllerMetrics.getValueOfTableGauge(externalView.getId(),
@@ -415,7 +409,6 @@ public class SegmentStatusCheckerTest {
     }
     {
       helixResourceManager = mock(PinotHelixResourceManager.class);
-      when(helixResourceManager.isLeader()).thenReturn(true);
       when(helixResourceManager.getAllTables()).thenReturn(allTableNames);
       
when(helixResourceManager.getHelixClusterName()).thenReturn("StatusChecker");
       when(helixResourceManager.getHelixAdmin()).thenReturn(helixAdmin);
@@ -427,7 +420,7 @@ public class SegmentStatusCheckerTest {
     }
     metricsRegistry = new MetricsRegistry();
     controllerMetrics = new ControllerMetrics(metricsRegistry);
-    segmentStatusChecker = new SegmentStatusChecker(helixResourceManager, 
config, controllerMetrics);
+    segmentStatusChecker = new MockSegmentStatusChecker(helixResourceManager, 
config, controllerMetrics);
     segmentStatusChecker.init();
     segmentStatusChecker.run();
     Assert.assertEquals(controllerMetrics.getValueOfTableGauge(tableName,
@@ -454,7 +447,6 @@ public class SegmentStatusCheckerTest {
     }
     {
       helixResourceManager = mock(PinotHelixResourceManager.class);
-      when(helixResourceManager.isLeader()).thenReturn(true);
       when(helixResourceManager.getAllTables()).thenReturn(allTableNames);
       
when(helixResourceManager.getHelixClusterName()).thenReturn("StatusChecker");
       when(helixResourceManager.getHelixAdmin()).thenReturn(helixAdmin);
@@ -466,7 +458,7 @@ public class SegmentStatusCheckerTest {
     }
     metricsRegistry = new MetricsRegistry();
     controllerMetrics = new ControllerMetrics(metricsRegistry);
-    segmentStatusChecker = new SegmentStatusChecker(helixResourceManager, 
config, controllerMetrics);
+    segmentStatusChecker = new MockSegmentStatusChecker(helixResourceManager, 
config, controllerMetrics);
     segmentStatusChecker.init();
     segmentStatusChecker.run();
     Assert.assertEquals(controllerMetrics.getValueOfTableGauge(tableName,
@@ -501,7 +493,6 @@ public class SegmentStatusCheckerTest {
     }
     {
       helixResourceManager = mock(PinotHelixResourceManager.class);
-      when(helixResourceManager.isLeader()).thenReturn(true);
       when(helixResourceManager.getAllTables()).thenReturn(allTableNames);
       
when(helixResourceManager.getHelixClusterName()).thenReturn("StatusChecker");
       when(helixResourceManager.getHelixAdmin()).thenReturn(helixAdmin);
@@ -513,7 +504,7 @@ public class SegmentStatusCheckerTest {
     }
     metricsRegistry = new MetricsRegistry();
     controllerMetrics = new ControllerMetrics(metricsRegistry);
-    segmentStatusChecker = new SegmentStatusChecker(helixResourceManager, 
config, controllerMetrics);
+    segmentStatusChecker = new MockSegmentStatusChecker(helixResourceManager, 
config, controllerMetrics);
     // verify state before test
     Assert.assertEquals(controllerMetrics.getValueOfGlobalGauge(
         ControllerGauge.DISABLED_TABLE_COUNT), 0);
@@ -553,7 +544,6 @@ public class SegmentStatusCheckerTest {
     }
     {
       helixResourceManager = mock(PinotHelixResourceManager.class);
-      when(helixResourceManager.isLeader()).thenReturn(true);
       when(helixResourceManager.getAllTables()).thenReturn(allTableNames);
       
when(helixResourceManager.getHelixClusterName()).thenReturn("StatusChecker");
       when(helixResourceManager.getHelixAdmin()).thenReturn(helixAdmin);
@@ -565,7 +555,7 @@ public class SegmentStatusCheckerTest {
     }
     metricsRegistry = new MetricsRegistry();
     controllerMetrics = new ControllerMetrics(metricsRegistry);
-    segmentStatusChecker = new SegmentStatusChecker(helixResourceManager, 
config, controllerMetrics);
+    segmentStatusChecker = new MockSegmentStatusChecker(helixResourceManager, 
config, controllerMetrics);
     segmentStatusChecker.init();
     segmentStatusChecker.run();
     Assert.assertEquals(controllerMetrics.getValueOfTableGauge(tableName,
@@ -577,4 +567,17 @@ public class SegmentStatusCheckerTest {
     Assert.assertEquals(controllerMetrics.getValueOfTableGauge(tableName,
         ControllerGauge.PERCENT_SEGMENTS_AVAILABLE), 100);
   }
+
+  private class MockSegmentStatusChecker extends SegmentStatusChecker {
+
+    public MockSegmentStatusChecker(PinotHelixResourceManager 
pinotHelixResourceManager, ControllerConf config,
+        ControllerMetrics metricsRegistry) {
+      super(pinotHelixResourceManager, config, metricsRegistry);
+    }
+
+    @Override
+    protected boolean isLeader() {
+      return true;
+    }
+  }
 }
diff --git 
a/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskTest.java
 
b/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskTest.java
index 6a2c1b1..9a5d26c 100644
--- 
a/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskTest.java
+++ 
b/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskTest.java
@@ -32,8 +32,8 @@ public class ControllerPeriodicTaskTest {
   private final AtomicBoolean _onBecomeNonLeaderCalled = new AtomicBoolean();
   private final AtomicBoolean _processCalled = new AtomicBoolean();
 
-  private final ControllerPeriodicTask _task =
-      new ControllerPeriodicTask("TestTask", RUN_FREQUENCY_IN_SECONDS, 
_resourceManager) {
+  private final MockControllerPeriodicTask _task =
+      new MockControllerPeriodicTask("TestTask", RUN_FREQUENCY_IN_SECONDS, 
_resourceManager) {
         @Override
         public void onBecomeLeader() {
           _onBecomeLeaderCalled.set(true);
@@ -48,6 +48,7 @@ public class ControllerPeriodicTaskTest {
         public void process(List<String> tables) {
           _processCalled.set(true);
         }
+
       };
 
   private void resetState() {
@@ -68,6 +69,7 @@ public class ControllerPeriodicTaskTest {
   public void testChangeLeadership() {
     // Initial state
     resetState();
+    _task.setLeader(false);
     _task.init();
     assertFalse(_onBecomeLeaderCalled.get());
     assertFalse(_onBecomeNonLeaderCalled.get());
@@ -82,7 +84,7 @@ public class ControllerPeriodicTaskTest {
 
     // From non-leader to leader
     resetState();
-    when(_resourceManager.isLeader()).thenReturn(true);
+    _task.setLeader(true);
     _task.run();
     assertTrue(_onBecomeLeaderCalled.get());
     assertFalse(_onBecomeNonLeaderCalled.get());
@@ -97,10 +99,33 @@ public class ControllerPeriodicTaskTest {
 
     // From leader to non-leader
     resetState();
-    when(_resourceManager.isLeader()).thenReturn(false);
+    _task.setLeader(false);
     _task.run();
     assertFalse(_onBecomeLeaderCalled.get());
     assertTrue(_onBecomeNonLeaderCalled.get());
     assertFalse(_processCalled.get());
   }
+
+  private class MockControllerPeriodicTask extends ControllerPeriodicTask {
+
+    private boolean _isLeader = true;
+    public MockControllerPeriodicTask(String taskName, long 
runFrequencyInSeconds,
+        PinotHelixResourceManager pinotHelixResourceManager) {
+      super(taskName, runFrequencyInSeconds, pinotHelixResourceManager);
+    }
+
+    @Override
+    public void process(List<String> tables) {
+
+    }
+
+    @Override
+    protected boolean isLeader() {
+      return _isLeader;
+    }
+
+    void setLeader(boolean isLeader) {
+      _isLeader = isLeader;
+    }
+  }
 }
diff --git 
a/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/realtime/SegmentCompletionTest.java
 
b/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/realtime/SegmentCompletionTest.java
index d867be3..310df33 100644
--- 
a/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/realtime/SegmentCompletionTest.java
+++ 
b/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/realtime/SegmentCompletionTest.java
@@ -1131,13 +1131,20 @@ public class SegmentCompletionTest {
 
   public static class MockSegmentCompletionManager extends 
SegmentCompletionManager {
     public long _secconds;
+    private boolean _isLeader;
     protected MockSegmentCompletionManager(PinotLLCRealtimeSegmentManager 
segmentManager, boolean isLeader,
         boolean isConnected) {
       super(createMockHelixManager(isLeader, isConnected), segmentManager, new 
ControllerMetrics(new MetricsRegistry()));
+      _isLeader = isLeader;
     }
     @Override
     protected long getCurrentTimeMs() {
       return _secconds * 1000L;
     }
+
+    @Override
+    protected boolean isLeader() {
+      return _isLeader;
+    }
   }
 }
diff --git 
a/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/retention/RetentionManagerTest.java
 
b/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/retention/RetentionManagerTest.java
index 1baf36f..b7c67a5 100644
--- 
a/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/retention/RetentionManagerTest.java
+++ 
b/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/retention/RetentionManagerTest.java
@@ -84,7 +84,7 @@ public class RetentionManagerTest {
     
when(pinotHelixResourceManager.getTableConfig(OFFLINE_TABLE_NAME)).thenReturn(tableConfig);
     
when(pinotHelixResourceManager.getOfflineSegmentMetadata(OFFLINE_TABLE_NAME)).thenReturn(metadataList);
 
-    RetentionManager retentionManager = new 
RetentionManager(pinotHelixResourceManager, 0, 0);
+    RetentionManager retentionManager = new 
MockRetentionManager(pinotHelixResourceManager, 0, 0);
     retentionManager.init();
     retentionManager.run();
 
@@ -156,7 +156,6 @@ public class RetentionManagerTest {
   private void setupPinotHelixResourceManager(TableConfig tableConfig, final 
List<String> removedSegments,
       PinotHelixResourceManager resourceManager) {
     final String tableNameWithType = tableConfig.getTableName();
-    when(resourceManager.isLeader()).thenReturn(true);
     
when(resourceManager.getAllTables()).thenReturn(Collections.singletonList(tableNameWithType));
 
     SegmentDeletionManager deletionManager = 
mock(SegmentDeletionManager.class);
@@ -202,7 +201,7 @@ public class RetentionManagerTest {
         setupSegmentMetadata(tableConfig, now, initialNumSegments, 
removedSegments);
     setupPinotHelixResourceManager(tableConfig, removedSegments, 
pinotHelixResourceManager);
 
-    RetentionManager retentionManager = new 
RetentionManager(pinotHelixResourceManager, 0, 0);
+    RetentionManager retentionManager = new 
MockRetentionManager(pinotHelixResourceManager, 0, 0);
     retentionManager.init();
     retentionManager.run();
 
@@ -306,4 +305,17 @@ public class RetentionManagerTest {
     when(segmentMetadata.getTimeGranularity()).thenReturn(new 
Duration(timeUnit.toMillis(1)));
     return segmentMetadata;
   }
+
+  private class MockRetentionManager extends RetentionManager {
+
+    public MockRetentionManager(PinotHelixResourceManager 
pinotHelixResourceManager, int runFrequencyInSeconds,
+        int deletedSegmentsRetentionInDays) {
+      super(pinotHelixResourceManager, runFrequencyInSeconds, 
deletedSegmentsRetentionInDays);
+    }
+
+    @Override
+    protected boolean isLeader() {
+      return true;
+    }
+  }
 }
diff --git 
a/pinot-controller/src/test/java/com/linkedin/pinot/controller/validation/StorageQuotaCheckerTest.java
 
b/pinot-controller/src/test/java/com/linkedin/pinot/controller/validation/StorageQuotaCheckerTest.java
index e64b6ee..4ea6967 100644
--- 
a/pinot-controller/src/test/java/com/linkedin/pinot/controller/validation/StorageQuotaCheckerTest.java
+++ 
b/pinot-controller/src/test/java/com/linkedin/pinot/controller/validation/StorageQuotaCheckerTest.java
@@ -58,7 +58,6 @@ public class StorageQuotaCheckerTest {
     _pinotHelixResourceManager = mock(PinotHelixResourceManager.class);
     when(_tableConfig.getValidationConfig()).thenReturn(_validationConfig);
     when(_validationConfig.getReplicationNumber()).thenReturn(2);
-    when(_pinotHelixResourceManager.isLeader()).thenReturn(true);
     TEST_DIR.mkdirs();
   }
 
@@ -69,7 +68,7 @@ public class StorageQuotaCheckerTest {
 
   @Test
   public void testNoQuota() throws InvalidConfigException {
-    StorageQuotaChecker checker = new StorageQuotaChecker(_tableConfig, 
_tableSizeReader, _controllerMetrics, _pinotHelixResourceManager);
+    StorageQuotaChecker checker = new MockStorageQuotaChecker(_tableConfig, 
_tableSizeReader, _controllerMetrics, _pinotHelixResourceManager);
     when(_tableConfig.getQuotaConfig()).thenReturn(null);
     StorageQuotaChecker.QuotaCheckerResponse res =
         checker.isSegmentStorageWithinQuota(TEST_DIR, "myTable", "segment", 
1000);
@@ -78,7 +77,7 @@ public class StorageQuotaCheckerTest {
 
   @Test
   public void testNoStorageQuotaConfig() throws InvalidConfigException {
-    StorageQuotaChecker checker = new StorageQuotaChecker(_tableConfig, 
_tableSizeReader, _controllerMetrics, _pinotHelixResourceManager);
+    StorageQuotaChecker checker = new MockStorageQuotaChecker(_tableConfig, 
_tableSizeReader, _controllerMetrics, _pinotHelixResourceManager);
     when(_tableConfig.getQuotaConfig()).thenReturn(_quotaConfig);
     when(_quotaConfig.storageSizeBytes()).thenReturn(-1L);
     StorageQuotaChecker.QuotaCheckerResponse res =
@@ -118,7 +117,7 @@ public class StorageQuotaCheckerTest {
     when(_tableConfig.getQuotaConfig()).thenReturn(_quotaConfig);
     when(_quotaConfig.storageSizeBytes()).thenReturn(3000L);
     when(_quotaConfig.getStorage()).thenReturn("3K");
-    StorageQuotaChecker checker = new StorageQuotaChecker(_tableConfig, 
_tableSizeReader, _controllerMetrics, _pinotHelixResourceManager);
+    StorageQuotaChecker checker = new MockStorageQuotaChecker(_tableConfig, 
_tableSizeReader, _controllerMetrics, _pinotHelixResourceManager);
     StorageQuotaChecker.QuotaCheckerResponse response =
         checker.isSegmentStorageWithinQuota(TEST_DIR, tableName, "segment1", 
1000);
     Assert.assertTrue(response.isSegmentWithinQuota);
@@ -160,4 +159,17 @@ public class StorageQuotaCheckerTest {
     response = checker.isSegmentStorageWithinQuota(TEST_DIR, tableName, 
"segment1", 1000);
     Assert.assertTrue(response.isSegmentWithinQuota);
   }
+
+  private class MockStorageQuotaChecker extends StorageQuotaChecker {
+
+    public MockStorageQuotaChecker(TableConfig tableConfig, TableSizeReader 
tableSizeReader,
+        ControllerMetrics controllerMetrics, PinotHelixResourceManager 
pinotHelixResourceManager) {
+      super(tableConfig, tableSizeReader, controllerMetrics, 
pinotHelixResourceManager);
+    }
+
+    @Override
+    protected boolean isLeader() {
+      return true;
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to