This is an automated email from the ASF dual-hosted git repository.
fanng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git
The following commit(s) were added to refs/heads/main by this push:
new 98867e2480 [ #9047] feat(iceberg):Support submit table scan plan
(#9050)
98867e2480 is described below
commit 98867e24807022ce1f6fc2db61f2bfd45584ddae
Author: Xiaojian Sun <[email protected]>
AuthorDate: Thu Nov 27 14:46:09 2025 +0800
[ #9047] feat(iceberg):Support submit table scan plan (#9050)
### What changes were proposed in this pull request?
Support submitting a table scan plan.
### Why are the changes needed?
Fix: [#9047](https://github.com/apache/gravitino/issues/9047)
### Does this PR introduce _any_ user-facing change?
N/A
### How was this patch tested?
---
.../listener/api/event/OperationType.java | 1 +
docs/iceberg-rest-service.md | 1 -
.../iceberg/service/CatalogWrapperForREST.java | 220 +++++++++++++++++++++
.../dispatcher/IcebergTableEventDispatcher.java | 37 ++++
.../dispatcher/IcebergTableHookDispatcher.java | 23 +++
.../IcebergTableOperationDispatcher.java | 20 ++
.../dispatcher/IcebergTableOperationExecutor.java | 12 ++
.../service/rest/IcebergConfigOperations.java | 1 +
.../service/rest/IcebergTableOperations.java | 48 +++++
.../api/event/IcebergPlanTableScanEvent.java | 37 ++++
.../event/IcebergPlanTableScanFailureEvent.java | 37 ++++
.../api/event/IcebergPlanTableScanPreEvent.java | 37 ++++
.../service/rest/CatalogWrapperForTest.java | 51 +++++
.../service/rest/TestIcebergTableOperations.java | 188 +++++++++++++++++-
14 files changed, 709 insertions(+), 4 deletions(-)
diff --git
a/core/src/main/java/org/apache/gravitino/listener/api/event/OperationType.java
b/core/src/main/java/org/apache/gravitino/listener/api/event/OperationType.java
index ea19ccaffc..dd1fa67e3c 100644
---
a/core/src/main/java/org/apache/gravitino/listener/api/event/OperationType.java
+++
b/core/src/main/java/org/apache/gravitino/listener/api/event/OperationType.java
@@ -26,6 +26,7 @@ public enum OperationType {
PURGE_TABLE,
LOAD_TABLE,
LOAD_TABLE_CREDENTIAL,
+ PLAN_TABLE_SCAN,
LIST_TABLE,
ALTER_TABLE,
RENAME_TABLE,
diff --git a/docs/iceberg-rest-service.md b/docs/iceberg-rest-service.md
index 835989a407..c809faef36 100644
--- a/docs/iceberg-rest-service.md
+++ b/docs/iceberg-rest-service.md
@@ -22,7 +22,6 @@ There are some key difference between Gravitino Iceberg REST
server and Gravitin
- Supports the Apache Iceberg REST API defined in Iceberg 1.10, and supports
all namespace and table interfaces. The following interfaces are not
implemented yet:
- multi table transaction
- pagination
- - scan planning
- Works as a catalog proxy, supporting `Hive` and `JDBC` as catalog backend.
- Supports credential vending for `S3`、`GCS`、`OSS` and `ADLS`.
- Supports different storages like `S3`, `HDFS`, `OSS`, `GCS`, `ADLS` and
provides the capability to support other storages.
diff --git
a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/CatalogWrapperForREST.java
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/CatalogWrapperForREST.java
index c62ad9f9c9..b24303a453 100644
---
a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/CatalogWrapperForREST.java
+++
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/CatalogWrapperForREST.java
@@ -22,8 +22,11 @@ package org.apache.gravitino.iceberg.service;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
+import java.io.IOException;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Stream;
@@ -40,15 +43,27 @@ import
org.apache.gravitino.iceberg.common.ops.IcebergCatalogWrapper;
import org.apache.gravitino.storage.GCSProperties;
import org.apache.gravitino.utils.MapUtils;
import org.apache.gravitino.utils.PrincipalUtils;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.IncrementalAppendScan;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Scan;
+import org.apache.iceberg.ScanTaskParser;
+import org.apache.iceberg.Table;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.TableScan;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.ServiceUnavailableException;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.rest.PlanStatus;
import org.apache.iceberg.rest.requests.CreateTableRequest;
+import org.apache.iceberg.rest.requests.PlanTableScanRequest;
import org.apache.iceberg.rest.responses.ImmutableLoadCredentialsResponse;
import org.apache.iceberg.rest.responses.LoadCredentialsResponse;
import org.apache.iceberg.rest.responses.LoadTableResponse;
+import org.apache.iceberg.rest.responses.PlanTableScanResponse;
/** Process Iceberg REST specific operations, like credential vending. */
public class CatalogWrapperForREST extends IcebergCatalogWrapper {
@@ -207,6 +222,211 @@ public class CatalogWrapperForREST extends
IcebergCatalogWrapper {
return credential;
}
+ /**
+ * Plan table scan and return scan tasks.
+ *
+ * <p>This method performs server-side scan planning to optimize query
performance by reducing
+ * client-side metadata loading and enabling parallel task execution.
+ *
+ * <p>Implementation uses synchronous scan planning (COMPLETED status) where
tasks are returned
+ * immediately as serialized JSON strings. This is different from
asynchronous mode (SUBMITTED
+ * status) where a plan ID is returned for later retrieval.
+ *
+ * <p>Referenced from Iceberg PR #13400 for scan planning implementation.
+ *
+ * @param tableIdentifier The table identifier.
+ * @param scanRequest The scan request parameters including filters,
projections, snapshot-id,
+ * etc.
+ * @return PlanTableScanResponse with status=COMPLETED and serialized
planTasks.
+ * @throws IllegalArgumentException if scan request validation fails
+ * @throws org.apache.gravitino.exceptions.NoSuchTableException if table
doesn't exist
+ * @throws RuntimeException for other scan planning failures
+ */
+ public PlanTableScanResponse planTableScan(
+ TableIdentifier tableIdentifier, PlanTableScanRequest scanRequest) {
+
+ LOG.debug(
+ "Planning scan for table: {}, snapshotId: {}, startSnapshotId: {},
endSnapshotId: {}, select: {}, caseSensitive: {}",
+ tableIdentifier,
+ scanRequest.snapshotId(),
+ scanRequest.startSnapshotId(),
+ scanRequest.endSnapshotId(),
+ scanRequest.select(),
+ scanRequest.caseSensitive());
+
+ try {
+ Table table = catalog.loadTable(tableIdentifier);
+ CloseableIterable<FileScanTask> fileScanTasks =
+ createFilePlanScanTasks(table, tableIdentifier, scanRequest);
+
+ List<String> planTasks = new ArrayList<>();
+ Map<Integer, PartitionSpec> specsById = new HashMap<>();
+ List<org.apache.iceberg.DeleteFile> deleteFiles = new ArrayList<>();
+
+ try (fileScanTasks) {
+ for (FileScanTask fileScanTask : fileScanTasks) {
+ try {
+ String taskString = ScanTaskParser.toJson(fileScanTask);
+ planTasks.add(taskString);
+
+ int specId = fileScanTask.spec().specId();
+ if (!specsById.containsKey(specId)) {
+ specsById.put(specId, fileScanTask.spec());
+ }
+
+ if (!fileScanTask.deletes().isEmpty()) {
+ deleteFiles.addAll(fileScanTask.deletes());
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(
+ String.format(
+ "Failed to serialize scan task for table: %s. Error: %s",
+ tableIdentifier, e.getMessage()),
+ e);
+ }
+ }
+ } catch (IOException e) {
+ LOG.error("Failed to close scan task iterator for table: {}",
tableIdentifier, e);
+ throw new RuntimeException("Failed to plan scan tasks: " +
e.getMessage(), e);
+ }
+
+ List<DeleteFile> uniqueDeleteFiles =
+
deleteFiles.stream().distinct().collect(java.util.stream.Collectors.toList());
+
+ if (planTasks.isEmpty()) {
+ LOG.info(
+ "Scan planning returned no tasks for table: {}. Table may be empty
or fully filtered.",
+ tableIdentifier);
+ }
+
+ PlanTableScanResponse.Builder responseBuilder =
+ PlanTableScanResponse.builder()
+ .withPlanStatus(PlanStatus.COMPLETED)
+ .withPlanTasks(planTasks)
+ .withSpecsById(specsById);
+
+ if (!uniqueDeleteFiles.isEmpty()) {
+ responseBuilder.withDeleteFiles(uniqueDeleteFiles);
+ LOG.debug(
+ "Included {} delete files in scan plan for table: {}",
+ uniqueDeleteFiles.size(),
+ tableIdentifier);
+ }
+
+ return responseBuilder.build();
+
+ } catch (IllegalArgumentException e) {
+ LOG.error("Invalid scan request for table {}: {}", tableIdentifier,
e.getMessage());
+ throw new IllegalArgumentException("Invalid scan parameters: " +
e.getMessage(), e);
+ } catch (org.apache.iceberg.exceptions.NoSuchTableException e) {
+ LOG.error("Table not found during scan planning: {}", tableIdentifier);
+ throw e;
+ } catch (Exception e) {
+ LOG.error("Unexpected error during scan planning for table: {}",
tableIdentifier, e);
+ throw new RuntimeException(
+ "Scan planning failed for table " + tableIdentifier + ": " +
e.getMessage(), e);
+ }
+ }
+
+ /**
+ * Create and plan a scan based on the scan request.
+ *
+ * <p>If both start and end snapshot IDs are provided, uses
IncrementalAppendScan. Otherwise, uses
+ * regular TableScan.
+ *
+ * @param table The table to scan
+ * @param tableIdentifier The table identifier for logging
+ * @param scanRequest The scan request parameters
+ * @return CloseableIterable of FileScanTask
+ */
+ private CloseableIterable<FileScanTask> createFilePlanScanTasks(
+ Table table, TableIdentifier tableIdentifier, PlanTableScanRequest
scanRequest) {
+ Long startSnapshotId = scanRequest.startSnapshotId();
+ Long endSnapshotId = scanRequest.endSnapshotId();
+ // Use IncrementalAppendScan if both start and end snapshot IDs are
provided
+ if (startSnapshotId != null && endSnapshotId != null) {
+ if (startSnapshotId >= endSnapshotId) {
+ throw new IllegalArgumentException(
+ String.format(
+ "Invalid snapshot range: startSnapshotId (%d) must be less
than endSnapshotId (%d)",
+ startSnapshotId, endSnapshotId));
+ }
+ LOG.debug(
+ "Using IncrementalAppendScan for table: {}, from snapshot: {} to
snapshot: {}",
+ tableIdentifier,
+ startSnapshotId,
+ endSnapshotId);
+ IncrementalAppendScan incrementalScan =
+ table
+ .newIncrementalAppendScan()
+ .fromSnapshotInclusive(startSnapshotId)
+ .toSnapshot(endSnapshotId);
+ incrementalScan = applyScanRequest(incrementalScan, scanRequest);
+ return incrementalScan.planFiles();
+ } else {
+ TableScan tableScan = table.newScan();
+ if (scanRequest.snapshotId() != null && scanRequest.snapshotId() != 0L) {
+ tableScan = tableScan.useSnapshot(scanRequest.snapshotId());
+ LOG.debug("Applied snapshot filter: snapshot-id={}",
scanRequest.snapshotId());
+ }
+ tableScan = applyScanRequest(tableScan, scanRequest);
+ return tableScan.planFiles();
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ private <T extends Scan> T applyScanRequest(T scan, PlanTableScanRequest
scanRequest) {
+ scan = (T) scan.caseSensitive(scanRequest.caseSensitive());
+ LOG.debug("Applied case-sensitive: {}", scanRequest.caseSensitive());
+ scan = applyScanFilter(scan, scanRequest);
+ scan = applyScanSelect(scan, scanRequest);
+ scan = applyScanStatsFields(scan, scanRequest);
+
+ return scan;
+ }
+
+ @SuppressWarnings("unchecked")
+ private <T extends Scan> T applyScanFilter(T scan, PlanTableScanRequest
scanRequest) {
+ if (scanRequest.filter() != null) {
+ try {
+ scan = (T) scan.filter(scanRequest.filter());
+ LOG.debug("Applied filter expression: {}", scanRequest.filter());
+ } catch (Exception e) {
+ LOG.error("Failed to apply filter expression: {}", e.getMessage(), e);
+ throw new IllegalArgumentException("Invalid filter expression: " +
e.getMessage(), e);
+ }
+ }
+ return scan;
+ }
+
+ @SuppressWarnings("unchecked")
+ private <T extends Scan> T applyScanSelect(T scan, PlanTableScanRequest
scanRequest) {
+ if (scanRequest.select() != null && !scanRequest.select().isEmpty()) {
+ try {
+ scan = (T) scan.select(scanRequest.select());
+ LOG.debug("Applied column projection: {}", scanRequest.select());
+ } catch (Exception e) {
+ LOG.error("Failed to apply column projection: {}", e.getMessage(), e);
+ throw new IllegalArgumentException("Invalid column selection: " +
e.getMessage(), e);
+ }
+ }
+ return scan;
+ }
+
+ @SuppressWarnings("unchecked")
+ private <T extends Scan> T applyScanStatsFields(T scan, PlanTableScanRequest
scanRequest) {
+ if (scanRequest.statsFields() != null &&
!scanRequest.statsFields().isEmpty()) {
+ try {
+ scan = (T) scan.includeColumnStats(scanRequest.statsFields());
+ LOG.debug("Applied statistics fields: {}", scanRequest.statsFields());
+ } catch (Exception e) {
+ LOG.error("Failed to apply statistics fields: {}", e.getMessage(), e);
+ throw new IllegalArgumentException("Invalid statistics fields: " +
e.getMessage(), e);
+ }
+ }
+ return scan;
+ }
+
@VisibleForTesting
static Map<String, String> checkForCompatibility(
Map<String, String> properties, Map<String, String>
deprecatedProperties) {
diff --git
a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/dispatcher/IcebergTableEventDispatcher.java
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/dispatcher/IcebergTableEventDispatcher.java
index eaa0440140..e3160ff92a 100644
---
a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/dispatcher/IcebergTableEventDispatcher.java
+++
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/dispatcher/IcebergTableEventDispatcher.java
@@ -39,6 +39,9 @@ import
org.apache.gravitino.listener.api.event.IcebergLoadTableCredentialPreEven
import org.apache.gravitino.listener.api.event.IcebergLoadTableEvent;
import org.apache.gravitino.listener.api.event.IcebergLoadTableFailureEvent;
import org.apache.gravitino.listener.api.event.IcebergLoadTablePreEvent;
+import org.apache.gravitino.listener.api.event.IcebergPlanTableScanEvent;
+import
org.apache.gravitino.listener.api.event.IcebergPlanTableScanFailureEvent;
+import org.apache.gravitino.listener.api.event.IcebergPlanTableScanPreEvent;
import org.apache.gravitino.listener.api.event.IcebergRenameTableEvent;
import org.apache.gravitino.listener.api.event.IcebergRenameTableFailureEvent;
import org.apache.gravitino.listener.api.event.IcebergRenameTablePreEvent;
@@ -52,11 +55,13 @@ import
org.apache.gravitino.listener.api.event.IcebergUpdateTablePreEvent;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.rest.requests.CreateTableRequest;
+import org.apache.iceberg.rest.requests.PlanTableScanRequest;
import org.apache.iceberg.rest.requests.RenameTableRequest;
import org.apache.iceberg.rest.requests.UpdateTableRequest;
import org.apache.iceberg.rest.responses.ListTablesResponse;
import org.apache.iceberg.rest.responses.LoadCredentialsResponse;
import org.apache.iceberg.rest.responses.LoadTableResponse;
+import org.apache.iceberg.rest.responses.PlanTableScanResponse;
/**
* {@code IcebergTableEventDispatcher} is a decorator for {@link
IcebergTableOperationExecutor} that
@@ -263,4 +268,36 @@ public class IcebergTableEventDispatcher implements
IcebergTableOperationDispatc
eventBus.dispatchEvent(new IcebergLoadTableCredentialEvent(context,
gravitinoNameIdentifier));
return loadCredentialsResponse;
}
+
+ /**
+ * Plan table scan and return scan tasks.
+ *
+ * <p>Dispatches pre/post events for table scan planning operation.
+ *
+ * @param context Iceberg REST request context information.
+ * @param tableIdentifier The Iceberg table identifier.
+ * @param scanRequest The scan request parameters
+ * @return A PlanTableScanResponse containing the scan plan with plan-id and
tasks
+ */
+ @Override
+ public PlanTableScanResponse planTableScan(
+ IcebergRequestContext context,
+ TableIdentifier tableIdentifier,
+ PlanTableScanRequest scanRequest) {
+ NameIdentifier gravitinoNameIdentifier =
+ IcebergRESTUtils.getGravitinoNameIdentifier(
+ metalakeName, context.catalogName(), tableIdentifier);
+ eventBus.dispatchEvent(new IcebergPlanTableScanPreEvent(context,
gravitinoNameIdentifier));
+ PlanTableScanResponse planTableScanResponse;
+ try {
+ planTableScanResponse =
+ icebergTableOperationDispatcher.planTableScan(context,
tableIdentifier, scanRequest);
+ } catch (Exception e) {
+ eventBus.dispatchEvent(
+ new IcebergPlanTableScanFailureEvent(context,
gravitinoNameIdentifier, e));
+ throw e;
+ }
+ eventBus.dispatchEvent(new IcebergPlanTableScanEvent(context,
gravitinoNameIdentifier));
+ return planTableScanResponse;
+ }
}
diff --git
a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/dispatcher/IcebergTableHookDispatcher.java
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/dispatcher/IcebergTableHookDispatcher.java
index 5bdd7aedb8..0f498db38d 100644
---
a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/dispatcher/IcebergTableHookDispatcher.java
+++
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/dispatcher/IcebergTableHookDispatcher.java
@@ -35,11 +35,13 @@ import org.apache.gravitino.utils.PrincipalUtils;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.rest.requests.CreateTableRequest;
+import org.apache.iceberg.rest.requests.PlanTableScanRequest;
import org.apache.iceberg.rest.requests.RenameTableRequest;
import org.apache.iceberg.rest.requests.UpdateTableRequest;
import org.apache.iceberg.rest.responses.ListTablesResponse;
import org.apache.iceberg.rest.responses.LoadCredentialsResponse;
import org.apache.iceberg.rest.responses.LoadTableResponse;
+import org.apache.iceberg.rest.responses.PlanTableScanResponse;
public class IcebergTableHookDispatcher implements
IcebergTableOperationDispatcher {
@@ -156,6 +158,27 @@ public class IcebergTableHookDispatcher implements
IcebergTableOperationDispatch
return dispatcher.getTableCredentials(context, tableIdentifier);
}
+ /**
+ * Plan table scan and return scan tasks.
+ *
+ * <p>This method performs server-side scan planning to optimize query
performance by reducing
+ * client-side metadata loading and enabling parallel task execution.
+ *
+ * @param context Iceberg REST request context information.
+ * @param tableIdentifier The Iceberg table identifier.
+ * @param scanRequest The scan request parameters including filters,
projections, snapshot-id,
+ * etc.
+ * @return A PlanTableScanResponse containing the scan plan with plan-id and
tasks. The response
+ * format follows Iceberg REST API specification.
+ */
+ @Override
+ public PlanTableScanResponse planTableScan(
+ IcebergRequestContext context,
+ TableIdentifier tableIdentifier,
+ PlanTableScanRequest scanRequest) {
+ return dispatcher.planTableScan(context, tableIdentifier, scanRequest);
+ }
+
private void importTable(String catalogName, Namespace namespace, String
tableName) {
TableDispatcher tableDispatcher =
GravitinoEnv.getInstance().tableDispatcher();
if (tableDispatcher != null) {
diff --git
a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/dispatcher/IcebergTableOperationDispatcher.java
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/dispatcher/IcebergTableOperationDispatcher.java
index 75762b42e8..0c4d08a6f0 100644
---
a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/dispatcher/IcebergTableOperationDispatcher.java
+++
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/dispatcher/IcebergTableOperationDispatcher.java
@@ -23,11 +23,13 @@ import
org.apache.gravitino.listener.api.event.IcebergRequestContext;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.rest.requests.CreateTableRequest;
+import org.apache.iceberg.rest.requests.PlanTableScanRequest;
import org.apache.iceberg.rest.requests.RenameTableRequest;
import org.apache.iceberg.rest.requests.UpdateTableRequest;
import org.apache.iceberg.rest.responses.ListTablesResponse;
import org.apache.iceberg.rest.responses.LoadCredentialsResponse;
import org.apache.iceberg.rest.responses.LoadTableResponse;
+import org.apache.iceberg.rest.responses.PlanTableScanResponse;
/**
* The {@code IcebergTableOperationDispatcher} interface defines the public
API for managing Iceberg
@@ -113,4 +115,22 @@ public interface IcebergTableOperationDispatcher {
*/
LoadCredentialsResponse getTableCredentials(
IcebergRequestContext context, TableIdentifier tableIdentifier);
+
+ /**
+ * Plan table scan and return scan tasks.
+ *
+ * <p>This method performs server-side scan planning to optimize query
performance by reducing
+ * client-side metadata loading and enabling parallel task execution.
+ *
+ * @param context Iceberg REST request context information.
+ * @param tableIdentifier The Iceberg table identifier.
+ * @param scanRequest The scan request parameters including filters,
projections, snapshot-id,
+ * etc.
+ * @return A PlanTableScanResponse containing the scan plan with plan-id and
tasks. The response
+ * format follows Iceberg REST API specification.
+ */
+ PlanTableScanResponse planTableScan(
+ IcebergRequestContext context,
+ TableIdentifier tableIdentifier,
+ PlanTableScanRequest scanRequest);
}
diff --git
a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/dispatcher/IcebergTableOperationExecutor.java
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/dispatcher/IcebergTableOperationExecutor.java
index 1c34f259ff..6d23c89360 100644
---
a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/dispatcher/IcebergTableOperationExecutor.java
+++
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/dispatcher/IcebergTableOperationExecutor.java
@@ -35,11 +35,13 @@ import
org.apache.gravitino.server.authorization.expression.AuthorizationExpress
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.rest.requests.CreateTableRequest;
+import org.apache.iceberg.rest.requests.PlanTableScanRequest;
import org.apache.iceberg.rest.requests.RenameTableRequest;
import org.apache.iceberg.rest.requests.UpdateTableRequest;
import org.apache.iceberg.rest.responses.ListTablesResponse;
import org.apache.iceberg.rest.responses.LoadCredentialsResponse;
import org.apache.iceberg.rest.responses.LoadTableResponse;
+import org.apache.iceberg.rest.responses.PlanTableScanResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -168,4 +170,14 @@ public class IcebergTableOperationExecutor implements
IcebergTableOperationDispa
return writable ? CredentialPrivilege.WRITE : CredentialPrivilege.READ;
}
+
+ @Override
+ public PlanTableScanResponse planTableScan(
+ IcebergRequestContext context,
+ TableIdentifier tableIdentifier,
+ PlanTableScanRequest scanRequest) {
+ return icebergCatalogWrapperManager
+ .getCatalogWrapper(context.catalogName())
+ .planTableScan(tableIdentifier, scanRequest);
+ }
}
diff --git
a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/rest/IcebergConfigOperations.java
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/rest/IcebergConfigOperations.java
index 7925d54008..8d596ae84b 100644
---
a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/rest/IcebergConfigOperations.java
+++
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/rest/IcebergConfigOperations.java
@@ -75,6 +75,7 @@ public class IcebergConfigOperations {
.add(Endpoint.V1_REGISTER_TABLE)
.add(Endpoint.V1_REPORT_METRICS)
.add(Endpoint.V1_TABLE_CREDENTIALS)
+ .add(Endpoint.V1_SUBMIT_TABLE_SCAN_PLAN)
.build();
private static final List<Endpoint> DEFAULT_VIEW_ENDPOINTS =
diff --git
a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/rest/IcebergTableOperations.java
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/rest/IcebergTableOperations.java
index cc0db74147..53f7b73f01 100644
---
a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/rest/IcebergTableOperations.java
+++
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/rest/IcebergTableOperations.java
@@ -66,11 +66,13 @@ import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.rest.RESTUtil;
import org.apache.iceberg.rest.requests.CreateTableRequest;
+import org.apache.iceberg.rest.requests.PlanTableScanRequest;
import org.apache.iceberg.rest.requests.ReportMetricsRequest;
import org.apache.iceberg.rest.requests.UpdateTableRequest;
import org.apache.iceberg.rest.responses.ListTablesResponse;
import org.apache.iceberg.rest.responses.LoadCredentialsResponse;
import org.apache.iceberg.rest.responses.LoadTableResponse;
+import org.apache.iceberg.rest.responses.PlanTableScanResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -442,6 +444,52 @@ public class IcebergTableOperations {
}
}
+ /**
+ * Plan table scan endpoint. Allows clients to request a scan plan from the
server to optimize
+ * scan planning by leveraging server-side resources.
+ *
+ * @param prefix The catalog prefix
+ * @param namespace The namespace
+ * @param table The table name
+ * @param scanRequest The scan request containing filters, projections, etc.
+ * @return Response containing the scan plan with tasks
+ */
+ @POST
+ @Path("{table}/scan")
+ @Produces(MediaType.APPLICATION_JSON)
+ @Consumes(MediaType.APPLICATION_JSON)
+ @Timed(name = "plan-table-scan." + MetricNames.HTTP_PROCESS_DURATION,
absolute = true)
+ @ResponseMetered(name = "plan-table-scan", absolute = true)
+ public Response planTableScan(
+ @PathParam("prefix") String prefix,
+ @Encoded() @PathParam("namespace") String namespace,
+ @PathParam("table") String table,
+ PlanTableScanRequest scanRequest) {
+ String catalogName = IcebergRESTUtils.getCatalogName(prefix);
+ Namespace icebergNS = RESTUtil.decodeNamespace(namespace);
+
+ LOG.info(
+ "Plan table scan, catalog: {}, namespace: {}, table: {}", catalogName,
icebergNS, table);
+
+ try {
+ return Utils.doAs(
+ httpRequest,
+ () -> {
+ TableIdentifier tableIdentifier = TableIdentifier.of(icebergNS,
table);
+ IcebergRequestContext context =
+ new IcebergRequestContext(httpServletRequest(), catalogName);
+
+ PlanTableScanResponse scanResponse =
+ tableOperationDispatcher.planTableScan(context,
tableIdentifier, scanRequest);
+
+ return IcebergRESTUtils.ok(scanResponse);
+ });
+ } catch (Exception e) {
+ LOG.error("Failed to plan table scan: {}", e.getMessage(), e);
+ return IcebergExceptionMapper.toRESTResponse(e);
+ }
+ }
+
private boolean isCredentialVending(String accessDelegation) {
if (StringUtils.isBlank(accessDelegation)) {
return false;
diff --git
a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/listener/api/event/IcebergPlanTableScanEvent.java
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/listener/api/event/IcebergPlanTableScanEvent.java
new file mode 100644
index 0000000000..510aa71505
--- /dev/null
+++
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/listener/api/event/IcebergPlanTableScanEvent.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.listener.api.event;
+
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.annotation.DeveloperApi;
+
+/** Represent an event after planning Iceberg table scan successfully. */
+@DeveloperApi
+public class IcebergPlanTableScanEvent extends IcebergTableEvent {
+ public IcebergPlanTableScanEvent(
+ IcebergRequestContext icebergRequestContext, NameIdentifier
resourceIdentifier) {
+ super(icebergRequestContext, resourceIdentifier);
+ }
+
+ @Override
+ public OperationType operationType() {
+ return OperationType.PLAN_TABLE_SCAN;
+ }
+}
diff --git
a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/listener/api/event/IcebergPlanTableScanFailureEvent.java
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/listener/api/event/IcebergPlanTableScanFailureEvent.java
new file mode 100644
index 0000000000..793289f6d0
--- /dev/null
+++
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/listener/api/event/IcebergPlanTableScanFailureEvent.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.listener.api.event;
+
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.annotation.DeveloperApi;
+
+/** Represent a failure event when planning Iceberg table scan failed. */
+@DeveloperApi
+public class IcebergPlanTableScanFailureEvent extends IcebergTableFailureEvent
{
+ public IcebergPlanTableScanFailureEvent(
+ IcebergRequestContext icebergRequestContext, NameIdentifier
nameIdentifier, Exception e) {
+ super(icebergRequestContext, nameIdentifier, e);
+ }
+
+ @Override
+ public OperationType operationType() {
+ return OperationType.PLAN_TABLE_SCAN;
+ }
+}
diff --git
a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/listener/api/event/IcebergPlanTableScanPreEvent.java
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/listener/api/event/IcebergPlanTableScanPreEvent.java
new file mode 100644
index 0000000000..45075f41f0
--- /dev/null
+++
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/listener/api/event/IcebergPlanTableScanPreEvent.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.listener.api.event;
+
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.annotation.DeveloperApi;
+
/** Represent a pre event before planning Iceberg table scan. */
@DeveloperApi
public class IcebergPlanTableScanPreEvent extends IcebergTablePreEvent {

  /**
   * Constructs a pre event fired before a plan-table-scan operation is executed.
   *
   * @param icebergRequestContext context of the incoming Iceberg REST request
   * @param tableIdentifier identifier of the table whose scan is about to be planned
   */
  public IcebergPlanTableScanPreEvent(
      IcebergRequestContext icebergRequestContext, NameIdentifier tableIdentifier) {
    super(icebergRequestContext, tableIdentifier);
  }

  /** @return {@link OperationType#PLAN_TABLE_SCAN}, the operation this event precedes. */
  @Override
  public OperationType operationType() {
    return OperationType.PLAN_TABLE_SCAN;
  }
}
diff --git
a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/CatalogWrapperForTest.java
b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/CatalogWrapperForTest.java
index 1cf41af796..64834e408c 100644
---
a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/CatalogWrapperForTest.java
+++
b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/CatalogWrapperForTest.java
@@ -18,13 +18,22 @@
*/
package org.apache.gravitino.iceberg.service.rest;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
import org.apache.gravitino.iceberg.common.IcebergConfig;
import org.apache.gravitino.iceberg.service.CatalogWrapperForREST;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.rest.requests.CreateTableRequest;
import org.apache.iceberg.rest.requests.RegisterTableRequest;
import org.apache.iceberg.rest.responses.LoadTableResponse;
import org.apache.iceberg.types.Types.NestedField;
@@ -34,10 +43,22 @@ import
org.testcontainers.shaded.com.google.common.collect.ImmutableMap;
// Used to override registerTable
@SuppressWarnings("deprecation")
public class CatalogWrapperForTest extends CatalogWrapperForREST {
  // Table property understood only by this test wrapper: when set to "true" on a create-table
  // request, the wrapper seeds the new table with sample data files so plan-table-scan tests
  // get non-empty plan tasks.
  public static final String GENERATE_PLAN_TASKS_DATA_PROP = "test.generate-plan-data";

  public CatalogWrapperForTest(String catalogName, IcebergConfig icebergConfig) {
    super(catalogName, icebergConfig);
  }
  /**
   * Creates the table via the real wrapper, then — as a test hook — appends sample data files
   * when the request carries {@link #GENERATE_PLAN_TASKS_DATA_PROP} set to true, so that
   * plan-table-scan tests have snapshots and plan tasks to assert against.
   */
  @Override
  public LoadTableResponse createTable(
      Namespace namespace, CreateTableRequest request, boolean requestCredential) {
    LoadTableResponse loadTableResponse = super.createTable(namespace, request, requestCredential);
    // Seed data only for tables that explicitly opted in via the test property.
    if (shouldGeneratePlanTasksData(request)) {
      appendSampleData(namespace, request.name());
    }
    return loadTableResponse;
  }
+
@Override
public LoadTableResponse registerTable(Namespace namespace,
RegisterTableRequest request) {
if (request.name().contains("fail")) {
@@ -55,4 +76,34 @@ public class CatalogWrapperForTest extends
CatalogWrapperForREST {
.build();
return loadTableResponse;
}
+
+ private boolean shouldGeneratePlanTasksData(CreateTableRequest request) {
+ if (request.properties() == null) {
+ return false;
+ }
+ return Boolean.parseBoolean(
+ request.properties().getOrDefault(GENERATE_PLAN_TASKS_DATA_PROP,
Boolean.FALSE.toString()));
+ }
+
  /**
   * Commits three fast-append snapshots of placeholder parquet files to the given table so that
   * scan-planning tests observe multiple snapshots and non-empty plan tasks.
   *
   * @param namespace namespace containing the table
   * @param tableName name of the table to append sample data to
   */
  private void appendSampleData(Namespace namespace, String tableName) {
    try {
      Table table = catalog.loadTable(TableIdentifier.of(namespace, tableName));
      // Append multiple times to create multiple snapshots for incremental scan testing
      for (int i = 0; i < 3; i++) {
        // Placeholder file: planning only reads file metadata, never the (empty) contents.
        Path tempFile = Files.createTempFile("plan-scan-" + i, ".parquet");
        tempFile.toFile().deleteOnExit();
        DataFile dataFile =
            DataFiles.builder(table.spec())
                .withPath(tempFile.toUri().toString())
                .withFormat(FileFormat.PARQUET)
                .withRecordCount(1)
                .withFileSizeInBytes(0L)
                .build();
        table.newFastAppend().appendFile(dataFile).commit();
      }
      // NOTE(review): return value discarded — presumably called to refresh/validate the table
      // through the wrapper after the appends; confirm whether this call is actually needed.
      super.loadTable(TableIdentifier.of(namespace, tableName));
    } catch (IOException e) {
      throw new RuntimeException("Failed to append sample data for test table " + tableName, e);
    }
  }
}
diff --git
a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/TestIcebergTableOperations.java
b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/TestIcebergTableOperations.java
index 94af167579..848d78f451 100644
---
a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/TestIcebergTableOperations.java
+++
b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/TestIcebergTableOperations.java
@@ -19,6 +19,8 @@
package org.apache.gravitino.iceberg.service.rest;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import java.util.Arrays;
import java.util.List;
@@ -27,6 +29,7 @@ import java.util.Set;
import java.util.stream.Collectors;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.client.Entity;
+import javax.ws.rs.client.Invocation;
import javax.ws.rs.core.Application;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
@@ -46,6 +49,9 @@ import
org.apache.gravitino.listener.api.event.IcebergListTablePreEvent;
import org.apache.gravitino.listener.api.event.IcebergLoadTableEvent;
import org.apache.gravitino.listener.api.event.IcebergLoadTableFailureEvent;
import org.apache.gravitino.listener.api.event.IcebergLoadTablePreEvent;
+import org.apache.gravitino.listener.api.event.IcebergPlanTableScanEvent;
+import
org.apache.gravitino.listener.api.event.IcebergPlanTableScanFailureEvent;
+import org.apache.gravitino.listener.api.event.IcebergPlanTableScanPreEvent;
import org.apache.gravitino.listener.api.event.IcebergRenameTableEvent;
import org.apache.gravitino.listener.api.event.IcebergRenameTableFailureEvent;
import org.apache.gravitino.listener.api.event.IcebergRenameTablePreEvent;
@@ -58,6 +64,7 @@ import org.apache.gravitino.server.ServerConfig;
import org.apache.gravitino.server.authorization.GravitinoAuthorizerProvider;
import org.apache.iceberg.MetadataUpdate;
import org.apache.iceberg.Schema;
+import org.apache.iceberg.Snapshot;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.UpdateRequirement;
import org.apache.iceberg.UpdateRequirements;
@@ -66,7 +73,9 @@ import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.metrics.CommitReport;
import org.apache.iceberg.metrics.ImmutableCommitMetricsResult;
import org.apache.iceberg.metrics.ImmutableCommitReport;
+import org.apache.iceberg.rest.PlanStatus;
import org.apache.iceberg.rest.requests.CreateTableRequest;
+import org.apache.iceberg.rest.requests.PlanTableScanRequest;
import org.apache.iceberg.rest.requests.RenameTableRequest;
import org.apache.iceberg.rest.requests.ReportMetricsRequest;
import org.apache.iceberg.rest.requests.UpdateTableRequest;
@@ -74,6 +83,7 @@ import org.apache.iceberg.rest.responses.ListTablesResponse;
import org.apache.iceberg.rest.responses.LoadTableResponse;
import org.apache.iceberg.types.Types.NestedField;
import org.apache.iceberg.types.Types.StringType;
+import org.apache.iceberg.util.JsonUtil;
import org.glassfish.jersey.internal.inject.AbstractBinder;
import org.glassfish.jersey.server.ResourceConfig;
import org.junit.jupiter.api.Assertions;
@@ -154,6 +164,106 @@ public class TestIcebergTableOperations extends
IcebergNamespaceTestBase {
Assertions.assertTrue(dummyEventListener.popPostEvent() instanceof
IcebergLoadTableEvent);
}
  @ParameterizedTest
  @MethodSource("org.apache.gravitino.iceberg.service.rest.IcebergRestTestUtil#testNamespaces")
  void testPlanTableScan(Namespace namespace) {
    verifyCreateNamespaceSucc(namespace);
    // true => CatalogWrapperForTest seeds the table with sample data so planning has tasks.
    verifyCreateTableSucc(namespace, "plan_scan_table", true);

    dummyEventListener.clearEvent();
    TableMetadata metadata = getTableMeta(namespace, "plan_scan_table");
    // Pin the scan to the current snapshot when one exists; otherwise pass null.
    Long snapshotId =
        metadata.currentSnapshot() == null ? null : metadata.currentSnapshot().snapshotId();
    JsonNode planResponse = verifyPlanTableScanSucc(namespace, "plan_scan_table", snapshotId);

    // A completed plan must carry a non-empty "plan-tasks" array.
    Assertions.assertEquals(
        PlanStatus.COMPLETED.status(), planResponse.get("plan-status").asText());
    Assertions.assertTrue(planResponse.has("plan-tasks"));
    Assertions.assertTrue(planResponse.get("plan-tasks").isArray());
    Assertions.assertTrue(planResponse.get("plan-tasks").size() > 0);

    // Both the pre event and the success post event should have been emitted.
    Assertions.assertTrue(dummyEventListener.popPreEvent() instanceof IcebergPlanTableScanPreEvent);
    Assertions.assertTrue(dummyEventListener.popPostEvent() instanceof IcebergPlanTableScanEvent);
  }
+
  @ParameterizedTest
  @MethodSource("org.apache.gravitino.iceberg.service.rest.IcebergRestTestUtil#testNamespaces")
  void testPlanTableScanTableNotFound(Namespace namespace) {
    verifyCreateNamespaceSucc(namespace);
    dummyEventListener.clearEvent();

    // Planning a scan on a table that was never created must yield HTTP 404.
    verifyPlanTableScanFail(namespace, "missing_table", 404);

    // The pre event is still fired; the post event is the failure variant.
    Assertions.assertTrue(dummyEventListener.popPreEvent() instanceof IcebergPlanTableScanPreEvent);
    Assertions.assertTrue(
        dummyEventListener.popPostEvent() instanceof IcebergPlanTableScanFailureEvent);
  }
+
  @ParameterizedTest
  @MethodSource("org.apache.gravitino.iceberg.service.rest.IcebergRestTestUtil#testNamespaces")
  void testPlanTableScanWithIncrementalAppendScan(Namespace namespace) {
    verifyCreateNamespaceSucc(namespace);
    verifyCreateTableSucc(namespace, "incremental_scan_table", true);

    dummyEventListener.clearEvent();
    TableMetadata metadata = getTableMeta(namespace, "incremental_scan_table");
    Long currentSnapshotId = metadata.currentSnapshot().snapshotId();

    // An incremental range whose start equals its end is invalid and must be rejected.
    PlanTableScanRequest invalidRequest =
        buildPlanTableScanRequestWithRange(currentSnapshotId, currentSnapshotId);
    Response response = doPlanTableScan(namespace, "incremental_scan_table", invalidRequest);
    Assertions.assertEquals(
        Status.BAD_REQUEST.getStatusCode(),
        response.getStatus(),
        "Expected BAD_REQUEST for start == end snapshot IDs");
  }
+
+ @ParameterizedTest
+
@MethodSource("org.apache.gravitino.iceberg.service.rest.IcebergRestTestUtil#testNamespaces")
+ void testPlanTableScanWithIncrementalAppendScanValidRange(Namespace
namespace) {
+ verifyCreateNamespaceSucc(namespace);
+ verifyCreateTableSucc(namespace, "incremental_scan_valid_table", true);
+
+ dummyEventListener.clearEvent();
+ TableMetadata metadata = getTableMeta(namespace,
"incremental_scan_valid_table");
+
+ // generatePlanData=true creates two snapshots: one from table creation,
one from
+ // appendSampleData
+ List<Snapshot> snapshots = metadata.snapshots();
+ Assertions.assertNotNull(snapshots, "Snapshots should not be null");
+ Assertions.assertTrue(snapshots.size() >= 2, "Should have at least 2
snapshots");
+
+ // Sort by sequence number to get ordered snapshots
+ List<Snapshot> sortedSnapshots =
+ snapshots.stream()
+ .sorted((s1, s2) -> Long.compare(s1.snapshotId(), s2.snapshotId()))
+ .collect(java.util.stream.Collectors.toList());
+
+ Assertions.assertTrue(
+ sortedSnapshots.size() >= 2,
+ "Should have at least 2 snapshots for incremental scan test, but got: "
+ + sortedSnapshots.size());
+
+ // For IncrementalAppendScan, start snapshot must be an ancestor of end
snapshot
+ // Use the last snapshot as end, and find its parent from the sorted list
+ Snapshot endSnapshot = sortedSnapshots.get(sortedSnapshots.size() - 1);
+ Long endSnapshotId = endSnapshot.snapshotId();
+ Long startSnapshotId = endSnapshot.parentId();
+
+ Assertions.assertNotNull(
+ startSnapshotId, "Could not find a valid startSnapshotId from
snapshots");
+ JsonNode planResponse =
+ verifyPlanTableScanSuccWithRange(
+ namespace, "incremental_scan_valid_table", startSnapshotId,
endSnapshotId);
+ Assertions.assertEquals(
+ PlanStatus.COMPLETED.status(),
planResponse.get("plan-status").asText());
+ Assertions.assertTrue(planResponse.has("plan-tasks"));
+ Assertions.assertTrue(planResponse.get("plan-tasks").isArray());
+
+ Assertions.assertTrue(dummyEventListener.popPreEvent() instanceof
IcebergPlanTableScanPreEvent);
+ Assertions.assertTrue(dummyEventListener.popPostEvent() instanceof
IcebergPlanTableScanEvent);
+ }
+
@ParameterizedTest
@MethodSource("org.apache.gravitino.iceberg.service.rest.IcebergRestTestUtil#testNamespaces")
void testDropTable(Namespace namespace) {
@@ -341,8 +451,19 @@ public class TestIcebergTableOperations extends
IcebergNamespaceTestBase {
}
  // Convenience overload: create the table without seeding plan-scan sample data.
  private Response doCreateTable(Namespace ns, String name) {
    return doCreateTable(ns, name, false);
  }
+
+ private Response doCreateTable(Namespace ns, String name, boolean
generatePlanData) {
+ CreateTableRequest.Builder builder =
+ CreateTableRequest.builder().withName(name).withSchema(tableSchema);
+ if (generatePlanData) {
+ builder =
+ builder.setProperties(
+ ImmutableMap.of(
+ CatalogWrapperForTest.GENERATE_PLAN_TASKS_DATA_PROP,
Boolean.TRUE.toString()));
+ }
+ CreateTableRequest createTableRequest = builder.build();
return getTableClientBuilder(ns, Optional.empty())
.post(Entity.entity(createTableRequest,
MediaType.APPLICATION_JSON_TYPE));
}
@@ -379,6 +500,11 @@ public class TestIcebergTableOperations extends
IcebergNamespaceTestBase {
return getTableClientBuilder(ns, Optional.of(name)).get();
}
  // POSTs a PlanTableScanRequest to the table's "/scan" sub-resource.
  private Response doPlanTableScan(Namespace ns, String tableName, PlanTableScanRequest request) {
    Invocation.Builder builder = getTableClientBuilder(ns, Optional.of(tableName + "/scan"));
    return builder.post(Entity.entity(request, MediaType.APPLICATION_JSON_TYPE));
  }
+
private Response doUpdateTable(Namespace ns, String name, TableMetadata
base) {
TableMetadata newMetadata = base.updateSchema(newTableSchema);
List<MetadataUpdate> metadataUpdates = newMetadata.changes();
@@ -431,13 +557,69 @@ public class TestIcebergTableOperations extends
IcebergNamespaceTestBase {
Assertions.assertEquals(status, response.getStatus());
}
  // Plans a scan with no snapshot pinned (null -> snapshot id 0) and asserts the expected
  // HTTP error status.
  private void verifyPlanTableScanFail(Namespace ns, String tableName, int status) {
    Response response = doPlanTableScan(ns, tableName, buildPlanTableScanRequest(null));
    Assertions.assertEquals(status, response.getStatus());
  }
+
+ private JsonNode verifyPlanTableScanSucc(Namespace ns, String tableName,
Long snapshotId) {
+ Response response = doPlanTableScan(ns, tableName,
buildPlanTableScanRequest(snapshotId));
+ Assertions.assertEquals(Status.OK.getStatusCode(), response.getStatus());
+ String responseBody = response.readEntity(String.class);
+ try {
+ return JsonUtil.mapper().readTree(responseBody);
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to parse plan table scan response",
e);
+ }
+ }
+
+ private JsonNode verifyPlanTableScanSuccWithRange(
+ Namespace ns, String tableName, Long startSnapshotId, Long
endSnapshotId) {
+ Response response =
+ doPlanTableScan(
+ ns, tableName, buildPlanTableScanRequestWithRange(startSnapshotId,
endSnapshotId));
+ Assertions.assertEquals(Status.OK.getStatusCode(), response.getStatus());
+ String responseBody = response.readEntity(String.class);
+ try {
+ return JsonUtil.mapper().readTree(responseBody);
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to parse plan table scan response",
e);
+ }
+ }
+
+ private PlanTableScanRequest buildPlanTableScanRequest(Long snapshotId) {
+ PlanTableScanRequest.Builder builder = new PlanTableScanRequest.Builder();
+ if (snapshotId != null) {
+ builder = builder.withSnapshotId(snapshotId);
+ } else {
+ builder = builder.withSnapshotId(0L);
+ }
+ return builder.build();
+ }
+
  // Builds an incremental plan request for the given snapshot range; either bound may be null,
  // in which case it is simply omitted from the request.
  private PlanTableScanRequest buildPlanTableScanRequestWithRange(
      Long startSnapshotId, Long endSnapshotId) {
    PlanTableScanRequest.Builder builder = new PlanTableScanRequest.Builder();
    if (startSnapshotId != null) {
      builder = builder.withStartSnapshotId(startSnapshotId);
    }
    if (endSnapshotId != null) {
      builder = builder.withEndSnapshotId(endSnapshotId);
    }
    return builder.build();
  }
+
  // Drops the table and asserts the expected HTTP error status.
  private void verifyDropTableFail(Namespace ns, String name, int status) {
    Response response = doDropTable(ns, name);
    Assertions.assertEquals(status, response.getStatus());
  }
  // Convenience overload: verify creation without seeding plan-scan sample data.
  private void verifyCreateTableSucc(Namespace ns, String name) {
    verifyCreateTableSucc(ns, name, false);
  }
+
+ private void verifyCreateTableSucc(Namespace ns, String name, boolean
generatePlanData) {
+ Response response = doCreateTable(ns, name, generatePlanData);
Assertions.assertEquals(Status.OK.getStatusCode(), response.getStatus());
LoadTableResponse loadTableResponse =
response.readEntity(LoadTableResponse.class);
Schema schema = loadTableResponse.tableMetadata().schema();