This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push:
new 5ef6339 CAMEL-14514: Improve performance of Camel Google Pub/Sub
(#3551)
5ef6339 is described below
commit 5ef63390a07ac6aace7162718b98344897c90aa8
Author: Alvin Kwekel <[email protected]>
AuthorDate: Fri Feb 28 10:02:12 2020 +0100
CAMEL-14514: Improve performance of Camel Google Pub/Sub (#3551)
CAMEL-14514: Improve performance of Camel Google Pub/Sub
---
components/camel-google-pubsub/pom.xml | 90 ++-------
.../pubsub/GooglePubsubComponentConfigurer.java | 9 +-
.../pubsub/GooglePubsubEndpointConfigurer.java | 4 +-
.../component/google/pubsub/google-pubsub.json | 9 +-
.../src/main/docs/google-pubsub-component.adoc | 59 +++---
.../google/pubsub/GooglePubsubComponent.java | 150 +++++++++++++--
.../pubsub/GooglePubsubConnectionFactory.java | 210 ---------------------
.../google/pubsub/GooglePubsubConstants.java | 1 -
.../google/pubsub/GooglePubsubConsumer.java | 127 +++++++------
.../google/pubsub/GooglePubsubEndpoint.java | 43 ++---
.../google/pubsub/GooglePubsubProducer.java | 94 ++++-----
.../AcknowledgeAsync.java} | 28 +--
.../google/pubsub/consumer/AcknowledgeSync.java | 75 ++++++++
.../pubsub/consumer/CamelMessageReceiver.java | 73 +++++++
.../pubsub/consumer/ExchangeAckTransaction.java | 79 --------
.../pubsub/consumer/PubsubAcknowledgement.java | 68 -------
.../component/google/pubsub/PubsubTestSupport.java | 114 ++++++-----
.../google/pubsub/integration/AckModeNoneTest.java | 19 +-
.../pubsub/integration/AcknowledgementTest.java | 6 +-
.../google/pubsub/integration/BodyTypesTest.java | 13 +-
.../integration/GroupedExchangeRoundtripTest.java | 11 +-
.../integration/PubsubConnectionFactoryTest.java | 53 ------
.../integration/SingleExchangeRoundtripTest.java | 9 +-
.../google/pubsub/unit/PubsubComponentTest.java | 41 ----
.../google/pubsub/unit/PubsubEndpointTest.java | 5 +-
.../src/test/resources/simple.properties | 8 +-
.../dsl/GooglePubsubComponentBuilderFactory.java | 54 +++++-
.../ROOT/pages/google-pubsub-component.adoc | 60 +++---
parent/pom.xml | 20 +-
.../karaf/features/src/main/resources/features.xml | 51 ++++-
30 files changed, 727 insertions(+), 856 deletions(-)
diff --git a/components/camel-google-pubsub/pom.xml
b/components/camel-google-pubsub/pom.xml
index f2e8178..8c524f4 100644
--- a/components/camel-google-pubsub/pom.xml
+++ b/components/camel-google-pubsub/pom.xml
@@ -44,13 +44,14 @@
<artifactId>camel-support</artifactId>
</dependency>
<dependency>
- <groupId>org.apache.httpcomponents</groupId>
- <artifactId>httpclient</artifactId>
+ <groupId>com.google.cloud</groupId>
+ <artifactId>google-cloud-pubsub</artifactId>
+ <version>${google-cloud-pubsub-version}</version>
</dependency>
<dependency>
- <groupId>com.google.apis</groupId>
- <artifactId>google-api-services-pubsub</artifactId>
- <version>${google-api-services-pubsub-version}</version>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ <version>${google-pubsub-guava-version}</version>
</dependency>
<dependency>
<groupId>org.apache.camel</groupId>
@@ -72,6 +73,12 @@
<artifactId>log4j-slf4j-impl</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.testcontainers</groupId>
+ <artifactId>testcontainers</artifactId>
+ <version>${testcontainers-version}</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
@@ -129,77 +136,4 @@
</plugin>
</plugins>
</reporting>
-
- <profiles>
- <profile>
- <id>google-pubsub-test</id>
- <build>
- <plugins>
-
- <!--
- PUBSUB EMULATOR
- depends on Google SDK being installed and available in
$PATH
- https://cloud.google.com/sdk/downloads
- -->
- <plugin>
- <groupId>com.bazaarvoice.maven.plugins</groupId>
- <artifactId>process-exec-maven-plugin</artifactId>
- <version>0.7</version>
- <executions>
-
- <!-- Start : compile, before tests -->
- <execution>
- <id>pubsub-emulator</id>
- <phase>test-compile</phase>
- <goals>
- <goal>start</goal>
- </goals>
- <configuration>
- <name>Google PubSub Emulator</name>
- <waitForInterrupt>false</waitForInterrupt>
-
<healthcheckUrl>http://localhost:8383</healthcheckUrl>
- <workingDir>/tmp</workingDir>
- <arguments>
- <argument>java</argument>
- <argument>
-
-Djava.util.logging.config.file=${project.basedir}/src/test/resources/logging.properties
- </argument>
- <argument>-jar</argument>
- <argument>
-
${env.GCLOUD_SDK_PATH}/platform/pubsub-emulator/lib/cloud-pubsub-emulator-0.1-SNAPSHOT-all.jar
- </argument>
- <argument>--port=8383</argument>
- </arguments>
- </configuration>
- </execution>
-
- <!-- Stop : package, after test -->
- <execution>
- <id>stop-all</id>
- <phase>prepare-package</phase>
- <goals>
- <goal>stop-all</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
-
- <plugin>
- <artifactId>maven-surefire-plugin</artifactId>
- <configuration>
- <childDelegation>false</childDelegation>
- <useFile>true</useFile>
- <forkCount>1</forkCount>
- <reuseForks>true</reuseForks>
-
<forkedProcessTimeoutInSeconds>300</forkedProcessTimeoutInSeconds>
- <includes>
- <include>**/*Test.java</include>
- </includes>
- </configuration>
- </plugin>
- </plugins>
- </build>
- </profile>
- </profiles>
-
</project>
diff --git
a/components/camel-google-pubsub/src/generated/java/org/apache/camel/component/google/pubsub/GooglePubsubComponentConfigurer.java
b/components/camel-google-pubsub/src/generated/java/org/apache/camel/component/google/pubsub/GooglePubsubComponentConfigurer.java
index b1db848..77110a4 100644
---
a/components/camel-google-pubsub/src/generated/java/org/apache/camel/component/google/pubsub/GooglePubsubComponentConfigurer.java
+++
b/components/camel-google-pubsub/src/generated/java/org/apache/camel/component/google/pubsub/GooglePubsubComponentConfigurer.java
@@ -15,14 +15,19 @@ public class GooglePubsubComponentConfigurer extends
PropertyConfigurerSupport i
public boolean configure(CamelContext camelContext, Object obj, String
name, Object value, boolean ignoreCase) {
GooglePubsubComponent target = (GooglePubsubComponent) obj;
switch (ignoreCase ? name.toLowerCase() : name) {
- case "connectionfactory":
- case "connectionFactory":
target.setConnectionFactory(property(camelContext,
org.apache.camel.component.google.pubsub.GooglePubsubConnectionFactory.class,
value)); return true;
+ case "endpoint": target.setEndpoint(property(camelContext,
java.lang.String.class, value)); return true;
case "bridgeerrorhandler":
case "bridgeErrorHandler":
target.setBridgeErrorHandler(property(camelContext, boolean.class, value));
return true;
case "lazystartproducer":
case "lazyStartProducer":
target.setLazyStartProducer(property(camelContext, boolean.class, value));
return true;
+ case "publishercachesize":
+ case "publisherCacheSize":
target.setPublisherCacheSize(property(camelContext, int.class, value)); return
true;
+ case "publishercachetimeout":
+ case "publisherCacheTimeout":
target.setPublisherCacheTimeout(property(camelContext, int.class, value));
return true;
case "basicpropertybinding":
case "basicPropertyBinding":
target.setBasicPropertyBinding(property(camelContext, boolean.class, value));
return true;
+ case "publisherterminationtimeout":
+ case "publisherTerminationTimeout":
target.setPublisherTerminationTimeout(property(camelContext, int.class,
value)); return true;
default: return false;
}
}
diff --git
a/components/camel-google-pubsub/src/generated/java/org/apache/camel/component/google/pubsub/GooglePubsubEndpointConfigurer.java
b/components/camel-google-pubsub/src/generated/java/org/apache/camel/component/google/pubsub/GooglePubsubEndpointConfigurer.java
index 94a0c74..c9d39e6 100644
---
a/components/camel-google-pubsub/src/generated/java/org/apache/camel/component/google/pubsub/GooglePubsubEndpointConfigurer.java
+++
b/components/camel-google-pubsub/src/generated/java/org/apache/camel/component/google/pubsub/GooglePubsubEndpointConfigurer.java
@@ -19,12 +19,12 @@ public class GooglePubsubEndpointConfigurer extends
PropertyConfigurerSupport im
case "ackMode": target.setAckMode(property(camelContext,
org.apache.camel.component.google.pubsub.GooglePubsubConstants.AckMode.class,
value)); return true;
case "concurrentconsumers":
case "concurrentConsumers":
target.setConcurrentConsumers(property(camelContext, java.lang.Integer.class,
value)); return true;
- case "connectionfactory":
- case "connectionFactory":
target.setConnectionFactory(property(camelContext,
org.apache.camel.component.google.pubsub.GooglePubsubConnectionFactory.class,
value)); return true;
case "loggerid":
case "loggerId": target.setLoggerId(property(camelContext,
java.lang.String.class, value)); return true;
case "maxmessagesperpoll":
case "maxMessagesPerPoll":
target.setMaxMessagesPerPoll(property(camelContext, java.lang.Integer.class,
value)); return true;
+ case "synchronouspull":
+ case "synchronousPull":
target.setSynchronousPull(property(camelContext, boolean.class, value)); return
true;
case "bridgeerrorhandler":
case "bridgeErrorHandler":
target.setBridgeErrorHandler(property(camelContext, boolean.class, value));
return true;
case "exceptionhandler":
diff --git
a/components/camel-google-pubsub/src/generated/resources/org/apache/camel/component/google/pubsub/google-pubsub.json
b/components/camel-google-pubsub/src/generated/resources/org/apache/camel/component/google/pubsub/google-pubsub.json
index 4483fda..063c37b 100644
---
a/components/camel-google-pubsub/src/generated/resources/org/apache/camel/component/google/pubsub/google-pubsub.json
+++
b/components/camel-google-pubsub/src/generated/resources/org/apache/camel/component/google/pubsub/google-pubsub.json
@@ -19,19 +19,22 @@
"version": "3.2.0-SNAPSHOT"
},
"componentProperties": {
- "connectionFactory": { "kind": "property", "displayName": "Connection
Factory", "group": "common", "required": false, "type": "object", "javaType":
"org.apache.camel.component.google.pubsub.GooglePubsubConnectionFactory",
"deprecated": false, "secret": false, "description": "Sets the connection
factory to use: provides the ability to explicitly manage connection
credentials: - the path to the key file - the Service Account Key \/ Email
pair" },
+ "endpoint": { "kind": "property", "displayName": "Endpoint", "group":
"common", "label": "common", "required": false, "type": "string", "javaType":
"java.lang.String", "deprecated": false, "secret": false, "description":
"Endpoint to use with local Pub\/Sub emulator." },
"bridgeErrorHandler": { "kind": "property", "displayName": "Bridge Error
Handler", "group": "consumer", "label": "consumer", "required": false, "type":
"boolean", "javaType": "boolean", "deprecated": false, "secret": false,
"defaultValue": false, "description": "Allows for bridging the consumer to the
Camel routing Error Handler, which mean any exceptions occurred while the
consumer is trying to pickup incoming messages, or the likes, will now be
processed as a message and handled by [...]
"lazyStartProducer": { "kind": "property", "displayName": "Lazy Start
Producer", "group": "producer", "label": "producer", "required": false, "type":
"boolean", "javaType": "boolean", "deprecated": false, "secret": false,
"defaultValue": false, "description": "Whether the producer should be started
lazy (on the first message). By starting lazy you can use this to allow
CamelContext and routes to startup in situations where a producer may otherwise
fail during starting and cause the r [...]
- "basicPropertyBinding": { "kind": "property", "displayName": "Basic
Property Binding", "group": "advanced", "label": "advanced", "required": false,
"type": "boolean", "javaType": "boolean", "deprecated": false, "secret": false,
"defaultValue": false, "description": "Whether the component should use basic
property binding (Camel 2.x) or the newer property binding with additional
capabilities" }
+ "publisherCacheSize": { "kind": "property", "displayName": "Publisher
Cache Size", "group": "producer", "label": "producer", "required": false,
"type": "integer", "javaType": "int", "deprecated": false, "secret": false,
"description": "Maximum number of producers to cache. This could be increased
if you have producers for lots of different topics." },
+ "publisherCacheTimeout": { "kind": "property", "displayName": "Publisher
Cache Timeout", "group": "producer", "label": "producer", "required": false,
"type": "integer", "javaType": "int", "deprecated": false, "secret": false,
"description": "How many milliseconds should each producer stay alive in the
cache." },
+ "basicPropertyBinding": { "kind": "property", "displayName": "Basic
Property Binding", "group": "advanced", "label": "advanced", "required": false,
"type": "boolean", "javaType": "boolean", "deprecated": false, "secret": false,
"defaultValue": false, "description": "Whether the component should use basic
property binding (Camel 2.x) or the newer property binding with additional
capabilities" },
+ "publisherTerminationTimeout": { "kind": "property", "displayName":
"Publisher Termination Timeout", "group": "advanced", "label": "advanced",
"required": false, "type": "integer", "javaType": "int", "deprecated": false,
"secret": false, "description": "How many milliseconds should a producer be
allowed to terminate." }
},
"properties": {
"projectId": { "kind": "path", "displayName": "Project Id", "group":
"common", "label": "", "required": true, "type": "string", "javaType":
"java.lang.String", "deprecated": false, "deprecationNote": "", "secret":
false, "description": "Project Id" },
"destinationName": { "kind": "path", "displayName": "Destination Name",
"group": "common", "label": "", "required": true, "type": "string", "javaType":
"java.lang.String", "deprecated": false, "deprecationNote": "", "secret":
false, "description": "Destination Name" },
"ackMode": { "kind": "parameter", "displayName": "Ack Mode", "group":
"common", "label": "", "required": false, "type": "object", "javaType":
"org.apache.camel.component.google.pubsub.GooglePubsubConstants.AckMode",
"enum": [ "AUTO", "NONE" ], "deprecated": false, "secret": false,
"defaultValue": "AUTO", "description": "AUTO = exchange gets ack'ed\/nack'ed on
completion. NONE = downstream process has to ack\/nack explicitly" },
"concurrentConsumers": { "kind": "parameter", "displayName": "Concurrent
Consumers", "group": "common", "label": "", "required": false, "type":
"integer", "javaType": "java.lang.Integer", "deprecated": false, "secret":
false, "defaultValue": "1", "description": "The number of parallel streams
consuming from the subscription" },
- "connectionFactory": { "kind": "parameter", "displayName": "Connection
Factory", "group": "common", "label": "", "required": false, "type": "object",
"javaType":
"org.apache.camel.component.google.pubsub.GooglePubsubConnectionFactory",
"deprecated": false, "secret": false, "description": "ConnectionFactory to
obtain connection to PubSub Service. If non provided the default one will be
used" },
"loggerId": { "kind": "parameter", "displayName": "Logger Id", "group":
"common", "label": "", "required": false, "type": "string", "javaType":
"java.lang.String", "deprecated": false, "secret": false, "description":
"Logger ID to use when a match to the parent route required" },
"maxMessagesPerPoll": { "kind": "parameter", "displayName": "Max Messages
Per Poll", "group": "common", "label": "", "required": false, "type":
"integer", "javaType": "java.lang.Integer", "deprecated": false, "secret":
false, "defaultValue": "1", "description": "The max number of messages to
receive from the server in a single API call" },
+ "synchronousPull": { "kind": "parameter", "displayName": "Synchronous
Pull", "group": "common", "label": "", "required": false, "type": "boolean",
"javaType": "boolean", "deprecated": false, "secret": false, "defaultValue":
"false", "description": "Synchronously pull batches of messages" },
"bridgeErrorHandler": { "kind": "parameter", "displayName": "Bridge Error
Handler", "group": "consumer", "label": "consumer", "required": false, "type":
"boolean", "javaType": "boolean", "deprecated": false, "secret": false,
"defaultValue": false, "description": "Allows for bridging the consumer to the
Camel routing Error Handler, which mean any exceptions occurred while the
consumer is trying to pickup incoming messages, or the likes, will now be
processed as a message and handled b [...]
"exceptionHandler": { "kind": "parameter", "displayName": "Exception
Handler", "group": "consumer (advanced)", "label": "consumer,advanced",
"required": false, "type": "object", "javaType":
"org.apache.camel.spi.ExceptionHandler", "optionalPrefix": "consumer.",
"deprecated": false, "secret": false, "description": "To let the consumer use a
custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled
then this option is not in use. By default the consumer will deal with [...]
"exchangePattern": { "kind": "parameter", "displayName": "Exchange
Pattern", "group": "consumer (advanced)", "label": "consumer,advanced",
"required": false, "type": "object", "javaType":
"org.apache.camel.ExchangePattern", "enum": [ "InOnly", "InOut",
"InOptionalOut" ], "deprecated": false, "secret": false, "description": "Sets
the exchange pattern when the consumer creates an exchange." },
diff --git
a/components/camel-google-pubsub/src/main/docs/google-pubsub-component.adoc
b/components/camel-google-pubsub/src/main/docs/google-pubsub-component.adoc
index 2f96dde..5fab27e 100644
--- a/components/camel-google-pubsub/src/main/docs/google-pubsub-component.adoc
+++ b/components/camel-google-pubsub/src/main/docs/google-pubsub-component.adoc
@@ -9,9 +9,7 @@
The Google Pubsub component provides access
to https://cloud.google.com/pubsub/[Cloud Pub/Sub Infrastructure] via
-the https://cloud.google.com/apis/docs/client-libraries-explained[Google
Client Services API].
-
-The current implementation does not use gRPC.
+the https://github.com/googleapis/java-pubsub[Google Cloud Java Client for
Google Cloud Pub/Sub].
Maven users will need to add the following dependency to their pom.xml
for this component:
@@ -39,17 +37,20 @@ Destination Name can be either a topic or a subscription
name.
== Options
// component options: START
-The Google Pubsub component supports 4 options, which are listed below.
+The Google Pubsub component supports 7 options, which are listed below.
[width="100%",cols="2,5,^1,2",options="header"]
|===
| Name | Description | Default | Type
-| *connectionFactory* (common) | Sets the connection factory to use: provides
the ability to explicitly manage connection credentials: - the path to the key
file - the Service Account Key / Email pair | | GooglePubsubConnectionFactory
+| *endpoint* (common) | Endpoint to use with local Pub/Sub emulator. | |
String
| *bridgeErrorHandler* (consumer) | Allows for bridging the consumer to the
Camel routing Error Handler, which mean any exceptions occurred while the
consumer is trying to pickup incoming messages, or the likes, will now be
processed as a message and handled by the routing Error Handler. By default the
consumer will use the org.apache.camel.spi.ExceptionHandler to deal with
exceptions, that will be logged at WARN or ERROR level and ignored. | false |
boolean
| *lazyStartProducer* (producer) | Whether the producer should be started lazy
(on the first message). By starting lazy you can use this to allow CamelContext
and routes to startup in situations where a producer may otherwise fail during
starting and cause the route to fail being started. By deferring this startup
to be lazy then the startup failure can be handled during routing messages via
Camel's routing error handlers. Beware that when the first message is processed
then creating and [...]
+| *publisherCacheSize* (producer) | Maximum number of producers to cache. This
could be increased if you have producers for lots of different topics. | | int
+| *publisherCacheTimeout* (producer) | How many milliseconds should each
producer stay alive in the cache. | | int
| *basicPropertyBinding* (advanced) | Whether the component should use basic
property binding (Camel 2.x) or the newer property binding with additional
capabilities | false | boolean
+| *publisherTerminationTimeout* (advanced) | How many milliseconds should a
producer be allowed to terminate. | | int
|===
// component options: END
@@ -81,9 +82,9 @@ with the following path and query parameters:
| Name | Description | Default | Type
| *ackMode* (common) | AUTO = exchange gets ack'ed/nack'ed on completion. NONE
= downstream process has to ack/nack explicitly. The value can be one of: AUTO,
NONE | AUTO | AckMode
| *concurrentConsumers* (common) | The number of parallel streams consuming
from the subscription | 1 | Integer
-| *connectionFactory* (common) | ConnectionFactory to obtain connection to
PubSub Service. If non provided the default one will be used | |
GooglePubsubConnectionFactory
| *loggerId* (common) | Logger ID to use when a match to the parent route
required | | String
| *maxMessagesPerPoll* (common) | The max number of messages to receive from
the server in a single API call | 1 | Integer
+| *synchronousPull* (common) | Synchronously pull batches of messages | false
| boolean
| *bridgeErrorHandler* (consumer) | Allows for bridging the consumer to the
Camel routing Error Handler, which mean any exceptions occurred while the
consumer is trying to pickup incoming messages, or the likes, will now be
processed as a message and handled by the routing Error Handler. By default the
consumer will use the org.apache.camel.spi.ExceptionHandler to deal with
exceptions, that will be logged at WARN or ERROR level and ignored. | false |
boolean
| *exceptionHandler* (consumer) | To let the consumer use a custom
ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this
option is not in use. By default the consumer will deal with exceptions, that
will be logged at WARN or ERROR level and ignored. | | ExceptionHandler
| *exchangePattern* (consumer) | Sets the exchange pattern when the consumer
creates an exchange. The value can be one of: InOnly, InOut, InOptionalOut | |
ExchangePattern
@@ -92,6 +93,34 @@ with the following path and query parameters:
| *synchronous* (advanced) | Sets whether synchronous processing should be
strictly used, or Camel is allowed to use asynchronous processing (if
supported). | false | boolean
|===
// endpoint options: END
+// spring-boot-auto-configure options: START
+== Spring Boot Auto-Configuration
+
+When using Spring Boot make sure to use the following Maven dependency to have
support for auto configuration:
+
+[source,xml]
+----
+<dependency>
+ <groupId>org.apache.camel.springboot</groupId>
+ <artifactId>camel-google-pubsub-starter</artifactId>
+ <version>x.x.x</version>
+ <!-- use the same version as your Camel core version -->
+</dependency>
+----
+
+
+The component supports 8 options, which are listed below.
+
+
+
+[width="100%",cols="2,5,^1,2",options="header"]
+|===
+| Name | Description | Default | Type
+| *camel.component.google-pubsub.basic-property-binding* | Whether the
component should use basic property binding (Camel 2.x) or the newer property
binding with additional capabilities | false | Boolean
+| *camel.component.google-pubsub.bridge-error-handler* | Allows for bridging
the consumer to the Camel routing Error Handler, which mean any exceptions
occurred while the consumer is trying to pickup incoming messages, or the
likes, will now be processed as a message and handled by the routing Error
Handler. By default the consumer will use the
org.apache.camel.spi.ExceptionHandler to deal with exceptions, that will be
logged at WARN or ERROR level and ignored. | false | Boolean
+| *camel.component.google-pubsub.enabled* | Whether to enable auto
configuration of the google-pubsub component. This is enabled by default. | |
Boolean
+|===
+// spring-boot-auto-configure options: END
== Producer Endpoints
@@ -128,7 +157,6 @@ Headers set by the consumer endpoints:
* GooglePubsubConstants.MESSAGE_ID
* GooglePubsubConstants.ATTRIBUTES
* GooglePubsubConstants.PUBLISH_TIME
-* GooglePubsubConstants.ACK_ID
== Message Body
@@ -137,22 +165,7 @@ It is up for the route to convert/unmarshall the contents.
== Authentication Configuration
-Google Pubsub component authentication is targeted for use with the GCP
Service Accounts.
-For more information please refer to
https://cloud.google.com/docs/authentication[Google Cloud Platform Auth Guide]
-
-Google security credentials can be set explicitly via one of the two options:
-
-* Service Account Email and Service Account Key (PEM format)
-* GCP credentials file location
-
-If both are set, the Service Account Email/Key will take precedence.
-
-Or implicitly, where the connection factory falls back on
-https://developers.google.com/identity/protocols/application-default-credentials#howtheywork[Application
Default Credentials].
-
-*OBS!* The location of the default credentials file is configurable - via
GOOGLE_APPLICATION_CREDENTIALS environment variable.
-
-Service Account Email and Service Account Key can be found in the GCP JSON
credentials file as client_email and private_key respectively.
+The location of the default credentials file is configurable - via
GOOGLE_APPLICATION_CREDENTIALS environment variable.
== Rollback and Redelivery
diff --git
a/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubComponent.java
b/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubComponent.java
index d022b8f..f775b12 100644
---
a/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubComponent.java
+++
b/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubComponent.java
@@ -16,11 +16,31 @@
*/
package org.apache.camel.component.google.pubsub;
+import java.io.IOException;
import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import com.google.api.gax.core.CredentialsProvider;
+import com.google.api.gax.core.NoCredentialsProvider;
+import com.google.api.gax.grpc.GrpcTransportChannel;
+import com.google.api.gax.rpc.FixedTransportChannelProvider;
+import com.google.api.gax.rpc.TransportChannelProvider;
+import com.google.cloud.pubsub.v1.MessageReceiver;
+import com.google.cloud.pubsub.v1.Publisher;
+import com.google.cloud.pubsub.v1.Subscriber;
+import com.google.cloud.pubsub.v1.stub.SubscriberStub;
+import com.google.cloud.pubsub.v1.stub.SubscriberStubSettings;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalListener;
+import io.grpc.ManagedChannel;
+import io.grpc.ManagedChannelBuilder;
import org.apache.camel.Endpoint;
+import org.apache.camel.spi.Metadata;
import org.apache.camel.spi.annotations.Component;
import org.apache.camel.support.DefaultComponent;
+import org.apache.camel.util.StringHelper;
/**
* Represents the component that manages {@link GooglePubsubEndpoint}.
@@ -28,7 +48,48 @@ import org.apache.camel.support.DefaultComponent;
@Component("google-pubsub")
public class GooglePubsubComponent extends DefaultComponent {
- private GooglePubsubConnectionFactory connectionFactory;
+ @Metadata(
+ label = "common",
+ description = "Endpoint to use with local Pub/Sub emulator."
+ )
+ private String endpoint;
+
+ @Metadata(
+ label = "producer",
+ description = "Maximum number of producers to cache. This could be
increased if you have producers for lots of different topics."
+ )
+ private int publisherCacheSize = 100;
+
+ @Metadata(
+ label = "producer",
+ description = "How many milliseconds should each producer stay
alive in the cache."
+ )
+ private int publisherCacheTimeout = 180000;
+
+ @Metadata(
+ label = "advanced",
+ description = "How many milliseconds should a producer be allowed
to terminate."
+ )
+ private int publisherTerminationTimeout = 60000;
+
+ private RemovalListener<String, Publisher> removalListener = removal -> {
+ Publisher publisher = removal.getValue();
+ if (publisher == null) {
+ return;
+ }
+ publisher.shutdown();
+ try {
+ publisher.awaitTermination(publisherTerminationTimeout,
TimeUnit.MILLISECONDS);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ };
+
+ private Cache<String, Publisher> cachedPublishers =
CacheBuilder.newBuilder()
+ .expireAfterWrite(publisherCacheTimeout, TimeUnit.MILLISECONDS)
+ .maximumSize(publisherCacheSize)
+ .removalListener(removalListener)
+ .build();
public GooglePubsubComponent() {
}
@@ -51,19 +112,84 @@ public class GooglePubsubComponent extends
DefaultComponent {
return pubsubEndpoint;
}
- /**
- * Sets the connection factory to use: provides the ability to explicitly
- * manage connection credentials: - the path to the key file - the Service
- * Account Key / Email pair
- */
- public GooglePubsubConnectionFactory getConnectionFactory() {
- if (connectionFactory == null) {
- connectionFactory = new GooglePubsubConnectionFactory();
+ @Override
+ protected void doShutdown() throws Exception {
+ cachedPublishers.cleanUp();
+ super.doShutdown();
+ }
+
+ public Publisher getPublisher(String topicName) throws ExecutionException {
+ return cachedPublishers.get(topicName, () ->
buildPublisher(topicName));
+ }
+
+ private Publisher buildPublisher(String topicName) throws IOException {
+ Publisher.Builder builder = Publisher.newBuilder(topicName);
+ if (StringHelper.trimToNull(endpoint) != null) {
+ ManagedChannel channel =
ManagedChannelBuilder.forTarget(endpoint).usePlaintext().build();
+ TransportChannelProvider channelProvider =
+
FixedTransportChannelProvider.create(GrpcTransportChannel.create(channel));
+ CredentialsProvider credentialsProvider =
NoCredentialsProvider.create();
+
builder.setChannelProvider(channelProvider).setCredentialsProvider(credentialsProvider);
+ }
+ return builder.build();
+ }
+
+ public Subscriber getSubscriber(String subscriptionName, MessageReceiver
messageReceiver) {
+ Subscriber.Builder builder = Subscriber.newBuilder(subscriptionName,
messageReceiver);
+ if (StringHelper.trimToNull(endpoint) != null) {
+ ManagedChannel channel =
ManagedChannelBuilder.forTarget(endpoint).usePlaintext().build();
+ TransportChannelProvider channelProvider =
+
FixedTransportChannelProvider.create(GrpcTransportChannel.create(channel));
+ CredentialsProvider credentialsProvider =
NoCredentialsProvider.create();
+
builder.setChannelProvider(channelProvider).setCredentialsProvider(credentialsProvider);
+ }
+ return builder.build();
+ }
+
+ public SubscriberStub getSubscriberStub() throws IOException {
+ SubscriberStubSettings.Builder builder =
SubscriberStubSettings.newBuilder().setTransportChannelProvider(
+
SubscriberStubSettings.defaultGrpcTransportProviderBuilder().build());
+
+ if (StringHelper.trimToNull(endpoint) != null) {
+ ManagedChannel channel =
ManagedChannelBuilder.forTarget(endpoint).usePlaintext().build();
+ TransportChannelProvider channelProvider =
+
FixedTransportChannelProvider.create(GrpcTransportChannel.create(channel));
+ CredentialsProvider credentialsProvider =
NoCredentialsProvider.create();
+
builder.setTransportChannelProvider(channelProvider).setCredentialsProvider(credentialsProvider);
}
- return connectionFactory;
+ return builder.build().createStub();
}
- public void setConnectionFactory(GooglePubsubConnectionFactory
connectionFactory) {
- this.connectionFactory = connectionFactory;
+ public String getEndpoint() {
+ return endpoint;
+ }
+
+ public void setEndpoint(String endpoint) {
+ this.endpoint = endpoint;
+ }
+
+ public int getPublisherCacheSize() {
+ return publisherCacheSize;
+ }
+
+ public void setPublisherCacheSize(int publisherCacheSize) {
+ this.publisherCacheSize = publisherCacheSize;
+ }
+
+ public int getPublisherCacheTimeout() {
+ return publisherCacheTimeout;
+ }
+
+ public void setPublisherCacheTimeout(int publisherCacheTimeout) {
+ this.publisherCacheTimeout = publisherCacheTimeout;
+ }
+
+ public int getPublisherTerminationTimeout() {
+ return publisherTerminationTimeout;
+ }
+
+ public void setPublisherTerminationTimeout(int
publisherTerminationTimeout) {
+ this.publisherTerminationTimeout = publisherTerminationTimeout;
}
}
+
diff --git
a/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConnectionFactory.java
b/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConnectionFactory.java
deleted file mode 100644
index eb49057..0000000
---
a/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConnectionFactory.java
+++ /dev/null
@@ -1,210 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.google.pubsub;
-
-import java.io.FileInputStream;
-import java.io.InputStream;
-import java.security.KeyFactory;
-import java.security.PrivateKey;
-import java.security.spec.PKCS8EncodedKeySpec;
-import java.util.Collection;
-import java.util.Collections;
-
-import com.google.api.client.googleapis.auth.oauth2.GoogleCredential;
-import com.google.api.client.googleapis.javanet.GoogleNetHttpTransport;
-import com.google.api.client.http.HttpTransport;
-import com.google.api.client.http.apache.ApacheHttpTransport;
-import com.google.api.client.json.JsonFactory;
-import com.google.api.client.json.jackson2.JacksonFactory;
-import com.google.api.client.util.Base64;
-import com.google.api.client.util.Strings;
-import com.google.api.services.pubsub.Pubsub;
-import com.google.api.services.pubsub.PubsubScopes;
-import org.apache.http.impl.client.CloseableHttpClient;
-import org.apache.http.impl.client.HttpClients;
-import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class GooglePubsubConnectionFactory {
-
- private static JsonFactory jsonFactory = new JacksonFactory();
-
- private static final Logger LOG =
LoggerFactory.getLogger(GooglePubsubConnectionFactory.class);
-
- private String serviceAccount;
- private String serviceAccountKey;
- private String credentialsFileLocation;
- private String serviceURL;
-
- private Pubsub client;
-
- public GooglePubsubConnectionFactory() {
- }
-
- public synchronized Pubsub getDefaultClient() throws Exception {
- if (this.client == null) {
- this.client = buildClient();
- }
- return this.client;
- }
-
- public Pubsub getMultiThreadClient(int parallelThreads) throws Exception {
-
- PoolingHttpClientConnectionManager cm = new
PoolingHttpClientConnectionManager();
- cm.setDefaultMaxPerRoute(parallelThreads);
- cm.setMaxTotal(parallelThreads);
- CloseableHttpClient httpClient = HttpClients.createMinimal(cm);
-
- return buildClient(new ApacheHttpTransport(httpClient));
- }
-
- private Pubsub buildClient() throws Exception {
- return buildClient(GoogleNetHttpTransport.newTrustedTransport());
- }
-
- private Pubsub buildClient(HttpTransport httpTransport) throws Exception {
-
- GoogleCredential credential = null;
-
- if (!Strings.isNullOrEmpty(serviceAccount) &&
!Strings.isNullOrEmpty(serviceAccountKey)) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Service Account and Key have been set explicitly.
Initialising PubSub using Service Account " + serviceAccount);
- }
- credential = createFromAccountKeyPair(httpTransport);
- }
-
- if (credential == null &&
!Strings.isNullOrEmpty(credentialsFileLocation)) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Key File Name has been set explicitly. Initialising
PubSub using Key File " + credentialsFileLocation);
- }
- credential = createFromFile();
- }
-
- if (credential == null) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("No explicit Service Account or Key File Name have
been provided. Initialising PubSub using defaults ");
- }
- credential = createDefault();
- }
-
- Pubsub.Builder builder = new Pubsub.Builder(httpTransport,
jsonFactory, credential).setApplicationName("camel-google-pubsub");
-
- // Local emulator, SOCKS proxy, etc.
- if (serviceURL != null) {
- builder.setRootUrl(serviceURL);
- }
-
- return builder.build();
- }
-
- private GoogleCredential createFromFile() throws Exception {
- try (InputStream is = new FileInputStream(credentialsFileLocation)) {
- GoogleCredential credential = GoogleCredential.fromStream(is);
-
- if (credential.createScopedRequired()) {
- credential = credential.createScoped(PubsubScopes.all());
- }
-
- return credential;
- }
- }
-
- private GoogleCredential createDefault() throws Exception {
- GoogleCredential credential = GoogleCredential.getApplicationDefault();
-
- Collection pubSubScopes =
Collections.singletonList(PubsubScopes.PUBSUB);
-
- if (credential.createScopedRequired()) {
- credential = credential.createScoped(pubSubScopes);
- }
-
- return credential;
- }
-
- private GoogleCredential createFromAccountKeyPair(HttpTransport
httpTransport) {
- try {
- GoogleCredential credential = new
GoogleCredential.Builder().setTransport(httpTransport).setJsonFactory(jsonFactory).setServiceAccountId(serviceAccount)
-
.setServiceAccountScopes(PubsubScopes.all()).setServiceAccountPrivateKey(getPrivateKeyFromString(serviceAccountKey)).build();
-
- return credential;
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
-
- private PrivateKey getPrivateKeyFromString(String serviceKeyPem) {
- PrivateKey privateKey = null;
- try {
- String privKeyPEM = serviceKeyPem.replace("-----BEGIN PRIVATE
KEY-----", "").replace("-----END PRIVATE KEY-----", "").replace("\r",
"").replace("\n", "");
-
- byte[] encoded = Base64.decodeBase64(privKeyPEM);
-
- PKCS8EncodedKeySpec keySpec = new PKCS8EncodedKeySpec(encoded);
- privateKey =
KeyFactory.getInstance("RSA").generatePrivate(keySpec);
- } catch (Exception e) {
- String error = "Constructing Private Key from PEM string failed: "
+ e.getMessage();
- LOG.error(error, e);
- throw new RuntimeException(e);
- }
- return privateKey;
- }
-
- public String getServiceAccount() {
- return serviceAccount;
- }
-
- public GooglePubsubConnectionFactory setServiceAccount(String
serviceAccount) {
- this.serviceAccount = serviceAccount;
- resetClient();
- return this;
- }
-
- public String getServiceAccountKey() {
- return serviceAccountKey;
- }
-
- public GooglePubsubConnectionFactory setServiceAccountKey(String
serviceAccountKey) {
- this.serviceAccountKey = serviceAccountKey;
- resetClient();
- return this;
- }
-
- public String getCredentialsFileLocation() {
- return credentialsFileLocation;
- }
-
- public GooglePubsubConnectionFactory setCredentialsFileLocation(String
credentialsFileLocation) {
- this.credentialsFileLocation = credentialsFileLocation;
- resetClient();
- return this;
- }
-
- public String getServiceURL() {
- return serviceURL;
- }
-
- public GooglePubsubConnectionFactory setServiceURL(String serviceURL) {
- this.serviceURL = serviceURL;
- resetClient();
- return this;
- }
-
- private synchronized void resetClient() {
- this.client = null;
- }
-}
diff --git
a/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConstants.java
b/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConstants.java
index 116908b..15cc2dd 100644
---
a/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConstants.java
+++
b/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConstants.java
@@ -22,7 +22,6 @@ public final class GooglePubsubConstants {
public static final String ACK_ID = "CamelGooglePubsub.MsgAckId";
public static final String PUBLISH_TIME = "CamelGooglePubsub.PublishTime";
public static final String ATTRIBUTES = "CamelGooglePubsub.Attributes";
- public static final String ACK_DEADLINE = "CamelGooglePubsub.AckDeadline";
public enum AckMode {
AUTO, NONE
diff --git
a/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConsumer.java
b/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConsumer.java
index 3f47ff93..cb9fc41 100644
---
a/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConsumer.java
+++
b/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConsumer.java
@@ -16,21 +16,25 @@
*/
package org.apache.camel.component.google.pubsub;
-import java.net.SocketTimeoutException;
+import java.io.IOException;
import java.util.List;
import java.util.concurrent.ExecutorService;
-import com.google.api.client.repackaged.com.google.common.base.Strings;
-import com.google.api.services.pubsub.Pubsub;
-import com.google.api.services.pubsub.model.PubsubMessage;
-import com.google.api.services.pubsub.model.PullRequest;
-import com.google.api.services.pubsub.model.PullResponse;
-import com.google.api.services.pubsub.model.ReceivedMessage;
+import com.google.api.core.AbstractApiService;
+import com.google.cloud.pubsub.v1.MessageReceiver;
+import com.google.cloud.pubsub.v1.Subscriber;
+import com.google.cloud.pubsub.v1.stub.SubscriberStub;
+import com.google.common.base.Strings;
+import com.google.pubsub.v1.ProjectSubscriptionName;
+import com.google.pubsub.v1.PubsubMessage;
+import com.google.pubsub.v1.PullRequest;
+import com.google.pubsub.v1.PullResponse;
+import com.google.pubsub.v1.ReceivedMessage;
import org.apache.camel.Exchange;
import org.apache.camel.ExtendedExchange;
import org.apache.camel.Processor;
-import
org.apache.camel.component.google.pubsub.consumer.ExchangeAckTransaction;
-import org.apache.camel.spi.Synchronization;
+import org.apache.camel.component.google.pubsub.consumer.AcknowledgeSync;
+import org.apache.camel.component.google.pubsub.consumer.CamelMessageReceiver;
import org.apache.camel.support.DefaultConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -41,18 +45,13 @@ class GooglePubsubConsumer extends DefaultConsumer {
private final GooglePubsubEndpoint endpoint;
private final Processor processor;
- private final Synchronization ackStrategy;
-
private ExecutorService executor;
- private Pubsub pubsub;
+ private List<Subscriber> subscribers;
GooglePubsubConsumer(GooglePubsubEndpoint endpoint, Processor processor)
throws Exception {
super(endpoint, processor);
this.endpoint = endpoint;
this.processor = processor;
- this.ackStrategy = new ExchangeAckTransaction(this.endpoint);
-
- pubsub =
endpoint.getConnectionFactory().getMultiThreadClient(this.endpoint.getConcurrentConsumers());
String loggerId = endpoint.getLoggerId();
@@ -69,7 +68,7 @@ class GooglePubsubConsumer extends DefaultConsumer {
localLog.info("Starting Google PubSub consumer for {}/{}",
endpoint.getProjectId(), endpoint.getDestinationName());
executor = endpoint.createExecutor();
for (int i = 0; i < endpoint.getConcurrentConsumers(); i++) {
- executor.submit(new PubsubPoller(i + ""));
+ executor.submit(new SubscriberWrapper());
}
}
@@ -78,9 +77,14 @@ class GooglePubsubConsumer extends DefaultConsumer {
super.doStop();
localLog.info("Stopping Google PubSub consumer for {}/{}",
endpoint.getProjectId(), endpoint.getDestinationName());
+ if (subscribers != null && !subscribers.isEmpty()) {
+ localLog.info("Stopping subscribers for {}/{}",
endpoint.getProjectId(), endpoint.getDestinationName());
+ subscribers.forEach(AbstractApiService::stopAsync);
+ }
+
if (executor != null) {
if (getEndpoint() != null && getEndpoint().getCamelContext() !=
null) {
-
getEndpoint().getCamelContext().getExecutorServiceManager().shutdownNow(executor);
+
getEndpoint().getCamelContext().getExecutorServiceManager().shutdownGraceful(executor);
} else {
executor.shutdownNow();
}
@@ -88,67 +92,70 @@ class GooglePubsubConsumer extends DefaultConsumer {
executor = null;
}
- private class PubsubPoller implements Runnable {
-
- private final String subscriptionFullName;
- private final String threadId;
-
- PubsubPoller(String id) {
- this.subscriptionFullName =
String.format("projects/%s/subscriptions/%s",
GooglePubsubConsumer.this.endpoint.getProjectId(),
-
GooglePubsubConsumer.this.endpoint.getDestinationName());
- this.threadId =
GooglePubsubConsumer.this.endpoint.getDestinationName() + "-" + "Thread " + id;
- }
+ private class SubscriberWrapper implements Runnable {
@Override
public void run() {
+ String subscriptionName =
ProjectSubscriptionName.format(endpoint.getProjectId(),
endpoint.getDestinationName());
+
if (localLog.isDebugEnabled()) {
- localLog.debug("Subscribing {} to {}", threadId,
subscriptionFullName);
+ localLog.debug("Subscribing to {}", subscriptionName);
}
- while (isRunAllowed() && !isSuspendingOrSuspended()) {
- try {
- PullRequest pullRequest = new
PullRequest().setMaxMessages(endpoint.getMaxMessagesPerPoll());
- PullResponse pullResponse;
- try {
- if (localLog.isTraceEnabled()) {
- localLog.trace("Polling : {}", threadId);
- }
- pullResponse =
GooglePubsubConsumer.this.pubsub.projects().subscriptions().pull(subscriptionFullName,
pullRequest).execute();
- } catch (SocketTimeoutException ste) {
- if (localLog.isTraceEnabled()) {
- localLog.trace("Socket timeout : {}", threadId);
- }
- continue;
- }
- if (pullResponse.getReceivedMessages() == null) {
- continue;
- }
+ if (endpoint.getSynchronousPull()) {
+ synchronousPull(subscriptionName);
+ } else {
+ asynchronousPull(subscriptionName);
+ }
- List<ReceivedMessage> receivedMessages =
pullResponse.getReceivedMessages();
+ localLog.debug("Exit run for subscription {}", subscriptionName);
+ }
- for (ReceivedMessage receivedMessage : receivedMessages) {
- PubsubMessage pubsubMessage =
receivedMessage.getMessage();
+ private void asynchronousPull(String subscriptionName) {
+ while (isRunAllowed() && !isSuspendingOrSuspended()) {
+ MessageReceiver messageReceiver = new
CamelMessageReceiver(endpoint, processor);
+
+ Subscriber subscriber =
endpoint.getComponent().getSubscriber(subscriptionName, messageReceiver);
+ subscribers.add(subscriber);
+ try {
+ subscriber.startAsync().awaitRunning();
+ subscriber.awaitTerminated();
+ } catch (Exception e) {
+ localLog.error("Failure getting messages from PubSub", e);
+ } finally {
+ localLog.debug("Stopping async subscriber {}",
subscriptionName);
+ subscriber.stopAsync();
+ }
+ }
+ }
- byte[] body = pubsubMessage.decodeData();
+ private void synchronousPull(String subscriptionName) {
+ while (isRunAllowed() && !isSuspendingOrSuspended()) {
+ try (SubscriberStub subscriber =
endpoint.getComponent().getSubscriberStub()) {
- if (localLog.isTraceEnabled()) {
- localLog.trace("Received message ID : {}",
pubsubMessage.getMessageId());
- }
+ PullRequest pullRequest = PullRequest.newBuilder()
+ .setMaxMessages(endpoint.getMaxMessagesPerPoll())
+ .setReturnImmediately(false)
+ .setSubscription(subscriptionName)
+ .build();
+ PullResponse pullResponse =
subscriber.pullCallable().call(pullRequest);
+ for (ReceivedMessage message :
pullResponse.getReceivedMessagesList()) {
+ PubsubMessage pubsubMessage = message.getMessage();
Exchange exchange = endpoint.createExchange();
- exchange.getIn().setBody(body);
+
exchange.getIn().setBody(pubsubMessage.getData().toByteArray());
-
exchange.getIn().setHeader(GooglePubsubConstants.ACK_ID,
receivedMessage.getAckId());
+
exchange.getIn().setHeader(GooglePubsubConstants.ACK_ID, message.getAckId());
exchange.getIn().setHeader(GooglePubsubConstants.MESSAGE_ID,
pubsubMessage.getMessageId());
exchange.getIn().setHeader(GooglePubsubConstants.PUBLISH_TIME,
pubsubMessage.getPublishTime());
- if (null !=
receivedMessage.getMessage().getAttributes()) {
-
exchange.getIn().setHeader(GooglePubsubConstants.ATTRIBUTES,
receivedMessage.getMessage().getAttributes());
+ if (null != pubsubMessage.getAttributesMap()) {
+
exchange.getIn().setHeader(GooglePubsubConstants.ATTRIBUTES,
pubsubMessage.getAttributesMap());
}
if (endpoint.getAckMode() !=
GooglePubsubConstants.AckMode.NONE) {
-
exchange.adapt(ExtendedExchange.class).addOnCompletion(GooglePubsubConsumer.this.ackStrategy);
+
exchange.adapt(ExtendedExchange.class).addOnCompletion(new
AcknowledgeSync(subscriber, subscriptionName));
}
try {
@@ -157,10 +164,10 @@ class GooglePubsubConsumer extends DefaultConsumer {
exchange.setException(e);
}
}
- } catch (Exception e) {
+ } catch (IOException e) {
localLog.error("Failure getting messages from PubSub", e);
}
}
}
}
-}
+}
\ No newline at end of file
diff --git
a/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubEndpoint.java
b/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubEndpoint.java
index 5b34fd2..73e34f4 100644
---
a/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubEndpoint.java
+++
b/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubEndpoint.java
@@ -16,9 +16,14 @@
*/
package org.apache.camel.component.google.pubsub;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
-import com.google.api.services.pubsub.Pubsub;
+import com.google.cloud.pubsub.v1.Publisher;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalListener;
import org.apache.camel.Component;
import org.apache.camel.Consumer;
import org.apache.camel.ExchangePattern;
@@ -36,7 +41,7 @@ import org.slf4j.LoggerFactory;
/**
* Messaging client for Google Cloud Platform PubSub Service
* <p/>
- * Built on top of the Service API libraries (v1).
+ * Built on top of the Google Cloud Pub/Sub libraries.
*/
@UriEndpoint(firstVersion = "2.19.0", scheme = "google-pubsub", title =
"Google Pubsub", syntax = "google-pubsub:projectId:destinationName", label =
"messaging")
public class GooglePubsubEndpoint extends DefaultEndpoint {
@@ -60,14 +65,12 @@ public class GooglePubsubEndpoint extends DefaultEndpoint {
@UriParam(name = "maxMessagesPerPoll", description = "The max number of
messages to receive from the server in a single API call", defaultValue = "1")
private Integer maxMessagesPerPoll = 1;
- @UriParam(name = "connectionFactory", description = "ConnectionFactory to
obtain connection to PubSub Service. If non provided the default one will be
used")
- private GooglePubsubConnectionFactory connectionFactory;
+ @UriParam(name = "synchronousPull", description = "Synchronously pull
batches of messages", defaultValue = "false")
+ private boolean synchronousPull;
@UriParam(defaultValue = "AUTO", enums = "AUTO,NONE", description = "AUTO
= exchange gets ack'ed/nack'ed on completion. NONE = downstream process has to
ack/nack explicitly")
private GooglePubsubConstants.AckMode ackMode =
GooglePubsubConstants.AckMode.AUTO;
- private Pubsub pubsub;
-
public GooglePubsubEndpoint(String uri, Component component, String
remaining) {
super(uri, component);
@@ -78,7 +81,7 @@ public class GooglePubsubEndpoint extends DefaultEndpoint {
@Override
public GooglePubsubComponent getComponent() {
- return (GooglePubsubComponent)super.getComponent();
+ return (GooglePubsubComponent) super.getComponent();
}
public void afterPropertiesSet() throws Exception {
@@ -91,9 +94,7 @@ public class GooglePubsubEndpoint extends DefaultEndpoint {
// Default pubsub connection.
// With the publisher endpoints - the main publisher
// with the consumer endpoints - the ack client
- pubsub = getConnectionFactory().getDefaultClient();
- log.trace("Credential file location : {}",
getConnectionFactory().getCredentialsFileLocation());
log.trace("Project ID: {}", this.projectId);
log.trace("Destination Name: {}", this.destinationName);
}
@@ -162,27 +163,19 @@ public class GooglePubsubEndpoint extends DefaultEndpoint
{
this.maxMessagesPerPoll = maxMessagesPerPoll;
}
- public GooglePubsubConstants.AckMode getAckMode() {
- return ackMode;
+ public Boolean getSynchronousPull() {
+ return synchronousPull;
}
- public void setAckMode(GooglePubsubConstants.AckMode ackMode) {
- this.ackMode = ackMode;
+ public void setSynchronousPull(Boolean synchronousPull) {
+ this.synchronousPull = synchronousPull;
}
- public Pubsub getPubsub() {
- return pubsub;
- }
-
- /**
- * ConnectionFactory to obtain connection to PubSub Service. If non
provided
- * the default will be used.
- */
- public GooglePubsubConnectionFactory getConnectionFactory() {
- return (null == connectionFactory) ?
getComponent().getConnectionFactory() : connectionFactory;
+ public GooglePubsubConstants.AckMode getAckMode() {
+ return ackMode;
}
- public void setConnectionFactory(GooglePubsubConnectionFactory
connectionFactory) {
- this.connectionFactory = connectionFactory;
+ public void setAckMode(GooglePubsubConstants.AckMode ackMode) {
+ this.ackMode = ackMode;
}
}
diff --git
a/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubProducer.java
b/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubProducer.java
index a0cac7e..de4636d 100644
---
a/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubProducer.java
+++
b/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubProducer.java
@@ -19,15 +19,14 @@ package org.apache.camel.component.google.pubsub;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import com.google.api.client.util.Strings;
-import com.google.api.services.pubsub.model.PublishRequest;
-import com.google.api.services.pubsub.model.PublishResponse;
-import com.google.api.services.pubsub.model.PubsubMessage;
+import com.google.api.core.ApiFuture;
+import com.google.cloud.pubsub.v1.Publisher;
+import com.google.protobuf.ByteString;
+import com.google.pubsub.v1.PubsubMessage;
import org.apache.camel.Exchange;
import org.apache.camel.support.DefaultProducer;
import org.slf4j.Logger;
@@ -59,77 +58,54 @@ public class GooglePubsubProducer extends DefaultProducer {
@Override
public void process(Exchange exchange) throws Exception {
- List<Exchange> entryList = prepareExchangeList(exchange);
-
- if (entryList == null || entryList.size() == 0) {
- logger.warn("The incoming message is either null or empty.
Triggered by an aggregation timeout?");
- return;
- }
-
if (logger.isDebugEnabled()) {
logger.debug("uploader thread/id: " +
Thread.currentThread().getId() + " / " + exchange.getExchangeId() + " . api
call completed.");
}
- sendMessages(entryList);
- }
-
- /**
- * The method converts a single incoming message into a List
- */
- private static List<Exchange> prepareExchangeList(Exchange exchange) {
-
- List<Exchange> entryList = null;
-
- if (null == exchange.getProperty(Exchange.GROUPED_EXCHANGE)) {
- entryList = new ArrayList<>();
- entryList.add(exchange);
+ if (exchange.getIn().getBody() instanceof List) {
+ boolean groupedExchanges = false;
+ for (Object body : exchange.getIn().getBody(List.class)) {
+ if (body instanceof Exchange) {
+ send((Exchange) body);
+ groupedExchanges = true;
+ }
+ }
+ if (!groupedExchanges) {
+ send(exchange);
+ }
} else {
- entryList =
(List<Exchange>)exchange.getProperty(Exchange.GROUPED_EXCHANGE);
+ send(exchange);
}
-
- return entryList;
}
- private void sendMessages(List<Exchange> exchanges) throws Exception {
+ private void send(Exchange exchange) throws Exception {
- GooglePubsubEndpoint endpoint = (GooglePubsubEndpoint)getEndpoint();
+ GooglePubsubEndpoint endpoint = (GooglePubsubEndpoint) getEndpoint();
String topicName = String.format("projects/%s/topics/%s",
endpoint.getProjectId(), endpoint.getDestinationName());
- List<PubsubMessage> messages = new ArrayList<>();
-
- for (Exchange exchange : exchanges) {
- PubsubMessage message = new PubsubMessage();
-
- Object body = exchange.getIn().getBody();
+ Publisher publisher = endpoint.getComponent().getPublisher(topicName);
- if (body instanceof String) {
-
message.encodeData(((String)body).getBytes(StandardCharsets.UTF_8));
- } else if (body instanceof byte[]) {
- message.encodeData((byte[])body);
- } else {
- message.encodeData(serialize(body));
- }
-
- Object attributes =
exchange.getIn().getHeader(GooglePubsubConstants.ATTRIBUTES);
+ Object body = exchange.getIn().getBody();
+ ByteString byteString;
- if (attributes != null && attributes instanceof Map &&
((Map)attributes).size() > 0) {
- message.setAttributes((Map)attributes);
- }
-
- messages.add(message);
+ if (body instanceof String) {
+ byteString = ByteString.copyFromUtf8((String) body);
+ } else if (body instanceof byte[]) {
+ byteString = ByteString.copyFrom((byte[]) body);
+ } else {
+ byteString = ByteString.copyFrom(serialize(body));
}
- PublishRequest publishRequest = new
PublishRequest().setMessages(messages);
-
- PublishResponse response =
endpoint.getPubsub().projects().topics().publish(topicName,
publishRequest).execute();
+ PubsubMessage.Builder messageBuilder =
PubsubMessage.newBuilder().setData(byteString);
- List<String> sentMessageIds = response.getMessageIds();
-
- int i = 0;
- for (Exchange entry : exchanges) {
- entry.getIn().setHeader(GooglePubsubConstants.MESSAGE_ID,
sentMessageIds.get(i));
- i++;
+ Map<String, String> attributes =
exchange.getIn().getHeader(GooglePubsubConstants.ATTRIBUTES, Map.class);
+ if (attributes != null) {
+ messageBuilder.putAllAttributes(attributes).build();
}
+ PubsubMessage message = messageBuilder.build();
+
+ ApiFuture<String> messageIdFuture = publisher.publish(message);
+ exchange.getIn().setHeader(GooglePubsubConstants.MESSAGE_ID,
messageIdFuture.get());
}
public static byte[] serialize(Object obj) throws IOException {
diff --git
a/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConstants.java
b/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/consumer/AcknowledgeAsync.java
similarity index 57%
copy from
components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConstants.java
copy to
components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/consumer/AcknowledgeAsync.java
index 116908b..1a93db3 100644
---
a/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConstants.java
+++
b/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/consumer/AcknowledgeAsync.java
@@ -14,21 +14,27 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.camel.component.google.pubsub;
+package org.apache.camel.component.google.pubsub.consumer;
-public final class GooglePubsubConstants {
+import com.google.cloud.pubsub.v1.AckReplyConsumer;
+import org.apache.camel.Exchange;
+import org.apache.camel.spi.Synchronization;
- public static final String MESSAGE_ID = "CamelGooglePubsub.MessageId";
- public static final String ACK_ID = "CamelGooglePubsub.MsgAckId";
- public static final String PUBLISH_TIME = "CamelGooglePubsub.PublishTime";
- public static final String ATTRIBUTES = "CamelGooglePubsub.Attributes";
- public static final String ACK_DEADLINE = "CamelGooglePubsub.AckDeadline";
+public class AcknowledgeAsync implements Synchronization {
- public enum AckMode {
- AUTO, NONE
+ private final AckReplyConsumer ackReplyConsumer;
+
+ public AcknowledgeAsync(AckReplyConsumer ackReplyConsumer) {
+ this.ackReplyConsumer = ackReplyConsumer;
+ }
+
+ @Override
+ public void onComplete(Exchange exchange) {
+ ackReplyConsumer.ack();
}
- private GooglePubsubConstants() {
- // not called
+ @Override
+ public void onFailure(Exchange exchange) {
+ ackReplyConsumer.nack();
}
}
diff --git
a/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/consumer/AcknowledgeSync.java
b/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/consumer/AcknowledgeSync.java
new file mode 100644
index 0000000..9756483
--- /dev/null
+++
b/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/consumer/AcknowledgeSync.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.google.pubsub.consumer;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import com.google.cloud.pubsub.v1.stub.SubscriberStub;
+import com.google.pubsub.v1.AcknowledgeRequest;
+import org.apache.camel.Exchange;
+import org.apache.camel.component.google.pubsub.GooglePubsubConstants;
+import org.apache.camel.spi.Synchronization;
+
+public class AcknowledgeSync implements Synchronization {
+
+ private final SubscriberStub subscriber;
+ private final String subscriptionName;
+
+ public AcknowledgeSync(SubscriberStub subscriber, String subscriptionName)
{
+ this.subscriber = subscriber;
+ this.subscriptionName = subscriptionName;
+ }
+
+ @Override
+ public void onComplete(Exchange exchange) {
+ AcknowledgeRequest ackRequest = AcknowledgeRequest.newBuilder()
+ .addAllAckIds(getAckIdList(exchange))
+ .setSubscription(subscriptionName).build();
+ try {
+ subscriber.acknowledgeCallable().call(ackRequest);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void onFailure(Exchange exchange) {
+ }
+
+ private List<String> getAckIdList(Exchange exchange) {
+ List<String> ackList = new ArrayList<>();
+
+ if (exchange.getIn().getBody() instanceof List) {
+ for (Object body : exchange.getIn().getBody(List.class)) {
+ if (body instanceof Exchange) {
+ String ackId =
exchange.getIn().getHeader(GooglePubsubConstants.ACK_ID, String.class);
+ if (null != ackId) {
+ ackList.add(ackId);
+ }
+ }
+ }
+ }
+
+ String ackId =
exchange.getIn().getHeader(GooglePubsubConstants.ACK_ID, String.class);
+ if (null != ackId) {
+ ackList.add(ackId);
+ }
+
+ return ackList;
+ }
+}
diff --git
a/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/consumer/CamelMessageReceiver.java
b/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/consumer/CamelMessageReceiver.java
new file mode 100644
index 0000000..cf7610e
--- /dev/null
+++
b/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/consumer/CamelMessageReceiver.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.google.pubsub.consumer;
+
+import com.google.cloud.pubsub.v1.AckReplyConsumer;
+import com.google.cloud.pubsub.v1.MessageReceiver;
+import com.google.common.base.Strings;
+import com.google.pubsub.v1.PubsubMessage;
+import org.apache.camel.Exchange;
+import org.apache.camel.ExtendedExchange;
+import org.apache.camel.Processor;
+import org.apache.camel.component.google.pubsub.GooglePubsubConstants;
+import org.apache.camel.component.google.pubsub.GooglePubsubEndpoint;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class CamelMessageReceiver implements MessageReceiver {
+
+ private final Logger localLog;
+ private final GooglePubsubEndpoint endpoint;
+ private final Processor processor;
+
+ public CamelMessageReceiver(GooglePubsubEndpoint endpoint, Processor
processor) {
+ this.endpoint = endpoint;
+ this.processor = processor;
+ String loggerId = endpoint.getLoggerId();
+ if (Strings.isNullOrEmpty(loggerId)) {
+ loggerId = this.getClass().getName();
+ }
+ localLog = LoggerFactory.getLogger(loggerId);
+ }
+
+ @Override
+ public void receiveMessage(PubsubMessage pubsubMessage, AckReplyConsumer
ackReplyConsumer) {
+ if (localLog.isTraceEnabled()) {
+ localLog.trace("Received message ID : {}",
pubsubMessage.getMessageId());
+ }
+
+ Exchange exchange = endpoint.createExchange();
+ exchange.getIn().setBody(pubsubMessage.getData().toByteArray());
+
+ exchange.getIn().setHeader(GooglePubsubConstants.MESSAGE_ID,
pubsubMessage.getMessageId());
+ exchange.getIn().setHeader(GooglePubsubConstants.PUBLISH_TIME,
pubsubMessage.getPublishTime());
+
+ if (null != pubsubMessage.getAttributesMap()) {
+ exchange.getIn().setHeader(GooglePubsubConstants.ATTRIBUTES,
pubsubMessage.getAttributesMap());
+ }
+
+ if (endpoint.getAckMode() != GooglePubsubConstants.AckMode.NONE) {
+ exchange.adapt(ExtendedExchange.class).addOnCompletion(new
AcknowledgeAsync(ackReplyConsumer));
+ }
+
+ try {
+ processor.process(exchange);
+ } catch (Throwable e) {
+ exchange.setException(e);
+ }
+ }
+}
diff --git
a/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/consumer/ExchangeAckTransaction.java
b/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/consumer/ExchangeAckTransaction.java
deleted file mode 100644
index 49a8224..0000000
---
a/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/consumer/ExchangeAckTransaction.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.google.pubsub.consumer;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.camel.Exchange;
-import org.apache.camel.component.google.pubsub.GooglePubsubConstants;
-import org.apache.camel.component.google.pubsub.GooglePubsubEndpoint;
-import org.apache.camel.spi.Synchronization;
-
-public class ExchangeAckTransaction extends PubsubAcknowledgement implements
Synchronization {
-
- public ExchangeAckTransaction(GooglePubsubEndpoint endpoint) {
- super(endpoint);
- }
-
- @Override
- public void onComplete(Exchange exchange) {
- acknowledge(getAckIdList(exchange));
- }
-
- @Override
- public void onFailure(Exchange exchange) {
-
- Integer deadline = 0;
- Object configuredDeadline =
exchange.getIn().getHeader(GooglePubsubConstants.ACK_DEADLINE);
-
- if (configuredDeadline != null &&
Integer.class.isInstance(configuredDeadline)) {
- deadline = (Integer)configuredDeadline;
- }
-
- if (configuredDeadline != null &&
String.class.isInstance(configuredDeadline)) {
- try {
- deadline = Integer.valueOf((String)configuredDeadline);
- } catch (Exception e) {
- log.warn("Unable to parse ACK Deadline header value", e);
- }
- }
-
- if (deadline != 0) {
- log.trace(" Exchange {} : Ack deadline : {}",
exchange.getExchangeId(), deadline);
- }
-
- resetAckDeadline(getAckIdList(exchange), deadline);
- }
-
- private List<String> getAckIdList(Exchange exchange) {
- List<String> ackList = new ArrayList<>();
-
- if (null != exchange.getProperty(Exchange.GROUPED_EXCHANGE)) {
- for (Exchange ex :
(List<Exchange>)exchange.getProperty(Exchange.GROUPED_EXCHANGE)) {
- String ackId =
(String)ex.getIn().getHeader(GooglePubsubConstants.ACK_ID);
- if (null != ackId) {
- ackList.add(ackId);
- }
- }
- } else {
-
ackList.add((String)exchange.getIn().getHeader(GooglePubsubConstants.ACK_ID));
- }
-
- return ackList;
- }
-}
diff --git
a/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/consumer/PubsubAcknowledgement.java
b/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/consumer/PubsubAcknowledgement.java
deleted file mode 100644
index 4756e7a..0000000
---
a/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/consumer/PubsubAcknowledgement.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.google.pubsub.consumer;
-
-import java.util.List;
-
-import com.google.api.client.repackaged.com.google.common.base.Strings;
-import com.google.api.services.pubsub.model.AcknowledgeRequest;
-import com.google.api.services.pubsub.model.ModifyAckDeadlineRequest;
-import org.apache.camel.component.google.pubsub.GooglePubsubEndpoint;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public abstract class PubsubAcknowledgement {
-
- protected Logger log;
-
- private final String subscriptionFullName;
- private final GooglePubsubEndpoint endpoint;
-
- public PubsubAcknowledgement(GooglePubsubEndpoint endpoint) {
- this.endpoint = endpoint;
- this.subscriptionFullName =
String.format("projects/%s/subscriptions/%s", endpoint.getProjectId(),
endpoint.getDestinationName());
-
- String loggerId = endpoint.getLoggerId();
-
- if (Strings.isNullOrEmpty(loggerId)) {
- loggerId = this.getClass().getName();
- }
-
- log = LoggerFactory.getLogger(PubsubAcknowledgement.class);
- }
-
- void acknowledge(List<String> ackIdList) {
- AcknowledgeRequest ackRequest = new
AcknowledgeRequest().setAckIds(ackIdList);
- try {
-
endpoint.getPubsub().projects().subscriptions().acknowledge(subscriptionFullName,
ackRequest).execute();
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
-
- void resetAckDeadline(List<String> ackIdList, Integer seconds) {
-
- ModifyAckDeadlineRequest nackRequest = new
ModifyAckDeadlineRequest().setAckIds(ackIdList).setAckDeadlineSeconds(seconds);
-
- try {
-
endpoint.getPubsub().projects().subscriptions().modifyAckDeadline(subscriptionFullName,
nackRequest).execute();
- } catch (Exception e) {
- // It will timeout automatically on the channel
- log.warn("Unable to reset ack deadline " + ackIdList, e);
- }
- }
-}
diff --git
a/components/camel-google-pubsub/src/test/java/org/apache/camel/component/google/pubsub/PubsubTestSupport.java
b/components/camel-google-pubsub/src/test/java/org/apache/camel/component/google/pubsub/PubsubTestSupport.java
index acf62c6..bda219d 100644
---
a/components/camel-google-pubsub/src/test/java/org/apache/camel/component/google/pubsub/PubsubTestSupport.java
+++
b/components/camel-google-pubsub/src/test/java/org/apache/camel/component/google/pubsub/PubsubTestSupport.java
@@ -19,29 +19,43 @@ package org.apache.camel.component.google.pubsub;
import java.io.InputStream;
import java.util.Properties;
-import com.google.api.client.googleapis.json.GoogleJsonResponseException;
-import com.google.api.services.pubsub.Pubsub;
-import com.google.api.services.pubsub.model.Subscription;
-import com.google.api.services.pubsub.model.Topic;
+import com.google.api.gax.core.CredentialsProvider;
+import com.google.api.gax.core.NoCredentialsProvider;
+import com.google.api.gax.grpc.GrpcTransportChannel;
+import com.google.api.gax.rpc.FixedTransportChannelProvider;
+import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
+import com.google.cloud.pubsub.v1.SubscriptionAdminSettings;
+import com.google.cloud.pubsub.v1.TopicAdminClient;
+import com.google.cloud.pubsub.v1.TopicAdminSettings;
+import com.google.pubsub.v1.ProjectSubscriptionName;
+import com.google.pubsub.v1.ProjectTopicName;
+import com.google.pubsub.v1.PushConfig;
+import io.grpc.ManagedChannel;
+import io.grpc.ManagedChannelBuilder;
import org.apache.camel.BindToRegistry;
import org.apache.camel.CamelContext;
import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.ClassRule;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.wait.strategy.LogMessageWaitStrategy;
public class PubsubTestSupport extends CamelTestSupport {
- public static final String SERVICE_KEY;
- public static final String SERVICE_ACCOUNT;
public static final String PROJECT_ID;
- public static final String SERVICE_URL;
static {
Properties testProperties = loadProperties();
- SERVICE_KEY = testProperties.getProperty("service.key");
- SERVICE_ACCOUNT = testProperties.getProperty("service.account");
PROJECT_ID = testProperties.getProperty("project.id");
- SERVICE_URL = testProperties.getProperty("test.serviceURL");
}
+ @ClassRule
+ public static GenericContainer container = new
GenericContainer("google/cloud-sdk:latest")
+ .withExposedPorts(8383)
+ .withCommand("/bin/sh", "-c",
+ String.format("gcloud beta emulators pubsub start
--project %s --host-port=0.0.0.0:%d",
+ PROJECT_ID, 8383))
+ .waitingFor(new
LogMessageWaitStrategy().withRegEx("(?s).*started.*$"));
+
private static Properties loadProperties() {
Properties testProperties = new Properties();
InputStream fileIn =
PubsubTestSupport.class.getClassLoader().getResourceAsStream("simple.properties");
@@ -57,10 +71,8 @@ public class PubsubTestSupport extends CamelTestSupport {
protected void addPubsubComponent(CamelContext context) {
- GooglePubsubConnectionFactory cf = new
GooglePubsubConnectionFactory().setServiceAccount(SERVICE_ACCOUNT).setServiceAccountKey(SERVICE_KEY).setServiceURL(SERVICE_URL);
-
GooglePubsubComponent component = new GooglePubsubComponent();
- component.setConnectionFactory(cf);
+ component.setEndpoint(container.getContainerIpAddress() + ":" +
container.getFirstMappedPort());
context.addComponent("google-pubsub", component);
context.getPropertiesComponent().setLocation("ref:prop");
@@ -73,46 +85,64 @@ public class PubsubTestSupport extends CamelTestSupport {
@Override
protected CamelContext createCamelContext() throws Exception {
+ container.start();
+ createTopicSubscription();
CamelContext context = super.createCamelContext();
addPubsubComponent(context);
return context;
}
- public static void createTopicSubscriptionPair(String topicName, String
subscriptionName) throws Exception {
- createTopicSubscriptionPair(topicName, subscriptionName, 10);
+ public void createTopicSubscription() throws Exception {
}
- public static void createTopicSubscriptionPair(String topicName, String
subscriptionName, int ackDealineSeconds) throws Exception {
-
- Pubsub pubsub = new
GooglePubsubConnectionFactory().setServiceAccount(SERVICE_ACCOUNT).setServiceAccountKey(SERVICE_KEY).setServiceURL(SERVICE_URL).getDefaultClient();
-
- String topicFullName = String.format("projects/%s/topics/%s",
PubsubTestSupport.PROJECT_ID, topicName);
-
- String subscriptionFullName =
String.format("projects/%s/subscriptions/%s", PubsubTestSupport.PROJECT_ID,
subscriptionName);
+ public void createTopicSubscriptionPair(String topicName, String
subscriptionName) {
+ createTopicSubscriptionPair(topicName, subscriptionName, 10);
+ }
- try {
- pubsub.projects().topics().create(topicFullName, new
Topic()).execute();
- } catch (Exception e) {
- handleAlreadyExistsException(e);
- }
+ public void createTopicSubscriptionPair(String topicName, String
subscriptionName, int ackDeadlineSeconds) {
+ ManagedChannel channel = null;
+ TopicAdminClient topicAdminClient = null;
+ SubscriptionAdminClient subscriptionAdminClient = null;
try {
- Subscription subscription = new
Subscription().setTopic(topicFullName).setAckDeadlineSeconds(ackDealineSeconds);
-
- pubsub.projects().subscriptions().create(subscriptionFullName,
subscription).execute();
- } catch (Exception e) {
- handleAlreadyExistsException(e);
- }
- }
-
- private static void handleAlreadyExistsException(Exception e) throws
Exception {
- if (e instanceof GoogleJsonResponseException) {
- GoogleJsonResponseException exc = (GoogleJsonResponseException)e;
- // 409 indicates that the resource is available already
- if (409 == exc.getStatusCode()) {
- return;
+ Integer port = container.getFirstMappedPort();
+ channel = ManagedChannelBuilder
+ .forTarget(String.format("%s:%s", "localhost", port))
+ .usePlaintext()
+ .build();
+
+ FixedTransportChannelProvider channelProvider =
FixedTransportChannelProvider.create(GrpcTransportChannel.create(channel));
+ CredentialsProvider credentialsProvider =
NoCredentialsProvider.create();
+
+ ProjectTopicName projectTopicName =
ProjectTopicName.of(PROJECT_ID, topicName);
+ ProjectSubscriptionName projectSubscriptionName =
ProjectSubscriptionName.of(PROJECT_ID, subscriptionName);
+
+ topicAdminClient = TopicAdminClient.create(
+ TopicAdminSettings.newBuilder()
+ .setTransportChannelProvider(channelProvider)
+ .setCredentialsProvider(credentialsProvider)
+ .build());
+ topicAdminClient.createTopic(projectTopicName);
+
+ subscriptionAdminClient = SubscriptionAdminClient.create(
+ SubscriptionAdminSettings.newBuilder()
+ .setTransportChannelProvider(channelProvider)
+ .setCredentialsProvider(credentialsProvider)
+ .build());
+
subscriptionAdminClient.createSubscription(projectSubscriptionName,
projectTopicName,
+ PushConfig.getDefaultInstance(), ackDeadlineSeconds);
+
+ } catch (Exception ignored) {
+ } finally {
+ if (channel != null) {
+ channel.shutdown();
+ }
+ if (topicAdminClient != null) {
+ topicAdminClient.shutdown();
+ }
+ if (subscriptionAdminClient != null) {
+ subscriptionAdminClient.shutdown();
}
}
- throw e;
}
}
diff --git
a/components/camel-google-pubsub/src/test/java/org/apache/camel/component/google/pubsub/integration/AckModeNoneTest.java
b/components/camel-google-pubsub/src/test/java/org/apache/camel/component/google/pubsub/integration/AckModeNoneTest.java
index af26401..7a9365d 100644
---
a/components/camel-google-pubsub/src/test/java/org/apache/camel/component/google/pubsub/integration/AckModeNoneTest.java
+++
b/components/camel-google-pubsub/src/test/java/org/apache/camel/component/google/pubsub/integration/AckModeNoneTest.java
@@ -16,6 +16,8 @@
*/
package org.apache.camel.component.google.pubsub.integration;
+import java.io.IOException;
+
import org.apache.camel.Endpoint;
import org.apache.camel.EndpointInject;
import org.apache.camel.Exchange;
@@ -25,7 +27,6 @@ import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.google.pubsub.PubsubTestSupport;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.support.DefaultExchange;
-import org.junit.BeforeClass;
import org.junit.FixMethodOrder;
import org.junit.Test;
import org.junit.runners.MethodSorters;
@@ -42,7 +43,7 @@ public class AckModeNoneTest extends PubsubTestSupport {
@EndpointInject("google-pubsub:{{project.id}}:" + TOPIC_NAME)
private Endpoint pubsubTopic;
- @EndpointInject("google-pubsub:{{project.id}}:" + SUBSCRIPTION_NAME +
"?ackMode=NONE")
+ @EndpointInject("google-pubsub:{{project.id}}:" + SUBSCRIPTION_NAME +
"?ackMode=NONE&synchronousPull=true")
private Endpoint pubsubSub;
@EndpointInject("mock:receiveResult")
@@ -51,13 +52,8 @@ public class AckModeNoneTest extends PubsubTestSupport {
@Produce("direct:in")
private ProducerTemplate producer;
- @BeforeClass
- public static void createPubSub() throws Exception {
- createTopicSubscriptionPair(TOPIC_NAME, SUBSCRIPTION_NAME, 1);
- }
-
@Override
- protected RouteBuilder createRouteBuilder() throws Exception {
+ protected RouteBuilder createRouteBuilder() {
return new RouteBuilder() {
public void configure() {
from(directIn).routeId("AckNONE_SEND").to(pubsubTopic);
@@ -67,12 +63,17 @@ public class AckModeNoneTest extends PubsubTestSupport {
};
}
+ @Override
+ public void createTopicSubscription() throws IOException {
+ createTopicSubscriptionPair(TOPIC_NAME, SUBSCRIPTION_NAME, 1);
+ }
+
/**
* Expecting two messages received for the one sent. With Ack mode set to
* NONE the same message will be delivered again and again, after the
* deadline expiration. Setting deadline to 1 second and waiting for more
* than 2 to ensure the message has been resent.
- *
+ *
* @throws Exception
*/
@Test
diff --git
a/components/camel-google-pubsub/src/test/java/org/apache/camel/component/google/pubsub/integration/AcknowledgementTest.java
b/components/camel-google-pubsub/src/test/java/org/apache/camel/component/google/pubsub/integration/AcknowledgementTest.java
index 9bd914f..a8636e2 100644
---
a/components/camel-google-pubsub/src/test/java/org/apache/camel/component/google/pubsub/integration/AcknowledgementTest.java
+++
b/components/camel-google-pubsub/src/test/java/org/apache/camel/component/google/pubsub/integration/AcknowledgementTest.java
@@ -41,7 +41,7 @@ public class AcknowledgementTest extends PubsubTestSupport {
@EndpointInject("google-pubsub:{{project.id}}:" + TOPIC_NAME)
private Endpoint pubsubTopic;
- @EndpointInject("google-pubsub:{{project.id}}:" + SUBSCRIPTION_NAME)
+ @EndpointInject("google-pubsub:{{project.id}}:" + SUBSCRIPTION_NAME +
"?synchronousPull=true")
private Endpoint pubsubSubscription;
@EndpointInject("mock:receiveResult")
@@ -50,8 +50,8 @@ public class AcknowledgementTest extends PubsubTestSupport {
@Produce("direct:in")
private ProducerTemplate producer;
- @BeforeClass
- public static void createTopicSubscription() throws Exception {
+ @Override
+ public void createTopicSubscription() {
createTopicSubscriptionPair(TOPIC_NAME, SUBSCRIPTION_NAME);
}
diff --git
a/components/camel-google-pubsub/src/test/java/org/apache/camel/component/google/pubsub/integration/BodyTypesTest.java
b/components/camel-google-pubsub/src/test/java/org/apache/camel/component/google/pubsub/integration/BodyTypesTest.java
index 0667ada..552c8a4 100644
---
a/components/camel-google-pubsub/src/test/java/org/apache/camel/component/google/pubsub/integration/BodyTypesTest.java
+++
b/components/camel-google-pubsub/src/test/java/org/apache/camel/component/google/pubsub/integration/BodyTypesTest.java
@@ -33,7 +33,6 @@ import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.google.pubsub.PubsubTestSupport;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.support.DefaultExchange;
-import org.junit.BeforeClass;
import org.junit.Test;
public class BodyTypesTest extends PubsubTestSupport {
@@ -50,7 +49,7 @@ public class BodyTypesTest extends PubsubTestSupport {
@EndpointInject("mock:sendResult")
private MockEndpoint sendResult;
- @EndpointInject("google-pubsub:{{project.id}}:" + SUBSCRIPTION_NAME)
+ @EndpointInject("google-pubsub:{{project.id}}:" + SUBSCRIPTION_NAME +
"?synchronousPull=true")
private Endpoint pubsubSubscription;
@EndpointInject("mock:receiveResult")
@@ -59,8 +58,8 @@ public class BodyTypesTest extends PubsubTestSupport {
@Produce("direct:from")
private ProducerTemplate producer;
- @BeforeClass
- public static void createTopicSubscription() throws Exception {
+ @Override
+ public void createTopicSubscription() {
createTopicSubscriptionPair(TOPIC_NAME, SUBSCRIPTION_NAME);
}
@@ -109,7 +108,7 @@ public class BodyTypesTest extends PubsubTestSupport {
assertTrue("Received body is of byte[] type",
receivedExchange.getIn().getBody() instanceof byte[]);
- assertTrue("Received body equals sent", Arrays.equals(body,
(byte[])receivedExchange.getIn().getBody()));
+ assertTrue("Received body equals sent", Arrays.equals(body, (byte[])
receivedExchange.getIn().getBody()));
}
@@ -144,9 +143,9 @@ public class BodyTypesTest extends PubsubTestSupport {
assertTrue("Received body is of byte[] type",
receivedExchange.getIn().getBody() instanceof byte[]);
- Object bodyReceived =
deserialize((byte[])receivedExchange.getIn().getBody());
+ Object bodyReceived = deserialize((byte[])
receivedExchange.getIn().getBody());
- assertTrue("Received body is a Map ",
((Map)bodyReceived).get("KEY").equals("VALUE1212"));
+ assertTrue("Received body is a Map ", ((Map)
bodyReceived).get("KEY").equals("VALUE1212"));
}
diff --git
a/components/camel-google-pubsub/src/test/java/org/apache/camel/component/google/pubsub/integration/GroupedExchangeRoundtripTest.java
b/components/camel-google-pubsub/src/test/java/org/apache/camel/component/google/pubsub/integration/GroupedExchangeRoundtripTest.java
index fdf3c49..62ddc1e 100644
---
a/components/camel-google-pubsub/src/test/java/org/apache/camel/component/google/pubsub/integration/GroupedExchangeRoundtripTest.java
+++
b/components/camel-google-pubsub/src/test/java/org/apache/camel/component/google/pubsub/integration/GroupedExchangeRoundtripTest.java
@@ -45,7 +45,7 @@ public class GroupedExchangeRoundtripTest extends
PubsubTestSupport {
@EndpointInject("mock:sendResult")
private MockEndpoint sendResult;
- @EndpointInject("google-pubsub:{{project.id}}:" + SUBSCRIPTION_NAME)
+ @EndpointInject("google-pubsub:{{project.id}}:" + SUBSCRIPTION_NAME +
"?synchronousPull=true")
private Endpoint pubsubSubscription;
@EndpointInject("mock:receiveResult")
@@ -54,8 +54,8 @@ public class GroupedExchangeRoundtripTest extends
PubsubTestSupport {
@Produce("direct:aggregator")
private ProducerTemplate producer;
- @BeforeClass
- public static void createTopicSubscription() throws Exception {
+ @Override
+ public void createTopicSubscription() {
createTopicSubscriptionPair(TOPIC_NAME, SUBSCRIPTION_NAME);
}
@@ -74,7 +74,7 @@ public class GroupedExchangeRoundtripTest extends
PubsubTestSupport {
}
/**
- * Tests that a grouped exhcange is successfully received
+ * Tests that a grouped exchange is successfully received
*
* @throws Exception
*/
@@ -102,8 +102,5 @@ public class GroupedExchangeRoundtripTest extends
PubsubTestSupport {
// Send result section
List<Exchange> results = sendResult.getExchanges();
assertEquals("Received exchanges", 1, results.size());
-
- List exchangeGrouped =
(List)results.get(0).getProperty(Exchange.GROUPED_EXCHANGE);
- assertEquals("Received messages within the exchange", 2,
exchangeGrouped.size());
}
}
diff --git
a/components/camel-google-pubsub/src/test/java/org/apache/camel/component/google/pubsub/integration/PubsubConnectionFactoryTest.java
b/components/camel-google-pubsub/src/test/java/org/apache/camel/component/google/pubsub/integration/PubsubConnectionFactoryTest.java
deleted file mode 100644
index 164384b..0000000
---
a/components/camel-google-pubsub/src/test/java/org/apache/camel/component/google/pubsub/integration/PubsubConnectionFactoryTest.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.google.pubsub.integration;
-
-import java.io.File;
-
-import com.google.api.services.pubsub.Pubsub;
-import org.apache.camel.component.google.pubsub.GooglePubsubConnectionFactory;
-import org.apache.camel.component.google.pubsub.PubsubTestSupport;
-import org.junit.Test;
-
-public class PubsubConnectionFactoryTest extends PubsubTestSupport {
-
- /**
- * Testing Credentials File only, the explicitly set Service Account and
Key
- * are tested everywhere else. A section of the test is disabled by default
- * as it relies on - a valid credentials file - a valid project and
- * therefore can not be tested with the PubSub Emulator Defaults Option is
- * not tested.
- *
- * @throws Exception
- */
- @Test
- public void testCredentialsFile() throws Exception {
-
- ClassLoader classLoader = getClass().getClassLoader();
- File file = new
File(classLoader.getResource("camel-pubsub-component.json").getFile());
-
- GooglePubsubConnectionFactory cf = new
GooglePubsubConnectionFactory().setCredentialsFileLocation(file.getAbsolutePath()).setServiceURL(SERVICE_URL);
-
- Pubsub pubsub = cf.getDefaultClient();
-
- String query = String.format("projects/%s", PROJECT_ID);
- // [ DEPENDS on actual project being available]
- /*
- * pubsub.projects() .topics() .list(query) .execute();
- */
- }
-}
diff --git
a/components/camel-google-pubsub/src/test/java/org/apache/camel/component/google/pubsub/integration/SingleExchangeRoundtripTest.java
b/components/camel-google-pubsub/src/test/java/org/apache/camel/component/google/pubsub/integration/SingleExchangeRoundtripTest.java
index 8e9586a..d3de77c 100644
---
a/components/camel-google-pubsub/src/test/java/org/apache/camel/component/google/pubsub/integration/SingleExchangeRoundtripTest.java
+++
b/components/camel-google-pubsub/src/test/java/org/apache/camel/component/google/pubsub/integration/SingleExchangeRoundtripTest.java
@@ -47,7 +47,7 @@ public class SingleExchangeRoundtripTest extends
PubsubTestSupport {
@EndpointInject("mock:sendResult")
private MockEndpoint sendResult;
- @EndpointInject("google-pubsub:{{project.id}}:" + SUBSCRIPTION_NAME)
+ @EndpointInject("google-pubsub:{{project.id}}:" + SUBSCRIPTION_NAME +
"?synchronousPull=true")
private Endpoint pubsubSubscription;
@EndpointInject("mock:receiveResult")
@@ -56,13 +56,13 @@ public class SingleExchangeRoundtripTest extends
PubsubTestSupport {
@Produce("direct:from")
private ProducerTemplate producer;
- @BeforeClass
- public static void createTopicSubscription() throws Exception {
+ @Override
+ public void createTopicSubscription() {
createTopicSubscriptionPair(TOPIC_NAME, SUBSCRIPTION_NAME);
}
@Override
- protected RouteBuilder createRouteBuilder() throws Exception {
+ protected RouteBuilder createRouteBuilder() {
return new RouteBuilder() {
public void configure() {
from(directIn).routeId("Single_Send").to(pubsubTopic).to(sendResult);
@@ -109,7 +109,6 @@ public class SingleExchangeRoundtripTest extends
PubsubTestSupport {
Exchange receivedExchange = receivedExchanges.get(0);
assertNotNull("PUBSUB Message ID Property",
receivedExchange.getIn().getHeader(GooglePubsubConstants.MESSAGE_ID));
- assertNotNull("PUBSUB Ack ID Property",
receivedExchange.getIn().getHeader(GooglePubsubConstants.ACK_ID));
assertNotNull("PUBSUB Published Time",
receivedExchange.getIn().getHeader(GooglePubsubConstants.PUBLISH_TIME));
assertEquals("PUBSUB Header Attribute", attributeValue,
((Map)receivedExchange.getIn().getHeader(GooglePubsubConstants.ATTRIBUTES)).get(attributeKey));
diff --git
a/components/camel-google-pubsub/src/test/java/org/apache/camel/component/google/pubsub/unit/PubsubComponentTest.java
b/components/camel-google-pubsub/src/test/java/org/apache/camel/component/google/pubsub/unit/PubsubComponentTest.java
deleted file mode 100644
index fa63642..0000000
---
a/components/camel-google-pubsub/src/test/java/org/apache/camel/component/google/pubsub/unit/PubsubComponentTest.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.google.pubsub.unit;
-
-import org.apache.camel.Component;
-import org.apache.camel.component.google.pubsub.GooglePubsubComponent;
-import org.apache.camel.component.google.pubsub.PubsubTestSupport;
-import org.junit.Test;
-
-public class PubsubComponentTest extends PubsubTestSupport {
-
- @Test
- public void testComponentConfiguration() throws Exception {
-
- Component contextComponent = context.hasComponent("google-pubsub");
-
- assertNotNull(contextComponent);
- assertTrue(contextComponent instanceof GooglePubsubComponent);
-
- GooglePubsubComponent pubsubComponent =
(GooglePubsubComponent)contextComponent;
-
- assertEquals(SERVICE_ACCOUNT,
pubsubComponent.getConnectionFactory().getServiceAccount());
-
- assertEquals(SERVICE_KEY,
pubsubComponent.getConnectionFactory().getServiceAccountKey());
- }
-
-}
diff --git
a/components/camel-google-pubsub/src/test/java/org/apache/camel/component/google/pubsub/unit/PubsubEndpointTest.java
b/components/camel-google-pubsub/src/test/java/org/apache/camel/component/google/pubsub/unit/PubsubEndpointTest.java
index 64b1418..7603998 100644
---
a/components/camel-google-pubsub/src/test/java/org/apache/camel/component/google/pubsub/unit/PubsubEndpointTest.java
+++
b/components/camel-google-pubsub/src/test/java/org/apache/camel/component/google/pubsub/unit/PubsubEndpointTest.java
@@ -31,7 +31,7 @@ public class PubsubEndpointTest extends PubsubTestSupport {
// For testing purposes the URI params need to be aligned in alphabetical
// order
- private static final String SUBSCRIPTION_URI = TEST_SUBSCRIPTION_NAME +
"?ackMode=NONE" + "&concurrentConsumers=5" + "&maxMessagesPerPoll=2";
+ private static final String SUBSCRIPTION_URI = TEST_SUBSCRIPTION_NAME +
"?ackMode=NONE" + "&concurrentConsumers=5";
@EndpointInject("google-pubsub://{{project.id}}:" + SUBSCRIPTION_URI)
private Endpoint from;
@@ -54,8 +54,7 @@ public class PubsubEndpointTest extends PubsubTestSupport {
assertEquals(PROJECT_ID, pubsubEndpoint.getProjectId());
assertEquals(TEST_SUBSCRIPTION_NAME,
pubsubEndpoint.getDestinationName());
- assertEquals(new Integer(5), pubsubEndpoint.getConcurrentConsumers());
- assertEquals(new Integer(2), pubsubEndpoint.getMaxMessagesPerPoll());
+ assertEquals(Integer.valueOf(5),
pubsubEndpoint.getConcurrentConsumers());
assertEquals(GooglePubsubConstants.AckMode.NONE,
pubsubEndpoint.getAckMode());
}
diff --git
a/components/camel-google-pubsub/src/test/resources/simple.properties
b/components/camel-google-pubsub/src/test/resources/simple.properties
index 9e18baa..e9402b4 100644
--- a/components/camel-google-pubsub/src/test/resources/simple.properties
+++ b/components/camel-google-pubsub/src/test/resources/simple.properties
@@ -17,10 +17,4 @@
project.id=test-project
topic.name=test-topic
-subscription.name=test-topic-subscription
-# Set to empty to test against GCP PubSub
-test.serviceURL=http://localhost:8383
-# Service Account Name and Key do not belong to any project
-# Have been generated for PubSub Emulator
-service.account=test-acco...@camel-pubsub-component.iam.gserviceaccount.com
-service.key=-----BEGIN PRIVATE
KEY-----\nMIIEvQIBADANBgkqhkiG9w0BAQEFAASCBKcwggSjAgEAAoIBAQCfCiEwLed3hJ+h\n3zkpsGZj+MEB8MbpbqdUsiAp+Ok05zchGHM8iEG5s4gh013CI0rnta4zYDTrB98p\nBD+BX0TFP4S1QecSK0RoaJ8OmLYgYN56olobbedPRRdZIwopvQ7wSIqrEwWtez6Y\nRXcQzykYzETDEc2s0JyJU9BI2ZAENPbMheZICUkLHJdX0FqVf5WTtRDXnyL79CiW\nRirqN+eJdhq46Dz/TlEymuMePZVWAdcx0v8xv102H9bqFWtJvin8pD6fIT6f2iL1\ne/lQjNUVvX7Sx2EuLWZlPo+mWNvRCTXZymTcluj0jleAYhjuMc2xVEDx2RaCt2sx\nCo2Nb0edAgMBAAECggEAIFEJn2WkhCfB3D2kuvDqTWQtq/xGHwbqH46
[...]
+subscription.name=test-topic-subscription
\ No newline at end of file
diff --git
a/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/GooglePubsubComponentBuilderFactory.java
b/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/GooglePubsubComponentBuilderFactory.java
index 8af7fd0..144cad2 100644
---
a/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/GooglePubsubComponentBuilderFactory.java
+++
b/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/GooglePubsubComponentBuilderFactory.java
@@ -49,18 +49,14 @@ public interface GooglePubsubComponentBuilderFactory {
extends
ComponentBuilder<GooglePubsubComponent> {
/**
- * Sets the connection factory to use: provides the ability to
- * explicitly manage connection credentials: - the path to the key file
- * - the Service Account Key / Email pair.
+ * Endpoint to use with local Pub/Sub emulator.
*
- * The option is a:
- *
<code>org.apache.camel.component.google.pubsub.GooglePubsubConnectionFactory</code>
type.
+ * The option is a: <code>java.lang.String</code> type.
*
* Group: common
*/
- default GooglePubsubComponentBuilder connectionFactory(
-
org.apache.camel.component.google.pubsub.GooglePubsubConnectionFactory
connectionFactory) {
- doSetProperty("connectionFactory", connectionFactory);
+ default GooglePubsubComponentBuilder endpoint(java.lang.String
endpoint) {
+ doSetProperty("endpoint", endpoint);
return this;
}
/**
@@ -104,6 +100,31 @@ public interface GooglePubsubComponentBuilderFactory {
return this;
}
/**
+ * Maximum number of producers to cache. This could be increased if you
+ * have producers for lots of different topics.
+ *
+ * The option is a: <code>int</code> type.
+ *
+ * Group: producer
+ */
+ default GooglePubsubComponentBuilder publisherCacheSize(
+ int publisherCacheSize) {
+ doSetProperty("publisherCacheSize", publisherCacheSize);
+ return this;
+ }
+ /**
+ * How many milliseconds should each producer stay alive in the cache.
+ *
+ * The option is a: <code>int</code> type.
+ *
+ * Group: producer
+ */
+ default GooglePubsubComponentBuilder publisherCacheTimeout(
+ int publisherCacheTimeout) {
+ doSetProperty("publisherCacheTimeout", publisherCacheTimeout);
+ return this;
+ }
+ /**
* Whether the component should use basic property binding (Camel 2.x)
* or the newer property binding with additional capabilities.
*
@@ -117,6 +138,18 @@ public interface GooglePubsubComponentBuilderFactory {
doSetProperty("basicPropertyBinding", basicPropertyBinding);
return this;
}
+ /**
+ * How many milliseconds should a producer be allowed to terminate.
+ *
+ * The option is a: <code>int</code> type.
+ *
+ * Group: advanced
+ */
+ default GooglePubsubComponentBuilder publisherTerminationTimeout(
+ int publisherTerminationTimeout) {
+ doSetProperty("publisherTerminationTimeout",
publisherTerminationTimeout);
+ return this;
+ }
}
class GooglePubsubComponentBuilderImpl
@@ -134,10 +167,13 @@ public interface GooglePubsubComponentBuilderFactory {
String name,
Object value) {
switch (name) {
- case "connectionFactory": ((GooglePubsubComponent)
component).setConnectionFactory((org.apache.camel.component.google.pubsub.GooglePubsubConnectionFactory)
value); return true;
+ case "endpoint": ((GooglePubsubComponent)
component).setEndpoint((java.lang.String) value); return true;
case "bridgeErrorHandler": ((GooglePubsubComponent)
component).setBridgeErrorHandler((boolean) value); return true;
case "lazyStartProducer": ((GooglePubsubComponent)
component).setLazyStartProducer((boolean) value); return true;
+ case "publisherCacheSize": ((GooglePubsubComponent)
component).setPublisherCacheSize((int) value); return true;
+ case "publisherCacheTimeout": ((GooglePubsubComponent)
component).setPublisherCacheTimeout((int) value); return true;
case "basicPropertyBinding": ((GooglePubsubComponent)
component).setBasicPropertyBinding((boolean) value); return true;
+ case "publisherTerminationTimeout": ((GooglePubsubComponent)
component).setPublisherTerminationTimeout((int) value); return true;
default: return false;
}
}
diff --git a/docs/components/modules/ROOT/pages/google-pubsub-component.adoc
b/docs/components/modules/ROOT/pages/google-pubsub-component.adoc
index 49c3d6b..ade4a04 100644
--- a/docs/components/modules/ROOT/pages/google-pubsub-component.adoc
+++ b/docs/components/modules/ROOT/pages/google-pubsub-component.adoc
@@ -10,9 +10,7 @@
The Google Pubsub component provides access
to https://cloud.google.com/pubsub/[Cloud Pub/Sub Infrastructure] via
-the https://cloud.google.com/apis/docs/client-libraries-explained[Google
Client Services API].
-
-The current implementation does not use gRPC.
+the https://github.com/googleapis/java-pubsub[Google Cloud Java Client for
Google Cloud Pub/Sub].
Maven users will need to add the following dependency to their pom.xml
for this component:
@@ -40,16 +38,18 @@ Destination Name can be either a topic or a subscription
name.
== Options
// component options: START
-The Google Pubsub component supports 4 options, which are listed below.
+The Google Pubsub component supports 6 options, which are listed below.
[width="100%",cols="2,5,^1,2",options="header"]
|===
| Name | Description | Default | Type
-| *connectionFactory* (common) | Sets the connection factory to use: provides
the ability to explicitly manage connection credentials: - the path to the key
file - the Service Account Key / Email pair | | GooglePubsubConnectionFactory
| *bridgeErrorHandler* (consumer) | Allows for bridging the consumer to the
Camel routing Error Handler, which mean any exceptions occurred while the
consumer is trying to pickup incoming messages, or the likes, will now be
processed as a message and handled by the routing Error Handler. By default the
consumer will use the org.apache.camel.spi.ExceptionHandler to deal with
exceptions, that will be logged at WARN or ERROR level and ignored. | false |
boolean
| *lazyStartProducer* (producer) | Whether the producer should be started lazy
(on the first message). By starting lazy you can use this to allow CamelContext
and routes to startup in situations where a producer may otherwise fail during
starting and cause the route to fail being started. By deferring this startup
to be lazy then the startup failure can be handled during routing messages via
Camel's routing error handlers. Beware that when the first message is processed
then creating and [...]
+| *publisherCacheSize* (producer) | Maximum number of producers to cache. This
could be increased if you have producers for lots of different topics. | | int
+| *publisherCacheTimeout* (producer) | How many milliseconds should each
producer stay alive in the cache. | | int
+| *publisherTerminationTimeout* (producer) | How many milliseconds should a
producer be allowed to terminate. | | int
| *basicPropertyBinding* (advanced) | Whether the component should use basic
property binding (Camel 2.x) or the newer property binding with additional
capabilities | false | boolean
|===
// component options: END
@@ -74,7 +74,7 @@ with the following path and query parameters:
|===
-=== Query Parameters (11 parameters):
+=== Query Parameters (9 parameters):
[width="100%",cols="2,5,^1,2",options="header"]
@@ -82,9 +82,7 @@ with the following path and query parameters:
| Name | Description | Default | Type
| *ackMode* (common) | AUTO = exchange gets ack'ed/nack'ed on completion. NONE
= downstream process has to ack/nack explicitly. The value can be one of: AUTO,
NONE | AUTO | AckMode
| *concurrentConsumers* (common) | The number of parallel streams consuming
from the subscription | 1 | Integer
-| *connectionFactory* (common) | ConnectionFactory to obtain connection to
PubSub Service. If non provided the default one will be used | |
GooglePubsubConnectionFactory
| *loggerId* (common) | Logger ID to use when a match to the parent route
required | | String
-| *maxMessagesPerPoll* (common) | The max number of messages to receive from
the server in a single API call | 1 | Integer
| *bridgeErrorHandler* (consumer) | Allows for bridging the consumer to the
Camel routing Error Handler, which mean any exceptions occurred while the
consumer is trying to pickup incoming messages, or the likes, will now be
processed as a message and handled by the routing Error Handler. By default the
consumer will use the org.apache.camel.spi.ExceptionHandler to deal with
exceptions, that will be logged at WARN or ERROR level and ignored. | false |
boolean
| *exceptionHandler* (consumer) | To let the consumer use a custom
ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this
option is not in use. By default the consumer will deal with exceptions, that
will be logged at WARN or ERROR level and ignored. | | ExceptionHandler
| *exchangePattern* (consumer) | Sets the exchange pattern when the consumer
creates an exchange. The value can be one of: InOnly, InOut, InOptionalOut | |
ExchangePattern
@@ -93,6 +91,34 @@ with the following path and query parameters:
| *synchronous* (advanced) | Sets whether synchronous processing should be
strictly used, or Camel is allowed to use asynchronous processing (if
supported). | false | boolean
|===
// endpoint options: END
+// spring-boot-auto-configure options: START
+== Spring Boot Auto-Configuration
+
+When using Spring Boot make sure to use the following Maven dependency to have
support for auto configuration:
+
+[source,xml]
+----
+<dependency>
+ <groupId>org.apache.camel.springboot</groupId>
+ <artifactId>camel-google-pubsub-starter</artifactId>
+ <version>x.x.x</version>
+ <!-- use the same version as your Camel core version -->
+</dependency>
+----
+
+
+The component supports 8 options, which are listed below.
+
+
+
+[width="100%",cols="2,5,^1,2",options="header"]
+|===
+| Name | Description | Default | Type
+| *camel.component.google-pubsub.basic-property-binding* | Whether the
component should use basic property binding (Camel 2.x) or the newer property
binding with additional capabilities | false | Boolean
+| *camel.component.google-pubsub.bridge-error-handler* | Allows for bridging
the consumer to the Camel routing Error Handler, which mean any exceptions
occurred while the consumer is trying to pickup incoming messages, or the
likes, will now be processed as a message and handled by the routing Error
Handler. By default the consumer will use the
org.apache.camel.spi.ExceptionHandler to deal with exceptions, that will be
logged at WARN or ERROR level and ignored. | false | Boolean
+| *camel.component.google-pubsub.enabled* | Whether to enable auto
configuration of the google-pubsub component. This is enabled by default. | |
Boolean
+|===
+// spring-boot-auto-configure options: END
== Producer Endpoints
@@ -129,7 +155,6 @@ Headers set by the consumer endpoints:
* GooglePubsubConstants.MESSAGE_ID
* GooglePubsubConstants.ATTRIBUTES
* GooglePubsubConstants.PUBLISH_TIME
-* GooglePubsubConstants.ACK_ID
== Message Body
@@ -138,22 +163,7 @@ It is up for the route to convert/unmarshall the contents.
== Authentication Configuration
-Google Pubsub component authentication is targeted for use with the GCP
Service Accounts.
-For more information please refer to
https://cloud.google.com/docs/authentication[Google Cloud Platform Auth Guide]
-
-Google security credentials can be set explicitly via one of the two options:
-
-* Service Account Email and Service Account Key (PEM format)
-* GCP credentials file location
-
-If both are set, the Service Account Email/Key will take precedence.
-
-Or implicitly, where the connection factory falls back on
-https://developers.google.com/identity/protocols/application-default-credentials#howtheywork[Application
Default Credentials].
-
-*OBS!* The location of the default credentials file is configurable - via
GOOGLE_APPLICATION_CREDENTIALS environment variable.
-
-Service Account Email and Service Account Key can be found in the GCP JSON
credentials file as client_email and private_key respectively.
+The location of the default credentials file is configurable - via
GOOGLE_APPLICATION_CREDENTIALS environment variable.
== Rollback and Redelivery
diff --git a/parent/pom.xml b/parent/pom.xml
index 37358855..bf6d31f 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -46,6 +46,8 @@
<activemq-artemis-version>2.11.0</activemq-artemis-version>
<aether-version>1.0.2.v20150114</aether-version>
<ahc-version>2.10.5</ahc-version>
+ <animal-sniffer-version>1.17</animal-sniffer-version>
+ <android-annotations-version>4.1.1.4</android-annotations-version>
<ant-bundle-version>1.7.0_6</ant-bundle-version>
<antlr-bundle-version>3.5.2_1</antlr-bundle-version>
<antlr-runtime-bundle-version>3.5.2_1</antlr-runtime-bundle-version>
@@ -145,6 +147,7 @@
<commons-pool2-version>2.8.0</commons-pool2-version>
<commons-text-version>1.8</commons-text-version>
<compress-lzf-version>1.0.4</compress-lzf-version>
+ <conscrypt-uber-version>2.2.1</conscrypt-uber-version>
<consul-client-version>1.3.3</consul-client-version>
<consul-client-bundle-version>1.3.3_1</consul-client-bundle-version>
<cobertura-maven-plugin-version>2.7</cobertura-maven-plugin-version>
@@ -230,15 +233,27 @@
<geronimo-ws-metadata-spec-version>1.1.3</geronimo-ws-metadata-spec-version>
<gmetric4j-version>1.0.10</gmetric4j-version>
<google-guava-version>19.0</google-guava-version>
+ <google-pubsub-guava-version>28.1-jre</google-pubsub-guava-version>
<google-api-client-version>1.22.0</google-api-client-version>
+ <google-api-common-version>1.8.1</google-api-common-version>
<google-api-services-drive-version>v2-rev297-1.22.0</google-api-services-drive-version>
+ <google-auto-value-version>1.7</google-auto-value-version>
<google-api-services-calendar-version>v3-rev291-1.22.0</google-api-services-calendar-version>
<google-api-services-sheets-version>v4-rev551-1.22.0</google-api-services-sheets-version>
<google-api-services-mail-version>v1-rev81-1.22.0</google-api-services-mail-version>
<google-api-services-bigquery-version>v2-rev352-1.22.0</google-api-services-bigquery-version>
<google-api-services-pubsub-version>v1-rev12-1.22.0</google-api-services-pubsub-version>
+ <google-cloud-pubsub-version>1.102.0</google-cloud-pubsub-version>
+ <google-errorprone-version>2.3.3</google-errorprone-version>
+ <google-gax-version>1.50.1</google-gax-version>
+ <google-http-client-version>1.33.0</google-http-client-version>
+ <google-http-jackson2-version>1.32.1</google-http-jackson2-version>
+ <google-j2objc-version>1.3</google-j2objc-version>
<google-mail-guava-version>17.0</google-mail-guava-version>
<google-truth-version>0.30</google-truth-version>
+ <google-proto-common-version>1.17.0</google-proto-common-version>
+ <google-proto-pubsub-version>1.84.0</google-proto-pubsub-version>
+ <google-proto-iam-version>0.13.0</google-proto-iam-version>
<graphql-java-version>14.0</graphql-java-version>
<grizzly-websockets-version>2.3.25</grizzly-websockets-version>
<grpc-version>1.27.0</grpc-version>
@@ -499,7 +514,7 @@
<olingo4-version>4.7.1</olingo4-version>
<ognl-version>3.1.12</ognl-version>
<ognl-bundle-version>3.1.12_1</ognl-bundle-version>
- <opencensus-api-version>0.21.0</opencensus-api-version>
+ <opencensus-api-version>0.24.0</opencensus-api-version>
<oncrpc-version>1.1.3</oncrpc-version>
<openjpa-version>3.1.1</openjpa-version>
<openstack4j-version>3.0.2</openstack4j-version>
@@ -626,6 +641,7 @@
<tagsoup-version>1.2.1</tagsoup-version>
<templating-maven-plugin-version>1.0.0</templating-maven-plugin-version>
<testcontainers-version>1.12.5</testcontainers-version>
+ <threetenbp-version>1.4.0</threetenbp-version>
<tinybundles-version>2.1.1</tinybundles-version>
<tika-version>1.23</tika-version>
<twilio-version>7.9.1</twilio-version>
@@ -3313,7 +3329,7 @@
<artifactId>scala-library</artifactId>
<version>${scala-version}</version>
</dependency>
-
+
<dependency>
<groupId>org.yaml</groupId>
<artifactId>snakeyaml</artifactId>
diff --git a/platforms/karaf/features/src/main/resources/features.xml
b/platforms/karaf/features/src/main/resources/features.xml
index 8228cc8..f74ad8b 100644
--- a/platforms/karaf/features/src/main/resources/features.xml
+++ b/platforms/karaf/features/src/main/resources/features.xml
@@ -1127,18 +1127,49 @@
</feature>
<feature name='camel-google-pubsub' version='${project.version}'
start-level='50'>
<feature version='${project.version}'>camel-core</feature>
- <bundle
dependency='true'>wrap:mvn:com.google.apis/google-api-services-pubsub/${google-api-services-pubsub-version}</bundle>
- <bundle
dependency='true'>wrap:mvn:com.google.api-client/google-api-client/${google-api-client-version}</bundle>
- <bundle
dependency='true'>mvn:commons-codec/commons-codec/${commons-codec-version}</bundle>
- <bundle
dependency='true'>wrap:mvn:com.google.http-client/google-http-client/${google-api-client-version}</bundle>
- <bundle
dependency='true'>wrap:mvn:com.google.http-client/google-http-client-jackson2/${google-api-client-version}</bundle>
- <bundle
dependency='true'>wrap:mvn:com.google.oauth-client/google-oauth-client/${google-api-client-version}</bundle>
- <bundle
dependency='true'>wrap:mvn:com.google.oauth-client/google-oauth-client-java6/${google-api-client-version}</bundle>
- <bundle
dependency='true'>wrap:mvn:com.google.oauth-client/google-oauth-client-jetty/${google-api-client-version}</bundle>
+ <bundle
dependency='true'>wrap:mvn:com.google.cloud/google-cloud-pubsub/${google-cloud-pubsub-version}</bundle>
+ <bundle
dependency='true'>wrap:mvn:io.grpc/grpc-api/${grpc-version}</bundle>
+ <bundle
dependency='true'>wrap:mvn:io.grpc/grpc-context/${grpc-version}</bundle>
+ <bundle
dependency='true'>wrap:mvn:org.codehaus.mojo/animal-sniffer-annotations/${animal-sniffer-version}</bundle>
+ <bundle
dependency='true'>wrap:mvn:io.grpc/grpc-stub/${grpc-version}</bundle>
+ <bundle
dependency='true'>wrap:mvn:io.grpc/grpc-protobuf/${grpc-version}</bundle>
+ <bundle
dependency='true'>wrap:mvn:io.grpc/grpc-protobuf-lite/${grpc-version}</bundle>
+ <bundle
dependency='true'>wrap:mvn:com.google.api/api-common/${google-api-common-version}</bundle>
+ <bundle
dependency='true'>mvn:com.google.protobuf/protobuf-java/${protobuf-version}</bundle>
+ <bundle
dependency='true'>wrap:mvn:com.google.api.grpc/proto-google-common-protos/${google-proto-common-version}</bundle>
+ <bundle
dependency='true'>wrap:mvn:com.google.api.grpc/proto-google-cloud-pubsub-v1/${google-proto-pubsub-version}</bundle>
+ <bundle
dependency='true'>wrap:mvn:com.google.api.grpc/proto-google-iam-v1/${google-proto-iam-version}</bundle>
+ <bundle
dependency='true'>mvn:com.google.guava/guava/${google-pubsub-guava-version}</bundle>
+ <bundle
dependency='true'>wrap:mvn:com.google.api/gax/${google-gax-version}</bundle>
+ <bundle
dependency='true'>wrap:mvn:com.google.auth/google-auth-library-oauth2-http/${grpc-google-auth-library-version}</bundle>
+ <bundle
dependency='true'>wrap:mvn:com.google.http-client/google-http-client-jackson2/${google-http-jackson2-version}</bundle>
<bundle
dependency='true'>mvn:com.fasterxml.jackson.core/jackson-core/${jackson2-version}</bundle>
- <bundle
dependency='true'>mvn:org.apache.httpcomponents/httpcore-osgi/${httpcore4-version}</bundle>
+ <bundle
dependency='true'>wrap:mvn:com.google.api/gax-grpc/${google-gax-version}</bundle>
+ <bundle
dependency='true'>wrap:mvn:io.grpc/grpc-auth/${grpc-version}</bundle>
+ <bundle
dependency='true'>wrap:mvn:com.google.auth/google-auth-library-credentials/${grpc-google-auth-library-version}</bundle>
+ <bundle
dependency='true'>wrap:mvn:io.grpc/grpc-netty-shaded/${grpc-version}</bundle>
+ <bundle
dependency='true'>wrap:mvn:io.grpc/grpc-alts/${grpc-version}</bundle>
+ <bundle
dependency='true'>wrap:mvn:io.grpc/grpc-grpclb/${grpc-version}</bundle>
+ <bundle
dependency='true'>mvn:com.google.protobuf/protobuf-java-util/${protobuf-version}</bundle>
+ <bundle
dependency='true'>mvn:org.apache.commons/commons-lang3/${commons-lang3-version}</bundle>
+ <bundle
dependency='true'>wrap:mvn:org.conscrypt/conscrypt-openjdk-uber/${conscrypt-uber-version}</bundle>
+ <bundle
dependency='true'>wrap:mvn:org.threeten/threetenbp/${threetenbp-version}</bundle>
+ <bundle
dependency='true'>wrap:mvn:com.google.code.findbugs/jsr305/${google-findbugs-jsr305-version}</bundle>
+ <bundle
dependency='true'>wrap:mvn:io.opencensus/opencensus-api/${opencensus-api-version}</bundle>
+ <bundle
dependency='true'>wrap:mvn:io.grpc/grpc-core/${grpc-version}</bundle>
+ <bundle
dependency='true'>wrap:mvn:com.google.code.gson/gson/${gson-version}</bundle>
+ <bundle
dependency='true'>wrap:mvn:com.google.android/annotations/${android-annotations-version}</bundle>
+ <bundle
dependency='true'>wrap:mvn:io.perfmark/perfmark-api/${perfmark-version}</bundle>
+ <bundle
dependency='true'>wrap:mvn:io.opencensus/opencensus-contrib-grpc-metrics/${opencensus-api-version}</bundle>
+ <bundle
dependency='true'>wrap:mvn:com.google.errorprone/error_prone_annotations/${google-errorprone-version}</bundle>
+ <bundle
dependency='true'>wrap:mvn:com.google.auto.value/auto-value-annotations/${google-auto-value-version}</bundle>
+ <bundle
dependency='true'>wrap:mvn:com.google.http-client/google-http-client/${google-http-client-version}</bundle>
<bundle
dependency='true'>mvn:org.apache.httpcomponents/httpclient-osgi/${httpclient4-version}</bundle>
- <bundle
dependency='true'>mvn:javax.servlet/javax.servlet-api/${javax-servlet-api-version}</bundle>
+ <bundle
dependency='true'>mvn:commons-logging/commons-logging/${commons-logging-version}</bundle>
+ <bundle
dependency='true'>wrap:mvn:commons-codec/commons-codec/${commons-codec-version}</bundle>
+ <bundle
dependency='true'>mvn:org.apache.httpcomponents/httpcore-osgi/${httpcore4-version}</bundle>
+ <bundle
dependency='true'>wrap:mvn:com.google.j2objc/j2objc-annotations/${google-j2objc-version}</bundle>
+ <bundle
dependency='true'>wrap:mvn:io.opencensus/opencensus-contrib-http-util/${opencensus-api-version}</bundle>
<bundle>mvn:org.apache.camel/camel-google-pubsub/${project.version}</bundle>
</feature>
<feature name='camel-grape' version='${project.version}' start-level='50'>