This is an automated email from the ASF dual-hosted git repository.
nehapawar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new e2ba55bed5 Null check for partitionGroupSmallestOffset and metric for failure (#9020)
e2ba55bed5 is described below
commit e2ba55bed54cf497cf87fd955d61224e0030c874
Author: Neha Pawar <[email protected]>
AuthorDate: Wed Jul 6 09:27:19 2022 -0700
Null check for partitionGroupSmallestOffset and metric for failure (#9020)
---
.../etc/jmx_prometheus_javaagent/configs/controller.yml | 7 +++++++
.../apache/pinot/common/metrics/ControllerMeter.java | 1 +
.../helix/core/periodictask/ControllerPeriodicTask.java | 2 ++
.../core/realtime/PinotLLCRealtimeSegmentManager.java | 17 ++++++++++++-----
4 files changed, 22 insertions(+), 5 deletions(-)
diff --git a/docker/images/pinot/etc/jmx_prometheus_javaagent/configs/controller.yml b/docker/images/pinot/etc/jmx_prometheus_javaagent/configs/controller.yml
index f8d116713f..2d98e22cd2 100644
--- a/docker/images/pinot/etc/jmx_prometheus_javaagent/configs/controller.yml
+++ b/docker/images/pinot/etc/jmx_prometheus_javaagent/configs/controller.yml
@@ -117,6 +117,13 @@ rules:
labels:
table: "$1"
tableType: "$2"
+- pattern: "\"org.apache.pinot.common.metrics\"<type=\"ControllerMetrics\", name=\"pinot.controller.(\\w+)_(\\w+).(\\w+).periodicTaskError\"><>(\\w+)"
+ name: "pinot_controller_periodicTaskError_$4"
+ cache: true
+ labels:
+ table: "$1"
+ tableType: "$2"
+ periodicTask: "$3"
- pattern: "\"org.apache.pinot.common.metrics\"<type=\"ControllerMetrics\", name=\"pinot.controller.tableStorageQuotaUtilization.(\\w+)_(\\w+)\"><>(\\w+)"
name: "pinot_controller_tableStorageQuotaUtilization_$3"
cache: true
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerMeter.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerMeter.java
index 9fd0e7dd17..e4c634cf30 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerMeter.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerMeter.java
@@ -46,6 +46,7 @@ public enum ControllerMeter implements AbstractMetrics.Meter {
LLC_STREAM_DATA_LOSS("dataLoss", false),
CONTROLLER_PERIODIC_TASK_RUN("periodicTaskRun", false),
CONTROLLER_PERIODIC_TASK_ERROR("periodicTaskError", false),
+ PERIODIC_TASK_ERROR("periodicTaskError", false),
NUMBER_TIMES_SCHEDULE_TASKS_CALLED("tasks", true),
NUMBER_TASKS_SUBMITTED("tasks", false),
NUMBER_SEGMENT_UPLOAD_TIMEOUT_EXCEEDED("SegmentUploadTimeouts", true),
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java
index 0e737b2e16..3067de8268 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java
@@ -116,6 +116,8 @@ public abstract class ControllerPeriodicTask<C> extends BasePeriodicTask {
processTable(tableNameWithType, context);
} catch (Exception e) {
LOGGER.error("Caught exception while processing table: {} in task: {}", tableNameWithType, _taskName, e);
+ _controllerMetrics.addMeteredTableValue(tableNameWithType + "." + _taskName,
+ ControllerMeter.PERIODIC_TASK_ERROR, 1L);
}
numTablesProcessed++;
}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index 7c1e5872f0..7cb579b83a 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -1215,12 +1215,18 @@ public class PinotLLCRealtimeSegmentManager {
StreamPartitionMsgOffset partitionGroupSmallestOffset =
getPartitionGroupSmallestOffset(streamConfig, partitionGroupId);
- // Start offset must be higher than the start offset of the stream
- if (partitionGroupSmallestOffset.compareTo(startOffset) > 0) {
- LOGGER.error("Data lost from offset: {} to: {} for partition: {} of table: {}", startOffset,
- partitionGroupSmallestOffset, partitionGroupId, tableConfig.getTableName());
+ if (partitionGroupSmallestOffset != null) {
+ // Start offset must be higher than the start offset of the stream
+ if (partitionGroupSmallestOffset.compareTo(startOffset) > 0) {
+ LOGGER.error("Data lost from offset: {} to: {} for partition: {} of table: {}", startOffset,
+ partitionGroupSmallestOffset, partitionGroupId, tableConfig.getTableName());
+ _controllerMetrics.addMeteredTableValue(tableConfig.getTableName(), ControllerMeter.LLC_STREAM_DATA_LOSS, 1L);
+ startOffset = partitionGroupSmallestOffset;
+ }
+ } else {
+ LOGGER.error("Smallest offset for partition: {} of table: {} not found. Using startOffset: {}", partitionGroupId,
+ tableConfig.getTableName(), startOffset);
_controllerMetrics.addMeteredTableValue(tableConfig.getTableName(), ControllerMeter.LLC_STREAM_DATA_LOSS, 1L);
- startOffset = partitionGroupSmallestOffset;
}
CommittingSegmentDescriptor committingSegmentDescriptor =
@@ -1232,6 +1238,7 @@ public class PinotLLCRealtimeSegmentManager {
instancePartitionsMap);
}
+ @Nullable
private StreamPartitionMsgOffset getPartitionGroupSmallestOffset(StreamConfig streamConfig, int partitionGroupId) {
OffsetCriteria originalOffsetCriteria = streamConfig.getOffsetCriteria();
streamConfig.setOffsetCriteria(OffsetCriteria.SMALLEST_OFFSET_CRITERIA);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]