This is an automated email from the ASF dual-hosted git repository.
xccui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 7af933b [FLINK-9289] [Dataset] Parallelism of generated operators should have the max parallelism of input
7af933b is described below
commit 7af933b78400921ae798b8a882cba8ecff5c94be
Author: Xingcan Cui <[email protected]>
AuthorDate: Sun May 13 20:20:36 2018 +0800
[FLINK-9289] [Dataset] Parallelism of generated operators should have the max parallelism of input
This closes #6003.
---
.../flink/api/java/operators/KeyFunctions.java | 27 ++++
.../flink/api/java/operators/UnionOperator.java | 7 +
.../translation/UnionTranslationTest.java | 158 +++++++++++++++++++++
.../flink/graph/library/linkanalysis/PageRank.java | 1 -
.../apache/flink/python/api/PythonPlanBinder.java | 2 +-
5 files changed, 193 insertions(+), 2 deletions(-)
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/KeyFunctions.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/KeyFunctions.java
index f6336cd..3e7a552 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/KeyFunctions.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/KeyFunctions.java
@@ -22,6 +22,7 @@ import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.operators.Keys.SelectorFunctionKeys;
import org.apache.flink.api.common.operators.UnaryOperatorInformation;
+import org.apache.flink.api.common.operators.Union;
import org.apache.flink.api.common.operators.base.MapOperatorBase;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.operators.translation.KeyExtractingMapper;
@@ -43,6 +44,19 @@ public class KeyFunctions {
org.apache.flink.api.common.operators.Operator<T> input,
SelectorFunctionKeys<T, K> key) {
+ if (input instanceof Union) {
+ // if input is a union, we apply the key extractors
recursively to all inputs
+ org.apache.flink.api.common.operators.Operator<T>
firstInput = ((Union) input).getFirstInput();
+ org.apache.flink.api.common.operators.Operator<T>
secondInput = ((Union) input).getSecondInput();
+
+
org.apache.flink.api.common.operators.Operator<Tuple2<K, T>> firstInputWithKey =
+ appendKeyExtractor(firstInput, key);
+
org.apache.flink.api.common.operators.Operator<Tuple2<K, T>> secondInputWithKey
=
+ appendKeyExtractor(secondInput, key);
+
+ return new Union(firstInputWithKey, secondInputWithKey,
input.getName());
+ }
+
TypeInformation<T> inputType = key.getInputType();
TypeInformation<Tuple2<K, T>> typeInfoWithKey =
createTypeWithKey(key);
KeyExtractingMapper<T, K> extractor = new
KeyExtractingMapper(key.getKeyExtractor());
@@ -66,6 +80,19 @@ public class KeyFunctions {
SelectorFunctionKeys<T, K1> key1,
SelectorFunctionKeys<T, K2> key2) {
+ if (input instanceof Union) {
+ // if input is a union, we apply the key extractors
recursively to all inputs
+ org.apache.flink.api.common.operators.Operator<T>
firstInput = ((Union) input).getFirstInput();
+ org.apache.flink.api.common.operators.Operator<T>
secondInput = ((Union) input).getSecondInput();
+
+
org.apache.flink.api.common.operators.Operator<Tuple3<K1, K2, T>>
firstInputWithKey =
+ appendKeyExtractor(firstInput, key1,
key2);
+
org.apache.flink.api.common.operators.Operator<Tuple3<K1, K2, T>>
secondInputWithKey =
+ appendKeyExtractor(secondInput, key1,
key2);
+
+ return new Union(firstInputWithKey, secondInputWithKey,
input.getName());
+ }
+
TypeInformation<T> inputType = key1.getInputType();
TypeInformation<Tuple3<K1, K2, T>> typeInfoWithKey =
createTypeWithKey(key1, key2);
TwoKeyExtractingMapper<T, K1, K2> extractor =
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/UnionOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/UnionOperator.java
index 0da5e01..7d3c0d6 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/UnionOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/UnionOperator.java
@@ -62,4 +62,11 @@ public class UnionOperator<T> extends TwoInputOperator<T, T, T, UnionOperator<T>
protected Union<T> translateToDataFlow(Operator<T> input1, Operator<T>
input2) {
return new Union<T>(input1, input2, unionLocationName);
}
+
+ @Override
+ public UnionOperator<T> setParallelism(int parallelism) {
+ // Union is not translated to an independent operator but
executed by multiplexing
+ // its input on the following operator. Hence, the parallelism
of a Union cannot be set.
+ throw new UnsupportedOperationException("Cannot set the
parallelism for Union.");
+ }
}
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/UnionTranslationTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/UnionTranslationTest.java
new file mode 100644
index 0000000..216e37f
--- /dev/null
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/UnionTranslationTest.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.operators.translation;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.common.operators.GenericDataSinkBase;
+import org.apache.flink.api.common.operators.Order;
+import org.apache.flink.api.common.operators.SingleInputOperator;
+import org.apache.flink.api.common.operators.Union;
+import org.apache.flink.api.common.operators.base.MapOperatorBase;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.StringValue;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for translation of union operation.
+ */
+@SuppressWarnings("serial")
+public class UnionTranslationTest {
+
+ @Test
+ public void translateUnion2Group() {
+ try {
+ final int parallelism = 4;
+ ExecutionEnvironment env =
ExecutionEnvironment.createLocalEnvironment(parallelism);
+
+ DataSet<Tuple3<Double, StringValue, LongValue>>
dataset1 = getSourceDataSet(env, 3);
+
+ DataSet<Tuple3<Double, StringValue, LongValue>>
dataset2 = getSourceDataSet(env, 2);
+
+ dataset1.union(dataset2)
+ .groupBy((KeySelector<Tuple3<Double,
StringValue, LongValue>, String>) value -> "")
+
.reduceGroup((GroupReduceFunction<Tuple3<Double, StringValue, LongValue>,
String>) (values, out) -> {})
+ .returns(String.class)
+ .output(new DiscardingOutputFormat<>());
+
+ Plan p = env.createProgramPlan();
+
+ // The plan should look like the following one.
+ //
+ // DataSet1(3) - MapOperator(3)-+
+ // |- Union(-1) -
SingleInputOperator - Sink
+ // DataSet2(2) - MapOperator(2)-+
+
+ GenericDataSinkBase<?> sink =
p.getDataSinks().iterator().next();
+ Union unionOperator = (Union) ((SingleInputOperator)
sink.getInput()).getInput();
+
+ // The key mappers should be added to both of the two
input streams for union.
+ assertTrue(unionOperator.getFirstInput() instanceof
MapOperatorBase<?, ?, ?>);
+ assertTrue(unionOperator.getSecondInput() instanceof
MapOperatorBase<?, ?, ?>);
+
+ // The parallelisms of the key mappers should be equal
to those of their inputs.
+
assertEquals(unionOperator.getFirstInput().getParallelism(), 3);
+
assertEquals(unionOperator.getSecondInput().getParallelism(), 2);
+
+ // The union should always have the default parallelism.
+ assertEquals(unionOperator.getParallelism(),
ExecutionConfig.PARALLELISM_DEFAULT);
+ }
+ catch (Exception e) {
+ System.err.println(e.getMessage());
+ e.printStackTrace();
+ fail("Test caused an error: " + e.getMessage());
+ }
+ }
+
+ @Test
+ public void translateUnion3SortedGroup() {
+ try {
+ final int parallelism = 4;
+ ExecutionEnvironment env =
ExecutionEnvironment.createLocalEnvironment(parallelism);
+
+ DataSet<Tuple3<Double, StringValue, LongValue>>
dataset1 = getSourceDataSet(env, 2);
+
+ DataSet<Tuple3<Double, StringValue, LongValue>>
dataset2 = getSourceDataSet(env, 3);
+
+ DataSet<Tuple3<Double, StringValue, LongValue>>
dataset3 = getSourceDataSet(env, -1);
+
+ dataset1.union(dataset2).union(dataset3)
+ .groupBy((KeySelector<Tuple3<Double,
StringValue, LongValue>, String>) value -> "")
+ .sortGroup((KeySelector<Tuple3<Double,
StringValue, LongValue>, String>) value -> "", Order.ASCENDING)
+
.reduceGroup((GroupReduceFunction<Tuple3<Double, StringValue, LongValue>,
String>) (values, out) -> {})
+ .returns(String.class)
+ .output(new DiscardingOutputFormat<>());
+
+ Plan p = env.createProgramPlan();
+
+ // The plan should look like the following one.
+ //
+ // DataSet1(2) - MapOperator(2)-+
+ // |- Union(-1) -+
+ // DataSet2(3) - MapOperator(3)-+ |-
Union(-1) - SingleInputOperator - Sink
+ // |
+ // DataSet3(-1) - MapOperator(-1)-+
+
+ GenericDataSinkBase<?> sink =
p.getDataSinks().iterator().next();
+ Union secondUnionOperator = (Union)
((SingleInputOperator) sink.getInput()).getInput();
+
+ // The first input of the second union should be the
first union.
+ Union firstUnionOperator = (Union)
secondUnionOperator.getFirstInput();
+
+ // The key mapper should be added to the second input
stream of the second union.
+ assertTrue(secondUnionOperator.getSecondInput()
instanceof MapOperatorBase<?, ?, ?>);
+
+ // The key mappers should be added to both of the two
input streams for the first union.
+ assertTrue(firstUnionOperator.getFirstInput()
instanceof MapOperatorBase<?, ?, ?>);
+ assertTrue(firstUnionOperator.getSecondInput()
instanceof MapOperatorBase<?, ?, ?>);
+
+ // The parallelisms of the key mappers should be equal
to those of their inputs.
+
assertEquals(firstUnionOperator.getFirstInput().getParallelism(), 2);
+
assertEquals(firstUnionOperator.getSecondInput().getParallelism(), 3);
+
assertEquals(secondUnionOperator.getSecondInput().getParallelism(), -1);
+
+ // The union should always have the default parallelism.
+ assertEquals(secondUnionOperator.getParallelism(),
ExecutionConfig.PARALLELISM_DEFAULT);
+ assertEquals(firstUnionOperator.getParallelism(),
ExecutionConfig.PARALLELISM_DEFAULT);
+ }
+ catch (Exception e) {
+ System.err.println(e.getMessage());
+ e.printStackTrace();
+ fail("Test caused an error: " + e.getMessage());
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ private static DataSet<Tuple3<Double, StringValue, LongValue>>
getSourceDataSet(ExecutionEnvironment env, int parallelism) {
+ return env
+ .fromElements(new Tuple3<>(0.0, new
StringValue(""), new LongValue(1L)))
+ .setParallelism(parallelism);
+ }
+}
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/linkanalysis/PageRank.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/linkanalysis/PageRank.java
index d259fac..932ad78 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/linkanalysis/PageRank.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/linkanalysis/PageRank.java
@@ -235,7 +235,6 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
// s, adjusted pagerank(s)
DataSet<Tuple2<K, DoubleValue>> adjustedScores = vertexScores
.union(sourceVertices)
- .setParallelism(parallelism)
.name("Union with source vertices")
.map(new AdjustScores<>(dampingFactor))
.withBroadcastSet(sumOfScores, SUM_OF_SCORES)
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
index 1182708..4709fa5 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
@@ -515,7 +515,7 @@ public class PythonPlanBinder {
private <IN> void createUnionOperation(PythonOperationInfo info) {
DataSet<IN> op1 = sets.getDataSet(info.parentID);
DataSet<IN> op2 = sets.getDataSet(info.otherID);
- sets.add(info.setID,
op1.union(op2).setParallelism(info.parallelism).name("Union"));
+ sets.add(info.setID, op1.union(op2).name("Union"));
}
private <IN1, IN2, OUT> void createCoGroupOperation(PythonOperationInfo
info, TypeInformation<OUT> type) {