[ https://issues.apache.org/jira/browse/FLINK-1060?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14117561#comment-14117561 ]
ASF GitHub Bot commented on FLINK-1060:
---------------------------------------
Github user rmetzger commented on a diff in the pull request:
https://github.com/apache/incubator-flink/pull/108#discussion_r16961223
--- Diff: flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionedDataSet.java ---
@@ -0,0 +1,249 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.operators;
+
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.MapPartitionFunction;
+import org.apache.flink.api.common.functions.util.FunctionUtils;
+import org.apache.flink.api.common.operators.Operator;
+import org.apache.flink.api.common.operators.UnaryOperatorInformation;
+import org.apache.flink.api.common.operators.base.MapOperatorBase;
+import org.apache.flink.api.common.operators.base.PartitionOperatorBase;
+import org.apache.flink.api.common.operators.base.PartitionOperatorBase.PartitionMethod;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.functions.UnsupportedLambdaExpressionException;
+import org.apache.flink.api.java.operators.translation.KeyExtractingMapper;
+import org.apache.flink.api.java.operators.translation.KeyRemovingMapper;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.types.TypeInformation;
+
+public class PartitionedDataSet<IN> {
+
+ private final DataSet<IN> dataSet;
+
+ private final Keys<IN> pKeys;
+ private final PartitionMethod pMethod;
+
+ public PartitionedDataSet(DataSet<IN> input, PartitionMethod pMethod, Keys<IN> pKeys) {
+ this.dataSet = input;
+
+ if(pMethod == PartitionMethod.HASH && pKeys == null) {
+ throw new IllegalArgumentException("Hash Partitioning
requires keys");
+ } else if(pMethod == PartitionMethod.RANGE) {
+ throw new UnsupportedOperationException("Range
Partitioning not yet supported");
+ }
+
+ if(pKeys instanceof Keys.FieldPositionKeys<?> && !input.getType().isTupleType()) {
+ throw new IllegalArgumentException("Hash Partitioning with key fields only possible on Tuple DataSets");
+ }
+
+ this.pMethod = pMethod;
+ this.pKeys = pKeys;
+ }
+
+ public PartitionedDataSet(DataSet<IN> input, PartitionMethod pMethod) {
+ this(input, pMethod, null);
+ }
+
+ public DataSet<IN> getDataSet() {
+ return this.dataSet;
+ }
+
+
+ /**
+ * Applies a Map transformation on a {@link DataSet}.<br/>
+ * The transformation calls a {@link org.apache.flink.api.java.functions.RichMapFunction} for each element of the DataSet.
+ * Each MapFunction call returns exactly one element.
+ *
+ * @param mapper The MapFunction that is called for each element of the DataSet.
+ * @return A MapOperator that represents the transformed DataSet.
+ *
+ * @see org.apache.flink.api.java.functions.RichMapFunction
+ * @see MapOperator
+ * @see DataSet
+ */
+ public <R> MapOperator<IN, R> map(MapFunction<IN, R> mapper) {
+ if (mapper == null) {
+ throw new NullPointerException("Map function must not
be null.");
+ }
+ if (FunctionUtils.isLambdaFunction(mapper)) {
+ throw new UnsupportedLambdaExpressionException();
+ }
+ return new MapOperator<IN, R>(this, mapper);
+ }
+
+ /**
+ * Applies a Map-style operation to the entire partition of the data.
+ * The function is called once per parallel partition of the data,
+ * and the entire partition is available through the given Iterator.
+ * The number of elements that each instance of the MapPartition function
+ * sees is non-deterministic and depends on the degree of parallelism of the operation.
+ *
+ * This function is intended for operations that cannot transform individual elements
+ * and require no grouping of elements. To transform individual elements,
+ * the use of {@code map()} and {@code flatMap()} is preferable.
+ *
+ * @param mapPartition The MapPartitionFunction that is called for the full DataSet.
+ * @return A MapPartitionOperator that represents the transformed DataSet.
--- End diff ---

It seems that Checkstyle is not checking the indentation in comments. These are spaces, not tabs.
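
For illustration, a minimal usage sketch of the API under review. The partitionByHash(int...) entry point on DataSet is an assumption made for this sketch (the PR's actual entry point is not visible in the diff excerpt above); only the map(...) call on PartitionedDataSet appears in the diff.

{code:java}
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

public class PartitionedMapSketch {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<Tuple2<String, Integer>> input = env.fromElements(
                new Tuple2<String, Integer>("a", 1),
                new Tuple2<String, Integer>("b", 2),
                new Tuple2<String, Integer>("a", 3));

        // Hash-partition on tuple field 0, then map over the partitioned data.
        // partitionByHash(0) is an assumed entry point that would return the
        // PartitionedDataSet from the diff; map(...) is the method shown above.
        DataSet<Tuple2<String, Integer>> result = input
                .partitionByHash(0)
                .map(new MapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> map(Tuple2<String, Integer> value) {
                        return new Tuple2<String, Integer>(value.f0, value.f1 * 2);
                    }
                });

        // In the 0.6-era API, print() registers a sink and execute() runs the job.
        result.print();
        env.execute();
    }
}
{code}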
> Support explicit shuffling of DataSets
> --------------------------------------
>
> Key: FLINK-1060
> URL: https://issues.apache.org/jira/browse/FLINK-1060
> Project: Flink
> Issue Type: Improvement
> Components: Java API, Optimizer
> Reporter: Fabian Hueske
> Assignee: Fabian Hueske
> Priority: Minor
>
> Right now, Flink only shuffles data if it is required by some operation such
> as Reduce, Join, or CoGroup. There is no way to explicitly shuffle a data set.
> However, in some situations explicit shuffling would be very helpful,
> including:
> - rebalancing before compute-intensive Map operations
> - balancing, random or hash partitioning before PartitionMap operations (see
> FLINK-1053)
> - better integration of support for HadoopJobs (see FLINK-838)
> With this issue, I propose to add the following methods to {{DataSet}}:
> - {{DataSet.partitionHashBy(int...)}} and
> {{DataSet.partitionHashBy(KeySelector)}} to perform an explicit hash
> partitioning
> - {{DataSet.partitionRandomly()}} to shuffle data completely at random
> - {{DataSet.partitionRoundRobin()}} to shuffle data in a round-robin fashion
> that generates a very even distribution, with possible bias due to prior
> distributions
> The {{DataSet.partitionRoundRobin()}} method might not be necessary if we think
> that random shuffling balances well enough.
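
To make the proposal concrete, a hypothetical sketch of rebalancing a skewed DataSet before a compute-intensive map. The partitionRandomly() method is only a name proposed in the description above and does not exist yet, so this snippet is purely illustrative.

{code:java}
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

public class ExplicitShuffleSketch {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // A skewed input: most records share the same key / source split.
        DataSet<Tuple2<String, Integer>> skewed = env.fromElements(
                new Tuple2<String, Integer>("hot", 1),
                new Tuple2<String, Integer>("hot", 2),
                new Tuple2<String, Integer>("hot", 3),
                new Tuple2<String, Integer>("cold", 4));

        // Proposed (not yet existing) API: spread records evenly across
        // parallel instances before a compute-intensive map.
        DataSet<Tuple2<String, Integer>> result = skewed
                .partitionRandomly()            // name proposed in this issue
                .map(new ExpensiveMapper());

        result.print();
        env.execute();
    }

    // Stand-in for a compute-intensive transformation.
    public static class ExpensiveMapper
            implements MapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>> {
        @Override
        public Tuple2<String, Integer> map(Tuple2<String, Integer> value) {
            return new Tuple2<String, Integer>(value.f0, value.f1 * value.f1);
        }
    }
}
{code}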
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)