This is an automated email from the ASF dual-hosted git repository.

baodi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 5f8df27  [Refactor] refactor duplicated code lines and fix typo errors 
(#1039)
5f8df27 is described below

commit 5f8df2782251226b63b2dad9f4e87fd4bbd31ef5
Author: gunli <[email protected]>
AuthorDate: Mon Jul 3 09:57:14 2023 +0800

    [Refactor] refactor duplicated code lines and fix typo errors (#1039)
    
    * [Refactor] refactor duplicated code lines and fix typo errors
    
    * [typo] revert
    
    * [typo] revert
    
    * [typo] revert
    
    * [refactor] delete redunpdant code lines
    
    * [typo] revert some words
    
    * [fix] set useTxn when use transaction
    
    ---------
    
    Co-authored-by: gunli <[email protected]>
---
 pulsar/producer_partition.go | 94 +++++++++++++++++++++-----------------------
 1 file changed, 45 insertions(+), 49 deletions(-)

diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
old mode 100644
new mode 100755
index 98c6c98..9d04427
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -710,17 +710,19 @@ func addRequestToBatch(smm *pb.SingleMessageMetadata, p 
*partitionProducer,
        uncompressedPayload []byte,
        request *sendRequest, msg *ProducerMessage, deliverAt time.Time,
        schemaVersion []byte, multiSchemaEnabled bool) bool {
-       var ok bool
+       var useTxn bool
+       var mostSigBits uint64
+       var leastSigBits uint64
        if request.transaction != nil {
                txnID := request.transaction.GetTxnID()
-               ok = p.batchBuilder.Add(smm, p.sequenceIDGenerator, 
uncompressedPayload, request,
-                       msg.ReplicationClusters, deliverAt, schemaVersion, 
multiSchemaEnabled, true, txnID.MostSigBits,
-                       txnID.LeastSigBits)
-       } else {
-               ok = p.batchBuilder.Add(smm, p.sequenceIDGenerator, 
uncompressedPayload, request,
-                       msg.ReplicationClusters, deliverAt, schemaVersion, 
multiSchemaEnabled, false, 0, 0)
+               useTxn = true
+               mostSigBits = txnID.MostSigBits
+               leastSigBits = txnID.LeastSigBits
        }
-       return ok
+
+       return p.batchBuilder.Add(smm, p.sequenceIDGenerator, 
uncompressedPayload, request,
+               msg.ReplicationClusters, deliverAt, schemaVersion, 
multiSchemaEnabled, useTxn, mostSigBits,
+               leastSigBits)
 }
 
 func (p *partitionProducer) genMetadata(msg *ProducerMessage,
@@ -764,6 +766,14 @@ func (p *partitionProducer) updateMetadataSeqID(mm 
*pb.MessageMetadata, msg *Pro
        }
 }
 
+func (p *partitionProducer) updateSingleMessageMetadataSeqID(smm 
*pb.SingleMessageMetadata, msg *ProducerMessage) {
+       if msg.SequenceID != nil {
+               smm.SequenceId = proto.Uint64(uint64(*msg.SequenceID))
+       } else {
+               smm.SequenceId = 
proto.Uint64(internal.GetAndAdd(p.sequenceIDGenerator, 1))
+       }
+}
+
 func (p *partitionProducer) genSingleMessageMetadataInBatch(msg 
*ProducerMessage,
        uncompressedSize int) (smm *pb.SingleMessageMetadata) {
        smm = &pb.SingleMessageMetadata{
@@ -786,14 +796,7 @@ func (p *partitionProducer) 
genSingleMessageMetadataInBatch(msg *ProducerMessage
                smm.Properties = internal.ConvertFromStringMap(msg.Properties)
        }
 
-       var sequenceID uint64
-       if msg.SequenceID != nil {
-               sequenceID = uint64(*msg.SequenceID)
-       } else {
-               sequenceID = internal.GetAndAdd(p.sequenceIDGenerator, 1)
-       }
-
-       smm.SequenceId = proto.Uint64(sequenceID)
+       p.updateSingleMessageMetadataSeqID(smm, msg)
 
        return
 }
@@ -813,35 +816,30 @@ func (p *partitionProducer) internalSingleSend(mm 
*pb.MessageMetadata,
        }
 
        sid := *mm.SequenceId
-       var err error
+       var useTxn bool
+       var mostSigBits uint64
+       var leastSigBits uint64
+
        if request.transaction != nil {
                txnID := request.transaction.GetTxnID()
-               err = internal.SingleSend(
-                       buffer,
-                       p.producerID,
-                       sid,
-                       mm,
-                       payloadBuf,
-                       p.encryptor,
-                       maxMessageSize,
-                       true,
-                       txnID.MostSigBits,
-                       txnID.LeastSigBits,
-               )
-       } else {
-               err = internal.SingleSend(
-                       buffer,
-                       p.producerID,
-                       sid,
-                       mm,
-                       payloadBuf,
-                       p.encryptor,
-                       maxMessageSize,
-                       false,
-                       0,
-                       0,
-               )
-       }
+               useTxn = true
+               mostSigBits = txnID.MostSigBits
+               leastSigBits = txnID.LeastSigBits
+       }
+
+       err := internal.SingleSend(
+               buffer,
+               p.producerID,
+               sid,
+               mm,
+               payloadBuf,
+               p.encryptor,
+               maxMessageSize,
+               useTxn,
+               mostSigBits,
+               leastSigBits,
+       )
+
        if err != nil {
                runCallback(request.callback, nil, request.msg, err)
                p.releaseSemaphoreAndMem(int64(len(msg.Payload)))
@@ -1001,7 +999,7 @@ func (p *partitionProducer) failTimeoutMessages() {
                                }
                        }
 
-                       // flag the send has completed with error, flush make 
no effect
+                       // flag the sending has completed with error, flush 
make no effect
                        pi.Complete()
                        pi.Unlock()
 
@@ -1116,8 +1114,10 @@ func (p *partitionProducer) internalSendAsync(ctx 
context.Context, msg *Producer
        callback func(MessageID, *ProducerMessage, error), flushImmediately 
bool) {
        // Register transaction operation to transaction and the transaction 
coordinator.
        var newCallback func(MessageID, *ProducerMessage, error)
+       var txn *transaction
        if msg.Transaction != nil {
                transactionImpl := (msg.Transaction).(*transaction)
+               txn = transactionImpl
                if transactionImpl.state != TxnOpen {
                        p.log.WithField("state", 
transactionImpl.state).Error("Failed to send message" +
                                " by a non-open transaction.")
@@ -1150,10 +1150,6 @@ func (p *partitionProducer) internalSendAsync(ctx 
context.Context, msg *Producer
 
        // callbackOnce make sure the callback is only invoked once in chunking
        callbackOnce := &sync.Once{}
-       var txn *transaction
-       if msg.Transaction != nil {
-               txn = (msg.Transaction).(*transaction)
-       }
        sr := &sendRequest{
                ctx:              ctx,
                msg:              msg,
@@ -1398,7 +1394,7 @@ func (p *partitionProducer) _setConn(conn 
internal.Connection) {
 // _getConn returns internal connection field of this partition producer 
atomically.
 // Note: should only be called by this partition producer before attempting to 
use the connection
 func (p *partitionProducer) _getConn() internal.Connection {
-       // Invariant: The conn must be non-nill for the lifetime of the 
partitionProducer.
+       // Invariant: The conn must be non-nil for the lifetime of the 
partitionProducer.
        //            For this reason we leave this cast unchecked and panic() 
if the
        //            invariant is broken
        return p.conn.Load().(internal.Connection)

Reply via email to