zeroshade commented on code in PR #4322:
URL: https://github.com/apache/arrow-adbc/pull/4322#discussion_r3262261935
##########
go/adbc/driver/flightsql/flightsql_database.go:
##########
@@ -574,13 +700,46 @@ func (b *bearerAuthMiddleware) HeadersReceived(ctx
context.Context, md metadata.
headers := md.Get("authorization")
if len(headers) > 0 {
b.mutex.Lock()
- defer b.mutex.Unlock()
Review Comment:
Why the switch to an explicit unlock instead of defer?
##########
go/adbc/driver/flightsql/record_reader.go:
##########
@@ -48,7 +49,19 @@ type reader struct {
// kicks off a goroutine for each endpoint and returns a reader which
// gathers all of the records as they come in.
-func newRecordReader(ctx context.Context, alloc memory.Allocator, cl
*flightsql.Client, info *flight.FlightInfo, clCache gcache.Cache, bufferSize
int, opts ...grpc.CallOption) (rdr array.RecordReader, err error) {
+//
+// logger may be nil; in that case a no-op logger is used internally.
+// When supplied it receives structured records describing every endpoint
+// stream's lifecycle (start, first batch received, completion, failure).
+// These records are essential when diagnosing transient stream failures
+// such as "[FlightSQL] error reading from server: EOF (Unavailable; DoGet:
+// endpoint N: [])" because they record exactly which endpoint failed, how
+// many batches/rows had already been received, and how long the stream had
+// been open at the time of failure. Without these records the operator
+// otherwise has only the bare gRPC EOF to work with, which carries no
+// progress or location information.
+func newRecordReader(ctx context.Context, alloc memory.Allocator, cl
*flightsql.Client, info *flight.FlightInfo, clCache gcache.Cache, bufferSize
int, logger *slog.Logger, opts ...grpc.CallOption) (rdr array.RecordReader, err
error) {
Review Comment:
This argument list is getting rather long. Could we condense some of these
arguments into a config struct?
##########
go/adbc/driver/flightsql/record_reader.go:
##########
@@ -174,6 +269,31 @@ func newRecordReader(ctx context.Context, alloc
memory.Allocator, cl *flightsql.
return reader, nil
}
+// estimateBatchBytes returns a rough estimate of the in-memory size of a
+// record batch. The estimate is intentionally cheap to compute and is only
+// used for diagnostic logging so it does not have to be exact; it simply sums
+// the lengths of every column buffer in the batch. The total is useful for
+// answering questions such as "did the stream fail after receiving 10 KB or
+// 10 MB?" when triaging a mid-stream EOF.
+func estimateBatchBytes(rec arrow.RecordBatch) int64 {
Review Comment:
Just use `util.TotalRecordSize` instead:
https://pkg.go.dev/github.com/apache/arrow-go/v18/arrow/util#TotalRecordSize
##########
go/adbc/driver/flightsql/flightsql_database.go:
##########
@@ -574,13 +700,46 @@ func (b *bearerAuthMiddleware) HeadersReceived(ctx
context.Context, md metadata.
headers := md.Get("authorization")
if len(headers) > 0 {
b.mutex.Lock()
- defer b.mutex.Unlock()
+ previous := b.hdrs.Get("authorization")
b.hdrs.Set("authorization", headers...)
+ logger := b.logger
+ b.mutex.Unlock()
+ if logger != nil {
+ // Compare lengths rather than values so that we never
+ // touch the token contents in the log path. Equal
+ // lengths can still indicate a fresh token (a server
+ // might issue tokens of the same shape), but for the
+ // no-op case (server echoed the same header) the
+ // reflected length is what an operator wants to see
+ // anyway.
+ var prevLen int
+ if len(previous) > 0 {
+ prevLen = len(previous[0])
+ }
+ logger.InfoContext(ctx, "FlightSQL bearer token rotated
by server",
+ "previous_token_length", prevLen,
+ "new_token_length", len(headers[0]),
+ "source", "HeadersReceived",
+ )
+ }
}
}
func (b *bearerAuthMiddleware) SetHeader(authValue string) {
b.mutex.Lock()
- defer b.mutex.Unlock()
Review Comment:
Same question here: why the shift to an explicit unlock instead of defer?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]