This is an automated email from the ASF dual-hosted git repository. orpiske pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
commit bf6c9de6ee4fb03d3faab8ce8c7d44f705234b58 Author: Otavio Rodolfo Piske <[email protected]> AuthorDate: Wed Jul 10 11:43:31 2024 +0200 CAMEL-20956: separate metrics collection concerns from record fetching concerns --- .../camel/component/kafka/KafkaDevConsole.java | 44 ++++--- .../camel/component/kafka/KafkaFetchRecords.java | 111 ++++-------------- .../devconsole/DefaultMetricsCollector.java | 130 +++++++++++++++++++++ .../devconsole/DevConsoleMetricsCollector.java | 52 +++++++++ .../consumer/devconsole/NoopMetricsCollector.java | 69 +++++++++++ 5 files changed, 298 insertions(+), 108 deletions(-) diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaDevConsole.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaDevConsole.java index 1f80df5a8b6..b7f94dd12e3 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaDevConsole.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaDevConsole.java @@ -23,6 +23,8 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import org.apache.camel.Route; +import org.apache.camel.component.kafka.consumer.devconsole.DefaultMetricsCollector; +import org.apache.camel.component.kafka.consumer.devconsole.DevConsoleMetricsCollector; import org.apache.camel.spi.annotations.DevConsole; import org.apache.camel.support.console.AbstractDevConsole; import org.apache.camel.util.StopWatch; @@ -58,28 +60,29 @@ public class KafkaDevConsole extends AbstractDevConsole { sb.append(String.format("\n Route Id: %s", route.getRouteId())); sb.append(String.format("\n From: %s", route.getEndpoint().getEndpointUri())); for (KafkaFetchRecords t : kc.tasks()) { - sb.append(String.format("\n Worked Thread: %s", t.getThreadId())); + final DevConsoleMetricsCollector metricsCollector = t.getMetricsCollector(); + sb.append(String.format("\n Worked Thread: %s", metricsCollector.getThreadId())); sb.append(String.format("\n Worker State: %s", t.getState())); TaskHealthState hs = t.healthState(); if (!hs.isReady()) { sb.append(String.format("\n Worker Last Error: %s", hs.buildStateMessage())); } - KafkaFetchRecords.GroupMetadata meta = t.getGroupMetadata(); + DefaultMetricsCollector.GroupMetadata meta = metricsCollector.getGroupMetadata(); if (meta != null) { sb.append(String.format("\n Group Id: %s", meta.groupId())); sb.append(String.format("\n Group Instance Id: %s", meta.groupInstanceId())); sb.append(String.format("\n Member Id: %s", meta.memberId())); sb.append(String.format("\n Generation Id: %d", meta.generationId())); } - if (t.getLastRecord() != null) { - sb.append(String.format("\n Last Topic: %s", t.getLastRecord().topic())); - sb.append(String.format("\n Last Partition: %d", t.getLastRecord().partition())); - sb.append(String.format("\n Last Offset: %d", t.getLastRecord().offset())); + if (metricsCollector.getLastRecord() != null) { + sb.append(String.format("\n Last Topic: %s", metricsCollector.getLastRecord().topic())); + sb.append(String.format("\n Last Partition: %d", metricsCollector.getLastRecord().partition())); + sb.append(String.format("\n Last Offset: %d", metricsCollector.getLastRecord().offset())); } if (committed) { - List<KafkaFetchRecords.KafkaTopicPosition> l = fetchCommitOffsets(kc, t); + List<DefaultMetricsCollector.KafkaTopicPosition> l = fetchCommitOffsets(kc, metricsCollector); if (l != null) { - for (KafkaFetchRecords.KafkaTopicPosition r : l) { + for (DefaultMetricsCollector.KafkaTopicPosition r : l) { sb.append(String.format("\n Commit Topic: %s", r.topic())); sb.append(String.format("\n Commit Partition: %s", r.partition())); sb.append(String.format("\n Commit Offset: %s", r.offset())); @@ -99,14 +102,15 @@ public class KafkaDevConsole extends AbstractDevConsole { return sb.toString(); } - private static List<KafkaFetchRecords.KafkaTopicPosition> fetchCommitOffsets(KafkaConsumer kc, KafkaFetchRecords task) { + private static List<DefaultMetricsCollector.KafkaTopicPosition> fetchCommitOffsets( + KafkaConsumer kc, DevConsoleMetricsCollector collector) { StopWatch watch = new StopWatch(); - CountDownLatch latch = task.fetchCommitRecords(); + CountDownLatch latch = collector.fetchCommitRecords(); long timeout = Math.min(kc.getEndpoint().getConfiguration().getPollTimeoutMs(), COMMITTED_TIMEOUT); try { latch.await(timeout, TimeUnit.MILLISECONDS); - var answer = task.getCommitRecords(); + var answer = collector.getCommitRecords(); LOG.debug("Fetching commit offsets took: {} ms", watch.taken()); return answer; } catch (Exception e) { @@ -134,31 +138,33 @@ public class KafkaDevConsole extends AbstractDevConsole { jo.put("workers", arr); for (KafkaFetchRecords t : kc.tasks()) { + final DevConsoleMetricsCollector metricsCollector = t.getMetricsCollector(); + JsonObject wo = new JsonObject(); arr.add(wo); - wo.put("threadId", t.getThreadId()); + wo.put("threadId", metricsCollector.getThreadId()); wo.put("state", t.getState()); TaskHealthState hs = t.healthState(); if (!hs.isReady()) { wo.put("lastError", hs.buildStateMessage()); } - KafkaFetchRecords.GroupMetadata meta = t.getGroupMetadata(); + DefaultMetricsCollector.GroupMetadata meta = metricsCollector.getGroupMetadata(); if (meta != null) { wo.put("groupId", meta.groupId()); wo.put("groupInstanceId", meta.groupInstanceId()); wo.put("memberId", meta.memberId()); wo.put("generationId", meta.generationId()); } - if (t.getLastRecord() != null) { - wo.put("lastTopic", t.getLastRecord().topic()); - wo.put("lastPartition", t.getLastRecord().partition()); - wo.put("lastOffset", t.getLastRecord().offset()); + if (metricsCollector.getLastRecord() != null) { + wo.put("lastTopic", metricsCollector.getLastRecord().topic()); + wo.put("lastPartition", metricsCollector.getLastRecord().partition()); + wo.put("lastOffset", metricsCollector.getLastRecord().offset()); } if (committed) { - List<KafkaFetchRecords.KafkaTopicPosition> l = fetchCommitOffsets(kc, t); + List<DefaultMetricsCollector.KafkaTopicPosition> l = fetchCommitOffsets(kc, metricsCollector); if (l != null) { JsonArray ca = new JsonArray(); - for (KafkaFetchRecords.KafkaTopicPosition r : l) { + for (DefaultMetricsCollector.KafkaTopicPosition r : l) { JsonObject cr = new JsonObject(); cr.put("topic", r.topic()); cr.put("partition", r.partition()); diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java index fc357f60c8a..3534fccf3f4 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java @@ -17,21 +17,17 @@ package org.apache.camel.component.kafka; import java.time.Duration; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Map; import java.util.Properties; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.ReentrantLock; import java.util.regex.Pattern; import org.apache.camel.CamelContext; import org.apache.camel.component.kafka.consumer.CommitManager; import org.apache.camel.component.kafka.consumer.CommitManagers; +import org.apache.camel.component.kafka.consumer.devconsole.DefaultMetricsCollector; +import org.apache.camel.component.kafka.consumer.devconsole.DevConsoleMetricsCollector; +import org.apache.camel.component.kafka.consumer.devconsole.NoopMetricsCollector; import org.apache.camel.component.kafka.consumer.errorhandler.KafkaConsumerListener; import org.apache.camel.component.kafka.consumer.errorhandler.KafkaErrorStrategies; import org.apache.camel.component.kafka.consumer.support.KafkaRecordProcessorFacade; @@ -52,7 +48,6 @@ import org.apache.camel.util.IOHelper; import org.apache.camel.util.ReflectionHelper; import org.apache.camel.util.TimeUtils; import org.apache.kafka.clients.CommonClientConfigs; -import org.apache.kafka.clients.consumer.ConsumerGroupMetadata; import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.OffsetAndMetadata; @@ -63,8 +58,6 @@ import org.apache.kafka.common.errors.WakeupException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static java.rmi.registry.LocateRegistry.getRegistry; - public class KafkaFetchRecords implements Runnable { /* @@ -105,19 +98,7 @@ public class KafkaFetchRecords implements Runnable { private volatile boolean connected; // this is the state (connected or not) private volatile State state = State.RUNNING; - // dev-console records and state - record GroupMetadata(String groupId, String groupInstanceId, String memberId, int generationId) { - } - - record KafkaTopicPosition(String topic, int partition, long offset, int epoch) { - } - - private final boolean devConsoleEnabled; - private volatile GroupMetadata groupMetadata; - private volatile KafkaTopicPosition lastRecord; - private final List<KafkaTopicPosition> commitRecords = new ArrayList<>(); - private final AtomicBoolean commitRecordsRequested = new AtomicBoolean(); - private final AtomicReference<CountDownLatch> latch = new AtomicReference<>(); + private final DevConsoleMetricsCollector metricsCollector; KafkaFetchRecords(KafkaConsumer kafkaConsumer, BridgeExceptionHandlerToErrorHandler bridge, String topicName, Pattern topicPattern, String id, @@ -129,7 +110,13 @@ public class KafkaFetchRecords implements Runnable { this.consumerListener = consumerListener; this.threadId = topicName + "-" + "Thread " + id; this.kafkaProps = kafkaProps; - this.devConsoleEnabled = kafkaConsumer.getEndpoint().getCamelContext().isDevConsole(); + final boolean devConsoleEnabled = kafkaConsumer.getEndpoint().getCamelContext().isDevConsole(); + + if (devConsoleEnabled) { + metricsCollector = new DefaultMetricsCollector(threadId); + } else { + metricsCollector = new NoopMetricsCollector(); + } } @Override @@ -191,13 +178,8 @@ public class KafkaFetchRecords implements Runnable { setConnected(true); } - if (devConsoleEnabled && isConnected()) { - // store metadata - ConsumerGroupMetadata meta = consumer.groupMetadata(); - if (meta != null) { - groupMetadata = new GroupMetadata( - meta.groupId(), meta.groupInstanceId().orElse(""), meta.memberId(), meta.generationId()); - } + if (isConnected()) { + metricsCollector.storeMetadata(consumer); } setLastError(null); @@ -377,9 +359,7 @@ public class KafkaFetchRecords implements Runnable { // if dev-console is in use then a request to fetch the commit offsets can be requested on-demand // which must happen using this polling thread, so we use the commitRecordsRequested to trigger this - if (devConsoleEnabled) { - collectCommitMetrics(); - } + metricsCollector.collectCommitMetrics(consumer); ConsumerRecords<Object, Object> allRecords = consumer.poll(pollDuration); if (consumerListener != null) { @@ -389,9 +369,8 @@ public class KafkaFetchRecords implements Runnable { } ProcessingResult result = recordProcessorFacade.processPolledRecords(allRecords); - if (devConsoleEnabled && result != null && result.getTopic() != null) { - // dev-console uses information from last processed record - lastRecord = new KafkaTopicPosition(result.getTopic(), result.getPartition(), result.getOffset(), 0); + if (result != null && result.getTopic() != null) { + metricsCollector.storeLastRecord(result); } updateTaskState(); @@ -448,31 +427,6 @@ public class KafkaFetchRecords implements Runnable { } } - private void collectCommitMetrics() { - if (commitRecordsRequested.compareAndSet(true, false)) { - try { - Map<TopicPartition, OffsetAndMetadata> commits = consumer.committed(consumer.assignment()); - commitRecords.clear(); - for (var e : commits.entrySet()) { - KafkaTopicPosition p - = new KafkaTopicPosition( - e.getKey().topic(), e.getKey().partition(), e.getValue().offset(), - e.getValue().leaderEpoch().orElse(0)); - commitRecords.add(p); - } - CountDownLatch count = latch.get(); - if (count != null) { - count.countDown(); - } - } catch (Exception e) { - // ignore cannot get last commit details - LOG.debug( - "Cannot get last offset committed from Kafka brokers due to: {}. This exception is ignored.", - e.getMessage(), e); - } - } - } - private KafkaRecordProcessorFacade createRecordProcessor() { final KafkaConfiguration configuration = kafkaConsumer.getEndpoint().getConfiguration(); if (configuration.isBatching()) { @@ -701,36 +655,15 @@ public class KafkaFetchRecords implements Runnable { this.lastError = lastError; } - // dev console information - // ------------------------------------------------------------------------ - - GroupMetadata getGroupMetadata() { - return groupMetadata; - } - - KafkaTopicPosition getLastRecord() { - return lastRecord; - } - - String getThreadId() { - return threadId; + /** + * Gets the metrics collector for the dev console. Defaults to the {@link NoopMetricsCollector} unless the dev + * console is enabled + */ + public DevConsoleMetricsCollector getMetricsCollector() { + return metricsCollector; } String getState() { return state.name(); } - - List<KafkaTopicPosition> getCommitRecords() { - return Collections.unmodifiableList(commitRecords); - } - - CountDownLatch fetchCommitRecords() { - // use a latch to wait for commit records to be ready - // as the consumer thread must be calling Kafka brokers to get this information - // so this thread need to wait for that to be complete - CountDownLatch answer = new CountDownLatch(1); - latch.set(answer); - commitRecordsRequested.set(true); - return answer; - } } diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/devconsole/DefaultMetricsCollector.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/devconsole/DefaultMetricsCollector.java new file mode 100644 index 00000000000..f00367bc9a0 --- /dev/null +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/devconsole/DefaultMetricsCollector.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.component.kafka.consumer.devconsole; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.camel.component.kafka.consumer.support.ProcessingResult; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerGroupMetadata; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.TopicPartition; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * The default collector if the dev console is enabled for Kafka + */ +public class DefaultMetricsCollector implements DevConsoleMetricsCollector { + private static final Logger LOG = LoggerFactory.getLogger(DefaultMetricsCollector.class); + private final String threadId; + + public DefaultMetricsCollector(String threadId) { + this.threadId = threadId; + } + + // dev-console records and state + + private volatile GroupMetadata groupMetadata; + private volatile KafkaTopicPosition lastRecord; + private final List<KafkaTopicPosition> commitRecords = new ArrayList<>(); + private final AtomicBoolean commitRecordsRequested = new AtomicBoolean(); + private final AtomicReference<CountDownLatch> latch = new AtomicReference<>(); + + @Override + public void storeMetadata(Consumer<?, ?> consumer) { + // store metadata + ConsumerGroupMetadata meta = consumer.groupMetadata(); + if (meta != null) { + groupMetadata = new GroupMetadata( + meta.groupId(), meta.groupInstanceId().orElse(""), meta.memberId(), meta.generationId()); + } + } + + @Override + public void storeLastRecord(ProcessingResult result) { + // dev-console uses information from last processed record + lastRecord = new KafkaTopicPosition(result.getTopic(), result.getPartition(), result.getOffset(), 0); + } + + @Override + public void collectCommitMetrics(Consumer<?, ?> consumer) { + if (commitRecordsRequested.compareAndSet(true, false)) { + try { + Map<TopicPartition, OffsetAndMetadata> commits = consumer.committed(consumer.assignment()); + commitRecords.clear(); + for (var e : commits.entrySet()) { + KafkaTopicPosition p + = new KafkaTopicPosition( + e.getKey().topic(), e.getKey().partition(), e.getValue().offset(), + e.getValue().leaderEpoch().orElse(0)); + commitRecords.add(p); + } + CountDownLatch count = latch.get(); + if (count != null) { + count.countDown(); + } + } catch (Exception e) { + // ignore cannot get last commit details + LOG.debug( + "Cannot get last offset committed from Kafka brokers due to: {}. This exception is ignored.", + e.getMessage(), e); + } + } + } + + // dev console information + // ------------------------------------------------------------------------ + + @Override + public GroupMetadata getGroupMetadata() { + return groupMetadata; + } + + @Override + public KafkaTopicPosition getLastRecord() { + return lastRecord; + } + + @Override + public String getThreadId() { + return threadId; + } + + @Override + public List<KafkaTopicPosition> getCommitRecords() { + return Collections.unmodifiableList(commitRecords); + } + + @Override + public CountDownLatch fetchCommitRecords() { + // use a latch to wait for commit records to be ready + // as the consumer thread must be calling Kafka brokers to get this information + // so this thread need to wait for that to be complete + CountDownLatch answer = new CountDownLatch(1); + latch.set(answer); + commitRecordsRequested.set(true); + return answer; + } +} diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/devconsole/DevConsoleMetricsCollector.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/devconsole/DevConsoleMetricsCollector.java new file mode 100644 index 00000000000..9a602532533 --- /dev/null +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/devconsole/DevConsoleMetricsCollector.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.component.kafka.consumer.devconsole; + +import java.util.List; +import java.util.concurrent.CountDownLatch; + +import org.apache.camel.component.kafka.consumer.support.ProcessingResult; +import org.apache.kafka.clients.consumer.Consumer; + +/** + * Collects metrics for the dev console + */ +public interface DevConsoleMetricsCollector { + + void storeMetadata(Consumer<?, ?> consumer); + + void storeLastRecord(ProcessingResult result); + + void collectCommitMetrics(Consumer<?, ?> consumer); + + DefaultMetricsCollector.GroupMetadata getGroupMetadata(); + + DefaultMetricsCollector.KafkaTopicPosition getLastRecord(); + + String getThreadId(); + + List<DefaultMetricsCollector.KafkaTopicPosition> getCommitRecords(); + + CountDownLatch fetchCommitRecords(); + + record GroupMetadata(String groupId, String groupInstanceId, String memberId, int generationId) { + } + + record KafkaTopicPosition(String topic, int partition, long offset, int epoch) { + } +} diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/devconsole/NoopMetricsCollector.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/devconsole/NoopMetricsCollector.java new file mode 100644 index 00000000000..993fc0867f4 --- /dev/null +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/devconsole/NoopMetricsCollector.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.component.kafka.consumer.devconsole; + +import java.util.List; +import java.util.concurrent.CountDownLatch; + +import org.apache.camel.component.kafka.consumer.support.ProcessingResult; +import org.apache.kafka.clients.consumer.Consumer; + +/** + * A NO-OP collector that is used if the metrics collector is disabled + */ +public class NoopMetricsCollector implements DevConsoleMetricsCollector { + @Override + public void storeMetadata(Consumer<?, ?> consumer) { + // NO-OP + } + + @Override + public void storeLastRecord(ProcessingResult result) { + // NO-OP + } + + @Override + public void collectCommitMetrics(Consumer<?, ?> consumer) { + // NO-OP + } + + @Override + public GroupMetadata getGroupMetadata() { + return null; + } + + @Override + public KafkaTopicPosition getLastRecord() { + return null; + } + + @Override + public String getThreadId() { + return ""; + } + + @Override + public List<KafkaTopicPosition> getCommitRecords() { + return List.of(); + } + + @Override + public CountDownLatch fetchCommitRecords() { + return null; + } +}
