This is an automated email from the ASF dual-hosted git repository. lincoln pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit fc01ed7376441853f8c0ae2d32a44a8069dbdb1d Author: lincoln lee <[email protected]> AuthorDate: Fri Aug 9 17:36:33 2024 +0800 [refactor][table-planner] Separate specific node logic from visit block for StreamNonDeterministicUpdatePlanVisitor to improve readability --- .../StreamNonDeterministicUpdatePlanVisitor.java | 1089 +++++++++++--------- 1 file changed, 576 insertions(+), 513 deletions(-) diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/StreamNonDeterministicUpdatePlanVisitor.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/StreamNonDeterministicUpdatePlanVisitor.java index f8ccd8fd436..34e5d487a98 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/StreamNonDeterministicUpdatePlanVisitor.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/StreamNonDeterministicUpdatePlanVisitor.java @@ -155,518 +155,45 @@ public class StreamNonDeterministicUpdatePlanVisitor { public StreamPhysicalRel visit( final StreamPhysicalRel rel, final ImmutableBitSet requireDeterminism) { if (rel instanceof StreamPhysicalSink) { - if (inputInsertOnly(rel)) { - // for append stream, not care about NDU - return transmitDeterminismRequirement(rel, NO_REQUIRED_DETERMINISM); - } else { - // for update streaming, when - // 1. sink with pk: - // upsert sink, update by pk, ideally pk == input.upsertKey, - // (otherwise upsertMaterialize will handle it) - - // 1.1 input.upsertKey nonEmpty -> not care about NDU - // 1.2 input.upsertKey isEmpty -> retract by complete row, must not contain NDU - - // once sink's requirement on pk was satisfied, no further request will be transited - // only when new requirement generated at stateful node which input has update - // (e.g., grouping keys) - - // 2. sink without pk: - // retract sink, retract by complete row (all input columns should be deterministic) - // whether input.upsertKey is empty or not, must not contain NDU - StreamPhysicalSink sink = (StreamPhysicalSink) rel; - int[] primaryKey = - sink.contextResolvedTable().getResolvedSchema().getPrimaryKeyIndexes(); - ImmutableBitSet requireInputDeterminism; - if (sink.upsertMaterialize() || null == primaryKey || primaryKey.length == 0) { - // SinkUpsertMaterializer only support no upsertKey mode, it says all input - // columns should be deterministic (same as no primary key defined on sink) - // TODO should optimize it after SinkUpsertMaterializer support upsertKey - // FLINK-28569. - requireInputDeterminism = - ImmutableBitSet.range(sink.getInput().getRowType().getFieldCount()); - } else { - requireInputDeterminism = ImmutableBitSet.of(primaryKey); - } - return transmitDeterminismRequirement(sink, requireInputDeterminism); - } + return visitSink((StreamPhysicalSink) rel, requireDeterminism); } else if (rel instanceof StreamPhysicalLegacySink<?>) { - if (inputInsertOnly(rel)) { - // for append stream, not care about NDU - return transmitDeterminismRequirement(rel, NO_REQUIRED_DETERMINISM); - } else { - StreamPhysicalLegacySink<?> sink = (StreamPhysicalLegacySink<?>) rel; - TableSchema tableSchema = sink.sink().getTableSchema(); - Optional<UniqueConstraint> primaryKey = tableSchema.getPrimaryKey(); - List<String> columns = Arrays.asList(tableSchema.getFieldNames()); - // SinkUpsertMaterializer does not support legacy sink - ImmutableBitSet requireInputDeterminism; - if (primaryKey.isPresent()) { - requireInputDeterminism = - ImmutableBitSet.of( - primaryKey.get().getColumns().stream() - .map(columns::indexOf) - .collect(Collectors.toList())); - } else { - requireInputDeterminism = ImmutableBitSet.range(columns.size()); - } - return transmitDeterminismRequirement(rel, requireInputDeterminism); - } + return visitLegacySink((StreamPhysicalLegacySink<?>) rel, requireDeterminism); } else if (rel instanceof StreamPhysicalCalcBase) { - if (inputInsertOnly(rel) || requireDeterminism.isEmpty()) { - // for append stream, not care about NDU - return transmitDeterminismRequirement(rel, NO_REQUIRED_DETERMINISM); - } else { - // if input has updates, any non-deterministic conditions are not acceptable, also - // requireDeterminism should be satisfied. - StreamPhysicalCalcBase calc = (StreamPhysicalCalcBase) rel; - checkNonDeterministicRexProgram(requireDeterminism, calc.getProgram(), calc); - - // evaluate required determinism from input - List<RexNode> projects = - calc.getProgram().getProjectList().stream() - .map(expr -> calc.getProgram().expandLocalRef(expr)) - .collect(Collectors.toList()); - Map<Integer, List<Integer>> outFromSourcePos = extractSourceMapping(projects); - List<Integer> conv2Inputs = - requireDeterminism.toList().stream() - .map( - out -> - Optional.ofNullable(outFromSourcePos.get(out)) - .orElseThrow( - () -> - new TableException( - String.format( - "Invalid pos:%d over projection:%s", - out, - calc - .getProgram())))) - .flatMap(Collection::stream) - .filter(index -> index != -1) - .distinct() - .collect(Collectors.toList()); - - return transmitDeterminismRequirement(calc, ImmutableBitSet.of(conv2Inputs)); - } + return visitCalc((StreamPhysicalCalcBase) rel, requireDeterminism); } else if (rel instanceof StreamPhysicalCorrelateBase) { - if (inputInsertOnly(rel) || requireDeterminism.isEmpty()) { - return transmitDeterminismRequirement(rel, NO_REQUIRED_DETERMINISM); - } else { - // check if non-deterministic condition (may exist after FLINK-7865 was fixed). - StreamPhysicalCorrelateBase correlate = (StreamPhysicalCorrelateBase) rel; - if (correlate.condition().isDefined()) { - RexNode rexNode = correlate.condition().get(); - checkNonDeterministicCondition(rexNode, correlate); - } - // check if it is a non-deterministic function - int leftFieldCnt = correlate.inputRel().getRowType().getFieldCount(); - Optional<String> ndCall = - FlinkRexUtil.getNonDeterministicCallName(correlate.scan().getCall()); - if (ndCall.isPresent()) { - // all columns from table function scan cannot satisfy the required determinism - List<Integer> unsatisfiedColumns = - requireDeterminism.toList().stream() - .filter(index -> index >= leftFieldCnt) - .collect(Collectors.toList()); - if (!unsatisfiedColumns.isEmpty()) { - throwNonDeterministicColumnsError( - unsatisfiedColumns, - correlate.getRowType(), - correlate, - null, - ndCall); - } - } - // evaluate required determinism from input - List<Integer> fromLeft = - requireDeterminism.toList().stream() - .filter(index -> index < leftFieldCnt) - .collect(Collectors.toList()); - if (fromLeft.isEmpty()) { - return transmitDeterminismRequirement(correlate, NO_REQUIRED_DETERMINISM); - } - return transmitDeterminismRequirement(correlate, ImmutableBitSet.of(fromLeft)); - } - + return visitCorrelate((StreamPhysicalCorrelateBase) rel, requireDeterminism); } else if (rel instanceof StreamPhysicalLookupJoin) { - if (inputInsertOnly(rel) || requireDeterminism.isEmpty()) { - return transmitDeterminismRequirement(rel, NO_REQUIRED_DETERMINISM); - } else { - /** - * if input has updates, the lookup join may produce non-deterministic result itself - * due to backed lookup source which data may change over time, we can try to - * eliminate this non-determinism by adding materialization to the join operator, - * but still exists non-determinism we cannot solve: 1. join condition 2. the inner - * calc in lookJoin. - */ - StreamPhysicalLookupJoin lookupJoin = (StreamPhysicalLookupJoin) rel; - - // required determinism cannot be satisfied even upsert materialize was enabled if: - // 1. remaining join condition contains non-deterministic call - JavaScalaConversionUtil.toJava(lookupJoin.finalPreFilterCondition()) - .ifPresent(cond -> checkNonDeterministicCondition(cond, lookupJoin)); - JavaScalaConversionUtil.toJava(lookupJoin.finalRemainingCondition()) - .ifPresent(cond -> checkNonDeterministicCondition(cond, lookupJoin)); - - // 2. inner calc in lookJoin contains either non-deterministic condition or calls - JavaScalaConversionUtil.toJava(lookupJoin.calcOnTemporalTable()) - .ifPresent( - calc -> - checkNonDeterministicRexProgram( - requireDeterminism, calc, lookupJoin)); - - // Try to resolve non-determinism by adding materialization which can eliminate - // non-determinism produced by lookup join via an evolving source. - int leftFieldCnt = lookupJoin.getInput().getRowType().getFieldCount(); - List<Integer> requireRight = - requireDeterminism.toList().stream() - .filter(index -> index >= leftFieldCnt) - .collect(Collectors.toList()); - boolean omitUpsertMaterialize = false; - // two optimizations: 1. no fields from lookup source was required 2. lookup key - // contains pk and no requirement on other fields we can omit materialization, - // otherwise upsert materialize can not be omitted. - if (requireRight.isEmpty()) { - omitUpsertMaterialize = true; - } else { - int[] outputPkIdx = lookupJoin.getOutputIndexesOfTemporalTablePrimaryKey(); - ImmutableBitSet outputPkBitSet = ImmutableBitSet.of(outputPkIdx); - // outputPkIdx need to used so not using #lookupKeyContainsPrimaryKey directly. - omitUpsertMaterialize = - Arrays.stream(outputPkIdx) - .allMatch( - index -> - lookupJoin - .allLookupKeys() - .contains(index)) - && requireRight.stream().allMatch(outputPkBitSet::get); - } - List<Integer> requireLeft = - requireDeterminism.toList().stream() - .filter(index -> index < leftFieldCnt) - .collect(Collectors.toList()); - - if (omitUpsertMaterialize) { - return transmitDeterminismRequirement( - lookupJoin, ImmutableBitSet.of(requireLeft)); - } else { - // enable materialize for lookup join - return transmitDeterminismRequirement( - lookupJoin.copy(true), ImmutableBitSet.of(requireLeft)); - } - } + return visitLookupJoin((StreamPhysicalLookupJoin) rel, requireDeterminism); } else if (rel instanceof StreamPhysicalTableSourceScan) { - // tableScan has no input, so only check metadata from cdc source - if (!requireDeterminism.isEmpty()) { - StreamPhysicalTableSourceScan tableScan = (StreamPhysicalTableSourceScan) rel; - boolean insertOnly = - tableScan.tableSource().getChangelogMode().containsOnly(RowKind.INSERT); - boolean supportsReadingMetadata = - tableScan.tableSource() instanceof SupportsReadingMetadata; - if (!insertOnly && supportsReadingMetadata) { - TableSourceTable sourceTable = - tableScan.getTable().unwrap(TableSourceTable.class); - TableConfig tableConfig = - ShortcutUtils.unwrapContext(rel.getCluster()).getTableConfig(); - ResolvedSchema resolvedSchema = - sourceTable.contextResolvedTable().getResolvedSchema(); - // check if changelogNormalize is enabled for the source, if yes, the metadata - // columns are deterministic - if (!DynamicSourceUtils.changelogNormalizeEnabled( - tableScan.eventTimeSnapshotRequired(), - resolvedSchema, - sourceTable.tableSource(), - tableConfig)) { - // check if requireDeterminism contains metadata column - List<Column.MetadataColumn> metadataColumns = - DynamicSourceUtils.extractMetadataColumns( - sourceTable.contextResolvedTable().getResolvedSchema()); - Set<String> metaColumnSet = - metadataColumns.stream() - .map(Column::getName) - .collect(Collectors.toSet()); - List<String> columns = tableScan.getRowType().getFieldNames(); - List<String> metadataCauseErr = new ArrayList<>(); - for (int index = 0; index < columns.size(); index++) { - String column = columns.get(index); - if (metaColumnSet.contains(column) && requireDeterminism.get(index)) { - metadataCauseErr.add(column); - } - } - if (!metadataCauseErr.isEmpty()) { - StringBuilder errorMsg = new StringBuilder(); - errorMsg.append("The metadata column(s): '") - .append( - String.join( - ", ", metadataCauseErr.toArray(new String[0]))) - .append("' in cdc source may cause wrong result or error on") - .append(" downstream operators, please consider removing these") - .append(" columns or use a non-cdc source that only has insert") - .append(" messages.\nsource node:\n") - .append( - FlinkRelOptUtil.toString( - tableScan, - SqlExplainLevel.DIGEST_ATTRIBUTES, - false, - true, - false, - true, - false)); - throw new TableException(errorMsg.toString()); - } - } - } - } - return rel; + return visitTableSourceScan((StreamPhysicalTableSourceScan) rel, requireDeterminism); } else if (rel instanceof StreamPhysicalLegacyTableSourceScan || rel instanceof StreamPhysicalDataStreamScan || rel instanceof StreamPhysicalValues) { // not cdc source, end visit return rel; } else if (rel instanceof StreamPhysicalGroupAggregateBase) { - // output row type = grouping keys + aggCalls - StreamPhysicalGroupAggregateBase groupAgg = (StreamPhysicalGroupAggregateBase) rel; - if (inputInsertOnly(groupAgg)) { - // no further requirement to input, only check if it can satisfy the - // requiredDeterminism - if (!requireDeterminism.isEmpty()) { - checkUnsatisfiedDeterminism( - requireDeterminism, - groupAgg.grouping().length, - // TODO remove this conversion when scala-free was total done. - scala.collection.JavaConverters.seqAsJavaList(groupAgg.aggCalls()), - groupAgg.getRowType(), - groupAgg); - } - return transmitDeterminismRequirement(groupAgg, NO_REQUIRED_DETERMINISM); - } else { - // agg works under retract mode if input is not insert only, and requires all input - // columns be deterministic - return transmitDeterminismRequirement( - groupAgg, - ImmutableBitSet.range(groupAgg.getInput().getRowType().getFieldCount())); - } + return visitGroupAggregate((StreamPhysicalGroupAggregateBase) rel, requireDeterminism); } else if (rel instanceof StreamPhysicalWindowAggregateBase) { - // output row type = grouping keys + aggCalls + windowProperties - // same logic with 'groupAgg' but they have no common parent - StreamPhysicalWindowAggregateBase windowAgg = (StreamPhysicalWindowAggregateBase) rel; - if (inputInsertOnly(windowAgg)) { - // no further requirement to input, only check if it can satisfy the - // requiredDeterminism - if (!requireDeterminism.isEmpty()) { - checkUnsatisfiedDeterminism( - requireDeterminism, - windowAgg.grouping().length, - // TODO remove this conversion when scala-free was total done. - scala.collection.JavaConverters.seqAsJavaList(windowAgg.aggCalls()), - windowAgg.getRowType(), - windowAgg); - } - return transmitDeterminismRequirement(windowAgg, NO_REQUIRED_DETERMINISM); - } else { - // agg works under retract mode if input is not insert only, and requires all input - // columns be deterministic - return transmitDeterminismRequirement( - windowAgg, - ImmutableBitSet.range(windowAgg.getInput().getRowType().getFieldCount())); - } + return visitWindowAggregate( + (StreamPhysicalWindowAggregateBase) rel, requireDeterminism); } else if (rel instanceof StreamPhysicalExpand) { - // Expand is an internal operator only for plan rewriting currently, so only remove the - // expandIdIndex from requireDeterminism. We also skip checking if input has updates due - // to this is a non-stateful node which never changes the changelog mode. - StreamPhysicalExpand expand = (StreamPhysicalExpand) rel; - return transmitDeterminismRequirement( - expand, requireDeterminism.except(ImmutableBitSet.of(expand.expandIdIndex()))); + return visitExpand((StreamPhysicalExpand) rel, requireDeterminism); } else if (rel instanceof CommonPhysicalJoin) { - // output row type = left row type + right row type - CommonPhysicalJoin join = (CommonPhysicalJoin) rel; - StreamPhysicalRel leftRel = (StreamPhysicalRel) join.getLeft(); - StreamPhysicalRel rightRel = (StreamPhysicalRel) join.getRight(); - boolean leftInputHasUpdate = !inputInsertOnly(leftRel); - boolean rightInputHasUpdate = !inputInsertOnly(rightRel); - boolean innerOrSemi = - join.joinSpec().getJoinType() == FlinkJoinType.INNER - || join.joinSpec().getJoinType() == FlinkJoinType.SEMI; - /** - * we do not distinguish the time attribute condition in interval/temporal join from - * regular/window join here because: rowtime field always from source, proctime is not - * limited (from source), when proctime appended to an update row without upsertKey then - * result may goes wrong, in such a case proctime( was materialized as - * PROCTIME_MATERIALIZE(PROCTIME())) is equal to a normal dynamic temporal function and - * will be validated in calc node. - */ - Optional<String> ndCall = FlinkRexUtil.getNonDeterministicCallName(join.getCondition()); - if ((leftInputHasUpdate || rightInputHasUpdate || !innerOrSemi) && ndCall.isPresent()) { - // when output has update, the join condition cannot be non-deterministic: - // 1. input has update -> output has update - // 2. input insert only and is not innerOrSemi join -> output has update - throwNonDeterministicConditionError( - ndCall.get(), join.getCondition(), (StreamPhysicalRel) join); - } - int leftFieldCnt = leftRel.getRowType().getFieldCount(); - StreamPhysicalRel newLeft = - visitJoinChild( - requireDeterminism, - leftRel, - leftInputHasUpdate, - leftFieldCnt, - true, - join.joinSpec().getLeftKeys(), - // TODO remove this conversion when scala-free was total done. - scala.collection.JavaConverters.seqAsJavaList( - join.getUpsertKeys(leftRel, join.joinSpec().getLeftKeys()))); - StreamPhysicalRel newRight = - visitJoinChild( - requireDeterminism, - rightRel, - rightInputHasUpdate, - leftFieldCnt, - false, - join.joinSpec().getRightKeys(), - // TODO remove this conversion when scala-free was total done. - scala.collection.JavaConverters.seqAsJavaList( - join.getUpsertKeys(rightRel, join.joinSpec().getRightKeys()))); - - return (StreamPhysicalRel) - join.copy( - join.getTraitSet(), - join.getCondition(), - newLeft, - newRight, - join.getJoinType(), - join.isSemiJoin()); - + return visitJoin((CommonPhysicalJoin) rel, requireDeterminism); } else if (rel instanceof StreamPhysicalOverAggregateBase) { - // output row type = input row type + overAgg outputs - StreamPhysicalOverAggregateBase overAgg = ((StreamPhysicalOverAggregateBase) rel); - if (inputInsertOnly(overAgg)) { - // no further requirement to input, only check if the agg outputs can satisfy the - // requiredDeterminism - if (!requireDeterminism.isEmpty()) { - int inputFieldCnt = overAgg.getInput().getRowType().getFieldCount(); - OverSpec overSpec = OverAggregateUtil.createOverSpec(overAgg.logicWindow()); - // add aggCall's input - int aggOutputIndex = inputFieldCnt; - for (OverSpec.GroupSpec groupSpec : overSpec.getGroups()) { - checkUnsatisfiedDeterminism( - requireDeterminism, - aggOutputIndex, - groupSpec.getAggCalls(), - overAgg.getRowType(), - overAgg); - aggOutputIndex += groupSpec.getAggCalls().size(); - } - } - return transmitDeterminismRequirement(overAgg, NO_REQUIRED_DETERMINISM); - } else { - // OverAgg does not support input with updates currently, so this branch will not be - // reached for now. - - // We should append partition keys and order key to requireDeterminism - return transmitDeterminismRequirement( - overAgg, mappingRequireDeterminismToInput(requireDeterminism, overAgg)); - } + return visitOverAggregate((StreamPhysicalOverAggregateBase) rel, requireDeterminism); } else if (rel instanceof StreamPhysicalRank) { - // if outputRankNumber: output row type = input row type + rank number type - // else keeps the same as input - StreamPhysicalRank rank = (StreamPhysicalRank) rel; - if (inputInsertOnly(rank)) { - // rank output is deterministic when input is insert only, so required determinism - // always be satisfied here. - return transmitDeterminismRequirement(rank, NO_REQUIRED_DETERMINISM); - } else { - int inputFieldCnt = rank.getInput().getRowType().getFieldCount(); - if (rank.rankStrategy() instanceof RankProcessStrategy.UpdateFastStrategy) { - // in update fast mode, pass required determinism excludes partition keys and - // order key - ImmutableBitSet.Builder bitSetBuilder = ImmutableBitSet.builder(); - rank.partitionKey().toList().forEach(bitSetBuilder::set); - rank.orderKey().getKeys().toIntegerList().forEach(bitSetBuilder::set); - if (rank.outputRankNumber()) { - // exclude last column - bitSetBuilder.set(inputFieldCnt); - } - return transmitDeterminismRequirement( - rank, requireDeterminism.except(bitSetBuilder.build())); - } else if (rank.rankStrategy() instanceof RankProcessStrategy.RetractStrategy) { - // in retract mode then require all input columns be deterministic - return transmitDeterminismRequirement( - rank, ImmutableBitSet.range(inputFieldCnt)); - } else { - // AppendFastStrategy only applicable for insert only input, so the undefined - // strategy is not as expected here. - throw new TableException( - String.format( - "Can not infer the determinism for unsupported rank strategy: %s, this is a bug, please file an issue.", - rank.rankStrategy())); - } - } + return visitRank((StreamPhysicalRank) rel, requireDeterminism); } else if (rel instanceof StreamPhysicalDeduplicate) { - // output row type same as input and does not change output columns' order - StreamPhysicalDeduplicate dedup = (StreamPhysicalDeduplicate) rel; - if (inputInsertOnly(dedup)) { - // similar to rank, output is deterministic when input is insert only, so required - // determinism always be satisfied here. - return transmitDeterminismRequirement(dedup, NO_REQUIRED_DETERMINISM); - } else { - // Deduplicate always has unique key currently(exec node has null check and inner - // state only support data with keys), so only pass the left columns of required - // determinism to input. - return transmitDeterminismRequirement( - dedup, - requireDeterminism.except(ImmutableBitSet.of(dedup.getUniqueKeys()))); - } + return visitDeduplicate((StreamPhysicalDeduplicate) rel, requireDeterminism); } else if (rel instanceof StreamPhysicalWindowDeduplicate) { - // output row type same as input and does not change output columns' order - StreamPhysicalWindowDeduplicate winDedup = (StreamPhysicalWindowDeduplicate) rel; - if (inputInsertOnly(winDedup)) { - // similar to rank, output is deterministic when input is insert only, so required - // determinism always be satisfied here. - return transmitDeterminismRequirement(winDedup, NO_REQUIRED_DETERMINISM); - } else { - // WindowDeduplicate does not support input with updates currently, so this branch - // will not be reached for now. - - // only append partition keys, no need to process order key because it always comes - // from window - return transmitDeterminismRequirement( - winDedup, - requireDeterminism - .clear(winDedup.orderKey()) - .union(ImmutableBitSet.of(winDedup.partitionKeys()))); - } + return visitWindowDeduplicate( + (StreamPhysicalWindowDeduplicate) rel, requireDeterminism); } else if (rel instanceof StreamPhysicalWindowRank) { - StreamPhysicalWindowRank winRank = (StreamPhysicalWindowRank) rel; - if (inputInsertOnly(winRank)) { - // similar to rank, output is deterministic when input is insert only, so required - // determinism always be satisfied here. - return transmitDeterminismRequirement(winRank, NO_REQUIRED_DETERMINISM); - } else { - // WindowRank does not support input with updates currently, so this branch will not - // be reached for now. - - // only append partition keys, no need to process order key because it always comes - // from window - int inputFieldCnt = winRank.getInput().getRowType().getFieldCount(); - return transmitDeterminismRequirement( - winRank, - requireDeterminism - .intersect(ImmutableBitSet.range(inputFieldCnt)) - .union(winRank.partitionKey())); - } + return visitWindowRank((StreamPhysicalWindowRank) rel, requireDeterminism); } else if (rel instanceof StreamPhysicalWindowTableFunction) { - // output row type = input row type + window attributes - StreamPhysicalWindowTableFunction winTVF = (StreamPhysicalWindowTableFunction) rel; - if (inputInsertOnly(winTVF)) { - return transmitDeterminismRequirement(winTVF, NO_REQUIRED_DETERMINISM); - } else { - // pass the left columns of required determinism to input exclude window attributes - return transmitDeterminismRequirement( - winTVF, - requireDeterminism.intersect( - ImmutableBitSet.range( - winTVF.getInput().getRowType().getFieldCount()))); - } + return visitWindowTableFunction( + (StreamPhysicalWindowTableFunction) rel, requireDeterminism); } else if (rel instanceof StreamPhysicalChangelogNormalize || rel instanceof StreamPhysicalDropUpdateBefore || rel instanceof StreamPhysicalMiniBatchAssigner @@ -680,18 +207,7 @@ public class StreamNonDeterministicUpdatePlanVisitor { // transit requireDeterminism transparently return transmitDeterminismRequirement(rel, requireDeterminism); } else if (rel instanceof StreamPhysicalMatch) { - StreamPhysicalMatch match = (StreamPhysicalMatch) rel; - if (inputInsertOnly(match)) { - // similar to over aggregate, output is insert only when input is insert only, so - // required determinism always be satisfied here. - return transmitDeterminismRequirement(match, NO_REQUIRED_DETERMINISM); - } else { - // The DEFINE and MEASURES clauses in match-recognize have similar meanings to the - // WHERE and SELECT clauses in SQL query, we should analyze and transmit the - // determinism requirement via the RexNodes in these two clauses. - throw new UnsupportedOperationException( - "Unsupported to resolve non-deterministic issue in match-recognize when input has updates."); - } + return visitMatch((StreamPhysicalMatch) rel, requireDeterminism); } else { throw new UnsupportedOperationException( String.format( @@ -700,7 +216,556 @@ public class StreamNonDeterministicUpdatePlanVisitor { } } - // helper methods + // ======== helper methods ========== + + /** + * The pass-in requireDeterminism will always be NO_REQUIRED_DETERMINISM. + * + * @param requireDeterminism no requirement passed to sink node + */ + private StreamPhysicalRel visitSink( + final StreamPhysicalSink sink, final ImmutableBitSet requireDeterminism) { + if (inputInsertOnly(sink)) { + // for append stream, not care about NDU + return transmitDeterminismRequirement(sink, NO_REQUIRED_DETERMINISM); + } else { + // for update streaming, when + // 1. sink with pk: + // upsert sink, update by pk, ideally pk == input.upsertKey, + // (otherwise upsertMaterialize will handle it) + + // 1.1 input.upsertKey nonEmpty -> not care about NDU + // 1.2 input.upsertKey isEmpty -> retract by complete row, must not contain NDU + + // once sink's requirement on pk was satisfied, no further request will be transited + // only when new requirement generated at stateful node which input has update + // (e.g., grouping keys) + + // 2. sink without pk: + // retract sink, retract by complete row (all input columns should be deterministic) + // whether input.upsertKey is empty or not, must not contain NDU + int[] primaryKey = + sink.contextResolvedTable().getResolvedSchema().getPrimaryKeyIndexes(); + ImmutableBitSet requireInputDeterminism; + if (sink.upsertMaterialize() || null == primaryKey || primaryKey.length == 0) { + // SinkUpsertMaterializer only support no upsertKey mode, it says all input + // columns should be deterministic (same as no primary key defined on sink) + // TODO should optimize it after SinkUpsertMaterializer support upsertKey + // FLINK-28569. + requireInputDeterminism = + ImmutableBitSet.range(sink.getInput().getRowType().getFieldCount()); + } else { + requireInputDeterminism = ImmutableBitSet.of(primaryKey); + } + return transmitDeterminismRequirement(sink, requireInputDeterminism); + } + } + + private StreamPhysicalRel visitLegacySink( + final StreamPhysicalLegacySink<?> sink, final ImmutableBitSet requireDeterminism) { + if (inputInsertOnly(sink)) { + // for append stream, not care about NDU + return transmitDeterminismRequirement(sink, NO_REQUIRED_DETERMINISM); + } else { + TableSchema tableSchema = sink.sink().getTableSchema(); + Optional<UniqueConstraint> primaryKey = tableSchema.getPrimaryKey(); + List<String> columns = Arrays.asList(tableSchema.getFieldNames()); + // SinkUpsertMaterializer does not support legacy sink + ImmutableBitSet requireInputDeterminism; + if (primaryKey.isPresent()) { + requireInputDeterminism = + ImmutableBitSet.of( + primaryKey.get().getColumns().stream() + .map(columns::indexOf) + .collect(Collectors.toList())); + } else { + requireInputDeterminism = ImmutableBitSet.range(columns.size()); + } + return transmitDeterminismRequirement(sink, requireInputDeterminism); + } + } + + private StreamPhysicalRel visitCalc( + final StreamPhysicalCalcBase calc, final ImmutableBitSet requireDeterminism) { + if (inputInsertOnly(calc) || requireDeterminism.isEmpty()) { + // for append stream, not care about NDU + return transmitDeterminismRequirement(calc, NO_REQUIRED_DETERMINISM); + } else { + // if input has updates, any non-deterministic conditions are not acceptable, also + // requireDeterminism should be satisfied. + checkNonDeterministicRexProgram(requireDeterminism, calc.getProgram(), calc); + + // evaluate required determinism from input + List<RexNode> projects = + calc.getProgram().getProjectList().stream() + .map(expr -> calc.getProgram().expandLocalRef(expr)) + .collect(Collectors.toList()); + Map<Integer, List<Integer>> outFromSourcePos = extractSourceMapping(projects); + List<Integer> conv2Inputs = + requireDeterminism.toList().stream() + .map( + out -> + Optional.ofNullable(outFromSourcePos.get(out)) + .orElseThrow( + () -> + new TableException( + String.format( + "Invalid pos:%d over projection:%s", + out, + calc + .getProgram())))) + .flatMap(Collection::stream) + .filter(index -> index != -1) + .distinct() + .collect(Collectors.toList()); + + return transmitDeterminismRequirement(calc, ImmutableBitSet.of(conv2Inputs)); + } + } + + private StreamPhysicalRel visitCorrelate( + final StreamPhysicalCorrelateBase correlate, final ImmutableBitSet requireDeterminism) { + if (inputInsertOnly(correlate) || requireDeterminism.isEmpty()) { + return transmitDeterminismRequirement(correlate, NO_REQUIRED_DETERMINISM); + } else { + // check if non-deterministic condition (may exist after FLINK-7865 was fixed). + if (correlate.condition().isDefined()) { + RexNode rexNode = correlate.condition().get(); + checkNonDeterministicCondition(rexNode, correlate); + } + // check if it is a non-deterministic function + int leftFieldCnt = correlate.inputRel().getRowType().getFieldCount(); + Optional<String> ndCall = + FlinkRexUtil.getNonDeterministicCallName(correlate.scan().getCall()); + if (ndCall.isPresent()) { + // all columns from table function scan cannot satisfy the required determinism + List<Integer> unsatisfiedColumns = + requireDeterminism.toList().stream() + .filter(index -> index >= leftFieldCnt) + .collect(Collectors.toList()); + if (!unsatisfiedColumns.isEmpty()) { + throwNonDeterministicColumnsError( + unsatisfiedColumns, correlate.getRowType(), correlate, null, ndCall); + } + } + // evaluate required determinism from input + List<Integer> fromLeft = + requireDeterminism.toList().stream() + .filter(index -> index < leftFieldCnt) + .collect(Collectors.toList()); + if (fromLeft.isEmpty()) { + return transmitDeterminismRequirement(correlate, NO_REQUIRED_DETERMINISM); + } + return transmitDeterminismRequirement(correlate, ImmutableBitSet.of(fromLeft)); + } + } + + /** + * If input has updates, the lookup join may produce non-deterministic result itself due to + * backed lookup source which data may change over time, we can try to eliminate this + * non-determinism by adding materialization to the join operator, but still exists + * non-determinism we cannot solve: 1. join condition 2. the inner calc in lookJoin. + */ + private StreamPhysicalRel visitLookupJoin( + final StreamPhysicalLookupJoin lookupJoin, final ImmutableBitSet requireDeterminism) { + if (inputInsertOnly(lookupJoin) || requireDeterminism.isEmpty()) { + return transmitDeterminismRequirement(lookupJoin, NO_REQUIRED_DETERMINISM); + } else { + // required determinism cannot be satisfied even upsert materialize was enabled if: + // 1. remaining join condition contains non-deterministic call + JavaScalaConversionUtil.toJava(lookupJoin.finalPreFilterCondition()) + .ifPresent(cond -> checkNonDeterministicCondition(cond, lookupJoin)); + JavaScalaConversionUtil.toJava(lookupJoin.finalRemainingCondition()) + .ifPresent(cond -> checkNonDeterministicCondition(cond, lookupJoin)); + + // 2. inner calc in lookJoin contains either non-deterministic condition or calls + JavaScalaConversionUtil.toJava(lookupJoin.calcOnTemporalTable()) + .ifPresent( + calc -> + checkNonDeterministicRexProgram( + requireDeterminism, calc, lookupJoin)); + + // Try to resolve non-determinism by adding materialization which can eliminate + // non-determinism produced by lookup join via an evolving source. + int leftFieldCnt = lookupJoin.getInput().getRowType().getFieldCount(); + List<Integer> requireRight = + requireDeterminism.toList().stream() + .filter(index -> index >= leftFieldCnt) + .collect(Collectors.toList()); + boolean omitUpsertMaterialize = false; + // two optimizations: 1. no fields from lookup source was required 2. lookup key + // contains pk and no requirement on other fields we can omit materialization, + // otherwise upsert materialize can not be omitted. + if (requireRight.isEmpty()) { + omitUpsertMaterialize = true; + } else { + int[] outputPkIdx = lookupJoin.getOutputIndexesOfTemporalTablePrimaryKey(); + ImmutableBitSet outputPkBitSet = ImmutableBitSet.of(outputPkIdx); + // outputPkIdx need to used so not using #lookupKeyContainsPrimaryKey directly. + omitUpsertMaterialize = + Arrays.stream(outputPkIdx) + .allMatch( + index -> lookupJoin.allLookupKeys().contains(index)) + && requireRight.stream().allMatch(outputPkBitSet::get); + } + List<Integer> requireLeft = + requireDeterminism.toList().stream() + .filter(index -> index < leftFieldCnt) + .collect(Collectors.toList()); + + if (omitUpsertMaterialize) { + return transmitDeterminismRequirement(lookupJoin, ImmutableBitSet.of(requireLeft)); + } else { + // enable materialize for lookup join + return transmitDeterminismRequirement( + lookupJoin.copy(true), ImmutableBitSet.of(requireLeft)); + } + } + } + + private StreamPhysicalRel visitTableSourceScan( + final StreamPhysicalTableSourceScan tableScan, + final ImmutableBitSet requireDeterminism) { + // tableScan has no input, so only check metadata from cdc source + if (!requireDeterminism.isEmpty()) { + boolean insertOnly = + tableScan.tableSource().getChangelogMode().containsOnly(RowKind.INSERT); + boolean supportsReadingMetadata = + tableScan.tableSource() instanceof SupportsReadingMetadata; + if (!insertOnly && supportsReadingMetadata) { + TableSourceTable sourceTable = tableScan.getTable().unwrap(TableSourceTable.class); + TableConfig tableConfig = + ShortcutUtils.unwrapContext(tableScan.getCluster()).getTableConfig(); + assert sourceTable != null; + ResolvedSchema resolvedSchema = + sourceTable.contextResolvedTable().getResolvedSchema(); + // check if changelogNormalize is enabled for the source, if yes, the metadata + // columns are deterministic + if (!DynamicSourceUtils.changelogNormalizeEnabled( + tableScan.eventTimeSnapshotRequired(), + resolvedSchema, + sourceTable.tableSource(), + tableConfig)) { + // check if requireDeterminism contains metadata column + List<Column.MetadataColumn> metadataColumns = + DynamicSourceUtils.extractMetadataColumns( + sourceTable.contextResolvedTable().getResolvedSchema()); + Set<String> metaColumnSet = + metadataColumns.stream() + .map(Column::getName) + .collect(Collectors.toSet()); + List<String> columns = tableScan.getRowType().getFieldNames(); + List<String> metadataCauseErr = new ArrayList<>(); + for (int index = 0; index < columns.size(); index++) { + String column = columns.get(index); + if (metaColumnSet.contains(column) && requireDeterminism.get(index)) { + metadataCauseErr.add(column); + } + } + if (!metadataCauseErr.isEmpty()) { + String errorMsg = + "The metadata column(s): '" + + String.join(", ", metadataCauseErr.toArray(new String[0])) + + "' in cdc source may cause wrong result or error on" + + " downstream operators, please consider removing these" + + " columns or use a non-cdc source that only has insert" + + " messages.\nsource node:\n" + + FlinkRelOptUtil.toString( + tableScan, + SqlExplainLevel.DIGEST_ATTRIBUTES, + false, + true, + false, + true, + false); + throw new TableException(errorMsg); + } + } + } + } + return tableScan; + } + + private StreamPhysicalRel visitGroupAggregate( + final StreamPhysicalGroupAggregateBase groupAgg, + final ImmutableBitSet requireDeterminism) { + // output row type = grouping keys + aggCalls + if (inputInsertOnly(groupAgg)) { + // no further requirement to input, only check if it can satisfy the + // requiredDeterminism + if (!requireDeterminism.isEmpty()) { + checkUnsatisfiedDeterminism( + requireDeterminism, + groupAgg.grouping().length, + // TODO remove this conversion when scala-free was total done. + scala.collection.JavaConverters.seqAsJavaList(groupAgg.aggCalls()), + groupAgg.getRowType(), + groupAgg); + } + return transmitDeterminismRequirement(groupAgg, NO_REQUIRED_DETERMINISM); + } else { + // agg works under retract mode if input is not insert only, and requires all input + // columns be deterministic + return transmitDeterminismRequirement( + groupAgg, + ImmutableBitSet.range(groupAgg.getInput().getRowType().getFieldCount())); + } + } + + private StreamPhysicalRel visitWindowAggregate( + final StreamPhysicalWindowAggregateBase windowAgg, + final ImmutableBitSet requireDeterminism) { + // output row type = grouping keys + aggCalls + windowProperties + // same logic with 'groupAgg' but they have no common parent + if (inputInsertOnly(windowAgg)) { + // no further requirement to input, only check if it can satisfy the + // requiredDeterminism + if (!requireDeterminism.isEmpty()) { + checkUnsatisfiedDeterminism( + requireDeterminism, + windowAgg.grouping().length, + // TODO remove this conversion when scala-free was total done. + scala.collection.JavaConverters.seqAsJavaList(windowAgg.aggCalls()), + windowAgg.getRowType(), + windowAgg); + } + return transmitDeterminismRequirement(windowAgg, NO_REQUIRED_DETERMINISM); + } else { + // agg works under retract mode if input is not insert only, and requires all input + // columns be deterministic + return transmitDeterminismRequirement( + windowAgg, + ImmutableBitSet.range(windowAgg.getInput().getRowType().getFieldCount())); + } + } + + private StreamPhysicalRel visitExpand( + final StreamPhysicalExpand expand, final ImmutableBitSet requireDeterminism) { + // Expand is an internal operator only for plan rewriting currently, so only remove the + // expandIdIndex from requireDeterminism. We also skip checking if input has updates due + // to this is a non-stateful node which never changes the changelog mode. + return transmitDeterminismRequirement( + expand, requireDeterminism.except(ImmutableBitSet.of(expand.expandIdIndex()))); + } + + /** + * We do not distinguish the time attribute condition in interval/temporal join from + * regular/window join here because: rowtime field always from source, proctime is not limited + * (from source), when proctime appended to an update row without upsertKey then result may goes + * wrong, in such a case proctime( was materialized as PROCTIME_MATERIALIZE(PROCTIME())) is + * equal to a normal dynamic temporal function and will be validated in calc node. + */ + private StreamPhysicalRel visitJoin( + final CommonPhysicalJoin join, final ImmutableBitSet requireDeterminism) { + // output row type = left row type + right row type + StreamPhysicalRel leftRel = (StreamPhysicalRel) join.getLeft(); + StreamPhysicalRel rightRel = (StreamPhysicalRel) join.getRight(); + boolean leftInputHasUpdate = !inputInsertOnly(leftRel); + boolean rightInputHasUpdate = !inputInsertOnly(rightRel); + boolean innerOrSemi = + join.joinSpec().getJoinType() == FlinkJoinType.INNER + || join.joinSpec().getJoinType() == FlinkJoinType.SEMI; + + Optional<String> ndCall = FlinkRexUtil.getNonDeterministicCallName(join.getCondition()); + if ((leftInputHasUpdate || rightInputHasUpdate || !innerOrSemi) && ndCall.isPresent()) { + // when output has update, the join condition cannot be non-deterministic: + // 1. input has update -> output has update + // 2. input insert only and is not innerOrSemi join -> output has update + throwNonDeterministicConditionError( + ndCall.get(), join.getCondition(), (StreamPhysicalRel) join); + } + int leftFieldCnt = leftRel.getRowType().getFieldCount(); + StreamPhysicalRel newLeft = + visitJoinChild( + requireDeterminism, + leftRel, + leftInputHasUpdate, + leftFieldCnt, + true, + join.joinSpec().getLeftKeys(), + // TODO remove this conversion when scala-free was total done. + scala.collection.JavaConverters.seqAsJavaList( + join.getUpsertKeys(leftRel, join.joinSpec().getLeftKeys()))); + StreamPhysicalRel newRight = + visitJoinChild( + requireDeterminism, + rightRel, + rightInputHasUpdate, + leftFieldCnt, + false, + join.joinSpec().getRightKeys(), + // TODO remove this conversion when scala-free was total done. + scala.collection.JavaConverters.seqAsJavaList( + join.getUpsertKeys(rightRel, join.joinSpec().getRightKeys()))); + + return (StreamPhysicalRel) + join.copy( + join.getTraitSet(), + join.getCondition(), + newLeft, + newRight, + join.getJoinType(), + join.isSemiJoin()); + } + + private StreamPhysicalRel visitOverAggregate( + final StreamPhysicalOverAggregateBase overAgg, + final ImmutableBitSet requireDeterminism) { + // output row type = input row type + overAgg outputs + if (inputInsertOnly(overAgg)) { + // no further requirement to input, only check if the agg outputs can satisfy the + // requiredDeterminism + if (!requireDeterminism.isEmpty()) { + int inputFieldCnt = overAgg.getInput().getRowType().getFieldCount(); + OverSpec overSpec = OverAggregateUtil.createOverSpec(overAgg.logicWindow()); + // add aggCall's input + int aggOutputIndex = inputFieldCnt; + for (OverSpec.GroupSpec groupSpec : overSpec.getGroups()) { + checkUnsatisfiedDeterminism( + requireDeterminism, + aggOutputIndex, + groupSpec.getAggCalls(), + overAgg.getRowType(), + overAgg); + aggOutputIndex += groupSpec.getAggCalls().size(); + } + } + return transmitDeterminismRequirement(overAgg, NO_REQUIRED_DETERMINISM); + } else { + // OverAgg does not support input with updates currently, so this branch will not be + // reached for now. + + // We should append partition keys and order key to requireDeterminism + return transmitDeterminismRequirement( + overAgg, mappingRequireDeterminismToInput(requireDeterminism, overAgg)); + } + } + + private StreamPhysicalRel visitRank( + final StreamPhysicalRank rank, final ImmutableBitSet requireDeterminism) { + // if outputRankNumber: output row type = input row type + rank number type + // else keeps the same as input + if (inputInsertOnly(rank)) { + // rank output is deterministic when input is insert only, so required determinism + // always be satisfied here. + return transmitDeterminismRequirement(rank, NO_REQUIRED_DETERMINISM); + } else { + int inputFieldCnt = rank.getInput().getRowType().getFieldCount(); + if (rank.rankStrategy() instanceof RankProcessStrategy.UpdateFastStrategy) { + // in update fast mode, pass required determinism excludes partition keys and + // order key + ImmutableBitSet.Builder bitSetBuilder = ImmutableBitSet.builder(); + rank.partitionKey().toList().forEach(bitSetBuilder::set); + rank.orderKey().getKeys().toIntegerList().forEach(bitSetBuilder::set); + if (rank.outputRankNumber()) { + // exclude last column + bitSetBuilder.set(inputFieldCnt); + } + return transmitDeterminismRequirement( + rank, requireDeterminism.except(bitSetBuilder.build())); + } else if (rank.rankStrategy() instanceof RankProcessStrategy.RetractStrategy) { + // in retract mode then require all input columns be deterministic + return transmitDeterminismRequirement(rank, ImmutableBitSet.range(inputFieldCnt)); + } else { + // AppendFastStrategy only applicable for insert only input, so the undefined + // strategy is not as expected here. + throw new TableException( + String.format( + "Can not infer the determinism for unsupported rank strategy: %s, this is a bug, please file an issue.", + rank.rankStrategy())); + } + } + } + + private StreamPhysicalRel visitDeduplicate( + final StreamPhysicalDeduplicate dedup, final ImmutableBitSet requireDeterminism) { + // output row type same as input and does not change output columns' order + if (inputInsertOnly(dedup)) { + // similar to rank, output is deterministic when input is insert only, so required + // determinism always be satisfied here. + return transmitDeterminismRequirement(dedup, NO_REQUIRED_DETERMINISM); + } else { + // Deduplicate always has unique key currently(exec node has null check and inner + // state only support data with keys), so only pass the left columns of required + // determinism to input. + return transmitDeterminismRequirement( + dedup, requireDeterminism.except(ImmutableBitSet.of(dedup.getUniqueKeys()))); + } + } + + private StreamPhysicalRel visitWindowDeduplicate( + final StreamPhysicalWindowDeduplicate winDedup, + final ImmutableBitSet requireDeterminism) { + // output row type same as input and does not change output columns' order + if (inputInsertOnly(winDedup)) { + // similar to rank, output is deterministic when input is insert only, so required + // determinism always be satisfied here. + return transmitDeterminismRequirement(winDedup, NO_REQUIRED_DETERMINISM); + } else { + // WindowDeduplicate does not support input with updates currently, so this branch + // will not be reached for now. + + // only append partition keys, no need to process order key because it always comes + // from window + return transmitDeterminismRequirement( + winDedup, + requireDeterminism + .clear(winDedup.orderKey()) + .union(ImmutableBitSet.of(winDedup.partitionKeys()))); + } + } + + private StreamPhysicalRel visitWindowRank( + final StreamPhysicalWindowRank winRank, final ImmutableBitSet requireDeterminism) { + if (inputInsertOnly(winRank)) { + // similar to rank, output is deterministic when input is insert only, so required + // determinism always be satisfied here. + return transmitDeterminismRequirement(winRank, NO_REQUIRED_DETERMINISM); + } else { + // WindowRank does not support input with updates currently, so this branch will not + // be reached for now. + + // only append partition keys, no need to process order key because it always comes + // from window + int inputFieldCnt = winRank.getInput().getRowType().getFieldCount(); + return transmitDeterminismRequirement( + winRank, + requireDeterminism + .intersect(ImmutableBitSet.range(inputFieldCnt)) + .union(winRank.partitionKey())); + } + } + + private StreamPhysicalRel visitWindowTableFunction( + final StreamPhysicalWindowTableFunction winTVF, + final ImmutableBitSet requireDeterminism) { + // output row type = input row type + window attributes + if (inputInsertOnly(winTVF)) { + return transmitDeterminismRequirement(winTVF, NO_REQUIRED_DETERMINISM); + } else { + // pass the left columns of required determinism to input exclude window attributes + return transmitDeterminismRequirement( + winTVF, + requireDeterminism.intersect( + ImmutableBitSet.range(winTVF.getInput().getRowType().getFieldCount()))); + } + } + + private StreamPhysicalRel visitMatch( + final StreamPhysicalMatch match, final ImmutableBitSet requireDeterminism) { + if (inputInsertOnly(match)) { + // similar to over aggregate, output is insert only when input is insert only, so + // required determinism always be satisfied here. + return transmitDeterminismRequirement(match, NO_REQUIRED_DETERMINISM); + } else { + // The DEFINE and MEASURES clauses in match-recognize have similar meanings to the + // WHERE and SELECT clauses in SQL query, we should analyze and transmit the + // determinism requirement via the RexNodes in these two clauses. + throw new UnsupportedOperationException( + "Unsupported to resolve non-deterministic issue in match-recognize when input has updates."); + } + } + private boolean inputInsertOnly(final StreamPhysicalRel rel) { return ChangelogPlanUtils.inputInsertOnly(rel); } @@ -859,21 +924,19 @@ public class StreamNonDeterministicUpdatePlanVisitor { private void throwNonDeterministicConditionError( final String ndCall, final RexNode condition, final StreamPhysicalRel relatedRel) throws TableException { - StringBuilder errorMsg = new StringBuilder(); - errorMsg.append( - String.format(NON_DETERMINISTIC_CONDITION_ERROR_MSG_TEMPLATE, ndCall, condition)); - errorMsg.append("\nrelated rel plan:\n") - .append( - FlinkRelOptUtil.toString( + String errorMsg = + String.format(NON_DETERMINISTIC_CONDITION_ERROR_MSG_TEMPLATE, ndCall, condition) + + "\nrelated rel plan:\n" + + FlinkRelOptUtil.toString( relatedRel, SqlExplainLevel.DIGEST_ATTRIBUTES, false, true, false, true, - false)); + false); - throw new TableException(errorMsg.toString()); + throw new TableException(errorMsg); } private void throwNonDeterministicColumnsError(
