This is an automated email from the ASF dual-hosted git repository.
baodi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new 8d45137 [Fix][Producer] check if message is nil (#1047)
8d45137 is described below
commit 8d4513787a25423c988708dff985e9c994545df5
Author: gunli <[email protected]>
AuthorDate: Thu Jul 6 16:29:15 2023 +0800
[Fix][Producer] check if message is nil (#1047)
* [Fix][Producer] check if message is nil
* add a debug log
---------
Co-authored-by: gunli <[email protected]>
---
pulsar/producer_partition.go | 6 ++++++
pulsar/producer_test.go | 11 +++++++++++
2 files changed, 17 insertions(+)
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index 595cab4..03729ab 100755
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -1107,6 +1107,12 @@ func (p *partitionProducer) SendAsync(ctx context.Context, msg *ProducerMessage,
func (p *partitionProducer) internalSendAsync(ctx context.Context, msg *ProducerMessage,
callback func(MessageID, *ProducerMessage, error), flushImmediately bool) {
+ if msg == nil {
+ p.log.Error("Message is nil")
+ runCallback(callback, nil, msg, newError(InvalidMessage, "Message is nil"))
+ return
+ }
+
// Register transaction operation to transaction and the transaction coordinator.
var newCallback func(MessageID, *ProducerMessage, error)
var txn *transaction
diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go
index 7c3abdc..fecae8e 100644
--- a/pulsar/producer_test.go
+++ b/pulsar/producer_test.go
@@ -108,6 +108,9 @@ func TestSimpleProducer(t *testing.T) {
assert.NoError(t, err)
assert.NotNil(t, ID)
}
+
+ _, err = producer.Send(context.Background(), nil)
+ assert.NotNil(t, err)
}
func TestProducerAsyncSend(t *testing.T) {
@@ -152,6 +155,14 @@ func TestProducerAsyncSend(t *testing.T) {
wg.Wait()
assert.Equal(t, 0, errors.Size())
+
+ wg.Add(1)
+ producer.SendAsync(context.Background(), nil, func(id MessageID, m *ProducerMessage, e error) {
+ assert.NotNil(t, e)
+ assert.Nil(t, id)
+ wg.Done()
+ })
+ wg.Wait()
}
func TestProducerCompression(t *testing.T) {