This is an automated email from the ASF dual-hosted git repository.

zhaijia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new bf96619  Cancel retry if the error is autheticate failed (#8058)
bf96619 is described below

commit bf96619ae96655a27baff15551305fa7cbcb2835
Author: Yong Zhang <[email protected]>
AuthorDate: Wed Sep 16 21:39:56 2020 +0800

    Cancel retry if the error is autheticate failed (#8058)
    
    ---
    
    Fixes #7929
    
    *Motivation*
    
    We shouldn't retry to connect to the server if the client has authenticate 
error.
---
 .../main/java/org/apache/pulsar/client/impl/ClientCnx.java |  4 ++++
 .../org/apache/pulsar/client/impl/PulsarClientImpl.java    |  4 +++-
 .../auth/token/PulsarTokenAuthenticationBaseSuite.java     | 14 ++++++++++++++
 3 files changed, 21 insertions(+), 1 deletion(-)

diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
index b32dc99..6f8c511 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
@@ -607,6 +607,10 @@ public class ClientCnx extends PulsarHandler {
             log.warn("{} Producer creation has been blocked because backlog 
quota exceeded for producer topic",
                     ctx.channel());
         }
+        if (error.getError() == ServerError.AuthenticationError) {
+            connectionFuture.completeExceptionally(new 
PulsarClientException.AuthenticationException(error.getMessage()));
+            log.error("{} Failed to authenticate the client", ctx.channel());
+        }
         CompletableFuture<ProducerResponse> requestFuture = 
pendingRequests.remove(requestId);
         if (requestFuture != null) {
             
requestFuture.completeExceptionally(getPulsarClientException(error.getError(), 
error.getMessage()));
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index 6c4dced..c33eb9b 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -704,7 +704,9 @@ public class PulsarClientImpl implements PulsarClient {
         
lookup.getPartitionedTopicMetadata(topicName).thenAccept(future::complete).exceptionally(e
 -> {
             long nextDelay = Math.min(backoff.next(), remainingTime.get());
             // skip retry scheduler when set lookup throttle in client or 
server side which will lead to `TooManyRequestsException`
-            boolean isLookupThrottling = 
!PulsarClientException.isRetriableError(e.getCause()) || e.getCause() 
instanceof PulsarClientException.TooManyRequestsException;
+            boolean isLookupThrottling = 
!PulsarClientException.isRetriableError(e.getCause())
+                || e.getCause() instanceof 
PulsarClientException.TooManyRequestsException
+                || e.getCause() instanceof 
PulsarClientException.AuthenticationException;
             if (nextDelay <= 0 || isLookupThrottling) {
                 future.completeExceptionally(e);
                 return null;
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/auth/token/PulsarTokenAuthenticationBaseSuite.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/auth/token/PulsarTokenAuthenticationBaseSuite.java
index ad0fa83..7a923ee 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/auth/token/PulsarTokenAuthenticationBaseSuite.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/auth/token/PulsarTokenAuthenticationBaseSuite.java
@@ -329,4 +329,18 @@ public abstract class PulsarTokenAuthenticationBaseSuite 
extends PulsarClusterTe
             }
         }
     }
+
+    @Test
+    public void testAuthenticationFailedImmediately() throws 
PulsarClientException {
+        try {
+            @Cleanup
+            PulsarClient client = PulsarClient.builder()
+                .serviceUrl(pulsarCluster.getPlainTextServiceUrl())
+                .authentication(AuthenticationFactory.token("invalid_token"))
+                .build();
+            client.newProducer().topic("test_token_topic" + randomName(4));
+        } catch (PulsarClientException.AuthenticationException pae) {
+            // expected error
+        }
+    }
 }

Reply via email to