Reo-LEI commented on a change in pull request #2898:
URL: https://github.com/apache/iceberg/pull/2898#discussion_r681121716



##########
File path: flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
##########
@@ -260,18 +263,21 @@ public Builder uidPrefix(String newPrefix) {
       // Convert the requested flink table schema to flink row type.
       RowType flinkRowType = toFlinkRowType(table.schema(), tableSchema);
 
-      // Distribute the records from input data stream based on the write.distribution-mode.
-      rowDataInput = distributeDataStream(rowDataInput, table.properties(), table.spec(), table.schema(), flinkRowType);
+      // Distribute the records from input data stream based on the write.distribution-mode and equality fields.
+      rowDataInput = distributeDataStream(rowDataInput, table.properties(), equalityFieldIds, table.spec(),
+          table.schema(), flinkRowType);
 
       // Chain the iceberg stream writer and committer operator.
       IcebergStreamWriter<RowData> streamWriter = createStreamWriter(table, flinkRowType, equalityFieldIds);
       IcebergFilesCommitter filesCommitter = new IcebergFilesCommitter(tableLoader, overwrite);
 
-      this.writeParallelism = writeParallelism == null ? rowDataInput.getParallelism() : writeParallelism;
-
       SingleOutputStreamOperator<WriteResult> writerStream = rowDataInput
-          .transform(ICEBERG_STREAM_WRITER_NAME, TypeInformation.of(WriteResult.class), streamWriter)
-          .setParallelism(writeParallelism);
+          .transform(ICEBERG_STREAM_WRITER_NAME, TypeInformation.of(WriteResult.class), streamWriter);
+
+      if (this.writeParallelism != null) {
+        writerStream.setParallelism(writeParallelism);
+      }

Review comment:
       `writeParallelism` will be null if the user does not set it. When it is
null, `writerStream` uses the Flink job's default parallelism; otherwise it
uses the specified `writeParallelism` value. This setting only affects the
parallelism of `IcebergStreamWriter`, so I don't think it brings any other
side effects.
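
To make the null handling concrete, here is a minimal, dependency-free sketch of the behavior described above. `FakeOperator` is a hypothetical stand-in for the Flink operator and is not a Flink or Iceberg API; `jobDefaultParallelism` is likewise an assumed name for whatever default the job provides. It only illustrates the branch: an unset (null) `writeParallelism` leaves the default untouched, and an explicit value overrides it for the writer alone.

```java
class WriterParallelismSketch {

  /** Hypothetical stand-in for a Flink operator; NOT a real Flink class. */
  static final class FakeOperator {
    private int parallelism;

    FakeOperator(int jobDefaultParallelism) {
      // A new operator starts with the job's default parallelism.
      this.parallelism = jobDefaultParallelism;
    }

    FakeOperator setParallelism(int newParallelism) {
      this.parallelism = newParallelism;
      return this;
    }

    int getParallelism() {
      return parallelism;
    }
  }

  /**
   * Mirrors the branch in the diff: only override the writer's parallelism
   * when the user supplied a value; otherwise keep the job default.
   */
  static int writerParallelism(Integer writeParallelism, int jobDefaultParallelism) {
    FakeOperator writerStream = new FakeOperator(jobDefaultParallelism);
    if (writeParallelism != null) {
      writerStream.setParallelism(writeParallelism);
    }
    return writerStream.getParallelism();
  }

  public static void main(String[] args) {
    System.out.println(writerParallelism(null, 4)); // prints 4 (job default kept)
    System.out.println(writerParallelism(2, 4));    // prints 2 (explicit value wins)
  }
}
```

In this sketch the explicit value is applied only to the writer operator, so other operators would keep the job default.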




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


