This is an automated email from the ASF dual-hosted git repository.
baodi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new 3da3e5d [fix] Fix ordering key not being set and parsed when batching is disabled (#1034)
3da3e5d is described below
commit 3da3e5d43134e7ccca933c6a80d6a20fb926e05a
Author: Zike Yang <[email protected]>
AuthorDate: Wed Jun 28 11:06:57 2023 +0800
[fix] Fix ordering key not being set and parsed when batching is disabled (#1034)
---
pulsar/consumer_partition.go | 1 +
pulsar/producer_partition.go | 4 ++++
pulsar/producer_test.go | 22 ++++++++++++++++++----
3 files changed, 23 insertions(+), 4 deletions(-)
diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index 2d2a194..cd7aa50 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -1190,6 +1190,7 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header
redeliveryCount: response.GetRedeliveryCount(),
schemaVersion: msgMeta.GetSchemaVersion(),
schemaInfoCache: pc.schemaInfoCache,
+ orderingKey: string(msgMeta.GetOrderingKey()),
index: messageIndex,
brokerPublishTime: brokerPublishTime,
}
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index 6bd9081..837d1d7 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -733,6 +733,10 @@ func (p *partitionProducer) genMetadata(msg *ProducerMessage,
mm.PartitionKey = proto.String(msg.Key)
}
+ if len(msg.OrderingKey) != 0 {
+ mm.OrderingKey = []byte(msg.OrderingKey)
+ }
+
if msg.Properties != nil {
mm.Properties = internal.ConvertFromStringMap(msg.Properties)
}
diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go
index 2721fa3..7c3abdc 100644
--- a/pulsar/producer_test.go
+++ b/pulsar/producer_test.go
@@ -1967,7 +1967,15 @@ func TestMemLimitContextCancel(t *testing.T) {
assert.NoError(t, err)
}
-func TestSendMessagesWithMetadata(t *testing.T) {
+func TestBatchSendMessagesWithMetadata(t *testing.T) {
+ testSendMessagesWithMetadata(t, false)
+}
+
+func TestNoBatchSendMessagesWithMetadata(t *testing.T) {
+ testSendMessagesWithMetadata(t, true)
+}
+
+func testSendMessagesWithMetadata(t *testing.T, disableBatch bool) {
client, err := NewClient(ClientOptions{
URL: lookupURL,
})
@@ -1978,7 +1986,7 @@ func TestSendMessagesWithMetadata(t *testing.T) {
topic := newTopicName()
producer, err := client.CreateProducer(ProducerOptions{
Topic: topic,
- DisableBatching: true,
+ DisableBatching: disableBatch,
})
assert.Nil(t, err)
@@ -1989,7 +1997,10 @@ func TestSendMessagesWithMetadata(t *testing.T) {
assert.Nil(t, err)
msg := &ProducerMessage{EventTime: time.Now().Local(),
- Payload: []byte("msg")}
+ Key: "my-key",
+ OrderingKey: "my-ordering-key",
+ Properties: map[string]string{"k1": "v1", "k2": "v2"},
+ Payload: []byte("msg")}
_, err = producer.Send(context.Background(), msg)
assert.Nil(t, err)
@@ -1997,5 +2008,8 @@ func TestSendMessagesWithMetadata(t *testing.T) {
recvMsg, err := consumer.Receive(context.Background())
assert.Nil(t, err)
- assert.Equal(t, internal.TimestampMillis(recvMsg.EventTime()), internal.TimestampMillis(msg.EventTime))
+ assert.Equal(t, internal.TimestampMillis(msg.EventTime), internal.TimestampMillis(recvMsg.EventTime()))
+ assert.Equal(t, msg.Key, recvMsg.Key())
+ assert.Equal(t, msg.OrderingKey, recvMsg.OrderingKey())
+ assert.Equal(t, msg.Properties, recvMsg.Properties())
}