[
https://issues.apache.org/jira/browse/FLINK-38601?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
ASF GitHub Bot updated FLINK-38601:
-----------------------------------
Labels: errorhandling pull-request-available (was: errorhandling)
> MongoDB CDC silently stops consuming from unbounded streams when Throwable
> errors occur and never recovers
> ----------------------------------------------------------------------------------------------------------
>
> Key: FLINK-38601
> URL: https://issues.apache.org/jira/browse/FLINK-38601
> Project: Flink
> Issue Type: Bug
> Components: Connectors / MongoDB, Flink CDC
> Reporter: Ciprian Anton
> Priority: Major
> Labels: errorhandling, pull-request-available
>
> The Flink MongoDB CDC connector does not properly handle
> {{java.lang.Throwable}} errors (such as {{OutOfMemoryError}},
> {{StackOverflowError}}, etc.) in the stream consumption loop. When such an
> error occurs, the connector silently stops consuming from unbounded change
> streams without propagating the error or triggering any recovery mechanism,
> leaving the CDC pipeline permanently stuck. Note that these errors extend
> {{java.lang.Error}} rather than {{java.lang.Exception}}, so they are not
> caught by {{catch (Exception e)}} blocks, as the sketch below shows.
>
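> A minimal, self-contained demonstration of the class-hierarchy point (plain
> Java, not connector code; the thrown error is simulated):
> {code:java}
> public class ThrowableDemo {
>     public static void main(String[] args) {
>         try {
>             try {
>                 // Simulate the JVM raising an Error inside a consume loop.
>                 throw new OutOfMemoryError("simulated: Java heap space");
>             } catch (Exception e) {
>                 // Never reached: OutOfMemoryError extends java.lang.Error,
>                 // a sibling of Exception under java.lang.Throwable.
>                 System.out.println("caught as Exception: " + e);
>             }
>         } catch (Throwable t) {
>             // Reached: Throwable is the common supertype, so this catches
>             // both Exceptions and Errors.
>             System.out.println("caught as Throwable: " + t);
>         }
>     }
> }
> {code}
>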
> I confirmed this hypothesis by adding a {{catch (Throwable t)}} block in
> {{MongoDBStreamFetchTask.java}}, which successfully caught the error that was
> previously being silently swallowed:
> {code}
> 2025-10-30 12:53:17,255 ERROR org.ni.flink.systemscdc.mongosource.CustomMongoDBStreamFetchTask [] - Captured throwable
> java.lang.OutOfMemoryError: Java heap space
>     at java.base/java.util.Arrays.copyOf(Unknown Source) ~[?:?]
>     at java.base/java.lang.AbstractStringBuilder.ensureCapacityInternal(Unknown Source) ~[?:?]
>     at java.base/java.lang.AbstractStringBuilder.append(Unknown Source) ~[?:?]
>     at java.base/java.lang.StringBuffer.append(Unknown Source) ~[?:?]
>     at java.base/java.io.StringWriter.write(Unknown Source) ~[?:?]
>     at org.bson.json.StrictCharacterStreamJsonWriter.write(StrictCharacterStreamJsonWriter.java:383) ~[blob_p-a90e9bce73f1dd181443f1a4a648ba9925badacc-b148f05ed9675652d3b06109acb703a8:?]
>     at org.bson.json.StrictCharacterStreamJsonWriter.writeStringHelper(StrictCharacterStreamJsonWriter.java:349) ~[blob_p-a90e9bce73f1dd181443f1a4a648ba9925badacc-b148f05ed9675652d3b06109acb703a8:?]
>     at org.bson.json.StrictCharacterStreamJsonWriter.writeName(StrictCharacterStreamJsonWriter.java:149) ~[blob_p-a90e9bce73f1dd181443f1a4a648ba9925badacc-b148f05ed9675652d3b06109acb703a8:?]
>     at org.bson.json.JsonWriter.doWriteName(JsonWriter.java:82) ~[blob_p-a90e9bce73f1dd181443f1a4a648ba9925badacc-b148f05ed9675652d3b06109acb703a8:?]
>     at org.bson.AbstractBsonWriter.writeName(AbstractBsonWriter.java:537) ~[blob_p-a90e9bce73f1dd181443f1a4a648ba9925badacc-b148f05ed9675652d3b06109acb703a8:?]
>     at org.bson.codecs.BsonDocumentCodec.encode(BsonDocumentCodec.java:117) ~[blob_p-a90e9bce73f1dd181443f1a4a648ba9925badacc-b148f05ed9675652d3b06109acb703a8:?]
>     at org.bson.codecs.BsonDocumentCodec.encode(BsonDocumentCodec.java:42) ~[blob_p-a90e9bce73f1dd181443f1a4a648ba9925badacc-b148f05ed9675652d3b06109acb703a8:?]
>     at org.bson.internal.LazyCodec.encode(LazyCodec.java:43) ~[blob_p-a90e9bce73f1dd181443f1a4a648ba9925badacc-b148f05ed9675652d3b06109acb703a8:?]
>     at org.bson.codecs.EncoderContext.encodeWithChildContext(EncoderContext.java:91) ~[blob_p-a90e9bce73f1dd181443f1a4a648ba9925badacc-b148f05ed9675652d3b06109acb703a8:?]
>     at org.bson.codecs.BsonDocumentCodec.writeValue(BsonDocumentCodec.java:139) ~[blob_p-a90e9bce73f1dd181443f1a4a648ba9925badacc-b148f05ed9675652d3b06109acb703a8:?]
>     at org.bson.codecs.BsonDocumentCodec.encode(BsonDocumentCodec.java:118) ~[blob_p-a90e9bce73f1dd181443f1a4a648ba9925badacc-b148f05ed9675652d3b06109acb703a8:?]
>     at org.bson.codecs.BsonDocumentCodec.encode(BsonDocumentCodec.java:42) ~[blob_p-a90e9bce73f1dd181443f1a4a648ba9925badacc-b148f05ed9675652d3b06109acb703a8:?]
>     at org.bson.codecs.EncoderContext.encodeWithChildContext(EncoderContext.java:91) ~[blob_p-a90e9bce73f1dd181443f1a4a648ba9925badacc-b148f05ed9675652d3b06109acb703a8:?]
>     at org.bson.codecs.BsonDocumentCodec.writeValue(BsonDocumentCodec.java:139) ~[blob_p-a90e9bce73f1dd181443f1a4a648ba9925badacc-b148f05ed9675652d3b06109acb703a8:?]
>     at org.bson.codecs.BsonDocumentCodec.encode(BsonDocumentCodec.java:118) ~[blob_p-a90e9bce73f1dd181443f1a4a648ba9925badacc-b148f05ed9675652d3b06109acb703a8:?]
>     at org.bson.BsonDocument.toJson(BsonDocument.java:848) ~[blob_p-a90e9bce73f1dd181443f1a4a648ba9925badacc-b148f05ed9675652d3b06109acb703a8:?]
>     at com.mongodb.kafka.connect.source.schema.BsonValueToSchemaAndValue.stringToSchemaAndValue(BsonValueToSchemaAndValue.java:195) ~[blob_p-a90e9bce73f1dd181443f1a4a648ba9925badacc-b148f05ed9675652d3b06109acb703a8:?]
>     at com.mongodb.kafka.connect.source.schema.BsonValueToSchemaAndValue.toSchemaAndValue(BsonValueToSchemaAndValue.java:91) ~[blob_p-a90e9bce73f1dd181443f1a4a648ba9925badacc-b148f05ed9675652d3b06109acb703a8:?]
>     at com.mongodb.kafka.connect.source.schema.BsonValueToSchemaAndValue.lambda$recordToSchemaAndValue$2(BsonValueToSchemaAndValue.java:275) ~[blob_p-a90e9bce73f1dd181443f1a4a648ba9925badacc-b148f05ed9675652d3b06109acb703a8:?]
>     at com.mongodb.kafka.connect.source.schema.BsonValueToSchemaAndValue$$Lambda$1273/0x00007f65cf873c58.accept(Unknown Source) ~[?:?]
>     at java.base/java.util.ArrayList.forEach(Unknown Source) ~[?:?]
>     at java.base/java.util.Collections$UnmodifiableCollection.forEach(Unknown Source) ~[?:?]
>     at com.mongodb.kafka.connect.source.schema.BsonValueToSchemaAndValue.recordToSchemaAndValue(BsonValueToSchemaAndValue.java:271) ~[blob_p-a90e9bce73f1dd181443f1a4a648ba9925badacc-b148f05ed9675652d3b06109acb703a8:?]
>     at com.mongodb.kafka.connect.source.schema.BsonValueToSchemaAndValue.toSchemaAndValue(BsonValueToSchemaAndValue.java:103) ~[blob_p-a90e9bce73f1dd181443f1a4a648ba9925badacc-b148f05ed9675652d3b06109acb703a8:?]
>     at org.apache.flink.cdc.connectors.mongodb.source.utils.MongoRecordUtils.createSourceRecord(MongoRecordUtils.java:159) ~[blob_p-a90e9bce73f1dd181443f1a4a648ba9925badacc-b148f05ed9675652d3b06109acb703a8:?]
>     at org.apache.flink.cdc.connectors.mongodb.source.utils.MongoRecordUtils.createSourceRecord(MongoRecordUtils.java:138) ~[blob_p-a90e9bce73f1dd181443f1a4a648ba9925badacc-b148f05ed9675652d3b06109acb703a8:?]
>     at org.ni.flink.systemscdc.mongosource.CustomMongoDBStreamFetchTask.execute(CustomMongoDBStreamFetchTask.java:149) ~[blob_p-a90e9bce73f1dd181443f1a4a648ba9925badacc-b148f05ed9675652d3b06109acb703a8:?]
> {code}
>
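> The diagnostic change was along these lines (a sketch of my fork's
> {{CustomMongoDBStreamFetchTask}}; the consume loop itself is elided and
> {{LOG}} is the task's slf4j logger):
> {code:java}
> // Sketch only: the sole difference from the stock fetch task is the extra
> // catch (Throwable t) wrapped around the change-stream consume loop.
> try {
>     // ... original change-stream consume loop, unchanged ...
> } catch (Throwable t) {
>     // An Error such as OutOfMemoryError bypasses any catch (Exception e),
>     // so without this block the loop just ends silently.
>     LOG.error("Captured throwable", t);
> }
> {code}
>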
> I think the root cause may be in
> {{IncrementalSourceStreamFetcher.submitTask()}}, which likewise does not
> handle {{Throwable}} errors, so they are never propagated further up the
> stack to trigger task failure and recovery.
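> A sketch of the kind of change I have in mind there (the surrounding names
> are my approximation of the flink-cdc-base code, not a verbatim patch):
> {code:java}
> // Hypothetical: widen the catch in IncrementalSourceStreamFetcher.submitTask()
> // from Exception to Throwable so Errors are remembered and surfaced.
> executorService.execute(
>         () -> {
>             try {
>                 streamFetchTask.execute(taskContext);
>             } catch (Throwable t) { // was: catch (Exception e)
>                 // Remember the failure so the reader's next poll can rethrow
>                 // it and trigger Flink's normal failover, instead of letting
>                 // the fetcher exit as if the split were simply finished.
>                 // (The real field may need its type widened to Throwable.)
>                 readException = t;
>             }
>         });
> {code}
>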
> The logs don't show any error either; they merely indicate that the stream
> fetcher stopped gracefully:
> {code}
> 2025-10-30 12:53:30,226 INFO  org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Finished reading from splits [stream-split]
> 2025-10-30 12:53:42,275 INFO  org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Finished reading split(s) [stream-split]
> 2025-10-30 12:53:42,277 INFO  org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager [] - Closing splitFetcher 21 because it is idle.
> 2025-10-30 12:53:42,277 INFO  org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Shutting down split fetcher 21
> 2025-10-30 12:53:42,277 INFO  org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Split fetcher 21 exited.
> {code}
>
> Since Flink is supposed to be fault tolerant, my expectation is that the task
> would be restarted and the underlying error would show up in the logs.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)