This is an automated email from the ASF dual-hosted git repository.
rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new 5108332 Add error response for Ack func (#775)
5108332 is described below
commit 5108332c9dd4cb454c26804304bffb82eeffc713
Author: xiaolong ran <[email protected]>
AuthorDate: Mon May 23 11:15:27 2022 +0800
Add error response for Ack func (#775)
* Add error response for Ack func
Signed-off-by: xiaolongran <[email protected]>
* When the connection is closed, we need to reconnect using a new connection (cnx)
Signed-off-by: xiaolongran <[email protected]>
* fix comments
Signed-off-by: xiaolongran <[email protected]>
---
pulsar/consumer.go | 4 ++--
pulsar/consumer_impl.go | 20 ++++++++++----------
pulsar/consumer_multitopic.go | 17 +++++++++--------
pulsar/consumer_partition.go | 17 +++++++++++------
pulsar/consumer_regex.go | 17 +++++++++--------
pulsar/impl_message.go | 9 ++++++---
pulsar/internal/connection_pool.go | 4 +++-
.../pulsartracing/consumer_interceptor_test.go | 8 ++++++--
8 files changed, 56 insertions(+), 40 deletions(-)
diff --git a/pulsar/consumer.go b/pulsar/consumer.go
index c67509b..dfe27c5 100644
--- a/pulsar/consumer.go
+++ b/pulsar/consumer.go
@@ -200,10 +200,10 @@ type Consumer interface {
Chan() <-chan ConsumerMessage
// Ack the consumption of a single message
- Ack(Message)
+ Ack(Message) error
// AckID the consumption of a single message, identified by its MessageID
- AckID(MessageID)
+ AckID(MessageID) error
// ReconsumeLater mark a message for redelivery after custom delay
ReconsumeLater(msg Message, delay time.Duration)
diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go
index 2bd3ed5..e36f040 100644
--- a/pulsar/consumer_impl.go
+++ b/pulsar/consumer_impl.go
@@ -19,6 +19,7 @@ package pulsar
import (
"context"
+ "errors"
"fmt"
"math/rand"
"strconv"
@@ -34,7 +35,7 @@ import (
const defaultNackRedeliveryDelay = 1 * time.Minute
type acker interface {
- AckID(id trackingMessageID)
+ AckID(id trackingMessageID) error
NackID(id trackingMessageID)
NackMsg(msg Message)
}
@@ -438,29 +439,28 @@ func (c *consumer) Receive(ctx context.Context) (message Message, err error) {
}
}
-// Messages
+// Chan return the message chan to users
func (c *consumer) Chan() <-chan ConsumerMessage {
return c.messageCh
}
// Ack the consumption of a single message
-func (c *consumer) Ack(msg Message) {
- c.AckID(msg.ID())
+func (c *consumer) Ack(msg Message) error {
+ return c.AckID(msg.ID())
}
-// Ack the consumption of a single message, identified by its MessageID
-func (c *consumer) AckID(msgID MessageID) {
+// AckID the consumption of a single message, identified by its MessageID
+func (c *consumer) AckID(msgID MessageID) error {
mid, ok := c.messageID(msgID)
if !ok {
- return
+ return errors.New("failed to convert trackingMessageID")
}
if mid.consumer != nil {
- mid.Ack()
- return
+ return mid.Ack()
}
- c.consumers[mid.partitionIdx].AckID(mid)
+ return c.consumers[mid.partitionIdx].AckID(mid)
}
// ReconsumeLater mark a message for redelivery after custom delay
diff --git a/pulsar/consumer_multitopic.go b/pulsar/consumer_multitopic.go
index c1cb3d8..1d75a24 100644
--- a/pulsar/consumer_multitopic.go
+++ b/pulsar/consumer_multitopic.go
@@ -19,6 +19,7 @@ package pulsar
import (
"context"
+ "errors"
"fmt"
"sync"
"time"
@@ -112,30 +113,30 @@ func (c *multiTopicConsumer) Receive(ctx context.Context) (message Message, err
}
}
-// Messages
+// Chan return the message chan to users
func (c *multiTopicConsumer) Chan() <-chan ConsumerMessage {
return c.messageCh
}
// Ack the consumption of a single message
-func (c *multiTopicConsumer) Ack(msg Message) {
- c.AckID(msg.ID())
+func (c *multiTopicConsumer) Ack(msg Message) error {
+ return c.AckID(msg.ID())
}
-// Ack the consumption of a single message, identified by its MessageID
-func (c *multiTopicConsumer) AckID(msgID MessageID) {
+// AckID the consumption of a single message, identified by its MessageID
+func (c *multiTopicConsumer) AckID(msgID MessageID) error {
mid, ok := toTrackingMessageID(msgID)
if !ok {
c.log.Warnf("invalid message id type %T", msgID)
- return
+ return errors.New("invalid message id type in multi_consumer")
}
if mid.consumer == nil {
c.log.Warnf("unable to ack messageID=%+v can not determine
topic", msgID)
- return
+ return errors.New("unable to ack message because consumer is
nil")
}
- mid.Ack()
+ return mid.Ack()
}
func (c *multiTopicConsumer) ReconsumeLater(msg Message, delay time.Duration) {
diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index db2994f..30ffcb2 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -317,21 +317,24 @@ func (pc *partitionConsumer) requestGetLastMessageID() (trackingMessageID, error
return convertToMessageID(id), nil
}
-func (pc *partitionConsumer) AckID(msgID trackingMessageID) {
+func (pc *partitionConsumer) AckID(msgID trackingMessageID) error {
if state := pc.getConsumerState(); state == consumerClosed || state ==
consumerClosing {
pc.log.WithField("state", state).Error("Failed to ack by
closing or closed consumer")
- return
+ return errors.New("consumer state is closed")
}
+
+ ackReq := new(ackRequest)
if !msgID.Undefined() && msgID.ack() {
pc.metrics.AcksCounter.Inc()
pc.metrics.ProcessingTime.Observe(float64(time.Now().UnixNano()-msgID.receivedTime.UnixNano())
/ 1.0e9)
- req := &ackRequest{
- msgID: msgID,
- }
- pc.eventsCh <- req
+ ackReq.msgID = msgID
+ // send ack request to eventsCh
+ pc.eventsCh <- ackReq
pc.options.interceptors.OnAcknowledge(pc.parentConsumer, msgID)
}
+
+ return ackReq.err
}
func (pc *partitionConsumer) NackID(msgID trackingMessageID) {
@@ -531,6 +534,7 @@ func (pc *partitionConsumer) internalAck(req *ackRequest) {
err := pc.client.rpcClient.RequestOnCnxNoWait(pc._getConn(),
pb.BaseCommand_ACK, cmdAck)
if err != nil {
pc.log.Error("Connection was closed when request ack cmd")
+ req.err = err
}
}
@@ -919,6 +923,7 @@ func (pc *partitionConsumer) dispatcher() {
type ackRequest struct {
msgID trackingMessageID
+ err error
}
type unsubscribeRequest struct {
diff --git a/pulsar/consumer_regex.go b/pulsar/consumer_regex.go
index ed2ae1a..e4d2077 100644
--- a/pulsar/consumer_regex.go
+++ b/pulsar/consumer_regex.go
@@ -19,6 +19,7 @@ package pulsar
import (
"context"
+ "errors"
"fmt"
"regexp"
"strings"
@@ -152,34 +153,34 @@ func (c *regexConsumer) Receive(ctx context.Context) (message Message, err error
}
}
-// Chan
+// Chan return the messages chan to user
func (c *regexConsumer) Chan() <-chan ConsumerMessage {
return c.messageCh
}
// Ack the consumption of a single message
-func (c *regexConsumer) Ack(msg Message) {
- c.AckID(msg.ID())
+func (c *regexConsumer) Ack(msg Message) error {
+ return c.AckID(msg.ID())
}
func (c *regexConsumer) ReconsumeLater(msg Message, delay time.Duration) {
c.log.Warnf("regexp consumer not support ReconsumeLater yet.")
}
-// Ack the consumption of a single message, identified by its MessageID
-func (c *regexConsumer) AckID(msgID MessageID) {
+// AckID the consumption of a single message, identified by its MessageID
+func (c *regexConsumer) AckID(msgID MessageID) error {
mid, ok := toTrackingMessageID(msgID)
if !ok {
c.log.Warnf("invalid message id type %T", msgID)
- return
+ return errors.New("invalid message id type")
}
if mid.consumer == nil {
c.log.Warnf("unable to ack messageID=%+v can not determine
topic", msgID)
- return
+ return errors.New("consumer is nil in consumer_regex")
}
- mid.Ack()
+ return mid.Ack()
}
func (c *regexConsumer) Nack(msg Message) {
diff --git a/pulsar/impl_message.go b/pulsar/impl_message.go
index 3216676..8248b1a 100644
--- a/pulsar/impl_message.go
+++ b/pulsar/impl_message.go
@@ -18,6 +18,7 @@
package pulsar
import (
+ "errors"
"fmt"
"math"
"math/big"
@@ -63,13 +64,15 @@ func (id trackingMessageID) Undefined() bool {
return id == trackingMessageID{}
}
-func (id trackingMessageID) Ack() {
+func (id trackingMessageID) Ack() error {
if id.consumer == nil {
- return
+ return errors.New("consumer is nil in trackingMessageID")
}
if id.ack() {
- id.consumer.AckID(id)
+ return id.consumer.AckID(id)
}
+
+ return nil
}
func (id trackingMessageID) Nack() {
diff --git a/pulsar/internal/connection_pool.go b/pulsar/internal/connection_pool.go
index db67c25..6491abd 100644
--- a/pulsar/internal/connection_pool.go
+++ b/pulsar/internal/connection_pool.go
@@ -78,7 +78,9 @@ func (p *connectionPool) GetConnection(logicalAddr *url.URL, physicalAddr *url.U
p.log.Debugf("Found connection in pool key=%s logical_addr=%+v
physical_addr=%+v",
key, conn.logicalAddr, conn.physicalAddr)
- // remove stale/failed connection
+ // When the current connection is in a closed state or the broker actively notifies that the
+ // current connection is closed, we need to remove the connection object from the current
+ // connection pool and create a new connection.
if conn.closed() {
p.log.Infof("Removed connection from pool key=%s
logical_addr=%+v physical_addr=%+v",
key, conn.logicalAddr, conn.physicalAddr)
diff --git a/pulsar/internal/pulsartracing/consumer_interceptor_test.go b/pulsar/internal/pulsartracing/consumer_interceptor_test.go
index b15a926..9e70d8b 100644
--- a/pulsar/internal/pulsartracing/consumer_interceptor_test.go
+++ b/pulsar/internal/pulsartracing/consumer_interceptor_test.go
@@ -64,9 +64,13 @@ func (c *mockConsumer) Chan() <-chan pulsar.ConsumerMessage {
return nil
}
-func (c *mockConsumer) Ack(msg pulsar.Message) {}
+func (c *mockConsumer) Ack(msg pulsar.Message) error {
+ return nil
+}
-func (c *mockConsumer) AckID(msgID pulsar.MessageID) {}
+func (c *mockConsumer) AckID(msgID pulsar.MessageID) error {
+ return nil
+}
func (c *mockConsumer) ReconsumeLater(msg pulsar.Message, delay time.Duration)
{}