This is an automated email from the ASF dual-hosted git repository.
pvary pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new dde0dc233d Core, Spark: Moving Spark to use the new FormatModel API (#15328)
dde0dc233d is described below
commit dde0dc233d1be51f0f67cb57017c5f69febbbecc
Author: pvary <[email protected]>
AuthorDate: Wed Feb 18 16:10:25 2026 +0100
Core, Spark: Moving Spark to use the new FormatModel API (#15328)
---
.../iceberg/formats/FormatModelRegistry.java | 3 +-
.../SparkParquetReadersFlatDataBenchmark.java | 10 +-
.../SparkParquetReadersNestedDataBenchmark.java | 10 +-
.../SparkParquetWritersFlatDataBenchmark.java | 20 +-
.../SparkParquetWritersNestedDataBenchmark.java | 20 +-
.../spark/actions/RewriteTablePathSparkAction.java | 90 ++++-----
.../iceberg/spark/data/SparkParquetWriters.java | 23 ++-
.../vectorized/VectorizedSparkParquetReaders.java | 11 +-
.../iceberg/spark/source/BaseBatchReader.java | 102 +++-------
.../apache/iceberg/spark/source/BaseRowReader.java | 73 +------
.../spark/source/SparkFileWriterFactory.java | 222 +++++++++++----------
.../iceberg/spark/source/SparkFormatModels.java | 89 +++++++++
12 files changed, 353 insertions(+), 320 deletions(-)
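
For orientation: the diffs below replace the per-format builders (Parquet.read/write, Avro, ORC) with lookups through FormatModelRegistry, keyed by file format and the engine's object model class. A minimal sketch of the registry-based read path, assembled from the hunks below (dataFile and SCHEMA are illustrative placeholders, not part of this commit):

    // Look up the registered InternalRow model for Parquet and build a reader.
    try (CloseableIterable<InternalRow> rows =
        FormatModelRegistry.readBuilder(
                FileFormat.PARQUET, InternalRow.class, Files.localInput(dataFile))
            .project(SCHEMA)
            .build()) {
      for (InternalRow row : rows) {
        // consume each row
      }
    }
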
diff --git a/core/src/main/java/org/apache/iceberg/formats/FormatModelRegistry.java b/core/src/main/java/org/apache/iceberg/formats/FormatModelRegistry.java
index 7e944510a8..4a6b5a6cf4 100644
--- a/core/src/main/java/org/apache/iceberg/formats/FormatModelRegistry.java
+++ b/core/src/main/java/org/apache/iceberg/formats/FormatModelRegistry.java
@@ -58,7 +58,8 @@ public final class FormatModelRegistry {
ImmutableList.of(
"org.apache.iceberg.data.GenericFormatModels",
"org.apache.iceberg.arrow.vectorized.ArrowFormatModels",
- "org.apache.iceberg.flink.data.FlinkFormatModels");
+ "org.apache.iceberg.flink.data.FlinkFormatModels",
+ "org.apache.iceberg.spark.source.SparkFormatModels");
// Format models indexed by file format and object model class
private static final Map<Pair<FileFormat, Class<?>>, FormatModel<?, ?>> MODELS =
diff --git a/spark/v4.1/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetReadersFlatDataBenchmark.java b/spark/v4.1/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetReadersFlatDataBenchmark.java
index 7f5d701715..c900a3d53c 100644
--- a/spark/v4.1/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetReadersFlatDataBenchmark.java
+++ b/spark/v4.1/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetReadersFlatDataBenchmark.java
@@ -25,9 +25,11 @@ import java.io.File;
import java.io.IOException;
import java.util.List;
import org.apache.avro.generic.GenericData;
+import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Files;
import org.apache.iceberg.Schema;
import org.apache.iceberg.common.DynMethods;
+import org.apache.iceberg.formats.FormatModelRegistry;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.parquet.Parquet;
@@ -113,9 +115,9 @@ public class SparkParquetReadersFlatDataBenchmark {
@Threads(1)
public void readUsingIcebergReader(Blackhole blackHole) throws IOException {
try (CloseableIterable<InternalRow> rows =
- Parquet.read(Files.localInput(dataFile))
+ FormatModelRegistry.readBuilder(
+ FileFormat.PARQUET, InternalRow.class, Files.localInput(dataFile))
.project(SCHEMA)
- .createReaderFunc(type -> SparkParquetReaders.buildReader(SCHEMA, type))
.build()) {
for (InternalRow row : rows) {
@@ -171,9 +173,9 @@ public class SparkParquetReadersFlatDataBenchmark {
@Threads(1)
public void readWithProjectionUsingIcebergReader(Blackhole blackhole) throws IOException {
try (CloseableIterable<InternalRow> rows =
- Parquet.read(Files.localInput(dataFile))
+ FormatModelRegistry.readBuilder(
+ FileFormat.PARQUET, InternalRow.class, Files.localInput(dataFile))
.project(PROJECTED_SCHEMA)
- .createReaderFunc(type -> SparkParquetReaders.buildReader(PROJECTED_SCHEMA, type))
.build()) {
for (InternalRow row : rows) {
diff --git a/spark/v4.1/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetReadersNestedDataBenchmark.java b/spark/v4.1/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetReadersNestedDataBenchmark.java
index e16f18b281..32c45b1496 100644
--- a/spark/v4.1/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetReadersNestedDataBenchmark.java
+++ b/spark/v4.1/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetReadersNestedDataBenchmark.java
@@ -25,9 +25,11 @@ import java.io.File;
import java.io.IOException;
import java.util.List;
import org.apache.avro.generic.GenericData;
+import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Files;
import org.apache.iceberg.Schema;
import org.apache.iceberg.common.DynMethods;
+import org.apache.iceberg.formats.FormatModelRegistry;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.parquet.Parquet;
@@ -111,9 +113,9 @@ public class SparkParquetReadersNestedDataBenchmark {
@Threads(1)
public void readUsingIcebergReader(Blackhole blackhole) throws IOException {
try (CloseableIterable<InternalRow> rows =
- Parquet.read(Files.localInput(dataFile))
+ FormatModelRegistry.readBuilder(
+ FileFormat.PARQUET, InternalRow.class, Files.localInput(dataFile))
.project(SCHEMA)
- .createReaderFunc(type -> SparkParquetReaders.buildReader(SCHEMA, type))
.build()) {
for (InternalRow row : rows) {
@@ -169,9 +171,9 @@ public class SparkParquetReadersNestedDataBenchmark {
@Threads(1)
public void readWithProjectionUsingIcebergReader(Blackhole blackhole) throws IOException {
try (CloseableIterable<InternalRow> rows =
- Parquet.read(Files.localInput(dataFile))
+ FormatModelRegistry.readBuilder(
+ FileFormat.PARQUET, InternalRow.class, Files.localInput(dataFile))
.project(PROJECTED_SCHEMA)
- .createReaderFunc(type -> SparkParquetReaders.buildReader(PROJECTED_SCHEMA, type))
.build()) {
for (InternalRow row : rows) {
diff --git a/spark/v4.1/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetWritersFlatDataBenchmark.java b/spark/v4.1/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetWritersFlatDataBenchmark.java
index 00c361449a..21d5ec1477 100644
--- a/spark/v4.1/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetWritersFlatDataBenchmark.java
+++ b/spark/v4.1/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetWritersFlatDataBenchmark.java
@@ -23,13 +23,17 @@ import static org.apache.iceberg.types.Types.NestedField.required;
import java.io.File;
import java.io.IOException;
+import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Files;
+import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
+import org.apache.iceberg.encryption.EncryptedFiles;
+import org.apache.iceberg.formats.FormatModelRegistry;
+import org.apache.iceberg.io.DataWriter;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.spark.SparkSchemaUtil;
import org.apache.iceberg.spark.data.RandomData;
-import org.apache.iceberg.spark.data.SparkParquetWriters;
import org.apache.iceberg.types.Types;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport;
@@ -95,15 +99,16 @@ public class SparkParquetWritersFlatDataBenchmark {
@Benchmark
@Threads(1)
public void writeUsingIcebergWriter() throws IOException {
- try (FileAppender<InternalRow> writer =
- Parquet.write(Files.localOutput(dataFile))
- .createWriterFunc(
- msgType ->
- SparkParquetWriters.buildWriter(SparkSchemaUtil.convert(SCHEMA), msgType))
+ try (DataWriter<InternalRow> writer =
+ FormatModelRegistry.dataWriteBuilder(
+ FileFormat.PARQUET,
+ InternalRow.class,
+ EncryptedFiles.plainAsEncryptedOutput(Files.localOutput(dataFile)))
.schema(SCHEMA)
+ .spec(PartitionSpec.unpartitioned())
.build()) {
- writer.addAll(rows);
+ writer.write(rows);
}
}
@@ -121,6 +126,7 @@ public class SparkParquetWritersFlatDataBenchmark {
.set("spark.sql.parquet.outputTimestampType", "TIMESTAMP_MICROS")
.set("spark.sql.caseSensitive", "false")
.set("spark.sql.parquet.fieldId.write.enabled", "false")
+ .set("spark.sql.parquet.variant.annotateLogicalType.enabled", "false")
.schema(SCHEMA)
.build()) {
diff --git a/spark/v4.1/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetWritersNestedDataBenchmark.java b/spark/v4.1/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetWritersNestedDataBenchmark.java
index 24d7fa4051..7e9e1ab085 100644
--- a/spark/v4.1/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetWritersNestedDataBenchmark.java
+++ b/spark/v4.1/spark/src/jmh/java/org/apache/iceberg/spark/data/parquet/SparkParquetWritersNestedDataBenchmark.java
@@ -23,13 +23,17 @@ import static org.apache.iceberg.types.Types.NestedField.required;
import java.io.File;
import java.io.IOException;
+import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Files;
+import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
+import org.apache.iceberg.encryption.EncryptedFiles;
+import org.apache.iceberg.formats.FormatModelRegistry;
+import org.apache.iceberg.io.DataWriter;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.spark.SparkSchemaUtil;
import org.apache.iceberg.spark.data.RandomData;
-import org.apache.iceberg.spark.data.SparkParquetWriters;
import org.apache.iceberg.types.Types;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport;
@@ -95,15 +99,16 @@ public class SparkParquetWritersNestedDataBenchmark {
@Benchmark
@Threads(1)
public void writeUsingIcebergWriter() throws IOException {
- try (FileAppender<InternalRow> writer =
- Parquet.write(Files.localOutput(dataFile))
- .createWriterFunc(
- msgType ->
- SparkParquetWriters.buildWriter(SparkSchemaUtil.convert(SCHEMA), msgType))
+ try (DataWriter<InternalRow> writer =
+ FormatModelRegistry.dataWriteBuilder(
+ FileFormat.PARQUET,
+ InternalRow.class,
+ EncryptedFiles.plainAsEncryptedOutput(Files.localOutput(dataFile)))
.schema(SCHEMA)
+ .spec(PartitionSpec.unpartitioned())
.build()) {
- writer.addAll(rows);
+ writer.write(rows);
}
}
@@ -121,6 +126,7 @@ public class SparkParquetWritersNestedDataBenchmark {
.set("spark.sql.parquet.outputTimestampType", "TIMESTAMP_MICROS")
.set("spark.sql.caseSensitive", "false")
.set("spark.sql.parquet.fieldId.write.enabled", "false")
+ .set("spark.sql.parquet.variant.annotateLogicalType.enabled", "false")
.schema(SCHEMA)
.build()) {
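
The write side of both writer benchmarks follows the same registry pattern; roughly, the replacement for the removed Parquet.write call looks like the sketch below (assembled from the hunks above; SCHEMA, dataFile, and rows are placeholders):

    // Build an unpartitioned data writer through the registry. The output must be
    // wrapped as an encrypted output file, here with a plain-text pass-through.
    try (DataWriter<InternalRow> writer =
        FormatModelRegistry.dataWriteBuilder(
                FileFormat.PARQUET,
                InternalRow.class,
                EncryptedFiles.plainAsEncryptedOutput(Files.localOutput(dataFile)))
            .schema(SCHEMA)
            .spec(PartitionSpec.unpartitioned())
            .build()) {
      writer.write(rows);
    }
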
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java
index d6a13bcd51..674d238b3a 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java
@@ -52,13 +52,12 @@ import org.apache.iceberg.actions.RewriteTablePath;
import org.apache.iceberg.avro.Avro;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.data.avro.DataWriter;
-import org.apache.iceberg.data.avro.PlannedDataReader;
-import org.apache.iceberg.data.orc.GenericOrcReader;
import org.apache.iceberg.data.orc.GenericOrcWriter;
-import org.apache.iceberg.data.parquet.GenericParquetReaders;
import org.apache.iceberg.data.parquet.GenericParquetWriter;
import org.apache.iceberg.deletes.PositionDeleteWriter;
+import org.apache.iceberg.encryption.EncryptedFiles;
import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.apache.iceberg.formats.FormatModelRegistry;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.DeleteSchemaUtil;
import org.apache.iceberg.io.FileIO;
@@ -719,32 +718,10 @@ public class RewriteTablePathSparkAction extends BaseSparkAction<RewriteTablePat
private static CloseableIterable<Record> positionDeletesReader(
InputFile inputFile, FileFormat format, PartitionSpec spec) {
- Schema deleteSchema = DeleteSchemaUtil.posDeleteReadSchema(spec.schema());
- switch (format) {
- case AVRO:
- return Avro.read(inputFile)
- .project(deleteSchema)
- .reuseContainers()
- .createReaderFunc(fileSchema -> PlannedDataReader.create(deleteSchema))
- .build();
-
- case PARQUET:
- return Parquet.read(inputFile)
- .project(deleteSchema)
- .reuseContainers()
- .createReaderFunc(
- fileSchema -> GenericParquetReaders.buildReader(deleteSchema, fileSchema))
- .build();
-
- case ORC:
- return ORC.read(inputFile)
- .project(deleteSchema)
- .createReaderFunc(fileSchema -> GenericOrcReader.buildReader(deleteSchema, fileSchema))
- .build();
-
- default:
- throw new UnsupportedOperationException("Unsupported file format: " + format);
- }
+ return FormatModelRegistry.readBuilder(format, Record.class, inputFile)
+ .project(DeleteSchemaUtil.posDeleteReadSchema(spec.schema()))
+ .reuseContainers()
+ .build();
}
private static PositionDeleteWriter<Record> positionDeletesWriter(
@@ -754,30 +731,37 @@ public class RewriteTablePathSparkAction extends BaseSparkAction<RewriteTablePat
StructLike partition,
Schema rowSchema)
throws IOException {
- switch (format) {
- case AVRO:
- return Avro.writeDeletes(outputFile)
- .createWriterFunc(DataWriter::create)
- .withPartition(partition)
- .rowSchema(rowSchema)
- .withSpec(spec)
- .buildPositionWriter();
- case PARQUET:
- return Parquet.writeDeletes(outputFile)
- .createWriterFunc(GenericParquetWriter::create)
- .withPartition(partition)
- .rowSchema(rowSchema)
- .withSpec(spec)
- .buildPositionWriter();
- case ORC:
- return ORC.writeDeletes(outputFile)
- .createWriterFunc(GenericOrcWriter::buildWriter)
- .withPartition(partition)
- .rowSchema(rowSchema)
- .withSpec(spec)
- .buildPositionWriter();
- default:
- throw new UnsupportedOperationException("Unsupported file format: " + format);
+ if (rowSchema == null) {
+ return FormatModelRegistry.<Record>positionDeleteWriteBuilder(
+ format, EncryptedFiles.plainAsEncryptedOutput(outputFile))
+ .partition(partition)
+ .spec(spec)
+ .build();
+ } else {
+ return switch (format) {
+ case AVRO ->
+ Avro.writeDeletes(outputFile)
+ .createWriterFunc(DataWriter::create)
+ .withPartition(partition)
+ .rowSchema(rowSchema)
+ .withSpec(spec)
+ .buildPositionWriter();
+ case PARQUET ->
+ Parquet.writeDeletes(outputFile)
+ .createWriterFunc(GenericParquetWriter::create)
+ .withPartition(partition)
+ .rowSchema(rowSchema)
+ .withSpec(spec)
+ .buildPositionWriter();
+ case ORC ->
+ ORC.writeDeletes(outputFile)
+ .createWriterFunc(GenericOrcWriter::buildWriter)
+ .withPartition(partition)
+ .rowSchema(rowSchema)
+ .withSpec(spec)
+ .buildPositionWriter();
+ default -> throw new UnsupportedOperationException("Unsupported file format: " + format);
+ };
}
}
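
When no deleted-row payload is attached (rowSchema == null), the two helpers above collapse into plain registry calls; a condensed sketch of the resulting reader and writer setup (variable names as in the hunks, builders otherwise unchanged):

    // Position delete reader over the generic Record model.
    CloseableIterable<Record> deletes =
        FormatModelRegistry.readBuilder(format, Record.class, inputFile)
            .project(DeleteSchemaUtil.posDeleteReadSchema(spec.schema()))
            .reuseContainers()
            .build();

    // Position delete writer; the format-specific wiring now lives in the registry.
    PositionDeleteWriter<Record> deleteWriter =
        FormatModelRegistry.<Record>positionDeleteWriteBuilder(
                format, EncryptedFiles.plainAsEncryptedOutput(outputFile))
            .partition(partition)
            .spec(spec)
            .build();
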
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetWriters.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetWriters.java
index 8bdfe7c3a8..3ff5ef9c57 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetWriters.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetWriters.java
@@ -30,6 +30,7 @@ import java.util.UUID;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.apache.iceberg.FieldMetrics;
+import org.apache.iceberg.Schema;
import org.apache.iceberg.parquet.ParquetValueReaders.ReusableEntry;
import org.apache.iceberg.parquet.ParquetValueWriter;
import org.apache.iceberg.parquet.ParquetValueWriters;
@@ -41,6 +42,7 @@ import org.apache.iceberg.parquet.TripleWriter;
import org.apache.iceberg.parquet.VariantWriterBuilder;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.SparkSchemaUtil;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.util.DecimalUtil;
import org.apache.iceberg.util.UUIDUtil;
@@ -75,10 +77,27 @@ import org.apache.spark.unsafe.types.VariantVal;
public class SparkParquetWriters {
private SparkParquetWriters() {}
- @SuppressWarnings("unchecked")
public static <T> ParquetValueWriter<T> buildWriter(StructType dfSchema, MessageType type) {
+ return buildWriter(null, type, dfSchema);
+ }
+
+ @SuppressWarnings("unchecked")
+ public static <T> ParquetValueWriter<T> buildWriter(
+ Schema icebergSchema, MessageType type, StructType dfSchema) {
+ return (ParquetValueWriter<T>)
+ ParquetWithSparkSchemaVisitor.visit(
+ dfSchema != null ? dfSchema : SparkSchemaUtil.convert(icebergSchema),
+ type,
+ new WriteBuilder(type));
+ }
+
+ public static <T> ParquetValueWriter<T> buildWriter(
+ StructType dfSchema, MessageType type, Schema icebergSchema) {
return (ParquetValueWriter<T>)
- ParquetWithSparkSchemaVisitor.visit(dfSchema, type, new WriteBuilder(type));
+ ParquetWithSparkSchemaVisitor.visit(
+ dfSchema != null ? dfSchema : SparkSchemaUtil.convert(icebergSchema),
+ type,
+ new WriteBuilder(type));
}
private static class WriteBuilder extends ParquetWithSparkSchemaVisitor<ParquetValueWriter<?>> {
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkParquetReaders.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkParquetReaders.java
index 8e25e81a05..55f9fc1768 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkParquetReaders.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkParquetReaders.java
@@ -30,6 +30,8 @@ import org.apache.iceberg.parquet.TypeWithSchemaVisitor;
import org.apache.iceberg.parquet.VectorizedReader;
import org.apache.iceberg.spark.SparkUtil;
import org.apache.parquet.schema.MessageType;
+import org.apache.spark.sql.vectorized.ColumnVector;
+import org.apache.spark.sql.vectorized.ColumnarBatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -75,7 +77,7 @@ public class VectorizedSparkParquetReaders {
return buildReader(expectedSchema, fileSchema, idToConstant, ArrowAllocation.rootAllocator());
}
- public static CometColumnarBatchReader buildCometReader(
+ public static VectorizedReader<ColumnarBatch> buildCometReader(
Schema expectedSchema, MessageType fileSchema, Map<Integer, ?> idToConstant) {
return (CometColumnarBatchReader)
TypeWithSchemaVisitor.visit(
@@ -88,6 +90,13 @@ public class VectorizedSparkParquetReaders {
readers -> new CometColumnarBatchReader(readers, expectedSchema)));
}
+ /** A subclass of ColumnarBatch to identify Comet readers. */
+ public static class CometColumnarBatch extends ColumnarBatch {
+ public CometColumnarBatch(ColumnVector[] columns) {
+ super(columns);
+ }
+ }
+
// enables unsafe memory access to avoid costly checks to see if index is within bounds
// as long as it is not configured explicitly (see BoundsChecking in Arrow)
private static void enableUnsafeMemoryAccess() {
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java
index a67496cd76..0acd8bc244 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java
@@ -19,7 +19,6 @@
package org.apache.iceberg.spark.source;
import java.util.Map;
-import java.util.Set;
import javax.annotation.Nonnull;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.MetadataColumns;
@@ -29,21 +28,18 @@ import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.data.DeleteFilter;
import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.formats.FormatModelRegistry;
+import org.apache.iceberg.formats.ReadBuilder;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.InputFile;
-import org.apache.iceberg.orc.ORC;
-import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
-import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.spark.OrcBatchReadConf;
import org.apache.iceberg.spark.ParquetBatchReadConf;
import org.apache.iceberg.spark.ParquetReaderType;
import org.apache.iceberg.spark.data.vectorized.ColumnVectorWithFilter;
import org.apache.iceberg.spark.data.vectorized.ColumnarBatchUtil;
import org.apache.iceberg.spark.data.vectorized.UpdatableDeletedColumnVector;
-import org.apache.iceberg.spark.data.vectorized.VectorizedSparkOrcReaders;
import org.apache.iceberg.spark.data.vectorized.VectorizedSparkParquetReaders;
-import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.util.Pair;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.vectorized.ColumnVector;
@@ -74,79 +70,37 @@ abstract class BaseBatchReader<T extends ScanTask> extends BaseReader<ColumnarBa
Expression residual,
Map<Integer, ?> idToConstant,
@Nonnull SparkDeleteFilter deleteFilter) {
- CloseableIterable<ColumnarBatch> iterable;
- switch (format) {
- case PARQUET:
- iterable =
- newParquetIterable(
- inputFile, start, length, residual, idToConstant, deleteFilter.requiredSchema());
- break;
- case ORC:
- iterable = newOrcIterable(inputFile, start, length, residual, idToConstant);
- break;
- default:
- throw new UnsupportedOperationException(
- "Format: " + format + " not supported for batched reads");
+ Class<? extends ColumnarBatch> readType =
+ useComet() ? VectorizedSparkParquetReaders.CometColumnarBatch.class : ColumnarBatch.class;
+ ReadBuilder<ColumnarBatch, ?> readBuilder =
+ FormatModelRegistry.readBuilder(format, readType, inputFile);
+
+ if (parquetConf != null) {
+ readBuilder = readBuilder.recordsPerBatch(parquetConf.batchSize());
+ } else if (orcConf != null) {
+ readBuilder = readBuilder.recordsPerBatch(orcConf.batchSize());
}
- return CloseableIterable.transform(iterable, new BatchDeleteFilter(deleteFilter)::filterBatch);
- }
+ CloseableIterable<ColumnarBatch> iterable =
+ readBuilder
+ .project(deleteFilter.requiredSchema())
+ .idToConstant(idToConstant)
+ .split(start, length)
+ .filter(residual)
+ .caseSensitive(caseSensitive())
+ // Spark eagerly consumes the batches. So the underlying memory allocated could be
+ // reused without worrying about subsequent reads clobbering over each other. This
+ // improves read performance as every batch read doesn't have to pay the cost of
+ // allocating memory.
+ .reuseContainers()
+ .withNameMapping(nameMapping())
+ .build();
- private CloseableIterable<ColumnarBatch> newParquetIterable(
- InputFile inputFile,
- long start,
- long length,
- Expression residual,
- Map<Integer, ?> idToConstant,
- Schema requiredSchema) {
- return Parquet.read(inputFile)
- .project(requiredSchema)
- .split(start, length)
- .createBatchedReaderFunc(
- fileSchema -> {
- if (parquetConf.readerType() == ParquetReaderType.COMET) {
- return VectorizedSparkParquetReaders.buildCometReader(
- requiredSchema, fileSchema, idToConstant);
- } else {
- return VectorizedSparkParquetReaders.buildReader(
- requiredSchema, fileSchema, idToConstant);
- }
- })
- .recordsPerBatch(parquetConf.batchSize())
- .filter(residual)
- .caseSensitive(caseSensitive())
- // Spark eagerly consumes the batches. So the underlying memory allocated could be reused
- // without worrying about subsequent reads clobbering over each other. This improves
- // read performance as every batch read doesn't have to pay the cost of allocating memory.
- .reuseContainers()
- .withNameMapping(nameMapping())
- .build();
+ return CloseableIterable.transform(iterable, new BatchDeleteFilter(deleteFilter)::filterBatch);
}
- private CloseableIterable<ColumnarBatch> newOrcIterable(
- InputFile inputFile,
- long start,
- long length,
- Expression residual,
- Map<Integer, ?> idToConstant) {
- Set<Integer> constantFieldIds = idToConstant.keySet();
- Set<Integer> metadataFieldIds = MetadataColumns.metadataFieldIds();
- Sets.SetView<Integer> constantAndMetadataFieldIds =
- Sets.union(constantFieldIds, metadataFieldIds);
- Schema schemaWithoutConstantAndMetadataFields =
- TypeUtil.selectNot(expectedSchema(), constantAndMetadataFieldIds);
-
- return ORC.read(inputFile)
- .project(schemaWithoutConstantAndMetadataFields)
- .split(start, length)
- .createBatchedReaderFunc(
- fileSchema ->
- VectorizedSparkOrcReaders.buildReader(expectedSchema(), fileSchema, idToConstant))
- .recordsPerBatch(orcConf.batchSize())
- .filter(residual)
- .caseSensitive(caseSensitive())
- .withNameMapping(nameMapping())
- .build();
+ private boolean useComet() {
+ return parquetConf != null && parquetConf.readerType() == ParquetReaderType.COMET;
}
@VisibleForTesting
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/BaseRowReader.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/BaseRowReader.java
index 52c32eff0f..14febb212a 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/BaseRowReader.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/BaseRowReader.java
@@ -20,22 +20,15 @@ package org.apache.iceberg.spark.source;
import java.util.Map;
import org.apache.iceberg.FileFormat;
-import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.ScanTask;
import org.apache.iceberg.ScanTaskGroup;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
-import org.apache.iceberg.avro.Avro;
import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.formats.FormatModelRegistry;
+import org.apache.iceberg.formats.ReadBuilder;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.InputFile;
-import org.apache.iceberg.orc.ORC;
-import org.apache.iceberg.parquet.Parquet;
-import org.apache.iceberg.relocated.com.google.common.collect.Sets;
-import org.apache.iceberg.spark.data.SparkOrcReader;
-import org.apache.iceberg.spark.data.SparkParquetReaders;
-import org.apache.iceberg.spark.data.SparkPlannedAvroReader;
-import org.apache.iceberg.types.TypeUtil;
import org.apache.spark.sql.catalyst.InternalRow;
abstract class BaseRowReader<T extends ScanTask> extends BaseReader<InternalRow, T> {
@@ -56,69 +49,15 @@ abstract class BaseRowReader<T extends ScanTask> extends BaseReader<InternalRow,
Expression residual,
Schema projection,
Map<Integer, ?> idToConstant) {
- switch (format) {
- case PARQUET:
- return newParquetIterable(file, start, length, residual, projection, idToConstant);
-
- case AVRO:
- return newAvroIterable(file, start, length, projection, idToConstant);
-
- case ORC:
- return newOrcIterable(file, start, length, residual, projection, idToConstant);
-
- default:
- throw new UnsupportedOperationException("Cannot read unknown format: " + format);
- }
- }
-
- private CloseableIterable<InternalRow> newAvroIterable(
- InputFile file, long start, long length, Schema projection, Map<Integer, ?> idToConstant) {
- return Avro.read(file)
- .reuseContainers()
+ ReadBuilder<InternalRow, ?> reader =
+ FormatModelRegistry.readBuilder(format, InternalRow.class, file);
+ return reader
.project(projection)
- .split(start, length)
- .createResolvingReader(schema -> SparkPlannedAvroReader.create(schema, idToConstant))
- .withNameMapping(nameMapping())
- .build();
- }
-
- private CloseableIterable<InternalRow> newParquetIterable(
- InputFile file,
- long start,
- long length,
- Expression residual,
- Schema readSchema,
- Map<Integer, ?> idToConstant) {
- return Parquet.read(file)
+ .idToConstant(idToConstant)
.reuseContainers()
.split(start, length)
- .project(readSchema)
- .createReaderFunc(
- fileSchema -> SparkParquetReaders.buildReader(readSchema, fileSchema, idToConstant))
- .filter(residual)
.caseSensitive(caseSensitive())
- .withNameMapping(nameMapping())
- .build();
- }
-
- private CloseableIterable<InternalRow> newOrcIterable(
- InputFile file,
- long start,
- long length,
- Expression residual,
- Schema readSchema,
- Map<Integer, ?> idToConstant) {
- Schema readSchemaWithoutConstantAndMetadataFields =
- TypeUtil.selectNot(
- readSchema, Sets.union(idToConstant.keySet(), MetadataColumns.metadataFieldIds()));
-
- return ORC.read(file)
- .project(readSchemaWithoutConstantAndMetadataFields)
- .split(start, length)
- .createReaderFunc(
- readOrcSchema -> new SparkOrcReader(readSchema, readOrcSchema, idToConstant))
.filter(residual)
- .caseSensitive(caseSensitive())
.withNameMapping(nameMapping())
.build();
}
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkFileWriterFactory.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkFileWriterFactory.java
index a93db17e4a..2b3bf73d56 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkFileWriterFactory.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkFileWriterFactory.java
@@ -23,13 +23,20 @@ import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT;
import static org.apache.iceberg.TableProperties.DELETE_DEFAULT_FILE_FORMAT;
+import java.io.IOException;
+import java.io.UncheckedIOException;
import java.util.Map;
import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.MetricsConfig;
+import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SortOrder;
+import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
import org.apache.iceberg.avro.Avro;
-import org.apache.iceberg.data.BaseFileWriterFactory;
+import org.apache.iceberg.data.RegistryBasedFileWriterFactory;
+import org.apache.iceberg.deletes.PositionDeleteWriter;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
import org.apache.iceberg.io.DeleteSchemaUtil;
import org.apache.iceberg.orc.ORC;
import org.apache.iceberg.parquet.Parquet;
@@ -40,14 +47,20 @@ import org.apache.iceberg.spark.data.SparkAvroWriter;
import org.apache.iceberg.spark.data.SparkOrcWriter;
import org.apache.iceberg.spark.data.SparkParquetWriters;
import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.unsafe.types.UTF8String;
-
-class SparkFileWriterFactory extends BaseFileWriterFactory<InternalRow> {
- private StructType dataSparkType;
- private StructType equalityDeleteSparkType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class SparkFileWriterFactory extends RegistryBasedFileWriterFactory<InternalRow, StructType> {
+ private static final Logger LOG = LoggerFactory.getLogger(SparkFileWriterFactory.class);
+ // We need to use old writers to write position deletes with row data, which is a deprecated
+ // feature.
+ private final boolean useDeprecatedPositionDeleteWriter;
private StructType positionDeleteSparkType;
+ private final Schema positionDeleteRowSchema;
+ private final Table table;
+ private final FileFormat format;
private final Map<String, String> writeProperties;
/**
@@ -75,18 +88,26 @@ class SparkFileWriterFactory extends BaseFileWriterFactory<InternalRow> {
super(
table,
dataFileFormat,
+ InternalRow.class,
dataSchema,
dataSortOrder,
deleteFileFormat,
equalityFieldIds,
equalityDeleteRowSchema,
equalityDeleteSortOrder,
- positionDeleteRowSchema);
+ writeProperties,
+ useOrConvert(dataSparkType, dataSchema),
+ useOrConvert(equalityDeleteSparkType, equalityDeleteRowSchema));
- this.dataSparkType = dataSparkType;
- this.equalityDeleteSparkType = equalityDeleteSparkType;
- this.positionDeleteSparkType = positionDeleteSparkType;
+ this.table = table;
+ this.format = dataFileFormat;
this.writeProperties = writeProperties != null ? writeProperties : ImmutableMap.of();
+ this.positionDeleteRowSchema = positionDeleteRowSchema;
+ this.positionDeleteSparkType = positionDeleteSparkType;
+ this.useDeprecatedPositionDeleteWriter =
+ positionDeleteRowSchema != null
+ || (positionDeleteSparkType != null
+ && positionDeleteSparkType.getFieldIndex(DELETE_FILE_ROW_FIELD_NAME).isDefined());
}
SparkFileWriterFactory(
@@ -105,119 +126,110 @@ class SparkFileWriterFactory extends BaseFileWriterFactory<InternalRow> {
super(
table,
dataFileFormat,
+ InternalRow.class,
dataSchema,
dataSortOrder,
deleteFileFormat,
equalityFieldIds,
equalityDeleteRowSchema,
equalityDeleteSortOrder,
- ImmutableMap.of());
+ writeProperties,
+ useOrConvert(dataSparkType, dataSchema),
+ useOrConvert(equalityDeleteSparkType, equalityDeleteRowSchema));
- this.dataSparkType = dataSparkType;
- this.equalityDeleteSparkType = equalityDeleteSparkType;
- this.positionDeleteSparkType = null;
+ this.table = table;
+ this.format = dataFileFormat;
this.writeProperties = writeProperties != null ? writeProperties : ImmutableMap.of();
+ this.positionDeleteRowSchema = null;
+ this.useDeprecatedPositionDeleteWriter = false;
}
static Builder builderFor(Table table) {
return new Builder(table);
}
- @Override
- protected void configureDataWrite(Avro.DataWriteBuilder builder) {
- builder.createWriterFunc(ignored -> new SparkAvroWriter(dataSparkType()));
- builder.setAll(writeProperties);
- }
-
- @Override
- protected void configureEqualityDelete(Avro.DeleteWriteBuilder builder) {
- builder.createWriterFunc(ignored -> new SparkAvroWriter(equalityDeleteSparkType()));
- builder.setAll(writeProperties);
- }
-
- @Override
- protected void configurePositionDelete(Avro.DeleteWriteBuilder builder) {
- boolean withRow =
- positionDeleteSparkType().getFieldIndex(DELETE_FILE_ROW_FIELD_NAME).isDefined();
- if (withRow) {
- // SparkAvroWriter accepts just the Spark type of the row ignoring the path and pos
- StructField rowField = positionDeleteSparkType().apply(DELETE_FILE_ROW_FIELD_NAME);
- StructType positionDeleteRowSparkType = (StructType) rowField.dataType();
- builder.createWriterFunc(ignored -> new SparkAvroWriter(positionDeleteRowSparkType));
- }
-
- builder.setAll(writeProperties);
- }
-
- @Override
- protected void configureDataWrite(Parquet.DataWriteBuilder builder) {
- builder.createWriterFunc(msgType -> SparkParquetWriters.buildWriter(dataSparkType(), msgType));
- builder.setAll(writeProperties);
- }
-
- @Override
- protected void configureEqualityDelete(Parquet.DeleteWriteBuilder builder) {
- builder.createWriterFunc(
- msgType -> SparkParquetWriters.buildWriter(equalityDeleteSparkType(), msgType));
- builder.setAll(writeProperties);
- }
-
- @Override
- protected void configurePositionDelete(Parquet.DeleteWriteBuilder builder) {
- builder.createWriterFunc(
- msgType -> SparkParquetWriters.buildWriter(positionDeleteSparkType(), msgType));
- builder.transformPaths(path -> UTF8String.fromString(path.toString()));
- builder.setAll(writeProperties);
- }
-
- @Override
- protected void configureDataWrite(ORC.DataWriteBuilder builder) {
- builder.createWriterFunc(SparkOrcWriter::new);
- builder.setAll(writeProperties);
- }
-
- @Override
- protected void configureEqualityDelete(ORC.DeleteWriteBuilder builder) {
- builder.createWriterFunc(SparkOrcWriter::new);
- builder.setAll(writeProperties);
- }
-
- @Override
- protected void configurePositionDelete(ORC.DeleteWriteBuilder builder) {
- builder.createWriterFunc(SparkOrcWriter::new);
- builder.transformPaths(path -> UTF8String.fromString(path.toString()));
- builder.setAll(writeProperties);
- }
-
- private StructType dataSparkType() {
- if (dataSparkType == null) {
- Preconditions.checkNotNull(dataSchema(), "Data schema must not be null");
- this.dataSparkType = SparkSchemaUtil.convert(dataSchema());
- }
-
- return dataSparkType;
- }
-
- private StructType equalityDeleteSparkType() {
- if (equalityDeleteSparkType == null) {
- Preconditions.checkNotNull(
- equalityDeleteRowSchema(), "Equality delete schema must not be null");
- this.equalityDeleteSparkType = SparkSchemaUtil.convert(equalityDeleteRowSchema());
- }
-
- return equalityDeleteSparkType;
- }
-
private StructType positionDeleteSparkType() {
if (positionDeleteSparkType == null) {
// wrap the optional row schema into the position delete schema containing path and position
- Schema positionDeleteSchema = DeleteSchemaUtil.posDeleteSchema(positionDeleteRowSchema());
+ Schema positionDeleteSchema = DeleteSchemaUtil.posDeleteSchema(positionDeleteRowSchema);
this.positionDeleteSparkType = SparkSchemaUtil.convert(positionDeleteSchema);
}
return positionDeleteSparkType;
}
+ @Override
+ public PositionDeleteWriter<InternalRow> newPositionDeleteWriter(
+ EncryptedOutputFile file, PartitionSpec spec, StructLike partition) {
+ if (!useDeprecatedPositionDeleteWriter) {
+ return super.newPositionDeleteWriter(file, spec, partition);
+ } else {
+ LOG.warn("Position deletes with deleted rows are deprecated and will be removed in 1.12.0.");
+ Map<String, String> properties = table == null ? ImmutableMap.of() : table.properties();
+ MetricsConfig metricsConfig =
+ table == null
+ ? MetricsConfig.forPositionDelete()
+ : MetricsConfig.forPositionDelete(table);
+
+ try {
+ return switch (format) {
+ case AVRO ->
+ Avro.writeDeletes(file)
+ .createWriterFunc(
+ ignored ->
+ new SparkAvroWriter(
+ (StructType)
+ positionDeleteSparkType()
+ .apply(DELETE_FILE_ROW_FIELD_NAME)
+ .dataType()))
+ .setAll(properties)
+ .setAll(writeProperties)
+ .metricsConfig(metricsConfig)
+ .withPartition(partition)
+ .overwrite()
+ .rowSchema(positionDeleteRowSchema)
+ .withSpec(spec)
+ .withKeyMetadata(file.keyMetadata())
+ .buildPositionWriter();
+ case ORC ->
+ ORC.writeDeletes(file)
+ .createWriterFunc(SparkOrcWriter::new)
+ .transformPaths(path -> UTF8String.fromString(path.toString()))
+ .setAll(properties)
+ .setAll(writeProperties)
+ .metricsConfig(metricsConfig)
+ .withPartition(partition)
+ .overwrite()
+ .rowSchema(positionDeleteRowSchema)
+ .withSpec(spec)
+ .withKeyMetadata(file.keyMetadata())
+ .buildPositionWriter();
+ case PARQUET ->
+ Parquet.writeDeletes(file)
+ .createWriterFunc(
+ msgType ->
+ SparkParquetWriters.buildWriter(positionDeleteSparkType(), msgType))
+ .transformPaths(path -> UTF8String.fromString(path.toString()))
+ .setAll(properties)
+ .setAll(writeProperties)
+ .metricsConfig(metricsConfig)
+ .withPartition(partition)
+ .overwrite()
+ .metricsConfig(metricsConfig)
+ .rowSchema(positionDeleteRowSchema)
+ .withSpec(spec)
+ .withKeyMetadata(file.keyMetadata())
+ .buildPositionWriter();
+ default ->
+ throw new UnsupportedOperationException(
+ "Cannot write pos-deletes for unsupported file format: " + format);
+ };
+ } catch (IOException e) {
+ throw new UncheckedIOException("Failed to create new position delete writer", e);
+ }
+ }
+ }
+
static class Builder {
private final Table table;
private FileFormat dataFileFormat;
@@ -340,4 +352,14 @@ class SparkFileWriterFactory extends BaseFileWriterFactory<InternalRow> {
writeProperties);
}
}
+
+ private static StructType useOrConvert(StructType sparkType, Schema schema) {
+ if (sparkType != null) {
+ return sparkType;
+ } else if (schema != null) {
+ return SparkSchemaUtil.convert(schema);
+ } else {
+ return null;
+ }
+ }
}
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkFormatModels.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkFormatModels.java
new file mode 100644
index 0000000000..677f2e950b
--- /dev/null
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkFormatModels.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source;
+
+import org.apache.iceberg.avro.AvroFormatModel;
+import org.apache.iceberg.formats.FormatModelRegistry;
+import org.apache.iceberg.orc.ORCFormatModel;
+import org.apache.iceberg.parquet.ParquetFormatModel;
+import org.apache.iceberg.spark.data.SparkAvroWriter;
+import org.apache.iceberg.spark.data.SparkOrcReader;
+import org.apache.iceberg.spark.data.SparkOrcWriter;
+import org.apache.iceberg.spark.data.SparkParquetReaders;
+import org.apache.iceberg.spark.data.SparkParquetWriters;
+import org.apache.iceberg.spark.data.SparkPlannedAvroReader;
+import org.apache.iceberg.spark.data.vectorized.VectorizedSparkOrcReaders;
+import org.apache.iceberg.spark.data.vectorized.VectorizedSparkParquetReaders;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.vectorized.ColumnarBatch;
+
+public class SparkFormatModels {
+ public static void register() {
+ FormatModelRegistry.register(
+ AvroFormatModel.create(
+ InternalRow.class,
+ StructType.class,
+ (icebergSchema, fileSchema, engineSchema) -> new SparkAvroWriter(engineSchema),
+ (icebergSchema, fileSchema, engineSchema, idToConstant) ->
+ SparkPlannedAvroReader.create(icebergSchema, idToConstant)));
+
+ FormatModelRegistry.register(
+ ParquetFormatModel.create(
+ InternalRow.class,
+ StructType.class,
+ SparkParquetWriters::buildWriter,
+ (icebergSchema, fileSchema, engineSchema, idToConstant) ->
+ SparkParquetReaders.buildReader(icebergSchema, fileSchema, idToConstant)));
+
+ FormatModelRegistry.register(
+ ParquetFormatModel.create(
+ ColumnarBatch.class,
+ StructType.class,
+ (icebergSchema, fileSchema, engineSchema, idToConstant) ->
+ VectorizedSparkParquetReaders.buildReader(
+ icebergSchema, fileSchema, idToConstant)));
+
+ FormatModelRegistry.register(
+ ParquetFormatModel.create(
+ VectorizedSparkParquetReaders.CometColumnarBatch.class,
+ StructType.class,
+ (icebergSchema, fileSchema, engineSchema, idToConstant) ->
+ VectorizedSparkParquetReaders.buildCometReader(
+ icebergSchema, fileSchema, idToConstant)));
+
+ FormatModelRegistry.register(
+ ORCFormatModel.create(
+ InternalRow.class,
+ StructType.class,
+ (icebergSchema, fileSchema, engineSchema) ->
+ new SparkOrcWriter(icebergSchema, fileSchema),
+ (icebergSchema, fileSchema, engineSchema, idToConstant) ->
+ new SparkOrcReader(icebergSchema, fileSchema, idToConstant)));
+
+ FormatModelRegistry.register(
+ ORCFormatModel.create(
+ ColumnarBatch.class,
+ StructType.class,
+ (icebergSchema, fileSchema, engineSchema, idToConstant) ->
+ VectorizedSparkOrcReaders.buildReader(icebergSchema, fileSchema, idToConstant)));
+ }
+
+ private SparkFormatModels() {}
+}
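
For completeness, the registry keys models by the (FileFormat, object model class) pair, which is why the CometColumnarBatch marker subclass is enough to select the Comet reader in BaseBatchReader. A sketch of the lookup (inputFile is a placeholder; in production the registry loads SparkFormatModels itself through the class list added to FormatModelRegistry):

    // Resolve the Comet-specific vectorized Parquet reader via the marker class.
    SparkFormatModels.register();
    Class<? extends ColumnarBatch> readType =
        VectorizedSparkParquetReaders.CometColumnarBatch.class;
    ReadBuilder<ColumnarBatch, ?> cometBuilder =
        FormatModelRegistry.readBuilder(FileFormat.PARQUET, readType, inputFile);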