This is an automated email from the ASF dual-hosted git repository.
yong pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.7 by this push:
new 665f1bc Fixed hasMessageAvailable() with empty topic (branch 2.7)
(#9798)
665f1bc is described below
commit 665f1bcc2f233ded008e33f6af7e9ea30986b71c
Author: Yong Zhang <[email protected]>
AuthorDate: Thu Mar 4 21:44:30 2021 +0800
Fixed hasMessageAvailable() with empty topic (branch 2.7) (#9798)
---
Master Issue: #<xyz>
*Motivation*
PR #9652 fixes this issue on master, but it is based on the master
code. On master, the way the protobuf code is generated has changed,
which makes it difficult to cherry-pick that PR into branch-2.7. So
I opened a new PR to fix the issue on branch-2.7.
---
.../apache/pulsar/broker/service/ServerCnx.java | 24 +++++-
.../apache/pulsar/client/api/TopicReaderTest.java | 68 +++++++++++++--
.../org/apache/pulsar/client/impl/ClientCnx.java | 6 +-
.../apache/pulsar/client/impl/ConsumerImpl.java | 93 +++++++++++++--------
.../apache/pulsar/common/api/proto/PulsarApi.java | 96 ++++++++++++++++++++++
.../apache/pulsar/common/protocol/Commands.java | 6 +-
pulsar-common/src/main/proto/PulsarApi.proto | 1 +
7 files changed, 250 insertions(+), 44 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index bb169b8..d8f09b9 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -72,6 +72,7 @@ import
org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataExc
import
org.apache.pulsar.broker.service.BrokerServiceException.ServiceUnitNotReadyException;
import
org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionNotFoundException;
import
org.apache.pulsar.broker.service.BrokerServiceException.TopicNotFoundException;
+import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
import
org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
@@ -1511,10 +1512,15 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
Topic topic = consumer.getSubscription().getTopic();
Position position = topic.getLastPosition();
int partitionIndex = TopicName.getPartitionIndex(topic.getName());
+ Position markDeletePosition = null;
+ if (consumer.getSubscription() instanceof PersistentSubscription) {
+ markDeletePosition = ((PersistentSubscription)
consumer.getSubscription()).getCursor().getMarkDeletedPosition();
+ }
getLargestBatchIndexWhenPossible(
topic,
(PositionImpl) position,
+ (PositionImpl) markDeletePosition,
partitionIndex,
requestId,
consumer.getSubscription().getName());
@@ -1527,6 +1533,7 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
private void getLargestBatchIndexWhenPossible(
Topic topic,
PositionImpl position,
+ PositionImpl markDeletePosition,
int partitionIndex,
long requestId,
String subscriptionName) {
@@ -1541,7 +1548,13 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
.setEntryId(position.getEntryId())
.setPartition(partitionIndex).build();
- ctx.writeAndFlush(Commands.newGetLastMessageIdResponse(requestId,
messageId));
+ MessageIdData markDeleteMessageId = null;
+ if (null != markDeletePosition) {
+ markDeleteMessageId = MessageIdData.newBuilder()
+ .setLedgerId(markDeletePosition.getLedgerId())
+ .setEntryId(markDeletePosition.getEntryId()).build();
+ }
+ ctx.writeAndFlush(Commands.newGetLastMessageIdResponse(requestId,
messageId, markDeleteMessageId));
return;
}
@@ -1583,8 +1596,13 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
.setEntryId(position.getEntryId())
.setPartition(partitionIndex)
.setBatchIndex(largestBatchIndex).build();
-
-
ctx.writeAndFlush(Commands.newGetLastMessageIdResponse(requestId, messageId));
+ MessageIdData markDeleteMessageId = null;
+ if (null != markDeletePosition) {
+ markDeleteMessageId = MessageIdData.newBuilder()
+ .setLedgerId(markDeletePosition.getLedgerId())
+ .setEntryId(markDeletePosition.getEntryId()).build();
+ }
+
ctx.writeAndFlush(Commands.newGetLastMessageIdResponse(requestId, messageId,
markDeleteMessageId));
}
});
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java
index b24a7f8..68baf34 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java
@@ -36,6 +36,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
+import lombok.Cleanup;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
@@ -1080,14 +1081,14 @@ public class TopicReaderTest extends
ProducerConsumerBase {
.topic(topicName).create();
//For batch-messages with single message, the type of client messageId
should be the same as that of broker
- MessageId messageId = producer.send("msg".getBytes());
+ MessageIdImpl messageId = (MessageIdImpl)
producer.send("msg".getBytes());
assertTrue(messageId instanceof MessageIdImpl);
ReaderImpl<byte[]> reader =
(ReaderImpl<byte[]>)pulsarClient.newReader().topic(topicName)
.startMessageId(messageId).startMessageIdInclusive().create();
- MessageId lastMsgId = reader.getConsumer().getLastMessageId();
- assertTrue(lastMsgId instanceof BatchMessageIdImpl);
+ MessageIdImpl lastMsgId = (MessageIdImpl)
reader.getConsumer().getLastMessageId();
assertTrue(messageId instanceof BatchMessageIdImpl);
- assertEquals(lastMsgId, messageId);
+ assertEquals(lastMsgId.getLedgerId(), messageId.getLedgerId());
+ assertEquals(lastMsgId.getEntryId(), messageId.getEntryId());
reader.close();
CountDownLatch latch = new CountDownLatch(numOfMessage);
@@ -1125,7 +1126,7 @@ public class TopicReaderTest extends ProducerConsumerBase
{
//For non-batch message, the type of client messageId should be the
same as that of broker
producer = pulsarClient.newProducer()
.enableBatching(false).topic(topicName).create();
- messageId = producer.send("non-batch".getBytes());
+ messageId = (MessageIdImpl) producer.send("non-batch".getBytes());
assertFalse(messageId instanceof BatchMessageIdImpl);
assertTrue(messageId instanceof MessageIdImpl);
reader = (ReaderImpl<byte[]>) pulsarClient.newReader().topic(topicName)
@@ -1555,4 +1556,61 @@ public class TopicReaderTest extends
ProducerConsumerBase {
producer.close();
}
+
+ @Test
+ public void testHasMessageAvailableOnEmptyTopic() throws Exception {
+ String topic = "my-property/my-ns/topic-" + UUID.randomUUID();
+
+ @Cleanup
+ Reader<String> r1 = pulsarClient.newReader(Schema.STRING)
+ .topic(topic)
+ .startMessageId(MessageId.earliest)
+ .create();
+
+ @Cleanup
+ Reader<String> r2 = pulsarClient.newReader(Schema.STRING)
+ .topic(topic)
+ .startMessageId(MessageId.latest)
+ .create();
+
+ @Cleanup
+ Reader<String> r2Inclusive = pulsarClient.newReader(Schema.STRING)
+ .topic(topic)
+ .startMessageId(MessageId.latest)
+ .startMessageIdInclusive()
+ .create();
+
+ // no data write, should return false
+ assertFalse(r1.hasMessageAvailable());
+ assertFalse(r2.hasMessageAvailable());
+ assertFalse(r2Inclusive.hasMessageAvailable());
+
+ @Cleanup
+ Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+ .topic(topic)
+ .create();
+
+ producer.send("hello-1");
+
+ Thread.sleep(1000);
+ assertTrue(r1.hasMessageAvailable());
+ assertTrue(r2.hasMessageAvailable());
+ assertTrue(r2Inclusive.hasMessageAvailable());
+
+ @Cleanup
+ Reader<String> r3 = pulsarClient.newReader(Schema.STRING)
+ .topic(topic)
+ .startMessageId(MessageId.latest)
+ .create();
+
+
+ assertFalse(r3.hasMessageAvailable());
+
+ producer.send("hello-2");
+
+ assertTrue(r1.hasMessageAvailable());
+ assertTrue(r2.hasMessageAvailable());
+ assertTrue(r2Inclusive.hasMessageAvailable());
+ assertTrue(r3.hasMessageAvailable());
+ }
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
index c551588..09faaa5e 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
@@ -455,9 +455,9 @@ public class ClientCnx extends PulsarHandler {
log.debug("{} Received success GetLastMessageId response from
server: {}", ctx.channel(), success.getRequestId());
}
long requestId = success.getRequestId();
- CompletableFuture<MessageIdData> requestFuture =
(CompletableFuture<MessageIdData>) pendingRequests.remove(requestId);
+ CompletableFuture<CommandGetLastMessageIdResponse> requestFuture =
(CompletableFuture<CommandGetLastMessageIdResponse>)
pendingRequests.remove(requestId);
if (requestFuture != null) {
- requestFuture.complete(success.getLastMessageId());
+ requestFuture.complete(success);
} else {
log.warn("{} Received unknown request id from server: {}",
ctx.channel(), success.getRequestId());
}
@@ -802,7 +802,7 @@ public class ClientCnx extends PulsarHandler {
return future;
}
- public CompletableFuture<MessageIdData> sendGetLastMessageId(ByteBuf
request, long requestId) {
+ public CompletableFuture<CommandGetLastMessageIdResponse>
sendGetLastMessageId(ByteBuf request, long requestId) {
return sendRequestAndHandleTimeout(request, requestId,
RequestType.GetLastMessageId);
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index f942422..2eefb5b 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -24,6 +24,7 @@ import static
org.apache.pulsar.common.protocol.Commands.hasChecksum;
import static org.apache.pulsar.common.protocol.Commands.readChecksum;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ComparisonChain;
import com.google.common.collect.Iterables;
import com.google.common.collect.Queues;
import io.netty.buffer.ByteBuf;
@@ -2035,26 +2036,38 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
if (lastDequeuedMessageId == MessageId.earliest) {
// if we are starting from latest, we should seek to the actual
last message first.
// allow the last one to be read when read head inclusively.
- if (startMessageId.getLedgerId() == Long.MAX_VALUE &&
- startMessageId.getEntryId() == Long.MAX_VALUE &&
- startMessageId.partitionIndex == -1) {
-
- getLastMessageIdAsync()
- .thenCompose((msgId) ->
seekAsync(msgId).thenApply((ignore) -> msgId))
- .whenComplete((msgId, e) -> {
- if (e != null) {
- log.error("[{}][{}] Failed getLastMessageId
command", topic, subscription);
-
booleanFuture.completeExceptionally(e.getCause());
- return;
- }
- MessageIdImpl messageId =
MessageIdImpl.convertToMessageIdImpl(msgId);
- if (messageId == null || messageId.getEntryId() <
0) {
- booleanFuture.complete(false);
- } else {
- booleanFuture.complete(resetIncludeHead);
- }
- });
-
+ if (startMessageId.equals(MessageId.latest)) {
+ CompletableFuture<GetLastMessageIdResponse> future =
internalGetLastMessageIdAsync();
+ // if the consumer is configured to read inclusive then we
need to seek to the last message
+ if (resetIncludeHead) {
+ future = future.thenCompose((lastMessageIdResponse) ->
+ seekAsync(lastMessageIdResponse.lastMessageId)
+ .thenApply((ignore) -> lastMessageIdResponse));
+ }
+ future.thenAccept(response -> {
+ MessageIdImpl lastMessageId =
MessageIdImpl.convertToMessageIdImpl(response.lastMessageId);
+ MessageIdImpl markDeletePosition =
MessageIdImpl.convertToMessageIdImpl(response.markDeletePosition);
+ if (markDeletePosition != null) {
+ // we only care about comparing ledger ids and entry
ids as mark delete position doesn't have other ids such as batch index
+ int result = ComparisonChain.start()
+ .compare(markDeletePosition.getLedgerId(),
lastMessageId.getLedgerId())
+ .compare(markDeletePosition.getEntryId(),
lastMessageId.getEntryId())
+ .result();
+ if (lastMessageId.getEntryId() < 0) {
+ booleanFuture.complete(false);
+ } else {
+ booleanFuture.complete(resetIncludeHead ? result
<= 0 : result < 0);
+ }
+ } else if (lastMessageId == null ||
lastMessageId.getEntryId() < 0) {
+ booleanFuture.complete(false);
+ } else {
+ booleanFuture.complete(resetIncludeHead);
+ }
+ }).exceptionally(ex -> {
+ log.error("[{}][{}] Failed getLastMessageId command",
topic, subscription, ex);
+ booleanFuture.completeExceptionally(ex.getCause());
+ return null;
+ });
return booleanFuture;
}
@@ -2114,8 +2127,21 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
return false;
}
- @Override
+ private static final class GetLastMessageIdResponse {
+ final MessageId lastMessageId;
+ final MessageId markDeletePosition;
+
+ GetLastMessageIdResponse(MessageId lastMessageId, MessageId
markDeletePosition) {
+ this.lastMessageId = lastMessageId;
+ this.markDeletePosition = markDeletePosition;
+ }
+ }
+
public CompletableFuture<MessageId> getLastMessageIdAsync() {
+ return internalGetLastMessageIdAsync().thenApply(r -> r.lastMessageId);
+ }
+
+ public CompletableFuture<GetLastMessageIdResponse>
internalGetLastMessageIdAsync() {
if (getState() == State.Closing || getState() == State.Closed) {
return FutureUtil
.failedFuture(new PulsarClientException.AlreadyClosedException(
@@ -2130,7 +2156,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
.setMandatoryStop(0, TimeUnit.MILLISECONDS)
.create();
- CompletableFuture<MessageId> getLastMessageIdFuture = new
CompletableFuture<>();
+ CompletableFuture<GetLastMessageIdResponse> getLastMessageIdFuture =
new CompletableFuture<>();
internalGetLastMessageIdAsync(backoff, opTimeoutMs,
getLastMessageIdFuture);
return getLastMessageIdFuture;
@@ -2138,7 +2164,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
private void internalGetLastMessageIdAsync(final Backoff backoff,
final AtomicLong remainingTime,
- CompletableFuture<MessageId>
future) {
+
CompletableFuture<GetLastMessageIdResponse> future) {
ClientCnx cnx = cnx();
if (isConnected() && cnx != null) {
if
(!Commands.peerSupportsGetLastMessageId(cnx.getRemoteEndpointProtocolVersion()))
{
@@ -2153,16 +2179,19 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
ByteBuf getLastIdCmd = Commands.newGetLastMessageId(consumerId,
requestId);
log.info("[{}][{}] Get topic last message Id", topic,
subscription);
- cnx.sendGetLastMessageId(getLastIdCmd,
requestId).thenAccept((result) -> {
- log.info("[{}][{}] Successfully getLastMessageId {}:{}",
- topic, subscription, result.getLedgerId(),
result.getEntryId());
- if (result.getBatchIndex() < 0) {
- future.complete(new MessageIdImpl(result.getLedgerId(),
- result.getEntryId(), result.getPartition()));
- } else {
- future.complete(new
BatchMessageIdImpl(result.getLedgerId(),
- result.getEntryId(), result.getPartition(),
result.getBatchIndex()));
+ cnx.sendGetLastMessageId(getLastIdCmd, requestId).thenAccept(cmd
-> {
+ MessageIdData lastMessageId = cmd.getLastMessageId();
+ MessageIdImpl markDeletePosition = null;
+ if (cmd.hasConsumerMarkDeletePosition()) {
+ markDeletePosition = new
MessageIdImpl(cmd.getConsumerMarkDeletePosition().getLedgerId(),
+ cmd.getConsumerMarkDeletePosition().getEntryId(), -1);
}
+ log.info("[{}][{}] Successfully getLastMessageId {}:{}",
+ topic, subscription, lastMessageId.getLedgerId(),
lastMessageId.getEntryId());
+ MessageId lastMsgId = lastMessageId.getBatchIndex() <= 0 ?
+ new MessageIdImpl(lastMessageId.getLedgerId(),
lastMessageId.getEntryId(), lastMessageId.getPartition())
+ : new BatchMessageIdImpl(lastMessageId.getLedgerId(),
lastMessageId.getEntryId(), lastMessageId.getPartition(),
lastMessageId.getBatchIndex());
+ future.complete(new GetLastMessageIdResponse(lastMsgId,
markDeletePosition));
}).exceptionally(e -> {
log.error("[{}][{}] Failed getLastMessageId command", topic,
subscription);
future.completeExceptionally(
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
index 9e1e9d3..317e3f1 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
@@ -28223,6 +28223,10 @@ public final class PulsarApi {
// required uint64 request_id = 2;
boolean hasRequestId();
long getRequestId();
+
+ // optional .pulsar.proto.MessageIdData consumer_mark_delete_position = 3;
+ boolean hasConsumerMarkDeletePosition();
+ org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData
getConsumerMarkDeletePosition();
}
public static final class CommandGetLastMessageIdResponse extends
org.apache.pulsar.shaded.com.google.protobuf.v241.GeneratedMessageLite
@@ -28279,9 +28283,20 @@ public final class PulsarApi {
return requestId_;
}
+ // optional .pulsar.proto.MessageIdData consumer_mark_delete_position = 3;
+ public static final int CONSUMER_MARK_DELETE_POSITION_FIELD_NUMBER = 3;
+ private org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData
consumerMarkDeletePosition_;
+ public boolean hasConsumerMarkDeletePosition() {
+ return ((bitField0_ & 0x00000004) == 0x00000004);
+ }
+ public org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData
getConsumerMarkDeletePosition() {
+ return consumerMarkDeletePosition_;
+ }
+
private void initFields() {
lastMessageId_ =
org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData.getDefaultInstance();
requestId_ = 0L;
+ consumerMarkDeletePosition_ =
org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData.getDefaultInstance();
}
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
@@ -28300,6 +28315,12 @@ public final class PulsarApi {
memoizedIsInitialized = 0;
return false;
}
+ if (hasConsumerMarkDeletePosition()) {
+ if (!getConsumerMarkDeletePosition().isInitialized()) {
+ memoizedIsInitialized = 0;
+ return false;
+ }
+ }
memoizedIsInitialized = 1;
return true;
}
@@ -28318,6 +28339,9 @@ public final class PulsarApi {
if (((bitField0_ & 0x00000002) == 0x00000002)) {
output.writeUInt64(2, requestId_);
}
+ if (((bitField0_ & 0x00000004) == 0x00000004)) {
+ output.writeMessage(3, consumerMarkDeletePosition_);
+ }
}
private int memoizedSerializedSize = -1;
@@ -28334,6 +28358,10 @@ public final class PulsarApi {
size +=
org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
.computeUInt64Size(2, requestId_);
}
+ if (((bitField0_ & 0x00000004) == 0x00000004)) {
+ size +=
org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
+ .computeMessageSize(3, consumerMarkDeletePosition_);
+ }
memoizedSerializedSize = size;
return size;
}
@@ -28451,6 +28479,8 @@ public final class PulsarApi {
bitField0_ = (bitField0_ & ~0x00000001);
requestId_ = 0L;
bitField0_ = (bitField0_ & ~0x00000002);
+ consumerMarkDeletePosition_ =
org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData.getDefaultInstance();
+ bitField0_ = (bitField0_ & ~0x00000004);
return this;
}
@@ -28492,6 +28522,10 @@ public final class PulsarApi {
to_bitField0_ |= 0x00000002;
}
result.requestId_ = requestId_;
+ if (((from_bitField0_ & 0x00000004) == 0x00000004)) {
+ to_bitField0_ |= 0x00000004;
+ }
+ result.consumerMarkDeletePosition_ = consumerMarkDeletePosition_;
result.bitField0_ = to_bitField0_;
return result;
}
@@ -28504,6 +28538,9 @@ public final class PulsarApi {
if (other.hasRequestId()) {
setRequestId(other.getRequestId());
}
+ if (other.hasConsumerMarkDeletePosition()) {
+
mergeConsumerMarkDeletePosition(other.getConsumerMarkDeletePosition());
+ }
return this;
}
@@ -28520,6 +28557,12 @@ public final class PulsarApi {
return false;
}
+ if (hasConsumerMarkDeletePosition()) {
+ if (!getConsumerMarkDeletePosition().isInitialized()) {
+
+ return false;
+ }
+ }
return true;
}
@@ -28560,6 +28603,16 @@ public final class PulsarApi {
requestId_ = input.readUInt64();
break;
}
+ case 26: {
+
org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData.Builder subBuilder =
org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData.newBuilder();
+ if (hasConsumerMarkDeletePosition()) {
+ subBuilder.mergeFrom(getConsumerMarkDeletePosition());
+ }
+ input.readMessage(subBuilder, extensionRegistry);
+ setConsumerMarkDeletePosition(subBuilder.buildPartial());
+ subBuilder.recycle();
+ break;
+ }
}
}
}
@@ -28630,6 +28683,49 @@ public final class PulsarApi {
return this;
}
+ // optional .pulsar.proto.MessageIdData consumer_mark_delete_position =
3;
+ private org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData
consumerMarkDeletePosition_ =
org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData.getDefaultInstance();
+ public boolean hasConsumerMarkDeletePosition() {
+ return ((bitField0_ & 0x00000004) == 0x00000004);
+ }
+ public org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData
getConsumerMarkDeletePosition() {
+ return consumerMarkDeletePosition_;
+ }
+ public Builder
setConsumerMarkDeletePosition(org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData
value) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ consumerMarkDeletePosition_ = value;
+
+ bitField0_ |= 0x00000004;
+ return this;
+ }
+ public Builder setConsumerMarkDeletePosition(
+ org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData.Builder
builderForValue) {
+ consumerMarkDeletePosition_ = builderForValue.build();
+
+ bitField0_ |= 0x00000004;
+ return this;
+ }
+ public Builder
mergeConsumerMarkDeletePosition(org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData
value) {
+ if (((bitField0_ & 0x00000004) == 0x00000004) &&
+ consumerMarkDeletePosition_ !=
org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData.getDefaultInstance())
{
+ consumerMarkDeletePosition_ =
+
org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData.newBuilder(consumerMarkDeletePosition_).mergeFrom(value).buildPartial();
+ } else {
+ consumerMarkDeletePosition_ = value;
+ }
+
+ bitField0_ |= 0x00000004;
+ return this;
+ }
+ public Builder clearConsumerMarkDeletePosition() {
+ consumerMarkDeletePosition_ =
org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData.getDefaultInstance();
+
+ bitField0_ = (bitField0_ & ~0x00000004);
+ return this;
+ }
+
//
@@protoc_insertion_point(builder_scope:pulsar.proto.CommandGetLastMessageIdResponse)
}
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
index a2df6d1..1a0dac5 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
@@ -1367,12 +1367,16 @@ public class Commands {
return res;
}
- public static ByteBuf newGetLastMessageIdResponse(long requestId,
MessageIdData messageIdData) {
+ public static ByteBuf newGetLastMessageIdResponse(long requestId,
MessageIdData messageIdData,
+ MessageIdData
consumerMarkDeletePosition) {
PulsarApi.CommandGetLastMessageIdResponse.Builder response =
PulsarApi.CommandGetLastMessageIdResponse.newBuilder()
.setLastMessageId(messageIdData)
.setRequestId(requestId);
+ if (consumerMarkDeletePosition != null) {
+ response.setConsumerMarkDeletePosition(consumerMarkDeletePosition);
+ }
ByteBuf res = serializeWithSize(BaseCommand.newBuilder()
.setType(Type.GET_LAST_MESSAGE_ID_RESPONSE)
.setGetLastMessageIdResponse(response.build()));
diff --git a/pulsar-common/src/main/proto/PulsarApi.proto
b/pulsar-common/src/main/proto/PulsarApi.proto
index 5c12ca3..1009fcc 100644
--- a/pulsar-common/src/main/proto/PulsarApi.proto
+++ b/pulsar-common/src/main/proto/PulsarApi.proto
@@ -662,6 +662,7 @@ message CommandGetLastMessageId {
message CommandGetLastMessageIdResponse {
required MessageIdData last_message_id = 1;
required uint64 request_id = 2;
+ optional MessageIdData consumer_mark_delete_position = 3;
}
message CommandGetTopicsOfNamespace {