This is an automated email from the ASF dual-hosted git repository.
nehapawar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 35d61aa Add ControllerLeadershipManager as single place to check
controller leadership changes (#3604)
35d61aa is described below
commit 35d61aafa6a8fdc67cdd47d6e383dd791684e92a
Author: Neha Pawar <[email protected]>
AuthorDate: Fri Dec 14 15:07:20 2018 -0800
Add ControllerLeadershipManager as single place to check controller
leadership changes (#3604)
---
.../controller/ControllerLeadershipManager.java | 126 +++++++++++++++++++++
.../pinot/controller/ControllerStarter.java | 7 +-
.../controller/LeadershipChangeSubscriber.java | 32 ++++++
.../helix/core/PinotHelixResourceManager.java | 9 --
.../core/periodictask/ControllerPeriodicTask.java | 9 +-
.../realtime/PinotLLCRealtimeSegmentManager.java | 24 +---
.../core/realtime/PinotRealtimeSegmentManager.java | 29 +++--
.../core/realtime/SegmentCompletionManager.java | 17 ++-
.../controller/validation/StorageQuotaChecker.java | 7 +-
.../controller/helix/SegmentStatusCheckerTest.java | 43 +++----
.../periodictask/ControllerPeriodicTaskTest.java | 33 +++++-
.../helix/core/realtime/SegmentCompletionTest.java | 7 ++
.../helix/core/retention/RetentionManagerTest.java | 18 ++-
.../validation/StorageQuotaCheckerTest.java | 20 +++-
14 files changed, 299 insertions(+), 82 deletions(-)
diff --git
a/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerLeadershipManager.java
b/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerLeadershipManager.java
new file mode 100644
index 0000000..66e0482
--- /dev/null
+++
b/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerLeadershipManager.java
@@ -0,0 +1,126 @@
+/**
+ * Copyright (C) 2014-2018 LinkedIn Corp. ([email protected])
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.linkedin.pinot.controller;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.helix.HelixManager;
+import org.apache.helix.api.listeners.ControllerChangeListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Single place for listening on controller changes
+ * This should be created at controller startup and everyone who wants to
listen to controller changes should subscribe
+ */
+public class ControllerLeadershipManager {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(ControllerLeadershipManager.class);
+
+ private static ControllerLeadershipManager INSTANCE = null;
+
+ private HelixManager _helixManager;
+ private volatile boolean _amILeader = false;
+
+ private Map<String, LeadershipChangeSubscriber> _subscribers = new
ConcurrentHashMap<>();
+
+ private ControllerLeadershipManager(HelixManager helixManager) {
+ _helixManager = helixManager;
+ _helixManager.addControllerListener((ControllerChangeListener)
notificationContext -> onControllerChange());
+ }
+
+ /**
+ * Create an instance of ControllerLeadershipManager
+ * @param helixManager
+ */
+ public static synchronized void init(HelixManager helixManager) {
+ if (INSTANCE != null) {
+ throw new RuntimeException("Instance of ControllerLeadershipManager
already created");
+ }
+ INSTANCE = new ControllerLeadershipManager(helixManager);
+ }
+
+ /**
+ * Get the instance of ControllerLeadershipManager
+ * @return
+ */
+ public static synchronized ControllerLeadershipManager getInstance() {
+ if (INSTANCE == null) {
+ throw new RuntimeException("Instance of ControllerLeadershipManager not
yet created");
+ }
+ return INSTANCE;
+ }
+
+ /**
+ * When stopping this service, if the controller is leader, invoke {@link
ControllerLeadershipManager#onBecomingNonLeader()}
+ */
+ public void stop() {
+ if (_amILeader) {
+ onBecomingNonLeader();
+ }
+ }
+
+ /**
+ * Callback on changes in the controller
+ */
+ protected void onControllerChange() {
+ if (_helixManager.isLeader()) {
+ if (!_amILeader) {
+ _amILeader = true;
+ LOGGER.info("Became leader");
+ onBecomingLeader();
+ } else {
+ LOGGER.info("Already leader. Duplicate notification");
+ }
+ } else {
+ if (_amILeader) {
+ _amILeader = false;
+ LOGGER.info("Lost leadership");
+ onBecomingNonLeader();
+ } else {
+ LOGGER.info("Already not leader. Duplicate notification");
+ }
+ }
+ }
+
+ public boolean isLeader() {
+ return _amILeader;
+ }
+
+ private void onBecomingLeader() {
+ _subscribers.forEach((k, v) -> v.onBecomingLeader());
+ }
+
+ private void onBecomingNonLeader() {
+ _subscribers.forEach((k, v) -> v.onBecomingNonLeader());
+ }
+
+ /**
+ * Subscribe to changes in the controller leadership
+ * If controller is already leader, invoke {@link
LeadershipChangeSubscriber#onBecomingLeader()}
+ * @param name
+ * @param subscriber
+ */
+ public void subscribe(String name, LeadershipChangeSubscriber subscriber) {
+ LOGGER.info("{} subscribing to leadership changes", name);
+ _subscribers.put(name, subscriber);
+ if (_amILeader) {
+ subscriber.onBecomingLeader();
+ }
+ }
+}
diff --git
a/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerStarter.java
b/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerStarter.java
index f1336d8..33fb5ce 100644
---
a/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerStarter.java
+++
b/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerStarter.java
@@ -161,13 +161,15 @@ public class ControllerStarter {
_helixResourceManager.start();
final HelixManager helixManager =
_helixResourceManager.getHelixZkManager();
+ LOGGER.info("Init controller leadership manager");
+ ControllerLeadershipManager.init(helixManager);
+
LOGGER.info("Starting task resource manager");
_helixTaskResourceManager = new PinotHelixTaskResourceManager(new
TaskDriver(helixManager));
// Helix resource manager must be started in order to create
PinotLLCRealtimeSegmentManager
LOGGER.info("Starting realtime segment manager");
PinotLLCRealtimeSegmentManager.create(_helixResourceManager, _config,
_controllerMetrics);
- PinotLLCRealtimeSegmentManager.getInstance().start();
_realtimeSegmentsManager.start(_controllerMetrics);
// Setting up periodic tasks
@@ -287,6 +289,9 @@ public class ControllerStarter {
public void stop() {
try {
+ LOGGER.info("Stopping controller leadership manager");
+ ControllerLeadershipManager.getInstance().stop();
+
// Stop PinotLLCSegmentManager before stopping Jersey API. It is
possible that stopping Jersey API
// may interrupt the handlers waiting on an I/O.
PinotLLCRealtimeSegmentManager.getInstance().stop();
diff --git
a/pinot-controller/src/main/java/com/linkedin/pinot/controller/LeadershipChangeSubscriber.java
b/pinot-controller/src/main/java/com/linkedin/pinot/controller/LeadershipChangeSubscriber.java
new file mode 100644
index 0000000..844fa3e
--- /dev/null
+++
b/pinot-controller/src/main/java/com/linkedin/pinot/controller/LeadershipChangeSubscriber.java
@@ -0,0 +1,32 @@
+/**
+ * Copyright (C) 2014-2018 LinkedIn Corp. ([email protected])
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.linkedin.pinot.controller;
+
+/**
+ * Interface for a subscriber to the {@link ControllerLeadershipManager}
+ */
+public interface LeadershipChangeSubscriber {
+
+ /**
+ * Callback to invoke on becoming leader
+ */
+ void onBecomingLeader();
+
+ /**
+ * Callback to invoke on losing leadership
+ */
+ void onBecomingNonLeader();
+}
diff --git
a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/PinotHelixResourceManager.java
b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/PinotHelixResourceManager.java
index 971c908..423db8c 100644
---
a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/PinotHelixResourceManager.java
+++
b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -221,15 +221,6 @@ public class PinotHelixResourceManager {
}
/**
- * Check whether the Helix manager is the leader.
- *
- * @return Whether the Helix manager is the leader
- */
- public boolean isLeader() {
- return _helixZkManager.isLeader();
- }
-
- /**
* Get the Helix admin.
*
* @return Helix admin
diff --git
a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java
b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java
index 3d1ef6a..3b12c0f 100644
---
a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java
+++
b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java
@@ -15,6 +15,8 @@
*/
package com.linkedin.pinot.controller.helix.core.periodictask;
+import com.google.common.annotations.VisibleForTesting;
+import com.linkedin.pinot.controller.ControllerLeadershipManager;
import com.linkedin.pinot.controller.helix.core.PinotHelixResourceManager;
import com.linkedin.pinot.core.periodictask.BasePeriodicTask;
import java.util.List;
@@ -59,7 +61,7 @@ public abstract class ControllerPeriodicTask extends
BasePeriodicTask {
@Override
public void run() {
- if (!_pinotHelixResourceManager.isLeader()) {
+ if (!isLeader()) {
skipLeaderTask();
} else {
List<String> allTableNames = _pinotHelixResourceManager.getAllTables();
@@ -109,4 +111,9 @@ public abstract class ControllerPeriodicTask extends
BasePeriodicTask {
* @param tables List of table names
*/
public abstract void process(List<String> tables);
+
+ @VisibleForTesting
+ protected boolean isLeader() {
+ return ControllerLeadershipManager.getInstance().isLeader();
+ }
}
diff --git
a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index 563fb80..486231a 100644
---
a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++
b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -42,6 +42,7 @@ import com.linkedin.pinot.common.utils.TarGzCompressionUtils;
import com.linkedin.pinot.common.utils.helix.HelixHelper;
import com.linkedin.pinot.common.utils.retry.RetryPolicies;
import com.linkedin.pinot.controller.ControllerConf;
+import com.linkedin.pinot.controller.ControllerLeadershipManager;
import com.linkedin.pinot.controller.api.events.MetadataEventNotifierFactory;
import com.linkedin.pinot.controller.helix.core.PinotHelixResourceManager;
import
com.linkedin.pinot.controller.helix.core.PinotHelixSegmentOnlineOfflineStateModelGenerator;
@@ -124,7 +125,6 @@ public class PinotLLCRealtimeSegmentManager {
private final ZkHelixPropertyStore<ZNRecord> _propertyStore;
private final PinotHelixResourceManager _helixResourceManager;
private final String _clusterName;
- private boolean _amILeader = false;
private final ControllerConf _controllerConf;
private final ControllerMetrics _controllerMetrics;
private final int _numIdealStateUpdateLocks;
@@ -163,11 +163,6 @@ public class PinotLLCRealtimeSegmentManager {
SegmentCompletionManager.create(helixManager, INSTANCE, controllerConf,
controllerMetrics);
}
- public void start() {
- _helixManager.addControllerListener(changeContext -> onBecomeLeader());
- }
-
-
public void stop() {
_isStopping = true;
LOGGER.info("Awaiting segment metadata commits: maxWaitTimeMillis = {}",
MAX_LLC_SEGMENT_METADATA_COMMIT_TIME_MILLIS);
@@ -218,24 +213,9 @@ public class PinotLLCRealtimeSegmentManager {
return INSTANCE;
}
- private void onBecomeLeader() {
- if (isLeader()) {
- if (!_amILeader) {
- // We were not leader before, now we are.
- _amILeader = true;
- LOGGER.info("Became leader");
- } else {
- // We already had leadership, nothing to do.
- LOGGER.info("Already leader. Duplicate notification");
- }
- } else {
- _amILeader = false;
- LOGGER.info("Lost leadership");
- }
- }
protected boolean isLeader() {
- return _helixManager.isLeader();
+ return ControllerLeadershipManager.getInstance().isLeader();
}
protected boolean isConnected() {
diff --git
a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/realtime/PinotRealtimeSegmentManager.java
b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/realtime/PinotRealtimeSegmentManager.java
index 07c411f..c427b0d 100644
---
a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/realtime/PinotRealtimeSegmentManager.java
+++
b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/realtime/PinotRealtimeSegmentManager.java
@@ -22,10 +22,9 @@ import com.linkedin.pinot.common.config.TableNameBuilder;
import com.linkedin.pinot.common.metadata.ZKMetadataProvider;
import com.linkedin.pinot.common.metadata.instance.InstanceZKMetadata;
import com.linkedin.pinot.common.metadata.segment.RealtimeSegmentZKMetadata;
-import com.linkedin.pinot.common.utils.CommonConstants;
-import com.linkedin.pinot.core.realtime.stream.StreamConfig;
import com.linkedin.pinot.common.metrics.ControllerMeter;
import com.linkedin.pinot.common.metrics.ControllerMetrics;
+import com.linkedin.pinot.common.utils.CommonConstants;
import com.linkedin.pinot.common.utils.CommonConstants.Helix.TableType;
import com.linkedin.pinot.common.utils.CommonConstants.Segment.Realtime.Status;
import com.linkedin.pinot.common.utils.CommonConstants.Segment.SegmentType;
@@ -33,9 +32,12 @@ import com.linkedin.pinot.common.utils.HLCSegmentName;
import com.linkedin.pinot.common.utils.SegmentName;
import com.linkedin.pinot.common.utils.helix.HelixHelper;
import com.linkedin.pinot.common.utils.retry.RetryPolicies;
+import com.linkedin.pinot.controller.LeadershipChangeSubscriber;
+import com.linkedin.pinot.controller.ControllerLeadershipManager;
import com.linkedin.pinot.controller.helix.core.PinotHelixResourceManager;
import com.linkedin.pinot.controller.helix.core.PinotTableIdealStateBuilder;
import com.linkedin.pinot.core.query.utils.Pair;
+import com.linkedin.pinot.core.realtime.stream.StreamConfig;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
@@ -47,8 +49,6 @@ import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.I0Itec.zkclient.IZkChildListener;
import org.I0Itec.zkclient.IZkDataListener;
-import org.apache.helix.ControllerChangeListener;
-import org.apache.helix.NotificationContext;
import org.apache.helix.PropertyPathConfig;
import org.apache.helix.PropertyType;
import org.apache.helix.ZNRecord;
@@ -65,7 +65,7 @@ import org.slf4j.LoggerFactory;
/**
* Realtime segment manager, which assigns realtime segments to server
instances so that they can consume from the stream.
*/
-public class PinotRealtimeSegmentManager implements HelixPropertyListener,
IZkChildListener, IZkDataListener {
+public class PinotRealtimeSegmentManager implements HelixPropertyListener,
IZkChildListener, IZkDataListener, LeadershipChangeSubscriber {
private static final Logger LOGGER =
LoggerFactory.getLogger(PinotRealtimeSegmentManager.class);
private static final String TABLE_CONFIG = "/CONFIGS/TABLE";
private static final String SEGMENTS_PATH = "/SEGMENTS";
@@ -101,12 +101,7 @@ public class PinotRealtimeSegmentManager implements
HelixPropertyListener, IZkCh
_zkClient.subscribeDataChanges(_tableConfigPath, this);
// Subscribe to leadership changes
- _pinotHelixResourceManager.getHelixZkManager().addControllerListener(new
ControllerChangeListener() {
- @Override
- public void onControllerChange(NotificationContext changeContext) {
- processPropertyStoreChange(CONTROLLER_LEADER_CHANGE);
- }
- });
+
ControllerLeadershipManager.getInstance().subscribe(PinotLLCRealtimeSegmentManager.class.getName(),
this);
// Setup change listeners for already existing tables, if any.
processPropertyStoreChange(_tableConfigPath);
@@ -265,7 +260,7 @@ public class PinotRealtimeSegmentManager implements
HelixPropertyListener, IZkCh
}
private boolean isLeader() {
- return _pinotHelixResourceManager.isLeader();
+ return ControllerLeadershipManager.getInstance().isLeader();
}
@Override
@@ -408,4 +403,14 @@ public class PinotRealtimeSegmentManager implements
HelixPropertyListener, IZkCh
LOGGER.info("PinotRealtimeSegmentManager.handleDataDeleted: {}", dataPath);
processPropertyStoreChange(dataPath);
}
+
+ @Override
+ public void onBecomingLeader() {
+ processPropertyStoreChange(CONTROLLER_LEADER_CHANGE);
+ }
+
+ @Override
+ public void onBecomingNonLeader() {
+ processPropertyStoreChange(CONTROLLER_LEADER_CHANGE);
+ }
}
diff --git
a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/realtime/SegmentCompletionManager.java
b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/realtime/SegmentCompletionManager.java
index 1ba51bc..6d47ca7 100644
---
a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/realtime/SegmentCompletionManager.java
+++
b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/realtime/SegmentCompletionManager.java
@@ -15,6 +15,7 @@
*/
package com.linkedin.pinot.controller.helix.core.realtime;
+import com.google.common.annotations.VisibleForTesting;
import com.linkedin.pinot.common.config.TableNameBuilder;
import com.linkedin.pinot.common.metadata.segment.LLCRealtimeSegmentZKMetadata;
import com.linkedin.pinot.common.metrics.ControllerMeter;
@@ -23,6 +24,7 @@ import
com.linkedin.pinot.common.protocols.SegmentCompletionProtocol;
import com.linkedin.pinot.common.utils.CommonConstants;
import com.linkedin.pinot.common.utils.LLCSegmentName;
import com.linkedin.pinot.controller.ControllerConf;
+import com.linkedin.pinot.controller.ControllerLeadershipManager;
import
com.linkedin.pinot.controller.helix.core.realtime.segment.CommittingSegmentDescriptor;
import java.util.HashMap;
import java.util.HashSet;
@@ -153,7 +155,7 @@ public class SegmentCompletionManager {
* that it currently has (i.e. next offset that it will consume, if it
continues to consume).
*/
public SegmentCompletionProtocol.Response
segmentConsumed(SegmentCompletionProtocol.Request.Params reqParams) {
- if (!_helixManager.isLeader() || !_helixManager.isConnected()) {
+ if (!isLeader() || !_helixManager.isConnected()) {
_controllerMetrics.addMeteredGlobalValue(ControllerMeter.CONTROLLER_NOT_LEADER,
1L);
return SegmentCompletionProtocol.RESP_NOT_LEADER;
}
@@ -190,7 +192,7 @@ public class SegmentCompletionManager {
* incoming segment).
*/
public SegmentCompletionProtocol.Response segmentCommitStart(final
SegmentCompletionProtocol.Request.Params reqParams) {
- if (!_helixManager.isLeader() || !_helixManager.isConnected()) {
+ if (!isLeader() || !_helixManager.isConnected()) {
_controllerMetrics.addMeteredGlobalValue(ControllerMeter.CONTROLLER_NOT_LEADER,
1L);
return SegmentCompletionProtocol.RESP_NOT_LEADER;
}
@@ -214,7 +216,7 @@ public class SegmentCompletionManager {
}
public SegmentCompletionProtocol.Response extendBuildTime(final
SegmentCompletionProtocol.Request.Params reqParams) {
- if (!_helixManager.isLeader() || !_helixManager.isConnected()) {
+ if (!isLeader() || !_helixManager.isConnected()) {
_controllerMetrics.addMeteredGlobalValue(ControllerMeter.CONTROLLER_NOT_LEADER,
1L);
return SegmentCompletionProtocol.RESP_NOT_LEADER;
}
@@ -245,7 +247,7 @@ public class SegmentCompletionManager {
* @return
*/
public SegmentCompletionProtocol.Response
segmentStoppedConsuming(SegmentCompletionProtocol.Request.Params reqParams) {
- if (!_helixManager.isLeader() || !_helixManager.isConnected()) {
+ if (!isLeader() || !_helixManager.isConnected()) {
_controllerMetrics.addMeteredGlobalValue(ControllerMeter.CONTROLLER_NOT_LEADER,
1L);
return SegmentCompletionProtocol.RESP_NOT_LEADER;
}
@@ -280,7 +282,7 @@ public class SegmentCompletionManager {
* @return
*/
public SegmentCompletionProtocol.Response
segmentCommitEnd(SegmentCompletionProtocol.Request.Params reqParams, boolean
success, boolean isSplitCommit) {
- if (!_helixManager.isLeader() || !_helixManager.isConnected()) {
+ if (!isLeader() || !_helixManager.isConnected()) {
_controllerMetrics.addMeteredGlobalValue(ControllerMeter.CONTROLLER_NOT_LEADER,
1L);
return SegmentCompletionProtocol.RESP_NOT_LEADER;
}
@@ -1091,4 +1093,9 @@ public class SegmentCompletionManager {
return false;
}
}
+
+ @VisibleForTesting
+ protected boolean isLeader() {
+ return ControllerLeadershipManager.getInstance().isLeader();
+ }
}
diff --git
a/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/StorageQuotaChecker.java
b/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/StorageQuotaChecker.java
index 7d6be94..2614c2d 100644
---
a/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/StorageQuotaChecker.java
+++
b/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/StorageQuotaChecker.java
@@ -23,6 +23,7 @@ import
com.linkedin.pinot.common.exception.InvalidConfigException;
import com.linkedin.pinot.common.metrics.ControllerGauge;
import com.linkedin.pinot.common.metrics.ControllerMetrics;
import com.linkedin.pinot.common.utils.DataSize;
+import com.linkedin.pinot.controller.ControllerLeadershipManager;
import com.linkedin.pinot.controller.helix.core.PinotHelixResourceManager;
import com.linkedin.pinot.controller.util.TableSizeReader;
import java.io.File;
@@ -150,7 +151,7 @@ public class StorageQuotaChecker {
tableName, tableSubtypeSize.estimatedSizeInBytes,
tableSubtypeSize.reportedSizeInBytes);
// Only emit the real percentage of storage quota usage by lead
controller, otherwise emit 0L.
- if (_pinotHelixResourceManager.isLeader() && allowedStorageBytes != 0L) {
+ if (isLeader() && allowedStorageBytes != 0L) {
long existingStorageQuotaUtilization =
tableSubtypeSize.estimatedSizeInBytes * 100 / allowedStorageBytes;
_controllerMetrics.setValueOfTableGauge(tableName,
ControllerGauge.TABLE_STORAGE_QUOTA_UTILIZATION,
existingStorageQuotaUtilization);
@@ -193,4 +194,8 @@ public class StorageQuotaChecker {
return failure(message);
}
}
+
+ protected boolean isLeader() {
+ return ControllerLeadershipManager.getInstance().isLeader();
+ }
}
diff --git
a/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/SegmentStatusCheckerTest.java
b/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/SegmentStatusCheckerTest.java
index de6eca9..4d4c324 100644
---
a/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/SegmentStatusCheckerTest.java
+++
b/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/SegmentStatusCheckerTest.java
@@ -75,7 +75,6 @@ public class SegmentStatusCheckerTest {
}
{
helixResourceManager = mock(PinotHelixResourceManager.class);
- when(helixResourceManager.isLeader()).thenReturn(true);
when(helixResourceManager.getAllTables()).thenReturn(allTableNames);
when(helixResourceManager.getHelixClusterName()).thenReturn("StatusChecker");
when(helixResourceManager.getHelixAdmin()).thenReturn(helixAdmin);
@@ -87,7 +86,7 @@ public class SegmentStatusCheckerTest {
}
metricsRegistry = new MetricsRegistry();
controllerMetrics = new ControllerMetrics(metricsRegistry);
- segmentStatusChecker = new SegmentStatusChecker(helixResourceManager,
config, controllerMetrics);
+ segmentStatusChecker = new MockSegmentStatusChecker(helixResourceManager,
config, controllerMetrics);
segmentStatusChecker.init();
segmentStatusChecker.run();
Assert.assertEquals(controllerMetrics.getValueOfTableGauge(externalView.getId(),
@@ -141,7 +140,6 @@ public class SegmentStatusCheckerTest {
}
{
helixResourceManager = mock(PinotHelixResourceManager.class);
- when(helixResourceManager.isLeader()).thenReturn(true);
when(helixResourceManager.getAllTables()).thenReturn(allTableNames);
when(helixResourceManager.getHelixClusterName()).thenReturn("StatusChecker");
when(helixResourceManager.getHelixAdmin()).thenReturn(helixAdmin);
@@ -153,7 +151,7 @@ public class SegmentStatusCheckerTest {
}
metricsRegistry = new MetricsRegistry();
controllerMetrics = new ControllerMetrics(metricsRegistry);
- segmentStatusChecker = new SegmentStatusChecker(helixResourceManager,
config, controllerMetrics);
+ segmentStatusChecker = new MockSegmentStatusChecker(helixResourceManager,
config, controllerMetrics);
segmentStatusChecker.init();
segmentStatusChecker.run();
Assert.assertEquals(controllerMetrics.getValueOfTableGauge(externalView.getId(),
@@ -218,7 +216,6 @@ public class SegmentStatusCheckerTest {
}
{
helixResourceManager = mock(PinotHelixResourceManager.class);
- when(helixResourceManager.isLeader()).thenReturn(true);
when(helixResourceManager.getAllTables()).thenReturn(allTableNames);
when(helixResourceManager.getHelixClusterName()).thenReturn("StatusChecker");
when(helixResourceManager.getHelixAdmin()).thenReturn(helixAdmin);
@@ -231,7 +228,7 @@ public class SegmentStatusCheckerTest {
}
metricsRegistry = new MetricsRegistry();
controllerMetrics = new ControllerMetrics(metricsRegistry);
- segmentStatusChecker = new SegmentStatusChecker(helixResourceManager,
config, controllerMetrics);
+ segmentStatusChecker = new MockSegmentStatusChecker(helixResourceManager,
config, controllerMetrics);
segmentStatusChecker.init();
segmentStatusChecker.run();
Assert.assertEquals(controllerMetrics.getValueOfTableGauge(externalView.getId(),
@@ -266,7 +263,6 @@ public class SegmentStatusCheckerTest {
}
{
helixResourceManager = mock(PinotHelixResourceManager.class);
- when(helixResourceManager.isLeader()).thenReturn(true);
when(helixResourceManager.getAllTables()).thenReturn(allTableNames);
when(helixResourceManager.getHelixClusterName()).thenReturn("StatusChecker");
when(helixResourceManager.getHelixAdmin()).thenReturn(helixAdmin);
@@ -278,7 +274,7 @@ public class SegmentStatusCheckerTest {
}
metricsRegistry = new MetricsRegistry();
controllerMetrics = new ControllerMetrics(metricsRegistry);
- segmentStatusChecker = new SegmentStatusChecker(helixResourceManager,
config, controllerMetrics);
+ segmentStatusChecker = new MockSegmentStatusChecker(helixResourceManager,
config, controllerMetrics);
segmentStatusChecker.init();
segmentStatusChecker.run();
Assert.assertEquals(controllerMetrics.getValueOfTableGauge(tableName,
@@ -301,7 +297,6 @@ public class SegmentStatusCheckerTest {
}
{
helixResourceManager = mock(PinotHelixResourceManager.class);
- when(helixResourceManager.isLeader()).thenReturn(true);
when(helixResourceManager.getAllTables()).thenReturn(allTableNames);
when(helixResourceManager.getHelixClusterName()).thenReturn("StatusChecker");
when(helixResourceManager.getHelixAdmin()).thenReturn(helixAdmin);
@@ -313,7 +308,7 @@ public class SegmentStatusCheckerTest {
}
metricsRegistry = new MetricsRegistry();
controllerMetrics = new ControllerMetrics(metricsRegistry);
- segmentStatusChecker = new SegmentStatusChecker(helixResourceManager,
config, controllerMetrics);
+ segmentStatusChecker = new MockSegmentStatusChecker(helixResourceManager,
config, controllerMetrics);
segmentStatusChecker.init();
segmentStatusChecker.run();
Assert.assertEquals(controllerMetrics.getValueOfTableGauge(tableName,
@@ -369,7 +364,6 @@ public class SegmentStatusCheckerTest {
{
helixResourceManager = mock(PinotHelixResourceManager.class);
- when(helixResourceManager.isLeader()).thenReturn(true);
when(helixResourceManager.getAllTables()).thenReturn(allTableNames);
when(helixResourceManager.getHelixClusterName()).thenReturn("StatusChecker");
when(helixResourceManager.getHelixAdmin()).thenReturn(helixAdmin);
@@ -382,7 +376,7 @@ public class SegmentStatusCheckerTest {
}
metricsRegistry = new MetricsRegistry();
controllerMetrics = new ControllerMetrics(metricsRegistry);
- segmentStatusChecker = new SegmentStatusChecker(helixResourceManager,
config, controllerMetrics);
+ segmentStatusChecker = new MockSegmentStatusChecker(helixResourceManager,
config, controllerMetrics);
segmentStatusChecker.init();
segmentStatusChecker.run();
Assert.assertEquals(controllerMetrics.getValueOfTableGauge(externalView.getId(),
@@ -415,7 +409,6 @@ public class SegmentStatusCheckerTest {
}
{
helixResourceManager = mock(PinotHelixResourceManager.class);
- when(helixResourceManager.isLeader()).thenReturn(true);
when(helixResourceManager.getAllTables()).thenReturn(allTableNames);
when(helixResourceManager.getHelixClusterName()).thenReturn("StatusChecker");
when(helixResourceManager.getHelixAdmin()).thenReturn(helixAdmin);
@@ -427,7 +420,7 @@ public class SegmentStatusCheckerTest {
}
metricsRegistry = new MetricsRegistry();
controllerMetrics = new ControllerMetrics(metricsRegistry);
- segmentStatusChecker = new SegmentStatusChecker(helixResourceManager,
config, controllerMetrics);
+ segmentStatusChecker = new MockSegmentStatusChecker(helixResourceManager,
config, controllerMetrics);
segmentStatusChecker.init();
segmentStatusChecker.run();
Assert.assertEquals(controllerMetrics.getValueOfTableGauge(tableName,
@@ -454,7 +447,6 @@ public class SegmentStatusCheckerTest {
}
{
helixResourceManager = mock(PinotHelixResourceManager.class);
- when(helixResourceManager.isLeader()).thenReturn(true);
when(helixResourceManager.getAllTables()).thenReturn(allTableNames);
when(helixResourceManager.getHelixClusterName()).thenReturn("StatusChecker");
when(helixResourceManager.getHelixAdmin()).thenReturn(helixAdmin);
@@ -466,7 +458,7 @@ public class SegmentStatusCheckerTest {
}
metricsRegistry = new MetricsRegistry();
controllerMetrics = new ControllerMetrics(metricsRegistry);
- segmentStatusChecker = new SegmentStatusChecker(helixResourceManager,
config, controllerMetrics);
+ segmentStatusChecker = new MockSegmentStatusChecker(helixResourceManager,
config, controllerMetrics);
segmentStatusChecker.init();
segmentStatusChecker.run();
Assert.assertEquals(controllerMetrics.getValueOfTableGauge(tableName,
@@ -501,7 +493,6 @@ public class SegmentStatusCheckerTest {
}
{
helixResourceManager = mock(PinotHelixResourceManager.class);
- when(helixResourceManager.isLeader()).thenReturn(true);
when(helixResourceManager.getAllTables()).thenReturn(allTableNames);
when(helixResourceManager.getHelixClusterName()).thenReturn("StatusChecker");
when(helixResourceManager.getHelixAdmin()).thenReturn(helixAdmin);
@@ -513,7 +504,7 @@ public class SegmentStatusCheckerTest {
}
metricsRegistry = new MetricsRegistry();
controllerMetrics = new ControllerMetrics(metricsRegistry);
- segmentStatusChecker = new SegmentStatusChecker(helixResourceManager,
config, controllerMetrics);
+ segmentStatusChecker = new MockSegmentStatusChecker(helixResourceManager,
config, controllerMetrics);
// verify state before test
Assert.assertEquals(controllerMetrics.getValueOfGlobalGauge(
ControllerGauge.DISABLED_TABLE_COUNT), 0);
@@ -553,7 +544,6 @@ public class SegmentStatusCheckerTest {
}
{
helixResourceManager = mock(PinotHelixResourceManager.class);
- when(helixResourceManager.isLeader()).thenReturn(true);
when(helixResourceManager.getAllTables()).thenReturn(allTableNames);
when(helixResourceManager.getHelixClusterName()).thenReturn("StatusChecker");
when(helixResourceManager.getHelixAdmin()).thenReturn(helixAdmin);
@@ -565,7 +555,7 @@ public class SegmentStatusCheckerTest {
}
metricsRegistry = new MetricsRegistry();
controllerMetrics = new ControllerMetrics(metricsRegistry);
- segmentStatusChecker = new SegmentStatusChecker(helixResourceManager,
config, controllerMetrics);
+ segmentStatusChecker = new MockSegmentStatusChecker(helixResourceManager,
config, controllerMetrics);
segmentStatusChecker.init();
segmentStatusChecker.run();
Assert.assertEquals(controllerMetrics.getValueOfTableGauge(tableName,
@@ -577,4 +567,17 @@ public class SegmentStatusCheckerTest {
Assert.assertEquals(controllerMetrics.getValueOfTableGauge(tableName,
ControllerGauge.PERCENT_SEGMENTS_AVAILABLE), 100);
}
+
+ private class MockSegmentStatusChecker extends SegmentStatusChecker {
+
+ public MockSegmentStatusChecker(PinotHelixResourceManager
pinotHelixResourceManager, ControllerConf config,
+ ControllerMetrics metricsRegistry) {
+ super(pinotHelixResourceManager, config, metricsRegistry);
+ }
+
+ @Override
+ protected boolean isLeader() {
+ return true;
+ }
+ }
}
diff --git
a/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskTest.java
b/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskTest.java
index 6a2c1b1..9a5d26c 100644
---
a/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskTest.java
+++
b/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskTest.java
@@ -32,8 +32,8 @@ public class ControllerPeriodicTaskTest {
private final AtomicBoolean _onBecomeNonLeaderCalled = new AtomicBoolean();
private final AtomicBoolean _processCalled = new AtomicBoolean();
- private final ControllerPeriodicTask _task =
- new ControllerPeriodicTask("TestTask", RUN_FREQUENCY_IN_SECONDS,
_resourceManager) {
+ private final MockControllerPeriodicTask _task =
+ new MockControllerPeriodicTask("TestTask", RUN_FREQUENCY_IN_SECONDS,
_resourceManager) {
@Override
public void onBecomeLeader() {
_onBecomeLeaderCalled.set(true);
@@ -48,6 +48,7 @@ public class ControllerPeriodicTaskTest {
public void process(List<String> tables) {
_processCalled.set(true);
}
+
};
private void resetState() {
@@ -68,6 +69,7 @@ public class ControllerPeriodicTaskTest {
public void testChangeLeadership() {
// Initial state
resetState();
+ _task.setLeader(false);
_task.init();
assertFalse(_onBecomeLeaderCalled.get());
assertFalse(_onBecomeNonLeaderCalled.get());
@@ -82,7 +84,7 @@ public class ControllerPeriodicTaskTest {
// From non-leader to leader
resetState();
- when(_resourceManager.isLeader()).thenReturn(true);
+ _task.setLeader(true);
_task.run();
assertTrue(_onBecomeLeaderCalled.get());
assertFalse(_onBecomeNonLeaderCalled.get());
@@ -97,10 +99,33 @@ public class ControllerPeriodicTaskTest {
// From leader to non-leader
resetState();
- when(_resourceManager.isLeader()).thenReturn(false);
+ _task.setLeader(false);
_task.run();
assertFalse(_onBecomeLeaderCalled.get());
assertTrue(_onBecomeNonLeaderCalled.get());
assertFalse(_processCalled.get());
}
+
+ private class MockControllerPeriodicTask extends ControllerPeriodicTask {
+
+ private boolean _isLeader = true;
+ public MockControllerPeriodicTask(String taskName, long
runFrequencyInSeconds,
+ PinotHelixResourceManager pinotHelixResourceManager) {
+ super(taskName, runFrequencyInSeconds, pinotHelixResourceManager);
+ }
+
+ @Override
+ public void process(List<String> tables) {
+
+ }
+
+ @Override
+ protected boolean isLeader() {
+ return _isLeader;
+ }
+
+ void setLeader(boolean isLeader) {
+ _isLeader = isLeader;
+ }
+ }
}
diff --git
a/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/realtime/SegmentCompletionTest.java
b/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/realtime/SegmentCompletionTest.java
index d867be3..310df33 100644
---
a/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/realtime/SegmentCompletionTest.java
+++
b/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/realtime/SegmentCompletionTest.java
@@ -1131,13 +1131,20 @@ public class SegmentCompletionTest {
public static class MockSegmentCompletionManager extends
SegmentCompletionManager {
public long _secconds;
+ private boolean _isLeader;
protected MockSegmentCompletionManager(PinotLLCRealtimeSegmentManager
segmentManager, boolean isLeader,
boolean isConnected) {
super(createMockHelixManager(isLeader, isConnected), segmentManager, new
ControllerMetrics(new MetricsRegistry()));
+ _isLeader = isLeader;
}
@Override
protected long getCurrentTimeMs() {
return _secconds * 1000L;
}
+
+ @Override
+ protected boolean isLeader() {
+ return _isLeader;
+ }
}
}
diff --git
a/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/retention/RetentionManagerTest.java
b/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/retention/RetentionManagerTest.java
index 1da115e..8e25505 100644
---
a/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/retention/RetentionManagerTest.java
+++
b/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/retention/RetentionManagerTest.java
@@ -84,7 +84,7 @@ public class RetentionManagerTest {
when(pinotHelixResourceManager.getTableConfig(OFFLINE_TABLE_NAME)).thenReturn(tableConfig);
when(pinotHelixResourceManager.getOfflineSegmentMetadata(OFFLINE_TABLE_NAME)).thenReturn(metadataList);
- RetentionManager retentionManager = new
RetentionManager(pinotHelixResourceManager, 0, 0);
+ RetentionManager retentionManager = new
MockRetentionManager(pinotHelixResourceManager, 0, 0);
retentionManager.init();
retentionManager.run();
@@ -156,7 +156,6 @@ public class RetentionManagerTest {
private void setupPinotHelixResourceManager(TableConfig tableConfig, final
List<String> removedSegments,
PinotHelixResourceManager resourceManager) {
final String tableNameWithType = tableConfig.getTableName();
- when(resourceManager.isLeader()).thenReturn(true);
when(resourceManager.getAllTables()).thenReturn(Collections.singletonList(tableNameWithType));
SegmentDeletionManager deletionManager =
mock(SegmentDeletionManager.class);
@@ -202,7 +201,7 @@ public class RetentionManagerTest {
setupSegmentMetadata(tableConfig, now, initialNumSegments,
removedSegments);
setupPinotHelixResourceManager(tableConfig, removedSegments,
pinotHelixResourceManager);
- RetentionManager retentionManager = new
RetentionManager(pinotHelixResourceManager, 0, 0);
+ RetentionManager retentionManager = new
MockRetentionManager(pinotHelixResourceManager, 0, 0);
retentionManager.init();
retentionManager.run();
@@ -306,4 +305,17 @@ public class RetentionManagerTest {
when(segmentMetadata.getTimeGranularity()).thenReturn(new
Duration(timeUnit.toMillis(1)));
return segmentMetadata;
}
+
+ private class MockRetentionManager extends RetentionManager {
+
+ public MockRetentionManager(PinotHelixResourceManager
pinotHelixResourceManager, int runFrequencyInSeconds,
+ int deletedSegmentsRetentionInDays) {
+ super(pinotHelixResourceManager, runFrequencyInSeconds,
deletedSegmentsRetentionInDays);
+ }
+
+ @Override
+ protected boolean isLeader() {
+ return true;
+ }
+ }
}
diff --git
a/pinot-controller/src/test/java/com/linkedin/pinot/controller/validation/StorageQuotaCheckerTest.java
b/pinot-controller/src/test/java/com/linkedin/pinot/controller/validation/StorageQuotaCheckerTest.java
index e64b6ee..4ea6967 100644
---
a/pinot-controller/src/test/java/com/linkedin/pinot/controller/validation/StorageQuotaCheckerTest.java
+++
b/pinot-controller/src/test/java/com/linkedin/pinot/controller/validation/StorageQuotaCheckerTest.java
@@ -58,7 +58,6 @@ public class StorageQuotaCheckerTest {
_pinotHelixResourceManager = mock(PinotHelixResourceManager.class);
when(_tableConfig.getValidationConfig()).thenReturn(_validationConfig);
when(_validationConfig.getReplicationNumber()).thenReturn(2);
- when(_pinotHelixResourceManager.isLeader()).thenReturn(true);
TEST_DIR.mkdirs();
}
@@ -69,7 +68,7 @@ public class StorageQuotaCheckerTest {
@Test
public void testNoQuota() throws InvalidConfigException {
- StorageQuotaChecker checker = new StorageQuotaChecker(_tableConfig,
_tableSizeReader, _controllerMetrics, _pinotHelixResourceManager);
+ StorageQuotaChecker checker = new MockStorageQuotaChecker(_tableConfig,
_tableSizeReader, _controllerMetrics, _pinotHelixResourceManager);
when(_tableConfig.getQuotaConfig()).thenReturn(null);
StorageQuotaChecker.QuotaCheckerResponse res =
checker.isSegmentStorageWithinQuota(TEST_DIR, "myTable", "segment",
1000);
@@ -78,7 +77,7 @@ public class StorageQuotaCheckerTest {
@Test
public void testNoStorageQuotaConfig() throws InvalidConfigException {
- StorageQuotaChecker checker = new StorageQuotaChecker(_tableConfig,
_tableSizeReader, _controllerMetrics, _pinotHelixResourceManager);
+ StorageQuotaChecker checker = new MockStorageQuotaChecker(_tableConfig,
_tableSizeReader, _controllerMetrics, _pinotHelixResourceManager);
when(_tableConfig.getQuotaConfig()).thenReturn(_quotaConfig);
when(_quotaConfig.storageSizeBytes()).thenReturn(-1L);
StorageQuotaChecker.QuotaCheckerResponse res =
@@ -118,7 +117,7 @@ public class StorageQuotaCheckerTest {
when(_tableConfig.getQuotaConfig()).thenReturn(_quotaConfig);
when(_quotaConfig.storageSizeBytes()).thenReturn(3000L);
when(_quotaConfig.getStorage()).thenReturn("3K");
- StorageQuotaChecker checker = new StorageQuotaChecker(_tableConfig,
_tableSizeReader, _controllerMetrics, _pinotHelixResourceManager);
+ StorageQuotaChecker checker = new MockStorageQuotaChecker(_tableConfig,
_tableSizeReader, _controllerMetrics, _pinotHelixResourceManager);
StorageQuotaChecker.QuotaCheckerResponse response =
checker.isSegmentStorageWithinQuota(TEST_DIR, tableName, "segment1",
1000);
Assert.assertTrue(response.isSegmentWithinQuota);
@@ -160,4 +159,17 @@ public class StorageQuotaCheckerTest {
response = checker.isSegmentStorageWithinQuota(TEST_DIR, tableName,
"segment1", 1000);
Assert.assertTrue(response.isSegmentWithinQuota);
}
+
+ private class MockStorageQuotaChecker extends StorageQuotaChecker {
+
+ public MockStorageQuotaChecker(TableConfig tableConfig, TableSizeReader
tableSizeReader,
+ ControllerMetrics controllerMetrics, PinotHelixResourceManager
pinotHelixResourceManager) {
+ super(tableConfig, tableSizeReader, controllerMetrics,
pinotHelixResourceManager);
+ }
+
+ @Override
+ protected boolean isLeader() {
+ return true;
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]