[ 
https://issues.apache.org/jira/browse/FLINK-1060?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14117561#comment-14117561
 ] 

ASF GitHub Bot commented on FLINK-1060:
---------------------------------------

Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/108#discussion_r16961223
  
    --- Diff: 
flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionedDataSet.java
 ---
    @@ -0,0 +1,249 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.java.operators;
    +
    +import org.apache.flink.api.common.functions.FilterFunction;
    +import org.apache.flink.api.common.functions.FlatMapFunction;
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.api.common.functions.MapPartitionFunction;
    +import org.apache.flink.api.common.functions.util.FunctionUtils;
    +import org.apache.flink.api.common.operators.Operator;
    +import org.apache.flink.api.common.operators.UnaryOperatorInformation;
    +import org.apache.flink.api.common.operators.base.MapOperatorBase;
    +import org.apache.flink.api.common.operators.base.PartitionOperatorBase;
    +import 
org.apache.flink.api.common.operators.base.PartitionOperatorBase.PartitionMethod;
    +import org.apache.flink.api.java.DataSet;
    +import 
org.apache.flink.api.java.functions.UnsupportedLambdaExpressionException;
    +import org.apache.flink.api.java.operators.translation.KeyExtractingMapper;
    +import org.apache.flink.api.java.operators.translation.KeyRemovingMapper;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.api.java.typeutils.TupleTypeInfo;
    +import org.apache.flink.types.TypeInformation;
    +
    +public class PartitionedDataSet<IN> {
    +   
    +   private final DataSet<IN> dataSet;
    +   
    +   private final Keys<IN> pKeys;
    +   private final PartitionMethod pMethod;
    +   
    +   public PartitionedDataSet(DataSet<IN> input, PartitionMethod pMethod, 
Keys<IN> pKeys) {
    +           this.dataSet = input;
    +           
    +           if(pMethod == PartitionMethod.HASH && pKeys == null) {
    +                   throw new IllegalArgumentException("Hash Partitioning 
requires keys");
    +           } else if(pMethod == PartitionMethod.RANGE) {
    +                   throw new UnsupportedOperationException("Range 
Partitioning not yet supported");
    +           }
    +           
    +           if(pKeys instanceof Keys.FieldPositionKeys<?> && 
!input.getType().isTupleType()) {
    +                   throw new IllegalArgumentException("Hash Partitioning 
with key fields only possible on Tuple DataSets");
    +           }
    +           
    +           this.pMethod = pMethod;
    +           this.pKeys = pKeys;
    +   }
    +   
    +   public PartitionedDataSet(DataSet<IN> input, PartitionMethod pMethod) {
    +           this(input, pMethod, null);
    +   }
    +   
    +   public DataSet<IN> getDataSet() {
    +           return this.dataSet;
    +   }
    +   
    +   
    +   /**
    +    * Applies a Map transformation on a {@link DataSet}.<br/>
    +    * The transformation calls a {@link 
org.apache.flink.api.java.functions.RichMapFunction} for each element of the 
DataSet.
    +    * Each MapFunction call returns exactly one element.
    +    * 
    +    * @param mapper The MapFunction that is called for each element of the 
DataSet.
    +    * @return A MapOperator that represents the transformed DataSet.
    +    * 
    +    * @see org.apache.flink.api.java.functions.RichMapFunction
    +    * @see MapOperator
    +    * @see DataSet
    +    */
    +   public <R> MapOperator<IN, R> map(MapFunction<IN, R> mapper) {
    +           if (mapper == null) {
    +                   throw new NullPointerException("Map function must not 
be null.");
    +           }
    +           if (FunctionUtils.isLambdaFunction(mapper)) {
    +                   throw new UnsupportedLambdaExpressionException();
    +           }
    +           return new MapOperator<IN, R>(this, mapper);
    +   }
    +
    +    /**
    +     * Applies a Map-style operation to the entire partition of the data.
    +    * The function is called once per parallel partition of the data,
    +    * and the entire partition is available through the given Iterator.
    +    * The number of elements that each instance of the MapPartition 
function
    +    * sees is non deterministic and depends on the degree of parallelism 
of the operation.
    +    *
    +    * This function is intended for operations that cannot transform 
individual elements,
    +    * requires no grouping of elements. To transform individual elements,
    +    * the use of {@code map()} and {@code flatMap()} is preferable.
    +    *
    +    * @param mapPartition The MapPartitionFunction that is called for the 
full DataSet.
    +     * @return A MapPartitionOperator that represents the transformed 
DataSet.
    --- End diff --
    
    It seems that Checkstyle is not checking the indentation in comments.
    These are spaces, not tabs


> Support explicit shuffling of DataSets
> --------------------------------------
>
>                 Key: FLINK-1060
>                 URL: https://issues.apache.org/jira/browse/FLINK-1060
>             Project: Flink
>          Issue Type: Improvement
>          Components: Java API, Optimizer
>            Reporter: Fabian Hueske
>            Assignee: Fabian Hueske
>            Priority: Minor
>
> Right now, Flink only shuffles data if it is required by some operation such 
> as Reduce, Join, or CoGroup. There is no way to explicitly shuffle a data set.
> However, in some situations explicit shuffling would be very helpful 
> including:
> - rebalancing before compute-intensive Map operations
> - balancing, random or hash partitioning before PartitionMap operations (see 
> FLINK-1053)
> - better integration of support for HadoopJobs (see FLINK-838)
> With this issue, I propose to add the following methods to {{DataSet}}
> - {{DataSet.partitionHashBy(int...)}} and 
> {{DataSet.partitionHashBy(KeySelector)}} to perform an explicit hash 
> partitioning
> - {{DataSet.partitionRandomly()}} to shuffle data completely random
> - {{DataSet.partitionRoundRobin()}} to shuffle data in a round-robin fashion 
> that generates very even distribution with possible bias due to prior 
> distributions
> The {{DataSet.partitionRoundRobin()}} might not be necessary if we think that 
> random shuffling balances good enough.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to