This is an automated email from the ASF dual-hosted git repository.
zike pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new be35740 [fix] [issue 877] Fix ctx in partitionProducer.Send() is not
performing as expected (#1053)
be35740 is described below
commit be3574019383ac0cdc65fec63e422fcfd6c82e4b
Author: Jiaqi Shen <[email protected]>
AuthorDate: Tue Jul 11 15:35:26 2023 +0800
[fix] [issue 877] Fix ctx in partitionProducer.Send() is not performing as
expected (#1053)
Fixes #877
### Motivation
The original PR is #878. Because the original author, @billowqiu, has not
responded to the review comments for a long time, the fix is resubmitted
here.
### Modifications
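- Add a select on ctx.Done() and doneCh in partitionProducer.Send(), so that Send returns ctx.Err() once the context is done instead of blocking until the send request finishes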
---------
Co-authored-by: shenjiaqi.2769 <[email protected]>
---
pulsar/producer_partition.go | 10 +++++++---
pulsar/producer_test.go | 26 ++++++++++++++++++++++++++
2 files changed, 33 insertions(+), 3 deletions(-)
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index 03729ab..dd45ff2 100755
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -1095,9 +1095,13 @@ func (p *partitionProducer) Send(ctx context.Context,
msg *ProducerMessage) (Mes
}, true)
// wait for send request to finish
- <-doneCh
-
- return msgID, err
+ select {
+ case <-ctx.Done():
+ return nil, ctx.Err()
+ case <-doneCh:
+ // send request has been finished
+ return msgID, err
+ }
}
func (p *partitionProducer) SendAsync(ctx context.Context, msg
*ProducerMessage,
diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go
index fecae8e..11ff089 100644
--- a/pulsar/producer_test.go
+++ b/pulsar/producer_test.go
@@ -2024,3 +2024,29 @@ func testSendMessagesWithMetadata(t *testing.T,
disableBatch bool) {
assert.Equal(t, msg.OrderingKey, recvMsg.OrderingKey())
assert.Equal(t, msg.Properties, recvMsg.Properties())
}
+
+func TestProducerSendWithContext(t *testing.T) {
+ client, err := NewClient(ClientOptions{
+ URL: lookupURL,
+ })
+ assert.NoError(t, err)
+ defer client.Close()
+
+ topicName := newTopicName()
+ // create producer
+ producer, err := client.CreateProducer(ProducerOptions{
+ Topic: topicName,
+ DisableBatching: true,
+ })
+ assert.Nil(t, err)
+ defer producer.Close()
+
+ ctx, cancel := context.WithCancel(context.Background())
+ // Make ctx be canceled to invalidate the context immediately
+ cancel()
+ _, err = producer.Send(ctx, &ProducerMessage{
+ Payload: make([]byte, 1024*1024),
+ })
+ // producer.Send should fail and return err context.Canceled
+ assert.True(t, errors.Is(err, context.Canceled))
+}