This is an automated email from the ASF dual-hosted git repository.

tison pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new e61e9666 Revert "feat: add sendRequest.done() to release resource together (#1110)"
e61e9666 is described below

commit e61e96662cfc850bdda48a7ff70541c81ddf0cde
Author: tison <[email protected]>
AuthorDate: Tue Oct 24 19:29:32 2023 +0800

    Revert "feat: add sendRequest.done() to release resource together (#1110)"
    
    This needs a rework.
    
    This reverts commit f9969ca84e927786ef208abb73de065eb6bfacae.
---
 pulsar/producer_partition.go | 93 +++++++-------------------------------------
 1 file changed, 14 insertions(+), 79 deletions(-)

diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index 2beb3614..82123f98 100755
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -1419,37 +1419,21 @@ func (p *partitionProducer) Close() {
        <-cp.doneCh
 }
 
-//nolint:all
 type sendRequest struct {
-       ctx                 context.Context
-       msg                 *ProducerMessage
-       callback            func(MessageID, *ProducerMessage, error)
-       callbackOnce        *sync.Once
-       publishTime         time.Time
-       flushImmediately    bool
-       blockCh             chan struct{}
-       closeBlockChOnce    *sync.Once
-       totalChunks         int
-       chunkID             int
-       uuid                string
-       chunkRecorder       *chunkRecorder
-       transaction         *transaction
-       reservedMem         int64
-       producer            *partitionProducer
-       memLimit            internal.MemoryLimitController
-       semaphore           internal.Semaphore
-       reservedSemaphore   int
-       sendAsBatch         bool
-       schema              Schema
-       schemaVersion       []byte
-       uncompressedPayload []byte
-       uncompressedSize    int64
-       compressedPayload   []byte
-       compressedSize      int
-       payloadChunkSize    int
-       mm                  *pb.MessageMetadata
-       deliverAt           time.Time
-       maxMessageSize      int32
+       ctx              context.Context
+       msg              *ProducerMessage
+       callback         func(MessageID, *ProducerMessage, error)
+       callbackOnce     *sync.Once
+       publishTime      time.Time
+       flushImmediately bool
+       blockCh          chan struct{}
+       closeBlockChOnce *sync.Once
+       totalChunks      int
+       chunkID          int
+       uuid             string
+       chunkRecorder    *chunkRecorder
+       transaction      *transaction
+       reservedMem      int64
 }
 
 // stopBlock can be invoked multiple times safety
@@ -1459,55 +1443,6 @@ func (sr *sendRequest) stopBlock() {
        })
 }
 
-//nolint:all
-func (sr *sendRequest) done(msgID MessageID, err error) {
-       if err == nil {
-               sr.producer.metrics.PublishLatency.Observe(float64(time.Now().UnixNano()-sr.publishTime.UnixNano()) / 1.0e9)
-               sr.producer.metrics.MessagesPublished.Inc()
-               sr.producer.metrics.BytesPublished.Add(float64(sr.reservedMem))
-       }
-
-       if err != nil {
-               sr.producer.log.WithError(err).
-                       WithField("size", sr.reservedMem).
-                       WithField("properties", sr.msg.Properties)
-       }
-
-       if err == errSendTimeout {
-               sr.producer.metrics.PublishErrorsTimeout.Inc()
-       }
-
-       if err == errMessageTooLarge {
-               sr.producer.metrics.PublishErrorsMsgTooLarge.Inc()
-       }
-
-       if sr.semaphore != nil {
-               for i := 0; i < sr.reservedSemaphore; i++ {
-                       sr.semaphore.Release()
-               }
-               sr.producer.metrics.MessagesPending.Sub(float64(sr.reservedSemaphore))
-       }
-
-       if sr.memLimit != nil {
-               sr.memLimit.ReleaseMemory(sr.reservedMem)
-               sr.producer.metrics.BytesPending.Sub(float64(sr.reservedMem))
-       }
-
-       if sr.totalChunks <= 1 || sr.chunkID == sr.totalChunks-1 {
-               sr.callbackOnce.Do(func() {
-                       runCallback(sr.callback, msgID, sr.msg, err)
-               })
-
-               if sr.transaction != nil {
-                       sr.transaction.endSendOrAckOp(err)
-               }
-
-               if sr.producer.options.Interceptors != nil && err == nil {
-                       sr.producer.options.Interceptors.OnSendAcknowledgement(sr.producer, sr.msg, msgID)
-               }
-       }
-}
-
 type closeProducer struct {
        doneCh chan struct{}
 }

Reply via email to