Hi, I'm having some difficulty understanding the code in: https://github.com/apache/flink-cdc/blob/master/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/source/reader/fetch/MongoDBStreamFetchTask.java#L95
Outside the while loop we initialize:

    MongoChangeStreamCursor<BsonDocument> changeStreamCursor = openChangeStreamCursor(descriptor);
    HeartbeatManager heartbeatManager = openHeartbeatManagerIfNeeded(changeStreamCursor);

However, inside the while (taskRunning) loop we reassign:

    changeStreamCursor = openChangeStreamCursor(descriptor, resumeTokenExpires);

So it looks like heartbeatManager keeps running against the older changeStreamCursor.

After the DataStream job has been running for a few days, we frequently get the following error, which I suspect is a consequence of this:

    com.mongodb.MongoCommandException: Command failed with error 286 (ChangeStreamHistoryLost): 'PlanExecutor error during aggregation :: caused by :: Resume of change stream was not possible, as the resume point may no longer be in the oplog.' on server xxxx.yyy.mongodb.net:27017. The full response is {"errorLabels": ["NonResumableChangeStreamError"], "ok": 0.0, "errmsg": "PlanExecutor error during aggregation :: caused by :: Resume of change stream was not possible, as the resume point may no longer be in the oplog.", "code": 286, "codeName": "ChangeStreamHistoryLost", "$clusterTime": {"clusterTime": {"$timestamp": {"t": 1747876810, "i": 30}}, "signature": {"hash": {"$binary": {"base64": "8rIRtcVyp/u8ddzGnA4Z5r1L79A=", "subType": "00"}}, "keyId": 7448674344508588123}}, "operationTime": {"$timestamp": {"t": 1747876810, "i": 30}}}
        at com.mongodb.internal.connection.ProtocolHelper.getCommandFailureException(ProtocolHelper.java:205)
        at com.mongodb.internal.connection.InternalStreamConnection.receiveCommandMessageResponse(InternalStreamConnection.java:443)
        at com.mongodb.internal.connection.InternalStreamConnection.sendAndReceive(InternalStreamConnection.java:365)
        at com.mongodb.internal.connection.UsageTrackingInternalConnection.sendAndReceive(UsageTrackingInternalConnection.java:114)
        at com.mongodb.internal.connection.DefaultConnectionPool$PooledConnection.sendAndReceive(DefaultConnectionPool.java:643)
        at com.mongodb.internal.connection.CommandProtocolImpl.execute(CommandProtocolImpl.java:73)
        at com.mongodb.internal.connection.DefaultServer$DefaultServerProtocolExecutor.execute(DefaultServer.java:204)
        at com.mongodb.internal.connection.DefaultServerConnection.executeProtocol(DefaultServerConnection.java:122)
        at com.mongodb.internal.connection.DefaultServerConnection.command(DefaultServerConnection.java:87)
        at com.mongodb.internal.connection.DefaultServerConnection.command(DefaultServerConnection.java:76)
        at com.mongodb.internal.connection.DefaultServer$OperationCountTrackingConnection.command(DefaultServer.java:288)
        at com.mongodb.internal.operation.CommandOperationHelper.createReadCommandAndExecute(CommandOperationHelper.java:239)
        at com.mongodb.internal.operation.CommandOperationHelper.lambda$executeRetryableRead$4(CommandOperationHelper.java:220)
        at com.mongodb.internal.operation.OperationHelper.lambda$withSourceAndConnection$0(OperationHelper.java:358)
        at com.mongodb.internal.operation.OperationHelper.withSuppliedResource(OperationHelper.java:383)
        at com.mongodb.internal.operation.OperationHelper.lambda$withSourceAndConnection$1(OperationHelper.java:357)
        at com.mongodb.internal.operation.OperationHelper.withSuppliedResource(OperationHelper.java:383)
        at com.mongodb.internal.operation.OperationHelper.withSourceAndConnection(OperationHelper.java:356)
        at com.mongodb.internal.operation.CommandOperationHelper.lambda$executeRetryableRead$5(CommandOperationHelper.java:218)
        at com.mongodb.internal.async.function.RetryingSyncSupplier.get(RetryingSyncSupplier.java:67)
        at com.mongodb.internal.operation.CommandOperationHelper.executeRetryableRead(CommandOperationHelper.java:223)
        at com.mongodb.internal.operation.CommandOperationHelper.executeRetryableRead(CommandOperationHelper.java:204)
        at com.mongodb.internal.operation.AggregateOperationImpl.execute(AggregateOperationImpl.java:191)
        at com.mongodb.internal.operation.ChangeStreamOperation.lambda$execute$0(ChangeStreamOperation.java:187)
        at com.mongodb.internal.operation.OperationHelper.withReadConnectionSource(OperationHelper.java:321)
        at com.mongodb.internal.operation.ChangeStreamOperation.execute(ChangeStreamOperation.java:185)
        at com.mongodb.internal.operation.ChangeStreamOperation.execute(ChangeStreamOperation.java:55)
        at com.mongodb.client.internal.MongoClientDelegate$DelegateOperationExecutor.execute(MongoClientDelegate.java:185)
        at com.mongodb.client.internal.ChangeStreamIterableImpl.execute(ChangeStreamIterableImpl.java:212)
        at com.mongodb.client.internal.ChangeStreamIterableImpl.access$000(ChangeStreamIterableImpl.java:55)
        at com.mongodb.client.internal.ChangeStreamIterableImpl$1.cursor(ChangeStreamIterableImpl.java:139)
        at com.mongodb.client.internal.ChangeStreamIterableImpl$1.cursor(ChangeStreamIterableImpl.java:131)
        at org.apache.flink.cdc.connectors.mongodb.source.reader.fetch.MongoDBStreamFetchTask.openChangeStreamCursor(MongoDBStreamFetchTask.java:286)
        at org.apache.flink.cdc.connectors.mongodb.source.reader.fetch.MongoDBStreamFetchTask.execute(MongoDBStreamFetchTask.java:123)

After we raised this issue with MongoDB support, they reported the following:

Following the log review, I have two concerns related to the resumeToken:
1. The workload using resumeAfter appears to be prone to having its resumeToken "rewound" to an older timestamp that the workload appears to have already processed.
2. The workload using startAfter doesn't appear to reliably save/update its progress with updated checkpoints at all.

In both of these cases, it seems you may have some issues in the way the application is storing the resumeToken. It is either:
1. not consistently updated, or
2. prone to being overwritten with an older change.

In both cases, this eventually leads to the token falling off the oplog, because the application tries to resume from a timestamp that no longer exists.
In the case of #1, your application logic may not be consistently updating resumeTokens with each iteration of the change stream cursor. In the case of #2, it may be that a competing application thread wrote an older timestamp.

Please let me know whether my findings and what MongoDB support reported are related. It looks to me as if heartbeatManager runs against an older changeStreamCursor, so the resume token is either not being updated or is being rewound.

Thanks,
Sachin
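The suspected problem is a plain Java reference-binding effect rather than anything MongoDB-specific: passing changeStreamCursor into openHeartbeatManagerIfNeeded hands over the cursor object the variable points to at that moment, and reassigning the local variable later in the loop does not change what the callee received. If the heartbeat manager keeps that reference and reads a resume token from it (an assumption; this has not been confirmed against the flink-cdc source), it would keep reporting the old cursor's position. Below is a minimal, self-contained model of that pattern. FakeCursor, Heartbeat, bindOnce, run and the numeric "resume token" are all hypothetical stand-ins, not the flink-cdc or MongoDB driver APIs.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

public class StaleCursorDemo {

    /** Hypothetical stand-in for a change stream cursor: it only tracks a numeric "resume token". */
    static final class FakeCursor {
        private long resumeToken;

        FakeCursor(long startToken) {
            this.resumeToken = startToken;
        }

        /** Simulates consuming change events, which moves the resume token forward. */
        void advance(long events) {
            resumeToken += events;
        }

        long resumeToken() {
            return resumeToken;
        }
    }

    /** Hypothetical heartbeat that reports the resume token of whichever cursor its supplier returns. */
    static final class Heartbeat {
        private final Supplier<FakeCursor> cursorSource;

        Heartbeat(Supplier<FakeCursor> cursorSource) {
            this.cursorSource = cursorSource;
        }

        long beat() {
            return cursorSource.get().resumeToken();
        }
    }

    /** Mirrors passing the cursor into a factory once: the parameter pins that cursor object. */
    static Heartbeat bindOnce(FakeCursor cursor) {
        return new Heartbeat(() -> cursor);
    }

    /** Returns {token seen by the pinned heartbeat, token seen by the heartbeat that follows reopens}. */
    public static long[] run() {
        FakeCursor cursor = new FakeCursor(100);
        Heartbeat pinned = bindOnce(cursor);

        // Alternative: read the *current* cursor through a shared, updatable reference.
        AtomicReference<FakeCursor> current = new AtomicReference<>(cursor);
        Heartbeat following = new Heartbeat(current::get);

        cursor.advance(10); // first cursor reaches token 110

        // Like the loop body: reassign the local to a newly opened cursor that resumes where the old one stopped.
        cursor = new FakeCursor(cursor.resumeToken());
        current.set(cursor);
        cursor.advance(50); // new cursor reaches token 160

        return new long[] {pinned.beat(), following.beat()};
    }

    public static void main(String[] args) {
        long[] tokens = run();
        System.out.println("pinned heartbeat token:    " + tokens[0]);
        System.out.println("following heartbeat token: " + tokens[1]);
    }
}
```

Running main prints 110 for the pinned heartbeat and 160 for the one that follows the reopened cursor. Reading the cursor through a shared reference, or recreating the heartbeat manager each time the cursor is reopened, are two ways to keep the two in sync; AtomicReference is used here only so the sketch would also stay correct if the heartbeat were read from another thread.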