This is an automated email from the ASF dual-hosted git repository.

aokolnychyi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new 65a076deda Spark 3.4: Read deletes in parallel and cache them on 
executors (#9603)
65a076deda is described below

commit 65a076dedae2e62008ce64c6094dd519d29ce500
Author: Anton Okolnychyi <[email protected]>
AuthorDate: Fri Feb 2 15:24:55 2024 -0800

    Spark 3.4: Read deletes in parallel and cache them on executors (#9603)
    
    This change backports PR #8755 and PR #9583 to Spark 3.4.
---
 spark/v3.4/build.gradle                            |   3 +
 .../spark/extensions/TestSparkExecutorCache.java   | 366 +++++++++++++++++
 .../java/org/apache/iceberg/spark/JavaUtils.java   |  76 ++++
 .../org/apache/iceberg/spark/SparkConfParser.java  |  40 ++
 .../apache/iceberg/spark/SparkExecutorCache.java   | 228 +++++++++++
 .../apache/iceberg/spark/SparkSQLProperties.java   |  16 +
 .../apache/iceberg/spark/source/BaseReader.java    |  29 ++
 .../spark/source/SerializableTableWithSize.java    |  10 +
 .../java/org/apache/iceberg/spark/Employee.java    |  66 +++
 .../iceberg/spark/TestSparkExecutorCache.java      | 444 +++++++++++++++++++++
 .../apache/iceberg/spark/TestSparkWriteConf.java   |  23 ++
 11 files changed, 1301 insertions(+)

diff --git a/spark/v3.4/build.gradle b/spark/v3.4/build.gradle
index 0b1c4df75a..7d81137a6b 100644
--- a/spark/v3.4/build.gradle
+++ b/spark/v3.4/build.gradle
@@ -89,6 +89,8 @@ 
project(":iceberg-spark:iceberg-spark-${sparkMajorVersion}_${scalaVersion}") {
       exclude group: 'com.google.code.findbugs', module: 'jsr305'
     }
 
+    implementation libs.caffeine
+
     testImplementation(libs.hadoop2.minicluster) {
       exclude group: 'org.apache.avro', module: 'avro'
       // to make sure netty libs only come from project(':iceberg-arrow')
@@ -157,6 +159,7 @@ 
project(":iceberg-spark:iceberg-spark-extensions-${sparkMajorVersion}_${scalaVer
     testImplementation project(path: ':iceberg-data')
     testImplementation project(path: ':iceberg-parquet')
     testImplementation project(path: ':iceberg-hive-metastore')
+    testImplementation project(path: ':iceberg-data', configuration: 
'testArtifacts')
     testImplementation project(path: ':iceberg-api', configuration: 
'testArtifacts')
     testImplementation project(path: ':iceberg-core', configuration: 
'testArtifacts')
     testImplementation project(path: ':iceberg-hive-metastore', configuration: 
'testArtifacts')
diff --git 
a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestSparkExecutorCache.java
 
b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestSparkExecutorCache.java
new file mode 100644
index 0000000000..3d995cc4f0
--- /dev/null
+++ 
b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestSparkExecutorCache.java
@@ -0,0 +1,366 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.extensions;
+
+import static org.apache.iceberg.RowLevelOperationMode.COPY_ON_WRITE;
+import static org.apache.iceberg.RowLevelOperationMode.MERGE_ON_READ;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.ContentScanTask;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.RowLevelOperationMode;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.data.FileHelpers;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.io.SeekableInputStream;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.spark.SparkCatalog;
+import org.apache.iceberg.util.CharSequenceSet;
+import org.apache.iceberg.util.Pair;
+import org.apache.spark.SparkEnv;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.apache.spark.storage.memory.MemoryStore;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runners.Parameterized.Parameters;
+
+/**
+ * Checks how many times delete files are opened by row-level operations.
+ *
+ * <p>The catalog is configured with {@link CustomFileIO}, which hands out {@link CustomInputFile}
+ * instances that count every stream opened on them. Each test runs a DELETE, UPDATE or MERGE and
+ * then asserts an upper bound on the number of streams opened per delete file.
+ */
+public class TestSparkExecutorCache extends SparkExtensionsTestBase {
+
+  @Parameters(name = "catalogName = {0}, implementation = {1}, config = {2}")
+  public static Object[][] parameters() {
+    return new Object[][] {
+      {
+        "testhive",
+        SparkCatalog.class.getName(),
+        ImmutableMap.of(
+            "type",
+            "hive",
+            CatalogProperties.FILE_IO_IMPL,
+            CustomFileIO.class.getName(),
+            "default-namespace",
+            "default")
+      },
+    };
+  }
+
+  private static final String UPDATES_VIEW_NAME = "updates";
+  private static final AtomicInteger JOB_COUNTER = new AtomicInteger();
+  // input files handed out by CustomFileIO, keyed by the requested path
+  private static final Map<String, CustomInputFile> INPUT_FILES =
+      Collections.synchronizedMap(Maps.newHashMap());
+
+  private String targetTableName;
+  private TableIdentifier targetTableIdent;
+
+  public TestSparkExecutorCache(
+      String catalogName, String implementation, Map<String, String> config) {
+    super(catalogName, implementation, config);
+  }
+
+  @Before
+  public void configureTargetTableName() {
+    // use a new table name in every test
+    String name = "target_exec_cache_" + JOB_COUNTER.incrementAndGet();
+    this.targetTableName = tableName(name);
+    this.targetTableIdent = TableIdentifier.of(Namespace.of("default"), name);
+  }
+
+  @After
+  public void releaseResources() {
+    sql("DROP TABLE IF EXISTS %s", targetTableName);
+    sql("DROP TABLE IF EXISTS %s", UPDATES_VIEW_NAME);
+    INPUT_FILES.clear();
+  }
+
+  @Test
+  public void testCopyOnWriteDelete() throws Exception {
+    checkDelete(COPY_ON_WRITE);
+  }
+
+  @Test
+  public void testMergeOnReadDelete() throws Exception {
+    checkDelete(MERGE_ON_READ);
+  }
+
+  private void checkDelete(RowLevelOperationMode mode) throws Exception {
+    List<DeleteFile> deleteFiles = createAndInitTable(TableProperties.DELETE_MODE, mode);
+
+    sql("DELETE FROM %s WHERE id = 1 OR id = 4", targetTableName);
+
+    // there are 2 data files and 2 delete files that apply to both of them
+    // in CoW, the target table will be scanned 2 times (main query + runtime filter)
+    // the runtime filter may invalidate the cache so check at least some requests were hits
+    // in MoR, the target table will be scanned only once
+    // so each delete file must be opened at most once
+    int maxRequestCount = mode == COPY_ON_WRITE ? 3 : 1;
+    assertThat(deleteFiles).allMatch(deleteFile -> streamCount(deleteFile) <= maxRequestCount);
+
+    // verify the final set of records is correct
+    assertEquals(
+        "Should have expected rows",
+        ImmutableList.of(),
+        sql("SELECT * FROM %s ORDER BY id ASC", targetTableName));
+  }
+
+  @Test
+  public void testCopyOnWriteUpdate() throws Exception {
+    checkUpdate(COPY_ON_WRITE);
+  }
+
+  @Test
+  public void testMergeOnReadUpdate() throws Exception {
+    checkUpdate(MERGE_ON_READ);
+  }
+
+  private void checkUpdate(RowLevelOperationMode mode) throws Exception {
+    List<DeleteFile> deleteFiles = createAndInitTable(TableProperties.UPDATE_MODE, mode);
+
+    Dataset<Integer> updateDS = spark.createDataset(ImmutableList.of(1, 4), Encoders.INT());
+    updateDS.createOrReplaceTempView(UPDATES_VIEW_NAME);
+
+    sql("UPDATE %s SET id = -1 WHERE id IN (SELECT * FROM %s)", targetTableName, UPDATES_VIEW_NAME);
+
+    // there are 2 data files and 2 delete files that apply to both of them
+    // in CoW, the target table will be scanned 3 times (2 in main query + runtime filter)
+    // the runtime filter may invalidate the cache so check at least some requests were hits
+    // in MoR, the target table will be scanned only once
+    // so each delete file must be opened at most once
+    int maxRequestCount = mode == COPY_ON_WRITE ? 5 : 1;
+    assertThat(deleteFiles).allMatch(deleteFile -> streamCount(deleteFile) <= maxRequestCount);
+
+    // verify the final set of records is correct
+    assertEquals(
+        "Should have expected rows",
+        ImmutableList.of(row(-1, "hr"), row(-1, "hr")),
+        sql("SELECT * FROM %s ORDER BY id ASC", targetTableName));
+  }
+
+  @Test
+  public void testCopyOnWriteMerge() throws Exception {
+    checkMerge(COPY_ON_WRITE);
+  }
+
+  @Test
+  public void testMergeOnReadMerge() throws Exception {
+    checkMerge(MERGE_ON_READ);
+  }
+
+  private void checkMerge(RowLevelOperationMode mode) throws Exception {
+    List<DeleteFile> deleteFiles = createAndInitTable(TableProperties.MERGE_MODE, mode);
+
+    Dataset<Integer> updateDS = spark.createDataset(ImmutableList.of(1, 4), Encoders.INT());
+    updateDS.createOrReplaceTempView(UPDATES_VIEW_NAME);
+
+    sql(
+        "MERGE INTO %s t USING %s s "
+            + "ON t.id == s.value "
+            + "WHEN MATCHED THEN "
+            + "  UPDATE SET id = 100 "
+            + "WHEN NOT MATCHED THEN "
+            + "  INSERT (id, dep) VALUES (-1, 'unknown')",
+        targetTableName, UPDATES_VIEW_NAME);
+
+    // there are 2 data files and 2 delete files that apply to both of them
+    // in CoW, the target table will be scanned 2 times (main query + runtime filter)
+    // the runtime filter may invalidate the cache so check at least some requests were hits
+    // in MoR, the target table will be scanned only once
+    // so each delete file must be opened at most once
+    int maxRequestCount = mode == COPY_ON_WRITE ? 3 : 1;
+    assertThat(deleteFiles).allMatch(deleteFile -> streamCount(deleteFile) <= maxRequestCount);
+
+    // verify the final set of records is correct
+    assertEquals(
+        "Should have expected rows",
+        ImmutableList.of(row(100, "hr"), row(100, "hr")),
+        sql("SELECT * FROM %s ORDER BY id ASC", targetTableName));
+  }
+
+  // returns how many streams were opened for the delete file through CustomFileIO
+  private int streamCount(DeleteFile deleteFile) {
+    String path = deleteFile.path().toString();
+    CustomInputFile inputFile = INPUT_FILES.get(path);
+    // fail with a readable message instead of a bare NPE if the file was never requested
+    assertThat(inputFile)
+        .as("Delete file %s must have been requested via CustomFileIO", path)
+        .isNotNull();
+    return inputFile.streamCount();
+  }
+
+  // creates the target table with 2 data files, commits one position delete file and
+  // one equality delete file, and returns both delete files
+  private List<DeleteFile> createAndInitTable(String operation, RowLevelOperationMode mode)
+      throws Exception {
+    sql(
+        "CREATE TABLE %s (id INT, dep STRING) "
+            + "USING iceberg "
+            + "TBLPROPERTIES ('%s' '%s', '%s' '%s', '%s' '%s')",
+        targetTableName,
+        TableProperties.WRITE_METADATA_LOCATION,
+        temp.toString().replaceFirst("file:", ""),
+        TableProperties.WRITE_DATA_LOCATION,
+        temp.toString().replaceFirst("file:", ""),
+        operation,
+        mode.modeName());
+
+    append(targetTableName, new Employee(0, "hr"), new Employee(1, "hr"), new Employee(2, "hr"));
+    append(targetTableName, new Employee(3, "hr"), new Employee(4, "hr"), new Employee(5, "hr"));
+
+    Table table = validationCatalog.loadTable(targetTableIdent);
+
+    // delete the record at position 0 in every data file
+    List<Pair<CharSequence, Long>> posDeletes =
+        dataFiles(table).stream()
+            .map(dataFile -> Pair.of(dataFile.path(), 0L))
+            .collect(Collectors.toList());
+    Pair<DeleteFile, CharSequenceSet> posDeleteResult = writePosDeletes(table, posDeletes);
+    DeleteFile posDeleteFile = posDeleteResult.first();
+    CharSequenceSet referencedDataFiles = posDeleteResult.second();
+
+    DeleteFile eqDeleteFile = writeEqDeletes(table, "id", 2, 5);
+
+    table
+        .newRowDelta()
+        .validateFromSnapshot(table.currentSnapshot().snapshotId())
+        .validateDataFilesExist(referencedDataFiles)
+        .addDeletes(posDeleteFile)
+        .addDeletes(eqDeleteFile)
+        .commit();
+
+    sql("REFRESH TABLE %s", targetTableName);
+
+    // invalidate the memory store to destroy all currently live table broadcasts
+    SparkEnv sparkEnv = SparkEnv.get();
+    MemoryStore memoryStore = sparkEnv.blockManager().memoryStore();
+    memoryStore.clear();
+
+    return ImmutableList.of(posDeleteFile, eqDeleteFile);
+  }
+
+  // writes an equality delete file with one record per given value of the column
+  private DeleteFile writeEqDeletes(Table table, String col, Object... values) throws IOException {
+    Schema deleteSchema = table.schema().select(col);
+
+    Record delete = GenericRecord.create(deleteSchema);
+    List<Record> deletes = Lists.newArrayList();
+    for (Object value : values) {
+      deletes.add(delete.copy(col, value));
+    }
+
+    OutputFile out = Files.localOutput(temp.newFile("eq-deletes-" + UUID.randomUUID()));
+    return FileHelpers.writeDeleteFile(table, out, null, deletes, deleteSchema);
+  }
+
+  // writes a position delete file for the given (data file path, position) pairs
+  private Pair<DeleteFile, CharSequenceSet> writePosDeletes(
+      Table table, List<Pair<CharSequence, Long>> deletes) throws IOException {
+    OutputFile out = Files.localOutput(temp.newFile("pos-deletes-" + UUID.randomUUID()));
+    return FileHelpers.writeDeleteFile(table, out, null, deletes);
+  }
+
+  // appends the employees after coalescing the input to a single partition
+  private void append(String target, Employee... employees) throws NoSuchTableException {
+    List<Employee> input = Arrays.asList(employees);
+    Dataset<Row> inputDF = spark.createDataFrame(input, Employee.class);
+    inputDF.coalesce(1).writeTo(target).append();
+  }
+
+  // plans a table scan and returns the file of every planned task
+  private Collection<DataFile> dataFiles(Table table) {
+    try (CloseableIterable<FileScanTask> tasks = table.newScan().planFiles()) {
+      return ImmutableList.copyOf(Iterables.transform(tasks, ContentScanTask::file));
+    } catch (IOException e) {
+      throw new RuntimeIOException(e);
+    }
+  }
+
+  /** A local {@link FileIO} that records every requested input file in {@code INPUT_FILES}. */
+  public static class CustomFileIO implements FileIO {
+
+    public CustomFileIO() {}
+
+    @Override
+    public InputFile newInputFile(String path) {
+      // reuse the same instance per path so that stream counts accumulate
+      return INPUT_FILES.computeIfAbsent(path, key -> new CustomInputFile(path));
+    }
+
+    @Override
+    public OutputFile newOutputFile(String path) {
+      return Files.localOutput(path);
+    }
+
+    @Override
+    public void deleteFile(String path) {
+      File file = new File(path);
+      if (!file.delete()) {
+        throw new RuntimeIOException("Failed to delete file: " + path);
+      }
+    }
+  }
+
+  /** An {@link InputFile} that delegates to a local file and counts the streams it opens. */
+  public static class CustomInputFile implements InputFile {
+    private final InputFile delegate;
+    private final AtomicInteger streamCount;
+
+    public CustomInputFile(String path) {
+      this.delegate = Files.localInput(path);
+      this.streamCount = new AtomicInteger();
+    }
+
+    @Override
+    public long getLength() {
+      return delegate.getLength();
+    }
+
+    @Override
+    public SeekableInputStream newStream() {
+      streamCount.incrementAndGet();
+      return delegate.newStream();
+    }
+
+    /** Returns the number of times {@link #newStream()} has been called. */
+    public int streamCount() {
+      return streamCount.get();
+    }
+
+    @Override
+    public String location() {
+      return delegate.location();
+    }
+
+    @Override
+    public boolean exists() {
+      return delegate.exists();
+    }
+  }
+}
diff --git 
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/JavaUtils.java 
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/JavaUtils.java
new file mode 100644
index 0000000000..2fa1289c85
--- /dev/null
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/JavaUtils.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark;
+
+import java.util.Locale;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+
+// copied from internal JavaUtils in Spark, not accessible in 3.4
+/** Parses time strings such as {@code 50s}, {@code 100ms} or {@code 2min}. */
+class JavaUtils {
+
+  // suffixes that may follow the number in a time string
+  private static final Map<String, TimeUnit> TIME_SUFFIXES =
+      ImmutableMap.<String, TimeUnit>builder()
+          .put("us", TimeUnit.MICROSECONDS)
+          .put("ms", TimeUnit.MILLISECONDS)
+          .put("s", TimeUnit.SECONDS)
+          .put("m", TimeUnit.MINUTES)
+          .put("min", TimeUnit.MINUTES)
+          .put("h", TimeUnit.HOURS)
+          .put("d", TimeUnit.DAYS)
+          .build();
+
+  // an optionally negative integer followed by an optional lowercase suffix;
+  // compiled once as the expression is the same for every call
+  private static final Pattern TIME_STRING = Pattern.compile("(-?[0-9]+)([a-z]+)?");
+
+  private JavaUtils() {}
+
+  /**
+   * Converts a time string to seconds.
+   *
+   * @param str a time string, a value without a suffix is treated as seconds
+   * @return the time in seconds
+   * @throws NumberFormatException if the string cannot be parsed
+   */
+  public static long timeStringAsSec(String str) {
+    return timeStringAs(str, TimeUnit.SECONDS);
+  }
+
+  /**
+   * Converts a time string to the given unit.
+   *
+   * <p>The string is lowercased and trimmed before parsing. The conversion is done via {@link
+   * TimeUnit#convert(long, TimeUnit)}, so converting to a coarser unit truncates.
+   *
+   * @param str a time string: an integer optionally followed by us, ms, s, m, min, h or d
+   * @param unit the unit of the result, also used for the input if it has no suffix
+   * @return the time in the given unit
+   * @throws NumberFormatException if the string has an invalid format or an unknown suffix
+   */
+  public static long timeStringAs(String str, TimeUnit unit) {
+    String lower = str.toLowerCase(Locale.ROOT).trim();
+
+    try {
+      Matcher matcher = TIME_STRING.matcher(lower);
+      if (!matcher.matches()) {
+        throw new NumberFormatException("Failed to parse time string: " + str);
+      }
+
+      long val = Long.parseLong(matcher.group(1));
+      String suffix = matcher.group(2);
+
+      // Check for invalid suffixes
+      if (suffix != null && !TIME_SUFFIXES.containsKey(suffix)) {
+        throw new NumberFormatException("Invalid suffix: \"" + suffix + "\"");
+      }
+
+      // If suffix is valid use that, otherwise none was provided and use the default passed
+      return unit.convert(val, suffix != null ? TIME_SUFFIXES.get(suffix) : unit);
+    } catch (NumberFormatException e) {
+      String timeError =
+          "Time must be specified as seconds (s), "
+              + "milliseconds (ms), microseconds (us), minutes (m or min), hour (h), or day (d). "
+              + "E.g. 50s, 100ms, or 250us.";
+
+      // NumberFormatException has no constructor with a cause so attach it explicitly
+      NumberFormatException wrapped = new NumberFormatException(timeError + "\n" + e.getMessage());
+      wrapped.initCause(e);
+      throw wrapped;
+    }
+  }
+}
diff --git 
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkConfParser.java 
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkConfParser.java
index eaa301524c..d666c9c35b 100644
--- 
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkConfParser.java
+++ 
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkConfParser.java
@@ -18,15 +18,18 @@
  */
 package org.apache.iceberg.spark;
 
+import java.time.Duration;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.function.Function;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.spark.sql.RuntimeConfig;
 import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.internal.SQLConf;
 
 class SparkConfParser {
 
@@ -34,6 +37,12 @@ class SparkConfParser {
   private final RuntimeConfig sessionConf;
   private final Map<String, String> options;
 
+  // creates a parser with empty table properties and options,
+  // the session conf wraps the SQL conf returned by SQLConf.get()
+  SparkConfParser() {
+    this.sessionConf = new RuntimeConfig(SQLConf.get());
+    this.options = ImmutableMap.of();
+    this.properties = ImmutableMap.of();
+  }
+
   SparkConfParser(SparkSession spark, Table table, Map<String, String> 
options) {
     this.properties = table.properties();
     this.sessionConf = spark.conf();
@@ -56,6 +65,10 @@ class SparkConfParser {
     return new StringConfParser();
   }
 
+  /** Returns a new parser for configs that hold a time duration. */
+  public DurationConfParser durationConf() {
+    DurationConfParser parser = new DurationConfParser();
+    return parser;
+  }
+
   class BooleanConfParser extends ConfParser<BooleanConfParser, Boolean> {
     private Boolean defaultValue;
     private boolean negate = false;
@@ -156,6 +169,33 @@ class SparkConfParser {
     }
   }
 
+  /** A parser for configs whose values are time strings such as {@code 10min}. */
+  class DurationConfParser extends ConfParser<DurationConfParser, Duration> {
+    private Duration defaultValue;
+
+    @Override
+    protected DurationConfParser self() {
+      return this;
+    }
+
+    /** Sets the value to use if the config is not set. */
+    public DurationConfParser defaultValue(Duration value) {
+      this.defaultValue = value;
+      return self();
+    }
+
+    /** Parses the config, a default value must have been provided. */
+    public Duration parse() {
+      Preconditions.checkArgument(defaultValue != null, "Default value cannot be null");
+      return parseOptional();
+    }
+
+    /** Parses the config, the default value is allowed to be null. */
+    public Duration parseOptional() {
+      return parse(this::toDuration, defaultValue);
+    }
+
+    // time strings are converted with second precision
+    private Duration toDuration(String time) {
+      long seconds = JavaUtils.timeStringAsSec(time);
+      return Duration.ofSeconds(seconds);
+    }
+  }
+
   abstract class ConfParser<ThisT, T> {
     private final List<String> optionNames = Lists.newArrayList();
     private String sessionConfName;
diff --git 
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkExecutorCache.java
 
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkExecutorCache.java
new file mode 100644
index 0000000000..6490d6678b
--- /dev/null
+++ 
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkExecutorCache.java
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import java.time.Duration;
+import java.util.List;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import 
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An executor cache for reducing the computation and IO overhead in tasks.
+ *
+ * <p>The cache is configured and controlled through Spark SQL properties. It supports both limits
+ * on the total cache size and maximum size for individual entries. Additionally, it implements
+ * automatic eviction of entries after a specified duration of inactivity. The cache will respect
+ * the SQL configuration valid at the time of initialization. All subsequent changes to the
+ * configuration will have no effect.
+ *
+ * <p>The cache is accessed and populated via {@link #getOrLoad(String, String, Supplier, long)}. If
+ * the value is not present in the cache, it is computed using the provided supplier and stored in
+ * the cache, subject to the defined size constraints. When a key is added, it must be associated
+ * with a particular group ID. Once the group is no longer needed, it is recommended to explicitly
+ * invalidate its state by calling {@link #invalidate(String)} instead of relying on automatic
+ * eviction.
+ *
+ * <p>Note that this class employs the singleton pattern to ensure only one cache exists per JVM.
+ */
+public class SparkExecutorCache {
+
+  private static final Logger LOG = LoggerFactory.getLogger(SparkExecutorCache.class);
+
+  private static volatile SparkExecutorCache instance = null;
+
+  private final Duration timeout;
+  private final long maxEntrySize;
+  private final long maxTotalSize;
+  // created on first access in state(), null until then
+  private volatile Cache<String, CacheValue> state;
+
+  private SparkExecutorCache(Conf conf) {
+    this.timeout = conf.timeout();
+    this.maxEntrySize = conf.maxEntrySize();
+    this.maxTotalSize = conf.maxTotalSize();
+  }
+
+  /**
+   * Returns the cache if created or creates and returns it.
+   *
+   * <p>Note this method returns null if caching is disabled.
+   */
+  public static SparkExecutorCache getOrCreate() {
+    if (instance == null) {
+      Conf conf = new Conf();
+      if (conf.cacheEnabled()) {
+        synchronized (SparkExecutorCache.class) {
+          // check again as another thread may have created the instance in the meantime
+          if (instance == null) {
+            SparkExecutorCache.instance = new SparkExecutorCache(conf);
+          }
+        }
+      }
+    }
+
+    return instance;
+  }
+
+  /** Returns the cache if already created or null otherwise. */
+  public static SparkExecutorCache get() {
+    return instance;
+  }
+
+  /** Returns the max entry size in bytes that will be considered for caching. */
+  public long maxEntrySize() {
+    return maxEntrySize;
+  }
+
+  /**
+   * Gets the cached value for the key or populates the cache with a new mapping.
+   *
+   * <p>Values whose size exceeds {@link #maxEntrySize()} are computed but never cached.
+   *
+   * @param group a group ID
+   * @param key a cache key
+   * @param valueSupplier a supplier to compute the value
+   * @param valueSize an estimated memory size of the value in bytes
+   * @return the cached or computed value
+   */
+  public <V> V getOrLoad(String group, String key, Supplier<V> valueSupplier, long valueSize) {
+    if (valueSize > maxEntrySize) {
+      LOG.debug("{} exceeds max entry size: {} > {}", key, valueSize, maxEntrySize);
+      return valueSupplier.get();
+    }
+
+    String internalKey = internalKeyPrefix(group) + key;
+    CacheValue value = state().get(internalKey, loadFunc(valueSupplier, valueSize));
+    Preconditions.checkNotNull(value, "Loaded value must not be null");
+    return value.get();
+  }
+
+  // wraps the supplier into a cache loader that logs the load time
+  private <V> Function<String, CacheValue> loadFunc(Supplier<V> valueSupplier, long valueSize) {
+    return key -> {
+      long start = System.currentTimeMillis();
+      V value = valueSupplier.get();
+      long end = System.currentTimeMillis();
+      LOG.debug("Loaded {} with size {} in {} ms", key, valueSize, (end - start));
+      return new CacheValue(value, valueSize);
+    };
+  }
+
+  /**
+   * Invalidates all keys associated with the given group ID.
+   *
+   * <p>This is a no-op if the cache state has not been initialized yet.
+   *
+   * @param group a group ID
+   */
+  public void invalidate(String group) {
+    if (state != null) {
+      List<String> internalKeys = findInternalKeys(group);
+      LOG.info("Invalidating {} keys associated with {}", internalKeys.size(), group);
+      internalKeys.forEach(internalKey -> state.invalidate(internalKey));
+      LOG.info("Current cache stats {}", state.stats());
+    }
+  }
+
+  // the prefix shared by all internal keys of a group
+  private static String internalKeyPrefix(String group) {
+    return group + "_";
+  }
+
+  private List<String> findInternalKeys(String group) {
+    // match the group together with the delimiter, otherwise invalidating a group
+    // would also drop keys of any other group whose ID starts with this group ID
+    String prefix = internalKeyPrefix(group);
+    return state.asMap().keySet().stream()
+        .filter(internalKey -> internalKey.startsWith(prefix))
+        .collect(Collectors.toList());
+  }
+
+  // returns the underlying cache, initializing it on first access
+  private Cache<String, CacheValue> state() {
+    if (state == null) {
+      synchronized (this) {
+        if (state == null) {
+          LOG.info("Initializing cache state");
+          this.state = initState();
+        }
+      }
+    }
+
+    return state;
+  }
+
+  // builds a cache that expires entries after the timeout since the last access
+  // and limits the total weight of entries to the max total size
+  private Cache<String, CacheValue> initState() {
+    return Caffeine.newBuilder()
+        .expireAfterAccess(timeout)
+        .maximumWeight(maxTotalSize)
+        .weigher((key, value) -> ((CacheValue) value).weight())
+        .recordStats()
+        .removalListener((key, value, cause) -> LOG.debug("Evicted {} ({})", key, cause))
+        .build();
+  }
+
+  /** A cached value together with its estimated size in bytes. */
+  @VisibleForTesting
+  static class CacheValue {
+    private final Object value;
+    private final long size;
+
+    CacheValue(Object value, long size) {
+      this.value = value;
+      this.size = size;
+    }
+
+    @SuppressWarnings("unchecked")
+    public <V> V get() {
+      return (V) value;
+    }
+
+    /** Returns the size as an int weight, capped at {@link Integer#MAX_VALUE}. */
+    public int weight() {
+      return (int) Math.min(size, Integer.MAX_VALUE);
+    }
+  }
+
+  /** Reads the cache settings from the session conf, falling back to the defaults. */
+  @VisibleForTesting
+  static class Conf {
+    private final SparkConfParser confParser = new SparkConfParser();
+
+    public boolean cacheEnabled() {
+      return confParser
+          .booleanConf()
+          .sessionConf(SparkSQLProperties.EXECUTOR_CACHE_ENABLED)
+          .defaultValue(SparkSQLProperties.EXECUTOR_CACHE_ENABLED_DEFAULT)
+          .parse();
+    }
+
+    public Duration timeout() {
+      return confParser
+          .durationConf()
+          .sessionConf(SparkSQLProperties.EXECUTOR_CACHE_TIMEOUT)
+          .defaultValue(SparkSQLProperties.EXECUTOR_CACHE_TIMEOUT_DEFAULT)
+          .parse();
+    }
+
+    public long maxEntrySize() {
+      return confParser
+          .longConf()
+          .sessionConf(SparkSQLProperties.EXECUTOR_CACHE_MAX_ENTRY_SIZE)
+          .defaultValue(SparkSQLProperties.EXECUTOR_CACHE_MAX_ENTRY_SIZE_DEFAULT)
+          .parse();
+    }
+
+    public long maxTotalSize() {
+      return confParser
+          .longConf()
+          .sessionConf(SparkSQLProperties.EXECUTOR_CACHE_MAX_TOTAL_SIZE)
+          .defaultValue(SparkSQLProperties.EXECUTOR_CACHE_MAX_TOTAL_SIZE_DEFAULT)
+          .parse();
+    }
+  }
+}
diff --git 
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java
 
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java
index 2176650191..6b2d2e79fb 100644
--- 
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java
+++ 
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java
@@ -18,6 +18,8 @@
  */
 package org.apache.iceberg.spark;
 
+import java.time.Duration;
+
 public class SparkSQLProperties {
 
   private SparkSQLProperties() {}
@@ -77,4 +79,18 @@ public class SparkSQLProperties {
 
   // Controls whether to report locality information to Spark while allocating 
input partitions
   public static final String LOCALITY = "spark.sql.iceberg.locality.enabled";
+
+  // Controls whether the executor cache is enabled
+  public static final String EXECUTOR_CACHE_ENABLED = "spark.sql.iceberg.executor-cache.enabled";
+  public static final boolean EXECUTOR_CACHE_ENABLED_DEFAULT = true;
+
+  // Timeout used by the executor cache, parsed as a duration (e.g. "10s", "2m")
+  public static final String EXECUTOR_CACHE_TIMEOUT = "spark.sql.iceberg.executor-cache.timeout";
+  public static final Duration EXECUTOR_CACHE_TIMEOUT_DEFAULT = Duration.ofMinutes(10);
+
+  // Size limit for a single executor cache entry (presumably in bytes, see the default below)
+  public static final String EXECUTOR_CACHE_MAX_ENTRY_SIZE =
+      "spark.sql.iceberg.executor-cache.max-entry-size";
+  public static final long EXECUTOR_CACHE_MAX_ENTRY_SIZE_DEFAULT = 64 * 1024 * 1024; // 64 MB
+
+  // Size limit for the executor cache as a whole (presumably in bytes, see the default below)
+  public static final String EXECUTOR_CACHE_MAX_TOTAL_SIZE =
+      "spark.sql.iceberg.executor-cache.max-total-size";
+  public static final long EXECUTOR_CACHE_MAX_TOTAL_SIZE_DEFAULT = 128 * 1024 * 1024; // 128 MB
 }
diff --git 
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/BaseReader.java
 
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/BaseReader.java
index 4fb838202c..c2b3e7c2dc 100644
--- 
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/BaseReader.java
+++ 
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/BaseReader.java
@@ -25,6 +25,8 @@ import java.nio.ByteBuffer;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.function.Function;
+import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import org.apache.avro.generic.GenericData;
@@ -40,7 +42,9 @@ import org.apache.iceberg.Schema;
 import org.apache.iceberg.StructLike;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.data.BaseDeleteLoader;
 import org.apache.iceberg.data.DeleteFilter;
+import org.apache.iceberg.data.DeleteLoader;
 import org.apache.iceberg.deletes.DeleteCounter;
 import org.apache.iceberg.encryption.EncryptedFiles;
 import org.apache.iceberg.encryption.EncryptedInputFile;
@@ -50,6 +54,7 @@ import org.apache.iceberg.mapping.NameMapping;
 import org.apache.iceberg.mapping.NameMappingParser;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.spark.SparkExecutorCache;
 import org.apache.iceberg.spark.SparkSchemaUtil;
 import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.Types.NestedField;
@@ -279,5 +284,29 @@ abstract class BaseReader<T, TaskT extends ScanTask> 
implements Closeable {
         counter().increment();
       }
     }
+
+    /** Creates a {@link CachingDeleteLoader} that resolves delete files via {@code loadInputFile}. */
+    @Override
+    protected DeleteLoader newDeleteLoader() {
+      return new CachingDeleteLoader(this::loadInputFile);
+    }
+
+    /**
+     * A delete loader that routes cacheable values through the {@link SparkExecutorCache}.
+     *
+     * <p>The cache reference is obtained once, at construction time, and is treated as nullable
+     * (see the null check in {@code canCache}).
+     */
+    private class CachingDeleteLoader extends BaseDeleteLoader {
+      private final SparkExecutorCache cache;
+
+      CachingDeleteLoader(Function<DeleteFile, InputFile> loadInputFile) {
+        super(loadInputFile);
+        this.cache = SparkExecutorCache.getOrCreate();
+      }
+
+      /** Returns true only if a cache exists and {@code size} is below its max entry size. */
+      @Override
+      protected boolean canCache(long size) {
+        return cache != null && size < cache.maxEntrySize();
+      }
+
+      /**
+       * Fetches the value from the executor cache, using the table name as the cache group.
+       *
+       * <p>NOTE(review): {@code cache} is dereferenced without a null check; presumably the parent
+       * loader calls this only after {@code canCache} returned true. Confirm in BaseDeleteLoader.
+       */
+      @Override
+      protected <V> V getOrLoad(String key, Supplier<V> valueSupplier, long valueSize) {
+        return cache.getOrLoad(table().name(), key, valueSupplier, valueSize);
+      }
+    }
   }
 }
diff --git 
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SerializableTableWithSize.java
 
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SerializableTableWithSize.java
index 65df29051c..f6913fb9d0 100644
--- 
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SerializableTableWithSize.java
+++ 
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SerializableTableWithSize.java
@@ -21,6 +21,7 @@ package org.apache.iceberg.spark.source;
 import org.apache.iceberg.BaseMetadataTable;
 import org.apache.iceberg.SerializableTable;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.spark.SparkExecutorCache;
 import org.apache.spark.util.KnownSizeEstimation;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -67,6 +68,7 @@ public class SerializableTableWithSize extends 
SerializableTable
       LOG.info("Releasing resources");
       io().close();
     }
+    invalidateCache(name());
   }
 
   public static class SerializableMetadataTableWithSize extends 
SerializableMetadataTable
@@ -93,6 +95,14 @@ public class SerializableTableWithSize extends 
SerializableTable
         LOG.info("Releasing resources");
         io().close();
       }
+      invalidateCache(name());
+    }
+  }
+
+  /**
+   * Asks the executor cache to invalidate the given name (callers pass the table name).
+   *
+   * <p>Does nothing when {@code SparkExecutorCache.get()} returns null.
+   */
+  private static void invalidateCache(String name) {
+    SparkExecutorCache cache = SparkExecutorCache.get();
+    if (cache != null) {
+      cache.invalidate(name);
     }
   }
 }
diff --git 
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/Employee.java 
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/Employee.java
new file mode 100644
index 0000000000..9c57936d98
--- /dev/null
+++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/Employee.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark;
+
+import java.util.Objects;
+
+/** A mutable bean with an id and a department, used as input rows in tests. */
+public class Employee {
+  private Integer id;
+  private String dep;
+
+  /** Creates an employee with both properties unset. */
+  public Employee() {}
+
+  /** Creates an employee with the given id and department. */
+  public Employee(Integer id, String dep) {
+    this.id = id;
+    this.dep = dep;
+  }
+
+  public Integer getId() {
+    return id;
+  }
+
+  public void setId(Integer id) {
+    this.id = id;
+  }
+
+  public String getDep() {
+    return dep;
+  }
+
+  public void setDep(String dep) {
+    this.dep = dep;
+  }
+
+  /** Two employees are equal when they have the same class, id and department. */
+  @Override
+  public boolean equals(Object other) {
+    if (other == this) {
+      return true;
+    }
+
+    if (other == null || other.getClass() != getClass()) {
+      return false;
+    }
+
+    Employee that = (Employee) other;
+    return Objects.equals(that.id, id) && Objects.equals(that.dep, dep);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(id, dep);
+  }
+}
diff --git 
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/TestSparkExecutorCache.java
 
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/TestSparkExecutorCache.java
new file mode 100644
index 0000000000..35dfb55d5b
--- /dev/null
+++ 
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/TestSparkExecutorCache.java
@@ -0,0 +1,444 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark;
+
+import static org.apache.iceberg.RowLevelOperationMode.COPY_ON_WRITE;
+import static org.apache.iceberg.RowLevelOperationMode.MERGE_ON_READ;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import java.io.File;
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.ContentScanTask;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.RowLevelOperationMode;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.data.FileHelpers;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.io.SeekableInputStream;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.spark.SparkExecutorCache.CacheValue;
+import org.apache.iceberg.spark.SparkExecutorCache.Conf;
+import org.apache.iceberg.util.CharSequenceSet;
+import org.apache.iceberg.util.Pair;
+import org.apache.spark.SparkEnv;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.apache.spark.storage.memory.MemoryStore;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+@RunWith(Parameterized.class)
+public class TestSparkExecutorCache extends SparkTestBaseWithCatalog {
+
+  /** Supplies a single Hive catalog setup that reads files through {@link CustomFileIO}. */
+  @Parameters(name = "catalogName = {0}, implementation = {1}, config = {2}")
+  public static Object[][] parameters() {
+    Map<String, String> hiveCatalogConfig =
+        ImmutableMap.of(
+            "type",
+            "hive",
+            CatalogProperties.FILE_IO_IMPL,
+            CustomFileIO.class.getName(),
+            "default-namespace",
+            "default");
+
+    return new Object[][] {
+      {"testhive", SparkCatalog.class.getName(), hiveCatalogConfig},
+    };
+  }
+
+  private static final String UPDATES_VIEW_NAME = "updates";
+  private static final AtomicInteger JOB_COUNTER = new AtomicInteger();
+  private static final Map<String, CustomInputFile> INPUT_FILES =
+      Collections.synchronizedMap(Maps.newHashMap());
+
+  private String targetTableName;
+  private TableIdentifier targetTableIdent;
+
+  /** Receives one row of {@link #parameters()} and forwards it to the base class. */
+  public TestSparkExecutorCache(
+      String catalogName, String implementation, Map<String, String> config) {
+    super(catalogName, implementation, config);
+  }
+
+  /** Picks a fresh target table name for each test by bumping the shared job counter. */
+  @Before
+  public void configureTargetTableName() {
+    String name = "target_exec_cache_" + JOB_COUNTER.incrementAndGet();
+    this.targetTableName = tableName(name);
+    this.targetTableIdent = TableIdentifier.of(Namespace.of("default"), name);
+  }
+
+  /** Drops the tables used by the test and forgets all tracked input files. */
+  @After
+  public void releaseResources() {
+    sql("DROP TABLE IF EXISTS %s", targetTableName);
+    // NOTE(review): nothing in this class creates UPDATES_VIEW_NAME; the drop looks defensive
+    sql("DROP TABLE IF EXISTS %s", UPDATES_VIEW_NAME);
+    INPUT_FILES.clear();
+  }
+
+  /** A size above the int range must be reported as a weight of {@link Integer#MAX_VALUE}. */
+  @Test
+  public void testCacheValueWeightOverflow() {
+    long oversized = Integer.MAX_VALUE + 1L;
+    CacheValue cacheValue = new CacheValue("v", oversized);
+    assertThat(cacheValue.weight()).isEqualTo(Integer.MAX_VALUE);
+  }
+
+  /** The enabled flag must follow the session conf value. */
+  @Test
+  public void testCacheEnabledConfig() {
+    withSQLConf(
+        ImmutableMap.of(SparkSQLProperties.EXECUTOR_CACHE_ENABLED, "true"),
+        () -> assertThat(new Conf().cacheEnabled()).isTrue());
+
+    withSQLConf(
+        ImmutableMap.of(SparkSQLProperties.EXECUTOR_CACHE_ENABLED, "false"),
+        () -> assertThat(new Conf().cacheEnabled()).isFalse());
+  }
+
+  /** The timeout must be parsed from duration strings with second and minute units. */
+  @Test
+  public void testTimeoutConfig() {
+    withSQLConf(
+        ImmutableMap.of(SparkSQLProperties.EXECUTOR_CACHE_TIMEOUT, "10s"),
+        () -> assertThat(new Conf().timeout()).hasSeconds(10));
+
+    withSQLConf(
+        ImmutableMap.of(SparkSQLProperties.EXECUTOR_CACHE_TIMEOUT, "2m"),
+        () -> assertThat(new Conf().timeout()).hasMinutes(2));
+  }
+
+  /** The max entry size must follow the session conf value. */
+  @Test
+  public void testMaxEntrySizeConfig() {
+    withSQLConf(
+        ImmutableMap.of(SparkSQLProperties.EXECUTOR_CACHE_MAX_ENTRY_SIZE, "128"),
+        () -> assertThat(new Conf().maxEntrySize()).isEqualTo(128L));
+  }
+
+  /** The max total size must follow the session conf value. */
+  @Test
+  public void testMaxTotalSizeConfig() {
+    withSQLConf(
+        ImmutableMap.of(SparkSQLProperties.EXECUTOR_CACHE_MAX_TOTAL_SIZE, "512"),
+        () -> assertThat(new Conf().maxTotalSize()).isEqualTo(512L));
+  }
+
+  /**
+   * Verifies that concurrent readers trigger at most one load per group and key and that
+   * invalidating a group removes all of its keys.
+   *
+   * <p>Failures raised inside the worker tasks are collected and checked on the test thread.
+   * Without that, they would only be stored in the futures returned by {@code submit}, which this
+   * test never inspects, so a duplicate load could not fail the test.
+   */
+  @Test
+  public void testConcurrentAccess() throws InterruptedException {
+    SparkExecutorCache cache = SparkExecutorCache.getOrCreate();
+
+    String table1 = "table1";
+    String table2 = "table2";
+
+    Set<String> loadedInternalKeys = Sets.newHashSet();
+    List<Throwable> workerFailures = Collections.synchronizedList(Lists.newArrayList());
+
+    String key1 = "key1";
+    String key2 = "key2";
+
+    long valueSize = 100L;
+
+    int threadCount = 10;
+    ExecutorService executorService = Executors.newFixedThreadPool(threadCount);
+
+    for (int threadNumber = 0; threadNumber < threadCount; threadNumber++) {
+      String group = threadNumber % 2 == 0 ? table1 : table2;
+      executorService.submit(
+          () -> {
+            try {
+              for (int batch = 0; batch < 3; batch++) {
+                cache.getOrLoad(
+                    group,
+                    key1,
+                    () -> {
+                      String internalKey = toInternalKey(group, key1);
+                      synchronized (loadedInternalKeys) {
+                        // verify only one load was done for this key
+                        assertThat(loadedInternalKeys.contains(internalKey)).isFalse();
+                        loadedInternalKeys.add(internalKey);
+                      }
+                      return "value1";
+                    },
+                    valueSize);
+
+                cache.getOrLoad(
+                    group,
+                    key2,
+                    () -> {
+                      String internalKey = toInternalKey(group, key2);
+                      synchronized (loadedInternalKeys) {
+                        // verify only one load was done for this key
+                        assertThat(loadedInternalKeys.contains(internalKey)).isFalse();
+                        loadedInternalKeys.add(internalKey);
+                      }
+                      return "value2";
+                    },
+                    valueSize);
+              }
+            } catch (Throwable t) {
+              // keep the failure (including assertion errors) so the test thread can report it
+              workerFailures.add(t);
+            }
+          });
+    }
+
+    executorService.shutdown();
+    assertThat(executorService.awaitTermination(1, TimeUnit.MINUTES)).isTrue();
+
+    // surface anything that went wrong on the worker threads
+    assertThat(workerFailures).as("Failures in worker threads").isEmpty();
+
+    cache.invalidate(table1);
+    cache.invalidate(table2);
+
+    // all keys must be invalidated
+    Cache<String, ?> state = fetchInternalCacheState();
+    Set<String> liveKeys = state.asMap().keySet();
+    assertThat(liveKeys).noneMatch(key -> key.startsWith(table1) || key.startsWith(table2));
+  }
+
+  /** Runs the delete check with the table's delete mode set to copy-on-write. */
+  @Test
+  public void testCopyOnWriteDelete() throws Exception {
+    checkDelete(COPY_ON_WRITE);
+  }
+
+  /** Runs the delete check with the table's delete mode set to merge-on-read. */
+  @Test
+  public void testMergeOnReadDelete() throws Exception {
+    checkDelete(MERGE_ON_READ);
+  }
+
+  /**
+   * Runs a DELETE against the table prepared by {@code createAndInitTable} and checks both how
+   * often each delete file was opened and which rows remain.
+   *
+   * @param mode the row-level operation mode stored under {@link TableProperties#DELETE_MODE}
+   */
+  private void checkDelete(RowLevelOperationMode mode) throws Exception {
+    List<DeleteFile> deleteFiles = createAndInitTable(TableProperties.DELETE_MODE, mode);
+
+    sql("DELETE FROM %s WHERE id = 1 OR id = 4", targetTableName);
+
+    // there are 2 data files and 2 delete files that apply to both of them
+    // in CoW, the target table will be scanned 2 times (main query + runtime filter)
+    // the runtime filter may invalidate the cache so check at least some requests were hits
+    // in MoR, the target table will be scanned only once
+    // so each delete file must be opened once
+    int maxRequestCount = mode == COPY_ON_WRITE ? 3 : 1;
+    assertThat(deleteFiles).allMatch(deleteFile -> streamCount(deleteFile) <= maxRequestCount);
+
+    // verify the final set of records is correct
+    assertEquals(
+        "Should have expected rows",
+        ImmutableList.of(),
+        sql("SELECT * FROM %s ORDER BY id ASC", targetTableName));
+  }
+
+  /** Returns how many streams were opened for the given delete file via {@link CustomFileIO}. */
+  private int streamCount(DeleteFile deleteFile) {
+    String location = deleteFile.path().toString();
+    return INPUT_FILES.get(location).streamCount();
+  }
+
+  /**
+   * Creates the target table, appends two batches of rows and commits one position delete file
+   * and one equality delete file.
+   *
+   * @param operation the table property that receives the mode name
+   * @param mode the row-level operation mode to configure
+   * @return the committed position and equality delete files, in that order
+   */
+  private List<DeleteFile> createAndInitTable(String operation, RowLevelOperationMode mode)
+      throws Exception {
+    sql(
+        "CREATE TABLE %s (id INT, dep STRING) "
+            + "USING iceberg "
+            + "TBLPROPERTIES ('%s' '%s', '%s' '%s', '%s' '%s')",
+        targetTableName,
+        TableProperties.WRITE_METADATA_LOCATION,
+        temp.toString().replaceFirst("file:", ""),
+        TableProperties.WRITE_DATA_LOCATION,
+        temp.toString().replaceFirst("file:", ""),
+        operation,
+        mode.modeName());
+
+    // each append is coalesced to one partition before it is written
+    append(targetTableName, new Employee(0, "hr"), new Employee(1, "hr"), new Employee(2, "hr"));
+    append(targetTableName, new Employee(3, "hr"), new Employee(4, "hr"), new Employee(5, "hr"));
+
+    Table table = validationCatalog.loadTable(targetTableIdent);
+
+    // delete position 0 in every data file
+    List<Pair<CharSequence, Long>> posDeletes =
+        dataFiles(table).stream()
+            .map(dataFile -> Pair.of(dataFile.path(), 0L))
+            .collect(Collectors.toList());
+    Pair<DeleteFile, CharSequenceSet> posDeleteResult = writePosDeletes(table, posDeletes);
+    DeleteFile posDeleteFile = posDeleteResult.first();
+    CharSequenceSet referencedDataFiles = posDeleteResult.second();
+
+    // delete ids 2 and 5 by value
+    DeleteFile eqDeleteFile = writeEqDeletes(table, "id", 2, 5);
+
+    table
+        .newRowDelta()
+        .validateFromSnapshot(table.currentSnapshot().snapshotId())
+        .validateDataFilesExist(referencedDataFiles)
+        .addDeletes(posDeleteFile)
+        .addDeletes(eqDeleteFile)
+        .commit();
+
+    sql("REFRESH TABLE %s", targetTableName);
+
+    // invalidate the memory store to destroy all currently live table broadcasts
+    SparkEnv sparkEnv = SparkEnv.get();
+    MemoryStore memoryStore = sparkEnv.blockManager().memoryStore();
+    memoryStore.clear();
+
+    return ImmutableList.of(posDeleteFile, eqDeleteFile);
+  }
+
+  /** Writes an equality delete file with one record per value of the given column. */
+  private DeleteFile writeEqDeletes(Table table, String col, Object... values) throws IOException {
+    Schema deleteSchema = table.schema().select(col);
+    Record template = GenericRecord.create(deleteSchema);
+
+    List<Record> deletes =
+        Arrays.stream(values)
+            .map(value -> template.copy(col, value))
+            .collect(Collectors.toList());
+
+    OutputFile out = Files.localOutput(temp.newFile("eq-deletes-" + UUID.randomUUID()));
+    return FileHelpers.writeDeleteFile(table, out, null, deletes, deleteSchema);
+  }
+
+  /**
+   * Writes the given (data file path, position) pairs into a new position delete file.
+   *
+   * @return the result of {@code FileHelpers.writeDeleteFile}: the delete file and a set of paths
+   */
+  private Pair<DeleteFile, CharSequenceSet> writePosDeletes(
+      Table table, List<Pair<CharSequence, Long>> deletes) throws IOException {
+    OutputFile out = Files.localOutput(temp.newFile("pos-deletes-" + UUID.randomUUID()));
+    return FileHelpers.writeDeleteFile(table, out, null, deletes);
+  }
+
+  /** Appends the given employees to the target table from a single-partition DataFrame. */
+  private void append(String target, Employee... employees) throws NoSuchTableException {
+    Dataset<Row> inputDF = spark.createDataFrame(Arrays.asList(employees), Employee.class);
+    inputDF.coalesce(1).writeTo(target).append();
+  }
+
+  /**
+   * Plans a scan of the table and returns the file of every planned task.
+   *
+   * @throws RuntimeIOException if closing the planned tasks fails with an {@link IOException}
+   */
+  private Collection<DataFile> dataFiles(Table table) {
+    try (CloseableIterable<FileScanTask> tasks = table.newScan().planFiles()) {
+      // copy eagerly so the result stays usable after the iterable is closed
+      return ImmutableList.copyOf(Iterables.transform(tasks, ContentScanTask::file));
+    } catch (IOException e) {
+      throw new RuntimeIOException(e);
+    }
+  }
+
+  /**
+   * Reads the private {@code state} field of the current {@link SparkExecutorCache} via
+   * reflection so that tests can inspect the live keys.
+   *
+   * @throws RuntimeException wrapping any failure to look up or read the field
+   */
+  @SuppressWarnings("unchecked")
+  private static Cache<String, ?> fetchInternalCacheState() {
+    try {
+      Field stateField = SparkExecutorCache.class.getDeclaredField("state");
+      stateField.setAccessible(true);
+      SparkExecutorCache cache = SparkExecutorCache.get();
+      return (Cache<String, ?>) stateField.get(cache);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Joins a group and a key with an underscore.
+   *
+   * <p>NOTE(review): presumably mirrors the key format used inside SparkExecutorCache; in this
+   * test it only needs to be unique per (group, key) pair. Confirm against the cache.
+   */
+  private static String toInternalKey(String group, String key) {
+    return group + "_" + key;
+  }
+
+  /**
+   * A local {@link FileIO} that hands out one shared {@link CustomInputFile} per path and
+   * registers it in {@code INPUT_FILES}, so tests can look up how often a file was opened.
+   */
+  public static class CustomFileIO implements FileIO {
+
+    public CustomFileIO() {}
+
+    /** Returns the tracked input file for the path, creating it on first request. */
+    @Override
+    public InputFile newInputFile(String path) {
+      return INPUT_FILES.computeIfAbsent(path, key -> new CustomInputFile(path));
+    }
+
+    @Override
+    public OutputFile newOutputFile(String path) {
+      return Files.localOutput(path);
+    }
+
+    /** Deletes the local file and fails if the deletion did not succeed. */
+    @Override
+    public void deleteFile(String path) {
+      File file = new File(path);
+      if (!file.delete()) {
+        throw new RuntimeIOException("Failed to delete file: " + path);
+      }
+    }
+  }
+
+  /** A local input file that counts how many streams were opened through it. */
+  public static class CustomInputFile implements InputFile {
+    private final InputFile delegate;
+    private final AtomicInteger streamCount;
+
+    public CustomInputFile(String path) {
+      this.delegate = Files.localInput(path);
+      this.streamCount = new AtomicInteger();
+    }
+
+    @Override
+    public long getLength() {
+      return delegate.getLength();
+    }
+
+    /** Increments the stream counter and opens a new stream on the delegate. */
+    @Override
+    public SeekableInputStream newStream() {
+      streamCount.incrementAndGet();
+      return delegate.newStream();
+    }
+
+    /** Returns the number of {@link #newStream()} calls made so far. */
+    public int streamCount() {
+      return streamCount.get();
+    }
+
+    @Override
+    public String location() {
+      return delegate.location();
+    }
+
+    @Override
+    public boolean exists() {
+      return delegate.exists();
+    }
+  }
+}
diff --git 
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/TestSparkWriteConf.java
 
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/TestSparkWriteConf.java
index 63d037bd73..00faecf9a8 100644
--- 
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/TestSparkWriteConf.java
+++ 
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/TestSparkWriteConf.java
@@ -47,6 +47,7 @@ import static 
org.apache.spark.sql.connector.write.RowLevelOperation.Command.UPD
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
+import java.time.Duration;
 import java.util.List;
 import java.util.Map;
 import org.apache.iceberg.DistributionMode;
@@ -77,6 +78,28 @@ public class TestSparkWriteConf extends 
SparkTestBaseWithCatalog {
     sql("DROP TABLE IF EXISTS %s", tableName);
   }
 
+  /** A duration conf set in the session must be parsed for second and minute units. */
+  @Test
+  public void testDurationConf() {
+    Table table = validationCatalog.loadTable(tableIdent);
+    String confName = "spark.sql.iceberg.some-duration-conf";
+
+    withSQLConf(
+        ImmutableMap.of(confName, "10s"),
+        () -> {
+          SparkConfParser parser = new SparkConfParser(spark, table, ImmutableMap.of());
+          Duration duration = parser.durationConf().sessionConf(confName).parseOptional();
+          assertThat(duration).hasSeconds(10);
+        });
+
+    withSQLConf(
+        ImmutableMap.of(confName, "2m"),
+        () -> {
+          SparkConfParser parser = new SparkConfParser(spark, table, ImmutableMap.of());
+          Duration duration = parser.durationConf().sessionConf(confName).parseOptional();
+          assertThat(duration).hasMinutes(2);
+        });
+  }
+
   @Test
   public void testDeleteGranularityDefault() {
     Table table = validationCatalog.loadTable(tableIdent);

Reply via email to