lidavidm commented on code in PR #2729:
URL: https://github.com/apache/arrow-adbc/pull/2729#discussion_r2076486174


##########
go/adbc/ext.go:
##########


Review Comment:
   Missing docstrings?



##########
go/adbc/driver/internal/driverbase/rotating_file_writer.go:
##########


Review Comment:
   Why is ADBC handling this? If it's for testing, isn't the standard to dump 
to OTLP and let the OTLP collector dump to stdout/file?



##########
go/adbc/adbc.go:
##########
@@ -326,6 +336,7 @@ const (
 // or desire for it.
 type Driver interface {
        NewDatabase(opts map[string]string) (Database, error)
+       NewDatabaseWithContext(ctx context.Context, opts map[string]string) 
(Database, error)

Review Comment:
   This should be in `ext.go`



##########
go/adbc/driver/internal/driverbase/connection.go:
##########
@@ -37,13 +43,20 @@ const (
        ConnectionMessageOptionUnsupported = "Unsupported connection option"
        ConnectionMessageCannotCommit      = "Cannot commit when autocommit is 
enabled"
        ConnectionMessageCannotRollback    = "Cannot rollback when autocommit 
is enabled"
+       ConnectionMessageIncorrectFormat   = "Incorrect or unsupported format"

Review Comment:
   The error message needs to be explicit that it's an OTel span parent that's 
incorrect



##########
go/adbc/driver/internal/driverbase/connection.go:
##########
@@ -714,4 +771,50 @@ func ValueOrZero[T any](val *T) T {
        return *val
 }
 
+func maybeAddTraceParent(ctx context.Context, cnxn adbc.OTelTracing, st 
adbc.OTelTracing) (context.Context, error) {
+       var traceParentStr string
+       if st != nil && st.GetTraceParent() != "" {
+               traceParentStr = st.GetTraceParent()
+       } else if cnxn != nil && cnxn.GetTraceParent() != "" {
+               traceParentStr = cnxn.GetTraceParent()
+       }
+       if traceParentStr != "" {
+               spanContext, err := propagateTraceParent(ctx, traceParentStr)
+               if err != nil {
+                       return ctx, err
+               }
+               ctx = trace.ContextWithRemoteSpanContext(ctx, spanContext)
+       }
+       return ctx, nil
+}
+
+func propagateTraceParent(ctx context.Context, traceParentStr string) 
(trace.SpanContext, error) {
+       if strings.TrimSpace(traceParentStr) == "" {
+               return trace.SpanContext{}, errors.New("traceparent string is 
empty")
+       }
+
+       propagator := propagation.TraceContext{}
+       carrier := propagation.MapCarrier{"traceparent": traceParentStr}
+       extractedContext := propagator.Extract(ctx, carrier)
+
+       spanContext := trace.SpanContextFromContext(extractedContext)
+       if !spanContext.IsValid() {
+               return trace.SpanContext{}, errors.New("invalid traceparent 
string")
+       }
+       return spanContext, nil
+}
+
+func isValidateTraceParent(traceParent string) bool {

Review Comment:
   ```suggestion
   func isValidTraceParent(traceParent string) bool {
   ```
   
   ?



##########
go/adbc/driver/internal/driverbase/driver.go:
##########
@@ -72,6 +73,10 @@ func (base *DriverImplBase) NewDatabase(opts 
map[string]string) (adbc.Database,
        return nil, base.ErrorHelper.Errorf(adbc.StatusNotImplemented, 
"NewDatabase")
 }
 
+func (base *DriverImplBase) NewDatabaseWithContext(ctx context.Context, opts 
map[string]string) (adbc.Database, error) {
+       return nil, base.ErrorHelper.Errorf(adbc.StatusNotImplemented, 
"NewDatabaseContext")

Review Comment:
   ```suggestion
        return nil, base.ErrorHelper.Errorf(adbc.StatusNotImplemented, 
"NewDatabaseWithContext")
   ```



##########
go/adbc/driver/internal/driverbase/connection.go:
##########
@@ -223,6 +240,10 @@ func (base *ConnectionImplBase) ReadPartition(ctx 
context.Context, serializedPar
 }
 
 func (base *ConnectionImplBase) GetOption(key string) (string, error) {
+       switch strings.ToLower(strings.TrimSpace(key)) {

Review Comment:
   Why do we need to trim?



##########
go/adbc/driver/internal/driverbase/connection.go:
##########
@@ -332,6 +360,35 @@ func (b *ConnectionBuilder) Connection() Connection {
        return conn
 }
 
+func (cnxn *ConnectionImplBase) GetTraceParent() string {
+       return cnxn.traceParent
+}
+
+func (cnxn *ConnectionImplBase) SetTraceParent(traceParent string) {
+       cnxn.traceParent = traceParent
+}
+
+func (cnxn *ConnectionImplBase) StartSpan(
+       ctx context.Context,
+       spanName string,
+       opts ...trace.SpanStartOption,
+) (context.Context, trace.Span) {
+       ctx, _ = maybeAddTraceParent(ctx, cnxn, nil)
+       return cnxn.Tracer.Start(ctx, spanName, opts...)
+}
+
+func (cnxn *ConnectionImplBase) SetErrorOnSpan(span trace.Span, err error) 
bool {

Review Comment:
   Does this really need to be part of the interface vs. having a utility 
method?



##########
go/adbc/driver/internal/driverbase/connection.go:
##########
@@ -239,9 +260,16 @@ func (base *ConnectionImplBase) GetOptionInt(key string) 
(int64, error) {
 }
 
 func (base *ConnectionImplBase) SetOption(key string, val string) error {
-       switch key {
+       switch strings.ToLower(strings.TrimSpace(key)) {

Review Comment:
   Same here - in general the caller shouldn't be adding spaces...



##########
go/adbc/driver/internal/driverbase/statement.go:
##########
@@ -0,0 +1,140 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package driverbase
+
+import (
+       "context"
+       "strings"
+
+       "github.com/apache/arrow-adbc/go/adbc"
+       "go.opentelemetry.io/otel/trace"
+)
+
+const (
+       StatementMessageOptionUnknown     = "Unknown connection option"
+       StatementMessageOptionUnsupported = "Unsupported connection option"
+       StatementMessageCannotCommit      = "Cannot commit when autocommit is 
enabled"
+       StatementMessageCannotRollback    = "Cannot rollback when autocommit is 
enabled"
+       StatementMessageIncorrectFormat   = "Incorrect or unsupported format"
+)

Review Comment:
   (1) s/connection/statement/g
   (2) are these actually used (e.g. cannotcommit)?



##########
go/adbc/driver/internal/driverbase/connection.go:
##########
@@ -714,4 +771,50 @@ func ValueOrZero[T any](val *T) T {
        return *val
 }
 
+func maybeAddTraceParent(ctx context.Context, cnxn adbc.OTelTracing, st 
adbc.OTelTracing) (context.Context, error) {
+       var traceParentStr string
+       if st != nil && st.GetTraceParent() != "" {
+               traceParentStr = st.GetTraceParent()
+       } else if cnxn != nil && cnxn.GetTraceParent() != "" {
+               traceParentStr = cnxn.GetTraceParent()
+       }
+       if traceParentStr != "" {
+               spanContext, err := propagateTraceParent(ctx, traceParentStr)
+               if err != nil {
+                       return ctx, err
+               }
+               ctx = trace.ContextWithRemoteSpanContext(ctx, spanContext)
+       }
+       return ctx, nil
+}
+
+func propagateTraceParent(ctx context.Context, traceParentStr string) 
(trace.SpanContext, error) {
+       if strings.TrimSpace(traceParentStr) == "" {
+               return trace.SpanContext{}, errors.New("traceparent string is 
empty")
+       }
+
+       propagator := propagation.TraceContext{}
+       carrier := propagation.MapCarrier{"traceparent": traceParentStr}
+       extractedContext := propagator.Extract(ctx, carrier)
+
+       spanContext := trace.SpanContextFromContext(extractedContext)
+       if !spanContext.IsValid() {
+               return trace.SpanContext{}, errors.New("invalid traceparent 
string")
+       }
+       return spanContext, nil
+}
+
+func isValidateTraceParent(traceParent string) bool {

Review Comment:
   (1) Do we need to validate this? Can't we pass it on to OTel as is? (Can we 
just validate the length instead of running a regex engine every time we need 
to set the trace parent?)
   (2) I see string trimming when this is used...why is the caller passing 
spaces in the first place?



##########
go/adbc/driver/snowflake/statement.go:
##########
@@ -499,41 +516,58 @@ func (st *statement) ExecuteQuery(ctx context.Context) 
(array.RecordReader, int6
                st.streamBind = nil
 
                rdr := concatReader{}
-               err := rdr.Init(&bind)
+               err = rdr.Init(&bind)
                if err != nil {
-                       return nil, -1, err
+                       return
                }
-               return &rdr, -1, nil
+               reader = &rdr
+               return
        }
 
-       loader, err := st.cnxn.cn.QueryArrowStream(ctx, st.query)
+       var loader gosnowflake.ArrowStreamLoader
+       loader, err = st.cnxn.cn.QueryArrowStream(ctx, st.query)
        if err != nil {
-               return nil, -1, errToAdbcErr(adbc.StatusInternal, err)
+               err = errToAdbcErr(adbc.StatusInternal, err)
+               return
        }
 
-       rdr, err := newRecordReader(ctx, st.alloc, loader, st.queueSize, 
st.prefetchConcurrency, st.useHighPrecision)
-       nrec := loader.TotalRows()
-       return rdr, nrec, err
+       reader, err = newRecordReader(ctx, st.alloc, loader, st.queueSize, 
st.prefetchConcurrency, st.useHighPrecision)
+       nRows = loader.TotalRows()
+       return
 }
 
 // ExecuteUpdate executes a statement that does not generate a result
 // set. It returns the number of rows affected if known, otherwise -1.
-func (st *statement) ExecuteUpdate(ctx context.Context) (int64, error) {
+func (st *statement) ExecuteUpdate(ctx context.Context) (numRows int64, err 
error) {
+       numRows = -1
+
+       var span trace.Span
+       ctx, span = st.StartSpan(ctx, "ExecuteUpdate")
        ctx = st.setQueryContext(ctx)
 
+       defer func() {
+               if !st.SetErrorOnSpan(span, err) {
+                       
span.SetAttributes(attribute.Int64("db.response.returned_rows", numRows))

Review Comment:
   Is `-1` ending up here potentially problematic for downstream consumers?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to