This is an automated email from the ASF dual-hosted git repository. nehapawar pushed a commit to branch split_vm_tasks_2 in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit 75259146d0c219e2372abdb6d18fe759cc1a49e4 Author: Neha Pawar <[email protected]> AuthorDate: Mon Jan 7 18:31:25 2019 -0800 Use controller validation frequency as default values for new periodic validation tasks --- .../linkedin/pinot/controller/ControllerConf.java | 45 +++++++++++----------- .../realtime/PinotLLCRealtimeSegmentManager.java | 6 +-- .../RealtimeSegmentValidationManager.java | 2 +- .../PinotLLCRealtimeSegmentManagerTest.java | 26 ++++++------- 4 files changed, 40 insertions(+), 39 deletions(-) diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerConf.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerConf.java index 9f84691..c6ff82e 100644 --- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerConf.java +++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerConf.java @@ -57,7 +57,9 @@ public class ControllerConf extends PropertiesConfiguration { public static class ControllerPeriodicTasksConf { private static final String RETENTION_MANAGER_FREQUENCY_IN_SECONDS = "controller.retention.frequencyInSeconds"; - private static final String VALIDATION_MANAGER_FREQUENCY_IN_SECONDS = "controller.validation.frequencyInSeconds"; + @Deprecated // The ValidationManager has been split up into 3 separate tasks, each having their own frequency config settings + private static final String DEPRECATED_VALIDATION_MANAGER_FREQUENCY_IN_SECONDS = + "controller.validation.frequencyInSeconds"; private static final String OFFLINE_SEGMENT_INTERVAL_CHECKER_FREQUENCY_IN_SECONDS = "controller.offline.segment.interval.checker.frequencyInSeconds"; private static final String REALTIME_SEGMENT_VALIDATION_FREQUENCY_IN_SECONDS = @@ -76,7 +78,8 @@ public class ControllerConf extends PropertiesConfiguration { "controller.segment.level.validation.intervalInSeconds"; private static final int DEFAULT_RETENTION_CONTROLLER_FREQUENCY_IN_SECONDS = 6 * 60 * 60; // 6 Hours. 
- private static final int DEFAULT_VALIDATION_CONTROLLER_FREQUENCY_IN_SECONDS = 60 * 60; // 1 Hour. + @Deprecated // The ValidationManager has been split up into 3 separate tasks, each having their own frequency config settings + private static final int DEPRECATED_DEFAULT_VALIDATION_CONTROLLER_FREQUENCY_IN_SECONDS = 60 * 60; // 1 Hour. private static final int DEFAULT_OFFLINE_SEGMENT_INTERVAL_CHECKER_FREQUENCY_IN_SECONDS = 6 * 60 * 60; // 6 Hours. private static final int DEFAULT_REALTIME_SEGMENT_VALIDATION_FREQUENCY_IN_SECONDS = 60 * 60; // 1 Hour. private static final int DEFAULT_BROKER_RESOURCE_VALIDATION_FREQUENCY_IN_SECONDS = 60 * 60; // 1 Hour. @@ -353,34 +356,26 @@ public class ControllerConf extends PropertiesConfiguration { Integer.toString(retentionFrequencyInSeconds)); } - /** - * Deprecated. The validation manager has been split into 3 separate tasks, each having their own frequency config - * @return - */ - @Deprecated - public int getValidationControllerFrequencyInSeconds() { - if (containsKey(ControllerPeriodicTasksConf.VALIDATION_MANAGER_FREQUENCY_IN_SECONDS)) { + private int getValidationControllerFrequencyInSeconds() { + if (containsKey(ControllerPeriodicTasksConf.DEPRECATED_VALIDATION_MANAGER_FREQUENCY_IN_SECONDS)) { return Integer.parseInt( - (String) getProperty(ControllerPeriodicTasksConf.VALIDATION_MANAGER_FREQUENCY_IN_SECONDS)); + (String) getProperty(ControllerPeriodicTasksConf.DEPRECATED_VALIDATION_MANAGER_FREQUENCY_IN_SECONDS)); } - return ControllerPeriodicTasksConf.DEFAULT_VALIDATION_CONTROLLER_FREQUENCY_IN_SECONDS; + return ControllerPeriodicTasksConf.DEPRECATED_DEFAULT_VALIDATION_CONTROLLER_FREQUENCY_IN_SECONDS; } /** - * Deprecated. The validation manager has been split into 3 separate tasks, each having their own frequency config + * Returns the config value for controller.offline.segment.interval.checker.frequencyInSeconds if it exists. + * If it doesn't exist, returns the segment level validation interval. 
This is done in order to retain the current behavior, + * wherein the offline validation tasks were done at segment level validation interval frequency * @return */ - public void setValidationControllerFrequencyInSeconds(int validationFrequencyInSeconds) { - setProperty(ControllerPeriodicTasksConf.VALIDATION_MANAGER_FREQUENCY_IN_SECONDS, - Integer.toString(validationFrequencyInSeconds)); - } - public int getOfflineSegmentIntervalCheckerFrequencyInSeconds() { if (containsKey(ControllerPeriodicTasksConf.OFFLINE_SEGMENT_INTERVAL_CHECKER_FREQUENCY_IN_SECONDS)) { return Integer.parseInt( (String) getProperty(ControllerPeriodicTasksConf.OFFLINE_SEGMENT_INTERVAL_CHECKER_FREQUENCY_IN_SECONDS)); } - return ControllerPeriodicTasksConf.DEFAULT_OFFLINE_SEGMENT_INTERVAL_CHECKER_FREQUENCY_IN_SECONDS; + return getSegmentLevelValidationIntervalInSeconds(); } public void setOfflineSegmentIntervalCheckerFrequencyInSeconds(int validationFrequencyInSeconds) { @@ -388,13 +383,18 @@ public class ControllerConf extends PropertiesConfiguration { Integer.toString(validationFrequencyInSeconds)); } + /** + * Returns the config value for controller.realtime.segment.validation.frequencyInSeconds if it exists. + * If it doesn't exist, returns the validation controller frequency. 
This is done in order to retain the current behavior, + * wherein the realtime validation tasks were done at validation controller frequency + * @return + */ public int getRealtimeSegmentValidationFrequencyInSeconds() { - if (containsKey(ControllerPeriodicTasksConf.REALTIME_SEGMENT_VALIDATION_FREQUENCY_IN_SECONDS)) { return Integer.parseInt( (String) getProperty(ControllerPeriodicTasksConf.REALTIME_SEGMENT_VALIDATION_FREQUENCY_IN_SECONDS)); } - return ControllerPeriodicTasksConf.DEFAULT_REALTIME_SEGMENT_VALIDATION_FREQUENCY_IN_SECONDS; + return getValidationControllerFrequencyInSeconds(); } public void setRealtimeSegmentValidationFrequencyInSeconds(int validationFrequencyInSeconds) { @@ -403,8 +403,9 @@ public class ControllerConf extends PropertiesConfiguration { } /** - * Return broker resource validation frequency if present, else return the validation manager frequency - * This is so that we can rollout with no config changes to the frequency of this task + * Returns the config value for controller.broker.resource.validation.frequencyInSeconds if it exists. + * If it doesn't exist, returns the validation controller frequency. 
This is done in order to retain the current behavior, + * wherein the broker resource validation tasks were done at validation controller frequency * @return */ public int getBrokerResourceValidationFrequencyInSeconds() { diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java index e708abc..a77750e 100644 --- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java +++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java @@ -950,7 +950,7 @@ public class PinotLLCRealtimeSegmentManager { * * TODO: We need to find a place to detect and update a gauge for nonConsumingPartitionsCount for a table, and reset it to 0 at the end of validateLLC */ - public void validateLLCSegments(final TableConfig tableConfig) { + public void ensureAllPartitionsConsuming(final TableConfig tableConfig) { final String tableNameWithType = tableConfig.getTableName(); final StreamConfig streamConfig = new StreamConfig(tableConfig.getIndexingConfig().getStreamConfigs()); final int partitionCount = getPartitionCount(streamConfig); @@ -958,7 +958,7 @@ public class PinotLLCRealtimeSegmentManager { @Nullable @Override public IdealState apply(@Nullable IdealState idealState) { - return validateLLCSegments(tableConfig, idealState, partitionCount); + return ensureAllPartitionsConsuming(tableConfig, idealState, partitionCount); } }, RetryPolicies.exponentialBackoffRetryPolicy(10, 1000L, 1.2f), true); } @@ -1078,7 +1078,7 @@ public class PinotLLCRealtimeSegmentManager { * TODO: split this method into multiple smaller methods */ @VisibleForTesting - protected IdealState validateLLCSegments(final TableConfig tableConfig, IdealState idealState, + protected IdealState 
ensureAllPartitionsConsuming(final TableConfig tableConfig, IdealState idealState, final int partitionCount) { final String tableNameWithType = tableConfig.getTableName(); final StreamConfig streamConfig = new StreamConfig(tableConfig.getIndexingConfig().getStreamConfigs()); diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/RealtimeSegmentValidationManager.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/RealtimeSegmentValidationManager.java index 36d7b27..77ed13a 100644 --- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/RealtimeSegmentValidationManager.java +++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/RealtimeSegmentValidationManager.java @@ -92,7 +92,7 @@ public class RealtimeSegmentValidationManager extends ControllerPeriodicTask { Map<String, String> streamConfigMap = tableConfig.getIndexingConfig().getStreamConfigs(); StreamConfig streamConfig = new StreamConfig(streamConfigMap); if (streamConfig.hasLowLevelConsumerType()) { - _llcRealtimeSegmentManager.validateLLCSegments(tableConfig); + _llcRealtimeSegmentManager.ensureAllPartitionsConsuming(tableConfig); } } } catch (Exception e) { diff --git a/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java b/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java index 36f3386..fd417a5 100644 --- a/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java +++ b/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java @@ -369,7 +369,7 @@ public class PinotLLCRealtimeSegmentManagerTest { oldMetadataMap.put(entry.getKey(), new LLCRealtimeSegmentZKMetadata(entry.getValue().toZNRecord())); } 
segmentManager._partitionAssignmentGenerator.setConsumingInstances(instances); - IdealState updatedIdealState = segmentManager.validateLLCSegments(tableConfig, idealState, nPartitions); + IdealState updatedIdealState = segmentManager.ensureAllPartitionsConsuming(tableConfig, idealState, nPartitions); Map<String, Map<String, String>> updatedMapFields = updatedIdealState.getRecord().getMapFields(); Map<String, LLCRealtimeSegmentZKMetadata> updatedMetadataMap = segmentManager._metadataMap; @@ -525,13 +525,13 @@ public class PinotLLCRealtimeSegmentManagerTest { Map<String, Map<String, String>> oldMapFields = idealStateCopy.getRecord().getMapFields(); if (tooSoonToCorrect) { - segmentManager.validateLLCSegments(tableConfig, idealState, nPartitions); + segmentManager.ensureAllPartitionsConsuming(tableConfig, idealState, nPartitions); // validate that all entries in oldMapFields are unchanged in new ideal state verifyNoChangeToOldEntries(oldMapFields, idealState); segmentManager.tooSoonToCorrect = false; } - segmentManager.validateLLCSegments(tableConfig, idealState, nPartitions); + segmentManager.ensureAllPartitionsConsuming(tableConfig, idealState, nPartitions); // verify that new segment gets created in ideal state with CONSUMING Assert.assertNotNull(idealState.getRecord().getMapFields().get(llcSegmentName.getSegmentName())); @@ -572,13 +572,13 @@ public class PinotLLCRealtimeSegmentManagerTest { Map<String, Map<String, String>> oldMapFields = idealStateCopy.getRecord().getMapFields(); if (tooSoonToCorrect) { - segmentManager.validateLLCSegments(tableConfig, idealState, nPartitions); + segmentManager.ensureAllPartitionsConsuming(tableConfig, idealState, nPartitions); // validate nothing changed and try again with disabled verifyNoChangeToOldEntries(oldMapFields, idealState); segmentManager.tooSoonToCorrect = false; } - segmentManager.validateLLCSegments(tableConfig, idealState, nPartitions); + segmentManager.ensureAllPartitionsConsuming(tableConfig, idealState, 
nPartitions); // verify that new segment gets created in ideal state with CONSUMING and old segment ONLINE Assert.assertNotNull(idealState.getRecord().getMapFields().get(latestSegment.getSegmentName()).values(). @@ -615,7 +615,7 @@ public class PinotLLCRealtimeSegmentManagerTest { (ZNRecord) znRecordSerializer.deserialize(znRecordSerializer.serialize(idealState.getRecord()))); Map<String, Map<String, String>> oldMapFields = idealStateCopy.getRecord().getMapFields(); - segmentManager.validateLLCSegments(tableConfig, idealState, nPartitions); + segmentManager.ensureAllPartitionsConsuming(tableConfig, idealState, nPartitions); verifyRepairs(tableConfig, idealState, expectedPartitionAssignment, segmentManager, oldMapFields); } else { @@ -644,12 +644,12 @@ public class PinotLLCRealtimeSegmentManagerTest { Map<String, Map<String, String>> oldMapFields = idealStateCopy.getRecord().getMapFields(); if (tooSoonToCorrect) { - segmentManager.validateLLCSegments(tableConfig, idealState, nPartitions); + segmentManager.ensureAllPartitionsConsuming(tableConfig, idealState, nPartitions); // validate nothing changed and try again with disabled verifyNoChangeToOldEntries(oldMapFields, idealState); segmentManager.tooSoonToCorrect = false; } - segmentManager.validateLLCSegments(tableConfig, idealState, nPartitions); + segmentManager.ensureAllPartitionsConsuming(tableConfig, idealState, nPartitions); verifyRepairs(tableConfig, idealState, expectedPartitionAssignment, segmentManager, oldMapFields); } else { @@ -677,13 +677,13 @@ public class PinotLLCRealtimeSegmentManagerTest { Map<String, Map<String, String>> oldMapFields = idealStateCopy.getRecord().getMapFields(); if (tooSoonToCorrect) { - segmentManager.validateLLCSegments(tableConfig, idealState, nPartitions); + segmentManager.ensureAllPartitionsConsuming(tableConfig, idealState, nPartitions); // validate nothing changed and try again with disabled verifyNoChangeToOldEntries(oldMapFields, idealState); segmentManager.tooSoonToCorrect 
= false; } - segmentManager.validateLLCSegments(tableConfig, idealState, nPartitions); + segmentManager.ensureAllPartitionsConsuming(tableConfig, idealState, nPartitions); verifyRepairs(tableConfig, idealState, expectedPartitionAssignment, segmentManager, oldMapFields); } else { @@ -695,7 +695,7 @@ public class PinotLLCRealtimeSegmentManagerTest { (ZNRecord) znRecordSerializer.deserialize(znRecordSerializer.serialize(idealState.getRecord()))); Map<String, Map<String, String>> oldMapFields = idealStateCopy.getRecord().getMapFields(); - segmentManager.validateLLCSegments(tableConfig, idealState, nPartitions); + segmentManager.ensureAllPartitionsConsuming(tableConfig, idealState, nPartitions); // verify that nothing changed verifyNoChangeToOldEntries(oldMapFields, idealState); @@ -830,7 +830,7 @@ public class PinotLLCRealtimeSegmentManagerTest { IdealState idealState = idealStateBuilder.clear().build(); segmentManager._metadataMap.clear(); - segmentManager.validateLLCSegments(tableConfig, idealState, nPartitions); + segmentManager.ensureAllPartitionsConsuming(tableConfig, idealState, nPartitions); PartitionAssignment partitionAssignment = segmentManager._partitionAssignmentGenerator.getStreamPartitionAssignmentFromIdealState(tableConfig, idealState); @@ -845,7 +845,7 @@ public class PinotLLCRealtimeSegmentManagerTest { FakePinotLLCRealtimeSegmentManager segmentManager, TableConfig tableConfig, int nPartitions) { IdealState idealState = idealStateBuilder.clear().build(); segmentManager._metadataMap.clear(); - segmentManager.validateLLCSegments(tableConfig, idealState, nPartitions); + segmentManager.ensureAllPartitionsConsuming(tableConfig, idealState, nPartitions); return idealStateBuilder.build(); } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
