This is an automated email from the ASF dual-hosted git repository.
nehapawar pushed a commit to branch metric_after_periodic_task_run
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/metric_after_periodic_task_run
by this push:
new a75222a Some more refactoring to keep counting of numTablesProcessed
in base class of periodic tasks
a75222a is described below
commit a75222a944928615c653e67a4fa2e2c9034cf993
Author: Neha Pawar <[email protected]>
AuthorDate: Mon Jan 28 11:37:27 2019 -0800
Some more refactoring to keep counting of numTablesProcessed in base class
of periodic tasks
---
.../controller/helix/SegmentStatusChecker.java | 17 +++--
.../helix/core/minion/PinotTaskManager.java | 8 ++-
.../core/periodictask/ControllerPeriodicTask.java | 27 +++++---
.../core/relocation/RealtimeSegmentRelocator.java | 18 +++---
.../helix/core/retention/RetentionManager.java | 73 ++++++++++------------
.../BrokerResourceValidationManager.java | 30 +++++----
.../validation/OfflineSegmentIntervalChecker.java | 28 ++++-----
.../RealtimeSegmentValidationManager.java | 42 ++++++-------
.../periodictask/ControllerPeriodicTaskTest.java | 7 ++-
9 files changed, 121 insertions(+), 129 deletions(-)
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java
index d971e1e..84bf706 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java
@@ -77,7 +77,6 @@ public class SegmentStatusChecker extends
ControllerPeriodicTask {
@Override
protected void preprocess() {
- super.preprocess();
_realTimeTableCount = 0;
_offlineTableCount = 0;
_disabledTableCount = 0;
@@ -94,20 +93,18 @@ public class SegmentStatusChecker extends
ControllerPeriodicTask {
@Override
protected void processTable(String tableNameWithType) {
- try {
- updateSegmentMetrics(tableNameWithType);
- _numTablesProcessed ++;
- } catch (Exception e) {
- LOGGER.error("Caught exception while updating segment status for table
{}", tableNameWithType, e);
+ updateSegmentMetrics(tableNameWithType);
+ }
- // Remove the metric for this table
- resetTableMetrics(tableNameWithType);
- }
+ @Override
+ protected void exceptionHandler(String tableNameWithType, Exception e) {
+ LOGGER.error("Caught exception while updating segment status for table
{}", tableNameWithType, e);
+ // Remove the metric for this table
+ resetTableMetrics(tableNameWithType);
}
@Override
protected void postprocess() {
- super.postprocess();
_metricsRegistry.setValueOfGlobalGauge(ControllerGauge.REALTIME_TABLE_COUNT,
_realTimeTableCount);
_metricsRegistry.setValueOfGlobalGauge(ControllerGauge.OFFLINE_TABLE_COUNT,
_offlineTableCount);
_metricsRegistry.setValueOfGlobalGauge(ControllerGauge.DISABLED_TABLE_COUNT,
_disabledTableCount);
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
index dea29c0..ed3bad6 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
@@ -102,7 +102,6 @@ public class PinotTaskManager extends
ControllerPeriodicTask {
@Override
protected void preprocess() {
- super.preprocess();
_metricsRegistry.addMeteredGlobalValue(ControllerMeter.NUMBER_TIMES_SCHEDULE_TASKS_CALLED,
1L);
_taskTypes = _taskGeneratorRegistry.getAllTaskTypes();
@@ -130,7 +129,11 @@ public class PinotTaskManager extends
ControllerPeriodicTask {
}
}
}
- _numTablesProcessed ++;
+ }
+
+ @Override
+ protected void exceptionHandler(String tableNameWithType, Exception e) {
+ LOGGER.error("Exception in PinotTaskManager for table {}",
tableNameWithType, e);
}
@Override
@@ -150,7 +153,6 @@ public class PinotTaskManager extends
ControllerPeriodicTask {
_metricsRegistry.addMeteredTableValue(taskType,
ControllerMeter.NUMBER_TASKS_SUBMITTED, numTasks);
}
}
- super.postprocess();
}
/**
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java
index dc06af0..e7ddbb3 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java
@@ -43,8 +43,6 @@ public abstract class ControllerPeriodicTask extends
BasePeriodicTask {
private volatile boolean _stopPeriodicTask;
private volatile boolean _periodicTaskInProgress;
- protected int _numTablesProcessed;
-
public ControllerPeriodicTask(String taskName, long runFrequencyInSeconds,
long initialDelayInSeconds,
PinotHelixResourceManager pinotHelixResourceManager, ControllerMetrics
controllerMetrics) {
super(taskName, runFrequencyInSeconds, initialDelayInSeconds);
@@ -123,16 +121,28 @@ public abstract class ControllerPeriodicTask extends
BasePeriodicTask {
*/
protected void process(List<String> tableNamesWithType) {
if (!shouldStopPeriodicTask()) {
+
+ int numTablesProcessed = 0;
preprocess();
+
for (String tableNameWithType : tableNamesWithType) {
if (shouldStopPeriodicTask()) {
LOGGER.info("Skip processing table {} and all the remaining tables
for task {}.", tableNameWithType,
getTaskName());
break;
}
- processTable(tableNameWithType);
+ try {
+ processTable(tableNameWithType);
+ numTablesProcessed++;
+ } catch (Exception e) {
+ exceptionHandler(tableNameWithType, e);
+ }
}
+
postprocess();
+
_metricsRegistry.setValueOfGlobalGauge(ControllerGauge.PERIODIC_TASK_NUM_TABLES_PROCESSED,
getTaskName(),
+ numTablesProcessed);
+
} else {
LOGGER.info("Skip processing all tables for task {}", getTaskName());
}
@@ -141,9 +151,7 @@ public abstract class ControllerPeriodicTask extends
BasePeriodicTask {
/**
* This method runs before processing all tables
*/
- protected void preprocess() {
- _numTablesProcessed = 0;
- }
+ protected abstract void preprocess();
/**
* Execute the controller periodic task for the given table
@@ -154,10 +162,9 @@ public abstract class ControllerPeriodicTask extends
BasePeriodicTask {
/**
* This method runs after processing all tables
*/
- protected void postprocess() {
-
_metricsRegistry.setValueOfGlobalGauge(ControllerGauge.PERIODIC_TASK_NUM_TABLES_PROCESSED,
getTaskName(),
- _numTablesProcessed);
- }
+ protected abstract void postprocess();
+
+ protected abstract void exceptionHandler(String tableNameWithType, Exception
e);
@VisibleForTesting
protected boolean shouldStopPeriodicTask() {
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/relocation/RealtimeSegmentRelocator.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/relocation/RealtimeSegmentRelocator.java
index deb03a9..5cbcb48 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/relocation/RealtimeSegmentRelocator.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/relocation/RealtimeSegmentRelocator.java
@@ -69,25 +69,23 @@ public class RealtimeSegmentRelocator extends
ControllerPeriodicTask {
@Override
protected void preprocess() {
- super.preprocess();
}
@Override
protected void processTable(String tableNameWithType) {
- try {
- CommonConstants.Helix.TableType tableType =
TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
- if (tableType == CommonConstants.Helix.TableType.REALTIME) {
- runRelocation(tableNameWithType);
- _numTablesProcessed ++;
- }
- } catch (Exception e) {
- LOGGER.error("Exception in relocating realtime segments of table {}",
tableNameWithType, e);
+ CommonConstants.Helix.TableType tableType =
TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
+ if (tableType == CommonConstants.Helix.TableType.REALTIME) {
+ runRelocation(tableNameWithType);
}
}
@Override
protected void postprocess() {
- super.postprocess();
+ }
+
+ @Override
+ protected void exceptionHandler(String tableNameWithType, Exception e) {
+ LOGGER.error("Exception in relocating realtime segments of table {}",
tableNameWithType, e);
}
/**
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
index 4e493cc..53f9b35 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
@@ -71,60 +71,55 @@ public class RetentionManager extends
ControllerPeriodicTask {
@Override
protected void preprocess() {
- super.preprocess();
}
@Override
protected void processTable(String tableNameWithType) {
- try {
- LOGGER.info("Start managing retention for table: {}", tableNameWithType);
- manageRetentionForTable(tableNameWithType);
- _numTablesProcessed ++;
- } catch (Exception e) {
- LOGGER.error("Caught exception while managing retention for table: {}",
tableNameWithType, e);
- }
+ LOGGER.info("Start managing retention for table: {}", tableNameWithType);
+ manageRetentionForTable(tableNameWithType);
}
@Override
protected void postprocess() {
LOGGER.info("Removing aged (more than {} days) deleted segments for all
tables", _deletedSegmentsRetentionInDays);
_pinotHelixResourceManager.getSegmentDeletionManager().removeAgedDeletedSegments(_deletedSegmentsRetentionInDays);
- super.postprocess();
+ }
+
+ @Override
+ protected void exceptionHandler(String tableNameWithType, Exception e) {
+ LOGGER.error("Caught exception while managing retention for table: {}",
tableNameWithType, e);
}
private void manageRetentionForTable(String tableNameWithType) {
- try {
- // Build retention strategy from table config
- TableConfig tableConfig =
_pinotHelixResourceManager.getTableConfig(tableNameWithType);
- if (tableConfig == null) {
- LOGGER.error("Failed to get table config for table: {}",
tableNameWithType);
- return;
- }
- SegmentsValidationAndRetentionConfig validationConfig =
tableConfig.getValidationConfig();
- String segmentPushType = validationConfig.getSegmentPushType();
- if (!"APPEND".equalsIgnoreCase(segmentPushType)) {
- LOGGER.info("Segment push type is not APPEND for table: {}, skip",
tableNameWithType);
- return;
- }
- String retentionTimeUnit = validationConfig.getRetentionTimeUnit();
- String retentionTimeValue = validationConfig.getRetentionTimeValue();
- RetentionStrategy retentionStrategy;
- try {
- retentionStrategy = new
TimeRetentionStrategy(TimeUnit.valueOf(retentionTimeUnit.toUpperCase()),
- Long.parseLong(retentionTimeValue));
- } catch (Exception e) {
- LOGGER.warn("Invalid retention time: {} {} for table: {}, skip",
retentionTimeUnit, retentionTimeValue);
- return;
- }
- // Scan all segment ZK metadata and purge segments if necessary
- if (TableNameBuilder.OFFLINE.tableHasTypeSuffix(tableNameWithType)) {
- manageRetentionForOfflineTable(tableNameWithType, retentionStrategy);
- } else {
- manageRetentionForRealtimeTable(tableNameWithType, retentionStrategy);
- }
+ // Build retention strategy from table config
+ TableConfig tableConfig =
_pinotHelixResourceManager.getTableConfig(tableNameWithType);
+ if (tableConfig == null) {
+ LOGGER.error("Failed to get table config for table: {}",
tableNameWithType);
+ return;
+ }
+ SegmentsValidationAndRetentionConfig validationConfig =
tableConfig.getValidationConfig();
+ String segmentPushType = validationConfig.getSegmentPushType();
+ if (!"APPEND".equalsIgnoreCase(segmentPushType)) {
+ LOGGER.info("Segment push type is not APPEND for table: {}, skip",
tableNameWithType);
+ return;
+ }
+ String retentionTimeUnit = validationConfig.getRetentionTimeUnit();
+ String retentionTimeValue = validationConfig.getRetentionTimeValue();
+ RetentionStrategy retentionStrategy;
+ try {
+ retentionStrategy = new
TimeRetentionStrategy(TimeUnit.valueOf(retentionTimeUnit.toUpperCase()),
+ Long.parseLong(retentionTimeValue));
} catch (Exception e) {
- LOGGER.error("Caught exception while managing retention for table: {}",
tableNameWithType, e);
+ LOGGER.warn("Invalid retention time: {} {} for table: {}, skip",
retentionTimeUnit, retentionTimeValue);
+ return;
+ }
+
+ // Scan all segment ZK metadata and purge segments if necessary
+ if (TableNameBuilder.OFFLINE.tableHasTypeSuffix(tableNameWithType)) {
+ manageRetentionForOfflineTable(tableNameWithType, retentionStrategy);
+ } else {
+ manageRetentionForRealtimeTable(tableNameWithType, retentionStrategy);
}
}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/BrokerResourceValidationManager.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/BrokerResourceValidationManager.java
index ebcc5b6..c71a4ab 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/BrokerResourceValidationManager.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/BrokerResourceValidationManager.java
@@ -46,33 +46,31 @@ public class BrokerResourceValidationManager extends
ControllerPeriodicTask {
@Override
protected void preprocess() {
- super.preprocess();
_instanceConfigs = _pinotHelixResourceManager.getAllHelixInstanceConfigs();
}
@Override
protected void processTable(String tableNameWithType) {
- try {
- TableConfig tableConfig =
_pinotHelixResourceManager.getTableConfig(tableNameWithType);
- if (tableConfig == null) {
- LOGGER.warn("Failed to find table config for table: {}, skipping
broker resource validation", tableNameWithType);
- return;
- }
-
- // Rebuild broker resource
- Set<String> brokerInstances =
_pinotHelixResourceManager.getAllInstancesForBrokerTenant(_instanceConfigs,
- tableConfig.getTenantConfig().getBroker());
- _pinotHelixResourceManager.rebuildBrokerResource(tableNameWithType,
brokerInstances);
- _numTablesProcessed ++;
- } catch (Exception e) {
- LOGGER.warn("Caught exception while validating broker resource for
table: {}", tableNameWithType, e);
+ TableConfig tableConfig =
_pinotHelixResourceManager.getTableConfig(tableNameWithType);
+ if (tableConfig == null) {
+ LOGGER.warn("Failed to find table config for table: {}, skipping broker
resource validation", tableNameWithType);
+ return;
}
+
+ // Rebuild broker resource
+ Set<String> brokerInstances =
_pinotHelixResourceManager.getAllInstancesForBrokerTenant(_instanceConfigs,
+ tableConfig.getTenantConfig().getBroker());
+ _pinotHelixResourceManager.rebuildBrokerResource(tableNameWithType,
brokerInstances);
}
@Override
protected void postprocess() {
- super.postprocess();
+ }
+
+ @Override
+ protected void exceptionHandler(String tableNameWithType, Exception e) {
+ LOGGER.error("Caught exception while validating broker resource for table:
{}", tableNameWithType, e);
}
@Override
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/OfflineSegmentIntervalChecker.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/OfflineSegmentIntervalChecker.java
index 8610347..ca9563b 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/OfflineSegmentIntervalChecker.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/OfflineSegmentIntervalChecker.java
@@ -57,27 +57,19 @@ public class OfflineSegmentIntervalChecker extends
ControllerPeriodicTask {
@Override
protected void preprocess() {
- super.preprocess();
}
@Override
protected void processTable(String tableNameWithType) {
- try {
+ CommonConstants.Helix.TableType tableType =
TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
+ if (tableType == CommonConstants.Helix.TableType.OFFLINE) {
- CommonConstants.Helix.TableType tableType =
TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
- if (tableType == CommonConstants.Helix.TableType.OFFLINE) {
-
- TableConfig tableConfig =
_pinotHelixResourceManager.getTableConfig(tableNameWithType);
- if (tableConfig == null) {
- LOGGER.warn("Failed to find table config for table: {}, skipping
validation", tableNameWithType);
- return;
- }
-
- validateOfflineSegmentPush(tableConfig);
- _numTablesProcessed ++;
+ TableConfig tableConfig =
_pinotHelixResourceManager.getTableConfig(tableNameWithType);
+ if (tableConfig == null) {
+ LOGGER.warn("Failed to find table config for table: {}, skipping
validation", tableNameWithType);
+ return;
}
- } catch (Exception e) {
- LOGGER.warn("Caught exception while checking offline segment intervals
for table: {}", tableNameWithType, e);
+ validateOfflineSegmentPush(tableConfig);
}
}
@@ -215,7 +207,11 @@ public class OfflineSegmentIntervalChecker extends
ControllerPeriodicTask {
@Override
protected void postprocess() {
- super.postprocess();
+ }
+
+ @Override
+ protected void exceptionHandler(String tableNameWithType, Exception e) {
+ LOGGER.warn("Caught exception while checking offline segment intervals for
table: {}", tableNameWithType, e);
}
@Override
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
index 70fdbe9..a43d63b 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
@@ -67,7 +67,6 @@ public class RealtimeSegmentValidationManager extends
ControllerPeriodicTask {
@Override
protected void preprocess() {
- super.preprocess();
// Update realtime document counts only if certain time has passed after
previous run
_updateRealtimeDocumentCount = false;
long currentTimeMs = System.currentTimeMillis();
@@ -81,29 +80,24 @@ public class RealtimeSegmentValidationManager extends
ControllerPeriodicTask {
@Override
protected void processTable(String tableNameWithType) {
- try {
- CommonConstants.Helix.TableType tableType =
TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
- if (tableType == CommonConstants.Helix.TableType.REALTIME) {
-
- TableConfig tableConfig =
_pinotHelixResourceManager.getTableConfig(tableNameWithType);
- if (tableConfig == null) {
- LOGGER.warn("Failed to find table config for table: {}, skipping
validation", tableNameWithType);
- return;
- }
+ CommonConstants.Helix.TableType tableType =
TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
+ if (tableType == CommonConstants.Helix.TableType.REALTIME) {
- if (_updateRealtimeDocumentCount) {
- updateRealtimeDocumentCount(tableConfig);
- }
+ TableConfig tableConfig =
_pinotHelixResourceManager.getTableConfig(tableNameWithType);
+ if (tableConfig == null) {
+ LOGGER.warn("Failed to find table config for table: {}, skipping
validation", tableNameWithType);
+ return;
+ }
- Map<String, String> streamConfigMap =
tableConfig.getIndexingConfig().getStreamConfigs();
- StreamConfig streamConfig = new StreamConfig(streamConfigMap);
- if (streamConfig.hasLowLevelConsumerType()) {
- _llcRealtimeSegmentManager.ensureAllPartitionsConsuming(tableConfig);
- }
- _numTablesProcessed ++;
+ if (_updateRealtimeDocumentCount) {
+ updateRealtimeDocumentCount(tableConfig);
+ }
+
+ Map<String, String> streamConfigMap =
tableConfig.getIndexingConfig().getStreamConfigs();
+ StreamConfig streamConfig = new StreamConfig(streamConfigMap);
+ if (streamConfig.hasLowLevelConsumerType()) {
+ _llcRealtimeSegmentManager.ensureAllPartitionsConsuming(tableConfig);
}
- } catch (Exception e) {
- LOGGER.warn("Caught exception while validating realtime table: {}",
tableNameWithType, e);
}
}
@@ -155,7 +149,11 @@ public class RealtimeSegmentValidationManager extends
ControllerPeriodicTask {
@Override
protected void postprocess() {
- super.postprocess();
+ }
+
+ @Override
+ protected void exceptionHandler(String tableNameWithType, Exception e) {
+ LOGGER.error("Caught exception while validating realtime table: {}",
tableNameWithType, e);
}
@Override
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskTest.java
index 5c59221..e20cdc8 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskTest.java
@@ -73,7 +73,6 @@ public class ControllerPeriodicTaskTest {
@Override
public void processTable(String tableNameWithType) {
_tablesProcessed.getAndIncrement();
- _numTablesProcessed ++;
}
};
@@ -164,7 +163,6 @@ public class ControllerPeriodicTaskTest {
@Override
protected void preprocess() {
- super.preprocess();
}
@Override
@@ -174,9 +172,12 @@ public class ControllerPeriodicTaskTest {
@Override
public void postprocess() {
- super.postprocess();
}
+ @Override
+ public void exceptionHandler(String tableNameWithType, Exception e) {
+
+ }
@Override
public void stopTask() {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]