This is an automated email from the ASF dual-hosted git repository.
etudenhoefner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new be25b80f7f Spark 4.1: Pass FileIO on Spark's read path (#15448)
be25b80f7f is described below
commit be25b80f7fa06406343f894463b72ded5f383e9e
Author: Eduard Tudenhoefner <[email protected]>
AuthorDate: Thu Mar 19 15:18:48 2026 +0100
Spark 4.1: Pass FileIO on Spark's read path (#15448)
---
.../TestRemoteScanPlanning.java | 47 +++++++-
.../iceberg/spark/source/BaseBatchReader.java | 4 +-
.../apache/iceberg/spark/source/BaseReader.java | 9 +-
.../apache/iceberg/spark/source/BaseRowReader.java | 4 +-
.../iceberg/spark/source/BatchDataReader.java | 4 +
.../iceberg/spark/source/ChangelogRowReader.java | 4 +
.../spark/source/EqualityDeleteRowReader.java | 4 +-
.../spark/source/PositionDeletesRowReader.java | 6 +-
.../apache/iceberg/spark/source/RowDataReader.java | 5 +-
.../spark/source/SerializableFileIOWithSize.java | 120 +++++++++++++++++++++
.../apache/iceberg/spark/source/SparkBatch.java | 10 +-
.../iceberg/spark/source/SparkChangelogScan.java | 1 +
.../iceberg/spark/source/SparkInputPartition.java | 8 ++
.../spark/source/SparkMicroBatchStream.java | 11 +-
.../spark/source/SparkPartitioningAwareScan.java | 10 +-
.../org/apache/iceberg/spark/source/SparkScan.java | 16 ++-
.../iceberg/spark/source/SparkStagedScan.java | 2 +-
.../apache/iceberg/spark/TestBaseWithCatalog.java | 6 +-
.../iceberg/spark/source/TestBaseReader.java | 2 +-
.../iceberg/spark/source/TestChangelogReader.java | 8 +-
.../spark/source/TestPositionDeletesReader.java | 3 +
.../spark/source/TestSparkReaderDeletes.java | 11 +-
22 files changed, 269 insertions(+), 26 deletions(-)
diff --git a/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoteScanPlanning.java b/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/source/TestRemoteScanPlanning.java
similarity index 53%
rename from spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoteScanPlanning.java
rename to spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/source/TestRemoteScanPlanning.java
index 9c31eb970b..0152c8e0e6 100644
--- a/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoteScanPlanning.java
+++ b/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/source/TestRemoteScanPlanning.java
@@ -16,15 +16,26 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iceberg.spark.extensions;
+package org.apache.iceberg.spark.source;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.function.Supplier;
+import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.ParameterizedTestExtension;
import org.apache.iceberg.Parameters;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.rest.RESTCatalog;
import org.apache.iceberg.rest.RESTCatalogProperties;
import org.apache.iceberg.spark.SparkCatalogConfig;
import org.apache.iceberg.spark.sql.TestSelect;
+import org.apache.spark.sql.connector.read.Batch;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+import org.assertj.core.api.InstanceOfAssertFactories;
+import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;
@ExtendWith(ParameterizedTestExtension.class)
@@ -38,8 +49,6 @@ public class TestRemoteScanPlanning extends TestSelect {
ImmutableMap.builder()
.putAll(SparkCatalogConfig.REST.properties())
.put(CatalogProperties.URI,
restCatalog.properties().get(CatalogProperties.URI))
- // this flag is typically only set by the server, but we set it from the client for
- // testing
.put(
RESTCatalogProperties.SCAN_PLANNING_MODE,
RESTCatalogProperties.ScanPlanningMode.SERVER.modeName())
@@ -48,4 +57,36 @@ public class TestRemoteScanPlanning extends TestSelect {
}
};
}
+
+ @TestTemplate
+ public void fileIOIsPropagated() {
+ RESTCatalog catalog = new RESTCatalog();
+ catalog.setConf(new Configuration());
+ catalog.initialize(
+ "test",
+ ImmutableMap.<String, String>builder()
+ .putAll(restCatalog.properties())
+ .put(
+ RESTCatalogProperties.SCAN_PLANNING_MODE,
+ RESTCatalogProperties.ScanPlanningMode.SERVER.modeName())
+ .build());
+ Table table = catalog.loadTable(tableIdent);
+
+ SparkScanBuilder builder = new SparkScanBuilder(spark, table, CaseInsensitiveStringMap.empty());
+ verifyFileIOHasPlanId(builder.build().toBatch(), table);
+ verifyFileIOHasPlanId(builder.buildCopyOnWriteScan().toBatch(), table);
+ }
+
+ private void verifyFileIOHasPlanId(Batch batch, Table table) {
+ FileIO fileIOForScan =
+ (FileIO)
+ assertThat(batch)
+ .extracting("fileIO")
+ .isInstanceOf(Supplier.class)
+ .asInstanceOf(InstanceOfAssertFactories.type(Supplier.class))
+ .actual()
+ .get();
+ assertThat(fileIOForScan.properties()).containsKey(RESTCatalogProperties.REST_SCAN_PLAN_ID);
+ assertThat(table.io().properties()).doesNotContainKey(RESTCatalogProperties.REST_SCAN_PLAN_ID);
+ }
}
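A note on the assertion pattern in fileIOIsPropagated above: AssertJ's extracting(String) reads a private field by name, so the scan's internal fileIO supplier can be inspected without adding a public accessor. A minimal, hedged sketch of that pattern against an illustrative Holder class (not part of this patch):

    import static org.assertj.core.api.Assertions.assertThat;

    import java.util.function.Supplier;

    class Holder {
      // stands in for the private fileIO field the test inspects
      private final Supplier<String> fileIO = () -> "io";
    }

    class HolderTest {
      void fieldIsASupplier() {
        // extracting("fileIO") reads the private field reflectively
        assertThat(new Holder()).extracting("fileIO").isInstanceOf(Supplier.class);
      }
    }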
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java
index 2dd6a52c26..a2af0964c6 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java
@@ -31,6 +31,7 @@ import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.formats.FormatModelRegistry;
import org.apache.iceberg.formats.ReadBuilder;
import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.spark.OrcBatchReadConf;
@@ -49,13 +50,14 @@ abstract class BaseBatchReader<T extends ScanTask> extends BaseReader<ColumnarBa
BaseBatchReader(
Table table,
+ FileIO fileIO,
ScanTaskGroup<T> taskGroup,
Schema expectedSchema,
boolean caseSensitive,
ParquetBatchReadConf parquetConf,
OrcBatchReadConf orcConf,
boolean cacheDeleteFilesOnExecutors) {
- super(table, taskGroup, expectedSchema, caseSensitive, cacheDeleteFilesOnExecutors);
+ super(table, fileIO, taskGroup, expectedSchema, caseSensitive, cacheDeleteFilesOnExecutors);
this.parquetConf = parquetConf;
this.orcConf = orcConf;
}
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/BaseReader.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/BaseReader.java
index 8adc38f2ae..a4d9766ae7 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/BaseReader.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/BaseReader.java
@@ -45,6 +45,7 @@ import org.apache.iceberg.data.DeleteLoader;
import org.apache.iceberg.deletes.DeleteCounter;
import org.apache.iceberg.encryption.EncryptingFileIO;
import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.mapping.NameMapping;
import org.apache.iceberg.mapping.NameMappingParser;
@@ -68,6 +69,7 @@ abstract class BaseReader<T, TaskT extends ScanTask> implements Closeable {
private static final Logger LOG = LoggerFactory.getLogger(BaseReader.class);
private final Table table;
+ private final EncryptingFileIO fileIO;
private final Schema expectedSchema;
private final boolean caseSensitive;
private final NameMapping nameMapping;
@@ -83,11 +85,13 @@ abstract class BaseReader<T, TaskT extends ScanTask> implements Closeable {
BaseReader(
Table table,
+ FileIO fileIO,
ScanTaskGroup<TaskT> taskGroup,
Schema expectedSchema,
boolean caseSensitive,
boolean cacheDeleteFilesOnExecutors) {
this.table = table;
+ this.fileIO = EncryptingFileIO.combine(fileIO, table().encryption());
this.taskGroup = taskGroup;
this.tasks = taskGroup.tasks().iterator();
this.currentIterator = CloseableIterator.empty();
@@ -179,9 +183,8 @@ abstract class BaseReader<T, TaskT extends ScanTask> implements Closeable {
private Map<String, InputFile> inputFiles() {
if (lazyInputFiles == null) {
this.lazyInputFiles =
- EncryptingFileIO.combine(table().io(), table().encryption())
- .bulkDecrypt(
- () -> taskGroup.tasks().stream().flatMap(this::referencedFiles).iterator());
+ fileIO.bulkDecrypt(
+ () -> taskGroup.tasks().stream().flatMap(this::referencedFiles).iterator());
}
return lazyInputFiles;
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/BaseRowReader.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/BaseRowReader.java
index 14febb212a..cbc40db533 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/BaseRowReader.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/BaseRowReader.java
@@ -28,17 +28,19 @@ import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.formats.FormatModelRegistry;
import org.apache.iceberg.formats.ReadBuilder;
import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.InputFile;
import org.apache.spark.sql.catalyst.InternalRow;
abstract class BaseRowReader<T extends ScanTask> extends BaseReader<InternalRow, T> {
BaseRowReader(
Table table,
+ FileIO fileIO,
ScanTaskGroup<T> taskGroup,
Schema expectedSchema,
boolean caseSensitive,
boolean cacheDeleteFilesOnExecutors) {
- super(table, taskGroup, expectedSchema, caseSensitive, cacheDeleteFilesOnExecutors);
+ super(table, fileIO, taskGroup, expectedSchema, caseSensitive, cacheDeleteFilesOnExecutors);
}
protected CloseableIterable<InternalRow> newIterable(
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java
index 9a4ab30fec..237dfd5c69 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java
@@ -26,6 +26,7 @@ import org.apache.iceberg.ScanTaskGroup;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.spark.OrcBatchReadConf;
@@ -52,6 +53,7 @@ class BatchDataReader extends BaseBatchReader<FileScanTask>
OrcBatchReadConf orcBatchReadConf) {
this(
partition.table(),
+ partition.io(),
partition.taskGroup(),
partition.projection(),
partition.isCaseSensitive(),
@@ -62,6 +64,7 @@ class BatchDataReader extends BaseBatchReader<FileScanTask>
BatchDataReader(
Table table,
+ FileIO fileIO,
ScanTaskGroup<FileScanTask> taskGroup,
Schema expectedSchema,
boolean caseSensitive,
@@ -70,6 +73,7 @@ class BatchDataReader extends BaseBatchReader<FileScanTask>
boolean cacheDeleteFilesOnExecutors) {
super(
table,
+ fileIO,
taskGroup,
expectedSchema,
caseSensitive,
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/ChangelogRowReader.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/ChangelogRowReader.java
index 417440d4b4..eb8e5e63f4 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/ChangelogRowReader.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/ChangelogRowReader.java
@@ -35,6 +35,7 @@ import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.spark.rdd.InputFileBlockHolder;
@@ -50,6 +51,7 @@ class ChangelogRowReader extends BaseRowReader<ChangelogScanTask>
ChangelogRowReader(SparkInputPartition partition) {
this(
partition.table(),
+ partition.io(),
partition.taskGroup(),
partition.projection(),
partition.isCaseSensitive(),
@@ -58,12 +60,14 @@ class ChangelogRowReader extends BaseRowReader<ChangelogScanTask>
ChangelogRowReader(
Table table,
+ FileIO fileIO,
ScanTaskGroup<ChangelogScanTask> taskGroup,
Schema expectedSchema,
boolean caseSensitive,
boolean cacheDeleteFilesOnExecutors) {
super(
table,
+ fileIO,
taskGroup,
ChangelogUtil.dropChangelogMetadata(expectedSchema),
caseSensitive,
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/EqualityDeleteRowReader.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/EqualityDeleteRowReader.java
index 5c4c21df34..aae399c5f2 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/EqualityDeleteRowReader.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/EqualityDeleteRowReader.java
@@ -25,6 +25,7 @@ import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.FileIO;
import org.apache.spark.rdd.InputFileBlockHolder;
import org.apache.spark.sql.catalyst.InternalRow;
@@ -32,10 +33,11 @@ public class EqualityDeleteRowReader extends RowDataReader {
public EqualityDeleteRowReader(
CombinedScanTask task,
Table table,
+ FileIO fileIO,
Schema expectedSchema,
boolean caseSensitive,
boolean cacheDeleteFilesOnExecutors) {
- super(table, task, expectedSchema, caseSensitive, cacheDeleteFilesOnExecutors);
+ super(table, fileIO, task, expectedSchema, caseSensitive, cacheDeleteFilesOnExecutors);
}
@Override
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/PositionDeletesRowReader.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/PositionDeletesRowReader.java
index 1a45facba6..b14970722e 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/PositionDeletesRowReader.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/PositionDeletesRowReader.java
@@ -30,6 +30,7 @@ import org.apache.iceberg.Table;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.ExpressionUtil;
import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.primitives.Ints;
@@ -48,6 +49,7 @@ class PositionDeletesRowReader extends BaseRowReader<PositionDeletesScanTask>
PositionDeletesRowReader(SparkInputPartition partition) {
this(
partition.table(),
+ partition.io(),
partition.taskGroup(),
partition.projection(),
partition.isCaseSensitive(),
@@ -56,12 +58,12 @@ class PositionDeletesRowReader extends BaseRowReader<PositionDeletesScanTask>
PositionDeletesRowReader(
Table table,
+ FileIO fileIO,
ScanTaskGroup<PositionDeletesScanTask> taskGroup,
Schema expectedSchema,
boolean caseSensitive,
boolean cacheDeleteFilesOnExecutors) {
-
- super(table, taskGroup, expectedSchema, caseSensitive, cacheDeleteFilesOnExecutors);
+ super(table, fileIO, taskGroup, expectedSchema, caseSensitive, cacheDeleteFilesOnExecutors);
int numSplits = taskGroup.tasks().size();
LOG.debug("Reading {} position delete file split(s) for table {}",
numSplits, table.name());
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java
index 0b53e72d99..dbfb0b7614 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java
@@ -28,6 +28,7 @@ import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.spark.source.metrics.TaskNumDeletes;
@@ -47,6 +48,7 @@ class RowDataReader extends BaseRowReader<FileScanTask> implements PartitionRead
RowDataReader(SparkInputPartition partition) {
this(
partition.table(),
+ partition.io(),
partition.taskGroup(),
partition.projection(),
partition.isCaseSensitive(),
@@ -55,12 +57,13 @@ class RowDataReader extends BaseRowReader<FileScanTask> implements PartitionRead
RowDataReader(
Table table,
+ FileIO fileIO,
ScanTaskGroup<FileScanTask> taskGroup,
Schema expectedSchema,
boolean caseSensitive,
boolean cacheDeleteFilesOnExecutors) {
- super(table, taskGroup, expectedSchema, caseSensitive, cacheDeleteFilesOnExecutors);
+ super(table, fileIO, taskGroup, expectedSchema, caseSensitive, cacheDeleteFilesOnExecutors);
numSplits = taskGroup.tasks().size();
LOG.debug("Reading {} file split(s) for table {}", numSplits,
table.name());
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SerializableFileIOWithSize.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SerializableFileIOWithSize.java
new file mode 100644
index 0000000000..49189d9d57
--- /dev/null
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SerializableFileIOWithSize.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source;
+
+import java.util.Map;
+import java.util.function.Function;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.hadoop.HadoopConfigurable;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.util.SerializableSupplier;
+import org.apache.spark.util.KnownSizeEstimation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class provides a serializable {@link FileIO} with a known size estimate. Spark calls its
+ * {@link org.apache.spark.util.SizeEstimator} class when broadcasting variables and this can be an
+ * expensive operation, so providing a known size estimate allows that operation to be skipped.
+ *
+ * <p>This class also implements {@link AutoCloseable} to avoid leaking resources upon broadcasting.
+ * Broadcast variables are destroyed and cleaned up on the driver and executors once they are
+ * garbage collected on the driver. The implementation ensures only resources used by copies of the
+ * main {@link FileIO} are released.
+ */
+class SerializableFileIOWithSize
+ implements FileIO, HadoopConfigurable, KnownSizeEstimation, AutoCloseable {
+ private static final Logger LOG = LoggerFactory.getLogger(SerializableFileIOWithSize.class);
+ private static final long SIZE_ESTIMATE = 32_768L;
+ private final transient Object serializationMarker;
+ private final FileIO fileIO;
+
+ private SerializableFileIOWithSize(FileIO fileIO) {
+ this.fileIO = fileIO;
+ this.serializationMarker = new Object();
+ }
+
+ @Override
+ public long estimatedSize() {
+ return SIZE_ESTIMATE;
+ }
+
+ public static FileIO wrap(FileIO fileIO) {
+ return new SerializableFileIOWithSize(fileIO);
+ }
+
+ @Override
+ public void close() {
+ if (null == serializationMarker) {
+ LOG.debug("Closing FileIO");
+ fileIO.close();
+ }
+ }
+
+ @Override
+ public InputFile newInputFile(String path) {
+ return fileIO.newInputFile(path);
+ }
+
+ @Override
+ public OutputFile newOutputFile(String path) {
+ return fileIO.newOutputFile(path);
+ }
+
+ @Override
+ public void deleteFile(String path) {
+ fileIO.deleteFile(path);
+ }
+
+ @Override
+ public void initialize(Map<String, String> properties) {
+ fileIO.initialize(properties);
+ }
+
+ @Override
+ public Map<String, String> properties() {
+ return fileIO.properties();
+ }
+
+ @Override
+ public void serializeConfWith(
+ Function<Configuration, SerializableSupplier<Configuration>>
confSerializer) {
+ if (fileIO instanceof HadoopConfigurable configurable) {
+ configurable.serializeConfWith(confSerializer);
+ }
+ }
+
+ @Override
+ public void setConf(Configuration conf) {
+ if (fileIO instanceof HadoopConfigurable configurable) {
+ configurable.setConf(conf);
+ }
+ }
+
+ @Override
+ public Configuration getConf() {
+ if (fileIO instanceof HadoopConfigurable hadoopConfigurable) {
+ return hadoopConfigurable.getConf();
+ }
+
+ return null;
+ }
+}
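The close() guard in the new class above relies on a Java subtlety worth spelling out: field initializers do not run during deserialization, so the transient serializationMarker is non-null only in the original driver-side instance and null in every copy deserialized on an executor, which is exactly when resources should be released. A self-contained sketch of that pattern, with illustrative names (not part of this patch):

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.ObjectInputStream;
    import java.io.ObjectOutputStream;
    import java.io.Serializable;

    class TransientMarkerDemo implements Serializable, AutoCloseable {
      // non-null on the driver; null after deserialization on an executor
      private final transient Object serializationMarker = new Object();

      @Override
      public void close() {
        if (serializationMarker == null) {
          System.out.println("deserialized copy: releasing resources");
        }
      }

      public static void main(String[] args) throws Exception {
        TransientMarkerDemo original = new TransientMarkerDemo();
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
          out.writeObject(original);
        }
        try (ObjectInputStream in =
            new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
          TransientMarkerDemo copy = (TransientMarkerDemo) in.readObject();
          original.close(); // marker present: no-op
          copy.close();     // marker null: releases resources
        }
      }
    }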
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java
index f4946c6404..22a4b171b3 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java
@@ -20,6 +20,7 @@ package org.apache.iceberg.spark.source;
import java.util.List;
import java.util.Objects;
+import java.util.function.Supplier;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.MetadataColumns;
@@ -28,6 +29,7 @@ import org.apache.iceberg.ScanTaskGroup;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SchemaParser;
import org.apache.iceberg.Table;
+import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.spark.ImmutableOrcBatchReadConf;
import org.apache.iceberg.spark.ImmutableParquetBatchReadConf;
import org.apache.iceberg.spark.OrcBatchReadConf;
@@ -45,6 +47,7 @@ class SparkBatch implements Batch {
private final JavaSparkContext sparkContext;
private final Table table;
+ private final Supplier<FileIO> fileIO;
private final SparkReadConf readConf;
private final Types.StructType groupingKeyType;
private final List<? extends ScanTaskGroup<?>> taskGroups;
@@ -58,6 +61,7 @@ class SparkBatch implements Batch {
SparkBatch(
JavaSparkContext sparkContext,
Table table,
+ Supplier<FileIO> fileIO,
SparkReadConf readConf,
Types.StructType groupingKeyType,
List<? extends ScanTaskGroup<?>> taskGroups,
@@ -65,6 +69,7 @@ class SparkBatch implements Batch {
int scanHashCode) {
this.sparkContext = sparkContext;
this.table = table;
+ this.fileIO = fileIO;
this.readConf = readConf;
this.groupingKeyType = groupingKeyType;
this.taskGroups = taskGroups;
@@ -81,6 +86,8 @@ class SparkBatch implements Batch {
// broadcast the table metadata as input partitions will be sent to executors
Broadcast<Table> tableBroadcast =
sparkContext.broadcast(SerializableTableWithSize.copyOf(table));
+ Broadcast<FileIO> fileIOBroadcast =
+ sparkContext.broadcast(SerializableFileIOWithSize.wrap(fileIO.get()));
String projectionString = SchemaParser.toJson(projection);
String[][] locations = computePreferredLocations();
@@ -92,6 +99,7 @@ class SparkBatch implements Batch {
groupingKeyType,
taskGroups.get(index),
tableBroadcast,
+ fileIOBroadcast,
projectionString,
caseSensitive,
locations != null ? locations[index] : SparkPlanningUtil.NO_LOCATION_PREFERENCE,
@@ -103,7 +111,7 @@ class SparkBatch implements Batch {
private String[][] computePreferredLocations() {
if (localityEnabled) {
- return SparkPlanningUtil.fetchBlockLocations(table.io(), taskGroups);
+ return SparkPlanningUtil.fetchBlockLocations(fileIO.get(), taskGroups);
} else if (executorCacheLocalityEnabled) {
List<String> executorLocations = SparkUtil.executorLocations();
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkChangelogScan.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkChangelogScan.java
index 4425c4936a..57ccf92b96 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkChangelogScan.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkChangelogScan.java
@@ -105,6 +105,7 @@ class SparkChangelogScan implements Scan, SupportsReportStatistics {
return new SparkBatch(
sparkContext,
table,
+ null != scan ? scan.fileIO() : table::io,
readConf,
EMPTY_GROUPING_KEY_TYPE,
taskGroups(),
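The ternary above is the fallback this patch threads through the scans: prefer the FileIO of a remotely planned scan (which carries the REST plan id), otherwise defer lazily to the table's own FileIO. The call site implies scan.fileIO() yields a Supplier<FileIO>; a minimal sketch of that lazy-fallback shape, with a local FileIO stub rather than Iceberg's interface:

    import java.util.function.Supplier;

    class SupplierFallbackDemo {
      interface FileIO {
        String describe();
      }

      static Supplier<FileIO> chooseIO(Supplier<FileIO> scanIO, FileIO tableIO) {
        // defer the table lookup: nothing is materialized until get() is called
        return scanIO != null ? scanIO : () -> tableIO;
      }

      public static void main(String[] args) {
        FileIO tableIO = () -> "table-io";
        FileIO scanScoped = () -> "plan-scoped-io";
        System.out.println(chooseIO(null, tableIO).get().describe());           // table-io
        System.out.println(chooseIO(() -> scanScoped, tableIO).get().describe()); // plan-scoped-io
      }
    }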
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkInputPartition.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkInputPartition.java
index 98a0061b3a..a3d78b43a9 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkInputPartition.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkInputPartition.java
@@ -24,6 +24,7 @@ import org.apache.iceberg.ScanTaskGroup;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SchemaParser;
import org.apache.iceberg.Table;
+import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.types.Types;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.sql.catalyst.InternalRow;
@@ -34,6 +35,7 @@ class SparkInputPartition implements InputPartition, HasPartitionKey, Serializab
private final Types.StructType groupingKeyType;
private final ScanTaskGroup<?> taskGroup;
private final Broadcast<Table> tableBroadcast;
+ private final Broadcast<FileIO> fileIOBroadcast;
private final String projectionString;
private final boolean caseSensitive;
private final transient String[] preferredLocations;
@@ -45,6 +47,7 @@ class SparkInputPartition implements InputPartition, HasPartitionKey, Serializab
Types.StructType groupingKeyType,
ScanTaskGroup<?> taskGroup,
Broadcast<Table> tableBroadcast,
+ Broadcast<FileIO> fileIOBroadcast,
String projectionString,
boolean caseSensitive,
String[] preferredLocations,
@@ -52,6 +55,7 @@ class SparkInputPartition implements InputPartition, HasPartitionKey, Serializab
this.groupingKeyType = groupingKeyType;
this.taskGroup = taskGroup;
this.tableBroadcast = tableBroadcast;
+ this.fileIOBroadcast = fileIOBroadcast;
this.projectionString = projectionString;
this.caseSensitive = caseSensitive;
this.preferredLocations = preferredLocations;
@@ -81,6 +85,10 @@ class SparkInputPartition implements InputPartition, HasPartitionKey, Serializab
return tableBroadcast.value();
}
+ public FileIO io() {
+ return fileIOBroadcast.value();
+ }
+
public boolean isCaseSensitive() {
return caseSensitive;
}
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
index c9a0f2566b..7adf3c633c 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
@@ -26,6 +26,7 @@ import java.io.OutputStreamWriter;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.List;
+import java.util.function.Supplier;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.CombinedScanTask;
import org.apache.iceberg.FileScanTask;
@@ -61,10 +62,12 @@ public class SparkMicroBatchStream implements MicroBatchStream, SupportsTriggerA
private static final Types.StructType EMPTY_GROUPING_KEY_TYPE = Types.StructType.of();
private final Table table;
+ private final Supplier<FileIO> fileIO;
private final SparkReadConf readConf;
private final boolean caseSensitive;
private final String projection;
private final Broadcast<Table> tableBroadcast;
+ private final Broadcast<FileIO> fileIOBroadcast;
private final long splitSize;
private final int splitLookback;
private final long splitOpenFileCost;
@@ -80,15 +83,18 @@ public class SparkMicroBatchStream implements MicroBatchStream, SupportsTriggerA
SparkMicroBatchStream(
JavaSparkContext sparkContext,
Table table,
+ Supplier<FileIO> fileIO,
SparkReadConf readConf,
Schema projection,
String checkpointLocation) {
this.table = table;
+ this.fileIO = fileIO;
this.readConf = readConf;
this.caseSensitive = readConf.caseSensitive();
this.projection = SchemaParser.toJson(projection);
this.localityPreferred = readConf.localityEnabled();
this.tableBroadcast = sparkContext.broadcast(SerializableTableWithSize.copyOf(table));
+ this.fileIOBroadcast = sparkContext.broadcast(SerializableFileIOWithSize.wrap(fileIO.get()));
this.splitSize = readConf.splitSize();
this.splitLookback = readConf.splitLookback();
this.splitOpenFileCost = readConf.splitOpenFileCost();
@@ -158,6 +164,7 @@ public class SparkMicroBatchStream implements MicroBatchStream, SupportsTriggerA
EMPTY_GROUPING_KEY_TYPE,
combinedScanTasks.get(index),
tableBroadcast,
+ fileIOBroadcast,
projection,
caseSensitive,
locations != null ? locations[index] : SparkPlanningUtil.NO_LOCATION_PREFERENCE,
@@ -168,7 +175,9 @@ public class SparkMicroBatchStream implements MicroBatchStream, SupportsTriggerA
}
private String[][] computePreferredLocations(List<CombinedScanTask> taskGroups) {
- return localityPreferred ? SparkPlanningUtil.fetchBlockLocations(table.io(), taskGroups) : null;
+ return localityPreferred
+ ? SparkPlanningUtil.fetchBlockLocations(fileIO.get(), taskGroups)
+ : null;
}
@Override
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkPartitioningAwareScan.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkPartitioningAwareScan.java
index dff844eb45..fe5eeee8fb 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkPartitioningAwareScan.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkPartitioningAwareScan.java
@@ -79,7 +79,15 @@ abstract class SparkPartitioningAwareScan<T extends PartitionScanTask> extends S
Schema projection,
List<Expression> filters,
Supplier<ScanReport> scanReportSupplier) {
- super(spark, table, schema, readConf, projection, filters, scanReportSupplier);
+ super(
+ spark,
+ table,
+ null != scan ? scan.fileIO() : table::io,
+ schema,
+ readConf,
+ projection,
+ filters,
+ scanReportSupplier);
this.scan = scan;
this.preserveDataGrouping = readConf.preserveDataGrouping();
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java
index 2e5f50ea88..6b80199a25 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java
@@ -34,6 +34,7 @@ import org.apache.iceberg.StatisticsFile;
import org.apache.iceberg.Table;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.metrics.ScanReport;
import org.apache.iceberg.relocated.com.google.common.base.Strings;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
@@ -101,6 +102,7 @@ abstract class SparkScan implements Scan, SupportsReportStatistics {
private final JavaSparkContext sparkContext;
private final Table table;
+ private final Supplier<FileIO> fileIO;
private final Schema schema;
private final SparkSession spark;
private final SparkReadConf readConf;
@@ -115,6 +117,7 @@ abstract class SparkScan implements Scan, SupportsReportStatistics {
SparkScan(
SparkSession spark,
Table table,
+ Supplier<FileIO> fileIO,
Schema schema,
SparkReadConf readConf,
Schema projection,
@@ -123,6 +126,7 @@ abstract class SparkScan implements Scan, SupportsReportStatistics {
this.spark = spark;
this.sparkContext = JavaSparkContext.fromSparkContext(spark.sparkContext());
this.table = table;
+ this.fileIO = fileIO;
this.schema = schema;
this.readConf = readConf;
this.caseSensitive = readConf.caseSensitive();
@@ -169,12 +173,20 @@ abstract class SparkScan implements Scan, SupportsReportStatistics {
@Override
public Batch toBatch() {
return new SparkBatch(
- sparkContext, table, readConf, groupingKeyType(), taskGroups(), projection, hashCode());
+ sparkContext,
+ table,
+ fileIO,
+ readConf,
+ groupingKeyType(),
+ taskGroups(),
+ projection,
+ hashCode());
}
@Override
public MicroBatchStream toMicroBatchStream(String checkpointLocation) {
- return new SparkMicroBatchStream(sparkContext, table, readConf, projection, checkpointLocation);
+ return new SparkMicroBatchStream(
+ sparkContext, table, fileIO, readConf, projection, checkpointLocation);
}
@Override
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkStagedScan.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkStagedScan.java
index e4412f5cba..47481ec51c 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkStagedScan.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkStagedScan.java
@@ -47,7 +47,7 @@ class SparkStagedScan extends SparkScan {
Schema projection,
String taskSetId,
SparkReadConf readConf) {
- super(spark, table, schema, readConf, projection, ImmutableList.of(), null);
+ super(spark, table, table::io, schema, readConf, projection, ImmutableList.of(), null);
this.taskSetId = taskSetId;
this.splitSize = readConf.splitSize();
this.splitLookback = readConf.splitLookback();
diff --git a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/TestBaseWithCatalog.java b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/TestBaseWithCatalog.java
index 7df9c75fb3..1760143d2c 100644
--- a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/TestBaseWithCatalog.java
+++ b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/TestBaseWithCatalog.java
@@ -69,7 +69,11 @@ public abstract class TestBaseWithCatalog extends TestBase {
// status even belonging to the same catalog. Reference:
// https://www.sqlite.org/inmemorydb.html
CatalogProperties.CLIENT_POOL_SIZE,
- "1"));
+ "1",
+ "include-credentials",
+ "true",
+ "gcs.oauth2.token",
+ "dummyToken"));
protected static RESTCatalog restCatalog;
diff --git a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestBaseReader.java b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestBaseReader.java
index 5922723096..0eb9bbe52f 100644
--- a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestBaseReader.java
+++ b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestBaseReader.java
@@ -90,7 +90,7 @@ public class TestBaseReader {
private final Map<String, CloseableIntegerRange> tracker =
Maps.newHashMap();
ClosureTrackingReader(Table table, List<FileScanTask> tasks) {
- super(table, new BaseCombinedScanTask(tasks), null, false, true);
+ super(table, table.io(), new BaseCombinedScanTask(tasks), null, false, true);
}
@Override
diff --git a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestChangelogReader.java b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestChangelogReader.java
index c31c10f97c..b12fdd443f 100644
--- a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestChangelogReader.java
+++ b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestChangelogReader.java
@@ -105,7 +105,7 @@ public class TestChangelogReader extends TestBase {
for (ScanTaskGroup<ChangelogScanTask> taskGroup : taskGroups) {
ChangelogRowReader reader =
- new ChangelogRowReader(table, taskGroup, table.schema(), false, true);
+ new ChangelogRowReader(table, table.io(), taskGroup, table.schema(), false, true);
while (reader.next()) {
rows.add(reader.get().copy());
}
@@ -136,7 +136,7 @@ public class TestChangelogReader extends TestBase {
for (ScanTaskGroup<ChangelogScanTask> taskGroup : taskGroups) {
ChangelogRowReader reader =
- new ChangelogRowReader(table, taskGroup, table.schema(), false, true);
+ new ChangelogRowReader(table, table.io(), taskGroup, table.schema(), false, true);
while (reader.next()) {
rows.add(reader.get().copy());
}
@@ -170,7 +170,7 @@ public class TestChangelogReader extends TestBase {
for (ScanTaskGroup<ChangelogScanTask> taskGroup : taskGroups) {
ChangelogRowReader reader =
- new ChangelogRowReader(table, taskGroup, table.schema(), false, true);
+ new ChangelogRowReader(table, table.io(), taskGroup, table.schema(), false, true);
while (reader.next()) {
rows.add(reader.get().copy());
}
@@ -197,7 +197,7 @@ public class TestChangelogReader extends TestBase {
for (ScanTaskGroup<ChangelogScanTask> taskGroup : taskGroups) {
ChangelogRowReader reader =
- new ChangelogRowReader(table, taskGroup, table.schema(), false, true);
+ new ChangelogRowReader(table, table.io(), taskGroup, table.schema(), false, true);
while (reader.next()) {
rows.add(reader.get().copy());
}
diff --git a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesReader.java b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesReader.java
index f4ace848af..681ab1fd76 100644
--- a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesReader.java
+++ b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesReader.java
@@ -180,6 +180,7 @@ public class TestPositionDeletesReader extends TestBase {
try (PositionDeletesRowReader reader =
new PositionDeletesRowReader(
table,
+ table.io(),
new BaseScanTaskGroup<>(null, ImmutableList.of(scanTask1)),
projectedSchema,
false,
@@ -219,6 +220,7 @@ public class TestPositionDeletesReader extends TestBase {
try (PositionDeletesRowReader reader =
new PositionDeletesRowReader(
table,
+ table.io(),
new BaseScanTaskGroup<>(null, ImmutableList.of(scanTask2)),
projectedSchema,
false,
@@ -290,6 +292,7 @@ public class TestPositionDeletesReader extends TestBase {
try (PositionDeletesRowReader reader =
new PositionDeletesRowReader(
table,
+ table.io(),
new BaseScanTaskGroup<>(null, ImmutableList.of(scanTask1)),
projectedSchema,
false,
diff --git a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java
index a58fc8bdb9..a5bf39a5a6 100644
--- a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java
+++ b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java
@@ -332,7 +332,7 @@ public class TestSparkReaderDeletes extends DeleteReadTests {
for (CombinedScanTask task : tasks) {
try (EqualityDeleteRowReader reader =
- new EqualityDeleteRowReader(task, table, table.schema(), false, true)) {
+ new EqualityDeleteRowReader(task, table, table.io(), table.schema(), false, true)) {
while (reader.next()) {
actualRowSet.add(
new InternalRowWrapper(
@@ -677,7 +677,14 @@ public class TestSparkReaderDeletes extends DeleteReadTests {
try (BatchDataReader reader =
new BatchDataReader(
// expected column is id, while the equality filter column is dt
- dateTable, task, dateTable.schema().select("id"), false, conf, null, true)) {
+ dateTable,
+ dateTable.io(),
+ task,
+ dateTable.schema().select("id"),
+ false,
+ conf,
+ null,
+ true)) {
while (reader.next()) {
ColumnarBatch columnarBatch = reader.get();
int numOfCols = columnarBatch.numCols();