This is an automated email from the ASF dual-hosted git repository.
tison pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new e61e9666 Revert "feat: add sendRequest.done() to release resource together (#1110)"
e61e9666 is described below
commit e61e96662cfc850bdda48a7ff70541c81ddf0cde
Author: tison <[email protected]>
AuthorDate: Tue Oct 24 19:29:32 2023 +0800
Revert "feat: add sendRequest.done() to release resource together (#1110)"
This needs a rework.
This reverts commit f9969ca84e927786ef208abb73de065eb6bfacae.
---
pulsar/producer_partition.go | 93 +++++++-------------------------------------
1 file changed, 14 insertions(+), 79 deletions(-)
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index 2beb3614..82123f98 100755
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -1419,37 +1419,21 @@ func (p *partitionProducer) Close() {
<-cp.doneCh
}
-//nolint:all
type sendRequest struct {
- ctx context.Context
- msg *ProducerMessage
- callback func(MessageID, *ProducerMessage, error)
- callbackOnce *sync.Once
- publishTime time.Time
- flushImmediately bool
- blockCh chan struct{}
- closeBlockChOnce *sync.Once
- totalChunks int
- chunkID int
- uuid string
- chunkRecorder *chunkRecorder
- transaction *transaction
- reservedMem int64
- producer *partitionProducer
- memLimit internal.MemoryLimitController
- semaphore internal.Semaphore
- reservedSemaphore int
- sendAsBatch bool
- schema Schema
- schemaVersion []byte
- uncompressedPayload []byte
- uncompressedSize int64
- compressedPayload []byte
- compressedSize int
- payloadChunkSize int
- mm *pb.MessageMetadata
- deliverAt time.Time
- maxMessageSize int32
+ ctx context.Context
+ msg *ProducerMessage
+ callback func(MessageID, *ProducerMessage, error)
+ callbackOnce *sync.Once
+ publishTime time.Time
+ flushImmediately bool
+ blockCh chan struct{}
+ closeBlockChOnce *sync.Once
+ totalChunks int
+ chunkID int
+ uuid string
+ chunkRecorder *chunkRecorder
+ transaction *transaction
+ reservedMem int64
}
// stopBlock can be invoked multiple times safety
@@ -1459,55 +1443,6 @@ func (sr *sendRequest) stopBlock() {
})
}
-//nolint:all
-func (sr *sendRequest) done(msgID MessageID, err error) {
- if err == nil {
- sr.producer.metrics.PublishLatency.Observe(float64(time.Now().UnixNano()-sr.publishTime.UnixNano()) / 1.0e9)
- sr.producer.metrics.MessagesPublished.Inc()
- sr.producer.metrics.BytesPublished.Add(float64(sr.reservedMem))
- }
-
- if err != nil {
- sr.producer.log.WithError(err).
- WithField("size", sr.reservedMem).
- WithField("properties", sr.msg.Properties)
- }
-
- if err == errSendTimeout {
- sr.producer.metrics.PublishErrorsTimeout.Inc()
- }
-
- if err == errMessageTooLarge {
- sr.producer.metrics.PublishErrorsMsgTooLarge.Inc()
- }
-
- if sr.semaphore != nil {
- for i := 0; i < sr.reservedSemaphore; i++ {
- sr.semaphore.Release()
- }
- sr.producer.metrics.MessagesPending.Sub(float64(sr.reservedSemaphore))
- }
-
- if sr.memLimit != nil {
- sr.memLimit.ReleaseMemory(sr.reservedMem)
- sr.producer.metrics.BytesPending.Sub(float64(sr.reservedMem))
- }
-
- if sr.totalChunks <= 1 || sr.chunkID == sr.totalChunks-1 {
- sr.callbackOnce.Do(func() {
- runCallback(sr.callback, msgID, sr.msg, err)
- })
-
- if sr.transaction != nil {
- sr.transaction.endSendOrAckOp(err)
- }
-
- if sr.producer.options.Interceptors != nil && err == nil {
- sr.producer.options.Interceptors.OnSendAcknowledgement(sr.producer, sr.msg, msgID)
- }
- }
-}
-
type closeProducer struct {
doneCh chan struct{}
}