[
https://issues.apache.org/jira/browse/FLINK-7?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14991256#comment-14991256
]
ASF GitHub Bot commented on FLINK-7:
------------------------------------
Github user ChengXiangLi commented on a diff in the pull request:
https://github.com/apache/flink/pull/1255#discussion_r43982452
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/operators/shipping/OutputEmitter.java ---
@@ -135,7 +116,7 @@ public OutputEmitter(ShipStrategyType strategy, TypeComparator<T> comparator, Pa
  case PARTITION_CUSTOM:
  return customPartition(record.getInstance(), numberOfChannels);
  case PARTITION_RANGE:
- return rangePartition(record.getInstance(), numberOfChannels);
+ return rangePartition((Tuple2<Integer, ?>)record.getInstance(), numberOfChannels);
--- End diff --
--- End diff --
I got your point, @fhueske. Here are my reasons for implementing it the
current way:
1. This PR only includes the part of range partitioning that is based on
automatic sampling. To execute the source node (before the range partition)
only once, we send the sampled distribution data by broadcast, so the partition
id is carried as part of the shuffled data. In this case, the record type seen
by `OutputEmitter` would definitely be `Tuple2<Integer, ?>`.
2. There will be a follow-up task to support range partitioning by a given
`DataDistribution`. Currently `DataDistribution.getBucketBoundary()` returns
`Key<?>[]`, while `TypeComparator.extractKeys()` returns `Object[]`, and the two
do not match each other. Besides, there are existing implementations based on
`DataDistribution` and `GeneralDataSinkBase.setGlobalOrder()`, and a bunch of
related logic.
Involving `DataDistribution` in `OutputEmitter` would basically bring case 2
into this PR, which I plan to implement as follow-up work. Do you prefer that
we do both together?
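To make the difference between the two cases concrete, here is a minimal
sketch in plain Java. It is not Flink's `OutputEmitter`; the class and method
names are hypothetical, and the boundary semantics (sorted, inclusive upper
bounds, one fewer boundary than channels) are an assumption for illustration.
Case 1 just reads the partition id that is already attached to the record,
while case 2 needs a bucket lookup against the distribution's boundaries.

```java
// Hypothetical sketch only; names and boundary semantics are assumptions,
// not the actual Flink API.
final class RangePartitionSketch {

    // Case 1: the partition id was computed upstream and travels with the
    // record (the Integer field of Tuple2<Integer, ?>), so the emitter only
    // validates it and uses it as the channel index.
    static int channelFromPartitionId(int partitionId, int numberOfChannels) {
        if (partitionId < 0 || partitionId >= numberOfChannels) {
            throw new IllegalArgumentException("partition id out of range: " + partitionId);
        }
        return partitionId;
    }

    // Case 2: the emitter looks the key up in sorted bucket boundaries.
    // Assumed convention: keys <= boundaries[i] go to channel i, and keys
    // larger than every boundary go to the last channel (boundaries.length).
    static <K extends Comparable<K>> int channelFromBoundaries(K key, K[] boundaries) {
        int low = 0;
        int high = boundaries.length;
        // Binary search for the first boundary that is >= key.
        while (low < high) {
            int mid = (low + high) >>> 1;
            if (key.compareTo(boundaries[mid]) <= 0) {
                high = mid;
            } else {
                low = mid + 1;
            }
        }
        return low;
    }
}
```

With boundaries `{10, 20, 30}` and four channels, keys up to 10 map to channel
0 and keys above 30 map to channel 3. The lookup in case 2 is what would need
a common key type between the boundaries and the extracted keys.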
> [GitHub] Enable Range Partitioner
> ---------------------------------
>
> Key: FLINK-7
> URL: https://issues.apache.org/jira/browse/FLINK-7
> Project: Flink
> Issue Type: Sub-task
> Components: Distributed Runtime
> Reporter: GitHub Import
> Assignee: Chengxiang Li
> Fix For: pre-apache
>
>
> The range partitioner is currently disabled. We need to implement the
> following aspects:
> 1) Distribution information, if available, must be propagated back together
> with the ordering property.
> 2) A generic bucket lookup structure (currently specific to PactRecord).
> Tests to re-enable after fixing this issue:
> - TeraSortITCase
> - GlobalSortingITCase
> - GlobalSortingMixedOrderITCase
> ---------------- Imported from GitHub ----------------
> Url: https://github.com/stratosphere/stratosphere/issues/7
> Created by: [StephanEwen|https://github.com/StephanEwen]
> Labels: core, enhancement, optimizer,
> Milestone: Release 0.4
> Assignee: [fhueske|https://github.com/fhueske]
> Created at: Fri Apr 26 13:48:24 CEST 2013
> State: open