stevenzwu commented on a change in pull request #2064:
URL: https://github.com/apache/iceberg/pull/2064#discussion_r559803060
##########
File path: core/src/main/java/org/apache/iceberg/TableProperties.java
##########
@@ -138,6 +138,9 @@ private TableProperties() {
public static final String ENGINE_HIVE_ENABLED = "engine.hive.enabled";
public static final boolean ENGINE_HIVE_ENABLED_DEFAULT = false;
+ public static final String WRITE_SHUFFLE_BY_PARTITION =
"write.shuffle-by.partition";
Review comment:
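Hashing over a very high-cardinality key (like `user_id`) is fine. But I
would caution against `keyBy` over a partition key with relatively low
cardinality (like `bucket(user_id)`), because the distribution across
subtasks can get pretty uneven. In the histogram below, each entry reads as
`buckets assigned to a subtask = number of subtasks with that count`: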
```
{0=19, 1=62, 2=169, 3=282, 4=364, 5=318, 6=317, 7=207, 8=131, 9=60, 10=37,
11=15, 12=14, 13=4, 14=1}
```
Here is sample code that simulates the scenario we experienced; it probably
also covers the hash `keyBy` on the partition key in this PR.
```java
@Test
public void testKeyBy() {
  final int maxParallelism = 4000;
  final int numberOfTasks = 2000;
  final int numberOfBuckets = 10000;

  // assignment[t] = number of buckets routed to subtask t
  final Integer[] assignment = new Integer[numberOfTasks];
  for (int i = 0; i < numberOfTasks; ++i) {
    assignment[i] = 0;
  }

  for (int i = 0; i < numberOfBuckets; ++i) {
    // using integer key generates similar result
    // final Integer key = i;
    final String key = "date=20210119/bucket=" + i;
    final int assignedTask =
        KeyGroupRangeAssignment.assignKeyToParallelOperator(
            key, maxParallelism, numberOfTasks);
    assignment[assignedTask] += 1;
  }

  // histogram: buckets per subtask -> number of subtasks with that count
  final Map<Integer, Long> assignmentStats =
      Lists.newArrayList(assignment).stream()
          .collect(Collectors.groupingBy(Function.identity(),
              Collectors.counting()));
  System.out.println(assignmentStats);
}
```
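As a rough sanity check on why the spread is this wide, here is a minimal
sketch that treats the hash assignment as uniform random placement of
buckets onto subtasks. That is an assumption for illustration, not a model
of Flink's actual key-group hashing, and `BucketLoadEstimate` and
`expectedSubtasks` are hypothetical names. Under that assumption the number
of buckets per subtask is approximately Poisson with mean 10000 / 2000 = 5.

```java
// Sketch only: assumes uniform random placement of buckets onto subtasks,
// which approximates (but is not) Flink's key-group hash assignment.
final class BucketLoadEstimate {

  /** Expected number of subtasks receiving exactly k buckets (Poisson approximation). */
  static double expectedSubtasks(int k, int numberOfTasks, int numberOfBuckets) {
    final double mean = (double) numberOfBuckets / numberOfTasks;
    double probability = Math.exp(-mean); // P(X = 0)
    for (int i = 1; i <= k; i++) {
      probability *= mean / i; // P(X = i) = P(X = i - 1) * mean / i
    }
    return numberOfTasks * probability;
  }

  public static void main(String[] args) {
    for (int k = 0; k <= 14; k++) {
      System.out.printf(
          "%d buckets: ~%.0f subtasks%n", k, expectedSubtasks(k, 2000, 10000));
    }
  }
}
```

With these numbers the approximation predicts roughly 351 subtasks holding
exactly 5 buckets and about 13 holding none, which has the same general
shape as the histogram above. So the skew looks like what any reasonably
uniform hash would produce at this cardinality, not a quirk of one hash
function.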
To avoid this problem, we have to write a custom partitioner that maps the
bucket id directly to a subtask. The custom partitioner can be passed into
`DataStream#partitionCustom()`.
```java
static class BucketPartitioner implements Partitioner<Integer> {
  @Override
  public int partition(Integer key, int numPartitions) {
    return key % numPartitions;
  }
}
```
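For comparison with the simulation above, here is a small Flink-free sketch
that applies the same modulo rule to 10000 bucket ids and 2000 subtasks and
counts the buckets per subtask. `ModuloAssignmentCheck` and its methods are
hypothetical names used only for this illustration, and it assumes bucket
ids are non-negative integers starting at 0.

```java
import java.util.Arrays;

// Sketch only: reproduces the modulo rule of BucketPartitioner without Flink.
final class ModuloAssignmentCheck {

  /** Same rule as BucketPartitioner#partition; assumes bucketId >= 0. */
  static int partition(int bucketId, int numPartitions) {
    return bucketId % numPartitions;
  }

  /** Counts how many of the bucket ids 0..numberOfBuckets-1 land on each subtask. */
  static int[] bucketsPerSubtask(int numberOfBuckets, int numberOfTasks) {
    final int[] counts = new int[numberOfTasks];
    for (int bucket = 0; bucket < numberOfBuckets; bucket++) {
      counts[partition(bucket, numberOfTasks)]++;
    }
    return counts;
  }

  public static void main(String[] args) {
    final int[] counts = bucketsPerSubtask(10000, 2000);
    System.out.println(
        "min=" + Arrays.stream(counts).min().getAsInt()
            + ", max=" + Arrays.stream(counts).max().getAsInt());
  }
}
```

Because 10000 is a multiple of 2000, every subtask ends up with exactly 5
buckets. When the bucket count is not a multiple of the parallelism, the
per-subtask counts differ by at most one.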