raminqaf opened a new pull request, #28164:
URL: https://github.com/apache/flink/pull/28164

   ## What is the purpose of the change
   
   `FROM_CHANGELOG` now emits an upsert changelog (`INSERT`, `UPDATE_AFTER`, 
full `DELETE`) when the input table is partitioned (set semantics via 
`PARTITION BY`) and the active `op_mapping` maps to `UPDATE_AFTER` without 
`UPDATE_BEFORE`. The partition key acts as the upsert key. In all other cases 
the output remains a retract changelog.
   
   An `op_mapping` that maps to `UPDATE_AFTER` but not to `UPDATE_BEFORE` is rejected at validation time when the input has no `PARTITION BY`, because upsert mode requires a key.
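The mode decision and the validation rule above can be sketched in plain Java with no Flink dependency. This is a minimal illustration, not the PR's code. `Kind` stands in for Flink's `RowKind`, and `op_mapping` is modeled as a map from op codes to kinds. The op codes (`"c"`, `"u"`, `"d"`, ...), the class and method names, and the use of `IllegalArgumentException` in place of Flink's validation exception are all assumptions.

```java
import java.util.Map;

/** Hypothetical sketch of FROM_CHANGELOG's output-mode decision; not Flink's implementation. */
public class FromChangelogModeSketch {

    /** Stand-in for Flink's RowKind. */
    public enum Kind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    public enum OutputMode { RETRACT, UPSERT }

    /**
     * Upsert output when the mapping targets UPDATE_AFTER but not UPDATE_BEFORE and the
     * input is partitioned; retract output in every other valid case.
     */
    public static OutputMode inferMode(Map<String, Kind> opMapping, boolean partitioned) {
        boolean hasAfter = opMapping.containsValue(Kind.UPDATE_AFTER);
        boolean hasBefore = opMapping.containsValue(Kind.UPDATE_BEFORE);
        boolean upsertMapping = hasAfter && !hasBefore;
        if (upsertMapping && !partitioned) {
            // Upsert output needs a key, and only PARTITION BY provides one.
            throw new IllegalArgumentException(
                    "op_mapping maps to UPDATE_AFTER without UPDATE_BEFORE; PARTITION BY is required");
        }
        return upsertMapping ? OutputMode.UPSERT : OutputMode.RETRACT;
    }

    public static void main(String[] args) {
        Map<String, Kind> upsert =
                Map.of("c", Kind.INSERT, "u", Kind.UPDATE_AFTER, "d", Kind.DELETE);
        Map<String, Kind> retract =
                Map.of("c", Kind.INSERT, "ub", Kind.UPDATE_BEFORE, "ua", Kind.UPDATE_AFTER, "d", Kind.DELETE);

        System.out.println(inferMode(upsert, true));   // UPSERT
        System.out.println(inferMode(retract, true));  // RETRACT
        System.out.println(inferMode(retract, false)); // RETRACT
        try {
            inferMode(upsert, false);
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```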
   
   To enable the strategy to inspect the resolved `op_mapping` and the input table's partition keys, `ChangelogFunction.ChangelogContext` is extended with two default methods: `getArgumentValue(int, Class)` and `getTableSemantics(int)`. Both defaults return `Optional.empty()`, which preserves source compatibility for existing implementations.
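The shape of this interface extension can be sketched as follows. This is a hypothetical, self-contained mirror rather than Flink's actual `ChangelogContext`. The other members of the real interface are omitted, and `TableSemantics` is reduced to a stand-in with a single `partitionByColumns()` method. The sketch shows why the change is source compatible: an implementation written before the extension keeps compiling and simply sees empty results.

```java
import java.util.Optional;

/** Hypothetical mirror of the extended context interface; not Flink's actual types. */
public class ChangelogContextSketch {

    /** Stand-in for table semantics: only the partition columns are modeled. */
    public interface TableSemantics {
        int[] partitionByColumns();
    }

    public interface ChangelogContext {
        // The real interface has further members; they are omitted in this sketch.

        /** Literal value of the argument at pos, if the caller can provide it. */
        default <T> Optional<T> getArgumentValue(int pos, Class<T> clazz) {
            return Optional.empty();
        }

        /** Semantics of the table argument at pos, if the caller can provide them. */
        default Optional<TableSemantics> getTableSemantics(int pos) {
            return Optional.empty();
        }
    }

    /** An implementation written before the extension: it overrides nothing new and still compiles. */
    public static class LegacyContext implements ChangelogContext {}

    public static void main(String[] args) {
        ChangelogContext ctx = new LegacyContext();
        System.out.println(ctx.getArgumentValue(1, String.class).isPresent()); // false
        System.out.println(ctx.getTableSemantics(0).isPresent());              // false
    }
}
```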
   
   The planner-side wrapper in `FlinkChangelogModeInferenceProgram` delegates the two new methods to the underlying `CallContext`.
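The delegation can be sketched as follows, again using hypothetical stand-ins. `CallContext`, `TableSemantics`, `PlannerChangelogContext`, and `fakeCallContext()` are simplified for illustration and are not the planner's real classes. The wrapper overrides the empty defaults and forwards both calls, so the strategy sees whatever the planner has resolved. The argument positions and the literal value are made up.

```java
import java.util.Optional;

/** Hypothetical sketch of a wrapper that forwards the two new context methods. */
public class DelegatingContextSketch {

    /** Stand-in for table semantics: only the partition columns are modeled. */
    public interface TableSemantics {
        int[] partitionByColumns();
    }

    /** Stand-in for the planner's call context, reduced to the two methods used here. */
    public interface CallContext {
        <T> Optional<T> getArgumentValue(int pos, Class<T> clazz);

        Optional<TableSemantics> getTableSemantics(int pos);
    }

    /** Stand-in for the extended changelog context with empty defaults. */
    public interface ChangelogContext {
        default <T> Optional<T> getArgumentValue(int pos, Class<T> clazz) {
            return Optional.empty();
        }

        default Optional<TableSemantics> getTableSemantics(int pos) {
            return Optional.empty();
        }
    }

    /** Planner-side wrapper: overrides the defaults and delegates to the call context. */
    public static final class PlannerChangelogContext implements ChangelogContext {
        private final CallContext callContext;

        public PlannerChangelogContext(CallContext callContext) {
            this.callContext = callContext;
        }

        @Override
        public <T> Optional<T> getArgumentValue(int pos, Class<T> clazz) {
            return callContext.getArgumentValue(pos, clazz);
        }

        @Override
        public Optional<TableSemantics> getTableSemantics(int pos) {
            return callContext.getTableSemantics(pos);
        }
    }

    /** Fake call context: argument 1 is a string literal, argument 0 is partitioned by column 0. */
    public static CallContext fakeCallContext() {
        return new CallContext() {
            @Override
            public <T> Optional<T> getArgumentValue(int pos, Class<T> clazz) {
                Object value = (pos == 1) ? "some-literal" : null;
                // isInstance(null) is false, so unknown positions yield an empty Optional.
                return clazz.isInstance(value) ? Optional.of(clazz.cast(value)) : Optional.empty();
            }

            @Override
            public Optional<TableSemantics> getTableSemantics(int pos) {
                if (pos != 0) {
                    return Optional.empty();
                }
                TableSemantics semantics = () -> new int[] {0};
                return Optional.of(semantics);
            }
        };
    }

    public static void main(String[] args) {
        ChangelogContext ctx = new PlannerChangelogContext(fakeCallContext());
        System.out.println(ctx.getArgumentValue(1, String.class).orElse("<none>")); // some-literal
        System.out.println(ctx.getTableSemantics(0).isPresent()); // true
        System.out.println(ctx.getTableSemantics(1).isPresent()); // false
    }
}
```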
   
   Upsert mode uses full deletes (`ChangelogMode.upsert(false)`) because the runtime forwards each input delete row with all fields populated; only the `RowKind` is rewritten. Declaring full deletes therefore matches what the runtime actually emits and avoids forcing downstream operators to reconstruct rows from state.
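The per-row behavior that justifies full deletes can be sketched as follows. The op code selects the new kind, and the payload fields are forwarded untouched, so a `DELETE` row carries every field rather than only the key. `Row`, `Kind`, `convert`, and the op codes are hypothetical. Rejecting unmapped op codes is a choice of this sketch, not a statement about the runtime.

```java
import java.util.List;
import java.util.Map;

/** Hypothetical sketch: only the row kind is rewritten, all fields are forwarded. */
public class FullDeleteSketch {

    /** Stand-in for Flink's RowKind. */
    public enum Kind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    /** Hypothetical output row: a kind plus all payload fields. */
    public static final class Row {
        public final Kind kind;
        public final List<Object> fields;

        Row(Kind kind, List<Object> fields) {
            this.kind = kind;
            this.fields = fields;
        }

        @Override
        public String toString() {
            return kind + " " + fields;
        }
    }

    /** Maps the op code to a kind and keeps every field, including for DELETE rows. */
    public static Row convert(String opCode, List<Object> fields, Map<String, Kind> opMapping) {
        Kind kind = opMapping.get(opCode);
        if (kind == null) {
            // This sketch rejects unmapped codes; the real runtime's handling is not modeled here.
            throw new IllegalArgumentException("unmapped op code: " + opCode);
        }
        return new Row(kind, fields);
    }

    public static void main(String[] args) {
        Map<String, Kind> opMapping =
                Map.of("c", Kind.INSERT, "u", Kind.UPDATE_AFTER, "d", Kind.DELETE);
        Row deleted = convert("d", List.<Object>of(42, "alice", 7.5), opMapping);
        System.out.println(deleted); // DELETE [42, alice, 7.5]
    }
}
```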
   
   The upsert key derivation in `FlinkRelMdUniqueKeys.getPtfUniqueKeys` already returns the partition columns when a PTF emits an upsert changelog, so no metadata changes are needed.
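The following hypothetical sketch illustrates why the partition columns can serve as the upsert key. It first derives the key, returning the `PARTITION BY` columns only for upsert output. It then materializes an upsert changelog by that key: `INSERT` and `UPDATE_AFTER` overwrite the row for the key, and a full `DELETE` removes it. None of these names come from Flink. Returning no key for non-upsert output is a simplification of this sketch.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of "partition columns = upsert key"; not Flink's metadata code. */
public class UpsertKeySketch {

    public enum Kind { INSERT, UPDATE_AFTER, DELETE }

    /** Upsert output is keyed by its partition columns; this sketch returns no key otherwise. */
    public static int[] upsertKey(boolean emitsUpsert, int[] partitionColumns) {
        return emitsUpsert ? partitionColumns.clone() : new int[0];
    }

    /** Applies an upsert changelog to a table keyed by the given key columns. */
    public static Map<List<Object>, List<Object>> materialize(
            List<Map.Entry<Kind, List<Object>>> changelog, int[] keyColumns) {
        Map<List<Object>, List<Object>> table = new HashMap<>();
        for (Map.Entry<Kind, List<Object>> change : changelog) {
            List<Object> row = change.getValue();
            List<Object> key = new ArrayList<>();
            for (int col : keyColumns) {
                key.add(row.get(col));
            }
            if (change.getKey() == Kind.DELETE) {
                table.remove(key); // a full delete also carries the key fields
            } else {
                table.put(key, row); // INSERT and UPDATE_AFTER both overwrite by key
            }
        }
        return table;
    }

    public static void main(String[] args) {
        int[] key = upsertKey(true, new int[] {0});
        List<Map.Entry<Kind, List<Object>>> changelog = List.of(
                Map.entry(Kind.INSERT, List.<Object>of("alice", 1)),
                Map.entry(Kind.INSERT, List.<Object>of("bob", 5)),
                Map.entry(Kind.UPDATE_AFTER, List.<Object>of("alice", 2)),
                Map.entry(Kind.DELETE, List.<Object>of("bob", 5)));
        System.out.println(materialize(changelog, key)); // {[alice]=[alice, 2]}
    }
}
```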
   
   
   ## Verifying this change
   
   Changes are backed by semantics tests and plan verification tests:
   
     - Added semantics tests in `FromChangelogTestProgram`
     - Added tests in `FromChangelogTest` to verify the generated plans and upsert keys
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): (no)
     - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / no)
     - The serializers: (no)
     - The runtime per-record code paths (performance sensitive): (no)
     - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
     - The S3 file system connector: (no)
   
   ## Documentation
   
     - Does this pull request introduce a new feature? (yes)
     - If yes, how is the feature documented? (docs / JavaDocs)
   
   ---
   
   ##### Was generative AI tooling used to co-author this PR?
   
   - [X] Yes (please specify the tool below)
   
   
   Generated-by: Opus 4.7


-- 
This is an automated message from the Apache Git Service.