This is an automated email from the ASF dual-hosted git repository.
baodi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new 5f8df27 [Refactor] refactor duplicated code lines and fix typo errors
(#1039)
5f8df27 is described below
commit 5f8df2782251226b63b2dad9f4e87fd4bbd31ef5
Author: gunli <[email protected]>
AuthorDate: Mon Jul 3 09:57:14 2023 +0800
[Refactor] refactor duplicated code lines and fix typo errors (#1039)
* [Refactor] refactor duplicated code lines and fix typo errors
* [typo] revert
* [typo] revert
* [typo] revert
* [refactor] delete redunpdant code lines
* [typo] revert some words
* [fix] set useTxn when use transaction
---------
Co-authored-by: gunli <[email protected]>
---
pulsar/producer_partition.go | 94 +++++++++++++++++++++-----------------------
1 file changed, 45 insertions(+), 49 deletions(-)
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
old mode 100644
new mode 100755
index 98c6c98..9d04427
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -710,17 +710,19 @@ func addRequestToBatch(smm *pb.SingleMessageMetadata, p
*partitionProducer,
uncompressedPayload []byte,
request *sendRequest, msg *ProducerMessage, deliverAt time.Time,
schemaVersion []byte, multiSchemaEnabled bool) bool {
- var ok bool
+ var useTxn bool
+ var mostSigBits uint64
+ var leastSigBits uint64
if request.transaction != nil {
txnID := request.transaction.GetTxnID()
- ok = p.batchBuilder.Add(smm, p.sequenceIDGenerator,
uncompressedPayload, request,
- msg.ReplicationClusters, deliverAt, schemaVersion,
multiSchemaEnabled, true, txnID.MostSigBits,
- txnID.LeastSigBits)
- } else {
- ok = p.batchBuilder.Add(smm, p.sequenceIDGenerator,
uncompressedPayload, request,
- msg.ReplicationClusters, deliverAt, schemaVersion,
multiSchemaEnabled, false, 0, 0)
+ useTxn = true
+ mostSigBits = txnID.MostSigBits
+ leastSigBits = txnID.LeastSigBits
}
- return ok
+
+ return p.batchBuilder.Add(smm, p.sequenceIDGenerator,
uncompressedPayload, request,
+ msg.ReplicationClusters, deliverAt, schemaVersion,
multiSchemaEnabled, useTxn, mostSigBits,
+ leastSigBits)
}
func (p *partitionProducer) genMetadata(msg *ProducerMessage,
@@ -764,6 +766,14 @@ func (p *partitionProducer) updateMetadataSeqID(mm
*pb.MessageMetadata, msg *Pro
}
}
+func (p *partitionProducer) updateSingleMessageMetadataSeqID(smm
*pb.SingleMessageMetadata, msg *ProducerMessage) {
+ if msg.SequenceID != nil {
+ smm.SequenceId = proto.Uint64(uint64(*msg.SequenceID))
+ } else {
+ smm.SequenceId =
proto.Uint64(internal.GetAndAdd(p.sequenceIDGenerator, 1))
+ }
+}
+
func (p *partitionProducer) genSingleMessageMetadataInBatch(msg
*ProducerMessage,
uncompressedSize int) (smm *pb.SingleMessageMetadata) {
smm = &pb.SingleMessageMetadata{
@@ -786,14 +796,7 @@ func (p *partitionProducer)
genSingleMessageMetadataInBatch(msg *ProducerMessage
smm.Properties = internal.ConvertFromStringMap(msg.Properties)
}
- var sequenceID uint64
- if msg.SequenceID != nil {
- sequenceID = uint64(*msg.SequenceID)
- } else {
- sequenceID = internal.GetAndAdd(p.sequenceIDGenerator, 1)
- }
-
- smm.SequenceId = proto.Uint64(sequenceID)
+ p.updateSingleMessageMetadataSeqID(smm, msg)
return
}
@@ -813,35 +816,30 @@ func (p *partitionProducer) internalSingleSend(mm
*pb.MessageMetadata,
}
sid := *mm.SequenceId
- var err error
+ var useTxn bool
+ var mostSigBits uint64
+ var leastSigBits uint64
+
if request.transaction != nil {
txnID := request.transaction.GetTxnID()
- err = internal.SingleSend(
- buffer,
- p.producerID,
- sid,
- mm,
- payloadBuf,
- p.encryptor,
- maxMessageSize,
- true,
- txnID.MostSigBits,
- txnID.LeastSigBits,
- )
- } else {
- err = internal.SingleSend(
- buffer,
- p.producerID,
- sid,
- mm,
- payloadBuf,
- p.encryptor,
- maxMessageSize,
- false,
- 0,
- 0,
- )
- }
+ useTxn = true
+ mostSigBits = txnID.MostSigBits
+ leastSigBits = txnID.LeastSigBits
+ }
+
+ err := internal.SingleSend(
+ buffer,
+ p.producerID,
+ sid,
+ mm,
+ payloadBuf,
+ p.encryptor,
+ maxMessageSize,
+ useTxn,
+ mostSigBits,
+ leastSigBits,
+ )
+
if err != nil {
runCallback(request.callback, nil, request.msg, err)
p.releaseSemaphoreAndMem(int64(len(msg.Payload)))
@@ -1001,7 +999,7 @@ func (p *partitionProducer) failTimeoutMessages() {
}
}
- // flag the send has completed with error, flush make
no effect
+ // flag the sending has completed with error, flush
make no effect
pi.Complete()
pi.Unlock()
@@ -1116,8 +1114,10 @@ func (p *partitionProducer) internalSendAsync(ctx
context.Context, msg *Producer
callback func(MessageID, *ProducerMessage, error), flushImmediately
bool) {
// Register transaction operation to transaction and the transaction
coordinator.
var newCallback func(MessageID, *ProducerMessage, error)
+ var txn *transaction
if msg.Transaction != nil {
transactionImpl := (msg.Transaction).(*transaction)
+ txn = transactionImpl
if transactionImpl.state != TxnOpen {
p.log.WithField("state",
transactionImpl.state).Error("Failed to send message" +
" by a non-open transaction.")
@@ -1150,10 +1150,6 @@ func (p *partitionProducer) internalSendAsync(ctx
context.Context, msg *Producer
// callbackOnce make sure the callback is only invoked once in chunking
callbackOnce := &sync.Once{}
- var txn *transaction
- if msg.Transaction != nil {
- txn = (msg.Transaction).(*transaction)
- }
sr := &sendRequest{
ctx: ctx,
msg: msg,
@@ -1398,7 +1394,7 @@ func (p *partitionProducer) _setConn(conn
internal.Connection) {
// _getConn returns internal connection field of this partition producer
atomically.
// Note: should only be called by this partition producer before attempting to
use the connection
func (p *partitionProducer) _getConn() internal.Connection {
- // Invariant: The conn must be non-nill for the lifetime of the
partitionProducer.
+ // Invariant: The conn must be non-nil for the lifetime of the
partitionProducer.
// For this reason we leave this cast unchecked and panic()
if the
// invariant is broken
return p.conn.Load().(internal.Connection)