zhangjun0x01 commented on a change in pull request #1669:
URL: https://github.com/apache/iceberg/pull/1669#discussion_r513347072
##########
File path: flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
##########
@@ -179,37 +186,93 @@ public Builder writeParallelism(int newWriteParallelism) {
         try (TableLoader loader = tableLoader) {
           this.table = loader.loadTable();
         } catch (IOException e) {
-          throw new UncheckedIOException("Failed to load iceberg table from table loader: " + tableLoader, e);
+          throw new UncheckedIOException(
+              "Failed to load iceberg table from table loader: " + tableLoader,
+              e);
         }
       }
       IcebergStreamWriter<RowData> streamWriter = createStreamWriter(table, tableSchema);
       IcebergFilesCommitter filesCommitter = new IcebergFilesCommitter(tableLoader, overwrite);
-      this.writeParallelism = writeParallelism == null ? rowDataInput.getParallelism() : writeParallelism;
-
-      DataStream<Void> returnStream = rowDataInput
-          .transform(ICEBERG_STREAM_WRITER_NAME, TypeInformation.of(DataFile.class), streamWriter)
-          .setParallelism(writeParallelism)
-          .transform(ICEBERG_FILES_COMMITTER_NAME, Types.VOID, filesCommitter)
+      DataStream<?> returnStream = rowDataInput
+          .transform(ICEBERG_STREAM_WRITER_NAME,
+              TypeInformation.of(DataFile.class), streamWriter)
+          .setParallelism(rowDataInput.getParallelism())
+          .transform(ICEBERG_FILES_COMMITTER_NAME,
+              TypeInformation.of(Long.class), filesCommitter)
           .setParallelism(1)
Review comment:
I think master was not up to date when this patch was rebased.