This is an automated email from the ASF dual-hosted git repository. orpiske pushed a commit to branch camel-3.14.x in repository https://gitbox.apache.org/repos/asf/camel.git
commit a696417867d96b47c436e7405de67bdb647f7ded Author: Otavio Rodolfo Piske <[email protected]> AuthorDate: Fri Jan 14 19:25:43 2022 +0100 CAMEL-17493: ignore safe exceptions when unsubscribing --- .../camel/component/kafka/KafkaFetchRecords.java | 19 +++++++++++++++++-- 1 file changed, 17 insertions(+), 2 deletions(-) diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java index e1c5b03..f902f28 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java @@ -236,12 +236,27 @@ class KafkaFetchRecords implements Runnable { } private void safeUnsubscribe() { + final String printableTopic = getPrintableTopic(); + try { consumer.unsubscribe(); + } catch (IllegalStateException e) { + LOG.warn("The consumer is likely already closed. Skipping the unsubscription from {}", printableTopic); } catch (Exception e) { kafkaConsumer.getExceptionHandler().handleException( - "Error unsubscribing " + threadId + " from kafka topic " + topicName, - e); + "Error unsubscribing thread " + threadId + " from kafka " + printableTopic, e); + } + } + + /* + * This is only used for presenting log messages that take into consideration that it might be subscribed to a topic + * or a topic pattern. + */ + private String getPrintableTopic() { + if (topicPattern != null) { + return "topic pattern" + topicPattern; + } else { + return "topic" + topicName; } }
