This is an automated email from the ASF dual-hosted git repository. yong pushed a commit to branch branch-2.7 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit a1ef8a8d0e071c4556abf403a22f7a64376fee69 Author: Boyang Jerry Peng <[email protected]> AuthorDate: Sat Feb 20 02:40:13 2021 -0800 Fixed hasMessageAvailable() with empty topic (#9652) * Fixed hasMessageAvailable() with empty topic * Fixed test * adding additional fixes * fix comment Co-authored-by: Matteo Merli <[email protected]> Co-authored-by: Jerry Peng <[email protected]> (cherry picked from commit e017b108fc9f1fea085465ba1bbbbb625e6370f6) --- .../apache/pulsar/broker/service/ServerCnx.java | 53 ++++++++---- .../pulsar/client/api/ProducerConsumerBase.java | 6 ++ .../apache/pulsar/client/api/TopicReaderTest.java | 67 +++++++++++++-- .../org/apache/pulsar/client/impl/ClientCnx.java | 7 +- .../apache/pulsar/client/impl/ConsumerImpl.java | 99 +++++++++++++++------- .../apache/pulsar/common/api/proto/PulsarApi.java | 96 +++++++++++++++++++++ .../apache/pulsar/common/protocol/Commands.java | 5 +- pulsar-common/src/main/proto/PulsarApi.proto | 1 + 8 files changed, 280 insertions(+), 54 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index bb169b8..6f297f2 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -42,6 +42,7 @@ import java.net.SocketAddress; import java.util.List; import java.util.Map; import java.util.NoSuchElementException; +import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Semaphore; @@ -72,6 +73,7 @@ import org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataExc import org.apache.pulsar.broker.service.BrokerServiceException.ServiceUnitNotReadyException; import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionNotFoundException; import 
org.apache.pulsar.broker.service.BrokerServiceException.TopicNotFoundException; +import org.apache.pulsar.broker.service.persistent.PersistentSubscription; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.broker.service.schema.SchemaRegistryService; import org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException; @@ -1509,12 +1511,19 @@ public class ServerCnx extends PulsarHandler implements TransportCnx { long requestId = getLastMessageId.getRequestId(); Topic topic = consumer.getSubscription().getTopic(); - Position position = topic.getLastPosition(); + Position lastPosition = topic.getLastPosition(); int partitionIndex = TopicName.getPartitionIndex(topic.getName()); + Position markDeletePosition = null; + if (consumer.getSubscription() instanceof PersistentSubscription) { + markDeletePosition = ((PersistentSubscription) consumer.getSubscription()).getCursor() + .getMarkDeletedPosition(); + } + getLargestBatchIndexWhenPossible( topic, - (PositionImpl) position, + (PositionImpl) lastPosition, + (PositionImpl) markDeletePosition, partitionIndex, requestId, consumer.getSubscription().getName()); @@ -1526,7 +1535,8 @@ public class ServerCnx extends PulsarHandler implements TransportCnx { private void getLargestBatchIndexWhenPossible( Topic topic, - PositionImpl position, + PositionImpl lastPosition, + PositionImpl markDeletePosition, int partitionIndex, long requestId, String subscriptionName) { @@ -1535,19 +1545,26 @@ public class ServerCnx extends PulsarHandler implements TransportCnx { ManagedLedgerImpl ml = (ManagedLedgerImpl) persistentTopic.getManagedLedger(); // If it's not pointing to a valid entry, respond messageId of the current position. 
- if (position.getEntryId() == -1) { + if (lastPosition.getEntryId() == -1) { MessageIdData messageId = MessageIdData.newBuilder() - .setLedgerId(position.getLedgerId()) - .setEntryId(position.getEntryId()) - .setPartition(partitionIndex).build(); + .setLedgerId(lastPosition.getLedgerId()) + .setEntryId(lastPosition.getEntryId()) + .setPartition(partitionIndex).build(); + MessageIdData consumerMarkDeletePosition = null; + if (markDeletePosition != null) { + consumerMarkDeletePosition = MessageIdData.newBuilder() + .setLedgerId(markDeletePosition.getLedgerId()) + .setEntryId(markDeletePosition.getEntryId()).build(); + } - ctx.writeAndFlush(Commands.newGetLastMessageIdResponse(requestId, messageId)); + ctx.writeAndFlush(Commands.newGetLastMessageIdResponse(requestId, messageId, + Optional.ofNullable(consumerMarkDeletePosition))); return; } // For a valid position, we read the entry out and parse the batch size from its metadata. CompletableFuture<Entry> entryFuture = new CompletableFuture<>(); - ml.asyncReadEntry(position, new AsyncCallbacks.ReadEntryCallback() { + ml.asyncReadEntry(lastPosition, new AsyncCallbacks.ReadEntryCallback() { @Override public void readEntryComplete(Entry entry, Object ctx) { entryFuture.complete(entry); @@ -1575,16 +1592,22 @@ public class ServerCnx extends PulsarHandler implements TransportCnx { if (log.isDebugEnabled()) { log.debug("[{}] [{}][{}] Get LastMessageId {} partitionIndex {}", remoteAddress, - topic.getName(), subscriptionName, position, partitionIndex); + topic.getName(), subscriptionName, lastPosition, partitionIndex); } MessageIdData messageId = MessageIdData.newBuilder() - .setLedgerId(position.getLedgerId()) - .setEntryId(position.getEntryId()) - .setPartition(partitionIndex) - .setBatchIndex(largestBatchIndex).build(); + .setLedgerId(lastPosition.getLedgerId()) + .setEntryId(lastPosition.getEntryId()) + .setPartition(partitionIndex).setBatchIndex(largestBatchIndex).build(); + MessageIdData consumerMarkDeletePosition = null; + if (markDeletePosition != 
null) { + consumerMarkDeletePosition = MessageIdData.newBuilder() + .setLedgerId(markDeletePosition.getLedgerId()) + .setEntryId(markDeletePosition.getEntryId()).build(); + } - ctx.writeAndFlush(Commands.newGetLastMessageIdResponse(requestId, messageId)); + ctx.writeAndFlush(Commands.newGetLastMessageIdResponse(requestId, messageId, + Optional.ofNullable(consumerMarkDeletePosition))); } }); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProducerConsumerBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProducerConsumerBase.java index 6e85b08..9d08653 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProducerConsumerBase.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProducerConsumerBase.java @@ -21,6 +21,7 @@ package org.apache.pulsar.client.api; import com.google.common.collect.Sets; import java.lang.reflect.Method; +import java.util.Random; import java.util.Set; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; @@ -61,4 +62,9 @@ public abstract class ProducerConsumerBase extends MockedPulsarServiceBaseTest { Assert.assertTrue(messagesReceived.add(receivedMessage), "Received duplicate message " + receivedMessage); } + private static final Random random = new Random(); + + protected String newTopicName() { + return "my-property/my-ns/topic-" + Long.toHexString(random.nextLong()); + } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java index ebc2889..7a55d2f 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java @@ -35,6 +35,8 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; +import lombok.Cleanup; + import 
org.apache.bookkeeper.common.concurrent.FutureUtils; import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.client.impl.BatchMessageIdImpl; @@ -992,14 +994,14 @@ public class TopicReaderTest extends ProducerConsumerBase { .topic(topicName).create(); //For batch-messages with single message, the type of client messageId should be the same as that of broker - MessageId messageId = producer.send("msg".getBytes()); + MessageIdImpl messageId = (MessageIdImpl) producer.send("msg".getBytes()); assertTrue(messageId instanceof MessageIdImpl); ReaderImpl<byte[]> reader = (ReaderImpl<byte[]>)pulsarClient.newReader().topic(topicName) .startMessageId(messageId).startMessageIdInclusive().create(); - MessageId lastMsgId = reader.getConsumer().getLastMessageId(); - assertTrue(lastMsgId instanceof BatchMessageIdImpl); + MessageIdImpl lastMsgId = (MessageIdImpl) reader.getConsumer().getLastMessageId(); assertTrue(messageId instanceof BatchMessageIdImpl); - assertEquals(lastMsgId, messageId); + assertEquals(lastMsgId.getLedgerId(), messageId.getLedgerId()); + assertEquals(lastMsgId.getEntryId(), messageId.getEntryId()); reader.close(); CountDownLatch latch = new CountDownLatch(numOfMessage); @@ -1037,7 +1039,7 @@ public class TopicReaderTest extends ProducerConsumerBase { //For non-batch message, the type of client messageId should be the same as that of broker producer = pulsarClient.newProducer() .enableBatching(false).topic(topicName).create(); - messageId = producer.send("non-batch".getBytes()); + messageId = (MessageIdImpl) producer.send("non-batch".getBytes()); assertFalse(messageId instanceof BatchMessageIdImpl); assertTrue(messageId instanceof MessageIdImpl); reader = (ReaderImpl<byte[]>) pulsarClient.newReader().topic(topicName) @@ -1467,4 +1469,59 @@ public class TopicReaderTest extends ProducerConsumerBase { producer.close(); } + + @Test + public void testHasMessageAvailableOnEmptyTopic() throws Exception { + String topic = newTopicName(); + + @Cleanup + 
Reader<String> r1 = pulsarClient.newReader(Schema.STRING) + .topic(topic) + .startMessageId(MessageId.earliest) + .create(); + + @Cleanup + Reader<String> r2 = pulsarClient.newReader(Schema.STRING) + .topic(topic) + .startMessageId(MessageId.latest) + .create(); + + @Cleanup + Reader<String> r2Inclusive = pulsarClient.newReader(Schema.STRING) + .topic(topic) + .startMessageId(MessageId.latest) + .startMessageIdInclusive() + .create(); + + // no data write, should return false + assertFalse(r1.hasMessageAvailable()); + assertFalse(r2.hasMessageAvailable()); + assertFalse(r2Inclusive.hasMessageAvailable()); + + @Cleanup + Producer<String> producer = pulsarClient.newProducer(Schema.STRING) + .topic(topic) + .create(); + + producer.send("hello-1"); + assertTrue(r1.hasMessageAvailable()); + assertTrue(r2.hasMessageAvailable()); + assertTrue(r2Inclusive.hasMessageAvailable()); + + @Cleanup + Reader<String> r3 = pulsarClient.newReader(Schema.STRING) + .topic(topic) + .startMessageId(MessageId.latest) + .create(); + + + assertFalse(r3.hasMessageAvailable()); + + producer.send("hello-2"); + + assertTrue(r1.hasMessageAvailable()); + assertTrue(r2.hasMessageAvailable()); + assertTrue(r2Inclusive.hasMessageAvailable()); + assertTrue(r3.hasMessageAvailable()); + } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java index c551588..3a77147 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java @@ -455,9 +455,10 @@ public class ClientCnx extends PulsarHandler { log.debug("{} Received success GetLastMessageId response from server: {}", ctx.channel(), success.getRequestId()); } long requestId = success.getRequestId(); - CompletableFuture<MessageIdData> requestFuture = (CompletableFuture<MessageIdData>) pendingRequests.remove(requestId); + 
CompletableFuture<CommandGetLastMessageIdResponse> requestFuture = + (CompletableFuture<CommandGetLastMessageIdResponse>) pendingRequests.remove(requestId); if (requestFuture != null) { - requestFuture.complete(success.getLastMessageId()); + requestFuture.complete(success); } else { log.warn("{} Received unknown request id from server: {}", ctx.channel(), success.getRequestId()); } @@ -802,7 +803,7 @@ public class ClientCnx extends PulsarHandler { return future; } - public CompletableFuture<MessageIdData> sendGetLastMessageId(ByteBuf request, long requestId) { + public CompletableFuture<CommandGetLastMessageIdResponse> sendGetLastMessageId(ByteBuf request, long requestId) { return sendRequestAndHandleTimeout(request, requestId, RequestType.GetLastMessageId); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index ef39bfe..391f45d 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -24,6 +24,7 @@ import static org.apache.pulsar.common.protocol.Commands.hasChecksum; import static org.apache.pulsar.common.protocol.Commands.readChecksum; import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ComparisonChain; import com.google.common.collect.Iterables; import com.google.common.collect.Queues; import io.netty.buffer.ByteBuf; @@ -2019,25 +2020,42 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle if (lastDequeuedMessageId == MessageId.earliest) { // if we are starting from latest, we should seek to the actual last message first. // allow the last one to be read when read head inclusively. 
- if (startMessageId.getLedgerId() == Long.MAX_VALUE && - startMessageId.getEntryId() == Long.MAX_VALUE && - startMessageId.partitionIndex == -1) { - - getLastMessageIdAsync() - .thenCompose((msgId) -> seekAsync(msgId).thenApply((ignore) -> msgId)) - .whenComplete((msgId, e) -> { - if (e != null) { - log.error("[{}][{}] Failed getLastMessageId command", topic, subscription); - booleanFuture.completeExceptionally(e.getCause()); - return; - } - MessageIdImpl messageId = MessageIdImpl.convertToMessageIdImpl(msgId); - if (messageId == null || messageId.getEntryId() < 0) { - booleanFuture.complete(false); - } else { - booleanFuture.complete(resetIncludeHead); - } - }); + if (startMessageId.equals(MessageId.latest)) { + + CompletableFuture<GetLastMessageIdResponse> future = internalGetLastMessageIdAsync(); + // if the consumer is configured to read inclusive then we need to seek to the last message + if (resetIncludeHead) { + future = future.thenCompose((lastMessageIdResponse) -> + seekAsync(lastMessageIdResponse.lastMessageId) + .thenApply((ignore) -> lastMessageIdResponse)); + } + + future.thenAccept(response -> { + MessageIdImpl lastMessageId = MessageIdImpl.convertToMessageIdImpl(response.lastMessageId); + MessageIdImpl markDeletePosition = MessageIdImpl + .convertToMessageIdImpl(response.markDeletePosition); + + if (markDeletePosition != null) { + // we only care about comparing ledger ids and entry ids as mark delete position doesn't have other ids such as batch index + int result = ComparisonChain.start() + .compare(markDeletePosition.getLedgerId(), lastMessageId.getLedgerId()) + .compare(markDeletePosition.getEntryId(), lastMessageId.getEntryId()) + .result(); + if (lastMessageId.getEntryId() < 0) { + booleanFuture.complete(false); + } else { + booleanFuture.complete(resetIncludeHead ? 
result <= 0 : result < 0); + } + } else if (lastMessageId == null || lastMessageId.getEntryId() < 0) { + booleanFuture.complete(false); + } else { + booleanFuture.complete(resetIncludeHead); + } + }).exceptionally(ex -> { + log.error("[{}][{}] Failed getLastMessageId command", topic, subscription, ex); + booleanFuture.completeExceptionally(ex.getCause()); + return null; + }); return booleanFuture; } @@ -2098,8 +2116,22 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle return false; } + private static final class GetLastMessageIdResponse { + final MessageId lastMessageId; + final MessageId markDeletePosition; + + GetLastMessageIdResponse(MessageId lastMessageId, MessageId markDeletePosition) { + this.lastMessageId = lastMessageId; + this.markDeletePosition = markDeletePosition; + } + } + @Override public CompletableFuture<MessageId> getLastMessageIdAsync() { + return internalGetLastMessageIdAsync().thenApply(r -> r.lastMessageId); + } + + public CompletableFuture<GetLastMessageIdResponse> internalGetLastMessageIdAsync() { if (getState() == State.Closing || getState() == State.Closed) { return FutureUtil .failedFuture(new PulsarClientException.AlreadyClosedException( @@ -2114,7 +2146,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle .setMandatoryStop(0, TimeUnit.MILLISECONDS) .create(); - CompletableFuture<MessageId> getLastMessageIdFuture = new CompletableFuture<>(); + CompletableFuture<GetLastMessageIdResponse> getLastMessageIdFuture = new CompletableFuture<>(); internalGetLastMessageIdAsync(backoff, opTimeoutMs, getLastMessageIdFuture); return getLastMessageIdFuture; @@ -2122,7 +2154,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle private void internalGetLastMessageIdAsync(final Backoff backoff, final AtomicLong remainingTime, - CompletableFuture<MessageId> future) { + CompletableFuture<GetLastMessageIdResponse> future) { ClientCnx cnx = cnx(); if 
(isConnected() && cnx != null) { if (!Commands.peerSupportsGetLastMessageId(cnx.getRemoteEndpointProtocolVersion())) { @@ -2137,16 +2169,23 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle ByteBuf getLastIdCmd = Commands.newGetLastMessageId(consumerId, requestId); log.info("[{}][{}] Get topic last message Id", topic, subscription); - cnx.sendGetLastMessageId(getLastIdCmd, requestId).thenAccept((result) -> { - log.info("[{}][{}] Successfully getLastMessageId {}:{}", - topic, subscription, result.getLedgerId(), result.getEntryId()); - if (result.getBatchIndex() < 0) { - future.complete(new MessageIdImpl(result.getLedgerId(), - result.getEntryId(), result.getPartition())); - } else { - future.complete(new BatchMessageIdImpl(result.getLedgerId(), - result.getEntryId(), result.getPartition(), result.getBatchIndex())); + cnx.sendGetLastMessageId(getLastIdCmd, requestId).thenAccept(cmd -> { + MessageIdData lastMessageId = cmd.getLastMessageId(); + MessageIdImpl markDeletePosition = null; + if (cmd.hasConsumerMarkDeletePosition()) { + markDeletePosition = new MessageIdImpl(cmd.getConsumerMarkDeletePosition().getLedgerId(), + cmd.getConsumerMarkDeletePosition().getEntryId(), -1); } + log.info("[{}][{}] Successfully getLastMessageId {}:{}", + topic, subscription, lastMessageId.getLedgerId(), lastMessageId.getEntryId()); + + MessageId lastMsgId = lastMessageId.getBatchIndex() <= 0 ? 
+ new MessageIdImpl(lastMessageId.getLedgerId(), + lastMessageId.getEntryId(), lastMessageId.getPartition()) + : new BatchMessageIdImpl(lastMessageId.getLedgerId(), + lastMessageId.getEntryId(), lastMessageId.getPartition(), lastMessageId.getBatchIndex()); + + future.complete(new GetLastMessageIdResponse(lastMsgId, markDeletePosition)); }).exceptionally(e -> { log.error("[{}][{}] Failed getLastMessageId command", topic, subscription); future.completeExceptionally( diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java index 9e1e9d3..317e3f1 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java @@ -28223,6 +28223,10 @@ public final class PulsarApi { // required uint64 request_id = 2; boolean hasRequestId(); long getRequestId(); + + // optional .pulsar.proto.MessageIdData consumer_mark_delete_position = 3; + boolean hasConsumerMarkDeletePosition(); + org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData getConsumerMarkDeletePosition(); } public static final class CommandGetLastMessageIdResponse extends org.apache.pulsar.shaded.com.google.protobuf.v241.GeneratedMessageLite @@ -28279,9 +28283,20 @@ public final class PulsarApi { return requestId_; } + // optional .pulsar.proto.MessageIdData consumer_mark_delete_position = 3; + public static final int CONSUMER_MARK_DELETE_POSITION_FIELD_NUMBER = 3; + private org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData consumerMarkDeletePosition_; + public boolean hasConsumerMarkDeletePosition() { + return ((bitField0_ & 0x00000004) == 0x00000004); + } + public org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData getConsumerMarkDeletePosition() { + return consumerMarkDeletePosition_; + } + private void initFields() { lastMessageId_ = 
org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData.getDefaultInstance(); requestId_ = 0L; + consumerMarkDeletePosition_ = org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData.getDefaultInstance(); } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { @@ -28300,6 +28315,12 @@ public final class PulsarApi { memoizedIsInitialized = 0; return false; } + if (hasConsumerMarkDeletePosition()) { + if (!getConsumerMarkDeletePosition().isInitialized()) { + memoizedIsInitialized = 0; + return false; + } + } memoizedIsInitialized = 1; return true; } @@ -28318,6 +28339,9 @@ public final class PulsarApi { if (((bitField0_ & 0x00000002) == 0x00000002)) { output.writeUInt64(2, requestId_); } + if (((bitField0_ & 0x00000004) == 0x00000004)) { + output.writeMessage(3, consumerMarkDeletePosition_); + } } private int memoizedSerializedSize = -1; @@ -28334,6 +28358,10 @@ public final class PulsarApi { size += org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream .computeUInt64Size(2, requestId_); } + if (((bitField0_ & 0x00000004) == 0x00000004)) { + size += org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream + .computeMessageSize(3, consumerMarkDeletePosition_); + } memoizedSerializedSize = size; return size; } @@ -28451,6 +28479,8 @@ public final class PulsarApi { bitField0_ = (bitField0_ & ~0x00000001); requestId_ = 0L; bitField0_ = (bitField0_ & ~0x00000002); + consumerMarkDeletePosition_ = org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData.getDefaultInstance(); + bitField0_ = (bitField0_ & ~0x00000004); return this; } @@ -28492,6 +28522,10 @@ public final class PulsarApi { to_bitField0_ |= 0x00000002; } result.requestId_ = requestId_; + if (((from_bitField0_ & 0x00000004) == 0x00000004)) { + to_bitField0_ |= 0x00000004; + } + result.consumerMarkDeletePosition_ = consumerMarkDeletePosition_; result.bitField0_ = to_bitField0_; return result; } @@ -28504,6 +28538,9 @@ public final class PulsarApi { if 
(other.hasRequestId()) { setRequestId(other.getRequestId()); } + if (other.hasConsumerMarkDeletePosition()) { + mergeConsumerMarkDeletePosition(other.getConsumerMarkDeletePosition()); + } return this; } @@ -28520,6 +28557,12 @@ public final class PulsarApi { return false; } + if (hasConsumerMarkDeletePosition()) { + if (!getConsumerMarkDeletePosition().isInitialized()) { + + return false; + } + } return true; } @@ -28560,6 +28603,16 @@ public final class PulsarApi { requestId_ = input.readUInt64(); break; } + case 26: { + org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData.Builder subBuilder = org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData.newBuilder(); + if (hasConsumerMarkDeletePosition()) { + subBuilder.mergeFrom(getConsumerMarkDeletePosition()); + } + input.readMessage(subBuilder, extensionRegistry); + setConsumerMarkDeletePosition(subBuilder.buildPartial()); + subBuilder.recycle(); + break; + } } } } @@ -28630,6 +28683,49 @@ public final class PulsarApi { return this; } + // optional .pulsar.proto.MessageIdData consumer_mark_delete_position = 3; + private org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData consumerMarkDeletePosition_ = org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData.getDefaultInstance(); + public boolean hasConsumerMarkDeletePosition() { + return ((bitField0_ & 0x00000004) == 0x00000004); + } + public org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData getConsumerMarkDeletePosition() { + return consumerMarkDeletePosition_; + } + public Builder setConsumerMarkDeletePosition(org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData value) { + if (value == null) { + throw new NullPointerException(); + } + consumerMarkDeletePosition_ = value; + + bitField0_ |= 0x00000004; + return this; + } + public Builder setConsumerMarkDeletePosition( + org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData.Builder builderForValue) { + consumerMarkDeletePosition_ = builderForValue.build(); + + bitField0_ |= 
0x00000004; + return this; + } + public Builder mergeConsumerMarkDeletePosition(org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData value) { + if (((bitField0_ & 0x00000004) == 0x00000004) && + consumerMarkDeletePosition_ != org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData.getDefaultInstance()) { + consumerMarkDeletePosition_ = + org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData.newBuilder(consumerMarkDeletePosition_).mergeFrom(value).buildPartial(); + } else { + consumerMarkDeletePosition_ = value; + } + + bitField0_ |= 0x00000004; + return this; + } + public Builder clearConsumerMarkDeletePosition() { + consumerMarkDeletePosition_ = org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData.getDefaultInstance(); + + bitField0_ = (bitField0_ & ~0x00000004); + return this; + } + // @@protoc_insertion_point(builder_scope:pulsar.proto.CommandGetLastMessageIdResponse) } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java index a2df6d1..5da02d4 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java @@ -1367,12 +1367,15 @@ public class Commands { return res; } - public static ByteBuf newGetLastMessageIdResponse(long requestId, MessageIdData messageIdData) { + public static ByteBuf newGetLastMessageIdResponse(long requestId, MessageIdData messageIdData, + Optional<MessageIdData> consumerMarkDeletePosition) { PulsarApi.CommandGetLastMessageIdResponse.Builder response = PulsarApi.CommandGetLastMessageIdResponse.newBuilder() .setLastMessageId(messageIdData) .setRequestId(requestId); + consumerMarkDeletePosition.ifPresent(position -> response.setConsumerMarkDeletePosition(position)); + ByteBuf res = serializeWithSize(BaseCommand.newBuilder() .setType(Type.GET_LAST_MESSAGE_ID_RESPONSE) 
.setGetLastMessageIdResponse(response.build())); diff --git a/pulsar-common/src/main/proto/PulsarApi.proto b/pulsar-common/src/main/proto/PulsarApi.proto index 5c12ca3..1009fcc 100644 --- a/pulsar-common/src/main/proto/PulsarApi.proto +++ b/pulsar-common/src/main/proto/PulsarApi.proto @@ -662,6 +662,7 @@ message CommandGetLastMessageId { message CommandGetLastMessageIdResponse { required MessageIdData last_message_id = 1; required uint64 request_id = 2; + optional MessageIdData consumer_mark_delete_position = 3; } message CommandGetTopicsOfNamespace {
