This is an automated email from the ASF dual-hosted git repository.

jlli pushed a commit to branch add-select-segment-api
in repository https://gitbox.apache.org/repos/asf/pinot.git

commit 243613426cbc8c02e0834f9d0c8bb12cfebf60c3
Author: Jack Li(Analytics Engineering) <j...@jlli-mn1.linkedin.biz>
AuthorDate: Wed Oct 27 15:48:20 2021 -0700

    Add select segments API
---
 .../api/resources/PinotSegmentRestletResource.java | 42 +++++++++++++
 .../helix/core/PinotHelixResourceManager.java      | 72 ++++++++++++++++++++++
 2 files changed, 114 insertions(+)

diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
index 2628f01..3900f34 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
@@ -21,6 +21,7 @@ package org.apache.pinot.controller.api.resources;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
 import io.swagger.annotations.Api;
 import io.swagger.annotations.ApiOperation;
 import io.swagger.annotations.ApiParam;
@@ -53,9 +54,14 @@ import org.apache.commons.httpclient.HttpConnectionManager;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.apache.pinot.common.exception.InvalidConfigException;
+import org.apache.pinot.common.lineage.LineageEntry;
+import org.apache.pinot.common.lineage.LineageEntryState;
+import org.apache.pinot.common.lineage.SegmentLineage;
+import org.apache.pinot.common.lineage.SegmentLineageAccessHelper;
 import org.apache.pinot.common.metadata.ZKMetadataProvider;
 import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
 import org.apache.pinot.common.utils.SegmentName;
+import org.apache.pinot.common.utils.StringUtil;
 import org.apache.pinot.common.utils.URIUtils;
 import org.apache.pinot.controller.ControllerConf;
 import org.apache.pinot.controller.api.access.AccessType;
@@ -657,6 +663,42 @@ public class PinotSegmentRestletResource {
     return segmentsMetadata;
   }
 
+  @GET
+  @Path("segments/{tableName}/select")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Get the selected segments given the (inclusive) start and (exclusive) end timestamps"
+      + " in milliseconds. If no timestamps are provided, all the segments will be returned.",
+      notes = "Get the selected segments given the start and end timestamps in milliseconds")
+  public List<Map<TableType, List<String>>> getSelectedSegments(
+      @ApiParam(value = "Name of the table", required = true) @PathParam("tableName") String tableName,
+      @ApiParam(value = "OFFLINE|REALTIME") @QueryParam("type") String tableTypeStr,
+      @ApiParam(value = "Start timestamp (inclusive)") @QueryParam("startTimestamp") @DefaultValue("")
+          String startTimestampStr,
+      @ApiParam(value = "End timestamp (exclusive)") @QueryParam("endTimestamp") @DefaultValue("")
+          String endTimestampStr,
+      @ApiParam(value = "Whether to exclude the segments overlapping with the timestamps, false by default")
+      @QueryParam("excludeOverlapping") @DefaultValue("false") boolean excludeOverlapping) {
+    // A missing bound maps to the extreme long value; the resource manager treats MIN_VALUE/MAX_VALUE as "no range".
+    long startTimestamp = parseTimestampMs("startTimestamp", startTimestampStr, Long.MIN_VALUE);
+    long endTimestamp = parseTimestampMs("endTimestamp", endTimestampStr, Long.MAX_VALUE);
+    // Guava's Preconditions only substitutes %s placeholders (not %d).
+    Preconditions.checkArgument(startTimestamp < endTimestamp,
+        "The value of startTimestamp should be smaller than the one of endTimestamp. Start timestamp: %s. End "
+            + "timestamp: %s",
+        startTimestamp, endTimestamp);
+
+    List<String> tableNamesWithType = ResourceUtils
+        .getExistingTableNamesWithType(_pinotHelixResourceManager, tableName, Constants.validateTableType(tableTypeStr),
+            LOGGER);
+    // One single-entry map per physical table (OFFLINE and/or REALTIME).
+    List<Map<TableType, List<String>>> resultList = new ArrayList<>(tableNamesWithType.size());
+    for (String tableNameWithType : tableNamesWithType) {
+      TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
+      List<String> segments = _pinotHelixResourceManager
+          .getSegmentsForTableWithTimestamps(tableNameWithType, startTimestamp, endTimestamp, excludeOverlapping);
+      resultList.add(Collections.singletonMap(tableType, segments));
+    }
+    return resultList;
+  }
+
+  /**
+   * Parses an optional millisecond timestamp query parameter.
+   *
+   * @param paramName  name of the query parameter, used in the error message
+   * @param value  raw parameter value; {@code null} or empty means "not specified"
+   * @param defaultValue  value returned when the parameter is not specified
+   * @return the parsed timestamp, or {@code defaultValue} if not specified
+   * @throws IllegalArgumentException if the value is not a valid long
+   */
+  private static long parseTimestampMs(String paramName, String value, long defaultValue) {
+    if (Strings.isNullOrEmpty(value)) {
+      return defaultValue;
+    }
+    try {
+      return Long.parseLong(value);
+    } catch (NumberFormatException e) {
+      throw new IllegalArgumentException(
+          String.format("Invalid value for %s: '%s', expecting a timestamp in milliseconds", paramName, value), e);
+    }
+  }
+
   /**
    * This is a helper method to get the metadata for all segments for a given 
table name.
    * @param tableNameWithType name of the table along with its type
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index 8686671..d2f8633 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -593,6 +593,78 @@ public class PinotHelixResourceManager {
     return ZKMetadataProvider.getSegments(_propertyStore, tableNameWithType);
   }
 
+  /**
+   * Returns the segments for the given table based on the start and end timestamp.
+   *
+   * <p>When neither bound is specified ({@code startTimestamp == Long.MIN_VALUE} and
+   * {@code endTimestamp == Long.MAX_VALUE}), every segment of the table is a candidate. Otherwise the segment ZK
+   * metadata is batch-read and each segment is checked against the range. The candidates are then filtered based on
+   * the table's segment lineage, when lineage metadata exists for the table.
+   *
+   * @param tableNameWithType  Table name with type suffix
+   * @param startTimestamp  start timestamp in milliseconds (inclusive)
+   * @param endTimestamp  end timestamp in milliseconds (exclusive)
+   * @param excludeOverlapping  whether to exclude the segments overlapping with the timestamps
+   * @return a new mutable list with the selected segment names (no ordering guarantee)
+   */
+  public List<String> getSegmentsForTableWithTimestamps(String tableNameWithType, long startTimestamp,
+      long endTimestamp, boolean excludeOverlapping) {
+    List<String> segmentNames = getSegmentsFor(tableNameWithType);
+    Set<String> selectedSegmentSet;
+    // If no start and end timestamp specified, just select all the segments.
+    if (startTimestamp == Long.MIN_VALUE && endTimestamp == Long.MAX_VALUE) {
+      selectedSegmentSet = new HashSet<>(segmentNames);
+    } else {
+      selectedSegmentSet = new HashSet<>();
+      String segmentZKMetadataPathPrefix =
+          ZKMetadataProvider.constructPropertyStorePathForResource(tableNameWithType) + "/";
+      List<String> segmentZKMetadataPaths = new ArrayList<>(segmentNames.size());
+      for (String segmentName : segmentNames) {
+        segmentZKMetadataPaths.add(segmentZKMetadataPathPrefix + segmentName);
+      }
+      // Batch read; the loop below relies on the result being index-aligned with segmentNames. Individual records
+      // may be null, which isSegmentWithinTimeStamps treats as "not selected".
+      List<ZNRecord> znRecords = _propertyStore.get(segmentZKMetadataPaths, null, AccessOption.PERSISTENT);
+      for (int i = 0; i < znRecords.size(); i++) {
+        String segmentName = segmentNames.get(i);
+        ZNRecord znRecord = znRecords.get(i);
+        if (isSegmentWithinTimeStamps(segmentName, znRecord, startTimestamp, endTimestamp, excludeOverlapping)) {
+          selectedSegmentSet.add(segmentName);
+        }
+      }
+    }
+    // Fetch the segment lineage metadata, and filter segments based on segment lineage. A table that never had
+    // lineage written has no lineage ZNRecord, in which case there is nothing to filter.
+    ZNRecord segmentLineageZNRecord =
+        SegmentLineageAccessHelper.getSegmentLineageZNRecord(_propertyStore, tableNameWithType);
+    if (segmentLineageZNRecord != null) {
+      SegmentLineage segmentLineage = SegmentLineage.fromZNRecord(segmentLineageZNRecord);
+      SegmentLineageUtils.filterSegmentsBasedOnLineageInPlace(selectedSegmentSet, segmentLineage);
+    }
+    return new ArrayList<>(selectedSegmentSet);
+  }
+
+  /**
+   * Checks whether the segment is within the time range {@code [startTimestamp, endTimestamp)}.
+   *
+   * <p>The raw start/end time fields in segment ZK metadata are stored in the segment's own time unit, not
+   * necessarily milliseconds, so they are read through {@link SegmentZKMetadata}'s millisecond accessors before being
+   * compared against the millisecond-based range.
+   *
+   * @param segmentName  segment name
+   * @param znRecord  the ZNRecord associated with the segment name, or {@code null} if it could not be read
+   * @param startTimestamp  start timestamp in milliseconds (inclusive)
+   * @param endTimestamp  end timestamp in milliseconds (exclusive)
+   * @param excludeOverlapping  whether to exclude the segments overlapping with the timestamps
+   * @return {@code true} if the segment lies fully inside the range, or partially overlaps it while
+   *         {@code excludeOverlapping} is {@code false}
+   */
+  private boolean isSegmentWithinTimeStamps(String segmentName, ZNRecord znRecord, long startTimestamp,
+      long endTimestamp, boolean excludeOverlapping) {
+    if (znRecord == null) {
+      // No segment ZK metadata available; never select the segment.
+      return false;
+    }
+    // NOTE(review): assumes getStartTimeMs()/getEndTimeMs() return -1 when the time fields are absent, matching the
+    // previous getLongField(..., -1) default.
+    SegmentZKMetadata segmentZKMetadata = new SegmentZKMetadata(znRecord);
+    long startTimeMsInSegment = segmentZKMetadata.getStartTimeMs();
+    long endTimeMsInSegment = segmentZKMetadata.getEndTimeMs();
+    if (startTimeMsInSegment > endTimeMsInSegment) {
+      LOGGER.warn("Invalid start and end time for segment: {}. Start time: {}. End time: {}", segmentName,
+          startTimeMsInSegment, endTimeMsInSegment);
+      return false;
+    }
+    if (startTimestamp <= startTimeMsInSegment && endTimeMsInSegment < endTimestamp) {
+      // The segment is within the start and end time range.
+      return true;
+    }
+    if (endTimeMsInSegment < startTimestamp || startTimeMsInSegment >= endTimestamp) {
+      // The segment is outside of the start and end time range.
+      return false;
+    }
+    // If the segment happens to overlap with the start and end time range,
+    // check the excludeOverlapping flag to determine whether to include the segment.
+    return !excludeOverlapping;
+  }
+
   @Nullable
   public SegmentZKMetadata getSegmentZKMetadata(String tableNameWithType, 
String segmentName) {
     return ZKMetadataProvider.getSegmentZKMetadata(_propertyStore, 
tableNameWithType, segmentName);

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to