[ https://issues.apache.org/jira/browse/FLINK-7?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14976806#comment-14976806 ]
ASF GitHub Bot commented on FLINK-7:
------------------------------------
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/1255#discussion_r43156865
--- Diff: flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
---
@@ -1223,6 +1230,51 @@ public long count() throws Exception {
final TypeInformation<K> keyType = TypeExtractor.getKeySelectorTypes(keyExtractor, getType());
return new PartitionOperator<T>(this, PartitionMethod.HASH, new Keys.SelectorFunctionKeys<T, K>(clean(keyExtractor), this.getType(), keyType), Utils.getCallLocationName());
}
+
+ /**
+ * Range-partitions a DataSet using the specified KeySelector.
+ * <p>
+ * <b>Important:</b> This operation shuffles the whole DataSet over the network and can take a significant amount of time.
+ *
+ * @param keySelector The KeySelector with which the DataSet is range-partitioned.
+ * @return The partitioned DataSet.
+ *
+ * @see KeySelector
+ */
+ public <K extends Comparable<K>> DataSet<T> partitionByRange(KeySelector<T, K> keySelector) {
+ final TypeInformation<K> keyType = TypeExtractor.getKeySelectorTypes(keySelector, getType());
+ String callLocation = Utils.getCallLocationName();
+
+ // Extract the key from each input element with the keySelector.
+ KeyExtractorMapper<T, K> keyExtractorMapper = new KeyExtractorMapper<T, K>(keySelector);
--- End diff --
I think you would still have the nodes and all the information of the `OptimizedPlan` available in `connectJobVertices()`. However, I would also be OK with doing it as a preprocessing step in `compileJobGraph()`.
Let me know if you face any obstacles or have any questions.
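In the diff, `partitionByRange` starts by extracting each element's key with the user's `KeySelector` through a `KeyExtractorMapper`; how the range boundaries are then derived is not shown. The following is a minimal, self-contained sketch of those two ideas, assuming `java.util.function.Function` as a stand-in for Flink's `KeySelector` and assuming the boundaries are taken as quantiles of a sorted key sample. `RangeBoundarySketch`, `extractKeys`, and `computeBoundaries` are hypothetical names, not part of Flink or of this pull request.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;

/**
 * Hypothetical sketch (not Flink code): derive split points for a range
 * partitioner from keys extracted with a key-selector function.
 */
final class RangeBoundarySketch {

    /** Applies the key selector to every element, like a KeySelector would. */
    static <T, K> List<K> extractKeys(List<T> elements, Function<T, K> keySelector) {
        List<K> keys = new ArrayList<>(elements.size());
        for (T element : elements) {
            keys.add(keySelector.apply(element));
        }
        return keys;
    }

    /**
     * Picks numPartitions - 1 split points from a sample of keys so that each
     * range receives roughly the same number of sampled keys (quantiles).
     */
    static <K extends Comparable<K>> List<K> computeBoundaries(List<K> sample, int numPartitions) {
        if (numPartitions < 1) {
            throw new IllegalArgumentException("numPartitions must be >= 1");
        }
        List<K> sorted = new ArrayList<>(sample);
        Collections.sort(sorted);
        List<K> boundaries = new ArrayList<>(numPartitions - 1);
        if (sorted.isEmpty()) {
            return boundaries; // no sample: every key falls into a single range
        }
        for (int i = 1; i < numPartitions; i++) {
            // Position of the i-th quantile; always < sorted.size() because i < numPartitions.
            int index = (int) ((long) i * sorted.size() / numPartitions);
            boundaries.add(sorted.get(index));
        }
        return boundaries;
    }
}
```

With `String::length` as the key selector, the words "pear", "apple", "fig", "kiwi", "banana", "cherry" yield the keys `[4, 5, 3, 4, 6, 6]`, and asking for three partitions produces the boundaries `[4, 6]`. Sampling instead of sorting the full input keeps the boundary computation cheap relative to the shuffle itself.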
> [GitHub] Enable Range Partitioner
> ---------------------------------
>
> Key: FLINK-7
> URL: https://issues.apache.org/jira/browse/FLINK-7
> Project: Flink
> Issue Type: Sub-task
> Components: Distributed Runtime
> Reporter: GitHub Import
> Assignee: Chengxiang Li
> Fix For: pre-apache
>
>
> The range partitioner is currently disabled. We need to implement the following aspects:
> 1) Distribution information, if available, must be propagated back together with the ordering property.
> 2) A generic bucket lookup structure (currently specific to PactRecord).
> Tests to re-enable after fixing this issue:
> - TeraSortITCase
> - GlobalSortingITCase
> - GlobalSortingMixedOrderITCase
> ---------------- Imported from GitHub ----------------
> Url: https://github.com/stratosphere/stratosphere/issues/7
> Created by: [StephanEwen|https://github.com/StephanEwen]
> Labels: core, enhancement, optimizer
> Milestone: Release 0.4
> Assignee: [fhueske|https://github.com/fhueske]
> Created at: Fri Apr 26 13:48:24 CEST 2013
> State: open
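Item 2 of the issue asks for a bucket lookup structure that is not tied to `PactRecord`. One way to make the lookup generic, sketched here under the assumption that the range boundaries are already sorted in ascending order, is to binary-search the boundary list for any `Comparable` key. `GenericBucketLookup` is a hypothetical name used only for illustration; it is not the structure Flink adopted.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/**
 * Hypothetical sketch: a bucket lookup for any Comparable key type instead of
 * a single record type. Boundaries must be sorted ascending; bucket i holds
 * keys k with boundaries[i-1] < k <= boundaries[i].
 */
final class GenericBucketLookup<K extends Comparable<? super K>> {
    private final List<K> boundaries;

    GenericBucketLookup(List<K> sortedBoundaries) {
        // Defensive copy so later changes to the caller's list cannot break the ordering.
        this.boundaries = new ArrayList<>(sortedBoundaries);
    }

    /** n boundaries split the key space into n + 1 buckets. */
    int numBuckets() {
        return boundaries.size() + 1;
    }

    int getBucket(K key) {
        int pos = Collections.binarySearch(boundaries, key);
        // Hit: the key equals a boundary and belongs to that boundary's bucket
        // (with duplicate boundaries, any matching index may be returned).
        // Miss: binarySearch returns -(insertionPoint) - 1, so recover the insertion point.
        return pos >= 0 ? pos : -pos - 1;
    }
}
```

With boundaries `[4, 6]`, keys up to 4 map to bucket 0, keys in (4, 6] to bucket 1, and keys above 6 to bucket 2. Binary search keeps each lookup logarithmic in the number of boundaries, and the same class works unchanged for `String`, `Long`, or any other `Comparable` key.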
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)