This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new ed4e971 [pulsar-client] Refactor seek to reuse common logic. (#9670)
ed4e971 is described below
commit ed4e971b22a0c2c96bee0946cb2be447ca397e43
Author: Xiaoguang Sun <[email protected]>
AuthorDate: Tue Feb 23 21:44:13 2021 +0800
[pulsar-client] Refactor seek to reuse common logic. (#9670)
### Motivation
Refactor seek to reuse common logic so adding expression based seek later
on would be easier.
---
.../apache/pulsar/client/impl/ConsumerImpl.java | 119 ++++++++-------------
1 file changed, 46 insertions(+), 73 deletions(-)
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 94f73dc..555696b 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -1761,34 +1761,34 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
}
}
- @Override
- public CompletableFuture<Void> seekAsync(long timestamp) {
+ private Optional<CompletableFuture<Void>> seekAsyncCheckState(String
seekBy) {
if (getState() == State.Closing || getState() == State.Closed) {
- return FutureUtil
- .failedFuture(new PulsarClientException.AlreadyClosedException(
- String.format("The consumer %s was already closed when
seeking the subscription %s of the topic " +
- "%s to the timestamp %d", consumerName, subscription,
topicName.toString(), timestamp)));
+ return Optional.of(FutureUtil
+ .failedFuture(new
PulsarClientException.AlreadyClosedException(
+ String.format("The consumer %s was already closed
when seeking the subscription %s of the"
+ + " topic %s to %s", consumerName,
subscription, topicName.toString(), seekBy))));
}
if (!isConnected()) {
- return FutureUtil.failedFuture(new PulsarClientException(
- String.format("The client is not connected to the broker when
seeking the subscription %s of the " +
- "topic %s to the timestamp %d", subscription,
topicName.toString(), timestamp)));
+ return Optional.of(FutureUtil.failedFuture(new
PulsarClientException(
+ String.format("The client is not connected to the broker
when seeking the subscription %s of the "
+ + "topic %s to %s", subscription,
topicName.toString(), seekBy))));
}
- final CompletableFuture<Void> seekFuture = new CompletableFuture<>();
+ return Optional.empty();
+ }
- long requestId = client.newRequestId();
- ByteBuf seek = Commands.newSeek(consumerId, requestId, timestamp);
+ private CompletableFuture<Void> seekAsyncInternal(long requestId, ByteBuf
seek, MessageId seekId, String seekBy) {
+ final CompletableFuture<Void> seekFuture = new CompletableFuture<>();
ClientCnx cnx = cnx();
- log.info("[{}][{}] Seek subscription to publish time {}", topic,
subscription, timestamp);
+ log.info("[{}][{}] Seek subscription to {}", topic, subscription,
seekBy);
cnx.sendRequestWithId(seek, requestId).thenRun(() -> {
- log.info("[{}][{}] Successfully reset subscription to publish time
{}", topic, subscription, timestamp);
+ log.info("[{}][{}] Successfully reset subscription to {}", topic,
subscription, seekBy);
acknowledgmentsGroupingTracker.flushAndClean();
- seekMessageId = new BatchMessageIdImpl((MessageIdImpl)
MessageId.earliest);
+ seekMessageId = new BatchMessageIdImpl((MessageIdImpl) seekId);
duringSeek.set(true);
lastDequeuedMessageId = MessageId.earliest;
@@ -1799,72 +1799,45 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
log.error("[{}][{}] Failed to reset subscription: {}", topic,
subscription, e.getCause().getMessage());
seekFuture.completeExceptionally(
PulsarClientException.wrap(e.getCause(),
- String.format("Failed to seek the subscription %s of the
topic %s to the timestamp %d",
- subscription, topicName.toString(), timestamp)));
+ String.format("Failed to seek the subscription %s of the
topic %s to %s",
+ subscription, topicName.toString(), seekBy)));
return null;
});
return seekFuture;
}
@Override
- public CompletableFuture<Void> seekAsync(MessageId messageId) {
- if (getState() == State.Closing || getState() == State.Closed) {
- return FutureUtil
- .failedFuture(new PulsarClientException.AlreadyClosedException(
- String.format("The consumer %s was already closed when
seeking the subscription %s of the topic " +
- "%s to the message %s", consumerName,
subscription, topicName.toString(),
- messageId.toString())));
- }
-
- if (!isConnected()) {
- return FutureUtil.failedFuture(new PulsarClientException(
- String.format("The client is not connected to the broker
when seeking the subscription %s of the " +
- "topic %s to the message %s", subscription,
topicName.toString(), messageId.toString())));
- }
-
- final CompletableFuture<Void> seekFuture = new CompletableFuture<>();
-
- long requestId = client.newRequestId();
- ByteBuf seek = null;
- if (messageId instanceof BatchMessageIdImpl) {
- BatchMessageIdImpl msgId = (BatchMessageIdImpl) messageId;
- // Initialize ack set
- BitSetRecyclable ackSet = BitSetRecyclable.create();
- ackSet.set(0, msgId.getBatchSize());
- ackSet.clear(0, Math.max(msgId.getBatchIndex(), 0));
- long[] ackSetArr = ackSet.toLongArray();
- ackSet.recycle();
-
- seek = Commands.newSeek(consumerId, requestId,
msgId.getLedgerId(), msgId.getEntryId(), ackSetArr);
- } else {
- MessageIdImpl msgId = (MessageIdImpl) messageId;
- seek = Commands.newSeek(consumerId, requestId,
msgId.getLedgerId(), msgId.getEntryId(), new long[0]);
- }
-
- ClientCnx cnx = cnx();
-
- log.info("[{}][{}] Seek subscription to message id {}", topic,
subscription, messageId);
-
- cnx.sendRequestWithId(seek, requestId).thenRun(() -> {
- log.info("[{}][{}] Successfully reset subscription to message id
{}", topic, subscription, messageId);
- acknowledgmentsGroupingTracker.flushAndClean();
-
- seekMessageId = new BatchMessageIdImpl((MessageIdImpl) messageId);
- duringSeek.set(true);
- lastDequeuedMessageId = MessageId.earliest;
+ public CompletableFuture<Void> seekAsync(long timestamp) {
+ String seekBy = String.format("the timestamp %d", timestamp);
+ return seekAsyncCheckState(seekBy).orElseGet(() -> {
+ long requestId = client.newRequestId();
+ return seekAsyncInternal(requestId, Commands.newSeek(consumerId,
requestId, timestamp),
+ MessageId.earliest, seekBy);
+ });
+ }
- incomingMessages.clear();
- resetIncomingMessageSize();
- seekFuture.complete(null);
- }).exceptionally(e -> {
- log.error("[{}][{}] Failed to reset subscription: {}", topic,
subscription, e.getCause().getMessage());
- seekFuture.completeExceptionally(
- PulsarClientException.wrap(e.getCause(),
- String.format("Failed to seek the subscription %s of the
topic %s to the message %s",
- subscription, topicName.toString(),
messageId.toString())));
- return null;
+ @Override
+ public CompletableFuture<Void> seekAsync(MessageId messageId) {
+ String seekBy = String.format("the message %s", messageId.toString());
+ return seekAsyncCheckState(seekBy).orElseGet(() -> {
+ long requestId = client.newRequestId();
+ ByteBuf seek = null;
+ if (messageId instanceof BatchMessageIdImpl) {
+ BatchMessageIdImpl msgId = (BatchMessageIdImpl) messageId;
+ // Initialize ack set
+ BitSetRecyclable ackSet = BitSetRecyclable.create();
+ ackSet.set(0, msgId.getBatchSize());
+ ackSet.clear(0, Math.max(msgId.getBatchIndex(), 0));
+ long[] ackSetArr = ackSet.toLongArray();
+ ackSet.recycle();
+
+ seek = Commands.newSeek(consumerId, requestId,
msgId.getLedgerId(), msgId.getEntryId(), ackSetArr);
+ } else {
+ MessageIdImpl msgId = (MessageIdImpl) messageId;
+ seek = Commands.newSeek(consumerId, requestId,
msgId.getLedgerId(), msgId.getEntryId(), new long[0]);
+ }
+ return seekAsyncInternal(requestId, seek, messageId, seekBy);
});
- return seekFuture;
}
public boolean hasMessageAvailable() throws PulsarClientException {