This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 90307f2  Increase writeRequestsCh channel buffer size (#277)
90307f2 is described below

commit 90307f2fe4f801cbcc32d914a8272359498da109
Author: Matteo Merli <[email protected]>
AuthorDate: Wed Jun 10 22:43:14 2020 -0700

    Increase writeRequestsCh channel buffer size (#277)
    
    * Increase writeRequestsCh channel buffer size
    
    * Fixed indentation
---
 pulsar/internal/connection.go | 12 +++++++++---
 1 file changed, 9 insertions(+), 3 deletions(-)

diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go
index 797b27a..573e607 100644
--- a/pulsar/internal/connection.go
+++ b/pulsar/internal/connection.go
@@ -181,9 +181,15 @@ func newConnection(logicalAddr *url.URL, physicalAddr 
*url.URL, tlsOptions *TLSO
                closeCh:            make(chan interface{}),
                incomingRequestsCh: make(chan *request, 10),
                incomingCmdCh:      make(chan *incomingCmd, 10),
-               writeRequestsCh:    make(chan []byte, 10),
-               listeners:          make(map[uint64]ConnectionListener),
-               consumerHandlers:   make(map[uint64]ConsumerHandler),
+
+               // This channel is used to pass data from producers to the 
connection
+               // go routine. It can become contended or blocking if we have 
multiple
+               // partition produces writing on a single connection. In 
general it's
+               // good to keep this above the number of partition producers 
assigned
+               // to a single connection.
+               writeRequestsCh:  make(chan []byte, 256),
+               listeners:        make(map[uint64]ConnectionListener),
+               consumerHandlers: make(map[uint64]ConsumerHandler),
        }
        cnx.reader = newConnectionReader(cnx)
        cnx.cond = sync.NewCond(cnx)

Reply via email to