codelipenghui commented on code in PR #20963:
URL: https://github.com/apache/pulsar/pull/20963#discussion_r1294124706


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java:
##########
@@ -2148,100 +2148,108 @@ public CompletableFuture<Void> 
seekAsync(Function<String, Object> function) {
                 new PulsarClientException("Only support seek by messageId or 
timestamp"));
     }
 
-    private Optional<CompletableFuture<Void>> seekAsyncCheckState(String 
seekBy) {
-        if (getState() == State.Closing || getState() == State.Closed) {
-            return Optional.of(FutureUtil
-                    .failedFuture(new 
PulsarClientException.AlreadyClosedException(
-                            String.format("The consumer %s was already closed 
when seeking the subscription %s of the"
-                                    + " topic %s to %s", consumerName, 
subscription, topicName.toString(), seekBy))));
-        }
-
-        if (!isConnected()) {
-            return Optional.of(FutureUtil.failedFuture(new 
PulsarClientException(
-                    String.format("The client is not connected to the broker 
when seeking the subscription %s of the "
-                            + "topic %s to %s", subscription, 
topicName.toString(), seekBy))));
-        }
+    private CompletableFuture<Void> seekAsyncInternal(long requestId, ByteBuf 
seek, MessageId seekId, String seekBy) {
+        AtomicLong opTimeoutMs = new 
AtomicLong(client.getConfiguration().getOperationTimeoutMs());
+        Backoff backoff = new BackoffBuilder()
+                .setInitialTime(100, TimeUnit.MILLISECONDS)
+                .setMax(opTimeoutMs.get() * 2, TimeUnit.MILLISECONDS)
+                .setMandatoryStop(0, TimeUnit.MILLISECONDS)
+                .create();
 
-        return Optional.empty();
+        CompletableFuture<Void> seekFuture = new CompletableFuture<>();
+        seekAsyncInternal(requestId, seek, seekId, seekBy, backoff, 
opTimeoutMs, seekFuture);
+        return seekFuture;
     }
 
-    private CompletableFuture<Void> seekAsyncInternal(long requestId, ByteBuf 
seek, MessageId seekId, String seekBy) {
-        final CompletableFuture<Void> seekFuture = new CompletableFuture<>();
+    private void seekAsyncInternal(long requestId, ByteBuf seek, MessageId 
seekId, String seekBy,
+                                   final Backoff backoff, final AtomicLong 
remainingTime,
+                                   CompletableFuture<Void> seekFuture) {
         ClientCnx cnx = cnx();
+        if (isConnected() && cnx != null) {
+            if (!duringSeek.compareAndSet(false, true)) {
+                final String message = String.format(
+                        "[%s][%s] attempting to seek operation that is already 
in progress (seek by %s)",
+                        topic, subscription, seekBy);
+                log.warn("[{}][{}] Attempting to seek operation that is 
already in progress, cancelling {}",
+                        topic, subscription, seekBy);
+                seekFuture.completeExceptionally(new 
IllegalStateException(message));
+                return;
+            }
+            MessageIdAdv originSeekMessageId = seekMessageId;
+            seekMessageId = (MessageIdAdv) seekId;
+            log.info("[{}][{}] Seeking subscription to {}", topic, 
subscription, seekBy);
 
-        if (!duringSeek.compareAndSet(false, true)) {
-            final String message = String.format(
-                    "[%s][%s] attempting to seek operation that is already in 
progress (seek by %s)",
-                    topic, subscription, seekBy);
-            log.warn("[{}][{}] Attempting to seek operation that is already in 
progress, cancelling {}",
-                    topic, subscription, seekBy);
-            seekFuture.completeExceptionally(new 
IllegalStateException(message));
-            return seekFuture;
-        }
-
-        MessageIdAdv originSeekMessageId = seekMessageId;
-        seekMessageId = (MessageIdAdv) seekId;
-        log.info("[{}][{}] Seeking subscription to {}", topic, subscription, 
seekBy);
-
-        cnx.sendRequestWithId(seek, requestId).thenRun(() -> {
-            log.info("[{}][{}] Successfully reset subscription to {}", topic, 
subscription, seekBy);
-            acknowledgmentsGroupingTracker.flushAndClean();
+            cnx.sendRequestWithId(seek, requestId).thenRun(() -> {
+                log.info("[{}][{}] Successfully reset subscription to {}", 
topic, subscription, seekBy);
+                acknowledgmentsGroupingTracker.flushAndClean();
 
-            lastDequeuedMessageId = MessageId.earliest;
+                lastDequeuedMessageId = MessageId.earliest;
 
-            clearIncomingMessages();
-            seekFuture.complete(null);
-        }).exceptionally(e -> {
-            // re-set duringSeek and seekMessageId if seek failed
-            seekMessageId = originSeekMessageId;
-            duringSeek.set(false);
-            log.error("[{}][{}] Failed to reset subscription: {}", topic, 
subscription, e.getCause().getMessage());
+                clearIncomingMessages();
+                seekFuture.complete(null);
+            }).exceptionally(e -> {

Review Comment:
   If the exception is retriable, I think we should also perform a backoff.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to