This is an automated email from the ASF dual-hosted git repository.

zike pushed a commit to branch branch-0.12.0
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git

commit 6ce5421b07e23ff252ba5aaa754e1a0a5bab4cc5
Author: Zike Yang <[email protected]>
AuthorDate: Fri Feb 23 15:49:08 2024 +0800

    [feat] Support partitioned topic reader (#1178)
    
    Master Issue: #1177
    
    ### Motivation
    
    Currently, there is an issue with the reader implementation. When the
    reader is being created, it does not fetch the topic's partition metadata,
    so it can only read messages from a single (non-partitioned) topic. If the
    topic is partitioned, the reader does not detect this and instead tries to
    create a non-partitioned topic with the same name, which leads to this
    issue: https://github.com/apache/pulsar/issues/22032
    
    ### Modifications
    
    - Support partitioned topic reader
    
    (cherry picked from commit 3b9b1f8895d8924ec98db4612806b9871f1d135b)
---
 pulsar/consumer.go           |   7 +++
 pulsar/consumer_impl.go      |  47 +++++++++++++++++-
 pulsar/consumer_partition.go |  36 ++++++++++++++
 pulsar/reader.go             |   1 +
 pulsar/reader_impl.go        | 112 ++++++++++++++++++-------------------------
 pulsar/reader_test.go        | 100 ++++++++++++++++++++++++++++++++++----
 6 files changed, 227 insertions(+), 76 deletions(-)

diff --git a/pulsar/consumer.go b/pulsar/consumer.go
index 667bff66..fea94cf6 100644
--- a/pulsar/consumer.go
+++ b/pulsar/consumer.go
@@ -246,6 +246,13 @@ type ConsumerOptions struct {
        // SubscriptionMode specifies the subscription mode to be used when 
subscribing to a topic.
        // Default is `Durable`
        SubscriptionMode SubscriptionMode
+
+       // StartMessageIDInclusive, if true, the consumer will start at the 
`StartMessageID`, included.
+       // Default is `false` and the consumer will start from the "next" 
message
+       StartMessageIDInclusive bool
+
+       // startMessageID specifies the message id to start from. Currently, 
it's only used for the reader internally.
+       startMessageID *trackingMessageID
 }
 
 // Consumer is an interface that abstracts behavior of Pulsar's consumer
diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go
index 75d839b4..0c31a1aa 100644
--- a/pulsar/consumer_impl.go
+++ b/pulsar/consumer_impl.go
@@ -384,7 +384,8 @@ func (c *consumer) internalTopicSubscribeToPartitions() 
error {
                                metadata:                    metadata,
                                subProperties:               subProperties,
                                replicateSubscriptionState:  
c.options.ReplicateSubscriptionState,
-                               startMessageID:              nil,
+                               startMessageID:              
c.options.startMessageID,
+                               startMessageIDInclusive:     
c.options.StartMessageIDInclusive,
                                subscriptionMode:            
c.options.SubscriptionMode,
                                readCompacted:               
c.options.ReadCompacted,
                                interceptors:                
c.options.Interceptors,
@@ -707,6 +708,50 @@ func (c *consumer) checkMsgIDPartition(msgID MessageID) 
error {
        return nil
 }
 
+func (c *consumer) hasNext() bool {
+       ctx, cancel := context.WithCancel(context.Background())
+       defer cancel() // Make sure all paths cancel the context to avoid 
context leak
+
+       var wg sync.WaitGroup
+       wg.Add(len(c.consumers))
+
+       hasNext := make(chan bool)
+       for _, pc := range c.consumers {
+               pc := pc
+               go func() {
+                       defer wg.Done()
+                       if pc.hasNext() {
+                               select {
+                               case hasNext <- true:
+                               case <-ctx.Done():
+                               }
+                       }
+               }()
+       }
+
+       go func() {
+               wg.Wait()
+               close(hasNext) // Close the channel after all goroutines have 
finished
+       }()
+
+       // Wait for either a 'true' result or for all goroutines to finish
+       for hn := range hasNext {
+               if hn {
+                       return true
+               }
+       }
+
+       return false
+}
+
+func (c *consumer) setLastDequeuedMsg(msgID MessageID) error {
+       if err := c.checkMsgIDPartition(msgID); err != nil {
+               return err
+       }
+       c.consumers[msgID.PartitionIdx()].lastDequeuedMsg = 
toTrackingMessageID(msgID)
+       return nil
+}
+
 var r = &random{
        R: rand.New(rand.NewSource(time.Now().UnixNano())),
 }
diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index fd6441c1..95b5bc09 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -174,6 +174,8 @@ type partitionConsumer struct {
        chunkedMsgCtxMap   *chunkedMsgCtxMap
        unAckChunksTracker *unAckChunksTracker
        ackGroupingTracker ackGroupingTracker
+
+       lastMessageInBroker *trackingMessageID
 }
 
 func (pc *partitionConsumer) ActiveConsumerChanged(isActive bool) {
@@ -1970,6 +1972,40 @@ func (pc *partitionConsumer) 
discardCorruptedMessage(msgID *pb.MessageIdData,
        pc.availablePermits.inc()
 }
 
+func (pc *partitionConsumer) hasNext() bool {
+       if pc.lastMessageInBroker != nil && pc.hasMoreMessages() {
+               return true
+       }
+
+       for {
+               lastMsgID, err := pc.getLastMessageID()
+               if err != nil {
+                       pc.log.WithError(err).Error("Failed to get last message 
id from broker")
+                       continue
+               } else {
+                       pc.lastMessageInBroker = lastMsgID
+                       break
+               }
+       }
+
+       return pc.hasMoreMessages()
+}
+
+func (pc *partitionConsumer) hasMoreMessages() bool {
+       if pc.lastDequeuedMsg != nil {
+               return pc.lastMessageInBroker.isEntryIDValid() && 
pc.lastMessageInBroker.greater(pc.lastDequeuedMsg.messageID)
+       }
+
+       if pc.options.startMessageIDInclusive {
+               return pc.lastMessageInBroker.isEntryIDValid() &&
+                       
pc.lastMessageInBroker.greaterEqual(pc.startMessageID.get().messageID)
+       }
+
+       // Non-inclusive
+       return pc.lastMessageInBroker.isEntryIDValid() &&
+               
pc.lastMessageInBroker.greater(pc.startMessageID.get().messageID)
+}
+
 // _setConn sets the internal connection field of this partition consumer 
atomically.
 // Note: should only be called by this partition consumer when a new 
connection is available.
 func (pc *partitionConsumer) _setConn(conn internal.Connection) {
diff --git a/pulsar/reader.go b/pulsar/reader.go
index 5e1a73b9..1c5235d4 100644
--- a/pulsar/reader.go
+++ b/pulsar/reader.go
@@ -136,5 +136,6 @@ type Reader interface {
        SeekByTime(time time.Time) error
 
        // GetLastMessageID get the last message id available for consume.
+       // It only works for single topic reader. It will return an error when 
the reader is the multi-topic reader.
        GetLastMessageID() (MessageID, error)
 }
diff --git a/pulsar/reader_impl.go b/pulsar/reader_impl.go
index 7b260b88..bf91c67f 100644
--- a/pulsar/reader_impl.go
+++ b/pulsar/reader_impl.go
@@ -34,12 +34,11 @@ const (
 
 type reader struct {
        sync.Mutex
-       client              *client
-       pc                  *partitionConsumer
-       messageCh           chan ConsumerMessage
-       lastMessageInBroker *trackingMessageID
-       log                 log.Logger
-       metrics             *internal.LeveledMetrics
+       client    *client
+       messageCh chan ConsumerMessage
+       log       log.Logger
+       metrics   *internal.LeveledMetrics
+       c         *consumer
 }
 
 func newReader(client *client, options ReaderOptions) (Reader, error) {
@@ -98,25 +97,25 @@ func newReader(client *client, options ReaderOptions) 
(Reader, error) {
                options.ExpireTimeOfIncompleteChunk = time.Minute
        }
 
-       consumerOptions := &partitionConsumerOpts{
-               topic:                       options.Topic,
-               consumerName:                options.Name,
-               subscription:                subscriptionName,
-               subscriptionType:            Exclusive,
-               receiverQueueSize:           receiverQueueSize,
+       consumerOptions := &ConsumerOptions{
+               Topic:                       options.Topic,
+               Name:                        options.Name,
+               SubscriptionName:            subscriptionName,
+               Type:                        Exclusive,
+               ReceiverQueueSize:           receiverQueueSize,
+               SubscriptionMode:            NonDurable,
+               ReadCompacted:               options.ReadCompacted,
+               Properties:                  options.Properties,
+               NackRedeliveryDelay:         defaultNackRedeliveryDelay,
+               ReplicateSubscriptionState:  false,
+               Decryption:                  options.Decryption,
+               Schema:                      options.Schema,
+               BackoffPolicy:               options.BackoffPolicy,
+               MaxPendingChunkedMessage:    options.MaxPendingChunkedMessage,
+               ExpireTimeOfIncompleteChunk: 
options.ExpireTimeOfIncompleteChunk,
+               AutoAckIncompleteChunk:      options.AutoAckIncompleteChunk,
                startMessageID:              startMessageID,
-               startMessageIDInclusive:     options.StartMessageIDInclusive,
-               subscriptionMode:            NonDurable,
-               readCompacted:               options.ReadCompacted,
-               metadata:                    options.Properties,
-               nackRedeliveryDelay:         defaultNackRedeliveryDelay,
-               replicateSubscriptionState:  false,
-               decryption:                  options.Decryption,
-               schema:                      options.Schema,
-               backoffPolicy:               options.BackoffPolicy,
-               maxPendingChunkedMessage:    options.MaxPendingChunkedMessage,
-               expireTimeOfIncompleteChunk: 
options.ExpireTimeOfIncompleteChunk,
-               autoAckIncompleteChunk:      options.AutoAckIncompleteChunk,
+               StartMessageIDInclusive:     options.StartMessageIDInclusive,
        }
 
        reader := &reader{
@@ -131,20 +130,25 @@ func newReader(client *client, options ReaderOptions) 
(Reader, error) {
        if err != nil {
                return nil, err
        }
+       // Provide dummy rlq router with not dlq policy
+       rlq, err := newRetryRouter(client, nil, false, client.log)
+       if err != nil {
+               return nil, err
+       }
 
-       pc, err := newPartitionConsumer(nil, client, consumerOptions, 
reader.messageCh, dlq, reader.metrics)
+       c, err := newInternalConsumer(client, *consumerOptions, options.Topic, 
reader.messageCh, dlq, rlq, false)
        if err != nil {
                close(reader.messageCh)
                return nil, err
        }
+       reader.c = c
 
-       reader.pc = pc
        reader.metrics.ReadersOpened.Inc()
        return reader, nil
 }
 
 func (r *reader) Topic() string {
-       return r.pc.topic
+       return r.c.topic
 }
 
 func (r *reader) Next(ctx context.Context) (Message, error) {
@@ -158,9 +162,14 @@ func (r *reader) Next(ctx context.Context) (Message, 
error) {
                        // Acknowledge message immediately because the reader 
is based on non-durable subscription. When it reconnects,
                        // it will specify the subscription position anyway
                        msgID := cm.Message.ID()
-                       mid := toTrackingMessageID(msgID)
-                       r.pc.lastDequeuedMsg = mid
-                       r.pc.AckID(mid)
+                       err := r.c.setLastDequeuedMsg(msgID)
+                       if err != nil {
+                               return nil, err
+                       }
+                       err = r.c.AckID(msgID)
+                       if err != nil {
+                               return nil, err
+                       }
                        return cm.Message, nil
                case <-ctx.Done():
                        return nil, ctx.Err()
@@ -169,41 +178,11 @@ func (r *reader) Next(ctx context.Context) (Message, 
error) {
 }
 
 func (r *reader) HasNext() bool {
-       if r.lastMessageInBroker != nil && r.hasMoreMessages() {
-               return true
-       }
-
-       for {
-               lastMsgID, err := r.pc.getLastMessageID()
-               if err != nil {
-                       r.log.WithError(err).Error("Failed to get last message 
id from broker")
-                       continue
-               } else {
-                       r.lastMessageInBroker = lastMsgID
-                       break
-               }
-       }
-
-       return r.hasMoreMessages()
-}
-
-func (r *reader) hasMoreMessages() bool {
-       if r.pc.lastDequeuedMsg != nil {
-               return r.lastMessageInBroker.isEntryIDValid() && 
r.lastMessageInBroker.greater(r.pc.lastDequeuedMsg.messageID)
-       }
-
-       if r.pc.options.startMessageIDInclusive {
-               return r.lastMessageInBroker.isEntryIDValid() &&
-                       
r.lastMessageInBroker.greaterEqual(r.pc.startMessageID.get().messageID)
-       }
-
-       // Non-inclusive
-       return r.lastMessageInBroker.isEntryIDValid() &&
-               
r.lastMessageInBroker.greater(r.pc.startMessageID.get().messageID)
+       return r.c.hasNext()
 }
 
 func (r *reader) Close() {
-       r.pc.Close()
+       r.c.Close()
        r.client.handlers.Del(r)
        r.metrics.ReadersClosed.Inc()
 }
@@ -235,16 +214,19 @@ func (r *reader) Seek(msgID MessageID) error {
                return nil
        }
 
-       return r.pc.Seek(mid)
+       return r.c.Seek(mid)
 }
 
 func (r *reader) SeekByTime(time time.Time) error {
        r.Lock()
        defer r.Unlock()
 
-       return r.pc.SeekByTime(time)
+       return r.c.SeekByTime(time)
 }
 
 func (r *reader) GetLastMessageID() (MessageID, error) {
-       return r.pc.getLastMessageID()
+       if len(r.c.consumers) > 1 {
+               return nil, fmt.Errorf("GetLastMessageID is not supported for 
multi-topics reader")
+       }
+       return r.c.consumers[0].getLastMessageID()
 }
diff --git a/pulsar/reader_test.go b/pulsar/reader_test.go
index c8228a7c..ccf52875 100644
--- a/pulsar/reader_test.go
+++ b/pulsar/reader_test.go
@@ -24,6 +24,9 @@ import (
        "time"
 
        "github.com/apache/pulsar-client-go/pulsar/crypto"
+       "github.com/apache/pulsar-client-go/pulsaradmin"
+       "github.com/apache/pulsar-client-go/pulsaradmin/pkg/admin/config"
+       "github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils"
        "github.com/google/uuid"
        "github.com/stretchr/testify/assert"
 )
@@ -90,10 +93,10 @@ func TestReaderConfigChunk(t *testing.T) {
        defer r1.Close()
 
        // verify specified chunk options
-       pcOpts := r1.(*reader).pc.options
-       assert.Equal(t, 50, pcOpts.maxPendingChunkedMessage)
-       assert.Equal(t, 30*time.Second, pcOpts.expireTimeOfIncompleteChunk)
-       assert.True(t, pcOpts.autoAckIncompleteChunk)
+       pcOpts := r1.(*reader).c.options
+       assert.Equal(t, 50, pcOpts.MaxPendingChunkedMessage)
+       assert.Equal(t, 30*time.Second, pcOpts.ExpireTimeOfIncompleteChunk)
+       assert.True(t, pcOpts.AutoAckIncompleteChunk)
 
        r2, err := client.CreateReader(ReaderOptions{
                Topic:          "my-topic2",
@@ -103,10 +106,10 @@ func TestReaderConfigChunk(t *testing.T) {
        defer r2.Close()
 
        // verify default chunk options
-       pcOpts = r2.(*reader).pc.options
-       assert.Equal(t, 100, pcOpts.maxPendingChunkedMessage)
-       assert.Equal(t, time.Minute, pcOpts.expireTimeOfIncompleteChunk)
-       assert.False(t, pcOpts.autoAckIncompleteChunk)
+       pcOpts = r2.(*reader).c.options
+       assert.Equal(t, 100, pcOpts.MaxPendingChunkedMessage)
+       assert.Equal(t, time.Minute, pcOpts.ExpireTimeOfIncompleteChunk)
+       assert.False(t, pcOpts.AutoAckIncompleteChunk)
 }
 
 func TestReader(t *testing.T) {
@@ -153,6 +156,50 @@ func TestReader(t *testing.T) {
        }
 }
 
+func TestReaderOnPartitionedTopic(t *testing.T) {
+       client, err := NewClient(ClientOptions{
+               URL: lookupURL,
+       })
+
+       assert.Nil(t, err)
+       defer client.Close()
+
+       topic := newTopicName()
+       assert.Nil(t, createPartitionedTopic(topic, 3))
+       ctx := context.Background()
+       // create reader
+       reader, err := client.CreateReader(ReaderOptions{
+               Topic:          topic,
+               StartMessageID: EarliestMessageID(),
+       })
+       assert.Nil(t, err)
+       defer reader.Close()
+
+       // create producer
+       producer, err := client.CreateProducer(ProducerOptions{
+               Topic: topic,
+       })
+       assert.Nil(t, err)
+       defer producer.Close()
+
+       // send 10 messages
+       for i := 0; i < 10; i++ {
+               _, err := producer.Send(ctx, &ProducerMessage{
+                       Payload: []byte(fmt.Sprintf("hello-%d", i)),
+               })
+               assert.NoError(t, err)
+       }
+
+       // receive 10 messages
+       for i := 0; i < 10; i++ {
+               msg, err := reader.Next(context.Background())
+               assert.NoError(t, err)
+
+               expectMsg := fmt.Sprintf("hello-%d", i)
+               assert.Equal(t, []byte(expectMsg), msg.Payload())
+       }
+}
+
 func TestReaderConnectError(t *testing.T) {
        client, err := NewClient(ClientOptions{
                URL: "pulsar://invalid-hostname:6650",
@@ -422,7 +469,6 @@ func TestReaderHasNext(t *testing.T) {
                assert.NotNil(t, msgID)
        }
 
-       // create reader on 5th message (not included)
        reader, err := client.CreateReader(ReaderOptions{
                Topic:          topic,
                StartMessageID: EarliestMessageID(),
@@ -880,7 +926,7 @@ func TestReaderWithBackoffPolicy(t *testing.T) {
        assert.NotNil(t, _reader)
        assert.Nil(t, err)
 
-       partitionConsumerImp := _reader.(*reader).pc
+       partitionConsumerImp := _reader.(*reader).c.consumers[0]
        // 1 s
        startTime := time.Now()
        partitionConsumerImp.reconnectToBroker()
@@ -943,3 +989,37 @@ func TestReaderGetLastMessageID(t *testing.T) {
        assert.Equal(t, lastMsgID.LedgerID(), getLastMessageID.LedgerID())
        assert.Equal(t, lastMsgID.EntryID(), getLastMessageID.EntryID())
 }
+
+func TestReaderGetLastMessageIDOnMultiTopics(t *testing.T) {
+       client, err := NewClient(ClientOptions{
+               URL: serviceURL,
+       })
+       assert.Nil(t, err)
+       topic := newTopicName()
+       assert.Nil(t, createPartitionedTopic(topic, 3))
+
+       reader, err := client.CreateReader(ReaderOptions{
+               Topic:          topic,
+               StartMessageID: EarliestMessageID(),
+       })
+       assert.Nil(t, err)
+       _, err = reader.GetLastMessageID()
+       assert.NotNil(t, err)
+}
+
+func createPartitionedTopic(topic string, n int) error {
+       admin, err := pulsaradmin.NewClient(&config.Config{})
+       if err != nil {
+               return err
+       }
+
+       topicName, err := utils.GetTopicName(topic)
+       if err != nil {
+               return err
+       }
+       err = admin.Topics().Create(*topicName, n)
+       if err != nil {
+               return err
+       }
+       return nil
+}

Reply via email to