This is an automated email from the ASF dual-hosted git repository.
acosentino pushed a commit to branch camel-4.0.x
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/camel-4.0.x by this push:
new c82550c6a23 [CAMEL-19875]HealthCheck is broken for KafkaConsumer
(#11422) (#11440)
c82550c6a23 is described below
commit c82550c6a23b4d26a8900f80052b1e2ed3db4875
Author: Andrea Cosentino <[email protected]>
AuthorDate: Mon Sep 18 13:16:21 2023 +0200
[CAMEL-19875]HealthCheck is broken for KafkaConsumer (#11422) (#11440)
Co-authored-by: Freeman(Yue) Fang <[email protected]>
---
.../org/apache/camel/component/kafka/KafkaConsumer.java | 11 +++++------
.../integration/health/KafkaConsumerHealthCheckIT.java | 16 +++++++++++++---
2 files changed, 18 insertions(+), 9 deletions(-)
diff --git
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
index 92cd99e3d4f..4a84066737c 100644
---
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
+++
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
@@ -30,7 +30,7 @@ import org.apache.camel.Suspendable;
import
org.apache.camel.component.kafka.consumer.errorhandler.KafkaConsumerListener;
import org.apache.camel.health.HealthCheckAware;
import org.apache.camel.health.HealthCheckHelper;
-import org.apache.camel.health.WritableHealthCheckRepository;
+import org.apache.camel.health.HealthCheckRepository;
import org.apache.camel.resume.ConsumerListenerAware;
import org.apache.camel.resume.ResumeAware;
import org.apache.camel.resume.ResumeStrategy;
@@ -53,7 +53,7 @@ public class KafkaConsumer extends DefaultConsumer
protected ExecutorService executor;
private final KafkaEndpoint endpoint;
private KafkaConsumerHealthCheck consumerHealthCheck;
- private WritableHealthCheckRepository healthCheckRepository;
+ private HealthCheckRepository healthCheckRepository;
// This list helps to work around the infinite loop of KAFKA-1894
private final List<KafkaFetchRecords> tasks = new ArrayList<>();
private volatile boolean stopOffsetRepo;
@@ -126,13 +126,13 @@ public class KafkaConsumer extends DefaultConsumer
// health-check is optional so discover and resolve
healthCheckRepository = HealthCheckHelper.getHealthCheckRepository(
endpoint.getCamelContext(),
- "components",
- WritableHealthCheckRepository.class);
+ "consumers",
+ HealthCheckRepository.class);
if (healthCheckRepository != null) {
consumerHealthCheck = new KafkaConsumerHealthCheck(this,
getRouteId());
consumerHealthCheck.setEnabled(getEndpoint().getComponent().isHealthCheckConsumerEnabled());
- healthCheckRepository.addHealthCheck(consumerHealthCheck);
+ setHealthCheck(consumerHealthCheck);
}
// is the offset repository already started?
@@ -175,7 +175,6 @@ public class KafkaConsumer extends DefaultConsumer
}
if (healthCheckRepository != null && consumerHealthCheck != null) {
- healthCheckRepository.removeHealthCheck(consumerHealthCheck);
consumerHealthCheck = null;
}
diff --git
a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/health/KafkaConsumerHealthCheckIT.java
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/health/KafkaConsumerHealthCheckIT.java
index 030c578ff73..47292f4405b 100644
---
a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/health/KafkaConsumerHealthCheckIT.java
+++
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/health/KafkaConsumerHealthCheckIT.java
@@ -18,6 +18,7 @@ package org.apache.camel.component.kafka.integration.health;
import java.util.Collection;
import java.util.Map;
+import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.stream.StreamSupport;
@@ -166,9 +167,18 @@ public class KafkaConsumerHealthCheckIT extends
KafkaHealthCheckTestSupport {
serviceShutdown = true;
// health-check readiness should be DOWN
- final Collection<HealthCheck.Result> res =
HealthCheckHelper.invokeReadiness(context);
- final boolean down = res.stream().allMatch(r ->
r.getState().equals(HealthCheck.State.DOWN));
- Assertions.assertTrue(down, "readiness check");
+ await().atMost(20, TimeUnit.SECONDS).untilAsserted(() -> {
+ Collection<HealthCheck.Result> res2 =
HealthCheckHelper.invokeReadiness(context);
+ Assertions.assertTrue(res2.size() > 0);
+ Optional<HealthCheck.Result> down
+ = res2.stream().filter(r ->
r.getState().equals(HealthCheck.State.DOWN)).findFirst();
+ Assertions.assertTrue(down.isPresent());
+ String msg = down.get().getMessage().get();
+ Assertions.assertTrue(msg.contains("KafkaConsumer is not ready"));
+ Map<String, Object> map = down.get().getDetails();
+ Assertions.assertEquals(TOPIC, map.get("topic"));
+ Assertions.assertEquals("test-health-it", map.get("route.id"));
+ });
}
}