This is an automated email from the ASF dual-hosted git repository. jlli pushed a commit to branch add-select-segment-api in repository https://gitbox.apache.org/repos/asf/pinot.git
commit 243613426cbc8c02e0834f9d0c8bb12cfebf60c3 Author: Jack Li(Analytics Engineering) <j...@jlli-mn1.linkedin.biz> AuthorDate: Wed Oct 27 15:48:20 2021 -0700 Add select segments API --- .../api/resources/PinotSegmentRestletResource.java | 42 +++++++++++++ .../helix/core/PinotHelixResourceManager.java | 72 ++++++++++++++++++++++ 2 files changed, 114 insertions(+) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java index 2628f01..3900f34 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java @@ -21,6 +21,7 @@ package org.apache.pinot.controller.api.resources; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.JsonNode; import com.google.common.base.Preconditions; +import com.google.common.base.Strings; import io.swagger.annotations.Api; import io.swagger.annotations.ApiOperation; import io.swagger.annotations.ApiParam; @@ -53,9 +54,14 @@ import org.apache.commons.httpclient.HttpConnectionManager; import org.apache.helix.ZNRecord; import org.apache.helix.store.zk.ZkHelixPropertyStore; import org.apache.pinot.common.exception.InvalidConfigException; +import org.apache.pinot.common.lineage.LineageEntry; +import org.apache.pinot.common.lineage.LineageEntryState; +import org.apache.pinot.common.lineage.SegmentLineage; +import org.apache.pinot.common.lineage.SegmentLineageAccessHelper; import org.apache.pinot.common.metadata.ZKMetadataProvider; import org.apache.pinot.common.metadata.segment.SegmentZKMetadata; import org.apache.pinot.common.utils.SegmentName; +import org.apache.pinot.common.utils.StringUtil; import org.apache.pinot.common.utils.URIUtils; import 
org.apache.pinot.controller.ControllerConf; import org.apache.pinot.controller.api.access.AccessType; @@ -657,6 +663,42 @@ public class PinotSegmentRestletResource { return segmentsMetadata; } + @GET + @Path("segments/{tableName}/select") + @Produces(MediaType.APPLICATION_JSON) + @ApiOperation(value = "Get the selected segments given the (inclusive) start and (exclusive) end timestamps" + + " in milliseconds. If no timestamps are provided, all the segments will be returned.", + notes = "Get the selected segments given the start and end timestamps in milliseconds") + public List<Map<TableType, List<String>>> getSelectedSegments( + @ApiParam(value = "Name of the table", required = true) @PathParam("tableName") String tableName, + @ApiParam(value = "OFFLINE|REALTIME") @QueryParam("type") String tableTypeStr, + @ApiParam(value = "Start timestamp (inclusive)") @QueryParam("startTimestamp") @DefaultValue("") + String startTimestampStr, + @ApiParam(value = "End timestamp (exclusive)") @QueryParam("endTimestamp") @DefaultValue("") + String endTimestampStr, + @ApiParam(value = "Whether to exclude the segments overlapping with the timestamps, false by default") + @QueryParam("excludeOverlapping") @DefaultValue("false") boolean excludeOverlapping) { + long startTimestamp = Strings.isNullOrEmpty(startTimestampStr) ? Long.MIN_VALUE : Long.parseLong(startTimestampStr); + long endTimestamp = Strings.isNullOrEmpty(endTimestampStr) ? Long.MAX_VALUE : Long.parseLong(endTimestampStr); + Preconditions.checkArgument(startTimestamp < endTimestamp, + "The value of startTimestamp should be smaller than the one of endTimestamp. Start timestamp: %s.
End " + + "timestamp: %s", + startTimestamp, endTimestamp); + + List<String> tableNamesWithType = ResourceUtils + .getExistingTableNamesWithType(_pinotHelixResourceManager, tableName, Constants.validateTableType(tableTypeStr), + LOGGER); + List<Map<TableType, List<String>>> resultList = new ArrayList<>(tableNamesWithType.size()); + for (String tableNameWithType : tableNamesWithType) { + TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType); + List<String> segments = + _pinotHelixResourceManager + .getSegmentsForTableWithTimestamps(tableNameWithType, startTimestamp, endTimestamp, excludeOverlapping); + resultList.add(Collections.singletonMap(tableType, segments)); + } + return resultList; + } + /** * This is a helper method to get the metadata for all segments for a given table name. * @param tableNameWithType name of the table along with its type diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java index 8686671..d2f8633 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java @@ -593,6 +593,78 @@ public class PinotHelixResourceManager { return ZKMetadataProvider.getSegments(_propertyStore, tableNameWithType); } + /** + * Returns the segments for the given table based on the start and end timestamp.
+ * + * @param tableNameWithType Table name with type suffix + * @param startTimestamp start timestamp in milliseconds (inclusive) + * @param endTimestamp end timestamp in milliseconds (exclusive) + * @param excludeOverlapping whether to exclude the segments overlapping with the timestamps + */ + public List<String> getSegmentsForTableWithTimestamps(String tableNameWithType, long startTimestamp, + long endTimestamp, boolean excludeOverlapping) { + List<String> selectedSegments = new ArrayList<>(); + List<String> segmentNames = getSegmentsFor(tableNameWithType); + // If no start and end timestamp specified, just select all the segments. + if (startTimestamp == Long.MIN_VALUE && endTimestamp == Long.MAX_VALUE) { + selectedSegments = segmentNames; + } else { + String segmentZKMetadataPathPrefix = + ZKMetadataProvider.constructPropertyStorePathForResource(tableNameWithType) + "/"; + List<String> segmentZKMetadataPaths = new ArrayList<>(segmentNames.size()); + for (String segmentName : segmentNames) { + segmentZKMetadataPaths.add(segmentZKMetadataPathPrefix + segmentName); + } + List<ZNRecord> znRecords = _propertyStore.get(segmentZKMetadataPaths, null, AccessOption.PERSISTENT); + for (int i = 0; i < znRecords.size(); i++) { + String segmentName = segmentNames.get(i); + ZNRecord znRecord = znRecords.get(i); + if (isSegmentWithinTimeStamps(segmentName, znRecord, startTimestamp, endTimestamp, excludeOverlapping)) { + selectedSegments.add(segmentName); + } + } + } + // Fetch the segment lineage metadata, and filter segments based on segment lineage. 
+ ZNRecord segmentLineageZNRecord = + SegmentLineageAccessHelper.getSegmentLineageZNRecord(_propertyStore, tableNameWithType); + SegmentLineage segmentLineage = SegmentLineage.fromZNRecord(segmentLineageZNRecord); + Set<String> selectedSegmentSet = new HashSet<>(selectedSegments); + SegmentLineageUtils.filterSegmentsBasedOnLineageInPlace(selectedSegmentSet, segmentLineage); + return new ArrayList<>(selectedSegmentSet); + } + + /** + * Checks whether the segment is within the time range between the start and end timestamps. + * @param segmentName segment name + * @param znRecord the ZNRecord associated with the segment name + * @param startTimestamp start timestamp + * @param endTimestamp end timestamp + * @param excludeOverlapping whether to exclude the segments overlapping with the timestamps + */ + private boolean isSegmentWithinTimeStamps(String segmentName, ZNRecord znRecord, long startTimestamp, + long endTimestamp, boolean excludeOverlapping) { + if (znRecord == null) { + return false; + } + long startTimeMsInSegment = znRecord.getLongField(CommonConstants.Segment.START_TIME, -1); + long endTimeMsInSegment = znRecord.getLongField(CommonConstants.Segment.END_TIME, -1); + if (startTimeMsInSegment > endTimeMsInSegment) { + LOGGER.warn("Invalid start and end time for segment: {}. Start time: {}. End time: {}", segmentName, + startTimeMsInSegment, endTimeMsInSegment); + return false; + } + if (startTimestamp <= startTimeMsInSegment && endTimeMsInSegment < endTimestamp) { + // The segment is within the start and end time range. + return true; + } else if (endTimeMsInSegment < startTimestamp || startTimeMsInSegment >= endTimestamp) { + // The segment is outside of the start and end time range. + return false; + } + // If the segment happens to overlap with the start and end time range, + // check the excludeOverlapping flag to determine whether to include the segment. 
+ return !excludeOverlapping; + } + @Nullable public SegmentZKMetadata getSegmentZKMetadata(String tableNameWithType, String segmentName) { return ZKMetadataProvider.getSegmentZKMetadata(_propertyStore, tableNameWithType, segmentName); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org