This is an automated email from the ASF dual-hosted git repository.

lidavidm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-adbc.git


The following commit(s) were added to refs/heads/main by this push:
     new 48eb1dd7 feat(go): add basic driver logging (#1048)
48eb1dd7 is described below

commit 48eb1dd703a6e62846f3ec7a92f731c8b2f5e244
Author: David Li <[email protected]>
AuthorDate: Tue Sep 12 09:36:27 2023 -0400

    feat(go): add basic driver logging (#1048)
    
    Fixes #492.
---
 go/adbc/driver/flightsql/flightsql_adbc.go |  15 +++++
 go/adbc/driver/flightsql/logging.go        | 103 +++++++++++++++++++++++++++++
 go/adbc/ext.go                             |  30 +++++++++
 go/adbc/go.mod                             |   2 +-
 go/adbc/pkg/_tmpl/driver.go.tmpl           |  54 +++++++++++++--
 go/adbc/pkg/flightsql/driver.go            |  53 +++++++++++++--
 go/adbc/pkg/gen/main.go                    |  11 ++-
 go/adbc/pkg/panicdummy/driver.go           |  53 +++++++++++++--
 go/adbc/pkg/snowflake/driver.go            |  53 +++++++++++++--
 9 files changed, 350 insertions(+), 24 deletions(-)

diff --git a/go/adbc/driver/flightsql/flightsql_adbc.go 
b/go/adbc/driver/flightsql/flightsql_adbc.go
index 70ba5018..1dea6eb8 100644
--- a/go/adbc/driver/flightsql/flightsql_adbc.go
+++ b/go/adbc/driver/flightsql/flightsql_adbc.go
@@ -57,6 +57,7 @@ import (
        "github.com/apache/arrow/go/v13/arrow/memory"
        "github.com/bluele/gcache"
        "golang.org/x/exp/maps"
+       "golang.org/x/exp/slog"
        "google.golang.org/grpc"
        grpccodes "google.golang.org/grpc/codes"
        "google.golang.org/grpc/credentials"
@@ -156,6 +157,7 @@ func (d Driver) NewDatabase(opts map[string]string) 
(adbc.Database, error) {
        db.dialOpts.block = false
        db.dialOpts.maxMsgSize = 16 * 1024 * 1024
 
+       db.logger = nilLogger()
        db.options = make(map[string]string)
 
        return db, db.SetOptions(opts)
@@ -186,11 +188,20 @@ type database struct {
        timeout       timeoutOption
        dialOpts      dbDialOpts
        enableCookies bool
+       logger        *slog.Logger
        options       map[string]string
 
        alloc memory.Allocator
 }
 
+// SetLogger stores the logger this database will use. Passing nil restores
+// the default obtained from nilLogger, so d.logger is never left nil here.
+func (d *database) SetLogger(logger *slog.Logger) {
+       if logger == nil {
+               logger = nilLogger()
+       }
+       d.logger = logger
+}
+
 func (d *database) SetOptions(cnOptions map[string]string) error {
        var tlsConfig tls.Config
 
@@ -691,6 +702,10 @@ func (b *bearerAuthMiddleware) HeadersReceived(ctx 
context.Context, md metadata.
 func getFlightClient(ctx context.Context, loc string, d *database) 
(*flightsql.Client, error) {
        authMiddle := &bearerAuthMiddleware{hdrs: d.hdrs.Copy()}
        middleware := []flight.ClientMiddleware{
+               {
+                       Unary:  makeUnaryLoggingInterceptor(d.logger),
+                       Stream: makeStreamLoggingInterceptor(d.logger),
+               },
                flight.CreateClientMiddleware(authMiddle),
                {
                        Unary:  unaryTimeoutInterceptor,
diff --git a/go/adbc/driver/flightsql/logging.go 
b/go/adbc/driver/flightsql/logging.go
new file mode 100644
index 00000000..3d0f51c8
--- /dev/null
+++ b/go/adbc/driver/flightsql/logging.go
@@ -0,0 +1,103 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package flightsql
+
+import (
+       "context"
+       "io"
+       "os"
+       "time"
+
+       "golang.org/x/exp/maps"
+       "golang.org/x/exp/slices"
+       "golang.org/x/exp/slog"
+       "google.golang.org/grpc"
+       "google.golang.org/grpc/metadata"
+)
+
+// nilLogger builds the logger used when the application has not supplied
+// one: a JSON handler on stderr whose minimum level is slog.LevelError.
+func nilLogger() *slog.Logger {
+       opts := &slog.HandlerOptions{
+               AddSource: false,
+               Level:     slog.LevelError,
+       }
+       return slog.New(slog.NewJSONHandler(os.Stderr, opts))
+}
+
+// makeUnaryLoggingInterceptor returns a unary client interceptor that runs
+// the call and then emits one log record whose message is the method name,
+// with the target, elapsed time, resulting error and outgoing metadata.
+//
+// When Debug is enabled the full metadata is logged at Debug level; otherwise
+// only the sorted metadata keys are logged, at Info level.
+func makeUnaryLoggingInterceptor(logger *slog.Logger) grpc.UnaryClientInterceptor {
+       return func(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
+               began := time.Now()
+               // The second result is ignored; whatever metadata was found is logged.
+               md, _ := metadata.FromOutgoingContext(ctx)
+               callErr := invoker(ctx, method, req, reply, cc, opts...)
+
+               if !logger.Enabled(ctx, slog.LevelDebug) {
+                       names := maps.Keys(md)
+                       slices.Sort(names)
+                       logger.InfoContext(ctx, method, "target", cc.Target(), "duration", time.Since(began), "err", callErr, "metadata", names)
+                       return callErr
+               }
+
+               logger.DebugContext(ctx, method, "target", cc.Target(), "duration", time.Since(began), "err", callErr, "metadata", md)
+               return callErr
+       }
+}
+
+// makeStreamLoggingInterceptor returns a streaming client interceptor. If
+// opening the stream fails, the failure is logged at Info level right away.
+// Otherwise the stream is wrapped in a loggedStream, which does the logging
+// once receiving from it returns an error (including io.EOF).
+func makeStreamLoggingInterceptor(logger *slog.Logger) grpc.StreamClientInterceptor {
+       return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
+               began := time.Now()
+               // The second result is ignored; whatever metadata was found is kept for logging.
+               md, _ := metadata.FromOutgoingContext(ctx)
+
+               inner, err := streamer(ctx, desc, cc, method, opts...)
+               if err != nil {
+                       logger.InfoContext(ctx, method, "target", cc.Target(), "duration", time.Since(began), "err", err)
+                       return inner, err
+               }
+
+               wrapped := &loggedStream{
+                       ClientStream: inner,
+                       logger:       logger,
+                       ctx:          ctx,
+                       method:       method,
+                       start:        began,
+                       target:       cc.Target(),
+                       outgoing:     md,
+               }
+               return wrapped, nil
+       }
+}
+
+// loggedStream wraps a grpc.ClientStream and carries what is needed to log
+// the call once receiving from the stream returns an error or io.EOF.
+type loggedStream struct {
+       grpc.ClientStream
+
+       // logger receives a record for each RecvMsg call that returns non-nil.
+       logger *slog.Logger
+       // ctx is handed to the logger calls.
+       ctx context.Context
+       // method is used as the log message.
+       method string
+       // start is the instant the logged "duration" is measured from.
+       start time.Time
+       // target is logged under the "target" key.
+       target string
+       // outgoing is logged in full at Debug level, keys only at Info level.
+       outgoing metadata.MD
+}
+
+// RecvMsg forwards to the wrapped stream and returns its result unchanged.
+// Whenever that result is non-nil a log record is emitted; io.EOF is logged
+// with a nil "err" value.
+func (s *loggedStream) RecvMsg(m any) error {
+       err := s.ClientStream.RecvMsg(m)
+       if err == nil {
+               return nil
+       }
+
+       // io.EOF is not reported as an error in the log record.
+       var reported error
+       if err != io.EOF {
+               reported = err
+       }
+
+       if !s.logger.Enabled(s.ctx, slog.LevelDebug) {
+               names := maps.Keys(s.outgoing)
+               slices.Sort(names)
+               s.logger.InfoContext(s.ctx, s.method, "target", s.target, "duration", time.Since(s.start), "err", reported, "metadata", names)
+               return err
+       }
+
+       s.logger.DebugContext(s.ctx, s.method, "target", s.target, "duration", time.Since(s.start), "err", reported, "metadata", s.outgoing)
+       return err
+}
diff --git a/go/adbc/ext.go b/go/adbc/ext.go
new file mode 100644
index 00000000..dcfaeaeb
--- /dev/null
+++ b/go/adbc/ext.go
@@ -0,0 +1,30 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package adbc
+
+import (
+       "golang.org/x/exp/slog"
+)
+
+// DatabaseLogging is an optional extension that a Database implementation
+// can provide so that it logs to an application-supplied log sink.
+//
+// EXPERIMENTAL. Not formally part of the ADBC APIs.
+type DatabaseLogging interface {
+       // SetLogger hands the implementation the logger it should write to.
+       SetLogger(logger *slog.Logger)
+}
diff --git a/go/adbc/go.mod b/go/adbc/go.mod
index b1bb5234..b3d0ecd5 100644
--- a/go/adbc/go.mod
+++ b/go/adbc/go.mod
@@ -17,7 +17,7 @@
 
 module github.com/apache/arrow-adbc/go/adbc
 
-go 1.18
+go 1.19
 
 require (
        github.com/apache/arrow/go/v13 v13.0.0
diff --git a/go/adbc/pkg/_tmpl/driver.go.tmpl b/go/adbc/pkg/_tmpl/driver.go.tmpl
index 24c15f39..1ffa1f85 100644
--- a/go/adbc/pkg/_tmpl/driver.go.tmpl
+++ b/go/adbc/pkg/_tmpl/driver.go.tmpl
@@ -53,6 +53,7 @@ import (
        "os"
        "runtime"
        "runtime/cgo"
+       "strings"
        "sync/atomic"
        "unsafe"
 
@@ -61,6 +62,7 @@ import (
        "github.com/apache/arrow/go/v13/arrow/cdata"
        "github.com/apache/arrow/go/v13/arrow/memory"
        "github.com/apache/arrow/go/v13/arrow/memory/mallocator"
+       "golang.org/x/exp/slog"
 )
 
 // Must use malloc() to respect CGO rules
@@ -71,6 +73,7 @@ var drv = {{.Driver}}{Alloc: mallocator.NewMallocator()}
 var globalPoison int32 = 0
 
 const errPrefix = "[{{.Prefix}}] "
+const logLevelEnvVar = "ADBC_DRIVER_{{.PrefixUpper}}_LOG_LEVEL"
 
 func setErr(err *C.struct_AdbcError, format string, vals ...interface{}) {
        if err == nil {
@@ -162,6 +165,45 @@ func poison(err *C.struct_AdbcError, fname string, e 
interface{}) C.AdbcStatusCo
        return C.ADBC_STATUS_INTERNAL
 }
 
+// initLoggingFromEnv reads logLevelEnvVar and, if it names a known level, gives
+// db a JSON stderr logger at that level; empty is a no-op, unknown prints help.
+func initLoggingFromEnv(db adbc.Database) {
+       logLevel := slog.LevelError
+       switch strings.ToLower(os.Getenv(logLevelEnvVar)) {
+       case "debug":
+               logLevel = slog.LevelDebug
+       case "info":
+               logLevel = slog.LevelInfo
+       case "warn", "warning":
+               // Go cases never fall through, so both spellings must share one case.
+               logLevel = slog.LevelWarn
+       case "error":
+               logLevel = slog.LevelError
+       case "":
+               return
+       default:
+               printLoggingHelp()
+               return
+       }
+
+       logger := slog.New(slog.NewJSONHandler(os.Stderr, &slog.HandlerOptions{
+               AddSource: false,
+               Level:     logLevel,
+       }))
+
+       // Logging is an optional extension; report it when the driver lacks it.
+       ext, ok := db.(adbc.DatabaseLogging)
+       if !ok {
+               logger.Error("{{.Prefix}} does not support logging")
+               return
+       }
+       ext.SetLogger(logger)
+}
+
+func printLoggingHelp() {
+       fmt.Fprintf(os.Stderr, "{{.Prefix}}: to enable logging, set %s to 
'debug', 'info', 'warn', or 'error'", logLevelEnvVar)
+}
+
+
 // Allocate a new cgo.Handle and store its address in a heap-allocated
 // uintptr_t.  Experimentally, this was found to be necessary, else
 // something (the Go runtime?) would corrupt (garbage-collect?) the
@@ -305,7 +347,7 @@ func (cStream *cArrayStream) maybeError() C.int {
 
 //export {{.Prefix}}ArrayStreamGetLastError
 func {{.Prefix}}ArrayStreamGetLastError(stream *C.struct_ArrowArrayStream) 
*C.cchar_t {
-       if stream == nil || stream.release != 
(*[0]byte)(C.{{.Prefix}}ArrayStreamRelease) {
+       if stream == nil || stream.release != 
(*[0]byte)(C.{{.Prefix}}ArrayStreamRelease) || stream.private_data == nil {
                return nil
        }
        cStream := getFromHandle[cArrayStream](stream.private_data)
@@ -317,7 +359,7 @@ func {{.Prefix}}ArrayStreamGetLastError(stream 
*C.struct_ArrowArrayStream) *C.cc
 
 //export {{.Prefix}}ArrayStreamGetNext
 func {{.Prefix}}ArrayStreamGetNext(stream *C.struct_ArrowArrayStream, array 
*C.struct_ArrowArray) C.int {
-       if stream == nil || stream.release != 
(*[0]byte)(C.{{.Prefix}}ArrayStreamRelease) {
+       if stream == nil || stream.release != 
(*[0]byte)(C.{{.Prefix}}ArrayStreamRelease) || stream.private_data == nil  {
                return C.EINVAL
        }
        cStream := getFromHandle[cArrayStream](stream.private_data)
@@ -332,7 +374,7 @@ func {{.Prefix}}ArrayStreamGetNext(stream 
*C.struct_ArrowArrayStream, array *C.s
 
 //export {{.Prefix}}ArrayStreamGetSchema
 func {{.Prefix}}ArrayStreamGetSchema(stream *C.struct_ArrowArrayStream, schema 
*C.struct_ArrowSchema) C.int {
-       if stream == nil || stream.release != 
(*[0]byte)(C.{{.Prefix}}ArrayStreamRelease) {
+       if stream == nil || stream.release != 
(*[0]byte)(C.{{.Prefix}}ArrayStreamRelease) || stream.private_data == nil  {
                return C.EINVAL
        }
        cStream := getFromHandle[cArrayStream](stream.private_data)
@@ -346,7 +388,7 @@ func {{.Prefix}}ArrayStreamGetSchema(stream 
*C.struct_ArrowArrayStream, schema *
 
 //export {{.Prefix}}ArrayStreamRelease
 func {{.Prefix}}ArrayStreamRelease(stream *C.struct_ArrowArrayStream) {
-       if stream == nil || stream.release != 
(*[0]byte)(C.{{.Prefix}}ArrayStreamRelease) {
+       if stream == nil || stream.release != 
(*[0]byte)(C.{{.Prefix}}ArrayStreamRelease) || stream.private_data == nil  {
                return
        }
        h := (*(*cgo.Handle)(stream.private_data))
@@ -365,7 +407,7 @@ func {{.Prefix}}ArrayStreamRelease(stream 
*C.struct_ArrowArrayStream) {
 
 //export {{.Prefix}}ErrorFromArrayStream
 func {{.Prefix}}ErrorFromArrayStream(stream *C.struct_ArrowArrayStream, status 
*C.AdbcStatusCode) (*C.struct_AdbcError) {
-       if stream == nil || stream.release != 
(*[0]byte)(C.{{.Prefix}}ArrayStreamRelease) {
+       if stream == nil || stream.release != 
(*[0]byte)(C.{{.Prefix}}ArrayStreamRelease) || stream.private_data == nil  {
                return nil
        }
        cStream := getFromHandle[cArrayStream](stream.private_data)
@@ -509,6 +551,8 @@ func {{.Prefix}}DatabaseInit(db *C.struct_AdbcDatabase, err 
*C.struct_AdbcError)
                return C.AdbcStatusCode(errToAdbcErr(err, aerr))
        }
 
+       initLoggingFromEnv(adb)
+
        cdb.db = adb
        return C.ADBC_STATUS_OK
 }
diff --git a/go/adbc/pkg/flightsql/driver.go b/go/adbc/pkg/flightsql/driver.go
index 46e09695..270ffaaf 100644
--- a/go/adbc/pkg/flightsql/driver.go
+++ b/go/adbc/pkg/flightsql/driver.go
@@ -55,6 +55,7 @@ import (
        "os"
        "runtime"
        "runtime/cgo"
+       "strings"
        "sync/atomic"
        "unsafe"
 
@@ -64,6 +65,7 @@ import (
        "github.com/apache/arrow/go/v13/arrow/cdata"
        "github.com/apache/arrow/go/v13/arrow/memory"
        "github.com/apache/arrow/go/v13/arrow/memory/mallocator"
+       "golang.org/x/exp/slog"
 )
 
 // Must use malloc() to respect CGO rules
@@ -75,6 +77,7 @@ var drv = flightsql.Driver{Alloc: mallocator.NewMallocator()}
 var globalPoison int32 = 0
 
 const errPrefix = "[FlightSQL] "
+const logLevelEnvVar = "ADBC_DRIVER_FLIGHTSQL_LOG_LEVEL"
 
 func setErr(err *C.struct_AdbcError, format string, vals ...interface{}) {
        if err == nil {
@@ -166,6 +169,44 @@ func poison(err *C.struct_AdbcError, fname string, e 
interface{}) C.AdbcStatusCo
        return C.ADBC_STATUS_INTERNAL
 }
 
+// initLoggingFromEnv reads logLevelEnvVar and, if it names a known level, gives
+// db a JSON stderr logger at that level; empty is a no-op, unknown prints help.
+func initLoggingFromEnv(db adbc.Database) {
+       logLevel := slog.LevelError
+       switch strings.ToLower(os.Getenv(logLevelEnvVar)) {
+       case "debug":
+               logLevel = slog.LevelDebug
+       case "info":
+               logLevel = slog.LevelInfo
+       case "warn", "warning":
+               // Go cases never fall through, so both spellings must share one case.
+               logLevel = slog.LevelWarn
+       case "error":
+               logLevel = slog.LevelError
+       case "":
+               return
+       default:
+               printLoggingHelp()
+               return
+       }
+
+       logger := slog.New(slog.NewJSONHandler(os.Stderr, &slog.HandlerOptions{
+               AddSource: false,
+               Level:     logLevel,
+       }))
+
+       // Logging is an optional extension; report it when the driver lacks it.
+       ext, ok := db.(adbc.DatabaseLogging)
+       if !ok {
+               logger.Error("FlightSQL does not support logging")
+               return
+       }
+       ext.SetLogger(logger)
+}
+
+func printLoggingHelp() {
+       fmt.Fprintf(os.Stderr, "FlightSQL: to enable logging, set %s to 
'debug', 'info', 'warn', or 'error'", logLevelEnvVar)
+}
+
 // Allocate a new cgo.Handle and store its address in a heap-allocated
 // uintptr_t.  Experimentally, this was found to be necessary, else
 // something (the Go runtime?) would corrupt (garbage-collect?) the
@@ -309,7 +350,7 @@ func (cStream *cArrayStream) maybeError() C.int {
 
 //export FlightSQLArrayStreamGetLastError
 func FlightSQLArrayStreamGetLastError(stream *C.struct_ArrowArrayStream) 
*C.cchar_t {
-       if stream == nil || stream.release != 
(*[0]byte)(C.FlightSQLArrayStreamRelease) {
+       if stream == nil || stream.release != 
(*[0]byte)(C.FlightSQLArrayStreamRelease) || stream.private_data == nil {
                return nil
        }
        cStream := getFromHandle[cArrayStream](stream.private_data)
@@ -321,7 +362,7 @@ func FlightSQLArrayStreamGetLastError(stream 
*C.struct_ArrowArrayStream) *C.ccha
 
 //export FlightSQLArrayStreamGetNext
 func FlightSQLArrayStreamGetNext(stream *C.struct_ArrowArrayStream, array 
*C.struct_ArrowArray) C.int {
-       if stream == nil || stream.release != 
(*[0]byte)(C.FlightSQLArrayStreamRelease) {
+       if stream == nil || stream.release != 
(*[0]byte)(C.FlightSQLArrayStreamRelease) || stream.private_data == nil {
                return C.EINVAL
        }
        cStream := getFromHandle[cArrayStream](stream.private_data)
@@ -336,7 +377,7 @@ func FlightSQLArrayStreamGetNext(stream 
*C.struct_ArrowArrayStream, array *C.str
 
 //export FlightSQLArrayStreamGetSchema
 func FlightSQLArrayStreamGetSchema(stream *C.struct_ArrowArrayStream, schema 
*C.struct_ArrowSchema) C.int {
-       if stream == nil || stream.release != 
(*[0]byte)(C.FlightSQLArrayStreamRelease) {
+       if stream == nil || stream.release != 
(*[0]byte)(C.FlightSQLArrayStreamRelease) || stream.private_data == nil {
                return C.EINVAL
        }
        cStream := getFromHandle[cArrayStream](stream.private_data)
@@ -350,7 +391,7 @@ func FlightSQLArrayStreamGetSchema(stream 
*C.struct_ArrowArrayStream, schema *C.
 
 //export FlightSQLArrayStreamRelease
 func FlightSQLArrayStreamRelease(stream *C.struct_ArrowArrayStream) {
-       if stream == nil || stream.release != 
(*[0]byte)(C.FlightSQLArrayStreamRelease) {
+       if stream == nil || stream.release != 
(*[0]byte)(C.FlightSQLArrayStreamRelease) || stream.private_data == nil {
                return
        }
        h := (*(*cgo.Handle)(stream.private_data))
@@ -369,7 +410,7 @@ func FlightSQLArrayStreamRelease(stream 
*C.struct_ArrowArrayStream) {
 
 //export FlightSQLErrorFromArrayStream
 func FlightSQLErrorFromArrayStream(stream *C.struct_ArrowArrayStream, status 
*C.AdbcStatusCode) *C.struct_AdbcError {
-       if stream == nil || stream.release != 
(*[0]byte)(C.FlightSQLArrayStreamRelease) {
+       if stream == nil || stream.release != 
(*[0]byte)(C.FlightSQLArrayStreamRelease) || stream.private_data == nil {
                return nil
        }
        cStream := getFromHandle[cArrayStream](stream.private_data)
@@ -513,6 +554,8 @@ func FlightSQLDatabaseInit(db *C.struct_AdbcDatabase, err 
*C.struct_AdbcError) (
                return C.AdbcStatusCode(errToAdbcErr(err, aerr))
        }
 
+       initLoggingFromEnv(adb)
+
        cdb.db = adb
        return C.ADBC_STATUS_OK
 }
diff --git a/go/adbc/pkg/gen/main.go b/go/adbc/pkg/gen/main.go
index 24ae7d25..16f0001f 100644
--- a/go/adbc/pkg/gen/main.go
+++ b/go/adbc/pkg/gen/main.go
@@ -75,8 +75,9 @@ func (p *pathSpec) IsGoFile() bool { return 
filepath.Ext(p.out) == ".go" }
 func (p *pathSpec) IsCFile() bool  { return filepath.Ext(p.out) == ".c" || 
filepath.Ext(p.out) == ".h" }
 
 type tmplData struct {
-       Driver string
-       Prefix string
+       Driver      string
+       Prefix      string
+       PrefixUpper string
 }
 
 var fileList = []string{
@@ -125,7 +126,11 @@ func main() {
                        out: filepath.Join(*outDir, strings.TrimSuffix(f, Ext))}
        }
 
-       process(tmplData{Driver: pkg[0].Name + "." + *driverType, Prefix: 
*prefix}, specs)
+       process(tmplData{
+               Driver:      pkg[0].Name + "." + *driverType,
+               Prefix:      *prefix,
+               PrefixUpper: strings.ToUpper(*prefix),
+       }, specs)
 }
 
 func mustReadAll(path string) []byte {
diff --git a/go/adbc/pkg/panicdummy/driver.go b/go/adbc/pkg/panicdummy/driver.go
index c99153cc..1e2279e0 100644
--- a/go/adbc/pkg/panicdummy/driver.go
+++ b/go/adbc/pkg/panicdummy/driver.go
@@ -55,6 +55,7 @@ import (
        "os"
        "runtime"
        "runtime/cgo"
+       "strings"
        "sync/atomic"
        "unsafe"
 
@@ -64,6 +65,7 @@ import (
        "github.com/apache/arrow/go/v13/arrow/cdata"
        "github.com/apache/arrow/go/v13/arrow/memory"
        "github.com/apache/arrow/go/v13/arrow/memory/mallocator"
+       "golang.org/x/exp/slog"
 )
 
 // Must use malloc() to respect CGO rules
@@ -75,6 +77,7 @@ var drv = panicdummy.Driver{Alloc: mallocator.NewMallocator()}
 var globalPoison int32 = 0
 
 const errPrefix = "[PanicDummy] "
+const logLevelEnvVar = "ADBC_DRIVER_PANICDUMMY_LOG_LEVEL"
 
 func setErr(err *C.struct_AdbcError, format string, vals ...interface{}) {
        if err == nil {
@@ -166,6 +169,44 @@ func poison(err *C.struct_AdbcError, fname string, e 
interface{}) C.AdbcStatusCo
        return C.ADBC_STATUS_INTERNAL
 }
 
+// initLoggingFromEnv reads logLevelEnvVar and, if it names a known level, gives
+// db a JSON stderr logger at that level; empty is a no-op, unknown prints help.
+func initLoggingFromEnv(db adbc.Database) {
+       logLevel := slog.LevelError
+       switch strings.ToLower(os.Getenv(logLevelEnvVar)) {
+       case "debug":
+               logLevel = slog.LevelDebug
+       case "info":
+               logLevel = slog.LevelInfo
+       case "warn", "warning":
+               // Go cases never fall through, so both spellings must share one case.
+               logLevel = slog.LevelWarn
+       case "error":
+               logLevel = slog.LevelError
+       case "":
+               return
+       default:
+               printLoggingHelp()
+               return
+       }
+
+       logger := slog.New(slog.NewJSONHandler(os.Stderr, &slog.HandlerOptions{
+               AddSource: false,
+               Level:     logLevel,
+       }))
+
+       // Logging is an optional extension; report it when the driver lacks it.
+       ext, ok := db.(adbc.DatabaseLogging)
+       if !ok {
+               logger.Error("PanicDummy does not support logging")
+               return
+       }
+       ext.SetLogger(logger)
+}
+
+func printLoggingHelp() {
+       fmt.Fprintf(os.Stderr, "PanicDummy: to enable logging, set %s to 
'debug', 'info', 'warn', or 'error'", logLevelEnvVar)
+}
+
 // Allocate a new cgo.Handle and store its address in a heap-allocated
 // uintptr_t.  Experimentally, this was found to be necessary, else
 // something (the Go runtime?) would corrupt (garbage-collect?) the
@@ -309,7 +350,7 @@ func (cStream *cArrayStream) maybeError() C.int {
 
 //export PanicDummyArrayStreamGetLastError
 func PanicDummyArrayStreamGetLastError(stream *C.struct_ArrowArrayStream) 
*C.cchar_t {
-       if stream == nil || stream.release != 
(*[0]byte)(C.PanicDummyArrayStreamRelease) {
+       if stream == nil || stream.release != 
(*[0]byte)(C.PanicDummyArrayStreamRelease) || stream.private_data == nil {
                return nil
        }
        cStream := getFromHandle[cArrayStream](stream.private_data)
@@ -321,7 +362,7 @@ func PanicDummyArrayStreamGetLastError(stream 
*C.struct_ArrowArrayStream) *C.cch
 
 //export PanicDummyArrayStreamGetNext
 func PanicDummyArrayStreamGetNext(stream *C.struct_ArrowArrayStream, array 
*C.struct_ArrowArray) C.int {
-       if stream == nil || stream.release != 
(*[0]byte)(C.PanicDummyArrayStreamRelease) {
+       if stream == nil || stream.release != 
(*[0]byte)(C.PanicDummyArrayStreamRelease) || stream.private_data == nil {
                return C.EINVAL
        }
        cStream := getFromHandle[cArrayStream](stream.private_data)
@@ -336,7 +377,7 @@ func PanicDummyArrayStreamGetNext(stream 
*C.struct_ArrowArrayStream, array *C.st
 
 //export PanicDummyArrayStreamGetSchema
 func PanicDummyArrayStreamGetSchema(stream *C.struct_ArrowArrayStream, schema 
*C.struct_ArrowSchema) C.int {
-       if stream == nil || stream.release != 
(*[0]byte)(C.PanicDummyArrayStreamRelease) {
+       if stream == nil || stream.release != 
(*[0]byte)(C.PanicDummyArrayStreamRelease) || stream.private_data == nil {
                return C.EINVAL
        }
        cStream := getFromHandle[cArrayStream](stream.private_data)
@@ -350,7 +391,7 @@ func PanicDummyArrayStreamGetSchema(stream 
*C.struct_ArrowArrayStream, schema *C
 
 //export PanicDummyArrayStreamRelease
 func PanicDummyArrayStreamRelease(stream *C.struct_ArrowArrayStream) {
-       if stream == nil || stream.release != 
(*[0]byte)(C.PanicDummyArrayStreamRelease) {
+       if stream == nil || stream.release != 
(*[0]byte)(C.PanicDummyArrayStreamRelease) || stream.private_data == nil {
                return
        }
        h := (*(*cgo.Handle)(stream.private_data))
@@ -369,7 +410,7 @@ func PanicDummyArrayStreamRelease(stream 
*C.struct_ArrowArrayStream) {
 
 //export PanicDummyErrorFromArrayStream
 func PanicDummyErrorFromArrayStream(stream *C.struct_ArrowArrayStream, status 
*C.AdbcStatusCode) *C.struct_AdbcError {
-       if stream == nil || stream.release != 
(*[0]byte)(C.PanicDummyArrayStreamRelease) {
+       if stream == nil || stream.release != 
(*[0]byte)(C.PanicDummyArrayStreamRelease) || stream.private_data == nil {
                return nil
        }
        cStream := getFromHandle[cArrayStream](stream.private_data)
@@ -513,6 +554,8 @@ func PanicDummyDatabaseInit(db *C.struct_AdbcDatabase, err 
*C.struct_AdbcError)
                return C.AdbcStatusCode(errToAdbcErr(err, aerr))
        }
 
+       initLoggingFromEnv(adb)
+
        cdb.db = adb
        return C.ADBC_STATUS_OK
 }
diff --git a/go/adbc/pkg/snowflake/driver.go b/go/adbc/pkg/snowflake/driver.go
index 4804e32e..e468924d 100644
--- a/go/adbc/pkg/snowflake/driver.go
+++ b/go/adbc/pkg/snowflake/driver.go
@@ -55,6 +55,7 @@ import (
        "os"
        "runtime"
        "runtime/cgo"
+       "strings"
        "sync/atomic"
        "unsafe"
 
@@ -64,6 +65,7 @@ import (
        "github.com/apache/arrow/go/v13/arrow/cdata"
        "github.com/apache/arrow/go/v13/arrow/memory"
        "github.com/apache/arrow/go/v13/arrow/memory/mallocator"
+       "golang.org/x/exp/slog"
 )
 
 // Must use malloc() to respect CGO rules
@@ -75,6 +77,7 @@ var drv = snowflake.Driver{Alloc: mallocator.NewMallocator()}
 var globalPoison int32 = 0
 
 const errPrefix = "[Snowflake] "
+const logLevelEnvVar = "ADBC_DRIVER_SNOWFLAKE_LOG_LEVEL"
 
 func setErr(err *C.struct_AdbcError, format string, vals ...interface{}) {
        if err == nil {
@@ -166,6 +169,44 @@ func poison(err *C.struct_AdbcError, fname string, e 
interface{}) C.AdbcStatusCo
        return C.ADBC_STATUS_INTERNAL
 }
 
+// initLoggingFromEnv reads logLevelEnvVar and, if it names a known level, gives
+// db a JSON stderr logger at that level; empty is a no-op, unknown prints help.
+func initLoggingFromEnv(db adbc.Database) {
+       logLevel := slog.LevelError
+       switch strings.ToLower(os.Getenv(logLevelEnvVar)) {
+       case "debug":
+               logLevel = slog.LevelDebug
+       case "info":
+               logLevel = slog.LevelInfo
+       case "warn", "warning":
+               // Go cases never fall through, so both spellings must share one case.
+               logLevel = slog.LevelWarn
+       case "error":
+               logLevel = slog.LevelError
+       case "":
+               return
+       default:
+               printLoggingHelp()
+               return
+       }
+
+       logger := slog.New(slog.NewJSONHandler(os.Stderr, &slog.HandlerOptions{
+               AddSource: false,
+               Level:     logLevel,
+       }))
+
+       // Logging is an optional extension; report it when the driver lacks it.
+       ext, ok := db.(adbc.DatabaseLogging)
+       if !ok {
+               logger.Error("Snowflake does not support logging")
+               return
+       }
+       ext.SetLogger(logger)
+}
+
+func printLoggingHelp() {
+       fmt.Fprintf(os.Stderr, "Snowflake: to enable logging, set %s to 
'debug', 'info', 'warn', or 'error'", logLevelEnvVar)
+}
+
 // Allocate a new cgo.Handle and store its address in a heap-allocated
 // uintptr_t.  Experimentally, this was found to be necessary, else
 // something (the Go runtime?) would corrupt (garbage-collect?) the
@@ -309,7 +350,7 @@ func (cStream *cArrayStream) maybeError() C.int {
 
 //export SnowflakeArrayStreamGetLastError
 func SnowflakeArrayStreamGetLastError(stream *C.struct_ArrowArrayStream) 
*C.cchar_t {
-       if stream == nil || stream.release != 
(*[0]byte)(C.SnowflakeArrayStreamRelease) {
+       if stream == nil || stream.release != 
(*[0]byte)(C.SnowflakeArrayStreamRelease) || stream.private_data == nil {
                return nil
        }
        cStream := getFromHandle[cArrayStream](stream.private_data)
@@ -321,7 +362,7 @@ func SnowflakeArrayStreamGetLastError(stream 
*C.struct_ArrowArrayStream) *C.ccha
 
 //export SnowflakeArrayStreamGetNext
 func SnowflakeArrayStreamGetNext(stream *C.struct_ArrowArrayStream, array 
*C.struct_ArrowArray) C.int {
-       if stream == nil || stream.release != 
(*[0]byte)(C.SnowflakeArrayStreamRelease) {
+       if stream == nil || stream.release != 
(*[0]byte)(C.SnowflakeArrayStreamRelease) || stream.private_data == nil {
                return C.EINVAL
        }
        cStream := getFromHandle[cArrayStream](stream.private_data)
@@ -336,7 +377,7 @@ func SnowflakeArrayStreamGetNext(stream 
*C.struct_ArrowArrayStream, array *C.str
 
 //export SnowflakeArrayStreamGetSchema
 func SnowflakeArrayStreamGetSchema(stream *C.struct_ArrowArrayStream, schema 
*C.struct_ArrowSchema) C.int {
-       if stream == nil || stream.release != 
(*[0]byte)(C.SnowflakeArrayStreamRelease) {
+       if stream == nil || stream.release != 
(*[0]byte)(C.SnowflakeArrayStreamRelease) || stream.private_data == nil {
                return C.EINVAL
        }
        cStream := getFromHandle[cArrayStream](stream.private_data)
@@ -350,7 +391,7 @@ func SnowflakeArrayStreamGetSchema(stream 
*C.struct_ArrowArrayStream, schema *C.
 
 //export SnowflakeArrayStreamRelease
 func SnowflakeArrayStreamRelease(stream *C.struct_ArrowArrayStream) {
-       if stream == nil || stream.release != 
(*[0]byte)(C.SnowflakeArrayStreamRelease) {
+       if stream == nil || stream.release != 
(*[0]byte)(C.SnowflakeArrayStreamRelease) || stream.private_data == nil {
                return
        }
        h := (*(*cgo.Handle)(stream.private_data))
@@ -369,7 +410,7 @@ func SnowflakeArrayStreamRelease(stream 
*C.struct_ArrowArrayStream) {
 
 //export SnowflakeErrorFromArrayStream
 func SnowflakeErrorFromArrayStream(stream *C.struct_ArrowArrayStream, status 
*C.AdbcStatusCode) *C.struct_AdbcError {
-       if stream == nil || stream.release != 
(*[0]byte)(C.SnowflakeArrayStreamRelease) {
+       if stream == nil || stream.release != 
(*[0]byte)(C.SnowflakeArrayStreamRelease) || stream.private_data == nil {
                return nil
        }
        cStream := getFromHandle[cArrayStream](stream.private_data)
@@ -513,6 +554,8 @@ func SnowflakeDatabaseInit(db *C.struct_AdbcDatabase, err 
*C.struct_AdbcError) (
                return C.AdbcStatusCode(errToAdbcErr(err, aerr))
        }
 
+       initLoggingFromEnv(adb)
+
        cdb.db = adb
        return C.ADBC_STATUS_OK
 }

Reply via email to