This is an automated email from the ASF dual-hosted git repository.

zike pushed a commit to branch branch-0.11.0
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git

commit 89a99e33a3a8cb6277a0524a898c8e70124887cc
Author: Jiaqi Shen <[email protected]>
AuthorDate: Tue Jul 11 15:35:26 2023 +0800

    [fix] [issue 877] Fix ctx in partitionProducer.Send() is not performing as expected (#1053)
    
    Fixes #877
    
    ### Motivation
    
    The original PR is #878. Because the original author @billowqiu has not replied to the review comments for a long time, the fix is resubmitted here.
    
    ### Modifications
    
    - Add select for ctx and doneCh in partitionProducer.Send()
    
    ---------
    
    Co-authored-by: shenjiaqi.2769 <[email protected]>
    (cherry picked from commit be3574019383ac0cdc65fec63e422fcfd6c82e4b)
---
 pulsar/producer_partition.go | 10 +++++++---
 pulsar/producer_test.go      | 26 ++++++++++++++++++++++++++
 2 files changed, 33 insertions(+), 3 deletions(-)

diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index b416c9ec..8af75bad 100755
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -1102,9 +1102,13 @@ func (p *partitionProducer) Send(ctx context.Context, msg *ProducerMessage) (Mes
        }, true)
 
        // wait for send request to finish
-       <-doneCh
-
-       return msgID, err
+       select {
+       case <-ctx.Done():
+               return nil, ctx.Err()
+       case <-doneCh:
+               // send request has been finished
+               return msgID, err
+       }
 }
 
 func (p *partitionProducer) SendAsync(ctx context.Context, msg *ProducerMessage,
diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go
index a042e0d7..be9885fa 100644
--- a/pulsar/producer_test.go
+++ b/pulsar/producer_test.go
@@ -2198,3 +2198,29 @@ func testSendMessagesWithMetadata(t *testing.T, disableBatch bool) {
        assert.Equal(t, msg.OrderingKey, recvMsg.OrderingKey())
        assert.Equal(t, msg.Properties, recvMsg.Properties())
 }
+
+func TestProducerSendWithContext(t *testing.T) {
+       client, err := NewClient(ClientOptions{
+               URL: lookupURL,
+       })
+       assert.NoError(t, err)
+       defer client.Close()
+
+       topicName := newTopicName()
+       // create producer
+       producer, err := client.CreateProducer(ProducerOptions{
+               Topic:           topicName,
+               DisableBatching: true,
+       })
+       assert.Nil(t, err)
+       defer producer.Close()
+
+       ctx, cancel := context.WithCancel(context.Background())
+       // Make ctx be canceled to invalidate the context immediately
+       cancel()
+       _, err = producer.Send(ctx, &ProducerMessage{
+               Payload: make([]byte, 1024*1024),
+       })
+       //  producer.Send should fail and return err context.Canceled
+       assert.True(t, errors.Is(err, context.Canceled))
+}

Reply via email to