This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new ecf7792  Fix NPE when send a large message and don't release 
batchedMessageMetadataAndPayload when discard in batch message container. 
(#5748)
ecf7792 is described below

commit ecf779210c31d1051f9cd2c823a1a7d078cdfd75
Author: lipenghui <[email protected]>
AuthorDate: Fri Nov 29 06:38:23 2019 +0800

    Fix NPE when send a large message and don't release 
batchedMessageMetadataAndPayload when discard in batch message container. 
(#5748)
    
    Fixes #5746 #5747
    
    ### Motivation
    
    Fix an NPE and a double release of an already-released ByteBuf when 
publishing an oversized message.
    
    Here is the error log:
    ```
    io.netty.util.IllegalReferenceCountException: refCnt: 0, decrement: 1
        at 
io.netty.util.internal.ReferenceCountUpdater.toLiveRealRefCnt(ReferenceCountUpdater.java:74)
 ~[netty-common-4.1.43.Final.jar:4.1.43.Final]
        at 
io.netty.util.internal.ReferenceCountUpdater.release(ReferenceCountUpdater.java:138)
 ~[netty-common-4.1.43.Final.jar:4.1.43.Final]
        at 
io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:100)
 ~[netty-buffer-4.1.43.Final.jar:4.1.43.Final]
        at io.netty.util.ReferenceCountUtil.release(ReferenceCountUtil.java:88) 
~[netty-common-4.1.43.Final.jar:4.1.43.Final]
        at 
io.netty.util.ReferenceCountUtil.safeRelease(ReferenceCountUtil.java:113) 
[netty-common-4.1.43.Final.jar:4.1.43.Final]
        at 
org.apache.pulsar.client.impl.BatchMessageKeyBasedContainer$KeyedBatch.discard(BatchMessageKeyBasedContainer.java:244)
 [classes/:?]
        at 
org.apache.pulsar.client.impl.BatchMessageKeyBasedContainer.createOpSendMsg(BatchMessageKeyBasedContainer.java:125)
 [classes/:?]
        at 
org.apache.pulsar.client.impl.BatchMessageKeyBasedContainer.createOpSendMsgs(BatchMessageKeyBasedContainer.java:145)
 [classes/:?]
        at 
org.apache.pulsar.client.impl.ProducerImpl.batchMessageAndSend(ProducerImpl.java:1426)
 [classes/:?]
        at 
org.apache.pulsar.client.impl.ProducerImpl.triggerFlush(ProducerImpl.java:1411) 
[classes/:?]
        at 
org.apache.pulsar.client.impl.ProducerBase.send(ProducerBase.java:112) 
[classes/:?]
        at 
org.apache.pulsar.client.impl.TypedMessageBuilderImpl.send(TypedMessageBuilderImpl.java:89)
 [classes/:?]
        at 
org.apache.pulsar.client.impl.ProducerBase.send(ProducerBase.java:63) 
[classes/:?]
        at 
org.apache.pulsar.broker.service.BatchMessageTest.testSendOverSizeMessage(BatchMessageTest.java:875)
 [test-classes/:?]
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
~[?:1.8.0_201]
        at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
~[?:1.8.0_201]
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 ~[?:1.8.0_201]
        at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_201]
        at 
org.testng.internal.MethodInvocationHelper.invokeMethod(MethodInvocationHelper.java:124)
 [testng-6.14.3.jar:?]
        at org.testng.internal.Invoker.invokeMethod(Invoker.java:583) 
[testng-6.14.3.jar:?]
        at org.testng.internal.Invoker.invokeTestMethod(Invoker.java:719) 
[testng-6.14.3.jar:?]
        at org.testng.internal.Invoker.invokeTestMethods(Invoker.java:989) 
[testng-6.14.3.jar:?]
        at 
org.testng.internal.TestMethodWorker.invokeTestMethods(TestMethodWorker.java:125)
 [testng-6.14.3.jar:?]
        at org.testng.internal.TestMethodWorker.run(TestMethodWorker.java:109) 
[testng-6.14.3.jar:?]
        at org.testng.TestRunner.privateRun(TestRunner.java:648) 
[testng-6.14.3.jar:?]
        at org.testng.TestRunner.run(TestRunner.java:505) [testng-6.14.3.jar:?]
        at org.testng.SuiteRunner.runTest(SuiteRunner.java:455) 
[testng-6.14.3.jar:?]
        at org.testng.SuiteRunner.runSequentially(SuiteRunner.java:450) 
[testng-6.14.3.jar:?]
        at org.testng.SuiteRunner.privateRun(SuiteRunner.java:415) 
[testng-6.14.3.jar:?]
        at org.testng.SuiteRunner.run(SuiteRunner.java:364) 
[testng-6.14.3.jar:?]
        at org.testng.SuiteRunnerWorker.runSuite(SuiteRunnerWorker.java:52) 
[testng-6.14.3.jar:?]
        at org.testng.SuiteRunnerWorker.run(SuiteRunnerWorker.java:84) 
[testng-6.14.3.jar:?]
        at org.testng.TestNG.runSuitesSequentially(TestNG.java:1208) 
[testng-6.14.3.jar:?]
        at org.testng.TestNG.runSuitesLocally(TestNG.java:1137) 
[testng-6.14.3.jar:?]
        at org.testng.TestNG.runSuites(TestNG.java:1049) [testng-6.14.3.jar:?]
        at org.testng.TestNG.run(TestNG.java:1017) [testng-6.14.3.jar:?]
        at org.testng.IDEARemoteTestNG.run(IDEARemoteTestNG.java:73) 
[testng-plugin.jar:?]
        at org.testng.RemoteTestNGStarter.main(RemoteTestNGStarter.java:123) 
[testng-plugin.jar:?]
    
    
    
    16:19:13.850 [main:org.apache.pulsar.client.impl.ProducerImpl@1439] WARN  
org.apache.pulsar.client.impl.ProducerImpl - 
[persistent://prop/ns-abc/testSendOverSizeMessage-623833fc-d9f7-4b28-aead-27955928fae9]
 [test-0-0] error while create opSendMsg by batch message container
    java.lang.NullPointerException: null
        at 
org.apache.pulsar.client.impl.ProducerImpl.releaseSemaphoreForSendOp(ProducerImpl.java:858)
 ~[classes/:?]
        at 
org.apache.pulsar.client.impl.ProducerImpl.processOpSendMsg(ProducerImpl.java:1477)
 ~[classes/:?]
        at 
org.apache.pulsar.client.impl.ProducerImpl.batchMessageAndSend(ProducerImpl.java:1432)
 [classes/:?]
        at 
org.apache.pulsar.client.impl.ProducerImpl.triggerFlush(ProducerImpl.java:1411) 
[classes/:?]
        at 
org.apache.pulsar.client.impl.ProducerBase.send(ProducerBase.java:112) 
[classes/:?]
        at 
org.apache.pulsar.client.impl.TypedMessageBuilderImpl.send(TypedMessageBuilderImpl.java:89)
 [classes/:?]
        at 
org.apache.pulsar.client.impl.ProducerBase.send(ProducerBase.java:63) 
[classes/:?]
        at 
org.apache.pulsar.broker.service.BatchMessageTest.testSendOverSizeMessage(BatchMessageTest.java:875)
 [test-classes/:?]
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
~[?:1.8.0_201]
        at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
~[?:1.8.0_201]
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 ~[?:1.8.0_201]
        at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_201]
        at 
org.testng.internal.MethodInvocationHelper.invokeMethod(MethodInvocationHelper.java:124)
 [testng-6.14.3.jar:?]
        at org.testng.internal.Invoker.invokeMethod(Invoker.java:583) 
[testng-6.14.3.jar:?]
        at org.testng.internal.Invoker.invokeTestMethod(Invoker.java:719) 
[testng-6.14.3.jar:?]
        at org.testng.internal.Invoker.invokeTestMethods(Invoker.java:989) 
[testng-6.14.3.jar:?]
        at 
org.testng.internal.TestMethodWorker.invokeTestMethods(TestMethodWorker.java:125)
 [testng-6.14.3.jar:?]
        at org.testng.internal.TestMethodWorker.run(TestMethodWorker.java:109) 
[testng-6.14.3.jar:?]
        at org.testng.TestRunner.privateRun(TestRunner.java:648) 
[testng-6.14.3.jar:?]
        at org.testng.TestRunner.run(TestRunner.java:505) [testng-6.14.3.jar:?]
        at org.testng.SuiteRunner.runTest(SuiteRunner.java:455) 
[testng-6.14.3.jar:?]
        at org.testng.SuiteRunner.runSequentially(SuiteRunner.java:450) 
[testng-6.14.3.jar:?]
        at org.testng.SuiteRunner.privateRun(SuiteRunner.java:415) 
[testng-6.14.3.jar:?]
        at org.testng.SuiteRunner.run(SuiteRunner.java:364) 
[testng-6.14.3.jar:?]
        at org.testng.SuiteRunnerWorker.runSuite(SuiteRunnerWorker.java:52) 
[testng-6.14.3.jar:?]
        at org.testng.SuiteRunnerWorker.run(SuiteRunnerWorker.java:84) 
[testng-6.14.3.jar:?]
        at org.testng.TestNG.runSuitesSequentially(TestNG.java:1208) 
[testng-6.14.3.jar:?]
        at org.testng.TestNG.runSuitesLocally(TestNG.java:1137) 
[testng-6.14.3.jar:?]
        at org.testng.TestNG.runSuites(TestNG.java:1049) [testng-6.14.3.jar:?]
        at org.testng.TestNG.run(TestNG.java:1017) [testng-6.14.3.jar:?]
        at org.testng.IDEARemoteTestNG.run(IDEARemoteTestNG.java:73) 
[testng-plugin.jar:?]
        at org.testng.RemoteTestNGStarter.main(RemoteTestNGStarter.java:123) 
[testng-plugin.jar:?]
    ```
    
    ### Modifications
    
    Add a null check to processOpSendMsg: if the op is null, just return.
    Don't release batchedMessageMetadataAndPayload in discard(), since it is 
already released in the getCompressedBatchMetadataAndPayload() method.
---
 .../pulsar/client/impl/BatchMessageContainerImpl.java    | 16 +++++-----------
 .../client/impl/BatchMessageKeyBasedContainer.java       | 16 ++++++----------
 .../java/org/apache/pulsar/client/impl/ProducerImpl.java |  3 +++
 3 files changed, 14 insertions(+), 21 deletions(-)

diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
index 4d6866b..d4772ce 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
@@ -162,7 +162,6 @@ class BatchMessageContainerImpl extends 
AbstractBatchMessageContainer {
             log.warn("[{}] [{}] Got exception while completing the callback 
for msg {}:", topicName, producerName,
                     lowestSequenceId, t);
         }
-        ReferenceCountUtil.safeRelease(batchedMessageMetadataAndPayload);
         clear();
     }
 
@@ -174,6 +173,11 @@ class BatchMessageContainerImpl extends 
AbstractBatchMessageContainer {
     @Override
     public OpSendMsg createOpSendMsg() throws IOException {
         ByteBuf encryptedPayload = producer.encryptMessage(messageMetadata, 
getCompressedBatchMetadataAndPayload());
+        if (encryptedPayload.readableBytes() > ClientCnx.getMaxMessageSize()) {
+            discard(new PulsarClientException.InvalidMessageException(
+                    "Message size is bigger than " + 
ClientCnx.getMaxMessageSize() + " bytes"));
+            return null;
+        }
         messageMetadata.setNumMessagesInBatch(numMessagesInBatch);
         messageMetadata.setHighestSequenceId(highestSequenceId);
         ByteBufPair cmd = producer.sendMessage(producer.producerId, 
messageMetadata.getSequenceId(),
@@ -182,16 +186,6 @@ class BatchMessageContainerImpl extends 
AbstractBatchMessageContainer {
         OpSendMsg op = OpSendMsg.create(messages, cmd, 
messageMetadata.getSequenceId(),
                 messageMetadata.getHighestSequenceId(), firstCallback);
 
-        if (encryptedPayload.readableBytes() > ClientCnx.getMaxMessageSize()) {
-            cmd.release();
-            discard(new PulsarClientException.InvalidMessageException(
-                    "Message size is bigger than " + 
ClientCnx.getMaxMessageSize() + " bytes"));
-            if (op != null) {
-                op.recycle();
-            }
-            return null;
-        }
-
         op.setNumMessagesInBatch(numMessagesInBatch);
         op.setBatchSizeByte(currentBatchSizeBytes);
         lowestSequenceId = -1L;
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageKeyBasedContainer.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageKeyBasedContainer.java
index be1d234..c9328c8 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageKeyBasedContainer.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageKeyBasedContainer.java
@@ -109,6 +109,12 @@ class BatchMessageKeyBasedContainer extends 
AbstractBatchMessageContainer {
 
     private ProducerImpl.OpSendMsg createOpSendMsg(KeyedBatch keyedBatch) 
throws IOException {
         ByteBuf encryptedPayload = 
producer.encryptMessage(keyedBatch.messageMetadata, 
keyedBatch.getCompressedBatchMetadataAndPayload());
+        if (encryptedPayload.readableBytes() > ClientCnx.getMaxMessageSize()) {
+            keyedBatch.discard(new 
PulsarClientException.InvalidMessageException(
+                    "Message size is bigger than " + 
ClientCnx.getMaxMessageSize() + " bytes"));
+            return null;
+        }
+
         final int numMessagesInBatch = keyedBatch.messages.size();
         long currentBatchSizeBytes = 0;
         for (MessageImpl<?> message : keyedBatch.messages) {
@@ -120,15 +126,6 @@ class BatchMessageKeyBasedContainer extends 
AbstractBatchMessageContainer {
 
         ProducerImpl.OpSendMsg op = 
ProducerImpl.OpSendMsg.create(keyedBatch.messages, cmd, keyedBatch.sequenceId, 
keyedBatch.firstCallback);
 
-        if (encryptedPayload.readableBytes() > ClientCnx.getMaxMessageSize()) {
-            cmd.release();
-            keyedBatch.discard(new 
PulsarClientException.InvalidMessageException(
-                    "Message size is bigger than " + 
ClientCnx.getMaxMessageSize() + " bytes"));
-            if (op != null) {
-                op.recycle();
-            }
-            return null;
-        }
         op.setNumMessagesInBatch(numMessagesInBatch);
         op.setBatchSizeByte(currentBatchSizeBytes);
         return op;
@@ -241,7 +238,6 @@ class BatchMessageKeyBasedContainer extends 
AbstractBatchMessageContainer {
                 log.warn("[{}] [{}] Got exception while completing the 
callback for msg {}:", topicName, producerName,
                         sequenceId, t);
             }
-            ReferenceCountUtil.safeRelease(batchedMessageMetadataAndPayload);
             clear();
         }
 
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index 079b653..4bc95a2 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -1442,6 +1442,9 @@ public class ProducerImpl<T> extends ProducerBase<T> 
implements TimerTask, Conne
     }
 
     private void processOpSendMsg(OpSendMsg op) {
+        if (op == null) {
+            return;
+        }
         try {
             if (op.msg != null && isBatchMessagingEnabled()) {
                 batchMessageAndSend();

Reply via email to