This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 1fe72ae1d3 Minor Refactoring and fixes (#15419)
1fe72ae1d3 is described below
commit 1fe72ae1d3ecb0dfcd9df39edf26cd95abff58fd
Author: NOOB <[email protected]>
AuthorDate: Tue Apr 15 23:46:55 2025 +0530
Minor Refactoring and fixes (#15419)
---
.../manager/realtime/RealtimeSegmentDataManager.java | 16 +++++++++-------
.../manager/realtime/RealtimeTableDataManager.java | 8 +++-----
.../realtime/RealtimeSegmentDataManagerTest.java | 20 +++++++++++---------
3 files changed, 23 insertions(+), 21 deletions(-)
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
index c25ce54f72..1293f3345d 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
@@ -311,13 +311,12 @@ public class RealtimeSegmentDataManager extends
SegmentDataManager {
private final AtomicLong _lastUpdatedRowsIndexed = new AtomicLong(0);
private final String _instanceId;
private final ServerSegmentCompletionProtocolHandler _protocolHandler;
- private final long _consumeStartTime;
private final StreamPartitionMsgOffset _startOffset;
private final StreamConfig _streamConfig;
private RowMetadata _lastRowMetadata;
private long _lastConsumedTimestampMs = -1;
-
+ private long _consumeStartTime = -1;
private long _lastLogTime = 0;
private int _lastConsumedCount = 0;
private String _stopReason = null;
@@ -747,6 +746,10 @@ public class RealtimeSegmentDataManager extends
SegmentDataManager {
_segmentLogger.info("Acquired consumer semaphore.");
+ _consumeStartTime = now();
+ _segmentLogger.info("Starting consumption on segment: {}, maxRowCount:
{}, maxEndTime: {}.", _llcSegmentName,
+ _segmentMaxRowCount, new DateTime(_consumeEndTime,
DateTimeZone.UTC));
+
// TODO:
// When reaching here, the current consuming segment has already
acquired the consumer semaphore, but there is
// no guarantee that the previous consuming segment is already
persisted (replaced with immutable segment). It
@@ -1503,6 +1506,9 @@ public class RealtimeSegmentDataManager extends
SegmentDataManager {
private boolean catchupToFinalOffset(StreamPartitionMsgOffset endOffset,
long timeoutMs) {
_finalOffset = endOffset;
+ if (_consumeStartTime == -1) {
+ _consumeStartTime = now();
+ }
_consumeEndTime = now() + timeoutMs;
_state = State.CONSUMING_TO_ONLINE;
_shouldStop = false;
@@ -1721,17 +1727,13 @@ public class RealtimeSegmentDataManager extends
SegmentDataManager {
}
_state = State.INITIAL_CONSUMING;
_latestStreamOffsetAtStartupTime = fetchLatestStreamOffset(5000);
- _consumeStartTime = now();
- setConsumeEndTime(segmentZKMetadata, _consumeStartTime);
+ setConsumeEndTime(segmentZKMetadata, now());
_segmentCommitterFactory =
new SegmentCommitterFactory(_segmentLogger, _protocolHandler,
tableConfig, indexLoadingConfig, serverMetrics);
- _segmentLogger.info("Starting consumption on realtime consuming segment
{} maxRowCount {} maxEndTime {}",
- llcSegmentName, _segmentMaxRowCount, new DateTime(_consumeEndTime,
DateTimeZone.UTC));
} catch (Throwable t) {
// In case of exception thrown here, segment goes to ERROR state. Then
any attempt to reset the segment from
// ERROR -> OFFLINE -> CONSUMING via Helix Admin fails because the
semaphore is acquired, but not released.
// Hence releasing the semaphore here to unblock reset operation via
Helix Admin.
- releaseConsumerSemaphore();
_realtimeTableDataManager.addSegmentError(_segmentNameStr, new
SegmentErrorInfo(now(),
"Failed to initialize segment data manager", t));
_segmentLogger.warn(
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
index 0db35b57a4..b59e3a58e6 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
@@ -519,14 +519,12 @@ public class RealtimeTableDataManager extends
BaseTableDataManager {
private void doAddConsumingSegment(String segmentName)
throws AttemptsExceededException, RetriableOperationException {
SegmentZKMetadata zkMetadata = fetchZKMetadata(segmentName);
- if (!_enforceConsumptionInOrder && zkMetadata.getStatus().isCompleted()) {
+ if (zkMetadata.getStatus().isCompleted()) {
// NOTE:
- // 1. When consumption order is enforced, we always create the
RealtimeSegmentDataManager to coordinate the
- // consumption.
- // 2. When segment is COMMITTING (for pauseless consumption), we still
create the RealtimeSegmentDataManager
+ // 1. When segment is COMMITTING (for pauseless consumption), we still
create the RealtimeSegmentDataManager
// because there is no guarantee that the segment will be committed
soon. This way the slow server can still
// catch up.
- // 3. We do not throw exception here because the segment might have just
been committed before the state
+ // 2. We do not throw exception here because the segment might have just
been committed before the state
// transition is processed. We can skip adding this segment, and the
segment will enter CONSUMING state in
// Helix, then we can rely on the following CONSUMING -> ONLINE state
transition to add it.
_logger.warn("Segment: {} is already completed, skipping adding it as
CONSUMING segment", segmentName);
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManagerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManagerTest.java
index 429f77ec53..a3ac5c906c 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManagerTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManagerTest.java
@@ -785,8 +785,9 @@ public class RealtimeSegmentDataManagerTest {
@Override
public Long get() {
long now = System.currentTimeMillis();
- // now() is called once in the run() method, once before each batch
reading and once for every row indexation
- if (_timeCheckCounter.incrementAndGet() <=
FakeStreamConfigUtils.SEGMENT_FLUSH_THRESHOLD_ROWS + 4) {
+ // now() is called once in the run() method, then once on setting
consumeStartTime, once before each batch
+ // reading and once for every row indexation
+ if (_timeCheckCounter.incrementAndGet() <=
FakeStreamConfigUtils.SEGMENT_FLUSH_THRESHOLD_ROWS + 5) {
return now;
}
// Exceed segment time threshold
@@ -810,10 +811,10 @@ public class RealtimeSegmentDataManagerTest {
consumer.run();
- // millis() is called first in run before consumption, then once for
each batch and once for each message in
- // the batch, then once more when metrics are updated after each batch
is processed and then 4 more times in
- // run() after consume loop
- Assert.assertEquals(timeSupplier._timeCheckCounter.get(),
FakeStreamConfigUtils.SEGMENT_FLUSH_THRESHOLD_ROWS + 8);
+ // millis() is called first in run before consumption, then once on
setting consumeStartTime, then once for
+ // each batch and once for each message in the batch, then once more
when metrics are updated after each batch
+ // is processed and then 4 more times in run() after consume loop
+ Assert.assertEquals(timeSupplier._timeCheckCounter.get(),
FakeStreamConfigUtils.SEGMENT_FLUSH_THRESHOLD_ROWS + 9);
Assert.assertEquals(((LongMsgOffset)
segmentDataManager.getCurrentOffset()).getOffset(),
START_OFFSET_VALUE +
FakeStreamConfigUtils.SEGMENT_FLUSH_THRESHOLD_ROWS);
Assert.assertEquals(segmentDataManager.getSegment().getNumDocsIndexed(),
@@ -845,9 +846,10 @@ public class RealtimeSegmentDataManagerTest {
consumer.run();
- // millis() is called first in run before consumption, then once for
each batch and once for each message in
- // the batch, then once for metrics updates and then 4 more times in
run() after consume loop
- Assert.assertEquals(timeSupplier._timeCheckCounter.get(),
FakeStreamConfigUtils.SEGMENT_FLUSH_THRESHOLD_ROWS + 6);
+ // millis() is called first in run before consumption, then once on
setting consumeStartTime, then once for
+ // each batch and once for each message in the batch, then once for
metrics updates and then 4 more times in
+ // run() after consume loop
+ Assert.assertEquals(timeSupplier._timeCheckCounter.get(),
FakeStreamConfigUtils.SEGMENT_FLUSH_THRESHOLD_ROWS + 7);
Assert.assertEquals(((LongMsgOffset)
segmentDataManager.getCurrentOffset()).getOffset(),
START_OFFSET_VALUE +
FakeStreamConfigUtils.SEGMENT_FLUSH_THRESHOLD_ROWS);
Assert.assertEquals(segmentDataManager.getSegment().getNumDocsIndexed(),
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]