This is an automated email from the ASF dual-hosted git repository.
lidavidm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-adbc.git
The following commit(s) were added to refs/heads/main by this push:
new 48eb1dd7 feat(go): add basic driver logging (#1048)
48eb1dd7 is described below
commit 48eb1dd703a6e62846f3ec7a92f731c8b2f5e244
Author: David Li <[email protected]>
AuthorDate: Tue Sep 12 09:36:27 2023 -0400
feat(go): add basic driver logging (#1048)
Fixes #492.
---
go/adbc/driver/flightsql/flightsql_adbc.go | 15 +++++
go/adbc/driver/flightsql/logging.go | 103 +++++++++++++++++++++++++++++
go/adbc/ext.go | 30 +++++++++
go/adbc/go.mod | 2 +-
go/adbc/pkg/_tmpl/driver.go.tmpl | 54 +++++++++++++--
go/adbc/pkg/flightsql/driver.go | 53 +++++++++++++--
go/adbc/pkg/gen/main.go | 11 ++-
go/adbc/pkg/panicdummy/driver.go | 53 +++++++++++++--
go/adbc/pkg/snowflake/driver.go | 53 +++++++++++++--
9 files changed, 350 insertions(+), 24 deletions(-)
diff --git a/go/adbc/driver/flightsql/flightsql_adbc.go
b/go/adbc/driver/flightsql/flightsql_adbc.go
index 70ba5018..1dea6eb8 100644
--- a/go/adbc/driver/flightsql/flightsql_adbc.go
+++ b/go/adbc/driver/flightsql/flightsql_adbc.go
@@ -57,6 +57,7 @@ import (
"github.com/apache/arrow/go/v13/arrow/memory"
"github.com/bluele/gcache"
"golang.org/x/exp/maps"
+ "golang.org/x/exp/slog"
"google.golang.org/grpc"
grpccodes "google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
@@ -156,6 +157,7 @@ func (d Driver) NewDatabase(opts map[string]string)
(adbc.Database, error) {
db.dialOpts.block = false
db.dialOpts.maxMsgSize = 16 * 1024 * 1024
+ db.logger = nilLogger()
db.options = make(map[string]string)
return db, db.SetOptions(opts)
@@ -186,11 +188,20 @@ type database struct {
timeout timeoutOption
dialOpts dbDialOpts
enableCookies bool
+ logger *slog.Logger
options map[string]string
alloc memory.Allocator
}
+// SetLogger installs the logger this database uses for driver logging.
+// Passing nil resets the database to the logger returned by nilLogger.
+func (d *database) SetLogger(logger *slog.Logger) {
+	if logger == nil {
+		logger = nilLogger()
+	}
+	d.logger = logger
+}
+
func (d *database) SetOptions(cnOptions map[string]string) error {
var tlsConfig tls.Config
@@ -691,6 +702,10 @@ func (b *bearerAuthMiddleware) HeadersReceived(ctx
context.Context, md metadata.
func getFlightClient(ctx context.Context, loc string, d *database)
(*flightsql.Client, error) {
authMiddle := &bearerAuthMiddleware{hdrs: d.hdrs.Copy()}
middleware := []flight.ClientMiddleware{
+ {
+ Unary: makeUnaryLoggingInterceptor(d.logger),
+ Stream: makeStreamLoggingInterceptor(d.logger),
+ },
flight.CreateClientMiddleware(authMiddle),
{
Unary: unaryTimeoutInterceptor,
diff --git a/go/adbc/driver/flightsql/logging.go
b/go/adbc/driver/flightsql/logging.go
new file mode 100644
index 00000000..3d0f51c8
--- /dev/null
+++ b/go/adbc/driver/flightsql/logging.go
@@ -0,0 +1,103 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package flightsql
+
+import (
+ "context"
+ "io"
+ "os"
+ "time"
+
+ "golang.org/x/exp/maps"
+ "golang.org/x/exp/slices"
+ "golang.org/x/exp/slog"
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/metadata"
+)
+
+// nilLogger builds the logger used when the application has not supplied one:
+// a JSON handler writing to os.Stderr with a minimum level of slog.LevelError
+// and source locations disabled.
+func nilLogger() *slog.Logger {
+	opts := slog.HandlerOptions{
+		AddSource: false,
+		Level:     slog.LevelError,
+	}
+	return slog.New(slog.NewJSONHandler(os.Stderr, &opts))
+}
+
+// makeUnaryLoggingInterceptor returns a gRPC unary client interceptor that
+// writes one log record through logger after each call completes.
+//
+// The record uses the RPC method name as its message and carries the
+// connection target, the elapsed time, the error returned by the call, and
+// the outgoing metadata. When logger is enabled at debug level the record is
+// written at debug level with the full metadata; otherwise it is written at
+// info level with only the sorted metadata keys.
+func makeUnaryLoggingInterceptor(logger *slog.Logger) grpc.UnaryClientInterceptor {
+	return func(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
+		began := time.Now()
+		// The second result is deliberately ignored; md may be nil.
+		md, _ := metadata.FromOutgoingContext(ctx)
+		callErr := invoker(ctx, method, req, reply, cc, opts...)
+
+		if !logger.Enabled(ctx, slog.LevelDebug) {
+			// Below debug level only the metadata keys are logged, never the values.
+			names := maps.Keys(md)
+			slices.Sort(names)
+			logger.InfoContext(ctx, method, "target", cc.Target(), "duration", time.Since(began), "err", callErr, "metadata", names)
+			return callErr
+		}
+
+		logger.DebugContext(ctx, method, "target", cc.Target(), "duration", time.Since(began), "err", callErr, "metadata", md)
+		return callErr
+	}
+}
+
+// makeStreamLoggingInterceptor returns a gRPC stream client interceptor that
+// logs streaming calls through logger.
+//
+// If the stream cannot be opened, a record with the target, elapsed time and
+// error is written at info level and the streamer's results are returned
+// unchanged. Otherwise the stream is wrapped in a loggedStream, which writes
+// the record from its RecvMsg method.
+func makeStreamLoggingInterceptor(logger *slog.Logger) grpc.StreamClientInterceptor {
+	return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
+		began := time.Now()
+		// The second result is deliberately ignored; md may be nil.
+		md, _ := metadata.FromOutgoingContext(ctx)
+
+		inner, openErr := streamer(ctx, desc, cc, method, opts...)
+		if openErr != nil {
+			logger.InfoContext(ctx, method, "target", cc.Target(), "duration", time.Since(began), "err", openErr)
+			return inner, openErr
+		}
+
+		wrapped := &loggedStream{
+			ClientStream: inner,
+			logger:       logger,
+			ctx:          ctx,
+			method:       method,
+			start:        began,
+			target:       cc.Target(),
+			outgoing:     md,
+		}
+		return wrapped, nil
+	}
+}
+
+// loggedStream wraps a grpc.ClientStream and carries the values RecvMsg needs
+// to write a log record for the stream.
+type loggedStream struct {
+ // ClientStream is the wrapped stream.
+ grpc.ClientStream
+
+ // logger receives the record written by RecvMsg.
+ logger *slog.Logger
+ // ctx is the context passed to the logger calls made by RecvMsg.
+ ctx context.Context
+ // method is used as the log message.
+ method string
+ // start is the reference time for the logged "duration".
+ start time.Time
+ // target is logged under the "target" key.
+ target string
+ // outgoing is logged under the "metadata" key: in full at debug level,
+ // as sorted keys only otherwise.
+ outgoing metadata.MD
+}
+
+func (stream *loggedStream) RecvMsg(m any) error {
+ err := stream.ClientStream.RecvMsg(m)
+ if err != nil {
+ loggedErr := err
+ if loggedErr == io.EOF {
+ loggedErr = nil
+ }
+
+ if stream.logger.Enabled(stream.ctx, slog.LevelDebug) {
+ stream.logger.DebugContext(stream.ctx, stream.method,
"target", stream.target, "duration", time.Since(stream.start), "err",
loggedErr, "metadata", stream.outgoing)
+ } else {
+ keys := maps.Keys(stream.outgoing)
+ slices.Sort(keys)
+ stream.logger.InfoContext(stream.ctx, stream.method,
"target", stream.target, "duration", time.Since(stream.start), "err",
loggedErr, "metadata", keys)
+ }
+ }
+ return err
+}
diff --git a/go/adbc/ext.go b/go/adbc/ext.go
new file mode 100644
index 00000000..dcfaeaeb
--- /dev/null
+++ b/go/adbc/ext.go
@@ -0,0 +1,30 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package adbc
+
+import (
+ "golang.org/x/exp/slog"
+)
+
+// DatabaseLogging is a Database that also supports logging information to an
+// application-supplied log sink.
+//
+// EXPERIMENTAL. Not formally part of the ADBC APIs.
+type DatabaseLogging interface {
+ // SetLogger supplies the logger the database should write its log
+ // records to.
+ SetLogger(*slog.Logger)
+}
diff --git a/go/adbc/go.mod b/go/adbc/go.mod
index b1bb5234..b3d0ecd5 100644
--- a/go/adbc/go.mod
+++ b/go/adbc/go.mod
@@ -17,7 +17,7 @@
module github.com/apache/arrow-adbc/go/adbc
-go 1.18
+go 1.19
require (
github.com/apache/arrow/go/v13 v13.0.0
diff --git a/go/adbc/pkg/_tmpl/driver.go.tmpl b/go/adbc/pkg/_tmpl/driver.go.tmpl
index 24c15f39..1ffa1f85 100644
--- a/go/adbc/pkg/_tmpl/driver.go.tmpl
+++ b/go/adbc/pkg/_tmpl/driver.go.tmpl
@@ -53,6 +53,7 @@ import (
"os"
"runtime"
"runtime/cgo"
+ "strings"
"sync/atomic"
"unsafe"
@@ -61,6 +62,7 @@ import (
"github.com/apache/arrow/go/v13/arrow/cdata"
"github.com/apache/arrow/go/v13/arrow/memory"
"github.com/apache/arrow/go/v13/arrow/memory/mallocator"
+ "golang.org/x/exp/slog"
)
// Must use malloc() to respect CGO rules
@@ -71,6 +73,7 @@ var drv = {{.Driver}}{Alloc: mallocator.NewMallocator()}
var globalPoison int32 = 0
const errPrefix = "[{{.Prefix}}] "
+const logLevelEnvVar = "ADBC_DRIVER_{{.PrefixUpper}}_LOG_LEVEL"
func setErr(err *C.struct_AdbcError, format string, vals ...interface{}) {
if err == nil {
@@ -162,6 +165,45 @@ func poison(err *C.struct_AdbcError, fname string, e
interface{}) C.AdbcStatusCo
return C.ADBC_STATUS_INTERNAL
}
+// initLoggingFromEnv configures driver logging for db from the environment.
+//
+// The level is read, case-insensitively, from the variable named by
+// logLevelEnvVar. Accepted values are "debug", "info", "warn" (or "warning")
+// and "error". An unset or empty variable leaves db untouched; any other
+// value prints a usage hint via printLoggingHelp and leaves db untouched.
+//
+// For an accepted level, a JSON logger writing to os.Stderr is built and
+// handed to db if it implements adbc.DatabaseLogging; otherwise the missing
+// support is reported through that logger at error level.
+func initLoggingFromEnv(db adbc.Database) {
+	logLevel := slog.LevelError
+	switch strings.ToLower(os.Getenv(logLevelEnvVar)) {
+	case "debug":
+		logLevel = slog.LevelDebug
+	case "info":
+		logLevel = slog.LevelInfo
+	case "warn", "warning":
+		// Go switch cases never fall through implicitly, so both spellings
+		// must share one case for "warn" to select the warn level.
+		logLevel = slog.LevelWarn
+	case "error":
+		logLevel = slog.LevelError
+	case "":
+		return
+	default:
+		printLoggingHelp()
+		return
+	}
+
+	h := slog.NewJSONHandler(os.Stderr, &slog.HandlerOptions{
+		AddSource: false,
+		Level:     logLevel,
+	})
+	logger := slog.New(h)
+
+	ext, ok := db.(adbc.DatabaseLogging)
+	if !ok {
+		logger.Error("{{.Prefix}} does not support logging")
+		return
+	}
+	ext.SetLogger(logger)
+}
+
+func printLoggingHelp() {
+ fmt.Fprintf(os.Stderr, "{{.Prefix}}: to enable logging, set %s to
'debug', 'info', 'warn', or 'error'", logLevelEnvVar)
+}
+
+
// Allocate a new cgo.Handle and store its address in a heap-allocated
// uintptr_t. Experimentally, this was found to be necessary, else
// something (the Go runtime?) would corrupt (garbage-collect?) the
@@ -305,7 +347,7 @@ func (cStream *cArrayStream) maybeError() C.int {
//export {{.Prefix}}ArrayStreamGetLastError
func {{.Prefix}}ArrayStreamGetLastError(stream *C.struct_ArrowArrayStream)
*C.cchar_t {
- if stream == nil || stream.release !=
(*[0]byte)(C.{{.Prefix}}ArrayStreamRelease) {
+ if stream == nil || stream.release !=
(*[0]byte)(C.{{.Prefix}}ArrayStreamRelease) || stream.private_data == nil {
return nil
}
cStream := getFromHandle[cArrayStream](stream.private_data)
@@ -317,7 +359,7 @@ func {{.Prefix}}ArrayStreamGetLastError(stream
*C.struct_ArrowArrayStream) *C.cc
//export {{.Prefix}}ArrayStreamGetNext
func {{.Prefix}}ArrayStreamGetNext(stream *C.struct_ArrowArrayStream, array
*C.struct_ArrowArray) C.int {
- if stream == nil || stream.release !=
(*[0]byte)(C.{{.Prefix}}ArrayStreamRelease) {
+ if stream == nil || stream.release !=
(*[0]byte)(C.{{.Prefix}}ArrayStreamRelease) || stream.private_data == nil {
return C.EINVAL
}
cStream := getFromHandle[cArrayStream](stream.private_data)
@@ -332,7 +374,7 @@ func {{.Prefix}}ArrayStreamGetNext(stream
*C.struct_ArrowArrayStream, array *C.s
//export {{.Prefix}}ArrayStreamGetSchema
func {{.Prefix}}ArrayStreamGetSchema(stream *C.struct_ArrowArrayStream, schema
*C.struct_ArrowSchema) C.int {
- if stream == nil || stream.release !=
(*[0]byte)(C.{{.Prefix}}ArrayStreamRelease) {
+ if stream == nil || stream.release !=
(*[0]byte)(C.{{.Prefix}}ArrayStreamRelease) || stream.private_data == nil {
return C.EINVAL
}
cStream := getFromHandle[cArrayStream](stream.private_data)
@@ -346,7 +388,7 @@ func {{.Prefix}}ArrayStreamGetSchema(stream
*C.struct_ArrowArrayStream, schema *
//export {{.Prefix}}ArrayStreamRelease
func {{.Prefix}}ArrayStreamRelease(stream *C.struct_ArrowArrayStream) {
- if stream == nil || stream.release !=
(*[0]byte)(C.{{.Prefix}}ArrayStreamRelease) {
+ if stream == nil || stream.release !=
(*[0]byte)(C.{{.Prefix}}ArrayStreamRelease) || stream.private_data == nil {
return
}
h := (*(*cgo.Handle)(stream.private_data))
@@ -365,7 +407,7 @@ func {{.Prefix}}ArrayStreamRelease(stream
*C.struct_ArrowArrayStream) {
//export {{.Prefix}}ErrorFromArrayStream
func {{.Prefix}}ErrorFromArrayStream(stream *C.struct_ArrowArrayStream, status
*C.AdbcStatusCode) (*C.struct_AdbcError) {
- if stream == nil || stream.release !=
(*[0]byte)(C.{{.Prefix}}ArrayStreamRelease) {
+ if stream == nil || stream.release !=
(*[0]byte)(C.{{.Prefix}}ArrayStreamRelease) || stream.private_data == nil {
return nil
}
cStream := getFromHandle[cArrayStream](stream.private_data)
@@ -509,6 +551,8 @@ func {{.Prefix}}DatabaseInit(db *C.struct_AdbcDatabase, err
*C.struct_AdbcError)
return C.AdbcStatusCode(errToAdbcErr(err, aerr))
}
+ initLoggingFromEnv(adb)
+
cdb.db = adb
return C.ADBC_STATUS_OK
}
diff --git a/go/adbc/pkg/flightsql/driver.go b/go/adbc/pkg/flightsql/driver.go
index 46e09695..270ffaaf 100644
--- a/go/adbc/pkg/flightsql/driver.go
+++ b/go/adbc/pkg/flightsql/driver.go
@@ -55,6 +55,7 @@ import (
"os"
"runtime"
"runtime/cgo"
+ "strings"
"sync/atomic"
"unsafe"
@@ -64,6 +65,7 @@ import (
"github.com/apache/arrow/go/v13/arrow/cdata"
"github.com/apache/arrow/go/v13/arrow/memory"
"github.com/apache/arrow/go/v13/arrow/memory/mallocator"
+ "golang.org/x/exp/slog"
)
// Must use malloc() to respect CGO rules
@@ -75,6 +77,7 @@ var drv = flightsql.Driver{Alloc: mallocator.NewMallocator()}
var globalPoison int32 = 0
const errPrefix = "[FlightSQL] "
+const logLevelEnvVar = "ADBC_DRIVER_FLIGHTSQL_LOG_LEVEL"
func setErr(err *C.struct_AdbcError, format string, vals ...interface{}) {
if err == nil {
@@ -166,6 +169,44 @@ func poison(err *C.struct_AdbcError, fname string, e
interface{}) C.AdbcStatusCo
return C.ADBC_STATUS_INTERNAL
}
+// initLoggingFromEnv configures driver logging for db from the environment.
+//
+// The level is read, case-insensitively, from the variable named by
+// logLevelEnvVar. Accepted values are "debug", "info", "warn" (or "warning")
+// and "error". An unset or empty variable leaves db untouched; any other
+// value prints a usage hint via printLoggingHelp and leaves db untouched.
+//
+// For an accepted level, a JSON logger writing to os.Stderr is built and
+// handed to db if it implements adbc.DatabaseLogging; otherwise the missing
+// support is reported through that logger at error level.
+func initLoggingFromEnv(db adbc.Database) {
+	logLevel := slog.LevelError
+	switch strings.ToLower(os.Getenv(logLevelEnvVar)) {
+	case "debug":
+		logLevel = slog.LevelDebug
+	case "info":
+		logLevel = slog.LevelInfo
+	case "warn", "warning":
+		// Go switch cases never fall through implicitly, so both spellings
+		// must share one case for "warn" to select the warn level.
+		logLevel = slog.LevelWarn
+	case "error":
+		logLevel = slog.LevelError
+	case "":
+		return
+	default:
+		printLoggingHelp()
+		return
+	}
+
+	h := slog.NewJSONHandler(os.Stderr, &slog.HandlerOptions{
+		AddSource: false,
+		Level:     logLevel,
+	})
+	logger := slog.New(h)
+
+	ext, ok := db.(adbc.DatabaseLogging)
+	if !ok {
+		logger.Error("FlightSQL does not support logging")
+		return
+	}
+	ext.SetLogger(logger)
+}
+
+func printLoggingHelp() {
+ fmt.Fprintf(os.Stderr, "FlightSQL: to enable logging, set %s to
'debug', 'info', 'warn', or 'error'", logLevelEnvVar)
+}
+
// Allocate a new cgo.Handle and store its address in a heap-allocated
// uintptr_t. Experimentally, this was found to be necessary, else
// something (the Go runtime?) would corrupt (garbage-collect?) the
@@ -309,7 +350,7 @@ func (cStream *cArrayStream) maybeError() C.int {
//export FlightSQLArrayStreamGetLastError
func FlightSQLArrayStreamGetLastError(stream *C.struct_ArrowArrayStream)
*C.cchar_t {
- if stream == nil || stream.release !=
(*[0]byte)(C.FlightSQLArrayStreamRelease) {
+ if stream == nil || stream.release !=
(*[0]byte)(C.FlightSQLArrayStreamRelease) || stream.private_data == nil {
return nil
}
cStream := getFromHandle[cArrayStream](stream.private_data)
@@ -321,7 +362,7 @@ func FlightSQLArrayStreamGetLastError(stream
*C.struct_ArrowArrayStream) *C.ccha
//export FlightSQLArrayStreamGetNext
func FlightSQLArrayStreamGetNext(stream *C.struct_ArrowArrayStream, array
*C.struct_ArrowArray) C.int {
- if stream == nil || stream.release !=
(*[0]byte)(C.FlightSQLArrayStreamRelease) {
+ if stream == nil || stream.release !=
(*[0]byte)(C.FlightSQLArrayStreamRelease) || stream.private_data == nil {
return C.EINVAL
}
cStream := getFromHandle[cArrayStream](stream.private_data)
@@ -336,7 +377,7 @@ func FlightSQLArrayStreamGetNext(stream
*C.struct_ArrowArrayStream, array *C.str
//export FlightSQLArrayStreamGetSchema
func FlightSQLArrayStreamGetSchema(stream *C.struct_ArrowArrayStream, schema
*C.struct_ArrowSchema) C.int {
- if stream == nil || stream.release !=
(*[0]byte)(C.FlightSQLArrayStreamRelease) {
+ if stream == nil || stream.release !=
(*[0]byte)(C.FlightSQLArrayStreamRelease) || stream.private_data == nil {
return C.EINVAL
}
cStream := getFromHandle[cArrayStream](stream.private_data)
@@ -350,7 +391,7 @@ func FlightSQLArrayStreamGetSchema(stream
*C.struct_ArrowArrayStream, schema *C.
//export FlightSQLArrayStreamRelease
func FlightSQLArrayStreamRelease(stream *C.struct_ArrowArrayStream) {
- if stream == nil || stream.release !=
(*[0]byte)(C.FlightSQLArrayStreamRelease) {
+ if stream == nil || stream.release !=
(*[0]byte)(C.FlightSQLArrayStreamRelease) || stream.private_data == nil {
return
}
h := (*(*cgo.Handle)(stream.private_data))
@@ -369,7 +410,7 @@ func FlightSQLArrayStreamRelease(stream
*C.struct_ArrowArrayStream) {
//export FlightSQLErrorFromArrayStream
func FlightSQLErrorFromArrayStream(stream *C.struct_ArrowArrayStream, status
*C.AdbcStatusCode) *C.struct_AdbcError {
- if stream == nil || stream.release !=
(*[0]byte)(C.FlightSQLArrayStreamRelease) {
+ if stream == nil || stream.release !=
(*[0]byte)(C.FlightSQLArrayStreamRelease) || stream.private_data == nil {
return nil
}
cStream := getFromHandle[cArrayStream](stream.private_data)
@@ -513,6 +554,8 @@ func FlightSQLDatabaseInit(db *C.struct_AdbcDatabase, err
*C.struct_AdbcError) (
return C.AdbcStatusCode(errToAdbcErr(err, aerr))
}
+ initLoggingFromEnv(adb)
+
cdb.db = adb
return C.ADBC_STATUS_OK
}
diff --git a/go/adbc/pkg/gen/main.go b/go/adbc/pkg/gen/main.go
index 24ae7d25..16f0001f 100644
--- a/go/adbc/pkg/gen/main.go
+++ b/go/adbc/pkg/gen/main.go
@@ -75,8 +75,9 @@ func (p *pathSpec) IsGoFile() bool { return
filepath.Ext(p.out) == ".go" }
func (p *pathSpec) IsCFile() bool { return filepath.Ext(p.out) == ".c" ||
filepath.Ext(p.out) == ".h" }
type tmplData struct {
- Driver string
- Prefix string
+ Driver string
+ Prefix string
+ PrefixUpper string
}
var fileList = []string{
@@ -125,7 +126,11 @@ func main() {
out: filepath.Join(*outDir, strings.TrimSuffix(f, Ext))}
}
- process(tmplData{Driver: pkg[0].Name + "." + *driverType, Prefix:
*prefix}, specs)
+ process(tmplData{
+ Driver: pkg[0].Name + "." + *driverType,
+ Prefix: *prefix,
+ PrefixUpper: strings.ToUpper(*prefix),
+ }, specs)
}
func mustReadAll(path string) []byte {
diff --git a/go/adbc/pkg/panicdummy/driver.go b/go/adbc/pkg/panicdummy/driver.go
index c99153cc..1e2279e0 100644
--- a/go/adbc/pkg/panicdummy/driver.go
+++ b/go/adbc/pkg/panicdummy/driver.go
@@ -55,6 +55,7 @@ import (
"os"
"runtime"
"runtime/cgo"
+ "strings"
"sync/atomic"
"unsafe"
@@ -64,6 +65,7 @@ import (
"github.com/apache/arrow/go/v13/arrow/cdata"
"github.com/apache/arrow/go/v13/arrow/memory"
"github.com/apache/arrow/go/v13/arrow/memory/mallocator"
+ "golang.org/x/exp/slog"
)
// Must use malloc() to respect CGO rules
@@ -75,6 +77,7 @@ var drv = panicdummy.Driver{Alloc: mallocator.NewMallocator()}
var globalPoison int32 = 0
const errPrefix = "[PanicDummy] "
+const logLevelEnvVar = "ADBC_DRIVER_PANICDUMMY_LOG_LEVEL"
func setErr(err *C.struct_AdbcError, format string, vals ...interface{}) {
if err == nil {
@@ -166,6 +169,44 @@ func poison(err *C.struct_AdbcError, fname string, e
interface{}) C.AdbcStatusCo
return C.ADBC_STATUS_INTERNAL
}
+// initLoggingFromEnv configures driver logging for db from the environment.
+//
+// The level is read, case-insensitively, from the variable named by
+// logLevelEnvVar. Accepted values are "debug", "info", "warn" (or "warning")
+// and "error". An unset or empty variable leaves db untouched; any other
+// value prints a usage hint via printLoggingHelp and leaves db untouched.
+//
+// For an accepted level, a JSON logger writing to os.Stderr is built and
+// handed to db if it implements adbc.DatabaseLogging; otherwise the missing
+// support is reported through that logger at error level.
+func initLoggingFromEnv(db adbc.Database) {
+	logLevel := slog.LevelError
+	switch strings.ToLower(os.Getenv(logLevelEnvVar)) {
+	case "debug":
+		logLevel = slog.LevelDebug
+	case "info":
+		logLevel = slog.LevelInfo
+	case "warn", "warning":
+		// Go switch cases never fall through implicitly, so both spellings
+		// must share one case for "warn" to select the warn level.
+		logLevel = slog.LevelWarn
+	case "error":
+		logLevel = slog.LevelError
+	case "":
+		return
+	default:
+		printLoggingHelp()
+		return
+	}
+
+	h := slog.NewJSONHandler(os.Stderr, &slog.HandlerOptions{
+		AddSource: false,
+		Level:     logLevel,
+	})
+	logger := slog.New(h)
+
+	ext, ok := db.(adbc.DatabaseLogging)
+	if !ok {
+		logger.Error("PanicDummy does not support logging")
+		return
+	}
+	ext.SetLogger(logger)
+}
+
+func printLoggingHelp() {
+ fmt.Fprintf(os.Stderr, "PanicDummy: to enable logging, set %s to
'debug', 'info', 'warn', or 'error'", logLevelEnvVar)
+}
+
// Allocate a new cgo.Handle and store its address in a heap-allocated
// uintptr_t. Experimentally, this was found to be necessary, else
// something (the Go runtime?) would corrupt (garbage-collect?) the
@@ -309,7 +350,7 @@ func (cStream *cArrayStream) maybeError() C.int {
//export PanicDummyArrayStreamGetLastError
func PanicDummyArrayStreamGetLastError(stream *C.struct_ArrowArrayStream)
*C.cchar_t {
- if stream == nil || stream.release !=
(*[0]byte)(C.PanicDummyArrayStreamRelease) {
+ if stream == nil || stream.release !=
(*[0]byte)(C.PanicDummyArrayStreamRelease) || stream.private_data == nil {
return nil
}
cStream := getFromHandle[cArrayStream](stream.private_data)
@@ -321,7 +362,7 @@ func PanicDummyArrayStreamGetLastError(stream
*C.struct_ArrowArrayStream) *C.cch
//export PanicDummyArrayStreamGetNext
func PanicDummyArrayStreamGetNext(stream *C.struct_ArrowArrayStream, array
*C.struct_ArrowArray) C.int {
- if stream == nil || stream.release !=
(*[0]byte)(C.PanicDummyArrayStreamRelease) {
+ if stream == nil || stream.release !=
(*[0]byte)(C.PanicDummyArrayStreamRelease) || stream.private_data == nil {
return C.EINVAL
}
cStream := getFromHandle[cArrayStream](stream.private_data)
@@ -336,7 +377,7 @@ func PanicDummyArrayStreamGetNext(stream
*C.struct_ArrowArrayStream, array *C.st
//export PanicDummyArrayStreamGetSchema
func PanicDummyArrayStreamGetSchema(stream *C.struct_ArrowArrayStream, schema
*C.struct_ArrowSchema) C.int {
- if stream == nil || stream.release !=
(*[0]byte)(C.PanicDummyArrayStreamRelease) {
+ if stream == nil || stream.release !=
(*[0]byte)(C.PanicDummyArrayStreamRelease) || stream.private_data == nil {
return C.EINVAL
}
cStream := getFromHandle[cArrayStream](stream.private_data)
@@ -350,7 +391,7 @@ func PanicDummyArrayStreamGetSchema(stream
*C.struct_ArrowArrayStream, schema *C
//export PanicDummyArrayStreamRelease
func PanicDummyArrayStreamRelease(stream *C.struct_ArrowArrayStream) {
- if stream == nil || stream.release !=
(*[0]byte)(C.PanicDummyArrayStreamRelease) {
+ if stream == nil || stream.release !=
(*[0]byte)(C.PanicDummyArrayStreamRelease) || stream.private_data == nil {
return
}
h := (*(*cgo.Handle)(stream.private_data))
@@ -369,7 +410,7 @@ func PanicDummyArrayStreamRelease(stream
*C.struct_ArrowArrayStream) {
//export PanicDummyErrorFromArrayStream
func PanicDummyErrorFromArrayStream(stream *C.struct_ArrowArrayStream, status
*C.AdbcStatusCode) *C.struct_AdbcError {
- if stream == nil || stream.release !=
(*[0]byte)(C.PanicDummyArrayStreamRelease) {
+ if stream == nil || stream.release !=
(*[0]byte)(C.PanicDummyArrayStreamRelease) || stream.private_data == nil {
return nil
}
cStream := getFromHandle[cArrayStream](stream.private_data)
@@ -513,6 +554,8 @@ func PanicDummyDatabaseInit(db *C.struct_AdbcDatabase, err
*C.struct_AdbcError)
return C.AdbcStatusCode(errToAdbcErr(err, aerr))
}
+ initLoggingFromEnv(adb)
+
cdb.db = adb
return C.ADBC_STATUS_OK
}
diff --git a/go/adbc/pkg/snowflake/driver.go b/go/adbc/pkg/snowflake/driver.go
index 4804e32e..e468924d 100644
--- a/go/adbc/pkg/snowflake/driver.go
+++ b/go/adbc/pkg/snowflake/driver.go
@@ -55,6 +55,7 @@ import (
"os"
"runtime"
"runtime/cgo"
+ "strings"
"sync/atomic"
"unsafe"
@@ -64,6 +65,7 @@ import (
"github.com/apache/arrow/go/v13/arrow/cdata"
"github.com/apache/arrow/go/v13/arrow/memory"
"github.com/apache/arrow/go/v13/arrow/memory/mallocator"
+ "golang.org/x/exp/slog"
)
// Must use malloc() to respect CGO rules
@@ -75,6 +77,7 @@ var drv = snowflake.Driver{Alloc: mallocator.NewMallocator()}
var globalPoison int32 = 0
const errPrefix = "[Snowflake] "
+const logLevelEnvVar = "ADBC_DRIVER_SNOWFLAKE_LOG_LEVEL"
func setErr(err *C.struct_AdbcError, format string, vals ...interface{}) {
if err == nil {
@@ -166,6 +169,44 @@ func poison(err *C.struct_AdbcError, fname string, e
interface{}) C.AdbcStatusCo
return C.ADBC_STATUS_INTERNAL
}
+// initLoggingFromEnv configures driver logging for db from the environment.
+//
+// The level is read, case-insensitively, from the variable named by
+// logLevelEnvVar. Accepted values are "debug", "info", "warn" (or "warning")
+// and "error". An unset or empty variable leaves db untouched; any other
+// value prints a usage hint via printLoggingHelp and leaves db untouched.
+//
+// For an accepted level, a JSON logger writing to os.Stderr is built and
+// handed to db if it implements adbc.DatabaseLogging; otherwise the missing
+// support is reported through that logger at error level.
+func initLoggingFromEnv(db adbc.Database) {
+	logLevel := slog.LevelError
+	switch strings.ToLower(os.Getenv(logLevelEnvVar)) {
+	case "debug":
+		logLevel = slog.LevelDebug
+	case "info":
+		logLevel = slog.LevelInfo
+	case "warn", "warning":
+		// Go switch cases never fall through implicitly, so both spellings
+		// must share one case for "warn" to select the warn level.
+		logLevel = slog.LevelWarn
+	case "error":
+		logLevel = slog.LevelError
+	case "":
+		return
+	default:
+		printLoggingHelp()
+		return
+	}
+
+	h := slog.NewJSONHandler(os.Stderr, &slog.HandlerOptions{
+		AddSource: false,
+		Level:     logLevel,
+	})
+	logger := slog.New(h)
+
+	ext, ok := db.(adbc.DatabaseLogging)
+	if !ok {
+		logger.Error("Snowflake does not support logging")
+		return
+	}
+	ext.SetLogger(logger)
+}
+
+func printLoggingHelp() {
+ fmt.Fprintf(os.Stderr, "Snowflake: to enable logging, set %s to
'debug', 'info', 'warn', or 'error'", logLevelEnvVar)
+}
+
// Allocate a new cgo.Handle and store its address in a heap-allocated
// uintptr_t. Experimentally, this was found to be necessary, else
// something (the Go runtime?) would corrupt (garbage-collect?) the
@@ -309,7 +350,7 @@ func (cStream *cArrayStream) maybeError() C.int {
//export SnowflakeArrayStreamGetLastError
func SnowflakeArrayStreamGetLastError(stream *C.struct_ArrowArrayStream)
*C.cchar_t {
- if stream == nil || stream.release !=
(*[0]byte)(C.SnowflakeArrayStreamRelease) {
+ if stream == nil || stream.release !=
(*[0]byte)(C.SnowflakeArrayStreamRelease) || stream.private_data == nil {
return nil
}
cStream := getFromHandle[cArrayStream](stream.private_data)
@@ -321,7 +362,7 @@ func SnowflakeArrayStreamGetLastError(stream
*C.struct_ArrowArrayStream) *C.ccha
//export SnowflakeArrayStreamGetNext
func SnowflakeArrayStreamGetNext(stream *C.struct_ArrowArrayStream, array
*C.struct_ArrowArray) C.int {
- if stream == nil || stream.release !=
(*[0]byte)(C.SnowflakeArrayStreamRelease) {
+ if stream == nil || stream.release !=
(*[0]byte)(C.SnowflakeArrayStreamRelease) || stream.private_data == nil {
return C.EINVAL
}
cStream := getFromHandle[cArrayStream](stream.private_data)
@@ -336,7 +377,7 @@ func SnowflakeArrayStreamGetNext(stream
*C.struct_ArrowArrayStream, array *C.str
//export SnowflakeArrayStreamGetSchema
func SnowflakeArrayStreamGetSchema(stream *C.struct_ArrowArrayStream, schema
*C.struct_ArrowSchema) C.int {
- if stream == nil || stream.release !=
(*[0]byte)(C.SnowflakeArrayStreamRelease) {
+ if stream == nil || stream.release !=
(*[0]byte)(C.SnowflakeArrayStreamRelease) || stream.private_data == nil {
return C.EINVAL
}
cStream := getFromHandle[cArrayStream](stream.private_data)
@@ -350,7 +391,7 @@ func SnowflakeArrayStreamGetSchema(stream
*C.struct_ArrowArrayStream, schema *C.
//export SnowflakeArrayStreamRelease
func SnowflakeArrayStreamRelease(stream *C.struct_ArrowArrayStream) {
- if stream == nil || stream.release !=
(*[0]byte)(C.SnowflakeArrayStreamRelease) {
+ if stream == nil || stream.release !=
(*[0]byte)(C.SnowflakeArrayStreamRelease) || stream.private_data == nil {
return
}
h := (*(*cgo.Handle)(stream.private_data))
@@ -369,7 +410,7 @@ func SnowflakeArrayStreamRelease(stream
*C.struct_ArrowArrayStream) {
//export SnowflakeErrorFromArrayStream
func SnowflakeErrorFromArrayStream(stream *C.struct_ArrowArrayStream, status
*C.AdbcStatusCode) *C.struct_AdbcError {
- if stream == nil || stream.release !=
(*[0]byte)(C.SnowflakeArrayStreamRelease) {
+ if stream == nil || stream.release !=
(*[0]byte)(C.SnowflakeArrayStreamRelease) || stream.private_data == nil {
return nil
}
cStream := getFromHandle[cArrayStream](stream.private_data)
@@ -513,6 +554,8 @@ func SnowflakeDatabaseInit(db *C.struct_AdbcDatabase, err
*C.struct_AdbcError) (
return C.AdbcStatusCode(errToAdbcErr(err, aerr))
}
+ initLoggingFromEnv(adb)
+
cdb.db = adb
return C.ADBC_STATUS_OK
}