gustavodemorais commented on code in PR #28061:
URL: https://github.com/apache/flink/pull/28061#discussion_r3282496400
##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/NonDeterministicDagTest.scala:
##########
@@ -1962,6 +1996,95 @@ class NonDeterministicDagTest(nonDeterministicUpdateStrategy: NonDeterministicUp
.isInstanceOf[TableException]
}
+ // ---------------------------------------------------------------------------
+ // PTF changelog NDU tests
+ // ---------------------------------------------------------------------------
+
+ /** Partition key equals the source upsert key — requirement excluded, no NDU error. */
+ @TestTemplate
Review Comment:
Can you add tests with multiple input tables to make sure we're respecting the argument indexes correctly when there are multiple inputs?
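A minimal sketch of the index pairing such tests would need to pin down, modelled on the loop in `visitPtf` below: input `i` takes its requirement only from provided table argument `i` and from its own field count. `TableArg`, `inputRequirements`, and the use of `java.util.BitSet` in place of Calcite's `ImmutableBitSet` are hypothetical simplifications, not planner code.

```java
import java.util.ArrayList;
import java.util.BitSet;
import java.util.List;

class PtfInputRequirementSketch {

    /** Hypothetical stand-in for one provided table argument (trait + PARTITION BY keys). */
    static final class TableArg {
        final boolean requireUpdateBefore;
        final int[] partitionKeys;

        TableArg(boolean requireUpdateBefore, int[] partitionKeys) {
            this.requireUpdateBefore = requireUpdateBefore;
            this.partitionKeys = partitionKeys;
        }
    }

    /**
     * Returns one requirement per input. Input i is paired with provided table argument i,
     * so each requirement uses only that argument's trait and keys and that input's width.
     */
    static List<BitSet> inputRequirements(int[] inputFieldCounts, List<TableArg> providedInputArgs) {
        if (inputFieldCounts.length != providedInputArgs.size()) {
            throw new IllegalArgumentException("each input needs exactly one table argument");
        }
        List<BitSet> requirements = new ArrayList<>();
        for (int i = 0; i < inputFieldCounts.length; i++) {
            TableArg arg = providedInputArgs.get(i);
            BitSet req = new BitSet();
            if (arg.requireUpdateBefore) {
                // UB is consumed to rebuild the previous row: all columns of input i.
                req.set(0, inputFieldCounts[i]);
            } else {
                // Only the partition key columns of input i.
                for (int key : arg.partitionKeys) {
                    req.set(key);
                }
            }
            requirements.add(req);
        }
        return requirements;
    }

    public static void main(String[] args) {
        // Input 0: 3 columns, PARTITION BY column 0, no REQUIRE_UPDATE_BEFORE.
        // Input 1: 4 columns, PARTITION BY column 1, with REQUIRE_UPDATE_BEFORE.
        List<BitSet> reqs = inputRequirements(
                new int[] {3, 4},
                List.of(new TableArg(false, new int[] {0}), new TableArg(true, new int[] {1})));
        System.out.println(reqs); // [{0}, {0, 1, 2, 3}]
    }
}
```

A multi-input test is most useful when the inputs differ in width, trait, and partition key, so that any swapped or shifted index produces a visibly different plan or error.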
##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/StreamNonDeterministicUpdatePlanVisitor.java:
##########
@@ -958,6 +966,76 @@ private StreamPhysicalRel visitJoinChild(
return transmitDeterminismRequirement(rel, inputRequireDeterminism);
}
+ private StreamPhysicalRel visitPtf(
+ final StreamPhysicalProcessTableFunction ptf,
+ final ImmutableBitSet requireDeterminism) {
+ final RexCall call = ptf.getCall();
+
+ // Concern 1: PTF function itself is non-deterministic and downstream nodes
+ // require determinism. PTFs can have pass-through input columns, but they
+ // are not considered specially for now.
+ if (!requireDeterminism.isEmpty()) {
+ final Optional<String> ndCall = FlinkRexUtil.getNonDeterministicCallName(call);
+ if (ndCall.isPresent()) {
+ throwNonDeterministicColumnsError(
+ requireDeterminism.toList(), ptf.getRowType(), ptf, null, ndCall);
+ }
+ }
+
+ if (inputInsertOnly(ptf)) {
+ // No retracts arrive at the PTF input, so input-column determinism does
+ // not affect retract correctness
+ return transmitDeterminismRequirement(ptf, NO_REQUIRED_DETERMINISM);
+ }
+
+ // Concern 2: non-deterministic input columns.
+ // The PTF is a black box: there is no way to project downstream column requirements
+ // (requireDeterminism) back through the PTF's internal computation to determine which
+ // input columns need to be deterministic. requireDeterminism is therefore ignored here.
+ //
+ // Note: the physical changelog the PTF receives is not solely determined by the
+ // REQUIRE_UPDATE_BEFORE trait. Per the SUPPORT_UPDATES contract, the function receives
+ // {+I,+U,-D} only when the input is upserting on the same key as the partition key;
+ // otherwise it receives full retractions {+I,-U,+U,-D}, including UB messages, even
+ // without REQUIRE_UPDATE_BEFORE. UBs may therefore arrive regardless of the trait.
+ //
+ // What the trait controls is which messages the PTF is contractually allowed to
+ // consume for state management, and that is what drives the determinism requirement:
+ // - No REQUIRE_UPDATE_BEFORE: the PTF does not consume UBs; retract handling is
+ // keyed by partition key, so only partition key columns must be deterministic.
+ // Any non-key columns on an incidentally-delivered UB are not used by the PTF.
+ // - Has REQUIRE_UPDATE_BEFORE: the PTF explicitly opts in to consuming UB to
+ // reconstruct the previously processed row; that row must match exactly, so all
+ // input columns must be deterministic.
+ final List<Ord<StaticArgument>> providedInputArgs =
+ StreamPhysicalProcessTableFunction.getProvidedInputArgs(call);
+ final List<RexNode> operands = call.getOperands();
+
+ final List<RelNode> newInputs = new ArrayList<>();
+ for (int i = 0; i < ptf.getInputs().size(); i++) {
+ final StreamPhysicalRel input = (StreamPhysicalRel) ptf.getInput(i);
+ final StaticArgument staticArg = providedInputArgs.get(i).e;
+ final RexTableArgCall tableArgCall =
+ (RexTableArgCall) operands.get(providedInputArgs.get(i).i);
+ final ImmutableBitSet inputReq;
+ if (staticArg.is(StaticArgumentTrait.REQUIRE_UPDATE_BEFORE)) {
+ // The PTF consumes UB to reconstruct the previously processed row, which must
+ // match exactly — all input columns must be deterministic.
+ inputReq = ImmutableBitSet.range(input.getRowType().getFieldCount());
+ } else {
+ // The PTF does not consume UB; retract handling is keyed by partition key, so
+ // only partition key columns must be deterministic.
+ inputReq =
+ ImmutableBitSet.of(
+ Arrays.stream(tableArgCall.getPartitionKeys())
+ .boxed()
+ .collect(Collectors.toList()));
Review Comment:
Does this work?
```suggestion
ImmutableBitSet.of(tableArgCall.getPartitionKeys());
```
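It should: `getPartitionKeys()` already returns an `int[]` (the current code streams it with `Arrays.stream`), and a variable-arity `int...` parameter accepts an `int[]` argument directly. This assumes Calcite's `ImmutableBitSet` exposes an `of(int...)` overload next to the `Iterable<Integer>` one the current code goes through. The standalone sketch below uses `java.util.BitSet` and two hypothetical `of` overloads to show that both call forms resolve to the intended overload and build the same set.

```java
import java.util.Arrays;
import java.util.BitSet;
import java.util.List;
import java.util.stream.Collectors;

class PartitionKeyBitSetSketch {

    /** Hypothetical stand-in for an {@code of(int...)} factory. */
    static BitSet of(int... bits) {
        BitSet set = new BitSet();
        for (int bit : bits) {
            set.set(bit);
        }
        return set;
    }

    /** Hypothetical stand-in for an {@code of(Iterable<Integer>)} factory. */
    static BitSet of(Iterable<Integer> bits) {
        BitSet set = new BitSet();
        for (int bit : bits) { // auto-unboxing of each Integer
            set.set(bit);
        }
        return set;
    }

    public static void main(String[] args) {
        int[] partitionKeys = {2, 0};
        // Current form: box each key, collect into a List<Integer>, use the Iterable overload.
        List<Integer> boxed = Arrays.stream(partitionKeys).boxed().collect(Collectors.toList());
        BitSet viaList = of(boxed);
        // Suggested form: the int[] binds directly to the int... parameter.
        BitSet viaVarargs = of(partitionKeys);
        System.out.println(viaVarargs + " " + viaList.equals(viaVarargs)); // {0, 2} true
    }
}
```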
##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/StreamNonDeterministicUpdatePlanVisitor.java:
##########
@@ -958,6 +966,76 @@ private StreamPhysicalRel visitJoinChild(
return transmitDeterminismRequirement(rel, inputRequireDeterminism);
}
+ private StreamPhysicalRel visitPtf(
+ final StreamPhysicalProcessTableFunction ptf,
+ final ImmutableBitSet requireDeterminism) {
+ final RexCall call = ptf.getCall();
+
+ // Concern 1: PTF function itself is non-deterministic and downstream nodes
+ // require determinism. PTFs can have pass-through input columns, but they
+ // are not considered specially for now.
+ if (!requireDeterminism.isEmpty()) {
+ final Optional<String> ndCall = FlinkRexUtil.getNonDeterministicCallName(call);
+ if (ndCall.isPresent()) {
+ throwNonDeterministicColumnsError(
+ requireDeterminism.toList(), ptf.getRowType(), ptf, null, ndCall);
+ }
+ }
+
+ if (inputInsertOnly(ptf)) {
+ // No retracts arrive at the PTF input, so input-column determinism does
+ // not affect retract correctness
+ return transmitDeterminismRequirement(ptf, NO_REQUIRED_DETERMINISM);
+ }
+
+ // Concern 2: non-deterministic input columns.
+ // The PTF is a black box: there is no way to project downstream column requirements
+ // (requireDeterminism) back through the PTF's internal computation to determine which
+ // input columns need to be deterministic. requireDeterminism is therefore ignored here.
Review Comment:
As per the current implementation, we don't pass down "requireDeterminism". Another possibility would be to require determinism for all columns upstream if any column downstream needs it, which would be quite strict.
I think it's fine to keep it as is so we're not too strict, but it makes sense to document that basic cases like the one below are still not covered and will generate wrong results. WDYT? Suggestion:
```suggestion
// input columns need to be deterministic. requireDeterminism is therefore ignored here.
//
// A stricter alternative would be to require all input columns to be deterministic
// whenever any output column downstream requires it. That avoids the gap described
// below but rejects legitimate queries where the PTF does not actually consume the
// non-deterministic column, so we keep the lenient behavior.
//
// Known gap: cases like the one below are not covered today and may produce wrong
// results on failover, since the requirement on {@code nb} at the retract sink never
// reaches {@code ndFunc(b)} upstream of the PTF.
//
// <pre>{@code
// CREATE VIEW v AS SELECT a, ndFunc(b) AS nb FROM upsert_src;
// INSERT INTO retract_sink SELECT * FROM rowPtf(TABLE v); -- row-semantic
// INSERT INTO retract_sink SELECT * FROM setPtf(TABLE v PARTITION BY a); -- set-semantic
// }</pre>
```
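To make the trade-off concrete, a minimal sketch of the two policies for a single input, with `java.util.BitSet` standing in for `ImmutableBitSet`. `lenient` and `strict` are hypothetical helpers, not planner methods. The example mirrors the set-semantic query above and assumes `setPtf` has no `REQUIRE_UPDATE_BEFORE`, so the input-side requirement is just the partition key `a`.

```java
import java.util.BitSet;

class DeterminismPolicySketch {

    /** Current (lenient) behavior: downstreamReq and inputFieldCount are intentionally unused. */
    static BitSet lenient(BitSet downstreamReq, BitSet inputSideReq, int inputFieldCount) {
        return (BitSet) inputSideReq.clone();
    }

    /** Stricter alternative: any downstream requirement makes every input column required. */
    static BitSet strict(BitSet downstreamReq, BitSet inputSideReq, int inputFieldCount) {
        BitSet req = (BitSet) inputSideReq.clone();
        if (!downstreamReq.isEmpty()) {
            req.set(0, inputFieldCount);
        }
        return req;
    }

    public static void main(String[] args) {
        // View v(a, nb) with nb = ndFunc(b) at index 1; setPtf partitions by a (index 0).
        BitSet partitionKeyReq = new BitSet();
        partitionKeyReq.set(0);
        // The retract sink requires some PTF output column to be deterministic.
        BitSet downstreamReq = new BitSet();
        downstreamReq.set(1);

        System.out.println(lenient(downstreamReq, partitionKeyReq, 2)); // {0}
        System.out.println(strict(downstreamReq, partitionKeyReq, 2)); // {0, 1}
    }
}
```

Under the lenient rule `nb` (index 1) is never checked, which is exactly the gap described in the suggestion. Under the strict rule it is, so `ndFunc(b)` would be rejected, including in queries where the PTF never reads that column.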
##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/NonDeterministicDagTest.scala:
##########
@@ -1962,6 +1996,95 @@ class NonDeterministicDagTest(nonDeterministicUpdateStrategy: NonDeterministicUp
.isInstanceOf[TableException]
}
+ // ---------------------------------------------------------------------------
+ // PTF changelog NDU tests
+ // ---------------------------------------------------------------------------
+
+ /** Partition key equals the source upsert key — requirement excluded, no NDU error. */
+ @TestTemplate
Review Comment:
We also only have tests for set semantics. Could you add basic tests for row semantics?
1. Row-semantic + non-deterministic PTF + retract sink
2. Row-semantic + insert-only input + retract sink
##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/NonDeterministicDagTest.scala:
##########
@@ -1962,6 +1996,95 @@ class NonDeterministicDagTest(nonDeterministicUpdateStrategy: NonDeterministicUp
.isInstanceOf[TableException]
}
+ // ---------------------------------------------------------------------------
+ // PTF changelog NDU tests
Review Comment:
The tests are really easy to read! 🙂
--
This is an automated message from the Apache Git Service.