This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new ecf7792 Fix NPE when send a large message and don't release
batchedMessageMetadataAndPayload when discard in batch message container.
(#5748)
ecf7792 is described below
commit ecf779210c31d1051f9cd2c823a1a7d078cdfd75
Author: lipenghui <[email protected]>
AuthorDate: Fri Nov 29 06:38:23 2019 +0800
Fix NPE when send a large message and don't release
batchedMessageMetadataAndPayload when discard in batch message container.
(#5748)
Fixes #5746 #5747
### Motivation
Fix an NPE and a double release of an already-released ByteBuf when
publishing an oversized message.
Here is the error log:
```
io.netty.util.IllegalReferenceCountException: refCnt: 0, decrement: 1
at
io.netty.util.internal.ReferenceCountUpdater.toLiveRealRefCnt(ReferenceCountUpdater.java:74)
~[netty-common-4.1.43.Final.jar:4.1.43.Final]
at
io.netty.util.internal.ReferenceCountUpdater.release(ReferenceCountUpdater.java:138)
~[netty-common-4.1.43.Final.jar:4.1.43.Final]
at
io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:100)
~[netty-buffer-4.1.43.Final.jar:4.1.43.Final]
at io.netty.util.ReferenceCountUtil.release(ReferenceCountUtil.java:88)
~[netty-common-4.1.43.Final.jar:4.1.43.Final]
at
io.netty.util.ReferenceCountUtil.safeRelease(ReferenceCountUtil.java:113)
[netty-common-4.1.43.Final.jar:4.1.43.Final]
at
org.apache.pulsar.client.impl.BatchMessageKeyBasedContainer$KeyedBatch.discard(BatchMessageKeyBasedContainer.java:244)
[classes/:?]
at
org.apache.pulsar.client.impl.BatchMessageKeyBasedContainer.createOpSendMsg(BatchMessageKeyBasedContainer.java:125)
[classes/:?]
at
org.apache.pulsar.client.impl.BatchMessageKeyBasedContainer.createOpSendMsgs(BatchMessageKeyBasedContainer.java:145)
[classes/:?]
at
org.apache.pulsar.client.impl.ProducerImpl.batchMessageAndSend(ProducerImpl.java:1426)
[classes/:?]
at
org.apache.pulsar.client.impl.ProducerImpl.triggerFlush(ProducerImpl.java:1411)
[classes/:?]
at
org.apache.pulsar.client.impl.ProducerBase.send(ProducerBase.java:112)
[classes/:?]
at
org.apache.pulsar.client.impl.TypedMessageBuilderImpl.send(TypedMessageBuilderImpl.java:89)
[classes/:?]
at
org.apache.pulsar.client.impl.ProducerBase.send(ProducerBase.java:63)
[classes/:?]
at
org.apache.pulsar.broker.service.BatchMessageTest.testSendOverSizeMessage(BatchMessageTest.java:875)
[test-classes/:?]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
~[?:1.8.0_201]
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
~[?:1.8.0_201]
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
~[?:1.8.0_201]
at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_201]
at
org.testng.internal.MethodInvocationHelper.invokeMethod(MethodInvocationHelper.java:124)
[testng-6.14.3.jar:?]
at org.testng.internal.Invoker.invokeMethod(Invoker.java:583)
[testng-6.14.3.jar:?]
at org.testng.internal.Invoker.invokeTestMethod(Invoker.java:719)
[testng-6.14.3.jar:?]
at org.testng.internal.Invoker.invokeTestMethods(Invoker.java:989)
[testng-6.14.3.jar:?]
at
org.testng.internal.TestMethodWorker.invokeTestMethods(TestMethodWorker.java:125)
[testng-6.14.3.jar:?]
at org.testng.internal.TestMethodWorker.run(TestMethodWorker.java:109)
[testng-6.14.3.jar:?]
at org.testng.TestRunner.privateRun(TestRunner.java:648)
[testng-6.14.3.jar:?]
at org.testng.TestRunner.run(TestRunner.java:505) [testng-6.14.3.jar:?]
at org.testng.SuiteRunner.runTest(SuiteRunner.java:455)
[testng-6.14.3.jar:?]
at org.testng.SuiteRunner.runSequentially(SuiteRunner.java:450)
[testng-6.14.3.jar:?]
at org.testng.SuiteRunner.privateRun(SuiteRunner.java:415)
[testng-6.14.3.jar:?]
at org.testng.SuiteRunner.run(SuiteRunner.java:364)
[testng-6.14.3.jar:?]
at org.testng.SuiteRunnerWorker.runSuite(SuiteRunnerWorker.java:52)
[testng-6.14.3.jar:?]
at org.testng.SuiteRunnerWorker.run(SuiteRunnerWorker.java:84)
[testng-6.14.3.jar:?]
at org.testng.TestNG.runSuitesSequentially(TestNG.java:1208)
[testng-6.14.3.jar:?]
at org.testng.TestNG.runSuitesLocally(TestNG.java:1137)
[testng-6.14.3.jar:?]
at org.testng.TestNG.runSuites(TestNG.java:1049) [testng-6.14.3.jar:?]
at org.testng.TestNG.run(TestNG.java:1017) [testng-6.14.3.jar:?]
at org.testng.IDEARemoteTestNG.run(IDEARemoteTestNG.java:73)
[testng-plugin.jar:?]
at org.testng.RemoteTestNGStarter.main(RemoteTestNGStarter.java:123)
[testng-plugin.jar:?]
16:19:13.850 [main:org.apache.pulsar.client.impl.ProducerImpl@1439] WARN
org.apache.pulsar.client.impl.ProducerImpl -
[persistent://prop/ns-abc/testSendOverSizeMessage-623833fc-d9f7-4b28-aead-27955928fae9]
[test-0-0] error while create opSendMsg by batch message container
java.lang.NullPointerException: null
at
org.apache.pulsar.client.impl.ProducerImpl.releaseSemaphoreForSendOp(ProducerImpl.java:858)
~[classes/:?]
at
org.apache.pulsar.client.impl.ProducerImpl.processOpSendMsg(ProducerImpl.java:1477)
~[classes/:?]
at
org.apache.pulsar.client.impl.ProducerImpl.batchMessageAndSend(ProducerImpl.java:1432)
[classes/:?]
at
org.apache.pulsar.client.impl.ProducerImpl.triggerFlush(ProducerImpl.java:1411)
[classes/:?]
at
org.apache.pulsar.client.impl.ProducerBase.send(ProducerBase.java:112)
[classes/:?]
at
org.apache.pulsar.client.impl.TypedMessageBuilderImpl.send(TypedMessageBuilderImpl.java:89)
[classes/:?]
at
org.apache.pulsar.client.impl.ProducerBase.send(ProducerBase.java:63)
[classes/:?]
at
org.apache.pulsar.broker.service.BatchMessageTest.testSendOverSizeMessage(BatchMessageTest.java:875)
[test-classes/:?]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
~[?:1.8.0_201]
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
~[?:1.8.0_201]
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
~[?:1.8.0_201]
at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_201]
at
org.testng.internal.MethodInvocationHelper.invokeMethod(MethodInvocationHelper.java:124)
[testng-6.14.3.jar:?]
at org.testng.internal.Invoker.invokeMethod(Invoker.java:583)
[testng-6.14.3.jar:?]
at org.testng.internal.Invoker.invokeTestMethod(Invoker.java:719)
[testng-6.14.3.jar:?]
at org.testng.internal.Invoker.invokeTestMethods(Invoker.java:989)
[testng-6.14.3.jar:?]
at
org.testng.internal.TestMethodWorker.invokeTestMethods(TestMethodWorker.java:125)
[testng-6.14.3.jar:?]
at org.testng.internal.TestMethodWorker.run(TestMethodWorker.java:109)
[testng-6.14.3.jar:?]
at org.testng.TestRunner.privateRun(TestRunner.java:648)
[testng-6.14.3.jar:?]
at org.testng.TestRunner.run(TestRunner.java:505) [testng-6.14.3.jar:?]
at org.testng.SuiteRunner.runTest(SuiteRunner.java:455)
[testng-6.14.3.jar:?]
at org.testng.SuiteRunner.runSequentially(SuiteRunner.java:450)
[testng-6.14.3.jar:?]
at org.testng.SuiteRunner.privateRun(SuiteRunner.java:415)
[testng-6.14.3.jar:?]
at org.testng.SuiteRunner.run(SuiteRunner.java:364)
[testng-6.14.3.jar:?]
at org.testng.SuiteRunnerWorker.runSuite(SuiteRunnerWorker.java:52)
[testng-6.14.3.jar:?]
at org.testng.SuiteRunnerWorker.run(SuiteRunnerWorker.java:84)
[testng-6.14.3.jar:?]
at org.testng.TestNG.runSuitesSequentially(TestNG.java:1208)
[testng-6.14.3.jar:?]
at org.testng.TestNG.runSuitesLocally(TestNG.java:1137)
[testng-6.14.3.jar:?]
at org.testng.TestNG.runSuites(TestNG.java:1049) [testng-6.14.3.jar:?]
at org.testng.TestNG.run(TestNG.java:1017) [testng-6.14.3.jar:?]
at org.testng.IDEARemoteTestNG.run(IDEARemoteTestNG.java:73)
[testng-plugin.jar:?]
at org.testng.RemoteTestNGStarter.main(RemoteTestNGStarter.java:123)
[testng-plugin.jar:?]
```
### Modifications
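Add a null check to processOpSendMsg: if the op is null, just return.
Don't release batchedMessageMetadataAndPayload in discard(), since it is
already released in the getCompressedBatchMetadataAndPayload() method.
Move the max-message-size check ahead of creating the send command and
the OpSendMsg, so nothing has to be released or recycled on that path.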
---
.../pulsar/client/impl/BatchMessageContainerImpl.java | 16 +++++-----------
.../client/impl/BatchMessageKeyBasedContainer.java | 16 ++++++----------
.../java/org/apache/pulsar/client/impl/ProducerImpl.java | 3 +++
3 files changed, 14 insertions(+), 21 deletions(-)
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
index 4d6866b..d4772ce 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
@@ -162,7 +162,6 @@ class BatchMessageContainerImpl extends
AbstractBatchMessageContainer {
log.warn("[{}] [{}] Got exception while completing the callback
for msg {}:", topicName, producerName,
lowestSequenceId, t);
}
- ReferenceCountUtil.safeRelease(batchedMessageMetadataAndPayload);
clear();
}
@@ -174,6 +173,11 @@ class BatchMessageContainerImpl extends
AbstractBatchMessageContainer {
@Override
public OpSendMsg createOpSendMsg() throws IOException {
ByteBuf encryptedPayload = producer.encryptMessage(messageMetadata,
getCompressedBatchMetadataAndPayload());
+ if (encryptedPayload.readableBytes() > ClientCnx.getMaxMessageSize()) {
+ discard(new PulsarClientException.InvalidMessageException(
+ "Message size is bigger than " +
ClientCnx.getMaxMessageSize() + " bytes"));
+ return null;
+ }
messageMetadata.setNumMessagesInBatch(numMessagesInBatch);
messageMetadata.setHighestSequenceId(highestSequenceId);
ByteBufPair cmd = producer.sendMessage(producer.producerId,
messageMetadata.getSequenceId(),
@@ -182,16 +186,6 @@ class BatchMessageContainerImpl extends
AbstractBatchMessageContainer {
OpSendMsg op = OpSendMsg.create(messages, cmd,
messageMetadata.getSequenceId(),
messageMetadata.getHighestSequenceId(), firstCallback);
- if (encryptedPayload.readableBytes() > ClientCnx.getMaxMessageSize()) {
- cmd.release();
- discard(new PulsarClientException.InvalidMessageException(
- "Message size is bigger than " +
ClientCnx.getMaxMessageSize() + " bytes"));
- if (op != null) {
- op.recycle();
- }
- return null;
- }
-
op.setNumMessagesInBatch(numMessagesInBatch);
op.setBatchSizeByte(currentBatchSizeBytes);
lowestSequenceId = -1L;
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageKeyBasedContainer.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageKeyBasedContainer.java
index be1d234..c9328c8 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageKeyBasedContainer.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageKeyBasedContainer.java
@@ -109,6 +109,12 @@ class BatchMessageKeyBasedContainer extends
AbstractBatchMessageContainer {
private ProducerImpl.OpSendMsg createOpSendMsg(KeyedBatch keyedBatch)
throws IOException {
ByteBuf encryptedPayload =
producer.encryptMessage(keyedBatch.messageMetadata,
keyedBatch.getCompressedBatchMetadataAndPayload());
+ if (encryptedPayload.readableBytes() > ClientCnx.getMaxMessageSize()) {
+ keyedBatch.discard(new
PulsarClientException.InvalidMessageException(
+ "Message size is bigger than " +
ClientCnx.getMaxMessageSize() + " bytes"));
+ return null;
+ }
+
final int numMessagesInBatch = keyedBatch.messages.size();
long currentBatchSizeBytes = 0;
for (MessageImpl<?> message : keyedBatch.messages) {
@@ -120,15 +126,6 @@ class BatchMessageKeyBasedContainer extends
AbstractBatchMessageContainer {
ProducerImpl.OpSendMsg op =
ProducerImpl.OpSendMsg.create(keyedBatch.messages, cmd, keyedBatch.sequenceId,
keyedBatch.firstCallback);
- if (encryptedPayload.readableBytes() > ClientCnx.getMaxMessageSize()) {
- cmd.release();
- keyedBatch.discard(new
PulsarClientException.InvalidMessageException(
- "Message size is bigger than " +
ClientCnx.getMaxMessageSize() + " bytes"));
- if (op != null) {
- op.recycle();
- }
- return null;
- }
op.setNumMessagesInBatch(numMessagesInBatch);
op.setBatchSizeByte(currentBatchSizeBytes);
return op;
@@ -241,7 +238,6 @@ class BatchMessageKeyBasedContainer extends
AbstractBatchMessageContainer {
log.warn("[{}] [{}] Got exception while completing the
callback for msg {}:", topicName, producerName,
sequenceId, t);
}
- ReferenceCountUtil.safeRelease(batchedMessageMetadataAndPayload);
clear();
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index 079b653..4bc95a2 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -1442,6 +1442,9 @@ public class ProducerImpl<T> extends ProducerBase<T>
implements TimerTask, Conne
}
private void processOpSendMsg(OpSendMsg op) {
+ if (op == null) {
+ return;
+ }
try {
if (op.msg != null && isBatchMessagingEnabled()) {
batchMessageAndSend();