This is an automated email from the ASF dual-hosted git repository.
aokolnychyi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new 4ed7658bdc Spark 4.1: Introduce modes in SparkWriteBuilder (#15374)
4ed7658bdc is described below
commit 4ed7658bdcbab03c696472f56f3cfc810d1fbec8
Author: Anton Okolnychyi <[email protected]>
AuthorDate: Thu Feb 19 22:33:25 2026 -0800
Spark 4.1: Introduce modes in SparkWriteBuilder (#15374)
---
.../iceberg/spark/source/SparkWriteBuilder.java | 219 +++++++++++----------
1 file changed, 111 insertions(+), 108 deletions(-)
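
For orientation before the diff: a minimal, standalone sketch of the sealed-interface pattern this change adopts. The names here (WriteMode, ModeDispatchSketch, describe, filterSql) are hypothetical and the types are simplified; in the actual patch the records are nested in SparkWriteBuilder.Mode and carry Iceberg/Spark types such as Expression, SparkCopyOnWriteScan, Command, and IsolationLevel.

    // sketch only: one sealed Mode value replaces several mutually exclusive boolean flags
    sealed interface WriteMode {
      record Append() implements WriteMode {}

      record DynamicOverwrite() implements WriteMode {}

      record OverwriteByFilter(String filterSql) implements WriteMode {}
    }

    class ModeDispatchSketch {
      // dispatch on the mode with instanceof pattern matching, as build() does in the diff below
      static String describe(WriteMode mode) {
        if (mode instanceof WriteMode.OverwriteByFilter overwrite) {
          return "overwrite rows matching: " + overwrite.filterSql();
        } else if (mode instanceof WriteMode.DynamicOverwrite) {
          return "overwrite partitions that receive new data";
        } else {
          return "append new data";
        }
      }

      public static void main(String[] args) {
        System.out.println(describe(new WriteMode.Append()));
        System.out.println(describe(new WriteMode.OverwriteByFilter("ts < DATE '2026-01-01'")));
      }
    }

Compared with the removed overwriteDynamic/overwriteByFilter/overwriteFiles flags, a single Mode value makes the mutually exclusive states impossible to combine by construction.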
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkWriteBuilder.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkWriteBuilder.java
index 182e56a861..14c6c8b068 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkWriteBuilder.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkWriteBuilder.java
@@ -32,6 +32,10 @@ import org.apache.iceberg.spark.SparkSchemaUtil;
import org.apache.iceberg.spark.SparkUtil;
import org.apache.iceberg.spark.SparkWriteConf;
import org.apache.iceberg.spark.SparkWriteRequirements;
+import org.apache.iceberg.spark.source.SparkWriteBuilder.Mode.Append;
+import org.apache.iceberg.spark.source.SparkWriteBuilder.Mode.CopyOnWriteOperation;
+import org.apache.iceberg.spark.source.SparkWriteBuilder.Mode.DynamicOverwrite;
+import org.apache.iceberg.spark.source.SparkWriteBuilder.Mode.OverwriteByFilter;
import org.apache.iceberg.types.TypeUtil;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.connector.read.Scan;
@@ -44,115 +48,105 @@ import org.apache.spark.sql.connector.write.Write;
import org.apache.spark.sql.connector.write.WriteBuilder;
import org.apache.spark.sql.connector.write.streaming.StreamingWrite;
import org.apache.spark.sql.sources.Filter;
-import org.apache.spark.sql.types.LongType$;
+import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
class SparkWriteBuilder implements WriteBuilder, SupportsDynamicOverwrite, SupportsOverwrite {
private final SparkSession spark;
private final Table table;
private final SparkWriteConf writeConf;
- private final LogicalWriteInfo writeInfo;
- private final StructType dsSchema;
- private final String overwriteMode;
- private boolean overwriteDynamic = false;
- private boolean overwriteByFilter = false;
- private Expression overwriteExpr = null;
- private boolean overwriteFiles = false;
- private SparkCopyOnWriteScan copyOnWriteScan = null;
- private Command copyOnWriteCommand = null;
- private IsolationLevel copyOnWriteIsolationLevel = null;
+ private final LogicalWriteInfo info;
+ private final boolean caseSensitive;
+ private final boolean checkNullability;
+ private final boolean checkOrdering;
+ private final boolean mergeSchema;
+ private Mode mode = null;
SparkWriteBuilder(SparkSession spark, Table table, String branch, LogicalWriteInfo info) {
this.spark = spark;
this.table = table;
this.writeConf = new SparkWriteConf(spark, table, branch, info.options());
- this.writeInfo = info;
- this.dsSchema = info.schema();
- this.overwriteMode = writeConf.overwriteMode();
+ this.info = info;
+ this.caseSensitive = writeConf.caseSensitive();
+ this.checkNullability = writeConf.checkNullability();
+ this.checkOrdering = writeConf.checkOrdering();
+ this.mergeSchema = writeConf.mergeSchema();
}
public WriteBuilder overwriteFiles(Scan scan, Command command, IsolationLevel isolationLevel) {
- Preconditions.checkState(!overwriteByFilter, "Cannot overwrite individual files and by filter");
- Preconditions.checkState(
- !overwriteDynamic, "Cannot overwrite individual files and dynamically");
-
- this.overwriteFiles = true;
- this.copyOnWriteScan = (SparkCopyOnWriteScan) scan;
- this.copyOnWriteCommand = command;
- this.copyOnWriteIsolationLevel = isolationLevel;
+ Preconditions.checkState(mode == null, "Cannot use copy-on-write with other modes");
+ this.mode = new CopyOnWriteOperation((SparkCopyOnWriteScan) scan, command, isolationLevel);
return this;
}
@Override
public WriteBuilder overwriteDynamicPartitions() {
- Preconditions.checkState(
- !overwriteByFilter, "Cannot overwrite dynamically and by filter: %s", overwriteExpr);
- Preconditions.checkState(!overwriteFiles, "Cannot overwrite individual files and dynamically");
-
- this.overwriteDynamic = true;
+ Preconditions.checkState(mode == null, "Cannot use dynamic overwrite with other modes");
+ this.mode = new DynamicOverwrite();
return this;
}
@Override
public WriteBuilder overwrite(Filter[] filters) {
- Preconditions.checkState(
- !overwriteFiles, "Cannot overwrite individual files and using filters");
+ Preconditions.checkState(mode == null, "Cannot use overwrite by filter with other modes");
+ Expression expr = SparkFilters.convert(filters);
+ this.mode = useDynamicOverwrite(expr) ? new DynamicOverwrite() : new OverwriteByFilter(expr);
+ return this;
+ }
+
+ private boolean useDynamicOverwrite(Expression expr) {
+ return expr == Expressions.alwaysTrue() && "dynamic".equals(writeConf.overwriteMode());
+ }
+
+ private boolean writeNeedsRowLineage() {
+ return TableUtil.supportsRowLineage(table) && mode instanceof CopyOnWriteOperation;
+ }
- this.overwriteExpr = SparkFilters.convert(filters);
- if (overwriteExpr == Expressions.alwaysTrue() && "dynamic".equals(overwriteMode)) {
- // use the write option to override truncating the table. use dynamic overwrite instead.
- this.overwriteDynamic = true;
+ private boolean writeIncludesRowLineage() {
+ return info.metadataSchema()
+ .map(schema -> schema.exists(field -> field.name().equals(MetadataColumns.ROW_ID.name())))
+ .orElse(false);
+ }
+
+ private StructType sparkWriteSchema() {
+ if (writeIncludesRowLineage()) {
+ StructType writeSchema = info.schema();
+ StructType metaSchema = info.metadataSchema().get();
+ StructField rowId = metaSchema.apply(MetadataColumns.ROW_ID.name());
+ writeSchema = writeSchema.add(rowId);
+ StructField rowSeq = metaSchema.apply(MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER.name());
+ writeSchema = writeSchema.add(rowSeq);
+ return writeSchema;
} else {
- Preconditions.checkState(
- !overwriteDynamic, "Cannot overwrite dynamically and by filter: %s", overwriteExpr);
- this.overwriteByFilter = true;
+ return info.schema();
}
- return this;
}
@Override
public Write build() {
- // The write schema should only include row lineage in the output if it's an overwrite
- // operation or if it's a compaction.
- // In any other case, only null row IDs and sequence numbers would be produced which
- // means the row lineage columns can be excluded from the output files
- boolean writeRequiresRowLineage = TableUtil.supportsRowLineage(table) && overwriteFiles;
- boolean writeAlreadyIncludesLineage =
- dsSchema.exists(field -> field.name().equals(MetadataColumns.ROW_ID.name()));
- StructType sparkWriteSchema = dsSchema;
- if (writeRequiresRowLineage && !writeAlreadyIncludesLineage) {
- sparkWriteSchema = sparkWriteSchema.add(MetadataColumns.ROW_ID.name(), LongType$.MODULE$);
- sparkWriteSchema =
- sparkWriteSchema.add(
- MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER.name(), LongType$.MODULE$);
- }
-
- Schema writeSchema =
- validateOrMergeWriteSchema(table, sparkWriteSchema, writeConf, writeRequiresRowLineage);
-
+ validateRowLineage();
+ Schema writeSchema = mergeSchema ? mergeAndValidateWriteSchema() : validateWriteSchema();
SparkUtil.validatePartitionTransforms(table.spec());
-
- // Get application id
String appId = spark.sparkContext().applicationId();
return new SparkWrite(
spark,
table,
writeConf,
- writeInfo,
+ info,
appId,
writeSchema,
- sparkWriteSchema,
+ sparkWriteSchema(),
writeRequirements()) {
@Override
public BatchWrite toBatch() {
- if (overwriteByFilter) {
- return asOverwriteByFilter(overwriteExpr);
- } else if (overwriteDynamic) {
+ if (mode instanceof OverwriteByFilter overwrite) {
+ return asOverwriteByFilter(overwrite.expr());
+ } else if (mode instanceof DynamicOverwrite) {
return asDynamicOverwrite();
- } else if (overwriteFiles) {
- return asCopyOnWriteOperation(copyOnWriteScan, copyOnWriteIsolationLevel);
+ } else if (mode instanceof CopyOnWriteOperation cow) {
+ return asCopyOnWriteOperation(cow.scan(), cow.isolationLevel());
} else {
return asBatchAppend();
}
@@ -160,66 +154,75 @@ class SparkWriteBuilder implements WriteBuilder, SupportsDynamicOverwrite, Suppo
@Override
public StreamingWrite toStreaming() {
- Preconditions.checkState(
- !overwriteDynamic, "Unsupported streaming operation: dynamic partition overwrite");
- Preconditions.checkState(
- !overwriteByFilter || overwriteExpr == Expressions.alwaysTrue(),
- "Unsupported streaming operation: overwrite by filter: %s",
- overwriteExpr);
-
- if (overwriteByFilter) {
+ if (mode instanceof OverwriteByFilter overwrite) {
+ Preconditions.checkState(
+ overwrite.expr() == Expressions.alwaysTrue(),
+ "Unsupported streaming overwrite filter: " + overwrite.expr());
return asStreamingOverwrite();
- } else {
+ } else if (mode == null || mode instanceof Append) {
return asStreamingAppend();
+ } else {
+ throw new IllegalStateException("Unsupported streaming write mode: " + mode);
}
}
};
}
private SparkWriteRequirements writeRequirements() {
- if (overwriteFiles) {
- return writeConf.copyOnWriteRequirements(copyOnWriteCommand);
+ if (mode instanceof CopyOnWriteOperation cow) {
+ return writeConf.copyOnWriteRequirements(cow.command());
} else {
return writeConf.writeRequirements();
}
}
- private static Schema validateOrMergeWriteSchema(
- Table table, StructType dsSchema, SparkWriteConf writeConf, boolean writeIncludesRowLineage) {
- Schema writeSchema;
- boolean caseSensitive = writeConf.caseSensitive();
- if (writeConf.mergeSchema()) {
- // convert the dataset schema and assign fresh ids for new fields
- Schema newSchema =
- SparkSchemaUtil.convertWithFreshIds(table.schema(), dsSchema, caseSensitive);
-
- // update the table to get final id assignments and validate the changes
- UpdateSchema update =
- table.updateSchema().caseSensitive(caseSensitive).unionByNameWith(newSchema);
- Schema mergedSchema = update.apply();
- if (writeIncludesRowLineage) {
- mergedSchema =
- TypeUtil.join(mergedSchema, MetadataColumns.schemaWithRowLineage(table.schema()));
- }
+ private void validateRowLineage() {
+ Preconditions.checkArgument(
+ writeIncludesRowLineage() || !writeNeedsRowLineage(),
+ "Row lineage information is missing for write in mode: %s",
+ mode);
+ }
- // reconvert the dsSchema without assignment to use the ids assigned by UpdateSchema
- writeSchema = SparkSchemaUtil.convert(mergedSchema, dsSchema, caseSensitive);
+ private Schema validateWriteSchema() {
+ Schema writeSchema = SparkSchemaUtil.convert(table.schema(), info.schema(), caseSensitive);
+ TypeUtil.validateWriteSchema(table.schema(), writeSchema, checkNullability, checkOrdering);
+ return addRowLineageIfNeeded(writeSchema);
+ }
- TypeUtil.validateWriteSchema(
- mergedSchema, writeSchema, writeConf.checkNullability(), writeConf.checkOrdering());
+ // merge schema flow:
+ // - convert Spark schema and assign fresh IDs for new fields
+ // - update table to get final ID assignments and validate changes
+ // - reconvert Spark schema without assignment to use IDs assigned by UpdateSchema
+ // - if validation passed, update table schema
+ private Schema mergeAndValidateWriteSchema() {
+ Schema newSchema =
+ SparkSchemaUtil.convertWithFreshIds(table.schema(), info.schema(), caseSensitive);
+ UpdateSchema update =
+ table.updateSchema().caseSensitive(caseSensitive).unionByNameWith(newSchema);
+ Schema mergedSchema = update.apply();
+ Schema writeSchema = SparkSchemaUtil.convert(mergedSchema, info.schema(), caseSensitive);
+ TypeUtil.validateWriteSchema(mergedSchema, writeSchema, checkNullability, checkOrdering);
+ update.commit();
+ return addRowLineageIfNeeded(writeSchema);
+ }
- // if the validation passed, update the table schema
- update.commit();
- } else {
- Schema schema =
- writeIncludesRowLineage
- ? MetadataColumns.schemaWithRowLineage(table.schema())
- : table.schema();
- writeSchema = SparkSchemaUtil.convert(schema, dsSchema, caseSensitive);
- TypeUtil.validateWriteSchema(
- table.schema(), writeSchema, writeConf.checkNullability(), writeConf.checkOrdering());
- }
+ private Schema addRowLineageIfNeeded(Schema schema) {
+ return writeNeedsRowLineage() ? MetadataColumns.schemaWithRowLineage(schema) : schema;
+ }
+
+ sealed interface Mode {
+ // add new data
+ record Append() implements Mode {}
+
+ // overwrite partitions that receive new data (determined at runtime)
+ record DynamicOverwrite() implements Mode {}
+
+ // overwrite data files matching filter expression (a.k.a. static overwrite)
+ record OverwriteByFilter(Expression expr) implements Mode {}
- return writeSchema;
+ // copy-on-write operation (UPDATE/DELETE/MERGE) that completely rewrites affected files
+ record CopyOnWriteOperation(
+ SparkCopyOnWriteScan scan, Command command, IsolationLevel isolationLevel)
+ implements Mode {}
}
}
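
Read end to end, the merge-schema path that the patch factors into mergeAndValidateWriteSchema() follows the four steps listed in its comment. Below is a paraphrased sketch of that flow using the same Iceberg/Spark utilities the diff calls; MergeSchemaFlowSketch and mergeAndValidate are hypothetical names for illustration, not the committed code.

    import org.apache.iceberg.Schema;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.UpdateSchema;
    import org.apache.iceberg.spark.SparkSchemaUtil;
    import org.apache.iceberg.types.TypeUtil;
    import org.apache.spark.sql.types.StructType;

    class MergeSchemaFlowSketch {
      // mirrors the merge-schema flow from the diff above:
      // convert with fresh IDs -> union by name -> validate -> commit
      static Schema mergeAndValidate(
          Table table,
          StructType sparkSchema,
          boolean caseSensitive,
          boolean checkNullability,
          boolean checkOrdering) {
        // 1. convert the Spark schema, assigning fresh IDs to fields the table does not have yet
        Schema newSchema =
            SparkSchemaUtil.convertWithFreshIds(table.schema(), sparkSchema, caseSensitive);
        // 2. stage a schema update to get final ID assignments and validate the changes
        UpdateSchema update =
            table.updateSchema().caseSensitive(caseSensitive).unionByNameWith(newSchema);
        Schema mergedSchema = update.apply();
        // 3. reconvert the Spark schema with the IDs assigned by UpdateSchema, then validate the write
        Schema writeSchema = SparkSchemaUtil.convert(mergedSchema, sparkSchema, caseSensitive);
        TypeUtil.validateWriteSchema(mergedSchema, writeSchema, checkNullability, checkOrdering);
        // 4. only commit the table schema change once validation has passed
        update.commit();
        return writeSchema;
      }
    }

The validation-only path (validateWriteSchema() in the diff) skips the UpdateSchema round trip entirely and validates the converted schema directly against table.schema().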