lidavidm commented on code in PR #3325: URL: https://github.com/apache/arrow-adbc/pull/3325#discussion_r2347778663
########## go/adbc/driver/databricks/metadata_e2e_test.go: ########## Review Comment: Are these tests still necessary if we've implemented the validation suite? ########## go/adbc/driver/databricks/connection.go: ########## @@ -0,0 +1,241 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package databricks + +import ( + "context" + "database/sql" + "errors" + "fmt" + + "github.com/apache/arrow-adbc/go/adbc" + "github.com/apache/arrow-adbc/go/adbc/driver/internal/driverbase" + _ "github.com/databricks/databricks-sql-go" +) + +type connectionImpl struct { + driverbase.ConnectionImplBase + + // Connection settings + catalog string + dbSchema string + + // Database connection + conn *sql.Conn +} + +func (c *connectionImpl) Close() error { + if c.conn == nil { + return adbc.Error{Code: adbc.StatusInvalidState} + } + defer func() { + c.conn = nil + }() + return c.conn.Close() +} + +func (c *connectionImpl) NewStatement() (adbc.Statement, error) { + return &statementImpl{ + conn: c, + }, nil +} + +func (c *connectionImpl) SetAutocommit(autocommit bool) error { + // Databricks SQL doesn't support explicit transaction control in the same way + // as traditional databases. Most operations are implicitly committed. + if !autocommit { + return adbc.Error{ + Code: adbc.StatusNotImplemented, + Msg: fmt.Sprintf("disabling autocommit is not supported"), + } + } else { + return nil + } +} + +// CurrentNamespacer interface implementation +func (c *connectionImpl) GetCurrentCatalog() (string, error) { + return c.catalog, nil +} + +func (c *connectionImpl) GetCurrentDbSchema() (string, error) { + return c.dbSchema, nil +} + +func (c *connectionImpl) SetCurrentCatalog(catalog string) error { + // Use the database to execute USE CATALOG + if c.conn != nil && catalog != "" { + _, err := c.conn.ExecContext(context.Background(), "USE CATALOG `%s`", catalog) + if err != nil { + return adbc.Error{ + Code: adbc.StatusInternal, + Msg: fmt.Sprintf("failed to set catalog: %v", err), + } + } + } + c.catalog = catalog + return nil +} + +func (c *connectionImpl) SetCurrentDbSchema(schema string) error { + // Use the database to execute USE SCHEMA + if c.conn != nil && schema != "" { + _, err := c.conn.ExecContext(context.Background(), "USE SCHEMA `%s`", schema) + if err != nil { + return adbc.Error{ + Code: adbc.StatusInternal, + Msg: fmt.Sprintf("failed to set schema: %v", err), + } + } + } + c.dbSchema = schema Review Comment: Ditto here. ########## go/adbc/driver/databricks/statement_test.go: ########## @@ -0,0 +1,46 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package databricks_test + +import ( + "testing" + + "github.com/apache/arrow-adbc/go/adbc/driver/databricks" + "github.com/apache/arrow-adbc/go/adbc/validation" + "github.com/stretchr/testify/assert" +) + +func TestStatementBasic(t *testing.T) { + // This is a basic test to ensure the code compiles + // Real tests would require a connection to Databricks + + // Create a driver and database + driver := databricks.NewDriver(nil) + db, err := driver.NewDatabase(map[string]string{ + databricks.OptionServerHostname: "mock-host", + databricks.OptionAccessToken: "mock-token", + databricks.OptionHTTPPath: "mock-path", + }) + assert.NoError(t, err) + + defer validation.CheckedClose(t, db) + + // Note: We can't test the actual statement implementation without a real connection + // This test just ensures the public API compiles correctly Review Comment: Do we need these tests still then? ########## go/adbc/driver/databricks/driver_test.go: ########## @@ -0,0 +1,805 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package databricks_test + +import ( + "context" + "fmt" + "os" + "strings" + "testing" + + "github.com/apache/arrow-adbc/go/adbc" + "github.com/apache/arrow-adbc/go/adbc/driver/databricks" + "github.com/apache/arrow-adbc/go/adbc/validation" + "github.com/apache/arrow-go/v18/arrow" + "github.com/apache/arrow-go/v18/arrow/array" + "github.com/apache/arrow-go/v18/arrow/memory" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" +) + +type DatabricksQuirks struct { + mem *memory.CheckedAllocator + catalogName string + schemaName string + hostname string + httpPath string + token string + port string +} + +func (d *DatabricksQuirks) SetupDriver(t *testing.T) adbc.Driver { + d.mem = memory.NewCheckedAllocator(memory.DefaultAllocator) + return databricks.NewDriver(d.mem) +} + +func (d *DatabricksQuirks) TearDownDriver(t *testing.T, _ adbc.Driver) { + d.mem.AssertSize(t, 0) +} + +func (d *DatabricksQuirks) DatabaseOptions() map[string]string { + opts := map[string]string{ + databricks.OptionServerHostname: d.hostname, + databricks.OptionHTTPPath: d.httpPath, + databricks.OptionAccessToken: d.token, + } + + if d.port != "" { + opts[databricks.OptionPort] = d.port + } + if d.catalogName != "" { + opts[databricks.OptionCatalog] = d.catalogName + } + if d.schemaName != "" { + opts[databricks.OptionSchema] = d.schemaName + } + + return opts +} + +func (d *DatabricksQuirks) getSqlTypeFromArrowType(dt arrow.DataType) string { + switch dt.ID() { + case arrow.INT8, arrow.INT16, arrow.INT32, arrow.INT64: + return "BIGINT" + case arrow.FLOAT32: + return "FLOAT" + case arrow.FLOAT64: + return "DOUBLE" + case arrow.STRING: + return "STRING" + case arrow.BOOL: + return "BOOLEAN" + case arrow.DATE32: + return "DATE" + case arrow.TIMESTAMP: + return "TIMESTAMP" + case arrow.BINARY: + return "BINARY" + default: + return "STRING" + } +} + +func quoteTblName(name string) string { + return "`" + strings.ReplaceAll(name, "`", "``") + "`" +} + +func getArr(arr arrow.Array) interface{} { + switch arr := arr.(type) { + case *array.Int8: + v := arr.Int8Values() + return &v + case *array.Uint8: + v := arr.Uint8Values() + return &v + case *array.Int16: + v := arr.Int16Values() + return &v + case *array.Uint16: + v := arr.Uint16Values() + return &v + case *array.Int32: + v := arr.Int32Values() + return &v + case *array.Uint32: + v := arr.Uint32Values() + return &v + case *array.Int64: + v := arr.Int64Values() + return &v + case *array.Uint64: + v := arr.Uint64Values() + return &v + case *array.Float32: + v := arr.Float32Values() + return &v + case *array.Float64: + v := arr.Float64Values() + return &v + case *array.String: + v := make([]string, arr.Len()) + for i := 0; i < arr.Len(); i++ { + if arr.IsNull(i) { + continue + } + v[i] = arr.Value(i) + } + return &v + default: + panic(fmt.Errorf("unimplemented type %s", arr.DataType())) + } +} + +func (d *DatabricksQuirks) CreateSampleTable(tableName string, r arrow.RecordBatch) error { + drv := databricks.NewDriver(d.mem) + db, err := drv.NewDatabase(d.DatabaseOptions()) + if err != nil { + return err + } + defer db.Close() + + cnxn, err := db.Open(context.Background()) + if err != nil { + return err + } + defer cnxn.Close() + + stmt, err := cnxn.NewStatement() + if err != nil { + return err + } + defer stmt.Close() + + var b strings.Builder + b.WriteString("CREATE OR REPLACE TABLE ") + b.WriteString(quoteTblName(tableName)) + b.WriteString(" (") + + for i := 0; i < int(r.NumCols()); i++ { + if i != 0 { + b.WriteString(", ") + } + f := r.Schema().Field(i) + b.WriteString(quoteTblName(f.Name)) + b.WriteByte(' ') + b.WriteString(d.getSqlTypeFromArrowType(f.Type)) + } + + b.WriteString(")") + + if err := stmt.SetSqlQuery(b.String()); err != nil { + return err + } + if _, err := stmt.ExecuteUpdate(context.Background()); err != nil { + return err + } + + if r.NumRows() == 0 { + return nil + } + + return d.insertDataRows(stmt, tableName, r) +} + +func (d *DatabricksQuirks) insertDataRows(stmt adbc.Statement, tableName string, r arrow.RecordBatch) error { + if r.NumRows() == 0 { + return nil + } + + for row := 0; row < int(r.NumRows()); row++ { + var values []string + for col := 0; col < int(r.NumCols()); col++ { + column := r.Column(col) + if column.IsNull(row) { + values = append(values, "NULL") + } else { + values = append(values, d.getSimpleTestValue(column.DataType(), row)) + } + } + + querySQL := fmt.Sprintf("INSERT INTO %s VALUES (%s)", + quoteTblName(tableName), strings.Join(values, ", ")) + + if err := stmt.SetSqlQuery(querySQL); err != nil { + return err + } + if _, err := stmt.ExecuteUpdate(context.Background()); err != nil { + return err + } + } + return nil +} + +func (d *DatabricksQuirks) getSimpleTestValue(dataType arrow.DataType, row int) string { + switch dataType.ID() { + case arrow.INT8, arrow.UINT8, arrow.INT16, arrow.UINT16, arrow.INT32, arrow.UINT32, arrow.INT64, arrow.UINT64: + return fmt.Sprintf("%d", row+1) + case arrow.FLOAT32, arrow.FLOAT64: + return fmt.Sprintf("%g", float64(row)+0.5) + case arrow.STRING: + return fmt.Sprintf("'test_string_%d'", row) + case arrow.BINARY: + return fmt.Sprintf("X'%02X'", row) + case arrow.BOOL: + if row%2 == 0 { + return "TRUE" + } + return "FALSE" + case arrow.DATE32: + return "'2023-01-01'" + case arrow.TIMESTAMP: + return "'2023-01-01 12:00:00.000000'" + case arrow.DECIMAL128: + return fmt.Sprintf("%d.50", row+1) + default: + return fmt.Sprintf("'test_value_%d'", row) + } +} + +func (d *DatabricksQuirks) DropTable(cnxn adbc.Connection, tblname string) error { + stmt, err := cnxn.NewStatement() + if err != nil { + return err + } + defer func() { + if err = stmt.Close(); err != nil { + panic(err) + } + }() + + if err = stmt.SetSqlQuery(`DROP TABLE IF EXISTS ` + quoteTblName(tblname)); err != nil { + return err + } + + _, err = stmt.ExecuteUpdate(context.Background()) + return err +} + +func (d *DatabricksQuirks) Alloc() memory.Allocator { return d.mem } +func (d *DatabricksQuirks) BindParameter(index int) string { return "?" } +func (d *DatabricksQuirks) SupportsBulkIngest(mode string) bool { return true } +func (d *DatabricksQuirks) SupportsConcurrentStatements() bool { return true } +func (d *DatabricksQuirks) SupportsCurrentCatalogSchema() bool { return true } +func (d *DatabricksQuirks) SupportsExecuteSchema() bool { return true } +func (d *DatabricksQuirks) SupportsGetSetOptions() bool { return true } +func (d *DatabricksQuirks) SupportsPartitionedData() bool { return false } +func (d *DatabricksQuirks) SupportsStatistics() bool { return false } +func (d *DatabricksQuirks) SupportsTransactions() bool { return true } +func (d *DatabricksQuirks) SupportsGetParameterSchema() bool { return false } +func (d *DatabricksQuirks) SupportsDynamicParameterBinding() bool { return false } +func (d *DatabricksQuirks) SupportsErrorIngestIncompatibleSchema() bool { return false } +func (d *DatabricksQuirks) Catalog() string { return d.catalogName } +func (d *DatabricksQuirks) DBSchema() string { return d.schemaName } + +func (d *DatabricksQuirks) GetMetadata(code adbc.InfoCode) interface{} { + switch code { + case adbc.InfoDriverName: + return "ADBC Databricks Driver - Go" + case adbc.InfoDriverVersion: + return "(unknown or development build)" + case adbc.InfoDriverArrowVersion: + return "(unknown or development build)" + case adbc.InfoVendorVersion: + return "(unknown or development build)" + case adbc.InfoVendorArrowVersion: + return "(unknown or development build)" + case adbc.InfoDriverADBCVersion: + return adbc.AdbcVersion1_1_0 + case adbc.InfoVendorName: + return "Databricks" + } + return nil +} + +func (d *DatabricksQuirks) SampleTableSchemaMetadata(tblName string, dt arrow.DataType) arrow.Metadata { + switch dt.ID() { + case arrow.STRING: + return arrow.MetadataFrom(map[string]string{ + "DATA_TYPE": "STRING", "PRIMARY_KEY": "N", + }) + case arrow.INT64: + return arrow.MetadataFrom(map[string]string{ + "DATA_TYPE": "BIGINT", "PRIMARY_KEY": "N", + }) + case arrow.FLOAT64: + return arrow.MetadataFrom(map[string]string{ + "DATA_TYPE": "DOUBLE", "PRIMARY_KEY": "N", + }) + case arrow.BOOL: + return arrow.MetadataFrom(map[string]string{ + "DATA_TYPE": "BOOLEAN", "PRIMARY_KEY": "N", + }) + } + return arrow.Metadata{} +} + +func (suite *DatabricksTests) checkRowCount(expected int64, actual int64) { + if actual != -1 { + suite.EqualValues(expected, actual) + } +} + +func withQuirks(t *testing.T, fn func(*DatabricksQuirks)) { + hostname := os.Getenv("DATABRICKS_HOST") + httpPath := os.Getenv("DATABRICKS_HTTPPATH") + token := os.Getenv("DATABRICKS_ACCESSTOKEN") + catalog := os.Getenv("DATABRICKS_CATALOG") + schema := os.Getenv("DATABRICKS_SCHEMA") + + if hostname == "" { + t.Skip("DATABRICKS_HOST not defined, skipping Databricks driver tests") + } else if httpPath == "" { + t.Skip("DATABRICKS_HTTPPATH not defined, skipping Databricks driver tests") + } else if token == "" { + t.Skip("DATABRICKS_ACCESSTOKEN not defined, skipping Databricks driver tests") + } + + if catalog == "" { + catalog = "main" + } + if schema == "" { + schema = "default" + } + q := &DatabricksQuirks{ + hostname: hostname, + httpPath: httpPath, + token: token, + catalogName: catalog, + schemaName: schema, + port: os.Getenv("DATABRICKS_PORT"), // optional + } + + fn(q) +} + +func TestValidation(t *testing.T) { + withQuirks(t, func(q *DatabricksQuirks) { + suite.Run(t, &validation.DatabaseTests{Quirks: q}) + suite.Run(t, &validation.ConnectionTests{Quirks: q}) + suite.Run(t, &validation.StatementTests{Quirks: q}) + }) +} + +func TestDatabricks(t *testing.T) { + withQuirks(t, func(q *DatabricksQuirks) { + suite.Run(t, &DatabricksTests{Quirks: q}) + }) +} + +// ---- Additional Tests -------------------- + +type DatabricksTests struct { + suite.Suite + + Quirks *DatabricksQuirks + + ctx context.Context + driver adbc.Driver + db adbc.Database + cnxn adbc.Connection + stmt adbc.Statement +} + +func (suite *DatabricksTests) SetupTest() { + var err error + suite.ctx = context.Background() + suite.driver = suite.Quirks.SetupDriver(suite.T()) + suite.db, err = suite.driver.NewDatabase(suite.Quirks.DatabaseOptions()) + suite.NoError(err) + suite.cnxn, err = suite.db.Open(suite.ctx) + suite.NoError(err) + suite.stmt, err = suite.cnxn.NewStatement() + suite.NoError(err) +} + +func (suite *DatabricksTests) TearDownTest() { + suite.NoError(suite.stmt.Close()) + suite.NoError(suite.cnxn.Close()) + suite.Quirks.TearDownDriver(suite.T(), suite.driver) + suite.cnxn = nil + suite.NoError(suite.db.Close()) + suite.db = nil + suite.driver = nil +} + +func (suite *DatabricksTests) TestNewDatabaseWithOptions() { + t := suite.T() + + drv := suite.Quirks.SetupDriver(t) + + t.Run("WithBasicOptions", func(t *testing.T) { + dbOptions := suite.Quirks.DatabaseOptions() + db, err := drv.NewDatabase(dbOptions) + suite.NoError(err) + suite.NotNil(db) + + cnxn, err := db.Open(suite.ctx) + suite.NoError(err) + suite.NotNil(cnxn) + + suite.NoError(cnxn.Close()) + suite.NoError(db.Close()) + }) + + t.Run("WithPort", func(t *testing.T) { + dbOptions := suite.Quirks.DatabaseOptions() + dbOptions[databricks.OptionPort] = "443" + db, err := drv.NewDatabase(dbOptions) + suite.NoError(err) + suite.NotNil(db) + suite.NoError(db.Close()) + }) + + t.Run("WithSSLOptions", func(t *testing.T) { + dbOptions := suite.Quirks.DatabaseOptions() + dbOptions[databricks.OptionSSLMode] = "require" + db, err := drv.NewDatabase(dbOptions) + suite.NoError(err) + suite.NotNil(db) + suite.NoError(db.Close()) + }) +} + +func (suite *DatabricksTests) TestConnectionOptions() { + suite.Require().NoError(suite.stmt.SetSqlQuery("SELECT 1 as test_col")) + rdr, n, err := suite.stmt.ExecuteQuery(suite.ctx) + suite.Require().NoError(err) + defer rdr.Release() + + _ = n + suite.True(rdr.Next()) + rec := rdr.Record() + suite.Equal(int64(1), rec.NumRows()) + suite.Equal(int64(1), rec.NumCols()) + suite.Equal("test_col", rec.ColumnName(0)) + suite.False(rdr.Next()) + suite.Require().NoError(rdr.Err()) +} + +func (suite *DatabricksTests) TestBasicDataTypes() { + suite.Require().NoError(suite.stmt.SetSqlQuery(` + SELECT + CAST(42 AS BIGINT) as bigint_col, + CAST(3.14 AS DOUBLE) as double_col, + CAST('hello' AS STRING) as string_col, + CAST(true AS BOOLEAN) as boolean_col + `)) + + rdr, n, err := suite.stmt.ExecuteQuery(suite.ctx) + suite.Require().NoError(err) + defer rdr.Release() + + suite.checkRowCount(1, n) + suite.True(rdr.Next()) + rec := rdr.Record() + + suite.Equal(int64(4), rec.NumCols()) + suite.Equal("bigint_col", rec.ColumnName(0)) + suite.Equal("double_col", rec.ColumnName(1)) + suite.Equal("string_col", rec.ColumnName(2)) + suite.Equal("boolean_col", rec.ColumnName(3)) + + suite.False(rdr.Next()) + suite.Require().NoError(rdr.Err()) +} + +func (suite *DatabricksTests) TestStatementEmptyResultSet() { + suite.NoError(suite.stmt.SetSqlQuery("SELECT 1 WHERE 1=0")) + + rdr, n, err := suite.stmt.ExecuteQuery(suite.ctx) + suite.Require().NoError(err) + defer rdr.Release() + + suite.checkRowCount(0, n) + suite.False(rdr.Next()) + suite.NoError(rdr.Err()) +} + +func (suite *DatabricksTests) TestGetSetOptions() { + getSetDB, ok := suite.db.(adbc.GetSetOptions) + suite.True(ok, "Database should implement GetSetOptions") + + testKey := databricks.OptionQueryTimeout + testValue := "1m0s" + + err := getSetDB.SetOption(testKey, testValue) + suite.NoError(err) + + retrievedValue, err := getSetDB.GetOption(testKey) + suite.NoError(err) + suite.Equal(testValue, retrievedValue) +} + +func (suite *DatabricksTests) TestQueryTimeout() { + dbOptions := suite.Quirks.DatabaseOptions() + dbOptions[databricks.OptionQueryTimeout] = "30s" + + db, err := suite.driver.NewDatabase(dbOptions) + suite.NoError(err) + defer func() { suite.NoError(db.Close()) }() Review Comment: (CheckedClose works here too - suite exposes a T) ########## go/adbc/driver/databricks/connection.go: ########## @@ -0,0 +1,241 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package databricks + +import ( + "context" + "database/sql" + "errors" + "fmt" + + "github.com/apache/arrow-adbc/go/adbc" + "github.com/apache/arrow-adbc/go/adbc/driver/internal/driverbase" + _ "github.com/databricks/databricks-sql-go" +) + +type connectionImpl struct { + driverbase.ConnectionImplBase + + // Connection settings + catalog string + dbSchema string + + // Database connection + conn *sql.Conn +} + +func (c *connectionImpl) Close() error { + if c.conn == nil { + return adbc.Error{Code: adbc.StatusInvalidState} + } + defer func() { + c.conn = nil + }() + return c.conn.Close() +} + +func (c *connectionImpl) NewStatement() (adbc.Statement, error) { + return &statementImpl{ + conn: c, + }, nil +} + +func (c *connectionImpl) SetAutocommit(autocommit bool) error { + // Databricks SQL doesn't support explicit transaction control in the same way + // as traditional databases. Most operations are implicitly committed. + if !autocommit { + return adbc.Error{ + Code: adbc.StatusNotImplemented, + Msg: fmt.Sprintf("disabling autocommit is not supported"), + } + } else { + return nil + } +} + +// CurrentNamespacer interface implementation +func (c *connectionImpl) GetCurrentCatalog() (string, error) { + return c.catalog, nil +} + +func (c *connectionImpl) GetCurrentDbSchema() (string, error) { + return c.dbSchema, nil +} + +func (c *connectionImpl) SetCurrentCatalog(catalog string) error { + // Use the database to execute USE CATALOG + if c.conn != nil && catalog != "" { + _, err := c.conn.ExecContext(context.Background(), "USE CATALOG `%s`", catalog) + if err != nil { + return adbc.Error{ + Code: adbc.StatusInternal, + Msg: fmt.Sprintf("failed to set catalog: %v", err), + } + } + } + c.catalog = catalog Review Comment: Don't you want this to be in the `if`? Else if `catalog` is `""`, you'll do nothing and then set `c.catalog` to a blank string. ########## go/adbc/driver/databricks/connection.go: ########## @@ -0,0 +1,241 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package databricks + +import ( + "context" + "database/sql" + "errors" + "fmt" + + "github.com/apache/arrow-adbc/go/adbc" + "github.com/apache/arrow-adbc/go/adbc/driver/internal/driverbase" + _ "github.com/databricks/databricks-sql-go" +) + +type connectionImpl struct { + driverbase.ConnectionImplBase + + // Connection settings + catalog string + dbSchema string + + // Database connection + conn *sql.Conn +} + +func (c *connectionImpl) Close() error { + if c.conn == nil { + return adbc.Error{Code: adbc.StatusInvalidState} + } + defer func() { + c.conn = nil + }() + return c.conn.Close() +} + +func (c *connectionImpl) NewStatement() (adbc.Statement, error) { + return &statementImpl{ + conn: c, + }, nil +} + +func (c *connectionImpl) SetAutocommit(autocommit bool) error { + // Databricks SQL doesn't support explicit transaction control in the same way + // as traditional databases. Most operations are implicitly committed. + if !autocommit { + return adbc.Error{ + Code: adbc.StatusNotImplemented, + Msg: fmt.Sprintf("disabling autocommit is not supported"), + } + } else { + return nil + } +} + +// CurrentNamespacer interface implementation +func (c *connectionImpl) GetCurrentCatalog() (string, error) { + return c.catalog, nil +} + +func (c *connectionImpl) GetCurrentDbSchema() (string, error) { + return c.dbSchema, nil +} + +func (c *connectionImpl) SetCurrentCatalog(catalog string) error { + // Use the database to execute USE CATALOG + if c.conn != nil && catalog != "" { + _, err := c.conn.ExecContext(context.Background(), "USE CATALOG `%s`", catalog) + if err != nil { + return adbc.Error{ + Code: adbc.StatusInternal, + Msg: fmt.Sprintf("failed to set catalog: %v", err), + } + } + } + c.catalog = catalog + return nil +} + +func (c *connectionImpl) SetCurrentDbSchema(schema string) error { + // Use the database to execute USE SCHEMA + if c.conn != nil && schema != "" { + _, err := c.conn.ExecContext(context.Background(), "USE SCHEMA `%s`", schema) + if err != nil { + return adbc.Error{ + Code: adbc.StatusInternal, + Msg: fmt.Sprintf("failed to set schema: %v", err), + } + } + } + c.dbSchema = schema + return nil +} + +// TableTypeLister interface implementation +func (c *connectionImpl) ListTableTypes(ctx context.Context) ([]string, error) { + // Databricks supports these table types + return []string{"TABLE", "VIEW", "EXTERNAL_TABLE", "MANAGED_TABLE", "STREAMING_TABLE", "MATERIALIZED_VIEW"}, nil +} + +// Transaction methods (Databricks has limited transaction support) +func (c *connectionImpl) Commit(ctx context.Context) error { + // Most operations are auto-committed. + return nil +} + +func (c *connectionImpl) Rollback(ctx context.Context) error { + // Databricks SQL doesn't support explicit transactions in the traditional sense. + // Most operations are auto-committed. We'll track state but not perform any operation. + return adbc.Error{ + Code: adbc.StatusNotImplemented, + Msg: fmt.Sprintf("rollback is not supported"), + } +} + +// DbObjectsEnumerator interface implementation +func (c *connectionImpl) GetCatalogs(ctx context.Context, catalogFilter *string) ([]string, error) { + query := "SHOW CATALOGS" + if catalogFilter != nil { + query += fmt.Sprintf(" LIKE '%s'", *catalogFilter) + } + + rows, err := c.conn.QueryContext(ctx, query) + if err != nil { + return nil, adbc.Error{ + Code: adbc.StatusInternal, + Msg: fmt.Sprintf("failed to query catalogs: %v", err), + } + } + defer func() { + if closeErr := rows.Close(); closeErr != nil { + err = errors.Join(err, closeErr) + } + }() Review Comment: This only works if the return values are named. Also, `Join` ignores `nil` errors so you can just write `err = errors.Join(err, rows.Close())` ########## go/adbc/driver/databricks/metadata_e2e_test.go: ########## @@ -0,0 +1,653 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package databricks_test + +import ( + "context" + "fmt" + "strings" + "testing" + "time" + + "github.com/apache/arrow-adbc/go/adbc" + "github.com/apache/arrow-adbc/go/adbc/driver/databricks" + "github.com/apache/arrow-go/v18/arrow/array" + "github.com/apache/arrow-go/v18/arrow/memory" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// TestMetadataE2E_GetInfo tests the GetInfo metadata function +func TestMetadataE2E_GetInfo(t *testing.T) { + host, token, httpPath, catalog, schema := getDatabricksConfig(t) + + driver := databricks.NewDriver(memory.DefaultAllocator) + db, err := driver.NewDatabase(map[string]string{ + databricks.OptionServerHostname: host, + databricks.OptionHTTPPath: httpPath, + databricks.OptionAccessToken: token, + databricks.OptionCatalog: catalog, + databricks.OptionSchema: schema, + }) + require.NoError(t, err) + defer func() { _ = db.Close() }() Review Comment: Use CheckedClose. (I think we've pointed this out a few times - do you mind scanning for all instances of this to update them?) ########## go/adbc/driver/databricks/connection.go: ########## @@ -0,0 +1,241 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package databricks + +import ( + "context" + "database/sql" + "errors" + "fmt" + + "github.com/apache/arrow-adbc/go/adbc" + "github.com/apache/arrow-adbc/go/adbc/driver/internal/driverbase" + _ "github.com/databricks/databricks-sql-go" +) + +type connectionImpl struct { + driverbase.ConnectionImplBase + + // Connection settings + catalog string + dbSchema string + + // Database connection + conn *sql.Conn +} + +func (c *connectionImpl) Close() error { + if c.conn == nil { + return adbc.Error{Code: adbc.StatusInvalidState} + } + defer func() { + c.conn = nil + }() + return c.conn.Close() +} + +func (c *connectionImpl) NewStatement() (adbc.Statement, error) { + return &statementImpl{ + conn: c, + }, nil +} + +func (c *connectionImpl) SetAutocommit(autocommit bool) error { + // Databricks SQL doesn't support explicit transaction control in the same way + // as traditional databases. Most operations are implicitly committed. + if !autocommit { + return adbc.Error{ + Code: adbc.StatusNotImplemented, + Msg: fmt.Sprintf("disabling autocommit is not supported"), + } + } else { + return nil + } +} + +// CurrentNamespacer interface implementation +func (c *connectionImpl) GetCurrentCatalog() (string, error) { + return c.catalog, nil +} + +func (c *connectionImpl) GetCurrentDbSchema() (string, error) { + return c.dbSchema, nil +} + +func (c *connectionImpl) SetCurrentCatalog(catalog string) error { + // Use the database to execute USE CATALOG + if c.conn != nil && catalog != "" { + _, err := c.conn.ExecContext(context.Background(), "USE CATALOG `%s`", catalog) + if err != nil { + return adbc.Error{ + Code: adbc.StatusInternal, + Msg: fmt.Sprintf("failed to set catalog: %v", err), + } + } + } + c.catalog = catalog + return nil +} + +func (c *connectionImpl) SetCurrentDbSchema(schema string) error { + // Use the database to execute USE SCHEMA + if c.conn != nil && schema != "" { + _, err := c.conn.ExecContext(context.Background(), "USE SCHEMA `%s`", schema) + if err != nil { + return adbc.Error{ + Code: adbc.StatusInternal, + Msg: fmt.Sprintf("failed to set schema: %v", err), + } + } + } + c.dbSchema = schema + return nil +} + +// TableTypeLister interface implementation +func (c *connectionImpl) ListTableTypes(ctx context.Context) ([]string, error) { + // Databricks supports these table types + return []string{"TABLE", "VIEW", "EXTERNAL_TABLE", "MANAGED_TABLE", "STREAMING_TABLE", "MATERIALIZED_VIEW"}, nil +} + +// Transaction methods (Databricks has limited transaction support) +func (c *connectionImpl) Commit(ctx context.Context) error { + // Most operations are auto-committed. + return nil +} + +func (c *connectionImpl) Rollback(ctx context.Context) error { + // Databricks SQL doesn't support explicit transactions in the traditional sense. + // Most operations are auto-committed. We'll track state but not perform any operation. + return adbc.Error{ + Code: adbc.StatusNotImplemented, + Msg: fmt.Sprintf("rollback is not supported"), + } +} + +// DbObjectsEnumerator interface implementation +func (c *connectionImpl) GetCatalogs(ctx context.Context, catalogFilter *string) ([]string, error) { + query := "SHOW CATALOGS" + if catalogFilter != nil { + query += fmt.Sprintf(" LIKE '%s'", *catalogFilter) Review Comment: Don't you need to escape quotes? ########## go/adbc/driver/databricks/connection.go: ########## @@ -0,0 +1,241 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package databricks + +import ( + "context" + "database/sql" + "errors" + "fmt" + + "github.com/apache/arrow-adbc/go/adbc" + "github.com/apache/arrow-adbc/go/adbc/driver/internal/driverbase" + _ "github.com/databricks/databricks-sql-go" +) + +type connectionImpl struct { + driverbase.ConnectionImplBase + + // Connection settings + catalog string + dbSchema string + + // Database connection + conn *sql.Conn +} + +func (c *connectionImpl) Close() error { + if c.conn == nil { + return adbc.Error{Code: adbc.StatusInvalidState} + } + defer func() { + c.conn = nil + }() + return c.conn.Close() +} + +func (c *connectionImpl) NewStatement() (adbc.Statement, error) { + return &statementImpl{ + conn: c, + }, nil +} + +func (c *connectionImpl) SetAutocommit(autocommit bool) error { + // Databricks SQL doesn't support explicit transaction control in the same way + // as traditional databases. Most operations are implicitly committed. + if !autocommit { + return adbc.Error{ + Code: adbc.StatusNotImplemented, + Msg: fmt.Sprintf("disabling autocommit is not supported"), + } + } else { + return nil + } +} + +// CurrentNamespacer interface implementation +func (c *connectionImpl) GetCurrentCatalog() (string, error) { + return c.catalog, nil +} + +func (c *connectionImpl) GetCurrentDbSchema() (string, error) { + return c.dbSchema, nil +} + +func (c *connectionImpl) SetCurrentCatalog(catalog string) error { + // Use the database to execute USE CATALOG + if c.conn != nil && catalog != "" { + _, err := c.conn.ExecContext(context.Background(), "USE CATALOG `%s`", catalog) + if err != nil { + return adbc.Error{ + Code: adbc.StatusInternal, + Msg: fmt.Sprintf("failed to set catalog: %v", err), + } + } + } + c.catalog = catalog + return nil +} + +func (c *connectionImpl) SetCurrentDbSchema(schema string) error { + // Use the database to execute USE SCHEMA + if c.conn != nil && schema != "" { + _, err := c.conn.ExecContext(context.Background(), "USE SCHEMA `%s`", schema) + if err != nil { + return adbc.Error{ + Code: adbc.StatusInternal, + Msg: fmt.Sprintf("failed to set schema: %v", err), + } + } + } + c.dbSchema = schema + return nil +} + +// TableTypeLister interface implementation +func (c *connectionImpl) ListTableTypes(ctx context.Context) ([]string, error) { + // Databricks supports these table types + return []string{"TABLE", "VIEW", "EXTERNAL_TABLE", "MANAGED_TABLE", "STREAMING_TABLE", "MATERIALIZED_VIEW"}, nil +} + +// Transaction methods (Databricks has limited transaction support) +func (c *connectionImpl) Commit(ctx context.Context) error { + // Most operations are auto-committed. + return nil +} + +func (c *connectionImpl) Rollback(ctx context.Context) error { + // Databricks SQL doesn't support explicit transactions in the traditional sense. + // Most operations are auto-committed. We'll track state but not perform any operation. + return adbc.Error{ + Code: adbc.StatusNotImplemented, + Msg: fmt.Sprintf("rollback is not supported"), + } +} + +// DbObjectsEnumerator interface implementation +func (c *connectionImpl) GetCatalogs(ctx context.Context, catalogFilter *string) ([]string, error) { + query := "SHOW CATALOGS" + if catalogFilter != nil { + query += fmt.Sprintf(" LIKE '%s'", *catalogFilter) + } + + rows, err := c.conn.QueryContext(ctx, query) + if err != nil { + return nil, adbc.Error{ + Code: adbc.StatusInternal, + Msg: fmt.Sprintf("failed to query catalogs: %v", err), + } + } + defer func() { + if closeErr := rows.Close(); closeErr != nil { + err = errors.Join(err, closeErr) + } + }() Review Comment: (This and the above comment apply below, too.) ########## go/adbc/driver/databricks/connection.go: ########## @@ -0,0 +1,241 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package databricks + +import ( + "context" + "database/sql" + "errors" + "fmt" + + "github.com/apache/arrow-adbc/go/adbc" + "github.com/apache/arrow-adbc/go/adbc/driver/internal/driverbase" + _ "github.com/databricks/databricks-sql-go" +) + +type connectionImpl struct { + driverbase.ConnectionImplBase + + // Connection settings + catalog string + dbSchema string + + // Database connection + conn *sql.Conn +} + +func (c *connectionImpl) Close() error { + if c.conn == nil { + return adbc.Error{Code: adbc.StatusInvalidState} + } + defer func() { + c.conn = nil + }() + return c.conn.Close() +} + +func (c *connectionImpl) NewStatement() (adbc.Statement, error) { + return &statementImpl{ + conn: c, + }, nil +} + +func (c *connectionImpl) SetAutocommit(autocommit bool) error { + // Databricks SQL doesn't support explicit transaction control in the same way + // as traditional databases. Most operations are implicitly committed. + if !autocommit { + return adbc.Error{ + Code: adbc.StatusNotImplemented, + Msg: fmt.Sprintf("disabling autocommit is not supported"), + } + } else { + return nil + } +} + +// CurrentNamespacer interface implementation +func (c *connectionImpl) GetCurrentCatalog() (string, error) { + return c.catalog, nil +} + +func (c *connectionImpl) GetCurrentDbSchema() (string, error) { + return c.dbSchema, nil +} + +func (c *connectionImpl) SetCurrentCatalog(catalog string) error { + // Use the database to execute USE CATALOG + if c.conn != nil && catalog != "" { + _, err := c.conn.ExecContext(context.Background(), "USE CATALOG `%s`", catalog) + if err != nil { + return adbc.Error{ + Code: adbc.StatusInternal, + Msg: fmt.Sprintf("failed to set catalog: %v", err), + } + } + } + c.catalog = catalog + return nil +} + +func (c *connectionImpl) SetCurrentDbSchema(schema string) error { + // Use the database to execute USE SCHEMA + if c.conn != nil && schema != "" { + _, err := c.conn.ExecContext(context.Background(), "USE SCHEMA `%s`", schema) + if err != nil { + return adbc.Error{ + Code: adbc.StatusInternal, + Msg: fmt.Sprintf("failed to set schema: %v", err), + } + } + } + c.dbSchema = schema + return nil +} + +// TableTypeLister interface implementation +func (c *connectionImpl) ListTableTypes(ctx context.Context) ([]string, error) { + // Databricks supports these table types + return []string{"TABLE", "VIEW", "EXTERNAL_TABLE", "MANAGED_TABLE", "STREAMING_TABLE", "MATERIALIZED_VIEW"}, nil +} + +// Transaction methods (Databricks has limited transaction support) +func (c *connectionImpl) Commit(ctx context.Context) error { + // Most operations are auto-committed. + return nil +} + +func (c *connectionImpl) Rollback(ctx context.Context) error { + // Databricks SQL doesn't support explicit transactions in the traditional sense. + // Most operations are auto-committed. We'll track state but not perform any operation. + return adbc.Error{ + Code: adbc.StatusNotImplemented, + Msg: fmt.Sprintf("rollback is not supported"), + } +} + +// DbObjectsEnumerator interface implementation +func (c *connectionImpl) GetCatalogs(ctx context.Context, catalogFilter *string) ([]string, error) { + query := "SHOW CATALOGS" + if catalogFilter != nil { + query += fmt.Sprintf(" LIKE '%s'", *catalogFilter) + } + + rows, err := c.conn.QueryContext(ctx, query) + if err != nil { + return nil, adbc.Error{ + Code: adbc.StatusInternal, + Msg: fmt.Sprintf("failed to query catalogs: %v", err), + } + } + defer func() { + if closeErr := rows.Close(); closeErr != nil { + err = errors.Join(err, closeErr) + } + }() + + var catalogs []string + for rows.Next() { + var catalog string + if err := rows.Scan(&catalog); err != nil { + return nil, adbc.Error{ + Code: adbc.StatusInternal, + Msg: fmt.Sprintf("failed to scan catalog: %v", err), + } + } + catalogs = append(catalogs, catalog) + } + + return catalogs, rows.Err() +} + +func (c *connectionImpl) GetDBSchemasForCatalog(ctx context.Context, catalog string, schemaFilter *string) ([]string, error) { + query := fmt.Sprintf("SHOW SCHEMAS IN `%s`", catalog) Review Comment: The catalog needs escaping too. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org