This is an automated email from the ASF dual-hosted git repository.
jamesnetherton pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel-quarkus.git
The following commit(s) were added to refs/heads/main by this push:
new c161ffcb86 Fixes #8621 Add KafkaOffsetBackingStore native support and integration tests
c161ffcb86 is described below
commit c161ffcb86563ed3995d73995d5caf45a8f168ae
Author: Jomin Mathew <[email protected]>
AuthorDate: Mon Jun 8 11:55:32 2026 +0100
Fixes #8621 Add KafkaOffsetBackingStore native support and integration tests
* Add KafkaOffsetBackingStore native support and integration tests for
Debezium
* Use ServiceProviderBuildItem for Kafka Connect Converter registration
Replace the hardcoded list of Converter implementations with
ServiceProviderBuildItem.allProvidersFromClassPath which auto-discovers
providers from META-INF/services on the classpath, ensuring forward
compatibility with future Kafka versions.
---------
Co-authored-by: jomin mathew <>
---
extensions-support/debezium/deployment/pom.xml | 4 ++
.../deployment/DebeziumSupportProcessor.java | 41 +++++++--------
extensions-support/debezium/runtime/pom.xml | 4 ++
integration-test-groups/debezium/mongodb/pom.xml | 5 ++
.../common/it/mongod/DebeziumMongodbResource.java | 26 ++++++++--
.../common/it/mongodb/DebeziumMongodbTest.java | 31 ++++++++++++
.../debezium/postgresql/pom.xml | 6 +++
.../it/postgres/DebeziumPostgresResource.java | 30 ++++++++++-
.../common/it/postgres/DebeziumPostgresTest.java | 28 ++++++++++
integration-tests-support/debezium/pom.xml | 7 ++-
.../support/debezium/AbstractDebeziumResource.java | 54 +++++++++++++++++++-
.../support/debezium/SharedKafkaTestResource.java | 59 ++++++++++++++++++++++
integration-tests/debezium-grouped/pom.xml | 5 ++
13 files changed, 269 insertions(+), 31 deletions(-)
diff --git a/extensions-support/debezium/deployment/pom.xml
b/extensions-support/debezium/deployment/pom.xml
index 235613a760..8d1c5e461b 100644
--- a/extensions-support/debezium/deployment/pom.xml
+++ b/extensions-support/debezium/deployment/pom.xml
@@ -38,6 +38,10 @@
<groupId>io.quarkus</groupId>
<artifactId>quarkus-datasource-deployment</artifactId>
</dependency>
+ <dependency>
+ <groupId>io.quarkus</groupId>
+ <artifactId>quarkus-kafka-client-deployment</artifactId>
+ </dependency>
<dependency>
<groupId>org.apache.camel.quarkus</groupId>
<artifactId>camel-quarkus-core-deployment</artifactId>
diff --git
a/extensions-support/debezium/deployment/src/main/java/org/apache/camel/quarkus/support/debezium/deployment/DebeziumSupportProcessor.java
b/extensions-support/debezium/deployment/src/main/java/org/apache/camel/quarkus/support/debezium/deployment/DebeziumSupportProcessor.java
index 793ed0ff7c..30facc054c 100644
---
a/extensions-support/debezium/deployment/src/main/java/org/apache/camel/quarkus/support/debezium/deployment/DebeziumSupportProcessor.java
+++
b/extensions-support/debezium/deployment/src/main/java/org/apache/camel/quarkus/support/debezium/deployment/DebeziumSupportProcessor.java
@@ -51,11 +51,9 @@ import
io.quarkus.deployment.builditem.CombinedIndexBuildItem;
import io.quarkus.deployment.builditem.IndexDependencyBuildItem;
import
io.quarkus.deployment.builditem.nativeimage.NativeImageResourceBuildItem;
import io.quarkus.deployment.builditem.nativeimage.ReflectiveClassBuildItem;
-import
io.quarkus.deployment.builditem.nativeimage.RuntimeInitializedClassBuildItem;
+import io.quarkus.deployment.builditem.nativeimage.ServiceProviderBuildItem;
import io.quarkus.gizmo.Gizmo;
import org.apache.camel.quarkus.support.debezium.DebeziumComponentObserver;
-import org.apache.kafka.common.security.authenticator.SaslClientAuthenticator;
-import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.source.SourceTask;
import org.jboss.jandex.DotName;
import org.jboss.jandex.IndexView;
@@ -74,11 +72,6 @@ public class DebeziumSupportProcessor {
indexDependency.produce(new IndexDependencyBuildItem("io.debezium",
"debezium-api"));
}
- @BuildStep
- RuntimeInitializedClassBuildItem runtimeInitializedClasses() {
- return new
RuntimeInitializedClassBuildItem(SaslClientAuthenticator.class.getName());
- }
-
@BuildStep
public void
configureKafkaComponentForDevServices(BuildProducer<AdditionalBeanBuildItem>
additionalBean) {
additionalBean.produce(AdditionalBeanBuildItem.unremovableOf(DebeziumComponentObserver.class));
@@ -88,13 +81,18 @@ public class DebeziumSupportProcessor {
void reflectiveClasses(CombinedIndexBuildItem combinedIndex,
BuildProducer<ReflectiveClassBuildItem> reflectiveClasses) {
IndexView index = combinedIndex.getIndex();
- String[] dtos = index.getKnownClasses().stream().map(ci ->
ci.name().toString())
- .filter(n -> n.startsWith("org.apache.kafka.connect.json")
- || n.startsWith("io.debezium.engine.spi"))
+ String[] debeziumEngineSpiClasses =
index.getKnownClasses().stream().map(ci -> ci.name().toString())
+ .filter(n -> n.startsWith("io.debezium.engine.spi"))
.toArray(String[]::new);
-
reflectiveClasses.produce(ReflectiveClassBuildItem.builder(dtos).fields().build());
+
reflectiveClasses.produce(ReflectiveClassBuildItem.builder(debeziumEngineSpiClasses).fields().build());
- dtos =
index.getAllKnownImplementations(DotName.createSimple(SnapshotLock.class.getName())).stream()
+ String[] jsonConverters = index.getKnownClasses().stream().map(ci ->
ci.name().toString())
+ .filter(n -> n.startsWith("org.apache.kafka.connect.json"))
+ .toArray(String[]::new);
+ reflectiveClasses.produce(
+
ReflectiveClassBuildItem.builder(jsonConverters).fields().constructors().methods().build());
+
+ String[] dtos =
index.getAllKnownImplementations(DotName.createSimple(SnapshotLock.class.getName())).stream()
.map(ci -> ci.name().toString())
.toArray(String[]::new);
reflectiveClasses.produce(ReflectiveClassBuildItem.builder(dtos).fields().build());
@@ -102,20 +100,16 @@ public class DebeziumSupportProcessor {
reflectiveClasses.produce(ReflectiveClassBuildItem.builder(
"org.apache.kafka.connect.storage.FileOffsetBackingStore",
"org.apache.kafka.connect.storage.MemoryOffsetBackingStore",
+ "org.apache.kafka.connect.storage.KafkaOffsetBackingStore",
"io.debezium.storage.kafka.history.KafkaSchemaHistory",
"io.debezium.relational.history.FileDatabaseHistory",
"io.debezium.embedded.ConvertingEngineBuilderFactory",
- "io.debezium.processors.PostProcessorRegistry",
-
"io.debezium.pipeline.txmetadata.DefaultTransactionMetadataFactory",
- "io.debezium.schema.SchemaTopicNamingStrategy",
- "io.debezium.storage.file.history.FileSchemaHistory")
+ "io.debezium.processors.PostProcessorRegistry")
.build());
reflectiveClasses.produce(ReflectiveClassBuildItem.builder(
DebeziumEngine.BuilderFactory.class,
ConvertingAsyncEngineBuilderFactory.class,
- SaslClientAuthenticator.class,
- JsonConverter.class,
DefaultTransactionMetadataFactory.class,
SchemaTopicNamingStrategy.class,
BaseSourceTask.class,
@@ -139,11 +133,14 @@ public class DebeziumSupportProcessor {
InProcessSignalChannel.class,
StandardActionProvider.class,
SourceTask.class,
- ConvertingAsyncEngineBuilderFactory.class,
- DefaultTransactionMetadataFactory.class,
- SchemaTopicNamingStrategy.class,
FileSchemaHistory.class)
.build());
+
+ }
+
+ @BuildStep
+ ServiceProviderBuildItem registerConverterServiceProviders() {
+ return
ServiceProviderBuildItem.allProvidersFromClassPath("org.apache.kafka.connect.storage.Converter");
}
@BuildStep
diff --git a/extensions-support/debezium/runtime/pom.xml
b/extensions-support/debezium/runtime/pom.xml
index 502a6e241f..130fdfbe17 100644
--- a/extensions-support/debezium/runtime/pom.xml
+++ b/extensions-support/debezium/runtime/pom.xml
@@ -43,6 +43,10 @@
<groupId>io.quarkus</groupId>
<artifactId>quarkus-datasource</artifactId>
</dependency>
+ <dependency>
+ <groupId>io.quarkus</groupId>
+ <artifactId>quarkus-kafka-client</artifactId>
+ </dependency>
<dependency>
<groupId>org.javassist</groupId>
<artifactId>javassist</artifactId>
diff --git a/integration-test-groups/debezium/mongodb/pom.xml
b/integration-test-groups/debezium/mongodb/pom.xml
index e7aa9c5c5a..ff2d2e786e 100644
--- a/integration-test-groups/debezium/mongodb/pom.xml
+++ b/integration-test-groups/debezium/mongodb/pom.xml
@@ -82,6 +82,11 @@
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>io.strimzi</groupId>
+ <artifactId>strimzi-test-container</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<profiles>
diff --git
a/integration-test-groups/debezium/mongodb/src/main/java/org/apache/camel/quarkus/component/debezium/common/it/mongod/DebeziumMongodbResource.java
b/integration-test-groups/debezium/mongodb/src/main/java/org/apache/camel/quarkus/component/debezium/common/it/mongod/DebeziumMongodbResource.java
index e8fb4ffef8..f8105f2655 100644
---
a/integration-test-groups/debezium/mongodb/src/main/java/org/apache/camel/quarkus/component/debezium/common/it/mongod/DebeziumMongodbResource.java
+++
b/integration-test-groups/debezium/mongodb/src/main/java/org/apache/camel/quarkus/component/debezium/common/it/mongod/DebeziumMongodbResource.java
@@ -17,7 +17,6 @@
package org.apache.camel.quarkus.component.debezium.common.it.mongod;
import jakarta.enterprise.context.ApplicationScoped;
-import jakarta.inject.Inject;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
@@ -25,15 +24,11 @@ import jakarta.ws.rs.core.MediaType;
import org.apache.camel.quarkus.test.support.debezium.AbstractDebeziumResource;
import org.apache.camel.quarkus.test.support.debezium.Record;
import org.apache.camel.quarkus.test.support.debezium.Type;
-import org.eclipse.microprofile.config.Config;
@Path("/debezium-mongodb")
@ApplicationScoped
public class DebeziumMongodbResource extends AbstractDebeziumResource {
- @Inject
- Config config;
-
public DebeziumMongodbResource() {
super(Type.mongodb);
}
@@ -64,6 +59,27 @@ public class DebeziumMongodbResource extends
AbstractDebeziumResource {
return record.getOperation();
}
+ @Override
+ protected String getKafkaOffsetEndpointUrl() {
+ String kafkaBootstrapServers =
config.getOptionalValue("kafka.bootstrap.servers", String.class).orElse(null);
+ if (kafkaBootstrapServers == null) {
+ return null;
+ }
+ String hostname = config.getValue(Type.mongodb.getPropertyHostname(),
String.class);
+ String port = config.getValue(Type.mongodb.getPropertyPort(),
String.class);
+ return Type.mongodb.getComponent() + ":localhost?"
+ + "mongodbUser=" +
config.getValue(Type.mongodb.getPropertyUsername(), String.class)
+ + "&mongodbPassword=" +
config.getValue(Type.mongodb.getPropertyPassword(), String.class)
+ + "&mongodbConnectionString=mongodb://" + hostname + ":" +
port + "/?replicaSet=my-mongo-set"
+ + "&topicPrefix=cq-testing-kafka"
+ +
"&offsetStorage=org.apache.kafka.connect.storage.KafkaOffsetBackingStore"
+ + "&offsetStorageTopic=debezium-offset-storage-mongodb"
+ + "&offsetStoragePartitions=1"
+ + "&offsetStorageReplicationFactor=1"
+ + "&offsetFlushIntervalMs=1000"
+ + "&additionalProperties.bootstrap.servers=" +
kafkaBootstrapServers;
+ }
+
@Override
protected String getEndpointUrl(String hostname, String port, String
username, String password, String databaseServerName,
String offsetStorageFileName) {
diff --git
a/integration-test-groups/debezium/mongodb/src/test/java/org/apache/camel/quarkus/component/debezium/common/it/mongodb/DebeziumMongodbTest.java
b/integration-test-groups/debezium/mongodb/src/test/java/org/apache/camel/quarkus/component/debezium/common/it/mongodb/DebeziumMongodbTest.java
index 3c12c42681..d002202626 100644
---
a/integration-test-groups/debezium/mongodb/src/test/java/org/apache/camel/quarkus/component/debezium/common/it/mongodb/DebeziumMongodbTest.java
+++
b/integration-test-groups/debezium/mongodb/src/test/java/org/apache/camel/quarkus/component/debezium/common/it/mongodb/DebeziumMongodbTest.java
@@ -18,7 +18,9 @@ package
org.apache.camel.quarkus.component.debezium.common.it.mongodb;
import java.sql.Connection;
import java.sql.SQLException;
+import java.time.Duration;
import java.util.Optional;
+import java.util.UUID;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
@@ -28,7 +30,9 @@ import com.mongodb.client.result.DeleteResult;
import io.quarkus.test.common.QuarkusTestResource;
import io.quarkus.test.junit.QuarkusTest;
import org.apache.camel.quarkus.test.support.debezium.AbstractDebeziumTest;
+import org.apache.camel.quarkus.test.support.debezium.SharedKafkaTestResource;
import org.apache.camel.quarkus.test.support.debezium.Type;
+import org.awaitility.Awaitility;
import org.bson.Document;
import org.eclipse.microprofile.config.Config;
import org.junit.jupiter.api.AfterAll;
@@ -39,8 +43,12 @@ import org.junit.jupiter.api.Order;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestMethodOrder;
+import static io.restassured.RestAssured.given;
import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.emptyOrNullString;
import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.not;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.fail;
@@ -48,6 +56,7 @@ import static org.junit.jupiter.api.Assumptions.assumeTrue;
@QuarkusTest
@QuarkusTestResource(value = DebeziumMongodbTestResource.class,
restrictToAnnotatedClass = true)
+@QuarkusTestResource(SharedKafkaTestResource.class)
@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
class DebeziumMongodbTest extends AbstractDebeziumTest {
private static MongoClient mongoClient;
@@ -151,4 +160,26 @@ class DebeziumMongodbTest extends AbstractDebeziumTest {
//validate that event for delete is in queue
receiveResponse(200, equalTo("d"), "/receiveOperation");
}
+
+ @Test
+ @Order(4)
+ public void testKafkaOffsetBackingStore() {
+ given()
+ .when().get("/debezium-mongodb/kafkaBootstrapServers")
+ .then()
+ .statusCode(200)
+ .body(not(equalTo("not-available")))
+ .body(containsString("PLAINTEXT://"));
+
+ String suffix = UUID.randomUUID().toString().substring(0, 8);
+ insertCompany("KafkaOffsetTest_" + suffix, "KafkaCity");
+
+
Awaitility.await().pollDelay(Duration.ofMillis(250)).atMost(Duration.ofMinutes(1)).untilAsserted(()
-> {
+ given()
+ .when().get("/debezium-mongodb/receiveViaKafkaOffset")
+ .then()
+ .statusCode(200)
+ .body(is(not(emptyOrNullString())));
+ });
+ }
}
diff --git a/integration-test-groups/debezium/postgresql/pom.xml
b/integration-test-groups/debezium/postgresql/pom.xml
index 98abeec52c..95d5cf2279 100644
--- a/integration-test-groups/debezium/postgresql/pom.xml
+++ b/integration-test-groups/debezium/postgresql/pom.xml
@@ -79,6 +79,12 @@
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>io.strimzi</groupId>
+ <artifactId>strimzi-test-container</artifactId>
+ <scope>test</scope>
+ </dependency>
+
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
diff --git
a/integration-test-groups/debezium/postgresql/src/main/java/org/apache/camel/quarkus/component/debezium/common/it/postgres/DebeziumPostgresResource.java
b/integration-test-groups/debezium/postgresql/src/main/java/org/apache/camel/quarkus/component/debezium/common/it/postgres/DebeziumPostgresResource.java
index 1ba47cb26e..f866098d36 100644
---
a/integration-test-groups/debezium/postgresql/src/main/java/org/apache/camel/quarkus/component/debezium/common/it/postgres/DebeziumPostgresResource.java
+++
b/integration-test-groups/debezium/postgresql/src/main/java/org/apache/camel/quarkus/component/debezium/common/it/postgres/DebeziumPostgresResource.java
@@ -49,8 +49,34 @@ public class DebeziumPostgresResource extends
AbstractDebeziumResource {
}
@Override
- protected String getEndpointUrl(String hostname, String port, String
username, String password, String databaseServerName,
- String offsetStorageFileName) {
+ protected String getKafkaOffsetEndpointUrl() {
+ String kafkaBootstrapServers =
config.getOptionalValue("kafka.bootstrap.servers", String.class).orElse(null);
+ if (kafkaBootstrapServers == null) {
+ return null;
+ }
+ String hostname = config.getValue(Type.postgres.getPropertyHostname(),
String.class);
+ String port = config.getValue(Type.postgres.getPropertyPort(),
String.class);
+ String username = config.getValue(Type.postgres.getPropertyUsername(),
String.class);
+ String password = config.getValue(Type.postgres.getPropertyPassword(),
String.class);
+ return Type.postgres.getComponent() + ":localhost?"
+ + "databaseHostname=" + hostname
+ + "&databasePort=" + port
+ + "&databaseUser=" + username
+ + "&databasePassword=" + password
+ + "&databaseDbname=" + DB_NAME
+ + "&topicPrefix=cq-testing-kafka"
+ + "&slotName=debezium_kafka"
+ +
"&offsetStorage=org.apache.kafka.connect.storage.KafkaOffsetBackingStore"
+ + "&offsetStorageTopic=debezium-offset-storage-postgres"
+ + "&offsetStoragePartitions=1"
+ + "&offsetStorageReplicationFactor=1"
+ + "&offsetFlushIntervalMs=1000"
+ + "&additionalProperties.bootstrap.servers=" +
kafkaBootstrapServers;
+ }
+
+ @Override
+ protected String getEndpointUrl(String hostname, String port, String
username, String password,
+ String databaseServerName, String offsetStorageFileName) {
return super.getEndpointUrl(hostname, port, username, password,
databaseServerName, offsetStorageFileName)
+ "&databaseDbname=" + DB_NAME;
}
diff --git
a/integration-test-groups/debezium/postgresql/src/test/java/org/apache/camel/quarkus/component/debezium/common/it/postgres/DebeziumPostgresTest.java
b/integration-test-groups/debezium/postgresql/src/test/java/org/apache/camel/quarkus/component/debezium/common/it/postgres/DebeziumPostgresTest.java
index 5732de2103..87be976cf3 100644
---
a/integration-test-groups/debezium/postgresql/src/test/java/org/apache/camel/quarkus/component/debezium/common/it/postgres/DebeziumPostgresTest.java
+++
b/integration-test-groups/debezium/postgresql/src/test/java/org/apache/camel/quarkus/component/debezium/common/it/postgres/DebeziumPostgresTest.java
@@ -19,12 +19,16 @@ package
org.apache.camel.quarkus.component.debezium.common.it.postgres;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
+import java.time.Duration;
+import java.util.UUID;
import io.quarkus.test.common.QuarkusTestResource;
import io.quarkus.test.junit.QuarkusTest;
import io.restassured.RestAssured;
import org.apache.camel.quarkus.test.support.debezium.AbstractDebeziumTest;
+import org.apache.camel.quarkus.test.support.debezium.SharedKafkaTestResource;
import org.apache.camel.quarkus.test.support.debezium.Type;
+import org.awaitility.Awaitility;
import org.eclipse.microprofile.config.Config;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
@@ -33,10 +37,12 @@ import org.junit.jupiter.api.Order;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestMethodOrder;
+import static io.restassured.RestAssured.given;
import static org.hamcrest.Matchers.*;
@QuarkusTest
@QuarkusTestResource(value = DebeziumPostgresTestResource.class,
restrictToAnnotatedClass = true)
+@QuarkusTestResource(SharedKafkaTestResource.class)
@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
class DebeziumPostgresTest extends AbstractDebeziumTest {
private static Connection connection;
@@ -62,6 +68,28 @@ class DebeziumPostgresTest extends AbstractDebeziumTest {
.body("'bootstrap.servers'", is(notNullValue()));
}
+ @Test
+ @Order(5)
+ public void testKafkaOffsetBackingStore() throws SQLException {
+ given()
+ .when().get("/debezium-postgres/kafkaBootstrapServers")
+ .then()
+ .statusCode(200)
+ .body(not(equalTo("not-available")))
+ .body(containsString("PLAINTEXT://"));
+
+ String suffix = UUID.randomUUID().toString().substring(0, 8);
+ insertCompany("KafkaOffsetTest_" + suffix, "KafkaCity");
+
+
Awaitility.await().pollDelay(Duration.ofMillis(250)).atMost(Duration.ofMinutes(1)).untilAsserted(()
-> {
+ given()
+ .when().get("/debezium-postgres/receiveViaKafkaOffset")
+ .then()
+ .statusCode(200)
+ .body(is(not(emptyOrNullString())));
+ });
+ }
+
@AfterAll
public static void cleanUp() throws SQLException {
if (connection != null) {
diff --git a/integration-tests-support/debezium/pom.xml
b/integration-tests-support/debezium/pom.xml
index 0b8f30d7a8..5de7cb70dc 100644
--- a/integration-tests-support/debezium/pom.xml
+++ b/integration-tests-support/debezium/pom.xml
@@ -74,5 +74,10 @@
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
</dependency>
+ <dependency>
+ <groupId>io.strimzi</groupId>
+ <artifactId>strimzi-test-container</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
-</project>
\ No newline at end of file
+</project>
diff --git
a/integration-tests-support/debezium/src/main/java/org/apache/camel/quarkus/test/support/debezium/AbstractDebeziumResource.java
b/integration-tests-support/debezium/src/main/java/org/apache/camel/quarkus/test/support/debezium/AbstractDebeziumResource.java
index 8a9c1e6b60..b2726a3083 100644
---
a/integration-tests-support/debezium/src/main/java/org/apache/camel/quarkus/test/support/debezium/AbstractDebeziumResource.java
+++
b/integration-tests-support/debezium/src/main/java/org/apache/camel/quarkus/test/support/debezium/AbstractDebeziumResource.java
@@ -19,6 +19,7 @@ package org.apache.camel.quarkus.test.support.debezium;
import java.util.Map;
import java.util.stream.Collectors;
+import jakarta.annotation.PreDestroy;
import jakarta.inject.Inject;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
@@ -27,6 +28,7 @@ import jakarta.ws.rs.core.MediaType;
import org.apache.camel.CamelContext;
import org.apache.camel.ConsumerTemplate;
import org.apache.camel.Exchange;
+import org.apache.camel.PollingConsumer;
import org.apache.camel.component.debezium.DebeziumConstants;
import org.apache.camel.component.debezium.DebeziumEndpoint;
import org.eclipse.microprofile.config.Config;
@@ -39,6 +41,7 @@ import org.eclipse.microprofile.config.inject.ConfigProperty;
*/
public abstract class AbstractDebeziumResource {
private final Type type;
+ private volatile PollingConsumer kafkaOffsetConsumer;
@ConfigProperty(name = "test.debezium.timeout", defaultValue = "10000")
long TIMEOUT;
@@ -47,7 +50,7 @@ public abstract class AbstractDebeziumResource {
ConsumerTemplate consumerTemplate;
@Inject
- Config config;
+ protected Config config;
@Inject
CamelContext camelContext;
@@ -66,6 +69,55 @@ public abstract class AbstractDebeziumResource {
.collect(Collectors.toMap(Map.Entry::getKey, e -> (String)
e.getValue()));
}
+ @Path("/kafkaBootstrapServers")
+ @GET
+ @Produces(MediaType.TEXT_PLAIN)
+ public String kafkaBootstrapServers() {
+ String servers = config.getOptionalValue("kafka.bootstrap.servers",
String.class).orElse(null);
+ return servers != null ? servers : "not-available";
+ }
+
+ @Path("/receiveViaKafkaOffset")
+ @GET
+ @Produces(MediaType.TEXT_PLAIN)
+ public String receiveViaKafkaOffset() {
+ String kafkaEndpointUrl = getKafkaOffsetEndpointUrl();
+ if (kafkaEndpointUrl == null) {
+ return null;
+ }
+ if (kafkaOffsetConsumer == null) {
+ initKafkaOffsetConsumer(kafkaEndpointUrl);
+ }
+ Exchange exchange = kafkaOffsetConsumer.receive(TIMEOUT);
+ if (exchange == null) {
+ return null;
+ }
+ return exchange.getIn().getBody(String.class);
+ }
+
+ private synchronized void initKafkaOffsetConsumer(String endpointUrl) {
+ if (kafkaOffsetConsumer != null) {
+ return;
+ }
+ try {
+ kafkaOffsetConsumer =
camelContext.getEndpoint(endpointUrl).createPollingConsumer();
+ kafkaOffsetConsumer.start();
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to start Kafka offset consumer for " + type.name(), e);
+ }
+ }
+
+ @PreDestroy
+ void cleanup() {
+ if (kafkaOffsetConsumer != null) {
+ kafkaOffsetConsumer.stop();
+ }
+ }
+
+ protected String getKafkaOffsetEndpointUrl() {
+ return null;
+ }
+
protected String getEndpointUrl(String hostname, String port, String
username, String password, String databaseServerName,
String offsetStorageFileName) {
return type.getComponent() + ":localhost?"
diff --git
a/integration-tests-support/debezium/src/test/java/org/apache/camel/quarkus/test/support/debezium/SharedKafkaTestResource.java
b/integration-tests-support/debezium/src/test/java/org/apache/camel/quarkus/test/support/debezium/SharedKafkaTestResource.java
new file mode 100644
index 0000000000..557b648bce
--- /dev/null
+++
b/integration-tests-support/debezium/src/test/java/org/apache/camel/quarkus/test/support/debezium/SharedKafkaTestResource.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.quarkus.test.support.debezium;
+
+import java.util.Map;
+
+import io.quarkus.test.common.QuarkusTestResourceLifecycleManager;
+import io.strimzi.test.container.StrimziKafkaCluster;
+import io.strimzi.test.container.StrimziKafkaContainer;
+import org.eclipse.microprofile.config.ConfigProvider;
+import org.jboss.logging.Logger;
+
+public class SharedKafkaTestResource implements
QuarkusTestResourceLifecycleManager {
+
+ private static final Logger LOG =
Logger.getLogger(SharedKafkaTestResource.class);
+ private static final String KAFKA_IMAGE_NAME =
ConfigProvider.getConfig().getValue("kafka.container.image",
+ String.class);
+
+ private StrimziKafkaCluster kafkaCluster;
+
+ @Override
+ public Map<String, String> start() {
+ LOG.info("Starting shared Kafka cluster");
+ kafkaCluster = new StrimziKafkaCluster.StrimziKafkaClusterBuilder()
+ .withImage(KAFKA_IMAGE_NAME)
+ .build();
+ kafkaCluster.start();
+ StrimziKafkaContainer kafkaContainer =
kafkaCluster.getBrokers().stream().findFirst()
+ .orElseThrow(() -> new RuntimeException("No Kafka broker available"));
+ LOG.infof("Shared Kafka cluster started with bootstrap servers: %s",
kafkaContainer.getBootstrapServers());
+ return Map.of("kafka.bootstrap.servers",
kafkaContainer.getBootstrapServers());
+ }
+
+ @Override
+ public void stop() {
+ try {
+ if (kafkaCluster != null) {
+ LOG.info("Stopping shared Kafka cluster");
+ kafkaCluster.stop();
+ }
+ } catch (Exception e) {
+ LOG.warn("Error stopping shared Kafka cluster", e);
+ }
+ }
+}
diff --git a/integration-tests/debezium-grouped/pom.xml
b/integration-tests/debezium-grouped/pom.xml
index dca140a555..9c6f36cd9b 100644
--- a/integration-tests/debezium-grouped/pom.xml
+++ b/integration-tests/debezium-grouped/pom.xml
@@ -131,6 +131,11 @@
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>io.strimzi</groupId>
+ <artifactId>strimzi-test-container</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>