This is an automated email from the ASF dual-hosted git repository. orpiske pushed a commit to branch camel-3.14.x in repository https://gitbox.apache.org/repos/asf/camel.git
commit 8a7102e25f8f2f14eacde66328e1acb4f22a510b Author: Otavio Rodolfo Piske <[email protected]> AuthorDate: Tue Jan 18 12:13:45 2022 +0100 CAMEL-17509: fix invalid topic info displayed when using topic patterns --- .../camel/component/kafka/KafkaFetchRecords.java | 57 ++++++++++++++-------- .../support/PartitionAssignmentListener.java | 11 +++-- 2 files changed, 43 insertions(+), 25 deletions(-) diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java index f902f28..db42570 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java @@ -101,7 +101,10 @@ class KafkaFetchRecords implements Runnable { startPolling(); } while ((isRetrying() || isReconnect()) && isKafkaConsumerRunnable()); - LOG.info("Terminating KafkaConsumer thread: {} receiving from topic: {}", threadId, topicName); + if (LOG.isInfoEnabled()) { + LOG.info("Terminating KafkaConsumer thread: {} receiving from {}", threadId, getPrintableTopic()); + } + safeUnsubscribe(); IOHelper.close(consumer); } @@ -138,14 +141,16 @@ class KafkaFetchRecords implements Runnable { private void subscribe() { PartitionAssignmentListener listener = new PartitionAssignmentListener( - threadId, topicName, - kafkaConsumer.getEndpoint().getConfiguration(), consumer, lastProcessedOffset, this::isRunnable); + threadId, kafkaConsumer.getEndpoint().getConfiguration(), consumer, lastProcessedOffset, + this::isRunnable); + + if (LOG.isInfoEnabled()) { + LOG.info("Subscribing {} to {}", threadId, getPrintableTopic()); + } if (topicPattern != null) { - LOG.info("Subscribing {} to topic pattern {}", threadId, topicName); consumer.subscribe(topicPattern, listener); } else { - LOG.info("Subscribing {} to topic {}", threadId, topicName); consumer.subscribe(Arrays.asList(topicName.split(",")), listener); } } @@ -161,7 +166,10 @@ class KafkaFetchRecords implements Runnable { lock.lock(); long pollTimeoutMs = kafkaConsumer.getEndpoint().getConfiguration().getPollTimeoutMs(); - LOG.trace("Polling {} from topic: {} with timeout: {}", threadId, topicName, pollTimeoutMs); + + if (LOG.isTraceEnabled()) { + LOG.trace("Polling {} from {} with timeout: {}", threadId, getPrintableTopic(), pollTimeoutMs); + } KafkaRecordProcessor kafkaRecordProcessor = buildKafkaRecordProcessor(); @@ -185,20 +193,24 @@ class KafkaFetchRecords implements Runnable { e); commit(); - LOG.info("Unsubscribing {} from topic {}", threadId, topicName); + LOG.info("Unsubscribing {} from {}", threadId, getPrintableTopic()); safeUnsubscribe(); Thread.currentThread().interrupt(); } catch (WakeupException e) { // This is normal: it raises this exception when calling the wakeUp (which happens when we stop) - LOG.trace("The kafka consumer was woken up while polling on thread {} for topic {}", threadId, topicName); + + if (LOG.isTraceEnabled()) { + LOG.trace("The kafka consumer was woken up while polling on thread {} for {}", threadId, getPrintableTopic()); + } + safeUnsubscribe(); } catch (Exception e) { if (LOG.isDebugEnabled()) { - LOG.warn("Exception {} caught while polling {} from kafka topic {} at offset {}: {}", - e.getClass().getName(), threadId, topicName, lastProcessedOffset, e.getMessage(), e); + LOG.warn("Exception {} caught while polling {} from kafka {} at offset {}: {}", + e.getClass().getName(), threadId, getPrintableTopic(), lastProcessedOffset, e.getMessage(), e); } else { - LOG.warn("Exception {} caught while polling {} from kafka topic {} at offset {}: {}", - e.getClass().getName(), threadId, topicName, lastProcessedOffset, e.getMessage()); + LOG.warn("Exception {} caught while polling {} from kafka {} at offset {}: {}", + e.getClass().getName(), threadId, getPrintableTopic(), lastProcessedOffset, e.getMessage()); } handleAccordingToStrategy(partitionLastOffset, e); @@ -254,9 +266,9 @@ class KafkaFetchRecords implements Runnable { */ private String getPrintableTopic() { if (topicPattern != null) { - return "topic pattern" + topicPattern; + return "topic pattern " + topicPattern; } else { - return "topic" + topicName; + return "topic " + topicName; } } @@ -264,13 +276,13 @@ class KafkaFetchRecords implements Runnable { processAsyncCommits(); if (isAutoCommitEnabled()) { if ("async".equals(kafkaConsumer.getEndpoint().getConfiguration().getAutoCommitOnStop())) { - LOG.info("Auto commitAsync on stop {} from topic {}", threadId, topicName); + LOG.info("Auto commitAsync on stop {} from {}", threadId, getPrintableTopic()); consumer.commitAsync(); } else if ("sync".equals(kafkaConsumer.getEndpoint().getConfiguration().getAutoCommitOnStop())) { - LOG.info("Auto commitSync on stop {} from topic {}", threadId, topicName); + LOG.info("Auto commitSync on stop {} from {}", threadId, getPrintableTopic()); consumer.commitSync(); } else if ("none".equals(kafkaConsumer.getEndpoint().getConfiguration().getAutoCommitOnStop())) { - LOG.info("Auto commit on stop {} from topic {} is disabled (none)", threadId, topicName); + LOG.info("Auto commit on stop {} from {} is disabled (none)", threadId, getPrintableTopic()); } } } @@ -428,7 +440,12 @@ class KafkaFetchRecords implements Runnable { Set<TopicPartition> tps = consumer.assignment(); if (tps != null && partitionLastOffset != -1) { long next = partitionLastOffset + 1; - LOG.info("Consumer seeking to next offset {} to continue polling next message from topic: {}", next, topicName); + + if (LOG.isInfoEnabled()) { + LOG.info("Consumer seeking to next offset {} to continue polling next message from {}", next, + getPrintableTopic()); + } + for (TopicPartition tp : tps) { consumer.seek(tp, next); } @@ -436,8 +453,8 @@ class KafkaFetchRecords implements Runnable { for (TopicPartition tp : tps) { long next = consumer.position(tp) + 1; if (!logged) { - LOG.info("Consumer seeking to next offset {} to continue polling next message from topic: {}", next, - topicName); + LOG.info("Consumer seeking to next offset {} to continue polling next message from {}", next, + getPrintableTopic()); logged = true; } consumer.seek(tp, next); diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/PartitionAssignmentListener.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/PartitionAssignmentListener.java index 07d914a..51854b1 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/PartitionAssignmentListener.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/PartitionAssignmentListener.java @@ -34,18 +34,16 @@ public class PartitionAssignmentListener implements ConsumerRebalanceListener { private static final Logger LOG = LoggerFactory.getLogger(PartitionAssignmentListener.class); private final String threadId; - private final String topicName; private final KafkaConfiguration configuration; private final Consumer consumer; private final Map<String, Long> lastProcessedOffset; private final KafkaConsumerResumeStrategy resumeStrategy; private Supplier<Boolean> stopStateSupplier; - public PartitionAssignmentListener(String threadId, String topicName, KafkaConfiguration configuration, + public PartitionAssignmentListener(String threadId, KafkaConfiguration configuration, Consumer consumer, Map<String, Long> lastProcessedOffset, Supplier<Boolean> stopStateSupplier) { this.threadId = threadId; - this.topicName = topicName; this.configuration = configuration; this.consumer = consumer; this.lastProcessedOffset = lastProcessedOffset; @@ -56,12 +54,13 @@ public class PartitionAssignmentListener implements ConsumerRebalanceListener { @Override public void onPartitionsRevoked(Collection<TopicPartition> partitions) { - LOG.debug("onPartitionsRevoked: {} from topic {}", threadId, topicName); // if camel is stopping, or we are not running boolean stopping = stopStateSupplier.get(); for (TopicPartition partition : partitions) { + LOG.debug("onPartitionsRevoked: {} from {}", threadId, partition.topic()); + String offsetKey = serializeOffsetKey(partition); Long offset = lastProcessedOffset.get(offsetKey); if (offset == null) { @@ -84,7 +83,9 @@ public class PartitionAssignmentListener implements ConsumerRebalanceListener { @Override public void onPartitionsAssigned(Collection<TopicPartition> partitions) { - LOG.debug("onPartitionsAssigned: {} from topic {}", threadId, topicName); + if (LOG.isDebugEnabled()) { + partitions.forEach(p -> LOG.debug("onPartitionsAssigned: {} from {}", threadId, p.topic())); + } resumeStrategy.resume(consumer); }
