This is an automated email from the ASF dual-hosted git repository.
nehapawar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 493ed64 Split validation manager tasks into separate periodic tasks
(#3668)
493ed64 is described below
commit 493ed643e1e5a852cf2549cfe76ef4b3f1e7e0d4
Author: Neha Pawar <[email protected]>
AuthorDate: Tue Jan 8 17:37:12 2019 -0800
Split validation manager tasks into separate periodic tasks (#3668)
Split ValidationManager duties into separate ControllerPeriodicTasks viz.
OfflineSegmentIntervalChecker (to perform checks related to offline segments),
RealtimeSegmentValidationManager (to perform validation of realtime consuming
partitions), BrokerResourceValidationManager (to perform validation of broker
resource). These have been split out as they need to be run in different
frequencies than each other.
---
.../linkedin/pinot/controller/ControllerConf.java | 161 ++++++++++++++------
.../pinot/controller/ControllerStarter.java | 40 +++--
.../realtime/PinotLLCRealtimeSegmentManager.java | 10 +-
.../helix/core/retention/RetentionManager.java | 12 +-
.../BrokerResourceValidationManager.java | 82 ++++++++++
...ger.java => OfflineSegmentIntervalChecker.java} | 127 ++--------------
.../RealtimeSegmentValidationManager.java | 167 +++++++++++++++++++++
.../PinotLLCRealtimeSegmentManagerTest.java | 26 ++--
.../helix/core/retention/RetentionManagerTest.java | 11 +-
.../validation/ValidationManagerTest.java | 14 +-
.../tests/SegmentCompletionIntegrationTests.java | 4 +-
.../admin/command/StartControllerCommand.java | 4 +-
12 files changed, 459 insertions(+), 199 deletions(-)
diff --git
a/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerConf.java
b/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerConf.java
index df0c2df..863f3cc 100644
---
a/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerConf.java
+++
b/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerConf.java
@@ -54,15 +54,43 @@ public class ControllerConf extends PropertiesConfiguration
{
private static final String CONSOLE_WEBAPP_ROOT_PATH =
"controller.query.console";
private static final String CONSOLE_WEBAPP_USE_HTTPS =
"controller.query.console.useHttps";
private static final String EXTERNAL_VIEW_ONLINE_TO_OFFLINE_TIMEOUT =
"controller.upload.onlineToOfflineTimeout";
- private static final String RETENTION_MANAGER_FREQUENCY_IN_SECONDS =
"controller.retention.frequencyInSeconds";
- private static final String VALIDATION_MANAGER_FREQUENCY_IN_SECONDS =
"controller.validation.frequencyInSeconds";
- private static final String STATUS_CHECKER_FREQUENCY_IN_SECONDS =
"controller.statuschecker.frequencyInSeconds";
- private static final String REALTIME_SEGMENT_RELOCATOR_FREQUENCY =
"controller.realtime.segment.relocator.frequency";
- private static final String STATUS_CHECKER_WAIT_FOR_PUSH_TIME_IN_SECONDS =
"controller.statuschecker.waitForPushTimeInSeconds";
+
+ public static class ControllerPeriodicTasksConf {
+ private static final String RETENTION_MANAGER_FREQUENCY_IN_SECONDS =
"controller.retention.frequencyInSeconds";
+ @Deprecated // The ValidationManager has been split up into 3 separate
tasks, each having their own frequency config settings
+ private static final String
DEPRECATED_VALIDATION_MANAGER_FREQUENCY_IN_SECONDS =
+ "controller.validation.frequencyInSeconds";
+ private static final String
OFFLINE_SEGMENT_INTERVAL_CHECKER_FREQUENCY_IN_SECONDS =
+ "controller.offline.segment.interval.checker.frequencyInSeconds";
+ private static final String
REALTIME_SEGMENT_VALIDATION_FREQUENCY_IN_SECONDS =
+ "controller.realtime.segment.validation.frequencyInSeconds";
+ private static final String
BROKER_RESOURCE_VALIDATION_FREQUENCY_IN_SECONDS =
+ "controller.broker.resource.validation.frequencyInSeconds";
+ private static final String STATUS_CHECKER_FREQUENCY_IN_SECONDS =
"controller.statuschecker.frequencyInSeconds";
+ private static final String STATUS_CHECKER_WAIT_FOR_PUSH_TIME_IN_SECONDS =
+ "controller.statuschecker.waitForPushTimeInSeconds";
+ private static final String TASK_MANAGER_FREQUENCY_IN_SECONDS =
"controller.task.frequencyInSeconds";
+ private static final String REALTIME_SEGMENT_RELOCATOR_FREQUENCY =
+ "controller.realtime.segment.relocator.frequency";
+ // Because segment level validation is expensive and requires heavy ZK
access, we run segment level validation with a
+ // separate interval
+ private static final String SEGMENT_LEVEL_VALIDATION_INTERVAL_IN_SECONDS =
+ "controller.segment.level.validation.intervalInSeconds";
+
+ private static final int DEFAULT_RETENTION_CONTROLLER_FREQUENCY_IN_SECONDS
= 6 * 60 * 60; // 6 Hours.
+ private static final int
DEFAULT_OFFLINE_SEGMENT_INTERVAL_CHECKER_FREQUENCY_IN_SECONDS = 24 * 60 * 60;
// 24 Hours.
+ private static final int
DEFAULT_REALTIME_SEGMENT_VALIDATION_FREQUENCY_IN_SECONDS = 60 * 60; // 1 Hour.
+ private static final int
DEFAULT_BROKER_RESOURCE_VALIDATION_FREQUENCY_IN_SECONDS = 60 * 60; // 1 Hour.
+ private static final int DEFAULT_STATUS_CONTROLLER_FREQUENCY_IN_SECONDS =
5 * 60; // 5 minutes
+ private static final int
DEFAULT_STATUS_CONTROLLER_WAIT_FOR_PUSH_TIME_IN_SECONDS = 10 * 60; // 10 minutes
+ private static final int DEFAULT_TASK_MANAGER_FREQUENCY_IN_SECONDS = -1;
// Disabled
+ private static final String DEFAULT_REALTIME_SEGMENT_RELOCATOR_FREQUENCY =
"1h"; // 1 hour
+ private static final int
DEFAULT_SEGMENT_LEVEL_VALIDATION_INTERVAL_IN_SECONDS = 24 * 60 * 60;
+ }
+
private static final String SERVER_ADMIN_REQUEST_TIMEOUT_SECONDS =
"server.request.timeoutSeconds";
private static final String SEGMENT_COMMIT_TIMEOUT_SECONDS =
"controller.realtime.segment.commit.timeoutSeconds";
private static final String DELETED_SEGMENTS_RETENTION_IN_DAYS =
"controller.deleted.segments.retentionInDays";
- private static final String TASK_MANAGER_FREQUENCY_IN_SECONDS =
"controller.task.frequencyInSeconds";
private static final String TABLE_MIN_REPLICAS = "table.minReplicas";
private static final String ENABLE_SPLIT_COMMIT =
"controller.enable.split.commit";
private static final String JERSEY_ADMIN_API_PORT = "jersey.admin.api.port";
@@ -73,25 +101,17 @@ public class ControllerConf extends
PropertiesConfiguration {
private static final String SEGMENT_UPLOAD_TIMEOUT_IN_MILLIS =
"controller.segment.upload.timeoutInMillis";
private static final String REALTIME_SEGMENT_METADATA_COMMIT_NUMLOCKS =
"controller.realtime.segment.metadata.commit.numLocks";
private static final String ENABLE_STORAGE_QUOTA_CHECK =
"controller.enable.storage.quota.check";
- // Because segment level validation is expensive and requires heavy ZK
access, we run segment level validation with a
- // separate interval
- private static final String SEGMENT_LEVEL_VALIDATION_INTERVAL_IN_SECONDS =
- "controller.segment.level.validation.intervalInSeconds";
+
private static final String ENABLE_BATCH_MESSAGE_MODE =
"controller.enable.batch.message.mode";
// Defines the kind of storage and the underlying PinotFS implementation
private static final String PINOT_FS_FACTORY_CLASS_PREFIX =
"controller.storage.factory.class";
private static final String PINOT_FS_FACTORY_CLASS_LOCAL =
"controller.storage.factory.class.file";
- private static final int DEFAULT_RETENTION_CONTROLLER_FREQUENCY_IN_SECONDS =
6 * 60 * 60; // 6 Hours.
- private static final int DEFAULT_VALIDATION_CONTROLLER_FREQUENCY_IN_SECONDS
= 60 * 60; // 1 Hour.
- private static final int DEFAULT_STATUS_CONTROLLER_FREQUENCY_IN_SECONDS = 5
* 60; // 5 minutes
- private static final String DEFAULT_REALTIME_SEGMENT_RELOCATOR_FREQUENCY =
"1h"; // 1 hour
- private static final int
DEFAULT_STATUS_CONTROLLER_WAIT_FOR_PUSH_TIME_IN_SECONDS = 10 * 60; // 10 minutes
+
private static final long
DEFAULT_EXTERNAL_VIEW_ONLINE_TO_OFFLINE_TIMEOUT_MILLIS = 120_000L; // 2 minutes
private static final int DEFAULT_SERVER_ADMIN_REQUEST_TIMEOUT_SECONDS = 30;
private static final int DEFAULT_DELETED_SEGMENTS_RETENTION_IN_DAYS = 7;
- private static final int DEFAULT_TASK_MANAGER_FREQUENCY_IN_SECONDS = -1; //
Disabled
private static final int DEFAULT_TABLE_MIN_REPLICAS = 1;
private static final boolean DEFAULT_ENABLE_SPLIT_COMMIT = false;
private static final int DEFAULT_JERSEY_ADMIN_PORT = 21000;
@@ -101,7 +121,6 @@ public class ControllerConf extends PropertiesConfiguration
{
private static final int DEFAULT_REALTIME_SEGMENT_METADATA_COMMIT_NUMLOCKS =
64;
private static final boolean DEFAULT_ENABLE_STORAGE_QUOTA_CHECK = true;
private static final boolean DEFAULT_ENABLE_BATCH_MESSAGE_MODE = true;
- private static final int
DEFAULT_SEGMENT_LEVEL_VALIDATION_INTERVAL_IN_SECONDS = 24 * 60 * 60;
private static final String DEFAULT_PINOT_FS_FACTORY_CLASS_LOCAL =
LocalPinotFS.class.getName();
@@ -324,58 +343,114 @@ public class ControllerConf extends
PropertiesConfiguration {
}
public int getRetentionControllerFrequencyInSeconds() {
- if (containsKey(RETENTION_MANAGER_FREQUENCY_IN_SECONDS)) {
- return Integer.parseInt((String)
getProperty(RETENTION_MANAGER_FREQUENCY_IN_SECONDS));
+ if
(containsKey(ControllerPeriodicTasksConf.RETENTION_MANAGER_FREQUENCY_IN_SECONDS))
{
+ return Integer.parseInt((String)
getProperty(ControllerPeriodicTasksConf.RETENTION_MANAGER_FREQUENCY_IN_SECONDS));
}
- return DEFAULT_RETENTION_CONTROLLER_FREQUENCY_IN_SECONDS;
+ return
ControllerPeriodicTasksConf.DEFAULT_RETENTION_CONTROLLER_FREQUENCY_IN_SECONDS;
}
public void setRetentionControllerFrequencyInSeconds(int
retentionFrequencyInSeconds) {
- setProperty(RETENTION_MANAGER_FREQUENCY_IN_SECONDS,
Integer.toString(retentionFrequencyInSeconds));
+
setProperty(ControllerPeriodicTasksConf.RETENTION_MANAGER_FREQUENCY_IN_SECONDS,
+ Integer.toString(retentionFrequencyInSeconds));
}
- public int getValidationControllerFrequencyInSeconds() {
- if (containsKey(VALIDATION_MANAGER_FREQUENCY_IN_SECONDS)) {
- return Integer.parseInt((String)
getProperty(VALIDATION_MANAGER_FREQUENCY_IN_SECONDS));
+ /**
+ * Returns the config value for
controller.offline.segment.interval.checker.frequencyInSeconds if it exists.
+ * If it doesn't exist, returns the segment level validation interval. This
is done in order to retain the current behavior,
+ * wherein the offline validation tasks were done at segment level
validation interval frequency
+ * The default value is the new
DEFAULT_OFFLINE_SEGMENT_INTERVAL_CHECKER_FREQUENCY_IN_SECONDS
+ * @return
+ */
+ public int getOfflineSegmentIntervalCheckerFrequencyInSeconds() {
+ if
(containsKey(ControllerPeriodicTasksConf.OFFLINE_SEGMENT_INTERVAL_CHECKER_FREQUENCY_IN_SECONDS))
{
+ return Integer.parseInt(
+ (String)
getProperty(ControllerPeriodicTasksConf.OFFLINE_SEGMENT_INTERVAL_CHECKER_FREQUENCY_IN_SECONDS));
+ }
+ return
getInt(ControllerPeriodicTasksConf.SEGMENT_LEVEL_VALIDATION_INTERVAL_IN_SECONDS,
+
ControllerPeriodicTasksConf.DEFAULT_OFFLINE_SEGMENT_INTERVAL_CHECKER_FREQUENCY_IN_SECONDS);
+ }
+
+ public void setOfflineSegmentIntervalCheckerFrequencyInSeconds(int
validationFrequencyInSeconds) {
+
setProperty(ControllerPeriodicTasksConf.OFFLINE_SEGMENT_INTERVAL_CHECKER_FREQUENCY_IN_SECONDS,
+ Integer.toString(validationFrequencyInSeconds));
+ }
+
+ /**
+ * Returns the config value for
controller.realtime.segment.validation.frequencyInSeconds if it exists.
+ * If it doesn't exist, returns the validation controller frequency. This is
done in order to retain the current behavior,
+ * wherein the realtime validation tasks were done at validation controller
frequency
+ * The default value is the new
DEFAULT_REALTIME_SEGMENT_VALIDATION_FREQUENCY_IN_SECONDS
+ * @return
+ */
+ public int getRealtimeSegmentValidationFrequencyInSeconds() {
+ if
(containsKey(ControllerPeriodicTasksConf.REALTIME_SEGMENT_VALIDATION_FREQUENCY_IN_SECONDS))
{
+ return Integer.parseInt(
+ (String)
getProperty(ControllerPeriodicTasksConf.REALTIME_SEGMENT_VALIDATION_FREQUENCY_IN_SECONDS));
+ }
+ return
getInt(ControllerPeriodicTasksConf.DEPRECATED_VALIDATION_MANAGER_FREQUENCY_IN_SECONDS,
+
ControllerPeriodicTasksConf.DEFAULT_REALTIME_SEGMENT_VALIDATION_FREQUENCY_IN_SECONDS);
+ }
+
+ public void setRealtimeSegmentValidationFrequencyInSeconds(int
validationFrequencyInSeconds) {
+
setProperty(ControllerPeriodicTasksConf.REALTIME_SEGMENT_VALIDATION_FREQUENCY_IN_SECONDS,
+ Integer.toString(validationFrequencyInSeconds));
+ }
+
+ /**
+ * Returns the config value for
controller.broker.resource.validation.frequencyInSeconds if it exists.
+ * If it doesn't exist, returns the validation controller frequency. This is
done in order to retain the current behavior,
+ * wherein the broker resource validation tasks were done at validation
controller frequency
+ * The default value is the new
DEFAULT_BROKER_RESOURCE_VALIDATION_FREQUENCY_IN_SECONDS
+ * @return
+ */
+ public int getBrokerResourceValidationFrequencyInSeconds() {
+ if
(containsKey(ControllerPeriodicTasksConf.BROKER_RESOURCE_VALIDATION_FREQUENCY_IN_SECONDS))
{
+ return Integer.parseInt(
+ (String)
getProperty(ControllerPeriodicTasksConf.BROKER_RESOURCE_VALIDATION_FREQUENCY_IN_SECONDS));
}
- return DEFAULT_VALIDATION_CONTROLLER_FREQUENCY_IN_SECONDS;
+ return
getInt(ControllerPeriodicTasksConf.DEPRECATED_VALIDATION_MANAGER_FREQUENCY_IN_SECONDS,
+
ControllerPeriodicTasksConf.DEFAULT_BROKER_RESOURCE_VALIDATION_FREQUENCY_IN_SECONDS);
}
- public void setValidationControllerFrequencyInSeconds(int
validationFrequencyInSeconds) {
- setProperty(VALIDATION_MANAGER_FREQUENCY_IN_SECONDS,
Integer.toString(validationFrequencyInSeconds));
+ public void setBrokerResourceValidationFrequencyInSeconds(int
validationFrequencyInSeconds) {
+
setProperty(ControllerPeriodicTasksConf.BROKER_RESOURCE_VALIDATION_FREQUENCY_IN_SECONDS,
+ Integer.toString(validationFrequencyInSeconds));
}
public int getStatusCheckerFrequencyInSeconds() {
- if (containsKey(STATUS_CHECKER_FREQUENCY_IN_SECONDS)) {
- return Integer.parseInt((String)
getProperty(STATUS_CHECKER_FREQUENCY_IN_SECONDS));
+ if
(containsKey(ControllerPeriodicTasksConf.STATUS_CHECKER_FREQUENCY_IN_SECONDS)) {
+ return Integer.parseInt((String)
getProperty(ControllerPeriodicTasksConf.STATUS_CHECKER_FREQUENCY_IN_SECONDS));
}
- return DEFAULT_STATUS_CONTROLLER_FREQUENCY_IN_SECONDS;
+ return
ControllerPeriodicTasksConf.DEFAULT_STATUS_CONTROLLER_FREQUENCY_IN_SECONDS;
}
public void setStatusCheckerFrequencyInSeconds(int
statusCheckerFrequencyInSeconds) {
- setProperty(STATUS_CHECKER_FREQUENCY_IN_SECONDS,
Integer.toString(statusCheckerFrequencyInSeconds));
+
setProperty(ControllerPeriodicTasksConf.STATUS_CHECKER_FREQUENCY_IN_SECONDS,
+ Integer.toString(statusCheckerFrequencyInSeconds));
}
public String getRealtimeSegmentRelocatorFrequency() {
- if (containsKey(REALTIME_SEGMENT_RELOCATOR_FREQUENCY)) {
- return (String) getProperty(REALTIME_SEGMENT_RELOCATOR_FREQUENCY);
+ if
(containsKey(ControllerPeriodicTasksConf.REALTIME_SEGMENT_RELOCATOR_FREQUENCY))
{
+ return (String)
getProperty(ControllerPeriodicTasksConf.REALTIME_SEGMENT_RELOCATOR_FREQUENCY);
}
- return DEFAULT_REALTIME_SEGMENT_RELOCATOR_FREQUENCY;
+ return
ControllerPeriodicTasksConf.DEFAULT_REALTIME_SEGMENT_RELOCATOR_FREQUENCY;
}
public void setRealtimeSegmentRelocatorFrequency(String relocatorFrequency) {
- setProperty(REALTIME_SEGMENT_RELOCATOR_FREQUENCY, relocatorFrequency);
+
setProperty(ControllerPeriodicTasksConf.REALTIME_SEGMENT_RELOCATOR_FREQUENCY,
relocatorFrequency);
}
public int getStatusCheckerWaitForPushTimeInSeconds() {
- if (containsKey(STATUS_CHECKER_WAIT_FOR_PUSH_TIME_IN_SECONDS)) {
- return Integer.parseInt((String)
getProperty(STATUS_CHECKER_WAIT_FOR_PUSH_TIME_IN_SECONDS));
+ if
(containsKey(ControllerPeriodicTasksConf.STATUS_CHECKER_WAIT_FOR_PUSH_TIME_IN_SECONDS))
{
+ return Integer.parseInt(
+ (String)
getProperty(ControllerPeriodicTasksConf.STATUS_CHECKER_WAIT_FOR_PUSH_TIME_IN_SECONDS));
}
- return DEFAULT_STATUS_CONTROLLER_WAIT_FOR_PUSH_TIME_IN_SECONDS;
+ return
ControllerPeriodicTasksConf.DEFAULT_STATUS_CONTROLLER_WAIT_FOR_PUSH_TIME_IN_SECONDS;
}
public void setStatusCheckerWaitForPushTimeInSeconds(int
statusCheckerWaitForPushTimeInSeconds) {
- setProperty(STATUS_CHECKER_WAIT_FOR_PUSH_TIME_IN_SECONDS,
Integer.toString(statusCheckerWaitForPushTimeInSeconds));
+
setProperty(ControllerPeriodicTasksConf.STATUS_CHECKER_WAIT_FOR_PUSH_TIME_IN_SECONDS,
+ Integer.toString(statusCheckerWaitForPushTimeInSeconds));
}
public long getExternalViewOnlineToOfflineTimeout() {
@@ -417,11 +492,12 @@ public class ControllerConf extends
PropertiesConfiguration {
}
public int getTaskManagerFrequencyInSeconds() {
- return getInt(TASK_MANAGER_FREQUENCY_IN_SECONDS,
DEFAULT_TASK_MANAGER_FREQUENCY_IN_SECONDS);
+ return
getInt(ControllerPeriodicTasksConf.TASK_MANAGER_FREQUENCY_IN_SECONDS,
+ ControllerPeriodicTasksConf.DEFAULT_TASK_MANAGER_FREQUENCY_IN_SECONDS);
}
public void setTaskManagerFrequencyInSeconds(int frequencyInSeconds) {
- setProperty(TASK_MANAGER_FREQUENCY_IN_SECONDS,
Integer.toString(frequencyInSeconds));
+ setProperty(ControllerPeriodicTasksConf.TASK_MANAGER_FREQUENCY_IN_SECONDS,
Integer.toString(frequencyInSeconds));
}
public int getDefaultTableMinReplicas() {
@@ -469,6 +545,7 @@ public class ControllerConf extends PropertiesConfiguration
{
}
public int getSegmentLevelValidationIntervalInSeconds() {
- return getInt(SEGMENT_LEVEL_VALIDATION_INTERVAL_IN_SECONDS,
DEFAULT_SEGMENT_LEVEL_VALIDATION_INTERVAL_IN_SECONDS);
+ return
getInt(ControllerPeriodicTasksConf.SEGMENT_LEVEL_VALIDATION_INTERVAL_IN_SECONDS,
+
ControllerPeriodicTasksConf.DEFAULT_SEGMENT_LEVEL_VALIDATION_INTERVAL_IN_SECONDS);
}
}
diff --git
a/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerStarter.java
b/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerStarter.java
index 2799b44..2bca8ae 100644
---
a/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerStarter.java
+++
b/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerStarter.java
@@ -41,7 +41,9 @@ import
com.linkedin.pinot.controller.helix.core.realtime.PinotRealtimeSegmentMan
import
com.linkedin.pinot.controller.helix.core.rebalance.RebalanceSegmentStrategyFactory;
import
com.linkedin.pinot.controller.helix.core.relocation.RealtimeSegmentRelocator;
import com.linkedin.pinot.controller.helix.core.retention.RetentionManager;
-import com.linkedin.pinot.controller.validation.ValidationManager;
+import
com.linkedin.pinot.controller.validation.BrokerResourceValidationManager;
+import com.linkedin.pinot.controller.validation.OfflineSegmentIntervalChecker;
+import
com.linkedin.pinot.controller.validation.RealtimeSegmentValidationManager;
import com.linkedin.pinot.core.crypt.PinotCrypterFactory;
import com.linkedin.pinot.core.periodictask.PeriodicTask;
import com.linkedin.pinot.filesystem.PinotFSFactory;
@@ -86,7 +88,9 @@ public class ControllerStarter {
private final ControllerPeriodicTaskScheduler
_controllerPeriodicTaskScheduler;
// Can only be constructed after resource manager getting started
- private ValidationManager _validationManager;
+ private OfflineSegmentIntervalChecker _offlineSegmentIntervalChecker;
+ private RealtimeSegmentValidationManager _realtimeSegmentValidationManager;
+ private BrokerResourceValidationManager _brokerResourceValidationManager;
private RealtimeSegmentRelocator _realtimeSegmentRelocator;
private PinotHelixTaskResourceManager _helixTaskResourceManager;
private PinotTaskManager _taskManager;
@@ -95,8 +99,7 @@ public class ControllerStarter {
_config = conf;
_adminApp = new
ControllerAdminApiApplication(_config.getQueryConsoleWebappPath(),
_config.getQueryConsoleUseHttps());
_helixResourceManager = new PinotHelixResourceManager(_config);
- _retentionManager = new RetentionManager(_helixResourceManager,
_config.getRetentionControllerFrequencyInSeconds(),
- _config.getDeletedSegmentsRetentionInDays());
+ _retentionManager = new RetentionManager(_helixResourceManager, _config);
_metricsRegistry = new MetricsRegistry();
_controllerMetrics = new ControllerMetrics(_metricsRegistry);
_realtimeSegmentsManager = new
PinotRealtimeSegmentManager(_helixResourceManager);
@@ -111,8 +114,16 @@ public class ControllerStarter {
return _helixResourceManager;
}
- public ValidationManager getValidationManager() {
- return _validationManager;
+ public OfflineSegmentIntervalChecker getOfflineSegmentIntervalChecker() {
+ return _offlineSegmentIntervalChecker;
+ }
+
+ public RealtimeSegmentValidationManager
getRealtimeSegmentValidationManager() {
+ return _realtimeSegmentValidationManager;
+ }
+
+ public BrokerResourceValidationManager getBrokerResourceValidationManager() {
+ return _brokerResourceValidationManager;
}
public PinotHelixTaskResourceManager getHelixTaskResourceManager() {
@@ -181,10 +192,17 @@ public class ControllerStarter {
_taskManager = new PinotTaskManager(_helixTaskResourceManager,
_helixResourceManager, _config, _controllerMetrics);
periodicTasks.add(_taskManager);
periodicTasks.add(_retentionManager);
- _validationManager =
- new ValidationManager(_config, _helixResourceManager,
PinotLLCRealtimeSegmentManager.getInstance(),
+ _offlineSegmentIntervalChecker =
+ new OfflineSegmentIntervalChecker(_config, _helixResourceManager, new
ValidationMetrics(_metricsRegistry));
+ _realtimeSegmentValidationManager =
+ new RealtimeSegmentValidationManager(_config, _helixResourceManager,
PinotLLCRealtimeSegmentManager.getInstance(),
new ValidationMetrics(_metricsRegistry));
- periodicTasks.add(_validationManager);
+ _brokerResourceValidationManager =
+ new BrokerResourceValidationManager(_config, _helixResourceManager);
+
+ periodicTasks.add(_offlineSegmentIntervalChecker);
+ periodicTasks.add(_realtimeSegmentValidationManager);
+ periodicTasks.add(_brokerResourceValidationManager);
periodicTasks.add(_segmentStatusChecker);
periodicTasks.add(_realtimeSegmentRelocator);
@@ -351,7 +369,9 @@ public class ControllerStarter {
conf.setControllerVipHost("localhost");
conf.setControllerVipProtocol("http");
conf.setRetentionControllerFrequencyInSeconds(3600 * 6);
- conf.setValidationControllerFrequencyInSeconds(3600);
+ conf.setOfflineSegmentIntervalCheckerFrequencyInSeconds(3600);
+ conf.setRealtimeSegmentValidationFrequencyInSeconds(3600);
+ conf.setBrokerResourceValidationFrequencyInSeconds(3600);
conf.setStatusCheckerFrequencyInSeconds(5 * 60);
conf.setRealtimeSegmentRelocatorFrequency("1h");
conf.setStatusCheckerWaitForPushTimeInSeconds(10 * 60);
diff --git
a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index 9860ec1..a77750e 100644
---
a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++
b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -818,8 +818,8 @@ public class PinotLLCRealtimeSegmentManager {
/**
* An instance is reporting that it has stopped consuming a topic due to
some error.
* Mark the state of the segment to be OFFLINE in idealstate.
- * When all replicas of this segment are marked offline, the
ValidationManager, in its next
- * run, will auto-create a new segment with the appropriate offset.
+ * When all replicas of this segment are marked offline, the {@link
com.linkedin.pinot.controller.validation.RealtimeSegmentValidationManager},
+ * in its next run, will auto-create a new segment with the appropriate
offset.
*/
public void segmentStoppedConsuming(final LLCSegmentName segmentName, final
String instance) {
String rawTableName = segmentName.getTableName();
@@ -950,7 +950,7 @@ public class PinotLLCRealtimeSegmentManager {
*
* TODO: We need to find a place to detect and update a gauge for
nonConsumingPartitionsCount for a table, and reset it to 0 at the end of
validateLLC
*/
- public void validateLLCSegments(final TableConfig tableConfig) {
+ public void ensureAllPartitionsConsuming(final TableConfig tableConfig) {
final String tableNameWithType = tableConfig.getTableName();
final StreamConfig streamConfig = new
StreamConfig(tableConfig.getIndexingConfig().getStreamConfigs());
final int partitionCount = getPartitionCount(streamConfig);
@@ -958,7 +958,7 @@ public class PinotLLCRealtimeSegmentManager {
@Nullable
@Override
public IdealState apply(@Nullable IdealState idealState) {
- return validateLLCSegments(tableConfig, idealState, partitionCount);
+ return ensureAllPartitionsConsuming(tableConfig, idealState,
partitionCount);
}
}, RetryPolicies.exponentialBackoffRetryPolicy(10, 1000L, 1.2f), true);
}
@@ -1078,7 +1078,7 @@ public class PinotLLCRealtimeSegmentManager {
* TODO: split this method into multiple smaller methods
*/
@VisibleForTesting
- protected IdealState validateLLCSegments(final TableConfig tableConfig,
IdealState idealState,
+ protected IdealState ensureAllPartitionsConsuming(final TableConfig
tableConfig, IdealState idealState,
final int partitionCount) {
final String tableNameWithType = tableConfig.getTableName();
final StreamConfig streamConfig = new
StreamConfig(tableConfig.getIndexingConfig().getStreamConfigs());
diff --git
a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/retention/RetentionManager.java
b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/retention/RetentionManager.java
index f177a80..726c481 100644
---
a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/retention/RetentionManager.java
+++
b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/retention/RetentionManager.java
@@ -26,6 +26,7 @@ import
com.linkedin.pinot.common.metadata.segment.RealtimeSegmentZKMetadata;
import com.linkedin.pinot.common.utils.CommonConstants;
import com.linkedin.pinot.common.utils.CommonConstants.Segment.Realtime.Status;
import com.linkedin.pinot.common.utils.SegmentName;
+import com.linkedin.pinot.controller.ControllerConf;
import com.linkedin.pinot.controller.helix.core.PinotHelixResourceManager;
import
com.linkedin.pinot.controller.helix.core.periodictask.ControllerPeriodicTask;
import
com.linkedin.pinot.controller.helix.core.retention.strategy.RetentionStrategy;
@@ -52,10 +53,9 @@ public class RetentionManager extends ControllerPeriodicTask
{
private final int _deletedSegmentsRetentionInDays;
- public RetentionManager(PinotHelixResourceManager pinotHelixResourceManager,
int runFrequencyInSeconds,
- int deletedSegmentsRetentionInDays) {
- super("RetentionManager", runFrequencyInSeconds,
pinotHelixResourceManager);
- _deletedSegmentsRetentionInDays = deletedSegmentsRetentionInDays;
+ public RetentionManager(PinotHelixResourceManager pinotHelixResourceManager,
ControllerConf config) {
+ super("RetentionManager",
config.getRetentionControllerFrequencyInSeconds(), pinotHelixResourceManager);
+ _deletedSegmentsRetentionInDays =
config.getDeletedSegmentsRetentionInDays();
LOGGER.info("Starting RetentionManager with runFrequencyInSeconds: {},
deletedSegmentsRetentionInDays: {}",
getIntervalInSeconds(), _deletedSegmentsRetentionInDays);
@@ -148,7 +148,7 @@ public class RetentionManager extends
ControllerPeriodicTask {
// In progress segment, only check LLC segment
if (SegmentName.isLowLevelConsumerSegmentName(segmentName)) {
// Delete old LLC segment that hangs around. Do not delete segment
that are current since there may be a race
- // with ValidationManager trying to auto-create the LLC segment
+ // with RealtimeSegmentValidationManager trying to auto-create the
LLC segment
if (shouldDeleteInProgressLLCSegment(segmentName, idealState,
realtimeSegmentZKMetadata)) {
segmentsToDelete.add(segmentName);
}
@@ -172,7 +172,7 @@ public class RetentionManager extends
ControllerPeriodicTask {
return false;
}
// delete a segment only if it is old enough (5 days) or else,
- // 1. latest segment could get deleted in the middle of repair by
ValidationManager
+ // 1. latest segment could get deleted in the middle of repair by
RealtimeSegmentValidationManager
// 2. for a brand new segment, if this code kicks in after new metadata is
created but ideal state entry is not yet created (between step 2 and 3),
// the latest segment metadata could get marked for deletion
if (System.currentTimeMillis() -
realtimeSegmentZKMetadata.getCreationTime()
diff --git
a/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/BrokerResourceValidationManager.java
b/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/BrokerResourceValidationManager.java
new file mode 100644
index 0000000..40078d6
--- /dev/null
+++
b/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/BrokerResourceValidationManager.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.linkedin.pinot.controller.validation;
+
+import com.linkedin.pinot.common.config.TableConfig;
+import com.linkedin.pinot.controller.ControllerConf;
+import com.linkedin.pinot.controller.helix.core.PinotHelixResourceManager;
+import
com.linkedin.pinot.controller.helix.core.periodictask.ControllerPeriodicTask;
+import java.util.List;
+import java.util.Set;
+import org.apache.helix.model.InstanceConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Rebuilds the broker resource if the instance set has changed
+ */
+public class BrokerResourceValidationManager extends ControllerPeriodicTask {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(BrokerResourceValidationManager.class);
+
+ private List<InstanceConfig> _instanceConfigs;
+
+ public BrokerResourceValidationManager(ControllerConf config,
PinotHelixResourceManager pinotHelixResourceManager) {
+ super("BrokerResourceValidationManager",
config.getBrokerResourceValidationFrequencyInSeconds(),
+ pinotHelixResourceManager);
+ }
+
+ @Override
+ protected void preprocess() {
+ _instanceConfigs = _pinotHelixResourceManager.getAllHelixInstanceConfigs();
+ }
+
+ @Override
+ protected void processTable(String tableNameWithType) {
+ try {
+ TableConfig tableConfig =
_pinotHelixResourceManager.getTableConfig(tableNameWithType);
+ if (tableConfig == null) {
+ LOGGER.warn("Failed to find table config for table: {}, skipping
broker resource validation", tableNameWithType);
+ return;
+ }
+
+ // Rebuild broker resource
+ Set<String> brokerInstances =
_pinotHelixResourceManager.getAllInstancesForBrokerTenant(_instanceConfigs,
+ tableConfig.getTenantConfig().getBroker());
+ _pinotHelixResourceManager.rebuildBrokerResource(tableNameWithType,
brokerInstances);
+ } catch (Exception e) {
+ LOGGER.warn("Caught exception while validating broker resource for
table: {}", tableNameWithType, e);
+ }
+ }
+
+
+ @Override
+ protected void postprocess() {
+
+ }
+
+ @Override
+ protected void initTask() {
+
+ }
+
+ @Override
+ public void stopTask() {
+ }
+}
diff --git
a/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/ValidationManager.java
b/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/OfflineSegmentIntervalChecker.java
similarity index 62%
rename from
pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/ValidationManager.java
rename to
pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/OfflineSegmentIntervalChecker.java
index c5528e7..3c79205 100644
---
a/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/ValidationManager.java
+++
b/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/OfflineSegmentIntervalChecker.java
@@ -19,29 +19,19 @@
package com.linkedin.pinot.controller.validation;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
import com.linkedin.pinot.common.config.SegmentsValidationAndRetentionConfig;
import com.linkedin.pinot.common.config.TableConfig;
import com.linkedin.pinot.common.config.TableNameBuilder;
import com.linkedin.pinot.common.metadata.segment.OfflineSegmentZKMetadata;
-import com.linkedin.pinot.common.metadata.segment.RealtimeSegmentZKMetadata;
import com.linkedin.pinot.common.metrics.ValidationMetrics;
import com.linkedin.pinot.common.utils.CommonConstants;
-import com.linkedin.pinot.common.utils.HLCSegmentName;
-import com.linkedin.pinot.common.utils.SegmentName;
import com.linkedin.pinot.common.utils.time.TimeUtils;
import com.linkedin.pinot.controller.ControllerConf;
import com.linkedin.pinot.controller.helix.core.PinotHelixResourceManager;
import
com.linkedin.pinot.controller.helix.core.periodictask.ControllerPeriodicTask;
-import
com.linkedin.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager;
-import com.linkedin.pinot.core.realtime.stream.StreamConfig;
import java.util.ArrayList;
import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
-import org.apache.helix.model.InstanceConfig;
import org.joda.time.Duration;
import org.joda.time.Interval;
import org.slf4j.Logger;
@@ -52,83 +42,39 @@ import org.slf4j.LoggerFactory;
* Manages the segment validation metrics, to ensure that all offline segments
are contiguous (no missing segments) and
* that the offline push delay isn't too high.
*/
-public class ValidationManager extends ControllerPeriodicTask {
- private static final Logger LOGGER =
LoggerFactory.getLogger(ValidationManager.class);
+public class OfflineSegmentIntervalChecker extends ControllerPeriodicTask {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(OfflineSegmentIntervalChecker.class);
- private final int _segmentLevelValidationIntervalInSeconds;
- private final PinotLLCRealtimeSegmentManager _llcRealtimeSegmentManager;
private final ValidationMetrics _validationMetrics;
- private long _lastSegmentLevelValidationTimeMs = 0L;
- private boolean _runSegmentLevelValidation;
- private List<InstanceConfig> _instanceConfigs;
-
- public ValidationManager(ControllerConf config, PinotHelixResourceManager
pinotHelixResourceManager,
- PinotLLCRealtimeSegmentManager llcRealtimeSegmentManager,
ValidationMetrics validationMetrics) {
- super("ValidationManager",
config.getValidationControllerFrequencyInSeconds(), pinotHelixResourceManager);
- _segmentLevelValidationIntervalInSeconds =
config.getSegmentLevelValidationIntervalInSeconds();
- Preconditions.checkState(_segmentLevelValidationIntervalInSeconds > 0);
- _llcRealtimeSegmentManager = llcRealtimeSegmentManager;
+ public OfflineSegmentIntervalChecker(ControllerConf config,
PinotHelixResourceManager pinotHelixResourceManager,
+ ValidationMetrics validationMetrics) {
+ super("OfflineSegmentIntervalChecker",
config.getOfflineSegmentIntervalCheckerFrequencyInSeconds(),
+ pinotHelixResourceManager);
_validationMetrics = validationMetrics;
}
@Override
protected void preprocess() {
- // Run segment level validation using a separate interval
- _runSegmentLevelValidation = false;
- long currentTimeMs = System.currentTimeMillis();
- if (TimeUnit.MILLISECONDS.toSeconds(currentTimeMs -
_lastSegmentLevelValidationTimeMs)
- >= _segmentLevelValidationIntervalInSeconds) {
- LOGGER.info("Run segment-level validation");
- _runSegmentLevelValidation = true;
- _lastSegmentLevelValidationTimeMs = currentTimeMs;
- }
-
- // Cache instance configs to reduce ZK access
- _instanceConfigs = _pinotHelixResourceManager.getAllHelixInstanceConfigs();
}
@Override
protected void processTable(String tableNameWithType) {
- runValidation(tableNameWithType);
- }
-
- @Override
- protected void postprocess() {
-
- }
-
- private void runValidation(String tableNameWithType) {
try {
- TableConfig tableConfig =
_pinotHelixResourceManager.getTableConfig(tableNameWithType);
- if (tableConfig == null) {
- LOGGER.warn("Failed to find table config for table: {}, skipping
validation", tableNameWithType);
- return;
- }
-
- // Rebuild broker resource
- Set<String> brokerInstances =
_pinotHelixResourceManager.getAllInstancesForBrokerTenant(_instanceConfigs,
- tableConfig.getTenantConfig().getBroker());
- _pinotHelixResourceManager.rebuildBrokerResource(tableNameWithType,
brokerInstances);
- // Perform validation based on the table type
CommonConstants.Helix.TableType tableType =
TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
if (tableType == CommonConstants.Helix.TableType.OFFLINE) {
- if (_runSegmentLevelValidation) {
- validateOfflineSegmentPush(tableConfig);
- }
- } else {
- if (_runSegmentLevelValidation) {
- updateRealtimeDocumentCount(tableConfig);
- }
- Map<String, String> streamConfigMap =
tableConfig.getIndexingConfig().getStreamConfigs();
- StreamConfig streamConfig = new StreamConfig(streamConfigMap);
- if (streamConfig.hasLowLevelConsumerType()) {
- _llcRealtimeSegmentManager.validateLLCSegments(tableConfig);
+
+ TableConfig tableConfig =
_pinotHelixResourceManager.getTableConfig(tableNameWithType);
+ if (tableConfig == null) {
+ LOGGER.warn("Failed to find table config for table: {}, skipping
validation", tableNameWithType);
+ return;
}
+
+ validateOfflineSegmentPush(tableConfig);
}
} catch (Exception e) {
- LOGGER.warn("Caught exception while validating table: {}",
tableNameWithType, e);
+ LOGGER.warn("Caught exception while checking offline segment intervals
for table: {}", tableNameWithType, e);
}
}
@@ -264,50 +210,9 @@ public class ValidationManager extends
ControllerPeriodicTask {
return numTotalDocs;
}
- private void updateRealtimeDocumentCount(TableConfig tableConfig) {
- String realtimeTableName = tableConfig.getTableName();
- List<RealtimeSegmentZKMetadata> metadataList =
-
_pinotHelixResourceManager.getRealtimeSegmentMetadata(realtimeTableName);
- boolean countHLCSegments = true; // false if this table has ONLY LLC
segments (i.e. fully migrated)
- StreamConfig streamConfig = new
StreamConfig(tableConfig.getIndexingConfig().getStreamConfigs());
- if (streamConfig.hasLowLevelConsumerType() &&
!streamConfig.hasHighLevelConsumerType()) {
- countHLCSegments = false;
- }
- // Update the gauge to contain the total document count in the segments
-
_validationMetrics.updateTotalDocumentCountGauge(tableConfig.getTableName(),
- computeRealtimeTotalDocumentInSegments(metadataList,
countHLCSegments));
- }
-
- @VisibleForTesting
- static long
computeRealtimeTotalDocumentInSegments(List<RealtimeSegmentZKMetadata>
realtimeSegmentZKMetadataList,
- boolean countHLCSegments) {
- long numTotalDocs = 0;
-
- String groupId = "";
- for (RealtimeSegmentZKMetadata realtimeSegmentZKMetadata :
realtimeSegmentZKMetadataList) {
- String segmentName = realtimeSegmentZKMetadata.getSegmentName();
- if (SegmentName.isHighLevelConsumerSegmentName(segmentName)) {
- if (countHLCSegments) {
- HLCSegmentName hlcSegmentName = new HLCSegmentName(segmentName);
- String segmentGroupIdName = hlcSegmentName.getGroupId();
-
- if (groupId.isEmpty()) {
- groupId = segmentGroupIdName;
- }
- // Discard all segments with different groupids as they are replicas
- if (groupId.equals(segmentGroupIdName) &&
realtimeSegmentZKMetadata.getTotalRawDocs() >= 0) {
- numTotalDocs += realtimeSegmentZKMetadata.getTotalRawDocs();
- }
- }
- } else {
- // Low level segments
- if (!countHLCSegments) {
- numTotalDocs += realtimeSegmentZKMetadata.getTotalRawDocs();
- }
- }
- }
+ @Override
+ protected void postprocess() {
- return numTotalDocs;
}
@Override
diff --git
a/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/RealtimeSegmentValidationManager.java
b/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/RealtimeSegmentValidationManager.java
new file mode 100644
index 0000000..3274df3
--- /dev/null
+++
b/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/RealtimeSegmentValidationManager.java
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.linkedin.pinot.controller.validation;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.linkedin.pinot.common.config.TableConfig;
+import com.linkedin.pinot.common.config.TableNameBuilder;
+import com.linkedin.pinot.common.metadata.segment.RealtimeSegmentZKMetadata;
+import com.linkedin.pinot.common.metrics.ValidationMetrics;
+import com.linkedin.pinot.common.utils.CommonConstants;
+import com.linkedin.pinot.common.utils.HLCSegmentName;
+import com.linkedin.pinot.common.utils.SegmentName;
+import com.linkedin.pinot.controller.ControllerConf;
+import com.linkedin.pinot.controller.helix.core.PinotHelixResourceManager;
+import
com.linkedin.pinot.controller.helix.core.periodictask.ControllerPeriodicTask;
+import
com.linkedin.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager;
+import com.linkedin.pinot.core.realtime.stream.StreamConfig;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Validates realtime ideal states and segment metadata, fixing any partitions
which have stopped consuming
+ */
+public class RealtimeSegmentValidationManager extends ControllerPeriodicTask {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(RealtimeSegmentValidationManager.class);
+
+ private final PinotLLCRealtimeSegmentManager _llcRealtimeSegmentManager;
+ private final ValidationMetrics _validationMetrics;
+
+ private final int _segmentLevelValidationIntervalInSeconds;
+ private long _lastUpdateRealtimeDocumentCountTimeMs = 0L;
+ private boolean _updateRealtimeDocumentCount;
+
+ public RealtimeSegmentValidationManager(ControllerConf config,
PinotHelixResourceManager pinotHelixResourceManager,
+ PinotLLCRealtimeSegmentManager llcRealtimeSegmentManager,
ValidationMetrics validationMetrics) {
+ super("RealtimeSegmentValidationManager",
config.getRealtimeSegmentValidationFrequencyInSeconds(),
+ pinotHelixResourceManager);
+ _llcRealtimeSegmentManager = llcRealtimeSegmentManager;
+ _validationMetrics = validationMetrics;
+
+ _segmentLevelValidationIntervalInSeconds =
config.getSegmentLevelValidationIntervalInSeconds();
+ Preconditions.checkState(_segmentLevelValidationIntervalInSeconds > 0);
+ }
+
+ @Override
+ protected void preprocess() {
+ // Update realtime document counts only if certain time has passed after
previous run
+ _updateRealtimeDocumentCount = false;
+ long currentTimeMs = System.currentTimeMillis();
+ if (TimeUnit.MILLISECONDS.toSeconds(currentTimeMs -
_lastUpdateRealtimeDocumentCountTimeMs)
+ >= _segmentLevelValidationIntervalInSeconds) {
+ LOGGER.info("Run segment-level validation");
+ _updateRealtimeDocumentCount = true;
+ _lastUpdateRealtimeDocumentCountTimeMs = currentTimeMs;
+ }
+ }
+
+ @Override
+ protected void processTable(String tableNameWithType) {
+ try {
+ CommonConstants.Helix.TableType tableType =
TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
+ if (tableType == CommonConstants.Helix.TableType.REALTIME) {
+
+ TableConfig tableConfig =
_pinotHelixResourceManager.getTableConfig(tableNameWithType);
+ if (tableConfig == null) {
+ LOGGER.warn("Failed to find table config for table: {}, skipping
validation", tableNameWithType);
+ return;
+ }
+
+ if (_updateRealtimeDocumentCount) {
+ updateRealtimeDocumentCount(tableConfig);
+ }
+
+ Map<String, String> streamConfigMap =
tableConfig.getIndexingConfig().getStreamConfigs();
+ StreamConfig streamConfig = new StreamConfig(streamConfigMap);
+ if (streamConfig.hasLowLevelConsumerType()) {
+ _llcRealtimeSegmentManager.ensureAllPartitionsConsuming(tableConfig);
+ }
+ }
+ } catch (Exception e) {
+ LOGGER.warn("Caught exception while validating realtime table: {}",
tableNameWithType, e);
+ }
+ }
+
+ private void updateRealtimeDocumentCount(TableConfig tableConfig) {
+ String realtimeTableName = tableConfig.getTableName();
+ List<RealtimeSegmentZKMetadata> metadataList =
+
_pinotHelixResourceManager.getRealtimeSegmentMetadata(realtimeTableName);
+ boolean countHLCSegments = true; // false if this table has ONLY LLC
segments (i.e. fully migrated)
+ StreamConfig streamConfig = new
StreamConfig(tableConfig.getIndexingConfig().getStreamConfigs());
+ if (streamConfig.hasLowLevelConsumerType() &&
!streamConfig.hasHighLevelConsumerType()) {
+ countHLCSegments = false;
+ }
+ // Update the gauge to contain the total document count in the segments
+
_validationMetrics.updateTotalDocumentCountGauge(tableConfig.getTableName(),
+ computeRealtimeTotalDocumentInSegments(metadataList,
countHLCSegments));
+ }
+
+ @VisibleForTesting
+ static long
computeRealtimeTotalDocumentInSegments(List<RealtimeSegmentZKMetadata>
realtimeSegmentZKMetadataList,
+ boolean countHLCSegments) {
+ long numTotalDocs = 0;
+
+ String groupId = "";
+ for (RealtimeSegmentZKMetadata realtimeSegmentZKMetadata :
realtimeSegmentZKMetadataList) {
+ String segmentName = realtimeSegmentZKMetadata.getSegmentName();
+ if (SegmentName.isHighLevelConsumerSegmentName(segmentName)) {
+ if (countHLCSegments) {
+ HLCSegmentName hlcSegmentName = new HLCSegmentName(segmentName);
+ String segmentGroupIdName = hlcSegmentName.getGroupId();
+
+ if (groupId.isEmpty()) {
+ groupId = segmentGroupIdName;
+ }
+ // Discard all segments with different groupids as they are replicas
+ if (groupId.equals(segmentGroupIdName) &&
realtimeSegmentZKMetadata.getTotalRawDocs() >= 0) {
+ numTotalDocs += realtimeSegmentZKMetadata.getTotalRawDocs();
+ }
+ }
+ } else {
+ // Low level segments
+ if (!countHLCSegments) {
+ numTotalDocs += realtimeSegmentZKMetadata.getTotalRawDocs();
+ }
+ }
+ }
+
+ return numTotalDocs;
+ }
+
+ @Override
+ protected void postprocess() {
+
+ }
+
+ @Override
+ protected void initTask() {
+
+ }
+
+ @Override
+ public void stopTask() {
+ LOGGER.info("Unregister all the validation metrics.");
+ _validationMetrics.unregisterAllMetrics();
+ }
+}
diff --git
a/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
b/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
index 36f3386..fd417a5 100644
---
a/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
+++
b/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
@@ -369,7 +369,7 @@ public class PinotLLCRealtimeSegmentManagerTest {
oldMetadataMap.put(entry.getKey(), new
LLCRealtimeSegmentZKMetadata(entry.getValue().toZNRecord()));
}
segmentManager._partitionAssignmentGenerator.setConsumingInstances(instances);
- IdealState updatedIdealState =
segmentManager.validateLLCSegments(tableConfig, idealState, nPartitions);
+ IdealState updatedIdealState =
segmentManager.ensureAllPartitionsConsuming(tableConfig, idealState,
nPartitions);
Map<String, Map<String, String>> updatedMapFields =
updatedIdealState.getRecord().getMapFields();
Map<String, LLCRealtimeSegmentZKMetadata> updatedMetadataMap =
segmentManager._metadataMap;
@@ -525,13 +525,13 @@ public class PinotLLCRealtimeSegmentManagerTest {
Map<String, Map<String, String>> oldMapFields =
idealStateCopy.getRecord().getMapFields();
if (tooSoonToCorrect) {
- segmentManager.validateLLCSegments(tableConfig, idealState,
nPartitions);
+ segmentManager.ensureAllPartitionsConsuming(tableConfig,
idealState, nPartitions);
// validate that all entries in oldMapFields are unchanged in new
ideal state
verifyNoChangeToOldEntries(oldMapFields, idealState);
segmentManager.tooSoonToCorrect = false;
}
- segmentManager.validateLLCSegments(tableConfig, idealState,
nPartitions);
+ segmentManager.ensureAllPartitionsConsuming(tableConfig, idealState,
nPartitions);
// verify that new segment gets created in ideal state with CONSUMING
Assert.assertNotNull(idealState.getRecord().getMapFields().get(llcSegmentName.getSegmentName()));
@@ -572,13 +572,13 @@ public class PinotLLCRealtimeSegmentManagerTest {
Map<String, Map<String, String>> oldMapFields =
idealStateCopy.getRecord().getMapFields();
if (tooSoonToCorrect) {
- segmentManager.validateLLCSegments(tableConfig, idealState,
nPartitions);
+ segmentManager.ensureAllPartitionsConsuming(tableConfig,
idealState, nPartitions);
// validate nothing changed and try again with disabled
verifyNoChangeToOldEntries(oldMapFields, idealState);
segmentManager.tooSoonToCorrect = false;
}
- segmentManager.validateLLCSegments(tableConfig, idealState,
nPartitions);
+ segmentManager.ensureAllPartitionsConsuming(tableConfig, idealState,
nPartitions);
// verify that new segment gets created in ideal state with
CONSUMING and old segment ONLINE
Assert.assertNotNull(idealState.getRecord().getMapFields().get(latestSegment.getSegmentName()).values().
@@ -615,7 +615,7 @@ public class PinotLLCRealtimeSegmentManagerTest {
(ZNRecord)
znRecordSerializer.deserialize(znRecordSerializer.serialize(idealState.getRecord())));
Map<String, Map<String, String>> oldMapFields =
idealStateCopy.getRecord().getMapFields();
- segmentManager.validateLLCSegments(tableConfig, idealState,
nPartitions);
+ segmentManager.ensureAllPartitionsConsuming(tableConfig, idealState,
nPartitions);
verifyRepairs(tableConfig, idealState, expectedPartitionAssignment,
segmentManager, oldMapFields);
} else {
@@ -644,12 +644,12 @@ public class PinotLLCRealtimeSegmentManagerTest {
Map<String, Map<String, String>> oldMapFields =
idealStateCopy.getRecord().getMapFields();
if (tooSoonToCorrect) {
- segmentManager.validateLLCSegments(tableConfig, idealState,
nPartitions);
+ segmentManager.ensureAllPartitionsConsuming(tableConfig,
idealState, nPartitions);
// validate nothing changed and try again with disabled
verifyNoChangeToOldEntries(oldMapFields, idealState);
segmentManager.tooSoonToCorrect = false;
}
- segmentManager.validateLLCSegments(tableConfig, idealState,
nPartitions);
+ segmentManager.ensureAllPartitionsConsuming(tableConfig,
idealState, nPartitions);
verifyRepairs(tableConfig, idealState,
expectedPartitionAssignment, segmentManager, oldMapFields);
} else {
@@ -677,13 +677,13 @@ public class PinotLLCRealtimeSegmentManagerTest {
Map<String, Map<String, String>> oldMapFields =
idealStateCopy.getRecord().getMapFields();
if (tooSoonToCorrect) {
- segmentManager.validateLLCSegments(tableConfig, idealState,
nPartitions);
+ segmentManager.ensureAllPartitionsConsuming(tableConfig,
idealState, nPartitions);
// validate nothing changed and try again with disabled
verifyNoChangeToOldEntries(oldMapFields, idealState);
segmentManager.tooSoonToCorrect = false;
}
- segmentManager.validateLLCSegments(tableConfig, idealState,
nPartitions);
+ segmentManager.ensureAllPartitionsConsuming(tableConfig,
idealState, nPartitions);
verifyRepairs(tableConfig, idealState,
expectedPartitionAssignment, segmentManager, oldMapFields);
} else {
@@ -695,7 +695,7 @@ public class PinotLLCRealtimeSegmentManagerTest {
(ZNRecord)
znRecordSerializer.deserialize(znRecordSerializer.serialize(idealState.getRecord())));
Map<String, Map<String, String>> oldMapFields =
idealStateCopy.getRecord().getMapFields();
- segmentManager.validateLLCSegments(tableConfig, idealState,
nPartitions);
+ segmentManager.ensureAllPartitionsConsuming(tableConfig,
idealState, nPartitions);
// verify that nothing changed
verifyNoChangeToOldEntries(oldMapFields, idealState);
@@ -830,7 +830,7 @@ public class PinotLLCRealtimeSegmentManagerTest {
IdealState idealState = idealStateBuilder.clear().build();
segmentManager._metadataMap.clear();
- segmentManager.validateLLCSegments(tableConfig, idealState, nPartitions);
+ segmentManager.ensureAllPartitionsConsuming(tableConfig, idealState,
nPartitions);
PartitionAssignment partitionAssignment =
segmentManager._partitionAssignmentGenerator.getStreamPartitionAssignmentFromIdealState(tableConfig,
idealState);
@@ -845,7 +845,7 @@ public class PinotLLCRealtimeSegmentManagerTest {
FakePinotLLCRealtimeSegmentManager segmentManager, TableConfig
tableConfig, int nPartitions) {
IdealState idealState = idealStateBuilder.clear().build();
segmentManager._metadataMap.clear();
- segmentManager.validateLLCSegments(tableConfig, idealState, nPartitions);
+ segmentManager.ensureAllPartitionsConsuming(tableConfig, idealState,
nPartitions);
return idealStateBuilder.build();
}
diff --git
a/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/retention/RetentionManagerTest.java
b/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/retention/RetentionManagerTest.java
index 3e60350..d2fe9b3 100644
---
a/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/retention/RetentionManagerTest.java
+++
b/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/retention/RetentionManagerTest.java
@@ -26,6 +26,7 @@ import
com.linkedin.pinot.common.metadata.segment.RealtimeSegmentZKMetadata;
import com.linkedin.pinot.common.segment.SegmentMetadata;
import com.linkedin.pinot.common.utils.CommonConstants;
import com.linkedin.pinot.common.utils.LLCSegmentName;
+import com.linkedin.pinot.controller.ControllerConf;
import com.linkedin.pinot.controller.helix.core.PinotHelixResourceManager;
import com.linkedin.pinot.controller.helix.core.PinotTableIdealStateBuilder;
import com.linkedin.pinot.controller.helix.core.SegmentDeletionManager;
@@ -87,7 +88,10 @@ public class RetentionManagerTest {
when(pinotHelixResourceManager.getTableConfig(OFFLINE_TABLE_NAME)).thenReturn(tableConfig);
when(pinotHelixResourceManager.getOfflineSegmentMetadata(OFFLINE_TABLE_NAME)).thenReturn(metadataList);
- RetentionManager retentionManager = new
RetentionManager(pinotHelixResourceManager, 0, 0);
+ ControllerConf conf = new ControllerConf();
+ conf.setRetentionControllerFrequencyInSeconds(0);
+ conf.setDeletedSegmentsRetentionInDays(0);
+ RetentionManager retentionManager = new
RetentionManager(pinotHelixResourceManager, conf);
retentionManager.init();
retentionManager.run();
@@ -204,7 +208,10 @@ public class RetentionManagerTest {
setupSegmentMetadata(tableConfig, now, initialNumSegments,
removedSegments);
setupPinotHelixResourceManager(tableConfig, removedSegments,
pinotHelixResourceManager);
- RetentionManager retentionManager = new
RetentionManager(pinotHelixResourceManager, 0, 0);
+ ControllerConf conf = new ControllerConf();
+ conf.setRetentionControllerFrequencyInSeconds(0);
+ conf.setDeletedSegmentsRetentionInDays(0);
+ RetentionManager retentionManager = new
RetentionManager(pinotHelixResourceManager, conf);
retentionManager.init();
retentionManager.run();
diff --git
a/pinot-controller/src/test/java/com/linkedin/pinot/controller/validation/ValidationManagerTest.java
b/pinot-controller/src/test/java/com/linkedin/pinot/controller/validation/ValidationManagerTest.java
index 0526afb..f69657b 100644
---
a/pinot-controller/src/test/java/com/linkedin/pinot/controller/validation/ValidationManagerTest.java
+++
b/pinot-controller/src/test/java/com/linkedin/pinot/controller/validation/ValidationManagerTest.java
@@ -52,7 +52,7 @@ import static org.testng.Assert.*;
/**
- * Tests for the ValidationManager.
+ * Tests for the ValidationManagers.
*/
public class ValidationManagerTest {
private String HELIX_CLUSTER_NAME = "TestValidationManager";
@@ -178,7 +178,7 @@ public class ValidationManagerTest {
segmentZKMetadataList.add(
SegmentMetadataMockUtils.mockRealtimeSegmentZKMetadata(TEST_TABLE_NAME,
segmentName4, 20));
-
assertEquals(ValidationManager.computeRealtimeTotalDocumentInSegments(segmentZKMetadataList,
true), 60);
+
assertEquals(RealtimeSegmentValidationManager.computeRealtimeTotalDocumentInSegments(segmentZKMetadataList,
true), 60);
// Now add some low level segment names
String segmentName5 = new LLCSegmentName(TEST_TABLE_NAME, 1, 0,
1000).getSegmentName();
@@ -188,7 +188,7 @@ public class ValidationManagerTest {
segmentZKMetadataList.add(SegmentMetadataMockUtils.mockRealtimeSegmentZKMetadata(TEST_TABLE_NAME,
segmentName6, 5));
// Only the LLC segments should get counted.
-
assertEquals(ValidationManager.computeRealtimeTotalDocumentInSegments(segmentZKMetadataList,
false), 15);
+
assertEquals(RealtimeSegmentValidationManager.computeRealtimeTotalDocumentInSegments(segmentZKMetadataList,
false), 15);
}
@AfterClass
@@ -210,18 +210,18 @@ public class ValidationManagerTest {
jan1st2nd3rd.add(jan1st);
jan1st2nd3rd.add(jan2nd);
jan1st2nd3rd.add(jan3rd);
- assertEquals(ValidationManager.computeNumMissingSegments(jan1st2nd3rd,
Duration.standardDays(1)), 0);
+
assertEquals(OfflineSegmentIntervalChecker.computeNumMissingSegments(jan1st2nd3rd,
Duration.standardDays(1)), 0);
ArrayList<Interval> jan1st2nd3rd5th = new ArrayList<>(jan1st2nd3rd);
jan1st2nd3rd5th.add(jan5th);
- assertEquals(ValidationManager.computeNumMissingSegments(jan1st2nd3rd5th,
Duration.standardDays(1)), 1);
+
assertEquals(OfflineSegmentIntervalChecker.computeNumMissingSegments(jan1st2nd3rd5th,
Duration.standardDays(1)), 1);
// Should also work if the intervals are in random order
ArrayList<Interval> jan5th2nd1st = new ArrayList<>();
jan5th2nd1st.add(jan5th);
jan5th2nd1st.add(jan2nd);
jan5th2nd1st.add(jan1st);
- assertEquals(ValidationManager.computeNumMissingSegments(jan5th2nd1st,
Duration.standardDays(1)), 2);
+
assertEquals(OfflineSegmentIntervalChecker.computeNumMissingSegments(jan5th2nd1st,
Duration.standardDays(1)), 2);
// Should also work if the intervals are of different sizes
Interval jan1stAnd2nd = new Interval(new DateTime(2015, 1, 1, 0, 0, 0),
new DateTime(2015, 1, 2, 23, 59, 59));
@@ -229,6 +229,6 @@ public class ValidationManagerTest {
jan1st2nd4th5th.add(jan1stAnd2nd);
jan1st2nd4th5th.add(jan4th);
jan1st2nd4th5th.add(jan5th);
- assertEquals(ValidationManager.computeNumMissingSegments(jan1st2nd4th5th,
Duration.standardDays(1)), 1);
+
assertEquals(OfflineSegmentIntervalChecker.computeNumMissingSegments(jan1st2nd4th5th,
Duration.standardDays(1)), 1);
}
}
diff --git
a/pinot-integration-tests/src/test/java/com/linkedin/pinot/integration/tests/SegmentCompletionIntegrationTests.java
b/pinot-integration-tests/src/test/java/com/linkedin/pinot/integration/tests/SegmentCompletionIntegrationTests.java
index 28eb463..5ef1278 100644
---
a/pinot-integration-tests/src/test/java/com/linkedin/pinot/integration/tests/SegmentCompletionIntegrationTests.java
+++
b/pinot-integration-tests/src/test/java/com/linkedin/pinot/integration/tests/SegmentCompletionIntegrationTests.java
@@ -28,7 +28,7 @@ import com.linkedin.pinot.common.utils.LLCSegmentName;
import com.linkedin.pinot.common.utils.NetUtil;
import com.linkedin.pinot.common.utils.ZkStarter;
import
com.linkedin.pinot.controller.helix.core.PinotHelixSegmentOnlineOfflineStateModelGenerator;
-import com.linkedin.pinot.controller.validation.ValidationManager;
+import
com.linkedin.pinot.controller.validation.RealtimeSegmentValidationManager;
import com.linkedin.pinot.server.realtime.ControllerLeaderLocator;
import
com.linkedin.pinot.server.realtime.ServerSegmentCompletionProtocolHandler;
import
com.linkedin.pinot.server.starter.helix.SegmentOnlineOfflineStateModelFactory;
@@ -155,7 +155,7 @@ public class SegmentCompletionIntegrationTests extends
LLCRealtimeClusterIntegra
final String oldSegment = _currentSegment;
// Now call the validation manager, and the segment should fix itself
- ValidationManager validationManager =
_controllerStarter.getValidationManager();
+ RealtimeSegmentValidationManager validationManager =
_controllerStarter.getRealtimeSegmentValidationManager();
validationManager.init();
validationManager.run();
diff --git
a/pinot-tools/src/main/java/com/linkedin/pinot/tools/admin/command/StartControllerCommand.java
b/pinot-tools/src/main/java/com/linkedin/pinot/tools/admin/command/StartControllerCommand.java
index 49c56b1..aa82182 100644
---
a/pinot-tools/src/main/java/com/linkedin/pinot/tools/admin/command/StartControllerCommand.java
+++
b/pinot-tools/src/main/java/com/linkedin/pinot/tools/admin/command/StartControllerCommand.java
@@ -150,7 +150,9 @@ public class StartControllerCommand extends
AbstractBaseAdminCommand implements
conf.setTenantIsolationEnabled(_tenantIsolation);
conf.setRetentionControllerFrequencyInSeconds(3600 * 6);
- conf.setValidationControllerFrequencyInSeconds(3600);
+ conf.setOfflineSegmentIntervalCheckerFrequencyInSeconds(3600);
+ conf.setRealtimeSegmentValidationFrequencyInSeconds(3600);
+ conf.setBrokerResourceValidationFrequencyInSeconds(3600);
}
LOGGER.info("Executing command: " + toString());
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]