amogh-jahagirdar commented on code in PR #13637:
URL: https://github.com/apache/iceberg/pull/13637#discussion_r2224376093
##########
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkWriteBuilder.java:
##########
@@ -119,14 +119,24 @@ public WriteBuilder overwrite(Filter[] filters) {
 
   @Override
   public Write build() {
     // The write schema should only include row lineage in the output if it's an overwrite
-    // operation.
+    // operation or if it's a compaction.
     // In any other case, only null row IDs and sequence numbers would be produced which
     // means the row lineage columns can be excluded from the output files
-    boolean writeIncludesRowLineage = TableUtil.supportsRowLineage(table) && overwriteFiles;
-    Schema writeSchema =
-        validateOrMergeWriteSchema(table, dsSchema, writeConf, writeIncludesRowLineage);
+    boolean writeRequiresRowLineage =
+        TableUtil.supportsRowLineage(table)
+            && (overwriteFiles || writeConf.rewrittenFileSetId() != null);
+    boolean writeAlreadyIncludesLineage =
+        dsSchema.exists(field -> field.name().equals(MetadataColumns.ROW_ID.name()));
+    StructType sparkWriteSchema = dsSchema;
+    if (writeRequiresRowLineage && !writeAlreadyIncludesLineage) {
+      sparkWriteSchema = sparkWriteSchema.add(MetadataColumns.ROW_ID.name(), LongType$.MODULE$);
+      sparkWriteSchema =
+          sparkWriteSchema.add(
+              MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER.name(), LongType$.MODULE$);

Review Comment:
   This section already existed in 4.0 but not in 3.5, since in 3.5 the custom rules would already add these columns (in 4.0 we need this so that the writer schema is set up correctly for lineage). Now that the compaction case is handled, I updated 3.5 to be more like the 4.0 rules, since there is now a case where no rule adds the lineage columns to the output schema beforehand.
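   For context, here is a minimal self-contained sketch of the schema handling the diff above performs, assuming Iceberg's MetadataColumns constants and Spark's StructType are on the classpath; the helper name withRowLineage and the wrapping class are illustrative, not part of the PR:

   import org.apache.iceberg.MetadataColumns;
   import org.apache.spark.sql.types.DataTypes;
   import org.apache.spark.sql.types.StructType;

   class RowLineageSchemaSketch {
     // Sketch only: append the row lineage metadata columns (_row_id and
     // _last_updated_sequence_number) to the Spark write schema, but only when
     // the incoming schema does not already carry them, mirroring the guard in
     // SparkWriteBuilder#build above.
     static StructType withRowLineage(StructType dsSchema) {
       boolean alreadyIncludesLineage =
           dsSchema.exists(field -> field.name().equals(MetadataColumns.ROW_ID.name()));
       if (alreadyIncludesLineage) {
         return dsSchema;
       }
       // StructType#add returns a new schema, so the incoming dsSchema is untouched
       return dsSchema
           .add(MetadataColumns.ROW_ID.name(), DataTypes.LongType)
           .add(MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER.name(), DataTypes.LongType);
     }
   }

   Note that DataTypes.LongType and the LongType$.MODULE$ reference in the diff both resolve to Spark's LongType singleton; the former is just the Java-friendly spelling.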