This is an automated email from the ASF dual-hosted git repository.
zike pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new 8dd4ed19 [fix] Avoid a data race when flushing with load (#1261)
8dd4ed19 is described below
commit 8dd4ed19b98ecf9287cbcfc20e2de9a12b197e3f
Author: Gaylor Bosson <[email protected]>
AuthorDate: Tue Jul 30 14:25:42 2024 +0200
[fix] Avoid a data race when flushing with load (#1261)
Fixes #1258
### Motivation
While flushing, the data channel is swapped for a newly allocated one, which
can cause messages to be lost: the drain loop stops as soon as the channel's
length is zero, while at the same time a new message can still be sent to
that channel.
### Modifications
Instead of allocating a new channel, it drains the existing one, receiving at
most as many messages as were queued in the channel when the flush started,
before proceeding with the flush.
---
pulsar/producer_partition.go | 33 ++++++++++++++++++++-------------
1 file changed, 20 insertions(+), 13 deletions(-)
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index 5c038aa5..f5fd493b 100755
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -338,10 +338,6 @@ func (p *partitionProducer) grabCnx(assignedBrokerURL
string) error {
p.schemaCache.Put(p.schemaInfo, schemaVersion)
}
- if err != nil {
- return err
- }
-
if !p.options.DisableBatching && p.batchBuilder == nil {
provider, err :=
GetBatcherBuilderProvider(p.options.BatcherBuilderType)
if err != nil {
@@ -1022,15 +1018,7 @@ func (p *partitionProducer)
internalFlushCurrentBatches() {
}
func (p *partitionProducer) internalFlush(fr *flushRequest) {
- // clear all the messages which have sent to dataChan before flush
- if len(p.dataChan) != 0 {
- oldDataChan := p.dataChan
- p.dataChan = make(chan *sendRequest,
p.options.MaxPendingMessages)
- for len(oldDataChan) != 0 {
- pendingData := <-oldDataChan
- p.internalSend(pendingData)
- }
- }
+ p.clearPendingSendRequests()
if !p.options.DisableBatching {
p.internalFlushCurrentBatch()
@@ -1061,6 +1049,25 @@ func (p *partitionProducer) internalFlush(fr
*flushRequest) {
}
}
+// clearPendingSendRequests makes sure to push forward previous sending
requests
+// by emptying the data channel.
+func (p *partitionProducer) clearPendingSendRequests() {
+ sizeBeforeFlushing := len(p.dataChan)
+
+ // Bound the for loop to the current length of the channel to ensure
that it
+ // will eventually stop as we only want to ensure that existing
messages are
+ // flushed.
+ for i := 0; i < sizeBeforeFlushing; i++ {
+ select {
+ case pendingData := <-p.dataChan:
+ p.internalSend(pendingData)
+
+ default:
+ return
+ }
+ }
+}
+
func (p *partitionProducer) Send(ctx context.Context, msg *ProducerMessage)
(MessageID, error) {
var err error
var msgID MessageID