pvary commented on code in PR #12298:
URL: https://github.com/apache/iceberg/pull/12298#discussion_r2510212820
##########
data/src/main/java/org/apache/iceberg/data/GenericFileWriterFactory.java:
##########
@@ -107,62 +130,166 @@ public class GenericFileWriterFactory extends
BaseFileWriterFactory<Record> {
super(
table,
dataFileFormat,
+ Record.class,
dataSchema,
dataSortOrder,
deleteFileFormat,
equalityFieldIds,
equalityDeleteRowSchema,
equalityDeleteSortOrder,
- positionDeleteRowSchema);
+ ImmutableMap.of(),
+ dataSchema,
+ equalityDeleteRowSchema);
+ this.table = table;
+ this.format = dataFileFormat;
+ this.positionDeleteRowSchema = positionDeleteRowSchema;
}
static Builder builderFor(Table table) {
return new Builder(table);
}
- @Override
+ /**
+ * @deprecated Since 1.10.0, will be removed in 1.11.0. It won't be called
starting 1.10.0 as the
+ * configuration is done by the {@link FormatModelRegistry}.
+ */
+ @Deprecated
protected void configureDataWrite(Avro.DataWriteBuilder builder) {
- builder.createWriterFunc(DataWriter::create);
+ throwUnsupportedOperationException();
}
- @Override
+ /**
+ * @deprecated Since 1.10.0, will be removed in 1.11.0. It won't be called
starting 1.10.0 as the
+ * configuration is done by the {@link FormatModelRegistry}.
+ */
+ @Deprecated
protected void configureEqualityDelete(Avro.DeleteWriteBuilder builder) {
- builder.createWriterFunc(DataWriter::create);
+ throwUnsupportedOperationException();
}
- @Override
+ /**
+ * @deprecated Since 1.10.0, will be removed in 1.11.0. It won't be called
starting 1.10.0 as the
+ * configuration is done by the {@link FormatModelRegistry}.
+ */
+ @Deprecated
protected void configurePositionDelete(Avro.DeleteWriteBuilder builder) {
- builder.createWriterFunc(DataWriter::create);
+ throwUnsupportedOperationException();
}
- @Override
+ /**
+ * @deprecated Since 1.10.0, will be removed in 1.11.0. It won't be called
starting 1.10.0 as the
+ * configuration is done by the {@link FormatModelRegistry}.
+ */
+ @Deprecated
protected void configureDataWrite(Parquet.DataWriteBuilder builder) {
- builder.createWriterFunc(GenericParquetWriter::create);
+ throwUnsupportedOperationException();
}
- @Override
+ /**
+ * @deprecated Since 1.10.0, will be removed in 1.11.0. It won't be called
starting 1.10.0 as the
+ * configuration is done by the {@link FormatModelRegistry}.
+ */
+ @Deprecated
protected void configureEqualityDelete(Parquet.DeleteWriteBuilder builder) {
- builder.createWriterFunc(GenericParquetWriter::create);
+ throwUnsupportedOperationException();
}
- @Override
+ /**
+ * @deprecated Since 1.10.0, will be removed in 1.11.0. It won't be called
starting 1.10.0 as the
+ * configuration is done by the {@link FormatModelRegistry}.
+ */
+ @Deprecated
protected void configurePositionDelete(Parquet.DeleteWriteBuilder builder) {
- builder.createWriterFunc(GenericParquetWriter::create);
+ throwUnsupportedOperationException();
}
- @Override
+ /**
+ * @deprecated Since 1.10.0, will be removed in 1.11.0. It won't be called
starting 1.10.0 as the
+ * configuration is done by the {@link FormatModelRegistry}.
+ */
+ @Deprecated
protected void configureDataWrite(ORC.DataWriteBuilder builder) {
- builder.createWriterFunc(GenericOrcWriter::buildWriter);
+ throwUnsupportedOperationException();
}
- @Override
+ /**
+ * @deprecated Since 1.10.0, will be removed in 1.11.0. It won't be called
starting 1.10.0 as the
+ * configuration is done by the {@link FormatModelRegistry}.
+ */
+ @Deprecated
protected void configureEqualityDelete(ORC.DeleteWriteBuilder builder) {
- builder.createWriterFunc(GenericOrcWriter::buildWriter);
+ throwUnsupportedOperationException();
}
- @Override
+ /**
+ * @deprecated Since 1.10.0, will be removed in 1.11.0. It won't be called
starting 1.10.0 as the
+ * configuration is done by the {@link FormatModelRegistry}.
+ */
+ @Deprecated
protected void configurePositionDelete(ORC.DeleteWriteBuilder builder) {
- builder.createWriterFunc(GenericOrcWriter::buildWriter);
+ throwUnsupportedOperationException();
+ }
+
+ private void throwUnsupportedOperationException() {
+ throw new UnsupportedOperationException(
+ "Method is deprecated and should not be called. "
+ + "Configuration is already done by the registry.");
+ }
+
+ @Override
+ public PositionDeleteWriter<Record> newPositionDeleteWriter(
+ EncryptedOutputFile file, PartitionSpec spec, StructLike partition) {
+ if (positionDeleteRowSchema == null) {
+ return super.newPositionDeleteWriter(file, spec, partition);
+ } else {
+ LOG.info(
+ "Deprecated feature used. Position delete row schema is used to
create the position delete writer.");
+ MetricsConfig metricsConfig =
+ table != null
+ ? MetricsConfig.forPositionDelete(table)
+ : MetricsConfig.fromProperties(ImmutableMap.of());
+
+ try {
+ switch (format) {
+ case AVRO:
+ return Avro.writeDeletes(file)
+ .createWriterFunc(DataWriter::create)
+ .withPartition(partition)
+ .overwrite()
+ .rowSchema(positionDeleteRowSchema)
+ .withSpec(spec)
+ .withKeyMetadata(file.keyMetadata())
+ .buildPositionWriter();
+
+ case ORC:
+ return ORC.writeDeletes(file)
+ .createWriterFunc(GenericOrcWriter::buildWriter)
+ .withPartition(partition)
+ .overwrite()
+ .rowSchema(positionDeleteRowSchema)
+ .withSpec(spec)
+ .withKeyMetadata(file.keyMetadata())
+ .buildPositionWriter();
+
+ case PARQUET:
+ return Parquet.writeDeletes(file)
+ .createWriterFunc(GenericParquetWriter::create)
+ .withPartition(partition)
+ .overwrite()
+ .metricsConfig(metricsConfig)
+ .rowSchema(positionDeleteRowSchema)
+ .withSpec(spec)
+ .withKeyMetadata(file.keyMetadata())
+ .buildPositionWriter();
+
+ default:
+ throw new UnsupportedOperationException(
+ "Cannot write pos-deletes for unsupported file format: " +
format);
+ }
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
Review Comment:
Added
##########
spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkFileWriterFactory.java:
##########
@@ -105,119 +124,106 @@ class SparkFileWriterFactory extends
BaseFileWriterFactory<InternalRow> {
super(
table,
dataFileFormat,
+ InternalRow.class,
dataSchema,
dataSortOrder,
deleteFileFormat,
equalityFieldIds,
equalityDeleteRowSchema,
equalityDeleteSortOrder,
- ImmutableMap.of());
+ writeProperties,
+ calculateSparkType(dataSparkType, dataSchema),
+ calculateSparkType(equalityDeleteSparkType, equalityDeleteRowSchema));
- this.dataSparkType = dataSparkType;
- this.equalityDeleteSparkType = equalityDeleteSparkType;
- this.positionDeleteSparkType = null;
+ this.table = table;
+ this.format = dataFileFormat;
this.writeProperties = writeProperties != null ? writeProperties :
ImmutableMap.of();
+ this.positionDeleteRowSchema = null;
+ this.useDeprecatedPositionDeleteWriter = false;
}
static Builder builderFor(Table table) {
return new Builder(table);
}
- @Override
- protected void configureDataWrite(Avro.DataWriteBuilder builder) {
- builder.createWriterFunc(ignored -> new SparkAvroWriter(dataSparkType()));
- builder.setAll(writeProperties);
- }
-
- @Override
- protected void configureEqualityDelete(Avro.DeleteWriteBuilder builder) {
- builder.createWriterFunc(ignored -> new
SparkAvroWriter(equalityDeleteSparkType()));
- builder.setAll(writeProperties);
- }
-
- @Override
- protected void configurePositionDelete(Avro.DeleteWriteBuilder builder) {
- boolean withRow =
-
positionDeleteSparkType().getFieldIndex(DELETE_FILE_ROW_FIELD_NAME).isDefined();
- if (withRow) {
- // SparkAvroWriter accepts just the Spark type of the row ignoring the
path and pos
- StructField rowField =
positionDeleteSparkType().apply(DELETE_FILE_ROW_FIELD_NAME);
- StructType positionDeleteRowSparkType = (StructType) rowField.dataType();
- builder.createWriterFunc(ignored -> new
SparkAvroWriter(positionDeleteRowSparkType));
- }
-
- builder.setAll(writeProperties);
- }
-
- @Override
- protected void configureDataWrite(Parquet.DataWriteBuilder builder) {
- builder.createWriterFunc(msgType ->
SparkParquetWriters.buildWriter(dataSparkType(), msgType));
- builder.setAll(writeProperties);
- }
-
- @Override
- protected void configureEqualityDelete(Parquet.DeleteWriteBuilder builder) {
- builder.createWriterFunc(
- msgType -> SparkParquetWriters.buildWriter(equalityDeleteSparkType(),
msgType));
- builder.setAll(writeProperties);
- }
-
- @Override
- protected void configurePositionDelete(Parquet.DeleteWriteBuilder builder) {
- builder.createWriterFunc(
- msgType -> SparkParquetWriters.buildWriter(positionDeleteSparkType(),
msgType));
- builder.transformPaths(path -> UTF8String.fromString(path.toString()));
- builder.setAll(writeProperties);
- }
-
- @Override
- protected void configureDataWrite(ORC.DataWriteBuilder builder) {
- builder.createWriterFunc(SparkOrcWriter::new);
- builder.setAll(writeProperties);
- }
-
- @Override
- protected void configureEqualityDelete(ORC.DeleteWriteBuilder builder) {
- builder.createWriterFunc(SparkOrcWriter::new);
- builder.setAll(writeProperties);
- }
-
- @Override
- protected void configurePositionDelete(ORC.DeleteWriteBuilder builder) {
- builder.createWriterFunc(SparkOrcWriter::new);
- builder.transformPaths(path -> UTF8String.fromString(path.toString()));
- builder.setAll(writeProperties);
- }
-
- private StructType dataSparkType() {
- if (dataSparkType == null) {
- Preconditions.checkNotNull(dataSchema(), "Data schema must not be null");
- this.dataSparkType = SparkSchemaUtil.convert(dataSchema());
- }
-
- return dataSparkType;
- }
-
- private StructType equalityDeleteSparkType() {
- if (equalityDeleteSparkType == null) {
- Preconditions.checkNotNull(
- equalityDeleteRowSchema(), "Equality delete schema must not be
null");
- this.equalityDeleteSparkType =
SparkSchemaUtil.convert(equalityDeleteRowSchema());
- }
-
- return equalityDeleteSparkType;
- }
-
private StructType positionDeleteSparkType() {
if (positionDeleteSparkType == null) {
// wrap the optional row schema into the position delete schema
containing path and position
- Schema positionDeleteSchema =
DeleteSchemaUtil.posDeleteSchema(positionDeleteRowSchema());
+ Schema positionDeleteSchema =
DeleteSchemaUtil.posDeleteSchema(positionDeleteRowSchema);
this.positionDeleteSparkType =
SparkSchemaUtil.convert(positionDeleteSchema);
}
return positionDeleteSparkType;
}
+ @Override
+ public PositionDeleteWriter<InternalRow> newPositionDeleteWriter(
+ EncryptedOutputFile file, PartitionSpec spec, StructLike partition) {
+ if (!useDeprecatedPositionDeleteWriter) {
+ return super.newPositionDeleteWriter(file, spec, partition);
+ } else {
+ LOG.info(
+ "Deprecated feature used. Position delete row schema is used to
create the position delete writer.");
+ MetricsConfig metricsConfig =
+ table != null
+ ? MetricsConfig.forPositionDelete(table)
+ : MetricsConfig.fromProperties(ImmutableMap.of());
+
+ try {
+ switch (format) {
+ case AVRO:
+ StructType positionDeleteRowSparkType =
+ (StructType)
positionDeleteSparkType().apply(DELETE_FILE_ROW_FIELD_NAME).dataType();
+
+ return Avro.writeDeletes(file)
+ .createWriterFunc(ignored -> new
SparkAvroWriter(positionDeleteRowSparkType))
+ .withPartition(partition)
+ .overwrite()
+ .rowSchema(positionDeleteRowSchema)
+ .withSpec(spec)
+ .withKeyMetadata(file.keyMetadata())
+ .setAll(writeProperties)
+ .metricsConfig(metricsConfig)
+ .buildPositionWriter();
+
+ case ORC:
+ return ORC.writeDeletes(file)
+ .createWriterFunc(SparkOrcWriter::new)
+ .transformPaths(path -> UTF8String.fromString(path.toString()))
+ .withPartition(partition)
+ .overwrite()
+ .rowSchema(positionDeleteRowSchema)
+ .withSpec(spec)
+ .withKeyMetadata(file.keyMetadata())
+ .setAll(writeProperties)
+ .metricsConfig(metricsConfig)
+ .buildPositionWriter();
+
+ case PARQUET:
+ return Parquet.writeDeletes(file)
+ .createWriterFunc(
+ msgType ->
SparkParquetWriters.buildWriter(positionDeleteSparkType(), msgType))
+ .transformPaths(path -> UTF8String.fromString(path.toString()))
+ .withPartition(partition)
+ .overwrite()
+ .metricsConfig(metricsConfig)
+ .rowSchema(positionDeleteRowSchema)
+ .withSpec(spec)
+ .withKeyMetadata(file.keyMetadata())
+ .setAll(writeProperties)
+ .metricsConfig(metricsConfig)
+ .buildPositionWriter();
+
+ default:
+ throw new UnsupportedOperationException(
+ "Cannot write pos-deletes for unsupported file format: " +
format);
+ }
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
 Review Comment:
Added
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]