This is an automated email from the ASF dual-hosted git repository.
rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new d8923b8b20f [pulsar-client] Add message chunking configuration for
reader (#15143)
d8923b8b20f is described below
commit d8923b8b20f4d041ec07eed55aa08099447c6a2b
Author: Rajan Dhabalia <[email protected]>
AuthorDate: Thu Apr 14 21:12:36 2022 -0700
[pulsar-client] Add message chunking configuration for reader (#15143)
---
.../pulsar/client/impl/MessageChunkingTest.java | 36 ++++++++++++++++-
.../apache/pulsar/client/api/ReaderBuilder.java | 47 ++++++++++++++++++++++
.../pulsar/client/impl/MultiTopicsReaderImpl.java | 7 ++++
.../pulsar/client/impl/ReaderBuilderImpl.java | 18 +++++++++
.../org/apache/pulsar/client/impl/ReaderImpl.java | 7 ++++
.../client/impl/conf/ReaderConfigurationData.java | 8 ++++
6 files changed, 122 insertions(+), 1 deletion(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java
index 4d752029c0f..85d67c3de0d 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java
@@ -52,6 +52,7 @@ import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.SizeUnit;
import org.apache.pulsar.client.impl.MessageImpl.SchemaState;
import org.apache.pulsar.client.impl.ProducerImpl.OpSendMsg;
@@ -217,6 +218,8 @@ public class MessageChunkingTest extends
ProducerConsumerBase {
.isAckReceiptEnabled(ackReceiptEnabled)
.ackTimeout(5, TimeUnit.SECONDS).subscribe();
+ Reader<byte[]> reader =
pulsarClient.newReader().topic(topicName).startMessageId(MessageId.earliest).create();
+
ProducerBuilder<byte[]> producerBuilder =
pulsarClient.newProducer().topic(topicName);
Producer<byte[]> producer =
producerBuilder.enableChunking(true).enableBatching(false).create();
@@ -232,6 +235,15 @@ public class MessageChunkingTest extends
ProducerConsumerBase {
Message<byte[]> msg = null;
Set<String> messageSet = Sets.newHashSet();
+ for (int i = 0; i < totalMessages; i++) {
+ msg = reader.readNext(5, TimeUnit.SECONDS);
+ String receivedMessage = new String(msg.getData());
+ log.info("Received message: [{}]", receivedMessage);
+ String expectedMessage = publishedMessages.get(i);
+ testMessageOrderAndDuplicates(messageSet, receivedMessage,
expectedMessage);
+ }
+
+ messageSet.clear();
for (int i = 0; i < totalMessages; i++) {
msg = consumer.receive(5, TimeUnit.SECONDS);
String receivedMessage = new String(msg.getData());
@@ -268,6 +280,7 @@ public class MessageChunkingTest extends
ProducerConsumerBase {
consumer.close();
producer.close();
+ reader.close();
log.info("-- Exiting {} test --", methodName);
}
@@ -384,6 +397,8 @@ public class MessageChunkingTest extends
ProducerConsumerBase {
producer.cnx().registerProducer(producerId, producer); // registered
spy ProducerImpl
ConsumerImpl<byte[]> consumer = (ConsumerImpl<byte[]>)
pulsarClient.newConsumer().topic(topicName)
.subscriptionName("my-sub").subscribe();
+ ReaderImpl<byte[]> reader = (ReaderImpl<byte[]>)
pulsarClient.newReader().topic(topicName)
+ .startMessageId(MessageId.earliest).create();
TypedMessageBuilderImpl<byte[]> msg =
(TypedMessageBuilderImpl<byte[]>)
producer.newMessage().value("message-1".getBytes());
ByteBuf payload = Unpooled.wrappedBuffer(msg.getContent());
@@ -397,17 +412,22 @@ public class MessageChunkingTest extends
ProducerConsumerBase {
producer.processOpSendMsg(op);
retryStrategically((test) -> {
- return consumer.chunkedMessagesMap.size() > 0;
+ return reader.getConsumer().chunkedMessagesMap.size() > 0 &&
consumer.chunkedMessagesMap.size() > 0;
}, 5, 500);
assertEquals(consumer.chunkedMessagesMap.size(), 1);
+ assertEquals(reader.getConsumer().chunkedMessagesMap.size(), 1);
consumer.expireTimeOfIncompleteChunkedMessageMillis = 1;
+ reader.getConsumer().expireTimeOfIncompleteChunkedMessageMillis = 1;
Thread.sleep(10);
consumer.removeExpireIncompleteChunkedMessages();
+ reader.getConsumer().removeExpireIncompleteChunkedMessages();
assertEquals(consumer.chunkedMessagesMap.size(), 0);
+ assertEquals(reader.getConsumer().chunkedMessagesMap.size(), 0);
producer.close();
consumer.close();
+ reader.close();
producer = null; // clean reference of mocked producer
}
@@ -507,6 +527,20 @@ public class MessageChunkingTest extends
ProducerConsumerBase {
log.info("-- Exiting {} test --", methodName);
}
+ @Test
+ public void testReaderChunkingConfiguration() throws Exception {
+ log.info("-- Starting {} test --", methodName);
+ final String topicName = "persistent://my-property/my-ns/my-topic1";
+ ReaderImpl<byte[]> reader = (ReaderImpl<byte[]>)
pulsarClient.newReader().topic(topicName)
+
.startMessageId(MessageId.earliest).maxPendingChunkedMessage(12)
+ .autoAckOldestChunkedMessageOnQueueFull(true)
+ .expireTimeOfIncompleteChunkedMessage(12,
TimeUnit.MILLISECONDS).create();
+ ConsumerImpl<byte[]> consumer = reader.getConsumer();
+ assertEquals(consumer.conf.getMaxPendingChunkedMessage(), 12);
+ assertTrue(consumer.conf.isAutoAckOldestChunkedMessageOnQueueFull());
+
assertEquals(consumer.conf.getExpireTimeOfIncompleteChunkedMessageMillis(), 12);
+ }
+
private String createMessagePayload(int size) {
StringBuilder str = new StringBuilder();
Random rand = new Random();
diff --git
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ReaderBuilder.java
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ReaderBuilder.java
index c48bdec604a..f9ca85cd45f 100644
---
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ReaderBuilder.java
+++
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ReaderBuilder.java
@@ -320,4 +320,51 @@ public interface ReaderBuilder<T> extends Cloneable {
* @return the reader builder instance
*/
ReaderBuilder<T> intercept(ReaderInterceptor<T>... interceptors);
+
+ /**
+ * Consumer buffers chunk messages into memory until it receives all the
chunks of the original message. While
+ * consuming chunk-messages, chunks from same message might not be
contiguous in the stream and they might be mixed
+ * with other messages' chunks. So, the consumer has to maintain multiple
buffers to manage chunks coming from different
+ * messages. This mainly happens when multiple publishers are publishing
messages on the topic concurrently or
+ * a publisher failed to publish all the chunks of a message.
+ *
+ * <pre>
+ * eg: M1-C1, M2-C1, M1-C2, M2-C2
+ * Here, Messages M1-C1 and M1-C2 belong to original message M1, M2-C1 and
M2-C2 messages belong to M2 message.
+ * </pre>
+ * Buffering a large number of outstanding incomplete chunked messages can
create memory pressure, which can be
+ * guarded against by providing this {@code maxPendingChunkedMessage} threshold. Once
the consumer reaches this threshold, it drops
+ * the outstanding incomplete chunked messages, either by silently acking them or by asking the broker
to redeliver them later by marking them unacked.
+ * This behavior is controlled by the configuration
{@code autoAckOldestChunkedMessageOnQueueFull}.
+ *
+ * The default value is 10.
+ *
+ * @param maxPendingChunkedMessage the maximum number of incomplete chunked messages to buffer
+ * @return the reader builder instance
+ */
+ ReaderBuilder<T> maxPendingChunkedMessage(int maxPendingChunkedMessage);
+
+ /**
+ * Buffering a large number of outstanding incomplete chunked messages can
create memory pressure, which can be
+ * guarded against by providing the {@code maxPendingChunkedMessage} threshold. Once
the consumer reaches this threshold, it drops
+ * the outstanding incomplete chunked messages by silently acking them if
{@code autoAckOldestChunkedMessageOnQueueFull} is true; otherwise it
+ * marks them for redelivery.
+ *
+ * The default value is false.
+ *
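+ * @param autoAckOldestChunkedMessageOnQueueFull whether to silently ack incomplete chunked messages once the threshold is reached
+ * @return the reader builder instance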
+ */
+ ReaderBuilder<T> autoAckOldestChunkedMessageOnQueueFull(boolean
autoAckOldestChunkedMessageOnQueueFull);
+
+ /**
+ * If the producer fails to publish all the chunks of a message, the consumer
can expire the incomplete chunks when it is
+ * unable to receive all of them within the expiry time (default 1 minute).
+ *
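+ * @param duration the expiry time for incomplete chunked messages
+ * @param unit the time unit of {@code duration}
+ * @return the reader builder instance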
+ */
+ ReaderBuilder<T> expireTimeOfIncompleteChunkedMessage(long duration,
TimeUnit unit);
+
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsReaderImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsReaderImpl.java
index 329f0463475..3ec95386cb8 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsReaderImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsReaderImpl.java
@@ -68,6 +68,13 @@ public class MultiTopicsReaderImpl<T> implements Reader<T> {
consumerConfiguration.setReadCompacted(readerConfiguration.isReadCompacted());
consumerConfiguration.setPoolMessages(readerConfiguration.isPoolMessages());
+ // chunking configuration
+
consumerConfiguration.setMaxPendingChunkedMessage(readerConfiguration.getMaxPendingChunkedMessage());
+ consumerConfiguration.setAutoAckOldestChunkedMessageOnQueueFull(
+
readerConfiguration.isAutoAckOldestChunkedMessageOnQueueFull());
+ consumerConfiguration.setExpireTimeOfIncompleteChunkedMessageMillis(
+
readerConfiguration.getExpireTimeOfIncompleteChunkedMessageMillis());
+
if (readerConfiguration.getReaderListener() != null) {
ReaderListener<T> readerListener =
readerConfiguration.getReaderListener();
consumerConfiguration.setMessageListener(new MessageListener<T>() {
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java
index e55a3211870..f3e75467835 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java
@@ -253,4 +253,22 @@ public class ReaderBuilderImpl<T> implements
ReaderBuilder<T> {
return this;
}
+ @Override
+ public ReaderBuilder<T> maxPendingChunkedMessage(int
maxPendingChunkedMessage) {
+ conf.setMaxPendingChunkedMessage(maxPendingChunkedMessage);
+ return this;
+ }
+
+ @Override
+ public ReaderBuilder<T> autoAckOldestChunkedMessageOnQueueFull(boolean
autoAckOldestChunkedMessageOnQueueFull) {
+
conf.setAutoAckOldestChunkedMessageOnQueueFull(autoAckOldestChunkedMessageOnQueueFull);
+ return this;
+ }
+
+ @Override
+ public ReaderBuilder<T> expireTimeOfIncompleteChunkedMessage(long
duration, TimeUnit unit) {
+
conf.setExpireTimeOfIncompleteChunkedMessageMillis(unit.toMillis(duration));
+ return this;
+ }
+
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
index 80d1e3ae755..7701722249f 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
@@ -73,6 +73,13 @@ public class ReaderImpl<T> implements Reader<T> {
consumerConfiguration.setReadCompacted(readerConfiguration.isReadCompacted());
consumerConfiguration.setPoolMessages(readerConfiguration.isPoolMessages());
+ // chunking configuration
+
consumerConfiguration.setMaxPendingChunkedMessage(readerConfiguration.getMaxPendingChunkedMessage());
+ consumerConfiguration.setAutoAckOldestChunkedMessageOnQueueFull(
+
readerConfiguration.isAutoAckOldestChunkedMessageOnQueueFull());
+ consumerConfiguration.setExpireTimeOfIncompleteChunkedMessageMillis(
+
readerConfiguration.getExpireTimeOfIncompleteChunkedMessageMillis());
+
// Reader doesn't need any batch receiving behaviours
// disable the batch receive timer for the ConsumerImpl instance
wrapped by the ReaderImpl
consumerConfiguration.setBatchReceivePolicy(DISABLED_BATCH_RECEIVE_POLICY);
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ReaderConfigurationData.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ReaderConfigurationData.java
index e9f08144863..c33cbf186cb 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ReaderConfigurationData.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ReaderConfigurationData.java
@@ -23,6 +23,7 @@ import java.io.Serializable;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.TimeUnit;
import lombok.Data;
import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
import org.apache.pulsar.client.api.CryptoKeyReader;
@@ -67,6 +68,13 @@ public class ReaderConfigurationData<T> implements
Serializable, Cloneable {
private transient List<ReaderInterceptor<T>> readerInterceptorList;
+ // max pending chunked message to avoid sending incomplete message into
the queue and memory
+ private int maxPendingChunkedMessage = 10;
+
+ private boolean autoAckOldestChunkedMessageOnQueueFull = false;
+
+ private long expireTimeOfIncompleteChunkedMessageMillis =
TimeUnit.MINUTES.toMillis(1);
+
@JsonIgnore
public String getTopicName() {
if (topicNames.size() > 1) {