Copilot commented on code in PR #9050:
URL: https://github.com/apache/gravitino/pull/9050#discussion_r2554586978


##########
iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/CatalogWrapperForREST.java:
##########
@@ -192,6 +206,186 @@ private Credential getCredential(LoadTableResponse loadTableResponse) {
     return credential;
   }
 
+  /**
+   * Plan table scan and return scan tasks.
+   *
+   * <p>This method performs server-side scan planning to optimize query performance by reducing
+   * client-side metadata loading and enabling parallel task execution.
+   *
+   * <p>Implementation uses synchronous scan planning (COMPLETED status) where tasks are returned
+   * immediately as serialized JSON strings. This is different from asynchronous mode (SUBMITTED
+   * status) where a plan ID is returned for later retrieval.
+   *
+   * <p>Referenced from Iceberg PR #13400 for scan planning implementation.
+   *
+   * @param tableIdentifier The table identifier.
+   * @param scanRequest The scan request parameters including filters, projections, snapshot-id,
+   *     etc.
+   * @return PlanTableScanResponse with status=COMPLETED and serialized planTasks.
+   * @throws IllegalArgumentException if scan request validation fails
+   * @throws org.apache.gravitino.exceptions.NoSuchTableException if table doesn't exist
+   * @throws RuntimeException for other scan planning failures
+   */
+  public PlanTableScanResponse planTableScan(
+      TableIdentifier tableIdentifier, PlanTableScanRequest scanRequest) {
+
+    LOG.debug(
+        "Planning scan for table: {}, snapshotId: {}, select: {}, caseSensitive: {}",
+        tableIdentifier,
+        scanRequest.snapshotId(),
+        scanRequest.select(),
+        scanRequest.caseSensitive());
+
+    try {
+      Table table = catalog.loadTable(tableIdentifier);
+      if (table == null) {
+        throw new NoSuchTableException("Table not found: %s", tableIdentifier);
+      }
+
+      TableScan tableScan = table.newScan();
+      tableScan = applyScanRequest(tableScan, scanRequest);
+
+      List<String> planTasks = new ArrayList<>();
+      Map<Integer, PartitionSpec> specsById = new HashMap<>();
+      List<org.apache.iceberg.DeleteFile> deleteFiles = new ArrayList<>();
+
+      try (CloseableIterable<FileScanTask> fileScanTasks = tableScan.planFiles()) {
+        for (FileScanTask fileScanTask : fileScanTasks) {
+          try {
+            String taskString = ScanTaskParser.toJson(fileScanTask);
+            planTasks.add(taskString);
+
+            int specId = fileScanTask.spec().specId();
+            if (!specsById.containsKey(specId)) {
+              specsById.put(specId, fileScanTask.spec());
+            }
+
+            if (!fileScanTask.deletes().isEmpty()) {
+              deleteFiles.addAll(fileScanTask.deletes());
+            }
+          } catch (Exception e) {
+            LOG.warn(
+                "Failed to serialize scan task for table: {}, skipping task. Error: {}",
+                tableIdentifier,
+                e.getMessage());

Review Comment:
   Silently skipping failed scan task serialization can lead to incomplete scan results and incorrect query results. When a scan task fails to serialize, the method continues processing other tasks and returns a partial scan plan without any indication that tasks were skipped. This could cause data to be missed during query execution. Consider either throwing an exception or accumulating warnings and returning them in the response.
   ```suggestion
               throw new RuntimeException(
                   String.format("Failed to serialize scan task for table: %s. Error: %s", tableIdentifier, e.getMessage()), e);
   ```
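The accumulate-and-report alternative mentioned in the comment can be sketched without the Iceberg types. This is a hypothetical illustration only: `PartialPlan` is an invented class, and the `serialize` function stands in for `ScanTaskParser.toJson`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

public class PartialPlan {
  public final List<String> tasks = new ArrayList<>();
  public final List<String> warnings = new ArrayList<>();

  // Serialize every task; keep a warning per failure instead of dropping it silently,
  // so the response can tell the client the plan may be incomplete.
  public static <T> PartialPlan serializeAll(List<T> scanTasks, Function<T, String> serialize) {
    PartialPlan result = new PartialPlan();
    for (T task : scanTasks) {
      try {
        result.tasks.add(serialize.apply(task));
      } catch (RuntimeException e) {
        result.warnings.add("skipped task: " + e.getMessage());
      }
    }
    return result;
  }
}
```

A caller could then attach `warnings` to the response, or fail the whole request when `tasks.isEmpty() && !warnings.isEmpty()`.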



##########
iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/CatalogWrapperForREST.java:
##########
@@ -192,6 +206,186 @@ private Credential getCredential(LoadTableResponse loadTableResponse) {
     return credential;
   }
 
+  /**
+   * Plan table scan and return scan tasks.
+   *
+   * <p>This method performs server-side scan planning to optimize query performance by reducing
+   * client-side metadata loading and enabling parallel task execution.
+   *
+   * <p>Implementation uses synchronous scan planning (COMPLETED status) where tasks are returned
+   * immediately as serialized JSON strings. This is different from asynchronous mode (SUBMITTED
+   * status) where a plan ID is returned for later retrieval.
+   *
+   * <p>Referenced from Iceberg PR #13400 for scan planning implementation.
+   *
+   * @param tableIdentifier The table identifier.
+   * @param scanRequest The scan request parameters including filters, projections, snapshot-id,
+   *     etc.
+   * @return PlanTableScanResponse with status=COMPLETED and serialized planTasks.
+   * @throws IllegalArgumentException if scan request validation fails
+   * @throws org.apache.gravitino.exceptions.NoSuchTableException if table doesn't exist
+   * @throws RuntimeException for other scan planning failures
+   */
+  public PlanTableScanResponse planTableScan(
+      TableIdentifier tableIdentifier, PlanTableScanRequest scanRequest) {
+
+    LOG.debug(
+        "Planning scan for table: {}, snapshotId: {}, select: {}, caseSensitive: {}",
+        tableIdentifier,
+        scanRequest.snapshotId(),
+        scanRequest.select(),
+        scanRequest.caseSensitive());
+
+    try {
+      Table table = catalog.loadTable(tableIdentifier);
+      if (table == null) {
+        throw new NoSuchTableException("Table not found: %s", tableIdentifier);
+      }

Review Comment:
   The null check for `table` after `catalog.loadTable()` is unnecessary. According to the Iceberg API contract, `catalog.loadTable()` throws `NoSuchTableException` if the table doesn't exist rather than returning null. This check can be removed to simplify the code.
   ```suggestion
   
   ```
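The fail-fast contract this comment relies on (load throws instead of returning null) can be illustrated with a toy lookup. `FailFastCatalog` and its exception are invented names for illustration, not Gravitino or Iceberg API.

```java
import java.util.HashMap;
import java.util.Map;

public class FailFastCatalog {
  private final Map<String, String> tables = new HashMap<>();

  public void create(String name, String metadata) {
    tables.put(name, metadata);
  }

  // Mirrors the Iceberg Catalog#loadTable contract: a missing table raises an
  // exception, so callers never need a null check on the return value.
  public String loadTable(String name) {
    String metadata = tables.get(name);
    if (metadata == null) {
      throw new IllegalArgumentException("Table not found: " + name);
    }
    return metadata;
  }
}
```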



##########
docs/iceberg-rest-service.md:
##########
@@ -31,6 +30,7 @@ There are some key difference between Gravitino Iceberg REST server and Gravitin
 - Supports OAuth2 and HTTPS.
 - Provides a pluggable metrics store interface to store and delete Iceberg metrics.
 - Supports table metadata cache.
+- Supports scan plan cache.

Review Comment:
   The documentation mentions "Supports scan plan cache", but no scan plan caching implementation is present in this PR. The PR implements synchronous scan planning that returns scan tasks immediately (COMPLETED status) without any caching mechanism. This line should be removed, or clarified to say it is a planned future feature.
   ```suggestion
   - Scan plan cache is a planned future feature.
   ```



##########
iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/CatalogWrapperForTest.java:
##########
@@ -55,4 +76,30 @@ public LoadTableResponse registerTable(Namespace namespace, RegisterTableRequest
             .build();
     return loadTableResponse;
   }
+
+  private boolean shouldGeneratePlanTasksData(CreateTableRequest request) {
+    if (request.properties() == null) {
+      return false;
+    }
+    return Boolean.parseBoolean(
+        request.properties().getOrDefault(GENERATE_PLAN_TASKS_DATA_PROP, Boolean.FALSE.toString()));
+  }
+
+  private void appendSampleData(Namespace namespace, String tableName) {
+    try {
+      Table table = catalog.loadTable(TableIdentifier.of(namespace, tableName));
+      Path tempFile = Files.createTempFile("plan-scan", ".parquet");

Review Comment:
   The temporary file created by `Files.createTempFile()` is never deleted, leading to a resource leak in tests. The file should be deleted after use, or at least deleted on JVM exit using `tempFile.toFile().deleteOnExit()`.
   ```suggestion
         Path tempFile = Files.createTempFile("plan-scan", ".parquet");
         tempFile.toFile().deleteOnExit();
   ```
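Besides `deleteOnExit()`, the file can also be removed deterministically in a `finally` block. A minimal standalone sketch using only `java.nio.file`; the class and method names are invented for illustration.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

public class TempFileCleanup {
  // Create a temp file, use it, and guarantee removal even when the work throws.
  public static boolean writeAndCleanUp(byte[] data) throws IOException {
    Path tempFile = Files.createTempFile("plan-scan", ".parquet");
    try {
      Files.write(tempFile, data);
      return Files.size(tempFile) == data.length;
    } finally {
      // Deterministic cleanup; deleteOnExit() would only reclaim the file at JVM exit.
      Files.deleteIfExists(tempFile);
    }
  }
}
```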



##########
iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/CatalogWrapperForREST.java:
##########
@@ -192,6 +206,186 @@ private Credential getCredential(LoadTableResponse loadTableResponse) {
     return credential;
   }
 
+  /**
+   * Plan table scan and return scan tasks.
+   *
+   * <p>This method performs server-side scan planning to optimize query performance by reducing
+   * client-side metadata loading and enabling parallel task execution.
+   *
+   * <p>Implementation uses synchronous scan planning (COMPLETED status) where tasks are returned
+   * immediately as serialized JSON strings. This is different from asynchronous mode (SUBMITTED
+   * status) where a plan ID is returned for later retrieval.
+   *
+   * <p>Referenced from Iceberg PR #13400 for scan planning implementation.
+   *
+   * @param tableIdentifier The table identifier.
+   * @param scanRequest The scan request parameters including filters, projections, snapshot-id,
+   *     etc.
+   * @return PlanTableScanResponse with status=COMPLETED and serialized planTasks.
+   * @throws IllegalArgumentException if scan request validation fails
+   * @throws org.apache.gravitino.exceptions.NoSuchTableException if table doesn't exist
+   * @throws RuntimeException for other scan planning failures
+   */
+  public PlanTableScanResponse planTableScan(
+      TableIdentifier tableIdentifier, PlanTableScanRequest scanRequest) {
+
+    LOG.debug(
+        "Planning scan for table: {}, snapshotId: {}, select: {}, caseSensitive: {}",
+        tableIdentifier,
+        scanRequest.snapshotId(),
+        scanRequest.select(),
+        scanRequest.caseSensitive());
+
+    try {
+      Table table = catalog.loadTable(tableIdentifier);
+      if (table == null) {
+        throw new NoSuchTableException("Table not found: %s", tableIdentifier);
+      }
+
+      TableScan tableScan = table.newScan();
+      tableScan = applyScanRequest(tableScan, scanRequest);
+
+      List<String> planTasks = new ArrayList<>();
+      Map<Integer, PartitionSpec> specsById = new HashMap<>();
+      List<org.apache.iceberg.DeleteFile> deleteFiles = new ArrayList<>();
+
+      try (CloseableIterable<FileScanTask> fileScanTasks = tableScan.planFiles()) {
+        for (FileScanTask fileScanTask : fileScanTasks) {
+          try {
+            String taskString = ScanTaskParser.toJson(fileScanTask);
+            planTasks.add(taskString);
+
+            int specId = fileScanTask.spec().specId();
+            if (!specsById.containsKey(specId)) {
+              specsById.put(specId, fileScanTask.spec());
+            }
+
+            if (!fileScanTask.deletes().isEmpty()) {
+              deleteFiles.addAll(fileScanTask.deletes());
+            }
+          } catch (Exception e) {
+            LOG.warn(
+                "Failed to serialize scan task for table: {}, skipping task. Error: {}",
+                tableIdentifier,
+                e.getMessage());
+          }
+        }
+      } catch (IOException e) {
+        LOG.error("Failed to close scan task iterator for table: {}", tableIdentifier, e);
+        throw new RuntimeException("Failed to plan scan tasks: " + e.getMessage(), e);
+      }
+
+      List<DeleteFile> uniqueDeleteFiles =
+          deleteFiles.stream().distinct().collect(java.util.stream.Collectors.toList());
+
+      if (planTasks.isEmpty()) {
+        LOG.info(
+            "Scan planning returned no tasks for table: {}. Table may be empty or fully filtered.",
+            tableIdentifier);
+      }
+
+      if (!planTasks.isEmpty() && specsById.isEmpty()) {
+        LOG.error(
+            "Internal error: planTasks is not empty ({} tasks) but specsById is empty for table: {}",
+            planTasks.size(),
+            tableIdentifier);
+        throw new IllegalStateException("Scan planning produced tasks but no partition specs");
+      }
+

Review Comment:
   The validation at lines 287-293 for empty `specsById` when `planTasks` is not empty is unreachable. A task's spec is added to `specsById` in the same `try` block that adds the serialized task to `planTasks`, so whenever `planTasks` is non-empty, `specsById` is non-empty as well. Even if every serialization fails (caught at lines 266-271), both collections end up empty and this branch is still never taken. Consider removing the check, or instead validating that at least one task serialization succeeded whenever the scan produced tasks.
   ```suggestion
   
   ```
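The alternative suggested above (verify that at least one serialization succeeded when the scan produced tasks) can be expressed with an attempt counter. Hypothetical sketch, detached from the Iceberg types; `serialize` stands in for `ScanTaskParser.toJson` and the class name is invented.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

public class PlanValidation {
  // Track attempts vs. successes so a pass where every serialization failed is
  // distinguishable from a genuinely empty scan.
  public static <T> List<String> planOrFail(List<T> scanTasks, Function<T, String> serialize) {
    List<String> planTasks = new ArrayList<>();
    int attempted = 0;
    for (T task : scanTasks) {
      attempted++;
      try {
        planTasks.add(serialize.apply(task));
      } catch (RuntimeException e) {
        // logged and skipped in the reviewed code
      }
    }
    if (attempted > 0 && planTasks.isEmpty()) {
      // Surface total failure instead of returning a plan that looks like an empty table.
      throw new IllegalStateException(
          "scan produced " + attempted + " tasks but none serialized");
    }
    return planTasks;
  }
}
```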



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
