This is an automated email from the ASF dual-hosted git repository.
orpiske pushed a commit to branch camel-3.18.x
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/camel-3.18.x by this push:
new 6f5240f6d70 CAMEL-18796: prevent close exceptions from leaking
6f5240f6d70 is described below
commit 6f5240f6d70785ee183ce4124fa17f32fa798fab
Author: Otavio Rodolfo Piske <[email protected]>
AuthorDate: Thu Dec 8 15:54:27 2022 +0100
CAMEL-18796: prevent close exceptions from leaking
Among other things, this prevents exceptions thrown while closing the consumer
from breaking reconnection/recovery attempts. This could happen, for instance,
when authentication credentials have expired and the client tries to close the consumer.
---
.../camel/component/kafka/KafkaFetchRecords.java | 22 ++++++++++++++++++----
1 file changed, 18 insertions(+), 4 deletions(-)
diff --git
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
index 5b782f8e747..4882f56b202 100644
---
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
+++
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
@@ -164,7 +164,7 @@ public class KafkaFetchRecords implements Runnable {
LOG.info("Terminating KafkaConsumer thread {} receiving from {}",
threadId, getPrintableTopic());
}
- IOHelper.close(consumer);
+ safeConsumerClose();
}
private void setupInitializeErrorException(ForegroundTask task, int max) {
@@ -366,11 +366,9 @@ public class KafkaFetchRecords implements Runnable {
} finally {
// only close if not retry
if (!pollExceptionStrategy.canContinue()) {
- LOG.debug("Closing consumer {}", threadId);
safeUnsubscribe();
- IOHelper.close(consumer);
+ safeConsumerClose();
}
-
lock.unlock();
}
}
@@ -402,6 +400,19 @@ public class KafkaFetchRecords implements Runnable {
}
}
+ private void safeConsumerClose() {
+ /*
+ IOHelper.close only catches IOExceptions, thus other Kafka exceptions
may leak from the fetcher causing
+ unspecified behavior.
+ */
+ try {
+ LOG.debug("Closing consumer {}", threadId);
+ IOHelper.close(consumer, "Kafka consumer (thread ID " + threadId +
")", LOG);
+ } catch (Exception e) {
+ LOG.error("Error closing the Kafka consumer: {} (this error will
be ignored)", e.getMessage(), e);
+ }
+ }
+
private void safeUnsubscribe() {
if (consumer == null) {
return;
@@ -409,11 +420,14 @@ public class KafkaFetchRecords implements Runnable {
final String printableTopic = getPrintableTopic();
try {
+ LOG.debug("Unsubscribing from Kafka");
consumer.unsubscribe();
+ LOG.debug("Done unsubscribing from Kafka");
} catch (IllegalStateException e) {
LOG.warn("The consumer is likely already closed. Skipping
unsubscribing thread {} from kafka {}", threadId,
printableTopic);
} catch (Exception e) {
+ LOG.debug("Something went wrong while unsubscribing from Kafka:
{}", e.getMessage());
kafkaConsumer.getExceptionHandler().handleException(
"Error unsubscribing thread " + threadId + " from kafka "
+ printableTopic, e);
}