Github user ChengXiangLi commented on a diff in the pull request:
https://github.com/apache/flink/pull/1255#discussion_r43982452
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/operators/shipping/OutputEmitter.java ---
@@ -135,7 +116,7 @@ public OutputEmitter(ShipStrategyType strategy, TypeComparator<T> comparator, Pa
 		case PARTITION_CUSTOM:
 			return customPartition(record.getInstance(), numberOfChannels);
 		case PARTITION_RANGE:
-			return rangePartition(record.getInstance(), numberOfChannels);
+			return rangePartition((Tuple2<Integer, ?>)record.getInstance(), numberOfChannels);
--- End diff ---
I got your point, @fhueske. Here is the reasoning behind the current implementation:
1. This PR only covers range partitioning based on automatic sampling. To execute the source node (everything before the range partition) only once, we broadcast the sampled distribution data and attach the partition id to each shuffled record. In this case, the type seen by `OutputEmitter` is definitely `Tuple2<Integer, ?>` (see the first sketch right after this list).
2. There will actually be a follow-up task to support range partitioning by a user-given `DataDistribution`. The current `DataDistribution.getBucketBoundary()` returns `Key<?>[]`, while `TypeComparator.extractKeys()` produces plain `Object[]` keys, so the two types do not match each other. Besides, there are existing implementations based on `DataDistribution` and `GeneralDataSinkBase.setGlobalOrder()`, plus a bunch of related logic (see the second sketch below).
Involving `DataDistribution` in `OutputEmitter` would basically pull case 2 into this PR, which I plan to implement as follow-up work. Do you prefer that we do both together?
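To make the case-2 mismatch concrete, here is a rough sketch of the two signatures that would have to be reconciled. The variable names are mine; the method signatures are from the existing interfaces as I read them:

```java
// Key<?>[] vs. Object[]: the boundary type and the extracted-key type
// do not line up without extra conversion/comparator plumbing.
Key<?>[] boundary = distribution.getBucketBoundary(channel, numberOfChannels);
Object[] keys = new Object[comparator.getFlatComparators().length];
comparator.extractKeys(record, keys, 0);
// Comparing `keys` against `boundary` is exactly the part deferred
// to the follow-up task.
```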