This is an automated email from the ASF dual-hosted git repository.

jamesnetherton pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel-quarkus.git


The following commit(s) were added to refs/heads/main by this push:
     new c161ffcb86 Fixes #8621 Add KafkaOffsetBackingStore native support and integration tests
c161ffcb86 is described below

commit c161ffcb86563ed3995d73995d5caf45a8f168ae
Author: Jomin Mathew <[email protected]>
AuthorDate: Mon Jun 8 11:55:32 2026 +0100

    Fixes #8621 Add KafkaOffsetBackingStore native support and integration tests
    
    * Add KafkaOffsetBackingStore native support and integration tests for Debezium
    
    * Use ServiceProviderBuildItem for Kafka Connect Converter registration
    
    Replace the hardcoded list of Converter implementations with
    ServiceProviderBuildItem.allProvidersFromClassPath which auto-discovers
    providers from META-INF/services on the classpath, ensuring forward
    compatibility with future Kafka versions.
    
    ---------
    
    Co-authored-by: jomin mathew <>
---
 extensions-support/debezium/deployment/pom.xml     |  4 ++
 .../deployment/DebeziumSupportProcessor.java       | 41 +++++++--------
 extensions-support/debezium/runtime/pom.xml        |  4 ++
 integration-test-groups/debezium/mongodb/pom.xml   |  5 ++
 .../common/it/mongod/DebeziumMongodbResource.java  | 26 ++++++++--
 .../common/it/mongodb/DebeziumMongodbTest.java     | 31 ++++++++++++
 .../debezium/postgresql/pom.xml                    |  6 +++
 .../it/postgres/DebeziumPostgresResource.java      | 30 ++++++++++-
 .../common/it/postgres/DebeziumPostgresTest.java   | 28 ++++++++++
 integration-tests-support/debezium/pom.xml         |  7 ++-
 .../support/debezium/AbstractDebeziumResource.java | 54 +++++++++++++++++++-
 .../support/debezium/SharedKafkaTestResource.java  | 59 ++++++++++++++++++++++
 integration-tests/debezium-grouped/pom.xml         |  5 ++
 13 files changed, 269 insertions(+), 31 deletions(-)

diff --git a/extensions-support/debezium/deployment/pom.xml 
b/extensions-support/debezium/deployment/pom.xml
index 235613a760..8d1c5e461b 100644
--- a/extensions-support/debezium/deployment/pom.xml
+++ b/extensions-support/debezium/deployment/pom.xml
@@ -38,6 +38,10 @@
             <groupId>io.quarkus</groupId>
             <artifactId>quarkus-datasource-deployment</artifactId>
         </dependency>
+        <dependency>
+            <groupId>io.quarkus</groupId>
+            <artifactId>quarkus-kafka-client-deployment</artifactId>
+        </dependency>
         <dependency>
             <groupId>org.apache.camel.quarkus</groupId>
             <artifactId>camel-quarkus-core-deployment</artifactId>
diff --git 
a/extensions-support/debezium/deployment/src/main/java/org/apache/camel/quarkus/support/debezium/deployment/DebeziumSupportProcessor.java
 
b/extensions-support/debezium/deployment/src/main/java/org/apache/camel/quarkus/support/debezium/deployment/DebeziumSupportProcessor.java
index 793ed0ff7c..30facc054c 100644
--- 
a/extensions-support/debezium/deployment/src/main/java/org/apache/camel/quarkus/support/debezium/deployment/DebeziumSupportProcessor.java
+++ 
b/extensions-support/debezium/deployment/src/main/java/org/apache/camel/quarkus/support/debezium/deployment/DebeziumSupportProcessor.java
@@ -51,11 +51,9 @@ import 
io.quarkus.deployment.builditem.CombinedIndexBuildItem;
 import io.quarkus.deployment.builditem.IndexDependencyBuildItem;
 import 
io.quarkus.deployment.builditem.nativeimage.NativeImageResourceBuildItem;
 import io.quarkus.deployment.builditem.nativeimage.ReflectiveClassBuildItem;
-import 
io.quarkus.deployment.builditem.nativeimage.RuntimeInitializedClassBuildItem;
+import io.quarkus.deployment.builditem.nativeimage.ServiceProviderBuildItem;
 import io.quarkus.gizmo.Gizmo;
 import org.apache.camel.quarkus.support.debezium.DebeziumComponentObserver;
-import org.apache.kafka.common.security.authenticator.SaslClientAuthenticator;
-import org.apache.kafka.connect.json.JsonConverter;
 import org.apache.kafka.connect.source.SourceTask;
 import org.jboss.jandex.DotName;
 import org.jboss.jandex.IndexView;
@@ -74,11 +72,6 @@ public class DebeziumSupportProcessor {
         indexDependency.produce(new IndexDependencyBuildItem("io.debezium", 
"debezium-api"));
     }
 
-    @BuildStep
-    RuntimeInitializedClassBuildItem runtimeInitializedClasses() {
-        return new 
RuntimeInitializedClassBuildItem(SaslClientAuthenticator.class.getName());
-    }
-
     @BuildStep
     public void 
configureKafkaComponentForDevServices(BuildProducer<AdditionalBeanBuildItem> 
additionalBean) {
         
additionalBean.produce(AdditionalBeanBuildItem.unremovableOf(DebeziumComponentObserver.class));
@@ -88,13 +81,18 @@ public class DebeziumSupportProcessor {
     void reflectiveClasses(CombinedIndexBuildItem combinedIndex, 
BuildProducer<ReflectiveClassBuildItem> reflectiveClasses) {
         IndexView index = combinedIndex.getIndex();
 
-        String[] dtos = index.getKnownClasses().stream().map(ci -> 
ci.name().toString())
-                .filter(n -> n.startsWith("org.apache.kafka.connect.json")
-                        || n.startsWith("io.debezium.engine.spi"))
+        String[] debeziumEngineSpiClasses = 
index.getKnownClasses().stream().map(ci -> ci.name().toString())
+                .filter(n -> n.startsWith("io.debezium.engine.spi"))
                 .toArray(String[]::new);
-        
reflectiveClasses.produce(ReflectiveClassBuildItem.builder(dtos).fields().build());
+        
reflectiveClasses.produce(ReflectiveClassBuildItem.builder(debeziumEngineSpiClasses).fields().build());
 
-        dtos = 
index.getAllKnownImplementations(DotName.createSimple(SnapshotLock.class.getName())).stream()
+        String[] jsonConverters = index.getKnownClasses().stream().map(ci -> 
ci.name().toString())
+                .filter(n -> n.startsWith("org.apache.kafka.connect.json"))
+                .toArray(String[]::new);
+        reflectiveClasses.produce(
+                
ReflectiveClassBuildItem.builder(jsonConverters).fields().constructors().methods().build());
+
+        String[] dtos = 
index.getAllKnownImplementations(DotName.createSimple(SnapshotLock.class.getName())).stream()
                 .map(ci -> ci.name().toString())
                 .toArray(String[]::new);
         
reflectiveClasses.produce(ReflectiveClassBuildItem.builder(dtos).fields().build());
@@ -102,20 +100,16 @@ public class DebeziumSupportProcessor {
         reflectiveClasses.produce(ReflectiveClassBuildItem.builder(
                 "org.apache.kafka.connect.storage.FileOffsetBackingStore",
                 "org.apache.kafka.connect.storage.MemoryOffsetBackingStore",
+                "org.apache.kafka.connect.storage.KafkaOffsetBackingStore",
                 "io.debezium.storage.kafka.history.KafkaSchemaHistory",
                 "io.debezium.relational.history.FileDatabaseHistory",
                 "io.debezium.embedded.ConvertingEngineBuilderFactory",
-                "io.debezium.processors.PostProcessorRegistry",
-                
"io.debezium.pipeline.txmetadata.DefaultTransactionMetadataFactory",
-                "io.debezium.schema.SchemaTopicNamingStrategy",
-                "io.debezium.storage.file.history.FileSchemaHistory")
+                "io.debezium.processors.PostProcessorRegistry")
                 .build());
 
         reflectiveClasses.produce(ReflectiveClassBuildItem.builder(
                 DebeziumEngine.BuilderFactory.class,
                 ConvertingAsyncEngineBuilderFactory.class,
-                SaslClientAuthenticator.class,
-                JsonConverter.class,
                 DefaultTransactionMetadataFactory.class,
                 SchemaTopicNamingStrategy.class,
                 BaseSourceTask.class,
@@ -139,11 +133,14 @@ public class DebeziumSupportProcessor {
                 InProcessSignalChannel.class,
                 StandardActionProvider.class,
                 SourceTask.class,
-                ConvertingAsyncEngineBuilderFactory.class,
-                DefaultTransactionMetadataFactory.class,
-                SchemaTopicNamingStrategy.class,
                 FileSchemaHistory.class)
                 .build());
+
+    }
+
+    @BuildStep
+    ServiceProviderBuildItem registerConverterServiceProviders() {
+        return 
ServiceProviderBuildItem.allProvidersFromClassPath("org.apache.kafka.connect.storage.Converter");
     }
 
     @BuildStep
diff --git a/extensions-support/debezium/runtime/pom.xml 
b/extensions-support/debezium/runtime/pom.xml
index 502a6e241f..130fdfbe17 100644
--- a/extensions-support/debezium/runtime/pom.xml
+++ b/extensions-support/debezium/runtime/pom.xml
@@ -43,6 +43,10 @@
             <groupId>io.quarkus</groupId>
             <artifactId>quarkus-datasource</artifactId>
         </dependency>
+        <dependency>
+            <groupId>io.quarkus</groupId>
+            <artifactId>quarkus-kafka-client</artifactId>
+        </dependency>
         <dependency>
             <groupId>org.javassist</groupId>
             <artifactId>javassist</artifactId>
diff --git a/integration-test-groups/debezium/mongodb/pom.xml 
b/integration-test-groups/debezium/mongodb/pom.xml
index e7aa9c5c5a..ff2d2e786e 100644
--- a/integration-test-groups/debezium/mongodb/pom.xml
+++ b/integration-test-groups/debezium/mongodb/pom.xml
@@ -82,6 +82,11 @@
             <artifactId>awaitility</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>io.strimzi</groupId>
+            <artifactId>strimzi-test-container</artifactId>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <profiles>
diff --git 
a/integration-test-groups/debezium/mongodb/src/main/java/org/apache/camel/quarkus/component/debezium/common/it/mongod/DebeziumMongodbResource.java
 
b/integration-test-groups/debezium/mongodb/src/main/java/org/apache/camel/quarkus/component/debezium/common/it/mongod/DebeziumMongodbResource.java
index e8fb4ffef8..f8105f2655 100644
--- 
a/integration-test-groups/debezium/mongodb/src/main/java/org/apache/camel/quarkus/component/debezium/common/it/mongod/DebeziumMongodbResource.java
+++ 
b/integration-test-groups/debezium/mongodb/src/main/java/org/apache/camel/quarkus/component/debezium/common/it/mongod/DebeziumMongodbResource.java
@@ -17,7 +17,6 @@
 package org.apache.camel.quarkus.component.debezium.common.it.mongod;
 
 import jakarta.enterprise.context.ApplicationScoped;
-import jakarta.inject.Inject;
 import jakarta.ws.rs.GET;
 import jakarta.ws.rs.Path;
 import jakarta.ws.rs.Produces;
@@ -25,15 +24,11 @@ import jakarta.ws.rs.core.MediaType;
 import org.apache.camel.quarkus.test.support.debezium.AbstractDebeziumResource;
 import org.apache.camel.quarkus.test.support.debezium.Record;
 import org.apache.camel.quarkus.test.support.debezium.Type;
-import org.eclipse.microprofile.config.Config;
 
 @Path("/debezium-mongodb")
 @ApplicationScoped
 public class DebeziumMongodbResource extends AbstractDebeziumResource {
 
-    @Inject
-    Config config;
-
     public DebeziumMongodbResource() {
         super(Type.mongodb);
     }
@@ -64,6 +59,27 @@ public class DebeziumMongodbResource extends 
AbstractDebeziumResource {
         return record.getOperation();
     }
 
+    @Override
+    protected String getKafkaOffsetEndpointUrl() {
+        String kafkaBootstrapServers = 
config.getOptionalValue("kafka.bootstrap.servers", String.class).orElse(null);
+        if (kafkaBootstrapServers == null) {
+            return null;
+        }
+        String hostname = config.getValue(Type.mongodb.getPropertyHostname(), 
String.class);
+        String port = config.getValue(Type.mongodb.getPropertyPort(), 
String.class);
+        return Type.mongodb.getComponent() + ":localhost?"
+                + "mongodbUser=" + 
config.getValue(Type.mongodb.getPropertyUsername(), String.class)
+                + "&mongodbPassword=" + 
config.getValue(Type.mongodb.getPropertyPassword(), String.class)
+                + "&mongodbConnectionString=mongodb://" + hostname + ":" + 
port + "/?replicaSet=my-mongo-set"
+                + "&topicPrefix=cq-testing-kafka"
+                + 
"&offsetStorage=org.apache.kafka.connect.storage.KafkaOffsetBackingStore"
+                + "&offsetStorageTopic=debezium-offset-storage-mongodb"
+                + "&offsetStoragePartitions=1"
+                + "&offsetStorageReplicationFactor=1"
+                + "&offsetFlushIntervalMs=1000"
+                + "&additionalProperties.bootstrap.servers=" + 
kafkaBootstrapServers;
+    }
+
     @Override
     protected String getEndpointUrl(String hostname, String port, String 
username, String password, String databaseServerName,
             String offsetStorageFileName) {
diff --git 
a/integration-test-groups/debezium/mongodb/src/test/java/org/apache/camel/quarkus/component/debezium/common/it/mongodb/DebeziumMongodbTest.java
 
b/integration-test-groups/debezium/mongodb/src/test/java/org/apache/camel/quarkus/component/debezium/common/it/mongodb/DebeziumMongodbTest.java
index 3c12c42681..d002202626 100644
--- 
a/integration-test-groups/debezium/mongodb/src/test/java/org/apache/camel/quarkus/component/debezium/common/it/mongodb/DebeziumMongodbTest.java
+++ 
b/integration-test-groups/debezium/mongodb/src/test/java/org/apache/camel/quarkus/component/debezium/common/it/mongodb/DebeziumMongodbTest.java
@@ -18,7 +18,9 @@ package 
org.apache.camel.quarkus.component.debezium.common.it.mongodb;
 
 import java.sql.Connection;
 import java.sql.SQLException;
+import java.time.Duration;
 import java.util.Optional;
+import java.util.UUID;
 
 import com.mongodb.client.MongoClient;
 import com.mongodb.client.MongoClients;
@@ -28,7 +30,9 @@ import com.mongodb.client.result.DeleteResult;
 import io.quarkus.test.common.QuarkusTestResource;
 import io.quarkus.test.junit.QuarkusTest;
 import org.apache.camel.quarkus.test.support.debezium.AbstractDebeziumTest;
+import org.apache.camel.quarkus.test.support.debezium.SharedKafkaTestResource;
 import org.apache.camel.quarkus.test.support.debezium.Type;
+import org.awaitility.Awaitility;
 import org.bson.Document;
 import org.eclipse.microprofile.config.Config;
 import org.junit.jupiter.api.AfterAll;
@@ -39,8 +43,12 @@ import org.junit.jupiter.api.Order;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestMethodOrder;
 
+import static io.restassured.RestAssured.given;
 import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.emptyOrNullString;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.not;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.fail;
@@ -48,6 +56,7 @@ import static org.junit.jupiter.api.Assumptions.assumeTrue;
 
 @QuarkusTest
 @QuarkusTestResource(value = DebeziumMongodbTestResource.class, 
restrictToAnnotatedClass = true)
+@QuarkusTestResource(SharedKafkaTestResource.class)
 @TestMethodOrder(MethodOrderer.OrderAnnotation.class)
 class DebeziumMongodbTest extends AbstractDebeziumTest {
     private static MongoClient mongoClient;
@@ -151,4 +160,26 @@ class DebeziumMongodbTest extends AbstractDebeziumTest {
         //validate that event for delete is in queue
         receiveResponse(200, equalTo("d"), "/receiveOperation");
     }
+
+    @Test
+    @Order(4)
+    public void testKafkaOffsetBackingStore() {
+        given()
+                .when().get("/debezium-mongodb/kafkaBootstrapServers")
+                .then()
+                .statusCode(200)
+                .body(not(equalTo("not-available")))
+                .body(containsString("PLAINTEXT://"));
+
+        String suffix = UUID.randomUUID().toString().substring(0, 8);
+        insertCompany("KafkaOffsetTest_" + suffix, "KafkaCity");
+
+        
Awaitility.await().pollDelay(Duration.ofMillis(250)).atMost(Duration.ofMinutes(1)).untilAsserted(()
 -> {
+            given()
+                    .when().get("/debezium-mongodb/receiveViaKafkaOffset")
+                    .then()
+                    .statusCode(200)
+                    .body(is(not(emptyOrNullString())));
+        });
+    }
 }
diff --git a/integration-test-groups/debezium/postgresql/pom.xml 
b/integration-test-groups/debezium/postgresql/pom.xml
index 98abeec52c..95d5cf2279 100644
--- a/integration-test-groups/debezium/postgresql/pom.xml
+++ b/integration-test-groups/debezium/postgresql/pom.xml
@@ -79,6 +79,12 @@
             <scope>test</scope>
         </dependency>
 
+        <dependency>
+            <groupId>io.strimzi</groupId>
+            <artifactId>strimzi-test-container</artifactId>
+            <scope>test</scope>
+        </dependency>
+
         <dependency>
             <groupId>org.awaitility</groupId>
             <artifactId>awaitility</artifactId>
diff --git 
a/integration-test-groups/debezium/postgresql/src/main/java/org/apache/camel/quarkus/component/debezium/common/it/postgres/DebeziumPostgresResource.java
 
b/integration-test-groups/debezium/postgresql/src/main/java/org/apache/camel/quarkus/component/debezium/common/it/postgres/DebeziumPostgresResource.java
index 1ba47cb26e..f866098d36 100644
--- 
a/integration-test-groups/debezium/postgresql/src/main/java/org/apache/camel/quarkus/component/debezium/common/it/postgres/DebeziumPostgresResource.java
+++ 
b/integration-test-groups/debezium/postgresql/src/main/java/org/apache/camel/quarkus/component/debezium/common/it/postgres/DebeziumPostgresResource.java
@@ -49,8 +49,34 @@ public class DebeziumPostgresResource extends 
AbstractDebeziumResource {
     }
 
     @Override
-    protected String getEndpointUrl(String hostname, String port, String 
username, String password, String databaseServerName,
-            String offsetStorageFileName) {
+    protected String getKafkaOffsetEndpointUrl() {
+        String kafkaBootstrapServers = 
config.getOptionalValue("kafka.bootstrap.servers", String.class).orElse(null);
+        if (kafkaBootstrapServers == null) {
+            return null;
+        }
+        String hostname = config.getValue(Type.postgres.getPropertyHostname(), 
String.class);
+        String port = config.getValue(Type.postgres.getPropertyPort(), 
String.class);
+        String username = config.getValue(Type.postgres.getPropertyUsername(), 
String.class);
+        String password = config.getValue(Type.postgres.getPropertyPassword(), 
String.class);
+        return Type.postgres.getComponent() + ":localhost?"
+                + "databaseHostname=" + hostname
+                + "&databasePort=" + port
+                + "&databaseUser=" + username
+                + "&databasePassword=" + password
+                + "&databaseDbname=" + DB_NAME
+                + "&topicPrefix=cq-testing-kafka"
+                + "&slotName=debezium_kafka"
+                + 
"&offsetStorage=org.apache.kafka.connect.storage.KafkaOffsetBackingStore"
+                + "&offsetStorageTopic=debezium-offset-storage-postgres"
+                + "&offsetStoragePartitions=1"
+                + "&offsetStorageReplicationFactor=1"
+                + "&offsetFlushIntervalMs=1000"
+                + "&additionalProperties.bootstrap.servers=" + 
kafkaBootstrapServers;
+    }
+
+    @Override
+    protected String getEndpointUrl(String hostname, String port, String 
username, String password,
+            String databaseServerName, String offsetStorageFileName) {
         return super.getEndpointUrl(hostname, port, username, password, 
databaseServerName, offsetStorageFileName)
                 + "&databaseDbname=" + DB_NAME;
     }
diff --git 
a/integration-test-groups/debezium/postgresql/src/test/java/org/apache/camel/quarkus/component/debezium/common/it/postgres/DebeziumPostgresTest.java
 
b/integration-test-groups/debezium/postgresql/src/test/java/org/apache/camel/quarkus/component/debezium/common/it/postgres/DebeziumPostgresTest.java
index 5732de2103..87be976cf3 100644
--- 
a/integration-test-groups/debezium/postgresql/src/test/java/org/apache/camel/quarkus/component/debezium/common/it/postgres/DebeziumPostgresTest.java
+++ 
b/integration-test-groups/debezium/postgresql/src/test/java/org/apache/camel/quarkus/component/debezium/common/it/postgres/DebeziumPostgresTest.java
@@ -19,12 +19,16 @@ package 
org.apache.camel.quarkus.component.debezium.common.it.postgres;
 import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.SQLException;
+import java.time.Duration;
+import java.util.UUID;
 
 import io.quarkus.test.common.QuarkusTestResource;
 import io.quarkus.test.junit.QuarkusTest;
 import io.restassured.RestAssured;
 import org.apache.camel.quarkus.test.support.debezium.AbstractDebeziumTest;
+import org.apache.camel.quarkus.test.support.debezium.SharedKafkaTestResource;
 import org.apache.camel.quarkus.test.support.debezium.Type;
+import org.awaitility.Awaitility;
 import org.eclipse.microprofile.config.Config;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
@@ -33,10 +37,12 @@ import org.junit.jupiter.api.Order;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestMethodOrder;
 
+import static io.restassured.RestAssured.given;
 import static org.hamcrest.Matchers.*;
 
 @QuarkusTest
 @QuarkusTestResource(value = DebeziumPostgresTestResource.class, 
restrictToAnnotatedClass = true)
+@QuarkusTestResource(SharedKafkaTestResource.class)
 @TestMethodOrder(MethodOrderer.OrderAnnotation.class)
 class DebeziumPostgresTest extends AbstractDebeziumTest {
     private static Connection connection;
@@ -62,6 +68,28 @@ class DebeziumPostgresTest extends AbstractDebeziumTest {
                 .body("'bootstrap.servers'", is(notNullValue()));
     }
 
+    @Test
+    @Order(5)
+    public void testKafkaOffsetBackingStore() throws SQLException {
+        given()
+                .when().get("/debezium-postgres/kafkaBootstrapServers")
+                .then()
+                .statusCode(200)
+                .body(not(equalTo("not-available")))
+                .body(containsString("PLAINTEXT://"));
+
+        String suffix = UUID.randomUUID().toString().substring(0, 8);
+        insertCompany("KafkaOffsetTest_" + suffix, "KafkaCity");
+
+        
Awaitility.await().pollDelay(Duration.ofMillis(250)).atMost(Duration.ofMinutes(1)).untilAsserted(()
 -> {
+            given()
+                    .when().get("/debezium-postgres/receiveViaKafkaOffset")
+                    .then()
+                    .statusCode(200)
+                    .body(is(not(emptyOrNullString())));
+        });
+    }
+
     @AfterAll
     public static void cleanUp() throws SQLException {
         if (connection != null) {
diff --git a/integration-tests-support/debezium/pom.xml 
b/integration-tests-support/debezium/pom.xml
index 0b8f30d7a8..5de7cb70dc 100644
--- a/integration-tests-support/debezium/pom.xml
+++ b/integration-tests-support/debezium/pom.xml
@@ -74,5 +74,10 @@
             <groupId>org.awaitility</groupId>
             <artifactId>awaitility</artifactId>
         </dependency>
+        <dependency>
+            <groupId>io.strimzi</groupId>
+            <artifactId>strimzi-test-container</artifactId>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
-</project>
\ No newline at end of file
+</project>
diff --git 
a/integration-tests-support/debezium/src/main/java/org/apache/camel/quarkus/test/support/debezium/AbstractDebeziumResource.java
 
b/integration-tests-support/debezium/src/main/java/org/apache/camel/quarkus/test/support/debezium/AbstractDebeziumResource.java
index 8a9c1e6b60..b2726a3083 100644
--- 
a/integration-tests-support/debezium/src/main/java/org/apache/camel/quarkus/test/support/debezium/AbstractDebeziumResource.java
+++ 
b/integration-tests-support/debezium/src/main/java/org/apache/camel/quarkus/test/support/debezium/AbstractDebeziumResource.java
@@ -19,6 +19,7 @@ package org.apache.camel.quarkus.test.support.debezium;
 import java.util.Map;
 import java.util.stream.Collectors;
 
+import jakarta.annotation.PreDestroy;
 import jakarta.inject.Inject;
 import jakarta.ws.rs.GET;
 import jakarta.ws.rs.Path;
@@ -27,6 +28,7 @@ import jakarta.ws.rs.core.MediaType;
 import org.apache.camel.CamelContext;
 import org.apache.camel.ConsumerTemplate;
 import org.apache.camel.Exchange;
+import org.apache.camel.PollingConsumer;
 import org.apache.camel.component.debezium.DebeziumConstants;
 import org.apache.camel.component.debezium.DebeziumEndpoint;
 import org.eclipse.microprofile.config.Config;
@@ -39,6 +41,7 @@ import org.eclipse.microprofile.config.inject.ConfigProperty;
  */
 public abstract class AbstractDebeziumResource {
     private final Type type;
+    private volatile PollingConsumer kafkaOffsetConsumer;
 
     @ConfigProperty(name = "test.debezium.timeout", defaultValue = "10000")
     long TIMEOUT;
@@ -47,7 +50,7 @@ public abstract class AbstractDebeziumResource {
     ConsumerTemplate consumerTemplate;
 
     @Inject
-    Config config;
+    protected Config config;
 
     @Inject
     CamelContext camelContext;
@@ -66,6 +69,55 @@ public abstract class AbstractDebeziumResource {
                 .collect(Collectors.toMap(Map.Entry::getKey, e -> (String) 
e.getValue()));
     }
 
+    @Path("/kafkaBootstrapServers")
+    @GET
+    @Produces(MediaType.TEXT_PLAIN)
+    public String kafkaBootstrapServers() {
+        String servers = config.getOptionalValue("kafka.bootstrap.servers", 
String.class).orElse(null);
+        return servers != null ? servers : "not-available";
+    }
+
+    @Path("/receiveViaKafkaOffset")
+    @GET
+    @Produces(MediaType.TEXT_PLAIN)
+    public String receiveViaKafkaOffset() {
+        String kafkaEndpointUrl = getKafkaOffsetEndpointUrl();
+        if (kafkaEndpointUrl == null) {
+            return null;
+        }
+        if (kafkaOffsetConsumer == null) {
+            initKafkaOffsetConsumer(kafkaEndpointUrl);
+        }
+        Exchange exchange = kafkaOffsetConsumer.receive(TIMEOUT);
+        if (exchange == null) {
+            return null;
+        }
+        return exchange.getIn().getBody(String.class);
+    }
+
+    private synchronized void initKafkaOffsetConsumer(String endpointUrl) {
+        if (kafkaOffsetConsumer != null) {
+            return;
+        }
+        try {
+            kafkaOffsetConsumer = 
camelContext.getEndpoint(endpointUrl).createPollingConsumer();
+            kafkaOffsetConsumer.start();
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to start Kafka offset consumer 
for " + type.name(), e);
+        }
+    }
+
+    @PreDestroy
+    void cleanup() {
+        if (kafkaOffsetConsumer != null) {
+            kafkaOffsetConsumer.stop();
+        }
+    }
+
+    protected String getKafkaOffsetEndpointUrl() {
+        return null;
+    }
+
     protected String getEndpointUrl(String hostname, String port, String 
username, String password, String databaseServerName,
             String offsetStorageFileName) {
         return type.getComponent() + ":localhost?"
diff --git 
a/integration-tests-support/debezium/src/test/java/org/apache/camel/quarkus/test/support/debezium/SharedKafkaTestResource.java
 
b/integration-tests-support/debezium/src/test/java/org/apache/camel/quarkus/test/support/debezium/SharedKafkaTestResource.java
new file mode 100644
index 0000000000..557b648bce
--- /dev/null
+++ 
b/integration-tests-support/debezium/src/test/java/org/apache/camel/quarkus/test/support/debezium/SharedKafkaTestResource.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.quarkus.test.support.debezium;
+
+import java.util.Map;
+
+import io.quarkus.test.common.QuarkusTestResourceLifecycleManager;
+import io.strimzi.test.container.StrimziKafkaCluster;
+import io.strimzi.test.container.StrimziKafkaContainer;
+import org.eclipse.microprofile.config.ConfigProvider;
+import org.jboss.logging.Logger;
+
+public class SharedKafkaTestResource implements 
QuarkusTestResourceLifecycleManager {
+
+    private static final Logger LOG = 
Logger.getLogger(SharedKafkaTestResource.class);
+    private static final String KAFKA_IMAGE_NAME = 
ConfigProvider.getConfig().getValue("kafka.container.image",
+            String.class);
+
+    private StrimziKafkaCluster kafkaCluster;
+
+    @Override
+    public Map<String, String> start() {
+        LOG.info("Starting shared Kafka cluster");
+        kafkaCluster = new StrimziKafkaCluster.StrimziKafkaClusterBuilder()
+                .withImage(KAFKA_IMAGE_NAME)
+                .build();
+        kafkaCluster.start();
+        StrimziKafkaContainer kafkaContainer = 
kafkaCluster.getBrokers().stream().findFirst()
+                .orElseThrow(() -> new RuntimeException("No Kafka broker 
available"));
+        LOG.infof("Shared Kafka cluster started with bootstrap servers: %s", 
kafkaContainer.getBootstrapServers());
+        return Map.of("kafka.bootstrap.servers", 
kafkaContainer.getBootstrapServers());
+    }
+
+    @Override
+    public void stop() {
+        try {
+            if (kafkaCluster != null) {
+                LOG.info("Stopping shared Kafka cluster");
+                kafkaCluster.stop();
+            }
+        } catch (Exception e) {
+            LOG.warn("Error stopping shared Kafka cluster", e);
+        }
+    }
+}
diff --git a/integration-tests/debezium-grouped/pom.xml 
b/integration-tests/debezium-grouped/pom.xml
index dca140a555..9c6f36cd9b 100644
--- a/integration-tests/debezium-grouped/pom.xml
+++ b/integration-tests/debezium-grouped/pom.xml
@@ -131,6 +131,11 @@
             <artifactId>awaitility</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>io.strimzi</groupId>
+            <artifactId>strimzi-test-container</artifactId>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>

Reply via email to