This is an automated email from the ASF dual-hosted git repository.
jlli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 127f4c3 Add select segments API (#7651)
127f4c3 is described below
commit 127f4c3b6966686ae984ba47dbd01a4432184baf
Author: Jialiang Li <[email protected]>
AuthorDate: Thu Oct 28 11:17:55 2021 -0700
Add select segments API (#7651)
Co-authored-by: Jack Li(Analytics Engineering) <[email protected]>
---
.../api/resources/PinotSegmentRestletResource.java | 41 +++++++++++++
.../helix/core/PinotHelixResourceManager.java | 68 ++++++++++++++++++++++
2 files changed, 109 insertions(+)
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
index 2628f01..ba569d5 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
@@ -21,6 +21,7 @@ package org.apache.pinot.controller.api.resources;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiParam;
@@ -657,6 +658,46 @@ public class PinotSegmentRestletResource {
return segmentsMetadata;
}
+ @GET
+ @Path("segments/{tableName}/select")
+ @Produces(MediaType.APPLICATION_JSON)
+ @ApiOperation(value = "Get the selected segments given the (inclusive) start
and (exclusive) end timestamps"
+ + " in milliseconds. These timestamps will be compared against the
minmax values of the time column in each"
+ + " segment. If the table is a refresh use case, the value of start and
end timestamp is voided,"
+ + " since there is no time column for refresh use case; instead, the
whole qualified segments will be returned."
+ + " If no timestamps are provided, all the qualified segments will be
returned."
+ + " For the segments that partially belong to the time range, the
boolean flag 'excludeOverlapping' is introduced"
+ + " in order for user to determine whether to exclude this kind of
segments in the response.",
+ notes = "Get the selected segments given the start and end timestamps in
milliseconds")
+ public List<Map<TableType, List<String>>> getSelectedSegments(
+ @ApiParam(value = "Name of the table", required = true)
@PathParam("tableName") String tableName,
+ @ApiParam(value = "OFFLINE|REALTIME") @QueryParam("type") String
tableTypeStr,
+ @ApiParam(value = "Start timestamp (inclusive)")
@QueryParam("startTimestamp") @DefaultValue("")
+ String startTimestampStr,
+ @ApiParam(value = "End timestamp (exclusive)")
@QueryParam("endTimestamp") @DefaultValue("")
+ String endTimestampStr,
+ @ApiParam(value = "Whether to exclude the segments overlapping with the
timestamps, false by default")
+ @QueryParam("excludeOverlapping") @DefaultValue("false") boolean
excludeOverlapping) {
+ long startTimestamp = Strings.isNullOrEmpty(startTimestampStr) ?
Long.MIN_VALUE : Long.parseLong(startTimestampStr);
+ long endTimestamp = Strings.isNullOrEmpty(endTimestampStr) ?
Long.MAX_VALUE : Long.parseLong(endTimestampStr);
+ Preconditions.checkArgument(startTimestamp < endTimestamp,
+ "The value of startTimestamp should be smaller than the one of
endTimestamp. Start timestamp: %d. End "
+ + "timestamp: %d",
+ startTimestamp, endTimestamp);
+
+ List<String> tableNamesWithType = ResourceUtils
+ .getExistingTableNamesWithType(_pinotHelixResourceManager, tableName,
Constants.validateTableType(tableTypeStr),
+ LOGGER);
+ List<Map<TableType, List<String>>> resultList = new
ArrayList<>(tableNamesWithType.size());
+ for (String tableNameWithType : tableNamesWithType) {
+ TableType tableType =
TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
+ List<String> segments = _pinotHelixResourceManager
+ .getSegmentsForTableWithTimestamps(tableNameWithType,
startTimestamp, endTimestamp, excludeOverlapping);
+ resultList.add(Collections.singletonMap(tableType, segments));
+ }
+ return resultList;
+ }
+
/**
* This is a helper method to get the metadata for all segments for a given
table name.
* @param tableNameWithType name of the table along with its type
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index 8686671..189c02c 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -593,6 +593,74 @@ public class PinotHelixResourceManager {
return ZKMetadataProvider.getSegments(_propertyStore, tableNameWithType);
}
+  /**
+   * Returns the segments for the given table based on the start and end timestamp.
+   *
+   * <p>When both bounds are the unbounded sentinels ({@code Long.MIN_VALUE} and {@code Long.MAX_VALUE}) every
+   * segment of the table is a candidate; otherwise each segment's ZK metadata is checked against the window.
+   * The candidates are then filtered based on the table's segment lineage, if a lineage record is present.
+   *
+   * @param tableNameWithType  Table name with type suffix
+   * @param startTimestamp  start timestamp in milliseconds (inclusive)
+   * @param endTimestamp  end timestamp in milliseconds (exclusive)
+   * @param excludeOverlapping  whether to exclude the segments overlapping with the timestamps
+   * @return a new list with the selected segment names, in no particular order
+   */
+  public List<String> getSegmentsForTableWithTimestamps(String tableNameWithType, long startTimestamp,
+      long endTimestamp, boolean excludeOverlapping) {
+    List<String> selectedSegments;
+    // If no start and end timestamp specified, just select all the segments.
+    if (startTimestamp == Long.MIN_VALUE && endTimestamp == Long.MAX_VALUE) {
+      selectedSegments = getSegmentsFor(tableNameWithType);
+    } else {
+      selectedSegments = new ArrayList<>();
+      List<SegmentZKMetadata> segmentZKMetadataList = getSegmentsZKMetadata(tableNameWithType);
+      for (SegmentZKMetadata segmentZKMetadata : segmentZKMetadataList) {
+        if (isSegmentWithinTimeStamps(segmentZKMetadata, startTimestamp, endTimestamp, excludeOverlapping)) {
+          selectedSegments.add(segmentZKMetadata.getSegmentName());
+        }
+      }
+    }
+    Set<String> selectedSegmentSet = new HashSet<>(selectedSegments);
+    // Fetch the segment lineage metadata, and filter segments based on segment lineage.
+    ZNRecord segmentLineageZNRecord =
+        SegmentLineageAccessHelper.getSegmentLineageZNRecord(_propertyStore, tableNameWithType);
+    // NOTE(review): the lineage record is presumably null when no lineage has been written for the table;
+    // skip the lineage filter in that case instead of handing null to SegmentLineage.fromZNRecord.
+    if (segmentLineageZNRecord != null) {
+      SegmentLineage segmentLineage = SegmentLineage.fromZNRecord(segmentLineageZNRecord);
+      SegmentLineageUtils.filterSegmentsBasedOnLineageInPlace(selectedSegmentSet, segmentLineage);
+    }
+    return new ArrayList<>(selectedSegmentSet);
+  }
+
+  /**
+   * Decides whether a segment should be selected for the window {@code [startTimestamp, endTimestamp)}.
+   *
+   * <ul>
+   *   <li>{@code null} metadata, or a segment whose start time is after its end time: not selected.</li>
+   *   <li>Start and end time both {@code -1}: always selected.</li>
+   *   <li>Segment fully inside the window: selected.</li>
+   *   <li>Segment fully outside the window: not selected.</li>
+   *   <li>Segment partially overlapping the window: selected unless {@code excludeOverlapping} is set.</li>
+   * </ul>
+   *
+   * @param segmentMetadata  the segmentMetadata associated with the segment
+   * @param startTimestamp  start timestamp (inclusive)
+   * @param endTimestamp  end timestamp (exclusive)
+   * @param excludeOverlapping  whether to exclude the segments overlapping with the timestamps
+   * @return {@code true} if the segment should be part of the selection
+   */
+  private boolean isSegmentWithinTimeStamps(SegmentZKMetadata segmentMetadata, long startTimestamp,
+      long endTimestamp, boolean excludeOverlapping) {
+    if (segmentMetadata == null) {
+      return false;
+    }
+    long segmentStartMs = segmentMetadata.getStartTimeMs();
+    long segmentEndMs = segmentMetadata.getEndTimeMs();
+    // Both values at -1: no time column specified in the metadata and no minmax value either.
+    boolean hasNoTimeInfo = segmentStartMs == -1 && segmentEndMs == -1;
+    if (hasNoTimeInfo) {
+      return true;
+    }
+    if (segmentEndMs < segmentStartMs) {
+      LOGGER.warn("Invalid start and end time for segment: {}. Start time: {}. End time: {}",
+          segmentMetadata.getSegmentName(), segmentStartMs, segmentEndMs);
+      return false;
+    }
+    boolean fullyInsideWindow = segmentStartMs >= startTimestamp && segmentEndMs < endTimestamp;
+    if (fullyInsideWindow) {
+      return true;
+    }
+    boolean fullyOutsideWindow = segmentEndMs < startTimestamp || segmentStartMs >= endTimestamp;
+    // What remains is a partial overlap, which is only kept when the caller did not ask to exclude it.
+    return !fullyOutsideWindow && !excludeOverlapping;
+  }
+
@Nullable
public SegmentZKMetadata getSegmentZKMetadata(String tableNameWithType,
String segmentName) {
return ZKMetadataProvider.getSegmentZKMetadata(_propertyStore,
tableNameWithType, segmentName);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]