This is an automated email from the ASF dual-hosted git repository.

ppalaga pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel-quarkus.git


The following commit(s) were added to refs/heads/main by this push:
     new e780587  Expand Sql test coverage #2623
e780587 is described below

commit e780587f835fc0a2af7fe71c73625f6208f84007
Author: JiriOndrusek <[email protected]>
AuthorDate: Thu May 27 16:22:48 2021 +0200

    Expand Sql test coverage #2623
---
 .../component/sql/deployment/SqlProcessor.java     |   3 +-
 integration-tests/sql/pom.xml                      |  17 ++
 .../quarkus/component/sql/it/SqlDbInitializer.java |  54 +++++++
 .../quarkus/component/sql/it/SqlResource.java      | 111 +++++++++----
 .../camel/quarkus/component/sql/it/SqlRoutes.java  | 149 +++++++++++++++++
 .../sql/src/main/resources/application.properties  |   3 +-
 .../sql/src/main/resources/sql/initDb.sql          |  33 ++++
 .../sql/src/main/resources/sql/selectProjects.sql  |  21 +++
 .../camel/quarkus/component/sql/it/SqlTest.java    | 180 ++++++++++++++++++++-
 9 files changed, 537 insertions(+), 34 deletions(-)

diff --git 
a/extensions/sql/deployment/src/main/java/org/apache/camel/quarkus/component/sql/deployment/SqlProcessor.java
 
b/extensions/sql/deployment/src/main/java/org/apache/camel/quarkus/component/sql/deployment/SqlProcessor.java
index d9d63f1..fbdcec7 100644
--- 
a/extensions/sql/deployment/src/main/java/org/apache/camel/quarkus/component/sql/deployment/SqlProcessor.java
+++ 
b/extensions/sql/deployment/src/main/java/org/apache/camel/quarkus/component/sql/deployment/SqlProcessor.java
@@ -24,6 +24,7 @@ import io.quarkus.deployment.builditem.FeatureBuildItem;
 import 
io.quarkus.deployment.builditem.nativeimage.NativeImageResourceBuildItem;
 import io.quarkus.deployment.builditem.nativeimage.ReflectiveClassBuildItem;
 import org.apache.camel.quarkus.component.sql.CamelSqlConfig;
+import org.apache.camel.support.DefaultExchangeHolder;
 
 class SqlProcessor {
 
@@ -36,7 +37,7 @@ class SqlProcessor {
 
     @BuildStep
     void registerForReflection(BuildProducer<ReflectiveClassBuildItem> 
reflectiveClass) {
-        reflectiveClass.produce(new ReflectiveClassBuildItem(false, true, 
Types.class));
+        reflectiveClass.produce(new ReflectiveClassBuildItem(false, true, 
Types.class, DefaultExchangeHolder.class));
     }
 
     @BuildStep
diff --git a/integration-tests/sql/pom.xml b/integration-tests/sql/pom.xml
index 4ef3ba9..0cc9f08 100644
--- a/integration-tests/sql/pom.xml
+++ b/integration-tests/sql/pom.xml
@@ -40,8 +40,20 @@
         </dependency>
         <dependency>
             <groupId>io.quarkus</groupId>
+            <artifactId>quarkus-resteasy-jackson</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>io.quarkus</groupId>
             <artifactId>quarkus-jdbc-h2</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.apache.camel.quarkus</groupId>
+            <artifactId>camel-quarkus-direct</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.camel.quarkus</groupId>
+            <artifactId>camel-quarkus-jta</artifactId>
+        </dependency>
 
         <!-- test dependencies -->
         <dependency>
@@ -54,6 +66,11 @@
             <artifactId>rest-assured</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.awaitility</groupId>
+            <artifactId>awaitility</artifactId>
+            <scope>test</scope>
+        </dependency>
 
         <!-- The following dependencies guarantee that this module is built 
after them. You can update them by running `mvn process-resources -Pformat -N` 
from the source tree root directory -->
         <dependency>
diff --git 
a/integration-tests/sql/src/main/java/org/apache/camel/quarkus/component/sql/it/SqlDbInitializer.java
 
b/integration-tests/sql/src/main/java/org/apache/camel/quarkus/component/sql/it/SqlDbInitializer.java
new file mode 100644
index 0000000..f368c51
--- /dev/null
+++ 
b/integration-tests/sql/src/main/java/org/apache/camel/quarkus/component/sql/it/SqlDbInitializer.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.quarkus.component.sql.it;
+
+import java.io.*;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
+
+import javax.enterprise.context.ApplicationScoped;
+import javax.inject.Inject;
+
+import io.agroal.api.AgroalDataSource;
+
+@ApplicationScoped
+public class SqlDbInitializer {
+
+    @Inject
+    AgroalDataSource dataSource;
+
+    public void initDb() throws SQLException, IOException {
+        try (Connection conn = dataSource.getConnection()) {
+            try (Statement statement = conn.createStatement()) {
+                try (InputStream is = 
Thread.currentThread().getContextClassLoader().getResourceAsStream("sql/initDb.sql");
+                        InputStreamReader isr = new InputStreamReader(is);
+                        BufferedReader reader = new BufferedReader(isr)) {
+
+                    reader.lines().filter(s -> s != null && !"".equals(s) && 
!s.startsWith("--")).forEach(s -> {
+                        try {
+                            statement.execute(s);
+                        } catch (SQLException e) {
+                            throw new RuntimeException(e);
+                        }
+                    });
+                }
+            }
+        }
+    }
+
+}
diff --git 
a/integration-tests/sql/src/main/java/org/apache/camel/quarkus/component/sql/it/SqlResource.java
 
b/integration-tests/sql/src/main/java/org/apache/camel/quarkus/component/sql/it/SqlResource.java
index a9c3398..89c7ff9 100644
--- 
a/integration-tests/sql/src/main/java/org/apache/camel/quarkus/component/sql/it/SqlResource.java
+++ 
b/integration-tests/sql/src/main/java/org/apache/camel/quarkus/component/sql/it/SqlResource.java
@@ -17,16 +17,12 @@
 package org.apache.camel.quarkus.component.sql.it;
 
 import java.net.URI;
-import java.sql.Connection;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import javax.annotation.PostConstruct;
+import java.util.*;
+import java.util.stream.Collectors;
+
 import javax.enterprise.context.ApplicationScoped;
 import javax.inject.Inject;
+import javax.inject.Named;
 import javax.ws.rs.Consumes;
 import javax.ws.rs.GET;
 import javax.ws.rs.POST;
@@ -38,6 +34,8 @@ import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
 
 import io.agroal.api.AgroalDataSource;
+import org.apache.camel.CamelContext;
+import org.apache.camel.CamelExecutionException;
 import org.apache.camel.ProducerTemplate;
 import org.apache.camel.quarkus.component.sql.it.model.Camel;
 import org.springframework.util.LinkedCaseInsensitiveMap;
@@ -52,17 +50,12 @@ public class SqlResource {
     @Inject
     ProducerTemplate producerTemplate;
 
-    @PostConstruct
-    public void postConstruct() throws SQLException {
-        try (Connection conn = dataSource.getConnection()) {
-            try (Statement statement = conn.createStatement()) {
-                statement.execute("DROP TABLE IF EXISTS camel");
-                statement.execute("CREATE TABLE camel (id int AUTO_INCREMENT, 
species VARCHAR(255))");
-                statement.execute(
-                        "CREATE ALIAS ADD_NUMS FOR 
\"org.apache.camel.quarkus.component.sql.it.storedproc.NumberAddStoredProcedure.addNumbers\"");
-            }
-        }
-    }
+    @Inject
+    @Named("results")
+    Map<String, List> results;
+
+    @Inject
+    CamelContext camelContext;
 
     @Path("/get/{species}")
     @GET
@@ -113,17 +106,37 @@ public class SqlResource {
         return result.getSpecies() + " " + result.getId();
     }
 
-    @Path("/post")
+    @Path("/insert/")
     @POST
-    @Consumes(MediaType.TEXT_PLAIN)
+    @Consumes(MediaType.APPLICATION_JSON)
     @Produces(MediaType.TEXT_PLAIN)
-    public Response createCamel(String species) throws Exception {
-        Map<String, Object> params = new HashMap<>();
-        params.put("species", species);
+    public Response insert(@QueryParam("table") String table, Map<String, 
Object> values) throws Exception {
+        LinkedHashMap linkedHashMap = new LinkedHashMap(values);
+
+        String sql = String.format("sql:INSERT INTO %s (%s) VALUES (%s)", 
table,
+                linkedHashMap.keySet().stream().collect(Collectors.joining(", 
")),
+                linkedHashMap.keySet().stream().map(s -> ":#" + 
s).collect(Collectors.joining(", ")));
+
+        producerTemplate.requestBodyAndHeaders(sql, null, values);
+
+        return Response
+                .created(new URI("https://camel.apache.org/"))
+                .build();
+    }
+
+    @Path("/update/")
+    @POST
+    @Consumes(MediaType.APPLICATION_JSON)
+    @Produces(MediaType.TEXT_PLAIN)
+    public Response update(@QueryParam("table") String table, Map<String, 
Object> values) throws Exception {
+
+        String sql = String.format("sql:update %s set %s where id=:#id", table,
+                values.keySet().stream()
+                        .filter(k -> !"ID".equals(k))
+                        .map(k -> k + " = :#" + k)
+                        .collect(Collectors.joining(", ")));
 
-        producerTemplate.requestBodyAndHeaders(
-                "sql:INSERT INTO camel (species) VALUES (:#species)", null,
-                params);
+        producerTemplate.requestBodyAndHeaders(sql, null, values);
 
         return Response
                 .created(new URI("https://camel.apache.org/"))
@@ -145,4 +158,48 @@ public class SqlResource {
 
         return results.get("#result-set-1").get(0).get("PUBLIC.ADD_NUMS(?1, 
?2)").toString();
     }
+
+    @Path("/get/results/{resultId}")
+    @GET
+    @Produces(MediaType.APPLICATION_JSON)
+    public List consumerResults(@PathParam("resultId") String resultId) throws 
Exception {
+        List<Map> list = new LinkedList(this.results.get(resultId));
+        results.get(resultId).clear();
+        return list;
+    }
+
+    @GET
+    @Path("/route/{routeId}/{operation}")
+    @Produces(MediaType.TEXT_PLAIN)
+    public String route(@PathParam("routeId") String routeId, 
@PathParam("operation") String operation)
+            throws Exception {
+        //is start enough
+        switch (operation) {
+        case "stop":
+            camelContext.getRouteController().stopRoute(routeId);
+            break;
+        case "start":
+            camelContext.getRouteController().startRoute(routeId);
+            break;
+        case "status":
+            return 
camelContext.getRouteController().getRouteStatus(routeId).name();
+
+        }
+
+        return null;
+    }
+
+    @Path("/toDirect/{directId}")
+    @POST
+    @Consumes(MediaType.APPLICATION_JSON)
+    @Produces(MediaType.APPLICATION_JSON)
+    public Object toDirect(@PathParam("directId") String directId, 
@QueryParam("body") String body, Map<String, Object> headers)
+            throws Exception {
+        try {
+            return producerTemplate.requestBodyAndHeaders("direct:" + 
directId, body, headers, Object.class);
+        } catch (CamelExecutionException e) {
+            return e.getCause().getClass().getName() + ":" + 
e.getCause().getMessage();
+        }
+    }
+
 }
diff --git 
a/integration-tests/sql/src/main/java/org/apache/camel/quarkus/component/sql/it/SqlRoutes.java
 
b/integration-tests/sql/src/main/java/org/apache/camel/quarkus/component/sql/it/SqlRoutes.java
new file mode 100644
index 0000000..23a3af0
--- /dev/null
+++ 
b/integration-tests/sql/src/main/java/org/apache/camel/quarkus/component/sql/it/SqlRoutes.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.quarkus.component.sql.it;
+
+import java.io.*;
+import java.nio.file.Path;
+import java.sql.SQLException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+import javax.enterprise.context.ApplicationScoped;
+import javax.enterprise.inject.Produces;
+import javax.inject.Inject;
+import javax.inject.Named;
+import javax.transaction.TransactionManager;
+import javax.transaction.UserTransaction;
+
+import io.agroal.api.AgroalDataSource;
+import org.apache.camel.AggregationStrategy;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.processor.aggregate.jdbc.JdbcAggregationRepository;
+import org.apache.camel.processor.idempotent.jdbc.JdbcMessageIdRepository;
+import org.springframework.transaction.jta.JtaTransactionManager;
+
+@ApplicationScoped
+public class SqlRoutes extends RouteBuilder {
+
+    @Inject
+    @Named("results")
+    Map<String, List> results;
+
+    @Inject
+    TransactionManager tm;
+
+    @Inject
+    UserTransaction userTransaction;
+
+    @Inject
+    AgroalDataSource dataSource;
+
+    @Inject
+    SqlDbInitializer sqlDbInitializer;
+
+    @Override
+    public void configure() throws IOException, SQLException {
+        //db has to be initialized before routes are started
+        sqlDbInitializer.initDb();
+
+        from("sql:select * from projects where processed = false order by 
id?initialDelay=0&delay=50&consumer.onConsume=update projects set processed = 
true where id = :#id")
+                .id("consumerRoute").autoStartup(false)
+                .process(e -> 
results.get("consumerRoute").add(e.getMessage().getBody(Map.class)));
+
+        
from("sql:classpath:sql/selectProjects.sql?initialDelay=0&delay=50&consumer.onConsume=update
 projects set processed = true")
+                .id("consumerClasspathRoute").autoStartup(false)
+                .process(e -> 
results.get("consumerClasspathRoute").add(e.getMessage().getBody(Map.class)));
+
+        Path tmpFile = createTmpFileFrom("sql/selectProjects.sql");
+        from("sql:file:" + tmpFile
+                + "?initialDelay=0&delay=50&consumer.onConsume=update projects 
set processed = true")
+                        .id("consumerFileRoute").autoStartup(false)
+                        .process(e -> 
results.get("consumerFileRoute").add(e.getMessage().getBody(Map.class)));
+
+        from("direct:transacted")
+                .transacted("PROPAGATION_REQUIRED")
+                .to("sql:overriddenByTheHeader")
+                .process(e -> {
+                    if (e.getIn().getHeader("rollback", boolean.class)) {
+                        throw new Exception("forced Exception");
+                    }
+                });
+
+        // Idempotent Repository
+        JdbcMessageIdRepository repo = new JdbcMessageIdRepository(dataSource, 
"idempotentRepo");
+        from("direct:idempotent")
+                .idempotentConsumer(header("messageId"), repo)
+                .process(e -> 
results.get("idempotentRoute").add(e.getMessage().getBody(String.class)));
+
+        //aggregation repository
+        JdbcAggregationRepository aggregationRepo = new 
JdbcAggregationRepository(
+                new JtaTransactionManager(userTransaction, tm), "aggregation", 
dataSource);
+        from("direct:aggregation")
+                .aggregate(header("messageId"), new MyAggregationStrategy())
+                // use our created jdbc repo as aggregation repository
+                .completionSize(4).aggregationRepository(aggregationRepo)
+                .process(e -> 
results.get("aggregationRoute").add(e.getMessage().getBody(String.class)));
+
+    }
+
+    private Path createTmpFileFrom(String file) throws IOException {
+        File tmpFile = File.createTempFile("selectProjects-", ".sql");
+        tmpFile.deleteOnExit();
+        try (InputStream is = 
Thread.currentThread().getContextClassLoader().getResourceAsStream(file);
+                ByteArrayOutputStream baos = new ByteArrayOutputStream();
+                FileOutputStream fos = new FileOutputStream(tmpFile)) {
+
+            int c;
+            while ((c = is.read()) >= 0) {
+                baos.write(c);
+            }
+            fos.write(baos.toByteArray());
+        }
+        return tmpFile.toPath();
+    }
+
+    @Produces
+    @ApplicationScoped
+    @Named("results")
+    Map<String, List> results() {
+        Map<String, List> result = new HashMap<>();
+        result.put("consumerRoute", new CopyOnWriteArrayList<>());
+        result.put("consumerClasspathRoute", new CopyOnWriteArrayList<>());
+        result.put("consumerFileRoute", new CopyOnWriteArrayList<>());
+        result.put("idempotentRoute", new CopyOnWriteArrayList<>());
+        result.put("aggregationRoute", new CopyOnWriteArrayList<>());
+        return result;
+    }
+
+    static class MyAggregationStrategy implements AggregationStrategy {
+
+        @Override
+        public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+            if (oldExchange == null) {
+                return newExchange;
+            }
+            String body1 = oldExchange.getIn().getBody(String.class);
+            String body2 = newExchange.getIn().getBody(String.class);
+
+            oldExchange.getIn().setBody(body1 + body2);
+            return oldExchange;
+        }
+    }
+}
diff --git a/integration-tests/sql/src/main/resources/application.properties 
b/integration-tests/sql/src/main/resources/application.properties
index 6ed39f3..a9fadc9 100644
--- a/integration-tests/sql/src/main/resources/application.properties
+++ b/integration-tests/sql/src/main/resources/application.properties
@@ -15,7 +15,6 @@
 ## limitations under the License.
 ## ---------------------------------------------------------------------------
 
-#
 # Camel Quarkus SQL
 #
-quarkus.camel.sql.script-files=sql/get-camels.sql
+quarkus.camel.sql.script-files=sql/get-camels.sql,sql/initDb.sql,sql/selectProjects.sql
diff --git a/integration-tests/sql/src/main/resources/sql/initDb.sql 
b/integration-tests/sql/src/main/resources/sql/initDb.sql
new file mode 100644
index 0000000..f6896f7
--- /dev/null
+++ b/integration-tests/sql/src/main/resources/sql/initDb.sql
@@ -0,0 +1,33 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--      http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+DROP TABLE IF EXISTS camel
+CREATE TABLE camel (id int AUTO_INCREMENT, species VARCHAR(255))
+CREATE ALIAS ADD_NUMS FOR 
"org.apache.camel.quarkus.component.sql.it.storedproc.NumberAddStoredProcedure.addNumbers"
+
+-- for consumer
+DROP TABLE IF EXISTS projects
+create table projects (id integer primary key, project varchar(25), license 
varchar(5), processed BOOLEAN);
+
+-- idempotent repo
+DROP TABLE IF EXISTS CAMEL_MESSAGEPROCESSED
+
+-- aggregation repo
+DROP TABLE IF EXISTS aggregation
+CREATE TABLE aggregation (id varchar(255) NOT NULL, exchange blob NOT NULL, 
version BIGINT NOT NULL, constraint aggregation_pk PRIMARY KEY (id));
+DROP TABLE IF EXISTS aggregation_completed
+CREATE TABLE aggregation_completed (id varchar(255) NOT NULL, exchange blob 
NOT NULL, version BIGINT NOT NULL, constraint aggregation_completed_pk PRIMARY 
KEY (id));
\ No newline at end of file
diff --git a/integration-tests/sql/src/main/resources/sql/selectProjects.sql 
b/integration-tests/sql/src/main/resources/sql/selectProjects.sql
new file mode 100644
index 0000000..5f3ee5b
--- /dev/null
+++ b/integration-tests/sql/src/main/resources/sql/selectProjects.sql
@@ -0,0 +1,21 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--      http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+select *
+from projects
+where processed = false
+order by id
\ No newline at end of file
diff --git 
a/integration-tests/sql/src/test/java/org/apache/camel/quarkus/component/sql/it/SqlTest.java
 
b/integration-tests/sql/src/test/java/org/apache/camel/quarkus/component/sql/it/SqlTest.java
index c99995b..1182f5a 100644
--- 
a/integration-tests/sql/src/test/java/org/apache/camel/quarkus/component/sql/it/SqlTest.java
+++ 
b/integration-tests/sql/src/test/java/org/apache/camel/quarkus/component/sql/it/SqlTest.java
@@ -16,12 +16,23 @@
  */
 package org.apache.camel.quarkus.component.sql.it;
 
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
 import io.quarkus.test.junit.QuarkusTest;
 import io.restassured.RestAssured;
 import io.restassured.http.ContentType;
+import io.restassured.response.ValidatableResponse;
+import io.restassured.specification.RequestSpecification;
+import org.apache.camel.component.sql.SqlConstants;
+import org.apache.camel.util.CollectionHelper;
+import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 
-import static org.hamcrest.Matchers.is;
+import static io.restassured.RestAssured.given;
+import static org.awaitility.Awaitility.await;
+import static org.hamcrest.Matchers.*;
 
 @QuarkusTest
 class SqlTest {
@@ -30,9 +41,10 @@ class SqlTest {
     public void testSqlComponent() {
         // Create Camel species
         RestAssured.given()
-                .contentType(ContentType.TEXT)
-                .body("Dromedarius")
-                .post("/sql/post")
+                .contentType(ContentType.JSON)
+                .queryParam("table", "camel")
+                .body(CollectionHelper.mapOf("species", "Dromedarius"))
+                .post("/sql/insert")
                 .then()
                 .statusCode(201);
 
@@ -66,4 +78,164 @@ class SqlTest {
                 .statusCode(200)
                 .body(is("15"));
     }
+
+    @Test
+    public void testConsumer() throws InterruptedException {
+        testConsumer(1, "consumerRoute");
+    }
+
+    @Test
+    public void testClasspathConsumer() throws InterruptedException {
+        testConsumer(2, "consumerClasspathRoute");
+    }
+
+    @Test
+    public void testFileConsumer() throws InterruptedException {
+        testConsumer(3, "consumerFileRoute");
+    }
+
+    private void testConsumer(int id, String routeId) throws 
InterruptedException {
+        route(routeId, "start", "Started");
+
+        Map project = CollectionHelper.mapOf("ID", id, "PROJECT", routeId, 
"LICENSE", "222", "PROCESSED", false);
+        Map updatedProject = CollectionHelper.mapOf("ID", id, "PROJECT", 
routeId, "LICENSE", "XXX", "PROCESSED", false);
+
+        postMapWithParam("/sql/insert",
+                "table", "projects",
+                project)
+                        .statusCode(201);
+
+        //wait for the record to be caught
+        await().atMost(5, TimeUnit.SECONDS).until(() -> (Iterable<Object>) 
RestAssured
+                .get("/sql/get/results/" + 
routeId).then().extract().as(List.class),
+                both(iterableWithSize(1)).and(contains(project)));
+
+        //update
+        postMapWithParam("/sql/update",
+                "table", "projects",
+                updatedProject)
+                        .statusCode(201);
+
+        //wait for the record to be caught
+        await().atMost(5, TimeUnit.SECONDS).until(() -> (Iterable<Object>) 
RestAssured
+                .get("/sql/get/results/" + 
routeId).then().extract().as(List.class),
+                both(iterableWithSize(1)).and(contains(updatedProject)));
+
+        route(routeId, "stop", "Stopped");
+    }
+
+    @Test
+    public void testTransacted() throws InterruptedException {
+
+        postMap("/sql/toDirect/transacted", 
CollectionHelper.mapOf(SqlConstants.SQL_QUERY,
+                "insert into projects values (5, 'Transacted', 'ASF', false)",
+                "rollback", false))
+                        .statusCode(204);
+
+        postMap("/sql/toDirect/transacted", 
CollectionHelper.mapOf(SqlConstants.SQL_QUERY,
+                "select * from projects where project = 'Transacted'"))
+                        .statusCode(200)
+                        .body("size()", is(1));
+
+        postMap("/sql/toDirect/transacted", 
CollectionHelper.mapOf(SqlConstants.SQL_QUERY,
+                "insert into projects values (6, 'Transacted', 'ASF', false)",
+                "rollback", true))
+                        .statusCode(200)
+                        .body(is("java.lang.Exception:forced Exception"));
+
+        postMap("/sql/toDirect/transacted",
+                CollectionHelper.mapOf(SqlConstants.SQL_QUERY, "select * from 
projects where project = 'Transacted'"))
+                        .statusCode(200)
+                        .body("size()", is(1));
+    }
+
+    @Test
+    public void testDefaultErrorCode() throws InterruptedException {
+        postMap("/sql/toDirect/transacted", 
CollectionHelper.mapOf(SqlConstants.SQL_QUERY, "select * from NOT_EXIST order 
id"))
+                .statusCode(200)
+                
.body(startsWith("org.springframework.jdbc.BadSqlGrammarException"));
+    }
+
+    @Test
+    public void testIdempotentRepository() {
+        // add value with key 1
+        postMapWithParam("/sql/toDirect/idempotent",
+                "body", "one",
+                CollectionHelper.mapOf("messageId", "1"))
+                        .statusCode(200);
+
+        // add value with key 2
+        postMapWithParam("/sql/toDirect/idempotent",
+                "body", "two",
+                CollectionHelper.mapOf("messageId", "2"))
+                        .statusCode(200);
+
+        // add value with key 3
+        postMapWithParam("/sql/toDirect/idempotent",
+                "body", "three",
+                CollectionHelper.mapOf("messageId", "3"))
+                        .statusCode(200);
+
+        // add another value with key 1 -- this one is supposed to be skipped
+        postMapWithParam("/sql/toDirect/idempotent",
+                "body", "four",
+                CollectionHelper.mapOf("messageId", "1"))
+                        .statusCode(200);
+
+        // get all values from the result map
+        await().atMost(5, TimeUnit.SECONDS).until(() -> (Iterable<? extends 
String>) RestAssured
+                
.get("/sql/get/results/idempotentRoute").then().extract().as(List.class),
+                containsInAnyOrder("one", "two", "three"));
+    }
+
+    @Test
+    @Disabled //see https://github.com/apache/camel-quarkus/issues/2693
+    public void testAggregationRepository() {
+        postMapWithParam("/sql/toDirect/aggregation", "body", "A", 
CollectionHelper.mapOf("messageId", "123"))
+                .statusCode(200);
+
+        postMapWithParam("/sql/toDirect/aggregation", "body", "B", 
CollectionHelper.mapOf("messageId", "123"))
+                .statusCode(200);
+
+        postMapWithParam("/sql/toDirect/aggregation", "body", "C", 
CollectionHelper.mapOf("messageId", "123"))
+                .statusCode(200);
+
+        postMapWithParam("/sql/toDirect/aggregation", "body", "D", 
CollectionHelper.mapOf("messageId", "123"))
+                .statusCode(200);
+
+        // get all values from the result map
+        await().atMost(5, TimeUnit.SECONDS).until(() -> (Iterable<? extends 
String>) RestAssured
+                
.get("/sql/get/results/aggregationRoute").then().extract().as(List.class),
+                containsInAnyOrder("ABCD"));
+    }
+
+    private ValidatableResponse postMap(String toUrl, Map<String, String> 
body) {
+        return postMapWithParam(toUrl, null, null, body);
+    }
+
+    private ValidatableResponse postMapWithParam(String toUrl, String param, 
String paramValue, Map<String, String> body) {
+        RequestSpecification rs = RestAssured.given()
+                .contentType(ContentType.JSON);
+
+        if (param != null) {
+            rs = rs.queryParam(param, paramValue);
+        }
+
+        return rs.body(body)
+                .post(toUrl)
+                .then();
+    }
+
+    private void route(String routeId, String operation, String 
expectedOutput) {
+        RestAssured.given()
+                .get("/sql/route/" + routeId + "/" + operation)
+                .then().statusCode(204);
+
+        if (expectedOutput != null) {
+            await().atMost(5, TimeUnit.SECONDS).until(() -> RestAssured
+                    .get("/sql/route/" + routeId + "/status")
+                    .then()
+                    .extract().asString(), equalTo(expectedOutput));
+        }
+    }
 }

Reply via email to