sajjad-moradi commented on code in PR #13668:
URL: https://github.com/apache/pinot/pull/13668#discussion_r1710442034
##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java:
##########
@@ -214,7 +230,7 @@ private long getPartitionOffsetLag(IngestionOffsets offset) {
    *
    * @param partitionGroupId partition ID which we should stop tracking.
    */
-  private void removePartitionId(int partitionGroupId) {
+  private synchronized void removePartitionId(int partitionGroupId) {
Review Comment:
Declaring these methods as synchronized definitely addresses the edge case
that you described. Keep in mind, though, that this scenario is very rare. A
consuming segment moving to another server is itself rare. For the described
scenario to happen, the consuming segment needs to move to another server and,
at the same time, the execution of the two threads (1. the one handling
consumption and 2. the one handling the metric-removal Helix message) has to
align.
Using synchronized on the threads handling consumption adds overhead and delays
consumption a bit, especially if there are many partitions on the same server. I
suggest we block only where the edge case can occur, and not on the regular
happy path of consumption / updating metrics.
We can do that by using an object-level lock, acquiring it inside the
`stopTrackingPartitionIngestionDelay` method and also where the partition is
added to the gauge metric inside the `updateIngestionDelay` method:
```java
public void stopTrackingPartitionIngestionDelay(String segmentName) {
  _lock.lock();
  try {
    _segmentsToIgnore.put(segmentName, true);
    removePartitionId(new LLCSegmentName(segmentName).getPartitionGroupId());
  } finally {
    // Release the lock even if removePartitionId throws
    _lock.unlock();
  }
}

void updateIngestionDelay(long ingestionTimeMs, long firstStreamIngestionTimeMs, int partitionGroupId) {
  if ((ingestionTimeMs < 0) && (firstStreamIngestionTimeMs < 0)) {
    // If the stream does not return valid ingestion timestamps, don't publish a metric
    return;
  }
  IngestionTimestamps previousMeasure = _partitionToIngestionTimestampsMap.put(partitionGroupId,
      new IngestionTimestamps(ingestionTimeMs, firstStreamIngestionTimeMs));
  if (previousMeasure == null) {
    _lock.lock(); //<<---- HERE
    try {
      // First time we start tracking a partition we should start tracking it via metric
      // Only publish the metric if supported by the underlying stream. If not supported the stream
      // returns Long.MIN_VALUE
      if (ingestionTimeMs >= 0) {
        // Pseudocode condition
        if (_segmentsToIgnore does not contain segment for partitionId) { //<<---- HERE
          _serverMetrics.setOrUpdatePartitionGauge(_metricName, partitionGroupId,
              ServerGauge.REALTIME_INGESTION_DELAY_MS, () -> getPartitionIngestionDelayMs(partitionGroupId));
        }
      }
      if (firstStreamIngestionTimeMs >= 0) {
        // Pseudocode condition
        if (_segmentsToIgnore does not contain segment for partitionId) { //<<---- HERE
          // Only publish this metric when creation time is supported by the underlying stream
          // When this timestamp is not supported it always returns the value Long.MIN_VALUE
          _serverMetrics.setOrUpdatePartitionGauge(_metricName, partitionGroupId,
              ServerGauge.END_TO_END_REALTIME_INGESTION_DELAY_MS,
              () -> getPartitionEndToEndIngestionDelayMs(partitionGroupId));
        }
      }
    } finally {
      _lock.unlock(); //<<---- HERE
    }
  }
}
```
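To make the locking pattern easier to reason about in isolation, here is a minimal self-contained sketch of the same idea. It is not the Pinot code: `IngestionGaugeSketch`, `_partitionsToIgnore`, `_registeredGauges`, `stopTracking` and `isGaugeRegistered` are hypothetical stand-ins (an in-memory set replaces `_serverMetrics`, and it tracks partition IDs rather than segment names). The regular update path only touches a `ConcurrentHashMap`; the lock is taken only the first time a partition is seen, and on removal.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical stand-in for the tracker; all names here are illustrative only.
class IngestionGaugeSketch {
  private final ReentrantLock _lock = new ReentrantLock();
  private final Map<Integer, Long> _partitionToIngestionTimeMs = new ConcurrentHashMap<>();
  // Assumption: partitions whose removal has been requested are remembered here.
  private final Set<Integer> _partitionsToIgnore = ConcurrentHashMap.newKeySet();
  // Assumption: stands in for the gauges registered with the metrics registry.
  private final Set<Integer> _registeredGauges = ConcurrentHashMap.newKeySet();

  // Called by consuming threads. Lock-free unless the partition is new to the map.
  void updateIngestionDelay(long ingestionTimeMs, int partitionId) {
    if (ingestionTimeMs < 0) {
      return; // no valid timestamp, nothing to publish
    }
    Long previous = _partitionToIngestionTimeMs.put(partitionId, ingestionTimeMs);
    if (previous == null) {
      _lock.lock();
      try {
        // Register the gauge only if removal has not been requested for this partition.
        if (!_partitionsToIgnore.contains(partitionId)) {
          _registeredGauges.add(partitionId);
        }
      } finally {
        _lock.unlock();
      }
    }
  }

  // Called by the thread handling the removal message.
  void stopTracking(int partitionId) {
    _lock.lock();
    try {
      _partitionsToIgnore.add(partitionId);
      _partitionToIngestionTimeMs.remove(partitionId);
      _registeredGauges.remove(partitionId);
    } finally {
      _lock.unlock();
    }
  }

  boolean isGaugeRegistered(int partitionId) {
    return _registeredGauges.contains(partitionId);
  }
}
```

With this shape, an update that arrives after `stopTracking` for the same partition takes the lock once, sees the partition in the ignore set, and does not re-register the gauge. Subsequent updates for that partition find an existing map entry and stay lock-free.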
WDYT?