This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch camel-3.18.x
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/camel-3.18.x by this push:
     new 6f5240f6d70 CAMEL-18796: prevent close exceptions from leaking
6f5240f6d70 is described below

commit 6f5240f6d70785ee183ce4124fa17f32fa798fab
Author: Otavio Rodolfo Piske <[email protected]>
AuthorDate: Thu Dec 8 15:54:27 2022 +0100

    CAMEL-18796: prevent close exceptions from leaking
    
    Among other things, this prevents them from breaking reconnection/recovery 
attempts. This could happen, for instance, when authentication credentials have 
expired and the client tries to close the consumer.
---
 .../camel/component/kafka/KafkaFetchRecords.java   | 22 ++++++++++++++++++----
 1 file changed, 18 insertions(+), 4 deletions(-)

diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
index 5b782f8e747..4882f56b202 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
@@ -164,7 +164,7 @@ public class KafkaFetchRecords implements Runnable {
             LOG.info("Terminating KafkaConsumer thread {} receiving from {}", 
threadId, getPrintableTopic());
         }
 
-        IOHelper.close(consumer);
+        safeConsumerClose();
     }
 
     private void setupInitializeErrorException(ForegroundTask task, int max) {
@@ -366,11 +366,9 @@ public class KafkaFetchRecords implements Runnable {
         } finally {
             // only close if not retry
             if (!pollExceptionStrategy.canContinue()) {
-                LOG.debug("Closing consumer {}", threadId);
                 safeUnsubscribe();
-                IOHelper.close(consumer);
+                safeConsumerClose();
             }
-
             lock.unlock();
         }
     }
@@ -402,6 +400,19 @@ public class KafkaFetchRecords implements Runnable {
         }
     }
 
+    private void safeConsumerClose() {
+        /*
+         IOHelper.close only catches IOExceptions, thus other Kafka exceptions 
may leak from the fetcher causing
+         unspecified behavior.
+         */
+        try {
+            LOG.debug("Closing consumer {}", threadId);
+            IOHelper.close(consumer, "Kafka consumer (thread ID " + threadId + 
")", LOG);
+        } catch (Exception e) {
+            LOG.error("Error closing the Kafka consumer: {} (this error will 
be ignored)", e.getMessage(), e);
+        }
+    }
+
     private void safeUnsubscribe() {
         if (consumer == null) {
             return;
@@ -409,11 +420,14 @@ public class KafkaFetchRecords implements Runnable {
 
         final String printableTopic = getPrintableTopic();
         try {
+            LOG.debug("Unsubscribing from Kafka");
             consumer.unsubscribe();
+            LOG.debug("Done unsubscribing from Kafka");
         } catch (IllegalStateException e) {
             LOG.warn("The consumer is likely already closed. Skipping 
unsubscribing thread {} from kafka {}", threadId,
                     printableTopic);
         } catch (Exception e) {
+            LOG.debug("Something went wrong while unsubscribing from Kafka: 
{}", e.getMessage());
             kafkaConsumer.getExceptionHandler().handleException(
                     "Error unsubscribing thread " + threadId + " from kafka " 
+ printableTopic, e);
         }

Reply via email to