This is an automated email from the ASF dual-hosted git repository.
aokolnychyi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new 684f7a767c Core, Spark 3.5: Read deletes in parallel and cache them on
executors (#8755)
684f7a767c is described below
commit 684f7a767c2c216a402b60b73d2d55ef605921a0
Author: Anton Okolnychyi <[email protected]>
AuthorDate: Tue Jan 16 10:21:26 2024 -0800
Core, Spark 3.5: Read deletes in parallel and cache them on executors
(#8755)
---
.../java/org/apache/iceberg/types/TypeUtil.java | 64 +++
.../java/org/apache/iceberg/SystemConfigs.java | 6 +-
.../iceberg/deletes/BitmapPositionDeleteIndex.java | 4 +
.../java/org/apache/iceberg/deletes/Deletes.java | 30 ++
...eteIndex.java => EmptyPositionDeleteIndex.java} | 24 +-
.../iceberg/deletes/PositionDeleteIndex.java | 10 +
...leteIndex.java => PositionDeleteIndexUtil.java} | 37 +-
.../java/org/apache/iceberg/util/ThreadPools.java | 5 +-
.../org/apache/iceberg/data/BaseDeleteLoader.java | 261 +++++++++++
.../java/org/apache/iceberg/data/DeleteFilter.java | 107 +----
.../java/org/apache/iceberg/data/DeleteLoader.java | 45 ++
spark/v3.5/build.gradle | 2 +
.../org/apache/iceberg/spark/SparkConfParser.java | 41 ++
.../apache/iceberg/spark/SparkExecutorCache.java | 228 ++++++++++
.../apache/iceberg/spark/SparkSQLProperties.java | 16 +
.../apache/iceberg/spark/source/BaseReader.java | 29 ++
.../spark/source/SerializableTableWithSize.java | 10 +
.../java/org/apache/iceberg/spark/Employee.java | 49 +-
.../iceberg/spark/TestSparkExecutorCache.java | 503 +++++++++++++++++++++
.../apache/iceberg/spark/TestSparkWriteConf.java | 23 +
20 files changed, 1360 insertions(+), 134 deletions(-)
diff --git a/api/src/main/java/org/apache/iceberg/types/TypeUtil.java
b/api/src/main/java/org/apache/iceberg/types/TypeUtil.java
index d2f821ea00..7c13d60940 100644
--- a/api/src/main/java/org/apache/iceberg/types/TypeUtil.java
+++ b/api/src/main/java/org/apache/iceberg/types/TypeUtil.java
@@ -40,6 +40,8 @@ import
org.apache.iceberg.relocated.com.google.common.collect.Maps;
public class TypeUtil {
+ private static final int HEADER_SIZE = 12;
+
private TypeUtil() {}
/**
@@ -452,6 +454,68 @@ public class TypeUtil {
}
}
+ /**
+ * Estimates the number of bytes a value for a given field may occupy in
memory.
+ *
+ * <p>This method approximates the memory size based on heuristics and the
internal Java
+ * representation defined by {@link Type.TypeID}. It is important to note
that the actual size
+ * might differ from this estimation. The method is designed to handle a
variety of data types,
+ * including primitive types, strings, and nested types such as structs,
maps, and lists.
+ *
+ * @param field a field for which to estimate the size
+ * @return the estimated size in bytes of the field's value in memory
+ */
+ public static int estimateSize(Types.NestedField field) {
+ return estimateSize(field.type());
+ }
+
+ private static int estimateSize(Type type) {
+ switch (type.typeId()) {
+ case BOOLEAN:
+ // the size of a boolean variable is virtual machine dependent
+ // it is common to believe booleans occupy 1 byte in most JVMs
+ return 1;
+ case INTEGER:
+ case FLOAT:
+ case DATE:
+ // ints and floats occupy 4 bytes
+ // dates are internally represented as ints
+ return 4;
+ case LONG:
+ case DOUBLE:
+ case TIME:
+ case TIMESTAMP:
+ // longs and doubles occupy 8 bytes
+ // times and timestamps are internally represented as longs
+ return 8;
+ case STRING:
+ // 12 (header) + 6 (fields) + 16 (array overhead) + 20 (10 chars, 2
bytes each) = 54 bytes
+ return 54;
+ case UUID:
+ // 12 (header) + 16 (two long variables) = 28 bytes
+ return 28;
+ case FIXED:
+ return ((Types.FixedType) type).length();
+ case BINARY:
+ return 80;
+ case DECIMAL:
+ // 12 (header) + (12 + 12 + 4) (BigInteger) + 4 (scale) = 44 bytes
+ return 44;
+ case STRUCT:
+ Types.StructType struct = (Types.StructType) type;
+ return HEADER_SIZE +
struct.fields().stream().mapToInt(TypeUtil::estimateSize).sum();
+ case LIST:
+ Types.ListType list = (Types.ListType) type;
+ return HEADER_SIZE + 5 * estimateSize(list.elementType());
+ case MAP:
+ Types.MapType map = (Types.MapType) type;
+ int entrySize = HEADER_SIZE + estimateSize(map.keyType()) +
estimateSize(map.valueType());
+ return HEADER_SIZE + 5 * entrySize;
+ default:
+ return 16;
+ }
+ }
+
/** Interface for passing a function that assigns column IDs. */
public interface NextID {
int get();
diff --git a/core/src/main/java/org/apache/iceberg/SystemConfigs.java
b/core/src/main/java/org/apache/iceberg/SystemConfigs.java
index ce183ec5d4..feac1f61a1 100644
--- a/core/src/main/java/org/apache/iceberg/SystemConfigs.java
+++ b/core/src/main/java/org/apache/iceberg/SystemConfigs.java
@@ -43,14 +43,14 @@ public class SystemConfigs {
Integer::parseUnsignedInt);
/**
- * Sets the size of the delete worker pool. This limits the number of
threads used to compute the
- * PositionDeleteIndex from the position deletes for a data file.
+ * Sets the size of the delete worker pool. This limits the number of
threads used to read delete
+ * files for a data file.
*/
public static final ConfigEntry<Integer> DELETE_WORKER_THREAD_POOL_SIZE =
new ConfigEntry<>(
"iceberg.worker.delete-num-threads",
"ICEBERG_WORKER_DELETE_NUM_THREADS",
- Math.max(2, Runtime.getRuntime().availableProcessors()),
+ Math.max(2, 4 * Runtime.getRuntime().availableProcessors()),
Integer::parseUnsignedInt);
/** Whether to use the shared worker pool when planning table scans. */
diff --git
a/core/src/main/java/org/apache/iceberg/deletes/BitmapPositionDeleteIndex.java
b/core/src/main/java/org/apache/iceberg/deletes/BitmapPositionDeleteIndex.java
index 7690ab7e48..97699c1c91 100644
---
a/core/src/main/java/org/apache/iceberg/deletes/BitmapPositionDeleteIndex.java
+++
b/core/src/main/java/org/apache/iceberg/deletes/BitmapPositionDeleteIndex.java
@@ -27,6 +27,10 @@ class BitmapPositionDeleteIndex implements
PositionDeleteIndex {
roaring64Bitmap = new Roaring64Bitmap();
}
+ void merge(BitmapPositionDeleteIndex that) {
+ roaring64Bitmap.or(that.roaring64Bitmap);
+ }
+
@Override
public void delete(long position) {
roaring64Bitmap.add(position);
diff --git a/core/src/main/java/org/apache/iceberg/deletes/Deletes.java
b/core/src/main/java/org/apache/iceberg/deletes/Deletes.java
index 118987e24b..ff20ba53ff 100644
--- a/core/src/main/java/org/apache/iceberg/deletes/Deletes.java
+++ b/core/src/main/java/org/apache/iceberg/deletes/Deletes.java
@@ -37,6 +37,7 @@ import
org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.CharSequenceMap;
import org.apache.iceberg.util.Filter;
import org.apache.iceberg.util.ParallelIterable;
import org.apache.iceberg.util.SortedMerge;
@@ -128,6 +129,35 @@ public class Deletes {
}
}
+ /**
+ * Builds a map of position delete indexes by path.
+ *
+ * <p>Unlike {@link #toPositionIndex(CharSequence, List)}, this method
builds a position delete
+ * index for each referenced data file and does not filter deletes. This can
be useful when the
+ * entire delete file content is needed (e.g. caching).
+ *
+ * @param posDeletes position deletes
+ * @return the map of position delete indexes by path
+ */
+ public static <T extends StructLike> CharSequenceMap<PositionDeleteIndex>
toPositionIndexes(
+ CloseableIterable<T> posDeletes) {
+ CharSequenceMap<PositionDeleteIndex> indexes = CharSequenceMap.create();
+
+ try (CloseableIterable<T> deletes = posDeletes) {
+ for (T delete : deletes) {
+ CharSequence filePath = (CharSequence) FILENAME_ACCESSOR.get(delete);
+ long position = (long) POSITION_ACCESSOR.get(delete);
+ PositionDeleteIndex index =
+ indexes.computeIfAbsent(filePath, key -> new
BitmapPositionDeleteIndex());
+ index.delete(position);
+ }
+ } catch (IOException e) {
+ throw new UncheckedIOException("Failed to close position delete source",
e);
+ }
+
+ return indexes;
+ }
+
public static <T extends StructLike> PositionDeleteIndex toPositionIndex(
CharSequence dataLocation, List<CloseableIterable<T>> deleteFiles) {
return toPositionIndex(dataLocation, deleteFiles,
ThreadPools.getDeleteWorkerPool());
diff --git
a/core/src/main/java/org/apache/iceberg/deletes/BitmapPositionDeleteIndex.java
b/core/src/main/java/org/apache/iceberg/deletes/EmptyPositionDeleteIndex.java
similarity index 66%
copy from
core/src/main/java/org/apache/iceberg/deletes/BitmapPositionDeleteIndex.java
copy to
core/src/main/java/org/apache/iceberg/deletes/EmptyPositionDeleteIndex.java
index 7690ab7e48..660e01038c 100644
---
a/core/src/main/java/org/apache/iceberg/deletes/BitmapPositionDeleteIndex.java
+++
b/core/src/main/java/org/apache/iceberg/deletes/EmptyPositionDeleteIndex.java
@@ -18,32 +18,38 @@
*/
package org.apache.iceberg.deletes;
-import org.roaringbitmap.longlong.Roaring64Bitmap;
+class EmptyPositionDeleteIndex implements PositionDeleteIndex {
-class BitmapPositionDeleteIndex implements PositionDeleteIndex {
- private final Roaring64Bitmap roaring64Bitmap;
+ private static final EmptyPositionDeleteIndex INSTANCE = new
EmptyPositionDeleteIndex();
- BitmapPositionDeleteIndex() {
- roaring64Bitmap = new Roaring64Bitmap();
+ private EmptyPositionDeleteIndex() {}
+
+ static EmptyPositionDeleteIndex get() {
+ return INSTANCE;
}
@Override
public void delete(long position) {
- roaring64Bitmap.add(position);
+ throw new UnsupportedOperationException("Cannot modify " +
getClass().getName());
}
@Override
public void delete(long posStart, long posEnd) {
- roaring64Bitmap.add(posStart, posEnd);
+ throw new UnsupportedOperationException("Cannot modify " +
getClass().getName());
}
@Override
public boolean isDeleted(long position) {
- return roaring64Bitmap.contains(position);
+ return false;
}
@Override
public boolean isEmpty() {
- return roaring64Bitmap.isEmpty();
+ return true;
+ }
+
+ @Override
+ public String toString() {
+ return "PositionDeleteIndex{}";
}
}
diff --git
a/core/src/main/java/org/apache/iceberg/deletes/PositionDeleteIndex.java
b/core/src/main/java/org/apache/iceberg/deletes/PositionDeleteIndex.java
index bcfa9f2cf5..be05875aeb 100644
--- a/core/src/main/java/org/apache/iceberg/deletes/PositionDeleteIndex.java
+++ b/core/src/main/java/org/apache/iceberg/deletes/PositionDeleteIndex.java
@@ -44,4 +44,14 @@ public interface PositionDeleteIndex {
/** Returns true if this collection contains no element. */
boolean isEmpty();
+
+ /** Returns true if this collection contains elements. */
+ default boolean isNotEmpty() {
+ return !isEmpty();
+ }
+
+ /** Returns an empty immutable position delete index. */
+ static PositionDeleteIndex empty() {
+ return EmptyPositionDeleteIndex.get();
+ }
}
diff --git
a/core/src/main/java/org/apache/iceberg/deletes/BitmapPositionDeleteIndex.java
b/core/src/main/java/org/apache/iceberg/deletes/PositionDeleteIndexUtil.java
similarity index 55%
copy from
core/src/main/java/org/apache/iceberg/deletes/BitmapPositionDeleteIndex.java
copy to
core/src/main/java/org/apache/iceberg/deletes/PositionDeleteIndexUtil.java
index 7690ab7e48..0c3bff28ee 100644
---
a/core/src/main/java/org/apache/iceberg/deletes/BitmapPositionDeleteIndex.java
+++ b/core/src/main/java/org/apache/iceberg/deletes/PositionDeleteIndexUtil.java
@@ -18,32 +18,25 @@
*/
package org.apache.iceberg.deletes;
-import org.roaringbitmap.longlong.Roaring64Bitmap;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
-class BitmapPositionDeleteIndex implements PositionDeleteIndex {
- private final Roaring64Bitmap roaring64Bitmap;
+public class PositionDeleteIndexUtil {
- BitmapPositionDeleteIndex() {
- roaring64Bitmap = new Roaring64Bitmap();
- }
+ private PositionDeleteIndexUtil() {}
- @Override
- public void delete(long position) {
- roaring64Bitmap.add(position);
- }
+ public static PositionDeleteIndex merge(Iterable<? extends
PositionDeleteIndex> indexes) {
+ BitmapPositionDeleteIndex result = new BitmapPositionDeleteIndex();
- @Override
- public void delete(long posStart, long posEnd) {
- roaring64Bitmap.add(posStart, posEnd);
- }
-
- @Override
- public boolean isDeleted(long position) {
- return roaring64Bitmap.contains(position);
- }
+ for (PositionDeleteIndex index : indexes) {
+ if (index.isNotEmpty()) {
+ Preconditions.checkArgument(
+ index instanceof BitmapPositionDeleteIndex,
+ "Can merge only bitmap-based indexes, got %s",
+ index.getClass().getName());
+ result.merge((BitmapPositionDeleteIndex) index);
+ }
+ }
- @Override
- public boolean isEmpty() {
- return roaring64Bitmap.isEmpty();
+ return result;
}
}
diff --git a/core/src/main/java/org/apache/iceberg/util/ThreadPools.java
b/core/src/main/java/org/apache/iceberg/util/ThreadPools.java
index 4f2314fec9..ced121c03c 100644
--- a/core/src/main/java/org/apache/iceberg/util/ThreadPools.java
+++ b/core/src/main/java/org/apache/iceberg/util/ThreadPools.java
@@ -68,8 +68,9 @@ public class ThreadPools {
/**
* Return an {@link ExecutorService} that uses the "delete worker"
thread-pool.
*
- * <p>The size of the delete worker pool limits the number of threads used
to compute the
- * PositionDeleteIndex from the position deletes for a data file.
+ * <p>The size of this worker pool limits the number of tasks concurrently
reading delete files
+ * within a single JVM. If there are multiple threads loading deletes, all
of them will share this
+ * worker pool by default.
*
* <p>The size of this thread-pool is controlled by the Java system property
{@code
* iceberg.worker.delete-num-threads}.
diff --git a/data/src/main/java/org/apache/iceberg/data/BaseDeleteLoader.java
b/data/src/main/java/org/apache/iceberg/data/BaseDeleteLoader.java
new file mode 100644
index 0000000000..be346467e0
--- /dev/null
+++ b/data/src/main/java/org/apache/iceberg/data/BaseDeleteLoader.java
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.data;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.avro.Avro;
+import org.apache.iceberg.data.avro.DataReader;
+import org.apache.iceberg.data.orc.GenericOrcReader;
+import org.apache.iceberg.data.parquet.GenericParquetReaders;
+import org.apache.iceberg.deletes.Deletes;
+import org.apache.iceberg.deletes.PositionDeleteIndex;
+import org.apache.iceberg.deletes.PositionDeleteIndexUtil;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.DeleteSchemaUtil;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.orc.ORC;
+import org.apache.iceberg.orc.OrcRowReader;
+import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.parquet.ParquetValueReader;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.math.LongMath;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.util.CharSequenceMap;
+import org.apache.iceberg.util.StructLikeSet;
+import org.apache.iceberg.util.Tasks;
+import org.apache.iceberg.util.ThreadPools;
+import org.apache.orc.TypeDescription;
+import org.apache.parquet.schema.MessageType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class BaseDeleteLoader implements DeleteLoader {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(BaseDeleteLoader.class);
+ private static final Schema POS_DELETE_SCHEMA =
DeleteSchemaUtil.pathPosSchema();
+
+ private final Function<DeleteFile, InputFile> loadInputFile;
+ private final ExecutorService workerPool;
+
+ public BaseDeleteLoader(Function<DeleteFile, InputFile> loadInputFile) {
+ this(loadInputFile, ThreadPools.getDeleteWorkerPool());
+ }
+
+ public BaseDeleteLoader(
+ Function<DeleteFile, InputFile> loadInputFile, ExecutorService
workerPool) {
+ this.loadInputFile = loadInputFile;
+ this.workerPool = workerPool;
+ }
+
+ /**
+ * Checks if the given number of bytes can be cached.
+ *
+ * <p>Implementations should override this method if they support caching.
It is also recommended
+ * to use the provided size as a guideline to decide whether the value is
eligible for caching.
+ * For instance, it may be beneficial to discard values that are too large
to optimize the cache
+ * performance and utilization.
+ */
+ protected boolean canCache(long size) {
+ return false;
+ }
+
+ /**
+ * Gets the cached value for the key or populates the cache with a new
mapping.
+ *
+ * <p>If the value for the specified key is in the cache, it should be
returned. If the value is
+ * not in the cache, implementations should compute the value using the
provided supplier, cache
+ * it, and then return it.
+ *
+ * <p>This method will be called only if {@link #canCache(long)} returned
true.
+ */
+ protected <V> V getOrLoad(String key, Supplier<V> valueSupplier, long
valueSize) {
+ throw new UnsupportedOperationException(getClass().getName() + " does not
support caching");
+ }
+
+ @Override
+ public StructLikeSet loadEqualityDeletes(Iterable<DeleteFile> deleteFiles,
Schema projection) {
+ Iterable<Iterable<StructLike>> deletes =
+ execute(deleteFiles, deleteFile -> getOrReadEqDeletes(deleteFile,
projection));
+ StructLikeSet deleteSet = StructLikeSet.create(projection.asStruct());
+ Iterables.addAll(deleteSet, Iterables.concat(deletes));
+ return deleteSet;
+ }
+
+ private Iterable<StructLike> getOrReadEqDeletes(DeleteFile deleteFile,
Schema projection) {
+ long estimatedSize = estimateEqDeletesSize(deleteFile, projection);
+ if (canCache(estimatedSize)) {
+ String cacheKey = deleteFile.path().toString();
+ return getOrLoad(cacheKey, () -> readEqDeletes(deleteFile, projection),
estimatedSize);
+ } else {
+ return readEqDeletes(deleteFile, projection);
+ }
+ }
+
+ private Iterable<StructLike> readEqDeletes(DeleteFile deleteFile, Schema
projection) {
+ CloseableIterable<Record> deletes = openDeletes(deleteFile, projection);
+ CloseableIterable<Record> copiedDeletes =
CloseableIterable.transform(deletes, Record::copy);
+ CloseableIterable<StructLike> copiedDeletesAsStructs =
toStructs(copiedDeletes, projection);
+ return materialize(copiedDeletesAsStructs);
+ }
+
+ private CloseableIterable<StructLike> toStructs(
+ CloseableIterable<Record> records, Schema schema) {
+ InternalRecordWrapper wrapper = new
InternalRecordWrapper(schema.asStruct());
+ return CloseableIterable.transform(records, wrapper::copyFor);
+ }
+
+ // materializes the iterable and releases resources so that the result can
be cached
+ private <T> Iterable<T> materialize(CloseableIterable<T> iterable) {
+ try (CloseableIterable<T> closeableIterable = iterable) {
+ return ImmutableList.copyOf(closeableIterable);
+ } catch (IOException e) {
+ throw new UncheckedIOException("Failed to close iterable", e);
+ }
+ }
+
+ @Override
+ public PositionDeleteIndex loadPositionDeletes(
+ Iterable<DeleteFile> deleteFiles, CharSequence filePath) {
+ Iterable<PositionDeleteIndex> deletes =
+ execute(deleteFiles, deleteFile -> getOrReadPosDeletes(deleteFile,
filePath));
+ return PositionDeleteIndexUtil.merge(deletes);
+ }
+
+ private PositionDeleteIndex getOrReadPosDeletes(DeleteFile deleteFile,
CharSequence filePath) {
+ long estimatedSize = estimatePosDeletesSize(deleteFile);
+ if (canCache(estimatedSize)) {
+ String cacheKey = deleteFile.path().toString();
+ CharSequenceMap<PositionDeleteIndex> indexes =
+ getOrLoad(cacheKey, () -> readPosDeletes(deleteFile), estimatedSize);
+ return indexes.getOrDefault(filePath, PositionDeleteIndex.empty());
+ } else {
+ return readPosDeletes(deleteFile, filePath);
+ }
+ }
+
+ private CharSequenceMap<PositionDeleteIndex> readPosDeletes(DeleteFile
deleteFile) {
+ CloseableIterable<Record> deletes = openDeletes(deleteFile,
POS_DELETE_SCHEMA);
+ return Deletes.toPositionIndexes(deletes);
+ }
+
+ private PositionDeleteIndex readPosDeletes(DeleteFile deleteFile,
CharSequence filePath) {
+ Expression filter =
Expressions.equal(MetadataColumns.DELETE_FILE_PATH.name(), filePath);
+ CloseableIterable<Record> deletes = openDeletes(deleteFile,
POS_DELETE_SCHEMA, filter);
+ return Deletes.toPositionIndex(filePath, ImmutableList.of(deletes));
+ }
+
+ private CloseableIterable<Record> openDeletes(DeleteFile deleteFile, Schema
projection) {
+ return openDeletes(deleteFile, projection, null /* no filter */);
+ }
+
+ private CloseableIterable<Record> openDeletes(
+ DeleteFile deleteFile, Schema projection, Expression filter) {
+
+ FileFormat format = deleteFile.format();
+ LOG.trace("Opening delete file {}", deleteFile.path());
+ InputFile inputFile = loadInputFile.apply(deleteFile);
+
+ switch (format) {
+ case AVRO:
+ return Avro.read(inputFile)
+ .project(projection)
+ .reuseContainers()
+ .createReaderFunc(DataReader::create)
+ .build();
+
+ case PARQUET:
+ return Parquet.read(inputFile)
+ .project(projection)
+ .filter(filter)
+ .reuseContainers()
+ .createReaderFunc(newParquetReaderFunc(projection))
+ .build();
+
+ case ORC:
+ // reusing containers is automatic for ORC, no need to call
'reuseContainers'
+ return ORC.read(inputFile)
+ .project(projection)
+ .filter(filter)
+ .createReaderFunc(newOrcReaderFunc(projection))
+ .build();
+
+ default:
+ throw new UnsupportedOperationException(
+ String.format(
+ "Cannot read deletes, %s is not a supported file format: %s",
+ format.name(), inputFile.location()));
+ }
+ }
+
+ private Function<MessageType, ParquetValueReader<?>>
newParquetReaderFunc(Schema projection) {
+ return fileSchema -> GenericParquetReaders.buildReader(projection,
fileSchema);
+ }
+
+ private Function<TypeDescription, OrcRowReader<?>> newOrcReaderFunc(Schema
projection) {
+ return fileSchema -> GenericOrcReader.buildReader(projection, fileSchema);
+ }
+
+ private <I, O> Iterable<O> execute(Iterable<I> objects, Function<I, O> func)
{
+ Queue<O> output = new ConcurrentLinkedQueue<>();
+
+ Tasks.foreach(objects)
+ .executeWith(workerPool)
+ .stopOnFailure()
+ .onFailure((object, exc) -> LOG.error("Failed to process {}", object,
exc))
+ .run(object -> output.add(func.apply(object)));
+
+ return output;
+ }
+
+ // estimates the memory required to cache position deletes (in bytes)
+ private long estimatePosDeletesSize(DeleteFile deleteFile) {
+ // the space consumption highly depends on the nature of deleted positions
(sparse vs compact)
+ // testing shows Roaring bitmaps require around 8 bits (1 byte) per value
on average
+ return deleteFile.recordCount();
+ }
+
+ // estimates the memory required to cache equality deletes (in bytes)
+ private long estimateEqDeletesSize(DeleteFile deleteFile, Schema projection)
{
+ try {
+ long recordCount = deleteFile.recordCount();
+ int recordSize = estimateRecordSize(projection);
+ return LongMath.checkedMultiply(recordCount, recordSize);
+ } catch (ArithmeticException e) {
+ return Long.MAX_VALUE;
+ }
+ }
+
+ private int estimateRecordSize(Schema schema) {
+ return schema.columns().stream().mapToInt(TypeUtil::estimateSize).sum();
+ }
+}
diff --git a/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java
b/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java
index 55acd32008..e7d8445cf8 100644
--- a/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java
+++ b/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java
@@ -25,25 +25,16 @@ import java.util.Set;
import java.util.function.Predicate;
import org.apache.iceberg.Accessor;
import org.apache.iceberg.DeleteFile;
-import org.apache.iceberg.FileContent;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.Schema;
import org.apache.iceberg.StructLike;
-import org.apache.iceberg.avro.Avro;
-import org.apache.iceberg.data.avro.DataReader;
-import org.apache.iceberg.data.orc.GenericOrcReader;
-import org.apache.iceberg.data.parquet.GenericParquetReaders;
import org.apache.iceberg.deletes.DeleteCounter;
import org.apache.iceberg.deletes.Deletes;
import org.apache.iceberg.deletes.PositionDeleteIndex;
-import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.InputFile;
-import org.apache.iceberg.orc.ORC;
-import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
-import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Multimap;
@@ -58,8 +49,6 @@ import org.slf4j.LoggerFactory;
public abstract class DeleteFilter<T> {
private static final Logger LOG =
LoggerFactory.getLogger(DeleteFilter.class);
- private static final Schema POS_DELETE_SCHEMA =
- new Schema(MetadataColumns.DELETE_FILE_PATH,
MetadataColumns.DELETE_FILE_POS);
private final String filePath;
private final List<DeleteFile> posDeletes;
@@ -70,6 +59,7 @@ public abstract class DeleteFilter<T> {
private final int isDeletedColumnPosition;
private final DeleteCounter counter;
+ private volatile DeleteLoader deleteLoader = null;
private PositionDeleteIndex deleteRowPositions = null;
private List<Predicate<T>> isInDeleteSets = null;
private Predicate<T> eqDeleteRows = null;
@@ -143,10 +133,30 @@ public abstract class DeleteFilter<T> {
protected abstract InputFile getInputFile(String location);
+ protected InputFile loadInputFile(DeleteFile deleteFile) {
+ return getInputFile(deleteFile.path().toString());
+ }
+
protected long pos(T record) {
return (Long) posAccessor.get(asStructLike(record));
}
+ protected DeleteLoader newDeleteLoader() {
+ return new BaseDeleteLoader(this::loadInputFile);
+ }
+
+ private DeleteLoader deleteLoader() {
+ if (deleteLoader == null) {
+ synchronized (this) {
+ if (deleteLoader == null) {
+ this.deleteLoader = newDeleteLoader();
+ }
+ }
+ }
+
+ return deleteLoader;
+ }
+
public CloseableIterable<T> filter(CloseableIterable<T> records) {
return applyEqDeletes(applyPosDeletes(records));
}
@@ -173,22 +183,11 @@ public abstract class DeleteFilter<T> {
Iterable<DeleteFile> deletes = entry.getValue();
Schema deleteSchema = TypeUtil.select(requiredSchema, ids);
- InternalRecordWrapper wrapper = new
InternalRecordWrapper(deleteSchema.asStruct());
// a projection to select and reorder fields of the file schema to match
the delete rows
StructProjection projectRow = StructProjection.create(requiredSchema,
deleteSchema);
- Iterable<CloseableIterable<Record>> deleteRecords =
- Iterables.transform(deletes, delete -> openDeletes(delete,
deleteSchema));
-
- // copy the delete records because they will be held in a set
- CloseableIterable<Record> records =
- CloseableIterable.transform(CloseableIterable.concat(deleteRecords),
Record::copy);
-
- StructLikeSet deleteSet =
- Deletes.toEqualitySet(
- CloseableIterable.transform(records, wrapper::copyFor),
deleteSchema.asStruct());
-
+ StructLikeSet deleteSet = deleteLoader().loadEqualityDeletes(deletes,
deleteSchema);
Predicate<T> isInDeleteSet =
record -> deleteSet.contains(projectRow.wrap(asStructLike(record)));
isInDeleteSets.add(isInDeleteSet);
@@ -224,14 +223,10 @@ public abstract class DeleteFilter<T> {
}
public PositionDeleteIndex deletedRowPositions() {
- if (posDeletes.isEmpty()) {
- return null;
+ if (deleteRowPositions == null && !posDeletes.isEmpty()) {
+ this.deleteRowPositions = deleteLoader().loadPositionDeletes(posDeletes,
filePath);
}
- if (deleteRowPositions == null) {
- List<CloseableIterable<Record>> deletes = Lists.transform(posDeletes,
this::openPosDeletes);
- deleteRowPositions = Deletes.toPositionIndex(filePath, deletes);
- }
return deleteRowPositions;
}
@@ -240,9 +235,7 @@ public abstract class DeleteFilter<T> {
return records;
}
- List<CloseableIterable<Record>> deletes = Lists.transform(posDeletes,
this::openPosDeletes);
-
- PositionDeleteIndex positionIndex = Deletes.toPositionIndex(filePath,
deletes);
+ PositionDeleteIndex positionIndex = deletedRowPositions();
Predicate<T> isDeleted = record -> positionIndex.isDeleted(pos(record));
return createDeleteIterable(records, isDeleted);
}
@@ -254,56 +247,6 @@ public abstract class DeleteFilter<T> {
: Deletes.filterDeleted(records, isDeleted, counter);
}
- private CloseableIterable<Record> openPosDeletes(DeleteFile file) {
- return openDeletes(file, POS_DELETE_SCHEMA);
- }
-
- private CloseableIterable<Record> openDeletes(DeleteFile deleteFile, Schema
deleteSchema) {
- LOG.trace("Opening delete file {}", deleteFile.path());
- InputFile input = getInputFile(deleteFile.path().toString());
- switch (deleteFile.format()) {
- case AVRO:
- return Avro.read(input)
- .project(deleteSchema)
- .reuseContainers()
- .createReaderFunc(DataReader::create)
- .build();
-
- case PARQUET:
- Parquet.ReadBuilder builder =
- Parquet.read(input)
- .project(deleteSchema)
- .reuseContainers()
- .createReaderFunc(
- fileSchema ->
GenericParquetReaders.buildReader(deleteSchema, fileSchema));
-
- if (deleteFile.content() == FileContent.POSITION_DELETES) {
-
builder.filter(Expressions.equal(MetadataColumns.DELETE_FILE_PATH.name(),
filePath));
- }
-
- return builder.build();
-
- case ORC:
- // Reusing containers is automatic for ORC. No need to set
'reuseContainers' here.
- ORC.ReadBuilder orcBuilder =
- ORC.read(input)
- .project(deleteSchema)
- .createReaderFunc(
- fileSchema -> GenericOrcReader.buildReader(deleteSchema,
fileSchema));
-
- if (deleteFile.content() == FileContent.POSITION_DELETES) {
-
orcBuilder.filter(Expressions.equal(MetadataColumns.DELETE_FILE_PATH.name(),
filePath));
- }
-
- return orcBuilder.build();
- default:
- throw new UnsupportedOperationException(
- String.format(
- "Cannot read deletes, %s is not a supported format: %s",
- deleteFile.format().name(), deleteFile.path()));
- }
- }
-
private static Schema fileProjection(
Schema tableSchema,
Schema requestedSchema,
diff --git a/data/src/main/java/org/apache/iceberg/data/DeleteLoader.java
b/data/src/main/java/org/apache/iceberg/data/DeleteLoader.java
new file mode 100644
index 0000000000..07bdce6d83
--- /dev/null
+++ b/data/src/main/java/org/apache/iceberg/data/DeleteLoader.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.data;
+
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.deletes.PositionDeleteIndex;
+import org.apache.iceberg.util.StructLikeSet;
+
+/** An API for loading delete file content into in-memory data structures. */
+public interface DeleteLoader {
+ /**
+ * Loads the content of equality delete files into a set.
+ *
+ * @param deleteFiles equality delete files
+ * @param projection a projection of columns to load
+ * @return a set of equality deletes
+ */
+ StructLikeSet loadEqualityDeletes(Iterable<DeleteFile> deleteFiles, Schema
projection);
+
+ /**
+ * Loads the content of position delete files for a given data file path
into a position index.
+ *
+ * @param deleteFiles position delete files
+ * @param filePath the data file path for which to load deletes
+ * @return a position delete index for the provided data file path
+ */
+ PositionDeleteIndex loadPositionDeletes(Iterable<DeleteFile> deleteFiles,
CharSequence filePath);
+}
diff --git a/spark/v3.5/build.gradle b/spark/v3.5/build.gradle
index 2c58281904..eeef75be63 100644
--- a/spark/v3.5/build.gradle
+++ b/spark/v3.5/build.gradle
@@ -89,6 +89,8 @@
project(":iceberg-spark:iceberg-spark-${sparkMajorVersion}_${scalaVersion}") {
exclude group: 'com.google.code.findbugs', module: 'jsr305'
}
+ implementation libs.caffeine
+
testImplementation(libs.hadoop2.minicluster) {
exclude group: 'org.apache.avro', module: 'avro'
// to make sure netty libs only come from project(':iceberg-arrow')
diff --git
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkConfParser.java
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkConfParser.java
index d86246b1e6..e3b01b8375 100644
---
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkConfParser.java
+++
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkConfParser.java
@@ -18,15 +18,19 @@
*/
package org.apache.iceberg.spark;
+import java.time.Duration;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.function.Function;
import org.apache.iceberg.Table;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.spark.network.util.JavaUtils;
import org.apache.spark.sql.RuntimeConfig;
import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.internal.SQLConf;
class SparkConfParser {
@@ -34,6 +38,12 @@ class SparkConfParser {
private final RuntimeConfig sessionConf;
private final Map<String, String> options;
+ SparkConfParser() {
+ this.properties = ImmutableMap.of();
+ this.sessionConf = new RuntimeConfig(SQLConf.get());
+ this.options = ImmutableMap.of();
+ }
+
SparkConfParser(SparkSession spark, Table table, Map<String, String>
options) {
this.properties = table.properties();
this.sessionConf = spark.conf();
@@ -56,6 +66,10 @@ class SparkConfParser {
return new StringConfParser();
}
+ public DurationConfParser durationConf() {
+ return new DurationConfParser();
+ }
+
class BooleanConfParser extends ConfParser<BooleanConfParser, Boolean> {
private Boolean defaultValue;
private boolean negate = false;
@@ -156,6 +170,33 @@ class SparkConfParser {
}
}
+ class DurationConfParser extends ConfParser<DurationConfParser, Duration> {
+ private Duration defaultValue;
+
+ @Override
+ protected DurationConfParser self() {
+ return this;
+ }
+
+ public DurationConfParser defaultValue(Duration value) {
+ this.defaultValue = value;
+ return self();
+ }
+
+ public Duration parse() {
+ Preconditions.checkArgument(defaultValue != null, "Default value cannot
be null");
+ return parse(this::toDuration, defaultValue);
+ }
+
+ public Duration parseOptional() {
+ return parse(this::toDuration, defaultValue);
+ }
+
+ private Duration toDuration(String time) {
+ return Duration.ofSeconds(JavaUtils.timeStringAsSec(time));
+ }
+ }
+
abstract class ConfParser<ThisT, T> {
private final List<String> optionNames = Lists.newArrayList();
private String sessionConfName;
diff --git
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkExecutorCache.java
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkExecutorCache.java
new file mode 100644
index 0000000000..6490d6678b
--- /dev/null
+++
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkExecutorCache.java
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import java.time.Duration;
+import java.util.List;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An executor cache for reducing the computation and IO overhead in tasks.
+ *
+ * <p>The cache is configured and controlled through Spark SQL properties. It
supports both limits
+ * on the total cache size and maximum size for individual entries.
Additionally, it implements
+ * automatic eviction of entries after a specified duration of inactivity. The
cache will respect
+ * the SQL configuration valid at the time of initialization. All subsequent
changes to the
+ * configuration will have no effect.
+ *
+ * <p>The cache is accessed and populated via {@link #getOrLoad(String,
String, Supplier, long)}. If
+ * the value is not present in the cache, it is computed using the provided
supplier and stored in
+ * the cache, subject to the defined size constraints. When a key is added, it
must be associated
+ * with a particular group ID. Once the group is no longer needed, it is
recommended to explicitly
+ * invalidate its state by calling {@link #invalidate(String)} instead of
relying on automatic
+ * eviction.
+ *
+ * <p>Note that this class employs the singleton pattern to ensure only one
cache exists per JVM.
+ */
+public class SparkExecutorCache {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(SparkExecutorCache.class);
+
+ private static volatile SparkExecutorCache instance = null;
+
+ private final Duration timeout;
+ private final long maxEntrySize;
+ private final long maxTotalSize;
+ private volatile Cache<String, CacheValue> state;
+
+ private SparkExecutorCache(Conf conf) {
+ this.timeout = conf.timeout();
+ this.maxEntrySize = conf.maxEntrySize();
+ this.maxTotalSize = conf.maxTotalSize();
+ }
+
+ /**
+ * Returns the cache if created or creates and returns it.
+ *
+ * <p>Note that this method returns {@code null} if caching is disabled.
+ */
+ public static SparkExecutorCache getOrCreate() {
+ if (instance == null) {
+ Conf conf = new Conf();
+ if (conf.cacheEnabled()) {
+ synchronized (SparkExecutorCache.class) {
+ if (instance == null) {
+ SparkExecutorCache.instance = new SparkExecutorCache(conf);
+ }
+ }
+ }
+ }
+
+ return instance;
+ }
+
+ /** Returns the cache if already created or null otherwise. */
+ public static SparkExecutorCache get() {
+ return instance;
+ }
+
+ /** Returns the max entry size in bytes that will be considered for caching.
*/
+ public long maxEntrySize() {
+ return maxEntrySize;
+ }
+
+ /**
+ * Gets the cached value for the key or populates the cache with a new
mapping.
+ *
+ * @param group a group ID
+ * @param key a cache key
+ * @param valueSupplier a supplier to compute the value
+ * @param valueSize an estimated memory size of the value in bytes
+ * @return the cached or computed value
+ */
+ public <V> V getOrLoad(String group, String key, Supplier<V> valueSupplier,
long valueSize) {
+ if (valueSize > maxEntrySize) {
+ LOG.debug("{} exceeds max entry size: {} > {}", key, valueSize,
maxEntrySize);
+ return valueSupplier.get();
+ }
+
+ String internalKey = group + "_" + key;
+ CacheValue value = state().get(internalKey, loadFunc(valueSupplier,
valueSize));
+ Preconditions.checkNotNull(value, "Loaded value must not be null");
+ return value.get();
+ }
+
+ private <V> Function<String, CacheValue> loadFunc(Supplier<V> valueSupplier,
long valueSize) {
+ return key -> {
+ long start = System.currentTimeMillis();
+ V value = valueSupplier.get();
+ long end = System.currentTimeMillis();
+ LOG.debug("Loaded {} with size {} in {} ms", key, valueSize, (end -
start));
+ return new CacheValue(value, valueSize);
+ };
+ }
+
+ /**
+ * Invalidates all keys associated with the given group ID.
+ *
+ * @param group a group ID
+ */
+ public void invalidate(String group) {
+ if (state != null) {
+ List<String> internalKeys = findInternalKeys(group);
+ LOG.info("Invalidating {} keys associated with {}", internalKeys.size(),
group);
+ internalKeys.forEach(internalKey -> state.invalidate(internalKey));
+ LOG.info("Current cache stats {}", state.stats());
+ }
+ }
+
+ private List<String> findInternalKeys(String group) {
+ return state.asMap().keySet().stream()
+ .filter(internalKey -> internalKey.startsWith(group))
+ .collect(Collectors.toList());
+ }
+
+ private Cache<String, CacheValue> state() {
+ if (state == null) {
+ synchronized (this) {
+ if (state == null) {
+ LOG.info("Initializing cache state");
+ this.state = initState();
+ }
+ }
+ }
+
+ return state;
+ }
+
+ private Cache<String, CacheValue> initState() {
+ return Caffeine.newBuilder()
+ .expireAfterAccess(timeout)
+ .maximumWeight(maxTotalSize)
+ .weigher((key, value) -> ((CacheValue) value).weight())
+ .recordStats()
+ .removalListener((key, value, cause) -> LOG.debug("Evicted {} ({})",
key, cause))
+ .build();
+ }
+
+ @VisibleForTesting
+ static class CacheValue {
+ private final Object value;
+ private final long size;
+
+ CacheValue(Object value, long size) {
+ this.value = value;
+ this.size = size;
+ }
+
+ @SuppressWarnings("unchecked")
+ public <V> V get() {
+ return (V) value;
+ }
+
+ public int weight() {
+ return (int) Math.min(size, Integer.MAX_VALUE);
+ }
+ }
+
+ @VisibleForTesting
+ static class Conf {
+ private final SparkConfParser confParser = new SparkConfParser();
+
+ public boolean cacheEnabled() {
+ return confParser
+ .booleanConf()
+ .sessionConf(SparkSQLProperties.EXECUTOR_CACHE_ENABLED)
+ .defaultValue(SparkSQLProperties.EXECUTOR_CACHE_ENABLED_DEFAULT)
+ .parse();
+ }
+
+ public Duration timeout() {
+ return confParser
+ .durationConf()
+ .sessionConf(SparkSQLProperties.EXECUTOR_CACHE_TIMEOUT)
+ .defaultValue(SparkSQLProperties.EXECUTOR_CACHE_TIMEOUT_DEFAULT)
+ .parse();
+ }
+
+ public long maxEntrySize() {
+ return confParser
+ .longConf()
+ .sessionConf(SparkSQLProperties.EXECUTOR_CACHE_MAX_ENTRY_SIZE)
+
.defaultValue(SparkSQLProperties.EXECUTOR_CACHE_MAX_ENTRY_SIZE_DEFAULT)
+ .parse();
+ }
+
+ public long maxTotalSize() {
+ return confParser
+ .longConf()
+ .sessionConf(SparkSQLProperties.EXECUTOR_CACHE_MAX_TOTAL_SIZE)
+
.defaultValue(SparkSQLProperties.EXECUTOR_CACHE_MAX_TOTAL_SIZE_DEFAULT)
+ .parse();
+ }
+ }
+}
diff --git
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java
index bca41b4155..4a66520231 100644
---
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java
+++
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java
@@ -18,6 +18,8 @@
*/
package org.apache.iceberg.spark;
+import java.time.Duration;
+
public class SparkSQLProperties {
private SparkSQLProperties() {}
@@ -70,4 +72,18 @@ public class SparkSQLProperties {
// Controls whether to report locality information to Spark while allocating
input partitions
public static final String LOCALITY = "spark.sql.iceberg.locality.enabled";
+
+ public static final String EXECUTOR_CACHE_ENABLED =
"spark.sql.iceberg.executor-cache.enabled";
+ public static final boolean EXECUTOR_CACHE_ENABLED_DEFAULT = true;
+
+ public static final String EXECUTOR_CACHE_TIMEOUT =
"spark.sql.iceberg.executor-cache.timeout";
+ public static final Duration EXECUTOR_CACHE_TIMEOUT_DEFAULT =
Duration.ofMinutes(10);
+
+ public static final String EXECUTOR_CACHE_MAX_ENTRY_SIZE =
+ "spark.sql.iceberg.executor-cache.max-entry-size";
+ public static final long EXECUTOR_CACHE_MAX_ENTRY_SIZE_DEFAULT = 64 * 1024 *
1024; // 64 MB
+
+ public static final String EXECUTOR_CACHE_MAX_TOTAL_SIZE =
+ "spark.sql.iceberg.executor-cache.max-total-size";
+ public static final long EXECUTOR_CACHE_MAX_TOTAL_SIZE_DEFAULT = 128 * 1024
* 1024; // 128 MB
}
diff --git
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseReader.java
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseReader.java
index 4fb838202c..c2b3e7c2dc 100644
---
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseReader.java
+++
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseReader.java
@@ -25,6 +25,8 @@ import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.function.Function;
+import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.avro.generic.GenericData;
@@ -40,7 +42,9 @@ import org.apache.iceberg.Schema;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.data.BaseDeleteLoader;
import org.apache.iceberg.data.DeleteFilter;
+import org.apache.iceberg.data.DeleteLoader;
import org.apache.iceberg.deletes.DeleteCounter;
import org.apache.iceberg.encryption.EncryptedFiles;
import org.apache.iceberg.encryption.EncryptedInputFile;
@@ -50,6 +54,7 @@ import org.apache.iceberg.mapping.NameMapping;
import org.apache.iceberg.mapping.NameMappingParser;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.spark.SparkExecutorCache;
import org.apache.iceberg.spark.SparkSchemaUtil;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types.NestedField;
@@ -279,5 +284,29 @@ abstract class BaseReader<T, TaskT extends ScanTask>
implements Closeable {
counter().increment();
}
}
+
+ @Override
+ protected DeleteLoader newDeleteLoader() {
+ return new CachingDeleteLoader(this::loadInputFile);
+ }
+
+ private class CachingDeleteLoader extends BaseDeleteLoader {
+ private final SparkExecutorCache cache;
+
+ CachingDeleteLoader(Function<DeleteFile, InputFile> loadInputFile) {
+ super(loadInputFile);
+ this.cache = SparkExecutorCache.getOrCreate();
+ }
+
+ @Override
+ protected boolean canCache(long size) {
+ return cache != null && size < cache.maxEntrySize();
+ }
+
+ @Override
+ protected <V> V getOrLoad(String key, Supplier<V> valueSupplier, long
valueSize) {
+ return cache.getOrLoad(table().name(), key, valueSupplier, valueSize);
+ }
+ }
}
}
diff --git
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SerializableTableWithSize.java
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SerializableTableWithSize.java
index 65df29051c..f6913fb9d0 100644
---
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SerializableTableWithSize.java
+++
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SerializableTableWithSize.java
@@ -21,6 +21,7 @@ package org.apache.iceberg.spark.source;
import org.apache.iceberg.BaseMetadataTable;
import org.apache.iceberg.SerializableTable;
import org.apache.iceberg.Table;
+import org.apache.iceberg.spark.SparkExecutorCache;
import org.apache.spark.util.KnownSizeEstimation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -67,6 +68,7 @@ public class SerializableTableWithSize extends
SerializableTable
LOG.info("Releasing resources");
io().close();
}
+ invalidateCache(name());
}
public static class SerializableMetadataTableWithSize extends
SerializableMetadataTable
@@ -93,6 +95,14 @@ public class SerializableTableWithSize extends
SerializableTable
LOG.info("Releasing resources");
io().close();
}
+ invalidateCache(name());
+ }
+ }
+
+ private static void invalidateCache(String name) {
+ SparkExecutorCache cache = SparkExecutorCache.get();
+ if (cache != null) {
+ cache.invalidate(name);
}
}
}
diff --git
a/core/src/main/java/org/apache/iceberg/deletes/BitmapPositionDeleteIndex.java
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/Employee.java
similarity index 52%
copy from
core/src/main/java/org/apache/iceberg/deletes/BitmapPositionDeleteIndex.java
copy to spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/Employee.java
index 7690ab7e48..9c57936d98 100644
---
a/core/src/main/java/org/apache/iceberg/deletes/BitmapPositionDeleteIndex.java
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/Employee.java
@@ -16,34 +16,51 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iceberg.deletes;
+package org.apache.iceberg.spark;
-import org.roaringbitmap.longlong.Roaring64Bitmap;
+import java.util.Objects;
-class BitmapPositionDeleteIndex implements PositionDeleteIndex {
- private final Roaring64Bitmap roaring64Bitmap;
+public class Employee {
+ private Integer id;
+ private String dep;
- BitmapPositionDeleteIndex() {
- roaring64Bitmap = new Roaring64Bitmap();
+ public Employee() {}
+
+ public Employee(Integer id, String dep) {
+ this.id = id;
+ this.dep = dep;
}
- @Override
- public void delete(long position) {
- roaring64Bitmap.add(position);
+ public Integer getId() {
+ return id;
}
- @Override
- public void delete(long posStart, long posEnd) {
- roaring64Bitmap.add(posStart, posEnd);
+ public void setId(Integer id) {
+ this.id = id;
+ }
+
+ public String getDep() {
+ return dep;
+ }
+
+ public void setDep(String dep) {
+ this.dep = dep;
}
@Override
- public boolean isDeleted(long position) {
- return roaring64Bitmap.contains(position);
+ public boolean equals(Object other) {
+ if (this == other) {
+ return true;
+ } else if (other == null || getClass() != other.getClass()) {
+ return false;
+ }
+
+ Employee employee = (Employee) other;
+ return Objects.equals(id, employee.id) && Objects.equals(dep,
employee.dep);
}
@Override
- public boolean isEmpty() {
- return roaring64Bitmap.isEmpty();
+ public int hashCode() {
+ return Objects.hash(id, dep);
}
}
diff --git
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestSparkExecutorCache.java
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestSparkExecutorCache.java
new file mode 100644
index 0000000000..016319884c
--- /dev/null
+++
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestSparkExecutorCache.java
@@ -0,0 +1,503 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark;
+
+import static org.apache.iceberg.RowLevelOperationMode.COPY_ON_WRITE;
+import static org.apache.iceberg.RowLevelOperationMode.MERGE_ON_READ;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import java.io.File;
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.ContentScanTask;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.Parameters;
+import org.apache.iceberg.RowLevelOperationMode;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.data.FileHelpers;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.io.SeekableInputStream;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.spark.SparkExecutorCache.CacheValue;
+import org.apache.iceberg.spark.SparkExecutorCache.Conf;
+import org.apache.iceberg.util.CharSequenceSet;
+import org.apache.iceberg.util.Pair;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.TestTemplate;
+
+public class TestSparkExecutorCache extends TestBaseWithCatalog {
+
+ @Parameters(name = "catalogName = {0}, implementation = {1}, config = {2}")
+ protected static Object[][] parameters() {
+ return new Object[][] {
+ {
+ "testhive",
+ SparkCatalog.class.getName(),
+ ImmutableMap.of(
+ "type",
+ "hive",
+ CatalogProperties.FILE_IO_IMPL,
+ CustomFileIO.class.getName(),
+ "default-namespace",
+ "default")
+ },
+ };
+ }
+
+ private static final String UPDATES_VIEW_NAME = "updates";
+ private static final AtomicInteger JOB_COUNTER = new AtomicInteger();
+ private static final Map<String, CustomInputFile> INPUT_FILES =
+ Collections.synchronizedMap(Maps.newHashMap());
+
+ private String targetTableName;
+ private TableIdentifier targetTableIdent;
+
+ @BeforeEach
+ public void configureTargetTableName() {
+ String name = "target_exec_cache_" + JOB_COUNTER.incrementAndGet();
+ this.targetTableName = tableName(name);
+ this.targetTableIdent = TableIdentifier.of(Namespace.of("default"), name);
+ }
+
+ @AfterEach
+ public void releaseResources() {
+ sql("DROP TABLE IF EXISTS %s", targetTableName);
+ sql("DROP TABLE IF EXISTS %s", UPDATES_VIEW_NAME);
+ INPUT_FILES.clear();
+ }
+
+ @TestTemplate
+ public void testCacheValueWeightOverflow() {
+ CacheValue cacheValue = new CacheValue("v", Integer.MAX_VALUE + 1L);
+ assertThat(cacheValue.weight()).isEqualTo(Integer.MAX_VALUE);
+ }
+
+ @TestTemplate
+ public void testCacheEnabledConfig() {
+ withSQLConf(
+ ImmutableMap.of(SparkSQLProperties.EXECUTOR_CACHE_ENABLED, "true"),
+ () -> {
+ Conf conf = new Conf();
+ assertThat(conf.cacheEnabled()).isTrue();
+ });
+
+ withSQLConf(
+ ImmutableMap.of(SparkSQLProperties.EXECUTOR_CACHE_ENABLED, "false"),
+ () -> {
+ Conf conf = new Conf();
+ assertThat(conf.cacheEnabled()).isFalse();
+ });
+ }
+
+ @TestTemplate
+ public void testTimeoutConfig() {
+ withSQLConf(
+ ImmutableMap.of(SparkSQLProperties.EXECUTOR_CACHE_TIMEOUT, "10s"),
+ () -> {
+ Conf conf = new Conf();
+ assertThat(conf.timeout()).hasSeconds(10);
+ });
+
+ withSQLConf(
+ ImmutableMap.of(SparkSQLProperties.EXECUTOR_CACHE_TIMEOUT, "2m"),
+ () -> {
+ Conf conf = new Conf();
+ assertThat(conf.timeout()).hasMinutes(2);
+ });
+ }
+
+ @TestTemplate
+ public void testMaxEntrySizeConfig() {
+ withSQLConf(
+ ImmutableMap.of(SparkSQLProperties.EXECUTOR_CACHE_MAX_ENTRY_SIZE,
"128"),
+ () -> {
+ Conf conf = new Conf();
+ assertThat(conf.maxEntrySize()).isEqualTo(128L);
+ });
+ }
+
+ @TestTemplate
+ public void testMaxTotalSizeConfig() {
+ withSQLConf(
+ ImmutableMap.of(SparkSQLProperties.EXECUTOR_CACHE_MAX_TOTAL_SIZE,
"512"),
+ () -> {
+ Conf conf = new Conf();
+ assertThat(conf.maxTotalSize()).isEqualTo(512L);
+ });
+ }
+
+ @TestTemplate
+ public void testConcurrentAccess() throws InterruptedException {
+ SparkExecutorCache cache = SparkExecutorCache.getOrCreate();
+
+ String table1 = "table1";
+ String table2 = "table2";
+
+ Set<String> loadedInternalKeys = Sets.newHashSet();
+
+ String key1 = "key1";
+ String key2 = "key2";
+
+ long valueSize = 100L;
+
+ int threadCount = 10;
+ ExecutorService executorService =
Executors.newFixedThreadPool(threadCount);
+
+ for (int threadNumber = 0; threadNumber < threadCount; threadNumber++) {
+ String group = threadNumber % 2 == 0 ? table1 : table2;
+ executorService.submit(
+ () -> {
+ for (int batch = 0; batch < 3; batch++) {
+ cache.getOrLoad(
+ group,
+ key1,
+ () -> {
+ String internalKey = toInternalKey(group, key1);
+ synchronized (loadedInternalKeys) {
+ // verify only one load was done for this key
+
assertThat(loadedInternalKeys.contains(internalKey)).isFalse();
+ loadedInternalKeys.add(internalKey);
+ }
+ return "value1";
+ },
+ valueSize);
+
+ cache.getOrLoad(
+ group,
+ key2,
+ () -> {
+ String internalKey = toInternalKey(group, key2);
+ synchronized (loadedInternalKeys) {
+ // verify only one load was done for this key
+
assertThat(loadedInternalKeys.contains(internalKey)).isFalse();
+ loadedInternalKeys.add(internalKey);
+ }
+ return "value2";
+ },
+ valueSize);
+ }
+ });
+ }
+
+ executorService.shutdown();
+ assertThat(executorService.awaitTermination(1, TimeUnit.MINUTES)).isTrue();
+
+ cache.invalidate(table1);
+ cache.invalidate(table2);
+
+ // all keys must be invalidated
+ Cache<String, ?> state = fetchInternalCacheState();
+ Set<String> liveKeys = state.asMap().keySet();
+ assertThat(liveKeys).noneMatch(key -> key.startsWith(table1) ||
key.startsWith(table2));
+ }
+
+ @TestTemplate
+ public void testCopyOnWriteDelete() throws Exception {
+ checkDelete(COPY_ON_WRITE);
+ }
+
+ @TestTemplate
+ public void testMergeOnReadDelete() throws Exception {
+ checkDelete(MERGE_ON_READ);
+ }
+
+ private void checkDelete(RowLevelOperationMode mode) throws Exception {
+ List<DeleteFile> deleteFiles =
createAndInitTable(TableProperties.DELETE_MODE, mode);
+
+ sql("DELETE FROM %s WHERE id = 1 OR id = 4", targetTableName);
+
+ // there are 2 data files and 2 delete files that apply to both of them
+ // in CoW, the target table will be scanned 2 times (main query + runtime
filter)
+ // the runtime filter may invalidate the cache, so only verify that at least
some requests were cache hits
+ // in MoR, the target table will be scanned only once
+ // so each delete file must be opened once
+ int maxRequestCount = mode == COPY_ON_WRITE ? 3 : 1;
+ assertThat(deleteFiles).allMatch(deleteFile -> streamCount(deleteFile) <=
maxRequestCount);
+
+ // verify the final set of records is correct
+ assertEquals(
+ "Should have expected rows",
+ ImmutableList.of(),
+ sql("SELECT * FROM %s ORDER BY id ASC", targetTableName));
+ }
+
+ @TestTemplate
+ public void testCopyOnWriteUpdate() throws Exception {
+ checkUpdate(COPY_ON_WRITE);
+ }
+
+ @TestTemplate
+ public void testMergeOnReadUpdate() throws Exception {
+ checkUpdate(MERGE_ON_READ);
+ }
+
+ private void checkUpdate(RowLevelOperationMode mode) throws Exception {
+ List<DeleteFile> deleteFiles =
createAndInitTable(TableProperties.UPDATE_MODE, mode);
+
+ Dataset<Integer> updateDS = spark.createDataset(ImmutableList.of(1, 4),
Encoders.INT());
+ updateDS.createOrReplaceTempView(UPDATES_VIEW_NAME);
+
+ sql("UPDATE %s SET id = -1 WHERE id IN (SELECT * FROM %s)",
targetTableName, UPDATES_VIEW_NAME);
+
+ // there are 2 data files and 2 delete files that apply to both of them
+ // in CoW, the target table will be scanned 3 times (2 in main query +
runtime filter)
+ // the runtime filter may invalidate the cache, so only verify that at least
some requests were cache hits
+ // in MoR, the target table will be scanned only once
+ // so each delete file must be opened once
+ int maxRequestCount = mode == COPY_ON_WRITE ? 5 : 1;
+ assertThat(deleteFiles).allMatch(deleteFile -> streamCount(deleteFile) <=
maxRequestCount);
+
+ // verify the final set of records is correct
+ assertEquals(
+ "Should have expected rows",
+ ImmutableList.of(row(-1, "hr"), row(-1, "hr")),
+ sql("SELECT * FROM %s ORDER BY id ASC", targetTableName));
+ }
+
+ @TestTemplate
+ public void testCopyOnWriteMerge() throws Exception {
+ checkMerge(COPY_ON_WRITE);
+ }
+
+ @TestTemplate
+ public void testMergeOnReadMerge() throws Exception {
+ checkMerge(MERGE_ON_READ);
+ }
+
+ private void checkMerge(RowLevelOperationMode mode) throws Exception {
+ List<DeleteFile> deleteFiles =
createAndInitTable(TableProperties.MERGE_MODE, mode);
+
+ Dataset<Integer> updateDS = spark.createDataset(ImmutableList.of(1, 4),
Encoders.INT());
+ updateDS.createOrReplaceTempView(UPDATES_VIEW_NAME);
+
+ sql(
+ "MERGE INTO %s t USING %s s "
+ + "ON t.id == s.value "
+ + "WHEN MATCHED THEN "
+ + " UPDATE SET id = 100 "
+ + "WHEN NOT MATCHED THEN "
+ + " INSERT (id, dep) VALUES (-1, 'unknown')",
+ targetTableName, UPDATES_VIEW_NAME);
+
+ // there are 2 data files and 2 delete files that apply to both of them
+ // in CoW, the target table will be scanned 2 times (main query + runtime
filter)
+ // the runtime filter may invalidate the cache, so only verify that at least
some requests were cache hits
+ // in MoR, the target table will be scanned only once
+ // so each delete file must be opened once
+ int maxRequestCount = mode == COPY_ON_WRITE ? 3 : 1;
+ assertThat(deleteFiles).allMatch(deleteFile -> streamCount(deleteFile) <=
maxRequestCount);
+
+ // verify the final set of records is correct
+ assertEquals(
+ "Should have expected rows",
+ ImmutableList.of(row(100, "hr"), row(100, "hr")),
+ sql("SELECT * FROM %s ORDER BY id ASC", targetTableName));
+ }
+
+ private int streamCount(DeleteFile deleteFile) {
+ CustomInputFile inputFile = INPUT_FILES.get(deleteFile.path().toString());
+ return inputFile.streamCount();
+ }
+
+ private List<DeleteFile> createAndInitTable(String operation,
RowLevelOperationMode mode)
+ throws Exception {
+ sql(
+ "CREATE TABLE %s (id INT, dep STRING) "
+ + "USING iceberg "
+ + "TBLPROPERTIES ('%s' '%s', '%s' '%s', '%s' '%s')",
+ targetTableName,
+ TableProperties.WRITE_METADATA_LOCATION,
+ temp.toString().replaceFirst("file:", ""),
+ TableProperties.WRITE_DATA_LOCATION,
+ temp.toString().replaceFirst("file:", ""),
+ operation,
+ mode.modeName());
+
+ append(targetTableName, new Employee(0, "hr"), new Employee(1, "hr"), new
Employee(2, "hr"));
+ append(targetTableName, new Employee(3, "hr"), new Employee(4, "hr"), new
Employee(5, "hr"));
+
+ Table table = validationCatalog.loadTable(targetTableIdent);
+
+ List<Pair<CharSequence, Long>> posDeletes =
+ dataFiles(table).stream()
+ .map(dataFile -> Pair.of(dataFile.path(), 0L))
+ .collect(Collectors.toList());
+ Pair<DeleteFile, CharSequenceSet> posDeleteResult = writePosDeletes(table,
posDeletes);
+ DeleteFile posDeleteFile = posDeleteResult.first();
+ CharSequenceSet referencedDataFiles = posDeleteResult.second();
+
+ DeleteFile eqDeleteFile = writeEqDeletes(table, "id", 2, 5);
+
+ table
+ .newRowDelta()
+ .validateFromSnapshot(table.currentSnapshot().snapshotId())
+ .validateDataFilesExist(referencedDataFiles)
+ .addDeletes(posDeleteFile)
+ .addDeletes(eqDeleteFile)
+ .commit();
+
+ sql("REFRESH TABLE %s", targetTableName);
+
+ return ImmutableList.of(posDeleteFile, eqDeleteFile);
+ }
+
+ private DeleteFile writeEqDeletes(Table table, String col, Object... values)
throws IOException {
+ Schema deleteSchema = table.schema().select(col);
+
+ Record delete = GenericRecord.create(deleteSchema);
+ List<Record> deletes = Lists.newArrayList();
+ for (Object value : values) {
+ deletes.add(delete.copy(col, value));
+ }
+
+ OutputFile out = Files.localOutput(new File(temp, "eq-deletes-" +
UUID.randomUUID()));
+ return FileHelpers.writeDeleteFile(table, out, null, deletes,
deleteSchema);
+ }
+
+ private Pair<DeleteFile, CharSequenceSet> writePosDeletes(
+ Table table, List<Pair<CharSequence, Long>> deletes) throws IOException {
+ OutputFile out = Files.localOutput(new File(temp, "pos-deletes-" +
UUID.randomUUID()));
+ return FileHelpers.writeDeleteFile(table, out, null, deletes);
+ }
+
+ private void append(String target, Employee... employees) throws
NoSuchTableException {
+ List<Employee> input = Arrays.asList(employees);
+ Dataset<Row> inputDF = spark.createDataFrame(input, Employee.class);
+ inputDF.coalesce(1).writeTo(target).append();
+ }
+
+ private Collection<DataFile> dataFiles(Table table) {
+ try (CloseableIterable<FileScanTask> tasks = table.newScan().planFiles()) {
+ return ImmutableList.copyOf(Iterables.transform(tasks,
ContentScanTask::file));
+ } catch (IOException e) {
+ throw new RuntimeIOException(e);
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ private static Cache<String, ?> fetchInternalCacheState() {
+ try {
+ Field stateField = SparkExecutorCache.class.getDeclaredField("state");
+ stateField.setAccessible(true);
+ SparkExecutorCache cache = SparkExecutorCache.get();
+ return (Cache<String, ?>) stateField.get(cache);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private static String toInternalKey(String group, String key) {
+ return group + "_" + key;
+ }
+
+ public static class CustomFileIO implements FileIO {
+
+ public CustomFileIO() {}
+
+ @Override
+ public InputFile newInputFile(String path) {
+ return INPUT_FILES.computeIfAbsent(path, key -> new
CustomInputFile(path));
+ }
+
+ @Override
+ public OutputFile newOutputFile(String path) {
+ return Files.localOutput(path);
+ }
+
+ @Override
+ public void deleteFile(String path) {
+ File file = new File(path);
+ if (!file.delete()) {
+ throw new RuntimeIOException("Failed to delete file: " + path);
+ }
+ }
+ }
+
+ public static class CustomInputFile implements InputFile {
+ private final InputFile delegate;
+ private final AtomicInteger streamCount;
+
+ public CustomInputFile(String path) {
+ this.delegate = Files.localInput(path);
+ this.streamCount = new AtomicInteger();
+ }
+
+ @Override
+ public long getLength() {
+ return delegate.getLength();
+ }
+
+ @Override
+ public SeekableInputStream newStream() {
+ streamCount.incrementAndGet();
+ return delegate.newStream();
+ }
+
+ public int streamCount() {
+ return streamCount.get();
+ }
+
+ @Override
+ public String location() {
+ return delegate.location();
+ }
+
+ @Override
+ public boolean exists() {
+ return delegate.exists();
+ }
+ }
+}
diff --git
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestSparkWriteConf.java
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestSparkWriteConf.java
index 30b3b8a885..9f4a4f47bf 100644
---
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestSparkWriteConf.java
+++
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestSparkWriteConf.java
@@ -47,6 +47,7 @@ import static
org.apache.spark.sql.connector.write.RowLevelOperation.Command.UPD
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import java.time.Duration;
import java.util.List;
import java.util.Map;
import org.apache.iceberg.DistributionMode;
@@ -78,6 +79,28 @@ public class TestSparkWriteConf extends TestBaseWithCatalog {
sql("DROP TABLE IF EXISTS %s", tableName);
}
+ @TestTemplate
+ public void testDurationConf() {
+ Table table = validationCatalog.loadTable(tableIdent);
+ String confName = "spark.sql.iceberg.some-duration-conf";
+
+ withSQLConf(
+ ImmutableMap.of(confName, "10s"),
+ () -> {
+ SparkConfParser parser = new SparkConfParser(spark, table,
ImmutableMap.of());
+ Duration duration =
parser.durationConf().sessionConf(confName).parseOptional();
+ assertThat(duration).hasSeconds(10);
+ });
+
+ withSQLConf(
+ ImmutableMap.of(confName, "2m"),
+ () -> {
+ SparkConfParser parser = new SparkConfParser(spark, table,
ImmutableMap.of());
+ Duration duration =
parser.durationConf().sessionConf(confName).parseOptional();
+ assertThat(duration).hasMinutes(2);
+ });
+ }
+
@TestTemplate
public void testDeleteGranularityDefault() {
Table table = validationCatalog.loadTable(tableIdent);