This is an automated email from the ASF dual-hosted git repository. zike pushed a commit to branch branch-0.11.0 in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
commit e429eca53562e02def06820afd56ab7b2efb4b85 Author: gunli <[email protected]> AuthorDate: Thu Jul 13 17:15:37 2023 +0800 [Fix][Producer] Stop block request even if Value and Payload are both set (#1052) ### Motivation Currently, if `!p.options.DisableBlockIfQueueFull` and `msg.Value != nil && msg.Payload != nil`, the request is blocked forever because `defer request.stopBlock()` is registered only after the validation logic, so the early return never closes the block channel. ```go if msg.Value != nil && msg.Payload != nil { p.log.Error("Can not set Value and Payload both") runCallback(request.callback, nil, request.msg, errors.New("can not set Value and Payload both")) return } // The block chan must be closed when returned with exception defer request.stopBlock() ``` This PR makes sure the request is no longer left blocked when Value and Payload are both set, by moving the check into `internalSendAsync`. ### Modifications - pulsar/producer_partition.go --------- Co-authored-by: gunli <[email protected]> (cherry picked from commit e45122c2defc5efd4efc493d0acef278a7ccfc01) --- pulsar/producer_partition.go | 11 ++++++----- pulsar/producer_test.go | 15 +++++++++++++++ 2 files changed, 21 insertions(+), 5 deletions(-) diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index 2f771fde..341ccf5d 100755 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -486,11 +486,6 @@ func (p *partitionProducer) internalSend(request *sendRequest) { var schemaPayload []byte var err error - if msg.Value != nil && msg.Payload != nil { - p.log.Error("Can not set Value and Payload both") - runCallback(request.callback, nil, request.msg, errors.New("can not set Value and Payload both")) - return - } // The block chan must be closed when returned with exception defer request.stopBlock() @@ -1118,6 +1113,12 @@ func (p *partitionProducer) internalSendAsync(ctx context.Context, msg *Producer return } + if msg.Value != nil && msg.Payload != nil { + p.log.Error("Can not set Value and Payload both") + runCallback(callback, nil, msg, newError(InvalidMessage, "Can not set Value 
and Payload both")) + return + } + // Register transaction operation to transaction and the transaction coordinator. var newCallback func(MessageID, *ProducerMessage, error) var txn *transaction diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go index fecae8ef..b197278a 100644 --- a/pulsar/producer_test.go +++ b/pulsar/producer_test.go @@ -111,6 +111,12 @@ func TestSimpleProducer(t *testing.T) { _, err = producer.Send(context.Background(), nil) assert.NotNil(t, err) + + _, err = producer.Send(context.Background(), &ProducerMessage{ + Payload: []byte("hello"), + Value: []byte("hello"), + }) + assert.NotNil(t, err) } func TestProducerAsyncSend(t *testing.T) { @@ -163,6 +169,15 @@ func TestProducerAsyncSend(t *testing.T) { wg.Done() }) wg.Wait() + + wg.Add(1) + producer.SendAsync(context.Background(), &ProducerMessage{Payload: []byte("hello"), Value: []byte("hello")}, + func(id MessageID, m *ProducerMessage, e error) { + assert.NotNil(t, e) + assert.Nil(t, id) + wg.Done() + }) + wg.Wait() } func TestProducerCompression(t *testing.T) {
