gunli opened a new issue, #1042:
URL: https://github.com/apache/pulsar-client-go/issues/1042
#### Expected behavior
Consume the sendRequests before closing the producer, send and flush them,
invoke the callbacks of the input messages, so that the application can know
the producing is succeed or failed.
#### Actual behavior
Currently, when we close the producer, we forget to consume the sendRequests
in partitionProducer.dataChan before closing.
In the case that producing is faster than consuming in partitionProducer, we
have send a lot of sendRequests into partitionProducer.dataChan, when closing,
many of them are not consumed, these sendRequests will looks like get lost,
their callback won't get invoked, the application won't know the producing
result of these messages.
#### Steps to reproduce
Review the code of
``` go
func (p *partitionProducer) internalSendAsync(ctx context.Context, msg
*ProducerMessage,
callback func(MessageID, *ProducerMessage, error), flushImmediately
bool) {
//Register transaction operation to transaction and the transaction
coordinator.
var newCallback func(MessageID, *ProducerMessage, error)
if msg.Transaction != nil {
transactionImpl := (msg.Transaction).(*transaction)
if transactionImpl.state != TxnOpen {
p.log.WithField("state",
transactionImpl.state).Error("Failed to send message" +
" by a non-open transaction.")
callback(nil, msg, newError(InvalidStatus, "Failed to
send message by a non-open transaction."))
return
}
if err := transactionImpl.registerProducerTopic(p.topic); err
!= nil {
callback(nil, msg, err)
return
}
if err := transactionImpl.registerSendOrAckOp(); err != nil {
callback(nil, msg, err)
}
newCallback = func(id MessageID, producerMessage
*ProducerMessage, err error) {
callback(id, producerMessage, err)
transactionImpl.endSendOrAckOp(err)
}
} else {
newCallback = callback
}
if p.getProducerState() != producerReady {
// Producer is closing
newCallback(nil, msg, errProducerClosed)
return
}
// bc only works when DisableBlockIfQueueFull is false
bc := make(chan struct{})
// callbackOnce make sure the callback is only invoked once in chunking
callbackOnce := &sync.Once{}
var txn *transaction
if msg.Transaction != nil {
txn = (msg.Transaction).(*transaction)
}
sr := &sendRequest{
ctx: ctx,
msg: msg,
callback: newCallback,
callbackOnce: callbackOnce,
flushImmediately: flushImmediately,
publishTime: time.Now(),
blockCh: bc,
closeBlockChOnce: &sync.Once{},
transaction: txn,
}
p.options.Interceptors.BeforeSend(p, msg)
p.dataChan <- sr
if !p.options.DisableBlockIfQueueFull {
// block if queue full
<-bc
}
}
func (p *partitionProducer) internalClose(req *closeProducer) {
defer close(req.doneCh)
if !p.casProducerState(producerReady, producerClosing) {
return
}
p.log.Info("Closing producer")
id := p.client.rpcClient.NewRequestID()
_, err := p.client.rpcClient.RequestOnCnx(p._getConn(), id,
pb.BaseCommand_CLOSE_PRODUCER, &pb.CommandCloseProducer{
ProducerId: &p.producerID,
RequestId: &id,
})
if err != nil {
p.log.WithError(err).Warn("Failed to close producer")
} else {
p.log.Info("Closed producer")
}
if p.batchBuilder != nil {
if err = p.batchBuilder.Close(); err != nil {
p.log.WithError(err).Warn("Failed to close batch
builder")
}
}
p.setProducerState(producerClosed)
p._getConn().UnregisterListener(p.producerID)
p.batchFlushTicker.Stop()
}
```
#### System configuration
**Pulsar version**: x.y
@zengguan @merlimat @wolfstudy
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]