This is an automated email from the ASF dual-hosted git repository.
ppalaga pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel-quarkus.git
The following commit(s) were added to refs/heads/main by this push:
new e780587 Expand Sql test coverage #2623
e780587 is described below
commit e780587f835fc0a2af7fe71c73625f6208f84007
Author: JiriOndrusek <[email protected]>
AuthorDate: Thu May 27 16:22:48 2021 +0200
Expand Sql test coverage #2623
---
.../component/sql/deployment/SqlProcessor.java | 3 +-
integration-tests/sql/pom.xml | 17 ++
.../quarkus/component/sql/it/SqlDbInitializer.java | 54 +++++++
.../quarkus/component/sql/it/SqlResource.java | 111 +++++++++----
.../camel/quarkus/component/sql/it/SqlRoutes.java | 149 +++++++++++++++++
.../sql/src/main/resources/application.properties | 3 +-
.../sql/src/main/resources/sql/initDb.sql | 33 ++++
.../sql/src/main/resources/sql/selectProjects.sql | 21 +++
.../camel/quarkus/component/sql/it/SqlTest.java | 180 ++++++++++++++++++++-
9 files changed, 537 insertions(+), 34 deletions(-)
diff --git
a/extensions/sql/deployment/src/main/java/org/apache/camel/quarkus/component/sql/deployment/SqlProcessor.java
b/extensions/sql/deployment/src/main/java/org/apache/camel/quarkus/component/sql/deployment/SqlProcessor.java
index d9d63f1..fbdcec7 100644
---
a/extensions/sql/deployment/src/main/java/org/apache/camel/quarkus/component/sql/deployment/SqlProcessor.java
+++
b/extensions/sql/deployment/src/main/java/org/apache/camel/quarkus/component/sql/deployment/SqlProcessor.java
@@ -24,6 +24,7 @@ import io.quarkus.deployment.builditem.FeatureBuildItem;
import
io.quarkus.deployment.builditem.nativeimage.NativeImageResourceBuildItem;
import io.quarkus.deployment.builditem.nativeimage.ReflectiveClassBuildItem;
import org.apache.camel.quarkus.component.sql.CamelSqlConfig;
+import org.apache.camel.support.DefaultExchangeHolder;
class SqlProcessor {
@@ -36,7 +37,7 @@ class SqlProcessor {
@BuildStep
void registerForReflection(BuildProducer<ReflectiveClassBuildItem>
reflectiveClass) {
- reflectiveClass.produce(new ReflectiveClassBuildItem(false, true,
Types.class));
+ reflectiveClass.produce(new ReflectiveClassBuildItem(false, true,
Types.class, DefaultExchangeHolder.class));
}
@BuildStep
diff --git a/integration-tests/sql/pom.xml b/integration-tests/sql/pom.xml
index 4ef3ba9..0cc9f08 100644
--- a/integration-tests/sql/pom.xml
+++ b/integration-tests/sql/pom.xml
@@ -40,8 +40,20 @@
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
+ <artifactId>quarkus-resteasy-jackson</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.quarkus</groupId>
<artifactId>quarkus-jdbc-h2</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.camel.quarkus</groupId>
+ <artifactId>camel-quarkus-direct</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.camel.quarkus</groupId>
+ <artifactId>camel-quarkus-jta</artifactId>
+ </dependency>
<!-- test dependencies -->
<dependency>
@@ -54,6 +66,11 @@
<artifactId>rest-assured</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.awaitility</groupId>
+ <artifactId>awaitility</artifactId>
+ <scope>test</scope>
+ </dependency>
<!-- The following dependencies guarantee that this module is built
after them. You can update them by running `mvn process-resources -Pformat -N`
from the source tree root directory -->
<dependency>
diff --git
a/integration-tests/sql/src/main/java/org/apache/camel/quarkus/component/sql/it/SqlDbInitializer.java
b/integration-tests/sql/src/main/java/org/apache/camel/quarkus/component/sql/it/SqlDbInitializer.java
new file mode 100644
index 0000000..f368c51
--- /dev/null
+++
b/integration-tests/sql/src/main/java/org/apache/camel/quarkus/component/sql/it/SqlDbInitializer.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.quarkus.component.sql.it;
+
+import java.io.*;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
+
+import javax.enterprise.context.ApplicationScoped;
+import javax.inject.Inject;
+
+import io.agroal.api.AgroalDataSource;
+
+/**
+ * Prepares the database used by the SQL integration tests by executing the
+ * statements found in the {@code sql/initDb.sql} classpath resource.
+ */
+@ApplicationScoped
+public class SqlDbInitializer {
+
+    /** Classpath location of the initialization script. */
+    private static final String INIT_SCRIPT = "sql/initDb.sql";
+
+    @Inject
+    AgroalDataSource dataSource;
+
+    /**
+     * Executes the initialization script against the injected data source.
+     * <p>
+     * The script is processed line by line: every line is treated as one complete
+     * SQL statement, blank lines and lines starting with {@code --} are skipped.
+     *
+     * @throws SQLException if a connection cannot be obtained or a statement fails
+     * @throws IOException  if the script is missing from the classpath or cannot be read
+     */
+    public void initDb() throws SQLException, IOException {
+        try (InputStream is = Thread.currentThread().getContextClassLoader().getResourceAsStream(INIT_SCRIPT)) {
+            // getResourceAsStream() signals a missing resource with null; fail with a clear message instead of an NPE
+            if (is == null) {
+                throw new FileNotFoundException("Classpath resource not found: " + INIT_SCRIPT);
+            }
+            try (Connection conn = dataSource.getConnection();
+                    Statement statement = conn.createStatement();
+                    // explicit charset so that the result does not depend on the platform default
+                    BufferedReader reader = new BufferedReader(
+                            new InputStreamReader(is, java.nio.charset.StandardCharsets.UTF_8))) {
+
+                String line;
+                while ((line = reader.readLine()) != null) {
+                    String sql = line.trim();
+                    if (sql.isEmpty() || sql.startsWith("--")) {
+                        continue;
+                    }
+                    // a plain loop lets the SQLException propagate as declared, without a RuntimeException wrapper
+                    statement.execute(sql);
+                }
+            }
+        }
+    }
+
+}
diff --git
a/integration-tests/sql/src/main/java/org/apache/camel/quarkus/component/sql/it/SqlResource.java
b/integration-tests/sql/src/main/java/org/apache/camel/quarkus/component/sql/it/SqlResource.java
index a9c3398..89c7ff9 100644
---
a/integration-tests/sql/src/main/java/org/apache/camel/quarkus/component/sql/it/SqlResource.java
+++
b/integration-tests/sql/src/main/java/org/apache/camel/quarkus/component/sql/it/SqlResource.java
@@ -17,16 +17,12 @@
package org.apache.camel.quarkus.component.sql.it;
import java.net.URI;
-import java.sql.Connection;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import javax.annotation.PostConstruct;
+import java.util.*;
+import java.util.stream.Collectors;
+
import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;
+import javax.inject.Named;
import javax.ws.rs.Consumes;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
@@ -38,6 +34,8 @@ import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import io.agroal.api.AgroalDataSource;
+import org.apache.camel.CamelContext;
+import org.apache.camel.CamelExecutionException;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.quarkus.component.sql.it.model.Camel;
import org.springframework.util.LinkedCaseInsensitiveMap;
@@ -52,17 +50,12 @@ public class SqlResource {
@Inject
ProducerTemplate producerTemplate;
- @PostConstruct
- public void postConstruct() throws SQLException {
- try (Connection conn = dataSource.getConnection()) {
- try (Statement statement = conn.createStatement()) {
- statement.execute("DROP TABLE IF EXISTS camel");
- statement.execute("CREATE TABLE camel (id int AUTO_INCREMENT,
species VARCHAR(255))");
- statement.execute(
- "CREATE ALIAS ADD_NUMS FOR
\"org.apache.camel.quarkus.component.sql.it.storedproc.NumberAddStoredProcedure.addNumbers\"");
- }
- }
- }
+ @Inject
+ @Named("results")
+ Map<String, List> results;
+
+ @Inject
+ CamelContext camelContext;
@Path("/get/{species}")
@GET
@@ -113,17 +106,37 @@ public class SqlResource {
return result.getSpecies() + " " + result.getId();
}
- @Path("/post")
+ @Path("/insert/")
@POST
- @Consumes(MediaType.TEXT_PLAIN)
+ @Consumes(MediaType.APPLICATION_JSON)
@Produces(MediaType.TEXT_PLAIN)
- public Response createCamel(String species) throws Exception {
- Map<String, Object> params = new HashMap<>();
- params.put("species", species);
+ public Response insert(@QueryParam("table") String table, Map<String,
Object> values) throws Exception {
+ LinkedHashMap linkedHashMap = new LinkedHashMap(values);
+
+ String sql = String.format("sql:INSERT INTO %s (%s) VALUES (%s)",
table,
+ linkedHashMap.keySet().stream().collect(Collectors.joining(",
")),
+ linkedHashMap.keySet().stream().map(s -> ":#" +
s).collect(Collectors.joining(", ")));
+
+ producerTemplate.requestBodyAndHeaders(sql, null, values);
+
+ return Response
+ .created(new URI("https://camel.apache.org/"))
+ .build();
+ }
+
+ @Path("/update/")
+ @POST
+ @Consumes(MediaType.APPLICATION_JSON)
+ @Produces(MediaType.TEXT_PLAIN)
+ public Response update(@QueryParam("table") String table, Map<String,
Object> values) throws Exception {
+
+ String sql = String.format("sql:update %s set %s where id=:#id", table,
+ values.keySet().stream()
+ .filter(k -> !"ID".equals(k))
+ .map(k -> k + " = :#" + k)
+ .collect(Collectors.joining(", ")));
- producerTemplate.requestBodyAndHeaders(
- "sql:INSERT INTO camel (species) VALUES (:#species)", null,
- params);
+ producerTemplate.requestBodyAndHeaders(sql, null, values);
return Response
.created(new URI("https://camel.apache.org/"))
@@ -145,4 +158,48 @@ public class SqlResource {
return results.get("#result-set-1").get(0).get("PUBLIC.ADD_NUMS(?1,
?2)").toString();
}
+
+    /**
+     * Returns the items collected so far under the given result id and empties
+     * the underlying collection.
+     *
+     * @param  resultId                     key of the collection in the injected {@code results} map
+     * @return                              a copy of the collected items taken before the collection is cleared
+     * @throws javax.ws.rs.NotFoundException if no collection is registered under {@code resultId}
+     */
+    @Path("/get/results/{resultId}")
+    @GET
+    @Produces(MediaType.APPLICATION_JSON)
+    public List consumerResults(@PathParam("resultId") String resultId) throws Exception {
+        List<?> collected = results.get(resultId);
+        if (collected == null) {
+            // an unknown id used to end in a NullPointerException (HTTP 500); report it as 404 instead
+            throw new javax.ws.rs.NotFoundException("No results registered for id: " + resultId);
+        }
+        // NOTE(review): items added between the copy and the clear are dropped - confirm this is acceptable
+        List<Object> snapshot = new ArrayList<>(collected);
+        collected.clear();
+        return snapshot;
+    }
+
+    /**
+     * Controls or inspects a route through the route controller of the Camel context.
+     *
+     * @param  routeId   id of the route to operate on
+     * @param  operation {@code stop}, {@code start} or {@code status}; any other value is ignored
+     * @return           the name of the route status for {@code status}, {@code null} otherwise
+     */
+    @GET
+    @Path("/route/{routeId}/{operation}")
+    @Produces(MediaType.TEXT_PLAIN)
+    public String route(@PathParam("routeId") String routeId, @PathParam("operation") String operation)
+            throws Exception {
+        switch (operation) {
+        case "status":
+            return camelContext.getRouteController().getRouteStatus(routeId).name();
+        case "start":
+            camelContext.getRouteController().startRoute(routeId);
+            return null;
+        case "stop":
+            camelContext.getRouteController().stopRoute(routeId);
+            return null;
+        default:
+            // unknown operations are silently ignored
+            return null;
+        }
+    }
+
+    /**
+     * Sends the given body and headers to the {@code direct:} endpoint with the given id.
+     *
+     * @param  directId name of the direct endpoint (without the {@code direct:} prefix)
+     * @param  body     message body, taken from the {@code body} query parameter
+     * @param  headers  message headers, taken from the JSON request body
+     * @return          the reply of the route, or {@code <cause class name>:<cause message>} when the
+     *                  exchange fails with a {@link CamelExecutionException}
+     */
+    @Path("/toDirect/{directId}")
+    @POST
+    @Consumes(MediaType.APPLICATION_JSON)
+    @Produces(MediaType.APPLICATION_JSON)
+    public Object toDirect(@PathParam("directId") String directId, @QueryParam("body") String body,
+            Map<String, Object> headers)
+            throws Exception {
+        final String endpointUri = "direct:" + directId;
+        try {
+            return producerTemplate.requestBodyAndHeaders(endpointUri, body, headers, Object.class);
+        } catch (CamelExecutionException e) {
+            // report the underlying failure as plain text so that the caller can assert on it
+            final Throwable cause = e.getCause();
+            return cause.getClass().getName() + ":" + cause.getMessage();
+        }
+    }
+
}
diff --git
a/integration-tests/sql/src/main/java/org/apache/camel/quarkus/component/sql/it/SqlRoutes.java
b/integration-tests/sql/src/main/java/org/apache/camel/quarkus/component/sql/it/SqlRoutes.java
new file mode 100644
index 0000000..23a3af0
--- /dev/null
+++
b/integration-tests/sql/src/main/java/org/apache/camel/quarkus/component/sql/it/SqlRoutes.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.quarkus.component.sql.it;
+
+import java.io.*;
+import java.nio.file.Path;
+import java.sql.SQLException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+import javax.enterprise.context.ApplicationScoped;
+import javax.enterprise.inject.Produces;
+import javax.inject.Inject;
+import javax.inject.Named;
+import javax.transaction.TransactionManager;
+import javax.transaction.UserTransaction;
+
+import io.agroal.api.AgroalDataSource;
+import org.apache.camel.AggregationStrategy;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.processor.aggregate.jdbc.JdbcAggregationRepository;
+import org.apache.camel.processor.idempotent.jdbc.JdbcMessageIdRepository;
+import org.springframework.transaction.jta.JtaTransactionManager;
+
+/**
+ * Routes used by the SQL integration tests: three polling SQL consumers (inline query,
+ * classpath query, file query), a transacted producer route, an idempotent consumer backed
+ * by a JDBC message id repository and an aggregator backed by a JDBC aggregation repository.
+ * Consumed messages are collected in the {@code results} map produced by this class.
+ */
+@ApplicationScoped
+public class SqlRoutes extends RouteBuilder {
+
+    @Inject
+    @Named("results")
+    Map<String, List> results;
+
+    @Inject
+    TransactionManager tm;
+
+    @Inject
+    UserTransaction userTransaction;
+
+    @Inject
+    AgroalDataSource dataSource;
+
+    @Inject
+    SqlDbInitializer sqlDbInitializer;
+
+    /**
+     * Initializes the database and defines all routes. The consumer routes are defined with
+     * {@code autoStartup(false)}.
+     *
+     * @throws IOException  if the init script or the query file cannot be read
+     * @throws SQLException if the database initialization fails
+     */
+    @Override
+    public void configure() throws IOException, SQLException {
+        //db has to be initialized before routes are started
+        sqlDbInitializer.initDb();
+
+        // consumer with the query given inline in the endpoint URI
+        from("sql:select * from projects where processed = false order by id?initialDelay=0&delay=50&consumer.onConsume=update projects set processed = true where id = :#id")
+                .id("consumerRoute").autoStartup(false)
+                .process(e -> results.get("consumerRoute").add(e.getMessage().getBody(Map.class)));
+
+        // consumer with the query loaded from the classpath
+        from("sql:classpath:sql/selectProjects.sql?initialDelay=0&delay=50&consumer.onConsume=update projects set processed = true")
+                .id("consumerClasspathRoute").autoStartup(false)
+                .process(e -> results.get("consumerClasspathRoute").add(e.getMessage().getBody(Map.class)));
+
+        // consumer with the query loaded from a file
+        Path tmpFile = createTmpFileFrom("sql/selectProjects.sql");
+        from("sql:file:" + tmpFile
+                + "?initialDelay=0&delay=50&consumer.onConsume=update projects set processed = true")
+                        .id("consumerFileRoute").autoStartup(false)
+                        .process(e -> results.get("consumerFileRoute").add(e.getMessage().getBody(Map.class)));
+
+        // fails after the SQL statement when the "rollback" header is true
+        from("direct:transacted")
+                .transacted("PROPAGATION_REQUIRED")
+                .to("sql:overriddenByTheHeader")
+                .process(e -> {
+                    if (e.getIn().getHeader("rollback", boolean.class)) {
+                        throw new Exception("forced Exception");
+                    }
+                });
+
+        // Idempotent Repository
+        JdbcMessageIdRepository repo = new JdbcMessageIdRepository(dataSource, "idempotentRepo");
+        from("direct:idempotent")
+                .idempotentConsumer(header("messageId"), repo)
+                .process(e -> results.get("idempotentRoute").add(e.getMessage().getBody(String.class)));
+
+        //aggregation repository
+        JdbcAggregationRepository aggregationRepo = new JdbcAggregationRepository(
+                new JtaTransactionManager(userTransaction, tm), "aggregation", dataSource);
+        from("direct:aggregation")
+                .aggregate(header("messageId"), new MyAggregationStrategy())
+                // use our created jdbc repo as aggregation repository
+                .completionSize(4).aggregationRepository(aggregationRepo)
+                .process(e -> results.get("aggregationRoute").add(e.getMessage().getBody(String.class)));
+
+    }
+
+    /**
+     * Copies a classpath resource to a temporary file that is deleted on JVM exit.
+     *
+     * @param  file        classpath location of the resource to copy
+     * @return             path of the temporary copy
+     * @throws IOException if the resource does not exist or cannot be copied
+     */
+    private Path createTmpFileFrom(String file) throws IOException {
+        File tmpFile = File.createTempFile("selectProjects-", ".sql");
+        tmpFile.deleteOnExit();
+        try (InputStream is = Thread.currentThread().getContextClassLoader().getResourceAsStream(file)) {
+            // getResourceAsStream() signals a missing resource with null; fail with a clear message instead of an NPE
+            if (is == null) {
+                throw new FileNotFoundException("Classpath resource not found: " + file);
+            }
+            // the temporary file already exists, hence REPLACE_EXISTING
+            java.nio.file.Files.copy(is, tmpFile.toPath(), java.nio.file.StandardCopyOption.REPLACE_EXISTING);
+        }
+        return tmpFile.toPath();
+    }
+
+    /**
+     * Produces the shared map of result collections, one {@link CopyOnWriteArrayList} per route id.
+     *
+     * @return the map keyed by route id
+     */
+    @Produces
+    @ApplicationScoped
+    @Named("results")
+    Map<String, List> results() {
+        Map<String, List> result = new HashMap<>();
+        result.put("consumerRoute", new CopyOnWriteArrayList<>());
+        result.put("consumerClasspathRoute", new CopyOnWriteArrayList<>());
+        result.put("consumerFileRoute", new CopyOnWriteArrayList<>());
+        result.put("idempotentRoute", new CopyOnWriteArrayList<>());
+        result.put("aggregationRoute", new CopyOnWriteArrayList<>());
+        return result;
+    }
+
+    /**
+     * Aggregation strategy that concatenates the String bodies of the aggregated exchanges.
+     */
+    static class MyAggregationStrategy implements AggregationStrategy {
+
+        @Override
+        public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+            // the first exchange of a group has nothing to be merged with
+            if (oldExchange == null) {
+                return newExchange;
+            }
+            String body1 = oldExchange.getIn().getBody(String.class);
+            String body2 = newExchange.getIn().getBody(String.class);
+
+            oldExchange.getIn().setBody(body1 + body2);
+            return oldExchange;
+        }
+    }
+}
diff --git a/integration-tests/sql/src/main/resources/application.properties
b/integration-tests/sql/src/main/resources/application.properties
index 6ed39f3..a9fadc9 100644
--- a/integration-tests/sql/src/main/resources/application.properties
+++ b/integration-tests/sql/src/main/resources/application.properties
@@ -15,7 +15,6 @@
## limitations under the License.
## ---------------------------------------------------------------------------
-#
# Camel Quarkus SQL
#
-quarkus.camel.sql.script-files=sql/get-camels.sql
+quarkus.camel.sql.script-files=sql/get-camels.sql,sql/initDb.sql,sql/selectProjects.sql
diff --git a/integration-tests/sql/src/main/resources/sql/initDb.sql
b/integration-tests/sql/src/main/resources/sql/initDb.sql
new file mode 100644
index 0000000..f6896f7
--- /dev/null
+++ b/integration-tests/sql/src/main/resources/sql/initDb.sql
@@ -0,0 +1,33 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements. See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License. You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+-- NOTE: SqlDbInitializer executes this script line by line, so every statement has to stay on a single line.
+DROP TABLE IF EXISTS camel
+CREATE TABLE camel (id int AUTO_INCREMENT, species VARCHAR(255))
+-- IF NOT EXISTS keeps the script re-runnable, in line with the DROP TABLE IF EXISTS statements
+CREATE ALIAS IF NOT EXISTS ADD_NUMS FOR "org.apache.camel.quarkus.component.sql.it.storedproc.NumberAddStoredProcedure.addNumbers"
+
+-- for consumer
+DROP TABLE IF EXISTS projects
+CREATE TABLE projects (id integer primary key, project varchar(25), license varchar(5), processed BOOLEAN);
+
+-- idempotent repo
+DROP TABLE IF EXISTS CAMEL_MESSAGEPROCESSED
+
+-- aggregation repo
+DROP TABLE IF EXISTS aggregation
+CREATE TABLE aggregation (id varchar(255) NOT NULL, exchange blob NOT NULL, version BIGINT NOT NULL, constraint aggregation_pk PRIMARY KEY (id));
+DROP TABLE IF EXISTS aggregation_completed
+CREATE TABLE aggregation_completed (id varchar(255) NOT NULL, exchange blob NOT NULL, version BIGINT NOT NULL, constraint aggregation_completed_pk PRIMARY KEY (id));
\ No newline at end of file
diff --git a/integration-tests/sql/src/main/resources/sql/selectProjects.sql
b/integration-tests/sql/src/main/resources/sql/selectProjects.sql
new file mode 100644
index 0000000..5f3ee5b
--- /dev/null
+++ b/integration-tests/sql/src/main/resources/sql/selectProjects.sql
@@ -0,0 +1,21 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements. See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License. You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+-- Unprocessed projects in id order; referenced by the classpath and file based SQL consumer routes in SqlRoutes.
+select *
+from projects
+where processed = false
+order by id
\ No newline at end of file
diff --git
a/integration-tests/sql/src/test/java/org/apache/camel/quarkus/component/sql/it/SqlTest.java
b/integration-tests/sql/src/test/java/org/apache/camel/quarkus/component/sql/it/SqlTest.java
index c99995b..1182f5a 100644
---
a/integration-tests/sql/src/test/java/org/apache/camel/quarkus/component/sql/it/SqlTest.java
+++
b/integration-tests/sql/src/test/java/org/apache/camel/quarkus/component/sql/it/SqlTest.java
@@ -16,12 +16,23 @@
*/
package org.apache.camel.quarkus.component.sql.it;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
import io.quarkus.test.junit.QuarkusTest;
import io.restassured.RestAssured;
import io.restassured.http.ContentType;
+import io.restassured.response.ValidatableResponse;
+import io.restassured.specification.RequestSpecification;
+import org.apache.camel.component.sql.SqlConstants;
+import org.apache.camel.util.CollectionHelper;
+import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
-import static org.hamcrest.Matchers.is;
+import static io.restassured.RestAssured.given;
+import static org.awaitility.Awaitility.await;
+import static org.hamcrest.Matchers.*;
@QuarkusTest
class SqlTest {
@@ -30,9 +41,10 @@ class SqlTest {
public void testSqlComponent() {
// Create Camel species
RestAssured.given()
- .contentType(ContentType.TEXT)
- .body("Dromedarius")
- .post("/sql/post")
+ .contentType(ContentType.JSON)
+ .queryParam("table", "camel")
+ .body(CollectionHelper.mapOf("species", "Dromedarius"))
+ .post("/sql/insert")
.then()
.statusCode(201);
@@ -66,4 +78,164 @@ class SqlTest {
.statusCode(200)
.body(is("15"));
}
+
+ /**
+ * Runs the consumer scenario against "consumerRoute", the route whose query is given inline in the endpoint URI.
+ */
+ @Test
+ public void testConsumer() throws InterruptedException {
+ testConsumer(1, "consumerRoute");
+ }
+
+ /**
+ * Runs the consumer scenario against "consumerClasspathRoute", the route whose query is loaded from the classpath.
+ */
+ @Test
+ public void testClasspathConsumer() throws InterruptedException {
+ testConsumer(2, "consumerClasspathRoute");
+ }
+
+ /**
+ * Runs the consumer scenario against "consumerFileRoute", the route whose query is loaded from a file.
+ */
+ @Test
+ public void testFileConsumer() throws InterruptedException {
+ testConsumer(3, "consumerFileRoute");
+ }
+
+    /**
+     * Shared consumer scenario: starts the route, inserts a project and waits until it is
+     * consumed, updates the project and waits until the updated row is consumed, then stops
+     * the route.
+     *
+     * @param id      primary key of the project row used by this scenario
+     * @param routeId id of the consumer route under test; also used as the project name
+     */
+    private void testConsumer(int id, String routeId) throws InterruptedException {
+        route(routeId, "start", "Started");
+
+        // always stop the route again: a consumer left running after a failed assertion
+        // would keep consuming the rows inserted by the other consumer tests
+        try {
+            Map project = CollectionHelper.mapOf("ID", id, "PROJECT", routeId, "LICENSE", "222", "PROCESSED", false);
+            Map updatedProject = CollectionHelper.mapOf("ID", id, "PROJECT", routeId, "LICENSE", "XXX", "PROCESSED",
+                    false);
+
+            postMapWithParam("/sql/insert",
+                    "table", "projects",
+                    project)
+                            .statusCode(201);
+
+            //wait for the inserted record to be consumed
+            await().atMost(5, TimeUnit.SECONDS).until(() -> (Iterable<Object>) RestAssured
+                    .get("/sql/get/results/" + routeId).then().extract().as(List.class),
+                    both(iterableWithSize(1)).and(contains(project)));
+
+            //update
+            postMapWithParam("/sql/update",
+                    "table", "projects",
+                    updatedProject)
+                            .statusCode(201);
+
+            //wait for the updated record to be consumed
+            await().atMost(5, TimeUnit.SECONDS).until(() -> (Iterable<Object>) RestAssured
+                    .get("/sql/get/results/" + routeId).then().extract().as(List.class),
+                    both(iterableWithSize(1)).and(contains(updatedProject)));
+        } finally {
+            route(routeId, "stop", "Stopped");
+        }
+    }
+
+    /**
+     * Checks the transacted route: an insert without rollback is visible afterwards, an insert
+     * followed by the forced exception is not.
+     */
+    @Test
+    public void testTransacted() throws InterruptedException {
+        final String endpoint = "/sql/toDirect/transacted";
+        final String selectTransacted = "select * from projects where project = 'Transacted'";
+
+        // insert that is expected to be committed
+        postMap(endpoint, CollectionHelper.mapOf(SqlConstants.SQL_QUERY,
+                "insert into projects values (5, 'Transacted', 'ASF', false)",
+                "rollback", false))
+                        .statusCode(204);
+
+        // exactly one row has to be present
+        postMap(endpoint, CollectionHelper.mapOf(SqlConstants.SQL_QUERY, selectTransacted))
+                .statusCode(200)
+                .body("size()", is(1));
+
+        // insert followed by the forced exception
+        postMap(endpoint, CollectionHelper.mapOf(SqlConstants.SQL_QUERY,
+                "insert into projects values (6, 'Transacted', 'ASF', false)",
+                "rollback", true))
+                        .statusCode(200)
+                        .body(is("java.lang.Exception:forced Exception"));
+
+        // still one row only, the second insert must not be visible
+        postMap(endpoint, CollectionHelper.mapOf(SqlConstants.SQL_QUERY, selectTransacted))
+                .statusCode(200)
+                .body("size()", is(1));
+    }
+
+    /**
+     * Sends a malformed query and expects the failure to be reported as a Spring
+     * {@code BadSqlGrammarException}.
+     */
+    @Test
+    public void testDefaultErrorCode() throws InterruptedException {
+        final String malformedQuery = "select * from NOT_EXIST order id";
+
+        postMap("/sql/toDirect/transacted", CollectionHelper.mapOf(SqlConstants.SQL_QUERY, malformedQuery))
+                .statusCode(200)
+                .body(startsWith("org.springframework.jdbc.BadSqlGrammarException"));
+    }
+
+    /**
+     * Sends four messages to the idempotent route, the last one re-using the message id of the
+     * first one, and expects only the first three bodies to be collected.
+     */
+    @Test
+    public void testIdempotentRepository() {
+        // { body, messageId } - the last entry repeats message id 1 and is supposed to be skipped
+        final String[][] messages = {
+                { "one", "1" },
+                { "two", "2" },
+                { "three", "3" },
+                { "four", "1" },
+        };
+
+        for (String[] message : messages) {
+            postMapWithParam("/sql/toDirect/idempotent",
+                    "body", message[0],
+                    CollectionHelper.mapOf("messageId", message[1]))
+                            .statusCode(200);
+        }
+
+        // get all values from the result map
+        await().atMost(5, TimeUnit.SECONDS).until(() -> (Iterable<? extends String>) RestAssured
+                .get("/sql/get/results/idempotentRoute").then().extract().as(List.class),
+                containsInAnyOrder("one", "two", "three"));
+    }
+
+    /**
+     * Sends four messages sharing one message id to the aggregation route and expects a single
+     * aggregated body made of the four parts.
+     */
+    @Test
+    @Disabled //see https://github.com/apache/camel-quarkus/issues/2693
+    public void testAggregationRepository() {
+        for (String part : new String[] { "A", "B", "C", "D" }) {
+            postMapWithParam("/sql/toDirect/aggregation", "body", part, CollectionHelper.mapOf("messageId", "123"))
+                    .statusCode(200);
+        }
+
+        // get all values from the result map
+        await().atMost(5, TimeUnit.SECONDS).until(() -> (Iterable<? extends String>) RestAssured
+                .get("/sql/get/results/aggregationRoute").then().extract().as(List.class),
+                containsInAnyOrder("ABCD"));
+    }
+
+ /**
+ * Posts the given map as a JSON body to the given URL without any query parameter.
+ */
+ private ValidatableResponse postMap(String toUrl, Map<String, String>
body) {
+ return postMapWithParam(toUrl, null, null, body);
+ }
+
+    /**
+     * Posts the given map as a JSON body to the given URL.
+     *
+     * @param  toUrl      target URL
+     * @param  param      name of an optional query parameter; no parameter is added when {@code null}
+     * @param  paramValue value of the query parameter
+     * @param  body       map sent as the request body
+     * @return            the response, ready for assertions
+     */
+    private ValidatableResponse postMapWithParam(String toUrl, String param, String paramValue,
+            Map<String, String> body) {
+        final RequestSpecification json = RestAssured.given().contentType(ContentType.JSON);
+        final RequestSpecification request = param == null ? json : json.queryParam(param, paramValue);
+
+        return request.body(body)
+                .post(toUrl)
+                .then();
+    }
+
+    /**
+     * Invokes a route operation and optionally waits for the route to reach a status.
+     *
+     * @param routeId        id of the route
+     * @param operation      operation passed to the {@code /sql/route} endpoint
+     * @param expectedOutput status to wait for (at most 5 seconds); no waiting when {@code null}
+     */
+    private void route(String routeId, String operation, String expectedOutput) {
+        final String routeUrl = "/sql/route/" + routeId + "/";
+
+        RestAssured.given()
+                .get(routeUrl + operation)
+                .then().statusCode(204);
+
+        if (expectedOutput == null) {
+            return;
+        }
+
+        await().atMost(5, TimeUnit.SECONDS).until(() -> RestAssured
+                .get(routeUrl + "status")
+                .then()
+                .extract().asString(), equalTo(expectedOutput));
+    }
}