[ 
https://issues.apache.org/jira/browse/FLINK-38601?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-38601:
-----------------------------------
    Labels: errorhandling pull-request-available  (was: errorhandling)

> MongoDB CDC silently stops consuming from unbounded streams when Throwable 
> errors occur and never recovers
> ----------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-38601
>                 URL: https://issues.apache.org/jira/browse/FLINK-38601
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / MongoDB, Flink CDC
>            Reporter: Ciprian Anton
>            Priority: Major
>              Labels: errorhandling, pull-request-available
>
> The Flink MongoDB CDC connector does not properly handle 
> {{java.lang.Throwable}} errors (such as {{OutOfMemoryError}}, 
> {{StackOverflowError}}, etc.) in the stream consumption loop. When such an 
> error occurs, the connector silently stops consuming from unbounded change 
> streams without propagating the error or triggering any recovery mechanism, 
> leaving the CDC pipeline permanently stuck. Note that these errors do not 
> extend {{java.lang.Exception}}, so they are not caught by 
> {{catch (Exception e)}}.
>  
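> To make the class-hierarchy point concrete, here is a minimal standalone 
> sketch (not connector code) showing why a {{catch (Exception e)}} block never 
> sees an {{OutOfMemoryError}}: {{Error}} and {{Exception}} are sibling 
> subclasses of {{Throwable}}.
> {code:java}
> // Minimal demo: an Error is only caught by catch (Throwable), never by catch (Exception).
> public class ThrowableVsException {
>     public static void main(String[] args) {
>         try {
>             // Stand-in for the failure observed in the stream loop; any Error behaves the same.
>             throw new OutOfMemoryError("Java heap space");
>         } catch (Exception e) {
>             // Never reached: OutOfMemoryError extends Error, not Exception.
>             System.err.println("caught Exception: " + e);
>         } catch (Throwable t) {
>             // Reached: Throwable is the common supertype of Exception and Error.
>             System.err.println("caught Throwable: " + t);
>         }
>     }
> }
> {code}
>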
> I confirmed this hypothesis by adding a {{catch (Throwable t)}} block in 
> {{MongoDBStreamFetchTask.java}}, which successfully caught the error that 
> was previously being silently swallowed:
> {noformat}
> 2025-10-30 12:53:17,255 ERROR org.ni.flink.systemscdc.mongosource.CustomMongoDBStreamFetchTask [] - Captured throwable
> java.lang.OutOfMemoryError: Java heap space
>     at java.base/java.util.Arrays.copyOf(Unknown Source) ~[?:?]
>     at java.base/java.lang.AbstractStringBuilder.ensureCapacityInternal(Unknown Source) ~[?:?]
>     at java.base/java.lang.AbstractStringBuilder.append(Unknown Source) ~[?:?]
>     at java.base/java.lang.StringBuffer.append(Unknown Source) ~[?:?]
>     at java.base/java.io.StringWriter.write(Unknown Source) ~[?:?]
>     at org.bson.json.StrictCharacterStreamJsonWriter.write(StrictCharacterStreamJsonWriter.java:383) ~[blob_p-a90e9bce73f1dd181443f1a4a648ba9925badacc-b148f05ed9675652d3b06109acb703a8:?]
>     at org.bson.json.StrictCharacterStreamJsonWriter.writeStringHelper(StrictCharacterStreamJsonWriter.java:349) ~[blob_p-a90e9bce73f1dd181443f1a4a648ba9925badacc-b148f05ed9675652d3b06109acb703a8:?]
>     at org.bson.json.StrictCharacterStreamJsonWriter.writeName(StrictCharacterStreamJsonWriter.java:149) ~[blob_p-a90e9bce73f1dd181443f1a4a648ba9925badacc-b148f05ed9675652d3b06109acb703a8:?]
>     at org.bson.json.JsonWriter.doWriteName(JsonWriter.java:82) ~[blob_p-a90e9bce73f1dd181443f1a4a648ba9925badacc-b148f05ed9675652d3b06109acb703a8:?]
>     at org.bson.AbstractBsonWriter.writeName(AbstractBsonWriter.java:537) ~[blob_p-a90e9bce73f1dd181443f1a4a648ba9925badacc-b148f05ed9675652d3b06109acb703a8:?]
>     at org.bson.codecs.BsonDocumentCodec.encode(BsonDocumentCodec.java:117) ~[blob_p-a90e9bce73f1dd181443f1a4a648ba9925badacc-b148f05ed9675652d3b06109acb703a8:?]
>     at org.bson.codecs.BsonDocumentCodec.encode(BsonDocumentCodec.java:42) ~[blob_p-a90e9bce73f1dd181443f1a4a648ba9925badacc-b148f05ed9675652d3b06109acb703a8:?]
>     at org.bson.internal.LazyCodec.encode(LazyCodec.java:43) ~[blob_p-a90e9bce73f1dd181443f1a4a648ba9925badacc-b148f05ed9675652d3b06109acb703a8:?]
>     at org.bson.codecs.EncoderContext.encodeWithChildContext(EncoderContext.java:91) ~[blob_p-a90e9bce73f1dd181443f1a4a648ba9925badacc-b148f05ed9675652d3b06109acb703a8:?]
>     at org.bson.codecs.BsonDocumentCodec.writeValue(BsonDocumentCodec.java:139) ~[blob_p-a90e9bce73f1dd181443f1a4a648ba9925badacc-b148f05ed9675652d3b06109acb703a8:?]
>     at org.bson.codecs.BsonDocumentCodec.encode(BsonDocumentCodec.java:118) ~[blob_p-a90e9bce73f1dd181443f1a4a648ba9925badacc-b148f05ed9675652d3b06109acb703a8:?]
>     at org.bson.codecs.BsonDocumentCodec.encode(BsonDocumentCodec.java:42) ~[blob_p-a90e9bce73f1dd181443f1a4a648ba9925badacc-b148f05ed9675652d3b06109acb703a8:?]
>     at org.bson.codecs.EncoderContext.encodeWithChildContext(EncoderContext.java:91) ~[blob_p-a90e9bce73f1dd181443f1a4a648ba9925badacc-b148f05ed9675652d3b06109acb703a8:?]
>     at org.bson.codecs.BsonDocumentCodec.writeValue(BsonDocumentCodec.java:139) ~[blob_p-a90e9bce73f1dd181443f1a4a648ba9925badacc-b148f05ed9675652d3b06109acb703a8:?]
>     at org.bson.codecs.BsonDocumentCodec.encode(BsonDocumentCodec.java:118) ~[blob_p-a90e9bce73f1dd181443f1a4a648ba9925badacc-b148f05ed9675652d3b06109acb703a8:?]
>     at org.bson.BsonDocument.toJson(BsonDocument.java:848) ~[blob_p-a90e9bce73f1dd181443f1a4a648ba9925badacc-b148f05ed9675652d3b06109acb703a8:?]
>     at com.mongodb.kafka.connect.source.schema.BsonValueToSchemaAndValue.stringToSchemaAndValue(BsonValueToSchemaAndValue.java:195) ~[blob_p-a90e9bce73f1dd181443f1a4a648ba9925badacc-b148f05ed9675652d3b06109acb703a8:?]
>     at com.mongodb.kafka.connect.source.schema.BsonValueToSchemaAndValue.toSchemaAndValue(BsonValueToSchemaAndValue.java:91) ~[blob_p-a90e9bce73f1dd181443f1a4a648ba9925badacc-b148f05ed9675652d3b06109acb703a8:?]
>     at com.mongodb.kafka.connect.source.schema.BsonValueToSchemaAndValue.lambda$recordToSchemaAndValue$2(BsonValueToSchemaAndValue.java:275) ~[blob_p-a90e9bce73f1dd181443f1a4a648ba9925badacc-b148f05ed9675652d3b06109acb703a8:?]
>     at com.mongodb.kafka.connect.source.schema.BsonValueToSchemaAndValue$$Lambda$1273/0x00007f65cf873c58.accept(Unknown Source) ~[?:?]
>     at java.base/java.util.ArrayList.forEach(Unknown Source) ~[?:?]
>     at java.base/java.util.Collections$UnmodifiableCollection.forEach(Unknown Source) ~[?:?]
>     at com.mongodb.kafka.connect.source.schema.BsonValueToSchemaAndValue.recordToSchemaAndValue(BsonValueToSchemaAndValue.java:271) ~[blob_p-a90e9bce73f1dd181443f1a4a648ba9925badacc-b148f05ed9675652d3b06109acb703a8:?]
>     at com.mongodb.kafka.connect.source.schema.BsonValueToSchemaAndValue.toSchemaAndValue(BsonValueToSchemaAndValue.java:103) ~[blob_p-a90e9bce73f1dd181443f1a4a648ba9925badacc-b148f05ed9675652d3b06109acb703a8:?]
>     at org.apache.flink.cdc.connectors.mongodb.source.utils.MongoRecordUtils.createSourceRecord(MongoRecordUtils.java:159) ~[blob_p-a90e9bce73f1dd181443f1a4a648ba9925badacc-b148f05ed9675652d3b06109acb703a8:?]
>     at org.apache.flink.cdc.connectors.mongodb.source.utils.MongoRecordUtils.createSourceRecord(MongoRecordUtils.java:138) ~[blob_p-a90e9bce73f1dd181443f1a4a648ba9925badacc-b148f05ed9675652d3b06109acb703a8:?]
>     at org.ni.flink.systemscdc.mongosource.CustomMongoDBStreamFetchTask.execute(CustomMongoDBStreamFetchTask.java:149) ~[blob_p-a90e9bce73f1dd181443f1a4a648ba9925badacc-b148f05ed9675652d3b06109acb703a8:?]
> {noformat}
>  
> I suspect the root cause is in {{IncrementalSourceStreamFetcher.submitTask()}}, 
> which also does not handle {{Throwable}} errors, so the error is never 
> propagated up the stack to trigger task failure and recovery.
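>
> For illustration, a hedged sketch of the kind of guard this suggests; the 
> class and member names below are simplified, hypothetical stand-ins rather 
> than the actual flink-cdc signatures. The submitted task records any 
> {{Throwable}}, so the reader thread can rethrow it and let Flink fail and 
> restart the task instead of silently stopping.
> {code:java}
> // Hedged sketch with hypothetical, simplified types (not the real flink-cdc API).
> import java.util.concurrent.ExecutorService;
> import java.util.concurrent.Executors;
> import java.util.concurrent.atomic.AtomicReference;
>
> class StreamFetcherSketch {
>     private final ExecutorService executorService = Executors.newSingleThreadExecutor();
>     private final AtomicReference<Throwable> readException = new AtomicReference<>();
>
>     void submitTask(Runnable fetchTask) {
>         executorService.submit(() -> {
>             try {
>                 fetchTask.run();
>             } catch (Throwable t) { // Throwable, not Exception: also catches OutOfMemoryError etc.
>                 readException.compareAndSet(null, t);
>             }
>         });
>     }
>
>     /** Called periodically from the reader thread; rethrows any recorded failure. */
>     void checkReadException() {
>         Throwable t = readException.get();
>         if (t != null) {
>             throw new RuntimeException("Stream fetch task failed", t);
>         }
>     }
> }
> {code}
>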
> The logs also don't show any error; they just indicate that the stream 
> fetcher gracefully stopped:
> {noformat}
> 2025-10-30 12:53:30,226 INFO  org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Finished reading from splits [stream-split]
> 2025-10-30 12:53:42,275 INFO  org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Finished reading split(s) [stream-split]
> 2025-10-30 12:53:42,277 INFO  org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager [] - Closing splitFetcher 21 because it is idle.
> 2025-10-30 12:53:42,277 INFO  org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Shutting down split fetcher 21
> 2025-10-30 12:53:42,277 INFO  org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Split fetcher 21 exited.
> {noformat}
>  
> Since Flink is supposed to be fault tolerant, my expectation is that the 
> task would be restarted and the error would show up in the logs.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
