This is an automated email from the ASF dual-hosted git repository.

aokolnychyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new c4e35a55f2 Spark 3.4: Support distributed planning (#8123)
c4e35a55f2 is described below

commit c4e35a55f26b58ca71295adfe2b9c7b8bc34b55e
Author: Anton Okolnychyi <[email protected]>
AuthorDate: Tue Sep 12 14:06:15 2023 -0700

    Spark 3.4: Support distributed planning (#8123)
---
 .baseline/checkstyle/checkstyle.xml                |   1 +
 .../apache/iceberg/BaseDistributedDataScan.java    | 392 +++++++++++++++++++++
 .../src/main/java/org/apache/iceberg/BaseScan.java |  20 +-
 .../src/main/java/org/apache/iceberg/DataScan.java |  70 ++++
 .../java/org/apache/iceberg/DataTableScan.java     |   9 +-
 .../java/org/apache/iceberg/DeleteFileIndex.java   |   4 +
 .../java/org/apache/iceberg/ManifestGroup.java     |  21 +-
 .../ScanMetricsUtil.java => PlanningMode.java}     |  39 +-
 .../main/java/org/apache/iceberg/SnapshotScan.java |  10 +-
 .../java/org/apache/iceberg/TableProperties.java   |   4 +
 .../apache/iceberg/metrics/ScanMetricsUtil.java    |  14 +
 ...taTableScan.java => DataTableScanTestBase.java} |  65 ++--
 ...FileIndex.java => DeleteFileIndexTestBase.java} |  55 +--
 ...stFilterFiles.java => FilterFilesTestBase.java} |  25 +-
 .../apache/iceberg/TestLocalDataTableScan.java}    |  34 +-
 .../apache/iceberg/TestLocalDeleteFileIndex.java}  |  21 +-
 .../org/apache/iceberg/TestLocalFilterFiles.java}  |  28 +-
 .../SparkRowLevelOperationsTestBase.java           |  31 +-
 .../spark/extensions/TestCopyOnWriteDelete.java    |   7 +-
 .../spark/extensions/TestCopyOnWriteMerge.java     |   7 +-
 .../spark/extensions/TestCopyOnWriteUpdate.java    |   7 +-
 .../iceberg/spark/extensions/TestDelete.java       |   7 +-
 .../apache/iceberg/spark/extensions/TestMerge.java |   7 +-
 .../spark/extensions/TestMergeOnReadDelete.java    |   7 +-
 .../spark/extensions/TestMergeOnReadMerge.java     |   7 +-
 .../spark/extensions/TestMergeOnReadUpdate.java    |   7 +-
 .../iceberg/spark/extensions/TestUpdate.java       |   7 +-
 .../apache/iceberg/SparkDistributedDataScan.java   | 247 +++++++++++++
 .../org/apache/iceberg/spark/SparkReadConf.java    |  47 +++
 .../apache/iceberg/spark/SparkSQLProperties.java   |   6 +
 .../iceberg/spark/actions/BaseSparkAction.java     |   1 +
 .../iceberg/spark/actions/ManifestFileBean.java    |  28 +-
 .../iceberg/spark/source/SparkScanBuilder.java     |  15 +-
 .../iceberg/SparkDistributedDataScanTestBase.java  | 100 ++++++
 .../TestSparkDistributedDataScanDeletes.java       |  90 +++++
 .../TestSparkDistributedDataScanFilterFiles.java   |  91 +++++
 ...tSparkDistributedDataScanJavaSerialization.java |  32 +-
 ...tSparkDistributedDataScanKryoSerialization.java |  32 +-
 .../iceberg/spark/SparkTestBaseWithCatalog.java    |  16 +
 .../iceberg/spark/source/TestFilteredScan.java     |  31 +-
 .../spark/source/TestIdentityPartitionData.java    |  26 +-
 .../iceberg/spark/source/TestPartitionPruning.java |  26 +-
 .../iceberg/spark/source/TestRuntimeFiltering.java |  26 ++
 .../spark/source/TestSnapshotSelection.java        |  45 ++-
 .../spark/source/TestSparkReadProjection.java      |  30 +-
 .../iceberg/spark/sql/TestFilterPushDown.java      |  29 ++
 .../spark/sql/TestStoragePartitionedJoins.java     |  20 ++
 47 files changed, 1607 insertions(+), 237 deletions(-)

diff --git a/.baseline/checkstyle/checkstyle.xml 
b/.baseline/checkstyle/checkstyle.xml
index b2f9ef1244..aec14e30b2 100644
--- a/.baseline/checkstyle/checkstyle.xml
+++ b/.baseline/checkstyle/checkstyle.xml
@@ -120,6 +120,7 @@
                 org.apache.iceberg.NullOrder.*,
                 org.apache.iceberg.MetadataTableType.*,
                 org.apache.iceberg.MetadataColumns.*,
+                org.apache.iceberg.PlanningMode.*,
                 org.apache.iceberg.SortDirection.*,
                 org.apache.iceberg.TableProperties.*,
                 org.apache.iceberg.types.Type.*,
diff --git a/core/src/main/java/org/apache/iceberg/BaseDistributedDataScan.java 
b/core/src/main/java/org/apache/iceberg/BaseDistributedDataScan.java
new file mode 100644
index 0000000000..152b62b443
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/BaseDistributedDataScan.java
@@ -0,0 +1,392 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import static org.apache.iceberg.TableProperties.DATA_PLANNING_MODE;
+import static org.apache.iceberg.TableProperties.DELETE_PLANNING_MODE;
+import static org.apache.iceberg.TableProperties.PLANNING_MODE_DEFAULT;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutorService;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.ManifestEvaluator;
+import org.apache.iceberg.expressions.Projections;
+import org.apache.iceberg.expressions.ResidualEvaluator;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.metrics.ScanMetricsUtil;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.ParallelIterable;
+import org.apache.iceberg.util.TableScanUtil;
+import org.apache.iceberg.util.ThreadPools;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An abstract class for batch data scans that can utilize cluster resources 
for planning.
+ *
+ * <p>This class provides common logic to create data scans that are capable 
of reading and
+ * filtering manifests remotely when the metadata size exceeds the threshold 
for local processing.
+ * Also, it takes care of planning tasks locally if remote planning is not 
considered beneficial.
+ *
+ * <p>Note that this class is evolving and is subject to change even in minor 
releases.
+ */
+abstract class BaseDistributedDataScan
+    extends DataScan<BatchScan, ScanTask, ScanTaskGroup<ScanTask>> implements 
BatchScan {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(BaseDistributedDataScan.class);
+  private static final long LOCAL_PLANNING_MAX_SLOT_SIZE = 128L * 1024 * 1024; 
// 128 MB
+  private static final int MONITOR_POOL_SIZE = 2;
+
+  private final int localParallelism;
+  private final long localPlanningSizeThreshold;
+
+  protected BaseDistributedDataScan(Table table, Schema schema, 
TableScanContext context) {
+    super(table, schema, context);
+    this.localParallelism = PLAN_SCANS_WITH_WORKER_POOL ? 
ThreadPools.WORKER_THREAD_POOL_SIZE : 1;
+    this.localPlanningSizeThreshold = localParallelism * 
LOCAL_PLANNING_MAX_SLOT_SIZE;
+  }
+
+  /**
+   * Returns the cluster parallelism.
+   *
+   * <p>This value indicates the maximum number of manifests that can be 
processed concurrently by
+   * the cluster. Implementations should take into account both the currently 
available processing
+   * slots and potential dynamic allocation, if applicable.
+   *
+   * <p>The remote parallelism is compared against the size of the thread pool 
available locally to
+   * determine the feasibility of remote planning. This value is ignored if 
the planning mode is set
+   * explicitly as local or distributed.
+   */
+  protected abstract int remoteParallelism();
+
+  /** Returns which planning mode to use for data. */
+  protected PlanningMode dataPlanningMode() {
+    Map<String, String> properties = table().properties();
+    String modeName = properties.getOrDefault(DATA_PLANNING_MODE, 
PLANNING_MODE_DEFAULT);
+    return PlanningMode.fromName(modeName);
+  }
+
+  /**
+   * Controls whether defensive copies are created for remotely planned data 
files.
+   *
+   * <p>By default, this class creates defensive copies for each data file 
that is planned remotely,
+   * assuming the provided iterable can be lazy and may reuse objects. If 
unnecessary and data file
+   * objects can be safely added into a collection, implementations can 
override this behavior.
+   */
+  protected boolean shouldCopyRemotelyPlannedDataFiles() {
+    return true;
+  }
+
+  /**
+   * Plans data remotely.
+   *
+   * <p>Implementations are encouraged to return groups of matching data 
files, enabling this class
+   * to process multiple groups concurrently to speed up the remaining work. 
This is particularly
+   * useful when dealing with equality deletes, as delete index lookups with 
such delete files
+   * require comparing bounds and typically benefit from parallelization.
+   *
+   * <p>If the result iterable reuses objects, {@link 
#shouldCopyRemotelyPlannedDataFiles()} must
+   * return true.
+   *
+   * <p>The input data manifests have already been filtered to include only 
potential matches based
+   * on the scan filter. Implementations are expected to further filter these 
manifests and only
+   * return files that may hold data matching the scan filter.
+   *
+   * @param dataManifests data manifests that may contain files matching the 
scan filter
+   * @param withColumnStats a flag whether to load column stats
+   * @return groups of data files planned remotely
+   */
+  protected abstract Iterable<CloseableIterable<DataFile>> planDataRemotely(
+      List<ManifestFile> dataManifests, boolean withColumnStats);
+
+  /** Returns which planning mode to use for deletes. */
+  protected PlanningMode deletePlanningMode() {
+    Map<String, String> properties = table().properties();
+    String modeName = properties.getOrDefault(DELETE_PLANNING_MODE, 
PLANNING_MODE_DEFAULT);
+    return PlanningMode.fromName(modeName);
+  }
+
+  /**
+   * Plans deletes remotely.
+   *
+   * <p>The input delete manifests have already been filtered to include only 
potential matches
+   * based on the scan filter. Implementations are expected to further filter 
these manifests and
+   * return files that may hold deletes matching the scan filter.
+   *
+   * @param deleteManifests delete manifests that may contain files matching 
the scan filter
+   * @return a delete file index planned remotely
+   */
+  protected abstract DeleteFileIndex planDeletesRemotely(List<ManifestFile> 
deleteManifests);
+
+  @Override
+  protected CloseableIterable<ScanTask> doPlanFiles() {
+    Snapshot snapshot = snapshot();
+
+    List<ManifestFile> dataManifests = findMatchingDataManifests(snapshot);
+    boolean planDataLocally = shouldPlanLocally(dataPlanningMode(), 
dataManifests);
+
+    List<ManifestFile> deleteManifests = findMatchingDeleteManifests(snapshot);
+    boolean planDeletesLocally = shouldPlanLocally(deletePlanningMode(), 
deleteManifests);
+
+    if (planDataLocally && planDeletesLocally) {
+      return planFileTasksLocally(dataManifests, deleteManifests);
+    }
+
+    boolean mayHaveEqualityDeletes = deleteManifests.size() > 0 && 
mayHaveEqualityDeletes(snapshot);
+    boolean loadColumnStats = mayHaveEqualityDeletes || 
shouldReturnColumnStats();
+    boolean copyDataFiles = shouldCopyDataFiles(planDataLocally, 
loadColumnStats);
+
+    ExecutorService monitorPool = newMonitorPool();
+
+    CompletableFuture<DeleteFileIndex> deletesFuture =
+        newDeletesFuture(deleteManifests, planDeletesLocally, monitorPool);
+
+    CompletableFuture<Iterable<CloseableIterable<DataFile>>> dataFuture =
+        newDataFuture(dataManifests, planDataLocally, loadColumnStats, 
monitorPool);
+
+    try {
+      Iterable<CloseableIterable<ScanTask>> fileTasks =
+          toFileTasks(dataFuture, deletesFuture, copyDataFiles);
+
+      if (shouldPlanWithExecutor() && (planDataLocally || 
mayHaveEqualityDeletes)) {
+        return new ParallelIterable<>(fileTasks, planExecutor());
+      } else {
+        return CloseableIterable.concat(fileTasks);
+      }
+
+    } catch (CompletionException e) {
+      deletesFuture.cancel(true /* may interrupt */);
+      dataFuture.cancel(true /* may interrupt */);
+      throw new RuntimeException("Failed to plan files", e);
+
+    } finally {
+      monitorPool.shutdown();
+    }
+  }
+
+  @Override
+  public CloseableIterable<ScanTaskGroup<ScanTask>> planTasks() {
+    return TableScanUtil.planTaskGroups(
+        planFiles(), targetSplitSize(), splitLookback(), splitOpenFileCost());
+  }
+
+  private List<ManifestFile> findMatchingDataManifests(Snapshot snapshot) {
+    List<ManifestFile> dataManifests = snapshot.dataManifests(io());
+    scanMetrics().totalDataManifests().increment(dataManifests.size());
+
+    List<ManifestFile> matchingDataManifests = filterManifests(dataManifests);
+    int skippedDataManifestsCount = dataManifests.size() - 
matchingDataManifests.size();
+    scanMetrics().skippedDataManifests().increment(skippedDataManifestsCount);
+
+    return matchingDataManifests;
+  }
+
+  private List<ManifestFile> findMatchingDeleteManifests(Snapshot snapshot) {
+    List<ManifestFile> deleteManifests = snapshot.deleteManifests(io());
+    scanMetrics().totalDeleteManifests().increment(deleteManifests.size());
+
+    List<ManifestFile> matchingDeleteManifests = 
filterManifests(deleteManifests);
+    int skippedDeleteManifestsCount = deleteManifests.size() - 
matchingDeleteManifests.size();
+    
scanMetrics().skippedDeleteManifests().increment(skippedDeleteManifestsCount);
+
+    return matchingDeleteManifests;
+  }
+
+  private List<ManifestFile> filterManifests(List<ManifestFile> manifests) {
+    Map<Integer, ManifestEvaluator> evalCache = 
specCache(this::newManifestEvaluator);
+
+    return manifests.stream()
+        .filter(manifest -> manifest.hasAddedFiles() || 
manifest.hasExistingFiles())
+        .filter(manifest -> 
evalCache.get(manifest.partitionSpecId()).eval(manifest))
+        .collect(Collectors.toList());
+  }
+
+  protected boolean shouldPlanLocally(PlanningMode mode, List<ManifestFile> 
manifests) {
+    if (context().planWithCustomizedExecutor()) {
+      return true;
+    }
+
+    switch (mode) {
+      case LOCAL:
+        return true;
+
+      case DISTRIBUTED:
+        return manifests.isEmpty();
+
+      case AUTO:
+        return remoteParallelism() <= localParallelism
+            || manifests.size() <= 2 * localParallelism
+            || totalSize(manifests) <= localPlanningSizeThreshold;
+
+      default:
+        throw new IllegalArgumentException("Unknown planning mode: " + mode);
+    }
+  }
+
+  private long totalSize(List<ManifestFile> manifests) {
+    return manifests.stream().mapToLong(ManifestFile::length).sum();
+  }
+
+  private boolean shouldCopyDataFiles(boolean planDataLocally, boolean 
loadColumnStats) {
+    return planDataLocally
+        || shouldCopyRemotelyPlannedDataFiles()
+        || (loadColumnStats && !shouldReturnColumnStats());
+  }
+
+  @SuppressWarnings("unchecked")
+  private CloseableIterable<ScanTask> planFileTasksLocally(
+      List<ManifestFile> dataManifests, List<ManifestFile> deleteManifests) {
+    LOG.info("Planning file tasks locally for table {}", table().name());
+    ManifestGroup manifestGroup = newManifestGroup(dataManifests, 
deleteManifests);
+    CloseableIterable<? extends ScanTask> fileTasks = 
manifestGroup.planFiles();
+    return (CloseableIterable<ScanTask>) fileTasks;
+  }
+
+  private CompletableFuture<DeleteFileIndex> newDeletesFuture(
+      List<ManifestFile> deleteManifests, boolean planLocally, ExecutorService 
monitorPool) {
+
+    return CompletableFuture.supplyAsync(
+        () -> {
+          if (planLocally) {
+            LOG.info("Planning deletes locally for table {}", table().name());
+            return planDeletesLocally(deleteManifests);
+          } else {
+            LOG.info("Planning deletes remotely for table {}", table().name());
+            return planDeletesRemotely(deleteManifests);
+          }
+        },
+        monitorPool);
+  }
+
+  private DeleteFileIndex planDeletesLocally(List<ManifestFile> 
deleteManifests) {
+    DeleteFileIndex.Builder builder = DeleteFileIndex.builderFor(io(), 
deleteManifests);
+
+    if (shouldPlanWithExecutor() && deleteManifests.size() > 1) {
+      builder.planWith(planExecutor());
+    }
+
+    return builder
+        .specsById(table().specs())
+        .filterData(filter())
+        .caseSensitive(isCaseSensitive())
+        .scanMetrics(scanMetrics())
+        .build();
+  }
+
+  private CompletableFuture<Iterable<CloseableIterable<DataFile>>> 
newDataFuture(
+      List<ManifestFile> dataManifests,
+      boolean planLocally,
+      boolean withColumnStats,
+      ExecutorService monitorPool) {
+
+    return CompletableFuture.supplyAsync(
+        () -> {
+          if (planLocally) {
+            LOG.info("Planning data locally for table {}", table().name());
+            ManifestGroup manifestGroup = newManifestGroup(dataManifests, 
withColumnStats);
+            return manifestGroup.fileGroups();
+          } else {
+            LOG.info("Planning data remotely for table {}", table().name());
+            return planDataRemotely(dataManifests, withColumnStats);
+          }
+        },
+        monitorPool);
+  }
+
+  private Iterable<CloseableIterable<ScanTask>> toFileTasks(
+      CompletableFuture<Iterable<CloseableIterable<DataFile>>> dataFuture,
+      CompletableFuture<DeleteFileIndex> deletesFuture,
+      boolean copyDataFiles) {
+
+    String schemaString = SchemaParser.toJson(tableSchema());
+    Map<Integer, String> specStringCache = 
specCache(PartitionSpecParser::toJson);
+    Map<Integer, ResidualEvaluator> residualCache = 
specCache(this::newResidualEvaluator);
+
+    Iterable<CloseableIterable<DataFile>> dataFileGroups = dataFuture.join();
+
+    return Iterables.transform(
+        dataFileGroups,
+        dataFiles ->
+            toFileTasks(
+                dataFiles,
+                deletesFuture,
+                copyDataFiles,
+                schemaString,
+                specStringCache,
+                residualCache));
+  }
+
+  private CloseableIterable<ScanTask> toFileTasks(
+      CloseableIterable<DataFile> dataFiles,
+      CompletableFuture<DeleteFileIndex> deletesFuture,
+      boolean copyDataFiles,
+      String schemaString,
+      Map<Integer, String> specStringCache,
+      Map<Integer, ResidualEvaluator> residualCache) {
+
+    return CloseableIterable.transform(
+        dataFiles,
+        dataFile -> {
+          DeleteFile[] deleteFiles = 
deletesFuture.join().forDataFile(dataFile);
+
+          String specString = specStringCache.get(dataFile.specId());
+          ResidualEvaluator residuals = residualCache.get(dataFile.specId());
+
+          ScanMetricsUtil.fileTask(scanMetrics(), dataFile, deleteFiles);
+
+          return new BaseFileScanTask(
+              copyDataFiles ? dataFile.copy(shouldReturnColumnStats()) : 
dataFile,
+              deleteFiles,
+              schemaString,
+              specString,
+              residuals);
+        });
+  }
+
+  private ManifestEvaluator newManifestEvaluator(PartitionSpec spec) {
+    Expression projection = Projections.inclusive(spec, 
isCaseSensitive()).project(filter());
+    return ManifestEvaluator.forPartitionFilter(projection, spec, 
isCaseSensitive());
+  }
+
+  private ResidualEvaluator newResidualEvaluator(PartitionSpec spec) {
+    return ResidualEvaluator.of(spec, residualFilter(), isCaseSensitive());
+  }
+
+  private <R> Map<Integer, R> specCache(Function<PartitionSpec, R> load) {
+    Map<Integer, R> cache = Maps.newHashMap();
+    table().specs().forEach((specId, spec) -> cache.put(specId, 
load.apply(spec)));
+    return cache;
+  }
+
+  private boolean mayHaveEqualityDeletes(Snapshot snapshot) {
+    String count = 
snapshot.summary().get(SnapshotSummary.TOTAL_EQ_DELETES_PROP);
+    return count == null || !count.equals("0");
+  }
+
+  // a monitor pool that enables planning data and deletes concurrently if 
remote planning is used
+  private ExecutorService newMonitorPool() {
+    return ThreadPools.newWorkerPool("iceberg-planning-monitor-service", 
MONITOR_POOL_SIZE);
+  }
+}
diff --git a/core/src/main/java/org/apache/iceberg/BaseScan.java 
b/core/src/main/java/org/apache/iceberg/BaseScan.java
index 953ad754aa..17e6bc0445 100644
--- a/core/src/main/java/org/apache/iceberg/BaseScan.java
+++ b/core/src/main/java/org/apache/iceberg/BaseScan.java
@@ -27,6 +27,7 @@ import java.util.concurrent.ExecutorService;
 import org.apache.iceberg.expressions.Binder;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.metrics.MetricsReporter;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
@@ -58,7 +59,7 @@ abstract class BaseScan<ThisT, T extends ScanTask, G extends 
ScanTaskGroup<T>>
           "upper_bounds",
           "column_sizes");
 
-  private static final List<String> SCAN_WITH_STATS_COLUMNS =
+  protected static final List<String> SCAN_WITH_STATS_COLUMNS =
       
ImmutableList.<String>builder().addAll(SCAN_COLUMNS).addAll(STATS_COLUMNS).build();
 
   protected static final List<String> DELETE_SCAN_COLUMNS =
@@ -73,12 +74,13 @@ abstract class BaseScan<ThisT, T extends ScanTask, G 
extends ScanTaskGroup<T>>
           "record_count",
           "partition",
           "key_metadata",
-          "split_offsets");
+          "split_offsets",
+          "equality_ids");
 
   protected static final List<String> DELETE_SCAN_WITH_STATS_COLUMNS =
       
ImmutableList.<String>builder().addAll(DELETE_SCAN_COLUMNS).addAll(STATS_COLUMNS).build();
 
-  private static final boolean PLAN_SCANS_WITH_WORKER_POOL =
+  protected static final boolean PLAN_SCANS_WITH_WORKER_POOL =
       SystemConfigs.SCAN_THREAD_POOL_ENABLED.value();
 
   private final Table table;
@@ -95,6 +97,10 @@ abstract class BaseScan<ThisT, T extends ScanTask, G extends 
ScanTaskGroup<T>>
     return table;
   }
 
+  protected FileIO io() {
+    return table.io();
+  }
+
   protected Schema tableSchema() {
     return schema;
   }
@@ -111,10 +117,18 @@ abstract class BaseScan<ThisT, T extends ScanTask, G 
extends ScanTaskGroup<T>>
     return context.returnColumnStats() ? SCAN_WITH_STATS_COLUMNS : 
SCAN_COLUMNS;
   }
 
+  protected boolean shouldReturnColumnStats() {
+    return context().returnColumnStats();
+  }
+
   protected boolean shouldIgnoreResiduals() {
     return context().ignoreResiduals();
   }
 
+  protected Expression residualFilter() {
+    return shouldIgnoreResiduals() ? Expressions.alwaysTrue() : filter();
+  }
+
   protected boolean shouldPlanWithExecutor() {
     return PLAN_SCANS_WITH_WORKER_POOL || 
context().planWithCustomizedExecutor();
   }
diff --git a/core/src/main/java/org/apache/iceberg/DataScan.java 
b/core/src/main/java/org/apache/iceberg/DataScan.java
new file mode 100644
index 0000000000..8de48740b9
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/DataScan.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import java.util.List;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+
+abstract class DataScan<ThisT, T extends ScanTask, G extends ScanTaskGroup<T>>
+    extends SnapshotScan<ThisT, T, G> {
+
+  protected DataScan(Table table, Schema schema, TableScanContext context) {
+    super(table, schema, context);
+  }
+
+  @Override
+  protected boolean useSnapshotSchema() {
+    return true;
+  }
+
+  protected ManifestGroup newManifestGroup(
+      List<ManifestFile> dataManifests, List<ManifestFile> deleteManifests) {
+    return newManifestGroup(dataManifests, deleteManifests, 
context().returnColumnStats());
+  }
+
+  protected ManifestGroup newManifestGroup(
+      List<ManifestFile> dataManifests, boolean withColumnStats) {
+    return newManifestGroup(dataManifests, ImmutableList.of(), 
withColumnStats);
+  }
+
+  protected ManifestGroup newManifestGroup(
+      List<ManifestFile> dataManifests,
+      List<ManifestFile> deleteManifests,
+      boolean withColumnStats) {
+
+    ManifestGroup manifestGroup =
+        new ManifestGroup(io(), dataManifests, deleteManifests)
+            .caseSensitive(isCaseSensitive())
+            .select(withColumnStats ? SCAN_WITH_STATS_COLUMNS : SCAN_COLUMNS)
+            .filterData(filter())
+            .specsById(table().specs())
+            .scanMetrics(scanMetrics())
+            .ignoreDeleted();
+
+    if (shouldIgnoreResiduals()) {
+      manifestGroup = manifestGroup.ignoreResiduals();
+    }
+
+    if (shouldPlanWithExecutor() && (dataManifests.size() > 1 || 
deleteManifests.size() > 1)) {
+      manifestGroup = manifestGroup.planWith(planExecutor());
+    }
+
+    return manifestGroup;
+  }
+}
diff --git a/core/src/main/java/org/apache/iceberg/DataTableScan.java 
b/core/src/main/java/org/apache/iceberg/DataTableScan.java
index 6eaa0d5ec7..67d2b0ef35 100644
--- a/core/src/main/java/org/apache/iceberg/DataTableScan.java
+++ b/core/src/main/java/org/apache/iceberg/DataTableScan.java
@@ -22,7 +22,6 @@ import java.util.List;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
-import org.apache.iceberg.util.SnapshotUtil;
 
 public class DataTableScan extends BaseTableScan {
   protected DataTableScan(Table table, Schema schema, TableScanContext 
context) {
@@ -52,12 +51,8 @@ public class DataTableScan extends BaseTableScan {
   }
 
   @Override
-  public TableScan useSnapshot(long scanSnapshotId) {
-    // call method in superclass just for the side effect of argument 
validation;
-    // we do not use its return value
-    super.useSnapshot(scanSnapshotId);
-    Schema snapshotSchema = SnapshotUtil.schemaFor(table(), scanSnapshotId);
-    return newRefinedScan(table(), snapshotSchema, 
context().useSnapshotId(scanSnapshotId));
+  protected boolean useSnapshotSchema() {
+    return true;
   }
 
   @Override
diff --git a/core/src/main/java/org/apache/iceberg/DeleteFileIndex.java 
b/core/src/main/java/org/apache/iceberg/DeleteFileIndex.java
index 6f16794d5b..51917a71e9 100644
--- a/core/src/main/java/org/apache/iceberg/DeleteFileIndex.java
+++ b/core/src/main/java/org/apache/iceberg/DeleteFileIndex.java
@@ -142,6 +142,10 @@ class DeleteFileIndex {
     return forDataFile(entry.dataSequenceNumber(), entry.file());
   }
 
+  DeleteFile[] forDataFile(DataFile file) {
+    return forDataFile(file.dataSequenceNumber(), file);
+  }
+
   DeleteFile[] forDataFile(long sequenceNumber, DataFile file) {
     if (isEmpty) {
       return NO_DELETES;
diff --git a/core/src/main/java/org/apache/iceberg/ManifestGroup.java 
b/core/src/main/java/org/apache/iceberg/ManifestGroup.java
index 08449a1043..027b8764a9 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestGroup.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestGroup.java
@@ -37,6 +37,7 @@ import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.CloseableIterator;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.metrics.ScanMetrics;
+import org.apache.iceberg.metrics.ScanMetricsUtil;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
@@ -222,6 +223,19 @@ class ManifestGroup {
     return CloseableIterable.concat(entries((manifest, entries) -> entries));
   }
 
+  /**
+   * Returns an iterable for groups of data files in the set of manifests.
+   *
+   * <p>Files are not copied, it is the caller's responsibility to make 
defensive copies if adding
+   * these files to a collection.
+   *
+   * @return an iterable of file groups
+   */
+  public Iterable<CloseableIterable<DataFile>> fileGroups() {
+    return entries(
+        (manifest, entries) -> CloseableIterable.transform(entries, 
ManifestEntry::file));
+  }
+
   private <T> Iterable<CloseableIterable<T>> entries(
       BiFunction<ManifestFile, CloseableIterable<ManifestEntry<DataFile>>, 
CloseableIterable<T>>
           entryFn) {
@@ -349,12 +363,7 @@ class ManifestGroup {
         entry -> {
           DataFile dataFile = entry.file().copy(ctx.shouldKeepStats());
           DeleteFile[] deleteFiles = ctx.deletes().forEntry(entry);
-          for (DeleteFile deleteFile : deleteFiles) {
-            
ctx.scanMetrics().totalDeleteFileSizeInBytes().increment(deleteFile.fileSizeInBytes());
-          }
-          
ctx.scanMetrics().totalFileSizeInBytes().increment(dataFile.fileSizeInBytes());
-          ctx.scanMetrics().resultDataFiles().increment();
-          ctx.scanMetrics().resultDeleteFiles().increment((long) 
deleteFiles.length);
+          ScanMetricsUtil.fileTask(ctx.scanMetrics(), dataFile, deleteFiles);
           return new BaseFileScanTask(
               dataFile, deleteFiles, ctx.schemaAsString(), ctx.specAsString(), 
ctx.residuals());
         });
diff --git a/core/src/main/java/org/apache/iceberg/metrics/ScanMetricsUtil.java 
b/core/src/main/java/org/apache/iceberg/PlanningMode.java
similarity index 50%
copy from core/src/main/java/org/apache/iceberg/metrics/ScanMetricsUtil.java
copy to core/src/main/java/org/apache/iceberg/PlanningMode.java
index 102f48ee19..ef448fb22c 100644
--- a/core/src/main/java/org/apache/iceberg/metrics/ScanMetricsUtil.java
+++ b/core/src/main/java/org/apache/iceberg/PlanningMode.java
@@ -16,22 +16,39 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iceberg.metrics;
+package org.apache.iceberg;
 
-import org.apache.iceberg.DeleteFile;
-import org.apache.iceberg.FileContent;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 
-public class ScanMetricsUtil {
+public enum PlanningMode {
+  AUTO("auto"),
+  LOCAL("local"),
+  DISTRIBUTED("distributed");
 
-  private ScanMetricsUtil() {}
+  private final String modeName;
 
-  public static void indexedDeleteFile(ScanMetrics metrics, DeleteFile 
deleteFile) {
-    metrics.indexedDeleteFiles().increment();
+  PlanningMode(String modeName) {
+    this.modeName = modeName;
+  }
+
+  public static PlanningMode fromName(String modeName) {
+    Preconditions.checkArgument(modeName != null, "Mode name is null");
+
+    if (AUTO.modeName().equalsIgnoreCase(modeName)) {
+      return AUTO;
+
+    } else if (LOCAL.modeName().equalsIgnoreCase(modeName)) {
+      return LOCAL;
 
-    if (deleteFile.content() == FileContent.POSITION_DELETES) {
-      metrics.positionalDeleteFiles().increment();
-    } else if (deleteFile.content() == FileContent.EQUALITY_DELETES) {
-      metrics.equalityDeleteFiles().increment();
+    } else if (DISTRIBUTED.modeName().equalsIgnoreCase(modeName)) {
+      return DISTRIBUTED;
+
+    } else {
+      throw new IllegalArgumentException("Unknown planning mode: " + modeName);
     }
   }
+
+  public String modeName() {
+    return modeName;
+  }
 }
diff --git a/core/src/main/java/org/apache/iceberg/SnapshotScan.java 
b/core/src/main/java/org/apache/iceberg/SnapshotScan.java
index de53444ba9..a98a8c9f13 100644
--- a/core/src/main/java/org/apache/iceberg/SnapshotScan.java
+++ b/core/src/main/java/org/apache/iceberg/SnapshotScan.java
@@ -66,6 +66,11 @@ public abstract class SnapshotScan<ThisT, T extends 
ScanTask, G extends ScanTask
 
   protected abstract CloseableIterable<T> doPlanFiles();
 
+  // controls whether to use the snapshot schema while time travelling
+  protected boolean useSnapshotSchema() {
+    return false;
+  }
+
   protected ScanMetrics scanMetrics() {
     if (scanMetrics == null) {
       this.scanMetrics = ScanMetrics.of(new DefaultMetricsContext());
@@ -81,7 +86,10 @@ public abstract class SnapshotScan<ThisT, T extends 
ScanTask, G extends ScanTask
         table().snapshot(scanSnapshotId) != null,
         "Cannot find snapshot with ID %s",
         scanSnapshotId);
-    return newRefinedScan(table(), tableSchema(), 
context().useSnapshotId(scanSnapshotId));
+    Schema newSchema =
+        useSnapshotSchema() ? SnapshotUtil.schemaFor(table(), scanSnapshotId) 
: tableSchema();
+    TableScanContext newContext = context().useSnapshotId(scanSnapshotId);
+    return newRefinedScan(table(), newSchema, newContext);
   }
 
   public ThisT useRef(String name) {
diff --git a/core/src/main/java/org/apache/iceberg/TableProperties.java 
b/core/src/main/java/org/apache/iceberg/TableProperties.java
index a9116bc57f..03e1f3ce88 100644
--- a/core/src/main/java/org/apache/iceberg/TableProperties.java
+++ b/core/src/main/java/org/apache/iceberg/TableProperties.java
@@ -231,6 +231,10 @@ public class TableProperties {
   public static final String ORC_BATCH_SIZE = 
"read.orc.vectorization.batch-size";
   public static final int ORC_BATCH_SIZE_DEFAULT = 5000;
 
+  public static final String DATA_PLANNING_MODE = "read.data-planning-mode";
+  public static final String DELETE_PLANNING_MODE = 
"read.delete-planning-mode";
+  public static final String PLANNING_MODE_DEFAULT = 
PlanningMode.AUTO.modeName();
+
   public static final String OBJECT_STORE_ENABLED = 
"write.object-storage.enabled";
   public static final boolean OBJECT_STORE_ENABLED_DEFAULT = false;
 
diff --git a/core/src/main/java/org/apache/iceberg/metrics/ScanMetricsUtil.java 
b/core/src/main/java/org/apache/iceberg/metrics/ScanMetricsUtil.java
index 102f48ee19..c5aa6e1dd6 100644
--- a/core/src/main/java/org/apache/iceberg/metrics/ScanMetricsUtil.java
+++ b/core/src/main/java/org/apache/iceberg/metrics/ScanMetricsUtil.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iceberg.metrics;
 
+import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DeleteFile;
 import org.apache.iceberg.FileContent;
 
@@ -34,4 +35,17 @@ public class ScanMetricsUtil {
       metrics.equalityDeleteFiles().increment();
     }
   }
+
+  public static void fileTask(ScanMetrics metrics, DataFile dataFile, 
DeleteFile[] deleteFiles) {
+    metrics.totalFileSizeInBytes().increment(dataFile.fileSizeInBytes());
+    metrics.resultDataFiles().increment();
+    metrics.resultDeleteFiles().increment(deleteFiles.length);
+
+    long deletesSizeInBytes = 0L;
+    for (DeleteFile deleteFile : deleteFiles) {
+      deletesSizeInBytes += deleteFile.fileSizeInBytes();
+    }
+
+    metrics.totalDeleteFileSizeInBytes().increment(deletesSizeInBytes);
+  }
 }
diff --git a/core/src/test/java/org/apache/iceberg/TestDataTableScan.java 
b/core/src/test/java/org/apache/iceberg/DataTableScanTestBase.java
similarity index 81%
rename from core/src/test/java/org/apache/iceberg/TestDataTableScan.java
rename to core/src/test/java/org/apache/iceberg/DataTableScanTestBase.java
index 8541a96c8d..7133a5a761 100644
--- a/core/src/test/java/org/apache/iceberg/TestDataTableScan.java
+++ b/core/src/test/java/org/apache/iceberg/DataTableScanTestBase.java
@@ -30,15 +30,19 @@ import org.junit.Assert;
 import org.junit.Assume;
 import org.junit.Test;
 
-public class TestDataTableScan extends ScanTestBase<TableScan, FileScanTask, 
CombinedScanTask> {
-  public TestDataTableScan(int formatVersion) {
+public abstract class DataTableScanTestBase<
+        ScanT extends Scan<ScanT, T, G>, T extends ScanTask, G extends 
ScanTaskGroup<T>>
+    extends ScanTestBase<ScanT, T, G> {
+
+  public DataTableScanTestBase(int formatVersion) {
     super(formatVersion);
   }
 
-  @Override
-  protected TableScan newScan() {
-    return table.newScan();
-  }
+  protected abstract ScanT useRef(ScanT scan, String ref);
+
+  protected abstract ScanT useSnapshot(ScanT scan, long snapshotId);
+
+  protected abstract ScanT asOfTime(ScanT scan, long timestampMillis);
 
   @Test
   public void testTaskRowCounts() {
@@ -56,17 +60,17 @@ public class TestDataTableScan extends 
ScanTestBase<TableScan, FileScanTask, Com
     DeleteFile deleteFile2 = newDeleteFile("data_bucket=1");
     table.newRowDelta().addDeletes(deleteFile2).commit();
 
-    TableScan scan = table.newScan().option(TableProperties.SPLIT_SIZE, "50");
+    ScanT scan = newScan().option(TableProperties.SPLIT_SIZE, "50");
 
-    List<FileScanTask> fileScanTasks = Lists.newArrayList(scan.planFiles());
+    List<T> fileScanTasks = Lists.newArrayList(scan.planFiles());
     Assert.assertEquals("Must have 2 FileScanTasks", 2, fileScanTasks.size());
-    for (FileScanTask task : fileScanTasks) {
+    for (T task : fileScanTasks) {
       Assert.assertEquals("Rows count must match", 10, 
task.estimatedRowsCount());
     }
 
-    List<CombinedScanTask> combinedScanTasks = 
Lists.newArrayList(scan.planTasks());
+    List<G> combinedScanTasks = Lists.newArrayList(scan.planTasks());
     Assert.assertEquals("Must have 4 CombinedScanTask", 4, 
combinedScanTasks.size());
-    for (CombinedScanTask task : combinedScanTasks) {
+    for (G task : combinedScanTasks) {
       Assert.assertEquals("Rows count must match", 5, 
task.estimatedRowsCount());
     }
   }
@@ -100,11 +104,11 @@ public class TestDataTableScan extends 
ScanTestBase<TableScan, FileScanTask, Com
     // Add D to main
     table.newFastAppend().appendFile(FILE_D).commit();
 
-    TableScan testBranchScan = table.newScan().useRef("testBranch");
+    ScanT testBranchScan = useRef(newScan(), "testBranch");
     validateExpectedFileScanTasks(
         testBranchScan, ImmutableList.of(FILE_A.path(), FILE_B.path(), 
FILE_C.path()));
 
-    TableScan mainScan = table.newScan();
+    ScanT mainScan = newScan();
     validateExpectedFileScanTasks(mainScan, ImmutableList.of(FILE_A.path(), 
FILE_D.path()));
   }
 
@@ -113,9 +117,9 @@ public class TestDataTableScan extends 
ScanTestBase<TableScan, FileScanTask, Com
     table.newFastAppend().appendFile(FILE_A).appendFile(FILE_B).commit();
     table.manageSnapshots().createTag("tagB", 
table.currentSnapshot().snapshotId()).commit();
     table.newFastAppend().appendFile(FILE_C).commit();
-    TableScan tagScan = table.newScan().useRef("tagB");
+    ScanT tagScan = useRef(newScan(), "tagB");
     validateExpectedFileScanTasks(tagScan, ImmutableList.of(FILE_A.path(), 
FILE_B.path()));
-    TableScan mainScan = table.newScan();
+    ScanT mainScan = newScan();
     validateExpectedFileScanTasks(
         mainScan, ImmutableList.of(FILE_A.path(), FILE_B.path(), 
FILE_C.path()));
   }
@@ -126,7 +130,7 @@ public class TestDataTableScan extends 
ScanTestBase<TableScan, FileScanTask, Com
     table.manageSnapshots().createTag("tagB", 
table.currentSnapshot().snapshotId()).commit();
 
     Assertions.assertThatThrownBy(
-            () -> 
table.newScan().useSnapshot(table.currentSnapshot().snapshotId()).useRef("tagB"))
+            () -> useRef(useSnapshot(newScan(), 
table.currentSnapshot().snapshotId()), "tagB"))
         .isInstanceOf(IllegalArgumentException.class)
         .hasMessage("Cannot override ref, already set snapshot id=1");
   }
@@ -139,7 +143,7 @@ public class TestDataTableScan extends 
ScanTestBase<TableScan, FileScanTask, Com
     table.manageSnapshots().createTag("tagB", 
table.currentSnapshot().snapshotId()).commit();
 
     Assertions.assertThatThrownBy(
-            () -> 
table.newScan().useRef("tagB").useSnapshot(snapshotA.snapshotId()))
+            () -> useSnapshot(useRef(newScan(), "tagB"), 
snapshotA.snapshotId()))
         .isInstanceOf(IllegalArgumentException.class)
         .hasMessage("Cannot override snapshot, already set snapshot id=2");
   }
@@ -153,7 +157,7 @@ public class TestDataTableScan extends 
ScanTestBase<TableScan, FileScanTask, Com
         .commit();
 
     Assertions.assertThatThrownBy(
-            () -> 
table.newScan().useRef("testBranch").asOfTime(System.currentTimeMillis()))
+            () -> asOfTime(useRef(newScan(), "testBranch"), 
System.currentTimeMillis()))
         .isInstanceOf(IllegalArgumentException.class)
         .hasMessage("Cannot override snapshot, already set snapshot id=1");
   }
@@ -165,24 +169,26 @@ public class TestDataTableScan extends 
ScanTestBase<TableScan, FileScanTask, Com
     table.newFastAppend().appendFile(FILE_B).commit();
     table.manageSnapshots().createTag("tagB", 
table.currentSnapshot().snapshotId()).commit();
 
-    Assertions.assertThatThrownBy(() -> 
table.newScan().useRef("tagB").useRef("tagA"))
+    Assertions.assertThatThrownBy(() -> useRef(useRef(newScan(), "tagB"), 
"tagA"))
         .isInstanceOf(IllegalArgumentException.class)
         .hasMessage("Cannot override ref, already set snapshot id=2");
   }
 
   @Test
   public void testSettingInvalidRefFails() {
-    Assertions.assertThatThrownBy(() -> table.newScan().useRef("nonexisting"))
+    Assertions.assertThatThrownBy(() -> useRef(newScan(), "nonexisting"))
         .isInstanceOf(IllegalArgumentException.class)
         .hasMessage("Cannot find ref nonexisting");
   }
 
-  private void validateExpectedFileScanTasks(
-      TableScan scan, List<CharSequence> expectedFileScanPaths) throws 
IOException {
-    try (CloseableIterable<FileScanTask> scanTasks = scan.planFiles()) {
+  private void validateExpectedFileScanTasks(ScanT scan, List<CharSequence> 
expectedFileScanPaths)
+      throws IOException {
+    try (CloseableIterable<T> scanTasks = scan.planFiles()) {
       Assert.assertEquals(expectedFileScanPaths.size(), 
Iterables.size(scanTasks));
       List<CharSequence> actualFiles = Lists.newArrayList();
-      scanTasks.forEach(task -> actualFiles.add(task.file().path()));
+      for (T task : scanTasks) {
+        actualFiles.add(((FileScanTask) task).file().path());
+      }
       Assert.assertTrue(actualFiles.containsAll(expectedFileScanPaths));
     }
   }
@@ -203,12 +209,13 @@ public class TestDataTableScan extends 
ScanTestBase<TableScan, FileScanTask, Com
     DeleteFile deleteFile2 = newDeleteFile("data_bucket=1");
     table.newRowDelta().addDeletes(deleteFile2).commit();
 
-    TableScan scan = table.newScan();
+    ScanT scan = newScan();
 
-    List<FileScanTask> fileScanTasks = Lists.newArrayList(scan.planFiles());
+    List<T> fileScanTasks = Lists.newArrayList(scan.planFiles());
     Assert.assertEquals("Must have 2 FileScanTasks", 2, fileScanTasks.size());
-    for (FileScanTask task : fileScanTasks) {
-      DataFile file = task.file();
+    for (T task : fileScanTasks) {
+      FileScanTask fileScanTask = (FileScanTask) task;
+      DataFile file = fileScanTask.file();
       long expectedDataSequenceNumber = 0L;
       long expectedDeleteSequenceNumber = 0L;
       if (file.path().equals(dataFile1.path())) {
@@ -230,7 +237,7 @@ public class TestDataTableScan extends 
ScanTestBase<TableScan, FileScanTask, Com
           expectedDataSequenceNumber,
           file.fileSequenceNumber().longValue());
 
-      List<DeleteFile> deleteFiles = task.deletes();
+      List<DeleteFile> deleteFiles = fileScanTask.deletes();
       Assert.assertEquals("Must have 1 delete file", 1, 
Iterables.size(deleteFiles));
       DeleteFile deleteFile = Iterables.getOnlyElement(deleteFiles);
       Assert.assertEquals(
diff --git a/core/src/test/java/org/apache/iceberg/TestDeleteFileIndex.java 
b/core/src/test/java/org/apache/iceberg/DeleteFileIndexTestBase.java
similarity index 92%
rename from core/src/test/java/org/apache/iceberg/TestDeleteFileIndex.java
rename to core/src/test/java/org/apache/iceberg/DeleteFileIndexTestBase.java
index 4e574690f7..6f33be0948 100644
--- a/core/src/test/java/org/apache/iceberg/TestDeleteFileIndex.java
+++ b/core/src/test/java/org/apache/iceberg/DeleteFileIndexTestBase.java
@@ -33,8 +33,11 @@ import 
org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.junit.Assert;
 import org.junit.Test;
 
-public class TestDeleteFileIndex extends TableTestBase {
-  public TestDeleteFileIndex() {
+public abstract class DeleteFileIndexTestBase<
+        ScanT extends Scan<ScanT, T, G>, T extends ScanTask, G extends 
ScanTaskGroup<T>>
+    extends TableTestBase {
+
+  public DeleteFileIndexTestBase() {
     super(2 /* table format version */);
   }
 
@@ -109,6 +112,8 @@ public class TestDeleteFileIndex extends TableTestBase {
     return file;
   }
 
+  protected abstract ScanT newScan(Table table);
+
   @Test
   public void testMinSequenceNumberFilteringForFiles() {
     PartitionSpec partSpec = PartitionSpec.unpartitioned();
@@ -248,10 +253,10 @@ public class TestDeleteFileIndex extends TableTestBase {
     DeleteFile unpartitionedPosDeletes = 
unpartitionedPosDeletes(unpartitioned.spec());
     unpartitioned.newRowDelta().addDeletes(unpartitionedPosDeletes).commit();
 
-    List<FileScanTask> tasks = 
Lists.newArrayList(unpartitioned.newScan().planFiles().iterator());
+    List<T> tasks = 
Lists.newArrayList(newScan(unpartitioned).planFiles().iterator());
     Assert.assertEquals("Should have one task", 1, tasks.size());
 
-    FileScanTask task = tasks.get(0);
+    FileScanTask task = (FileScanTask) tasks.get(0);
     Assert.assertEquals(
         "Should have the correct data file path", unpartitionedFile.path(), 
task.file().path());
     Assert.assertEquals("Should have one associated delete file", 1, 
task.deletes().size());
@@ -264,8 +269,8 @@ public class TestDeleteFileIndex extends TableTestBase {
     DeleteFile unpartitionedEqDeletes = 
unpartitionedEqDeletes(unpartitioned.spec());
     unpartitioned.newRowDelta().addDeletes(unpartitionedEqDeletes).commit();
 
-    tasks = Lists.newArrayList(unpartitioned.newScan().planFiles().iterator());
-    task = tasks.get(0);
+    tasks = Lists.newArrayList(newScan(unpartitioned).planFiles().iterator());
+    task = (FileScanTask) tasks.get(0);
     Assert.assertEquals(
         "Should have the correct data file path", unpartitionedFile.path(), 
task.file().path());
     Assert.assertEquals("Should have two associated delete files", 2, 
task.deletes().size());
@@ -281,10 +286,10 @@ public class TestDeleteFileIndex extends TableTestBase {
 
     table.newRowDelta().addDeletes(FILE_A_POS_1).commit();
 
-    List<FileScanTask> tasks = 
Lists.newArrayList(table.newScan().planFiles().iterator());
+    List<T> tasks = Lists.newArrayList(newScan(table).planFiles().iterator());
     Assert.assertEquals("Should have one task", 1, tasks.size());
 
-    FileScanTask task = tasks.get(0);
+    FileScanTask task = (FileScanTask) tasks.get(0);
     Assert.assertEquals(
         "Should have the correct data file path", FILE_A.path(), 
task.file().path());
     Assert.assertEquals("Should have one associated delete file", 1, 
task.deletes().size());
@@ -298,10 +303,10 @@ public class TestDeleteFileIndex extends TableTestBase {
 
     table.newRowDelta().addDeletes(FILE_A_EQ_1).commit();
 
-    List<FileScanTask> tasks = 
Lists.newArrayList(table.newScan().planFiles().iterator());
+    List<T> tasks = Lists.newArrayList(newScan(table).planFiles().iterator());
     Assert.assertEquals("Should have one task", 1, tasks.size());
 
-    FileScanTask task = tasks.get(0);
+    FileScanTask task = (FileScanTask) tasks.get(0);
     Assert.assertEquals(
         "Should have the correct data file path", FILE_A.path(), 
task.file().path());
     Assert.assertEquals("Should have one associated delete file", 1, 
task.deletes().size());
@@ -315,10 +320,10 @@ public class TestDeleteFileIndex extends TableTestBase {
 
     
table.newRowDelta().addDeletes(FILE_A_POS_1).addDeletes(FILE_A_EQ_1).commit();
 
-    List<FileScanTask> tasks = 
Lists.newArrayList(table.newScan().planFiles().iterator());
+    List<T> tasks = Lists.newArrayList(newScan(table).planFiles().iterator());
     Assert.assertEquals("Should have one task", 1, tasks.size());
 
-    FileScanTask task = tasks.get(0);
+    FileScanTask task = (FileScanTask) tasks.get(0);
     Assert.assertEquals(
         "Should have the correct data file path", FILE_B.path(), 
task.file().path());
     Assert.assertEquals("Should have no delete files to apply", 0, 
task.deletes().size());
@@ -330,10 +335,10 @@ public class TestDeleteFileIndex extends TableTestBase {
 
     table.newAppend().appendFile(FILE_A).commit();
 
-    List<FileScanTask> tasks = 
Lists.newArrayList(table.newScan().planFiles().iterator());
+    List<T> tasks = Lists.newArrayList(newScan(table).planFiles().iterator());
     Assert.assertEquals("Should have one task", 1, tasks.size());
 
-    FileScanTask task = tasks.get(0);
+    FileScanTask task = (FileScanTask) tasks.get(0);
     Assert.assertEquals(
         "Should have the correct data file path", FILE_A.path(), 
task.file().path());
     Assert.assertEquals("Should have no delete files to apply", 0, 
task.deletes().size());
@@ -354,10 +359,10 @@ public class TestDeleteFileIndex extends TableTestBase {
         .addDeletes(unpartitionedEqDeletes)
         .commit();
 
-    List<FileScanTask> tasks = 
Lists.newArrayList(table.newScan().planFiles().iterator());
+    List<T> tasks = Lists.newArrayList(newScan(table).planFiles().iterator());
     Assert.assertEquals("Should have one task", 1, tasks.size());
 
-    FileScanTask task = tasks.get(0);
+    FileScanTask task = (FileScanTask) tasks.get(0);
     Assert.assertEquals(
         "Should have the correct data file path", FILE_A.path(), 
task.file().path());
     Assert.assertEquals("Should have one associated delete file", 1, 
task.deletes().size());
@@ -384,10 +389,10 @@ public class TestDeleteFileIndex extends TableTestBase {
         .addDeletes(unpartitionedEqDeletes)
         .commit();
 
-    List<FileScanTask> tasks = 
Lists.newArrayList(table.newScan().planFiles().iterator());
+    List<T> tasks = Lists.newArrayList(newScan(table).planFiles().iterator());
     Assert.assertEquals("Should have one task", 1, tasks.size());
 
-    FileScanTask task = tasks.get(0);
+    FileScanTask task = (FileScanTask) tasks.get(0);
     Assert.assertEquals(
         "Should have the correct data file path", FILE_A.path(), 
task.file().path());
     Assert.assertEquals("Should have two associated delete files", 2, 
task.deletes().size());
@@ -401,10 +406,10 @@ public class TestDeleteFileIndex extends TableTestBase {
   public void testPartitionedTableSequenceNumbers() {
     
table.newRowDelta().addRows(FILE_A).addDeletes(FILE_A_EQ_1).addDeletes(FILE_A_POS_1).commit();
 
-    List<FileScanTask> tasks = 
Lists.newArrayList(table.newScan().planFiles().iterator());
+    List<T> tasks = Lists.newArrayList(newScan(table).planFiles().iterator());
     Assert.assertEquals("Should have one task", 1, tasks.size());
 
-    FileScanTask task = tasks.get(0);
+    FileScanTask task = (FileScanTask) tasks.get(0);
     Assert.assertEquals(
         "Should have the correct data file path", FILE_A.path(), 
task.file().path());
     Assert.assertEquals("Should have one associated delete file", 1, 
task.deletes().size());
@@ -495,16 +500,12 @@ public class TestDeleteFileIndex extends TableTestBase {
         2,
         
table.currentSnapshot().deleteManifests(table.io()).get(0).existingFilesCount().intValue());
 
-    List<FileScanTask> tasks =
+    List<T> tasks =
         Lists.newArrayList(
-            table
-                .newScan()
-                .filter(equal(bucket("data", BUCKETS_NUMBER), 0))
-                .planFiles()
-                .iterator());
+            newScan(table).filter(equal(bucket("data", BUCKETS_NUMBER), 
0)).planFiles().iterator());
     Assert.assertEquals("Should have one task", 1, tasks.size());
 
-    FileScanTask task = tasks.get(0);
+    FileScanTask task = (FileScanTask) tasks.get(0);
     Assert.assertEquals(
         "Should have the correct data file path", FILE_A.path(), 
task.file().path());
     Assert.assertEquals("Should have two associated delete files", 2, 
task.deletes().size());
diff --git a/core/src/test/java/org/apache/iceberg/TestFilterFiles.java 
b/core/src/test/java/org/apache/iceberg/FilterFilesTestBase.java
similarity index 87%
rename from core/src/test/java/org/apache/iceberg/TestFilterFiles.java
rename to core/src/test/java/org/apache/iceberg/FilterFilesTestBase.java
index 34b18e18b8..995a07f2eb 100644
--- a/core/src/test/java/org/apache/iceberg/TestFilterFiles.java
+++ b/core/src/test/java/org/apache/iceberg/FilterFilesTestBase.java
@@ -35,22 +35,18 @@ import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-@RunWith(Parameterized.class)
-public class TestFilterFiles {
-  @Parameterized.Parameters(name = "formatVersion = {0}")
-  public static Object[] parameters() {
-    return new Object[] {1, 2};
-  }
+
+public abstract class FilterFilesTestBase<
+    ScanT extends Scan<ScanT, T, G>, T extends ScanTask, G extends 
ScanTaskGroup<T>> {
 
   public final int formatVersion;
 
-  public TestFilterFiles(int formatVersion) {
+  public FilterFilesTestBase(int formatVersion) {
     this.formatVersion = formatVersion;
   }
 
+  protected abstract ScanT newScan(Table table);
+
   @Rule public TemporaryFolder temp = new TemporaryFolder();
   private final Schema schema =
       new Schema(
@@ -122,10 +118,10 @@ public class TestFilterFiles {
 
     table.refresh();
 
-    TableScan emptyScan = table.newScan().filter(Expressions.equal("id", 5));
+    ScanT emptyScan = newScan(table).filter(Expressions.equal("id", 5));
     assertEquals(0, Iterables.size(emptyScan.planFiles()));
 
-    TableScan nonEmptyScan = table.newScan().filter(Expressions.equal("id", 
1));
+    ScanT nonEmptyScan = newScan(table).filter(Expressions.equal("id", 1));
     assertEquals(1, Iterables.size(nonEmptyScan.planFiles()));
   }
 
@@ -156,11 +152,10 @@ public class TestFilterFiles {
 
     table.refresh();
 
-    TableScan emptyScan = 
table.newScan().caseSensitive(false).filter(Expressions.equal("ID", 5));
+    ScanT emptyScan = 
newScan(table).caseSensitive(false).filter(Expressions.equal("ID", 5));
     assertEquals(0, Iterables.size(emptyScan.planFiles()));
 
-    TableScan nonEmptyScan =
-        table.newScan().caseSensitive(false).filter(Expressions.equal("ID", 
1));
+    ScanT nonEmptyScan = 
newScan(table).caseSensitive(false).filter(Expressions.equal("ID", 1));
     assertEquals(1, Iterables.size(nonEmptyScan.planFiles()));
   }
 }
diff --git a/core/src/main/java/org/apache/iceberg/metrics/ScanMetricsUtil.java 
b/core/src/test/java/org/apache/iceberg/TestLocalDataTableScan.java
similarity index 55%
copy from core/src/main/java/org/apache/iceberg/metrics/ScanMetricsUtil.java
copy to core/src/test/java/org/apache/iceberg/TestLocalDataTableScan.java
index 102f48ee19..897cbed488 100644
--- a/core/src/main/java/org/apache/iceberg/metrics/ScanMetricsUtil.java
+++ b/core/src/test/java/org/apache/iceberg/TestLocalDataTableScan.java
@@ -16,22 +16,32 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iceberg.metrics;
+package org.apache.iceberg;
 
-import org.apache.iceberg.DeleteFile;
-import org.apache.iceberg.FileContent;
+public class TestLocalDataTableScan
+    extends DataTableScanTestBase<TableScan, FileScanTask, CombinedScanTask> {
 
-public class ScanMetricsUtil {
+  public TestLocalDataTableScan(int formatVersion) {
+    super(formatVersion);
+  }
 
-  private ScanMetricsUtil() {}
+  @Override
+  protected TableScan useRef(TableScan scan, String ref) {
+    return scan.useRef(ref);
+  }
 
-  public static void indexedDeleteFile(ScanMetrics metrics, DeleteFile 
deleteFile) {
-    metrics.indexedDeleteFiles().increment();
+  @Override
+  protected TableScan useSnapshot(TableScan scan, long snapshotId) {
+    return scan.useSnapshot(snapshotId);
+  }
+
+  @Override
+  protected TableScan asOfTime(TableScan scan, long timestampMillis) {
+    return scan.asOfTime(timestampMillis);
+  }
 
-    if (deleteFile.content() == FileContent.POSITION_DELETES) {
-      metrics.positionalDeleteFiles().increment();
-    } else if (deleteFile.content() == FileContent.EQUALITY_DELETES) {
-      metrics.equalityDeleteFiles().increment();
-    }
+  @Override
+  protected TableScan newScan() {
+    return table.newScan();
   }
 }
diff --git a/core/src/main/java/org/apache/iceberg/metrics/ScanMetricsUtil.java 
b/core/src/test/java/org/apache/iceberg/TestLocalDeleteFileIndex.java
similarity index 59%
copy from core/src/main/java/org/apache/iceberg/metrics/ScanMetricsUtil.java
copy to core/src/test/java/org/apache/iceberg/TestLocalDeleteFileIndex.java
index 102f48ee19..d01e6253d4 100644
--- a/core/src/main/java/org/apache/iceberg/metrics/ScanMetricsUtil.java
+++ b/core/src/test/java/org/apache/iceberg/TestLocalDeleteFileIndex.java
@@ -16,22 +16,13 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iceberg.metrics;
+package org.apache.iceberg;
 
-import org.apache.iceberg.DeleteFile;
-import org.apache.iceberg.FileContent;
+public class TestLocalDeleteFileIndex
+    extends DeleteFileIndexTestBase<TableScan, FileScanTask, CombinedScanTask> 
{
 
-public class ScanMetricsUtil {
-
-  private ScanMetricsUtil() {}
-
-  public static void indexedDeleteFile(ScanMetrics metrics, DeleteFile 
deleteFile) {
-    metrics.indexedDeleteFiles().increment();
-
-    if (deleteFile.content() == FileContent.POSITION_DELETES) {
-      metrics.positionalDeleteFiles().increment();
-    } else if (deleteFile.content() == FileContent.EQUALITY_DELETES) {
-      metrics.equalityDeleteFiles().increment();
-    }
+  @Override
+  protected TableScan newScan(Table table) {
+    return table.newScan();
   }
 }
diff --git a/core/src/main/java/org/apache/iceberg/metrics/ScanMetricsUtil.java 
b/core/src/test/java/org/apache/iceberg/TestLocalFilterFiles.java
similarity index 59%
copy from core/src/main/java/org/apache/iceberg/metrics/ScanMetricsUtil.java
copy to core/src/test/java/org/apache/iceberg/TestLocalFilterFiles.java
index 102f48ee19..b7ff71461c 100644
--- a/core/src/main/java/org/apache/iceberg/metrics/ScanMetricsUtil.java
+++ b/core/src/test/java/org/apache/iceberg/TestLocalFilterFiles.java
@@ -16,22 +16,26 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iceberg.metrics;
+package org.apache.iceberg;
 
-import org.apache.iceberg.DeleteFile;
-import org.apache.iceberg.FileContent;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
-public class ScanMetricsUtil {
+@RunWith(Parameterized.class)
+public class TestLocalFilterFiles
+    extends FilterFilesTestBase<TableScan, FileScanTask, CombinedScanTask> {
 
-  private ScanMetricsUtil() {}
+  @Parameterized.Parameters(name = "formatVersion = {0}")
+  public static Object[] parameters() {
+    return new Object[] {1, 2};
+  }
 
-  public static void indexedDeleteFile(ScanMetrics metrics, DeleteFile 
deleteFile) {
-    metrics.indexedDeleteFiles().increment();
+  public TestLocalFilterFiles(int formatVersion) {
+    super(formatVersion);
+  }
 
-    if (deleteFile.content() == FileContent.POSITION_DELETES) {
-      metrics.positionalDeleteFiles().increment();
-    } else if (deleteFile.content() == FileContent.EQUALITY_DELETES) {
-      metrics.equalityDeleteFiles().increment();
-    }
+  @Override
+  protected TableScan newScan(Table table) {
+    return table.newScan();
   }
 }
diff --git 
a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/SparkRowLevelOperationsTestBase.java
 
b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/SparkRowLevelOperationsTestBase.java
index e660a2ca16..24124a97e2 100644
--- 
a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/SparkRowLevelOperationsTestBase.java
+++ 
b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/SparkRowLevelOperationsTestBase.java
@@ -20,11 +20,15 @@ package org.apache.iceberg.spark.extensions;
 
 import static org.apache.iceberg.DataOperations.DELETE;
 import static org.apache.iceberg.DataOperations.OVERWRITE;
+import static org.apache.iceberg.PlanningMode.DISTRIBUTED;
+import static org.apache.iceberg.PlanningMode.LOCAL;
 import static org.apache.iceberg.SnapshotSummary.ADDED_DELETE_FILES_PROP;
 import static org.apache.iceberg.SnapshotSummary.ADDED_FILES_PROP;
 import static org.apache.iceberg.SnapshotSummary.CHANGED_PARTITION_COUNT_PROP;
 import static org.apache.iceberg.SnapshotSummary.DELETED_FILES_PROP;
+import static org.apache.iceberg.TableProperties.DATA_PLANNING_MODE;
 import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
+import static org.apache.iceberg.TableProperties.DELETE_PLANNING_MODE;
 import static org.apache.iceberg.TableProperties.PARQUET_VECTORIZATION_ENABLED;
 import static 
org.apache.iceberg.TableProperties.SPARK_WRITE_PARTITIONED_FANOUT_ENABLED;
 import static org.apache.iceberg.TableProperties.WRITE_DISTRIBUTION_MODE;
@@ -43,6 +47,7 @@ import java.util.concurrent.ThreadLocalRandom;
 import java.util.stream.Collectors;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.Files;
+import org.apache.iceberg.PlanningMode;
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.SnapshotRef;
 import org.apache.iceberg.Table;
@@ -74,6 +79,7 @@ public abstract class SparkRowLevelOperationsTestBase extends 
SparkExtensionsTes
   protected final String distributionMode;
   protected final boolean fanoutEnabled;
   protected final String branch;
+  protected final PlanningMode planningMode;
 
   public SparkRowLevelOperationsTestBase(
       String catalogName,
@@ -83,19 +89,22 @@ public abstract class SparkRowLevelOperationsTestBase 
extends SparkExtensionsTes
       boolean vectorized,
       String distributionMode,
       boolean fanoutEnabled,
-      String branch) {
+      String branch,
+      PlanningMode planningMode) {
     super(catalogName, implementation, config);
     this.fileFormat = fileFormat;
     this.vectorized = vectorized;
     this.distributionMode = distributionMode;
     this.fanoutEnabled = fanoutEnabled;
     this.branch = branch;
+    this.planningMode = planningMode;
   }
 
   @Parameters(
       name =
           "catalogName = {0}, implementation = {1}, config = {2},"
-              + " format = {3}, vectorized = {4}, distributionMode = {5}, 
fanout = {6}, branch = {7}")
+              + " format = {3}, vectorized = {4}, distributionMode = {5},"
+              + " fanout = {6}, branch = {7}, planningMode = {8}")
   public static Object[][] parameters() {
     return new Object[][] {
       {
@@ -108,7 +117,8 @@ public abstract class SparkRowLevelOperationsTestBase 
extends SparkExtensionsTes
         true,
         WRITE_DISTRIBUTION_MODE_NONE,
         true,
-        SnapshotRef.MAIN_BRANCH
+        SnapshotRef.MAIN_BRANCH,
+        LOCAL
       },
       {
         "testhive",
@@ -121,6 +131,7 @@ public abstract class SparkRowLevelOperationsTestBase 
extends SparkExtensionsTes
         WRITE_DISTRIBUTION_MODE_NONE,
         false,
         null,
+        DISTRIBUTED
       },
       {
         "testhadoop",
@@ -130,7 +141,8 @@ public abstract class SparkRowLevelOperationsTestBase 
extends SparkExtensionsTes
         RANDOM.nextBoolean(),
         WRITE_DISTRIBUTION_MODE_HASH,
         true,
-        null
+        null,
+        LOCAL
       },
       {
         "spark_catalog",
@@ -147,7 +159,8 @@ public abstract class SparkRowLevelOperationsTestBase 
extends SparkExtensionsTes
         false,
         WRITE_DISTRIBUTION_MODE_RANGE,
         false,
-        "test"
+        "test",
+        DISTRIBUTED
       }
     };
   }
@@ -156,14 +169,18 @@ public abstract class SparkRowLevelOperationsTestBase 
extends SparkExtensionsTes
 
   protected void initTable() {
     sql(
-        "ALTER TABLE %s SET TBLPROPERTIES('%s' '%s', '%s' '%s', '%s' '%s')",
+        "ALTER TABLE %s SET TBLPROPERTIES('%s' '%s', '%s' '%s', '%s' '%s', 
'%s' '%s', '%s' '%s')",
         tableName,
         DEFAULT_FILE_FORMAT,
         fileFormat,
         WRITE_DISTRIBUTION_MODE,
         distributionMode,
         SPARK_WRITE_PARTITIONED_FANOUT_ENABLED,
-        String.valueOf(fanoutEnabled));
+        String.valueOf(fanoutEnabled),
+        DATA_PLANNING_MODE,
+        planningMode.modeName(),
+        DELETE_PLANNING_MODE,
+        planningMode.modeName());
 
     switch (fileFormat) {
       case "parquet":
diff --git 
a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCopyOnWriteDelete.java
 
b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCopyOnWriteDelete.java
index 45cfb014eb..9ebe73da33 100644
--- 
a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCopyOnWriteDelete.java
+++ 
b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCopyOnWriteDelete.java
@@ -32,6 +32,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.iceberg.AppendFiles;
 import org.apache.iceberg.DataFile;
+import org.apache.iceberg.PlanningMode;
 import org.apache.iceberg.RowLevelOperationMode;
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.Table;
@@ -62,7 +63,8 @@ public class TestCopyOnWriteDelete extends TestDelete {
       Boolean vectorized,
       String distributionMode,
       boolean fanoutEnabled,
-      String branch) {
+      String branch,
+      PlanningMode planningMode) {
     super(
         catalogName,
         implementation,
@@ -71,7 +73,8 @@ public class TestCopyOnWriteDelete extends TestDelete {
         vectorized,
         distributionMode,
         fanoutEnabled,
-        branch);
+        branch,
+        planningMode);
   }
 
   @Override
diff --git 
a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCopyOnWriteMerge.java
 
b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCopyOnWriteMerge.java
index 6edbff480d..6b6819a924 100644
--- 
a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCopyOnWriteMerge.java
+++ 
b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCopyOnWriteMerge.java
@@ -31,6 +31,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.iceberg.DataFile;
+import org.apache.iceberg.PlanningMode;
 import org.apache.iceberg.RowLevelOperationMode;
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.Table;
@@ -60,7 +61,8 @@ public class TestCopyOnWriteMerge extends TestMerge {
       boolean vectorized,
       String distributionMode,
       boolean fanoutEnabled,
-      String branch) {
+      String branch,
+      PlanningMode planningMode) {
     super(
         catalogName,
         implementation,
@@ -69,7 +71,8 @@ public class TestCopyOnWriteMerge extends TestMerge {
         vectorized,
         distributionMode,
         fanoutEnabled,
-        branch);
+        branch,
+        planningMode);
   }
 
   @Override
diff --git 
a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCopyOnWriteUpdate.java
 
b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCopyOnWriteUpdate.java
index a398b99661..4354a1019c 100644
--- 
a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCopyOnWriteUpdate.java
+++ 
b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCopyOnWriteUpdate.java
@@ -31,6 +31,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.iceberg.AppendFiles;
 import org.apache.iceberg.DataFile;
+import org.apache.iceberg.PlanningMode;
 import org.apache.iceberg.RowLevelOperationMode;
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.Table;
@@ -59,7 +60,8 @@ public class TestCopyOnWriteUpdate extends TestUpdate {
       boolean vectorized,
       String distributionMode,
       boolean fanoutEnabled,
-      String branch) {
+      String branch,
+      PlanningMode planningMode) {
     super(
         catalogName,
         implementation,
@@ -68,7 +70,8 @@ public class TestCopyOnWriteUpdate extends TestUpdate {
         vectorized,
         distributionMode,
         fanoutEnabled,
-        branch);
+        branch,
+        planningMode);
   }
 
   @Override
diff --git 
a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java
 
b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java
index b2d1e479e7..8214de6dfa 100644
--- 
a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java
+++ 
b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java
@@ -46,6 +46,7 @@ import org.apache.iceberg.AppendFiles;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DistributionMode;
 import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.PlanningMode;
 import org.apache.iceberg.RowLevelOperationMode;
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.SnapshotRef;
@@ -92,7 +93,8 @@ public abstract class TestDelete extends 
SparkRowLevelOperationsTestBase {
       Boolean vectorized,
       String distributionMode,
       boolean fanoutEnabled,
-      String branch) {
+      String branch,
+      PlanningMode planningMode) {
     super(
         catalogName,
         implementation,
@@ -101,7 +103,8 @@ public abstract class TestDelete extends 
SparkRowLevelOperationsTestBase {
         vectorized,
         distributionMode,
         fanoutEnabled,
-        branch);
+        branch,
+        planningMode);
   }
 
   @BeforeClass
diff --git 
a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMerge.java
 
b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMerge.java
index 843081a016..64d356af04 100644
--- 
a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMerge.java
+++ 
b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMerge.java
@@ -45,6 +45,7 @@ import org.apache.iceberg.AppendFiles;
 import org.apache.iceberg.AssertHelpers;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DistributionMode;
+import org.apache.iceberg.PlanningMode;
 import org.apache.iceberg.RowLevelOperationMode;
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.SnapshotSummary;
@@ -83,7 +84,8 @@ public abstract class TestMerge extends 
SparkRowLevelOperationsTestBase {
       boolean vectorized,
       String distributionMode,
       boolean fanoutEnabled,
-      String branch) {
+      String branch,
+      PlanningMode planningMode) {
     super(
         catalogName,
         implementation,
@@ -92,7 +94,8 @@ public abstract class TestMerge extends 
SparkRowLevelOperationsTestBase {
         vectorized,
         distributionMode,
         fanoutEnabled,
-        branch);
+        branch,
+        planningMode);
   }
 
   @BeforeClass
diff --git 
a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadDelete.java
 
b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadDelete.java
index e32b0f22ac..9c0e8235f8 100644
--- 
a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadDelete.java
+++ 
b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadDelete.java
@@ -25,6 +25,7 @@ import static org.mockito.Mockito.when;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
+import org.apache.iceberg.PlanningMode;
 import org.apache.iceberg.RowDelta;
 import org.apache.iceberg.RowLevelOperationMode;
 import org.apache.iceberg.Table;
@@ -51,7 +52,8 @@ public class TestMergeOnReadDelete extends TestDelete {
       Boolean vectorized,
       String distributionMode,
       boolean fanoutEnabled,
-      String branch) {
+      String branch,
+      PlanningMode planningMode) {
     super(
         catalogName,
         implementation,
@@ -60,7 +62,8 @@ public class TestMergeOnReadDelete extends TestDelete {
         vectorized,
         distributionMode,
         fanoutEnabled,
-        branch);
+        branch,
+        planningMode);
   }
 
   @Override
diff --git 
a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadMerge.java
 
b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadMerge.java
index fc212ac844..e743b32b45 100644
--- 
a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadMerge.java
+++ 
b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadMerge.java
@@ -19,6 +19,7 @@
 package org.apache.iceberg.spark.extensions;
 
 import java.util.Map;
+import org.apache.iceberg.PlanningMode;
 import org.apache.iceberg.RowLevelOperationMode;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
@@ -33,7 +34,8 @@ public class TestMergeOnReadMerge extends TestMerge {
       boolean vectorized,
       String distributionMode,
       boolean fanoutEnabled,
-      String branch) {
+      String branch,
+      PlanningMode planningMode) {
     super(
         catalogName,
         implementation,
@@ -42,7 +44,8 @@ public class TestMergeOnReadMerge extends TestMerge {
         vectorized,
         distributionMode,
         fanoutEnabled,
-        branch);
+        branch,
+        planningMode);
   }
 
   @Override
diff --git 
a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadUpdate.java
 
b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadUpdate.java
index f40afac459..0207d4ce4d 100644
--- 
a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadUpdate.java
+++ 
b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeOnReadUpdate.java
@@ -19,6 +19,7 @@
 package org.apache.iceberg.spark.extensions;
 
 import java.util.Map;
+import org.apache.iceberg.PlanningMode;
 import org.apache.iceberg.RowLevelOperationMode;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
@@ -33,7 +34,8 @@ public class TestMergeOnReadUpdate extends TestUpdate {
       boolean vectorized,
       String distributionMode,
       boolean fanoutEnabled,
-      String branch) {
+      String branch,
+      PlanningMode planningMode) {
     super(
         catalogName,
         implementation,
@@ -42,7 +44,8 @@ public class TestMergeOnReadUpdate extends TestUpdate {
         vectorized,
         distributionMode,
         fanoutEnabled,
-        branch);
+        branch,
+        planningMode);
   }
 
   @Override
diff --git 
a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestUpdate.java
 
b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestUpdate.java
index 39249173ae..8f7585ccd7 100644
--- 
a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestUpdate.java
+++ 
b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestUpdate.java
@@ -47,6 +47,7 @@ import org.apache.iceberg.AppendFiles;
 import org.apache.iceberg.AssertHelpers;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DistributionMode;
+import org.apache.iceberg.PlanningMode;
 import org.apache.iceberg.RowLevelOperationMode;
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.SnapshotSummary;
@@ -86,7 +87,8 @@ public abstract class TestUpdate extends 
SparkRowLevelOperationsTestBase {
       boolean vectorized,
       String distributionMode,
       boolean fanoutEnabled,
-      String branch) {
+      String branch,
+      PlanningMode planningMode) {
     super(
         catalogName,
         implementation,
@@ -95,7 +97,8 @@ public abstract class TestUpdate extends 
SparkRowLevelOperationsTestBase {
         vectorized,
         distributionMode,
         fanoutEnabled,
-        branch);
+        branch,
+        planningMode);
   }
 
   @BeforeClass
diff --git 
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/SparkDistributedDataScan.java
 
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/SparkDistributedDataScan.java
new file mode 100644
index 0000000000..d4c2848b45
--- /dev/null
+++ 
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/SparkDistributedDataScan.java
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.ClosingIterator;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.base.Joiner;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.JobGroupInfo;
+import org.apache.iceberg.spark.JobGroupUtils;
+import org.apache.iceberg.spark.SparkReadConf;
+import org.apache.iceberg.spark.actions.ManifestFileBean;
+import org.apache.iceberg.spark.source.SerializableTableWithSize;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.sql.SparkSession;
+
+/**
+ * A batch data scan that can utilize Spark cluster resources for planning.
+ *
+ * <p>This scan remotely filters manifests, fetching only the relevant data 
and delete files to the
+ * driver. The delete file assignment is done locally after the remote 
filtering step. Such an approach
+ * is beneficial if the remote parallelism is much higher than the number of 
driver cores.
+ *
+ * <p>This scan is best suited for queries with selective filters on 
lower/upper bounds across all
+ * partitions, or against poorly clustered metadata. This allows job planning 
to benefit from highly
+ * concurrent remote filtering while not incurring high serialization and data 
transfer costs. This
+ * class is also useful for full table scans over large tables but the cost of 
bringing data and
+ * delete file details to the driver may become noticeable. Make sure to 
follow the performance tips
+ * below in such cases.
+ *
+ * <p>Ensure the filtered metadata size doesn't exceed the driver's max result 
size. For large table
+ * scans, consider increasing `spark.driver.maxResultSize` to avoid job 
failures.
+ *
+ * <p>Performance tips:
+ *
+ * <ul>
+ *   <li>Enable Kryo serialization (`spark.serializer`)
+ *   <li>Increase the number of driver cores (`spark.driver.cores`)
+ *   <li>Tune the number of threads used to fetch task results 
(`spark.resultGetter.threads`)
+ * </ul>
+ */
+public class SparkDistributedDataScan extends BaseDistributedDataScan {
+
+  private static final Joiner COMMA = Joiner.on(',');
+  private static final String DELETE_PLANNING_JOB_GROUP_ID = "DELETE-PLANNING";
+  private static final String DATA_PLANNING_JOB_GROUP_ID = "DATA-PLANNING";
+
+  private final SparkSession spark;
+  private final JavaSparkContext sparkContext;
+  private final SparkReadConf readConf;
+
+  private Broadcast<Table> tableBroadcast = null;
+
+  public SparkDistributedDataScan(SparkSession spark, Table table, 
SparkReadConf readConf) {
+    this(spark, table, readConf, table.schema(), TableScanContext.empty());
+  }
+
+  private SparkDistributedDataScan(
+      SparkSession spark,
+      Table table,
+      SparkReadConf readConf,
+      Schema schema,
+      TableScanContext context) {
+    super(table, schema, context);
+    this.spark = spark;
+    this.sparkContext = 
JavaSparkContext.fromSparkContext(spark.sparkContext());
+    this.readConf = readConf;
+  }
+
+  @Override
+  protected BatchScan newRefinedScan(
+      Table newTable, Schema newSchema, TableScanContext newContext) {
+    return new SparkDistributedDataScan(spark, newTable, readConf, newSchema, 
newContext);
+  }
+
+  @Override
+  protected int remoteParallelism() {
+    return readConf.parallelism();
+  }
+
+  @Override
+  protected PlanningMode dataPlanningMode() {
+    return readConf.dataPlanningMode();
+  }
+
+  @Override
+  protected boolean shouldCopyRemotelyPlannedDataFiles() {
+    return false;
+  }
+
+  @Override
+  protected Iterable<CloseableIterable<DataFile>> planDataRemotely(
+      List<ManifestFile> dataManifests, boolean withColumnStats) {
+    JobGroupInfo info = new JobGroupInfo(DATA_PLANNING_JOB_GROUP_ID, 
jobDesc("data"));
+    return withJobGroupInfo(info, () -> doPlanDataRemotely(dataManifests, 
withColumnStats));
+  }
+
+  private Iterable<CloseableIterable<DataFile>> doPlanDataRemotely(
+      List<ManifestFile> dataManifests, boolean withColumnStats) {
+    scanMetrics().scannedDataManifests().increment(dataManifests.size());
+
+    JavaRDD<DataFile> dataFileRDD =
+        sparkContext
+            .parallelize(toBeans(dataManifests), dataManifests.size())
+            .flatMap(new ReadDataManifest(tableBroadcast(), context(), 
withColumnStats));
+    List<List<DataFile>> dataFileGroups = collectPartitions(dataFileRDD);
+
+    return Iterables.transform(dataFileGroups, 
CloseableIterable::withNoopClose);
+  }
+
+  @Override
+  protected PlanningMode deletePlanningMode() {
+    return readConf.deletePlanningMode();
+  }
+
+  @Override
+  protected DeleteFileIndex planDeletesRemotely(List<ManifestFile> 
deleteManifests) {
+    JobGroupInfo info = new JobGroupInfo(DELETE_PLANNING_JOB_GROUP_ID, 
jobDesc("deletes"));
+    return withJobGroupInfo(info, () -> 
doPlanDeletesRemotely(deleteManifests));
+  }
+
+  private DeleteFileIndex doPlanDeletesRemotely(List<ManifestFile> 
deleteManifests) {
+    scanMetrics().scannedDeleteManifests().increment(deleteManifests.size());
+
+    List<DeleteFile> deleteFiles =
+        sparkContext
+            .parallelize(toBeans(deleteManifests), deleteManifests.size())
+            .flatMap(new ReadDeleteManifest(tableBroadcast(), context()))
+            .collect();
+
+    return DeleteFileIndex.builderFor(deleteFiles)
+        .specsById(table().specs())
+        .caseSensitive(isCaseSensitive())
+        .scanMetrics(scanMetrics())
+        .build();
+  }
+
+  private <T> T withJobGroupInfo(JobGroupInfo info, Supplier<T> supplier) {
+    return JobGroupUtils.withJobGroupInfo(sparkContext, info, supplier);
+  }
+
+  private String jobDesc(String type) {
+    List<String> options = Lists.newArrayList();
+    options.add("snapshot_id=" + snapshot().snapshotId());
+    String optionsAsString = COMMA.join(options);
+    return String.format("Planning %s (%s) for %s", type, optionsAsString, 
table().name());
+  }
+
+  private List<ManifestFileBean> toBeans(List<ManifestFile> manifests) {
+    return 
manifests.stream().map(ManifestFileBean::fromManifest).collect(Collectors.toList());
+  }
+
+  private Broadcast<Table> tableBroadcast() {
+    if (tableBroadcast == null) {
+      Table serializableTable = SerializableTableWithSize.copyOf(table());
+      this.tableBroadcast = sparkContext.broadcast(serializableTable);
+    }
+
+    return tableBroadcast;
+  }
+
+  private <T> List<List<T>> collectPartitions(JavaRDD<T> rdd) {
+    int[] partitionIds = IntStream.range(0, rdd.getNumPartitions()).toArray();
+    return Arrays.asList(rdd.collectPartitions(partitionIds));
+  }
+
+  private static class ReadDataManifest implements 
FlatMapFunction<ManifestFileBean, DataFile> {
+
+    private final Broadcast<Table> table;
+    private final Expression filter;
+    private final boolean withStats;
+    private final boolean isCaseSensitive;
+
+    ReadDataManifest(Broadcast<Table> table, TableScanContext context, boolean 
withStats) {
+      this.table = table;
+      this.filter = context.rowFilter();
+      this.withStats = withStats;
+      this.isCaseSensitive = context.caseSensitive();
+    }
+
+    @Override
+    public Iterator<DataFile> call(ManifestFileBean manifest) throws Exception 
{
+      FileIO io = table.value().io();
+      Map<Integer, PartitionSpec> specs = table.value().specs();
+      return new ClosingIterator<>(
+          ManifestFiles.read(manifest, io, specs)
+              .select(withStats ? SCAN_WITH_STATS_COLUMNS : SCAN_COLUMNS)
+              .filterRows(filter)
+              .caseSensitive(isCaseSensitive)
+              .iterator());
+    }
+  }
+
+  private static class ReadDeleteManifest implements 
FlatMapFunction<ManifestFileBean, DeleteFile> {
+
+    private final Broadcast<Table> table;
+    private final Expression filter;
+    private final boolean isCaseSensitive;
+
+    ReadDeleteManifest(Broadcast<Table> table, TableScanContext context) {
+      this.table = table;
+      this.filter = context.rowFilter();
+      this.isCaseSensitive = context.caseSensitive();
+    }
+
+    @Override
+    public Iterator<DeleteFile> call(ManifestFileBean manifest) throws 
Exception {
+      FileIO io = table.value().io();
+      Map<Integer, PartitionSpec> specs = table.value().specs();
+      return new ClosingIterator<>(
+          ManifestFiles.readDeleteManifest(manifest, io, specs)
+              .select(DELETE_SCAN_WITH_STATS_COLUMNS)
+              .filterRows(filter)
+              .caseSensitive(isCaseSensitive)
+              .iterator());
+    }
+  }
+}
diff --git 
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java 
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java
index 75e060e556..be0ba7d6bc 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java
@@ -18,12 +18,16 @@
  */
 package org.apache.iceberg.spark;
 
+import static org.apache.iceberg.PlanningMode.LOCAL;
+
 import java.util.Map;
+import org.apache.iceberg.PlanningMode;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.exceptions.ValidationException;
 import org.apache.iceberg.hadoop.Util;
 import org.apache.iceberg.util.PropertyUtil;
+import org.apache.spark.SparkConf;
 import org.apache.spark.sql.SparkSession;
 
 /**
@@ -46,6 +50,10 @@ import org.apache.spark.sql.SparkSession;
  */
 public class SparkReadConf {
 
+  private static final String DRIVER_MAX_RESULT_SIZE = 
"spark.driver.maxResultSize";
+  private static final String DRIVER_MAX_RESULT_SIZE_DEFAULT = "1G";
+  private static final long DISTRIBUTED_PLANNING_MIN_RESULT_SIZE = 256L * 1024 
* 1024; // 256 MB
+
   private final SparkSession spark;
   private final Table table;
   private final String branch;
@@ -281,4 +289,43 @@ public class SparkReadConf {
     int numShufflePartitions = 
spark.sessionState().conf().numShufflePartitions();
     return Math.max(defaultParallelism, numShufflePartitions);
   }
+
+  public boolean distributedPlanningEnabled() {
+    return dataPlanningMode() != LOCAL || deletePlanningMode() != LOCAL;
+  }
+
+  public PlanningMode dataPlanningMode() {
+    if (driverMaxResultSize() < DISTRIBUTED_PLANNING_MIN_RESULT_SIZE) {
+      return LOCAL;
+    }
+
+    String modeName =
+        confParser
+            .stringConf()
+            .sessionConf(SparkSQLProperties.DATA_PLANNING_MODE)
+            .tableProperty(TableProperties.DATA_PLANNING_MODE)
+            .defaultValue(TableProperties.PLANNING_MODE_DEFAULT)
+            .parse();
+    return PlanningMode.fromName(modeName);
+  }
+
+  public PlanningMode deletePlanningMode() {
+    if (driverMaxResultSize() < DISTRIBUTED_PLANNING_MIN_RESULT_SIZE) {
+      return LOCAL;
+    }
+
+    String modeName =
+        confParser
+            .stringConf()
+            .sessionConf(SparkSQLProperties.DELETE_PLANNING_MODE)
+            .tableProperty(TableProperties.DELETE_PLANNING_MODE)
+            .defaultValue(TableProperties.PLANNING_MODE_DEFAULT)
+            .parse();
+    return PlanningMode.fromName(modeName);
+  }
+
+  private long driverMaxResultSize() {
+    SparkConf sparkConf = spark.sparkContext().conf();
+    return sparkConf.getSizeAsBytes(DRIVER_MAX_RESULT_SIZE, 
DRIVER_MAX_RESULT_SIZE_DEFAULT);
+  }
 }
diff --git 
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java
 
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java
index 0c08587b4b..9accd6f108 100644
--- 
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java
+++ 
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java
@@ -68,4 +68,10 @@ public class SparkSQLProperties {
   public static final String COMPRESSION_CODEC = 
"spark.sql.iceberg.compression-codec";
   public static final String COMPRESSION_LEVEL = 
"spark.sql.iceberg.compression-level";
   public static final String COMPRESSION_STRATEGY = 
"spark.sql.iceberg.compression-strategy";
+
+  // Overrides the data planning mode
+  public static final String DATA_PLANNING_MODE = 
"spark.sql.iceberg.data-planning-mode";
+
+  // Overrides the delete planning mode
+  public static final String DELETE_PLANNING_MODE = 
"spark.sql.iceberg.delete-planning-mode";
 }
diff --git 
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java
 
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java
index 613e53767e..62f5167526 100644
--- 
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java
+++ 
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java
@@ -157,6 +157,7 @@ abstract class BaseSparkAction<ThisT> {
                 "content",
                 "path",
                 "length",
+                "0 as sequenceNumber",
                 "partition_spec_id as partitionSpecId",
                 "added_snapshot_id as addedSnapshotId")
             .dropDuplicates("path")
diff --git 
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/ManifestFileBean.java
 
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/ManifestFileBean.java
index 45647070e6..11ad834244 100644
--- 
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/ManifestFileBean.java
+++ 
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/ManifestFileBean.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iceberg.spark.actions;
 
+import java.io.Serializable;
 import java.nio.ByteBuffer;
 import java.util.List;
 import org.apache.iceberg.ManifestContent;
@@ -25,7 +26,8 @@ import org.apache.iceberg.ManifestFile;
 import org.apache.spark.sql.Encoder;
 import org.apache.spark.sql.Encoders;
 
-public class ManifestFileBean implements ManifestFile {
+/** A serializable bean that contains the bare minimum needed to read a manifest. */
+public class ManifestFileBean implements ManifestFile, Serializable {
   public static final Encoder<ManifestFileBean> ENCODER = 
Encoders.bean(ManifestFileBean.class);
 
   private String path = null;
@@ -33,6 +35,20 @@ public class ManifestFileBean implements ManifestFile {
   private Integer partitionSpecId = null;
   private Long addedSnapshotId = null;
   private Integer content = null;
+  private Long sequenceNumber = null;
+
+  public static ManifestFileBean fromManifest(ManifestFile manifest) {
+    ManifestFileBean bean = new ManifestFileBean();
+
+    bean.setPath(manifest.path());
+    bean.setLength(manifest.length());
+    bean.setPartitionSpecId(manifest.partitionSpecId());
+    bean.setAddedSnapshotId(manifest.snapshotId());
+    bean.setContent(manifest.content().id());
+    bean.setSequenceNumber(manifest.sequenceNumber());
+
+    return bean;
+  }
 
   public String getPath() {
     return path;
@@ -74,6 +90,14 @@ public class ManifestFileBean implements ManifestFile {
     this.content = content;
   }
 
+  public Long getSequenceNumber() {
+    return sequenceNumber;
+  }
+
+  public void setSequenceNumber(Long sequenceNumber) {
+    this.sequenceNumber = sequenceNumber;
+  }
+
   @Override
   public String path() {
     return path;
@@ -96,7 +120,7 @@ public class ManifestFileBean implements ManifestFile {
 
   @Override
   public long sequenceNumber() {
-    return 0;
+    return sequenceNumber;
   }
 
   @Override
diff --git 
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
 
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
index 3a430cd86f..55b0096bf6 100644
--- 
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
+++ 
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
@@ -33,6 +33,7 @@ import org.apache.iceberg.MetricsModes;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SparkDistributedDataScan;
 import org.apache.iceberg.StructLike;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
@@ -436,8 +437,7 @@ public class SparkScanBuilder
     Schema expectedSchema = schemaWithMetadataColumns();
 
     BatchScan scan =
-        table
-            .newBatchScan()
+        newBatchScan()
             .caseSensitive(caseSensitive)
             .filter(filterExpression())
             .project(expectedSchema)
@@ -625,8 +625,7 @@ public class SparkScanBuilder
     Schema expectedSchema = schemaWithMetadataColumns();
 
     BatchScan scan =
-        table
-            .newBatchScan()
+        newBatchScan()
             .useSnapshot(snapshotId)
             .caseSensitive(caseSensitive)
             .filter(filterExpression())
@@ -714,4 +713,12 @@ public class SparkScanBuilder
   public StructType readSchema() {
     return build().readSchema();
   }
+
+  private BatchScan newBatchScan() {
+    if (table instanceof BaseTable && readConf.distributedPlanningEnabled()) {
+      return new SparkDistributedDataScan(spark, table, readConf);
+    } else {
+      return table.newBatchScan();
+    }
+  }
 }
diff --git 
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/SparkDistributedDataScanTestBase.java
 
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/SparkDistributedDataScanTestBase.java
new file mode 100644
index 0000000000..47b8dbb1d9
--- /dev/null
+++ 
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/SparkDistributedDataScanTestBase.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import static org.apache.iceberg.PlanningMode.DISTRIBUTED;
+import static org.apache.iceberg.PlanningMode.LOCAL;
+
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.spark.SparkReadConf;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.internal.SQLConf;
+import org.junit.Before;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+@RunWith(Parameterized.class)
+public abstract class SparkDistributedDataScanTestBase
+    extends DataTableScanTestBase<BatchScan, ScanTask, 
ScanTaskGroup<ScanTask>> {
+
+  @Parameters(name = "formatVersion = {0}, dataMode = {1}, deleteMode = {2}")
+  public static Object[] parameters() {
+    return new Object[][] {
+      new Object[] {1, LOCAL, LOCAL},
+      new Object[] {1, LOCAL, DISTRIBUTED},
+      new Object[] {1, DISTRIBUTED, LOCAL},
+      new Object[] {1, DISTRIBUTED, DISTRIBUTED},
+      new Object[] {2, LOCAL, LOCAL},
+      new Object[] {2, LOCAL, DISTRIBUTED},
+      new Object[] {2, DISTRIBUTED, LOCAL},
+      new Object[] {2, DISTRIBUTED, DISTRIBUTED}
+    };
+  }
+
+  protected static SparkSession spark = null;
+
+  private final PlanningMode dataMode;
+  private final PlanningMode deleteMode;
+
+  public SparkDistributedDataScanTestBase(
+      int formatVersion, PlanningMode dataPlanningMode, PlanningMode 
deletePlanningMode) {
+    super(formatVersion);
+    this.dataMode = dataPlanningMode;
+    this.deleteMode = deletePlanningMode;
+  }
+
+  @Before
+  public void configurePlanningModes() {
+    table
+        .updateProperties()
+        .set(TableProperties.DATA_PLANNING_MODE, dataMode.modeName())
+        .set(TableProperties.DELETE_PLANNING_MODE, deleteMode.modeName())
+        .commit();
+  }
+
+  @Override
+  protected BatchScan useRef(BatchScan scan, String ref) {
+    return scan.useRef(ref);
+  }
+
+  @Override
+  protected BatchScan useSnapshot(BatchScan scan, long snapshotId) {
+    return scan.useSnapshot(snapshotId);
+  }
+
+  @Override
+  protected BatchScan asOfTime(BatchScan scan, long timestampMillis) {
+    return scan.asOfTime(timestampMillis);
+  }
+
+  @Override
+  protected BatchScan newScan() {
+    SparkReadConf readConf = new SparkReadConf(spark, table, 
ImmutableMap.of());
+    return new SparkDistributedDataScan(spark, table, readConf);
+  }
+
+  protected static SparkSession initSpark(String serializer) {
+    return SparkSession.builder()
+        .master("local[2]")
+        .config("spark.serializer", serializer)
+        .config(SQLConf.SHUFFLE_PARTITIONS().key(), "4")
+        .getOrCreate();
+  }
+}
diff --git 
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanDeletes.java
 
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanDeletes.java
new file mode 100644
index 0000000000..8ed37db642
--- /dev/null
+++ 
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanDeletes.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import static org.apache.iceberg.PlanningMode.DISTRIBUTED;
+import static org.apache.iceberg.PlanningMode.LOCAL;
+
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.spark.SparkReadConf;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.internal.SQLConf;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class TestSparkDistributedDataScanDeletes
+    extends DeleteFileIndexTestBase<BatchScan, ScanTask, 
ScanTaskGroup<ScanTask>> {
+
+  @Parameterized.Parameters(name = "dataMode = {0}, deleteMode = {1}")
+  public static Object[] parameters() {
+    return new Object[][] {
+      new Object[] {LOCAL, LOCAL},
+      new Object[] {LOCAL, DISTRIBUTED},
+      new Object[] {DISTRIBUTED, LOCAL},
+      new Object[] {DISTRIBUTED, DISTRIBUTED}
+    };
+  }
+
+  private static SparkSession spark = null;
+
+  private final PlanningMode dataMode;
+  private final PlanningMode deleteMode;
+
+  public TestSparkDistributedDataScanDeletes(
+      PlanningMode dataPlanningMode, PlanningMode deletePlanningMode) {
+    this.dataMode = dataPlanningMode;
+    this.deleteMode = deletePlanningMode;
+  }
+
+  @Before
+  public void configurePlanningModes() {
+    table
+        .updateProperties()
+        .set(TableProperties.DATA_PLANNING_MODE, dataMode.modeName())
+        .set(TableProperties.DELETE_PLANNING_MODE, deleteMode.modeName())
+        .commit();
+  }
+
+  @BeforeClass
+  public static void startSpark() {
+    TestSparkDistributedDataScanDeletes.spark =
+        SparkSession.builder()
+            .master("local[2]")
+            .config("spark.serializer", 
"org.apache.spark.serializer.KryoSerializer")
+            .config(SQLConf.SHUFFLE_PARTITIONS().key(), "4")
+            .getOrCreate();
+  }
+
+  @AfterClass
+  public static void stopSpark() {
+    SparkSession currentSpark = TestSparkDistributedDataScanDeletes.spark;
+    TestSparkDistributedDataScanDeletes.spark = null;
+    currentSpark.stop();
+  }
+
+  @Override
+  protected BatchScan newScan(Table table) {
+    SparkReadConf readConf = new SparkReadConf(spark, table, 
ImmutableMap.of());
+    return new SparkDistributedDataScan(spark, table, readConf);
+  }
+}
diff --git 
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanFilterFiles.java
 
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanFilterFiles.java
new file mode 100644
index 0000000000..510c130a58
--- /dev/null
+++ 
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanFilterFiles.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import static org.apache.iceberg.PlanningMode.DISTRIBUTED;
+import static org.apache.iceberg.PlanningMode.LOCAL;
+
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.spark.SparkReadConf;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.internal.SQLConf;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+@RunWith(Parameterized.class)
+public class TestSparkDistributedDataScanFilterFiles
+    extends FilterFilesTestBase<BatchScan, ScanTask, ScanTaskGroup<ScanTask>> {
+
+  @Parameters(name = "formatVersion = {0}, dataMode = {1}, deleteMode = {2}")
+  public static Object[] parameters() {
+    return new Object[][] {
+      new Object[] {1, LOCAL, LOCAL},
+      new Object[] {1, LOCAL, DISTRIBUTED},
+      new Object[] {1, DISTRIBUTED, LOCAL},
+      new Object[] {1, DISTRIBUTED, DISTRIBUTED},
+      new Object[] {2, LOCAL, LOCAL},
+      new Object[] {2, LOCAL, DISTRIBUTED},
+      new Object[] {2, DISTRIBUTED, LOCAL},
+      new Object[] {2, DISTRIBUTED, DISTRIBUTED}
+    };
+  }
+
+  private static SparkSession spark = null;
+
+  private final PlanningMode dataMode;
+  private final PlanningMode deleteMode;
+
+  public TestSparkDistributedDataScanFilterFiles(
+      int formatVersion, PlanningMode dataPlanningMode, PlanningMode 
deletePlanningMode) {
+    super(formatVersion);
+    this.dataMode = dataPlanningMode;
+    this.deleteMode = deletePlanningMode;
+  }
+
+  @BeforeClass
+  public static void startSpark() {
+    TestSparkDistributedDataScanFilterFiles.spark =
+        SparkSession.builder()
+            .master("local[2]")
+            .config("spark.serializer", 
"org.apache.spark.serializer.KryoSerializer")
+            .config(SQLConf.SHUFFLE_PARTITIONS().key(), "4")
+            .getOrCreate();
+  }
+
+  @AfterClass
+  public static void stopSpark() {
+    SparkSession currentSpark = TestSparkDistributedDataScanFilterFiles.spark;
+    TestSparkDistributedDataScanFilterFiles.spark = null;
+    currentSpark.stop();
+  }
+
+  @Override
+  protected BatchScan newScan(Table table) {
+    table
+        .updateProperties()
+        .set(TableProperties.DATA_PLANNING_MODE, dataMode.modeName())
+        .set(TableProperties.DELETE_PLANNING_MODE, deleteMode.modeName())
+        .commit();
+    SparkReadConf readConf = new SparkReadConf(spark, table, 
ImmutableMap.of());
+    return new SparkDistributedDataScan(spark, table, readConf);
+  }
+}
diff --git a/core/src/main/java/org/apache/iceberg/metrics/ScanMetricsUtil.java 
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanJavaSerialization.java
similarity index 50%
copy from core/src/main/java/org/apache/iceberg/metrics/ScanMetricsUtil.java
copy to 
spark/v3.4/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanJavaSerialization.java
index 102f48ee19..ba1096ee36 100644
--- a/core/src/main/java/org/apache/iceberg/metrics/ScanMetricsUtil.java
+++ 
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanJavaSerialization.java
@@ -16,22 +16,30 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iceberg.metrics;
+package org.apache.iceberg;
 
-import org.apache.iceberg.DeleteFile;
-import org.apache.iceberg.FileContent;
+import org.apache.spark.sql.SparkSession;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
 
-public class ScanMetricsUtil {
+public class TestSparkDistributedDataScanJavaSerialization
+    extends SparkDistributedDataScanTestBase {
 
-  private ScanMetricsUtil() {}
+  public TestSparkDistributedDataScanJavaSerialization(
+      int formatVersion, PlanningMode dataPlanningMode, PlanningMode 
deletePlanningMode) {
+    super(formatVersion, dataPlanningMode, deletePlanningMode);
+  }
 
-  public static void indexedDeleteFile(ScanMetrics metrics, DeleteFile 
deleteFile) {
-    metrics.indexedDeleteFiles().increment();
+  @BeforeClass
+  public static void startSpark() {
+    SparkDistributedDataScanTestBase.spark =
+        initSpark("org.apache.spark.serializer.JavaSerializer");
+  }
 
-    if (deleteFile.content() == FileContent.POSITION_DELETES) {
-      metrics.positionalDeleteFiles().increment();
-    } else if (deleteFile.content() == FileContent.EQUALITY_DELETES) {
-      metrics.equalityDeleteFiles().increment();
-    }
+  @AfterClass
+  public static void stopSpark() {
+    SparkSession currentSpark = SparkDistributedDataScanTestBase.spark;
+    SparkDistributedDataScanTestBase.spark = null;
+    currentSpark.stop();
   }
 }
diff --git a/core/src/main/java/org/apache/iceberg/metrics/ScanMetricsUtil.java 
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanKryoSerialization.java
similarity index 50%
copy from core/src/main/java/org/apache/iceberg/metrics/ScanMetricsUtil.java
copy to 
spark/v3.4/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanKryoSerialization.java
index 102f48ee19..7a795eb477 100644
--- a/core/src/main/java/org/apache/iceberg/metrics/ScanMetricsUtil.java
+++ 
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanKryoSerialization.java
@@ -16,22 +16,30 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iceberg.metrics;
+package org.apache.iceberg;
 
-import org.apache.iceberg.DeleteFile;
-import org.apache.iceberg.FileContent;
+import org.apache.spark.sql.SparkSession;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
 
-public class ScanMetricsUtil {
+public class TestSparkDistributedDataScanKryoSerialization
+    extends SparkDistributedDataScanTestBase {
 
-  private ScanMetricsUtil() {}
+  public TestSparkDistributedDataScanKryoSerialization(
+      int formatVersion, PlanningMode dataPlanningMode, PlanningMode 
deletePlanningMode) {
+    super(formatVersion, dataPlanningMode, deletePlanningMode);
+  }
 
-  public static void indexedDeleteFile(ScanMetrics metrics, DeleteFile 
deleteFile) {
-    metrics.indexedDeleteFiles().increment();
+  @BeforeClass
+  public static void startSpark() {
+    SparkDistributedDataScanTestBase.spark =
+        initSpark("org.apache.spark.serializer.KryoSerializer");
+  }
 
-    if (deleteFile.content() == FileContent.POSITION_DELETES) {
-      metrics.positionalDeleteFiles().increment();
-    } else if (deleteFile.content() == FileContent.EQUALITY_DELETES) {
-      metrics.equalityDeleteFiles().increment();
-    }
+  @AfterClass
+  public static void stopSpark() {
+    SparkSession currentSpark = SparkDistributedDataScanTestBase.spark;
+    SparkDistributedDataScanTestBase.spark = null;
+    currentSpark.stop();
   }
 }
diff --git 
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/SparkTestBaseWithCatalog.java
 
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/SparkTestBaseWithCatalog.java
index 2537ff5f80..6581598945 100644
--- 
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/SparkTestBaseWithCatalog.java
+++ 
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/SparkTestBaseWithCatalog.java
@@ -24,6 +24,8 @@ import java.util.Map;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.PlanningMode;
+import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.catalog.Namespace;
 import org.apache.iceberg.catalog.SupportsNamespaces;
@@ -111,4 +113,18 @@ public abstract class SparkTestBaseWithCatalog extends 
SparkTestBase {
     return PropertyUtil.propertyAsBoolean(
         catalogConfig, CatalogProperties.CACHE_ENABLED, 
CatalogProperties.CACHE_ENABLED_DEFAULT);
   }
+
+  protected void configurePlanningMode(PlanningMode planningMode) {
+    configurePlanningMode(tableName, planningMode);
+  }
+
+  protected void configurePlanningMode(String table, PlanningMode 
planningMode) {
+    sql(
+        "ALTER TABLE %s SET TBLPROPERTIES ('%s' '%s', '%s' '%s')",
+        table,
+        TableProperties.DATA_PLANNING_MODE,
+        planningMode.modeName(),
+        TableProperties.DELETE_PLANNING_MODE,
+        planningMode.modeName());
+  }
 }
diff --git 
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestFilteredScan.java
 
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestFilteredScan.java
index 22db454a6e..e8af5e51ec 100644
--- 
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestFilteredScan.java
+++ 
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestFilteredScan.java
@@ -19,6 +19,8 @@
 package org.apache.iceberg.spark.source;
 
 import static org.apache.iceberg.Files.localOutput;
+import static org.apache.iceberg.PlanningMode.DISTRIBUTED;
+import static org.apache.iceberg.PlanningMode.LOCAL;
 import static 
org.apache.spark.sql.catalyst.util.DateTimeUtils.fromJavaTimestamp;
 import static org.apache.spark.sql.functions.callUDF;
 import static org.apache.spark.sql.functions.column;
@@ -37,8 +39,10 @@ import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DataFiles;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.PlanningMode;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.data.GenericAppenderFactory;
 import org.apache.iceberg.data.GenericRecord;
 import org.apache.iceberg.data.Record;
@@ -151,21 +155,23 @@ public class TestFilteredScan {
 
   private final String format;
   private final boolean vectorized;
+  private final PlanningMode planningMode;
 
-  @Parameterized.Parameters(name = "format = {0}, vectorized = {1}")
+  @Parameterized.Parameters(name = "format = {0}, vectorized = {1}, 
planningMode = {2}")
   public static Object[][] parameters() {
     return new Object[][] {
-      {"parquet", false},
-      {"parquet", true},
-      {"avro", false},
-      {"orc", false},
-      {"orc", true}
+      {"parquet", false, LOCAL},
+      {"parquet", true, DISTRIBUTED},
+      {"avro", false, LOCAL},
+      {"orc", false, DISTRIBUTED},
+      {"orc", true, LOCAL}
     };
   }
 
-  public TestFilteredScan(String format, boolean vectorized) {
+  public TestFilteredScan(String format, boolean vectorized, PlanningMode 
planningMode) {
     this.format = format;
     this.vectorized = vectorized;
+    this.planningMode = planningMode;
   }
 
   private File parent = null;
@@ -179,7 +185,16 @@ public class TestFilteredScan {
     File dataFolder = new File(unpartitioned, "data");
     Assert.assertTrue("Mkdir should succeed", dataFolder.mkdirs());
 
-    Table table = TABLES.create(SCHEMA, PartitionSpec.unpartitioned(), 
unpartitioned.toString());
+    Table table =
+        TABLES.create(
+            SCHEMA,
+            PartitionSpec.unpartitioned(),
+            ImmutableMap.of(
+                TableProperties.DATA_PLANNING_MODE,
+                planningMode.modeName(),
+                TableProperties.DELETE_PLANNING_MODE,
+                planningMode.modeName()),
+            unpartitioned.toString());
     Schema tableSchema = table.schema(); // use the table schema because ids 
are reassigned
 
     FileFormat fileFormat = FileFormat.fromString(format);
diff --git 
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestIdentityPartitionData.java
 
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestIdentityPartitionData.java
index 7313c18cc0..45a523917f 100644
--- 
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestIdentityPartitionData.java
+++ 
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestIdentityPartitionData.java
@@ -18,12 +18,16 @@
  */
 package org.apache.iceberg.spark.source;
 
+import static org.apache.iceberg.PlanningMode.DISTRIBUTED;
+import static org.apache.iceberg.PlanningMode.LOCAL;
+
 import java.io.File;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.PlanningMode;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
@@ -51,23 +55,29 @@ public class TestIdentityPartitionData extends 
SparkTestBase {
   private static final Configuration CONF = new Configuration();
   private static final HadoopTables TABLES = new HadoopTables(CONF);
 
-  @Parameterized.Parameters(name = "format = {0}, vectorized = {1}")
+  @Parameterized.Parameters(name = "format = {0}, vectorized = {1}, 
planningMode = {2}")
   public static Object[][] parameters() {
     return new Object[][] {
-      {"parquet", false},
-      {"parquet", true},
-      {"avro", false},
-      {"orc", false},
-      {"orc", true},
+      {"parquet", false, LOCAL},
+      {"parquet", true, DISTRIBUTED},
+      {"avro", false, LOCAL},
+      {"orc", false, DISTRIBUTED},
+      {"orc", true, LOCAL},
     };
   }
 
   private final String format;
   private final boolean vectorized;
+  private final Map<String, String> properties;
 
-  public TestIdentityPartitionData(String format, boolean vectorized) {
+  public TestIdentityPartitionData(String format, boolean vectorized, 
PlanningMode planningMode) {
     this.format = format;
     this.vectorized = vectorized;
+    this.properties =
+        ImmutableMap.of(
+            TableProperties.DEFAULT_FILE_FORMAT, format,
+            TableProperties.DATA_PLANNING_MODE, planningMode.modeName(),
+            TableProperties.DELETE_PLANNING_MODE, planningMode.modeName());
   }
 
   private static final Schema LOG_SCHEMA =
@@ -108,7 +118,6 @@ public class TestIdentityPartitionData extends 
SparkTestBase {
     String hiveTable = "hivetable";
     Assert.assertTrue("Temp folder should exist", location.exists());
 
-    Map<String, String> properties = 
ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format);
     this.logs =
         spark.createDataFrame(LOGS, LogMessage.class).select("id", "date", 
"level", "message");
     spark.sql(String.format("DROP TABLE IF EXISTS %s", hiveTable));
@@ -138,7 +147,6 @@ public class TestIdentityPartitionData extends 
SparkTestBase {
       File location = temp.newFolder("logs");
       Assert.assertTrue("Temp folder should exist", location.exists());
 
-      Map<String, String> properties = 
ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format);
       this.table = TABLES.create(LOG_SCHEMA, spec, properties, 
location.toString());
       this.logs =
           spark.createDataFrame(LOGS, LogMessage.class).select("id", "date", 
"level", "message");
diff --git 
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionPruning.java
 
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionPruning.java
index 4ef022c50c..c00549c68f 100644
--- 
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionPruning.java
+++ 
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionPruning.java
@@ -18,6 +18,9 @@
  */
 package org.apache.iceberg.spark.source;
 
+import static org.apache.iceberg.PlanningMode.DISTRIBUTED;
+import static org.apache.iceberg.PlanningMode.LOCAL;
+
 import java.io.File;
 import java.io.IOException;
 import java.net.URI;
@@ -37,6 +40,7 @@ import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RawLocalFileSystem;
 import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.PlanningMode;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
@@ -74,23 +78,25 @@ public class TestPartitionPruning {
   private static final Configuration CONF = new Configuration();
   private static final HadoopTables TABLES = new HadoopTables(CONF);
 
-  @Parameterized.Parameters(name = "format = {0}, vectorized = {1}")
+  @Parameterized.Parameters(name = "format = {0}, vectorized = {1}, 
planningMode = {2}")
   public static Object[][] parameters() {
     return new Object[][] {
-      {"parquet", false},
-      {"parquet", true},
-      {"avro", false},
-      {"orc", false},
-      {"orc", true}
+      {"parquet", false, DISTRIBUTED},
+      {"parquet", true, LOCAL},
+      {"avro", false, DISTRIBUTED},
+      {"orc", false, LOCAL},
+      {"orc", true, DISTRIBUTED}
     };
   }
 
   private final String format;
   private final boolean vectorized;
+  private final PlanningMode planningMode;
 
-  public TestPartitionPruning(String format, boolean vectorized) {
+  public TestPartitionPruning(String format, boolean vectorized, PlanningMode 
planningMode) {
     this.format = format;
     this.vectorized = vectorized;
+    this.planningMode = planningMode;
   }
 
   private static SparkSession spark = null;
@@ -293,7 +299,11 @@ public class TestPartitionPruning {
 
   private Table createTable(File originTableLocation) {
     String trackedTableLocation = 
CountOpenLocalFileSystem.convertPath(originTableLocation);
-    Map<String, String> properties = 
ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format);
+    Map<String, String> properties =
+        ImmutableMap.of(
+            TableProperties.DEFAULT_FILE_FORMAT, format,
+            TableProperties.DATA_PLANNING_MODE, planningMode.modeName(),
+            TableProperties.DELETE_PLANNING_MODE, planningMode.modeName());
     return TABLES.create(LOG_SCHEMA, spec, properties, trackedTableLocation);
   }
 
diff --git 
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestRuntimeFiltering.java
 
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestRuntimeFiltering.java
index beaf7b75c6..edd4cdf083 100644
--- 
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestRuntimeFiltering.java
+++ 
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestRuntimeFiltering.java
@@ -18,6 +18,8 @@
  */
 package org.apache.iceberg.spark.source;
 
+import static org.apache.iceberg.PlanningMode.DISTRIBUTED;
+import static org.apache.iceberg.PlanningMode.LOCAL;
 import static org.apache.spark.sql.functions.date_add;
 import static org.apache.spark.sql.functions.expr;
 
@@ -27,6 +29,7 @@ import java.util.List;
 import java.util.Set;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.PlanningMode;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.expressions.Expressions;
@@ -41,9 +44,23 @@ import 
org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
+@RunWith(Parameterized.class)
 public class TestRuntimeFiltering extends SparkTestBaseWithCatalog {
 
+  @Parameterized.Parameters(name = "planningMode = {0}")
+  public static Object[] parameters() {
+    return new Object[] {LOCAL, DISTRIBUTED};
+  }
+
+  private final PlanningMode planningMode;
+
+  public TestRuntimeFiltering(PlanningMode planningMode) {
+    this.planningMode = planningMode;
+  }
+
   @After
   public void removeTables() {
     sql("DROP TABLE IF EXISTS %s", tableName);
@@ -57,6 +74,7 @@ public class TestRuntimeFiltering extends 
SparkTestBaseWithCatalog {
             + "USING iceberg "
             + "PARTITIONED BY (date)",
         tableName);
+    configurePlanningMode(planningMode);
 
     Dataset<Row> df =
         spark
@@ -95,6 +113,7 @@ public class TestRuntimeFiltering extends 
SparkTestBaseWithCatalog {
             + "USING iceberg "
             + "PARTITIONED BY (bucket(8, id))",
         tableName);
+    configurePlanningMode(planningMode);
 
     Dataset<Row> df =
         spark
@@ -133,6 +152,7 @@ public class TestRuntimeFiltering extends 
SparkTestBaseWithCatalog {
             + "USING iceberg "
             + "PARTITIONED BY (bucket(8, id))",
         tableName);
+    configurePlanningMode(planningMode);
 
     Dataset<Row> df =
         spark
@@ -173,6 +193,7 @@ public class TestRuntimeFiltering extends 
SparkTestBaseWithCatalog {
             + "USING iceberg "
             + "PARTITIONED BY (data, bucket(8, id))",
         tableName);
+    configurePlanningMode(planningMode);
 
     Dataset<Row> df =
         spark
@@ -215,6 +236,7 @@ public class TestRuntimeFiltering extends 
SparkTestBaseWithCatalog {
             + "USING iceberg "
             + "PARTITIONED BY (data, bucket(8, id))",
         tableName);
+    configurePlanningMode(planningMode);
 
     Dataset<Row> df =
         spark
@@ -256,6 +278,7 @@ public class TestRuntimeFiltering extends 
SparkTestBaseWithCatalog {
     sql(
         "CREATE TABLE %s (id BIGINT, data STRING, date DATE, ts TIMESTAMP) 
USING iceberg",
         tableName);
+    configurePlanningMode(planningMode);
 
     Dataset<Row> df1 =
         spark
@@ -309,6 +332,7 @@ public class TestRuntimeFiltering extends SparkTestBaseWithCatalog {
             + "USING iceberg "
             + "PARTITIONED BY (bucket(8, `i.d`))",
         tableName);
+    configurePlanningMode(planningMode);
 
     Dataset<Row> df =
         spark
@@ -352,6 +376,7 @@ public class TestRuntimeFiltering extends SparkTestBaseWithCatalog {
             + "USING iceberg "
             + "PARTITIONED BY (bucket(8, `i``d`))",
         tableName);
+    configurePlanningMode(planningMode);
 
     Dataset<Row> df =
         spark
@@ -390,6 +415,7 @@ public class TestRuntimeFiltering extends SparkTestBaseWithCatalog {
     sql(
         "CREATE TABLE %s (id BIGINT, data STRING, date DATE, ts TIMESTAMP) 
USING iceberg",
         tableName);
+    configurePlanningMode(planningMode);
 
     Dataset<Row> df =
         spark
diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java
index 276fbcd592..f1374c050d 100644
--- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java
+++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java
@@ -18,16 +18,22 @@
  */
 package org.apache.iceberg.spark.source;
 
+import static org.apache.iceberg.PlanningMode.DISTRIBUTED;
+import static org.apache.iceberg.PlanningMode.LOCAL;
 import static org.apache.iceberg.types.Types.NestedField.optional;
 
 import java.io.IOException;
 import java.util.List;
+import java.util.Map;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.PlanningMode;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.spark.SparkReadOptions;
@@ -43,9 +49,17 @@ import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
+@RunWith(Parameterized.class)
 public class TestSnapshotSelection {
 
+  @Parameterized.Parameters(name = "planningMode = {0}")
+  public static Object[] parameters() {
+    return new Object[] {LOCAL, DISTRIBUTED};
+  }
+
   private static final Configuration CONF = new Configuration();
   private static final Schema SCHEMA =
       new Schema(
@@ -55,6 +69,15 @@ public class TestSnapshotSelection {
 
   private static SparkSession spark = null;
 
+  private final Map<String, String> properties;
+
+  public TestSnapshotSelection(PlanningMode planningMode) {
+    this.properties =
+        ImmutableMap.of(
+            TableProperties.DATA_PLANNING_MODE, planningMode.modeName(),
+            TableProperties.DELETE_PLANNING_MODE, planningMode.modeName());
+  }
+
   @BeforeClass
   public static void startSpark() {
     TestSnapshotSelection.spark = 
SparkSession.builder().master("local[2]").getOrCreate();
@@ -73,7 +96,7 @@ public class TestSnapshotSelection {
 
     HadoopTables tables = new HadoopTables(CONF);
     PartitionSpec spec = PartitionSpec.unpartitioned();
-    Table table = tables.create(SCHEMA, spec, tableLocation);
+    Table table = tables.create(SCHEMA, spec, properties, tableLocation);
 
     // produce the first snapshot
     List<SimpleRecord> firstBatchRecords =
@@ -118,7 +141,7 @@ public class TestSnapshotSelection {
 
     HadoopTables tables = new HadoopTables(CONF);
     PartitionSpec spec = PartitionSpec.unpartitioned();
-    Table table = tables.create(SCHEMA, spec, tableLocation);
+    Table table = tables.create(SCHEMA, spec, properties, tableLocation);
 
     // produce the first snapshot
     List<SimpleRecord> firstBatchRecords =
@@ -168,7 +191,7 @@ public class TestSnapshotSelection {
 
     HadoopTables tables = new HadoopTables(CONF);
     PartitionSpec spec = PartitionSpec.unpartitioned();
-    tables.create(SCHEMA, spec, tableLocation);
+    tables.create(SCHEMA, spec, properties, tableLocation);
 
     Dataset<Row> df = spark.read().format("iceberg").option("snapshot-id", 
-10).load(tableLocation);
 
@@ -184,7 +207,7 @@ public class TestSnapshotSelection {
     String tableLocation = temp.newFolder("iceberg-table").toString();
     HadoopTables tables = new HadoopTables(CONF);
     PartitionSpec spec = PartitionSpec.unpartitioned();
-    tables.create(SCHEMA, spec, tableLocation);
+    tables.create(SCHEMA, spec, properties, tableLocation);
 
     Assertions.assertThatThrownBy(
             () ->
@@ -203,7 +226,7 @@ public class TestSnapshotSelection {
 
     HadoopTables tables = new HadoopTables(CONF);
     PartitionSpec spec = PartitionSpec.unpartitioned();
-    Table table = tables.create(SCHEMA, spec, tableLocation);
+    Table table = tables.create(SCHEMA, spec, properties, tableLocation);
 
     List<SimpleRecord> firstBatchRecords =
         Lists.newArrayList(
@@ -235,7 +258,7 @@ public class TestSnapshotSelection {
 
     HadoopTables tables = new HadoopTables(CONF);
     PartitionSpec spec = PartitionSpec.unpartitioned();
-    Table table = tables.create(SCHEMA, spec, tableLocation);
+    Table table = tables.create(SCHEMA, spec, properties, tableLocation);
 
     // produce the first snapshot
     List<SimpleRecord> firstBatchRecords =
@@ -270,7 +293,7 @@ public class TestSnapshotSelection {
 
     HadoopTables tables = new HadoopTables(CONF);
     PartitionSpec spec = PartitionSpec.unpartitioned();
-    Table table = tables.create(SCHEMA, spec, tableLocation);
+    Table table = tables.create(SCHEMA, spec, properties, tableLocation);
 
     // produce the first snapshot
     List<SimpleRecord> firstBatchRecords =
@@ -305,7 +328,7 @@ public class TestSnapshotSelection {
 
     HadoopTables tables = new HadoopTables(CONF);
     PartitionSpec spec = PartitionSpec.unpartitioned();
-    Table table = tables.create(SCHEMA, spec, tableLocation);
+    Table table = tables.create(SCHEMA, spec, properties, tableLocation);
 
     // produce the first snapshot
     List<SimpleRecord> firstBatchRecords =
@@ -336,7 +359,7 @@ public class TestSnapshotSelection {
 
     HadoopTables tables = new HadoopTables(CONF);
     PartitionSpec spec = PartitionSpec.unpartitioned();
-    Table table = tables.create(SCHEMA, spec, tableLocation);
+    Table table = tables.create(SCHEMA, spec, properties, tableLocation);
 
     List<SimpleRecord> firstBatchRecords =
         Lists.newArrayList(
@@ -379,7 +402,7 @@ public class TestSnapshotSelection {
 
     HadoopTables tables = new HadoopTables(CONF);
     PartitionSpec spec = PartitionSpec.unpartitioned();
-    Table table = tables.create(SCHEMA, spec, tableLocation);
+    Table table = tables.create(SCHEMA, spec, properties, tableLocation);
 
     // produce the first snapshot
     List<SimpleRecord> firstBatchRecords =
@@ -420,7 +443,7 @@ public class TestSnapshotSelection {
 
     HadoopTables tables = new HadoopTables(CONF);
     PartitionSpec spec = PartitionSpec.unpartitioned();
-    Table table = tables.create(SCHEMA, spec, tableLocation);
+    Table table = tables.create(SCHEMA, spec, properties, tableLocation);
 
     // produce the first snapshot
     List<SimpleRecord> firstBatchRecords =
diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReadProjection.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReadProjection.java
index dde1eb7b36..3a4b235c46 100644
--- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReadProjection.java
+++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReadProjection.java
@@ -19,6 +19,8 @@
 package org.apache.iceberg.spark.source;
 
 import static org.apache.iceberg.Files.localOutput;
+import static org.apache.iceberg.PlanningMode.DISTRIBUTED;
+import static org.apache.iceberg.PlanningMode.LOCAL;
 import static org.apache.iceberg.types.Types.NestedField.optional;
 import static org.apache.iceberg.types.Types.NestedField.required;
 
@@ -31,8 +33,10 @@ import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DataFiles;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.PlanningMode;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.data.GenericAppenderFactory;
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.io.FileAppender;
@@ -58,24 +62,26 @@ public class TestSparkReadProjection extends TestReadProjection {
 
   private static SparkSession spark = null;
 
-  @Parameterized.Parameters(name = "format = {0}, vectorized = {1}")
+  @Parameterized.Parameters(name = "format = {0}, vectorized = {1}, 
planningMode = {2}")
   public static Object[][] parameters() {
     return new Object[][] {
-      {"parquet", false},
-      {"parquet", true},
-      {"avro", false},
-      {"orc", false},
-      {"orc", true}
+      {"parquet", false, LOCAL},
+      {"parquet", true, DISTRIBUTED},
+      {"avro", false, LOCAL},
+      {"orc", false, DISTRIBUTED},
+      {"orc", true, LOCAL}
     };
   }
 
   private final FileFormat format;
   private final boolean vectorized;
+  private final PlanningMode planningMode;
 
-  public TestSparkReadProjection(String format, boolean vectorized) {
+  public TestSparkReadProjection(String format, boolean vectorized, 
PlanningMode planningMode) {
     super(format);
     this.format = FileFormat.fromString(format);
     this.vectorized = vectorized;
+    this.planningMode = planningMode;
   }
 
   @BeforeClass
@@ -111,7 +117,15 @@ public class TestSparkReadProjection extends TestReadProjection {
 
     File testFile = new File(dataFolder, 
format.addExtension(UUID.randomUUID().toString()));
 
-    Table table = TestTables.create(location, desc, writeSchema, 
PartitionSpec.unpartitioned());
+    Table table =
+        TestTables.create(
+            location,
+            desc,
+            writeSchema,
+            PartitionSpec.unpartitioned(),
+            ImmutableMap.of(
+                TableProperties.DATA_PLANNING_MODE, planningMode.modeName(),
+                TableProperties.DELETE_PLANNING_MODE, 
planningMode.modeName()));
     try {
       // Important: use the table's schema for the rest of the test
       // When tables are created, the column ids are reassigned.
diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/sql/TestFilterPushDown.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/sql/TestFilterPushDown.java
index 0ea34e187f..e7401a00e8 100644
--- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/sql/TestFilterPushDown.java
+++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/sql/TestFilterPushDown.java
@@ -18,9 +18,13 @@
  */
 package org.apache.iceberg.spark.sql;
 
+import static org.apache.iceberg.PlanningMode.DISTRIBUTED;
+import static org.apache.iceberg.PlanningMode.LOCAL;
+
 import java.sql.Timestamp;
 import java.time.Instant;
 import java.util.List;
+import org.apache.iceberg.PlanningMode;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
@@ -29,9 +33,23 @@ import org.apache.spark.sql.execution.SparkPlan;
 import org.assertj.core.api.Assertions;
 import org.junit.After;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
+@RunWith(Parameterized.class)
 public class TestFilterPushDown extends SparkTestBaseWithCatalog {
 
+  @Parameterized.Parameters(name = "planningMode = {0}")
+  public static Object[] parameters() {
+    return new Object[] {LOCAL, DISTRIBUTED};
+  }
+
+  private final PlanningMode planningMode;
+
+  public TestFilterPushDown(PlanningMode planningMode) {
+    this.planningMode = planningMode;
+  }
+
   @After
   public void removeTables() {
     sql("DROP TABLE IF EXISTS %s", tableName);
@@ -45,6 +63,7 @@ public class TestFilterPushDown extends SparkTestBaseWithCatalog {
             + "USING iceberg "
             + "PARTITIONED BY (dep)",
         tableName);
+    configurePlanningMode(planningMode);
 
     sql("INSERT INTO %s VALUES (1, 100, 'd1')", tableName);
     sql("INSERT INTO %s VALUES (2, 200, 'd2')", tableName);
@@ -156,6 +175,7 @@ public class TestFilterPushDown extends SparkTestBaseWithCatalog {
             + "USING iceberg "
             + "PARTITIONED BY (hours(t))",
         tableName);
+    configurePlanningMode(planningMode);
 
     sql("INSERT INTO %s VALUES (1, 100, TIMESTAMP 
'2021-06-30T01:00:00.000Z')", tableName);
     sql("INSERT INTO %s VALUES (2, 200, TIMESTAMP 
'2021-06-30T02:00:00.000Z')", tableName);
@@ -201,6 +221,7 @@ public class TestFilterPushDown extends SparkTestBaseWithCatalog {
             + "USING iceberg "
             + "PARTITIONED BY (days(t))",
         tableName);
+    configurePlanningMode(planningMode);
 
     sql("INSERT INTO %s VALUES (1, 100, TIMESTAMP 
'2021-06-15T01:00:00.000Z')", tableName);
     sql("INSERT INTO %s VALUES (2, 200, TIMESTAMP 
'2021-06-30T02:00:00.000Z')", tableName);
@@ -243,6 +264,7 @@ public class TestFilterPushDown extends SparkTestBaseWithCatalog {
             + "USING iceberg "
             + "PARTITIONED BY (months(t))",
         tableName);
+    configurePlanningMode(planningMode);
 
     sql("INSERT INTO %s VALUES (1, 100, TIMESTAMP 
'2021-06-30T01:00:00.000Z')", tableName);
     sql("INSERT INTO %s VALUES (2, 200, TIMESTAMP 
'2021-06-30T02:00:00.000Z')", tableName);
@@ -285,6 +307,7 @@ public class TestFilterPushDown extends SparkTestBaseWithCatalog {
             + "USING iceberg "
             + "PARTITIONED BY (years(t))",
         tableName);
+    configurePlanningMode(planningMode);
 
     sql("INSERT INTO %s VALUES (1, 100, TIMESTAMP 
'2021-06-30T01:00:00.000Z')", tableName);
     sql("INSERT INTO %s VALUES (2, 200, TIMESTAMP 
'2021-06-30T02:00:00.000Z')", tableName);
@@ -327,6 +350,7 @@ public class TestFilterPushDown extends SparkTestBaseWithCatalog {
             + "USING iceberg "
             + "PARTITIONED BY (dep, bucket(8, id))",
         tableName);
+    configurePlanningMode(planningMode);
 
     sql("INSERT INTO %s VALUES (1, 100, 'd1')", tableName);
     sql("INSERT INTO %s VALUES (2, 200, 'd2')", tableName);
@@ -345,6 +369,7 @@ public class TestFilterPushDown extends SparkTestBaseWithCatalog {
             + "USING iceberg "
             + "PARTITIONED BY (truncate(1, dep))",
         tableName);
+    configurePlanningMode(planningMode);
 
     sql("INSERT INTO %s VALUES (1, 100, 'd1')", tableName);
     sql("INSERT INTO %s VALUES (2, 200, 'd2')", tableName);
@@ -369,6 +394,7 @@ public class TestFilterPushDown extends SparkTestBaseWithCatalog {
             + "USING iceberg "
             + "PARTITIONED BY (dep)",
         tableName);
+    configurePlanningMode(planningMode);
 
     sql("INSERT INTO %s VALUES (1, 100, 'd1', 'sd1')", tableName);
 
@@ -409,6 +435,7 @@ public class TestFilterPushDown extends SparkTestBaseWithCatalog {
             + "USING iceberg "
             + "PARTITIONED BY (truncate(2, dep))",
         tableName);
+    configurePlanningMode(planningMode);
 
     sql("INSERT INTO %s VALUES (1, 100, 'd1')", tableName);
 
@@ -449,6 +476,7 @@ public class TestFilterPushDown extends SparkTestBaseWithCatalog {
             + "USING iceberg "
             + "PARTITIONED BY (hours(t))",
         tableName);
+    configurePlanningMode(planningMode);
 
     withDefaultTimeZone(
         "UTC",
@@ -483,6 +511,7 @@ public class TestFilterPushDown extends SparkTestBaseWithCatalog {
     sql(
         "CREATE TABLE %s (id INT, salary DOUBLE)" + "USING iceberg " + 
"PARTITIONED BY (salary)",
         tableName);
+    configurePlanningMode(planningMode);
 
     sql("INSERT INTO %s VALUES (1, 100.5)", tableName);
     sql("INSERT INTO %s VALUES (2, double('NaN'))", tableName);
diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/sql/TestStoragePartitionedJoins.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/sql/TestStoragePartitionedJoins.java
index ebb190b698..8a1ec5060f 100644
--- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/sql/TestStoragePartitionedJoins.java
+++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/sql/TestStoragePartitionedJoins.java
@@ -18,10 +18,14 @@
  */
 package org.apache.iceberg.spark.sql;
 
+import static org.apache.iceberg.PlanningMode.DISTRIBUTED;
+import static org.apache.iceberg.PlanningMode.LOCAL;
+
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.iceberg.PlanningMode;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
@@ -44,9 +48,17 @@ import org.junit.After;
 import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
+@RunWith(Parameterized.class)
 public class TestStoragePartitionedJoins extends SparkTestBaseWithCatalog {
 
+  @Parameterized.Parameters(name = "planningMode = {0}")
+  public static Object[] parameters() {
+    return new Object[] {LOCAL, DISTRIBUTED};
+  }
+
   private static final String OTHER_TABLE_NAME = "other_table";
 
   // open file cost and split size are set as 16 MB to produce a split per file
@@ -84,6 +96,12 @@ public class TestStoragePartitionedJoins extends SparkTestBaseWithCatalog {
           SparkSQLProperties.PRESERVE_DATA_GROUPING,
           "true");
 
+  private final PlanningMode planningMode;
+
+  public TestStoragePartitionedJoins(PlanningMode planningMode) {
+    this.planningMode = planningMode;
+  }
+
   @BeforeClass
   public static void setupSparkConf() {
     spark.conf().set("spark.sql.shuffle.partitions", "4");
@@ -564,6 +582,7 @@ public class TestStoragePartitionedJoins extends SparkTestBaseWithCatalog {
         sourceColumnType,
         transform,
         tablePropsAsString(TABLE_PROPERTIES));
+    configurePlanningMode(tableName, planningMode);
 
     sql(
         createTableStmt,
@@ -572,6 +591,7 @@ public class TestStoragePartitionedJoins extends SparkTestBaseWithCatalog {
         sourceColumnType,
         transform,
         tablePropsAsString(TABLE_PROPERTIES));
+    configurePlanningMode(tableName(OTHER_TABLE_NAME), planningMode);
 
     Table table = validationCatalog.loadTable(tableIdent);
     Dataset<Row> dataDF = randomDataDF(table.schema(), 200);


Reply via email to