jubins opened a new pull request, #28318:
URL: https://github.com/apache/flink/pull/28318
## What is the purpose of the change
Fixes **FLINK-39837** — Dynamic Kafka sources (and other dynamic sources)
are vulnerable to metadata service instability during distributed system
operations like rollouts or restarts. When metadata configs oscillate between
versions (e.g., `n → n-1 → n`), the source may detect a partition/cluster
removal at version `n-1` and immediately purge it from the checkpoint. When
version `n` reappears, the split is treated as brand new and consumption resets
to `EARLIEST`, causing duplicate processing or data loss.
This PR introduces **configurable tombstone-based retention** at the
framework level (`SplitAssignmentTracker`) to make dynamic sources resilient to
temporary metadata inconsistencies. When a split is removed, it can be retained
in checkpoint state for a configurable duration (e.g., 24 hours). If the split
reappears within the retention window, its progress is restored instead of
resetting to the beginning.
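
The retention behavior described above can be sketched in isolation. This is a minimal standalone illustration, not the code in this PR. The class name `TombstoneSketch`, the `String` split IDs, the injected `LongSupplier` clock, and the `tombstoneCount()` helper are assumptions made for the example. Only the three method names and `RemovedSplitInfo` are taken from this description, and their real signatures may differ.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.LongSupplier;

/** Hypothetical standalone sketch; not Flink's SplitAssignmentTracker. */
class TombstoneSketch<SplitT> {

    /** A removed split, when it was removed, and which subtask last owned it. */
    static final class RemovedSplitInfo<T> {
        final T split;
        final long removalTimestampMs;
        final int lastSubtaskId;

        RemovedSplitInfo(T split, long removalTimestampMs, int lastSubtaskId) {
            this.split = split;
            this.removalTimestampMs = removalTimestampMs;
            this.lastSubtaskId = lastSubtaskId;
        }
    }

    private final long retentionMs;
    private final LongSupplier clock; // assumed: injected so tests can control time
    private final Map<String, RemovedSplitInfo<SplitT>> tombstones = new HashMap<>();

    TombstoneSketch(long retentionMs, LongSupplier clock) {
        this.retentionMs = retentionMs;
        this.clock = clock;
    }

    /** Records a tombstone; a retention of 0 keeps the old "purge immediately" behavior. */
    void markSplitRemoved(String splitId, SplitT split, int subtaskId) {
        if (retentionMs <= 0) {
            return;
        }
        tombstones.put(splitId, new RemovedSplitInfo<>(split, clock.getAsLong(), subtaskId));
    }

    /** Returns the retained split if it reappears within the window, otherwise null. */
    SplitT tryResurrectSplit(String splitId) {
        RemovedSplitInfo<SplitT> info = tombstones.remove(splitId);
        if (info == null) {
            return null;
        }
        boolean expired = clock.getAsLong() - info.removalTimestampMs > retentionMs;
        return expired ? null : info.split;
    }

    /** Drops every tombstone older than the retention window. */
    void cleanupExpiredTombstones() {
        long now = clock.getAsLong();
        tombstones.values().removeIf(info -> now - info.removalTimestampMs > retentionMs);
    }

    int tombstoneCount() {
        return tombstones.size();
    }
}
```

In this sketch a discovery loop would call `tryResurrectSplit` for each newly seen split ID and fall back to the source's default starting position only when it returns `null`.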
## Brief change log
- **Added `REMOVED_SPLITS_RETENTION_MS` configuration option** in
`SourceReaderOptions` (default: `0` for backward compatibility, recommended:
`86400000` for 24-hour retention in dynamic sources)
- **Extended `SplitAssignmentTracker`** with tombstone tracking:
  - New inner class `RemovedSplitInfo<SplitT>` stores removed splits with
    removal timestamp and last assigned subtask
  - `markSplitRemoved(splitId, split, subtaskId)`: records split removal as
    tombstone (if retention enabled)
  - `tryResurrectSplit(splitId)`: attempts to restore split from tombstone
    if within retention window
  - `cleanupExpiredTombstones()`: automatically removes expired entries on
    checkpoint completion
- **Updated checkpoint serialization** in `SourceCoordinatorSerdeUtils`:
  - Bumped version to `VERSION_2` to include tombstone state
  - Added `serializeTombstones()` and `deserializeTombstones()` methods
  - Maintains full backward compatibility with `VERSION_0` and `VERSION_1`
    checkpoints
- **Extended `SplitAssignmentTracker` snapshot/restore logic**:
  - `snapshotState()` now persists both assignments and tombstones
  - `restoreState()` gracefully handles old checkpoints without tombstones
    (backward compatibility)
- **Added constructor overload to `SourceCoordinatorContext`** accepting
`removedSplitsRetentionMs` parameter (default constructor preserves existing
behavior)
- **Comprehensive JavaDoc** with Kafka connector usage example showing how
to integrate `markSplitRemoved()` and `tryResurrectSplit()` in partition
discovery logic
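
The version bump can be illustrated with a simplified, hypothetical wire format. The sketch below is not the layout used by `SourceCoordinatorSerdeUtils`: the class `VersionedSerdeSketch`, its `State` holder, and the field order are assumptions, and it models only two versions. It shows the general pattern of writing a version header, appending the new tombstone section only for the newer version, and leaving tombstones empty when an older payload is read.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical format for illustration; not Flink's actual checkpoint layout. */
final class VersionedSerdeSketch {
    static final int VERSION_1 = 1; // assignments only
    static final int VERSION_2 = 2; // assignments followed by tombstones

    static final class State {
        final Map<String, Integer> assignments = new HashMap<>(); // splitId -> subtask
        final Map<String, Long> tombstones = new HashMap<>();     // splitId -> removal time (ms)
    }

    static byte[] serialize(State state, int version) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(version);
            out.writeInt(state.assignments.size());
            for (Map.Entry<String, Integer> e : state.assignments.entrySet()) {
                out.writeUTF(e.getKey());
                out.writeInt(e.getValue());
            }
            if (version >= VERSION_2) { // the tombstone section exists only from VERSION_2 on
                out.writeInt(state.tombstones.size());
                for (Map.Entry<String, Long> e : state.tombstones.entrySet()) {
                    out.writeUTF(e.getKey());
                    out.writeLong(e.getValue());
                }
            }
        } catch (IOException ex) {
            throw new UncheckedIOException(ex);
        }
        return bytes.toByteArray();
    }

    static State deserialize(byte[] data) {
        State state = new State();
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            int version = in.readInt();
            if (version < VERSION_1 || version > VERSION_2) {
                throw new IOException("Unsupported version: " + version);
            }
            int assignmentCount = in.readInt();
            for (int i = 0; i < assignmentCount; i++) {
                String splitId = in.readUTF();
                state.assignments.put(splitId, in.readInt());
            }
            if (version >= VERSION_2) { // older payloads simply restore with no tombstones
                int tombstoneCount = in.readInt();
                for (int i = 0; i < tombstoneCount; i++) {
                    String splitId = in.readUTF();
                    state.tombstones.put(splitId, in.readLong());
                }
            }
        } catch (IOException ex) {
            throw new UncheckedIOException(ex);
        }
        return state;
    }
}
```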
## Verifying this change
This change is covered by **8 new unit tests** in
`SplitAssignmentTrackerTest`:
- `testMarkSplitRemovedWithRetention()` — verifies tombstone creation when
retention is enabled
- `testMarkSplitRemovedWithoutRetention()` — confirms default behavior (no
tombstones) is unchanged
- `testTryResurrectSplitWithinRetention()` — validates split resurrection
within retention window
- `testTryResurrectSplitExpired()` — verifies expired tombstones return
`null` and are cleaned up
- `testTryResurrectNonExistentSplit()` — confirms graceful handling of
non-existent splits
- `testCleanupExpiredTombstones()` — validates automatic cleanup on
checkpoint completion
- `testSnapshotAndRestoreWithTombstones()` — verifies tombstones survive
checkpoint/restore cycles
- `testBackwardCompatibilityWithOldCheckpoints()` — confirms new tracker can
restore old-format checkpoints without tombstones
Existing tests in `SplitAssignmentTrackerTest` continue to pass, confirming
backward compatibility.
## Does this pull request potentially affect one of the following parts
- **Dependencies** (does it add or upgrade a dependency): **no**
- **The public API**, i.e., is any changed class annotated with
`@Public(Evolving)`: **no** — `SplitAssignmentTracker`,
`SourceCoordinatorSerdeUtils`, and `SourceCoordinatorContext` are all
`@Internal`. The new `SourceReaderOptions.REMOVED_SPLITS_RETENTION_MS` config
is `@PublicEvolving` (standard for config options).
- **The serializers**: **yes** — `SourceCoordinatorSerdeUtils` version
bumped to `VERSION_2` with full backward compatibility
- **The runtime per-record code paths** (performance sensitive): **no** —
tombstone operations occur only during split discovery/checkpointing, not
per-record processing
- **Anything that affects deployment or recovery** (JobManager,
Checkpointing, Kubernetes/Yarn, ZooKeeper): **minimal** — checkpoint state size
may increase slightly when retention is enabled and splits are removed, but
tombstones are bounded by the retention duration and are cleaned up automatically
- **The S3 file system connector**: **no**
## Documentation
- **Does this pull request introduce a new feature?** **yes** —
framework-level tombstone retention for dynamic sources
- **If yes, how is the feature documented?**
  - Inline JavaDoc on `SplitAssignmentTracker` with comprehensive usage
    example for Kafka connector developers
  - Config option documentation in
    `SourceReaderOptions.REMOVED_SPLITS_RETENTION_MS`
  - *(Follow-up PR will add user-facing docs once Kafka connector
    integration is complete)*
## Was generative AI tooling used to co-author this PR?
- [x] Yes — Claude Code was used as a pair-programming assistant for
discussing the architecture, reviewing implementation patterns, and structuring
the tombstone retention logic. All code was written, understood, and verified
by the author.
**Generated-by:** Claude Opus 4.8