[ 
https://issues.apache.org/jira/browse/FLINK-2997?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15205375#comment-15205375
 ] 

ASF GitHub Bot commented on FLINK-2997:
---------------------------------------

Github user gallenvara commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1776#discussion_r56916757
  
    --- Diff: 
flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionOperator.java
 ---
    @@ -45,35 +46,48 @@
        private final PartitionMethod pMethod;
        private final String partitionLocationName;
        private final Partitioner<?> customPartitioner;
    -   
    -   
    +   private final DataDistribution distribution;
    +
    +
        public PartitionOperator(DataSet<T> input, PartitionMethod pMethod, 
Keys<T> pKeys, String partitionLocationName) {
    -           this(input, pMethod, pKeys, null, null, partitionLocationName);
    +           this(input, pMethod, pKeys, null, null, null, 
partitionLocationName);
        }
    -   
    +
    +   public PartitionOperator(DataSet<T> input, PartitionMethod pMethod, 
Keys<T> pKeys, DataDistribution distribution, String partitionLocationName) {
    +           this(input, pMethod, pKeys, null, null, distribution, 
partitionLocationName);
    +   }
    +
        public PartitionOperator(DataSet<T> input, PartitionMethod pMethod, 
String partitionLocationName) {
    -           this(input, pMethod, null, null, null, partitionLocationName);
    +           this(input, pMethod, null, null, null, null, 
partitionLocationName);
        }
        
        public PartitionOperator(DataSet<T> input, Keys<T> pKeys, 
Partitioner<?> customPartitioner, String partitionLocationName) {
    -           this(input, PartitionMethod.CUSTOM, pKeys, customPartitioner, 
null, partitionLocationName);
    +           this(input, PartitionMethod.CUSTOM, pKeys, customPartitioner, 
null, null, partitionLocationName);
        }
        
    -   public <P> PartitionOperator(DataSet<T> input, Keys<T> pKeys, 
Partitioner<P> customPartitioner, 
    +   public <P> PartitionOperator(DataSet<T> input, Keys<T> pKeys, 
Partitioner<P> customPartitioner,
                        TypeInformation<P> partitionerTypeInfo, String 
partitionLocationName)
        {
    -           this(input, PartitionMethod.CUSTOM, pKeys, customPartitioner, 
partitionerTypeInfo, partitionLocationName);
    +           this(input, PartitionMethod.CUSTOM, pKeys, customPartitioner, 
partitionerTypeInfo, null, partitionLocationName);
        }
        
    -   private <P> PartitionOperator(DataSet<T> input, PartitionMethod 
pMethod, Keys<T> pKeys, Partitioner<P> customPartitioner, 
    -                   TypeInformation<P> partitionerTypeInfo, String 
partitionLocationName)
    +   private <P> PartitionOperator(DataSet<T> input, PartitionMethod 
pMethod, Keys<T> pKeys, Partitioner<P> customPartitioner,
    +                   TypeInformation<P> partitionerTypeInfo, 
DataDistribution distribution, String partitionLocationName)
        {
                super(input, input.getType());
                
                Preconditions.checkNotNull(pMethod);
                Preconditions.checkArgument(pKeys != null || pMethod == 
PartitionMethod.REBALANCE, "Partitioning requires keys");
                Preconditions.checkArgument(pMethod != PartitionMethod.CUSTOM 
|| customPartitioner != null, "Custom partioning requires a partitioner.");
    -
    +           Preconditions.checkArgument(distribution == null || pMethod == 
PartitionMethod.RANGE, "Customized data distribution is only neccessary for 
range partition.");
    +           
    +           if (distribution != null) {
    +                   
Preconditions.checkArgument(distribution.getNumberOfFields() == 
pKeys.getNumberOfKeyFields(), "The number of key fields in the distribution and 
range partitioner should be the same.");
    +                   for (int i = 0; i < pKeys.getNumberOfKeyFields(); i++) {
    +                           
Preconditions.checkArgument(distribution.getKeyTypes()[i].equals(pKeys.getKeyFieldTypes()[i]),
 "The types of key from the distribution and range partitioner are not equal.");
    --- End diff --
    
    Code modified, and the new commit has been rebased with the previous one. (You must have 
stayed up late last night :) )


> Support range partition with user customized data distribution.
> ---------------------------------------------------------------
>
>                 Key: FLINK-2997
>                 URL: https://issues.apache.org/jira/browse/FLINK-2997
>             Project: Flink
>          Issue Type: New Feature
>            Reporter: Chengxiang Li
>
> This is follow-up work to FLINK-7. Sometimes users have better knowledge of 
> the source data, and they can build a customized data distribution to do range 
> partitioning more efficiently.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to