This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new 3378790 Fixed locking between connection_reader and connection (#84)
3378790 is described below
commit 3378790a219271acb8ee84b3428ebf8e4d58946c
Author: Matteo Merli <[email protected]>
AuthorDate: Sat Nov 9 10:11:23 2019 -0800
Fixed locking between connection_reader and connection (#84)
---
pulsar/impl_partition_consumer.go | 6 ++----
pulsar/internal/connection.go | 29 +++++++++++++++++------------
pulsar/internal/connection_reader.go | 6 +++---
3 files changed, 22 insertions(+), 19 deletions(-)
diff --git a/pulsar/impl_partition_consumer.go b/pulsar/impl_partition_consumer.go
index 58b3ffe..83e09ad 100644
--- a/pulsar/impl_partition_consumer.go
+++ b/pulsar/impl_partition_consumer.go
@@ -600,11 +600,9 @@ func (pc *partitionConsumer) internalFlow(permits uint32) error {
return nil
}
-func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, headersAndPayload []byte) error {
+func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, headersAndPayload internal.Buffer) error {
pbMsgID := response.GetMessageId()
-
- reader := internal.NewMessageReader(headersAndPayload)
-
+ reader := internal.NewMessageReader(headersAndPayload.ReadableSlice())
msgMeta, err := reader.ReadMessageMetadata()
if err != nil {
// TODO send discardCorruptedMessage
diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go
index 2e933b8..a68e34a 100644
--- a/pulsar/internal/connection.go
+++ b/pulsar/internal/connection.go
@@ -66,7 +66,7 @@ type Connection interface {
}
type ConsumerHandler interface {
- MessageReceived(response *pb.CommandMessage, headersAndPayload []byte) error
+ MessageReceived(response *pb.CommandMessage, headersAndPayload Buffer) error
// ConnectionClosed close the TCP connection.
ConnectionClosed()
@@ -107,6 +107,11 @@ type request struct {
callback func(command *pb.BaseCommand, err error)
}
+type incomingCmd struct {
+ cmd *pb.BaseCommand
+ headersAndPayload Buffer
+}
+
type connection struct {
sync.Mutex
cond *sync.Cond
@@ -129,9 +134,9 @@ type connection struct {
requestIDGenerator uint64
incomingRequestsCh chan *request
+ incomingCmdCh chan *incomingCmd
writeRequestsCh chan []byte
- mapMutex sync.RWMutex
pendingReqs map[uint64]*request
listeners map[uint64]ConnectionListener
@@ -156,6 +161,7 @@ func newConnection(logicalAddr *url.URL, physicalAddr *url.URL, tlsOptions *TLSO
auth: auth,
incomingRequestsCh: make(chan *request),
+ incomingCmdCh: make(chan *incomingCmd),
writeRequestsCh: make(chan []byte),
listeners: make(map[uint64]ConnectionListener),
consumerHandlers: make(map[uint64]ConsumerHandler),
@@ -280,11 +286,12 @@ func (c *connection) run() {
if req == nil {
return
}
- c.mapMutex.Lock()
c.pendingReqs[req.id] = req
- c.mapMutex.Unlock()
c.writeCommand(req.cmd)
+ case cmd := <- c.incomingCmdCh:
+ c.internalReceivedCommand(cmd.cmd, cmd.headersAndPayload)
+
case data := <-c.writeRequestsCh:
if data == nil {
return
@@ -331,7 +338,11 @@ func (c *connection) writeCommand(cmd proto.Message) {
c.internalWriteData(data)
}
-func (c *connection) receivedCommand(cmd *pb.BaseCommand, headersAndPayload []byte) {
+func (c *connection) receivedCommand(cmd *pb.BaseCommand, headersAndPayload Buffer) {
+ c.incomingCmdCh <- &incomingCmd{cmd, headersAndPayload}
+}
+
+func (c *connection) internalReceivedCommand(cmd *pb.BaseCommand, headersAndPayload Buffer) {
c.log.Debugf("Received command: %s -- payload: %v", cmd, headersAndPayload)
c.setLastDataReceived(time.Now())
var err error
@@ -406,14 +417,11 @@ func (c *connection) SendRequest(requestID uint64, req *pb.BaseCommand, callback
}
func (c *connection) internalSendRequest(req *request) {
- c.mapMutex.Lock()
c.pendingReqs[req.id] = req
- c.mapMutex.Unlock()
c.writeCommand(req.cmd)
}
func (c *connection) handleResponse(requestID uint64, response *pb.BaseCommand) {
- c.mapMutex.RLock()
request, ok := c.pendingReqs[requestID]
if !ok {
c.log.Warnf("Received unexpected response for request %d of type %s", requestID, response.Type)
@@ -421,13 +429,11 @@ func (c *connection) handleResponse(requestID uint64, response *pb.BaseCommand)
}
delete(c.pendingReqs, requestID)
- c.mapMutex.RUnlock()
request.callback(response, nil)
}
func (c *connection) handleResponseError(serverError *pb.CommandError) {
requestID := serverError.GetRequestId()
- c.mapMutex.RLock()
request, ok := c.pendingReqs[requestID]
if !ok {
c.log.Warnf("Received unexpected error response for request %d of type %s",
@@ -436,7 +442,6 @@ func (c *connection) handleResponseError(serverError *pb.CommandError) {
}
delete(c.pendingReqs, requestID)
- c.mapMutex.RUnlock()
request.callback(nil,
errors.New(fmt.Sprintf("server error: %s: %s", serverError.GetError(), serverError.GetMessage())))
@@ -451,7 +456,7 @@ func (c *connection) handleSendReceipt(response *pb.CommandSendReceipt) {
}
}
-func (c *connection) handleMessage(response *pb.CommandMessage, payload []byte) error {
+func (c *connection) handleMessage(response *pb.CommandMessage, payload Buffer) error {
c.log.Debug("Got Message: ", response)
consumerID := response.GetConsumerId()
if consumer, ok := c.consumerHandler(consumerID); ok {
diff --git a/pulsar/internal/connection_reader.go b/pulsar/internal/connection_reader.go
index 746db5c..c74a940 100644
--- a/pulsar/internal/connection_reader.go
+++ b/pulsar/internal/connection_reader.go
@@ -55,7 +55,7 @@ func (r *connectionReader) readFromConnection() {
}
}
-func (r *connectionReader) readSingleCommand() (cmd *pb.BaseCommand, headersAndPayload []byte, err error) {
+func (r *connectionReader) readSingleCommand() (cmd *pb.BaseCommand, headersAndPayload Buffer, err error) {
// First, we need to read the frame size
if r.buffer.ReadableBytes() < 4 {
if r.buffer.ReadableBytes() == 0 {
@@ -92,8 +92,8 @@ func (r *connectionReader) readSingleCommand() (cmd *pb.BaseCommand, headersAndP
// Also read the eventual payload
headersAndPayloadSize := frameSize - (cmdSize + 4)
if cmdSize+4 < frameSize {
- headersAndPayload = make([]byte, headersAndPayloadSize)
- copy(headersAndPayload, r.buffer.Read(headersAndPayloadSize))
+ headersAndPayload = NewBuffer(int(headersAndPayloadSize))
+ headersAndPayload.Write(r.buffer.Read(headersAndPayloadSize))
}
return cmd, headersAndPayload, nil
}