[ https://issues.apache.org/jira/browse/FLINK-37909?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
ASF GitHub Bot updated FLINK-37909: ----------------------------------- Labels: pull-request-available (was: ) > Flink CDC source connector for mongodb repeatedly failing in streaming mode > with ChangeStreamHistoryLost error > -------------------------------------------------------------------------------------------------------------- > > Key: FLINK-37909 > URL: https://issues.apache.org/jira/browse/FLINK-37909 > Project: Flink > Issue Type: Bug > Components: Flink CDC > Affects Versions: cdc-3.4.0 > Environment: Flink: 1.20.0 > Flink CDC: 3.4.0 > Java: 11 > > Reporter: Sachin Mittal > Priority: Major > Labels: pull-request-available > Fix For: cdc-3.5.0 > > > I have a data stream application which pulls data from MongoDB using CDC, > and after the process runs for a few days it fails with the following stack trace: > > {code:java} > com.mongodb.MongoCommandException: Command failed with error 286 > (ChangeStreamHistoryLost): 'PlanExecutor error during aggregation :: caused > by :: Resume of change stream was not possible, as the resume point may no > longer be in the oplog.' on server xxxx.yyy.mongodb.net:27017. 
The full > response is {"errorLabels": ["NonResumableChangeStreamError"], "ok": 0.0, > "errmsg": "PlanExecutor error during aggregation :: caused by :: Resume of > change stream was not possible, as the resume point may no longer be in the > oplog.", "code": 286, "codeName": "ChangeStreamHistoryLost", "$clusterTime": > {"clusterTime": {"$timestamp": {"t": 1747876810, "i": 30}}, "signature": > {"hash": {"$binary": {"base64": "8rIRtcVyp/u8ddzGnA4Z5r1L79A=", "subType": > "00"}}, "keyId": 7448674344508588123}}, "operationTime": {"$timestamp": {"t": > 1747876810, "i": 30}}} > at > com.mongodb.internal.connection.ProtocolHelper.getCommandFailureException(ProtocolHelper.java:205) > at > com.mongodb.internal.connection.InternalStreamConnection.receiveCommandMessageResponse(InternalStreamConnection.java:443) > at > com.mongodb.internal.connection.InternalStreamConnection.sendAndReceive(InternalStreamConnection.java:365) > at > com.mongodb.internal.connection.UsageTrackingInternalConnection.sendAndReceive(UsageTrackingInternalConnection.java:114) > at > com.mongodb.internal.connection.DefaultConnectionPool$PooledConnection.sendAndReceive(DefaultConnectionPool.java:643) > at > com.mongodb.internal.connection.CommandProtocolImpl.execute(CommandProtocolImpl.java:73) > at > com.mongodb.internal.connection.DefaultServer$DefaultServerProtocolExecutor.execute(DefaultServer.java:204) > at > com.mongodb.internal.connection.DefaultServerConnection.executeProtocol(DefaultServerConnection.java:122) > at > com.mongodb.internal.connection.DefaultServerConnection.command(DefaultServerConnection.java:87) > at > com.mongodb.internal.connection.DefaultServerConnection.command(DefaultServerConnection.java:76) > at > com.mongodb.internal.connection.DefaultServer$OperationCountTrackingConnection.command(DefaultServer.java:288) > at > com.mongodb.internal.operation.CommandOperationHelper.createReadCommandAndExecute(CommandOperationHelper.java:239) > at > 
com.mongodb.internal.operation.CommandOperationHelper.lambda$executeRetryableRead$4(CommandOperationHelper.java:220) > at > com.mongodb.internal.operation.OperationHelper.lambda$withSourceAndConnection$0(OperationHelper.java:358) > at > com.mongodb.internal.operation.OperationHelper.withSuppliedResource(OperationHelper.java:383) > at > com.mongodb.internal.operation.OperationHelper.lambda$withSourceAndConnection$1(OperationHelper.java:357) > at > com.mongodb.internal.operation.OperationHelper.withSuppliedResource(OperationHelper.java:383) > at > com.mongodb.internal.operation.OperationHelper.withSourceAndConnection(OperationHelper.java:356) > at > com.mongodb.internal.operation.CommandOperationHelper.lambda$executeRetryableRead$5(CommandOperationHelper.java:218) > at > com.mongodb.internal.async.function.RetryingSyncSupplier.get(RetryingSyncSupplier.java:67) > at > com.mongodb.internal.operation.CommandOperationHelper.executeRetryableRead(CommandOperationHelper.java:223) > at > com.mongodb.internal.operation.CommandOperationHelper.executeRetryableRead(CommandOperationHelper.java:204) > at > com.mongodb.internal.operation.AggregateOperationImpl.execute(AggregateOperationImpl.java:191) > at > com.mongodb.internal.operation.ChangeStreamOperation.lambda$execute$0(ChangeStreamOperation.java:187) > at > com.mongodb.internal.operation.OperationHelper.withReadConnectionSource(OperationHelper.java:321) > at > com.mongodb.internal.operation.ChangeStreamOperation.execute(ChangeStreamOperation.java:185) > at > com.mongodb.internal.operation.ChangeStreamOperation.execute(ChangeStreamOperation.java:55) > at > com.mongodb.client.internal.MongoClientDelegate$DelegateOperationExecutor.execute(MongoClientDelegate.java:185) > at > com.mongodb.client.internal.ChangeStreamIterableImpl.execute(ChangeStreamIterableImpl.java:212) > at > com.mongodb.client.internal.ChangeStreamIterableImpl.access$000(ChangeStreamIterableImpl.java:55) > at > 
com.mongodb.client.internal.ChangeStreamIterableImpl$1.cursor(ChangeStreamIterableImpl.java:139) > at > com.mongodb.client.internal.ChangeStreamIterableImpl$1.cursor(ChangeStreamIterableImpl.java:131) > at > org.apache.flink.cdc.connectors.mongodb.source.reader.fetch.MongoDBStreamFetchTask.openChangeStreamCursor(MongoDBStreamFetchTask.java:286) > at > org.apache.flink.cdc.connectors.mongodb.source.reader.fetch.MongoDBStreamFetchTask.execute(MongoDBStreamFetchTask.java:123) > at > org.apache.flink.cdc.connectors.base.source.reader.external.IncrementalSourceStreamFetcher.lambda$submitTask$0(IncrementalSourceStreamFetcher.java:89) > {code} > > I have instantiated my Source as: > > {code:java} > // code placeholder > MongoDBSource.<T>builder() > ... > .startupOptions(StartupOptions.initial()) > .batchSize(2048) > .connectionOptions("heartbeat.interval.ms=5000") > .heartbeatIntervalMillis(5000) > .closeIdleReaders(true) > .deserializer(...) > .build(); {code} > -- This message was sent by Atlassian Jira (v8.20.10#820010)