codelipenghui commented on code in PR #20963:
URL: https://github.com/apache/pulsar/pull/20963#discussion_r1294124706
##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java:
##########
@@ -2148,100 +2148,108 @@ public CompletableFuture<Void>
seekAsync(Function<String, Object> function) {
new PulsarClientException("Only support seek by messageId or
timestamp"));
}
- private Optional<CompletableFuture<Void>> seekAsyncCheckState(String
seekBy) {
- if (getState() == State.Closing || getState() == State.Closed) {
- return Optional.of(FutureUtil
- .failedFuture(new
PulsarClientException.AlreadyClosedException(
- String.format("The consumer %s was already closed
when seeking the subscription %s of the"
- + " topic %s to %s", consumerName,
subscription, topicName.toString(), seekBy))));
- }
-
- if (!isConnected()) {
- return Optional.of(FutureUtil.failedFuture(new
PulsarClientException(
- String.format("The client is not connected to the broker
when seeking the subscription %s of the "
- + "topic %s to %s", subscription,
topicName.toString(), seekBy))));
- }
+ private CompletableFuture<Void> seekAsyncInternal(long requestId, ByteBuf
seek, MessageId seekId, String seekBy) {
+ AtomicLong opTimeoutMs = new
AtomicLong(client.getConfiguration().getOperationTimeoutMs());
+ Backoff backoff = new BackoffBuilder()
+ .setInitialTime(100, TimeUnit.MILLISECONDS)
+ .setMax(opTimeoutMs.get() * 2, TimeUnit.MILLISECONDS)
+ .setMandatoryStop(0, TimeUnit.MILLISECONDS)
+ .create();
- return Optional.empty();
+ CompletableFuture<Void> seekFuture = new CompletableFuture<>();
+ seekAsyncInternal(requestId, seek, seekId, seekBy, backoff,
opTimeoutMs, seekFuture);
+ return seekFuture;
}
- private CompletableFuture<Void> seekAsyncInternal(long requestId, ByteBuf
seek, MessageId seekId, String seekBy) {
- final CompletableFuture<Void> seekFuture = new CompletableFuture<>();
+ private void seekAsyncInternal(long requestId, ByteBuf seek, MessageId
seekId, String seekBy,
+ final Backoff backoff, final AtomicLong
remainingTime,
+ CompletableFuture<Void> seekFuture) {
ClientCnx cnx = cnx();
+ if (isConnected() && cnx != null) {
+ if (!duringSeek.compareAndSet(false, true)) {
+ final String message = String.format(
+ "[%s][%s] attempting to seek operation that is already
in progress (seek by %s)",
+ topic, subscription, seekBy);
+ log.warn("[{}][{}] Attempting to seek operation that is
already in progress, cancelling {}",
+ topic, subscription, seekBy);
+ seekFuture.completeExceptionally(new
IllegalStateException(message));
+ return;
+ }
+ MessageIdAdv originSeekMessageId = seekMessageId;
+ seekMessageId = (MessageIdAdv) seekId;
+ log.info("[{}][{}] Seeking subscription to {}", topic,
subscription, seekBy);
- if (!duringSeek.compareAndSet(false, true)) {
- final String message = String.format(
- "[%s][%s] attempting to seek operation that is already in
progress (seek by %s)",
- topic, subscription, seekBy);
- log.warn("[{}][{}] Attempting to seek operation that is already in
progress, cancelling {}",
- topic, subscription, seekBy);
- seekFuture.completeExceptionally(new
IllegalStateException(message));
- return seekFuture;
- }
-
- MessageIdAdv originSeekMessageId = seekMessageId;
- seekMessageId = (MessageIdAdv) seekId;
- log.info("[{}][{}] Seeking subscription to {}", topic, subscription,
seekBy);
-
- cnx.sendRequestWithId(seek, requestId).thenRun(() -> {
- log.info("[{}][{}] Successfully reset subscription to {}", topic,
subscription, seekBy);
- acknowledgmentsGroupingTracker.flushAndClean();
+ cnx.sendRequestWithId(seek, requestId).thenRun(() -> {
+ log.info("[{}][{}] Successfully reset subscription to {}",
topic, subscription, seekBy);
+ acknowledgmentsGroupingTracker.flushAndClean();
- lastDequeuedMessageId = MessageId.earliest;
+ lastDequeuedMessageId = MessageId.earliest;
- clearIncomingMessages();
- seekFuture.complete(null);
- }).exceptionally(e -> {
- // re-set duringSeek and seekMessageId if seek failed
- seekMessageId = originSeekMessageId;
- duringSeek.set(false);
- log.error("[{}][{}] Failed to reset subscription: {}", topic,
subscription, e.getCause().getMessage());
+ clearIncomingMessages();
+ seekFuture.complete(null);
+ }).exceptionally(e -> {
Review Comment:
If the exception is retriable, I think we should also retry the seek with a backoff in this `exceptionally` handler, rather than failing the seek future immediately.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]