This is an automated email from the ASF dual-hosted git repository.
aokolnychyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 40de4bc7dc Core: Implement IncrementalChangelogScan without deletes (#5382)
40de4bc7dc is described below
commit 40de4bc7dc12c3e2c40d4fb687f9ee3342cb1727
Author: Anton Okolnychyi <[email protected]>
AuthorDate: Thu Aug 4 20:48:29 2022 -0700
Core: Implement IncrementalChangelogScan without deletes (#5382)
---
.palantir/revapi.yml | 6 +
api/src/main/java/org/apache/iceberg/Scan.java | 14 +
.../main/java/org/apache/iceberg/TableScan.java | 16 --
.../apache/iceberg/BaseIncrementalAppendScan.java | 120 +--------
.../iceberg/BaseIncrementalChangelogScan.java | 183 +++++++++++++
.../org/apache/iceberg/BaseIncrementalScan.java | 142 ++++++++++
.../src/main/java/org/apache/iceberg/BaseScan.java | 57 ++++
.../main/java/org/apache/iceberg/BaseTable.java | 5 +
.../java/org/apache/iceberg/BaseTableScan.java | 24 --
.../java/org/apache/iceberg/DataTableScan.java | 31 +--
.../apache/iceberg/IncrementalDataTableScan.java | 5 +-
.../java/org/apache/iceberg/PartitionsTable.java | 5 +-
.../java/org/apache/iceberg/util/SnapshotUtil.java | 9 +
.../test/java/org/apache/iceberg/ScanTestBase.java | 43 +--
.../java/org/apache/iceberg/TableTestBase.java | 33 +++
.../iceberg/TestBaseIncrementalAppendScan.java | 3 +-
.../iceberg/TestBaseIncrementalChangelogScan.java | 290 +++++++++++++++++++++
.../java/org/apache/iceberg/TestDataTableScan.java | 2 +-
18 files changed, 778 insertions(+), 210 deletions(-)
diff --git a/.palantir/revapi.yml b/.palantir/revapi.yml
index 79f1e51ba2..37e95468b4 100644
--- a/.palantir/revapi.yml
+++ b/.palantir/revapi.yml
@@ -117,3 +117,9 @@ acceptedBreaks:
\ boolean)"
justification: "IncrementalScanEvent should only be constructed by
Iceberg code.\
\ Hence the change of constructor params shouldn't affect users"
+ - code: "java.method.addedToInterface"
+ new: "method org.apache.iceberg.expressions.Expression
org.apache.iceberg.Scan<ThisT, T extends org.apache.iceberg.ScanTask, G extends
org.apache.iceberg.ScanTaskGroup<T extends
org.apache.iceberg.ScanTask>>::filter()"
+ justification: "Move a method to the parent interface"
+ - code: "java.method.addedToInterface"
+ new: "method boolean org.apache.iceberg.Scan<ThisT, T extends
org.apache.iceberg.ScanTask, G extends org.apache.iceberg.ScanTaskGroup<T
extends org.apache.iceberg.ScanTask>>::isCaseSensitive()"
+ justification: "Move a method to the parent interface"
diff --git a/api/src/main/java/org/apache/iceberg/Scan.java
b/api/src/main/java/org/apache/iceberg/Scan.java
index 118b9ce66b..ec18c162cf 100644
--- a/api/src/main/java/org/apache/iceberg/Scan.java
+++ b/api/src/main/java/org/apache/iceberg/Scan.java
@@ -59,6 +59,13 @@ public interface Scan<ThisT, T extends ScanTask, G extends
ScanTaskGroup<T>> {
*/
ThisT caseSensitive(boolean caseSensitive);
+ /**
+ * Returns whether this scan is case-sensitive with respect to column names.
+ *
+ * @return true if case-sensitive, false otherwise.
+ */
+ boolean isCaseSensitive();
+
/**
* Create a new scan from this that loads the column stats with each data
file.
*
@@ -86,6 +93,13 @@ public interface Scan<ThisT, T extends ScanTask, G extends
ScanTaskGroup<T>> {
*/
ThisT filter(Expression expr);
+ /**
+ * Returns this scan's filter {@link Expression}.
+ *
+ * @return this scan's filter expression
+ */
+ Expression filter();
+
/**
* Create a new scan from this that applies data filtering to files but not
to rows in those
* files.
diff --git a/api/src/main/java/org/apache/iceberg/TableScan.java
b/api/src/main/java/org/apache/iceberg/TableScan.java
index de0e76b8b1..56f7f11d3c 100644
--- a/api/src/main/java/org/apache/iceberg/TableScan.java
+++ b/api/src/main/java/org/apache/iceberg/TableScan.java
@@ -18,7 +18,6 @@
*/
package org.apache.iceberg;
-import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
/** API for configuring a table scan. */
@@ -62,13 +61,6 @@ public interface TableScan extends Scan<TableScan,
FileScanTask, CombinedScanTas
return select(Lists.newArrayList(columns));
}
- /**
- * Returns this scan's filter {@link Expression}.
- *
- * @return this scan's filter expression
- */
- Expression filter();
-
/**
* Create a new {@link TableScan} to read appended data from {@code
fromSnapshotId} exclusive to
* {@code toSnapshotId} inclusive.
@@ -103,12 +95,4 @@ public interface TableScan extends Scan<TableScan,
FileScanTask, CombinedScanTas
* @return the Snapshot this scan will use
*/
Snapshot snapshot();
-
- /**
- * Returns whether this scan should apply column name case sensitiveness as
per {@link
- * Scan#caseSensitive(boolean)}.
- *
- * @return true if case sensitive, false otherwise.
- */
- boolean isCaseSensitive();
}
diff --git
a/core/src/main/java/org/apache/iceberg/BaseIncrementalAppendScan.java
b/core/src/main/java/org/apache/iceberg/BaseIncrementalAppendScan.java
index d8386bd98e..cf3bc10610 100644
--- a/core/src/main/java/org/apache/iceberg/BaseIncrementalAppendScan.java
+++ b/core/src/main/java/org/apache/iceberg/BaseIncrementalAppendScan.java
@@ -20,10 +20,7 @@ package org.apache.iceberg;
import java.util.List;
import java.util.Set;
-import org.apache.iceberg.events.IncrementalScanEvent;
-import org.apache.iceberg.events.Listeners;
import org.apache.iceberg.io.CloseableIterable;
-import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.FluentIterable;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
@@ -32,7 +29,7 @@ import org.apache.iceberg.util.SnapshotUtil;
import org.apache.iceberg.util.TableScanUtil;
class BaseIncrementalAppendScan
- extends BaseScan<IncrementalAppendScan, FileScanTask, CombinedScanTask>
+ extends BaseIncrementalScan<IncrementalAppendScan, FileScanTask,
CombinedScanTask>
implements IncrementalAppendScan {
BaseIncrementalAppendScan(TableOperations ops, Table table) {
@@ -51,67 +48,8 @@ class BaseIncrementalAppendScan
}
@Override
- public IncrementalAppendScan fromSnapshotInclusive(long fromSnapshotId) {
- Preconditions.checkArgument(
- table().snapshot(fromSnapshotId) != null,
- "Cannot find the starting snapshot: %s",
- fromSnapshotId);
- return newRefinedScan(
- tableOps(), table(), schema(),
context().fromSnapshotIdInclusive(fromSnapshotId));
- }
-
- @Override
- public IncrementalAppendScan fromSnapshotExclusive(long fromSnapshotId) {
- // for exclusive behavior, table().snapshot(fromSnapshotId) check can't be
applied.
- // as fromSnapshotId could be matched to a parent snapshot that is already
expired
- return newRefinedScan(
- tableOps(), table(), schema(),
context().fromSnapshotIdExclusive(fromSnapshotId));
- }
-
- @Override
- public IncrementalAppendScan toSnapshot(long toSnapshotId) {
- Preconditions.checkArgument(
- table().snapshot(toSnapshotId) != null, "Cannot find end snapshot:
%s", toSnapshotId);
- return newRefinedScan(tableOps(), table(), schema(),
context().toSnapshotId(toSnapshotId));
- }
-
- @Override
- public CloseableIterable<FileScanTask> planFiles() {
- Long fromSnapshotId = context().fromSnapshotId();
- Long toSnapshotId = context().toSnapshotId();
- if (fromSnapshotId == null && toSnapshotId == null &&
table().currentSnapshot() == null) {
- // If it is an empty table (no current snapshot) and both from and to
snapshots aren't set
- // either,
- // simply return an empty iterable. In this case, listener notification
is also skipped.
- return CloseableIterable.empty();
- }
-
- long toSnapshotIdInclusive = toSnapshotIdInclusive();
- // fromSnapshotIdExclusive can be null. appendsBetween handles null
fromSnapshotIdExclusive
- // properly
- // by finding the oldest ancestor of end snapshot.
- Long fromSnapshotIdExclusive = fromSnapshotIdExclusive(fromSnapshotId,
toSnapshotIdInclusive);
- if (fromSnapshotIdExclusive != null) {
- Listeners.notifyAll(
- new IncrementalScanEvent(
- table().name(),
- fromSnapshotIdExclusive,
- toSnapshotIdInclusive,
- context().rowFilter(),
- table().schema(),
- false));
- } else {
- Snapshot oldestAncestorSnapshot =
- SnapshotUtil.oldestAncestorOf(toSnapshotIdInclusive,
table()::snapshot);
- Listeners.notifyAll(
- new IncrementalScanEvent(
- table().name(),
- oldestAncestorSnapshot.snapshotId(),
- toSnapshotIdInclusive,
- context().rowFilter(),
- table().schema(),
- true));
- }
+ protected CloseableIterable<FileScanTask> doPlanFiles(
+ Long fromSnapshotIdExclusive, long toSnapshotIdInclusive) {
// appendsBetween handles null fromSnapshotId (exclusive) properly
List<Snapshot> snapshots =
@@ -132,44 +70,6 @@ class BaseIncrementalAppendScan
splitFiles, targetSplitSize(), splitLookback(), splitOpenFileCost());
}
- private Long fromSnapshotIdExclusive(Long fromSnapshotId, long
toSnapshotIdInclusive) {
- if (fromSnapshotId != null) {
- if (context().fromSnapshotInclusive()) {
- // validate the fromSnapshotId is an ancestor of toSnapshotId
- Preconditions.checkArgument(
- SnapshotUtil.isAncestorOf(table(), toSnapshotIdInclusive,
fromSnapshotId),
- "Starting snapshot (inclusive) %s is not an ancestor of end
snapshot %s",
- fromSnapshotId,
- toSnapshotIdInclusive);
- // for inclusive behavior fromSnapshotIdExclusive is set to the parent
snapshot id, which
- // can be null.
- return table().snapshot(fromSnapshotId).parentId();
- } else {
- // validate the parent snapshot id an ancestor of toSnapshotId
- Preconditions.checkArgument(
- SnapshotUtil.isParentAncestorOf(table(), toSnapshotIdInclusive,
fromSnapshotId),
- "Starting snapshot (exclusive) %s is not a parent ancestor of end
snapshot %s",
- fromSnapshotId,
- toSnapshotIdInclusive);
- return fromSnapshotId;
- }
- } else {
- return null;
- }
- }
-
- private long toSnapshotIdInclusive() {
- if (context().toSnapshotId() != null) {
- return context().toSnapshotId();
- } else {
- Snapshot currentSnapshot = table().currentSnapshot();
- Preconditions.checkArgument(
- currentSnapshot != null,
- "Invalid config: end snapshot is not set and table has no current
snapshot");
- return currentSnapshot.snapshotId();
- }
- }
-
private CloseableIterable<FileScanTask>
appendFilesFromSnapshots(List<Snapshot> snapshots) {
Set<Long> snapshotIds = Sets.newHashSet(Iterables.transform(snapshots,
Snapshot::snapshotId));
Set<ManifestFile> manifests =
@@ -180,12 +80,9 @@ class BaseIncrementalAppendScan
ManifestGroup manifestGroup =
new ManifestGroup(tableOps().io(), manifests)
- .caseSensitive(context().caseSensitive())
- .select(
- context().returnColumnStats()
- ? DataTableScan.SCAN_WITH_STATS_COLUMNS
- : DataTableScan.SCAN_COLUMNS)
- .filterData(context().rowFilter())
+ .caseSensitive(isCaseSensitive())
+ .select(scanColumns())
+ .filterData(filter())
.filterManifestEntries(
manifestEntry ->
snapshotIds.contains(manifestEntry.snapshotId())
@@ -197,9 +94,8 @@ class BaseIncrementalAppendScan
manifestGroup = manifestGroup.ignoreResiduals();
}
- if (manifests.size() > 1
- && (DataTableScan.PLAN_SCANS_WITH_WORKER_POOL ||
context().planWithCustomizedExecutor())) {
- manifestGroup = manifestGroup.planWith(context().planExecutor());
+ if (manifests.size() > 1 && shouldPlanWithExecutor()) {
+ manifestGroup = manifestGroup.planWith(planExecutor());
}
return manifestGroup.planFiles();
diff --git
a/core/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java
b/core/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java
new file mode 100644
index 0000000000..885cf591a5
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import java.util.ArrayDeque;
+import java.util.Collection;
+import java.util.Deque;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.iceberg.ManifestGroup.CreateTasksFunction;
+import org.apache.iceberg.ManifestGroup.TaskContext;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.collect.FluentIterable;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.iceberg.util.TableScanUtil;
+
+class BaseIncrementalChangelogScan
+ extends BaseIncrementalScan<
+ IncrementalChangelogScan, ChangelogScanTask,
ScanTaskGroup<ChangelogScanTask>>
+ implements IncrementalChangelogScan {
+
+ BaseIncrementalChangelogScan(TableOperations ops, Table table) {
+ this(ops, table, table.schema(), new TableScanContext());
+ }
+
+ BaseIncrementalChangelogScan(
+ TableOperations ops, Table table, Schema schema, TableScanContext
context) {
+ super(ops, table, schema, context);
+ }
+
+ @Override
+ protected IncrementalChangelogScan newRefinedScan(
+ TableOperations newOps, Table newTable, Schema newSchema,
TableScanContext newContext) {
+ return new BaseIncrementalChangelogScan(newOps, newTable, newSchema,
newContext);
+ }
+
+ @Override
+ protected CloseableIterable<ChangelogScanTask> doPlanFiles(
+ Long fromSnapshotIdExclusive, long toSnapshotIdInclusive) {
+
+ Deque<Snapshot> changelogSnapshots =
+ orderedChangelogSnapshots(fromSnapshotIdExclusive,
toSnapshotIdInclusive);
+
+ if (changelogSnapshots.isEmpty()) {
+ return CloseableIterable.empty();
+ }
+
+ Set<Long> changelogSnapshotIds = toSnapshotIds(changelogSnapshots);
+
+ Set<ManifestFile> newDataManifests =
+ FluentIterable.from(changelogSnapshots)
+ .transformAndConcat(snapshot ->
snapshot.dataManifests(table().io()))
+ .filter(manifest ->
changelogSnapshotIds.contains(manifest.snapshotId()))
+ .toSet();
+
+ ManifestGroup manifestGroup =
+ new ManifestGroup(table().io(), newDataManifests, ImmutableList.of())
+ .specsById(table().specs())
+ .caseSensitive(isCaseSensitive())
+ .select(scanColumns())
+ .filterData(filter())
+ .filterManifestEntries(entry ->
changelogSnapshotIds.contains(entry.snapshotId()))
+ .ignoreExisting();
+
+ if (shouldIgnoreResiduals()) {
+ manifestGroup = manifestGroup.ignoreResiduals();
+ }
+
+ if (newDataManifests.size() > 1 && shouldPlanWithExecutor()) {
+ manifestGroup = manifestGroup.planWith(planExecutor());
+ }
+
+ return manifestGroup.plan(new
CreateDataFileChangeTasks(changelogSnapshots));
+ }
+
+ @Override
+ public CloseableIterable<ScanTaskGroup<ChangelogScanTask>> planTasks() {
+ return TableScanUtil.planTaskGroups(
+ planFiles(), targetSplitSize(), splitLookback(), splitOpenFileCost());
+ }
+
+ // builds a collection of changelog snapshots (oldest to newest)
+ // the order of the snapshots is important as it is used to determine change
ordinals
+ private Deque<Snapshot> orderedChangelogSnapshots(Long fromIdExcl, long
toIdIncl) {
+ Deque<Snapshot> changelogSnapshots = new ArrayDeque<>();
+
+ for (Snapshot snapshot : SnapshotUtil.ancestorsBetween(table(), toIdIncl,
fromIdExcl)) {
+ if (!snapshot.operation().equals(DataOperations.REPLACE)) {
+ if (snapshot.deleteManifests(table().io()).size() > 0) {
+ throw new UnsupportedOperationException(
+ "Delete files are currently not supported in changelog scans");
+ }
+
+ changelogSnapshots.addFirst(snapshot);
+ }
+ }
+
+ return changelogSnapshots;
+ }
+
+ private Set<Long> toSnapshotIds(Collection<Snapshot> snapshots) {
+ return
snapshots.stream().map(Snapshot::snapshotId).collect(Collectors.toSet());
+ }
+
+ private static Map<Long, Integer> computeSnapshotOrdinals(Deque<Snapshot>
snapshots) {
+ Map<Long, Integer> snapshotOrdinals = Maps.newHashMap();
+
+ int ordinal = 0;
+
+ for (Snapshot snapshot : snapshots) {
+ snapshotOrdinals.put(snapshot.snapshotId(), ordinal++);
+ }
+
+ return snapshotOrdinals;
+ }
+
+ private static class CreateDataFileChangeTasks implements
CreateTasksFunction<ChangelogScanTask> {
+ private static final DeleteFile[] NO_DELETES = new DeleteFile[0];
+
+ private final Map<Long, Integer> snapshotOrdinals;
+
+ CreateDataFileChangeTasks(Deque<Snapshot> snapshots) {
+ this.snapshotOrdinals = computeSnapshotOrdinals(snapshots);
+ }
+
+ @Override
+ public CloseableIterable<ChangelogScanTask> apply(
+ CloseableIterable<ManifestEntry<DataFile>> entries, TaskContext
context) {
+
+ return CloseableIterable.transform(
+ entries,
+ entry -> {
+ long commitSnapshotId = entry.snapshotId();
+ int changeOrdinal = snapshotOrdinals.get(commitSnapshotId);
+ DataFile dataFile = entry.file().copy(context.shouldKeepStats());
+
+ switch (entry.status()) {
+ case ADDED:
+ return new BaseAddedRowsScanTask(
+ changeOrdinal,
+ commitSnapshotId,
+ dataFile,
+ NO_DELETES,
+ context.schemaAsString(),
+ context.specAsString(),
+ context.residuals());
+
+ case DELETED:
+ return new BaseDeletedDataFileScanTask(
+ changeOrdinal,
+ commitSnapshotId,
+ dataFile,
+ NO_DELETES,
+ context.schemaAsString(),
+ context.specAsString(),
+ context.residuals());
+
+ default:
+ throw new IllegalArgumentException("Unexpected entry status: "
+ entry.status());
+ }
+ });
+ }
+ }
+}
diff --git a/core/src/main/java/org/apache/iceberg/BaseIncrementalScan.java
b/core/src/main/java/org/apache/iceberg/BaseIncrementalScan.java
new file mode 100644
index 0000000000..1f32bfe016
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/BaseIncrementalScan.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import org.apache.iceberg.events.IncrementalScanEvent;
+import org.apache.iceberg.events.Listeners;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.util.SnapshotUtil;
+
+abstract class BaseIncrementalScan<ThisT, T extends ScanTask, G extends
ScanTaskGroup<T>>
+ extends BaseScan<ThisT, T, G> implements IncrementalScan<ThisT, T, G> {
+
+ protected BaseIncrementalScan(
+ TableOperations ops, Table table, Schema schema, TableScanContext
context) {
+ super(ops, table, schema, context);
+ }
+
+ protected abstract CloseableIterable<T> doPlanFiles(
+ Long fromSnapshotIdExclusive, long toSnapshotIdInclusive);
+
+ @Override
+ public ThisT fromSnapshotInclusive(long fromSnapshotId) {
+ Preconditions.checkArgument(
+ table().snapshot(fromSnapshotId) != null,
+ "Cannot find the starting snapshot: %s",
+ fromSnapshotId);
+ TableScanContext newContext =
context().fromSnapshotIdInclusive(fromSnapshotId);
+ return newRefinedScan(tableOps(), table(), schema(), newContext);
+ }
+
+ @Override
+ public ThisT fromSnapshotExclusive(long fromSnapshotId) {
+ // for exclusive behavior, table().snapshot(fromSnapshotId) check can't be
applied
+ // as fromSnapshotId could be matched to a parent snapshot that is already
expired
+ TableScanContext newContext =
context().fromSnapshotIdExclusive(fromSnapshotId);
+ return newRefinedScan(tableOps(), table(), schema(), newContext);
+ }
+
+ @Override
+ public ThisT toSnapshot(long toSnapshotId) {
+ Preconditions.checkArgument(
+ table().snapshot(toSnapshotId) != null, "Cannot find the end snapshot:
%s", toSnapshotId);
+ TableScanContext newContext = context().toSnapshotId(toSnapshotId);
+ return newRefinedScan(tableOps(), table(), schema(), newContext);
+ }
+
+ @Override
+ public CloseableIterable<T> planFiles() {
+ if (scanCurrentLineage() && table().currentSnapshot() == null) {
+ // If the table is empty (no current snapshot) and both from and to
snapshots aren't set,
+ // simply return an empty iterable. In this case, the listener
notification is also skipped.
+ return CloseableIterable.empty();
+ }
+
+ long toSnapshotIdInclusive = toSnapshotIdInclusive();
+ Long fromSnapshotIdExclusive =
fromSnapshotIdExclusive(toSnapshotIdInclusive);
+
+ if (fromSnapshotIdExclusive != null) {
+ Listeners.notifyAll(
+ new IncrementalScanEvent(
+ table().name(),
+ fromSnapshotIdExclusive,
+ toSnapshotIdInclusive,
+ filter(),
+ schema(),
+ false /* from snapshot ID inclusive */));
+ } else {
+ Listeners.notifyAll(
+ new IncrementalScanEvent(
+ table().name(),
+ SnapshotUtil.oldestAncestorOf(table(),
toSnapshotIdInclusive).snapshotId(),
+ toSnapshotIdInclusive,
+ filter(),
+ schema(),
+ true /* from snapshot ID inclusive */));
+ }
+
+ return doPlanFiles(fromSnapshotIdExclusive, toSnapshotIdInclusive);
+ }
+
+ private boolean scanCurrentLineage() {
+ return context().fromSnapshotId() == null && context().toSnapshotId() ==
null;
+ }
+
+ private long toSnapshotIdInclusive() {
+ if (context().toSnapshotId() != null) {
+ return context().toSnapshotId();
+ } else {
+ Snapshot currentSnapshot = table().currentSnapshot();
+ Preconditions.checkArgument(
+ currentSnapshot != null, "End snapshot is not set and table has no
current snapshot");
+ return currentSnapshot.snapshotId();
+ }
+ }
+
+ private Long fromSnapshotIdExclusive(long toSnapshotIdInclusive) {
+ Long fromSnapshotId = context().fromSnapshotId();
+ boolean fromSnapshotInclusive = context().fromSnapshotInclusive();
+
+ if (fromSnapshotId == null) {
+ return null;
+ } else {
+ if (fromSnapshotInclusive) {
+ // validate fromSnapshotId is an ancestor of toSnapshotIdInclusive
+ Preconditions.checkArgument(
+ SnapshotUtil.isAncestorOf(table(), toSnapshotIdInclusive,
fromSnapshotId),
+ "Starting snapshot (inclusive) %s is not an ancestor of end
snapshot %s",
+ fromSnapshotId,
+ toSnapshotIdInclusive);
+ // for inclusive behavior fromSnapshotIdExclusive is set to the parent
snapshot ID,
+ // which can be null
+ return table().snapshot(fromSnapshotId).parentId();
+
+ } else {
+ // validate there is an ancestor of toSnapshotIdInclusive where parent
is fromSnapshotId
+ Preconditions.checkArgument(
+ SnapshotUtil.isParentAncestorOf(table(), toSnapshotIdInclusive,
fromSnapshotId),
+ "Starting snapshot (exclusive) %s is not a parent ancestor of end
snapshot %s",
+ fromSnapshotId,
+ toSnapshotIdInclusive);
+ return fromSnapshotId;
+ }
+ }
+ }
+}
diff --git a/core/src/main/java/org/apache/iceberg/BaseScan.java
b/core/src/main/java/org/apache/iceberg/BaseScan.java
index bdde1f680f..65501f4a8a 100644
--- a/core/src/main/java/org/apache/iceberg/BaseScan.java
+++ b/core/src/main/java/org/apache/iceberg/BaseScan.java
@@ -20,17 +20,48 @@ package org.apache.iceberg;
import java.util.Collection;
import java.util.Collections;
+import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import org.apache.iceberg.expressions.Binder;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.util.PropertyUtil;
abstract class BaseScan<ThisT, T extends ScanTask, G extends ScanTaskGroup<T>>
implements Scan<ThisT, T, G> {
+
+ private static final List<String> SCAN_COLUMNS =
+ ImmutableList.of(
+ "snapshot_id",
+ "file_path",
+ "file_ordinal",
+ "file_format",
+ "block_size_in_bytes",
+ "file_size_in_bytes",
+ "record_count",
+ "partition",
+ "key_metadata",
+ "split_offsets");
+
+ private static final List<String> STATS_COLUMNS =
+ ImmutableList.of(
+ "value_counts",
+ "null_value_counts",
+ "nan_value_counts",
+ "lower_bounds",
+ "upper_bounds",
+ "column_sizes");
+
+ private static final List<String> SCAN_WITH_STATS_COLUMNS =
+
ImmutableList.<String>builder().addAll(SCAN_COLUMNS).addAll(STATS_COLUMNS).build();
+
+ private static final boolean PLAN_SCANS_WITH_WORKER_POOL =
+ SystemProperties.getBoolean(SystemProperties.SCAN_THREAD_POOL_ENABLED,
true);
+
private final TableOperations ops;
private final Table table;
private final Schema schema;
@@ -59,6 +90,22 @@ abstract class BaseScan<ThisT, T extends ScanTask, G extends
ScanTaskGroup<T>>
return context;
}
+ protected List<String> scanColumns() {
+ return context.returnColumnStats() ? SCAN_WITH_STATS_COLUMNS :
SCAN_COLUMNS;
+ }
+
+ protected boolean shouldIgnoreResiduals() {
+ return context().ignoreResiduals();
+ }
+
+ protected boolean shouldPlanWithExecutor() {
+ return PLAN_SCANS_WITH_WORKER_POOL ||
context().planWithCustomizedExecutor();
+ }
+
+ protected ExecutorService planExecutor() {
+ return context().planExecutor();
+ }
+
protected abstract ThisT newRefinedScan(
TableOperations newOps, Table newTable, Schema newSchema,
TableScanContext newContext);
@@ -77,6 +124,11 @@ abstract class BaseScan<ThisT, T extends ScanTask, G
extends ScanTaskGroup<T>>
return newRefinedScan(ops, table, schema,
context.setCaseSensitive(caseSensitive));
}
+ @Override
+ public boolean isCaseSensitive() {
+ return context().caseSensitive();
+ }
+
@Override
public ThisT includeColumnStats() {
return newRefinedScan(ops, table, schema,
context.shouldReturnColumnStats(true));
@@ -93,6 +145,11 @@ abstract class BaseScan<ThisT, T extends ScanTask, G
extends ScanTaskGroup<T>>
ops, table, schema,
context.filterRows(Expressions.and(context.rowFilter(), expr)));
}
+ @Override
+ public Expression filter() {
+ return context().rowFilter();
+ }
+
@Override
public ThisT ignoreResiduals() {
return newRefinedScan(ops, table, schema, context.ignoreResiduals(true));
diff --git a/core/src/main/java/org/apache/iceberg/BaseTable.java
b/core/src/main/java/org/apache/iceberg/BaseTable.java
index 9605b07d8d..d79c46050f 100644
--- a/core/src/main/java/org/apache/iceberg/BaseTable.java
+++ b/core/src/main/java/org/apache/iceberg/BaseTable.java
@@ -79,6 +79,11 @@ public class BaseTable implements Table, HasTableOperations,
Serializable {
ops, this, schema(), new TableScanContext().reportWith(scanReporter));
}
+ @Override
+ public IncrementalChangelogScan newIncrementalChangelogScan() {
+ return new BaseIncrementalChangelogScan(ops, this);
+ }
+
@Override
public Schema schema() {
return ops.current().schema();
diff --git a/core/src/main/java/org/apache/iceberg/BaseTableScan.java
b/core/src/main/java/org/apache/iceberg/BaseTableScan.java
index 5f48786f5d..2a33fea1e2 100644
--- a/core/src/main/java/org/apache/iceberg/BaseTableScan.java
+++ b/core/src/main/java/org/apache/iceberg/BaseTableScan.java
@@ -19,10 +19,8 @@
package org.apache.iceberg;
import java.util.Map;
-import java.util.concurrent.ExecutorService;
import org.apache.iceberg.events.Listeners;
import org.apache.iceberg.events.ScanEvent;
-import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.ExpressionUtil;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.metrics.DefaultMetricsContext;
@@ -55,18 +53,6 @@ abstract class BaseTableScan extends BaseScan<TableScan,
FileScanTask, CombinedS
return context().snapshotId();
}
- protected boolean colStats() {
- return context().returnColumnStats();
- }
-
- protected boolean shouldIgnoreResiduals() {
- return context().ignoreResiduals();
- }
-
- protected ExecutorService planExecutor() {
- return context().planExecutor();
- }
-
protected Map<String, String> options() {
return context().options();
}
@@ -116,11 +102,6 @@ abstract class BaseTableScan extends BaseScan<TableScan,
FileScanTask, CombinedS
return useSnapshot(SnapshotUtil.snapshotIdAsOfTime(table(),
timestampMillis));
}
- @Override
- public Expression filter() {
- return context().rowFilter();
- }
-
@Override
public CloseableIterable<FileScanTask> planFiles() {
Snapshot snapshot = snapshot();
@@ -171,11 +152,6 @@ abstract class BaseTableScan extends BaseScan<TableScan,
FileScanTask, CombinedS
: tableOps().current().currentSnapshot();
}
- @Override
- public boolean isCaseSensitive() {
- return context().caseSensitive();
- }
-
@Override
public String toString() {
return MoreObjects.toStringHelper(this)
diff --git a/core/src/main/java/org/apache/iceberg/DataTableScan.java
b/core/src/main/java/org/apache/iceberg/DataTableScan.java
index 678dd8884a..2d125ae79b 100644
--- a/core/src/main/java/org/apache/iceberg/DataTableScan.java
+++ b/core/src/main/java/org/apache/iceberg/DataTableScan.java
@@ -22,35 +22,9 @@ import java.util.List;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
-import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.util.SnapshotUtil;
public class DataTableScan extends BaseTableScan {
- static final ImmutableList<String> SCAN_COLUMNS =
- ImmutableList.of(
- "snapshot_id",
- "file_path",
- "file_ordinal",
- "file_format",
- "block_size_in_bytes",
- "file_size_in_bytes",
- "record_count",
- "partition",
- "key_metadata",
- "split_offsets");
- static final ImmutableList<String> SCAN_WITH_STATS_COLUMNS =
- ImmutableList.<String>builder()
- .addAll(SCAN_COLUMNS)
- .add(
- "value_counts",
- "null_value_counts",
- "nan_value_counts",
- "lower_bounds",
- "upper_bounds",
- "column_sizes")
- .build();
- static final boolean PLAN_SCANS_WITH_WORKER_POOL =
- SystemProperties.getBoolean(SystemProperties.SCAN_THREAD_POOL_ENABLED,
true);
public DataTableScan(TableOperations ops, Table table) {
super(ops, table, table.schema());
@@ -112,7 +86,7 @@ public class DataTableScan extends BaseTableScan {
ManifestGroup manifestGroup =
new ManifestGroup(io, dataManifests, deleteManifests)
.caseSensitive(isCaseSensitive())
- .select(colStats() ? SCAN_WITH_STATS_COLUMNS : SCAN_COLUMNS)
+ .select(scanColumns())
.filterData(filter())
.specsById(table().specs())
.scanMetrics(scanMetrics())
@@ -122,8 +96,7 @@ public class DataTableScan extends BaseTableScan {
manifestGroup = manifestGroup.ignoreResiduals();
}
- if (dataManifests.size() > 1
- && (PLAN_SCANS_WITH_WORKER_POOL ||
context().planWithCustomizedExecutor())) {
+ if (dataManifests.size() > 1 && shouldPlanWithExecutor()) {
manifestGroup = manifestGroup.planWith(planExecutor());
}
diff --git
a/core/src/main/java/org/apache/iceberg/IncrementalDataTableScan.java
b/core/src/main/java/org/apache/iceberg/IncrementalDataTableScan.java
index 66bb42b0b4..270dfcf595 100644
--- a/core/src/main/java/org/apache/iceberg/IncrementalDataTableScan.java
+++ b/core/src/main/java/org/apache/iceberg/IncrementalDataTableScan.java
@@ -90,7 +90,7 @@ class IncrementalDataTableScan extends DataTableScan {
ManifestGroup manifestGroup =
new ManifestGroup(table().io(), manifests)
.caseSensitive(isCaseSensitive())
- .select(colStats() ? SCAN_WITH_STATS_COLUMNS : SCAN_COLUMNS)
+ .select(scanColumns())
.filterData(filter())
.filterManifestEntries(
manifestEntry ->
@@ -107,8 +107,7 @@ class IncrementalDataTableScan extends DataTableScan {
new IncrementalScanEvent(
table().name(), fromSnapshotId, toSnapshotId, filter(), schema(), false));
- if (manifests.size() > 1
- && (PLAN_SCANS_WITH_WORKER_POOL || context().planWithCustomizedExecutor())) {
+ if (manifests.size() > 1 && shouldPlanWithExecutor()) {
manifestGroup = manifestGroup.planWith(planExecutor());
}
diff --git a/core/src/main/java/org/apache/iceberg/PartitionsTable.java b/core/src/main/java/org/apache/iceberg/PartitionsTable.java
index 3723a54bc9..2a9d111c66 100644
--- a/core/src/main/java/org/apache/iceberg/PartitionsTable.java
+++ b/core/src/main/java/org/apache/iceberg/PartitionsTable.java
@@ -176,10 +176,7 @@ public class PartitionsTable extends BaseMetadataTable {
new ManifestGroup(io, snapshot.dataManifests(io), snapshot.deleteManifests(io))
.caseSensitive(caseSensitive)
.filterManifests(m -> evalCache.get(m.partitionSpecId()).eval(m))
- .select(
- scan.colStats()
- ? DataTableScan.SCAN_WITH_STATS_COLUMNS
- : DataTableScan.SCAN_COLUMNS)
+ .select(scan.scanColumns())
.specsById(scan.table().specs())
.ignoreDeleted();
diff --git a/core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java b/core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java
index 0c33ea878f..93880f97cb 100644
--- a/core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java
+++ b/core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java
@@ -118,6 +118,10 @@ public class SnapshotUtil {
return lastSnapshot;
}
+ public static Snapshot oldestAncestorOf(Table table, long snapshotId) {
+ return oldestAncestorOf(snapshotId, table::snapshot);
+ }
+
/**
* Traverses the history and finds the oldest ancestor of the specified snapshot.
*
@@ -199,6 +203,11 @@ public class SnapshotUtil {
return toIds(ancestorsBetween(latestSnapshotId, oldestSnapshotId, lookup));
}
+ public static Iterable<Snapshot> ancestorsBetween(
+ Table table, long latestSnapshotId, Long oldestSnapshotId) {
+ return ancestorsBetween(latestSnapshotId, oldestSnapshotId, table::snapshot);
+ }
+
public static Iterable<Snapshot> ancestorsBetween(
long latestSnapshotId, Long oldestSnapshotId, Function<Long, Snapshot> lookup) {
if (oldestSnapshotId != null) {
diff --git a/core/src/test/java/org/apache/iceberg/ScanTestBase.java b/core/src/test/java/org/apache/iceberg/ScanTestBase.java
index 2b68b6b0ec..f33893d816 100644
--- a/core/src/test/java/org/apache/iceberg/ScanTestBase.java
+++ b/core/src/test/java/org/apache/iceberg/ScanTestBase.java
@@ -26,6 +26,7 @@ import java.io.IOException;
import java.util.Arrays;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
@@ -37,8 +38,10 @@ import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@RunWith(Parameterized.class)
-public abstract class ScanTestBase<T extends Scan<T, FileScanTask, CombinedScanTask>>
+public abstract class ScanTestBase<
+ ScanT extends Scan<ScanT, T, G>, T extends ScanTask, G extends ScanTaskGroup<T>>
extends TableTestBase {
+
@Parameterized.Parameters(name = "formatVersion = {0}")
public static Object[] parameters() {
return new Object[] {1, 2};
@@ -48,11 +51,11 @@ public abstract class ScanTestBase<T extends Scan<T, FileScanTask, CombinedScanT
super(formatVersion);
}
- protected abstract T newScan();
+ protected abstract ScanT newScan();
@Test
public void testTableScanHonorsSelect() {
- T scan = newScan().select(Arrays.asList("id"));
+ ScanT scan = newScan().select(Arrays.asList("id"));
Schema expectedSchema = new Schema(required(1, "id", Types.IntegerType.get()));
@@ -76,9 +79,9 @@ public abstract class ScanTestBase<T extends Scan<T, FileScanTask, CombinedScanT
@Test
public void testTableScanHonorsSelectWithoutCaseSensitivity() {
- T scan1 = newScan().caseSensitive(false).select(Arrays.asList("ID"));
+ ScanT scan1 = newScan().caseSensitive(false).select(Arrays.asList("ID"));
// order of refinements shouldn't matter
- T scan2 = newScan().select(Arrays.asList("ID")).caseSensitive(false);
+ ScanT scan2 = newScan().select(Arrays.asList("ID")).caseSensitive(false);
Schema expectedSchema = new Schema(required(1, "id", Types.IntegerType.get()));
@@ -97,26 +100,26 @@ public abstract class ScanTestBase<T extends Scan<T, FileScanTask, CombinedScanT
public void testTableScanHonorsIgnoreResiduals() throws IOException {
table.newFastAppend().appendFile(FILE_A).appendFile(FILE_B).commit();
- T scan1 = newScan().filter(Expressions.equal("id", 5));
+ ScanT scan1 = newScan().filter(Expressions.equal("id", 5));
- try (CloseableIterable<CombinedScanTask> tasks = scan1.planTasks()) {
- Assert.assertTrue("Tasks should not be empty", Iterables.size(tasks) > 0);
- for (CombinedScanTask combinedScanTask : tasks) {
- for (FileScanTask fileScanTask : combinedScanTask.files()) {
- Assert.assertNotEquals(
- "Residuals must be preserved", Expressions.alwaysTrue(), fileScanTask.residual());
+ try (CloseableIterable<G> groups = scan1.planTasks()) {
+ Assert.assertTrue("Tasks should not be empty", Iterables.size(groups) > 0);
+ for (G group : groups) {
+ for (T task : group.tasks()) {
+ Expression residual = ((ContentScanTask<?>) task).residual();
+ Assert.assertNotEquals("Residuals must be preserved", Expressions.alwaysTrue(), residual);
}
}
}
- T scan2 = newScan().filter(Expressions.equal("id", 5)).ignoreResiduals();
+ ScanT scan2 = newScan().filter(Expressions.equal("id", 5)).ignoreResiduals();
- try (CloseableIterable<CombinedScanTask> tasks = scan2.planTasks()) {
- Assert.assertTrue("Tasks should not be empty", Iterables.size(tasks) > 0);
- for (CombinedScanTask combinedScanTask : tasks) {
- for (FileScanTask fileScanTask : combinedScanTask.files()) {
- Assert.assertEquals(
- "Residuals must be ignored", Expressions.alwaysTrue(), fileScanTask.residual());
+ try (CloseableIterable<G> groups = scan2.planTasks()) {
+ Assert.assertTrue("Tasks should not be empty", Iterables.size(groups) > 0);
+ for (G group : groups) {
+ for (T task : group.tasks()) {
+ Expression residual = ((ContentScanTask<?>) task).residual();
+ Assert.assertEquals("Residuals must be ignored", Expressions.alwaysTrue(), residual);
}
}
}
@@ -128,7 +131,7 @@ public abstract class ScanTestBase<T extends Scan<T, FileScanTask, CombinedScanT
table.newFastAppend().appendFile(FILE_B).commit();
AtomicInteger planThreadsIndex = new AtomicInteger(0);
- T scan =
+ ScanT scan =
newScan()
.planWith(
Executors.newFixedThreadPool(
diff --git a/core/src/test/java/org/apache/iceberg/TableTestBase.java b/core/src/test/java/org/apache/iceberg/TableTestBase.java
index 44b6dd8395..53516c980f 100644
--- a/core/src/test/java/org/apache/iceberg/TableTestBase.java
+++ b/core/src/test/java/org/apache/iceberg/TableTestBase.java
@@ -22,6 +22,9 @@ import static org.apache.iceberg.types.Types.NestedField.required;
import java.io.File;
import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
@@ -518,6 +521,31 @@ public class TableTestBase {
return positionDelete.set(path, pos, row);
}
+ protected void withUnavailableLocations(Iterable<String> locations, Action action) {
+ for (String location : locations) {
+ move(location, location + "_temp");
+ }
+
+ try {
+ action.invoke();
+ } finally {
+ for (String location : locations) {
+ move(location + "_temp", location);
+ }
+ }
+ }
+
+ private void move(String location, String newLocation) {
+ Path path = Paths.get(location);
+ Path tempPath = Paths.get(newLocation);
+
+ try {
+ java.nio.file.Files.move(path, tempPath);
+ } catch (IOException e) {
+ throw new UncheckedIOException("Failed to move: " + location, e);
+ }
+ }
+
static void validateManifestEntries(
ManifestFile manifest,
Iterator<Long> ids,
@@ -586,4 +614,9 @@ public class TableTestBase {
}
}
}
+
+ @FunctionalInterface
+ protected interface Action {
+ void invoke();
+ }
}
diff --git a/core/src/test/java/org/apache/iceberg/TestBaseIncrementalAppendScan.java b/core/src/test/java/org/apache/iceberg/TestBaseIncrementalAppendScan.java
index b22e03ef0b..00feaf80ab 100644
--- a/core/src/test/java/org/apache/iceberg/TestBaseIncrementalAppendScan.java
+++ b/core/src/test/java/org/apache/iceberg/TestBaseIncrementalAppendScan.java
@@ -22,7 +22,8 @@ import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.junit.Assert;
import org.junit.Test;
-public class TestBaseIncrementalAppendScan extends ScanTestBase<IncrementalAppendScan> {
+public class TestBaseIncrementalAppendScan
+ extends ScanTestBase<IncrementalAppendScan, FileScanTask, CombinedScanTask> {
public TestBaseIncrementalAppendScan(int formatVersion) {
super(formatVersion);
}
diff --git a/core/src/test/java/org/apache/iceberg/TestBaseIncrementalChangelogScan.java b/core/src/test/java/org/apache/iceberg/TestBaseIncrementalChangelogScan.java
new file mode 100644
index 0000000000..1a1844345b
--- /dev/null
+++ b/core/src/test/java/org/apache/iceberg/TestBaseIncrementalChangelogScan.java
@@ -0,0 +1,290 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import static org.apache.iceberg.TableProperties.MANIFEST_MERGE_ENABLED;
+import static org.apache.iceberg.TableProperties.MANIFEST_MIN_MERGE_COUNT;
+
+import java.io.IOException;
+import java.util.Comparator;
+import java.util.List;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.collect.ComparisonChain;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Test;
+
+public class TestBaseIncrementalChangelogScan
+ extends ScanTestBase<
+ IncrementalChangelogScan, ChangelogScanTask, ScanTaskGroup<ChangelogScanTask>> {
+
+ public TestBaseIncrementalChangelogScan(int formatVersion) {
+ super(formatVersion);
+ }
+
+ @Override
+ protected IncrementalChangelogScan newScan() {
+ return table.newIncrementalChangelogScan();
+ }
+
+ @Test
+ public void testDataFilters() {
+ table.newFastAppend().appendFile(FILE_A).commit();
+
+ Snapshot snap1 = table.currentSnapshot();
+ ManifestFile snap1DataManifest = Iterables.getOnlyElement(snap1.dataManifests(table.io()));
+
+ table.newFastAppend().appendFile(FILE_B).commit();
+
+ Snapshot snap2 = table.currentSnapshot();
+
+ Assert.assertEquals("Must be 2 data manifests", 2, snap2.dataManifests(table.io()).size());
+
+ withUnavailableLocations(
+ ImmutableList.of(snap1DataManifest.path()),
+ () -> {
+ // bucket(k, 16) is 1 which is supposed to match only FILE_B
+ IncrementalChangelogScan scan = newScan().filter(Expressions.equal("data", "k"));
+
+ List<ChangelogScanTask> tasks = plan(scan);
+
+ Assert.assertEquals("Must have 1 task", 1, tasks.size());
+
+ AddedRowsScanTask t1 = (AddedRowsScanTask) Iterables.getOnlyElement(tasks);
+ Assert.assertEquals("Ordinal must match", 1, t1.changeOrdinal());
+ Assert.assertEquals("Snapshot must match", snap2.snapshotId(), t1.commitSnapshotId());
+ Assert.assertEquals("Data file must match", FILE_B.path(), t1.file().path());
+ Assert.assertTrue("Must be no deletes", t1.deletes().isEmpty());
+ });
+ }
+
+ @Test
+ public void testOverwrites() {
+ table.newFastAppend().appendFile(FILE_A).appendFile(FILE_B).commit();
+
+ Snapshot snap1 = table.currentSnapshot();
+
+ table.newOverwrite().addFile(FILE_A2).deleteFile(FILE_A).commit();
+
+ Snapshot snap2 = table.currentSnapshot();
+
+ IncrementalChangelogScan scan =
+ newScan().fromSnapshotExclusive(snap1.snapshotId()).toSnapshot(snap2.snapshotId());
+
+ List<ChangelogScanTask> tasks = plan(scan);
+
+ Assert.assertEquals("Must have 2 tasks", 2, tasks.size());
+
+ AddedRowsScanTask t1 = (AddedRowsScanTask) tasks.get(0);
+ Assert.assertEquals("Ordinal must match", 0, t1.changeOrdinal());
+ Assert.assertEquals("Snapshot must match", snap2.snapshotId(), t1.commitSnapshotId());
+ Assert.assertEquals("Data file must match", FILE_A2.path(), t1.file().path());
+ Assert.assertTrue("Must be no deletes", t1.deletes().isEmpty());
+
+ DeletedDataFileScanTask t2 = (DeletedDataFileScanTask) tasks.get(1);
+ Assert.assertEquals("Ordinal must match", 0, t2.changeOrdinal());
+ Assert.assertEquals("Snapshot must match", snap2.snapshotId(), t2.commitSnapshotId());
+ Assert.assertEquals("Data file must match", FILE_A.path(), t2.file().path());
+ Assert.assertTrue("Must be no deletes", t2.existingDeletes().isEmpty());
+ }
+
+ @Test
+ public void testFileDeletes() {
+ table.newFastAppend().appendFile(FILE_A).appendFile(FILE_B).commit();
+
+ Snapshot snap1 = table.currentSnapshot();
+
+ table.newDelete().deleteFile(FILE_A).commit();
+
+ Snapshot snap2 = table.currentSnapshot();
+
+ IncrementalChangelogScan scan =
+ newScan().fromSnapshotExclusive(snap1.snapshotId()).toSnapshot(snap2.snapshotId());
+
+ List<ChangelogScanTask> tasks = plan(scan);
+
+ Assert.assertEquals("Must have 1 tasks", 1, tasks.size());
+
+ DeletedDataFileScanTask t1 = (DeletedDataFileScanTask) Iterables.getOnlyElement(tasks);
+ Assert.assertEquals("Ordinal must match", 0, t1.changeOrdinal());
+ Assert.assertEquals("Snapshot must match", snap2.snapshotId(), t1.commitSnapshotId());
+ Assert.assertEquals("Data file must match", FILE_A.path(), t1.file().path());
+ Assert.assertTrue("Must be no deletes", t1.existingDeletes().isEmpty());
+ }
+
+ @Test
+ public void testExistingEntriesInNewDataManifestsAreIgnored() {
+ table
+ .updateProperties()
+ .set(MANIFEST_MIN_MERGE_COUNT, "1")
+ .set(MANIFEST_MERGE_ENABLED, "true")
+ .commit();
+
+ table.newAppend().appendFile(FILE_A).commit();
+
+ table.newAppend().appendFile(FILE_B).commit();
+
+ table.newAppend().appendFile(FILE_C).commit();
+
+ Snapshot snap3 = table.currentSnapshot();
+
+ ManifestFile manifest = Iterables.getOnlyElement(snap3.dataManifests(table.io()));
+ Assert.assertTrue("Manifest must have existing files", manifest.hasExistingFiles());
+
+ IncrementalChangelogScan scan =
+ newScan().fromSnapshotInclusive(snap3.snapshotId()).toSnapshot(snap3.snapshotId());
+
+ List<ChangelogScanTask> tasks = plan(scan);
+
+ Assert.assertEquals("Must have 1 task", 1, tasks.size());
+
+ AddedRowsScanTask t1 = (AddedRowsScanTask) Iterables.getOnlyElement(tasks);
+ Assert.assertEquals("Ordinal must match", 0, t1.changeOrdinal());
+ Assert.assertEquals("Snapshot must match", snap3.snapshotId(), t1.commitSnapshotId());
+ Assert.assertEquals("Data file must match", FILE_C.path(), t1.file().path());
+ Assert.assertTrue("Must be no deletes", t1.deletes().isEmpty());
+ }
+
+ @Test
+ public void testManifestRewritesAreIgnored() throws IOException {
+ table.newAppend().appendFile(FILE_A).commit();
+
+ Snapshot snap1 = table.currentSnapshot();
+
+ table.newAppend().appendFile(FILE_B).commit();
+
+ Snapshot snap2 = table.currentSnapshot();
+
+ ManifestFile newManifest =
+ writeManifest(
+ "manifest-file.avro",
+ manifestEntry(ManifestEntry.Status.EXISTING, snap1.snapshotId(), FILE_A),
+ manifestEntry(ManifestEntry.Status.EXISTING, snap2.snapshotId(), FILE_B));
+
+ RewriteManifests rewriteManifests = table.rewriteManifests();
+
+ for (ManifestFile manifest : snap2.dataManifests(table.io())) {
+ rewriteManifests.deleteManifest(manifest);
+ }
+
+ rewriteManifests.addManifest(newManifest);
+
+ rewriteManifests.commit();
+
+ table.newAppend().appendFile(FILE_C).commit();
+
+ Snapshot snap4 = table.currentSnapshot();
+
+ List<ChangelogScanTask> tasks = plan(newScan());
+
+ Assert.assertEquals("Must have 3 tasks", 3, tasks.size());
+
+ AddedRowsScanTask t1 = (AddedRowsScanTask) tasks.get(0);
+ Assert.assertEquals("Ordinal must match", 0, t1.changeOrdinal());
+ Assert.assertEquals("Snapshot must match", snap1.snapshotId(), t1.commitSnapshotId());
+ Assert.assertEquals("Data file must match", FILE_A.path(), t1.file().path());
+ Assert.assertTrue("Must be no deletes", t1.deletes().isEmpty());
+
+ AddedRowsScanTask t2 = (AddedRowsScanTask) tasks.get(1);
+ Assert.assertEquals("Ordinal must match", 1, t2.changeOrdinal());
+ Assert.assertEquals("Snapshot must match", snap2.snapshotId(), t2.commitSnapshotId());
+ Assert.assertEquals("Data file must match", FILE_B.path(), t2.file().path());
+ Assert.assertTrue("Must be no deletes", t2.deletes().isEmpty());
+
+ AddedRowsScanTask t3 = (AddedRowsScanTask) tasks.get(2);
+ Assert.assertEquals("Ordinal must match", 2, t3.changeOrdinal());
+ Assert.assertEquals("Snapshot must match", snap4.snapshotId(), t3.commitSnapshotId());
+ Assert.assertEquals("Data file must match", FILE_C.path(), t3.file().path());
+ Assert.assertTrue("Must be no deletes", t3.deletes().isEmpty());
+ }
+
+ @Test
+ public void testDataFileRewrites() {
+ table.newAppend().appendFile(FILE_A).commit();
+
+ Snapshot snap1 = table.currentSnapshot();
+
+ table.newAppend().appendFile(FILE_B).commit();
+
+ Snapshot snap2 = table.currentSnapshot();
+
+ table.newRewrite().rewriteFiles(ImmutableSet.of(FILE_A), ImmutableSet.of(FILE_A2)).commit();
+
+ List<ChangelogScanTask> tasks = plan(newScan());
+
+ Assert.assertEquals("Must have 2 tasks", 2, tasks.size());
+
+ AddedRowsScanTask t1 = (AddedRowsScanTask) tasks.get(0);
+ Assert.assertEquals("Ordinal must match", 0, t1.changeOrdinal());
+ Assert.assertEquals("Snapshot must match", snap1.snapshotId(), t1.commitSnapshotId());
+ Assert.assertEquals("Data file must match", FILE_A.path(), t1.file().path());
+ Assert.assertTrue("Must be no deletes", t1.deletes().isEmpty());
+
+ AddedRowsScanTask t2 = (AddedRowsScanTask) tasks.get(1);
+ Assert.assertEquals("Ordinal must match", 1, t2.changeOrdinal());
+ Assert.assertEquals("Snapshot must match", snap2.snapshotId(), t2.commitSnapshotId());
+ Assert.assertEquals("Data file must match", FILE_B.path(), t2.file().path());
+ Assert.assertTrue("Must be no deletes", t2.deletes().isEmpty());
+ }
+
+ @Test
+ public void testDeleteFilesAreNotSupported() {
+ Assume.assumeTrue(formatVersion == 2);
+
+ table.newFastAppend().appendFile(FILE_A2).appendFile(FILE_B).commit();
+
+ table.newRowDelta().addDeletes(FILE_A2_DELETES).commit();
+
+ AssertHelpers.assertThrows(
+ "Should complain about delete files",
+ UnsupportedOperationException.class,
+ "Delete files are currently not supported",
+ () -> plan(newScan()));
+ }
+
+ // plans tasks and reorders them to have deterministic order
+ private List<ChangelogScanTask> plan(IncrementalChangelogScan scan) {
+ try (CloseableIterable<ChangelogScanTask> tasks = scan.planFiles()) {
+ List<ChangelogScanTask> tasksAsList = Lists.newArrayList(tasks);
+ tasksAsList.sort(taskComparator());
+ return tasksAsList;
+
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private Comparator<? super ChangelogScanTask> taskComparator() {
+ return (t1, t2) ->
+ ComparisonChain.start()
+ .compare(t1.changeOrdinal(), t2.changeOrdinal())
+ .compare(t1.getClass().getName(), t2.getClass().getName())
+ .compare(path(t1), path(t2))
+ .result();
+ }
+
+ private String path(ChangelogScanTask task) {
+ return ((ContentScanTask<?>) task).file().path().toString();
+ }
+}
diff --git a/core/src/test/java/org/apache/iceberg/TestDataTableScan.java b/core/src/test/java/org/apache/iceberg/TestDataTableScan.java
index a5e9f6f9ec..9f3946984c 100644
--- a/core/src/test/java/org/apache/iceberg/TestDataTableScan.java
+++ b/core/src/test/java/org/apache/iceberg/TestDataTableScan.java
@@ -18,7 +18,7 @@
*/
package org.apache.iceberg;
-public class TestDataTableScan extends ScanTestBase<TableScan> {
+public class TestDataTableScan extends ScanTestBase<TableScan, FileScanTask, CombinedScanTask> {
public TestDataTableScan(int formatVersion) {
super(formatVersion);
}