This is an automated email from the ASF dual-hosted git repository.

xiangying pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.10 by this push:
     new 166a7ab5e10 [improve][java-client]Add init capacity for messages in 
BatchMessageContainerImpl (#17822)
166a7ab5e10 is described below

commit 166a7ab5e1082646aebee4f152d968e3c0d102f7
Author: Xiaoyu Hou <[email protected]>
AuthorDate: Fri Sep 30 10:33:37 2022 +0800

    [improve][java-client]Add init capacity for messages in 
BatchMessageContainerImpl (#17822)
    
    (cherry picked from commit 15a347ca999befe3ea3bd246d34309ad50fbcbe2)
---
 .../client/impl/AbstractBatchMessageContainer.java |  8 +++
 .../client/impl/BatchMessageContainerImpl.java     |  5 +-
 .../client/impl/BatchMessageContainerImplTest.java | 64 ++++++++++++++++++++++
 3 files changed, 75 insertions(+), 2 deletions(-)

diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AbstractBatchMessageContainer.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AbstractBatchMessageContainer.java
index 73f1e6d0889..9b4d1b7d683 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AbstractBatchMessageContainer.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AbstractBatchMessageContainer.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.client.impl;
 
+import com.google.common.annotations.VisibleForTesting;
 import java.io.IOException;
 import java.util.List;
 import lombok.extern.slf4j.Slf4j;
@@ -46,10 +47,12 @@ public abstract class AbstractBatchMessageContainer 
implements BatchMessageConta
     protected long currentTxnidLeastBits = -1L;
 
     protected static final int INITIAL_BATCH_BUFFER_SIZE = 1024;
+    protected static final int INITIAL_MESSAGES_NUM = 32;
 
     // This will be the largest size for a batch sent from this particular 
producer. This is used as a baseline to
     // allocate a new buffer that can hold the entire batch without needing 
costly reallocations
     protected int maxBatchSize = INITIAL_BATCH_BUFFER_SIZE;
+    protected int maxMessagesNum = INITIAL_MESSAGES_NUM;
 
     @Override
     public boolean haveEnoughSpace(MessageImpl<?> msg) {
@@ -71,6 +74,11 @@ public abstract class AbstractBatchMessageContainer 
implements BatchMessageConta
         return numMessagesInBatch;
     }
 
+    @VisibleForTesting
+    public int getMaxMessagesNum() {
+        return maxMessagesNum;
+    }
+
     @Override
     public long getCurrentBatchSize() {
         return currentBatchSizeBytes;
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
index 0a6fedf0ab6..cfc0e1f98c1 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
@@ -49,7 +49,7 @@ class BatchMessageContainerImpl extends 
AbstractBatchMessageContainer {
     private long lowestSequenceId = -1L;
     private long highestSequenceId = -1L;
     private ByteBuf batchedMessageMetadataAndPayload;
-    private List<MessageImpl<?>> messages = new ArrayList<>();
+    private List<MessageImpl<?>> messages = new ArrayList<>(maxMessagesNum);
     protected SendCallback previousCallback = null;
     // keep track of callbacks for individual messages being published in a 
batch
     protected SendCallback firstCallback;
@@ -139,12 +139,13 @@ class BatchMessageContainerImpl extends 
AbstractBatchMessageContainer {
         // Update the current max batch size using the uncompressed size, 
which is what we need in any case to
         // accumulate the batch content
         maxBatchSize = Math.max(maxBatchSize, uncompressedSize);
+        maxMessagesNum = Math.max(maxMessagesNum, numMessagesInBatch);
         return compressedPayload;
     }
 
     @Override
     public void clear() {
-        messages = new ArrayList<>();
+        messages = new ArrayList<>(maxMessagesNum);
         firstCallback = null;
         previousCallback = null;
         messageMetadata.clear();
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BatchMessageContainerImplTest.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BatchMessageContainerImplTest.java
index 8fc018b3199..69aaf95385b 100644
--- 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BatchMessageContainerImplTest.java
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BatchMessageContainerImplTest.java
@@ -18,13 +18,20 @@
  */
 package org.apache.pulsar.client.impl;
 
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.util.ReferenceCountUtil;
+import java.lang.reflect.Field;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.bookkeeper.common.allocator.impl.ByteBufAllocatorImpl;
 import org.apache.pulsar.client.api.CompressionType;
@@ -35,6 +42,7 @@ import org.apache.pulsar.common.api.proto.MessageMetadata;
 import org.mockito.MockedConstruction;
 import org.mockito.Mockito;
 import org.powermock.reflect.Whitebox;
+import org.testng.Assert;
 import org.testng.annotations.Test;
 
 public class BatchMessageContainerImplTest {
@@ -84,4 +92,60 @@ public class BatchMessageContainerImplTest {
                 createByteBufAllocatorMethod.invoke(null));
     }
 
+
+    @Test
+    public void testMessagesSize() throws Exception {
+        ProducerImpl producer = mock(ProducerImpl.class);
+
+        final ProducerConfigurationData producerConfigurationData = new 
ProducerConfigurationData();
+        producerConfigurationData.setCompressionType(CompressionType.NONE);
+        PulsarClientImpl pulsarClient = mock(PulsarClientImpl.class);
+        MemoryLimitController memoryLimitController = 
mock(MemoryLimitController.class);
+        
when(pulsarClient.getMemoryLimitController()).thenReturn(memoryLimitController);
+        try {
+            Field clientFiled = HandlerState.class.getDeclaredField("client");
+            clientFiled.setAccessible(true);
+            clientFiled.set(producer, pulsarClient);
+        } catch (Exception e){
+            Assert.fail(e.getMessage());
+        }
+
+        ByteBuffer payload = 
ByteBuffer.wrap("payload".getBytes(StandardCharsets.UTF_8));
+
+        
when(producer.getConfiguration()).thenReturn(producerConfigurationData);
+        when(producer.encryptMessage(any(), 
any())).thenReturn(ByteBufAllocator.DEFAULT.buffer().writeBytes(payload));
+
+        final int initNum = 32;
+        BatchMessageContainerImpl batchMessageContainer = new 
BatchMessageContainerImpl();
+        batchMessageContainer.setProducer(producer);
+        assertEquals(batchMessageContainer.getMaxMessagesNum(), initNum);
+
+        addMessagesAndCreateOpSendMsg(batchMessageContainer, 10);
+        assertEquals(batchMessageContainer.getMaxMessagesNum(), initNum);
+
+        addMessagesAndCreateOpSendMsg(batchMessageContainer, 200);
+        assertEquals(batchMessageContainer.getMaxMessagesNum(), 200);
+
+        addMessagesAndCreateOpSendMsg(batchMessageContainer, 10);
+        assertEquals(batchMessageContainer.getMaxMessagesNum(), 200);
+    }
+
+    private void addMessagesAndCreateOpSendMsg(BatchMessageContainerImpl 
batchMessageContainer, int num)
+            throws Exception{
+        ArrayList<MessageImpl<?>> messages = new ArrayList<>();
+        for (int i = 0; i < num; ++i) {
+            MessageMetadata messageMetadata = new MessageMetadata();
+            messageMetadata.setSequenceId(i);
+            messageMetadata.setProducerName("producer");
+            messageMetadata.setPublishTime(System.currentTimeMillis());
+            ByteBuffer payload = 
ByteBuffer.wrap("payload".getBytes(StandardCharsets.UTF_8));
+            MessageImpl<?> message = MessageImpl.create(messageMetadata, 
payload, Schema.BYTES, null);
+            messages.add(message);
+            batchMessageContainer.add(message, null);
+        }
+
+        batchMessageContainer.createOpSendMsg();
+        batchMessageContainer.clear();
+        messages.forEach(ReferenceCountUtil::safeRelease);
+    }
 }

Reply via email to