Copilot commented on code in PR #17627:
URL: https://github.com/apache/pinot/pull/17627#discussion_r2779576692
##########
pinot-controller/src/main/java/org/apache/pinot/controller/services/PinotTableReloadService.java:
##########
@@ -106,27 +112,53 @@ public SuccessResponse reloadSegment(String tableName, String segmentName, boole
}
public SuccessResponse reloadAllSegments(String tableName, String tableTypeStr, boolean forceDownload,
- String targetInstance, String instanceToSegmentsMapInJson, HttpHeaders headers)
+ String targetInstance, String instanceToSegmentsMapInJson, String startTimestampStr, String endTimestampStr,
+ boolean excludeOverlapping, HttpHeaders headers)
throws IOException {
tableName = DatabaseUtils.translateTableName(tableName, headers);
TableType tableTypeFromTableName = TableNameBuilder.getTableTypeFromTableName(tableName);
TableType tableTypeFromRequest = Constants.validateTableType(tableTypeStr);
- // When rawTableName is provided but w/o table type, Pinot tries to reload both OFFLINE
+ // When rawTableName is provided but without table type, Pinot tries to reload both OFFLINE
// and REALTIME tables for the raw table. But forceDownload option only works with
// OFFLINE table currently, so we limit the table type to OFFLINE to let Pinot continue
- // to reload w/o being accidentally aborted upon REALTIME table type.
+ // to reload without being accidentally aborted upon REALTIME table type.
// TODO: support to force download immutable segments from RealTime table.
if (forceDownload && (tableTypeFromTableName == null && tableTypeFromRequest == null)) {
tableTypeFromRequest = TableType.OFFLINE;
}
Review Comment:
The table-name translation + table-type resolution + `forceDownload`
defaulting logic now exists in both `reloadAllSegments(...)` and
`reloadSegmentsInTimeRange(...)`. This duplication increases the risk of the
two paths drifting (e.g., behavior changes applied to only one). Consider
extracting this shared resolution into a small private helper that returns
`(translatedTableName, resolvedTableTypeFromRequest)` (or similar) and reusing
it in both methods.
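A minimal sketch of such a helper, written in plain Java so it compiles on its own. `ReloadTargetResolver`, `ResolvedReloadTarget` and `resolve(...)` are hypothetical names; the local `TableType` enum and the suffix check stand in for Pinot's `TableType` and `TableNameBuilder.getTableTypeFromTableName`, and the caller is assumed to have already applied `DatabaseUtils.translateTableName` and `Constants.validateTableType`.

```java
// Hypothetical stand-in for Pinot's TableType enum, so the sketch compiles on its own.
enum TableType { OFFLINE, REALTIME }

// Hypothetical result holder: the translated table name plus the table type to pass on.
final class ResolvedReloadTarget {
  final String tableName;
  // null means "reload both OFFLINE and REALTIME tables for the raw table name".
  final TableType tableTypeFromRequest;

  ResolvedReloadTarget(String tableName, TableType tableTypeFromRequest) {
    this.tableName = tableName;
    this.tableTypeFromRequest = tableTypeFromRequest;
  }
}

final class ReloadTargetResolver {
  private ReloadTargetResolver() {
  }

  // Stand-in for TableNameBuilder.getTableTypeFromTableName: infer the type from the name suffix.
  static TableType tableTypeFromTableName(String tableName) {
    if (tableName.endsWith("_OFFLINE")) {
      return TableType.OFFLINE;
    }
    if (tableName.endsWith("_REALTIME")) {
      return TableType.REALTIME;
    }
    return null;
  }

  // Shared resolution step; translatedTableName is assumed to come from DatabaseUtils.translateTableName
  // and tableTypeFromRequest from Constants.validateTableType.
  static ResolvedReloadTarget resolve(String translatedTableName, TableType tableTypeFromRequest,
      boolean forceDownload) {
    TableType tableTypeFromTableName = tableTypeFromTableName(translatedTableName);
    // forceDownload only works for OFFLINE tables, so a raw name with no type anywhere is
    // narrowed to OFFLINE instead of aborting on the REALTIME half.
    if (forceDownload && tableTypeFromTableName == null && tableTypeFromRequest == null) {
      tableTypeFromRequest = TableType.OFFLINE;
    }
    return new ResolvedReloadTarget(translatedTableName, tableTypeFromRequest);
  }
}
```

Both `reloadAllSegments(...)` and `reloadSegmentsInTimeRange(...)` would then call `resolve(...)` once and read `tableName` and `tableTypeFromRequest` from the result, so a future change to the `forceDownload` rule lands in one place.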
##########
pinot-controller/src/main/java/org/apache/pinot/controller/services/PinotTableReloadService.java:
##########
@@ -169,6 +201,108 @@ public SuccessResponse reloadAllSegments(String tableName, String tableTypeStr,
return new SuccessResponse(JsonUtils.objectToString(perTableMsgData));
}
+ public SuccessResponse reloadSegmentsInTimeRange(String tableName, String tableTypeStr, String startTimestampStr,
+ String endTimestampStr, boolean excludeOverlapping, boolean forceDownload, @Nullable String targetInstance,
+ HttpHeaders headers) {
+ if (Strings.isNullOrEmpty(startTimestampStr) || Strings.isNullOrEmpty(endTimestampStr)) {
+ throw new ControllerApplicationException(LOG, "startTimestamp and endTimestamp must be provided.",
+ Response.Status.BAD_REQUEST);
+ }
+ long startTimestamp;
+ long endTimestamp;
+ try {
+ startTimestamp = Long.parseLong(startTimestampStr);
+ endTimestamp = Long.parseLong(endTimestampStr);
+ } catch (NumberFormatException e) {
+ throw new ControllerApplicationException(LOG,
+ "Failed to parse the start/end timestamp. Please make sure they are in 'milliseconds since epoch' format.",
+ Response.Status.BAD_REQUEST, e);
+ }
+ if (startTimestamp >= endTimestamp) {
+ throw new ControllerApplicationException(LOG, String.format(
+ "startTimestamp must be less than endTimestamp. Provided: start=%d, end=%d", startTimestamp, endTimestamp),
+ Response.Status.BAD_REQUEST);
+ }
+
+ tableName = DatabaseUtils.translateTableName(tableName, headers);
+ TableType tableTypeFromTableName = TableNameBuilder.getTableTypeFromTableName(tableName);
+ TableType tableTypeFromRequest = Constants.validateTableType(tableTypeStr);
+ // When rawTableName is provided but without table type, Pinot tries to reload both OFFLINE
+ // and REALTIME tables for the raw table. But forceDownload option only works with
+ // OFFLINE table currently, so we limit the table type to OFFLINE to let Pinot continue
+ // to reload without being accidentally aborted upon REALTIME table type.
+ // TODO: support to force download immutable segments from RealTime table.
+ if (forceDownload && (tableTypeFromTableName == null && tableTypeFromRequest == null)) {
+ tableTypeFromRequest = TableType.OFFLINE;
+ }
+ List<String> tableNamesWithType =
+ ResourceUtils.getExistingTableNamesWithType(_pinotHelixResourceManager, tableName, tableTypeFromRequest, LOG);
+ Map<String, Map<String, String>> perTableMsgData = new LinkedHashMap<>();
+ for (String tableNameWithType : tableNamesWithType) {
+ List<String> segments =
+ _pinotHelixResourceManager.getSegmentsFor(tableNameWithType, true, startTimestamp, endTimestamp,
+ excludeOverlapping);
+ if (segments.isEmpty()) {
+ continue;
+ }
+ Set<String> selectedSegments = new HashSet<>(segments);
+ Map<String, List<String>> serverToSegmentsMap =
+ _pinotHelixResourceManager.getServerToSegmentsMap(tableNameWithType, targetInstance, false);
+ Map<String, List<String>> filteredInstanceToSegmentsMap = new HashMap<>();
+ for (Map.Entry<String, List<String>> entry : serverToSegmentsMap.entrySet()) {
+ List<String> instanceSegments =
+ entry.getValue().stream().filter(selectedSegments::contains).collect(Collectors.toList());
+ if (!instanceSegments.isEmpty()) {
+ filteredInstanceToSegmentsMap.put(entry.getKey(), instanceSegments);
+ }
+ }
+ if (filteredInstanceToSegmentsMap.isEmpty()) {
+ continue;
+ }
+ String reloadJobId = UUID.randomUUID().toString();
+ long startTimeMs = System.currentTimeMillis();
+ Map<String, Pair<Integer, String>> instanceMsgInfoMap =
+ _pinotHelixResourceManager.reloadSegments(tableNameWithType, forceDownload, filteredInstanceToSegmentsMap,
+ reloadJobId);
+ int numReloadMsgSent = instanceMsgInfoMap.values().stream().mapToInt(Pair::getLeft).sum();
+ if (numReloadMsgSent <= 0) {
+ continue;
+ }
+ Set<String> segmentsToReload = filteredInstanceToSegmentsMap.values().stream()
+ .flatMap(List::stream)
+ .collect(Collectors.toSet());
Review Comment:
Building `segmentNamesStr` from a `Set` makes the order of segment names
non-deterministic, which can lead to inconsistent job metadata/logging and
flaky assertions if anyone later checks the serialized segment list. Consider
collecting into a sorted, de-duplicated `List` (or using a deterministic set
such as `TreeSet`) before joining. Keep the de-duplication the `Set` provided:
a segment hosted on several replicas appears once per server in
`filteredInstanceToSegmentsMap`, so a plain sorted `List` would repeat its name.
```suggestion
List<String> segmentsToReload = filteredInstanceToSegmentsMap.values().stream()
.flatMap(List::stream)
.distinct()
.sorted()
.collect(Collectors.toList());
```
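A minimal sketch of both deterministic variants, assuming the job metadata joins segment names with a comma (the separator actually used for `segmentNamesStr` is not shown in this hunk). `SegmentListFormatter` and its methods are hypothetical names.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeSet;
import java.util.stream.Collectors;

final class SegmentListFormatter {
  private SegmentListFormatter() {
  }

  // Stream variant: a segment hosted on several replicas appears once per server,
  // so de-duplicate before sorting to get a stable, repeat-free list.
  static List<String> sortedDistinctSegments(Map<String, List<String>> instanceToSegmentsMap) {
    return instanceToSegmentsMap.values().stream()
        .flatMap(List::stream)
        .distinct()
        .sorted()
        .collect(Collectors.toList());
  }

  // TreeSet variant: de-duplicates and keeps natural (lexicographic) order in one structure.
  // The comma separator is an assumption for illustration.
  static String joinWithTreeSet(Map<String, List<String>> instanceToSegmentsMap) {
    TreeSet<String> segments = new TreeSet<>();
    instanceToSegmentsMap.values().forEach(segments::addAll);
    return String.join(",", segments);
  }
}
```

Either variant yields the same string regardless of `HashMap` iteration order; the `TreeSet` one avoids the separate `distinct()`/`sorted()` passes at the cost of changing the declared type from `List` back to a set.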
##########
pinot-controller/src/main/java/org/apache/pinot/controller/services/PinotTableReloadService.java:
##########
@@ -106,27 +112,53 @@ public SuccessResponse reloadSegment(String tableName, String segmentName, boole
}
public SuccessResponse reloadAllSegments(String tableName, String tableTypeStr, boolean forceDownload,
- String targetInstance, String instanceToSegmentsMapInJson, HttpHeaders headers)
+ String targetInstance, String instanceToSegmentsMapInJson, String startTimestampStr, String endTimestampStr,
+ boolean excludeOverlapping, HttpHeaders headers)
throws IOException {
tableName = DatabaseUtils.translateTableName(tableName, headers);
TableType tableTypeFromTableName = TableNameBuilder.getTableTypeFromTableName(tableName);
TableType tableTypeFromRequest = Constants.validateTableType(tableTypeStr);
- // When rawTableName is provided but w/o table type, Pinot tries to reload both OFFLINE
+ // When rawTableName is provided but without table type, Pinot tries to reload both OFFLINE
// and REALTIME tables for the raw table. But forceDownload option only works with
// OFFLINE table currently, so we limit the table type to OFFLINE to let Pinot continue
- // to reload w/o being accidentally aborted upon REALTIME table type.
+ // to reload without being accidentally aborted upon REALTIME table type.
// TODO: support to force download immutable segments from RealTime table.
if (forceDownload && (tableTypeFromTableName == null && tableTypeFromRequest == null)) {
tableTypeFromRequest = TableType.OFFLINE;
}
+ boolean hasStartTimestamp = !Strings.isNullOrEmpty(startTimestampStr);
+ boolean hasEndTimestamp = !Strings.isNullOrEmpty(endTimestampStr);
+ if (hasStartTimestamp || hasEndTimestamp) {
+ if (!(hasStartTimestamp && hasEndTimestamp)) {
+ throw new ControllerApplicationException(LOG, "startTimestamp and endTimestamp must be provided together.",
+ Response.Status.BAD_REQUEST);
+ }
+ if (targetInstance != null || instanceToSegmentsMapInJson != null) {
+ throw new ControllerApplicationException(LOG,
+ "startTimestamp/endTimestamp/excludeOverlapping cannot be used with targetInstance/instanceToSegmentsMap.",
Review Comment:
This error message is misleading: `excludeOverlapping` is inherently a
time-range option (and is always present, defaulting to `false`), but the
actual conflict checked here is only between the time-range parameters and
`targetInstance`/`instanceToSegmentsMap`. Consider rewording it to something like
"startTimestamp/endTimestamp cannot be used with
targetInstance/instanceToSegmentsMap" (and optionally mentioning
`excludeOverlapping` only in a separate validation that requires the time
range).
```suggestion
"startTimestamp/endTimestamp cannot be used with targetInstance/instanceToSegmentsMap.",
```
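A minimal sketch of the validation split described above, under stated assumptions: `TimeRangeReloadValidator` is a hypothetical name, `IllegalArgumentException` stands in for `ControllerApplicationException` with `BAD_REQUEST`, and the "`excludeOverlapping` requires a time range" check is the optional separate validation, not something shown in this hunk.

```java
final class TimeRangeReloadValidator {
  private TimeRangeReloadValidator() {
  }

  // Local stand-in for Guava's Strings.isNullOrEmpty.
  private static boolean isNullOrEmpty(String s) {
    return s == null || s.isEmpty();
  }

  // IllegalArgumentException stands in for ControllerApplicationException(..., BAD_REQUEST).
  static void validate(String startTimestampStr, String endTimestampStr, boolean excludeOverlapping,
      String targetInstance, String instanceToSegmentsMapInJson) {
    boolean hasStart = !isNullOrEmpty(startTimestampStr);
    boolean hasEnd = !isNullOrEmpty(endTimestampStr);
    if (!hasStart && !hasEnd) {
      // excludeOverlapping defaults to false, so only an explicit true without a range is rejected.
      if (excludeOverlapping) {
        throw new IllegalArgumentException("excludeOverlapping requires startTimestamp and endTimestamp.");
      }
      return;
    }
    if (!(hasStart && hasEnd)) {
      throw new IllegalArgumentException("startTimestamp and endTimestamp must be provided together.");
    }
    if (targetInstance != null || instanceToSegmentsMapInJson != null) {
      // The conflict is only between the time range and the instance-based selectors.
      throw new IllegalArgumentException(
          "startTimestamp/endTimestamp cannot be used with targetInstance/instanceToSegmentsMap.");
    }
  }
}
```

Keeping each message tied to the parameters its check actually inspects means a user who never set `excludeOverlapping` is not told it conflicts with anything.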
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]