This is an automated email from the ASF dual-hosted git repository.
ppalaga pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel-quarkus.git
The following commit(s) were added to refs/heads/main by this push:
new cb05578 Use quarkus-grpc-common instead of quarkus-grpc in the gRPC
extension
cb05578 is described below
commit cb05578bdb99db11b90a8b55eaadb64f062676d0
Author: James Netherton <[email protected]>
AuthorDate: Mon Aug 2 15:10:12 2021 +0100
Use quarkus-grpc-common instead of quarkus-grpc in the gRPC extension
Fixes #2954
---
docs/modules/ROOT/pages/migration-guide/2.2.0.adoc | 10 +
docs/modules/ROOT/pages/migration-guide/index.adoc | 1 +
.../ROOT/pages/reference/extensions/grpc.adoc | 37 ---
extensions/grpc/deployment/pom.xml | 2 +-
.../component/grpc/deployment/GrpcProcessor.java | 38 +--
extensions/grpc/runtime/pom.xml | 8 +-
.../grpc/runtime/src/main/doc/configuration.adoc | 33 ---
.../quarkus/grpc/runtime/CamelGrpcRecorder.java | 174 ++++++++++++--
.../runtime/QuarkusBindableServiceFactory.java | 5 +-
...stitutions.java => CamelGrpcSubstitutions.java} | 27 ++-
integration-tests/grpc/README.adoc | 38 +++
integration-tests/grpc/pom.xml | 110 ++++++++-
.../quarkus/component/grpc/it/GrpcResource.java | 112 ++++++++-
.../camel/quarkus/component/grpc/it/GrpcRoute.java | 97 ++++++--
.../grpc/src/main/resources/application.properties | 18 ++
.../grpc/src/main/resources/certs/ca-openssl.conf | 18 ++
.../grpc/src/main/resources/certs/ca.key | 27 +++
.../grpc/src/main/resources/certs/ca.pem | 20 ++
.../grpc/src/main/resources/certs/client.key | 28 +++
.../grpc/src/main/resources/certs/client.pem | 20 ++
.../grpc/src/main/resources/certs/server.key | 28 +++
.../grpc/src/main/resources/certs/server.pem | 20 ++
.../component/grpc/it/GrpcServerTestResource.java | 22 +-
.../camel/quarkus/component/grpc/it/GrpcTest.java | 266 ++++++++++++++++++++-
pom.xml | 6 +-
poms/build-parent/pom.xml | 10 +
26 files changed, 1015 insertions(+), 160 deletions(-)
diff --git a/docs/modules/ROOT/pages/migration-guide/2.2.0.adoc
b/docs/modules/ROOT/pages/migration-guide/2.2.0.adoc
new file mode 100644
index 0000000..16a8339
--- /dev/null
+++ b/docs/modules/ROOT/pages/migration-guide/2.2.0.adoc
@@ -0,0 +1,10 @@
+= Camel Quarkus 2.2.0 Migration Guide
+
+The following guide outlines how to adapt your code to changes that were made
in Camel Quarkus 2.2.0 & Quarkus 2.2.0.Final.
+
+== camel-quarkus-grpc consumers no longer depend on Quarkus gRPC server configuration
+
+In previous releases, many of the `camel-grpc` consumer configuration options
were ignored as the gRPC server configuration was
+driven by the `quarkus-grpc` extension.
+
+This is no longer the case. All of the `camel-grpc` consumer configuration
options are respected.
diff --git a/docs/modules/ROOT/pages/migration-guide/index.adoc
b/docs/modules/ROOT/pages/migration-guide/index.adoc
index 8a3dacf..cca6a59 100644
--- a/docs/modules/ROOT/pages/migration-guide/index.adoc
+++ b/docs/modules/ROOT/pages/migration-guide/index.adoc
@@ -4,5 +4,6 @@ We do frequent releases, a release almost every month, and even
though we strive
Listed here are guides on how to migrate between major versions and anything
of significance to watch for when upgrading from minor versions.
+* xref:migration-guide/2.2.0.adoc[Camel Quarkus 2.1.0 to Camel Quarkus 2.2.0
migration guide]
* xref:migration-guide/2.1.0.adoc[Camel Quarkus 2.0.0 to Camel Quarkus 2.1.0
migration guide]
* xref:migration-guide/2.0.0.adoc[Camel Quarkus 1.x to Camel Quarkus 2.0.0
migration guide]
diff --git a/docs/modules/ROOT/pages/reference/extensions/grpc.adoc
b/docs/modules/ROOT/pages/reference/extensions/grpc.adoc
index 7cd0edb..7bff770 100644
--- a/docs/modules/ROOT/pages/reference/extensions/grpc.adoc
+++ b/docs/modules/ROOT/pages/reference/extensions/grpc.adoc
@@ -38,40 +38,3 @@ Or add the coordinates to your existing project:
----
Check the xref:user-guide/index.adoc[User guide] for more information about
writing Camel Quarkus applications.
-
-== Additional Camel Quarkus configuration
-
-This extension leverages
https://quarkus.io/guides/grpc-service-implementation[Quarkus gRPC]. The
configuration of the gRPC consumer is different than normal,
-since Quarkus manages the lifecycle of the gRPC server. This means that the
consumer endpoint host and port is driven by the configuration properties
`quarkus.grpc.server.host`
-and `quarkus.grpc.server.port` and thus the Camel gRPC endpoint configuration
for the host & port is effectively ignored. But, it's still good practice to
have the
-endpoint configuration host / port mirror the Quarkus gRPC host / port
property values to avoid confusion and ambiguity.
-
-The full list of Quarkus gRPC configuration options can be found at the
https://quarkus.io/guides/grpc-service-implementation#server-configuration[Quarkus
gRPC guide].
-
-Use the `generate-code` goal of `quarkus-maven-plugin` to generate Java
classes from your `*.proto`
-service and message definitions stored in the `src/main/proto` directory:
-
-[source,xml]
-----
-<build>
- <plugins>
- <plugin>
- <groupId>io.quarkus</groupId>
- <artifactId>quarkus-maven-plugin</artifactId>
- <executions>
- <execution>
- <goals>
- <goal>generate-code</goal>
- <goal>build</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
- </plugins>
-</build>
-----
-
-You may want to check the
https://github.com/apache/camel-quarkus/tree/main/integration-tests/grpc[integration
test]
-in our source tree as an example.
-
-
diff --git a/extensions/grpc/deployment/pom.xml
b/extensions/grpc/deployment/pom.xml
index 8dd1f14..f908952 100644
--- a/extensions/grpc/deployment/pom.xml
+++ b/extensions/grpc/deployment/pom.xml
@@ -32,7 +32,7 @@
<dependencies>
<dependency>
<groupId>io.quarkus</groupId>
- <artifactId>quarkus-grpc-deployment</artifactId>
+ <artifactId>quarkus-grpc-common-deployment</artifactId>
</dependency>
<dependency>
<groupId>org.apache.camel.quarkus</groupId>
diff --git
a/extensions/grpc/deployment/src/main/java/org/apache/camel/quarkus/component/grpc/deployment/GrpcProcessor.java
b/extensions/grpc/deployment/src/main/java/org/apache/camel/quarkus/component/grpc/deployment/GrpcProcessor.java
index 8c8ab1b..fa09738 100644
---
a/extensions/grpc/deployment/src/main/java/org/apache/camel/quarkus/component/grpc/deployment/GrpcProcessor.java
+++
b/extensions/grpc/deployment/src/main/java/org/apache/camel/quarkus/component/grpc/deployment/GrpcProcessor.java
@@ -18,16 +18,15 @@ package org.apache.camel.quarkus.component.grpc.deployment;
import java.lang.reflect.Modifier;
import java.util.Collection;
-import java.util.HashSet;
import java.util.List;
-import java.util.Set;
-import javax.inject.Singleton;
+import javax.enterprise.context.Dependent;
import io.grpc.BindableService;
import io.grpc.stub.AbstractAsyncStub;
import io.grpc.stub.AbstractBlockingStub;
import io.grpc.stub.AbstractFutureStub;
+import io.grpc.stub.AbstractStub;
import io.grpc.stub.StreamObserver;
import io.quarkus.arc.deployment.AdditionalBeanBuildItem;
import io.quarkus.arc.deployment.GeneratedBeanBuildItem;
@@ -44,8 +43,6 @@ import io.quarkus.gizmo.FieldCreator;
import io.quarkus.gizmo.MethodCreator;
import io.quarkus.gizmo.MethodDescriptor;
import io.quarkus.gizmo.ResultHandle;
-import io.quarkus.grpc.GrpcService;
-import io.quarkus.grpc.deployment.BindableServiceBuildItem;
import org.apache.camel.component.grpc.GrpcComponent;
import org.apache.camel.component.grpc.server.GrpcMethodHandler;
import org.apache.camel.quarkus.core.deployment.spi.CamelBeanBuildItem;
@@ -84,20 +81,7 @@ class GrpcProcessor {
.map(classInfo -> new ReflectiveClassBuildItem(true,
false, classInfo.name().toString()))
.forEach(reflectiveClass::produce);
}
- }
-
- @BuildStep
- void createBindableServiceBeans(
- BuildProducer<GeneratedBeanBuildItem> generatedBean,
- BuildProducer<ReflectiveClassBuildItem> reflectiveClass,
- BuildProducer<BindableServiceBuildItem> bindableService,
- CombinedIndexBuildItem combinedIndexBuildItem) {
-
- Set<String> services = generateBindableServiceBeans(generatedBean,
reflectiveClass, combinedIndexBuildItem.getIndex());
- services.stream()
- .map(DotName::createSimple)
- .map(BindableServiceBuildItem::new)
- .forEach(bindableService::produce);
+ reflectiveClass.produce(new ReflectiveClassBuildItem(true, false,
AbstractStub.class.getName()));
}
@BuildStep
@@ -114,9 +98,13 @@ class GrpcProcessor {
recorder.createGrpcComponent());
}
- private Set<String>
generateBindableServiceBeans(BuildProducer<GeneratedBeanBuildItem>
generatedBean,
- BuildProducer<ReflectiveClassBuildItem> reflectiveClass, IndexView
index) {
- Set<String> generatedBindableServiceClassNames = new HashSet<>();
+ @BuildStep
+ void createBindableServiceBeans(
+ BuildProducer<GeneratedBeanBuildItem> generatedBean,
+ BuildProducer<ReflectiveClassBuildItem> reflectiveClass,
+ CombinedIndexBuildItem combinedIndexBuildItem) {
+
+ IndexView index = combinedIndexBuildItem.getIndex();
Collection<ClassInfo> bindableServiceImpls =
index.getAllKnownImplementors(BINDABLE_SERVICE_DOT_NAME);
// Generate implementation classes from any abstract gRPC
BindableService implementations included in the application archive
@@ -136,7 +124,6 @@ class GrpcProcessor {
String superClassName = service.name().toString();
String generatedClassName = superClassName +
"QuarkusMethodHandler";
- generatedBindableServiceClassNames.add(generatedClassName);
// Register the service classes for reflection
reflectiveClass.produce(new ReflectiveClassBuildItem(true, false,
service.name().toString()));
@@ -150,8 +137,7 @@ class GrpcProcessor {
.interfaces(CamelQuarkusBindableService.class)
.build()) {
- classCreator.addAnnotation(GrpcService.class);
- classCreator.addAnnotation(Singleton.class);
+ classCreator.addAnnotation(Dependent.class);
FieldCreator serverMethodHandler = classCreator
.getFieldCreator("methodHandler",
GrpcMethodHandler.class.getName())
@@ -213,8 +199,6 @@ class GrpcProcessor {
}
}
}
-
- return generatedBindableServiceClassNames;
}
private boolean isCandidateServiceMethod(MethodInfo method) {
diff --git a/extensions/grpc/runtime/pom.xml b/extensions/grpc/runtime/pom.xml
index 1399aad..a1af9da 100644
--- a/extensions/grpc/runtime/pom.xml
+++ b/extensions/grpc/runtime/pom.xml
@@ -49,7 +49,13 @@
<dependencies>
<dependency>
<groupId>io.quarkus</groupId>
- <artifactId>quarkus-grpc</artifactId>
+ <artifactId>quarkus-grpc-common</artifactId>
+ <exclusions>
+ <exclusion>
+ <groupId>io.vertx</groupId>
+ <artifactId>vertx-grpc</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
<groupId>org.apache.camel.quarkus</groupId>
diff --git a/extensions/grpc/runtime/src/main/doc/configuration.adoc
b/extensions/grpc/runtime/src/main/doc/configuration.adoc
deleted file mode 100644
index 9b415bf..0000000
--- a/extensions/grpc/runtime/src/main/doc/configuration.adoc
+++ /dev/null
@@ -1,33 +0,0 @@
-This extension leverages
https://quarkus.io/guides/grpc-service-implementation[Quarkus gRPC]. The
configuration of the gRPC consumer is different than normal,
-since Quarkus manages the lifecycle of the gRPC server. This means that the
consumer endpoint host and port is driven by the configuration properties
`quarkus.grpc.server.host`
-and `quarkus.grpc.server.port` and thus the Camel gRPC endpoint configuration
for the host & port is effectively ignored. But, it's still good practice to
have the
-endpoint configuration host / port mirror the Quarkus gRPC host / port
property values to avoid confusion and ambiguity.
-
-The full list of Quarkus gRPC configuration options can be found at the
https://quarkus.io/guides/grpc-service-implementation#server-configuration[Quarkus
gRPC guide].
-
-Use the `generate-code` goal of `quarkus-maven-plugin` to generate Java
classes from your `*.proto`
-service and message definitions stored in the `src/main/proto` directory:
-
-[source,xml]
-----
-<build>
- <plugins>
- <plugin>
- <groupId>io.quarkus</groupId>
- <artifactId>quarkus-maven-plugin</artifactId>
- <executions>
- <execution>
- <goals>
- <goal>generate-code</goal>
- <goal>build</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
- </plugins>
-</build>
-----
-
-You may want to check the
https://github.com/apache/camel-quarkus/tree/main/integration-tests/grpc[integration
test]
-in our source tree as an example.
-
diff --git
a/extensions/grpc/runtime/src/main/java/org/apache/camel/quarkus/grpc/runtime/CamelGrpcRecorder.java
b/extensions/grpc/runtime/src/main/java/org/apache/camel/quarkus/grpc/runtime/CamelGrpcRecorder.java
index ba13200..79b139a 100644
---
a/extensions/grpc/runtime/src/main/java/org/apache/camel/quarkus/grpc/runtime/CamelGrpcRecorder.java
+++
b/extensions/grpc/runtime/src/main/java/org/apache/camel/quarkus/grpc/runtime/CamelGrpcRecorder.java
@@ -18,21 +18,42 @@ package org.apache.camel.quarkus.grpc.runtime;
import java.util.Map;
+import com.google.auth.Credentials;
+import com.google.auth.oauth2.GoogleCredentials;
+import io.grpc.CallCredentials;
+import io.grpc.ManagedChannel;
+import io.grpc.auth.MoreCallCredentials;
+import io.grpc.netty.GrpcSslContexts;
+import io.grpc.netty.NegotiationType;
+import io.grpc.netty.NettyChannelBuilder;
+import io.grpc.stub.StreamObserver;
+import io.netty.handler.ssl.SslContextBuilder;
+import io.netty.handler.ssl.SslProvider;
import io.quarkus.runtime.RuntimeValue;
import io.quarkus.runtime.annotations.Recorder;
-import org.apache.camel.Consumer;
+import org.apache.camel.AsyncCallback;
import org.apache.camel.Endpoint;
-import org.apache.camel.Processor;
+import org.apache.camel.Exchange;
+import org.apache.camel.Producer;
+import org.apache.camel.component.grpc.GrpcAuthType;
import org.apache.camel.component.grpc.GrpcComponent;
import org.apache.camel.component.grpc.GrpcConfiguration;
-import org.apache.camel.component.grpc.GrpcConsumer;
import org.apache.camel.component.grpc.GrpcEndpoint;
-import org.apache.camel.component.grpc.server.BindableServiceFactory;
+import org.apache.camel.component.grpc.GrpcProducer;
+import org.apache.camel.component.grpc.GrpcUtils;
+import org.apache.camel.component.grpc.auth.jwt.JwtCallCredentials;
+import org.apache.camel.component.grpc.auth.jwt.JwtHelper;
+import org.apache.camel.component.grpc.client.GrpcExchangeForwarder;
+import org.apache.camel.component.grpc.client.GrpcExchangeForwarderFactory;
+import org.apache.camel.component.grpc.client.GrpcResponseAggregationStreamObserver;
+import org.apache.camel.component.grpc.client.GrpcResponseRouterStreamObserver;
import org.apache.camel.spi.annotations.Component;
-import org.apache.camel.support.CamelContextHelper;
+import org.apache.camel.support.ResourceHelper;
+import org.apache.camel.support.SynchronousDelegateProducer;
import org.apache.camel.support.service.ServiceHelper;
-
-import static org.apache.camel.component.grpc.GrpcConstants.GRPC_BINDABLE_SERVICE_FACTORY_NAME;
+import org.apache.camel.util.ObjectHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
@Recorder
public class CamelGrpcRecorder {
@@ -61,25 +82,142 @@ public class CamelGrpcRecorder {
super(uri, component, config);
}
- @Override
- public Consumer createConsumer(Processor processor) throws Exception {
- return new QuarkusGrpcConsumer(this, processor, configuration);
+ public Producer createProducer() throws Exception {
+ GrpcProducer producer = new QuarkusGrpcProducer(this,
this.configuration);
+ return this.configuration.isSynchronous() ? new
SynchronousDelegateProducer(producer) : producer;
}
}
- static final class QuarkusGrpcConsumer extends GrpcConsumer {
+ // Allow producer SSL configuration to do fallback when unsupported native providers are specified
+ // Most of GrpcProducer is reproduced due to much of the configuration being done by directly accessing private fields
+ // TODO: Remove when https://github.com/apache/camel-quarkus/issues/2966 is resolved
+ static final class QuarkusGrpcProducer extends GrpcProducer {
- public QuarkusGrpcConsumer(GrpcEndpoint endpoint, Processor processor,
GrpcConfiguration configuration) {
- super(endpoint, processor, configuration);
+ private static final Logger LOG =
LoggerFactory.getLogger(QuarkusGrpcProducer.class);
+ private ManagedChannel channel;
+ private Object grpcStub;
+ private GrpcExchangeForwarder forwarder;
+ private GrpcResponseRouterStreamObserver globalResponseObserver;
+
+ public QuarkusGrpcProducer(GrpcEndpoint endpoint, GrpcConfiguration
configuration) {
+ super(endpoint, configuration);
}
@Override
protected void doStart() throws Exception {
- // Quarkus gRPC extension handles server startup so we only need
to configure the BindableService for this consumer endpoint
- ServiceHelper.startService(getProcessor());
- BindableServiceFactory bindableServiceFactory =
CamelContextHelper.lookup(endpoint.getCamelContext(),
- GRPC_BINDABLE_SERVICE_FACTORY_NAME,
BindableServiceFactory.class);
- bindableServiceFactory.createBindableService(this);
+ if (channel == null) {
+ CallCredentials callCreds = null;
+ initializeChannel();
+
+ if (configuration.getAuthenticationType() ==
GrpcAuthType.GOOGLE) {
+ ObjectHelper.notNull(configuration.getServiceAccountResource(), "serviceAccountResource");
+
+ Credentials creds = GoogleCredentials.fromStream(
+
ResourceHelper.resolveResourceAsInputStream(endpoint.getCamelContext(),
+
configuration.getServiceAccountResource()));
+ callCreds = MoreCallCredentials.from(creds);
+ } else if (configuration.getAuthenticationType() ==
GrpcAuthType.JWT) {
+ ObjectHelper.notNull(configuration.getJwtSecret(),
"jwtSecret");
+
+ String jwtToken =
JwtHelper.createJwtToken(configuration.getJwtAlgorithm(),
configuration.getJwtSecret(),
+ configuration.getJwtIssuer(),
configuration.getJwtSubject());
+ callCreds = new JwtCallCredentials(jwtToken);
+ }
+
+ if (configuration.isSynchronous()) {
+ LOG.debug("Getting synchronous method stub from channel");
+ grpcStub =
GrpcUtils.constructGrpcBlockingStub(endpoint.getServicePackage(),
endpoint.getServiceName(),
+ channel,
+ callCreds, endpoint.getCamelContext());
+ } else {
+ LOG.debug("Getting asynchronous method stub from channel");
+ grpcStub =
GrpcUtils.constructGrpcAsyncStub(endpoint.getServicePackage(),
endpoint.getServiceName(),
+ channel,
+ callCreds, endpoint.getCamelContext());
+ }
+ forwarder =
GrpcExchangeForwarderFactory.createExchangeForwarder(configuration, grpcStub);
+
+ if (configuration.getStreamRepliesTo() != null) {
+ this.globalResponseObserver = new
GrpcResponseRouterStreamObserver(configuration, getEndpoint());
+ }
+
+ if (this.globalResponseObserver != null) {
+ ServiceHelper.startService(this.globalResponseObserver);
+ }
+ }
+ }
+
+ @Override
+ protected void doStop() throws Exception {
+ if (this.globalResponseObserver != null) {
+ ServiceHelper.stopService(this.globalResponseObserver);
+ }
+ if (channel != null) {
+ forwarder.shutdown();
+ forwarder = null;
+
+ LOG.debug("Terminating channel to the remote gRPC server");
+ channel.shutdown().shutdownNow();
+ channel = null;
+ grpcStub = null;
+ globalResponseObserver = null;
+ }
+ super.doStop();
+ }
+
+ @Override
+ protected void initializeChannel() throws Exception {
+ NettyChannelBuilder channelBuilder;
+
+ if (!ObjectHelper.isEmpty(configuration.getHost()) &&
!ObjectHelper.isEmpty(configuration.getPort())) {
+ LOG.info("Creating channel to the remote gRPC server {}:{}",
configuration.getHost(), configuration.getPort());
+ channelBuilder =
NettyChannelBuilder.forAddress(configuration.getHost(),
configuration.getPort());
+ } else {
+ throw new IllegalArgumentException("No connection properties
(host or port) specified");
+ }
+ if (configuration.getNegotiationType() == NegotiationType.TLS) {
+ ObjectHelper.notNull(configuration.getKeyCertChainResource(),
"keyCertChainResource");
+ ObjectHelper.notNull(configuration.getKeyResource(),
"keyResource");
+
+ SslContextBuilder sslContextBuilder =
GrpcSslContexts.forClient()
+ .sslProvider(SslProvider.OPENSSL)
+ .keyManager(
+
ResourceHelper.resolveResourceAsInputStream(endpoint.getCamelContext(),
+
configuration.getKeyCertChainResource()),
+
ResourceHelper.resolveResourceAsInputStream(endpoint.getCamelContext(),
+ configuration.getKeyResource()),
+ configuration.getKeyPassword());
+
+ if
(ObjectHelper.isNotEmpty(configuration.getTrustCertCollectionResource())) {
+ sslContextBuilder = sslContextBuilder
+
.trustManager(ResourceHelper.resolveResourceAsInputStream(endpoint.getCamelContext(),
+
configuration.getTrustCertCollectionResource()));
+ }
+
+ channelBuilder =
channelBuilder.sslContext(GrpcSslContexts.configure(sslContextBuilder).build());
+ }
+
+ channel =
channelBuilder.negotiationType(configuration.getNegotiationType())
+ .flowControlWindow(configuration.getFlowControlWindow())
+ .userAgent(configuration.getUserAgent())
+ .maxInboundMessageSize(configuration.getMaxMessageSize())
+ .intercept(configuration.getClientInterceptors())
+ .build();
+ }
+
+ @Override
+ public boolean process(Exchange exchange, AsyncCallback callback) {
+ StreamObserver<Object> streamObserver =
this.globalResponseObserver;
+ if (globalResponseObserver == null) {
+ streamObserver = new
GrpcResponseAggregationStreamObserver(exchange, callback);
+ }
+
+ return forwarder.forward(exchange, streamObserver, callback);
+ }
+
+ @Override
+ public void process(Exchange exchange) throws Exception {
+ forwarder.forward(exchange);
}
}
}
diff --git
a/extensions/grpc/runtime/src/main/java/org/apache/camel/quarkus/grpc/runtime/QuarkusBindableServiceFactory.java
b/extensions/grpc/runtime/src/main/java/org/apache/camel/quarkus/grpc/runtime/QuarkusBindableServiceFactory.java
index c9f8b20..a28aabc 100644
---
a/extensions/grpc/runtime/src/main/java/org/apache/camel/quarkus/grpc/runtime/QuarkusBindableServiceFactory.java
+++
b/extensions/grpc/runtime/src/main/java/org/apache/camel/quarkus/grpc/runtime/QuarkusBindableServiceFactory.java
@@ -22,7 +22,6 @@ import javax.inject.Named;
import javax.inject.Singleton;
import io.grpc.BindableService;
-import io.quarkus.grpc.GrpcService;
import org.apache.camel.CamelContext;
import org.apache.camel.component.grpc.GrpcConsumer;
import org.apache.camel.component.grpc.GrpcEndpoint;
@@ -41,7 +40,6 @@ import static
org.apache.camel.component.grpc.GrpcConstants.GRPC_BINDABLE_SERVIC
public class QuarkusBindableServiceFactory implements BindableServiceFactory {
@Inject
- @GrpcService
Instance<CamelQuarkusBindableService> bindableServices;
@Override
@@ -58,7 +56,8 @@ public class QuarkusBindableServiceFactory implements
BindableServiceFactory {
.findFirst()
.orElseThrow(() -> new IllegalStateException(
"Unable to find generated class for service " +
endpoint.getServiceName()));
- bindableService.setMethodHandler(new GrpcMethodHandler(consumer));
+ GrpcMethodHandler methodHandler = new GrpcMethodHandler(consumer);
+ bindableService.setMethodHandler(methodHandler);
return bindableService;
}
}
diff --git
a/extensions/grpc/runtime/src/main/java/org/apache/camel/quarkus/grpc/runtime/graal/GrpcSubstitutions.java
b/extensions/grpc/runtime/src/main/java/org/apache/camel/quarkus/grpc/runtime/graal/CamelGrpcSubstitutions.java
similarity index 59%
rename from
extensions/grpc/runtime/src/main/java/org/apache/camel/quarkus/grpc/runtime/graal/GrpcSubstitutions.java
rename to
extensions/grpc/runtime/src/main/java/org/apache/camel/quarkus/grpc/runtime/graal/CamelGrpcSubstitutions.java
index 9ca3e9f..289bec7 100644
---
a/extensions/grpc/runtime/src/main/java/org/apache/camel/quarkus/grpc/runtime/graal/GrpcSubstitutions.java
+++
b/extensions/grpc/runtime/src/main/java/org/apache/camel/quarkus/grpc/runtime/graal/CamelGrpcSubstitutions.java
@@ -16,21 +16,40 @@
*/
package org.apache.camel.quarkus.grpc.runtime.graal;
+import com.oracle.svm.core.annotate.Alias;
import com.oracle.svm.core.annotate.Substitute;
import com.oracle.svm.core.annotate.TargetClass;
+import org.apache.camel.CamelContext;
import org.apache.camel.component.grpc.GrpcConsumer;
+import org.apache.camel.component.grpc.GrpcEndpoint;
import org.apache.camel.component.grpc.server.BindableServiceFactory;
+import org.apache.camel.support.CamelContextHelper;
-final class GrpcSubstitutions {
+import static
org.apache.camel.component.grpc.GrpcConstants.GRPC_BINDABLE_SERVICE_FACTORY_NAME;
+
+final class CamelGrpcSubstitutions {
}
@TargetClass(GrpcConsumer.class)
final class SubstituteGrpcConsumer {
+ @Alias
+ protected GrpcEndpoint endpoint;
+
+ @Alias
+ private BindableServiceFactory factory;
+
@Substitute
private BindableServiceFactory getBindableServiceFactory() {
- // Remove unwanted references to javassist.
- // This is effectively replaced by the BindableServiceFactory lookup
in QuarkusGrpcConsumer.doStart()
- return null;
+ // Remove unwanted references to javassist
+ CamelContext context = endpoint.getCamelContext();
+ if (this.factory == null) {
+ BindableServiceFactory bindableServiceFactory =
CamelContextHelper.lookup(context,
+ GRPC_BINDABLE_SERVICE_FACTORY_NAME,
BindableServiceFactory.class);
+ if (bindableServiceFactory != null) {
+ this.factory = bindableServiceFactory;
+ }
+ }
+ return this.factory;
}
}
diff --git a/integration-tests/grpc/README.adoc
b/integration-tests/grpc/README.adoc
new file mode 100644
index 0000000..aed6f15
--- /dev/null
+++ b/integration-tests/grpc/README.adoc
@@ -0,0 +1,38 @@
+# gRPC test client & server certificate generation
+
+The certificates used by the client / server for the security integration
tests can be generated from within the src/main/resources/certs directory as
follows.
+
+Create the certificate authority.
+
+[source,shell]
+----
+openssl genrsa -out ca.key 2048
+openssl req -x509 -new -key ca.key -nodes -out ca.pem -days 3650 -config
ca-openssl.conf -extensions v3_req
+----
+
+Create the client certificate. When prompted for the 'common name' use
localhost as the value. It's safe to accept the defaults for the other options.
+
+[source,shell]
+----
+openssl genrsa -out client.key.rsa 2048
+openssl pkcs8 -topk8 -in client.key.rsa -out client.key -nocrypt
+openssl req -new -key client.key -out client.csr
+openssl x509 -req -in client.csr -CA ca.pem -CAkey ca.key -CAcreateserial -out
client.pem -outform PEM -days 5000
+----
+
+Create the server certificate. When prompted for the 'common name' use
localhost as the value. It's safe to accept the defaults for the other options.
+
+[source,shell]
+----
+openssl genrsa -out server.key.rsa 2048
+openssl pkcs8 -topk8 -in server.key.rsa -out server.key -nocrypt
+openssl req -new -key server.key -out server.csr
+openssl x509 -req -in server.csr -CA ca.pem -CAkey ca.key -CAcreateserial -out
server.pem -outform PEM -days 5000
+----
+
+Clean up.
+
+[source,shell]
+----
+rm -f *.rsa *.csr *.srl
+----
diff --git a/integration-tests/grpc/pom.xml b/integration-tests/grpc/pom.xml
index 718d7d7..1ef33e2 100644
--- a/integration-tests/grpc/pom.xml
+++ b/integration-tests/grpc/pom.xml
@@ -31,12 +31,20 @@
<dependencies>
<dependency>
+ <groupId>org.apache.camel</groupId>
+ <artifactId>camel-bean</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.camel.quarkus</groupId>
+ <artifactId>camel-quarkus-direct</artifactId>
+ </dependency>
+ <dependency>
<groupId>org.apache.camel.quarkus</groupId>
<artifactId>camel-quarkus-grpc</artifactId>
</dependency>
<dependency>
<groupId>org.apache.camel.quarkus</groupId>
- <artifactId>camel-quarkus-direct</artifactId>
+ <artifactId>camel-quarkus-mock</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
@@ -63,6 +71,19 @@
<!-- The following dependencies guarantee that this module is built
after them. You can update them by running `mvn process-resources -Pformat -N`
from the source tree root directory -->
<dependency>
<groupId>org.apache.camel.quarkus</groupId>
+ <artifactId>camel-quarkus-bean-deployment</artifactId>
+ <version>${project.version}</version>
+ <type>pom</type>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>*</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.camel.quarkus</groupId>
<artifactId>camel-quarkus-direct-deployment</artifactId>
<version>${project.version}</version>
<type>pom</type>
@@ -87,20 +108,99 @@
</exclusion>
</exclusions>
</dependency>
+ <dependency>
+ <groupId>org.apache.camel.quarkus</groupId>
+ <artifactId>camel-quarkus-mock-deployment</artifactId>
+ <version>${project.version}</version>
+ <type>pom</type>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>*</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
</dependencies>
<build>
<plugins>
<plugin>
- <groupId>io.quarkus</groupId>
- <artifactId>quarkus-maven-plugin</artifactId>
+ <groupId>kr.motd.maven</groupId>
+ <artifactId>os-maven-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>detect-os</id>
+ <phase>initialize</phase>
+ <goals>
+ <goal>detect</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.codehaus.gmaven</groupId>
+ <artifactId>groovy-maven-plugin</artifactId>
+ <executions>
+ <execution>
+ <!-- Skip code generation & tests on unsupported platforms -->
+ <id>skip-plugins-on-unsupported-platform</id>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>execute</goal>
+ </goals>
+ <configuration>
+ <source>
+ project.properties['skipTests']=project.properties['os.detected.classifier'].matches('^.*?(linux|windows|osx)-x86.*$') ? 'false' : 'true'
+ </source>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.xolstice.maven.plugins</groupId>
+ <artifactId>protobuf-maven-plugin</artifactId>
+ <extensions>true</extensions>
<executions>
<execution>
- <id>quarkus-generate-code</id>
<goals>
- <goal>generate-code</goal>
+ <goal>compile</goal>
+ <goal>compile-custom</goal>
</goals>
<phase>generate-sources</phase>
+ <configuration>
+ <protocArtifact>com.google.protobuf:protoc:${protobuf.version}:exe:${os.detected.classifier}</protocArtifact>
+ <pluginId>grpc-java</pluginId>
+ <pluginArtifact>io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier}</pluginArtifact>
+ <checkStaleness>true</checkStaleness>
+ <skip>${skipTests}</skip>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>default-compile</id>
+ <phase>compile</phase>
+ <goals>
+ <goal>compile</goal>
+ </goals>
+ <configuration>
+ <skipMain>${skipTests}</skipMain>
+ </configuration>
+ </execution>
+ <execution>
+ <id>default-testCompile</id>
+ <phase>test-compile</phase>
+ <goals>
+ <goal>testCompile</goal>
+ </goals>
+ <configuration>
+ <skip>${skipTests}</skip>
+ </configuration>
</execution>
</executions>
</plugin>
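Note on the pom.xml hunk above: the Groovy expression sets `skipTests` to 'false' only when `os.detected.classifier` matches the pattern for x86 builds of Linux, Windows, or macOS, and to 'true' otherwise. The following is a minimal Java sketch of that check in isolation. The class name `PlatformSkipCheck` and the sample classifier strings are illustrative assumptions and are not part of the commit; the classifiers are assumed to follow the usual `name-arch` form.

```java
import java.util.List;

public class PlatformSkipCheck {
    // Same pattern as the Groovy expression in the pom.xml hunk.
    static final String SUPPORTED = "^.*?(linux|windows|osx)-x86.*$";

    // Mirrors the ternary: supported platforms yield "false", all others "true".
    static String skipTests(String classifier) {
        return classifier.matches(SUPPORTED) ? "false" : "true";
    }

    public static void main(String[] args) {
        // Hypothetical classifier values, used only to exercise the pattern.
        List<String> samples = List.of(
                "linux-x86_64", "osx-x86_64", "windows-x86_64",
                "linux-aarch_64", "osx-aarch_64");
        for (String classifier : samples) {
            System.out.println(classifier + " -> skipTests=" + skipTests(classifier));
        }
    }
}
```

Under these assumptions, any classifier without an `-x86` architecture suffix (for example an aarch64 build) would skip protobuf code generation, compilation, and tests in this module.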
diff --git
a/integration-tests/grpc/src/main/java/org/apache/camel/quarkus/component/grpc/it/GrpcResource.java
b/integration-tests/grpc/src/main/java/org/apache/camel/quarkus/component/grpc/it/GrpcResource.java
index 38a4baa..3b14996 100644
---
a/integration-tests/grpc/src/main/java/org/apache/camel/quarkus/component/grpc/it/GrpcResource.java
+++
b/integration-tests/grpc/src/main/java/org/apache/camel/quarkus/component/grpc/it/GrpcResource.java
@@ -16,19 +16,32 @@
*/
package org.apache.camel.quarkus.component.grpc.it;
+import java.util.List;
+
import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;
import javax.ws.rs.Consumes;
+import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.MediaType;
+import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
import org.apache.camel.ProducerTemplate;
+import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.quarkus.component.grpc.it.model.PingRequest;
import org.apache.camel.quarkus.component.grpc.it.model.PongResponse;
+import static org.apache.camel.component.grpc.GrpcConstants.GRPC_EVENT_TYPE_HEADER;
+import static org.apache.camel.component.grpc.GrpcConstants.GRPC_EVENT_TYPE_ON_COMPLETED;
+import static org.apache.camel.component.grpc.GrpcConstants.GRPC_EVENT_TYPE_ON_ERROR;
+import static org.apache.camel.component.grpc.GrpcConstants.GRPC_EVENT_TYPE_ON_NEXT;
+import static org.apache.camel.component.grpc.GrpcConstants.GRPC_METHOD_NAME_HEADER;
+import static org.apache.camel.quarkus.component.grpc.it.GrpcRoute.PING_PONG_SERVICE;
+
@Path("/grpc")
@ApplicationScoped
public class GrpcResource {
@@ -36,6 +49,9 @@ public class GrpcResource {
@Inject
ProducerTemplate producerTemplate;
+ @Inject
+ CamelContext context;
+
@Path("/producer")
@POST
@Consumes(MediaType.TEXT_PLAIN)
@@ -46,9 +62,103 @@ public class GrpcResource {
.setPingId(pingId)
.build();
final PongResponse response = producerTemplate.requestBody(
- "grpc://localhost:{{camel.grpc.test.server.port}}/org.apache.camel.quarkus.component.grpc.it.model.PingPong?method=pingSyncSync&synchronous=true",
+ "grpc://localhost:{{grpc.test.server.port}}/" + PING_PONG_SERVICE + "?method=pingSyncSync&synchronous=true",
pingRequest, PongResponse.class);
return response.getPongName();
}
+ @Path("/forwardOnCompleted")
+ @GET
+ public void forwardOnCompleted() throws InterruptedException {
+ MockEndpoint endpoint = context.getEndpoint("mock:forwardOnCompleted", MockEndpoint.class);
+ endpoint.expectedMessageCount(1);
+ endpoint.expectedHeaderValuesReceivedInAnyOrder(GRPC_EVENT_TYPE_HEADER, GRPC_EVENT_TYPE_ON_COMPLETED);
+ endpoint.expectedHeaderValuesReceivedInAnyOrder(GRPC_METHOD_NAME_HEADER, "pingAsyncAsync");
+ endpoint.assertIsSatisfied(5000L);
+ }
+
+ @Path("/forwardOnError")
+ @GET
+ public String forwardOnError() throws InterruptedException {
+ MockEndpoint endpoint = context.getEndpoint("mock:forwardOnError", MockEndpoint.class);
+ endpoint.expectedMessageCount(1);
+ endpoint.expectedHeaderValuesReceivedInAnyOrder(GRPC_EVENT_TYPE_HEADER, GRPC_EVENT_TYPE_ON_ERROR);
+ endpoint.expectedHeaderValuesReceivedInAnyOrder(GRPC_METHOD_NAME_HEADER, "pingAsyncAsync");
+ endpoint.assertIsSatisfied(5000L);
+
+ List<Exchange> exchanges = endpoint.getExchanges();
+ Exchange exchange = exchanges.get(0);
+ Throwable throwable = exchange.getMessage().getBody(Throwable.class);
+ return throwable.getClass().getName();
+ }
+
+ @Path("/grpcStreamReplies")
+ @GET
+ public void grpcStreamReplies() throws InterruptedException {
+ int messageCount = 10;
+ for (int i = 1; i <= messageCount; i++) {
+ PingRequest request = PingRequest.newBuilder().setPingName(String.valueOf(i)).build();
+ producerTemplate.sendBody("direct:grpcStream", request);
+ }
+
+ MockEndpoint endpoint = context.getEndpoint("mock:grpcStreamReplies", MockEndpoint.class);
+ endpoint.expectedMessageCount(messageCount);
+ endpoint.assertIsSatisfied();
+ }
+
+ @Path("/tls")
+ @GET
+ public void tlsConsumer() throws InterruptedException {
+ MockEndpoint mockEndpoint = context.getEndpoint("mock:tls", MockEndpoint.class);
+ mockEndpoint.expectedMessageCount(1);
+ mockEndpoint.expectedHeaderValuesReceivedInAnyOrder(GRPC_EVENT_TYPE_HEADER, GRPC_EVENT_TYPE_ON_NEXT);
+ mockEndpoint.expectedHeaderValuesReceivedInAnyOrder(GRPC_METHOD_NAME_HEADER, "pingAsyncSync");
+ mockEndpoint.assertIsSatisfied();
+ }
+
+ @Path("/tls")
+ @POST
+ @Produces(MediaType.TEXT_PLAIN)
+ public String tlsProducer(String message) {
+ MockEndpoint mockEndpoint = context.getEndpoint("mock:tls", MockEndpoint.class);
+ try {
+ PingRequest pingRequest = PingRequest.newBuilder()
+ .setPingName(message)
+ .setPingId(12345)
+ .build();
+
+ PongResponse response = producerTemplate.requestBody("direct:sendTls", pingRequest, PongResponse.class);
+ return response.getPongName();
+ } finally {
+ mockEndpoint.reset();
+ }
+ }
+
+ @Path("/jwt")
+ @GET
+ public void jwtConsumer() throws InterruptedException {
+ MockEndpoint mockEndpoint = context.getEndpoint("mock:jwt", MockEndpoint.class);
+ mockEndpoint.expectedMessageCount(1);
+ mockEndpoint.expectedHeaderValuesReceivedInAnyOrder(GRPC_EVENT_TYPE_HEADER, GRPC_EVENT_TYPE_ON_NEXT);
+ mockEndpoint.expectedHeaderValuesReceivedInAnyOrder(GRPC_METHOD_NAME_HEADER, "pingAsyncSync");
+ mockEndpoint.assertIsSatisfied();
+ }
+
+ @Path("/jwt")
+ @POST
+ @Produces(MediaType.TEXT_PLAIN)
+ public String jwtProducer(String message) {
+ MockEndpoint mockEndpoint = context.getEndpoint("mock:jwt", MockEndpoint.class);
+ try {
+ PingRequest pingRequest = PingRequest.newBuilder()
+ .setPingName(message)
+ .setPingId(12345)
+ .build();
+
+ PongResponse response = producerTemplate.requestBody("direct:sendJwt", pingRequest, PongResponse.class);
+ return response.getPongName();
+ } finally {
+ mockEndpoint.reset();
+ }
+ }
}
diff --git a/integration-tests/grpc/src/main/java/org/apache/camel/quarkus/component/grpc/it/GrpcRoute.java b/integration-tests/grpc/src/main/java/org/apache/camel/quarkus/component/grpc/it/GrpcRoute.java
index 23f31f6..c9b0838 100644
--- a/integration-tests/grpc/src/main/java/org/apache/camel/quarkus/component/grpc/it/GrpcRoute.java
+++ b/integration-tests/grpc/src/main/java/org/apache/camel/quarkus/component/grpc/it/GrpcRoute.java
@@ -16,33 +16,100 @@
*/
package org.apache.camel.quarkus.component.grpc.it;
-import java.util.Optional;
-
+import io.grpc.stub.StreamObserver;
+import io.quarkus.runtime.annotations.RegisterForReflection;
import org.apache.camel.Message;
import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.grpc.GrpcConstants;
import org.apache.camel.quarkus.component.grpc.it.model.PingRequest;
import org.apache.camel.quarkus.component.grpc.it.model.PongResponse;
-import org.eclipse.microprofile.config.Config;
-import org.eclipse.microprofile.config.ConfigProvider;
public class GrpcRoute extends RouteBuilder {
+ public static final String GRPC_JWT_SECRET = "camel-quarkus-grpc-secret";
+ public static final String PING_PONG_SERVICE = "org.apache.camel.quarkus.component.grpc.it.model.PingPong";
+
@Override
+ @SuppressWarnings("unchecked")
public void configure() throws Exception {
- fromF("grpc://localhost:%d/org.apache.camel.quarkus.component.grpc.it.model.PingPong?synchronous=true",
- getServerPort())
+ fromF("grpc://localhost:{{camel.grpc.test.server.port}}/%s?synchronous=true", PING_PONG_SERVICE)
+ .process(exchange -> {
+ final Message message = exchange.getMessage();
+ final PingRequest request = message.getBody(PingRequest.class);
+ final PongResponse response = PongResponse.newBuilder()
+ .setPongName(request.getPingName() + " PONG")
+ .setPongId(request.getPingId())
+ .build();
+ message.setBody(response);
+ });
+
+ fromF("grpc://localhost:{{camel.grpc.test.forward.completed.server.port}}/%s?consumerStrategy=PROPAGATION&forwardOnCompleted=true",
+ PING_PONG_SERVICE)
+ .to("mock:forwardOnCompleted");
+
+ fromF("grpc://localhost:{{camel.grpc.test.forward.error.server.port}}/%s?consumerStrategy=PROPAGATION&forwardOnError=true",
+ PING_PONG_SERVICE)
+ .filter().body(Throwable.class)
+ .to("mock:forwardOnError");
+
+ from("direct:grpcStream")
+ .toF("grpc://localhost:{{camel.grpc.test.server.port}}/%s?producerStrategy=STREAMING&streamRepliesTo=direct:grpcStreamReplies&method=pingAsyncAsync",
+ PING_PONG_SERVICE);
+
+ fromF("grpc://localhost:{{camel.grpc.test.route.controlled.server.port}}/%s?synchronous=true&consumerStrategy=PROPAGATION&routeControlledStreamObserver=true",
+ PING_PONG_SERVICE)
.process(exchange -> {
- final Message message = exchange.getMessage();
- final PingRequest request = message.getBody(PingRequest.class);
- final PongResponse response = PongResponse.newBuilder().setPongName(request.getPingName() + " PONG")
- .setPongId(request.getPingId()).build();
- message.setBody(response);
+ Message message = exchange.getMessage();
+ PingRequest pingRequest = message.getBody(PingRequest.class);
+
+ StreamObserver<Object> responseObserver = (StreamObserver<Object>) exchange
+ .getProperty(GrpcConstants.GRPC_RESPONSE_OBSERVER);
+ PongResponse pongResponse = PongResponse.newBuilder()
+ .setPongName(pingRequest.getPingName() + " PONG")
+ .setPongId(pingRequest.getPingId())
+ .build();
+
+ message.setBody(pongResponse, PongResponse.class);
+ exchange.setMessage(message);
+ responseObserver.onNext(pongResponse);
+ responseObserver.onCompleted();
});
+
+ fromF("grpc://localhost:{{camel.grpc.test.tls.server.port}}"
+ + "/%s?consumerStrategy=PROPAGATION&"
+ + "negotiationType=TLS&keyCertChainResource=certs/server.pem&"
+ + "keyResource=certs/server.key&trustCertCollectionResource=certs/ca.pem", PING_PONG_SERVICE)
+ .to("mock:tls")
+ .bean(new GrpcMessageBuilder(), "buildAsyncPongResponse");
+
+ from("direct:sendTls")
+ .toF("grpc://localhost:{{camel.grpc.test.tls.server.port}}"
+ + "/%s?method=pingSyncSync&synchronous=true&"
+ + "negotiationType=TLS&keyCertChainResource=certs/client.pem&"
+ + "keyResource=certs/client.key&trustCertCollectionResource=certs/ca.pem", PING_PONG_SERVICE);
+
+ fromF("grpc://localhost:{{camel.grpc.test.jwt.server.port}}"
+ + "/%s?consumerStrategy=PROPAGATION&"
+ + "authenticationType=JWT&jwtSecret=%s", PING_PONG_SERVICE, GRPC_JWT_SECRET)
+ .to("mock:jwt")
+ .bean(new GrpcMessageBuilder(), "buildAsyncPongResponse");
+
+ from("direct:sendJwt")
+ .toF("grpc://localhost:{{camel.grpc.test.jwt.server.port}}"
+ + "/%s?method=pingSyncSync&synchronous=true&"
+ + "authenticationType=JWT&jwtSecret=%s", PING_PONG_SERVICE, GRPC_JWT_SECRET);
+
+ from("direct:grpcStreamReplies")
+ .to("mock:grpcStreamReplies");
}
- public static int getServerPort() {
- Config config = ConfigProvider.getConfig();
- Optional<Integer> testServerPort = config.getOptionalValue("quarkus.grpc.server.test-port", Integer.class);
- return testServerPort.orElse(9000);
+ @RegisterForReflection(fields = false)
+ static final class GrpcMessageBuilder {
+ public PongResponse buildAsyncPongResponse(PingRequest pingRequests) {
+ return PongResponse.newBuilder()
+ .setPongName(pingRequests.getPingName() + " PONG")
+ .setPongId(pingRequests.getPingId())
+ .build();
+ }
}
}
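Note on the route URIs in GrpcRoute.java: they mix two kinds of placeholder. The `%s` specifiers are filled in when the URI string is formatted, while the `{{...}}` property placeholders are left in the string for Camel to resolve later. The sketch below illustrates only the formatting half with plain `String.format`, on the assumption that `fromF`/`toF` apply `String.format`-style formatting. `GrpcUriSketch` and `grpcUri` are hypothetical names that are not part of this commit.

```java
final class GrpcUriSketch {

    /**
     * Builds a gRPC endpoint URI whose port is a Camel property placeholder.
     * String.format only interprets '%' specifiers, so the literal "{{" and "}}"
     * pass through unchanged and remain available for later property resolution.
     */
    static String grpcUri(String portProperty, String service, String options) {
        return String.format("grpc://localhost:{{%s}}/%s?%s", portProperty, service, options);
    }
}
```

For example, `GrpcUriSketch.grpcUri("camel.grpc.test.tls.server.port", PING_PONG_SERVICE, "consumerStrategy=PROPAGATION")` would yield a URI that still contains `{{camel.grpc.test.tls.server.port}}`, which is why each consumer route can be bound to its own reserved port.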
diff --git a/integration-tests/grpc/src/main/resources/application.properties b/integration-tests/grpc/src/main/resources/application.properties
new file mode 100644
index 0000000..f51749e
--- /dev/null
+++ b/integration-tests/grpc/src/main/resources/application.properties
@@ -0,0 +1,18 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements. See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License. You may obtain a copy of the License at
+##
+## http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+
+quarkus.native.resources.includes=certs/*.key,certs/*.pem
diff --git a/integration-tests/grpc/src/main/resources/certs/ca-openssl.conf b/integration-tests/grpc/src/main/resources/certs/ca-openssl.conf
new file mode 100644
index 0000000..c8622b8
--- /dev/null
+++ b/integration-tests/grpc/src/main/resources/certs/ca-openssl.conf
@@ -0,0 +1,18 @@
+[req]
+distinguished_name = req_distinguished_name
+req_extensions = v3_req
+
+[req_distinguished_name]
+countryName = Country Name (2 letter code)
+countryName_default = US
+stateOrProvinceName = State or Province Name (full name)
+stateOrProvinceName_default = Test State
+organizationName = Organization Name (eg, company)
+organizationName_default = Apache Camel Quarkus
+commonName = Common Name (eg, YOUR name)
+commonName_default = localhost
+
+[v3_req]
+basicConstraints = CA:true
+keyUsage = critical, keyCertSign
+
diff --git a/integration-tests/grpc/src/main/resources/certs/ca.key b/integration-tests/grpc/src/main/resources/certs/ca.key
new file mode 100644
index 0000000..785a1b0
--- /dev/null
+++ b/integration-tests/grpc/src/main/resources/certs/ca.key
@@ -0,0 +1,27 @@
+-----BEGIN RSA PRIVATE KEY-----
+MIIEowIBAAKCAQEAya7tcM/2bJDin4fROIzX77T3NzNWDk9A8Y7F3OxKqdB15/aB
+Z8FshmVR7pcVW6IbVX7l069RVbsnfIic1HHqB5t/c5B6ACwD3X3fyL5VhlTCTRGX
+QPElVNBnu/8hMP1DXXQfrh6k8pZ5mpOSLRKZfkwxVfQR6tIX5uwAafnOXjVuViIv
+mf0EKWggI/xtFHId3okNg/tEQCfeqAviGl5PZO6MQWlsGeyjT7n4WJfOXyyWu2cP
+ssaJifUUvtiCIQ/diUW5GN6qAKK1CnfAd/Ad5gtPpQtpVqheKLGY+ZRIoDe4MmL0
+J/fkukVadUooTfxdJU1u38mYlUE5/6DUYpsqPwIDAQABAoIBACy63jBh/y5O5+Qn
+1DAtlIq2YQneGdzsmOO6HYoE4f5NnIjItCSV0oLwH77BxVvgVR2eGkzbmjdgXb6R
+8e5wIU50n0y7I0O8f1L1Ytd71y4GyF0dZKVjljr8SbV5PRxHX1Rvt8L/Suf+lZPB
+3O6gLGjFbz+39O1WkTSG7QSUAcAExtxovMloc/qC/Jp0M/ZTonasmbC5WkvHGgCK
+UKgFTYLJUmGDfj5Bk0i+ZI4gPElbGrEvMmw1l9BP3HLS1FFZiVC81m57S7RDaQQe
+w34PHQT5g9tVInjuHSUR/Y0zTyRMcara3e1aF8HBBLAVf+ViMLFH4mjmFbuAxgwE
+Yz9JINECgYEA45F/ZPFfj+nbE8nNnumMO9U5cLydK9LU7BCQjx3m0ygAXJIO3H2j
+FlRg09DRE2F5HBiXuE6mFzTjzFRwoSH4Iqy0V4ifUiiqIAoImneY0QxjEBYf8Mgl
+O+PM4WHfTU5hAbnUHDU0fN0MQz52S/f2h12uIs8AA5Mim60yURhduckCgYEA4uGH
+GeC5ihsuctmmKLR1vzFudp/BhL4bNRp6psOCFCnJ1K7qaGi8/f8kAAKcVFoFD16r
+Ivx+W133tqipYLzoKf4eCGcOZPAHJI2agAFREF3muS0m70hQwqWRIlbuF/J5NTBR
+rboV3cQX/RxDrw5R9KV/EkhCMzZik3VrnqXxR8cCgYB2+iL4y5mCzGlVtzRh2QkQ
+XzJg1rv1pYzcvuxVZvS/gTJvM43BiY4ypnrZQ1uMHnILVnA8VCJSD7zpn1wjYncw
+6cp/2VYyxj/r8FL/L3geE7PXPToF0R5maXs8esccy9W76UQUW1zNy2QV5BpFLace
+rsPUPxVqyQpXCf48l9iowQKBgQDNrxQh+795WwcTDAo9IM0WliPE2zQ/uXFFBUpo
+YRpOWUwmyqSsHzgnNOVYNQWw2yN3OaYAw9Nx1ZC7QdC8aMY5O63ShEtiatCu7MlC
+fu8U3WxMOIjxqSe+LweSkCiuAR6tf66sxuUH9byhtLwPufXGyzqWQnfxob5gXLVC
+UMKbBQKBgEjiPBAhclkub+OJwYZ8+079Z3aQ31ZfVnIffNS/3y2FJvBM2Y0CmtNA
+9YTuSyHO1mchcUG1bbmsc6ylvbMJ0PcoDAswQ6fURbmjvBNRcs5D9mW1ThrR2yV+
+Sg+1KqT3SM27nj1aQs0YsN7JoZT6FcAHQvzfB6KK7+lGxkwMBkGi
+-----END RSA PRIVATE KEY-----
diff --git a/integration-tests/grpc/src/main/resources/certs/ca.pem b/integration-tests/grpc/src/main/resources/certs/ca.pem
new file mode 100644
index 0000000..15efcf3
--- /dev/null
+++ b/integration-tests/grpc/src/main/resources/certs/ca.pem
@@ -0,0 +1,20 @@
+-----BEGIN CERTIFICATE-----
+MIIDWjCCAkKgAwIBAgIUI3OU/rpsBhWj1cwW9kVAc1YnTuQwDQYJKoZIhvcNAQEL
+BQAwVjELMAkGA1UEBhMCQVUxEzARBgNVBAgMClNvbWUtU3RhdGUxITAfBgNVBAoM
+GEludGVybmV0IFdpZGdpdHMgUHR5IEx0ZDEPMA0GA1UEAwwGdGVzdGNhMB4XDTIw
+MDYwOTExNTQ0MloXDTMwMDYwNzExNTQ0MlowVjELMAkGA1UEBhMCQVUxEzARBgNV
+BAgMClNvbWUtU3RhdGUxITAfBgNVBAoMGEludGVybmV0IFdpZGdpdHMgUHR5IEx0
+ZDEPMA0GA1UEAwwGdGVzdGNhMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKC
+AQEAya7tcM/2bJDin4fROIzX77T3NzNWDk9A8Y7F3OxKqdB15/aBZ8FshmVR7pcV
+W6IbVX7l069RVbsnfIic1HHqB5t/c5B6ACwD3X3fyL5VhlTCTRGXQPElVNBnu/8h
+MP1DXXQfrh6k8pZ5mpOSLRKZfkwxVfQR6tIX5uwAafnOXjVuViIvmf0EKWggI/xt
+FHId3okNg/tEQCfeqAviGl5PZO6MQWlsGeyjT7n4WJfOXyyWu2cPssaJifUUvtiC
+IQ/diUW5GN6qAKK1CnfAd/Ad5gtPpQtpVqheKLGY+ZRIoDe4MmL0J/fkukVadUoo
+TfxdJU1u38mYlUE5/6DUYpsqPwIDAQABoyAwHjAMBgNVHRMEBTADAQH/MA4GA1Ud
+DwEB/wQEAwICBDANBgkqhkiG9w0BAQsFAAOCAQEAF71GOXN1C7bd7kQdnLC87uQ+
+cF9REcrGb9kCF8YTRNDXTAIWAYRE4b8gGEPstOjOC/Kr4q4R0NqzYoXvWCnxUfV0
+ZyD+wlZTPNzkFRyJlIYFqwYJIoUvB/Qs/nbPINd4W4YSzeTdHuP98FR8nfEITQ36
+TZq2zTSwZSb0i14LRxe7uZjvZlK0rqiH2ELj3hSa40aq3g6b6cmZfZzILIATIg79
+iF47/kmPudi9/CvkU6GZ89suKkBDwOmqULLVqPL607M3UFTQs5qmyv9Ybr0pd6M2
+vs2KaYSobjToBwSDZ6dImp3+fQnjbNTi6VR+tfVEifGvVcq925Be6Z1u8KE8qg==
+-----END CERTIFICATE-----
diff --git a/integration-tests/grpc/src/main/resources/certs/client.key b/integration-tests/grpc/src/main/resources/certs/client.key
new file mode 100644
index 0000000..860491e
--- /dev/null
+++ b/integration-tests/grpc/src/main/resources/certs/client.key
@@ -0,0 +1,28 @@
+-----BEGIN PRIVATE KEY-----
+MIIEvwIBADANBgkqhkiG9w0BAQEFAASCBKkwggSlAgEAAoIBAQDOt2D2q5xddLk9
+h7pFk2qcg2ySFgqzI3oGbWwncWoGY3LizXAwPJr96ZhWwoVDvhwWONoSs53eWTCr
+gLn12teacgk0ZuTuEzaThAePcLfnpT9QQiYXjyjpApJGnYOy48n3hJeYKknZqQuu
+EI/Z5+bIQKNd8XPznWGDCuuxmdtNsdneXbLrOInIu67gEVQnORGw2k5wwKA7lgYI
+1+848FxDTlTU31kWLDE3VZJIHAGbv8UaLP+/49cgJ2G5QHF28Gm9Jd08AZkXGEp4
+wMx7FvRjgeq6Hb3kZ6hKQUbVUejQBXRy3+Gqb3fFQsfEYKItrHPNKFddmzdof1IN
+i3pzjzwfAgMBAAECggEAUZrzHG4C/KHnvvKmBnF71O3NHp60+qM2uPLZj3imcn8w
+C3kuoNxME5D0IswH8AZM8DjhXiCqYeyz2gCnaPdLxhzepWE5XwMWuIWWtnm5ICmM
+JRL4FrTdYZ8yP85O/4ANazlc3yVSoiqqAb1oDI4P/V4HID8Bl1q59BGLmkEy4ihu
+RD4oHP5Vq1QVeAaYSdG4jLDvsLXO3ikQMCo4dTZfbfOzWFnjLQRwDwQoYmn2YSWn
+3Wt7sNbbCJd1OGnAVcdrZRrdM/672e2AbzenLWIKD8rlq5AMYri47j6UbbQjsMi4
+7xDiiSuj6nc5xkcqgc2wHO5jWLvwHA/0uFv9qFgWAQKBgQDpO2ly7wHtOgGmclVW
+Fh3/X1MzrE2xeSMyxqHzdJIQUlVP7kN7GCHN/7huwCVEaAOGuT8gW6R1zA/6Xz22
+G2g6hXQZM3DSMMntZRQeBrVU3CRMAQWx5K7Kp46bBNjbV3TVdUVk2vZlaK3z7iWt
+/z1N0xNvkFOJbxyGgZpArWPw7wKBgQDi5VMeT73J5i4ScPCNBhUwuzCy5QIOYkVn
+gtwhxNrYzSlzmqTG98p0waAS17uZeHwJJ0Sy8M8D7Sj/p5i7C6m8eLN8Apbz7zzY
+mOXt4+d8hbvD3SJCRkgHNEasAmqhxB44Kk9t1D2SsnAOR+Sprug+GC1Inzdxk222
+F2nN6q8H0QKBgQC1eDSj6LNnESsXhv4+7RaKDHGAUyBgZhCtAUAhXSII8QbQGmMG
+88/ZqI6oHXjhsMyNlrU3SBa+UPhdpp0thEWkwxGCSjQOV0fGHk94zV9pl5/mQyVb
+NMOLirO6ihrxDysoqwZDYts0LC6tDS7kIyRf2UvWEkXtOTbIsS6EBGUuoQKBgQC4
+iJR6QZZ0v11XKD2sJHdVg9jOlkx+0FPsJNTOLiFjVDDTUcGUBPHG1bdPu99aqVAP
+zb0k348ufVLr4i3oakro+y3WvPBygm91JZ1TRhj+AHI+kPoM9sYb/dtAiFKbMDr3
+qQmipIxTZAaYxsXlb1h5MAacWsmn+KSxBQfLLoSeEQKBgQDhR3O7P5LED/07lP4X
+eLAQR5TU+AjZBy74cKpvOQvo4VkhDQcLgf15yZOQEMqOTvuy46b1x6KNSRg4VlZH
+cBBLH/NlivWLnsM7Te9j2Rg0ZTGUPtoCU5q3dDw6AYG2TtEkDis2LvYc05khuGuS
+CdmyEemWogUiW2EYmFwJqV47Cg==
+-----END PRIVATE KEY-----
diff --git a/integration-tests/grpc/src/main/resources/certs/client.pem b/integration-tests/grpc/src/main/resources/certs/client.pem
new file mode 100644
index 0000000..f92b3b5
--- /dev/null
+++ b/integration-tests/grpc/src/main/resources/certs/client.pem
@@ -0,0 +1,20 @@
+-----BEGIN CERTIFICATE-----
+MIIDMzCCAhsCFFJOv4SHojw+aduKtxbjvmuqhXbsMA0GCSqGSIb3DQEBCwUAMFYx
+CzAJBgNVBAYTAkFVMRMwEQYDVQQIDApTb21lLVN0YXRlMSEwHwYDVQQKDBhJbnRl
+cm5ldCBXaWRnaXRzIFB0eSBMdGQxDzANBgNVBAMMBnRlc3RjYTAeFw0yMDA2MDkx
+MTU1MzNaFw0zNDAyMTYxMTU1MzNaMFYxCzAJBgNVBAYTAlhYMRUwEwYDVQQHDAxE
+ZWZhdWx0IENpdHkxHDAaBgNVBAoME0RlZmF1bHQgQ29tcGFueSBMdGQxEjAQBgNV
+BAMMCWxvY2FsaG9zdDCCASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEBAM63
+YParnF10uT2HukWTapyDbJIWCrMjegZtbCdxagZjcuLNcDA8mv3pmFbChUO+HBY4
+2hKznd5ZMKuAufXa15pyCTRm5O4TNpOEB49wt+elP1BCJhePKOkCkkadg7LjyfeE
+l5gqSdmpC64Qj9nn5shAo13xc/OdYYMK67GZ202x2d5dsus4ici7ruARVCc5EbDa
+TnDAoDuWBgjX7zjwXENOVNTfWRYsMTdVkkgcAZu/xRos/7/j1yAnYblAcXbwab0l
+3TwBmRcYSnjAzHsW9GOB6rodveRnqEpBRtVR6NAFdHLf4apvd8VCx8Rgoi2sc80o
+V12bN2h/Ug2LenOPPB8CAwEAATANBgkqhkiG9w0BAQsFAAOCAQEASorfZ3g4jLJY
+jYaGh2Lyg1dDiOGnCBgkJzO/lJ4t2qrXwf0OX3GepTHedO1Y4C00ca/XKGB8Qluh
+ylhJYrfG1O+fwm+CYHrg+sz2yRtnlHAdySCqa3DkD5RHy9CNIe7s2RGNpcz0gvrr
+byrqGB6u8AATvzVxO+Hx36Mttx89e1pcKoNmAJHAjueceJKmNgBFi+qZqEITrLeQ
+Q6sj4YzYEe9eVtTDqiZS2Tl5D/l/3YwRiWWyB3AxOINVnRMHdKM5qrUuVtX0hhIb
+YjMuUo/vOaimX9kqhphvDzE34hJCn2DQpEJUESUVBiXg8N6gUYjOH46Q8krAj48P
+yDTqHnU4yw==
+-----END CERTIFICATE-----
diff --git a/integration-tests/grpc/src/main/resources/certs/server.key b/integration-tests/grpc/src/main/resources/certs/server.key
new file mode 100644
index 0000000..9f1c04a
--- /dev/null
+++ b/integration-tests/grpc/src/main/resources/certs/server.key
@@ -0,0 +1,28 @@
+-----BEGIN PRIVATE KEY-----
+MIIEvQIBADANBgkqhkiG9w0BAQEFAASCBKcwggSjAgEAAoIBAQCUrRCRuN5YYTVG
+XfYEb3JpSsi4ToEERMPsmfRxlljv0L8w6XbvWOk45kCLdk362SUNARdWv+mzgkuJ
+W9eIYxSUQSspkNcnpyhV6078i7ZPY7BERlNny5SZ1eg0ptEoB4Xuw6o1Hko5sjco
+xGGrxYPWaMmz0kLg6EmHgqsiCozXO1p/TpeltLUd0XFp/qE+NsxcjiuPosr0xioY
+F6QLrTeK7b+bR0VbyWQ0RVTZDta+mN9rfLmyN832h5zDaF6VMsHUdpj7ik6ehBhs
+DoLNMOR/zk1q34dx/AwQsrcBhlVFs/8NU3/QnYCgmGsZpld6+o6xZGpLqcxRTHM2
+aeKtwosdAgMBAAECggEAB8iTsH1ULp5QMrHimlLXU4nVvkhl2rpa430f3f54A34T
+NFVUo60s8IylXmfAJ1zM4sRm22TKsFom6DcFrp85SZWr085AZVQL+YuLJA+QaNGY
+PcrMvQdcbT1UncUJ+CWErn5R39wJGMIqvxig4GRDxarQvHhKhgnDJ7QAj5Sr0Oy3
+H57uTq718CitiF9BjSrRgLyg4ycmg+Li7hY+wyCJiLnyDdL7jdlGE5KGsTenrqEe
+/NvhgLqNic5WAyfH6NNCfkN7mCMps9BkMjHS4WGNLj/MEdas8h1sWmSJUtBLNbIL
+/S+5eUzClUI7KECm3s123dgWAdCXaEkQSZaKpRjyhQKBgQDEkCpGDZAcqMR7rJBz
+LikBb5ppc79a33ees69VwKcfvbgrCfbVnDz8fMhC0w4ZOXd7tICZX3wGdzymHVs5
+qlBrD96wvWjbNusH11jsT5mHtEsOpzZ1CYCYt+HVkvN54RFRxJ2FjdeQJywnCWc+
+iy1yeEOma0T2wccjxN63zxDq5wKBgQDBofy0QzZhwaIPs6v3Q7vRE73XKpWj7+U0
+GJY0X9GXEratusv4AXH/V/Aa6+/PexJ+zRSSlgfwqEPjgsZBmT52lshEfmQq4bvV
+hIRDNM+BOKyhzdOxRs3KD9oUSty5ElE43ntuW61APC3Id4a3rlcFcWPtvgbg/Fjf
+UzBFPEo9WwKBgQCSipQpk9T7uMp7FWS/qgtCGcz5jyR7ABzzASzXdoBiBpP16kDO
+ceN5rSGzyBs7rP8qB6DUNz6Ep4JcqYgoTpQTrUotY5h3zdKdDuwpiRKB/GT0sK7F
+xGbN9FD2Y88G70d1OWridaPewYWGONly5FnmfEibGl/YwxDO1ufgCwymwwKBgEW+
+B5wR3WOGb83CBD1ySxK6TpVSREWg4WhLNsa0gHQujFe0wUSZmKrEzLmVo3GSvY2o
+ZjGKEmAO6yzl7GyyOnpSQt3QvFlpptL4AP+H5PmBJUS+MvJWM0cFQKIweKG8RD78
+e267XIKXalIbIw3DoJpYRgrad/XPTl9ZwjgdyAujAoGAN+qEIOXVvq7Lnx050tZB
+TBDponVg5eVGfdduSx5nHUZhVIPH5oz5r+srFURgGutk8CTvfa2Tz8nS/uux5Df0
+Dp04OEDbuOS1BWerMKfOgZa1VMLnCbupAEjr9i+Oc8gmPlxLvypOoTQcZz6qXSo3
+Dp2ABZftNsNA5q1z4f/JW1M=
+-----END PRIVATE KEY-----
diff --git a/integration-tests/grpc/src/main/resources/certs/server.pem b/integration-tests/grpc/src/main/resources/certs/server.pem
new file mode 100644
index 0000000..c1a1c07
--- /dev/null
+++ b/integration-tests/grpc/src/main/resources/certs/server.pem
@@ -0,0 +1,20 @@
+-----BEGIN CERTIFICATE-----
+MIIDMzCCAhsCFFJOv4SHojw+aduKtxbjvmuqhXbtMA0GCSqGSIb3DQEBCwUAMFYx
+CzAJBgNVBAYTAkFVMRMwEQYDVQQIDApTb21lLVN0YXRlMSEwHwYDVQQKDBhJbnRl
+cm5ldCBXaWRnaXRzIFB0eSBMdGQxDzANBgNVBAMMBnRlc3RjYTAeFw0yMDA2MDkx
+MTU2MDlaFw0zNDAyMTYxMTU2MDlaMFYxCzAJBgNVBAYTAlhYMRUwEwYDVQQHDAxE
+ZWZhdWx0IENpdHkxHDAaBgNVBAoME0RlZmF1bHQgQ29tcGFueSBMdGQxEjAQBgNV
+BAMMCWxvY2FsaG9zdDCCASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEBAJSt
+EJG43lhhNUZd9gRvcmlKyLhOgQREw+yZ9HGWWO/QvzDpdu9Y6TjmQIt2TfrZJQ0B
+F1a/6bOCS4lb14hjFJRBKymQ1yenKFXrTvyLtk9jsERGU2fLlJnV6DSm0SgHhe7D
+qjUeSjmyNyjEYavFg9ZoybPSQuDoSYeCqyIKjNc7Wn9Ol6W0tR3RcWn+oT42zFyO
+K4+iyvTGKhgXpAutN4rtv5tHRVvJZDRFVNkO1r6Y32t8ubI3zfaHnMNoXpUywdR2
+mPuKTp6EGGwOgs0w5H/OTWrfh3H8DBCytwGGVUWz/w1Tf9CdgKCYaxmmV3r6jrFk
+akupzFFMczZp4q3Cix0CAwEAATANBgkqhkiG9w0BAQsFAAOCAQEAd4YnMK+556fT
+hRXtAtCU48tWkWkSEg4jV4fmp9vlm+Lo+Qa8A6X2S1wGpZrMtJVHdbbw4oLVq9NC
+AcFiVBXajMGFn181UnWqUTZCfzBNlYfJiZ0JF0ITzZEdoLv6nZlY9aQRCunEvx0J
+Bp9NVD8sErlr/7oJ2ni+uRrjmAvSnvhMimh+0jt3Chpb0Hy3/YDBLA0nbUW4aI/T
+6gPFuK+VTVtMyOt05a4ZbV/EsJIRjudiK3CcDsrmtsfPWPHDIQ7ZzJLzQgqC8HMp
+h1+gZKv35lCTHZsyZubHS7CEr2I0pIEZcDh9eTVRaiFOuPNxXsDxm5bZnDRllxvF
+m/4+pqqpfQ==
+-----END CERTIFICATE-----
diff --git a/integration-tests/grpc/src/test/java/org/apache/camel/quarkus/component/grpc/it/GrpcServerTestResource.java b/integration-tests/grpc/src/test/java/org/apache/camel/quarkus/component/grpc/it/GrpcServerTestResource.java
index b355d54..eb947fd 100644
--- a/integration-tests/grpc/src/test/java/org/apache/camel/quarkus/component/grpc/it/GrpcServerTestResource.java
+++ b/integration-tests/grpc/src/test/java/org/apache/camel/quarkus/component/grpc/it/GrpcServerTestResource.java
@@ -18,12 +18,12 @@
package org.apache.camel.quarkus.component.grpc.it;
import java.util.Map;
+import java.util.Objects;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.quarkus.test.common.QuarkusTestResourceLifecycleManager;
import org.apache.camel.quarkus.test.AvailablePortFinder;
-import org.apache.camel.util.CollectionHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -34,9 +34,23 @@ public class GrpcServerTestResource implements QuarkusTestResourceLifecycleManag
@Override
public Map<String, String> start() {
try {
- final int port = AvailablePortFinder.getNextAvailable();
- grpcServer = ServerBuilder.forPort(port).addService(new PingPongImpl()).build().start();
- return CollectionHelper.mapOf("camel.grpc.test.server.port", String.valueOf(port));
+ Map<String, String> config = AvailablePortFinder.reserveNetworkPorts(
+ Objects::toString,
+ "grpc.test.server.port",
+ "camel.grpc.test.server.port",
+ "camel.grpc.test.forward.completed.server.port",
+ "camel.grpc.test.forward.error.server.port",
+ "camel.grpc.test.route.controlled.server.port",
+ "camel.grpc.test.tls.server.port",
+ "camel.grpc.test.jwt.server.port");
+
+ String port = config.get("grpc.test.server.port");
+ grpcServer = ServerBuilder.forPort(Integer.parseInt(port))
+ .addService(new PingPongImpl())
+ .build()
+ .start();
+
+ return config;
} catch (Exception e) {
throw new RuntimeException("Could not start gRPC server", e);
}
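Note on the port reservation in GrpcServerTestResource.java: the new `start()` asks `AvailablePortFinder.reserveNetworkPorts` for one free port per property name and returns the resulting map as test configuration. That helper's implementation is not part of this diff. The following is only a JDK-only sketch of the general idea, under the assumption that binding to port 0 yields an ephemeral free port; `PortReservationSketch` and `reservePorts` are hypothetical names.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.ServerSocket;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class PortReservationSketch {

    /**
     * Maps each property name to a distinct free port, rendered as a String.
     * All sockets stay open until every port has been picked, so the operating
     * system cannot hand out the same port twice within one call.
     */
    static Map<String, String> reservePorts(String... propertyNames) {
        Map<String, String> reserved = new LinkedHashMap<>();
        List<ServerSocket> sockets = new ArrayList<>();
        try {
            for (String name : propertyNames) {
                ServerSocket socket = new ServerSocket(0); // 0 = let the OS choose
                sockets.add(socket);
                reserved.put(name, String.valueOf(socket.getLocalPort()));
            }
            return reserved;
        } catch (IOException e) {
            throw new UncheckedIOException("Could not reserve ports", e);
        } finally {
            // Release the ports so the servers under test can bind to them.
            for (ServerSocket socket : sockets) {
                try {
                    socket.close();
                } catch (IOException ignored) {
                    // best effort cleanup
                }
            }
        }
    }
}
```

A sketch like this leaves a small window between releasing a port and the server binding to it, during which another process could claim it. That is usually acceptable for integration tests but would not be for production code.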
diff --git a/integration-tests/grpc/src/test/java/org/apache/camel/quarkus/component/grpc/it/GrpcTest.java b/integration-tests/grpc/src/test/java/org/apache/camel/quarkus/component/grpc/it/GrpcTest.java
index e983b49..b9700ef 100644
--- a/integration-tests/grpc/src/test/java/org/apache/camel/quarkus/component/grpc/it/GrpcTest.java
+++ b/integration-tests/grpc/src/test/java/org/apache/camel/quarkus/component/grpc/it/GrpcTest.java
@@ -16,43 +16,65 @@
*/
package org.apache.camel.quarkus.component.grpc.it;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
+import io.grpc.StatusRuntimeException;
+import io.grpc.netty.GrpcSslContexts;
+import io.grpc.netty.NettyChannelBuilder;
+import io.grpc.stub.StreamObserver;
import io.quarkus.test.common.QuarkusTestResource;
import io.quarkus.test.junit.QuarkusTest;
import io.restassured.RestAssured;
+import org.apache.camel.component.grpc.auth.jwt.JwtAlgorithm;
+import org.apache.camel.component.grpc.auth.jwt.JwtCallCredentials;
+import org.apache.camel.component.grpc.auth.jwt.JwtHelper;
import org.apache.camel.quarkus.component.grpc.it.model.PingPongGrpc;
import org.apache.camel.quarkus.component.grpc.it.model.PingPongGrpc.PingPongBlockingStub;
import org.apache.camel.quarkus.component.grpc.it.model.PingRequest;
import org.apache.camel.quarkus.component.grpc.it.model.PongResponse;
+import org.eclipse.microprofile.config.Config;
+import org.eclipse.microprofile.config.ConfigProvider;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
+import static org.apache.camel.quarkus.component.grpc.it.GrpcRoute.GRPC_JWT_SECRET;
+import static org.apache.camel.quarkus.component.grpc.it.PingPongImpl.GRPC_TEST_PONG_VALUE;
import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
@QuarkusTest
@QuarkusTestResource(GrpcServerTestResource.class)
class GrpcTest {
+ private static final String GRPC_TEST_PING_VALUE = "PING";
+ private static final int GRPC_TEST_PING_ID = 567;
+
@Test
public void consumer() {
- ManagedChannel syncRequestChannel = null;
+ Config config = ConfigProvider.getConfig();
+ Integer camelGrpcPort = config.getValue("camel.grpc.test.server.port", Integer.class);
+ ManagedChannel channel = null;
try {
- syncRequestChannel = ManagedChannelBuilder.forAddress("localhost", GrpcRoute.getServerPort()).usePlaintext()
+ channel = ManagedChannelBuilder.forAddress("localhost", camelGrpcPort).usePlaintext()
.build();
- final PingPongBlockingStub blockingStub = PingPongGrpc.newBlockingStub(syncRequestChannel);
+ final PingPongBlockingStub blockingStub = PingPongGrpc.newBlockingStub(channel);
final PingRequest pingRequest = PingRequest.newBuilder()
- .setPingName("foo")
- .setPingId(567)
+ .setPingName(GRPC_TEST_PING_VALUE)
+ .setPingId(GRPC_TEST_PING_ID)
.build();
final PongResponse pongResponse = blockingStub.pingSyncSync(pingRequest);
Assertions.assertNotNull(pongResponse);
- Assertions.assertEquals(567, pongResponse.getPongId());
- Assertions.assertEquals("foo PONG", pongResponse.getPongName());
+ assertEquals(GRPC_TEST_PING_ID, pongResponse.getPongId());
+ assertEquals(GRPC_TEST_PING_VALUE + " PONG", pongResponse.getPongName());
} finally {
- if (syncRequestChannel != null) {
- syncRequestChannel.shutdownNow();
+ if (channel != null) {
+ channel.shutdownNow();
}
}
}
@@ -63,11 +85,235 @@ class GrpcTest {
RestAssured.given()
.contentType("text/plain")
.queryParam("pingId", id)
- .body("PING")
+ .body(GRPC_TEST_PING_VALUE)
.post("/grpc/producer")
.then()
.statusCode(200)
.body(equalTo("PINGPONG"));
}
+
+ @Test
+ public void forwardOnComplete() throws InterruptedException {
+ Config config = ConfigProvider.getConfig();
+ Integer port = config.getValue("camel.grpc.test.forward.completed.server.port", Integer.class);
+ CountDownLatch latch = new CountDownLatch(1);
+
+ ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", port).usePlaintext().build();
+ try {
+ PingPongGrpc.PingPongStub pingPongStub = PingPongGrpc.newStub(channel);
+ PongResponseStreamObserver responseObserver = new PongResponseStreamObserver(latch);
+ StreamObserver<PingRequest> requestObserver = pingPongStub.pingAsyncAsync(responseObserver);
+ requestObserver.onCompleted();
+
+ latch.await(5, TimeUnit.SECONDS);
+
+ RestAssured.get("/grpc/forwardOnCompleted")
+ .then()
+ .statusCode(204);
+ } finally {
+ channel.shutdownNow();
+ }
+ }
+
+ @Test
+ public void forwardOnError() throws InterruptedException {
+ Config config = ConfigProvider.getConfig();
+ Integer port = config.getValue("camel.grpc.test.forward.error.server.port", Integer.class);
+ CountDownLatch latch = new CountDownLatch(1);
+
+ ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", port).usePlaintext().build();
+ try {
+ PingPongGrpc.PingPongStub pingPongStub = PingPongGrpc.newStub(channel);
+ PongResponseStreamObserver responseObserver = new PongResponseStreamObserver(latch, true);
+ StreamObserver<PingRequest> requestObserver = pingPongStub.pingAsyncAsync(responseObserver);
+ requestObserver.onNext(null);
+
+ latch.await(5, TimeUnit.SECONDS);
+
+ RestAssured.get("/grpc/forwardOnError")
+ .then()
+ .statusCode(200)
+ .body(is(StatusRuntimeException.class.getName()));
+ } finally {
+ channel.shutdownNow();
+ }
+ }
+
+ @Test
+ public void routeControlledStreamObserver() {
+ Config config = ConfigProvider.getConfig();
+ Integer port = config.getValue("camel.grpc.test.route.controlled.server.port", Integer.class);
+
+ PingRequest pingRequest = PingRequest.newBuilder()
+ .setPingName(GRPC_TEST_PING_VALUE)
+ .setPingId(GRPC_TEST_PING_ID)
+ .build();
+
+ ManagedChannel channel = null;
+ try {
+ channel = ManagedChannelBuilder.forAddress("localhost", port).usePlaintext().build();
+ PingPongBlockingStub blockingStub = PingPongGrpc.newBlockingStub(channel);
+ PongResponse pongResponse = blockingStub.pingSyncSync(pingRequest);
+
+ assertNotNull(pongResponse);
+ assertEquals(GRPC_TEST_PING_ID, pongResponse.getPongId());
+ assertEquals(GRPC_TEST_PING_VALUE + " " + GRPC_TEST_PONG_VALUE, pongResponse.getPongName());
+ } finally {
+ if (channel != null) {
+ channel.shutdownNow();
+ }
+ }
+ }
+
+ @Test
+ public void streamReplies() {
+ RestAssured.get("/grpc/grpcStreamReplies")
+ .then()
+ .statusCode(204);
+ }
+
+ @Test
+ public void tlsConsumer() throws Exception {
+ Config config = ConfigProvider.getConfig();
+ Integer port = config.getValue("camel.grpc.test.tls.server.port", Integer.class);
+
+ ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
+
+ ManagedChannel channel = null;
+ try {
+ channel = NettyChannelBuilder.forAddress("localhost", port)
+ .sslContext(GrpcSslContexts.forClient()
+ .keyManager(classLoader.getResourceAsStream("certs/client.pem"),
+ classLoader.getResourceAsStream("certs/client.key"))
+ .trustManager(classLoader.getResourceAsStream("certs/ca.pem"))
+ .build())
+ .build();
+
+ PingPongGrpc.PingPongStub pingPongStub = PingPongGrpc.newStub(channel);
+
+ CountDownLatch latch = new CountDownLatch(1);
+ PingRequest pingRequest = PingRequest.newBuilder()
+ .setPingName(GRPC_TEST_PING_VALUE)
+ .setPingId(GRPC_TEST_PING_ID)
+ .build();
+
+ PongResponseStreamObserver responseObserver = new PongResponseStreamObserver(latch);
+ StreamObserver<PingRequest> requestObserver = pingPongStub.pingAsyncSync(responseObserver);
+ requestObserver.onNext(pingRequest);
+ latch.await(5, TimeUnit.SECONDS);
+
+ RestAssured.get("/grpc/tls")
+ .then()
+ .statusCode(204);
+
+ PongResponse pongResponse = responseObserver.getPongResponse();
+ assertNotNull(pongResponse);
+ assertEquals(GRPC_TEST_PING_ID, pongResponse.getPongId());
+ assertEquals(GRPC_TEST_PING_VALUE + " " + GRPC_TEST_PONG_VALUE, pongResponse.getPongName());
+ } finally {
+ if (channel != null) {
+ channel.shutdown();
+ }
+ }
+ }
+
+ @Test
+ public void tlsProducer() {
+ String message = GRPC_TEST_PING_VALUE + " TLS";
+ RestAssured.given()
+ .body(message)
+ .post("/grpc/tls")
+ .then()
+ .statusCode(200)
+ .body(is(message + " " + GRPC_TEST_PONG_VALUE));
+ }
+
+ @Test
+ public void jwtConsumer() throws Exception {
+ Config config = ConfigProvider.getConfig();
+ Integer port = config.getValue("camel.grpc.test.jwt.server.port", Integer.class);
+
+ ManagedChannel channel = null;
+ try {
+ channel = NettyChannelBuilder.forAddress("localhost", port).usePlaintext().build();
+
+ String jwtToken = JwtHelper.createJwtToken(JwtAlgorithm.HMAC256, GRPC_JWT_SECRET, null, null);
+ PingPongGrpc.PingPongStub pingPongStub = PingPongGrpc.newStub(channel)
+ .withCallCredentials(new JwtCallCredentials(jwtToken));
+
+ CountDownLatch latch = new CountDownLatch(1);
+ PingRequest pingRequest = PingRequest.newBuilder()
+ .setPingName(GRPC_TEST_PING_VALUE)
+ .setPingId(GRPC_TEST_PING_ID)
+ .build();
+
+ PongResponseStreamObserver responseObserver = new PongResponseStreamObserver(latch);
+ StreamObserver<PingRequest> requestObserver = pingPongStub.pingAsyncSync(responseObserver);
+ requestObserver.onNext(pingRequest);
+ latch.await(5, TimeUnit.SECONDS);
+
+ RestAssured.get("/grpc/jwt")
+ .then()
+ .statusCode(204);
+
+ PongResponse pongResponse = responseObserver.getPongResponse();
+ assertNotNull(pongResponse);
+ assertEquals(GRPC_TEST_PING_ID, pongResponse.getPongId());
+ assertEquals(GRPC_TEST_PING_VALUE + " " + GRPC_TEST_PONG_VALUE, pongResponse.getPongName());
+ } finally {
+ if (channel != null) {
+ channel.shutdown();
+ }
+ }
+ }
+
+ @Test
+ public void jwtProducer() {
+ String message = GRPC_TEST_PING_VALUE + " JWT";
+ RestAssured.given()
+ .body(message)
+ .post("/grpc/jwt")
+ .then()
+ .statusCode(200)
+ .body(is(message + " " + GRPC_TEST_PONG_VALUE));
+ }
+
+ static final class PongResponseStreamObserver implements StreamObserver<PongResponse> {
+ private PongResponse pongResponse;
+ private final CountDownLatch latch;
+ private final boolean simulateError;
+
+ public PongResponseStreamObserver(CountDownLatch latch) {
+ this(latch, false);
+ }
+
+ public PongResponseStreamObserver(CountDownLatch latch, boolean simulateError) {
+ this.latch = latch;
+ this.simulateError = simulateError;
+ }
+
+ public PongResponse getPongResponse() {
+ return pongResponse;
+ }
+
+ @Override
+ public void onNext(PongResponse value) {
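+ // Store the response before releasing the latch so a thread woken by await() never sees a null response
+ pongResponse = value;
+ latch.countDown();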
+ if (simulateError) {
+ throw new IllegalStateException("Forced exception");
+ }
+ }
+
+ @Override
+ public void onError(Throwable t) {
+ latch.countDown();
+ }
+
+ @Override
+ public void onCompleted() {
+ latch.countDown();
+ }
+ }
}
diff --git a/pom.xml b/pom.xml
index a9453cd..4c669b8 100644
--- a/pom.xml
+++ b/pom.xml
@@ -76,6 +76,7 @@
<github-api.version>1.111</github-api.version><!-- Used in a Groovy script below -->
<guava.version>29.0-jre</guava.version>
<graalvm.version>21.1.0</graalvm.version><!-- @sync io.quarkus:quarkus-bom:${quarkus.version} dep:org.graalvm.nativeimage:svm -->
+ <grpc.version>1.35.0</grpc.version><!-- @sync io.quarkus:quarkus-bom:${quarkus.version} dep:io.grpc:grpc-core -->
<gson.version>2.8.6</gson.version><!-- @sync com.ibm.jsonata4java:JSONata4Java:${jsonata4java-version} dep:com.google.code.gson:gson -->
<hadoop2.version>${hadoop2-version}</hadoop2.version><!-- Spark -->
<hapi.version>${hapi-version}</hapi.version>
@@ -165,7 +166,9 @@
<maven-shade-plugin.version>3.2.4</maven-shade-plugin.version>
<!-- NOTE: We pin to this version due to https://github.com/apache/camel-quarkus/issues/723 -->
<maven-surefire-plugin.version>2.22.2</maven-surefire-plugin.version>
+ <os-maven-plugin.version>${os-maven-plugin-version}</os-maven-plugin.version>
<properties-maven-plugin.version>1.0.0</properties-maven-plugin.version>
+ <protobuf-maven-plugin.version>${protobuf-maven-plugin-version}</protobuf-maven-plugin.version>
<rpkgtests-maven-plugin.version>0.10.0</rpkgtests-maven-plugin.version>
<!-- Plugin configuration through properties -->
@@ -414,7 +417,7 @@
<type>pom</type>
<exclusions>
<exclusion>
- <!-- groovy-testng depends on testng 7.2.0 wich is not on maven central -->
+ <!-- groovy-testng depends on testng 7.2.0 which is not on maven central -->
<groupId>org.codehaus.groovy</groupId>
<artifactId>groovy-testng</artifactId>
</exclusion>
@@ -453,6 +456,7 @@
<exclude>**/*.graphql</exclude>
<exclude>**/*.ics</exclude>
<exclude>**/*.jks</exclude>
+ <exclude>**/*.key</exclude>
<exclude>**/*.kts</exclude>
<exclude>**/*.lock</exclude>
<exclude>**/*.mp3</exclude>
diff --git a/poms/build-parent/pom.xml b/poms/build-parent/pom.xml
index c939c21..e6f51de 100644
--- a/poms/build-parent/pom.xml
+++ b/poms/build-parent/pom.xml
@@ -71,6 +71,11 @@
</executions>
</plugin>
<plugin>
+ <groupId>kr.motd.maven</groupId>
+ <artifactId>os-maven-plugin</artifactId>
+ <version>${os-maven-plugin.version}</version>
+ </plugin>
+ <plugin>
<groupId>org.apache.camel.quarkus</groupId>
<artifactId>camel-quarkus-maven-plugin</artifactId>
<version>${project.version}</version>
@@ -90,6 +95,11 @@
<version>${jandex-maven-plugin.version}</version>
</plugin>
<plugin>
+ <groupId>org.xolstice.maven.plugins</groupId>
+ <artifactId>protobuf-maven-plugin</artifactId>
+ <version>${protobuf-maven-plugin.version}</version>
+ </plugin>
+ <plugin>
<groupId>net.revelc.code.formatter</groupId>
<artifactId>formatter-maven-plugin</artifactId>
<version>${formatter-maven-plugin.version}</version>