This is an automated email from the ASF dual-hosted git repository.

fanng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git


The following commit(s) were added to refs/heads/main by this push:
     new 98867e2480 [ #9047] feat(iceberg):Support submit table scan plan 
(#9050)
98867e2480 is described below

commit 98867e24807022ce1f6fc2db61f2bfd45584ddae
Author: Xiaojian Sun <[email protected]>
AuthorDate: Thu Nov 27 14:46:09 2025 +0800

    [ #9047] feat(iceberg):Support submit table scan plan (#9050)
    
    ### What changes were proposed in this pull request?
    
    Support submitting a table scan plan (server-side scan planning).
    
    ### Why are the changes needed?
    
    Fix: [#9047](https://github.com/apache/gravitino/issues/9047)
    
    ### Does this PR introduce _any_ user-facing change?
    
    N/A
    
    ### How was this patch tested?
---
 .../listener/api/event/OperationType.java          |   1 +
 docs/iceberg-rest-service.md                       |   1 -
 .../iceberg/service/CatalogWrapperForREST.java     | 220 +++++++++++++++++++++
 .../dispatcher/IcebergTableEventDispatcher.java    |  37 ++++
 .../dispatcher/IcebergTableHookDispatcher.java     |  23 +++
 .../IcebergTableOperationDispatcher.java           |  20 ++
 .../dispatcher/IcebergTableOperationExecutor.java  |  12 ++
 .../service/rest/IcebergConfigOperations.java      |   1 +
 .../service/rest/IcebergTableOperations.java       |  48 +++++
 .../api/event/IcebergPlanTableScanEvent.java       |  37 ++++
 .../event/IcebergPlanTableScanFailureEvent.java    |  37 ++++
 .../api/event/IcebergPlanTableScanPreEvent.java    |  37 ++++
 .../service/rest/CatalogWrapperForTest.java        |  51 +++++
 .../service/rest/TestIcebergTableOperations.java   | 188 +++++++++++++++++-
 14 files changed, 709 insertions(+), 4 deletions(-)

diff --git 
a/core/src/main/java/org/apache/gravitino/listener/api/event/OperationType.java 
b/core/src/main/java/org/apache/gravitino/listener/api/event/OperationType.java
index ea19ccaffc..dd1fa67e3c 100644
--- 
a/core/src/main/java/org/apache/gravitino/listener/api/event/OperationType.java
+++ 
b/core/src/main/java/org/apache/gravitino/listener/api/event/OperationType.java
@@ -26,6 +26,7 @@ public enum OperationType {
   PURGE_TABLE,
   LOAD_TABLE,
   LOAD_TABLE_CREDENTIAL,
+  PLAN_TABLE_SCAN,
   LIST_TABLE,
   ALTER_TABLE,
   RENAME_TABLE,
diff --git a/docs/iceberg-rest-service.md b/docs/iceberg-rest-service.md
index 835989a407..c809faef36 100644
--- a/docs/iceberg-rest-service.md
+++ b/docs/iceberg-rest-service.md
@@ -22,7 +22,6 @@ There are some key difference between Gravitino Iceberg REST 
server and Gravitin
 - Supports the Apache Iceberg REST API defined in Iceberg 1.10, and supports 
all namespace and table interfaces. The following interfaces are not 
implemented yet:
   - multi table transaction
   - pagination
-  - scan planning
 - Works as a catalog proxy, supporting `Hive` and `JDBC` as catalog backend.
 - Supports credential vending for `S3`、`GCS`、`OSS` and `ADLS`.
 - Supports different storages like `S3`, `HDFS`, `OSS`, `GCS`, `ADLS` and 
provides the capability to support other storages.
diff --git 
a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/CatalogWrapperForREST.java
 
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/CatalogWrapperForREST.java
index c62ad9f9c9..b24303a453 100644
--- 
a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/CatalogWrapperForREST.java
+++ 
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/CatalogWrapperForREST.java
@@ -22,8 +22,11 @@ package org.apache.gravitino.iceberg.service;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
+import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.stream.Stream;
@@ -40,15 +43,27 @@ import 
org.apache.gravitino.iceberg.common.ops.IcebergCatalogWrapper;
 import org.apache.gravitino.storage.GCSProperties;
 import org.apache.gravitino.utils.MapUtils;
 import org.apache.gravitino.utils.PrincipalUtils;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.IncrementalAppendScan;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Scan;
+import org.apache.iceberg.ScanTaskParser;
+import org.apache.iceberg.Table;
 import org.apache.iceberg.TableMetadata;
 import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.TableScan;
 import org.apache.iceberg.catalog.Namespace;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.exceptions.ServiceUnavailableException;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.rest.PlanStatus;
 import org.apache.iceberg.rest.requests.CreateTableRequest;
+import org.apache.iceberg.rest.requests.PlanTableScanRequest;
 import org.apache.iceberg.rest.responses.ImmutableLoadCredentialsResponse;
 import org.apache.iceberg.rest.responses.LoadCredentialsResponse;
 import org.apache.iceberg.rest.responses.LoadTableResponse;
+import org.apache.iceberg.rest.responses.PlanTableScanResponse;
 
 /** Process Iceberg REST specific operations, like credential vending. */
 public class CatalogWrapperForREST extends IcebergCatalogWrapper {
@@ -207,6 +222,211 @@ public class CatalogWrapperForREST extends 
IcebergCatalogWrapper {
     return credential;
   }
 
+  /**
+   * Plan table scan and return scan tasks.
+   *
+   * <p>This method performs server-side scan planning to optimize query 
performance by reducing
+   * client-side metadata loading and enabling parallel task execution.
+   *
+   * <p>Implementation uses synchronous scan planning (COMPLETED status) where 
tasks are returned
+   * immediately as serialized JSON strings. This is different from 
asynchronous mode (SUBMITTED
+   * status) where a plan ID is returned for later retrieval.
+   *
+   * <p>Referenced from Iceberg PR #13400 for scan planning implementation.
+   *
+   * @param tableIdentifier The table identifier.
+   * @param scanRequest The scan request parameters including filters, 
projections, snapshot-id,
+   *     etc.
+   * @return PlanTableScanResponse with status=COMPLETED and serialized 
planTasks.
+   * @throws IllegalArgumentException if scan request validation fails
+   * @throws org.apache.gravitino.exceptions.NoSuchTableException if table 
doesn't exist
+   * @throws RuntimeException for other scan planning failures
+   */
+  public PlanTableScanResponse planTableScan(
+      TableIdentifier tableIdentifier, PlanTableScanRequest scanRequest) {
+
+    LOG.debug(
+        "Planning scan for table: {}, snapshotId: {}, startSnapshotId: {}, 
endSnapshotId: {}, select: {}, caseSensitive: {}",
+        tableIdentifier,
+        scanRequest.snapshotId(),
+        scanRequest.startSnapshotId(),
+        scanRequest.endSnapshotId(),
+        scanRequest.select(),
+        scanRequest.caseSensitive());
+
+    try {
+      Table table = catalog.loadTable(tableIdentifier);
+      CloseableIterable<FileScanTask> fileScanTasks =
+          createFilePlanScanTasks(table, tableIdentifier, scanRequest);
+
+      List<String> planTasks = new ArrayList<>();
+      Map<Integer, PartitionSpec> specsById = new HashMap<>();
+      List<org.apache.iceberg.DeleteFile> deleteFiles = new ArrayList<>();
+
+      try (fileScanTasks) {
+        for (FileScanTask fileScanTask : fileScanTasks) {
+          try {
+            String taskString = ScanTaskParser.toJson(fileScanTask);
+            planTasks.add(taskString);
+
+            int specId = fileScanTask.spec().specId();
+            if (!specsById.containsKey(specId)) {
+              specsById.put(specId, fileScanTask.spec());
+            }
+
+            if (!fileScanTask.deletes().isEmpty()) {
+              deleteFiles.addAll(fileScanTask.deletes());
+            }
+          } catch (Exception e) {
+            throw new RuntimeException(
+                String.format(
+                    "Failed to serialize scan task for table: %s. Error: %s",
+                    tableIdentifier, e.getMessage()),
+                e);
+          }
+        }
+      } catch (IOException e) {
+        LOG.error("Failed to close scan task iterator for table: {}", 
tableIdentifier, e);
+        throw new RuntimeException("Failed to plan scan tasks: " + 
e.getMessage(), e);
+      }
+
+      List<DeleteFile> uniqueDeleteFiles =
+          
deleteFiles.stream().distinct().collect(java.util.stream.Collectors.toList());
+
+      if (planTasks.isEmpty()) {
+        LOG.info(
+            "Scan planning returned no tasks for table: {}. Table may be empty 
or fully filtered.",
+            tableIdentifier);
+      }
+
+      PlanTableScanResponse.Builder responseBuilder =
+          PlanTableScanResponse.builder()
+              .withPlanStatus(PlanStatus.COMPLETED)
+              .withPlanTasks(planTasks)
+              .withSpecsById(specsById);
+
+      if (!uniqueDeleteFiles.isEmpty()) {
+        responseBuilder.withDeleteFiles(uniqueDeleteFiles);
+        LOG.debug(
+            "Included {} delete files in scan plan for table: {}",
+            uniqueDeleteFiles.size(),
+            tableIdentifier);
+      }
+
+      return responseBuilder.build();
+
+    } catch (IllegalArgumentException e) {
+      LOG.error("Invalid scan request for table {}: {}", tableIdentifier, 
e.getMessage());
+      throw new IllegalArgumentException("Invalid scan parameters: " + 
e.getMessage(), e);
+    } catch (org.apache.iceberg.exceptions.NoSuchTableException e) {
+      LOG.error("Table not found during scan planning: {}", tableIdentifier);
+      throw e;
+    } catch (Exception e) {
+      LOG.error("Unexpected error during scan planning for table: {}", 
tableIdentifier, e);
+      throw new RuntimeException(
+          "Scan planning failed for table " + tableIdentifier + ": " + 
e.getMessage(), e);
+    }
+  }
+
+  /**
+   * Create and plan a scan based on the scan request.
+   *
+   * <p>If both start and end snapshot IDs are provided, uses 
IncrementalAppendScan. Otherwise, uses
+   * regular TableScan.
+   *
+   * @param table The table to scan
+   * @param tableIdentifier The table identifier for logging
+   * @param scanRequest The scan request parameters
+   * @return CloseableIterable of FileScanTask
+   */
+  private CloseableIterable<FileScanTask> createFilePlanScanTasks(
+      Table table, TableIdentifier tableIdentifier, PlanTableScanRequest 
scanRequest) {
+    Long startSnapshotId = scanRequest.startSnapshotId();
+    Long endSnapshotId = scanRequest.endSnapshotId();
+    // Use IncrementalAppendScan if both start and end snapshot IDs are 
provided
+    if (startSnapshotId != null && endSnapshotId != null) {
+      if (startSnapshotId >= endSnapshotId) {
+        throw new IllegalArgumentException(
+            String.format(
+                "Invalid snapshot range: startSnapshotId (%d) must be less 
than endSnapshotId (%d)",
+                startSnapshotId, endSnapshotId));
+      }
+      LOG.debug(
+          "Using IncrementalAppendScan for table: {}, from snapshot: {} to 
snapshot: {}",
+          tableIdentifier,
+          startSnapshotId,
+          endSnapshotId);
+      IncrementalAppendScan incrementalScan =
+          table
+              .newIncrementalAppendScan()
+              .fromSnapshotInclusive(startSnapshotId)
+              .toSnapshot(endSnapshotId);
+      incrementalScan = applyScanRequest(incrementalScan, scanRequest);
+      return incrementalScan.planFiles();
+    } else {
+      TableScan tableScan = table.newScan();
+      if (scanRequest.snapshotId() != null && scanRequest.snapshotId() != 0L) {
+        tableScan = tableScan.useSnapshot(scanRequest.snapshotId());
+        LOG.debug("Applied snapshot filter: snapshot-id={}", 
scanRequest.snapshotId());
+      }
+      tableScan = applyScanRequest(tableScan, scanRequest);
+      return tableScan.planFiles();
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  private <T extends Scan> T applyScanRequest(T scan, PlanTableScanRequest 
scanRequest) {
+    scan = (T) scan.caseSensitive(scanRequest.caseSensitive());
+    LOG.debug("Applied case-sensitive: {}", scanRequest.caseSensitive());
+    scan = applyScanFilter(scan, scanRequest);
+    scan = applyScanSelect(scan, scanRequest);
+    scan = applyScanStatsFields(scan, scanRequest);
+
+    return scan;
+  }
+
+  @SuppressWarnings("unchecked")
+  private <T extends Scan> T applyScanFilter(T scan, PlanTableScanRequest 
scanRequest) {
+    if (scanRequest.filter() != null) {
+      try {
+        scan = (T) scan.filter(scanRequest.filter());
+        LOG.debug("Applied filter expression: {}", scanRequest.filter());
+      } catch (Exception e) {
+        LOG.error("Failed to apply filter expression: {}", e.getMessage(), e);
+        throw new IllegalArgumentException("Invalid filter expression: " + 
e.getMessage(), e);
+      }
+    }
+    return scan;
+  }
+
+  @SuppressWarnings("unchecked")
+  private <T extends Scan> T applyScanSelect(T scan, PlanTableScanRequest 
scanRequest) {
+    if (scanRequest.select() != null && !scanRequest.select().isEmpty()) {
+      try {
+        scan = (T) scan.select(scanRequest.select());
+        LOG.debug("Applied column projection: {}", scanRequest.select());
+      } catch (Exception e) {
+        LOG.error("Failed to apply column projection: {}", e.getMessage(), e);
+        throw new IllegalArgumentException("Invalid column selection: " + 
e.getMessage(), e);
+      }
+    }
+    return scan;
+  }
+
+  @SuppressWarnings("unchecked")
+  private <T extends Scan> T applyScanStatsFields(T scan, PlanTableScanRequest 
scanRequest) {
+    if (scanRequest.statsFields() != null && 
!scanRequest.statsFields().isEmpty()) {
+      try {
+        scan = (T) scan.includeColumnStats(scanRequest.statsFields());
+        LOG.debug("Applied statistics fields: {}", scanRequest.statsFields());
+      } catch (Exception e) {
+        LOG.error("Failed to apply statistics fields: {}", e.getMessage(), e);
+        throw new IllegalArgumentException("Invalid statistics fields: " + 
e.getMessage(), e);
+      }
+    }
+    return scan;
+  }
+
   @VisibleForTesting
   static Map<String, String> checkForCompatibility(
       Map<String, String> properties, Map<String, String> 
deprecatedProperties) {
diff --git 
a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/dispatcher/IcebergTableEventDispatcher.java
 
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/dispatcher/IcebergTableEventDispatcher.java
index eaa0440140..e3160ff92a 100644
--- 
a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/dispatcher/IcebergTableEventDispatcher.java
+++ 
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/dispatcher/IcebergTableEventDispatcher.java
@@ -39,6 +39,9 @@ import 
org.apache.gravitino.listener.api.event.IcebergLoadTableCredentialPreEven
 import org.apache.gravitino.listener.api.event.IcebergLoadTableEvent;
 import org.apache.gravitino.listener.api.event.IcebergLoadTableFailureEvent;
 import org.apache.gravitino.listener.api.event.IcebergLoadTablePreEvent;
+import org.apache.gravitino.listener.api.event.IcebergPlanTableScanEvent;
+import 
org.apache.gravitino.listener.api.event.IcebergPlanTableScanFailureEvent;
+import org.apache.gravitino.listener.api.event.IcebergPlanTableScanPreEvent;
 import org.apache.gravitino.listener.api.event.IcebergRenameTableEvent;
 import org.apache.gravitino.listener.api.event.IcebergRenameTableFailureEvent;
 import org.apache.gravitino.listener.api.event.IcebergRenameTablePreEvent;
@@ -52,11 +55,13 @@ import 
org.apache.gravitino.listener.api.event.IcebergUpdateTablePreEvent;
 import org.apache.iceberg.catalog.Namespace;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.rest.requests.CreateTableRequest;
+import org.apache.iceberg.rest.requests.PlanTableScanRequest;
 import org.apache.iceberg.rest.requests.RenameTableRequest;
 import org.apache.iceberg.rest.requests.UpdateTableRequest;
 import org.apache.iceberg.rest.responses.ListTablesResponse;
 import org.apache.iceberg.rest.responses.LoadCredentialsResponse;
 import org.apache.iceberg.rest.responses.LoadTableResponse;
+import org.apache.iceberg.rest.responses.PlanTableScanResponse;
 
 /**
  * {@code IcebergTableEventDispatcher} is a decorator for {@link 
IcebergTableOperationExecutor} that
@@ -263,4 +268,36 @@ public class IcebergTableEventDispatcher implements 
IcebergTableOperationDispatc
     eventBus.dispatchEvent(new IcebergLoadTableCredentialEvent(context, 
gravitinoNameIdentifier));
     return loadCredentialsResponse;
   }
+
+  /**
+   * Plan table scan and return scan tasks.
+   *
+   * <p>Dispatches pre/post events for table scan planning operation.
+   *
+   * @param context Iceberg REST request context information.
+   * @param tableIdentifier The Iceberg table identifier.
+   * @param scanRequest The scan request parameters
+   * @return A PlanTableScanResponse containing the scan plan with plan-id and 
tasks
+   */
+  @Override
+  public PlanTableScanResponse planTableScan(
+      IcebergRequestContext context,
+      TableIdentifier tableIdentifier,
+      PlanTableScanRequest scanRequest) {
+    NameIdentifier gravitinoNameIdentifier =
+        IcebergRESTUtils.getGravitinoNameIdentifier(
+            metalakeName, context.catalogName(), tableIdentifier);
+    eventBus.dispatchEvent(new IcebergPlanTableScanPreEvent(context, 
gravitinoNameIdentifier));
+    PlanTableScanResponse planTableScanResponse;
+    try {
+      planTableScanResponse =
+          icebergTableOperationDispatcher.planTableScan(context, 
tableIdentifier, scanRequest);
+    } catch (Exception e) {
+      eventBus.dispatchEvent(
+          new IcebergPlanTableScanFailureEvent(context, 
gravitinoNameIdentifier, e));
+      throw e;
+    }
+    eventBus.dispatchEvent(new IcebergPlanTableScanEvent(context, 
gravitinoNameIdentifier));
+    return planTableScanResponse;
+  }
 }
diff --git 
a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/dispatcher/IcebergTableHookDispatcher.java
 
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/dispatcher/IcebergTableHookDispatcher.java
index 5bdd7aedb8..0f498db38d 100644
--- 
a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/dispatcher/IcebergTableHookDispatcher.java
+++ 
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/dispatcher/IcebergTableHookDispatcher.java
@@ -35,11 +35,13 @@ import org.apache.gravitino.utils.PrincipalUtils;
 import org.apache.iceberg.catalog.Namespace;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.rest.requests.CreateTableRequest;
+import org.apache.iceberg.rest.requests.PlanTableScanRequest;
 import org.apache.iceberg.rest.requests.RenameTableRequest;
 import org.apache.iceberg.rest.requests.UpdateTableRequest;
 import org.apache.iceberg.rest.responses.ListTablesResponse;
 import org.apache.iceberg.rest.responses.LoadCredentialsResponse;
 import org.apache.iceberg.rest.responses.LoadTableResponse;
+import org.apache.iceberg.rest.responses.PlanTableScanResponse;
 
 public class IcebergTableHookDispatcher implements 
IcebergTableOperationDispatcher {
 
@@ -156,6 +158,27 @@ public class IcebergTableHookDispatcher implements 
IcebergTableOperationDispatch
     return dispatcher.getTableCredentials(context, tableIdentifier);
   }
 
+  /**
+   * Plan table scan and return scan tasks.
+   *
+   * <p>This method performs server-side scan planning to optimize query 
performance by reducing
+   * client-side metadata loading and enabling parallel task execution.
+   *
+   * @param context Iceberg REST request context information.
+   * @param tableIdentifier The Iceberg table identifier.
+   * @param scanRequest The scan request parameters including filters, 
projections, snapshot-id,
+   *     etc.
+   * @return A PlanTableScanResponse containing the scan plan with plan-id and 
tasks. The response
+   *     format follows Iceberg REST API specification.
+   */
+  @Override
+  public PlanTableScanResponse planTableScan(
+      IcebergRequestContext context,
+      TableIdentifier tableIdentifier,
+      PlanTableScanRequest scanRequest) {
+    return dispatcher.planTableScan(context, tableIdentifier, scanRequest);
+  }
+
   private void importTable(String catalogName, Namespace namespace, String 
tableName) {
     TableDispatcher tableDispatcher = 
GravitinoEnv.getInstance().tableDispatcher();
     if (tableDispatcher != null) {
diff --git 
a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/dispatcher/IcebergTableOperationDispatcher.java
 
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/dispatcher/IcebergTableOperationDispatcher.java
index 75762b42e8..0c4d08a6f0 100644
--- 
a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/dispatcher/IcebergTableOperationDispatcher.java
+++ 
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/dispatcher/IcebergTableOperationDispatcher.java
@@ -23,11 +23,13 @@ import 
org.apache.gravitino.listener.api.event.IcebergRequestContext;
 import org.apache.iceberg.catalog.Namespace;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.rest.requests.CreateTableRequest;
+import org.apache.iceberg.rest.requests.PlanTableScanRequest;
 import org.apache.iceberg.rest.requests.RenameTableRequest;
 import org.apache.iceberg.rest.requests.UpdateTableRequest;
 import org.apache.iceberg.rest.responses.ListTablesResponse;
 import org.apache.iceberg.rest.responses.LoadCredentialsResponse;
 import org.apache.iceberg.rest.responses.LoadTableResponse;
+import org.apache.iceberg.rest.responses.PlanTableScanResponse;
 
 /**
  * The {@code IcebergTableOperationDispatcher} interface defines the public 
API for managing Iceberg
@@ -113,4 +115,22 @@ public interface IcebergTableOperationDispatcher {
    */
   LoadCredentialsResponse getTableCredentials(
       IcebergRequestContext context, TableIdentifier tableIdentifier);
+
+  /**
+   * Plan table scan and return scan tasks.
+   *
+   * <p>This method performs server-side scan planning to optimize query 
performance by reducing
+   * client-side metadata loading and enabling parallel task execution.
+   *
+   * @param context Iceberg REST request context information.
+   * @param tableIdentifier The Iceberg table identifier.
+   * @param scanRequest The scan request parameters including filters, 
projections, snapshot-id,
+   *     etc.
+   * @return A PlanTableScanResponse containing the scan plan with plan-id and 
tasks. The response
+   *     format follows Iceberg REST API specification.
+   */
+  PlanTableScanResponse planTableScan(
+      IcebergRequestContext context,
+      TableIdentifier tableIdentifier,
+      PlanTableScanRequest scanRequest);
 }
diff --git 
a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/dispatcher/IcebergTableOperationExecutor.java
 
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/dispatcher/IcebergTableOperationExecutor.java
index 1c34f259ff..6d23c89360 100644
--- 
a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/dispatcher/IcebergTableOperationExecutor.java
+++ 
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/dispatcher/IcebergTableOperationExecutor.java
@@ -35,11 +35,13 @@ import 
org.apache.gravitino.server.authorization.expression.AuthorizationExpress
 import org.apache.iceberg.catalog.Namespace;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.rest.requests.CreateTableRequest;
+import org.apache.iceberg.rest.requests.PlanTableScanRequest;
 import org.apache.iceberg.rest.requests.RenameTableRequest;
 import org.apache.iceberg.rest.requests.UpdateTableRequest;
 import org.apache.iceberg.rest.responses.ListTablesResponse;
 import org.apache.iceberg.rest.responses.LoadCredentialsResponse;
 import org.apache.iceberg.rest.responses.LoadTableResponse;
+import org.apache.iceberg.rest.responses.PlanTableScanResponse;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -168,4 +170,14 @@ public class IcebergTableOperationExecutor implements 
IcebergTableOperationDispa
 
     return writable ? CredentialPrivilege.WRITE : CredentialPrivilege.READ;
   }
+
+  @Override
+  public PlanTableScanResponse planTableScan(
+      IcebergRequestContext context,
+      TableIdentifier tableIdentifier,
+      PlanTableScanRequest scanRequest) {
+    return icebergCatalogWrapperManager
+        .getCatalogWrapper(context.catalogName())
+        .planTableScan(tableIdentifier, scanRequest);
+  }
 }
diff --git 
a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/rest/IcebergConfigOperations.java
 
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/rest/IcebergConfigOperations.java
index 7925d54008..8d596ae84b 100644
--- 
a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/rest/IcebergConfigOperations.java
+++ 
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/rest/IcebergConfigOperations.java
@@ -75,6 +75,7 @@ public class IcebergConfigOperations {
           .add(Endpoint.V1_REGISTER_TABLE)
           .add(Endpoint.V1_REPORT_METRICS)
           .add(Endpoint.V1_TABLE_CREDENTIALS)
+          .add(Endpoint.V1_SUBMIT_TABLE_SCAN_PLAN)
           .build();
 
   private static final List<Endpoint> DEFAULT_VIEW_ENDPOINTS =
diff --git 
a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/rest/IcebergTableOperations.java
 
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/rest/IcebergTableOperations.java
index cc0db74147..53f7b73f01 100644
--- 
a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/rest/IcebergTableOperations.java
+++ 
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/rest/IcebergTableOperations.java
@@ -66,11 +66,13 @@ import org.apache.iceberg.catalog.Namespace;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.rest.RESTUtil;
 import org.apache.iceberg.rest.requests.CreateTableRequest;
+import org.apache.iceberg.rest.requests.PlanTableScanRequest;
 import org.apache.iceberg.rest.requests.ReportMetricsRequest;
 import org.apache.iceberg.rest.requests.UpdateTableRequest;
 import org.apache.iceberg.rest.responses.ListTablesResponse;
 import org.apache.iceberg.rest.responses.LoadCredentialsResponse;
 import org.apache.iceberg.rest.responses.LoadTableResponse;
+import org.apache.iceberg.rest.responses.PlanTableScanResponse;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -442,6 +444,52 @@ public class IcebergTableOperations {
     }
   }
 
+  /**
+   * Plan table scan endpoint. Allows clients to request a scan plan from the 
server to optimize
+   * scan planning by leveraging server-side resources.
+   *
+   * @param prefix The catalog prefix
+   * @param namespace The namespace
+   * @param table The table name
+   * @param scanRequest The scan request containing filters, projections, etc.
+   * @return Response containing the scan plan with tasks
+   */
+  @POST
+  @Path("{table}/scan")
+  @Produces(MediaType.APPLICATION_JSON)
+  @Consumes(MediaType.APPLICATION_JSON)
+  @Timed(name = "plan-table-scan." + MetricNames.HTTP_PROCESS_DURATION, 
absolute = true)
+  @ResponseMetered(name = "plan-table-scan", absolute = true)
+  public Response planTableScan(
+      @PathParam("prefix") String prefix,
+      @Encoded() @PathParam("namespace") String namespace,
+      @PathParam("table") String table,
+      PlanTableScanRequest scanRequest) {
+    String catalogName = IcebergRESTUtils.getCatalogName(prefix);
+    Namespace icebergNS = RESTUtil.decodeNamespace(namespace);
+
+    LOG.info(
+        "Plan table scan, catalog: {}, namespace: {}, table: {}", catalogName, 
icebergNS, table);
+
+    try {
+      return Utils.doAs(
+          httpRequest,
+          () -> {
+            TableIdentifier tableIdentifier = TableIdentifier.of(icebergNS, 
table);
+            IcebergRequestContext context =
+                new IcebergRequestContext(httpServletRequest(), catalogName);
+
+            PlanTableScanResponse scanResponse =
+                tableOperationDispatcher.planTableScan(context, 
tableIdentifier, scanRequest);
+
+            return IcebergRESTUtils.ok(scanResponse);
+          });
+    } catch (Exception e) {
+      LOG.error("Failed to plan table scan: {}", e.getMessage(), e);
+      return IcebergExceptionMapper.toRESTResponse(e);
+    }
+  }
+
   private boolean isCredentialVending(String accessDelegation) {
     if (StringUtils.isBlank(accessDelegation)) {
       return false;
diff --git 
a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/listener/api/event/IcebergPlanTableScanEvent.java
 
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/listener/api/event/IcebergPlanTableScanEvent.java
new file mode 100644
index 0000000000..510aa71505
--- /dev/null
+++ 
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/listener/api/event/IcebergPlanTableScanEvent.java
@@ -0,0 +1,37 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.gravitino.listener.api.event;
+
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.annotation.DeveloperApi;
+
+/** Represent an event after planning Iceberg table scan successfully. */
+@DeveloperApi
+public class IcebergPlanTableScanEvent extends IcebergTableEvent {
+  public IcebergPlanTableScanEvent(
+      IcebergRequestContext icebergRequestContext, NameIdentifier 
resourceIdentifier) {
+    super(icebergRequestContext, resourceIdentifier);
+  }
+
+  @Override
+  public OperationType operationType() {
+    return OperationType.PLAN_TABLE_SCAN;
+  }
+}
diff --git 
a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/listener/api/event/IcebergPlanTableScanFailureEvent.java
 
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/listener/api/event/IcebergPlanTableScanFailureEvent.java
new file mode 100644
index 0000000000..793289f6d0
--- /dev/null
+++ 
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/listener/api/event/IcebergPlanTableScanFailureEvent.java
@@ -0,0 +1,37 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.gravitino.listener.api.event;
+
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.annotation.DeveloperApi;
+
+/**
+ * Represents the failure event emitted when planning an Iceberg table scan fails.
+ *
+ * <p>The constructor passes the request context, the identifier and the causing exception
+ * unchanged to the {@code IcebergTableFailureEvent} constructor, and {@link #operationType()}
+ * always reports {@link OperationType#PLAN_TABLE_SCAN}.
+ */
+@DeveloperApi
+public class IcebergPlanTableScanFailureEvent extends IcebergTableFailureEvent 
{
+  public IcebergPlanTableScanFailureEvent(
+      IcebergRequestContext icebergRequestContext, NameIdentifier 
nameIdentifier, Exception e) {
+    super(icebergRequestContext, nameIdentifier, e);
+  }
+
+  @Override
+  public OperationType operationType() {
+    return OperationType.PLAN_TABLE_SCAN;
+  }
+}
diff --git 
a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/listener/api/event/IcebergPlanTableScanPreEvent.java
 
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/listener/api/event/IcebergPlanTableScanPreEvent.java
new file mode 100644
index 0000000000..45075f41f0
--- /dev/null
+++ 
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/listener/api/event/IcebergPlanTableScanPreEvent.java
@@ -0,0 +1,37 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.gravitino.listener.api.event;
+
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.annotation.DeveloperApi;
+
+/**
+ * Represents the pre-event emitted before an Iceberg table scan is planned.
+ *
+ * <p>The constructor passes the request context and the table identifier unchanged to the {@code
+ * IcebergTablePreEvent} constructor, and {@link #operationType()} always reports {@link
+ * OperationType#PLAN_TABLE_SCAN}.
+ */
+@DeveloperApi
+public class IcebergPlanTableScanPreEvent extends IcebergTablePreEvent {
+  public IcebergPlanTableScanPreEvent(
+      IcebergRequestContext icebergRequestContext, NameIdentifier 
tableIdentifier) {
+    super(icebergRequestContext, tableIdentifier);
+  }
+
+  @Override
+  public OperationType operationType() {
+    return OperationType.PLAN_TABLE_SCAN;
+  }
+}
diff --git 
a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/CatalogWrapperForTest.java
 
b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/CatalogWrapperForTest.java
index 1cf41af796..64834e408c 100644
--- 
a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/CatalogWrapperForTest.java
+++ 
b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/CatalogWrapperForTest.java
@@ -18,13 +18,22 @@
  */
 package org.apache.gravitino.iceberg.service.rest;
 
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
 import org.apache.gravitino.iceberg.common.IcebergConfig;
 import org.apache.gravitino.iceberg.service.CatalogWrapperForREST;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
 import org.apache.iceberg.TableMetadata;
 import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.rest.requests.CreateTableRequest;
 import org.apache.iceberg.rest.requests.RegisterTableRequest;
 import org.apache.iceberg.rest.responses.LoadTableResponse;
 import org.apache.iceberg.types.Types.NestedField;
@@ -34,10 +43,22 @@ import 
org.testcontainers.shaded.com.google.common.collect.ImmutableMap;
 // Used to override registerTable
 @SuppressWarnings("deprecation")
 public class CatalogWrapperForTest extends CatalogWrapperForREST {
+  public static final String GENERATE_PLAN_TASKS_DATA_PROP = 
"test.generate-plan-data";
+
   public CatalogWrapperForTest(String catalogName, IcebergConfig 
icebergConfig) {
     super(catalogName, icebergConfig);
   }
 
+  /**
+   * Creates the table through the parent wrapper and, when the request opts in via {@link
+   * #GENERATE_PLAN_TASKS_DATA_PROP}, appends sample data files afterwards.
+   *
+   * <p>The returned response is the one produced by {@code super.createTable}, i.e. it is obtained
+   * before any sample data is appended.
+   *
+   * @param namespace namespace in which the table is created
+   * @param request the create-table request; its properties decide whether sample data is added
+   * @param requestCredential forwarded unchanged to the parent implementation
+   * @return the response returned by the parent implementation
+   */
+  @Override
+  public LoadTableResponse createTable(
+      Namespace namespace, CreateTableRequest request, boolean requestCredential) {
+    final LoadTableResponse created = super.createTable(namespace, request, requestCredential);
+    final boolean seedPlanData = shouldGeneratePlanTasksData(request);
+    if (!seedPlanData) {
+      return created;
+    }
+    appendSampleData(namespace, request.name());
+    return created;
+  }
+
   @Override
   public LoadTableResponse registerTable(Namespace namespace, 
RegisterTableRequest request) {
     if (request.name().contains("fail")) {
@@ -55,4 +76,34 @@ public class CatalogWrapperForTest extends 
CatalogWrapperForREST {
             .build();
     return loadTableResponse;
   }
+
+  /**
+   * Tells whether the create-table request asks for sample data to be generated.
+   *
+   * @param request the create-table request whose properties are inspected
+   * @return {@code true} only when the request has properties and the value stored under {@link
+   *     #GENERATE_PLAN_TASKS_DATA_PROP} parses as {@code true}; a missing key counts as false
+   */
+  private boolean shouldGeneratePlanTasksData(CreateTableRequest request) {
+    return request.properties() != null
+        && Boolean.parseBoolean(
+            request
+                .properties()
+                .getOrDefault(GENERATE_PLAN_TASKS_DATA_PROP, Boolean.FALSE.toString()));
+  }
+
+  private void appendSampleData(Namespace namespace, String tableName) {
+    try {
+      Table table = catalog.loadTable(TableIdentifier.of(namespace, 
tableName));
+      // Append multiple times to create multiple snapshots for incremental 
scan testing
+      for (int i = 0; i < 3; i++) {
+        Path tempFile = Files.createTempFile("plan-scan-" + i, ".parquet");
+        tempFile.toFile().deleteOnExit();
+        DataFile dataFile =
+            DataFiles.builder(table.spec())
+                .withPath(tempFile.toUri().toString())
+                .withFormat(FileFormat.PARQUET)
+                .withRecordCount(1)
+                .withFileSizeInBytes(0L)
+                .build();
+        table.newFastAppend().appendFile(dataFile).commit();
+      }
+      super.loadTable(TableIdentifier.of(namespace, tableName));
+    } catch (IOException e) {
+      throw new RuntimeException("Failed to append sample data for test table 
" + tableName, e);
+    }
+  }
 }
diff --git 
a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/TestIcebergTableOperations.java
 
b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/TestIcebergTableOperations.java
index 94af167579..848d78f451 100644
--- 
a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/TestIcebergTableOperations.java
+++ 
b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/TestIcebergTableOperations.java
@@ -19,6 +19,8 @@
 
 package org.apache.gravitino.iceberg.service.rest;
 
+import com.fasterxml.jackson.databind.JsonNode;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import java.util.Arrays;
 import java.util.List;
@@ -27,6 +29,7 @@ import java.util.Set;
 import java.util.stream.Collectors;
 import javax.servlet.http.HttpServletRequest;
 import javax.ws.rs.client.Entity;
+import javax.ws.rs.client.Invocation;
 import javax.ws.rs.core.Application;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
@@ -46,6 +49,9 @@ import 
org.apache.gravitino.listener.api.event.IcebergListTablePreEvent;
 import org.apache.gravitino.listener.api.event.IcebergLoadTableEvent;
 import org.apache.gravitino.listener.api.event.IcebergLoadTableFailureEvent;
 import org.apache.gravitino.listener.api.event.IcebergLoadTablePreEvent;
+import org.apache.gravitino.listener.api.event.IcebergPlanTableScanEvent;
+import 
org.apache.gravitino.listener.api.event.IcebergPlanTableScanFailureEvent;
+import org.apache.gravitino.listener.api.event.IcebergPlanTableScanPreEvent;
 import org.apache.gravitino.listener.api.event.IcebergRenameTableEvent;
 import org.apache.gravitino.listener.api.event.IcebergRenameTableFailureEvent;
 import org.apache.gravitino.listener.api.event.IcebergRenameTablePreEvent;
@@ -58,6 +64,7 @@ import org.apache.gravitino.server.ServerConfig;
 import org.apache.gravitino.server.authorization.GravitinoAuthorizerProvider;
 import org.apache.iceberg.MetadataUpdate;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.TableMetadata;
 import org.apache.iceberg.UpdateRequirement;
 import org.apache.iceberg.UpdateRequirements;
@@ -66,7 +73,9 @@ import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.metrics.CommitReport;
 import org.apache.iceberg.metrics.ImmutableCommitMetricsResult;
 import org.apache.iceberg.metrics.ImmutableCommitReport;
+import org.apache.iceberg.rest.PlanStatus;
 import org.apache.iceberg.rest.requests.CreateTableRequest;
+import org.apache.iceberg.rest.requests.PlanTableScanRequest;
 import org.apache.iceberg.rest.requests.RenameTableRequest;
 import org.apache.iceberg.rest.requests.ReportMetricsRequest;
 import org.apache.iceberg.rest.requests.UpdateTableRequest;
@@ -74,6 +83,7 @@ import org.apache.iceberg.rest.responses.ListTablesResponse;
 import org.apache.iceberg.rest.responses.LoadTableResponse;
 import org.apache.iceberg.types.Types.NestedField;
 import org.apache.iceberg.types.Types.StringType;
+import org.apache.iceberg.util.JsonUtil;
 import org.glassfish.jersey.internal.inject.AbstractBinder;
 import org.glassfish.jersey.server.ResourceConfig;
 import org.junit.jupiter.api.Assertions;
@@ -154,6 +164,106 @@ public class TestIcebergTableOperations extends 
IcebergNamespaceTestBase {
     Assertions.assertTrue(dummyEventListener.popPostEvent() instanceof 
IcebergLoadTableEvent);
   }
 
+  @ParameterizedTest
+  
@MethodSource("org.apache.gravitino.iceberg.service.rest.IcebergRestTestUtil#testNamespaces")
+  void testPlanTableScan(Namespace namespace) {
+    verifyCreateNamespaceSucc(namespace);
+    verifyCreateTableSucc(namespace, "plan_scan_table", true);
+
+    dummyEventListener.clearEvent();
+    TableMetadata metadata = getTableMeta(namespace, "plan_scan_table");
+    Long snapshotId =
+        metadata.currentSnapshot() == null ? null : 
metadata.currentSnapshot().snapshotId();
+    JsonNode planResponse = verifyPlanTableScanSucc(namespace, 
"plan_scan_table", snapshotId);
+
+    Assertions.assertEquals(
+        PlanStatus.COMPLETED.status(), 
planResponse.get("plan-status").asText());
+    Assertions.assertTrue(planResponse.has("plan-tasks"));
+    Assertions.assertTrue(planResponse.get("plan-tasks").isArray());
+    Assertions.assertTrue(planResponse.get("plan-tasks").size() > 0);
+
+    Assertions.assertTrue(dummyEventListener.popPreEvent() instanceof 
IcebergPlanTableScanPreEvent);
+    Assertions.assertTrue(dummyEventListener.popPostEvent() instanceof 
IcebergPlanTableScanEvent);
+  }
+
+  /**
+   * Planning a scan for a table that was never created must answer 404 and emit a pre event
+   * followed by a failure event.
+   */
+  @ParameterizedTest
+  @MethodSource("org.apache.gravitino.iceberg.service.rest.IcebergRestTestUtil#testNamespaces")
+  void testPlanTableScanTableNotFound(Namespace namespace) {
+    verifyCreateNamespaceSucc(namespace);
+    dummyEventListener.clearEvent();
+
+    verifyPlanTableScanFail(namespace, "missing_table", Status.NOT_FOUND.getStatusCode());
+
+    Object preEvent = dummyEventListener.popPreEvent();
+    Assertions.assertTrue(preEvent instanceof IcebergPlanTableScanPreEvent);
+    Object postEvent = dummyEventListener.popPostEvent();
+    Assertions.assertTrue(postEvent instanceof IcebergPlanTableScanFailureEvent);
+  }
+
+  @ParameterizedTest
+  
@MethodSource("org.apache.gravitino.iceberg.service.rest.IcebergRestTestUtil#testNamespaces")
+  void testPlanTableScanWithIncrementalAppendScan(Namespace namespace) {
+    verifyCreateNamespaceSucc(namespace);
+    verifyCreateTableSucc(namespace, "incremental_scan_table", true);
+
+    dummyEventListener.clearEvent();
+    TableMetadata metadata = getTableMeta(namespace, "incremental_scan_table");
+    Long currentSnapshotId = metadata.currentSnapshot().snapshotId();
+
+    PlanTableScanRequest invalidRequest =
+        buildPlanTableScanRequestWithRange(currentSnapshotId, 
currentSnapshotId);
+    Response response = doPlanTableScan(namespace, "incremental_scan_table", 
invalidRequest);
+    Assertions.assertEquals(
+        Status.BAD_REQUEST.getStatusCode(),
+        response.getStatus(),
+        "Expected BAD_REQUEST for start == end snapshot IDs");
+  }
+
+  /**
+   * Plans an incremental append scan over a valid snapshot range (parent of the current snapshot
+   * up to the current snapshot) and checks that the plan completes, returns a {@code plan-tasks}
+   * array, and emits the pre and post plan-scan events.
+   */
+  @ParameterizedTest
+  @MethodSource("org.apache.gravitino.iceberg.service.rest.IcebergRestTestUtil#testNamespaces")
+  void testPlanTableScanWithIncrementalAppendScanValidRange(Namespace namespace) {
+    verifyCreateNamespaceSucc(namespace);
+    verifyCreateTableSucc(namespace, "incremental_scan_valid_table", true);
+
+    dummyEventListener.clearEvent();
+    TableMetadata metadata = getTableMeta(namespace, "incremental_scan_valid_table");
+
+    // generatePlanData=true makes CatalogWrapperForTest#appendSampleData commit several fast
+    // appends, each of which produces its own snapshot.
+    List<Snapshot> snapshots = metadata.snapshots();
+    Assertions.assertNotNull(snapshots, "Snapshots should not be null");
+    Assertions.assertTrue(
+        snapshots.size() >= 2,
+        "Should have at least 2 snapshots for incremental scan test, but got: "
+            + snapshots.size());
+
+    // For IncrementalAppendScan the start snapshot must be an ancestor of the end snapshot.
+    // Snapshot IDs do not increase with commit order, so ordering by ID can pick the root
+    // snapshot (which has no parent). Use the current snapshot and its parent instead.
+    Snapshot endSnapshot = metadata.currentSnapshot();
+    Assertions.assertNotNull(endSnapshot, "Table should have a current snapshot");
+    Long endSnapshotId = endSnapshot.snapshotId();
+    Long startSnapshotId = endSnapshot.parentId();
+
+    Assertions.assertNotNull(
+        startSnapshotId, "Could not find a valid startSnapshotId from snapshots");
+    JsonNode planResponse =
+        verifyPlanTableScanSuccWithRange(
+            namespace, "incremental_scan_valid_table", startSnapshotId, endSnapshotId);
+    Assertions.assertEquals(
+        PlanStatus.COMPLETED.status(), planResponse.get("plan-status").asText());
+    Assertions.assertTrue(planResponse.has("plan-tasks"));
+    Assertions.assertTrue(planResponse.get("plan-tasks").isArray());
+
+    Assertions.assertTrue(dummyEventListener.popPreEvent() instanceof IcebergPlanTableScanPreEvent);
+    Assertions.assertTrue(dummyEventListener.popPostEvent() instanceof IcebergPlanTableScanEvent);
+  }
+
   @ParameterizedTest
   
@MethodSource("org.apache.gravitino.iceberg.service.rest.IcebergRestTestUtil#testNamespaces")
   void testDropTable(Namespace namespace) {
@@ -341,8 +451,19 @@ public class TestIcebergTableOperations extends 
IcebergNamespaceTestBase {
   }
 
   private Response doCreateTable(Namespace ns, String name) {
-    CreateTableRequest createTableRequest =
-        
CreateTableRequest.builder().withName(name).withSchema(tableSchema).build();
+    return doCreateTable(ns, name, false);
+  }
+
+  private Response doCreateTable(Namespace ns, String name, boolean 
generatePlanData) {
+    CreateTableRequest.Builder builder =
+        CreateTableRequest.builder().withName(name).withSchema(tableSchema);
+    if (generatePlanData) {
+      builder =
+          builder.setProperties(
+              ImmutableMap.of(
+                  CatalogWrapperForTest.GENERATE_PLAN_TASKS_DATA_PROP, 
Boolean.TRUE.toString()));
+    }
+    CreateTableRequest createTableRequest = builder.build();
     return getTableClientBuilder(ns, Optional.empty())
         .post(Entity.entity(createTableRequest, 
MediaType.APPLICATION_JSON_TYPE));
   }
@@ -379,6 +500,11 @@ public class TestIcebergTableOperations extends 
IcebergNamespaceTestBase {
     return getTableClientBuilder(ns, Optional.of(name)).get();
   }
 
+  /**
+   * POSTs the plan request as JSON to the table's {@code /scan} sub-path.
+   *
+   * @param ns namespace of the table
+   * @param tableName table whose scan is planned; {@code "/scan"} is appended to it
+   * @param request the plan-table-scan request sent as the entity
+   * @return the raw response of the POST
+   */
+  private Response doPlanTableScan(Namespace ns, String tableName, PlanTableScanRequest request) {
+    Entity<PlanTableScanRequest> payload =
+        Entity.entity(request, MediaType.APPLICATION_JSON_TYPE);
+    Invocation.Builder scanEndpoint = getTableClientBuilder(ns, Optional.of(tableName + "/scan"));
+    return scanEndpoint.post(payload);
+  }
+
   private Response doUpdateTable(Namespace ns, String name, TableMetadata 
base) {
     TableMetadata newMetadata = base.updateSchema(newTableSchema);
     List<MetadataUpdate> metadataUpdates = newMetadata.changes();
@@ -431,13 +557,69 @@ public class TestIcebergTableOperations extends 
IcebergNamespaceTestBase {
     Assertions.assertEquals(status, response.getStatus());
   }
 
+  /**
+   * Sends a plan request built from a {@code null} snapshot ID and asserts the response carries
+   * the expected HTTP status.
+   *
+   * @param ns namespace of the table
+   * @param tableName table to plan the scan for
+   * @param status expected HTTP status code
+   */
+  private void verifyPlanTableScanFail(Namespace ns, String tableName, int status) {
+    PlanTableScanRequest request = buildPlanTableScanRequest(null);
+    int actualStatus = doPlanTableScan(ns, tableName, request).getStatus();
+    Assertions.assertEquals(status, actualStatus);
+  }
+
+  /**
+   * Plans a scan for a single snapshot, asserts HTTP 200 and returns the parsed JSON body.
+   *
+   * @param ns namespace of the table
+   * @param tableName table to plan the scan for
+   * @param snapshotId snapshot to scan; passed on to {@code buildPlanTableScanRequest}
+   * @return the response body parsed as a JSON tree
+   */
+  private JsonNode verifyPlanTableScanSucc(Namespace ns, String tableName, Long snapshotId) {
+    Response response = doPlanTableScan(ns, tableName, buildPlanTableScanRequest(snapshotId));
+    return readSuccessfulPlanResponse(response);
+  }
+
+  /**
+   * Plans an incremental scan over a snapshot range, asserts HTTP 200 and returns the parsed JSON
+   * body.
+   *
+   * @param ns namespace of the table
+   * @param tableName table to plan the scan for
+   * @param startSnapshotId start of the range; passed on to the range request builder
+   * @param endSnapshotId end of the range; passed on to the range request builder
+   * @return the response body parsed as a JSON tree
+   */
+  private JsonNode verifyPlanTableScanSuccWithRange(
+      Namespace ns, String tableName, Long startSnapshotId, Long endSnapshotId) {
+    Response response =
+        doPlanTableScan(
+            ns, tableName, buildPlanTableScanRequestWithRange(startSnapshotId, endSnapshotId));
+    return readSuccessfulPlanResponse(response);
+  }
+
+  /** Asserts the response status is 200 and parses its body into a JSON tree. */
+  private JsonNode readSuccessfulPlanResponse(Response response) {
+    Assertions.assertEquals(Status.OK.getStatusCode(), response.getStatus());
+    String responseBody = response.readEntity(String.class);
+    try {
+      return JsonUtil.mapper().readTree(responseBody);
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to parse plan table scan response", e);
+    }
+  }
+
+  /**
+   * Builds a plan request that targets one snapshot.
+   *
+   * @param snapshotId snapshot to scan; when {@code null}, snapshot ID {@code 0} is used instead
+   * @return the built request
+   */
+  private PlanTableScanRequest buildPlanTableScanRequest(Long snapshotId) {
+    long effectiveSnapshotId = snapshotId == null ? 0L : snapshotId;
+    return new PlanTableScanRequest.Builder().withSnapshotId(effectiveSnapshotId).build();
+  }
+
+  /**
+   * Builds a plan request for a snapshot range; a {@code null} bound is simply left unset.
+   *
+   * @param startSnapshotId start snapshot ID, or {@code null} to omit it
+   * @param endSnapshotId end snapshot ID, or {@code null} to omit it
+   * @return the built request
+   */
+  private PlanTableScanRequest buildPlanTableScanRequestWithRange(
+      Long startSnapshotId, Long endSnapshotId) {
+    PlanTableScanRequest.Builder rangeBuilder = new PlanTableScanRequest.Builder();
+    rangeBuilder =
+        startSnapshotId == null ? rangeBuilder : rangeBuilder.withStartSnapshotId(startSnapshotId);
+    rangeBuilder =
+        endSnapshotId == null ? rangeBuilder : rangeBuilder.withEndSnapshotId(endSnapshotId);
+    return rangeBuilder.build();
+  }
+
   private void verifyDropTableFail(Namespace ns, String name, int status) {
     Response response = doDropTable(ns, name);
     Assertions.assertEquals(status, response.getStatus());
   }
 
   private void verifyCreateTableSucc(Namespace ns, String name) {
-    Response response = doCreateTable(ns, name);
+    verifyCreateTableSucc(ns, name, false);
+  }
+
+  private void verifyCreateTableSucc(Namespace ns, String name, boolean 
generatePlanData) {
+    Response response = doCreateTable(ns, name, generatePlanData);
     Assertions.assertEquals(Status.OK.getStatusCode(), response.getStatus());
     LoadTableResponse loadTableResponse = 
response.readEntity(LoadTableResponse.class);
     Schema schema = loadTableResponse.tableMetadata().schema();

Reply via email to