[https://issues.apache.org/jira/browse/FLINK-9289?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16571680#comment-16571680]
ASF GitHub Bot commented on FLINK-9289:
---------------------------------------
fhueske commented on issue #6003: [FLINK-9289][Dataset] Parallelism of
generated operators should have max parallelism of input
URL: https://github.com/apache/flink/pull/6003#issuecomment-411063032
Hi @xccui, I think the problem can be fixed more easily with fewer side
effects on tests.
As pointed out in the Jira issue, I think we can add Union-specific logic to
the `KeyFunctions.appendKeyExtractor()` methods. IMO, the best approach would
be to add key extractors to all inputs of the union:
```java
public static <T, K>
org.apache.flink.api.common.operators.Operator<Tuple2<K, T>> appendKeyExtractor(
    org.apache.flink.api.common.operators.Operator<T> input,
    SelectorFunctionKeys<T, K> key) {

  if (input instanceof Union) {
    // if input is a union, we apply the key extractors recursively to all inputs
    Union<T> union = (Union<T>) input;
    org.apache.flink.api.common.operators.Operator<T> firstInput = union.getFirstInput();
    org.apache.flink.api.common.operators.Operator<T> secondInput = union.getSecondInput();

    org.apache.flink.api.common.operators.Operator<Tuple2<K, T>> firstKeyExtractor =
        appendKeyExtractor(firstInput, key);
    org.apache.flink.api.common.operators.Operator<Tuple2<K, T>> secondKeyExtractor =
        appendKeyExtractor(secondInput, key);

    // re-union the key-extracted inputs under the original union's name
    return new Union<>(firstKeyExtractor, secondKeyExtractor, input.getName());
  } else {
    // original implementation
  }
}
```
Applying this change to both `appendKeyExtractor()` methods, plus a test that
verifies the correct parallelism of all operators, would be sufficient to fix the issue.
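To illustrate why pushing the key extractors below the union helps, here is a minimal, self-contained Java sketch. It does not use Flink: `Node`, `source()`, `union()`, `appendKeyExtractor()`, `extractorsChainable()` and `countExtractors()` are hypothetical stand-ins for plan operators, and the chaining check (exactly one non-union input with equal parallelism) is a simplifying assumption, not Flink's full chaining rules. It mirrors the recursion above: every extractor ends up directly on a single input and copies that input's parallelism, also for nested unions such as those produced by chaining several `union()` calls.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical, simplified plan model for illustration only.
// Node is NOT Flink's Operator/Union class; all names here are assumptions.
public class UnionKeyExtractorSketch {

    // A plan node: a name, a parallelism, a union flag, and its inputs.
    static final class Node {
        final String name;
        final int parallelism;
        final boolean union;
        final List<Node> inputs;

        Node(String name, int parallelism, boolean union, List<Node> inputs) {
            this.name = name;
            this.parallelism = parallelism;
            this.union = union;
            this.inputs = inputs;
        }
    }

    static Node source(String name, int parallelism) {
        return new Node(name, parallelism, false, Collections.<Node>emptyList());
    }

    // A binary union; in this model its parallelism is simply the max of its inputs.
    static Node union(Node first, Node second) {
        return new Node("Union", Math.max(first.parallelism, second.parallelism), true,
                Arrays.asList(first, second));
    }

    // Same shape as the proposed fix: recurse through unions so that every key
    // extractor sits on a single non-union input and copies its parallelism.
    static Node appendKeyExtractor(Node input) {
        if (input.union) {
            Node first = appendKeyExtractor(input.inputs.get(0));
            Node second = appendKeyExtractor(input.inputs.get(1));
            return union(first, second);
        }
        return new Node("KeyExtractor", input.parallelism, false, Collections.singletonList(input));
    }

    // Simplified chaining precondition (an assumption, not Flink's full rules):
    // each extractor has exactly one non-union input with equal parallelism.
    static boolean extractorsChainable(Node n) {
        if (n.name.equals("KeyExtractor")) {
            if (n.inputs.size() != 1) {
                return false;
            }
            Node in = n.inputs.get(0);
            if (in.union || in.parallelism != n.parallelism) {
                return false;
            }
        }
        for (Node in : n.inputs) {
            if (!extractorsChainable(in)) {
                return false;
            }
        }
        return true;
    }

    // Counts the key extractors anywhere in the plan.
    static int countExtractors(Node n) {
        int count = n.name.equals("KeyExtractor") ? 1 : 0;
        for (Node in : n.inputs) {
            count += countExtractors(in);
        }
        return count;
    }

    public static void main(String[] args) {
        // Nested unions with inputs of different parallelism.
        Node input = union(union(source("A", 2), source("B", 3)), source("C", 4));
        Node plan = appendKeyExtractor(input);
        System.out.println("union on top: " + plan.union);              // true
        System.out.println("extractors: " + countExtractors(plan));     // 3
        System.out.println("chainable: " + extractorsChainable(plan));  // true
    }
}
```

Running `main` shows that the top node is still a union, that one extractor was created per leaf input, and that each extractor has the same parallelism as the input it sits on.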
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
> Parallelism of generated operators should have max parallelism of input
> ---------------------------------------------------------------------
>
> Key: FLINK-9289
> URL: https://issues.apache.org/jira/browse/FLINK-9289
> Project: Flink
> Issue Type: Bug
> Components: DataSet API
> Affects Versions: 1.5.0, 1.4.2, 1.6.0
> Reporter: Fabian Hueske
> Assignee: Xingcan Cui
> Priority: Major
> Labels: pull-request-available
>
> The DataSet API aims to chain generated operators such as key extraction
> mappers to their predecessor. This is done by assigning the same parallelism
> as the input operator.
> If a generated operator has more than one input, it cannot be chained
> anymore and is generated with the default parallelism. This
> can lead to a {{NoResourceAvailableException: Not enough free slots
> available to run the job.}}, as reported by a user on the mailing list:
> https://lists.apache.org/thread.html/60a8bffcce54717b6273bf3de0f43f1940fbb711590f4b90cd666c9a@%3Cuser.flink.apache.org%3E
> I suggest setting the parallelism of a generated operator to the maximum
> parallelism of all of its inputs to fix this problem.
> Until the problem is fixed, a workaround is to set the default parallelism on
> the {{ExecutionEnvironment}}:
> {code}
> ExecutionEnvironment env = ...
> env.setParallelism(2);
> {code}
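The parallelism rule suggested in the issue can be contrasted with the behavior it describes in a few lines. This is a hypothetical Java sketch, not Flink code: `oldRule()` and `proposedRule()` are illustrative names, and the default parallelism of 8 is an assumed value.

```java
import java.util.Arrays;

// Hypothetical sketch of the parallelism rule suggested in the issue;
// oldRule/proposedRule are illustrative names, not Flink methods.
public class GeneratedOperatorParallelism {

    // Behavior described in the issue: a generated operator on a single input
    // copies that input's parallelism; with several inputs it falls back to the default.
    static int oldRule(int defaultParallelism, int... inputParallelisms) {
        if (inputParallelisms.length == 1) {
            return inputParallelisms[0];
        }
        return defaultParallelism;
    }

    // Suggested behavior: use the maximum parallelism over all inputs.
    static int proposedRule(int... inputParallelisms) {
        return Arrays.stream(inputParallelisms)
                .max()
                .orElseThrow(() -> new IllegalArgumentException("at least one input is required"));
    }

    public static void main(String[] args) {
        int defaultParallelism = 8; // assumed environment default
        System.out.println(oldRule(defaultParallelism, 2, 2)); // 8
        System.out.println(proposedRule(2, 2));                // 2
        System.out.println(proposedRule(1, 3));                // 3
    }
}
```

With two inputs at parallelism 2, the old rule yields the assumed default of 8 in this sketch, while the suggested rule yields 2, so the generated operator never runs with more parallel instances than its widest input.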
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)