stevenzwu commented on code in PR #6049:
URL: https://github.com/apache/iceberg/pull/6049#discussion_r1004738763
##########
flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java:
##########
@@ -564,14 +571,58 @@ static IcebergStreamWriter<RowData> createStreamWriter(
Preconditions.checkArgument(table != null, "Iceberg table shouldn't be null");
Table serializableTable = SerializableTable.copyOf(table);
+ FileFormat format = flinkWriteConf.dataFileFormat();
TaskWriterFactory<RowData> taskWriterFactory =
new RowDataTaskWriterFactory(
serializableTable,
flinkRowType,
flinkWriteConf.targetDataFileSize(),
- flinkWriteConf.dataFileFormat(),
+ format,
+ writeProperties(table, format, flinkWriteConf),
equalityFieldIds,
flinkWriteConf.upsertMode());
return new IcebergStreamWriter<>(table.name(), taskWriterFactory);
}
+
+ /**
+ * Based on the {@link FileFormat} overwrites the table level compression
+ * properties for the table write.
+ *
+ * @param table The table to get the table level settings
+ * @param format The FileFormat to use
+ * @param conf The write configuration
+ * @return The properties to use for writing
+ */
+ private static Map<String, String> writeProperties(
+ Table table, FileFormat format, FlinkWriteConf conf) {
+ Map<String, String> writeProperties = Maps.newHashMap(table.properties());
+
+ String level;
Review Comment:
nit: not necessary to define `level` outside the switch block. Should we define it as local to the case block?
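
For illustration, a minimal sketch of the case-local scoping being suggested. The hunk cuts off at `String level;` before the switch body, so the `PARQUET` case shown here and the `parquetCompressionLevel()` accessor on `FlinkWriteConf` are assumptions for the sake of the example, not the PR's actual code:

```java
private static Map<String, String> writeProperties(
    Table table, FileFormat format, FlinkWriteConf conf) {
  Map<String, String> writeProperties = Maps.newHashMap(table.properties());

  switch (format) {
    case PARQUET: {
      // Braces give this case statement its own scope, so `level` is
      // declared only in the branch that uses it rather than once
      // before the whole switch block.
      String level = conf.parquetCompressionLevel(); // assumed accessor
      if (level != null) {
        writeProperties.put(TableProperties.PARQUET_COMPRESSION_LEVEL, level);
      }
      break;
    }
    default:
      break;
  }
  return writeProperties;
}
```

Scoping the variable to the case block limits its lifetime to the branch that uses it and avoids accidental reuse if another case is added later.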