stevenzwu commented on a change in pull request #2064:
URL: https://github.com/apache/iceberg/pull/2064#discussion_r559716019



##########
File path: flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
##########
@@ -209,7 +224,16 @@ public Builder equalityFieldColumns(List<String> columns) {
         }
       }
 
-      IcebergStreamWriter<RowData> streamWriter = createStreamWriter(table, tableSchema, equalityFieldIds);
+      // Convert the flink requested table schema to flink row type.
+      RowType flinkRowType = toFlinkRowType(table.schema(), tableSchema);
+
+      // Shuffle by partition key if possible.
+      if (shouldShuffleByPartition(table.properties()) && !table.spec().isUnpartitioned()) {
+        rowDataInput = rowDataInput.keyBy(new PartitionKeySelector(table.spec(), table.schema(), flinkRowType));

Review comment:
       Hashing over a very large cardinality key (like `user_id`) is fine. But I should caution against `keyBy` over a partition key with relatively small cardinality (like `bucket(user_id)`). Because Flink hashes each key into one of `maxParallelism` key groups and then maps key groups to subtasks, a small key space can get a pretty uneven distribution. Hashing 10,000 buckets across 2,000 tasks produced the stats below (buckets-per-task -> number of tasks): some tasks received no bucket at all, while one received 14.
   ```
   {0=19, 1=62, 2=169, 3=282, 4=364, 5=318, 6=317, 7=207, 8=131, 9=60, 10=37, 11=15, 12=14, 13=4, 14=1}
   ```
   
   Here is the sample code.
   ```java
     import java.util.Map;
     import java.util.function.Function;
     import java.util.stream.Collectors;
     import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
     import org.apache.iceberg.relocated.com.google.common.collect.Lists;
     import org.junit.Test;

     @Test
     public void testKeyBy() {
       final int maxParallelism = 4000;
       final int numberOfTasks = 2000;
       final int numberOfBuckets = 10000;

       final Integer[] assignment = new Integer[numberOfTasks];
       for (int i = 0; i < numberOfTasks; ++i) {
         assignment[i] = 0;
       }

       // Count how many buckets Flink's key-group assignment routes to each subtask.
       for (int i = 0; i < numberOfBuckets; ++i) {
         final String key = "date=20210119/bucket=" + i;
         final int assignedTask = KeyGroupRangeAssignment.assignKeyToParallelOperator(
             key, maxParallelism, numberOfTasks);
         assignment[assignedTask] += 1;
       }

       // Histogram: buckets-per-task -> number of tasks with that count.
       final Map<Integer, Long> assignmentStats = Lists.newArrayList(assignment).stream()
           .collect(Collectors.groupingBy(Function.identity(), Collectors.counting()));
       System.out.println(assignmentStats);
     }
   ```
   
   To avoid this problem, we have to write a custom partitioner that maps the bucket id directly to a subtask, and pass it to `DataStream#partitionCustom()`.
   ```java
     /**
      * Maps bucket id i to subtask (i % numPartitions). Assuming bucket ids
      * are dense (0..N-1), this spreads buckets evenly across the subtasks.
      */
     static class BucketPartitioner implements Partitioner<Integer> {

       @Override
       public int partition(Integer key, int numPartitions) {
         return key % numPartitions;
       }
     }
   ```
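   
   For illustration, here is a rough sketch (not part of this PR) of how it could be wired in. `BucketIdSelector` and `bucketFieldPos` are hypothetical; a real key selector would have to extract the bucket value computed by the table's partition spec from each row.
   ```java
     // Hypothetical key selector: pulls the bucket id out of a known field position.
     static class BucketIdSelector implements KeySelector<RowData, Integer> {
       private final int bucketFieldPos;

       BucketIdSelector(int bucketFieldPos) {
         this.bucketFieldPos = bucketFieldPos;
       }

       @Override
       public Integer getKey(RowData row) {
         return row.getInt(bucketFieldPos);
       }
     }

     // Route rows with partitionCustom instead of keyBy, so the bucket -> subtask
     // mapping is the even one defined by BucketPartitioner.
     DataStream<RowData> shuffled = rowDataInput
         .partitionCustom(new BucketPartitioner(), new BucketIdSelector(bucketFieldPos));
   ```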
   



