This is an automated email from the ASF dual-hosted git repository.

baodi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 3da3e5d  [fix] Fix ordering key not being set and parsed when batching is disabled (#1034)
3da3e5d is described below

commit 3da3e5d43134e7ccca933c6a80d6a20fb926e05a
Author: Zike Yang <[email protected]>
AuthorDate: Wed Jun 28 11:06:57 2023 +0800

    [fix] Fix ordering key not being set and parsed when batching is disabled (#1034)
---
 pulsar/consumer_partition.go |  1 +
 pulsar/producer_partition.go |  4 ++++
 pulsar/producer_test.go      | 22 ++++++++++++++++++----
 3 files changed, 23 insertions(+), 4 deletions(-)

diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index 2d2a194..cd7aa50 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -1190,6 +1190,7 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header
                                redeliveryCount:     response.GetRedeliveryCount(),
                                schemaVersion:       msgMeta.GetSchemaVersion(),
                                schemaInfoCache:     pc.schemaInfoCache,
+                               orderingKey:         string(msgMeta.GetOrderingKey()),
                                index:               messageIndex,
                                brokerPublishTime:   brokerPublishTime,
                        }
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index 6bd9081..837d1d7 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -733,6 +733,10 @@ func (p *partitionProducer) genMetadata(msg *ProducerMessage,
                mm.PartitionKey = proto.String(msg.Key)
        }
 
+       if len(msg.OrderingKey) != 0 {
+               mm.OrderingKey = []byte(msg.OrderingKey)
+       }
+
        if msg.Properties != nil {
                mm.Properties = internal.ConvertFromStringMap(msg.Properties)
        }
diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go
index 2721fa3..7c3abdc 100644
--- a/pulsar/producer_test.go
+++ b/pulsar/producer_test.go
@@ -1967,7 +1967,15 @@ func TestMemLimitContextCancel(t *testing.T) {
        assert.NoError(t, err)
 }
 
-func TestSendMessagesWithMetadata(t *testing.T) {
+func TestBatchSendMessagesWithMetadata(t *testing.T) {
+       testSendMessagesWithMetadata(t, false)
+}
+
+func TestNoBatchSendMessagesWithMetadata(t *testing.T) {
+       testSendMessagesWithMetadata(t, true)
+}
+
+func testSendMessagesWithMetadata(t *testing.T, disableBatch bool) {
        client, err := NewClient(ClientOptions{
                URL: lookupURL,
        })
@@ -1978,7 +1986,7 @@ func TestSendMessagesWithMetadata(t *testing.T) {
        topic := newTopicName()
        producer, err := client.CreateProducer(ProducerOptions{
                Topic:           topic,
-               DisableBatching: true,
+               DisableBatching: disableBatch,
        })
        assert.Nil(t, err)
 
@@ -1989,7 +1997,10 @@ func TestSendMessagesWithMetadata(t *testing.T) {
        assert.Nil(t, err)
 
        msg := &ProducerMessage{EventTime: time.Now().Local(),
-               Payload: []byte("msg")}
+               Key:         "my-key",
+               OrderingKey: "my-ordering-key",
+               Properties:  map[string]string{"k1": "v1", "k2": "v2"},
+               Payload:     []byte("msg")}
 
        _, err = producer.Send(context.Background(), msg)
        assert.Nil(t, err)
@@ -1997,5 +2008,8 @@ func TestSendMessagesWithMetadata(t *testing.T) {
        recvMsg, err := consumer.Receive(context.Background())
        assert.Nil(t, err)
 
-       assert.Equal(t, internal.TimestampMillis(recvMsg.EventTime()), internal.TimestampMillis(msg.EventTime))
+       assert.Equal(t, internal.TimestampMillis(msg.EventTime), internal.TimestampMillis(recvMsg.EventTime()))
+       assert.Equal(t, msg.Key, recvMsg.Key())
+       assert.Equal(t, msg.OrderingKey, recvMsg.OrderingKey())
+       assert.Equal(t, msg.Properties, recvMsg.Properties())
 }

Reply via email to