nastra commented on code in PR #13400:
URL: https://github.com/apache/iceberg/pull/13400#discussion_r2592324740


##########
core/src/test/java/org/apache/iceberg/rest/TestRESTScanPlanning.java:
##########
@@ -0,0 +1,1001 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.rest;
+
+import static org.apache.iceberg.TestBase.FILE_A;
+import static org.apache.iceberg.TestBase.FILE_A_DELETES;
+import static org.apache.iceberg.TestBase.FILE_A_EQUALITY_DELETES;
+import static org.apache.iceberg.TestBase.FILE_B;
+import static org.apache.iceberg.TestBase.FILE_B_DELETES;
+import static org.apache.iceberg.TestBase.FILE_B_EQUALITY_DELETES;
+import static org.apache.iceberg.TestBase.FILE_C;
+import static org.apache.iceberg.TestBase.FILE_C_EQUALITY_DELETES;
+import static org.apache.iceberg.TestBase.SCHEMA;
+import static org.apache.iceberg.TestBase.SPEC;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.assertj.core.api.Assumptions.assumeThat;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.ObjectReader;
+import java.io.File;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.ContentFile;
+import org.apache.iceberg.ContentScanTask;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.MetadataTableType;
+import org.apache.iceberg.MetadataTableUtils;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Scan;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableScan;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.SessionCatalog;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.inmemory.InMemoryCatalog;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.rest.responses.ConfigResponse;
+import org.apache.iceberg.rest.responses.ErrorResponse;
+import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.server.handler.gzip.GzipHandler;
+import org.eclipse.jetty.servlet.ServletContextHandler;
+import org.eclipse.jetty.servlet.ServletHolder;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+import org.mockito.Mockito;
+
+public class TestRESTScanPlanning {
+  private static final ObjectMapper MAPPER = RESTObjectMapper.mapper();
+  private static final Namespace NS = Namespace.of("ns");
+
+  private InMemoryCatalog backendCatalog;
+  private Server httpServer;
+  private RESTCatalogAdapter adapterForRESTServer;
+  private ParserContext parserContext;
+  @TempDir private Path temp;
+  private RESTCatalog restCatalogWithScanPlanning;
+
+  @BeforeEach
+  public void setupCatalogs() throws Exception {
+    File warehouse = temp.toFile();
+    this.backendCatalog = new InMemoryCatalog();
+    this.backendCatalog.initialize(
+        "in-memory",
+        ImmutableMap.of(CatalogProperties.WAREHOUSE_LOCATION, 
warehouse.getAbsolutePath()));
+
+    adapterForRESTServer =
+        Mockito.spy(
+            new RESTCatalogAdapter(backendCatalog) {
+              @Override
+              public <T extends RESTResponse> T execute(
+                  HTTPRequest request,
+                  Class<T> responseType,
+                  Consumer<ErrorResponse> errorHandler,
+                  Consumer<Map<String, String>> responseHeaders) {
+                if (ResourcePaths.config().equals(request.path())) {
+                  return castResponse(
+                      responseType,
+                      ConfigResponse.builder()
+                          .withEndpoints(
+                              Arrays.stream(Route.values())
+                                  .map(r -> Endpoint.create(r.method().name(), 
r.resourcePath()))
+                                  .collect(Collectors.toList()))
+                          .withOverrides(
+                              ImmutableMap.of(
+                                  
RESTCatalogProperties.REST_SCAN_PLANNING_ENABLED, "true"))
+                          .build());
+                }
+                Object body = roundTripSerialize(request.body(), "request");
+                HTTPRequest req = 
ImmutableHTTPRequest.builder().from(request).body(body).build();
+                T response = super.execute(req, responseType, errorHandler, 
responseHeaders);
+                return roundTripSerialize(response, "response");
+              }
+            });
+
+    ServletContextHandler servletContext =
+        new ServletContextHandler(ServletContextHandler.NO_SESSIONS);
+    servletContext.addServlet(
+        new ServletHolder(new RESTCatalogServlet(adapterForRESTServer)), "/*");
+    servletContext.setHandler(new GzipHandler());
+
+    this.httpServer = new Server(new 
InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
+    httpServer.setHandler(servletContext);
+    httpServer.start();
+
+    // Initialize catalog with scan planning enabled
+    this.restCatalogWithScanPlanning = initCatalog("prod-with-scan-planning", 
ImmutableMap.of());
+  }
+
+  @AfterEach
+  public void teardownCatalogs() throws Exception {
+    if (restCatalogWithScanPlanning != null) {
+      restCatalogWithScanPlanning.close();
+    }
+
+    if (backendCatalog != null) {
+      backendCatalog.close();
+    }
+
+    if (httpServer != null) {
+      httpServer.stop();
+      httpServer.join();
+    }
+  }
+
+  // ==================== Helper Methods ====================
+
+  private RESTCatalog initCatalog(String catalogName, Map<String, String> 
additionalProperties) {
+    RESTCatalog catalog =
+        new RESTCatalog(
+            SessionCatalog.SessionContext.createEmpty(),
+            (config) ->
+                HTTPClient.builder(config)
+                    .uri(config.get(CatalogProperties.URI))
+                    .withHeaders(RESTUtil.configHeaders(config))
+                    .build());
+    catalog.setConf(new Configuration());
+    Map<String, String> properties =
+        ImmutableMap.of(
+            CatalogProperties.URI,
+            httpServer.getURI().toString(),
+            CatalogProperties.FILE_IO_IMPL,
+            "org.apache.iceberg.inmemory.InMemoryFileIO");
+    catalog.initialize(
+        catalogName,
+        ImmutableMap.<String, String>builder()
+            .putAll(properties)
+            .putAll(additionalProperties)
+            .build());
+    return catalog;
+  }
+
+  @SuppressWarnings("unchecked")
+  private <T> T roundTripSerialize(T payload, String description) {
+    if (payload == null) {
+      return null;
+    }
+
+    try {
+      if (payload instanceof RESTMessage) {
+        RESTMessage message = (RESTMessage) payload;
+        ObjectReader reader = MAPPER.readerFor(message.getClass());
+        if (parserContext != null && !parserContext.isEmpty()) {
+          reader = reader.with(parserContext.toInjectableValues());
+        }
+        return reader.readValue(MAPPER.writeValueAsString(message));
+      } else {
+        // use Map so that Jackson doesn't try to instantiate ImmutableMap 
from payload.getClass()
+        return (T) MAPPER.readValue(MAPPER.writeValueAsString(payload), 
Map.class);
+      }
+    } catch (JsonProcessingException e) {
+      throw new RuntimeException(
+          String.format("Failed to serialize and deserialize %s: %s", 
description, payload), e);
+    }
+  }
+
+  private void setParserContext(Table table) {
+    parserContext =
+        ParserContext.builder().add("specsById", 
table.specs()).add("caseSensitive", false).build();
+  }
+
+  private RESTCatalog scanPlanningCatalog() {
+    return restCatalogWithScanPlanning;
+  }
+
+  private void configurePlanningBehavior(
+      Function<TestPlanningBehavior.Builder, TestPlanningBehavior.Builder> 
configurator) {
+    TestPlanningBehavior.Builder builder = TestPlanningBehavior.builder();
+    
adapterForRESTServer.setPlanningBehavior(configurator.apply(builder).build());
+  }
+
+  private Table createTableWithScanPlanning(RESTCatalog catalog, String 
tableName) {
+    return createTableWithScanPlanning(catalog, TableIdentifier.of(NS, 
tableName));
+  }
+
+  private Table createTableWithScanPlanning(RESTCatalog catalog, 
TableIdentifier identifier) {
+    catalog.createNamespace(identifier.namespace());
+    return catalog.buildTable(identifier, 
SCHEMA).withPartitionSpec(SPEC).create();
+  }
+
+  private RESTTable restTableFor(RESTCatalog catalog, String tableName) {
+    Table table = createTableWithScanPlanning(catalog, tableName);
+    table.newAppend().appendFile(FILE_A).commit();
+    assertThat(table).isInstanceOf(RESTTable.class);
+    return (RESTTable) table;
+  }
+
+  private RESTTableScan restTableScanFor(Table table) {
+    assertThat(table).isInstanceOf(RESTTable.class);
+    RESTTable restTable = (RESTTable) table;
+    TableScan scan = restTable.newScan();
+    assertThat(scan).isInstanceOf(RESTTableScan.class);
+    return (RESTTableScan) scan;
+  }
+
+  // ==================== Test Planning Behavior ====================
+
+  /** Enum for parameterized tests to test both synchronous and asynchronous 
planning modes. */
+  private enum PlanningMode
+      implements Function<TestPlanningBehavior.Builder, 
TestPlanningBehavior.Builder> {
+    SYNCHRONOUS(TestPlanningBehavior.Builder::synchronous),
+    ASYNCHRONOUS(TestPlanningBehavior.Builder::asynchronous);
+
+    private final Function<TestPlanningBehavior.Builder, 
TestPlanningBehavior.Builder> configurer;
+
+    PlanningMode(Function<TestPlanningBehavior.Builder, 
TestPlanningBehavior.Builder> configurer) {
+      this.configurer = configurer;
+    }
+
+    @Override
+    public TestPlanningBehavior.Builder apply(TestPlanningBehavior.Builder 
builder) {
+      return this.configurer.apply(builder);
+    }
+  }
+
+  private static class TestPlanningBehavior implements 
RESTCatalogAdapter.PlanningBehavior {
+    private final boolean asyncPlanning;
+    private final int tasksPerPage;
+
+    private TestPlanningBehavior(boolean asyncPlanning, int tasksPerPage) {
+      this.asyncPlanning = asyncPlanning;
+      this.tasksPerPage = tasksPerPage;
+    }
+
+    private static Builder builder() {
+      return new Builder();
+    }
+
+    @Override
+    public boolean shouldPlanTableScanAsync(Scan<?, FileScanTask, ?> scan) {
+      return asyncPlanning;
+    }
+
+    @Override
+    public int numberFileScanTasksPerPlanTask() {
+      return tasksPerPage;
+    }
+
+    protected static class Builder {
+      private boolean asyncPlanning;
+      private int tasksPerPage;
+
+      Builder asyncPlanning(boolean async) {
+        asyncPlanning = async;
+        return this;
+      }
+
+      Builder tasksPerPage(int tasks) {
+        tasksPerPage = tasks;
+        return this;
+      }
+
+      // Convenience methods for common test scenarios
+      Builder synchronous() {
+        return asyncPlanning(false).tasksPerPage(100);
+      }
+
+      Builder synchronousWithPagination() {
+        return asyncPlanning(false).tasksPerPage(1);
+      }
+
+      Builder asynchronous() {
+        return asyncPlanning(true).tasksPerPage(100);
+      }
+
+      TestPlanningBehavior build() {
+        return new TestPlanningBehavior(asyncPlanning, tasksPerPage);
+      }
+    }
+  }
+
+  @ParameterizedTest
+  @EnumSource(PlanningMode.class)
+  void scanPlanningWithAllTasksInSingleResponse(
+      Function<TestPlanningBehavior.Builder, TestPlanningBehavior.Builder> 
planMode)
+      throws IOException {
+    configurePlanningBehavior(planMode);
+    Table table = restTableFor(scanPlanningCatalog(), "all_tasks_table");
+    setParserContext(table);
+
+    // Verify actual data file is returned with correct count
+    try (CloseableIterable<FileScanTask> iterable = 
table.newScan().planFiles()) {
+      List<FileScanTask> tasks = Lists.newArrayList(iterable);
+
+      assertThat(tasks).hasSize(1);
+      assertThat(tasks.get(0).file().location()).isEqualTo(FILE_A.location());
+      assertThat(tasks.get(0).deletes()).isEmpty();
+    }
+  }
+
+  @Test
+  public void nestedPlanTaskPagination() throws IOException {
+    // Configure: synchronous planning with very small pages (creates nested 
plan task structure)
+    
configurePlanningBehavior(TestPlanningBehavior.Builder::synchronousWithPagination);
+
+    Table table = restTableFor(scanPlanningCatalog(), 
"nested_plan_task_table");
+    // add one more files for proper pagination
+    table.newFastAppend().appendFile(FILE_B).commit();
+    setParserContext(table);
+
+    // Verify actual data file is returned via nested plan task fetching with 
correct count
+    try (CloseableIterable<FileScanTask> iterable = 
table.newScan().planFiles()) {
+      List<FileScanTask> tasks = Lists.newArrayList(iterable);
+      assertThat(tasks).hasSize(2);
+      assertThat(tasks)
+          .anySatisfy(task -> 
assertThat(task.file().location()).isEqualTo(FILE_A.location()));
+      assertThat(tasks)
+          .anySatisfy(task -> 
assertThat(task.file().location()).isEqualTo(FILE_B.location()));
+      assertThat(tasks.get(0).deletes()).isEmpty();
+      assertThat(tasks.get(1).deletes()).isEmpty();
+    }
+  }
+
+  @Test
+  public void cancelPlanMethodAvailability() {
+    
configurePlanningBehavior(TestPlanningBehavior.Builder::synchronousWithPagination);
+    RESTTable table = restTableFor(scanPlanningCatalog(), 
"cancel_method_table");
+    RESTTableScan restTableScan = restTableScanFor(table);
+
+    // Test that cancelPlan method is available and callable
+    // When no plan is active, it should return false
+    assertThat(restTableScan.cancelPlan()).isFalse();
+
+    // Verify the method doesn't throw exceptions when called multiple times
+    assertThat(restTableScan.cancelPlan()).isFalse();
+  }
+
+  @Test
+  public void iterableCloseTriggersCancel() throws IOException {
+    configurePlanningBehavior(TestPlanningBehavior.Builder::asynchronous);
+    RESTTable restTable = restTableFor(scanPlanningCatalog(), 
"iterable_close_test");
+    setParserContext(restTable);
+
+    TableScan scan = restTable.newScan();
+    assertThat(scan).isInstanceOf(RESTTableScan.class);
+    RESTTableScan restTableScan = (RESTTableScan) scan;
+
+    // Get the iterable
+    CloseableIterable<FileScanTask> iterable = restTableScan.planFiles();
+
+    // call cancelPlan before closing the iterable
+    boolean cancelled = restTableScan.cancelPlan();
+    assertThat(cancelled).isTrue();
+
+    // Verify we can close the iterable without exceptions
+    // This tests that cancellation callbacks are properly wired through
+    iterable.close();
+  }
+
+  @ParameterizedTest
+  @EnumSource(MetadataTableType.class)
+  public void metadataTablesWithRemotePlanning(MetadataTableType type) throws 
IOException {
+    assumeThat(type)
+        .as("POSITION_DELETES table does not implement newScan() method")
+        .isNotEqualTo(MetadataTableType.POSITION_DELETES);
+
+    configurePlanningBehavior(TestPlanningBehavior.Builder::synchronous);
+    RESTTable table = restTableFor(scanPlanningCatalog(), 
"metadata_tables_test");
+    table.newAppend().appendFile(FILE_B).commit();
+    
table.newRowDelta().addDeletes(FILE_A_DELETES).addDeletes(FILE_B_EQUALITY_DELETES).commit();
+    setParserContext(table);
+    Table metadataTableInstance = 
MetadataTableUtils.createMetadataTableInstance(table, type);

Review Comment:
   While it's fine to check whether we can create a metadata table from the 
REST table, I don't think loading a metadata table through the catalog actually 
works properly. For example, we would need a test similar to 
`CatalogTests.testLoadMetadataTable()` that verifies the loaded table is 
actually an instance of `BaseMetadataTable`. That currently isn't the case, 
because the code path at 
https://github.com/apache/iceberg/blob/main/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java#L469-L471 is never reached.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to