[ 
https://issues.apache.org/jira/browse/FLINK-9289?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16581322#comment-16581322
 ] 

ASF GitHub Bot commented on FLINK-9289:
---------------------------------------

asfgit closed pull request #6003: [FLINK-9289][Dataset] Parallelism of 
generated operators should have max parallelism of input
URL: https://github.com/apache/flink/pull/6003
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/KeyFunctions.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/KeyFunctions.java
index f6336cde2b7..3e7a552a68f 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/KeyFunctions.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/KeyFunctions.java
@@ -22,6 +22,7 @@
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.operators.Keys.SelectorFunctionKeys;
 import org.apache.flink.api.common.operators.UnaryOperatorInformation;
+import org.apache.flink.api.common.operators.Union;
 import org.apache.flink.api.common.operators.base.MapOperatorBase;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.operators.translation.KeyExtractingMapper;
@@ -43,6 +44,19 @@
                        org.apache.flink.api.common.operators.Operator<T> input,
                        SelectorFunctionKeys<T, K> key) {
 
+               if (input instanceof Union) {
+                       // if input is a union, we apply the key extractors 
recursively to all inputs
+                       org.apache.flink.api.common.operators.Operator<T> 
firstInput = ((Union) input).getFirstInput();
+                       org.apache.flink.api.common.operators.Operator<T> 
secondInput = ((Union) input).getSecondInput();
+
+                       
org.apache.flink.api.common.operators.Operator<Tuple2<K, T>> firstInputWithKey =
+                                       appendKeyExtractor(firstInput, key);
+                       
org.apache.flink.api.common.operators.Operator<Tuple2<K, T>> secondInputWithKey 
=
+                                       appendKeyExtractor(secondInput, key);
+
+                       return new Union(firstInputWithKey, secondInputWithKey, 
input.getName());
+               }
+
                TypeInformation<T> inputType = key.getInputType();
                TypeInformation<Tuple2<K, T>> typeInfoWithKey = 
createTypeWithKey(key);
                KeyExtractingMapper<T, K> extractor = new 
KeyExtractingMapper(key.getKeyExtractor());
@@ -66,6 +80,19 @@
                        SelectorFunctionKeys<T, K1> key1,
                        SelectorFunctionKeys<T, K2> key2) {
 
+               if (input instanceof Union) {
+                       // if input is a union, we apply the key extractors 
recursively to all inputs
+                       org.apache.flink.api.common.operators.Operator<T> 
firstInput = ((Union) input).getFirstInput();
+                       org.apache.flink.api.common.operators.Operator<T> 
secondInput = ((Union) input).getSecondInput();
+
+                       
org.apache.flink.api.common.operators.Operator<Tuple3<K1, K2, T>> 
firstInputWithKey =
+                                       appendKeyExtractor(firstInput, key1, 
key2);
+                       
org.apache.flink.api.common.operators.Operator<Tuple3<K1, K2, T>> 
secondInputWithKey =
+                                       appendKeyExtractor(secondInput, key1, 
key2);
+
+                       return new Union(firstInputWithKey, secondInputWithKey, 
input.getName());
+               }
+
                TypeInformation<T> inputType = key1.getInputType();
                TypeInformation<Tuple3<K1, K2, T>> typeInfoWithKey = 
createTypeWithKey(key1, key2);
                TwoKeyExtractingMapper<T, K1, K2> extractor =
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/UnionOperator.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/UnionOperator.java
index 0da5e01a0b3..7d3c0d6fc3f 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/UnionOperator.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/UnionOperator.java
@@ -62,4 +62,11 @@ public UnionOperator(DataSet<T> input1, DataSet<T> input2, 
String unionLocationN
        protected Union<T> translateToDataFlow(Operator<T> input1, Operator<T> 
input2) {
                return new Union<T>(input1, input2, unionLocationName);
        }
+
+       @Override
+       public UnionOperator<T> setParallelism(int parallelism) {
+               // Union is not translated to an independent operator but 
executed by multiplexing
+               // its input on the following operator. Hence, the parallelism 
of a Union cannot be set.
+               throw new UnsupportedOperationException("Cannot set the 
parallelism for Union.");
+       }
 }
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/UnionTranslationTest.java
 
b/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/UnionTranslationTest.java
new file mode 100644
index 00000000000..216e37f2acf
--- /dev/null
+++ 
b/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/UnionTranslationTest.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.operators.translation;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.common.operators.GenericDataSinkBase;
+import org.apache.flink.api.common.operators.Order;
+import org.apache.flink.api.common.operators.SingleInputOperator;
+import org.apache.flink.api.common.operators.Union;
+import org.apache.flink.api.common.operators.base.MapOperatorBase;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.StringValue;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for translation of union operation.
+ */
+@SuppressWarnings("serial")
+public class UnionTranslationTest {
+
+       @Test
+       public void translateUnion2Group() {
+               try {
+                       final int parallelism = 4;
+                       ExecutionEnvironment env = 
ExecutionEnvironment.createLocalEnvironment(parallelism);
+
+                       DataSet<Tuple3<Double, StringValue, LongValue>> 
dataset1 = getSourceDataSet(env, 3);
+
+                       DataSet<Tuple3<Double, StringValue, LongValue>> 
dataset2 = getSourceDataSet(env, 2);
+
+                       dataset1.union(dataset2)
+                                       .groupBy((KeySelector<Tuple3<Double, 
StringValue, LongValue>, String>) value -> "")
+                                       
.reduceGroup((GroupReduceFunction<Tuple3<Double, StringValue, LongValue>, 
String>) (values, out) -> {})
+                                       .returns(String.class)
+                                       .output(new DiscardingOutputFormat<>());
+
+                       Plan p = env.createProgramPlan();
+
+                       // The plan should look like the following one.
+                       //
+                       // DataSet1(3) - MapOperator(3)-+
+                       //                                  |- Union(-1) - 
SingleInputOperator - Sink
+                       // DataSet2(2) - MapOperator(2)-+
+
+                       GenericDataSinkBase<?> sink = 
p.getDataSinks().iterator().next();
+                       Union unionOperator = (Union) ((SingleInputOperator) 
sink.getInput()).getInput();
+
+                       // The key mappers should be added to both of the two 
input streams for union.
+                       assertTrue(unionOperator.getFirstInput() instanceof 
MapOperatorBase<?, ?, ?>);
+                       assertTrue(unionOperator.getSecondInput() instanceof 
MapOperatorBase<?, ?, ?>);
+
+                       // The parallelisms of the key mappers should be equal 
to those of their inputs.
+                       
assertEquals(unionOperator.getFirstInput().getParallelism(), 3);
+                       
assertEquals(unionOperator.getSecondInput().getParallelism(), 2);
+
+                       // The union should always have the default parallelism.
+                       assertEquals(unionOperator.getParallelism(), 
ExecutionConfig.PARALLELISM_DEFAULT);
+               }
+               catch (Exception e) {
+                       System.err.println(e.getMessage());
+                       e.printStackTrace();
+                       fail("Test caused an error: " + e.getMessage());
+               }
+       }
+
+       @Test
+       public void translateUnion3SortedGroup() {
+               try {
+                       final int parallelism = 4;
+                       ExecutionEnvironment env = 
ExecutionEnvironment.createLocalEnvironment(parallelism);
+
+                       DataSet<Tuple3<Double, StringValue, LongValue>> 
dataset1 = getSourceDataSet(env, 2);
+
+                       DataSet<Tuple3<Double, StringValue, LongValue>> 
dataset2 = getSourceDataSet(env, 3);
+
+                       DataSet<Tuple3<Double, StringValue, LongValue>> 
dataset3 = getSourceDataSet(env, -1);
+
+                       dataset1.union(dataset2).union(dataset3)
+                                       .groupBy((KeySelector<Tuple3<Double, 
StringValue, LongValue>, String>) value -> "")
+                                       .sortGroup((KeySelector<Tuple3<Double, 
StringValue, LongValue>, String>) value -> "", Order.ASCENDING)
+                                       
.reduceGroup((GroupReduceFunction<Tuple3<Double, StringValue, LongValue>, 
String>) (values, out) -> {})
+                                       .returns(String.class)
+                                       .output(new DiscardingOutputFormat<>());
+
+                       Plan p = env.createProgramPlan();
+
+                       // The plan should look like the following one.
+                       //
+                       // DataSet1(2) - MapOperator(2)-+
+                       //                                  |- Union(-1) -+
+                       // DataSet2(3) - MapOperator(3)-+             |- 
Union(-1) - SingleInputOperator - Sink
+                       //                                            |
+                       //             DataSet3(-1) - MapOperator(-1)-+
+
+                       GenericDataSinkBase<?> sink = 
p.getDataSinks().iterator().next();
+                       Union secondUnionOperator = (Union) 
((SingleInputOperator) sink.getInput()).getInput();
+
+                       // The first input of the second union should be the 
first union.
+                       Union firstUnionOperator = (Union) 
secondUnionOperator.getFirstInput();
+
+                       // The key mapper should be added to the second input 
stream of the second union.
+                       assertTrue(secondUnionOperator.getSecondInput() 
instanceof MapOperatorBase<?, ?, ?>);
+
+                       // The key mappers should be added to both of the two 
input streams for the first union.
+                       assertTrue(firstUnionOperator.getFirstInput() 
instanceof MapOperatorBase<?, ?, ?>);
+                       assertTrue(firstUnionOperator.getSecondInput() 
instanceof MapOperatorBase<?, ?, ?>);
+
+                       // The parallelisms of the key mappers should be equal 
to those of their inputs.
+                       
assertEquals(firstUnionOperator.getFirstInput().getParallelism(), 2);
+                       
assertEquals(firstUnionOperator.getSecondInput().getParallelism(), 3);
+                       
assertEquals(secondUnionOperator.getSecondInput().getParallelism(), -1);
+
+                       // The union should always have the default parallelism.
+                       assertEquals(secondUnionOperator.getParallelism(), 
ExecutionConfig.PARALLELISM_DEFAULT);
+                       assertEquals(firstUnionOperator.getParallelism(), 
ExecutionConfig.PARALLELISM_DEFAULT);
+               }
+               catch (Exception e) {
+                       System.err.println(e.getMessage());
+                       e.printStackTrace();
+                       fail("Test caused an error: " + e.getMessage());
+               }
+       }
+
+       @SuppressWarnings("unchecked")
+       private static DataSet<Tuple3<Double, StringValue, LongValue>> 
getSourceDataSet(ExecutionEnvironment env, int parallelism) {
+               return env
+                               .fromElements(new Tuple3<>(0.0, new 
StringValue(""), new LongValue(1L)))
+                               .setParallelism(parallelism);
+       }
+}
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/linkanalysis/PageRank.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/linkanalysis/PageRank.java
index d259fac6341..932ad78a1d8 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/linkanalysis/PageRank.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/linkanalysis/PageRank.java
@@ -235,7 +235,6 @@ protected void 
mergeConfiguration(GraphAlgorithmWrappingBase other) {
                // s, adjusted pagerank(s)
                DataSet<Tuple2<K, DoubleValue>> adjustedScores = vertexScores
                        .union(sourceVertices)
-                               .setParallelism(parallelism)
                                .name("Union with source vertices")
                        .map(new AdjustScores<>(dampingFactor))
                                .withBroadcastSet(sumOfScores, SUM_OF_SCORES)
diff --git 
a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
 
b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
index 1182708bb9f..4709fa52a5a 100644
--- 
a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
+++ 
b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
@@ -515,7 +515,7 @@ private void createSortOperation(PythonOperationInfo info) {
        private <IN> void createUnionOperation(PythonOperationInfo info) {
                DataSet<IN> op1 = sets.getDataSet(info.parentID);
                DataSet<IN> op2 = sets.getDataSet(info.otherID);
-               sets.add(info.setID, 
op1.union(op2).setParallelism(info.parallelism).name("Union"));
+               sets.add(info.setID, op1.union(op2).name("Union"));
        }
 
        private <IN1, IN2, OUT> void createCoGroupOperation(PythonOperationInfo 
info, TypeInformation<OUT> type) {


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Parallelism of generated operators should have max parallelism of input
> ---------------------------------------------------------------------
>
>                 Key: FLINK-9289
>                 URL: https://issues.apache.org/jira/browse/FLINK-9289
>             Project: Flink
>          Issue Type: Bug
>          Components: DataSet API
>    Affects Versions: 1.4.2, 1.5.2, 1.6.0
>            Reporter: Fabian Hueske
>            Assignee: Xingcan Cui
>            Priority: Major
>              Labels: pull-request-available
>
> The DataSet API aims to chain generated operators such as key extraction 
> mappers to their predecessor. This is done by assigning the same parallelism 
> as the input operator.
> If a generated operator has more than two inputs, the operator cannot be 
> chained anymore and the operator is generated with default parallelism. This 
> can lead to a {code}NoResourceAvailableException: Not enough free slots 
> available to run the job.{code} as reported by a user on the mailing list: 
> https://lists.apache.org/thread.html/60a8bffcce54717b6273bf3de0f43f1940fbb711590f4b90cd666c9a@%3Cuser.flink.apache.org%3E
> I suggest setting the parallelism of a generated operator to the max 
> parallelism of all of its inputs to fix this problem.
> Until the problem is fixed, a workaround is to set the default parallelism at 
> the {{ExecutionEnvironment}}:
> {code}
> ExecutionEnvironment env = ...
> env.setParallelism(2);
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to