This is an automated email from the ASF dual-hosted git repository. lahirujayathilake pushed a commit to branch tracing-impl in repository https://gitbox.apache.org/repos/asf/airavata-custos.git
commit 4a392af4027881ed14d8c34e1ea1811a6a2e71df Author: lahiruj <[email protected]> AuthorDate: Fri Jun 5 00:36:25 2026 -0400 Add trace_id, span_id and parent_span_id columns --- .../000002_amie_audit_trace_columns.down.sql | 22 ++++++ .../000002_amie_audit_trace_columns.up.sql | 22 ++++++ connectors/ACCESS/AMIE-Processor/model/audit.go | 20 +++-- .../ACCESS/AMIE-Processor/service/audit_service.go | 11 +++ .../AMIE-Processor/service/audit_service_test.go | 57 ++++++++++++++ .../ACCESS/AMIE-Processor/store/audit_store.go | 6 +- .../migrations/000006_audit_trace_columns.down.sql | 22 ++++++ .../migrations/000006_audit_trace_columns.up.sql | 22 ++++++ internal/store/audit_event_store.go | 8 +- pkg/models/audit.go | 30 ++++++-- pkg/service/audit_event.go | 10 +++ pkg/service/audit_event_test.go | 89 ++++++++++++++++++++++ 12 files changed, 299 insertions(+), 20 deletions(-) diff --git a/connectors/ACCESS/AMIE-Processor/db/migrations/000002_amie_audit_trace_columns.down.sql b/connectors/ACCESS/AMIE-Processor/db/migrations/000002_amie_audit_trace_columns.down.sql new file mode 100644 index 000000000..9bf0b87ce --- /dev/null +++ b/connectors/ACCESS/AMIE-Processor/db/migrations/000002_amie_audit_trace_columns.down.sql @@ -0,0 +1,22 @@ +-- Licensed to the Apache Software Foundation (ASF) under one +-- or more contributor license agreements. See the NOTICE file +-- distributed with this work for additional information +-- regarding copyright ownership. The ASF licenses this file +-- to you under the Apache License, Version 2.0 (the +-- "License"); you may not use this file except in compliance +-- with the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, +-- software distributed under the License is distributed on an +-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +-- KIND, either express or implied. See the License for the +-- specific language governing permissions and limitations +-- under the License. + +ALTER TABLE amie_audit_log + DROP KEY idx_amie_audit_trace, + DROP COLUMN parent_span_id, + DROP COLUMN span_id, + DROP COLUMN trace_id; diff --git a/connectors/ACCESS/AMIE-Processor/db/migrations/000002_amie_audit_trace_columns.up.sql b/connectors/ACCESS/AMIE-Processor/db/migrations/000002_amie_audit_trace_columns.up.sql new file mode 100644 index 000000000..da6d5354d --- /dev/null +++ b/connectors/ACCESS/AMIE-Processor/db/migrations/000002_amie_audit_trace_columns.up.sql @@ -0,0 +1,22 @@ +-- Licensed to the Apache Software Foundation (ASF) under one +-- or more contributor license agreements. See the NOTICE file +-- distributed with this work for additional information +-- regarding copyright ownership. The ASF licenses this file +-- to you under the Apache License, Version 2.0 (the +-- "License"); you may not use this file except in compliance +-- with the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, +-- software distributed under the License is distributed on an +-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +-- KIND, either express or implied. See the License for the +-- specific language governing permissions and limitations +-- under the License. + +ALTER TABLE amie_audit_log + ADD COLUMN trace_id BINARY(16) NULL, + ADD COLUMN span_id BINARY(8) NULL, + ADD COLUMN parent_span_id BINARY(8) NULL, + ADD KEY idx_amie_audit_trace (trace_id); diff --git a/connectors/ACCESS/AMIE-Processor/model/audit.go b/connectors/ACCESS/AMIE-Processor/model/audit.go index d6e1182ef..86d88baa0 100644 --- a/connectors/ACCESS/AMIE-Processor/model/audit.go +++ b/connectors/ACCESS/AMIE-Processor/model/audit.go @@ -23,6 +23,7 @@ import "time" type AuditAction string const ( + AuditPacketReceived AuditAction = "PACKET_RECEIVED" AuditCreatePerson AuditAction = "CREATE_PERSON" AuditUpdatePerson AuditAction = "UPDATE_PERSON" AuditDeletePerson AuditAction = "DELETE_PERSON" @@ -42,12 +43,15 @@ const ( // AuditLog records a handler action for traceability and compliance. type AuditLog struct { - ID int64 `db:"id" json:"id"` - PacketID string `db:"packet_id" json:"packet_id"` - EventID *string `db:"event_id" json:"event_id,omitempty"` - Action AuditAction `db:"action" json:"action"` - EntityType string `db:"entity_type" json:"entity_type"` - EntityID *string `db:"entity_id" json:"entity_id,omitempty"` - Summary *string `db:"summary" json:"summary,omitempty"` - CreatedAt time.Time `db:"created_at" json:"created_at"` + ID int64 `db:"id" json:"id"` + PacketID string `db:"packet_id" json:"packet_id"` + EventID *string `db:"event_id" json:"event_id,omitempty"` + Action AuditAction `db:"action" json:"action"` + EntityType string `db:"entity_type" json:"entity_type"` + EntityID *string `db:"entity_id" json:"entity_id,omitempty"` + Summary *string `db:"summary" json:"summary,omitempty"` + CreatedAt time.Time `db:"created_at" json:"created_at"` + TraceID []byte `db:"trace_id" json:"-"` + SpanID []byte `db:"span_id" json:"-"` + ParentSpanID []byte `db:"parent_span_id" json:"-"` } diff --git a/connectors/ACCESS/AMIE-Processor/service/audit_service.go b/connectors/ACCESS/AMIE-Processor/service/audit_service.go index 3a3e98ce6..932493f45 100644 --- a/connectors/ACCESS/AMIE-Processor/service/audit_service.go +++ b/connectors/ACCESS/AMIE-Processor/service/audit_service.go @@ -25,6 +25,7 @@ import ( "time" "github.com/apache/airavata-custos/connectors/ACCESS/AMIE-Processor/model" + "github.com/apache/airavata-custos/internal/tracing" ) type auditStore interface { @@ -50,6 +51,16 @@ func (s *AuditService) Log(ctx context.Context, tx *sql.Tx, packetID, eventID st Summary: ptrOrNil(summary), CreatedAt: time.Now().UTC(), } + tracing.PopulateAuditIDs(ctx, &entry.TraceID, &entry.SpanID, &entry.ParentSpanID) + if entry.TraceID == nil { + slog.WarnContext(ctx, "audit write outside an active span", + "packet_id", packetID, + "event_id", eventID, + "action", string(action), + "entity_type", entityType, + "entity_id", entityID, + ) + } if err := s.audits.Save(ctx, tx, entry); err != nil { return fmt.Errorf("audit_service: saving audit log for packet %s: %w", packetID, err) diff --git a/connectors/ACCESS/AMIE-Processor/service/audit_service_test.go b/connectors/ACCESS/AMIE-Processor/service/audit_service_test.go index c5246bb27..7a9e3f8c3 100644 --- a/connectors/ACCESS/AMIE-Processor/service/audit_service_test.go +++ b/connectors/ACCESS/AMIE-Processor/service/audit_service_test.go @@ -18,14 +18,19 @@ package service import ( + "bytes" "context" "database/sql" "testing" "github.com/apache/airavata-custos/connectors/ACCESS/AMIE-Processor/model" + "github.com/apache/airavata-custos/internal/tracing" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel" + sdktrace "go.opentelemetry.io/otel/sdk/trace" + "go.opentelemetry.io/otel/trace" ) // --------------------------------------------------------------------------- @@ -113,3 +118,55 @@ func TestAuditLog_PropagatesStoreError(t *testing.T) { assert.Contains(t, err.Error(), "audit_service") store.AssertExpectations(t) } + +func TestAuditLog_PersistsTraceAndSpanIDs(t *testing.T) { + prev := otel.GetTracerProvider() + tp := sdktrace.NewTracerProvider() + otel.SetTracerProvider(tp) + t.Cleanup(func() { otel.SetTracerProvider(prev) }) + + store := new(mockAuditStore) + svc := NewAuditService(store) + + ctx, span := tracing.Start(context.Background(), "test.root") + defer span.End() + + wantTrace := span.SpanContext().TraceID() + wantSpan := span.SpanContext().SpanID() + + var captured *model.AuditLog + store.On("Save", mock.Anything, mock.Anything, mock.MatchedBy(func(a *model.AuditLog) bool { + captured = a + return true + })).Return(nil) + + err := svc.Log(ctx, nil, "packet-trace", "event-trace", model.AuditCreatePerson, "person", "p1", "with trace") + require.NoError(t, err) + require.NotNil(t, captured) + if !bytes.Equal(captured.TraceID, wantTrace[:]) { + t.Fatalf("trace_id mismatch: got %x want %x", captured.TraceID, wantTrace[:]) + } + if !bytes.Equal(captured.SpanID, wantSpan[:]) { + t.Fatalf("span_id mismatch: got %x want %x", captured.SpanID, wantSpan[:]) + } + store.AssertExpectations(t) +} + +func TestAuditLog_NilTraceWhenNoSpan(t *testing.T) { + store := new(mockAuditStore) + svc := NewAuditService(store) + + var captured *model.AuditLog + store.On("Save", mock.Anything, mock.Anything, mock.MatchedBy(func(a *model.AuditLog) bool { + captured = a + return true + })).Return(nil) + + err := svc.Log(trace.ContextWithSpanContext(context.Background(), trace.SpanContext{}), + nil, "p", "e", model.AuditReplySent, "reply", "", "") + require.NoError(t, err) + require.NotNil(t, captured) + if captured.TraceID != nil || captured.SpanID != nil { + t.Fatalf("expected nil trace/span IDs when no active span") + } +} diff --git a/connectors/ACCESS/AMIE-Processor/store/audit_store.go b/connectors/ACCESS/AMIE-Processor/store/audit_store.go index 72cbb75f1..fd54a10a0 100644 --- a/connectors/ACCESS/AMIE-Processor/store/audit_store.go +++ b/connectors/ACCESS/AMIE-Processor/store/audit_store.go @@ -39,8 +39,8 @@ func NewAuditStore(db *sqlx.DB) AuditStore { func (s *mariaDBauditStore) Save(ctx context.Context, tx *sql.Tx, a *model.AuditLog) error { _, err := tx.ExecContext(ctx, - `INSERT INTO amie_audit_log (packet_id, event_id, action, entity_type, entity_id, summary, created_at) - VALUES (?, ?, ?, ?, ?, ?, ?)`, - a.PacketID, a.EventID, a.Action, a.EntityType, a.EntityID, a.Summary, a.CreatedAt) + `INSERT INTO amie_audit_log (packet_id, event_id, action, entity_type, entity_id, summary, created_at, trace_id, span_id, parent_span_id) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`, + a.PacketID, a.EventID, a.Action, a.EntityType, a.EntityID, a.Summary, a.CreatedAt, a.TraceID, a.SpanID, a.ParentSpanID) return err } diff --git a/internal/db/migrations/000006_audit_trace_columns.down.sql b/internal/db/migrations/000006_audit_trace_columns.down.sql new file mode 100644 index 000000000..5abb8640e --- /dev/null +++ b/internal/db/migrations/000006_audit_trace_columns.down.sql @@ -0,0 +1,22 @@ +-- Licensed to the Apache Software Foundation (ASF) under one +-- or more contributor license agreements. See the NOTICE file +-- distributed with this work for additional information +-- regarding copyright ownership. The ASF licenses this file +-- to you under the Apache License, Version 2.0 (the +-- "License"); you may not use this file except in compliance +-- with the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, +-- software distributed under the License is distributed on an +-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +-- KIND, either express or implied. See the License for the +-- specific language governing permissions and limitations +-- under the License. + +ALTER TABLE audit_events + DROP KEY idx_audit_events_trace, + DROP COLUMN parent_span_id, + DROP COLUMN span_id, + DROP COLUMN trace_id; diff --git a/internal/db/migrations/000006_audit_trace_columns.up.sql b/internal/db/migrations/000006_audit_trace_columns.up.sql new file mode 100644 index 000000000..8d5b3221a --- /dev/null +++ b/internal/db/migrations/000006_audit_trace_columns.up.sql @@ -0,0 +1,22 @@ +-- Licensed to the Apache Software Foundation (ASF) under one +-- or more contributor license agreements. See the NOTICE file +-- distributed with this work for additional information +-- regarding copyright ownership. The ASF licenses this file +-- to you under the Apache License, Version 2.0 (the +-- "License"); you may not use this file except in compliance +-- with the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, +-- software distributed under the License is distributed on an +-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +-- KIND, either express or implied. See the License for the +-- specific language governing permissions and limitations +-- under the License. + +ALTER TABLE audit_events + ADD COLUMN trace_id BINARY(16) NULL, + ADD COLUMN span_id BINARY(8) NULL, + ADD COLUMN parent_span_id BINARY(8) NULL, + ADD KEY idx_audit_events_trace (trace_id); diff --git a/internal/store/audit_event_store.go b/internal/store/audit_event_store.go index db275c793..057778051 100644 --- a/internal/store/audit_event_store.go +++ b/internal/store/audit_event_store.go @@ -27,7 +27,7 @@ import ( "github.com/apache/airavata-custos/pkg/models" ) -const auditEventColumns = "id, event_type, event_time, entity_id, details" +const auditEventColumns = "id, event_type, event_time, entity_id, details, trace_id, span_id, parent_span_id" type mysqlAuditEventStore struct { db *sqlx.DB @@ -91,9 +91,9 @@ func (s *mysqlAuditEventStore) ListAll(ctx context.Context) ([]*models.AuditEven func (s *mysqlAuditEventStore) Create(ctx context.Context, tx *sql.Tx, e *models.AuditEvent) error { _, err := tx.ExecContext(ctx, - `INSERT INTO audit_events (id, event_type, event_time, entity_id, details) - VALUES (?, ?, ?, ?, ?)`, - e.ID, e.EventType, e.EventTime, e.EntityID, e.Details) + `INSERT INTO audit_events (id, event_type, event_time, entity_id, details, trace_id, span_id, parent_span_id) + VALUES (?, ?, ?, ?, ?, ?, ?, ?)`, + e.ID, e.EventType, e.EventTime, e.EntityID, e.Details, e.TraceID, e.SpanID, e.ParentSpanID) return err } diff --git a/pkg/models/audit.go b/pkg/models/audit.go index 218b5f045..7e0221323 100644 --- a/pkg/models/audit.go +++ b/pkg/models/audit.go @@ -1,11 +1,31 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + package models import "time" type AuditEvent struct { - ID string `json:"id" db:"id"` - EventType string `json:"event_type" db:"event_type"` // e.g., "COMPUTE_ALLOCATION_CREATED", "COMPUTE_ALLOCATION_UPDATED", "COMPUTE_ALLOCATION_DELETED", etc. - EventTime time.Time `json:"event_time" db:"event_time"` - EntityID string `json:"entity_id" db:"entity_id"` // The ID of the entity associated with the event, e.g., the compute allocation ID. - Details string `json:"details" db:"details"` // Additional details about the event, stored as a JSON string or plain text. + ID string `json:"id" db:"id"` + EventType string `json:"event_type" db:"event_type"` // e.g., "COMPUTE_ALLOCATION_CREATED", "COMPUTE_ALLOCATION_UPDATED", "COMPUTE_ALLOCATION_DELETED", etc. + EventTime time.Time `json:"event_time" db:"event_time"` + EntityID string `json:"entity_id" db:"entity_id"` // The ID of the entity associated with the event, e.g., the compute allocation ID. + Details string `json:"details" db:"details"` // Additional details about the event, stored as a JSON string or plain text. + TraceID []byte `json:"-" db:"trace_id"` + SpanID []byte `json:"-" db:"span_id"` + ParentSpanID []byte `json:"-" db:"parent_span_id"` } diff --git a/pkg/service/audit_event.go b/pkg/service/audit_event.go index 259751498..1024afbd2 100644 --- a/pkg/service/audit_event.go +++ b/pkg/service/audit_event.go @@ -21,7 +21,9 @@ import ( "context" "database/sql" "fmt" + "log/slog" + "github.com/apache/airavata-custos/internal/tracing" "github.com/apache/airavata-custos/pkg/models" ) @@ -46,6 +48,14 @@ func (s *Service) CreateAuditEvent(ctx context.Context, e *models.AuditEvent) (* e.EventTime = nowUTC() } + tracing.PopulateAuditIDs(ctx, &e.TraceID, &e.SpanID, &e.ParentSpanID) + if e.TraceID == nil { + slog.WarnContext(ctx, "audit write outside an active span", + "event_type", e.EventType, + "entity_id", e.EntityID, + ) + } + if err := s.inTx(ctx, func(tx *sql.Tx) error { return s.auditEvents.Create(ctx, tx, e) }); err != nil { diff --git a/pkg/service/audit_event_test.go b/pkg/service/audit_event_test.go new file mode 100644 index 000000000..64f106904 --- /dev/null +++ b/pkg/service/audit_event_test.go @@ -0,0 +1,89 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package service + +import ( + "bytes" + "context" + "testing" + + "go.opentelemetry.io/otel" + sdktrace "go.opentelemetry.io/otel/sdk/trace" + + "github.com/apache/airavata-custos/internal/tracing" + "github.com/apache/airavata-custos/pkg/models" +) + +func TestAuditEventTraceIDs_PopulatedFromActiveSpan(t *testing.T) { + prev := otel.GetTracerProvider() + tp := sdktrace.NewTracerProvider() + otel.SetTracerProvider(tp) + t.Cleanup(func() { otel.SetTracerProvider(prev) }) + + ctx, span := tracing.Start(context.Background(), "test.root") + defer span.End() + + wantTrace := span.SpanContext().TraceID() + wantSpan := span.SpanContext().SpanID() + + e := &models.AuditEvent{EventType: "X", EntityID: "y"} + tracing.PopulateAuditIDs(ctx, &e.TraceID, &e.SpanID, &e.ParentSpanID) + + if !bytes.Equal(e.TraceID, wantTrace[:]) { + t.Fatalf("trace_id mismatch: got %x want %x", e.TraceID, wantTrace[:]) + } + if !bytes.Equal(e.SpanID, wantSpan[:]) { + t.Fatalf("span_id mismatch: got %x want %x", e.SpanID, wantSpan[:]) + } +} + +func TestAuditEventTraceIDs_NilWhenNoSpan(t *testing.T) { + e := &models.AuditEvent{EventType: "X", EntityID: "y"} + tracing.PopulateAuditIDs(context.Background(), &e.TraceID, &e.SpanID, &e.ParentSpanID) + + if e.TraceID != nil || e.SpanID != nil { + t.Fatalf("expected nil trace/span IDs, got trace=%x span=%x", e.TraceID, e.SpanID) + } +} + +func TestAuditEventTraceIDs_NotOverwrittenWhenPreset(t *testing.T) { + prev := otel.GetTracerProvider() + tp := sdktrace.NewTracerProvider() + otel.SetTracerProvider(tp) + t.Cleanup(func() { otel.SetTracerProvider(prev) }) + + ctx, span := tracing.Start(context.Background(), "test.root") + defer span.End() + + preset := []byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16} + presetSpan := []byte{1, 2, 3, 4, 5, 6, 7, 8} + e := &models.AuditEvent{ + EventType: "X", + EntityID: "y", + TraceID: preset, + SpanID: presetSpan, + } + tracing.PopulateAuditIDs(ctx, &e.TraceID, &e.SpanID, &e.ParentSpanID) + + if !bytes.Equal(e.TraceID, preset) { + t.Fatalf("preset trace_id was overwritten: got %x want %x", e.TraceID, preset) + } + if !bytes.Equal(e.SpanID, presetSpan) { + t.Fatalf("preset span_id was overwritten: got %x want %x", e.SpanID, presetSpan) + } +}
