jubins opened a new pull request, #265:
URL: https://github.com/apache/flink-connector-kafka/pull/265

   ## What is the purpose of the change
   
   Fixes [FLINK-39837](https://issues.apache.org/jira/browse/FLINK-39837). 
Dynamic Kafka sources are vulnerable to metadata service instability during 
distributed system operations such as rollouts or restarts. When Kafka cluster 
metadata oscillates between versions (e.g., n → n-1 → n), the source may detect 
a partition or cluster removal at version n-1 and immediately purge the 
affected splits from the checkpoint. When version n reappears, those splits are 
treated as brand new, and consumption restarts from the starting offset for new 
splits (EARLIEST) instead of the last known position. Replaying from EARLIEST 
causes duplicate processing, and a later starting offset would instead skip 
records (data loss).
   
   This PR integrates the framework-level tombstone retention feature (from 
apache/flink#28318) into the Kafka connector's dynamic source enumerator. When 
splits are removed due to metadata changes, they are now retained for a 
configurable duration. If they reappear within the retention window, their 
consumption progress is restored.
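
   The retain-and-resurrect cycle, covering the record, resurrect, and clean-up steps described in the change log, can be sketched in plain Java. This is a minimal, hypothetical model, not the connector's code. The names `TombstoneTracker`, `Tombstone`, `onSplitRemoved`, `onSplitReappeared` and `purgeExpired` are illustrative. Each split is reduced to a string id plus a single `long` offset. The expiry rule is also assumed: a tombstone expires once `retentionMs` has elapsed, so a retention of 0 disables it. The framework-level pieces from apache/flink#28318 are left out entirely.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalLong;

/**
 * Hypothetical sketch of tombstone retention: when metadata drops a split, keep its last
 * known offset and removal time; if the split reappears before the retention window
 * closes, hand the offset back instead of treating the split as brand new.
 */
final class TombstoneTracker {

    /** Illustrative stand-in for a removed split's retained state (not the PR's RemovedSplitInfo). */
    private static final class Tombstone {
        final long lastOffset;
        final long removedAtMs;

        Tombstone(long lastOffset, long removedAtMs) {
            this.lastOffset = lastOffset;
            this.removedAtMs = removedAtMs;
        }
    }

    private final long retentionMs;
    private final Map<String, Tombstone> tombstones = new HashMap<>();

    TombstoneTracker(long retentionMs) {
        if (retentionMs < 0) {
            throw new IllegalArgumentException("retentionMs must be >= 0");
        }
        this.retentionMs = retentionMs;
    }

    /** Assumed rule: a tombstone expires once retentionMs has elapsed since removal. */
    private boolean isExpired(Tombstone t, long nowMs) {
        return nowMs - t.removedAtMs >= retentionMs;
    }

    /** Metadata refresh no longer lists the split: record a tombstone unless retention is disabled. */
    void onSplitRemoved(String splitId, long lastOffset, long nowMs) {
        if (retentionMs > 0) {
            tombstones.put(splitId, new Tombstone(lastOffset, nowMs));
        }
    }

    /**
     * Metadata refresh lists the split again: return its retained offset if the tombstone is
     * still live, otherwise empty (the caller then falls back to the usual starting offset).
     */
    OptionalLong onSplitReappeared(String splitId, long nowMs) {
        Tombstone t = tombstones.remove(splitId);
        if (t == null || isExpired(t, nowMs)) {
            return OptionalLong.empty();
        }
        return OptionalLong.of(t.lastOffset);
    }

    /** On checkpoint completion: drop every tombstone whose retention window has closed. */
    void purgeExpired(long nowMs) {
        tombstones.values().removeIf(t -> isExpired(t, nowMs));
    }

    int tombstoneCount() {
        return tombstones.size();
    }
}
```

   Consider the n → n-1 → n oscillation with a 24-hour window in this sketch. A partition dropped at n-1 and listed again a minute later resumes from its retained offset. With retention set to 0, no tombstone is recorded, so the split starts over exactly as it does today.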
   
   ## Brief change log
   
   - Added `REMOVED_SPLITS_RETENTION_MS` configuration option in 
`DynamicKafkaSourceOptions` (default: 0 ms, which disables retention and keeps 
the existing behavior; recommended: 86400000 ms, i.e. 24 hours)
   - Extended `AssignmentStatus` enum with `TOMBSTONED` status to track 
removed-but-retained splits
   - Created `RemovedSplitInfo` class to store a removed split's metadata 
together with its removal timestamp
   - Extended `KafkaSourceEnumState` to track tombstoned splits alongside 
active splits
   - Updated `KafkaSourceEnumStateSerializer` to VERSION_4 with tombstone 
serialization:
     - `serializeV4()` — persists active splits and tombstoned splits
     - `deserializeVersion4()` — restores both active and tombstoned state
     - Full backward compatibility with VERSION_0/1/2/3 checkpoints
   - Modified `DynamicKafkaSourceEnumerator` to implement tombstone logic:
     - Record tombstones when topics/partitions are filtered out during 
metadata refresh
     - Attempt split resurrection when removed topics/partitions reappear
     - Clean up expired tombstones on checkpoint completion
   - All existing tests pass, indicating no regression in existing behavior
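
   The versioned checkpoint layout can be illustrated with a simplified serializer in plain Java. It only mirrors the `getVersion()` / `deserialize(int version, byte[])` shape of Flink's `SimpleVersionedSerializer` and does not implement that interface. Several details are assumptions and do not describe the actual `KafkaSourceEnumStateSerializer` format: the byte layout, the string split ids, and the `Status` codes, including the value chosen for `TOMBSTONED`. The same goes for the rule that only VERSION_4 payloads may carry removal timestamps. VERSION_0 through VERSION_2 are omitted for brevity.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical status codes; the real AssignmentStatus values may differ. */
enum Status {
    ASSIGNED(0), UNASSIGNED_INITIAL(1), TOMBSTONED(2);

    final int code;

    Status(int code) {
        this.code = code;
    }

    static Status ofCode(int code) {
        for (Status s : values()) {
            if (s.code == code) {
                return s;
            }
        }
        throw new IllegalArgumentException("Unknown status code: " + code);
    }
}

/** Simplified enumerator state: every split with its status, plus removal times of tombstones. */
final class EnumStateSketch {
    final Map<String, Status> splits = new LinkedHashMap<>();
    final Map<String, Long> removedAtMs = new LinkedHashMap<>();

    void tombstone(String splitId, long removalTimestampMs) {
        splits.put(splitId, Status.TOMBSTONED);
        removedAtMs.put(splitId, removalTimestampMs);
    }
}

final class EnumStateSerializerSketch {
    static final int VERSION_3 = 3; // legacy layout in this sketch: never contains tombstones
    static final int VERSION_4 = 4; // current layout: tombstones carry a removal timestamp

    int getVersion() {
        return VERSION_4;
    }

    byte[] serialize(EnumStateSketch state) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(state.splits.size());
            for (Map.Entry<String, Status> e : state.splits.entrySet()) {
                out.writeUTF(e.getKey());
                out.writeInt(e.getValue().code);
                if (e.getValue() == Status.TOMBSTONED) {
                    out.writeLong(state.removedAtMs.get(e.getKey())); // only tombstones carry a timestamp
                }
            }
        } catch (IOException ex) {
            throw new UncheckedIOException(ex);
        }
        return bytes.toByteArray();
    }

    EnumStateSketch deserialize(int version, byte[] serialized) {
        if (version != VERSION_3 && version != VERSION_4) {
            throw new IllegalArgumentException("Unsupported version: " + version);
        }
        EnumStateSketch state = new EnumStateSketch();
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(serialized))) {
            int count = in.readInt();
            for (int i = 0; i < count; i++) {
                String splitId = in.readUTF();
                Status status = Status.ofCode(in.readInt());
                if (status == Status.TOMBSTONED) {
                    if (version < VERSION_4) {
                        throw new IllegalStateException("TOMBSTONED is not valid before VERSION_4");
                    }
                    state.tombstone(splitId, in.readLong());
                } else {
                    state.splits.put(splitId, status);
                }
            }
        } catch (IOException ex) {
            throw new UncheckedIOException(ex);
        }
        return state;
    }
}
```

   In this sketch, a VERSION_3 payload is a VERSION_4 payload that never contains `TOMBSTONED` entries. A single read path therefore suffices, and the version is consulted only when a tombstone appears. If the real VERSION_4 layout differs structurally from VERSION_3, each version needs its own read path, which separate methods such as `deserializeVersion4()` make possible.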
   
   ## Verifying this change
   
   This change will be covered by the following new unit tests, which are not 
yet part of this PR:
   
   - `testTombstoneCreationOnMetadataChange()` — verifies tombstones are 
created when topics are removed
   - `testSplitResurrectionWithinWindow()` — validates split progress 
restoration when topics reappear
   - `testTombstoneExpiration()` — confirms expired tombstones are cleaned up
   - `testSerializationWithTombstones()` — verifies checkpoint state with 
tombstones
   - `testBackwardCompatibilityRestoration()` — ensures new serializer can 
restore old checkpoints
   - `testRetentionDisabledByDefault()` — confirms default behavior 
(retention=0) is unchanged
   
   Existing `DynamicKafkaSourceEnumeratorTest` and 
`KafkaSourceEnumStateSerializerTest` continue to pass.
   
   ## Does this pull request potentially affect one of the following parts
   
   - **Dependencies**: no new dependencies, but this change requires a Flink 
version that includes apache/flink#28318
   - **Public API**: no — all changes are in `@Internal` classes
   - **The serializers**: yes — `KafkaSourceEnumStateSerializer` bumped to 
VERSION_4 with full backward compatibility
   - **Runtime per-record code paths**: no — tombstone operations occur only 
during metadata discovery/checkpointing
   - **Deployment or recovery**: minimal — checkpoint state size may increase 
slightly when retention is enabled and splits are removed, but tombstones are 
time-bounded and self-cleaning
   - **Kafka connector**: yes — this is the primary affected component
   
   ## Documentation
   
   - [x] Inline JavaDoc on `RemovedSplitInfo` explaining tombstone semantics
   - [x] Configuration documentation in 
`DynamicKafkaSourceOptions.REMOVED_SPLITS_RETENTION_MS`
   - [ ] User-facing docs (follow-up PR after both framework and connector 
changes are merged)
   
   ## Dependencies
   
   **This PR depends on apache/flink#28318 being merged first.** The connector 
changes rely on framework-level tombstone support in `SplitAssignmentTracker` 
and `SourceCoordinatorSerdeUtils`.
   
   ## Was generative AI tooling used to co-author this PR?
   - [x] Yes — Claude Code was used as a pair-programming assistant for 
discussing the architecture, reviewing implementation patterns, and structuring 
the tombstone retention logic. All code was written, understood, and verified 
by the author.
   
   **Generated-by:** Claude Opus 4.8


-- 
This is an automated message from the Apache Git Service.