This is an automated email from the ASF dual-hosted git repository.
tison pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new f9969ca8 feat: add sendRequest.done() to release resource together (#1110)
f9969ca8 is described below
commit f9969ca84e927786ef208abb73de065eb6bfacae
Author: gunli <[email protected]>
AuthorDate: Tue Oct 24 16:42:31 2023 +0800
feat: add sendRequest.done() to release resource together (#1110)
Co-authored-by: gunli <[email protected]>
---
pulsar/producer_partition.go | 93 +++++++++++++++++++++++++++++++++++++-------
1 file changed, 79 insertions(+), 14 deletions(-)
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index 82123f98..2beb3614 100755
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -1419,21 +1419,37 @@ func (p *partitionProducer) Close() {
<-cp.doneCh
}
+//nolint:all
type sendRequest struct {
- ctx context.Context
- msg *ProducerMessage
- callback func(MessageID, *ProducerMessage, error)
- callbackOnce *sync.Once
- publishTime time.Time
- flushImmediately bool
- blockCh chan struct{}
- closeBlockChOnce *sync.Once
- totalChunks int
- chunkID int
- uuid string
- chunkRecorder *chunkRecorder
- transaction *transaction
- reservedMem int64
+ ctx context.Context
+ msg *ProducerMessage
+ callback func(MessageID, *ProducerMessage, error)
+ callbackOnce *sync.Once
+ publishTime time.Time
+ flushImmediately bool
+ blockCh chan struct{}
+ closeBlockChOnce *sync.Once
+ totalChunks int
+ chunkID int
+ uuid string
+ chunkRecorder *chunkRecorder
+ transaction *transaction
+ reservedMem int64
+ producer *partitionProducer
+ memLimit internal.MemoryLimitController
+ semaphore internal.Semaphore
+ reservedSemaphore int
+ sendAsBatch bool
+ schema Schema
+ schemaVersion []byte
+ uncompressedPayload []byte
+ uncompressedSize int64
+ compressedPayload []byte
+ compressedSize int
+ payloadChunkSize int
+ mm *pb.MessageMetadata
+ deliverAt time.Time
+ maxMessageSize int32
}
// stopBlock can be invoked multiple times safety
@@ -1443,6 +1459,55 @@ func (sr *sendRequest) stopBlock() {
})
}
+//nolint:all
+func (sr *sendRequest) done(msgID MessageID, err error) {
+ if err == nil {
+ sr.producer.metrics.PublishLatency.Observe(float64(time.Now().UnixNano()-sr.publishTime.UnixNano()) / 1.0e9)
+ sr.producer.metrics.MessagesPublished.Inc()
+ sr.producer.metrics.BytesPublished.Add(float64(sr.reservedMem))
+ }
+
+ if err != nil {
+ sr.producer.log.WithError(err).
+ WithField("size", sr.reservedMem).
+ WithField("properties", sr.msg.Properties)
+ }
+
+ if err == errSendTimeout {
+ sr.producer.metrics.PublishErrorsTimeout.Inc()
+ }
+
+ if err == errMessageTooLarge {
+ sr.producer.metrics.PublishErrorsMsgTooLarge.Inc()
+ }
+
+ if sr.semaphore != nil {
+ for i := 0; i < sr.reservedSemaphore; i++ {
+ sr.semaphore.Release()
+ }
+ sr.producer.metrics.MessagesPending.Sub(float64(sr.reservedSemaphore))
+ }
+
+ if sr.memLimit != nil {
+ sr.memLimit.ReleaseMemory(sr.reservedMem)
+ sr.producer.metrics.BytesPending.Sub(float64(sr.reservedMem))
+ }
+
+ if sr.totalChunks <= 1 || sr.chunkID == sr.totalChunks-1 {
+ sr.callbackOnce.Do(func() {
+ runCallback(sr.callback, msgID, sr.msg, err)
+ })
+
+ if sr.transaction != nil {
+ sr.transaction.endSendOrAckOp(err)
+ }
+
+ if sr.producer.options.Interceptors != nil && err == nil {
+ sr.producer.options.Interceptors.OnSendAcknowledgement(sr.producer, sr.msg, msgID)
+ }
+ }
+}
+
type closeProducer struct {
doneCh chan struct{}
}