This is an automated email from the ASF dual-hosted git repository. zike pushed a commit to branch branch-0.11.0 in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
commit 89a99e33a3a8cb6277a0524a898c8e70124887cc Author: Jiaqi Shen <[email protected]> AuthorDate: Tue Jul 11 15:35:26 2023 +0800 [fix] [issue 877] Fix ctx in partitionProducer.Send() is not performing as expected (#1053) Fixes #877 ### Motivation The original PR is #878. Because the original author @billowqiu has not replied to the review comments for a long time, this commit resubmits the fix. ### Modifications - Add select for ctx and doneCh in partitionProducer.Send() --------- Co-authored-by: shenjiaqi.2769 <[email protected]> (cherry picked from commit be3574019383ac0cdc65fec63e422fcfd6c82e4b) --- pulsar/producer_partition.go | 10 +++++++--- pulsar/producer_test.go | 26 ++++++++++++++++++++++++++ 2 files changed, 33 insertions(+), 3 deletions(-) diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index b416c9ec..8af75bad 100755 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -1102,9 +1102,13 @@ func (p *partitionProducer) Send(ctx context.Context, msg *ProducerMessage) (Mes }, true) // wait for send request to finish - <-doneCh - - return msgID, err + select { + case <-ctx.Done(): + return nil, ctx.Err() + case <-doneCh: + // send request has been finished + return msgID, err + } } func (p *partitionProducer) SendAsync(ctx context.Context, msg *ProducerMessage, diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go index a042e0d7..be9885fa 100644 --- a/pulsar/producer_test.go +++ b/pulsar/producer_test.go @@ -2198,3 +2198,29 @@ func testSendMessagesWithMetadata(t *testing.T, disableBatch bool) { assert.Equal(t, msg.OrderingKey, recvMsg.OrderingKey()) assert.Equal(t, msg.Properties, recvMsg.Properties()) } + +func TestProducerSendWithContext(t *testing.T) { + client, err := NewClient(ClientOptions{ + URL: lookupURL, + }) + assert.NoError(t, err) + defer client.Close() + + topicName := newTopicName() + // create producer + producer, err := client.CreateProducer(ProducerOptions{ + Topic: topicName, 
DisableBatching: true, + }) + assert.Nil(t, err) + defer producer.Close() + + ctx, cancel := context.WithCancel(context.Background()) + // Make ctx be canceled to invalidate the context immediately + cancel() + _, err = producer.Send(ctx, &ProducerMessage{ + Payload: make([]byte, 1024*1024), + }) + // producer.Send should fail and return err context.Canceled + assert.True(t, errors.Is(err, context.Canceled)) +}
