gunli commented on PR #1036:
URL: 
https://github.com/apache/pulsar-client-go/pull/1036#issuecomment-1612939452

   > 
   
   No, the callback here is just the signature, users can input a nil one.
   
   ``` go
   func (p *producer) SendAsync(ctx context.Context, msg *ProducerMessage,
        callback func(MessageID, *ProducerMessage, error)) {
        p.getPartition(msg).SendAsync(ctx, msg, callback)
   }
   
   func (p *partitionProducer) SendAsync(ctx context.Context, msg 
*ProducerMessage,
        callback func(MessageID, *ProducerMessage, error)) {
        p.internalSendAsync(ctx, msg, callback, false)
   }
   
   func (p *partitionProducer) internalSendAsync(ctx context.Context, msg 
*ProducerMessage,
        callback func(MessageID, *ProducerMessage, error), flushImmediately 
bool) {
        // Register transaction operation to transaction and the transaction 
coordinator.
        var newCallback func(MessageID, *ProducerMessage, error)
        var txn *transaction
        if msg.Transaction != nil {
                transactionImpl := (msg.Transaction).(*transaction)
                txn = transactionImpl
                if transactionImpl.state != TxnOpen {
                        p.log.WithField("state", 
transactionImpl.state).Error("Failed to send message" +
                                " by a non-open transaction.")
                        callback(nil, msg, newError(InvalidStatus, "Failed to 
send message by a non-open transaction."))
                        return
                }
   
                if err := transactionImpl.registerProducerTopic(p.topic); err 
!= nil {
                        callback(nil, msg, err)
                        return
                }
                if err := transactionImpl.registerSendOrAckOp(); err != nil {
                        callback(nil, msg, err)
                }
                newCallback = func(id MessageID, producerMessage 
*ProducerMessage, err error) {
                        callback(id, producerMessage, err)
                        transactionImpl.endSendOrAckOp(err)
                }
        } else {
                newCallback = callback
        }
        if p.getProducerState() != producerReady {
                // Producer is closing
                newCallback(nil, msg, errProducerClosed)
                return
        }
   
        // bc only works when DisableBlockIfQueueFull is false
        bc := make(chan struct{})
   
        // callbackOnce make sure the callback is only invoked once in chunking
        callbackOnce := &sync.Once{}
        sr := &sendRequest{
                ctx:              ctx,
                msg:              msg,
                callback:         newCallback,
                callbackOnce:     callbackOnce,
                flushImmediately: flushImmediately,
                publishTime:      time.Now(),
                blockCh:          bc,
                closeBlockChOnce: &sync.Once{},
                transaction:      txn,
        }
        p.options.Interceptors.BeforeSend(p, msg)
   
        p.dataChan <- sr
   
        if !p.options.DisableBlockIfQueueFull {
                // block if queue full
                <-bc
        }
   }
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to