This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/master by this push:
     new 2a6df30  CAMEL-16025: [camel-mongodb] ChangeStreams: Exception not gracefully handled
2a6df30 is described below

commit 2a6df30e1f34d8fc4f5b96c535df022f494f3e69
Author: Claus Ibsen <[email protected]>
AuthorDate: Mon Mar 22 18:25:00 2021 +0100

    CAMEL-16025: [camel-mongodb] ChangeStreams: Exception not gracefully handled
---
 .../mongodb/MongoAbstractConsumerThread.java       | 43 ++++++++++++++--------
 .../mongodb/MongoDbChangeStreamsThread.java        |  4 +-
 2 files changed, 30 insertions(+), 17 deletions(-)

diff --git a/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoAbstractConsumerThread.java b/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoAbstractConsumerThread.java
index b129351..1209d9a 100644
--- a/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoAbstractConsumerThread.java
+++ b/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoAbstractConsumerThread.java
@@ -64,26 +64,39 @@ abstract class MongoAbstractConsumerThread implements Runnable {
     @Override
     public void run() {
         stoppedLatch = new CountDownLatch(1);
-        while (keepRunning) {
-            doRun();
-            // regenerate the cursor, if reading failed for some reason
-            if (keepRunning) {
-                cursor.close();
-                regeneratingCursor();
-
-                if (cursorRegenerationDelayEnabled) {
-                    try {
-                        Thread.sleep(cursorRegenerationDelay);
-                    } catch (InterruptedException ignored) {
+        try {
+            while (keepRunning) {
+                try {
+                    doRun();
+                } catch (Exception e) {
+                    if (keepRunning) {
+                        log.warn("Exception from consuming from MongoDB caused 
by " + e.getMessage()
+                                 + ". Will try again on next poll.");
+                    } else {
+                        log.warn("Exception from consuming from MongoDB caused 
by " + e.getMessage()
+                                 + ". ConsumerThread will be stopped.",
+                                e);
                     }
                 }
+                // regenerate the cursor, if reading failed for some reason
+                if (keepRunning) {
+                    cursor.close();
+                    regeneratingCursor();
+
+                    if (cursorRegenerationDelayEnabled) {
+                        try {
+                            Thread.sleep(cursorRegenerationDelay);
+                        } catch (InterruptedException ignored) {
+                        }
+                    }
 
-                cursor = initializeCursor();
+                    cursor = initializeCursor();
+                }
             }
+        } finally {
+            stopped = true;
+            stoppedLatch.countDown();
         }
-
-        stopped = true;
-        stoppedLatch.countDown();
     }
 
     protected void stop() throws Exception {
diff --git a/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbChangeStreamsThread.java b/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbChangeStreamsThread.java
index 8757793..7184797 100644
--- a/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbChangeStreamsThread.java
+++ b/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbChangeStreamsThread.java
@@ -96,9 +96,9 @@ class MongoDbChangeStreamsThread extends MongoAbstractConsumerThread {
             // it throws exception when cursor is closed in another thread
             // there is no way to stop hasNext() before closing cursor
             if (keepRunning) {
-                throw e;
-            } else {
                 log.debug("Exception from MongoDB, will regenerate cursor.", 
e);
+            } else {
+                throw e;
             }
         }
     }

Reply via email to