This is an automated email from the ASF dual-hosted git repository.
aokolnychyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new c2f77aacd2 API: Add BatchScan (#5922)
c2f77aacd2 is described below
commit c2f77aacd2da438827d4ad44baad7f2b6b42808c
Author: Anton Okolnychyi <[email protected]>
AuthorDate: Fri Oct 14 14:37:41 2022 -0700
API: Add BatchScan (#5922)
---
.../main/java/org/apache/iceberg/BatchScan.java | 71 ++++++++++
.../java/org/apache/iceberg/BatchScanAdapter.java | 143 +++++++++++++++++++++
api/src/main/java/org/apache/iceberg/Scan.java | 12 ++
api/src/main/java/org/apache/iceberg/Table.java | 11 ++
.../main/java/org/apache/iceberg/TableScan.java | 14 --
.../java/org/apache/iceberg/TestBatchScans.java | 129 +++++++++++++++++++
6 files changed, 366 insertions(+), 14 deletions(-)
diff --git a/api/src/main/java/org/apache/iceberg/BatchScan.java
b/api/src/main/java/org/apache/iceberg/BatchScan.java
new file mode 100644
index 0000000000..4823d7f180
--- /dev/null
+++ b/api/src/main/java/org/apache/iceberg/BatchScan.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+/** API for configuring a batch scan. */
+public interface BatchScan extends Scan<BatchScan, ScanTask,
ScanTaskGroup<ScanTask>> {
+ /**
+ * Returns the {@link Table} from which this scan loads data.
+ *
+ * @return this scan's table
+ */
+ Table table();
+
+ /**
+ * Create a new {@link BatchScan} from this scan's configuration that will
use a snapshot with the
+ * given ID.
+ *
+ * @param snapshotId a snapshot ID
+ * @return a new scan based on this with the given snapshot ID
+ * @throws IllegalArgumentException if the snapshot cannot be found
+ */
+ BatchScan useSnapshot(long snapshotId);
+
+ /**
+ * Create a new {@link BatchScan} from this scan's configuration that will
use the given
+ * reference.
+ *
+ * @param ref a reference
+ * @return a new scan based on this with the given reference
+ * @throws IllegalArgumentException if the reference with the given name
could not be found
+ */
+ BatchScan useRef(String ref);
+
+ /**
+ * Create a new {@link BatchScan} from this scan's configuration that will
use the most recent
+ * snapshot as of the given time in milliseconds on the branch in the scan
or main if no branch is
+ * set.
+ *
+ * @param timestampMillis a timestamp in milliseconds
+ * @return a new scan based on this with the current snapshot at the given
time
+ * @throws IllegalArgumentException if the snapshot cannot be found or time
travel is attempted on
+ * a tag
+ */
+ BatchScan asOfTime(long timestampMillis);
+
+ /**
+ * Returns the {@link Snapshot} that will be used by this scan.
+ *
+ * <p>If the snapshot was not configured using {@link #asOfTime(long)} or
{@link
+ * #useSnapshot(long)}, the current table snapshot will be used.
+ *
+ * @return the Snapshot this scan will use
+ */
+ Snapshot snapshot();
+}
diff --git a/api/src/main/java/org/apache/iceberg/BatchScanAdapter.java
b/api/src/main/java/org/apache/iceberg/BatchScanAdapter.java
new file mode 100644
index 0000000000..10e064fd75
--- /dev/null
+++ b/api/src/main/java/org/apache/iceberg/BatchScanAdapter.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import java.util.Collection;
+import java.util.concurrent.ExecutorService;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.io.CloseableIterable;
+
+/** An adapter that allows using {@link TableScan} as {@link BatchScan}. */
+class BatchScanAdapter implements BatchScan {
+
+ private final TableScan scan;
+
+ BatchScanAdapter(TableScan scan) {
+ this.scan = scan;
+ }
+
+ @Override
+ public Table table() {
+ return scan.table();
+ }
+
+ @Override
+ public BatchScan useSnapshot(long snapshotId) {
+ return new BatchScanAdapter(scan.useSnapshot(snapshotId));
+ }
+
+ @Override
+ public BatchScan useRef(String ref) {
+ return new BatchScanAdapter(scan.useRef(ref));
+ }
+
+ @Override
+ public BatchScan asOfTime(long timestampMillis) {
+ return new BatchScanAdapter(scan.asOfTime(timestampMillis));
+ }
+
+ @Override
+ public Snapshot snapshot() {
+ return scan.snapshot();
+ }
+
+ @Override
+ public BatchScan option(String property, String value) {
+ return new BatchScanAdapter(scan.option(property, value));
+ }
+
+ @Override
+ public BatchScan project(Schema schema) {
+ return new BatchScanAdapter(scan.project(schema));
+ }
+
+ @Override
+ public BatchScan caseSensitive(boolean caseSensitive) {
+ return new BatchScanAdapter(scan.caseSensitive(caseSensitive));
+ }
+
+ @Override
+ public boolean isCaseSensitive() {
+ return scan.isCaseSensitive();
+ }
+
+ @Override
+ public BatchScan includeColumnStats() {
+ return new BatchScanAdapter(scan.includeColumnStats());
+ }
+
+ @Override
+ public BatchScan select(Collection<String> columns) {
+ return new BatchScanAdapter(scan.select(columns));
+ }
+
+ @Override
+ public BatchScan filter(Expression expr) {
+ return new BatchScanAdapter(scan.filter(expr));
+ }
+
+ @Override
+ public Expression filter() {
+ return scan.filter();
+ }
+
+ @Override
+ public BatchScan ignoreResiduals() {
+ return new BatchScanAdapter(scan.ignoreResiduals());
+ }
+
+ @Override
+ public BatchScan planWith(ExecutorService executorService) {
+ return new BatchScanAdapter(scan.planWith(executorService));
+ }
+
+ @Override
+ public Schema schema() {
+ return scan.schema();
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public CloseableIterable<ScanTask> planFiles() {
+ CloseableIterable<? extends ScanTask> tasks = scan.planFiles();
+ return (CloseableIterable<ScanTask>) tasks;
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public CloseableIterable<ScanTaskGroup<ScanTask>> planTasks() {
+ CloseableIterable<? extends ScanTaskGroup<? extends ScanTask>> taskGroups
= scan.planTasks();
+ return (CloseableIterable<ScanTaskGroup<ScanTask>>) taskGroups;
+ }
+
+ @Override
+ public long targetSplitSize() {
+ return scan.targetSplitSize();
+ }
+
+ @Override
+ public int splitLookback() {
+ return scan.splitLookback();
+ }
+
+ @Override
+ public long splitOpenFileCost() {
+ return scan.splitOpenFileCost();
+ }
+}
diff --git a/api/src/main/java/org/apache/iceberg/Scan.java
b/api/src/main/java/org/apache/iceberg/Scan.java
index ec18c162cf..035f22947c 100644
--- a/api/src/main/java/org/apache/iceberg/Scan.java
+++ b/api/src/main/java/org/apache/iceberg/Scan.java
@@ -22,6 +22,7 @@ import java.util.Collection;
import java.util.concurrent.ExecutorService;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
/**
* Scan objects are immutable and can be shared between threads. Refinement
methods, like {@link
@@ -85,6 +86,17 @@ public interface Scan<ThisT, T extends ScanTask, G extends
ScanTaskGroup<T>> {
*/
ThisT select(Collection<String> columns);
+ /**
+ * Create a new scan from this that will read the given columns. This
produces an expected schema
+ * that includes all fields that are either selected or used by this scan's
filter expression.
+ *
+ * @param columns column names
+ * @return a new scan based on this with the given projection columns
+ */
+ default ThisT select(String... columns) {
+ return select(Lists.newArrayList(columns));
+ }
+
/**
* Create a new scan from the results of this filtered by the {@link
Expression}.
*
diff --git a/api/src/main/java/org/apache/iceberg/Table.java
b/api/src/main/java/org/apache/iceberg/Table.java
index e17702f366..02db808417 100644
--- a/api/src/main/java/org/apache/iceberg/Table.java
+++ b/api/src/main/java/org/apache/iceberg/Table.java
@@ -48,6 +48,17 @@ public interface Table {
*/
TableScan newScan();
+ /**
+ * Create a new {@link BatchScan batch scan} for this table.
+ *
+ * <p>Once a batch scan is created, it can be refined to project columns and
filter data.
+ *
+ * @return a batch scan for this table
+ */
+ default BatchScan newBatchScan() {
+ return new BatchScanAdapter(newScan());
+ }
+
/**
* Create a new {@link IncrementalAppendScan scan} for this table.
*
diff --git a/api/src/main/java/org/apache/iceberg/TableScan.java
b/api/src/main/java/org/apache/iceberg/TableScan.java
index ebff7ad51b..ac70e1f5d8 100644
--- a/api/src/main/java/org/apache/iceberg/TableScan.java
+++ b/api/src/main/java/org/apache/iceberg/TableScan.java
@@ -18,8 +18,6 @@
*/
package org.apache.iceberg;
-import org.apache.iceberg.relocated.com.google.common.collect.Lists;
-
/** API for configuring a table scan. */
public interface TableScan extends Scan<TableScan, FileScanTask,
CombinedScanTask> {
/**
@@ -61,18 +59,6 @@ public interface TableScan extends Scan<TableScan,
FileScanTask, CombinedScanTas
*/
TableScan asOfTime(long timestampMillis);
- /**
- * Create a new {@link TableScan} from this that will read the given data
columns. This produces
- * an expected schema that includes all fields that are either selected or
used by this scan's
- * filter expression.
- *
- * @param columns column names from the table's schema
- * @return a new scan based on this with the given projection columns
- */
- default TableScan select(String... columns) {
- return select(Lists.newArrayList(columns));
- }
-
/**
* Create a new {@link TableScan} to read appended data from {@code
fromSnapshotId} exclusive to
* {@code toSnapshotId} inclusive.
diff --git a/core/src/test/java/org/apache/iceberg/TestBatchScans.java
b/core/src/test/java/org/apache/iceberg/TestBatchScans.java
new file mode 100644
index 0000000000..ec4cb28f9e
--- /dev/null
+++ b/core/src/test/java/org/apache/iceberg/TestBatchScans.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class TestBatchScans extends TableTestBase {
+
+ @Parameterized.Parameters(name = "formatVersion = {0}")
+ public static Object[] parameters() {
+ return new Object[] {1, 2};
+ }
+
+ public TestBatchScans(int formatVersion) {
+ super(formatVersion);
+ }
+
+ @Test
+ public void testDataTableScan() {
+ table.newFastAppend().appendFile(FILE_A).appendFile(FILE_B).commit();
+
+ if (formatVersion > 1) {
+ table.newRowDelta().addDeletes(FILE_A_DELETES).commit();
+ }
+
+ BatchScan scan = table.newBatchScan();
+
+ List<ScanTask> tasks = planTasks(scan);
+ Assert.assertEquals("Expected 2 tasks", 2, tasks.size());
+
+ FileScanTask t1 = tasks.get(0).asFileScanTask();
+ Assert.assertEquals("Task file must match", t1.file().path(),
FILE_A.path());
+ V1Assert.assertEquals("Task deletes size must match", 0,
t1.deletes().size());
+ V2Assert.assertEquals("Task deletes size must match", 1,
t1.deletes().size());
+
+ FileScanTask t2 = tasks.get(1).asFileScanTask();
+ Assert.assertEquals("Task file must match", t2.file().path(),
FILE_B.path());
+ Assert.assertEquals("Task deletes size must match", 0,
t2.deletes().size());
+
+ List<ScanTaskGroup<ScanTask>> taskGroups = planTaskGroups(scan);
+ Assert.assertEquals("Expected 1 task group", 1, taskGroups.size());
+
+ ScanTaskGroup<ScanTask> tg = taskGroups.get(0);
+ Assert.assertEquals("Task number must match", 2, tg.tasks().size());
+ V1Assert.assertEquals("Files count must match", 2, tg.filesCount());
+ V2Assert.assertEquals("Files count must match", 3, tg.filesCount());
+ }
+
+ @Test
+ public void testFilesTableScan() {
+ table.newFastAppend().appendFile(FILE_A).commit();
+ table.newFastAppend().appendFile(FILE_B).commit();
+
+ List<String> manifestPaths =
+ table.currentSnapshot().dataManifests(table.io()).stream()
+ .map(ManifestFile::path)
+ .sorted()
+ .collect(Collectors.toList());
+ Assert.assertEquals("Must have 2 manifests", 2, manifestPaths.size());
+
+ FilesTable filesTable = new FilesTable(table.ops(), table);
+
+ BatchScan scan = filesTable.newBatchScan();
+
+ List<ScanTask> tasks = planTasks(scan);
+ Assert.assertEquals("Expected 2 tasks", 2, tasks.size());
+
+ FileScanTask t1 = tasks.get(0).asFileScanTask();
+ Assert.assertEquals("Task file must match", t1.file().path(),
manifestPaths.get(0));
+
+ FileScanTask t2 = tasks.get(1).asFileScanTask();
+ Assert.assertEquals("Task file must match", t2.file().path(),
manifestPaths.get(1));
+
+ List<ScanTaskGroup<ScanTask>> taskGroups = planTaskGroups(scan);
+ Assert.assertEquals("Expected 1 task group", 1, taskGroups.size());
+
+ ScanTaskGroup<ScanTask> tg = taskGroups.get(0);
+ Assert.assertEquals("Task number must match", 2, tg.tasks().size());
+ }
+
+ // plans tasks and reorders them by file name to have deterministic order
+ private List<ScanTask> planTasks(BatchScan scan) {
+ try (CloseableIterable<ScanTask> tasks = scan.planFiles()) {
+ List<ScanTask> tasksAsList = Lists.newArrayList(tasks);
+ tasksAsList.sort((t1, t2) -> path(t1).compareTo(path(t2)));
+ return tasksAsList;
+
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private List<ScanTaskGroup<ScanTask>> planTaskGroups(BatchScan scan) {
+ try (CloseableIterable<ScanTaskGroup<ScanTask>> taskGroups =
scan.planTasks()) {
+ return Lists.newArrayList(taskGroups);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private String path(ScanTask task) {
+ return ((ContentScanTask<?>) task).file().path().toString();
+ }
+}