This is an automated email from the ASF dual-hosted git repository. orpiske pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
commit 5ad73bdd87b9fb6cf521ea1cc1c02ffd602d56d0 Author: Otavio Rodolfo Piske <[email protected]> AuthorDate: Fri Apr 22 11:45:20 2022 +0200 (chores) camel-kafka: rework the KafkaConsumerHealthCheckIT It manages the service instance, therefore it should not use the singleton service shared by other services via BaseEmbeddedKafkaTestSupport --- .../integration/KafkaConsumerHealthCheckIT.java | 48 ++++++++++++++++++++-- 1 file changed, 44 insertions(+), 4 deletions(-) diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerHealthCheckIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerHealthCheckIT.java index 576e0118d3f..dcb52c44f6e 100644 --- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerHealthCheckIT.java +++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerHealthCheckIT.java @@ -29,6 +29,7 @@ import org.apache.camel.CamelContext; import org.apache.camel.Endpoint; import org.apache.camel.EndpointInject; import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.kafka.KafkaComponent; import org.apache.camel.component.kafka.KafkaConstants; import org.apache.camel.component.kafka.MockConsumerInterceptor; import org.apache.camel.component.kafka.serde.DefaultKafkaHeaderDeserializer; @@ -37,14 +38,21 @@ import org.apache.camel.health.HealthCheck; import org.apache.camel.health.HealthCheckHelper; import org.apache.camel.health.HealthCheckRegistry; import org.apache.camel.impl.health.DefaultHealthCheckRegistry; +import org.apache.camel.test.infra.kafka.services.KafkaService; +import org.apache.camel.test.infra.kafka.services.KafkaServiceFactory; +import org.apache.camel.test.junit5.CamelTestSupport; +import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.header.internals.RecordHeader; +import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.MethodOrderer; import org.junit.jupiter.api.Order; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInstance; import org.junit.jupiter.api.TestMethodOrder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -55,9 +63,14 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import static org.testcontainers.shaded.org.awaitility.Awaitility.await; @TestMethodOrder(MethodOrderer.OrderAnnotation.class) -public class KafkaConsumerHealthCheckIT extends BaseEmbeddedKafkaTestSupport { +@TestInstance(TestInstance.Lifecycle.PER_CLASS) +public class KafkaConsumerHealthCheckIT extends CamelTestSupport { public static final String TOPIC = "test-health"; + public static KafkaService service = KafkaServiceFactory.createService(); + + protected static AdminClient kafkaAdminClient; + private static final Logger LOG = LoggerFactory.getLogger(KafkaConsumerHealthCheckIT.class); @BindToRegistry("myHeaderDeserializer") @@ -68,7 +81,6 @@ public class KafkaConsumerHealthCheckIT extends BaseEmbeddedKafkaTestSupport { + "valueDeserializer=org.apache.kafka.common.serialization.StringDeserializer" + "&autoCommitIntervalMs=1000&sessionTimeoutMs=30000&autoCommitEnable=true&interceptorClasses=org.apache.camel.component.kafka.MockConsumerInterceptor") private Endpoint from; - @EndpointInject("mock:result") private MockEndpoint to; @@ -76,11 +88,32 @@ public class KafkaConsumerHealthCheckIT extends BaseEmbeddedKafkaTestSupport { @BeforeEach public void before() { - Properties props = getDefaultProperties(); + Properties props = BaseEmbeddedKafkaTestSupport.getDefaultProperties(service); producer = new org.apache.kafka.clients.producer.KafkaProducer<>(props); MockConsumerInterceptor.recordsCaptured.clear(); } + @BeforeAll + public static void beforeClass() { + service.initialize(); + + LOG.info("### Embedded Kafka cluster broker list: " + service.getBootstrapServers()); + System.setProperty("bootstrapServers", service.getBootstrapServers()); + System.setProperty("brokers", service.getBootstrapServers()); + } + + @AfterAll + public static void afterClass() { + service.shutdown(); + } + + @BeforeEach + public void setKafkaAdminClient() { + if (kafkaAdminClient == null) { + kafkaAdminClient = BaseEmbeddedKafkaTestSupport.createAdminClient(service); + } + } + @AfterEach public void after() { if (producer != null) { @@ -93,6 +126,12 @@ public class KafkaConsumerHealthCheckIT extends BaseEmbeddedKafkaTestSupport { @Override protected CamelContext createCamelContext() throws Exception { CamelContext context = super.createCamelContext(); + context.getPropertiesComponent().setLocation("ref:prop"); + + KafkaComponent kafka = new KafkaComponent(context); + kafka.init(); + kafka.getConfiguration().setBrokers(service.getBootstrapServers()); + context.addComponent("kafka", kafka); // install health check manually (yes a bit cumbersome) HealthCheckRegistry registry = new DefaultHealthCheckRegistry(); @@ -114,7 +153,8 @@ public class KafkaConsumerHealthCheckIT extends BaseEmbeddedKafkaTestSupport { @Override public void configure() { - from(from).process(exchange -> LOG.trace("Captured on the processor: {}", exchange.getMessage().getBody())) + from(from) + .process(exchange -> LOG.trace("Captured on the processor: {}", exchange.getMessage().getBody())) .routeId("test-health-it").to(to); } };
