This is an automated email from the ASF dual-hosted git repository.

baodi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new e269c42  [feat] Expose the chunk config of consumer to the reader 
(#987)
e269c42 is described below

commit e269c42887c09c35d3f2cdcb4bc5a1699ccf2711
Author: CrazyCollin <[email protected]>
AuthorDate: Thu Mar 9 21:21:25 2023 +0800

    [feat] Expose the chunk config of consumer to the reader (#987)
    
    * [feat] Expose the chunk config of consumer to the reader
    
    * add test for reader's chunk config
    
    * refactoring some code
    
    * Update pulsar/reader.go
    
    Co-authored-by: Zike Yang <[email protected]>
    
    ---------
    
    Co-authored-by: Zike Yang <[email protected]>
---
 pulsar/reader.go      | 10 ++++++++++
 pulsar/reader_impl.go | 41 ++++++++++++++++++++++++++---------------
 pulsar/reader_test.go | 39 +++++++++++++++++++++++++++++++++++++++
 3 files changed, 75 insertions(+), 15 deletions(-)

diff --git a/pulsar/reader.go b/pulsar/reader.go
index e4679ab..d58d06f 100644
--- a/pulsar/reader.go
+++ b/pulsar/reader.go
@@ -92,6 +92,16 @@ type ReaderOptions struct {
        // BackoffPolicy parameterize the following options in the reconnection 
logic to
        // allow users to customize the reconnection logic (minBackoff, 
maxBackoff and jitterPercentage)
        BackoffPolicy internal.BackoffPolicy
+
+       // MaxPendingChunkedMessage sets the maximum pending chunked messages. 
(default: 100)
+       MaxPendingChunkedMessage int
+
+       // ExpireTimeOfIncompleteChunk sets the expiry time of discarding 
incomplete chunked message. (default: 60 seconds)
+       ExpireTimeOfIncompleteChunk time.Duration
+
+       // AutoAckIncompleteChunk sets whether reader auto acknowledges 
incomplete chunked message when it should
+       // be removed (e.g.the chunked message pending queue is full). 
(default: false)
+       AutoAckIncompleteChunk bool
 }
 
 // Reader can be used to scan through all the messages currently available in 
a topic.
diff --git a/pulsar/reader_impl.go b/pulsar/reader_impl.go
index c7620ad..36b492a 100644
--- a/pulsar/reader_impl.go
+++ b/pulsar/reader_impl.go
@@ -90,22 +90,33 @@ func newReader(client *client, options ReaderOptions) 
(Reader, error) {
                options.Decryption.MessageCrypto = messageCrypto
        }
 
+       if options.MaxPendingChunkedMessage == 0 {
+               options.MaxPendingChunkedMessage = 100
+       }
+
+       if options.ExpireTimeOfIncompleteChunk == 0 {
+               options.ExpireTimeOfIncompleteChunk = time.Minute
+       }
+
        consumerOptions := &partitionConsumerOpts{
-               topic:                      options.Topic,
-               consumerName:               options.Name,
-               subscription:               subscriptionName,
-               subscriptionType:           Exclusive,
-               receiverQueueSize:          receiverQueueSize,
-               startMessageID:             startMessageID,
-               startMessageIDInclusive:    options.StartMessageIDInclusive,
-               subscriptionMode:           nonDurable,
-               readCompacted:              options.ReadCompacted,
-               metadata:                   options.Properties,
-               nackRedeliveryDelay:        defaultNackRedeliveryDelay,
-               replicateSubscriptionState: false,
-               decryption:                 options.Decryption,
-               schema:                     options.Schema,
-               backoffPolicy:              options.BackoffPolicy,
+               topic:                       options.Topic,
+               consumerName:                options.Name,
+               subscription:                subscriptionName,
+               subscriptionType:            Exclusive,
+               receiverQueueSize:           receiverQueueSize,
+               startMessageID:              startMessageID,
+               startMessageIDInclusive:     options.StartMessageIDInclusive,
+               subscriptionMode:            nonDurable,
+               readCompacted:               options.ReadCompacted,
+               metadata:                    options.Properties,
+               nackRedeliveryDelay:         defaultNackRedeliveryDelay,
+               replicateSubscriptionState:  false,
+               decryption:                  options.Decryption,
+               schema:                      options.Schema,
+               backoffPolicy:               options.BackoffPolicy,
+               maxPendingChunkedMessage:    options.MaxPendingChunkedMessage,
+               expireTimeOfIncompleteChunk: 
options.ExpireTimeOfIncompleteChunk,
+               autoAckIncompleteChunk:      options.AutoAckIncompleteChunk,
        }
 
        reader := &reader{
diff --git a/pulsar/reader_test.go b/pulsar/reader_test.go
index 3543187..0a1b2a1 100644
--- a/pulsar/reader_test.go
+++ b/pulsar/reader_test.go
@@ -70,6 +70,45 @@ func TestReaderConfigSubscribeName(t *testing.T) {
        assert.NotNil(t, consumer)
 }
 
+func TestReaderConfigChunk(t *testing.T) {
+       client, err := NewClient(ClientOptions{
+               URL: lookupURL,
+       })
+       if err != nil {
+               t.Fatal(err)
+       }
+       defer client.Close()
+
+       r1, err := client.CreateReader(ReaderOptions{
+               Topic:                       "my-topic1",
+               StartMessageID:              EarliestMessageID(),
+               MaxPendingChunkedMessage:    50,
+               ExpireTimeOfIncompleteChunk: 30 * time.Second,
+               AutoAckIncompleteChunk:      true,
+       })
+       assert.Nil(t, err)
+       defer r1.Close()
+
+       // verify specified chunk options
+       pcOpts := r1.(*reader).pc.options
+       assert.Equal(t, 50, pcOpts.maxPendingChunkedMessage)
+       assert.Equal(t, 30*time.Second, pcOpts.expireTimeOfIncompleteChunk)
+       assert.True(t, pcOpts.autoAckIncompleteChunk)
+
+       r2, err := client.CreateReader(ReaderOptions{
+               Topic:          "my-topic2",
+               StartMessageID: EarliestMessageID(),
+       })
+       assert.Nil(t, err)
+       defer r2.Close()
+
+       // verify default chunk options
+       pcOpts = r2.(*reader).pc.options
+       assert.Equal(t, 100, pcOpts.maxPendingChunkedMessage)
+       assert.Equal(t, time.Minute, pcOpts.expireTimeOfIncompleteChunk)
+       assert.False(t, pcOpts.autoAckIncompleteChunk)
+}
+
 func TestReader(t *testing.T) {
        client, err := NewClient(ClientOptions{
                URL: lookupURL,

Reply via email to