This is an automated email from the ASF dual-hosted git repository.

aokolnychyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new c2f77aacd2 API: Add BatchScan (#5922)
c2f77aacd2 is described below

commit c2f77aacd2da438827d4ad44baad7f2b6b42808c
Author: Anton Okolnychyi <[email protected]>
AuthorDate: Fri Oct 14 14:37:41 2022 -0700

    API: Add BatchScan (#5922)
---
 .../main/java/org/apache/iceberg/BatchScan.java    |  71 ++++++++++
 .../java/org/apache/iceberg/BatchScanAdapter.java  | 143 +++++++++++++++++++++
 api/src/main/java/org/apache/iceberg/Scan.java     |  12 ++
 api/src/main/java/org/apache/iceberg/Table.java    |  11 ++
 .../main/java/org/apache/iceberg/TableScan.java    |  14 --
 .../java/org/apache/iceberg/TestBatchScans.java    | 129 +++++++++++++++++++
 6 files changed, 366 insertions(+), 14 deletions(-)

diff --git a/api/src/main/java/org/apache/iceberg/BatchScan.java b/api/src/main/java/org/apache/iceberg/BatchScan.java
new file mode 100644
index 0000000000..4823d7f180
--- /dev/null
+++ b/api/src/main/java/org/apache/iceberg/BatchScan.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+/** API for configuring a batch scan. */
+public interface BatchScan extends Scan<BatchScan, ScanTask, ScanTaskGroup<ScanTask>> {
+  /**
+   * Returns the {@link Table} from which this scan loads data.
+   *
+   * @return this scan's table
+   */
+  Table table();
+
+  /**
+   * Create a new {@link BatchScan} from this scan's configuration that will use a snapshot with the
+   * given ID.
+   *
+   * @param snapshotId a snapshot ID
+   * @return a new scan based on this with the given snapshot ID
+   * @throws IllegalArgumentException if the snapshot cannot be found
+   */
+  BatchScan useSnapshot(long snapshotId);
+
+  /**
+   * Create a new {@link BatchScan} from this scan's configuration that will use the given
+   * reference.
+   *
+   * @param ref a reference
+   * @return a new scan based on this with the given reference
+   * @throws IllegalArgumentException if the reference with the given name could not be found
+   */
+  BatchScan useRef(String ref);
+
+  /**
+   * Create a new {@link BatchScan} from this scan's configuration that will use the most recent
+   * snapshot as of the given time in milliseconds on the branch in the scan or main if no branch is
+   * set.
+   *
+   * @param timestampMillis a timestamp in milliseconds
+   * @return a new scan based on this with the current snapshot at the given time
+   * @throws IllegalArgumentException if the snapshot cannot be found or time travel is attempted on
+   *     a tag
+   */
+  BatchScan asOfTime(long timestampMillis);
+
+  /**
+   * Returns the {@link Snapshot} that will be used by this scan.
+   *
+   * <p>If the snapshot was not configured using {@link #asOfTime(long)} or {@link
+   * #useSnapshot(long)}, the current table snapshot will be used.
+   *
+   * @return the Snapshot this scan will use
+   */
+  Snapshot snapshot();
+}
diff --git a/api/src/main/java/org/apache/iceberg/BatchScanAdapter.java b/api/src/main/java/org/apache/iceberg/BatchScanAdapter.java
new file mode 100644
index 0000000000..10e064fd75
--- /dev/null
+++ b/api/src/main/java/org/apache/iceberg/BatchScanAdapter.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import java.util.Collection;
+import java.util.concurrent.ExecutorService;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.io.CloseableIterable;
+
+/** An adapter that allows using {@link TableScan} as {@link BatchScan}. */
+class BatchScanAdapter implements BatchScan {
+
+  private final TableScan scan;
+
+  BatchScanAdapter(TableScan scan) {
+    this.scan = scan;
+  }
+
+  @Override
+  public Table table() {
+    return scan.table();
+  }
+
+  @Override
+  public BatchScan useSnapshot(long snapshotId) {
+    return new BatchScanAdapter(scan.useSnapshot(snapshotId));
+  }
+
+  @Override
+  public BatchScan useRef(String ref) {
+    return new BatchScanAdapter(scan.useRef(ref));
+  }
+
+  @Override
+  public BatchScan asOfTime(long timestampMillis) {
+    return new BatchScanAdapter(scan.asOfTime(timestampMillis));
+  }
+
+  @Override
+  public Snapshot snapshot() {
+    return scan.snapshot();
+  }
+
+  @Override
+  public BatchScan option(String property, String value) {
+    return new BatchScanAdapter(scan.option(property, value));
+  }
+
+  @Override
+  public BatchScan project(Schema schema) {
+    return new BatchScanAdapter(scan.project(schema));
+  }
+
+  @Override
+  public BatchScan caseSensitive(boolean caseSensitive) {
+    return new BatchScanAdapter(scan.caseSensitive(caseSensitive));
+  }
+
+  @Override
+  public boolean isCaseSensitive() {
+    return scan.isCaseSensitive();
+  }
+
+  @Override
+  public BatchScan includeColumnStats() {
+    return new BatchScanAdapter(scan.includeColumnStats());
+  }
+
+  @Override
+  public BatchScan select(Collection<String> columns) {
+    return new BatchScanAdapter(scan.select(columns));
+  }
+
+  @Override
+  public BatchScan filter(Expression expr) {
+    return new BatchScanAdapter(scan.filter(expr));
+  }
+
+  @Override
+  public Expression filter() {
+    return scan.filter();
+  }
+
+  @Override
+  public BatchScan ignoreResiduals() {
+    return new BatchScanAdapter(scan.ignoreResiduals());
+  }
+
+  @Override
+  public BatchScan planWith(ExecutorService executorService) {
+    return new BatchScanAdapter(scan.planWith(executorService));
+  }
+
+  @Override
+  public Schema schema() {
+    return scan.schema();
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public CloseableIterable<ScanTask> planFiles() {
+    CloseableIterable<? extends ScanTask> tasks = scan.planFiles();
+    return (CloseableIterable<ScanTask>) tasks;
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public CloseableIterable<ScanTaskGroup<ScanTask>> planTasks() {
+    CloseableIterable<? extends ScanTaskGroup<? extends ScanTask>> taskGroups = scan.planTasks();
+    return (CloseableIterable<ScanTaskGroup<ScanTask>>) taskGroups;
+  }
+
+  @Override
+  public long targetSplitSize() {
+    return scan.targetSplitSize();
+  }
+
+  @Override
+  public int splitLookback() {
+    return scan.splitLookback();
+  }
+
+  @Override
+  public long splitOpenFileCost() {
+    return scan.splitOpenFileCost();
+  }
+}
diff --git a/api/src/main/java/org/apache/iceberg/Scan.java b/api/src/main/java/org/apache/iceberg/Scan.java
index ec18c162cf..035f22947c 100644
--- a/api/src/main/java/org/apache/iceberg/Scan.java
+++ b/api/src/main/java/org/apache/iceberg/Scan.java
@@ -22,6 +22,7 @@ import java.util.Collection;
 import java.util.concurrent.ExecutorService;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 
 /**
  * Scan objects are immutable and can be shared between threads. Refinement methods, like {@link
@@ -85,6 +86,17 @@ public interface Scan<ThisT, T extends ScanTask, G extends ScanTaskGroup<T>> {
    */
   ThisT select(Collection<String> columns);
 
+  /**
+   * Create a new scan from this that will read the given columns. This produces an expected schema
+   * that includes all fields that are either selected or used by this scan's filter expression.
+   *
+   * @param columns column names
+   * @return a new scan based on this with the given projection columns
+   */
+  default ThisT select(String... columns) {
+    return select(Lists.newArrayList(columns));
+  }
+
   /**
    * Create a new scan from the results of this filtered by the {@link Expression}.
    *
diff --git a/api/src/main/java/org/apache/iceberg/Table.java b/api/src/main/java/org/apache/iceberg/Table.java
index e17702f366..02db808417 100644
--- a/api/src/main/java/org/apache/iceberg/Table.java
+++ b/api/src/main/java/org/apache/iceberg/Table.java
@@ -48,6 +48,17 @@ public interface Table {
    */
   TableScan newScan();
 
+  /**
+   * Create a new {@link BatchScan batch scan} for this table.
+   *
+   * <p>Once a batch scan is created, it can be refined to project columns and filter data.
+   *
+   * @return a batch scan for this table
+   */
+  default BatchScan newBatchScan() {
+    return new BatchScanAdapter(newScan());
+  }
+
   /**
    * Create a new {@link IncrementalAppendScan scan} for this table.
    *
diff --git a/api/src/main/java/org/apache/iceberg/TableScan.java b/api/src/main/java/org/apache/iceberg/TableScan.java
index ebff7ad51b..ac70e1f5d8 100644
--- a/api/src/main/java/org/apache/iceberg/TableScan.java
+++ b/api/src/main/java/org/apache/iceberg/TableScan.java
@@ -18,8 +18,6 @@
  */
 package org.apache.iceberg;
 
-import org.apache.iceberg.relocated.com.google.common.collect.Lists;
-
 /** API for configuring a table scan. */
 public interface TableScan extends Scan<TableScan, FileScanTask, CombinedScanTask> {
   /**
@@ -61,18 +59,6 @@ public interface TableScan extends Scan<TableScan, FileScanTask, CombinedScanTas
    */
   TableScan asOfTime(long timestampMillis);
 
-  /**
-   * Create a new {@link TableScan} from this that will read the given data columns. This produces
-   * an expected schema that includes all fields that are either selected or used by this scan's
-   * filter expression.
-   *
-   * @param columns column names from the table's schema
-   * @return a new scan based on this with the given projection columns
-   */
-  default TableScan select(String... columns) {
-    return select(Lists.newArrayList(columns));
-  }
-
   /**
    * Create a new {@link TableScan} to read appended data from {@code fromSnapshotId} exclusive to
    * {@code toSnapshotId} inclusive.
diff --git a/core/src/test/java/org/apache/iceberg/TestBatchScans.java b/core/src/test/java/org/apache/iceberg/TestBatchScans.java
new file mode 100644
index 0000000000..ec4cb28f9e
--- /dev/null
+++ b/core/src/test/java/org/apache/iceberg/TestBatchScans.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class TestBatchScans extends TableTestBase {
+
+  @Parameterized.Parameters(name = "formatVersion = {0}")
+  public static Object[] parameters() {
+    return new Object[] {1, 2};
+  }
+
+  public TestBatchScans(int formatVersion) {
+    super(formatVersion);
+  }
+
+  @Test
+  public void testDataTableScan() {
+    table.newFastAppend().appendFile(FILE_A).appendFile(FILE_B).commit();
+
+    if (formatVersion > 1) {
+      table.newRowDelta().addDeletes(FILE_A_DELETES).commit();
+    }
+
+    BatchScan scan = table.newBatchScan();
+
+    List<ScanTask> tasks = planTasks(scan);
+    Assert.assertEquals("Expected 2 tasks", 2, tasks.size());
+
+    FileScanTask t1 = tasks.get(0).asFileScanTask();
+    Assert.assertEquals("Task file must match", FILE_A.path(), t1.file().path());
+    V1Assert.assertEquals("Task deletes size must match", 0, t1.deletes().size());
+    V2Assert.assertEquals("Task deletes size must match", 1, t1.deletes().size());
+
+    FileScanTask t2 = tasks.get(1).asFileScanTask();
+    Assert.assertEquals("Task file must match", FILE_B.path(), t2.file().path());
+    Assert.assertEquals("Task deletes size must match", 0, t2.deletes().size());
+
+    List<ScanTaskGroup<ScanTask>> taskGroups = planTaskGroups(scan);
+    Assert.assertEquals("Expected 1 task group", 1, taskGroups.size());
+
+    ScanTaskGroup<ScanTask> tg = taskGroups.get(0);
+    Assert.assertEquals("Task number must match", 2, tg.tasks().size());
+    V1Assert.assertEquals("Files count must match", 2, tg.filesCount());
+    V2Assert.assertEquals("Files count must match", 3, tg.filesCount());
+  }
+
+  @Test
+  public void testFilesTableScan() {
+    table.newFastAppend().appendFile(FILE_A).commit();
+    table.newFastAppend().appendFile(FILE_B).commit();
+
+    List<String> manifestPaths =
+        table.currentSnapshot().dataManifests(table.io()).stream()
+            .map(ManifestFile::path)
+            .sorted()
+            .collect(Collectors.toList());
+    Assert.assertEquals("Must have 2 manifests", 2, manifestPaths.size());
+
+    FilesTable filesTable = new FilesTable(table.ops(), table);
+
+    BatchScan scan = filesTable.newBatchScan();
+
+    List<ScanTask> tasks = planTasks(scan);
+    Assert.assertEquals("Expected 2 tasks", 2, tasks.size());
+
+    FileScanTask t1 = tasks.get(0).asFileScanTask();
+    Assert.assertEquals("Task file must match", manifestPaths.get(0), t1.file().path());
+
+    FileScanTask t2 = tasks.get(1).asFileScanTask();
+    Assert.assertEquals("Task file must match", manifestPaths.get(1), t2.file().path());
+
+    List<ScanTaskGroup<ScanTask>> taskGroups = planTaskGroups(scan);
+    Assert.assertEquals("Expected 1 task group", 1, taskGroups.size());
+
+    ScanTaskGroup<ScanTask> tg = taskGroups.get(0);
+    Assert.assertEquals("Task number must match", 2, tg.tasks().size());
+  }
+
+  // plans tasks and reorders them by file path to have deterministic order
+  private List<ScanTask> planTasks(BatchScan scan) {
+    try (CloseableIterable<ScanTask> tasks = scan.planFiles()) {
+      List<ScanTask> tasksAsList = Lists.newArrayList(tasks);
+      tasksAsList.sort((t1, t2) -> path(t1).compareTo(path(t2)));
+      return tasksAsList;
+
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private List<ScanTaskGroup<ScanTask>> planTaskGroups(BatchScan scan) {
+    try (CloseableIterable<ScanTaskGroup<ScanTask>> taskGroups = scan.planTasks()) {
+      return Lists.newArrayList(taskGroups);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private String path(ScanTask task) {
+    return ((ContentScanTask<?>) task).file().path().toString();
+  }
+}

Reply via email to