This is an automated email from the ASF dual-hosted git repository.

corgy pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 1925e802d8 [Fix][Connector-V2][MongoDB-CDC] Fix NPE when 
heartbeat.interval.ms is enabled (#10477)
1925e802d8 is described below

commit 1925e802d82cdd79a1ae1db9f701bcf013659cf4
Author: miracle <[email protected]>
AuthorDate: Wed Feb 25 14:47:41 2026 +0800

    [Fix][Connector-V2][MongoDB-CDC] Fix NPE when heartbeat.interval.ms is 
enabled (#10477)
---
 .../source/fetch/MongodbFetchTaskContext.java      |  17 ++
 .../source/fetch/MongodbStreamFetchTask.java       |  29 +-
 .../utils/MongodbRecordUtilsHeartbeatTest.java     | 334 +++++++++++++++++++++
 3 files changed, 379 insertions(+), 1 deletion(-)

diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/source/fetch/MongodbFetchTaskContext.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/source/fetch/MongodbFetchTaskContext.java
index 13a4f6c8c3..a8d5538181 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/source/fetch/MongodbFetchTaskContext.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/source/fetch/MongodbFetchTaskContext.java
@@ -72,6 +72,7 @@ import static 
org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.utils.Mongod
 import static 
org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.utils.MongodbRecordUtils.extractBsonDocument;
 import static 
org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.utils.MongodbRecordUtils.getDocumentKey;
 import static 
org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.utils.MongodbRecordUtils.getResumeToken;
+import static 
org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.utils.MongodbRecordUtils.isHeartbeatEvent;
 import static 
org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.utils.MongodbUtils.createMongoClient;
 
 @Slf4j
@@ -166,6 +167,22 @@ public class MongodbFetchTaskContext implements 
FetchTask.Context {
     public boolean isRecordBetween(
             SourceRecord record, @Nonnull Object[] splitStart, @Nonnull 
Object[] splitEnd) {
         BsonDocument documentKey = getDocumentKey(record);
+        if (documentKey == null) {
+            if (isHeartbeatEvent(record)) {
+                log.debug(
+                        "Heartbeat record has no documentKey field, skipping 
range check. Record: {}",
+                        record);
+                return false;
+            }
+            log.warn(
+                    "Non-heartbeat record has no documentKey field, this is 
unexpected. Record: {}",
+                    record);
+            throw new MongodbConnectorException(
+                    ILLEGAL_ARGUMENT,
+                    "Record has no documentKey field but is not a heartbeat 
event. "
+                            + "This indicates an unexpected record type: "
+                            + record);
+        }
         BsonDocument splitKeys = (BsonDocument) splitStart[0];
         String firstKey = splitKeys.getFirstKey();
         BsonValue keyValue = documentKey.get(firstKey);
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/source/fetch/MongodbStreamFetchTask.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/source/fetch/MongodbStreamFetchTask.java
index 474f3a203e..2d90c0eeea 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/source/fetch/MongodbStreamFetchTask.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/source/fetch/MongodbStreamFetchTask.java
@@ -57,6 +57,8 @@ import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
 import java.time.Instant;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Optional;
 
 import static 
org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT;
@@ -67,6 +69,7 @@ import static 
org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.Mongo
 import static 
org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceConstants.DOCUMENT_KEY;
 import static 
org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceConstants.FAILED_TO_PARSE_ERROR;
 import static 
org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceConstants.FALSE_FALSE;
+import static 
org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceConstants.HEARTBEAT_KEY_FIELD;
 import static 
org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceConstants.ID_FIELD;
 import static 
org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceConstants.ILLEGAL_OPERATION_ERROR;
 import static 
org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceConstants.NS_FIELD;
@@ -382,15 +385,39 @@ public class MongodbStreamFetchTask implements 
FetchTask<SourceSplitBase> {
         return new BsonDocument(ID_FIELD, primaryKey);
     }
 
+    /**
+     * Normalizes a heartbeat record by adding the HEARTBEAT=true flag to its 
offset.
+     *
+     * <p>The original heartbeat record from {@link HeartbeatManager} does not 
contain the HEARTBEAT
+     * flag in its offset, which causes {@link 
MongodbRecordUtils#isHeartbeatEvent} to return {@code
+     * false}. This would lead to the heartbeat record being incorrectly 
identified as a data change
+     * record and processed through {@link 
MongodbFetchTaskContext#isRecordBetween}, where a {@link
+     * NullPointerException} would occur because heartbeat records have no 
documentKey field.
+     *
+     * <p>By adding the HEARTBEAT=true flag, we ensure that:
+     *
+     * <ul>
+     *   <li>{@link MongodbRecordUtils#isHeartbeatEvent} returns {@code true}
+     *   <li>{@link MongodbRecordUtils#isDataChangeRecord} returns {@code 
false}
+     *   <li>The heartbeat record is excluded from range checking in {@link
+     *       MongodbFetchTaskContext#isRecordBetween}
+     * </ul>
+     *
+     * @param heartbeatRecord the original heartbeat record from 
HeartbeatManager
+     * @return a normalized heartbeat record with HEARTBEAT=true in its offset
+     */
     @Nonnull
     private SourceRecord normalizeHeartbeatRecord(@Nonnull SourceRecord 
heartbeatRecord) {
         final Struct heartbeatValue =
                 new Struct(SchemaBuilder.struct().field(TS_MS_FIELD, 
Schema.INT64_SCHEMA).build());
         heartbeatValue.put(TS_MS_FIELD, Instant.now().toEpochMilli());
 
+        Map<String, Object> heartbeatOffset = new 
HashMap<>(heartbeatRecord.sourceOffset());
+        heartbeatOffset.put(HEARTBEAT_KEY_FIELD, "true");
+
         return new SourceRecord(
                 heartbeatRecord.sourcePartition(),
-                heartbeatRecord.sourceOffset(),
+                heartbeatOffset,
                 heartbeatRecord.topic(),
                 heartbeatRecord.keySchema(),
                 heartbeatRecord.key(),
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/test/java/mongodb/utils/MongodbRecordUtilsHeartbeatTest.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/test/java/mongodb/utils/MongodbRecordUtilsHeartbeatTest.java
new file mode 100644
index 0000000000..7bc0350996
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/test/java/mongodb/utils/MongodbRecordUtilsHeartbeatTest.java
@@ -0,0 +1,334 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package mongodb.utils;
+
+import 
org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceConfig;
+import 
org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.exception.MongodbConnectorException;
+import 
org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.source.dialect.MongodbDialect;
+import 
org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.source.fetch.MongodbFetchTaskContext;
+import 
org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.source.offset.ChangeStreamDescriptor;
+import 
org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.utils.MongodbRecordUtils;
+import 
org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.utils.MongodbUtils;
+
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.source.SourceRecord;
+
+import org.bson.BsonDocument;
+import org.bson.BsonInt32;
+import org.bson.BsonMaxKey;
+import org.bson.BsonMinKey;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import com.mongodb.client.MongoClient;
+
+import java.time.Instant;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import static 
org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceConstants.DOCUMENT_KEY;
+import static 
org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceConstants.HEARTBEAT_KEY_FIELD;
+import static 
org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceConstants.ID_FIELD;
+import static 
org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceConstants.NS_FIELD;
+import static 
org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceConstants.TS_MS_FIELD;
+
+/**
+ * Tests for heartbeat record handling in MongoDB CDC.
+ *
+ * <p>Verifies that heartbeat records (produced when {@code 
heartbeat.interval.ms > 0}) are
+ * correctly identified by {@link MongodbRecordUtils#isHeartbeatEvent} and 
excluded from data change
+ * processing by {@link MongodbRecordUtils#isDataChangeRecord}.
+ */
+@ExtendWith(MockitoExtension.class)
+public class MongodbRecordUtilsHeartbeatTest {
+
+    @Mock private MongodbSourceConfig mockConfig;
+    @Mock private MongodbDialect mockDialect;
+    @Mock private ChangeStreamDescriptor mockDescriptor;
+    @Mock private MongoClient mockMongoClient;
+
+    private MockedStatic<MongodbUtils> mockedMongodbUtils;
+    private MongodbFetchTaskContext fetchTaskContext;
+
+    @BeforeEach
+    void setUp() {
+        mockedMongodbUtils = Mockito.mockStatic(MongodbUtils.class);
+        mockedMongodbUtils
+                .when(() -> MongodbUtils.createMongoClient(mockConfig))
+                .thenReturn(mockMongoClient);
+        fetchTaskContext = new MongodbFetchTaskContext(mockDialect, 
mockConfig, mockDescriptor);
+    }
+
+    @AfterEach
+    void tearDown() {
+        if (mockedMongodbUtils != null) {
+            mockedMongodbUtils.close();
+        }
+    }
+
+    private SourceRecord createHeartbeatRecord(boolean withHeartbeatFlag) {
+        Map<String, Object> sourcePartition =
+                Collections.singletonMap(
+                        NS_FIELD, 
"mongodb://localhost:27017/__mongodb_heartbeats");
+
+        Map<String, String> sourceOffset = new HashMap<>();
+        sourceOffset.put(ID_FIELD, "{\"_data\": \"test-resume-token\"}");
+        if (withHeartbeatFlag) {
+            sourceOffset.put(HEARTBEAT_KEY_FIELD, "true");
+        }
+
+        Schema valueSchema = SchemaBuilder.struct().field(TS_MS_FIELD, 
Schema.INT64_SCHEMA).build();
+        Struct heartbeatValue = new Struct(valueSchema);
+        heartbeatValue.put(TS_MS_FIELD, Instant.now().toEpochMilli());
+
+        return new SourceRecord(
+                sourcePartition,
+                sourceOffset,
+                "__mongodb_heartbeats",
+                null,
+                null,
+                valueSchema,
+                heartbeatValue);
+    }
+
+    @Test
+    @DisplayName("isHeartbeatEvent should return true when offset contains 
HEARTBEAT=true")
+    void testIsHeartbeatEventReturnsTrueWithFlag() {
+        SourceRecord heartbeatRecord = createHeartbeatRecord(true);
+
+        boolean result = MongodbRecordUtils.isHeartbeatEvent(heartbeatRecord);
+
+        Assertions.assertTrue(result);
+    }
+
+    @Test
+    @DisplayName("isDataChangeRecord should return false for heartbeat record 
with flag")
+    void testIsDataChangeRecordReturnsFalseForHeartbeat() {
+        SourceRecord heartbeatRecord = createHeartbeatRecord(true);
+
+        boolean result = 
MongodbRecordUtils.isDataChangeRecord(heartbeatRecord);
+
+        Assertions.assertFalse(result);
+    }
+
+    @Test
+    @DisplayName("getDocumentKey should return null for heartbeat record (no 
documentKey field)")
+    void testGetDocumentKeyReturnsNullForHeartbeatRecord() {
+        SourceRecord heartbeatRecord = createHeartbeatRecord(true);
+
+        BsonDocument documentKey = 
MongodbRecordUtils.getDocumentKey(heartbeatRecord);
+
+        Assertions.assertNull(documentKey);
+    }
+
+    @Test
+    @DisplayName(
+            "isHeartbeatEvent should return false when offset lacks HEARTBEAT 
flag"
+                    + " (old buggy heartbeat record)")
+    void testIsHeartbeatEventReturnsFalseWithoutFlag() {
+        SourceRecord heartbeatRecord = createHeartbeatRecord(false);
+
+        boolean result = MongodbRecordUtils.isHeartbeatEvent(heartbeatRecord);
+
+        Assertions.assertFalse(result);
+    }
+
+    @Test
+    @DisplayName(
+            "isDataChangeRecord incorrectly returns true for heartbeat record 
without flag"
+                    + " (old buggy behavior)")
+    void testIsDataChangeRecordReturnsTrueForHeartbeatWithoutFlag() {
+        SourceRecord heartbeatRecord = createHeartbeatRecord(false);
+
+        boolean result = 
MongodbRecordUtils.isDataChangeRecord(heartbeatRecord);
+
+        // Without the HEARTBEAT flag, the record is misidentified as a data 
change record.
+        // This demonstrates why the fix in normalizeHeartbeatRecord is 
necessary.
+        Assertions.assertTrue(result);
+    }
+
+    @Test
+    @DisplayName("isRecordBetween should return false for heartbeat record 
with null documentKey")
+    void testIsRecordBetweenReturnsFalseForHeartbeat() {
+        // Given
+        SourceRecord heartbeatRecord = createHeartbeatRecord(true);
+
+        BsonDocument splitKeyDoc = new BsonDocument("_id", new BsonInt32(1));
+        BsonDocument lowerBound = new BsonDocument("_id", new BsonInt32(0));
+        BsonDocument upperBound = new BsonDocument("_id", new BsonInt32(100));
+        Object[] splitStart = new Object[] {splitKeyDoc, lowerBound};
+        Object[] splitEnd = new Object[] {splitKeyDoc, upperBound};
+
+        // When
+        boolean result = fetchTaskContext.isRecordBetween(heartbeatRecord, 
splitStart, splitEnd);
+
+        // Then
+        Assertions.assertFalse(
+                result,
+                "isRecordBetween should return false for heartbeat record"
+                        + " with null documentKey");
+    }
+
+    @Test
+    @DisplayName(
+            "isRecordBetween should throw MongodbConnectorException"
+                    + " for non-heartbeat record with null documentKey")
+    void testIsRecordBetweenThrowsForNonHeartbeatWithNullDocumentKey() {
+        // A record without HEARTBEAT flag and without documentKey field
+        // simulates an unexpected record type that should not be silently 
swallowed.
+        SourceRecord nonHeartbeatRecord = createHeartbeatRecord(false);
+
+        BsonDocument splitKeyDoc = new BsonDocument("_id", new BsonInt32(1));
+        BsonDocument lowerBound = new BsonDocument("_id", new BsonInt32(0));
+        BsonDocument upperBound = new BsonDocument("_id", new BsonInt32(100));
+        Object[] splitStart = new Object[] {splitKeyDoc, lowerBound};
+        Object[] splitEnd = new Object[] {splitKeyDoc, upperBound};
+
+        MongodbConnectorException exception =
+                Assertions.assertThrows(
+                        MongodbConnectorException.class,
+                        () ->
+                                fetchTaskContext.isRecordBetween(
+                                        nonHeartbeatRecord, splitStart, 
splitEnd));
+        Assertions.assertTrue(
+                exception.getMessage().contains("not a heartbeat event"),
+                "Exception message should indicate the record is not a 
heartbeat event");
+    }
+
+    // ======================== isRecordBetween range check tests 
========================
+
+    /**
+     * Creates a normal data change SourceRecord with a documentKey containing 
the given _id value.
+     */
+    private SourceRecord createDataChangeRecord(int idValue) {
+        Map<String, Object> sourcePartition =
+                Collections.singletonMap(NS_FIELD, 
"mongodb://localhost:27017/testdb.testcoll");
+
+        Map<String, String> sourceOffset = new HashMap<>();
+        sourceOffset.put(ID_FIELD, "{\"_data\": \"test-resume-token\"}");
+
+        Schema valueSchema =
+                SchemaBuilder.struct()
+                        .field(DOCUMENT_KEY, Schema.OPTIONAL_STRING_SCHEMA)
+                        .field(TS_MS_FIELD, Schema.INT64_SCHEMA)
+                        .build();
+        Struct value = new Struct(valueSchema);
+        value.put(DOCUMENT_KEY, new BsonDocument("_id", new 
BsonInt32(idValue)).toJson());
+        value.put(TS_MS_FIELD, Instant.now().toEpochMilli());
+
+        return new SourceRecord(
+                sourcePartition, sourceOffset, "testdb.testcoll", null, null, 
valueSchema, value);
+    }
+
+    @Test
+    @DisplayName("isRecordBetween should return true when documentKey is 
within split range")
+    void testIsRecordBetweenReturnsTrueForRecordInRange() {
+        SourceRecord record = createDataChangeRecord(50);
+
+        BsonDocument splitKeyDoc = new BsonDocument("_id", new BsonInt32(1));
+        BsonDocument lowerBound = new BsonDocument("_id", new BsonInt32(0));
+        BsonDocument upperBound = new BsonDocument("_id", new BsonInt32(100));
+        Object[] splitStart = new Object[] {splitKeyDoc, lowerBound};
+        Object[] splitEnd = new Object[] {splitKeyDoc, upperBound};
+
+        boolean result = fetchTaskContext.isRecordBetween(record, splitStart, 
splitEnd);
+
+        Assertions.assertTrue(result, "Record with _id=50 should be within 
range [0, 100)");
+    }
+
+    @Test
+    @DisplayName("isRecordBetween should return false when documentKey is 
outside split range")
+    void testIsRecordBetweenReturnsFalseForRecordOutOfRange() {
+        SourceRecord record = createDataChangeRecord(200);
+
+        BsonDocument splitKeyDoc = new BsonDocument("_id", new BsonInt32(1));
+        BsonDocument lowerBound = new BsonDocument("_id", new BsonInt32(0));
+        BsonDocument upperBound = new BsonDocument("_id", new BsonInt32(100));
+        Object[] splitStart = new Object[] {splitKeyDoc, lowerBound};
+        Object[] splitEnd = new Object[] {splitKeyDoc, upperBound};
+
+        boolean result = fetchTaskContext.isRecordBetween(record, splitStart, 
splitEnd);
+
+        Assertions.assertFalse(result, "Record with _id=200 should be outside 
range [0, 100)");
+    }
+
+    @Test
+    @DisplayName("isRecordBetween should return true for full range (MIN_KEY 
to MAX_KEY)")
+    void testIsRecordBetweenReturnsTrueForFullRange() {
+        SourceRecord record = createDataChangeRecord(999);
+
+        BsonDocument splitKeyDoc = new BsonDocument("_id", new BsonInt32(1));
+        BsonDocument lowerBound = new BsonDocument("_id", new BsonMinKey());
+        BsonDocument upperBound = new BsonDocument("_id", new BsonMaxKey());
+        Object[] splitStart = new Object[] {splitKeyDoc, lowerBound};
+        Object[] splitEnd = new Object[] {splitKeyDoc, upperBound};
+
+        boolean result = fetchTaskContext.isRecordBetween(record, splitStart, 
splitEnd);
+
+        Assertions.assertTrue(result, "Any record should be within full range 
[MIN_KEY, MAX_KEY)");
+    }
+
+    @Test
+    @DisplayName(
+            "isRecordBetween should return false when documentKey equals upper 
bound"
+                    + " (upper bound exclusive)")
+    void testIsRecordBetweenUpperBoundExclusive() {
+        SourceRecord record = createDataChangeRecord(100);
+
+        BsonDocument splitKeyDoc = new BsonDocument("_id", new BsonInt32(1));
+        BsonDocument lowerBound = new BsonDocument("_id", new BsonInt32(0));
+        BsonDocument upperBound = new BsonDocument("_id", new BsonInt32(100));
+        Object[] splitStart = new Object[] {splitKeyDoc, lowerBound};
+        Object[] splitEnd = new Object[] {splitKeyDoc, upperBound};
+
+        boolean result = fetchTaskContext.isRecordBetween(record, splitStart, 
splitEnd);
+
+        Assertions.assertFalse(
+                result, "Record with _id=100 should be excluded (upper bound 
is exclusive)");
+    }
+
+    @Test
+    @DisplayName(
+            "isRecordBetween should return true when documentKey equals lower 
bound"
+                    + " (lower bound inclusive)")
+    void testIsRecordBetweenLowerBoundInclusive() {
+        SourceRecord record = createDataChangeRecord(0);
+
+        BsonDocument splitKeyDoc = new BsonDocument("_id", new BsonInt32(1));
+        BsonDocument lowerBound = new BsonDocument("_id", new BsonInt32(0));
+        BsonDocument upperBound = new BsonDocument("_id", new BsonInt32(100));
+        Object[] splitStart = new Object[] {splitKeyDoc, lowerBound};
+        Object[] splitEnd = new Object[] {splitKeyDoc, upperBound};
+
+        boolean result = fetchTaskContext.isRecordBetween(record, splitStart, 
splitEnd);
+
+        Assertions.assertTrue(
+                result, "Record with _id=0 should be included (lower bound is 
inclusive)");
+    }
+}

Reply via email to