[
https://issues.apache.org/jira/browse/FLINK-2997?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15205375#comment-15205375
]
ASF GitHub Bot commented on FLINK-2997:
---------------------------------------
Github user gallenvara commented on a diff in the pull request:
https://github.com/apache/flink/pull/1776#discussion_r56916757
--- Diff:
flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionOperator.java
---
@@ -45,35 +46,48 @@
private final PartitionMethod pMethod;
private final String partitionLocationName;
private final Partitioner<?> customPartitioner;
-
-
+ private final DataDistribution distribution;
+
+
public PartitionOperator(DataSet<T> input, PartitionMethod pMethod,
Keys<T> pKeys, String partitionLocationName) {
- this(input, pMethod, pKeys, null, null, partitionLocationName);
+ this(input, pMethod, pKeys, null, null, null,
partitionLocationName);
}
-
+
+ public PartitionOperator(DataSet<T> input, PartitionMethod pMethod,
Keys<T> pKeys, DataDistribution distribution, String partitionLocationName) {
+ this(input, pMethod, pKeys, null, null, distribution,
partitionLocationName);
+ }
+
public PartitionOperator(DataSet<T> input, PartitionMethod pMethod,
String partitionLocationName) {
- this(input, pMethod, null, null, null, partitionLocationName);
+ this(input, pMethod, null, null, null, null,
partitionLocationName);
}
public PartitionOperator(DataSet<T> input, Keys<T> pKeys,
Partitioner<?> customPartitioner, String partitionLocationName) {
- this(input, PartitionMethod.CUSTOM, pKeys, customPartitioner,
null, partitionLocationName);
+ this(input, PartitionMethod.CUSTOM, pKeys, customPartitioner,
null, null, partitionLocationName);
}
- public <P> PartitionOperator(DataSet<T> input, Keys<T> pKeys,
Partitioner<P> customPartitioner,
+ public <P> PartitionOperator(DataSet<T> input, Keys<T> pKeys,
Partitioner<P> customPartitioner,
TypeInformation<P> partitionerTypeInfo, String
partitionLocationName)
{
- this(input, PartitionMethod.CUSTOM, pKeys, customPartitioner,
partitionerTypeInfo, partitionLocationName);
+ this(input, PartitionMethod.CUSTOM, pKeys, customPartitioner,
partitionerTypeInfo, null, partitionLocationName);
}
- private <P> PartitionOperator(DataSet<T> input, PartitionMethod
pMethod, Keys<T> pKeys, Partitioner<P> customPartitioner,
- TypeInformation<P> partitionerTypeInfo, String
partitionLocationName)
+ private <P> PartitionOperator(DataSet<T> input, PartitionMethod
pMethod, Keys<T> pKeys, Partitioner<P> customPartitioner,
+ TypeInformation<P> partitionerTypeInfo,
DataDistribution distribution, String partitionLocationName)
{
super(input, input.getType());
Preconditions.checkNotNull(pMethod);
Preconditions.checkArgument(pKeys != null || pMethod ==
PartitionMethod.REBALANCE, "Partitioning requires keys");
Preconditions.checkArgument(pMethod != PartitionMethod.CUSTOM
|| customPartitioner != null, "Custom partioning requires a partitioner.");
-
+ Preconditions.checkArgument(distribution == null || pMethod ==
PartitionMethod.RANGE, "Customized data distribution is only neccessary for
range partition.");
+
+ if (distribution != null) {
+
Preconditions.checkArgument(distribution.getNumberOfFields() ==
pKeys.getNumberOfKeyFields(), "The number of key fields in the distribution and
range partitioner should be the same.");
+ for (int i = 0; i < pKeys.getNumberOfKeyFields(); i++) {
+
Preconditions.checkArgument(distribution.getKeyTypes()[i].equals(pKeys.getKeyFieldTypes()[i]),
"The types of key from the distribution and range partitioner are not equal.");
--- End diff --
Code modified, and the new commit rebased with the previous one. (You must have
stayed up late last night :) )
> Support range partition with user customized data distribution.
> ---------------------------------------------------------------
>
> Key: FLINK-2997
> URL: https://issues.apache.org/jira/browse/FLINK-2997
> Project: Flink
> Issue Type: New Feature
> Reporter: Chengxiang Li
>
> This is follow-up work to FLINK-7. Sometimes users have better knowledge of
> the source data, and they can build a customized data distribution to do range
> partitioning more efficiently.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)