This is an automated email from the ASF dual-hosted git repository.
nehapawar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 4ad049f Fix idle count bug in realtime consumption (#4327)
4ad049f is described below
commit 4ad049faf874b1726806455fda31d7a9d93da207
Author: Neha Pawar <[email protected]>
AuthorDate: Mon Jun 17 17:08:32 2019 -0700
Fix idle count bug in realtime consumption (#4327)
---
.../core/data/manager/realtime/LLRealtimeSegmentDataManager.java | 7 ++++---
1 file changed, 4 insertions(+), 3 deletions(-)
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
index 59d2bb8..61ed73c 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
@@ -344,7 +344,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
final long maxIdleCountBeforeStatUpdate = (3 * 60 * 1000) /
(idlePipeSleepTimeMillis + _partitionLevelStreamConfig
.getFetchTimeoutMillis()); // 3 minute count
long lastUpdatedOffset = _currentOffset; // so that we always update the metric when we enter this method.
- long idleCount = 0;
+ long consecutiveIdleCount = 0;
// At this point, we know that we can potentially move the offset, so the old saved segment file is not valid
// anymore. Remove the file if it exists.
removeSegmentFile();
@@ -378,6 +378,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
processStreamEvents(messageBatch, idlePipeSleepTimeMillis);
if (_currentOffset != lastUpdatedOffset) {
+ consecutiveIdleCount = 0;
// We consumed something. Update the highest stream offset as well as partition-consuming metric.
_serverMetrics.setValueOfTableGauge(_metricKeyName,
ServerGauge.HIGHEST_KAFKA_OFFSET_CONSUMED, _currentOffset);
_serverMetrics.setValueOfTableGauge(_metricKeyName,
ServerGauge.HIGHEST_STREAM_OFFSET_CONSUMED, _currentOffset);
@@ -386,9 +387,9 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
} else {
// We did not consume any rows. Update the partition-consuming metric only if we have been idling for a long time.
// Create a new stream consumer wrapper, in case we are stuck on something.
- if (++idleCount > maxIdleCountBeforeStatUpdate) {
+ if (++consecutiveIdleCount > maxIdleCountBeforeStatUpdate) {
_serverMetrics.setValueOfTableGauge(_metricKeyName,
ServerGauge.LLC_PARTITION_CONSUMING, 1);
- idleCount = 0;
+ consecutiveIdleCount = 0;
makeStreamConsumer("Idle for too long");
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]