FANNG1 commented on code in PR #9050:
URL: https://github.com/apache/gravitino/pull/9050#discussion_r2564554582


##########
iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/CatalogWrapperForREST.java:
##########
@@ -207,6 +221,267 @@ private Credential getCredential(
     return credential;
   }
 
+  /**
+   * Plan table scan and return scan tasks.
+   *
+   * <p>This method performs server-side scan planning to optimize query 
performance by reducing
+   * client-side metadata loading and enabling parallel task execution.
+   *
+   * <p>Implementation uses synchronous scan planning (COMPLETED status) where 
tasks are returned
+   * immediately as serialized JSON strings. This is different from 
asynchronous mode (SUBMITTED
+   * status) where a plan ID is returned for later retrieval.
+   *
+   * <p>Referenced from Iceberg PR #13400 for scan planning implementation.
+   *
+   * @param tableIdentifier The table identifier.
+   * @param scanRequest The scan request parameters including filters, 
projections, snapshot-id,
+   *     etc.
+   * @return PlanTableScanResponse with status=COMPLETED and serialized 
planTasks.
+   * @throws IllegalArgumentException if scan request validation fails
+   * @throws org.apache.gravitino.exceptions.NoSuchTableException if table 
doesn't exist
+   * @throws RuntimeException for other scan planning failures
+   */
+  public PlanTableScanResponse planTableScan(
+      TableIdentifier tableIdentifier, PlanTableScanRequest scanRequest) {
+
+    LOG.debug(
+        "Planning scan for table: {}, snapshotId: {}, startSnapshotId: {}, 
endSnapshotId: {}, select: {}, caseSensitive: {}",
+        tableIdentifier,
+        scanRequest.snapshotId(),
+        scanRequest.startSnapshotId(),
+        scanRequest.endSnapshotId(),
+        scanRequest.select(),
+        scanRequest.caseSensitive());
+
+    try {
+      Table table = catalog.loadTable(tableIdentifier);
+      CloseableIterable<FileScanTask> fileScanTasks =
+          createFilePlanScanTasks(table, tableIdentifier, scanRequest);
+
+      List<String> planTasks = new ArrayList<>();
+      Map<Integer, PartitionSpec> specsById = new HashMap<>();
+      List<org.apache.iceberg.DeleteFile> deleteFiles = new ArrayList<>();
+
+      try (fileScanTasks) {
+        for (FileScanTask fileScanTask : fileScanTasks) {
+          try {
+            String taskString = ScanTaskParser.toJson(fileScanTask);
+            planTasks.add(taskString);
+
+            int specId = fileScanTask.spec().specId();
+            if (!specsById.containsKey(specId)) {
+              specsById.put(specId, fileScanTask.spec());
+            }
+
+            if (!fileScanTask.deletes().isEmpty()) {
+              deleteFiles.addAll(fileScanTask.deletes());
+            }
+          } catch (Exception e) {
+            throw new RuntimeException(
+                String.format(
+                    "Failed to serialize scan task for table: %s. Error: %s",
+                    tableIdentifier, e.getMessage()),
+                e);
+          }
+        }
+      } catch (IOException e) {
+        LOG.error("Failed to close scan task iterator for table: {}", 
tableIdentifier, e);
+        throw new RuntimeException("Failed to plan scan tasks: " + 
e.getMessage(), e);
+      }
+
+      List<DeleteFile> uniqueDeleteFiles =
+          
deleteFiles.stream().distinct().collect(java.util.stream.Collectors.toList());
+
+      if (planTasks.isEmpty()) {
+        LOG.info(
+            "Scan planning returned no tasks for table: {}. Table may be empty 
or fully filtered.",
+            tableIdentifier);
+      }
+
+      PlanTableScanResponse.Builder responseBuilder =
+          PlanTableScanResponse.builder()
+              .withPlanStatus(PlanStatus.COMPLETED)
+              .withPlanTasks(planTasks)
+              .withSpecsById(specsById);
+
+      if (!uniqueDeleteFiles.isEmpty()) {
+        responseBuilder.withDeleteFiles(uniqueDeleteFiles);
+        LOG.debug(
+            "Included {} delete files in scan plan for table: {}",
+            uniqueDeleteFiles.size(),
+            tableIdentifier);
+      }
+
+      return responseBuilder.build();
+
+    } catch (IllegalArgumentException e) {
+      LOG.error("Invalid scan request for table {}: {}", tableIdentifier, 
e.getMessage());
+      throw new IllegalArgumentException("Invalid scan parameters: " + 
e.getMessage(), e);
+    } catch (org.apache.iceberg.exceptions.NoSuchTableException e) {
+      LOG.error("Table not found during scan planning: {}", tableIdentifier);
+      throw e;
+    } catch (Exception e) {
+      LOG.error("Unexpected error during scan planning for table: {}", 
tableIdentifier, e);
+      throw new RuntimeException(
+          "Scan planning failed for table " + tableIdentifier + ": " + 
e.getMessage(), e);
+    }
+  }
+
+  /**
+   * Create and plan a scan based on the scan request.
+   *
+   * <p>If both start and end snapshot IDs are provided, uses 
IncrementalAppendScan. Otherwise, uses
+   * regular TableScan.
+   *
+   * @param table The table to scan
+   * @param tableIdentifier The table identifier for logging
+   * @param scanRequest The scan request parameters
+   * @return CloseableIterable of FileScanTask
+   */
+  private CloseableIterable<FileScanTask> createFilePlanScanTasks(
+      Table table, TableIdentifier tableIdentifier, PlanTableScanRequest 
scanRequest) {
+    Long startSnapshotId = scanRequest.startSnapshotId();
+    Long endSnapshotId = scanRequest.endSnapshotId();
+    // Use IncrementalAppendScan if both start and end snapshot IDs are 
provided
+    if (startSnapshotId != null && endSnapshotId != null) {
+      if (startSnapshotId >= endSnapshotId) {
+        throw new IllegalArgumentException(
+            String.format(
+                "Invalid snapshot range: startSnapshotId (%d) must be less 
than endSnapshotId (%d)",
+                startSnapshotId, endSnapshotId));
+      }
+      LOG.debug(
+          "Using IncrementalAppendScan for table: {}, from snapshot: {} to 
snapshot: {}",
+          tableIdentifier,
+          startSnapshotId,
+          endSnapshotId);
+      IncrementalAppendScan incrementalScan =
+          table
+              .newIncrementalAppendScan()
+              .fromSnapshotInclusive(startSnapshotId)
+              .toSnapshot(endSnapshotId);
+      incrementalScan = applyScanRequestToIncremental(incrementalScan, 
scanRequest);
+      return incrementalScan.planFiles();
+    } else {
+      // Use regular TableScan for other cases
+      TableScan tableScan = table.newScan();
+      tableScan = applyScanRequest(tableScan, scanRequest);
+      return tableScan.planFiles();
+    }
+  }
+
+  private TableScan applyScanRequest(TableScan tableScan, PlanTableScanRequest 
scanRequest) {

Review Comment:
   Is it possible to use `applyScanRequest(Scan scan, PlanTableScanRequest 
scanRequest)`? Then we wouldn't need to provide separate methods for 
`TableScan` and `IncrementalAppendScan`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to