This is an automated email from the ASF dual-hosted git repository.

aokolnychyi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new d35a1f995d Spark 4.1: Add BaseSparkScanBuilder (#15360)
d35a1f995d is described below

commit d35a1f995d3efb687a6a451a864cbe1ff46a5a33
Author: Anton Okolnychyi <[email protected]>
AuthorDate: Wed Feb 18 21:05:46 2026 -0800

    Spark 4.1: Add BaseSparkScanBuilder (#15360)
---
 .../iceberg/spark/source/BaseSparkScanBuilder.java | 270 ++++++++++++++++
 .../iceberg/spark/source/SparkScanBuilder.java     | 356 +++++----------------
 .../spark/source/SparkStagedScanBuilder.java       |  61 +---
 .../iceberg/spark/source/TestFilteredScan.java     |   3 +-
 4 files changed, 359 insertions(+), 331 deletions(-)

diff --git 
a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/BaseSparkScanBuilder.java
 
b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/BaseSparkScanBuilder.java
new file mode 100644
index 0000000000..a7bf4bd187
--- /dev/null
+++ 
b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/BaseSparkScanBuilder.java
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source;
+
+import static org.apache.iceberg.TableProperties.SPLIT_LOOKBACK;
+import static org.apache.iceberg.TableProperties.SPLIT_OPEN_FILE_COST;
+import static org.apache.iceberg.TableProperties.SPLIT_SIZE;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.Scan;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.expressions.Binder;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.ExpressionUtil;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.metrics.InMemoryMetricsReporter;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.spark.SparkReadConf;
+import org.apache.iceberg.spark.SparkSchemaUtil;
+import org.apache.iceberg.spark.SparkV2Filters;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.types.TypeUtil.GetID;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.connector.expressions.filter.Predicate;
+import org.apache.spark.sql.connector.read.ScanBuilder;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A base Spark scan builder with common functionality like projection and 
predicate pushdown.
+ *
+ * <p>Note that this class intentionally doesn't implement any optional mix-in 
Spark interfaces even
+ * if it contains necessary logic, allowing each concrete scan implementation 
to select what
+ * functionality is applicable to that scan.
+ */
+abstract class BaseSparkScanBuilder implements ScanBuilder {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(BaseSparkScanBuilder.class);
+  private static final Predicate[] NO_PREDICATES = new Predicate[0];
+
+  private final SparkSession spark;
+  private final Table table;
+  private final Schema schema;
+  private final SparkReadConf readConf;
+  private final boolean caseSensitive;
+  private final Set<String> metaFieldNames = Sets.newLinkedHashSet();
+  private final InMemoryMetricsReporter metricsReporter = new 
InMemoryMetricsReporter();
+
+  private Schema projection;
+  private List<Expression> filters = Lists.newArrayList();
+  private Predicate[] pushedPredicates = NO_PREDICATES;
+  private Integer limit = null;
+
+  protected BaseSparkScanBuilder(
+      SparkSession spark, Table table, Schema schema, CaseInsensitiveStringMap 
options) {
+    this(spark, table, schema, null, options);
+  }
+
+  protected BaseSparkScanBuilder(
+      SparkSession spark,
+      Table table,
+      Schema schema,
+      String branch,
+      CaseInsensitiveStringMap options) {
+    this.spark = spark;
+    this.table = table;
+    this.schema = schema;
+    this.readConf = new SparkReadConf(spark, table, branch, options);
+    this.caseSensitive = readConf.caseSensitive();
+    this.projection = schema;
+  }
+
+  protected SparkSession spark() {
+    return spark;
+  }
+
+  protected Table table() {
+    return table;
+  }
+
+  protected Schema schema() {
+    return schema;
+  }
+
+  protected Schema projection() {
+    return projection;
+  }
+
+  protected SparkReadConf readConf() {
+    return readConf;
+  }
+
+  protected boolean caseSensitive() {
+    return caseSensitive;
+  }
+
+  protected List<Expression> filters() {
+    return filters;
+  }
+
+  protected Expression filter() {
+    return filters.stream().reduce(Expressions.alwaysTrue(), Expressions::and);
+  }
+
+  protected InMemoryMetricsReporter metricsReporter() {
+    return metricsReporter;
+  }
+
+  // logic necessary for SupportsPushDownRequiredColumns
+  public void pruneColumns(StructType requestedType) {
+    List<StructField> dataFields = Lists.newArrayList();
+
+    for (StructField field : requestedType.fields()) {
+      if (MetadataColumns.isMetadataColumn(field.name())) {
+        metaFieldNames.add(field.name());
+      } else {
+        dataFields.add(field);
+      }
+    }
+
+    StructType requestedDataType = SparkSchemaUtil.toStructType(dataFields);
+    this.projection = SparkSchemaUtil.prune(projection, requestedDataType, 
filter(), caseSensitive);
+  }
+
+  // logic necessary for SupportsPushDownV2Filters
+  public Predicate[] pushPredicates(Predicate[] predicates) {
+    // there are 3 kinds of filters:
+    // (1) filters that can be pushed down completely and don't have to 
evaluated by Spark
+    //     (e.g. filters that select entire partitions)
+    // (2) filters that can be pushed down partially and require record-level 
filtering in Spark
+    //     (e.g. filters that may select some but not necessarily all rows in 
a file)
+    // (3) filters that can't be pushed down at all and have to be evaluated 
by Spark
+    //     (e.g. unsupported filters)
+    // filters (1) and (2) are used to prune files during job planning in 
Iceberg
+    // filters (2) and (3) form a set of post scan filters and must be 
evaluated by Spark
+
+    List<Expression> expressions = 
Lists.newArrayListWithExpectedSize(predicates.length);
+    List<Predicate> pushablePredicates = 
Lists.newArrayListWithExpectedSize(predicates.length);
+    List<Predicate> postScanPredicates = 
Lists.newArrayListWithExpectedSize(predicates.length);
+
+    for (Predicate predicate : predicates) {
+      try {
+        Expression expr = SparkV2Filters.convert(predicate);
+
+        if (expr != null) {
+          // try binding the expression to ensure it can be pushed down
+          Binder.bind(projection.asStruct(), expr, caseSensitive);
+          expressions.add(expr);
+          pushablePredicates.add(predicate);
+        }
+
+        if (expr == null || !ExpressionUtil.selectsPartitions(expr, table, 
caseSensitive)) {
+          postScanPredicates.add(predicate);
+        } else {
+          LOG.info("Evaluating completely on Iceberg side: {}", predicate);
+        }
+
+      } catch (Exception e) {
+        LOG.warn("Failed to check if {} can be pushed down: {}", predicate, 
e.getMessage());
+        postScanPredicates.add(predicate);
+      }
+    }
+
+    this.filters = expressions;
+    this.pushedPredicates = pushablePredicates.toArray(new Predicate[0]);
+
+    return postScanPredicates.toArray(new Predicate[0]);
+  }
+
+  // logic necessary for SupportsPushDownV2Filters
+  public Predicate[] pushedPredicates() {
+    return pushedPredicates;
+  }
+
+  // logic necessary for SupportsPushDownLimit
+  public boolean pushLimit(int newLimit) {
+    this.limit = newLimit;
+    return true;
+  }
+
+  // schema of rows that must be returned by readers
+  protected Schema projectionWithMetadataColumns() {
+    return TypeUtil.join(projection, calculateMetadataSchema());
+  }
+
+  // computes metadata schema avoiding conflicts between partition and data 
field IDs
+  private Schema calculateMetadataSchema() {
+    List<Types.NestedField> metaFields = metaFields();
+    Optional<Types.NestedField> partitionField = 
findPartitionField(metaFields);
+
+    if (partitionField.isEmpty()) {
+      return new Schema(metaFields);
+    }
+
+    Types.StructType partitionType = 
partitionField.get().type().asStructType();
+    Set<Integer> partitionFieldIds = TypeUtil.getProjectedIds(partitionType);
+    GetID getId = TypeUtil.reassignConflictingIds(partitionFieldIds, 
allUsedFieldIds());
+    return new Schema(metaFields, getId);
+  }
+
+  private List<Types.NestedField> metaFields() {
+    return metaFieldNames.stream()
+        .map(name -> MetadataColumns.metadataColumn(table, name))
+        .collect(Collectors.toList());
+  }
+
+  private Optional<Types.NestedField> 
findPartitionField(List<Types.NestedField> fields) {
+    return fields.stream()
+        .filter(field -> MetadataColumns.PARTITION_COLUMN_ID == 
field.fieldId())
+        .findFirst();
+  }
+
+  // collects used data field IDs across all known table schemas
+  private Set<Integer> allUsedFieldIds() {
+    return table.schemas().values().stream()
+        .flatMap(tableSchema -> 
TypeUtil.getProjectedIds(tableSchema.asStruct()).stream())
+        .collect(Collectors.toSet());
+  }
+
+  protected <T extends Scan<T, ?, ?>> T configureSplitPlanning(T scan) {
+    T newScan = scan;
+
+    Long splitSize = readConf.splitSizeOption();
+    if (splitSize != null) {
+      newScan = newScan.option(SPLIT_SIZE, String.valueOf(splitSize));
+    }
+
+    Integer splitLookback = readConf.splitLookbackOption();
+    if (splitLookback != null) {
+      newScan = newScan.option(SPLIT_LOOKBACK, String.valueOf(splitLookback));
+    }
+
+    Long splitOpenFileCost = readConf.splitOpenFileCostOption();
+    if (splitOpenFileCost != null) {
+      newScan = newScan.option(SPLIT_OPEN_FILE_COST, 
String.valueOf(splitOpenFileCost));
+    }
+
+    if (limit != null) {
+      newScan = newScan.minRowsRequested(limit.longValue());
+    }
+
+    return newScan;
+  }
+}
diff --git 
a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
 
b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
index 2fb188d83d..8495ae2a47 100644
--- 
a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
+++ 
b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
@@ -20,15 +20,11 @@ package org.apache.iceberg.spark.source;
 
 import java.io.IOException;
 import java.util.List;
-import java.util.Optional;
-import java.util.Set;
-import java.util.stream.Collectors;
 import org.apache.iceberg.BaseTable;
 import org.apache.iceberg.BatchScan;
 import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.IncrementalAppendScan;
 import org.apache.iceberg.IncrementalChangelogScan;
-import org.apache.iceberg.MetadataColumns;
 import org.apache.iceberg.MetricsConfig;
 import org.apache.iceberg.MetricsModes;
 import org.apache.iceberg.Schema;
@@ -36,72 +32,47 @@ import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.SparkDistributedDataScan;
 import org.apache.iceberg.StructLike;
 import org.apache.iceberg.Table;
-import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.expressions.AggregateEvaluator;
 import org.apache.iceberg.expressions.Binder;
 import org.apache.iceberg.expressions.BoundAggregate;
 import org.apache.iceberg.expressions.Expression;
-import org.apache.iceberg.expressions.ExpressionUtil;
-import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.io.CloseableIterable;
-import org.apache.iceberg.metrics.InMemoryMetricsReporter;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
-import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.spark.Spark3Util;
 import org.apache.iceberg.spark.SparkAggregates;
 import org.apache.iceberg.spark.SparkReadConf;
 import org.apache.iceberg.spark.SparkReadOptions;
 import org.apache.iceberg.spark.SparkSchemaUtil;
-import org.apache.iceberg.spark.SparkV2Filters;
 import org.apache.iceberg.types.Type;
-import org.apache.iceberg.types.TypeUtil;
-import org.apache.iceberg.types.TypeUtil.GetID;
-import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.SnapshotUtil;
 import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.connector.expressions.aggregate.AggregateFunc;
 import org.apache.spark.sql.connector.expressions.aggregate.Aggregation;
-import org.apache.spark.sql.connector.expressions.filter.Predicate;
 import org.apache.spark.sql.connector.read.Scan;
-import org.apache.spark.sql.connector.read.ScanBuilder;
 import org.apache.spark.sql.connector.read.Statistics;
 import org.apache.spark.sql.connector.read.SupportsPushDownAggregates;
 import org.apache.spark.sql.connector.read.SupportsPushDownLimit;
 import org.apache.spark.sql.connector.read.SupportsPushDownRequiredColumns;
 import org.apache.spark.sql.connector.read.SupportsPushDownV2Filters;
 import org.apache.spark.sql.connector.read.SupportsReportStatistics;
-import org.apache.spark.sql.types.StructField;
 import org.apache.spark.sql.types.StructType;
 import org.apache.spark.sql.util.CaseInsensitiveStringMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class SparkScanBuilder
-    implements ScanBuilder,
-        SupportsPushDownAggregates,
-        SupportsPushDownV2Filters,
+public class SparkScanBuilder extends BaseSparkScanBuilder
+    implements SupportsPushDownV2Filters,
         SupportsPushDownRequiredColumns,
         SupportsReportStatistics,
-        SupportsPushDownLimit {
+        SupportsPushDownLimit,
+        SupportsPushDownAggregates {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(SparkScanBuilder.class);
-  private static final Predicate[] NO_PREDICATES = new Predicate[0];
-  private Scan localScan;
 
-  private final SparkSession spark;
-  private final Table table;
   private final CaseInsensitiveStringMap options;
-  private final SparkReadConf readConf;
-  private final Set<String> metaFieldNames = Sets.newLinkedHashSet();
-  private final InMemoryMetricsReporter metricsReporter;
-
-  private Schema projection;
-  private boolean caseSensitive;
-  private List<Expression> filterExpressions = null;
-  private Predicate[] pushedPredicates = NO_PREDICATES;
-  private Integer limit = null;
+  private Scan localScan;
 
   SparkScanBuilder(
       SparkSession spark,
@@ -109,13 +80,8 @@ public class SparkScanBuilder
       String branch,
       Schema schema,
       CaseInsensitiveStringMap options) {
-    this.spark = spark;
-    this.table = table;
-    this.projection = schema;
+    super(spark, table, schema, branch, options);
     this.options = options;
-    this.readConf = new SparkReadConf(spark, table, branch, options);
-    this.caseSensitive = readConf.caseSensitive();
-    this.metricsReporter = new InMemoryMetricsReporter();
   }
 
   SparkScanBuilder(SparkSession spark, Table table, CaseInsensitiveStringMap 
options) {
@@ -132,68 +98,6 @@ public class SparkScanBuilder
     this(spark, table, null, schema, options);
   }
 
-  private Expression filterExpression() {
-    if (filterExpressions != null) {
-      return filterExpressions.stream().reduce(Expressions.alwaysTrue(), 
Expressions::and);
-    }
-    return Expressions.alwaysTrue();
-  }
-
-  public SparkScanBuilder caseSensitive(boolean isCaseSensitive) {
-    this.caseSensitive = isCaseSensitive;
-    return this;
-  }
-
-  @Override
-  public Predicate[] pushPredicates(Predicate[] predicates) {
-    // there are 3 kinds of filters:
-    // (1) filters that can be pushed down completely and don't have to 
evaluated by Spark
-    //     (e.g. filters that select entire partitions)
-    // (2) filters that can be pushed down partially and require record-level 
filtering in Spark
-    //     (e.g. filters that may select some but not necessarily all rows in 
a file)
-    // (3) filters that can't be pushed down at all and have to be evaluated 
by Spark
-    //     (e.g. unsupported filters)
-    // filters (1) and (2) are used prune files during job planning in Iceberg
-    // filters (2) and (3) form a set of post scan filters and must be 
evaluated by Spark
-
-    List<Expression> expressions = 
Lists.newArrayListWithExpectedSize(predicates.length);
-    List<Predicate> pushableFilters = 
Lists.newArrayListWithExpectedSize(predicates.length);
-    List<Predicate> postScanFilters = 
Lists.newArrayListWithExpectedSize(predicates.length);
-
-    for (Predicate predicate : predicates) {
-      try {
-        Expression expr = SparkV2Filters.convert(predicate);
-
-        if (expr != null) {
-          // try binding the expression to ensure it can be pushed down
-          Binder.bind(projection.asStruct(), expr, caseSensitive);
-          expressions.add(expr);
-          pushableFilters.add(predicate);
-        }
-
-        if (expr == null || !ExpressionUtil.selectsPartitions(expr, table, 
caseSensitive)) {
-          postScanFilters.add(predicate);
-        } else {
-          LOG.info("Evaluating completely on Iceberg side: {}", predicate);
-        }
-
-      } catch (Exception e) {
-        LOG.warn("Failed to check if {} can be pushed down: {}", predicate, 
e.getMessage());
-        postScanFilters.add(predicate);
-      }
-    }
-
-    this.filterExpressions = expressions;
-    this.pushedPredicates = pushableFilters.toArray(new Predicate[0]);
-
-    return postScanFilters.toArray(new Predicate[0]);
-  }
-
-  @Override
-  public Predicate[] pushedPredicates() {
-    return pushedPredicates;
-  }
-
   @Override
   public boolean pushAggregation(Aggregation aggregation) {
     if (!canPushDownAggregation(aggregation)) {
@@ -208,7 +112,7 @@ public class SparkScanBuilder
       try {
         Expression expr = SparkAggregates.convert(aggregateFunc);
         if (expr != null) {
-          Expression bound = Binder.bind(projection.asStruct(), expr, 
caseSensitive);
+          Expression bound = Binder.bind(projection().asStruct(), expr, 
caseSensitive());
           expressions.add((BoundAggregate<?, ?>) bound);
         } else {
           LOG.info(
@@ -255,18 +159,17 @@ public class SparkScanBuilder
     StructLike structLike = aggregateEvaluator.result();
     pushedAggregateRows[0] =
         new 
StructInternalRow(aggregateEvaluator.resultType()).setStruct(structLike);
-    localScan =
-        new SparkLocalScan(table, pushedAggregateSchema, pushedAggregateRows, 
filterExpressions);
+    localScan = new SparkLocalScan(table(), pushedAggregateSchema, 
pushedAggregateRows, filters());
 
     return true;
   }
 
   private boolean canPushDownAggregation(Aggregation aggregation) {
-    if (!(table instanceof BaseTable)) {
+    if (!(table() instanceof BaseTable)) {
       return false;
     }
 
-    if (!readConf.aggregatePushDownEnabled()) {
+    if (!readConf().aggregatePushDownEnabled()) {
       return false;
     }
 
@@ -282,7 +185,7 @@ public class SparkScanBuilder
   }
 
   private boolean metricsModeSupportsAggregatePushDown(List<BoundAggregate<?, 
?>> aggregates) {
-    MetricsConfig config = MetricsConfig.forTable(table);
+    MetricsConfig config = MetricsConfig.forTable(table());
     for (BoundAggregate aggregate : aggregates) {
       String colName = aggregate.columnName();
       if (!colName.equals("*")) {
@@ -316,67 +219,6 @@ public class SparkScanBuilder
     return true;
   }
 
-  @Override
-  public void pruneColumns(StructType requestedSchema) {
-    List<StructField> dataFields = Lists.newArrayList();
-
-    for (StructField field : requestedSchema.fields()) {
-      if (MetadataColumns.isMetadataColumn(field.name())) {
-        metaFieldNames.add(field.name());
-      } else {
-        dataFields.add(field);
-      }
-    }
-
-    StructType requestedProjection = SparkSchemaUtil.toStructType(dataFields);
-    this.projection = prune(projection, requestedProjection);
-  }
-
-  // the projection should include all columns that will be returned,
-  // including those only used in filters
-  private Schema prune(Schema schema, StructType requestedSchema) {
-    return SparkSchemaUtil.prune(schema, requestedSchema, filterExpression(), 
caseSensitive);
-  }
-
-  // schema of rows that must be returned by readers
-  protected Schema projectionWithMetadataColumns() {
-    return TypeUtil.join(projection, calculateMetadataSchema());
-  }
-
-  // computes metadata schema avoiding conflicts between partition and data 
field IDs
-  private Schema calculateMetadataSchema() {
-    List<Types.NestedField> metaFields = metaFields();
-    Optional<Types.NestedField> partitionField = 
findPartitionField(metaFields);
-
-    if (partitionField.isEmpty()) {
-      return new Schema(metaFields);
-    }
-
-    Types.StructType partitionType = 
partitionField.get().type().asStructType();
-    Set<Integer> partitionFieldIds = TypeUtil.getProjectedIds(partitionType);
-    GetID getId = TypeUtil.reassignConflictingIds(partitionFieldIds, 
allUsedFieldIds());
-    return new Schema(metaFields, getId);
-  }
-
-  private List<Types.NestedField> metaFields() {
-    return metaFieldNames.stream()
-        .map(name -> MetadataColumns.metadataColumn(table, name))
-        .collect(Collectors.toList());
-  }
-
-  private Optional<Types.NestedField> 
findPartitionField(List<Types.NestedField> fields) {
-    return fields.stream()
-        .filter(field -> MetadataColumns.PARTITION_COLUMN_ID == 
field.fieldId())
-        .findFirst();
-  }
-
-  // collects used data field IDs across all known table schemas
-  private Set<Integer> allUsedFieldIds() {
-    return table.schemas().values().stream()
-        .flatMap(tableSchema -> 
TypeUtil.getProjectedIds(tableSchema.asStruct()).stream())
-        .collect(Collectors.toSet());
-  }
-
   @Override
   public Scan build() {
     if (localScan != null) {
@@ -389,20 +231,20 @@ public class SparkScanBuilder
   private Scan buildBatchScan() {
     Schema expectedSchema = projectionWithMetadataColumns();
     return new SparkBatchQueryScan(
-        spark,
-        table,
+        spark(),
+        table(),
         buildIcebergBatchScan(false /* not include Column Stats */, 
expectedSchema),
-        readConf,
+        readConf(),
         expectedSchema,
-        filterExpressions,
-        metricsReporter::scanReport);
+        filters(),
+        metricsReporter()::scanReport);
   }
 
   private org.apache.iceberg.Scan buildIcebergBatchScan(boolean withStats, 
Schema expectedSchema) {
-    Long snapshotId = readConf.snapshotId();
-    Long asOfTimestamp = readConf.asOfTimestamp();
-    String branch = readConf.branch();
-    String tag = readConf.tag();
+    Long snapshotId = readConf().snapshotId();
+    Long asOfTimestamp = readConf().asOfTimestamp();
+    String branch = readConf().branch();
+    String tag = readConf().tag();
 
     Preconditions.checkArgument(
         snapshotId == null || asOfTimestamp == null,
@@ -410,8 +252,8 @@ public class SparkScanBuilder
         SparkReadOptions.SNAPSHOT_ID,
         SparkReadOptions.AS_OF_TIMESTAMP);
 
-    Long startSnapshotId = readConf.startSnapshotId();
-    Long endSnapshotId = readConf.endSnapshotId();
+    Long startSnapshotId = readConf().startSnapshotId();
+    Long endSnapshotId = readConf().endSnapshotId();
 
     if (snapshotId != null || asOfTimestamp != null) {
       Preconditions.checkArgument(
@@ -429,8 +271,8 @@ public class SparkScanBuilder
         SparkReadOptions.END_SNAPSHOT_ID,
         SparkReadOptions.START_SNAPSHOT_ID);
 
-    Long startTimestamp = readConf.startTimestamp();
-    Long endTimestamp = readConf.endTimestamp();
+    Long startTimestamp = readConf().startTimestamp();
+    Long endTimestamp = readConf().endTimestamp();
     Preconditions.checkArgument(
         startTimestamp == null && endTimestamp == null,
         "Cannot set %s or %s for incremental scans and batch scan. They are 
only valid for "
@@ -454,10 +296,10 @@ public class SparkScanBuilder
       Schema expectedSchema) {
     BatchScan scan =
         newBatchScan()
-            .caseSensitive(caseSensitive)
-            .filter(filterExpression())
+            .caseSensitive(caseSensitive())
+            .filter(filter())
             .project(expectedSchema)
-            .metricsReporter(metricsReporter);
+            .metricsReporter(metricsReporter());
 
     if (withStats) {
       scan = scan.includeColumnStats();
@@ -485,13 +327,13 @@ public class SparkScanBuilder
   private org.apache.iceberg.Scan buildIncrementalAppendScan(
       long startSnapshotId, Long endSnapshotId, boolean withStats, Schema 
expectedSchema) {
     IncrementalAppendScan scan =
-        table
+        table()
             .newIncrementalAppendScan()
             .fromSnapshotExclusive(startSnapshotId)
-            .caseSensitive(caseSensitive)
-            .filter(filterExpression())
+            .caseSensitive(caseSensitive())
+            .filter(filter())
             .project(expectedSchema)
-            .metricsReporter(metricsReporter);
+            .metricsReporter(metricsReporter());
 
     if (withStats) {
       scan = scan.includeColumnStats();
@@ -507,20 +349,20 @@ public class SparkScanBuilder
   @SuppressWarnings("CyclomaticComplexity")
   public Scan buildChangelogScan() {
     Preconditions.checkArgument(
-        readConf.snapshotId() == null
-            && readConf.asOfTimestamp() == null
-            && readConf.branch() == null
-            && readConf.tag() == null,
+        readConf().snapshotId() == null
+            && readConf().asOfTimestamp() == null
+            && readConf().branch() == null
+            && readConf().tag() == null,
         "Cannot set neither %s, %s, %s and %s for changelogs",
         SparkReadOptions.SNAPSHOT_ID,
         SparkReadOptions.AS_OF_TIMESTAMP,
         SparkReadOptions.BRANCH,
         SparkReadOptions.TAG);
 
-    Long startSnapshotId = readConf.startSnapshotId();
-    Long endSnapshotId = readConf.endSnapshotId();
-    Long startTimestamp = readConf.startTimestamp();
-    Long endTimestamp = readConf.endTimestamp();
+    Long startSnapshotId = readConf().startSnapshotId();
+    Long endSnapshotId = readConf().endSnapshotId();
+    Long startTimestamp = readConf().startTimestamp();
+    Long endTimestamp = readConf().endTimestamp();
 
     Preconditions.checkArgument(
         !(startSnapshotId != null && startTimestamp != null),
@@ -544,8 +386,8 @@ public class SparkScanBuilder
 
     boolean emptyScan = false;
     if (startTimestamp != null) {
-      if (table.currentSnapshot() == null
-          || startTimestamp > table.currentSnapshot().timestampMillis()) {
+      if (table().currentSnapshot() == null
+          || startTimestamp > table().currentSnapshot().timestampMillis()) {
         emptyScan = true;
       }
       startSnapshotId = getStartSnapshotId(startTimestamp);
@@ -562,12 +404,12 @@ public class SparkScanBuilder
     Schema expectedSchema = projectionWithMetadataColumns();
 
     IncrementalChangelogScan scan =
-        table
+        table()
             .newIncrementalChangelogScan()
-            .caseSensitive(caseSensitive)
-            .filter(filterExpression())
+            .caseSensitive(caseSensitive())
+            .filter(filter())
             .project(expectedSchema)
-            .metricsReporter(metricsReporter);
+            .metricsReporter(metricsReporter());
 
     if (startSnapshotId != null) {
       scan = scan.fromSnapshotExclusive(startSnapshotId);
@@ -580,11 +422,11 @@ public class SparkScanBuilder
     scan = configureSplitPlanning(scan);
 
     return new SparkChangelogScan(
-        spark, table, scan, readConf, expectedSchema, filterExpressions, 
emptyScan);
+        spark(), table(), scan, readConf(), expectedSchema, filters(), 
emptyScan);
   }
 
   private Long getStartSnapshotId(Long startTimestamp) {
-    Snapshot oldestSnapshotAfter = SnapshotUtil.oldestAncestorAfter(table, 
startTimestamp);
+    Snapshot oldestSnapshotAfter = SnapshotUtil.oldestAncestorAfter(table(), 
startTimestamp);
 
     if (oldestSnapshotAfter == null) {
       return null;
@@ -597,7 +439,7 @@ public class SparkScanBuilder
 
   private Long getEndSnapshotId(Long endTimestamp) {
     Long endSnapshotId = null;
-    for (Snapshot snapshot : SnapshotUtil.currentAncestors(table)) {
+    for (Snapshot snapshot : SnapshotUtil.currentAncestors(table())) {
       if (snapshot.timestampMillis() <= endTimestamp) {
         endSnapshotId = snapshot.snapshotId();
         break;
@@ -608,29 +450,31 @@ public class SparkScanBuilder
 
   public Scan buildMergeOnReadScan() {
     Preconditions.checkArgument(
-        readConf.snapshotId() == null && readConf.asOfTimestamp() == null && 
readConf.tag() == null,
+        readConf().snapshotId() == null
+            && readConf().asOfTimestamp() == null
+            && readConf().tag() == null,
         "Cannot set time travel options %s, %s, %s for row-level command 
scans",
         SparkReadOptions.SNAPSHOT_ID,
         SparkReadOptions.AS_OF_TIMESTAMP,
         SparkReadOptions.TAG);
 
     Preconditions.checkArgument(
-        readConf.startSnapshotId() == null && readConf.endSnapshotId() == null,
+        readConf().startSnapshotId() == null && readConf().endSnapshotId() == 
null,
         "Cannot set incremental scan options %s and %s for row-level command 
scans",
         SparkReadOptions.START_SNAPSHOT_ID,
         SparkReadOptions.END_SNAPSHOT_ID);
 
-    Snapshot snapshot = SnapshotUtil.latestSnapshot(table, readConf.branch());
+    Snapshot snapshot = SnapshotUtil.latestSnapshot(table(), 
readConf().branch());
 
     if (snapshot == null) {
       return new SparkBatchQueryScan(
-          spark,
-          table,
+          spark(),
+          table(),
           null,
-          readConf,
+          readConf(),
           projectionWithMetadataColumns(),
-          filterExpressions,
-          metricsReporter::scanReport);
+          filters(),
+          metricsReporter()::scanReport);
     }
 
     // remember the current snapshot ID for commit validation
@@ -639,41 +483,41 @@ public class SparkScanBuilder
     CaseInsensitiveStringMap adjustedOptions =
         Spark3Util.setOption(SparkReadOptions.SNAPSHOT_ID, 
Long.toString(snapshotId), options);
     SparkReadConf adjustedReadConf =
-        new SparkReadConf(spark, table, readConf.branch(), adjustedOptions);
+        new SparkReadConf(spark(), table(), readConf().branch(), 
adjustedOptions);
 
     Schema expectedSchema = projectionWithMetadataColumns();
 
     BatchScan scan =
         newBatchScan()
             .useSnapshot(snapshotId)
-            .caseSensitive(caseSensitive)
-            .filter(filterExpression())
+            .caseSensitive(caseSensitive())
+            .filter(filter())
             .project(expectedSchema)
-            .metricsReporter(metricsReporter);
+            .metricsReporter(metricsReporter());
 
     scan = configureSplitPlanning(scan);
 
     return new SparkBatchQueryScan(
-        spark,
-        table,
+        spark(),
+        table(),
         scan,
         adjustedReadConf,
         expectedSchema,
-        filterExpressions,
-        metricsReporter::scanReport);
+        filters(),
+        metricsReporter()::scanReport);
   }
 
   public Scan buildCopyOnWriteScan() {
-    Snapshot snapshot = SnapshotUtil.latestSnapshot(table, readConf.branch());
+    Snapshot snapshot = SnapshotUtil.latestSnapshot(table(), 
readConf().branch());
 
     if (snapshot == null) {
       return new SparkCopyOnWriteScan(
-          spark,
-          table,
-          readConf,
+          spark(),
+          table(),
+          readConf(),
           projectionWithMetadataColumns(),
-          filterExpressions,
-          metricsReporter::scanReport);
+          filters(),
+          metricsReporter()::scanReport);
     }
 
     Schema expectedSchema = projectionWithMetadataColumns();
@@ -682,50 +526,22 @@ public class SparkScanBuilder
         newBatchScan()
             .useSnapshot(snapshot.snapshotId())
             .ignoreResiduals()
-            .caseSensitive(caseSensitive)
-            .filter(filterExpression())
+            .caseSensitive(caseSensitive())
+            .filter(filter())
             .project(expectedSchema)
-            .metricsReporter(metricsReporter);
+            .metricsReporter(metricsReporter());
 
     scan = configureSplitPlanning(scan);
 
     return new SparkCopyOnWriteScan(
-        spark,
-        table,
+        spark(),
+        table(),
         scan,
         snapshot,
-        readConf,
+        readConf(),
         expectedSchema,
-        filterExpressions,
-        metricsReporter::scanReport);
-  }
-
-  private <T extends org.apache.iceberg.Scan<T, ?, ?>> T 
configureSplitPlanning(T scan) {
-    T configuredScan = scan;
-
-    Long splitSize = readConf.splitSizeOption();
-    if (splitSize != null) {
-      configuredScan = configuredScan.option(TableProperties.SPLIT_SIZE, 
String.valueOf(splitSize));
-    }
-
-    Integer splitLookback = readConf.splitLookbackOption();
-    if (splitLookback != null) {
-      configuredScan =
-          configuredScan.option(TableProperties.SPLIT_LOOKBACK, 
String.valueOf(splitLookback));
-    }
-
-    Long splitOpenFileCost = readConf.splitOpenFileCostOption();
-    if (splitOpenFileCost != null) {
-      configuredScan =
-          configuredScan.option(
-              TableProperties.SPLIT_OPEN_FILE_COST, 
String.valueOf(splitOpenFileCost));
-    }
-
-    if (null != limit) {
-      configuredScan = configuredScan.minRowsRequested(limit.longValue());
-    }
-
-    return configuredScan;
+        filters(),
+        metricsReporter()::scanReport);
   }
 
   @Override
@@ -739,16 +555,10 @@ public class SparkScanBuilder
   }
 
   private BatchScan newBatchScan() {
-    if (readConf.distributedPlanningEnabled()) {
-      return new SparkDistributedDataScan(spark, table, readConf);
+    if (readConf().distributedPlanningEnabled()) {
+      return new SparkDistributedDataScan(spark(), table(), readConf());
     } else {
-      return table.newBatchScan();
+      return table().newBatchScan();
     }
   }
-
-  @Override
-  public boolean pushLimit(int pushedLimit) {
-    this.limit = pushedLimit;
-    return true;
-  }
 }
diff --git 
a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkStagedScanBuilder.java
 
b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkStagedScanBuilder.java
index 7164c53a3d..bf7d79ac34 100644
--- 
a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkStagedScanBuilder.java
+++ 
b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkStagedScanBuilder.java
@@ -18,78 +18,27 @@
  */
 package org.apache.iceberg.spark.source;
 
-import java.util.List;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-import org.apache.iceberg.MetadataColumns;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
-import org.apache.iceberg.relocated.com.google.common.collect.Lists;
-import org.apache.iceberg.spark.SparkReadConf;
-import org.apache.iceberg.spark.SparkSchemaUtil;
-import org.apache.iceberg.types.TypeUtil;
-import org.apache.iceberg.types.Types;
 import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.connector.read.Scan;
-import org.apache.spark.sql.connector.read.ScanBuilder;
 import org.apache.spark.sql.connector.read.SupportsPushDownRequiredColumns;
-import org.apache.spark.sql.types.StructField;
-import org.apache.spark.sql.types.StructType;
 import org.apache.spark.sql.util.CaseInsensitiveStringMap;
 
-class SparkStagedScanBuilder implements ScanBuilder, 
SupportsPushDownRequiredColumns {
+class SparkStagedScanBuilder extends BaseSparkScanBuilder
+    implements SupportsPushDownRequiredColumns {
 
-  private final SparkSession spark;
-  private final Table table;
   private final String taskSetId;
-  private final SparkReadConf readConf;
-  private final List<String> metaColumns = Lists.newArrayList();
-
-  private Schema schema;
 
   SparkStagedScanBuilder(
       SparkSession spark, Table table, String taskSetId, 
CaseInsensitiveStringMap options) {
-    this.spark = spark;
-    this.table = table;
+    super(spark, table, table.schema(), options);
     this.taskSetId = taskSetId;
-    this.readConf = new SparkReadConf(spark, table, options);
-    this.schema = table.schema();
   }
 
   @Override
   public Scan build() {
-    return new SparkStagedScan(spark, table, schemaWithMetadataColumns(), 
taskSetId, readConf);
-  }
-
-  @Override
-  public void pruneColumns(StructType requestedSchema) {
-    StructType requestedProjection = removeMetaColumns(requestedSchema);
-    this.schema = SparkSchemaUtil.prune(schema, requestedProjection);
-
-    Stream.of(requestedSchema.fields())
-        .map(StructField::name)
-        .filter(MetadataColumns::isMetadataColumn)
-        .distinct()
-        .forEach(metaColumns::add);
-  }
-
-  private StructType removeMetaColumns(StructType structType) {
-    return new StructType(
-        Stream.of(structType.fields())
-            .filter(field -> MetadataColumns.nonMetadataColumn(field.name()))
-            .toArray(StructField[]::new));
-  }
-
-  private Schema schemaWithMetadataColumns() {
-    // metadata columns
-    List<Types.NestedField> fields =
-        metaColumns.stream()
-            .distinct()
-            .map(name -> MetadataColumns.metadataColumn(table, name))
-            .collect(Collectors.toList());
-    Schema meta = new Schema(fields);
-
-    // schema of rows returned by readers
-    return TypeUtil.join(schema, meta);
+    Schema projection = projectionWithMetadataColumns();
+    return new SparkStagedScan(spark(), table(), projection, taskSetId, 
readConf());
   }
 }
diff --git 
a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestFilteredScan.java
 
b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestFilteredScan.java
index f2d7a7f7d5..b6c4a19980 100644
--- 
a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestFilteredScan.java
+++ 
b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestFilteredScan.java
@@ -223,8 +223,7 @@ public class TestFilteredScan {
 
       for (int i = 0; i < 10; i += 1) {
         SparkScanBuilder builder =
-            new SparkScanBuilder(spark, TABLES.load(options.get("path")), 
options)
-                .caseSensitive(false);
+            new SparkScanBuilder(spark, TABLES.load(options.get("path")), 
options);
 
         pushFilters(
             builder,


Reply via email to