onceMisery opened a new pull request, #10477:
URL: https://github.com/apache/seatunnel/pull/10477

   closes https://github.com/apache/seatunnel/issues/10470
   
   ## Purpose of this pull request
   
   When `heartbeat.interval.ms` is set to a non-zero value in the MongoDB CDC connector,
   a `NullPointerException` (NPE) is thrown during snapshot split processing.
   
   **Root Cause:**
   
   `MongodbStreamFetchTask.normalizeHeartbeatRecord()` passes the original heartbeat record's
   `sourceOffset()` through unchanged, and that offset does not contain the `HEARTBEAT=true` flag.
   As a result, the downstream `MongodbRecordUtils.isHeartbeatEvent()` returns `false` and
   `isDataChangeRecord()` incorrectly returns `true`. The heartbeat record is then treated as a
   data change record and passed to `isRecordBetween()`, where `getDocumentKey()` returns `null`
   (heartbeat records have no `documentKey` field), leading to an NPE at `documentKey.get(firstKey)`.
   
   **Call chain:**
   
   ```
   IncrementalSourceScanFetcher.pollSplitRecordsIfExactlyOnce()
     -> isChangeRecordInChunkRange(record)
       -> taskContext.isDataChangeRecord(record) -> true (should be false)
       -> taskContext.isRecordBetween(record, ...)
         -> getDocumentKey(record) -> null
         -> documentKey.get(firstKey) -> NPE!
   ```
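
The call chain above can be illustrated with a minimal, self-contained sketch. It is not the SeaTunnel code: the class `HeartbeatMisclassificationSketch`, the `resumeToken` offset entry, the plain `Map` types, and the simplified "anything that is not a heartbeat is a data change" rule are all assumptions made for illustration. Only the `HEARTBEAT` flag and the method names come from the description.

```java
import java.util.HashMap;
import java.util.Map;

public class HeartbeatMisclassificationSketch {
    // Assumed key name, taken from the HEARTBEAT=true flag in the description.
    static final String HEARTBEAT_KEY = "HEARTBEAT";

    // A record counts as a heartbeat only if its offset carries HEARTBEAT=true.
    public static boolean isHeartbeatEvent(Map<String, ?> sourceOffset) {
        Object flag = sourceOffset.get(HEARTBEAT_KEY);
        return flag != null && "true".equalsIgnoreCase(flag.toString());
    }

    // Simplification: every record that is not a heartbeat is a data change.
    public static boolean isDataChangeRecord(Map<String, ?> sourceOffset) {
        return !isHeartbeatEvent(sourceOffset);
    }

    // Pre-fix behaviour: no null check, so a null documentKey throws an NPE.
    public static boolean isRecordBetween(
            Map<String, Integer> documentKey, String firstKey, int lower, int upper) {
        int value = documentKey.get(firstKey);
        return value >= lower && value < upper;
    }

    public static void main(String[] args) {
        // Heartbeat offset passed through unchanged: the flag is missing.
        Map<String, String> offset = new HashMap<>();
        offset.put("resumeToken", "abc"); // hypothetical offset entry

        // Heartbeat records have no documentKey.
        Map<String, Integer> documentKey = null;

        if (isDataChangeRecord(offset)) { // true, although it should be false
            try {
                isRecordBetween(documentKey, "_id", 0, 10);
            } catch (NullPointerException e) {
                System.out.println("NPE reproduced");
            }
        }
    }
}
```

Running `main` takes the data-change branch because the flag is absent, and the range check then dereferences the null document key.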
   
   **Fix:**
   
   1. **Root cause fix** (`MongodbStreamFetchTask.java`): In `normalizeHeartbeatRecord()`, copy the original offset
      and inject `HEARTBEAT=true` so that `isHeartbeatEvent()` can correctly identify heartbeat records.
   2. **Defensive fix** (`MongodbFetchTaskContext.java`): In `isRecordBetween()`, add a null check for `documentKey`
      and return `false` if it is null, preventing the NPE even if heartbeat identification is bypassed through
      other code paths.
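
The two fixes can be sketched as follows. This is a simplified illustration under stated assumptions, not the patch itself: the class `HeartbeatFixSketch`, the helper `normalizeHeartbeatOffset`, the `resumeToken` entry, and the use of plain `Map` types in place of the connector's record and document classes are all hypothetical. Storing the flag as the string `"true"` is also an assumption.

```java
import java.util.HashMap;
import java.util.Map;

public class HeartbeatFixSketch {
    // Assumed key name, taken from the HEARTBEAT=true flag in the description.
    static final String HEARTBEAT_KEY = "HEARTBEAT";

    // Fix 1: copy the original offset and inject the heartbeat flag.
    // The original map is left untouched.
    public static Map<String, Object> normalizeHeartbeatOffset(Map<String, ?> originalOffset) {
        Map<String, Object> offset = new HashMap<String, Object>(originalOffset);
        offset.put(HEARTBEAT_KEY, "true");
        return offset;
    }

    // A record counts as a heartbeat only if its offset carries HEARTBEAT=true.
    public static boolean isHeartbeatEvent(Map<String, ?> sourceOffset) {
        Object flag = sourceOffset.get(HEARTBEAT_KEY);
        return flag != null && "true".equalsIgnoreCase(flag.toString());
    }

    // Fix 2: a record without a document key is never inside a chunk range.
    public static boolean isRecordBetween(
            Map<String, Integer> documentKey, String firstKey, int lower, int upper) {
        if (documentKey == null) {
            return false;
        }
        Integer value = documentKey.get(firstKey);
        return value != null && value >= lower && value < upper;
    }

    public static void main(String[] args) {
        Map<String, String> original = new HashMap<>();
        original.put("resumeToken", "abc"); // hypothetical offset entry

        Map<String, Object> normalized = normalizeHeartbeatOffset(original);
        System.out.println(isHeartbeatEvent(normalized));         // true
        System.out.println(original.containsKey(HEARTBEAT_KEY));  // false
        System.out.println(isRecordBetween(null, "_id", 0, 10));  // false
    }
}
```

Copying the offset before adding the flag keeps the original heartbeat record's offset unmodified, and the null check makes the range test safe on its own.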
   
   ## Does this PR introduce _any_ user-facing change?
   
   No. This is a bug fix for an existing feature. Users who set `heartbeat.interval.ms` to a non-zero value
   will no longer encounter the NPE.
   
   ## How was this patch tested?
   
   Added unit test class `MongodbRecordUtilsHeartbeatTest` with 6 test cases:
   
   | Test Case | Description |
   |-----------|-------------|
   | `testIsHeartbeatEventReturnsTrueWithFlag` | Verifies heartbeat record with `HEARTBEAT=true` in offset is correctly identified |
   | `testIsDataChangeRecordReturnsFalseForHeartbeat` | Verifies heartbeat record is excluded from data change processing |
   | `testGetDocumentKeyReturnsNullForHeartbeatRecord` | Verifies heartbeat record has no `documentKey` field |
   | `testIsHeartbeatEventReturnsFalseWithoutFlag` | Demonstrates old buggy behavior (heartbeat not identified without flag) |
   | `testIsDataChangeRecordReturnsTrueForHeartbeatWithoutFlag` | Demonstrates old buggy behavior (heartbeat misidentified as data change) |
   | `testNpeReproductionWithoutFlag` | Reproduces the original NPE scenario |
   
   All 6 tests pass:
   
   ```
   [INFO] Running mongodb.utils.MongodbRecordUtilsHeartbeatTest
   [INFO] Tests run: 6, Failures: 0, Errors: 0, Skipped: 0
   [INFO] BUILD SUCCESS
   ```
   
   ## Check list
   
   * [x] If any new Jar binary package is added in your PR, please add a License Notice according to the
     [New License Guide](https://github.com/apache/seatunnel/blob/dev/docs/en/contribution/new-license.md)
   * [x] If necessary, please update the documentation to describe the new
     feature. https://github.com/apache/seatunnel/tree/dev/docs
   * [x] If necessary, please update `incompatible-changes.md` to describe the incompatibility caused by this PR.
   * [x] If you are contributing the connector code, please check that the following files are updated:
       1. Update [plugin-mapping.properties](https://github.com/apache/seatunnel/blob/dev/plugin-mapping.properties) and
          add new connector information in it
       2. Update the pom file of [seatunnel-dist](https://github.com/apache/seatunnel/blob/dev/seatunnel-dist/pom.xml)
       3. Add ci label
          in [label-scope-conf](https://github.com/apache/seatunnel/blob/dev/.github/workflows/labeler/label-scope-conf.yml)
       4. Add e2e testcase
          in [seatunnel-e2e](https://github.com/apache/seatunnel/tree/dev/seatunnel-e2e/seatunnel-connector-v2-e2e/)
       5. Update connector [plugin_config](https://github.com/apache/seatunnel/blob/dev/config/plugin_config)
   
   > Note: This PR is a bug fix only. No new connectors, no new Jar dependencies, no documentation changes, and no
   > incompatible changes are introduced. The checklist items above are not applicable to this PR.
   

