This is an automated email from the ASF dual-hosted git repository.
lyndonb pushed a commit to branch 3.5-dev
in repository https://gitbox.apache.org/repos/asf/tinkerpop.git
The following commit(s) were added to refs/heads/3.5-dev by this push:
new 104510a12e Fix deadlock on non response-specific read errors (#1669)
104510a12e is described below
commit 104510a12e4d6e5e6a2507e28db6452836f02265
Author: Simon Zhao <[email protected]>
AuthorDate: Fri May 27 09:59:49 2022 -0700
Fix deadlock on non response-specific read errors (#1669)
---
gremlin-go/driver/connection.go | 7 ++++---
gremlin-go/driver/protocol.go | 4 +---
gremlin-go/driver/resultSet.go | 14 ++++++++++++++
3 files changed, 19 insertions(+), 6 deletions(-)
diff --git a/gremlin-go/driver/connection.go b/gremlin-go/driver/connection.go
index 78592f660f..b23ddfe5a0 100644
--- a/gremlin-go/driver/connection.go
+++ b/gremlin-go/driver/connection.go
@@ -144,10 +144,11 @@ func (s *synchronizedMap) size() int {
return len(s.internalMap)
}
-func (s *synchronizedMap) synchronizedRange(f func(key string, value ResultSet)) {
+func (s *synchronizedMap) closeAll(err error) {
s.syncLock.Lock()
defer s.syncLock.Unlock()
- for k, v := range s.internalMap {
- f(k, v)
+ for _, resultSet := range s.internalMap {
+ resultSet.setError(err)
+ resultSet.unlockedClose()
}
}
diff --git a/gremlin-go/driver/protocol.go b/gremlin-go/driver/protocol.go
index ddca600580..b6d0aaea30 100644
--- a/gremlin-go/driver/protocol.go
+++ b/gremlin-go/driver/protocol.go
@@ -92,9 +92,7 @@ func (protocol *gremlinServerWSProtocol) readLoop(resultSets *synchronizedMap, e
// If there is an error, we need to close the ResultSets and then pass the error back.
func readErrorHandler(resultSets *synchronizedMap, errorCallback func(), err error, log *logHandler) {
log.logf(Error, readLoopError, err.Error())
- resultSets.synchronizedRange(func(_ string, resultSet ResultSet) {
- resultSet.Close()
- })
+ resultSets.closeAll(err)
errorCallback()
}
diff --git a/gremlin-go/driver/resultSet.go b/gremlin-go/driver/resultSet.go
index 3ff1c458e9..50506bf647 100644
--- a/gremlin-go/driver/resultSet.go
+++ b/gremlin-go/driver/resultSet.go
@@ -35,6 +35,7 @@ type ResultSet interface {
GetRequestID() string
IsEmpty() bool
Close()
+ unlockedClose()
Channel() chan *Result
addResult(result *Result)
One() (*Result, bool, error)
@@ -120,6 +121,19 @@ func (channelResultSet *channelResultSet) Close() {
}
}
+// Close and remove from the channelResultSet from the container without locking container. Meant for use when calling
+// function already locks the container.
+func (channelResultSet *channelResultSet) unlockedClose() {
+ if !channelResultSet.closed {
+ channelResultSet.channelMutex.Lock()
+ channelResultSet.closed = true
+ delete(channelResultSet.container.internalMap, channelResultSet.requestID)
+ close(channelResultSet.channel)
+ channelResultSet.channelMutex.Unlock()
+ channelResultSet.sendSignal()
+ }
+}
+
func (channelResultSet *channelResultSet) setAggregateTo(val string) {
channelResultSet.aggregateTo = val
}