Myracle opened a new pull request, #27554: URL: https://github.com/apache/flink/pull/27554
## What is the purpose of the change

This pull request introduces a new configuration option, `table.exec.source.rowtime-null-handling`, to handle null rowtime fields during watermark generation in Flink Table SQL.

Previously, when the rowtime field was null, the `WatermarkAssignerOperator` threw a `RuntimeException`. This could fail jobs in scenarios where null rowtime values are expected (e.g., CDC sources with missing timestamps).

This change lets users choose one of three strategies:

- `FAIL` (default): throw an exception (maintains backward compatibility).
- `DROP`: silently drop the record and increment a metric counter.
- `SKIP_WATERMARK`: forward the record without advancing the watermark and increment a metric counter.

## Brief change log

- Added the configuration option `table.exec.source.rowtime-null-handling` to `ExecutionConfigOptions`, backed by the enum `RowtimeNullHandling`.
- Modified `WatermarkAssignerOperator` to handle null rowtime fields according to the configured strategy.
- Added two metrics, `numNullRowtimeRecordsDropped` and `numNullRowtimeRecordsSkipped`, to track null rowtime handling.
- Extended `WatermarkAssignerOperatorFactory` to pass the null-handling configuration.
- Updated `StreamExecWatermarkAssigner` to read and apply the configuration.
- Added unit tests for all three strategies.
- Updated the documentation (English and Chinese) for the new configuration option.

## Verifying this change

Please make sure both new and modified tests in this PR follow [the conventions for tests defined in our code quality guide](https://flink.apache.org/how-to-contribute/code-style-and-quality-common/#7-testing).
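To make the behavior the tests check concrete, here is a minimal standalone sketch of the per-record decision for the three strategies. It is an illustration under stated assumptions, not the code in this PR: the class `RowtimeNullHandlingSketch`, its methods, and the exception message are hypothetical. Plain `long` fields stand in for the Flink metric counters `numNullRowtimeRecordsDropped` and `numNullRowtimeRecordsSkipped`. The watermark is simplified to "maximum rowtime seen," ignoring the watermark expression and periodic emission.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical, standalone model of null-rowtime handling. NOT Flink's WatermarkAssignerOperator:
 * long fields stand in for metric counters, and the watermark is just the max rowtime seen.
 */
public class RowtimeNullHandlingSketch {

    /** Mirrors the three values of the RowtimeNullHandling enum described in this PR. */
    enum Strategy { FAIL, DROP, SKIP_WATERMARK }

    private final Strategy strategy;
    private final List<Long> forwarded = new ArrayList<>(); // rowtimes of forwarded records
    private long currentWatermark = Long.MIN_VALUE;
    private long numNullRowtimeRecordsDropped = 0;
    private long numNullRowtimeRecordsSkipped = 0;

    RowtimeNullHandlingSketch(Strategy strategy) {
        this.strategy = strategy;
    }

    /** Processes one record; a null argument models a record whose rowtime field is null. */
    void processElement(Long rowtime) {
        if (rowtime == null) {
            switch (strategy) {
                case FAIL:
                    // Hypothetical message; the real operator's wording may differ.
                    throw new RuntimeException("Rowtime field is null; consider setting "
                            + "'table.exec.source.rowtime-null-handling' to DROP or SKIP_WATERMARK.");
                case DROP:
                    numNullRowtimeRecordsDropped++;
                    return; // record is not forwarded
                case SKIP_WATERMARK:
                    numNullRowtimeRecordsSkipped++;
                    forwarded.add(null); // record is forwarded; watermark is left untouched
                    return;
                default:
                    throw new IllegalStateException("Unknown strategy: " + strategy);
            }
        }
        // Non-null rowtime: advance the simplified watermark and forward the record.
        currentWatermark = Math.max(currentWatermark, rowtime);
        forwarded.add(rowtime);
    }

    long currentWatermark() { return currentWatermark; }
    List<Long> forwarded() { return forwarded; }
    long dropped() { return numNullRowtimeRecordsDropped; }
    long skipped() { return numNullRowtimeRecordsSkipped; }

    public static void main(String[] args) {
        Long[] input = {1000L, null, 3000L, null};

        RowtimeNullHandlingSketch drop = new RowtimeNullHandlingSketch(Strategy.DROP);
        for (Long r : input) drop.processElement(r);
        // prints "DROP: forwarded=[1000, 3000] watermark=3000 dropped=2"
        System.out.println("DROP: forwarded=" + drop.forwarded()
                + " watermark=" + drop.currentWatermark() + " dropped=" + drop.dropped());

        RowtimeNullHandlingSketch skip = new RowtimeNullHandlingSketch(Strategy.SKIP_WATERMARK);
        for (Long r : input) skip.processElement(r);
        // prints "SKIP_WATERMARK: forwarded=[1000, null, 3000, null] watermark=3000 skipped=2"
        System.out.println("SKIP_WATERMARK: forwarded=" + skip.forwarded()
                + " watermark=" + skip.currentWatermark() + " skipped=" + skip.skipped());

        RowtimeNullHandlingSketch fail = new RowtimeNullHandlingSketch(Strategy.FAIL);
        try {
            for (Long r : input) fail.processElement(r);
        } catch (RuntimeException e) {
            System.out.println("FAIL: " + e.getMessage()); // thrown at the first null rowtime
        }
    }
}
```

In this model, `DROP` and `SKIP_WATERMARK` reach the same watermark. The difference is whether the null-rowtime records still reach downstream operators, which is what the two separate counters distinguish.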
This change added tests and can be verified as follows:

- Added unit test `testNullRowtimeWithFailStrategy` to verify that the `FAIL` strategy throws an exception with a helpful error message.
- Added unit test `testNullRowtimeWithDropStrategy` to verify that records with a null rowtime are dropped.
- Added unit test `testNullRowtimeDropMetricCounter` to verify that the `DROP` strategy handles multiple null records.
- Added unit test `testNullRowtimeSkipMetricCounter` to verify that the `SKIP_WATERMARK` strategy forwards records without advancing the watermark.
- Added unit test `testNullRowtimeWithSkipWatermarkStrategy` to verify that the watermark is not affected by null rowtime records when using `SKIP_WATERMARK`.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): **no**
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: **yes** (adds the new `@PublicEvolving` enum `RowtimeNullHandling` and a new configuration option)
- The serializers: **no**
- The runtime per-record code paths (performance sensitive): **yes** (adds a null check and a switch statement in `WatermarkAssignerOperator.processElement()`; the impact should be minimal because the null check is a single field access)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: **no**
- The S3 file system connector: **no**

## Documentation

- Does this pull request introduce a new feature? **yes**
- If yes, how is the feature documented? **docs / JavaDocs** (updated `time_attributes.md` in both English and Chinese, added the option to `execution_config_configuration.html`, and added JavaDoc for the new configuration option and enum)

This is an automated message from the Apache Git Service.
