This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch 4.0.x-CAMEL-19875 in repository https://gitbox.apache.org/repos/asf/camel.git
commit 226ce37d656a8bf95e6b76d8c9942a962233669c Author: Freeman(Yue) Fang <[email protected]> AuthorDate: Mon Sep 18 05:20:29 2023 -0400 [CAMEL-19875]HealthCheck is broken for KafkaConsumer (#11422) --- .../org/apache/camel/component/kafka/KafkaConsumer.java | 11 +++++------ .../integration/health/KafkaConsumerHealthCheckIT.java | 16 +++++++++++++--- 2 files changed, 18 insertions(+), 9 deletions(-) diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java index 92cd99e3d4f..4a84066737c 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java @@ -30,7 +30,7 @@ import org.apache.camel.Suspendable; import org.apache.camel.component.kafka.consumer.errorhandler.KafkaConsumerListener; import org.apache.camel.health.HealthCheckAware; import org.apache.camel.health.HealthCheckHelper; -import org.apache.camel.health.WritableHealthCheckRepository; +import org.apache.camel.health.HealthCheckRepository; import org.apache.camel.resume.ConsumerListenerAware; import org.apache.camel.resume.ResumeAware; import org.apache.camel.resume.ResumeStrategy; @@ -53,7 +53,7 @@ public class KafkaConsumer extends DefaultConsumer protected ExecutorService executor; private final KafkaEndpoint endpoint; private KafkaConsumerHealthCheck consumerHealthCheck; - private WritableHealthCheckRepository healthCheckRepository; + private HealthCheckRepository healthCheckRepository; // This list helps to work around the infinite loop of KAFKA-1894 private final List<KafkaFetchRecords> tasks = new ArrayList<>(); private volatile boolean stopOffsetRepo; @@ -126,13 +126,13 @@ public class KafkaConsumer extends DefaultConsumer // health-check is optional so discover and resolve healthCheckRepository = 
HealthCheckHelper.getHealthCheckRepository( endpoint.getCamelContext(), - "components", - WritableHealthCheckRepository.class); + "consumers", + HealthCheckRepository.class); if (healthCheckRepository != null) { consumerHealthCheck = new KafkaConsumerHealthCheck(this, getRouteId()); consumerHealthCheck.setEnabled(getEndpoint().getComponent().isHealthCheckConsumerEnabled()); - healthCheckRepository.addHealthCheck(consumerHealthCheck); + setHealthCheck(consumerHealthCheck); } // is the offset repository already started? @@ -175,7 +175,6 @@ public class KafkaConsumer extends DefaultConsumer } if (healthCheckRepository != null && consumerHealthCheck != null) { - healthCheckRepository.removeHealthCheck(consumerHealthCheck); consumerHealthCheck = null; } diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/health/KafkaConsumerHealthCheckIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/health/KafkaConsumerHealthCheckIT.java index 030c578ff73..47292f4405b 100644 --- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/health/KafkaConsumerHealthCheckIT.java +++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/health/KafkaConsumerHealthCheckIT.java @@ -18,6 +18,7 @@ package org.apache.camel.component.kafka.integration.health; import java.util.Collection; import java.util.Map; +import java.util.Optional; import java.util.Properties; import java.util.concurrent.TimeUnit; import java.util.stream.StreamSupport; @@ -166,9 +167,18 @@ public class KafkaConsumerHealthCheckIT extends KafkaHealthCheckTestSupport { serviceShutdown = true; // health-check readiness should be DOWN - final Collection<HealthCheck.Result> res = HealthCheckHelper.invokeReadiness(context); - final boolean down = res.stream().allMatch(r -> r.getState().equals(HealthCheck.State.DOWN)); - Assertions.assertTrue(down, "readiness check"); + await().atMost(20, 
TimeUnit.SECONDS).untilAsserted(() -> { + Collection<HealthCheck.Result> res2 = HealthCheckHelper.invokeReadiness(context); + Assertions.assertTrue(res2.size() > 0); + Optional<HealthCheck.Result> down + = res2.stream().filter(r -> r.getState().equals(HealthCheck.State.DOWN)).findFirst(); + Assertions.assertTrue(down.isPresent()); + String msg = down.get().getMessage().get(); + Assertions.assertTrue(msg.contains("KafkaConsumer is not ready")); + Map<String, Object> map = down.get().getDetails(); + Assertions.assertEquals(TOPIC, map.get("topic")); + Assertions.assertEquals("test-health-it", map.get("route.id")); + }); } }
