This is an automated email from the ASF dual-hosted git repository.

nehapawar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 35d61aa  Add ControllerLeadershipManager as single place to check controller leadership changes (#3604)
35d61aa is described below

commit 35d61aafa6a8fdc67cdd47d6e383dd791684e92a
Author: Neha Pawar <[email protected]>
AuthorDate: Fri Dec 14 15:07:20 2018 -0800

    Add ControllerLeadershipManager as single place to check controller leadership changes (#3604)
---
 .../controller/ControllerLeadershipManager.java    | 126 +++++++++++++++++++++
 .../pinot/controller/ControllerStarter.java        |   7 +-
 .../controller/LeadershipChangeSubscriber.java     |  32 ++++++
 .../helix/core/PinotHelixResourceManager.java      |   9 --
 .../core/periodictask/ControllerPeriodicTask.java  |   9 +-
 .../realtime/PinotLLCRealtimeSegmentManager.java   |  24 +---
 .../core/realtime/PinotRealtimeSegmentManager.java |  29 +++--
 .../core/realtime/SegmentCompletionManager.java    |  17 ++-
 .../controller/validation/StorageQuotaChecker.java |   7 +-
 .../controller/helix/SegmentStatusCheckerTest.java |  43 +++----
 .../periodictask/ControllerPeriodicTaskTest.java   |  33 +++++-
 .../helix/core/realtime/SegmentCompletionTest.java |   7 ++
 .../helix/core/retention/RetentionManagerTest.java |  18 ++-
 .../validation/StorageQuotaCheckerTest.java        |  20 +++-
 14 files changed, 299 insertions(+), 82 deletions(-)

diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerLeadershipManager.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerLeadershipManager.java
new file mode 100644
index 0000000..66e0482
--- /dev/null
+++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerLeadershipManager.java
@@ -0,0 +1,126 @@
+/**
+ * Copyright (C) 2014-2018 LinkedIn Corp. (pinot-core@linkedin.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.linkedin.pinot.controller;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.helix.HelixManager;
+import org.apache.helix.api.listeners.ControllerChangeListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Single place for listening on controller changes
+ * This should be created at controller startup and everyone who wants to listen to controller changes should subscribe
+ */
+public class ControllerLeadershipManager {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(ControllerLeadershipManager.class);
+
+  private static ControllerLeadershipManager INSTANCE = null;
+
+  private HelixManager _helixManager;
+  private volatile boolean _amILeader = false;
+
+  private Map<String, LeadershipChangeSubscriber> _subscribers = new ConcurrentHashMap<>();
+
+  private ControllerLeadershipManager(HelixManager helixManager) {
+    _helixManager = helixManager;
+    _helixManager.addControllerListener((ControllerChangeListener) notificationContext -> onControllerChange());
+  }
+
+  /**
+   * Create an instance of ControllerLeadershipManager
+   * @param helixManager
+   */
+  public static synchronized void init(HelixManager helixManager) {
+    if (INSTANCE != null) {
+      throw new RuntimeException("Instance of ControllerLeadershipManager already created");
+    }
+    INSTANCE = new ControllerLeadershipManager(helixManager);
+  }
+
+  /**
+   * Get the instance of ControllerLeadershipManager
+   * @return
+   */
+  public static synchronized ControllerLeadershipManager getInstance() {
+    if (INSTANCE == null) {
+      throw new RuntimeException("Instance of ControllerLeadershipManager not yet created");
+    }
+    return INSTANCE;
+  }
+
+  /**
+   * When stopping this service, if the controller is leader, invoke {@link ControllerLeadershipManager#onBecomingNonLeader()}
+   */
+  public void stop() {
+    if (_amILeader) {
+      onBecomingNonLeader();
+    }
+  }
+
+  /**
+   * Callback on changes in the controller
+   */
+  protected void onControllerChange() {
+    if (_helixManager.isLeader()) {
+      if (!_amILeader) {
+        _amILeader = true;
+        LOGGER.info("Became leader");
+        onBecomingLeader();
+      } else {
+        LOGGER.info("Already leader. Duplicate notification");
+      }
+    } else {
+      if (_amILeader) {
+        _amILeader = false;
+        LOGGER.info("Lost leadership");
+        onBecomingNonLeader();
+      } else {
+        LOGGER.info("Already not leader. Duplicate notification");
+      }
+    }
+  }
+
+  public boolean isLeader() {
+    return _amILeader;
+  }
+
+  private void onBecomingLeader() {
+    _subscribers.forEach((k, v) -> v.onBecomingLeader());
+  }
+
+  private void onBecomingNonLeader() {
+    _subscribers.forEach((k, v) -> v.onBecomingNonLeader());
+  }
+
+  /**
+   * Subscribe to changes in the controller leadership
+   * If controller is already leader, invoke {@link LeadershipChangeSubscriber#onBecomingLeader()}
+   * @param name
+   * @param subscriber
+   */
+  public void subscribe(String name, LeadershipChangeSubscriber subscriber) {
+    LOGGER.info("{} subscribing to leadership changes", name);
+    _subscribers.put(name, subscriber);
+    if (_amILeader) {
+      subscriber.onBecomingLeader();
+    }
+  }
+}
diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerStarter.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerStarter.java
index f1336d8..33fb5ce 100644
--- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerStarter.java
+++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerStarter.java
@@ -161,13 +161,15 @@ public class ControllerStarter {
     _helixResourceManager.start();
     final HelixManager helixManager = _helixResourceManager.getHelixZkManager();
 
+    LOGGER.info("Init controller leadership manager");
+    ControllerLeadershipManager.init(helixManager);
+
     LOGGER.info("Starting task resource manager");
     _helixTaskResourceManager = new PinotHelixTaskResourceManager(new TaskDriver(helixManager));
 
     // Helix resource manager must be started in order to create PinotLLCRealtimeSegmentManager
     LOGGER.info("Starting realtime segment manager");
     PinotLLCRealtimeSegmentManager.create(_helixResourceManager, _config, _controllerMetrics);
-    PinotLLCRealtimeSegmentManager.getInstance().start();
     _realtimeSegmentsManager.start(_controllerMetrics);
 
     // Setting up periodic tasks
@@ -287,6 +289,9 @@ public class ControllerStarter {
 
   public void stop() {
     try {
+      LOGGER.info("Stopping controller leadership manager");
+      ControllerLeadershipManager.getInstance().stop();
+
       // Stop PinotLLCSegmentManager before stopping Jersey API. It is possible that stopping Jersey API
       // may interrupt the handlers waiting on an I/O.
       PinotLLCRealtimeSegmentManager.getInstance().stop();
diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/LeadershipChangeSubscriber.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/LeadershipChangeSubscriber.java
new file mode 100644
index 0000000..844fa3e
--- /dev/null
+++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/LeadershipChangeSubscriber.java
@@ -0,0 +1,32 @@
+/**
+ * Copyright (C) 2014-2018 LinkedIn Corp. (pinot-core@linkedin.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.linkedin.pinot.controller;
+
+/**
+ * Interface for a subscriber to the {@link ControllerLeadershipManager}
+ */
+public interface LeadershipChangeSubscriber {
+
+  /**
+   * Callback to invoke on becoming leader
+   */
+  void onBecomingLeader();
+
+  /**
+   * Callback to invoke on losing leadership
+   */
+  void onBecomingNonLeader();
+}
diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/PinotHelixResourceManager.java
index 971c908..423db8c 100644
--- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -221,15 +221,6 @@ public class PinotHelixResourceManager {
   }
 
   /**
-   * Check whether the Helix manager is the leader.
-   *
-   * @return Whether the Helix manager is the leader
-   */
-  public boolean isLeader() {
-    return _helixZkManager.isLeader();
-  }
-
-  /**
    * Get the Helix admin.
    *
    * @return Helix admin
diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java
index 3d1ef6a..3b12c0f 100644
--- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java
+++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java
@@ -15,6 +15,8 @@
  */
 package com.linkedin.pinot.controller.helix.core.periodictask;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.linkedin.pinot.controller.ControllerLeadershipManager;
 import com.linkedin.pinot.controller.helix.core.PinotHelixResourceManager;
 import com.linkedin.pinot.core.periodictask.BasePeriodicTask;
 import java.util.List;
@@ -59,7 +61,7 @@ public abstract class ControllerPeriodicTask extends BasePeriodicTask {
 
   @Override
   public void run() {
-    if (!_pinotHelixResourceManager.isLeader()) {
+    if (!isLeader()) {
       skipLeaderTask();
     } else {
       List<String> allTableNames = _pinotHelixResourceManager.getAllTables();
@@ -109,4 +111,9 @@ public abstract class ControllerPeriodicTask extends BasePeriodicTask {
    * @param tables List of table names
    */
   public abstract void process(List<String> tables);
+
+  @VisibleForTesting
+  protected boolean isLeader() {
+    return ControllerLeadershipManager.getInstance().isLeader();
+  }
 }
diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index 563fb80..486231a 100644
--- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -42,6 +42,7 @@ import com.linkedin.pinot.common.utils.TarGzCompressionUtils;
 import com.linkedin.pinot.common.utils.helix.HelixHelper;
 import com.linkedin.pinot.common.utils.retry.RetryPolicies;
 import com.linkedin.pinot.controller.ControllerConf;
+import com.linkedin.pinot.controller.ControllerLeadershipManager;
 import com.linkedin.pinot.controller.api.events.MetadataEventNotifierFactory;
 import com.linkedin.pinot.controller.helix.core.PinotHelixResourceManager;
 import com.linkedin.pinot.controller.helix.core.PinotHelixSegmentOnlineOfflineStateModelGenerator;
@@ -124,7 +125,6 @@ public class PinotLLCRealtimeSegmentManager {
   private final ZkHelixPropertyStore<ZNRecord> _propertyStore;
   private final PinotHelixResourceManager _helixResourceManager;
   private final String _clusterName;
-  private boolean _amILeader = false;
   private final ControllerConf _controllerConf;
   private final ControllerMetrics _controllerMetrics;
   private final int _numIdealStateUpdateLocks;
@@ -163,11 +163,6 @@ public class PinotLLCRealtimeSegmentManager {
     SegmentCompletionManager.create(helixManager, INSTANCE, controllerConf, controllerMetrics);
   }
 
-  public void start() {
-    _helixManager.addControllerListener(changeContext -> onBecomeLeader());
-  }
-
-
   public void stop() {
     _isStopping = true;
     LOGGER.info("Awaiting segment metadata commits: maxWaitTimeMillis = {}", MAX_LLC_SEGMENT_METADATA_COMMIT_TIME_MILLIS);
@@ -218,24 +213,9 @@ public class PinotLLCRealtimeSegmentManager {
     return INSTANCE;
   }
 
-  private void onBecomeLeader() {
-    if (isLeader()) {
-      if (!_amILeader) {
-        // We were not leader before, now we are.
-        _amILeader = true;
-        LOGGER.info("Became leader");
-      } else {
-        // We already had leadership, nothing to do.
-        LOGGER.info("Already leader. Duplicate notification");
-      }
-    } else {
-      _amILeader = false;
-      LOGGER.info("Lost leadership");
-    }
-  }
 
   protected boolean isLeader() {
-    return _helixManager.isLeader();
+    return ControllerLeadershipManager.getInstance().isLeader();
   }
 
   protected boolean isConnected() {
diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/realtime/PinotRealtimeSegmentManager.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/realtime/PinotRealtimeSegmentManager.java
index 07c411f..c427b0d 100644
--- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/realtime/PinotRealtimeSegmentManager.java
+++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/realtime/PinotRealtimeSegmentManager.java
@@ -22,10 +22,9 @@ import com.linkedin.pinot.common.config.TableNameBuilder;
 import com.linkedin.pinot.common.metadata.ZKMetadataProvider;
 import com.linkedin.pinot.common.metadata.instance.InstanceZKMetadata;
 import com.linkedin.pinot.common.metadata.segment.RealtimeSegmentZKMetadata;
-import com.linkedin.pinot.common.utils.CommonConstants;
-import com.linkedin.pinot.core.realtime.stream.StreamConfig;
 import com.linkedin.pinot.common.metrics.ControllerMeter;
 import com.linkedin.pinot.common.metrics.ControllerMetrics;
+import com.linkedin.pinot.common.utils.CommonConstants;
 import com.linkedin.pinot.common.utils.CommonConstants.Helix.TableType;
 import com.linkedin.pinot.common.utils.CommonConstants.Segment.Realtime.Status;
 import com.linkedin.pinot.common.utils.CommonConstants.Segment.SegmentType;
@@ -33,9 +32,12 @@ import com.linkedin.pinot.common.utils.HLCSegmentName;
 import com.linkedin.pinot.common.utils.SegmentName;
 import com.linkedin.pinot.common.utils.helix.HelixHelper;
 import com.linkedin.pinot.common.utils.retry.RetryPolicies;
+import com.linkedin.pinot.controller.LeadershipChangeSubscriber;
+import com.linkedin.pinot.controller.ControllerLeadershipManager;
 import com.linkedin.pinot.controller.helix.core.PinotHelixResourceManager;
 import com.linkedin.pinot.controller.helix.core.PinotTableIdealStateBuilder;
 import com.linkedin.pinot.core.query.utils.Pair;
+import com.linkedin.pinot.core.realtime.stream.StreamConfig;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -47,8 +49,6 @@ import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import org.I0Itec.zkclient.IZkChildListener;
 import org.I0Itec.zkclient.IZkDataListener;
-import org.apache.helix.ControllerChangeListener;
-import org.apache.helix.NotificationContext;
 import org.apache.helix.PropertyPathConfig;
 import org.apache.helix.PropertyType;
 import org.apache.helix.ZNRecord;
@@ -65,7 +65,7 @@ import org.slf4j.LoggerFactory;
 /**
  * Realtime segment manager, which assigns realtime segments to server instances so that they can consume from the stream.
  */
-public class PinotRealtimeSegmentManager implements HelixPropertyListener, IZkChildListener, IZkDataListener {
+public class PinotRealtimeSegmentManager implements HelixPropertyListener, IZkChildListener, IZkDataListener, LeadershipChangeSubscriber {
   private static final Logger LOGGER = LoggerFactory.getLogger(PinotRealtimeSegmentManager.class);
   private static final String TABLE_CONFIG = "/CONFIGS/TABLE";
   private static final String SEGMENTS_PATH = "/SEGMENTS";
@@ -101,12 +101,7 @@ public class PinotRealtimeSegmentManager implements HelixPropertyListener, IZkCh
     _zkClient.subscribeDataChanges(_tableConfigPath, this);
 
     // Subscribe to leadership changes
-    _pinotHelixResourceManager.getHelixZkManager().addControllerListener(new ControllerChangeListener() {
-      @Override
-      public void onControllerChange(NotificationContext changeContext) {
-        processPropertyStoreChange(CONTROLLER_LEADER_CHANGE);
-      }
-    });
+    ControllerLeadershipManager.getInstance().subscribe(PinotLLCRealtimeSegmentManager.class.getName(), this);
 
     // Setup change listeners for already existing tables, if any.
     processPropertyStoreChange(_tableConfigPath);
@@ -265,7 +260,7 @@ public class PinotRealtimeSegmentManager implements HelixPropertyListener, IZkCh
   }
 
   private boolean isLeader() {
-    return _pinotHelixResourceManager.isLeader();
+    return ControllerLeadershipManager.getInstance().isLeader();
   }
 
   @Override
@@ -408,4 +403,14 @@ public class PinotRealtimeSegmentManager implements HelixPropertyListener, IZkCh
     LOGGER.info("PinotRealtimeSegmentManager.handleDataDeleted: {}", dataPath);
     processPropertyStoreChange(dataPath);
   }
+
+  @Override
+  public void onBecomingLeader() {
+    processPropertyStoreChange(CONTROLLER_LEADER_CHANGE);
+  }
+
+  @Override
+  public void onBecomingNonLeader() {
+    processPropertyStoreChange(CONTROLLER_LEADER_CHANGE);
+  }
 }
diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/realtime/SegmentCompletionManager.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/realtime/SegmentCompletionManager.java
index 1ba51bc..6d47ca7 100644
--- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/realtime/SegmentCompletionManager.java
+++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/realtime/SegmentCompletionManager.java
@@ -15,6 +15,7 @@
  */
 package com.linkedin.pinot.controller.helix.core.realtime;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.linkedin.pinot.common.config.TableNameBuilder;
 import com.linkedin.pinot.common.metadata.segment.LLCRealtimeSegmentZKMetadata;
 import com.linkedin.pinot.common.metrics.ControllerMeter;
@@ -23,6 +24,7 @@ import com.linkedin.pinot.common.protocols.SegmentCompletionProtocol;
 import com.linkedin.pinot.common.utils.CommonConstants;
 import com.linkedin.pinot.common.utils.LLCSegmentName;
 import com.linkedin.pinot.controller.ControllerConf;
+import com.linkedin.pinot.controller.ControllerLeadershipManager;
 import com.linkedin.pinot.controller.helix.core.realtime.segment.CommittingSegmentDescriptor;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -153,7 +155,7 @@ public class SegmentCompletionManager {
    * that it currently has (i.e. next offset that it will consume, if it continues to consume).
    */
   public SegmentCompletionProtocol.Response segmentConsumed(SegmentCompletionProtocol.Request.Params reqParams) {
-    if (!_helixManager.isLeader() || !_helixManager.isConnected()) {
+    if (!isLeader() || !_helixManager.isConnected()) {
       _controllerMetrics.addMeteredGlobalValue(ControllerMeter.CONTROLLER_NOT_LEADER, 1L);
       return SegmentCompletionProtocol.RESP_NOT_LEADER;
     }
@@ -190,7 +192,7 @@ public class SegmentCompletionManager {
    * incoming segment).
    */
   public SegmentCompletionProtocol.Response segmentCommitStart(final SegmentCompletionProtocol.Request.Params reqParams) {
-    if (!_helixManager.isLeader() || !_helixManager.isConnected()) {
+    if (!isLeader() || !_helixManager.isConnected()) {
       _controllerMetrics.addMeteredGlobalValue(ControllerMeter.CONTROLLER_NOT_LEADER, 1L);
       return SegmentCompletionProtocol.RESP_NOT_LEADER;
     }
@@ -214,7 +216,7 @@ public class SegmentCompletionManager {
   }
 
   public SegmentCompletionProtocol.Response extendBuildTime(final SegmentCompletionProtocol.Request.Params reqParams) {
-    if (!_helixManager.isLeader() || !_helixManager.isConnected()) {
+    if (!isLeader() || !_helixManager.isConnected()) {
       _controllerMetrics.addMeteredGlobalValue(ControllerMeter.CONTROLLER_NOT_LEADER, 1L);
       return SegmentCompletionProtocol.RESP_NOT_LEADER;
     }
@@ -245,7 +247,7 @@ public class SegmentCompletionManager {
    * @return
    */
   public SegmentCompletionProtocol.Response segmentStoppedConsuming(SegmentCompletionProtocol.Request.Params reqParams) {
-    if (!_helixManager.isLeader() || !_helixManager.isConnected()) {
+    if (!isLeader() || !_helixManager.isConnected()) {
       _controllerMetrics.addMeteredGlobalValue(ControllerMeter.CONTROLLER_NOT_LEADER, 1L);
       return SegmentCompletionProtocol.RESP_NOT_LEADER;
     }
@@ -280,7 +282,7 @@ public class SegmentCompletionManager {
    * @return
    */
   public SegmentCompletionProtocol.Response segmentCommitEnd(SegmentCompletionProtocol.Request.Params reqParams, boolean success, boolean isSplitCommit) {
-    if (!_helixManager.isLeader() || !_helixManager.isConnected()) {
+    if (!isLeader() || !_helixManager.isConnected()) {
       _controllerMetrics.addMeteredGlobalValue(ControllerMeter.CONTROLLER_NOT_LEADER, 1L);
       return SegmentCompletionProtocol.RESP_NOT_LEADER;
     }
@@ -1091,4 +1093,9 @@ public class SegmentCompletionManager {
       return false;
     }
   }
+
+  @VisibleForTesting
+  protected boolean isLeader() {
+    return ControllerLeadershipManager.getInstance().isLeader();
+  }
 }
diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/StorageQuotaChecker.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/StorageQuotaChecker.java
index 7d6be94..2614c2d 100644
--- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/StorageQuotaChecker.java
+++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/StorageQuotaChecker.java
@@ -23,6 +24,7 @@ import com.linkedin.pinot.common.exception.InvalidConfigException;
 import com.linkedin.pinot.common.metrics.ControllerGauge;
 import com.linkedin.pinot.common.metrics.ControllerMetrics;
 import com.linkedin.pinot.common.utils.DataSize;
+import com.linkedin.pinot.controller.ControllerLeadershipManager;
 import com.linkedin.pinot.controller.helix.core.PinotHelixResourceManager;
 import com.linkedin.pinot.controller.util.TableSizeReader;
 import java.io.File;
@@ -150,7 +151,7 @@ public class StorageQuotaChecker {
         tableName, tableSubtypeSize.estimatedSizeInBytes, tableSubtypeSize.reportedSizeInBytes);
 
     // Only emit the real percentage of storage quota usage by lead controller, otherwise emit 0L.
-    if (_pinotHelixResourceManager.isLeader() && allowedStorageBytes != 0L) {
+    if (isLeader() && allowedStorageBytes != 0L) {
       long existingStorageQuotaUtilization = tableSubtypeSize.estimatedSizeInBytes  * 100 / allowedStorageBytes;
       _controllerMetrics.setValueOfTableGauge(tableName, ControllerGauge.TABLE_STORAGE_QUOTA_UTILIZATION,
           existingStorageQuotaUtilization);
@@ -193,4 +194,8 @@ public class StorageQuotaChecker {
       return failure(message);
     }
   }
+
+  protected boolean isLeader() {
+    return ControllerLeadershipManager.getInstance().isLeader();
+  }
 }
diff --git a/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/SegmentStatusCheckerTest.java b/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/SegmentStatusCheckerTest.java
index de6eca9..4d4c324 100644
--- a/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/SegmentStatusCheckerTest.java
+++ b/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/SegmentStatusCheckerTest.java
@@ -75,7 +75,6 @@ public class SegmentStatusCheckerTest {
     }
     {
       helixResourceManager = mock(PinotHelixResourceManager.class);
-      when(helixResourceManager.isLeader()).thenReturn(true);
       when(helixResourceManager.getAllTables()).thenReturn(allTableNames);
       
when(helixResourceManager.getHelixClusterName()).thenReturn("StatusChecker");
       when(helixResourceManager.getHelixAdmin()).thenReturn(helixAdmin);
@@ -87,7 +86,7 @@ public class SegmentStatusCheckerTest {
     }
     metricsRegistry = new MetricsRegistry();
     controllerMetrics = new ControllerMetrics(metricsRegistry);
-    segmentStatusChecker = new SegmentStatusChecker(helixResourceManager, 
config, controllerMetrics);
+    segmentStatusChecker = new MockSegmentStatusChecker(helixResourceManager, 
config, controllerMetrics);
     segmentStatusChecker.init();
     segmentStatusChecker.run();
     
Assert.assertEquals(controllerMetrics.getValueOfTableGauge(externalView.getId(),
@@ -141,7 +140,6 @@ public class SegmentStatusCheckerTest {
     }
     {
       helixResourceManager = mock(PinotHelixResourceManager.class);
-      when(helixResourceManager.isLeader()).thenReturn(true);
       when(helixResourceManager.getAllTables()).thenReturn(allTableNames);
       
when(helixResourceManager.getHelixClusterName()).thenReturn("StatusChecker");
       when(helixResourceManager.getHelixAdmin()).thenReturn(helixAdmin);
@@ -153,7 +151,7 @@ public class SegmentStatusCheckerTest {
     }
     metricsRegistry = new MetricsRegistry();
     controllerMetrics = new ControllerMetrics(metricsRegistry);
-    segmentStatusChecker = new SegmentStatusChecker(helixResourceManager, 
config, controllerMetrics);
+    segmentStatusChecker = new MockSegmentStatusChecker(helixResourceManager, 
config, controllerMetrics);
     segmentStatusChecker.init();
     segmentStatusChecker.run();
     
Assert.assertEquals(controllerMetrics.getValueOfTableGauge(externalView.getId(),
@@ -218,7 +216,6 @@ public class SegmentStatusCheckerTest {
     }
     {
       helixResourceManager = mock(PinotHelixResourceManager.class);
-      when(helixResourceManager.isLeader()).thenReturn(true);
       when(helixResourceManager.getAllTables()).thenReturn(allTableNames);
       
when(helixResourceManager.getHelixClusterName()).thenReturn("StatusChecker");
       when(helixResourceManager.getHelixAdmin()).thenReturn(helixAdmin);
@@ -231,7 +228,7 @@ public class SegmentStatusCheckerTest {
     }
     metricsRegistry = new MetricsRegistry();
     controllerMetrics = new ControllerMetrics(metricsRegistry);
-    segmentStatusChecker = new SegmentStatusChecker(helixResourceManager, 
config, controllerMetrics);
+    segmentStatusChecker = new MockSegmentStatusChecker(helixResourceManager, 
config, controllerMetrics);
     segmentStatusChecker.init();
     segmentStatusChecker.run();
     
Assert.assertEquals(controllerMetrics.getValueOfTableGauge(externalView.getId(),
@@ -266,7 +263,6 @@ public class SegmentStatusCheckerTest {
     }
     {
       helixResourceManager = mock(PinotHelixResourceManager.class);
-      when(helixResourceManager.isLeader()).thenReturn(true);
       when(helixResourceManager.getAllTables()).thenReturn(allTableNames);
       
when(helixResourceManager.getHelixClusterName()).thenReturn("StatusChecker");
       when(helixResourceManager.getHelixAdmin()).thenReturn(helixAdmin);
@@ -278,7 +274,7 @@ public class SegmentStatusCheckerTest {
     }
     metricsRegistry = new MetricsRegistry();
     controllerMetrics = new ControllerMetrics(metricsRegistry);
-    segmentStatusChecker = new SegmentStatusChecker(helixResourceManager, 
config, controllerMetrics);
+    segmentStatusChecker = new MockSegmentStatusChecker(helixResourceManager, 
config, controllerMetrics);
     segmentStatusChecker.init();
     segmentStatusChecker.run();
     Assert.assertEquals(controllerMetrics.getValueOfTableGauge(tableName,
@@ -301,7 +297,6 @@ public class SegmentStatusCheckerTest {
     }
     {
       helixResourceManager = mock(PinotHelixResourceManager.class);
-      when(helixResourceManager.isLeader()).thenReturn(true);
       when(helixResourceManager.getAllTables()).thenReturn(allTableNames);
       
when(helixResourceManager.getHelixClusterName()).thenReturn("StatusChecker");
       when(helixResourceManager.getHelixAdmin()).thenReturn(helixAdmin);
@@ -313,7 +308,7 @@ public class SegmentStatusCheckerTest {
     }
     metricsRegistry = new MetricsRegistry();
     controllerMetrics = new ControllerMetrics(metricsRegistry);
-    segmentStatusChecker = new SegmentStatusChecker(helixResourceManager, 
config, controllerMetrics);
+    segmentStatusChecker = new MockSegmentStatusChecker(helixResourceManager, 
config, controllerMetrics);
     segmentStatusChecker.init();
     segmentStatusChecker.run();
     Assert.assertEquals(controllerMetrics.getValueOfTableGauge(tableName,
@@ -369,7 +364,6 @@ public class SegmentStatusCheckerTest {
 
     {
       helixResourceManager = mock(PinotHelixResourceManager.class);
-      when(helixResourceManager.isLeader()).thenReturn(true);
       when(helixResourceManager.getAllTables()).thenReturn(allTableNames);
       
when(helixResourceManager.getHelixClusterName()).thenReturn("StatusChecker");
       when(helixResourceManager.getHelixAdmin()).thenReturn(helixAdmin);
@@ -382,7 +376,7 @@ public class SegmentStatusCheckerTest {
     }
     metricsRegistry = new MetricsRegistry();
     controllerMetrics = new ControllerMetrics(metricsRegistry);
-    segmentStatusChecker = new SegmentStatusChecker(helixResourceManager, 
config, controllerMetrics);
+    segmentStatusChecker = new MockSegmentStatusChecker(helixResourceManager, 
config, controllerMetrics);
     segmentStatusChecker.init();
     segmentStatusChecker.run();
     
Assert.assertEquals(controllerMetrics.getValueOfTableGauge(externalView.getId(),
@@ -415,7 +409,6 @@ public class SegmentStatusCheckerTest {
     }
     {
       helixResourceManager = mock(PinotHelixResourceManager.class);
-      when(helixResourceManager.isLeader()).thenReturn(true);
       when(helixResourceManager.getAllTables()).thenReturn(allTableNames);
       
when(helixResourceManager.getHelixClusterName()).thenReturn("StatusChecker");
       when(helixResourceManager.getHelixAdmin()).thenReturn(helixAdmin);
@@ -427,7 +420,7 @@ public class SegmentStatusCheckerTest {
     }
     metricsRegistry = new MetricsRegistry();
     controllerMetrics = new ControllerMetrics(metricsRegistry);
-    segmentStatusChecker = new SegmentStatusChecker(helixResourceManager, 
config, controllerMetrics);
+    segmentStatusChecker = new MockSegmentStatusChecker(helixResourceManager, 
config, controllerMetrics);
     segmentStatusChecker.init();
     segmentStatusChecker.run();
     Assert.assertEquals(controllerMetrics.getValueOfTableGauge(tableName,
@@ -454,7 +447,6 @@ public class SegmentStatusCheckerTest {
     }
     {
       helixResourceManager = mock(PinotHelixResourceManager.class);
-      when(helixResourceManager.isLeader()).thenReturn(true);
       when(helixResourceManager.getAllTables()).thenReturn(allTableNames);
       when(helixResourceManager.getHelixClusterName()).thenReturn("StatusChecker");
       when(helixResourceManager.getHelixAdmin()).thenReturn(helixAdmin);
@@ -466,7 +458,7 @@ public class SegmentStatusCheckerTest {
     }
     metricsRegistry = new MetricsRegistry();
     controllerMetrics = new ControllerMetrics(metricsRegistry);
-    segmentStatusChecker = new SegmentStatusChecker(helixResourceManager, config, controllerMetrics);
+    segmentStatusChecker = new MockSegmentStatusChecker(helixResourceManager, config, controllerMetrics);
     segmentStatusChecker.init();
     segmentStatusChecker.run();
     Assert.assertEquals(controllerMetrics.getValueOfTableGauge(tableName,
@@ -501,7 +493,6 @@ public class SegmentStatusCheckerTest {
     }
     {
       helixResourceManager = mock(PinotHelixResourceManager.class);
-      when(helixResourceManager.isLeader()).thenReturn(true);
       when(helixResourceManager.getAllTables()).thenReturn(allTableNames);
       when(helixResourceManager.getHelixClusterName()).thenReturn("StatusChecker");
       when(helixResourceManager.getHelixAdmin()).thenReturn(helixAdmin);
@@ -513,7 +504,7 @@ public class SegmentStatusCheckerTest {
     }
     metricsRegistry = new MetricsRegistry();
     controllerMetrics = new ControllerMetrics(metricsRegistry);
-    segmentStatusChecker = new SegmentStatusChecker(helixResourceManager, config, controllerMetrics);
+    segmentStatusChecker = new MockSegmentStatusChecker(helixResourceManager, config, controllerMetrics);
     // verify state before test
     Assert.assertEquals(controllerMetrics.getValueOfGlobalGauge(
         ControllerGauge.DISABLED_TABLE_COUNT), 0);
@@ -553,7 +544,6 @@ public class SegmentStatusCheckerTest {
     }
     {
       helixResourceManager = mock(PinotHelixResourceManager.class);
-      when(helixResourceManager.isLeader()).thenReturn(true);
       when(helixResourceManager.getAllTables()).thenReturn(allTableNames);
       when(helixResourceManager.getHelixClusterName()).thenReturn("StatusChecker");
       when(helixResourceManager.getHelixAdmin()).thenReturn(helixAdmin);
@@ -565,7 +555,7 @@ public class SegmentStatusCheckerTest {
     }
     metricsRegistry = new MetricsRegistry();
     controllerMetrics = new ControllerMetrics(metricsRegistry);
-    segmentStatusChecker = new SegmentStatusChecker(helixResourceManager, config, controllerMetrics);
+    segmentStatusChecker = new MockSegmentStatusChecker(helixResourceManager, config, controllerMetrics);
     segmentStatusChecker.init();
     segmentStatusChecker.run();
     Assert.assertEquals(controllerMetrics.getValueOfTableGauge(tableName,
@@ -577,4 +567,17 @@ public class SegmentStatusCheckerTest {
     Assert.assertEquals(controllerMetrics.getValueOfTableGauge(tableName,
         ControllerGauge.PERCENT_SEGMENTS_AVAILABLE), 100);
   }
+
+  private class MockSegmentStatusChecker extends SegmentStatusChecker {
+
+    public MockSegmentStatusChecker(PinotHelixResourceManager pinotHelixResourceManager, ControllerConf config,
+        ControllerMetrics metricsRegistry) {
+      super(pinotHelixResourceManager, config, metricsRegistry);
+    }
+
+    @Override
+    protected boolean isLeader() {
+      return true;
+    }
+  }
 }
diff --git a/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskTest.java b/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskTest.java
index 6a2c1b1..9a5d26c 100644
--- a/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskTest.java
+++ b/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskTest.java
@@ -32,8 +32,8 @@ public class ControllerPeriodicTaskTest {
   private final AtomicBoolean _onBecomeNonLeaderCalled = new AtomicBoolean();
   private final AtomicBoolean _processCalled = new AtomicBoolean();
 
-  private final ControllerPeriodicTask _task =
-      new ControllerPeriodicTask("TestTask", RUN_FREQUENCY_IN_SECONDS, _resourceManager) {
+  private final MockControllerPeriodicTask _task =
+      new MockControllerPeriodicTask("TestTask", RUN_FREQUENCY_IN_SECONDS, _resourceManager) {
         @Override
         public void onBecomeLeader() {
           _onBecomeLeaderCalled.set(true);
@@ -48,6 +48,7 @@ public class ControllerPeriodicTaskTest {
         public void process(List<String> tables) {
           _processCalled.set(true);
         }
+
       };
 
   private void resetState() {
@@ -68,6 +69,7 @@ public class ControllerPeriodicTaskTest {
   public void testChangeLeadership() {
     // Initial state
     resetState();
+    _task.setLeader(false);
     _task.init();
     assertFalse(_onBecomeLeaderCalled.get());
     assertFalse(_onBecomeNonLeaderCalled.get());
@@ -82,7 +84,7 @@ public class ControllerPeriodicTaskTest {
 
     // From non-leader to leader
     resetState();
-    when(_resourceManager.isLeader()).thenReturn(true);
+    _task.setLeader(true);
     _task.run();
     assertTrue(_onBecomeLeaderCalled.get());
     assertFalse(_onBecomeNonLeaderCalled.get());
@@ -97,10 +99,33 @@ public class ControllerPeriodicTaskTest {
 
     // From leader to non-leader
     resetState();
-    when(_resourceManager.isLeader()).thenReturn(false);
+    _task.setLeader(false);
     _task.run();
     assertFalse(_onBecomeLeaderCalled.get());
     assertTrue(_onBecomeNonLeaderCalled.get());
     assertFalse(_processCalled.get());
   }
+
+  private class MockControllerPeriodicTask extends ControllerPeriodicTask {
+
+    private boolean _isLeader = true;
+    public MockControllerPeriodicTask(String taskName, long runFrequencyInSeconds,
+        PinotHelixResourceManager pinotHelixResourceManager) {
+      super(taskName, runFrequencyInSeconds, pinotHelixResourceManager);
+    }
+
+    @Override
+    public void process(List<String> tables) {
+
+    }
+
+    @Override
+    protected boolean isLeader() {
+      return _isLeader;
+    }
+
+    void setLeader(boolean isLeader) {
+      _isLeader = isLeader;
+    }
+  }
 }
diff --git a/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/realtime/SegmentCompletionTest.java b/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/realtime/SegmentCompletionTest.java
index d867be3..310df33 100644
--- a/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/realtime/SegmentCompletionTest.java
+++ b/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/realtime/SegmentCompletionTest.java
@@ -1131,13 +1131,20 @@ public class SegmentCompletionTest {
 
   public static class MockSegmentCompletionManager extends SegmentCompletionManager {
     public long _secconds;
+    private boolean _isLeader;
     protected MockSegmentCompletionManager(PinotLLCRealtimeSegmentManager segmentManager, boolean isLeader,
         boolean isConnected) {
       super(createMockHelixManager(isLeader, isConnected), segmentManager, new ControllerMetrics(new MetricsRegistry()));
+      _isLeader = isLeader;
     }
     @Override
     protected long getCurrentTimeMs() {
       return _secconds * 1000L;
     }
+
+    @Override
+    protected boolean isLeader() {
+      return _isLeader;
+    }
   }
 }
diff --git a/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/retention/RetentionManagerTest.java b/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/retention/RetentionManagerTest.java
index 1da115e..8e25505 100644
--- a/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/retention/RetentionManagerTest.java
+++ b/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/retention/RetentionManagerTest.java
@@ -84,7 +84,7 @@ public class RetentionManagerTest {
     when(pinotHelixResourceManager.getTableConfig(OFFLINE_TABLE_NAME)).thenReturn(tableConfig);
     when(pinotHelixResourceManager.getOfflineSegmentMetadata(OFFLINE_TABLE_NAME)).thenReturn(metadataList);
 
-    RetentionManager retentionManager = new RetentionManager(pinotHelixResourceManager, 0, 0);
+    RetentionManager retentionManager = new MockRetentionManager(pinotHelixResourceManager, 0, 0);
     retentionManager.init();
     retentionManager.run();
 
@@ -156,7 +156,6 @@ public class RetentionManagerTest {
   private void setupPinotHelixResourceManager(TableConfig tableConfig, final List<String> removedSegments,
       PinotHelixResourceManager resourceManager) {
     final String tableNameWithType = tableConfig.getTableName();
-    when(resourceManager.isLeader()).thenReturn(true);
     when(resourceManager.getAllTables()).thenReturn(Collections.singletonList(tableNameWithType));
 
     SegmentDeletionManager deletionManager = mock(SegmentDeletionManager.class);
@@ -202,7 +201,7 @@ public class RetentionManagerTest {
         setupSegmentMetadata(tableConfig, now, initialNumSegments, removedSegments);
     setupPinotHelixResourceManager(tableConfig, removedSegments, pinotHelixResourceManager);
 
-    RetentionManager retentionManager = new RetentionManager(pinotHelixResourceManager, 0, 0);
+    RetentionManager retentionManager = new MockRetentionManager(pinotHelixResourceManager, 0, 0);
     retentionManager.init();
     retentionManager.run();
 
@@ -306,4 +305,17 @@ public class RetentionManagerTest {
     when(segmentMetadata.getTimeGranularity()).thenReturn(new Duration(timeUnit.toMillis(1)));
     return segmentMetadata;
   }
+
+  private class MockRetentionManager extends RetentionManager {
+
+    public MockRetentionManager(PinotHelixResourceManager pinotHelixResourceManager, int runFrequencyInSeconds,
+        int deletedSegmentsRetentionInDays) {
+      super(pinotHelixResourceManager, runFrequencyInSeconds, deletedSegmentsRetentionInDays);
+    }
+
+    @Override
+    protected boolean isLeader() {
+      return true;
+    }
+  }
 }
diff --git a/pinot-controller/src/test/java/com/linkedin/pinot/controller/validation/StorageQuotaCheckerTest.java b/pinot-controller/src/test/java/com/linkedin/pinot/controller/validation/StorageQuotaCheckerTest.java
index e64b6ee..4ea6967 100644
--- a/pinot-controller/src/test/java/com/linkedin/pinot/controller/validation/StorageQuotaCheckerTest.java
+++ b/pinot-controller/src/test/java/com/linkedin/pinot/controller/validation/StorageQuotaCheckerTest.java
@@ -58,7 +58,6 @@ public class StorageQuotaCheckerTest {
     _pinotHelixResourceManager = mock(PinotHelixResourceManager.class);
     when(_tableConfig.getValidationConfig()).thenReturn(_validationConfig);
     when(_validationConfig.getReplicationNumber()).thenReturn(2);
-    when(_pinotHelixResourceManager.isLeader()).thenReturn(true);
     TEST_DIR.mkdirs();
   }
 
@@ -69,7 +68,7 @@ public class StorageQuotaCheckerTest {
 
   @Test
   public void testNoQuota() throws InvalidConfigException {
-    StorageQuotaChecker checker = new StorageQuotaChecker(_tableConfig, _tableSizeReader, _controllerMetrics, _pinotHelixResourceManager);
+    StorageQuotaChecker checker = new MockStorageQuotaChecker(_tableConfig, _tableSizeReader, _controllerMetrics, _pinotHelixResourceManager);
     when(_tableConfig.getQuotaConfig()).thenReturn(null);
     StorageQuotaChecker.QuotaCheckerResponse res =
         checker.isSegmentStorageWithinQuota(TEST_DIR, "myTable", "segment", 1000);
@@ -78,7 +77,7 @@ public class StorageQuotaCheckerTest {
 
   @Test
   public void testNoStorageQuotaConfig() throws InvalidConfigException {
-    StorageQuotaChecker checker = new StorageQuotaChecker(_tableConfig, _tableSizeReader, _controllerMetrics, _pinotHelixResourceManager);
+    StorageQuotaChecker checker = new MockStorageQuotaChecker(_tableConfig, _tableSizeReader, _controllerMetrics, _pinotHelixResourceManager);
     when(_tableConfig.getQuotaConfig()).thenReturn(_quotaConfig);
     when(_quotaConfig.storageSizeBytes()).thenReturn(-1L);
     StorageQuotaChecker.QuotaCheckerResponse res =
@@ -118,7 +117,7 @@ public class StorageQuotaCheckerTest {
     when(_tableConfig.getQuotaConfig()).thenReturn(_quotaConfig);
     when(_quotaConfig.storageSizeBytes()).thenReturn(3000L);
     when(_quotaConfig.getStorage()).thenReturn("3K");
-    StorageQuotaChecker checker = new StorageQuotaChecker(_tableConfig, _tableSizeReader, _controllerMetrics, _pinotHelixResourceManager);
+    StorageQuotaChecker checker = new MockStorageQuotaChecker(_tableConfig, _tableSizeReader, _controllerMetrics, _pinotHelixResourceManager);
     StorageQuotaChecker.QuotaCheckerResponse response =
         checker.isSegmentStorageWithinQuota(TEST_DIR, tableName, "segment1", 1000);
     Assert.assertTrue(response.isSegmentWithinQuota);
@@ -160,4 +159,17 @@ public class StorageQuotaCheckerTest {
     response = checker.isSegmentStorageWithinQuota(TEST_DIR, tableName, "segment1", 1000);
     Assert.assertTrue(response.isSegmentWithinQuota);
   }
+
+  private class MockStorageQuotaChecker extends StorageQuotaChecker {
+
+    public MockStorageQuotaChecker(TableConfig tableConfig, TableSizeReader tableSizeReader,
+        ControllerMetrics controllerMetrics, PinotHelixResourceManager pinotHelixResourceManager) {
+      super(tableConfig, tableSizeReader, controllerMetrics, pinotHelixResourceManager);
+    }
+
+    @Override
+    protected boolean isLeader() {
+      return true;
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to