This is an automated email from the ASF dual-hosted git repository.
rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new 02b244e Fix producer deadlock after write failure (#378)
02b244e is described below
commit 02b244e1501cf36f5f4bf232deeef65eb9a651ff
Author: Matteo Merli <[email protected]>
AuthorDate: Fri Oct 9 09:14:38 2020 -0700
Fix producer deadlock after write failure (#378)
### Motivation
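A deadlock can occur in the Go client when it hits a write failure and
tries to handle it.
The root cause is that Go mutexes (`sync.Mutex`) are not re-entrant. In the
trace below, `handleSendReceipt` holds the connection mutex while calling the
producer's `ReceivedSendReceipt`, which in turn calls `connection.Close()`.
`Close` then blocks forever trying to lock the mutex its own goroutine
already holds.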
```
goroutine 1077 [semacquire, 83 minutes]:
sync.runtime_SemacquireMutex(0xc00c31fb04, 0xc110a12000, 0x1)
	/usr/local/go/src/runtime/sema.go:71 +0x47
sync.(*Mutex).lockSlow(0xc00c31fb00)
	/usr/local/go/src/sync/mutex.go:138 +0xfc
sync.(*Mutex).Lock(...)
	/usr/local/go/src/sync/mutex.go:81
github.com/apache/pulsar-client-go/pulsar/internal.(*connection).Close(0xc00c31fb00)
	/go/pkg/mod/cd.splunkdev.com/streamlio/[email protected]/pulsar/internal/connection.go:718 +0x547
github.com/apache/pulsar-client-go/pulsar.(*partitionProducer).ReceivedSendReceipt(0xc0033926e0, 0xc09ba0fe00)
	/go/pkg/mod/cd.splunkdev.com/streamlio/[email protected]/pulsar/producer_partition.go:475 +0x6f0
github.com/apache/pulsar-client-go/pulsar/internal.(*connection).handleSendReceipt(0xc00c31fb00, 0xc09ba0fe00)
	/go/pkg/mod/cd.splunkdev.com/streamlio/[email protected]/pulsar/internal/connection.go:588 +0xee
github.com/apache/pulsar-client-go/pulsar/internal.(*connection).internalReceivedCommand(0xc00c31fb00, 0xc00e40e8c0, 0x0, 0x0)
	/go/pkg/mod/cd.splunkdev.com/streamlio/[email protected]/pulsar/internal/connection.go:507 +0x1ce
github.com/apache/pulsar-client-go/pulsar/internal.(*connection).run(0xc00c31fb00)
	/go/pkg/mod/cd.splunkdev.com/streamlio/[email protected]/pulsar/internal/connection.go:368 +0x2db
github.com/apache/pulsar-client-go/pulsar/internal.(*connection).start.func1(0xc00c31fb00)
	/go/pkg/mod/cd.splunkdev.com/streamlio/[email protected]/pulsar/internal/connection.go:230 +0x71
created by github.com/apache/pulsar-client-go/pulsar/internal.(*connection).start
	/go/pkg/mod/cd.splunkdev.com/streamlio/[email protected]/pulsar/internal/connection.go:226 +0x3f
```
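The non-reentrancy behind this trace can be reproduced in a few lines. The sketch below is illustrative and is not code from pulsar-client-go. It uses `sync.Mutex.TryLock` (available since Go 1.18, newer than this commit) so that the second acquisition reports failure instead of hanging the way `Lock` does in the trace.

```go
package main

import (
	"fmt"
	"sync"
)

// lockTwice shows that sync.Mutex is not re-entrant: once a goroutine
// holds the mutex, a second acquisition by that same goroutine cannot
// succeed. TryLock is used instead of Lock so the program returns a
// result rather than blocking forever.
func lockTwice() bool {
	var mu sync.Mutex
	mu.Lock()
	defer mu.Unlock()
	// A re-entrant lock would let its owner in again; sync.Mutex does not.
	return mu.TryLock()
}

func main() {
	fmt.Println("second acquisition succeeded:", lockTwice()) // prints "second acquisition succeeded: false"
}
```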
### Modifications
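The connection lock only needs to protect the lookup of the producer in
`c.listeners`; it does not need to be held while the producer processes the
send receipt (and any resulting write failure). Releasing the lock right
after the lookup fixes the deadlock.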
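The shape of the fix can be sketched as a standalone program. The types `conn`, `receiptListener` and `closingProducer` are hypothetical stand-ins for the client's connection and producer, not its real API. The point is only the order of operations: look up the listener under the lock, unlock, then call into it, so a listener that closes the connection can acquire the mutex.

```go
package main

import (
	"fmt"
	"sync"
)

// receiptListener is a hypothetical stand-in for the producer side.
type receiptListener interface {
	ReceivedSendReceipt(producerID uint64)
}

// conn is a hypothetical, stripped-down connection: a mutex guarding
// a map of listeners keyed by producer ID.
type conn struct {
	sync.Mutex
	listeners map[uint64]receiptListener
	closed    bool
}

// Close takes the connection lock, like connection.Close in the trace.
func (c *conn) Close() {
	c.Lock()
	defer c.Unlock()
	c.closed = true
}

// handleSendReceipt holds the lock only for the map lookup and releases
// it before calling the listener, so a listener that calls Close does not
// try to re-acquire a mutex its own goroutine already holds.
func (c *conn) handleSendReceipt(producerID uint64) bool {
	c.Lock()
	listener, ok := c.listeners[producerID]
	c.Unlock()

	if ok {
		listener.ReceivedSendReceipt(producerID)
	}
	return ok
}

// closingProducer mimics a producer that closes the connection while
// processing a receipt (for example after a write failure).
type closingProducer struct{ c *conn }

func (p *closingProducer) ReceivedSendReceipt(uint64) { p.c.Close() }

func main() {
	c := &conn{listeners: map[uint64]receiptListener{}}
	c.listeners[1] = &closingProducer{c: c}

	found := c.handleSendReceipt(1) // returns instead of deadlocking
	fmt.Println(found, c.closed)    // prints "true true"
}
```

One consequence of this pattern is that the callback runs without the lock, so another goroutine may remove the listener from the map while it is executing; the listener itself therefore has to tolerate being invoked while it is being closed.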
---
pulsar/internal/connection.go | 5 +++--
1 file changed, 3 insertions(+), 2 deletions(-)
diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go
index 7270c30..d9c124c 100644
--- a/pulsar/internal/connection.go
+++ b/pulsar/internal/connection.go
@@ -641,9 +641,10 @@ func (c *connection) handleSendReceipt(response *pb.CommandSendReceipt) {
producerID := response.GetProducerId()
c.Lock()
- defer c.Unlock()
+ producer, ok := c.listeners[producerID]
+ c.Unlock()
- if producer, ok := c.listeners[producerID]; ok {
+ if ok {
producer.ReceivedSendReceipt(response)
} else {
c.log.WithField("producerID", producerID).Warn("Got unexpected send receipt for message: ", response.MessageId)