This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch camel-3.14.x
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 8a7102e25f8f2f14eacde66328e1acb4f22a510b
Author: Otavio Rodolfo Piske <[email protected]>
AuthorDate: Tue Jan 18 12:13:45 2022 +0100

    CAMEL-17509: fix invalid topic info displayed when using topic patterns
---
 .../camel/component/kafka/KafkaFetchRecords.java   | 57 ++++++++++++++--------
 .../support/PartitionAssignmentListener.java       | 11 +++--
 2 files changed, 43 insertions(+), 25 deletions(-)

diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
index f902f28..db42570 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
@@ -101,7 +101,10 @@ class KafkaFetchRecords implements Runnable {
             startPolling();
         } while ((isRetrying() || isReconnect()) && isKafkaConsumerRunnable());
 
-        LOG.info("Terminating KafkaConsumer thread: {} receiving from topic: 
{}", threadId, topicName);
+        if (LOG.isInfoEnabled()) {
+            LOG.info("Terminating KafkaConsumer thread: {} receiving from {}", 
threadId, getPrintableTopic());
+        }
+
         safeUnsubscribe();
         IOHelper.close(consumer);
     }
@@ -138,14 +141,16 @@ class KafkaFetchRecords implements Runnable {
 
     private void subscribe() {
         PartitionAssignmentListener listener = new PartitionAssignmentListener(
-                threadId, topicName,
-                kafkaConsumer.getEndpoint().getConfiguration(), consumer, 
lastProcessedOffset, this::isRunnable);
+                threadId, kafkaConsumer.getEndpoint().getConfiguration(), 
consumer, lastProcessedOffset,
+                this::isRunnable);
+
+        if (LOG.isInfoEnabled()) {
+            LOG.info("Subscribing {} to {}", threadId, getPrintableTopic());
+        }
 
         if (topicPattern != null) {
-            LOG.info("Subscribing {} to topic pattern {}", threadId, 
topicName);
             consumer.subscribe(topicPattern, listener);
         } else {
-            LOG.info("Subscribing {} to topic {}", threadId, topicName);
             consumer.subscribe(Arrays.asList(topicName.split(",")), listener);
         }
     }
@@ -161,7 +166,10 @@ class KafkaFetchRecords implements Runnable {
             lock.lock();
 
             long pollTimeoutMs = 
kafkaConsumer.getEndpoint().getConfiguration().getPollTimeoutMs();
-            LOG.trace("Polling {} from topic: {} with timeout: {}", threadId, 
topicName, pollTimeoutMs);
+
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("Polling {} from {} with timeout: {}", threadId, 
getPrintableTopic(), pollTimeoutMs);
+            }
 
             KafkaRecordProcessor kafkaRecordProcessor = 
buildKafkaRecordProcessor();
 
@@ -185,20 +193,24 @@ class KafkaFetchRecords implements Runnable {
                     e);
             commit();
 
-            LOG.info("Unsubscribing {} from topic {}", threadId, topicName);
+            LOG.info("Unsubscribing {} from {}", threadId, 
getPrintableTopic());
             safeUnsubscribe();
             Thread.currentThread().interrupt();
         } catch (WakeupException e) {
             // This is normal: it raises this exception when calling the 
wakeUp (which happens when we stop)
-            LOG.trace("The kafka consumer was woken up while polling on thread 
{} for topic {}", threadId, topicName);
+
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("The kafka consumer was woken up while polling on 
thread {} for {}", threadId, getPrintableTopic());
+            }
+
             safeUnsubscribe();
         } catch (Exception e) {
             if (LOG.isDebugEnabled()) {
-                LOG.warn("Exception {} caught while polling {} from kafka 
topic {} at offset {}: {}",
-                        e.getClass().getName(), threadId, topicName, 
lastProcessedOffset, e.getMessage(), e);
+                LOG.warn("Exception {} caught while polling {} from kafka {} 
at offset {}: {}",
+                        e.getClass().getName(), threadId, getPrintableTopic(), 
lastProcessedOffset, e.getMessage(), e);
             } else {
-                LOG.warn("Exception {} caught while polling {} from kafka 
topic {} at offset {}: {}",
-                        e.getClass().getName(), threadId, topicName, 
lastProcessedOffset, e.getMessage());
+                LOG.warn("Exception {} caught while polling {} from kafka {} 
at offset {}: {}",
+                        e.getClass().getName(), threadId, getPrintableTopic(), 
lastProcessedOffset, e.getMessage());
             }
 
             handleAccordingToStrategy(partitionLastOffset, e);
@@ -254,9 +266,9 @@ class KafkaFetchRecords implements Runnable {
      */
     private String getPrintableTopic() {
         if (topicPattern != null) {
-            return "topic pattern" + topicPattern;
+            return "topic pattern " + topicPattern;
         } else {
-            return "topic" + topicName;
+            return "topic " + topicName;
         }
     }
 
@@ -264,13 +276,13 @@ class KafkaFetchRecords implements Runnable {
         processAsyncCommits();
         if (isAutoCommitEnabled()) {
             if 
("async".equals(kafkaConsumer.getEndpoint().getConfiguration().getAutoCommitOnStop()))
 {
-                LOG.info("Auto commitAsync on stop {} from topic {}", 
threadId, topicName);
+                LOG.info("Auto commitAsync on stop {} from {}", threadId, 
getPrintableTopic());
                 consumer.commitAsync();
             } else if 
("sync".equals(kafkaConsumer.getEndpoint().getConfiguration().getAutoCommitOnStop()))
 {
-                LOG.info("Auto commitSync on stop {} from topic {}", threadId, 
topicName);
+                LOG.info("Auto commitSync on stop {} from {}", threadId, 
getPrintableTopic());
                 consumer.commitSync();
             } else if 
("none".equals(kafkaConsumer.getEndpoint().getConfiguration().getAutoCommitOnStop()))
 {
-                LOG.info("Auto commit on stop {} from topic {} is disabled 
(none)", threadId, topicName);
+                LOG.info("Auto commit on stop {} from {} is disabled (none)", 
threadId, getPrintableTopic());
             }
         }
     }
@@ -428,7 +440,12 @@ class KafkaFetchRecords implements Runnable {
         Set<TopicPartition> tps = consumer.assignment();
         if (tps != null && partitionLastOffset != -1) {
             long next = partitionLastOffset + 1;
-            LOG.info("Consumer seeking to next offset {} to continue polling 
next message from topic: {}", next, topicName);
+
+            if (LOG.isInfoEnabled()) {
+                LOG.info("Consumer seeking to next offset {} to continue 
polling next message from {}", next,
+                        getPrintableTopic());
+            }
+
             for (TopicPartition tp : tps) {
                 consumer.seek(tp, next);
             }
@@ -436,8 +453,8 @@ class KafkaFetchRecords implements Runnable {
             for (TopicPartition tp : tps) {
                 long next = consumer.position(tp) + 1;
                 if (!logged) {
-                    LOG.info("Consumer seeking to next offset {} to continue 
polling next message from topic: {}", next,
-                            topicName);
+                    LOG.info("Consumer seeking to next offset {} to continue 
polling next message from {}", next,
+                            getPrintableTopic());
                     logged = true;
                 }
                 consumer.seek(tp, next);
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/PartitionAssignmentListener.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/PartitionAssignmentListener.java
index 07d914a..51854b1 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/PartitionAssignmentListener.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/PartitionAssignmentListener.java
@@ -34,18 +34,16 @@ public class PartitionAssignmentListener implements 
ConsumerRebalanceListener {
     private static final Logger LOG = 
LoggerFactory.getLogger(PartitionAssignmentListener.class);
 
     private final String threadId;
-    private final String topicName;
     private final KafkaConfiguration configuration;
     private final Consumer consumer;
     private final Map<String, Long> lastProcessedOffset;
     private final KafkaConsumerResumeStrategy resumeStrategy;
     private Supplier<Boolean> stopStateSupplier;
 
-    public PartitionAssignmentListener(String threadId, String topicName, 
KafkaConfiguration configuration,
+    public PartitionAssignmentListener(String threadId, KafkaConfiguration 
configuration,
                                        Consumer consumer, Map<String, Long> 
lastProcessedOffset,
                                        Supplier<Boolean> stopStateSupplier) {
         this.threadId = threadId;
-        this.topicName = topicName;
         this.configuration = configuration;
         this.consumer = consumer;
         this.lastProcessedOffset = lastProcessedOffset;
@@ -56,12 +54,13 @@ public class PartitionAssignmentListener implements 
ConsumerRebalanceListener {
 
     @Override
     public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
-        LOG.debug("onPartitionsRevoked: {} from topic {}", threadId, 
topicName);
 
         // if camel is stopping, or we are not running
         boolean stopping = stopStateSupplier.get();
 
         for (TopicPartition partition : partitions) {
+            LOG.debug("onPartitionsRevoked: {} from {}", threadId, 
partition.topic());
+
             String offsetKey = serializeOffsetKey(partition);
             Long offset = lastProcessedOffset.get(offsetKey);
             if (offset == null) {
@@ -84,7 +83,9 @@ public class PartitionAssignmentListener implements 
ConsumerRebalanceListener {
 
     @Override
     public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
-        LOG.debug("onPartitionsAssigned: {} from topic {}", threadId, 
topicName);
+        if (LOG.isDebugEnabled()) {
+            partitions.forEach(p -> LOG.debug("onPartitionsAssigned: {} from 
{}", threadId, p.topic()));
+        }
 
         resumeStrategy.resume(consumer);
     }

Reply via email to