pvary commented on code in PR #6049:
URL: https://github.com/apache/iceberg/pull/6049#discussion_r1005303117
##########
flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java:
##########
@@ -564,14 +571,58 @@ static IcebergStreamWriter<RowData> createStreamWriter(
Preconditions.checkArgument(table != null, "Iceberg table shouldn't be null");
Table serializableTable = SerializableTable.copyOf(table);
+ FileFormat format = flinkWriteConf.dataFileFormat();
TaskWriterFactory<RowData> taskWriterFactory =
new RowDataTaskWriterFactory(
serializableTable,
flinkRowType,
flinkWriteConf.targetDataFileSize(),
- flinkWriteConf.dataFileFormat(),
+ format,
+ writeProperties(table, format, flinkWriteConf),
equalityFieldIds,
flinkWriteConf.upsertMode());
return new IcebergStreamWriter<>(table.name(), taskWriterFactory);
}
+
+ /**
+ * Based on the {@link FileFormat}, overwrites the table level compression properties for the
+ * table write.
+ *
+ * @param table The table to get the table level settings
+ * @param format The FileFormat to use
+ * @param conf The write configuration
+ * @return The properties to use for writing
+ */
+ private static Map<String, String> writeProperties(
+ Table table, FileFormat format, FlinkWriteConf conf) {
+ Map<String, String> writeProperties = Maps.newHashMap(table.properties());
+
+ String level;
+ switch (format) {
+ case PARQUET:
writeProperties.put(PARQUET_COMPRESSION, conf.parquetCompressionCodec());
+ level = conf.parquetCompressionLevel();
+ if (level != null) {
+ writeProperties.put(PARQUET_COMPRESSION_LEVEL, level);
+ }
+
+ break;
+ case AVRO:
+ writeProperties.put(AVRO_COMPRESSION, conf.avroCompressionCodec());
+ level = conf.avroCompressionLevel();
+ if (level != null) {
+ writeProperties.put(AVRO_COMPRESSION_LEVEL, conf.avroCompressionLevel());
Review Comment:
I hate it when the blocks in a switch are not independent.
I refactored to use a separate variable inside each block instead.
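
Something along these lines (a rough sketch of the refactored method, not the exact final code; the per-case variable names are illustrative, and formats other than Parquet and Avro are elided):

```java
private static Map<String, String> writeProperties(
    Table table, FileFormat format, FlinkWriteConf conf) {
  // Start from the table-level properties and overwrite the compression settings
  Map<String, String> writeProperties = Maps.newHashMap(table.properties());

  switch (format) {
    case PARQUET:
      writeProperties.put(PARQUET_COMPRESSION, conf.parquetCompressionCodec());
      // Declared inside this case only, so no state is shared with the AVRO block
      String parquetCompressionLevel = conf.parquetCompressionLevel();
      if (parquetCompressionLevel != null) {
        writeProperties.put(PARQUET_COMPRESSION_LEVEL, parquetCompressionLevel);
      }

      break;
    case AVRO:
      writeProperties.put(AVRO_COMPRESSION, conf.avroCompressionCodec());
      // Its own variable, so the cases can be reordered or edited independently
      String avroCompressionLevel = conf.avroCompressionLevel();
      if (avroCompressionLevel != null) {
        writeProperties.put(AVRO_COMPRESSION_LEVEL, avroCompressionLevel);
      }

      break;
    default:
      // Remaining formats (e.g. ORC) elided in this sketch
      break;
  }

  return writeProperties;
}
```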
##########
flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java:
##########
@@ -564,14 +571,58 @@ static IcebergStreamWriter<RowData> createStreamWriter(
Preconditions.checkArgument(table != null, "Iceberg table shouldn't be null");
Table serializableTable = SerializableTable.copyOf(table);
+ FileFormat format = flinkWriteConf.dataFileFormat();
TaskWriterFactory<RowData> taskWriterFactory =
new RowDataTaskWriterFactory(
serializableTable,
flinkRowType,
flinkWriteConf.targetDataFileSize(),
- flinkWriteConf.dataFileFormat(),
+ format,
+ writeProperties(table, format, flinkWriteConf),
equalityFieldIds,
flinkWriteConf.upsertMode());
return new IcebergStreamWriter<>(table.name(), taskWriterFactory);
}
+
+ /**
+ * Based on the {@link FileFormat}, overwrites the table level compression properties for the
+ * table write.
+ *
+ * @param table The table to get the table level settings
+ * @param format The FileFormat to use
+ * @param conf The write configuration
+ * @return The properties to use for writing
+ */
+ private static Map<String, String> writeProperties(
+ Table table, FileFormat format, FlinkWriteConf conf) {
+ Map<String, String> writeProperties = Maps.newHashMap(table.properties());
+
+ String level;
Review Comment:
Removed