This is an automated email from the ASF dual-hosted git repository.

lincoln pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit fc01ed7376441853f8c0ae2d32a44a8069dbdb1d
Author: lincoln lee <[email protected]>
AuthorDate: Fri Aug 9 17:36:33 2024 +0800

    [refactor][table-planner] Separate specific node logic from visit block for 
StreamNonDeterministicUpdatePlanVisitor to improve readability
---
 .../StreamNonDeterministicUpdatePlanVisitor.java   | 1089 +++++++++++---------
 1 file changed, 576 insertions(+), 513 deletions(-)

diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/StreamNonDeterministicUpdatePlanVisitor.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/StreamNonDeterministicUpdatePlanVisitor.java
index f8ccd8fd436..34e5d487a98 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/StreamNonDeterministicUpdatePlanVisitor.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/StreamNonDeterministicUpdatePlanVisitor.java
@@ -155,518 +155,45 @@ public class StreamNonDeterministicUpdatePlanVisitor {
     public StreamPhysicalRel visit(
             final StreamPhysicalRel rel, final ImmutableBitSet 
requireDeterminism) {
         if (rel instanceof StreamPhysicalSink) {
-            if (inputInsertOnly(rel)) {
-                // for append stream, not care about NDU
-                return transmitDeterminismRequirement(rel, 
NO_REQUIRED_DETERMINISM);
-            } else {
-                // for update streaming, when
-                // 1. sink with pk:
-                // upsert sink, update by pk, ideally pk == input.upsertKey,
-                // (otherwise upsertMaterialize will handle it)
-
-                // 1.1 input.upsertKey nonEmpty -> not care about NDU
-                // 1.2 input.upsertKey isEmpty -> retract by complete row, 
must not contain NDU
-
-                // once sink's requirement on pk was satisfied, no further 
request will be transited
-                // only when new requirement generated at stateful node which 
input has update
-                // (e.g., grouping keys)
-
-                // 2. sink without pk:
-                // retract sink, retract by complete row (all input columns 
should be deterministic)
-                // whether input.upsertKey is empty or not, must not contain 
NDU
-                StreamPhysicalSink sink = (StreamPhysicalSink) rel;
-                int[] primaryKey =
-                        
sink.contextResolvedTable().getResolvedSchema().getPrimaryKeyIndexes();
-                ImmutableBitSet requireInputDeterminism;
-                if (sink.upsertMaterialize() || null == primaryKey || 
primaryKey.length == 0) {
-                    // SinkUpsertMaterializer only support no upsertKey mode, 
it says all input
-                    // columns should be deterministic (same as no primary key 
defined on sink)
-                    // TODO should optimize it after SinkUpsertMaterializer 
support upsertKey
-                    // FLINK-28569.
-                    requireInputDeterminism =
-                            
ImmutableBitSet.range(sink.getInput().getRowType().getFieldCount());
-                } else {
-                    requireInputDeterminism = ImmutableBitSet.of(primaryKey);
-                }
-                return transmitDeterminismRequirement(sink, 
requireInputDeterminism);
-            }
+            return visitSink((StreamPhysicalSink) rel, requireDeterminism);
         } else if (rel instanceof StreamPhysicalLegacySink<?>) {
-            if (inputInsertOnly(rel)) {
-                // for append stream, not care about NDU
-                return transmitDeterminismRequirement(rel, 
NO_REQUIRED_DETERMINISM);
-            } else {
-                StreamPhysicalLegacySink<?> sink = 
(StreamPhysicalLegacySink<?>) rel;
-                TableSchema tableSchema = sink.sink().getTableSchema();
-                Optional<UniqueConstraint> primaryKey = 
tableSchema.getPrimaryKey();
-                List<String> columns = 
Arrays.asList(tableSchema.getFieldNames());
-                // SinkUpsertMaterializer does not support legacy sink
-                ImmutableBitSet requireInputDeterminism;
-                if (primaryKey.isPresent()) {
-                    requireInputDeterminism =
-                            ImmutableBitSet.of(
-                                    primaryKey.get().getColumns().stream()
-                                            .map(columns::indexOf)
-                                            .collect(Collectors.toList()));
-                } else {
-                    requireInputDeterminism = 
ImmutableBitSet.range(columns.size());
-                }
-                return transmitDeterminismRequirement(rel, 
requireInputDeterminism);
-            }
+            return visitLegacySink((StreamPhysicalLegacySink<?>) rel, 
requireDeterminism);
         } else if (rel instanceof StreamPhysicalCalcBase) {
-            if (inputInsertOnly(rel) || requireDeterminism.isEmpty()) {
-                // for append stream, not care about NDU
-                return transmitDeterminismRequirement(rel, 
NO_REQUIRED_DETERMINISM);
-            } else {
-                // if input has updates, any non-deterministic conditions are 
not acceptable, also
-                // requireDeterminism should be satisfied.
-                StreamPhysicalCalcBase calc = (StreamPhysicalCalcBase) rel;
-                checkNonDeterministicRexProgram(requireDeterminism, 
calc.getProgram(), calc);
-
-                // evaluate required determinism from input
-                List<RexNode> projects =
-                        calc.getProgram().getProjectList().stream()
-                                .map(expr -> 
calc.getProgram().expandLocalRef(expr))
-                                .collect(Collectors.toList());
-                Map<Integer, List<Integer>> outFromSourcePos = 
extractSourceMapping(projects);
-                List<Integer> conv2Inputs =
-                        requireDeterminism.toList().stream()
-                                .map(
-                                        out ->
-                                                
Optional.ofNullable(outFromSourcePos.get(out))
-                                                        .orElseThrow(
-                                                                () ->
-                                                                        new 
TableException(
-                                                                               
 String.format(
-                                                                               
         "Invalid pos:%d over projection:%s",
-                                                                               
         out,
-                                                                               
         calc
-                                                                               
                 .getProgram()))))
-                                .flatMap(Collection::stream)
-                                .filter(index -> index != -1)
-                                .distinct()
-                                .collect(Collectors.toList());
-
-                return transmitDeterminismRequirement(calc, 
ImmutableBitSet.of(conv2Inputs));
-            }
+            return visitCalc((StreamPhysicalCalcBase) rel, requireDeterminism);
         } else if (rel instanceof StreamPhysicalCorrelateBase) {
-            if (inputInsertOnly(rel) || requireDeterminism.isEmpty()) {
-                return transmitDeterminismRequirement(rel, 
NO_REQUIRED_DETERMINISM);
-            } else {
-                // check if non-deterministic condition (may exist after 
FLINK-7865 was fixed).
-                StreamPhysicalCorrelateBase correlate = 
(StreamPhysicalCorrelateBase) rel;
-                if (correlate.condition().isDefined()) {
-                    RexNode rexNode = correlate.condition().get();
-                    checkNonDeterministicCondition(rexNode, correlate);
-                }
-                // check if it is a non-deterministic function
-                int leftFieldCnt = 
correlate.inputRel().getRowType().getFieldCount();
-                Optional<String> ndCall =
-                        
FlinkRexUtil.getNonDeterministicCallName(correlate.scan().getCall());
-                if (ndCall.isPresent()) {
-                    // all columns from table function scan cannot satisfy the 
required determinism
-                    List<Integer> unsatisfiedColumns =
-                            requireDeterminism.toList().stream()
-                                    .filter(index -> index >= leftFieldCnt)
-                                    .collect(Collectors.toList());
-                    if (!unsatisfiedColumns.isEmpty()) {
-                        throwNonDeterministicColumnsError(
-                                unsatisfiedColumns,
-                                correlate.getRowType(),
-                                correlate,
-                                null,
-                                ndCall);
-                    }
-                }
-                // evaluate required determinism from input
-                List<Integer> fromLeft =
-                        requireDeterminism.toList().stream()
-                                .filter(index -> index < leftFieldCnt)
-                                .collect(Collectors.toList());
-                if (fromLeft.isEmpty()) {
-                    return transmitDeterminismRequirement(correlate, 
NO_REQUIRED_DETERMINISM);
-                }
-                return transmitDeterminismRequirement(correlate, 
ImmutableBitSet.of(fromLeft));
-            }
-
+            return visitCorrelate((StreamPhysicalCorrelateBase) rel, 
requireDeterminism);
         } else if (rel instanceof StreamPhysicalLookupJoin) {
-            if (inputInsertOnly(rel) || requireDeterminism.isEmpty()) {
-                return transmitDeterminismRequirement(rel, 
NO_REQUIRED_DETERMINISM);
-            } else {
-                /**
-                 * if input has updates, the lookup join may produce 
non-deterministic result itself
-                 * due to backed lookup source which data may change over 
time, we can try to
-                 * eliminate this non-determinism by adding materialization to 
the join operator,
-                 * but still exists non-determinism we cannot solve: 1. join 
condition 2. the inner
-                 * calc in lookJoin.
-                 */
-                StreamPhysicalLookupJoin lookupJoin = 
(StreamPhysicalLookupJoin) rel;
-
-                // required determinism cannot be satisfied even upsert 
materialize was enabled if:
-                // 1. remaining join condition contains non-deterministic call
-                
JavaScalaConversionUtil.toJava(lookupJoin.finalPreFilterCondition())
-                        .ifPresent(cond -> 
checkNonDeterministicCondition(cond, lookupJoin));
-                
JavaScalaConversionUtil.toJava(lookupJoin.finalRemainingCondition())
-                        .ifPresent(cond -> 
checkNonDeterministicCondition(cond, lookupJoin));
-
-                // 2. inner calc in lookJoin contains either non-deterministic 
condition or calls
-                
JavaScalaConversionUtil.toJava(lookupJoin.calcOnTemporalTable())
-                        .ifPresent(
-                                calc ->
-                                        checkNonDeterministicRexProgram(
-                                                requireDeterminism, calc, 
lookupJoin));
-
-                // Try to resolve non-determinism by adding materialization 
which can eliminate
-                // non-determinism produced by lookup join via an evolving 
source.
-                int leftFieldCnt = 
lookupJoin.getInput().getRowType().getFieldCount();
-                List<Integer> requireRight =
-                        requireDeterminism.toList().stream()
-                                .filter(index -> index >= leftFieldCnt)
-                                .collect(Collectors.toList());
-                boolean omitUpsertMaterialize = false;
-                // two optimizations: 1. no fields from lookup source was 
required 2. lookup key
-                // contains pk and no requirement on other fields we can omit 
materialization,
-                // otherwise upsert materialize can not be omitted.
-                if (requireRight.isEmpty()) {
-                    omitUpsertMaterialize = true;
-                } else {
-                    int[] outputPkIdx = 
lookupJoin.getOutputIndexesOfTemporalTablePrimaryKey();
-                    ImmutableBitSet outputPkBitSet = 
ImmutableBitSet.of(outputPkIdx);
-                    // outputPkIdx need to used so not using 
#lookupKeyContainsPrimaryKey directly.
-                    omitUpsertMaterialize =
-                            Arrays.stream(outputPkIdx)
-                                            .allMatch(
-                                                    index ->
-                                                            lookupJoin
-                                                                    
.allLookupKeys()
-                                                                    
.contains(index))
-                                    && 
requireRight.stream().allMatch(outputPkBitSet::get);
-                }
-                List<Integer> requireLeft =
-                        requireDeterminism.toList().stream()
-                                .filter(index -> index < leftFieldCnt)
-                                .collect(Collectors.toList());
-
-                if (omitUpsertMaterialize) {
-                    return transmitDeterminismRequirement(
-                            lookupJoin, ImmutableBitSet.of(requireLeft));
-                } else {
-                    // enable materialize for lookup join
-                    return transmitDeterminismRequirement(
-                            lookupJoin.copy(true), 
ImmutableBitSet.of(requireLeft));
-                }
-            }
+            return visitLookupJoin((StreamPhysicalLookupJoin) rel, 
requireDeterminism);
         } else if (rel instanceof StreamPhysicalTableSourceScan) {
-            // tableScan has no input, so only check metadata from cdc source
-            if (!requireDeterminism.isEmpty()) {
-                StreamPhysicalTableSourceScan tableScan = 
(StreamPhysicalTableSourceScan) rel;
-                boolean insertOnly =
-                        
tableScan.tableSource().getChangelogMode().containsOnly(RowKind.INSERT);
-                boolean supportsReadingMetadata =
-                        tableScan.tableSource() instanceof 
SupportsReadingMetadata;
-                if (!insertOnly && supportsReadingMetadata) {
-                    TableSourceTable sourceTable =
-                            
tableScan.getTable().unwrap(TableSourceTable.class);
-                    TableConfig tableConfig =
-                            
ShortcutUtils.unwrapContext(rel.getCluster()).getTableConfig();
-                    ResolvedSchema resolvedSchema =
-                            
sourceTable.contextResolvedTable().getResolvedSchema();
-                    // check if changelogNormalize is enabled for the source, 
if yes, the metadata
-                    // columns are deterministic
-                    if (!DynamicSourceUtils.changelogNormalizeEnabled(
-                            tableScan.eventTimeSnapshotRequired(),
-                            resolvedSchema,
-                            sourceTable.tableSource(),
-                            tableConfig)) {
-                        // check if requireDeterminism contains metadata column
-                        List<Column.MetadataColumn> metadataColumns =
-                                DynamicSourceUtils.extractMetadataColumns(
-                                        
sourceTable.contextResolvedTable().getResolvedSchema());
-                        Set<String> metaColumnSet =
-                                metadataColumns.stream()
-                                        .map(Column::getName)
-                                        .collect(Collectors.toSet());
-                        List<String> columns = 
tableScan.getRowType().getFieldNames();
-                        List<String> metadataCauseErr = new ArrayList<>();
-                        for (int index = 0; index < columns.size(); index++) {
-                            String column = columns.get(index);
-                            if (metaColumnSet.contains(column) && 
requireDeterminism.get(index)) {
-                                metadataCauseErr.add(column);
-                            }
-                        }
-                        if (!metadataCauseErr.isEmpty()) {
-                            StringBuilder errorMsg = new StringBuilder();
-                            errorMsg.append("The metadata column(s): '")
-                                    .append(
-                                            String.join(
-                                                    ", ", 
metadataCauseErr.toArray(new String[0])))
-                                    .append("' in cdc source may cause wrong 
result or error on")
-                                    .append(" downstream operators, please 
consider removing these")
-                                    .append(" columns or use a non-cdc source 
that only has insert")
-                                    .append(" messages.\nsource node:\n")
-                                    .append(
-                                            FlinkRelOptUtil.toString(
-                                                    tableScan,
-                                                    
SqlExplainLevel.DIGEST_ATTRIBUTES,
-                                                    false,
-                                                    true,
-                                                    false,
-                                                    true,
-                                                    false));
-                            throw new TableException(errorMsg.toString());
-                        }
-                    }
-                }
-            }
-            return rel;
+            return visitTableSourceScan((StreamPhysicalTableSourceScan) rel, 
requireDeterminism);
         } else if (rel instanceof StreamPhysicalLegacyTableSourceScan
                 || rel instanceof StreamPhysicalDataStreamScan
                 || rel instanceof StreamPhysicalValues) {
             // not cdc source, end visit
             return rel;
         } else if (rel instanceof StreamPhysicalGroupAggregateBase) {
-            // output row type = grouping keys + aggCalls
-            StreamPhysicalGroupAggregateBase groupAgg = 
(StreamPhysicalGroupAggregateBase) rel;
-            if (inputInsertOnly(groupAgg)) {
-                // no further requirement to input, only check if it can 
satisfy the
-                // requiredDeterminism
-                if (!requireDeterminism.isEmpty()) {
-                    checkUnsatisfiedDeterminism(
-                            requireDeterminism,
-                            groupAgg.grouping().length,
-                            // TODO remove this conversion when scala-free was 
total done.
-                            
scala.collection.JavaConverters.seqAsJavaList(groupAgg.aggCalls()),
-                            groupAgg.getRowType(),
-                            groupAgg);
-                }
-                return transmitDeterminismRequirement(groupAgg, 
NO_REQUIRED_DETERMINISM);
-            } else {
-                // agg works under retract mode if input is not insert only, 
and requires all input
-                // columns be deterministic
-                return transmitDeterminismRequirement(
-                        groupAgg,
-                        
ImmutableBitSet.range(groupAgg.getInput().getRowType().getFieldCount()));
-            }
+            return visitGroupAggregate((StreamPhysicalGroupAggregateBase) rel, 
requireDeterminism);
         } else if (rel instanceof StreamPhysicalWindowAggregateBase) {
-            // output row type = grouping keys + aggCalls + windowProperties
-            // same logic with 'groupAgg' but they have no common parent
-            StreamPhysicalWindowAggregateBase windowAgg = 
(StreamPhysicalWindowAggregateBase) rel;
-            if (inputInsertOnly(windowAgg)) {
-                // no further requirement to input, only check if it can 
satisfy the
-                // requiredDeterminism
-                if (!requireDeterminism.isEmpty()) {
-                    checkUnsatisfiedDeterminism(
-                            requireDeterminism,
-                            windowAgg.grouping().length,
-                            // TODO remove this conversion when scala-free was 
total done.
-                            
scala.collection.JavaConverters.seqAsJavaList(windowAgg.aggCalls()),
-                            windowAgg.getRowType(),
-                            windowAgg);
-                }
-                return transmitDeterminismRequirement(windowAgg, 
NO_REQUIRED_DETERMINISM);
-            } else {
-                // agg works under retract mode if input is not insert only, 
and requires all input
-                // columns be deterministic
-                return transmitDeterminismRequirement(
-                        windowAgg,
-                        
ImmutableBitSet.range(windowAgg.getInput().getRowType().getFieldCount()));
-            }
+            return visitWindowAggregate(
+                    (StreamPhysicalWindowAggregateBase) rel, 
requireDeterminism);
         } else if (rel instanceof StreamPhysicalExpand) {
-            // Expand is an internal operator only for plan rewriting 
currently, so only remove the
-            // expandIdIndex from requireDeterminism. We also skip checking if 
input has updates due
-            // to this is a non-stateful node which never changes the 
changelog mode.
-            StreamPhysicalExpand expand = (StreamPhysicalExpand) rel;
-            return transmitDeterminismRequirement(
-                    expand, 
requireDeterminism.except(ImmutableBitSet.of(expand.expandIdIndex())));
+            return visitExpand((StreamPhysicalExpand) rel, requireDeterminism);
         } else if (rel instanceof CommonPhysicalJoin) {
-            // output row type = left row type + right row type
-            CommonPhysicalJoin join = (CommonPhysicalJoin) rel;
-            StreamPhysicalRel leftRel = (StreamPhysicalRel) join.getLeft();
-            StreamPhysicalRel rightRel = (StreamPhysicalRel) join.getRight();
-            boolean leftInputHasUpdate = !inputInsertOnly(leftRel);
-            boolean rightInputHasUpdate = !inputInsertOnly(rightRel);
-            boolean innerOrSemi =
-                    join.joinSpec().getJoinType() == FlinkJoinType.INNER
-                            || join.joinSpec().getJoinType() == 
FlinkJoinType.SEMI;
-            /**
-             * we do not distinguish the time attribute condition in 
interval/temporal join from
-             * regular/window join here because: rowtime field always from 
source, proctime is not
-             * limited (from source), when proctime appended to an update row 
without upsertKey then
-             * result may goes wrong, in such a case proctime( was 
materialized as
-             * PROCTIME_MATERIALIZE(PROCTIME())) is equal to a normal dynamic 
temporal function and
-             * will be validated in calc node.
-             */
-            Optional<String> ndCall = 
FlinkRexUtil.getNonDeterministicCallName(join.getCondition());
-            if ((leftInputHasUpdate || rightInputHasUpdate || !innerOrSemi) && 
ndCall.isPresent()) {
-                // when output has update, the join condition cannot be 
non-deterministic:
-                // 1. input has update -> output has update
-                // 2. input insert only and is not innerOrSemi join -> output 
has update
-                throwNonDeterministicConditionError(
-                        ndCall.get(), join.getCondition(), (StreamPhysicalRel) 
join);
-            }
-            int leftFieldCnt = leftRel.getRowType().getFieldCount();
-            StreamPhysicalRel newLeft =
-                    visitJoinChild(
-                            requireDeterminism,
-                            leftRel,
-                            leftInputHasUpdate,
-                            leftFieldCnt,
-                            true,
-                            join.joinSpec().getLeftKeys(),
-                            // TODO remove this conversion when scala-free was 
total done.
-                            scala.collection.JavaConverters.seqAsJavaList(
-                                    join.getUpsertKeys(leftRel, 
join.joinSpec().getLeftKeys())));
-            StreamPhysicalRel newRight =
-                    visitJoinChild(
-                            requireDeterminism,
-                            rightRel,
-                            rightInputHasUpdate,
-                            leftFieldCnt,
-                            false,
-                            join.joinSpec().getRightKeys(),
-                            // TODO remove this conversion when scala-free was 
total done.
-                            scala.collection.JavaConverters.seqAsJavaList(
-                                    join.getUpsertKeys(rightRel, 
join.joinSpec().getRightKeys())));
-
-            return (StreamPhysicalRel)
-                    join.copy(
-                            join.getTraitSet(),
-                            join.getCondition(),
-                            newLeft,
-                            newRight,
-                            join.getJoinType(),
-                            join.isSemiJoin());
-
+            return visitJoin((CommonPhysicalJoin) rel, requireDeterminism);
         } else if (rel instanceof StreamPhysicalOverAggregateBase) {
-            // output row type = input row type + overAgg outputs
-            StreamPhysicalOverAggregateBase overAgg = 
((StreamPhysicalOverAggregateBase) rel);
-            if (inputInsertOnly(overAgg)) {
-                // no further requirement to input, only check if the agg 
outputs can satisfy the
-                // requiredDeterminism
-                if (!requireDeterminism.isEmpty()) {
-                    int inputFieldCnt = 
overAgg.getInput().getRowType().getFieldCount();
-                    OverSpec overSpec = 
OverAggregateUtil.createOverSpec(overAgg.logicWindow());
-                    // add aggCall's input
-                    int aggOutputIndex = inputFieldCnt;
-                    for (OverSpec.GroupSpec groupSpec : overSpec.getGroups()) {
-                        checkUnsatisfiedDeterminism(
-                                requireDeterminism,
-                                aggOutputIndex,
-                                groupSpec.getAggCalls(),
-                                overAgg.getRowType(),
-                                overAgg);
-                        aggOutputIndex += groupSpec.getAggCalls().size();
-                    }
-                }
-                return transmitDeterminismRequirement(overAgg, 
NO_REQUIRED_DETERMINISM);
-            } else {
-                // OverAgg does not support input with updates currently, so 
this branch will not be
-                // reached for now.
-
-                // We should append partition keys and order key to 
requireDeterminism
-                return transmitDeterminismRequirement(
-                        overAgg, 
mappingRequireDeterminismToInput(requireDeterminism, overAgg));
-            }
+            return visitOverAggregate((StreamPhysicalOverAggregateBase) rel, 
requireDeterminism);
         } else if (rel instanceof StreamPhysicalRank) {
-            // if outputRankNumber:  output row type = input row type + rank 
number type
-            // else keeps the same as input
-            StreamPhysicalRank rank = (StreamPhysicalRank) rel;
-            if (inputInsertOnly(rank)) {
-                // rank output is deterministic when input is insert only, so 
required determinism
-                // always be satisfied here.
-                return transmitDeterminismRequirement(rank, 
NO_REQUIRED_DETERMINISM);
-            } else {
-                int inputFieldCnt = 
rank.getInput().getRowType().getFieldCount();
-                if (rank.rankStrategy() instanceof 
RankProcessStrategy.UpdateFastStrategy) {
-                    // in update fast mode, pass required determinism excludes 
partition keys and
-                    // order key
-                    ImmutableBitSet.Builder bitSetBuilder = 
ImmutableBitSet.builder();
-                    rank.partitionKey().toList().forEach(bitSetBuilder::set);
-                    
rank.orderKey().getKeys().toIntegerList().forEach(bitSetBuilder::set);
-                    if (rank.outputRankNumber()) {
-                        // exclude last column
-                        bitSetBuilder.set(inputFieldCnt);
-                    }
-                    return transmitDeterminismRequirement(
-                            rank, 
requireDeterminism.except(bitSetBuilder.build()));
-                } else if (rank.rankStrategy() instanceof 
RankProcessStrategy.RetractStrategy) {
-                    // in retract mode then require all input columns be 
deterministic
-                    return transmitDeterminismRequirement(
-                            rank, ImmutableBitSet.range(inputFieldCnt));
-                } else {
-                    // AppendFastStrategy only applicable for insert only 
input, so the undefined
-                    // strategy is not as expected here.
-                    throw new TableException(
-                            String.format(
-                                    "Can not infer the determinism for 
unsupported rank strategy: %s, this is a bug, please file an issue.",
-                                    rank.rankStrategy()));
-                }
-            }
+            return visitRank((StreamPhysicalRank) rel, requireDeterminism);
         } else if (rel instanceof StreamPhysicalDeduplicate) {
-            // output row type same as input and does not change output 
columns' order
-            StreamPhysicalDeduplicate dedup = (StreamPhysicalDeduplicate) rel;
-            if (inputInsertOnly(dedup)) {
-                // similar to rank, output is deterministic when input is 
insert only, so required
-                // determinism always be satisfied here.
-                return transmitDeterminismRequirement(dedup, 
NO_REQUIRED_DETERMINISM);
-            } else {
-                // Deduplicate always has unique key currently(exec node has 
null check and inner
-                // state only support data with keys), so only pass the left 
columns of required
-                // determinism to input.
-                return transmitDeterminismRequirement(
-                        dedup,
-                        
requireDeterminism.except(ImmutableBitSet.of(dedup.getUniqueKeys())));
-            }
+            return visitDeduplicate((StreamPhysicalDeduplicate) rel, 
requireDeterminism);
         } else if (rel instanceof StreamPhysicalWindowDeduplicate) {
-            // output row type same as input and does not change output 
columns' order
-            StreamPhysicalWindowDeduplicate winDedup = 
(StreamPhysicalWindowDeduplicate) rel;
-            if (inputInsertOnly(winDedup)) {
-                // similar to rank, output is deterministic when input is 
insert only, so required
-                // determinism always be satisfied here.
-                return transmitDeterminismRequirement(winDedup, 
NO_REQUIRED_DETERMINISM);
-            } else {
-                // WindowDeduplicate does not support input with updates 
currently, so this branch
-                // will not be reached for now.
-
-                // only append partition keys, no need to process order key 
because it always comes
-                // from window
-                return transmitDeterminismRequirement(
-                        winDedup,
-                        requireDeterminism
-                                .clear(winDedup.orderKey())
-                                
.union(ImmutableBitSet.of(winDedup.partitionKeys())));
-            }
+            return visitWindowDeduplicate(
+                    (StreamPhysicalWindowDeduplicate) rel, requireDeterminism);
         } else if (rel instanceof StreamPhysicalWindowRank) {
-            StreamPhysicalWindowRank winRank = (StreamPhysicalWindowRank) rel;
-            if (inputInsertOnly(winRank)) {
-                // similar to rank, output is deterministic when input is 
insert only, so required
-                // determinism always be satisfied here.
-                return transmitDeterminismRequirement(winRank, 
NO_REQUIRED_DETERMINISM);
-            } else {
-                // WindowRank does not support input with updates currently, 
so this branch will not
-                // be reached for now.
-
-                // only append partition keys, no need to process order key 
because it always comes
-                // from window
-                int inputFieldCnt = 
winRank.getInput().getRowType().getFieldCount();
-                return transmitDeterminismRequirement(
-                        winRank,
-                        requireDeterminism
-                                
.intersect(ImmutableBitSet.range(inputFieldCnt))
-                                .union(winRank.partitionKey()));
-            }
+            return visitWindowRank((StreamPhysicalWindowRank) rel, 
requireDeterminism);
         } else if (rel instanceof StreamPhysicalWindowTableFunction) {
-            // output row type = input row type + window attributes
-            StreamPhysicalWindowTableFunction winTVF = 
(StreamPhysicalWindowTableFunction) rel;
-            if (inputInsertOnly(winTVF)) {
-                return transmitDeterminismRequirement(winTVF, 
NO_REQUIRED_DETERMINISM);
-            } else {
-                // pass the left columns of required determinism to input 
exclude window attributes
-                return transmitDeterminismRequirement(
-                        winTVF,
-                        requireDeterminism.intersect(
-                                ImmutableBitSet.range(
-                                        
winTVF.getInput().getRowType().getFieldCount())));
-            }
+            return visitWindowTableFunction(
+                    (StreamPhysicalWindowTableFunction) rel, 
requireDeterminism);
         } else if (rel instanceof StreamPhysicalChangelogNormalize
                 || rel instanceof StreamPhysicalDropUpdateBefore
                 || rel instanceof StreamPhysicalMiniBatchAssigner
@@ -680,18 +207,7 @@ public class StreamNonDeterministicUpdatePlanVisitor {
             // transit requireDeterminism transparently
             return transmitDeterminismRequirement(rel, requireDeterminism);
         } else if (rel instanceof StreamPhysicalMatch) {
-            StreamPhysicalMatch match = (StreamPhysicalMatch) rel;
-            if (inputInsertOnly(match)) {
-                // similar to over aggregate, output is insert only when input 
is insert only, so
-                // required determinism always be satisfied here.
-                return transmitDeterminismRequirement(match, 
NO_REQUIRED_DETERMINISM);
-            } else {
-                // The DEFINE and MEASURES clauses in match-recognize have 
similar meanings to the
-                // WHERE and SELECT clauses in SQL query, we should analyze 
and transmit the
-                // determinism requirement via the RexNodes in these two 
clauses.
-                throw new UnsupportedOperationException(
-                        "Unsupported to resolve non-deterministic issue in 
match-recognize when input has updates.");
-            }
+            return visitMatch((StreamPhysicalMatch) rel, requireDeterminism);
         } else {
             throw new UnsupportedOperationException(
                     String.format(
@@ -700,7 +216,556 @@ public class StreamNonDeterministicUpdatePlanVisitor {
         }
     }
 
-    // helper methods
+    // ======== helper methods ==========
+
+    /**
+     * Derives the determinism requirement a sink imposes on its input.
+     *
+     * <p>An insert-only input gets no requirement. Otherwise the sink's
+     * primary key columns are required, or every input column when upsert
+     * materialization is enabled or the sink has no primary key.
+     *
+     * @param sink the sink node being visited
+     * @param requireDeterminism not read by this method; the pass-in value
+     *     will always be NO_REQUIRED_DETERMINISM because no requirement is
+     *     passed to a sink node
+     * @return the result of transmitDeterminismRequirement for the sink
+     */
+    private StreamPhysicalRel visitSink(
+            final StreamPhysicalSink sink, final ImmutableBitSet 
requireDeterminism) {
+        if (inputInsertOnly(sink)) {
+            // for append stream, not care about NDU
+            return transmitDeterminismRequirement(sink, 
NO_REQUIRED_DETERMINISM);
+        } else {
+            // for update streaming, when
+            // 1. sink with pk:
+            // upsert sink, update by pk, ideally pk == input.upsertKey,
+            // (otherwise upsertMaterialize will handle it)
+
+            // 1.1 input.upsertKey nonEmpty -> not care about NDU
+            // 1.2 input.upsertKey isEmpty -> retract by complete row, must 
not contain NDU
+
+            // once sink's requirement on pk was satisfied, no further request 
will be transited
+            // only when new requirement generated at stateful node which 
input has update
+            // (e.g., grouping keys)
+
+            // 2. sink without pk:
+            // retract sink, retract by complete row (all input columns should 
be deterministic)
+            // whether input.upsertKey is empty or not, must not contain NDU
+            int[] primaryKey =
+                    
sink.contextResolvedTable().getResolvedSchema().getPrimaryKeyIndexes();
+            ImmutableBitSet requireInputDeterminism;
+            if (sink.upsertMaterialize() || null == primaryKey || 
primaryKey.length == 0) {
+                // SinkUpsertMaterializer only support no upsertKey mode, it 
says all input
+                // columns should be deterministic (same as no primary key 
defined on sink)
+                // TODO should optimize it after SinkUpsertMaterializer 
support upsertKey
+                // FLINK-28569.
+                requireInputDeterminism =
+                        
ImmutableBitSet.range(sink.getInput().getRowType().getFieldCount());
+            } else {
+                requireInputDeterminism = ImmutableBitSet.of(primaryKey);
+            }
+            return transmitDeterminismRequirement(sink, 
requireInputDeterminism);
+        }
+    }
+
+    /**
+     * Derives the determinism requirement a legacy sink imposes on its
+     * input.
+     *
+     * <p>An insert-only input gets no requirement. Otherwise the indexes of
+     * the primary key columns in the sink's table schema are required, or
+     * all schema columns when no primary key is declared.
+     *
+     * @param sink the legacy sink node being visited
+     * @param requireDeterminism not read by this method
+     * @return the result of transmitDeterminismRequirement for the sink
+     */
+    private StreamPhysicalRel visitLegacySink(
+            final StreamPhysicalLegacySink<?> sink, final ImmutableBitSet 
requireDeterminism) {
+        if (inputInsertOnly(sink)) {
+            // for append stream, not care about NDU
+            return transmitDeterminismRequirement(sink, 
NO_REQUIRED_DETERMINISM);
+        } else {
+            TableSchema tableSchema = sink.sink().getTableSchema();
+            Optional<UniqueConstraint> primaryKey = 
tableSchema.getPrimaryKey();
+            List<String> columns = Arrays.asList(tableSchema.getFieldNames());
+            // SinkUpsertMaterializer does not support legacy sink
+            ImmutableBitSet requireInputDeterminism;
+            if (primaryKey.isPresent()) {
+                requireInputDeterminism =
+                        ImmutableBitSet.of(
+                                primaryKey.get().getColumns().stream()
+                                        .map(columns::indexOf)
+                                        .collect(Collectors.toList()));
+            } else {
+                requireInputDeterminism = 
ImmutableBitSet.range(columns.size());
+            }
+            return transmitDeterminismRequirement(sink, 
requireInputDeterminism);
+        }
+    }
+
+    /**
+     * Checks a calc node against the required determinism and maps the
+     * requirement from its output columns back to its input columns.
+     *
+     * <p>No requirement is passed on when the input is insert-only or when
+     * nothing is required of the output.
+     *
+     * @param calc the calc node being visited
+     * @param requireDeterminism output column indexes that must be
+     *     deterministic
+     * @return the result of transmitDeterminismRequirement for the calc
+     * @throws TableException if a required output position has no entry in
+     *     the source mapping extracted from the projection
+     */
+    private StreamPhysicalRel visitCalc(
+            final StreamPhysicalCalcBase calc, final ImmutableBitSet 
requireDeterminism) {
+        if (inputInsertOnly(calc) || requireDeterminism.isEmpty()) {
+            // for append stream, not care about NDU
+            return transmitDeterminismRequirement(calc, 
NO_REQUIRED_DETERMINISM);
+        } else {
+            // if input has updates, any non-deterministic conditions are not 
acceptable, also
+            // requireDeterminism should be satisfied.
+            checkNonDeterministicRexProgram(requireDeterminism, 
calc.getProgram(), calc);
+
+            // evaluate required determinism from input
+            List<RexNode> projects =
+                    calc.getProgram().getProjectList().stream()
+                            .map(expr -> 
calc.getProgram().expandLocalRef(expr))
+                            .collect(Collectors.toList());
+            Map<Integer, List<Integer>> outFromSourcePos = 
extractSourceMapping(projects);
+            List<Integer> conv2Inputs =
+                    requireDeterminism.toList().stream()
+                            .map(
+                                    out ->
+                                            
Optional.ofNullable(outFromSourcePos.get(out))
+                                                    .orElseThrow(
+                                                            () ->
+                                                                    new 
TableException(
+                                                                            
String.format(
+                                                                               
     "Invalid pos:%d over projection:%s",
+                                                                               
     out,
+                                                                               
     calc
+                                                                               
             .getProgram()))))
+                            .flatMap(Collection::stream)
+                            .filter(index -> index != -1)
+                            .distinct()
+                            .collect(Collectors.toList());
+
+            return transmitDeterminismRequirement(calc, 
ImmutableBitSet.of(conv2Inputs));
+        }
+    }
+
+    /**
+     * Checks a correlate node against the required determinism and passes
+     * the requirement on the left-side columns to its input.
+     *
+     * <p>No requirement is passed on when the input is insert-only or when
+     * nothing is required of the output. Otherwise the condition, if
+     * defined, is checked, and throwNonDeterministicColumnsError is invoked
+     * when required columns come from a table function scan whose call is
+     * non-deterministic.
+     *
+     * @param correlate the correlate node being visited
+     * @param requireDeterminism output column indexes that must be
+     *     deterministic
+     * @return the result of transmitDeterminismRequirement for the
+     *     correlate
+     */
+    private StreamPhysicalRel visitCorrelate(
+            final StreamPhysicalCorrelateBase correlate, final ImmutableBitSet 
requireDeterminism) {
+        if (inputInsertOnly(correlate) || requireDeterminism.isEmpty()) {
+            return transmitDeterminismRequirement(correlate, 
NO_REQUIRED_DETERMINISM);
+        } else {
+            // check if non-deterministic condition (may exist after 
FLINK-7865 was fixed).
+            if (correlate.condition().isDefined()) {
+                RexNode rexNode = correlate.condition().get();
+                checkNonDeterministicCondition(rexNode, correlate);
+            }
+            // check if it is a non-deterministic function
+            int leftFieldCnt = 
correlate.inputRel().getRowType().getFieldCount();
+            Optional<String> ndCall =
+                    
FlinkRexUtil.getNonDeterministicCallName(correlate.scan().getCall());
+            if (ndCall.isPresent()) {
+                // all columns from table function scan cannot satisfy the 
required determinism
+                List<Integer> unsatisfiedColumns =
+                        requireDeterminism.toList().stream()
+                                .filter(index -> index >= leftFieldCnt)
+                                .collect(Collectors.toList());
+                if (!unsatisfiedColumns.isEmpty()) {
+                    throwNonDeterministicColumnsError(
+                            unsatisfiedColumns, correlate.getRowType(), 
correlate, null, ndCall);
+                }
+            }
+            // evaluate required determinism from input
+            List<Integer> fromLeft =
+                    requireDeterminism.toList().stream()
+                            .filter(index -> index < leftFieldCnt)
+                            .collect(Collectors.toList());
+            if (fromLeft.isEmpty()) {
+                return transmitDeterminismRequirement(correlate, 
NO_REQUIRED_DETERMINISM);
+            }
+            return transmitDeterminismRequirement(correlate, 
ImmutableBitSet.of(fromLeft));
+        }
+    }
+
+    /**
+     * If input has updates, the lookup join may produce non-deterministic
+     * results itself because the data in the backing lookup source may
+     * change over time. We can try to eliminate this non-determinism by
+     * adding materialization to the join operator, but some
+     * non-determinism still cannot be solved that way: 1. the join
+     * condition 2. the inner calc in the lookup join.
+     *
+     * @param lookupJoin the lookup join node being visited
+     * @param requireDeterminism output column indexes that must be
+     *     deterministic
+     * @return the result of transmitDeterminismRequirement, applied to the
+     *     node returned by {@code lookupJoin.copy(true)} when
+     *     materialization cannot be omitted
+     */
+    private StreamPhysicalRel visitLookupJoin(
+            final StreamPhysicalLookupJoin lookupJoin, final ImmutableBitSet 
requireDeterminism) {
+        if (inputInsertOnly(lookupJoin) || requireDeterminism.isEmpty()) {
+            return transmitDeterminismRequirement(lookupJoin, 
NO_REQUIRED_DETERMINISM);
+        } else {
+            // required determinism cannot be satisfied even upsert 
materialize was enabled if:
+            // 1. remaining join condition contains non-deterministic call
+            
JavaScalaConversionUtil.toJava(lookupJoin.finalPreFilterCondition())
+                    .ifPresent(cond -> checkNonDeterministicCondition(cond, 
lookupJoin));
+            
JavaScalaConversionUtil.toJava(lookupJoin.finalRemainingCondition())
+                    .ifPresent(cond -> checkNonDeterministicCondition(cond, 
lookupJoin));
+
+            // 2. inner calc in lookJoin contains either non-deterministic 
condition or calls
+            JavaScalaConversionUtil.toJava(lookupJoin.calcOnTemporalTable())
+                    .ifPresent(
+                            calc ->
+                                    checkNonDeterministicRexProgram(
+                                            requireDeterminism, calc, 
lookupJoin));
+
+            // Try to resolve non-determinism by adding materialization which 
can eliminate
+            // non-determinism produced by lookup join via an evolving source.
+            int leftFieldCnt = 
lookupJoin.getInput().getRowType().getFieldCount();
+            List<Integer> requireRight =
+                    requireDeterminism.toList().stream()
+                            .filter(index -> index >= leftFieldCnt)
+                            .collect(Collectors.toList());
+            boolean omitUpsertMaterialize = false;
+            // two optimizations: 1. no fields from lookup source was required 
2. lookup key
+            // contains pk and no requirement on other fields we can omit 
materialization,
+            // otherwise upsert materialize can not be omitted.
+            if (requireRight.isEmpty()) {
+                omitUpsertMaterialize = true;
+            } else {
+                int[] outputPkIdx = 
lookupJoin.getOutputIndexesOfTemporalTablePrimaryKey();
+                ImmutableBitSet outputPkBitSet = 
ImmutableBitSet.of(outputPkIdx);
+                // outputPkIdx need to used so not using 
#lookupKeyContainsPrimaryKey directly.
+                omitUpsertMaterialize =
+                        Arrays.stream(outputPkIdx)
+                                        .allMatch(
+                                                index -> 
lookupJoin.allLookupKeys().contains(index))
+                                && 
requireRight.stream().allMatch(outputPkBitSet::get);
+            }
+            List<Integer> requireLeft =
+                    requireDeterminism.toList().stream()
+                            .filter(index -> index < leftFieldCnt)
+                            .collect(Collectors.toList());
+
+            if (omitUpsertMaterialize) {
+                return transmitDeterminismRequirement(lookupJoin, 
ImmutableBitSet.of(requireLeft));
+            } else {
+                // enable materialize for lookup join
+                return transmitDeterminismRequirement(
+                        lookupJoin.copy(true), 
ImmutableBitSet.of(requireLeft));
+            }
+        }
+    }
+
+    /**
+     * Rejects required columns that are metadata columns of a source which
+     * is not insert-only, implements SupportsReadingMetadata and does not
+     * have changelog normalize enabled.
+     *
+     * @param tableScan the table source scan being visited
+     * @param requireDeterminism output column indexes that must be
+     *     deterministic
+     * @return the given tableScan itself
+     * @throws TableException if any required column is such a metadata
+     *     column
+     */
+    private StreamPhysicalRel visitTableSourceScan(
+            final StreamPhysicalTableSourceScan tableScan,
+            final ImmutableBitSet requireDeterminism) {
+        // tableScan has no input, so only check metadata from cdc source
+        if (!requireDeterminism.isEmpty()) {
+            boolean insertOnly =
+                    
tableScan.tableSource().getChangelogMode().containsOnly(RowKind.INSERT);
+            boolean supportsReadingMetadata =
+                    tableScan.tableSource() instanceof SupportsReadingMetadata;
+            if (!insertOnly && supportsReadingMetadata) {
+                TableSourceTable sourceTable = 
tableScan.getTable().unwrap(TableSourceTable.class);
+                TableConfig tableConfig =
+                        
ShortcutUtils.unwrapContext(tableScan.getCluster()).getTableConfig();
+                assert sourceTable != null;
+                ResolvedSchema resolvedSchema =
+                        sourceTable.contextResolvedTable().getResolvedSchema();
+                // check if changelogNormalize is enabled for the source, if 
yes, the metadata
+                // columns are deterministic
+                if (!DynamicSourceUtils.changelogNormalizeEnabled(
+                        tableScan.eventTimeSnapshotRequired(),
+                        resolvedSchema,
+                        sourceTable.tableSource(),
+                        tableConfig)) {
+                    // check if requireDeterminism contains metadata column
+                    List<Column.MetadataColumn> metadataColumns =
+                            DynamicSourceUtils.extractMetadataColumns(
+                                    
sourceTable.contextResolvedTable().getResolvedSchema());
+                    Set<String> metaColumnSet =
+                            metadataColumns.stream()
+                                    .map(Column::getName)
+                                    .collect(Collectors.toSet());
+                    List<String> columns = 
tableScan.getRowType().getFieldNames();
+                    List<String> metadataCauseErr = new ArrayList<>();
+                    for (int index = 0; index < columns.size(); index++) {
+                        String column = columns.get(index);
+                        if (metaColumnSet.contains(column) && 
requireDeterminism.get(index)) {
+                            metadataCauseErr.add(column);
+                        }
+                    }
+                    if (!metadataCauseErr.isEmpty()) {
+                        String errorMsg =
+                                "The metadata column(s): '"
+                                        + String.join(", ", 
metadataCauseErr.toArray(new String[0]))
+                                        + "' in cdc source may cause wrong 
result or error on"
+                                        + " downstream operators, please 
consider removing these"
+                                        + " columns or use a non-cdc source 
that only has insert"
+                                        + " messages.\nsource node:\n"
+                                        + FlinkRelOptUtil.toString(
+                                                tableScan,
+                                                
SqlExplainLevel.DIGEST_ATTRIBUTES,
+                                                false,
+                                                true,
+                                                false,
+                                                true,
+                                                false);
+                        throw new TableException(errorMsg);
+                    }
+                }
+            }
+        }
+        return tableScan;
+    }
+
+    /**
+     * Handles a group aggregate, whose output row is the grouping keys
+     * followed by the aggregate calls.
+     *
+     * <p>For insert-only input a non-empty requirement is validated via
+     * checkUnsatisfiedDeterminism and nothing is required of the input.
+     * Otherwise all input columns are required to be deterministic.
+     *
+     * @param groupAgg the group aggregate node being visited
+     * @param requireDeterminism output column indexes that must be
+     *     deterministic
+     * @return the result of transmitDeterminismRequirement for the node
+     */
+    private StreamPhysicalRel visitGroupAggregate(
+            final StreamPhysicalGroupAggregateBase groupAgg,
+            final ImmutableBitSet requireDeterminism) {
+        // output row type = grouping keys + aggCalls
+        if (inputInsertOnly(groupAgg)) {
+            // no further requirement to input, only check if it can satisfy 
the
+            // requiredDeterminism
+            if (!requireDeterminism.isEmpty()) {
+                checkUnsatisfiedDeterminism(
+                        requireDeterminism,
+                        groupAgg.grouping().length,
+                        // TODO remove this conversion when scala-free was 
total done.
+                        
scala.collection.JavaConverters.seqAsJavaList(groupAgg.aggCalls()),
+                        groupAgg.getRowType(),
+                        groupAgg);
+            }
+            return transmitDeterminismRequirement(groupAgg, 
NO_REQUIRED_DETERMINISM);
+        } else {
+            // agg works under retract mode if input is not insert only, and 
requires all input
+            // columns be deterministic
+            return transmitDeterminismRequirement(
+                    groupAgg,
+                    
ImmutableBitSet.range(groupAgg.getInput().getRowType().getFieldCount()));
+        }
+    }
+
+    /**
+     * Handles a window aggregate, whose output row is the grouping keys,
+     * the aggregate calls and the window properties.
+     *
+     * <p>For insert-only input a non-empty requirement is validated via
+     * checkUnsatisfiedDeterminism and nothing is required of the input.
+     * Otherwise all input columns are required to be deterministic.
+     *
+     * @param windowAgg the window aggregate node being visited
+     * @param requireDeterminism output column indexes that must be
+     *     deterministic
+     * @return the result of transmitDeterminismRequirement for the node
+     */
+    private StreamPhysicalRel visitWindowAggregate(
+            final StreamPhysicalWindowAggregateBase windowAgg,
+            final ImmutableBitSet requireDeterminism) {
+        // output row type = grouping keys + aggCalls + windowProperties
+        // same logic with 'groupAgg' but they have no common parent
+        if (inputInsertOnly(windowAgg)) {
+            // no further requirement to input, only check if it can satisfy 
the
+            // requiredDeterminism
+            if (!requireDeterminism.isEmpty()) {
+                checkUnsatisfiedDeterminism(
+                        requireDeterminism,
+                        windowAgg.grouping().length,
+                        // TODO remove this conversion when scala-free was 
total done.
+                        
scala.collection.JavaConverters.seqAsJavaList(windowAgg.aggCalls()),
+                        windowAgg.getRowType(),
+                        windowAgg);
+            }
+            return transmitDeterminismRequirement(windowAgg, 
NO_REQUIRED_DETERMINISM);
+        } else {
+            // agg works under retract mode if input is not insert only, and 
requires all input
+            // columns be deterministic
+            return transmitDeterminismRequirement(
+                    windowAgg,
+                    
ImmutableBitSet.range(windowAgg.getInput().getRowType().getFieldCount()));
+        }
+    }
+
+    /**
+     * Passes the requirement to the input of an expand node after removing
+     * the expand id column, without checking whether the input has updates.
+     *
+     * @param expand the expand node being visited
+     * @param requireDeterminism output column indexes that must be
+     *     deterministic
+     * @return the result of transmitDeterminismRequirement for the node
+     */
+    private StreamPhysicalRel visitExpand(
+            final StreamPhysicalExpand expand, final ImmutableBitSet 
requireDeterminism) {
+        // Expand is an internal operator only for plan rewriting currently, 
so only remove the
+        // expandIdIndex from requireDeterminism. We also skip checking if 
input has updates due
+        // to this is a non-stateful node which never changes the changelog 
mode.
+        return transmitDeterminismRequirement(
+                expand, 
requireDeterminism.except(ImmutableBitSet.of(expand.expandIdIndex())));
+    }
+
+    /**
+     * We do not distinguish the time attribute condition in
+     * interval/temporal join from regular/window join here because: the
+     * rowtime field always comes from source, proctime is not limited
+     * (from source), and when proctime is appended to an update row
+     * without upsertKey the result may go wrong. In such a case proctime
+     * (materialized as PROCTIME_MATERIALIZE(PROCTIME())) is equal to a
+     * normal dynamic temporal function and will be validated in the calc
+     * node.
+     *
+     * <p>throwNonDeterministicConditionError is invoked when the join
+     * condition contains a non-deterministic call and either input has
+     * updates or the join type is neither INNER nor SEMI.
+     *
+     * @param join the join node being visited
+     * @param requireDeterminism output column indexes that must be
+     *     deterministic
+     * @return a copy of the join whose inputs are the nodes returned by
+     *     visitJoinChild for the left and right side
+     */
+    private StreamPhysicalRel visitJoin(
+            final CommonPhysicalJoin join, final ImmutableBitSet 
requireDeterminism) {
+        // output row type = left row type + right row type
+        StreamPhysicalRel leftRel = (StreamPhysicalRel) join.getLeft();
+        StreamPhysicalRel rightRel = (StreamPhysicalRel) join.getRight();
+        boolean leftInputHasUpdate = !inputInsertOnly(leftRel);
+        boolean rightInputHasUpdate = !inputInsertOnly(rightRel);
+        boolean innerOrSemi =
+                join.joinSpec().getJoinType() == FlinkJoinType.INNER
+                        || join.joinSpec().getJoinType() == FlinkJoinType.SEMI;
+
+        Optional<String> ndCall = 
FlinkRexUtil.getNonDeterministicCallName(join.getCondition());
+        if ((leftInputHasUpdate || rightInputHasUpdate || !innerOrSemi) && 
ndCall.isPresent()) {
+            // when output has update, the join condition cannot be 
non-deterministic:
+            // 1. input has update -> output has update
+            // 2. input insert only and is not innerOrSemi join -> output has 
update
+            throwNonDeterministicConditionError(
+                    ndCall.get(), join.getCondition(), (StreamPhysicalRel) 
join);
+        }
+        int leftFieldCnt = leftRel.getRowType().getFieldCount();
+        StreamPhysicalRel newLeft =
+                visitJoinChild(
+                        requireDeterminism,
+                        leftRel,
+                        leftInputHasUpdate,
+                        leftFieldCnt,
+                        true,
+                        join.joinSpec().getLeftKeys(),
+                        // TODO remove this conversion when scala-free was 
total done.
+                        scala.collection.JavaConverters.seqAsJavaList(
+                                join.getUpsertKeys(leftRel, 
join.joinSpec().getLeftKeys())));
+        StreamPhysicalRel newRight =
+                visitJoinChild(
+                        requireDeterminism,
+                        rightRel,
+                        rightInputHasUpdate,
+                        leftFieldCnt,
+                        false,
+                        join.joinSpec().getRightKeys(),
+                        // TODO remove this conversion when scala-free was 
total done.
+                        scala.collection.JavaConverters.seqAsJavaList(
+                                join.getUpsertKeys(rightRel, 
join.joinSpec().getRightKeys())));
+
+        return (StreamPhysicalRel)
+                join.copy(
+                        join.getTraitSet(),
+                        join.getCondition(),
+                        newLeft,
+                        newRight,
+                        join.getJoinType(),
+                        join.isSemiJoin());
+    }
+
+    /**
+     * Handles an over aggregate, whose output row is the input row
+     * followed by the over aggregate outputs.
+     *
+     * <p>For insert-only input a non-empty requirement is validated
+     * against the agg calls of every group and nothing is required of the
+     * input. Otherwise the requirement is converted with
+     * mappingRequireDeterminismToInput and passed to the input.
+     *
+     * @param overAgg the over aggregate node being visited
+     * @param requireDeterminism output column indexes that must be
+     *     deterministic
+     * @return the result of transmitDeterminismRequirement for the node
+     */
+    private StreamPhysicalRel visitOverAggregate(
+            final StreamPhysicalOverAggregateBase overAgg,
+            final ImmutableBitSet requireDeterminism) {
+        // output row type = input row type + overAgg outputs
+        if (inputInsertOnly(overAgg)) {
+            // no further requirement to input, only check if the agg outputs 
can satisfy the
+            // requiredDeterminism
+            if (!requireDeterminism.isEmpty()) {
+                int inputFieldCnt = 
overAgg.getInput().getRowType().getFieldCount();
+                OverSpec overSpec = 
OverAggregateUtil.createOverSpec(overAgg.logicWindow());
+                // add aggCall's input
+                int aggOutputIndex = inputFieldCnt;
+                for (OverSpec.GroupSpec groupSpec : overSpec.getGroups()) {
+                    checkUnsatisfiedDeterminism(
+                            requireDeterminism,
+                            aggOutputIndex,
+                            groupSpec.getAggCalls(),
+                            overAgg.getRowType(),
+                            overAgg);
+                    aggOutputIndex += groupSpec.getAggCalls().size();
+                }
+            }
+            return transmitDeterminismRequirement(overAgg, 
NO_REQUIRED_DETERMINISM);
+        } else {
+            // OverAgg does not support input with updates currently, so this 
branch will not be
+            // reached for now.
+
+            // We should append partition keys and order key to 
requireDeterminism
+            return transmitDeterminismRequirement(
+                    overAgg, 
mappingRequireDeterminismToInput(requireDeterminism, overAgg));
+        }
+    }
+
+    /**
+     * Passes the requirement to the input of a rank node according to its
+     * rank strategy.
+     *
+     * <p>Insert-only input: nothing is required. UpdateFastStrategy: the
+     * requirement minus the partition keys, the order keys and, when the
+     * rank number is output, the column at index inputFieldCnt.
+     * RetractStrategy: all input columns.
+     *
+     * @param rank the rank node being visited
+     * @param requireDeterminism output column indexes that must be
+     *     deterministic
+     * @return the result of transmitDeterminismRequirement for the node
+     * @throws TableException if the input has updates and the strategy is
+     *     neither UpdateFastStrategy nor RetractStrategy
+     */
+    private StreamPhysicalRel visitRank(
+            final StreamPhysicalRank rank, final ImmutableBitSet 
requireDeterminism) {
+        // if outputRankNumber:  output row type = input row type + rank 
number type
+        // else keeps the same as input
+        if (inputInsertOnly(rank)) {
+            // rank output is deterministic when input is insert only, so 
required determinism
+            // always be satisfied here.
+            return transmitDeterminismRequirement(rank, 
NO_REQUIRED_DETERMINISM);
+        } else {
+            int inputFieldCnt = rank.getInput().getRowType().getFieldCount();
+            if (rank.rankStrategy() instanceof 
RankProcessStrategy.UpdateFastStrategy) {
+                // in update fast mode, pass required determinism excludes 
partition keys and
+                // order key
+                ImmutableBitSet.Builder bitSetBuilder = 
ImmutableBitSet.builder();
+                rank.partitionKey().toList().forEach(bitSetBuilder::set);
+                
rank.orderKey().getKeys().toIntegerList().forEach(bitSetBuilder::set);
+                if (rank.outputRankNumber()) {
+                    // exclude last column
+                    bitSetBuilder.set(inputFieldCnt);
+                }
+                return transmitDeterminismRequirement(
+                        rank, 
requireDeterminism.except(bitSetBuilder.build()));
+            } else if (rank.rankStrategy() instanceof 
RankProcessStrategy.RetractStrategy) {
+                // in retract mode then require all input columns be 
deterministic
+                return transmitDeterminismRequirement(rank, 
ImmutableBitSet.range(inputFieldCnt));
+            } else {
+                // AppendFastStrategy only applicable for insert only input, 
so the undefined
+                // strategy is not as expected here.
+                throw new TableException(
+                        String.format(
+                                "Can not infer the determinism for unsupported 
rank strategy: %s, this is a bug, please file an issue.",
+                                rank.rankStrategy()));
+            }
+        }
+    }
+
+    /**
+     * Passes the requirement to the input of a deduplicate node.
+     *
+     * <p>Insert-only input: nothing is required. Otherwise the requirement
+     * minus the node's unique keys is passed on.
+     *
+     * @param dedup the deduplicate node being visited
+     * @param requireDeterminism output column indexes that must be
+     *     deterministic
+     * @return the result of transmitDeterminismRequirement for the node
+     */
+    private StreamPhysicalRel visitDeduplicate(
+            final StreamPhysicalDeduplicate dedup, final ImmutableBitSet 
requireDeterminism) {
+        // output row type same as input and does not change output columns' 
order
+        if (inputInsertOnly(dedup)) {
+            // similar to rank, output is deterministic when input is insert 
only, so required
+            // determinism always be satisfied here.
+            return transmitDeterminismRequirement(dedup, 
NO_REQUIRED_DETERMINISM);
+        } else {
+            // Deduplicate always has unique key currently(exec node has null 
check and inner
+            // state only support data with keys), so only pass the left 
columns of required
+            // determinism to input.
+            return transmitDeterminismRequirement(
+                    dedup, 
requireDeterminism.except(ImmutableBitSet.of(dedup.getUniqueKeys())));
+        }
+    }
+
+    /**
+     * Passes the requirement to the input of a window deduplicate node.
+     *
+     * <p>Insert-only input: nothing is required. Otherwise the requirement
+     * with the order key cleared and the partition keys added is passed
+     * on.
+     *
+     * @param winDedup the window deduplicate node being visited
+     * @param requireDeterminism output column indexes that must be
+     *     deterministic
+     * @return the result of transmitDeterminismRequirement for the node
+     */
+    private StreamPhysicalRel visitWindowDeduplicate(
+            final StreamPhysicalWindowDeduplicate winDedup,
+            final ImmutableBitSet requireDeterminism) {
+        // output row type same as input and does not change output columns' 
order
+        if (inputInsertOnly(winDedup)) {
+            // similar to rank, output is deterministic when input is insert 
only, so required
+            // determinism always be satisfied here.
+            return transmitDeterminismRequirement(winDedup, 
NO_REQUIRED_DETERMINISM);
+        } else {
+            // WindowDeduplicate does not support input with updates 
currently, so this branch
+            // will not be reached for now.
+
+            // only append partition keys, no need to process order key 
because it always comes
+            // from window
+            return transmitDeterminismRequirement(
+                    winDedup,
+                    requireDeterminism
+                            .clear(winDedup.orderKey())
+                            
.union(ImmutableBitSet.of(winDedup.partitionKeys())));
+        }
+    }
+
+    /**
+     * Passes the requirement to the input of a window rank node.
+     *
+     * <p>Insert-only input: nothing is required. Otherwise the requirement
+     * restricted to the input field range, plus the partition key, is
+     * passed on.
+     *
+     * @param winRank the window rank node being visited
+     * @param requireDeterminism output column indexes that must be
+     *     deterministic
+     * @return the result of transmitDeterminismRequirement for the node
+     */
+    private StreamPhysicalRel visitWindowRank(
+            final StreamPhysicalWindowRank winRank, final ImmutableBitSet 
requireDeterminism) {
+        if (inputInsertOnly(winRank)) {
+            // similar to rank, output is deterministic when input is insert 
only, so required
+            // determinism always be satisfied here.
+            return transmitDeterminismRequirement(winRank, 
NO_REQUIRED_DETERMINISM);
+        } else {
+            // WindowRank does not support input with updates currently, so 
this branch will not
+            // be reached for now.
+
+            // only append partition keys, no need to process order key 
because it always comes
+            // from window
+            int inputFieldCnt = 
winRank.getInput().getRowType().getFieldCount();
+            return transmitDeterminismRequirement(
+                    winRank,
+                    requireDeterminism
+                            .intersect(ImmutableBitSet.range(inputFieldCnt))
+                            .union(winRank.partitionKey()));
+        }
+    }
+
+    /**
+     * Passes the requirement to the input of a window table function node,
+     * whose output row is the input row followed by window attributes.
+     *
+     * <p>Insert-only input: nothing is required. Otherwise the requirement
+     * restricted to the input field range is passed on.
+     *
+     * @param winTVF the window table function node being visited
+     * @param requireDeterminism output column indexes that must be
+     *     deterministic
+     * @return the result of transmitDeterminismRequirement for the node
+     */
+    private StreamPhysicalRel visitWindowTableFunction(
+            final StreamPhysicalWindowTableFunction winTVF,
+            final ImmutableBitSet requireDeterminism) {
+        // output row type = input row type + window attributes
+        if (inputInsertOnly(winTVF)) {
+            return transmitDeterminismRequirement(winTVF, 
NO_REQUIRED_DETERMINISM);
+        } else {
+            // pass the left columns of required determinism to input exclude 
window attributes
+            return transmitDeterminismRequirement(
+                    winTVF,
+                    requireDeterminism.intersect(
+                            
ImmutableBitSet.range(winTVF.getInput().getRowType().getFieldCount())));
+        }
+    }
+
+    /**
+     * Handles a match-recognize node; only insert-only input is supported.
+     *
+     * @param match the match node being visited
+     * @param requireDeterminism not read by this method
+     * @return the result of transmitDeterminismRequirement with no
+     *     requirement when the input is insert-only
+     * @throws UnsupportedOperationException if the input has updates
+     */
+    private StreamPhysicalRel visitMatch(
+            final StreamPhysicalMatch match, final ImmutableBitSet 
requireDeterminism) {
+        if (inputInsertOnly(match)) {
+            // similar to over aggregate, output is insert only when input is 
insert only, so
+            // required determinism always be satisfied here.
+            return transmitDeterminismRequirement(match, 
NO_REQUIRED_DETERMINISM);
+        } else {
+            // The DEFINE and MEASURES clauses in match-recognize have similar 
meanings to the
+            // WHERE and SELECT clauses in SQL query, we should analyze and 
transmit the
+            // determinism requirement via the RexNodes in these two clauses.
+            throw new UnsupportedOperationException(
+                    "Unsupported to resolve non-deterministic issue in 
match-recognize when input has updates.");
+        }
+    }
+
+    /** Delegates to ChangelogPlanUtils.inputInsertOnly for the given rel. */
     private boolean inputInsertOnly(final StreamPhysicalRel rel) {
         return ChangelogPlanUtils.inputInsertOnly(rel);
     }
@@ -859,21 +924,19 @@ public class StreamNonDeterministicUpdatePlanVisitor {
     private void throwNonDeterministicConditionError(
             final String ndCall, final RexNode condition, final 
StreamPhysicalRel relatedRel)
             throws TableException {
-        StringBuilder errorMsg = new StringBuilder();
-        errorMsg.append(
-                String.format(NON_DETERMINISTIC_CONDITION_ERROR_MSG_TEMPLATE, 
ndCall, condition));
-        errorMsg.append("\nrelated rel plan:\n")
-                .append(
-                        FlinkRelOptUtil.toString(
+        // Message = formatted condition template + plan of the related rel
+        // as rendered by FlinkRelOptUtil.toString.
+        String errorMsg =
+                String.format(NON_DETERMINISTIC_CONDITION_ERROR_MSG_TEMPLATE, 
ndCall, condition)
+                        + "\nrelated rel plan:\n"
+                        + FlinkRelOptUtil.toString(
                                 relatedRel,
                                 SqlExplainLevel.DIGEST_ATTRIBUTES,
                                 false,
                                 true,
                                 false,
                                 true,
-                                false));
+                                false);
 
-        throw new TableException(errorMsg.toString());
+        throw new TableException(errorMsg);
     }
 
     private void throwNonDeterministicColumnsError(

Reply via email to