stevenzwu commented on a change in pull request #2064:
URL: https://github.com/apache/iceberg/pull/2064#discussion_r559716019
##########
File path: flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
##########
@@ -209,7 +224,16 @@ public Builder equalityFieldColumns(List<String> columns) {
}
}
- IcebergStreamWriter<RowData> streamWriter = createStreamWriter(table,
tableSchema, equalityFieldIds);
+ // Convert the flink requested table schema to flink row type.
+ RowType flinkRowType = toFlinkRowType(table.schema(), tableSchema);
+
+ // Shuffle by partition key if possible.
+ if (shouldShuffleByPartition(table.properties()) &&
!table.spec().isUnpartitioned()) {
+ rowDataInput = rowDataInput.keyBy(new
PartitionKeySelector(table.spec(), table.schema(), flinkRowType));
Review comment:
Hashing over very large cardinality (like `user_id`) is fine. But I
should caution about `keyBy` over partition key which has relatively small
cardinality (like `bucket(user_id)`). It can get pretty uneven distribution.
```
{0=19, 1=62, 2=169, 3=282, 4=364, 5=318, 6=317, 7=207, 8=131, 9=60, 10=37,
11=15, 12=14, 13=4, 14=1}
```
Here is the sample code.
```java
@Test
public void testKeyBy() {
final int maxParallelism = 4000;
final int numberOfTasks = 2000;
final int numberOfBuckets = 10000;
final Integer[] assignment = new Integer[2000];
for (int i = 0; i < numberOfTasks; ++i) {
assignment[i] = 0;
}
for (int i = 0; i < numberOfBuckets; ++i) {
final String key = "date=20210119/bucket=" + i;
final int assignedTask =
KeyGroupRangeAssignment.assignKeyToParallelOperator(
key, maxParallelism, numberOfTasks);
assignment[assignedTask] += 1;
}
final Map<Integer, Long> assignmentStats =
Lists.newArrayList(assignment).stream()
.collect(Collectors.groupingBy(Function.identity(),
Collectors.counting()));
System.out.println(assignmentStats);
}
```
To avoid this problem, we have to write a custom partitioner that directly
map bucket id to the subtask. The custom partitioner can be passed into
`DataStream#partitionCustom()`.
```java
static class BucketPartitioner implements Partitioner<Integer> {
@Override
public int partition(Integer key, int numPartitions) {
return key % numPartitions;
}
}
```
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]