openinx commented on a change in pull request #1320:
URL: https://github.com/apache/iceberg/pull/1320#discussion_r468976794
##########
File path:
flink/src/main/java/org/apache/iceberg/flink/RowDataTaskWriterFactory.java
##########
@@ -38,37 +40,38 @@
import org.apache.iceberg.io.OutputFileFactory;
import org.apache.iceberg.io.TaskWriter;
import org.apache.iceberg.io.UnpartitionedWriter;
-import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
-class RowTaskWriterFactory implements TaskWriterFactory<Row> {
+class RowDataTaskWriterFactory implements TaskWriterFactory<RowData> {
private final Schema schema;
private final PartitionSpec spec;
private final LocationProvider locations;
private final FileIO io;
private final EncryptionManager encryptionManager;
private final long targetFileSizeBytes;
private final FileFormat format;
- private final FileAppenderFactory<Row> appenderFactory;
+ private final RowType flinkSchema;
+ private final FileAppenderFactory<RowData> appenderFactory;
private OutputFileFactory outputFileFactory;
- RowTaskWriterFactory(Schema schema,
- PartitionSpec spec,
- LocationProvider locations,
- FileIO io,
- EncryptionManager encryptionManager,
- long targetFileSizeBytes,
- FileFormat format,
- Map<String, String> tableProperties) {
+ RowDataTaskWriterFactory(Schema schema,
+ PartitionSpec spec,
+ LocationProvider locations,
+ FileIO io,
+ EncryptionManager encryptionManager,
+ long targetFileSizeBytes,
+ FileFormat format,
+ Map<String, String> tableProperties) {
this.schema = schema;
this.spec = spec;
this.locations = locations;
this.io = io;
this.encryptionManager = encryptionManager;
this.targetFileSizeBytes = targetFileSizeBytes;
this.format = format;
-    this.appenderFactory = new FlinkFileAppenderFactory(schema, tableProperties);
+ this.flinkSchema = FlinkSchemaUtil.convert(schema);
Review comment:
Thanks for the great reminder, I think the bug you mentioned is this
one: https://github.com/apache/iceberg/pull/999. Flink doesn't support CTAS,
but it does support `INSERT INTO iceberg_table SELECT * FROM table_2`. If
`table_2` has a TINYINT or SMALLINT column, the `BinaryRowData` rows produced
by the `SELECT` will hold byte or short values, so we also need the raw Flink
schema to read the values from the `BinaryRowData` (rather than the Flink
schema converted from the Iceberg schema), and then write those byte or short
values as integers. Let me consider how to fix this.
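To make the mismatch concrete, here is a minimal sketch (hypothetical class name; Flink's `GenericRowData` stands in for the `BinaryRowData` a real query would produce) of what goes wrong when the writer reads with the INT accessor implied by the schema converted back from Iceberg:

```java
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;

public class TinyIntMismatchSketch {
  public static void main(String[] args) {
    // A row from `SELECT * FROM table_2` where the column is TINYINT:
    // the field physically holds a byte, not an int.
    RowData row = GenericRowData.of((byte) 7);

    // Reading with the raw Flink schema's type (TINYINT) works.
    byte value = row.getByte(0);

    // Reading with the type from the Iceberg-converted schema (INT) does not:
    // row.getInt(0) throws ClassCastException on GenericRowData, because the
    // boxed Byte cannot be cast to Integer.

    // So the writer must read with the source type and widen explicitly
    // before handing the value to an appender that expects an Iceberg int.
    int widened = value;
    System.out.println(widened);  // 7
  }
}
```

This is why the factory would need access to the query's raw `RowType` rather than only the one derived via `FlinkSchemaUtil.convert(schema)`.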