jubins opened a new pull request, #4410: URL: https://github.com/apache/flink-cdc/pull/4410
## What is the purpose of the change

Fixes FLINK-39478. When `scan.newly-added-table.enabled=true` and a job restarts mid-snapshot of a newly-added table, the affected table can silently stop emitting events for the rest of the job's lifetime.

The root cause is `MySqlSnapshotSplitAssigner.checkpointIdToFinish`, the field that gates the `NEWLY_ADDED_ASSIGNING → NEWLY_ADDED_ASSIGNING_SNAPSHOT_FINISHED` transition. It is not persisted: it is `@Nullable` and absent from `SnapshotPendingSplitsState`. On restore it must therefore be re-derived over two more checkpoint cycles (one `snapshotState()` to set it, one `notifyCheckpointComplete()` to act on it). If a second restart lands inside that recovery window, the field resets and the state machine stalls. The table remains in the pipeline config and the job appears healthy, but it produces zero data from then on. This was reported in production on two ~100M-row tables. The non-deterministic "survived the first restart, died on the second" pattern is consistent with the timing race described above.

This change persists `checkpointIdToFinish` into the checkpointed state so the state machine can resume in one step after restore. It also adds enumerator-side metrics (`assignerStatus` and aggregate snapshot-progress counters) so operators can observe whether the assigner is making progress during long newly-added-table snapshots. The lack of such metrics made the original incident extremely hard to diagnose.

The companion fix for the multi-step `BinlogSplitUpdateRequest/Ack` handshake between states 3→4 (fix direction #2 in the JIRA) is deferred to a follow-up, because it touches the enumerator↔reader RPC protocol and has a larger blast radius. This PR addresses what we believe is the most likely root cause of the observed incidents, plus the observability gap.

## Brief change log

- Added `@Nullable Long checkpointIdToFinish` to `SnapshotPendingSplitsState` (flink-connector-mysql-cdc), with a new 11-arg constructor. The pre-existing 10-arg constructor is preserved as a delegating overload that defaults the new field to `null`, so all existing call sites compile unchanged and no test fixture data needs to change.
- `snapshotState()` in `MySqlSnapshotSplitAssigner` now sets `checkpointIdToFinish` **before** building the state object, so the value is captured in the same checkpoint rather than the next one. This eliminates the two-checkpoint recovery window after a restart.
- Bumped `PendingSplitsStateSerializer` from `VERSION = 5` to `VERSION = 6`. The serialize path writes a trailing `boolean present`, followed by the `long value` when present. The deserialize path reads the trailer only when `version >= 6`. v5 payloads continue to deserialize with `checkpointIdToFinish=null`, so no savepoint migration is required.
- New `MySqlSourceEnumeratorMetrics` (flink-connector-mysql-cdc) exposes the following gauges on the enumerator metric group: `assignerStatus` (status code), `assignerStatusName` (status name), `numRemainingTables`, `numRemainingSnapshotSplits`, `numAssignedSnapshotSplits`, `numFinishedSnapshotSplits`, and `numAlreadyProcessedTables`. They are registered from `MySqlSourceEnumerator#start` inside a try/catch, so a metric-registration failure can never break enumerator startup.
- Added five `default int get*Count()` accessors to `MySqlSplitAssigner` (`getRemainingSplitsCount`, `getRemainingTablesCount`, `getAssignedSplitsCount`, `getFinishedSplitsCount`, `getAlreadyProcessedTablesCount`). The defaults return `0`, preserving source compatibility for any external implementor. They are overridden in `MySqlSnapshotSplitAssigner` and delegated in `MySqlHybridSplitAssigner`. `MySqlBinlogSplitAssigner` uses the no-op defaults, since its phase has no snapshot work to report.
- Extended `PendingSplitsStateSerializerTest` with a new round-trip case carrying a non-null `checkpointIdToFinish`, plus a v5-payload backward-compatibility test confirming that the field reads back as `null` on the old code path.
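To make the restore window concrete, here is a hypothetical, heavily simplified model of the assigner's finish logic. It contrasts the old ordering (state object built first, `checkpointIdToFinish` never persisted) with the new one (field recorded first and carried in the state). `PendingState`, `SketchAssigner`, and the `persistCheckpointId` switch are invented for illustration. The transition condition is an assumption modeled on the description above, not the connector's code, and the real state carries many more fields.

```java
// Hypothetical sketch: NOT the flink-cdc classes, only the finish-gate logic.
enum AssignerStatus {
    NEWLY_ADDED_ASSIGNING,
    NEWLY_ADDED_ASSIGNING_SNAPSHOT_FINISHED
}

/** Stand-in for SnapshotPendingSplitsState, reduced to the fields this sketch needs. */
final class PendingState {
    final AssignerStatus status;
    final boolean allSnapshotSplitsFinished;
    final Long checkpointIdToFinish; // null = not recorded

    PendingState(AssignerStatus status, boolean allSnapshotSplitsFinished, Long checkpointIdToFinish) {
        this.status = status;
        this.allSnapshotSplitsFinished = allSnapshotSplitsFinished;
        this.checkpointIdToFinish = checkpointIdToFinish;
    }
}

/** Stand-in for the assigner; persistCheckpointId=false models the old behaviour. */
final class SketchAssigner {
    private final boolean persistCheckpointId;
    private final boolean allSnapshotSplitsFinished;
    private AssignerStatus status;
    private Long checkpointIdToFinish;

    SketchAssigner(boolean persistCheckpointId, PendingState restored) {
        this.persistCheckpointId = persistCheckpointId;
        this.status = restored.status;
        this.allSnapshotSplitsFinished = restored.allSnapshotSplitsFinished;
        // Old behaviour: the field is never in the state, so every restore resets it to null.
        this.checkpointIdToFinish = persistCheckpointId ? restored.checkpointIdToFinish : null;
    }

    PendingState snapshotState(long checkpointId) {
        if (!persistCheckpointId) {
            // Old ordering: build the state first, then record the id (too late to be captured).
            PendingState state = new PendingState(status, allSnapshotSplitsFinished, null);
            recordCheckpointIdIfReady(checkpointId);
            return state;
        }
        // New ordering: record the id first so this same checkpoint captures it.
        recordCheckpointIdIfReady(checkpointId);
        return new PendingState(status, allSnapshotSplitsFinished, checkpointIdToFinish);
    }

    void notifyCheckpointComplete(long checkpointId) {
        if (checkpointIdToFinish != null
                && status == AssignerStatus.NEWLY_ADDED_ASSIGNING
                && allSnapshotSplitsFinished
                && checkpointId >= checkpointIdToFinish) {
            status = AssignerStatus.NEWLY_ADDED_ASSIGNING_SNAPSHOT_FINISHED;
        }
    }

    AssignerStatus status() {
        return status;
    }

    Long checkpointIdToFinish() {
        return checkpointIdToFinish;
    }

    private void recordCheckpointIdIfReady(long checkpointId) {
        if (checkpointIdToFinish == null
                && status == AssignerStatus.NEWLY_ADDED_ASSIGNING
                && allSnapshotSplitsFinished) {
            checkpointIdToFinish = checkpointId;
        }
    }
}
```

In this model, a restored old-style assigner only transitions after a `snapshotState(n)` / `notifyCheckpointComplete(n)` pair completes without interruption, and a restart between those two calls discards the id again. A restored new-style assigner already holds the id, so the next `notifyCheckpointComplete` is enough.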
## Verifying this change

This change persists one additional field and registers new metric gauges. Apart from the reordering inside `snapshotState()`, no existing data path or split-assignment logic is altered.

Verification:

- A new parameterized test variant in `PendingSplitsStateSerializerTest` (`getTestSnapshotPendingSplitsStateWithCheckpointIdToFinish`) covers snapshot- and hybrid-state round-trips with a non-null `checkpointIdToFinish` (42L). All 31 tests in the class pass.
- The new `testDeserializeV5MissingCheckpointIdToFinish` asserts that a payload deserialized under the previous version number reads `checkpointIdToFinish=null` and does not attempt to read the v6 trailer. This exercises the version-gated read branch.
- The existing `equals`/`hashCode` round-trip in `PendingSplitsStateSerializerTest` now also exercises the new field via state equality (the field is included in both methods).
- `mvn test-compile` passes for the whole `flink-connector-mysql-cdc` module, confirming that the new interface defaults don't break any existing implementor (including the test-only assigner subclasses).
- `mvn spotless:check` passes on the changed module.

Note: `MySqlSnapshotSplitAssignerTest` and `MySqlHybridSplitAssignerTest` could not be run locally, because they start MySQL via Testcontainers and Docker was not available. They should pass unchanged in CI; none of their fixture construction needed updating, thanks to the preserved 10-arg constructor overload.

A deterministic IT case that injects a restart between `snapshotState()` setting `checkpointIdToFinish` and `notifyCheckpointComplete()` acting on it (i.e., a regression test that fails on `master` and passes on this branch) is the natural next step, but it is outside the scope of this PR. Happy to add it as a follow-up if reviewers prefer; it requires some thought about how to schedule the restart deterministically inside the vulnerable window without flakiness.
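The version-gated read that these serializer tests exercise can be illustrated with a stripped-down serializer. This is a hypothetical sketch: `SketchState`, `SketchStateSerializer`, and `serializeV5` are invented names, a single `int` stands in for every pre-v6 field, and the real wire format is not reproduced. It assumes, as with Flink's `SimpleVersionedSerializer#deserialize(int version, byte[] serialized)`, that the version number is supplied from outside the payload rather than read from it.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Objects;

/** Hypothetical stand-in for the pending-splits state; remainingSplits represents all pre-v6 fields. */
final class SketchState {
    final int remainingSplits;
    final Long checkpointIdToFinish; // nullable, new in v6

    SketchState(int remainingSplits, Long checkpointIdToFinish) {
        this.remainingSplits = remainingSplits;
        this.checkpointIdToFinish = checkpointIdToFinish;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof SketchState)) {
            return false;
        }
        SketchState that = (SketchState) o;
        // The new field takes part in equality, so round-trip tests cover it.
        return remainingSplits == that.remainingSplits
                && Objects.equals(checkpointIdToFinish, that.checkpointIdToFinish);
    }

    @Override
    public int hashCode() {
        return Objects.hash(remainingSplits, checkpointIdToFinish);
    }
}

/** Hypothetical serializer showing only the trailing, version-gated field. */
final class SketchStateSerializer {
    static final int VERSION = 6;

    byte[] serialize(SketchState state) {
        return write(state, true);
    }

    /** Emulates a payload written before the upgrade: body only, no trailer. */
    byte[] serializeV5(SketchState state) {
        return write(state, false);
    }

    SketchState deserialize(int version, byte[] serialized) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(serialized))) {
            int remainingSplits = in.readInt();
            Long checkpointIdToFinish = null;
            // Only v6+ payloads carry the trailer; older versions stop after the body.
            if (version >= 6 && in.readBoolean()) {
                checkpointIdToFinish = in.readLong();
            }
            return new SketchState(remainingSplits, checkpointIdToFinish);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    private static byte[] write(SketchState state, boolean withTrailer) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(state.remainingSplits); // pre-v6 body (simplified)
            if (withTrailer) {
                // v6 trailer: presence flag, then the value only if present.
                out.writeBoolean(state.checkpointIdToFinish != null);
                if (state.checkpointIdToFinish != null) {
                    out.writeLong(state.checkpointIdToFinish);
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }
}
```

Appending the field at the end, behind a presence flag, is what lets the old read path stay untouched: a v5 reader simply stops before the trailer, and a v6 reader only consumes it when the version says it exists.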
## Does this pull request potentially affect one of the following parts

- **Dependencies (does it add or upgrade a dependency):** no
- **The public API, i.e., is any changed class annotated with `@Public(Evolving)`:** the `MySqlSplitAssigner` interface is `@Internal`, not `@PublicEvolving`. The new `default` methods are nonetheless source- and binary-compatible for any external implementor (the defaults return `0`). `SnapshotPendingSplitsState` is also internal to the connector. No `@PublicEvolving` surface is touched.
- **The serializers:** yes. `PendingSplitsStateSerializer` is bumped from version 5 to version 6, and the new payload appends a `boolean` plus an optional `long`. The deserializer accepts versions 1–6; v5 payloads deserialize unchanged (with `checkpointIdToFinish=null`), so existing savepoints and checkpoints restore without migration.
- **The runtime per-record code paths (performance sensitive):** no. `checkpointIdToFinish` is read and written only at checkpoint and restore time. The metric gauges are evaluated lazily, only when a metric reporter polls them, and their suppliers are `O(1)` (size lookups plus a field read).
- **Anything that affects deployment or recovery (JobManager, Checkpointing, Kubernetes/Yarn, ZooKeeper):** yes. The checkpointed enumerator state schema changes, but additively and backward-compatibly; existing checkpoints and savepoints restore cleanly via the v5 read path.
- **The S3 file system connector:** no

## Documentation

- **Does this pull request introduce a new feature?** It introduces new enumerator metrics (`assignerStatus`, plus aggregate snapshot-progress gauges) on the MySQL CDC source. The state-persistence change is a bug fix rather than a feature.
- **If yes, how is the feature documented?** Inline Javadoc on `MySqlSourceEnumeratorMetrics` and on each metric-name constant documents the contract. The metric names follow the same convention as the existing `SourceEnumeratorMetrics` in `flink-cdc-base`.
  No standalone docs page change is required for this PR; if maintainers prefer, the MySQL connector's metrics table in the docs can be extended in a follow-up.

## Was generative AI tooling used to co-author this PR?

- [x] Yes. Claude was used as a pair-programming assistant for discussing the approach, locating the fragile state-machine windows in the code, and drafting the implementation. All code was reviewed, understood, and verified by the author.

Generated-by: Claude Opus 4.7
