aokolnychyi commented on code in PR #15374:
URL: https://github.com/apache/iceberg/pull/15374#discussion_r2831371311
##########
spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkWriteBuilder.java:
##########
@@ -44,182 +48,181 @@
import org.apache.spark.sql.connector.write.WriteBuilder;
import org.apache.spark.sql.connector.write.streaming.StreamingWrite;
import org.apache.spark.sql.sources.Filter;
-import org.apache.spark.sql.types.LongType$;
+import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
class SparkWriteBuilder implements WriteBuilder, SupportsDynamicOverwrite,
SupportsOverwrite {
private final SparkSession spark;
private final Table table;
private final SparkWriteConf writeConf;
- private final LogicalWriteInfo writeInfo;
- private final StructType dsSchema;
- private final String overwriteMode;
- private boolean overwriteDynamic = false;
- private boolean overwriteByFilter = false;
- private Expression overwriteExpr = null;
- private boolean overwriteFiles = false;
- private SparkCopyOnWriteScan copyOnWriteScan = null;
- private Command copyOnWriteCommand = null;
- private IsolationLevel copyOnWriteIsolationLevel = null;
+ private final LogicalWriteInfo info;
+ private final boolean caseSensitive;
+ private final boolean checkNullability;
+ private final boolean checkOrdering;
+ private final boolean mergeSchema;
+ private Mode mode = null;
SparkWriteBuilder(SparkSession spark, Table table, String branch,
LogicalWriteInfo info) {
this.spark = spark;
this.table = table;
this.writeConf = new SparkWriteConf(spark, table, branch, info.options());
- this.writeInfo = info;
- this.dsSchema = info.schema();
- this.overwriteMode = writeConf.overwriteMode();
+ this.info = info;
+ this.caseSensitive = writeConf.caseSensitive();
+ this.checkNullability = writeConf.checkNullability();
+ this.checkOrdering = writeConf.checkOrdering();
+ this.mergeSchema = writeConf.mergeSchema();
}
public WriteBuilder overwriteFiles(Scan scan, Command command,
IsolationLevel isolationLevel) {
- Preconditions.checkState(!overwriteByFilter, "Cannot overwrite individual
files and by filter");
- Preconditions.checkState(
- !overwriteDynamic, "Cannot overwrite individual files and
dynamically");
-
- this.overwriteFiles = true;
- this.copyOnWriteScan = (SparkCopyOnWriteScan) scan;
- this.copyOnWriteCommand = command;
- this.copyOnWriteIsolationLevel = isolationLevel;
+ Preconditions.checkState(mode == null, "Cannot use copy-on-write with
other modes");
+ this.mode = new CopyOnWriteOperation((SparkCopyOnWriteScan) scan, command,
isolationLevel);
return this;
}
@Override
public WriteBuilder overwriteDynamicPartitions() {
- Preconditions.checkState(
- !overwriteByFilter, "Cannot overwrite dynamically and by filter: %s",
overwriteExpr);
- Preconditions.checkState(!overwriteFiles, "Cannot overwrite individual
files and dynamically");
-
- this.overwriteDynamic = true;
+ Preconditions.checkState(mode == null, "Cannot use dynamic overwrite with
other modes");
+ this.mode = new DynamicOverwrite();
return this;
}
@Override
public WriteBuilder overwrite(Filter[] filters) {
- Preconditions.checkState(
- !overwriteFiles, "Cannot overwrite individual files and using
filters");
+ Preconditions.checkState(mode == null, "Cannot use overwrite by filter
with other modes");
+ Expression expr = SparkFilters.convert(filters);
+ this.mode = useDynamicOverwrite(expr) ? new DynamicOverwrite() : new
OverwriteByFilter(expr);
+ return this;
+ }
+
+ private boolean useDynamicOverwrite(Expression expr) {
+ return expr == Expressions.alwaysTrue() &&
"dynamic".equals(writeConf.overwriteMode());
+ }
+
+ private boolean writeNeedsRowLineage() {
Review Comment:
One of the reasons to split the compaction code into a separate hierarchy
was to simplify row lineage handling in this class. Row lineage only applies
to copy-on-write (CoW) operations and always comes from `info.metadataSchema()`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]