Myracle opened a new pull request, #27554:
URL: https://github.com/apache/flink/pull/27554

   ## What is the purpose of the change
   
   This pull request introduces a new configuration option 
`table.exec.source.rowtime-null-handling` to handle null rowtime fields during 
watermark generation in Flink Table SQL. Previously, when the rowtime field was 
null, the `WatermarkAssignerOperator` would throw a `RuntimeException`, which 
could cause job failures in scenarios where null rowtime values are expected 
(e.g., CDC sources with missing timestamps). 
   
   This change provides users with three configurable strategies:
   - `FAIL` (default): Throw an exception (maintains backward compatibility)
   - `DROP`: Silently drop the record and increment a metric counter
   - `SKIP_WATERMARK`: Forward the record without advancing the watermark and 
increment a metric counter
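
The three strategies can be sketched without any Flink dependency. The block below is a simplified model, not the code from this PR: `Row`, `Assigner`, and the "watermark is the largest rowtime seen" rule are hypothetical stand-ins, and plain `long` fields replace the real metric counters. Only the strategy names and the two counter names are taken from this description.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified, Flink-free model of the three null-rowtime strategies. */
final class RowtimeNullHandlingSketch {

    /** Strategy names taken from the PR description. */
    enum RowtimeNullHandling { FAIL, DROP, SKIP_WATERMARK }

    /** Hypothetical stand-in for a row: a payload plus a nullable rowtime (epoch millis). */
    static final class Row {
        final String payload;
        final Long rowtime;

        Row(String payload, Long rowtime) {
            this.payload = payload;
            this.rowtime = rowtime;
        }
    }

    /** Hypothetical stand-in for the operator; plain fields replace Flink metrics and outputs. */
    static final class Assigner {
        final RowtimeNullHandling strategy;
        final List<Row> forwarded = new ArrayList<>();
        // Assumption: the watermark is simply the largest rowtime seen so far.
        long watermark = Long.MIN_VALUE;
        long numNullRowtimeRecordsDropped = 0;
        long numNullRowtimeRecordsSkipped = 0;

        Assigner(RowtimeNullHandling strategy) {
            this.strategy = strategy;
        }

        void processElement(Row row) {
            if (row.rowtime == null) {
                switch (strategy) {
                    case DROP:
                        // Discard the record and count it.
                        numNullRowtimeRecordsDropped++;
                        return;
                    case SKIP_WATERMARK:
                        // Forward the record, count it, leave the watermark untouched.
                        numNullRowtimeRecordsSkipped++;
                        forwarded.add(row);
                        return;
                    case FAIL:
                    default:
                        throw new RuntimeException(
                                "Rowtime field is null (strategy=" + strategy + ")");
                }
            }
            // Non-null rowtime: forward the record and advance the watermark.
            long timestamp = row.rowtime;
            forwarded.add(row);
            watermark = Math.max(watermark, timestamp);
        }
    }

    public static void main(String[] args) {
        Assigner skip = new Assigner(RowtimeNullHandling.SKIP_WATERMARK);
        skip.processElement(new Row("a", 1_000L));
        skip.processElement(new Row("b", null));
        System.out.println("forwarded=" + skip.forwarded.size()
                + " watermark=" + skip.watermark
                + " skipped=" + skip.numNullRowtimeRecordsSkipped);
    }
}
```

In this model, `DROP` and `SKIP_WATERMARK` differ only in whether the null-rowtime record reaches downstream; neither moves the watermark.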
   
   ## Brief change log
   
   - Added new configuration option `table.exec.source.rowtime-null-handling` 
in `ExecutionConfigOptions` with enum `RowtimeNullHandling`
   - Modified `WatermarkAssignerOperator` to handle null rowtime fields based 
on the configured strategy
   - Added two new metrics: `numNullRowtimeRecordsDropped` and 
`numNullRowtimeRecordsSkipped` to track null rowtime handling
   - Extended `WatermarkAssignerOperatorFactory` to pass the null handling 
configuration
   - Updated `StreamExecWatermarkAssigner` to read and apply the configuration
   - Added comprehensive unit tests for all three strategies
   - Updated documentation (both English and Chinese) for the new configuration 
option
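
For orientation, a job would presumably opt in to a non-default strategy the same way as for any other table execution option. The fragment below is a sketch under that assumption: the key and the value `SKIP_WATERMARK` come from this description, the class name is hypothetical, and the setting only has an effect on a Flink build that includes this change.

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class RowtimeNullHandlingConfigExample {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Option key and value as named in this PR; unknown to Flink builds without it.
        tEnv.getConfig().set("table.exec.source.rowtime-null-handling", "SKIP_WATERMARK");
    }
}
```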
   
   ## Verifying this change
   
   Please make sure both new and modified tests in this PR follow [the 
conventions for tests defined in our code quality 
guide](https://flink.apache.org/how-to-contribute/code-style-and-quality-common/#7-testing).
   
   This change added tests and can be verified as follows:
   
   - Added unit test `testNullRowtimeWithFailStrategy` to verify that the FAIL 
strategy throws an exception with a helpful error message
   - Added unit test `testNullRowtimeWithDropStrategy` to verify that records 
with a null rowtime are dropped correctly
   - Added unit test `testNullRowtimeDropMetricCounter` to verify that the DROP 
strategy handles multiple null records
   - Added unit test `testNullRowtimeSkipMetricCounter` to verify that the 
SKIP_WATERMARK strategy forwards records without advancing the watermark
   - Added unit test `testNullRowtimeWithSkipWatermarkStrategy` to verify that 
the watermark is not affected by null rowtime records when using SKIP_WATERMARK
   
   ## Does this pull request potentially affect one of the following parts:
   
   - Dependencies (does it add or upgrade a dependency): **no**
   - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: **yes** (added new `@PublicEvolving` enum 
`RowtimeNullHandling` and configuration option)
   - The serializers: **no**
   - The runtime per-record code paths (performance sensitive): **yes** (added 
a null check and a switch statement in 
`WatermarkAssignerOperator.processElement()`; the impact is expected to be 
minimal because the null check is a single field access)
   - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: **no**
   - The S3 file system connector: **no**
   
   ## Documentation
   
   - Does this pull request introduce a new feature? **yes**
   - If yes, how is the feature documented? **docs / JavaDocs** (Updated 
`time_attributes.md` in both English and Chinese, added configuration 
documentation in `execution_config_configuration.html`, and added JavaDoc for 
the new configuration option and enum)
   

