This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new dfc17ab  [Issue 118][client_impl.go] fix race condition when accessing 
client.handlers (#119)
dfc17ab is described below

commit dfc17abd753ac2e488021aede74433e681b5df58
Author: Jim Lambert <[email protected]>
AuthorDate: Wed Dec 11 13:22:40 2019 -0500

    [Issue 118][client_impl.go] fix race condition when accessing 
client.handlers (#119)
    
    * fix race condition when access client.handlers
    
    * added newline to statisfy lic test
    
    * refactor Set(Closable, bool) to Add(Closable) for a more explicit
---
 pulsar/client_impl.go                   | 12 ++++----
 pulsar/internal/client_handlers.go      | 52 +++++++++++++++++++++++++++++++++
 pulsar/internal/client_handlers_test.go | 46 +++++++++++++++++++++++++++++
 3 files changed, 103 insertions(+), 7 deletions(-)

diff --git a/pulsar/client_impl.go b/pulsar/client_impl.go
index 9cb3442..d0c36fe 100644
--- a/pulsar/client_impl.go
+++ b/pulsar/client_impl.go
@@ -39,7 +39,7 @@ type client struct {
        lookupService internal.LookupService
        auth          auth.Provider
 
-       handlers            map[internal.Closable]bool
+       handlers            internal.ClientHandlers
        producerIDGenerator uint64
        consumerIDGenerator uint64
 }
@@ -86,14 +86,14 @@ func newClient(options ClientOptions) (Client, error) {
        }
        c.rpcClient = internal.NewRPCClient(url, c.cnxPool)
        c.lookupService = internal.NewLookupService(c.rpcClient, url)
-       c.handlers = make(map[internal.Closable]bool)
+       c.handlers = internal.NewClientHandlers()
        return c, nil
 }
 
 func (c *client) CreateProducer(options ProducerOptions) (Producer, error) {
        producer, err := newProducer(c, &options)
        if err == nil {
-               c.handlers[producer] = true
+               c.handlers.Add(producer)
        }
        return producer, err
 }
@@ -103,7 +103,7 @@ func (c *client) Subscribe(options ConsumerOptions) 
(Consumer, error) {
        if err != nil {
                return nil, err
        }
-       c.handlers[consumer] = true
+       c.handlers.Add(consumer)
        return consumer, nil
 }
 
@@ -145,9 +145,7 @@ func (c *client) TopicPartitions(topic string) ([]string, 
error) {
 }
 
 func (c *client) Close() {
-       for handler := range c.handlers {
-               handler.Close()
-       }
+       c.handlers.Close()
 }
 
 func (c *client) namespaceTopics(namespace string) ([]string, error) {
diff --git a/pulsar/internal/client_handlers.go 
b/pulsar/internal/client_handlers.go
new file mode 100644
index 0000000..1ecfdd9
--- /dev/null
+++ b/pulsar/internal/client_handlers.go
@@ -0,0 +1,52 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package internal
+
+import "sync"
+
+// ClientHandlerMap is a simple concurrent-safe map for the client type
+type ClientHandlers struct {
+       handlers map[Closable]bool
+       l        *sync.RWMutex
+}
+
+func NewClientHandlers() ClientHandlers {
+       return ClientHandlers{
+               handlers: map[Closable]bool{},
+               l:        &sync.RWMutex{},
+       }
+}
+func (h *ClientHandlers) Add(c Closable) {
+       h.l.Lock()
+       defer h.l.Unlock()
+       h.handlers[c] = true
+}
+func (h *ClientHandlers) Val(c Closable) bool {
+       h.l.RLock()
+       defer h.l.RUnlock()
+       return h.handlers[c]
+}
+
+func (h *ClientHandlers) Close() {
+       h.l.Lock()
+       defer h.l.Unlock()
+
+       for handler := range h.handlers {
+               handler.Close()
+       }
+}
diff --git a/pulsar/internal/client_handlers_test.go 
b/pulsar/internal/client_handlers_test.go
new file mode 100644
index 0000000..baaf899
--- /dev/null
+++ b/pulsar/internal/client_handlers_test.go
@@ -0,0 +1,46 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package internal
+
+import (
+       "testing"
+
+       "github.com/stretchr/testify/assert"
+)
+
+func TestClientHandlers(t *testing.T) {
+       h := NewClientHandlers()
+       assert.NotNil(t, h.l)
+       assert.Equal(t, h.handlers, map[Closable]bool{})
+
+       closable := &testClosable{false}
+       h.Add(closable)
+       assert.True(t, h.Val(closable))
+
+       h.Close()
+       t.Log("closable is: ", closable.closed)
+       assert.True(t, closable.closed)
+}
+
+type testClosable struct {
+       closed bool
+}
+
+func (t *testClosable) Close() {
+       t.closed = true
+}

Reply via email to