This is an automated email from the ASF dual-hosted git repository.
singhpk234 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new f154dafb22 Core: Support incremental Scan in RESTCatalogAdapter for RemoteScanPlanning (#14661)
f154dafb22 is described below
commit f154dafb22676d69667f3b2380b2e9473601d4ae
Author: Prashant Singh <[email protected]>
AuthorDate: Mon Nov 24 15:59:04 2025 -0800
Core: Support incremental Scan in RESTCatalogAdapter for RemoteScanPlanning (#14661)
---
.../org/apache/iceberg/rest/CatalogHandlers.java | 96 ++++++++++++++++------
.../apache/iceberg/rest/RESTCatalogAdapter.java | 5 +-
2 files changed, 73 insertions(+), 28 deletions(-)
diff --git a/core/src/main/java/org/apache/iceberg/rest/CatalogHandlers.java b/core/src/main/java/org/apache/iceberg/rest/CatalogHandlers.java
index dca17bc520..b08455408d 100644
--- a/core/src/main/java/org/apache/iceberg/rest/CatalogHandlers.java
+++ b/core/src/main/java/org/apache/iceberg/rest/CatalogHandlers.java
@@ -42,8 +42,10 @@ import org.apache.iceberg.BaseMetadataTable;
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.BaseTransaction;
import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.IncrementalAppendScan;
import org.apache.iceberg.MetadataUpdate.UpgradeFormatVersion;
import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Scan;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.Table;
@@ -641,29 +643,40 @@ public class CatalogHandlers {
Catalog catalog,
TableIdentifier ident,
PlanTableScanRequest request,
- Predicate<TableScan> shouldPlanAsync,
- ToIntFunction<TableScan> tasksPerPlanTask) {
+ Predicate<Scan<?, FileScanTask, ?>> shouldPlanAsync,
+ ToIntFunction<Scan<?, FileScanTask, ?>> tasksPerPlanTask) {
Table table = catalog.loadTable(ident);
- TableScan tableScan = table.newScan();
+ // Configure the appropriate scan type
+ Scan<?, FileScanTask, ?> configuredScan;
+
+ if (request.startSnapshotId() != null && request.endSnapshotId() != null) {
+ // Incremental append scan for reading changes between snapshots
+ IncrementalAppendScan incrementalScan =
+ table
+ .newIncrementalAppendScan()
+ .fromSnapshotInclusive(request.startSnapshotId())
+ .toSnapshot(request.endSnapshotId());
+
+ configuredScan = configureScan(incrementalScan, request);
+ } else {
+ // Regular table scan at a specific snapshot
+ TableScan tableScan = table.newScan();
- if (request.snapshotId() != null) {
- tableScan = tableScan.useSnapshot(request.snapshotId());
- }
- if (request.select() != null) {
- tableScan = tableScan.select(request.select());
- }
- if (request.filter() != null) {
- tableScan = tableScan.filter(request.filter());
- }
- if (request.statsFields() != null) {
- tableScan = tableScan.includeColumnStats(request.statsFields());
- }
+ if (request.snapshotId() != null) {
+ tableScan = tableScan.useSnapshot(request.snapshotId());
+ }
- tableScan = tableScan.caseSensitive(request.caseSensitive());
+ // Apply filters and projections using common method
+ configuredScan = configureScan(tableScan, request);
+ }
- if (shouldPlanAsync.test(tableScan)) {
+ if (shouldPlanAsync.test(configuredScan)) {
String asyncPlanId = "async-" + UUID.randomUUID();
- asyncPlanFiles(tableScan, asyncPlanId, tasksPerPlanTask.applyAsInt(tableScan));
+ asyncPlanFiles(
+ configuredScan,
+ asyncPlanId,
+ table.uuid().toString(),
+ tasksPerPlanTask.applyAsInt(configuredScan));
return PlanTableScanResponse.builder()
.withPlanId(asyncPlanId)
.withPlanStatus(PlanStatus.SUBMITTED)
@@ -672,7 +685,11 @@ public class CatalogHandlers {
}
String planId = "sync-" + UUID.randomUUID();
- planFilesFor(tableScan, planId, tasksPerPlanTask.applyAsInt(tableScan));
+ planFilesFor(
+ configuredScan,
+ planId,
+ table.uuid().toString(),
+ tasksPerPlanTask.applyAsInt(configuredScan));
Pair<List<FileScanTask>, String> initial =
IN_MEMORY_PLANNING_STATE.initialScanTasksFor(planId);
return PlanTableScanResponse.builder()
.withPlanStatus(PlanStatus.COMPLETED)
@@ -757,21 +774,48 @@ public class CatalogHandlers {
InMemoryPlanningState.getInstance().clear();
}
+ /**
+ * Applies filters, projections, and other scan configurations from the request to the scan.
+ *
+ * @param scan the scan to configure
+ * @param request the plan table scan request containing filters and projections
+ * @param <T> the specific scan type (TableScan, IncrementalAppendScan, etc.)
+ * @return the configured scan with filters and projections applied
+ */
+ private static <T extends Scan<T, FileScanTask, ?>> T configureScan(
+ T scan, PlanTableScanRequest request) {
+ T configuredScan = scan;
+
+ if (request.select() != null) {
+ configuredScan = configuredScan.select(request.select());
+ }
+ if (request.filter() != null) {
+ configuredScan = configuredScan.filter(request.filter());
+ }
+ if (request.statsFields() != null) {
+ configuredScan = configuredScan.includeColumnStats(request.statsFields());
+ }
+ configuredScan = configuredScan.caseSensitive(request.caseSensitive());
+
+ return configuredScan;
+ }
+
/**
* Plans file scan tasks for a table scan, grouping them into plan tasks for pagination.
*
- * @param tableScan the table scan to plan
+ * @param scan the table scan to plan files for
* @param planId the unique identifier for this plan
+ * @param tableId the uuid of the table being scanned
* @param tasksPerPlanTask number of file scan tasks to group per plan task
*/
- private static void planFilesFor(TableScan tableScan, String planId, int tasksPerPlanTask) {
+ private static void planFilesFor(
+ Scan<?, FileScanTask, ?> scan, String planId, String tableId, int tasksPerPlanTask) {
Iterable<List<FileScanTask>> taskGroupings =
- Iterables.partition(tableScan.planFiles(), tasksPerPlanTask);
+ Iterables.partition(scan.planFiles(), tasksPerPlanTask);
int planTaskSequence = 0;
String previousPlanTask = null;
for (List<FileScanTask> taskGrouping : taskGroupings) {
- String planTaskKey =
- String.format("%s-%s-%s", planId, tableScan.table().uuid(), planTaskSequence++);
+ String planTaskKey = String.format("%s-%s-%s", planId, tableId, planTaskSequence++);
IN_MEMORY_PLANNING_STATE.addPlanTask(planTaskKey, taskGrouping);
if (previousPlanTask != null) {
IN_MEMORY_PLANNING_STATE.addNextPlanTask(previousPlanTask, planTaskKey);
@@ -783,11 +827,11 @@ public class CatalogHandlers {
@SuppressWarnings("FutureReturnValueIgnored")
private static void asyncPlanFiles(
- TableScan tableScan, String asyncPlanId, int tasksPerPlanTask) {
+ Scan<?, FileScanTask, ?> scan, String asyncPlanId, String tableId, int tasksPerPlanTask) {
IN_MEMORY_PLANNING_STATE.addAsyncPlan(asyncPlanId);
CompletableFuture.runAsync(
() -> {
- planFilesFor(tableScan, asyncPlanId, tasksPerPlanTask);
+ planFilesFor(scan, asyncPlanId, tableId, tasksPerPlanTask);
},
ASYNC_PLANNING_POOL)
.whenComplete(
diff --git a/core/src/test/java/org/apache/iceberg/rest/RESTCatalogAdapter.java b/core/src/test/java/org/apache/iceberg/rest/RESTCatalogAdapter.java
index 1a7a0e03d5..ff6daa61e3 100644
--- a/core/src/test/java/org/apache/iceberg/rest/RESTCatalogAdapter.java
+++ b/core/src/test/java/org/apache/iceberg/rest/RESTCatalogAdapter.java
@@ -31,8 +31,9 @@ import java.util.stream.Collectors;
import org.apache.http.HttpHeaders;
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.BaseTransaction;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.Scan;
import org.apache.iceberg.Table;
-import org.apache.iceberg.TableScan;
import org.apache.iceberg.Transaction;
import org.apache.iceberg.Transactions;
import org.apache.iceberg.catalog.Catalog;
@@ -588,7 +589,7 @@ public class RESTCatalogAdapter extends BaseHTTPClient {
return 100;
}
- default boolean shouldPlanTableScanAsync(TableScan tableScan) {
+ default boolean shouldPlanTableScanAsync(Scan<?, FileScanTask, ?> scan) {
return false;
}
}