kbendick commented on a change in pull request #2064:
URL: https://github.com/apache/iceberg/pull/2064#discussion_r559389518
##########
File path: flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
##########
@@ -225,32 +249,46 @@ public Builder equalityFieldColumns(List<String> columns)
{
.name(String.format("IcebergSink %s", table.name()))
.setParallelism(1);
}
- }
- static IcebergStreamWriter<RowData> createStreamWriter(Table table,
TableSchema requestedSchema,
- List<Integer>
equalityFieldIds) {
- Preconditions.checkArgument(table != null, "Iceberg table should't be
null");
+ private boolean shouldShuffleByPartition(Map<String, String> properties) {
+ if (shuffleByPartition == null) {
+ // Use the configured table option if does not specify in FlinkSink
explicitly.
+ return PropertyUtil.propertyAsBoolean(properties,
+ WRITE_SHUFFLE_BY_PARTITION,
+ WRITE_SHUFFLE_BY_PARTITION_DEFAULT);
+ } else {
+ return shuffleByPartition;
Review comment:
That's a fair assessment.
I ask as we typically use job-clusters at my work and then try to specify as
much configuration as possible on the cluster's config. This is mostly to
provide one easy way to track this, and is definitely tied to how our build and
deployment system and configuration system is set up internally at my work.
Outside of job clusters, I would agree that it's too coarse grained. And
having the possibility to set it as a job config or a table config is good
enough. I'm not even sure if Flink would accept arbitrary configurations at the
cluster level (that it isn't aware of).
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]