openinx commented on a change in pull request #2064:
URL: https://github.com/apache/iceberg/pull/2064#discussion_r554991546
##########
File path: flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
##########
@@ -225,32 +249,46 @@ public Builder equalityFieldColumns(List<String> columns) {
.name(String.format("IcebergSink %s", table.name()))
.setParallelism(1);
}
- }
- static IcebergStreamWriter<RowData> createStreamWriter(Table table,
TableSchema requestedSchema,
- List<Integer> equalityFieldIds) {
- Preconditions.checkArgument(table != null, "Iceberg table should't be null");
+ private boolean shouldShuffleByPartition(Map<String, String> properties) {
+ if (shuffleByPartition == null) {
+ // Use the configured table option if does not specify in FlinkSink explicitly.
+ return PropertyUtil.propertyAsBoolean(properties,
+ WRITE_SHUFFLE_BY_PARTITION,
+ WRITE_SHUFFLE_BY_PARTITION_DEFAULT);
+ } else {
+ return shuffleByPartition;
Review comment:
I think it's more flexible to provide this option at both the job level and
the table level. Enabling shuffle-by-partition for every job in the whole
cluster seems too coarse-grained.
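
A minimal standalone sketch of the precedence discussed here: an explicit job-level setting wins, otherwise the table-level property is used, otherwise a default applies. It is an illustration under stated assumptions, not the code in this PR. The class name, the property key string, and the default value of `false` are hypothetical; the diff only shows the constant names `WRITE_SHUFFLE_BY_PARTITION` and `WRITE_SHUFFLE_BY_PARTITION_DEFAULT`. The map lookup is a plain-Java stand-in for `PropertyUtil.propertyAsBoolean`, so the example runs without Iceberg on the classpath.

```java
import java.util.HashMap;
import java.util.Map;

public class ShuffleOptionSketch {
  // Hypothetical key and default; the real values are not shown in the diff.
  static final String WRITE_SHUFFLE_BY_PARTITION = "write.shuffle-by.partition";
  static final boolean WRITE_SHUFFLE_BY_PARTITION_DEFAULT = false;

  /**
   * Resolves the effective setting.
   *
   * @param jobLevel value set explicitly on the sink builder, or null if unset
   * @param tableProperties table-level properties, consulted only when jobLevel is null
   */
  static boolean shouldShuffleByPartition(Boolean jobLevel, Map<String, String> tableProperties) {
    if (jobLevel != null) {
      // Job-level setting overrides whatever the table declares.
      return jobLevel;
    }
    // Stand-in for a boolean property lookup with a default.
    String value = tableProperties.get(WRITE_SHUFFLE_BY_PARTITION);
    return value == null ? WRITE_SHUFFLE_BY_PARTITION_DEFAULT : Boolean.parseBoolean(value);
  }

  public static void main(String[] args) {
    Map<String, String> tableProperties = new HashMap<>();
    tableProperties.put(WRITE_SHUFFLE_BY_PARTITION, "true");

    // No job-level value: the table property decides.
    System.out.println(shouldShuffleByPartition(null, tableProperties));
    // Job-level value present: it takes precedence over the table property.
    System.out.println(shouldShuffleByPartition(Boolean.FALSE, tableProperties));
    // Neither is set: the default applies.
    System.out.println(shouldShuffleByPartition(null, new HashMap<>()));
  }
}
```

Using a nullable `Boolean` for the job-level value is what lets "not specified" be told apart from an explicit `false`, which is the same distinction the `shuffleByPartition == null` check in the diff relies on.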