This is an automated email from the ASF dual-hosted git repository.
curth pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-adbc.git
The following commit(s) were added to refs/heads/main by this push:
new 1f8b159fe feat(go/adbc/driver/snowflake): Add option to disable vectorized scanner (#3555)
1f8b159fe is described below
commit 1f8b159fe159dc84e53076782daaf0ce2174e7fa
Author: Curt Hagenlocher <[email protected]>
AuthorDate: Mon Oct 13 10:42:27 2025 -0700
feat(go/adbc/driver/snowflake): Add option to disable vectorized scanner (#3555)
The vectorized scanner seems to be causing performance issues under some
circumstances. Add an option to disable it.
---
go/adbc/driver/snowflake/bulk_ingestion.go | 22 +++++++++++++++++++++-
go/adbc/driver/snowflake/statement.go      | 11 +++++++++++
2 files changed, 32 insertions(+), 1 deletion(-)
diff --git a/go/adbc/driver/snowflake/bulk_ingestion.go b/go/adbc/driver/snowflake/bulk_ingestion.go
index 4415aea3e..1dfd4d6b4 100644
--- a/go/adbc/driver/snowflake/bulk_ingestion.go
+++ b/go/adbc/driver/snowflake/bulk_ingestion.go
@@ -48,7 +48,8 @@ import (
const (
bindStageName = "ADBC$BIND"
- createTemporaryStageStmt = "CREATE OR REPLACE TEMPORARY STAGE " + bindStageName + " FILE_FORMAT = (TYPE = PARQUET USE_LOGICAL_TYPE = TRUE BINARY_AS_TEXT = FALSE USE_VECTORIZED_SCANNER=TRUE REPLACE_INVALID_CHARACTERS = TRUE)"
+ createTemporaryStageTmpl = "CREATE OR REPLACE TEMPORARY STAGE " + bindStageName + " FILE_FORMAT = (TYPE = PARQUET USE_LOGICAL_TYPE = TRUE BINARY_AS_TEXT = FALSE %s REPLACE_INVALID_CHARACTERS = TRUE)"
+ vectorizedScannerOption = "USE_VECTORIZED_SCANNER=TRUE"
putQueryTmpl = "PUT 'file:///tmp/placeholder/%s' @" + bindStageName + " OVERWRITE = TRUE"
copyQuery = "COPY INTO IDENTIFIER(?) FROM @" + bindStageName + " MATCH_BY_COLUMN_NAME = CASE_INSENSITIVE"
countQuery = "SELECT COUNT(*) FROM IDENTIFIER(?)"
@@ -64,6 +65,8 @@ var (
defaultCompressionCodec compress.Compression = compress.Codecs.Snappy
defaultCompressionLevel int = flate.DefaultCompression
+ defaultVectorizedScanner bool = true
+
ErrNoRecordsInStream = errors.New("no records in stream to write")
)
@@ -110,6 +113,10 @@ type ingestOptions struct {
// notably Snappy.
// Default is the default level for the specified compressionCodec.
compressionLevel int
+ // Whether to use the vectorized scanner when ingesting Parquet files into Snowflake.
+ //
+ // Default is true.
+ vectorizedScanner bool
}
func DefaultIngestOptions() *ingestOptions {
@@ -120,6 +127,7 @@ func DefaultIngestOptions() *ingestOptions {
copyConcurrency: defaultCopyConcurrency,
compressionCodec: defaultCompressionCodec,
compressionLevel: defaultCompressionLevel,
+ vectorizedScanner: defaultVectorizedScanner,
}
}
@@ -171,6 +179,12 @@ func (st *statement) ingestRecord(ctx context.Context) (nrows int64, err error)
})
// Create a temporary stage, we can't start uploading until it has been created
+ var createTemporaryStageStmt string
+ if st.ingestOptions.vectorizedScanner {
+         createTemporaryStageStmt = fmt.Sprintf(createTemporaryStageTmpl, vectorizedScannerOption)
+ } else {
+         createTemporaryStageStmt = fmt.Sprintf(createTemporaryStageTmpl, "")
+ }
_, err = st.cnxn.cn.ExecContext(ctx, createTemporaryStageStmt, nil)
if err != nil {
return
@@ -267,6 +281,12 @@ func (st *statement) ingestStream(ctx context.Context) (nrows int64, err error)
})
// Create a temporary stage, we can't start uploading until it has been created
+ var createTemporaryStageStmt string
+ if st.ingestOptions.vectorizedScanner {
+         createTemporaryStageStmt = fmt.Sprintf(createTemporaryStageTmpl, vectorizedScannerOption)
+ } else {
+         createTemporaryStageStmt = fmt.Sprintf(createTemporaryStageTmpl, "")
+ }
_, err = st.cnxn.cn.ExecContext(ctx, createTemporaryStageStmt, nil)
if err != nil {
return
diff --git a/go/adbc/driver/snowflake/statement.go b/go/adbc/driver/snowflake/statement.go
index 6b7e09560..65a60c8b9 100644
--- a/go/adbc/driver/snowflake/statement.go
+++ b/go/adbc/driver/snowflake/statement.go
@@ -46,6 +46,7 @@ const (
OptionStatementIngestTargetFileSize = "adbc.snowflake.statement.ingest_target_file_size"
OptionStatementIngestCompressionCodec = "adbc.snowflake.statement.ingest_compression_codec" // TODO(GH-1473): Implement option
OptionStatementIngestCompressionLevel = "adbc.snowflake.statement.ingest_compression_level" // TODO(GH-1473): Implement option
+ OptionStatementVectorizedScanner = "adbc.snowflake.statement.ingest_use_vectorized_scanner"
)
type statement struct {
@@ -228,6 +229,16 @@ func (st *statement) SetOption(key string, val string) error {
Code: adbc.StatusInvalidArgument,
}
}
+ case OptionStatementVectorizedScanner:
+ vectorized, err := strconv.ParseBool(val)
+ if err != nil {
+ return adbc.Error{
+                 Msg: fmt.Sprintf("[Snowflake] could not parse '%s' as bool for option '%s'", val, key),
+ Code: adbc.StatusInvalidArgument,
+ }
+ }
+ st.ingestOptions.vectorizedScanner = vectorized
+ return nil
default:
return st.Base().SetOption(key, val)
}
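
A minimal usage sketch of the new option, not part of the patch itself: the
exported constant OptionStatementVectorizedScanner and its boolean parsing are
taken from the diff above, while the driver/database/connection setup, the
placeholder DSN, and the arrow-go module version are illustrative assumptions.

    package main

    import (
        "context"
        "log"

        "github.com/apache/arrow-adbc/go/adbc"
        "github.com/apache/arrow-adbc/go/adbc/driver/snowflake"
        "github.com/apache/arrow-go/v18/arrow/memory" // assumption: match the version in your go.mod
    )

    func main() {
        ctx := context.Background()

        drv := snowflake.NewDriver(memory.DefaultAllocator)
        db, err := drv.NewDatabase(map[string]string{
            adbc.OptionKeyURI: "user:password@account/db/schema", // placeholder DSN
        })
        if err != nil {
            log.Fatal(err)
        }
        defer db.Close()

        cnxn, err := db.Open(ctx)
        if err != nil {
            log.Fatal(err)
        }
        defer cnxn.Close()

        stmt, err := cnxn.NewStatement()
        if err != nil {
            log.Fatal(err)
        }
        defer stmt.Close()

        // "false" disables USE_VECTORIZED_SCANNER on the temporary stage
        // used for bulk ingestion; the default remains enabled.
        if err := stmt.SetOption(snowflake.OptionStatementVectorizedScanner, "false"); err != nil {
            log.Fatal(err)
        }
    }

Any value accepted by strconv.ParseBool works ("true", "false", "1", "0", ...);
anything else is rejected with StatusInvalidArgument, per the SetOption hunk
above.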
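For reference, a sketch of how the rewritten stage DDL renders under each
setting, mirroring the if/else added to ingestRecord and ingestStream (the
constants are copied from the diff; the standalone program is only an
illustration):

    package main

    import "fmt"

    const (
        bindStageName            = "ADBC$BIND"
        createTemporaryStageTmpl = "CREATE OR REPLACE TEMPORARY STAGE " + bindStageName + " FILE_FORMAT = (TYPE = PARQUET USE_LOGICAL_TYPE = TRUE BINARY_AS_TEXT = FALSE %s REPLACE_INVALID_CHARACTERS = TRUE)"
        vectorizedScannerOption  = "USE_VECTORIZED_SCANNER=TRUE"
    )

    func main() {
        // vectorizedScanner == true (the default): the scanner is enabled explicitly.
        fmt.Printf(createTemporaryStageTmpl+"\n", vectorizedScannerOption)

        // vectorizedScanner == false: the %s placeholder renders as an empty string.
        fmt.Printf(createTemporaryStageTmpl+"\n", "")
    }

When disabled, the statement simply omits USE_VECTORIZED_SCANNER, leaving
Snowflake's server-side default for that file format option in effect; the
resulting doubled space is harmless to Snowflake's parser.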