Copilot commented on code in PR #1501:
URL: https://github.com/apache/pulsar-client-go/pull/1501#discussion_r3309813776
##########
pulsar/producer_partition.go:
##########
@@ -1700,6 +1721,8 @@ func (sr *sendRequest) done(msgID MessageID, err error) {
if pool != nil {
// reset all the fields
*sr = sendRequest{}
+ // Keep the guard raised until the object is reinitialized from
the pool.
+ sr.doneFlag.Store(true)
Review Comment:
`done()` resets the entire struct via `*sr = sendRequest{}` after setting
`doneFlag` at the top. That assignment temporarily clears `doneFlag` back to
false, reopening a race window where a concurrent second `done()` call can pass
the CAS and run on a partially-reset object. To keep idempotency under
concurrent calls, avoid overwriting `doneFlag` during reset (e.g., clear fields
individually or use a reset helper that preserves `doneFlag` as true throughout
reset+Put).
##########
pulsar/producer_partition.go:
##########
@@ -1648,7 +1609,67 @@ type sendRequest struct {
maxMessageSize int32
}
+func newSendRequest(
+ ctx context.Context,
+ p *partitionProducer,
+ msg *ProducerMessage,
+ callback func(MessageID, *ProducerMessage, error),
+ flushImmediately bool,
+) *sendRequest {
+ sr := sendRequestPool.Get().(*sendRequest)
+ *sr = sendRequest{
+ pool: sendRequestPool,
+ ctx: ctx,
+ msg: msg,
+ producer: p,
+ callback: callback,
+ callbackOnce: &sync.Once{},
+ flushImmediately: flushImmediately,
+ publishTime: time.Now(),
+ chunkID: -1,
+ }
+ return sr
+}
+
+func newChunkSendRequest(p *sendRequest, chunkID int, uuid string, cr
*chunkRecorder, reservedMem int64) *sendRequest {
+ sr := sendRequestPool.Get().(*sendRequest)
+ *sr = sendRequest{
+ pool: sendRequestPool,
+ ctx: p.ctx,
+ msg: p.msg,
+ producer: p.producer,
+ callback: p.callback,
+ callbackOnce: p.callbackOnce,
+ publishTime: p.publishTime,
+ flushImmediately: p.flushImmediately,
+ totalChunks: p.totalChunks,
+ chunkID: chunkID,
+ uuid: uuid,
+ chunkRecorder: cr,
+ transaction: p.transaction,
+ memLimit: p.memLimit,
+ semaphore: p.semaphore,
+ reservedMem: reservedMem,
+ sendAsBatch: p.sendAsBatch,
+ schema: p.schema,
+ schemaVersion: p.schemaVersion,
+ uncompressedPayload: p.uncompressedPayload,
+ uncompressedSize: p.uncompressedSize,
+ compressedPayload: p.compressedPayload,
+ compressedSize: p.compressedSize,
+ payloadChunkSize: p.payloadChunkSize,
+ mm: p.mm,
+ deliverAt: p.deliverAt,
+ maxMessageSize: p.maxMessageSize,
+ }
Review Comment:
`newChunkSendRequest` also uses `*sr = sendRequest{...}` to initialize a
pooled object containing `atomic.Bool`. This both violates the atomic-copying
constraint and can clear `doneFlag` to false during initialization,
reintroducing the stale-pointer race the guard is meant to prevent. Initialize
fields without overwriting the atomic field and only clear the guard
(`doneFlag.Store(false)`) as the final step once the object is fully
initialized.
##########
pulsar/producer_partition.go:
##########
@@ -1648,7 +1609,67 @@ type sendRequest struct {
maxMessageSize int32
}
+func newSendRequest(
+ ctx context.Context,
+ p *partitionProducer,
+ msg *ProducerMessage,
+ callback func(MessageID, *ProducerMessage, error),
+ flushImmediately bool,
+) *sendRequest {
+ sr := sendRequestPool.Get().(*sendRequest)
+ *sr = sendRequest{
+ pool: sendRequestPool,
+ ctx: ctx,
+ msg: msg,
+ producer: p,
+ callback: callback,
+ callbackOnce: &sync.Once{},
+ flushImmediately: flushImmediately,
+ publishTime: time.Now(),
+ chunkID: -1,
+ }
+ return sr
+}
+
+func newChunkSendRequest(p *sendRequest, chunkID int, uuid string, cr
*chunkRecorder, reservedMem int64) *sendRequest {
+ sr := sendRequestPool.Get().(*sendRequest)
+ *sr = sendRequest{
+ pool: sendRequestPool,
+ ctx: p.ctx,
+ msg: p.msg,
+ producer: p.producer,
+ callback: p.callback,
+ callbackOnce: p.callbackOnce,
+ publishTime: p.publishTime,
+ flushImmediately: p.flushImmediately,
+ totalChunks: p.totalChunks,
+ chunkID: chunkID,
+ uuid: uuid,
+ chunkRecorder: cr,
+ transaction: p.transaction,
+ memLimit: p.memLimit,
+ semaphore: p.semaphore,
+ reservedMem: reservedMem,
+ sendAsBatch: p.sendAsBatch,
+ schema: p.schema,
+ schemaVersion: p.schemaVersion,
+ uncompressedPayload: p.uncompressedPayload,
+ uncompressedSize: p.uncompressedSize,
+ compressedPayload: p.compressedPayload,
+ compressedSize: p.compressedSize,
+ payloadChunkSize: p.payloadChunkSize,
+ mm: p.mm,
+ deliverAt: p.deliverAt,
+ maxMessageSize: p.maxMessageSize,
+ }
Review Comment:
`newSendRequest` reinitializes pooled objects with `*sr = sendRequest{...}`.
Since `sendRequest` now contains an `atomic.Bool`, copying/overwriting it via
struct assignment is unsafe (atomic types must not be copied after first use)
and also drops the guard to false early during initialization, allowing
stale-pointer `done()` calls to proceed if they race with pool reuse. Prefer
preserving `doneFlag` as true while filling fields, then explicitly
`Store(false)` as the last init step, without struct-assigning over the atomic
field.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]