jubins opened a new pull request, #265: URL: https://github.com/apache/flink-connector-kafka/pull/265
## What is the purpose of the change

Fixes [FLINK-39837](https://issues.apache.org/jira/browse/FLINK-39837): Dynamic Kafka sources are vulnerable to metadata service instability during distributed system operations like rollouts or restarts. When Kafka cluster metadata oscillates between versions (e.g., n → n-1 → n), the source may detect a partition/cluster removal at version n-1 and immediately purge it from the checkpoint. When version n reappears, the split is treated as brand new and consumption resets to EARLIEST, causing duplicate processing or data loss.

This PR integrates the framework-level tombstone retention feature (from apache/flink#28318) into the Kafka connector's dynamic source enumerator. When splits are removed due to metadata changes, they are now retained for a configurable duration. If they reappear within the retention window, their consumption progress is restored.

## Brief change log

- Added `REMOVED_SPLITS_RETENTION_MS` configuration option in `DynamicKafkaSourceOptions` (default: 0ms for backward compatibility, recommended: 86400000ms for 24-hour retention)
- Extended `AssignmentStatus` enum with `TOMBSTONED` status to track removed-but-retained splits
- Created `RemovedSplitInfo` class to store removed split metadata with removal timestamp
- Extended `KafkaSourceEnumState` to track tombstoned splits alongside active splits
- Updated `KafkaSourceEnumStateSerializer` to VERSION_4 with tombstone serialization:
  - `serializeV4()`: persists active splits and tombstoned splits
  - `deserializeVersion4()`: restores both active and tombstoned state
  - Full backward compatibility with VERSION_0/1/2/3 checkpoints
- Modified `DynamicKafkaSourceEnumerator` to implement tombstone logic:
  - Record tombstones when topics/partitions are filtered out during metadata refresh
  - Attempt split resurrection when removed topics/partitions reappear
  - Clean up expired tombstones on checkpoint completion
- All existing tests pass, confirming backward compatibility

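
For reviewers, the tombstone lifecycle described in the change log (record on removal, resurrect on reappearance, expire after the retention window) can be illustrated with a minimal, self-contained sketch. This is not the code in this PR: `TombstoneTracker`, its nested `Tombstone` class, and all method names are hypothetical, split progress is reduced to a single `long` offset, and the choice to treat a tombstone as expired once its age reaches the retention value is an assumption.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Hypothetical sketch of tombstone retention; not the connector's actual classes. */
final class TombstoneTracker {

    /** Last known offset of a removed split plus the time it was removed. */
    static final class Tombstone {
        final long offset;
        final long removedAtMs;

        Tombstone(long offset, long removedAtMs) {
            this.offset = offset;
            this.removedAtMs = removedAtMs;
        }
    }

    private final long retentionMs;
    private final Map<String, Long> activeOffsets = new HashMap<>();
    private final Map<String, Tombstone> tombstones = new HashMap<>();

    TombstoneTracker(long retentionMs) {
        this.retentionMs = retentionMs;
    }

    /** Records the latest progress of an active split. */
    void trackProgress(String splitId, long offset) {
        activeOffsets.put(splitId, offset);
    }

    /** Metadata no longer lists the split: keep a tombstone only if retention is enabled. */
    void onSplitRemoved(String splitId, long nowMs) {
        Long offset = activeOffsets.remove(splitId);
        if (offset != null && retentionMs > 0) {
            tombstones.put(splitId, new Tombstone(offset, nowMs));
        }
    }

    /**
     * Metadata lists the split again. Returns the restored offset if an unexpired
     * tombstone exists; an empty result means the caller treats the split as new.
     */
    Optional<Long> onSplitDiscovered(String splitId, long nowMs) {
        Tombstone tombstone = tombstones.remove(splitId);
        if (tombstone == null || isExpired(tombstone, nowMs)) {
            return Optional.empty();
        }
        activeOffsets.put(splitId, tombstone.offset);
        return Optional.of(tombstone.offset);
    }

    /** Drops expired tombstones, e.g. when a checkpoint completes. */
    void purgeExpired(long nowMs) {
        tombstones.values().removeIf(t -> isExpired(t, nowMs));
    }

    int tombstoneCount() {
        return tombstones.size();
    }

    // Assumption: a tombstone expires once its age reaches the retention value.
    private boolean isExpired(Tombstone tombstone, long nowMs) {
        return nowMs - tombstone.removedAtMs >= retentionMs;
    }
}
```

In this sketch, a split tracked at offset 42 and removed at time 100 with a retention of 1000 ms is restored to offset 42 if it is rediscovered at time 600. With a retention of 0, no tombstone is recorded and the split is treated as new, which mirrors the backward-compatible default described above.
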
## Verifying this change

This change will be covered by new unit tests (to be added):

- `testTombstoneCreationOnMetadataChange()`: verifies tombstones are created when topics are removed
- `testSplitResurrectionWithinWindow()`: validates split progress restoration when topics reappear
- `testTombstoneExpiration()`: confirms expired tombstones are cleaned up
- `testSerializationWithTombstones()`: verifies checkpoint state with tombstones
- `testBackwardCompatibilityRestoration()`: ensures new serializer can restore old checkpoints
- `testRetentionDisabledByDefault()`: confirms default behavior (retention=0) is unchanged

Existing `DynamicKafkaSourceEnumeratorTest` and `KafkaSourceEnumStateSerializerTest` continue to pass.

## Does this pull request potentially affect one of the following parts

- **Dependencies**: no (requires a Flink version with apache/flink#28318 merged)
- **Public API**: no, all changes are in `@Internal` classes
- **The serializers**: yes, `KafkaSourceEnumStateSerializer` bumped to VERSION_4 with full backward compatibility
- **Runtime per-record code paths**: no, tombstone operations occur only during metadata discovery/checkpointing
- **Deployment or recovery**: minimal; checkpoint state size may increase slightly when retention is enabled and splits are removed, but tombstones are time-bounded and self-cleaning
- **Kafka connector**: yes, this is the primary affected component

## Documentation

- [x] Inline JavaDoc on `RemovedSplitInfo` explaining tombstone semantics
- [x] Configuration documentation in `DynamicKafkaSourceOptions.REMOVED_SPLITS_RETENTION_MS`
- [ ] User-facing docs (follow-up PR after both framework and connector changes are merged)

## Dependencies

**This PR depends on apache/flink#28318 being merged first.** The connector changes rely on framework-level tombstone support in `SplitAssignmentTracker` and `SourceCoordinatorSerdeUtils`.

## Was generative AI tooling used to co-author this PR?

- [x] Yes: Claude Code was used as a pair-programming assistant for discussing the architecture, reviewing implementation patterns, and structuring the tombstone retention logic. All code was written, understood, and verified by the author.

**Generated-by:** Claude Opus 4.8

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at: [email protected]
