This is an automated email from the ASF dual-hosted git repository.
lidavidm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-adbc.git
The following commit(s) were added to refs/heads/main by this push:
new 36f7233e perf(go/adbc/driver/snowflake): Implement concurrency limit
(#974)
36f7233e is described below
commit 36f7233e634c538fe9d838f2b0e7da34ba7aeab4
Author: Matt Topol <[email protected]>
AuthorDate: Thu Aug 10 16:52:04 2023 -0400
perf(go/adbc/driver/snowflake): Implement concurrency limit (#974)
In order to improve the performance of the ADBC Snowflake driver,
particularly for larger result sets, we should limit the number of concurrent
connections we make to Snowflake during prefetching in order to
avoid excessive blocking / throttling.
---
docs/source/driver/snowflake.rst | 11 ++++++--
go/adbc/driver/snowflake/connection.go | 11 ++++++--
go/adbc/driver/snowflake/record_reader.go | 3 +-
go/adbc/driver/snowflake/statement.go | 33 ++++++++++++++++++----
go/adbc/go.mod | 4 +--
go/adbc/go.sum | 25 +++-------------
.../adbc_driver_snowflake/__init__.py | 7 +++--
.../adbc_driver_snowflake/tests/test_lowlevel.py | 2 +-
8 files changed, 60 insertions(+), 36 deletions(-)
diff --git a/docs/source/driver/snowflake.rst b/docs/source/driver/snowflake.rst
index d5ab24c4..3e17b5b0 100644
--- a/docs/source/driver/snowflake.rst
+++ b/docs/source/driver/snowflake.rst
@@ -207,10 +207,17 @@ In addition, results are potentially fetched in parallel
from multiple endpoints
A limited number of batches are queued per endpoint, though data is always
returned to the client in the order of the endpoints.
-The queue size can be changed by setting an option on the
:cpp:class:`AdbcStatement`:
+To manage the performance of result fetching, there are two options to control
+buffering and concurrency behavior. These options can only be set
+on the :cpp:class:`AdbcStatement` object:
``adbc.rpc.result_queue_size``
- The number of batches to queue per endpoint. Defaults to 5.
+ The number of batches to queue in the record reader. Defaults to 200.
+ Must be an integer > 0.
+
+``adbc.snowflake.rpc.prefetch_concurrency``
+ The number of concurrent streams fetched from Snowflake at a time.
+ Defaults to 10. Must be an integer > 0.
Transactions
------------
diff --git a/go/adbc/driver/snowflake/connection.go
b/go/adbc/driver/snowflake/connection.go
index 8f965597..b08b94a7 100644
--- a/go/adbc/driver/snowflake/connection.go
+++ b/go/adbc/driver/snowflake/connection.go
@@ -33,6 +33,11 @@ import (
"github.com/snowflakedb/gosnowflake"
)
+const (
+ defaultStatementQueueSize = 200
+ defaultPrefetchConcurrency = 10
+)
+
type snowflakeConn interface {
driver.Conn
driver.ConnBeginTx
@@ -777,8 +782,10 @@ func (c *cnxn) Rollback(_ context.Context) error {
// NewStatement initializes a new statement object tied to this connection
func (c *cnxn) NewStatement() (adbc.Statement, error) {
return &statement{
- alloc: c.db.alloc,
- cnxn: c,
+ alloc: c.db.alloc,
+ cnxn: c,
+ queueSize: defaultStatementQueueSize,
+ prefetchConcurrency: defaultPrefetchConcurrency,
}, nil
}
diff --git a/go/adbc/driver/snowflake/record_reader.go
b/go/adbc/driver/snowflake/record_reader.go
index b4c8b15a..db0bf0f8 100644
--- a/go/adbc/driver/snowflake/record_reader.go
+++ b/go/adbc/driver/snowflake/record_reader.go
@@ -413,7 +413,7 @@ type reader struct {
cancelFn context.CancelFunc
}
-func newRecordReader(ctx context.Context, alloc memory.Allocator, ld
gosnowflake.ArrowStreamLoader, bufferSize int) (array.RecordReader, error) {
+func newRecordReader(ctx context.Context, alloc memory.Allocator, ld
gosnowflake.ArrowStreamLoader, bufferSize, prefetchConcurrency int)
(array.RecordReader, error) {
batches, err := ld.GetBatches()
if err != nil {
return nil, errToAdbcErr(adbc.StatusInternal, err)
@@ -478,6 +478,7 @@ func newRecordReader(ctx context.Context, alloc
memory.Allocator, ld gosnowflake
}
}()
+ group.SetLimit(prefetchConcurrency)
group.Go(func() error {
defer rr.Release()
defer r.Close()
diff --git a/go/adbc/driver/snowflake/statement.go
b/go/adbc/driver/snowflake/statement.go
index be554780..e0d14582 100644
--- a/go/adbc/driver/snowflake/statement.go
+++ b/go/adbc/driver/snowflake/statement.go
@@ -33,13 +33,15 @@ import (
)
const (
- OptionStatementQueueSize = "adbc.rpc.result_queue_size"
+ OptionStatementQueueSize = "adbc.rpc.result_queue_size"
+ OptionStatementPrefetchConcurrency =
"adbc.snowflake.rpc.prefetch_concurrency"
)
type statement struct {
- cnxn *cnxn
- alloc memory.Allocator
- queueSize int
+ cnxn *cnxn
+ alloc memory.Allocator
+ queueSize int
+ prefetchConcurrency int
query string
targetTable string
@@ -97,7 +99,28 @@ func (st *statement) SetOption(key string, val string) error
{
Code: adbc.StatusInvalidArgument,
}
}
+ if sz <= 0 {
+ return adbc.Error{
+ Msg: fmt.Sprintf("invalid value ('%d') for
option '%s', must be > 0", sz, key),
+ Code: adbc.StatusInvalidArgument,
+ }
+ }
st.queueSize = sz
+ case OptionStatementPrefetchConcurrency:
+ concurrency, err := strconv.Atoi(val)
+ if err != nil {
+ return adbc.Error{
+ Msg: fmt.Sprintf("could not parse '%s' as int
for option '%s'", val, key),
+ Code: adbc.StatusInvalidArgument,
+ }
+ }
+ if concurrency <= 0 {
+ return adbc.Error{
+ Msg: fmt.Sprintf("invalid value ('%d') for
option '%s', must be > 0", concurrency, key),
+ Code: adbc.StatusInvalidArgument,
+ }
+ }
+ st.prefetchConcurrency = concurrency
default:
return adbc.Error{
Msg: fmt.Sprintf("invalid statement option %s=%s",
key, val),
@@ -436,7 +459,7 @@ func (st *statement) ExecuteQuery(ctx context.Context)
(array.RecordReader, int6
return nil, -1, errToAdbcErr(adbc.StatusInternal, err)
}
- rdr, err := newRecordReader(ctx, st.alloc, loader, st.queueSize)
+ rdr, err := newRecordReader(ctx, st.alloc, loader, st.queueSize,
st.prefetchConcurrency)
nrec := loader.TotalRows()
return rdr, nrec, err
}
diff --git a/go/adbc/go.mod b/go/adbc/go.mod
index f3d5ce06..999a8d47 100644
--- a/go/adbc/go.mod
+++ b/go/adbc/go.mod
@@ -24,7 +24,7 @@ require (
github.com/bluele/gcache v0.0.2
github.com/google/uuid v1.3.0
github.com/snowflakedb/gosnowflake v1.6.22
- github.com/stretchr/testify v1.8.2
+ github.com/stretchr/testify v1.8.4
golang.org/x/exp v0.0.0-20230713183714-613f0c0eb8a1
golang.org/x/sync v0.3.0
golang.org/x/tools v0.11.0
@@ -41,7 +41,7 @@ require (
github.com/JohnCGriffin/overflow v0.0.0-20211019200055-46fa312c352c //
indirect
github.com/andybalholm/brotli v1.0.5 // indirect
github.com/apache/arrow/go/v12 v12.0.1 // indirect
- github.com/apache/thrift v0.16.0 // indirect
+ github.com/apache/thrift v0.17.0 // indirect
github.com/aws/aws-sdk-go-v2 v1.19.0 // indirect
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.4.10 //
indirect
github.com/aws/aws-sdk-go-v2/credentials v1.13.27 // indirect
diff --git a/go/adbc/go.sum b/go/adbc/go.sum
index 1793e472..7e47f67f 100644
--- a/go/adbc/go.sum
+++ b/go/adbc/go.sum
@@ -19,8 +19,8 @@ github.com/apache/arrow/go/v12 v12.0.1
h1:JsR2+hzYYjgSUkBSaahpqCetqZMr76djX80fF/
github.com/apache/arrow/go/v12 v12.0.1/go.mod
h1:weuTY7JvTG/HDPtMQxEUp7pU73vkLWMLpY67QwZ/WWw=
github.com/apache/arrow/go/v13 v13.0.0-20230713180941-b97597765355
h1:QuXqLb2HzL5EjY99fFp+iG9NagAruvQIbU/2++x+2VY=
github.com/apache/arrow/go/v13 v13.0.0-20230713180941-b97597765355/go.mod
h1:W69eByFNO0ZR30q1/7Sr9d83zcVZmF2MiP3fFYAWJOc=
-github.com/apache/thrift v0.16.0
h1:qEy6UW60iVOlUy+b9ZR0d5WzUWYGOo4HfopoyBaNmoY=
-github.com/apache/thrift v0.16.0/go.mod
h1:PHK3hniurgQaNMZYaCLEqXKsYK8upmhPbmdP2FXSqgU=
+github.com/apache/thrift v0.17.0
h1:cMd2aj52n+8VoAtvSvLn4kDC3aZ6IAkBuqWQ2IDu7wo=
+github.com/apache/thrift v0.17.0/go.mod
h1:OLxhMRJxomX+1I/KUw03qoV3mMz16BwaKI+d4fPBx7Q=
github.com/aws/aws-sdk-go-v2 v1.19.0
h1:klAT+y3pGFBU/qVf1uzwttpBbiuozJYWzNLHioyDJ+k=
github.com/aws/aws-sdk-go-v2 v1.19.0/go.mod
h1:uzbQtefpm44goOPmdKyAlXSNcwlRgF3ePWVW6EtJvvw=
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.4.10
h1:dK82zF6kkPeCo8J1e+tGx4JdvDIQzj7ygIoLg8WMuGs=
@@ -80,7 +80,6 @@ github.com/goccy/go-json v0.10.2/go.mod
h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MG
github.com/godbus/dbus v0.0.0-20190726142602-4481cbc300e2
h1:ZpnhV/YsD2/4cESfV5+Hoeu/iUR3ruzNvZ+yQfO03a0=
github.com/godbus/dbus v0.0.0-20190726142602-4481cbc300e2/go.mod
h1:bBOAhwG1umN6/6ZUMtDFBMQR8jRg9O75tm9K00oMsK4=
github.com/golang-jwt/jwt/v4 v4.5.0
h1:7cYmW1XlMY7h7ii7UhUyChSgS5wUJEnm9uZVTGqOWzg=
-github.com/golang/mock v1.5.0/go.mod
h1:CWnOUgYIOo4TcNZ0wHX3YZCqsaM1I1Jvs6v3mP3KVu8=
github.com/golang/protobuf v1.5.0/go.mod
h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
github.com/golang/protobuf v1.5.3
h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg=
github.com/golang/protobuf v1.5.3/go.mod
h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
@@ -138,35 +137,23 @@ github.com/sirupsen/logrus v1.9.3/go.mod
h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVs
github.com/snowflakedb/gosnowflake v1.6.23-0.20230717195239-fec38ba82d2a
h1:F7fKVj3t12jr3Bopzngsp/PZDm1or8zpk+29NN4YFGk=
github.com/snowflakedb/gosnowflake
v1.6.23-0.20230717195239-fec38ba82d2a/go.mod
h1:KfO4F7bk+aXPUIvBqYxvPhxLlu2/w4TtSC8Rw/yr5Mg=
github.com/stretchr/objx v0.1.0/go.mod
h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
-github.com/stretchr/objx v0.4.0/go.mod
h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0 h1:1zr/of2m5FGMsad5YfcqgdqdWrIhu+EBEJRhR1U7z/c=
-github.com/stretchr/objx v0.5.0/go.mod
h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
github.com/stretchr/testify v1.7.0/go.mod
h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
-github.com/stretchr/testify v1.7.1/go.mod
h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
-github.com/stretchr/testify v1.8.0/go.mod
h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
-github.com/stretchr/testify v1.8.2
h1:+h33VjcLVPDHtOdpUCuF+7gSuG3yGIftsP1YvFihtJ8=
-github.com/stretchr/testify v1.8.2/go.mod
h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
+github.com/stretchr/testify v1.8.4
h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
+github.com/stretchr/testify v1.8.4/go.mod
h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/zeebo/assert v1.3.0 h1:g7C04CbJuIDKNPFHmsk4hwZDO5O+kntRxzaUoNXj+IQ=
github.com/zeebo/xxh3 v1.0.2 h1:xZmwmqxHZA8AI603jOQ0tMqmBr9lPeFwGg6d+xy9DC0=
github.com/zeebo/xxh3 v1.0.2/go.mod
h1:5NWz9Sef7zIDm2JHfFlcQvNekmcEl9ekUZQQKCYaDcA=
-golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod
h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
-golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod
h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.11.0 h1:6Ewdq3tDic1mg5xRO4milcWCfMVQhI4NkqWWvqejpuA=
golang.org/x/crypto v0.11.0/go.mod
h1:xgJhtzW8F9jGdVFWZESrid1U1bjeNy4zgy5cRr/CIio=
golang.org/x/exp v0.0.0-20230713183714-613f0c0eb8a1
h1:MGwJjxBy0HJshjDNfLsYO8xppfqWlA5ZT9OhtUUhTNw=
golang.org/x/exp v0.0.0-20230713183714-613f0c0eb8a1/go.mod
h1:FXUEEKJgO7OQYeo8N01OfiKP8RXMtf6e8aTskBGqWdc=
-golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.12.0 h1:rmsUpXtvNzj340zd98LZ4KntptpfRHwpFOHG188oHXc=
golang.org/x/mod v0.12.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
-golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod
h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
-golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod
h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.12.0 h1:cfawfvKITfUsFCeJIHJrbSxpeu/E81khclypR0GVT50=
golang.org/x/net v0.12.0/go.mod h1:zEVYFnQC7m/vmpQFELhcD1EWkZlX69l4oqgmer6hfKA=
-golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod
h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.3.0 h1:ftCYgMx6zT/asHUrPw8BLLscYtGznsLAnjq5RH9P66E=
golang.org/x/sync v0.3.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y=
-golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod
h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
-golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod
h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210616045830-e2b7044e8c71/go.mod
h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod
h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
@@ -175,14 +162,10 @@ golang.org/x/sys v0.10.0
h1:SqMFp9UcQJZa+pmYuAKjd9xq1f0j5rLcDIk0mj4qAsA=
golang.org/x/sys v0.10.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.10.0 h1:3R7pNqamzBraeqj/Tj8qt1aQ2HpmlC+Cx/qL/7hn4/c=
golang.org/x/term v0.10.0/go.mod
h1:lpqdcUyK/oCiQxvxVrppt5ggO2KCZ5QblwqPnfZ6d5o=
-golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.11.0 h1:LAntKIrcmeSKERyiOh0XMV39LXS8IE9UL2yP7+f5ij4=
golang.org/x/text v0.11.0/go.mod
h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
-golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod
h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.11.0 h1:EMCa6U9S2LtZXLAMoWiR/R8dAQFRqbAitmbJ2UKhoi8=
golang.org/x/tools v0.11.0/go.mod
h1:anzJrxPjNtfgiYQYirP2CPGzGLxrH2u2QBhn6Bf3qY8=
-golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod
h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
-golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod
h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod
h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2
h1:H2TDz8ibqkAF6YGhCdN3jS9O0/s90v0rJh3X/OLHEUk=
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2/go.mod
h1:K8+ghG5WaK9qNqU5K3HdILfMLy1f3aNYFI/wnl100a8=
diff --git a/python/adbc_driver_snowflake/adbc_driver_snowflake/__init__.py
b/python/adbc_driver_snowflake/adbc_driver_snowflake/__init__.py
index daf88084..65c6acbc 100644
--- a/python/adbc_driver_snowflake/adbc_driver_snowflake/__init__.py
+++ b/python/adbc_driver_snowflake/adbc_driver_snowflake/__init__.py
@@ -90,8 +90,11 @@ class DatabaseOptions(enum.Enum):
class StatementOptions(enum.Enum):
"""Statement options specific to the Snowflake driver."""
- #: The number of rows per batch. Defaults to 1024.
- BATCH_ROWS = "adbc.rpc.result_queue_size"
+ #: The number of batches queued up at a time. Defaults to 200.
+ RESULT_QUEUE_SIZE = "adbc.rpc.result_queue_size"
+ #: Number of concurrent streams being prefetched for a result set.
+ #: Defaults to 10.
+ PREFETCH_CONCURRENCY = "adbc.snowflake.rpc.prefetch_concurrency"
def connect(
diff --git a/python/adbc_driver_snowflake/tests/test_lowlevel.py
b/python/adbc_driver_snowflake/tests/test_lowlevel.py
index 18d1e778..0d8c9bee 100644
--- a/python/adbc_driver_snowflake/tests/test_lowlevel.py
+++ b/python/adbc_driver_snowflake/tests/test_lowlevel.py
@@ -49,7 +49,7 @@ def test_options(snowflake):
with adbc_driver_manager.AdbcStatement(snowflake) as stmt:
stmt.set_options(
**{
- adbc_driver_snowflake.StatementOptions.BATCH_ROWS.value: "1",
+
adbc_driver_snowflake.StatementOptions.RESULT_QUEUE_SIZE.value: "1",
}
)
stmt.set_sql_query("SELECT 1")