This is an automated email from the ASF dual-hosted git repository.
baodi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new 3a4e5cf Add single partition router (#999)
3a4e5cf is described below
commit 3a4e5cfbf2c4abe3bf5df137a970ffd70f8efded
Author: crossoverJie <[email protected]>
AuthorDate: Wed Jun 28 10:58:26 2023 +0800
Add single partition router (#999)
* add single-partition-router
* fix license
* fix ci
* Modify as sync.Once
* Update pulsar/producer_test.go
Co-authored-by: Zixuan Liu <[email protected]>
* Modify with CR
* Verify message was published on single partition
---------
Co-authored-by: Zixuan Liu <[email protected]>
---
pulsar/producer_test.go | 61 +++++++++++++++++++++++++++++++++
pulsar/single_partition_router.go | 42 +++++++++++++++++++++++
pulsar/single_partition_router_test.go | 62 ++++++++++++++++++++++++++++++++++
3 files changed, 165 insertions(+)
diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go
index b587975..2721fa3 100644
--- a/pulsar/producer_test.go
+++ b/pulsar/producer_test.go
@@ -560,6 +560,67 @@ func TestMessageRouter(t *testing.T) {
assert.NotNil(t, msg)
assert.Equal(t, string(msg.Payload()), "hello")
}
+func TestMessageSingleRouter(t *testing.T) {
+ // Create topic with 5 partitions
+ topicAdminURL :=
"admin/v2/persistent/public/default/my-single-partitioned-topic/partitions"
+ err := httpPut(topicAdminURL, 5)
+ defer httpDelete(topicAdminURL)
+ if err != nil {
+ t.Fatal(err)
+ }
+ client, err := NewClient(ClientOptions{
+ URL: serviceURL,
+ })
+
+ assert.Nil(t, err)
+ defer client.Close()
+
+ numOfMessages := 10
+
+ consumer, err := client.Subscribe(ConsumerOptions{
+ Topic: "my-single-partitioned-topic",
+ SubscriptionName: "my-sub",
+ })
+
+ assert.Nil(t, err)
+ defer consumer.Close()
+
+ producer, err := client.CreateProducer(ProducerOptions{
+ Topic: "my-single-partitioned-topic",
+ MessageRouter: NewSinglePartitionRouter(),
+ })
+
+ assert.Nil(t, err)
+ defer producer.Close()
+
+ ctx := context.Background()
+
+ for i := 0; i < numOfMessages; i++ {
+ ID, err := producer.Send(ctx, &ProducerMessage{
+ Payload: []byte("hello"),
+ })
+ assert.Nil(t, err)
+ assert.NotNil(t, ID)
+ }
+
+ // Verify message was published on single partition
+ msgCount := 0
+ msgPartitionMap := make(map[string]int)
+ for i := 0; i < numOfMessages; i++ {
+ msg, err := consumer.Receive(ctx)
+ assert.Nil(t, err)
+ assert.NotNil(t, msg)
+ consumer.Ack(msg)
+ msgCount++
+ msgPartitionMap[msg.Topic()]++
+ }
+ assert.Equal(t, msgCount, numOfMessages)
+ assert.Equal(t, len(msgPartitionMap), 1)
+ for _, i := range msgPartitionMap {
+ assert.Equal(t, i, numOfMessages)
+ }
+
+}
func TestNonPersistentTopic(t *testing.T) {
topicName := "non-persistent://public/default/testNonPersistentTopic"
diff --git a/pulsar/single_partition_router.go
b/pulsar/single_partition_router.go
new file mode 100644
index 0000000..7896aa3
--- /dev/null
+++ b/pulsar/single_partition_router.go
@@ -0,0 +1,42 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pulsar
+
+import "sync"
+
+func NewSinglePartitionRouter() func(*ProducerMessage, TopicMetadata) int {
+ var (
+ singlePartition *int
+ once sync.Once
+ )
+ return func(message *ProducerMessage, metadata TopicMetadata) int {
+ numPartitions := metadata.NumPartitions()
+ if len(message.Key) != 0 {
+ // When a key is specified, use the hash of that key
+ return
int(getHashingFunction(JavaStringHash)(message.Key) % numPartitions)
+ }
+ once.Do(func() {
+ partition := r.R.Intn(int(numPartitions))
+ singlePartition = &partition
+ })
+
+ return *singlePartition
+
+ }
+
+}
diff --git a/pulsar/single_partition_router_test.go
b/pulsar/single_partition_router_test.go
new file mode 100644
index 0000000..a5a4215
--- /dev/null
+++ b/pulsar/single_partition_router_test.go
@@ -0,0 +1,62 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pulsar
+
+import (
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+)
+
+type topicMetaData struct {
+ partition uint32
+}
+
+func (t topicMetaData) NumPartitions() uint32 {
+ return t.partition
+}
+
+func TestNewSinglePartitionRouter(t *testing.T) {
+ numPartitions := topicMetaData{2}
+ router := NewSinglePartitionRouter()
+ p := router(&ProducerMessage{
+ Payload: []byte("message 2"),
+ }, numPartitions)
+ assert.GreaterOrEqual(t, p, 0)
+
+ p2 := router(&ProducerMessage{
+ Payload: []byte("message 2"),
+ }, numPartitions)
+ assert.Equal(t, p, p2)
+}
+
+func TestNewSinglePartitionRouterWithKey(t *testing.T) {
+ router := NewSinglePartitionRouter()
+ numPartitions := topicMetaData{3}
+ p := router(&ProducerMessage{
+ Payload: []byte("message 2"),
+ Key: "my-key",
+ }, numPartitions)
+ assert.Equal(t, 1, p)
+
+ p2 := router(&ProducerMessage{
+ Key: "my-key",
+ Payload: []byte("message 2"),
+ }, numPartitions)
+ assert.Equal(t, p, p2)
+}