godfreyhe commented on a change in pull request #18785:
URL: https://github.com/apache/flink/pull/18785#discussion_r808121812
##########
File path:
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecExchange.java
##########
@@ -136,25 +154,38 @@ public String getDescription() {
parallelism = ExecutionConfig.PARALLELISM_DEFAULT;
break;
case KEEP_INPUT_AS_IS:
- RequiredDistribution inputDistribution =
- ((KeepInputAsIsDistribution) requiredDistribution).getInputDistribution();
- checkArgument(
- inputDistribution instanceof HashDistribution,
- "Only HashDistribution is supported now");
- partitioner =
- new ForwardForConsecutiveHashPartitioner<>(
- createHashPartitioner(
- ((HashDistribution) inputDistribution),
- inputType,
- planner));
+ KeepInputAsIsDistribution keepInputAsIsDistribution =
+ (KeepInputAsIsDistribution) requiredDistribution;
+ if (keepInputAsIsDistribution.isStrict()) {
+ // explicitly use ForwardPartitioner to guarantee the data distribution is
+ // exactly the same as input
+ partitioner = new ForwardPartitioner<>();
+ requireUndefinedExchangeMode = true;
+ } else {
+ RequiredDistribution inputDistribution =
+ ((KeepInputAsIsDistribution) requiredDistribution)
+ .getInputDistribution();
+ checkArgument(
+ inputDistribution instanceof HashDistribution,
+ "Only HashDistribution is supported now");
+ partitioner =
+ new ForwardForConsecutiveHashPartitioner<>(
+ createHashPartitioner(
+ ((HashDistribution) inputDistribution),
+ inputType,
+ planner));
+ }
parallelism = inputTransform.getParallelism();
break;
default:
throw new TableException(distributionType + "is not supported now!");
}
final StreamExchangeMode exchangeMode =
- getBatchStreamExchangeMode(planner.getConfiguration(), requiredExchangeMode);
+ requireUndefinedExchangeMode
+ ? StreamExchangeMode.UNDEFINED
Review comment:
Considering the chaining logic in `StreamingJobGraphGenerator`, the exchange
mode of a forward partitioner should not be `BATCH`; otherwise the operators
cannot be chained. For `ForwardForConsecutiveHashPartitioner`, however, if it
is converted to a hash shuffle and the exchange mode is set to
`ALL_EDGES_BLOCKING` through `table.exec.shuffle-mode`, its final shuffle mode
should be batch. The chaining logic for `ForwardForConsecutiveHashPartitioner`
will be updated in FLINK-26168.
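
To make the reasoning concrete, here is a minimal sketch of the two decisions described in this comment. It is a simplified model, not Flink code: the class `ChainingSketch`, the enum `Mode`, and both methods are hypothetical stand-ins, and the chaining rule is reduced to the single condition discussed here (the real check in `StreamingJobGraphGenerator` involves further conditions such as parallelism and chaining strategy).

```java
/** Simplified stand-in for the planner and job-graph logic; not Flink's actual classes. */
final class ChainingSketch {

    /** Hypothetical mirror of the exchange modes mentioned in the review comment. */
    enum Mode { PIPELINED, BATCH, UNDEFINED }

    /**
     * Mirrors the ternary in the diff: a strict keep-input-as-is exchange always
     * gets UNDEFINED. Assumption: otherwise ALL_EDGES_BLOCKING yields BATCH and
     * any other shuffle mode yields UNDEFINED.
     */
    static Mode resolveMode(boolean requireUndefinedExchangeMode, boolean allEdgesBlocking) {
        if (requireUndefinedExchangeMode) {
            return Mode.UNDEFINED;
        }
        return allEdgesBlocking ? Mode.BATCH : Mode.UNDEFINED;
    }

    /** Assumed chaining rule: the edge must be a forward edge and must not be BATCH. */
    static boolean isChainable(boolean forwardPartitioner, Mode mode) {
        return forwardPartitioner && mode != Mode.BATCH;
    }

    public static void main(String[] args) {
        // Strict case: ForwardPartitioner with UNDEFINED stays chainable,
        // even when table.exec.shuffle-mode is ALL_EDGES_BLOCKING.
        Mode strict = resolveMode(true, true);
        System.out.println("strict forward: mode=" + strict
                + ", chainable=" + isChainable(true, strict));

        // Non-strict case converted to a hash shuffle: the edge is no longer a
        // forward edge, and the mode follows the shuffle-mode setting.
        Mode converted = resolveMode(false, true);
        System.out.println("converted to hash: mode=" + converted
                + ", chainable=" + isChainable(false, converted));
    }
}
```

The sketch keeps the two concerns separate: the exchange mode is decided once from the distribution and the configuration, and chainability is derived from it afterwards.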