FANNG1 commented on code in PR #9050:
URL: https://github.com/apache/gravitino/pull/9050#discussion_r2556342290
##########
iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/rest/IcebergConfigOperations.java:
##########
@@ -76,6 +76,8 @@ public class IcebergConfigOperations {
.add(Endpoint.V1_REPORT_METRICS)
.add(Endpoint.V1_COMMIT_TRANSACTION)
.add(Endpoint.V1_TABLE_CREDENTIALS)
+ // Note: Scan planning endpoints are available in Iceberg 1.10.0+
Review Comment:
how about removing the note?
##########
iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/rest/IcebergTableOperations.java:
##########
@@ -375,6 +377,78 @@ public Response getTableCredentials(
}
}
+ /**
+ * Plan table scan endpoint.
+ *
+ * <p>POST /v1/{prefix}/namespaces/{namespace}/tables/{table}/scan
+ *
+ * <p>This endpoint allows clients to request a scan plan from the server,
which can optimize scan
+ * planning by leveraging server-side resources and reduce network overhead.
+ *
+ * <p>Implementation referenced from:
+ *
+ * <ul>
+ * <li>Iceberg PR #13004 - Request/Response models for REST Scan Planning
+ * <li>Iceberg PR #13400 - Core REST Scan Planning Task Implementation
+ * </ul>
+ *
+ * <p>Now using official {@code PlanTableScanRequest} and {@code
PlanTableScanResponse} from
+ * Apache Iceberg 1.10.0+.
+ *
+ * <p>Note: Iceberg 1.10.0 is a transition version. Full Builder API for
PlanTableScanRequest will
+ * be available in Iceberg 1.11.0+. See:
+ *
https://github.com/apache/iceberg/blob/main/core/src/main/java/org/apache/iceberg/rest/requests/PlanTableScanRequest.java
+ *
+ * @param prefix The catalog prefix
+ * @param namespace The namespace
+ * @param table The table name
+ * @param accessDelegation The access delegation header for credential
vending
+ * @param scanRequest The scan request containing filters, projections, etc.
+ * @return Response containing the scan plan with tasks
+ */
+ @POST
+ @Path("{table}/scan")
+ @Produces(MediaType.APPLICATION_JSON)
+ @Consumes(MediaType.APPLICATION_JSON)
+ @Timed(name = "plan-table-scan." + MetricNames.HTTP_PROCESS_DURATION,
absolute = true)
+ @ResponseMetered(name = "plan-table-scan", absolute = true)
+ public Response planTableScan(
+ @PathParam("prefix") String prefix,
+ @Encoded() @PathParam("namespace") String namespace,
+ @PathParam("table") String table,
+ @HeaderParam(X_ICEBERG_ACCESS_DELEGATION) String accessDelegation,
Review Comment:
does table scan plan interface need `accessDelegation`?
##########
iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/rest/IcebergTableOperations.java:
##########
@@ -375,6 +377,78 @@ public Response getTableCredentials(
}
}
+ /**
+ * Plan table scan endpoint.
+ *
+ * <p>POST /v1/{prefix}/namespaces/{namespace}/tables/{table}/scan
+ *
+ * <p>This endpoint allows clients to request a scan plan from the server,
which can optimize scan
+ * planning by leveraging server-side resources and reduce network overhead.
+ *
+ * <p>Implementation referenced from:
+ *
+ * <ul>
+ * <li>Iceberg PR #13004 - Request/Response models for REST Scan Planning
+ * <li>Iceberg PR #13400 - Core REST Scan Planning Task Implementation
+ * </ul>
+ *
+ * <p>Now using official {@code PlanTableScanRequest} and {@code
PlanTableScanResponse} from
+ * Apache Iceberg 1.10.0+.
+ *
+ * <p>Note: Iceberg 1.10.0 is a transition version. Full Builder API for
PlanTableScanRequest will
+ * be available in Iceberg 1.11.0+. See:
+ *
https://github.com/apache/iceberg/blob/main/core/src/main/java/org/apache/iceberg/rest/requests/PlanTableScanRequest.java
+ *
+ * @param prefix The catalog prefix
+ * @param namespace The namespace
+ * @param table The table name
+ * @param accessDelegation The access delegation header for credential
vending
+ * @param scanRequest The scan request containing filters, projections, etc.
+ * @return Response containing the scan plan with tasks
+ */
+ @POST
+ @Path("{table}/scan")
+ @Produces(MediaType.APPLICATION_JSON)
+ @Consumes(MediaType.APPLICATION_JSON)
+ @Timed(name = "plan-table-scan." + MetricNames.HTTP_PROCESS_DURATION,
absolute = true)
+ @ResponseMetered(name = "plan-table-scan", absolute = true)
+ public Response planTableScan(
+ @PathParam("prefix") String prefix,
+ @Encoded() @PathParam("namespace") String namespace,
+ @PathParam("table") String table,
+ @HeaderParam(X_ICEBERG_ACCESS_DELEGATION) String accessDelegation,
+ PlanTableScanRequest scanRequest) {
+ String catalogName = IcebergRestUtils.getCatalogName(prefix);
+ Namespace icebergNS = RESTUtil.decodeNamespace(namespace);
+ boolean isCredentialVending = isCredentialVending(accessDelegation);
+
+ LOG.info(
+ "Plan table scan, catalog: {}, namespace: {}, table: {}, credential
vending: {}",
+ catalogName,
+ icebergNS,
+ table,
+ isCredentialVending);
+
+ try {
+ return Utils.doAs(
+ httpRequest,
+ () -> {
+ TableIdentifier tableIdentifier = TableIdentifier.of(icebergNS,
table);
+ IcebergRequestContext context =
+ new IcebergRequestContext(httpServletRequest(), catalogName,
isCredentialVending);
+
+ // Call dispatcher to plan scan
Review Comment:
how about remove the comment?
##########
iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/CatalogWrapperForREST.java:
##########
@@ -192,6 +206,186 @@ private Credential getCredential(LoadTableResponse
loadTableResponse) {
return credential;
}
+ /**
+ * Plan table scan and return scan tasks.
+ *
+ * <p>This method performs server-side scan planning to optimize query
performance by reducing
+ * client-side metadata loading and enabling parallel task execution.
+ *
+ * <p>Implementation uses synchronous scan planning (COMPLETED status) where
tasks are returned
+ * immediately as serialized JSON strings. This is different from
asynchronous mode (SUBMITTED
+ * status) where a plan ID is returned for later retrieval.
+ *
+ * <p>Referenced from Iceberg PR #13400 for scan planning implementation.
+ *
+ * @param tableIdentifier The table identifier.
+ * @param scanRequest The scan request parameters including filters,
projections, snapshot-id,
+ * etc.
+ * @return PlanTableScanResponse with status=COMPLETED and serialized
planTasks.
+ * @throws IllegalArgumentException if scan request validation fails
+ * @throws org.apache.gravitino.exceptions.NoSuchTableException if table
doesn't exist
+ * @throws RuntimeException for other scan planning failures
+ */
+ public PlanTableScanResponse planTableScan(
+ TableIdentifier tableIdentifier, PlanTableScanRequest scanRequest) {
+
+ LOG.debug(
+ "Planning scan for table: {}, snapshotId: {}, select: {},
caseSensitive: {}",
+ tableIdentifier,
+ scanRequest.snapshotId(),
+ scanRequest.select(),
+ scanRequest.caseSensitive());
+
+ try {
+ Table table = catalog.loadTable(tableIdentifier);
+ if (table == null) {
+ throw new NoSuchTableException("Table not found: %s", tableIdentifier);
+ }
+
+ TableScan tableScan = table.newScan();
+ tableScan = applyScanRequest(tableScan, scanRequest);
+
+ List<String> planTasks = new ArrayList<>();
+ Map<Integer, PartitionSpec> specsById = new HashMap<>();
+ List<org.apache.iceberg.DeleteFile> deleteFiles = new ArrayList<>();
+
+ try (CloseableIterable<FileScanTask> fileScanTasks =
tableScan.planFiles()) {
+ for (FileScanTask fileScanTask : fileScanTasks) {
+ try {
+ String taskString = ScanTaskParser.toJson(fileScanTask);
+ planTasks.add(taskString);
+
+ int specId = fileScanTask.spec().specId();
+ if (!specsById.containsKey(specId)) {
+ specsById.put(specId, fileScanTask.spec());
+ }
+
+ if (!fileScanTask.deletes().isEmpty()) {
+ deleteFiles.addAll(fileScanTask.deletes());
+ }
+ } catch (Exception e) {
+ LOG.warn(
+ "Failed to serialize scan task for table: {}, skipping task.
Error: {}",
+ tableIdentifier,
+ e.getMessage());
+ }
+ }
+ } catch (IOException e) {
+ LOG.error("Failed to close scan task iterator for table: {}",
tableIdentifier, e);
+ throw new RuntimeException("Failed to plan scan tasks: " +
e.getMessage(), e);
+ }
+
+ List<DeleteFile> uniqueDeleteFiles =
+
deleteFiles.stream().distinct().collect(java.util.stream.Collectors.toList());
+
+ if (planTasks.isEmpty()) {
+ LOG.info(
+ "Scan planning returned no tasks for table: {}. Table may be empty
or fully filtered.",
+ tableIdentifier);
+ }
+
+ if (!planTasks.isEmpty() && specsById.isEmpty()) {
+ LOG.error(
+ "Internal error: planTasks is not empty ({} tasks) but specsById
is empty for table: {}",
+ planTasks.size(),
+ tableIdentifier);
+ throw new IllegalStateException("Scan planning produced tasks but no
partition specs");
+ }
+
+ PlanTableScanResponse.Builder responseBuilder =
+ PlanTableScanResponse.builder()
+ .withPlanStatus(PlanStatus.COMPLETED)
+ .withPlanTasks(planTasks)
+ .withSpecsById(specsById);
+
+ if (!uniqueDeleteFiles.isEmpty()) {
+ responseBuilder.withDeleteFiles(uniqueDeleteFiles);
+ LOG.debug(
+ "Included {} delete files in scan plan for table: {}",
+ uniqueDeleteFiles.size(),
+ tableIdentifier);
+ }
+
+ PlanTableScanResponse response = responseBuilder.build();
+
+ String snapshotInfo =
+ scanRequest.snapshotId() != null ?
String.valueOf(scanRequest.snapshotId()) : "current";
+ LOG.info(
+ "Successfully planned {} scan tasks for table: {}, snapshot: {}",
+ planTasks.size(),
+ tableIdentifier,
+ snapshotInfo);
+
+ return response;
+
+ } catch (IllegalArgumentException e) {
+ LOG.error("Invalid scan request for table {}: {}", tableIdentifier,
e.getMessage());
+ throw new IllegalArgumentException("Invalid scan parameters: " +
e.getMessage(), e);
+ } catch (org.apache.iceberg.exceptions.NoSuchTableException e) {
+ LOG.error("Table not found during scan planning: {}", tableIdentifier);
+ throw e;
+ } catch (Exception e) {
+ LOG.error("Unexpected error during scan planning for table: {}",
tableIdentifier, e);
+ throw new RuntimeException(
+ "Scan planning failed for table " + tableIdentifier + ": " +
e.getMessage(), e);
+ }
+ }
+
+ private TableScan applyScanRequest(TableScan tableScan, PlanTableScanRequest
scanRequest) {
Review Comment:
should we handle start snapshot id and stop snapshot id?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]