This is an automated email from the ASF dual-hosted git repository.
xiangying pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.10 by this push:
new 166a7ab5e10 [improve][java-client]Add init capacity for messages in
BatchMessageContainerImpl (#17822)
166a7ab5e10 is described below
commit 166a7ab5e1082646aebee4f152d968e3c0d102f7
Author: Xiaoyu Hou <[email protected]>
AuthorDate: Fri Sep 30 10:33:37 2022 +0800
[improve][java-client]Add init capacity for messages in
BatchMessageContainerImpl (#17822)
(cherry picked from commit 15a347ca999befe3ea3bd246d34309ad50fbcbe2)
---
.../client/impl/AbstractBatchMessageContainer.java | 8 +++
.../client/impl/BatchMessageContainerImpl.java | 5 +-
.../client/impl/BatchMessageContainerImplTest.java | 64 ++++++++++++++++++++++
3 files changed, 75 insertions(+), 2 deletions(-)
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AbstractBatchMessageContainer.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AbstractBatchMessageContainer.java
index 73f1e6d0889..9b4d1b7d683 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AbstractBatchMessageContainer.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AbstractBatchMessageContainer.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.client.impl;
+import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.util.List;
import lombok.extern.slf4j.Slf4j;
@@ -46,10 +47,12 @@ public abstract class AbstractBatchMessageContainer implements BatchMessageConta
protected long currentTxnidLeastBits = -1L;
protected static final int INITIAL_BATCH_BUFFER_SIZE = 1024;
+ protected static final int INITIAL_MESSAGES_NUM = 32;
// This will be the largest size for a batch sent from this particular
producer. This is used as a baseline to
// allocate a new buffer that can hold the entire batch without needing
costly reallocations
protected int maxBatchSize = INITIAL_BATCH_BUFFER_SIZE;
+ protected int maxMessagesNum = INITIAL_MESSAGES_NUM;
@Override
public boolean haveEnoughSpace(MessageImpl<?> msg) {
@@ -71,6 +74,11 @@ public abstract class AbstractBatchMessageContainer implements BatchMessageConta
return numMessagesInBatch;
}
+ @VisibleForTesting
+ public int getMaxMessagesNum() {
+ return maxMessagesNum;
+ }
+
@Override
public long getCurrentBatchSize() {
return currentBatchSizeBytes;
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
index 0a6fedf0ab6..cfc0e1f98c1 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
@@ -49,7 +49,7 @@ class BatchMessageContainerImpl extends AbstractBatchMessageContainer {
private long lowestSequenceId = -1L;
private long highestSequenceId = -1L;
private ByteBuf batchedMessageMetadataAndPayload;
- private List<MessageImpl<?>> messages = new ArrayList<>();
+ private List<MessageImpl<?>> messages = new ArrayList<>(maxMessagesNum);
protected SendCallback previousCallback = null;
// keep track of callbacks for individual messages being published in a
batch
protected SendCallback firstCallback;
@@ -139,12 +139,13 @@ class BatchMessageContainerImpl extends AbstractBatchMessageContainer {
// Update the current max batch size using the uncompressed size,
which is what we need in any case to
// accumulate the batch content
maxBatchSize = Math.max(maxBatchSize, uncompressedSize);
+ maxMessagesNum = Math.max(maxMessagesNum, numMessagesInBatch);
return compressedPayload;
}
@Override
public void clear() {
- messages = new ArrayList<>();
+ messages = new ArrayList<>(maxMessagesNum);
firstCallback = null;
previousCallback = null;
messageMetadata.clear();
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BatchMessageContainerImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BatchMessageContainerImplTest.java
index 8fc018b3199..69aaf95385b 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BatchMessageContainerImplTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BatchMessageContainerImplTest.java
@@ -18,13 +18,20 @@
*/
package org.apache.pulsar.client.impl;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.util.ReferenceCountUtil;
+import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.bookkeeper.common.allocator.impl.ByteBufAllocatorImpl;
import org.apache.pulsar.client.api.CompressionType;
@@ -35,6 +42,7 @@ import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.mockito.MockedConstruction;
import org.mockito.Mockito;
import org.powermock.reflect.Whitebox;
+import org.testng.Assert;
import org.testng.annotations.Test;
public class BatchMessageContainerImplTest {
@@ -84,4 +92,60 @@ public class BatchMessageContainerImplTest {
createByteBufAllocatorMethod.invoke(null));
}
+
+ @Test
+ public void testMessagesSize() throws Exception {
+ ProducerImpl producer = mock(ProducerImpl.class);
+
+ final ProducerConfigurationData producerConfigurationData = new
ProducerConfigurationData();
+ producerConfigurationData.setCompressionType(CompressionType.NONE);
+ PulsarClientImpl pulsarClient = mock(PulsarClientImpl.class);
+ MemoryLimitController memoryLimitController =
mock(MemoryLimitController.class);
+
when(pulsarClient.getMemoryLimitController()).thenReturn(memoryLimitController);
+ try {
+ Field clientFiled = HandlerState.class.getDeclaredField("client");
+ clientFiled.setAccessible(true);
+ clientFiled.set(producer, pulsarClient);
+ } catch (Exception e){
+ Assert.fail(e.getMessage());
+ }
+
+ ByteBuffer payload =
ByteBuffer.wrap("payload".getBytes(StandardCharsets.UTF_8));
+
+
when(producer.getConfiguration()).thenReturn(producerConfigurationData);
+ when(producer.encryptMessage(any(),
any())).thenReturn(ByteBufAllocator.DEFAULT.buffer().writeBytes(payload));
+
+ final int initNum = 32;
+ BatchMessageContainerImpl batchMessageContainer = new
BatchMessageContainerImpl();
+ batchMessageContainer.setProducer(producer);
+ assertEquals(batchMessageContainer.getMaxMessagesNum(), initNum);
+
+ addMessagesAndCreateOpSendMsg(batchMessageContainer, 10);
+ assertEquals(batchMessageContainer.getMaxMessagesNum(), initNum);
+
+ addMessagesAndCreateOpSendMsg(batchMessageContainer, 200);
+ assertEquals(batchMessageContainer.getMaxMessagesNum(), 200);
+
+ addMessagesAndCreateOpSendMsg(batchMessageContainer, 10);
+ assertEquals(batchMessageContainer.getMaxMessagesNum(), 200);
+ }
+
+ private void addMessagesAndCreateOpSendMsg(BatchMessageContainerImpl
batchMessageContainer, int num)
+ throws Exception{
+ ArrayList<MessageImpl<?>> messages = new ArrayList<>();
+ for (int i = 0; i < num; ++i) {
+ MessageMetadata messageMetadata = new MessageMetadata();
+ messageMetadata.setSequenceId(i);
+ messageMetadata.setProducerName("producer");
+ messageMetadata.setPublishTime(System.currentTimeMillis());
+ ByteBuffer payload =
ByteBuffer.wrap("payload".getBytes(StandardCharsets.UTF_8));
+ MessageImpl<?> message = MessageImpl.create(messageMetadata,
payload, Schema.BYTES, null);
+ messages.add(message);
+ batchMessageContainer.add(message, null);
+ }
+
+ batchMessageContainer.createOpSendMsg();
+ batchMessageContainer.clear();
+ messages.forEach(ReferenceCountUtil::safeRelease);
+ }
}