This is an automated email from the ASF dual-hosted git repository. nehapawar pushed a commit to branch run_validation_manager_on_leadership_change in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit b2a761a3c4aed4ac039fd59a0cb13becd191599d Author: Neha Pawar <[email protected]> AuthorDate: Mon Dec 10 19:00:06 2018 -0800 Add ControllerLeadershipManager as single place to check controller leadership changes --- .../controller/ControllerChangeSubscriber.java | 32 ++++++ .../controller/ControllerLeadershipManager.java | 116 +++++++++++++++++++++ .../pinot/controller/ControllerStarter.java | 4 +- .../helix/core/PinotHelixResourceManager.java | 9 -- .../core/periodictask/ControllerPeriodicTask.java | 9 +- .../realtime/PinotLLCRealtimeSegmentManager.java | 23 +--- .../core/realtime/PinotRealtimeSegmentManager.java | 30 +++--- .../core/realtime/SegmentCompletionManager.java | 17 ++- .../controller/validation/StorageQuotaChecker.java | 7 +- .../controller/helix/SegmentStatusCheckerTest.java | 43 ++++---- .../periodictask/ControllerPeriodicTaskTest.java | 33 +++++- .../helix/core/realtime/SegmentCompletionTest.java | 7 ++ .../helix/core/retention/RetentionManagerTest.java | 18 +++- .../validation/StorageQuotaCheckerTest.java | 20 +++- 14 files changed, 287 insertions(+), 81 deletions(-) diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerChangeSubscriber.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerChangeSubscriber.java new file mode 100644 index 0000000..21e2945 --- /dev/null +++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerChangeSubscriber.java @@ -0,0 +1,32 @@ +/** + * Copyright (C) 2014-2018 LinkedIn Corp. ([email protected]) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.linkedin.pinot.controller; + +/** + * Interface for a subscriber to the {@link ControllerLeadershipManager} + */ +public interface ControllerChangeSubscriber { + + /** + * Callback to invoke on becoming leader + */ + void onBecomingLeader(); + + /** + * Callback to invoke on losing leadership + */ + void onBecomingNonLeader(); +} diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerLeadershipManager.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerLeadershipManager.java new file mode 100644 index 0000000..99a8a9a --- /dev/null +++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerLeadershipManager.java @@ -0,0 +1,116 @@ +/** + * Copyright (C) 2014-2018 LinkedIn Corp. ([email protected]) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.linkedin.pinot.controller; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import org.apache.helix.HelixManager; +import org.apache.helix.api.listeners.ControllerChangeListener; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Single place for listening on controller changes + * This should be created at controller startup and everyone who wants to listen to controller changes should subscribe + */ +public class ControllerLeadershipManager { + + private static final Logger LOGGER = LoggerFactory.getLogger(ControllerLeadershipManager.class); + + private static ControllerLeadershipManager INSTANCE = null; + + private HelixManager _helixManager; + private volatile boolean _amILeader = false; + + private Map<String, ControllerChangeSubscriber> _subscribers = new ConcurrentHashMap<>(); + + private ControllerLeadershipManager(HelixManager helixManager) { + _helixManager = helixManager; + _helixManager.addControllerListener((ControllerChangeListener) notificationContext -> onControllerChange()); + } + + /** + * Create an instance of ControllerLeadershipManager + * @param helixManager + */ + public static synchronized void createInstance(HelixManager helixManager) { + if (INSTANCE != null) { + throw new RuntimeException("Instance of ControllerLeadershipManager already created"); + } + INSTANCE = new ControllerLeadershipManager(helixManager); + } + + /** + * Get the instance of ControllerLeadershipManager + * @return + */ + public static ControllerLeadershipManager getInstance() { + if (INSTANCE == null) { + throw new RuntimeException("Instance of ControllerLeadershipManager not yet created"); + } + return INSTANCE; + } + + /** + * Callback on changes in the controller + */ + protected void onControllerChange() { + if (_helixManager.isLeader()) { + if (!_amILeader) { + _amILeader = true; + LOGGER.info("Became leader"); + onBecomingLeader(); + } else { + LOGGER.info("Already leader. Duplicate notification"); + } + } else { + _amILeader = false; + LOGGER.info("Lost leadership"); + onBecomingNonLeader(); + } + } + + public boolean isLeader() { + return _amILeader; + } + + private void onBecomingLeader() { + _subscribers.forEach((k, v) -> v.onBecomingLeader()); + } + + private void onBecomingNonLeader() { + _subscribers.forEach((k, v) -> v.onBecomingNonLeader()); + } + + /** + * Subscribe to changes in the controller leadership + * @param name + * @param subscriber + */ + public void subscribe(String name, ControllerChangeSubscriber subscriber) { + _subscribers.put(name, subscriber); + } + + /** + * Unsubscribe from changes in controller leadership + * @param name + */ + public void unsubscribe(String name) { + _subscribers.remove(name); + } +} diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerStarter.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerStarter.java index bf25cef..8f64b6b 100644 --- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerStarter.java +++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerStarter.java @@ -161,13 +161,15 @@ public class ControllerStarter { _helixResourceManager.start(); final HelixManager helixManager = _helixResourceManager.getHelixZkManager(); + LOGGER.info("Creating ControllerLeadershipManager instance"); + ControllerLeadershipManager.createInstance(helixManager); + LOGGER.info("Starting task resource manager"); _helixTaskResourceManager = new PinotHelixTaskResourceManager(new TaskDriver(helixManager)); // Helix resource manager must be started in order to create PinotLLCRealtimeSegmentManager LOGGER.info("Starting realtime segment manager"); PinotLLCRealtimeSegmentManager.create(_helixResourceManager, _config, _controllerMetrics); - PinotLLCRealtimeSegmentManager.getInstance().start(); _realtimeSegmentsManager.start(_controllerMetrics); // Setting up periodic tasks diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/PinotHelixResourceManager.java index c12f70e..f695e51 100644 --- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/PinotHelixResourceManager.java +++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/PinotHelixResourceManager.java @@ -218,15 +218,6 @@ public class PinotHelixResourceManager { } /** - * Check whether the Helix manager is the leader. - * - * @return Whether the Helix manager is the leader - */ - public boolean isLeader() { - return _helixZkManager.isLeader(); - } - - /** * Get the Helix admin. * * @return Helix admin diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java index 3d1ef6a..3b12c0f 100644 --- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java +++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java @@ -15,6 +15,8 @@ */ package com.linkedin.pinot.controller.helix.core.periodictask; +import com.google.common.annotations.VisibleForTesting; +import com.linkedin.pinot.controller.ControllerLeadershipManager; import com.linkedin.pinot.controller.helix.core.PinotHelixResourceManager; import com.linkedin.pinot.core.periodictask.BasePeriodicTask; import java.util.List; @@ -59,7 +61,7 @@ public abstract class ControllerPeriodicTask extends BasePeriodicTask { @Override public void run() { - if (!_pinotHelixResourceManager.isLeader()) { + if (!isLeader()) { skipLeaderTask(); } else { List<String> allTableNames = _pinotHelixResourceManager.getAllTables(); @@ -109,4 +111,9 @@ public abstract class ControllerPeriodicTask extends BasePeriodicTask { * @param tables List of table names */ public abstract void process(List<String> tables); + + @VisibleForTesting + protected boolean isLeader() { + return ControllerLeadershipManager.getInstance().isLeader(); + } } diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java index b1ff634..6eeaf8e 100644 --- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java +++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java @@ -42,6 +42,7 @@ import com.linkedin.pinot.common.utils.TarGzCompressionUtils; import com.linkedin.pinot.common.utils.helix.HelixHelper; import com.linkedin.pinot.common.utils.retry.RetryPolicies; import com.linkedin.pinot.controller.ControllerConf; +import com.linkedin.pinot.controller.ControllerLeadershipManager; import com.linkedin.pinot.controller.api.events.MetadataEventNotifierFactory; import com.linkedin.pinot.controller.helix.core.PinotHelixResourceManager; import com.linkedin.pinot.controller.helix.core.PinotHelixSegmentOnlineOfflineStateModelGenerator; @@ -119,7 +120,6 @@ public class PinotLLCRealtimeSegmentManager { private final ZkHelixPropertyStore<ZNRecord> _propertyStore; private final PinotHelixResourceManager _helixResourceManager; private final String _clusterName; - private boolean _amILeader = false; private final ControllerConf _controllerConf; private final ControllerMetrics _controllerMetrics; private final int _numIdealStateUpdateLocks; @@ -155,10 +155,6 @@ public class PinotLLCRealtimeSegmentManager { SegmentCompletionManager.create(helixManager, INSTANCE, controllerConf, controllerMetrics); } - public void start() { - _helixManager.addControllerListener(changeContext -> onBecomeLeader()); - } - protected PinotLLCRealtimeSegmentManager(HelixAdmin helixAdmin, String clusterName, HelixManager helixManager, ZkHelixPropertyStore propertyStore, PinotHelixResourceManager helixResourceManager, ControllerConf controllerConf, ControllerMetrics controllerMetrics) { @@ -186,24 +182,9 @@ public class PinotLLCRealtimeSegmentManager { return INSTANCE; } - private void onBecomeLeader() { - if (isLeader()) { - if (!_amILeader) { - // We were not leader before, now we are. - _amILeader = true; - LOGGER.info("Became leader"); - } else { - // We already had leadership, nothing to do. - LOGGER.info("Already leader. Duplicate notification"); - } - } else { - _amILeader = false; - LOGGER.info("Lost leadership"); - } - } protected boolean isLeader() { - return _helixManager.isLeader(); + return ControllerLeadershipManager.getInstance().isLeader(); } protected boolean isConnected() { diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/realtime/PinotRealtimeSegmentManager.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/realtime/PinotRealtimeSegmentManager.java index 07c411f..03958b9 100644 --- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/realtime/PinotRealtimeSegmentManager.java +++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/realtime/PinotRealtimeSegmentManager.java @@ -22,10 +22,9 @@ import com.linkedin.pinot.common.config.TableNameBuilder; import com.linkedin.pinot.common.metadata.ZKMetadataProvider; import com.linkedin.pinot.common.metadata.instance.InstanceZKMetadata; import com.linkedin.pinot.common.metadata.segment.RealtimeSegmentZKMetadata; -import com.linkedin.pinot.common.utils.CommonConstants; -import com.linkedin.pinot.core.realtime.stream.StreamConfig; import com.linkedin.pinot.common.metrics.ControllerMeter; import com.linkedin.pinot.common.metrics.ControllerMetrics; +import com.linkedin.pinot.common.utils.CommonConstants; import com.linkedin.pinot.common.utils.CommonConstants.Helix.TableType; import com.linkedin.pinot.common.utils.CommonConstants.Segment.Realtime.Status; import com.linkedin.pinot.common.utils.CommonConstants.Segment.SegmentType; @@ -33,9 +32,12 @@ import com.linkedin.pinot.common.utils.HLCSegmentName; import com.linkedin.pinot.common.utils.SegmentName; import com.linkedin.pinot.common.utils.helix.HelixHelper; import com.linkedin.pinot.common.utils.retry.RetryPolicies; +import com.linkedin.pinot.controller.ControllerChangeSubscriber; +import com.linkedin.pinot.controller.ControllerLeadershipManager; import com.linkedin.pinot.controller.helix.core.PinotHelixResourceManager; import com.linkedin.pinot.controller.helix.core.PinotTableIdealStateBuilder; import com.linkedin.pinot.core.query.utils.Pair; +import com.linkedin.pinot.core.realtime.stream.StreamConfig; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; @@ -47,8 +49,6 @@ import java.util.Set; import java.util.concurrent.TimeUnit; import org.I0Itec.zkclient.IZkChildListener; import org.I0Itec.zkclient.IZkDataListener; -import org.apache.helix.ControllerChangeListener; -import org.apache.helix.NotificationContext; import org.apache.helix.PropertyPathConfig; import org.apache.helix.PropertyType; import org.apache.helix.ZNRecord; @@ -65,7 +65,7 @@ import org.slf4j.LoggerFactory; /** * Realtime segment manager, which assigns realtime segments to server instances so that they can consume from the stream. */ -public class PinotRealtimeSegmentManager implements HelixPropertyListener, IZkChildListener, IZkDataListener { +public class PinotRealtimeSegmentManager implements HelixPropertyListener, IZkChildListener, IZkDataListener, ControllerChangeSubscriber { private static final Logger LOGGER = LoggerFactory.getLogger(PinotRealtimeSegmentManager.class); private static final String TABLE_CONFIG = "/CONFIGS/TABLE"; private static final String SEGMENTS_PATH = "/SEGMENTS"; @@ -101,12 +101,7 @@ public class PinotRealtimeSegmentManager implements HelixPropertyListener, IZkCh _zkClient.subscribeDataChanges(_tableConfigPath, this); // Subscribe to leadership changes - _pinotHelixResourceManager.getHelixZkManager().addControllerListener(new ControllerChangeListener() { - @Override - public void onControllerChange(NotificationContext changeContext) { - processPropertyStoreChange(CONTROLLER_LEADER_CHANGE); - } - }); + ControllerLeadershipManager.getInstance().subscribe(PinotLLCRealtimeSegmentManager.class.getName(), this); // Setup change listeners for already existing tables, if any. processPropertyStoreChange(_tableConfigPath); @@ -115,6 +110,7 @@ public class PinotRealtimeSegmentManager implements HelixPropertyListener, IZkCh public void stop() { LOGGER.info("Stopping realtime segments manager, stopping property store."); _pinotHelixResourceManager.getPropertyStore().stop(); + ControllerLeadershipManager.getInstance().unsubscribe(PinotRealtimeSegmentManager.class.getName()); } private synchronized void assignRealtimeSegmentsToServerInstancesIfNecessary() @@ -265,7 +261,7 @@ public class PinotRealtimeSegmentManager implements HelixPropertyListener, IZkCh } private boolean isLeader() { - return _pinotHelixResourceManager.isLeader(); + return ControllerLeadershipManager.getInstance().isLeader(); } @Override @@ -408,4 +404,14 @@ public class PinotRealtimeSegmentManager implements HelixPropertyListener, IZkCh LOGGER.info("PinotRealtimeSegmentManager.handleDataDeleted: {}", dataPath); processPropertyStoreChange(dataPath); } + + @Override + public void onBecomingLeader() { + processPropertyStoreChange(CONTROLLER_LEADER_CHANGE); + } + + @Override + public void onBecomingNonLeader() { + processPropertyStoreChange(CONTROLLER_LEADER_CHANGE); + } } diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/realtime/SegmentCompletionManager.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/realtime/SegmentCompletionManager.java index 1ba51bc..6d47ca7 100644 --- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/realtime/SegmentCompletionManager.java +++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/realtime/SegmentCompletionManager.java @@ -15,6 +15,7 @@ */ package com.linkedin.pinot.controller.helix.core.realtime; +import com.google.common.annotations.VisibleForTesting; import com.linkedin.pinot.common.config.TableNameBuilder; import com.linkedin.pinot.common.metadata.segment.LLCRealtimeSegmentZKMetadata; import com.linkedin.pinot.common.metrics.ControllerMeter; @@ -23,6 +24,7 @@ import com.linkedin.pinot.common.protocols.SegmentCompletionProtocol; import com.linkedin.pinot.common.utils.CommonConstants; import com.linkedin.pinot.common.utils.LLCSegmentName; import com.linkedin.pinot.controller.ControllerConf; +import com.linkedin.pinot.controller.ControllerLeadershipManager; import com.linkedin.pinot.controller.helix.core.realtime.segment.CommittingSegmentDescriptor; import java.util.HashMap; import java.util.HashSet; @@ -153,7 +155,7 @@ public class SegmentCompletionManager { * that it currently has (i.e. next offset that it will consume, if it continues to consume). */ public SegmentCompletionProtocol.Response segmentConsumed(SegmentCompletionProtocol.Request.Params reqParams) { - if (!_helixManager.isLeader() || !_helixManager.isConnected()) { + if (!isLeader() || !_helixManager.isConnected()) { _controllerMetrics.addMeteredGlobalValue(ControllerMeter.CONTROLLER_NOT_LEADER, 1L); return SegmentCompletionProtocol.RESP_NOT_LEADER; } @@ -190,7 +192,7 @@ public class SegmentCompletionManager { * incoming segment). */ public SegmentCompletionProtocol.Response segmentCommitStart(final SegmentCompletionProtocol.Request.Params reqParams) { - if (!_helixManager.isLeader() || !_helixManager.isConnected()) { + if (!isLeader() || !_helixManager.isConnected()) { _controllerMetrics.addMeteredGlobalValue(ControllerMeter.CONTROLLER_NOT_LEADER, 1L); return SegmentCompletionProtocol.RESP_NOT_LEADER; } @@ -214,7 +216,7 @@ public class SegmentCompletionManager { } public SegmentCompletionProtocol.Response extendBuildTime(final SegmentCompletionProtocol.Request.Params reqParams) { - if (!_helixManager.isLeader() || !_helixManager.isConnected()) { + if (!isLeader() || !_helixManager.isConnected()) { _controllerMetrics.addMeteredGlobalValue(ControllerMeter.CONTROLLER_NOT_LEADER, 1L); return SegmentCompletionProtocol.RESP_NOT_LEADER; } @@ -245,7 +247,7 @@ public class SegmentCompletionManager { * @return */ public SegmentCompletionProtocol.Response segmentStoppedConsuming(SegmentCompletionProtocol.Request.Params reqParams) { - if (!_helixManager.isLeader() || !_helixManager.isConnected()) { + if (!isLeader() || !_helixManager.isConnected()) { _controllerMetrics.addMeteredGlobalValue(ControllerMeter.CONTROLLER_NOT_LEADER, 1L); return SegmentCompletionProtocol.RESP_NOT_LEADER; } @@ -280,7 +282,7 @@ public class SegmentCompletionManager { * @return */ public SegmentCompletionProtocol.Response segmentCommitEnd(SegmentCompletionProtocol.Request.Params reqParams, boolean success, boolean isSplitCommit) { - if (!_helixManager.isLeader() || !_helixManager.isConnected()) { + if (!isLeader() || !_helixManager.isConnected()) { _controllerMetrics.addMeteredGlobalValue(ControllerMeter.CONTROLLER_NOT_LEADER, 1L); return SegmentCompletionProtocol.RESP_NOT_LEADER; } @@ -1091,4 +1093,9 @@ public class SegmentCompletionManager { return false; } } + + @VisibleForTesting + protected boolean isLeader() { + return ControllerLeadershipManager.getInstance().isLeader(); + } } diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/StorageQuotaChecker.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/StorageQuotaChecker.java index 7d6be94..2614c2d 100644 --- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/StorageQuotaChecker.java +++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/StorageQuotaChecker.java @@ -23,6 +23,7 @@ import com.linkedin.pinot.common.exception.InvalidConfigException; import com.linkedin.pinot.common.metrics.ControllerGauge; import com.linkedin.pinot.common.metrics.ControllerMetrics; import com.linkedin.pinot.common.utils.DataSize; +import com.linkedin.pinot.controller.ControllerLeadershipManager; import com.linkedin.pinot.controller.helix.core.PinotHelixResourceManager; import com.linkedin.pinot.controller.util.TableSizeReader; import java.io.File; @@ -150,7 +151,7 @@ public class StorageQuotaChecker { tableName, tableSubtypeSize.estimatedSizeInBytes, tableSubtypeSize.reportedSizeInBytes); // Only emit the real percentage of storage quota usage by lead controller, otherwise emit 0L. - if (_pinotHelixResourceManager.isLeader() && allowedStorageBytes != 0L) { + if (isLeader() && allowedStorageBytes != 0L) { long existingStorageQuotaUtilization = tableSubtypeSize.estimatedSizeInBytes * 100 / allowedStorageBytes; _controllerMetrics.setValueOfTableGauge(tableName, ControllerGauge.TABLE_STORAGE_QUOTA_UTILIZATION, existingStorageQuotaUtilization); @@ -193,4 +194,8 @@ public class StorageQuotaChecker { return failure(message); } } + + protected boolean isLeader() { + return ControllerLeadershipManager.getInstance().isLeader(); + } } diff --git a/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/SegmentStatusCheckerTest.java b/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/SegmentStatusCheckerTest.java index de6eca9..4d4c324 100644 --- a/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/SegmentStatusCheckerTest.java +++ b/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/SegmentStatusCheckerTest.java @@ -75,7 +75,6 @@ public class SegmentStatusCheckerTest { } { helixResourceManager = mock(PinotHelixResourceManager.class); - when(helixResourceManager.isLeader()).thenReturn(true); when(helixResourceManager.getAllTables()).thenReturn(allTableNames); when(helixResourceManager.getHelixClusterName()).thenReturn("StatusChecker"); when(helixResourceManager.getHelixAdmin()).thenReturn(helixAdmin); @@ -87,7 +86,7 @@ public class SegmentStatusCheckerTest { } metricsRegistry = new MetricsRegistry(); controllerMetrics = new ControllerMetrics(metricsRegistry); - segmentStatusChecker = new SegmentStatusChecker(helixResourceManager, config, controllerMetrics); + segmentStatusChecker = new MockSegmentStatusChecker(helixResourceManager, config, controllerMetrics); segmentStatusChecker.init(); segmentStatusChecker.run(); Assert.assertEquals(controllerMetrics.getValueOfTableGauge(externalView.getId(), @@ -141,7 +140,6 @@ public class SegmentStatusCheckerTest { } { helixResourceManager = mock(PinotHelixResourceManager.class); - when(helixResourceManager.isLeader()).thenReturn(true); when(helixResourceManager.getAllTables()).thenReturn(allTableNames); when(helixResourceManager.getHelixClusterName()).thenReturn("StatusChecker"); when(helixResourceManager.getHelixAdmin()).thenReturn(helixAdmin); @@ -153,7 +151,7 @@ public class SegmentStatusCheckerTest { } metricsRegistry = new MetricsRegistry(); controllerMetrics = new ControllerMetrics(metricsRegistry); - segmentStatusChecker = new SegmentStatusChecker(helixResourceManager, config, controllerMetrics); + segmentStatusChecker = new MockSegmentStatusChecker(helixResourceManager, config, controllerMetrics); segmentStatusChecker.init(); segmentStatusChecker.run(); Assert.assertEquals(controllerMetrics.getValueOfTableGauge(externalView.getId(), @@ -218,7 +216,6 @@ public class SegmentStatusCheckerTest { } { helixResourceManager = mock(PinotHelixResourceManager.class); - when(helixResourceManager.isLeader()).thenReturn(true); when(helixResourceManager.getAllTables()).thenReturn(allTableNames); when(helixResourceManager.getHelixClusterName()).thenReturn("StatusChecker"); when(helixResourceManager.getHelixAdmin()).thenReturn(helixAdmin); @@ -231,7 +228,7 @@ public class SegmentStatusCheckerTest { } metricsRegistry = new MetricsRegistry(); controllerMetrics = new ControllerMetrics(metricsRegistry); - segmentStatusChecker = new SegmentStatusChecker(helixResourceManager, config, controllerMetrics); + segmentStatusChecker = new MockSegmentStatusChecker(helixResourceManager, config, controllerMetrics); segmentStatusChecker.init(); segmentStatusChecker.run(); Assert.assertEquals(controllerMetrics.getValueOfTableGauge(externalView.getId(), @@ -266,7 +263,6 @@ public class SegmentStatusCheckerTest { } { helixResourceManager = mock(PinotHelixResourceManager.class); - when(helixResourceManager.isLeader()).thenReturn(true); when(helixResourceManager.getAllTables()).thenReturn(allTableNames); when(helixResourceManager.getHelixClusterName()).thenReturn("StatusChecker"); when(helixResourceManager.getHelixAdmin()).thenReturn(helixAdmin); @@ -278,7 +274,7 @@ public class SegmentStatusCheckerTest { } metricsRegistry = new MetricsRegistry(); controllerMetrics = new ControllerMetrics(metricsRegistry); - segmentStatusChecker = new SegmentStatusChecker(helixResourceManager, config, controllerMetrics); + segmentStatusChecker = new MockSegmentStatusChecker(helixResourceManager, config, controllerMetrics); segmentStatusChecker.init(); segmentStatusChecker.run(); Assert.assertEquals(controllerMetrics.getValueOfTableGauge(tableName, @@ -301,7 +297,6 @@ public class SegmentStatusCheckerTest { } { helixResourceManager = mock(PinotHelixResourceManager.class); - when(helixResourceManager.isLeader()).thenReturn(true); when(helixResourceManager.getAllTables()).thenReturn(allTableNames); when(helixResourceManager.getHelixClusterName()).thenReturn("StatusChecker"); when(helixResourceManager.getHelixAdmin()).thenReturn(helixAdmin); @@ -313,7 +308,7 @@ public class SegmentStatusCheckerTest { } metricsRegistry = new MetricsRegistry(); controllerMetrics = new ControllerMetrics(metricsRegistry); - segmentStatusChecker = new SegmentStatusChecker(helixResourceManager, config, controllerMetrics); + segmentStatusChecker = new MockSegmentStatusChecker(helixResourceManager, config, controllerMetrics); segmentStatusChecker.init(); segmentStatusChecker.run(); Assert.assertEquals(controllerMetrics.getValueOfTableGauge(tableName, @@ -369,7 +364,6 @@ public class SegmentStatusCheckerTest { { helixResourceManager = mock(PinotHelixResourceManager.class); - when(helixResourceManager.isLeader()).thenReturn(true); when(helixResourceManager.getAllTables()).thenReturn(allTableNames); when(helixResourceManager.getHelixClusterName()).thenReturn("StatusChecker"); when(helixResourceManager.getHelixAdmin()).thenReturn(helixAdmin); @@ -382,7 +376,7 @@ public class SegmentStatusCheckerTest { } metricsRegistry = new MetricsRegistry(); controllerMetrics = new ControllerMetrics(metricsRegistry); - segmentStatusChecker = new SegmentStatusChecker(helixResourceManager, config, controllerMetrics); + segmentStatusChecker = new MockSegmentStatusChecker(helixResourceManager, config, controllerMetrics); segmentStatusChecker.init(); segmentStatusChecker.run(); Assert.assertEquals(controllerMetrics.getValueOfTableGauge(externalView.getId(), @@ -415,7 +409,6 @@ public class SegmentStatusCheckerTest { } { helixResourceManager = mock(PinotHelixResourceManager.class); - when(helixResourceManager.isLeader()).thenReturn(true); when(helixResourceManager.getAllTables()).thenReturn(allTableNames); when(helixResourceManager.getHelixClusterName()).thenReturn("StatusChecker"); when(helixResourceManager.getHelixAdmin()).thenReturn(helixAdmin); @@ -427,7 +420,7 @@ public class SegmentStatusCheckerTest { } metricsRegistry = new MetricsRegistry(); controllerMetrics = new ControllerMetrics(metricsRegistry); - segmentStatusChecker = new SegmentStatusChecker(helixResourceManager, config, controllerMetrics); + segmentStatusChecker = new MockSegmentStatusChecker(helixResourceManager, config, controllerMetrics); segmentStatusChecker.init(); segmentStatusChecker.run(); Assert.assertEquals(controllerMetrics.getValueOfTableGauge(tableName, @@ -454,7 +447,6 @@ public class SegmentStatusCheckerTest { } { helixResourceManager = mock(PinotHelixResourceManager.class); - when(helixResourceManager.isLeader()).thenReturn(true); when(helixResourceManager.getAllTables()).thenReturn(allTableNames); when(helixResourceManager.getHelixClusterName()).thenReturn("StatusChecker"); when(helixResourceManager.getHelixAdmin()).thenReturn(helixAdmin); @@ -466,7 +458,7 @@ public class SegmentStatusCheckerTest { } metricsRegistry = new MetricsRegistry(); controllerMetrics = new ControllerMetrics(metricsRegistry); - segmentStatusChecker = new SegmentStatusChecker(helixResourceManager, config, controllerMetrics); + segmentStatusChecker = new MockSegmentStatusChecker(helixResourceManager, config, controllerMetrics); segmentStatusChecker.init(); segmentStatusChecker.run(); Assert.assertEquals(controllerMetrics.getValueOfTableGauge(tableName, @@ -501,7 +493,6 @@ public class SegmentStatusCheckerTest { } { helixResourceManager = mock(PinotHelixResourceManager.class); - when(helixResourceManager.isLeader()).thenReturn(true); when(helixResourceManager.getAllTables()).thenReturn(allTableNames); when(helixResourceManager.getHelixClusterName()).thenReturn("StatusChecker"); when(helixResourceManager.getHelixAdmin()).thenReturn(helixAdmin); @@ -513,7 +504,7 @@ public class SegmentStatusCheckerTest { } metricsRegistry = new MetricsRegistry(); controllerMetrics = new ControllerMetrics(metricsRegistry); - segmentStatusChecker = new SegmentStatusChecker(helixResourceManager, config, controllerMetrics); + segmentStatusChecker = new MockSegmentStatusChecker(helixResourceManager, config, controllerMetrics); // verify state before test Assert.assertEquals(controllerMetrics.getValueOfGlobalGauge( ControllerGauge.DISABLED_TABLE_COUNT), 0); @@ -553,7 +544,6 @@ public class SegmentStatusCheckerTest { } { helixResourceManager = mock(PinotHelixResourceManager.class); - when(helixResourceManager.isLeader()).thenReturn(true); when(helixResourceManager.getAllTables()).thenReturn(allTableNames); when(helixResourceManager.getHelixClusterName()).thenReturn("StatusChecker"); when(helixResourceManager.getHelixAdmin()).thenReturn(helixAdmin); @@ -565,7 +555,7 @@ public class SegmentStatusCheckerTest { } metricsRegistry = new MetricsRegistry(); controllerMetrics = new ControllerMetrics(metricsRegistry); - segmentStatusChecker = new SegmentStatusChecker(helixResourceManager, config, controllerMetrics); + segmentStatusChecker = new MockSegmentStatusChecker(helixResourceManager, config, controllerMetrics); segmentStatusChecker.init(); segmentStatusChecker.run(); Assert.assertEquals(controllerMetrics.getValueOfTableGauge(tableName, @@ -577,4 +567,17 @@ public class SegmentStatusCheckerTest { Assert.assertEquals(controllerMetrics.getValueOfTableGauge(tableName, ControllerGauge.PERCENT_SEGMENTS_AVAILABLE), 100); } + + private class MockSegmentStatusChecker extends SegmentStatusChecker { + + public MockSegmentStatusChecker(PinotHelixResourceManager pinotHelixResourceManager, ControllerConf config, + ControllerMetrics metricsRegistry) { + super(pinotHelixResourceManager, config, metricsRegistry); + } + + @Override + protected boolean isLeader() { + return true; + } + } } diff --git a/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskTest.java b/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskTest.java index 6a2c1b1..9a5d26c 100644 --- a/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskTest.java +++ b/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskTest.java @@ -32,8 +32,8 @@ public class ControllerPeriodicTaskTest { private final AtomicBoolean _onBecomeNonLeaderCalled = new AtomicBoolean(); private final AtomicBoolean _processCalled = new AtomicBoolean(); - private final ControllerPeriodicTask _task = - new ControllerPeriodicTask("TestTask", RUN_FREQUENCY_IN_SECONDS, _resourceManager) { + private final MockControllerPeriodicTask _task = + new MockControllerPeriodicTask("TestTask", RUN_FREQUENCY_IN_SECONDS, _resourceManager) { @Override public void onBecomeLeader() { _onBecomeLeaderCalled.set(true); @@ -48,6 +48,7 @@ public class ControllerPeriodicTaskTest { public void process(List<String> tables) { _processCalled.set(true); } + }; private void resetState() { @@ -68,6 +69,7 @@ public class ControllerPeriodicTaskTest { public void testChangeLeadership() { // Initial state resetState(); + _task.setLeader(false); _task.init(); assertFalse(_onBecomeLeaderCalled.get()); assertFalse(_onBecomeNonLeaderCalled.get()); @@ -82,7 +84,7 @@ public class ControllerPeriodicTaskTest { // From non-leader to leader resetState(); - when(_resourceManager.isLeader()).thenReturn(true); + _task.setLeader(true); _task.run(); assertTrue(_onBecomeLeaderCalled.get()); assertFalse(_onBecomeNonLeaderCalled.get()); @@ -97,10 +99,33 @@ public class ControllerPeriodicTaskTest { // From leader to non-leader resetState(); - when(_resourceManager.isLeader()).thenReturn(false); + _task.setLeader(false); _task.run(); assertFalse(_onBecomeLeaderCalled.get()); assertTrue(_onBecomeNonLeaderCalled.get()); assertFalse(_processCalled.get()); } + + private class MockControllerPeriodicTask extends ControllerPeriodicTask { + + private boolean _isLeader = true; + public MockControllerPeriodicTask(String taskName, long runFrequencyInSeconds, + PinotHelixResourceManager pinotHelixResourceManager) { + super(taskName, runFrequencyInSeconds, pinotHelixResourceManager); + } + + @Override + public void process(List<String> tables) { + + } + + @Override + protected boolean isLeader() { + return _isLeader; + } + + void setLeader(boolean isLeader) { + _isLeader = isLeader; + } + } } diff --git a/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/realtime/SegmentCompletionTest.java b/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/realtime/SegmentCompletionTest.java index d867be3..310df33 100644 --- a/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/realtime/SegmentCompletionTest.java +++ b/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/realtime/SegmentCompletionTest.java @@ -1131,13 +1131,20 @@ public class SegmentCompletionTest { public static class MockSegmentCompletionManager extends SegmentCompletionManager { public long _secconds; + private boolean _isLeader; protected MockSegmentCompletionManager(PinotLLCRealtimeSegmentManager segmentManager, boolean isLeader, boolean isConnected) { super(createMockHelixManager(isLeader, isConnected), segmentManager, new ControllerMetrics(new MetricsRegistry())); + _isLeader = isLeader; } @Override protected long getCurrentTimeMs() { return _secconds * 1000L; } + + @Override + protected boolean isLeader() { + return _isLeader; + } } } diff --git a/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/retention/RetentionManagerTest.java b/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/retention/RetentionManagerTest.java index 1baf36f..b7c67a5 100644 --- a/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/retention/RetentionManagerTest.java +++ b/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/retention/RetentionManagerTest.java @@ -84,7 +84,7 @@ public class RetentionManagerTest { when(pinotHelixResourceManager.getTableConfig(OFFLINE_TABLE_NAME)).thenReturn(tableConfig); when(pinotHelixResourceManager.getOfflineSegmentMetadata(OFFLINE_TABLE_NAME)).thenReturn(metadataList); - RetentionManager retentionManager = new RetentionManager(pinotHelixResourceManager, 0, 0); + RetentionManager retentionManager = new MockRetentionManager(pinotHelixResourceManager, 0, 0); retentionManager.init(); retentionManager.run(); @@ -156,7 +156,6 @@ public class RetentionManagerTest { private void setupPinotHelixResourceManager(TableConfig tableConfig, final List<String> removedSegments, PinotHelixResourceManager resourceManager) { final String tableNameWithType = tableConfig.getTableName(); - when(resourceManager.isLeader()).thenReturn(true); when(resourceManager.getAllTables()).thenReturn(Collections.singletonList(tableNameWithType)); SegmentDeletionManager deletionManager = mock(SegmentDeletionManager.class); @@ -202,7 +201,7 @@ public class RetentionManagerTest { setupSegmentMetadata(tableConfig, now, initialNumSegments, removedSegments); setupPinotHelixResourceManager(tableConfig, removedSegments, pinotHelixResourceManager); - RetentionManager retentionManager = new RetentionManager(pinotHelixResourceManager, 0, 0); + RetentionManager retentionManager = new MockRetentionManager(pinotHelixResourceManager, 0, 0); retentionManager.init(); retentionManager.run(); @@ -306,4 +305,17 @@ public class RetentionManagerTest { when(segmentMetadata.getTimeGranularity()).thenReturn(new Duration(timeUnit.toMillis(1))); return segmentMetadata; } + + private class MockRetentionManager extends RetentionManager { + + public MockRetentionManager(PinotHelixResourceManager pinotHelixResourceManager, int runFrequencyInSeconds, + int deletedSegmentsRetentionInDays) { + super(pinotHelixResourceManager, runFrequencyInSeconds, deletedSegmentsRetentionInDays); + } + + @Override + protected boolean isLeader() { + return true; + } + } } diff --git a/pinot-controller/src/test/java/com/linkedin/pinot/controller/validation/StorageQuotaCheckerTest.java b/pinot-controller/src/test/java/com/linkedin/pinot/controller/validation/StorageQuotaCheckerTest.java index e64b6ee..4ea6967 100644 --- a/pinot-controller/src/test/java/com/linkedin/pinot/controller/validation/StorageQuotaCheckerTest.java +++ b/pinot-controller/src/test/java/com/linkedin/pinot/controller/validation/StorageQuotaCheckerTest.java @@ -58,7 +58,6 @@ public class StorageQuotaCheckerTest { _pinotHelixResourceManager = mock(PinotHelixResourceManager.class); when(_tableConfig.getValidationConfig()).thenReturn(_validationConfig); when(_validationConfig.getReplicationNumber()).thenReturn(2); - when(_pinotHelixResourceManager.isLeader()).thenReturn(true); TEST_DIR.mkdirs(); } @@ -69,7 +68,7 @@ public class StorageQuotaCheckerTest { @Test public void testNoQuota() throws InvalidConfigException { - StorageQuotaChecker checker = new StorageQuotaChecker(_tableConfig, _tableSizeReader, _controllerMetrics, _pinotHelixResourceManager); + StorageQuotaChecker checker = new MockStorageQuotaChecker(_tableConfig, _tableSizeReader, _controllerMetrics, _pinotHelixResourceManager); when(_tableConfig.getQuotaConfig()).thenReturn(null); StorageQuotaChecker.QuotaCheckerResponse res = checker.isSegmentStorageWithinQuota(TEST_DIR, "myTable", "segment", 1000); @@ -78,7 +77,7 @@ public class StorageQuotaCheckerTest { @Test public void testNoStorageQuotaConfig() throws InvalidConfigException { - StorageQuotaChecker checker = new StorageQuotaChecker(_tableConfig, _tableSizeReader, _controllerMetrics, _pinotHelixResourceManager); + StorageQuotaChecker checker = new MockStorageQuotaChecker(_tableConfig, _tableSizeReader, _controllerMetrics, _pinotHelixResourceManager); when(_tableConfig.getQuotaConfig()).thenReturn(_quotaConfig); when(_quotaConfig.storageSizeBytes()).thenReturn(-1L); StorageQuotaChecker.QuotaCheckerResponse res = @@ -118,7 +117,7 @@ public class StorageQuotaCheckerTest { when(_tableConfig.getQuotaConfig()).thenReturn(_quotaConfig); when(_quotaConfig.storageSizeBytes()).thenReturn(3000L); when(_quotaConfig.getStorage()).thenReturn("3K"); - StorageQuotaChecker checker = new StorageQuotaChecker(_tableConfig, _tableSizeReader, _controllerMetrics, _pinotHelixResourceManager); + StorageQuotaChecker checker = new MockStorageQuotaChecker(_tableConfig, _tableSizeReader, _controllerMetrics, _pinotHelixResourceManager); StorageQuotaChecker.QuotaCheckerResponse response = checker.isSegmentStorageWithinQuota(TEST_DIR, tableName, "segment1", 1000); Assert.assertTrue(response.isSegmentWithinQuota); @@ -160,4 +159,17 @@ public class StorageQuotaCheckerTest { response = checker.isSegmentStorageWithinQuota(TEST_DIR, tableName, "segment1", 1000); Assert.assertTrue(response.isSegmentWithinQuota); } + + private class MockStorageQuotaChecker extends StorageQuotaChecker { + + public MockStorageQuotaChecker(TableConfig tableConfig, TableSizeReader tableSizeReader, + ControllerMetrics controllerMetrics, PinotHelixResourceManager pinotHelixResourceManager) { + super(tableConfig, tableSizeReader, controllerMetrics, pinotHelixResourceManager); + } + + @Override + protected boolean isLeader() { + return true; + } + } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
