This is an automated email from the ASF dual-hosted git repository.
zhaijia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new bf96619 Cancel retry if the error is autheticate failed (#8058)
bf96619 is described below
commit bf96619ae96655a27baff15551305fa7cbcb2835
Author: Yong Zhang <[email protected]>
AuthorDate: Wed Sep 16 21:39:56 2020 +0800
Cancel retry if the error is autheticate failed (#8058)
---
Fixes #7929
*Motivation*
We shouldn't retry to connect to the server if the client has authenticate
error.
---
.../main/java/org/apache/pulsar/client/impl/ClientCnx.java | 4 ++++
.../org/apache/pulsar/client/impl/PulsarClientImpl.java | 4 +++-
.../auth/token/PulsarTokenAuthenticationBaseSuite.java | 14 ++++++++++++++
3 files changed, 21 insertions(+), 1 deletion(-)
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
index b32dc99..6f8c511 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
@@ -607,6 +607,10 @@ public class ClientCnx extends PulsarHandler {
log.warn("{} Producer creation has been blocked because backlog
quota exceeded for producer topic",
ctx.channel());
}
+ if (error.getError() == ServerError.AuthenticationError) {
+ connectionFuture.completeExceptionally(new
PulsarClientException.AuthenticationException(error.getMessage()));
+ log.error("{} Failed to authenticate the client", ctx.channel());
+ }
CompletableFuture<ProducerResponse> requestFuture =
pendingRequests.remove(requestId);
if (requestFuture != null) {
requestFuture.completeExceptionally(getPulsarClientException(error.getError(),
error.getMessage()));
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index 6c4dced..c33eb9b 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -704,7 +704,9 @@ public class PulsarClientImpl implements PulsarClient {
lookup.getPartitionedTopicMetadata(topicName).thenAccept(future::complete).exceptionally(e
-> {
long nextDelay = Math.min(backoff.next(), remainingTime.get());
// skip retry scheduler when set lookup throttle in client or
server side which will lead to `TooManyRequestsException`
- boolean isLookupThrottling =
!PulsarClientException.isRetriableError(e.getCause()) || e.getCause()
instanceof PulsarClientException.TooManyRequestsException;
+ boolean isLookupThrottling =
!PulsarClientException.isRetriableError(e.getCause())
+ || e.getCause() instanceof
PulsarClientException.TooManyRequestsException
+ || e.getCause() instanceof
PulsarClientException.AuthenticationException;
if (nextDelay <= 0 || isLookupThrottling) {
future.completeExceptionally(e);
return null;
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/auth/token/PulsarTokenAuthenticationBaseSuite.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/auth/token/PulsarTokenAuthenticationBaseSuite.java
index ad0fa83..7a923ee 100644
---
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/auth/token/PulsarTokenAuthenticationBaseSuite.java
+++
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/auth/token/PulsarTokenAuthenticationBaseSuite.java
@@ -329,4 +329,18 @@ public abstract class PulsarTokenAuthenticationBaseSuite
extends PulsarClusterTe
}
}
}
+
+ @Test
+ public void testAuthenticationFailedImmediately() throws
PulsarClientException {
+ try {
+ @Cleanup
+ PulsarClient client = PulsarClient.builder()
+ .serviceUrl(pulsarCluster.getPlainTextServiceUrl())
+ .authentication(AuthenticationFactory.token("invalid_token"))
+ .build();
+ client.newProducer().topic("test_token_topic" + randomName(4));
+ } catch (PulsarClientException.AuthenticationException pae) {
+ // expected error
+ }
+ }
}