This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 10301ee633a5da78013571cbf02b5599371dcbf6
Author: ZhangJian He <[email protected]>
AuthorDate: Wed Oct 20 15:58:20 2021 +0800

    [pulsar-java-client] Auto-recovery after exception like out of direct memory (#12170)
    
    
    (cherry picked from commit ee62763ae4fe749b24857876ea0d8a34021042de)
---
 .../client/impl/BatchMessageContainerImpl.java     | 34 ++++++----
 .../client/impl/BatchMessageContainerImplTest.java | 75 ++++++++++++++++++++++
 2 files changed, 97 insertions(+), 12 deletions(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
index 9f81e03..cea567e 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
@@ -66,18 +66,28 @@ class BatchMessageContainerImpl extends AbstractBatchMessageContainer {
         }
 
         if (++numMessagesInBatch == 1) {
-            // some properties are common amongst the different messages in the batch, hence we just pick it up from
-            // the first message
-            messageMetadata.setSequenceId(msg.getSequenceId());
-            lowestSequenceId = 
Commands.initBatchMessageMetadata(messageMetadata, msg.getMessageBuilder());
-            this.firstCallback = callback;
-            batchedMessageMetadataAndPayload = PulsarByteBufAllocator.DEFAULT
-                    .buffer(Math.min(maxBatchSize, 
ClientCnx.getMaxMessageSize()));
-            if (msg.getMessageBuilder().hasTxnidMostBits() && 
currentTxnidMostBits == -1) {
-                currentTxnidMostBits = 
msg.getMessageBuilder().getTxnidMostBits();
-            }
-            if (msg.getMessageBuilder().hasTxnidLeastBits() && 
currentTxnidLeastBits == -1) {
-                currentTxnidLeastBits = 
msg.getMessageBuilder().getTxnidLeastBits();
+            try {
+                // some properties are common amongst the different messages in the batch, hence we just pick it up from
+                // the first message
+                messageMetadata.setSequenceId(msg.getSequenceId());
+                lowestSequenceId = 
Commands.initBatchMessageMetadata(messageMetadata, msg.getMessageBuilder());
+                this.firstCallback = callback;
+                batchedMessageMetadataAndPayload = 
PulsarByteBufAllocator.DEFAULT
+                        .buffer(Math.min(maxBatchSize, 
ClientCnx.getMaxMessageSize()));
+                if (msg.getMessageBuilder().hasTxnidMostBits() && 
currentTxnidMostBits == -1) {
+                    currentTxnidMostBits = 
msg.getMessageBuilder().getTxnidMostBits();
+                }
+                if (msg.getMessageBuilder().hasTxnidLeastBits() && 
currentTxnidLeastBits == -1) {
+                    currentTxnidLeastBits = 
msg.getMessageBuilder().getTxnidLeastBits();
+                }
+            } catch (Throwable e) {
+                log.error("construct first message failed, exception is ", e);
+                if (batchedMessageMetadataAndPayload != null) {
+                    // if payload has been allocated release it
+                    batchedMessageMetadataAndPayload.release();
+                }
+                discard(new PulsarClientException(e));
+                return false;
             }
         }
 
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BatchMessageContainerImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BatchMessageContainerImplTest.java
new file mode 100644
index 0000000..3d55487
--- /dev/null
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BatchMessageContainerImplTest.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import org.apache.bookkeeper.common.allocator.impl.ByteBufAllocatorBuilderImpl;
+import org.apache.bookkeeper.common.allocator.impl.ByteBufAllocatorImpl;
+import org.apache.pulsar.client.api.CompressionType;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
+import org.apache.pulsar.common.api.proto.MessageMetadata;
+import org.mockito.Mockito;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.testng.IObjectFactory;
+import org.testng.annotations.ObjectFactory;
+import org.testng.annotations.Test;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+
+@PrepareForTest({ByteBufAllocatorImpl.class, 
ByteBufAllocatorBuilderImpl.class})
+@PowerMockIgnore({"javax.management.*", "javax.ws.*", 
"org.apache.logging.log4j.*"})
+public class BatchMessageContainerImplTest {
+
+    @ObjectFactory
+    public IObjectFactory getObjectFactory() {
+        return new org.powermock.modules.testng.PowerMockObjectFactory();
+    }
+
+    @Test
+    public void recoveryAfterOom() throws Exception {
+        final ByteBufAllocatorImpl mockAllocator = 
PowerMockito.mock(ByteBufAllocatorImpl.class);
+        
PowerMockito.whenNew(ByteBufAllocatorImpl.class).withAnyArguments().thenReturn(mockAllocator);
+        PowerMockito.when(mockAllocator.buffer(Mockito.anyInt(), 
Mockito.anyInt())).thenThrow(new OutOfMemoryError("test")).thenReturn(null);
+        final ProducerImpl producer = Mockito.mock(ProducerImpl.class);
+        final ProducerConfigurationData producerConfigurationData = new 
ProducerConfigurationData();
+        producerConfigurationData.setCompressionType(CompressionType.NONE);
+        
Mockito.when(producer.getConfiguration()).thenReturn(producerConfigurationData);
+        final BatchMessageContainerImpl batchMessageContainer = new 
BatchMessageContainerImpl();
+        batchMessageContainer.setProducer(producer);
+        MessageMetadata messageMetadata1 = new MessageMetadata();
+        messageMetadata1.setSequenceId(1L);
+        messageMetadata1.setProducerName("producer1");
+        messageMetadata1.setPublishTime(System.currentTimeMillis());
+        ByteBuffer payload1 = 
ByteBuffer.wrap("payload1".getBytes(StandardCharsets.UTF_8));
+        final MessageImpl<byte[]> message1 = 
MessageImpl.create(messageMetadata1, payload1, Schema.BYTES, null);
+        batchMessageContainer.add(message1, null);
+        MessageMetadata messageMetadata2 = new MessageMetadata();
+        messageMetadata2.setSequenceId(1L);
+        messageMetadata2.setProducerName("producer1");
+        messageMetadata2.setPublishTime(System.currentTimeMillis());
+        ByteBuffer payload2 = 
ByteBuffer.wrap("payload2".getBytes(StandardCharsets.UTF_8));
+        final MessageImpl<byte[]> message2 = 
MessageImpl.create(messageMetadata2, payload2, Schema.BYTES, null);
+        // after the OOM, add() should self-heal and must not throw an exception
+        batchMessageContainer.add(message2, null);
+    }
+
+}

Reply via email to