This is an automated email from the ASF dual-hosted git repository.

yong pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit a1ef8a8d0e071c4556abf403a22f7a64376fee69
Author: Boyang Jerry Peng <[email protected]>
AuthorDate: Sat Feb 20 02:40:13 2021 -0800

    Fixed hasMessageAvailable() with empty topic (#9652)
    
    * Fixed hasMessageAvailable() with empty topic
    
    * Fixed test
    
    * adding additional fixes
    
    * fix comment
    
    Co-authored-by: Matteo Merli <[email protected]>
    Co-authored-by: Jerry Peng <[email protected]>
    (cherry picked from commit e017b108fc9f1fea085465ba1bbbbb625e6370f6)
---
 .../apache/pulsar/broker/service/ServerCnx.java    | 53 ++++++++----
 .../pulsar/client/api/ProducerConsumerBase.java    |  6 ++
 .../apache/pulsar/client/api/TopicReaderTest.java  | 67 +++++++++++++--
 .../org/apache/pulsar/client/impl/ClientCnx.java   |  7 +-
 .../apache/pulsar/client/impl/ConsumerImpl.java    | 99 +++++++++++++++-------
 .../apache/pulsar/common/api/proto/PulsarApi.java  | 96 +++++++++++++++++++++
 .../apache/pulsar/common/protocol/Commands.java    |  5 +-
 pulsar-common/src/main/proto/PulsarApi.proto       |  1 +
 8 files changed, 280 insertions(+), 54 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index bb169b8..6f297f2 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -42,6 +42,7 @@ import java.net.SocketAddress;
 import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Semaphore;
@@ -72,6 +73,7 @@ import 
org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataExc
 import 
org.apache.pulsar.broker.service.BrokerServiceException.ServiceUnitNotReadyException;
 import 
org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionNotFoundException;
 import 
org.apache.pulsar.broker.service.BrokerServiceException.TopicNotFoundException;
+import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
 import 
org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
@@ -1509,12 +1511,19 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
             long requestId = getLastMessageId.getRequestId();
 
             Topic topic = consumer.getSubscription().getTopic();
-            Position position = topic.getLastPosition();
+            Position lastPosition = topic.getLastPosition();
             int partitionIndex = TopicName.getPartitionIndex(topic.getName());
 
+            Position markDeletePosition = null;
+            if (consumer.getSubscription() instanceof PersistentSubscription) {
+                markDeletePosition = ((PersistentSubscription) 
consumer.getSubscription()).getCursor()
+                        .getMarkDeletedPosition();
+            }
+
             getLargestBatchIndexWhenPossible(
                     topic,
-                    (PositionImpl) position,
+                    (PositionImpl) lastPosition,
+                    (PositionImpl) markDeletePosition,
                     partitionIndex,
                     requestId,
                     consumer.getSubscription().getName());
@@ -1526,7 +1535,8 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
 
     private void getLargestBatchIndexWhenPossible(
             Topic topic,
-            PositionImpl position,
+            PositionImpl lastPosition,
+            PositionImpl markDeletePosition,
             int partitionIndex,
             long requestId,
             String subscriptionName) {
@@ -1535,19 +1545,26 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
         ManagedLedgerImpl ml = (ManagedLedgerImpl) 
persistentTopic.getManagedLedger();
 
         // If it's not pointing to a valid entry, respond messageId of the 
current position.
-        if (position.getEntryId() == -1) {
+        if (lastPosition.getEntryId() == -1) {
             MessageIdData messageId = MessageIdData.newBuilder()
-                    .setLedgerId(position.getLedgerId())
-                    .setEntryId(position.getEntryId())
-                    .setPartition(partitionIndex).build();
+                .setLedgerId(lastPosition.getLedgerId())
+                .setEntryId(lastPosition.getEntryId())
+                .setPartition(partitionIndex).build();
+            MessageIdData consumerMarkDeletePosition = null;
+            if (markDeletePosition != null) {
+                consumerMarkDeletePosition = MessageIdData.newBuilder()
+                    .setLedgerId(markDeletePosition.getLedgerId())
+                    .setEntryId(markDeletePosition.getEntryId()).build();
+            }
 
-            ctx.writeAndFlush(Commands.newGetLastMessageIdResponse(requestId, 
messageId));
+            ctx.writeAndFlush(Commands.newGetLastMessageIdResponse(requestId, 
messageId,
+                Optional.ofNullable(consumerMarkDeletePosition)));
             return;
         }
 
         // For a valid position, we read the entry out and parse the batch 
size from its metadata.
         CompletableFuture<Entry> entryFuture = new CompletableFuture<>();
-        ml.asyncReadEntry(position, new AsyncCallbacks.ReadEntryCallback() {
+        ml.asyncReadEntry(lastPosition, new AsyncCallbacks.ReadEntryCallback() 
{
             @Override
             public void readEntryComplete(Entry entry, Object ctx) {
                 entryFuture.complete(entry);
@@ -1575,16 +1592,22 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
 
                 if (log.isDebugEnabled()) {
                     log.debug("[{}] [{}][{}] Get LastMessageId {} 
partitionIndex {}", remoteAddress,
-                            topic.getName(), subscriptionName, position, 
partitionIndex);
+                            topic.getName(), subscriptionName, lastPosition, 
partitionIndex);
                 }
 
                 MessageIdData messageId = MessageIdData.newBuilder()
-                        .setLedgerId(position.getLedgerId())
-                        .setEntryId(position.getEntryId())
-                        .setPartition(partitionIndex)
-                        .setBatchIndex(largestBatchIndex).build();
+                    .setLedgerId(lastPosition.getLedgerId())
+                    .setEntryId(lastPosition.getEntryId())
+                    .setPartition(partitionIndex).setBatchIndex(largestBatchIndex).build(); // partition and batch index are separate fields
+                MessageIdData consumerMarkDeletePosition = null;
+                if (markDeletePosition != null) {
+                    consumerMarkDeletePosition = MessageIdData.newBuilder()
+                        .setLedgerId(markDeletePosition.getLedgerId())
+                        .setEntryId(markDeletePosition.getEntryId()).build();
+                }
 
-                
ctx.writeAndFlush(Commands.newGetLastMessageIdResponse(requestId, messageId));
+                
ctx.writeAndFlush(Commands.newGetLastMessageIdResponse(requestId, messageId,
+                    Optional.ofNullable(consumerMarkDeletePosition)));
             }
         });
     }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProducerConsumerBase.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProducerConsumerBase.java
index 6e85b08..9d08653 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProducerConsumerBase.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProducerConsumerBase.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.client.api;
 import com.google.common.collect.Sets;
 
 import java.lang.reflect.Method;
+import java.util.Random;
 import java.util.Set;
 
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
@@ -61,4 +62,9 @@ public abstract class ProducerConsumerBase extends 
MockedPulsarServiceBaseTest {
         Assert.assertTrue(messagesReceived.add(receivedMessage), "Received 
duplicate message " + receivedMessage);
     }
 
+    private static final Random random = new Random();
+    /** Returns a topic name under my-property/my-ns ending in a random hex suffix. */
+    protected String newTopicName() {
+        return "my-property/my-ns/topic-" + 
Long.toHexString(random.nextLong());
+    }
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java
index ebc2889..7a55d2f 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java
@@ -35,6 +35,8 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
+import lombok.Cleanup;
+
 import org.apache.bookkeeper.common.concurrent.FutureUtils;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.client.impl.BatchMessageIdImpl;
@@ -992,14 +994,14 @@ public class TopicReaderTest extends ProducerConsumerBase 
{
                 .topic(topicName).create();
 
         //For batch-messages with single message, the type of client messageId 
should be the same as that of broker
-        MessageId messageId = producer.send("msg".getBytes());
+        MessageIdImpl messageId = (MessageIdImpl) 
producer.send("msg".getBytes());
         assertTrue(messageId instanceof MessageIdImpl);
         ReaderImpl<byte[]> reader = 
(ReaderImpl<byte[]>)pulsarClient.newReader().topic(topicName)
                 .startMessageId(messageId).startMessageIdInclusive().create();
-        MessageId lastMsgId = reader.getConsumer().getLastMessageId();
-        assertTrue(lastMsgId instanceof BatchMessageIdImpl);
+        MessageIdImpl lastMsgId = (MessageIdImpl) 
reader.getConsumer().getLastMessageId();
         assertTrue(messageId instanceof BatchMessageIdImpl);
-        assertEquals(lastMsgId, messageId);
+        assertEquals(lastMsgId.getLedgerId(), messageId.getLedgerId());
+        assertEquals(lastMsgId.getEntryId(), messageId.getEntryId());
         reader.close();
 
         CountDownLatch latch = new CountDownLatch(numOfMessage);
@@ -1037,7 +1039,7 @@ public class TopicReaderTest extends ProducerConsumerBase 
{
         //For non-batch message, the type of client messageId should be the 
same as that of broker
         producer = pulsarClient.newProducer()
                 .enableBatching(false).topic(topicName).create();
-        messageId = producer.send("non-batch".getBytes());
+        messageId = (MessageIdImpl) producer.send("non-batch".getBytes());
         assertFalse(messageId instanceof BatchMessageIdImpl);
         assertTrue(messageId instanceof MessageIdImpl);
         reader = (ReaderImpl<byte[]>) pulsarClient.newReader().topic(topicName)
@@ -1467,4 +1469,59 @@ public class TopicReaderTest extends 
ProducerConsumerBase {
 
         producer.close();
     }
+
+    @Test
+    public void testHasMessageAvailableOnEmptyTopic() throws Exception {
+        String topic = newTopicName();
+        // All three readers below are created before anything is published to the topic.
+        @Cleanup
+        Reader<String> r1 = pulsarClient.newReader(Schema.STRING)
+                .topic(topic)
+                .startMessageId(MessageId.earliest)
+                .create();
+
+        @Cleanup
+        Reader<String> r2 = pulsarClient.newReader(Schema.STRING)
+                .topic(topic)
+                .startMessageId(MessageId.latest)
+                .create();
+
+        @Cleanup
+        Reader<String> r2Inclusive = pulsarClient.newReader(Schema.STRING)
+                .topic(topic)
+                .startMessageId(MessageId.latest)
+                .startMessageIdInclusive()
+                .create();
+
+        // Nothing has been published yet, so every reader must report no message available.
+        assertFalse(r1.hasMessageAvailable());
+        assertFalse(r2.hasMessageAvailable());
+        assertFalse(r2Inclusive.hasMessageAvailable());
+
+        @Cleanup
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .topic(topic)
+                .create();
+        // After the first publish, the three readers created earlier must all see a message.
+        producer.send("hello-1");
+        assertTrue(r1.hasMessageAvailable());
+        assertTrue(r2.hasMessageAvailable());
+        assertTrue(r2Inclusive.hasMessageAvailable());
+
+        @Cleanup
+        Reader<String> r3 = pulsarClient.newReader(Schema.STRING)
+                .topic(topic)
+                .startMessageId(MessageId.latest)
+                .create();
+
+        // r3 starts at latest after hello-1 was published, so it has nothing to read yet.
+        assertFalse(r3.hasMessageAvailable());
+
+        producer.send("hello-2");
+
+        assertTrue(r1.hasMessageAvailable());
+        assertTrue(r2.hasMessageAvailable());
+        assertTrue(r2Inclusive.hasMessageAvailable());
+        assertTrue(r3.hasMessageAvailable());
+    }
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
index c551588..3a77147 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
@@ -455,9 +455,10 @@ public class ClientCnx extends PulsarHandler {
             log.debug("{} Received success GetLastMessageId response from 
server: {}", ctx.channel(), success.getRequestId());
         }
         long requestId = success.getRequestId();
-        CompletableFuture<MessageIdData> requestFuture = 
(CompletableFuture<MessageIdData>) pendingRequests.remove(requestId);
+        CompletableFuture<CommandGetLastMessageIdResponse> requestFuture =
+                (CompletableFuture<CommandGetLastMessageIdResponse>) 
pendingRequests.remove(requestId);
         if (requestFuture != null) {
-            requestFuture.complete(success.getLastMessageId());
+            requestFuture.complete(success);
         } else {
             log.warn("{} Received unknown request id from server: {}", 
ctx.channel(), success.getRequestId());
         }
@@ -802,7 +803,7 @@ public class ClientCnx extends PulsarHandler {
         return future;
     }
 
-    public CompletableFuture<MessageIdData> sendGetLastMessageId(ByteBuf 
request, long requestId) {
+    public CompletableFuture<CommandGetLastMessageIdResponse> 
sendGetLastMessageId(ByteBuf request, long requestId) {
         return sendRequestAndHandleTimeout(request, requestId, 
RequestType.GetLastMessageId);
     }
 
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index ef39bfe..391f45d 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -24,6 +24,7 @@ import static 
org.apache.pulsar.common.protocol.Commands.hasChecksum;
 import static org.apache.pulsar.common.protocol.Commands.readChecksum;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ComparisonChain;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Queues;
 import io.netty.buffer.ByteBuf;
@@ -2019,25 +2020,42 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
         if (lastDequeuedMessageId == MessageId.earliest) {
             // if we are starting from latest, we should seek to the actual 
last message first.
             // allow the last one to be read when read head inclusively.
-            if (startMessageId.getLedgerId() == Long.MAX_VALUE &&
-                    startMessageId.getEntryId() == Long.MAX_VALUE &&
-                    startMessageId.partitionIndex == -1) {
-
-                getLastMessageIdAsync()
-                        .thenCompose((msgId) -> 
seekAsync(msgId).thenApply((ignore) -> msgId))
-                        .whenComplete((msgId, e) -> {
-                            if (e != null) {
-                                log.error("[{}][{}] Failed getLastMessageId 
command", topic, subscription);
-                                
booleanFuture.completeExceptionally(e.getCause());
-                                return;
-                            }
-                            MessageIdImpl messageId = 
MessageIdImpl.convertToMessageIdImpl(msgId);
-                            if (messageId == null || messageId.getEntryId() < 
0) {
-                                booleanFuture.complete(false);
-                            } else {
-                                booleanFuture.complete(resetIncludeHead);
-                            }
-                        });
+            if (startMessageId.equals(MessageId.latest)) {
+
+                CompletableFuture<GetLastMessageIdResponse> future = 
internalGetLastMessageIdAsync();
+                // if the consumer is configured to read inclusive then we 
need to seek to the last message
+                if (resetIncludeHead) {
+                    future = future.thenCompose((lastMessageIdResponse) ->
+                            seekAsync(lastMessageIdResponse.lastMessageId)
+                                    .thenApply((ignore) -> 
lastMessageIdResponse));
+                }
+
+                future.thenAccept(response -> {
+                    MessageIdImpl lastMessageId = 
MessageIdImpl.convertToMessageIdImpl(response.lastMessageId);
+                    MessageIdImpl markDeletePosition = MessageIdImpl
+                            
.convertToMessageIdImpl(response.markDeletePosition);
+
+                    if (markDeletePosition != null) {
+                        // we only care about comparing ledger ids and entry 
ids as mark delete position doesn't have other ids such as batch index
+                        int result = ComparisonChain.start()
+                                .compare(markDeletePosition.getLedgerId(), 
lastMessageId.getLedgerId())
+                                .compare(markDeletePosition.getEntryId(), 
lastMessageId.getEntryId())
+                                .result();
+                        if (lastMessageId.getEntryId() < 0) {
+                            booleanFuture.complete(false);
+                        } else {
+                            booleanFuture.complete(resetIncludeHead ? result 
<= 0 : result < 0);
+                        }
+                    } else if (lastMessageId == null || 
lastMessageId.getEntryId() < 0) {
+                        booleanFuture.complete(false);
+                    } else {
+                        booleanFuture.complete(resetIncludeHead);
+                    }
+                }).exceptionally(ex -> {
+                    log.error("[{}][{}] Failed getLastMessageId command", 
topic, subscription, ex);
+                    booleanFuture.completeExceptionally(ex.getCause());
+                    return null;
+                });
 
                 return booleanFuture;
             }
@@ -2098,8 +2116,22 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
         return false;
     }
 
+    private static final class GetLastMessageIdResponse {
+        final MessageId lastMessageId;
+        final MessageId markDeletePosition; // null when the response carried no consumer mark-delete position
+        // Plain value holder: the constructor only stores the two ids it is given.
+        GetLastMessageIdResponse(MessageId lastMessageId, MessageId 
markDeletePosition) {
+            this.lastMessageId = lastMessageId;
+            this.markDeletePosition = markDeletePosition;
+        }
+    }
+
     @Override
     public CompletableFuture<MessageId> getLastMessageIdAsync() {
+        return internalGetLastMessageIdAsync().thenApply(r -> r.lastMessageId); // callers of this method get only the last message id
+    }
+
+    public CompletableFuture<GetLastMessageIdResponse> 
internalGetLastMessageIdAsync() {
         if (getState() == State.Closing || getState() == State.Closed) {
             return FutureUtil
                 .failedFuture(new PulsarClientException.AlreadyClosedException(
@@ -2114,7 +2146,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
                 .setMandatoryStop(0, TimeUnit.MILLISECONDS)
                 .create();
 
-        CompletableFuture<MessageId> getLastMessageIdFuture = new 
CompletableFuture<>();
+        CompletableFuture<GetLastMessageIdResponse> getLastMessageIdFuture = 
new CompletableFuture<>();
 
         internalGetLastMessageIdAsync(backoff, opTimeoutMs, 
getLastMessageIdFuture);
         return getLastMessageIdFuture;
@@ -2122,7 +2154,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
 
     private void internalGetLastMessageIdAsync(final Backoff backoff,
                                                final AtomicLong remainingTime,
-                                               CompletableFuture<MessageId> 
future) {
+                                               
CompletableFuture<GetLastMessageIdResponse> future) {
         ClientCnx cnx = cnx();
         if (isConnected() && cnx != null) {
             if 
(!Commands.peerSupportsGetLastMessageId(cnx.getRemoteEndpointProtocolVersion()))
 {
@@ -2137,16 +2169,23 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
             ByteBuf getLastIdCmd = Commands.newGetLastMessageId(consumerId, 
requestId);
             log.info("[{}][{}] Get topic last message Id", topic, 
subscription);
 
-            cnx.sendGetLastMessageId(getLastIdCmd, 
requestId).thenAccept((result) -> {
-                log.info("[{}][{}] Successfully getLastMessageId {}:{}",
-                    topic, subscription, result.getLedgerId(), 
result.getEntryId());
-                if (result.getBatchIndex() < 0) {
-                    future.complete(new MessageIdImpl(result.getLedgerId(),
-                            result.getEntryId(), result.getPartition()));
-                } else {
-                    future.complete(new 
BatchMessageIdImpl(result.getLedgerId(),
-                            result.getEntryId(), result.getPartition(), 
result.getBatchIndex()));
+            cnx.sendGetLastMessageId(getLastIdCmd, requestId).thenAccept(cmd 
-> {
+                MessageIdData lastMessageId = cmd.getLastMessageId();
+                MessageIdImpl markDeletePosition = null;
+                if (cmd.hasConsumerMarkDeletePosition()) {
+                    markDeletePosition = new 
MessageIdImpl(cmd.getConsumerMarkDeletePosition().getLedgerId(),
+                            cmd.getConsumerMarkDeletePosition().getEntryId(), 
-1);
                 }
+                log.info("[{}][{}] Successfully getLastMessageId {}:{}",
+                    topic, subscription, lastMessageId.getLedgerId(), 
lastMessageId.getEntryId());
+
+                MessageId lastMsgId = lastMessageId.getBatchIndex() <= 0 ?
+                        new MessageIdImpl(lastMessageId.getLedgerId(),
+                                lastMessageId.getEntryId(), 
lastMessageId.getPartition())
+                        : new BatchMessageIdImpl(lastMessageId.getLedgerId(),
+                                lastMessageId.getEntryId(), 
lastMessageId.getPartition(), lastMessageId.getBatchIndex());
+
+                future.complete(new GetLastMessageIdResponse(lastMsgId, 
markDeletePosition));
             }).exceptionally(e -> {
                 log.error("[{}][{}] Failed getLastMessageId command", topic, 
subscription);
                 future.completeExceptionally(
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java 
b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
index 9e1e9d3..317e3f1 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
@@ -28223,6 +28223,10 @@ public final class PulsarApi {
     // required uint64 request_id = 2;
     boolean hasRequestId();
     long getRequestId();
+    
+    // optional .pulsar.proto.MessageIdData consumer_mark_delete_position = 3;
+    boolean hasConsumerMarkDeletePosition();
+    org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData 
getConsumerMarkDeletePosition();
   }
   public static final class CommandGetLastMessageIdResponse extends
       org.apache.pulsar.shaded.com.google.protobuf.v241.GeneratedMessageLite
@@ -28279,9 +28283,20 @@ public final class PulsarApi {
       return requestId_;
     }
     
+    // optional .pulsar.proto.MessageIdData consumer_mark_delete_position = 3;
+    public static final int CONSUMER_MARK_DELETE_POSITION_FIELD_NUMBER = 3;
+    private org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData 
consumerMarkDeletePosition_;
+    public boolean hasConsumerMarkDeletePosition() {
+      return ((bitField0_ & 0x00000004) == 0x00000004);
+    }
+    public org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData 
getConsumerMarkDeletePosition() {
+      return consumerMarkDeletePosition_;
+    }
+    
     private void initFields() {
       lastMessageId_ = 
org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData.getDefaultInstance();
       requestId_ = 0L;
+      consumerMarkDeletePosition_ = 
org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData.getDefaultInstance();
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -28300,6 +28315,12 @@ public final class PulsarApi {
         memoizedIsInitialized = 0;
         return false;
       }
+      if (hasConsumerMarkDeletePosition()) {
+        if (!getConsumerMarkDeletePosition().isInitialized()) {
+          memoizedIsInitialized = 0;
+          return false;
+        }
+      }
       memoizedIsInitialized = 1;
       return true;
     }
@@ -28318,6 +28339,9 @@ public final class PulsarApi {
       if (((bitField0_ & 0x00000002) == 0x00000002)) {
         output.writeUInt64(2, requestId_);
       }
+      if (((bitField0_ & 0x00000004) == 0x00000004)) {
+        output.writeMessage(3, consumerMarkDeletePosition_);
+      }
     }
     
     private int memoizedSerializedSize = -1;
@@ -28334,6 +28358,10 @@ public final class PulsarApi {
         size += 
org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
           .computeUInt64Size(2, requestId_);
       }
+      if (((bitField0_ & 0x00000004) == 0x00000004)) {
+        size += 
org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
+          .computeMessageSize(3, consumerMarkDeletePosition_);
+      }
       memoizedSerializedSize = size;
       return size;
     }
@@ -28451,6 +28479,8 @@ public final class PulsarApi {
         bitField0_ = (bitField0_ & ~0x00000001);
         requestId_ = 0L;
         bitField0_ = (bitField0_ & ~0x00000002);
+        consumerMarkDeletePosition_ = 
org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData.getDefaultInstance();
+        bitField0_ = (bitField0_ & ~0x00000004);
         return this;
       }
       
@@ -28492,6 +28522,10 @@ public final class PulsarApi {
           to_bitField0_ |= 0x00000002;
         }
         result.requestId_ = requestId_;
+        if (((from_bitField0_ & 0x00000004) == 0x00000004)) {
+          to_bitField0_ |= 0x00000004;
+        }
+        result.consumerMarkDeletePosition_ = consumerMarkDeletePosition_;
         result.bitField0_ = to_bitField0_;
         return result;
       }
@@ -28504,6 +28538,9 @@ public final class PulsarApi {
         if (other.hasRequestId()) {
           setRequestId(other.getRequestId());
         }
+        if (other.hasConsumerMarkDeletePosition()) {
+          
mergeConsumerMarkDeletePosition(other.getConsumerMarkDeletePosition());
+        }
         return this;
       }
       
@@ -28520,6 +28557,12 @@ public final class PulsarApi {
           
           return false;
         }
+        if (hasConsumerMarkDeletePosition()) {
+          if (!getConsumerMarkDeletePosition().isInitialized()) {
+            
+            return false;
+          }
+        }
         return true;
       }
       
@@ -28560,6 +28603,16 @@ public final class PulsarApi {
               requestId_ = input.readUInt64();
               break;
             }
+            case 26: {
+              
org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData.Builder subBuilder = 
org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData.newBuilder();
+              if (hasConsumerMarkDeletePosition()) {
+                subBuilder.mergeFrom(getConsumerMarkDeletePosition());
+              }
+              input.readMessage(subBuilder, extensionRegistry);
+              setConsumerMarkDeletePosition(subBuilder.buildPartial());
+              subBuilder.recycle();
+              break;
+            }
           }
         }
       }
@@ -28630,6 +28683,49 @@ public final class PulsarApi {
         return this;
       }
       
+      // optional .pulsar.proto.MessageIdData consumer_mark_delete_position = 
3;
+      private org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData 
consumerMarkDeletePosition_ = 
org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData.getDefaultInstance();
+      public boolean hasConsumerMarkDeletePosition() {
+        return ((bitField0_ & 0x00000004) == 0x00000004);
+      }
+      public org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData 
getConsumerMarkDeletePosition() {
+        return consumerMarkDeletePosition_;
+      }
+      public Builder 
setConsumerMarkDeletePosition(org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData
 value) {
+        if (value == null) {
+          throw new NullPointerException();
+        }
+        consumerMarkDeletePosition_ = value;
+        
+        bitField0_ |= 0x00000004;
+        return this;
+      }
+      public Builder setConsumerMarkDeletePosition(
+          org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData.Builder 
builderForValue) {
+        consumerMarkDeletePosition_ = builderForValue.build();
+        
+        bitField0_ |= 0x00000004;
+        return this;
+      }
+      public Builder 
mergeConsumerMarkDeletePosition(org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData
 value) {
+        if (((bitField0_ & 0x00000004) == 0x00000004) &&
+            consumerMarkDeletePosition_ != 
org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData.getDefaultInstance())
 {
+          consumerMarkDeletePosition_ =
+            
org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData.newBuilder(consumerMarkDeletePosition_).mergeFrom(value).buildPartial();
+        } else {
+          consumerMarkDeletePosition_ = value;
+        }
+        
+        bitField0_ |= 0x00000004;
+        return this;
+      }
+      public Builder clearConsumerMarkDeletePosition() {
+        consumerMarkDeletePosition_ = 
org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData.getDefaultInstance();
+        
+        bitField0_ = (bitField0_ & ~0x00000004);
+        return this;
+      }
+      
       // 
@@protoc_insertion_point(builder_scope:pulsar.proto.CommandGetLastMessageIdResponse)
     }
     
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java 
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
index a2df6d1..5da02d4 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
@@ -1367,12 +1367,15 @@ public class Commands {
         return res;
     }
 
-    public static ByteBuf newGetLastMessageIdResponse(long requestId, 
MessageIdData messageIdData) {
+    public static ByteBuf newGetLastMessageIdResponse(long requestId, 
MessageIdData messageIdData,
+                                                      Optional<MessageIdData> 
consumerMarkDeletePosition) {
         PulsarApi.CommandGetLastMessageIdResponse.Builder response =
             PulsarApi.CommandGetLastMessageIdResponse.newBuilder()
             .setLastMessageId(messageIdData)
             .setRequestId(requestId);
 
+        consumerMarkDeletePosition.ifPresent(position -> 
response.setConsumerMarkDeletePosition(position));
+
         ByteBuf res = serializeWithSize(BaseCommand.newBuilder()
             .setType(Type.GET_LAST_MESSAGE_ID_RESPONSE)
             .setGetLastMessageIdResponse(response.build()));
diff --git a/pulsar-common/src/main/proto/PulsarApi.proto 
b/pulsar-common/src/main/proto/PulsarApi.proto
index 5c12ca3..1009fcc 100644
--- a/pulsar-common/src/main/proto/PulsarApi.proto
+++ b/pulsar-common/src/main/proto/PulsarApi.proto
@@ -662,6 +662,7 @@ message CommandGetLastMessageId {
 message CommandGetLastMessageIdResponse {
     required MessageIdData last_message_id = 1;
     required uint64 request_id  = 2;
+    optional MessageIdData consumer_mark_delete_position = 3;
 }
 
 message CommandGetTopicsOfNamespace {

Reply via email to