tisonkun commented on code in PR #1118:
URL: https://github.com/apache/pulsar-client-go/pull/1118#discussion_r1370162360
##########
pulsar/producer_partition.go:
##########
@@ -1145,57 +1165,35 @@ func (p *partitionProducer) internalSendAsync(
return
}
- // Register transaction operation to transaction and the transaction
coordinator.
- var newCallback func(MessageID, *ProducerMessage, error)
- var txn *transaction
- if msg.Transaction != nil {
- transactionImpl := (msg.Transaction).(*transaction)
- txn = transactionImpl
- if transactionImpl.state != TxnOpen {
- p.log.WithField("state",
transactionImpl.state).Error("Failed to send message" +
- " by a non-open transaction.")
- runCallback(callback, nil, msg, newError(InvalidStatus,
"Failed to send message by a non-open transaction."))
- return
- }
-
- if err := transactionImpl.registerProducerTopic(p.topic); err
!= nil {
- runCallback(callback, nil, msg, err)
- return
- }
- if err := transactionImpl.registerSendOrAckOp(); err != nil {
- runCallback(callback, nil, msg, err)
- return
- }
- newCallback = func(id MessageID, producerMessage
*ProducerMessage, err error) {
- runCallback(callback, id, producerMessage, err)
- transactionImpl.endSendOrAckOp(err)
- }
- } else {
- newCallback = callback
- }
if p.getProducerState() != producerReady {
// Producer is closing
- runCallback(newCallback, nil, msg, errProducerClosed)
+ runCallback(callback, nil, msg, errProducerClosed)
Review Comment:
The real engineering issue is that we don't fulfill `sr` at this point. We could
move the `sr` construction here and use `sr.done` consistently.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]