This is an automated email from the ASF dual-hosted git repository.

szehon pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new 1c576c5952 Spark: Exclude reading _pos column if it's not in the scan list (#11390)
1c576c5952 is described below

commit 1c576c5952fbc623591e800408cdba1518e6a410
Author: Huaxin Gao <[email protected]>
AuthorDate: Fri Nov 8 15:57:37 2024 -0800

    Spark: Exclude reading _pos column if it's not in the scan list (#11390)
---
 .../java/org/apache/iceberg/data/DeleteFilter.java   | 20 ++++++++++++++++----
 .../apache/iceberg/spark/source/BaseBatchReader.java | 17 ++++++++++++++++-
 .../org/apache/iceberg/spark/source/BaseReader.java  |  5 +++--
 .../apache/iceberg/spark/source/BatchDataReader.java |  2 +-
 .../iceberg/spark/source/ChangelogRowReader.java     |  5 +++--
 .../spark/source/EqualityDeleteRowReader.java        |  2 +-
 .../apache/iceberg/spark/source/RowDataReader.java   |  3 ++-
 .../apache/iceberg/spark/source/BaseBatchReader.java | 17 ++++++++++++++++-
 .../org/apache/iceberg/spark/source/BaseReader.java  |  5 +++--
 .../apache/iceberg/spark/source/BatchDataReader.java |  2 +-
 .../iceberg/spark/source/ChangelogRowReader.java     |  5 +++--
 .../spark/source/EqualityDeleteRowReader.java        |  2 +-
 .../apache/iceberg/spark/source/RowDataReader.java   |  3 ++-
 .../apache/iceberg/spark/source/BaseBatchReader.java | 17 ++++++++++++++++-
 .../org/apache/iceberg/spark/source/BaseReader.java  |  5 +++--
 .../apache/iceberg/spark/source/BatchDataReader.java |  2 +-
 .../iceberg/spark/source/ChangelogRowReader.java     |  5 +++--
 .../spark/source/EqualityDeleteRowReader.java        |  2 +-
 .../apache/iceberg/spark/source/RowDataReader.java   |  3 ++-
 19 files changed, 94 insertions(+), 28 deletions(-)

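For orientation before the diff: the change threads a new needRowPosCol flag from the
Spark readers into DeleteFilter, so the _pos metadata column
(MetadataColumns.ROW_POSITION) is only added to the required schema when the caller
both asked for it and has position deletes to apply. Below is a minimal,
self-contained sketch of that gate; the field id constant is an assumption hard-coded
so the snippet compiles without Iceberg on the classpath (the real code uses
MetadataColumns.ROW_POSITION.fieldId()).

import java.util.LinkedHashSet;
import java.util.Set;

// Sketch of the projection gate introduced by this commit: _pos is required
// only when the caller asked for it AND position deletes are present.
public class RowPosGateSketch {
  // Stand-in for MetadataColumns.ROW_POSITION.fieldId(); assumed value.
  private static final int ROW_POSITION_FIELD_ID = Integer.MAX_VALUE - 2;

  static Set<Integer> requiredIds(boolean needRowPosCol, boolean hasPosDeletes) {
    Set<Integer> ids = new LinkedHashSet<>();
    if (needRowPosCol && hasPosDeletes) {
      ids.add(ROW_POSITION_FIELD_ID);
    }
    return ids;
  }

  public static void main(String[] args) {
    System.out.println(requiredIds(false, true)); // vectorized batch read: []
    System.out.println(requiredIds(true, true));  // row-based read: [2147483645]
  }
}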
diff --git a/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java b/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java
index e7d8445cf8..aa5e00fd0e 100644
--- a/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java
+++ b/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java
@@ -69,7 +69,8 @@ public abstract class DeleteFilter<T> {
       List<DeleteFile> deletes,
       Schema tableSchema,
       Schema requestedSchema,
-      DeleteCounter counter) {
+      DeleteCounter counter,
+      boolean needRowPosCol) {
     this.filePath = filePath;
     this.counter = counter;
 
@@ -93,13 +94,23 @@ public abstract class DeleteFilter<T> {
 
     this.posDeletes = posDeleteBuilder.build();
     this.eqDeletes = eqDeleteBuilder.build();
-    this.requiredSchema = fileProjection(tableSchema, requestedSchema, posDeletes, eqDeletes);
+    this.requiredSchema =
+        fileProjection(tableSchema, requestedSchema, posDeletes, eqDeletes, needRowPosCol);
     this.posAccessor = requiredSchema.accessorForField(MetadataColumns.ROW_POSITION.fieldId());
     this.hasIsDeletedColumn =
         requiredSchema.findField(MetadataColumns.IS_DELETED.fieldId()) != null;
     this.isDeletedColumnPosition = requiredSchema.columns().indexOf(MetadataColumns.IS_DELETED);
   }
 
+  protected DeleteFilter(
+      String filePath,
+      List<DeleteFile> deletes,
+      Schema tableSchema,
+      Schema requestedSchema,
+      DeleteCounter counter) {
+    this(filePath, deletes, tableSchema, requestedSchema, counter, true);
+  }
+
   protected DeleteFilter(
       String filePath, List<DeleteFile> deletes, Schema tableSchema, Schema requestedSchema) {
     this(filePath, deletes, tableSchema, requestedSchema, new DeleteCounter());
@@ -251,13 +262,14 @@ public abstract class DeleteFilter<T> {
       Schema tableSchema,
       Schema requestedSchema,
       List<DeleteFile> posDeletes,
-      List<DeleteFile> eqDeletes) {
+      List<DeleteFile> eqDeletes,
+      boolean needRowPosCol) {
     if (posDeletes.isEmpty() && eqDeletes.isEmpty()) {
       return requestedSchema;
     }
 
     Set<Integer> requiredIds = Sets.newLinkedHashSet();
-    if (!posDeletes.isEmpty()) {
+    if (needRowPosCol && !posDeletes.isEmpty()) {
       requiredIds.add(MetadataColumns.ROW_POSITION.fieldId());
     }
 
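Before the Spark-side hunks (repeated for v3.3, v3.4, and v3.5 below), a summary of
how each call site resolves the new flag. The boolean values are copied from the
hunks; the one-line rationales are an interpretation of the change, not text from
the commit.

// needRowPosCol per Spark call site in this commit (values from the hunks below):
final class NeedRowPosColByCaller {
  static final boolean BATCH_DATA_READER = false; // vectorized; _pos re-added only for Parquet ReadConf
  static final boolean ROW_DATA_READER = true; // row-wise position-delete filtering reads _pos
  static final boolean CHANGELOG_ROW_READER = true; // added-rows and deleted-data-file tasks
  static final boolean EQUALITY_DELETE_ROW_READER = true;
}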
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java
index c05b694a60..49c4395213 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iceberg.spark.source;
 
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import org.apache.iceberg.FileFormat;
@@ -31,10 +32,12 @@ import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.orc.ORC;
 import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.spark.data.vectorized.VectorizedSparkOrcReaders;
 import org.apache.iceberg.spark.data.vectorized.VectorizedSparkParquetReaders;
 import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.types.Types;
 import org.apache.spark.sql.vectorized.ColumnarBatch;
 
 abstract class BaseBatchReader<T extends ScanTask> extends BaseReader<ColumnarBatch, T> {
@@ -81,9 +84,21 @@ abstract class BaseBatchReader<T extends ScanTask> extends BaseReader<ColumnarBa
       SparkDeleteFilter deleteFilter) {
     // get required schema if there are deletes
     Schema requiredSchema = deleteFilter != null ? deleteFilter.requiredSchema() : expectedSchema();
+    boolean hasPositionDelete = deleteFilter != null ? deleteFilter.hasPosDeletes() : false;
+    Schema projectedSchema = requiredSchema;
+    if (hasPositionDelete) {
+      // We need to add MetadataColumns.ROW_POSITION in the schema for
+      // ReadConf.generateOffsetToStartPos(Schema schema). This is not needed any
+      // more after #10107 is merged.
+      List<Types.NestedField> columns = Lists.newArrayList(requiredSchema.columns());
+      if (!columns.contains(MetadataColumns.ROW_POSITION)) {
+        columns.add(MetadataColumns.ROW_POSITION);
+        projectedSchema = new Schema(columns);
+      }
+    }
 
     return Parquet.read(inputFile)
-        .project(requiredSchema)
+        .project(projectedSchema)
         .split(start, length)
         .createBatchedReaderFunc(
             fileSchema ->
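The hunk above is the one subtle spot: even though BatchDataReader now constructs its
delete filter with needRowPosCol=false, the Parquet batch path still has to project
ROW_POSITION whenever position deletes exist, because
ReadConf.generateOffsetToStartPos(Schema) depends on it (per the comment, only until
#10107 lands). A standalone restatement of that fix-up, assuming the Iceberg API
(Schema, MetadataColumns, Types.NestedField) is on the classpath:

import java.util.ArrayList;
import java.util.List;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Types;

// Mirrors the projectedSchema fix-up in BaseBatchReader above: keep the
// required schema as-is unless position deletes force _pos back in.
class ProjectedSchemaSketch {
  static Schema withRowPosition(Schema requiredSchema, boolean hasPosDeletes) {
    if (!hasPosDeletes) {
      return requiredSchema;
    }
    List<Types.NestedField> columns = new ArrayList<>(requiredSchema.columns());
    if (columns.contains(MetadataColumns.ROW_POSITION)) {
      return requiredSchema;
    }
    columns.add(MetadataColumns.ROW_POSITION);
    return new Schema(columns);
  }
}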
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/BaseReader.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/BaseReader.java
index 0882edcb7c..96ff430c17 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/BaseReader.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/BaseReader.java
@@ -257,8 +257,9 @@ abstract class BaseReader<T, TaskT extends ScanTask> implements Closeable {
   protected class SparkDeleteFilter extends DeleteFilter<InternalRow> {
     private final InternalRowWrapper asStructLike;
 
-    SparkDeleteFilter(String filePath, List<DeleteFile> deletes, DeleteCounter counter) {
-      super(filePath, deletes, tableSchema, expectedSchema, counter);
+    SparkDeleteFilter(
+        String filePath, List<DeleteFile> deletes, DeleteCounter counter, boolean needRowPosCol) {
+      super(filePath, deletes, tableSchema, expectedSchema, counter, needRowPosCol);
       this.asStructLike =
           new InternalRowWrapper(
               SparkSchemaUtil.convert(requiredSchema()), requiredSchema().asStruct());
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java
index 389ad1d5a2..a2cb74c926 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java
@@ -96,7 +96,7 @@ class BatchDataReader extends BaseBatchReader<FileScanTask>
     SparkDeleteFilter deleteFilter =
         task.deletes().isEmpty()
             ? null
-            : new SparkDeleteFilter(filePath, task.deletes(), counter());
+            : new SparkDeleteFilter(filePath, task.deletes(), counter(), false);
 
     return newBatchIterable(
             inputFile,
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/ChangelogRowReader.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/ChangelogRowReader.java
index 572f955884..25cd9eda6b 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/ChangelogRowReader.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/ChangelogRowReader.java
@@ -111,13 +111,14 @@ class ChangelogRowReader extends BaseRowReader<ChangelogScanTask>
 
   CloseableIterable<InternalRow> openAddedRowsScanTask(AddedRowsScanTask task) {
     String filePath = task.file().path().toString();
-    SparkDeleteFilter deletes = new SparkDeleteFilter(filePath, task.deletes(), counter());
+    SparkDeleteFilter deletes = new SparkDeleteFilter(filePath, task.deletes(), counter(), true);
     return deletes.filter(rows(task, deletes.requiredSchema()));
   }
 
   private CloseableIterable<InternalRow> openDeletedDataFileScanTask(DeletedDataFileScanTask task) {
     String filePath = task.file().path().toString();
-    SparkDeleteFilter deletes = new SparkDeleteFilter(filePath, task.existingDeletes(), counter());
+    SparkDeleteFilter deletes =
+        new SparkDeleteFilter(filePath, task.existingDeletes(), counter(), true);
     return deletes.filter(rows(task, deletes.requiredSchema()));
   }
 
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/EqualityDeleteRowReader.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/EqualityDeleteRowReader.java
index f5b98a5a43..7aa5a97156 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/EqualityDeleteRowReader.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/EqualityDeleteRowReader.java
@@ -41,7 +41,7 @@ public class EqualityDeleteRowReader extends RowDataReader {
   @Override
   protected CloseableIterator<InternalRow> open(FileScanTask task) {
     SparkDeleteFilter matches =
-        new SparkDeleteFilter(task.file().path().toString(), task.deletes(), counter());
+        new SparkDeleteFilter(task.file().path().toString(), task.deletes(), counter(), true);
 
     // schema or rows returned by readers
     Schema requiredSchema = matches.requiredSchema();
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java
index 9356f62f35..33b1d6275d 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java
@@ -83,7 +83,8 @@ class RowDataReader extends BaseRowReader<FileScanTask> implements PartitionRead
   protected CloseableIterator<InternalRow> open(FileScanTask task) {
     String filePath = task.file().path().toString();
     LOG.debug("Opening data file {}", filePath);
-    SparkDeleteFilter deleteFilter = new SparkDeleteFilter(filePath, task.deletes(), counter());
+    SparkDeleteFilter deleteFilter =
+        new SparkDeleteFilter(filePath, task.deletes(), counter(), true);
 
     // schema or rows returned by readers
     Schema requiredSchema = deleteFilter.requiredSchema();
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java
index c05b694a60..49c4395213 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iceberg.spark.source;
 
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import org.apache.iceberg.FileFormat;
@@ -31,10 +32,12 @@ import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.orc.ORC;
 import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.spark.data.vectorized.VectorizedSparkOrcReaders;
 import org.apache.iceberg.spark.data.vectorized.VectorizedSparkParquetReaders;
 import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.types.Types;
 import org.apache.spark.sql.vectorized.ColumnarBatch;
 
 abstract class BaseBatchReader<T extends ScanTask> extends BaseReader<ColumnarBatch, T> {
@@ -81,9 +84,21 @@ abstract class BaseBatchReader<T extends ScanTask> extends BaseReader<ColumnarBa
       SparkDeleteFilter deleteFilter) {
     // get required schema if there are deletes
     Schema requiredSchema = deleteFilter != null ? deleteFilter.requiredSchema() : expectedSchema();
+    boolean hasPositionDelete = deleteFilter != null ? deleteFilter.hasPosDeletes() : false;
+    Schema projectedSchema = requiredSchema;
+    if (hasPositionDelete) {
+      // We need to add MetadataColumns.ROW_POSITION in the schema for
+      // ReadConf.generateOffsetToStartPos(Schema schema). This is not needed any
+      // more after #10107 is merged.
+      List<Types.NestedField> columns = Lists.newArrayList(requiredSchema.columns());
+      if (!columns.contains(MetadataColumns.ROW_POSITION)) {
+        columns.add(MetadataColumns.ROW_POSITION);
+        projectedSchema = new Schema(columns);
+      }
+    }
 
     return Parquet.read(inputFile)
-        .project(requiredSchema)
+        .project(projectedSchema)
         .split(start, length)
         .createBatchedReaderFunc(
             fileSchema ->
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/BaseReader.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/BaseReader.java
index 238069e1c9..8916408436 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/BaseReader.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/BaseReader.java
@@ -262,8 +262,9 @@ abstract class BaseReader<T, TaskT extends ScanTask> implements Closeable {
   protected class SparkDeleteFilter extends DeleteFilter<InternalRow> {
     private final InternalRowWrapper asStructLike;
 
-    SparkDeleteFilter(String filePath, List<DeleteFile> deletes, DeleteCounter counter) {
-      super(filePath, deletes, tableSchema, expectedSchema, counter);
+    SparkDeleteFilter(
+        String filePath, List<DeleteFile> deletes, DeleteCounter counter, boolean needRowPosCol) {
+      super(filePath, deletes, tableSchema, expectedSchema, counter, needRowPosCol);
       this.asStructLike =
           new InternalRowWrapper(
               SparkSchemaUtil.convert(requiredSchema()), requiredSchema().asStruct());
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java
index 389ad1d5a2..a2cb74c926 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java
@@ -96,7 +96,7 @@ class BatchDataReader extends BaseBatchReader<FileScanTask>
     SparkDeleteFilter deleteFilter =
         task.deletes().isEmpty()
             ? null
-            : new SparkDeleteFilter(filePath, task.deletes(), counter());
+            : new SparkDeleteFilter(filePath, task.deletes(), counter(), false);
 
     return newBatchIterable(
             inputFile,
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/ChangelogRowReader.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/ChangelogRowReader.java
index 572f955884..25cd9eda6b 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/ChangelogRowReader.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/ChangelogRowReader.java
@@ -111,13 +111,14 @@ class ChangelogRowReader extends BaseRowReader<ChangelogScanTask>
 
   CloseableIterable<InternalRow> openAddedRowsScanTask(AddedRowsScanTask task) {
     String filePath = task.file().path().toString();
-    SparkDeleteFilter deletes = new SparkDeleteFilter(filePath, task.deletes(), counter());
+    SparkDeleteFilter deletes = new SparkDeleteFilter(filePath, task.deletes(), counter(), true);
     return deletes.filter(rows(task, deletes.requiredSchema()));
   }
 
   private CloseableIterable<InternalRow> openDeletedDataFileScanTask(DeletedDataFileScanTask task) {
     String filePath = task.file().path().toString();
-    SparkDeleteFilter deletes = new SparkDeleteFilter(filePath, task.existingDeletes(), counter());
+    SparkDeleteFilter deletes =
+        new SparkDeleteFilter(filePath, task.existingDeletes(), counter(), true);
     return deletes.filter(rows(task, deletes.requiredSchema()));
   }
 
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/EqualityDeleteRowReader.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/EqualityDeleteRowReader.java
index f5b98a5a43..7aa5a97156 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/EqualityDeleteRowReader.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/EqualityDeleteRowReader.java
@@ -41,7 +41,7 @@ public class EqualityDeleteRowReader extends RowDataReader {
   @Override
   protected CloseableIterator<InternalRow> open(FileScanTask task) {
     SparkDeleteFilter matches =
-        new SparkDeleteFilter(task.file().path().toString(), task.deletes(), counter());
+        new SparkDeleteFilter(task.file().path().toString(), task.deletes(), counter(), true);
 
     // schema or rows returned by readers
     Schema requiredSchema = matches.requiredSchema();
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java
index 9356f62f35..33b1d6275d 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java
@@ -83,7 +83,8 @@ class RowDataReader extends BaseRowReader<FileScanTask> implements PartitionRead
   protected CloseableIterator<InternalRow> open(FileScanTask task) {
     String filePath = task.file().path().toString();
     LOG.debug("Opening data file {}", filePath);
-    SparkDeleteFilter deleteFilter = new SparkDeleteFilter(filePath, task.deletes(), counter());
+    SparkDeleteFilter deleteFilter =
+        new SparkDeleteFilter(filePath, task.deletes(), counter(), true);
 
     // schema or rows returned by readers
     Schema requiredSchema = deleteFilter.requiredSchema();
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java
index c05b694a60..49c4395213 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iceberg.spark.source;
 
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import org.apache.iceberg.FileFormat;
@@ -31,10 +32,12 @@ import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.orc.ORC;
 import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.spark.data.vectorized.VectorizedSparkOrcReaders;
 import org.apache.iceberg.spark.data.vectorized.VectorizedSparkParquetReaders;
 import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.types.Types;
 import org.apache.spark.sql.vectorized.ColumnarBatch;
 
 abstract class BaseBatchReader<T extends ScanTask> extends BaseReader<ColumnarBatch, T> {
@@ -81,9 +84,21 @@ abstract class BaseBatchReader<T extends ScanTask> extends BaseReader<ColumnarBa
       SparkDeleteFilter deleteFilter) {
     // get required schema if there are deletes
     Schema requiredSchema = deleteFilter != null ? deleteFilter.requiredSchema() : expectedSchema();
+    boolean hasPositionDelete = deleteFilter != null ? deleteFilter.hasPosDeletes() : false;
+    Schema projectedSchema = requiredSchema;
+    if (hasPositionDelete) {
+      // We need to add MetadataColumns.ROW_POSITION in the schema for
+      // ReadConf.generateOffsetToStartPos(Schema schema). This is not needed any
+      // more after #10107 is merged.
+      List<Types.NestedField> columns = Lists.newArrayList(requiredSchema.columns());
+      if (!columns.contains(MetadataColumns.ROW_POSITION)) {
+        columns.add(MetadataColumns.ROW_POSITION);
+        projectedSchema = new Schema(columns);
+      }
+    }
 
     return Parquet.read(inputFile)
-        .project(requiredSchema)
+        .project(projectedSchema)
         .split(start, length)
         .createBatchedReaderFunc(
             fileSchema ->
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseReader.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseReader.java
index 3c9438480d..207035bd30 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseReader.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseReader.java
@@ -249,8 +249,9 @@ abstract class BaseReader<T, TaskT extends ScanTask> implements Closeable {
   protected class SparkDeleteFilter extends DeleteFilter<InternalRow> {
     private final InternalRowWrapper asStructLike;
 
-    SparkDeleteFilter(String filePath, List<DeleteFile> deletes, DeleteCounter counter) {
-      super(filePath, deletes, tableSchema, expectedSchema, counter);
+    SparkDeleteFilter(
+        String filePath, List<DeleteFile> deletes, DeleteCounter counter, boolean needRowPosCol) {
+      super(filePath, deletes, tableSchema, expectedSchema, counter, needRowPosCol);
       this.asStructLike =
           new InternalRowWrapper(
               SparkSchemaUtil.convert(requiredSchema()), requiredSchema().asStruct());
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java
index 389ad1d5a2..a2cb74c926 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java
@@ -96,7 +96,7 @@ class BatchDataReader extends BaseBatchReader<FileScanTask>
     SparkDeleteFilter deleteFilter =
         task.deletes().isEmpty()
             ? null
-            : new SparkDeleteFilter(filePath, task.deletes(), counter());
+            : new SparkDeleteFilter(filePath, task.deletes(), counter(), false);
 
     return newBatchIterable(
             inputFile,
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/ChangelogRowReader.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/ChangelogRowReader.java
index 572f955884..25cd9eda6b 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/ChangelogRowReader.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/ChangelogRowReader.java
@@ -111,13 +111,14 @@ class ChangelogRowReader extends BaseRowReader<ChangelogScanTask>
 
   CloseableIterable<InternalRow> openAddedRowsScanTask(AddedRowsScanTask task) {
     String filePath = task.file().path().toString();
-    SparkDeleteFilter deletes = new SparkDeleteFilter(filePath, task.deletes(), counter());
+    SparkDeleteFilter deletes = new SparkDeleteFilter(filePath, task.deletes(), counter(), true);
     return deletes.filter(rows(task, deletes.requiredSchema()));
   }
 
   private CloseableIterable<InternalRow> openDeletedDataFileScanTask(DeletedDataFileScanTask task) {
     String filePath = task.file().path().toString();
-    SparkDeleteFilter deletes = new SparkDeleteFilter(filePath, task.existingDeletes(), counter());
+    SparkDeleteFilter deletes =
+        new SparkDeleteFilter(filePath, task.existingDeletes(), counter(), true);
     return deletes.filter(rows(task, deletes.requiredSchema()));
   }
 
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/EqualityDeleteRowReader.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/EqualityDeleteRowReader.java
index f5b98a5a43..7aa5a97156 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/EqualityDeleteRowReader.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/EqualityDeleteRowReader.java
@@ -41,7 +41,7 @@ public class EqualityDeleteRowReader extends RowDataReader {
   @Override
   protected CloseableIterator<InternalRow> open(FileScanTask task) {
     SparkDeleteFilter matches =
-        new SparkDeleteFilter(task.file().path().toString(), task.deletes(), counter());
+        new SparkDeleteFilter(task.file().path().toString(), task.deletes(), counter(), true);
 
     // schema or rows returned by readers
     Schema requiredSchema = matches.requiredSchema();
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java
index 9356f62f35..33b1d6275d 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java
@@ -83,7 +83,8 @@ class RowDataReader extends BaseRowReader<FileScanTask> implements PartitionRead
   protected CloseableIterator<InternalRow> open(FileScanTask task) {
     String filePath = task.file().path().toString();
     LOG.debug("Opening data file {}", filePath);
-    SparkDeleteFilter deleteFilter = new SparkDeleteFilter(filePath, task.deletes(), counter());
+    SparkDeleteFilter deleteFilter =
+        new SparkDeleteFilter(filePath, task.deletes(), counter(), true);
 
     // schema or rows returned by readers
     Schema requiredSchema = deleteFilter.requiredSchema();
