This is an automated email from the ASF dual-hosted git repository.

curth pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-adbc.git


The following commit(s) were added to refs/heads/main by this push:
     new 1f8b159fe feat(go/adbc/driver/snowflake): Add option to disable 
vectorized scanner (#3555)
1f8b159fe is described below

commit 1f8b159fe159dc84e53076782daaf0ce2174e7fa
Author: Curt Hagenlocher <[email protected]>
AuthorDate: Mon Oct 13 10:42:27 2025 -0700

    feat(go/adbc/driver/snowflake): Add option to disable vectorized scanner 
(#3555)
    
    The vectorized scanner seems to be causing performance issues under some
    circumstances. Add an option to disable it.
---
 go/adbc/driver/snowflake/bulk_ingestion.go | 22 +++++++++++++++++++++-
 go/adbc/driver/snowflake/statement.go      | 11 +++++++++++
 2 files changed, 32 insertions(+), 1 deletion(-)

diff --git a/go/adbc/driver/snowflake/bulk_ingestion.go 
b/go/adbc/driver/snowflake/bulk_ingestion.go
index 4415aea3e..1dfd4d6b4 100644
--- a/go/adbc/driver/snowflake/bulk_ingestion.go
+++ b/go/adbc/driver/snowflake/bulk_ingestion.go
@@ -48,7 +48,8 @@ import (
 
 const (
        bindStageName            = "ADBC$BIND"
-       createTemporaryStageStmt = "CREATE OR REPLACE TEMPORARY STAGE " + 
bindStageName + " FILE_FORMAT = (TYPE = PARQUET USE_LOGICAL_TYPE = TRUE 
BINARY_AS_TEXT = FALSE USE_VECTORIZED_SCANNER=TRUE REPLACE_INVALID_CHARACTERS = 
TRUE)"
+       createTemporaryStageTmpl = "CREATE OR REPLACE TEMPORARY STAGE " + 
bindStageName + " FILE_FORMAT = (TYPE = PARQUET USE_LOGICAL_TYPE = TRUE 
BINARY_AS_TEXT = FALSE %s REPLACE_INVALID_CHARACTERS = TRUE)"
+       vectorizedScannerOption  = "USE_VECTORIZED_SCANNER=TRUE"
        putQueryTmpl             = "PUT 'file:///tmp/placeholder/%s' @" + 
bindStageName + " OVERWRITE = TRUE"
        copyQuery                = "COPY INTO IDENTIFIER(?) FROM @" + 
bindStageName + " MATCH_BY_COLUMN_NAME = CASE_INSENSITIVE"
        countQuery               = "SELECT COUNT(*) FROM IDENTIFIER(?)"
@@ -64,6 +65,8 @@ var (
        defaultCompressionCodec compress.Compression = compress.Codecs.Snappy
        defaultCompressionLevel int                  = flate.DefaultCompression
 
+       defaultVectorizedScanner bool = true
+
        ErrNoRecordsInStream = errors.New("no records in stream to write")
 )
 
@@ -110,6 +113,10 @@ type ingestOptions struct {
        // notably Snappy.
        // Default is the default level for the specified compressionCodec.
        compressionLevel int
+       // Whether to use the vectorized scanner when ingesting Parquet files 
into Snowvake.
+       //
+       // Default is true.
+       vectorizedScanner bool
 }
 
 func DefaultIngestOptions() *ingestOptions {
@@ -120,6 +127,7 @@ func DefaultIngestOptions() *ingestOptions {
                copyConcurrency:   defaultCopyConcurrency,
                compressionCodec:  defaultCompressionCodec,
                compressionLevel:  defaultCompressionLevel,
+               vectorizedScanner: defaultVectorizedScanner,
        }
 }
 
@@ -171,6 +179,12 @@ func (st *statement) ingestRecord(ctx context.Context) 
(nrows int64, err error)
        })
 
        // Create a temporary stage, we can't start uploading until it has been 
created
+       var createTemporaryStageStmt string
+       if st.ingestOptions.vectorizedScanner {
+               createTemporaryStageStmt = 
fmt.Sprintf(createTemporaryStageTmpl, vectorizedScannerOption)
+       } else {
+               createTemporaryStageStmt = 
fmt.Sprintf(createTemporaryStageTmpl, "")
+       }
        _, err = st.cnxn.cn.ExecContext(ctx, createTemporaryStageStmt, nil)
        if err != nil {
                return
@@ -267,6 +281,12 @@ func (st *statement) ingestStream(ctx context.Context) 
(nrows int64, err error)
        })
 
        // Create a temporary stage, we can't start uploading until it has been 
created
+       var createTemporaryStageStmt string
+       if st.ingestOptions.vectorizedScanner {
+               createTemporaryStageStmt = 
fmt.Sprintf(createTemporaryStageTmpl, vectorizedScannerOption)
+       } else {
+               createTemporaryStageStmt = 
fmt.Sprintf(createTemporaryStageTmpl, "")
+       }
        _, err = st.cnxn.cn.ExecContext(ctx, createTemporaryStageStmt, nil)
        if err != nil {
                return
diff --git a/go/adbc/driver/snowflake/statement.go 
b/go/adbc/driver/snowflake/statement.go
index 6b7e09560..65a60c8b9 100644
--- a/go/adbc/driver/snowflake/statement.go
+++ b/go/adbc/driver/snowflake/statement.go
@@ -46,6 +46,7 @@ const (
        OptionStatementIngestTargetFileSize    = 
"adbc.snowflake.statement.ingest_target_file_size"
        OptionStatementIngestCompressionCodec  = 
"adbc.snowflake.statement.ingest_compression_codec" // TODO(GH-1473): Implement 
option
        OptionStatementIngestCompressionLevel  = 
"adbc.snowflake.statement.ingest_compression_level" // TODO(GH-1473): Implement 
option
+       OptionStatementVectorizedScanner       = 
"adbc.snowflake.statement.ingest_use_vectorized_scanner"
 )
 
 type statement struct {
@@ -228,6 +229,16 @@ func (st *statement) SetOption(key string, val string) 
error {
                                Code: adbc.StatusInvalidArgument,
                        }
                }
+       case OptionStatementVectorizedScanner:
+               vectorized, err := strconv.ParseBool(val)
+               if err != nil {
+                       return adbc.Error{
+                               Msg:  fmt.Sprintf("[Snowflake] could not parse 
'%s' as bool for option '%s'", val, key),
+                               Code: adbc.StatusInvalidArgument,
+                       }
+               }
+               st.ingestOptions.vectorizedScanner = vectorized
+               return nil
        default:
                return st.Base().SetOption(key, val)
        }

Reply via email to