onceMisery opened a new pull request, #10477: URL: https://github.com/apache/seatunnel/pull/10477
closes [https://github.com/apache/seatunnel/issues/10470](url) ## Purpose of this pull request When `heartbeat.interval.ms` is set to a non-zero value in MongoDB CDC connector, a `NullPointerException` is thrown during snapshot split processing. **Root Cause:** `MongodbStreamFetchTask.normalizeHeartbeatRecord()` passes through the original heartbeat record's `sourceOffset()` directly, which does not contain the `HEARTBEAT=true` flag. This causes the downstream `MongodbRecordUtils.isHeartbeatEvent()` to return `false`, and `isDataChangeRecord()` to incorrectly return `true`. The heartbeat record is then treated as a data change record and passed to `isRecordBetween()`, where `getDocumentKey()` returns `null` (heartbeat records have no `documentKey` field), leading to NPE at `documentKey.get(firstKey)`. **Call chain:** ``` IncrementalSourceScanFetcher.pollSplitRecordsIfExactlyOnce() -> isChangeRecordInChunkRange(record) -> taskContext.isDataChangeRecord(record) -> true (should be false) -> taskContext.isRecordBetween(record, ...) -> getDocumentKey(record) -> null -> documentKey.get(firstKey) -> NPE! ``` **Fix:** 1. **Root cause fix** (`MongodbStreamFetchTask.java`): In `normalizeHeartbeatRecord()`, copy the original offset and inject `HEARTBEAT=true` so that `isHeartbeatEvent()` can correctly identify heartbeat records. 2. **Defensive fix** (`MongodbFetchTaskContext.java`): In `isRecordBetween()`, add a null check for `documentKey` and return `false` if null, preventing NPE even if heartbeat identification is bypassed through other code paths. ## Does this PR introduce _any_ user-facing change? No. This is a bug fix for an existing feature. Users who set `heartbeat.interval.ms` to a non-zero value will no longer encounter NPE. ## How was this patch tested? Added unit test class `MongodbRecordUtilsHeartbeatTest` with 6 test cases: | Test Case | Description | |------------------------------------------------------------|-----------------------------------------------------------------------------------| | `testIsHeartbeatEventReturnsTrueWithFlag` | Verifies heartbeat record with `HEARTBEAT=true` in offset is correctly identified | | `testIsDataChangeRecordReturnsFalseForHeartbeat` | Verifies heartbeat record is excluded from data change processing | | `testGetDocumentKeyReturnsNullForHeartbeatRecord` | Verifies heartbeat record has no `documentKey` field | | `testIsHeartbeatEventReturnsFalseWithoutFlag` | Demonstrates old buggy behavior (heartbeat not identified without flag) | | `testIsDataChangeRecordReturnsTrueForHeartbeatWithoutFlag` | Demonstrates old buggy behavior (heartbeat misidentified as data change) | | `testNpeReproductionWithoutFlag` | Reproduces the original NPE scenario | All 6 tests pass: ``` [INFO] Running mongodb.utils.MongodbRecordUtilsHeartbeatTest [INFO] Tests run: 6, Failures: 0, Errors: 0, Skipped: 0 [INFO] BUILD SUCCESS ``` ## Check list * [x] If any new Jar binary package adding in your PR, please add License Notice according [New License Guide](https://github.com/apache/seatunnel/blob/dev/docs/en/contribution/new-license.md) * [x] If necessary, please update the documentation to describe the new feature. https://github.com/apache/seatunnel/tree/dev/docs * [x] If necessary, please update `incompatible-changes.md` to describe the incompatibility caused by this PR. * [x] If you are contributing the connector code, please check that the following files are updated: 1. Update [plugin-mapping.properties](https://github.com/apache/seatunnel/blob/dev/plugin-mapping.properties) and add new connector information in it 2. Update the pom file of [seatunnel-dist](https://github.com/apache/seatunnel/blob/dev/seatunnel-dist/pom.xml) 3. Add ci label in [label-scope-conf](https://github.com/apache/seatunnel/blob/dev/.github/workflows/labeler/label-scope-conf.yml) 4. Add e2e testcase in [seatunnel-e2e](https://github.com/apache/seatunnel/tree/dev/seatunnel-e2e/seatunnel-connector-v2-e2e/) 5. Update connector [plugin_config](https://github.com/apache/seatunnel/blob/dev/config/plugin_config) > Note: This PR is a bug fix only. No new connectors, no new Jar dependencies, no documentation changes, and no > incompatible changes are introduced. The checklist items above are not applicable to this PR. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
