This is an automated email from the ASF dual-hosted git repository.

lyndonb pushed a commit to branch 3.5-dev
in repository https://gitbox.apache.org/repos/asf/tinkerpop.git


The following commit(s) were added to refs/heads/3.5-dev by this push:
     new 104510a12e Fix deadlock on non response-specific read errors (#1669)
104510a12e is described below

commit 104510a12e4d6e5e6a2507e28db6452836f02265
Author: Simon Zhao <[email protected]>
AuthorDate: Fri May 27 09:59:49 2022 -0700

    Fix deadlock on non response-specific read errors (#1669)
---
 gremlin-go/driver/connection.go |  7 ++++---
 gremlin-go/driver/protocol.go   |  4 +---
 gremlin-go/driver/resultSet.go  | 14 ++++++++++++++
 3 files changed, 19 insertions(+), 6 deletions(-)

diff --git a/gremlin-go/driver/connection.go b/gremlin-go/driver/connection.go
index 78592f660f..b23ddfe5a0 100644
--- a/gremlin-go/driver/connection.go
+++ b/gremlin-go/driver/connection.go
@@ -144,10 +144,11 @@ func (s *synchronizedMap) size() int {
        return len(s.internalMap)
 }
 
-func (s *synchronizedMap) synchronizedRange(f func(key string, value ResultSet)) {
+func (s *synchronizedMap) closeAll(err error) {
        s.syncLock.Lock()
        defer s.syncLock.Unlock()
-       for k, v := range s.internalMap {
-               f(k, v)
+       for _, resultSet := range s.internalMap {
+               resultSet.setError(err)
+               resultSet.unlockedClose()
        }
 }
diff --git a/gremlin-go/driver/protocol.go b/gremlin-go/driver/protocol.go
index ddca600580..b6d0aaea30 100644
--- a/gremlin-go/driver/protocol.go
+++ b/gremlin-go/driver/protocol.go
@@ -92,9 +92,7 @@ func (protocol *gremlinServerWSProtocol) readLoop(resultSets *synchronizedMap, e
 // If there is an error, we need to close the ResultSets and then pass the error back.
 func readErrorHandler(resultSets *synchronizedMap, errorCallback func(), err error, log *logHandler) {
        log.logf(Error, readLoopError, err.Error())
-       resultSets.synchronizedRange(func(_ string, resultSet ResultSet) {
-               resultSet.Close()
-       })
+       resultSets.closeAll(err)
        errorCallback()
 }
 
diff --git a/gremlin-go/driver/resultSet.go b/gremlin-go/driver/resultSet.go
index 3ff1c458e9..50506bf647 100644
--- a/gremlin-go/driver/resultSet.go
+++ b/gremlin-go/driver/resultSet.go
@@ -35,6 +35,7 @@ type ResultSet interface {
        GetRequestID() string
        IsEmpty() bool
        Close()
+       unlockedClose()
        Channel() chan *Result
        addResult(result *Result)
        One() (*Result, bool, error)
@@ -120,6 +121,19 @@ func (channelResultSet *channelResultSet) Close() {
        }
 }
 
+// Close and remove from the channelResultSet from the container without locking container. Meant for use when calling
+// function already locks the container.
+func (channelResultSet *channelResultSet) unlockedClose() {
+       if !channelResultSet.closed {
+               channelResultSet.channelMutex.Lock()
+               channelResultSet.closed = true
+               delete(channelResultSet.container.internalMap, channelResultSet.requestID)
+               close(channelResultSet.channel)
+               channelResultSet.channelMutex.Unlock()
+               channelResultSet.sendSignal()
+       }
+}
+
 func (channelResultSet *channelResultSet) setAggregateTo(val string) {
        channelResultSet.aggregateTo = val
 }

Reply via email to