This is an automated email from the ASF dual-hosted git repository.

baodi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 8d45137  [Fix][Producer] check if message is nil (#1047)
8d45137 is described below

commit 8d4513787a25423c988708dff985e9c994545df5
Author: gunli <[email protected]>
AuthorDate: Thu Jul 6 16:29:15 2023 +0800

    [Fix][Producer] check if message is nil (#1047)
    
    * [Fix][Producer] check if message is nil
    
    * add a debug log
    
    ---------
    
    Co-authored-by: gunli <[email protected]>
---
 pulsar/producer_partition.go |  6 ++++++
 pulsar/producer_test.go      | 11 +++++++++++
 2 files changed, 17 insertions(+)

diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index 595cab4..03729ab 100755
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -1107,6 +1107,12 @@ func (p *partitionProducer) SendAsync(ctx context.Context, msg *ProducerMessage,
 
 func (p *partitionProducer) internalSendAsync(ctx context.Context, msg *ProducerMessage,
        callback func(MessageID, *ProducerMessage, error), flushImmediately bool) {
+       if msg == nil {
+               p.log.Error("Message is nil")
+               runCallback(callback, nil, msg, newError(InvalidMessage, "Message is nil"))
+               return
+       }
+
        // Register transaction operation to transaction and the transaction coordinator.
        var newCallback func(MessageID, *ProducerMessage, error)
        var txn *transaction
diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go
index 7c3abdc..fecae8e 100644
--- a/pulsar/producer_test.go
+++ b/pulsar/producer_test.go
@@ -108,6 +108,9 @@ func TestSimpleProducer(t *testing.T) {
                assert.NoError(t, err)
                assert.NotNil(t, ID)
        }
+
+       _, err = producer.Send(context.Background(), nil)
+       assert.NotNil(t, err)
 }
 
 func TestProducerAsyncSend(t *testing.T) {
@@ -152,6 +155,14 @@ func TestProducerAsyncSend(t *testing.T) {
        wg.Wait()
 
        assert.Equal(t, 0, errors.Size())
+
+       wg.Add(1)
+       producer.SendAsync(context.Background(), nil, func(id MessageID, m *ProducerMessage, e error) {
+               assert.NotNil(t, e)
+               assert.Nil(t, id)
+               wg.Done()
+       })
+       wg.Wait()
 }
 
 func TestProducerCompression(t *testing.T) {

Reply via email to