This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new d8923b8b20f [pulsar-client] Add message chunking configuration for 
reader (#15143)
d8923b8b20f is described below

commit d8923b8b20f4d041ec07eed55aa08099447c6a2b
Author: Rajan Dhabalia <[email protected]>
AuthorDate: Thu Apr 14 21:12:36 2022 -0700

    [pulsar-client] Add message chunking configuration for reader (#15143)
---
 .../pulsar/client/impl/MessageChunkingTest.java    | 36 ++++++++++++++++-
 .../apache/pulsar/client/api/ReaderBuilder.java    | 47 ++++++++++++++++++++++
 .../pulsar/client/impl/MultiTopicsReaderImpl.java  |  7 ++++
 .../pulsar/client/impl/ReaderBuilderImpl.java      | 18 +++++++++
 .../org/apache/pulsar/client/impl/ReaderImpl.java  |  7 ++++
 .../client/impl/conf/ReaderConfigurationData.java  |  8 ++++
 6 files changed, 122 insertions(+), 1 deletion(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java
index 4d752029c0f..85d67c3de0d 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java
@@ -52,6 +52,7 @@ import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProducerBuilder;
 import org.apache.pulsar.client.api.ProducerConsumerBase;
 import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Reader;
 import org.apache.pulsar.client.api.SizeUnit;
 import org.apache.pulsar.client.impl.MessageImpl.SchemaState;
 import org.apache.pulsar.client.impl.ProducerImpl.OpSendMsg;
@@ -217,6 +218,8 @@ public class MessageChunkingTest extends 
ProducerConsumerBase {
                 .isAckReceiptEnabled(ackReceiptEnabled)
                 .ackTimeout(5, TimeUnit.SECONDS).subscribe();
 
+        Reader<byte[]> reader = 
pulsarClient.newReader().topic(topicName).startMessageId(MessageId.earliest).create();
+
         ProducerBuilder<byte[]> producerBuilder = 
pulsarClient.newProducer().topic(topicName);
 
         Producer<byte[]> producer = 
producerBuilder.enableChunking(true).enableBatching(false).create();
@@ -232,6 +235,15 @@ public class MessageChunkingTest extends 
ProducerConsumerBase {
 
         Message<byte[]> msg = null;
         Set<String> messageSet = Sets.newHashSet();
+        for (int i = 0; i < totalMessages; i++) {
+            msg = reader.readNext(5, TimeUnit.SECONDS);
+            String receivedMessage = new String(msg.getData());
+            log.info("Received message: [{}]", receivedMessage);
+            String expectedMessage = publishedMessages.get(i);
+            testMessageOrderAndDuplicates(messageSet, receivedMessage, 
expectedMessage);
+        }
+
+        messageSet.clear();
         for (int i = 0; i < totalMessages; i++) {
             msg = consumer.receive(5, TimeUnit.SECONDS);
             String receivedMessage = new String(msg.getData());
@@ -268,6 +280,7 @@ public class MessageChunkingTest extends 
ProducerConsumerBase {
 
         consumer.close();
         producer.close();
+        reader.close();
         log.info("-- Exiting {} test --", methodName);
 
     }
@@ -384,6 +397,8 @@ public class MessageChunkingTest extends 
ProducerConsumerBase {
         producer.cnx().registerProducer(producerId, producer); // registered 
spy ProducerImpl
         ConsumerImpl<byte[]> consumer = (ConsumerImpl<byte[]>) 
pulsarClient.newConsumer().topic(topicName)
                 .subscriptionName("my-sub").subscribe();
+        ReaderImpl<byte[]> reader = (ReaderImpl<byte[]>) 
pulsarClient.newReader().topic(topicName)
+                .startMessageId(MessageId.earliest).create();
 
         TypedMessageBuilderImpl<byte[]> msg = 
(TypedMessageBuilderImpl<byte[]>) 
producer.newMessage().value("message-1".getBytes());
         ByteBuf payload = Unpooled.wrappedBuffer(msg.getContent());
@@ -397,17 +412,22 @@ public class MessageChunkingTest extends 
ProducerConsumerBase {
         producer.processOpSendMsg(op);
 
         retryStrategically((test) -> {
-            return consumer.chunkedMessagesMap.size() > 0;
+            return reader.getConsumer().chunkedMessagesMap.size() > 0 && 
consumer.chunkedMessagesMap.size() > 0;
         }, 5, 500);
         assertEquals(consumer.chunkedMessagesMap.size(), 1);
+        assertEquals(reader.getConsumer().chunkedMessagesMap.size(), 1);
 
         consumer.expireTimeOfIncompleteChunkedMessageMillis = 1;
+        reader.getConsumer().expireTimeOfIncompleteChunkedMessageMillis = 1;
         Thread.sleep(10);
         consumer.removeExpireIncompleteChunkedMessages();
+        reader.getConsumer().removeExpireIncompleteChunkedMessages();
         assertEquals(consumer.chunkedMessagesMap.size(), 0);
+        assertEquals(reader.getConsumer().chunkedMessagesMap.size(), 0);
 
         producer.close();
         consumer.close();
+        reader.close();
         producer = null; // clean reference of mocked producer
     }
 
@@ -507,6 +527,20 @@ public class MessageChunkingTest extends 
ProducerConsumerBase {
         log.info("-- Exiting {} test --", methodName);
     }
 
+    @Test
+    public void testReaderChunkingConfiguration() throws Exception {
+        log.info("-- Starting {} test --", methodName);
+        final String topicName = "persistent://my-property/my-ns/my-topic1";
+        ReaderImpl<byte[]> reader = (ReaderImpl<byte[]>) 
pulsarClient.newReader().topic(topicName)
+                
.startMessageId(MessageId.earliest).maxPendingChunkedMessage(12)
+                .autoAckOldestChunkedMessageOnQueueFull(true)
+                .expireTimeOfIncompleteChunkedMessage(12, 
TimeUnit.MILLISECONDS).create();
+        ConsumerImpl<byte[]> consumer = reader.getConsumer();
+        assertEquals(consumer.conf.getMaxPendingChunkedMessage(), 12);
+        assertTrue(consumer.conf.isAutoAckOldestChunkedMessageOnQueueFull());
+        
assertEquals(consumer.conf.getExpireTimeOfIncompleteChunkedMessageMillis(), 12);
+    }
+
     private String createMessagePayload(int size) {
         StringBuilder str = new StringBuilder();
         Random rand = new Random();
diff --git 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ReaderBuilder.java
 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ReaderBuilder.java
index c48bdec604a..f9ca85cd45f 100644
--- 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ReaderBuilder.java
+++ 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ReaderBuilder.java
@@ -320,4 +320,51 @@ public interface ReaderBuilder<T> extends Cloneable {
      * @return the reader builder instance
      */
     ReaderBuilder<T> intercept(ReaderInterceptor<T>... interceptors);
+
+    /**
+     * The consumer buffers chunks in memory until it receives all the 
chunks of the original message. While
+     * consuming chunked messages, chunks of the same message might not be 
contiguous in the stream and might be mixed
+     * with other messages' chunks. So, the consumer has to maintain multiple 
buffers to manage chunks coming from different
+     * messages. This mainly happens when multiple publishers are publishing 
messages on the topic concurrently or
+     * a publisher failed to publish all chunks of a message.
+     *
+     * <pre>
+     * eg: M1-C1, M2-C1, M1-C2, M2-C2
+     * Here, Messages M1-C1 and M1-C2 belong to original message M1, M2-C1 and 
M2-C2 messages belong to M2 message.
+     * </pre>
+     * Buffering a large number of outstanding incomplete chunked messages can 
create memory pressure, which can be
+     * guarded against with this {@code maxPendingChunkedMessage} threshold. Once 
the consumer reaches this threshold, it drops
+     * the outstanding incomplete chunked messages by silently acking them or by asking the broker 
to redeliver them later by marking them unacked.
+     * This behavior can be controlled by the configuration 
{@code autoAckOldestChunkedMessageOnQueueFull}.
+     *
+     * The default value is 10.
+     *
+     * @param maxPendingChunkedMessage the maximum number of pending chunked messages to buffer
+     * @return the reader builder instance
+     */
+    ReaderBuilder<T> maxPendingChunkedMessage(int maxPendingChunkedMessage);
+
+    /**
+     * Buffering a large number of outstanding incomplete chunked messages can 
create memory pressure, which can be
+     * guarded against with the {@code maxPendingChunkedMessage} threshold. Once 
the consumer reaches this threshold, it drops
+     * the outstanding incomplete chunked messages by silently acking them if 
autoAckOldestChunkedMessageOnQueueFull is true; otherwise it
+     * marks them for redelivery.
+     *
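+     * The default value is false.
+     *
+     * @param autoAckOldestChunkedMessageOnQueueFull whether to silently ack the dropped messages (true) or mark them for redelivery (false)
+     * @return the reader builder instance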
+     */
+    ReaderBuilder<T> autoAckOldestChunkedMessageOnQueueFull(boolean 
autoAckOldestChunkedMessageOnQueueFull);
+
+    /**
+     * If a producer fails to publish all the chunks of a message, the consumer 
can expire the incomplete chunks if it
+     * does not receive all of them within the expiry time (default 1 minute).
+     *
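+     * @param duration the expiry time for an incomplete chunked message
+     * @param unit the time unit of {@code duration}
+     * @return the reader builder instance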
+     */
+    ReaderBuilder<T> expireTimeOfIncompleteChunkedMessage(long duration, 
TimeUnit unit);
+
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsReaderImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsReaderImpl.java
index 329f0463475..3ec95386cb8 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsReaderImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsReaderImpl.java
@@ -68,6 +68,13 @@ public class MultiTopicsReaderImpl<T> implements Reader<T> {
         
consumerConfiguration.setReadCompacted(readerConfiguration.isReadCompacted());
         
consumerConfiguration.setPoolMessages(readerConfiguration.isPoolMessages());
 
+        // chunking configuration
+        
consumerConfiguration.setMaxPendingChunkedMessage(readerConfiguration.getMaxPendingChunkedMessage());
+        consumerConfiguration.setAutoAckOldestChunkedMessageOnQueueFull(
+                
readerConfiguration.isAutoAckOldestChunkedMessageOnQueueFull());
+        consumerConfiguration.setExpireTimeOfIncompleteChunkedMessageMillis(
+                
readerConfiguration.getExpireTimeOfIncompleteChunkedMessageMillis());
+
         if (readerConfiguration.getReaderListener() != null) {
             ReaderListener<T> readerListener = 
readerConfiguration.getReaderListener();
             consumerConfiguration.setMessageListener(new MessageListener<T>() {
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java
index e55a3211870..f3e75467835 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java
@@ -253,4 +253,22 @@ public class ReaderBuilderImpl<T> implements 
ReaderBuilder<T> {
         return this;
     }
 
+    @Override
+    public ReaderBuilder<T> maxPendingChunkedMessage(int 
maxPendingChunkedMessage) {
+        conf.setMaxPendingChunkedMessage(maxPendingChunkedMessage);
+        return this;
+    }
+
+    @Override
+    public ReaderBuilder<T> autoAckOldestChunkedMessageOnQueueFull(boolean 
autoAckOldestChunkedMessageOnQueueFull) {
+        
conf.setAutoAckOldestChunkedMessageOnQueueFull(autoAckOldestChunkedMessageOnQueueFull);
+        return this;
+    }
+
+    @Override
+    public ReaderBuilder<T> expireTimeOfIncompleteChunkedMessage(long 
duration, TimeUnit unit) {
+        
conf.setExpireTimeOfIncompleteChunkedMessageMillis(unit.toMillis(duration));
+        return this;
+    }
+
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
index 80d1e3ae755..7701722249f 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
@@ -73,6 +73,13 @@ public class ReaderImpl<T> implements Reader<T> {
         
consumerConfiguration.setReadCompacted(readerConfiguration.isReadCompacted());
         
consumerConfiguration.setPoolMessages(readerConfiguration.isPoolMessages());
 
+        // chunking configuration
+        
consumerConfiguration.setMaxPendingChunkedMessage(readerConfiguration.getMaxPendingChunkedMessage());
+        consumerConfiguration.setAutoAckOldestChunkedMessageOnQueueFull(
+                
readerConfiguration.isAutoAckOldestChunkedMessageOnQueueFull());
+        consumerConfiguration.setExpireTimeOfIncompleteChunkedMessageMillis(
+                
readerConfiguration.getExpireTimeOfIncompleteChunkedMessageMillis());
+
         // Reader doesn't need any batch receiving behaviours
         // disable the batch receive timer for the ConsumerImpl instance 
wrapped by the ReaderImpl
         
consumerConfiguration.setBatchReceivePolicy(DISABLED_BATCH_RECEIVE_POLICY);
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ReaderConfigurationData.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ReaderConfigurationData.java
index e9f08144863..c33cbf186cb 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ReaderConfigurationData.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ReaderConfigurationData.java
@@ -23,6 +23,7 @@ import java.io.Serializable;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 import lombok.Data;
 import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
 import org.apache.pulsar.client.api.CryptoKeyReader;
@@ -67,6 +68,13 @@ public class ReaderConfigurationData<T> implements 
Serializable, Cloneable {
 
     private transient List<ReaderInterceptor<T>> readerInterceptorList;
 
+    // max number of pending (incomplete) chunked messages to buffer, guarding 
against memory pressure from incomplete messages
+    private int maxPendingChunkedMessage = 10;
+
+    private boolean autoAckOldestChunkedMessageOnQueueFull = false;
+
+    private long expireTimeOfIncompleteChunkedMessageMillis = 
TimeUnit.MINUTES.toMillis(1);
+
     @JsonIgnore
     public String getTopicName() {
         if (topicNames.size() > 1) {

Reply via email to