This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 7404952 Issue 937: add CommandGetLastMessageId to make reader know the end of topic (#1066) 7404952 is described below commit 74049522a1d97e1171c9088acd638b426b6de015 Author: Jia Zhai <zhaiji...@gmail.com> AuthorDate: Tue Feb 13 21:21:29 2018 -0800 Issue 937: add CommandGetLastMessageId to make reader know the end of topic (#1066) * add CommandGetLastMessageId to getlastMessageId of topic * rebase master, change following comments * add partition index in GetLastMessageIdResponse * fix rebase error * bump proot version to v11 * change following comments * change following comments2 * change following comments3 * change following comments * get cnx() first --- .../apache/bookkeeper/mledger/ManagedLedger.java | 7 + .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 9 +- .../apache/pulsar/broker/service/ServerCnx.java | 52 +- .../org/apache/pulsar/broker/service/Topic.java | 3 + .../service/nonpersistent/NonPersistentTopic.java | 6 + .../broker/service/persistent/PersistentTopic.java | 5 + .../apache/pulsar/client/api/TopicReaderTest.java | 107 +- .../java/org/apache/pulsar/client/api/Reader.java | 10 + .../org/apache/pulsar/client/impl/ClientCnx.java | 37 + .../apache/pulsar/client/impl/ConsumerImpl.java | 131 ++- .../org/apache/pulsar/client/impl/ReaderImpl.java | 11 +- .../pulsar/client/util/ExecutorProvider.java | 2 +- .../org/apache/pulsar/common/api/Commands.java | 41 +- .../apache/pulsar/common/api/PulsarDecoder.java | 20 + .../apache/pulsar/common/api/proto/PulsarApi.java | 1026 ++++++++++++++++++++ pulsar-common/src/main/proto/PulsarApi.proto | 47 +- 16 files changed, 1457 insertions(+), 57 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java index e13664c..9149bb9 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java @@ -334,4 +334,11 @@ public interface ManagedLedger { * @param config */ void setConfig(ManagedLedgerConfig config); + + /** + * Gets last confirmed entry of the managed ledger. + * + * @return the last confirmed entry id + */ + Position getLastConfirmedEntry(); } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 80a0bbe..89d3476 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -1062,7 +1062,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { Futures.waitForAll(futures).thenRun(() -> { callback.closeComplete(ctx); }).exceptionally(exception -> { - callback.closeFailed(getManagedLedgerException(exception.getCause()), ctx); + callback.closeFailed(ManagedLedgerException.getManagedLedgerException(exception.getCause()), ctx); return null; }); } @@ -1282,7 +1282,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { }).exceptionally(ex -> { log.error("[{}] Error opening ledger for reading at position {} - {}", name, opReadEntry.readPosition, ex.getMessage()); - opReadEntry.readEntriesFailed(getManagedLedgerException(ex.getCause()), opReadEntry.ctx); + opReadEntry.readEntriesFailed(ManagedLedgerException.getManagedLedgerException(ex.getCause()), opReadEntry.ctx); return null; }); } @@ -1351,7 +1351,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { entryCache.asyncReadEntry(ledger, position, callback, ctx); }).exceptionally(ex -> { log.error("[{}] Error opening ledger for reading at position {} - {}", name, position, ex.getMessage()); - callback.readEntryFailed(getManagedLedgerException(ex.getCause()), ctx); + callback.readEntryFailed(ManagedLedgerException.getManagedLedgerException(ex.getCause()), ctx); return null; }); } @@ -2173,7 +2173,8 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { return pendingAddEntries.size(); } - public PositionImpl getLastConfirmedEntry() { + @Override + public Position getLastConfirmedEntry() { return lastConfirmedEntry; } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index 0b8d512..cb473c8 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -31,10 +31,8 @@ import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; - import javax.naming.AuthenticationException; import javax.net.ssl.SSLSession; - import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.bookkeeper.mledger.util.SafeRun; @@ -59,6 +57,7 @@ import org.apache.pulsar.common.api.proto.PulsarApi.CommandConnect; import org.apache.pulsar.common.api.proto.PulsarApi.CommandConsumerStats; import org.apache.pulsar.common.api.proto.PulsarApi.CommandConsumerStatsResponse; import org.apache.pulsar.common.api.proto.PulsarApi.CommandFlow; +import org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageId; import org.apache.pulsar.common.api.proto.PulsarApi.CommandLookupTopic; import org.apache.pulsar.common.api.proto.PulsarApi.CommandPartitionedTopicMetadata; import org.apache.pulsar.common.api.proto.PulsarApi.CommandProducer; @@ -110,7 +109,7 @@ public class ServerCnx extends PulsarHandler { private String originalPrincipal = null; private Set<String> proxyRoles; private boolean authenticateOriginalAuthData; - + enum State { Start, Connected, Failed } @@ -192,8 +191,8 @@ public class ServerCnx extends PulsarHandler { } /* - * If authentication and authorization is enabled and if the authRole is one of proxyRoles we want to enforce - * - the originalPrincipal is given while connecting + * If authentication and authorization is enabled and if the authRole is one of proxyRoles we want to enforce + * - the originalPrincipal is given while connecting * - originalPrincipal is not blank * - originalPrincipal is not a proxy principal */ @@ -218,7 +217,7 @@ public class ServerCnx extends PulsarHandler { if (topicName == null) { return; } - + String originalPrincipal = null; if (authenticateOriginalAuthData && lookup.hasOriginalAuthData()) { originalPrincipal = validateOriginalPrincipal( @@ -233,9 +232,9 @@ public class ServerCnx extends PulsarHandler { } else { originalPrincipal = lookup.hasOriginalPrincipal() ? lookup.getOriginalPrincipal() : this.originalPrincipal; } - + final Semaphore lookupSemaphore = service.getLookupRequestSemaphore(); - if (lookupSemaphore.tryAcquire()) { + if (lookupSemaphore.tryAcquire()) { if (invalidOriginalPrincipal(originalPrincipal)) { final String msg = "Valid Proxy Client role should be provided for lookup "; log.warn("[{}] {} with role {} and proxyClientAuthRole {} on topic {}", remoteAddress, msg, authRole, @@ -319,7 +318,7 @@ public class ServerCnx extends PulsarHandler { } else { originalPrincipal = partitionMetadata.hasOriginalPrincipal() ? partitionMetadata.getOriginalPrincipal() : this.originalPrincipal; } - + final Semaphore lookupSemaphore = service.getLookupRequestSemaphore(); if (lookupSemaphore.tryAcquire()) { if (invalidOriginalPrincipal(originalPrincipal)) { @@ -441,7 +440,7 @@ public class ServerCnx extends PulsarHandler { return commandConsumerStatsResponseBuilder; } - + private String validateOriginalPrincipal(String originalAuthData, String originalAuthMethod, String originalPrincipal, Long requestId, GeneratedMessageLite request) { ChannelHandler sslHandler = ctx.channel().pipeline().get(PulsarChannelInitializer.TLS_HANDLER); SSLSession sslSession = null; @@ -461,7 +460,7 @@ public class ServerCnx extends PulsarHandler { return null; } } - + private String getOriginalPrincipal(String originalAuthData, String originalAuthMethod, String originalPrincipal, SSLSession sslSession) throws AuthenticationException { if (authenticateOriginalAuthData) { @@ -532,7 +531,7 @@ public class ServerCnx extends PulsarHandler { DestinationName topicName = validateTopicName(subscribe.getTopic(), requestId, subscribe); if (topicName == null) { return; - } + } if (invalidOriginalPrincipal(originalPrincipal)) { final String msg = "Valid Proxy Client role should be provided while subscribing "; @@ -1105,6 +1104,35 @@ public class ServerCnx extends PulsarHandler { } @Override + protected void handleGetLastMessageId(CommandGetLastMessageId getLastMessageId) { + checkArgument(state == State.Connected); + + CompletableFuture<Consumer> consumerFuture = consumers.get(getLastMessageId.getConsumerId()); + + if (consumerFuture != null && consumerFuture.isDone() && !consumerFuture.isCompletedExceptionally()) { + Consumer consumer = consumerFuture.getNow(null); + long requestId = getLastMessageId.getRequestId(); + + Topic topic = consumer.getSubscription().getTopic(); + Position position = topic.getLastMessageId(); + int partitionIndex = DestinationName.getPartitionIndex(topic.getName()); + if (log.isDebugEnabled()) { + log.debug("[{}] [{}][{}] Get LastMessageId {} partitionIndex {}", remoteAddress, + topic.getName(), consumer.getSubscription().getName(), position, partitionIndex); + } + MessageIdData messageId = MessageIdData.newBuilder() + .setLedgerId(((PositionImpl)position).getLedgerId()) + .setEntryId(((PositionImpl)position).getEntryId()) + .setPartition(partitionIndex) + .build(); + + ctx.writeAndFlush(Commands.newGetLastMessageIdResponse(requestId, messageId)); + } else { + ctx.writeAndFlush(Commands.newError(getLastMessageId.getRequestId(), ServerError.MetadataError, "Consumer not found")); + } + } + + @Override protected boolean isHandshakeCompleted() { return state == State.Connected; } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java index 7a426b7..80aed77 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java @@ -21,6 +21,7 @@ package org.apache.pulsar.broker.service; import java.util.Map; import java.util.concurrent.CompletableFuture; +import org.apache.bookkeeper.mledger.Position; import org.apache.pulsar.broker.stats.ClusterReplicationMetrics; import org.apache.pulsar.broker.stats.NamespaceStats; import org.apache.pulsar.client.api.MessageId; @@ -122,4 +123,6 @@ public interface Topic { PersistentTopicStats getStats(); PersistentTopicInternalStats getInternalStats(); + + Position getLastMessageId(); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java index 44a9e14..588c494 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java @@ -34,6 +34,7 @@ import java.util.concurrent.atomic.AtomicLongFieldUpdater; import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.bookkeeper.mledger.Entry; +import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.mledger.util.SafeRun; import org.apache.bookkeeper.util.OrderedSafeExecutor; import org.apache.pulsar.broker.admin.AdminResource; @@ -909,6 +910,11 @@ public class NonPersistentTopic implements Topic { return CompletableFuture.completedFuture(null); } + @Override + public Position getLastMessageId() { + throw new UnsupportedOperationException("getLastMessageId is not supported on non-persistent topic"); + } + public void markBatchMessagePublished() { this.hasBatchMessagePublished = true; } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 20ae527..a17e2de 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -1536,5 +1536,10 @@ public class PersistentTopic implements Topic, AddEntryCallback { return messageDeduplication.getLastPublishedSequenceId(producerName); } + @Override + public Position getLastMessageId() { + return ledger.getLastConfirmedEntry(); + } + private static final Logger log = LoggerFactory.getLogger(PersistentTopic.class); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java index df36b8e..e8f635f 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java @@ -19,7 +19,11 @@ package org.apache.pulsar.client.api; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertNull; +import static org.testng.Assert.assertTrue; +import com.google.common.collect.Sets; import java.io.IOException; import java.nio.file.Files; import java.nio.file.Paths; @@ -28,8 +32,8 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; - import org.apache.pulsar.client.impl.BatchMessageIdImpl; +import org.apache.pulsar.client.impl.MessageImpl; import org.apache.pulsar.common.policies.data.PersistentTopicStats; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -38,8 +42,6 @@ import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; -import com.google.common.collect.Sets; - public class TopicReaderTest extends ProducerConsumerBase { private static final Logger log = LoggerFactory.getLogger(TopicReaderTest.class); @@ -359,4 +361,103 @@ public class TopicReaderTest extends ProducerConsumerBase { reader.close(); log.info("-- Exiting {} test --", methodName); } + + + @Test + public void testSimpleReaderReachEndofTopic() throws Exception { + ReaderConfiguration conf = new ReaderConfiguration(); + Reader reader = pulsarClient.createReader("persistent://my-property/use/my-ns/my-topic1", MessageId.earliest, + conf); + ProducerConfiguration producerConf = new ProducerConfiguration(); + Producer producer = pulsarClient.createProducer("persistent://my-property/use/my-ns/my-topic1", producerConf); + + // no data write, should return false + assertFalse(reader.hasMessageAvailable()); + + // produce message 0 -- 99 + for (int i = 0; i < 100; i++) { + String message = "my-message-" + i; + producer.send(message.getBytes()); + } + + MessageImpl msg = null; + Set<String> messageSet = Sets.newHashSet(); + int index = 0; + + // read message till end. + while (reader.hasMessageAvailable()) { + msg = (MessageImpl) reader.readNext(1, TimeUnit.SECONDS); + String receivedMessage = new String(msg.getData()); + log.debug("Received message: [{}]", receivedMessage); + String expectedMessage = "my-message-" + (index ++); + testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage); + } + + assertEquals(index, 100); + // readNext should return null, after reach the end of topic. + assertNull(reader.readNext(1, TimeUnit.SECONDS)); + + // produce message again. + for (int i = 100; i < 200; i++) { + String message = "my-message-" + i; + producer.send(message.getBytes()); + } + + // read message till end again. + while (reader.hasMessageAvailable()) { + msg = (MessageImpl) reader.readNext(1, TimeUnit.SECONDS); + String receivedMessage = new String(msg.getData()); + log.debug("Received message: [{}]", receivedMessage); + String expectedMessage = "my-message-" + (index ++); + testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage); + } + + assertEquals(index, 200); + // readNext should return null, after reach the end of topic. + assertNull(reader.readNext(1, TimeUnit.SECONDS)); + + producer.close(); + } + + @Test + public void testReaderReachEndofTopicOnMessageWithBatches() throws Exception { + Reader reader = pulsarClient.createReader( + "persistent://my-property/use/my-ns/testReaderReachEndofTopicOnMessageWithBatches", MessageId.earliest, + new ReaderConfiguration()); + + ProducerConfiguration producerConf = new ProducerConfiguration(); + producerConf.setBatchingEnabled(true); + producerConf.setBatchingMaxPublishDelay(100, TimeUnit.MILLISECONDS); + Producer producer = pulsarClient.createProducer("persistent://my-property/use/my-ns/testReaderReachEndofTopicOnMessageWithBatches", producerConf); + + // no data write, should return false + assertFalse(reader.hasMessageAvailable()); + + for (int i = 0; i < 100; i++) { + String message = "my-message-" + i; + producer.sendAsync(message.getBytes()); + } + + // Write one sync message to ensure everything before got persistend + producer.send("my-message-10".getBytes()); + + MessageId lastMessageId = null; + int index = 0; + assertTrue(reader.hasMessageAvailable()); + + if (reader.hasMessageAvailable()) { + Message msg = reader.readNext(); + lastMessageId = msg.getMessageId(); + assertEquals(lastMessageId.getClass(), BatchMessageIdImpl.class); + + while (msg != null) { + index++; + msg = reader.readNext(100, TimeUnit.MILLISECONDS); + } + assertEquals(index, 101); + } + + assertFalse(reader.hasMessageAvailable()); + producer.close(); + } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/Reader.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/Reader.java index 4b89470..d29b238 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/Reader.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/Reader.java @@ -62,4 +62,14 @@ public interface Reader extends Closeable { * Return true if the topic was terminated and this reader has reached the end of the topic */ boolean hasReachedEndOfTopic(); + + /** + * Check if there is any message available to read from the current position. + */ + boolean hasMessageAvailable() throws PulsarClientException; + + /** + * Asynchronously Check if there is message that has been published successfully to the broker in the topic. + */ + CompletableFuture<Boolean> hasMessageAvailableAsync(); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java index 1064654..58cdede 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java @@ -44,6 +44,7 @@ import org.apache.pulsar.common.api.proto.PulsarApi.CommandCloseConsumer; import org.apache.pulsar.common.api.proto.PulsarApi.CommandCloseProducer; import org.apache.pulsar.common.api.proto.PulsarApi.CommandConnected; import org.apache.pulsar.common.api.proto.PulsarApi.CommandError; +import org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageIdResponse; import org.apache.pulsar.common.api.proto.PulsarApi.CommandLookupTopicResponse; import org.apache.pulsar.common.api.proto.PulsarApi.CommandMessage; import org.apache.pulsar.common.api.proto.PulsarApi.CommandPartitionedTopicMetadataResponse; @@ -52,6 +53,7 @@ import org.apache.pulsar.common.api.proto.PulsarApi.CommandReachedEndOfTopic; import org.apache.pulsar.common.api.proto.PulsarApi.CommandSendError; import org.apache.pulsar.common.api.proto.PulsarApi.CommandSendReceipt; import org.apache.pulsar.common.api.proto.PulsarApi.CommandSuccess; +import org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData; import org.apache.pulsar.common.api.proto.PulsarApi.ServerError; import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap; import org.apache.http.conn.ssl.DefaultHostnameVerifier; @@ -76,6 +78,8 @@ public class ClientCnx extends PulsarHandler { 16, 1); private final ConcurrentLongHashMap<CompletableFuture<LookupDataResult>> pendingLookupRequests = new ConcurrentLongHashMap<>( 16, 1); + private final ConcurrentLongHashMap<CompletableFuture<MessageIdData>> pendingGetLastMessageIdRequests = new ConcurrentLongHashMap<>( + 16, 1); private final ConcurrentLongHashMap<ProducerImpl> producers = new ConcurrentLongHashMap<>(16, 1); private final ConcurrentLongHashMap<ConsumerImpl> consumers = new ConcurrentLongHashMap<>(16, 1); @@ -158,6 +162,7 @@ public class ClientCnx extends PulsarHandler { // Fail out all the pending ops pendingRequests.forEach((key, future) -> future.completeExceptionally(e)); pendingLookupRequests.forEach((key, future) -> future.completeExceptionally(e)); + pendingGetLastMessageIdRequests.forEach((key, future) -> future.completeExceptionally(e)); // Notify all attached producers/consumers so they have a chance to reconnect producers.forEach((id, producer) -> producer.connectionClosed(this)); @@ -264,6 +269,22 @@ public class ClientCnx extends PulsarHandler { } @Override + protected void handleGetLastMessageIdSuccess(CommandGetLastMessageIdResponse success) { + checkArgument(state == State.Ready); + + if (log.isDebugEnabled()) { + log.debug("{} Received success GetLastMessageId response from server: {}", ctx.channel(), success.getRequestId()); + } + long requestId = success.getRequestId(); + CompletableFuture<MessageIdData> requestFuture = pendingGetLastMessageIdRequests.remove(requestId); + if (requestFuture != null) { + requestFuture.complete(success.getLastMessageId()); + } else { + log.warn("{} Received unknown request id from server: {}", ctx.channel(), success.getRequestId()); + } + } + + @Override protected void handleProducerSuccess(CommandProducerSuccess success) { checkArgument(state == State.Ready); @@ -511,6 +532,22 @@ public class ClientCnx extends PulsarHandler { return future; } + public CompletableFuture<MessageIdData> sendGetLastMessageId(ByteBuf request, long requestId) { + CompletableFuture<MessageIdData> future = new CompletableFuture<>(); + + pendingGetLastMessageIdRequests.put(requestId, future); + + ctx.writeAndFlush(request).addListener(writeFuture -> { + if (!writeFuture.isSuccess()) { + log.warn("{} Failed to send GetLastMessageId request to broker: {}", ctx.channel(), writeFuture.cause().getMessage()); + pendingGetLastMessageIdRequests.remove(requestId); + future.completeExceptionally(writeFuture.cause()); + } + }); + + return future; + } + /** * check serverError and take appropriate action * <ul> diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index b74bb13..a436899 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -25,19 +25,33 @@ import static org.apache.pulsar.checksum.utils.Crc32cChecksum.computeChecksum; import static org.apache.pulsar.common.api.Commands.hasChecksum; import static org.apache.pulsar.common.api.Commands.readChecksum; +import com.google.common.collect.Iterables; +import io.netty.buffer.ByteBuf; +import io.netty.util.Timeout; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.GenericFutureListener; import java.io.IOException; -import java.util.*; +import java.util.ArrayList; +import java.util.BitSet; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.NavigableMap; +import java.util.Objects; +import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentNavigableMap; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.stream.Collectors; - import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.ConsumerConfiguration; import org.apache.pulsar.client.api.ConsumerCryptoFailureAction; @@ -60,13 +74,6 @@ import org.apache.pulsar.common.util.FutureUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.collect.Iterables; - -import io.netty.buffer.ByteBuf; -import io.netty.util.Timeout; -import io.netty.util.concurrent.Future; -import io.netty.util.concurrent.GenericFutureListener; - public class ConsumerImpl extends ConsumerBase { private static final int MAX_REDELIVER_UNACKNOWLEDGED = 1000; @@ -78,7 +85,8 @@ public class ConsumerImpl extends ConsumerBase { .newUpdater(ConsumerImpl.class, "availablePermits"); private volatile int availablePermits = 0; - private MessageIdImpl lastDequeuedMessage; + private MessageId lastDequeuedMessage = MessageId.earliest; + private MessageId lastMessageIdInBroker = MessageId.earliest; private long subscribeTimeout; private final int partitionIndex; @@ -278,7 +286,7 @@ public class ConsumerImpl extends ConsumerBase { } do { message = incomingMessages.take(); - lastDequeuedMessage = (MessageIdImpl) message.getMessageId(); + lastDequeuedMessage = message.getMessageId(); ClientCnx msgCnx = ((MessageImpl) message).getCnx(); // synchronized need to prevent race between connectionOpened and the check "msgCnx == cnx()" synchronized (ConsumerImpl.this) { @@ -639,10 +647,10 @@ public class ConsumerImpl extends ConsumerBase { } return previousMessage; - } else if (lastDequeuedMessage != null) { + } else if (!lastDequeuedMessage.equals(MessageId.earliest)) { // If the queue was empty we need to restart from the message just after the last one that has been dequeued // in the past - return new BatchMessageIdImpl(lastDequeuedMessage); + return new BatchMessageIdImpl((MessageIdImpl) lastDequeuedMessage); } else { // No message was received or dequeued by this consumer. Next message would still be the startMessageId return startMessageId; @@ -964,7 +972,7 @@ public class ConsumerImpl extends ConsumerBase { protected synchronized void messageProcessed(Message msg) { ClientCnx currentCnx = cnx(); ClientCnx msgCnx = ((MessageImpl) msg).getCnx(); - lastDequeuedMessage = (MessageIdImpl) msg.getMessageId(); + lastDequeuedMessage = msg.getMessageId(); if (msgCnx != currentCnx) { // The processed message did belong to the old queue that was cleared after reconnection. @@ -1248,6 +1256,101 @@ public class ConsumerImpl extends ConsumerBase { return seekFuture; } + public boolean hasMessageAvailable() throws PulsarClientException { + try { + if (lastMessageIdInBroker.compareTo(lastDequeuedMessage) > 0 && + ((MessageIdImpl)lastMessageIdInBroker).getEntryId() != -1) { + return true; + } + + return hasMessageAvailableAsync().get(); + } catch (ExecutionException | InterruptedException e) { + throw new PulsarClientException(e); + } + } + + public CompletableFuture<Boolean> hasMessageAvailableAsync() { + final CompletableFuture<Boolean> booleanFuture = new CompletableFuture<>(); + + if (lastMessageIdInBroker.compareTo(lastDequeuedMessage) > 0 && + ((MessageIdImpl)lastMessageIdInBroker).getEntryId() != -1) { + booleanFuture.complete(true); + } else { + getLastMessageIdAsync().thenAccept(messageId -> { + lastMessageIdInBroker = messageId; + if (lastMessageIdInBroker.compareTo(lastDequeuedMessage) > 0 && + ((MessageIdImpl)lastMessageIdInBroker).getEntryId() != -1) { + booleanFuture.complete(true); + } else { + booleanFuture.complete(false); + } + }).exceptionally(e -> { + log.error("[{}][{}] Failed getLastMessageId command", topic, subscription); + booleanFuture.completeExceptionally(e.getCause()); + return null; + }); + } + return booleanFuture; + } + + private CompletableFuture<MessageId> getLastMessageIdAsync() { + if (getState() == State.Closing || getState() == State.Closed) { + return FutureUtil + .failedFuture(new PulsarClientException.AlreadyClosedException("Consumer was already closed")); + } + + AtomicLong opTimeoutMs = new AtomicLong(client.getConfiguration().getOperationTimeoutMs()); + Backoff backoff = new Backoff(100, TimeUnit.MILLISECONDS, + opTimeoutMs.get() * 2, TimeUnit.MILLISECONDS, + 0 , TimeUnit.MILLISECONDS); + CompletableFuture<MessageId> getLastMessageIdFuture = new CompletableFuture<>(); + + internalGetLastMessageIdAsync(backoff, opTimeoutMs, getLastMessageIdFuture); + return getLastMessageIdFuture; + } + + private void internalGetLastMessageIdAsync(final Backoff backoff, + final AtomicLong remainingTime, + CompletableFuture<MessageId> future) { + ClientCnx cnx = cnx(); + if (isConnected() && cnx != null) { + if (!Commands.peerSupportsGetLastMessageId(cnx.getRemoteEndpointProtocolVersion())) { + future.completeExceptionally(new PulsarClientException + .NotSupportedException("GetLastMessageId Not supported for ProtocolVersion: " + + cnx.getRemoteEndpointProtocolVersion())); + } + + long requestId = client.newRequestId(); + ByteBuf getLastIdCmd = Commands.newGetLastMessageId(consumerId, requestId); + log.info("[{}][{}] Get topic last message Id", topic, subscription); + + cnx.sendGetLastMessageId(getLastIdCmd, requestId).thenAccept((result) -> { + log.info("[{}][{}] Successfully getLastMessageId {}:{}", + topic, subscription, result.getLedgerId(), result.getEntryId()); + future.complete(new MessageIdImpl(result.getLedgerId(), + result.getEntryId(), result.getPartition())); + }).exceptionally(e -> { + log.error("[{}][{}] Failed getLastMessageId command", topic, subscription); + future.completeExceptionally(e.getCause()); + return null; + }); + } else { + long nextDelay = Math.min(backoff.next(), remainingTime.get()); + if (nextDelay <= 0) { + future.completeExceptionally(new PulsarClientException + .TimeoutException("Could not getLastMessageId within configured timeout.")); + return; + } + + ((ScheduledExecutorService) listenerExecutor).schedule(() -> { + log.warn("[{}] [{}] Could not get connection while getLastMessageId -- Will try again in {} ms", + topic, getHandlerName(), nextDelay); + remainingTime.addAndGet(-nextDelay); + internalGetLastMessageIdAsync(backoff, remainingTime, future); + }, nextDelay, TimeUnit.MILLISECONDS); + } + } + private MessageIdImpl getMessageIdImpl(Message msg) { MessageIdImpl messageId = (MessageIdImpl) msg.getMessageId(); if (messageId instanceof BatchMessageIdImpl) { diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java index 5b50ef5..bf0dc2f 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java @@ -23,7 +23,6 @@ import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; - import org.apache.commons.codec.digest.DigestUtils; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.ConsumerConfiguration; @@ -133,4 +132,14 @@ public class ReaderImpl implements Reader { return consumer.closeAsync(); } + @Override + public boolean hasMessageAvailable() throws PulsarClientException { + return consumer.hasMessageAvailable(); + } + + @Override + public CompletableFuture<Boolean> hasMessageAvailableAsync() { + return consumer.hasMessageAvailableAsync(); + } + } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/util/ExecutorProvider.java b/pulsar-client/src/main/java/org/apache/pulsar/client/util/ExecutorProvider.java index cb60720..907c52f 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/util/ExecutorProvider.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/util/ExecutorProvider.java @@ -41,7 +41,7 @@ public class ExecutorProvider { checkNotNull(threadNamePrefix); executors = Lists.newArrayListWithCapacity(numThreads); for (int i = 0; i < numThreads; i++) { - executors.add(Executors.newSingleThreadExecutor(new DefaultThreadFactory(threadNamePrefix))); + executors.add(Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory(threadNamePrefix))); } } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java index 4376b01..2912a73 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java @@ -21,11 +21,14 @@ package org.apache.pulsar.common.api; import static org.apache.pulsar.checksum.utils.Crc32cChecksum.computeChecksum; import static org.apache.pulsar.checksum.utils.Crc32cChecksum.resumeChecksum; +import com.google.protobuf.ByteString; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.PooledByteBufAllocator; +import io.netty.buffer.Unpooled; import java.io.IOException; import java.util.Collections; import java.util.List; import java.util.Map; - import org.apache.pulsar.common.api.proto.PulsarApi; import org.apache.pulsar.common.api.proto.PulsarApi.AuthMethod; import org.apache.pulsar.common.api.proto.PulsarApi.BaseCommand; @@ -40,6 +43,7 @@ import org.apache.pulsar.common.api.proto.PulsarApi.CommandConnected; import org.apache.pulsar.common.api.proto.PulsarApi.CommandConsumerStatsResponse; import org.apache.pulsar.common.api.proto.PulsarApi.CommandError; import org.apache.pulsar.common.api.proto.PulsarApi.CommandFlow; +import org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageId; import org.apache.pulsar.common.api.proto.PulsarApi.CommandLookupTopic; import org.apache.pulsar.common.api.proto.PulsarApi.CommandLookupTopicResponse; import org.apache.pulsar.common.api.proto.PulsarApi.CommandLookupTopicResponse.LookupType; @@ -67,12 +71,6 @@ import org.apache.pulsar.common.api.proto.PulsarApi.ServerError; import org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream; import org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream; -import com.google.protobuf.ByteString; - -import io.netty.buffer.ByteBuf; -import io.netty.buffer.PooledByteBufAllocator; -import io.netty.buffer.Unpooled; - public class Commands { public static final short magicCrc32c = 0x0e01; @@ -123,7 +121,7 @@ public class Commands { if (originalAuthData != null) { connectBuilder.setOriginalAuthData(originalAuthData); } - + if (originalAuthMethod != null) { connectBuilder.setOriginalAuthMethod(originalAuthMethod); } @@ -637,6 +635,29 @@ public class Commands { return cmdPong.retainedDuplicate(); } + public static ByteBuf newGetLastMessageId(long consumerId, long requestId) { + CommandGetLastMessageId.Builder cmdBuilder = CommandGetLastMessageId.newBuilder(); + cmdBuilder.setConsumerId(consumerId).setRequestId(requestId); + + ByteBuf res = serializeWithSize(BaseCommand.newBuilder() + .setType(Type.GET_LAST_MESSAGE_ID) + .setGetLastMessageId(cmdBuilder.build())); + cmdBuilder.recycle(); + return res; + } + + public static ByteBuf newGetLastMessageIdResponse(long requestId, MessageIdData messageIdData) { + PulsarApi.CommandGetLastMessageIdResponse.Builder response = PulsarApi.CommandGetLastMessageIdResponse.newBuilder() + .setLastMessageId(messageIdData) + .setRequestId(requestId); + + ByteBuf res = serializeWithSize(BaseCommand.newBuilder() + .setType(Type.GET_LAST_MESSAGE_ID_RESPONSE) + .setGetLastMessageIdResponse(response.build())); + response.recycle(); + return res; + } + private static ByteBuf serializeWithSize(BaseCommand.Builder cmdBuilder) { // / Wire format // [TOTAL_SIZE] [CMD_SIZE][CMD] @@ -931,4 +952,8 @@ public class Commands { lookupBroker.recycle(); return res; } + + public static boolean peerSupportsGetLastMessageId(int peerVersion) { + return peerVersion >= ProtocolVersion.v12.getNumber(); + } } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/PulsarDecoder.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/PulsarDecoder.java index e9e113a..8d2389b 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/PulsarDecoder.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/PulsarDecoder.java @@ -20,6 +20,7 @@ package org.apache.pulsar.common.api; import static com.google.common.base.Preconditions.checkArgument; +import org.apache.pulsar.common.api.proto.PulsarApi; import org.apache.pulsar.common.api.proto.PulsarApi.BaseCommand; import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck; import org.apache.pulsar.common.api.proto.PulsarApi.CommandCloseConsumer; @@ -253,6 +254,18 @@ public abstract class PulsarDecoder extends ChannelInboundHandlerAdapter { handleReachedEndOfTopic(cmd.getReachedEndOfTopic()); cmd.getReachedEndOfTopic().recycle(); break; + + case GET_LAST_MESSAGE_ID: + checkArgument(cmd.hasGetLastMessageId()); + handleGetLastMessageId(cmd.getGetLastMessageId()); + cmd.getGetLastMessageId().recycle(); + break; + + case GET_LAST_MESSAGE_ID_RESPONSE: + checkArgument(cmd.hasGetLastMessageIdResponse()); + handleGetLastMessageIdSuccess(cmd.getGetLastMessageIdResponse()); + cmd.getGetLastMessageIdResponse().recycle(); + break; } } finally { if (cmdBuilder != null) { @@ -377,5 +390,12 @@ public abstract class PulsarDecoder extends ChannelInboundHandlerAdapter { throw new UnsupportedOperationException(); } + protected void handleGetLastMessageId(PulsarApi.CommandGetLastMessageId getLastMessageId) { + throw new UnsupportedOperationException(); + } + protected void handleGetLastMessageIdSuccess(PulsarApi.CommandGetLastMessageIdResponse success) { + throw new UnsupportedOperationException(); + } + private static final Logger log = LoggerFactory.getLogger(PulsarDecoder.class); } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java index 2c13125..27cc63a 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java @@ -199,6 +199,7 @@ public final class PulsarApi { v9(9, 9), v10(10, 10), v11(11, 11), + v12(12, 12), ; public static final int v0_VALUE = 0; @@ -213,6 +214,7 @@ public final class PulsarApi { public static final int v9_VALUE = 9; public static final int v10_VALUE = 10; public static final int v11_VALUE = 11; + public static final int v12_VALUE = 12; public final int getNumber() { return value; } @@ -231,6 +233,7 @@ public final class PulsarApi { case 9: return v9; case 10: return v10; case 11: return v11; + case 12: return v12; default: return null; } } @@ -20067,6 +20070,831 @@ public final class PulsarApi { // @@protoc_insertion_point(class_scope:pulsar.proto.CommandConsumerStatsResponse) } + public interface CommandGetLastMessageIdOrBuilder + extends com.google.protobuf.MessageLiteOrBuilder { + + // required uint64 consumer_id = 1; + boolean hasConsumerId(); + long getConsumerId(); + + // required uint64 request_id = 2; + boolean hasRequestId(); + long getRequestId(); + } + public static final class CommandGetLastMessageId extends + com.google.protobuf.GeneratedMessageLite + implements CommandGetLastMessageIdOrBuilder, org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.ByteBufGeneratedMessage { + // Use CommandGetLastMessageId.newBuilder() to construct. + private final io.netty.util.Recycler.Handle<CommandGetLastMessageId> handle; + private CommandGetLastMessageId(io.netty.util.Recycler.Handle<CommandGetLastMessageId> handle) { + this.handle = handle; + } + + private static final io.netty.util.Recycler<CommandGetLastMessageId> RECYCLER = new io.netty.util.Recycler<CommandGetLastMessageId>() { + protected CommandGetLastMessageId newObject(Handle<CommandGetLastMessageId> handle) { + return new CommandGetLastMessageId(handle); + } + }; + + public void recycle() { + this.initFields(); + this.memoizedIsInitialized = -1; + this.bitField0_ = 0; + this.memoizedSerializedSize = -1; + handle.recycle(this); + } + + private CommandGetLastMessageId(boolean noInit) { + this.handle = null; + } + + private static final CommandGetLastMessageId defaultInstance; + public static CommandGetLastMessageId getDefaultInstance() { + return defaultInstance; + } + + public CommandGetLastMessageId getDefaultInstanceForType() { + return defaultInstance; + } + + private int bitField0_; + // required uint64 consumer_id = 1; + public static final int CONSUMER_ID_FIELD_NUMBER = 1; + private long consumerId_; + public boolean hasConsumerId() { + return ((bitField0_ & 0x00000001) == 0x00000001); + } + public long getConsumerId() { + return consumerId_; + } + + // required uint64 request_id = 2; + public static final int REQUEST_ID_FIELD_NUMBER = 2; + private long requestId_; + public boolean hasRequestId() { + return ((bitField0_ & 0x00000002) == 0x00000002); + } + public long getRequestId() { + return requestId_; + } + + private void initFields() { + consumerId_ = 0L; + requestId_ = 0L; + } + private byte memoizedIsInitialized = -1; + public final boolean isInitialized() { + byte isInitialized = memoizedIsInitialized; + if (isInitialized != -1) return isInitialized == 1; + + if (!hasConsumerId()) { + memoizedIsInitialized = 0; + return false; + } + if (!hasRequestId()) { + memoizedIsInitialized = 0; + return false; + } + memoizedIsInitialized = 1; + return true; + } + + public void writeTo(com.google.protobuf.CodedOutputStream output) + throws java.io.IOException { + throw new RuntimeException("Cannot use CodedOutputStream"); + } + + public void writeTo(org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream output) + throws java.io.IOException { + getSerializedSize(); + if (((bitField0_ & 0x00000001) == 0x00000001)) { + output.writeUInt64(1, consumerId_); + } + if (((bitField0_ & 0x00000002) == 0x00000002)) { + output.writeUInt64(2, requestId_); + } + } + + private int memoizedSerializedSize = -1; + public int getSerializedSize() { + int size = memoizedSerializedSize; + if (size != -1) return size; + + size = 0; + if (((bitField0_ & 0x00000001) == 0x00000001)) { + size += com.google.protobuf.CodedOutputStream + .computeUInt64Size(1, consumerId_); + } + if (((bitField0_ & 0x00000002) == 0x00000002)) { + size += com.google.protobuf.CodedOutputStream + .computeUInt64Size(2, requestId_); + } + memoizedSerializedSize = size; + return size; + } + + private static final long serialVersionUID = 0L; + @java.lang.Override + protected java.lang.Object writeReplace() + throws java.io.ObjectStreamException { + return super.writeReplace(); + } + + public static org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageId parseFrom( + com.google.protobuf.ByteString data) + throws com.google.protobuf.InvalidProtocolBufferException { + throw new RuntimeException("Disabled"); + } + public static org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageId parseFrom( + com.google.protobuf.ByteString data, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + throw new RuntimeException("Disabled"); + } + public static org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageId parseFrom(byte[] data) + throws com.google.protobuf.InvalidProtocolBufferException { + return newBuilder().mergeFrom(data).buildParsed(); + } + public static org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageId parseFrom( + byte[] data, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return newBuilder().mergeFrom(data, extensionRegistry) + .buildParsed(); + } + public static org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageId parseFrom(java.io.InputStream input) + throws java.io.IOException { + return newBuilder().mergeFrom(input).buildParsed(); + } + public static org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageId parseFrom( + java.io.InputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return newBuilder().mergeFrom(input, extensionRegistry) + .buildParsed(); + } + public static org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageId parseDelimitedFrom(java.io.InputStream input) + throws java.io.IOException { + Builder builder = newBuilder(); + if (builder.mergeDelimitedFrom(input)) { + return builder.buildParsed(); + } else { + return null; + } + } + public static org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageId parseDelimitedFrom( + java.io.InputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + Builder builder = newBuilder(); + if (builder.mergeDelimitedFrom(input, extensionRegistry)) { + return builder.buildParsed(); + } else { + return null; + } + } + public static org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageId parseFrom( + com.google.protobuf.CodedInputStream input) + throws java.io.IOException { + return newBuilder().mergeFrom(input).buildParsed(); + } + public static org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageId parseFrom( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return newBuilder().mergeFrom(input, extensionRegistry) + .buildParsed(); + } + + public static Builder newBuilder() { return Builder.create(); } + public Builder newBuilderForType() { return newBuilder(); } + public static Builder newBuilder(org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageId prototype) { + return newBuilder().mergeFrom(prototype); + } + public Builder toBuilder() { return newBuilder(this); } + + public static final class Builder extends + com.google.protobuf.GeneratedMessageLite.Builder< + org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageId, Builder> + implements org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageIdOrBuilder, org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream.ByteBufMessageBuilder { + // Construct using org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageId.newBuilder() + private final io.netty.util.Recycler.Handle<Builder> handle; + private Builder(io.netty.util.Recycler.Handle<Builder> handle) { + this.handle = handle; + maybeForceBuilderInitialization(); + } + private final static io.netty.util.Recycler<Builder> RECYCLER = new io.netty.util.Recycler<Builder>() { + protected Builder newObject(io.netty.util.Recycler.Handle<Builder> handle) { + return new Builder(handle); + } + }; + + public void recycle() { + clear(); + handle.recycle(this); + } + + private void maybeForceBuilderInitialization() { + } + private static Builder create() { + return RECYCLER.get(); + } + + public Builder clear() { + super.clear(); + consumerId_ = 0L; + bitField0_ = (bitField0_ & ~0x00000001); + requestId_ = 0L; + bitField0_ = (bitField0_ & ~0x00000002); + return this; + } + + public Builder clone() { + return create().mergeFrom(buildPartial()); + } + + public org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageId getDefaultInstanceForType() { + return org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageId.getDefaultInstance(); + } + + public org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageId build() { + org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageId result = buildPartial(); + if (!result.isInitialized()) { + throw newUninitializedMessageException(result); + } + return result; + } + + private org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageId buildParsed() + throws com.google.protobuf.InvalidProtocolBufferException { + org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageId result = buildPartial(); + if (!result.isInitialized()) { + throw newUninitializedMessageException( + result).asInvalidProtocolBufferException(); + } + return result; + } + + public org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageId buildPartial() { + org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageId result = org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageId.RECYCLER.get(); + int from_bitField0_ = bitField0_; + int to_bitField0_ = 0; + if (((from_bitField0_ & 0x00000001) == 0x00000001)) { + to_bitField0_ |= 0x00000001; + } + result.consumerId_ = consumerId_; + if (((from_bitField0_ & 0x00000002) == 0x00000002)) { + to_bitField0_ |= 0x00000002; + } + result.requestId_ = requestId_; + result.bitField0_ = to_bitField0_; + return result; + } + + public Builder mergeFrom(org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageId other) { + if (other == org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageId.getDefaultInstance()) return this; + if (other.hasConsumerId()) { + setConsumerId(other.getConsumerId()); + } + if (other.hasRequestId()) { + setRequestId(other.getRequestId()); + } + return this; + } + + public final boolean isInitialized() { + if (!hasConsumerId()) { + + return false; + } + if (!hasRequestId()) { + + return false; + } + return true; + } + + public Builder mergeFrom(com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + throw new java.io.IOException("Merge from CodedInputStream is disabled"); + } + public Builder mergeFrom( + org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + while (true) { + int tag = input.readTag(); + switch (tag) { + case 0: + + return this; + default: { + if (!input.skipField(tag)) { + + return this; + } + break; + } + case 8: { + bitField0_ |= 0x00000001; + consumerId_ = input.readUInt64(); + break; + } + case 16: { + bitField0_ |= 0x00000002; + requestId_ = input.readUInt64(); + break; + } + } + } + } + + private int bitField0_; + + // required uint64 consumer_id = 1; + private long consumerId_ ; + public boolean hasConsumerId() { + return ((bitField0_ & 0x00000001) == 0x00000001); + } + public long getConsumerId() { + return consumerId_; + } + public Builder setConsumerId(long value) { + bitField0_ |= 0x00000001; + consumerId_ = value; + + return this; + } + public Builder clearConsumerId() { + bitField0_ = (bitField0_ & ~0x00000001); + consumerId_ = 0L; + + return this; + } + + // required uint64 request_id = 2; + private long requestId_ ; + public boolean hasRequestId() { + return ((bitField0_ & 0x00000002) == 0x00000002); + } + public long getRequestId() { + return requestId_; + } + public Builder setRequestId(long value) { + bitField0_ |= 0x00000002; + requestId_ = value; + + return this; + } + public Builder clearRequestId() { + bitField0_ = (bitField0_ & ~0x00000002); + requestId_ = 0L; + + return this; + } + + // @@protoc_insertion_point(builder_scope:pulsar.proto.CommandGetLastMessageId) + } + + static { + defaultInstance = new CommandGetLastMessageId(true); + defaultInstance.initFields(); + } + + // @@protoc_insertion_point(class_scope:pulsar.proto.CommandGetLastMessageId) + } + + public interface CommandGetLastMessageIdResponseOrBuilder + extends com.google.protobuf.MessageLiteOrBuilder { + + // required .pulsar.proto.MessageIdData last_message_id = 1; + boolean hasLastMessageId(); + org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData getLastMessageId(); + + // required uint64 request_id = 2; + boolean hasRequestId(); + long getRequestId(); + } + public static final class CommandGetLastMessageIdResponse extends + com.google.protobuf.GeneratedMessageLite + implements CommandGetLastMessageIdResponseOrBuilder, org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.ByteBufGeneratedMessage { + // Use CommandGetLastMessageIdResponse.newBuilder() to construct. + private final io.netty.util.Recycler.Handle<CommandGetLastMessageIdResponse> handle; + private CommandGetLastMessageIdResponse(io.netty.util.Recycler.Handle<CommandGetLastMessageIdResponse> handle) { + this.handle = handle; + } + + private static final io.netty.util.Recycler<CommandGetLastMessageIdResponse> RECYCLER = new io.netty.util.Recycler<CommandGetLastMessageIdResponse>() { + protected CommandGetLastMessageIdResponse newObject(Handle<CommandGetLastMessageIdResponse> handle) { + return new CommandGetLastMessageIdResponse(handle); + } + }; + + public void recycle() { + this.initFields(); + this.memoizedIsInitialized = -1; + this.bitField0_ = 0; + this.memoizedSerializedSize = -1; + handle.recycle(this); + } + + private CommandGetLastMessageIdResponse(boolean noInit) { + this.handle = null; + } + + private static final CommandGetLastMessageIdResponse defaultInstance; + public static CommandGetLastMessageIdResponse getDefaultInstance() { + return defaultInstance; + } + + public CommandGetLastMessageIdResponse getDefaultInstanceForType() { + return defaultInstance; + } + + private int bitField0_; + // required .pulsar.proto.MessageIdData last_message_id = 1; + public static final int LAST_MESSAGE_ID_FIELD_NUMBER = 1; + private org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData lastMessageId_; + public boolean hasLastMessageId() { + return ((bitField0_ & 0x00000001) == 0x00000001); + } + public org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData getLastMessageId() { + return lastMessageId_; + } + + // required uint64 request_id = 2; + public static final int REQUEST_ID_FIELD_NUMBER = 2; + private long requestId_; + public boolean hasRequestId() { + return ((bitField0_ & 0x00000002) == 0x00000002); + } + public long getRequestId() { + return requestId_; + } + + private void initFields() { + lastMessageId_ = org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData.getDefaultInstance(); + requestId_ = 0L; + } + private byte memoizedIsInitialized = -1; + public final boolean isInitialized() { + byte isInitialized = memoizedIsInitialized; + if (isInitialized != -1) return isInitialized == 1; + + if (!hasLastMessageId()) { + memoizedIsInitialized = 0; + return false; + } + if (!hasRequestId()) { + memoizedIsInitialized = 0; + return false; + } + if (!getLastMessageId().isInitialized()) { + memoizedIsInitialized = 0; + return false; + } + memoizedIsInitialized = 1; + return true; + } + + public void writeTo(com.google.protobuf.CodedOutputStream output) + throws java.io.IOException { + throw new RuntimeException("Cannot use CodedOutputStream"); + } + + public void writeTo(org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream output) + throws java.io.IOException { + getSerializedSize(); + if (((bitField0_ & 0x00000001) == 0x00000001)) { + output.writeMessage(1, lastMessageId_); + } + if (((bitField0_ & 0x00000002) == 0x00000002)) { + output.writeUInt64(2, requestId_); + } + } + + private int memoizedSerializedSize = -1; + public int getSerializedSize() { + int size = memoizedSerializedSize; + if (size != -1) return size; + + size = 0; + if (((bitField0_ & 0x00000001) == 0x00000001)) { + size += com.google.protobuf.CodedOutputStream + .computeMessageSize(1, lastMessageId_); + } + if (((bitField0_ & 0x00000002) == 0x00000002)) { + size += com.google.protobuf.CodedOutputStream + .computeUInt64Size(2, requestId_); + } + memoizedSerializedSize = size; + return size; + } + + private static final long serialVersionUID = 0L; + @java.lang.Override + protected java.lang.Object writeReplace() + throws java.io.ObjectStreamException { + return super.writeReplace(); + } + + public static org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageIdResponse parseFrom( + com.google.protobuf.ByteString data) + throws com.google.protobuf.InvalidProtocolBufferException { + throw new RuntimeException("Disabled"); + } + public static org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageIdResponse parseFrom( + com.google.protobuf.ByteString data, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + throw new RuntimeException("Disabled"); + } + public static org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageIdResponse parseFrom(byte[] data) + throws com.google.protobuf.InvalidProtocolBufferException { + return newBuilder().mergeFrom(data).buildParsed(); + } + public static org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageIdResponse parseFrom( + byte[] data, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return newBuilder().mergeFrom(data, extensionRegistry) + .buildParsed(); + } + public static org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageIdResponse parseFrom(java.io.InputStream input) + throws java.io.IOException { + return newBuilder().mergeFrom(input).buildParsed(); + } + public static org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageIdResponse parseFrom( + java.io.InputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return newBuilder().mergeFrom(input, extensionRegistry) + .buildParsed(); + } + public static org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageIdResponse parseDelimitedFrom(java.io.InputStream input) + throws java.io.IOException { + Builder builder = newBuilder(); + if (builder.mergeDelimitedFrom(input)) { + return builder.buildParsed(); + } else { + return null; + } + } + public static org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageIdResponse parseDelimitedFrom( + java.io.InputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + Builder builder = newBuilder(); + if (builder.mergeDelimitedFrom(input, extensionRegistry)) { + return builder.buildParsed(); + } else { + return null; + } + } + public static org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageIdResponse parseFrom( + com.google.protobuf.CodedInputStream input) + throws java.io.IOException { + return newBuilder().mergeFrom(input).buildParsed(); + } + public static org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageIdResponse parseFrom( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return newBuilder().mergeFrom(input, extensionRegistry) + .buildParsed(); + } + + public static Builder newBuilder() { return Builder.create(); } + public Builder newBuilderForType() { return newBuilder(); } + public static Builder newBuilder(org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageIdResponse prototype) { + return newBuilder().mergeFrom(prototype); + } + public Builder toBuilder() { return newBuilder(this); } + + public static final class Builder extends + com.google.protobuf.GeneratedMessageLite.Builder< + org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageIdResponse, Builder> + implements org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageIdResponseOrBuilder, org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream.ByteBufMessageBuilder { + // Construct using org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageIdResponse.newBuilder() + private final io.netty.util.Recycler.Handle<Builder> handle; + private Builder(io.netty.util.Recycler.Handle<Builder> handle) { + this.handle = handle; + maybeForceBuilderInitialization(); + } + private final static io.netty.util.Recycler<Builder> RECYCLER = new io.netty.util.Recycler<Builder>() { + protected Builder newObject(io.netty.util.Recycler.Handle<Builder> handle) { + return new Builder(handle); + } + }; + + public void recycle() { + clear(); + handle.recycle(this); + } + + private void maybeForceBuilderInitialization() { + } + private static Builder create() { + return RECYCLER.get(); + } + + public Builder clear() { + super.clear(); + lastMessageId_ = org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData.getDefaultInstance(); + bitField0_ = (bitField0_ & ~0x00000001); + requestId_ = 0L; + bitField0_ = (bitField0_ & ~0x00000002); + return this; + } + + public Builder clone() { + return create().mergeFrom(buildPartial()); + } + + public org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageIdResponse getDefaultInstanceForType() { + return org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageIdResponse.getDefaultInstance(); + } + + public org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageIdResponse build() { + org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageIdResponse result = buildPartial(); + if (!result.isInitialized()) { + throw newUninitializedMessageException(result); + } + return result; + } + + private org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageIdResponse buildParsed() + throws com.google.protobuf.InvalidProtocolBufferException { + org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageIdResponse result = buildPartial(); + if (!result.isInitialized()) { + throw newUninitializedMessageException( + result).asInvalidProtocolBufferException(); + } + return result; + } + + public org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageIdResponse buildPartial() { + org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageIdResponse result = org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageIdResponse.RECYCLER.get(); + int from_bitField0_ = bitField0_; + int to_bitField0_ = 0; + if (((from_bitField0_ & 0x00000001) == 0x00000001)) { + to_bitField0_ |= 0x00000001; + } + result.lastMessageId_ = lastMessageId_; + if (((from_bitField0_ & 0x00000002) == 0x00000002)) { + to_bitField0_ |= 0x00000002; + } + result.requestId_ = requestId_; + result.bitField0_ = to_bitField0_; + return result; + } + + public Builder mergeFrom(org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageIdResponse other) { + if (other == org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageIdResponse.getDefaultInstance()) return this; + if (other.hasLastMessageId()) { + mergeLastMessageId(other.getLastMessageId()); + } + if (other.hasRequestId()) { + setRequestId(other.getRequestId()); + } + return this; + } + + public final boolean isInitialized() { + if (!hasLastMessageId()) { + + return false; + } + if (!hasRequestId()) { + + return false; + } + if (!getLastMessageId().isInitialized()) { + + return false; + } + return true; + } + + public Builder mergeFrom(com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + throw new java.io.IOException("Merge from CodedInputStream is disabled"); + } + public Builder mergeFrom( + org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + while (true) { + int tag = input.readTag(); + switch (tag) { + case 0: + + return this; + default: { + if (!input.skipField(tag)) { + + return this; + } + break; + } + case 10: { + org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData.Builder subBuilder = org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData.newBuilder(); + if (hasLastMessageId()) { + subBuilder.mergeFrom(getLastMessageId()); + } + input.readMessage(subBuilder, extensionRegistry); + setLastMessageId(subBuilder.buildPartial()); + subBuilder.recycle(); + break; + } + case 16: { + bitField0_ |= 0x00000002; + requestId_ = input.readUInt64(); + break; + } + } + } + } + + private int bitField0_; + + // required .pulsar.proto.MessageIdData last_message_id = 1; + private org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData lastMessageId_ = org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData.getDefaultInstance(); + public boolean hasLastMessageId() { + return ((bitField0_ & 0x00000001) == 0x00000001); + } + public org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData getLastMessageId() { + return lastMessageId_; + } + public Builder setLastMessageId(org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData value) { + if (value == null) { + throw new NullPointerException(); + } + lastMessageId_ = value; + + bitField0_ |= 0x00000001; + return this; + } + public Builder setLastMessageId( + org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData.Builder builderForValue) { + lastMessageId_ = builderForValue.build(); + + bitField0_ |= 0x00000001; + return this; + } + public Builder mergeLastMessageId(org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData value) { + if (((bitField0_ & 0x00000001) == 0x00000001) && + lastMessageId_ != org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData.getDefaultInstance()) { + lastMessageId_ = + org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData.newBuilder(lastMessageId_).mergeFrom(value).buildPartial(); + } else { + lastMessageId_ = value; + } + + bitField0_ |= 0x00000001; + return this; + } + public Builder clearLastMessageId() { + lastMessageId_ = org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData.getDefaultInstance(); + + bitField0_ = (bitField0_ & ~0x00000001); + return this; + } + + // required uint64 request_id = 2; + private long requestId_ ; + public boolean hasRequestId() { + return ((bitField0_ & 0x00000002) == 0x00000002); + } + public long getRequestId() { + return requestId_; + } + public Builder setRequestId(long value) { + bitField0_ |= 0x00000002; + requestId_ = value; + + return this; + } + public Builder clearRequestId() { + bitField0_ = (bitField0_ & ~0x00000002); + requestId_ = 0L; + + return this; + } + + // @@protoc_insertion_point(builder_scope:pulsar.proto.CommandGetLastMessageIdResponse) + } + + static { + defaultInstance = new CommandGetLastMessageIdResponse(true); + defaultInstance.initFields(); + } + + // @@protoc_insertion_point(class_scope:pulsar.proto.CommandGetLastMessageIdResponse) + } + public interface BaseCommandOrBuilder extends com.google.protobuf.MessageLiteOrBuilder { @@ -20181,6 +21009,14 @@ public final class PulsarApi { // optional .pulsar.proto.CommandSeek seek = 28; boolean hasSeek(); org.apache.pulsar.common.api.proto.PulsarApi.CommandSeek getSeek(); + + // optional .pulsar.proto.CommandGetLastMessageId getLastMessageId = 29; + boolean hasGetLastMessageId(); + org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageId getGetLastMessageId(); + + // optional .pulsar.proto.CommandGetLastMessageIdResponse getLastMessageIdResponse = 30; + boolean hasGetLastMessageIdResponse(); + org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageIdResponse getGetLastMessageIdResponse(); } public static final class BaseCommand extends com.google.protobuf.GeneratedMessageLite @@ -20247,6 +21083,8 @@ public final class PulsarApi { CONSUMER_STATS_RESPONSE(24, 26), REACHED_END_OF_TOPIC(25, 27), SEEK(26, 28), + GET_LAST_MESSAGE_ID(27, 29), + GET_LAST_MESSAGE_ID_RESPONSE(28, 30), ; public static final int CONNECT_VALUE = 2; @@ -20276,6 +21114,8 @@ public final class PulsarApi { public static final int CONSUMER_STATS_RESPONSE_VALUE = 26; public static final int REACHED_END_OF_TOPIC_VALUE = 27; public static final int SEEK_VALUE = 28; + public static final int GET_LAST_MESSAGE_ID_VALUE = 29; + public static final int GET_LAST_MESSAGE_ID_RESPONSE_VALUE = 30; public final int getNumber() { return value; } @@ -20309,6 +21149,8 @@ public final class PulsarApi { case 26: return CONSUMER_STATS_RESPONSE; case 27: return REACHED_END_OF_TOPIC; case 28: return SEEK; + case 29: return GET_LAST_MESSAGE_ID; + case 30: return GET_LAST_MESSAGE_ID_RESPONSE; default: return null; } } @@ -20615,6 +21457,26 @@ public final class PulsarApi { return seek_; } + // optional .pulsar.proto.CommandGetLastMessageId getLastMessageId = 29; + public static final int GETLASTMESSAGEID_FIELD_NUMBER = 29; + private org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageId getLastMessageId_; + public boolean hasGetLastMessageId() { + return ((bitField0_ & 0x10000000) == 0x10000000); + } + public org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageId getGetLastMessageId() { + return getLastMessageId_; + } + + // optional .pulsar.proto.CommandGetLastMessageIdResponse getLastMessageIdResponse = 30; + public static final int GETLASTMESSAGEIDRESPONSE_FIELD_NUMBER = 30; + private org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageIdResponse getLastMessageIdResponse_; + public boolean hasGetLastMessageIdResponse() { + return ((bitField0_ & 0x20000000) == 0x20000000); + } + public org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageIdResponse getGetLastMessageIdResponse() { + return getLastMessageIdResponse_; + } + private void initFields() { type_ = org.apache.pulsar.common.api.proto.PulsarApi.BaseCommand.Type.CONNECT; connect_ = org.apache.pulsar.common.api.proto.PulsarApi.CommandConnect.getDefaultInstance(); @@ -20644,6 +21506,8 @@ public final class PulsarApi { consumerStatsResponse_ = org.apache.pulsar.common.api.proto.PulsarApi.CommandConsumerStatsResponse.getDefaultInstance(); reachedEndOfTopic_ = org.apache.pulsar.common.api.proto.PulsarApi.CommandReachedEndOfTopic.getDefaultInstance(); seek_ = org.apache.pulsar.common.api.proto.PulsarApi.CommandSeek.getDefaultInstance(); + getLastMessageId_ = org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageId.getDefaultInstance(); + getLastMessageIdResponse_ = org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageIdResponse.getDefaultInstance(); } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { @@ -20804,6 +21668,18 @@ public final class PulsarApi { return false; } } + if (hasGetLastMessageId()) { + if (!getGetLastMessageId().isInitialized()) { + memoizedIsInitialized = 0; + return false; + } + } + if (hasGetLastMessageIdResponse()) { + if (!getGetLastMessageIdResponse().isInitialized()) { + memoizedIsInitialized = 0; + return false; + } + } memoizedIsInitialized = 1; return true; } @@ -20900,6 +21776,12 @@ public final class PulsarApi { if (((bitField0_ & 0x08000000) == 0x08000000)) { output.writeMessage(28, seek_); } + if (((bitField0_ & 0x10000000) == 0x10000000)) { + output.writeMessage(29, getLastMessageId_); + } + if (((bitField0_ & 0x20000000) == 0x20000000)) { + output.writeMessage(30, getLastMessageIdResponse_); + } } private int memoizedSerializedSize = -1; @@ -21020,6 +21902,14 @@ public final class PulsarApi { size += com.google.protobuf.CodedOutputStream .computeMessageSize(28, seek_); } + if (((bitField0_ & 0x10000000) == 0x10000000)) { + size += com.google.protobuf.CodedOutputStream + .computeMessageSize(29, getLastMessageId_); + } + if (((bitField0_ & 0x20000000) == 0x20000000)) { + size += com.google.protobuf.CodedOutputStream + .computeMessageSize(30, getLastMessageIdResponse_); + } memoizedSerializedSize = size; return size; } @@ -21189,6 +22079,10 @@ public final class PulsarApi { bitField0_ = (bitField0_ & ~0x04000000); seek_ = org.apache.pulsar.common.api.proto.PulsarApi.CommandSeek.getDefaultInstance(); bitField0_ = (bitField0_ & ~0x08000000); + getLastMessageId_ = org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageId.getDefaultInstance(); + bitField0_ = (bitField0_ & ~0x10000000); + getLastMessageIdResponse_ = org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageIdResponse.getDefaultInstance(); + bitField0_ = (bitField0_ & ~0x20000000); return this; } @@ -21334,6 +22228,14 @@ public final class PulsarApi { to_bitField0_ |= 0x08000000; } result.seek_ = seek_; + if (((from_bitField0_ & 0x10000000) == 0x10000000)) { + to_bitField0_ |= 0x10000000; + } + result.getLastMessageId_ = getLastMessageId_; + if (((from_bitField0_ & 0x20000000) == 0x20000000)) { + to_bitField0_ |= 0x20000000; + } + result.getLastMessageIdResponse_ = getLastMessageIdResponse_; result.bitField0_ = to_bitField0_; return result; } @@ -21424,6 +22326,12 @@ public final class PulsarApi { if (other.hasSeek()) { mergeSeek(other.getSeek()); } + if (other.hasGetLastMessageId()) { + mergeGetLastMessageId(other.getGetLastMessageId()); + } + if (other.hasGetLastMessageIdResponse()) { + mergeGetLastMessageIdResponse(other.getGetLastMessageIdResponse()); + } return this; } @@ -21582,6 +22490,18 @@ public final class PulsarApi { return false; } } + if (hasGetLastMessageId()) { + if (!getGetLastMessageId().isInitialized()) { + + return false; + } + } + if (hasGetLastMessageIdResponse()) { + if (!getGetLastMessageIdResponse().isInitialized()) { + + return false; + } + } return true; } @@ -21886,6 +22806,26 @@ public final class PulsarApi { subBuilder.recycle(); break; } + case 234: { + org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageId.Builder subBuilder = org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageId.newBuilder(); + if (hasGetLastMessageId()) { + subBuilder.mergeFrom(getGetLastMessageId()); + } + input.readMessage(subBuilder, extensionRegistry); + setGetLastMessageId(subBuilder.buildPartial()); + subBuilder.recycle(); + break; + } + case 242: { + org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageIdResponse.Builder subBuilder = org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageIdResponse.newBuilder(); + if (hasGetLastMessageIdResponse()) { + subBuilder.mergeFrom(getGetLastMessageIdResponse()); + } + input.readMessage(subBuilder, extensionRegistry); + setGetLastMessageIdResponse(subBuilder.buildPartial()); + subBuilder.recycle(); + break; + } } } } @@ -23077,6 +24017,92 @@ public final class PulsarApi { return this; } + // optional .pulsar.proto.CommandGetLastMessageId getLastMessageId = 29; + private org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageId getLastMessageId_ = org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageId.getDefaultInstance(); + public boolean hasGetLastMessageId() { + return ((bitField0_ & 0x10000000) == 0x10000000); + } + public org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageId getGetLastMessageId() { + return getLastMessageId_; + } + public Builder setGetLastMessageId(org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageId value) { + if (value == null) { + throw new NullPointerException(); + } + getLastMessageId_ = value; + + bitField0_ |= 0x10000000; + return this; + } + public Builder setGetLastMessageId( + org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageId.Builder builderForValue) { + getLastMessageId_ = builderForValue.build(); + + bitField0_ |= 0x10000000; + return this; + } + public Builder mergeGetLastMessageId(org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageId value) { + if (((bitField0_ & 0x10000000) == 0x10000000) && + getLastMessageId_ != org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageId.getDefaultInstance()) { + getLastMessageId_ = + org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageId.newBuilder(getLastMessageId_).mergeFrom(value).buildPartial(); + } else { + getLastMessageId_ = value; + } + + bitField0_ |= 0x10000000; + return this; + } + public Builder clearGetLastMessageId() { + getLastMessageId_ = org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageId.getDefaultInstance(); + + bitField0_ = (bitField0_ & ~0x10000000); + return this; + } + + // optional .pulsar.proto.CommandGetLastMessageIdResponse getLastMessageIdResponse = 30; + private org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageIdResponse getLastMessageIdResponse_ = org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageIdResponse.getDefaultInstance(); + public boolean hasGetLastMessageIdResponse() { + return ((bitField0_ & 0x20000000) == 0x20000000); + } + public org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageIdResponse getGetLastMessageIdResponse() { + return getLastMessageIdResponse_; + } + public Builder setGetLastMessageIdResponse(org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageIdResponse value) { + if (value == null) { + throw new NullPointerException(); + } + getLastMessageIdResponse_ = value; + + bitField0_ |= 0x20000000; + return this; + } + public Builder setGetLastMessageIdResponse( + org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageIdResponse.Builder builderForValue) { + getLastMessageIdResponse_ = builderForValue.build(); + + bitField0_ |= 0x20000000; + return this; + } + public Builder mergeGetLastMessageIdResponse(org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageIdResponse value) { + if (((bitField0_ & 0x20000000) == 0x20000000) && + getLastMessageIdResponse_ != org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageIdResponse.getDefaultInstance()) { + getLastMessageIdResponse_ = + org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageIdResponse.newBuilder(getLastMessageIdResponse_).mergeFrom(value).buildPartial(); + } else { + getLastMessageIdResponse_ = value; + } + + bitField0_ |= 0x20000000; + return this; + } + public Builder clearGetLastMessageIdResponse() { + getLastMessageIdResponse_ = org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageIdResponse.getDefaultInstance(); + + bitField0_ = (bitField0_ & ~0x20000000); + return this; + } + // @@protoc_insertion_point(builder_scope:pulsar.proto.BaseCommand) } diff --git a/pulsar-common/src/main/proto/PulsarApi.proto b/pulsar-common/src/main/proto/PulsarApi.proto index 964fd52..ca3d2fb 100644 --- a/pulsar-common/src/main/proto/PulsarApi.proto +++ b/pulsar-common/src/main/proto/PulsarApi.proto @@ -135,6 +135,7 @@ enum ProtocolVersion { v9 = 9; // Added end of topic notification v10 = 10;// Added proxy to broker v11 = 11;// C++ consumers before this version are not correctly handling the checksum field + v12 = 12;//Added get topic's last messageId from broker } message CommandConnect { @@ -152,13 +153,13 @@ message CommandConnect { // a Pulsar proxy. In this case the auth info above // will be the auth of the proxy itself optional string original_principal = 7; - - // Original auth role and auth Method that was passed + + // Original auth role and auth Method that was passed // to the proxy. In this case the auth info above - // will be the auth of the proxy itself + // will be the auth of the proxy itself optional string original_auth_data = 8; optional string original_auth_method = 9; - + } message CommandConnected { @@ -202,9 +203,9 @@ message CommandPartitionedTopicMetadata { // Original principal that was verified by // a Pulsar proxy. optional string original_principal = 3; - - // Original auth role and auth Method that was passed - // to the proxy. + + // Original auth role and auth Method that was passed + // to the proxy. optional string original_auth_data = 4; optional string original_auth_method = 5; } @@ -225,13 +226,13 @@ message CommandLookupTopic { required string topic = 1; required uint64 request_id = 2; optional bool authoritative = 3 [default = false]; - + // Original principal that was verified by // a Pulsar proxy. optional string original_principal = 4; - - // Original auth role and auth Method that was passed - // to the proxy. + + // Original auth role and auth Method that was passed + // to the proxy. optional string original_auth_data = 5; optional string original_auth_method = 6; } @@ -340,7 +341,7 @@ message CommandUnsubscribe { message CommandSeek { required uint64 consumer_id = 1; required uint64 request_id = 2; - + optional MessageIdData message_id = 3; } @@ -374,7 +375,7 @@ message CommandSuccess { message CommandProducerSuccess { required uint64 request_id = 1; required string producer_name = 2; - + // The last sequence id that was stored by this producer in the previous session // This will only be meaningful if deduplication has been enabled. optional int64 last_sequence_id = 3 [default = -1]; @@ -443,6 +444,16 @@ message CommandConsumerStatsResponse { optional uint64 msgBacklog = 15; } +message CommandGetLastMessageId { + required uint64 consumer_id = 1; + required uint64 request_id = 2; +} + +message CommandGetLastMessageIdResponse { + required MessageIdData last_message_id = 1; + required uint64 request_id = 2; +} + message BaseCommand { enum Type { CONNECT = 2; @@ -486,6 +497,9 @@ message BaseCommand { REACHED_END_OF_TOPIC = 27; SEEK = 28; + + GET_LAST_MESSAGE_ID = 29; + GET_LAST_MESSAGE_ID_RESPONSE = 30; } required Type type = 1; @@ -524,6 +538,11 @@ message BaseCommand { optional CommandConsumerStatsResponse consumerStatsResponse = 26; optional CommandReachedEndOfTopic reachedEndOfTopic = 27; - + optional CommandSeek seek = 28; + + optional CommandGetLastMessageId getLastMessageId = 29; + optional CommandGetLastMessageIdResponse getLastMessageIdResponse = 30; + + } -- To stop receiving notification emails like this one, please contact mme...@apache.org.