This is an automated email from the ASF dual-hosted git repository.
lidavidm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-adbc.git
The following commit(s) were added to refs/heads/main by this push:
     new cdb2c6257 fix(go/adbc/driver/snowflake): workaround snowflake metadata-only limitations (#1790)
cdb2c6257 is described below
commit cdb2c6257e60648912ac8cf179d9e853dc2e0a19
Author: Matt Topol <[email protected]>
AuthorDate: Tue Apr 30 00:35:59 2024 -0400
fix(go/adbc/driver/snowflake): workaround snowflake metadata-only limitations (#1790)
Workaround to fix #1454 until snowflake addresses
https://github.com/snowflakedb/gosnowflake/issues/1110 with a better
solution (hopefully by having the server actually return Arrow...)
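As context for the patch below: each chunk's "JSON" payload is a comma-separated
sequence of JSON lists (one list per row, every value a string or a JSON null),
not a single valid JSON document, so the new code in record_reader.go decodes one
row at a time and seeks past the separating comma using json.Decoder's
InputOffset. A minimal, self-contained sketch of that trick follows; the helper
name parseRows and the sample payload are illustrative only, not part of the
driver.

    package main

    import (
        "bytes"
        "encoding/json"
        "fmt"
    )

    // parseRows decodes a payload like ["a","1"],["b",null] into rows of
    // nullable strings (hypothetical helper; the driver does this inline).
    func parseRows(data []byte, numRows int) ([][]*string, error) {
        rows := make([][]*string, numRows)
        offset, buf := int64(0), bytes.NewReader(data)
        for i := 0; i < numRows; i++ {
            // a fresh decoder per row: json.Decoder buffers ahead, so the
            // reader's position can't be trusted after Decode returns
            dec := json.NewDecoder(buf)
            if err := dec.Decode(&rows[i]); err != nil {
                return nil, err
            }
            // InputOffset is relative to the last seek; the +1 skips the
            // ',' separating this row from the next
            offset += dec.InputOffset() + 1
            // 0 == io.SeekStart; seeking past the end is legal for a
            // bytes.Reader, so the final row needs no special case
            if _, err := buf.Seek(offset, 0); err != nil {
                return nil, err
            }
        }
        return rows, nil
    }

    func main() {
        rows, err := parseRows([]byte(`["a","1"],["b",null]`), 2)
        if err != nil {
            panic(err)
        }
        for _, row := range rows {
            for _, col := range row {
                if col == nil {
                    fmt.Print("NULL ")
                } else {
                    fmt.Print(*col, " ")
                }
            }
            fmt.Println()
        }
    }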
---
go/adbc/driver/snowflake/driver_test.go | 25 +++++++++
go/adbc/driver/snowflake/record_reader.go | 92 ++++++++++++++++++++++++++++---
2 files changed, 108 insertions(+), 9 deletions(-)
diff --git a/go/adbc/driver/snowflake/driver_test.go b/go/adbc/driver/snowflake/driver_test.go
index 968ca942d..eb9f2a299 100644
--- a/go/adbc/driver/snowflake/driver_test.go
+++ b/go/adbc/driver/snowflake/driver_test.go
@@ -2006,3 +2006,28 @@ func (suite *SnowflakeTests) TestJwtPrivateKey() {
defer os.Remove(binKey)
verifyKey(binKey)
}
+
+func (suite *SnowflakeTests) TestMetadataOnlyQuery() {
+ // force more than one chunk for `SHOW FUNCTIONS` which will return
+ // JSON data instead of arrow, even though we ask for Arrow
+ suite.Require().NoError(suite.stmt.SetSqlQuery(`ALTER SESSION SET CLIENT_RESULT_CHUNK_SIZE = 50`))
+ _, err := suite.stmt.ExecuteUpdate(suite.ctx)
+ suite.Require().NoError(err)
+
+ // since we lowered the CLIENT_RESULT_CHUNK_SIZE this will return at least
+ // 1 chunk in addition to the first one. Metadata queries will return JSON
+ // no matter what currently.
+ suite.Require().NoError(suite.stmt.SetSqlQuery(`SHOW FUNCTIONS`))
+ rdr, n, err := suite.stmt.ExecuteQuery(suite.ctx)
+ suite.Require().NoError(err)
+ defer rdr.Release()
+
+ recv := int64(0)
+ for rdr.Next() {
+ recv += rdr.Record().NumRows()
+ }
+
+ // verify that we got the expected number of rows if we sum up
+ // all the rows from each record in the stream.
+ suite.Equal(n, recv)
+}
diff --git a/go/adbc/driver/snowflake/record_reader.go b/go/adbc/driver/snowflake/record_reader.go
index acf86bd0d..1a24b91d9 100644
--- a/go/adbc/driver/snowflake/record_reader.go
+++ b/go/adbc/driver/snowflake/record_reader.go
@@ -18,9 +18,12 @@
package snowflake
import (
+ "bytes"
"context"
"encoding/hex"
+ "encoding/json"
"fmt"
+ "io"
"math"
"strconv"
"strings"
@@ -300,7 +303,7 @@ func integerToDecimal128(ctx context.Context, a arrow.Array, dt *arrow.Decimal12
return result, err
}
-func rowTypesToArrowSchema(ctx context.Context, ld gosnowflake.ArrowStreamLoader, useHighPrecision bool) (*arrow.Schema, error) {
+func rowTypesToArrowSchema(_ context.Context, ld gosnowflake.ArrowStreamLoader, useHighPrecision bool) (*arrow.Schema, error) {
var loc *time.Location
metadata := ld.RowTypes()
@@ -360,8 +363,7 @@ func extractTimestamp(src *string) (sec, nsec int64, err error) {
return
}
-func jsonDataToArrow(ctx context.Context, bldr *array.RecordBuilder, ld gosnowflake.ArrowStreamLoader) (arrow.Record, error) {
- rawData := ld.JSONData()
+func jsonDataToArrow(_ context.Context, bldr *array.RecordBuilder, rawData [][]*string) (arrow.Record, error) {
fieldBuilders := bldr.Fields()
for _, rec := range rawData {
for i, col := range rec {
@@ -471,7 +473,12 @@ func newRecordReader(ctx context.Context, alloc memory.Allocator, ld gosnowflake
return nil, errToAdbcErr(adbc.StatusInternal, err)
}
- if len(batches) == 0 {
+ // if the first chunk was JSON, that means this was a metadata query which
+ // is only returning JSON data rather than Arrow
+ rawData := ld.JSONData()
+ if len(rawData) > 0 {
+ // construct an Arrow schema based on reading the JSON metadata description of the
+ // result type schema
schema, err := rowTypesToArrowSchema(ctx, ld, useHighPrecision)
if err != nil {
return nil, adbc.Error{
@@ -480,20 +487,87 @@ func newRecordReader(ctx context.Context, alloc memory.Allocator, ld gosnowflake
}
}
+ if ld.TotalRows() == 0 {
+ return array.NewRecordReader(schema, []arrow.Record{})
+ }
+
bldr := array.NewRecordBuilder(alloc, schema)
defer bldr.Release()
- rec, err := jsonDataToArrow(ctx, bldr, ld)
+ rec, err := jsonDataToArrow(ctx, bldr, rawData)
if err != nil {
return nil, err
}
defer rec.Release()
- if ld.TotalRows() != 0 {
- return array.NewRecordReader(schema, []arrow.Record{rec})
- } else {
- return array.NewRecordReader(schema, []arrow.Record{})
+ results := []arrow.Record{rec}
+ for _, b := range batches {
+ rdr, err := b.GetStream(ctx)
+ if err != nil {
+ return nil, adbc.Error{
+ Msg: err.Error(),
+ Code: adbc.StatusInternal,
+ }
+ }
+ defer rdr.Close()
+
+ // the "JSON" data returned isn't valid JSON. Instead
it is a list of
+ // comma-delimited JSON lists containing every value as
a string, except
+ // for a JSON null to represent nulls. Thus we can't
just use the existing
+ // JSON parsing code in Arrow.
+ data, err := io.ReadAll(rdr)
+ if err != nil {
+ return nil, adbc.Error{
+ Msg: err.Error(),
+ Code: adbc.StatusInternal,
+ }
+ }
+
+ if cap(rawData) >= int(b.NumRows()) {
+ rawData = rawData[:b.NumRows()]
+ } else {
+ rawData = make([][]*string, b.NumRows())
+ }
+ bldr.Reserve(int(b.NumRows()))
+
+ // we grab the entire JSON message and create a bytes reader
+ offset, buf := int64(0), bytes.NewReader(data)
+ for i := 0; i < int(b.NumRows()); i++ {
+ // we construct a decoder from the bytes.Reader to read the next JSON list
+ // of columns (one row) from the input
+ dec := json.NewDecoder(buf)
+ if err = dec.Decode(&rawData[i]); err != nil {
+ return nil, adbc.Error{
+ Msg: err.Error(),
+ Code: adbc.StatusInternal,
+ }
+ }
+
+ // dec.InputOffset() now represents the index of the ',' so we skip the comma
+ offset += dec.InputOffset() + 1
+ // then seek the buffer to that spot. we have to seek based on the start
+ // because json.Decoder can read from the buffer more than is necessary to
+ // process the JSON data.
+ if _, err = buf.Seek(offset, 0); err != nil {
+ return nil, adbc.Error{
+ Msg: err.Error(),
+ Code: adbc.StatusInternal,
+ }
+ }
+ }
+
+ // now that we have our [][]*string of JSON data, we can pass it to get converted
+ // to an Arrow record batch and appended to our slice of batches
+ rec, err := jsonDataToArrow(ctx, bldr, rawData)
+ if err != nil {
+ return nil, err
+ }
+ defer rec.Release()
+
+ results = append(results, rec)
}
+
+ return array.NewRecordReader(schema, results)
}
ch := make(chan arrow.Record, bufferSize)