Copilot commented on code in PR #9454:
URL: https://github.com/apache/seatunnel/pull/9454#discussion_r2153735868
##########
seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/utils/MongodbUtils.java:
##########
@@ -404,4 +413,25 @@ public static String encodeValue(String value) {
throw new MongodbConnectorException(ILLEGAL_ARGUMENT, e.getMessage());
}
}
+
+ // Checks if given exception is caused by change stream cursor issues, including
+ // network connection failures, sharded cluster changes, or invalidate events.
+ // See: https://www.mongodb.com/docs/manual/changeStreams/ for more details.
+ public static boolean checkIfChangeStreamCursorExpires(final MongoCommandException e) {
+ return INVALID_CHANGE_STREAM_ERRORS.contains(e.getCode());
+ }
+
+ // This check is stricter than checkIfChangeStreamCursorExpires, which specifically
+ // checks if given exception is caused by an expired resume token.
+ public static boolean checkIfResumeTokenExpires(final MongoCommandException e) {
+ if (e.getCode() != CHANGE_STREAM_FATAL_ERROR) {
+ return false;
+ }
+ String errorMessage = e.getErrorMessage().toLowerCase(Locale.ROOT);
Review Comment:
Guard against a null `e.getErrorMessage()` before calling `toLowerCase(...)`
to avoid potential NPEs.
```suggestion
String errorMessage = e.getErrorMessage();
if (errorMessage == null) {
return false;
}
errorMessage = errorMessage.toLowerCase(Locale.ROOT);
```
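The guard can also be exercised in isolation. The following is a minimal sketch, assuming the check is reduced to an error code plus a message string. `ResumeTokenCheckSketch`, `isResumeTokenExpired`, and the `ASSUMED_MARKER` substring are hypothetical names for illustration only; the actual message matching is not visible in this hunk.

```java
import java.util.Locale;

class ResumeTokenCheckSketch {
    // Same value as CHANGE_STREAM_FATAL_ERROR in the diff.
    static final int CHANGE_STREAM_FATAL_ERROR = 280;

    // Hypothetical marker: the real substring check is not shown in the hunk.
    static final String ASSUMED_MARKER = "resume token";

    // Null-safe variant of the check: a missing message never matches.
    static boolean isResumeTokenExpired(int code, String errorMessage) {
        if (code != CHANGE_STREAM_FATAL_ERROR) {
            return false;
        }
        if (errorMessage == null) {
            return false;
        }
        return errorMessage.toLowerCase(Locale.ROOT).contains(ASSUMED_MARKER);
    }

    public static void main(String[] args) {
        // A null message is treated as "not an expired resume token".
        System.out.println(isResumeTokenExpired(280, null));
        System.out.println(isResumeTokenExpired(280, "Resume Token was not found"));
    }
}
```

Returning `false` for a null message keeps the caller on the plain cursor-recreation path instead of the timestamp fallback, which seems the safer default when the cause cannot be confirmed.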
##########
seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/source/fetch/MongodbStreamFetchTask.java:
##########
@@ -117,7 +118,23 @@ public void execute(Context context) {
this.taskRunning = true;
try {
while (taskRunning) {
- Optional<BsonDocument> next = Optional.ofNullable(changeStreamCursor.tryNext());
+ Optional<BsonDocument> next;
+ try {
+ next = Optional.ofNullable(changeStreamCursor.tryNext());
+ } catch (MongoCommandException e) {
+ if (MongodbUtils.checkIfChangeStreamCursorExpires(e)) {
+ log.warn("Change stream cursor has expired, trying to recreate cursor");
+ boolean resumeTokenExpires = MongodbUtils.checkIfResumeTokenExpires(e);
+ if (resumeTokenExpires) {
+ log.warn(
+ "Resume token has expired, fallback to timestamp restart mode");
+ }
+ changeStreamCursor = openChangeStreamCursor(descriptor, resumeTokenExpires);
+ next = Optional.ofNullable(changeStreamCursor.tryNext());
Review Comment:
The second `tryNext()` call after cursor recreation isn’t wrapped in a
try/catch block, so any exception it throws will go unhandled; consider
enclosing it similarly to the first call.
```suggestion
try {
next = Optional.ofNullable(changeStreamCursor.tryNext());
} catch (MongoCommandException ex) {
log.error("Exception during second tryNext() call after cursor recreation", ex);
throw ex;
}
```
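An alternative to nesting a second try/catch is to run every `tryNext()` attempt, including the one after recreation, through the same handler with a bounded number of reopen attempts. This is a different technique from the suggestion above, shown only as a sketch: `Cursor`, `CursorException`, and `ReopeningReader` are hypothetical stand-ins for the change stream cursor, `MongoCommandException`, and the fetch task, and the expiry checks from `MongodbUtils` are left out.

```java
import java.util.Optional;
import java.util.function.Supplier;

class CursorRetrySketch {
    // Hypothetical stand-in for MongoCommandException.
    static class CursorException extends RuntimeException {
        CursorException(String message) {
            super(message);
        }
    }

    // Hypothetical stand-in for the change stream cursor.
    interface Cursor<T> {
        T tryNext();
    }

    // Wraps a cursor and reopens it a bounded number of times on failure.
    static class ReopeningReader<T> {
        private Cursor<T> cursor;
        private final Supplier<Cursor<T>> reopen;
        private final int maxReopens;

        ReopeningReader(Cursor<T> initial, Supplier<Cursor<T>> reopen, int maxReopens) {
            this.cursor = initial;
            this.reopen = reopen;
            this.maxReopens = maxReopens;
        }

        Optional<T> tryNext() {
            for (int attempt = 0; ; attempt++) {
                try {
                    // The first call and every call after a reopen pass through here.
                    return Optional.ofNullable(cursor.tryNext());
                } catch (CursorException e) {
                    if (attempt >= maxReopens) {
                        throw e; // give up and surface the last failure
                    }
                    cursor = reopen.get(); // keep the new cursor for later calls
                }
            }
        }
    }
}
```

With `maxReopens` set to 1 this matches the intent of the diff (one recreation, then fail), while a failure on the retried call is handled by the same catch block rather than a separate one.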
##########
seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/config/MongodbSourceOptions.java:
##########
@@ -76,6 +80,25 @@ public class MongodbSourceOptions extends SourceOptions {
public static final int ILLEGAL_OPERATION_ERROR = 20;
+ public static final int INVALIDATED_RESUME_TOKEN_ERROR = 260;
+ public static final int CHANGE_STREAM_FATAL_ERROR = 280;
+ public static final int CHANGE_STREAM_HISTORY_LOST = 286;
+ public static final int BSON_OBJECT_TOO_LARGE = 10334;
+
+ public static final Set<Integer> INVALID_CHANGE_STREAM_ERRORS =
+ new HashSet<>(
+ asList(
+ INVALIDATED_RESUME_TOKEN_ERROR,
+ CHANGE_STREAM_FATAL_ERROR,
+ CHANGE_STREAM_HISTORY_LOST,
+ BSON_OBJECT_TOO_LARGE));
Review Comment:
[nitpick] Consider using `Set.of(...)` (available since Java 9) to create an
immutable set, which is more concise and makes the immutability explicit.
```suggestion
Set.of(
INVALIDATED_RESUME_TOKEN_ERROR,
CHANGE_STREAM_FATAL_ERROR,
CHANGE_STREAM_HISTORY_LOST,
BSON_OBJECT_TOO_LARGE);
```
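`Set.of(...)` was added in Java 9. If this module still has to compile against Java 8 (an assumption, not verified here), the same immutability can be had by wrapping the existing `HashSet`. A minimal sketch, with `ChangeStreamErrorCodes` as a hypothetical holder class and the constant values copied from the diff:

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

class ChangeStreamErrorCodes {
    static final int INVALIDATED_RESUME_TOKEN_ERROR = 260;
    static final int CHANGE_STREAM_FATAL_ERROR = 280;
    static final int CHANGE_STREAM_HISTORY_LOST = 286;
    static final int BSON_OBJECT_TOO_LARGE = 10334;

    // Java 8 compatible: an unmodifiable view over a private copy.
    static final Set<Integer> INVALID_CHANGE_STREAM_ERRORS =
            Collections.unmodifiableSet(
                    new HashSet<>(
                            Arrays.asList(
                                    INVALIDATED_RESUME_TOKEN_ERROR,
                                    CHANGE_STREAM_FATAL_ERROR,
                                    CHANGE_STREAM_HISTORY_LOST,
                                    BSON_OBJECT_TOO_LARGE)));
}
```

Because no other reference to the backing `HashSet` escapes, callers cannot modify the set; any attempt to add or remove an element throws `UnsupportedOperationException`.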