This is an automated email from the ASF dual-hosted git repository.
aokolnychyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 1e52c81 Spark: Add SparkMergeScan (#1782)
1e52c81 is described below
commit 1e52c817d05add2f3106e6e6767bb06f566b1c04
Author: Anton Okolnychyi <[email protected]>
AuthorDate: Fri Nov 20 14:37:57 2020 -0800
Spark: Add SparkMergeScan (#1782)
---
.../iceberg/spark/source/SparkBatchQueryScan.java | 167 +++++++++++++++++++
.../iceberg/spark/source/SparkBatchScan.java | 153 +++---------------
.../iceberg/spark/source/SparkMergeScan.java | 179 +++++++++++++++++++++
.../iceberg/spark/source/SparkScanBuilder.java | 16 +-
4 files changed, 386 insertions(+), 129 deletions(-)
diff --git
a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java
b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java
new file mode 100644
index 0000000..d6e0a3f
--- /dev/null
+++
b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Objects;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.TableScan;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.Spark3Util;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+
+class SparkBatchQueryScan extends SparkBatchScan {
+
+ private final Long snapshotId;
+ private final Long startSnapshotId;
+ private final Long endSnapshotId;
+ private final Long asOfTimestamp;
+ private final Long splitSize;
+ private final Integer splitLookback;
+ private final Long splitOpenFileCost;
+
+ private List<CombinedScanTask> tasks = null; // lazy cache of tasks
+
+ SparkBatchQueryScan(Table table, Broadcast<FileIO> io,
Broadcast<EncryptionManager> encryption,
+ boolean caseSensitive, Schema expectedSchema,
List<Expression> filters,
+ CaseInsensitiveStringMap options) {
+
+ super(table, io, encryption, caseSensitive, expectedSchema, filters,
options);
+
+ this.snapshotId = Spark3Util.propertyAsLong(options, "snapshot-id", null);
+ this.asOfTimestamp = Spark3Util.propertyAsLong(options, "as-of-timestamp",
null);
+
+ if (snapshotId != null && asOfTimestamp != null) {
+ throw new IllegalArgumentException(
+ "Cannot scan using both snapshot-id and as-of-timestamp to select
the table snapshot");
+ }
+
+ this.startSnapshotId = Spark3Util.propertyAsLong(options,
"start-snapshot-id", null);
+ this.endSnapshotId = Spark3Util.propertyAsLong(options, "end-snapshot-id",
null);
+ if (snapshotId != null || asOfTimestamp != null) {
+ if (startSnapshotId != null || endSnapshotId != null) {
+ throw new IllegalArgumentException(
+ "Cannot specify start-snapshot-id and end-snapshot-id to do
incremental scan when either snapshot-id or " +
+ "as-of-timestamp is specified");
+ }
+ } else if (startSnapshotId == null && endSnapshotId != null) {
+ throw new IllegalArgumentException("Cannot only specify option
end-snapshot-id to do incremental scan");
+ }
+
+ // look for split behavior overrides in options
+ this.splitSize = Spark3Util.propertyAsLong(options, "split-size", null);
+ this.splitLookback = Spark3Util.propertyAsInt(options, "lookback", null);
+ this.splitOpenFileCost = Spark3Util.propertyAsLong(options,
"file-open-cost", null);
+ }
+
+ @Override
+ protected List<CombinedScanTask> tasks() {
+ if (tasks == null) {
+ TableScan scan = table()
+ .newScan()
+ .caseSensitive(caseSensitive())
+ .project(expectedSchema());
+
+ if (snapshotId != null) {
+ scan = scan.useSnapshot(snapshotId);
+ }
+
+ if (asOfTimestamp != null) {
+ scan = scan.asOfTime(asOfTimestamp);
+ }
+
+ if (startSnapshotId != null) {
+ if (endSnapshotId != null) {
+ scan = scan.appendsBetween(startSnapshotId, endSnapshotId);
+ } else {
+ scan = scan.appendsAfter(startSnapshotId);
+ }
+ }
+
+ if (splitSize != null) {
+ scan = scan.option(TableProperties.SPLIT_SIZE, splitSize.toString());
+ }
+
+ if (splitLookback != null) {
+ scan = scan.option(TableProperties.SPLIT_LOOKBACK,
splitLookback.toString());
+ }
+
+ if (splitOpenFileCost != null) {
+ scan = scan.option(TableProperties.SPLIT_OPEN_FILE_COST,
splitOpenFileCost.toString());
+ }
+
+ for (Expression filter : filterExpressions()) {
+ scan = scan.filter(filter);
+ }
+
+ try (CloseableIterable<CombinedScanTask> tasksIterable =
scan.planTasks()) {
+ this.tasks = Lists.newArrayList(tasksIterable);
+ } catch (IOException e) {
+ throw new RuntimeIOException(e, "Failed to close table scan: %s",
scan);
+ }
+ }
+
+ return tasks;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ SparkBatchQueryScan that = (SparkBatchQueryScan) o;
+ return table().name().equals(that.table().name()) &&
+ readSchema().equals(that.readSchema()) && // compare Spark schemas to
ignore field ids
+
filterExpressions().toString().equals(that.filterExpressions().toString()) &&
+ Objects.equals(snapshotId, that.snapshotId) &&
+ Objects.equals(startSnapshotId, that.startSnapshotId) &&
+ Objects.equals(endSnapshotId, that.endSnapshotId) &&
+ Objects.equals(asOfTimestamp, that.asOfTimestamp);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(
+ table().name(), readSchema(), filterExpressions().toString(),
snapshotId, startSnapshotId, endSnapshotId,
+ asOfTimestamp);
+ }
+
+ @Override
+ public String toString() {
+ return String.format(
+ "IcebergScan(table=%s, type=%s, filters=%s, caseSensitive=%s)",
+ table(), expectedSchema().asStruct(), filterExpressions(),
caseSensitive());
+ }
+}
diff --git
a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkBatchScan.java
b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkBatchScan.java
index 13b85ad..a6265e1 100644
--- a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkBatchScan.java
+++ b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkBatchScan.java
@@ -19,11 +19,10 @@
package org.apache.iceberg.spark.source;
-import java.io.IOException;
import java.io.Serializable;
import java.util.Collection;
+import java.util.Collections;
import java.util.List;
-import java.util.Objects;
import java.util.stream.Collectors;
import org.apache.iceberg.CombinedScanTask;
import org.apache.iceberg.FileFormat;
@@ -33,17 +32,13 @@ import org.apache.iceberg.SchemaParser;
import org.apache.iceberg.SnapshotSummary;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
-import org.apache.iceberg.TableScan;
import org.apache.iceberg.encryption.EncryptionManager;
-import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.hadoop.HadoopInputFile;
import org.apache.iceberg.hadoop.Util;
-import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.mapping.NameMapping;
import org.apache.iceberg.mapping.NameMappingParser;
-import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.Spark3Util;
import org.apache.iceberg.spark.SparkSchemaUtil;
import org.apache.iceberg.util.PropertyUtil;
@@ -63,7 +58,7 @@ import org.apache.spark.sql.vectorized.ColumnarBatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-class SparkBatchScan implements Scan, Batch, SupportsReportStatistics {
+abstract class SparkBatchScan implements Scan, Batch, SupportsReportStatistics
{
private static final Logger LOG =
LoggerFactory.getLogger(SparkBatchScan.class);
private final Table table;
@@ -71,13 +66,6 @@ class SparkBatchScan implements Scan, Batch,
SupportsReportStatistics {
private final boolean localityPreferred;
private final Schema expectedSchema;
private final List<Expression> filterExpressions;
- private final Long snapshotId;
- private final Long startSnapshotId;
- private final Long endSnapshotId;
- private final Long asOfTimestamp;
- private final Long splitSize;
- private final Integer splitLookback;
- private final Long splitOpenFileCost;
private final Broadcast<FileIO> io;
private final Broadcast<EncryptionManager> encryptionManager;
private final boolean batchReadsEnabled;
@@ -85,45 +73,39 @@ class SparkBatchScan implements Scan, Batch,
SupportsReportStatistics {
// lazy variables
private StructType readSchema = null;
- private List<CombinedScanTask> tasks = null; // lazy cache of tasks
- SparkBatchScan(Table table, Broadcast<FileIO> io,
Broadcast<EncryptionManager> encryption, boolean caseSensitive,
- Schema expectedSchema, List<Expression> filters,
CaseInsensitiveStringMap options) {
+ SparkBatchScan(Table table, Broadcast<FileIO> io,
Broadcast<EncryptionManager> encryption,
+ boolean caseSensitive, Schema expectedSchema,
List<Expression> filters,
+ CaseInsensitiveStringMap options) {
this.table = table;
this.io = io;
this.encryptionManager = encryption;
this.caseSensitive = caseSensitive;
this.expectedSchema = expectedSchema;
- this.filterExpressions = filters;
- this.snapshotId = Spark3Util.propertyAsLong(options, "snapshot-id", null);
- this.asOfTimestamp = Spark3Util.propertyAsLong(options, "as-of-timestamp",
null);
-
- if (snapshotId != null && asOfTimestamp != null) {
- throw new IllegalArgumentException(
- "Cannot scan using both snapshot-id and as-of-timestamp to select
the table snapshot");
- }
-
- this.startSnapshotId = Spark3Util.propertyAsLong(options,
"start-snapshot-id", null);
- this.endSnapshotId = Spark3Util.propertyAsLong(options, "end-snapshot-id",
null);
- if (snapshotId != null || asOfTimestamp != null) {
- if (startSnapshotId != null || endSnapshotId != null) {
- throw new IllegalArgumentException(
- "Cannot specify start-snapshot-id and end-snapshot-id to do
incremental scan when either snapshot-id or " +
- "as-of-timestamp is specified");
- }
- } else if (startSnapshotId == null && endSnapshotId != null) {
- throw new IllegalArgumentException("Cannot only specify option
end-snapshot-id to do incremental scan");
- }
-
- // look for split behavior overrides in options
- this.splitSize = Spark3Util.propertyAsLong(options, "split-size", null);
- this.splitLookback = Spark3Util.propertyAsInt(options, "lookback", null);
- this.splitOpenFileCost = Spark3Util.propertyAsLong(options,
"file-open-cost", null);
+ this.filterExpressions = filters != null ? filters :
Collections.emptyList();
this.localityPreferred = Spark3Util.isLocalityEnabled(io.value(),
table.location(), options);
this.batchReadsEnabled =
Spark3Util.isVectorizationEnabled(table.properties(), options);
this.batchSize = Spark3Util.batchSize(table.properties(), options);
}
+ /** Returns the table passed to the constructor. */
+ protected Table table() {
+ return table;
+ }
+
+ /** Returns the case-sensitivity flag passed to the constructor. */
+ protected boolean caseSensitive() {
+ return caseSensitive;
+ }
+
+ /** Returns the projected Iceberg schema passed to the constructor. */
+ protected Schema expectedSchema() {
+ return expectedSchema;
+ }
+
+ /**
+ * Returns the pushed-down filter expressions. Never null: the constructor replaces a null
+ * filter list with an empty list.
+ */
+ protected List<Expression> filterExpressions() {
+ return filterExpressions;
+ }
+
+ /** Returns the combined scan tasks this batch reads; each subclass decides how they are planned. */
+ protected abstract List<CombinedScanTask> tasks();
+
@Override
public Batch toBatch() {
return this;
@@ -190,7 +172,7 @@ class SparkBatchScan implements Scan, Batch,
SupportsReportStatistics {
}
// estimate stats using snapshot summary only for partitioned tables
(metadata tables are unpartitioned)
- if (!table.spec().isUnpartitioned() && (filterExpressions == null ||
filterExpressions.isEmpty())) {
+ if (!table.spec().isUnpartitioned() && filterExpressions.isEmpty()) {
LOG.debug("using table metadata to estimate table statistics");
long totalRecords =
PropertyUtil.propertyAsLong(table.currentSnapshot().summary(),
SnapshotSummary.TOTAL_RECORDS_PROP, Long.MAX_VALUE);
@@ -214,96 +196,11 @@ class SparkBatchScan implements Scan, Batch,
SupportsReportStatistics {
}
@Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
-
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
-
- SparkBatchScan that = (SparkBatchScan) o;
- return table.name().equals(that.table.name()) &&
- readSchema().equals(that.readSchema()) && // compare Spark schemas to
ignore field ids
- filterExpressions.toString().equals(that.filterExpressions.toString())
&&
- Objects.equals(snapshotId, that.snapshotId) &&
- Objects.equals(startSnapshotId, that.startSnapshotId) &&
- Objects.equals(endSnapshotId, that.endSnapshotId) &&
- Objects.equals(asOfTimestamp, that.asOfTimestamp);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(
- table.name(), readSchema(), filterExpressions.toString(), snapshotId,
startSnapshotId, endSnapshotId,
- asOfTimestamp);
- }
-
- private List<CombinedScanTask> tasks() {
- if (tasks == null) {
- TableScan scan = table
- .newScan()
- .caseSensitive(caseSensitive)
- .project(expectedSchema);
-
- if (snapshotId != null) {
- scan = scan.useSnapshot(snapshotId);
- }
-
- if (asOfTimestamp != null) {
- scan = scan.asOfTime(asOfTimestamp);
- }
-
- if (startSnapshotId != null) {
- if (endSnapshotId != null) {
- scan = scan.appendsBetween(startSnapshotId, endSnapshotId);
- } else {
- scan = scan.appendsAfter(startSnapshotId);
- }
- }
-
- if (splitSize != null) {
- scan = scan.option(TableProperties.SPLIT_SIZE, splitSize.toString());
- }
-
- if (splitLookback != null) {
- scan = scan.option(TableProperties.SPLIT_LOOKBACK,
splitLookback.toString());
- }
-
- if (splitOpenFileCost != null) {
- scan = scan.option(TableProperties.SPLIT_OPEN_FILE_COST,
splitOpenFileCost.toString());
- }
-
- if (filterExpressions != null) {
- for (Expression filter : filterExpressions) {
- scan = scan.filter(filter);
- }
- }
-
- try (CloseableIterable<CombinedScanTask> tasksIterable =
scan.planTasks()) {
- this.tasks = Lists.newArrayList(tasksIterable);
- } catch (IOException e) {
- throw new RuntimeIOException(e, "Failed to close table scan: %s",
scan);
- }
- }
-
- return tasks;
- }
-
- @Override
public String description() {
String filters =
filterExpressions.stream().map(Spark3Util::describe).collect(Collectors.joining(",
"));
return String.format("%s [filters=%s]", table, filters);
}
- @Override
- public String toString() {
- return String.format(
- "IcebergScan(table=%s, type=%s, filters=%s, caseSensitive=%s)",
- table, expectedSchema.asStruct(), filterExpressions, caseSensitive);
- }
-
private static class ReaderFactory implements PartitionReaderFactory {
private final int batchSize;
diff --git
a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkMergeScan.java
b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkMergeScan.java
new file mode 100644
index 0000000..8eb4690
--- /dev/null
+++ b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkMergeScan.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableScan;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.Spark3Util;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.iceberg.util.TableScanUtil;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.sql.connector.read.Statistics;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+
+import static org.apache.iceberg.TableProperties.SPLIT_LOOKBACK;
+import static org.apache.iceberg.TableProperties.SPLIT_LOOKBACK_DEFAULT;
+import static org.apache.iceberg.TableProperties.SPLIT_OPEN_FILE_COST;
+import static org.apache.iceberg.TableProperties.SPLIT_OPEN_FILE_COST_DEFAULT;
+import static org.apache.iceberg.TableProperties.SPLIT_SIZE;
+import static org.apache.iceberg.TableProperties.SPLIT_SIZE_DEFAULT;
+
+/**
+ * Batch scan for merge-style reads that plans its files against an explicit snapshot.
+ *
+ * <p>Only the "snapshot-id" option selects the snapshot. When it is absent, the scan reads no
+ * files at all, so that concurrent table changes are never picked up. The planned files are
+ * exposed through {@link #files()} so the write side can access them, and the combined tasks
+ * are derived from those files.
+ */
+class SparkMergeScan extends SparkBatchScan {
+
+  private final boolean ignoreResiduals;
+  private final Long snapshotId;
+
+  // split planning settings: the read option if present, otherwise the table property or its default
+  private final long splitSize;
+  private final int splitLookback;
+  private final long splitOpenFileCost;
+
+  // lazy variables
+  private List<FileScanTask> files = null; // lazy cache of files
+  private List<CombinedScanTask> tasks = null; // lazy cache of tasks
+
+  SparkMergeScan(Table table, Broadcast<FileIO> io, Broadcast<EncryptionManager> encryption,
+                 boolean caseSensitive, boolean ignoreResiduals, Schema expectedSchema,
+                 List<Expression> filters, CaseInsensitiveStringMap options) {
+
+    super(table, io, encryption, caseSensitive, expectedSchema, filters, options);
+
+    this.ignoreResiduals = ignoreResiduals;
+    this.snapshotId = Spark3Util.propertyAsLong(options, "snapshot-id", null);
+
+    Map<String, String> props = table.properties();
+
+    long tableSplitSize = PropertyUtil.propertyAsLong(props, SPLIT_SIZE, SPLIT_SIZE_DEFAULT);
+    this.splitSize = Spark3Util.propertyAsLong(options, "split-size", tableSplitSize);
+
+    int tableSplitLookback = PropertyUtil.propertyAsInt(props, SPLIT_LOOKBACK, SPLIT_LOOKBACK_DEFAULT);
+    this.splitLookback = Spark3Util.propertyAsInt(options, "lookback", tableSplitLookback);
+
+    long tableOpenFileCost = PropertyUtil.propertyAsLong(props, SPLIT_OPEN_FILE_COST, SPLIT_OPEN_FILE_COST_DEFAULT);
+    this.splitOpenFileCost = Spark3Util.propertyAsLong(options, "file-open-cost", tableOpenFileCost);
+
+    if (snapshotId == null) {
+      // init files with an empty list if the snapshot id is not set to avoid picking any concurrent changes
+      this.files = Collections.emptyList();
+    }
+  }
+
+  /** Returns the snapshot id from the "snapshot-id" option, or null if it was not set. */
+  Long snapshotId() {
+    return snapshotId;
+  }
+
+  /** Reports zero size and rows when no snapshot is selected, since such a scan reads no files. */
+  @Override
+  public Statistics estimateStatistics() {
+    if (snapshotId == null) {
+      return new Stats(0L, 0L);
+    }
+    return super.estimateStatistics();
+  }
+
+  /**
+   * Returns the data files matched by this scan, planning them on first use.
+   *
+   * <p>Should be accessible to the write. When no snapshot id was given, the constructor has
+   * already set this to an empty list, so {@code useSnapshot} below never sees a null id.
+   */
+  List<FileScanTask> files() {
+    if (files == null) {
+      TableScan scan = table()
+          .newScan()
+          .caseSensitive(caseSensitive())
+          .useSnapshot(snapshotId)
+          .project(expectedSchema());
+
+      for (Expression filter : filterExpressions()) {
+        scan = scan.filter(filter);
+      }
+
+      if (ignoreResiduals) {
+        // scan refinements return a new scan; the result must be kept or the flag has no effect
+        scan = scan.ignoreResiduals();
+      }
+
+      try (CloseableIterable<FileScanTask> filesIterable = scan.planFiles()) {
+        this.files = Lists.newArrayList(filesIterable);
+      } catch (IOException e) {
+        throw new RuntimeIOException(e, "Failed to close table scan: %s", scan);
+      }
+    }
+
+    return files;
+  }
+
+  /** Splits and bin-packs {@link #files()} into combined tasks using this scan's split settings. */
+  @Override
+  protected List<CombinedScanTask> tasks() {
+    if (tasks == null) {
+      CloseableIterable<FileScanTask> splitFiles = TableScanUtil.splitFiles(
+          CloseableIterable.withNoopClose(files()),
+          splitSize);
+
+      try (CloseableIterable<CombinedScanTask> scanTasks = TableScanUtil.planTasks(
+          splitFiles, splitSize, splitLookback, splitOpenFileCost)) {
+        this.tasks = Lists.newArrayList(scanTasks);
+      } catch (IOException e) {
+        throw new RuntimeIOException(e, "Failed to close planned tasks for scan: %s", this);
+      }
+    }
+
+    return tasks;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+
+    // TODO: add filtered files
+    SparkMergeScan that = (SparkMergeScan) o;
+    return table().name().equals(that.table().name()) &&
+        readSchema().equals(that.readSchema()) && // compare Spark schemas to ignore field ids
+        filterExpressions().toString().equals(that.filterExpressions().toString()) &&
+        ignoreResiduals == that.ignoreResiduals &&
+        Objects.equals(snapshotId, that.snapshotId);
+  }
+
+  @Override
+  public int hashCode() {
+    // TODO: add filtered files
+    return Objects.hash(table().name(), readSchema(), filterExpressions().toString(), ignoreResiduals, snapshotId);
+  }
+
+  @Override
+  public String toString() {
+    return String.format(
+        "IcebergMergeScan(table=%s, type=%s, filters=%s, caseSensitive=%s)",
+        table(), expectedSchema().asStruct(), filterExpressions(), caseSensitive());
+  }
+}
diff --git
a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
index bc99b93..d68986d 100644
--- a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
+++ b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
@@ -55,6 +55,7 @@ public class SparkScanBuilder implements ScanBuilder,
SupportsPushDownFilters, S
private boolean caseSensitive;
private List<Expression> filterExpressions = null;
private Filter[] pushedFilters = NO_FILTERS;
+ private boolean ignoreResiduals = false;
// lazy variables
private JavaSparkContext lazySparkContext = null;
@@ -133,12 +134,25 @@ public class SparkScanBuilder implements ScanBuilder,
SupportsPushDownFilters, S
this.requestedProjection = requestedSchema;
}
+ /**
+ * Marks scans built by this builder to ignore residual filters and returns this builder.
+ * Only buildMergeScan() passes this flag on; build() does not use it.
+ */
+ public SparkScanBuilder ignoreResiduals() {
+ this.ignoreResiduals = true;
+ return this;
+ }
+
@Override
public Scan build() {
Broadcast<FileIO> io =
lazySparkContext().broadcast(SparkUtil.serializableFileIO(table));
Broadcast<EncryptionManager> encryption =
lazySparkContext().broadcast(table.encryption());
- return new SparkBatchScan(table, io, encryption, caseSensitive,
lazySchema(), filterExpressions, options);
+ return new SparkBatchQueryScan(table, io, encryption, caseSensitive,
lazySchema(), filterExpressions, options);
}
+ /**
+ * Builds a SparkMergeScan using this builder's case sensitivity, projected schema, pushed
+ * filters, read options and ignoreResiduals flag. The FileIO and encryption manager are
+ * broadcast the same way as in build().
+ * NOTE(review): SparkMergeScan selects its snapshot only from the "snapshot-id" option and
+ * reads no files when it is absent.
+ */
+ public Scan buildMergeScan() {
+ Broadcast<FileIO> io =
lazySparkContext().broadcast(SparkUtil.serializableFileIO(table));
+ Broadcast<EncryptionManager> encryption =
lazySparkContext().broadcast(table.encryption());
+
+ return new SparkMergeScan(
+ table, io, encryption, caseSensitive, ignoreResiduals,
+ lazySchema(), filterExpressions, options);
+ }
}