This is an automated email from the ASF dual-hosted git repository.
etudenhoefner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new 0c46639e0e Spark 3.5: Pass FileIO on Spark's read path (#15683)
0c46639e0e is described below
commit 0c46639e0e2e60748d6823d3620bd70838151fa9
Author: Eduard Tudenhoefner <[email protected]>
AuthorDate: Thu Mar 19 15:19:18 2026 +0100
Spark 3.5: Pass FileIO on Spark's read path (#15683)
---
.../TestRemoteScanPlanning.java | 47 +++++++-
.../iceberg/spark/source/BaseBatchReader.java | 10 +-
.../apache/iceberg/spark/source/BaseReader.java | 9 +-
.../apache/iceberg/spark/source/BaseRowReader.java | 10 +-
.../iceberg/spark/source/BatchDataReader.java | 4 +
.../iceberg/spark/source/ChangelogRowReader.java | 4 +
.../spark/source/EqualityDeleteRowReader.java | 11 +-
.../spark/source/PositionDeletesRowReader.java | 12 ++-
.../apache/iceberg/spark/source/RowDataReader.java | 11 +-
.../spark/source/SerializableFileIOWithSize.java | 120 +++++++++++++++++++++
.../apache/iceberg/spark/source/SparkBatch.java | 10 +-
.../iceberg/spark/source/SparkChangelogScan.java | 1 +
.../iceberg/spark/source/SparkInputPartition.java | 8 ++
.../spark/source/SparkMicroBatchStream.java | 11 +-
.../spark/source/SparkPartitioningAwareScan.java | 9 +-
.../org/apache/iceberg/spark/source/SparkScan.java | 15 ++-
.../iceberg/spark/source/SparkStagedScan.java | 2 +-
.../apache/iceberg/spark/TestBaseWithCatalog.java | 6 +-
.../iceberg/spark/source/TestBaseReader.java | 2 +-
.../iceberg/spark/source/TestChangelogReader.java | 12 ++-
.../spark/source/TestPositionDeletesReader.java | 3 +
.../spark/source/TestSparkReaderDeletes.java | 4 +-
22 files changed, 296 insertions(+), 25 deletions(-)
diff --git
a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoteScanPlanning.java
b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/source/TestRemoteScanPlanning.java
similarity index 53%
rename from
spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoteScanPlanning.java
rename to
spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/source/TestRemoteScanPlanning.java
index 9c31eb970b..0152c8e0e6 100644
---
a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoteScanPlanning.java
+++
b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/source/TestRemoteScanPlanning.java
@@ -16,15 +16,26 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iceberg.spark.extensions;
+package org.apache.iceberg.spark.source;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.function.Supplier;
+import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.ParameterizedTestExtension;
import org.apache.iceberg.Parameters;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.rest.RESTCatalog;
import org.apache.iceberg.rest.RESTCatalogProperties;
import org.apache.iceberg.spark.SparkCatalogConfig;
import org.apache.iceberg.spark.sql.TestSelect;
+import org.apache.spark.sql.connector.read.Batch;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+import org.assertj.core.api.InstanceOfAssertFactories;
+import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;
@ExtendWith(ParameterizedTestExtension.class)
@@ -38,8 +49,6 @@ public class TestRemoteScanPlanning extends TestSelect {
ImmutableMap.builder()
.putAll(SparkCatalogConfig.REST.properties())
.put(CatalogProperties.URI,
restCatalog.properties().get(CatalogProperties.URI))
- // this flag is typically only set by the server, but we set it from the client for
- // testing
.put(
RESTCatalogProperties.SCAN_PLANNING_MODE,
RESTCatalogProperties.ScanPlanningMode.SERVER.modeName())
@@ -48,4 +57,36 @@ public class TestRemoteScanPlanning extends TestSelect {
}
};
}
+
+ @TestTemplate
+ public void fileIOIsPropagated() {
+ RESTCatalog catalog = new RESTCatalog();
+ catalog.setConf(new Configuration());
+ catalog.initialize(
+ "test",
+ ImmutableMap.<String, String>builder()
+ .putAll(restCatalog.properties())
+ .put(
+ RESTCatalogProperties.SCAN_PLANNING_MODE,
+ RESTCatalogProperties.ScanPlanningMode.SERVER.modeName())
+ .build());
+ Table table = catalog.loadTable(tableIdent);
+
+ SparkScanBuilder builder = new SparkScanBuilder(spark, table, CaseInsensitiveStringMap.empty());
+ verifyFileIOHasPlanId(builder.build().toBatch(), table);
+ verifyFileIOHasPlanId(builder.buildCopyOnWriteScan().toBatch(), table);
+ }
+
+ private void verifyFileIOHasPlanId(Batch batch, Table table) {
+ FileIO fileIOForScan =
+ (FileIO)
+ assertThat(batch)
+ .extracting("fileIO")
+ .isInstanceOf(Supplier.class)
+ .asInstanceOf(InstanceOfAssertFactories.type(Supplier.class))
+ .actual()
+ .get();
+ assertThat(fileIOForScan.properties()).containsKey(RESTCatalogProperties.REST_SCAN_PLAN_ID);
+ assertThat(table.io().properties()).doesNotContainKey(RESTCatalogProperties.REST_SCAN_PLAN_ID);
+ }
}
diff --git
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java
index fe062f9d73..c1b2a58737 100644
---
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java
+++
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java
@@ -31,6 +31,7 @@ import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.formats.FormatModelRegistry;
import org.apache.iceberg.formats.ReadBuilder;
import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.spark.OrcBatchReadConf;
@@ -49,6 +50,7 @@ abstract class BaseBatchReader<T extends ScanTask> extends
BaseReader<ColumnarBa
BaseBatchReader(
Table table,
+ FileIO fileIO,
ScanTaskGroup<T> taskGroup,
Schema tableSchema,
Schema expectedSchema,
@@ -57,7 +59,13 @@ abstract class BaseBatchReader<T extends ScanTask> extends
BaseReader<ColumnarBa
OrcBatchReadConf orcConf,
boolean cacheDeleteFilesOnExecutors) {
super(
- table, taskGroup, tableSchema, expectedSchema, caseSensitive, cacheDeleteFilesOnExecutors);
+ table,
+ fileIO,
+ taskGroup,
+ tableSchema,
+ expectedSchema,
+ caseSensitive,
+ cacheDeleteFilesOnExecutors);
this.parquetConf = parquetConf;
this.orcConf = orcConf;
}
diff --git
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseReader.java
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseReader.java
index bf16226171..0333f1e45d 100644
---
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseReader.java
+++
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseReader.java
@@ -44,6 +44,7 @@ import org.apache.iceberg.data.DeleteLoader;
import org.apache.iceberg.deletes.DeleteCounter;
import org.apache.iceberg.encryption.EncryptingFileIO;
import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.mapping.NameMapping;
import org.apache.iceberg.mapping.NameMappingParser;
@@ -66,6 +67,7 @@ abstract class BaseReader<T, TaskT extends ScanTask>
implements Closeable {
private static final Logger LOG = LoggerFactory.getLogger(BaseReader.class);
private final Table table;
+ private final EncryptingFileIO fileIO;
private final Schema tableSchema;
private final Schema expectedSchema;
private final boolean caseSensitive;
@@ -82,12 +84,14 @@ abstract class BaseReader<T, TaskT extends ScanTask>
implements Closeable {
BaseReader(
Table table,
+ FileIO fileIO,
ScanTaskGroup<TaskT> taskGroup,
Schema tableSchema,
Schema expectedSchema,
boolean caseSensitive,
boolean cacheDeleteFilesOnExecutors) {
this.table = table;
+ this.fileIO = EncryptingFileIO.combine(fileIO, table().encryption());
this.taskGroup = taskGroup;
this.tasks = taskGroup.tasks().iterator();
this.currentIterator = CloseableIterator.empty();
@@ -180,9 +184,8 @@ abstract class BaseReader<T, TaskT extends ScanTask>
implements Closeable {
private Map<String, InputFile> inputFiles() {
if (lazyInputFiles == null) {
this.lazyInputFiles =
- EncryptingFileIO.combine(table().io(), table().encryption())
- .bulkDecrypt(
- () -> taskGroup.tasks().stream().flatMap(this::referencedFiles).iterator());
+ fileIO.bulkDecrypt(
+ () -> taskGroup.tasks().stream().flatMap(this::referencedFiles).iterator());
}
return lazyInputFiles;
diff --git
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseRowReader.java
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseRowReader.java
index 53d44e760a..a7016e3b09 100644
---
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseRowReader.java
+++
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseRowReader.java
@@ -28,19 +28,27 @@ import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.formats.FormatModelRegistry;
import org.apache.iceberg.formats.ReadBuilder;
import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.InputFile;
import org.apache.spark.sql.catalyst.InternalRow;
abstract class BaseRowReader<T extends ScanTask> extends BaseReader<InternalRow, T> {
BaseRowReader(
Table table,
+ FileIO fileIO,
ScanTaskGroup<T> taskGroup,
Schema tableSchema,
Schema expectedSchema,
boolean caseSensitive,
boolean cacheDeleteFilesOnExecutors) {
super(
- table, taskGroup, tableSchema, expectedSchema, caseSensitive, cacheDeleteFilesOnExecutors);
+ table,
+ fileIO,
+ taskGroup,
+ tableSchema,
+ expectedSchema,
+ caseSensitive,
+ cacheDeleteFilesOnExecutors);
}
protected CloseableIterable<InternalRow> newIterable(
diff --git
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java
index 9ec0f88577..3dcfb604ea 100644
---
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java
+++
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java
@@ -26,6 +26,7 @@ import org.apache.iceberg.ScanTaskGroup;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.spark.OrcBatchReadConf;
@@ -53,6 +54,7 @@ class BatchDataReader extends BaseBatchReader<FileScanTask>
OrcBatchReadConf orcBatchReadConf) {
this(
partition.table(),
+ partition.io(),
partition.taskGroup(),
SnapshotUtil.schemaFor(partition.table(), partition.branch()),
partition.expectedSchema(),
@@ -64,6 +66,7 @@ class BatchDataReader extends BaseBatchReader<FileScanTask>
BatchDataReader(
Table table,
+ FileIO fileIO,
ScanTaskGroup<FileScanTask> taskGroup,
Schema tableSchema,
Schema expectedSchema,
@@ -73,6 +76,7 @@ class BatchDataReader extends BaseBatchReader<FileScanTask>
boolean cacheDeleteFilesOnExecutors) {
super(
table,
+ fileIO,
taskGroup,
tableSchema,
expectedSchema,
diff --git
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/ChangelogRowReader.java
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/ChangelogRowReader.java
index b8fa129f6a..3657475651 100644
---
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/ChangelogRowReader.java
+++
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/ChangelogRowReader.java
@@ -35,6 +35,7 @@ import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.util.SnapshotUtil;
@@ -51,6 +52,7 @@ class ChangelogRowReader extends
BaseRowReader<ChangelogScanTask>
ChangelogRowReader(SparkInputPartition partition) {
this(
partition.table(),
+ partition.io(),
partition.taskGroup(),
SnapshotUtil.schemaFor(partition.table(), partition.branch()),
partition.expectedSchema(),
@@ -60,6 +62,7 @@ class ChangelogRowReader extends
BaseRowReader<ChangelogScanTask>
ChangelogRowReader(
Table table,
+ FileIO fileIO,
ScanTaskGroup<ChangelogScanTask> taskGroup,
Schema tableSchema,
Schema expectedSchema,
@@ -67,6 +70,7 @@ class ChangelogRowReader extends
BaseRowReader<ChangelogScanTask>
boolean cacheDeleteFilesOnExecutors) {
super(
table,
+ fileIO,
taskGroup,
tableSchema,
ChangelogUtil.dropChangelogMetadata(expectedSchema),
diff --git
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/EqualityDeleteRowReader.java
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/EqualityDeleteRowReader.java
index e1292647b7..96dd99ea64 100644
---
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/EqualityDeleteRowReader.java
+++
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/EqualityDeleteRowReader.java
@@ -25,6 +25,7 @@ import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.FileIO;
import org.apache.spark.rdd.InputFileBlockHolder;
import org.apache.spark.sql.catalyst.InternalRow;
@@ -32,11 +33,19 @@ public class EqualityDeleteRowReader extends RowDataReader {
public EqualityDeleteRowReader(
CombinedScanTask task,
Table table,
+ FileIO fileIO,
Schema tableSchema,
Schema expectedSchema,
boolean caseSensitive,
boolean cacheDeleteFilesOnExecutors) {
- super(table, task, tableSchema, expectedSchema, caseSensitive, cacheDeleteFilesOnExecutors);
+ super(
+ table,
+ fileIO,
+ task,
+ tableSchema,
+ expectedSchema,
+ caseSensitive,
+ cacheDeleteFilesOnExecutors);
}
@Override
diff --git
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/PositionDeletesRowReader.java
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/PositionDeletesRowReader.java
index 8ad1f3ad39..7c5969effb 100644
---
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/PositionDeletesRowReader.java
+++
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/PositionDeletesRowReader.java
@@ -30,6 +30,7 @@ import org.apache.iceberg.Table;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.ExpressionUtil;
import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.primitives.Ints;
@@ -49,6 +50,7 @@ class PositionDeletesRowReader extends
BaseRowReader<PositionDeletesScanTask>
PositionDeletesRowReader(SparkInputPartition partition) {
this(
partition.table(),
+ partition.io(),
partition.taskGroup(),
SnapshotUtil.schemaFor(partition.table(), partition.branch()),
partition.expectedSchema(),
@@ -58,14 +60,20 @@ class PositionDeletesRowReader extends
BaseRowReader<PositionDeletesScanTask>
PositionDeletesRowReader(
Table table,
+ FileIO fileIO,
ScanTaskGroup<PositionDeletesScanTask> taskGroup,
Schema tableSchema,
Schema expectedSchema,
boolean caseSensitive,
boolean cacheDeleteFilesOnExecutors) {
-
super(
- table, taskGroup, tableSchema, expectedSchema, caseSensitive, cacheDeleteFilesOnExecutors);
+ table,
+ fileIO,
+ taskGroup,
+ tableSchema,
+ expectedSchema,
+ caseSensitive,
+ cacheDeleteFilesOnExecutors);
int numSplits = taskGroup.tasks().size();
LOG.debug("Reading {} position delete file split(s) for table {}",
numSplits, table.name());
diff --git
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java
index b2b3c78563..08aa44f710 100644
---
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java
+++
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java
@@ -28,6 +28,7 @@ import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.spark.source.metrics.TaskNumDeletes;
@@ -48,6 +49,7 @@ class RowDataReader extends BaseRowReader<FileScanTask>
implements PartitionRead
RowDataReader(SparkInputPartition partition) {
this(
partition.table(),
+ partition.io(),
partition.taskGroup(),
SnapshotUtil.schemaFor(partition.table(), partition.branch()),
partition.expectedSchema(),
@@ -57,6 +59,7 @@ class RowDataReader extends BaseRowReader<FileScanTask>
implements PartitionRead
RowDataReader(
Table table,
+ FileIO fileIO,
ScanTaskGroup<FileScanTask> taskGroup,
Schema tableSchema,
Schema expectedSchema,
@@ -64,7 +67,13 @@ class RowDataReader extends BaseRowReader<FileScanTask>
implements PartitionRead
boolean cacheDeleteFilesOnExecutors) {
super(
- table, taskGroup, tableSchema, expectedSchema, caseSensitive, cacheDeleteFilesOnExecutors);
+ table,
+ fileIO,
+ taskGroup,
+ tableSchema,
+ expectedSchema,
+ caseSensitive,
+ cacheDeleteFilesOnExecutors);
numSplits = taskGroup.tasks().size();
LOG.debug("Reading {} file split(s) for table {}", numSplits,
table.name());
diff --git
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SerializableFileIOWithSize.java
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SerializableFileIOWithSize.java
new file mode 100644
index 0000000000..49189d9d57
--- /dev/null
+++
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SerializableFileIOWithSize.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source;
+
+import java.util.Map;
+import java.util.function.Function;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.hadoop.HadoopConfigurable;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.util.SerializableSupplier;
+import org.apache.spark.util.KnownSizeEstimation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class provides a serializable {@link FileIO} with a known size estimate. Spark calls its
+ * {@link org.apache.spark.util.SizeEstimator} class when broadcasting variables and this can be an
+ * expensive operation, so providing a known size estimate allows that operation to be skipped.
+ *
+ * <p>This class also implements {@link AutoCloseable} to avoid leaking resources upon broadcasting.
+ * Broadcast variables are destroyed and cleaned up on the driver and executors once they are
+ * garbage collected on the driver. The implementation ensures only resources used by copies of the
+ * main {@link FileIO} are released.
+ */
+class SerializableFileIOWithSize
+ implements FileIO, HadoopConfigurable, KnownSizeEstimation, AutoCloseable {
+ private static final Logger LOG = LoggerFactory.getLogger(SerializableFileIOWithSize.class);
+ private static final long SIZE_ESTIMATE = 32_768L;
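+ // transient: non-null only on the original instance built on the driver; Java deserialization
+ // leaves it null, so close() releases the wrapped FileIO only for broadcast copies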
+ private final transient Object serializationMarker;
+ private final FileIO fileIO;
+
+ private SerializableFileIOWithSize(FileIO fileIO) {
+ this.fileIO = fileIO;
+ this.serializationMarker = new Object();
+ }
+
+ @Override
+ public long estimatedSize() {
+ return SIZE_ESTIMATE;
+ }
+
+ public static FileIO wrap(FileIO fileIO) {
+ return new SerializableFileIOWithSize(fileIO);
+ }
+
+ @Override
+ public void close() {
+ if (null == serializationMarker) {
+ LOG.debug("Closing FileIO");
+ fileIO.close();
+ }
+ }
+
+ @Override
+ public InputFile newInputFile(String path) {
+ return fileIO.newInputFile(path);
+ }
+
+ @Override
+ public OutputFile newOutputFile(String path) {
+ return fileIO.newOutputFile(path);
+ }
+
+ @Override
+ public void deleteFile(String path) {
+ fileIO.deleteFile(path);
+ }
+
+ @Override
+ public void initialize(Map<String, String> properties) {
+ fileIO.initialize(properties);
+ }
+
+ @Override
+ public Map<String, String> properties() {
+ return fileIO.properties();
+ }
+
+ @Override
+ public void serializeConfWith(
+ Function<Configuration, SerializableSupplier<Configuration>> confSerializer) {
+ if (fileIO instanceof HadoopConfigurable configurable) {
+ configurable.serializeConfWith(confSerializer);
+ }
+ }
+
+ @Override
+ public void setConf(Configuration conf) {
+ if (fileIO instanceof HadoopConfigurable configurable) {
+ configurable.setConf(conf);
+ }
+ }
+
+ @Override
+ public Configuration getConf() {
+ if (fileIO instanceof HadoopConfigurable hadoopConfigurable) {
+ return hadoopConfigurable.getConf();
+ }
+
+ return null;
+ }
+}
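A minimal usage sketch of the wrapper above, mirroring the SparkBatch change further below; the
sparkContext (JavaSparkContext), the fileIO supplier (Supplier<FileIO>), and the file path are
assumed here purely for illustration:

    // driver side: wrap the scan's FileIO so Spark skips its SizeEstimator when broadcasting
    Broadcast<FileIO> fileIOBroadcast =
        sparkContext.broadcast(SerializableFileIOWithSize.wrap(fileIO.get()));

    // executor side: readers obtain the deserialized copy (see SparkInputPartition#io() below)
    FileIO io = fileIOBroadcast.value();
    InputFile inputFile = io.newInputFile("s3://bucket/data/00000-0-data.parquet"); // hypothetical path

Because serializationMarker is transient, close() releases resources only on such deserialized
copies, never on the original driver-side FileIO.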
diff --git
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java
index 261d5fa227..2109936c96 100644
---
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java
+++
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java
@@ -20,6 +20,7 @@ package org.apache.iceberg.spark.source;
import java.util.List;
import java.util.Objects;
+import java.util.function.Supplier;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.MetadataColumns;
@@ -28,6 +29,7 @@ import org.apache.iceberg.ScanTaskGroup;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SchemaParser;
import org.apache.iceberg.Table;
+import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.spark.ImmutableOrcBatchReadConf;
import org.apache.iceberg.spark.ImmutableParquetBatchReadConf;
import org.apache.iceberg.spark.OrcBatchReadConf;
@@ -45,6 +47,7 @@ class SparkBatch implements Batch {
private final JavaSparkContext sparkContext;
private final Table table;
+ private final Supplier<FileIO> fileIO;
private final String branch;
private final SparkReadConf readConf;
private final Types.StructType groupingKeyType;
@@ -59,6 +62,7 @@ class SparkBatch implements Batch {
SparkBatch(
JavaSparkContext sparkContext,
Table table,
+ Supplier<FileIO> fileIO,
SparkReadConf readConf,
Types.StructType groupingKeyType,
List<? extends ScanTaskGroup<?>> taskGroups,
@@ -66,6 +70,7 @@ class SparkBatch implements Batch {
int scanHashCode) {
this.sparkContext = sparkContext;
this.table = table;
+ this.fileIO = fileIO;
this.branch = readConf.branch();
this.readConf = readConf;
this.groupingKeyType = groupingKeyType;
@@ -83,6 +88,8 @@ class SparkBatch implements Batch {
// broadcast the table metadata as input partitions will be sent to executors
Broadcast<Table> tableBroadcast =
sparkContext.broadcast(SerializableTableWithSize.copyOf(table));
+ Broadcast<FileIO> fileIOBroadcast =
+ sparkContext.broadcast(SerializableFileIOWithSize.wrap(fileIO.get()));
String expectedSchemaString = SchemaParser.toJson(expectedSchema);
String[][] locations = computePreferredLocations();
@@ -94,6 +101,7 @@ class SparkBatch implements Batch {
groupingKeyType,
taskGroups.get(index),
tableBroadcast,
+ fileIOBroadcast,
branch,
expectedSchemaString,
caseSensitive,
@@ -106,7 +114,7 @@ class SparkBatch implements Batch {
private String[][] computePreferredLocations() {
if (localityEnabled) {
- return SparkPlanningUtil.fetchBlockLocations(table.io(), taskGroups);
+ return SparkPlanningUtil.fetchBlockLocations(fileIO.get(), taskGroups);
} else if (executorCacheLocalityEnabled) {
List<String> executorLocations = SparkUtil.executorLocations();
diff --git
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkChangelogScan.java
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkChangelogScan.java
index 55ea137ca1..eba0431e3a 100644
---
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkChangelogScan.java
+++
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkChangelogScan.java
@@ -106,6 +106,7 @@ class SparkChangelogScan implements Scan,
SupportsReportStatistics {
return new SparkBatch(
sparkContext,
table,
+ null != scan ? scan.fileIO() : table::io,
readConf,
EMPTY_GROUPING_KEY_TYPE,
taskGroups(),
diff --git
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkInputPartition.java
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkInputPartition.java
index 99b1d78a86..a930317802 100644
---
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkInputPartition.java
+++
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkInputPartition.java
@@ -24,6 +24,7 @@ import org.apache.iceberg.ScanTaskGroup;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SchemaParser;
import org.apache.iceberg.Table;
+import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.types.Types;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.sql.catalyst.InternalRow;
@@ -34,6 +35,7 @@ class SparkInputPartition implements InputPartition,
HasPartitionKey, Serializab
private final Types.StructType groupingKeyType;
private final ScanTaskGroup<?> taskGroup;
private final Broadcast<Table> tableBroadcast;
+ private final Broadcast<FileIO> fileIOBroadcast;
private final String branch;
private final String expectedSchemaString;
private final boolean caseSensitive;
@@ -46,6 +48,7 @@ class SparkInputPartition implements InputPartition,
HasPartitionKey, Serializab
Types.StructType groupingKeyType,
ScanTaskGroup<?> taskGroup,
Broadcast<Table> tableBroadcast,
+ Broadcast<FileIO> fileIOBroadcast,
String branch,
String expectedSchemaString,
boolean caseSensitive,
@@ -54,6 +57,7 @@ class SparkInputPartition implements InputPartition,
HasPartitionKey, Serializab
this.groupingKeyType = groupingKeyType;
this.taskGroup = taskGroup;
this.tableBroadcast = tableBroadcast;
+ this.fileIOBroadcast = fileIOBroadcast;
this.branch = branch;
this.expectedSchemaString = expectedSchemaString;
this.caseSensitive = caseSensitive;
@@ -84,6 +88,10 @@ class SparkInputPartition implements InputPartition,
HasPartitionKey, Serializab
return tableBroadcast.value();
}
+ public FileIO io() {
+ return fileIOBroadcast.value();
+ }
+
public String branch() {
return branch;
}
diff --git
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
index 0745085259..a82583747a 100644
---
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
+++
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
@@ -27,6 +27,7 @@ import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Locale;
+import java.util.function.Supplier;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.CombinedScanTask;
import org.apache.iceberg.DataOperations;
@@ -77,10 +78,12 @@ public class SparkMicroBatchStream implements
MicroBatchStream, SupportsTriggerA
private static final Types.StructType EMPTY_GROUPING_KEY_TYPE = Types.StructType.of();
private final Table table;
+ private final Supplier<FileIO> fileIO;
private final String branch;
private final boolean caseSensitive;
private final String expectedSchema;
private final Broadcast<Table> tableBroadcast;
+ private final Broadcast<FileIO> fileIOBroadcast;
private final long splitSize;
private final int splitLookback;
private final long splitOpenFileCost;
@@ -97,15 +100,18 @@ public class SparkMicroBatchStream implements
MicroBatchStream, SupportsTriggerA
SparkMicroBatchStream(
JavaSparkContext sparkContext,
Table table,
+ Supplier<FileIO> fileIO,
SparkReadConf readConf,
Schema expectedSchema,
String checkpointLocation) {
this.table = table;
+ this.fileIO = fileIO;
this.branch = readConf.branch();
this.caseSensitive = readConf.caseSensitive();
this.expectedSchema = SchemaParser.toJson(expectedSchema);
this.localityPreferred = readConf.localityEnabled();
this.tableBroadcast = sparkContext.broadcast(SerializableTableWithSize.copyOf(table));
+ this.fileIOBroadcast = sparkContext.broadcast(SerializableFileIOWithSize.wrap(fileIO.get()));
this.splitSize = readConf.splitSize();
this.splitLookback = readConf.splitLookback();
this.splitOpenFileCost = readConf.splitOpenFileCost();
@@ -172,6 +178,7 @@ public class SparkMicroBatchStream implements
MicroBatchStream, SupportsTriggerA
EMPTY_GROUPING_KEY_TYPE,
combinedScanTasks.get(index),
tableBroadcast,
+ fileIOBroadcast,
branch,
expectedSchema,
caseSensitive,
@@ -183,7 +190,9 @@ public class SparkMicroBatchStream implements
MicroBatchStream, SupportsTriggerA
}
private String[][] computePreferredLocations(List<CombinedScanTask> taskGroups) {
- return localityPreferred ? SparkPlanningUtil.fetchBlockLocations(table.io(), taskGroups) : null;
+ return localityPreferred
+ ? SparkPlanningUtil.fetchBlockLocations(fileIO.get(), taskGroups)
+ : null;
}
@Override
diff --git
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPartitioningAwareScan.java
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPartitioningAwareScan.java
index c9726518ee..4d9fb7556b 100644
---
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPartitioningAwareScan.java
+++
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPartitioningAwareScan.java
@@ -78,7 +78,14 @@ abstract class SparkPartitioningAwareScan<T extends
PartitionScanTask> extends S
Schema expectedSchema,
List<Expression> filters,
Supplier<ScanReport> scanReportSupplier) {
- super(spark, table, readConf, expectedSchema, filters, scanReportSupplier);
+ super(
+ spark,
+ table,
+ null != scan ? scan.fileIO() : table::io,
+ readConf,
+ expectedSchema,
+ filters,
+ scanReportSupplier);
this.scan = scan;
this.preserveDataGrouping = readConf.preserveDataGrouping();
diff --git
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java
index 106b296de0..a921f1446a 100644
---
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java
+++
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java
@@ -33,6 +33,7 @@ import org.apache.iceberg.SnapshotSummary;
import org.apache.iceberg.StatisticsFile;
import org.apache.iceberg.Table;
import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.metrics.ScanReport;
import org.apache.iceberg.relocated.com.google.common.base.Strings;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
@@ -101,6 +102,7 @@ abstract class SparkScan implements Scan,
SupportsReportStatistics {
private final JavaSparkContext sparkContext;
private final Table table;
+ private final Supplier<FileIO> fileIO;
private final SparkSession spark;
private final SparkReadConf readConf;
private final boolean caseSensitive;
@@ -115,6 +117,7 @@ abstract class SparkScan implements Scan,
SupportsReportStatistics {
SparkScan(
SparkSession spark,
Table table,
+ Supplier<FileIO> fileIO,
SparkReadConf readConf,
Schema expectedSchema,
List<Expression> filters,
@@ -125,6 +128,7 @@ abstract class SparkScan implements Scan,
SupportsReportStatistics {
this.spark = spark;
this.sparkContext = JavaSparkContext.fromSparkContext(spark.sparkContext());
this.table = table;
+ this.fileIO = fileIO;
this.readConf = readConf;
this.caseSensitive = readConf.caseSensitive();
this.expectedSchema = expectedSchema;
@@ -162,13 +166,20 @@ abstract class SparkScan implements Scan,
SupportsReportStatistics {
@Override
public Batch toBatch() {
return new SparkBatch(
- sparkContext, table, readConf, groupingKeyType(), taskGroups(), expectedSchema, hashCode());
+ sparkContext,
+ table,
+ fileIO,
+ readConf,
+ groupingKeyType(),
+ taskGroups(),
+ expectedSchema,
+ hashCode());
}
@Override
public MicroBatchStream toMicroBatchStream(String checkpointLocation) {
return new SparkMicroBatchStream(
- sparkContext, table, readConf, expectedSchema, checkpointLocation);
+ sparkContext, table, fileIO, readConf, expectedSchema, checkpointLocation);
}
@Override
diff --git
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkStagedScan.java
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkStagedScan.java
index d2eb4e5a56..99e0deabb0 100644
---
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkStagedScan.java
+++
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkStagedScan.java
@@ -41,7 +41,7 @@ class SparkStagedScan extends SparkScan {
private List<ScanTaskGroup<ScanTask>> taskGroups = null; // lazy cache of tasks
SparkStagedScan(SparkSession spark, Table table, Schema expectedSchema, SparkReadConf readConf) {
- super(spark, table, readConf, expectedSchema, ImmutableList.of(), null);
+ super(spark, table, table::io, readConf, expectedSchema, ImmutableList.of(), null);
this.taskSetId = readConf.scanTaskSetId();
this.splitSize = readConf.splitSize();
this.splitLookback = readConf.splitLookback();
diff --git
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestBaseWithCatalog.java
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestBaseWithCatalog.java
index 7df9c75fb3..1760143d2c 100644
---
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestBaseWithCatalog.java
+++
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestBaseWithCatalog.java
@@ -69,7 +69,11 @@ public abstract class TestBaseWithCatalog extends TestBase {
// status even belonging to the same catalog. Reference:
// https://www.sqlite.org/inmemorydb.html
CatalogProperties.CLIENT_POOL_SIZE,
- "1"));
+ "1",
+ "include-credentials",
+ "true",
+ "gcs.oauth2.token",
+ "dummyToken"));
protected static RESTCatalog restCatalog;
diff --git
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestBaseReader.java
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestBaseReader.java
index 1e53710a0f..8e26a2f426 100644
---
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestBaseReader.java
+++
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestBaseReader.java
@@ -90,7 +90,7 @@ public class TestBaseReader {
private final Map<String, CloseableIntegerRange> tracker = Maps.newHashMap();
ClosureTrackingReader(Table table, List<FileScanTask> tasks) {
- super(table, new BaseCombinedScanTask(tasks), null, null, false, true);
+ super(table, table.io(), new BaseCombinedScanTask(tasks), null, null, false, true);
}
@Override
diff --git
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestChangelogReader.java
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestChangelogReader.java
index b88f0233e2..be4391aab6 100644
---
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestChangelogReader.java
+++
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestChangelogReader.java
@@ -105,7 +105,8 @@ public class TestChangelogReader extends TestBase {
for (ScanTaskGroup<ChangelogScanTask> taskGroup : taskGroups) {
ChangelogRowReader reader =
- new ChangelogRowReader(table, taskGroup, table.schema(), table.schema(), false, true);
+ new ChangelogRowReader(
+ table, table.io(), taskGroup, table.schema(), table.schema(), false, true);
while (reader.next()) {
rows.add(reader.get().copy());
}
@@ -136,7 +137,8 @@ public class TestChangelogReader extends TestBase {
for (ScanTaskGroup<ChangelogScanTask> taskGroup : taskGroups) {
ChangelogRowReader reader =
- new ChangelogRowReader(table, taskGroup, table.schema(), table.schema(), false, true);
+ new ChangelogRowReader(
+ table, table.io(), taskGroup, table.schema(), table.schema(), false, true);
while (reader.next()) {
rows.add(reader.get().copy());
}
@@ -170,7 +172,8 @@ public class TestChangelogReader extends TestBase {
for (ScanTaskGroup<ChangelogScanTask> taskGroup : taskGroups) {
ChangelogRowReader reader =
- new ChangelogRowReader(table, taskGroup, table.schema(), table.schema(), false, true);
+ new ChangelogRowReader(
+ table, table.io(), taskGroup, table.schema(), table.schema(), false, true);
while (reader.next()) {
rows.add(reader.get().copy());
}
@@ -197,7 +200,8 @@ public class TestChangelogReader extends TestBase {
for (ScanTaskGroup<ChangelogScanTask> taskGroup : taskGroups) {
ChangelogRowReader reader =
- new ChangelogRowReader(table, taskGroup, table.schema(), table.schema(), false, true);
+ new ChangelogRowReader(
+ table, table.io(), taskGroup, table.schema(), table.schema(), false, true);
while (reader.next()) {
rows.add(reader.get().copy());
}
diff --git
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesReader.java
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesReader.java
index 764e1c6c93..92aace3dfd 100644
---
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesReader.java
+++
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesReader.java
@@ -180,6 +180,7 @@ public class TestPositionDeletesReader extends TestBase {
try (PositionDeletesRowReader reader =
new PositionDeletesRowReader(
table,
+ table.io(),
new BaseScanTaskGroup<>(null, ImmutableList.of(scanTask1)),
positionDeletesTable.schema(),
projectedSchema,
@@ -220,6 +221,7 @@ public class TestPositionDeletesReader extends TestBase {
try (PositionDeletesRowReader reader =
new PositionDeletesRowReader(
table,
+ table.io(),
new BaseScanTaskGroup<>(null, ImmutableList.of(scanTask2)),
positionDeletesTable.schema(),
projectedSchema,
@@ -292,6 +294,7 @@ public class TestPositionDeletesReader extends TestBase {
try (PositionDeletesRowReader reader =
new PositionDeletesRowReader(
table,
+ table.io(),
new BaseScanTaskGroup<>(null, ImmutableList.of(scanTask1)),
positionDeletesTable.schema(),
projectedSchema,
diff --git
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java
index e16c9c2176..7dd4c6f7cf 100644
---
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java
+++
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java
@@ -329,7 +329,8 @@ public class TestSparkReaderDeletes extends DeleteReadTests {
for (CombinedScanTask task : tasks) {
try (EqualityDeleteRowReader reader =
- new EqualityDeleteRowReader(task, table, table.schema(), table.schema(), false, true)) {
+ new EqualityDeleteRowReader(
+ task, table, table.io(), table.schema(), table.schema(), false, true)) {
while (reader.next()) {
actualRowSet.add(
new InternalRowWrapper(
@@ -675,6 +676,7 @@ public class TestSparkReaderDeletes extends DeleteReadTests {
new BatchDataReader(
// expected column is id, while the equality filter column is dt
dateTable,
+ dateTable.io(),
task,
dateTable.schema(),
dateTable.schema().select("id"),