This is an automated email from the ASF dual-hosted git repository.

aokolnychyi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new 684f7a767c Core, Spark 3.5: Read deletes in parallel and cache them on 
executors (#8755)
684f7a767c is described below

commit 684f7a767c2c216a402b60b73d2d55ef605921a0
Author: Anton Okolnychyi <[email protected]>
AuthorDate: Tue Jan 16 10:21:26 2024 -0800

    Core, Spark 3.5: Read deletes in parallel and cache them on executors 
(#8755)
---
 .../java/org/apache/iceberg/types/TypeUtil.java    |  64 +++
 .../java/org/apache/iceberg/SystemConfigs.java     |   6 +-
 .../iceberg/deletes/BitmapPositionDeleteIndex.java |   4 +
 .../java/org/apache/iceberg/deletes/Deletes.java   |  30 ++
 ...eteIndex.java => EmptyPositionDeleteIndex.java} |  24 +-
 .../iceberg/deletes/PositionDeleteIndex.java       |  10 +
 ...leteIndex.java => PositionDeleteIndexUtil.java} |  37 +-
 .../java/org/apache/iceberg/util/ThreadPools.java  |   5 +-
 .../org/apache/iceberg/data/BaseDeleteLoader.java  | 261 +++++++++++
 .../java/org/apache/iceberg/data/DeleteFilter.java | 107 +----
 .../java/org/apache/iceberg/data/DeleteLoader.java |  45 ++
 spark/v3.5/build.gradle                            |   2 +
 .../org/apache/iceberg/spark/SparkConfParser.java  |  41 ++
 .../apache/iceberg/spark/SparkExecutorCache.java   | 228 ++++++++++
 .../apache/iceberg/spark/SparkSQLProperties.java   |  16 +
 .../apache/iceberg/spark/source/BaseReader.java    |  29 ++
 .../spark/source/SerializableTableWithSize.java    |  10 +
 .../java/org/apache/iceberg/spark/Employee.java    |  49 +-
 .../iceberg/spark/TestSparkExecutorCache.java      | 503 +++++++++++++++++++++
 .../apache/iceberg/spark/TestSparkWriteConf.java   |  23 +
 20 files changed, 1360 insertions(+), 134 deletions(-)

diff --git a/api/src/main/java/org/apache/iceberg/types/TypeUtil.java 
b/api/src/main/java/org/apache/iceberg/types/TypeUtil.java
index d2f821ea00..7c13d60940 100644
--- a/api/src/main/java/org/apache/iceberg/types/TypeUtil.java
+++ b/api/src/main/java/org/apache/iceberg/types/TypeUtil.java
@@ -40,6 +40,8 @@ import 
org.apache.iceberg.relocated.com.google.common.collect.Maps;
 
 public class TypeUtil {
 
+  private static final int HEADER_SIZE = 12;
+
   private TypeUtil() {}
 
   /**
@@ -452,6 +454,68 @@ public class TypeUtil {
     }
   }
 
+  /**
+   * Estimates the number of bytes a value for a given field may occupy in 
memory.
+   *
+   * <p>This method approximates the memory size based on heuristics and the 
internal Java
+   * representation defined by {@link Type.TypeID}. It is important to note 
that the actual size
+   * might differ from this estimation. The method is designed to handle a 
variety of data types,
+   * including primitive types, strings, and nested types such as structs, 
maps, and lists.
+   *
+   * @param field a field for which to estimate the size
+   * @return the estimated size in bytes of the field's value in memory
+   */
+  public static int estimateSize(Types.NestedField field) {
+    return estimateSize(field.type());
+  }
+
+  private static int estimateSize(Type type) {
+    switch (type.typeId()) {
+      case BOOLEAN:
+        // the size of a boolean variable is virtual machine dependent
+        // it is common to believe booleans occupy 1 byte in most JVMs
+        return 1;
+      case INTEGER:
+      case FLOAT:
+      case DATE:
+        // ints and floats occupy 4 bytes
+        // dates are internally represented as ints
+        return 4;
+      case LONG:
+      case DOUBLE:
+      case TIME:
+      case TIMESTAMP:
+        // longs and doubles occupy 8 bytes
+        // times and timestamps are internally represented as longs
+        return 8;
+      case STRING:
+        // 12 (header) + 6 (fields) + 16 (array overhead) + 20 (10 chars, 2 
bytes each) = 54 bytes
+        return 54;
+      case UUID:
+        // 12 (header) + 16 (two long variables) = 28 bytes
+        return 28;
+      case FIXED:
+        return ((Types.FixedType) type).length();
+      case BINARY:
+        return 80;
+      case DECIMAL:
+        // 12 (header) + (12 + 12 + 4) (BigInteger) + 4 (scale) = 44 bytes
+        return 44;
+      case STRUCT:
+        Types.StructType struct = (Types.StructType) type;
+        return HEADER_SIZE + 
struct.fields().stream().mapToInt(TypeUtil::estimateSize).sum();
+      case LIST:
+        Types.ListType list = (Types.ListType) type;
+        return HEADER_SIZE + 5 * estimateSize(list.elementType());
+      case MAP:
+        Types.MapType map = (Types.MapType) type;
+        int entrySize = HEADER_SIZE + estimateSize(map.keyType()) + 
estimateSize(map.valueType());
+        return HEADER_SIZE + 5 * entrySize;
+      default:
+        return 16;
+    }
+  }
+
   /** Interface for passing a function that assigns column IDs. */
   public interface NextID {
     int get();
diff --git a/core/src/main/java/org/apache/iceberg/SystemConfigs.java 
b/core/src/main/java/org/apache/iceberg/SystemConfigs.java
index ce183ec5d4..feac1f61a1 100644
--- a/core/src/main/java/org/apache/iceberg/SystemConfigs.java
+++ b/core/src/main/java/org/apache/iceberg/SystemConfigs.java
@@ -43,14 +43,14 @@ public class SystemConfigs {
           Integer::parseUnsignedInt);
 
   /**
-   * Sets the size of the delete worker pool. This limits the number of 
threads used to compute the
-   * PositionDeleteIndex from the position deletes for a data file.
+   * Sets the size of the delete worker pool. This limits the number of 
threads used to read delete
+   * files for a data file.
    */
   public static final ConfigEntry<Integer> DELETE_WORKER_THREAD_POOL_SIZE =
       new ConfigEntry<>(
           "iceberg.worker.delete-num-threads",
           "ICEBERG_WORKER_DELETE_NUM_THREADS",
-          Math.max(2, Runtime.getRuntime().availableProcessors()),
+          Math.max(2, 4 * Runtime.getRuntime().availableProcessors()),
           Integer::parseUnsignedInt);
 
   /** Whether to use the shared worker pool when planning table scans. */
diff --git 
a/core/src/main/java/org/apache/iceberg/deletes/BitmapPositionDeleteIndex.java 
b/core/src/main/java/org/apache/iceberg/deletes/BitmapPositionDeleteIndex.java
index 7690ab7e48..97699c1c91 100644
--- 
a/core/src/main/java/org/apache/iceberg/deletes/BitmapPositionDeleteIndex.java
+++ 
b/core/src/main/java/org/apache/iceberg/deletes/BitmapPositionDeleteIndex.java
@@ -27,6 +27,10 @@ class BitmapPositionDeleteIndex implements 
PositionDeleteIndex {
     roaring64Bitmap = new Roaring64Bitmap();
   }
 
+  void merge(BitmapPositionDeleteIndex that) {
+    roaring64Bitmap.or(that.roaring64Bitmap);
+  }
+
   @Override
   public void delete(long position) {
     roaring64Bitmap.add(position);
diff --git a/core/src/main/java/org/apache/iceberg/deletes/Deletes.java 
b/core/src/main/java/org/apache/iceberg/deletes/Deletes.java
index 118987e24b..ff20ba53ff 100644
--- a/core/src/main/java/org/apache/iceberg/deletes/Deletes.java
+++ b/core/src/main/java/org/apache/iceberg/deletes/Deletes.java
@@ -37,6 +37,7 @@ import 
org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.CharSequenceMap;
 import org.apache.iceberg.util.Filter;
 import org.apache.iceberg.util.ParallelIterable;
 import org.apache.iceberg.util.SortedMerge;
@@ -128,6 +129,35 @@ public class Deletes {
     }
   }
 
+  /**
+   * Builds a map of position delete indexes by path.
+   *
+   * <p>Unlike {@link #toPositionIndex(CharSequence, List)}, this method 
builds a position delete
+   * index for each referenced data file and does not filter deletes. This can 
be useful when the
+   * entire delete file content is needed (e.g. caching).
+   *
+   * @param posDeletes position deletes
+   * @return the map of position delete indexes by path
+   */
+  public static <T extends StructLike> CharSequenceMap<PositionDeleteIndex> 
toPositionIndexes(
+      CloseableIterable<T> posDeletes) {
+    CharSequenceMap<PositionDeleteIndex> indexes = CharSequenceMap.create();
+
+    try (CloseableIterable<T> deletes = posDeletes) {
+      for (T delete : deletes) {
+        CharSequence filePath = (CharSequence) FILENAME_ACCESSOR.get(delete);
+        long position = (long) POSITION_ACCESSOR.get(delete);
+        PositionDeleteIndex index =
+            indexes.computeIfAbsent(filePath, key -> new 
BitmapPositionDeleteIndex());
+        index.delete(position);
+      }
+    } catch (IOException e) {
+      throw new UncheckedIOException("Failed to close position delete source", 
e);
+    }
+
+    return indexes;
+  }
+
   public static <T extends StructLike> PositionDeleteIndex toPositionIndex(
       CharSequence dataLocation, List<CloseableIterable<T>> deleteFiles) {
     return toPositionIndex(dataLocation, deleteFiles, 
ThreadPools.getDeleteWorkerPool());
diff --git 
a/core/src/main/java/org/apache/iceberg/deletes/BitmapPositionDeleteIndex.java 
b/core/src/main/java/org/apache/iceberg/deletes/EmptyPositionDeleteIndex.java
similarity index 66%
copy from 
core/src/main/java/org/apache/iceberg/deletes/BitmapPositionDeleteIndex.java
copy to 
core/src/main/java/org/apache/iceberg/deletes/EmptyPositionDeleteIndex.java
index 7690ab7e48..660e01038c 100644
--- 
a/core/src/main/java/org/apache/iceberg/deletes/BitmapPositionDeleteIndex.java
+++ 
b/core/src/main/java/org/apache/iceberg/deletes/EmptyPositionDeleteIndex.java
@@ -18,32 +18,38 @@
  */
 package org.apache.iceberg.deletes;
 
-import org.roaringbitmap.longlong.Roaring64Bitmap;
+class EmptyPositionDeleteIndex implements PositionDeleteIndex {
 
-class BitmapPositionDeleteIndex implements PositionDeleteIndex {
-  private final Roaring64Bitmap roaring64Bitmap;
+  private static final EmptyPositionDeleteIndex INSTANCE = new 
EmptyPositionDeleteIndex();
 
-  BitmapPositionDeleteIndex() {
-    roaring64Bitmap = new Roaring64Bitmap();
+  private EmptyPositionDeleteIndex() {}
+
+  static EmptyPositionDeleteIndex get() {
+    return INSTANCE;
   }
 
   @Override
   public void delete(long position) {
-    roaring64Bitmap.add(position);
+    throw new UnsupportedOperationException("Cannot modify " + 
getClass().getName());
   }
 
   @Override
   public void delete(long posStart, long posEnd) {
-    roaring64Bitmap.add(posStart, posEnd);
+    throw new UnsupportedOperationException("Cannot modify " + 
getClass().getName());
   }
 
   @Override
   public boolean isDeleted(long position) {
-    return roaring64Bitmap.contains(position);
+    return false;
   }
 
   @Override
   public boolean isEmpty() {
-    return roaring64Bitmap.isEmpty();
+    return true;
+  }
+
+  @Override
+  public String toString() {
+    return "PositionDeleteIndex{}";
   }
 }
diff --git 
a/core/src/main/java/org/apache/iceberg/deletes/PositionDeleteIndex.java 
b/core/src/main/java/org/apache/iceberg/deletes/PositionDeleteIndex.java
index bcfa9f2cf5..be05875aeb 100644
--- a/core/src/main/java/org/apache/iceberg/deletes/PositionDeleteIndex.java
+++ b/core/src/main/java/org/apache/iceberg/deletes/PositionDeleteIndex.java
@@ -44,4 +44,14 @@ public interface PositionDeleteIndex {
 
   /** Returns true if this collection contains no element. */
   boolean isEmpty();
+
+  /** Returns true if this collection contains elements. */
+  default boolean isNotEmpty() {
+    return !isEmpty();
+  }
+
+  /** Returns an empty immutable position delete index. */
+  static PositionDeleteIndex empty() {
+    return EmptyPositionDeleteIndex.get();
+  }
 }
diff --git 
a/core/src/main/java/org/apache/iceberg/deletes/BitmapPositionDeleteIndex.java 
b/core/src/main/java/org/apache/iceberg/deletes/PositionDeleteIndexUtil.java
similarity index 55%
copy from 
core/src/main/java/org/apache/iceberg/deletes/BitmapPositionDeleteIndex.java
copy to 
core/src/main/java/org/apache/iceberg/deletes/PositionDeleteIndexUtil.java
index 7690ab7e48..0c3bff28ee 100644
--- 
a/core/src/main/java/org/apache/iceberg/deletes/BitmapPositionDeleteIndex.java
+++ b/core/src/main/java/org/apache/iceberg/deletes/PositionDeleteIndexUtil.java
@@ -18,32 +18,25 @@
  */
 package org.apache.iceberg.deletes;
 
-import org.roaringbitmap.longlong.Roaring64Bitmap;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 
-class BitmapPositionDeleteIndex implements PositionDeleteIndex {
-  private final Roaring64Bitmap roaring64Bitmap;
+public class PositionDeleteIndexUtil {
 
-  BitmapPositionDeleteIndex() {
-    roaring64Bitmap = new Roaring64Bitmap();
-  }
+  private PositionDeleteIndexUtil() {}
 
-  @Override
-  public void delete(long position) {
-    roaring64Bitmap.add(position);
-  }
+  public static PositionDeleteIndex merge(Iterable<? extends 
PositionDeleteIndex> indexes) {
+    BitmapPositionDeleteIndex result = new BitmapPositionDeleteIndex();
 
-  @Override
-  public void delete(long posStart, long posEnd) {
-    roaring64Bitmap.add(posStart, posEnd);
-  }
-
-  @Override
-  public boolean isDeleted(long position) {
-    return roaring64Bitmap.contains(position);
-  }
+    for (PositionDeleteIndex index : indexes) {
+      if (index.isNotEmpty()) {
+        Preconditions.checkArgument(
+            index instanceof BitmapPositionDeleteIndex,
+            "Can merge only bitmap-based indexes, got %s",
+            index.getClass().getName());
+        result.merge((BitmapPositionDeleteIndex) index);
+      }
+    }
 
-  @Override
-  public boolean isEmpty() {
-    return roaring64Bitmap.isEmpty();
+    return result;
   }
 }
diff --git a/core/src/main/java/org/apache/iceberg/util/ThreadPools.java 
b/core/src/main/java/org/apache/iceberg/util/ThreadPools.java
index 4f2314fec9..ced121c03c 100644
--- a/core/src/main/java/org/apache/iceberg/util/ThreadPools.java
+++ b/core/src/main/java/org/apache/iceberg/util/ThreadPools.java
@@ -68,8 +68,9 @@ public class ThreadPools {
   /**
    * Return an {@link ExecutorService} that uses the "delete worker" 
thread-pool.
    *
-   * <p>The size of the delete worker pool limits the number of threads used 
to compute the
-   * PositionDeleteIndex from the position deletes for a data file.
+   * <p>The size of this worker pool limits the number of tasks concurrently 
reading delete files
+   * within a single JVM. If there are multiple threads loading deletes, all 
of them will share this
+   * worker pool by default.
    *
    * <p>The size of this thread-pool is controlled by the Java system property 
{@code
    * iceberg.worker.delete-num-threads}.
diff --git a/data/src/main/java/org/apache/iceberg/data/BaseDeleteLoader.java 
b/data/src/main/java/org/apache/iceberg/data/BaseDeleteLoader.java
new file mode 100644
index 0000000000..be346467e0
--- /dev/null
+++ b/data/src/main/java/org/apache/iceberg/data/BaseDeleteLoader.java
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.data;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.avro.Avro;
+import org.apache.iceberg.data.avro.DataReader;
+import org.apache.iceberg.data.orc.GenericOrcReader;
+import org.apache.iceberg.data.parquet.GenericParquetReaders;
+import org.apache.iceberg.deletes.Deletes;
+import org.apache.iceberg.deletes.PositionDeleteIndex;
+import org.apache.iceberg.deletes.PositionDeleteIndexUtil;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.DeleteSchemaUtil;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.orc.ORC;
+import org.apache.iceberg.orc.OrcRowReader;
+import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.parquet.ParquetValueReader;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.math.LongMath;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.util.CharSequenceMap;
+import org.apache.iceberg.util.StructLikeSet;
+import org.apache.iceberg.util.Tasks;
+import org.apache.iceberg.util.ThreadPools;
+import org.apache.orc.TypeDescription;
+import org.apache.parquet.schema.MessageType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class BaseDeleteLoader implements DeleteLoader {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(BaseDeleteLoader.class);
+  private static final Schema POS_DELETE_SCHEMA = 
DeleteSchemaUtil.pathPosSchema();
+
+  private final Function<DeleteFile, InputFile> loadInputFile;
+  private final ExecutorService workerPool;
+
+  public BaseDeleteLoader(Function<DeleteFile, InputFile> loadInputFile) {
+    this(loadInputFile, ThreadPools.getDeleteWorkerPool());
+  }
+
+  public BaseDeleteLoader(
+      Function<DeleteFile, InputFile> loadInputFile, ExecutorService 
workerPool) {
+    this.loadInputFile = loadInputFile;
+    this.workerPool = workerPool;
+  }
+
+  /**
+   * Checks if the given number of bytes can be cached.
+   *
+   * <p>Implementations should override this method if they support caching. 
It is also recommended
+   * to use the provided size as a guideline to decide whether the value is 
eligible for caching.
+   * For instance, it may be beneficial to discard values that are too large 
to optimize the cache
+   * performance and utilization.
+   */
+  protected boolean canCache(long size) {
+    return false;
+  }
+
+  /**
+   * Gets the cached value for the key or populates the cache with a new 
mapping.
+   *
+   * <p>If the value for the specified key is in the cache, it should be 
returned. If the value is
+   * not in the cache, implementations should compute the value using the 
provided supplier, cache
+   * it, and then return it.
+   *
+   * <p>This method will be called only if {@link #canCache(long)} returned 
true.
+   */
+  protected <V> V getOrLoad(String key, Supplier<V> valueSupplier, long 
valueSize) {
+    throw new UnsupportedOperationException(getClass().getName() + " does not 
support caching");
+  }
+
+  @Override
+  public StructLikeSet loadEqualityDeletes(Iterable<DeleteFile> deleteFiles, 
Schema projection) {
+    Iterable<Iterable<StructLike>> deletes =
+        execute(deleteFiles, deleteFile -> getOrReadEqDeletes(deleteFile, 
projection));
+    StructLikeSet deleteSet = StructLikeSet.create(projection.asStruct());
+    Iterables.addAll(deleteSet, Iterables.concat(deletes));
+    return deleteSet;
+  }
+
+  private Iterable<StructLike> getOrReadEqDeletes(DeleteFile deleteFile, 
Schema projection) {
+    long estimatedSize = estimateEqDeletesSize(deleteFile, projection);
+    if (canCache(estimatedSize)) {
+      String cacheKey = deleteFile.path().toString();
+      return getOrLoad(cacheKey, () -> readEqDeletes(deleteFile, projection), 
estimatedSize);
+    } else {
+      return readEqDeletes(deleteFile, projection);
+    }
+  }
+
+  private Iterable<StructLike> readEqDeletes(DeleteFile deleteFile, Schema 
projection) {
+    CloseableIterable<Record> deletes = openDeletes(deleteFile, projection);
+    CloseableIterable<Record> copiedDeletes = 
CloseableIterable.transform(deletes, Record::copy);
+    CloseableIterable<StructLike> copiedDeletesAsStructs = 
toStructs(copiedDeletes, projection);
+    return materialize(copiedDeletesAsStructs);
+  }
+
+  private CloseableIterable<StructLike> toStructs(
+      CloseableIterable<Record> records, Schema schema) {
+    InternalRecordWrapper wrapper = new 
InternalRecordWrapper(schema.asStruct());
+    return CloseableIterable.transform(records, wrapper::copyFor);
+  }
+
+  // materializes the iterable and releases resources so that the result can 
be cached
+  private <T> Iterable<T> materialize(CloseableIterable<T> iterable) {
+    try (CloseableIterable<T> closeableIterable = iterable) {
+      return ImmutableList.copyOf(closeableIterable);
+    } catch (IOException e) {
+      throw new UncheckedIOException("Failed to close iterable", e);
+    }
+  }
+
+  @Override
+  public PositionDeleteIndex loadPositionDeletes(
+      Iterable<DeleteFile> deleteFiles, CharSequence filePath) {
+    Iterable<PositionDeleteIndex> deletes =
+        execute(deleteFiles, deleteFile -> getOrReadPosDeletes(deleteFile, 
filePath));
+    return PositionDeleteIndexUtil.merge(deletes);
+  }
+
+  private PositionDeleteIndex getOrReadPosDeletes(DeleteFile deleteFile, 
CharSequence filePath) {
+    long estimatedSize = estimatePosDeletesSize(deleteFile);
+    if (canCache(estimatedSize)) {
+      String cacheKey = deleteFile.path().toString();
+      CharSequenceMap<PositionDeleteIndex> indexes =
+          getOrLoad(cacheKey, () -> readPosDeletes(deleteFile), estimatedSize);
+      return indexes.getOrDefault(filePath, PositionDeleteIndex.empty());
+    } else {
+      return readPosDeletes(deleteFile, filePath);
+    }
+  }
+
+  private CharSequenceMap<PositionDeleteIndex> readPosDeletes(DeleteFile 
deleteFile) {
+    CloseableIterable<Record> deletes = openDeletes(deleteFile, 
POS_DELETE_SCHEMA);
+    return Deletes.toPositionIndexes(deletes);
+  }
+
+  private PositionDeleteIndex readPosDeletes(DeleteFile deleteFile, 
CharSequence filePath) {
+    Expression filter = 
Expressions.equal(MetadataColumns.DELETE_FILE_PATH.name(), filePath);
+    CloseableIterable<Record> deletes = openDeletes(deleteFile, 
POS_DELETE_SCHEMA, filter);
+    return Deletes.toPositionIndex(filePath, ImmutableList.of(deletes));
+  }
+
+  private CloseableIterable<Record> openDeletes(DeleteFile deleteFile, Schema 
projection) {
+    return openDeletes(deleteFile, projection, null /* no filter */);
+  }
+
+  private CloseableIterable<Record> openDeletes(
+      DeleteFile deleteFile, Schema projection, Expression filter) {
+
+    FileFormat format = deleteFile.format();
+    LOG.trace("Opening delete file {}", deleteFile.path());
+    InputFile inputFile = loadInputFile.apply(deleteFile);
+
+    switch (format) {
+      case AVRO:
+        return Avro.read(inputFile)
+            .project(projection)
+            .reuseContainers()
+            .createReaderFunc(DataReader::create)
+            .build();
+
+      case PARQUET:
+        return Parquet.read(inputFile)
+            .project(projection)
+            .filter(filter)
+            .reuseContainers()
+            .createReaderFunc(newParquetReaderFunc(projection))
+            .build();
+
+      case ORC:
+        // reusing containers is automatic for ORC, no need to call 
'reuseContainers'
+        return ORC.read(inputFile)
+            .project(projection)
+            .filter(filter)
+            .createReaderFunc(newOrcReaderFunc(projection))
+            .build();
+
+      default:
+        throw new UnsupportedOperationException(
+            String.format(
+                "Cannot read deletes, %s is not a supported file format: %s",
+                format.name(), inputFile.location()));
+    }
+  }
+
+  private Function<MessageType, ParquetValueReader<?>> 
newParquetReaderFunc(Schema projection) {
+    return fileSchema -> GenericParquetReaders.buildReader(projection, 
fileSchema);
+  }
+
+  private Function<TypeDescription, OrcRowReader<?>> newOrcReaderFunc(Schema 
projection) {
+    return fileSchema -> GenericOrcReader.buildReader(projection, fileSchema);
+  }
+
+  private <I, O> Iterable<O> execute(Iterable<I> objects, Function<I, O> func) 
{
+    Queue<O> output = new ConcurrentLinkedQueue<>();
+
+    Tasks.foreach(objects)
+        .executeWith(workerPool)
+        .stopOnFailure()
+        .onFailure((object, exc) -> LOG.error("Failed to process {}", object, 
exc))
+        .run(object -> output.add(func.apply(object)));
+
+    return output;
+  }
+
+  // estimates the memory required to cache position deletes (in bytes)
+  private long estimatePosDeletesSize(DeleteFile deleteFile) {
+    // the space consumption highly depends on the nature of deleted positions 
(sparse vs compact)
+    // testing shows Roaring bitmaps require around 8 bits (1 byte) per value 
on average
+    return deleteFile.recordCount();
+  }
+
+  // estimates the memory required to cache equality deletes (in bytes)
+  private long estimateEqDeletesSize(DeleteFile deleteFile, Schema projection) 
{
+    try {
+      long recordCount = deleteFile.recordCount();
+      int recordSize = estimateRecordSize(projection);
+      return LongMath.checkedMultiply(recordCount, recordSize);
+    } catch (ArithmeticException e) {
+      return Long.MAX_VALUE;
+    }
+  }
+
+  private int estimateRecordSize(Schema schema) {
+    return schema.columns().stream().mapToInt(TypeUtil::estimateSize).sum();
+  }
+}
diff --git a/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java 
b/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java
index 55acd32008..e7d8445cf8 100644
--- a/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java
+++ b/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java
@@ -25,25 +25,16 @@ import java.util.Set;
 import java.util.function.Predicate;
 import org.apache.iceberg.Accessor;
 import org.apache.iceberg.DeleteFile;
-import org.apache.iceberg.FileContent;
 import org.apache.iceberg.MetadataColumns;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.StructLike;
-import org.apache.iceberg.avro.Avro;
-import org.apache.iceberg.data.avro.DataReader;
-import org.apache.iceberg.data.orc.GenericOrcReader;
-import org.apache.iceberg.data.parquet.GenericParquetReaders;
 import org.apache.iceberg.deletes.DeleteCounter;
 import org.apache.iceberg.deletes.Deletes;
 import org.apache.iceberg.deletes.PositionDeleteIndex;
-import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.InputFile;
-import org.apache.iceberg.orc.ORC;
-import org.apache.iceberg.parquet.Parquet;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
-import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.relocated.com.google.common.collect.Multimap;
@@ -58,8 +49,6 @@ import org.slf4j.LoggerFactory;
 
 public abstract class DeleteFilter<T> {
   private static final Logger LOG = 
LoggerFactory.getLogger(DeleteFilter.class);
-  private static final Schema POS_DELETE_SCHEMA =
-      new Schema(MetadataColumns.DELETE_FILE_PATH, 
MetadataColumns.DELETE_FILE_POS);
 
   private final String filePath;
   private final List<DeleteFile> posDeletes;
@@ -70,6 +59,7 @@ public abstract class DeleteFilter<T> {
   private final int isDeletedColumnPosition;
   private final DeleteCounter counter;
 
+  private volatile DeleteLoader deleteLoader = null;
   private PositionDeleteIndex deleteRowPositions = null;
   private List<Predicate<T>> isInDeleteSets = null;
   private Predicate<T> eqDeleteRows = null;
@@ -143,10 +133,30 @@ public abstract class DeleteFilter<T> {
 
   protected abstract InputFile getInputFile(String location);
 
+  protected InputFile loadInputFile(DeleteFile deleteFile) {
+    return getInputFile(deleteFile.path().toString());
+  }
+
   protected long pos(T record) {
     return (Long) posAccessor.get(asStructLike(record));
   }
 
+  protected DeleteLoader newDeleteLoader() {
+    return new BaseDeleteLoader(this::loadInputFile);
+  }
+
+  private DeleteLoader deleteLoader() {
+    if (deleteLoader == null) {
+      synchronized (this) {
+        if (deleteLoader == null) {
+          this.deleteLoader = newDeleteLoader();
+        }
+      }
+    }
+
+    return deleteLoader;
+  }
+
   public CloseableIterable<T> filter(CloseableIterable<T> records) {
     return applyEqDeletes(applyPosDeletes(records));
   }
@@ -173,22 +183,11 @@ public abstract class DeleteFilter<T> {
       Iterable<DeleteFile> deletes = entry.getValue();
 
       Schema deleteSchema = TypeUtil.select(requiredSchema, ids);
-      InternalRecordWrapper wrapper = new 
InternalRecordWrapper(deleteSchema.asStruct());
 
       // a projection to select and reorder fields of the file schema to match 
the delete rows
       StructProjection projectRow = StructProjection.create(requiredSchema, 
deleteSchema);
 
-      Iterable<CloseableIterable<Record>> deleteRecords =
-          Iterables.transform(deletes, delete -> openDeletes(delete, 
deleteSchema));
-
-      // copy the delete records because they will be held in a set
-      CloseableIterable<Record> records =
-          CloseableIterable.transform(CloseableIterable.concat(deleteRecords), 
Record::copy);
-
-      StructLikeSet deleteSet =
-          Deletes.toEqualitySet(
-              CloseableIterable.transform(records, wrapper::copyFor), 
deleteSchema.asStruct());
-
+      StructLikeSet deleteSet = deleteLoader().loadEqualityDeletes(deletes, 
deleteSchema);
       Predicate<T> isInDeleteSet =
           record -> deleteSet.contains(projectRow.wrap(asStructLike(record)));
       isInDeleteSets.add(isInDeleteSet);
@@ -224,14 +223,10 @@ public abstract class DeleteFilter<T> {
   }
 
   public PositionDeleteIndex deletedRowPositions() {
-    if (posDeletes.isEmpty()) {
-      return null;
+    if (deleteRowPositions == null && !posDeletes.isEmpty()) {
+      this.deleteRowPositions = deleteLoader().loadPositionDeletes(posDeletes, 
filePath);
     }
 
-    if (deleteRowPositions == null) {
-      List<CloseableIterable<Record>> deletes = Lists.transform(posDeletes, 
this::openPosDeletes);
-      deleteRowPositions = Deletes.toPositionIndex(filePath, deletes);
-    }
     return deleteRowPositions;
   }
 
@@ -240,9 +235,7 @@ public abstract class DeleteFilter<T> {
       return records;
     }
 
-    List<CloseableIterable<Record>> deletes = Lists.transform(posDeletes, 
this::openPosDeletes);
-
-    PositionDeleteIndex positionIndex = Deletes.toPositionIndex(filePath, 
deletes);
+    PositionDeleteIndex positionIndex = deletedRowPositions();
     Predicate<T> isDeleted = record -> positionIndex.isDeleted(pos(record));
     return createDeleteIterable(records, isDeleted);
   }
@@ -254,56 +247,6 @@ public abstract class DeleteFilter<T> {
         : Deletes.filterDeleted(records, isDeleted, counter);
   }
 
-  private CloseableIterable<Record> openPosDeletes(DeleteFile file) {
-    return openDeletes(file, POS_DELETE_SCHEMA);
-  }
-
-  private CloseableIterable<Record> openDeletes(DeleteFile deleteFile, Schema 
deleteSchema) {
-    LOG.trace("Opening delete file {}", deleteFile.path());
-    InputFile input = getInputFile(deleteFile.path().toString());
-    switch (deleteFile.format()) {
-      case AVRO:
-        return Avro.read(input)
-            .project(deleteSchema)
-            .reuseContainers()
-            .createReaderFunc(DataReader::create)
-            .build();
-
-      case PARQUET:
-        Parquet.ReadBuilder builder =
-            Parquet.read(input)
-                .project(deleteSchema)
-                .reuseContainers()
-                .createReaderFunc(
-                    fileSchema -> 
GenericParquetReaders.buildReader(deleteSchema, fileSchema));
-
-        if (deleteFile.content() == FileContent.POSITION_DELETES) {
-          
builder.filter(Expressions.equal(MetadataColumns.DELETE_FILE_PATH.name(), 
filePath));
-        }
-
-        return builder.build();
-
-      case ORC:
-        // Reusing containers is automatic for ORC. No need to set 
'reuseContainers' here.
-        ORC.ReadBuilder orcBuilder =
-            ORC.read(input)
-                .project(deleteSchema)
-                .createReaderFunc(
-                    fileSchema -> GenericOrcReader.buildReader(deleteSchema, 
fileSchema));
-
-        if (deleteFile.content() == FileContent.POSITION_DELETES) {
-          
orcBuilder.filter(Expressions.equal(MetadataColumns.DELETE_FILE_PATH.name(), 
filePath));
-        }
-
-        return orcBuilder.build();
-      default:
-        throw new UnsupportedOperationException(
-            String.format(
-                "Cannot read deletes, %s is not a supported format: %s",
-                deleteFile.format().name(), deleteFile.path()));
-    }
-  }
-
   private static Schema fileProjection(
       Schema tableSchema,
       Schema requestedSchema,
diff --git a/data/src/main/java/org/apache/iceberg/data/DeleteLoader.java 
b/data/src/main/java/org/apache/iceberg/data/DeleteLoader.java
new file mode 100644
index 0000000000..07bdce6d83
--- /dev/null
+++ b/data/src/main/java/org/apache/iceberg/data/DeleteLoader.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.data;
+
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.deletes.PositionDeleteIndex;
+import org.apache.iceberg.util.StructLikeSet;
+
+/** An API for loading delete file content into in-memory data structures. */
+public interface DeleteLoader {
+  /**
+   * Loads the content of equality delete files into a set.
+   *
+   * @param deleteFiles equality delete files
+   * @param projection a projection of columns to load
+   * @return a set of equality deletes
+   */
+  StructLikeSet loadEqualityDeletes(Iterable<DeleteFile> deleteFiles, Schema 
projection);
+
+  /**
+   * Loads the content of position delete files for a given data file path 
into a position index.
+   *
+   * @param deleteFiles position delete files
+   * @param filePath the data file path for which to load deletes
+   * @return a position delete index for the provided data file path
+   */
+  PositionDeleteIndex loadPositionDeletes(Iterable<DeleteFile> deleteFiles, 
CharSequence filePath);
+}
diff --git a/spark/v3.5/build.gradle b/spark/v3.5/build.gradle
index 2c58281904..eeef75be63 100644
--- a/spark/v3.5/build.gradle
+++ b/spark/v3.5/build.gradle
@@ -89,6 +89,8 @@ 
project(":iceberg-spark:iceberg-spark-${sparkMajorVersion}_${scalaVersion}") {
       exclude group: 'com.google.code.findbugs', module: 'jsr305'
     }
 
+    implementation libs.caffeine
+
     testImplementation(libs.hadoop2.minicluster) {
       exclude group: 'org.apache.avro', module: 'avro'
       // to make sure netty libs only come from project(':iceberg-arrow')
diff --git 
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkConfParser.java 
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkConfParser.java
index d86246b1e6..e3b01b8375 100644
--- 
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkConfParser.java
+++ 
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkConfParser.java
@@ -18,15 +18,19 @@
  */
 package org.apache.iceberg.spark;
 
+import java.time.Duration;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.function.Function;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.spark.network.util.JavaUtils;
 import org.apache.spark.sql.RuntimeConfig;
 import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.internal.SQLConf;
 
 class SparkConfParser {
 
@@ -34,6 +38,12 @@ class SparkConfParser {
   private final RuntimeConfig sessionConf;
   private final Map<String, String> options;
 
+  SparkConfParser() {
+    this.properties = ImmutableMap.of();
+    this.sessionConf = new RuntimeConfig(SQLConf.get());
+    this.options = ImmutableMap.of();
+  }
+
   SparkConfParser(SparkSession spark, Table table, Map<String, String> 
options) {
     this.properties = table.properties();
     this.sessionConf = spark.conf();
@@ -56,6 +66,10 @@ class SparkConfParser {
     return new StringConfParser();
   }
 
+  public DurationConfParser durationConf() {
+    return new DurationConfParser();
+  }
+
   class BooleanConfParser extends ConfParser<BooleanConfParser, Boolean> {
     private Boolean defaultValue;
     private boolean negate = false;
@@ -156,6 +170,33 @@ class SparkConfParser {
     }
   }
 
+  class DurationConfParser extends ConfParser<DurationConfParser, Duration> {
+    private Duration defaultValue;
+
+    @Override
+    protected DurationConfParser self() {
+      return this;
+    }
+
+    public DurationConfParser defaultValue(Duration value) {
+      this.defaultValue = value;
+      return self();
+    }
+
+    public Duration parse() {
+      Preconditions.checkArgument(defaultValue != null, "Default value cannot 
be null");
+      return parse(this::toDuration, defaultValue);
+    }
+
+    public Duration parseOptional() {
+      return parse(this::toDuration, defaultValue);
+    }
+
+    private Duration toDuration(String time) {
+      return Duration.ofSeconds(JavaUtils.timeStringAsSec(time));
+    }
+  }
+
   abstract class ConfParser<ThisT, T> {
     private final List<String> optionNames = Lists.newArrayList();
     private String sessionConfName;
diff --git 
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkExecutorCache.java
 
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkExecutorCache.java
new file mode 100644
index 0000000000..6490d6678b
--- /dev/null
+++ 
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkExecutorCache.java
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import java.time.Duration;
+import java.util.List;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import 
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An executor cache for reducing the computation and IO overhead in tasks.
+ *
+ * <p>The cache is configured and controlled through Spark SQL properties. It 
supports both limits
+ * on the total cache size and maximum size for individual entries. 
Additionally, it implements
+ * automatic eviction of entries after a specified duration of inactivity. The 
cache will respect
+ * the SQL configuration valid at the time of initialization. All subsequent 
changes to the
+ * configuration will have no effect.
+ *
+ * <p>The cache is accessed and populated via {@link #getOrLoad(String, 
String, Supplier, long)}. If
+ * the value is not present in the cache, it is computed using the provided 
supplier and stored in
+ * the cache, subject to the defined size constraints. When a key is added, it 
must be associated
+ * with a particular group ID. Once the group is no longer needed, it is 
recommended to explicitly
+ * invalidate its state by calling {@link #invalidate(String)} instead of 
relying on automatic
+ * eviction.
+ *
+ * <p>Note that this class employs the singleton pattern to ensure only one 
cache exists per JVM.
+ */
+public class SparkExecutorCache {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(SparkExecutorCache.class);
+
+  private static volatile SparkExecutorCache instance = null;
+
+  private final Duration timeout;
+  private final long maxEntrySize;
+  private final long maxTotalSize;
+  private volatile Cache<String, CacheValue> state;
+
+  private SparkExecutorCache(Conf conf) {
+    this.timeout = conf.timeout();
+    this.maxEntrySize = conf.maxEntrySize();
+    this.maxTotalSize = conf.maxTotalSize();
+  }
+
+  /**
+   * Returns the cache if created or creates and returns it.
+   *
+   * <p>Note this method returns null if caching is disabled.
+   */
+  public static SparkExecutorCache getOrCreate() {
+    if (instance == null) {
+      Conf conf = new Conf();
+      if (conf.cacheEnabled()) {
+        synchronized (SparkExecutorCache.class) {
+          if (instance == null) {
+            SparkExecutorCache.instance = new SparkExecutorCache(conf);
+          }
+        }
+      }
+    }
+
+    return instance;
+  }
+
+  /** Returns the cache if already created or null otherwise. */
+  public static SparkExecutorCache get() {
+    return instance;
+  }
+
+  /** Returns the max entry size in bytes that will be considered for caching. 
*/
+  public long maxEntrySize() {
+    return maxEntrySize;
+  }
+
+  /**
+   * Gets the cached value for the key or populates the cache with a new 
mapping.
+   *
+   * @param group a group ID
+   * @param key a cache key
+   * @param valueSupplier a supplier to compute the value
+   * @param valueSize an estimated memory size of the value in bytes
+   * @return the cached or computed value
+   */
+  public <V> V getOrLoad(String group, String key, Supplier<V> valueSupplier, 
long valueSize) {
+    if (valueSize > maxEntrySize) {
+      LOG.debug("{} exceeds max entry size: {} > {}", key, valueSize, 
maxEntrySize);
+      return valueSupplier.get();
+    }
+
+    String internalKey = group + "_" + key;
+    CacheValue value = state().get(internalKey, loadFunc(valueSupplier, 
valueSize));
+    Preconditions.checkNotNull(value, "Loaded value must not be null");
+    return value.get();
+  }
+
+  private <V> Function<String, CacheValue> loadFunc(Supplier<V> valueSupplier, 
long valueSize) {
+    return key -> {
+      long start = System.currentTimeMillis();
+      V value = valueSupplier.get();
+      long end = System.currentTimeMillis();
+      LOG.debug("Loaded {} with size {} in {} ms", key, valueSize, (end - 
start));
+      return new CacheValue(value, valueSize);
+    };
+  }
+
+  /**
+   * Invalidates all keys associated with the given group ID.
+   *
+   * @param group a group ID
+   */
+  public void invalidate(String group) {
+    if (state != null) {
+      List<String> internalKeys = findInternalKeys(group);
+      LOG.info("Invalidating {} keys associated with {}", internalKeys.size(), 
group);
+      internalKeys.forEach(internalKey -> state.invalidate(internalKey));
+      LOG.info("Current cache stats {}", state.stats());
+    }
+  }
+
+  private List<String> findInternalKeys(String group) {
+    return state.asMap().keySet().stream()
+        .filter(internalKey -> internalKey.startsWith(group))
+        .collect(Collectors.toList());
+  }
+
+  private Cache<String, CacheValue> state() {
+    if (state == null) {
+      synchronized (this) {
+        if (state == null) {
+          LOG.info("Initializing cache state");
+          this.state = initState();
+        }
+      }
+    }
+
+    return state;
+  }
+
+  private Cache<String, CacheValue> initState() {
+    return Caffeine.newBuilder()
+        .expireAfterAccess(timeout)
+        .maximumWeight(maxTotalSize)
+        .weigher((key, value) -> ((CacheValue) value).weight())
+        .recordStats()
+        .removalListener((key, value, cause) -> LOG.debug("Evicted {} ({})", 
key, cause))
+        .build();
+  }
+
+  @VisibleForTesting
+  static class CacheValue {
+    private final Object value;
+    private final long size;
+
+    CacheValue(Object value, long size) {
+      this.value = value;
+      this.size = size;
+    }
+
+    @SuppressWarnings("unchecked")
+    public <V> V get() {
+      return (V) value;
+    }
+
+    public int weight() {
+      return (int) Math.min(size, Integer.MAX_VALUE);
+    }
+  }
+
+  @VisibleForTesting
+  static class Conf {
+    private final SparkConfParser confParser = new SparkConfParser();
+
+    public boolean cacheEnabled() {
+      return confParser
+          .booleanConf()
+          .sessionConf(SparkSQLProperties.EXECUTOR_CACHE_ENABLED)
+          .defaultValue(SparkSQLProperties.EXECUTOR_CACHE_ENABLED_DEFAULT)
+          .parse();
+    }
+
+    public Duration timeout() {
+      return confParser
+          .durationConf()
+          .sessionConf(SparkSQLProperties.EXECUTOR_CACHE_TIMEOUT)
+          .defaultValue(SparkSQLProperties.EXECUTOR_CACHE_TIMEOUT_DEFAULT)
+          .parse();
+    }
+
+    public long maxEntrySize() {
+      return confParser
+          .longConf()
+          .sessionConf(SparkSQLProperties.EXECUTOR_CACHE_MAX_ENTRY_SIZE)
+          
.defaultValue(SparkSQLProperties.EXECUTOR_CACHE_MAX_ENTRY_SIZE_DEFAULT)
+          .parse();
+    }
+
+    public long maxTotalSize() {
+      return confParser
+          .longConf()
+          .sessionConf(SparkSQLProperties.EXECUTOR_CACHE_MAX_TOTAL_SIZE)
+          
.defaultValue(SparkSQLProperties.EXECUTOR_CACHE_MAX_TOTAL_SIZE_DEFAULT)
+          .parse();
+    }
+  }
+}
diff --git 
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java
 
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java
index bca41b4155..4a66520231 100644
--- 
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java
+++ 
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java
@@ -18,6 +18,8 @@
  */
 package org.apache.iceberg.spark;
 
+import java.time.Duration;
+
 public class SparkSQLProperties {
 
   private SparkSQLProperties() {}
@@ -70,4 +72,18 @@ public class SparkSQLProperties {
 
   // Controls whether to report locality information to Spark while allocating 
input partitions
   public static final String LOCALITY = "spark.sql.iceberg.locality.enabled";
+
+  public static final String EXECUTOR_CACHE_ENABLED = 
"spark.sql.iceberg.executor-cache.enabled";
+  public static final boolean EXECUTOR_CACHE_ENABLED_DEFAULT = true;
+
+  public static final String EXECUTOR_CACHE_TIMEOUT = 
"spark.sql.iceberg.executor-cache.timeout";
+  public static final Duration EXECUTOR_CACHE_TIMEOUT_DEFAULT = 
Duration.ofMinutes(10);
+
+  public static final String EXECUTOR_CACHE_MAX_ENTRY_SIZE =
+      "spark.sql.iceberg.executor-cache.max-entry-size";
+  public static final long EXECUTOR_CACHE_MAX_ENTRY_SIZE_DEFAULT = 64 * 1024 * 
1024; // 64 MB
+
+  public static final String EXECUTOR_CACHE_MAX_TOTAL_SIZE =
+      "spark.sql.iceberg.executor-cache.max-total-size";
+  public static final long EXECUTOR_CACHE_MAX_TOTAL_SIZE_DEFAULT = 128 * 1024 
* 1024; // 128 MB
 }
diff --git 
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseReader.java
 
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseReader.java
index 4fb838202c..c2b3e7c2dc 100644
--- 
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseReader.java
+++ 
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseReader.java
@@ -25,6 +25,8 @@ import java.nio.ByteBuffer;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.function.Function;
+import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import org.apache.avro.generic.GenericData;
@@ -40,7 +42,9 @@ import org.apache.iceberg.Schema;
 import org.apache.iceberg.StructLike;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.data.BaseDeleteLoader;
 import org.apache.iceberg.data.DeleteFilter;
+import org.apache.iceberg.data.DeleteLoader;
 import org.apache.iceberg.deletes.DeleteCounter;
 import org.apache.iceberg.encryption.EncryptedFiles;
 import org.apache.iceberg.encryption.EncryptedInputFile;
@@ -50,6 +54,7 @@ import org.apache.iceberg.mapping.NameMapping;
 import org.apache.iceberg.mapping.NameMappingParser;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.spark.SparkExecutorCache;
 import org.apache.iceberg.spark.SparkSchemaUtil;
 import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.Types.NestedField;
@@ -279,5 +284,29 @@ abstract class BaseReader<T, TaskT extends ScanTask> 
implements Closeable {
         counter().increment();
       }
     }
+
+    @Override
+    protected DeleteLoader newDeleteLoader() {
+      return new CachingDeleteLoader(this::loadInputFile);
+    }
+
+    private class CachingDeleteLoader extends BaseDeleteLoader {
+      private final SparkExecutorCache cache;
+
+      CachingDeleteLoader(Function<DeleteFile, InputFile> loadInputFile) {
+        super(loadInputFile);
+        this.cache = SparkExecutorCache.getOrCreate();
+      }
+
+      @Override
+      protected boolean canCache(long size) {
+        return cache != null && size < cache.maxEntrySize();
+      }
+
+      @Override
+      protected <V> V getOrLoad(String key, Supplier<V> valueSupplier, long 
valueSize) {
+        return cache.getOrLoad(table().name(), key, valueSupplier, valueSize);
+      }
+    }
   }
 }
diff --git 
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SerializableTableWithSize.java
 
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SerializableTableWithSize.java
index 65df29051c..f6913fb9d0 100644
--- 
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SerializableTableWithSize.java
+++ 
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SerializableTableWithSize.java
@@ -21,6 +21,7 @@ package org.apache.iceberg.spark.source;
 import org.apache.iceberg.BaseMetadataTable;
 import org.apache.iceberg.SerializableTable;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.spark.SparkExecutorCache;
 import org.apache.spark.util.KnownSizeEstimation;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -67,6 +68,7 @@ public class SerializableTableWithSize extends 
SerializableTable
       LOG.info("Releasing resources");
       io().close();
     }
+    invalidateCache(name());
   }
 
   public static class SerializableMetadataTableWithSize extends 
SerializableMetadataTable
@@ -93,6 +95,14 @@ public class SerializableTableWithSize extends 
SerializableTable
         LOG.info("Releasing resources");
         io().close();
       }
+      invalidateCache(name());
+    }
+  }
+
+  private static void invalidateCache(String name) {
+    SparkExecutorCache cache = SparkExecutorCache.get();
+    if (cache != null) {
+      cache.invalidate(name);
     }
   }
 }
diff --git 
a/core/src/main/java/org/apache/iceberg/deletes/BitmapPositionDeleteIndex.java 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/Employee.java
similarity index 52%
copy from 
core/src/main/java/org/apache/iceberg/deletes/BitmapPositionDeleteIndex.java
copy to spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/Employee.java
index 7690ab7e48..9c57936d98 100644
--- 
a/core/src/main/java/org/apache/iceberg/deletes/BitmapPositionDeleteIndex.java
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/Employee.java
@@ -16,34 +16,51 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iceberg.deletes;
+package org.apache.iceberg.spark;
 
-import org.roaringbitmap.longlong.Roaring64Bitmap;
+import java.util.Objects;
 
-class BitmapPositionDeleteIndex implements PositionDeleteIndex {
-  private final Roaring64Bitmap roaring64Bitmap;
+public class Employee {
+  private Integer id;
+  private String dep;
 
-  BitmapPositionDeleteIndex() {
-    roaring64Bitmap = new Roaring64Bitmap();
+  public Employee() {}
+
+  public Employee(Integer id, String dep) {
+    this.id = id;
+    this.dep = dep;
   }
 
-  @Override
-  public void delete(long position) {
-    roaring64Bitmap.add(position);
+  public Integer getId() {
+    return id;
   }
 
-  @Override
-  public void delete(long posStart, long posEnd) {
-    roaring64Bitmap.add(posStart, posEnd);
+  public void setId(Integer id) {
+    this.id = id;
+  }
+
+  public String getDep() {
+    return dep;
+  }
+
+  public void setDep(String dep) {
+    this.dep = dep;
   }
 
   @Override
-  public boolean isDeleted(long position) {
-    return roaring64Bitmap.contains(position);
+  public boolean equals(Object other) {
+    if (this == other) {
+      return true;
+    } else if (other == null || getClass() != other.getClass()) {
+      return false;
+    }
+
+    Employee employee = (Employee) other;
+    return Objects.equals(id, employee.id) && Objects.equals(dep, 
employee.dep);
   }
 
   @Override
-  public boolean isEmpty() {
-    return roaring64Bitmap.isEmpty();
+  public int hashCode() {
+    return Objects.hash(id, dep);
   }
 }
diff --git 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestSparkExecutorCache.java
 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestSparkExecutorCache.java
new file mode 100644
index 0000000000..016319884c
--- /dev/null
+++ 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestSparkExecutorCache.java
@@ -0,0 +1,503 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark;
+
+import static org.apache.iceberg.RowLevelOperationMode.COPY_ON_WRITE;
+import static org.apache.iceberg.RowLevelOperationMode.MERGE_ON_READ;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import java.io.File;
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.ContentScanTask;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.Parameters;
+import org.apache.iceberg.RowLevelOperationMode;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.data.FileHelpers;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.io.SeekableInputStream;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.spark.SparkExecutorCache.CacheValue;
+import org.apache.iceberg.spark.SparkExecutorCache.Conf;
+import org.apache.iceberg.util.CharSequenceSet;
+import org.apache.iceberg.util.Pair;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.TestTemplate;
+
+public class TestSparkExecutorCache extends TestBaseWithCatalog {
+
+  @Parameters(name = "catalogName = {0}, implementation = {1}, config = {2}")
+  protected static Object[][] parameters() {
+    return new Object[][] {
+      {
+        "testhive",
+        SparkCatalog.class.getName(),
+        ImmutableMap.of(
+            "type",
+            "hive",
+            CatalogProperties.FILE_IO_IMPL,
+            CustomFileIO.class.getName(),
+            "default-namespace",
+            "default")
+      },
+    };
+  }
+
+  private static final String UPDATES_VIEW_NAME = "updates";
+  private static final AtomicInteger JOB_COUNTER = new AtomicInteger();
+  private static final Map<String, CustomInputFile> INPUT_FILES =
+      Collections.synchronizedMap(Maps.newHashMap());
+
+  private String targetTableName;
+  private TableIdentifier targetTableIdent;
+
+  @BeforeEach
+  public void configureTargetTableName() {
+    String name = "target_exec_cache_" + JOB_COUNTER.incrementAndGet();
+    this.targetTableName = tableName(name);
+    this.targetTableIdent = TableIdentifier.of(Namespace.of("default"), name);
+  }
+
+  @AfterEach
+  public void releaseResources() {
+    sql("DROP TABLE IF EXISTS %s", targetTableName);
+    sql("DROP TABLE IF EXISTS %s", UPDATES_VIEW_NAME);
+    INPUT_FILES.clear();
+  }
+
+  @TestTemplate
+  public void testCacheValueWeightOverflow() {
+    CacheValue cacheValue = new CacheValue("v", Integer.MAX_VALUE + 1L);
+    assertThat(cacheValue.weight()).isEqualTo(Integer.MAX_VALUE);
+  }
+
+  @TestTemplate
+  public void testCacheEnabledConfig() {
+    withSQLConf(
+        ImmutableMap.of(SparkSQLProperties.EXECUTOR_CACHE_ENABLED, "true"),
+        () -> {
+          Conf conf = new Conf();
+          assertThat(conf.cacheEnabled()).isTrue();
+        });
+
+    withSQLConf(
+        ImmutableMap.of(SparkSQLProperties.EXECUTOR_CACHE_ENABLED, "false"),
+        () -> {
+          Conf conf = new Conf();
+          assertThat(conf.cacheEnabled()).isFalse();
+        });
+  }
+
+  @TestTemplate
+  public void testTimeoutConfig() {
+    withSQLConf(
+        ImmutableMap.of(SparkSQLProperties.EXECUTOR_CACHE_TIMEOUT, "10s"),
+        () -> {
+          Conf conf = new Conf();
+          assertThat(conf.timeout()).hasSeconds(10);
+        });
+
+    withSQLConf(
+        ImmutableMap.of(SparkSQLProperties.EXECUTOR_CACHE_TIMEOUT, "2m"),
+        () -> {
+          Conf conf = new Conf();
+          assertThat(conf.timeout()).hasMinutes(2);
+        });
+  }
+
+  @TestTemplate
+  public void testMaxEntrySizeConfig() {
+    withSQLConf(
+        ImmutableMap.of(SparkSQLProperties.EXECUTOR_CACHE_MAX_ENTRY_SIZE, 
"128"),
+        () -> {
+          Conf conf = new Conf();
+          assertThat(conf.maxEntrySize()).isEqualTo(128L);
+        });
+  }
+
+  @TestTemplate
+  public void testMaxTotalSizeConfig() {
+    withSQLConf(
+        ImmutableMap.of(SparkSQLProperties.EXECUTOR_CACHE_MAX_TOTAL_SIZE, 
"512"),
+        () -> {
+          Conf conf = new Conf();
+          assertThat(conf.maxTotalSize()).isEqualTo(512L);
+        });
+  }
+
+  @TestTemplate
+  public void testConcurrentAccess() throws InterruptedException {
+    SparkExecutorCache cache = SparkExecutorCache.getOrCreate();
+
+    String table1 = "table1";
+    String table2 = "table2";
+
+    Set<String> loadedInternalKeys = Sets.newHashSet();
+
+    String key1 = "key1";
+    String key2 = "key2";
+
+    long valueSize = 100L;
+
+    int threadCount = 10;
+    ExecutorService executorService = 
Executors.newFixedThreadPool(threadCount);
+
+    for (int threadNumber = 0; threadNumber < threadCount; threadNumber++) {
+      String group = threadNumber % 2 == 0 ? table1 : table2;
+      executorService.submit(
+          () -> {
+            for (int batch = 0; batch < 3; batch++) {
+              cache.getOrLoad(
+                  group,
+                  key1,
+                  () -> {
+                    String internalKey = toInternalKey(group, key1);
+                    synchronized (loadedInternalKeys) {
+                      // verify only one load was done for this key
+                      
assertThat(loadedInternalKeys.contains(internalKey)).isFalse();
+                      loadedInternalKeys.add(internalKey);
+                    }
+                    return "value1";
+                  },
+                  valueSize);
+
+              cache.getOrLoad(
+                  group,
+                  key2,
+                  () -> {
+                    String internalKey = toInternalKey(group, key2);
+                    synchronized (loadedInternalKeys) {
+                      // verify only one load was done for this key
+                      
assertThat(loadedInternalKeys.contains(internalKey)).isFalse();
+                      loadedInternalKeys.add(internalKey);
+                    }
+                    return "value2";
+                  },
+                  valueSize);
+            }
+          });
+    }
+
+    executorService.shutdown();
+    assertThat(executorService.awaitTermination(1, TimeUnit.MINUTES)).isTrue();
+
+    cache.invalidate(table1);
+    cache.invalidate(table2);
+
+    // all keys must be invalidated
+    Cache<String, ?> state = fetchInternalCacheState();
+    Set<String> liveKeys = state.asMap().keySet();
+    assertThat(liveKeys).noneMatch(key -> key.startsWith(table1) || 
key.startsWith(table2));
+  }
+
  // DELETE in copy-on-write mode must benefit from the executor delete-file cache.
  @TestTemplate
  public void testCopyOnWriteDelete() throws Exception {
    checkDelete(COPY_ON_WRITE);
  }
+
  // DELETE in merge-on-read mode must open each delete file only once.
  @TestTemplate
  public void testMergeOnReadDelete() throws Exception {
    checkDelete(MERGE_ON_READ);
  }
+
  // Runs a DELETE in the given row-level operation mode and asserts that the delete files
  // were opened at most the expected number of times (i.e. the cache served repeat reads).
  private void checkDelete(RowLevelOperationMode mode) throws Exception {
    List<DeleteFile> deleteFiles = createAndInitTable(TableProperties.DELETE_MODE, mode);

    sql("DELETE FROM %s WHERE id = 1 OR id = 4", targetTableName);

    // there are 2 data files and 2 delete files that apply to both of them
    // in CoW, the target table will be scanned 2 times (main query + runtime filter)
    // the runtime filter may invalidate the cache so check at least some requests were hits
    // in MoR, the target table will be scanned only once
    // so each delete file must be opened once
    int maxRequestCount = mode == COPY_ON_WRITE ? 3 : 1;
    assertThat(deleteFiles).allMatch(deleteFile -> streamCount(deleteFile) <= maxRequestCount);

    // verify the final set of records is correct
    assertEquals(
        "Should have expected rows",
        ImmutableList.of(),
        sql("SELECT * FROM %s ORDER BY id ASC", targetTableName));
  }
+
  // UPDATE in copy-on-write mode must benefit from the executor delete-file cache.
  @TestTemplate
  public void testCopyOnWriteUpdate() throws Exception {
    checkUpdate(COPY_ON_WRITE);
  }
+
  // UPDATE in merge-on-read mode must open each delete file only once.
  @TestTemplate
  public void testMergeOnReadUpdate() throws Exception {
    checkUpdate(MERGE_ON_READ);
  }
+
  // Runs an UPDATE in the given row-level operation mode and asserts that the delete files
  // were opened at most the expected number of times (i.e. the cache served repeat reads).
  private void checkUpdate(RowLevelOperationMode mode) throws Exception {
    List<DeleteFile> deleteFiles = createAndInitTable(TableProperties.UPDATE_MODE, mode);

    Dataset<Integer> updateDS = spark.createDataset(ImmutableList.of(1, 4), Encoders.INT());
    updateDS.createOrReplaceTempView(UPDATES_VIEW_NAME);

    sql("UPDATE %s SET id = -1 WHERE id IN (SELECT * FROM %s)", targetTableName, UPDATES_VIEW_NAME);

    // there are 2 data files and 2 delete files that apply to both of them
    // in CoW, the target table will be scanned 3 times (2 in main query + runtime filter)
    // the runtime filter may invalidate the cache so check at least some requests were hits
    // in MoR, the target table will be scanned only once
    // so each delete file must be opened once
    int maxRequestCount = mode == COPY_ON_WRITE ? 5 : 1;
    assertThat(deleteFiles).allMatch(deleteFile -> streamCount(deleteFile) <= maxRequestCount);

    // verify the final set of records is correct
    assertEquals(
        "Should have expected rows",
        ImmutableList.of(row(-1, "hr"), row(-1, "hr")),
        sql("SELECT * FROM %s ORDER BY id ASC", targetTableName));
  }
+
  // MERGE in copy-on-write mode must benefit from the executor delete-file cache.
  @TestTemplate
  public void testCopyOnWriteMerge() throws Exception {
    checkMerge(COPY_ON_WRITE);
  }
+
  // MERGE in merge-on-read mode must open each delete file only once.
  @TestTemplate
  public void testMergeOnReadMerge() throws Exception {
    checkMerge(MERGE_ON_READ);
  }
+
  // Runs a MERGE in the given row-level operation mode and asserts that the delete files
  // were opened at most the expected number of times (i.e. the cache served repeat reads).
  private void checkMerge(RowLevelOperationMode mode) throws Exception {
    List<DeleteFile> deleteFiles = createAndInitTable(TableProperties.MERGE_MODE, mode);

    Dataset<Integer> updateDS = spark.createDataset(ImmutableList.of(1, 4), Encoders.INT());
    updateDS.createOrReplaceTempView(UPDATES_VIEW_NAME);

    sql(
        "MERGE INTO %s t USING %s s "
            + "ON t.id == s.value "
            + "WHEN MATCHED THEN "
            + "  UPDATE SET id = 100 "
            + "WHEN NOT MATCHED THEN "
            + "  INSERT (id, dep) VALUES (-1, 'unknown')",
        targetTableName, UPDATES_VIEW_NAME);

    // there are 2 data files and 2 delete files that apply to both of them
    // in CoW, the target table will be scanned 2 times (main query + runtime filter)
    // the runtime filter may invalidate the cache so check at least some requests were hits
    // in MoR, the target table will be scanned only once
    // so each delete file must be opened once
    int maxRequestCount = mode == COPY_ON_WRITE ? 3 : 1;
    assertThat(deleteFiles).allMatch(deleteFile -> streamCount(deleteFile) <= maxRequestCount);

    // verify the final set of records is correct
    assertEquals(
        "Should have expected rows",
        ImmutableList.of(row(100, "hr"), row(100, "hr")),
        sql("SELECT * FROM %s ORDER BY id ASC", targetTableName));
  }
+
+  private int streamCount(DeleteFile deleteFile) {
+    CustomInputFile inputFile = INPUT_FILES.get(deleteFile.path().toString());
+    return inputFile.streamCount();
+  }
+
  // Creates the target table with the requested row-level operation mode, appends two data
  // files, and commits one position delete file and one equality delete file that apply to
  // both. Returns the two delete files so callers can track how often they are opened.
  private List<DeleteFile> createAndInitTable(String operation, RowLevelOperationMode mode)
      throws Exception {
    sql(
        "CREATE TABLE %s (id INT, dep STRING) "
            + "USING iceberg "
            + "TBLPROPERTIES ('%s' '%s', '%s' '%s', '%s' '%s')",
        targetTableName,
        TableProperties.WRITE_METADATA_LOCATION,
        temp.toString().replaceFirst("file:", ""),
        TableProperties.WRITE_DATA_LOCATION,
        temp.toString().replaceFirst("file:", ""),
        operation,
        mode.modeName());

    append(targetTableName, new Employee(0, "hr"), new Employee(1, "hr"), new Employee(2, "hr"));
    append(targetTableName, new Employee(3, "hr"), new Employee(4, "hr"), new Employee(5, "hr"));

    Table table = validationCatalog.loadTable(targetTableIdent);

    // position-delete the first row (position 0) of every data file
    List<Pair<CharSequence, Long>> posDeletes =
        dataFiles(table).stream()
            .map(dataFile -> Pair.of(dataFile.path(), 0L))
            .collect(Collectors.toList());
    Pair<DeleteFile, CharSequenceSet> posDeleteResult = writePosDeletes(table, posDeletes);
    DeleteFile posDeleteFile = posDeleteResult.first();
    CharSequenceSet referencedDataFiles = posDeleteResult.second();

    // equality-delete rows with id = 2 and id = 5
    DeleteFile eqDeleteFile = writeEqDeletes(table, "id", 2, 5);

    // commit both delete files in a single row delta, validating the referenced data files
    table
        .newRowDelta()
        .validateFromSnapshot(table.currentSnapshot().snapshotId())
        .validateDataFilesExist(referencedDataFiles)
        .addDeletes(posDeleteFile)
        .addDeletes(eqDeleteFile)
        .commit();

    sql("REFRESH TABLE %s", targetTableName);

    return ImmutableList.of(posDeleteFile, eqDeleteFile);
  }
+
+  private DeleteFile writeEqDeletes(Table table, String col, Object... values) 
throws IOException {
+    Schema deleteSchema = table.schema().select(col);
+
+    Record delete = GenericRecord.create(deleteSchema);
+    List<Record> deletes = Lists.newArrayList();
+    for (Object value : values) {
+      deletes.add(delete.copy(col, value));
+    }
+
+    OutputFile out = Files.localOutput(new File(temp, "eq-deletes-" + 
UUID.randomUUID()));
+    return FileHelpers.writeDeleteFile(table, out, null, deletes, 
deleteSchema);
+  }
+
+  private Pair<DeleteFile, CharSequenceSet> writePosDeletes(
+      Table table, List<Pair<CharSequence, Long>> deletes) throws IOException {
+    OutputFile out = Files.localOutput(new File(temp, "pos-deletes-" + 
UUID.randomUUID()));
+    return FileHelpers.writeDeleteFile(table, out, null, deletes);
+  }
+
+  private void append(String target, Employee... employees) throws 
NoSuchTableException {
+    List<Employee> input = Arrays.asList(employees);
+    Dataset<Row> inputDF = spark.createDataFrame(input, Employee.class);
+    inputDF.coalesce(1).writeTo(target).append();
+  }
+
+  private Collection<DataFile> dataFiles(Table table) {
+    try (CloseableIterable<FileScanTask> tasks = table.newScan().planFiles()) {
+      return ImmutableList.copyOf(Iterables.transform(tasks, 
ContentScanTask::file));
+    } catch (IOException e) {
+      throw new RuntimeIOException(e);
+    }
+  }
+
  // Reads the private "state" field of the executor cache singleton via reflection so tests
  // can inspect which keys are currently live after invalidation.
  @SuppressWarnings("unchecked")
  private static Cache<String, ?> fetchInternalCacheState() {
    try {
      Field stateField = SparkExecutorCache.class.getDeclaredField("state");
      stateField.setAccessible(true);
      SparkExecutorCache cache = SparkExecutorCache.get();
      return (Cache<String, ?>) stateField.get(cache);
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
  }
+
+  private static String toInternalKey(String group, String key) {
+    return group + "_" + key;
+  }
+
+  public static class CustomFileIO implements FileIO {
+
+    public CustomFileIO() {}
+
+    @Override
+    public InputFile newInputFile(String path) {
+      return INPUT_FILES.computeIfAbsent(path, key -> new 
CustomInputFile(path));
+    }
+
+    @Override
+    public OutputFile newOutputFile(String path) {
+      return Files.localOutput(path);
+    }
+
+    @Override
+    public void deleteFile(String path) {
+      File file = new File(path);
+      if (!file.delete()) {
+        throw new RuntimeIOException("Failed to delete file: " + path);
+      }
+    }
+  }
+
+  public static class CustomInputFile implements InputFile {
+    private final InputFile delegate;
+    private final AtomicInteger streamCount;
+
+    public CustomInputFile(String path) {
+      this.delegate = Files.localInput(path);
+      this.streamCount = new AtomicInteger();
+    }
+
+    @Override
+    public long getLength() {
+      return delegate.getLength();
+    }
+
+    @Override
+    public SeekableInputStream newStream() {
+      streamCount.incrementAndGet();
+      return delegate.newStream();
+    }
+
+    public int streamCount() {
+      return streamCount.get();
+    }
+
+    @Override
+    public String location() {
+      return delegate.location();
+    }
+
+    @Override
+    public boolean exists() {
+      return delegate.exists();
+    }
+  }
+}
diff --git 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestSparkWriteConf.java
 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestSparkWriteConf.java
index 30b3b8a885..9f4a4f47bf 100644
--- 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestSparkWriteConf.java
+++ 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestSparkWriteConf.java
@@ -47,6 +47,7 @@ import static 
org.apache.spark.sql.connector.write.RowLevelOperation.Command.UPD
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
+import java.time.Duration;
 import java.util.List;
 import java.util.Map;
 import org.apache.iceberg.DistributionMode;
@@ -78,6 +79,28 @@ public class TestSparkWriteConf extends TestBaseWithCatalog {
     sql("DROP TABLE IF EXISTS %s", tableName);
   }
 
+  @TestTemplate
+  public void testDurationConf() {
+    Table table = validationCatalog.loadTable(tableIdent);
+    String confName = "spark.sql.iceberg.some-duration-conf";
+
+    withSQLConf(
+        ImmutableMap.of(confName, "10s"),
+        () -> {
+          SparkConfParser parser = new SparkConfParser(spark, table, 
ImmutableMap.of());
+          Duration duration = 
parser.durationConf().sessionConf(confName).parseOptional();
+          assertThat(duration).hasSeconds(10);
+        });
+
+    withSQLConf(
+        ImmutableMap.of(confName, "2m"),
+        () -> {
+          SparkConfParser parser = new SparkConfParser(spark, table, 
ImmutableMap.of());
+          Duration duration = 
parser.durationConf().sessionConf(confName).parseOptional();
+          assertThat(duration).hasMinutes(2);
+        });
+  }
+
   @TestTemplate
   public void testDeleteGranularityDefault() {
     Table table = validationCatalog.loadTable(tableIdent);

Reply via email to