rdblue commented on a change in pull request #2064:
URL: https://github.com/apache/iceberg/pull/2064#discussion_r560400754
##########
File path: flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
##########
@@ -225,32 +249,71 @@ public Builder equalityFieldColumns(List<String> columns)
{
.name(String.format("IcebergSink %s", table.name()))
.setParallelism(1);
}
- }
- static IcebergStreamWriter<RowData> createStreamWriter(Table table,
TableSchema requestedSchema,
- List<Integer>
equalityFieldIds) {
- Preconditions.checkArgument(table != null, "Iceberg table should't be
null");
+ private DataStream<RowData> distributeDataStream(DataStream<RowData> input,
+ Map<String, String>
properties,
+ PartitionSpec
partitionSpec,
+ Schema iSchema,
+ RowType flinkRowType) {
+ DistributionMode writeMode;
+ if (distributionMode == null) {
+ // Fallback to use distribution mode parsed from table properties if
don't specify in job level.
+ String modeName = PropertyUtil.propertyAsString(properties,
+ WRITE_DISTRIBUTION_MODE,
+ WRITE_DISTRIBUTION_MODE_DEFAULT);
+
+ writeMode = DistributionMode.fromName(modeName);
+ } else {
+ writeMode = distributionMode;
+ }
+
+ switch (writeMode) {
+ case NONE:
+ return input;
+
+ case HASH:
+ if (partitionSpec.isUnpartitioned()) {
+ return input;
+ } else {
+ return input.keyBy(new PartitionKeySelector(partitionSpec,
iSchema, flinkRowType));
Review comment:
This isn't going to cover all cases, but I think it is a necessary first
step. Data skew is going to require range distribution.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]