[
https://issues.apache.org/jira/browse/FLINK-9289?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16581322#comment-16581322
]
ASF GitHub Bot commented on FLINK-9289:
---------------------------------------
asfgit closed pull request #6003: [FLINK-9289][Dataset] Parallelism of
generated operators should have max parallelism of input
URL: https://github.com/apache/flink/pull/6003
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/flink-java/src/main/java/org/apache/flink/api/java/operators/KeyFunctions.java
b/flink-java/src/main/java/org/apache/flink/api/java/operators/KeyFunctions.java
index f6336cde2b7..3e7a552a68f 100644
---
a/flink-java/src/main/java/org/apache/flink/api/java/operators/KeyFunctions.java
+++
b/flink-java/src/main/java/org/apache/flink/api/java/operators/KeyFunctions.java
@@ -22,6 +22,7 @@
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.operators.Keys.SelectorFunctionKeys;
import org.apache.flink.api.common.operators.UnaryOperatorInformation;
+import org.apache.flink.api.common.operators.Union;
import org.apache.flink.api.common.operators.base.MapOperatorBase;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.operators.translation.KeyExtractingMapper;
@@ -43,6 +44,19 @@
org.apache.flink.api.common.operators.Operator<T> input,
SelectorFunctionKeys<T, K> key) {
+ if (input instanceof Union) {
+ // if input is a union, we apply the key extractors
recursively to all inputs
+ org.apache.flink.api.common.operators.Operator<T>
firstInput = ((Union) input).getFirstInput();
+ org.apache.flink.api.common.operators.Operator<T>
secondInput = ((Union) input).getSecondInput();
+
+
org.apache.flink.api.common.operators.Operator<Tuple2<K, T>> firstInputWithKey =
+ appendKeyExtractor(firstInput, key);
+
org.apache.flink.api.common.operators.Operator<Tuple2<K, T>> secondInputWithKey
=
+ appendKeyExtractor(secondInput, key);
+
+ return new Union(firstInputWithKey, secondInputWithKey,
input.getName());
+ }
+
TypeInformation<T> inputType = key.getInputType();
TypeInformation<Tuple2<K, T>> typeInfoWithKey =
createTypeWithKey(key);
KeyExtractingMapper<T, K> extractor = new
KeyExtractingMapper(key.getKeyExtractor());
@@ -66,6 +80,19 @@
SelectorFunctionKeys<T, K1> key1,
SelectorFunctionKeys<T, K2> key2) {
+ if (input instanceof Union) {
+ // if input is a union, we apply the key extractors
recursively to all inputs
+ org.apache.flink.api.common.operators.Operator<T>
firstInput = ((Union) input).getFirstInput();
+ org.apache.flink.api.common.operators.Operator<T>
secondInput = ((Union) input).getSecondInput();
+
+
org.apache.flink.api.common.operators.Operator<Tuple3<K1, K2, T>>
firstInputWithKey =
+ appendKeyExtractor(firstInput, key1,
key2);
+
org.apache.flink.api.common.operators.Operator<Tuple3<K1, K2, T>>
secondInputWithKey =
+ appendKeyExtractor(secondInput, key1,
key2);
+
+ return new Union(firstInputWithKey, secondInputWithKey,
input.getName());
+ }
+
TypeInformation<T> inputType = key1.getInputType();
TypeInformation<Tuple3<K1, K2, T>> typeInfoWithKey =
createTypeWithKey(key1, key2);
TwoKeyExtractingMapper<T, K1, K2> extractor =
diff --git
a/flink-java/src/main/java/org/apache/flink/api/java/operators/UnionOperator.java
b/flink-java/src/main/java/org/apache/flink/api/java/operators/UnionOperator.java
index 0da5e01a0b3..7d3c0d6fc3f 100644
---
a/flink-java/src/main/java/org/apache/flink/api/java/operators/UnionOperator.java
+++
b/flink-java/src/main/java/org/apache/flink/api/java/operators/UnionOperator.java
@@ -62,4 +62,11 @@ public UnionOperator(DataSet<T> input1, DataSet<T> input2,
String unionLocationN
protected Union<T> translateToDataFlow(Operator<T> input1, Operator<T>
input2) {
return new Union<T>(input1, input2, unionLocationName);
}
+
+ @Override
+ public UnionOperator<T> setParallelism(int parallelism) {
+ // Union is not translated to an independent operator but
executed by multiplexing
+ // its input on the following operator. Hence, the parallelism
of a Union cannot be set.
+ throw new UnsupportedOperationException("Cannot set the
parallelism for Union.");
+ }
}
diff --git
a/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/UnionTranslationTest.java
b/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/UnionTranslationTest.java
new file mode 100644
index 00000000000..216e37f2acf
--- /dev/null
+++
b/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/UnionTranslationTest.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.operators.translation;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.common.operators.GenericDataSinkBase;
+import org.apache.flink.api.common.operators.Order;
+import org.apache.flink.api.common.operators.SingleInputOperator;
+import org.apache.flink.api.common.operators.Union;
+import org.apache.flink.api.common.operators.base.MapOperatorBase;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.StringValue;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for translation of union operation.
+ */
+@SuppressWarnings("serial")
+public class UnionTranslationTest {
+
+ @Test
+ public void translateUnion2Group() {
+ try {
+ final int parallelism = 4;
+ ExecutionEnvironment env =
ExecutionEnvironment.createLocalEnvironment(parallelism);
+
+ DataSet<Tuple3<Double, StringValue, LongValue>>
dataset1 = getSourceDataSet(env, 3);
+
+ DataSet<Tuple3<Double, StringValue, LongValue>>
dataset2 = getSourceDataSet(env, 2);
+
+ dataset1.union(dataset2)
+ .groupBy((KeySelector<Tuple3<Double,
StringValue, LongValue>, String>) value -> "")
+
.reduceGroup((GroupReduceFunction<Tuple3<Double, StringValue, LongValue>,
String>) (values, out) -> {})
+ .returns(String.class)
+ .output(new DiscardingOutputFormat<>());
+
+ Plan p = env.createProgramPlan();
+
+ // The plan should look like the following one.
+ //
+ // DataSet1(3) - MapOperator(3)-+
+ // |- Union(-1) -
SingleInputOperator - Sink
+ // DataSet2(2) - MapOperator(2)-+
+
+ GenericDataSinkBase<?> sink =
p.getDataSinks().iterator().next();
+ Union unionOperator = (Union) ((SingleInputOperator)
sink.getInput()).getInput();
+
+ // The key mappers should be added to both of the two
input streams for union.
+ assertTrue(unionOperator.getFirstInput() instanceof
MapOperatorBase<?, ?, ?>);
+ assertTrue(unionOperator.getSecondInput() instanceof
MapOperatorBase<?, ?, ?>);
+
+ // The parallelisms of the key mappers should be equal
to those of their inputs.
+
assertEquals(unionOperator.getFirstInput().getParallelism(), 3);
+
assertEquals(unionOperator.getSecondInput().getParallelism(), 2);
+
+ // The union should always have the default parallelism.
+ assertEquals(unionOperator.getParallelism(),
ExecutionConfig.PARALLELISM_DEFAULT);
+ }
+ catch (Exception e) {
+ System.err.println(e.getMessage());
+ e.printStackTrace();
+ fail("Test caused an error: " + e.getMessage());
+ }
+ }
+
+ @Test
+ public void translateUnion3SortedGroup() {
+ try {
+ final int parallelism = 4;
+ ExecutionEnvironment env =
ExecutionEnvironment.createLocalEnvironment(parallelism);
+
+ DataSet<Tuple3<Double, StringValue, LongValue>>
dataset1 = getSourceDataSet(env, 2);
+
+ DataSet<Tuple3<Double, StringValue, LongValue>>
dataset2 = getSourceDataSet(env, 3);
+
+ DataSet<Tuple3<Double, StringValue, LongValue>>
dataset3 = getSourceDataSet(env, -1);
+
+ dataset1.union(dataset2).union(dataset3)
+ .groupBy((KeySelector<Tuple3<Double,
StringValue, LongValue>, String>) value -> "")
+ .sortGroup((KeySelector<Tuple3<Double,
StringValue, LongValue>, String>) value -> "", Order.ASCENDING)
+
.reduceGroup((GroupReduceFunction<Tuple3<Double, StringValue, LongValue>,
String>) (values, out) -> {})
+ .returns(String.class)
+ .output(new DiscardingOutputFormat<>());
+
+ Plan p = env.createProgramPlan();
+
+ // The plan should look like the following one.
+ //
+ // DataSet1(2) - MapOperator(2)-+
+ // |- Union(-1) -+
+ // DataSet2(3) - MapOperator(3)-+ |-
Union(-1) - SingleInputOperator - Sink
+ // |
+ // DataSet3(-1) - MapOperator(-1)-+
+
+ GenericDataSinkBase<?> sink =
p.getDataSinks().iterator().next();
+ Union secondUnionOperator = (Union)
((SingleInputOperator) sink.getInput()).getInput();
+
+ // The first input of the second union should be the
first union.
+ Union firstUnionOperator = (Union)
secondUnionOperator.getFirstInput();
+
+ // The key mapper should be added to the second input
stream of the second union.
+ assertTrue(secondUnionOperator.getSecondInput()
instanceof MapOperatorBase<?, ?, ?>);
+
+ // The key mappers should be added to both of the two
input streams for the first union.
+ assertTrue(firstUnionOperator.getFirstInput()
instanceof MapOperatorBase<?, ?, ?>);
+ assertTrue(firstUnionOperator.getSecondInput()
instanceof MapOperatorBase<?, ?, ?>);
+
+ // The parallelisms of the key mappers should be equal
to those of their inputs.
+
assertEquals(firstUnionOperator.getFirstInput().getParallelism(), 2);
+
assertEquals(firstUnionOperator.getSecondInput().getParallelism(), 3);
+
assertEquals(secondUnionOperator.getSecondInput().getParallelism(), -1);
+
+ // The union should always have the default parallelism.
+ assertEquals(secondUnionOperator.getParallelism(),
ExecutionConfig.PARALLELISM_DEFAULT);
+ assertEquals(firstUnionOperator.getParallelism(),
ExecutionConfig.PARALLELISM_DEFAULT);
+ }
+ catch (Exception e) {
+ System.err.println(e.getMessage());
+ e.printStackTrace();
+ fail("Test caused an error: " + e.getMessage());
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ private static DataSet<Tuple3<Double, StringValue, LongValue>>
getSourceDataSet(ExecutionEnvironment env, int parallelism) {
+ return env
+ .fromElements(new Tuple3<>(0.0, new
StringValue(""), new LongValue(1L)))
+ .setParallelism(parallelism);
+ }
+}
diff --git
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/linkanalysis/PageRank.java
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/linkanalysis/PageRank.java
index d259fac6341..932ad78a1d8 100644
---
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/linkanalysis/PageRank.java
+++
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/linkanalysis/PageRank.java
@@ -235,7 +235,6 @@ protected void
mergeConfiguration(GraphAlgorithmWrappingBase other) {
// s, adjusted pagerank(s)
DataSet<Tuple2<K, DoubleValue>> adjustedScores = vertexScores
.union(sourceVertices)
- .setParallelism(parallelism)
.name("Union with source vertices")
.map(new AdjustScores<>(dampingFactor))
.withBroadcastSet(sumOfScores, SUM_OF_SCORES)
diff --git
a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
index 1182708bb9f..4709fa52a5a 100644
---
a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
+++
b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
@@ -515,7 +515,7 @@ private void createSortOperation(PythonOperationInfo info) {
private <IN> void createUnionOperation(PythonOperationInfo info) {
DataSet<IN> op1 = sets.getDataSet(info.parentID);
DataSet<IN> op2 = sets.getDataSet(info.otherID);
- sets.add(info.setID,
op1.union(op2).setParallelism(info.parallelism).name("Union"));
+ sets.add(info.setID, op1.union(op2).name("Union"));
}
private <IN1, IN2, OUT> void createCoGroupOperation(PythonOperationInfo
info, TypeInformation<OUT> type) {
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
> Parallelism of generated operators should have max parallelism of input
> ---------------------------------------------------------------------
>
> Key: FLINK-9289
> URL: https://issues.apache.org/jira/browse/FLINK-9289
> Project: Flink
> Issue Type: Bug
> Components: DataSet API
> Affects Versions: 1.4.2, 1.5.2, 1.6.0
> Reporter: Fabian Hueske
> Assignee: Xingcan Cui
> Priority: Major
> Labels: pull-request-available
>
> The DataSet API aims to chain generated operators such as key extraction
> mappers to their predecessor. This is done by assigning the same parallelism
> as the input operator.
> If a generated operator has more than one input, the operator cannot be
> chained anymore and the operator is generated with default parallelism. This
> can lead to a {code}NoResourceAvailableException: Not enough free slots
> available to run the job.{code} as reported by a user on the mailing list:
> https://lists.apache.org/thread.html/60a8bffcce54717b6273bf3de0f43f1940fbb711590f4b90cd666c9a@%3Cuser.flink.apache.org%3E
> I suggest setting the parallelism of a generated operator to the max
> parallelism of all of its inputs to fix this problem.
> Until the problem is fixed, a workaround is to set the default parallelism at
> the {{ExecutionEnvironment}}:
> {code}
> ExecutionEnvironment env = ...
> env.setParallelism(2);
> {code}
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)