zeroshade commented on code in PR #3325: URL: https://github.com/apache/arrow-adbc/pull/3325#discussion_r2407805253
########## go/adbc/driver/databricks/connection.go: ########## @@ -0,0 +1,254 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package databricks + +import ( + "context" + "database/sql" + "errors" + "fmt" + "strings" + + "github.com/apache/arrow-adbc/go/adbc" + "github.com/apache/arrow-adbc/go/adbc/driver/internal/driverbase" + _ "github.com/databricks/databricks-sql-go" +) + +type connectionImpl struct { + driverbase.ConnectionImplBase + + // Connection settings + catalog string + dbSchema string + + // Database connection + conn *sql.Conn +} + +func (c *connectionImpl) Close() error { + if c.conn == nil { + return adbc.Error{Code: adbc.StatusInvalidState} + } + defer func() { + c.conn = nil + }() + return c.conn.Close() +} + +func (c *connectionImpl) NewStatement() (adbc.Statement, error) { + return &statementImpl{ + conn: c, + }, nil +} + +func (c *connectionImpl) SetAutocommit(autocommit bool) error { + // Databricks SQL doesn't support explicit transaction control in the same way + // as traditional databases. Most operations are implicitly committed. 
+ if !autocommit { + return adbc.Error{ + Code: adbc.StatusNotImplemented, + Msg: fmt.Sprintf("disabling autocommit is not supported"), + } + } else { + return nil + } Review Comment: the else is unnecessary, just do `return nil` since we return in the previous case ########## go/adbc/driver/databricks/database.go: ########## @@ -0,0 +1,407 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package databricks + +import ( + "context" + "database/sql" + "errors" + "fmt" + "strconv" + "strings" + "time" + + "crypto/tls" + "crypto/x509" + "net/http" + "os" + + "github.com/apache/arrow-adbc/go/adbc" + "github.com/apache/arrow-adbc/go/adbc/driver/internal/driverbase" + dbsql "github.com/databricks/databricks-sql-go" +) + +const DEFAULT_PORT = 443 +const DEFAULT_RETRY_WAIT_MIN = 1 * time.Second +const DEFAULT_RETRY_WAIT_MAX = 30 * time.Second Review Comment: ```suggestion const ( DEFAULT_PORT = 443 DEFAULT_RETRY_WAIT_MIN = 1 * time.Second DEFAULT_RETRY_WAIT_MAX = 30 * time.Second ) ``` ########## go/adbc/driver/databricks/connection.go: ########## @@ -0,0 +1,254 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package databricks + +import ( + "context" + "database/sql" + "errors" + "fmt" + "strings" + + "github.com/apache/arrow-adbc/go/adbc" + "github.com/apache/arrow-adbc/go/adbc/driver/internal/driverbase" + _ "github.com/databricks/databricks-sql-go" +) + +type connectionImpl struct { + driverbase.ConnectionImplBase + + // Connection settings + catalog string + dbSchema string + + // Database connection + conn *sql.Conn +} + +func (c *connectionImpl) Close() error { + if c.conn == nil { + return adbc.Error{Code: adbc.StatusInvalidState} + } + defer func() { + c.conn = nil + }() + return c.conn.Close() +} + +func (c *connectionImpl) NewStatement() (adbc.Statement, error) { + return &statementImpl{ + conn: c, + }, nil +} + +func (c *connectionImpl) SetAutocommit(autocommit bool) error { + // Databricks SQL doesn't support explicit transaction control in the same way + // as traditional databases. Most operations are implicitly committed. 
+ if !autocommit { + return adbc.Error{ + Code: adbc.StatusNotImplemented, + Msg: fmt.Sprintf("disabling autocommit is not supported"), + } + } else { + return nil + } +} + +// CurrentNamespacer interface implementation +func (c *connectionImpl) GetCurrentCatalog() (string, error) { + return c.catalog, nil +} + +func (c *connectionImpl) GetCurrentDbSchema() (string, error) { + return c.dbSchema, nil +} + +func (c *connectionImpl) SetCurrentCatalog(catalog string) error { + if catalog == "" { + return adbc.Error{ + Code: adbc.StatusInvalidArgument, + Msg: "catalog cannot be empty", + } + } + if c.conn != nil { + escapedCatalog := strings.ReplaceAll(catalog, "`", "``") + _, err := c.conn.ExecContext(context.Background(), fmt.Sprintf("USE CATALOG `%s`", escapedCatalog)) + if err != nil { + return adbc.Error{ + Code: adbc.StatusInternal, + Msg: fmt.Sprintf("failed to set catalog: %v", err), + } + } + c.catalog = catalog + } + return nil +} + +func (c *connectionImpl) SetCurrentDbSchema(schema string) error { + if schema == "" { + return adbc.Error{ + Code: adbc.StatusInvalidArgument, + Msg: "schema cannot be empty", + } + } + if c.conn != nil { + escapedSchema := strings.ReplaceAll(schema, "`", "``") + _, err := c.conn.ExecContext(context.Background(), fmt.Sprintf("USE SCHEMA `%s`", escapedSchema)) + if err != nil { + return adbc.Error{ + Code: adbc.StatusInternal, + Msg: fmt.Sprintf("failed to set schema: %v", err), + } + } + c.dbSchema = schema + } + return nil Review Comment: same as above ########## go/adbc/driver/databricks/statement.go: ########## @@ -0,0 +1,238 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package databricks + +import ( + "context" + "database/sql" + "database/sql/driver" + "errors" + "fmt" + + "github.com/apache/arrow-adbc/go/adbc" + "github.com/apache/arrow-go/v18/arrow" + "github.com/apache/arrow-go/v18/arrow/array" + + dbsqlrows "github.com/databricks/databricks-sql-go/rows" +) + +type statementImpl struct { + conn *connectionImpl + query string + parameters []interface{} + prepared *sql.Stmt +} + +func (s *statementImpl) Close() error { + if s.conn == nil { + return adbc.Error{ + Msg: "statement already closed", + Code: adbc.StatusInvalidState, + } + } + if s.prepared != nil { + return s.prepared.Close() + } + s.conn = nil + return nil +} + +func (s *statementImpl) SetOption(key, val string) error { + // No statement-specific options are supported yet + return adbc.Error{ + Code: adbc.StatusNotImplemented, + Msg: fmt.Sprintf("unsupported statement option: %s", key), + } +} + +func (s *statementImpl) SetSqlQuery(query string) error { + s.query = query + // Reset prepared statement if query changes + if s.prepared != nil { + if err := s.prepared.Close(); err != nil { + return adbc.Error{ + Code: adbc.StatusInvalidState, + Msg: fmt.Sprintf("failed to close previous prepared statement: %v", err), + } + } + s.prepared = nil + } + return nil +} + +func (s *statementImpl) Prepare(ctx context.Context) error { + if s.query == "" { + return adbc.Error{ + Code: adbc.StatusInvalidState, + Msg: "no query set", + } + } + + stmt, err := s.conn.conn.PrepareContext(ctx, s.query) + if err != nil { + return adbc.Error{ + Code: 
adbc.StatusInvalidState, + Msg: fmt.Sprintf("failed to prepare statement: %v", err), + } + } + + s.prepared = stmt + return nil +} + +func (s *statementImpl) ExecuteQuery(ctx context.Context) (array.RecordReader, int64, error) { + // TODO: Prepared statement support with raw connections + if s.prepared != nil { + return nil, -1, adbc.Error{ + Code: adbc.StatusNotImplemented, + Msg: "Prepared statements are not yet supported via `execute query`", + } + } Review Comment: We shouldn't *reject* a prepared statement here. If we have one, use it; otherwise, use `s.query`. ########## go/adbc/driver/databricks/connection.go: ########## @@ -0,0 +1,254 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License.
+ +package databricks + +import ( + "context" + "database/sql" + "errors" + "fmt" + "strings" + + "github.com/apache/arrow-adbc/go/adbc" + "github.com/apache/arrow-adbc/go/adbc/driver/internal/driverbase" + _ "github.com/databricks/databricks-sql-go" +) + +type connectionImpl struct { + driverbase.ConnectionImplBase + + // Connection settings + catalog string + dbSchema string + + // Database connection + conn *sql.Conn +} + +func (c *connectionImpl) Close() error { + if c.conn == nil { + return adbc.Error{Code: adbc.StatusInvalidState} + } + defer func() { + c.conn = nil + }() + return c.conn.Close() +} + +func (c *connectionImpl) NewStatement() (adbc.Statement, error) { + return &statementImpl{ + conn: c, + }, nil +} + +func (c *connectionImpl) SetAutocommit(autocommit bool) error { + // Databricks SQL doesn't support explicit transaction control in the same way + // as traditional databases. Most operations are implicitly committed. + if !autocommit { + return adbc.Error{ + Code: adbc.StatusNotImplemented, + Msg: fmt.Sprintf("disabling autocommit is not supported"), + } + } else { + return nil + } +} + +// CurrentNamespacer interface implementation +func (c *connectionImpl) GetCurrentCatalog() (string, error) { + return c.catalog, nil +} + +func (c *connectionImpl) GetCurrentDbSchema() (string, error) { + return c.dbSchema, nil +} + +func (c *connectionImpl) SetCurrentCatalog(catalog string) error { + if catalog == "" { + return adbc.Error{ + Code: adbc.StatusInvalidArgument, + Msg: "catalog cannot be empty", + } + } + if c.conn != nil { + escapedCatalog := strings.ReplaceAll(catalog, "`", "``") + _, err := c.conn.ExecContext(context.Background(), fmt.Sprintf("USE CATALOG `%s`", escapedCatalog)) + if err != nil { + return adbc.Error{ + Code: adbc.StatusInternal, + Msg: fmt.Sprintf("failed to set catalog: %v", err), + } + } + c.catalog = catalog + } + return nil Review Comment: simplify this by inverting it: do `if c.conn == nil { return adbc.Error{.....} }` and 
then handle the other case. ########## go/adbc/driver/databricks/database.go: ########## @@ -0,0 +1,407 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package databricks + +import ( + "context" + "database/sql" + "errors" + "fmt" + "strconv" + "strings" + "time" + + "crypto/tls" + "crypto/x509" + "net/http" + "os" + + "github.com/apache/arrow-adbc/go/adbc" + "github.com/apache/arrow-adbc/go/adbc/driver/internal/driverbase" + dbsql "github.com/databricks/databricks-sql-go" +) + +const DEFAULT_PORT = 443 +const DEFAULT_RETRY_WAIT_MIN = 1 * time.Second +const DEFAULT_RETRY_WAIT_MAX = 30 * time.Second + +type databaseImpl struct { + driverbase.DatabaseImplBase + + // Connection Pool + db *sql.DB + needsRefresh bool // Whether we need to re-initialize + + // Connection parameters + serverHostname string + httpPath string + accessToken string + port string + catalog string + schema string + + // Query options + queryTimeout time.Duration + maxRows int + queryRetryCount int + downloadThreadCount int + + // TLS/SSL options + sslMode string + sslRootCert string + + // OAuth options (for future expansion) + oauthClientID string + oauthClientSecret string + oauthRefreshToken string +} + +func (d *databaseImpl) 
resolveConnectionOptions() ([]dbsql.ConnOption, error) { + if d.serverHostname == "" { + return nil, adbc.Error{ + Code: adbc.StatusInvalidArgument, + Msg: "server hostname is required", + } + } + + if d.httpPath == "" { + return nil, adbc.Error{ + Code: adbc.StatusInvalidArgument, + Msg: "HTTP path is required", + } + } + + // FIXME: Support other auth methods + if d.accessToken == "" { + return nil, adbc.Error{ + Code: adbc.StatusInvalidArgument, + Msg: "access token is required", + } + } + + opts := []dbsql.ConnOption{ + dbsql.WithAccessToken(d.accessToken), + dbsql.WithServerHostname(d.serverHostname), + dbsql.WithHTTPPath(d.httpPath), + } + + // Validate and set custom port + // Defaults to 443 + if d.port != "" { + port, err := strconv.Atoi(d.port) + if err != nil || port < 1 || port > 65535 { + return nil, adbc.Error{ + Code: adbc.StatusInvalidArgument, + Msg: "invalid port number", + } + } + opts = append(opts, dbsql.WithPort(port)) + } else { + opts = append(opts, dbsql.WithPort(DEFAULT_PORT)) + } Review Comment: why not just have the Port be an integer instead of a string in the first place? ########## go/adbc/driver/databricks/database.go: ########## @@ -0,0 +1,407 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +package databricks + +import ( + "context" + "database/sql" + "errors" + "fmt" + "strconv" + "strings" + "time" + + "crypto/tls" + "crypto/x509" + "net/http" + "os" + + "github.com/apache/arrow-adbc/go/adbc" + "github.com/apache/arrow-adbc/go/adbc/driver/internal/driverbase" + dbsql "github.com/databricks/databricks-sql-go" +) + +const DEFAULT_PORT = 443 +const DEFAULT_RETRY_WAIT_MIN = 1 * time.Second +const DEFAULT_RETRY_WAIT_MAX = 30 * time.Second + +type databaseImpl struct { + driverbase.DatabaseImplBase + + // Connection Pool + db *sql.DB + needsRefresh bool // Whether we need to re-initialize + + // Connection parameters + serverHostname string + httpPath string + accessToken string + port string + catalog string + schema string + + // Query options + queryTimeout time.Duration + maxRows int + queryRetryCount int + downloadThreadCount int + + // TLS/SSL options + sslMode string + sslRootCert string + + // OAuth options (for future expansion) + oauthClientID string + oauthClientSecret string + oauthRefreshToken string +} + +func (d *databaseImpl) resolveConnectionOptions() ([]dbsql.ConnOption, error) { + if d.serverHostname == "" { + return nil, adbc.Error{ + Code: adbc.StatusInvalidArgument, + Msg: "server hostname is required", + } + } + + if d.httpPath == "" { + return nil, adbc.Error{ + Code: adbc.StatusInvalidArgument, + Msg: "HTTP path is required", + } + } + + // FIXME: Support other auth methods + if d.accessToken == "" { + return nil, adbc.Error{ + Code: adbc.StatusInvalidArgument, + Msg: "access token is required", + } + } + + opts := []dbsql.ConnOption{ + dbsql.WithAccessToken(d.accessToken), + dbsql.WithServerHostname(d.serverHostname), + dbsql.WithHTTPPath(d.httpPath), + } + + // Validate and set custom port + // Defaults to 443 + if d.port != "" { + port, err := strconv.Atoi(d.port) + if err != nil || port < 1 || port > 65535 { + return 
nil, adbc.Error{ + Code: adbc.StatusInvalidArgument, + Msg: "invalid port number", + } + } + opts = append(opts, dbsql.WithPort(port)) + } else { + opts = append(opts, dbsql.WithPort(DEFAULT_PORT)) + } + + // Default namespace for queries (catalog/schema) + if d.catalog != "" || d.schema != "" { + opts = append(opts, dbsql.WithInitialNamespace(d.catalog, d.schema)) + } + + if d.queryTimeout > 0 { + opts = append(opts, dbsql.WithTimeout(d.queryTimeout)) + } + + if d.maxRows > 0 { + opts = append(opts, dbsql.WithMaxRows(int(d.maxRows))) + } + if d.queryRetryCount >= 0 { + opts = append(opts, dbsql.WithRetries(d.queryRetryCount, DEFAULT_RETRY_WAIT_MIN, DEFAULT_RETRY_WAIT_MAX)) + } + if d.downloadThreadCount > 0 { + opts = append(opts, dbsql.WithMaxDownloadThreads(d.downloadThreadCount)) + } + + // TLS/SSL handling + if d.sslMode != "" || d.sslRootCert != "" { + var tlsConfig *tls.Config + + // Handle custom root certificate + if d.sslRootCert != "" { + caCert, err := os.ReadFile(d.sslRootCert) + if err != nil { + return nil, adbc.Error{ + Code: adbc.StatusInvalidArgument, + Msg: fmt.Sprintf("failed to read SSL root certificate: %v", err), + } + } + + caCertPool := x509.NewCertPool() + if !caCertPool.AppendCertsFromPEM(caCert) { + return nil, adbc.Error{ + Code: adbc.StatusInvalidArgument, + Msg: "failed to parse SSL root certificate", + } + } + + tlsConfig = &tls.Config{ + RootCAs: caCertPool, + MinVersion: tls.VersionTLS12, + } + } Review Comment: can we move this into the `SetOptions` directly instead of doing it here? ########## go/adbc/driver/databricks/statement.go: ########## @@ -0,0 +1,238 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package databricks + +import ( + "context" + "database/sql" + "database/sql/driver" + "errors" + "fmt" + + "github.com/apache/arrow-adbc/go/adbc" + "github.com/apache/arrow-go/v18/arrow" + "github.com/apache/arrow-go/v18/arrow/array" + + dbsqlrows "github.com/databricks/databricks-sql-go/rows" +) + +type statementImpl struct { + conn *connectionImpl + query string + parameters []interface{} Review Comment: since bind is not yet implemented, should we remove this member until we implement it? ########## go/adbc/driver/databricks/cloudfetch_e2e_test.go: ########## @@ -0,0 +1,494 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package databricks_test + +import ( + "context" + "testing" + "time" + + "github.com/apache/arrow-adbc/go/adbc/driver/databricks" + "github.com/apache/arrow-adbc/go/adbc/validation" + "github.com/apache/arrow-go/v18/arrow/memory" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// TestCloudFetchE2E_BasicConnection tests basic connectivity +// CloudFetch is handled automatically by the databricks-sql-go driver +func TestCloudFetchE2E_BasicConnection(t *testing.T) { + host, token, httpPath, catalog, schema := getDatabricksConfig(t) + + driver := databricks.NewDriver(memory.DefaultAllocator) + + opts := map[string]string{ + databricks.OptionServerHostname: host, + databricks.OptionHTTPPath: httpPath, + databricks.OptionAccessToken: token, + databricks.OptionCatalog: catalog, + databricks.OptionSchema: schema, + } + + db, err := driver.NewDatabase(opts) + require.NoError(t, err) + defer validation.CheckedClose(t, db) + + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + conn, err := db.Open(ctx) + require.NoError(t, err) + defer validation.CheckedClose(t, conn) + + // Execute a simple query to verify connection works + stmt, err := conn.NewStatement() + require.NoError(t, err) + defer validation.CheckedClose(t, stmt) + + err = stmt.SetSqlQuery("SELECT 1 as test") + require.NoError(t, err) + + reader, _, err := stmt.ExecuteQuery(ctx) + require.NoError(t, err) + defer reader.Release() + + assert.True(t, reader.Next(), "Expected at least one record") + t.Logf("✅ Connection successful (CloudFetch handled automatically by driver)") +} + +// TestCloudFetchE2E_SmallQueries tests CloudFetch with small result sets +func TestCloudFetchE2E_SmallQueries(t *testing.T) { + host, token, httpPath, _, _ := getDatabricksConfig(t) + + queries := []struct { + name string + query string + rowCount int + }{ + { + name: "range_100", + query: "SELECT * FROM range(100)", + rowCount: 100, + }, + { + name: 
"range_1000", + query: "SELECT * FROM range(1000)", + rowCount: 1000, + }, + { + name: "range_10000", + query: "SELECT * FROM range(10000)", + rowCount: 10000, + }, + } + + for _, q := range queries { + t.Run(q.name, func(t *testing.T) { + driver := databricks.NewDriver(memory.DefaultAllocator) + + opts := map[string]string{ + databricks.OptionServerHostname: host, + databricks.OptionHTTPPath: httpPath, + databricks.OptionAccessToken: token, + } + + db, err := driver.NewDatabase(opts) + require.NoError(t, err) + defer validation.CheckedClose(t, db) + + ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) + defer cancel() + + conn, err := db.Open(ctx) + require.NoError(t, err) + defer validation.CheckedClose(t, conn) + + stmt, err := conn.NewStatement() + require.NoError(t, err) + defer validation.CheckedClose(t, stmt) + + err = stmt.SetSqlQuery(q.query) + require.NoError(t, err) + + startTime := time.Now() + reader, _, err := stmt.ExecuteQuery(ctx) + require.NoError(t, err) + defer reader.Release() + + // Count rows + totalRows := int64(0) + batchCount := 0 + for reader.Next() { + record := reader.Record() + totalRows += record.NumRows() + batchCount++ + } + duration := time.Since(startTime) + + // Note: Databricks may apply row limits, so we check for at least some rows + // rather than exact counts + assert.Greater(t, totalRows, int64(0), "Expected to receive some rows") + t.Logf("Query '%s': %d rows (requested %d) in %d batches, duration: %v", Review Comment: unnecessary test log ########## go/adbc/driver/databricks/connection.go: ########## @@ -0,0 +1,254 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package databricks + +import ( + "context" + "database/sql" + "errors" + "fmt" + "strings" + + "github.com/apache/arrow-adbc/go/adbc" + "github.com/apache/arrow-adbc/go/adbc/driver/internal/driverbase" + _ "github.com/databricks/databricks-sql-go" +) + +type connectionImpl struct { + driverbase.ConnectionImplBase + + // Connection settings + catalog string + dbSchema string + + // Database connection + conn *sql.Conn +} + +func (c *connectionImpl) Close() error { + if c.conn == nil { + return adbc.Error{Code: adbc.StatusInvalidState} + } + defer func() { + c.conn = nil + }() + return c.conn.Close() +} + +func (c *connectionImpl) NewStatement() (adbc.Statement, error) { + return &statementImpl{ + conn: c, + }, nil +} + +func (c *connectionImpl) SetAutocommit(autocommit bool) error { + // Databricks SQL doesn't support explicit transaction control in the same way + // as traditional databases. Most operations are implicitly committed. 
+ if !autocommit { + return adbc.Error{ + Code: adbc.StatusNotImplemented, + Msg: fmt.Sprintf("disabling autocommit is not supported"), + } + } else { + return nil + } +} + +// CurrentNamespacer interface implementation +func (c *connectionImpl) GetCurrentCatalog() (string, error) { + return c.catalog, nil +} + +func (c *connectionImpl) GetCurrentDbSchema() (string, error) { + return c.dbSchema, nil +} + +func (c *connectionImpl) SetCurrentCatalog(catalog string) error { + if catalog == "" { + return adbc.Error{ + Code: adbc.StatusInvalidArgument, + Msg: "catalog cannot be empty", + } + } + if c.conn != nil { + escapedCatalog := strings.ReplaceAll(catalog, "`", "``") + _, err := c.conn.ExecContext(context.Background(), fmt.Sprintf("USE CATALOG `%s`", escapedCatalog)) + if err != nil { + return adbc.Error{ + Code: adbc.StatusInternal, + Msg: fmt.Sprintf("failed to set catalog: %v", err), + } + } + c.catalog = catalog + } + return nil +} + +func (c *connectionImpl) SetCurrentDbSchema(schema string) error { + if schema == "" { + return adbc.Error{ + Code: adbc.StatusInvalidArgument, + Msg: "schema cannot be empty", + } + } + if c.conn != nil { + escapedSchema := strings.ReplaceAll(schema, "`", "``") + _, err := c.conn.ExecContext(context.Background(), fmt.Sprintf("USE SCHEMA `%s`", escapedSchema)) + if err != nil { + return adbc.Error{ + Code: adbc.StatusInternal, + Msg: fmt.Sprintf("failed to set schema: %v", err), + } + } + c.dbSchema = schema + } + return nil +} + +// TableTypeLister interface implementation +func (c *connectionImpl) ListTableTypes(ctx context.Context) ([]string, error) { + // Databricks supports these table types + return []string{"TABLE", "VIEW", "EXTERNAL_TABLE", "MANAGED_TABLE", "STREAMING_TABLE", "MATERIALIZED_VIEW"}, nil +} + +// Transaction methods (Databricks has limited transaction support) +func (c *connectionImpl) Commit(ctx context.Context) error { + // Most operations are auto-committed. 
+ return nil Review Comment: shouldn't this error since we can't turn off auto-commit? ########## go/adbc/driver/databricks/statement.go: ########## @@ -0,0 +1,238 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package databricks + +import ( + "context" + "database/sql" + "database/sql/driver" + "errors" + "fmt" + + "github.com/apache/arrow-adbc/go/adbc" + "github.com/apache/arrow-go/v18/arrow" + "github.com/apache/arrow-go/v18/arrow/array" + + dbsqlrows "github.com/databricks/databricks-sql-go/rows" +) + +type statementImpl struct { + conn *connectionImpl + query string + parameters []interface{} + prepared *sql.Stmt +} + +func (s *statementImpl) Close() error { + if s.conn == nil { + return adbc.Error{ + Msg: "statement already closed", + Code: adbc.StatusInvalidState, + } + } + if s.prepared != nil { + return s.prepared.Close() + } + s.conn = nil + return nil +} + +func (s *statementImpl) SetOption(key, val string) error { + // No statement-specific options are supported yet + return adbc.Error{ + Code: adbc.StatusNotImplemented, + Msg: fmt.Sprintf("unsupported statement option: %s", key), + } +} + +func (s *statementImpl) SetSqlQuery(query string) error { + s.query = query + // Reset prepared statement if 
query changes + if s.prepared != nil { + if err := s.prepared.Close(); err != nil { + return adbc.Error{ + Code: adbc.StatusInvalidState, + Msg: fmt.Sprintf("failed to close previous prepared statement: %v", err), + } + } + s.prepared = nil + } + return nil +} + +func (s *statementImpl) Prepare(ctx context.Context) error { + if s.query == "" { + return adbc.Error{ + Code: adbc.StatusInvalidState, + Msg: "no query set", + } + } + + stmt, err := s.conn.conn.PrepareContext(ctx, s.query) + if err != nil { + return adbc.Error{ + Code: adbc.StatusInvalidState, + Msg: fmt.Sprintf("failed to prepare statement: %v", err), + } + } + + s.prepared = stmt + return nil +} + +func (s *statementImpl) ExecuteQuery(ctx context.Context) (array.RecordReader, int64, error) { + // TODO: Prepared statement support with raw connections + if s.prepared != nil { + return nil, -1, adbc.Error{ + Code: adbc.StatusNotImplemented, + Msg: "Prepared statements are not yet supported via `execute query`", + } + } Review Comment: use the same logic as at line 172 ########## go/adbc/driver/databricks/database.go: ########## @@ -0,0 +1,407 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package databricks + +import ( + "context" + "database/sql" + "errors" + "fmt" + "strconv" + "strings" + "time" + + "crypto/tls" + "crypto/x509" + "net/http" + "os" + + "github.com/apache/arrow-adbc/go/adbc" + "github.com/apache/arrow-adbc/go/adbc/driver/internal/driverbase" + dbsql "github.com/databricks/databricks-sql-go" +) + +const DEFAULT_PORT = 443 +const DEFAULT_RETRY_WAIT_MIN = 1 * time.Second +const DEFAULT_RETRY_WAIT_MAX = 30 * time.Second + +type databaseImpl struct { + driverbase.DatabaseImplBase + + // Connection Pool + db *sql.DB + needsRefresh bool // Whether we need to re-initialize + + // Connection parameters + serverHostname string + httpPath string + accessToken string + port string + catalog string + schema string + + // Query options + queryTimeout time.Duration + maxRows int + queryRetryCount int + downloadThreadCount int + + // TLS/SSL options + sslMode string + sslRootCert string + + // OAuth options (for future expansion) + oauthClientID string + oauthClientSecret string + oauthRefreshToken string +} + +func (d *databaseImpl) resolveConnectionOptions() ([]dbsql.ConnOption, error) { + if d.serverHostname == "" { + return nil, adbc.Error{ + Code: adbc.StatusInvalidArgument, + Msg: "server hostname is required", + } + } + + if d.httpPath == "" { + return nil, adbc.Error{ + Code: adbc.StatusInvalidArgument, + Msg: "HTTP path is required", + } + } + + // FIXME: Support other auth methods + if d.accessToken == "" { + return nil, adbc.Error{ + Code: adbc.StatusInvalidArgument, + Msg: "access token is required", + } + } + + opts := []dbsql.ConnOption{ + dbsql.WithAccessToken(d.accessToken), + dbsql.WithServerHostname(d.serverHostname), + dbsql.WithHTTPPath(d.httpPath), + } + + // Validate and set custom port + // Defaults to 443 + if d.port != "" { + port, err := strconv.Atoi(d.port) + if err != nil || port < 1 || port > 65535 { + return nil, adbc.Error{ + Code: adbc.StatusInvalidArgument, + Msg: "invalid port number", + } + } + opts = 
append(opts, dbsql.WithPort(port)) + } else { + opts = append(opts, dbsql.WithPort(DEFAULT_PORT)) + } + + // Default namespace for queries (catalog/schema) + if d.catalog != "" || d.schema != "" { + opts = append(opts, dbsql.WithInitialNamespace(d.catalog, d.schema)) + } + + if d.queryTimeout > 0 { + opts = append(opts, dbsql.WithTimeout(d.queryTimeout)) + } + + if d.maxRows > 0 { + opts = append(opts, dbsql.WithMaxRows(int(d.maxRows))) + } + if d.queryRetryCount >= 0 { + opts = append(opts, dbsql.WithRetries(d.queryRetryCount, DEFAULT_RETRY_WAIT_MIN, DEFAULT_RETRY_WAIT_MAX)) + } + if d.downloadThreadCount > 0 { + opts = append(opts, dbsql.WithMaxDownloadThreads(d.downloadThreadCount)) + } + + // TLS/SSL handling + if d.sslMode != "" || d.sslRootCert != "" { + var tlsConfig *tls.Config + + // Handle custom root certificate + if d.sslRootCert != "" { + caCert, err := os.ReadFile(d.sslRootCert) + if err != nil { + return nil, adbc.Error{ + Code: adbc.StatusInvalidArgument, + Msg: fmt.Sprintf("failed to read SSL root certificate: %v", err), + } + } + + caCertPool := x509.NewCertPool() + if !caCertPool.AppendCertsFromPEM(caCert) { + return nil, adbc.Error{ + Code: adbc.StatusInvalidArgument, + Msg: "failed to parse SSL root certificate", + } + } + + tlsConfig = &tls.Config{ + RootCAs: caCertPool, + MinVersion: tls.VersionTLS12, + } + } + + // Handle SSL mode + if d.sslMode != "" { + switch strings.ToLower(d.sslMode) { + case "insecure": + if tlsConfig == nil { + tlsConfig = &tls.Config{MinVersion: tls.VersionTLS12} + } + tlsConfig.InsecureSkipVerify = true + case "require": + // Default behavior - full TLS verification + if tlsConfig == nil { + tlsConfig = &tls.Config{MinVersion: tls.VersionTLS12} + } + default: + return nil, adbc.Error{ + Code: adbc.StatusInvalidArgument, + Msg: fmt.Sprintf("invalid SSL mode: %s (supported: 'require', 'insecure')", d.sslMode), + } + } + } Review Comment: same as above, move this to `SetOptions` instead of here ########## 
go/adbc/driver/databricks/cloudfetch_e2e_test.go: ########## @@ -0,0 +1,494 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package databricks_test + +import ( + "context" + "testing" + "time" + + "github.com/apache/arrow-adbc/go/adbc/driver/databricks" + "github.com/apache/arrow-adbc/go/adbc/validation" + "github.com/apache/arrow-go/v18/arrow/memory" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// TestCloudFetchE2E_BasicConnection tests basic connectivity +// CloudFetch is handled automatically by the databricks-sql-go driver +func TestCloudFetchE2E_BasicConnection(t *testing.T) { + host, token, httpPath, catalog, schema := getDatabricksConfig(t) + + driver := databricks.NewDriver(memory.DefaultAllocator) + + opts := map[string]string{ + databricks.OptionServerHostname: host, + databricks.OptionHTTPPath: httpPath, + databricks.OptionAccessToken: token, + databricks.OptionCatalog: catalog, + databricks.OptionSchema: schema, + } + + db, err := driver.NewDatabase(opts) + require.NoError(t, err) + defer validation.CheckedClose(t, db) + + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + conn, err := db.Open(ctx) + 
require.NoError(t, err) + defer validation.CheckedClose(t, conn) + + // Execute a simple query to verify connection works + stmt, err := conn.NewStatement() + require.NoError(t, err) + defer validation.CheckedClose(t, stmt) + + err = stmt.SetSqlQuery("SELECT 1 as test") + require.NoError(t, err) + + reader, _, err := stmt.ExecuteQuery(ctx) + require.NoError(t, err) + defer reader.Release() + + assert.True(t, reader.Next(), "Expected at least one record") + t.Logf("✅ Connection successful (CloudFetch handled automatically by driver)") Review Comment: unnecessary log line ########## go/adbc/driver/databricks/database.go: ########## @@ -0,0 +1,407 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package databricks + +import ( + "context" + "database/sql" + "errors" + "fmt" + "strconv" + "strings" + "time" + + "crypto/tls" + "crypto/x509" + "net/http" + "os" + + "github.com/apache/arrow-adbc/go/adbc" + "github.com/apache/arrow-adbc/go/adbc/driver/internal/driverbase" + dbsql "github.com/databricks/databricks-sql-go" +) + +const DEFAULT_PORT = 443 +const DEFAULT_RETRY_WAIT_MIN = 1 * time.Second +const DEFAULT_RETRY_WAIT_MAX = 30 * time.Second + +type databaseImpl struct { + driverbase.DatabaseImplBase + + // Connection Pool + db *sql.DB + needsRefresh bool // Whether we need to re-initialize + + // Connection parameters + serverHostname string + httpPath string + accessToken string + port string + catalog string + schema string + + // Query options + queryTimeout time.Duration + maxRows int + queryRetryCount int + downloadThreadCount int + + // TLS/SSL options + sslMode string + sslRootCert string + + // OAuth options (for future expansion) + oauthClientID string + oauthClientSecret string + oauthRefreshToken string +} + +func (d *databaseImpl) resolveConnectionOptions() ([]dbsql.ConnOption, error) { + if d.serverHostname == "" { + return nil, adbc.Error{ + Code: adbc.StatusInvalidArgument, + Msg: "server hostname is required", + } + } + + if d.httpPath == "" { + return nil, adbc.Error{ + Code: adbc.StatusInvalidArgument, + Msg: "HTTP path is required", + } + } + + // FIXME: Support other auth methods + if d.accessToken == "" { + return nil, adbc.Error{ + Code: adbc.StatusInvalidArgument, + Msg: "access token is required", + } + } + + opts := []dbsql.ConnOption{ + dbsql.WithAccessToken(d.accessToken), + dbsql.WithServerHostname(d.serverHostname), + dbsql.WithHTTPPath(d.httpPath), + } + + // Validate and set custom port + // Defaults to 443 + if d.port != "" { + port, err := strconv.Atoi(d.port) + if err != nil || port < 1 || port > 65535 { + return nil, adbc.Error{ + Code: adbc.StatusInvalidArgument, + Msg: "invalid port number", + } + } + opts = 
append(opts, dbsql.WithPort(port)) + } else { + opts = append(opts, dbsql.WithPort(DEFAULT_PORT)) + } + + // Default namespace for queries (catalog/schema) + if d.catalog != "" || d.schema != "" { + opts = append(opts, dbsql.WithInitialNamespace(d.catalog, d.schema)) + } + + if d.queryTimeout > 0 { + opts = append(opts, dbsql.WithTimeout(d.queryTimeout)) + } + + if d.maxRows > 0 { + opts = append(opts, dbsql.WithMaxRows(int(d.maxRows))) + } + if d.queryRetryCount >= 0 { + opts = append(opts, dbsql.WithRetries(d.queryRetryCount, DEFAULT_RETRY_WAIT_MIN, DEFAULT_RETRY_WAIT_MAX)) + } + if d.downloadThreadCount > 0 { + opts = append(opts, dbsql.WithMaxDownloadThreads(d.downloadThreadCount)) + } + + // TLS/SSL handling + if d.sslMode != "" || d.sslRootCert != "" { + var tlsConfig *tls.Config + + // Handle custom root certificate + if d.sslRootCert != "" { + caCert, err := os.ReadFile(d.sslRootCert) + if err != nil { + return nil, adbc.Error{ + Code: adbc.StatusInvalidArgument, + Msg: fmt.Sprintf("failed to read SSL root certificate: %v", err), + } + } + + caCertPool := x509.NewCertPool() + if !caCertPool.AppendCertsFromPEM(caCert) { + return nil, adbc.Error{ + Code: adbc.StatusInvalidArgument, + Msg: "failed to parse SSL root certificate", + } + } + + tlsConfig = &tls.Config{ + RootCAs: caCertPool, + MinVersion: tls.VersionTLS12, + } + } + + // Handle SSL mode + if d.sslMode != "" { + switch strings.ToLower(d.sslMode) { + case "insecure": + if tlsConfig == nil { + tlsConfig = &tls.Config{MinVersion: tls.VersionTLS12} + } + tlsConfig.InsecureSkipVerify = true + case "require": + // Default behavior - full TLS verification + if tlsConfig == nil { + tlsConfig = &tls.Config{MinVersion: tls.VersionTLS12} + } + default: + return nil, adbc.Error{ + Code: adbc.StatusInvalidArgument, + Msg: fmt.Sprintf("invalid SSL mode: %s (supported: 'require', 'insecure')", d.sslMode), + } + } + } + + // Apply custom TLS config if we have one + if tlsConfig != nil { + transport := 
&http.Transport{ + TLSClientConfig: tlsConfig, + } + opts = append(opts, dbsql.WithTransport(transport)) + } + } + + return opts, nil +} + +func (d *databaseImpl) initializeConnectionPool(ctx context.Context) (*sql.DB, error) { + opts, err := d.resolveConnectionOptions() + + if err != nil { + return nil, err + } + + connector, err := dbsql.NewConnector(opts...) + + if err != nil { + return nil, err + } + + db := sql.OpenDB(connector) + + // Test the connection + if err := db.PingContext(ctx); err != nil { + err = errors.Join(db.Close()) + return nil, adbc.Error{ + Code: adbc.StatusInternal, + Msg: fmt.Sprintf("failed to ping database: %v", err), + } + } + + return db, nil +} + +func (d *databaseImpl) Open(ctx context.Context) (adbc.Connection, error) { + // Re-initialize the connection pool and settings if anything + // has changed, or we have not initialized yet + if d.needsRefresh || d.db == nil { + db, err := d.initializeConnectionPool(ctx) + + if err != nil { + return nil, err + } + + // Close the existing connection pool + if d.db != nil { + d.db.Close() Review Comment: check if `Close` returns an error -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
