This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 172f050  Added message parser to decode raw headers and payload 
without a Consumer instance (#2019)
172f050 is described below

commit 172f05050318889af825b1e7b71ba6f43da8c5df
Author: Matteo Merli <[email protected]>
AuthorDate: Mon Jun 25 09:56:46 2018 -0700

    Added message parser to decode raw headers and payload without a Consumer 
instance (#2019)
    
    * Added message parser to decode raw headers and payload without a Consumer 
instance
    
    * Added header
---
 .../pulsar/client/impl/MessageParserTest.java      | 155 +++++++++++++++++
 .../apache/pulsar/client/impl/MessageParser.java   | 190 +++++++++++++++++++++
 2 files changed, 345 insertions(+)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageParserTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageParserTest.java
new file mode 100644
index 0000000..a16166a
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageParserTest.java
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import static org.testng.Assert.assertEquals;
+
+import com.google.common.collect.Sets;
+
+import io.netty.buffer.ByteBuf;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.ManagedCursor;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.RawMessage;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.common.api.Commands;
+import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import avro.shaded.com.google.common.collect.Lists;
+
+public class MessageParserTest extends MockedPulsarServiceBaseTest {
+
+    @BeforeMethod
+    @Override
+    public void setup() throws Exception {
+        super.internalSetup();
+
+        admin.clusters().createCluster("test", new 
ClusterData("http://127.0.0.1:"; + BROKER_WEBSERVICE_PORT));
+        admin.tenants().createTenant("my-tenant",
+                new TenantInfo(Sets.newHashSet("appid1", "appid2"), 
Sets.newHashSet("test")));
+        admin.namespaces().createNamespace("my-tenant/my-ns", 
Sets.newHashSet("test"));
+    }
+
+    @AfterMethod
+    @Override
+    public void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    public static String extractKey(RawMessage m) throws Exception {
+        ByteBuf headersAndPayload = m.getHeadersAndPayload();
+        MessageMetadata msgMetadata = 
Commands.parseMessageMetadata(headersAndPayload);
+        return msgMetadata.getPartitionKey();
+    }
+
+    @Test
+    public void testWithoutBatches() throws Exception {
+        String topic = "persistent://my-tenant/my-ns/my-topic";
+        TopicName topicName = TopicName.get(topic);
+
+        int n = 10;
+
+        try (Producer<String> producer = 
pulsarClient.newProducer(Schema.STRING).enableBatching(false).topic(topic)
+                .create()) {
+            for (int i = 0; i < n; i++) {
+                producer.send("hello-" + i);
+            }
+        }
+
+        // Read through raw data
+        ManagedCursor cursor = ((PersistentTopic) 
pulsar.getBrokerService().getTopicReference(topic).get())
+                .getManagedLedger().newNonDurableCursor(PositionImpl.earliest);
+
+        for (int i = 0; i < n; i++) {
+            Entry entry = cursor.readEntriesOrWait(1).get(0);
+
+            List<Message<?>> messages = Lists.newArrayList();
+
+            try {
+                MessageParser.parseMessage(topicName, entry.getLedgerId(), 
entry.getEntryId(), entry.getDataBuffer(),
+                        (messageId, message, payload) -> {
+                            messages.add(message);
+                        });
+            } finally {
+                entry.release();
+            }
+
+            assertEquals(messages.size(), 1);
+
+            assertEquals(messages.get(0).getData(), ("hello-" + i).getBytes());
+        }
+    }
+
+    @Test
+    public void testWithBatches() throws Exception {
+        String topic = "persistent://my-tenant/my-ns/my-topic-with-batch";
+        TopicName topicName = TopicName.get(topic);
+
+        int n = 10;
+
+        Producer<String> producer = 
pulsarClient.newProducer(Schema.STRING).enableBatching(true)
+                .batchingMaxPublishDelay(10, 
TimeUnit.SECONDS).topic(topic).create();
+
+        ManagedCursor cursor = ((PersistentTopic) 
pulsar.getBrokerService().getTopicReference(topic).get())
+                .getManagedLedger().newNonDurableCursor(PositionImpl.earliest);
+
+        for (int i = 0; i < n - 1; i++) {
+            producer.sendAsync("hello-" + i);
+        }
+
+        producer.send("hello-" + (n - 1));
+
+        // Read through raw data
+        assertEquals(cursor.getNumberOfEntriesInBacklog(), 1);
+        Entry entry = cursor.readEntriesOrWait(1).get(0);
+
+        List<Message<?>> messages = Lists.newArrayList();
+
+        try {
+            MessageParser.parseMessage(topicName, entry.getLedgerId(), 
entry.getEntryId(), entry.getDataBuffer(),
+                    (messageId, message, payload) -> {
+                        messages.add(message);
+                    });
+        } finally {
+            entry.release();
+        }
+
+        assertEquals(messages.size(), 10);
+
+        for (int i = 0; i < n; i++) {
+            assertEquals(messages.get(i).getData(), ("hello-" + i).getBytes());
+        }
+
+        producer.close();
+    }
+}
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageParser.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageParser.java
new file mode 100644
index 0000000..5fa1c8c
--- /dev/null
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageParser.java
@@ -0,0 +1,190 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import static com.scurrilous.circe.checksum.Crc32cIntChecksum.computeChecksum;
+import static org.apache.pulsar.common.api.Commands.hasChecksum;
+import static org.apache.pulsar.common.api.Commands.readChecksum;
+
+import io.netty.buffer.ByteBuf;
+
+import java.io.IOException;
+
+import lombok.experimental.UtilityClass;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.common.api.Commands;
+import org.apache.pulsar.common.api.PulsarDecoder;
+import org.apache.pulsar.common.api.proto.PulsarApi;
+import org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData;
+import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
+import org.apache.pulsar.common.compression.CompressionCodec;
+import org.apache.pulsar.common.compression.CompressionCodecProvider;
+import org.apache.pulsar.common.naming.TopicName;
+
+@UtilityClass
+@Slf4j
+public class MessageParser {
+    static interface MessageProcessor {
+        void process(MessageId messageId, Message<?> message, ByteBuf payload);
+    }
+
+    /**
+     * Parse a raw Pulsar entry payload and extract all the individual message 
that may be included in the batch. The
+     * provided {@link MessageProcessor} will be invoked for each individual 
message.
+     */
+    public static void parseMessage(TopicName topicName, long ledgerId, long 
entryId, ByteBuf headersAndPayload,
+            MessageProcessor processor) throws IOException {
+        MessageIdImpl msgId = new MessageIdImpl(ledgerId, entryId, -1);
+
+        MessageIdData.Builder messageIdBuilder = MessageIdData.newBuilder();
+        messageIdBuilder.setLedgerId(ledgerId);
+        messageIdBuilder.setEntryId(entryId);
+        MessageIdData messageId = messageIdBuilder.build();
+
+        MessageMetadata msgMetadata = null;
+        ByteBuf payload = headersAndPayload;
+        ByteBuf uncompressedPayload = null;
+
+        try {
+            if (!verifyChecksum(headersAndPayload, messageId, 
topicName.toString(), "reader")) {
+                // discard message with checksum error
+                return;
+            }
+
+            try {
+                msgMetadata = Commands.parseMessageMetadata(payload);
+            } catch (Throwable t) {
+                log.warn("[{}] Failed to deserialize metadata for message {} - 
Ignoring", topicName, messageId);
+                return;
+            }
+
+            if (msgMetadata.getEncryptionKeysCount() > 0) {
+                throw new IOException("Cannot parse encrypted message " + 
msgMetadata + " on topic " + topicName);
+            }
+
+            uncompressedPayload = uncompressPayloadIfNeeded(messageId, 
msgMetadata, headersAndPayload,
+                    topicName.toString(), "reader");
+
+            if (uncompressedPayload == null) {
+                // Message was discarded on decompression error
+                return;
+            }
+
+            final int numMessages = msgMetadata.getNumMessagesInBatch();
+
+            if (numMessages == 1 && !msgMetadata.hasNumMessagesInBatch()) {
+                final MessageImpl<?> message = new MessageImpl<>(msgId, 
msgMetadata, uncompressedPayload, null, null);
+                processor.process(msgId, message, uncompressedPayload);
+
+                uncompressedPayload.release();
+
+            } else {
+                // handle batch message enqueuing; uncompressed payload has 
all messages in batch
+                receiveIndividualMessagesFromBatch(msgMetadata, 
uncompressedPayload, messageId, null, -1, processor);
+                uncompressedPayload.release();
+            }
+
+        } finally {
+            if (uncompressedPayload != null) {
+                uncompressedPayload.release();
+            }
+
+            messageIdBuilder.recycle();
+            messageId.recycle();
+            msgMetadata.recycle();
+        }
+    }
+
+    public static boolean verifyChecksum(ByteBuf headersAndPayload, 
MessageIdData messageId, String topic,
+            String subscription) {
+        if (hasChecksum(headersAndPayload)) {
+            int checksum = readChecksum(headersAndPayload);
+            int computedChecksum = computeChecksum(headersAndPayload);
+            if (checksum != computedChecksum) {
+                log.error(
+                        "[{}][{}] Checksum mismatch for message at {}:{}. 
Received checksum: 0x{}, Computed checksum: 0x{}",
+                        topic, subscription, messageId.getLedgerId(), 
messageId.getEntryId(),
+                        Long.toHexString(checksum), 
Integer.toHexString(computedChecksum));
+                return false;
+            }
+        }
+
+        return true;
+    }
+
+    public static ByteBuf uncompressPayloadIfNeeded(MessageIdData messageId, 
MessageMetadata msgMetadata,
+            ByteBuf payload, String topic, String subscription) {
+        CompressionCodec codec = 
CompressionCodecProvider.getCompressionCodec(msgMetadata.getCompression());
+        int uncompressedSize = msgMetadata.getUncompressedSize();
+        int payloadSize = payload.readableBytes();
+        if (payloadSize > PulsarDecoder.MaxMessageSize) {
+            // payload size is itself corrupted since it cannot be bigger than 
the MaxMessageSize
+            log.error("[{}][{}] Got corrupted payload message size {} at {}", 
topic, subscription, payloadSize,
+                    messageId);
+            return null;
+        }
+
+        try {
+            ByteBuf uncompressedPayload = codec.decode(payload, 
uncompressedSize);
+            return uncompressedPayload;
+        } catch (IOException e) {
+            log.error("[{}][{}] Failed to decompress message with {} at {}: 
{}", topic, subscription,
+                    msgMetadata.getCompression(), messageId, e.getMessage(), 
e);
+            return null;
+        }
+    }
+
+    public static void receiveIndividualMessagesFromBatch(MessageMetadata 
msgMetadata, ByteBuf uncompressedPayload,
+            MessageIdData messageId, ClientCnx cnx, int partitionIndex, 
MessageProcessor processor) {
+        int batchSize = msgMetadata.getNumMessagesInBatch();
+
+        try {
+            for (int i = 0; i < batchSize; ++i) {
+                PulsarApi.SingleMessageMetadata.Builder 
singleMessageMetadataBuilder = PulsarApi.SingleMessageMetadata
+                        .newBuilder();
+                ByteBuf singleMessagePayload = 
Commands.deSerializeSingleMessageInBatch(uncompressedPayload,
+                        singleMessageMetadataBuilder, i, batchSize);
+
+                if (singleMessageMetadataBuilder.getCompactedOut()) {
+                    // message has been compacted out, so don't send to the 
user
+                    singleMessagePayload.release();
+                    singleMessageMetadataBuilder.recycle();
+
+                    continue;
+                }
+
+                BatchMessageIdImpl batchMessageIdImpl = new 
BatchMessageIdImpl(messageId.getLedgerId(),
+                        messageId.getEntryId(), partitionIndex, i, null);
+                final MessageImpl<?> message = new 
MessageImpl<>(batchMessageIdImpl, msgMetadata,
+                        singleMessageMetadataBuilder.build(), 
singleMessagePayload, cnx, null);
+
+                processor.process(batchMessageIdImpl, message, 
singleMessagePayload);
+
+                singleMessagePayload.release();
+                singleMessageMetadataBuilder.recycle();
+            }
+        } catch (IOException e) {
+            log.warn("Unable to obtain messages in batch", e);
+        }
+    }
+
+}

Reply via email to