Repository: camel
Updated Branches:
  refs/heads/master 7561e4766 -> 2310011d1


CAMEL-10486: The consumer threading/message undelelivered issue fix


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/dc2138a9
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/dc2138a9
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/dc2138a9

Branch: refs/heads/master
Commit: dc2138a9d39f15ec4a7c657edb4bd608115fcbf8
Parents: 7561e47
Author: Evgeny Minkevich <evgeny.minkev...@gmail.com>
Authored: Thu Nov 17 14:50:55 2016 +1100
Committer: Andrea Cosentino <anco...@gmail.com>
Committed: Thu Dec 1 08:21:45 2016 +0100

----------------------------------------------------------------------
 .../src/main/docs/google-pubsub-component.adoc  | 23 +++++++++-
 .../pubsub/GooglePubsubConnectionFactory.java   | 38 ++++++++++------
 .../google/pubsub/GooglePubsubConstants.java    |  1 +
 .../google/pubsub/GooglePubsubConsumer.java     | 46 +++++++++++---------
 .../google/pubsub/GooglePubsubEndpoint.java     | 22 ++++++----
 .../pubsub/consumer/ExchangeAckTransaction.java | 22 +++++++++-
 .../pubsub/consumer/PubsubAcknowledgement.java  |  8 ++--
 .../google/pubsub/PubsubTestSupport.java        |  2 +-
 .../PubsubConnectionFactoryTest.java            |  2 +-
 9 files changed, 113 insertions(+), 51 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/dc2138a9/components/camel-google-pubsub/src/main/docs/google-pubsub-component.adoc
----------------------------------------------------------------------
diff --git 
a/components/camel-google-pubsub/src/main/docs/google-pubsub-component.adoc 
b/components/camel-google-pubsub/src/main/docs/google-pubsub-component.adoc
index 38fefef..0eb947f 100644
--- a/components/camel-google-pubsub/src/main/docs/google-pubsub-component.adoc
+++ b/components/camel-google-pubsub/src/main/docs/google-pubsub-component.adoc
@@ -87,7 +87,7 @@ The Google Pubsub component supports 11 endpoint options 
which are listed below:
 | destinationName | common |  | String | *Required* Destination Name
 | ackMode | common | AUTO | AckMode | AUTO = exchange gets ack'ed/nack'ed on 
completion. NONE = downstream process has to ack/nack explicitly
 | concurrentConsumers | common | 1 | Integer | The number of parallel streams 
consuming from the subscription
-| connectionFactory | common |  | GooglePubsubConnectionFactory | 
ConnectionFactory to obtain connection to PubSub Service. If non provided the 
default one will be used
+| connectionFactory | common |  | GooglePubsubConnectionFactory | 
ConnectionFactory to obtain connection to PubSub Service. If non provided the 
default will be used.
 | loggerId | common |  | String | Logger ID to use when a match to the parent 
route required
 | maxMessagesPerPoll | common | 1 | Integer | The max number of messages to 
receive from the server in a single API call
 | bridgeErrorHandler | consumer | false | boolean | Allows for bridging the 
consumer to the Camel routing Error Handler which mean any exceptions occurred 
while the consumer is trying to pickup incoming messages or the likes will now 
be processed as a message and handled by the routing Error Handler. By default 
the consumer will use the org.apache.camel.spi.ExceptionHandler to deal with 
exceptions that will be logged at WARN/ERROR level and ignored.
@@ -146,3 +146,24 @@ Message Body
 
 The consumer endpoint returns the content of the message as byte[] - exactly 
as the underlying system sends it.
 It is up for the route to convert/unmarshall the contents.
+
+[[GooglePubsub-RollbackRedelivery]]
+Rollback and Redelivery
+^^^^^^^^^^^^
+
+The rollback for Google PubSub relies on the idea of the Acknowledgement 
Deadline - the time period where Google PubSub expects to receive the 
acknowledgement.
+If the acknowledgement has not been received, the message is redelivered.
+
+Google provides an API to extend the deadline for a message.
+
+More information in 
https://cloud.google.com/pubsub/docs/subscriber#ack_deadline[Google PubSub 
Documentation]
+
+So, rollback is effectively a deadline extension API call with zero value - 
i.e. deadline is reached now and message can
+be redelivered to the next consumer.
+
+It is possible to delay the message redelivery by setting the acknowledgement 
deadline explicitly for the rollback by
+setting the message header
+
+* GooglePubsubConstants.ACK_DEADLINE
+
+to the value in seconds.

http://git-wip-us.apache.org/repos/asf/camel/blob/dc2138a9/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConnectionFactory.java
----------------------------------------------------------------------
diff --git 
a/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConnectionFactory.java
 
b/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConnectionFactory.java
index 104bd63..59bdb8b 100644
--- 
a/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConnectionFactory.java
+++ 
b/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConnectionFactory.java
@@ -26,6 +26,7 @@ import java.util.Collections;
 import com.google.api.client.googleapis.auth.oauth2.GoogleCredential;
 import com.google.api.client.googleapis.javanet.GoogleNetHttpTransport;
 import com.google.api.client.http.HttpTransport;
+import com.google.api.client.http.apache.ApacheHttpTransport;
 import com.google.api.client.json.JsonFactory;
 import com.google.api.client.json.jackson2.JacksonFactory;
 import com.google.api.client.util.Base64;
@@ -33,13 +34,15 @@ import com.google.api.client.util.Strings;
 import com.google.api.services.pubsub.Pubsub;
 import com.google.api.services.pubsub.PubsubScopes;
 
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class GooglePubsubConnectionFactory {
 
-    private static JsonFactory jsonFactory;
-    private static HttpTransport transport;
+    private static JsonFactory jsonFactory = new JacksonFactory();
 
     private final Logger logger = 
LoggerFactory.getLogger(GooglePubsubConnectionFactory.class);
 
@@ -51,23 +54,30 @@ public class GooglePubsubConnectionFactory {
     private Pubsub client;
 
     public GooglePubsubConnectionFactory() {
-        jsonFactory = new JacksonFactory();
-
-        try {
-            transport = GoogleNetHttpTransport.newTrustedTransport();
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
     }
 
-    public synchronized Pubsub getClient() throws Exception {
+    public synchronized Pubsub getDefaultClient() throws Exception {
         if (this.client == null) {
             this.client = buildClient();
         }
         return this.client;
     }
 
+    public Pubsub getMultiThreadClient(int parallelThreads) throws Exception {
+
+        PoolingHttpClientConnectionManager cm=new 
PoolingHttpClientConnectionManager();
+        cm.setDefaultMaxPerRoute(parallelThreads);
+        cm.setMaxTotal(parallelThreads);
+        CloseableHttpClient httpClient = HttpClients.createMinimal(cm);
+
+        return buildClient(new ApacheHttpTransport(httpClient));
+    }
+
     private Pubsub buildClient() throws Exception {
+        return buildClient(GoogleNetHttpTransport.newTrustedTransport());
+    };
+
+    private Pubsub buildClient(HttpTransport httpTransport) throws Exception {
 
         GoogleCredential credential = null;
 
@@ -75,7 +85,7 @@ public class GooglePubsubConnectionFactory {
             if (logger.isDebugEnabled()) {
                 logger.debug("Service Account and Key have been set 
explicitly. Initialising PubSub using Service Account " + serviceAccount);
             }
-            credential = createFromAccountKeyPair();
+            credential = createFromAccountKeyPair(httpTransport);
         }
 
         if (credential == null && 
!Strings.isNullOrEmpty(credentialsFileLocation)) {
@@ -92,7 +102,7 @@ public class GooglePubsubConnectionFactory {
             credential = createDefault();
         }
 
-        Pubsub.Builder builder = new Pubsub.Builder(transport, jsonFactory, 
credential)
+        Pubsub.Builder builder = new Pubsub.Builder(httpTransport, 
jsonFactory, credential)
                 .setApplicationName("camel-google-pubsub");
 
         // Local emulator, SOCKS proxy, etc.
@@ -126,10 +136,10 @@ public class GooglePubsubConnectionFactory {
         return credential;
     }
 
-    private GoogleCredential createFromAccountKeyPair() {
+    private GoogleCredential createFromAccountKeyPair(HttpTransport 
httpTransport) {
         try {
             GoogleCredential credential = new GoogleCredential.Builder()
-                    .setTransport(transport)
+                    .setTransport(httpTransport)
                     .setJsonFactory(jsonFactory)
                     .setServiceAccountId(serviceAccount)
                     .setServiceAccountScopes(PubsubScopes.all())

http://git-wip-us.apache.org/repos/asf/camel/blob/dc2138a9/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConstants.java
----------------------------------------------------------------------
diff --git 
a/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConstants.java
 
b/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConstants.java
index 669a521..f3e8785 100644
--- 
a/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConstants.java
+++ 
b/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConstants.java
@@ -22,6 +22,7 @@ public final class GooglePubsubConstants {
     public static final String ACK_ID = "CamelGooglePubsub.MsgAckId";
     public static final String PUBLISH_TIME = "CamelGooglePubsub.PublishTime";
     public static final String ATTRIBUTES = "CamelGooglePubsub.Attributes";
+    public static final String ACK_DEADLINE = "CamelGooglePubsub.AckDeadline";
 
     public enum AckMode {
         AUTO, NONE

http://git-wip-us.apache.org/repos/asf/camel/blob/dc2138a9/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConsumer.java
----------------------------------------------------------------------
diff --git 
a/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConsumer.java
 
b/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConsumer.java
index 24390ae..dffc63c 100644
--- 
a/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConsumer.java
+++ 
b/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConsumer.java
@@ -6,7 +6,7 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -21,6 +21,7 @@ import java.util.List;
 import java.util.concurrent.ExecutorService;
 
 import com.google.api.client.repackaged.com.google.common.base.Strings;
+import com.google.api.services.pubsub.Pubsub;
 import com.google.api.services.pubsub.model.PubsubMessage;
 import com.google.api.services.pubsub.model.PullRequest;
 import com.google.api.services.pubsub.model.PullResponse;
@@ -40,17 +41,21 @@ class GooglePubsubConsumer extends DefaultConsumer {
 
     private Logger localLog;
 
-    private ExecutorService executor;
     private final GooglePubsubEndpoint endpoint;
     private final Processor processor;
     private final Synchronization ackStrategy;
 
-    GooglePubsubConsumer(GooglePubsubEndpoint endpoint, Processor processor) {
+    private ExecutorService executor;
+    private Pubsub pubsub;
+
+    GooglePubsubConsumer(GooglePubsubEndpoint endpoint, Processor processor) 
throws Exception {
         super(endpoint, processor);
         this.endpoint = endpoint;
         this.processor = processor;
         this.ackStrategy = new ExchangeAckTransaction(this.endpoint);
 
+        pubsub = 
endpoint.getConnectionFactory().getMultiThreadClient(this.endpoint.getConcurrentConsumers());
+
         String loggerId = endpoint.getLoggerId();
 
         if (Strings.isNullOrEmpty(loggerId)) {
@@ -63,7 +68,7 @@ class GooglePubsubConsumer extends DefaultConsumer {
     @Override
     protected void doStart() throws Exception {
         super.doStart();
-        localLog.info("Starting Google PubSub consumer");
+        localLog.info("Starting Google PubSub consumer for {}/{}", 
endpoint.getProjectId(), endpoint.getDestinationName());
         executor = endpoint.createExecutor();
         for (int i = 0; i < endpoint.getConcurrentConsumers(); i++) {
 
@@ -75,7 +80,7 @@ class GooglePubsubConsumer extends DefaultConsumer {
     @Override
     protected void doStop() throws Exception {
         super.doStop();
-        localLog.info("Stopping Google PubSub consumer");
+        localLog.info("Stopping Google PubSub consumer for {}/{}", 
endpoint.getProjectId(), endpoint.getDestinationName());
 
         if (executor != null) {
             if (getEndpoint() != null && getEndpoint().getCamelContext() != 
null) {
@@ -101,24 +106,23 @@ class GooglePubsubConsumer extends DefaultConsumer {
 
         @Override
         public void run() {
-            try {
-                if (localLog.isDebugEnabled()) {
-                    localLog.debug("Subscribing {} to {}", threadId, 
subscriptionFullName);
-                }
+            if (localLog.isDebugEnabled()) {
+                localLog.debug("Subscribing {} to {}", threadId, 
subscriptionFullName);
+            }
 
-                while (isRunAllowed() && !isSuspendingOrSuspended()) {
+            while (isRunAllowed() && !isSuspendingOrSuspended()) {
+                try {
                     PullRequest pullRequest = new 
PullRequest().setMaxMessages(endpoint.getMaxMessagesPerPoll());
                     PullResponse pullResponse;
                     try {
                         if (localLog.isTraceEnabled()) {
                             localLog.trace("Polling : {}", threadId);
                         }
-                        pullResponse = GooglePubsubConsumer.this.endpoint
-                                               .getPubsub()
-                                               .projects()
-                                               .subscriptions()
-                                               .pull(subscriptionFullName, 
pullRequest)
-                                               .execute();
+                        pullResponse = GooglePubsubConsumer.this.pubsub
+                                .projects()
+                                .subscriptions()
+                                .pull(subscriptionFullName, pullRequest)
+                                .execute();
                     } catch (SocketTimeoutException ste) {
                         if (localLog.isTraceEnabled()) {
                             localLog.trace("Socket timeout : {}", threadId);
@@ -126,6 +130,10 @@ class GooglePubsubConsumer extends DefaultConsumer {
                         continue;
                     }
 
+                    if (null == pullResponse.getReceivedMessages()) {
+                        continue;
+                    }
+
                     List<ReceivedMessage> receivedMessages = 
pullResponse.getReceivedMessages();
 
                     for (ReceivedMessage receivedMessage : receivedMessages) {
@@ -158,11 +166,9 @@ class GooglePubsubConsumer extends DefaultConsumer {
                             exchange.setException(e);
                         }
                     }
+                } catch (Exception e) {
+                    localLog.error("Failure getting messages from PubSub : ", 
e);
                 }
-            } catch (Exception e) {
-                localLog.error("Requesting messages from PubSub Failed:", e);
-                RuntimeCamelException rce = wrapRuntimeCamelException(e);
-                throw rce;
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/dc2138a9/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubEndpoint.java
----------------------------------------------------------------------
diff --git 
a/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubEndpoint.java
 
b/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubEndpoint.java
index 8163b61..03c7b93 100644
--- 
a/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubEndpoint.java
+++ 
b/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubEndpoint.java
@@ -6,7 +6,7 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -33,7 +33,7 @@ import org.apache.camel.spi.UriPath;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
- /**
+/**
  * Messaging client for Google Cloud Platform PubSub Service:
  * https://cloud.google.com/pubsub/
  *
@@ -91,15 +91,14 @@ public class GooglePubsubEndpoint extends DefaultEndpoint {
             log = LoggerFactory.getLogger(loggerId);
         }
 
-        GooglePubsubConnectionFactory cf = (null == connectionFactory)
-                ? getComponent().getConnectionFactory()
-                : connectionFactory;
-
-        pubsub = cf.getClient();
+        // Default pubsub connection.
+        // With the publisher endpoints - the main publisher
+        // with the consumer endpoints  - the ack client
+        pubsub = getConnectionFactory().getDefaultClient();
 
+        log.trace("Credential file location : {}", 
getConnectionFactory().getCredentialsFileLocation());
         log.trace("Project ID: {}", this.projectId);
         log.trace("Destination Name: {}", this.destinationName);
-        log.trace("From file : {}", cf.getCredentialsFileLocation());
     }
 
     public Producer createProducer() throws Exception {
@@ -177,8 +176,13 @@ public class GooglePubsubEndpoint extends DefaultEndpoint {
         return pubsub;
     }
 
+    /**
+     * ConnectionFactory to obtain connection to PubSub Service. If non 
provided the default will be used.
+     */
     public GooglePubsubConnectionFactory getConnectionFactory() {
-        return connectionFactory;
+        return (null == connectionFactory)
+                ? getComponent().getConnectionFactory()
+                : connectionFactory;
     }
 
     public void setConnectionFactory(GooglePubsubConnectionFactory 
connectionFactory) {

http://git-wip-us.apache.org/repos/asf/camel/blob/dc2138a9/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/consumer/ExchangeAckTransaction.java
----------------------------------------------------------------------
diff --git 
a/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/consumer/ExchangeAckTransaction.java
 
b/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/consumer/ExchangeAckTransaction.java
index b1e7c41..b807496 100644
--- 
a/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/consumer/ExchangeAckTransaction.java
+++ 
b/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/consumer/ExchangeAckTransaction.java
@@ -37,7 +37,27 @@ public class ExchangeAckTransaction extends 
PubsubAcknowledgement implements Syn
 
     @Override
     public void onFailure(Exchange exchange) {
-        resetAckDeadline(getAckIdList(exchange));
+
+        Integer deadline = 0;
+        Object configuredDeadline = 
exchange.getIn().getHeader(GooglePubsubConstants.ACK_DEADLINE);
+
+        if ( configuredDeadline != null && 
Integer.class.isInstance(configuredDeadline) ) {
+            deadline = (Integer) configuredDeadline;
+        }
+
+        if ( configuredDeadline != null && 
String.class.isInstance(configuredDeadline) ) {
+            try {
+                deadline = Integer.valueOf((String) configuredDeadline);
+            } catch (Exception e) {
+                logger.warn("Unable to parse ACK Deadline header value", e);
+            }
+        }
+
+        if (deadline != 0) {
+            logger.trace(" Exchange {} : Ack deadline : {}", 
exchange.getExchangeId(), deadline);
+        }
+
+        resetAckDeadline(getAckIdList(exchange), deadline);
     }
 
     private List<String> getAckIdList(Exchange exchange) {

http://git-wip-us.apache.org/repos/asf/camel/blob/dc2138a9/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/consumer/PubsubAcknowledgement.java
----------------------------------------------------------------------
diff --git 
a/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/consumer/PubsubAcknowledgement.java
 
b/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/consumer/PubsubAcknowledgement.java
index 742c469..267abc0 100644
--- 
a/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/consumer/PubsubAcknowledgement.java
+++ 
b/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/consumer/PubsubAcknowledgement.java
@@ -28,11 +28,11 @@ import org.slf4j.LoggerFactory;
 
 public abstract class PubsubAcknowledgement {
 
-    private Logger logger;
     private final String subscriptionFullName;
-
     private final GooglePubsubEndpoint endpoint;
 
+    protected Logger logger;
+
     public PubsubAcknowledgement(GooglePubsubEndpoint endpoint) {
         super();
         this.endpoint = endpoint;
@@ -61,11 +61,11 @@ public abstract class PubsubAcknowledgement {
         }
     }
 
-    void resetAckDeadline(List<String> ackIdList) {
+    void resetAckDeadline(List<String> ackIdList, Integer seconds) {
 
         ModifyAckDeadlineRequest nackRequest = new ModifyAckDeadlineRequest()
                 .setAckIds(ackIdList)
-                .setAckDeadlineSeconds(0);
+                .setAckDeadlineSeconds(seconds);
 
         try {
             endpoint.getPubsub()

http://git-wip-us.apache.org/repos/asf/camel/blob/dc2138a9/components/camel-google-pubsub/src/test/java/org/apache/camel/component/google/pubsub/PubsubTestSupport.java
----------------------------------------------------------------------
diff --git 
a/components/camel-google-pubsub/src/test/java/org/apache/camel/component/google/pubsub/PubsubTestSupport.java
 
b/components/camel-google-pubsub/src/test/java/org/apache/camel/component/google/pubsub/PubsubTestSupport.java
index 47ed6cc..7aa2800 100644
--- 
a/components/camel-google-pubsub/src/test/java/org/apache/camel/component/google/pubsub/PubsubTestSupport.java
+++ 
b/components/camel-google-pubsub/src/test/java/org/apache/camel/component/google/pubsub/PubsubTestSupport.java
@@ -94,7 +94,7 @@ public class PubsubTestSupport extends CamelTestSupport {
             .setServiceAccount(SERVICE_ACCOUNT)
             .setServiceAccountKey(SERVICE_KEY)
             .setServiceURL(SERVICE_URL)
-            .getClient();
+            .getDefaultClient();
 
         String topicFullName = String.format("projects/%s/topics/%s",
                                          PubsubTestSupport.PROJECT_ID,

http://git-wip-us.apache.org/repos/asf/camel/blob/dc2138a9/components/camel-google-pubsub/src/test/java/org/apache/camel/component/google/pubsub/integration/PubsubConnectionFactoryTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-google-pubsub/src/test/java/org/apache/camel/component/google/pubsub/integration/PubsubConnectionFactoryTest.java
 
b/components/camel-google-pubsub/src/test/java/org/apache/camel/component/google/pubsub/integration/PubsubConnectionFactoryTest.java
index 60dbacd..99128c2 100644
--- 
a/components/camel-google-pubsub/src/test/java/org/apache/camel/component/google/pubsub/integration/PubsubConnectionFactoryTest.java
+++ 
b/components/camel-google-pubsub/src/test/java/org/apache/camel/component/google/pubsub/integration/PubsubConnectionFactoryTest.java
@@ -50,7 +50,7 @@ public class PubsubConnectionFactoryTest extends 
PubsubTestSupport {
                 .setCredentialsFileLocation(file.getAbsolutePath())
                 .setServiceURL(SERVICE_URL);
 
-        Pubsub pubsub = cf.getClient();
+        Pubsub pubsub = cf.getDefaultClient();
 
         String query = String.format("projects/%s", PROJECT_ID);
         // [ DEPENDS on actual project being available]

Reply via email to