pvary commented on code in PR #12298:
URL: https://github.com/apache/iceberg/pull/12298#discussion_r2549131555
##########
spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkFileWriterFactory.java:
##########
@@ -105,119 +124,106 @@ class SparkFileWriterFactory extends
BaseFileWriterFactory<InternalRow> {
super(
table,
dataFileFormat,
+ InternalRow.class,
dataSchema,
dataSortOrder,
deleteFileFormat,
equalityFieldIds,
equalityDeleteRowSchema,
equalityDeleteSortOrder,
- ImmutableMap.of());
+ writeProperties,
+ calculateSparkType(dataSparkType, dataSchema),
+ calculateSparkType(equalityDeleteSparkType, equalityDeleteRowSchema));
- this.dataSparkType = dataSparkType;
- this.equalityDeleteSparkType = equalityDeleteSparkType;
- this.positionDeleteSparkType = null;
+ this.table = table;
+ this.format = dataFileFormat;
this.writeProperties = writeProperties != null ? writeProperties :
ImmutableMap.of();
+ this.positionDeleteRowSchema = null;
+ this.useDeprecatedPositionDeleteWriter = false;
}
static Builder builderFor(Table table) {
return new Builder(table);
}
- @Override
- protected void configureDataWrite(Avro.DataWriteBuilder builder) {
- builder.createWriterFunc(ignored -> new SparkAvroWriter(dataSparkType()));
- builder.setAll(writeProperties);
- }
-
- @Override
- protected void configureEqualityDelete(Avro.DeleteWriteBuilder builder) {
- builder.createWriterFunc(ignored -> new
SparkAvroWriter(equalityDeleteSparkType()));
- builder.setAll(writeProperties);
- }
-
- @Override
- protected void configurePositionDelete(Avro.DeleteWriteBuilder builder) {
- boolean withRow =
-
positionDeleteSparkType().getFieldIndex(DELETE_FILE_ROW_FIELD_NAME).isDefined();
- if (withRow) {
- // SparkAvroWriter accepts just the Spark type of the row ignoring the
path and pos
- StructField rowField =
positionDeleteSparkType().apply(DELETE_FILE_ROW_FIELD_NAME);
- StructType positionDeleteRowSparkType = (StructType) rowField.dataType();
- builder.createWriterFunc(ignored -> new
SparkAvroWriter(positionDeleteRowSparkType));
- }
-
- builder.setAll(writeProperties);
- }
-
- @Override
- protected void configureDataWrite(Parquet.DataWriteBuilder builder) {
- builder.createWriterFunc(msgType ->
SparkParquetWriters.buildWriter(dataSparkType(), msgType));
- builder.setAll(writeProperties);
- }
-
- @Override
- protected void configureEqualityDelete(Parquet.DeleteWriteBuilder builder) {
- builder.createWriterFunc(
- msgType -> SparkParquetWriters.buildWriter(equalityDeleteSparkType(),
msgType));
- builder.setAll(writeProperties);
- }
-
- @Override
- protected void configurePositionDelete(Parquet.DeleteWriteBuilder builder) {
- builder.createWriterFunc(
- msgType -> SparkParquetWriters.buildWriter(positionDeleteSparkType(),
msgType));
- builder.transformPaths(path -> UTF8String.fromString(path.toString()));
- builder.setAll(writeProperties);
- }
-
- @Override
- protected void configureDataWrite(ORC.DataWriteBuilder builder) {
- builder.createWriterFunc(SparkOrcWriter::new);
- builder.setAll(writeProperties);
- }
-
- @Override
- protected void configureEqualityDelete(ORC.DeleteWriteBuilder builder) {
- builder.createWriterFunc(SparkOrcWriter::new);
- builder.setAll(writeProperties);
- }
-
- @Override
- protected void configurePositionDelete(ORC.DeleteWriteBuilder builder) {
- builder.createWriterFunc(SparkOrcWriter::new);
- builder.transformPaths(path -> UTF8String.fromString(path.toString()));
- builder.setAll(writeProperties);
- }
-
- private StructType dataSparkType() {
- if (dataSparkType == null) {
- Preconditions.checkNotNull(dataSchema(), "Data schema must not be null");
- this.dataSparkType = SparkSchemaUtil.convert(dataSchema());
- }
-
- return dataSparkType;
- }
-
- private StructType equalityDeleteSparkType() {
- if (equalityDeleteSparkType == null) {
- Preconditions.checkNotNull(
- equalityDeleteRowSchema(), "Equality delete schema must not be
null");
- this.equalityDeleteSparkType =
SparkSchemaUtil.convert(equalityDeleteRowSchema());
- }
-
- return equalityDeleteSparkType;
- }
-
private StructType positionDeleteSparkType() {
if (positionDeleteSparkType == null) {
// wrap the optional row schema into the position delete schema
containing path and position
- Schema positionDeleteSchema =
DeleteSchemaUtil.posDeleteSchema(positionDeleteRowSchema());
+ Schema positionDeleteSchema =
DeleteSchemaUtil.posDeleteSchema(positionDeleteRowSchema);
this.positionDeleteSparkType =
SparkSchemaUtil.convert(positionDeleteSchema);
}
return positionDeleteSparkType;
}
+ @Override
+ public PositionDeleteWriter<InternalRow> newPositionDeleteWriter(
+ EncryptedOutputFile file, PartitionSpec spec, StructLike partition) {
Review Comment:
   Yes — this deprecated `newPositionDeleteWriter` override will be removed in release 1.12.0.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]