This is an automated email from the ASF dual-hosted git repository. jinrongtong pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
The following commit(s) were added to refs/heads/master by this push:
     new dd8e60fc golang: some optimizations regarding transaction messages and pushconsumer (#1021)
dd8e60fc is described below

commit dd8e60fc2a83104a1c2ce158f2034d5e827963ae
Author: guyinyou <36399867+guyin...@users.noreply.github.com>
AuthorDate: Mon Jun 23 14:33:48 2025 +0800

    golang: some optimizations regarding transaction messages and pushconsumer (#1021)

    Co-authored-by: guyinyou <guyinyou....@alibaba-inc.com>
---
 golang/consumer_service.go | 6 ++++++
 golang/producer.go         | 6 ------
 golang/push_consumer.go    | 1 +
 3 files changed, 7 insertions(+), 6 deletions(-)

diff --git a/golang/consumer_service.go b/golang/consumer_service.go
index 846bd04c..4bb42959 100644
--- a/golang/consumer_service.go
+++ b/golang/consumer_service.go
@@ -25,6 +25,7 @@ import (
 type ConsumeService interface {
 	consume(ProcessQueue, []*MessageView)
 	consumeWithDuration(*MessageView, time.Duration, func(ConsumerResult, error))
+	Shutdown() error
 }
 
 type baseConsumeService struct {
@@ -55,6 +56,11 @@ func (bcs *baseConsumeService) consumeWithDuration(messageView *MessageView, dur
 	time.AfterFunc(duration, func() { bcs.consumptionExecutor.Submit(task) })
 }
 
+func (bcs *baseConsumeService) Shutdown() error {
+	bcs.consumptionExecutor.Shutdown()
+	return nil
+}
+
 func (bcs *baseConsumeService) newConsumeTask(clientId string, messageListener MessageListener, messageView *MessageView, messageInterceptor MessageInterceptor, callback func(ConsumerResult, error)) func() {
 	return func() {
 		consumeResult := FAILURE
diff --git a/golang/producer.go b/golang/producer.go
index 58d4a226..b30dc5b7 100644
--- a/golang/producer.go
+++ b/golang/producer.go
@@ -235,12 +235,6 @@ func (p *defaultProducer) send1(ctx context.Context, topic string, messageType v
 				topic, messageIds, maxAttempts, attempt, endpoints, utils.GetRequestID(ctx))
 			return nil, err
 		}
-		// No need more attempts for transactional message.
-		if messageType == v2.MessageType_TRANSACTION {
-			p.cli.log.Errorf("failed to send transactional message finally, topic=%s, messageId(s)=%v, maxAttempts=%d, attempt=%d, endpoints=%s, requestId=%s",
-				topic, messageIds, maxAttempts, attempt, endpoints, utils.GetRequestID(ctx))
-			return nil, err
-		}
 		// Try to do more attempts.
 		nextAttempt := attempt + 1
 		// Retry immediately if the request is not throttled.
diff --git a/golang/push_consumer.go b/golang/push_consumer.go
index 75a9c26e..f6a446a7 100644
--- a/golang/push_consumer.go
+++ b/golang/push_consumer.go
@@ -499,6 +499,7 @@ func (pc *defaultPushConsumer) dropProcessQueue(mqstr utils.MessageQueueStr) {
 }
 
 func (pc *defaultPushConsumer) GracefulStop() error {
+	pc.consumerService.Shutdown()
 	return pc.cli.GracefulStop()
 }
 