This is an automated email from the ASF dual-hosted git repository. zixuan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push: new 69a06d5a [fix] Only decompress the payload if it's not empty (#1280) 69a06d5a is described below commit 69a06d5aeba516431039b3af5d7fd530762761ae Author: Stepan Bujnak <stepan.buj...@gmail.com> AuthorDate: Fri Oct 4 12:33:45 2024 +0200 [fix] Only decompress the payload if it's not empty (#1280) The message payload is optional and in some cases only message properties are sent. In this case, the message decompression would fail so we only want to do the decompression if the payload is not empty. --- pulsar/consumer_partition.go | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go index 16aef5d4..823e0e87 100644 --- a/pulsar/consumer_partition.go +++ b/pulsar/consumer_partition.go @@ -1105,15 +1105,18 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header } } - // decryption is success, decompress the payload - uncompressedHeadersAndPayload, err := pc.Decompress(msgMeta, processedPayloadBuffer) - if err != nil { - pc.discardCorruptedMessage(pbMsgID, pb.CommandAck_DecompressionError) - return err - } + var uncompressedHeadersAndPayload internal.Buffer + // decryption is success, decompress the payload, but only if payload is not empty + if n := msgMeta.UncompressedSize; n != nil && *n > 0 { + uncompressedHeadersAndPayload, err = pc.Decompress(msgMeta, processedPayloadBuffer) + if err != nil { + pc.discardCorruptedMessage(pbMsgID, pb.CommandAck_DecompressionError) + return err + } - // Reset the reader on the uncompressed buffer - reader.ResetBuffer(uncompressedHeadersAndPayload) + // Reset the reader on the uncompressed buffer + reader.ResetBuffer(uncompressedHeadersAndPayload) + } numMsgs := 1 if msgMeta.NumMessagesInBatch != nil {