This is an automated email from the ASF dual-hosted git repository.

baodi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 3a4e5cf  Add single partition router (#999)
3a4e5cf is described below

commit 3a4e5cfbf2c4abe3bf5df137a970ffd70f8efded
Author: crossoverJie <[email protected]>
AuthorDate: Wed Jun 28 10:58:26 2023 +0800

    Add single partition router (#999)
    
    * add single-partition-router
    
    * fix license
    
    * fix ci
    
    * Modify as sync.Once
    
    * Update pulsar/producer_test.go
    
    Co-authored-by: Zixuan Liu <[email protected]>
    
    * Modify with CR
    
    * Verify message was published on single partition
    
    ---------
    
    Co-authored-by: Zixuan Liu <[email protected]>
---
 pulsar/producer_test.go                | 61 +++++++++++++++++++++++++++++++++
 pulsar/single_partition_router.go      | 42 +++++++++++++++++++++++
 pulsar/single_partition_router_test.go | 62 ++++++++++++++++++++++++++++++++++
 3 files changed, 165 insertions(+)

diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go
index b587975..2721fa3 100644
--- a/pulsar/producer_test.go
+++ b/pulsar/producer_test.go
@@ -560,6 +560,67 @@ func TestMessageRouter(t *testing.T) {
        assert.NotNil(t, msg)
        assert.Equal(t, string(msg.Payload()), "hello")
 }
+// TestMessageSingleRouter sends keyless messages to a 5-partition topic using
+// NewSinglePartitionRouter and checks that all of them report one msg.Topic().
+func TestMessageSingleRouter(t *testing.T) {
+       // Create topic with 5 partitions
+       topicAdminURL := "admin/v2/persistent/public/default/my-single-partitioned-topic/partitions"
+       err := httpPut(topicAdminURL, 5)
+       defer httpDelete(topicAdminURL)
+       if err != nil {
+               t.Fatal(err)
+       }
+       client, err := NewClient(ClientOptions{
+               URL: serviceURL,
+       })
+       assert.Nil(t, err)
+       defer client.Close()
+
+       numOfMessages := 10
+
+       consumer, err := client.Subscribe(ConsumerOptions{
+               Topic:            "my-single-partitioned-topic",
+               SubscriptionName: "my-sub",
+       })
+       assert.Nil(t, err)
+       defer consumer.Close()
+
+       producer, err := client.CreateProducer(ProducerOptions{
+               Topic:         "my-single-partitioned-topic",
+               MessageRouter: NewSinglePartitionRouter(),
+       })
+       assert.Nil(t, err)
+       defer producer.Close()
+
+       ctx := context.Background()
+
+       for i := 0; i < numOfMessages; i++ {
+               ID, err := producer.Send(ctx, &ProducerMessage{
+                       Payload: []byte("hello"),
+               })
+               assert.Nil(t, err)
+               assert.NotNil(t, ID)
+       }
+
+       // Verify message was published on single partition
+       msgCount := 0
+       msgPartitionMap := make(map[string]int)
+       for i := 0; i < numOfMessages; i++ {
+               msg, err := consumer.Receive(ctx)
+               assert.Nil(t, err)
+               assert.NotNil(t, msg)
+               consumer.Ack(msg)
+               msgCount++
+               msgPartitionMap[msg.Topic()]++
+       }
+       // assert.Equal takes (expected, actual); keep that order so failure
+       // output labels the two values correctly.
+       assert.Equal(t, numOfMessages, msgCount)
+       assert.Equal(t, 1, len(msgPartitionMap))
+       for _, count := range msgPartitionMap {
+               assert.Equal(t, numOfMessages, count)
+       }
+}
 
 func TestNonPersistentTopic(t *testing.T) {
        topicName := "non-persistent://public/default/testNonPersistentTopic"
diff --git a/pulsar/single_partition_router.go b/pulsar/single_partition_router.go
new file mode 100644
index 0000000..7896aa3
--- /dev/null
+++ b/pulsar/single_partition_router.go
@@ -0,0 +1,42 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pulsar
+
+import "sync"
+
+// NewSinglePartitionRouter returns a router that hashes keyed messages with
+// JavaStringHash modulo the partition count, and sends every keyless message
+// to one partition index drawn once via r.R.Intn and reused afterwards.
+func NewSinglePartitionRouter() func(*ProducerMessage, TopicMetadata) int {
+       var (
+               singlePartition int
+               once            sync.Once
+       )
+       return func(message *ProducerMessage, metadata TopicMetadata) int {
+               numPartitions := metadata.NumPartitions()
+               if len(message.Key) != 0 {
+                       // When a key is specified, use the hash of that key
+                       return int(getHashingFunction(JavaStringHash)(message.Key) % numPartitions)
+               }
+               // Pick the partition once; every later keyless message reuses it.
+               once.Do(func() {
+                       singlePartition = r.R.Intn(int(numPartitions))
+               })
+               return singlePartition
+       }
+}
diff --git a/pulsar/single_partition_router_test.go b/pulsar/single_partition_router_test.go
new file mode 100644
index 0000000..a5a4215
--- /dev/null
+++ b/pulsar/single_partition_router_test.go
@@ -0,0 +1,62 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pulsar
+
+import (
+       "testing"
+
+       "github.com/stretchr/testify/assert"
+)
+
+// topicMetaData is a test stub whose NumPartitions reports a fixed count.
+type topicMetaData struct{ partition uint32 }
+
+// NumPartitions returns the partition count stored in the stub.
+func (m topicMetaData) NumPartitions() uint32 {
+       return m.partition
+}
+
+// TestNewSinglePartitionRouter checks that a keyless message gets an in-range
+// partition and that routing another keyless message returns the same one.
+func TestNewSinglePartitionRouter(t *testing.T) {
+       metadata := topicMetaData{2}
+       router := NewSinglePartitionRouter()
+       msg := &ProducerMessage{Payload: []byte("message 2")}
+
+       p := router(msg, metadata)
+       assert.GreaterOrEqual(t, p, 0)
+       assert.Less(t, p, int(metadata.NumPartitions()))
+
+       assert.Equal(t, p, router(msg, metadata))
+}
+
+// TestNewSinglePartitionRouterWithKey checks that a keyed message is routed
+// by its key: "my-key" with 3 partitions is expected to map to partition 1,
+// and routing the same key again yields the same partition.
+func TestNewSinglePartitionRouterWithKey(t *testing.T) {
+       metadata := topicMetaData{3}
+       route := NewSinglePartitionRouter()
+       keyed := func() *ProducerMessage {
+               return &ProducerMessage{Key: "my-key", Payload: []byte("message 2")}
+       }
+
+       first := route(keyed(), metadata)
+       assert.Equal(t, 1, first)
+
+       assert.Equal(t, first, route(keyed(), metadata))
+}

Reply via email to