This is an automated email from the ASF dual-hosted git repository. lahirujayathilake pushed a commit to branch tracing-impl in repository https://gitbox.apache.org/repos/asf/airavata-custos.git
commit 4c7d12c8cb787f37dc4c8c571b546ab1ad2659c6 Author: lahiruj <[email protected]> AuthorDate: Fri Jun 5 00:05:14 2026 -0400 add OpenTelemetry tracing package for context propagation --- go.mod | 10 +- go.sum | 30 ++--- internal/httputil/status_recorder.go | 33 ++++++ internal/tracing/audit.go | 34 ++++++ internal/tracing/middleware.go | 70 ++++++++++++ internal/tracing/middleware_test.go | 213 +++++++++++++++++++++++++++++++++++ internal/tracing/slog_handler.go | 54 +++++++++ internal/tracing/status.go | 107 ++++++++++++++++++ internal/tracing/status_test.go | 87 ++++++++++++++ internal/tracing/tracing.go | 145 ++++++++++++++++++++++++ internal/tracing/tracing_test.go | 101 +++++++++++++++++ 11 files changed, 869 insertions(+), 15 deletions(-) diff --git a/go.mod b/go.mod index 5499abb61..dc178be85 100644 --- a/go.mod +++ b/go.mod @@ -10,6 +10,9 @@ require ( github.com/prometheus/client_golang v1.23.2 github.com/prometheus/client_model v0.6.2 github.com/stretchr/testify v1.11.1 + go.opentelemetry.io/otel v1.41.0 + go.opentelemetry.io/otel/sdk v1.41.0 + go.opentelemetry.io/otel/trace v1.41.0 google.golang.org/protobuf v1.36.8 gopkg.in/yaml.v3 v3.0.1 ) @@ -19,12 +22,15 @@ require ( github.com/beorn7/perks v1.0.1 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect - github.com/kr/text v0.2.0 // indirect + github.com/go-logr/logr v1.4.3 // indirect + github.com/go-logr/stdr v1.2.2 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect github.com/prometheus/common v0.66.1 // indirect github.com/prometheus/procfs v0.16.1 // indirect github.com/stretchr/objx v0.5.2 // indirect + go.opentelemetry.io/auto/sdk v1.2.1 // indirect + go.opentelemetry.io/otel/metric v1.41.0 // indirect go.yaml.in/yaml/v2 v2.4.2 // indirect - golang.org/x/sys v0.38.0 // indirect + golang.org/x/sys v0.41.0 // indirect ) diff --git a/go.sum b/go.sum index 1cc526947..34b9608f5 100644 --- a/go.sum +++ b/go.sum @@ -13,7 +13,6 @@ github.com/containerd/errdefs v1.0.0 h1:tg5yIfIlQIrxYtu9ajqY42W3lpS19XqdxRQeEwYG github.com/containerd/errdefs v1.0.0/go.mod h1:+YBYIdtsnF4Iw6nWZhJcqGSg/dwvV7tyJ/kCkyJ2k+M= github.com/containerd/errdefs/pkg v0.3.0 h1:9IKJ06FvyNlexW690DXuQNx2KA2cUJXx151Xdx3ZPPE= github.com/containerd/errdefs/pkg v0.3.0/go.mod h1:NJw6s9HwNuRhnjJhM7pylWwMyAkmCQvQ4GpJHEqRLVk= -github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/dhui/dktest v0.4.6 h1:+DPKyScKSEp3VLtbMDHcUq6V5Lm5zfZZVb0Sk7Ahom4= @@ -28,6 +27,7 @@ github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4 github.com/docker/go-units v0.5.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk= github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg= github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= +github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= github.com/go-logr/logr v1.4.3 h1:CjnDlHq8ikf6E492q6eKboGOC0T8CDaOvkHCIg8idEI= github.com/go-logr/logr v1.4.3/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= @@ -76,28 +76,32 @@ github.com/prometheus/common v0.66.1 h1:h5E0h5/Y8niHc5DlaLlWLArTQI7tMrsfQjHV+d9Z github.com/prometheus/common v0.66.1/go.mod h1:gcaUsgf3KfRSwHY4dIMXLPV0K/Wg1oZ8+SbZk/HH/dA= github.com/prometheus/procfs v0.16.1 h1:hZ15bTNuirocR6u0JZ6BAHHmwS1p8B4P6MRqxtzMyRg= github.com/prometheus/procfs v0.16.1/go.mod h1:teAbpZRB1iIAJYREa1LsoWUXykVXA1KlTmWl8x/U+Is= -github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ= -github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog= +github.com/rogpeppe/go-internal v1.14.1 h1:UQB4HGPB6osV0SQTLymcB4TgvyWu6ZyliaW0tI/otEQ= +github.com/rogpeppe/go-internal v1.14.1/go.mod h1:MaRKkUm5W0goXpeCfT7UZI6fk/L7L7so1lCWt35ZSgc= github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY= github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA= github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= -go.opentelemetry.io/auto/sdk v1.1.0 h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJySYA= -go.opentelemetry.io/auto/sdk v1.1.0/go.mod h1:3wSPjt5PWp2RhlCcmmOial7AvC4DQqZb7a7wCow3W8A= +go.opentelemetry.io/auto/sdk v1.2.1 h1:jXsnJ4Lmnqd11kwkBV2LgLoFMZKizbCi5fNZ/ipaZ64= +go.opentelemetry.io/auto/sdk v1.2.1/go.mod h1:KRTj+aOaElaLi+wW1kO/DZRXwkF4C5xPbEe3ZiIhN7Y= go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.61.0 h1:F7Jx+6hwnZ41NSFTO5q4LYDtJRXBf2PD0rNBkeB/lus= go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.61.0/go.mod h1:UHB22Z8QsdRDrnAtX4PntOl36ajSxcdUMt1sF7Y6E7Q= -go.opentelemetry.io/otel v1.37.0 h1:9zhNfelUvx0KBfu/gb+ZgeAfAgtWrfHJZcAqFC228wQ= -go.opentelemetry.io/otel v1.37.0/go.mod h1:ehE/umFRLnuLa/vSccNq9oS1ErUlkkK71gMcN34UG8I= -go.opentelemetry.io/otel/metric v1.37.0 h1:mvwbQS5m0tbmqML4NqK+e3aDiO02vsf/WgbsdpcPoZE= -go.opentelemetry.io/otel/metric v1.37.0/go.mod h1:04wGrZurHYKOc+RKeye86GwKiTb9FKm1WHtO+4EVr2E= -go.opentelemetry.io/otel/trace v1.37.0 h1:HLdcFNbRQBE2imdSEgm/kwqmQj1Or1l/7bW6mxVK7z4= -go.opentelemetry.io/otel/trace v1.37.0/go.mod h1:TlgrlQ+PtQO5XFerSPUYG0JSgGyryXewPGyayAWSBS0= +go.opentelemetry.io/otel v1.41.0 h1:YlEwVsGAlCvczDILpUXpIpPSL/VPugt7zHThEMLce1c= +go.opentelemetry.io/otel v1.41.0/go.mod h1:Yt4UwgEKeT05QbLwbyHXEwhnjxNO6D8L5PQP51/46dE= +go.opentelemetry.io/otel/metric v1.41.0 h1:rFnDcs4gRzBcsO9tS8LCpgR0dxg4aaxWlJxCno7JlTQ= +go.opentelemetry.io/otel/metric v1.41.0/go.mod h1:xPvCwd9pU0VN8tPZYzDZV/BMj9CM9vs00GuBjeKhJps= +go.opentelemetry.io/otel/sdk v1.41.0 h1:YPIEXKmiAwkGl3Gu1huk1aYWwtpRLeskpV+wPisxBp8= +go.opentelemetry.io/otel/sdk v1.41.0/go.mod h1:ahFdU0G5y8IxglBf0QBJXgSe7agzjE4GiTJ6HT9ud90= +go.opentelemetry.io/otel/sdk/metric v1.41.0 h1:siZQIYBAUd1rlIWQT2uCxWJxcCO7q3TriaMlf08rXw8= +go.opentelemetry.io/otel/sdk/metric v1.41.0/go.mod h1:HNBuSvT7ROaGtGI50ArdRLUnvRTRGniSUZbxiWxSO8Y= +go.opentelemetry.io/otel/trace v1.41.0 h1:Vbk2co6bhj8L59ZJ6/xFTskY+tGAbOnCtQGVVa9TIN0= +go.opentelemetry.io/otel/trace v1.41.0/go.mod h1:U1NU4ULCoxeDKc09yCWdWe+3QoyweJcISEVa1RBzOis= go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= go.yaml.in/yaml/v2 v2.4.2 h1:DzmwEr2rDGHl7lsFgAHxmNz/1NlQ7xLIrlN2h5d1eGI= go.yaml.in/yaml/v2 v2.4.2/go.mod h1:081UH+NErpNdqlCXm3TtEran0rJZGxAYx9hb/ELlsPU= -golang.org/x/sys v0.38.0 h1:3yZWxaJjBmCWXqhN1qh02AkOnCQ1poK6oF+a7xWL6Gc= -golang.org/x/sys v0.38.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= +golang.org/x/sys v0.41.0 h1:Ivj+2Cp/ylzLiEU89QhWblYnOE9zerudt9Ftecq2C6k= +golang.org/x/sys v0.41.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= google.golang.org/protobuf v1.36.8 h1:xHScyCOEuuwZEc6UtSOvPbAT4zRh0xcNRYekJwfqyMc= google.golang.org/protobuf v1.36.8/go.mod h1:fuxRtAxBytpl4zzqUh6/eyUujkJdNiuEkXntxiD/uRU= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/internal/httputil/status_recorder.go b/internal/httputil/status_recorder.go new file mode 100644 index 000000000..b4c482ebe --- /dev/null +++ b/internal/httputil/status_recorder.go @@ -0,0 +1,33 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// Package httputil holds small HTTP helpers shared across the binary +// (tracing middleware, logging middleware, admin handlers). +package httputil + +import "net/http" + +// StatusRecorder captures the response status without buffering the body. +type StatusRecorder struct { + http.ResponseWriter + Status int +} + +func (sr *StatusRecorder) WriteHeader(code int) { + sr.Status = code + sr.ResponseWriter.WriteHeader(code) +} diff --git a/internal/tracing/audit.go b/internal/tracing/audit.go new file mode 100644 index 000000000..739f517b2 --- /dev/null +++ b/internal/tracing/audit.go @@ -0,0 +1,34 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package tracing + +import "context" + +func PopulateAuditIDs(ctx context.Context, traceID, spanID, parentSpanID *[]byte) { + if traceID == nil || spanID == nil || parentSpanID == nil { + return + } + if *traceID == nil && *spanID == nil { + *traceID, *spanID = IDsBytesFromContext(ctx) + } + if *parentSpanID == nil { + if p := ParentSpanIDFromContext(ctx); p != nil { + *parentSpanID = p + } + } +} diff --git a/internal/tracing/middleware.go b/internal/tracing/middleware.go new file mode 100644 index 000000000..0cd183394 --- /dev/null +++ b/internal/tracing/middleware.go @@ -0,0 +1,70 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package tracing + +import ( + "fmt" + "net/http" + + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + + "github.com/apache/airavata-custos/internal/httputil" +) + +// Middleware opens a root span per request and writes X-Trace-Id on the response. +func Middleware(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + route := r.Pattern + if route == "" { + route = r.URL.Path + } + name := "http." + r.Method + " " + route + + ctx, span := Start(r.Context(), name) + defer span.End() + + span.SetAttributes( + attribute.String("http.method", r.Method), + attribute.String("http.route", route), + attribute.String("source", "http"), + ) + + if tid, _ := IDsFromContext(ctx); tid != "" { + w.Header().Set("X-Trace-Id", tid) + } + + sw := &httputil.StatusRecorder{ResponseWriter: w, Status: http.StatusOK} + + // Re-panic so net/http's recover still logs and serves 500. + defer func() { + if rec := recover(); rec != nil { + span.RecordError(fmt.Errorf("http handler panic: %v", rec)) + span.SetStatus(codes.Error, "panic") + panic(rec) + } + }() + + next.ServeHTTP(sw, r.WithContext(ctx)) + + span.SetAttributes(attribute.Int("http.status_code", sw.Status)) + if sw.Status >= 500 { + span.SetStatus(codes.Error, http.StatusText(sw.Status)) + } + }) +} diff --git a/internal/tracing/middleware_test.go b/internal/tracing/middleware_test.go new file mode 100644 index 000000000..3529f45e1 --- /dev/null +++ b/internal/tracing/middleware_test.go @@ -0,0 +1,213 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package tracing + +import ( + "net/http" + "net/http/httptest" + "testing" + + "go.opentelemetry.io/otel" + otelcodes "go.opentelemetry.io/otel/codes" + sdktrace "go.opentelemetry.io/otel/sdk/trace" + "go.opentelemetry.io/otel/sdk/trace/tracetest" + "go.opentelemetry.io/otel/trace/noop" +) + +func setupRecordingTracer(t *testing.T) *tracetest.SpanRecorder { + t.Helper() + prev := otel.GetTracerProvider() + sr := tracetest.NewSpanRecorder() + tp := sdktrace.NewTracerProvider(sdktrace.WithSpanProcessor(sr)) + otel.SetTracerProvider(tp) + t.Cleanup(func() { otel.SetTracerProvider(prev) }) + return sr +} + +func setupNoopTracer(t *testing.T) { + t.Helper() + prev := otel.GetTracerProvider() + otel.SetTracerProvider(noop.NewTracerProvider()) + t.Cleanup(func() { otel.SetTracerProvider(prev) }) +} + +func TestMiddlewareProductionEmitsRootSpanAndHeader(t *testing.T) { + sr := setupRecordingTracer(t) + + mux := http.NewServeMux() + var ( + innerTrace string + innerSpan string + ) + mux.HandleFunc("GET /probe", func(w http.ResponseWriter, r *http.Request) { + innerTrace, innerSpan = IDsFromContext(r.Context()) + w.WriteHeader(http.StatusNoContent) + }) + + handler := Middleware(mux) + req := httptest.NewRequest(http.MethodGet, "/probe", nil) + rec := httptest.NewRecorder() + handler.ServeHTTP(rec, req) + + if rec.Code != http.StatusNoContent { + t.Fatalf("expected 204, got %d", rec.Code) + } + + hdr := rec.Header().Get("X-Trace-Id") + if hdr == "" { + t.Fatalf("expected X-Trace-Id header to be set") + } + if len(hdr) != 32 { + t.Fatalf("expected 32-char hex trace id, got %q (len=%d)", hdr, len(hdr)) + } + + if innerTrace == "" || innerSpan == "" { + t.Fatalf("expected inner handler to see recording span; got trace=%q span=%q", innerTrace, innerSpan) + } + if innerTrace != hdr { + t.Fatalf("inner trace_id %q does not match response header %q", innerTrace, hdr) + } + + spans := sr.Ended() + if len(spans) != 1 { + t.Fatalf("expected exactly 1 ended span, got %d", len(spans)) + } + s := spans[0] + if got, want := s.Name(), "http.GET /probe"; got != want { + t.Fatalf("span name = %q, want %q", got, want) + } + + attrs := map[string]string{} + for _, kv := range s.Attributes() { + attrs[string(kv.Key)] = kv.Value.Emit() + } + if attrs["http.method"] != "GET" { + t.Fatalf("http.method attr = %q, want GET", attrs["http.method"]) + } + if attrs["http.route"] != "/probe" { + t.Fatalf("http.route attr = %q, want /probe", attrs["http.route"]) + } + if attrs["http.status_code"] != "204" { + t.Fatalf("http.status_code attr = %q, want 204", attrs["http.status_code"]) + } + if attrs["source"] != "http" { + t.Fatalf("source attr = %q, want http", attrs["source"]) + } +} + +func TestMiddlewareNoopModeSetsNoHeader(t *testing.T) { + setupNoopTracer(t) + + mux := http.NewServeMux() + var ( + innerTrace string + innerSpan string + ) + mux.HandleFunc("GET /probe", func(w http.ResponseWriter, r *http.Request) { + innerTrace, innerSpan = IDsFromContext(r.Context()) + w.WriteHeader(http.StatusOK) + }) + + handler := Middleware(mux) + req := httptest.NewRequest(http.MethodGet, "/probe", nil) + rec := httptest.NewRecorder() + handler.ServeHTTP(rec, req) + + if hdr := rec.Header().Get("X-Trace-Id"); hdr != "" { + t.Fatalf("expected no X-Trace-Id header in noop mode, got %q", hdr) + } + if innerTrace != "" || innerSpan != "" { + t.Fatalf("expected empty IDs from noop ctx, got trace=%q span=%q", innerTrace, innerSpan) + } +} + +func TestMiddleware5xxMarksSpanError(t *testing.T) { + sr := setupRecordingTracer(t) + + mux := http.NewServeMux() + mux.HandleFunc("GET /boom", func(w http.ResponseWriter, _ *http.Request) { + w.WriteHeader(http.StatusInternalServerError) + }) + + handler := Middleware(mux) + req := httptest.NewRequest(http.MethodGet, "/boom", nil) + rec := httptest.NewRecorder() + handler.ServeHTTP(rec, req) + + spans := sr.Ended() + if len(spans) != 1 { + t.Fatalf("expected 1 span, got %d", len(spans)) + } + if got := spans[0].Status().Code; got != otelcodes.Error { + t.Fatalf("expected span status Error, got %v", got) + } +} + +func TestMiddlewareSetsErrorOnHandlerPanic(t *testing.T) { + sr := setupRecordingTracer(t) + + handler := Middleware(http.HandlerFunc(func(_ http.ResponseWriter, _ *http.Request) { + panic("boom") + })) + + req := httptest.NewRequest(http.MethodGet, "/explode", nil) + rec := httptest.NewRecorder() + + defer func() { + rec.Result().Body.Close() + if r := recover(); r == nil { + t.Fatalf("expected re-panic to surface, got none") + } + + spans := sr.Ended() + if len(spans) != 1 { + t.Fatalf("expected 1 ended span, got %d", len(spans)) + } + s := spans[0] + if s.Status().Code != otelcodes.Error { + t.Fatalf("expected span status Error, got %v", s.Status().Code) + } + if s.Status().Description != "panic" { + t.Fatalf("expected status description 'panic', got %q", s.Status().Description) + } + if len(s.Events()) == 0 { + t.Fatalf("expected at least one recorded event (the error), got 0") + } + }() + + handler.ServeHTTP(rec, req) +} + +func TestMiddlewareFallsBackToPathWhenPatternEmpty(t *testing.T) { + sr := setupRecordingTracer(t) + + handler := Middleware(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + w.WriteHeader(http.StatusOK) + })) + req := httptest.NewRequest(http.MethodGet, "/no-mux-route", nil) + rec := httptest.NewRecorder() + handler.ServeHTTP(rec, req) + + spans := sr.Ended() + if len(spans) != 1 { + t.Fatalf("expected 1 span, got %d", len(spans)) + } + if got, want := spans[0].Name(), "http.GET /no-mux-route"; got != want { + t.Fatalf("span name = %q, want %q", got, want) + } +} diff --git a/internal/tracing/slog_handler.go b/internal/tracing/slog_handler.go new file mode 100644 index 000000000..12a3cf46f --- /dev/null +++ b/internal/tracing/slog_handler.go @@ -0,0 +1,54 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package tracing + +import ( + "context" + "log/slog" +) + +type slogHandler struct { + inner slog.Handler +} + +func SlogHandler(inner slog.Handler) slog.Handler { + return &slogHandler{inner: inner} +} + +func (h *slogHandler) Enabled(ctx context.Context, level slog.Level) bool { + return h.inner.Enabled(ctx, level) +} + +func (h *slogHandler) Handle(ctx context.Context, record slog.Record) error { + traceID, spanID := IDsFromContext(ctx) + if traceID != "" { + record.AddAttrs(slog.String("trace_id", traceID)) + } + if spanID != "" { + record.AddAttrs(slog.String("span_id", spanID)) + } + return h.inner.Handle(ctx, record) +} + +func (h *slogHandler) WithAttrs(attrs []slog.Attr) slog.Handler { + return &slogHandler{inner: h.inner.WithAttrs(attrs)} +} + +func (h *slogHandler) WithGroup(name string) slog.Handler { + return &slogHandler{inner: h.inner.WithGroup(name)} +} diff --git a/internal/tracing/status.go b/internal/tracing/status.go new file mode 100644 index 000000000..79f8a3264 --- /dev/null +++ b/internal/tracing/status.go @@ -0,0 +1,107 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package tracing + +import ( + "strings" + "sync" +) + +const ( + StatusOk = "ok" + StatusError = "error" + StatusInProgress = "in_progress" +) + +var errorMarkers = []string{"failed", "error", "rejected"} + +func EventStatus(eventType string) string { + lower := strings.ToLower(eventType) + for _, marker := range errorMarkers { + if strings.Contains(lower, marker) { + return StatusError + } + } + return StatusOk +} + +var ( + terminalMarkersMu sync.RWMutex + terminalMarkers = map[string][]string{} +) + +// RegisterTerminalMarkers declares the event names that close out a trace for +// the given source. Connectors call this at boot so the core stays unaware of +// connector-specific event names. +func RegisterTerminalMarkers(source string, markers ...string) { + terminalMarkersMu.Lock() + defer terminalMarkersMu.Unlock() + terminalMarkers[source] = append(terminalMarkers[source], markers...) +} + +type TraceEventStatus struct { + Source string + EventType string +} + +func setTerminalMarkersForTest(t interface{ Cleanup(func()) }, source string, markers []string) { + terminalMarkersMu.Lock() + prev, hadPrev := terminalMarkers[source] + terminalMarkers[source] = markers + terminalMarkersMu.Unlock() + t.Cleanup(func() { + terminalMarkersMu.Lock() + defer terminalMarkersMu.Unlock() + if hadPrev { + terminalMarkers[source] = prev + } else { + delete(terminalMarkers, source) + } + }) +} + +// TraceStatus is "error" if any event errored, "ok" if a registered terminal +// marker is present, else "in_progress". +func TraceStatus(events []TraceEventStatus) string { + terminalMarkersMu.RLock() + snapshot := make(map[string][]string, len(terminalMarkers)) + for k, v := range terminalMarkers { + snapshot[k] = v + } + terminalMarkersMu.RUnlock() + + hasError := false + hasTerminal := false + for _, e := range events { + if EventStatus(e.EventType) == StatusError { + hasError = true + } + for _, m := range snapshot[e.Source] { + if e.EventType == m { + hasTerminal = true + } + } + } + if hasError { + return StatusError + } + if hasTerminal { + return StatusOk + } + return StatusInProgress +} diff --git a/internal/tracing/status_test.go b/internal/tracing/status_test.go new file mode 100644 index 000000000..3eb21ccc6 --- /dev/null +++ b/internal/tracing/status_test.go @@ -0,0 +1,87 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package tracing + +import "testing" + +func TestEventStatus(t *testing.T) { + cases := map[string]string{ + "CREATE_PERSON": StatusOk, + "ComanageClusterAccountAttached": StatusOk, + "TRANSACTION_COMPLETE": StatusOk, + "ComanageProvisioningFailed": StatusError, + "REQUEST_REJECTED": StatusError, + "some.error.happened": StatusError, + "": StatusOk, + } + for in, want := range cases { + if got := EventStatus(in); got != want { + t.Errorf("EventStatus(%q) = %q, want %q", in, got, want) + } + } +} + +func TestTraceStatusOk(t *testing.T) { + setTerminalMarkersForTest(t, "amie", []string{"TRANSACTION_COMPLETE"}) + events := []TraceEventStatus{ + {Source: "amie", EventType: "CREATE_PERSON"}, + {Source: "amie", EventType: "CREATE_ACCOUNT"}, + {Source: "amie", EventType: "TRANSACTION_COMPLETE"}, + } + if got := TraceStatus(events); got != StatusOk { + t.Errorf("TraceStatus(ok flow) = %q, want %q", got, StatusOk) + } +} + +func TestTraceStatusError(t *testing.T) { + setTerminalMarkersForTest(t, "amie", []string{"TRANSACTION_COMPLETE"}) + events := []TraceEventStatus{ + {Source: "amie", EventType: "CREATE_PERSON"}, + {Source: "comanage", EventType: "ComanageProvisioningFailed"}, + {Source: "amie", EventType: "TRANSACTION_COMPLETE"}, + } + if got := TraceStatus(events); got != StatusError { + t.Errorf("TraceStatus(error wins over terminal) = %q, want %q", got, StatusError) + } +} + +func TestTraceStatusInProgress(t *testing.T) { + events := []TraceEventStatus{ + {Source: "amie", EventType: "CREATE_PERSON"}, + {Source: "amie", EventType: "CREATE_ACCOUNT"}, + } + if got := TraceStatus(events); got != StatusInProgress { + t.Errorf("TraceStatus(no terminal) = %q, want %q", got, StatusInProgress) + } +} + +func TestTraceStatusEmpty(t *testing.T) { + if got := TraceStatus(nil); got != StatusInProgress { + t.Errorf("TraceStatus(nil) = %q, want %q", got, StatusInProgress) + } +} + +func TestTraceStatusComanageTerminal(t *testing.T) { + setTerminalMarkersForTest(t, "comanage", []string{"ComanageClusterAccountAttached"}) + events := []TraceEventStatus{ + {Source: "comanage", EventType: "ComanageClusterAccountAttached"}, + } + if got := TraceStatus(events); got != StatusOk { + t.Errorf("TraceStatus(comanage terminal) = %q, want %q", got, StatusOk) + } +} diff --git a/internal/tracing/tracing.go b/internal/tracing/tracing.go new file mode 100644 index 000000000..e7187a921 --- /dev/null +++ b/internal/tracing/tracing.go @@ -0,0 +1,145 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package tracing + +import ( + "context" + "fmt" + "log/slog" + "os" + "strings" + + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/propagation" + "go.opentelemetry.io/otel/sdk/resource" + sdktrace "go.opentelemetry.io/otel/sdk/trace" + semconv "go.opentelemetry.io/otel/semconv/v1.27.0" + "go.opentelemetry.io/otel/trace" + "go.opentelemetry.io/otel/trace/noop" +) + +type Mode int + +const ( + ModeProduction Mode = iota + ModeNoop +) + +const tracerName = "custos" + +type InitConfig struct { + Mode Mode + Logger *slog.Logger + ServiceName string +} + +func Init(cfg InitConfig) (func(context.Context) error, error) { + if cfg.Mode == ModeNoop { + otel.SetTracerProvider(noop.NewTracerProvider()) + otel.SetTextMapPropagator(propagation.TraceContext{}) + return func(context.Context) error { return nil }, nil + } + + serviceName := cfg.ServiceName + if serviceName == "" { + serviceName = "custos" + } + + hostname, _ := os.Hostname() + instanceID := fmt.Sprintf("%s-%d", hostname, os.Getpid()) + + res, err := resource.New(context.Background(), + resource.WithAttributes( + semconv.ServiceName(serviceName), + semconv.ServiceInstanceID(instanceID), + ), + ) + if err != nil { + return nil, fmt.Errorf("tracing: build resource: %w", err) + } + + // No SpanProcessor: spans live in ctx for ID propagation only. + tp := sdktrace.NewTracerProvider(sdktrace.WithResource(res)) + + otel.SetTracerProvider(tp) + otel.SetTextMapPropagator(propagation.TraceContext{}) + + return func(ctx context.Context) error { + return tp.Shutdown(ctx) + }, nil +} + +type parentSpanIDKeyType struct{} +type lastBusinessSpanIDKeyType struct{} + +var ( + parentSpanIDKey parentSpanIDKeyType + lastBusinessSpanIDKey lastBusinessSpanIDKeyType +) + +// Start opens a span and stamps the audit parent in ctx. bus.* spans are +// skipped so audit parents jump over the bus to the nearest business span. +func Start(ctx context.Context, name string, opts ...trace.SpanStartOption) (context.Context, trace.Span) { + newCtx, span := otel.Tracer(tracerName).Start(ctx, name, opts...) + + if strings.HasPrefix(name, "bus.") { + return newCtx, span + } + + if last, ok := ctx.Value(lastBusinessSpanIDKey).(trace.SpanID); ok && last.IsValid() { + newCtx = context.WithValue(newCtx, parentSpanIDKey, last) + } else if parent := trace.SpanFromContext(ctx).SpanContext().SpanID(); parent.IsValid() { + newCtx = context.WithValue(newCtx, parentSpanIDKey, parent) + } + + newCtx = context.WithValue(newCtx, lastBusinessSpanIDKey, span.SpanContext().SpanID()) + return newCtx, span +} + +func ParentSpanIDFromContext(ctx context.Context) []byte { + if p, ok := ctx.Value(parentSpanIDKey).(trace.SpanID); ok && p.IsValid() { + out := make([]byte, 8) + copy(out, p[:]) + return out + } + return nil +} + +func FromContext(ctx context.Context) trace.Span { + return trace.SpanFromContext(ctx) +} + +func IDsFromContext(ctx context.Context) (traceID, spanID string) { + sc := trace.SpanContextFromContext(ctx) + if !sc.IsValid() { + return "", "" + } + return sc.TraceID().String(), sc.SpanID().String() +} + +// IDsBytesFromContext returns the active trace_id (16 bytes) and span_id (8 bytes), +// or nil/nil if no recording span is on ctx. +func IDsBytesFromContext(ctx context.Context) (traceID, spanID []byte) { + sc := trace.SpanContextFromContext(ctx) + if !sc.IsValid() { + return nil, nil + } + tid := sc.TraceID() + sid := sc.SpanID() + return append([]byte(nil), tid[:]...), append([]byte(nil), sid[:]...) +} diff --git a/internal/tracing/tracing_test.go b/internal/tracing/tracing_test.go new file mode 100644 index 000000000..34aa7c316 --- /dev/null +++ b/internal/tracing/tracing_test.go @@ -0,0 +1,101 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package tracing + +import ( + "bytes" + "context" + "testing" + "time" +) + +func TestInitNoopProducesNonRecordingSpans(t *testing.T) { + shutdown, err := Init(InitConfig{Mode: ModeNoop}) + if err != nil { + t.Fatalf("Init(noop) returned error: %v", err) + } + + ctx, span := Start(context.Background(), "test.noop") + if span.IsRecording() { + t.Fatalf("expected noop span to be non-recording") + } + + traceID, spanID := IDsFromContext(ctx) + if traceID != "" || spanID != "" { + t.Fatalf("expected empty IDs from noop ctx, got trace=%q span=%q", traceID, spanID) + } + + span.End() + + sctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + if err := shutdown(sctx); err != nil { + t.Fatalf("shutdown returned error: %v", err) + } +} + +func TestInitProductionMintsValidIDs(t *testing.T) { + shutdown, err := Init(InitConfig{Mode: ModeProduction, ServiceName: "custos-test"}) + if err != nil { + t.Fatalf("Init(production) returned error: %v", err) + } + defer func() { + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + _ = shutdown(ctx) + }() + + ctx, span := Start(context.Background(), "test.production") + defer span.End() + + traceID, spanID := IDsFromContext(ctx) + if traceID == "" || spanID == "" { + t.Fatalf("expected non-empty IDs from production ctx, got trace=%q span=%q", traceID, spanID) + } +} + +func TestStartCapturesParentSpanID(t *testing.T) { + shutdown, err := Init(InitConfig{Mode: ModeProduction, ServiceName: "custos-test"}) + if err != nil { + t.Fatalf("Init returned error: %v", err) + } + defer func() { + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + _ = shutdown(ctx) + }() + + ctx1, span1 := Start(context.Background(), "outer") + defer span1.End() + span1ID := span1.SpanContext().SpanID() + + ctx2, span2 := Start(ctx1, "inner") + defer span2.End() + + got := ParentSpanIDFromContext(ctx2) + if len(got) != 8 { + t.Fatalf("expected 8 bytes, got %d", len(got)) + } + if !bytes.Equal(got, span1ID[:]) { + t.Errorf("parent mismatch: want %x got %x", span1ID, got) + } + + if root := ParentSpanIDFromContext(ctx1); root != nil { + t.Errorf("expected nil parent on root span ctx, got %x", root) + } +}
