This is an automated email from the ASF dual-hosted git repository. jamesnetherton pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel-quarkus.git
commit 81352df837494af5baa6d92a1ea7fe45981edff9 Author: James Netherton <[email protected]> AuthorDate: Fri Feb 4 14:04:27 2022 +0000 Improve SQL extension docs for native mode aggregation repository suppport --- .../ROOT/pages/reference/extensions/sql.adoc | 9 +- .../sql/runtime/src/main/doc/configuration.adoc | 9 +- .../cassandraql/it/CassandraqlRoutes.java | 119 +++++++++++++++++++++ 3 files changed, 125 insertions(+), 12 deletions(-) diff --git a/docs/modules/ROOT/pages/reference/extensions/sql.adoc b/docs/modules/ROOT/pages/reference/extensions/sql.adoc index 7a4e525..675ee4d 100644 --- a/docs/modules/ROOT/pages/reference/extensions/sql.adoc +++ b/docs/modules/ROOT/pages/reference/extensions/sql.adoc @@ -79,12 +79,9 @@ When configuring `sql` or `sql-stored` endpoints to reference script files from quarkus.native.resources.includes = queries.sql, sql/*.sql ---- -=== SQL Aggregator - -If your exchanges in native mode contain objects, which are not automatically registered for serialization (see xref:extensions/core.adoc#quarkus.camel.native.reflection.serialization-enabled[documentation]), -you have to register them manually (see xref:extensions/core.adoc#quarkus.camel.native.reflection.serialization-enabled[documentation]) - - +=== SQL aggregation repository in native mode +In order to use SQL aggregation repositories like `JdbcAggregationRepository` in native mode, you must xref:extensions/core.adoc#quarkus.camel.native.reflection.serialization-enabled[enable native serialization support]. +In addition, if your exchange bodies are custom types, they must be registered for serialization by annotating their class declaration with `@RegisterForReflection(serialization = true)`. diff --git a/extensions/sql/runtime/src/main/doc/configuration.adoc b/extensions/sql/runtime/src/main/doc/configuration.adoc index bb17fe6..ae69ede 100644 --- a/extensions/sql/runtime/src/main/doc/configuration.adoc +++ b/extensions/sql/runtime/src/main/doc/configuration.adoc @@ -27,11 +27,8 @@ When configuring `sql` or `sql-stored` endpoints to reference script files from quarkus.native.resources.includes = queries.sql, sql/*.sql ---- -=== SQL Aggregator - -If your exchanges in native mode contain objects, which are not automatically registered for serialization (see xref:extensions/core.adoc#quarkus.camel.native.reflection.serialization-enabled[documentation]), -you have to register them manually (see xref:extensions/core.adoc#quarkus.camel.native.reflection.serialization-enabled[documentation]) - - +=== SQL aggregation repository in native mode +In order to use SQL aggregation repositories like `JdbcAggregationRepository` in native mode, you must xref:extensions/core.adoc#quarkus.camel.native.reflection.serialization-enabled[enable native serialization support]. +In addition, if your exchange bodies are custom types, they must be registered for serialization by annotating their class declaration with `@RegisterForReflection(serialization = true)`. diff --git a/integration-tests/cassandraql/src/main/java/org/apache/camel/quarkus/component/cassandraql/it/CassandraqlRoutes.java b/integration-tests/cassandraql/src/main/java/org/apache/camel/quarkus/component/cassandraql/it/CassandraqlRoutes.java new file mode 100644 index 0000000..d12714e --- /dev/null +++ b/integration-tests/cassandraql/src/main/java/org/apache/camel/quarkus/component/cassandraql/it/CassandraqlRoutes.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.quarkus.component.cassandraql.it; + +import java.net.InetSocketAddress; + +import javax.enterprise.context.ApplicationScoped; +import javax.inject.Inject; +import javax.inject.Named; + +import com.datastax.oss.driver.api.core.CqlSession; +import com.datastax.oss.driver.api.core.CqlSessionBuilder; +import com.datastax.oss.quarkus.runtime.api.session.QuarkusCqlSession; + +import org.apache.camel.AggregationStrategy; +import org.apache.camel.BindToRegistry; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.processor.aggregate.cassandra.CassandraAggregationRepository; +import org.apache.camel.processor.aggregate.cassandra.NamedCassandraAggregationRepository; +import org.apache.camel.processor.idempotent.cassandra.NamedCassandraIdempotentRepository; +import org.apache.camel.spi.AggregationRepository; +import org.eclipse.microprofile.config.inject.ConfigProperty; + +@ApplicationScoped +public class CassandraRoutes extends RouteBuilder { + public static final String KEYSPACE = "test"; + + @ConfigProperty(name = "quarkus.cassandra.contact-points") + String dbUrl; + + @Inject + @BindToRegistry("quarkusCqlSession") + QuarkusCqlSession session; + + @Override + public void configure() throws Exception { + from("direct:create") + .toF("cql://%s/%s?cql=INSERT INTO employee (id, name, address) VALUES (?, ?, ?)", dbUrl, KEYSPACE); + + from("direct:createIdempotent") + .idempotentConsumer(simple("${body[0]}"), new NamedCassandraIdempotentRepository(session, "ID")) + .toF("cql://%s/%s?cql=INSERT INTO employee (id, name, address) VALUES (?, ?, ?)", dbUrl, KEYSPACE); + + from("direct:createCustomSession") + .to("cql:bean:customCqlSession?cql=INSERT INTO employee (id, name, address) VALUES (?, ?, ?)"); + + from("direct:createQuarkusSession") + .to("cql:bean:quarkusCqlSession?cql=INSERT INTO employee (id, name, address) VALUES (?, ?, ?)"); + + from("direct:read") + .toF("cql://%s/%s?cql=SELECT * FROM employee WHERE id = ?", dbUrl, KEYSPACE); + + from("direct:update") + .toF("cql://%s/%s?cql=UPDATE employee SET name = ?, address = ? WHERE id = ?", dbUrl, KEYSPACE); + + from("direct:delete") + .toF("cql://%s/%s?cql=DELETE FROM employee WHERE id = ?", dbUrl, KEYSPACE); + + fromF("cql://%s/%s?repeatCount=1&cql=SELECT * FROM employee", dbUrl, KEYSPACE).id("employee-consumer").autoStartup(false) + .to("seda:employees"); + + from("direct:aggregate") + .aggregate(simple("${body.id}"), createAggregationStrategy()) + .completionSize(3) + .completionTimeout(5000) + .aggregationRepository(createAggregationRepository()) + .to("seda:employees"); + } + + @Named("customCqlSession") + CqlSession customCqlSession() { + String[] urlParts = dbUrl.split(":"); + CqlSessionBuilder sessionBuilder = CqlSession.builder(); + sessionBuilder.addContactPoint(new InetSocketAddress(urlParts[0], Integer.parseInt(urlParts[1]))); + sessionBuilder.withLocalDatacenter("datacenter1"); + sessionBuilder.withKeyspace(KEYSPACE); + return sessionBuilder.build(); + } + + private AggregationStrategy createAggregationStrategy() { + return (oldExchange, newExchange) -> { + if (oldExchange == null) { + return newExchange; + } + Employee newBody = newExchange.getMessage().getBody(Employee.class); + Object oldBody = oldExchange.getMessage().getBody(); + + String newName = null; + if (oldBody instanceof Employee) { + newName = ((Employee) oldBody).getName() + "," + newBody.getName(); + } else if (oldBody instanceof String) { + newName = oldBody + "," + newBody.getName(); + } + + oldExchange.getMessage().setBody(newName); + return oldExchange; + }; + } + + private AggregationRepository createAggregationRepository() { + CassandraAggregationRepository repository = new NamedCassandraAggregationRepository(); + repository.setSession(session); + return repository; + } +}
