sajjad-moradi commented on code in PR #8911:
URL: https://github.com/apache/pinot/pull/8911#discussion_r922888568
##########
pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java:
##########
@@ -195,19 +198,35 @@ private DataTable processQueryInternal(ServerQueryRequest
queryRequest, Executor
// Gather stats for realtime consuming segments
int numConsumingSegments = 0;
- long minIndexTimeMs = Long.MAX_VALUE;
- long minIngestionTimeMs = Long.MAX_VALUE;
- for (IndexSegment indexSegment : indexSegments) {
- if (indexSegment instanceof MutableSegment) {
- numConsumingSegments += 1;
- SegmentMetadata segmentMetadata = indexSegment.getSegmentMetadata();
- long indexTimeMs = segmentMetadata.getLastIndexedTimestamp();
- if (indexTimeMs != Long.MIN_VALUE && indexTimeMs < minIndexTimeMs) {
- minIndexTimeMs = indexTimeMs;
- }
- long ingestionTimeMs = segmentMetadata.getLatestIngestionTimestamp();
- if (ingestionTimeMs != Long.MIN_VALUE && ingestionTimeMs <
minIngestionTimeMs) {
- minIngestionTimeMs = ingestionTimeMs;
+ int numOnlineSegments = 0;
+ long minIndexTimeMs = 0;
+ long minIngestionTimeMs = 0;
+ long maxEndTimeMs = 0;
+ if (tableDataManager instanceof RealtimeTableDataManager) {
+ numConsumingSegments = 0;
+ numOnlineSegments = 0;
+ minIndexTimeMs = Long.MAX_VALUE;
+ minIngestionTimeMs = Long.MAX_VALUE;
+ maxEndTimeMs = Long.MIN_VALUE;
+ for (IndexSegment indexSegment : indexSegments) {
+ if (indexSegment instanceof MutableSegment) {
+ numConsumingSegments += 1;
+ SegmentMetadata segmentMetadata = indexSegment.getSegmentMetadata();
+ long indexTimeMs = segmentMetadata.getLastIndexedTimestamp();
+ if (indexTimeMs != Long.MIN_VALUE && indexTimeMs < minIndexTimeMs) {
+ minIndexTimeMs = indexTimeMs;
+ }
+ long ingestionTimeMs = segmentMetadata.getLatestIngestionTimestamp();
+ if (ingestionTimeMs != Long.MIN_VALUE && ingestionTimeMs <
minIngestionTimeMs) {
+ minIngestionTimeMs = ingestionTimeMs;
+ }
+ } else if (indexSegment instanceof ImmutableSegment) {
+ SegmentMetadata segmentMetadata = indexSegment.getSegmentMetadata();
+ Interval timeInterval = segmentMetadata.getTimeInterval();
+ numOnlineSegments++;
+ if (timeInterval != null) {
+ maxEndTimeMs = Math.max(maxEndTimeMs, timeInterval.getEndMillis());
Review Comment:
I was actually talking about immutable segments, not mutable ones!
The values of the `startTimeStr` and `endTimeStr` variables are taken
from the min and max values of the time column:
```java
SegmentColumnarIndexCreator.writeMetadata() {
...
if (_config.getStartTime() != null) {
startTime = Long.parseLong(_config.getStartTime());
endTime = Long.parseLong(_config.getEndTime());
timeUnit =
Preconditions.checkNotNull(_config.getSegmentTimeUnit());
} else {
if (_totalDocs > 0) {
HERE-> String startTimeStr =
timeColumnIndexCreationInfo.getMin().toString();
HERE-> String endTimeStr =
timeColumnIndexCreationInfo.getMax().toString();
if (_config.getTimeColumnType() ==
SegmentGeneratorConfig.TimeColumnType.SIMPLE_DATE) {
// For TimeColumnType.SIMPLE_DATE_FORMAT, convert time value
into millis since epoch
// Use DateTimeFormatter from DateTimeFormatSpec to handle
default time zone consistently.
DateTimeFormatSpec formatSpec =
_config.getDateTimeFormatSpec();
Preconditions.checkNotNull(formatSpec, "DateTimeFormatSpec
must exist for SimpleDate");
DateTimeFormatter dateTimeFormatter =
formatSpec.getDateTimeFormatter();
startTime = dateTimeFormatter.parseMillis(startTimeStr);
endTime = dateTimeFormatter.parseMillis(endTimeStr);
timeUnit = TimeUnit.MILLISECONDS;
} else {
// by default, time column type is TimeColumnType.EPOCH
startTime = Long.parseLong(startTimeStr);
endTime = Long.parseLong(endTimeStr);
timeUnit =
Preconditions.checkNotNull(_config.getSegmentTimeUnit());
}
} else {
// No records in segment. Use current time as start/end
long now = System.currentTimeMillis();
if (_config.getTimeColumnType() ==
SegmentGeneratorConfig.TimeColumnType.SIMPLE_DATE) {
startTime = now;
endTime = now;
timeUnit = TimeUnit.MILLISECONDS;
} else {
timeUnit =
Preconditions.checkNotNull(_config.getSegmentTimeUnit());
startTime = timeUnit.convert(now, TimeUnit.MILLISECONDS);
endTime = timeUnit.convert(now, TimeUnit.MILLISECONDS);
}
}
}
if (!_config.isSkipTimeValueCheck()) {
Interval timeInterval =
new Interval(timeUnit.toMillis(startTime),
timeUnit.toMillis(endTime), DateTimeZone.UTC);
Preconditions.checkState(TimeUtils.isValidTimeInterval(timeInterval),
"Invalid segment start/end time: %s (in millis: %s/%s) for
time column: %s, must be between: %s",
timeInterval, timeInterval.getStartMillis(),
timeInterval.getEndMillis(), timeColumnName,
TimeUtils.VALID_TIME_INTERVAL);
}
HERE-> properties.setProperty(SEGMENT_START_TIME, startTime);
HERE-> properties.setProperty(SEGMENT_END_TIME, endTime);
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]