This is an automated email from the ASF dual-hosted git repository.
aokolnychyi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new 65a076deda Spark 3.4: Read deletes in parallel and cache them on
executors (#9603)
65a076deda is described below
commit 65a076dedae2e62008ce64c6094dd519d29ce500
Author: Anton Okolnychyi <[email protected]>
AuthorDate: Fri Feb 2 15:24:55 2024 -0800
Spark 3.4: Read deletes in parallel and cache them on executors (#9603)
This change backports PR #8755 and PR #9583 to Spark 3.4.
---
spark/v3.4/build.gradle | 3 +
.../spark/extensions/TestSparkExecutorCache.java | 366 +++++++++++++++++
.../java/org/apache/iceberg/spark/JavaUtils.java | 76 ++++
.../org/apache/iceberg/spark/SparkConfParser.java | 40 ++
.../apache/iceberg/spark/SparkExecutorCache.java | 228 +++++++++++
.../apache/iceberg/spark/SparkSQLProperties.java | 16 +
.../apache/iceberg/spark/source/BaseReader.java | 29 ++
.../spark/source/SerializableTableWithSize.java | 10 +
.../java/org/apache/iceberg/spark/Employee.java | 66 +++
.../iceberg/spark/TestSparkExecutorCache.java | 444 +++++++++++++++++++++
.../apache/iceberg/spark/TestSparkWriteConf.java | 23 ++
11 files changed, 1301 insertions(+)
diff --git a/spark/v3.4/build.gradle b/spark/v3.4/build.gradle
index 0b1c4df75a..7d81137a6b 100644
--- a/spark/v3.4/build.gradle
+++ b/spark/v3.4/build.gradle
@@ -89,6 +89,8 @@
project(":iceberg-spark:iceberg-spark-${sparkMajorVersion}_${scalaVersion}") {
exclude group: 'com.google.code.findbugs', module: 'jsr305'
}
+ implementation libs.caffeine
+
testImplementation(libs.hadoop2.minicluster) {
exclude group: 'org.apache.avro', module: 'avro'
// to make sure netty libs only come from project(':iceberg-arrow')
@@ -157,6 +159,7 @@
project(":iceberg-spark:iceberg-spark-extensions-${sparkMajorVersion}_${scalaVer
testImplementation project(path: ':iceberg-data')
testImplementation project(path: ':iceberg-parquet')
testImplementation project(path: ':iceberg-hive-metastore')
+ testImplementation project(path: ':iceberg-data', configuration:
'testArtifacts')
testImplementation project(path: ':iceberg-api', configuration:
'testArtifacts')
testImplementation project(path: ':iceberg-core', configuration:
'testArtifacts')
testImplementation project(path: ':iceberg-hive-metastore', configuration:
'testArtifacts')
diff --git
a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestSparkExecutorCache.java
b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestSparkExecutorCache.java
new file mode 100644
index 0000000000..3d995cc4f0
--- /dev/null
+++
b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestSparkExecutorCache.java
@@ -0,0 +1,366 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.extensions;
+
+import static org.apache.iceberg.RowLevelOperationMode.COPY_ON_WRITE;
+import static org.apache.iceberg.RowLevelOperationMode.MERGE_ON_READ;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.ContentScanTask;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.RowLevelOperationMode;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.data.FileHelpers;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.io.SeekableInputStream;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.spark.SparkCatalog;
+import org.apache.iceberg.util.CharSequenceSet;
+import org.apache.iceberg.util.Pair;
+import org.apache.spark.SparkEnv;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.apache.spark.storage.memory.MemoryStore;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runners.Parameterized.Parameters;
+
+public class TestSparkExecutorCache extends SparkExtensionsTestBase {
+
+ @Parameters(name = "catalogName = {0}, implementation = {1}, config = {2}")
+ public static Object[][] parameters() {
+ return new Object[][] {
+ {
+ "testhive",
+ SparkCatalog.class.getName(),
+ ImmutableMap.of(
+ "type",
+ "hive",
+ CatalogProperties.FILE_IO_IMPL,
+ CustomFileIO.class.getName(),
+ "default-namespace",
+ "default")
+ },
+ };
+ }
+
+ private static final String UPDATES_VIEW_NAME = "updates";
+ private static final AtomicInteger JOB_COUNTER = new AtomicInteger();
+ private static final Map<String, CustomInputFile> INPUT_FILES =
+ Collections.synchronizedMap(Maps.newHashMap());
+
+ private String targetTableName;
+ private TableIdentifier targetTableIdent;
+
+ public TestSparkExecutorCache(
+ String catalogName, String implementation, Map<String, String> config) {
+ super(catalogName, implementation, config);
+ }
+
+ @Before
+ public void configureTargetTableName() {
+ String name = "target_exec_cache_" + JOB_COUNTER.incrementAndGet();
+ this.targetTableName = tableName(name);
+ this.targetTableIdent = TableIdentifier.of(Namespace.of("default"), name);
+ }
+
+ @After
+ public void releaseResources() {
+ sql("DROP TABLE IF EXISTS %s", targetTableName);
+ sql("DROP TABLE IF EXISTS %s", UPDATES_VIEW_NAME);
+ INPUT_FILES.clear();
+ }
+
+ @Test
+ public void testCopyOnWriteDelete() throws Exception {
+ checkDelete(COPY_ON_WRITE);
+ }
+
+ @Test
+ public void testMergeOnReadDelete() throws Exception {
+ checkDelete(MERGE_ON_READ);
+ }
+
+ private void checkDelete(RowLevelOperationMode mode) throws Exception {
+ List<DeleteFile> deleteFiles =
createAndInitTable(TableProperties.DELETE_MODE, mode);
+
+ sql("DELETE FROM %s WHERE id = 1 OR id = 4", targetTableName);
+
+ // there are 2 data files and 2 delete files that apply to both of them
+ // in CoW, the target table will be scanned 2 times (main query + runtime
filter)
+ // the runtime filter may invalidate the cache so check at least some
requests were hits
+ // in MoR, the target table will be scanned only once
+ // so each delete file must be opened once
+ int maxRequestCount = mode == COPY_ON_WRITE ? 3 : 1;
+ assertThat(deleteFiles).allMatch(deleteFile -> streamCount(deleteFile) <=
maxRequestCount);
+
+ // verify the final set of records is correct
+ assertEquals(
+ "Should have expected rows",
+ ImmutableList.of(),
+ sql("SELECT * FROM %s ORDER BY id ASC", targetTableName));
+ }
+
+ @Test
+ public void testCopyOnWriteUpdate() throws Exception {
+ checkUpdate(COPY_ON_WRITE);
+ }
+
+ @Test
+ public void testMergeOnReadUpdate() throws Exception {
+ checkUpdate(MERGE_ON_READ);
+ }
+
+ private void checkUpdate(RowLevelOperationMode mode) throws Exception {
+ List<DeleteFile> deleteFiles =
createAndInitTable(TableProperties.UPDATE_MODE, mode);
+
+ Dataset<Integer> updateDS = spark.createDataset(ImmutableList.of(1, 4),
Encoders.INT());
+ updateDS.createOrReplaceTempView(UPDATES_VIEW_NAME);
+
+ sql("UPDATE %s SET id = -1 WHERE id IN (SELECT * FROM %s)",
targetTableName, UPDATES_VIEW_NAME);
+
+ // there are 2 data files and 2 delete files that apply to both of them
+ // in CoW, the target table will be scanned 3 times (2 in main query +
runtime filter)
+ // the runtime filter may invalidate the cache so check at least some
requests were hits
+ // in MoR, the target table will be scanned only once
+ // so each delete file must be opened once
+ int maxRequestCount = mode == COPY_ON_WRITE ? 5 : 1;
+ assertThat(deleteFiles).allMatch(deleteFile -> streamCount(deleteFile) <=
maxRequestCount);
+
+ // verify the final set of records is correct
+ assertEquals(
+ "Should have expected rows",
+ ImmutableList.of(row(-1, "hr"), row(-1, "hr")),
+ sql("SELECT * FROM %s ORDER BY id ASC", targetTableName));
+ }
+
+ @Test
+ public void testCopyOnWriteMerge() throws Exception {
+ checkMerge(COPY_ON_WRITE);
+ }
+
+ @Test
+ public void testMergeOnReadMerge() throws Exception {
+ checkMerge(MERGE_ON_READ);
+ }
+
+ private void checkMerge(RowLevelOperationMode mode) throws Exception {
+ List<DeleteFile> deleteFiles =
createAndInitTable(TableProperties.MERGE_MODE, mode);
+
+ Dataset<Integer> updateDS = spark.createDataset(ImmutableList.of(1, 4),
Encoders.INT());
+ updateDS.createOrReplaceTempView(UPDATES_VIEW_NAME);
+
+ sql(
+ "MERGE INTO %s t USING %s s "
+ + "ON t.id == s.value "
+ + "WHEN MATCHED THEN "
+ + " UPDATE SET id = 100 "
+ + "WHEN NOT MATCHED THEN "
+ + " INSERT (id, dep) VALUES (-1, 'unknown')",
+ targetTableName, UPDATES_VIEW_NAME);
+
+ // there are 2 data files and 2 delete files that apply to both of them
+ // in CoW, the target table will be scanned 2 times (main query + runtime
filter)
+ // the runtime filter may invalidate the cache so check at least some
requests were hits
+ // in MoR, the target table will be scanned only once
+ // so each delete file must be opened once
+ int maxRequestCount = mode == COPY_ON_WRITE ? 3 : 1;
+ assertThat(deleteFiles).allMatch(deleteFile -> streamCount(deleteFile) <=
maxRequestCount);
+
+ // verify the final set of records is correct
+ assertEquals(
+ "Should have expected rows",
+ ImmutableList.of(row(100, "hr"), row(100, "hr")),
+ sql("SELECT * FROM %s ORDER BY id ASC", targetTableName));
+ }
+
+ private int streamCount(DeleteFile deleteFile) {
+ CustomInputFile inputFile = INPUT_FILES.get(deleteFile.path().toString());
+ return inputFile.streamCount();
+ }
+
+ private List<DeleteFile> createAndInitTable(String operation,
RowLevelOperationMode mode)
+ throws Exception {
+ sql(
+ "CREATE TABLE %s (id INT, dep STRING) "
+ + "USING iceberg "
+ + "TBLPROPERTIES ('%s' '%s', '%s' '%s', '%s' '%s')",
+ targetTableName,
+ TableProperties.WRITE_METADATA_LOCATION,
+ temp.toString().replaceFirst("file:", ""),
+ TableProperties.WRITE_DATA_LOCATION,
+ temp.toString().replaceFirst("file:", ""),
+ operation,
+ mode.modeName());
+
+ append(targetTableName, new Employee(0, "hr"), new Employee(1, "hr"), new
Employee(2, "hr"));
+ append(targetTableName, new Employee(3, "hr"), new Employee(4, "hr"), new
Employee(5, "hr"));
+
+ Table table = validationCatalog.loadTable(targetTableIdent);
+
+ List<Pair<CharSequence, Long>> posDeletes =
+ dataFiles(table).stream()
+ .map(dataFile -> Pair.of(dataFile.path(), 0L))
+ .collect(Collectors.toList());
+ Pair<DeleteFile, CharSequenceSet> posDeleteResult = writePosDeletes(table,
posDeletes);
+ DeleteFile posDeleteFile = posDeleteResult.first();
+ CharSequenceSet referencedDataFiles = posDeleteResult.second();
+
+ DeleteFile eqDeleteFile = writeEqDeletes(table, "id", 2, 5);
+
+ table
+ .newRowDelta()
+ .validateFromSnapshot(table.currentSnapshot().snapshotId())
+ .validateDataFilesExist(referencedDataFiles)
+ .addDeletes(posDeleteFile)
+ .addDeletes(eqDeleteFile)
+ .commit();
+
+ sql("REFRESH TABLE %s", targetTableName);
+
+ // invalidate the memory store to destroy all currently live table
broadcasts
+ SparkEnv sparkEnv = SparkEnv.get();
+ MemoryStore memoryStore = sparkEnv.blockManager().memoryStore();
+ memoryStore.clear();
+
+ return ImmutableList.of(posDeleteFile, eqDeleteFile);
+ }
+
+ private DeleteFile writeEqDeletes(Table table, String col, Object... values)
throws IOException {
+ Schema deleteSchema = table.schema().select(col);
+
+ Record delete = GenericRecord.create(deleteSchema);
+ List<Record> deletes = Lists.newArrayList();
+ for (Object value : values) {
+ deletes.add(delete.copy(col, value));
+ }
+
+ OutputFile out = Files.localOutput(temp.newFile("eq-deletes-" +
UUID.randomUUID()));
+ return FileHelpers.writeDeleteFile(table, out, null, deletes,
deleteSchema);
+ }
+
+ private Pair<DeleteFile, CharSequenceSet> writePosDeletes(
+ Table table, List<Pair<CharSequence, Long>> deletes) throws IOException {
+ OutputFile out = Files.localOutput(temp.newFile("pos-deletes-" +
UUID.randomUUID()));
+ return FileHelpers.writeDeleteFile(table, out, null, deletes);
+ }
+
+ private void append(String target, Employee... employees) throws
NoSuchTableException {
+ List<Employee> input = Arrays.asList(employees);
+ Dataset<Row> inputDF = spark.createDataFrame(input, Employee.class);
+ inputDF.coalesce(1).writeTo(target).append();
+ }
+
+ private Collection<DataFile> dataFiles(Table table) {
+ try (CloseableIterable<FileScanTask> tasks = table.newScan().planFiles()) {
+ return ImmutableList.copyOf(Iterables.transform(tasks,
ContentScanTask::file));
+ } catch (IOException e) {
+ throw new RuntimeIOException(e);
+ }
+ }
+
+ public static class CustomFileIO implements FileIO {
+
+ public CustomFileIO() {}
+
+ @Override
+ public InputFile newInputFile(String path) {
+ return INPUT_FILES.computeIfAbsent(path, key -> new
CustomInputFile(path));
+ }
+
+ @Override
+ public OutputFile newOutputFile(String path) {
+ return Files.localOutput(path);
+ }
+
+ @Override
+ public void deleteFile(String path) {
+ File file = new File(path);
+ if (!file.delete()) {
+ throw new RuntimeIOException("Failed to delete file: " + path);
+ }
+ }
+ }
+
+ public static class CustomInputFile implements InputFile {
+ private final InputFile delegate;
+ private final AtomicInteger streamCount;
+
+ public CustomInputFile(String path) {
+ this.delegate = Files.localInput(path);
+ this.streamCount = new AtomicInteger();
+ }
+
+ @Override
+ public long getLength() {
+ return delegate.getLength();
+ }
+
+ @Override
+ public SeekableInputStream newStream() {
+ streamCount.incrementAndGet();
+ return delegate.newStream();
+ }
+
+ public int streamCount() {
+ return streamCount.get();
+ }
+
+ @Override
+ public String location() {
+ return delegate.location();
+ }
+
+ @Override
+ public boolean exists() {
+ return delegate.exists();
+ }
+ }
+}
diff --git
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/JavaUtils.java
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/JavaUtils.java
new file mode 100644
index 0000000000..2fa1289c85
--- /dev/null
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/JavaUtils.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark;
+
+import java.util.Locale;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+
+// copied from internal JavaUtils in Spark, not accessible in 3.4
+class JavaUtils {
+
+  // compiled once and reused: Pattern instances are immutable and safe to share
+  private static final Pattern TIME_PATTERN = Pattern.compile("(-?[0-9]+)([a-z]+)?");
+
+  private static final Map<String, TimeUnit> TIME_SUFFIXES =
+      ImmutableMap.<String, TimeUnit>builder()
+          .put("us", TimeUnit.MICROSECONDS)
+          .put("ms", TimeUnit.MILLISECONDS)
+          .put("s", TimeUnit.SECONDS)
+          .put("m", TimeUnit.MINUTES)
+          .put("min", TimeUnit.MINUTES)
+          .put("h", TimeUnit.HOURS)
+          .put("d", TimeUnit.DAYS)
+          .build();
+
+  private JavaUtils() {}
+
+  /**
+   * Parses a time string and returns its value in seconds.
+   *
+   * <p>A value without a suffix is interpreted as seconds.
+   *
+   * @param str a time string such as {@code 50s}, {@code 100ms} or {@code 10min}
+   * @return the value converted to seconds
+   * @throws NumberFormatException if the string cannot be parsed or has an unknown suffix
+   */
+  public static long timeStringAsSec(String str) {
+    return timeStringAs(str, TimeUnit.SECONDS);
+  }
+
+  /**
+   * Parses a time string and converts it to the given unit.
+   *
+   * <p>The input is lower-cased and trimmed before matching. It must be an optionally negative
+   * integer followed by an optional suffix (us, ms, s, m, min, h, d). If no suffix is present, the
+   * number is interpreted in {@code unit}.
+   *
+   * @param str a time string such as {@code 50s}, {@code 100ms} or {@code 250us}
+   * @param unit the unit of the returned value, also the default unit for suffix-less input
+   * @return the value converted to {@code unit} via {@link TimeUnit#convert(long, TimeUnit)}
+   * @throws NumberFormatException if the string cannot be parsed or has an unknown suffix
+   */
+  public static long timeStringAs(String str, TimeUnit unit) {
+    String lower = str.toLowerCase(Locale.ROOT).trim();
+
+    try {
+      Matcher matcher = TIME_PATTERN.matcher(lower);
+      if (!matcher.matches()) {
+        throw new NumberFormatException("Failed to parse time string: " + str);
+      }
+
+      long val = Long.parseLong(matcher.group(1));
+      String suffix = matcher.group(2);
+
+      // Check for invalid suffixes
+      if (suffix != null && !TIME_SUFFIXES.containsKey(suffix)) {
+        throw new NumberFormatException("Invalid suffix: \"" + suffix + "\"");
+      }
+
+      // If suffix is valid use that, otherwise none was provided and use the default passed
+      return unit.convert(val, suffix != null ? TIME_SUFFIXES.get(suffix) : unit);
+    } catch (NumberFormatException e) {
+      String timeError =
+          "Time must be specified as seconds (s), "
+              + "milliseconds (ms), microseconds (us), minutes (m or min), hour (h), or day (d). "
+              + "E.g. 50s, 100ms, or 250us.";
+
+      // NumberFormatException has no cause-accepting constructor, so attach the cause explicitly
+      NumberFormatException wrapped = new NumberFormatException(timeError + "\n" + e.getMessage());
+      wrapped.initCause(e);
+      throw wrapped;
+    }
+  }
+}
diff --git
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkConfParser.java
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkConfParser.java
index eaa301524c..d666c9c35b 100644
---
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkConfParser.java
+++
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkConfParser.java
@@ -18,15 +18,18 @@
*/
package org.apache.iceberg.spark;
+import java.time.Duration;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.function.Function;
import org.apache.iceberg.Table;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.spark.sql.RuntimeConfig;
import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.internal.SQLConf;
class SparkConfParser {
@@ -34,6 +37,12 @@ class SparkConfParser {
private final RuntimeConfig sessionConf;
private final Map<String, String> options;
+ SparkConfParser() {
+ this.properties = ImmutableMap.of();
+ this.sessionConf = new RuntimeConfig(SQLConf.get());
+ this.options = ImmutableMap.of();
+ }
+
SparkConfParser(SparkSession spark, Table table, Map<String, String>
options) {
this.properties = table.properties();
this.sessionConf = spark.conf();
@@ -56,6 +65,10 @@ class SparkConfParser {
return new StringConfParser();
}
+ public DurationConfParser durationConf() {
+ return new DurationConfParser();
+ }
+
class BooleanConfParser extends ConfParser<BooleanConfParser, Boolean> {
private Boolean defaultValue;
private boolean negate = false;
@@ -156,6 +169,33 @@ class SparkConfParser {
}
}
+ class DurationConfParser extends ConfParser<DurationConfParser, Duration> {
+ private Duration defaultValue;
+
+ @Override
+ protected DurationConfParser self() {
+ return this;
+ }
+
+ public DurationConfParser defaultValue(Duration value) {
+ this.defaultValue = value;
+ return self();
+ }
+
+ public Duration parse() {
+ Preconditions.checkArgument(defaultValue != null, "Default value cannot
be null");
+ return parse(this::toDuration, defaultValue);
+ }
+
+ public Duration parseOptional() {
+ return parse(this::toDuration, defaultValue);
+ }
+
+ private Duration toDuration(String time) {
+ return Duration.ofSeconds(JavaUtils.timeStringAsSec(time));
+ }
+ }
+
abstract class ConfParser<ThisT, T> {
private final List<String> optionNames = Lists.newArrayList();
private String sessionConfName;
diff --git
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkExecutorCache.java
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkExecutorCache.java
new file mode 100644
index 0000000000..6490d6678b
--- /dev/null
+++
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkExecutorCache.java
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import java.time.Duration;
+import java.util.List;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An executor cache for reducing the computation and IO overhead in tasks.
+ *
+ * <p>The cache is configured and controlled through Spark SQL properties. It supports both limits
+ * on the total cache size and maximum size for individual entries. Additionally, it implements
+ * automatic eviction of entries after a specified duration of inactivity. The cache will respect
+ * the SQL configuration valid at the time of initialization. All subsequent changes to the
+ * configuration will have no effect.
+ *
+ * <p>The cache is accessed and populated via {@link #getOrLoad(String, String, Supplier, long)}. If
+ * the value is not present in the cache, it is computed using the provided supplier and stored in
+ * the cache, subject to the defined size constraints. When a key is added, it must be associated
+ * with a particular group ID. Once the group is no longer needed, it is recommended to explicitly
+ * invalidate its state by calling {@link #invalidate(String)} instead of relying on automatic
+ * eviction.
+ *
+ * <p>Note that this class employs the singleton pattern to ensure only one cache exists per JVM.
+ */
+public class SparkExecutorCache {
+
+  private static final Logger LOG = LoggerFactory.getLogger(SparkExecutorCache.class);
+
+  // separates the group ID from the key in internal cache keys
+  private static final String GROUP_DELIMITER = "_";
+
+  private static volatile SparkExecutorCache instance = null;
+
+  private final Duration timeout;
+  private final long maxEntrySize;
+  private final long maxTotalSize;
+  // created lazily on first load, see state()
+  private volatile Cache<String, CacheValue> state;
+
+  private SparkExecutorCache(Conf conf) {
+    this.timeout = conf.timeout();
+    this.maxEntrySize = conf.maxEntrySize();
+    this.maxTotalSize = conf.maxTotalSize();
+  }
+
+  /**
+   * Returns the cache if created or creates and returns it.
+   *
+   * <p>Note this method returns null if caching is disabled.
+   */
+  public static SparkExecutorCache getOrCreate() {
+    if (instance == null) {
+      Conf conf = new Conf();
+      if (conf.cacheEnabled()) {
+        synchronized (SparkExecutorCache.class) {
+          // re-check under the lock so that only one instance is ever created
+          if (instance == null) {
+            SparkExecutorCache.instance = new SparkExecutorCache(conf);
+          }
+        }
+      }
+    }
+
+    return instance;
+  }
+
+  /** Returns the cache if already created or null otherwise. */
+  public static SparkExecutorCache get() {
+    return instance;
+  }
+
+  /** Returns the max entry size in bytes that will be considered for caching. */
+  public long maxEntrySize() {
+    return maxEntrySize;
+  }
+
+  /**
+   * Gets the cached value for the key or populates the cache with a new mapping.
+   *
+   * <p>Values whose size exceeds {@link #maxEntrySize()} are computed but never cached.
+   *
+   * @param group a group ID
+   * @param key a cache key
+   * @param valueSupplier a supplier to compute the value
+   * @param valueSize an estimated memory size of the value in bytes
+   * @return the cached or computed value
+   */
+  public <V> V getOrLoad(String group, String key, Supplier<V> valueSupplier, long valueSize) {
+    if (valueSize > maxEntrySize) {
+      LOG.debug("{} exceeds max entry size: {} > {}", key, valueSize, maxEntrySize);
+      return valueSupplier.get();
+    }
+
+    String internalKey = group + GROUP_DELIMITER + key;
+    CacheValue value = state().get(internalKey, loadFunc(valueSupplier, valueSize));
+    Preconditions.checkNotNull(value, "Loaded value must not be null");
+    return value.get();
+  }
+
+  // wraps the supplier into a loader that logs the load time and records the entry size
+  private <V> Function<String, CacheValue> loadFunc(Supplier<V> valueSupplier, long valueSize) {
+    return key -> {
+      long start = System.currentTimeMillis();
+      V value = valueSupplier.get();
+      long end = System.currentTimeMillis();
+      LOG.debug("Loaded {} with size {} in {} ms", key, valueSize, (end - start));
+      return new CacheValue(value, valueSize);
+    };
+  }
+
+  /**
+   * Invalidates all keys associated with the given group ID.
+   *
+   * <p>This is a no-op if the cache state has not been initialized yet.
+   *
+   * @param group a group ID
+   */
+  public void invalidate(String group) {
+    if (state != null) {
+      List<String> internalKeys = findInternalKeys(group);
+      LOG.info("Invalidating {} keys associated with {}", internalKeys.size(), group);
+      internalKeys.forEach(internalKey -> state.invalidate(internalKey));
+      LOG.info("Current cache stats {}", state.stats());
+    }
+  }
+
+  private List<String> findInternalKeys(String group) {
+    // internal keys are built as group + delimiter + key, so the delimiter must be part of the
+    // prefix; matching on the bare group would also select entries of any other group whose ID
+    // merely starts with this one (e.g. "db.t1" vs "db.t10")
+    // NOTE(review): a group ID that itself contains the delimiter can still collide
+    String groupPrefix = group + GROUP_DELIMITER;
+    return state.asMap().keySet().stream()
+        .filter(internalKey -> internalKey.startsWith(groupPrefix))
+        .collect(Collectors.toList());
+  }
+
+  // returns the underlying cache, creating it on first use
+  private Cache<String, CacheValue> state() {
+    if (state == null) {
+      synchronized (this) {
+        if (state == null) {
+          LOG.info("Initializing cache state");
+          this.state = initState();
+        }
+      }
+    }
+
+    return state;
+  }
+
+  private Cache<String, CacheValue> initState() {
+    return Caffeine.newBuilder()
+        .expireAfterAccess(timeout)
+        .maximumWeight(maxTotalSize)
+        .weigher((key, value) -> ((CacheValue) value).weight())
+        .recordStats()
+        .removalListener((key, value, cause) -> LOG.debug("Evicted {} ({})", key, cause))
+        .build();
+  }
+
+  /** A cached value together with its estimated size in bytes. */
+  @VisibleForTesting
+  static class CacheValue {
+    private final Object value;
+    private final long size;
+
+    CacheValue(Object value, long size) {
+      this.value = value;
+      this.size = size;
+    }
+
+    @SuppressWarnings("unchecked")
+    public <V> V get() {
+      return (V) value;
+    }
+
+    // the weigher needs an int, so sizes above Integer.MAX_VALUE are clamped
+    public int weight() {
+      return (int) Math.min(size, Integer.MAX_VALUE);
+    }
+  }
+
+  /** Reads the cache settings from the session config, falling back to the defaults. */
+  @VisibleForTesting
+  static class Conf {
+    private final SparkConfParser confParser = new SparkConfParser();
+
+    public boolean cacheEnabled() {
+      return confParser
+          .booleanConf()
+          .sessionConf(SparkSQLProperties.EXECUTOR_CACHE_ENABLED)
+          .defaultValue(SparkSQLProperties.EXECUTOR_CACHE_ENABLED_DEFAULT)
+          .parse();
+    }
+
+    public Duration timeout() {
+      return confParser
+          .durationConf()
+          .sessionConf(SparkSQLProperties.EXECUTOR_CACHE_TIMEOUT)
+          .defaultValue(SparkSQLProperties.EXECUTOR_CACHE_TIMEOUT_DEFAULT)
+          .parse();
+    }
+
+    public long maxEntrySize() {
+      return confParser
+          .longConf()
+          .sessionConf(SparkSQLProperties.EXECUTOR_CACHE_MAX_ENTRY_SIZE)
+          .defaultValue(SparkSQLProperties.EXECUTOR_CACHE_MAX_ENTRY_SIZE_DEFAULT)
+          .parse();
+    }
+
+    public long maxTotalSize() {
+      return confParser
+          .longConf()
+          .sessionConf(SparkSQLProperties.EXECUTOR_CACHE_MAX_TOTAL_SIZE)
+          .defaultValue(SparkSQLProperties.EXECUTOR_CACHE_MAX_TOTAL_SIZE_DEFAULT)
+          .parse();
+    }
+  }
+}
diff --git
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java
index 2176650191..6b2d2e79fb 100644
---
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java
+++
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java
@@ -18,6 +18,8 @@
*/
package org.apache.iceberg.spark;
+import java.time.Duration;
+
public class SparkSQLProperties {
private SparkSQLProperties() {}
@@ -77,4 +79,18 @@ public class SparkSQLProperties {
// Controls whether to report locality information to Spark while allocating
input partitions
public static final String LOCALITY = "spark.sql.iceberg.locality.enabled";
+
+ public static final String EXECUTOR_CACHE_ENABLED =
"spark.sql.iceberg.executor-cache.enabled";
+ public static final boolean EXECUTOR_CACHE_ENABLED_DEFAULT = true;
+
+ public static final String EXECUTOR_CACHE_TIMEOUT =
"spark.sql.iceberg.executor-cache.timeout";
+ public static final Duration EXECUTOR_CACHE_TIMEOUT_DEFAULT =
Duration.ofMinutes(10);
+
+ public static final String EXECUTOR_CACHE_MAX_ENTRY_SIZE =
+ "spark.sql.iceberg.executor-cache.max-entry-size";
+ public static final long EXECUTOR_CACHE_MAX_ENTRY_SIZE_DEFAULT = 64 * 1024 *
1024; // 64 MB
+
+ public static final String EXECUTOR_CACHE_MAX_TOTAL_SIZE =
+ "spark.sql.iceberg.executor-cache.max-total-size";
+ public static final long EXECUTOR_CACHE_MAX_TOTAL_SIZE_DEFAULT = 128 * 1024
* 1024; // 128 MB
}
diff --git
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/BaseReader.java
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/BaseReader.java
index 4fb838202c..c2b3e7c2dc 100644
---
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/BaseReader.java
+++
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/BaseReader.java
@@ -25,6 +25,8 @@ import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.function.Function;
+import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.avro.generic.GenericData;
@@ -40,7 +42,9 @@ import org.apache.iceberg.Schema;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.data.BaseDeleteLoader;
import org.apache.iceberg.data.DeleteFilter;
+import org.apache.iceberg.data.DeleteLoader;
import org.apache.iceberg.deletes.DeleteCounter;
import org.apache.iceberg.encryption.EncryptedFiles;
import org.apache.iceberg.encryption.EncryptedInputFile;
@@ -50,6 +54,7 @@ import org.apache.iceberg.mapping.NameMapping;
import org.apache.iceberg.mapping.NameMappingParser;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.spark.SparkExecutorCache;
import org.apache.iceberg.spark.SparkSchemaUtil;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types.NestedField;
@@ -279,5 +284,29 @@ abstract class BaseReader<T, TaskT extends ScanTask> implements Closeable {
counter().increment();
}
}
+
+    /** Creates the delete loader for this filter; the returned loader may use the executor cache. */
+    @Override
+    protected DeleteLoader newDeleteLoader() {
+      Function<DeleteFile, InputFile> inputFileLoader = this::loadInputFile;
+      return new CachingDeleteLoader(inputFileLoader);
+    }
+
+    /** Delete loader that routes cacheable loads through the {@link SparkExecutorCache}. */
+    private class CachingDeleteLoader extends BaseDeleteLoader {
+      // null-checked in canCache before any size comparison
+      private final SparkExecutorCache cache;
+
+      CachingDeleteLoader(Function<DeleteFile, InputFile> loadInputFile) {
+        super(loadInputFile);
+        this.cache = SparkExecutorCache.getOrCreate();
+      }
+
+      /** Returns true only if a cache is present and the size is below its max entry size. */
+      @Override
+      protected boolean canCache(long size) {
+        if (cache == null) {
+          return false;
+        }
+
+        return size < cache.maxEntrySize();
+      }
+
+      /** Looks up or loads the value, using the name of the enclosing reader's table as the group. */
+      @Override
+      protected <V> V getOrLoad(String key, Supplier<V> valueSupplier, long valueSize) {
+        String group = table().name();
+        return cache.getOrLoad(group, key, valueSupplier, valueSize);
+      }
+    }
}
}
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SerializableTableWithSize.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SerializableTableWithSize.java
index 65df29051c..f6913fb9d0 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SerializableTableWithSize.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SerializableTableWithSize.java
@@ -21,6 +21,7 @@ package org.apache.iceberg.spark.source;
import org.apache.iceberg.BaseMetadataTable;
import org.apache.iceberg.SerializableTable;
import org.apache.iceberg.Table;
+import org.apache.iceberg.spark.SparkExecutorCache;
import org.apache.spark.util.KnownSizeEstimation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -67,6 +68,7 @@ public class SerializableTableWithSize extends SerializableTable
LOG.info("Releasing resources");
io().close();
}
+ invalidateCache(name());
}
public static class SerializableMetadataTableWithSize extends
SerializableMetadataTable
@@ -93,6 +95,14 @@ public class SerializableTableWithSize extends SerializableTable
LOG.info("Releasing resources");
io().close();
}
+ invalidateCache(name());
+ }
+ }
+
+  // Invalidates the executor cache state kept under the given name, if a cache instance exists.
+  private static void invalidateCache(String name) {
+    SparkExecutorCache cache = SparkExecutorCache.get();
+    // nothing to invalidate when no cache instance is available
+    if (cache != null) {
+      cache.invalidate(name);
     }
   }
}
diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/Employee.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/Employee.java
new file mode 100644
index 0000000000..9c57936d98
--- /dev/null
+++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/Employee.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark;
+
+import java.util.Objects;
+
+/** Mutable bean with an id and a department; equality is based on both fields. */
+public class Employee {
+  private Integer id;
+  private String dep;
+
+  public Employee() {}
+
+  public Employee(Integer id, String dep) {
+    this.id = id;
+    this.dep = dep;
+  }
+
+  public Integer getId() {
+    return this.id;
+  }
+
+  public void setId(Integer id) {
+    this.id = id;
+  }
+
+  public String getDep() {
+    return this.dep;
+  }
+
+  public void setDep(String dep) {
+    this.dep = dep;
+  }
+
+  @Override
+  public boolean equals(Object other) {
+    if (this == other) {
+      return true;
+    }
+
+    // only instances of exactly the same class can be equal
+    if (other == null || other.getClass() != getClass()) {
+      return false;
+    }
+
+    Employee that = (Employee) other;
+    return Objects.equals(this.id, that.id) && Objects.equals(this.dep, that.dep);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(id, dep);
+  }
+}
diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/TestSparkExecutorCache.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/TestSparkExecutorCache.java
new file mode 100644
index 0000000000..35dfb55d5b
--- /dev/null
+++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/TestSparkExecutorCache.java
@@ -0,0 +1,444 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark;
+
+import static org.apache.iceberg.RowLevelOperationMode.COPY_ON_WRITE;
+import static org.apache.iceberg.RowLevelOperationMode.MERGE_ON_READ;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import java.io.File;
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.ContentScanTask;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.RowLevelOperationMode;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.data.FileHelpers;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.io.SeekableInputStream;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.spark.SparkExecutorCache.CacheValue;
+import org.apache.iceberg.spark.SparkExecutorCache.Conf;
+import org.apache.iceberg.util.CharSequenceSet;
+import org.apache.iceberg.util.Pair;
+import org.apache.spark.SparkEnv;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.apache.spark.storage.memory.MemoryStore;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+@RunWith(Parameterized.class)
+public class TestSparkExecutorCache extends SparkTestBaseWithCatalog {
+
+  /** Supplies a single parameter set: a Hive catalog that uses {@link CustomFileIO}. */
+  @Parameters(name = "catalogName = {0}, implementation = {1}, config = {2}")
+  public static Object[][] parameters() {
+    Map<String, String> hiveCatalogConfig =
+        ImmutableMap.of(
+            "type",
+            "hive",
+            CatalogProperties.FILE_IO_IMPL,
+            CustomFileIO.class.getName(),
+            "default-namespace",
+            "default");
+    Object[] hiveCase = {"testhive", SparkCatalog.class.getName(), hiveCatalogConfig};
+    return new Object[][] {hiveCase};
+  }
+
+ private static final String UPDATES_VIEW_NAME = "updates";
+ private static final AtomicInteger JOB_COUNTER = new AtomicInteger();
+ private static final Map<String, CustomInputFile> INPUT_FILES =
+ Collections.synchronizedMap(Maps.newHashMap());
+
+ private String targetTableName;
+ private TableIdentifier targetTableIdent;
+
+  // passes the parameterized catalog name, implementation and config to the base test class
+  public TestSparkExecutorCache(
+      String catalogName, String implementation, Map<String, String> config) {
+    super(catalogName, implementation, config);
+  }
+
+  /** Picks a fresh target table name for every test using the shared counter. */
+  @Before
+  public void configureTargetTableName() {
+    int tableNumber = JOB_COUNTER.incrementAndGet();
+    String shortName = "target_exec_cache_" + tableNumber;
+    this.targetTableIdent = TableIdentifier.of(Namespace.of("default"), shortName);
+    this.targetTableName = tableName(shortName);
+  }
+
+  // runs after each test: drops the target table and the updates relation,
+  // then forgets all input files tracked by CustomFileIO
+  @After
+  public void releaseResources() {
+    sql("DROP TABLE IF EXISTS %s", targetTableName);
+    sql("DROP TABLE IF EXISTS %s", UPDATES_VIEW_NAME);
+    INPUT_FILES.clear();
+  }
+
+  /** A value size above {@code Integer.MAX_VALUE} must yield a weight of {@code Integer.MAX_VALUE}. */
+  @Test
+  public void testCacheValueWeightOverflow() {
+    long oversizedValueSize = Integer.MAX_VALUE + 1L;
+    CacheValue oversizedValue = new CacheValue("v", oversizedValueSize);
+    assertThat(oversizedValue.weight()).isEqualTo(Integer.MAX_VALUE);
+  }
+
+  /** The enabled flag read by {@code Conf} must follow the SQL conf value. */
+  @Test
+  public void testCacheEnabledConfig() {
+    Map<String, String> enabledConf =
+        ImmutableMap.of(SparkSQLProperties.EXECUTOR_CACHE_ENABLED, "true");
+    withSQLConf(enabledConf, () -> assertThat(new Conf().cacheEnabled()).isTrue());
+
+    Map<String, String> disabledConf =
+        ImmutableMap.of(SparkSQLProperties.EXECUTOR_CACHE_ENABLED, "false");
+    withSQLConf(disabledConf, () -> assertThat(new Conf().cacheEnabled()).isFalse());
+  }
+
+  /** The timeout read by {@code Conf} must parse duration strings such as "10s" and "2m". */
+  @Test
+  public void testTimeoutConfig() {
+    Map<String, String> secondsConf =
+        ImmutableMap.of(SparkSQLProperties.EXECUTOR_CACHE_TIMEOUT, "10s");
+    withSQLConf(secondsConf, () -> assertThat(new Conf().timeout()).hasSeconds(10));
+
+    Map<String, String> minutesConf =
+        ImmutableMap.of(SparkSQLProperties.EXECUTOR_CACHE_TIMEOUT, "2m");
+    withSQLConf(minutesConf, () -> assertThat(new Conf().timeout()).hasMinutes(2));
+  }
+
+  /** The max entry size read by {@code Conf} must match the SQL conf value. */
+  @Test
+  public void testMaxEntrySizeConfig() {
+    Map<String, String> sqlConf =
+        ImmutableMap.of(SparkSQLProperties.EXECUTOR_CACHE_MAX_ENTRY_SIZE, "128");
+    withSQLConf(sqlConf, () -> assertThat(new Conf().maxEntrySize()).isEqualTo(128L));
+  }
+
+  /** The max total size read by {@code Conf} must match the SQL conf value. */
+  @Test
+  public void testMaxTotalSizeConfig() {
+    Map<String, String> sqlConf =
+        ImmutableMap.of(SparkSQLProperties.EXECUTOR_CACHE_MAX_TOTAL_SIZE, "512");
+    withSQLConf(sqlConf, () -> assertThat(new Conf().maxTotalSize()).isEqualTo(512L));
+  }
+
+  /**
+   * Hits the cache from several threads, two groups and two keys, and verifies that each
+   * (group, key) pair is loaded only once and that invalidation removes all keys of both groups.
+   *
+   * <p>Failures raised on pool threads are collected and re-checked on the test thread, because
+   * the futures returned by {@code submit} are not inspected and would otherwise hide them.
+   */
+  @Test
+  public void testConcurrentAccess() throws InterruptedException {
+    SparkExecutorCache cache = SparkExecutorCache.getOrCreate();
+
+    String table1 = "table1";
+    String table2 = "table2";
+
+    Set<String> loadedInternalKeys = Sets.newHashSet();
+    List<Throwable> failures = Collections.synchronizedList(Lists.newArrayList());
+
+    String key1 = "key1";
+    String key2 = "key2";
+
+    long valueSize = 100L;
+
+    int threadCount = 10;
+    ExecutorService executorService = Executors.newFixedThreadPool(threadCount);
+
+    for (int threadNumber = 0; threadNumber < threadCount; threadNumber++) {
+      String group = threadNumber % 2 == 0 ? table1 : table2;
+      executorService.submit(
+          () -> {
+            try {
+              for (int batch = 0; batch < 3; batch++) {
+                cache.getOrLoad(
+                    group,
+                    key1,
+                    () -> {
+                      String internalKey = toInternalKey(group, key1);
+                      synchronized (loadedInternalKeys) {
+                        // verify only one load was done for this key
+                        assertThat(loadedInternalKeys.contains(internalKey)).isFalse();
+                        loadedInternalKeys.add(internalKey);
+                      }
+                      return "value1";
+                    },
+                    valueSize);
+
+                cache.getOrLoad(
+                    group,
+                    key2,
+                    () -> {
+                      String internalKey = toInternalKey(group, key2);
+                      synchronized (loadedInternalKeys) {
+                        // verify only one load was done for this key
+                        assertThat(loadedInternalKeys.contains(internalKey)).isFalse();
+                        loadedInternalKeys.add(internalKey);
+                      }
+                      return "value2";
+                    },
+                    valueSize);
+              }
+            } catch (Throwable t) {
+              // includes AssertionError from the loaders; reported on the test thread below
+              failures.add(t);
+            }
+          });
+    }
+
+    executorService.shutdown();
+    assertThat(executorService.awaitTermination(1, TimeUnit.MINUTES)).isTrue();
+
+    // surface anything that went wrong on the pool threads
+    assertThat(failures).isEmpty();
+
+    cache.invalidate(table1);
+    cache.invalidate(table2);
+
+    // all keys must be invalidated
+    Cache<String, ?> state = fetchInternalCacheState();
+    Set<String> liveKeys = state.asMap().keySet();
+    assertThat(liveKeys).noneMatch(key -> key.startsWith(table1) || key.startsWith(table2));
+  }
+
+  // runs the shared delete check with the table's delete mode set to copy-on-write
+  @Test
+  public void testCopyOnWriteDelete() throws Exception {
+    checkDelete(COPY_ON_WRITE);
+  }
+
+  // runs the shared delete check with the table's delete mode set to merge-on-read
+  @Test
+  public void testMergeOnReadDelete() throws Exception {
+    checkDelete(MERGE_ON_READ);
+  }
+
+  // Runs a DELETE against a table that already has position and equality deletes and checks
+  // how many times each delete file was opened, then verifies that no rows are left.
+  private void checkDelete(RowLevelOperationMode mode) throws Exception {
+    List<DeleteFile> deleteFiles = createAndInitTable(TableProperties.DELETE_MODE, mode);
+
+    sql("DELETE FROM %s WHERE id = 1 OR id = 4", targetTableName);
+
+    // there are 2 data files and 2 delete files that apply to both of them
+    // in CoW, the target table will be scanned 2 times (main query + runtime filter)
+    // the runtime filter may invalidate the cache so check at least some requests were hits
+    // in MoR, the target table will be scanned only once
+    // so each delete file must be opened at most once
+    int maxRequestCount = mode == COPY_ON_WRITE ? 3 : 1;
+    assertThat(deleteFiles).allMatch(deleteFile -> streamCount(deleteFile) <= maxRequestCount);
+
+    // verify the final set of records is correct
+    // ids 2 and 5 are covered by the equality deletes, ids 1 and 4 by the DELETE above;
+    // the remaining rows are presumably the ones at position 0 of each data file
+    assertEquals(
+        "Should have expected rows",
+        ImmutableList.of(),
+        sql("SELECT * FROM %s ORDER BY id ASC", targetTableName));
+  }
+
+  /**
+   * Returns how many streams were opened for the given delete file through {@link CustomFileIO}.
+   *
+   * <p>Fails with a descriptive assertion, rather than a bare NullPointerException, if the file
+   * was never requested through {@link CustomFileIO} and therefore is not tracked.
+   */
+  private int streamCount(DeleteFile deleteFile) {
+    String location = deleteFile.path().toString();
+    CustomInputFile inputFile = INPUT_FILES.get(location);
+    assertThat(inputFile)
+        .as("Delete file %s must have been requested through CustomFileIO", location)
+        .isNotNull();
+    return inputFile.streamCount();
+  }
+
+  // Creates the target table, appends two batches of rows, commits one position delete file and
+  // one equality delete file, and returns both delete files.
+  // `operation` is used as a table property key whose value is the mode name.
+  private List<DeleteFile> createAndInitTable(String operation, RowLevelOperationMode mode)
+      throws Exception {
+    sql(
+        "CREATE TABLE %s (id INT, dep STRING) "
+            + "USING iceberg "
+            + "TBLPROPERTIES ('%s' '%s', '%s' '%s', '%s' '%s')",
+        targetTableName,
+        TableProperties.WRITE_METADATA_LOCATION,
+        temp.toString().replaceFirst("file:", ""),
+        TableProperties.WRITE_DATA_LOCATION,
+        temp.toString().replaceFirst("file:", ""),
+        operation,
+        mode.modeName());
+
+    // each append is coalesced to one partition before writing
+    append(targetTableName, new Employee(0, "hr"), new Employee(1, "hr"), new Employee(2, "hr"));
+    append(targetTableName, new Employee(3, "hr"), new Employee(4, "hr"), new Employee(5, "hr"));
+
+    Table table = validationCatalog.loadTable(targetTableIdent);
+
+    // one position delete (position 0) per data file
+    List<Pair<CharSequence, Long>> posDeletes =
+        dataFiles(table).stream()
+            .map(dataFile -> Pair.of(dataFile.path(), 0L))
+            .collect(Collectors.toList());
+    Pair<DeleteFile, CharSequenceSet> posDeleteResult = writePosDeletes(table, posDeletes);
+    DeleteFile posDeleteFile = posDeleteResult.first();
+    CharSequenceSet referencedDataFiles = posDeleteResult.second();
+
+    // equality deletes on id for values 2 and 5
+    DeleteFile eqDeleteFile = writeEqDeletes(table, "id", 2, 5);
+
+    table
+        .newRowDelta()
+        .validateFromSnapshot(table.currentSnapshot().snapshotId())
+        .validateDataFilesExist(referencedDataFiles)
+        .addDeletes(posDeleteFile)
+        .addDeletes(eqDeleteFile)
+        .commit();
+
+    sql("REFRESH TABLE %s", targetTableName);
+
+    // invalidate the memory store to destroy all currently live table broadcasts
+    SparkEnv sparkEnv = SparkEnv.get();
+    MemoryStore memoryStore = sparkEnv.blockManager().memoryStore();
+    memoryStore.clear();
+
+    return ImmutableList.of(posDeleteFile, eqDeleteFile);
+  }
+
+  /** Writes an equality delete file with one delete record per given value of the column. */
+  private DeleteFile writeEqDeletes(Table table, String col, Object... values) throws IOException {
+    Schema deleteSchema = table.schema().select(col);
+    Record template = GenericRecord.create(deleteSchema);
+
+    List<Record> deletes =
+        Arrays.stream(values)
+            .map(value -> template.copy(col, value))
+            .collect(Collectors.toList());
+
+    OutputFile out = Files.localOutput(temp.newFile("eq-deletes-" + UUID.randomUUID()));
+    return FileHelpers.writeDeleteFile(table, out, null, deletes, deleteSchema);
+  }
+
+  /** Writes the given position deletes into a new local file in the temp folder. */
+  private Pair<DeleteFile, CharSequenceSet> writePosDeletes(
+      Table table, List<Pair<CharSequence, Long>> deletes) throws IOException {
+    File localFile = temp.newFile("pos-deletes-" + UUID.randomUUID());
+    OutputFile out = Files.localOutput(localFile);
+    return FileHelpers.writeDeleteFile(table, out, null, deletes);
+  }
+
+  /** Appends the given employees to the target table as a single-partition write. */
+  private void append(String target, Employee... employees) throws NoSuchTableException {
+    Dataset<Row> inputDF = spark.createDataFrame(Arrays.asList(employees), Employee.class);
+    Dataset<Row> singlePartitionDF = inputDF.coalesce(1);
+    singlePartitionDF.writeTo(target).append();
+  }
+
+  /** Plans a full table scan and returns the data file of every planned task. */
+  private Collection<DataFile> dataFiles(Table table) {
+    ImmutableList.Builder<DataFile> files = ImmutableList.builder();
+
+    try (CloseableIterable<FileScanTask> tasks = table.newScan().planFiles()) {
+      for (FileScanTask task : tasks) {
+        files.add(task.file());
+      }
+    } catch (IOException e) {
+      throw new RuntimeIOException(e);
+    }
+
+    return files.build();
+  }
+
+  /** Reads the private {@code state} field of the current executor cache via reflection. */
+  @SuppressWarnings("unchecked")
+  private static Cache<String, ?> fetchInternalCacheState() {
+    try {
+      Field internalState = SparkExecutorCache.class.getDeclaredField("state");
+      internalState.setAccessible(true);
+      Object rawState = internalState.get(SparkExecutorCache.get());
+      return (Cache<String, ?>) rawState;
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /** Builds the key used by this test to track loads: group and key joined by an underscore. */
+  private static String toInternalKey(String group, String key) {
+    return String.join("_", group, key);
+  }
+
+  /** Local FileIO that hands out one tracked {@link CustomInputFile} per path. */
+  public static class CustomFileIO implements FileIO {
+
+    public CustomFileIO() {}
+
+    @Override
+    public InputFile newInputFile(String path) {
+      // reuse the tracked instance so stream counts accumulate per path
+      return INPUT_FILES.computeIfAbsent(path, CustomInputFile::new);
+    }
+
+    @Override
+    public OutputFile newOutputFile(String path) {
+      return Files.localOutput(path);
+    }
+
+    @Override
+    public void deleteFile(String path) {
+      boolean deleted = new File(path).delete();
+      if (!deleted) {
+        throw new RuntimeIOException("Failed to delete file: " + path);
+      }
+    }
+  }
+
+  /** Input file wrapper around a local file that counts how many streams were opened. */
+  public static class CustomInputFile implements InputFile {
+    private final AtomicInteger streamCount = new AtomicInteger();
+    private final InputFile delegate;
+
+    public CustomInputFile(String path) {
+      this.delegate = Files.localInput(path);
+    }
+
+    /** Returns the number of {@link #newStream()} calls made so far. */
+    public int streamCount() {
+      return streamCount.get();
+    }
+
+    @Override
+    public SeekableInputStream newStream() {
+      // count the request before delegating
+      streamCount.incrementAndGet();
+      return delegate.newStream();
+    }
+
+    @Override
+    public long getLength() {
+      return delegate.getLength();
+    }
+
+    @Override
+    public String location() {
+      return delegate.location();
+    }
+
+    @Override
+    public boolean exists() {
+      return delegate.exists();
+    }
+  }
+}
diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/TestSparkWriteConf.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/TestSparkWriteConf.java
index 63d037bd73..00faecf9a8 100644
--- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/TestSparkWriteConf.java
+++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/TestSparkWriteConf.java
@@ -47,6 +47,7 @@ import static org.apache.spark.sql.connector.write.RowLevelOperation.Command.UPD
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import java.time.Duration;
import java.util.List;
import java.util.Map;
import org.apache.iceberg.DistributionMode;
@@ -77,6 +78,28 @@ public class TestSparkWriteConf extends SparkTestBaseWithCatalog {
sql("DROP TABLE IF EXISTS %s", tableName);
}
+  /** Duration confs set through the session must be parsed from strings such as "10s" and "2m". */
+  @Test
+  public void testDurationConf() {
+    String confName = "spark.sql.iceberg.some-duration-conf";
+    Table table = validationCatalog.loadTable(tableIdent);
+
+    withSQLConf(
+        ImmutableMap.of(confName, "10s"),
+        () -> {
+          SparkConfParser confParser = new SparkConfParser(spark, table, ImmutableMap.of());
+          Duration parsed = confParser.durationConf().sessionConf(confName).parseOptional();
+          assertThat(parsed).hasSeconds(10);
+        });
+
+    withSQLConf(
+        ImmutableMap.of(confName, "2m"),
+        () -> {
+          SparkConfParser confParser = new SparkConfParser(spark, table, ImmutableMap.of());
+          Duration parsed = confParser.durationConf().sessionConf(confName).parseOptional();
+          assertThat(parsed).hasMinutes(2);
+        });
+  }
+
@Test
public void testDeleteGranularityDefault() {
Table table = validationCatalog.loadTable(tableIdent);