This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new dae5983b720 CAMEL-20277 make gRPC proxy routes available for all
streaming method types (#12586)
dae5983b720 is described below
commit dae5983b720580c394216c9943934c4d92539151
Author: Ivan Mashtak <[email protected]>
AuthorDate: Sun Dec 24 11:06:12 2023 +0300
CAMEL-20277 make gRPC proxy routes available for all streaming method types
(#12586)
---
.../org/apache/camel/catalog/components/grpc.json | 38 ++---
.../component/grpc/GrpcEndpointConfigurer.java | 12 ++
.../component/grpc/GrpcEndpointUriFactory.java | 4 +-
.../org/apache/camel/component/grpc/grpc.json | 38 ++---
.../camel-grpc/src/main/docs/grpc-component.adoc | 85 +++++++++++
.../camel/component/grpc/GrpcConfiguration.java | 29 +++-
.../apache/camel/component/grpc/GrpcConsumer.java | 2 +-
.../camel/component/grpc/GrpcConsumerStrategy.java | 4 +-
.../apache/camel/component/grpc/GrpcEndpoint.java | 3 +
.../apache/camel/component/grpc/GrpcProducer.java | 28 +---
...rpcProducerToRouteControlledStreamObserver.java | 41 +++++
.../client/GrpcResponseRouterStreamObserver.java | 77 ++++++----
.../grpc/client/GrpcStreamObserverFactory.java | 66 ++++++++
.../client/GrpcStreamingExchangeForwarder.java | 12 +-
.../component/grpc/server/GrpcMethodHandler.java | 2 +
.../GrpcRequestDelegationStreamObserver.java | 62 ++++++++
.../component/grpc/GrpcProxyAsyncAsyncTest.java | 167 +++++++++++++++++++++
.../component/grpc/GrpcProxyAsyncSyncTest.java | 162 ++++++++++++++++++++
.../component/grpc/GrpcProxySyncAsyncTest.java | 148 ++++++++++++++++++
.../endpoint/dsl/GrpcEndpointBuilderFactory.java | 84 ++++++++++-
20 files changed, 973 insertions(+), 91 deletions(-)
diff --git
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/grpc.json
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/grpc.json
index bec29142995..a58d2e33112 100644
---
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/grpc.json
+++
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/grpc.json
@@ -38,7 +38,7 @@
"flowControlWindow": { "index": 3, "kind": "parameter", "displayName":
"Flow Control Window", "group": "common", "label": "", "required": false,
"type": "integer", "javaType": "int", "deprecated": false, "autowired": false,
"secret": false, "defaultValue": 1048576, "configurationClass":
"org.apache.camel.component.grpc.GrpcConfiguration", "configurationField":
"configuration", "description": "The HTTP\/2 flow control window size (MiB)" },
"maxMessageSize": { "index": 4, "kind": "parameter", "displayName": "Max
Message Size", "group": "common", "label": "", "required": false, "type":
"integer", "javaType": "int", "deprecated": false, "autowired": false,
"secret": false, "defaultValue": 4194304, "configurationClass":
"org.apache.camel.component.grpc.GrpcConfiguration", "configurationField":
"configuration", "description": "The maximum message size allowed to be
received\/sent (MiB)" },
"autoDiscoverServerInterceptors": { "index": 5, "kind": "parameter",
"displayName": "Auto Discover Server Interceptors", "group": "consumer",
"label": "consumer", "required": false, "type": "boolean", "javaType":
"boolean", "deprecated": false, "autowired": false, "secret": false,
"defaultValue": true, "configurationClass":
"org.apache.camel.component.grpc.GrpcConfiguration", "configurationField":
"configuration", "description": "Setting the autoDiscoverServerInterceptors
mechanism, [...]
- "consumerStrategy": { "index": 6, "kind": "parameter", "displayName":
"Consumer Strategy", "group": "consumer", "label": "consumer", "required":
false, "type": "object", "javaType":
"org.apache.camel.component.grpc.GrpcConsumerStrategy", "enum": [
"AGGREGATION", "PROPAGATION" ], "deprecated": false, "autowired": false,
"secret": false, "defaultValue": "PROPAGATION", "configurationClass":
"org.apache.camel.component.grpc.GrpcConfiguration", "configurationField":
"configuration", "desc [...]
+ "consumerStrategy": { "index": 6, "kind": "parameter", "displayName":
"Consumer Strategy", "group": "consumer", "label": "consumer", "required":
false, "type": "object", "javaType":
"org.apache.camel.component.grpc.GrpcConsumerStrategy", "enum": [
"AGGREGATION", "PROPAGATION", "DELEGATION" ], "deprecated": false, "autowired":
false, "secret": false, "defaultValue": "PROPAGATION", "configurationClass":
"org.apache.camel.component.grpc.GrpcConfiguration", "configurationField":
"configu [...]
"forwardOnCompleted": { "index": 7, "kind": "parameter", "displayName":
"Forward On Completed", "group": "consumer", "label": "consumer", "required":
false, "type": "boolean", "javaType": "boolean", "deprecated": false,
"autowired": false, "secret": false, "defaultValue": false,
"configurationClass": "org.apache.camel.component.grpc.GrpcConfiguration",
"configurationField": "configuration", "description": "Determines if
onCompleted events should be pushed to the Camel route." },
"forwardOnError": { "index": 8, "kind": "parameter", "displayName":
"Forward On Error", "group": "consumer", "label": "consumer", "required":
false, "type": "boolean", "javaType": "boolean", "deprecated": false,
"autowired": false, "secret": false, "defaultValue": false,
"configurationClass": "org.apache.camel.component.grpc.GrpcConfiguration",
"configurationField": "configuration", "description": "Determines if onError
events should be pushed to the Camel route. Exceptions will be s [...]
"maxConcurrentCallsPerConnection": { "index": 9, "kind": "parameter",
"displayName": "Max Concurrent Calls Per Connection", "group": "consumer",
"label": "consumer", "required": false, "type": "integer", "javaType": "int",
"deprecated": false, "autowired": false, "secret": false, "defaultValue":
2147483647, "configurationClass":
"org.apache.camel.component.grpc.GrpcConfiguration", "configurationField":
"configuration", "description": "The maximum number of concurrent calls
permitted [...]
@@ -47,22 +47,24 @@
"exceptionHandler": { "index": 12, "kind": "parameter", "displayName":
"Exception Handler", "group": "consumer (advanced)", "label":
"consumer,advanced", "required": false, "type": "object", "javaType":
"org.apache.camel.spi.ExceptionHandler", "optionalPrefix": "consumer.",
"deprecated": false, "autowired": false, "secret": false, "description": "To
let the consumer use a custom ExceptionHandler. Notice if the option
bridgeErrorHandler is enabled then this option is not in use. By de [...]
"exchangePattern": { "index": 13, "kind": "parameter", "displayName":
"Exchange Pattern", "group": "consumer (advanced)", "label":
"consumer,advanced", "required": false, "type": "object", "javaType":
"org.apache.camel.ExchangePattern", "enum": [ "InOnly", "InOut" ],
"deprecated": false, "autowired": false, "secret": false, "description": "Sets
the exchange pattern when the consumer creates an exchange." },
"autoDiscoverClientInterceptors": { "index": 14, "kind": "parameter",
"displayName": "Auto Discover Client Interceptors", "group": "producer",
"label": "producer", "required": false, "type": "boolean", "javaType":
"boolean", "deprecated": false, "autowired": false, "secret": false,
"defaultValue": true, "configurationClass":
"org.apache.camel.component.grpc.GrpcConfiguration", "configurationField":
"configuration", "description": "Setting the autoDiscoverClientInterceptors
mechanism, [...]
- "method": { "index": 15, "kind": "parameter", "displayName": "Method",
"group": "producer", "label": "producer", "required": false, "type": "string",
"javaType": "java.lang.String", "deprecated": false, "autowired": false,
"secret": false, "configurationClass":
"org.apache.camel.component.grpc.GrpcConfiguration", "configurationField":
"configuration", "description": "gRPC method name" },
- "producerStrategy": { "index": 16, "kind": "parameter", "displayName":
"Producer Strategy", "group": "producer", "label": "producer", "required":
false, "type": "object", "javaType":
"org.apache.camel.component.grpc.GrpcProducerStrategy", "enum": [ "SIMPLE",
"STREAMING" ], "deprecated": false, "autowired": false, "secret": false,
"defaultValue": "SIMPLE", "configurationClass":
"org.apache.camel.component.grpc.GrpcConfiguration", "configurationField":
"configuration", "description": " [...]
- "streamRepliesTo": { "index": 17, "kind": "parameter", "displayName":
"Stream Replies To", "group": "producer", "label": "producer", "required":
false, "type": "string", "javaType": "java.lang.String", "deprecated": false,
"autowired": false, "secret": false, "configurationClass":
"org.apache.camel.component.grpc.GrpcConfiguration", "configurationField":
"configuration", "description": "When using STREAMING client mode, it indicates
the endpoint where responses should be forwarded." },
- "userAgent": { "index": 18, "kind": "parameter", "displayName": "User
Agent", "group": "producer", "label": "producer", "required": false, "type":
"string", "javaType": "java.lang.String", "deprecated": false, "autowired":
false, "secret": false, "configurationClass":
"org.apache.camel.component.grpc.GrpcConfiguration", "configurationField":
"configuration", "description": "The user agent header passed to the server" },
- "lazyStartProducer": { "index": 19, "kind": "parameter", "displayName":
"Lazy Start Producer", "group": "producer (advanced)", "label":
"producer,advanced", "required": false, "type": "boolean", "javaType":
"boolean", "deprecated": false, "autowired": false, "secret": false,
"defaultValue": false, "description": "Whether the producer should be started
lazy (on the first message). By starting lazy you can use this to allow
CamelContext and routes to startup in situations where a produ [...]
- "synchronous": { "index": 20, "kind": "parameter", "displayName":
"Synchronous", "group": "advanced", "label": "advanced", "required": false,
"type": "boolean", "javaType": "boolean", "deprecated": false, "autowired":
false, "secret": false, "defaultValue": false, "configurationClass":
"org.apache.camel.component.grpc.GrpcConfiguration", "configurationField":
"configuration", "description": "Sets whether synchronous processing should be
strictly used" },
- "authenticationType": { "index": 21, "kind": "parameter", "displayName":
"Authentication Type", "group": "security", "label": "security", "required":
false, "type": "object", "javaType":
"org.apache.camel.component.grpc.GrpcAuthType", "enum": [ "NONE", "GOOGLE",
"JWT" ], "deprecated": false, "autowired": false, "secret": false,
"defaultValue": "NONE", "configurationClass":
"org.apache.camel.component.grpc.GrpcConfiguration", "configurationField":
"configuration", "description": "Auth [...]
- "jwtAlgorithm": { "index": 22, "kind": "parameter", "displayName": "Jwt
Algorithm", "group": "security", "label": "security", "required": false,
"type": "object", "javaType":
"org.apache.camel.component.grpc.auth.jwt.JwtAlgorithm", "enum": [ "HMAC256",
"HMAC384", "HMAC512" ], "deprecated": false, "autowired": false, "secret":
false, "defaultValue": "HMAC256", "configurationClass":
"org.apache.camel.component.grpc.GrpcConfiguration", "configurationField":
"configuration", "description [...]
- "jwtIssuer": { "index": 23, "kind": "parameter", "displayName": "Jwt
Issuer", "group": "security", "label": "security", "required": false, "type":
"string", "javaType": "java.lang.String", "deprecated": false, "autowired":
false, "secret": false, "configurationClass":
"org.apache.camel.component.grpc.GrpcConfiguration", "configurationField":
"configuration", "description": "JSON Web Token issuer" },
- "jwtSecret": { "index": 24, "kind": "parameter", "displayName": "Jwt
Secret", "group": "security", "label": "security", "required": false, "type":
"string", "javaType": "java.lang.String", "deprecated": false, "autowired":
false, "secret": true, "configurationClass":
"org.apache.camel.component.grpc.GrpcConfiguration", "configurationField":
"configuration", "description": "JSON Web Token secret" },
- "jwtSubject": { "index": 25, "kind": "parameter", "displayName": "Jwt
Subject", "group": "security", "label": "security", "required": false, "type":
"string", "javaType": "java.lang.String", "deprecated": false, "autowired":
false, "secret": false, "configurationClass":
"org.apache.camel.component.grpc.GrpcConfiguration", "configurationField":
"configuration", "description": "JSON Web Token subject" },
- "keyCertChainResource": { "index": 26, "kind": "parameter", "displayName":
"Key Cert Chain Resource", "group": "security", "label": "security",
"required": false, "type": "string", "javaType": "java.lang.String",
"deprecated": false, "deprecationNote": "", "autowired": false, "secret":
false, "supportFileReference": true, "configurationClass":
"org.apache.camel.component.grpc.GrpcConfiguration", "configurationField":
"configuration", "description": "The X.509 certificate chain file r [...]
- "keyPassword": { "index": 27, "kind": "parameter", "displayName": "Key
Password", "group": "security", "label": "security", "required": false, "type":
"string", "javaType": "java.lang.String", "deprecated": false, "autowired":
false, "secret": true, "configurationClass":
"org.apache.camel.component.grpc.GrpcConfiguration", "configurationField":
"configuration", "description": "The PKCS#8 private key file password" },
- "keyResource": { "index": 28, "kind": "parameter", "displayName": "Key
Resource", "group": "security", "label": "security", "required": false, "type":
"string", "javaType": "java.lang.String", "deprecated": false,
"deprecationNote": "", "autowired": false, "secret": false,
"supportFileReference": true, "configurationClass":
"org.apache.camel.component.grpc.GrpcConfiguration", "configurationField":
"configuration", "description": "The PKCS#8 private key file resource in PEM
format link" },
- "negotiationType": { "index": 29, "kind": "parameter", "displayName":
"Negotiation Type", "group": "security", "label": "security", "required":
false, "type": "object", "javaType": "io.grpc.netty.NegotiationType", "enum": [
"TLS", "PLAINTEXT_UPGRADE", "PLAINTEXT" ], "deprecated": false, "autowired":
false, "secret": false, "defaultValue": "PLAINTEXT", "configurationClass":
"org.apache.camel.component.grpc.GrpcConfiguration", "configurationField":
"configuration", "description": "Iden [...]
- "serviceAccountResource": { "index": 30, "kind": "parameter",
"displayName": "Service Account Resource", "group": "security", "label":
"security", "required": false, "type": "string", "javaType":
"java.lang.String", "deprecated": false, "deprecationNote": "", "autowired":
false, "secret": false, "supportFileReference": true, "configurationClass":
"org.apache.camel.component.grpc.GrpcConfiguration", "configurationField":
"configuration", "description": "Service Account key file in JSO [...]
- "trustCertCollectionResource": { "index": 31, "kind": "parameter",
"displayName": "Trust Cert Collection Resource", "group": "security", "label":
"security", "required": false, "type": "string", "javaType":
"java.lang.String", "deprecated": false, "deprecationNote": "", "autowired":
false, "secret": false, "supportFileReference": true, "configurationClass":
"org.apache.camel.component.grpc.GrpcConfiguration", "configurationField":
"configuration", "description": "The trusted certific [...]
+ "inheritExchangePropertiesForReplies": { "index": 15, "kind": "parameter",
"displayName": "Inherit Exchange Properties For Replies", "group": "producer",
"label": "producer", "required": false, "type": "boolean", "javaType":
"boolean", "deprecated": false, "autowired": false, "secret": false,
"defaultValue": false, "configurationClass":
"org.apache.camel.component.grpc.GrpcConfiguration", "configurationField":
"configuration", "description": "Copies exchange properties from original [...]
+ "method": { "index": 16, "kind": "parameter", "displayName": "Method",
"group": "producer", "label": "producer", "required": false, "type": "string",
"javaType": "java.lang.String", "deprecated": false, "autowired": false,
"secret": false, "configurationClass":
"org.apache.camel.component.grpc.GrpcConfiguration", "configurationField":
"configuration", "description": "gRPC method name" },
+ "producerStrategy": { "index": 17, "kind": "parameter", "displayName":
"Producer Strategy", "group": "producer", "label": "producer", "required":
false, "type": "object", "javaType":
"org.apache.camel.component.grpc.GrpcProducerStrategy", "enum": [ "SIMPLE",
"STREAMING" ], "deprecated": false, "autowired": false, "secret": false,
"defaultValue": "SIMPLE", "configurationClass":
"org.apache.camel.component.grpc.GrpcConfiguration", "configurationField":
"configuration", "description": " [...]
+ "streamRepliesTo": { "index": 18, "kind": "parameter", "displayName":
"Stream Replies To", "group": "producer", "label": "producer", "required":
false, "type": "string", "javaType": "java.lang.String", "deprecated": false,
"autowired": false, "secret": false, "configurationClass":
"org.apache.camel.component.grpc.GrpcConfiguration", "configurationField":
"configuration", "description": "When using STREAMING client mode, it indicates
the endpoint where responses should be forwarded." },
+ "toRouteControlledStreamObserver": { "index": 19, "kind": "parameter",
"displayName": "To Route Controlled Stream Observer", "group": "producer",
"label": "producer", "required": false, "type": "boolean", "javaType":
"boolean", "deprecated": false, "autowired": false, "secret": false,
"defaultValue": false, "configurationClass":
"org.apache.camel.component.grpc.GrpcConfiguration", "configurationField":
"configuration", "description": "Expects that exchange property
GrpcConstants.GRPC [...]
+ "userAgent": { "index": 20, "kind": "parameter", "displayName": "User
Agent", "group": "producer", "label": "producer", "required": false, "type":
"string", "javaType": "java.lang.String", "deprecated": false, "autowired":
false, "secret": false, "configurationClass":
"org.apache.camel.component.grpc.GrpcConfiguration", "configurationField":
"configuration", "description": "The user agent header passed to the server" },
+ "lazyStartProducer": { "index": 21, "kind": "parameter", "displayName":
"Lazy Start Producer", "group": "producer (advanced)", "label":
"producer,advanced", "required": false, "type": "boolean", "javaType":
"boolean", "deprecated": false, "autowired": false, "secret": false,
"defaultValue": false, "description": "Whether the producer should be started
lazy (on the first message). By starting lazy you can use this to allow
CamelContext and routes to startup in situations where a produ [...]
+ "synchronous": { "index": 22, "kind": "parameter", "displayName":
"Synchronous", "group": "advanced", "label": "advanced", "required": false,
"type": "boolean", "javaType": "boolean", "deprecated": false, "autowired":
false, "secret": false, "defaultValue": false, "configurationClass":
"org.apache.camel.component.grpc.GrpcConfiguration", "configurationField":
"configuration", "description": "Sets whether synchronous processing should be
strictly used" },
+ "authenticationType": { "index": 23, "kind": "parameter", "displayName":
"Authentication Type", "group": "security", "label": "security", "required":
false, "type": "object", "javaType":
"org.apache.camel.component.grpc.GrpcAuthType", "enum": [ "NONE", "GOOGLE",
"JWT" ], "deprecated": false, "autowired": false, "secret": false,
"defaultValue": "NONE", "configurationClass":
"org.apache.camel.component.grpc.GrpcConfiguration", "configurationField":
"configuration", "description": "Auth [...]
+ "jwtAlgorithm": { "index": 24, "kind": "parameter", "displayName": "Jwt
Algorithm", "group": "security", "label": "security", "required": false,
"type": "object", "javaType":
"org.apache.camel.component.grpc.auth.jwt.JwtAlgorithm", "enum": [ "HMAC256",
"HMAC384", "HMAC512" ], "deprecated": false, "autowired": false, "secret":
false, "defaultValue": "HMAC256", "configurationClass":
"org.apache.camel.component.grpc.GrpcConfiguration", "configurationField":
"configuration", "description [...]
+ "jwtIssuer": { "index": 25, "kind": "parameter", "displayName": "Jwt
Issuer", "group": "security", "label": "security", "required": false, "type":
"string", "javaType": "java.lang.String", "deprecated": false, "autowired":
false, "secret": false, "configurationClass":
"org.apache.camel.component.grpc.GrpcConfiguration", "configurationField":
"configuration", "description": "JSON Web Token issuer" },
+ "jwtSecret": { "index": 26, "kind": "parameter", "displayName": "Jwt
Secret", "group": "security", "label": "security", "required": false, "type":
"string", "javaType": "java.lang.String", "deprecated": false, "autowired":
false, "secret": true, "configurationClass":
"org.apache.camel.component.grpc.GrpcConfiguration", "configurationField":
"configuration", "description": "JSON Web Token secret" },
+ "jwtSubject": { "index": 27, "kind": "parameter", "displayName": "Jwt
Subject", "group": "security", "label": "security", "required": false, "type":
"string", "javaType": "java.lang.String", "deprecated": false, "autowired":
false, "secret": false, "configurationClass":
"org.apache.camel.component.grpc.GrpcConfiguration", "configurationField":
"configuration", "description": "JSON Web Token subject" },
+ "keyCertChainResource": { "index": 28, "kind": "parameter", "displayName":
"Key Cert Chain Resource", "group": "security", "label": "security",
"required": false, "type": "string", "javaType": "java.lang.String",
"deprecated": false, "deprecationNote": "", "autowired": false, "secret":
false, "supportFileReference": true, "configurationClass":
"org.apache.camel.component.grpc.GrpcConfiguration", "configurationField":
"configuration", "description": "The X.509 certificate chain file r [...]
+ "keyPassword": { "index": 29, "kind": "parameter", "displayName": "Key
Password", "group": "security", "label": "security", "required": false, "type":
"string", "javaType": "java.lang.String", "deprecated": false, "autowired":
false, "secret": true, "configurationClass":
"org.apache.camel.component.grpc.GrpcConfiguration", "configurationField":
"configuration", "description": "The PKCS#8 private key file password" },
+ "keyResource": { "index": 30, "kind": "parameter", "displayName": "Key
Resource", "group": "security", "label": "security", "required": false, "type":
"string", "javaType": "java.lang.String", "deprecated": false,
"deprecationNote": "", "autowired": false, "secret": false,
"supportFileReference": true, "configurationClass":
"org.apache.camel.component.grpc.GrpcConfiguration", "configurationField":
"configuration", "description": "The PKCS#8 private key file resource in PEM
format link" },
+ "negotiationType": { "index": 31, "kind": "parameter", "displayName":
"Negotiation Type", "group": "security", "label": "security", "required":
false, "type": "object", "javaType": "io.grpc.netty.NegotiationType", "enum": [
"TLS", "PLAINTEXT_UPGRADE", "PLAINTEXT" ], "deprecated": false, "autowired":
false, "secret": false, "defaultValue": "PLAINTEXT", "configurationClass":
"org.apache.camel.component.grpc.GrpcConfiguration", "configurationField":
"configuration", "description": "Iden [...]
+ "serviceAccountResource": { "index": 32, "kind": "parameter",
"displayName": "Service Account Resource", "group": "security", "label":
"security", "required": false, "type": "string", "javaType":
"java.lang.String", "deprecated": false, "deprecationNote": "", "autowired":
false, "secret": false, "supportFileReference": true, "configurationClass":
"org.apache.camel.component.grpc.GrpcConfiguration", "configurationField":
"configuration", "description": "Service Account key file in JSO [...]
+ "trustCertCollectionResource": { "index": 33, "kind": "parameter",
"displayName": "Trust Cert Collection Resource", "group": "security", "label":
"security", "required": false, "type": "string", "javaType":
"java.lang.String", "deprecated": false, "deprecationNote": "", "autowired":
false, "secret": false, "supportFileReference": true, "configurationClass":
"org.apache.camel.component.grpc.GrpcConfiguration", "configurationField":
"configuration", "description": "The trusted certific [...]
}
}
diff --git
a/components/camel-grpc/src/generated/java/org/apache/camel/component/grpc/GrpcEndpointConfigurer.java
b/components/camel-grpc/src/generated/java/org/apache/camel/component/grpc/GrpcEndpointConfigurer.java
index 9a849bc6836..c982e98b20c 100644
---
a/components/camel-grpc/src/generated/java/org/apache/camel/component/grpc/GrpcEndpointConfigurer.java
+++
b/components/camel-grpc/src/generated/java/org/apache/camel/component/grpc/GrpcEndpointConfigurer.java
@@ -41,6 +41,8 @@ public class GrpcEndpointConfigurer extends
PropertyConfigurerSupport implements
case "forwardOnCompleted":
target.getConfiguration().setForwardOnCompleted(property(camelContext,
boolean.class, value)); return true;
case "forwardonerror":
case "forwardOnError":
target.getConfiguration().setForwardOnError(property(camelContext,
boolean.class, value)); return true;
+ case "inheritexchangepropertiesforreplies":
+ case "inheritExchangePropertiesForReplies":
target.getConfiguration().setInheritExchangePropertiesForReplies(property(camelContext,
boolean.class, value)); return true;
case "jwtalgorithm":
case "jwtAlgorithm":
target.getConfiguration().setJwtAlgorithm(property(camelContext,
org.apache.camel.component.grpc.auth.jwt.JwtAlgorithm.class, value)); return
true;
case "jwtissuer":
@@ -73,6 +75,8 @@ public class GrpcEndpointConfigurer extends
PropertyConfigurerSupport implements
case "streamrepliesto":
case "streamRepliesTo":
target.getConfiguration().setStreamRepliesTo(property(camelContext,
java.lang.String.class, value)); return true;
case "synchronous":
target.getConfiguration().setSynchronous(property(camelContext, boolean.class,
value)); return true;
+ case "toroutecontrolledstreamobserver":
+ case "toRouteControlledStreamObserver":
target.getConfiguration().setToRouteControlledStreamObserver(property(camelContext,
boolean.class, value)); return true;
case "trustcertcollectionresource":
case "trustCertCollectionResource":
target.getConfiguration().setTrustCertCollectionResource(property(camelContext,
java.lang.String.class, value)); return true;
case "useragent":
@@ -104,6 +108,8 @@ public class GrpcEndpointConfigurer extends
PropertyConfigurerSupport implements
case "forwardOnCompleted": return boolean.class;
case "forwardonerror":
case "forwardOnError": return boolean.class;
+ case "inheritexchangepropertiesforreplies":
+ case "inheritExchangePropertiesForReplies": return boolean.class;
case "jwtalgorithm":
case "jwtAlgorithm": return
org.apache.camel.component.grpc.auth.jwt.JwtAlgorithm.class;
case "jwtissuer":
@@ -136,6 +142,8 @@ public class GrpcEndpointConfigurer extends
PropertyConfigurerSupport implements
case "streamrepliesto":
case "streamRepliesTo": return java.lang.String.class;
case "synchronous": return boolean.class;
+ case "toroutecontrolledstreamobserver":
+ case "toRouteControlledStreamObserver": return boolean.class;
case "trustcertcollectionresource":
case "trustCertCollectionResource": return java.lang.String.class;
case "useragent":
@@ -168,6 +176,8 @@ public class GrpcEndpointConfigurer extends
PropertyConfigurerSupport implements
case "forwardOnCompleted": return
target.getConfiguration().isForwardOnCompleted();
case "forwardonerror":
case "forwardOnError": return
target.getConfiguration().isForwardOnError();
+ case "inheritexchangepropertiesforreplies":
+ case "inheritExchangePropertiesForReplies": return
target.getConfiguration().isInheritExchangePropertiesForReplies();
case "jwtalgorithm":
case "jwtAlgorithm": return
target.getConfiguration().getJwtAlgorithm();
case "jwtissuer":
@@ -200,6 +210,8 @@ public class GrpcEndpointConfigurer extends
PropertyConfigurerSupport implements
case "streamrepliesto":
case "streamRepliesTo": return
target.getConfiguration().getStreamRepliesTo();
case "synchronous": return target.getConfiguration().isSynchronous();
+ case "toroutecontrolledstreamobserver":
+ case "toRouteControlledStreamObserver": return
target.getConfiguration().isToRouteControlledStreamObserver();
case "trustcertcollectionresource":
case "trustCertCollectionResource": return
target.getConfiguration().getTrustCertCollectionResource();
case "useragent":
diff --git
a/components/camel-grpc/src/generated/java/org/apache/camel/component/grpc/GrpcEndpointUriFactory.java
b/components/camel-grpc/src/generated/java/org/apache/camel/component/grpc/GrpcEndpointUriFactory.java
index d4350e5c7cc..fe990fa1072 100644
---
a/components/camel-grpc/src/generated/java/org/apache/camel/component/grpc/GrpcEndpointUriFactory.java
+++
b/components/camel-grpc/src/generated/java/org/apache/camel/component/grpc/GrpcEndpointUriFactory.java
@@ -21,7 +21,7 @@ public class GrpcEndpointUriFactory extends
org.apache.camel.support.component.E
private static final Set<String> SECRET_PROPERTY_NAMES;
private static final Set<String> MULTI_VALUE_PREFIXES;
static {
- Set<String> props = new HashSet<>(32);
+ Set<String> props = new HashSet<>(34);
props.add("authenticationType");
props.add("autoDiscoverClientInterceptors");
props.add("autoDiscoverServerInterceptors");
@@ -33,6 +33,7 @@ public class GrpcEndpointUriFactory extends
org.apache.camel.support.component.E
props.add("forwardOnCompleted");
props.add("forwardOnError");
props.add("host");
+ props.add("inheritExchangePropertiesForReplies");
props.add("jwtAlgorithm");
props.add("jwtIssuer");
props.add("jwtSecret");
@@ -52,6 +53,7 @@ public class GrpcEndpointUriFactory extends
org.apache.camel.support.component.E
props.add("serviceAccountResource");
props.add("streamRepliesTo");
props.add("synchronous");
+ props.add("toRouteControlledStreamObserver");
props.add("trustCertCollectionResource");
props.add("userAgent");
PROPERTY_NAMES = Collections.unmodifiableSet(props);
diff --git
a/components/camel-grpc/src/generated/resources/org/apache/camel/component/grpc/grpc.json
b/components/camel-grpc/src/generated/resources/org/apache/camel/component/grpc/grpc.json
index bec29142995..a58d2e33112 100644
---
a/components/camel-grpc/src/generated/resources/org/apache/camel/component/grpc/grpc.json
+++
b/components/camel-grpc/src/generated/resources/org/apache/camel/component/grpc/grpc.json
@@ -38,7 +38,7 @@
"flowControlWindow": { "index": 3, "kind": "parameter", "displayName":
"Flow Control Window", "group": "common", "label": "", "required": false,
"type": "integer", "javaType": "int", "deprecated": false, "autowired": false,
"secret": false, "defaultValue": 1048576, "configurationClass":
"org.apache.camel.component.grpc.GrpcConfiguration", "configurationField":
"configuration", "description": "The HTTP\/2 flow control window size (MiB)" },
"maxMessageSize": { "index": 4, "kind": "parameter", "displayName": "Max
Message Size", "group": "common", "label": "", "required": false, "type":
"integer", "javaType": "int", "deprecated": false, "autowired": false,
"secret": false, "defaultValue": 4194304, "configurationClass":
"org.apache.camel.component.grpc.GrpcConfiguration", "configurationField":
"configuration", "description": "The maximum message size allowed to be
received\/sent (MiB)" },
"autoDiscoverServerInterceptors": { "index": 5, "kind": "parameter",
"displayName": "Auto Discover Server Interceptors", "group": "consumer",
"label": "consumer", "required": false, "type": "boolean", "javaType":
"boolean", "deprecated": false, "autowired": false, "secret": false,
"defaultValue": true, "configurationClass":
"org.apache.camel.component.grpc.GrpcConfiguration", "configurationField":
"configuration", "description": "Setting the autoDiscoverServerInterceptors
mechanism, [...]
- "consumerStrategy": { "index": 6, "kind": "parameter", "displayName":
"Consumer Strategy", "group": "consumer", "label": "consumer", "required":
false, "type": "object", "javaType":
"org.apache.camel.component.grpc.GrpcConsumerStrategy", "enum": [
"AGGREGATION", "PROPAGATION" ], "deprecated": false, "autowired": false,
"secret": false, "defaultValue": "PROPAGATION", "configurationClass":
"org.apache.camel.component.grpc.GrpcConfiguration", "configurationField":
"configuration", "desc [...]
+ "consumerStrategy": { "index": 6, "kind": "parameter", "displayName":
"Consumer Strategy", "group": "consumer", "label": "consumer", "required":
false, "type": "object", "javaType":
"org.apache.camel.component.grpc.GrpcConsumerStrategy", "enum": [
"AGGREGATION", "PROPAGATION", "DELEGATION" ], "deprecated": false, "autowired":
false, "secret": false, "defaultValue": "PROPAGATION", "configurationClass":
"org.apache.camel.component.grpc.GrpcConfiguration", "configurationField":
"configu [...]
"forwardOnCompleted": { "index": 7, "kind": "parameter", "displayName":
"Forward On Completed", "group": "consumer", "label": "consumer", "required":
false, "type": "boolean", "javaType": "boolean", "deprecated": false,
"autowired": false, "secret": false, "defaultValue": false,
"configurationClass": "org.apache.camel.component.grpc.GrpcConfiguration",
"configurationField": "configuration", "description": "Determines if
onCompleted events should be pushed to the Camel route." },
"forwardOnError": { "index": 8, "kind": "parameter", "displayName":
"Forward On Error", "group": "consumer", "label": "consumer", "required":
false, "type": "boolean", "javaType": "boolean", "deprecated": false,
"autowired": false, "secret": false, "defaultValue": false,
"configurationClass": "org.apache.camel.component.grpc.GrpcConfiguration",
"configurationField": "configuration", "description": "Determines if onError
events should be pushed to the Camel route. Exceptions will be s [...]
"maxConcurrentCallsPerConnection": { "index": 9, "kind": "parameter",
"displayName": "Max Concurrent Calls Per Connection", "group": "consumer",
"label": "consumer", "required": false, "type": "integer", "javaType": "int",
"deprecated": false, "autowired": false, "secret": false, "defaultValue":
2147483647, "configurationClass":
"org.apache.camel.component.grpc.GrpcConfiguration", "configurationField":
"configuration", "description": "The maximum number of concurrent calls
permitted [...]
@@ -47,22 +47,24 @@
"exceptionHandler": { "index": 12, "kind": "parameter", "displayName":
"Exception Handler", "group": "consumer (advanced)", "label":
"consumer,advanced", "required": false, "type": "object", "javaType":
"org.apache.camel.spi.ExceptionHandler", "optionalPrefix": "consumer.",
"deprecated": false, "autowired": false, "secret": false, "description": "To
let the consumer use a custom ExceptionHandler. Notice if the option
bridgeErrorHandler is enabled then this option is not in use. By de [...]
"exchangePattern": { "index": 13, "kind": "parameter", "displayName":
"Exchange Pattern", "group": "consumer (advanced)", "label":
"consumer,advanced", "required": false, "type": "object", "javaType":
"org.apache.camel.ExchangePattern", "enum": [ "InOnly", "InOut" ],
"deprecated": false, "autowired": false, "secret": false, "description": "Sets
the exchange pattern when the consumer creates an exchange." },
"autoDiscoverClientInterceptors": { "index": 14, "kind": "parameter",
"displayName": "Auto Discover Client Interceptors", "group": "producer",
"label": "producer", "required": false, "type": "boolean", "javaType":
"boolean", "deprecated": false, "autowired": false, "secret": false,
"defaultValue": true, "configurationClass":
"org.apache.camel.component.grpc.GrpcConfiguration", "configurationField":
"configuration", "description": "Setting the autoDiscoverClientInterceptors
mechanism, [...]
- "method": { "index": 15, "kind": "parameter", "displayName": "Method",
"group": "producer", "label": "producer", "required": false, "type": "string",
"javaType": "java.lang.String", "deprecated": false, "autowired": false,
"secret": false, "configurationClass":
"org.apache.camel.component.grpc.GrpcConfiguration", "configurationField":
"configuration", "description": "gRPC method name" },
- "producerStrategy": { "index": 16, "kind": "parameter", "displayName":
"Producer Strategy", "group": "producer", "label": "producer", "required":
false, "type": "object", "javaType":
"org.apache.camel.component.grpc.GrpcProducerStrategy", "enum": [ "SIMPLE",
"STREAMING" ], "deprecated": false, "autowired": false, "secret": false,
"defaultValue": "SIMPLE", "configurationClass":
"org.apache.camel.component.grpc.GrpcConfiguration", "configurationField":
"configuration", "description": " [...]
- "streamRepliesTo": { "index": 17, "kind": "parameter", "displayName":
"Stream Replies To", "group": "producer", "label": "producer", "required":
false, "type": "string", "javaType": "java.lang.String", "deprecated": false,
"autowired": false, "secret": false, "configurationClass":
"org.apache.camel.component.grpc.GrpcConfiguration", "configurationField":
"configuration", "description": "When using STREAMING client mode, it indicates
the endpoint where responses should be forwarded." },
- "userAgent": { "index": 18, "kind": "parameter", "displayName": "User
Agent", "group": "producer", "label": "producer", "required": false, "type":
"string", "javaType": "java.lang.String", "deprecated": false, "autowired":
false, "secret": false, "configurationClass":
"org.apache.camel.component.grpc.GrpcConfiguration", "configurationField":
"configuration", "description": "The user agent header passed to the server" },
- "lazyStartProducer": { "index": 19, "kind": "parameter", "displayName":
"Lazy Start Producer", "group": "producer (advanced)", "label":
"producer,advanced", "required": false, "type": "boolean", "javaType":
"boolean", "deprecated": false, "autowired": false, "secret": false,
"defaultValue": false, "description": "Whether the producer should be started
lazy (on the first message). By starting lazy you can use this to allow
CamelContext and routes to startup in situations where a produ [...]
- "synchronous": { "index": 20, "kind": "parameter", "displayName":
"Synchronous", "group": "advanced", "label": "advanced", "required": false,
"type": "boolean", "javaType": "boolean", "deprecated": false, "autowired":
false, "secret": false, "defaultValue": false, "configurationClass":
"org.apache.camel.component.grpc.GrpcConfiguration", "configurationField":
"configuration", "description": "Sets whether synchronous processing should be
strictly used" },
- "authenticationType": { "index": 21, "kind": "parameter", "displayName":
"Authentication Type", "group": "security", "label": "security", "required":
false, "type": "object", "javaType":
"org.apache.camel.component.grpc.GrpcAuthType", "enum": [ "NONE", "GOOGLE",
"JWT" ], "deprecated": false, "autowired": false, "secret": false,
"defaultValue": "NONE", "configurationClass":
"org.apache.camel.component.grpc.GrpcConfiguration", "configurationField":
"configuration", "description": "Auth [...]
- "jwtAlgorithm": { "index": 22, "kind": "parameter", "displayName": "Jwt
Algorithm", "group": "security", "label": "security", "required": false,
"type": "object", "javaType":
"org.apache.camel.component.grpc.auth.jwt.JwtAlgorithm", "enum": [ "HMAC256",
"HMAC384", "HMAC512" ], "deprecated": false, "autowired": false, "secret":
false, "defaultValue": "HMAC256", "configurationClass":
"org.apache.camel.component.grpc.GrpcConfiguration", "configurationField":
"configuration", "description [...]
- "jwtIssuer": { "index": 23, "kind": "parameter", "displayName": "Jwt
Issuer", "group": "security", "label": "security", "required": false, "type":
"string", "javaType": "java.lang.String", "deprecated": false, "autowired":
false, "secret": false, "configurationClass":
"org.apache.camel.component.grpc.GrpcConfiguration", "configurationField":
"configuration", "description": "JSON Web Token issuer" },
- "jwtSecret": { "index": 24, "kind": "parameter", "displayName": "Jwt
Secret", "group": "security", "label": "security", "required": false, "type":
"string", "javaType": "java.lang.String", "deprecated": false, "autowired":
false, "secret": true, "configurationClass":
"org.apache.camel.component.grpc.GrpcConfiguration", "configurationField":
"configuration", "description": "JSON Web Token secret" },
- "jwtSubject": { "index": 25, "kind": "parameter", "displayName": "Jwt
Subject", "group": "security", "label": "security", "required": false, "type":
"string", "javaType": "java.lang.String", "deprecated": false, "autowired":
false, "secret": false, "configurationClass":
"org.apache.camel.component.grpc.GrpcConfiguration", "configurationField":
"configuration", "description": "JSON Web Token subject" },
- "keyCertChainResource": { "index": 26, "kind": "parameter", "displayName":
"Key Cert Chain Resource", "group": "security", "label": "security",
"required": false, "type": "string", "javaType": "java.lang.String",
"deprecated": false, "deprecationNote": "", "autowired": false, "secret":
false, "supportFileReference": true, "configurationClass":
"org.apache.camel.component.grpc.GrpcConfiguration", "configurationField":
"configuration", "description": "The X.509 certificate chain file r [...]
- "keyPassword": { "index": 27, "kind": "parameter", "displayName": "Key
Password", "group": "security", "label": "security", "required": false, "type":
"string", "javaType": "java.lang.String", "deprecated": false, "autowired":
false, "secret": true, "configurationClass":
"org.apache.camel.component.grpc.GrpcConfiguration", "configurationField":
"configuration", "description": "The PKCS#8 private key file password" },
- "keyResource": { "index": 28, "kind": "parameter", "displayName": "Key
Resource", "group": "security", "label": "security", "required": false, "type":
"string", "javaType": "java.lang.String", "deprecated": false,
"deprecationNote": "", "autowired": false, "secret": false,
"supportFileReference": true, "configurationClass":
"org.apache.camel.component.grpc.GrpcConfiguration", "configurationField":
"configuration", "description": "The PKCS#8 private key file resource in PEM
format link" },
- "negotiationType": { "index": 29, "kind": "parameter", "displayName":
"Negotiation Type", "group": "security", "label": "security", "required":
false, "type": "object", "javaType": "io.grpc.netty.NegotiationType", "enum": [
"TLS", "PLAINTEXT_UPGRADE", "PLAINTEXT" ], "deprecated": false, "autowired":
false, "secret": false, "defaultValue": "PLAINTEXT", "configurationClass":
"org.apache.camel.component.grpc.GrpcConfiguration", "configurationField":
"configuration", "description": "Iden [...]
- "serviceAccountResource": { "index": 30, "kind": "parameter",
"displayName": "Service Account Resource", "group": "security", "label":
"security", "required": false, "type": "string", "javaType":
"java.lang.String", "deprecated": false, "deprecationNote": "", "autowired":
false, "secret": false, "supportFileReference": true, "configurationClass":
"org.apache.camel.component.grpc.GrpcConfiguration", "configurationField":
"configuration", "description": "Service Account key file in JSO [...]
- "trustCertCollectionResource": { "index": 31, "kind": "parameter",
"displayName": "Trust Cert Collection Resource", "group": "security", "label":
"security", "required": false, "type": "string", "javaType":
"java.lang.String", "deprecated": false, "deprecationNote": "", "autowired":
false, "secret": false, "supportFileReference": true, "configurationClass":
"org.apache.camel.component.grpc.GrpcConfiguration", "configurationField":
"configuration", "description": "The trusted certific [...]
+ "inheritExchangePropertiesForReplies": { "index": 15, "kind": "parameter",
"displayName": "Inherit Exchange Properties For Replies", "group": "producer",
"label": "producer", "required": false, "type": "boolean", "javaType":
"boolean", "deprecated": false, "autowired": false, "secret": false,
"defaultValue": false, "configurationClass":
"org.apache.camel.component.grpc.GrpcConfiguration", "configurationField":
"configuration", "description": "Copies exchange properties from original [...]
+ "method": { "index": 16, "kind": "parameter", "displayName": "Method",
"group": "producer", "label": "producer", "required": false, "type": "string",
"javaType": "java.lang.String", "deprecated": false, "autowired": false,
"secret": false, "configurationClass":
"org.apache.camel.component.grpc.GrpcConfiguration", "configurationField":
"configuration", "description": "gRPC method name" },
+ "producerStrategy": { "index": 17, "kind": "parameter", "displayName":
"Producer Strategy", "group": "producer", "label": "producer", "required":
false, "type": "object", "javaType":
"org.apache.camel.component.grpc.GrpcProducerStrategy", "enum": [ "SIMPLE",
"STREAMING" ], "deprecated": false, "autowired": false, "secret": false,
"defaultValue": "SIMPLE", "configurationClass":
"org.apache.camel.component.grpc.GrpcConfiguration", "configurationField":
"configuration", "description": " [...]
+ "streamRepliesTo": { "index": 18, "kind": "parameter", "displayName":
"Stream Replies To", "group": "producer", "label": "producer", "required":
false, "type": "string", "javaType": "java.lang.String", "deprecated": false,
"autowired": false, "secret": false, "configurationClass":
"org.apache.camel.component.grpc.GrpcConfiguration", "configurationField":
"configuration", "description": "When using STREAMING client mode, it indicates
the endpoint where responses should be forwarded." },
+ "toRouteControlledStreamObserver": { "index": 19, "kind": "parameter",
"displayName": "To Route Controlled Stream Observer", "group": "producer",
"label": "producer", "required": false, "type": "boolean", "javaType":
"boolean", "deprecated": false, "autowired": false, "secret": false,
"defaultValue": false, "configurationClass":
"org.apache.camel.component.grpc.GrpcConfiguration", "configurationField":
"configuration", "description": "Expects that exchange property
GrpcConstants.GRPC [...]
+ "userAgent": { "index": 20, "kind": "parameter", "displayName": "User
Agent", "group": "producer", "label": "producer", "required": false, "type":
"string", "javaType": "java.lang.String", "deprecated": false, "autowired":
false, "secret": false, "configurationClass":
"org.apache.camel.component.grpc.GrpcConfiguration", "configurationField":
"configuration", "description": "The user agent header passed to the server" },
+ "lazyStartProducer": { "index": 21, "kind": "parameter", "displayName":
"Lazy Start Producer", "group": "producer (advanced)", "label":
"producer,advanced", "required": false, "type": "boolean", "javaType":
"boolean", "deprecated": false, "autowired": false, "secret": false,
"defaultValue": false, "description": "Whether the producer should be started
lazy (on the first message). By starting lazy you can use this to allow
CamelContext and routes to startup in situations where a produ [...]
+ "synchronous": { "index": 22, "kind": "parameter", "displayName":
"Synchronous", "group": "advanced", "label": "advanced", "required": false,
"type": "boolean", "javaType": "boolean", "deprecated": false, "autowired":
false, "secret": false, "defaultValue": false, "configurationClass":
"org.apache.camel.component.grpc.GrpcConfiguration", "configurationField":
"configuration", "description": "Sets whether synchronous processing should be
strictly used" },
+ "authenticationType": { "index": 23, "kind": "parameter", "displayName":
"Authentication Type", "group": "security", "label": "security", "required":
false, "type": "object", "javaType":
"org.apache.camel.component.grpc.GrpcAuthType", "enum": [ "NONE", "GOOGLE",
"JWT" ], "deprecated": false, "autowired": false, "secret": false,
"defaultValue": "NONE", "configurationClass":
"org.apache.camel.component.grpc.GrpcConfiguration", "configurationField":
"configuration", "description": "Auth [...]
+ "jwtAlgorithm": { "index": 24, "kind": "parameter", "displayName": "Jwt
Algorithm", "group": "security", "label": "security", "required": false,
"type": "object", "javaType":
"org.apache.camel.component.grpc.auth.jwt.JwtAlgorithm", "enum": [ "HMAC256",
"HMAC384", "HMAC512" ], "deprecated": false, "autowired": false, "secret":
false, "defaultValue": "HMAC256", "configurationClass":
"org.apache.camel.component.grpc.GrpcConfiguration", "configurationField":
"configuration", "description [...]
+ "jwtIssuer": { "index": 25, "kind": "parameter", "displayName": "Jwt
Issuer", "group": "security", "label": "security", "required": false, "type":
"string", "javaType": "java.lang.String", "deprecated": false, "autowired":
false, "secret": false, "configurationClass":
"org.apache.camel.component.grpc.GrpcConfiguration", "configurationField":
"configuration", "description": "JSON Web Token issuer" },
+ "jwtSecret": { "index": 26, "kind": "parameter", "displayName": "Jwt
Secret", "group": "security", "label": "security", "required": false, "type":
"string", "javaType": "java.lang.String", "deprecated": false, "autowired":
false, "secret": true, "configurationClass":
"org.apache.camel.component.grpc.GrpcConfiguration", "configurationField":
"configuration", "description": "JSON Web Token secret" },
+ "jwtSubject": { "index": 27, "kind": "parameter", "displayName": "Jwt
Subject", "group": "security", "label": "security", "required": false, "type":
"string", "javaType": "java.lang.String", "deprecated": false, "autowired":
false, "secret": false, "configurationClass":
"org.apache.camel.component.grpc.GrpcConfiguration", "configurationField":
"configuration", "description": "JSON Web Token subject" },
+ "keyCertChainResource": { "index": 28, "kind": "parameter", "displayName":
"Key Cert Chain Resource", "group": "security", "label": "security",
"required": false, "type": "string", "javaType": "java.lang.String",
"deprecated": false, "deprecationNote": "", "autowired": false, "secret":
false, "supportFileReference": true, "configurationClass":
"org.apache.camel.component.grpc.GrpcConfiguration", "configurationField":
"configuration", "description": "The X.509 certificate chain file r [...]
+ "keyPassword": { "index": 29, "kind": "parameter", "displayName": "Key
Password", "group": "security", "label": "security", "required": false, "type":
"string", "javaType": "java.lang.String", "deprecated": false, "autowired":
false, "secret": true, "configurationClass":
"org.apache.camel.component.grpc.GrpcConfiguration", "configurationField":
"configuration", "description": "The PKCS#8 private key file password" },
+ "keyResource": { "index": 30, "kind": "parameter", "displayName": "Key
Resource", "group": "security", "label": "security", "required": false, "type":
"string", "javaType": "java.lang.String", "deprecated": false,
"deprecationNote": "", "autowired": false, "secret": false,
"supportFileReference": true, "configurationClass":
"org.apache.camel.component.grpc.GrpcConfiguration", "configurationField":
"configuration", "description": "The PKCS#8 private key file resource in PEM
format link" },
+ "negotiationType": { "index": 31, "kind": "parameter", "displayName":
"Negotiation Type", "group": "security", "label": "security", "required":
false, "type": "object", "javaType": "io.grpc.netty.NegotiationType", "enum": [
"TLS", "PLAINTEXT_UPGRADE", "PLAINTEXT" ], "deprecated": false, "autowired":
false, "secret": false, "defaultValue": "PLAINTEXT", "configurationClass":
"org.apache.camel.component.grpc.GrpcConfiguration", "configurationField":
"configuration", "description": "Iden [...]
+ "serviceAccountResource": { "index": 32, "kind": "parameter",
"displayName": "Service Account Resource", "group": "security", "label":
"security", "required": false, "type": "string", "javaType":
"java.lang.String", "deprecated": false, "deprecationNote": "", "autowired":
false, "secret": false, "supportFileReference": true, "configurationClass":
"org.apache.camel.component.grpc.GrpcConfiguration", "configurationField":
"configuration", "description": "Service Account key file in JSO [...]
+ "trustCertCollectionResource": { "index": 33, "kind": "parameter",
"displayName": "Trust Cert Collection Resource", "group": "security", "label":
"security", "required": false, "type": "string", "javaType":
"java.lang.String", "deprecated": false, "deprecationNote": "", "autowired":
false, "secret": false, "supportFileReference": true, "configurationClass":
"org.apache.camel.component.grpc.GrpcConfiguration", "configurationField":
"configuration", "description": "The trusted certific [...]
}
}
diff --git a/components/camel-grpc/src/main/docs/grpc-component.adoc
b/components/camel-grpc/src/main/docs/grpc-component.adoc
index 40ec083bdce..0492b5ce5de 100644
--- a/components/camel-grpc/src/main/docs/grpc-component.adoc
+++ b/components/camel-grpc/src/main/docs/grpc-component.adoc
@@ -102,6 +102,91 @@ The table below shows the types of objects in the message
body, depending on the
|=======================================================================
+== gRPC Proxy
+
+It is not possible to create a universal proxy route for all methods, so you
need to divide your gRPC service into several services by method type: unary,
server streaming, client streaming and bidirectional streaming.
+
+=== Unary
+
+For unary requests it is enough to write the following code:
+
+[source,java]
+----
+from("grpc://localhost:1101" +
+ "/org.apache.camel.component.grpc.PingPong"
+)
+ .toD("grpc://remotehost:1101" +
+ "/org.apache.camel.component.grpc.PingPong" +
+ "?method=${header.CamelGrpcMethodName}"
+ )
+----
+
+=== Server streaming
+
+Server streaming may be done with the same approach as unary, but in that
configuration the Camel route will wait for the stream to complete and will
aggregate all responses into a list before sending that data as the response
stream. If this behavior is unacceptable, you need to apply a number of options:
+
+1. Set `routeControlledStreamObserver=true` for consumer. Later it will be
used to publish responses;
+2. Set `streamRepliesTo` option for producer to handle streaming nature of
responses;
+3. Set forwarding of `onError` and `onCompleted` for producer;
+4. Set `inheritExchangePropertiesForReplies=true` to inherit `StreamObserver`
obtained on the first step;
+5. Create another route to process streamed data. That route must contain
a gRPC producer step whose only parameter is
`toRouteControlledStreamObserver=true`; it will publish incoming exchanges as
response stream elements.
+
+Example:
+
+[source,java]
+----
+from("grpc://localhost:1101" +
+ "/org.apache.camel.component.grpc.PingPong" +
+ "?routeControlledStreamObserver=true"
+)
+ .toD("grpc://remotehost:1101" +
+ "/org.apache.camel.component.grpc.PingPong" +
+ "?method=${header.CamelGrpcMethodName}" +
+ "&streamRepliesTo=direct:next" +
+ "&forwardOnError=true" +
+ "&forwardOnCompleted=true" +
+ "&inheritExchangePropertiesForReplies=true"
+ );
+
+from("direct:next")
+ .to("grpc://dummy:0/?toRouteControlledStreamObserver=true");
+----
+
+=== Client streaming and bidirectional streaming
+
+Both client streaming and bidirectional streaming gRPC methods expose
`StreamObserver` as the response handler, so you need the same technique as
described in the server streaming section -- all 5 steps.
+
+But there is another thing -- requests also come in streaming mode. So you need
the following:
+
+1. Set consumer strategy to DELEGATION -- it differs from the default
PROPAGATION option in that the consumer will not produce responses at all.
If you set PROPAGATION, then you will receive more responses than you expected;
+2. Forward `onError` and `onCompleted` on consumer;
+3. Set producer strategy to STREAMING.
+
+Example:
+
+[source,java]
+----
+from("grpc://localhost:1101" +
+ "/org.apache.camel.component.grpc.PingPong" +
+ "?routeControlledStreamObserver=true" +
+ "&consumerStrategy=DELEGATION" +
+ "&forwardOnError=true" +
+ "&forwardOnCompleted=true"
+)
+ .toD("grpc://remotehost:1101" +
+ "/org.apache.camel.component.grpc.PingPong" +
+ "?method=${header.CamelGrpcMethodName}" +
+ "&producerStrategy=STREAMING" +
+ "&streamRepliesTo=direct:next" +
+ "&forwardOnError=true" +
+ "&forwardOnCompleted=true" +
+ "&inheritExchangePropertiesForReplies=true"
+ );
+
+from("direct:next")
+ .to("grpc://dummy:0/?toRouteControlledStreamObserver=true");
+----
+
== Examples
Below is a simple synchronous method invoke with host and port parameters
diff --git
a/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcConfiguration.java
b/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcConfiguration.java
index 5b46fb6dd8d..098759b145e 100644
---
a/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcConfiguration.java
+++
b/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcConfiguration.java
@@ -130,6 +130,14 @@ public class GrpcConfiguration {
description = "Sets whether synchronous processing should be
strictly used")
private boolean synchronous;
+ @UriParam(defaultValue = "false", label = "producer",
+ description = "Copies exchange properties from original exchange
to all exchanges created for route defined by streamRepliesTo.")
+ private boolean inheritExchangePropertiesForReplies = false;
+
+ @UriParam(defaultValue = "false", label = "producer",
+ description = "Expects that exchange property
GrpcConstants.GRPC_RESPONSE_OBSERVER is set. Takes its value and calls onNext,
onError and onComplete on that StreamObserver. All other gRPC parameters are
ignored.")
+ private boolean toRouteControlledStreamObserver = false;
+
/**
* Fully qualified service name from the protocol buffer descriptor file
(package dot service definition name)
*/
@@ -300,7 +308,10 @@ public class GrpcConfiguration {
* This option specifies the top-level strategy for processing service
requests and responses in streaming mode. If
* an aggregation strategy is selected, all requests will be accumulated
in the list, then transferred to the flow,
* and the accumulated responses will be sent to the sender. If a
propagation strategy is selected, request is sent
- * to the stream, and the response will be immediately sent back to the
sender.
+ * to the stream, and the response will be immediately sent back to the
sender. If a delegation strategy is
+ * selected, request is sent to the stream, but no response is generated
under the assumption that all necessary
+ * responses will be sent from another part of the route. Delegation strategy
always comes with
+ * routeControlledStreamObserver=true to be able to achieve the assumption.
*/
public GrpcConsumerStrategy getConsumerStrategy() {
return consumerStrategy;
@@ -470,6 +481,22 @@ public class GrpcConfiguration {
this.synchronous = synchronous;
}
+ public boolean isInheritExchangePropertiesForReplies() {
+ return inheritExchangePropertiesForReplies;
+ }
+
+ public void setInheritExchangePropertiesForReplies(boolean
inheritExchangePropertiesForReplies) {
+ this.inheritExchangePropertiesForReplies =
inheritExchangePropertiesForReplies;
+ }
+
+ public boolean isToRouteControlledStreamObserver() {
+ return toRouteControlledStreamObserver;
+ }
+
+ public void setToRouteControlledStreamObserver(boolean
toRouteControlledStreamObserver) {
+ this.toRouteControlledStreamObserver = toRouteControlledStreamObserver;
+ }
+
public void parseURI(URI uri) {
setHost(uri.getHost());
diff --git
a/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcConsumer.java
b/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcConsumer.java
index 38ecabea11a..d41ef7a1756 100644
---
a/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcConsumer.java
+++
b/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcConsumer.java
@@ -104,7 +104,7 @@ public class GrpcConsumer extends DefaultConsumer {
if (configuration.isRouteControlledStreamObserver()
&& configuration.getConsumerStrategy() ==
GrpcConsumerStrategy.AGGREGATION) {
throw new IllegalArgumentException(
- "Consumer strategy AGGREGATION and
routeControlledStreamObserver are not compatible. Set the consumer strategy to
PROPAGATION");
+ "Consumer strategy AGGREGATION and
routeControlledStreamObserver are not compatible. Set the consumer strategy to
PROPAGATION or DELEGATION");
}
if (configuration.getNegotiationType() == NegotiationType.TLS) {
diff --git
a/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcConsumerStrategy.java
b/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcConsumerStrategy.java
index 65a4e3d82b0..44fc4b8ee6d 100644
---
a/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcConsumerStrategy.java
+++
b/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcConsumerStrategy.java
@@ -29,6 +29,8 @@ public enum GrpcConsumerStrategy {
/**
* Process each streaming element of a request independently.
*/
- PROPAGATION;
+ PROPAGATION,
+
+ DELEGATION;
}
diff --git
a/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcEndpoint.java
b/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcEndpoint.java
index e293f0b2280..aa0fd8d0ad1 100644
---
a/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcEndpoint.java
+++
b/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcEndpoint.java
@@ -49,6 +49,9 @@ public class GrpcEndpoint extends DefaultEndpoint {
@Override
public Producer createProducer() throws Exception {
+ if (configuration.isToRouteControlledStreamObserver()) {
+ return new GrpcProducerToRouteControlledStreamObserver(this);
+ }
GrpcProducer producer = new GrpcProducer(this, configuration);
if (configuration.isSynchronous()) {
return new SynchronousDelegateProducer(producer);
diff --git
a/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcProducer.java
b/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcProducer.java
index eef7d85ecb5..3290221debd 100644
---
a/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcProducer.java
+++
b/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcProducer.java
@@ -30,10 +30,7 @@ import org.apache.camel.AsyncCallback;
import org.apache.camel.Exchange;
import org.apache.camel.component.grpc.auth.jwt.JwtCallCredentials;
import org.apache.camel.component.grpc.auth.jwt.JwtHelper;
-import org.apache.camel.component.grpc.client.GrpcExchangeForwarder;
-import org.apache.camel.component.grpc.client.GrpcExchangeForwarderFactory;
-import
org.apache.camel.component.grpc.client.GrpcResponseAggregationStreamObserver;
-import org.apache.camel.component.grpc.client.GrpcResponseRouterStreamObserver;
+import org.apache.camel.component.grpc.client.*;
import org.apache.camel.support.DefaultAsyncProducer;
import org.apache.camel.support.ResourceHelper;
import org.apache.camel.support.service.ServiceHelper;
@@ -53,7 +50,7 @@ public class GrpcProducer extends DefaultAsyncProducer {
private ManagedChannel channel;
private Object grpcStub;
private GrpcExchangeForwarder forwarder;
- private StreamObserver<Object> globalResponseObserver;
+ private GrpcStreamObserverFactory streamObserverFactory;
public GrpcProducer(GrpcEndpoint endpoint, GrpcConfiguration
configuration) {
super(endpoint);
@@ -76,11 +73,7 @@ public class GrpcProducer extends DefaultAsyncProducer {
@Override
public boolean process(Exchange exchange, AsyncCallback callback) {
- StreamObserver<Object> streamObserver = this.globalResponseObserver;
- if (globalResponseObserver == null) {
- streamObserver = new
GrpcResponseAggregationStreamObserver(exchange, callback);
- }
-
+ StreamObserver<Object> streamObserver =
streamObserverFactory.getStreamObserver(exchange, callback);
return forwarder.forward(exchange, streamObserver, callback);
}
@@ -122,21 +115,14 @@ public class GrpcProducer extends DefaultAsyncProducer {
}
forwarder =
GrpcExchangeForwarderFactory.createExchangeForwarder(configuration, grpcStub);
- if (configuration.getStreamRepliesTo() != null) {
- this.globalResponseObserver = new
GrpcResponseRouterStreamObserver(configuration, getEndpoint());
- }
-
- if (this.globalResponseObserver != null) {
- ServiceHelper.startService(this.globalResponseObserver);
- }
+ streamObserverFactory = new
GrpcStreamObserverFactory(getEndpoint(), configuration);
+ ServiceHelper.startService(streamObserverFactory);
}
}
@Override
protected void doStop() throws Exception {
- if (this.globalResponseObserver != null) {
- ServiceHelper.stopService(this.globalResponseObserver);
- }
+ ServiceHelper.stopService(streamObserverFactory);
if (channel != null) {
forwarder.shutdown();
forwarder = null;
@@ -145,7 +131,7 @@ public class GrpcProducer extends DefaultAsyncProducer {
channel.shutdown().shutdownNow();
channel = null;
grpcStub = null;
- globalResponseObserver = null;
+ streamObserverFactory = null;
}
super.doStop();
}
diff --git
a/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcProducerToRouteControlledStreamObserver.java
b/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcProducerToRouteControlledStreamObserver.java
new file mode 100644
index 00000000000..6f57a1c5f4d
--- /dev/null
+++
b/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/GrpcProducerToRouteControlledStreamObserver.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.grpc;
+
+import io.grpc.stub.StreamObserver;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.support.DefaultProducer;
+
+public class GrpcProducerToRouteControlledStreamObserver extends
DefaultProducer {
+ public GrpcProducerToRouteControlledStreamObserver(Endpoint endpoint) {
+ super(endpoint);
+ }
+
+ @Override
+ public void process(Exchange exchange) throws Exception {
+ StreamObserver<Object> observer = exchange.getProperty(
+ GrpcConstants.GRPC_RESPONSE_OBSERVER, StreamObserver.class);
+ String eventType = exchange.getMessage().getHeader(
+ GrpcConstants.GRPC_EVENT_TYPE_HEADER, String.class);
+ switch (eventType) {
+ case GrpcConstants.GRPC_EVENT_TYPE_ON_NEXT ->
observer.onNext(exchange.getMessage().getBody());
+ case GrpcConstants.GRPC_EVENT_TYPE_ON_COMPLETED ->
observer.onCompleted();
+ case GrpcConstants.GRPC_EVENT_TYPE_ON_ERROR ->
observer.onError((Throwable) exchange.getMessage().getBody());
+ }
+ }
+}
diff --git
a/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/client/GrpcResponseRouterStreamObserver.java
b/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/client/GrpcResponseRouterStreamObserver.java
index 2b290fd41b9..152a43cb2e0 100644
---
a/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/client/GrpcResponseRouterStreamObserver.java
+++
b/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/client/GrpcResponseRouterStreamObserver.java
@@ -16,72 +16,95 @@
*/
package org.apache.camel.component.grpc.client;
+import java.util.Objects;
+
import io.grpc.stub.StreamObserver;
+import org.apache.camel.AsyncCallback;
import org.apache.camel.AsyncProducer;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.component.grpc.GrpcConfiguration;
import org.apache.camel.component.grpc.GrpcConstants;
-import org.apache.camel.support.CamelContextHelper;
-import org.apache.camel.support.service.ServiceHelper;
-import org.apache.camel.support.service.ServiceSupport;
/**
* A stream observer that routes all responses to another endpoint.
*/
-public class GrpcResponseRouterStreamObserver extends ServiceSupport
implements StreamObserver<Object> {
+public class GrpcResponseRouterStreamObserver implements
StreamObserver<Object> {
private final Endpoint sourceEndpoint;
private final GrpcConfiguration configuration;
- private Endpoint endpoint;
- private AsyncProducer producer;
+ private final AsyncProducer producer;
+ private final Exchange exchange;
+ private final AsyncCallback callback;
- public GrpcResponseRouterStreamObserver(GrpcConfiguration configuration,
Endpoint sourceEndpoint) {
+ public GrpcResponseRouterStreamObserver(GrpcConfiguration configuration,
+ Endpoint sourceEndpoint,
+ AsyncProducer producer,
+ Exchange exchange,
+ AsyncCallback callback) {
this.configuration = configuration;
this.sourceEndpoint = sourceEndpoint;
+ this.producer = producer;
+ this.exchange = exchange;
+ this.callback = callback;
}
@Override
public void onNext(Object o) {
- Exchange exchange = sourceEndpoint.createExchange();
- exchange.getIn().setHeader(GrpcConstants.GRPC_EVENT_TYPE_HEADER,
GrpcConstants.GRPC_EVENT_TYPE_ON_NEXT);
- exchange.getIn().setBody(o);
- doSend(exchange);
+ Exchange newExchange = sourceEndpoint.createExchange();
+ inherit(newExchange);
+ newExchange.getIn().setHeader(GrpcConstants.GRPC_EVENT_TYPE_HEADER,
GrpcConstants.GRPC_EVENT_TYPE_ON_NEXT);
+ newExchange.getIn().setBody(o);
+ doSend(newExchange);
}
@Override
public void onError(Throwable throwable) {
if (configuration.isForwardOnError()) {
- Exchange exchange = sourceEndpoint.createExchange();
- exchange.getIn().setHeader(GrpcConstants.GRPC_EVENT_TYPE_HEADER,
GrpcConstants.GRPC_EVENT_TYPE_ON_ERROR);
- exchange.getIn().setBody(throwable);
- doSend(exchange);
+ Exchange newExchange = sourceEndpoint.createExchange();
+ inherit(newExchange);
+
newExchange.getIn().setHeader(GrpcConstants.GRPC_EVENT_TYPE_HEADER,
GrpcConstants.GRPC_EVENT_TYPE_ON_ERROR);
+ newExchange.getIn().setBody(throwable);
+ doSend(newExchange);
}
+ callback.done(true);
}
@Override
public void onCompleted() {
if (configuration.isForwardOnCompleted()) {
- Exchange exchange = sourceEndpoint.createExchange();
- exchange.getIn().setHeader(GrpcConstants.GRPC_EVENT_TYPE_HEADER,
GrpcConstants.GRPC_EVENT_TYPE_ON_COMPLETED);
- doSend(exchange);
+ Exchange newExchange = sourceEndpoint.createExchange();
+ inherit(newExchange);
+
newExchange.getIn().setHeader(GrpcConstants.GRPC_EVENT_TYPE_HEADER,
GrpcConstants.GRPC_EVENT_TYPE_ON_COMPLETED);
+ doSend(newExchange);
}
+ callback.done(true);
+ }
+
+ private void doSend(Exchange newExchange) {
+ producer.processAsync(newExchange);
}
- private void doSend(Exchange exchange) {
- producer.processAsync(exchange);
+ private void inherit(Exchange newExchange) {
+ if (configuration.isInheritExchangePropertiesForReplies()) {
+ for (var entry : exchange.getProperties().entrySet()) {
+ newExchange.setProperty(entry.getKey(), entry.getValue());
+ }
+ }
}
@Override
- protected void doStart() throws Exception {
- this.endpoint
- =
CamelContextHelper.getMandatoryEndpoint(sourceEndpoint.getCamelContext(),
configuration.getStreamRepliesTo());
- this.producer = endpoint.createAsyncProducer();
- ServiceHelper.startService(producer);
+ public boolean equals(Object o) {
+ if (this == o)
+ return true;
+ if (o == null || getClass() != o.getClass())
+ return false;
+ GrpcResponseRouterStreamObserver that =
(GrpcResponseRouterStreamObserver) o;
+ return Objects.equals(sourceEndpoint, that.sourceEndpoint) &&
Objects.equals(producer, that.producer);
}
@Override
- protected void doStop() throws Exception {
- ServiceHelper.stopService(producer);
+ public int hashCode() {
+ return Objects.hash(sourceEndpoint, producer);
}
}
diff --git
a/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/client/GrpcStreamObserverFactory.java
b/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/client/GrpcStreamObserverFactory.java
new file mode 100644
index 00000000000..9385520c42f
--- /dev/null
+++
b/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/client/GrpcStreamObserverFactory.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.grpc.client;
+
+import io.grpc.stub.StreamObserver;
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.AsyncProducer;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.component.grpc.GrpcConfiguration;
+import org.apache.camel.support.CamelContextHelper;
+import org.apache.camel.support.service.ServiceHelper;
+import org.apache.camel.support.service.ServiceSupport;
+
+public class GrpcStreamObserverFactory extends ServiceSupport {
+
+ private final Endpoint sourceEndpoint;
+ private final GrpcConfiguration configuration;
+
+ private Endpoint endpoint;
+ private AsyncProducer producer;
+
+ public GrpcStreamObserverFactory(Endpoint sourceEndpoint,
GrpcConfiguration configuration) {
+ this.sourceEndpoint = sourceEndpoint;
+ this.configuration = configuration;
+ }
+
+ public StreamObserver<Object> getStreamObserver(Exchange exchange,
AsyncCallback callback) {
+ if (configuration.getStreamRepliesTo() == null) {
+ return new GrpcResponseAggregationStreamObserver(exchange,
callback);
+ } else {
+ return new GrpcResponseRouterStreamObserver(configuration,
sourceEndpoint, producer, exchange, callback);
+ }
+ }
+
+ @Override
+ protected void doStart() throws Exception {
+ if (configuration.getStreamRepliesTo() != null) {
+ this.endpoint = CamelContextHelper.getMandatoryEndpoint(
+ sourceEndpoint.getCamelContext(),
configuration.getStreamRepliesTo());
+ this.producer = endpoint.createAsyncProducer();
+ ServiceHelper.startService(producer);
+ }
+ }
+
+ @Override
+ protected void doStop() throws Exception {
+ if (configuration.getStreamRepliesTo() != null) {
+ ServiceHelper.stopService(producer);
+ }
+ }
+}
diff --git
a/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/client/GrpcStreamingExchangeForwarder.java
b/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/client/GrpcStreamingExchangeForwarder.java
index eb3c2844d56..c67805417bc 100644
---
a/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/client/GrpcStreamingExchangeForwarder.java
+++
b/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/client/GrpcStreamingExchangeForwarder.java
@@ -21,6 +21,7 @@ import org.apache.camel.AsyncCallback;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.component.grpc.GrpcConfiguration;
+import org.apache.camel.component.grpc.GrpcConstants;
import org.apache.camel.component.grpc.GrpcUtils;
/**
@@ -44,7 +45,16 @@ class GrpcStreamingExchangeForwarder implements
GrpcExchangeForwarder {
@Override
public boolean forward(Exchange exchange, StreamObserver<Object>
responseObserver, AsyncCallback callback) {
Message message = exchange.getIn();
-
checkAndRecreateStreamObserver(responseObserver).onNext(message.getBody());
+ StreamObserver<Object> streamObserver =
checkAndRecreateStreamObserver(responseObserver);
+ if
(message.getHeaders().containsKey(GrpcConstants.GRPC_EVENT_TYPE_HEADER)) {
+ switch (message.getHeader(GrpcConstants.GRPC_EVENT_TYPE_HEADER,
String.class)) {
+ case GrpcConstants.GRPC_EVENT_TYPE_ON_NEXT ->
streamObserver.onNext(message.getBody());
+ case GrpcConstants.GRPC_EVENT_TYPE_ON_ERROR ->
streamObserver.onError((Throwable) message.getBody());
+ case GrpcConstants.GRPC_EVENT_TYPE_ON_COMPLETED ->
streamObserver.onCompleted();
+ }
+ } else {
+ streamObserver.onNext(message.getBody());
+ }
callback.done(true);
return true;
}
diff --git
a/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/server/GrpcMethodHandler.java
b/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/server/GrpcMethodHandler.java
index bb7c4d1b60f..f0ee042534c 100644
---
a/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/server/GrpcMethodHandler.java
+++
b/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/server/GrpcMethodHandler.java
@@ -105,6 +105,8 @@ public class GrpcMethodHandler {
requestObserver = new
GrpcRequestAggregationStreamObserver(endpoint, consumer, responseObserver,
grcpHeaders);
} else if (consumer.getConfiguration().getConsumerStrategy() ==
GrpcConsumerStrategy.PROPAGATION) {
requestObserver = new
GrpcRequestPropagationStreamObserver(endpoint, consumer, responseObserver,
grcpHeaders);
+ } else if (consumer.getConfiguration().getConsumerStrategy() ==
GrpcConsumerStrategy.DELEGATION) {
+ requestObserver = new
GrpcRequestDelegationStreamObserver(endpoint, consumer, responseObserver,
grcpHeaders);
} else {
throw new IllegalArgumentException(
"gRPC processing strategy not implemented " +
consumer.getConfiguration().getConsumerStrategy());
diff --git
a/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/server/GrpcRequestDelegationStreamObserver.java
b/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/server/GrpcRequestDelegationStreamObserver.java
new file mode 100644
index 00000000000..bb7d2bd2018
--- /dev/null
+++
b/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/server/GrpcRequestDelegationStreamObserver.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.grpc.server;
+
+import java.util.Map;
+
+import io.grpc.stub.StreamObserver;
+import org.apache.camel.component.grpc.GrpcConstants;
+import org.apache.camel.component.grpc.GrpcConsumer;
+import org.apache.camel.component.grpc.GrpcEndpoint;
+
+public class GrpcRequestDelegationStreamObserver extends
GrpcRequestAbstractStreamObserver {
+
+ public GrpcRequestDelegationStreamObserver(GrpcEndpoint endpoint,
GrpcConsumer consumer,
+ StreamObserver<Object>
responseObserver, Map<String, Object> headers) {
+ super(endpoint, consumer, responseObserver, headers);
+ if (!endpoint.getConfiguration().isRouteControlledStreamObserver()) {
+ throw new IllegalStateException(
+ "DELEGATION consumer strategy must be used with enabled
routeControlledStreamObserver");
+ }
+ }
+
+ @Override
+ public void onNext(Object request) {
+ var exchange = endpoint.createExchange();
+ exchange.getIn().setBody(request);
+ exchange.getIn().setHeaders(headers);
+ exchange.setProperty(GrpcConstants.GRPC_RESPONSE_OBSERVER,
responseObserver);
+ consumer.process(exchange, doneSync -> {
+ });
+ }
+
+ @Override
+ public void onError(Throwable throwable) {
+ var exchange = endpoint.createExchange();
+ exchange.getIn().setHeaders(headers);
+ exchange.setProperty(GrpcConstants.GRPC_RESPONSE_OBSERVER,
responseObserver);
+ consumer.onError(exchange, throwable);
+ }
+
+ @Override
+ public void onCompleted() {
+ var exchange = endpoint.createExchange();
+ exchange.getIn().setHeaders(headers);
+ exchange.setProperty(GrpcConstants.GRPC_RESPONSE_OBSERVER,
responseObserver);
+ consumer.onCompleted(exchange);
+ }
+}
diff --git
a/components/camel-grpc/src/test/java/org/apache/camel/component/grpc/GrpcProxyAsyncAsyncTest.java
b/components/camel-grpc/src/test/java/org/apache/camel/component/grpc/GrpcProxyAsyncAsyncTest.java
new file mode 100644
index 00000000000..9e57ddb907b
--- /dev/null
+++
b/components/camel-grpc/src/test/java/org/apache/camel/component/grpc/GrpcProxyAsyncAsyncTest.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.grpc;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import io.grpc.ManagedChannel;
+import io.grpc.ManagedChannelBuilder;
+import io.grpc.Server;
+import io.grpc.ServerBuilder;
+import io.grpc.stub.StreamObserver;
+import org.apache.camel.RoutesBuilder;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.test.AvailablePortFinder;
+import org.apache.camel.test.junit5.CamelTestSupport;
+import org.junit.jupiter.api.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+public class GrpcProxyAsyncAsyncTest extends CamelTestSupport {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(GrpcProxyAsyncAsyncTest.class);
+
+ private static final int GRPC_STUB_PORT =
AvailablePortFinder.getNextAvailable();
+ private static final int GRPC_ROUTE_PORT =
AvailablePortFinder.getNextAvailable();
+
+ private static Server grpcServer;
+ private ManagedChannel channel;
+ private PingPongGrpc.PingPongStub stub;
+ private final AtomicBoolean routeHasException = new AtomicBoolean(false);
+
+ @BeforeAll
+ public static void beforeAll() throws Exception {
+ grpcServer = ServerBuilder.forPort(GRPC_STUB_PORT).addService(new
PingPongImpl()).build().start();
+ LOG.info("gRPC server started on port {}", GRPC_STUB_PORT);
+ }
+
+ @AfterAll
+ public static void afterAll() {
+ if (grpcServer != null) {
+ grpcServer.shutdown();
+ LOG.info("gRPC server stopped");
+ }
+ }
+
+ @BeforeEach
+ public void beforeEach() {
+ channel = ManagedChannelBuilder.forAddress("localhost",
GRPC_ROUTE_PORT).usePlaintext().build();
+ stub = PingPongGrpc.newStub(channel);
+ }
+
+ @AfterEach
+ public void afterEach() throws Exception {
+ channel.shutdown().awaitTermination(5, TimeUnit.SECONDS);
+ }
+
+ @Test
+ public void asyncAsyncTest() throws Exception {
+ PingRequest rq1 =
PingRequest.newBuilder().setPingName("rq1").setPingId(100).build();
+ PingRequest rq2 =
PingRequest.newBuilder().setPingName("rq2").setPingId(200).build();
+ List<PongResponse> responses = new ArrayList<>();
+ AtomicBoolean onCompleted = new AtomicBoolean(false);
+ AtomicBoolean onError = new AtomicBoolean(false);
+ CountDownLatch latch = new CountDownLatch(1);
+ var requests = stub.pingAsyncAsync(new StreamObserver<PongResponse>() {
+ @Override
+ public void onNext(PongResponse pongResponse) {
+ responses.add(pongResponse);
+ }
+
+ @Override
+ public void onError(Throwable throwable) {
+ onError.set(true);
+ latch.countDown();
+ }
+
+ @Override
+ public void onCompleted() {
+ onCompleted.set(true);
+ latch.countDown();
+ }
+ });
+ requests.onNext(rq1);
+ requests.onNext(rq2);
+ requests.onNext(rq2);
+ requests.onCompleted();
+ assertTrue(latch.await(5, TimeUnit.SECONDS));
+ assertFalse(routeHasException.get());
+ assertEquals(6, responses.size());
+ assertTrue(onCompleted.get());
+ assertFalse(onError.get());
+ }
+
+ @Override
+ protected RoutesBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ onException(Exception.class).process(e ->
routeHasException.set(true));
+ from("grpc://localhost:" + GRPC_ROUTE_PORT +
+ "/org.apache.camel.component.grpc.PingPong" +
+ "?routeControlledStreamObserver=true" +
+ "&consumerStrategy=DELEGATION" +
+ "&forwardOnError=true" +
+ "&forwardOnCompleted=true")
+ .toD("grpc://localhost:" + GRPC_STUB_PORT +
+ "/org.apache.camel.component.grpc.PingPong" +
+ "?method=${header.CamelGrpcMethodName}" +
+ "&producerStrategy=STREAMING" +
+ "&streamRepliesTo=direct:next" +
+ "&forwardOnError=true" +
+ "&forwardOnCompleted=true" +
+ "&inheritExchangePropertiesForReplies=true");
+ from("direct:next")
+
.to("grpc://dummy:80/?toRouteControlledStreamObserver=true");
+ }
+ };
+ }
+
+ static class PingPongImpl extends PingPongGrpc.PingPongImplBase {
+
+ @Override
+ public StreamObserver<PingRequest>
pingAsyncAsync(StreamObserver<PongResponse> responseObserver) {
+ return new StreamObserver<PingRequest>() {
+ @Override
+ public void onNext(PingRequest pingRequest) {
+ var rs = PongResponse.newBuilder()
+ .setPongId(pingRequest.getPingId())
+ .setPongName(pingRequest.getPingName() + "-rs")
+ .build();
+ responseObserver.onNext(rs);
+ responseObserver.onNext(rs);
+ }
+
+ @Override
+ public void onError(Throwable throwable) {
+ responseObserver.onError(throwable);
+ }
+
+ @Override
+ public void onCompleted() {
+ responseObserver.onCompleted();
+ }
+ };
+ }
+ }
+}
diff --git
a/components/camel-grpc/src/test/java/org/apache/camel/component/grpc/GrpcProxyAsyncSyncTest.java
b/components/camel-grpc/src/test/java/org/apache/camel/component/grpc/GrpcProxyAsyncSyncTest.java
new file mode 100644
index 00000000000..a2a8bab9363
--- /dev/null
+++
b/components/camel-grpc/src/test/java/org/apache/camel/component/grpc/GrpcProxyAsyncSyncTest.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.grpc;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import io.grpc.ManagedChannel;
+import io.grpc.ManagedChannelBuilder;
+import io.grpc.Server;
+import io.grpc.ServerBuilder;
+import io.grpc.stub.StreamObserver;
+import org.apache.camel.RoutesBuilder;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.test.AvailablePortFinder;
+import org.apache.camel.test.junit5.CamelTestSupport;
+import org.junit.jupiter.api.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+public class GrpcProxyAsyncSyncTest extends CamelTestSupport {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(GrpcProxyAsyncSyncTest.class);
+
+ private static final int GRPC_STUB_PORT =
AvailablePortFinder.getNextAvailable();
+ private static final int GRPC_ROUTE_PORT =
AvailablePortFinder.getNextAvailable();
+
+ private static Server grpcServer;
+ private ManagedChannel channel;
+ private PingPongGrpc.PingPongStub stub;
+ private final AtomicBoolean routeHasException = new AtomicBoolean(false);
+
+ @BeforeAll
+ public static void beforeAll() throws Exception {
+ grpcServer = ServerBuilder.forPort(GRPC_STUB_PORT).addService(new
PingPongImpl()).build().start();
+ LOG.info("gRPC server started on port {}", GRPC_STUB_PORT);
+ }
+
+ @AfterAll
+ public static void afterAll() {
+ if (grpcServer != null) {
+ grpcServer.shutdown();
+ LOG.info("gRPC server stopped");
+ }
+ }
+
+ @BeforeEach
+ public void beforeEach() {
+ channel = ManagedChannelBuilder.forAddress("localhost",
GRPC_ROUTE_PORT).usePlaintext().build();
+ stub = PingPongGrpc.newStub(channel);
+ }
+
+ @AfterEach
+ public void afterEach() throws Exception {
+ channel.shutdown().awaitTermination(5, TimeUnit.SECONDS);
+ }
+
+ @Test
+ public void asyncSyncTest() throws Exception {
+ PingRequest rq =
PingRequest.newBuilder().setPingName("rq").setPingId(1).build();
+ List<PongResponse> responses = new ArrayList<>();
+ AtomicBoolean onCompleted = new AtomicBoolean(false);
+ AtomicBoolean onError = new AtomicBoolean(false);
+ CountDownLatch latch = new CountDownLatch(1);
+ StreamObserver<PingRequest> requests = stub.pingAsyncSync(new
StreamObserver<PongResponse>() {
+ @Override
+ public void onNext(PongResponse pongResponse) {
+ responses.add(pongResponse);
+ }
+
+ @Override
+ public void onError(Throwable throwable) {
+ onError.set(true);
+ latch.countDown();
+ }
+
+ @Override
+ public void onCompleted() {
+ onCompleted.set(true);
+ latch.countDown();
+ }
+ });
+ requests.onNext(rq);
+ requests.onNext(rq);
+ requests.onCompleted();
+ assertTrue(latch.await(5, TimeUnit.SECONDS));
+ assertFalse(routeHasException.get());
+ assertEquals(1, responses.size());
+ assertTrue(onCompleted.get());
+ assertFalse(onError.get());
+ }
+
+ @Override
+ protected RoutesBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ onException(Exception.class).process(e ->
routeHasException.set(true));
+ from("grpc://localhost:" + GRPC_ROUTE_PORT +
+ "/org.apache.camel.component.grpc.PingPong" +
+ "?routeControlledStreamObserver=true" +
+ "&consumerStrategy=DELEGATION" +
+ "&forwardOnError=true" +
+ "&forwardOnCompleted=true")
+ .toD("grpc://localhost:" + GRPC_STUB_PORT +
+ "/org.apache.camel.component.grpc.PingPong" +
+ "?method=${header.CamelGrpcMethodName}" +
+ "&producerStrategy=STREAMING" +
+ "&streamRepliesTo=direct:next" +
+ "&forwardOnError=true" +
+ "&forwardOnCompleted=true" +
+ "&inheritExchangePropertiesForReplies=true");
+ from("direct:next")
+
.to("grpc://dummy:80/?toRouteControlledStreamObserver=true");
+ }
+ };
+ }
+
+ static class PingPongImpl extends PingPongGrpc.PingPongImplBase {
+
+ @Override
+ public StreamObserver<PingRequest>
pingAsyncSync(StreamObserver<PongResponse> responseObserver) {
+ return new StreamObserver<PingRequest>() {
+ @Override
+ public void onNext(PingRequest pingRequest) {
+ LOG.info(pingRequest.toString());
+ }
+
+ @Override
+ public void onError(Throwable throwable) {
+ throw new RuntimeException(throwable);
+ }
+
+ @Override
+ public void onCompleted() {
+ PongResponse response01 =
PongResponse.newBuilder().setPongName("rs").setPongId(1).build();
+ responseObserver.onNext(response01);
+ responseObserver.onCompleted();
+ }
+ };
+ }
+ }
+}
diff --git
a/components/camel-grpc/src/test/java/org/apache/camel/component/grpc/GrpcProxySyncAsyncTest.java
b/components/camel-grpc/src/test/java/org/apache/camel/component/grpc/GrpcProxySyncAsyncTest.java
new file mode 100644
index 00000000000..7f24234b269
--- /dev/null
+++
b/components/camel-grpc/src/test/java/org/apache/camel/component/grpc/GrpcProxySyncAsyncTest.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.grpc;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import io.grpc.ManagedChannel;
+import io.grpc.ManagedChannelBuilder;
+import io.grpc.Server;
+import io.grpc.ServerBuilder;
+import io.grpc.stub.StreamObserver;
+import org.apache.camel.RoutesBuilder;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.test.AvailablePortFinder;
+import org.apache.camel.test.junit5.CamelTestSupport;
+import org.junit.jupiter.api.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+/**
+ * Checks that a Camel route can act as a gRPC proxy for a call that takes one request and streams back several
+ * responses ({@code pingSyncAsync}).
+ * <p>
+ * A stub gRPC server listens on {@code GRPC_STUB_PORT}; the Camel route listens on {@code GRPC_ROUTE_PORT} and sends
+ * each call on to the stub with replies directed to {@code direct:next}. The test expects both stub replies to reach
+ * the calling client, followed by a normal completion.
+ */
+public class GrpcProxySyncAsyncTest extends CamelTestSupport {
+
+    private static final Logger LOG = LoggerFactory.getLogger(GrpcProxySyncAsyncTest.class);
+
+    private static final int GRPC_STUB_PORT = AvailablePortFinder.getNextAvailable();
+    private static final int GRPC_ROUTE_PORT = AvailablePortFinder.getNextAvailable();
+
+    // Stub backend shared by every test in this class.
+    private static Server grpcServer;
+    // Client channel and async stub pointing at the Camel route; recreated for each test.
+    private ManagedChannel channel;
+    private PingPongGrpc.PingPongStub stub;
+    // Flipped by the route's onException handler; must stay false for a clean run.
+    private final AtomicBoolean routeHasException = new AtomicBoolean(false);
+
+    /** Starts the stub gRPC server that the route forwards calls to. */
+    @BeforeAll
+    public static void beforeAll() throws Exception {
+        grpcServer = ServerBuilder.forPort(GRPC_STUB_PORT).addService(new PingPongImpl()).build().start();
+        LOG.info("gRPC server started on port {}", GRPC_STUB_PORT);
+    }
+
+    /** Initiates shutdown of the stub gRPC server, if it was started. */
+    @AfterAll
+    public static void afterAll() {
+        if (grpcServer != null) {
+            grpcServer.shutdown();
+            LOG.info("gRPC server stopped");
+        }
+    }
+
+    /** Opens a plaintext channel to the Camel route and creates the async client stub. */
+    @BeforeEach
+    public void beforeEach() {
+        channel = ManagedChannelBuilder.forAddress("localhost", GRPC_ROUTE_PORT).usePlaintext().build();
+        stub = PingPongGrpc.newStub(channel);
+    }
+
+    /** Shuts the client channel down, waiting up to five seconds for termination. */
+    @AfterEach
+    public void afterEach() throws Exception {
+        channel.shutdown().awaitTermination(5, TimeUnit.SECONDS);
+    }
+
+    /**
+     * Sends one request through the proxy route and verifies that both streamed responses arrive, that the stream
+     * completes without error and that the route raised no exception.
+     */
+    @Test
+    public void syncAsyncTest() throws Exception {
+        PingRequest rq = PingRequest.newBuilder().setPingName("rq").setPingId(1).build();
+        // A plain list is sufficient: it is only read after latch.await() returns, and the latch's
+        // countDown()/await() pair orders the callback's writes before those reads.
+        List<PongResponse> responses = new ArrayList<>();
+        AtomicBoolean onCompleted = new AtomicBoolean(false);
+        AtomicBoolean onError = new AtomicBoolean(false);
+        CountDownLatch latch = new CountDownLatch(1);
+        stub.pingSyncAsync(rq, new StreamObserver<PongResponse>() {
+            @Override
+            public void onNext(PongResponse pongResponse) {
+                responses.add(pongResponse);
+            }
+
+            @Override
+            public void onError(Throwable throwable) {
+                onError.set(true);
+                latch.countDown();
+            }
+
+            @Override
+            public void onCompleted() {
+                onCompleted.set(true);
+                latch.countDown();
+            }
+        });
+        assertTrue(latch.await(5, TimeUnit.SECONDS));
+        assertFalse(routeHasException.get());
+        assertEquals(2, responses.size());
+        assertTrue(onCompleted.get());
+        assertFalse(onError.get());
+        assertEquals("rq-rs-1", responses.get(0).getPongName());
+        // Also pin the second streamed reply so a duplicated or wrong message cannot slip through.
+        assertEquals("rq-rs-2", responses.get(1).getPongName());
+    }
+
+    /**
+     * Defines the proxy route (gRPC consumer on {@code GRPC_ROUTE_PORT} to a dynamic gRPC producer on
+     * {@code GRPC_STUB_PORT}) and the {@code direct:next} route that passes replies to the route-controlled stream
+     * observer. Any exception caught by the route sets {@code routeHasException}.
+     */
+    @Override
+    protected RoutesBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                onException(Exception.class).process(e -> routeHasException.set(true));
+                from("grpc://localhost:" + GRPC_ROUTE_PORT +
+                     "/org.apache.camel.component.grpc.PingPong" +
+                     "?routeControlledStreamObserver=true")
+                        .toD("grpc://localhost:" + GRPC_STUB_PORT +
+                             "/org.apache.camel.component.grpc.PingPong" +
+                             "?method=${header.CamelGrpcMethodName}" +
+                             "&streamRepliesTo=direct:next" +
+                             "&forwardOnError=true" +
+                             "&forwardOnCompleted=true" +
+                             "&inheritExchangePropertiesForReplies=true");
+                from("direct:next")
+                        .to("grpc://dummy:80/?toRouteControlledStreamObserver=true");
+            }
+        };
+    }
+
+    /**
+     * Stub PingPong service used as the proxied backend. For every {@code pingSyncAsync} request it streams two
+     * responses, named {@code <pingName>-rs-1} and {@code <pingName>-rs-2}, and then completes the stream.
+     */
+    static class PingPongImpl extends PingPongGrpc.PingPongImplBase {
+
+        @Override
+        public void pingSyncAsync(PingRequest request, StreamObserver<PongResponse> responseObserver) {
+            // The message names the method actually being served (it previously named a different handler).
+            LOG.info("gRPC server received data from PingSyncAsync service PingId={} PingName={}",
+                    request.getPingId(),
+                    request.getPingName());
+            PongResponse response01
+                    = PongResponse.newBuilder().setPongName(request.getPingName() + "-rs-1").setPongId(1).build();
+            PongResponse response02
+                    = PongResponse.newBuilder().setPongName(request.getPingName() + "-rs-2").setPongId(2).build();
+            responseObserver.onNext(response01);
+            responseObserver.onNext(response02);
+            responseObserver.onCompleted();
+        }
+    }
+}
diff --git
a/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/GrpcEndpointBuilderFactory.java
b/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/GrpcEndpointBuilderFactory.java
index 9fb880a9914..41542bb9b26 100644
---
a/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/GrpcEndpointBuilderFactory.java
+++
b/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/GrpcEndpointBuilderFactory.java
@@ -149,7 +149,11 @@ public interface GrpcEndpointBuilderFactory {
* transferred to the flow, and the accumulated responses will be sent
* to the sender. If a propagation strategy is selected, request is
sent
* to the stream, and the response will be immediately sent back to the
- * sender.
+ * sender. If a delegation strategy is selected, the request is sent to
+ * the stream, but no response is generated, under the assumption that
+ * all necessary responses will be sent from another part of the route.
+ * The delegation strategy always comes with
+ * routeControlledStreamObserver=true so that this assumption can be met.
*
* The option is a:
*
<code>org.apache.camel.component.grpc.GrpcConsumerStrategy</code>
type.
@@ -172,7 +176,11 @@ public interface GrpcEndpointBuilderFactory {
* transferred to the flow, and the accumulated responses will be sent
* to the sender. If a propagation strategy is selected, request is
sent
* to the stream, and the response will be immediately sent back to the
- * sender.
+ * sender. If a delegation strategy is selected, the request is sent to
+ * the stream, but no response is generated, under the assumption that
+ * all necessary responses will be sent from another part of the route.
+ * The delegation strategy always comes with
+ * routeControlledStreamObserver=true so that this assumption can be met.
*
* The option will be converted to a
*
<code>org.apache.camel.component.grpc.GrpcConsumerStrategy</code>
type.
@@ -843,6 +851,41 @@ public interface GrpcEndpointBuilderFactory {
doSetProperty("autoDiscoverClientInterceptors",
autoDiscoverClientInterceptors);
return this;
}
+ /**
+ * Copies the exchange properties of the original exchange to every
+ * exchange created for the route defined by streamRepliesTo.
+ *
+ * The option is a: <code>boolean</code> type.
+ *
+ * Default: false
+ * Group: producer
+ *
+ * @param inheritExchangePropertiesForReplies the value to set
+ * @return the dsl builder
+ */
+ default GrpcEndpointProducerBuilder
inheritExchangePropertiesForReplies(
+ boolean inheritExchangePropertiesForReplies) {
+ doSetProperty("inheritExchangePropertiesForReplies",
inheritExchangePropertiesForReplies);
+ return this;
+ }
+ /**
+ * Copies the exchange properties of the original exchange to every
+ * exchange created for the route defined by streamRepliesTo.
+ *
+ * The option will be converted to a <code>boolean</code>
+ * type.
+ *
+ * Default: false
+ * Group: producer
+ *
+ * @param inheritExchangePropertiesForReplies the value to set
+ * @return the dsl builder
+ */
+ default GrpcEndpointProducerBuilder
inheritExchangePropertiesForReplies(
+ String inheritExchangePropertiesForReplies) {
+ doSetProperty("inheritExchangePropertiesForReplies",
inheritExchangePropertiesForReplies);
+ return this;
+ }
/**
* gRPC method name.
*
@@ -915,6 +958,43 @@ public interface GrpcEndpointBuilderFactory {
doSetProperty("streamRepliesTo", streamRepliesTo);
return this;
}
+ /**
+ * Expects the exchange property GrpcConstants.GRPC_RESPONSE_OBSERVER to
+ * be set. Takes its value and calls onNext, onError and onCompleted on
+ * that StreamObserver. All other gRPC parameters are ignored.
+ *
+ * The option is a: <code>boolean</code> type.
+ *
+ * Default: false
+ * Group: producer
+ *
+ * @param toRouteControlledStreamObserver the value to set
+ * @return the dsl builder
+ */
+ default GrpcEndpointProducerBuilder toRouteControlledStreamObserver(
+ boolean toRouteControlledStreamObserver) {
+ doSetProperty("toRouteControlledStreamObserver",
toRouteControlledStreamObserver);
+ return this;
+ }
+ /**
+ * Expects the exchange property GrpcConstants.GRPC_RESPONSE_OBSERVER to
+ * be set. Takes its value and calls onNext, onError and onCompleted on
+ * that StreamObserver. All other gRPC parameters are ignored.
+ *
+ * The option will be converted to a <code>boolean</code>
+ * type.
+ *
+ * Default: false
+ * Group: producer
+ *
+ * @param toRouteControlledStreamObserver the value to set
+ * @return the dsl builder
+ */
+ default GrpcEndpointProducerBuilder toRouteControlledStreamObserver(
+ String toRouteControlledStreamObserver) {
+ doSetProperty("toRouteControlledStreamObserver",
toRouteControlledStreamObserver);
+ return this;
+ }
/**
* The user agent header passed to the server.
*