nastra commented on code in PR #13400:
URL: https://github.com/apache/iceberg/pull/13400#discussion_r2538955853


##########
core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java:
##########
@@ -3163,4 +3382,459 @@ private static List<HTTPRequest> 
allRequests(RESTCatalogAdapter adapter) {
     verify(adapter, atLeastOnce()).execute(captor.capture(), any(), any(), 
any());
     return captor.getAllValues();
   }
+
+  @Test
+  public void testCancelPlanWithNoActivePlan() {
+    
configurePlanningBehavior(TestPlanningBehavior.Builder::synchronousWithPagination);
+    RESTTableScan restTableScan = restTableScanFor("cancel_test_table");
+
+    // Calling cancel with no active plan should return false
+    assertThat(restTableScan.cancelPlan()).isFalse();
+  }
+
+  @Test
+  public void testCancelPlanEndpointSupport() {
+    
configurePlanningBehavior(TestPlanningBehavior.Builder::synchronousWithPagination);
+    RESTTableScan restTableScan = restTableScanFor("cancel_support_table");
+
+    // Test that cancelPlan method is available and returns false when no plan 
is active
+    assertThat(restTableScan.cancelPlan()).isFalse();
+  }
+
+  @Test
+  public void testCancelPlanMethodAvailability() {
+    
configurePlanningBehavior(TestPlanningBehavior.Builder::synchronousWithPagination);
+    RESTTableScan restTableScan = restTableScanFor("cancel_method_table");
+
+    // Test that cancelPlan method is available and callable
+    // When no plan is active, it should return false
+    assertThat(restTableScan.cancelPlan()).isFalse();
+
+    // Verify the method exists and doesn't throw exceptions when called 
multiple times
+    assertThat(restTableScan.cancelPlan()).isFalse();
+  }
+
+  @Test
+  public void testCancelPlanEndpointPath() {
+    TableIdentifier tableId = TableIdentifier.of("test_namespace", 
"test_table");
+    String planId = "plan-abc-123";
+    ResourcePaths paths = new ResourcePaths("test-prefix");
+
+    // Test that the cancel plan path is generated correctly
+    String cancelPath = paths.plan(tableId, planId);
+
+    assertThat(cancelPath)
+        
.isEqualTo("v1/test-prefix/namespaces/test_namespace/tables/test_table/plan/plan-abc-123");
+
+    // Test with different identifiers
+    TableIdentifier complexId = TableIdentifier.of(Namespace.of("db", 
"schema"), "my_table");
+    String complexPath = paths.plan(complexId, "plan-xyz-789");
+
+    assertThat(complexPath).contains("/plan/plan-xyz-789");
+    assertThat(complexPath).contains("db%1Fschema"); // URL encoded namespace 
separator
+  }
+
+  @Test
+  public void testIteratorCloseTriggersCancel() throws IOException {
+    
configurePlanningBehavior(TestPlanningBehavior.Builder::synchronousWithPagination);
+    Table table = createTableWithScanPlanning("iterator_close_table");
+
+    // Ensure we have a RESTTable with server-side planning enabled
+    assertThat(table).isInstanceOf(RESTTable.class);
+    RESTTable restTable = (RESTTable) table;
+
+    TableScan scan = restTable.newScan();
+    assertThat(scan).isInstanceOf(RESTTableScan.class);
+    boolean cancelled = isCancelled((RESTTableScan) scan);
+    assertThat(cancelled).isFalse(); // No active plan to cancel
+  }
+
+  private static boolean isCancelled(RESTTableScan scan) throws IOException {
+
+    // Get the iterable and iterator
+    CloseableIterable<FileScanTask> iterable = scan.planFiles();
+    CloseableIterator<FileScanTask> iterator = iterable.iterator();
+
+    // Verify we can close the iterator without exceptions
+    // The cancellation callback will be called (though no active plan exists)
+    iterator.close();
+
+    // Verify we can still call cancelPlan on the scan
+    return scan.cancelPlan();
+  }
+
+  @Test
+  public void testMetadataTablesWithRemotePlanning() throws IOException {
+    
configurePlanningBehavior(TestPlanningBehavior.Builder::synchronousWithAllTasks);
+    Table table = createTableWithScanPlanning("metadata_tables_test");
+
+    // Ensure we have a RESTTable with server-side planning enabled
+    assertThat(table).isInstanceOf(RESTTable.class);
+
+    // Test that metadata tables work with remote planning
+    // Files metadata table
+    Table filesTable =
+        MetadataTableUtils.createMetadataTableInstance(table, 
MetadataTableType.FILES);
+    assertThat(filesTable).isNotNull();
+
+    TableScan filesScan = filesTable.newScan();
+    assertThat(filesScan).isNotNull();
+
+    // Verify metadata table scan works (should not use REST scan planning)
+    CloseableIterable<FileScanTask> filesIterable = filesScan.planFiles();
+    List<FileScanTask> filesTasks = Lists.newArrayList(filesIterable);
+    assertThat(filesTasks).isNotEmpty();
+
+    // Snapshots metadata table
+    Table snapshotsTable =
+        MetadataTableUtils.createMetadataTableInstance(table, 
MetadataTableType.SNAPSHOTS);
+    assertThat(snapshotsTable).isNotNull();
+
+    TableScan snapshotsScan = snapshotsTable.newScan();
+    CloseableIterable<FileScanTask> snapshotsIterable = 
snapshotsScan.planFiles();
+    List<FileScanTask> snapshotsTasks = Lists.newArrayList(snapshotsIterable);
+    assertThat(snapshotsTasks).isNotEmpty();
+
+    // Manifests metadata table
+    Table manifestsTable =
+        MetadataTableUtils.createMetadataTableInstance(table, 
MetadataTableType.MANIFESTS);
+    assertThat(manifestsTable).isNotNull();
+
+    TableScan manifestsScan = manifestsTable.newScan();
+    CloseableIterable<FileScanTask> manifestsIterable = 
manifestsScan.planFiles();
+    List<FileScanTask> manifestsTasks = Lists.newArrayList(manifestsIterable);
+    assertThat(manifestsTasks).isNotEmpty();
+  }
+
+  @Test
+  public void testIterableCloseTriggersCancel() throws IOException {
+    
configurePlanningBehavior(TestPlanningBehavior.Builder::synchronousWithPagination);
+    Table table = createTableWithScanPlanning("iterable_close_test");
+
+    // Ensure we have a RESTTable with server-side planning enabled
+    assertThat(table).isInstanceOf(RESTTable.class);
+    RESTTable restTable = (RESTTable) table;
+
+    TableScan scan = restTable.newScan();
+    assertThat(scan).isInstanceOf(RESTTableScan.class);
+    RESTTableScan restTableScan = (RESTTableScan) scan;
+
+    // Get the iterable
+    CloseableIterable<FileScanTask> iterable = restTableScan.planFiles();
+
+    // Verify we can close the iterable without exceptions
+    // This tests that cancellation callbacks are properly wired through
+    iterable.close();
+
+    // Verify the scan is still functional
+    boolean cancelled = restTableScan.cancelPlan();
+    assertThat(cancelled).isFalse(); // No active plan to cancel
+  }
+
+  @Test
+  public void testRESTScanPlanningWithPositionDeletes() throws IOException {
+    
configurePlanningBehavior(TestPlanningBehavior.Builder::synchronousWithAllTasks);
+    Table table = createTableWithScanPlanning("position_deletes_test");
+
+    // Add position deletes that correspond to FILE_A (which was added in 
table creation)
+    table.newRowDelta().addDeletes(FILE_A_DELETES).commit();
+
+    // Ensure we have a RESTTable with server-side planning enabled
+    assertThat(table).isInstanceOf(RESTTable.class);
+
+    // Execute scan planning - should handle position deletes correctly
+    try (CloseableIterable<FileScanTask> iterable = 
table.newScan().planFiles()) {
+      List<FileScanTask> tasks = Lists.newArrayList(iterable);
+
+      // Verify we get tasks back (specific count depends on implementation)
+      assertThat(tasks).hasSize(1); // 1 data file: FILE_A
+
+      // Verify specific task content and delete file associations
+      FileScanTask taskWithDeletes =
+          tasks.stream()
+              .filter(task -> !task.deletes().isEmpty())
+              .findFirst()
+              .orElseThrow(
+                  () -> new AssertionError("Expected at least one task with 
delete files"));
+
+      assertThat(taskWithDeletes.file().path()).isEqualTo(FILE_A.path());
+      assertThat(taskWithDeletes.deletes()).hasSize(1); // 1 delete file: 
FILE_A_DELETES
+      
assertThat(taskWithDeletes.deletes().get(0).path()).isEqualTo(FILE_A_DELETES.path());
+    }
+  }
+
+  @Test
+  public void testRESTScanPlanningWithEqualityDeletes() throws IOException {
+    
configurePlanningBehavior(TestPlanningBehavior.Builder::synchronousWithAllTasks);
+    Table table = createTableWithScanPlanning("equality_deletes_test");
+
+    // Add equality deletes that correspond to FILE_A
+    table.newRowDelta().addDeletes(FILE_A_EQUALITY_DELETES).commit();
+
+    // Ensure we have a RESTTable with server-side planning enabled
+    assertThat(table).isInstanceOf(RESTTable.class);
+
+    // Execute scan planning - should handle equality deletes correctly
+    try (CloseableIterable<FileScanTask> iterable = 
table.newScan().planFiles()) {
+      List<FileScanTask> tasks = Lists.newArrayList(iterable);
+
+      // Verify the task count and file paths
+      assertThat(tasks).hasSize(1); // 1 data file: FILE_A
+
+      // Verify specific task content and equality delete file associations
+      FileScanTask taskWithDeletes =
+          tasks.stream()
+              .filter(task -> !task.deletes().isEmpty())
+              .findFirst()
+              .orElseThrow(
+                  () -> new AssertionError("Expected at least one task with 
delete files"));
+
+      assertThat(taskWithDeletes.file().path()).isEqualTo(FILE_A.path());
+      assertThat(taskWithDeletes.deletes()).hasSize(1); // 1 delete file: 
FILE_A_EQUALITY_DELETES
+      
assertThat(taskWithDeletes.deletes().get(0).path()).isEqualTo(FILE_A_EQUALITY_DELETES.path());
+    }
+  }
+
+  @Test
+  public void testRESTScanPlanningWithMixedDeletes() throws IOException {
+    
configurePlanningBehavior(TestPlanningBehavior.Builder::synchronousWithAllTasks);
+    Table table = createTableWithScanPlanning("mixed_deletes_test");
+
+    // Add both position and equality deletes in separate commits
+    table.newRowDelta().addDeletes(FILE_A_DELETES).commit(); // Position 
deletes for FILE_A
+    table
+        .newRowDelta()
+        .addDeletes(FILE_B_EQUALITY_DELETES)
+        .commit(); // Equality deletes for different partition
+
+    // Ensure we have a RESTTable with server-side planning enabled
+    assertThat(table).isInstanceOf(RESTTable.class);
+
+    // Execute scan planning - should handle mixed delete types correctly
+    try (CloseableIterable<FileScanTask> iterable = 
table.newScan().planFiles()) {
+      List<FileScanTask> tasks = Lists.newArrayList(iterable);
+
+      // Verify task count: FILE_A only (FILE_B_EQUALITY_DELETES is in 
different partition)
+      assertThat(tasks).hasSize(1); // 1 data file: FILE_A
+
+      // Verify FILE_A with position deletes (FILE_B_EQUALITY_DELETES not 
associated since no
+      // FILE_B)
+      FileScanTask fileATask =
+          tasks.stream()
+              .filter(task -> task.file().path().equals(FILE_A.path()))
+              .findFirst()
+              .orElseThrow(() -> new AssertionError("Expected FILE_A in scan 
tasks"));
+
+      assertThat(fileATask.deletes())
+          .hasSize(1); // 1 delete file: FILE_A_DELETES 
(FILE_B_EQUALITY_DELETES not matched)
+      
assertThat(fileATask.deletes().get(0).path()).isEqualTo(FILE_A_DELETES.path());
+    }
+  }
+
+  @Test
+  public void testRESTScanPlanningWithMultipleDeleteFiles() throws IOException 
{
+    
configurePlanningBehavior(TestPlanningBehavior.Builder::synchronousWithAllTasks);
+    Table table = createTableWithScanPlanning("multiple_deletes_test");
+
+    // Add FILE_B and FILE_C to the table (FILE_A is already added during 
table creation)
+    table.newAppend().appendFile(FILE_B).appendFile(FILE_C).commit();
+
+    // Add multiple delete files corresponding to FILE_A, FILE_B, FILE_C
+    table
+        .newRowDelta()
+        .addDeletes(FILE_A_DELETES) // Position delete for FILE_A
+        .addDeletes(FILE_B_DELETES) // Position delete for FILE_B
+        .addDeletes(FILE_C_EQUALITY_DELETES) // Equality delete for FILE_C
+        .commit();
+
+    // Ensure we have a RESTTable with server-side planning enabled
+    assertThat(table).isInstanceOf(RESTTable.class);
+
+    // Execute scan planning with multiple delete files
+    try (CloseableIterable<FileScanTask> iterable = 
table.newScan().planFiles()) {
+      List<FileScanTask> tasks = Lists.newArrayList(iterable);
+
+      // Verify we get tasks back (should have 3 data files: FILE_A, FILE_B, 
FILE_C)
+      assertThat(tasks).hasSize(3); // 3 data files
+
+      // Verify FILE_A with position deletes
+      FileScanTask fileATask =
+          tasks.stream()
+              .filter(task -> task.file().path().equals(FILE_A.path()))
+              .findFirst()
+              .orElseThrow(() -> new AssertionError("Expected FILE_A in scan 
tasks"));
+      assertThat(fileATask.deletes()).isNotEmpty(); // Has delete files
+      assertThat(fileATask.deletes().stream().map(DeleteFile::path))
+          .contains(FILE_A_DELETES.path()); // FILE_A_DELETES is present
+
+      // Verify FILE_B with position deletes
+      FileScanTask fileBTask =
+          tasks.stream()
+              .filter(task -> task.file().path().equals(FILE_B.path()))
+              .findFirst()
+              .orElseThrow(() -> new AssertionError("Expected FILE_B in scan 
tasks"));
+      assertThat(fileBTask.deletes()).isNotEmpty(); // Has delete files
+      assertThat(fileBTask.deletes().stream().map(DeleteFile::path))
+          .contains(FILE_B_DELETES.path()); // FILE_B_DELETES is present
+
+      // Verify FILE_C with equality deletes
+      FileScanTask fileCTask =
+          tasks.stream()
+              .filter(task -> task.file().path().equals(FILE_C.path()))
+              .findFirst()
+              .orElseThrow(() -> new AssertionError("Expected FILE_C in scan 
tasks"));
+      assertThat(fileCTask.deletes()).isNotEmpty(); // Has delete files
+      assertThat(fileCTask.deletes().stream().map(DeleteFile::path))
+          .contains(FILE_C_EQUALITY_DELETES.path()); // 
FILE_C_EQUALITY_DELETES is present
+    }
+  }
+
+  @Test
+  public void testRESTScanPlanningWithDeletesAndFiltering() throws IOException 
{
+    
configurePlanningBehavior(TestPlanningBehavior.Builder::synchronousWithAllTasks);
+    Table table = createTableWithScanPlanning("deletes_filtering_test");
+
+    // Add FILE_B to have more data for filtering
+    table.newAppend().appendFile(FILE_B).commit();
+
+    // Add equality delete for FILE_B
+    table.newRowDelta().addDeletes(FILE_B_EQUALITY_DELETES).commit();
+
+    // Ensure we have a RESTTable with server-side planning enabled
+    assertThat(table).isInstanceOf(RESTTable.class);
+
+    // Create a filtered scan and execute scan planning with filtering and 
deletes
+    try (CloseableIterable<FileScanTask> iterable =
+        table.newScan().filter(Expressions.equal("data", "test")).planFiles()) 
{
+      List<FileScanTask> tasks = Lists.newArrayList(iterable);
+
+      // Verify scan planning works with both filtering and deletes
+      assertThat(tasks).hasSize(2); // 2 data files: FILE_A, FILE_B
+
+      // FILE_A should have no delete files
+      FileScanTask fileATask =
+          tasks.stream()
+              .filter(task -> task.file().path().equals(FILE_A.path()))
+              .findFirst()
+              .orElseThrow(() -> new AssertionError("Expected FILE_A in scan 
tasks"));
+      assertThat(fileATask.deletes()).isEmpty(); // 0 delete files for FILE_A
+
+      // FILE_B should have FILE_B_EQUALITY_DELETES
+      FileScanTask fileBTask =
+          tasks.stream()
+              .filter(task -> task.file().path().equals(FILE_B.path()))
+              .findFirst()
+              .orElseThrow(() -> new AssertionError("Expected FILE_B in scan 
tasks"));
+      assertThat(fileBTask.deletes()).hasSize(1); // 1 delete file: 
FILE_B_EQUALITY_DELETES
+      
assertThat(fileBTask.deletes().get(0).path()).isEqualTo(FILE_B_EQUALITY_DELETES.path());
+    }
+  }
+
+  @Test
+  public void testRESTScanPlanningDeletesCancellation() throws IOException {
+    configurePlanningBehavior(TestPlanningBehavior.Builder::asynchronous);
+    Table table = createTableWithScanPlanning("deletes_cancellation_test");
+
+    // Add deletes to make the scenario more complex
+    
table.newRowDelta().addDeletes(FILE_A_DELETES).addDeletes(FILE_A_EQUALITY_DELETES).commit();
+
+    // Ensure we have a RESTTable with server-side planning enabled
+    assertThat(table).isInstanceOf(RESTTable.class);
+    RESTTableScan restTableScan = restTableScanFor(table);
+
+    // Get the iterable (which may involve async planning with deletes)
+    try (CloseableIterable<FileScanTask> iterable = restTableScan.planFiles();
+        CloseableIterator<FileScanTask> iterator = iterable.iterator()) {
+      // Test cancellation works with delete files present
+      // Resources will be closed automatically
+    }
+
+    // Verify cancellation method is still accessible
+    assertThat(restTableScan.cancelPlan()).isFalse(); // No active plan at 
this point
+  }
+
+  @Test
+  public void testRESTScanPlanningWithTimeTravel() throws IOException {
+    // Test server-side scan planning with time travel (snapshot-based queries)
+    // Verify that snapshot IDs are correctly passed through the REST API
+    // and that historical scans return the correct files and deletes
+
+    
configurePlanningBehavior(TestPlanningBehavior.Builder::synchronousWithAllTasks);
+
+    TableIdentifier tableId = TableIdentifier.of(NS, "snapshot_scan_test");
+
+    RESTCatalog catalog =

Review Comment:
   should this be calling the util methods that set this up?



##########
core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java:
##########
@@ -3163,4 +3382,459 @@ private static List<HTTPRequest> 
allRequests(RESTCatalogAdapter adapter) {
     verify(adapter, atLeastOnce()).execute(captor.capture(), any(), any(), 
any());
     return captor.getAllValues();
   }
+
+  @Test
+  public void testCancelPlanWithNoActivePlan() {
+    
configurePlanningBehavior(TestPlanningBehavior.Builder::synchronousWithPagination);
+    RESTTableScan restTableScan = restTableScanFor("cancel_test_table");
+
+    // Calling cancel with no active plan should return false
+    assertThat(restTableScan.cancelPlan()).isFalse();
+  }
+
+  @Test
+  public void testCancelPlanEndpointSupport() {
+    
configurePlanningBehavior(TestPlanningBehavior.Builder::synchronousWithPagination);
+    RESTTableScan restTableScan = restTableScanFor("cancel_support_table");
+
+    // Test that cancelPlan method is available and returns false when no plan 
is active
+    assertThat(restTableScan.cancelPlan()).isFalse();
+  }
+
+  @Test
+  public void testCancelPlanMethodAvailability() {
+    
configurePlanningBehavior(TestPlanningBehavior.Builder::synchronousWithPagination);
+    RESTTableScan restTableScan = restTableScanFor("cancel_method_table");
+
+    // Test that cancelPlan method is available and callable
+    // When no plan is active, it should return false
+    assertThat(restTableScan.cancelPlan()).isFalse();
+
+    // Verify the method exists and doesn't throw exceptions when called 
multiple times
+    assertThat(restTableScan.cancelPlan()).isFalse();
+  }
+
+  @Test
+  public void testCancelPlanEndpointPath() {
+    TableIdentifier tableId = TableIdentifier.of("test_namespace", 
"test_table");
+    String planId = "plan-abc-123";
+    ResourcePaths paths = new ResourcePaths("test-prefix");
+
+    // Test that the cancel plan path is generated correctly
+    String cancelPath = paths.plan(tableId, planId);
+
+    assertThat(cancelPath)
+        
.isEqualTo("v1/test-prefix/namespaces/test_namespace/tables/test_table/plan/plan-abc-123");
+
+    // Test with different identifiers
+    TableIdentifier complexId = TableIdentifier.of(Namespace.of("db", 
"schema"), "my_table");
+    String complexPath = paths.plan(complexId, "plan-xyz-789");
+
+    assertThat(complexPath).contains("/plan/plan-xyz-789");
+    assertThat(complexPath).contains("db%1Fschema"); // URL encoded namespace 
separator
+  }
+
+  @Test
+  public void testIteratorCloseTriggersCancel() throws IOException {
+    
configurePlanningBehavior(TestPlanningBehavior.Builder::synchronousWithPagination);
+    Table table = createTableWithScanPlanning("iterator_close_table");
+
+    // Ensure we have a RESTTable with server-side planning enabled
+    assertThat(table).isInstanceOf(RESTTable.class);
+    RESTTable restTable = (RESTTable) table;
+
+    TableScan scan = restTable.newScan();
+    assertThat(scan).isInstanceOf(RESTTableScan.class);
+    boolean cancelled = isCancelled((RESTTableScan) scan);
+    assertThat(cancelled).isFalse(); // No active plan to cancel
+  }
+
+  private static boolean isCancelled(RESTTableScan scan) throws IOException {
+
+    // Get the iterable and iterator
+    CloseableIterable<FileScanTask> iterable = scan.planFiles();
+    CloseableIterator<FileScanTask> iterator = iterable.iterator();
+
+    // Verify we can close the iterator without exceptions
+    // The cancellation callback will be called (though no active plan exists)
+    iterator.close();
+
+    // Verify we can still call cancelPlan on the scan
+    return scan.cancelPlan();
+  }
+
+  @Test
+  public void testMetadataTablesWithRemotePlanning() throws IOException {
+    
configurePlanningBehavior(TestPlanningBehavior.Builder::synchronousWithAllTasks);
+    Table table = createTableWithScanPlanning("metadata_tables_test");
+
+    // Ensure we have a RESTTable with server-side planning enabled
+    assertThat(table).isInstanceOf(RESTTable.class);
+
+    // Test that metadata tables work with remote planning
+    // Files metadata table
+    Table filesTable =
+        MetadataTableUtils.createMetadataTableInstance(table, 
MetadataTableType.FILES);
+    assertThat(filesTable).isNotNull();
+
+    TableScan filesScan = filesTable.newScan();
+    assertThat(filesScan).isNotNull();
+
+    // Verify metadata table scan works (should not use REST scan planning)
+    CloseableIterable<FileScanTask> filesIterable = filesScan.planFiles();
+    List<FileScanTask> filesTasks = Lists.newArrayList(filesIterable);
+    assertThat(filesTasks).isNotEmpty();
+
+    // Snapshots metadata table
+    Table snapshotsTable =
+        MetadataTableUtils.createMetadataTableInstance(table, 
MetadataTableType.SNAPSHOTS);
+    assertThat(snapshotsTable).isNotNull();
+
+    TableScan snapshotsScan = snapshotsTable.newScan();
+    CloseableIterable<FileScanTask> snapshotsIterable = 
snapshotsScan.planFiles();
+    List<FileScanTask> snapshotsTasks = Lists.newArrayList(snapshotsIterable);
+    assertThat(snapshotsTasks).isNotEmpty();
+
+    // Manifests metadata table
+    Table manifestsTable =
+        MetadataTableUtils.createMetadataTableInstance(table, 
MetadataTableType.MANIFESTS);
+    assertThat(manifestsTable).isNotNull();
+
+    TableScan manifestsScan = manifestsTable.newScan();
+    CloseableIterable<FileScanTask> manifestsIterable = 
manifestsScan.planFiles();
+    List<FileScanTask> manifestsTasks = Lists.newArrayList(manifestsIterable);
+    assertThat(manifestsTasks).isNotEmpty();
+  }
+
+  @Test
+  public void testIterableCloseTriggersCancel() throws IOException {
+    
configurePlanningBehavior(TestPlanningBehavior.Builder::synchronousWithPagination);
+    Table table = createTableWithScanPlanning("iterable_close_test");
+
+    // Ensure we have a RESTTable with server-side planning enabled
+    assertThat(table).isInstanceOf(RESTTable.class);
+    RESTTable restTable = (RESTTable) table;
+
+    TableScan scan = restTable.newScan();
+    assertThat(scan).isInstanceOf(RESTTableScan.class);
+    RESTTableScan restTableScan = (RESTTableScan) scan;
+
+    // Get the iterable
+    CloseableIterable<FileScanTask> iterable = restTableScan.planFiles();
+
+    // Verify we can close the iterable without exceptions
+    // This tests that cancellation callbacks are properly wired through
+    iterable.close();
+
+    // Verify the scan is still functional
+    boolean cancelled = restTableScan.cancelPlan();
+    assertThat(cancelled).isFalse(); // No active plan to cancel
+  }
+
+  @Test
+  public void testRESTScanPlanningWithPositionDeletes() throws IOException {
+    
configurePlanningBehavior(TestPlanningBehavior.Builder::synchronousWithAllTasks);
+    Table table = createTableWithScanPlanning("position_deletes_test");
+
+    // Add position deletes that correspond to FILE_A (which was added in 
table creation)
+    table.newRowDelta().addDeletes(FILE_A_DELETES).commit();
+
+    // Ensure we have a RESTTable with server-side planning enabled
+    assertThat(table).isInstanceOf(RESTTable.class);
+
+    // Execute scan planning - should handle position deletes correctly
+    try (CloseableIterable<FileScanTask> iterable = 
table.newScan().planFiles()) {
+      List<FileScanTask> tasks = Lists.newArrayList(iterable);
+
+      // Verify we get tasks back (specific count depends on implementation)
+      assertThat(tasks).hasSize(1); // 1 data file: FILE_A
+
+      // Verify specific task content and delete file associations
+      FileScanTask taskWithDeletes =
+          tasks.stream()
+              .filter(task -> !task.deletes().isEmpty())
+              .findFirst()
+              .orElseThrow(
+                  () -> new AssertionError("Expected at least one task with 
delete files"));
+
+      assertThat(taskWithDeletes.file().path()).isEqualTo(FILE_A.path());
+      assertThat(taskWithDeletes.deletes()).hasSize(1); // 1 delete file: 
FILE_A_DELETES
+      
assertThat(taskWithDeletes.deletes().get(0).path()).isEqualTo(FILE_A_DELETES.path());
+    }
+  }
+
+  @Test
+  public void testRESTScanPlanningWithEqualityDeletes() throws IOException {
+    
configurePlanningBehavior(TestPlanningBehavior.Builder::synchronousWithAllTasks);
+    Table table = createTableWithScanPlanning("equality_deletes_test");
+
+    // Add equality deletes that correspond to FILE_A
+    table.newRowDelta().addDeletes(FILE_A_EQUALITY_DELETES).commit();
+
+    // Ensure we have a RESTTable with server-side planning enabled
+    assertThat(table).isInstanceOf(RESTTable.class);
+
+    // Execute scan planning - should handle equality deletes correctly
+    try (CloseableIterable<FileScanTask> iterable = 
table.newScan().planFiles()) {
+      List<FileScanTask> tasks = Lists.newArrayList(iterable);
+
+      // Verify the task count and file paths
+      assertThat(tasks).hasSize(1); // 1 data file: FILE_A
+
+      // Verify specific task content and equality delete file associations
+      FileScanTask taskWithDeletes =
+          tasks.stream()
+              .filter(task -> !task.deletes().isEmpty())
+              .findFirst()
+              .orElseThrow(
+                  () -> new AssertionError("Expected at least one task with 
delete files"));
+
+      assertThat(taskWithDeletes.file().path()).isEqualTo(FILE_A.path());
+      assertThat(taskWithDeletes.deletes()).hasSize(1); // 1 delete file: 
FILE_A_EQUALITY_DELETES
+      
assertThat(taskWithDeletes.deletes().get(0).path()).isEqualTo(FILE_A_EQUALITY_DELETES.path());
+    }
+  }
+
+  @Test
+  public void testRESTScanPlanningWithMixedDeletes() throws IOException {
+    
configurePlanningBehavior(TestPlanningBehavior.Builder::synchronousWithAllTasks);
+    Table table = createTableWithScanPlanning("mixed_deletes_test");
+
+    // Add both position and equality deletes in separate commits
+    table.newRowDelta().addDeletes(FILE_A_DELETES).commit(); // Position 
deletes for FILE_A
+    table
+        .newRowDelta()
+        .addDeletes(FILE_B_EQUALITY_DELETES)
+        .commit(); // Equality deletes for different partition
+
+    // Ensure we have a RESTTable with server-side planning enabled
+    assertThat(table).isInstanceOf(RESTTable.class);
+
+    // Execute scan planning - should handle mixed delete types correctly
+    try (CloseableIterable<FileScanTask> iterable = 
table.newScan().planFiles()) {
+      List<FileScanTask> tasks = Lists.newArrayList(iterable);
+
+      // Verify task count: FILE_A only (FILE_B_EQUALITY_DELETES is in 
different partition)
+      assertThat(tasks).hasSize(1); // 1 data file: FILE_A
+
+      // Verify FILE_A with position deletes (FILE_B_EQUALITY_DELETES not 
associated since no
+      // FILE_B)
+      FileScanTask fileATask =
+          tasks.stream()
+              .filter(task -> task.file().path().equals(FILE_A.path()))
+              .findFirst()
+              .orElseThrow(() -> new AssertionError("Expected FILE_A in scan 
tasks"));
+
+      assertThat(fileATask.deletes())
+          .hasSize(1); // 1 delete file: FILE_A_DELETES 
(FILE_B_EQUALITY_DELETES not matched)
+      
assertThat(fileATask.deletes().get(0).path()).isEqualTo(FILE_A_DELETES.path());
+    }
+  }
+
+  @Test
+  public void testRESTScanPlanningWithMultipleDeleteFiles() throws IOException 
{
+    
configurePlanningBehavior(TestPlanningBehavior.Builder::synchronousWithAllTasks);
+    Table table = createTableWithScanPlanning("multiple_deletes_test");
+
+    // Add FILE_B and FILE_C to the table (FILE_A is already added during 
table creation)
+    table.newAppend().appendFile(FILE_B).appendFile(FILE_C).commit();
+
+    // Add multiple delete files corresponding to FILE_A, FILE_B, FILE_C
+    table
+        .newRowDelta()
+        .addDeletes(FILE_A_DELETES) // Position delete for FILE_A
+        .addDeletes(FILE_B_DELETES) // Position delete for FILE_B
+        .addDeletes(FILE_C_EQUALITY_DELETES) // Equality delete for FILE_C
+        .commit();
+
+    // Ensure we have a RESTTable with server-side planning enabled
+    assertThat(table).isInstanceOf(RESTTable.class);
+
+    // Execute scan planning with multiple delete files
+    try (CloseableIterable<FileScanTask> iterable = 
table.newScan().planFiles()) {
+      List<FileScanTask> tasks = Lists.newArrayList(iterable);
+
+      // Verify we get tasks back (should have 3 data files: FILE_A, FILE_B, 
FILE_C)
+      assertThat(tasks).hasSize(3); // 3 data files
+
+      // Verify FILE_A with position deletes
+      FileScanTask fileATask =
+          tasks.stream()
+              .filter(task -> task.file().path().equals(FILE_A.path()))
+              .findFirst()
+              .orElseThrow(() -> new AssertionError("Expected FILE_A in scan 
tasks"));
+      assertThat(fileATask.deletes()).isNotEmpty(); // Has delete files
+      assertThat(fileATask.deletes().stream().map(DeleteFile::path))
+          .contains(FILE_A_DELETES.path()); // FILE_A_DELETES is present
+
+      // Verify FILE_B with position deletes
+      FileScanTask fileBTask =
+          tasks.stream()
+              .filter(task -> task.file().path().equals(FILE_B.path()))
+              .findFirst()
+              .orElseThrow(() -> new AssertionError("Expected FILE_B in scan 
tasks"));
+      assertThat(fileBTask.deletes()).isNotEmpty(); // Has delete files
+      assertThat(fileBTask.deletes().stream().map(DeleteFile::path))
+          .contains(FILE_B_DELETES.path()); // FILE_B_DELETES is present
+
+      // Verify FILE_C with equality deletes
+      FileScanTask fileCTask =
+          tasks.stream()
+              .filter(task -> task.file().path().equals(FILE_C.path()))
+              .findFirst()
+              .orElseThrow(() -> new AssertionError("Expected FILE_C in scan 
tasks"));
+      assertThat(fileCTask.deletes()).isNotEmpty(); // Has delete files
+      assertThat(fileCTask.deletes().stream().map(DeleteFile::path))
+          .contains(FILE_C_EQUALITY_DELETES.path()); // 
FILE_C_EQUALITY_DELETES is present
+    }
+  }
+
+  @Test
+  public void testRESTScanPlanningWithDeletesAndFiltering() throws IOException 
{
+    
configurePlanningBehavior(TestPlanningBehavior.Builder::synchronousWithAllTasks);
+    Table table = createTableWithScanPlanning("deletes_filtering_test");
+
+    // Add FILE_B to have more data for filtering
+    table.newAppend().appendFile(FILE_B).commit();
+
+    // Add equality delete for FILE_B
+    table.newRowDelta().addDeletes(FILE_B_EQUALITY_DELETES).commit();
+
+    // Ensure we have a RESTTable with server-side planning enabled
+    assertThat(table).isInstanceOf(RESTTable.class);
+
+    // Create a filtered scan and execute scan planning with filtering and 
deletes
+    try (CloseableIterable<FileScanTask> iterable =
+        table.newScan().filter(Expressions.equal("data", "test")).planFiles()) 
{
+      List<FileScanTask> tasks = Lists.newArrayList(iterable);
+
+      // Verify scan planning works with both filtering and deletes
+      assertThat(tasks).hasSize(2); // 2 data files: FILE_A, FILE_B
+
+      // FILE_A should have no delete files
+      FileScanTask fileATask =
+          tasks.stream()
+              .filter(task -> task.file().path().equals(FILE_A.path()))
+              .findFirst()
+              .orElseThrow(() -> new AssertionError("Expected FILE_A in scan 
tasks"));
+      assertThat(fileATask.deletes()).isEmpty(); // 0 delete files for FILE_A
+
+      // FILE_B should have FILE_B_EQUALITY_DELETES
+      FileScanTask fileBTask =
+          tasks.stream()
+              .filter(task -> task.file().path().equals(FILE_B.path()))
+              .findFirst()
+              .orElseThrow(() -> new AssertionError("Expected FILE_B in scan 
tasks"));
+      assertThat(fileBTask.deletes()).hasSize(1); // 1 delete file: 
FILE_B_EQUALITY_DELETES
+      
assertThat(fileBTask.deletes().get(0).path()).isEqualTo(FILE_B_EQUALITY_DELETES.path());
+    }
+  }
+
+  @Test
+  public void testRESTScanPlanningDeletesCancellation() throws IOException {
+    configurePlanningBehavior(TestPlanningBehavior.Builder::asynchronous);
+    Table table = createTableWithScanPlanning("deletes_cancellation_test");
+
+    // Add deletes to make the scenario more complex
+    
table.newRowDelta().addDeletes(FILE_A_DELETES).addDeletes(FILE_A_EQUALITY_DELETES).commit();
+
+    // Ensure we have a RESTTable with server-side planning enabled
+    assertThat(table).isInstanceOf(RESTTable.class);
+    RESTTableScan restTableScan = restTableScanFor(table);
+
+    // Get the iterable (which may involve async planning with deletes)
+    try (CloseableIterable<FileScanTask> iterable = restTableScan.planFiles();
+        CloseableIterator<FileScanTask> iterator = iterable.iterator()) {
+      // Test cancellation works with delete files present
+      // Resources will be closed automatically
+    }
+
+    // Verify cancellation method is still accessible
+    assertThat(restTableScan.cancelPlan()).isFalse(); // No active plan at 
this point
+  }
+
+  @Test
+  public void testRESTScanPlanningWithTimeTravel() throws IOException {
+    // Test server-side scan planning with time travel (snapshot-based queries)
+    // Verify that snapshot IDs are correctly passed through the REST API
+    // and that historical scans return the correct files and deletes
+
+    
configurePlanningBehavior(TestPlanningBehavior.Builder::synchronousWithAllTasks);
+
+    TableIdentifier tableId = TableIdentifier.of(NS, "snapshot_scan_test");
+
+    RESTCatalog catalog =
+        initCatalog(
+            "prod", 
ImmutableMap.of(RESTCatalogProperties.REST_SERVER_PLANNING_ENABLED, "true"));
+    if (requiresNamespaceCreate()) {
+      catalog.createNamespace(tableId.namespace());
+    }
+
+    // Create table and add FILE_A (snapshot 1)
+    Table table = catalog.buildTable(tableId, 
SCHEMA).withPartitionSpec(SPEC).create();
+    // Assert that we have a RESTTable
+    assertThat(table).isInstanceOf(RESTTable.class);

Review Comment:
   `restTableFor`?



##########
core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java:
##########
@@ -3163,4 +3382,459 @@ private static List<HTTPRequest> 
allRequests(RESTCatalogAdapter adapter) {
     verify(adapter, atLeastOnce()).execute(captor.capture(), any(), any(), 
any());
     return captor.getAllValues();
   }
+
+  @Test
+  public void testCancelPlanWithNoActivePlan() {
+    
configurePlanningBehavior(TestPlanningBehavior.Builder::synchronousWithPagination);
+    RESTTableScan restTableScan = restTableScanFor("cancel_test_table");
+
+    // Calling cancel with no active plan should return false
+    assertThat(restTableScan.cancelPlan()).isFalse();
+  }
+
+  @Test
+  public void testCancelPlanEndpointSupport() {
+    
configurePlanningBehavior(TestPlanningBehavior.Builder::synchronousWithPagination);
+    RESTTableScan restTableScan = restTableScanFor("cancel_support_table");
+
+    // Test that cancelPlan method is available and returns false when no plan 
is active
+    assertThat(restTableScan.cancelPlan()).isFalse();
+  }
+
+  @Test
+  public void testCancelPlanMethodAvailability() {
+    
configurePlanningBehavior(TestPlanningBehavior.Builder::synchronousWithPagination);
+    RESTTableScan restTableScan = restTableScanFor("cancel_method_table");
+
+    // Test that cancelPlan method is available and callable
+    // When no plan is active, it should return false
+    assertThat(restTableScan.cancelPlan()).isFalse();
+
+    // Verify the method exists and doesn't throw exceptions when called 
multiple times
+    assertThat(restTableScan.cancelPlan()).isFalse();
+  }
+
+  @Test
+  public void testCancelPlanEndpointPath() {
+    TableIdentifier tableId = TableIdentifier.of("test_namespace", 
"test_table");
+    String planId = "plan-abc-123";
+    ResourcePaths paths = new ResourcePaths("test-prefix");
+
+    // Test that the cancel plan path is generated correctly
+    String cancelPath = paths.plan(tableId, planId);
+
+    assertThat(cancelPath)
+        
.isEqualTo("v1/test-prefix/namespaces/test_namespace/tables/test_table/plan/plan-abc-123");
+
+    // Test with different identifiers
+    TableIdentifier complexId = TableIdentifier.of(Namespace.of("db", 
"schema"), "my_table");
+    String complexPath = paths.plan(complexId, "plan-xyz-789");
+
+    assertThat(complexPath).contains("/plan/plan-xyz-789");
+    assertThat(complexPath).contains("db%1Fschema"); // URL encoded namespace 
separator
+  }
+
+  @Test
+  public void testIteratorCloseTriggersCancel() throws IOException {
+    
configurePlanningBehavior(TestPlanningBehavior.Builder::synchronousWithPagination);
+    Table table = createTableWithScanPlanning("iterator_close_table");
+
+    // Ensure we have a RESTTable with server-side planning enabled
+    assertThat(table).isInstanceOf(RESTTable.class);
+    RESTTable restTable = (RESTTable) table;
+
+    TableScan scan = restTable.newScan();
+    assertThat(scan).isInstanceOf(RESTTableScan.class);
+    boolean cancelled = isCancelled((RESTTableScan) scan);
+    assertThat(cancelled).isFalse(); // No active plan to cancel
+  }
+
+  private static boolean isCancelled(RESTTableScan scan) throws IOException {
+
+    // Get the iterable and iterator
+    CloseableIterable<FileScanTask> iterable = scan.planFiles();
+    CloseableIterator<FileScanTask> iterator = iterable.iterator();
+
+    // Verify we can close the iterator without exceptions
+    // The cancellation callback will be called (though no active plan exists)
+    iterator.close();
+
+    // Verify we can still call cancelPlan on the scan
+    return scan.cancelPlan();
+  }
+
+  @Test
+  public void testMetadataTablesWithRemotePlanning() throws IOException {
+    
configurePlanningBehavior(TestPlanningBehavior.Builder::synchronousWithAllTasks);
+    Table table = createTableWithScanPlanning("metadata_tables_test");
+
+    // Ensure we have a RESTTable with server-side planning enabled
+    assertThat(table).isInstanceOf(RESTTable.class);
+
+    // Test that metadata tables work with remote planning
+    // Files metadata table
+    Table filesTable =
+        MetadataTableUtils.createMetadataTableInstance(table, 
MetadataTableType.FILES);
+    assertThat(filesTable).isNotNull();
+
+    TableScan filesScan = filesTable.newScan();
+    assertThat(filesScan).isNotNull();
+
+    // Verify metadata table scan works (should not use REST scan planning)
+    CloseableIterable<FileScanTask> filesIterable = filesScan.planFiles();
+    List<FileScanTask> filesTasks = Lists.newArrayList(filesIterable);
+    assertThat(filesTasks).isNotEmpty();
+
+    // Snapshots metadata table
+    Table snapshotsTable =
+        MetadataTableUtils.createMetadataTableInstance(table, 
MetadataTableType.SNAPSHOTS);
+    assertThat(snapshotsTable).isNotNull();
+
+    TableScan snapshotsScan = snapshotsTable.newScan();
+    CloseableIterable<FileScanTask> snapshotsIterable = 
snapshotsScan.planFiles();
+    List<FileScanTask> snapshotsTasks = Lists.newArrayList(snapshotsIterable);
+    assertThat(snapshotsTasks).isNotEmpty();
+
+    // Manifests metadata table
+    Table manifestsTable =
+        MetadataTableUtils.createMetadataTableInstance(table, 
MetadataTableType.MANIFESTS);
+    assertThat(manifestsTable).isNotNull();
+
+    TableScan manifestsScan = manifestsTable.newScan();
+    CloseableIterable<FileScanTask> manifestsIterable = 
manifestsScan.planFiles();
+    List<FileScanTask> manifestsTasks = Lists.newArrayList(manifestsIterable);
+    assertThat(manifestsTasks).isNotEmpty();
+  }
+
+  @Test
+  public void testIterableCloseTriggersCancel() throws IOException {
+    
configurePlanningBehavior(TestPlanningBehavior.Builder::synchronousWithPagination);
+    Table table = createTableWithScanPlanning("iterable_close_test");
+
+    // Ensure we have a RESTTable with server-side planning enabled
+    assertThat(table).isInstanceOf(RESTTable.class);
+    RESTTable restTable = (RESTTable) table;
+
+    TableScan scan = restTable.newScan();
+    assertThat(scan).isInstanceOf(RESTTableScan.class);
+    RESTTableScan restTableScan = (RESTTableScan) scan;
+
+    // Get the iterable
+    CloseableIterable<FileScanTask> iterable = restTableScan.planFiles();
+
+    // Verify we can close the iterable without exceptions
+    // This tests that cancellation callbacks are properly wired through
+    iterable.close();
+
+    // Verify the scan is still functional
+    boolean cancelled = restTableScan.cancelPlan();
+    assertThat(cancelled).isFalse(); // No active plan to cancel
+  }
+
+  @Test
+  public void testRESTScanPlanningWithPositionDeletes() throws IOException {
+    
configurePlanningBehavior(TestPlanningBehavior.Builder::synchronousWithAllTasks);
+    Table table = createTableWithScanPlanning("position_deletes_test");
+
+    // Add position deletes that correspond to FILE_A (which was added in 
table creation)
+    table.newRowDelta().addDeletes(FILE_A_DELETES).commit();
+
+    // Ensure we have a RESTTable with server-side planning enabled
+    assertThat(table).isInstanceOf(RESTTable.class);
+
+    // Execute scan planning - should handle position deletes correctly
+    try (CloseableIterable<FileScanTask> iterable = 
table.newScan().planFiles()) {
+      List<FileScanTask> tasks = Lists.newArrayList(iterable);
+
+      // Verify we get tasks back (specific count depends on implementation)
+      assertThat(tasks).hasSize(1); // 1 data file: FILE_A
+
+      // Verify specific task content and delete file associations
+      FileScanTask taskWithDeletes =
+          tasks.stream()
+              .filter(task -> !task.deletes().isEmpty())
+              .findFirst()
+              .orElseThrow(
+                  () -> new AssertionError("Expected at least one task with 
delete files"));
+
+      assertThat(taskWithDeletes.file().path()).isEqualTo(FILE_A.path());
+      assertThat(taskWithDeletes.deletes()).hasSize(1); // 1 delete file: 
FILE_A_DELETES
+      
assertThat(taskWithDeletes.deletes().get(0).path()).isEqualTo(FILE_A_DELETES.path());
+    }
+  }
+
+  @Test
+  public void testRESTScanPlanningWithEqualityDeletes() throws IOException {
+    
configurePlanningBehavior(TestPlanningBehavior.Builder::synchronousWithAllTasks);
+    Table table = createTableWithScanPlanning("equality_deletes_test");
+
+    // Add equality deletes that correspond to FILE_A
+    table.newRowDelta().addDeletes(FILE_A_EQUALITY_DELETES).commit();
+
+    // Ensure we have a RESTTable with server-side planning enabled
+    assertThat(table).isInstanceOf(RESTTable.class);
+
+    // Execute scan planning - should handle equality deletes correctly
+    try (CloseableIterable<FileScanTask> iterable = 
table.newScan().planFiles()) {
+      List<FileScanTask> tasks = Lists.newArrayList(iterable);
+
+      // Verify the task count and file paths
+      assertThat(tasks).hasSize(1); // 1 data file: FILE_A
+
+      // Verify specific task content and equality delete file associations
+      FileScanTask taskWithDeletes =
+          tasks.stream()
+              .filter(task -> !task.deletes().isEmpty())
+              .findFirst()
+              .orElseThrow(
+                  () -> new AssertionError("Expected at least one task with 
delete files"));
+
+      assertThat(taskWithDeletes.file().path()).isEqualTo(FILE_A.path());
+      assertThat(taskWithDeletes.deletes()).hasSize(1); // 1 delete file: 
FILE_A_EQUALITY_DELETES
+      
assertThat(taskWithDeletes.deletes().get(0).path()).isEqualTo(FILE_A_EQUALITY_DELETES.path());
+    }
+  }
+
+  @Test
+  public void testRESTScanPlanningWithMixedDeletes() throws IOException {
+    
configurePlanningBehavior(TestPlanningBehavior.Builder::synchronousWithAllTasks);
+    Table table = createTableWithScanPlanning("mixed_deletes_test");
+
+    // Add both position and equality deletes in separate commits
+    table.newRowDelta().addDeletes(FILE_A_DELETES).commit(); // Position 
deletes for FILE_A
+    table
+        .newRowDelta()
+        .addDeletes(FILE_B_EQUALITY_DELETES)
+        .commit(); // Equality deletes for different partition
+
+    // Ensure we have a RESTTable with server-side planning enabled
+    assertThat(table).isInstanceOf(RESTTable.class);
+
+    // Execute scan planning - should handle mixed delete types correctly
+    try (CloseableIterable<FileScanTask> iterable = 
table.newScan().planFiles()) {
+      List<FileScanTask> tasks = Lists.newArrayList(iterable);
+
+      // Verify task count: FILE_A only (FILE_B_EQUALITY_DELETES is in 
different partition)
+      assertThat(tasks).hasSize(1); // 1 data file: FILE_A
+
+      // Verify FILE_A with position deletes (FILE_B_EQUALITY_DELETES not 
associated since no
+      // FILE_B)
+      FileScanTask fileATask =
+          tasks.stream()
+              .filter(task -> task.file().path().equals(FILE_A.path()))
+              .findFirst()
+              .orElseThrow(() -> new AssertionError("Expected FILE_A in scan 
tasks"));
+
+      assertThat(fileATask.deletes())
+          .hasSize(1); // 1 delete file: FILE_A_DELETES 
(FILE_B_EQUALITY_DELETES not matched)
+      
assertThat(fileATask.deletes().get(0).path()).isEqualTo(FILE_A_DELETES.path());
+    }
+  }
+
+  @Test
+  public void testRESTScanPlanningWithMultipleDeleteFiles() throws IOException 
{
+    
configurePlanningBehavior(TestPlanningBehavior.Builder::synchronousWithAllTasks);
+    Table table = createTableWithScanPlanning("multiple_deletes_test");
+
+    // Add FILE_B and FILE_C to the table (FILE_A is already added during 
table creation)
+    table.newAppend().appendFile(FILE_B).appendFile(FILE_C).commit();
+
+    // Add multiple delete files corresponding to FILE_A, FILE_B, FILE_C
+    table
+        .newRowDelta()
+        .addDeletes(FILE_A_DELETES) // Position delete for FILE_A
+        .addDeletes(FILE_B_DELETES) // Position delete for FILE_B
+        .addDeletes(FILE_C_EQUALITY_DELETES) // Equality delete for FILE_C
+        .commit();
+
+    // Ensure we have a RESTTable with server-side planning enabled
+    assertThat(table).isInstanceOf(RESTTable.class);
+
+    // Execute scan planning with multiple delete files
+    try (CloseableIterable<FileScanTask> iterable = 
table.newScan().planFiles()) {
+      List<FileScanTask> tasks = Lists.newArrayList(iterable);
+
+      // Verify we get tasks back (should have 3 data files: FILE_A, FILE_B, 
FILE_C)
+      assertThat(tasks).hasSize(3); // 3 data files
+
+      // Verify FILE_A with position deletes
+      FileScanTask fileATask =
+          tasks.stream()
+              .filter(task -> task.file().path().equals(FILE_A.path()))
+              .findFirst()
+              .orElseThrow(() -> new AssertionError("Expected FILE_A in scan 
tasks"));
+      assertThat(fileATask.deletes()).isNotEmpty(); // Has delete files
+      assertThat(fileATask.deletes().stream().map(DeleteFile::path))
+          .contains(FILE_A_DELETES.path()); // FILE_A_DELETES is present
+
+      // Verify FILE_B with position deletes
+      FileScanTask fileBTask =
+          tasks.stream()
+              .filter(task -> task.file().path().equals(FILE_B.path()))
+              .findFirst()
+              .orElseThrow(() -> new AssertionError("Expected FILE_B in scan 
tasks"));
+      assertThat(fileBTask.deletes()).isNotEmpty(); // Has delete files
+      assertThat(fileBTask.deletes().stream().map(DeleteFile::path))
+          .contains(FILE_B_DELETES.path()); // FILE_B_DELETES is present
+
+      // Verify FILE_C with equality deletes
+      FileScanTask fileCTask =
+          tasks.stream()
+              .filter(task -> task.file().path().equals(FILE_C.path()))
+              .findFirst()
+              .orElseThrow(() -> new AssertionError("Expected FILE_C in scan 
tasks"));
+      assertThat(fileCTask.deletes()).isNotEmpty(); // Has delete files
+      assertThat(fileCTask.deletes().stream().map(DeleteFile::path))
+          .contains(FILE_C_EQUALITY_DELETES.path()); // 
FILE_C_EQUALITY_DELETES is present
+    }
+  }
+
+  @Test
+  public void testRESTScanPlanningWithDeletesAndFiltering() throws IOException 
{
+    
configurePlanningBehavior(TestPlanningBehavior.Builder::synchronousWithAllTasks);
+    Table table = createTableWithScanPlanning("deletes_filtering_test");
+
+    // Add FILE_B to have more data for filtering
+    table.newAppend().appendFile(FILE_B).commit();
+
+    // Add equality delete for FILE_B
+    table.newRowDelta().addDeletes(FILE_B_EQUALITY_DELETES).commit();
+
+    // Ensure we have a RESTTable with server-side planning enabled
+    assertThat(table).isInstanceOf(RESTTable.class);
+
+    // Create a filtered scan and execute scan planning with filtering and 
deletes
+    try (CloseableIterable<FileScanTask> iterable =
+        table.newScan().filter(Expressions.equal("data", "test")).planFiles()) 
{
+      List<FileScanTask> tasks = Lists.newArrayList(iterable);
+
+      // Verify scan planning works with both filtering and deletes
+      assertThat(tasks).hasSize(2); // 2 data files: FILE_A, FILE_B
+
+      // FILE_A should have no delete files
+      FileScanTask fileATask =
+          tasks.stream()
+              .filter(task -> task.file().path().equals(FILE_A.path()))
+              .findFirst()
+              .orElseThrow(() -> new AssertionError("Expected FILE_A in scan 
tasks"));
+      assertThat(fileATask.deletes()).isEmpty(); // 0 delete files for FILE_A
+
+      // FILE_B should have FILE_B_EQUALITY_DELETES
+      FileScanTask fileBTask =
+          tasks.stream()
+              .filter(task -> task.file().path().equals(FILE_B.path()))
+              .findFirst()
+              .orElseThrow(() -> new AssertionError("Expected FILE_B in scan 
tasks"));
+      assertThat(fileBTask.deletes()).hasSize(1); // 1 delete file: 
FILE_B_EQUALITY_DELETES
+      
assertThat(fileBTask.deletes().get(0).path()).isEqualTo(FILE_B_EQUALITY_DELETES.path());
+    }
+  }
+
+  @Test
+  public void testRESTScanPlanningDeletesCancellation() throws IOException {
+    configurePlanningBehavior(TestPlanningBehavior.Builder::asynchronous);
+    Table table = createTableWithScanPlanning("deletes_cancellation_test");
+
+    // Add deletes to make the scenario more complex
+    
table.newRowDelta().addDeletes(FILE_A_DELETES).addDeletes(FILE_A_EQUALITY_DELETES).commit();
+
+    // Ensure we have a RESTTable with server-side planning enabled
+    assertThat(table).isInstanceOf(RESTTable.class);
+    RESTTableScan restTableScan = restTableScanFor(table);
+
+    // Get the iterable (which may involve async planning with deletes)
+    try (CloseableIterable<FileScanTask> iterable = restTableScan.planFiles();
+        CloseableIterator<FileScanTask> iterator = iterable.iterator()) {
+      // Test cancellation works with delete files present
+      // Resources will be closed automatically
+    }
+
+    // Verify cancellation method is still accessible
+    assertThat(restTableScan.cancelPlan()).isFalse(); // No active plan at 
this point
+  }
+
+  @Test
+  public void testRESTScanPlanningWithTimeTravel() throws IOException {
+    // Test server-side scan planning with time travel (snapshot-based queries)
+    // Verify that snapshot IDs are correctly passed through the REST API
+    // and that historical scans return the correct files and deletes
+
+    
configurePlanningBehavior(TestPlanningBehavior.Builder::synchronousWithAllTasks);
+
+    TableIdentifier tableId = TableIdentifier.of(NS, "snapshot_scan_test");
+
+    RESTCatalog catalog =
+        initCatalog(
+            "prod", 
ImmutableMap.of(RESTCatalogProperties.REST_SERVER_PLANNING_ENABLED, "true"));
+    if (requiresNamespaceCreate()) {
+      catalog.createNamespace(tableId.namespace());
+    }
+
+    // Create table and add FILE_A (snapshot 1)
+    Table table = catalog.buildTable(tableId, 
SCHEMA).withPartitionSpec(SPEC).create();
+    // Assert that we have a RESTTable
+    assertThat(table).isInstanceOf(RESTTable.class);
+    table.newAppend().appendFile(FILE_A).commit();
+    table.refresh();
+    long snapshot1Id = table.currentSnapshot().snapshotId();
+
+    // Add FILE_B (snapshot 2)
+    table.newAppend().appendFile(FILE_B).commit();
+    table.refresh();
+    long snapshot2Id = table.currentSnapshot().snapshotId();
+    assertThat(snapshot2Id).isNotEqualTo(snapshot1Id);
+
+    // Add FILE_C and deletes (snapshots 3 and 4)
+    table.newAppend().appendFile(FILE_C).commit();
+    table.newRowDelta().addDeletes(FILE_A_DELETES).commit();
+    table.refresh();
+    long snapshot4Id = table.currentSnapshot().snapshotId();
+    assertThat(snapshot4Id).isNotEqualTo(snapshot2Id);
+
+    // Test 1: Scan at snapshot 1 (should only see FILE_A, no deletes)
+    TableScan scan1 = table.newScan().useSnapshot(snapshot1Id);
+    CloseableIterable<FileScanTask> iterable1 = scan1.planFiles();
+    List<FileScanTask> tasks1 = Lists.newArrayList(iterable1);
+
+    assertThat(tasks1).hasSize(1); // Only FILE_A exists at snapshot 1
+    assertThat(tasks1.get(0).file().path()).isEqualTo(FILE_A.path());
+    assertThat(tasks1.get(0).deletes()).isEmpty(); // No deletes at snapshot 1
+
+    // Test 2: Scan at snapshot 2 (should see FILE_A and FILE_B, no deletes)
+    TableScan scan2 = table.newScan().useSnapshot(snapshot2Id);
+    CloseableIterable<FileScanTask> iterable2 = scan2.planFiles();
+    List<FileScanTask> tasks2 = Lists.newArrayList(iterable2);
+
+    assertThat(tasks2).hasSize(2); // FILE_A and FILE_B exist at snapshot 2
+    assertThat(tasks2.stream().map(task -> task.file().path()))
+        .containsExactlyInAnyOrder(FILE_A.path(), FILE_B.path());
+    assertThat(tasks2.stream().allMatch(task -> task.deletes().isEmpty()))

Review Comment:
   ```suggestion
       assertThat(tasks2).allMatch(task -> task.deletes().isEmpty());
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to