This is an automated email from the ASF dual-hosted git repository.
zike pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new 27e3625 [fix] Fix event time not being set when batching is disabled (#1015)
27e3625 is described below
commit 27e36250f35094b203483c946bc4406c6119ce62
Author: Zike Yang <[email protected]>
AuthorDate: Wed May 17 18:18:20 2023 +0800
[fix] Fix event time not being set when batching is disabled (#1015)
Fixes #1013
### Motivation
When batching is disabled, the event time is not set on the outgoing
message, so it is lost.
This is a regression in 0.10.0.
### Modifications
* Set the event time when sending a single message
---
pulsar/producer_partition.go | 4 ++++
pulsar/producer_test.go | 33 +++++++++++++++++++++++++++++++++
2 files changed, 37 insertions(+)
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index e77c929..fc67f51 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -735,6 +735,10 @@ func (p *partitionProducer) genMetadata(msg *ProducerMessage,
UncompressedSize: proto.Uint32(uint32(uncompressedSize)),
}
+ if !msg.EventTime.IsZero() {
+ mm.EventTime = proto.Uint64(internal.TimestampMillis(msg.EventTime))
+ }
+
if msg.Key != "" {
mm.PartitionKey = proto.String(msg.Key)
}
diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go
index a6f5e39..b587975 100644
--- a/pulsar/producer_test.go
+++ b/pulsar/producer_test.go
@@ -1905,3 +1905,36 @@ func TestMemLimitContextCancel(t *testing.T) {
})
assert.NoError(t, err)
}
+
+func TestSendMessagesWithMetadata(t *testing.T) {
+ client, err := NewClient(ClientOptions{
+ URL: lookupURL,
+ })
+
+ assert.Nil(t, err)
+ defer client.Close()
+
+ topic := newTopicName()
+ producer, err := client.CreateProducer(ProducerOptions{
+ Topic: topic,
+ DisableBatching: true,
+ })
+ assert.Nil(t, err)
+
+ consumer, err := client.Subscribe(ConsumerOptions{
+ Topic: topic,
+ SubscriptionName: "my-sub",
+ })
+ assert.Nil(t, err)
+
+ msg := &ProducerMessage{EventTime: time.Now().Local(),
+ Payload: []byte("msg")}
+
+ _, err = producer.Send(context.Background(), msg)
+ assert.Nil(t, err)
+
+ recvMsg, err := consumer.Receive(context.Background())
+ assert.Nil(t, err)
+
+ assert.Equal(t, internal.TimestampMillis(recvMsg.EventTime()), internal.TimestampMillis(msg.EventTime))
+}