This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new ed4e971  [pulsar-client] Refactor seek to reuse common logic. (#9670)
ed4e971 is described below

commit ed4e971b22a0c2c96bee0946cb2be447ca397e43
Author: Xiaoguang Sun <[email protected]>
AuthorDate: Tue Feb 23 21:44:13 2021 +0800

    [pulsar-client] Refactor seek to reuse common logic. (#9670)
    
    ### Motivation
    Refactor seek to reuse common logic so that adding expression-based seek later on would be easier.
---
 .../apache/pulsar/client/impl/ConsumerImpl.java    | 119 ++++++++-------------
 1 file changed, 46 insertions(+), 73 deletions(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 94f73dc..555696b 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -1761,34 +1761,34 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
         }
     }
 
-    @Override
-    public CompletableFuture<Void> seekAsync(long timestamp) {
+    private Optional<CompletableFuture<Void>> seekAsyncCheckState(String 
seekBy) {
         if (getState() == State.Closing || getState() == State.Closed) {
-            return FutureUtil
-                .failedFuture(new PulsarClientException.AlreadyClosedException(
-                    String.format("The consumer %s was already closed when 
seeking the subscription %s of the topic " +
-                        "%s to the timestamp %d", consumerName, subscription, 
topicName.toString(), timestamp)));
+            return Optional.of(FutureUtil
+                    .failedFuture(new 
PulsarClientException.AlreadyClosedException(
+                            String.format("The consumer %s was already closed 
when seeking the subscription %s of the"
+                                    + " topic %s to %s", consumerName, 
subscription, topicName.toString(), seekBy))));
         }
 
         if (!isConnected()) {
-            return FutureUtil.failedFuture(new PulsarClientException(
-                String.format("The client is not connected to the broker when 
seeking the subscription %s of the " +
-                    "topic %s to the timestamp %d", subscription, 
topicName.toString(), timestamp)));
+            return Optional.of(FutureUtil.failedFuture(new 
PulsarClientException(
+                    String.format("The client is not connected to the broker 
when seeking the subscription %s of the "
+                            + "topic %s to %s", subscription, 
topicName.toString(), seekBy))));
         }
 
-        final CompletableFuture<Void> seekFuture = new CompletableFuture<>();
+        return Optional.empty();
+    }
 
-        long requestId = client.newRequestId();
-        ByteBuf seek = Commands.newSeek(consumerId, requestId, timestamp);
+    private CompletableFuture<Void> seekAsyncInternal(long requestId, ByteBuf 
seek, MessageId seekId, String seekBy) {
+        final CompletableFuture<Void> seekFuture = new CompletableFuture<>();
         ClientCnx cnx = cnx();
 
-        log.info("[{}][{}] Seek subscription to publish time {}", topic, 
subscription, timestamp);
+        log.info("[{}][{}] Seek subscription to {}", topic, subscription, 
seekBy);
 
         cnx.sendRequestWithId(seek, requestId).thenRun(() -> {
-            log.info("[{}][{}] Successfully reset subscription to publish time 
{}", topic, subscription, timestamp);
+            log.info("[{}][{}] Successfully reset subscription to {}", topic, 
subscription, seekBy);
             acknowledgmentsGroupingTracker.flushAndClean();
 
-            seekMessageId = new BatchMessageIdImpl((MessageIdImpl) 
MessageId.earliest);
+            seekMessageId = new BatchMessageIdImpl((MessageIdImpl) seekId);
             duringSeek.set(true);
             lastDequeuedMessageId = MessageId.earliest;
 
@@ -1799,72 +1799,45 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
             log.error("[{}][{}] Failed to reset subscription: {}", topic, 
subscription, e.getCause().getMessage());
             seekFuture.completeExceptionally(
                 PulsarClientException.wrap(e.getCause(),
-                    String.format("Failed to seek the subscription %s of the 
topic %s to the timestamp %d",
-                        subscription, topicName.toString(), timestamp)));
+                    String.format("Failed to seek the subscription %s of the 
topic %s to %s",
+                        subscription, topicName.toString(), seekBy)));
             return null;
         });
         return seekFuture;
     }
 
     @Override
-    public CompletableFuture<Void> seekAsync(MessageId messageId) {
-        if (getState() == State.Closing || getState() == State.Closed) {
-            return FutureUtil
-                .failedFuture(new PulsarClientException.AlreadyClosedException(
-                    String.format("The consumer %s was already closed when 
seeking the subscription %s of the topic " +
-                            "%s to the message %s", consumerName, 
subscription, topicName.toString(),
-                        messageId.toString())));
-        }
-
-        if (!isConnected()) {
-            return FutureUtil.failedFuture(new PulsarClientException(
-                    String.format("The client is not connected to the broker 
when seeking the subscription %s of the " +
-                            "topic %s to the message %s", subscription, 
topicName.toString(), messageId.toString())));
-        }
-
-        final CompletableFuture<Void> seekFuture = new CompletableFuture<>();
-
-        long requestId = client.newRequestId();
-        ByteBuf seek = null;
-        if (messageId instanceof BatchMessageIdImpl) {
-            BatchMessageIdImpl msgId = (BatchMessageIdImpl) messageId;
-            // Initialize ack set
-            BitSetRecyclable ackSet = BitSetRecyclable.create();
-            ackSet.set(0, msgId.getBatchSize());
-            ackSet.clear(0, Math.max(msgId.getBatchIndex(), 0));
-            long[] ackSetArr = ackSet.toLongArray();
-            ackSet.recycle();
-
-            seek = Commands.newSeek(consumerId, requestId, 
msgId.getLedgerId(), msgId.getEntryId(), ackSetArr);
-        } else {
-            MessageIdImpl msgId = (MessageIdImpl) messageId;
-            seek = Commands.newSeek(consumerId, requestId, 
msgId.getLedgerId(), msgId.getEntryId(), new long[0]);
-        }
-
-        ClientCnx cnx = cnx();
-
-        log.info("[{}][{}] Seek subscription to message id {}", topic, 
subscription, messageId);
-
-        cnx.sendRequestWithId(seek, requestId).thenRun(() -> {
-            log.info("[{}][{}] Successfully reset subscription to message id 
{}", topic, subscription, messageId);
-            acknowledgmentsGroupingTracker.flushAndClean();
-
-            seekMessageId = new BatchMessageIdImpl((MessageIdImpl) messageId);
-            duringSeek.set(true);
-            lastDequeuedMessageId = MessageId.earliest;
+    public CompletableFuture<Void> seekAsync(long timestamp) {
+        String seekBy = String.format("the timestamp %d", timestamp);
+        return seekAsyncCheckState(seekBy).orElseGet(() -> {
+            long requestId = client.newRequestId();
+            return seekAsyncInternal(requestId, Commands.newSeek(consumerId, 
requestId, timestamp),
+                MessageId.earliest, seekBy);
+        });
+    }
 
-            incomingMessages.clear();
-            resetIncomingMessageSize();
-            seekFuture.complete(null);
-        }).exceptionally(e -> {
-            log.error("[{}][{}] Failed to reset subscription: {}", topic, 
subscription, e.getCause().getMessage());
-            seekFuture.completeExceptionally(
-                PulsarClientException.wrap(e.getCause(),
-                    String.format("Failed to seek the subscription %s of the 
topic %s to the message %s",
-                        subscription, topicName.toString(), 
messageId.toString())));
-            return null;
+    @Override
+    public CompletableFuture<Void> seekAsync(MessageId messageId) {
+        String seekBy = String.format("the message %s", messageId.toString());
+        return seekAsyncCheckState(seekBy).orElseGet(() -> {
+            long requestId = client.newRequestId();
+            ByteBuf seek = null;
+            if (messageId instanceof BatchMessageIdImpl) {
+                BatchMessageIdImpl msgId = (BatchMessageIdImpl) messageId;
+                // Initialize ack set
+                BitSetRecyclable ackSet = BitSetRecyclable.create();
+                ackSet.set(0, msgId.getBatchSize());
+                ackSet.clear(0, Math.max(msgId.getBatchIndex(), 0));
+                long[] ackSetArr = ackSet.toLongArray();
+                ackSet.recycle();
+
+                seek = Commands.newSeek(consumerId, requestId, 
msgId.getLedgerId(), msgId.getEntryId(), ackSetArr);
+            } else {
+                MessageIdImpl msgId = (MessageIdImpl) messageId;
+                seek = Commands.newSeek(consumerId, requestId, 
msgId.getLedgerId(), msgId.getEntryId(), new long[0]);
+            }
+            return seekAsyncInternal(requestId, seek, messageId, seekBy);
         });
-        return seekFuture;
     }
 
     public boolean hasMessageAvailable() throws PulsarClientException {

Reply via email to