This is an automated email from the ASF dual-hosted git repository.

aokolnychyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 1e52c81  Spark: Add SparkMergeScan (#1782)
1e52c81 is described below

commit 1e52c817d05add2f3106e6e6767bb06f566b1c04
Author: Anton Okolnychyi <[email protected]>
AuthorDate: Fri Nov 20 14:37:57 2020 -0800

    Spark: Add SparkMergeScan (#1782)
---
 .../iceberg/spark/source/SparkBatchQueryScan.java  | 167 +++++++++++++++++++
 .../iceberg/spark/source/SparkBatchScan.java       | 153 +++---------------
 .../iceberg/spark/source/SparkMergeScan.java       | 179 +++++++++++++++++++++
 .../iceberg/spark/source/SparkScanBuilder.java     |  16 +-
 4 files changed, 386 insertions(+), 129 deletions(-)

diff --git 
a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java 
b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java
new file mode 100644
index 0000000..d6e0a3f
--- /dev/null
+++ 
b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Objects;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.TableScan;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.Spark3Util;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+
+class SparkBatchQueryScan extends SparkBatchScan {
+
+  private final Long snapshotId;
+  private final Long startSnapshotId;
+  private final Long endSnapshotId;
+  private final Long asOfTimestamp;
+  private final Long splitSize;
+  private final Integer splitLookback;
+  private final Long splitOpenFileCost;
+
+  private List<CombinedScanTask> tasks = null; // lazy cache of tasks
+
+  SparkBatchQueryScan(Table table, Broadcast<FileIO> io, 
Broadcast<EncryptionManager> encryption,
+                      boolean caseSensitive, Schema expectedSchema, 
List<Expression> filters,
+                      CaseInsensitiveStringMap options) {
+
+    super(table, io, encryption, caseSensitive, expectedSchema, filters, 
options);
+
+    this.snapshotId = Spark3Util.propertyAsLong(options, "snapshot-id", null);
+    this.asOfTimestamp = Spark3Util.propertyAsLong(options, "as-of-timestamp", 
null);
+
+    if (snapshotId != null && asOfTimestamp != null) {
+      throw new IllegalArgumentException(
+          "Cannot scan using both snapshot-id and as-of-timestamp to select 
the table snapshot");
+    }
+
+    this.startSnapshotId = Spark3Util.propertyAsLong(options, 
"start-snapshot-id", null);
+    this.endSnapshotId = Spark3Util.propertyAsLong(options, "end-snapshot-id", 
null);
+    if (snapshotId != null || asOfTimestamp != null) {
+      if (startSnapshotId != null || endSnapshotId != null) {
+        throw new IllegalArgumentException(
+            "Cannot specify start-snapshot-id and end-snapshot-id to do 
incremental scan when either snapshot-id or " +
+                "as-of-timestamp is specified");
+      }
+    } else if (startSnapshotId == null && endSnapshotId != null) {
+      throw new IllegalArgumentException("Cannot only specify option 
end-snapshot-id to do incremental scan");
+    }
+
+    // look for split behavior overrides in options
+    this.splitSize = Spark3Util.propertyAsLong(options, "split-size", null);
+    this.splitLookback = Spark3Util.propertyAsInt(options, "lookback", null);
+    this.splitOpenFileCost = Spark3Util.propertyAsLong(options, 
"file-open-cost", null);
+  }
+
+  @Override
+  protected List<CombinedScanTask> tasks() {
+    if (tasks == null) {
+      TableScan scan = table()
+          .newScan()
+          .caseSensitive(caseSensitive())
+          .project(expectedSchema());
+
+      if (snapshotId != null) {
+        scan = scan.useSnapshot(snapshotId);
+      }
+
+      if (asOfTimestamp != null) {
+        scan = scan.asOfTime(asOfTimestamp);
+      }
+
+      if (startSnapshotId != null) {
+        if (endSnapshotId != null) {
+          scan = scan.appendsBetween(startSnapshotId, endSnapshotId);
+        } else {
+          scan = scan.appendsAfter(startSnapshotId);
+        }
+      }
+
+      if (splitSize != null) {
+        scan = scan.option(TableProperties.SPLIT_SIZE, splitSize.toString());
+      }
+
+      if (splitLookback != null) {
+        scan = scan.option(TableProperties.SPLIT_LOOKBACK, 
splitLookback.toString());
+      }
+
+      if (splitOpenFileCost != null) {
+        scan = scan.option(TableProperties.SPLIT_OPEN_FILE_COST, 
splitOpenFileCost.toString());
+      }
+
+      for (Expression filter : filterExpressions()) {
+        scan = scan.filter(filter);
+      }
+
+      try (CloseableIterable<CombinedScanTask> tasksIterable = 
scan.planTasks()) {
+        this.tasks = Lists.newArrayList(tasksIterable);
+      }  catch (IOException e) {
+        throw new RuntimeIOException(e, "Failed to close table scan: %s", 
scan);
+      }
+    }
+
+    return tasks;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+
+    SparkBatchQueryScan that = (SparkBatchQueryScan) o;
+    return table().name().equals(that.table().name()) &&
+        readSchema().equals(that.readSchema()) && // compare Spark schemas to 
ignore field ids
+        
filterExpressions().toString().equals(that.filterExpressions().toString()) &&
+        Objects.equals(snapshotId, that.snapshotId) &&
+        Objects.equals(startSnapshotId, that.startSnapshotId) &&
+        Objects.equals(endSnapshotId, that.endSnapshotId) &&
+        Objects.equals(asOfTimestamp, that.asOfTimestamp);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(
+        table().name(), readSchema(), filterExpressions().toString(), 
snapshotId, startSnapshotId, endSnapshotId,
+        asOfTimestamp);
+  }
+
+  @Override
+  public String toString() {
+    return String.format(
+        "IcebergScan(table=%s, type=%s, filters=%s, caseSensitive=%s)",
+        table(), expectedSchema().asStruct(), filterExpressions(), 
caseSensitive());
+  }
+}
diff --git 
a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkBatchScan.java 
b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkBatchScan.java
index 13b85ad..a6265e1 100644
--- a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkBatchScan.java
+++ b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkBatchScan.java
@@ -19,11 +19,10 @@
 
 package org.apache.iceberg.spark.source;
 
-import java.io.IOException;
 import java.io.Serializable;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
-import java.util.Objects;
 import java.util.stream.Collectors;
 import org.apache.iceberg.CombinedScanTask;
 import org.apache.iceberg.FileFormat;
@@ -33,17 +32,13 @@ import org.apache.iceberg.SchemaParser;
 import org.apache.iceberg.SnapshotSummary;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
-import org.apache.iceberg.TableScan;
 import org.apache.iceberg.encryption.EncryptionManager;
-import org.apache.iceberg.exceptions.RuntimeIOException;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.hadoop.HadoopInputFile;
 import org.apache.iceberg.hadoop.Util;
-import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.mapping.NameMapping;
 import org.apache.iceberg.mapping.NameMappingParser;
-import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.spark.Spark3Util;
 import org.apache.iceberg.spark.SparkSchemaUtil;
 import org.apache.iceberg.util.PropertyUtil;
@@ -63,7 +58,7 @@ import org.apache.spark.sql.vectorized.ColumnarBatch;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-class SparkBatchScan implements Scan, Batch, SupportsReportStatistics {
+abstract class SparkBatchScan implements Scan, Batch, SupportsReportStatistics 
{
   private static final Logger LOG = 
LoggerFactory.getLogger(SparkBatchScan.class);
 
   private final Table table;
@@ -71,13 +66,6 @@ class SparkBatchScan implements Scan, Batch, 
SupportsReportStatistics {
   private final boolean localityPreferred;
   private final Schema expectedSchema;
   private final List<Expression> filterExpressions;
-  private final Long snapshotId;
-  private final Long startSnapshotId;
-  private final Long endSnapshotId;
-  private final Long asOfTimestamp;
-  private final Long splitSize;
-  private final Integer splitLookback;
-  private final Long splitOpenFileCost;
   private final Broadcast<FileIO> io;
   private final Broadcast<EncryptionManager> encryptionManager;
   private final boolean batchReadsEnabled;
@@ -85,45 +73,39 @@ class SparkBatchScan implements Scan, Batch, 
SupportsReportStatistics {
 
   // lazy variables
   private StructType readSchema = null;
-  private List<CombinedScanTask> tasks = null; // lazy cache of tasks
 
-  SparkBatchScan(Table table, Broadcast<FileIO> io, 
Broadcast<EncryptionManager> encryption, boolean caseSensitive,
-                 Schema expectedSchema, List<Expression> filters, 
CaseInsensitiveStringMap options) {
+  SparkBatchScan(Table table, Broadcast<FileIO> io, 
Broadcast<EncryptionManager> encryption,
+                 boolean caseSensitive, Schema expectedSchema, 
List<Expression> filters,
+                 CaseInsensitiveStringMap options) {
     this.table = table;
     this.io = io;
     this.encryptionManager = encryption;
     this.caseSensitive = caseSensitive;
     this.expectedSchema = expectedSchema;
-    this.filterExpressions = filters;
-    this.snapshotId = Spark3Util.propertyAsLong(options, "snapshot-id", null);
-    this.asOfTimestamp = Spark3Util.propertyAsLong(options, "as-of-timestamp", 
null);
-
-    if (snapshotId != null && asOfTimestamp != null) {
-      throw new IllegalArgumentException(
-          "Cannot scan using both snapshot-id and as-of-timestamp to select 
the table snapshot");
-    }
-
-    this.startSnapshotId = Spark3Util.propertyAsLong(options, 
"start-snapshot-id", null);
-    this.endSnapshotId = Spark3Util.propertyAsLong(options, "end-snapshot-id", 
null);
-    if (snapshotId != null || asOfTimestamp != null) {
-      if (startSnapshotId != null || endSnapshotId != null) {
-        throw new IllegalArgumentException(
-            "Cannot specify start-snapshot-id and end-snapshot-id to do 
incremental scan when either snapshot-id or " +
-                "as-of-timestamp is specified");
-      }
-    } else if (startSnapshotId == null && endSnapshotId != null) {
-      throw new IllegalArgumentException("Cannot only specify option 
end-snapshot-id to do incremental scan");
-    }
-
-    // look for split behavior overrides in options
-    this.splitSize = Spark3Util.propertyAsLong(options, "split-size", null);
-    this.splitLookback = Spark3Util.propertyAsInt(options, "lookback", null);
-    this.splitOpenFileCost = Spark3Util.propertyAsLong(options, 
"file-open-cost", null);
+    this.filterExpressions = filters != null ? filters : 
Collections.emptyList();
     this.localityPreferred = Spark3Util.isLocalityEnabled(io.value(), 
table.location(), options);
     this.batchReadsEnabled = 
Spark3Util.isVectorizationEnabled(table.properties(), options);
     this.batchSize = Spark3Util.batchSize(table.properties(), options);
   }
 
+  protected Table table() {
+    return table;
+  }
+
+  protected boolean caseSensitive() {
+    return caseSensitive;
+  }
+
+  protected Schema expectedSchema() {
+    return expectedSchema;
+  }
+
+  protected List<Expression> filterExpressions() {
+    return filterExpressions;
+  }
+
+  protected abstract List<CombinedScanTask> tasks();
+
   @Override
   public Batch toBatch() {
     return this;
@@ -190,7 +172,7 @@ class SparkBatchScan implements Scan, Batch, 
SupportsReportStatistics {
     }
 
     // estimate stats using snapshot summary only for partitioned tables 
(metadata tables are unpartitioned)
-    if (!table.spec().isUnpartitioned() && (filterExpressions == null || 
filterExpressions.isEmpty())) {
+    if (!table.spec().isUnpartitioned() && filterExpressions.isEmpty()) {
       LOG.debug("using table metadata to estimate table statistics");
       long totalRecords = 
PropertyUtil.propertyAsLong(table.currentSnapshot().summary(),
           SnapshotSummary.TOTAL_RECORDS_PROP, Long.MAX_VALUE);
@@ -214,96 +196,11 @@ class SparkBatchScan implements Scan, Batch, 
SupportsReportStatistics {
   }
 
   @Override
-  public boolean equals(Object o) {
-    if (this == o) {
-      return true;
-    }
-
-    if (o == null || getClass() != o.getClass()) {
-      return false;
-    }
-
-    SparkBatchScan that = (SparkBatchScan) o;
-    return table.name().equals(that.table.name()) &&
-        readSchema().equals(that.readSchema()) && // compare Spark schemas to 
ignore field ids
-        filterExpressions.toString().equals(that.filterExpressions.toString()) 
&&
-        Objects.equals(snapshotId, that.snapshotId) &&
-        Objects.equals(startSnapshotId, that.startSnapshotId) &&
-        Objects.equals(endSnapshotId, that.endSnapshotId) &&
-        Objects.equals(asOfTimestamp, that.asOfTimestamp);
-  }
-
-  @Override
-  public int hashCode() {
-    return Objects.hash(
-        table.name(), readSchema(), filterExpressions.toString(), snapshotId, 
startSnapshotId, endSnapshotId,
-        asOfTimestamp);
-  }
-
-  private List<CombinedScanTask> tasks() {
-    if (tasks == null) {
-      TableScan scan = table
-          .newScan()
-          .caseSensitive(caseSensitive)
-          .project(expectedSchema);
-
-      if (snapshotId != null) {
-        scan = scan.useSnapshot(snapshotId);
-      }
-
-      if (asOfTimestamp != null) {
-        scan = scan.asOfTime(asOfTimestamp);
-      }
-
-      if (startSnapshotId != null) {
-        if (endSnapshotId != null) {
-          scan = scan.appendsBetween(startSnapshotId, endSnapshotId);
-        } else {
-          scan = scan.appendsAfter(startSnapshotId);
-        }
-      }
-
-      if (splitSize != null) {
-        scan = scan.option(TableProperties.SPLIT_SIZE, splitSize.toString());
-      }
-
-      if (splitLookback != null) {
-        scan = scan.option(TableProperties.SPLIT_LOOKBACK, 
splitLookback.toString());
-      }
-
-      if (splitOpenFileCost != null) {
-        scan = scan.option(TableProperties.SPLIT_OPEN_FILE_COST, 
splitOpenFileCost.toString());
-      }
-
-      if (filterExpressions != null) {
-        for (Expression filter : filterExpressions) {
-          scan = scan.filter(filter);
-        }
-      }
-
-      try (CloseableIterable<CombinedScanTask> tasksIterable = 
scan.planTasks()) {
-        this.tasks = Lists.newArrayList(tasksIterable);
-      }  catch (IOException e) {
-        throw new RuntimeIOException(e, "Failed to close table scan: %s", 
scan);
-      }
-    }
-
-    return tasks;
-  }
-
-  @Override
   public String description() {
     String filters = 
filterExpressions.stream().map(Spark3Util::describe).collect(Collectors.joining(",
 "));
     return String.format("%s [filters=%s]", table, filters);
   }
 
-  @Override
-  public String toString() {
-    return String.format(
-        "IcebergScan(table=%s, type=%s, filters=%s, caseSensitive=%s)",
-        table, expectedSchema.asStruct(), filterExpressions, caseSensitive);
-  }
-
   private static class ReaderFactory implements PartitionReaderFactory {
     private final int batchSize;
 
diff --git 
a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkMergeScan.java 
b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkMergeScan.java
new file mode 100644
index 0000000..8eb4690
--- /dev/null
+++ b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkMergeScan.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableScan;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.Spark3Util;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.iceberg.util.TableScanUtil;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.sql.connector.read.Statistics;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+
+import static org.apache.iceberg.TableProperties.SPLIT_LOOKBACK;
+import static org.apache.iceberg.TableProperties.SPLIT_LOOKBACK_DEFAULT;
+import static org.apache.iceberg.TableProperties.SPLIT_OPEN_FILE_COST;
+import static org.apache.iceberg.TableProperties.SPLIT_OPEN_FILE_COST_DEFAULT;
+import static org.apache.iceberg.TableProperties.SPLIT_SIZE;
+import static org.apache.iceberg.TableProperties.SPLIT_SIZE_DEFAULT;
+
+class SparkMergeScan extends SparkBatchScan {
+
+  private final Table table;
+  private final boolean ignoreResiduals;
+  private final Schema expectedSchema;
+  private final Long snapshotId;
+  private final Long splitSize;
+  private final Integer splitLookback;
+  private final Long splitOpenFileCost;
+
+  // lazy variables
+  private List<FileScanTask> files = null; // lazy cache of files
+  private List<CombinedScanTask> tasks = null; // lazy cache of tasks
+
+  SparkMergeScan(Table table, Broadcast<FileIO> io, 
Broadcast<EncryptionManager> encryption,
+                 boolean caseSensitive, boolean ignoreResiduals, Schema 
expectedSchema,
+                 List<Expression> filters, CaseInsensitiveStringMap options) {
+
+    super(table, io, encryption, caseSensitive, expectedSchema, filters, 
options);
+
+    this.table = table;
+    this.ignoreResiduals = ignoreResiduals;
+    this.expectedSchema = expectedSchema;
+    this.snapshotId = Spark3Util.propertyAsLong(options, "snapshot-id", null);
+
+    Map<String, String> props = table.properties();
+
+    long tableSplitSize = PropertyUtil.propertyAsLong(props, SPLIT_SIZE, 
SPLIT_SIZE_DEFAULT);
+    this.splitSize = Spark3Util.propertyAsLong(options, "split-size", 
tableSplitSize);
+
+    int tableSplitLookback = PropertyUtil.propertyAsInt(props, SPLIT_LOOKBACK, 
SPLIT_LOOKBACK_DEFAULT);
+    this.splitLookback = Spark3Util.propertyAsInt(options, "lookback", 
tableSplitLookback);
+
+    long tableOpenFileCost = PropertyUtil.propertyAsLong(props, 
SPLIT_OPEN_FILE_COST, SPLIT_OPEN_FILE_COST_DEFAULT);
+    this.splitOpenFileCost = Spark3Util.propertyAsLong(options, 
"file-open-cost", tableOpenFileCost);
+
+    if (snapshotId == null) {
+      // init files with an empty list if the snapshot id is not set to avoid 
picking any concurrent changes
+      files = Collections.emptyList();
+    }
+  }
+
+  Long snapshotId() {
+    return snapshotId;
+  }
+
+  @Override
+  public Statistics estimateStatistics() {
+    if (snapshotId == null) {
+      return new Stats(0L, 0L);
+    }
+    return super.estimateStatistics();
+  }
+
+  // should be accessible to the write
+  List<FileScanTask> files() {
+    if (files == null) {
+      TableScan scan = table
+          .newScan()
+          .caseSensitive(caseSensitive())
+          .useSnapshot(snapshotId)
+          .project(expectedSchema);
+
+      for (Expression filter : filterExpressions()) {
+        scan = scan.filter(filter);
+      }
+
+      if (ignoreResiduals) {
+        scan.ignoreResiduals();
+      }
+
+      try (CloseableIterable<FileScanTask> filesIterable = scan.planFiles()) {
+        this.files = Lists.newArrayList(filesIterable);
+      } catch (IOException e) {
+        throw new RuntimeIOException(e, "Failed to close table scan: %s", 
scan);
+      }
+    }
+
+    return files;
+  }
+
+  @Override
+  protected List<CombinedScanTask> tasks() {
+    if (tasks == null) {
+      CloseableIterable<FileScanTask> splitFiles = TableScanUtil.splitFiles(
+          CloseableIterable.withNoopClose(files()),
+          splitSize);
+      CloseableIterable<CombinedScanTask> scanTasks = TableScanUtil.planTasks(
+          splitFiles, splitSize,
+          splitLookback, splitOpenFileCost);
+      tasks = Lists.newArrayList(scanTasks);
+    }
+
+    return tasks;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+
+    // TODO: add filtered files
+    SparkMergeScan that = (SparkMergeScan) o;
+    return table().name().equals(that.table().name()) &&
+        readSchema().equals(that.readSchema()) && // compare Spark schemas to 
ignore field ids
+        
filterExpressions().toString().equals(that.filterExpressions().toString()) &&
+        ignoreResiduals == that.ignoreResiduals &&
+        Objects.equals(snapshotId, that.snapshotId);
+  }
+
+  @Override
+  public int hashCode() {
+    // TODO: add filtered files
+    return Objects.hash(table().name(), readSchema(), 
filterExpressions().toString(), ignoreResiduals, snapshotId);
+  }
+
+  @Override
+  public String toString() {
+    return String.format(
+        "IcebergMergeScan(table=%s, type=%s, filters=%s, caseSensitive=%s)",
+        table(), expectedSchema().asStruct(), filterExpressions(), 
caseSensitive());
+  }
+}
diff --git 
a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java 
b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
index bc99b93..d68986d 100644
--- a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
+++ b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
@@ -55,6 +55,7 @@ public class SparkScanBuilder implements ScanBuilder, 
SupportsPushDownFilters, S
   private boolean caseSensitive;
   private List<Expression> filterExpressions = null;
   private Filter[] pushedFilters = NO_FILTERS;
+  private boolean ignoreResiduals = false;
 
   // lazy variables
   private JavaSparkContext lazySparkContext = null;
@@ -133,12 +134,25 @@ public class SparkScanBuilder implements ScanBuilder, 
SupportsPushDownFilters, S
     this.requestedProjection = requestedSchema;
   }
 
+  public SparkScanBuilder ignoreResiduals() {
+    this.ignoreResiduals = true;
+    return this;
+  }
+
   @Override
   public Scan build() {
     Broadcast<FileIO> io = 
lazySparkContext().broadcast(SparkUtil.serializableFileIO(table));
     Broadcast<EncryptionManager> encryption = 
lazySparkContext().broadcast(table.encryption());
 
-    return new SparkBatchScan(table, io, encryption, caseSensitive, 
lazySchema(), filterExpressions, options);
+    return new SparkBatchQueryScan(table, io, encryption, caseSensitive, 
lazySchema(), filterExpressions, options);
   }
 
+  public Scan buildMergeScan() {
+    Broadcast<FileIO> io = 
lazySparkContext().broadcast(SparkUtil.serializableFileIO(table));
+    Broadcast<EncryptionManager> encryption = 
lazySparkContext().broadcast(table.encryption());
+
+    return new SparkMergeScan(
+        table, io, encryption, caseSensitive, ignoreResiduals,
+        lazySchema(), filterExpressions, options);
+  }
 }

Reply via email to