[ 
https://issues.apache.org/jira/browse/FLINK-37909?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-37909:
-----------------------------------
    Labels: pull-request-available  (was: )

> Flink CDC source connector for mongodb repeatedly failing in streaming mode 
> with ChangeStreamHistoryLost error
> --------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-37909
>                 URL: https://issues.apache.org/jira/browse/FLINK-37909
>             Project: Flink
>          Issue Type: Bug
>          Components: Flink CDC
>    Affects Versions: cdc-3.4.0
>         Environment: Flink: 1.20.0
> Flink CDC: 3.4.0
> Java: 11
>  
>            Reporter: Sachin Mittal
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: cdc-3.5.0
>
>
> I have a data stream applications which pulls data from MongoDB using CDC, 
> and after the process runs for few days it fails with following stacktrace:
>  
> {code:java}
> com.mongodb.MongoCommandException: Command failed with error 286 
> (ChangeStreamHistoryLost): 'PlanExecutor error during aggregation :: caused 
> by :: Resume of change stream was not possible, as the resume point may no 
> longer be in the oplog.' on server xxxx.yyy.mongodb.net:27017. The full 
> response is {"errorLabels": ["NonResumableChangeStreamError"], "ok": 0.0, 
> "errmsg": "PlanExecutor error during aggregation :: caused by :: Resume of 
> change stream was not possible, as the resume point may no longer be in the 
> oplog.", "code": 286, "codeName": "ChangeStreamHistoryLost", "$clusterTime": 
> {"clusterTime": {"$timestamp": {"t": 1747876810, "i": 30}}, "signature": 
> {"hash": {"$binary": {"base64": "8rIRtcVyp/u8ddzGnA4Z5r1L79A=", "subType": 
> "00"}}, "keyId": 7448674344508588123}}, "operationTime": {"$timestamp": {"t": 
> 1747876810, "i": 30}}}
> at 
> com.mongodb.internal.connection.ProtocolHelper.getCommandFailureException(ProtocolHelper.java:205)
> at 
> com.mongodb.internal.connection.InternalStreamConnection.receiveCommandMessageResponse(InternalStreamConnection.java:443)
> at 
> com.mongodb.internal.connection.InternalStreamConnection.sendAndReceive(InternalStreamConnection.java:365)
> at 
> com.mongodb.internal.connection.UsageTrackingInternalConnection.sendAndReceive(UsageTrackingInternalConnection.java:114)
> at 
> com.mongodb.internal.connection.DefaultConnectionPool$PooledConnection.sendAndReceive(DefaultConnectionPool.java:643)
> at 
> com.mongodb.internal.connection.CommandProtocolImpl.execute(CommandProtocolImpl.java:73)
> at 
> com.mongodb.internal.connection.DefaultServer$DefaultServerProtocolExecutor.execute(DefaultServer.java:204)
> at 
> com.mongodb.internal.connection.DefaultServerConnection.executeProtocol(DefaultServerConnection.java:122)
> at 
> com.mongodb.internal.connection.DefaultServerConnection.command(DefaultServerConnection.java:87)
> at 
> com.mongodb.internal.connection.DefaultServerConnection.command(DefaultServerConnection.java:76)
> at 
> com.mongodb.internal.connection.DefaultServer$OperationCountTrackingConnection.command(DefaultServer.java:288)
> at 
> com.mongodb.internal.operation.CommandOperationHelper.createReadCommandAndExecute(CommandOperationHelper.java:239)
> at 
> com.mongodb.internal.operation.CommandOperationHelper.lambda$executeRetryableRead$4(CommandOperationHelper.java:220)
> at 
> com.mongodb.internal.operation.OperationHelper.lambda$withSourceAndConnection$0(OperationHelper.java:358)
> at 
> com.mongodb.internal.operation.OperationHelper.withSuppliedResource(OperationHelper.java:383)
> at 
> com.mongodb.internal.operation.OperationHelper.lambda$withSourceAndConnection$1(OperationHelper.java:357)
> at 
> com.mongodb.internal.operation.OperationHelper.withSuppliedResource(OperationHelper.java:383)
> at 
> com.mongodb.internal.operation.OperationHelper.withSourceAndConnection(OperationHelper.java:356)
> at 
> com.mongodb.internal.operation.CommandOperationHelper.lambda$executeRetryableRead$5(CommandOperationHelper.java:218)
> at 
> com.mongodb.internal.async.function.RetryingSyncSupplier.get(RetryingSyncSupplier.java:67)
> at 
> com.mongodb.internal.operation.CommandOperationHelper.executeRetryableRead(CommandOperationHelper.java:223)
> at 
> com.mongodb.internal.operation.CommandOperationHelper.executeRetryableRead(CommandOperationHelper.java:204)
> at 
> com.mongodb.internal.operation.AggregateOperationImpl.execute(AggregateOperationImpl.java:191)
> at 
> com.mongodb.internal.operation.ChangeStreamOperation.lambda$execute$0(ChangeStreamOperation.java:187)
> at 
> com.mongodb.internal.operation.OperationHelper.withReadConnectionSource(OperationHelper.java:321)
> at 
> com.mongodb.internal.operation.ChangeStreamOperation.execute(ChangeStreamOperation.java:185)
> at 
> com.mongodb.internal.operation.ChangeStreamOperation.execute(ChangeStreamOperation.java:55)
> at 
> com.mongodb.client.internal.MongoClientDelegate$DelegateOperationExecutor.execute(MongoClientDelegate.java:185)
> at 
> com.mongodb.client.internal.ChangeStreamIterableImpl.execute(ChangeStreamIterableImpl.java:212)
> at 
> com.mongodb.client.internal.ChangeStreamIterableImpl.access$000(ChangeStreamIterableImpl.java:55)
> at 
> com.mongodb.client.internal.ChangeStreamIterableImpl$1.cursor(ChangeStreamIterableImpl.java:139)
> at 
> com.mongodb.client.internal.ChangeStreamIterableImpl$1.cursor(ChangeStreamIterableImpl.java:131)
> at 
> org.apache.flink.cdc.connectors.mongodb.source.reader.fetch.MongoDBStreamFetchTask.openChangeStreamCursor(MongoDBStreamFetchTask.java:286)
> at 
> org.apache.flink.cdc.connectors.mongodb.source.reader.fetch.MongoDBStreamFetchTask.execute(MongoDBStreamFetchTask.java:123)
> at 
> org.apache.flink.cdc.connectors.base.source.reader.external.IncrementalSourceStreamFetcher.lambda$submitTask$0(IncrementalSourceStreamFetcher.java:89)
>  {code}
>  
> I have instantiated my Source as:
>  
> {code:java}
> // code placeholder
> MongoDBSource.<T>builder()
> ...
> .startupOptions(StartupOptions.initial())
> .batchSize(2048)
> .connectionOptions("heartbeat.interval.ms=5000")
> .heartbeatIntervalMillis(5000)
> .closeIdleReaders(true)
> .deserializer(...)
> .build(); {code}
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to