Hi,
I'm having some difficulty understanding the code in:

https://github.com/apache/flink-cdc/blob/master/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/source/reader/fetch/MongoDBStreamFetchTask.java#L95

Outside the while loop, we initialize:

        MongoChangeStreamCursor<BsonDocument> changeStreamCursor =
                openChangeStreamCursor(descriptor);
        HeartbeatManager heartbeatManager =
                openHeartbeatManagerIfNeeded(changeStreamCursor);

However, inside the while (taskRunning) loop, we reassign it:

        changeStreamCursor = openChangeStreamCursor(descriptor, resumeTokenExpires);

So it looks like heartbeatManager keeps using the original changeStreamCursor
it was created with, not the reopened one.
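To make the concern concrete, here is a minimal, self-contained Java model of the pattern. Cursor, Heartbeat and HolderHeartbeat are hypothetical stand-ins, not the real MongoChangeStreamCursor or HeartbeatManager: an object that captures a reference when it is constructed keeps using that reference after the local variable is reassigned. The holder-based variant (using an AtomicReference) is one possible way to have the heartbeat follow the current cursor. Recreating the heartbeat manager each time the cursor is reopened would be another. Which approach fits the connector would need checking against the actual source.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical model only: Cursor and Heartbeat are stand-ins, not the real
// MongoChangeStreamCursor or HeartbeatManager classes.
public class StaleReferenceDemo {

    // Stand-in for a change stream cursor: an id plus the resume token it would report.
    static final class Cursor {
        final int id;
        final String resumeToken;

        Cursor(int id, String resumeToken) {
            this.id = id;
            this.resumeToken = resumeToken;
        }
    }

    // Captures the cursor once, at construction time (the pattern in question).
    static final class Heartbeat {
        private final Cursor cursor;

        Heartbeat(Cursor cursor) {
            this.cursor = cursor;
        }

        Cursor target() {
            return cursor;
        }
    }

    // Alternative: looks up the current cursor through a shared holder on every call.
    static final class HolderHeartbeat {
        private final AtomicReference<Cursor> holder;

        HolderHeartbeat(AtomicReference<Cursor> holder) {
            this.holder = holder;
        }

        Cursor target() {
            return holder.get();
        }
    }

    public static void main(String[] args) {
        Cursor cursor = new Cursor(1, "token-A");
        Heartbeat heartbeat = new Heartbeat(cursor);

        // "Reopening" the cursor only reassigns the local variable...
        cursor = new Cursor(2, "token-B");
        System.out.println("local variable points to cursor " + cursor.id); // 2
        // ...so the heartbeat still reports the old cursor's token.
        System.out.println("heartbeat reports " + heartbeat.target().resumeToken); // token-A

        AtomicReference<Cursor> holder = new AtomicReference<>(new Cursor(1, "token-A"));
        HolderHeartbeat holderHeartbeat = new HolderHeartbeat(holder);
        holder.set(new Cursor(2, "token-B")); // reopen by updating the shared holder
        System.out.println("holder heartbeat reports " + holderHeartbeat.target().resumeToken); // token-B
    }
}
```

If the real HeartbeatManager behaves like Heartbeat here, any heartbeat it emits after a reopen would carry the first cursor's resume token. That would match the "rewound" token that MongoDB support describes.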

As a result, we frequently get the following error after running the
DataStream job for a few days:

com.mongodb.MongoCommandException: Command failed with error 286 (ChangeStreamHistoryLost): 'PlanExecutor error during aggregation :: caused by :: Resume of change stream was not possible, as the resume point may no longer be in the oplog.' on server xxxx.yyy.mongodb.net:27017. The full response is {"errorLabels": ["NonResumableChangeStreamError"], "ok": 0.0, "errmsg": "PlanExecutor error during aggregation :: caused by :: Resume of change stream was not possible, as the resume point may no longer be in the oplog.", "code": 286, "codeName": "ChangeStreamHistoryLost", "$clusterTime": {"clusterTime": {"$timestamp": {"t": 1747876810, "i": 30}}, "signature": {"hash": {"$binary": {"base64": "8rIRtcVyp/u8ddzGnA4Z5r1L79A=", "subType": "00"}}, "keyId": 7448674344508588123}}, "operationTime": {"$timestamp": {"t": 1747876810, "i": 30}}}
    at com.mongodb.internal.connection.ProtocolHelper.getCommandFailureException(ProtocolHelper.java:205)
    at com.mongodb.internal.connection.InternalStreamConnection.receiveCommandMessageResponse(InternalStreamConnection.java:443)
    at com.mongodb.internal.connection.InternalStreamConnection.sendAndReceive(InternalStreamConnection.java:365)
    at com.mongodb.internal.connection.UsageTrackingInternalConnection.sendAndReceive(UsageTrackingInternalConnection.java:114)
    at com.mongodb.internal.connection.DefaultConnectionPool$PooledConnection.sendAndReceive(DefaultConnectionPool.java:643)
    at com.mongodb.internal.connection.CommandProtocolImpl.execute(CommandProtocolImpl.java:73)
    at com.mongodb.internal.connection.DefaultServer$DefaultServerProtocolExecutor.execute(DefaultServer.java:204)
    at com.mongodb.internal.connection.DefaultServerConnection.executeProtocol(DefaultServerConnection.java:122)
    at com.mongodb.internal.connection.DefaultServerConnection.command(DefaultServerConnection.java:87)
    at com.mongodb.internal.connection.DefaultServerConnection.command(DefaultServerConnection.java:76)
    at com.mongodb.internal.connection.DefaultServer$OperationCountTrackingConnection.command(DefaultServer.java:288)
    at com.mongodb.internal.operation.CommandOperationHelper.createReadCommandAndExecute(CommandOperationHelper.java:239)
    at com.mongodb.internal.operation.CommandOperationHelper.lambda$executeRetryableRead$4(CommandOperationHelper.java:220)
    at com.mongodb.internal.operation.OperationHelper.lambda$withSourceAndConnection$0(OperationHelper.java:358)
    at com.mongodb.internal.operation.OperationHelper.withSuppliedResource(OperationHelper.java:383)
    at com.mongodb.internal.operation.OperationHelper.lambda$withSourceAndConnection$1(OperationHelper.java:357)
    at com.mongodb.internal.operation.OperationHelper.withSuppliedResource(OperationHelper.java:383)
    at com.mongodb.internal.operation.OperationHelper.withSourceAndConnection(OperationHelper.java:356)
    at com.mongodb.internal.operation.CommandOperationHelper.lambda$executeRetryableRead$5(CommandOperationHelper.java:218)
    at com.mongodb.internal.async.function.RetryingSyncSupplier.get(RetryingSyncSupplier.java:67)
    at com.mongodb.internal.operation.CommandOperationHelper.executeRetryableRead(CommandOperationHelper.java:223)
    at com.mongodb.internal.operation.CommandOperationHelper.executeRetryableRead(CommandOperationHelper.java:204)
    at com.mongodb.internal.operation.AggregateOperationImpl.execute(AggregateOperationImpl.java:191)
    at com.mongodb.internal.operation.ChangeStreamOperation.lambda$execute$0(ChangeStreamOperation.java:187)
    at com.mongodb.internal.operation.OperationHelper.withReadConnectionSource(OperationHelper.java:321)
    at com.mongodb.internal.operation.ChangeStreamOperation.execute(ChangeStreamOperation.java:185)
    at com.mongodb.internal.operation.ChangeStreamOperation.execute(ChangeStreamOperation.java:55)
    at com.mongodb.client.internal.MongoClientDelegate$DelegateOperationExecutor.execute(MongoClientDelegate.java:185)
    at com.mongodb.client.internal.ChangeStreamIterableImpl.execute(ChangeStreamIterableImpl.java:212)
    at com.mongodb.client.internal.ChangeStreamIterableImpl.access$000(ChangeStreamIterableImpl.java:55)
    at com.mongodb.client.internal.ChangeStreamIterableImpl$1.cursor(ChangeStreamIterableImpl.java:139)
    at com.mongodb.client.internal.ChangeStreamIterableImpl$1.cursor(ChangeStreamIterableImpl.java:131)
    at org.apache.flink.cdc.connectors.mongodb.source.reader.fetch.MongoDBStreamFetchTask.openChangeStreamCursor(MongoDBStreamFetchTask.java:286)
    at org.apache.flink.cdc.connectors.mongodb.source.reader.fetch.MongoDBStreamFetchTask.execute(MongoDBStreamFetchTask.java:123)

We raised this issue with MongoDB support, and they reported the following:

Following the log review, I have two concerns related to the resumeToken:

   1. The workload using resumeAfter appears to be prone to having its
   resumeToken "rewound" to an older timestamp that the workload appears to
   have already processed
   2. The workload using startAfter doesn't appear to reliably save/update
   its progress with updated checkpoints at all

In both of these cases, it seems you may have some issues in the way the
application is storing the resumeToken:

   1. Not consistently updated, or
   2. Prone to being overwritten with an older value

In both cases, the stored token eventually falls off the oplog, and the
application then tries to resume from a timestamp that no longer exists. In
the case of #1, your application logic may not be consistently updating the
resumeToken with each iteration of the change stream cursor. In the case of
#2, a competing application thread may have written an older timestamp.
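Both failure modes support describes (a token that is never advanced, or one that is overwritten with an older value) can be illustrated with a checkpoint that only moves forward. Below is a minimal, hypothetical Java sketch, not Flink CDC code. It assumes each event's position can be ordered by its cluster time, modelled as a (seconds, increment) pair like the {"t": ..., "i": ...} timestamps in the error response, and it treats the resume token itself as an opaque string. The class and method names (ResumeCheckpoint, advance, stillInOplog) are made up for illustration. stillInOplog approximates the condition behind error 286: a resume can only work while the saved position is no older than the oldest oplog entry.

```java
// Hypothetical sketch: not part of Flink CDC or the MongoDB Java driver.
public class ResumeCheckpoint {

    // Oplog position modelled like a BSON Timestamp: seconds plus an increment.
    record Position(long seconds, int increment) implements Comparable<Position> {
        @Override
        public int compareTo(Position other) {
            int bySeconds = Long.compare(seconds, other.seconds);
            return bySeconds != 0 ? bySeconds : Integer.compare(increment, other.increment);
        }
    }

    private Position position;  // last accepted position (null until the first event)
    private String resumeToken; // opaque token stored alongside that position

    // Accepts the token only if it does not move the checkpoint backwards.
    public synchronized boolean advance(Position candidate, String token) {
        if (position != null && candidate.compareTo(position) < 0) {
            return false; // would "rewind" the checkpoint, so it is ignored
        }
        position = candidate;
        resumeToken = token;
        return true;
    }

    // Resuming can only succeed while the saved position is still inside the oplog window.
    public synchronized boolean stillInOplog(Position oldestOplogEntry) {
        return position != null && position.compareTo(oldestOplogEntry) >= 0;
    }

    public synchronized String resumeToken() {
        return resumeToken;
    }

    public static void main(String[] args) {
        ResumeCheckpoint checkpoint = new ResumeCheckpoint();
        checkpoint.advance(new Position(1747876810L, 30), "token-new");
        boolean accepted = checkpoint.advance(new Position(1747876000L, 1), "token-old");
        System.out.println("older token accepted? " + accepted);       // false
        System.out.println("kept token: " + checkpoint.resumeToken()); // token-new
    }
}
```

In this model, a heartbeat that reports an older token from a stale cursor would be rejected by advance instead of rewinding the checkpoint. Whether silently ignoring it is the right behaviour for the connector is a separate question.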

Could you let me know whether my findings and what MongoDB support reported
are related? It looks to me as if heartbeatManager runs against the older
changeStreamCursor, so the resume token is either not being updated or is
getting rewound.

Thanks
Sachin
