This is an automated email from the ASF dual-hosted git repository.

zike pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 27e3625  [fix] Fix event time not being set when batching is disabled (#1015)
27e3625 is described below

commit 27e36250f35094b203483c946bc4406c6119ce62
Author: Zike Yang <[email protected]>
AuthorDate: Wed May 17 18:18:20 2023 +0800

    [fix] Fix event time not being set when batching is disabled (#1015)
    
    Fixes #1013
    
    ### Motivation
    
    The event time is not set when batching is disabled. The event time will be lost.
    This is a regression bug in 0.10.0.
    
    ### Modifications
    
    * Set the event time when sending a single message
---
 pulsar/producer_partition.go |  4 ++++
 pulsar/producer_test.go      | 33 +++++++++++++++++++++++++++++++++
 2 files changed, 37 insertions(+)

diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index e77c929..fc67f51 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -735,6 +735,10 @@ func (p *partitionProducer) genMetadata(msg *ProducerMessage,
                UncompressedSize: proto.Uint32(uint32(uncompressedSize)),
        }
 
+       if !msg.EventTime.IsZero() {
+               mm.EventTime = proto.Uint64(internal.TimestampMillis(msg.EventTime))
+       }
+
        if msg.Key != "" {
                mm.PartitionKey = proto.String(msg.Key)
        }
diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go
index a6f5e39..b587975 100644
--- a/pulsar/producer_test.go
+++ b/pulsar/producer_test.go
@@ -1905,3 +1905,36 @@ func TestMemLimitContextCancel(t *testing.T) {
        })
        assert.NoError(t, err)
 }
+
+func TestSendMessagesWithMetadata(t *testing.T) {
+       client, err := NewClient(ClientOptions{
+               URL: lookupURL,
+       })
+
+       assert.Nil(t, err)
+       defer client.Close()
+
+       topic := newTopicName()
+       producer, err := client.CreateProducer(ProducerOptions{
+               Topic:           topic,
+               DisableBatching: true,
+       })
+       assert.Nil(t, err)
+
+       consumer, err := client.Subscribe(ConsumerOptions{
+               Topic:            topic,
+               SubscriptionName: "my-sub",
+       })
+       assert.Nil(t, err)
+
+       msg := &ProducerMessage{EventTime: time.Now().Local(),
+               Payload: []byte("msg")}
+
+       _, err = producer.Send(context.Background(), msg)
+       assert.Nil(t, err)
+
+       recvMsg, err := consumer.Receive(context.Background())
+       assert.Nil(t, err)
+
+       assert.Equal(t, internal.TimestampMillis(recvMsg.EventTime()), internal.TimestampMillis(msg.EventTime))
+}

Reply via email to