This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 9b20cc0  [Issue #401] Add orderingKey apis (#427)
9b20cc0 is described below

commit 9b20cc0841602f3767d32b19edb1db55fd0bb209
Author: Rui Fu <[email protected]>
AuthorDate: Wed Dec 23 11:11:59 2020 +0800

    [Issue #401] Add orderingKey apis (#427)
    
    Fixes #401
    
    ### Motivation
    According to apache/pulsar#4079, orderingKey was introduced to let users set
message order manually. Currently, pulsar-client-go does not expose the related
APIs to users, so we should add orderingKey-related APIs to pulsar-client-go.
    
    ### Modifications
    
    - add OrderingKey to ProducerMessage
    - add OrderingKey() to Message interface
    - sync OrderingKey to SingleMessageMetadata
    - tests
---
 go.mod                       |   3 +-
 go.sum                       |   8 +--
 pulsar/consumer_impl.go      |   1 +
 pulsar/consumer_test.go      | 119 ++++++++++++++++++++++++++++++++++++++++++-
 pulsar/default_router.go     |   5 ++
 pulsar/dlq_router.go         |   1 +
 pulsar/impl_message.go       |   5 ++
 pulsar/message.go            |   6 +++
 pulsar/producer_partition.go |   4 ++
 9 files changed, 145 insertions(+), 7 deletions(-)

diff --git a/go.mod b/go.mod
index 817e223..a3727a0 100644
--- a/go.mod
+++ b/go.mod
@@ -10,12 +10,11 @@ require (
        github.com/davecgh/go-spew v1.1.1
        github.com/gogo/protobuf v1.3.1
        github.com/golang/protobuf v1.4.2
+       github.com/google/uuid v1.1.2
        github.com/inconshreveable/mousetrap v1.0.0 // indirect
        github.com/klauspost/compress v1.10.8
        github.com/kr/pretty v0.2.0 // indirect
        github.com/linkedin/goavro/v2 v2.9.8
-       github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // 
indirect
-       github.com/modern-go/reflect2 v1.0.1 // indirect
        github.com/pierrec/lz4 v2.0.5+incompatible
        github.com/pkg/errors v0.9.1
        github.com/prometheus/client_golang v1.7.1
diff --git a/go.sum b/go.sum
index 1bcef52..c89a434 100644
--- a/go.sum
+++ b/go.sum
@@ -1,6 +1,5 @@
 cloud.google.com/go v0.34.0/go.mod 
h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
-github.com/99designs/keyring v1.1.5 
h1:wLv7QyzYpFIyMSwOADq1CLTF9KbjbBfcnfmOGJ64aO4=
-github.com/99designs/keyring v1.1.5/go.mod 
h1:7hsVvt2qXgtadGevGJ4ujg+u8m6SpJ5TpHqTozIPqf0=
+github.com/99designs/keyring v1.1.6 
h1:kVDC2uCgVwecxCk+9zoCt2uEL6dt+dfVzMvGgnVcIuM=
 github.com/99designs/keyring v1.1.6/go.mod 
h1:16e0ds7LGQQcT59QqkTg72Hh5ShM51Byv5PEmW6uoRU=
 github.com/BurntSushi/toml v0.3.1/go.mod 
h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
 github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod 
h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
@@ -32,8 +31,7 @@ github.com/dgrijalva/jwt-go v3.2.0+incompatible 
h1:7qlOGliEKZXTDg6OTjfoBKDXWrumC
 github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod 
h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ=
 github.com/dimfeld/httptreemux v5.0.1+incompatible 
h1:Qj3gVcDNoOthBAqftuD596rm4wg/adLLz5xh5CmpiCA=
 github.com/dimfeld/httptreemux v5.0.1+incompatible/go.mod 
h1:rbUlSV+CCpv/SuqUTP/8Bk2O3LyUV436/yaRGkhP6Z0=
-github.com/dvsekhvalnov/jose2go v0.0.0-20180829124132-7f401d37b68a 
h1:mq+R6XEM6lJX5VlLyZIrUSP8tSuJp82xTK89hvBwJbU=
-github.com/dvsekhvalnov/jose2go v0.0.0-20180829124132-7f401d37b68a/go.mod 
h1:7BvyPhdbLxMXIYTFPLsyJRFMsKmOZnQmzh6Gb+uquuM=
+github.com/dvsekhvalnov/jose2go v0.0.0-20200901110807-248326c1351b 
h1:HBah4D48ypg3J7Np4N+HY/ZR76fx3HEUGxDU6Uk39oQ=
 github.com/dvsekhvalnov/jose2go v0.0.0-20200901110807-248326c1351b/go.mod 
h1:7BvyPhdbLxMXIYTFPLsyJRFMsKmOZnQmzh6Gb+uquuM=
 github.com/fsnotify/fsnotify v1.4.7/go.mod 
h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
 github.com/fsnotify/fsnotify v1.4.9 
h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4=
@@ -65,6 +63,8 @@ github.com/google/go-cmp v0.3.1/go.mod 
h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMyw
 github.com/google/go-cmp v0.4.0 h1:xsAVV57WRhGj6kEIi8ReJzQlHHqcBYCElAvkovg3B/4=
 github.com/google/go-cmp v0.4.0/go.mod 
h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
 github.com/google/gofuzz v1.0.0/go.mod 
h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
+github.com/google/uuid v1.1.2 h1:EVhdT+1Kseyi1/pUmXKaFxYsDNy9RQYkMWRH68J/W7Y=
+github.com/google/uuid v1.1.2/go.mod 
h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
 github.com/gorilla/context v1.1.1/go.mod 
h1:kBGZzfjB9CEq2AlWe17Uuf7NDRt0dE0s8S51q0aT7Yg=
 github.com/gorilla/mux v1.7.3/go.mod 
h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs=
 github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c 
h1:6rhixN/i8ZofjG1Y75iExal34USq5p+wiN1tpie8IrU=
diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go
index 8456fd0..370ea12 100644
--- a/pulsar/consumer_impl.go
+++ b/pulsar/consumer_impl.go
@@ -459,6 +459,7 @@ func (c *consumer) ReconsumeLater(msg Message, delay 
time.Duration) {
                        producerMsg: ProducerMessage{
                                Payload:      msg.Payload(),
                                Key:          msg.Key(),
+                               OrderingKey:  msg.OrderingKey(),
                                Properties:   props,
                                DeliverAfter: delay,
                        },
diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go
index 6d58cd4..5b9b6e0 100644
--- a/pulsar/consumer_test.go
+++ b/pulsar/consumer_test.go
@@ -28,6 +28,7 @@ import (
        "time"
 
        "github.com/apache/pulsar-client-go/pulsar/internal"
+       "github.com/google/uuid"
        "github.com/stretchr/testify/assert"
 )
 
@@ -1877,5 +1878,121 @@ func 
TestOrderingOfKeyBasedBatchProducerConsumerKeyShared(t *testing.T) {
                receivedMessageIndex++
        }
 
-       // TODO: add OrderingKey support, see GH issue #401
+       // Test OrderingKey
+       for i := 0; i < MsgBatchCount; i++ {
+               for _, k := range keys {
+                       u := uuid.New()
+                       producer.SendAsync(ctx, &ProducerMessage{
+                               Key:         u.String(),
+                               OrderingKey: k,
+                               Payload:     []byte(fmt.Sprintf("value-%d", i)),
+                       }, func(id MessageID, producerMessage *ProducerMessage, 
err error) {
+                               assert.Nil(t, err)
+                       },
+                       )
+               }
+       }
+
+       receivedKey = ""
+       receivedMessageIndex = 0
+       for i := 0; i < len(keys)*MsgBatchCount; i++ {
+               cm, ok := <-consumer1.Chan()
+               if !ok {
+                       break
+               }
+               if receivedKey != cm.OrderingKey() {
+                       receivedKey = cm.OrderingKey()
+                       receivedMessageIndex = 0
+               }
+               assert.Equal(
+                       t, fmt.Sprintf("value-%d", receivedMessageIndex%10),
+                       string(cm.Payload()),
+               )
+               consumer1.Ack(cm.Message)
+               receivedMessageIndex++
+       }
+
+}
+
+func TestConsumerKeySharedWithOrderingKey(t *testing.T) {
+       client, err := NewClient(
+               ClientOptions{
+                       URL: lookupURL,
+               },
+       )
+       assert.Nil(t, err)
+       defer client.Close()
+
+       topic := "persistent://public/default/test-key-shared-with-ordering-key"
+
+       consumer1, err := client.Subscribe(
+               ConsumerOptions{
+                       Topic:            topic,
+                       SubscriptionName: "sub-1",
+                       Type:             KeyShared,
+               },
+       )
+       assert.Nil(t, err)
+       defer consumer1.Close()
+
+       consumer2, err := client.Subscribe(
+               ConsumerOptions{
+                       Topic:            topic,
+                       SubscriptionName: "sub-1",
+                       Type:             KeyShared,
+               },
+       )
+       assert.Nil(t, err)
+       defer consumer2.Close()
+
+       // create producer
+       producer, err := client.CreateProducer(
+               ProducerOptions{
+                       Topic:           topic,
+                       DisableBatching: true,
+               },
+       )
+       assert.Nil(t, err)
+       defer producer.Close()
+
+       ctx := context.Background()
+       for i := 0; i < 100; i++ {
+               u := uuid.New()
+               _, err := producer.Send(
+                       ctx, &ProducerMessage{
+                               Key:         u.String(),
+                               OrderingKey: fmt.Sprintf("key-shared-%d", i%3),
+                               Payload:     []byte(fmt.Sprintf("value-%d", i)),
+                       },
+               )
+               assert.Nil(t, err)
+       }
+
+       receivedConsumer1 := 0
+       receivedConsumer2 := 0
+       for (receivedConsumer1 + receivedConsumer2) < 100 {
+               select {
+               case cm, ok := <-consumer1.Chan():
+                       if !ok {
+                               break
+                       }
+                       receivedConsumer1++
+                       consumer1.Ack(cm.Message)
+               case cm, ok := <-consumer2.Chan():
+                       if !ok {
+                               break
+                       }
+                       receivedConsumer2++
+                       consumer2.Ack(cm.Message)
+               }
+       }
+
+       assert.NotEqual(t, 0, receivedConsumer1)
+       assert.NotEqual(t, 0, receivedConsumer2)
+
+       fmt.Printf(
+               "TestConsumerKeySharedWithOrderingKey received messages 
consumer1: %d consumser2: %d\n",
+               receivedConsumer1, receivedConsumer2,
+       )
+       assert.Equal(t, 100, receivedConsumer1+receivedConsumer2)
 }
diff --git a/pulsar/default_router.go b/pulsar/default_router.go
index 0e1a354..b5e24a6 100644
--- a/pulsar/default_router.go
+++ b/pulsar/default_router.go
@@ -55,6 +55,11 @@ func NewDefaultRouter(
                        return 0
                }
 
+               if len(message.OrderingKey) != 0 {
+                       // When an OrderingKey is specified, use the hash of 
that key
+                       return int(hashFunc(message.OrderingKey) % 
numPartitions)
+               }
+
                if len(message.Key) != 0 {
                        // When a key is specified, use the hash of that key
                        return int(hashFunc(message.Key) % numPartitions)
diff --git a/pulsar/dlq_router.go b/pulsar/dlq_router.go
index b4d98b0..40761e2 100644
--- a/pulsar/dlq_router.go
+++ b/pulsar/dlq_router.go
@@ -97,6 +97,7 @@ func (r *dlqRouter) run() {
                        producer.SendAsync(context.Background(), 
&ProducerMessage{
                                Payload:             msg.Payload(),
                                Key:                 msg.Key(),
+                               OrderingKey:         msg.OrderingKey(),
                                Properties:          msg.Properties(),
                                EventTime:           msg.EventTime(),
                                ReplicationClusters: msg.replicationClusters,
diff --git a/pulsar/impl_message.go b/pulsar/impl_message.go
index 27cb0ed..6fc9cad 100644
--- a/pulsar/impl_message.go
+++ b/pulsar/impl_message.go
@@ -170,6 +170,7 @@ type message struct {
        publishTime         time.Time
        eventTime           time.Time
        key                 string
+       orderingKey         string
        producerName        string
        payLoad             []byte
        msgID               MessageID
@@ -209,6 +210,10 @@ func (msg *message) Key() string {
        return msg.key
 }
 
+func (msg *message) OrderingKey() string {
+       return msg.orderingKey
+}
+
 func (msg *message) RedeliveryCount() uint32 {
        return msg.redeliveryCount
 }
diff --git a/pulsar/message.go b/pulsar/message.go
index a3b2257..397c51e 100644
--- a/pulsar/message.go
+++ b/pulsar/message.go
@@ -33,6 +33,9 @@ type ProducerMessage struct {
        // Key sets the key of the message for routing policy
        Key string
 
+       // OrderingKey sets the ordering key of the message
+       OrderingKey string
+
        // Properties attach application defined properties on the message
        Properties map[string]string
 
@@ -93,6 +96,9 @@ type Message interface {
        // Key get the key of the message, if any
        Key() string
 
+       // OrderingKey get the ordering key of the message, if any
+       OrderingKey() string
+
        // Get message redelivery count, redelivery count maintain in pulsar 
broker. When client nack acknowledge messages,
        // broker will dispatch message again with message redelivery count in 
CommandMessage defined.
        //
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index 622c53e..f247e26 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -370,6 +370,10 @@ func (p *partitionProducer) internalSend(request 
*sendRequest) {
                smm.PartitionKey = proto.String(msg.Key)
        }
 
+       if len(msg.OrderingKey) != 0 {
+               smm.OrderingKey = []byte(msg.OrderingKey)
+       }
+
        if msg.Properties != nil {
                smm.Properties = internal.ConvertFromStringMap(msg.Properties)
        }

Reply via email to