This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 367984a  Removed unacked messages tracker (#90)
367984a is described below

commit 367984a8dabec7efc8185fd88a366b1535774725
Author: Matteo Merli <[email protected]>
AuthorDate: Mon Nov 11 21:04:56 2019 -0800

    Removed unacked messages tracker (#90)
    
    * Removed unacked messages tracker
    
    * Fixed tests
---
 pulsar/consumer.go                 |   9 --
 pulsar/consumer_impl.go            |   9 +-
 pulsar/consumer_test.go            |   2 -
 pulsar/unacked_msg_tracker.go      | 199 -------------------------------------
 pulsar/unacked_msg_tracker_test.go |  66 ------------
 5 files changed, 1 insertion(+), 284 deletions(-)

diff --git a/pulsar/consumer.go b/pulsar/consumer.go
index 3b729ec..b0bba1b 100644
--- a/pulsar/consumer.go
+++ b/pulsar/consumer.go
@@ -19,7 +19,6 @@ package pulsar
 
 import (
        "context"
-       "time"
 )
 
 // Pair of a Consumer and Message
@@ -82,11 +81,6 @@ type ConsumerOptions struct {
        // This properties will be visible in the topic stats
        Properties map[string]string
 
-       // Set the timeout for unacked messages
-       // Message not acknowledged within the give time, will be replayed by 
the broker to the same or a different consumer
-       // Default is 0, which means message are not being replayed based on 
ack time
-       AckTimeout time.Duration
-
        // Select the subscription type to be used when subscribing to the 
topic.
        // Default is `Exclusive`
        Type SubscriptionType
@@ -128,9 +122,6 @@ type ConsumerOptions struct {
 
 // Consumer is an interface that abstracts behavior of Pulsar's consumer
 type Consumer interface {
-       // Topic get the topic for the consumer
-       Topic() string
-
        // Subscription get a subscription for the consumer
        Subscription() string
 
diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go
index af9b8d5..8b73631 100644
--- a/pulsar/consumer_impl.go
+++ b/pulsar/consumer_impl.go
@@ -34,8 +34,6 @@ import (
 var ErrConsumerClosed = errors.New("consumer closed")
 
 type consumer struct {
-       topic string
-
        options ConsumerOptions
 
        consumers []*partitionConsumer
@@ -88,7 +86,6 @@ func newConsumer(client *client, options ConsumerOptions) 
(Consumer, error) {
 func topicSubscribe(client *client, options ConsumerOptions, topic string,
        messageCh chan ConsumerMessage) (Consumer, error) {
        consumer := &consumer{
-               topic:     topic,
                messageCh: messageCh,
                errorCh:   make(chan error),
                log:       log.WithField("topic", topic),
@@ -165,10 +162,6 @@ func topicSubscribe(client *client, options 
ConsumerOptions, topic string,
        return consumer, nil
 }
 
-func (c *consumer) Topic() string {
-       return c.topic
-}
-
 func (c *consumer) Subscription() string {
        return c.options.SubscriptionName
 }
@@ -177,7 +170,7 @@ func (c *consumer) Unsubscribe() error {
        var errMsg string
        for _, consumer := range c.consumers {
                if err := consumer.Unsubscribe(); err != nil {
-                       errMsg += fmt.Sprintf("topic %s, subscription %s: %s", 
c.Topic(), c.Subscription(), err)
+                       errMsg += fmt.Sprintf("topic %s, subscription %s: %s", 
consumer.topic, c.Subscription(), err)
                }
        }
        if errMsg != "" {
diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go
index bbc1315..33c2f15 100644
--- a/pulsar/consumer_test.go
+++ b/pulsar/consumer_test.go
@@ -149,7 +149,6 @@ func TestBatchMessageReceive(t *testing.T) {
                SubscriptionName: subName,
        })
        assert.Nil(t, err)
-       assert.Equal(t, topicName, consumer.Topic())
        count := 0
 
        for i := 0; i < numOfMessages; i++ {
@@ -397,7 +396,6 @@ func TestConsumerReceiveTimeout(t *testing.T) {
                Topic:            topic,
                SubscriptionName: "my-sub1",
                Type:             Shared,
-               AckTimeout:       5 * 1000,
        })
        assert.Nil(t, err)
        defer consumer.Close()
diff --git a/pulsar/unacked_msg_tracker.go b/pulsar/unacked_msg_tracker.go
deleted file mode 100644
index 5dfe895..0000000
--- a/pulsar/unacked_msg_tracker.go
+++ /dev/null
@@ -1,199 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package pulsar
-
-import (
-       "sync"
-       "time"
-
-       "github.com/golang/protobuf/proto"
-
-       set "github.com/deckarep/golang-set"
-       log "github.com/sirupsen/logrus"
-
-       "github.com/apache/pulsar-client-go/pkg/pb"
-)
-
-type UnackedMessageTracker struct {
-       cmu        sync.RWMutex // protects following
-       currentSet set.Set
-       oldOpenSet set.Set
-       timeout    *time.Ticker
-
-       pcs []*partitionConsumer
-}
-
-// NewUnackedMessageTracker init UnackedMessageTracker object
-func NewUnackedMessageTracker() *UnackedMessageTracker {
-       unAckTracker := &UnackedMessageTracker{
-               currentSet: set.NewSet(),
-               oldOpenSet: set.NewSet(),
-       }
-
-       return unAckTracker
-}
-
-// Size return the size of current set and old open set cardinality
-func (t *UnackedMessageTracker) Size() int {
-       t.cmu.Lock()
-       defer t.cmu.Unlock()
-
-       return t.currentSet.Cardinality() + t.oldOpenSet.Cardinality()
-}
-
-// IsEmpty check if the currentSet or oldOpenSet are empty.
-func (t *UnackedMessageTracker) IsEmpty() bool {
-       t.cmu.RLock()
-       defer t.cmu.RUnlock()
-
-       return t.currentSet.Cardinality() == 0 && t.oldOpenSet.Cardinality() == 0
-}
-
-// Add will add message id data to currentSet and remove the message id from 
oldOpenSet.
-func (t *UnackedMessageTracker) Add(id *pb.MessageIdData) bool {
-       t.cmu.Lock()
-       defer t.cmu.Unlock()
-
-       t.oldOpenSet.Remove(id)
-       return t.currentSet.Add(id)
-}
-
-// Remove will remove message id data from currentSet and oldOpenSet
-func (t *UnackedMessageTracker) Remove(id *pb.MessageIdData) {
-       t.cmu.Lock()
-       defer t.cmu.Unlock()
-
-       t.currentSet.Remove(id)
-       t.oldOpenSet.Remove(id)
-}
-
-func (t *UnackedMessageTracker) clear() {
-       t.cmu.Lock()
-       defer t.cmu.Unlock()
-
-       t.currentSet.Clear()
-       t.oldOpenSet.Clear()
-}
-
-func (t *UnackedMessageTracker) toggle() {
-       t.cmu.Lock()
-       defer t.cmu.Unlock()
-
-       t.currentSet, t.oldOpenSet = t.oldOpenSet, t.currentSet
-}
-
-func (t *UnackedMessageTracker) isAckTimeout() bool {
-       t.cmu.RLock()
-       defer t.cmu.RUnlock()
-
-       return !(t.oldOpenSet.Cardinality() == 0)
-}
-
-func (t *UnackedMessageTracker) lessThanOrEqual(id1, id2 pb.MessageIdData) 
bool {
-       return id1.GetPartition() == id2.GetPartition() &&
-               (id1.GetLedgerId() < id2.GetLedgerId() || id1.GetEntryId() <= 
id2.GetEntryId())
-}
-
-func (t *UnackedMessageTracker) RemoveMessagesTill(id pb.MessageIdData) int {
-       t.cmu.Lock()
-       defer t.cmu.Unlock()
-
-       counter := 0
-
-       t.currentSet.Each(func(elem interface{}) bool {
-               if t.lessThanOrEqual(elem.(pb.MessageIdData), id) {
-                       t.currentSet.Remove(elem)
-                       counter++
-               }
-               return true
-       })
-
-       t.oldOpenSet.Each(func(elem interface{}) bool {
-               if t.lessThanOrEqual(elem.(pb.MessageIdData), id) {
-                       t.currentSet.Remove(elem)
-                       counter++
-               }
-               return true
-       })
-
-       return counter
-}
-
-func (t *UnackedMessageTracker) Start(ackTimeoutMillis int64) {
-       t.cmu.Lock()
-       defer t.cmu.Unlock()
-       t.timeout = time.NewTicker((time.Duration(ackTimeoutMillis)) * 
time.Millisecond)
-
-       go t.handlerCmd()
-}
-
-func (t *UnackedMessageTracker) handlerCmd() {
-       for {
-               select {
-               case tick := <-t.timeout.C:
-                       if t.isAckTimeout() {
-                               t.cmu.Lock()
-                               log.Debugf(" %d messages have timed-out", 
t.oldOpenSet.Cardinality())
-                               messageIds := make([]*pb.MessageIdData, 0)
-
-                               t.oldOpenSet.Each(func(i interface{}) bool {
-                                       messageIds = append(messageIds, 
i.(*pb.MessageIdData))
-                                       return false
-                               })
-                               log.Debugf("messageID length is:%d", 
len(messageIds))
-
-                               t.oldOpenSet.Clear()
-                               t.cmu.Unlock()
-
-                               if t.pcs != nil {
-                                       messageIdsMap := 
make(map[int32][]*pb.MessageIdData)
-                                       for _, msgID := range messageIds {
-                                               
messageIdsMap[msgID.GetPartition()] = 
append(messageIdsMap[msgID.GetPartition()], msgID)
-                                       }
-
-                                       for index, subConsumer := range t.pcs {
-                                               if messageIdsMap[int32(index)] 
!= nil {
-                                                       requestID := 
subConsumer.client.rpcClient.NewRequestID()
-                                                       cmd := 
&pb.CommandRedeliverUnacknowledgedMessages{
-                                                               ConsumerId: 
proto.Uint64(subConsumer.consumerID),
-                                                               MessageIds: 
messageIdsMap[int32(index)],
-                                                       }
-
-                                                       _, err := 
subConsumer.client.rpcClient.RequestOnCnx(subConsumer.conn, requestID,
-                                                               
pb.BaseCommand_REDELIVER_UNACKNOWLEDGED_MESSAGES, cmd)
-                                                       if err != nil {
-                                                               
subConsumer.log.WithError(err).Error("Failed to unsubscribe consumer")
-                                                               return
-                                                       }
-                                               }
-                                       }
-                               }
-                       }
-                       log.Debugf("Tick at: %v", tick)
-               }
-
-               t.toggle()
-       }
-}
-
-func (t *UnackedMessageTracker) Stop() {
-       t.timeout.Stop()
-       log.Debug("stop ticker ", t.timeout)
-
-       t.clear()
-}
diff --git a/pulsar/unacked_msg_tracker_test.go 
b/pulsar/unacked_msg_tracker_test.go
deleted file mode 100644
index 3848ce9..0000000
--- a/pulsar/unacked_msg_tracker_test.go
+++ /dev/null
@@ -1,66 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package pulsar
-
-import (
-       "testing"
-
-       "github.com/golang/protobuf/proto"
-       "github.com/stretchr/testify/assert"
-
-       "github.com/apache/pulsar-client-go/pkg/pb"
-)
-
-func TestUnackedMessageTracker(t *testing.T) {
-       unAckTracker := NewUnackedMessageTracker()
-
-       var msgIDs []*pb.MessageIdData
-
-       for i := 0; i < 5; i++ {
-               msgID := &pb.MessageIdData{
-                       LedgerId:   proto.Uint64(1),
-                       EntryId:    proto.Uint64(uint64(i)),
-                       Partition:  proto.Int32(-1),
-                       BatchIndex: proto.Int32(-1),
-               }
-
-               msgIDs = append(msgIDs, msgID)
-       }
-
-       for _, msgID := range msgIDs {
-               ok := unAckTracker.Add(msgID)
-               assert.True(t, ok)
-       }
-
-       flag := unAckTracker.IsEmpty()
-       assert.False(t, flag)
-
-       num := unAckTracker.Size()
-       assert.Equal(t, num, 5)
-
-       for index, msgID := range msgIDs {
-               unAckTracker.Remove(msgID)
-               assert.Equal(t, 4-index, unAckTracker.Size())
-       }
-
-       num = unAckTracker.Size()
-       assert.Equal(t, num, 0)
-
-       flag = unAckTracker.IsEmpty()
-       assert.True(t, flag)
-}

Reply via email to