This is an automated email from the ASF dual-hosted git repository.
rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new a8a2625 consumer add key shared policy support (#363)
a8a2625 is described below
commit a8a2625189616fa0f0046f96588698a8e1cfc75d
Author: syklevin <[email protected]>
AuthorDate: Fri Oct 9 09:54:29 2020 +0800
consumer add key shared policy support (#363)
Fixes #342
### Motivation
Add support for KeySharedPolicy with AutoSplit or Sticky mode for consumers,
which is useful for some use cases such as a scalable request-reply pattern.
### Modifications
Add key shared policy options for the consumer, and a helper constructor that
validates the hash range list.
---
pulsar/consumer.go | 3 ++
pulsar/consumer_impl.go | 1 +
pulsar/consumer_partition.go | 3 ++
pulsar/key_shared_policy.go | 107 +++++++++++++++++++++++++++++++++++++++
pulsar/key_shared_policy_test.go | 91 +++++++++++++++++++++++++++++++++
5 files changed, 205 insertions(+)
diff --git a/pulsar/consumer.go b/pulsar/consumer.go
index a99c030..41ccb91 100644
--- a/pulsar/consumer.go
+++ b/pulsar/consumer.go
@@ -110,6 +110,9 @@ type ConsumerOptions struct {
// By default is nil and there's no DLQ
DLQ *DLQPolicy
+ // Configuration for Key Shared consumer policy.
+ KeySharedPolicy *KeySharedPolicy
+
// Auto retry send messages to default filled DLQPolicy topics
// Default is false
RetryEnable bool
diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go
index 7e266a2..97afbaa 100644
--- a/pulsar/consumer_impl.go
+++ b/pulsar/consumer_impl.go
@@ -310,6 +310,7 @@ func (c *consumer) internalTopicSubscribeToPartitions()
error {
subscriptionMode: durable,
readCompacted:
c.options.ReadCompacted,
interceptors:
c.options.Interceptors,
+ keySharedPolicy:
c.options.KeySharedPolicy,
}
cons, err := newPartitionConsumer(c, c.client, opts,
c.messageCh, c.dlq)
ch <- ConsumerError{
diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index 4d10521..882afd7 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -121,6 +121,7 @@ type partitionConsumerOpts struct {
readCompacted bool
disableForceTopicCreation bool
interceptors ConsumerInterceptors
+ keySharedPolicy *KeySharedPolicy
}
type partitionConsumer struct {
@@ -831,6 +832,7 @@ func (pc *partitionConsumer) grabConn() error {
subType := toProtoSubType(pc.options.subscriptionType)
initialPosition :=
toProtoInitialPosition(pc.options.subscriptionInitPos)
+ keySharedMeta := toProtoKeySharedMeta(pc.options.keySharedPolicy)
requestID := pc.client.rpcClient.NewRequestID()
cmdSubscribe := &pb.CommandSubscribe{
Topic: proto.String(pc.topic),
@@ -846,6 +848,7 @@ func (pc *partitionConsumer) grabConn() error {
Schema: nil,
InitialPosition: initialPosition.Enum(),
ReplicateSubscriptionState:
proto.Bool(pc.options.replicateSubscriptionState),
+ KeySharedMeta: keySharedMeta,
}
pc.startMessageID = pc.clearReceiverQueue()
diff --git a/pulsar/key_shared_policy.go b/pulsar/key_shared_policy.go
new file mode 100644
index 0000000..30c6dc4
--- /dev/null
+++ b/pulsar/key_shared_policy.go
@@ -0,0 +1,107 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pulsar
+
+import (
+ "fmt"
+
+ pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
+)
+
+// KeySharedPolicyMode identifies the key shared policy mode of a consumer:
+// auto split or sticky.
+//
+// toProtoKeySharedMeta casts this value directly to pb.KeySharedMode, so the
+// order of the constants below must not change.
+// NOTE(review): presumably the values mirror the proto enum — confirm.
+type KeySharedPolicyMode int
+
+const (
+	// KeySharedPolicyModeAutoSplit is the auto split hash range key shared policy.
+	KeySharedPolicyModeAutoSplit KeySharedPolicyMode = iota
+	// KeySharedPolicyModeSticky is the sticky key shared policy, which uses the
+	// fixed hash ranges given in KeySharedPolicy.HashRanges.
+	KeySharedPolicyModeSticky
+)
+
+// KeySharedPolicy is the policy for a KeyShared subscription. It is set
+// through ConsumerOptions.KeySharedPolicy.
+type KeySharedPolicy struct {
+	// Mode is the key shared policy mode: auto split or sticky.
+	Mode KeySharedPolicyMode
+	// HashRanges is a flat list of value pairs [start1, end1, start2, end2, ...].
+	// It is only converted into the subscribe command's KeySharedMeta when Mode
+	// is KeySharedPolicyModeSticky.
+	HashRanges []int
+	// If enabled, it will relax the ordering requirement, allowing the broker
+	// to send out-of-order messages in case of failures. This will make it
+	// faster for new consumers to join without being stalled by an existing
+	// slow consumer.
+	AllowOutOfOrderDelivery bool
+}
+
+// NewKeySharedPolicySticky constructs a KeySharedPolicy in Sticky mode.
+//
+// hashRanges is a flat list of value pairs [x1, x2, y1, y2, z1, z2]; the
+// ranges must not overlap each other. The list is checked with
+// validateHashRanges, and its error is returned unchanged when the check fails.
+//
+// The returned policy holds its own copy of hashRanges, so a caller that
+// modifies the slice afterwards cannot invalidate the validated ranges.
+func NewKeySharedPolicySticky(hashRanges []int) (*KeySharedPolicy, error) {
+	if err := validateHashRanges(hashRanges); err != nil {
+		return nil, err
+	}
+	return &KeySharedPolicy{
+		Mode:       KeySharedPolicyModeSticky,
+		HashRanges: append([]int(nil), hashRanges...),
+	}, nil
+}
+
+// toProtoKeySharedMeta converts a KeySharedPolicy into the pb.KeySharedMeta
+// that is attached to the subscribe command. A nil policy yields nil.
+//
+// Hash ranges are only included in Sticky mode. ksp.HashRanges is read as a
+// flat list of [start, end] pairs; a trailing value without a partner is
+// ignored rather than causing an index-out-of-range panic, because a policy
+// built as a struct literal never went through validateHashRanges.
+func toProtoKeySharedMeta(ksp *KeySharedPolicy) *pb.KeySharedMeta {
+	if ksp == nil {
+		return nil
+	}
+
+	// Copy the scalar fields so the proto message does not point into the
+	// caller-owned policy struct.
+	mode := pb.KeySharedMode(ksp.Mode)
+	allowOutOfOrder := ksp.AllowOutOfOrderDelivery
+	meta := &pb.KeySharedMeta{
+		KeySharedMode:           &mode,
+		AllowOutOfOrderDelivery: &allowOutOfOrder,
+	}
+
+	if ksp.Mode != KeySharedPolicyModeSticky {
+		return meta
+	}
+
+	// i+1 < len(...) guards the pair access against an odd-length list.
+	for i := 0; i+1 < len(ksp.HashRanges); i += 2 {
+		start, end := int32(ksp.HashRanges[i]), int32(ksp.HashRanges[i+1])
+		meta.HashRanges = append(meta.HashRanges, &pb.IntRange{Start: &start, End: &end})
+	}
+
+	return meta
+}
+
+// validateHashRanges checks a flat list of hash range value pairs
+// [x1, x2, y1, y2, ...].
+//
+// It returns an error when the list is empty or has an odd number of values,
+// when a value lies outside [0, 65535], when a range's start is not less than
+// its end, or when two ranges overlap. Both ends are treated as inclusive, so
+// ranges that merely share an endpoint also count as overlapping. It returns
+// nil when every check passes.
+func validateHashRanges(hashRanges []int) error {
+	const maxHash = 65535
+
+	sz := len(hashRanges)
+	if sz == 0 || sz%2 != 0 {
+		return fmt.Errorf("ranges must not be empty or not in value pairs")
+	}
+
+	// Check that every range is well-formed on its own. Bounds and ordering
+	// are reported separately so the message names the actual problem.
+	for i := 0; i < sz; i += 2 {
+		x1, x2 := hashRanges[i], hashRanges[i+1]
+		if x1 < 0 || x1 > maxHash || x2 < 0 || x2 > maxHash {
+			return fmt.Errorf("ranges must be in [0, 65535], but provided range is, %d - %d", x1, x2)
+		}
+		if x1 >= x2 {
+			return fmt.Errorf("range start must be less than range end, but provided range is, %d - %d", x1, x2)
+		}
+	}
+
+	// Overlap is symmetric, so each range only needs to be compared with the
+	// ranges that follow it.
+	for i := 0; i < sz; i += 2 {
+		x1, x2 := hashRanges[i], hashRanges[i+1]
+		for j := i + 2; j < sz; j += 2 {
+			y1, y2 := hashRanges[j], hashRanges[j+1]
+			if x1 <= y2 && y1 <= x2 {
+				return fmt.Errorf("ranges with overlap between, %d - %d, and %d - %d", x1, x2, y1, y2)
+			}
+		}
+	}
+	return nil
+}
diff --git a/pulsar/key_shared_policy_test.go b/pulsar/key_shared_policy_test.go
new file mode 100644
index 0000000..af468ac
--- /dev/null
+++ b/pulsar/key_shared_policy_test.go
@@ -0,0 +1,91 @@
+package pulsar
+
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import (
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+)
+
+// TestNewKeySharedPolicySticky checks that well-formed, non-overlapping hash
+// ranges produce a policy in Sticky mode.
+func TestNewKeySharedPolicySticky(t *testing.T) {
+	ksp, err := NewKeySharedPolicySticky([]int{0, 10000, 10001, 19999, 30000, 59999})
+	assert.Nil(t, err)
+	// Stop on a nil policy: reading ksp.Mode below would otherwise panic and
+	// hide the assertion failure.
+	if !assert.NotNil(t, ksp) {
+		return
+	}
+	// testify takes (expected, actual); the right order keeps failure output
+	// labelled correctly.
+	assert.Equal(t, KeySharedPolicyModeSticky, ksp.Mode)
+}
+
+// TestValidateHashRanges runs validateHashRanges over a table of hash range
+// lists and checks only whether an error is returned, not its message.
+func TestValidateHashRanges(t *testing.T) {
+	type args struct {
+		hashRanges []int
+	}
+	tests := []struct {
+		name    string
+		args    args
+		wantErr bool
+	}{
+		{
+			name: "normal hash ranges",
+			args: args{
+				hashRanges: []int{0, 10, 11, 19, 20, 1000, 2000, 3000, 10000, 20000},
+			},
+		},
+		{
+			name: "abnormal hash ranges empty",
+			args: args{
+				hashRanges: []int{},
+			},
+			wantErr: true,
+		},
+		{
+			name: "abnormal hash ranges over max range",
+			args: args{
+				hashRanges: []int{1, 300, 2000, 66666},
+			},
+			wantErr: true,
+		},
+		{
+			// The first pair is inverted: its start (10) is greater than its end (1).
+			name: "abnormal hash ranges x1 > x2",
+			args: args{
+				hashRanges: []int{10, 1, 2, 10},
+			},
+			wantErr: true,
+		},
+		{
+			name: "abnormal hash ranges not in pairs",
+			args: args{
+				hashRanges: []int{1, 3, 5, 10, 11},
+			},
+			wantErr: true,
+		},
+		{
+			// Adjacent ranges share the endpoint 3.
+			name: "abnormal hash ranges overlap 1",
+			args: args{
+				hashRanges: []int{1, 3, 3, 6, 9, 12},
+			},
+			wantErr: true,
+		},
+		{
+			name: "abnormal hash ranges overlap 2",
+			args: args{
+				hashRanges: []int{1, 8, 6, 12, 7, 10},
+			},
+			wantErr: true,
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			if err := validateHashRanges(tt.args.hashRanges); (err != nil) != tt.wantErr {
+				t.Errorf("validateHashRanges() error = %v, wantErr %v", err, tt.wantErr)
+			}
+		})
+	}
+}