fhueske commented on issue #6003: [FLINK-9289][Dataset] Parallelism of generated operators should have max parallelism of input
URL: https://github.com/apache/flink/pull/6003#issuecomment-411063032

Hi @xccui,

I think the problem can be fixed more easily and with fewer side effects on tests. As pointed out in the Jira issue, we can add Union-specific logic to the `KeyFunctions.appendKeyExtractor()` methods. IMO, the best approach would be to add key extractors to all inputs of the union:

```java
public static <T, K> org.apache.flink.api.common.operators.Operator<Tuple2<K, T>> appendKeyExtractor(
        org.apache.flink.api.common.operators.Operator<T> input,
        SelectorFunctionKeys<T, K> key) {

    if (input instanceof Union) {
        // if input is a union, we apply the key extractors recursively to all inputs
        Union<T> union = (Union<T>) input;
        org.apache.flink.api.common.operators.Operator<T> firstInput = union.getFirstInput();
        org.apache.flink.api.common.operators.Operator<T> secondInput = union.getSecondInput();

        org.apache.flink.api.common.operators.Operator<Tuple2<K, T>> firstKeyExtractor =
                appendKeyExtractor(firstInput, key);
        org.apache.flink.api.common.operators.Operator<Tuple2<K, T>> secondKeyExtractor =
                appendKeyExtractor(secondInput, key);

        // re-union the keyed inputs under the original union's name
        return new Union<>(firstKeyExtractor, secondKeyExtractor, input.getName());
    } else {
        // original implementation (unchanged)
    }
}
```

This change to both `appendKeyExtractor()` methods, plus a test that verifies the correct parallelism of all operators, would be sufficient to fix the issue.
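To illustrate why pushing the key extractors below the union helps, here is a minimal, self-contained sketch on a toy plan model. The classes `Op`, `Source`, `UnionOp` and `KeyExtractor` and the helpers `appendNaive()` and `extractorParallelisms()` are hypothetical stand-ins, not Flink's API. The model assumes that a generated key extractor copies the parallelism of the operator it is appended to, and that a union carries no parallelism of its own (modelled here as -1).

```java
import java.util.ArrayList;
import java.util.List;

// Toy plan model (hypothetical, NOT Flink's API) showing the effect of
// appending key extractors to every union input instead of to the union.
public class KeyExtractorPushdown {

    // Minimal stand-in for a plan operator; -1 models "no parallelism set".
    static abstract class Op {
        final String name;
        final int parallelism;
        Op(String name, int parallelism) { this.name = name; this.parallelism = parallelism; }
    }

    static final class Source extends Op {
        Source(String name, int parallelism) { super(name, parallelism); }
    }

    static final class UnionOp extends Op {
        final Op first;
        final Op second;
        // Assumption of the model: a union has no parallelism of its own.
        UnionOp(Op first, Op second, String name) {
            super(name, -1);
            this.first = first;
            this.second = second;
        }
    }

    static final class KeyExtractor extends Op {
        final Op input;
        // The generated extractor copies the parallelism of its direct input.
        KeyExtractor(Op input) { super("Key Extractor", input.parallelism); this.input = input; }
    }

    // Naive version: a single extractor on top of the union inherits the union's -1.
    static Op appendNaive(Op input) {
        return new KeyExtractor(input);
    }

    // Recursive version mirroring the proposal: extractors go onto every union
    // input (recursing into nested unions), then the results are re-unioned.
    static Op appendKeyExtractor(Op input) {
        if (input instanceof UnionOp) {
            UnionOp union = (UnionOp) input;
            Op first = appendKeyExtractor(union.first);
            Op second = appendKeyExtractor(union.second);
            return new UnionOp(first, second, union.name);
        }
        return new KeyExtractor(input);
    }

    // Collects the parallelism of every key extractor in the plan, depth-first.
    static List<Integer> extractorParallelisms(Op op) {
        List<Integer> result = new ArrayList<>();
        collect(op, result);
        return result;
    }

    private static void collect(Op op, List<Integer> out) {
        if (op instanceof KeyExtractor) {
            out.add(op.parallelism);
            collect(((KeyExtractor) op).input, out);
        } else if (op instanceof UnionOp) {
            collect(((UnionOp) op).first, out);
            collect(((UnionOp) op).second, out);
        }
    }

    public static void main(String[] args) {
        Op a = new Source("a", 2);
        Op b = new Source("b", 4);
        Op c = new Source("c", 1);
        // Nested union: (a union b) union c
        Op input = new UnionOp(new UnionOp(a, b, "inner"), c, "outer");

        System.out.println("naive:     " + extractorParallelisms(appendNaive(input)));
        System.out.println("recursive: " + extractorParallelisms(appendKeyExtractor(input)));
    }
}
```

Running `main` prints `naive:     [-1]` and `recursive: [2, 4, 1]`: the single extractor on top of the union has no usable parallelism in this model, while each pushed-down extractor matches the parallelism of its own source. Recursing into nested unions is what makes the approach work for unions of more than two inputs.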
