This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.8 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 68b0ca4abd001fcfef267b6dc18da26a2bbd19ed Author: Aloys <[email protected]> AuthorDate: Mon Jul 12 22:23:54 2021 +0800 Expose broker entry metadata and deliverAtTime to peekMessages/getMessageById/examineMessage (#11279) (cherry picked from commit ea628541950a2cd02746ef0cfce02528b7824fed) --- .../broker/admin/impl/PersistentTopicsBase.java | 14 ++++ .../broker/service/BrokerEntryMetadataE2ETest.java | 89 +++++++++++++++++++++- .../pulsar/client/admin/internal/TopicsImpl.java | 57 ++++++++++++-- .../org/apache/pulsar/admin/cli/CmdTopics.java | 27 ++++++- .../org/apache/pulsar/client/impl/MessageImpl.java | 7 ++ 5 files changed, 181 insertions(+), 13 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index 234f153..fa3e939 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -89,6 +89,7 @@ import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.impl.MessageIdImpl; import org.apache.pulsar.common.allocator.PulsarByteBufAllocator; +import org.apache.pulsar.common.api.proto.BrokerEntryMetadata; import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition; import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType; import org.apache.pulsar.common.api.proto.KeyValue; @@ -2478,6 +2479,7 @@ public class PersistentTopicsBase extends AdminResource { PositionImpl pos = (PositionImpl) entry.getPosition(); ByteBuf metadataAndPayload = entry.getDataBuffer(); + BrokerEntryMetadata brokerEntryMetadata = Commands.peekBrokerEntryMetadataIfExist(metadataAndPayload); MessageMetadata metadata = Commands.parseMessageMetadata(metadataAndPayload); ResponseBuilder responseBuilder = Response.ok(); @@ -2485,12 +2487,24 @@ public class PersistentTopicsBase extends AdminResource { for (KeyValue keyValue : metadata.getPropertiesList()) { responseBuilder.header("X-Pulsar-PROPERTY-" + keyValue.getKey(), keyValue.getValue()); } + if (brokerEntryMetadata != null) { + if (brokerEntryMetadata.hasBrokerTimestamp()) { + responseBuilder.header("X-Pulsar-Broker-Entry-METADATA-timestamp", + DateFormatter.format(brokerEntryMetadata.getBrokerTimestamp())); + } + if (brokerEntryMetadata.hasIndex()) { + responseBuilder.header("X-Pulsar-Broker-Entry-METADATA-index", brokerEntryMetadata.getIndex()); + } + } if (metadata.hasPublishTime()) { responseBuilder.header("X-Pulsar-publish-time", DateFormatter.format(metadata.getPublishTime())); } if (metadata.hasEventTime()) { responseBuilder.header("X-Pulsar-event-time", DateFormatter.format(metadata.getEventTime())); } + if (metadata.hasDeliverAtTime()) { + responseBuilder.header("X-Pulsar-deliver-at-time", DateFormatter.format(metadata.getDeliverAtTime())); + } if (metadata.hasNumMessagesInBatch()) { responseBuilder.header("X-Pulsar-num-batch-message", metadata.getNumMessagesInBatch()); responseBuilder.header("X-Pulsar-batch-size", metadataAndPayload.readableBytes() diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerEntryMetadataE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerEntryMetadataE2ETest.java index e7d98a8..52c8375 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerEntryMetadataE2ETest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerEntryMetadataE2ETest.java @@ -25,6 +25,9 @@ import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.client.impl.MessageIdImpl; +import org.apache.pulsar.client.impl.MessageImpl; +import org.apache.pulsar.common.api.proto.BrokerEntryMetadata; import org.assertj.core.util.Sets; import org.testng.Assert; import org.testng.annotations.AfterClass; @@ -38,6 +41,7 @@ import org.testng.annotations.Test; @Test(groups = "broker") public class BrokerEntryMetadataE2ETest extends BrokerTestBase { + @DataProvider(name = "subscriptionTypes") public static Object[] subscriptionTypes() { return new Object[] { @@ -97,17 +101,98 @@ public class BrokerEntryMetadataE2ETest extends BrokerTestBase { public void testPeekMessage() throws Exception { final String topic = newTopicName(); final String subscription = "my-sub"; + final long eventTime= 200; + final long deliverAtTime = 300; @Cleanup Producer<byte[]> producer = pulsarClient.newProducer() .topic(topic) .create(); - producer.newMessage().value("hello".getBytes()).send(); + + long sendTime = System.currentTimeMillis(); + producer.newMessage() + .eventTime(eventTime) + .deliverAt(deliverAtTime) + .value("hello".getBytes()) + .send(); admin.topics().createSubscription(topic, subscription, MessageId.earliest); final List<Message<byte[]>> messages = admin.topics().peekMessages(topic, subscription, 1); Assert.assertEquals(messages.size(), 1); - Assert.assertEquals(messages.get(0).getData(), "hello".getBytes()); + MessageImpl message = (MessageImpl) messages.get(0); + Assert.assertEquals(message.getData(), "hello".getBytes()); + Assert.assertEquals(message.getEventTime(), eventTime); + Assert.assertEquals(message.getDeliverAtTime(), deliverAtTime); + Assert.assertTrue(message.getPublishTime() >= sendTime); + + BrokerEntryMetadata entryMetadata = message.getBrokerEntryMetadata(); + Assert.assertEquals(entryMetadata.getIndex(), 0); + Assert.assertTrue(entryMetadata.getBrokerTimestamp() >= sendTime); + } + + @Test(timeOut = 20000) + public void testGetMessageById() throws Exception { + final String topic = newTopicName(); + final String subscription = "my-sub"; + final long eventTime= 200; + final long deliverAtTime = 300; + + @Cleanup + Producer<byte[]> producer = pulsarClient.newProducer() + .topic(topic) + .create(); + + long sendTime = System.currentTimeMillis(); + MessageIdImpl messageId = (MessageIdImpl) producer.newMessage() + .eventTime(eventTime) + .deliverAt(deliverAtTime) + .value("hello".getBytes()) + .send(); + + admin.topics().createSubscription(topic, subscription, MessageId.earliest); + MessageImpl message = (MessageImpl) admin.topics() + .getMessageById(topic, messageId.getLedgerId(), messageId.getEntryId()); + Assert.assertEquals(message.getData(), "hello".getBytes()); + Assert.assertEquals(message.getEventTime(), eventTime); + Assert.assertEquals(message.getDeliverAtTime(), deliverAtTime); + Assert.assertTrue(message.getPublishTime() >= sendTime); + + BrokerEntryMetadata entryMetadata = message.getBrokerEntryMetadata(); + Assert.assertEquals(entryMetadata.getIndex(), 0); + Assert.assertTrue(entryMetadata.getBrokerTimestamp() >= sendTime); + } + + + @Test(timeOut = 20000) + public void testExamineMessage() throws Exception { + final String topic = newTopicName(); + final String subscription = "my-sub"; + final long eventTime= 200; + final long deliverAtTime = 300; + + @Cleanup + Producer<byte[]> producer = pulsarClient.newProducer() + .topic(topic) + .create(); + + long sendTime = System.currentTimeMillis(); + producer.newMessage() + .eventTime(eventTime) + .deliverAt(deliverAtTime) + .value("hello".getBytes()) + .send(); + + admin.topics().createSubscription(topic, subscription, MessageId.earliest); + MessageImpl message = + (MessageImpl) admin.topics().examineMessage(topic, "earliest", 1); + Assert.assertEquals(message.getData(), "hello".getBytes()); + Assert.assertEquals(message.getEventTime(), eventTime); + Assert.assertEquals(message.getDeliverAtTime(), deliverAtTime); + Assert.assertTrue(message.getPublishTime() >= sendTime); + + BrokerEntryMetadata entryMetadata = message.getBrokerEntryMetadata(); + Assert.assertEquals(entryMetadata.getIndex(), 0); + Assert.assertTrue(entryMetadata.getBrokerTimestamp() >= sendTime); } @Test(timeOut = 20000) diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java index 647804f..d46f214 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java @@ -58,6 +58,7 @@ import org.apache.pulsar.client.impl.BatchMessageIdImpl; import org.apache.pulsar.client.impl.MessageIdImpl; import org.apache.pulsar.client.impl.MessageImpl; import org.apache.pulsar.client.impl.ResetCursorData; +import org.apache.pulsar.common.api.proto.BrokerEntryMetadata; import org.apache.pulsar.common.api.proto.KeyValue; import org.apache.pulsar.common.api.proto.MessageMetadata; import org.apache.pulsar.common.api.proto.SingleMessageMetadata; @@ -84,6 +85,7 @@ import org.apache.pulsar.common.policies.data.SubscribeRate; import org.apache.pulsar.common.policies.data.TopicStats; import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.common.util.Codec; +import org.apache.pulsar.common.util.DateFormatter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -95,6 +97,10 @@ public class TopicsImpl extends BaseResource implements Topics { private static final String BATCH_SIZE_HEADER = "X-Pulsar-batch-size"; private static final String MESSAGE_ID = "X-Pulsar-Message-ID"; private static final String PUBLISH_TIME = "X-Pulsar-publish-time"; + private static final String EVENT_TIME = "X-Pulsar-event-time"; + private static final String DELIVER_AT_TIME = "X-Pulsar-deliver-at-time"; + private static final String BROKER_ENTRY_TIMESTAMP = "X-Pulsar-Broker-Entry-METADATA-timestamp"; + private static final String BROKER_ENTRY_INDEX = "X-Pulsar-Broker-Entry-METADATA-index"; // CHECKSTYLE.ON: MemberName public TopicsImpl(WebTarget web, Authentication auth, long readTimeoutMs) { @@ -1466,6 +1472,24 @@ public class TopicsImpl extends BaseResource implements Topics { } String msgId = response.getHeaderString(MESSAGE_ID); + + // build broker entry metadata if exist + String brokerEntryTimestamp = response.getHeaderString(BROKER_ENTRY_TIMESTAMP); + String brokerEntryIndex = response.getHeaderString(BROKER_ENTRY_INDEX); + BrokerEntryMetadata brokerEntryMetadata; + if (brokerEntryTimestamp == null && brokerEntryIndex == null) { + brokerEntryMetadata = null; + } else { + brokerEntryMetadata = new BrokerEntryMetadata(); + if (brokerEntryTimestamp != null) { + brokerEntryMetadata.setBrokerTimestamp(DateFormatter.parse(brokerEntryTimestamp.toString())); + } + + if (brokerEntryIndex != null) { + brokerEntryMetadata.setIndex(Long.parseLong(brokerEntryIndex)); + } + } + MessageMetadata messageMetadata = new MessageMetadata(); try (InputStream stream = (InputStream) response.getEntity()) { byte[] data = new byte[stream.available()]; @@ -1475,7 +1499,17 @@ public class TopicsImpl extends BaseResource implements Topics { MultivaluedMap<String, Object> headers = response.getHeaders(); Object tmp = headers.getFirst(PUBLISH_TIME); if (tmp != null) { - properties.put("publish-time", (String) tmp); + messageMetadata.setPublishTime(DateFormatter.parse(tmp.toString())); + } + + tmp = headers.getFirst(EVENT_TIME); + if (tmp != null) { + messageMetadata.setEventTime(DateFormatter.parse(tmp.toString())); + } + + tmp = headers.getFirst(DELIVER_AT_TIME); + if (tmp != null) { + messageMetadata.setDeliverAtTime(DateFormatter.parse(tmp.toString())); } tmp = headers.getFirst("X-Pulsar-null-value"); @@ -1508,16 +1542,21 @@ public class TopicsImpl extends BaseResource implements Topics { } if (!isEncrypted && response.getHeaderString(BATCH_HEADER) != null) { - return getIndividualMsgsFromBatch(topic, msgId, data, properties, messageMetadata); + return getIndividualMsgsFromBatch(topic, msgId, data, properties, messageMetadata, brokerEntryMetadata); } - return Collections.singletonList(new MessageImpl<byte[]>(topic, msgId, properties, - Unpooled.wrappedBuffer(data), Schema.BYTES, messageMetadata)); + MessageImpl message = new MessageImpl(topic, msgId, properties, + Unpooled.wrappedBuffer(data), Schema.BYTES, messageMetadata); + if (brokerEntryMetadata != null) { + message.setBrokerEntryMetadata(brokerEntryMetadata); + } + return Collections.singletonList(message); } } private List<Message<byte[]>> getIndividualMsgsFromBatch(String topic, String msgId, byte[] data, - Map<String, String> properties, MessageMetadata msgMetadataBuilder) { + Map<String, String> properties, MessageMetadata msgMetadataBuilder, + BrokerEntryMetadata brokerEntryMetadata) { List<Message<byte[]>> ret = new ArrayList<>(); int batchSize = Integer.parseInt(properties.get(BATCH_HEADER)); ByteBuf buf = Unpooled.wrappedBuffer(data); @@ -1532,8 +1571,12 @@ public class TopicsImpl extends BaseResource implements Topics { properties.put(entry.getKey(), entry.getValue()); } } - ret.add(new MessageImpl<>(topic, batchMsgId, properties, singleMessagePayload, - Schema.BYTES, msgMetadataBuilder)); + MessageImpl message = new MessageImpl<>(topic, batchMsgId, properties, singleMessagePayload, + Schema.BYTES, msgMetadataBuilder); + if (brokerEntryMetadata != null) { + message.setBrokerEntryMetadata(brokerEntryMetadata); + } + ret.add(message); } catch (Exception ex) { log.error("Exception occurred while trying to get BatchMsgId: {}", batchMsgId, ex); } diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java index da42686..b751196 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java @@ -52,6 +52,7 @@ import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.impl.BatchMessageIdImpl; import org.apache.pulsar.client.impl.MessageIdImpl; +import org.apache.pulsar.client.impl.MessageImpl; import org.apache.pulsar.common.naming.TopicDomain; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.BacklogQuota; @@ -835,18 +836,36 @@ public class CmdTopics extends CmdBase { List<Message<byte[]>> messages = getTopics().peekMessages(persistentTopic, subName, numMessages); int position = 0; for (Message<byte[]> msg : messages) { + MessageImpl message = (MessageImpl) msg; if (++position != 1) { System.out.println("-------------------------------------------------------------------------\n"); } - if (msg.getMessageId() instanceof BatchMessageIdImpl) { - BatchMessageIdImpl msgId = (BatchMessageIdImpl) msg.getMessageId(); + if (message.getMessageId() instanceof BatchMessageIdImpl) { + BatchMessageIdImpl msgId = (BatchMessageIdImpl) message.getMessageId(); System.out.println("Batch Message ID: " + msgId.getLedgerId() + ":" + msgId.getEntryId() + ":" + msgId.getBatchIndex()); } else { MessageIdImpl msgId = (MessageIdImpl) msg.getMessageId(); System.out.println("Message ID: " + msgId.getLedgerId() + ":" + msgId.getEntryId()); } - if (msg.getProperties().size() > 0) { - System.out.println("Tenants:"); + + System.out.println("Publish time: " + message.getPublishTime()); + System.out.println("Event time: " + message.getEventTime()); + + if (message.getDeliverAtTime() != 0) { + System.out.println("Deliver at time: " + message.getDeliverAtTime()); + } + + if (message.getBrokerEntryMetadata() != null) { + if (message.getBrokerEntryMetadata().hasBrokerTimestamp()) { + System.out.println("Broker entry metadata timestamp: " + message.getBrokerEntryMetadata().getBrokerTimestamp()); + } + if (message.getBrokerEntryMetadata().hasIndex()) { + System.out.println("Broker entry metadata index: " + message.getBrokerEntryMetadata().getIndex()); + } + } + + if (message.getProperties().size() > 0) { + System.out.println("Properties:"); print(msg.getProperties()); } ByteBuf data = Unpooled.wrappedBuffer(msg.getData()); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java index c9370f3..7f0c113 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java @@ -341,6 +341,13 @@ public class MessageImpl<T> implements Message<T> { return 0; } + public long getDeliverAtTime() { + if (msgMetadata.hasDeliverAtTime()) { + return msgMetadata.getDeliverAtTime(); + } + return 0; + } + public boolean isExpired(int messageTTLInSeconds) { return messageTTLInSeconds != 0 && (brokerEntryMetadata == null || !brokerEntryMetadata.hasBrokerTimestamp() ? (System.currentTimeMillis() >
