zeroshade commented on code in PR #3325:
URL: https://github.com/apache/arrow-adbc/pull/3325#discussion_r2407805253


##########
go/adbc/driver/databricks/connection.go:
##########
@@ -0,0 +1,254 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package databricks
+
+import (
+       "context"
+       "database/sql"
+       "errors"
+       "fmt"
+       "strings"
+
+       "github.com/apache/arrow-adbc/go/adbc"
+       "github.com/apache/arrow-adbc/go/adbc/driver/internal/driverbase"
+       _ "github.com/databricks/databricks-sql-go"
+)
+
+type connectionImpl struct {
+       driverbase.ConnectionImplBase
+
+       // Connection settings
+       catalog  string
+       dbSchema string
+
+       // Database connection
+       conn *sql.Conn
+}
+
+func (c *connectionImpl) Close() error {
+       if c.conn == nil {
+               return adbc.Error{Code: adbc.StatusInvalidState}
+       }
+       defer func() {
+               c.conn = nil
+       }()
+       return c.conn.Close()
+}
+
+func (c *connectionImpl) NewStatement() (adbc.Statement, error) {
+       return &statementImpl{
+               conn: c,
+       }, nil
+}
+
+func (c *connectionImpl) SetAutocommit(autocommit bool) error {
+       // Databricks SQL doesn't support explicit transaction control in the 
same way
+       // as traditional databases. Most operations are implicitly committed.
+       if !autocommit {
+               return adbc.Error{
+                       Code: adbc.StatusNotImplemented,
+                       Msg:  fmt.Sprintf("disabling autocommit is not 
supported"),
+               }
+       } else {
+               return nil
+       }

Review Comment:
   the else is unnecessary, just do `return nil` since we return in the 
previous case



##########
go/adbc/driver/databricks/database.go:
##########
@@ -0,0 +1,407 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package databricks
+
+import (
+       "context"
+       "database/sql"
+       "errors"
+       "fmt"
+       "strconv"
+       "strings"
+       "time"
+
+       "crypto/tls"
+       "crypto/x509"
+       "net/http"
+       "os"
+
+       "github.com/apache/arrow-adbc/go/adbc"
+       "github.com/apache/arrow-adbc/go/adbc/driver/internal/driverbase"
+       dbsql "github.com/databricks/databricks-sql-go"
+)
+
+const DEFAULT_PORT = 443
+const DEFAULT_RETRY_WAIT_MIN = 1 * time.Second
+const DEFAULT_RETRY_WAIT_MAX = 30 * time.Second

Review Comment:
   ```suggestion
   const (
       DEFAULT_PORT = 443
       DEFAULT_RETRY_WAIT_MIN = 1 * time.Second
       DEFAULT_RETRY_WAIT_MAX = 30 * time.Second
   )
   ```



##########
go/adbc/driver/databricks/connection.go:
##########
@@ -0,0 +1,254 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package databricks
+
+import (
+       "context"
+       "database/sql"
+       "errors"
+       "fmt"
+       "strings"
+
+       "github.com/apache/arrow-adbc/go/adbc"
+       "github.com/apache/arrow-adbc/go/adbc/driver/internal/driverbase"
+       _ "github.com/databricks/databricks-sql-go"
+)
+
+type connectionImpl struct {
+       driverbase.ConnectionImplBase
+
+       // Connection settings
+       catalog  string
+       dbSchema string
+
+       // Database connection
+       conn *sql.Conn
+}
+
+func (c *connectionImpl) Close() error {
+       if c.conn == nil {
+               return adbc.Error{Code: adbc.StatusInvalidState}
+       }
+       defer func() {
+               c.conn = nil
+       }()
+       return c.conn.Close()
+}
+
+func (c *connectionImpl) NewStatement() (adbc.Statement, error) {
+       return &statementImpl{
+               conn: c,
+       }, nil
+}
+
+func (c *connectionImpl) SetAutocommit(autocommit bool) error {
+       // Databricks SQL doesn't support explicit transaction control in the 
same way
+       // as traditional databases. Most operations are implicitly committed.
+       if !autocommit {
+               return adbc.Error{
+                       Code: adbc.StatusNotImplemented,
+                       Msg:  fmt.Sprintf("disabling autocommit is not 
supported"),
+               }
+       } else {
+               return nil
+       }
+}
+
+// CurrentNamespacer interface implementation
+func (c *connectionImpl) GetCurrentCatalog() (string, error) {
+       return c.catalog, nil
+}
+
+func (c *connectionImpl) GetCurrentDbSchema() (string, error) {
+       return c.dbSchema, nil
+}
+
+func (c *connectionImpl) SetCurrentCatalog(catalog string) error {
+       if catalog == "" {
+               return adbc.Error{
+                       Code: adbc.StatusInvalidArgument,
+                       Msg:  "catalog cannot be empty",
+               }
+       }
+       if c.conn != nil {
+               escapedCatalog := strings.ReplaceAll(catalog, "`", "``")
+               _, err := c.conn.ExecContext(context.Background(), 
fmt.Sprintf("USE CATALOG `%s`", escapedCatalog))
+               if err != nil {
+                       return adbc.Error{
+                               Code: adbc.StatusInternal,
+                               Msg:  fmt.Sprintf("failed to set catalog: %v", 
err),
+                       }
+               }
+               c.catalog = catalog
+       }
+       return nil
+}
+
+func (c *connectionImpl) SetCurrentDbSchema(schema string) error {
+       if schema == "" {
+               return adbc.Error{
+                       Code: adbc.StatusInvalidArgument,
+                       Msg:  "schema cannot be empty",
+               }
+       }
+       if c.conn != nil {
+               escapedSchema := strings.ReplaceAll(schema, "`", "``")
+               _, err := c.conn.ExecContext(context.Background(), 
fmt.Sprintf("USE SCHEMA `%s`", escapedSchema))
+               if err != nil {
+                       return adbc.Error{
+                               Code: adbc.StatusInternal,
+                               Msg:  fmt.Sprintf("failed to set schema: %v", 
err),
+                       }
+               }
+               c.dbSchema = schema
+       }
+       return nil

Review Comment:
   same as above



##########
go/adbc/driver/databricks/statement.go:
##########
@@ -0,0 +1,238 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package databricks
+
+import (
+       "context"
+       "database/sql"
+       "database/sql/driver"
+       "errors"
+       "fmt"
+
+       "github.com/apache/arrow-adbc/go/adbc"
+       "github.com/apache/arrow-go/v18/arrow"
+       "github.com/apache/arrow-go/v18/arrow/array"
+
+       dbsqlrows "github.com/databricks/databricks-sql-go/rows"
+)
+
+type statementImpl struct {
+       conn       *connectionImpl
+       query      string
+       parameters []interface{}
+       prepared   *sql.Stmt
+}
+
+func (s *statementImpl) Close() error {
+       if s.conn == nil {
+               return adbc.Error{
+                       Msg:  "statement already closed",
+                       Code: adbc.StatusInvalidState,
+               }
+       }
+       if s.prepared != nil {
+               return s.prepared.Close()
+       }
+       s.conn = nil
+       return nil
+}
+
+func (s *statementImpl) SetOption(key, val string) error {
+       // No statement-specific options are supported yet
+       return adbc.Error{
+               Code: adbc.StatusNotImplemented,
+               Msg:  fmt.Sprintf("unsupported statement option: %s", key),
+       }
+}
+
+func (s *statementImpl) SetSqlQuery(query string) error {
+       s.query = query
+       // Reset prepared statement if query changes
+       if s.prepared != nil {
+               if err := s.prepared.Close(); err != nil {
+                       return adbc.Error{
+                               Code: adbc.StatusInvalidState,
+                               Msg:  fmt.Sprintf("failed to close previous 
prepared statement: %v", err),
+                       }
+               }
+               s.prepared = nil
+       }
+       return nil
+}
+
+func (s *statementImpl) Prepare(ctx context.Context) error {
+       if s.query == "" {
+               return adbc.Error{
+                       Code: adbc.StatusInvalidState,
+                       Msg:  "no query set",
+               }
+       }
+
+       stmt, err := s.conn.conn.PrepareContext(ctx, s.query)
+       if err != nil {
+               return adbc.Error{
+                       Code: adbc.StatusInvalidState,
+                       Msg:  fmt.Sprintf("failed to prepare statement: %v", 
err),
+               }
+       }
+
+       s.prepared = stmt
+       return nil
+}
+
+func (s *statementImpl) ExecuteQuery(ctx context.Context) (array.RecordReader, 
int64, error) {
+       // TODO: Prepared statement support with raw connections
+       if s.prepared != nil {
+               return nil, -1, adbc.Error{
+                       Code: adbc.StatusNotImplemented,
+                       Msg:  "Prepared statements are not yet supported via 
`execute query`",
+               }
+       }

Review Comment:
   We shouldn't *require* a prepared statement here. If we have one, then use 
it, otherwise use `s.query`. 



##########
go/adbc/driver/databricks/connection.go:
##########
@@ -0,0 +1,254 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package databricks
+
+import (
+       "context"
+       "database/sql"
+       "errors"
+       "fmt"
+       "strings"
+
+       "github.com/apache/arrow-adbc/go/adbc"
+       "github.com/apache/arrow-adbc/go/adbc/driver/internal/driverbase"
+       _ "github.com/databricks/databricks-sql-go"
+)
+
+type connectionImpl struct {
+       driverbase.ConnectionImplBase
+
+       // Connection settings
+       catalog  string
+       dbSchema string
+
+       // Database connection
+       conn *sql.Conn
+}
+
+func (c *connectionImpl) Close() error {
+       if c.conn == nil {
+               return adbc.Error{Code: adbc.StatusInvalidState}
+       }
+       defer func() {
+               c.conn = nil
+       }()
+       return c.conn.Close()
+}
+
+func (c *connectionImpl) NewStatement() (adbc.Statement, error) {
+       return &statementImpl{
+               conn: c,
+       }, nil
+}
+
+func (c *connectionImpl) SetAutocommit(autocommit bool) error {
+       // Databricks SQL doesn't support explicit transaction control in the 
same way
+       // as traditional databases. Most operations are implicitly committed.
+       if !autocommit {
+               return adbc.Error{
+                       Code: adbc.StatusNotImplemented,
+                       Msg:  fmt.Sprintf("disabling autocommit is not 
supported"),
+               }
+       } else {
+               return nil
+       }
+}
+
+// CurrentNamespacer interface implementation
+func (c *connectionImpl) GetCurrentCatalog() (string, error) {
+       return c.catalog, nil
+}
+
+func (c *connectionImpl) GetCurrentDbSchema() (string, error) {
+       return c.dbSchema, nil
+}
+
+func (c *connectionImpl) SetCurrentCatalog(catalog string) error {
+       if catalog == "" {
+               return adbc.Error{
+                       Code: adbc.StatusInvalidArgument,
+                       Msg:  "catalog cannot be empty",
+               }
+       }
+       if c.conn != nil {
+               escapedCatalog := strings.ReplaceAll(catalog, "`", "``")
+               _, err := c.conn.ExecContext(context.Background(), 
fmt.Sprintf("USE CATALOG `%s`", escapedCatalog))
+               if err != nil {
+                       return adbc.Error{
+                               Code: adbc.StatusInternal,
+                               Msg:  fmt.Sprintf("failed to set catalog: %v", 
err),
+                       }
+               }
+               c.catalog = catalog
+       }
+       return nil

Review Comment:
   simplify this by inverting it:
   
   do `if c.conn == nil { return adbc.Error{.....} }` and then handle the other 
case.



##########
go/adbc/driver/databricks/database.go:
##########
@@ -0,0 +1,407 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package databricks
+
+import (
+       "context"
+       "database/sql"
+       "errors"
+       "fmt"
+       "strconv"
+       "strings"
+       "time"
+
+       "crypto/tls"
+       "crypto/x509"
+       "net/http"
+       "os"
+
+       "github.com/apache/arrow-adbc/go/adbc"
+       "github.com/apache/arrow-adbc/go/adbc/driver/internal/driverbase"
+       dbsql "github.com/databricks/databricks-sql-go"
+)
+
+const DEFAULT_PORT = 443
+const DEFAULT_RETRY_WAIT_MIN = 1 * time.Second
+const DEFAULT_RETRY_WAIT_MAX = 30 * time.Second
+
+type databaseImpl struct {
+       driverbase.DatabaseImplBase
+
+       // Connection Pool
+       db           *sql.DB
+       needsRefresh bool // Whether we need to re-initialize
+
+       // Connection parameters
+       serverHostname string
+       httpPath       string
+       accessToken    string
+       port           string
+       catalog        string
+       schema         string
+
+       // Query options
+       queryTimeout        time.Duration
+       maxRows             int
+       queryRetryCount     int
+       downloadThreadCount int
+
+       // TLS/SSL options
+       sslMode     string
+       sslRootCert string
+
+       // OAuth options (for future expansion)
+       oauthClientID     string
+       oauthClientSecret string
+       oauthRefreshToken string
+}
+
+func (d *databaseImpl) resolveConnectionOptions() ([]dbsql.ConnOption, error) {
+       if d.serverHostname == "" {
+               return nil, adbc.Error{
+                       Code: adbc.StatusInvalidArgument,
+                       Msg:  "server hostname is required",
+               }
+       }
+
+       if d.httpPath == "" {
+               return nil, adbc.Error{
+                       Code: adbc.StatusInvalidArgument,
+                       Msg:  "HTTP path is required",
+               }
+       }
+
+       // FIXME: Support other auth methods
+       if d.accessToken == "" {
+               return nil, adbc.Error{
+                       Code: adbc.StatusInvalidArgument,
+                       Msg:  "access token is required",
+               }
+       }
+
+       opts := []dbsql.ConnOption{
+               dbsql.WithAccessToken(d.accessToken),
+               dbsql.WithServerHostname(d.serverHostname),
+               dbsql.WithHTTPPath(d.httpPath),
+       }
+
+       // Validate and set custom port
+       // Defaults to 443
+       if d.port != "" {
+               port, err := strconv.Atoi(d.port)
+               if err != nil || port < 1 || port > 65535 {
+                       return nil, adbc.Error{
+                               Code: adbc.StatusInvalidArgument,
+                               Msg:  "invalid port number",
+                       }
+               }
+               opts = append(opts, dbsql.WithPort(port))
+       } else {
+               opts = append(opts, dbsql.WithPort(DEFAULT_PORT))
+       }

Review Comment:
   why not just have the Port be an integer instead of a string in the first 
place?



##########
go/adbc/driver/databricks/database.go:
##########
@@ -0,0 +1,407 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package databricks
+
+import (
+       "context"
+       "database/sql"
+       "errors"
+       "fmt"
+       "strconv"
+       "strings"
+       "time"
+
+       "crypto/tls"
+       "crypto/x509"
+       "net/http"
+       "os"
+
+       "github.com/apache/arrow-adbc/go/adbc"
+       "github.com/apache/arrow-adbc/go/adbc/driver/internal/driverbase"
+       dbsql "github.com/databricks/databricks-sql-go"
+)
+
+const DEFAULT_PORT = 443
+const DEFAULT_RETRY_WAIT_MIN = 1 * time.Second
+const DEFAULT_RETRY_WAIT_MAX = 30 * time.Second
+
+type databaseImpl struct {
+       driverbase.DatabaseImplBase
+
+       // Connection Pool
+       db           *sql.DB
+       needsRefresh bool // Whether we need to re-initialize
+
+       // Connection parameters
+       serverHostname string
+       httpPath       string
+       accessToken    string
+       port           string
+       catalog        string
+       schema         string
+
+       // Query options
+       queryTimeout        time.Duration
+       maxRows             int
+       queryRetryCount     int
+       downloadThreadCount int
+
+       // TLS/SSL options
+       sslMode     string
+       sslRootCert string
+
+       // OAuth options (for future expansion)
+       oauthClientID     string
+       oauthClientSecret string
+       oauthRefreshToken string
+}
+
+func (d *databaseImpl) resolveConnectionOptions() ([]dbsql.ConnOption, error) {
+       if d.serverHostname == "" {
+               return nil, adbc.Error{
+                       Code: adbc.StatusInvalidArgument,
+                       Msg:  "server hostname is required",
+               }
+       }
+
+       if d.httpPath == "" {
+               return nil, adbc.Error{
+                       Code: adbc.StatusInvalidArgument,
+                       Msg:  "HTTP path is required",
+               }
+       }
+
+       // FIXME: Support other auth methods
+       if d.accessToken == "" {
+               return nil, adbc.Error{
+                       Code: adbc.StatusInvalidArgument,
+                       Msg:  "access token is required",
+               }
+       }
+
+       opts := []dbsql.ConnOption{
+               dbsql.WithAccessToken(d.accessToken),
+               dbsql.WithServerHostname(d.serverHostname),
+               dbsql.WithHTTPPath(d.httpPath),
+       }
+
+       // Validate and set custom port
+       // Defaults to 443
+       if d.port != "" {
+               port, err := strconv.Atoi(d.port)
+               if err != nil || port < 1 || port > 65535 {
+                       return nil, adbc.Error{
+                               Code: adbc.StatusInvalidArgument,
+                               Msg:  "invalid port number",
+                       }
+               }
+               opts = append(opts, dbsql.WithPort(port))
+       } else {
+               opts = append(opts, dbsql.WithPort(DEFAULT_PORT))
+       }
+
+       // Default namespace for queries (catalog/schema)
+       if d.catalog != "" || d.schema != "" {
+               opts = append(opts, dbsql.WithInitialNamespace(d.catalog, 
d.schema))
+       }
+
+       if d.queryTimeout > 0 {
+               opts = append(opts, dbsql.WithTimeout(d.queryTimeout))
+       }
+
+       if d.maxRows > 0 {
+               opts = append(opts, dbsql.WithMaxRows(int(d.maxRows)))
+       }
+       if d.queryRetryCount >= 0 {
+               opts = append(opts, dbsql.WithRetries(d.queryRetryCount, 
DEFAULT_RETRY_WAIT_MIN, DEFAULT_RETRY_WAIT_MAX))
+       }
+       if d.downloadThreadCount > 0 {
+               opts = append(opts, 
dbsql.WithMaxDownloadThreads(d.downloadThreadCount))
+       }
+
+       // TLS/SSL handling
+       if d.sslMode != "" || d.sslRootCert != "" {
+               var tlsConfig *tls.Config
+
+               // Handle custom root certificate
+               if d.sslRootCert != "" {
+                       caCert, err := os.ReadFile(d.sslRootCert)
+                       if err != nil {
+                               return nil, adbc.Error{
+                                       Code: adbc.StatusInvalidArgument,
+                                       Msg:  fmt.Sprintf("failed to read SSL 
root certificate: %v", err),
+                               }
+                       }
+
+                       caCertPool := x509.NewCertPool()
+                       if !caCertPool.AppendCertsFromPEM(caCert) {
+                               return nil, adbc.Error{
+                                       Code: adbc.StatusInvalidArgument,
+                                       Msg:  "failed to parse SSL root 
certificate",
+                               }
+                       }
+
+                       tlsConfig = &tls.Config{
+                               RootCAs:    caCertPool,
+                               MinVersion: tls.VersionTLS12,
+                       }
+               }

Review Comment:
   can we move this into the `SetOptions` directly instead of doing it here?



##########
go/adbc/driver/databricks/statement.go:
##########
@@ -0,0 +1,238 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package databricks
+
+import (
+       "context"
+       "database/sql"
+       "database/sql/driver"
+       "errors"
+       "fmt"
+
+       "github.com/apache/arrow-adbc/go/adbc"
+       "github.com/apache/arrow-go/v18/arrow"
+       "github.com/apache/arrow-go/v18/arrow/array"
+
+       dbsqlrows "github.com/databricks/databricks-sql-go/rows"
+)
+
+type statementImpl struct {
+       conn       *connectionImpl
+       query      string
+       parameters []interface{}

Review Comment:
   since bind is not yet implemented, should we remove this member until we 
implement it?



##########
go/adbc/driver/databricks/cloudfetch_e2e_test.go:
##########
@@ -0,0 +1,494 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package databricks_test
+
+import (
+       "context"
+       "testing"
+       "time"
+
+       "github.com/apache/arrow-adbc/go/adbc/driver/databricks"
+       "github.com/apache/arrow-adbc/go/adbc/validation"
+       "github.com/apache/arrow-go/v18/arrow/memory"
+       "github.com/stretchr/testify/assert"
+       "github.com/stretchr/testify/require"
+)
+
+// TestCloudFetchE2E_BasicConnection tests basic connectivity
+// CloudFetch is handled automatically by the databricks-sql-go driver
+func TestCloudFetchE2E_BasicConnection(t *testing.T) {
+       host, token, httpPath, catalog, schema := getDatabricksConfig(t)
+
+       driver := databricks.NewDriver(memory.DefaultAllocator)
+
+       opts := map[string]string{
+               databricks.OptionServerHostname: host,
+               databricks.OptionHTTPPath:       httpPath,
+               databricks.OptionAccessToken:    token,
+               databricks.OptionCatalog:        catalog,
+               databricks.OptionSchema:         schema,
+       }
+
+       db, err := driver.NewDatabase(opts)
+       require.NoError(t, err)
+       defer validation.CheckedClose(t, db)
+
+       ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
+       defer cancel()
+
+       conn, err := db.Open(ctx)
+       require.NoError(t, err)
+       defer validation.CheckedClose(t, conn)
+
+       // Execute a simple query to verify connection works
+       stmt, err := conn.NewStatement()
+       require.NoError(t, err)
+       defer validation.CheckedClose(t, stmt)
+
+       err = stmt.SetSqlQuery("SELECT 1 as test")
+       require.NoError(t, err)
+
+       reader, _, err := stmt.ExecuteQuery(ctx)
+       require.NoError(t, err)
+       defer reader.Release()
+
+       assert.True(t, reader.Next(), "Expected at least one record")
+       t.Logf("✅ Connection successful (CloudFetch handled automatically by 
driver)")
+}
+
+// TestCloudFetchE2E_SmallQueries tests CloudFetch with small result sets
+func TestCloudFetchE2E_SmallQueries(t *testing.T) {
+       host, token, httpPath, _, _ := getDatabricksConfig(t)
+
+       queries := []struct {
+               name     string
+               query    string
+               rowCount int
+       }{
+               {
+                       name:     "range_100",
+                       query:    "SELECT * FROM range(100)",
+                       rowCount: 100,
+               },
+               {
+                       name:     "range_1000",
+                       query:    "SELECT * FROM range(1000)",
+                       rowCount: 1000,
+               },
+               {
+                       name:     "range_10000",
+                       query:    "SELECT * FROM range(10000)",
+                       rowCount: 10000,
+               },
+       }
+
+       for _, q := range queries {
+               t.Run(q.name, func(t *testing.T) {
+                       driver := databricks.NewDriver(memory.DefaultAllocator)
+
+                       opts := map[string]string{
+                               databricks.OptionServerHostname: host,
+                               databricks.OptionHTTPPath:       httpPath,
+                               databricks.OptionAccessToken:    token,
+                       }
+
+                       db, err := driver.NewDatabase(opts)
+                       require.NoError(t, err)
+                       defer validation.CheckedClose(t, db)
+
+                       ctx, cancel := 
context.WithTimeout(context.Background(), 60*time.Second)
+                       defer cancel()
+
+                       conn, err := db.Open(ctx)
+                       require.NoError(t, err)
+                       defer validation.CheckedClose(t, conn)
+
+                       stmt, err := conn.NewStatement()
+                       require.NoError(t, err)
+                       defer validation.CheckedClose(t, stmt)
+
+                       err = stmt.SetSqlQuery(q.query)
+                       require.NoError(t, err)
+
+                       startTime := time.Now()
+                       reader, _, err := stmt.ExecuteQuery(ctx)
+                       require.NoError(t, err)
+                       defer reader.Release()
+
+                       // Count rows
+                       totalRows := int64(0)
+                       batchCount := 0
+                       for reader.Next() {
+                               record := reader.Record()
+                               totalRows += record.NumRows()
+                               batchCount++
+                       }
+                       duration := time.Since(startTime)
+
+                       // Note: Databricks may apply row limits, so we check 
for at least some rows
+                       // rather than exact counts
+                       assert.Greater(t, totalRows, int64(0), "Expected to 
receive some rows")
+                       t.Logf("Query '%s': %d rows (requested %d) in %d 
batches, duration: %v",

Review Comment:
   unnecessary test log



##########
go/adbc/driver/databricks/connection.go:
##########
@@ -0,0 +1,254 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package databricks
+
+import (
+       "context"
+       "database/sql"
+       "errors"
+       "fmt"
+       "strings"
+
+       "github.com/apache/arrow-adbc/go/adbc"
+       "github.com/apache/arrow-adbc/go/adbc/driver/internal/driverbase"
+       _ "github.com/databricks/databricks-sql-go"
+)
+
+type connectionImpl struct {
+       driverbase.ConnectionImplBase
+
+       // Connection settings
+       catalog  string
+       dbSchema string
+
+       // Database connection
+       conn *sql.Conn
+}
+
+func (c *connectionImpl) Close() error {
+       if c.conn == nil {
+               return adbc.Error{Code: adbc.StatusInvalidState}
+       }
+       defer func() {
+               c.conn = nil
+       }()
+       return c.conn.Close()
+}
+
+func (c *connectionImpl) NewStatement() (adbc.Statement, error) {
+       return &statementImpl{
+               conn: c,
+       }, nil
+}
+
+func (c *connectionImpl) SetAutocommit(autocommit bool) error {
+       // Databricks SQL doesn't support explicit transaction control in the 
same way
+       // as traditional databases. Most operations are implicitly committed.
+       if !autocommit {
+               return adbc.Error{
+                       Code: adbc.StatusNotImplemented,
+                       Msg:  fmt.Sprintf("disabling autocommit is not 
supported"),
+               }
+       } else {
+               return nil
+       }
+}
+
+// CurrentNamespacer interface implementation
+func (c *connectionImpl) GetCurrentCatalog() (string, error) {
+       return c.catalog, nil
+}
+
+func (c *connectionImpl) GetCurrentDbSchema() (string, error) {
+       return c.dbSchema, nil
+}
+
+func (c *connectionImpl) SetCurrentCatalog(catalog string) error {
+       if catalog == "" {
+               return adbc.Error{
+                       Code: adbc.StatusInvalidArgument,
+                       Msg:  "catalog cannot be empty",
+               }
+       }
+       if c.conn != nil {
+               escapedCatalog := strings.ReplaceAll(catalog, "`", "``")
+               _, err := c.conn.ExecContext(context.Background(), 
fmt.Sprintf("USE CATALOG `%s`", escapedCatalog))
+               if err != nil {
+                       return adbc.Error{
+                               Code: adbc.StatusInternal,
+                               Msg:  fmt.Sprintf("failed to set catalog: %v", 
err),
+                       }
+               }
+               c.catalog = catalog
+       }
+       return nil
+}
+
+func (c *connectionImpl) SetCurrentDbSchema(schema string) error {
+       if schema == "" {
+               return adbc.Error{
+                       Code: adbc.StatusInvalidArgument,
+                       Msg:  "schema cannot be empty",
+               }
+       }
+       if c.conn != nil {
+               escapedSchema := strings.ReplaceAll(schema, "`", "``")
+               _, err := c.conn.ExecContext(context.Background(), 
fmt.Sprintf("USE SCHEMA `%s`", escapedSchema))
+               if err != nil {
+                       return adbc.Error{
+                               Code: adbc.StatusInternal,
+                               Msg:  fmt.Sprintf("failed to set schema: %v", 
err),
+                       }
+               }
+               c.dbSchema = schema
+       }
+       return nil
+}
+
+// TableTypeLister interface implementation
+func (c *connectionImpl) ListTableTypes(ctx context.Context) ([]string, error) 
{
+       // Databricks supports these table types
+       return []string{"TABLE", "VIEW", "EXTERNAL_TABLE", "MANAGED_TABLE", 
"STREAMING_TABLE", "MATERIALIZED_VIEW"}, nil
+}
+
+// Transaction methods (Databricks has limited transaction support)
+func (c *connectionImpl) Commit(ctx context.Context) error {
+       // Most operations are auto-committed.
+       return nil

Review Comment:
   shouldn't this error since we can't turn off auto-commit?



##########
go/adbc/driver/databricks/statement.go:
##########
@@ -0,0 +1,238 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package databricks
+
+import (
+       "context"
+       "database/sql"
+       "database/sql/driver"
+       "errors"
+       "fmt"
+
+       "github.com/apache/arrow-adbc/go/adbc"
+       "github.com/apache/arrow-go/v18/arrow"
+       "github.com/apache/arrow-go/v18/arrow/array"
+
+       dbsqlrows "github.com/databricks/databricks-sql-go/rows"
+)
+
+type statementImpl struct {
+       conn       *connectionImpl
+       query      string
+       parameters []interface{}
+       prepared   *sql.Stmt
+}
+
+func (s *statementImpl) Close() error {
+       if s.conn == nil {
+               return adbc.Error{
+                       Msg:  "statement already closed",
+                       Code: adbc.StatusInvalidState,
+               }
+       }
+       if s.prepared != nil {
+               return s.prepared.Close()
+       }
+       s.conn = nil
+       return nil
+}
+
+func (s *statementImpl) SetOption(key, val string) error {
+       // No statement-specific options are supported yet
+       return adbc.Error{
+               Code: adbc.StatusNotImplemented,
+               Msg:  fmt.Sprintf("unsupported statement option: %s", key),
+       }
+}
+
+func (s *statementImpl) SetSqlQuery(query string) error {
+       s.query = query
+       // Reset prepared statement if query changes
+       if s.prepared != nil {
+               if err := s.prepared.Close(); err != nil {
+                       return adbc.Error{
+                               Code: adbc.StatusInvalidState,
+                               Msg:  fmt.Sprintf("failed to close previous 
prepared statement: %v", err),
+                       }
+               }
+               s.prepared = nil
+       }
+       return nil
+}
+
+func (s *statementImpl) Prepare(ctx context.Context) error {
+       if s.query == "" {
+               return adbc.Error{
+                       Code: adbc.StatusInvalidState,
+                       Msg:  "no query set",
+               }
+       }
+
+       stmt, err := s.conn.conn.PrepareContext(ctx, s.query)
+       if err != nil {
+               return adbc.Error{
+                       Code: adbc.StatusInvalidState,
+                       Msg:  fmt.Sprintf("failed to prepare statement: %v", 
err),
+               }
+       }
+
+       s.prepared = stmt
+       return nil
+}
+
+func (s *statementImpl) ExecuteQuery(ctx context.Context) (array.RecordReader, 
int64, error) {
+       // TODO: Prepared statement support with raw connections
+       if s.prepared != nil {
+               return nil, -1, adbc.Error{
+                       Code: adbc.StatusNotImplemented,
+                       Msg:  "Prepared statements are not yet supported via 
`execute query`",
+               }
+       }

Review Comment:
   use the same logic as at line 172



##########
go/adbc/driver/databricks/database.go:
##########
@@ -0,0 +1,407 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package databricks
+
+import (
+       "context"
+       "database/sql"
+       "errors"
+       "fmt"
+       "strconv"
+       "strings"
+       "time"
+
+       "crypto/tls"
+       "crypto/x509"
+       "net/http"
+       "os"
+
+       "github.com/apache/arrow-adbc/go/adbc"
+       "github.com/apache/arrow-adbc/go/adbc/driver/internal/driverbase"
+       dbsql "github.com/databricks/databricks-sql-go"
+)
+
+const DEFAULT_PORT = 443
+const DEFAULT_RETRY_WAIT_MIN = 1 * time.Second
+const DEFAULT_RETRY_WAIT_MAX = 30 * time.Second
+
+type databaseImpl struct {
+       driverbase.DatabaseImplBase
+
+       // Connection Pool
+       db           *sql.DB
+       needsRefresh bool // Whether we need to re-initialize
+
+       // Connection parameters
+       serverHostname string
+       httpPath       string
+       accessToken    string
+       port           string
+       catalog        string
+       schema         string
+
+       // Query options
+       queryTimeout        time.Duration
+       maxRows             int
+       queryRetryCount     int
+       downloadThreadCount int
+
+       // TLS/SSL options
+       sslMode     string
+       sslRootCert string
+
+       // OAuth options (for future expansion)
+       oauthClientID     string
+       oauthClientSecret string
+       oauthRefreshToken string
+}
+
+func (d *databaseImpl) resolveConnectionOptions() ([]dbsql.ConnOption, error) {
+       if d.serverHostname == "" {
+               return nil, adbc.Error{
+                       Code: adbc.StatusInvalidArgument,
+                       Msg:  "server hostname is required",
+               }
+       }
+
+       if d.httpPath == "" {
+               return nil, adbc.Error{
+                       Code: adbc.StatusInvalidArgument,
+                       Msg:  "HTTP path is required",
+               }
+       }
+
+       // FIXME: Support other auth methods
+       if d.accessToken == "" {
+               return nil, adbc.Error{
+                       Code: adbc.StatusInvalidArgument,
+                       Msg:  "access token is required",
+               }
+       }
+
+       opts := []dbsql.ConnOption{
+               dbsql.WithAccessToken(d.accessToken),
+               dbsql.WithServerHostname(d.serverHostname),
+               dbsql.WithHTTPPath(d.httpPath),
+       }
+
+       // Validate and set custom port
+       // Defaults to 443
+       if d.port != "" {
+               port, err := strconv.Atoi(d.port)
+               if err != nil || port < 1 || port > 65535 {
+                       return nil, adbc.Error{
+                               Code: adbc.StatusInvalidArgument,
+                               Msg:  "invalid port number",
+                       }
+               }
+               opts = append(opts, dbsql.WithPort(port))
+       } else {
+               opts = append(opts, dbsql.WithPort(DEFAULT_PORT))
+       }
+
+       // Default namespace for queries (catalog/schema)
+       if d.catalog != "" || d.schema != "" {
+               opts = append(opts, dbsql.WithInitialNamespace(d.catalog, 
d.schema))
+       }
+
+       if d.queryTimeout > 0 {
+               opts = append(opts, dbsql.WithTimeout(d.queryTimeout))
+       }
+
+       if d.maxRows > 0 {
+               opts = append(opts, dbsql.WithMaxRows(int(d.maxRows)))
+       }
+       if d.queryRetryCount >= 0 {
+               opts = append(opts, dbsql.WithRetries(d.queryRetryCount, 
DEFAULT_RETRY_WAIT_MIN, DEFAULT_RETRY_WAIT_MAX))
+       }
+       if d.downloadThreadCount > 0 {
+               opts = append(opts, 
dbsql.WithMaxDownloadThreads(d.downloadThreadCount))
+       }
+
+       // TLS/SSL handling
+       if d.sslMode != "" || d.sslRootCert != "" {
+               var tlsConfig *tls.Config
+
+               // Handle custom root certificate
+               if d.sslRootCert != "" {
+                       caCert, err := os.ReadFile(d.sslRootCert)
+                       if err != nil {
+                               return nil, adbc.Error{
+                                       Code: adbc.StatusInvalidArgument,
+                                       Msg:  fmt.Sprintf("failed to read SSL 
root certificate: %v", err),
+                               }
+                       }
+
+                       caCertPool := x509.NewCertPool()
+                       if !caCertPool.AppendCertsFromPEM(caCert) {
+                               return nil, adbc.Error{
+                                       Code: adbc.StatusInvalidArgument,
+                                       Msg:  "failed to parse SSL root 
certificate",
+                               }
+                       }
+
+                       tlsConfig = &tls.Config{
+                               RootCAs:    caCertPool,
+                               MinVersion: tls.VersionTLS12,
+                       }
+               }
+
+               // Handle SSL mode
+               if d.sslMode != "" {
+                       switch strings.ToLower(d.sslMode) {
+                       case "insecure":
+                               if tlsConfig == nil {
+                                       tlsConfig = &tls.Config{MinVersion: 
tls.VersionTLS12}
+                               }
+                               tlsConfig.InsecureSkipVerify = true
+                       case "require":
+                               // Default behavior - full TLS verification
+                               if tlsConfig == nil {
+                                       tlsConfig = &tls.Config{MinVersion: 
tls.VersionTLS12}
+                               }
+                       default:
+                               return nil, adbc.Error{
+                                       Code: adbc.StatusInvalidArgument,
+                                       Msg:  fmt.Sprintf("invalid SSL mode: %s 
(supported: 'require', 'insecure')", d.sslMode),
+                               }
+                       }
+               }

Review Comment:
   same as above, move this to `SetOptions` instead of here



##########
go/adbc/driver/databricks/cloudfetch_e2e_test.go:
##########
@@ -0,0 +1,494 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package databricks_test
+
+import (
+       "context"
+       "testing"
+       "time"
+
+       "github.com/apache/arrow-adbc/go/adbc/driver/databricks"
+       "github.com/apache/arrow-adbc/go/adbc/validation"
+       "github.com/apache/arrow-go/v18/arrow/memory"
+       "github.com/stretchr/testify/assert"
+       "github.com/stretchr/testify/require"
+)
+
+// TestCloudFetchE2E_BasicConnection tests basic connectivity
+// CloudFetch is handled automatically by the databricks-sql-go driver
+func TestCloudFetchE2E_BasicConnection(t *testing.T) {
+       host, token, httpPath, catalog, schema := getDatabricksConfig(t)
+
+       driver := databricks.NewDriver(memory.DefaultAllocator)
+
+       opts := map[string]string{
+               databricks.OptionServerHostname: host,
+               databricks.OptionHTTPPath:       httpPath,
+               databricks.OptionAccessToken:    token,
+               databricks.OptionCatalog:        catalog,
+               databricks.OptionSchema:         schema,
+       }
+
+       db, err := driver.NewDatabase(opts)
+       require.NoError(t, err)
+       defer validation.CheckedClose(t, db)
+
+       ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
+       defer cancel()
+
+       conn, err := db.Open(ctx)
+       require.NoError(t, err)
+       defer validation.CheckedClose(t, conn)
+
+       // Execute a simple query to verify connection works
+       stmt, err := conn.NewStatement()
+       require.NoError(t, err)
+       defer validation.CheckedClose(t, stmt)
+
+       err = stmt.SetSqlQuery("SELECT 1 as test")
+       require.NoError(t, err)
+
+       reader, _, err := stmt.ExecuteQuery(ctx)
+       require.NoError(t, err)
+       defer reader.Release()
+
+       assert.True(t, reader.Next(), "Expected at least one record")
+       t.Logf("✅ Connection successful (CloudFetch handled automatically by 
driver)")

Review Comment:
   unnecessary log line



##########
go/adbc/driver/databricks/database.go:
##########
@@ -0,0 +1,407 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package databricks
+
+import (
+       "context"
+       "database/sql"
+       "errors"
+       "fmt"
+       "strconv"
+       "strings"
+       "time"
+
+       "crypto/tls"
+       "crypto/x509"
+       "net/http"
+       "os"
+
+       "github.com/apache/arrow-adbc/go/adbc"
+       "github.com/apache/arrow-adbc/go/adbc/driver/internal/driverbase"
+       dbsql "github.com/databricks/databricks-sql-go"
+)
+
+const DEFAULT_PORT = 443
+const DEFAULT_RETRY_WAIT_MIN = 1 * time.Second
+const DEFAULT_RETRY_WAIT_MAX = 30 * time.Second
+
+type databaseImpl struct {
+       driverbase.DatabaseImplBase
+
+       // Connection Pool
+       db           *sql.DB
+       needsRefresh bool // Whether we need to re-initialize
+
+       // Connection parameters
+       serverHostname string
+       httpPath       string
+       accessToken    string
+       port           string
+       catalog        string
+       schema         string
+
+       // Query options
+       queryTimeout        time.Duration
+       maxRows             int
+       queryRetryCount     int
+       downloadThreadCount int
+
+       // TLS/SSL options
+       sslMode     string
+       sslRootCert string
+
+       // OAuth options (for future expansion)
+       oauthClientID     string
+       oauthClientSecret string
+       oauthRefreshToken string
+}
+
+func (d *databaseImpl) resolveConnectionOptions() ([]dbsql.ConnOption, error) {
+       if d.serverHostname == "" {
+               return nil, adbc.Error{
+                       Code: adbc.StatusInvalidArgument,
+                       Msg:  "server hostname is required",
+               }
+       }
+
+       if d.httpPath == "" {
+               return nil, adbc.Error{
+                       Code: adbc.StatusInvalidArgument,
+                       Msg:  "HTTP path is required",
+               }
+       }
+
+       // FIXME: Support other auth methods
+       if d.accessToken == "" {
+               return nil, adbc.Error{
+                       Code: adbc.StatusInvalidArgument,
+                       Msg:  "access token is required",
+               }
+       }
+
+       opts := []dbsql.ConnOption{
+               dbsql.WithAccessToken(d.accessToken),
+               dbsql.WithServerHostname(d.serverHostname),
+               dbsql.WithHTTPPath(d.httpPath),
+       }
+
+       // Validate and set custom port
+       // Defaults to 443
+       if d.port != "" {
+               port, err := strconv.Atoi(d.port)
+               if err != nil || port < 1 || port > 65535 {
+                       return nil, adbc.Error{
+                               Code: adbc.StatusInvalidArgument,
+                               Msg:  "invalid port number",
+                       }
+               }
+               opts = append(opts, dbsql.WithPort(port))
+       } else {
+               opts = append(opts, dbsql.WithPort(DEFAULT_PORT))
+       }
+
+       // Default namespace for queries (catalog/schema)
+       if d.catalog != "" || d.schema != "" {
+               opts = append(opts, dbsql.WithInitialNamespace(d.catalog, 
d.schema))
+       }
+
+       if d.queryTimeout > 0 {
+               opts = append(opts, dbsql.WithTimeout(d.queryTimeout))
+       }
+
+       if d.maxRows > 0 {
+               opts = append(opts, dbsql.WithMaxRows(int(d.maxRows)))
+       }
+       if d.queryRetryCount >= 0 {
+               opts = append(opts, dbsql.WithRetries(d.queryRetryCount, 
DEFAULT_RETRY_WAIT_MIN, DEFAULT_RETRY_WAIT_MAX))
+       }
+       if d.downloadThreadCount > 0 {
+               opts = append(opts, 
dbsql.WithMaxDownloadThreads(d.downloadThreadCount))
+       }
+
+       // TLS/SSL handling
+       if d.sslMode != "" || d.sslRootCert != "" {
+               var tlsConfig *tls.Config
+
+               // Handle custom root certificate
+               if d.sslRootCert != "" {
+                       caCert, err := os.ReadFile(d.sslRootCert)
+                       if err != nil {
+                               return nil, adbc.Error{
+                                       Code: adbc.StatusInvalidArgument,
+                                       Msg:  fmt.Sprintf("failed to read SSL 
root certificate: %v", err),
+                               }
+                       }
+
+                       caCertPool := x509.NewCertPool()
+                       if !caCertPool.AppendCertsFromPEM(caCert) {
+                               return nil, adbc.Error{
+                                       Code: adbc.StatusInvalidArgument,
+                                       Msg:  "failed to parse SSL root 
certificate",
+                               }
+                       }
+
+                       tlsConfig = &tls.Config{
+                               RootCAs:    caCertPool,
+                               MinVersion: tls.VersionTLS12,
+                       }
+               }
+
+               // Handle SSL mode
+               if d.sslMode != "" {
+                       switch strings.ToLower(d.sslMode) {
+                       case "insecure":
+                               if tlsConfig == nil {
+                                       tlsConfig = &tls.Config{MinVersion: 
tls.VersionTLS12}
+                               }
+                               tlsConfig.InsecureSkipVerify = true
+                       case "require":
+                               // Default behavior - full TLS verification
+                               if tlsConfig == nil {
+                                       tlsConfig = &tls.Config{MinVersion: 
tls.VersionTLS12}
+                               }
+                       default:
+                               return nil, adbc.Error{
+                                       Code: adbc.StatusInvalidArgument,
+                                       Msg:  fmt.Sprintf("invalid SSL mode: %s 
(supported: 'require', 'insecure')", d.sslMode),
+                               }
+                       }
+               }
+
+               // Apply custom TLS config if we have one
+               if tlsConfig != nil {
+                       transport := &http.Transport{
+                               TLSClientConfig: tlsConfig,
+                       }
+                       opts = append(opts, dbsql.WithTransport(transport))
+               }
+       }
+
+       return opts, nil
+}
+
+func (d *databaseImpl) initializeConnectionPool(ctx context.Context) (*sql.DB, 
error) {
+       opts, err := d.resolveConnectionOptions()
+
+       if err != nil {
+               return nil, err
+       }
+
+       connector, err := dbsql.NewConnector(opts...)
+
+       if err != nil {
+               return nil, err
+       }
+
+       db := sql.OpenDB(connector)
+
+       // Test the connection
+       if err := db.PingContext(ctx); err != nil {
+               err = errors.Join(db.Close())
+               return nil, adbc.Error{
+                       Code: adbc.StatusInternal,
+                       Msg:  fmt.Sprintf("failed to ping database: %v", err),
+               }
+       }
+
+       return db, nil
+}
+
+func (d *databaseImpl) Open(ctx context.Context) (adbc.Connection, error) {
+       // Re-initialize the connection pool and settings if anything
+       // has changed, or we have not initialized yet
+       if d.needsRefresh || d.db == nil {
+               db, err := d.initializeConnectionPool(ctx)
+
+               if err != nil {
+                       return nil, err
+               }
+
+               // Close the existing connection pool
+               if d.db != nil {
+                       d.db.Close()

Review Comment:
   check if `Close` returns an error



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to