This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new a8a2625  consumer add key shared policy support (#363)
a8a2625 is described below

commit a8a2625189616fa0f0046f96588698a8e1cfc75d
Author: syklevin <[email protected]>
AuthorDate: Fri Oct 9 09:54:29 2020 +0800

    consumer add key shared policy support (#363)
    
    Fixes #342
    
    ### Motivation
    
    Add support for KeySharedPolicy with AutoSplit or Sticky mode for consumer, 
which is a useful for some user cases like scalable request-reply pattern.
    
    ### Modifications
    
    add key shared policy options for consumer, and a helpful constructor for 
validating hash range list
---
 pulsar/consumer.go               |   3 ++
 pulsar/consumer_impl.go          |   1 +
 pulsar/consumer_partition.go     |   3 ++
 pulsar/key_shared_policy.go      | 107 +++++++++++++++++++++++++++++++++++++++
 pulsar/key_shared_policy_test.go |  91 +++++++++++++++++++++++++++++++++
 5 files changed, 205 insertions(+)

diff --git a/pulsar/consumer.go b/pulsar/consumer.go
index a99c030..41ccb91 100644
--- a/pulsar/consumer.go
+++ b/pulsar/consumer.go
@@ -110,6 +110,9 @@ type ConsumerOptions struct {
        // By default is nil and there's no DLQ
        DLQ *DLQPolicy
 
+       // Configuration for Key Shared consumer policy.
+       KeySharedPolicy *KeySharedPolicy
+
        // Auto retry send messages to default filled DLQPolicy topics
        // Default is false
        RetryEnable bool
diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go
index 7e266a2..97afbaa 100644
--- a/pulsar/consumer_impl.go
+++ b/pulsar/consumer_impl.go
@@ -310,6 +310,7 @@ func (c *consumer) internalTopicSubscribeToPartitions() 
error {
                                subscriptionMode:           durable,
                                readCompacted:              
c.options.ReadCompacted,
                                interceptors:               
c.options.Interceptors,
+                               keySharedPolicy:            
c.options.KeySharedPolicy,
                        }
                        cons, err := newPartitionConsumer(c, c.client, opts, 
c.messageCh, c.dlq)
                        ch <- ConsumerError{
diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index 4d10521..882afd7 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -121,6 +121,7 @@ type partitionConsumerOpts struct {
        readCompacted              bool
        disableForceTopicCreation  bool
        interceptors               ConsumerInterceptors
+       keySharedPolicy            *KeySharedPolicy
 }
 
 type partitionConsumer struct {
@@ -831,6 +832,7 @@ func (pc *partitionConsumer) grabConn() error {
 
        subType := toProtoSubType(pc.options.subscriptionType)
        initialPosition := 
toProtoInitialPosition(pc.options.subscriptionInitPos)
+       keySharedMeta := toProtoKeySharedMeta(pc.options.keySharedPolicy)
        requestID := pc.client.rpcClient.NewRequestID()
        cmdSubscribe := &pb.CommandSubscribe{
                Topic:                      proto.String(pc.topic),
@@ -846,6 +848,7 @@ func (pc *partitionConsumer) grabConn() error {
                Schema:                     nil,
                InitialPosition:            initialPosition.Enum(),
                ReplicateSubscriptionState: 
proto.Bool(pc.options.replicateSubscriptionState),
+               KeySharedMeta:              keySharedMeta,
        }
 
        pc.startMessageID = pc.clearReceiverQueue()
diff --git a/pulsar/key_shared_policy.go b/pulsar/key_shared_policy.go
new file mode 100644
index 0000000..30c6dc4
--- /dev/null
+++ b/pulsar/key_shared_policy.go
@@ -0,0 +1,107 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pulsar
+
+import (
+       "fmt"
+
+       pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
+)
+
+type KeySharedPolicyMode int
+
+const (
+       // KeySharedPolicyModeAutoSplit Auto split hash range key shared policy.
+       KeySharedPolicyModeAutoSplit KeySharedPolicyMode = iota
+       // KeySharedPolicyModeSticky is Sticky attach topic with fixed hash 
range.
+       KeySharedPolicyModeSticky
+)
+
+// KeySharedPolicy for KeyShared subscription
+type KeySharedPolicy struct {
+       //KeySharedPolicyMode
+       Mode KeySharedPolicyMode
+       //HashRanges value pair list
+       HashRanges []int
+       // If enabled, it will relax the ordering requirement, allowing the 
broker to send out-of-order messages in case of
+       // failures. This will make it faster for new consumers to join without 
being stalled by an existing slow consumer.
+       AllowOutOfOrderDelivery bool
+}
+
+// NewKeySharedPolicySticky construct KeySharedPolicy in Sticky mode with
+// hashRanges formed in value pair list: [x1, x2, y1, y2, z1, z2], and must 
not overlap with each others
+func NewKeySharedPolicySticky(hashRanges []int) (*KeySharedPolicy, error) {
+       err := validateHashRanges(hashRanges)
+       if err != nil {
+               return nil, err
+       }
+       return &KeySharedPolicy{
+               Mode:       KeySharedPolicyModeSticky,
+               HashRanges: hashRanges,
+       }, nil
+}
+
+func toProtoKeySharedMeta(ksp *KeySharedPolicy) *pb.KeySharedMeta {
+       if ksp == nil {
+               return nil
+       }
+
+       mode := pb.KeySharedMode(ksp.Mode)
+       meta := &pb.KeySharedMeta{
+               KeySharedMode:           &mode,
+               AllowOutOfOrderDelivery: &ksp.AllowOutOfOrderDelivery,
+       }
+
+       if ksp.Mode == KeySharedPolicyModeSticky {
+               for i := 0; i < len(ksp.HashRanges); i += 2 {
+                       start, end := int32(ksp.HashRanges[i]), 
int32(ksp.HashRanges[i+1])
+                       meta.HashRanges = append(meta.HashRanges, 
&pb.IntRange{Start: &start, End: &end})
+               }
+       }
+
+       return meta
+}
+
+func validateHashRanges(hashRanges []int) error {
+       sz := len(hashRanges)
+       if sz == 0 || sz%2 != 0 {
+               return fmt.Errorf("ranges must not be empty or not in value 
pairs")
+       }
+       var x1, x2, y1, y2 int
+       //check that the ranges are well-formed
+       for i := 0; i < sz; i += 2 {
+               x1, x2 = hashRanges[i], hashRanges[i+1]
+               if x1 >= x2 || x1 < 0 || x2 > 65535 {
+                       return fmt.Errorf("ranges must be in [0, 65535], but 
provided range is, %d - %d", x1, x2)
+               }
+       }
+       //loop again for checking range overlap
+       for i := 0; i < sz; i += 2 {
+               x1, x2 = hashRanges[i], hashRanges[i+1]
+               for j := 0; j < sz; j += 2 {
+                       if j == i {
+                               continue
+                       }
+                       y1, y2 = hashRanges[j], hashRanges[j+1]
+                       if x1 <= y2 && y1 <= x2 {
+                               return fmt.Errorf("ranges with overlap between, 
%d - %d, and %d - %d", x1, x2, y1, y2)
+                       }
+               }
+       }
+       return nil
+}
diff --git a/pulsar/key_shared_policy_test.go b/pulsar/key_shared_policy_test.go
new file mode 100644
index 0000000..af468ac
--- /dev/null
+++ b/pulsar/key_shared_policy_test.go
@@ -0,0 +1,91 @@
+package pulsar
+
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import (
+       "testing"
+
+       "github.com/stretchr/testify/assert"
+)
+
+func TestNewKeySharedPolicySticky(t *testing.T) {
+       ksp, err := NewKeySharedPolicySticky([]int{0, 10000, 10001, 19999, 
30000, 59999})
+       assert.Nil(t, err)
+       assert.NotNil(t, ksp)
+       assert.Equal(t, ksp.Mode, KeySharedPolicyModeSticky)
+}
+
+func TestValidateHashRanges(t *testing.T) {
+       type args struct {
+               hashRanges []int
+       }
+       tests := []struct {
+               name    string
+               args    args
+               wantErr bool
+       }{
+               {
+                       name: "normal hash ranges",
+                       args: args{
+                               hashRanges: []int{0, 10, 11, 19, 20, 1000, 
2000, 3000, 10000, 20000},
+                       },
+               },
+               {
+                       name: "abnormal hash ranges over max range",
+                       args: args{
+                               hashRanges: []int{1, 300, 2000, 66666},
+                       },
+                       wantErr: true,
+               },
+               {
+                       name: "abnormal hash ranges x1 < x2",
+                       args: args{
+                               hashRanges: []int{10, 1, 2, 10},
+                       },
+                       wantErr: true,
+               },
+               {
+                       name: "abnormal hash ranges not in pairs",
+                       args: args{
+                               hashRanges: []int{1, 3, 5, 10, 11},
+                       },
+                       wantErr: true,
+               },
+               {
+                       name: "abnormal hash ranges overlap 1",
+                       args: args{
+                               hashRanges: []int{1, 3, 3, 6, 9, 12},
+                       },
+                       wantErr: true,
+               },
+               {
+                       name: "abnormal hash ranges overlap 2",
+                       args: args{
+                               hashRanges: []int{1, 8, 6, 12, 7, 10},
+                       },
+                       wantErr: true,
+               },
+       }
+       for _, tt := range tests {
+               t.Run(tt.name, func(t *testing.T) {
+                       if err := validateHashRanges(tt.args.hashRanges); (err 
!= nil) != tt.wantErr {
+                               t.Errorf("validateHashRanges() error = %v, 
wantErr %v", err, tt.wantErr)
+                       }
+               })
+       }
+}

Reply via email to