This is an automated email from the ASF dual-hosted git repository.
baodi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new e269c42 [feat] Expose the chunk config of consumer to the reader
(#987)
e269c42 is described below
commit e269c42887c09c35d3f2cdcb4bc5a1699ccf2711
Author: CrazyCollin <[email protected]>
AuthorDate: Thu Mar 9 21:21:25 2023 +0800
[feat] Expose the chunk config of consumer to the reader (#987)
* [feat] Expose the chunk config of consumer to the reader
* add test for reader's chunk config
* refactoring some code
* Update pulsar/reader.go
Co-authored-by: Zike Yang <[email protected]>
---------
Co-authored-by: Zike Yang <[email protected]>
---
pulsar/reader.go | 10 ++++++++++
pulsar/reader_impl.go | 41 ++++++++++++++++++++++++++---------------
pulsar/reader_test.go | 39 +++++++++++++++++++++++++++++++++++++++
3 files changed, 75 insertions(+), 15 deletions(-)
diff --git a/pulsar/reader.go b/pulsar/reader.go
index e4679ab..d58d06f 100644
--- a/pulsar/reader.go
+++ b/pulsar/reader.go
@@ -92,6 +92,16 @@ type ReaderOptions struct {
// BackoffPolicy parameterize the following options in the reconnection
logic to
// allow users to customize the reconnection logic (minBackoff,
maxBackoff and jitterPercentage)
BackoffPolicy internal.BackoffPolicy
+
+ // MaxPendingChunkedMessage sets the maximum pending chunked messages.
(default: 100)
+ MaxPendingChunkedMessage int
+
+ // ExpireTimeOfIncompleteChunk sets the expiry time of discarding
incomplete chunked message. (default: 60 seconds)
+ ExpireTimeOfIncompleteChunk time.Duration
+
+ // AutoAckIncompleteChunk sets whether reader auto acknowledges
incomplete chunked message when it should
+ // be removed (e.g.the chunked message pending queue is full).
(default: false)
+ AutoAckIncompleteChunk bool
}
// Reader can be used to scan through all the messages currently available in
a topic.
diff --git a/pulsar/reader_impl.go b/pulsar/reader_impl.go
index c7620ad..36b492a 100644
--- a/pulsar/reader_impl.go
+++ b/pulsar/reader_impl.go
@@ -90,22 +90,33 @@ func newReader(client *client, options ReaderOptions)
(Reader, error) {
options.Decryption.MessageCrypto = messageCrypto
}
+ if options.MaxPendingChunkedMessage == 0 {
+ options.MaxPendingChunkedMessage = 100
+ }
+
+ if options.ExpireTimeOfIncompleteChunk == 0 {
+ options.ExpireTimeOfIncompleteChunk = time.Minute
+ }
+
consumerOptions := &partitionConsumerOpts{
- topic: options.Topic,
- consumerName: options.Name,
- subscription: subscriptionName,
- subscriptionType: Exclusive,
- receiverQueueSize: receiverQueueSize,
- startMessageID: startMessageID,
- startMessageIDInclusive: options.StartMessageIDInclusive,
- subscriptionMode: nonDurable,
- readCompacted: options.ReadCompacted,
- metadata: options.Properties,
- nackRedeliveryDelay: defaultNackRedeliveryDelay,
- replicateSubscriptionState: false,
- decryption: options.Decryption,
- schema: options.Schema,
- backoffPolicy: options.BackoffPolicy,
+ topic: options.Topic,
+ consumerName: options.Name,
+ subscription: subscriptionName,
+ subscriptionType: Exclusive,
+ receiverQueueSize: receiverQueueSize,
+ startMessageID: startMessageID,
+ startMessageIDInclusive: options.StartMessageIDInclusive,
+ subscriptionMode: nonDurable,
+ readCompacted: options.ReadCompacted,
+ metadata: options.Properties,
+ nackRedeliveryDelay: defaultNackRedeliveryDelay,
+ replicateSubscriptionState: false,
+ decryption: options.Decryption,
+ schema: options.Schema,
+ backoffPolicy: options.BackoffPolicy,
+ maxPendingChunkedMessage: options.MaxPendingChunkedMessage,
+ expireTimeOfIncompleteChunk:
options.ExpireTimeOfIncompleteChunk,
+ autoAckIncompleteChunk: options.AutoAckIncompleteChunk,
}
reader := &reader{
diff --git a/pulsar/reader_test.go b/pulsar/reader_test.go
index 3543187..0a1b2a1 100644
--- a/pulsar/reader_test.go
+++ b/pulsar/reader_test.go
@@ -70,6 +70,45 @@ func TestReaderConfigSubscribeName(t *testing.T) {
assert.NotNil(t, consumer)
}
+func TestReaderConfigChunk(t *testing.T) {
+ client, err := NewClient(ClientOptions{
+ URL: lookupURL,
+ })
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer client.Close()
+
+ r1, err := client.CreateReader(ReaderOptions{
+ Topic: "my-topic1",
+ StartMessageID: EarliestMessageID(),
+ MaxPendingChunkedMessage: 50,
+ ExpireTimeOfIncompleteChunk: 30 * time.Second,
+ AutoAckIncompleteChunk: true,
+ })
+ assert.Nil(t, err)
+ defer r1.Close()
+
+ // verify specified chunk options
+ pcOpts := r1.(*reader).pc.options
+ assert.Equal(t, 50, pcOpts.maxPendingChunkedMessage)
+ assert.Equal(t, 30*time.Second, pcOpts.expireTimeOfIncompleteChunk)
+ assert.True(t, pcOpts.autoAckIncompleteChunk)
+
+ r2, err := client.CreateReader(ReaderOptions{
+ Topic: "my-topic2",
+ StartMessageID: EarliestMessageID(),
+ })
+ assert.Nil(t, err)
+ defer r2.Close()
+
+ // verify default chunk options
+ pcOpts = r2.(*reader).pc.options
+ assert.Equal(t, 100, pcOpts.maxPendingChunkedMessage)
+ assert.Equal(t, time.Minute, pcOpts.expireTimeOfIncompleteChunk)
+ assert.False(t, pcOpts.autoAckIncompleteChunk)
+}
+
func TestReader(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: lookupURL,