This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.8 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit b6b512bd9c90b4c12ab0edb268c7ecd0344e8ab8 Author: Jason918 <[email protected]> AuthorDate: Thu Jul 29 11:41:07 2021 +0800 [Issue 11440]. Add complete metadata for admin.topics().examineMessages (#11443) Fixes #11440 see issue 11440 Add all the other non-empty meta data fields in http response headers in `org.apache.pulsar.broker.admin.impl.PersistentTopicsBase#generateResponseWithEntry`. For fields with byte[] type, base64 is used for serialization and deserialization. (cherry picked from commit afe47926b904033c304256d96ade5e6214c51bbd) --- .../broker/admin/impl/PersistentTopicsBase.java | 73 ++++++++++++ .../pulsar/broker/admin/PersistentTopicsTest.java | 46 +++++++- .../pulsar/client/admin/internal/TopicsImpl.java | 125 ++++++++++++++++++++- site2/docs/admin-api-topics.md | 23 ++++ 4 files changed, 264 insertions(+), 3 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index fa3e939..c1f6800 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -29,6 +29,7 @@ import com.google.common.collect.Sets; import io.netty.buffer.ByteBuf; import java.io.IOException; import java.util.ArrayList; +import java.util.Base64; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -92,6 +93,7 @@ import org.apache.pulsar.common.allocator.PulsarByteBufAllocator; import org.apache.pulsar.common.api.proto.BrokerEntryMetadata; import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition; import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType; +import org.apache.pulsar.common.api.proto.EncryptionKeys; import org.apache.pulsar.common.api.proto.KeyValue; import org.apache.pulsar.common.api.proto.MessageMetadata; import 
org.apache.pulsar.common.compression.CompressionCodec; @@ -2519,6 +2521,77 @@ public class PersistentTopicsBase extends AdminResource { } responseBuilder.header("X-Pulsar-Is-Encrypted", metadata.getEncryptionKeysCount() > 0); + if (metadata.hasProducerName()) { + responseBuilder.header("X-Pulsar-producer-name", metadata.getProducerName()); + } + if (metadata.hasSequenceId()) { + responseBuilder.header("X-Pulsar-sequence-id", metadata.getSequenceId()); + } + if (metadata.hasReplicatedFrom()) { + responseBuilder.header("X-Pulsar-replicated-from", metadata.getReplicatedFrom()); + } + for (String replicatedTo : metadata.getReplicateTosList()) { + responseBuilder.header("X-Pulsar-replicated-to", replicatedTo); + } + if (metadata.hasPartitionKey()) { + responseBuilder.header("X-Pulsar-partition-key", metadata.getPartitionKey()); + } + if (metadata.hasCompression()) { + responseBuilder.header("X-Pulsar-compression", metadata.getCompression()); + } + if (metadata.hasUncompressedSize()) { + responseBuilder.header("X-Pulsar-uncompressed-size", metadata.getUncompressedSize()); + } + if (metadata.hasEncryptionAlgo()) { + responseBuilder.header("X-Pulsar-encryption-algo", metadata.getEncryptionAlgo()); + } + for (EncryptionKeys encryptionKeys : metadata.getEncryptionKeysList()) { + responseBuilder.header("X-Pulsar-Base64-encryption-keys", + Base64.getEncoder().encodeToString(encryptionKeys.toByteArray())); + } + if (metadata.hasEncryptionParam()) { + responseBuilder.header("X-Pulsar-Base64-encryption-param", + Base64.getEncoder().encodeToString(metadata.getEncryptionParam())); + } + if (metadata.hasSchemaVersion()) { + responseBuilder.header("X-Pulsar-Base64-schema-version", + Base64.getEncoder().encodeToString(metadata.getSchemaVersion())); + } + if (metadata.hasPartitionKeyB64Encoded()) { + responseBuilder.header("X-Pulsar-partition-key-b64-encoded", metadata.isPartitionKeyB64Encoded()); + } + if (metadata.hasOrderingKey()) { + 
responseBuilder.header("X-Pulsar-Base64-ordering-key", + Base64.getEncoder().encodeToString(metadata.getOrderingKey())); + } + if (metadata.hasMarkerType()) { + responseBuilder.header("X-Pulsar-marker-type", metadata.getMarkerType()); + } + if (metadata.hasTxnidLeastBits()) { + responseBuilder.header("X-Pulsar-txnid-least-bits", metadata.getTxnidLeastBits()); + } + if (metadata.hasTxnidMostBits()) { + responseBuilder.header("X-Pulsar-txnid-most-bits", metadata.getTxnidMostBits()); + } + if (metadata.hasHighestSequenceId()) { + responseBuilder.header("X-Pulsar-highest-sequence-id", metadata.getHighestSequenceId()); + } + if (metadata.hasUuid()) { + responseBuilder.header("X-Pulsar-uuid", metadata.getUuid()); + } + if (metadata.hasNumChunksFromMsg()) { + responseBuilder.header("X-Pulsar-num-chunks-from-msg", metadata.getNumChunksFromMsg()); + } + if (metadata.hasTotalChunkMsgSize()) { + responseBuilder.header("X-Pulsar-total-chunk-msg-size", metadata.getTotalChunkMsgSize()); + } + if (metadata.hasChunkId()) { + responseBuilder.header("X-Pulsar-chunk-id", metadata.getChunkId()); + } + if (metadata.hasNullPartitionKey()) { + responseBuilder.header("X-Pulsar-null-partition-key", metadata.isNullPartitionKey()); + } + // Decode if needed CompressionCodec codec = CompressionCodecProvider.getCompressionCodec(metadata.getCompression()); ByteBuf uncompressedPayload = codec.decode(metadataAndPayload, metadata.getUncompressedSize()); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java index 12b9529..d298147 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java @@ -43,7 +43,6 @@ import javax.ws.rs.WebApplicationException; import javax.ws.rs.container.AsyncResponse; import javax.ws.rs.core.Response; import 
javax.ws.rs.core.UriInfo; - import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.broker.admin.v2.NonPersistentTopics; import org.apache.pulsar.broker.admin.v2.PersistentTopics; @@ -54,12 +53,14 @@ import org.apache.pulsar.broker.web.PulsarWebResource; import org.apache.pulsar.broker.web.RestException; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.admin.internal.TopicsImpl; +import org.apache.pulsar.client.api.CompressionType; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.impl.BatchMessageIdImpl; import org.apache.pulsar.client.impl.MessageIdImpl; +import org.apache.pulsar.client.impl.MessageImpl; import org.apache.pulsar.client.impl.ProducerImpl; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicDomain; @@ -67,7 +68,6 @@ import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.partition.PartitionedTopicMetadata; import org.apache.pulsar.common.policies.data.AuthAction; import org.apache.pulsar.common.policies.data.ClusterData; -import org.apache.pulsar.common.policies.data.ClusterDataImpl; import org.apache.pulsar.common.policies.data.Policies; import org.apache.pulsar.common.policies.data.TenantInfoImpl; import org.apache.pulsar.common.policies.data.TopicStats; @@ -722,6 +722,48 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest { } @Test + public void testExamineMessageMetadata() throws Exception { + TenantInfoImpl tenantInfo = new TenantInfoImpl(Sets.newHashSet("role1", "role2"), Sets.newHashSet("test")); + admin.tenants().createTenant("tenant-xyz", tenantInfo); + admin.namespaces().createNamespace("tenant-xyz/ns-abc", Sets.newHashSet("test")); + final String topicName = "persistent://tenant-xyz/ns-abc/topic-testExamineMessageMetadata"; + + 
admin.topics().createPartitionedTopic(topicName, 2); + Producer<String> producer = pulsarClient.newProducer(Schema.STRING) + .producerName("testExamineMessageMetadataProducer") + .compressionType(CompressionType.LZ4) + .topic(topicName + "-partition-0") + .create(); + + producer.newMessage() + .keyBytes("partition123".getBytes()) + .orderingKey(new byte[]{0}) + .replicationClusters(Lists.newArrayList("a", "b")) + .sequenceId(112233) + .value("data") + .send(); + + MessageImpl<byte[]> message = (MessageImpl<byte[]>) admin.topics().examineMessage( + topicName + "-partition-0", "earliest", 1); + + //test long + Assert.assertEquals(112233, message.getSequenceId()); + //test byte[] + Assert.assertEquals(new byte[]{0}, message.getOrderingKey()); + //test bool and byte[] + Assert.assertEquals("partition123".getBytes(), message.getKeyBytes()); + Assert.assertTrue(message.hasBase64EncodedKey()); + //test arrays + Assert.assertEquals(Lists.newArrayList("a", "b"), message.getReplicateTo()); + //test string + Assert.assertEquals(producer.getProducerName(), message.getProducerName()); + //test enum + Assert.assertEquals(CompressionType.LZ4.ordinal(), message.getMessageBuilder().getCompression().ordinal()); + + Assert.assertEquals("data", new String(message.getData())); + } + + @Test public void testOffloadWithNullMessageId() { final String topicName = "topic-123"; persistentTopics.createNonPartitionedTopic(testTenant, testNamespace, topicName, true); diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java index d46f214..8ddc75d 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java @@ -25,6 +25,7 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import java.io.InputStream; import 
java.util.ArrayList; +import java.util.Base64; import java.util.Collections; import java.util.List; import java.util.Map; @@ -59,6 +60,8 @@ import org.apache.pulsar.client.impl.MessageIdImpl; import org.apache.pulsar.client.impl.MessageImpl; import org.apache.pulsar.client.impl.ResetCursorData; import org.apache.pulsar.common.api.proto.BrokerEntryMetadata; +import org.apache.pulsar.common.api.proto.CompressionType; +import org.apache.pulsar.common.api.proto.EncryptionKeys; import org.apache.pulsar.common.api.proto.KeyValue; import org.apache.pulsar.common.api.proto.MessageMetadata; import org.apache.pulsar.common.api.proto.SingleMessageMetadata; @@ -100,7 +103,29 @@ public class TopicsImpl extends BaseResource implements Topics { private static final String EVENT_TIME = "X-Pulsar-event-time"; private static final String DELIVER_AT_TIME = "X-Pulsar-deliver-at-time"; private static final String BROKER_ENTRY_TIMESTAMP = "X-Pulsar-Broker-Entry-METADATA-timestamp"; - private static final String BROKER_ENTRY_INDEX = "X-Pulsar-Broker-Entry-METADATA-index"; + private static final String BROKER_ENTRY_INDEX = "X-Pulsar-Broker-Entry-METADATA-index"; + private static final String PRODUCER_NAME = "X-Pulsar-producer-name"; + private static final String SEQUENCE_ID = "X-Pulsar-sequence-id"; + private static final String REPLICATED_FROM = "X-Pulsar-replicated-from"; + private static final String PARTITION_KEY = "X-Pulsar-partition-key"; + private static final String COMPRESSION = "X-Pulsar-compression"; + private static final String UNCOMPRESSED_SIZE = "X-Pulsar-uncompressed-size"; + private static final String ENCRYPTION_ALGO = "X-Pulsar-encryption-algo"; + private static final String MARKER_TYPE = "X-Pulsar-marker-type"; + private static final String TXNID_LEAST_BITS = "X-Pulsar-txnid-least-bits"; + private static final String TXNID_MOST_BITS = "X-Pulsar-txnid-most-bits"; + private static final String HIGHEST_SEQUENCE_ID = "X-Pulsar-highest-sequence-id"; + private static final 
String UUID = "X-Pulsar-uuid"; + private static final String NUM_CHUNKS_FROM_MSG = "X-Pulsar-num-chunks-from-msg"; + private static final String TOTAL_CHUNK_MSG_SIZE = "X-Pulsar-total-chunk-msg-size"; + private static final String CHUNK_ID = "X-Pulsar-chunk-id"; + private static final String PARTITION_KEY_B64_ENCODED = "X-Pulsar-partition-key-b64-encoded"; + private static final String NULL_PARTITION_KEY = "X-Pulsar-null-partition-key"; + private static final String REPLICATED_TO = "X-Pulsar-replicated-to"; + private static final String ORDERING_KEY = "X-Pulsar-Base64-ordering-key"; + private static final String SCHEMA_VERSION = "X-Pulsar-Base64-schema-version"; // must equal the header name the broker writes + private static final String ENCRYPTION_PARAM = "X-Pulsar-Base64-encryption-param"; + private static final String ENCRYPTION_KEYS = "X-Pulsar-Base64-encryption-keys"; // CHECKSTYLE.ON: MemberName public TopicsImpl(WebTarget web, Authentication auth, long readTimeoutMs) { @@ -1517,6 +1542,104 @@ public class TopicsImpl extends BaseResource implements Topics { messageMetadata.setNullValue(Boolean.parseBoolean(tmp.toString())); } + tmp = headers.getFirst(PRODUCER_NAME); + if (tmp != null) { + messageMetadata.setProducerName(tmp.toString()); + } + tmp = headers.getFirst(SEQUENCE_ID); + if (tmp != null) { + messageMetadata.setSequenceId(Long.parseLong(tmp.toString())); + } + tmp = headers.getFirst(REPLICATED_FROM); + if (tmp != null) { + messageMetadata.setReplicatedFrom(tmp.toString()); + } + tmp = headers.getFirst(PARTITION_KEY); + if (tmp != null) { + messageMetadata.setPartitionKey(tmp.toString()); + } + tmp = headers.getFirst(COMPRESSION); + if (tmp != null) { + messageMetadata.setCompression(CompressionType.valueOf(tmp.toString())); + } + tmp = headers.getFirst(UNCOMPRESSED_SIZE); + if (tmp != null) { + messageMetadata.setUncompressedSize(Integer.parseInt(tmp.toString())); + } + tmp = headers.getFirst(ENCRYPTION_ALGO); + if (tmp != null) { + messageMetadata.setEncryptionAlgo(tmp.toString()); + 
} + tmp = headers.getFirst(PARTITION_KEY_B64_ENCODED); + if (tmp != null) { + messageMetadata.setPartitionKeyB64Encoded(Boolean.parseBoolean(tmp.toString())); + } + tmp = headers.getFirst(MARKER_TYPE); + if (tmp != null) { + messageMetadata.setMarkerType(Integer.parseInt(tmp.toString())); + } + tmp = headers.getFirst(TXNID_LEAST_BITS); + if (tmp != null) { + messageMetadata.setTxnidLeastBits(Long.parseLong(tmp.toString())); + } + tmp = headers.getFirst(TXNID_MOST_BITS); + if (tmp != null) { + messageMetadata.setTxnidMostBits(Long.parseLong(tmp.toString())); + } + tmp = headers.getFirst(HIGHEST_SEQUENCE_ID); + if (tmp != null) { + messageMetadata.setHighestSequenceId(Long.parseLong(tmp.toString())); + } + tmp = headers.getFirst(UUID); + if (tmp != null) { + messageMetadata.setUuid(tmp.toString()); + } + tmp = headers.getFirst(NUM_CHUNKS_FROM_MSG); + if (tmp != null) { + messageMetadata.setNumChunksFromMsg(Integer.parseInt(tmp.toString())); + } + tmp = headers.getFirst(TOTAL_CHUNK_MSG_SIZE); + if (tmp != null) { + messageMetadata.setTotalChunkMsgSize(Integer.parseInt(tmp.toString())); + } + tmp = headers.getFirst(CHUNK_ID); + if (tmp != null) { + messageMetadata.setChunkId(Integer.parseInt(tmp.toString())); + } + tmp = headers.getFirst(NULL_PARTITION_KEY); + if (tmp != null) { + messageMetadata.setNullPartitionKey(Boolean.parseBoolean(tmp.toString())); + } + tmp = headers.getFirst(ENCRYPTION_PARAM); + if (tmp != null) { + messageMetadata.setEncryptionParam(Base64.getDecoder().decode(tmp.toString())); + } + tmp = headers.getFirst(ORDERING_KEY); + if (tmp != null) { + messageMetadata.setOrderingKey(Base64.getDecoder().decode(tmp.toString())); + } + tmp = headers.getFirst(SCHEMA_VERSION); + if (tmp != null) { + messageMetadata.setSchemaVersion(Base64.getDecoder().decode(tmp.toString())); + } + tmp = headers.getFirst(ENCRYPTION_PARAM); + if (tmp != null) { + messageMetadata.setEncryptionParam(Base64.getDecoder().decode(tmp.toString())); + } + List<Object> tmpList = 
headers.get(REPLICATED_TO); + if (tmpList != null) { + for (Object o : tmpList) { + messageMetadata.addReplicateTo(o.toString()); + } + } + tmpList = headers.get(ENCRYPTION_KEYS); + if (tmpList != null) { + for (Object o : tmpList) { + EncryptionKeys encryptionKey = messageMetadata.addEncryptionKey(); + encryptionKey.parseFrom(Base64.getDecoder().decode(o.toString())); + } + } + tmp = headers.getFirst(BATCH_SIZE_HEADER); if (tmp != null) { properties.put(BATCH_SIZE_HEADER, (String) tmp); diff --git a/site2/docs/admin-api-topics.md b/site2/docs/admin-api-topics.md index ae344f9..1f4b6fb3 100644 --- a/site2/docs/admin-api-topics.md +++ b/site2/docs/admin-api-topics.md @@ -538,6 +538,29 @@ admin.topics().getMessageById(topic, ledgerId, entryId); <!--END_DOCUSAURUS_CODE_TABS--> +### Examine messages + +You can examine a specific message on a topic by position relative to the earliest or the latest message. + +<!--DOCUSAURUS_CODE_TABS--> +<!--pulsar-admin--> +```shell +./bin/pulsar-admin topics examine-messages \ + persistent://public/default/my-topic \ + -i latest -m 1 +``` + +<!--REST API--> +{@inject: endpoint|GET|/admin/v2/:schema/:tenant/:namespace/:topic/examinemessage?initialPosition=:initialPosition&messagePosition=:messagePosition|operation/examineMessage?version=[[pulsar:version_number]]} + +<!--Java--> +```java +String topic = "persistent://my-tenant/my-namespace/my-topic"; +admin.topics().examineMessage(topic, "latest", 1); +``` + +<!--END_DOCUSAURUS_CODE_TABS--> + ### Skip messages You can skip a number of messages for a specific subscription of a given topic in the following ways.
