gunli opened a new issue, #1040:
URL: https://github.com/apache/pulsar-client-go/issues/1040
#### Expected behavior
Should we return when `transactionImpl.registerSendOrAckOp()` returns an
error?
#### Actual behavior
Keep going when `transactionImpl.registerSendOrAckOp()` returns an error,
is this a bug?
``` go
func (p *partitionProducer) internalSendAsync(ctx context.Context, msg
*ProducerMessage,
callback func(MessageID, *ProducerMessage, error), flushImmediately
bool) {
//Register transaction operation to transaction and the transaction
coordinator.
var newCallback func(MessageID, *ProducerMessage, error)
if msg.Transaction != nil {
transactionImpl := (msg.Transaction).(*transaction)
if transactionImpl.state != TxnOpen {
p.log.WithField("state",
transactionImpl.state).Error("Failed to send message" +
" by a non-open transaction.")
callback(nil, msg, newError(InvalidStatus, "Failed to
send message by a non-open transaction."))
return
}
if err := transactionImpl.registerProducerTopic(p.topic); err
!= nil {
callback(nil, msg, err)
return
}
if err := transactionImpl.registerSendOrAckOp(); err != nil {
callback(nil, msg, err)
//Should WE RETURN HERE ???
}
newCallback = func(id MessageID, producerMessage
*ProducerMessage, err error) {
callback(id, producerMessage, err)
transactionImpl.endSendOrAckOp(err)
}
} else {
newCallback = callback
}
if p.getProducerState() != producerReady {
// Producer is closing
newCallback(nil, msg, errProducerClosed)
return
}
// bc only works when DisableBlockIfQueueFull is false
bc := make(chan struct{})
// callbackOnce make sure the callback is only invoked once in chunking
callbackOnce := &sync.Once{}
var txn *transaction
if msg.Transaction != nil {
txn = (msg.Transaction).(*transaction)
}
sr := &sendRequest{
ctx: ctx,
msg: msg,
callback: newCallback,
callbackOnce: callbackOnce,
flushImmediately: flushImmediately,
publishTime: time.Now(),
blockCh: bc,
closeBlockChOnce: &sync.Once{},
transaction: txn,
}
p.options.Interceptors.BeforeSend(p, msg)
p.dataChan <- sr
if !p.options.DisableBlockIfQueueFull {
// block if queue full
<-bc
}
}
```
#### Steps to reproduce
How can we reproduce the issue
#### System configuration
**Pulsar version**: x.y
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]