This is an automated email from the ASF dual-hosted git repository.

lidavidm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-adbc.git


The following commit(s) were added to refs/heads/main by this push:
     new f35485a5 fix(go/adbc/driver/snowflake): fix potential deadlock and error handling (#828)
f35485a5 is described below

commit f35485a5f3c9597668c0b4a8936621c97c4adc15
Author: Matt Topol <[email protected]>
AuthorDate: Wed Jun 21 12:41:17 2023 -0400

    fix(go/adbc/driver/snowflake): fix potential deadlock and error handling (#828)
    
    Found these when trying to do some performance testing.
---
 go/adbc/driver/flightsql/record_reader.go |  4 +++
 go/adbc/driver/snowflake/driver.go        |  5 ++-
 go/adbc/driver/snowflake/record_reader.go | 59 +++++++++++++++++--------------
 3 files changed, 38 insertions(+), 30 deletions(-)

diff --git a/go/adbc/driver/flightsql/record_reader.go b/go/adbc/driver/flightsql/record_reader.go
index 6bcb7791..409ce58e 100644
--- a/go/adbc/driver/flightsql/record_reader.go
+++ b/go/adbc/driver/flightsql/record_reader.go
@@ -95,6 +95,10 @@ func newRecordReader(ctx context.Context, alloc memory.Allocator, cl *flightsql.
                schema = rdr.Schema()
                group.Go(func() error {
                        defer rdr.Release()
+                       if numEndpoints > 1 {
+                               defer close(ch)
+                       }
+
                        for rdr.Next() && ctx.Err() == nil {
                                rec := rdr.Record()
                                rec.Retain()
diff --git a/go/adbc/driver/snowflake/driver.go b/go/adbc/driver/snowflake/driver.go
index bd8b2c01..c02b58dd 100644
--- a/go/adbc/driver/snowflake/driver.go
+++ b/go/adbc/driver/snowflake/driver.go
@@ -161,9 +161,8 @@ func errToAdbcErr(code adbc.Status, err error) error {
        var sferr *gosnowflake.SnowflakeError
        if errors.As(err, &sferr) {
                var sqlstate [5]byte
-               if len(sferr.SQLState) > 0 {
-                       copy(sqlstate[:], sferr.SQLState[:5])
-               }
+               copy(sqlstate[:], []byte(sferr.SQLState))
+
                return adbc.Error{
                        Code:       code,
                        Msg:        sferr.Error(),
diff --git a/go/adbc/driver/snowflake/record_reader.go b/go/adbc/driver/snowflake/record_reader.go
index b3041ddd..32169bb7 100644
--- a/go/adbc/driver/snowflake/record_reader.go
+++ b/go/adbc/driver/snowflake/record_reader.go
@@ -274,6 +274,9 @@ func newRecordReader(ctx context.Context, alloc memory.Allocator, ld gosnowflake
        group.Go(func() error {
                defer rr.Release()
                defer r.Close()
+               if len(batches) > 1 {
+                       defer close(ch)
+               }
 
                for rr.Next() && ctx.Err() == nil {
                        rec := rr.Record()
@@ -297,39 +300,41 @@ func newRecordReader(ctx context.Context, alloc memory.Allocator, ld gosnowflake
        }
 
        lastChannelIndex := len(chs) - 1
-       for i, b := range batches[1:] {
-               batch, batchIdx := b, i+1
-               chs[batchIdx] = make(chan arrow.Record, bufferSize)
-               group.Go(func() error {
-                       // close channels (except the last) so that Next can 
move on to the next channel properly
-                       if batchIdx != lastChannelIndex {
-                               defer close(chs[batchIdx])
-                       }
-
-                       rdr, err := batch.GetStream(ctx)
-                       if err != nil {
-                               return err
-                       }
-                       defer rdr.Close()
+       go func() {
+               for i, b := range batches[1:] {
+                       batch, batchIdx := b, i+1
+                       chs[batchIdx] = make(chan arrow.Record, bufferSize)
+                       group.Go(func() error {
+                               // close channels (except the last) so that 
Next can move on to the next channel properly
+                               if batchIdx != lastChannelIndex {
+                                       defer close(chs[batchIdx])
+                               }
 
-                       rr, err := ipc.NewReader(rdr, ipc.WithAllocator(alloc))
-                       if err != nil {
-                               return err
-                       }
-                       defer rr.Release()
+                               rdr, err := batch.GetStream(ctx)
+                               if err != nil {
+                                       return err
+                               }
+                               defer rdr.Close()
 
-                       for rr.Next() && ctx.Err() == nil {
-                               rec := rr.Record()
-                               rec, err = recTransform(ctx, rec)
+                               rr, err := ipc.NewReader(rdr, 
ipc.WithAllocator(alloc))
                                if err != nil {
                                        return err
                                }
-                               chs[batchIdx] <- rec
-                       }
+                               defer rr.Release()
 
-                       return rr.Err()
-               })
-       }
+                               for rr.Next() && ctx.Err() == nil {
+                                       rec := rr.Record()
+                                       rec, err = recTransform(ctx, rec)
+                                       if err != nil {
+                                               return err
+                                       }
+                                       chs[batchIdx] <- rec
+                               }
+
+                               return rr.Err()
+                       })
+               }
+       }()
 
        go func() {
                rdr.err = group.Wait()

Reply via email to