lsyldliu commented on code in PR #23282:
URL: https://github.com/apache/flink/pull/23282#discussion_r1308505600
##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/processor/DynamicFilteringDependencyProcessor.java:
##########
@@ -52,50 +55,91 @@ public class DynamicFilteringDependencyProcessor implements
ExecNodeGraphProcess
@Override
public ExecNodeGraph process(ExecNodeGraph execGraph, ProcessorContext
context) {
- ExecNodeGraph factSideProcessedGraph =
checkIfFactSourceNeedEnforceDependency(execGraph);
+ ExecNodeGraph factSideProcessedGraph =
+ checkIfFactSourceNeedEnforceDependency(execGraph, context);
return enforceDimSideBlockingExchange(factSideProcessedGraph, context);
}
- private ExecNodeGraph checkIfFactSourceNeedEnforceDependency(ExecNodeGraph
execGraph) {
- Map<BatchExecTableSourceScan, List<ExecNode<?>>>
dynamicFilteringScanDescendants =
+ private ExecNodeGraph checkIfFactSourceNeedEnforceDependency(
+ ExecNodeGraph execGraph, ProcessorContext context) {
+ Map<BatchExecTableSourceScan, List<DescendantInfo>>
dynamicFilteringScanDescendants =
new HashMap<>();
AbstractExecNodeExactlyOnceVisitor dynamicFilteringScanCollector =
new AbstractExecNodeExactlyOnceVisitor() {
@Override
protected void visitNode(ExecNode<?> node) {
- node.getInputEdges().stream()
- .map(ExecEdge::getSource)
- .forEach(
- input -> {
- // The character of the dynamic
filter scan is that it
- // has an input.
- if (input instanceof
BatchExecTableSourceScan
- &&
input.getInputEdges().size() > 0) {
- dynamicFilteringScanDescendants
- .computeIfAbsent(
-
(BatchExecTableSourceScan) input,
- ignored -> new
ArrayList<>())
- .add(node);
- }
- });
+ for (int i = 0; i < node.getInputEdges().size(); ++i) {
+ ExecEdge edge = node.getInputEdges().get(i);
+ ExecNode<?> input = edge.getSource();
+
+ // The character of the dynamic filter scan is
that it
+ // has an input.
+ if (input instanceof BatchExecTableSourceScan
+ && input.getInputEdges().size() > 0) {
+ dynamicFilteringScanDescendants
+ .computeIfAbsent(
+ (BatchExecTableSourceScan)
input,
+ ignored -> new ArrayList<>())
+ .add(new DescendantInfo(node, i));
+ }
+ }
visitInputs(node);
}
};
execGraph.getRootNodes().forEach(node ->
node.accept(dynamicFilteringScanCollector));
- for (Map.Entry<BatchExecTableSourceScan, List<ExecNode<?>>> entry :
+ for (Map.Entry<BatchExecTableSourceScan, List<DescendantInfo>> entry :
dynamicFilteringScanDescendants.entrySet()) {
- if (entry.getValue().size() == 1) {
- ExecNode<?> next = entry.getValue().get(0);
- if (next instanceof BatchExecMultipleInput) {
- // the source can be chained with BatchExecMultipleInput
- continue;
- }
- }
- // otherwise we need dependencies
- entry.getKey().setNeedDynamicFilteringDependency(true);
+ BatchExecTableSourceScan tableSourceScan = entry.getKey();
+ BatchExecDynamicFilteringDataCollector
dynamicFilteringDataCollector =
+ getDynamicFilteringDataCollector(tableSourceScan);
+
+ // Add exchange between collector and enforcer
+ BatchExecExchange exchange =
+ new BatchExecExchange(
+ context.getPlanner().getTableConfig(),
+ InputProperty.builder()
+
.requiredDistribution(InputProperty.ANY_DISTRIBUTION)
+
.damBehavior(InputProperty.DamBehavior.BLOCKING)
+ .build(),
+ (RowType)
dynamicFilteringDataCollector.getOutputType(),
+ "Exchange");
+ exchange.setRequiredExchangeMode(StreamExchangeMode.BATCH);
+ exchange.setInputEdges(
+ Collections.singletonList(
+ ExecEdge.builder()
+ .source(dynamicFilteringDataCollector)
+ .target(exchange)
+ .build()));
+
+ // set enforcer inputs
+ BatchExecExecutionOrderEnforcer enforcer =
+ new BatchExecExecutionOrderEnforcer(
+ context.getPlanner().getTableConfig(),
+ Arrays.asList(
+ exchange.getInputProperties().get(0),
InputProperty.DEFAULT),
+ tableSourceScan.getOutputType(),
+ "OrderEnforcer");
+ ExecEdge edge1 =
ExecEdge.builder().source(exchange).target(enforcer).build();
+ ExecEdge edge2 =
ExecEdge.builder().source(tableSourceScan).target(enforcer).build();
+ enforcer.setInputEdges(Arrays.asList(edge1, edge2));
+
+ // we clear the input of tableSourceScan to avoid cycle in exec
plan
+ tableSourceScan.setInputEdges(Collections.emptyList());
+ tableSourceScan.setInputProperties(Collections.emptyList());
Review Comment:
I think it would be better to create a new TableScan here instead of
changing the final `inputProperties`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]