zeroshade commented on code in PR #3325:
URL: https://github.com/apache/arrow-adbc/pull/3325#discussion_r2326147581


##########
go/adbc/driver/databricks/cloudfetch_e2e_test.go:
##########
@@ -0,0 +1,496 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//go:build integration
+// +build integration
+
+package databricks_test
+
+import (
+       "context"
+       "testing"
+       "time"
+
+       "github.com/apache/arrow-adbc/go/adbc/driver/databricks"
+       "github.com/apache/arrow-go/v18/arrow/memory"
+       "github.com/stretchr/testify/assert"
+       "github.com/stretchr/testify/require"
+)
+
+// TestCloudFetchE2E_BasicConnection tests basic connectivity
+// CloudFetch is handled automatically by the databricks-sql-go driver
+func TestCloudFetchE2E_BasicConnection(t *testing.T) {
+       host, token, httpPath, catalog, schema := getDatabricksConfig(t)
+
+       driver := databricks.NewDriver(memory.DefaultAllocator)
+
+       opts := map[string]string{
+               databricks.OptionServerHostname: host,
+               databricks.OptionHTTPPath:       httpPath,
+               databricks.OptionAccessToken:    token,
+               databricks.OptionCatalog:        catalog,
+               databricks.OptionSchema:         schema,
+       }
+
+       db, err := driver.NewDatabase(opts)
+       require.NoError(t, err)
+       defer func() { _ = db.Close() }()

Review Comment:
   you should use `validation.CheckedClose` for this instead of just dropping/ignoring the result
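
   A minimal sketch of the suggestion (assuming the `CheckedClose` helper in the `validation` package takes the test handle and an `io.Closer`):

   ```go
   import "github.com/apache/arrow-adbc/go/adbc/validation"

   db, err := driver.NewDatabase(opts)
   require.NoError(t, err)
   defer validation.CheckedClose(t, db) // fails the test if Close returns an error
   ```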



##########
go/adbc/driver/databricks/connection.go:
##########
@@ -0,0 +1,290 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package databricks
+
+import (
+       "context"
+       "database/sql"
+       "fmt"
+
+       "github.com/apache/arrow-adbc/go/adbc"
+       "github.com/apache/arrow-adbc/go/adbc/driver/internal/driverbase"
+       "github.com/apache/arrow-go/v18/arrow"
+       "github.com/apache/arrow-go/v18/arrow/array"
+       "github.com/apache/arrow-go/v18/arrow/memory"
+       _ "github.com/databricks/databricks-sql-go"
+)
+
+type connectionImpl struct {
+       driverbase.ConnectionImplBase
+
+       // Connection settings
+       catalog  string
+       dbSchema string
+
+       // Database connection
+       conn *sql.Conn
+}
+
+func (c *connectionImpl) Close() error {
+       if c.conn == nil {
+               return adbc.Error{Code: adbc.StatusInvalidState}
+       }
+       defer func() {
+               c.conn = nil
+       }()
+       return c.conn.Close()
+}
+
+func (c *connectionImpl) NewStatement() (adbc.Statement, error) {
+       return &statementImpl{
+               conn: c,
+       }, nil
+}
+
+// Autocommit interface implementation
+func (c *connectionImpl) GetAutocommit() bool {
+       // Databricks SQL doesn't support explicit transaction control in the same way
+       // as traditional databases. Most operations are implicitly committed.
+       return true
+}
+
+func (c *connectionImpl) SetAutocommit(autocommit bool) error {
+       // Databricks SQL doesn't support explicit transaction control in the same way
+       // as traditional databases. Most operations are implicitly committed.
+       if !autocommit {
+               return adbc.Error{
+                       Code: adbc.StatusNotImplemented,
+                       Msg:  fmt.Sprintf("disabling autocommit is not supported"),
+               }
+       } else {
+               return nil
+       }
+}
+
+// CurrentNamespacer interface implementation
+func (c *connectionImpl) GetCurrentCatalog() (string, error) {
+       return c.catalog, nil
+}
+
+func (c *connectionImpl) GetCurrentDbSchema() (string, error) {
+       return c.dbSchema, nil
+}
+
+func (c *connectionImpl) SetCurrentCatalog(catalog string) error {
+       // Use the database to execute USE CATALOG
+       if c.conn != nil && catalog != "" {
+               _, err := c.conn.ExecContext(context.TODO(), "USE CATALOG 
`%s`", catalog)
+               if err != nil {
+                       return adbc.Error{
+                               Code: adbc.StatusInternal,
+                               Msg:  fmt.Sprintf("failed to set catalog: %v", 
err),
+                       }
+               }
+       }
+       c.catalog = catalog
+       return nil
+}
+
+func (c *connectionImpl) SetCurrentDbSchema(schema string) error {
+       // Use the database to execute USE SCHEMA
+       if c.conn != nil && schema != "" {
+               _, err := c.conn.ExecContext(context.TODO(), "USE SCHEMA `%s`", 
schema)
+               if err != nil {
+                       return adbc.Error{
+                               Code: adbc.StatusInternal,
+                               Msg:  fmt.Sprintf("failed to set schema: %v", 
err),
+                       }
+               }
+       }
+       c.dbSchema = schema
+       return nil
+}
+
+// TableTypeLister interface implementation
+func (c *connectionImpl) ListTableTypes(ctx context.Context) ([]string, error) {
+       // Databricks supports these table types
+       return []string{"TABLE", "VIEW", "EXTERNAL_TABLE", "MANAGED_TABLE"}, nil
+}
+
+func (c *connectionImpl) GetTableTypes(ctx context.Context) (array.RecordReader, error) {
+       // Databricks supports these table types
+       tableTypes := []string{"TABLE", "VIEW", "EXTERNAL_TABLE", "MANAGED_TABLE"}
+
+       // Create Arrow schema for table types
+       schema := arrow.NewSchema([]arrow.Field{
+               {Name: "table_type", Type: arrow.BinaryTypes.String},
+       }, nil)
+
+       // Create record batch
+       bldr := array.NewRecordBuilder(memory.DefaultAllocator, schema)

Review Comment:
   use `c.Alloc` instead of `memory.DefaultAllocator` to maintain consistency
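
   For example (assuming the embedded `driverbase.ConnectionImplBase` exposes its allocator as `Alloc`, as the other driverbase-backed drivers do):

   ```go
   // Build the record with the connection's configured allocator.
   bldr := array.NewRecordBuilder(c.Alloc, schema)
   defer bldr.Release()
   ```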



##########
go/adbc/driver/databricks/connection.go:
##########
@@ -0,0 +1,290 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package databricks
+
+import (
+       "context"
+       "database/sql"
+       "fmt"
+
+       "github.com/apache/arrow-adbc/go/adbc"
+       "github.com/apache/arrow-adbc/go/adbc/driver/internal/driverbase"
+       "github.com/apache/arrow-go/v18/arrow"
+       "github.com/apache/arrow-go/v18/arrow/array"
+       "github.com/apache/arrow-go/v18/arrow/memory"
+       _ "github.com/databricks/databricks-sql-go"
+)
+
+type connectionImpl struct {
+       driverbase.ConnectionImplBase
+
+       // Connection settings
+       catalog  string
+       dbSchema string
+
+       // Database connection
+       conn *sql.Conn
+}
+
+func (c *connectionImpl) Close() error {
+       if c.conn == nil {
+               return adbc.Error{Code: adbc.StatusInvalidState}
+       }
+       defer func() {
+               c.conn = nil
+       }()
+       return c.conn.Close()
+}
+
+func (c *connectionImpl) NewStatement() (adbc.Statement, error) {
+       return &statementImpl{
+               conn: c,
+       }, nil
+}
+
+// Autocommit interface implementation
+func (c *connectionImpl) GetAutocommit() bool {
+       // Databricks SQL doesn't support explicit transaction control in the same way
+       // as traditional databases. Most operations are implicitly committed.
+       return true
+}
+
+func (c *connectionImpl) SetAutocommit(autocommit bool) error {
+       // Databricks SQL doesn't support explicit transaction control in the same way
+       // as traditional databases. Most operations are implicitly committed.
+       if !autocommit {
+               return adbc.Error{
+                       Code: adbc.StatusNotImplemented,
+                       Msg:  fmt.Sprintf("disabling autocommit is not supported"),
+               }
+       } else {
+               return nil
+       }
+}
+
+// CurrentNamespacer interface implementation
+func (c *connectionImpl) GetCurrentCatalog() (string, error) {
+       return c.catalog, nil
+}
+
+func (c *connectionImpl) GetCurrentDbSchema() (string, error) {
+       return c.dbSchema, nil
+}
+
+func (c *connectionImpl) SetCurrentCatalog(catalog string) error {
+       // Use the database to execute USE CATALOG
+       if c.conn != nil && catalog != "" {
+               _, err := c.conn.ExecContext(context.TODO(), "USE CATALOG 
`%s`", catalog)

Review Comment:
   use `context.Background()` here for now
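
   A sketch of that change; note the original also hands the `%s` pattern straight to `ExecContext` without interpolating it, so a `fmt.Sprintf` is assumed here:

   ```go
   query := fmt.Sprintf("USE CATALOG `%s`", catalog)
   if _, err := c.conn.ExecContext(context.Background(), query); err != nil {
       return adbc.Error{
           Code: adbc.StatusInternal,
           Msg:  fmt.Sprintf("failed to set catalog: %v", err),
       }
   }
   ```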



##########
go/adbc/driver/databricks/ipc_reader_adapter.go:
##########
@@ -0,0 +1,199 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package databricks
+
+import (
+       "bytes"
+       "context"
+       "fmt"
+       "io"
+
+       "github.com/apache/arrow-go/v18/arrow"
+       "github.com/apache/arrow-go/v18/arrow/array"
+       "github.com/apache/arrow-go/v18/arrow/ipc"
+       dbsqlrows "github.com/databricks/databricks-sql-go/rows"
+)
+
+// Check if the rows interface supports IPC streams
+type rowsWithIPCStream interface {
+       GetArrowIPCStreams(context.Context) (dbsqlrows.ArrowIPCStreamIterator, error)
+}
+
+// ipcReaderAdapter uses the new IPC stream interface for Arrow access
+type ipcReaderAdapter struct {
+       ipcIterator   dbsqlrows.ArrowIPCStreamIterator
+       currentReader *ipc.Reader
+       currentRecord arrow.Record
+       schema        *arrow.Schema
+       closed        bool
+       refCount      int64
+}
+
+// newIPCReaderAdapter creates a RecordReader using direct IPC stream access
+func newIPCReaderAdapter(ctx context.Context, rows dbsqlrows.Rows) (array.RecordReader, error) {
+       // Check if rows supports IPC streams
+       ipcRows, ok := rows.(rowsWithIPCStream)
+       if !ok {
+               return nil, fmt.Errorf("databricks rows do not support IPC 
stream access")
+       }
+
+       // Get IPC stream iterator
+       ipcIterator, err := ipcRows.GetArrowIPCStreams(ctx)
+       if err != nil {
+               return nil, fmt.Errorf("failed to get IPC streams: %w", err)
+       }
+
+       schema_bytes, err := ipcIterator.SchemaBytes()
+       if err != nil {
+               return nil, fmt.Errorf("failed to get schema bytes: %w", err)
+       }
+
+       // Read schema from bytes
+       reader, err := ipc.NewReader(bytes.NewReader(schema_bytes))
+       defer reader.Release()
+
+       if err != nil {
+               return nil, fmt.Errorf("failed to create schema reader: %w", 
err)
+       }
+
+       schema := reader.Schema()
+       if schema == nil {
+               return nil, fmt.Errorf("schema is nil")
+       }
+
+       adapter := &ipcReaderAdapter{
+               refCount:    1,
+               ipcIterator: ipcIterator,
+               schema:      schema,
+       }
+
+       // Initialize the first reader
+       err = adapter.loadNextReader()
+       if err != nil && err != io.EOF {
+               return nil, fmt.Errorf("failed to initialize IPC reader: %w", 
err)
+       }
+       return adapter, nil
+}
+
+func (r *ipcReaderAdapter) loadNextReader() error {
+       if r.currentReader != nil {
+               r.currentReader.Release()
+               r.currentReader = nil
+       }
+
+       // Get next IPC stream
+       if !r.ipcIterator.HasNext() {
+               return io.EOF
+       }
+
+       ipcStream, err := r.ipcIterator.Next()
+       if err != nil {
+               return err
+       }
+
+       // Create IPC reader from stream
+       reader, err := ipc.NewReader(ipcStream)
+       if err != nil {
+               return fmt.Errorf("failed to create IPC reader: %w", err)
+       }
+
+       r.currentReader = reader
+
+       return nil
+}
+
+// Implement array.RecordReader interface
+func (r *ipcReaderAdapter) Schema() *arrow.Schema {
+       return r.schema
+}
+
+func (r *ipcReaderAdapter) Next() bool {
+       if r.closed {
+               return false
+       }
+
+       // Release previous record
+       if r.currentRecord != nil {
+               r.currentRecord.Release()
+               r.currentRecord = nil
+       }
+
+       // Try to get next record from current reader
+       if r.currentReader != nil && r.currentReader.Next() {
+               r.currentRecord = r.currentReader.Record()
+               r.currentRecord.Retain()
+               return true
+       }
+
+       // Need to load next IPC stream
+       err := r.loadNextReader()
+       if err != nil {
+               // Err() will return `r.currentReader.Err()` which contains this error
+               return false
+       }
+
+       // Try again with new reader
+       if r.currentReader != nil && r.currentReader.Next() {
+               r.currentRecord = r.currentReader.Record()
+               r.currentRecord.Retain()
+               return true
+       }
+
+       return false
+}
+
+func (r *ipcReaderAdapter) Record() arrow.Record {
+       return r.currentRecord
+}
+
+func (r *ipcReaderAdapter) Release() {
+       r.refCount -= 1

Review Comment:
   this should use `atomic.AddInt64` to ensure we avoid race conditions, i.e.
   
   ```go
   if atomic.AddInt64(&r.refCount, -1) <= 0 {
   ```
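
   A fuller sketch with the matching `Retain` side (assumes adding `"sync/atomic"` to the imports):

   ```go
   func (r *ipcReaderAdapter) Retain() {
       atomic.AddInt64(&r.refCount, 1)
   }

   func (r *ipcReaderAdapter) Release() {
       // Only tear down state once the last reference is dropped.
       if atomic.AddInt64(&r.refCount, -1) <= 0 {
           // release currentRecord/currentReader, close the iterator, etc.
       }
   }
   ```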



##########
go/adbc/driver/databricks/connection.go:
##########
@@ -0,0 +1,290 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package databricks
+
+import (
+       "context"
+       "database/sql"
+       "fmt"
+
+       "github.com/apache/arrow-adbc/go/adbc"
+       "github.com/apache/arrow-adbc/go/adbc/driver/internal/driverbase"
+       "github.com/apache/arrow-go/v18/arrow"
+       "github.com/apache/arrow-go/v18/arrow/array"
+       "github.com/apache/arrow-go/v18/arrow/memory"
+       _ "github.com/databricks/databricks-sql-go"
+)
+
+type connectionImpl struct {
+       driverbase.ConnectionImplBase
+
+       // Connection settings
+       catalog  string
+       dbSchema string
+
+       // Database connection
+       conn *sql.Conn
+}
+
+func (c *connectionImpl) Close() error {
+       if c.conn == nil {
+               return adbc.Error{Code: adbc.StatusInvalidState}
+       }
+       defer func() {
+               c.conn = nil
+       }()
+       return c.conn.Close()
+}
+
+func (c *connectionImpl) NewStatement() (adbc.Statement, error) {
+       return &statementImpl{
+               conn: c,
+       }, nil
+}
+
+// Autocommit interface implementation
+func (c *connectionImpl) GetAutocommit() bool {
+       // Databricks SQL doesn't support explicit transaction control in the same way
+       // as traditional databases. Most operations are implicitly committed.
+       return true
+}
+
+func (c *connectionImpl) SetAutocommit(autocommit bool) error {
+       // Databricks SQL doesn't support explicit transaction control in the same way
+       // as traditional databases. Most operations are implicitly committed.
+       if !autocommit {
+               return adbc.Error{
+                       Code: adbc.StatusNotImplemented,
+                       Msg:  fmt.Sprintf("disabling autocommit is not supported"),
+               }
+       } else {
+               return nil
+       }
+}
+
+// CurrentNamespacer interface implementation
+func (c *connectionImpl) GetCurrentCatalog() (string, error) {
+       return c.catalog, nil
+}
+
+func (c *connectionImpl) GetCurrentDbSchema() (string, error) {
+       return c.dbSchema, nil
+}
+
+func (c *connectionImpl) SetCurrentCatalog(catalog string) error {
+       // Use the database to execute USE CATALOG
+       if c.conn != nil && catalog != "" {
+               _, err := c.conn.ExecContext(context.TODO(), "USE CATALOG 
`%s`", catalog)
+               if err != nil {
+                       return adbc.Error{
+                               Code: adbc.StatusInternal,
+                               Msg:  fmt.Sprintf("failed to set catalog: %v", 
err),
+                       }
+               }
+       }
+       c.catalog = catalog
+       return nil
+}
+
+func (c *connectionImpl) SetCurrentDbSchema(schema string) error {
+       // Use the database to execute USE SCHEMA
+       if c.conn != nil && schema != "" {
+               _, err := c.conn.ExecContext(context.TODO(), "USE SCHEMA `%s`", 
schema)

Review Comment:
   same as above, use `context.Background()`



##########
go/adbc/driver/databricks/statement.go:
##########
@@ -0,0 +1,253 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package databricks
+
+import (
+       "context"
+       "database/sql"
+       "database/sql/driver"
+       "fmt"
+       "reflect"
+
+       "github.com/apache/arrow-adbc/go/adbc"
+       "github.com/apache/arrow-go/v18/arrow"
+       "github.com/apache/arrow-go/v18/arrow/array"
+
+       dbsqlrows "github.com/databricks/databricks-sql-go/rows"
+)
+
+type statementImpl struct {
+       conn       *connectionImpl
+       query      string
+       parameters []interface{}
+       prepared   *sql.Stmt
+}
+
+func (s *statementImpl) Close() error {
+       if s.conn == nil {
+               return adbc.Error{
+                       Msg:  "statement already closed",
+                       Code: adbc.StatusInvalidState,
+               }
+       }
+       if s.prepared != nil {
+               return s.prepared.Close()
+       }
+       return nil
+}
+
+func (s *statementImpl) SetOption(key, val string) error {
+       // No statement-specific options are supported yet
+       return adbc.Error{
+               Code: adbc.StatusNotImplemented,
+               Msg:  fmt.Sprintf("unsupported statement option: %s", key),
+       }
+}
+
+func (s *statementImpl) SetSqlQuery(query string) error {
+       s.query = query
+       // Reset prepared statement if query changes
+       if s.prepared != nil {
+               _ = s.prepared.Close() // Ignore error on cleanup

Review Comment:
   why ignore the error on cleanup?
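
   One way to surface it instead of dropping it (a sketch, not necessarily the final shape):

   ```go
   func (s *statementImpl) SetSqlQuery(query string) error {
       s.query = query
       // Reset prepared statement if query changes
       if s.prepared != nil {
           err := s.prepared.Close()
           s.prepared = nil
           if err != nil {
               return adbc.Error{
                   Code: adbc.StatusInternal,
                   Msg:  fmt.Sprintf("failed to close prepared statement: %v", err),
               }
           }
       }
       return nil
   }
   ```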



##########
go/adbc/driver/databricks/database.go:
##########
@@ -0,0 +1,406 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package databricks
+
+import (
+       "context"
+       "database/sql"
+       "fmt"
+       "strconv"
+       "strings"
+       "time"
+
+       "crypto/tls"
+       "crypto/x509"
+       "net/http"
+       "os"
+
+       "github.com/apache/arrow-adbc/go/adbc"
+       "github.com/apache/arrow-adbc/go/adbc/driver/internal/driverbase"
+       dbsql "github.com/databricks/databricks-sql-go"
+)
+
+const DEFAULT_PORT = 443
+const DEFAULT_RETRY_WAIT_MIN = 1 * time.Second
+const DEFAULT_RETRY_WAIT_MAX = 30 * time.Second
+
+type databaseImpl struct {
+       driverbase.DatabaseImplBase
+
+       // Connection Pool
+       db           *sql.DB
+       needsRefresh bool // Whether we need to re-initialize
+
+       // Connection parameters
+       serverHostname string
+       httpPath       string
+       accessToken    string
+       port           string
+       catalog        string
+       schema         string
+
+       // Query options
+       queryTimeout        time.Duration
+       maxRows             int
+       queryRetryCount     int
+       downloadThreadCount int
+
+       // TLS/SSL options
+       sslMode     string
+       sslRootCert string
+
+       // OAuth options (for future expansion)
+       oauthClientID     string
+       oauthClientSecret string
+       oauthRefreshToken string
+}
+
+func (d *databaseImpl) resolveConnectionOptions() ([]dbsql.ConnOption, error) {
+       if d.serverHostname == "" {
+               return nil, adbc.Error{
+                       Code: adbc.StatusInvalidArgument,
+                       Msg:  "server hostname is required",
+               }
+       }
+
+       if d.httpPath == "" {
+               return nil, adbc.Error{
+                       Code: adbc.StatusInvalidArgument,
+                       Msg:  "HTTP path is required",
+               }
+       }
+
+       // FIXME: Support other auth methods
+       if d.accessToken == "" {
+               return nil, adbc.Error{
+                       Code: adbc.StatusInvalidArgument,
+                       Msg:  "access token is required",
+               }
+       }
+
+       opts := []dbsql.ConnOption{
+               dbsql.WithAccessToken(d.accessToken),
+               dbsql.WithServerHostname(d.serverHostname),
+               dbsql.WithHTTPPath(d.httpPath),
+       }
+
+       // Validate and set custom port
+       // Defaults to 443
+       if d.port != "" {
+               port, err := strconv.Atoi(d.port)
+               if err != nil || port < 1 || port > 65535 {
+                       return nil, adbc.Error{
+                               Code: adbc.StatusInvalidArgument,
+                               Msg:  "invalid port number",
+                       }
+               }
+               opts = append(opts, dbsql.WithPort(port))
+       } else {
+               opts = append(opts, dbsql.WithPort(DEFAULT_PORT))
+       }
+
+       // Default namespace for queries (catalog/schema)
+       if d.catalog != "" || d.schema != "" {
+               opts = append(opts, dbsql.WithInitialNamespace(d.catalog, d.schema))
+       }
+
+       if d.queryTimeout > 0 {
+               opts = append(opts, dbsql.WithTimeout(d.queryTimeout))
+       }
+
+       if d.maxRows > 0 {
+               opts = append(opts, dbsql.WithMaxRows(int(d.maxRows)))
+       }
+       if d.queryRetryCount >= 0 {
+               opts = append(opts, dbsql.WithRetries(d.queryRetryCount, DEFAULT_RETRY_WAIT_MIN, DEFAULT_RETRY_WAIT_MAX))
+       }
+       if d.downloadThreadCount > 0 {
+               opts = append(opts, dbsql.WithMaxDownloadThreads(d.downloadThreadCount))
+       }
+
+       // TLS/SSL handling
+       if d.sslMode != "" || d.sslRootCert != "" {
+               var tlsConfig *tls.Config
+
+               // Handle custom root certificate
+               if d.sslRootCert != "" {
+                       caCert, err := os.ReadFile(d.sslRootCert)
+                       if err != nil {
+                               return nil, adbc.Error{
+                                       Code: adbc.StatusInvalidArgument,
+                                       Msg:  fmt.Sprintf("failed to read SSL 
root certificate: %v", err),
+                               }
+                       }
+
+                       caCertPool := x509.NewCertPool()
+                       if !caCertPool.AppendCertsFromPEM(caCert) {
+                               return nil, adbc.Error{
+                                       Code: adbc.StatusInvalidArgument,
+                                       Msg:  "failed to parse SSL root 
certificate",
+                               }
+                       }
+
+                       tlsConfig = &tls.Config{
+                               RootCAs:    caCertPool,
+                               MinVersion: tls.VersionTLS12,
+                       }
+               }
+
+               // Handle SSL mode
+               if d.sslMode != "" {
+                       switch strings.ToLower(d.sslMode) {
+                       case "insecure":
+                               if tlsConfig == nil {
+                                       tlsConfig = &tls.Config{MinVersion: 
tls.VersionTLS12}
+                               }
+                               tlsConfig.InsecureSkipVerify = true
+                       case "require":
+                               // Default behavior - full TLS verification
+                               if tlsConfig == nil {
+                                       tlsConfig = &tls.Config{MinVersion: 
tls.VersionTLS12}
+                               }
+                       default:
+                               return nil, adbc.Error{
+                                       Code: adbc.StatusInvalidArgument,
+                                       Msg:  fmt.Sprintf("invalid SSL mode: %s 
(supported: 'require', 'insecure')", d.sslMode),
+                               }
+                       }
+               }
+
+               // Apply custom TLS config if we have one
+               if tlsConfig != nil {
+                       transport := &http.Transport{
+                               TLSClientConfig: tlsConfig,
+                       }
+                       opts = append(opts, dbsql.WithTransport(transport))
+               }
+       }
+
+       return opts, nil
+}
+
+func (d *databaseImpl) initializeConnectionPool(ctx context.Context) (*sql.DB, error) {
+       opts, err := d.resolveConnectionOptions()
+
+       if err != nil {
+               return nil, err
+       }
+
+       connector, err := dbsql.NewConnector(opts...)
+
+       if err != nil {
+               return nil, err
+       }
+
+       db := sql.OpenDB(connector)
+
+       // Test the connection
+       if err := db.PingContext(ctx); err != nil {
+               _ = db.Close() // Ignore error on cleanup

Review Comment:
   instead of ignoring this error, shouldn't we propagate it?
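
   For example, both errors could be surfaced together (`errors.Join`, assuming Go 1.20+):

   ```go
   if err := db.PingContext(ctx); err != nil {
       // Report the ping failure along with any failure closing the pool.
       return nil, errors.Join(err, db.Close())
   }
   ```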



##########
go/adbc/driver/databricks/connection.go:
##########
@@ -0,0 +1,290 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package databricks
+
+import (
+       "context"
+       "database/sql"
+       "fmt"
+
+       "github.com/apache/arrow-adbc/go/adbc"
+       "github.com/apache/arrow-adbc/go/adbc/driver/internal/driverbase"
+       "github.com/apache/arrow-go/v18/arrow"
+       "github.com/apache/arrow-go/v18/arrow/array"
+       "github.com/apache/arrow-go/v18/arrow/memory"
+       _ "github.com/databricks/databricks-sql-go"
+)
+
+type connectionImpl struct {
+       driverbase.ConnectionImplBase
+
+       // Connection settings
+       catalog  string
+       dbSchema string
+
+       // Database connection
+       conn *sql.Conn
+}
+
+func (c *connectionImpl) Close() error {
+       if c.conn == nil {
+               return adbc.Error{Code: adbc.StatusInvalidState}
+       }
+       defer func() {
+               c.conn = nil
+       }()
+       return c.conn.Close()
+}
+
+func (c *connectionImpl) NewStatement() (adbc.Statement, error) {
+       return &statementImpl{
+               conn: c,
+       }, nil
+}
+
+// Autocommit interface implementation
+func (c *connectionImpl) GetAutocommit() bool {
+       // Databricks SQL doesn't support explicit transaction control in the same way
+       // as traditional databases. Most operations are implicitly committed.
+       return true
+}
+
+func (c *connectionImpl) SetAutocommit(autocommit bool) error {
+       // Databricks SQL doesn't support explicit transaction control in the same way
+       // as traditional databases. Most operations are implicitly committed.
+       if !autocommit {
+               return adbc.Error{
+                       Code: adbc.StatusNotImplemented,
+                       Msg:  fmt.Sprintf("disabling autocommit is not supported"),
+               }
+       } else {
+               return nil
+       }
+}
+
+// CurrentNamespacer interface implementation
+func (c *connectionImpl) GetCurrentCatalog() (string, error) {
+       return c.catalog, nil
+}
+
+func (c *connectionImpl) GetCurrentDbSchema() (string, error) {
+       return c.dbSchema, nil
+}
+
+func (c *connectionImpl) SetCurrentCatalog(catalog string) error {
+       // Use the database to execute USE CATALOG
+       if c.conn != nil && catalog != "" {
+               _, err := c.conn.ExecContext(context.TODO(), "USE CATALOG 
`%s`", catalog)
+               if err != nil {
+                       return adbc.Error{
+                               Code: adbc.StatusInternal,
+                               Msg:  fmt.Sprintf("failed to set catalog: %v", 
err),
+                       }
+               }
+       }
+       c.catalog = catalog
+       return nil
+}
+
+func (c *connectionImpl) SetCurrentDbSchema(schema string) error {
+       // Use the database to execute USE SCHEMA
+       if c.conn != nil && schema != "" {
+               _, err := c.conn.ExecContext(context.TODO(), "USE SCHEMA `%s`", 
schema)
+               if err != nil {
+                       return adbc.Error{
+                               Code: adbc.StatusInternal,
+                               Msg:  fmt.Sprintf("failed to set schema: %v", 
err),
+                       }
+               }
+       }
+       c.dbSchema = schema
+       return nil
+}
+
+// TableTypeLister interface implementation
+func (c *connectionImpl) ListTableTypes(ctx context.Context) ([]string, error) {
+       // Databricks supports these table types
+       return []string{"TABLE", "VIEW", "EXTERNAL_TABLE", "MANAGED_TABLE"}, nil
+}
+
+func (c *connectionImpl) GetTableTypes(ctx context.Context) (array.RecordReader, error) {
+       // Databricks supports these table types
+       tableTypes := []string{"TABLE", "VIEW", "EXTERNAL_TABLE", "MANAGED_TABLE"}
+
+       // Create Arrow schema for table types
+       schema := arrow.NewSchema([]arrow.Field{
+               {Name: "table_type", Type: arrow.BinaryTypes.String},
+       }, nil)
+
+       // Create record batch
+       bldr := array.NewRecordBuilder(memory.DefaultAllocator, schema)
+       defer bldr.Release()
+
+       tableTypeBuilder := bldr.Field(0).(*array.StringBuilder)
+       for _, tableType := range tableTypes {
+               tableTypeBuilder.Append(tableType)
+       }
+
+       rec := bldr.NewRecord()
+       defer rec.Release()
+
+       reader, err := array.NewRecordReader(schema, []arrow.Record{rec})
+       if err != nil {
+               return nil, err
+       }
+       return reader, nil
+}
+
+// Transaction methods (Databricks has limited transaction support)
+func (c *connectionImpl) Commit(ctx context.Context) error {
+       // Most operations are auto-committed.
+       return nil
+}
+
+func (c *connectionImpl) Rollback(ctx context.Context) error {
+       // Databricks SQL doesn't support explicit transactions in the traditional sense.
+       // Most operations are auto-committed. We'll track state but not perform any operation.
+       return adbc.Error{
+               Code: adbc.StatusNotImplemented,
+               Msg:  fmt.Sprintf("rollback is not supported"),
+       }
+}
+
+// DbObjectsEnumerator interface implementation
+func (c *connectionImpl) GetCatalogs(ctx context.Context, catalogFilter *string) ([]string, error) {
+       query := "SHOW CATALOGS"
+       if catalogFilter != nil {
+               query += fmt.Sprintf(" LIKE '%s'", *catalogFilter)
+       }
+
+       rows, err := c.conn.QueryContext(ctx, query)
+       if err != nil {
+               return nil, adbc.Error{
+                       Code: adbc.StatusInternal,
+                       Msg:  fmt.Sprintf("failed to query catalogs: %v", err),
+               }
+       }
+       defer func() { _ = rows.Close() }()
+
+       var catalogs []string
+       for rows.Next() {
+               var catalog string
+               if err := rows.Scan(&catalog); err != nil {
+                       return nil, adbc.Error{
+                               Code: adbc.StatusInternal,
+                               Msg:  fmt.Sprintf("failed to scan catalog: %v", 
err),
+                       }
+               }
+               catalogs = append(catalogs, catalog)
+       }
+
+       return catalogs, rows.Err()
+}
+
+func (c *connectionImpl) GetDBSchemasForCatalog(ctx context.Context, catalog string, schemaFilter *string) ([]string, error) {
+       query := fmt.Sprintf("SHOW SCHEMAS IN `%s`", catalog)
+       if schemaFilter != nil {
+               query += fmt.Sprintf(" LIKE '%s'", *schemaFilter)
+       }
+
+       rows, err := c.conn.QueryContext(ctx, query)
+       if err != nil {
+               return nil, adbc.Error{
+                       Code: adbc.StatusInternal,
+                       Msg:  fmt.Sprintf("failed to query schemas: %v", err),
+               }
+       }
+       defer func() { _ = rows.Close() }()
+
+       var schemas []string
+       for rows.Next() {
+               var schema string
+               if err := rows.Scan(&schema); err != nil {
+                       return nil, adbc.Error{
+                               Code: adbc.StatusInternal,
+                               Msg:  fmt.Sprintf("failed to scan schema: %v", 
err),
+                       }
+               }
+               schemas = append(schemas, schema)
+       }
+
+       return schemas, rows.Err()
+}
+
+func (c *connectionImpl) GetTablesForDBSchema(ctx context.Context, catalog string, schema string, tableFilter *string, columnFilter *string, includeColumns bool) ([]driverbase.TableInfo, error) {
+       query := fmt.Sprintf("SHOW TABLES IN `%s`.`%s`", catalog, schema)
+       if tableFilter != nil {
+               query += fmt.Sprintf(" LIKE '%s'", *tableFilter)
+       }
+
+       rows, err := c.conn.QueryContext(ctx, query)
+       if err != nil {
+               return nil, adbc.Error{
+                       Code: adbc.StatusInternal,
+                       Msg:  fmt.Sprintf("failed to query tables: %v", err),
+               }
+       }
+       defer func() { _ = rows.Close() }()
+
+       var tables []driverbase.TableInfo
+       for rows.Next() {
+               var database, tableName, isTemporary string
+               if err := rows.Scan(&database, &tableName, &isTemporary); err 
!= nil {
+                       return nil, adbc.Error{
+                               Code: adbc.StatusInternal,
+                               Msg:  fmt.Sprintf("failed to scan table: %v", 
err),
+                       }
+               }
+
+               tableInfo := driverbase.TableInfo{
+                       TableName:        tableName,
+                       TableType:        "TABLE", // Default to TABLE, could be improved with more detailed queries
+                       TableColumns:     nil,     // Schema would need separate query
+                       TableConstraints: nil,     // Constraints would need separate query
+               }
+
+               tables = append(tables, tableInfo)
+       }
+
+       return tables, rows.Err()
+}
+
+func (c *connectionImpl) GetObjects(ctx context.Context, depth adbc.ObjectDepth, catalog *string, dbSchema *string, tableName *string, columnName *string, tableType []string) (array.RecordReader, error) {
+       // This is a simplified implementation. A full implementation would need to:
+       // 1. Query INFORMATION_SCHEMA or system tables
+       // 2. Build proper Arrow record structure
+       // 3. Handle all the filtering parameters
+
+       // For now, return empty result
+       schema := arrow.NewSchema([]arrow.Field{
+               {Name: "catalog_name", Type: arrow.BinaryTypes.String},
+               {Name: "catalog_db_schemas", Type: 
arrow.ListOf(arrow.BinaryTypes.String)},
+       }, nil)
+
+       bldr := array.NewRecordBuilder(memory.DefaultAllocator, schema)

Review Comment:
   same as above, this should be `c.Alloc`



##########
go/adbc/driver/databricks/statement.go:
##########
@@ -0,0 +1,253 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package databricks
+
+import (
+       "context"
+       "database/sql"
+       "database/sql/driver"
+       "fmt"
+       "reflect"
+
+       "github.com/apache/arrow-adbc/go/adbc"
+       "github.com/apache/arrow-go/v18/arrow"
+       "github.com/apache/arrow-go/v18/arrow/array"
+
+       dbsqlrows "github.com/databricks/databricks-sql-go/rows"
+)
+
+type statementImpl struct {
+       conn       *connectionImpl
+       query      string
+       parameters []interface{}
+       prepared   *sql.Stmt
+}
+
+func (s *statementImpl) Close() error {
+       if s.conn == nil {
+               return adbc.Error{
+                       Msg:  "statement already closed",
+                       Code: adbc.StatusInvalidState,
+               }
+       }
+       if s.prepared != nil {
+               return s.prepared.Close()
+       }
+       return nil
+}
+
+func (s *statementImpl) SetOption(key, val string) error {
+       // No statement-specific options are supported yet
+       return adbc.Error{
+               Code: adbc.StatusNotImplemented,
+               Msg:  fmt.Sprintf("unsupported statement option: %s", key),
+       }
+}
+
+func (s *statementImpl) SetSqlQuery(query string) error {
+       s.query = query
+       // Reset prepared statement if query changes
+       if s.prepared != nil {
+               _ = s.prepared.Close() // Ignore error on cleanup
+               s.prepared = nil
+       }
+       return nil
+}
+
+func (s *statementImpl) Prepare(ctx context.Context) error {
+       if s.query == "" {
+               return adbc.Error{
+                       Code: adbc.StatusInvalidState,
+                       Msg:  "no query set",
+               }
+       }
+
+       stmt, err := s.conn.conn.PrepareContext(ctx, s.query)
+       if err != nil {
+               return adbc.Error{
+                       Code: adbc.StatusInvalidState,
+                       Msg:  fmt.Sprintf("failed to prepare statement: %v", 
err),
+               }
+       }
+
+       s.prepared = stmt
+       return nil
+}
+
+func (s *statementImpl) ExecuteQuery(ctx context.Context) (array.RecordReader, int64, error) {
+       if s.query == "" {
+               return nil, -1, adbc.Error{
+                       Code: adbc.StatusInvalidState,
+                       Msg:  "no query set",
+               }
+       }
+
+       // Execute query using raw driver interface to get Arrow batches
+       var driverRows driver.Rows
+       var err error
+       err = s.conn.conn.Raw(func(driverConn interface{}) error {
+               // Use raw driver interface for direct Arrow access
+               if queryerCtx, ok := driverConn.(driver.QueryerContext); ok {
+                       // Convert parameters to driver.NamedValue slice
+                       var driverArgs []driver.NamedValue
+                       for i, param := range s.parameters {
+                               driverArgs = append(driverArgs, driver.NamedValue{
+                                       Ordinal: i + 1,
+                                       Value:   param,
+                               })
+                       }
+                       driverRows, err = queryerCtx.QueryContext(ctx, s.query, driverArgs)
+                       return err
+               }
+               return fmt.Errorf("driver does not support QueryerContext")
+       })
+
+       if err != nil {
+               return nil, -1, adbc.Error{
+                       Code: adbc.StatusInternal,
+                       Msg:  fmt.Sprintf("failed to execute query: %v", err),
+               }
+       }
+       defer func() { _ = driverRows.Close() }()
+
+       // Convert to databricks rows interface to get Arrow batches
+       databricksRows, ok := driverRows.(dbsqlrows.Rows)
+       if !ok {
+               return nil, -1, adbc.Error{
+                       Code: adbc.StatusInternal,
+                       Msg:  "driver rows do not support Arrow batches",
+               }
+       }
+
+       // Use the IPC stream interface (zero-copy)
+       reader, err := newIPCReaderAdapter(ctx, databricksRows)
+       if err != nil {
+               return nil, -1, adbc.Error{
+                       Code: adbc.StatusInternal,
+                       Msg:  fmt.Sprintf("failed to create IPC reader adapter: 
%v", err),
+               }
+       }
+
+       // Return -1 for rowsAffected (unknown) since we can't count without consuming
+       // The ADBC spec allows -1 to indicate "unknown number of rows affected"
+       return reader, -1, nil
+}
+
+func (s *statementImpl) ExecuteUpdate(ctx context.Context) (int64, error) {
+       var result sql.Result
+       var err error
+
+       if s.prepared != nil {
+               result, err = s.prepared.ExecContext(ctx, s.parameters...)
+       } else if s.query != "" {
+               result, err = s.conn.conn.ExecContext(ctx, s.query, s.parameters...)
+       } else {
+               return -1, adbc.Error{
+                       Code: adbc.StatusInvalidState,
+                       Msg:  "no query set",
+               }
+       }
+
+       if err != nil {
+               return -1, adbc.Error{
+                       Code: adbc.StatusInternal,
+                       Msg:  fmt.Sprintf("failed to execute update: %v", err),
+               }
+       }
+
+       rowsAffected, err := result.RowsAffected()
+       if err != nil {
+               return -1, adbc.Error{
+                       Code: adbc.StatusInternal,
+                       Msg:  fmt.Sprintf("failed to get rows affected: %v", 
err),
+               }
+       }
+
+       return rowsAffected, nil
+}
+
+func (s *statementImpl) Bind(ctx context.Context, values arrow.Record) error {
+       return adbc.Error{
+               Msg:  "Bind not yet implemented for Databricks driver",
+               Code: adbc.StatusNotImplemented,
+       }
+}
+
+func (s *statementImpl) BindStream(ctx context.Context, stream array.RecordReader) error {
+       return adbc.Error{
+               Msg:  "Bind not yet implemented for Databricks driver",
+               Code: adbc.StatusNotImplemented,
+       }
+}
+
+func (s *statementImpl) GetParameterSchema() (*arrow.Schema, error) {
+       // This would require parsing the SQL query to determine parameter types
+       // For now, return nil to indicate unknown schema
+       return nil, adbc.Error{
+               Code: adbc.StatusNotImplemented,
+               Msg:  "parameter schema detection not implemented",
+       }
+}
+
+func (s *statementImpl) SetSubstraitPlan(plan []byte) error {
+       // Databricks SQL doesn't support Substrait plans
+       return adbc.Error{
+               Code: adbc.StatusNotImplemented,
+               Msg:  "Substrait plans not supported",
+       }
+}
+
+func (s *statementImpl) ExecutePartitions(ctx context.Context) (*arrow.Schema, adbc.Partitions, int64, error) {
+       // Databricks SQL doesn't support partitioned result sets
+       return nil, adbc.Partitions{}, -1, adbc.Error{
+               Code: adbc.StatusNotImplemented,
+               Msg:  "partitioned result sets not supported",
+       }
+}
+
+func (s *statementImpl) arrowToGoValue(arr arrow.Array, idx int) (interface{}, error) {
+       if arr.IsNull(idx) {
+               return nil, nil
+       }
+
+       switch a := arr.(type) {
+       case *array.Boolean:
+               return a.Value(idx), nil
+       case *array.Int8:
+               return a.Value(idx), nil
+       case *array.Int16:
+               return a.Value(idx), nil
+       case *array.Int32:
+               return a.Value(idx), nil
+       case *array.Int64:
+               return a.Value(idx), nil
+       case *array.Float32:
+               return a.Value(idx), nil
+       case *array.Float64:
+               return a.Value(idx), nil
+       case *array.String:
+               return a.Value(idx), nil
+       case *array.Binary:
+               return a.Value(idx), nil

Review Comment:
   You can use `arrow.TypedArray[[]byte]` to cover all the types which return `[]byte`, such as Binary/LargeBinary/BinaryView etc.
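
   For instance, one case per element type can replace the per-array-type arms (a sketch, assuming `arrow.TypedArray[T]` exposes `Value(int) T`):

   ```go
   switch a := arr.(type) {
   case arrow.TypedArray[[]byte]:
       // Binary, LargeBinary, BinaryView, etc. all satisfy this.
       return a.Value(idx), nil
   case arrow.TypedArray[string]:
       // String, LargeString, StringView.
       return a.Value(idx), nil
   }
   ```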



##########
go/adbc/driver/databricks/statement.go:
##########
@@ -0,0 +1,253 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package databricks
+
+import (
+       "context"
+       "database/sql"
+       "database/sql/driver"
+       "fmt"
+       "reflect"
+
+       "github.com/apache/arrow-adbc/go/adbc"
+       "github.com/apache/arrow-go/v18/arrow"
+       "github.com/apache/arrow-go/v18/arrow/array"
+
+       dbsqlrows "github.com/databricks/databricks-sql-go/rows"
+)
+
+type statementImpl struct {
+       conn       *connectionImpl
+       query      string
+       parameters []interface{}
+       prepared   *sql.Stmt
+}
+
+func (s *statementImpl) Close() error {
+       if s.conn == nil {
+               return adbc.Error{
+                       Msg:  "statement already closed",
+                       Code: adbc.StatusInvalidState,
+               }
+       }
+       if s.prepared != nil {
+               return s.prepared.Close()
+       }
+       return nil
+}
+
+func (s *statementImpl) SetOption(key, val string) error {
+       // No statement-specific options are supported yet
+       return adbc.Error{
+               Code: adbc.StatusNotImplemented,
+               Msg:  fmt.Sprintf("unsupported statement option: %s", key),
+       }
+}
+
+func (s *statementImpl) SetSqlQuery(query string) error {
+       s.query = query
+       // Reset prepared statement if query changes
+       if s.prepared != nil {
+               _ = s.prepared.Close() // Ignore error on cleanup
+               s.prepared = nil
+       }
+       return nil
+}
+
+func (s *statementImpl) Prepare(ctx context.Context) error {
+       if s.query == "" {
+               return adbc.Error{
+                       Code: adbc.StatusInvalidState,
+                       Msg:  "no query set",
+               }
+       }
+
+       stmt, err := s.conn.conn.PrepareContext(ctx, s.query)
+       if err != nil {
+               return adbc.Error{
+                       Code: adbc.StatusInvalidState,
+                       Msg:  fmt.Sprintf("failed to prepare statement: %v", 
err),
+               }
+       }
+
+       s.prepared = stmt
+       return nil
+}
+
+func (s *statementImpl) ExecuteQuery(ctx context.Context) (array.RecordReader, int64, error) {
+       if s.query == "" {
+               return nil, -1, adbc.Error{
+                       Code: adbc.StatusInvalidState,
+                       Msg:  "no query set",
+               }
+       }
+
+       // Execute query using raw driver interface to get Arrow batches
+       var driverRows driver.Rows
+       var err error
+       err = s.conn.conn.Raw(func(driverConn interface{}) error {
+               // Use raw driver interface for direct Arrow access
+               if queryerCtx, ok := driverConn.(driver.QueryerContext); ok {
+                       // Convert parameters to driver.NamedValue slice
+                       var driverArgs []driver.NamedValue
+                       for i, param := range s.parameters {
+                               driverArgs = append(driverArgs, driver.NamedValue{
+                                       Ordinal: i + 1,
+                                       Value:   param,
+                               })
+                       }
+                       driverRows, err = queryerCtx.QueryContext(ctx, s.query, driverArgs)
+                       return err
+               }
+               return fmt.Errorf("driver does not support QueryerContext")
+       })
+
+       if err != nil {
+               return nil, -1, adbc.Error{
+                       Code: adbc.StatusInternal,
+                       Msg:  fmt.Sprintf("failed to execute query: %v", err),
+               }
+       }
+       defer func() { _ = driverRows.Close() }()
+
+       // Convert to databricks rows interface to get Arrow batches
+       databricksRows, ok := driverRows.(dbsqlrows.Rows)
+       if !ok {
+               return nil, -1, adbc.Error{
+                       Code: adbc.StatusInternal,
+                       Msg:  "driver rows do not support Arrow batches",
+               }
+       }
+
+       // Use the IPC stream interface (zero-copy)
+       reader, err := newIPCReaderAdapter(ctx, databricksRows)
+       if err != nil {
+               return nil, -1, adbc.Error{
+                       Code: adbc.StatusInternal,
+                       Msg:  fmt.Sprintf("failed to create IPC reader adapter: 
%v", err),
+               }
+       }
+
+       // Return -1 for rowsAffected (unknown) since we can't count without consuming
+       // The ADBC spec allows -1 to indicate "unknown number of rows affected"
+       return reader, -1, nil
+}
+
+func (s *statementImpl) ExecuteUpdate(ctx context.Context) (int64, error) {
+       var result sql.Result
+       var err error
+
+       if s.prepared != nil {
+               result, err = s.prepared.ExecContext(ctx, s.parameters...)
+       } else if s.query != "" {
+               result, err = s.conn.conn.ExecContext(ctx, s.query, s.parameters...)
+       } else {
+               return -1, adbc.Error{
+                       Code: adbc.StatusInvalidState,
+                       Msg:  "no query set",
+               }
+       }
+
+       if err != nil {
+               return -1, adbc.Error{
+                       Code: adbc.StatusInternal,
+                       Msg:  fmt.Sprintf("failed to execute update: %v", err),
+               }
+       }
+
+       rowsAffected, err := result.RowsAffected()
+       if err != nil {
+               return -1, adbc.Error{
+                       Code: adbc.StatusInternal,
+                       Msg:  fmt.Sprintf("failed to get rows affected: %v", 
err),
+               }
+       }
+
+       return rowsAffected, nil
+}
+
+func (s *statementImpl) Bind(ctx context.Context, values arrow.Record) error {
+       return adbc.Error{
+               Msg:  "Bind not yet implemented for Databricks driver",
+               Code: adbc.StatusNotImplemented,
+       }
+}
+
+func (s *statementImpl) BindStream(ctx context.Context, stream array.RecordReader) error {
+       return adbc.Error{
+               Msg:  "Bind not yet implemented for Databricks driver",
+               Code: adbc.StatusNotImplemented,
+       }
+}
+
+func (s *statementImpl) GetParameterSchema() (*arrow.Schema, error) {
+       // This would require parsing the SQL query to determine parameter types
+       // For now, return nil to indicate unknown schema
+       return nil, adbc.Error{
+               Code: adbc.StatusNotImplemented,
+               Msg:  "parameter schema detection not implemented",
+       }
+}
+
+func (s *statementImpl) SetSubstraitPlan(plan []byte) error {
+       // Databricks SQL doesn't support Substrait plans
+       return adbc.Error{
+               Code: adbc.StatusNotImplemented,
+               Msg:  "Substrait plans not supported",
+       }
+}
+
+func (s *statementImpl) ExecutePartitions(ctx context.Context) (*arrow.Schema, adbc.Partitions, int64, error) {
+       // Databricks SQL doesn't support partitioned result sets
+       return nil, adbc.Partitions{}, -1, adbc.Error{
+               Code: adbc.StatusNotImplemented,
+               Msg:  "partitioned result sets not supported",
+       }
+}
+
+func (s *statementImpl) arrowToGoValue(arr arrow.Array, idx int) (interface{}, error) {

Review Comment:
   where is this used and what for?



##########
go/adbc/driver/databricks/statement.go:
##########
@@ -0,0 +1,253 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package databricks
+
+import (
+       "context"
+       "database/sql"
+       "database/sql/driver"
+       "fmt"
+       "reflect"
+
+       "github.com/apache/arrow-adbc/go/adbc"
+       "github.com/apache/arrow-go/v18/arrow"
+       "github.com/apache/arrow-go/v18/arrow/array"
+
+       dbsqlrows "github.com/databricks/databricks-sql-go/rows"
+)
+
+type statementImpl struct {
+       conn       *connectionImpl
+       query      string
+       parameters []interface{}
+       prepared   *sql.Stmt
+}
+
+func (s *statementImpl) Close() error {
+       if s.conn == nil {
+               return adbc.Error{
+                       Msg:  "statement already closed",
+                       Code: adbc.StatusInvalidState,
+               }
+       }
+       if s.prepared != nil {
+               return s.prepared.Close()
+       }
+       return nil
+}
+
+func (s *statementImpl) SetOption(key, val string) error {
+       // No statement-specific options are supported yet
+       return adbc.Error{
+               Code: adbc.StatusNotImplemented,
+               Msg:  fmt.Sprintf("unsupported statement option: %s", key),
+       }
+}
+
+func (s *statementImpl) SetSqlQuery(query string) error {
+       s.query = query
+       // Reset prepared statement if query changes
+       if s.prepared != nil {
+               _ = s.prepared.Close() // Ignore error on cleanup
+               s.prepared = nil
+       }
+       return nil
+}
+
+func (s *statementImpl) Prepare(ctx context.Context) error {
+       if s.query == "" {
+               return adbc.Error{
+                       Code: adbc.StatusInvalidState,
+                       Msg:  "no query set",
+               }
+       }
+
+       stmt, err := s.conn.conn.PrepareContext(ctx, s.query)
+       if err != nil {
+               return adbc.Error{
+                       Code: adbc.StatusInvalidState,
+                       Msg:  fmt.Sprintf("failed to prepare statement: %v", err),
+               }
+       }
+
+       s.prepared = stmt
+       return nil
+}
+
+func (s *statementImpl) ExecuteQuery(ctx context.Context) (array.RecordReader, int64, error) {
+       if s.query == "" {
+               return nil, -1, adbc.Error{
+                       Code: adbc.StatusInvalidState,
+                       Msg:  "no query set",
+               }
+       }
+
+       // Execute query using raw driver interface to get Arrow batches
+       var driverRows driver.Rows
+       var err error
+       err = s.conn.conn.Raw(func(driverConn interface{}) error {
+               // Use raw driver interface for direct Arrow access
+               if queryerCtx, ok := driverConn.(driver.QueryerContext); ok {
+                       // Convert parameters to driver.NamedValue slice
+                       var driverArgs []driver.NamedValue
+                       for i, param := range s.parameters {
+                               driverArgs = append(driverArgs, driver.NamedValue{
+                                       Ordinal: i + 1,
+                                       Value:   param,
+                               })
+                       }
+                       driverRows, err = queryerCtx.QueryContext(ctx, s.query, driverArgs)
+                       return err
+               }
+               return fmt.Errorf("driver does not support QueryerContext")
+       })
+
+       if err != nil {
+               return nil, -1, adbc.Error{
+                       Code: adbc.StatusInternal,
+                       Msg:  fmt.Sprintf("failed to execute query: %v", err),
+               }
+       }
+       defer func() { _ = driverRows.Close() }()
+
+       // Convert to databricks rows interface to get Arrow batches
+       databricksRows, ok := driverRows.(dbsqlrows.Rows)
+       if !ok {
+               return nil, -1, adbc.Error{
+                       Code: adbc.StatusInternal,
+                       Msg:  "driver rows do not support Arrow batches",
+               }
+       }
+
+       // Use the IPC stream interface (zero-copy)
+       reader, err := newIPCReaderAdapter(ctx, databricksRows)
+       if err != nil {
+               return nil, -1, adbc.Error{
+                       Code: adbc.StatusInternal,
+                       Msg:  fmt.Sprintf("failed to create IPC reader adapter: %v", err),
+               }
+       }
+
+       // Return -1 for rowsAffected (unknown) since we can't count without consuming
+       // The ADBC spec allows -1 to indicate "unknown number of rows affected"
+       return reader, -1, nil
+}
+
+func (s *statementImpl) ExecuteUpdate(ctx context.Context) (int64, error) {
+       var result sql.Result
+       var err error
+
+       if s.prepared != nil {
+               result, err = s.prepared.ExecContext(ctx, s.parameters...)
+       } else if s.query != "" {
+               result, err = s.conn.conn.ExecContext(ctx, s.query, s.parameters...)
+       } else {
+               return -1, adbc.Error{
+                       Code: adbc.StatusInvalidState,
+                       Msg:  "no query set",
+               }
+       }
+
+       if err != nil {
+               return -1, adbc.Error{
+                       Code: adbc.StatusInternal,
+                       Msg:  fmt.Sprintf("failed to execute update: %v", err),
+               }
+       }
+
+       rowsAffected, err := result.RowsAffected()
+       if err != nil {
+               return -1, adbc.Error{
+                       Code: adbc.StatusInternal,
+                       Msg:  fmt.Sprintf("failed to get rows affected: %v", err),
+               }
+       }
+
+       return rowsAffected, nil
+}
+
+func (s *statementImpl) Bind(ctx context.Context, values arrow.Record) error {
+       return adbc.Error{
+               Msg:  "Bind not yet implemented for Databricks driver",
+               Code: adbc.StatusNotImplemented,
+       }
+}
+
+func (s *statementImpl) BindStream(ctx context.Context, stream array.RecordReader) error {
+       return adbc.Error{
+               Msg:  "Bind not yet implemented for Databricks driver",
+               Code: adbc.StatusNotImplemented,
+       }
+}
+
+func (s *statementImpl) GetParameterSchema() (*arrow.Schema, error) {
+       // This would require parsing the SQL query to determine parameter types
+       // For now, return nil to indicate unknown schema
+       return nil, adbc.Error{
+               Code: adbc.StatusNotImplemented,
+               Msg:  "parameter schema detection not implemented",
+       }
+}
+
+func (s *statementImpl) SetSubstraitPlan(plan []byte) error {
+       // Databricks SQL doesn't support Substrait plans
+       return adbc.Error{
+               Code: adbc.StatusNotImplemented,
+               Msg:  "Substrait plans not supported",
+       }
+}
+
+func (s *statementImpl) ExecutePartitions(ctx context.Context) (*arrow.Schema, adbc.Partitions, int64, error) {
+       // Databricks SQL doesn't support partitioned result sets
+       return nil, adbc.Partitions{}, -1, adbc.Error{
+               Code: adbc.StatusNotImplemented,
+               Msg:  "partitioned result sets not supported",
+       }
+}
+
+func (s *statementImpl) arrowToGoValue(arr arrow.Array, idx int) (interface{}, error) {
+       if arr.IsNull(idx) {
+               return nil, nil
+       }
+
+       switch a := arr.(type) {
+       case *array.Boolean:
+               return a.Value(idx), nil
+       case *array.Int8:
+               return a.Value(idx), nil
+       case *array.Int16:
+               return a.Value(idx), nil
+       case *array.Int32:
+               return a.Value(idx), nil
+       case *array.Int64:
+               return a.Value(idx), nil

Review Comment:
   what about unsigned types?
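   
   For illustration only, a sketch of what the missing branches could look like, mirroring the pass-through pattern of the signed cases (the helper name is hypothetical; it assumes the file's existing `arrow`/`array`/`fmt` imports):
   
   ```go
   // unsignedToGoValue is a hypothetical extension of arrowToGoValue's
   // switch covering the unsigned Arrow integer arrays.
   func unsignedToGoValue(arr arrow.Array, idx int) (interface{}, error) {
           if arr.IsNull(idx) {
                   return nil, nil
           }
           switch a := arr.(type) {
           case *array.Uint8:
                   return a.Value(idx), nil
           case *array.Uint16:
                   return a.Value(idx), nil
           case *array.Uint32:
                   return a.Value(idx), nil
           case *array.Uint64:
                   return a.Value(idx), nil
           }
           return nil, fmt.Errorf("unsupported array type: %T", arr)
   }
   ```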



##########
go/adbc/driver/databricks/ipc_reader_adapter.go:
##########
@@ -0,0 +1,199 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package databricks
+
+import (
+       "bytes"
+       "context"
+       "fmt"
+       "io"
+
+       "github.com/apache/arrow-go/v18/arrow"
+       "github.com/apache/arrow-go/v18/arrow/array"
+       "github.com/apache/arrow-go/v18/arrow/ipc"
+       dbsqlrows "github.com/databricks/databricks-sql-go/rows"
+)
+
+// Check if the rows interface supports IPC streams
+type rowsWithIPCStream interface {
+       GetArrowIPCStreams(context.Context) (dbsqlrows.ArrowIPCStreamIterator, error)
+}
+
+// ipcReaderAdapter uses the new IPC stream interface for Arrow access
+type ipcReaderAdapter struct {
+       ipcIterator   dbsqlrows.ArrowIPCStreamIterator
+       currentReader *ipc.Reader
+       currentRecord arrow.Record
+       schema        *arrow.Schema
+       closed        bool
+       refCount      int64
+}
+
+// newIPCReaderAdapter creates a RecordReader using direct IPC stream access
+func newIPCReaderAdapter(ctx context.Context, rows dbsqlrows.Rows) (array.RecordReader, error) {
+       // Check if rows supports IPC streams
+       ipcRows, ok := rows.(rowsWithIPCStream)
+       if !ok {
+               return nil, fmt.Errorf("databricks rows do not support IPC stream access")
+       }
+
+       // Get IPC stream iterator
+       ipcIterator, err := ipcRows.GetArrowIPCStreams(ctx)
+       if err != nil {
+               return nil, fmt.Errorf("failed to get IPC streams: %w", err)
+       }
+
+       schema_bytes, err := ipcIterator.SchemaBytes()
+       if err != nil {
+               return nil, fmt.Errorf("failed to get schema bytes: %w", err)
+       }
+
+       // Read schema from bytes
+       reader, err := ipc.NewReader(bytes.NewReader(schema_bytes))
+       defer reader.Release()
+
+       if err != nil {
+               return nil, fmt.Errorf("failed to create schema reader: %w", err)
+       }
+
+       schema := reader.Schema()
+       if schema == nil {
+               return nil, fmt.Errorf("schema is nil")
+       }
+
+       adapter := &ipcReaderAdapter{
+               refCount:    1,
+               ipcIterator: ipcIterator,
+               schema:      schema,
+       }
+
+       // Initialize the first reader
+       err = adapter.loadNextReader()
+       if err != nil && err != io.EOF {
+               return nil, fmt.Errorf("failed to initialize IPC reader: %w", err)
+       }
+       return adapter, nil
+}
+
+func (r *ipcReaderAdapter) loadNextReader() error {
+       if r.currentReader != nil {
+               r.currentReader.Release()
+               r.currentReader = nil
+       }
+
+       // Get next IPC stream
+       if !r.ipcIterator.HasNext() {
+               return io.EOF
+       }
+
+       ipcStream, err := r.ipcIterator.Next()
+       if err != nil {
+               return err
+       }
+
+       // Create IPC reader from stream
+       reader, err := ipc.NewReader(ipcStream)
+       if err != nil {
+               return fmt.Errorf("failed to create IPC reader: %w", err)
+       }
+
+       r.currentReader = reader
+
+       return nil
+}
+
+// Implement array.RecordReader interface
+func (r *ipcReaderAdapter) Schema() *arrow.Schema {
+       return r.schema
+}
+
+func (r *ipcReaderAdapter) Next() bool {
+       if r.closed {
+               return false
+       }
+
+       // Release previous record
+       if r.currentRecord != nil {
+               r.currentRecord.Release()
+               r.currentRecord = nil
+       }
+
+       // Try to get next record from current reader
+       if r.currentReader != nil && r.currentReader.Next() {
+               r.currentRecord = r.currentReader.Record()
+               r.currentRecord.Retain()
+               return true
+       }
+
+       // Need to load next IPC stream
+       err := r.loadNextReader()
+       if err != nil {
+               // Err() will return `r.currentReader.Err()` which contains this error
+               return false
+       }
+
+       // Try again with new reader
+       if r.currentReader != nil && r.currentReader.Next() {
+               r.currentRecord = r.currentReader.Record()
+               r.currentRecord.Retain()
+               return true
+       }
+
+       return false
+}
+
+func (r *ipcReaderAdapter) Record() arrow.Record {
+       return r.currentRecord
+}
+
+func (r *ipcReaderAdapter) Release() {
+       r.refCount -= 1
+       if r.refCount <= 0 {
+               if r.closed {
+                       panic("Double cleanup on ipc_reader_adapter - was Release() called with a closed reader?")
+               }
+               r.closed = true
+
+               if r.currentRecord != nil {
+                       r.currentRecord.Release()
+                       r.currentRecord = nil
+               }
+
+               if r.currentReader != nil {
+                       r.currentReader.Release()
+                       r.currentReader = nil
+               }
+
+               if r.schema != nil {
+                       r.schema = nil
+               }
+
+               r.ipcIterator.Close()
+       }
+}
+
+func (r *ipcReaderAdapter) Retain() {
+       r.refCount += 1

Review Comment:
   same thing here, this should be `atomic.AddInt64(&r.refCount, 1)`
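   
   A minimal sketch of both methods with atomic reference counting, assuming `refCount` stays an `int64` field and `"sync/atomic"` is imported (cleanup body elided):
   
   ```go
   func (r *ipcReaderAdapter) Retain() {
           atomic.AddInt64(&r.refCount, 1)
   }
   
   func (r *ipcReaderAdapter) Release() {
           // AddInt64 returns the new value, so the decrement and the
           // zero check happen as a single atomic step.
           if atomic.AddInt64(&r.refCount, -1) == 0 {
                   // existing cleanup: release currentRecord and
                   // currentReader, drop the schema, close ipcIterator
           }
   }
   ```
   
   Checking `== 0` rather than `<= 0` also means a Release past zero skips the cleanup path instead of re-running it.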



##########
go/adbc/driver/databricks/statement.go:
##########
@@ -0,0 +1,253 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package databricks
+
+import (
+       "context"
+       "database/sql"
+       "database/sql/driver"
+       "fmt"
+       "reflect"
+
+       "github.com/apache/arrow-adbc/go/adbc"
+       "github.com/apache/arrow-go/v18/arrow"
+       "github.com/apache/arrow-go/v18/arrow/array"
+
+       dbsqlrows "github.com/databricks/databricks-sql-go/rows"
+)
+
+type statementImpl struct {
+       conn       *connectionImpl
+       query      string
+       parameters []interface{}
+       prepared   *sql.Stmt
+}
+
+func (s *statementImpl) Close() error {
+       if s.conn == nil {
+               return adbc.Error{
+                       Msg:  "statement already closed",
+                       Code: adbc.StatusInvalidState,
+               }
+       }
+       if s.prepared != nil {
+               return s.prepared.Close()
+       }
+       return nil
+}
+
+func (s *statementImpl) SetOption(key, val string) error {
+       // No statement-specific options are supported yet
+       return adbc.Error{
+               Code: adbc.StatusNotImplemented,
+               Msg:  fmt.Sprintf("unsupported statement option: %s", key),
+       }
+}
+
+func (s *statementImpl) SetSqlQuery(query string) error {
+       s.query = query
+       // Reset prepared statement if query changes
+       if s.prepared != nil {
+               _ = s.prepared.Close() // Ignore error on cleanup
+               s.prepared = nil
+       }
+       return nil
+}
+
+func (s *statementImpl) Prepare(ctx context.Context) error {
+       if s.query == "" {
+               return adbc.Error{
+                       Code: adbc.StatusInvalidState,
+                       Msg:  "no query set",
+               }
+       }
+
+       stmt, err := s.conn.conn.PrepareContext(ctx, s.query)
+       if err != nil {
+               return adbc.Error{
+                       Code: adbc.StatusInvalidState,
+                       Msg:  fmt.Sprintf("failed to prepare statement: %v", err),
+               }
+       }
+
+       s.prepared = stmt
+       return nil
+}
+
+func (s *statementImpl) ExecuteQuery(ctx context.Context) (array.RecordReader, int64, error) {
+       if s.query == "" {
+               return nil, -1, adbc.Error{
+                       Code: adbc.StatusInvalidState,
+                       Msg:  "no query set",
+               }
+       }
+
+       // Execute query using raw driver interface to get Arrow batches
+       var driverRows driver.Rows
+       var err error
+       err = s.conn.conn.Raw(func(driverConn interface{}) error {
+               // Use raw driver interface for direct Arrow access
+               if queryerCtx, ok := driverConn.(driver.QueryerContext); ok {
+                       // Convert parameters to driver.NamedValue slice
+                       var driverArgs []driver.NamedValue
+                       for i, param := range s.parameters {
+                               driverArgs = append(driverArgs, driver.NamedValue{
+                                       Ordinal: i + 1,
+                                       Value:   param,
+                               })
+                       }
+                       driverRows, err = queryerCtx.QueryContext(ctx, s.query, driverArgs)
+                       return err
+               }
+               return fmt.Errorf("driver does not support QueryerContext")
+       })
+
+       if err != nil {
+               return nil, -1, adbc.Error{
+                       Code: adbc.StatusInternal,
+                       Msg:  fmt.Sprintf("failed to execute query: %v", err),
+               }
+       }
+       defer func() { _ = driverRows.Close() }()
+
+       // Convert to databricks rows interface to get Arrow batches
+       databricksRows, ok := driverRows.(dbsqlrows.Rows)
+       if !ok {
+               return nil, -1, adbc.Error{
+                       Code: adbc.StatusInternal,
+                       Msg:  "driver rows do not support Arrow batches",
+               }
+       }
+
+       // Use the IPC stream interface (zero-copy)
+       reader, err := newIPCReaderAdapter(ctx, databricksRows)
+       if err != nil {
+               return nil, -1, adbc.Error{
+                       Code: adbc.StatusInternal,
+                       Msg:  fmt.Sprintf("failed to create IPC reader adapter: %v", err),
+               }
+       }
+
+       // Return -1 for rowsAffected (unknown) since we can't count without consuming
+       // The ADBC spec allows -1 to indicate "unknown number of rows affected"
+       return reader, -1, nil
+}
+
+func (s *statementImpl) ExecuteUpdate(ctx context.Context) (int64, error) {
+       var result sql.Result
+       var err error
+
+       if s.prepared != nil {
+               result, err = s.prepared.ExecContext(ctx, s.parameters...)
+       } else if s.query != "" {
+               result, err = s.conn.conn.ExecContext(ctx, s.query, s.parameters...)
+       } else {
+               return -1, adbc.Error{
+                       Code: adbc.StatusInvalidState,
+                       Msg:  "no query set",
+               }
+       }
+
+       if err != nil {
+               return -1, adbc.Error{
+                       Code: adbc.StatusInternal,
+                       Msg:  fmt.Sprintf("failed to execute update: %v", err),
+               }
+       }
+
+       rowsAffected, err := result.RowsAffected()
+       if err != nil {
+               return -1, adbc.Error{
+                       Code: adbc.StatusInternal,
+                       Msg:  fmt.Sprintf("failed to get rows affected: %v", err),
+               }
+       }
+
+       return rowsAffected, nil
+}
+
+func (s *statementImpl) Bind(ctx context.Context, values arrow.Record) error {
+       return adbc.Error{
+               Msg:  "Bind not yet implemented for Databricks driver",
+               Code: adbc.StatusNotImplemented,
+       }
+}
+
+func (s *statementImpl) BindStream(ctx context.Context, stream array.RecordReader) error {
+       return adbc.Error{
+               Msg:  "Bind not yet implemented for Databricks driver",
+               Code: adbc.StatusNotImplemented,
+       }
+}
+
+func (s *statementImpl) GetParameterSchema() (*arrow.Schema, error) {
+       // This would require parsing the SQL query to determine parameter types
+       // For now, return nil to indicate unknown schema
+       return nil, adbc.Error{
+               Code: adbc.StatusNotImplemented,
+               Msg:  "parameter schema detection not implemented",
+       }
+}
+
+func (s *statementImpl) SetSubstraitPlan(plan []byte) error {
+       // Databricks SQL doesn't support Substrait plans
+       return adbc.Error{
+               Code: adbc.StatusNotImplemented,
+               Msg:  "Substrait plans not supported",
+       }
+}
+
+func (s *statementImpl) ExecutePartitions(ctx context.Context) (*arrow.Schema, adbc.Partitions, int64, error) {
+       // Databricks SQL doesn't support partitioned result sets
+       return nil, adbc.Partitions{}, -1, adbc.Error{
+               Code: adbc.StatusNotImplemented,
+               Msg:  "partitioned result sets not supported",
+       }
+}
+
+func (s *statementImpl) arrowToGoValue(arr arrow.Array, idx int) (interface{}, error) {
+       if arr.IsNull(idx) {
+               return nil, nil
+       }
+
+       switch a := arr.(type) {
+       case *array.Boolean:
+               return a.Value(idx), nil
+       case *array.Int8:
+               return a.Value(idx), nil
+       case *array.Int16:
+               return a.Value(idx), nil
+       case *array.Int32:
+               return a.Value(idx), nil
+       case *array.Int64:
+               return a.Value(idx), nil
+       case *array.Float32:
+               return a.Value(idx), nil
+       case *array.Float64:
+               return a.Value(idx), nil
+       case *array.String:
+               return a.Value(idx), nil

Review Comment:
   use `array.StringLike` so that it'll cover String/LargeString/StringView 
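   
   For example (hypothetical helper, assuming the file's existing imports), the single interface case below replaces the concrete `*array.String` branch and also matches LargeString and StringView:
   
   ```go
   func stringToGoValue(arr arrow.Array, idx int) (interface{}, error) {
           if arr.IsNull(idx) {
                   return nil, nil
           }
           // String, LargeString, and StringView all implement
           // array.StringLike, so one interface case covers them.
           switch a := arr.(type) {
           case array.StringLike:
                   return a.Value(idx), nil
           }
           return nil, fmt.Errorf("unsupported array type: %T", arr)
   }
   ```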



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
