[
https://issues.apache.org/jira/browse/FLINK-38782?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18067974#comment-18067974
]
Cong Cheng edited comment on FLINK-38782 at 3/24/26 10:07 AM:
--------------------------------------------------------------
Hi [~Joekwal], we ran into a similar error when reading from MongoDB; the
exception is:
{code:java}
Caused by: com.mongodb.MongoQueryException: Command failed with error 10334
(BSONObjectTooLarge): 'BSONObj size: 20262446 (0x1352E2E) is invalid. Size must
be between 0 and 16793600(16MB) First element...{code}
It should be noted that there is an option named 'errors.tolerance' to skip all
errors; this configuration is deliberately set to NONE in
org.apache.flink.cdc.connectors.mongodb.MongoDBSource (the deprecated source, at
lines 459~460):
{code:java}
props.setProperty(MongoSourceConfig.ERRORS_TOLERANCE_CONFIG, ErrorTolerance.NONE.value());
{code}
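For illustration, this is what the hard-coded setting implies: the connector always resolves to fail-fast behavior, regardless of what the user configures. A minimal sketch of the tolerance check (assuming the config constant resolves to the raw key "errors.tolerance", as in the Kafka Connect convention; the class name here is made up):

```java
import java.util.Properties;

public class ToleranceCheckSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        // MongoDBSource hard-codes the tolerance to "none", so any value the
        // user supplies for this key is effectively overridden.
        props.setProperty("errors.tolerance", "none");
        boolean tolerateErrors = "all".equalsIgnoreCase(props.getProperty("errors.tolerance"));
        // With tolerance "none", tolerateErrors() is false and any MongoException
        // (including BSONObjectTooLarge) fails the task instead of being skipped.
        System.out.println("tolerateErrors=" + tolerateErrors);
    }
}
```

This is why the BSONObjectTooLarge error above surfaces as a task failure rather than a skipped record.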
and the error-skipping logic within
`com.mongodb.kafka.connect.source.StartedMongoSourceTask` is:
{code:java}
try {
  BsonDocument next;
  do {
    next = cursor.tryNext();
    if (next != null) {
      batch.add(next);
    }
  } while (next != null && batch.size() < maxBatchSize && cursor.available() > 0);
} catch (MongoException e) {
  closeCursor();
  if (isRunning) {
    if (sourceConfig.tolerateErrors()) {
      if (changeStreamNotValid(e)) {
        cursor = tryRecreateCursor(e);
      } else {
        LOGGER.error(
            "An exception occurred when trying to get the next item from the Change Stream", e);
      }
    } else {
      throw new ConnectException(
          "An exception occurred when trying to get the next item from the Change Stream: "
              + e.getMessage(),
          e);
    }
  }
} catch (Exception e) { // remainder of the handler omitted
{code}
Thus, if it is a `changeStreamNotValid` exception, it could be really dangerous,
since a period of data will be lost.
The resume logic (within
`com.mongodb.kafka.connect.source.StartedMongoSourceTask`) is
{code:java}
@Nullable
private MongoChangeStreamCursor<? extends BsonDocument> tryRecreateCursor(
    final MongoException e) {
  int errorCode =
      e instanceof MongoCommandException
          ? ((MongoCommandException) e).getErrorCode()
          : e.getCode();
  String errorMessage =
      e instanceof MongoCommandException
          ? ((MongoCommandException) e).getErrorMessage()
          : e.getMessage();
  LOGGER.warn(
      "Failed to resume change stream: {} {}\n"
          + "===================================================================================\n"
          + "When the resume token is no longer available there is the potential for data loss.\n\n"
          + "Restarting the change stream with no resume token because `errors.tolerance=all`.\n"
          + "===================================================================================\n",
      errorMessage,
      errorCode);
  invalidatedCursor = true;
  return tryCreateCursor(sourceConfig, mongoClient, null);
}
{code}
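To make the danger concrete, here is a toy simulation (not connector code; the event list and positions are invented) of what "restarting the change stream with no resume token" means: events that occurred between the point where the resume token expired and the point where the new cursor starts are silently skipped.

```java
import java.util.ArrayList;
import java.util.List;

public class ResumeLossDemo {
    public static void main(String[] args) {
        // Ten change events in the oplog, identified 1..10.
        List<Integer> oplog = List.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
        List<Integer> delivered = new ArrayList<>();
        for (int i = 0; i < oplog.size(); i++) {
            if (i == 4) {        // resume token falls off the oplog at event 5
                i = 7;           // new cursor with null resume token starts "from now"
                continue;        // events 5..8 are silently skipped
            }
            delivered.add(oplog.get(i));
        }
        System.out.println(delivered); // [1, 2, 3, 4, 9, 10]
    }
}
```

The warning log is the only trace of the gap, which is why tolerating this case silently is risky for a CDC pipeline that promises at-least-once delivery.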
> Mongodb CDC need a config to filter large size column
> -----------------------------------------------------
>
> Key: FLINK-38782
> URL: https://issues.apache.org/jira/browse/FLINK-38782
> Project: Flink
> Issue Type: Improvement
> Components: Flink CDC
> Reporter: Joekwal
> Priority: Blocker
>
> An error occurred:
> {color:#ff3333}com.mongodb.MongoQueryException: Query failed with error code
> 10334 and error message 'Executor error during getMore :: caused by ::
> BSONObj size: 27661090 (0x1A61322) is invalid. Size must be between 0 and
> 16793600(16MB) First element: _id: \{ _data: "xxx" }' on server{color}
> I've solved the source problem in MongoDB, but the developers may need a
> config to skip large-size columns.
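A config like the one requested could, for instance, drop (or blank out) a field whose serialized size would push the document past the server cap, instead of failing the task. A hypothetical sketch of that policy — the threshold constant, method name, and class name are all made up, not an existing Flink CDC option:

```java
import java.nio.charset.StandardCharsets;

public class LargeFieldFilterSketch {
    // 16793600 bytes is the cap reported in the error message above.
    static final int MAX_BSON_SIZE = 16793600;

    // Hypothetical policy: keep the payload only if it fits under the given
    // byte limit, otherwise replace it with an empty string.
    static String filterOversized(String payload, int maxBytes) {
        byte[] bytes = payload.getBytes(StandardCharsets.UTF_8);
        return bytes.length <= maxBytes ? payload : "";
    }

    public static void main(String[] args) {
        System.out.println(filterOversized("small", 16).isEmpty());        // kept
        System.out.println(filterOversized("x".repeat(32), 16).isEmpty()); // dropped
    }
}
```

A real implementation would measure the BSON-encoded size of the document (not a UTF-8 string length) and would need to decide which fields are safe to strip; this only illustrates the shape of the option.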
--
This message was sent by Atlassian Jira
(v8.20.10#820010)