This is an automated email from the ASF dual-hosted git repository.
szehon pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new 1c576c5952 Spark: Exclude reading _pos column if it's not in the scan list (#11390)
1c576c5952 is described below
commit 1c576c5952fbc623591e800408cdba1518e6a410
Author: Huaxin Gao <[email protected]>
AuthorDate: Fri Nov 8 15:57:37 2024 -0800
Spark: Exclude reading _pos column if it's not in the scan list (#11390)
---
.../java/org/apache/iceberg/data/DeleteFilter.java | 20 ++++++++++++++++----
.../apache/iceberg/spark/source/BaseBatchReader.java | 17 ++++++++++++++++-
.../org/apache/iceberg/spark/source/BaseReader.java | 5 +++--
.../apache/iceberg/spark/source/BatchDataReader.java | 2 +-
.../iceberg/spark/source/ChangelogRowReader.java | 5 +++--
.../spark/source/EqualityDeleteRowReader.java | 2 +-
.../apache/iceberg/spark/source/RowDataReader.java | 3 ++-
.../apache/iceberg/spark/source/BaseBatchReader.java | 17 ++++++++++++++++-
.../org/apache/iceberg/spark/source/BaseReader.java | 5 +++--
.../apache/iceberg/spark/source/BatchDataReader.java | 2 +-
.../iceberg/spark/source/ChangelogRowReader.java | 5 +++--
.../spark/source/EqualityDeleteRowReader.java | 2 +-
.../apache/iceberg/spark/source/RowDataReader.java | 3 ++-
.../apache/iceberg/spark/source/BaseBatchReader.java | 17 ++++++++++++++++-
.../org/apache/iceberg/spark/source/BaseReader.java | 5 +++--
.../apache/iceberg/spark/source/BatchDataReader.java | 2 +-
.../iceberg/spark/source/ChangelogRowReader.java | 5 +++--
.../spark/source/EqualityDeleteRowReader.java | 2 +-
.../apache/iceberg/spark/source/RowDataReader.java | 3 ++-
19 files changed, 94 insertions(+), 28 deletions(-)
diff --git a/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java
b/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java
index e7d8445cf8..aa5e00fd0e 100644
--- a/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java
+++ b/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java
@@ -69,7 +69,8 @@ public abstract class DeleteFilter<T> {
List<DeleteFile> deletes,
Schema tableSchema,
Schema requestedSchema,
- DeleteCounter counter) {
+ DeleteCounter counter,
+ boolean needRowPosCol) {
this.filePath = filePath;
this.counter = counter;
@@ -93,13 +94,23 @@ public abstract class DeleteFilter<T> {
this.posDeletes = posDeleteBuilder.build();
this.eqDeletes = eqDeleteBuilder.build();
- this.requiredSchema = fileProjection(tableSchema, requestedSchema,
posDeletes, eqDeletes);
+ this.requiredSchema =
+ fileProjection(tableSchema, requestedSchema, posDeletes, eqDeletes,
needRowPosCol);
this.posAccessor =
requiredSchema.accessorForField(MetadataColumns.ROW_POSITION.fieldId());
this.hasIsDeletedColumn =
requiredSchema.findField(MetadataColumns.IS_DELETED.fieldId()) != null;
this.isDeletedColumnPosition =
requiredSchema.columns().indexOf(MetadataColumns.IS_DELETED);
}
+ protected DeleteFilter(
+ String filePath,
+ List<DeleteFile> deletes,
+ Schema tableSchema,
+ Schema requestedSchema,
+ DeleteCounter counter) {
+ this(filePath, deletes, tableSchema, requestedSchema, counter, true);
+ }
+
protected DeleteFilter(
String filePath, List<DeleteFile> deletes, Schema tableSchema, Schema
requestedSchema) {
this(filePath, deletes, tableSchema, requestedSchema, new DeleteCounter());
@@ -251,13 +262,14 @@ public abstract class DeleteFilter<T> {
Schema tableSchema,
Schema requestedSchema,
List<DeleteFile> posDeletes,
- List<DeleteFile> eqDeletes) {
+ List<DeleteFile> eqDeletes,
+ boolean needRowPosCol) {
if (posDeletes.isEmpty() && eqDeletes.isEmpty()) {
return requestedSchema;
}
Set<Integer> requiredIds = Sets.newLinkedHashSet();
- if (!posDeletes.isEmpty()) {
+ if (needRowPosCol && !posDeletes.isEmpty()) {
requiredIds.add(MetadataColumns.ROW_POSITION.fieldId());
}
diff --git
a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java
b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java
index c05b694a60..49c4395213 100644
---
a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java
+++
b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java
@@ -18,6 +18,7 @@
*/
package org.apache.iceberg.spark.source;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.iceberg.FileFormat;
@@ -31,10 +32,12 @@ import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.orc.ORC;
import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.spark.data.vectorized.VectorizedSparkOrcReaders;
import org.apache.iceberg.spark.data.vectorized.VectorizedSparkParquetReaders;
import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.types.Types;
import org.apache.spark.sql.vectorized.ColumnarBatch;
abstract class BaseBatchReader<T extends ScanTask> extends
BaseReader<ColumnarBatch, T> {
@@ -81,9 +84,21 @@ abstract class BaseBatchReader<T extends ScanTask> extends
BaseReader<ColumnarBa
SparkDeleteFilter deleteFilter) {
// get required schema if there are deletes
Schema requiredSchema = deleteFilter != null ?
deleteFilter.requiredSchema() : expectedSchema();
+ boolean hasPositionDelete = deleteFilter != null ?
deleteFilter.hasPosDeletes() : false;
+ Schema projectedSchema = requiredSchema;
+ if (hasPositionDelete) {
+ // We need to add MetadataColumns.ROW_POSITION in the schema for
+ // ReadConf.generateOffsetToStartPos(Schema schema). This is not needed any
+ // more after #10107 is merged.
+ List<Types.NestedField> columns =
Lists.newArrayList(requiredSchema.columns());
+ if (!columns.contains(MetadataColumns.ROW_POSITION)) {
+ columns.add(MetadataColumns.ROW_POSITION);
+ projectedSchema = new Schema(columns);
+ }
+ }
return Parquet.read(inputFile)
- .project(requiredSchema)
+ .project(projectedSchema)
.split(start, length)
.createBatchedReaderFunc(
fileSchema ->
diff --git
a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/BaseReader.java
b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/BaseReader.java
index 0882edcb7c..96ff430c17 100644
---
a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/BaseReader.java
+++
b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/BaseReader.java
@@ -257,8 +257,9 @@ abstract class BaseReader<T, TaskT extends ScanTask>
implements Closeable {
protected class SparkDeleteFilter extends DeleteFilter<InternalRow> {
private final InternalRowWrapper asStructLike;
- SparkDeleteFilter(String filePath, List<DeleteFile> deletes, DeleteCounter
counter) {
- super(filePath, deletes, tableSchema, expectedSchema, counter);
+ SparkDeleteFilter(
+ String filePath, List<DeleteFile> deletes, DeleteCounter counter,
boolean needRowPosCol) {
+ super(filePath, deletes, tableSchema, expectedSchema, counter,
needRowPosCol);
this.asStructLike =
new InternalRowWrapper(
SparkSchemaUtil.convert(requiredSchema()),
requiredSchema().asStruct());
diff --git
a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java
b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java
index 389ad1d5a2..a2cb74c926 100644
---
a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java
+++
b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java
@@ -96,7 +96,7 @@ class BatchDataReader extends BaseBatchReader<FileScanTask>
SparkDeleteFilter deleteFilter =
task.deletes().isEmpty()
? null
- : new SparkDeleteFilter(filePath, task.deletes(), counter());
+ : new SparkDeleteFilter(filePath, task.deletes(), counter(),
false);
return newBatchIterable(
inputFile,
diff --git
a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/ChangelogRowReader.java
b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/ChangelogRowReader.java
index 572f955884..25cd9eda6b 100644
---
a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/ChangelogRowReader.java
+++
b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/ChangelogRowReader.java
@@ -111,13 +111,14 @@ class ChangelogRowReader extends
BaseRowReader<ChangelogScanTask>
CloseableIterable<InternalRow> openAddedRowsScanTask(AddedRowsScanTask task)
{
String filePath = task.file().path().toString();
- SparkDeleteFilter deletes = new SparkDeleteFilter(filePath,
task.deletes(), counter());
+ SparkDeleteFilter deletes = new SparkDeleteFilter(filePath,
task.deletes(), counter(), true);
return deletes.filter(rows(task, deletes.requiredSchema()));
}
private CloseableIterable<InternalRow>
openDeletedDataFileScanTask(DeletedDataFileScanTask task) {
String filePath = task.file().path().toString();
- SparkDeleteFilter deletes = new SparkDeleteFilter(filePath,
task.existingDeletes(), counter());
+ SparkDeleteFilter deletes =
+ new SparkDeleteFilter(filePath, task.existingDeletes(), counter(),
true);
return deletes.filter(rows(task, deletes.requiredSchema()));
}
diff --git
a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/EqualityDeleteRowReader.java
b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/EqualityDeleteRowReader.java
index f5b98a5a43..7aa5a97156 100644
---
a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/EqualityDeleteRowReader.java
+++
b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/EqualityDeleteRowReader.java
@@ -41,7 +41,7 @@ public class EqualityDeleteRowReader extends RowDataReader {
@Override
protected CloseableIterator<InternalRow> open(FileScanTask task) {
SparkDeleteFilter matches =
- new SparkDeleteFilter(task.file().path().toString(), task.deletes(),
counter());
+ new SparkDeleteFilter(task.file().path().toString(), task.deletes(),
counter(), true);
// schema or rows returned by readers
Schema requiredSchema = matches.requiredSchema();
diff --git
a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java
b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java
index 9356f62f35..33b1d6275d 100644
---
a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java
+++
b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java
@@ -83,7 +83,8 @@ class RowDataReader extends BaseRowReader<FileScanTask>
implements PartitionRead
protected CloseableIterator<InternalRow> open(FileScanTask task) {
String filePath = task.file().path().toString();
LOG.debug("Opening data file {}", filePath);
- SparkDeleteFilter deleteFilter = new SparkDeleteFilter(filePath,
task.deletes(), counter());
+ SparkDeleteFilter deleteFilter =
+ new SparkDeleteFilter(filePath, task.deletes(), counter(), true);
// schema or rows returned by readers
Schema requiredSchema = deleteFilter.requiredSchema();
diff --git
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java
index c05b694a60..49c4395213 100644
---
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java
+++
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java
@@ -18,6 +18,7 @@
*/
package org.apache.iceberg.spark.source;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.iceberg.FileFormat;
@@ -31,10 +32,12 @@ import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.orc.ORC;
import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.spark.data.vectorized.VectorizedSparkOrcReaders;
import org.apache.iceberg.spark.data.vectorized.VectorizedSparkParquetReaders;
import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.types.Types;
import org.apache.spark.sql.vectorized.ColumnarBatch;
abstract class BaseBatchReader<T extends ScanTask> extends
BaseReader<ColumnarBatch, T> {
@@ -81,9 +84,21 @@ abstract class BaseBatchReader<T extends ScanTask> extends
BaseReader<ColumnarBa
SparkDeleteFilter deleteFilter) {
// get required schema if there are deletes
Schema requiredSchema = deleteFilter != null ?
deleteFilter.requiredSchema() : expectedSchema();
+ boolean hasPositionDelete = deleteFilter != null ?
deleteFilter.hasPosDeletes() : false;
+ Schema projectedSchema = requiredSchema;
+ if (hasPositionDelete) {
+ // We need to add MetadataColumns.ROW_POSITION in the schema for
+ // ReadConf.generateOffsetToStartPos(Schema schema). This is not needed any
+ // more after #10107 is merged.
+ List<Types.NestedField> columns =
Lists.newArrayList(requiredSchema.columns());
+ if (!columns.contains(MetadataColumns.ROW_POSITION)) {
+ columns.add(MetadataColumns.ROW_POSITION);
+ projectedSchema = new Schema(columns);
+ }
+ }
return Parquet.read(inputFile)
- .project(requiredSchema)
+ .project(projectedSchema)
.split(start, length)
.createBatchedReaderFunc(
fileSchema ->
diff --git
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/BaseReader.java
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/BaseReader.java
index 238069e1c9..8916408436 100644
---
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/BaseReader.java
+++
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/BaseReader.java
@@ -262,8 +262,9 @@ abstract class BaseReader<T, TaskT extends ScanTask>
implements Closeable {
protected class SparkDeleteFilter extends DeleteFilter<InternalRow> {
private final InternalRowWrapper asStructLike;
- SparkDeleteFilter(String filePath, List<DeleteFile> deletes, DeleteCounter
counter) {
- super(filePath, deletes, tableSchema, expectedSchema, counter);
+ SparkDeleteFilter(
+ String filePath, List<DeleteFile> deletes, DeleteCounter counter,
boolean needRowPosCol) {
+ super(filePath, deletes, tableSchema, expectedSchema, counter,
needRowPosCol);
this.asStructLike =
new InternalRowWrapper(
SparkSchemaUtil.convert(requiredSchema()),
requiredSchema().asStruct());
diff --git
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java
index 389ad1d5a2..a2cb74c926 100644
---
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java
+++
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java
@@ -96,7 +96,7 @@ class BatchDataReader extends BaseBatchReader<FileScanTask>
SparkDeleteFilter deleteFilter =
task.deletes().isEmpty()
? null
- : new SparkDeleteFilter(filePath, task.deletes(), counter());
+ : new SparkDeleteFilter(filePath, task.deletes(), counter(),
false);
return newBatchIterable(
inputFile,
diff --git
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/ChangelogRowReader.java
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/ChangelogRowReader.java
index 572f955884..25cd9eda6b 100644
---
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/ChangelogRowReader.java
+++
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/ChangelogRowReader.java
@@ -111,13 +111,14 @@ class ChangelogRowReader extends
BaseRowReader<ChangelogScanTask>
CloseableIterable<InternalRow> openAddedRowsScanTask(AddedRowsScanTask task)
{
String filePath = task.file().path().toString();
- SparkDeleteFilter deletes = new SparkDeleteFilter(filePath,
task.deletes(), counter());
+ SparkDeleteFilter deletes = new SparkDeleteFilter(filePath,
task.deletes(), counter(), true);
return deletes.filter(rows(task, deletes.requiredSchema()));
}
private CloseableIterable<InternalRow>
openDeletedDataFileScanTask(DeletedDataFileScanTask task) {
String filePath = task.file().path().toString();
- SparkDeleteFilter deletes = new SparkDeleteFilter(filePath,
task.existingDeletes(), counter());
+ SparkDeleteFilter deletes =
+ new SparkDeleteFilter(filePath, task.existingDeletes(), counter(),
true);
return deletes.filter(rows(task, deletes.requiredSchema()));
}
diff --git
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/EqualityDeleteRowReader.java
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/EqualityDeleteRowReader.java
index f5b98a5a43..7aa5a97156 100644
---
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/EqualityDeleteRowReader.java
+++
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/EqualityDeleteRowReader.java
@@ -41,7 +41,7 @@ public class EqualityDeleteRowReader extends RowDataReader {
@Override
protected CloseableIterator<InternalRow> open(FileScanTask task) {
SparkDeleteFilter matches =
- new SparkDeleteFilter(task.file().path().toString(), task.deletes(),
counter());
+ new SparkDeleteFilter(task.file().path().toString(), task.deletes(),
counter(), true);
// schema or rows returned by readers
Schema requiredSchema = matches.requiredSchema();
diff --git
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java
index 9356f62f35..33b1d6275d 100644
---
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java
+++
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java
@@ -83,7 +83,8 @@ class RowDataReader extends BaseRowReader<FileScanTask>
implements PartitionRead
protected CloseableIterator<InternalRow> open(FileScanTask task) {
String filePath = task.file().path().toString();
LOG.debug("Opening data file {}", filePath);
- SparkDeleteFilter deleteFilter = new SparkDeleteFilter(filePath,
task.deletes(), counter());
+ SparkDeleteFilter deleteFilter =
+ new SparkDeleteFilter(filePath, task.deletes(), counter(), true);
// schema or rows returned by readers
Schema requiredSchema = deleteFilter.requiredSchema();
diff --git
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java
index c05b694a60..49c4395213 100644
---
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java
+++
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java
@@ -18,6 +18,7 @@
*/
package org.apache.iceberg.spark.source;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.iceberg.FileFormat;
@@ -31,10 +32,12 @@ import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.orc.ORC;
import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.spark.data.vectorized.VectorizedSparkOrcReaders;
import org.apache.iceberg.spark.data.vectorized.VectorizedSparkParquetReaders;
import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.types.Types;
import org.apache.spark.sql.vectorized.ColumnarBatch;
abstract class BaseBatchReader<T extends ScanTask> extends
BaseReader<ColumnarBatch, T> {
@@ -81,9 +84,21 @@ abstract class BaseBatchReader<T extends ScanTask> extends
BaseReader<ColumnarBa
SparkDeleteFilter deleteFilter) {
// get required schema if there are deletes
Schema requiredSchema = deleteFilter != null ?
deleteFilter.requiredSchema() : expectedSchema();
+ boolean hasPositionDelete = deleteFilter != null ?
deleteFilter.hasPosDeletes() : false;
+ Schema projectedSchema = requiredSchema;
+ if (hasPositionDelete) {
+ // We need to add MetadataColumns.ROW_POSITION in the schema for
+ // ReadConf.generateOffsetToStartPos(Schema schema). This is not needed any
+ // more after #10107 is merged.
+ List<Types.NestedField> columns =
Lists.newArrayList(requiredSchema.columns());
+ if (!columns.contains(MetadataColumns.ROW_POSITION)) {
+ columns.add(MetadataColumns.ROW_POSITION);
+ projectedSchema = new Schema(columns);
+ }
+ }
return Parquet.read(inputFile)
- .project(requiredSchema)
+ .project(projectedSchema)
.split(start, length)
.createBatchedReaderFunc(
fileSchema ->
diff --git
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseReader.java
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseReader.java
index 3c9438480d..207035bd30 100644
---
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseReader.java
+++
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseReader.java
@@ -249,8 +249,9 @@ abstract class BaseReader<T, TaskT extends ScanTask>
implements Closeable {
protected class SparkDeleteFilter extends DeleteFilter<InternalRow> {
private final InternalRowWrapper asStructLike;
- SparkDeleteFilter(String filePath, List<DeleteFile> deletes, DeleteCounter
counter) {
- super(filePath, deletes, tableSchema, expectedSchema, counter);
+ SparkDeleteFilter(
+ String filePath, List<DeleteFile> deletes, DeleteCounter counter,
boolean needRowPosCol) {
+ super(filePath, deletes, tableSchema, expectedSchema, counter,
needRowPosCol);
this.asStructLike =
new InternalRowWrapper(
SparkSchemaUtil.convert(requiredSchema()),
requiredSchema().asStruct());
diff --git
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java
index 389ad1d5a2..a2cb74c926 100644
---
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java
+++
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java
@@ -96,7 +96,7 @@ class BatchDataReader extends BaseBatchReader<FileScanTask>
SparkDeleteFilter deleteFilter =
task.deletes().isEmpty()
? null
- : new SparkDeleteFilter(filePath, task.deletes(), counter());
+ : new SparkDeleteFilter(filePath, task.deletes(), counter(),
false);
return newBatchIterable(
inputFile,
diff --git
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/ChangelogRowReader.java
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/ChangelogRowReader.java
index 572f955884..25cd9eda6b 100644
---
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/ChangelogRowReader.java
+++
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/ChangelogRowReader.java
@@ -111,13 +111,14 @@ class ChangelogRowReader extends
BaseRowReader<ChangelogScanTask>
CloseableIterable<InternalRow> openAddedRowsScanTask(AddedRowsScanTask task)
{
String filePath = task.file().path().toString();
- SparkDeleteFilter deletes = new SparkDeleteFilter(filePath,
task.deletes(), counter());
+ SparkDeleteFilter deletes = new SparkDeleteFilter(filePath,
task.deletes(), counter(), true);
return deletes.filter(rows(task, deletes.requiredSchema()));
}
private CloseableIterable<InternalRow>
openDeletedDataFileScanTask(DeletedDataFileScanTask task) {
String filePath = task.file().path().toString();
- SparkDeleteFilter deletes = new SparkDeleteFilter(filePath,
task.existingDeletes(), counter());
+ SparkDeleteFilter deletes =
+ new SparkDeleteFilter(filePath, task.existingDeletes(), counter(),
true);
return deletes.filter(rows(task, deletes.requiredSchema()));
}
diff --git
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/EqualityDeleteRowReader.java
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/EqualityDeleteRowReader.java
index f5b98a5a43..7aa5a97156 100644
---
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/EqualityDeleteRowReader.java
+++
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/EqualityDeleteRowReader.java
@@ -41,7 +41,7 @@ public class EqualityDeleteRowReader extends RowDataReader {
@Override
protected CloseableIterator<InternalRow> open(FileScanTask task) {
SparkDeleteFilter matches =
- new SparkDeleteFilter(task.file().path().toString(), task.deletes(),
counter());
+ new SparkDeleteFilter(task.file().path().toString(), task.deletes(),
counter(), true);
// schema or rows returned by readers
Schema requiredSchema = matches.requiredSchema();
diff --git
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java
index 9356f62f35..33b1d6275d 100644
---
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java
+++
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java
@@ -83,7 +83,8 @@ class RowDataReader extends BaseRowReader<FileScanTask>
implements PartitionRead
protected CloseableIterator<InternalRow> open(FileScanTask task) {
String filePath = task.file().path().toString();
LOG.debug("Opening data file {}", filePath);
- SparkDeleteFilter deleteFilter = new SparkDeleteFilter(filePath,
task.deletes(), counter());
+ SparkDeleteFilter deleteFilter =
+ new SparkDeleteFilter(filePath, task.deletes(), counter(), true);
// schema or rows returned by readers
Schema requiredSchema = deleteFilter.requiredSchema();