This is an automated email from the ASF dual-hosted git repository. eolivelli pushed a commit to branch branch-2.9 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit fa86b277ad8c6b1789caf5f106bdf1a63a8ffb4a Author: Yunze Xu <[email protected]> AuthorDate: Wed Oct 6 16:39:28 2021 +0800 [PIP 96] Add message payload processor for Pulsar client (#12088) --- .../pulsar/client/processor/CustomBatchFormat.java | 74 +++++++ .../processor/CustomBatchPayloadProcessor.java | 61 ++++++ .../client/processor/CustomBatchProducer.java | 70 ++++++ .../processor/DefaultProcessorWithRefCnt.java | 47 ++++ .../processor/MessagePayloadProcessorTest.java | 220 +++++++++++++++++++ .../apache/pulsar/client/api/ConsumerBuilder.java | 8 + .../apache/pulsar/client/api/MessagePayload.java | 42 ++++ .../pulsar/client/api/MessagePayloadContext.java | 80 +++++++ .../pulsar/client/api/MessagePayloadFactory.java | 46 ++++ .../pulsar/client/api/MessagePayloadProcessor.java | 70 ++++++ .../PulsarClientImplementationBinding.java | 3 + .../pulsar/client/impl/ConsumerBuilderImpl.java | 7 + .../apache/pulsar/client/impl/ConsumerImpl.java | 236 +++++++++++++++------ .../client/impl/MessagePayloadContextImpl.java | 148 +++++++++++++ .../client/impl/MessagePayloadFactoryImpl.java | 37 ++++ .../pulsar/client/impl/MessagePayloadImpl.java | 72 +++++++ .../pulsar/client/impl/MessagePayloadUtils.java | 34 +++ .../PulsarClientImplementationBindingImpl.java | 4 + .../impl/conf/ConsumerConfigurationData.java | 4 + .../pulsar/client/api/MessagePayloadTest.java | 104 +++++++++ 20 files changed, 1297 insertions(+), 70 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/processor/CustomBatchFormat.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/processor/CustomBatchFormat.java new file mode 100644 index 0000000..571d292 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/processor/CustomBatchFormat.java @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.processor; + +import io.netty.buffer.ByteBuf; +import lombok.AllArgsConstructor; +import lombok.Getter; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.common.allocator.PulsarByteBufAllocator; + +/** + * A batch message whose format is customized. + * + * 1. The first 2 bytes represent the number of messages. + * 2. Each message is a string, whose format is + * 1. The first 2 bytes represent the length `N`. + * 2. The following N bytes are the bytes of the string. 
+ */ +public class CustomBatchFormat { + + public static final String KEY = "entry.format"; + public static final String VALUE = "custom"; + + @AllArgsConstructor + @Getter + public static class Metadata { + private final int numMessages; + } + + public static ByteBuf serialize(Iterable<String> strings) { + final ByteBuf buf = PulsarByteBufAllocator.DEFAULT.buffer(1024); + buf.writeShort(0); + short numMessages = 0; + for (String s : strings) { + writeString(buf, s); + numMessages++; + } + buf.setShort(0, numMessages); + return buf; + } + + private static void writeString(final ByteBuf buf, final String s) { + final byte[] bytes = Schema.STRING.encode(s); + buf.writeShort(bytes.length); + buf.writeBytes(bytes); + } + + public static Metadata readMetadata(final ByteBuf buf) { + return new Metadata(buf.readShort()); + } + + public static byte[] readMessage(final ByteBuf buf) { + final short length = buf.readShort(); + final byte[] bytes = new byte[length]; + buf.readBytes(bytes); + return bytes; + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/processor/CustomBatchPayloadProcessor.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/processor/CustomBatchPayloadProcessor.java new file mode 100644 index 0000000..c83b13b --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/processor/CustomBatchPayloadProcessor.java @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.processor; + +import io.netty.buffer.ByteBuf; +import java.util.function.Consumer; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.client.api.MessagePayloadContext; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessagePayload; +import org.apache.pulsar.client.api.MessagePayloadFactory; +import org.apache.pulsar.client.api.MessagePayloadProcessor; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.impl.MessagePayloadUtils; + +@Slf4j +public class CustomBatchPayloadProcessor implements MessagePayloadProcessor { + + @Override + public <T> void process(MessagePayload payload, MessagePayloadContext context, Schema<T> schema, + Consumer<Message<T>> messageConsumer) throws Exception { + final String value = context.getProperty(CustomBatchFormat.KEY); + if (value == null || !value.equals(CustomBatchFormat.VALUE)) { + DEFAULT.process(payload, context, schema, messageConsumer); + return; + } + + final ByteBuf buf = MessagePayloadUtils.convertToByteBuf(payload); + try { + final int numMessages = CustomBatchFormat.readMetadata(buf).getNumMessages(); + for (int i = 0; i < numMessages; i++) { + final MessagePayload singlePayload = + MessagePayloadFactory.DEFAULT.wrap(CustomBatchFormat.readMessage(buf)); + try { + messageConsumer.accept( + context.getMessageAt(i, numMessages, singlePayload, false, schema)); + } finally { + singlePayload.release(); + } + } + } finally { + buf.release(); + } + } +} diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/processor/CustomBatchProducer.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/processor/CustomBatchProducer.java new file mode 100644 index 0000000..bc07206 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/processor/CustomBatchProducer.java @@ -0,0 +1,70 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.client.processor; + +import io.netty.buffer.ByteBuf; +import java.util.ArrayList; +import java.util.List; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.broker.service.persistent.PersistentTopic; +import org.apache.pulsar.common.api.proto.MessageMetadata; +import org.apache.pulsar.common.protocol.Commands; + +@RequiredArgsConstructor +@Slf4j +public class CustomBatchProducer { + + private final List<String> messages = new ArrayList<>(); + private final PersistentTopic persistentTopic; + private final int batchingMaxMessages; + + public void sendAsync(final String value) { + messages.add(value); + if (messages.size() >= batchingMaxMessages) { + flush(); + } + } + + public void flush() { + final ByteBuf buf = CustomBatchFormat.serialize(messages); + final ByteBuf headerAndPayload = Commands.serializeMetadataAndPayload(Commands.ChecksumType.None, + createCustomMetadata(), buf); + buf.release(); + persistentTopic.publishMessage(headerAndPayload, (e, ledgerId, entryId) -> { + if (e == null) { + log.info("Send successfully to {} ({}, {})", persistentTopic.getName(), ledgerId, entryId); + } else { + log.error("Failed to send: {}", e.getMessage()); + } + }); + messages.clear(); + } + + private static MessageMetadata createCustomMetadata() { + final MessageMetadata messageMetadata = new MessageMetadata(); + // Here are required fields + messageMetadata.setProducerName(""); + messageMetadata.setSequenceId(0L); + messageMetadata.setPublishTime(0L); + // Add the property to identify the message format + messageMetadata.addProperty().setKey(CustomBatchFormat.KEY).setValue(CustomBatchFormat.VALUE); + return messageMetadata; + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/processor/DefaultProcessorWithRefCnt.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/processor/DefaultProcessorWithRefCnt.java new file mode 100644 index 0000000..63e295f --- /dev/null +++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/processor/DefaultProcessorWithRefCnt.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.processor; + +import java.util.function.Consumer; +import lombok.Getter; +import org.apache.pulsar.client.api.MessagePayloadContext; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessagePayload; +import org.apache.pulsar.client.api.MessagePayloadProcessor; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.impl.MessagePayloadImpl; + +/** + * A processor for Pulsar format messages that also maintains a total reference count. + * + * It's used to verify {@link MessagePayloadContext#getMessageAt} and {@link MessagePayloadContext#asSingleMessage} have released the + * ByteBuf successfully. 
+ */ +public class DefaultProcessorWithRefCnt implements MessagePayloadProcessor { + + @Getter + int totalRefCnt = 0; + + @Override + public <T> void process(MessagePayload payload, MessagePayloadContext context, Schema<T> schema, + Consumer<Message<T>> messageConsumer) throws Exception { + MessagePayloadProcessor.DEFAULT.process(payload, context, schema, messageConsumer); + totalRefCnt += ((MessagePayloadImpl) payload).getByteBuf().refCnt(); + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/processor/MessagePayloadProcessorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/processor/MessagePayloadProcessorTest.java new file mode 100644 index 0000000..6a12101 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/processor/MessagePayloadProcessorTest.java @@ -0,0 +1,220 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.client.processor; + +import com.google.common.collect.Sets; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.TimeUnit; +import io.netty.buffer.ByteBuf; +import lombok.Cleanup; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.broker.service.persistent.PersistentTopic; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessagePayloadProcessor; +import org.apache.pulsar.client.api.MessageRouter; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.ProducerConsumerBase; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.SubscriptionInitialPosition; +import org.apache.pulsar.client.api.TopicMetadata; +import org.apache.pulsar.common.policies.data.ClusterData; +import org.apache.pulsar.common.policies.data.TenantInfoImpl; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +/** + * Test for {@link MessagePayloadProcessor}. 
+ */ +@Slf4j +@Test(groups = "broker-impl") +public class MessagePayloadProcessorTest extends ProducerConsumerBase { + + @BeforeClass + @Override + protected void setup() throws Exception { + super.internalSetup(); + admin.clusters().createCluster("test", + ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build()); + admin.tenants().createTenant("public", + new TenantInfoImpl(Sets.newHashSet("appid"), Sets.newHashSet("test"))); + admin.namespaces().createNamespace("public/default", Sets.newHashSet("test")); + } + + @AfterClass + @Override + protected void cleanup() throws Exception { + super.internalCleanup(); + } + + @DataProvider + public static Object[][] config() { + return new Object[][] { + // numPartitions / enableBatching / batchingMaxMessages + { 1, true, 1 }, + { 1, true, 4 }, + { 1, false, 1 }, + { 3, false, 1 } + }; + } + + @DataProvider + public static Object[][] customBatchConfig() { + return new Object[][] { + // numMessages / batchingMaxMessages + { 10, 1 }, + { 10, 4 } + }; + } + + @Test(dataProvider = "config") + public void testDefaultProcessor(int numPartitions, boolean enableBatching, int batchingMaxMessages) + throws Exception { + final String topic = "testDefaultProcessor-" + numPartitions + "-" + enableBatching + "-" + batchingMaxMessages; + final int numMessages = 10; + final String messagePrefix = "msg-"; + + if (numPartitions > 1) { + admin.topics().createPartitionedTopic(topic, numPartitions); + } + + @Cleanup + final Producer<String> producer = pulsarClient.newProducer(Schema.STRING) + .topic(topic) + .enableBatching(enableBatching) + .batchingMaxMessages(batchingMaxMessages) + .batchingMaxPublishDelay(100, TimeUnit.MILLISECONDS) + .messageRouter(new MessageRouter() { + int i = 0; + + @Override + public int choosePartition(Message<?> msg, TopicMetadata metadata) { + return i++ % metadata.numPartitions(); + } + }) + .create(); + for (int i = 0; i < numMessages; i++) { + final String value = messagePrefix + i; + 
producer.sendAsync(value).whenComplete((id, e) -> { + if (e == null) { + log.info("Send {} to {} {}", value, topic, id); + } else { + log.error("Failed to send {}: {}", value, e.getMessage()); + } + }); + } + + final DefaultProcessorWithRefCnt processor = new DefaultProcessorWithRefCnt(); + @Cleanup + final Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING) + .topic(topic) + .subscriptionName("sub") + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) + .messagePayloadProcessor(processor) + .subscribe(); + final List<String> values = new ArrayList<>(); + for (int i = 0; i < numMessages; i++) { + final Message<String> message = consumer.receive(1, TimeUnit.SECONDS); + Assert.assertNotNull(message); + values.add(message.getValue()); + consumer.acknowledge(message.getMessageId()); + } + + if (numPartitions > 1) { + // messages are out of order across multiple partitions + Collections.sort(values); + } + for (int i = 0; i < numMessages; i++) { + Assert.assertEquals(values.get(i), messagePrefix + i); + } + + // Each buffer's refCnt is 2 because after retrieving the refCnt, there will be two release for the ByteBuf: + // 1. ConsumerImpl#processPayloadByProcessor + // 2. PulsarDecoder#channelRead + if (enableBatching) { + int numBatches = numMessages / batchingMaxMessages; + numBatches += (numMessages % batchingMaxMessages == 0) ? 
0 : 1; + Assert.assertEquals(processor.getTotalRefCnt(), 2 * numBatches); + } else { + Assert.assertEquals(processor.getTotalRefCnt(), 2 * numMessages); + } + } + + @Test + public void testCustomBatchFormat() { + final List<List<String>> inputs = new ArrayList<>(); + inputs.add(Collections.emptyList()); + inputs.add(Collections.singletonList("java")); + inputs.add(Arrays.asList("hello", "world", "java")); + + for (List<String> input : inputs) { + final ByteBuf buf = CustomBatchFormat.serialize(input); + + final CustomBatchFormat.Metadata metadata = CustomBatchFormat.readMetadata(buf); + final List<String> parsedTokens = new ArrayList<>(); + for (int i = 0; i < metadata.getNumMessages(); i++) { + parsedTokens.add(Schema.STRING.decode(CustomBatchFormat.readMessage(buf))); + } + + Assert.assertEquals(parsedTokens, input); + Assert.assertEquals(parsedTokens.size(), input.size()); + + Assert.assertEquals(buf.refCnt(), 1); + buf.release(); + } + } + + @Test(dataProvider = "customBatchConfig") + public void testCustomProcessor(final int numMessages, final int batchingMaxMessages) throws Exception { + final String topic = "persistent://public/default/testCustomProcessor-" + + numMessages + "-" + batchingMaxMessages; + + @Cleanup + final Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING) + .topic(topic) + .subscriptionName("sub") + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) + .messagePayloadProcessor(new CustomBatchPayloadProcessor()) + .subscribe(); + + final PersistentTopic persistentTopic = + (PersistentTopic) pulsar.getBrokerService().getTopicIfExists(topic).get().orElse(null); + Assert.assertNotNull(persistentTopic); + + final String messagePrefix = "msg-"; + final CustomBatchProducer producer = new CustomBatchProducer(persistentTopic, batchingMaxMessages); + for (int i = 0; i < numMessages; i++) { + producer.sendAsync(messagePrefix + i); + } + producer.flush(); + + for (int i = 0; i < numMessages; i++) { + final Message<String> 
message = consumer.receive(1, TimeUnit.SECONDS); + Assert.assertNotNull(message); + Assert.assertEquals(message.getValue(), messagePrefix + i); + consumer.acknowledge(message.getMessageId()); + } + } +} diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java index 1038ba9..3c3ce17 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java @@ -741,4 +741,12 @@ public interface ConsumerBuilder<T> extends Cloneable { * corruption, deserialization error, etc.). */ ConsumerBuilder<T> poolMessages(boolean poolMessages); + + /** + * If it's configured with a non-null value, the consumer will use the processor to process the payload, including + * decoding it to messages and triggering the listener. + * + * Default: null + */ + ConsumerBuilder<T> messagePayloadProcessor(MessagePayloadProcessor payloadProcessor); } diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/MessagePayload.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/MessagePayload.java new file mode 100644 index 0000000..5d2ff63 --- /dev/null +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/MessagePayload.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.api; + +/** + * The abstraction of a message's payload. + */ +public interface MessagePayload { + + /** + * Copy the bytes of the payload into the byte array. + * + * @return the byte array that is filled with the readable bytes of the payload, it should not be null + */ + byte[] copiedBuffer(); + + /** + * Release the resources if necessary. + * + * NOTE: For a MessagePayload object that is created from {@link MessagePayloadFactory#DEFAULT}, this method must be + * called to avoid memory leak. + */ + default void release() { + // No ops + } +} diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/MessagePayloadContext.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/MessagePayloadContext.java new file mode 100644 index 0000000..a444de7 --- /dev/null +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/MessagePayloadContext.java @@ -0,0 +1,80 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.api; + +/** + * The context of the message payload, which usually represents a batched message (batch) or a single message. + */ +public interface MessagePayloadContext { + + /** + * Get a value associated with the given key. + * + * When the message payload is not produced by Pulsar producer, a specific property is usually added to indicate the + * format. So this method is useful to determine whether the payload is produced by Pulsar producer. + * + * @param key + * @return the value associated with the key or null if the key or value doesn't exist + */ + String getProperty(String key); + + /** + * Get the number of messages when the payload is produced by Pulsar producer. + * + * @return the number of messages + */ + int getNumMessages(); + + /** + * Check whether the payload is a batch when the payload is produced by Pulsar producer. + * + * @return true if the payload is a batch + */ + boolean isBatch(); + + /** + * Get the internal single message with a specific index from a payload if the payload is a batch. + * + * @param index the batch index + * @param numMessages the number of messages in the batch + * @param payload the message payload + * @param containMetadata whether the payload contains the single message metadata + * @param schema the schema of the batch + * @param <T> + * @return the created message + * @implNote The `index` and `numMessages` parameters are used to create the message id with batch index. 
+ * If `containMetadata` is true, parse the single message metadata from the payload first. The fields of single + * message metadata will overwrite the same fields of the entry's metadata. + */ + <T> Message<T> getMessageAt(int index, + int numMessages, + MessagePayload payload, + boolean containMetadata, + Schema<T> schema); + + /** + * Convert the given payload to a single message if the entry is not a batch. + * + * @param payload the message payload + * @param schema the schema of the message + * @param <T> + * @return the created message + */ + <T> Message<T> asSingleMessage(MessagePayload payload, Schema<T> schema); +} diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/MessagePayloadFactory.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/MessagePayloadFactory.java new file mode 100644 index 0000000..0181d6d --- /dev/null +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/MessagePayloadFactory.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.client.api; + +import java.nio.ByteBuffer; +import org.apache.pulsar.client.internal.DefaultImplementation; + +/** + * The factory class of {@link MessagePayload}. + */ +public interface MessagePayloadFactory { + + MessagePayloadFactory DEFAULT = DefaultImplementation.getDefaultImplementation().newDefaultMessagePayloadFactory(); + + /** + * Create a payload whose underlying buffer refers to a byte array. + * + * @param bytes the byte array + * @return the created MessagePayload object + */ + MessagePayload wrap(byte[] bytes); + + /** + * Create a payload whose underlying buffer refers to a NIO buffer. + * + * @param buffer the NIO buffer + * @return the created MessagePayload object + */ + MessagePayload wrap(ByteBuffer buffer); +} diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/MessagePayloadProcessor.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/MessagePayloadProcessor.java new file mode 100644 index 0000000..1d50a50 --- /dev/null +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/MessagePayloadProcessor.java @@ -0,0 +1,70 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.client.api; + +import java.util.function.Consumer; + +/** + * The processor to process a message payload. + * + * It's responsible for converting the raw buffer to some messages, then triggering some callbacks so that consumer can consume + * these messages and handle any exception that occurs. + * + * The most important part is to decode the raw buffer. After that, we can call + * {@link MessagePayloadContext#getMessageAt} or {@link MessagePayloadContext#asSingleMessage} to construct + * {@link Message} for consumer to consume. Since we need to pass the {@link MessagePayload} object to these methods, we + * can use {@link MessagePayloadFactory#DEFAULT} to create it or just reuse the payload argument. + */ +public interface MessagePayloadProcessor { + + /** + * Process the message payload. + * + * @param payload the payload whose underlying buffer is a Netty ByteBuf + * @param context the message context that contains the message format information and methods to create a message + * @param schema the message's schema + * @param messageConsumer the callback to consume each message + * @param <T> + * @throws Exception + */ + <T> void process(MessagePayload payload, + MessagePayloadContext context, + Schema<T> schema, + Consumer<Message<T>> messageConsumer) throws Exception; + + // The default processor for Pulsar format payload. It should be noted that the getNumMessages() and isBatch() methods of + // MessagePayloadContext only work for Pulsar format. For other formats, the message metadata might be stored in the payload. 
+ MessagePayloadProcessor DEFAULT = new MessagePayloadProcessor() { + + @Override + public <T> void process(MessagePayload payload, + MessagePayloadContext context, + Schema<T> schema, + Consumer<Message<T>> messageConsumer) { + if (context.isBatch()) { + final int numMessages = context.getNumMessages(); + for (int i = 0; i < numMessages; i++) { + messageConsumer.accept(context.getMessageAt(i, numMessages, payload, true, schema)); + } + } else { + messageConsumer.accept(context.asSingleMessage(payload, schema)); + } + } + }; +} diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/PulsarClientImplementationBinding.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/PulsarClientImplementationBinding.java index 98b4d5c..f7bcf05 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/PulsarClientImplementationBinding.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/PulsarClientImplementationBinding.java @@ -21,6 +21,7 @@ import org.apache.pulsar.client.api.Authentication; import org.apache.pulsar.client.api.BatcherBuilder; import org.apache.pulsar.client.api.ClientBuilder; import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.MessagePayloadFactory; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.schema.GenericRecord; @@ -229,6 +230,8 @@ public interface PulsarClientImplementationBinding { BatcherBuilder newKeyBasedBatcherBuilder(); + MessagePayloadFactory newDefaultMessagePayloadFactory(); + /** * Retrieves ByteBuffer data into byte[]. 
* diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java index dba18d9..cbfc27d 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java @@ -44,6 +44,7 @@ import org.apache.pulsar.client.api.DeadLetterPolicy; import org.apache.pulsar.client.api.KeySharedPolicy; import org.apache.pulsar.client.api.MessageCrypto; import org.apache.pulsar.client.api.MessageListener; +import org.apache.pulsar.client.api.MessagePayloadProcessor; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.PulsarClientException.InvalidConfigurationException; import org.apache.pulsar.client.api.RegexSubscriptionMode; @@ -479,4 +480,10 @@ public class ConsumerBuilderImpl<T> implements ConsumerBuilder<T> { conf.setPoolMessages(poolMessages); return this; } + + @Override + public ConsumerBuilder<T> messagePayloadProcessor(MessagePayloadProcessor payloadProcessor) { + conf.setPayloadProcessor(payloadProcessor); + return this; + } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index 98767d1..97fb13f 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -48,9 +48,9 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import 
java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLongFieldUpdater; @@ -996,6 +996,135 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle }); } + protected boolean isBatch(MessageMetadata messageMetadata) { + // if message is not decryptable then it can't be parsed as a batch-message. so, add EncyrptionCtx to message + // and return undecrypted payload + return !isMessageUndecryptable(messageMetadata) && + (messageMetadata.hasNumMessagesInBatch() || messageMetadata.getNumMessagesInBatch() != 1); + } + + protected <U> MessageImpl<U> newSingleMessage(final int index, + final int numMessages, + final BrokerEntryMetadata brokerEntryMetadata, + final MessageMetadata msgMetadata, + final SingleMessageMetadata singleMessageMetadata, + final ByteBuf payload, + final MessageIdImpl messageId, + final Schema<U> schema, + final boolean containMetadata, + final BitSetRecyclable ackBitSet, + final BatchMessageAcker acker, + final int redeliveryCount) { + if (log.isDebugEnabled()) { + log.debug("[{}] [{}] processing message num - {} in batch", subscription, consumerName, index); + } + + ByteBuf singleMessagePayload = null; + try { + if (containMetadata) { + singleMessagePayload = + Commands.deSerializeSingleMessageInBatch(payload, singleMessageMetadata, index, numMessages); + } + + if (isSameEntry(messageId) && isPriorBatchIndex(index)) { + // If we are receiving a batch message, we need to discard messages that were prior + // to the startMessageId + if (log.isDebugEnabled()) { + log.debug("[{}] [{}] Ignoring message from before the startMessageId: {}", subscription, + consumerName, startMessageId); + } + return null; + } + + if (singleMessageMetadata != null && singleMessageMetadata.isCompactedOut()) { + // message has been compacted out, so don't send to the user + return null; + } + + if (ackBitSet != null && !ackBitSet.get(index)) { + return null; + } 
+ + BatchMessageIdImpl batchMessageIdImpl = new BatchMessageIdImpl(messageId.getLedgerId(), + messageId.getEntryId(), getPartitionIndex(), index, numMessages, acker); + + final ByteBuf payloadBuffer = (singleMessagePayload != null) ? singleMessagePayload : payload; + final MessageImpl<U> message = MessageImpl.create(topicName.toString(), batchMessageIdImpl, + msgMetadata, singleMessageMetadata, payloadBuffer, + createEncryptionContext(msgMetadata), cnx(), schema, redeliveryCount, poolMessages + ); + message.setBrokerEntryMetadata(brokerEntryMetadata); + return message; + } catch (IOException | IllegalStateException e) { + throw new IllegalStateException(e); + } finally { + if (singleMessagePayload != null) { + singleMessagePayload.release(); + } + } + } + + protected <U> MessageImpl<U> newMessage(final MessageIdImpl messageId, + final BrokerEntryMetadata brokerEntryMetadata, + final MessageMetadata messageMetadata, + final ByteBuf payload, + final Schema<U> schema, + final int redeliveryCount) { + final MessageImpl<U> message = MessageImpl.create(topicName.toString(), messageId, messageMetadata, payload, + createEncryptionContext(messageMetadata), cnx(), schema, redeliveryCount, poolMessages + ); + message.setBrokerEntryMetadata(brokerEntryMetadata); + return message; + } + + private void executeNotifyCallback(final MessageImpl<T> message) { + // Enqueue the message so that it can be retrieved when application calls receive() + // if the conf.getReceiverQueueSize() is 0 then discard message if no one is waiting for it. 
+ // if asyncReceive is waiting then notify callback without adding to incomingMessages queue + internalPinnedExecutor.execute(() -> { + if (hasNextPendingReceive()) { + notifyPendingReceivedCallback(message, null); + } else if (enqueueMessageAndCheckBatchReceive(message) && hasPendingBatchReceive()) { + notifyPendingBatchReceivedCallBack(); + } + }); + } + + private void processPayloadByProcessor(final BrokerEntryMetadata brokerEntryMetadata, + final MessageMetadata messageMetadata, + final ByteBuf byteBuf, + final MessageIdImpl messageId, + final Schema<T> schema, + final int redeliveryCount, + final List<Long> ackSet) { + final MessagePayloadImpl payload = MessagePayloadImpl.create(byteBuf); + final MessagePayloadContextImpl entryContext = MessagePayloadContextImpl.get( + brokerEntryMetadata, messageMetadata, messageId, this, redeliveryCount, ackSet); + final AtomicInteger skippedMessages = new AtomicInteger(0); + try { + conf.getPayloadProcessor().process(payload, entryContext, schema, message -> { + if (message != null) { + executeNotifyCallback((MessageImpl<T>) message); + } else { + skippedMessages.incrementAndGet(); + } + }); + } catch (Throwable throwable) { + log.warn("[{}] [{}] unable to obtain message in batch", subscription, consumerName, throwable); + discardCorruptedMessage(messageId, cnx(), ValidationError.BatchDeSerializeError); + } finally { + entryContext.recycle(); + payload.release(); // byteBuf.release() is called in this method + } + + if (skippedMessages.get() > 0) { + increaseAvailablePermits(cnx(), skippedMessages.get()); + } + + internalPinnedExecutor.execute(() + -> tryTriggerListener()); + } + void messageReceived(MessageIdData messageId, int redeliveryCount, List<Long> ackSet, ByteBuf headersAndPayload, ClientCnx cnx) { if (log.isDebugEnabled()) { log.debug("[{}][{}] Received message: {}/{}", topic, subscription, messageId.getLedgerId(), @@ -1051,6 +1180,13 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements 
ConnectionHandle return; } + if (conf.getPayloadProcessor() != null) { + // uncompressedPayload is released in this method so we don't need to call release() again + processPayloadByProcessor( + brokerEntryMetadata, msgMetadata, uncompressedPayload, msgId, schema, redeliveryCount, ackSet); + return; + } + // if message is not decryptable then it can't be parsed as a batch-message. so, add EncyrptionCtx to message // and return undecrypted payload if (isMessageUndecryptable || (numMessages == 1 && !msgMetadata.hasNumMessagesInBatch())) { @@ -1063,7 +1199,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle } } - if (isSameEntry(messageId) && isPriorEntryIndex(messageId.getEntryId())) { + if (isSameEntry(msgId) && isPriorEntryIndex(messageId.getEntryId())) { // We need to discard entries that were prior to startMessageId if (log.isDebugEnabled()) { log.debug("[{}] [{}] Ignoring message from before the startMessageId: {}", subscription, @@ -1074,27 +1210,16 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle return; } - final MessageImpl<T> message = MessageImpl.create(topicName.toString(), msgId, msgMetadata, - uncompressedPayload, createEncryptionContext(msgMetadata), cnx, schema, redeliveryCount, - poolMessages); + final MessageImpl<T> message = + newMessage(msgId, brokerEntryMetadata, msgMetadata, uncompressedPayload, schema, redeliveryCount); uncompressedPayload.release(); - message.setBrokerEntryMetadata(brokerEntryMetadata); - // Enqueue the message so that it can be retrieved when application calls receive() - // if the conf.getReceiverQueueSize() is 0 then discard message if no one is waiting for it. 
- // if asyncReceive is waiting then notify callback without adding to incomingMessages queue - internalPinnedExecutor.execute(() -> { - if (deadLetterPolicy != null && possibleSendToDeadLetterTopicMessages != null && - redeliveryCount >= deadLetterPolicy.getMaxRedeliverCount()) { - possibleSendToDeadLetterTopicMessages.put((MessageIdImpl) message.getMessageId(), - Collections.singletonList(message)); - } - if (hasNextPendingReceive()) { - notifyPendingReceivedCallback(message, null); - } else if (enqueueMessageAndCheckBatchReceive(message) && hasPendingBatchReceive()) { - notifyPendingBatchReceivedCallBack(); - } - }); + if (deadLetterPolicy != null && possibleSendToDeadLetterTopicMessages != null && + redeliveryCount >= deadLetterPolicy.getMaxRedeliverCount()) { + possibleSendToDeadLetterTopicMessages.put((MessageIdImpl) message.getMessageId(), + Collections.singletonList(message)); + } + executeNotifyCallback(message); } else { // handle batch message enqueuing; uncompressed payload has all messages in batch receiveIndividualMessagesFromBatch(brokerEntryMetadata, msgMetadata, redeliveryCount, ackSet, uncompressedPayload, messageId, cnx); @@ -1268,63 +1393,23 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle int skippedMessages = 0; try { for (int i = 0; i < batchSize; ++i) { - if (log.isDebugEnabled()) { - log.debug("[{}] [{}] processing message num - {} in batch", subscription, consumerName, i); - } - - ByteBuf singleMessagePayload = Commands.deSerializeSingleMessageInBatch(uncompressedPayload, - singleMessageMetadata, i, batchSize); - - if (isSameEntry(messageId) && isPriorBatchIndex(i)) { - // If we are receiving a batch message, we need to discard messages that were prior - // to the startMessageId - if (log.isDebugEnabled()) { - log.debug("[{}] [{}] Ignoring message from before the startMessageId: {}", subscription, - consumerName, startMessageId); - } - singleMessagePayload.release(); - - ++skippedMessages; + final 
MessageImpl<T> message = newSingleMessage(i, batchSize, brokerEntryMetadata, msgMetadata, + singleMessageMetadata, uncompressedPayload, batchMessage, schema, true, ackBitSet, acker, + redeliveryCount); + if (message == null) { + skippedMessages++; continue; } - - if (singleMessageMetadata.isCompactedOut()) { - // message has been compacted out, so don't send to the user - singleMessagePayload.release(); - - ++skippedMessages; - continue; - } - - if (ackBitSet != null && !ackBitSet.get(i)) { - singleMessagePayload.release(); - ++skippedMessages; - continue; - } - - BatchMessageIdImpl batchMessageIdImpl = new BatchMessageIdImpl(messageId.getLedgerId(), - messageId.getEntryId(), getPartitionIndex(), i, batchSize, acker); - final MessageImpl<T> message = MessageImpl.create(topicName.toString(), batchMessageIdImpl, - msgMetadata, singleMessageMetadata, singleMessagePayload, - createEncryptionContext(msgMetadata), cnx, schema, redeliveryCount, poolMessages); - message.setBrokerEntryMetadata(brokerEntryMetadata); if (possibleToDeadLetter != null) { possibleToDeadLetter.add(message); } - internalPinnedExecutor.execute(() -> { - if (hasNextPendingReceive()) { - notifyPendingReceivedCallback(message, null); - } else if (enqueueMessageAndCheckBatchReceive(message) && hasPendingBatchReceive()) { - notifyPendingBatchReceivedCallBack(); - } - singleMessagePayload.release(); - }); + executeNotifyCallback(message); } if (ackBitSet != null) { ackBitSet.recycle(); } - } catch (IOException e) { - log.warn("[{}] [{}] unable to obtain message in batch", subscription, consumerName); + } catch (IllegalStateException e) { + log.warn("[{}] [{}] unable to obtain message in batch", subscription, consumerName, e); discardCorruptedMessage(messageId, cnx, ValidationError.BatchDeSerializeError); } @@ -1350,7 +1435,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle return resetIncludeHead ? 
idx < startMessageId.getBatchIndex() : idx <= startMessageId.getBatchIndex(); } - private boolean isSameEntry(MessageIdData messageId) { + private boolean isSameEntry(MessageIdImpl messageId) { return startMessageId != null && messageId.getLedgerId() == startMessageId.getLedgerId() && messageId.getEntryId() == startMessageId.getEntryId(); @@ -1544,6 +1629,17 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle return true; } + private void discardCorruptedMessage(MessageIdImpl messageId, ClientCnx currentCnx, + ValidationError validationError) { + log.error("[{}][{}] Discarding corrupted message at {}:{}", topic, subscription, messageId.getLedgerId(), + messageId.getEntryId()); + ByteBuf cmd = Commands.newAck(consumerId, messageId.getLedgerId(), messageId.getEntryId(), null, AckType.Individual, + validationError, Collections.emptyMap(), -1); + currentCnx.ctx().writeAndFlush(cmd, currentCnx.ctx().voidPromise()); + increaseAvailablePermits(currentCnx); + stats.incrementNumReceiveFailed(); + } + private void discardCorruptedMessage(MessageIdData messageId, ClientCnx currentCnx, ValidationError validationError) { log.error("[{}][{}] Discarding corrupted message at {}:{}", topic, subscription, messageId.getLedgerId(), diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessagePayloadContextImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessagePayloadContextImpl.java new file mode 100644 index 0000000..aa6cab8 --- /dev/null +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessagePayloadContextImpl.java @@ -0,0 +1,148 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl; + +import io.netty.buffer.ByteBuf; +import io.netty.util.Recycler; +import java.util.List; +import lombok.NonNull; +import org.apache.pulsar.client.api.MessagePayloadContext; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessagePayload; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.common.api.proto.BrokerEntryMetadata; +import org.apache.pulsar.common.api.proto.KeyValue; +import org.apache.pulsar.common.api.proto.MessageMetadata; +import org.apache.pulsar.common.api.proto.SingleMessageMetadata; +import org.apache.pulsar.common.util.SafeCollectionUtils; +import org.apache.pulsar.common.util.collections.BitSetRecyclable; + +public class MessagePayloadContextImpl implements MessagePayloadContext { + + private static final Recycler<MessagePayloadContextImpl> RECYCLER = new Recycler<MessagePayloadContextImpl>() { + @Override + protected MessagePayloadContextImpl newObject(Handle<MessagePayloadContextImpl> handle) { + return new MessagePayloadContextImpl(handle); + } + }; + + private final Recycler.Handle<MessagePayloadContextImpl> recyclerHandle; + private BrokerEntryMetadata brokerEntryMetadata; + private MessageMetadata messageMetadata; + private SingleMessageMetadata singleMessageMetadata; + private MessageIdImpl messageId; + private ConsumerImpl<?> consumer; + private int 
redeliveryCount; + private BatchMessageAcker acker; + private BitSetRecyclable ackBitSet; + + private MessagePayloadContextImpl(final Recycler.Handle<MessagePayloadContextImpl> handle) { + this.recyclerHandle = handle; + } + + public static MessagePayloadContextImpl get(final BrokerEntryMetadata brokerEntryMetadata, + @NonNull final MessageMetadata messageMetadata, + @NonNull final MessageIdImpl messageId, + @NonNull final ConsumerImpl<?> consumer, + final int redeliveryCount, + final List<Long> ackSet) { + final MessagePayloadContextImpl context = RECYCLER.get(); + context.brokerEntryMetadata = brokerEntryMetadata; + context.messageMetadata = messageMetadata; + context.singleMessageMetadata = new SingleMessageMetadata(); + context.messageId = messageId; + context.consumer = consumer; + context.redeliveryCount = redeliveryCount; + context.acker = BatchMessageAcker.newAcker(context.getNumMessages()); + context.ackBitSet = (ackSet != null && ackSet.size() > 0) + ? BitSetRecyclable.valueOf(SafeCollectionUtils.longListToArray(ackSet)) + : null; + return context; + } + + public void recycle() { + brokerEntryMetadata = null; + messageMetadata = null; + singleMessageMetadata = null; + messageId = null; + consumer = null; + redeliveryCount = 0; + acker = null; + if (ackBitSet != null) { + ackBitSet.recycle(); + ackBitSet = null; + } + } + + @Override + public String getProperty(String key) { + for (KeyValue keyValue : messageMetadata.getPropertiesList()) { + if (keyValue.hasKey() && keyValue.getKey().equals(key)) { + return keyValue.getValue(); + } + } + return null; + } + + @Override + public int getNumMessages() { + return messageMetadata.getNumMessagesInBatch(); + } + + @Override + public boolean isBatch() { + return consumer.isBatch(messageMetadata); + } + + @Override + public <T> Message<T> getMessageAt(int index, + int numMessages, + MessagePayload payload, + boolean containMetadata, + Schema<T> schema) { + final ByteBuf payloadBuffer = 
MessagePayloadUtils.convertToByteBuf(payload); + try { + return consumer.newSingleMessage(index, + numMessages, + brokerEntryMetadata, + messageMetadata, + singleMessageMetadata, + payloadBuffer, + messageId, + schema, + containMetadata, + ackBitSet, + acker, + redeliveryCount); + } finally { + payloadBuffer.release(); + } + } + + @Override + public <T> Message<T> asSingleMessage(MessagePayload payload, Schema<T> schema) { + final ByteBuf payloadBuffer = MessagePayloadUtils.convertToByteBuf(payload); + try { + return consumer.newMessage( + messageId, brokerEntryMetadata, messageMetadata, payloadBuffer, schema, redeliveryCount); + } finally { + payloadBuffer.release(); + } + } +} diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessagePayloadFactoryImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessagePayloadFactoryImpl.java new file mode 100644 index 0000000..70a5b56 --- /dev/null +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessagePayloadFactoryImpl.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.client.impl; + +import io.netty.buffer.Unpooled; +import java.nio.ByteBuffer; +import org.apache.pulsar.client.api.MessagePayload; +import org.apache.pulsar.client.api.MessagePayloadFactory; + +public class MessagePayloadFactoryImpl implements MessagePayloadFactory { + + @Override + public MessagePayload wrap(byte[] bytes) { + return MessagePayloadImpl.create(Unpooled.wrappedBuffer(bytes)); + } + + @Override + public MessagePayload wrap(ByteBuffer buffer) { + return MessagePayloadImpl.create(Unpooled.wrappedBuffer(buffer)); + } +} diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessagePayloadImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessagePayloadImpl.java new file mode 100644 index 0000000..d99cd83 --- /dev/null +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessagePayloadImpl.java @@ -0,0 +1,72 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.client.impl; + +import io.netty.buffer.ByteBuf; +import io.netty.util.Recycler; +import io.netty.util.ReferenceCountUtil; +import lombok.Getter; +import lombok.NonNull; +import org.apache.pulsar.client.api.MessagePayload; + +/** + * A wrapper of {@link ByteBuf} that implements {@link MessagePayload}. + */ +public class MessagePayloadImpl implements MessagePayload { + + private static final Recycler<MessagePayloadImpl> RECYCLER = new Recycler<MessagePayloadImpl>() { + @Override + protected MessagePayloadImpl newObject(Handle<MessagePayloadImpl> handle) { + return new MessagePayloadImpl(handle); + } + }; + + private final Recycler.Handle<MessagePayloadImpl> recyclerHandle; + @Getter + private ByteBuf byteBuf; + + public static MessagePayloadImpl create(@NonNull final ByteBuf byteBuf) { + final MessagePayloadImpl payload = RECYCLER.get(); + payload.byteBuf = byteBuf; + return payload; + } + + private MessagePayloadImpl(final Recycler.Handle<MessagePayloadImpl> handle) { + this.recyclerHandle = handle; + } + + @Override + public void release() { + ReferenceCountUtil.release(byteBuf); + byteBuf = null; + recyclerHandle.recycle(this); + } + + @Override + public byte[] copiedBuffer() { + final int readable = byteBuf.readableBytes(); + if (readable > 0) { + final byte[] bytes = new byte[readable]; + byteBuf.getBytes(byteBuf.readerIndex(), bytes); + return bytes; + } else { + return new byte[0]; + } + } +} diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessagePayloadUtils.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessagePayloadUtils.java new file mode 100644 index 0000000..64faffe --- /dev/null +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessagePayloadUtils.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import org.apache.pulsar.client.api.MessagePayload; + +public class MessagePayloadUtils { + + public static ByteBuf convertToByteBuf(final MessagePayload payload) { + if (payload instanceof MessagePayloadImpl) { + return ((MessagePayloadImpl) payload).getByteBuf().retain(); + } else { + return Unpooled.wrappedBuffer(payload.copiedBuffer()); + } + } +} diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImplementationBindingImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImplementationBindingImpl.java index 225e246..c146f23 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImplementationBindingImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImplementationBindingImpl.java @@ -35,6 +35,7 @@ import org.apache.pulsar.client.api.Authentication; import org.apache.pulsar.client.api.BatcherBuilder; import org.apache.pulsar.client.api.ClientBuilder; import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.MessagePayloadFactory; import org.apache.pulsar.client.api.PulsarClientException; import 
org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.schema.GenericRecord; @@ -379,4 +380,7 @@ public final class PulsarClientImplementationBindingImpl implements PulsarClient return new KeyBasedBatcherBuilder(); } + public MessagePayloadFactory newDefaultMessagePayloadFactory() { + return new MessagePayloadFactoryImpl(); + } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java index a39ac21..1076946 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java @@ -42,6 +42,7 @@ import org.apache.pulsar.client.api.DeadLetterPolicy; import org.apache.pulsar.client.api.KeySharedPolicy; import org.apache.pulsar.client.api.MessageCrypto; import org.apache.pulsar.client.api.MessageListener; +import org.apache.pulsar.client.api.MessagePayloadProcessor; import org.apache.pulsar.client.api.RegexSubscriptionMode; import org.apache.pulsar.client.api.SubscriptionInitialPosition; import org.apache.pulsar.client.api.SubscriptionMode; @@ -149,6 +150,9 @@ public class ConsumerConfigurationData<T> implements Serializable, Cloneable { private boolean poolMessages = false; + @JsonIgnore + private transient MessagePayloadProcessor payloadProcessor = null; + public void setAutoUpdatePartitionsIntervalSeconds(int interval, TimeUnit timeUnit) { checkArgument(interval > 0, "interval needs to be > 0"); this.autoUpdatePartitionsIntervalSeconds = timeUnit.toSeconds(interval); diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/api/MessagePayloadTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/api/MessagePayloadTest.java new file mode 100644 index 0000000..f980e51 --- /dev/null +++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/api/MessagePayloadTest.java @@ -0,0 +1,104 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.api; + +import io.netty.buffer.ByteBuf; +import java.nio.ByteBuffer; +import org.apache.pulsar.client.impl.MessagePayloadImpl; +import org.apache.pulsar.client.impl.MessagePayloadUtils; +import org.apache.pulsar.common.allocator.PulsarByteBufAllocator; +import org.testng.Assert; +import org.testng.annotations.Test; + +/** + * Unit test of {@link MessagePayload}. 
+ */ +public class MessagePayloadTest { + + @Test + public void testConvertMessagePayloadImpl() { + final ByteBuf buf = PulsarByteBufAllocator.DEFAULT.buffer(1); + + final MessagePayloadImpl payload = MessagePayloadImpl.create(buf); + Assert.assertEquals(buf.refCnt(), 1); + + final ByteBuf convertedBuf = MessagePayloadUtils.convertToByteBuf(payload); + Assert.assertSame(convertedBuf, buf); + + Assert.assertEquals(buf.refCnt(), 2); + buf.release(); + buf.release(); + } + + @Test + public void testConvertCustomPayload() { + final ByteBuffer buffer = ByteBuffer.allocate(3); + buffer.put(new byte[]{ 0x11, 0x22, 0x33 }); + buffer.flip(); + buffer.get(); // skip 1st byte + + final ByteBuf buf = MessagePayloadUtils.convertToByteBuf(new ByteBufferPayload(buffer)); + Assert.assertEquals(buf.refCnt(), 1); + + Assert.assertEquals(buf.readableBytes(), 2); + Assert.assertEquals(buf.readByte(), 0x22); + Assert.assertEquals(buf.readByte(), 0x33); + + buf.release(); + } + + @Test + public void testConvertEmptyCustomPayload() { + final ByteBuf buf = MessagePayloadUtils.convertToByteBuf(new ByteBufferPayload(ByteBuffer.allocate(0))); + Assert.assertEquals(buf.refCnt(), 1); + Assert.assertEquals(buf.readableBytes(), 0); + buf.release(); + } + + private static class ByteBufferPayload implements MessagePayload { + + private final ByteBuffer buffer; + + public ByteBufferPayload(final ByteBuffer buffer) { + this.buffer = buffer; + } + + @Override + public byte[] copiedBuffer() { + final byte[] bytes = new byte[buffer.remaining()]; + buffer.get(bytes); + return bytes; + } + } + + @Test + public void testFactoryWrap() { + MessagePayloadImpl payload = (MessagePayloadImpl) MessagePayloadFactory.DEFAULT.wrap(new byte[1]); + ByteBuf byteBuf = payload.getByteBuf(); + Assert.assertEquals(byteBuf.refCnt(), 1); + payload.release(); + Assert.assertEquals(byteBuf.refCnt(), 0); + + payload = (MessagePayloadImpl) MessagePayloadFactory.DEFAULT.wrap(ByteBuffer.allocate(1)); + byteBuf = 
payload.getByteBuf(); + Assert.assertEquals(byteBuf.refCnt(), 1); + payload.release(); + Assert.assertEquals(byteBuf.refCnt(), 0); + } +}
