fhueske commented on issue #6003: [FLINK-9289][Dataset] Parallelism of 
generated operators should have max parallelism of input
URL: https://github.com/apache/flink/pull/6003#issuecomment-411063032
 
 
   Hi @xccui, I think the problem can be fixed more easily with fewer side 
effects on tests. 
   
   As pointed out in the Jira issue, I think can add Union-specific logic to 
the `KeyFunctions.appendKeyExtractor()` methods. IMO, the best approach would 
be to add key extractors to all inputs of the union:
   
   ```
   public static <T, K> 
org.apache.flink.api.common.operators.Operator<Tuple2<K, T>> appendKeyExtractor(
       org.apache.flink.api.common.operators.Operator<T> input,
       SelectorFunctionKeys<T, K> key) {
   
     if (input instanceof Union) {
       // if input is a union, we apply the key extractors recursively to all 
inputs
       org.apache.flink.api.common.operators.Operator<T> firstInput = ((Union) 
input).getFirstInput();
       org.apache.flink.api.common.operators.Operator<T> secondInput = ((Union) 
input).getSecondInput();
   
       org.apache.flink.api.common.operators.Operator<Tuple2<K, T>> 
firstKeyExtractor =
         appendKeyExtractor(firstInput, key);
       org.apache.flink.api.common.operators.Operator<Tuple2<K, T>> 
secondKeyExtractor =
         appendKeyExtractor(secondInput, key);
   
       return new Union(firstKeyExtractor, secondKeyExtractor, input.getName());
     } else {
       // original implementation
   }
   ```
   
   This change for both `appendKeyExtractor()` methods and a test that verifies 
the correct parallelism of all operators would be sufficient to fix the issue.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to