This is an automated email from the ASF dual-hosted git repository. lahirujayathilake pushed a commit to branch tracing-impl in repository https://gitbox.apache.org/repos/asf/airavata-custos.git
commit 45b7a83b4eb62107cedae91b7f92610832e2f2c0 Author: lahiruj <[email protected]> AuthorDate: Fri Jun 5 00:30:16 2026 -0400 Propagate request context across event bus so subscribers inherit trace IDs --- pkg/events/bus.go | 64 +++++- pkg/events/bus_test.go | 225 +++++++++++++++++++++ pkg/events/compute_allocation_diff_subscribe.go | 9 +- ...ation_membership_resource_override_subscribe.go | 9 +- .../compute_allocation_membership_subscribe.go | 9 +- ...ompute_allocation_resource_mapping_subscribe.go | 9 +- .../compute_allocation_resource_subscribe.go | 9 +- pkg/events/compute_allocation_subscribe.go | 9 +- pkg/events/compute_cluster_user_subscribe.go | 9 +- pkg/events/organization_subscribe.go | 9 +- pkg/events/project_subscribe.go | 9 +- pkg/events/types.go | 3 +- pkg/events/user_identity_subscribe.go | 9 +- pkg/events/user_subscribe.go | 9 +- pkg/service/compute_allocation.go | 6 +- pkg/service/compute_allocation_diff.go | 4 +- pkg/service/compute_allocation_membership.go | 8 +- ...pute_allocation_membership_resource_override.go | 6 +- pkg/service/compute_allocation_resource.go | 6 +- pkg/service/compute_allocation_resource_mapping.go | 6 +- pkg/service/compute_cluster.go | 6 +- pkg/service/compute_cluster_user.go | 6 +- pkg/service/organization.go | 6 +- pkg/service/project.go | 8 +- pkg/service/user.go | 8 +- pkg/service/user_identity.go | 6 +- pkg/service/user_merge.go | 4 +- 27 files changed, 382 insertions(+), 89 deletions(-) diff --git a/pkg/events/bus.go b/pkg/events/bus.go index e2e419b25..2044e02ca 100644 --- a/pkg/events/bus.go +++ b/pkg/events/bus.go @@ -1,5 +1,15 @@ package events +import ( + "context" + "fmt" + "log/slog" + "runtime/debug" + + "github.com/apache/airavata-custos/internal/tracing" + "go.opentelemetry.io/otel/codes" +) + func New() *Bus { return &Bus{ subs: make(map[string][]EventSubscriberFunc), @@ -17,21 +27,47 @@ func (b *Bus) Subscribe(topic EventType, handler EventSubscriberFunc) { // Publish sends an event to all subscribers of the given topic. // Each handler runs in its own goroutine so publishers never block. -func (b *Bus) Publish(topic EventType, payload any) { +func (b *Bus) Publish(ctx context.Context, topic EventType, payload any) { + ctx, span := tracing.Start(ctx, "bus.publish:"+string(topic)) + defer span.End() + b.mu.RLock() handlers := make([]EventSubscriberFunc, len(b.subs[string(topic)])) copy(handlers, b.subs[string(topic)]) b.mu.RUnlock() event := Event{Type: topic, Payload: payload} + detached := context.WithoutCancel(ctx) for _, h := range handlers { - go h(event, payload) + go safeDispatch(detached, h, event, payload) } } +func safeDispatch(ctx context.Context, h EventSubscriberFunc, event Event, payload any) { + ctx, span := tracing.Start(ctx, "bus.subscribe:"+string(event.Type)) + defer span.End() + + defer func() { + if r := recover(); r != nil { + err := fmt.Errorf("subscriber panic: %v", r) + span.RecordError(err) + span.SetStatus(codes.Error, "subscriber panic") + slog.Error("event subscriber panicked", + "topic", event.Type, + "panic", r, + "stack", string(debug.Stack()), + ) + } + }() + h(ctx, event, payload) +} + // PublishSync is like Publish but calls handlers in the caller's goroutine. // Useful when you need to guarantee ordering or want backpressure. -func (b *Bus) PublishSync(topic EventType, payload any) { +func (b *Bus) PublishSync(ctx context.Context, topic EventType, payload any) { + ctx, span := tracing.Start(ctx, "bus.publish:"+string(topic)) + defer span.End() + b.mu.RLock() handlers := make([]EventSubscriberFunc, len(b.subs[string(topic)])) copy(handlers, b.subs[string(topic)]) @@ -39,6 +75,26 @@ func (b *Bus) PublishSync(topic EventType, payload any) { event := Event{Type: topic, Payload: payload} for _, h := range handlers { - h(event, payload) + dispatchSync(ctx, h, event, payload) } } + +func dispatchSync(ctx context.Context, h EventSubscriberFunc, event Event, payload any) { + ctx, span := tracing.Start(ctx, "bus.subscribe:"+string(event.Type)) + defer span.End() + + defer func() { + if r := recover(); r != nil { + err := fmt.Errorf("subscriber panic: %v", r) + span.RecordError(err) + span.SetStatus(codes.Error, "subscriber panic") + slog.Error("event subscriber panicked", + "topic", event.Type, + "panic", r, + "stack", string(debug.Stack()), + ) + panic(r) + } + }() + h(ctx, event, payload) +} diff --git a/pkg/events/bus_test.go b/pkg/events/bus_test.go new file mode 100644 index 000000000..46af671a6 --- /dev/null +++ b/pkg/events/bus_test.go @@ -0,0 +1,225 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package events + +import ( + "context" + "sync" + "sync/atomic" + "testing" + "time" + + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/codes" + sdktrace "go.opentelemetry.io/otel/sdk/trace" +) + +type ctxKey string + +const testCtxKey ctxKey = "trace-id" + +const topicTest EventType = "test::topic" + +func TestPublishSyncPropagatesContext(t *testing.T) { + bus := New() + + got := make(chan string, 1) + bus.Subscribe(topicTest, func(ctx context.Context, _ Event, _ interface{}) { + v, _ := ctx.Value(testCtxKey).(string) + got <- v + }) + + ctx := context.WithValue(context.Background(), testCtxKey, "abc123") + bus.PublishSync(ctx, topicTest, "payload") + + select { + case v := <-got: + if v != "abc123" { + t.Fatalf("PublishSync did not propagate ctx value, got %q", v) + } + case <-time.After(time.Second): + t.Fatalf("subscriber never ran") + } +} + +func TestPublishAsyncPropagatesContext(t *testing.T) { + bus := New() + + got := make(chan string, 1) + bus.Subscribe(topicTest, func(ctx context.Context, _ Event, _ interface{}) { + v, _ := ctx.Value(testCtxKey).(string) + got <- v + }) + + ctx := context.WithValue(context.Background(), testCtxKey, "def456") + bus.Publish(ctx, topicTest, "payload") + + select { + case v := <-got: + if v != "def456" { + t.Fatalf("Publish did not propagate ctx value, got %q", v) + } + case <-time.After(time.Second): + t.Fatalf("subscriber never ran") + } +} + +func TestPublishAsyncDetachesCancellation(t *testing.T) { + bus := New() + + started := make(chan struct{}) + done := make(chan error, 1) + bus.Subscribe(topicTest, func(ctx context.Context, _ Event, _ interface{}) { + close(started) + select { + case <-ctx.Done(): + done <- ctx.Err() + case <-time.After(200 * time.Millisecond): + done <- nil + } + }) + + ctx, cancel := context.WithCancel(context.Background()) + bus.Publish(ctx, topicTest, "payload") + + <-started + cancel() + + select { + case err := <-done: + if err != nil { + t.Fatalf("expected subscriber ctx to be detached from cancellation, got err=%v", err) + } + case <-time.After(time.Second): + t.Fatalf("subscriber never finished") + } +} + +func TestPublishSyncPanicPropagatesToCaller(t *testing.T) { + bus := New() + + bus.Subscribe(topicTest, func(context.Context, Event, interface{}) { + panic("boom") + }) + + var recovered any + func() { + defer func() { recovered = recover() }() + bus.PublishSync(context.Background(), topicTest, nil) + }() + + if recovered == nil { + t.Fatalf("expected sync publish to surface subscriber panic to caller") + } +} + +func TestPublishAsyncSubscriberPanicDoesNotKillOthers(t *testing.T) { + bus := New() + + var ran atomic.Int32 + var wg sync.WaitGroup + wg.Add(2) + bus.Subscribe(topicTest, func(context.Context, Event, interface{}) { + defer wg.Done() + ran.Add(1) + panic("boom") + }) + bus.Subscribe(topicTest, func(context.Context, Event, interface{}) { + defer wg.Done() + ran.Add(1) + }) + + bus.Publish(context.Background(), topicTest, nil) + + finished := make(chan struct{}) + go func() { + wg.Wait() + close(finished) + }() + + select { + case <-finished: + case <-time.After(time.Second): + } + + if got := ran.Load(); got != 2 { + t.Fatalf("expected both subscribers to run, got %d", got) + } +} + +type recordingProcessor struct { + mu sync.Mutex + spans []sdktrace.ReadOnlySpan +} + +func (p *recordingProcessor) OnStart(context.Context, sdktrace.ReadWriteSpan) {} +func (p *recordingProcessor) OnEnd(s sdktrace.ReadOnlySpan) { + p.mu.Lock() + defer p.mu.Unlock() + p.spans = append(p.spans, s) +} +func (p *recordingProcessor) Shutdown(context.Context) error { return nil } +func (p *recordingProcessor) ForceFlush(context.Context) error { return nil } + +func (p *recordingProcessor) findByName(name string) sdktrace.ReadOnlySpan { + p.mu.Lock() + defer p.mu.Unlock() + for _, s := range p.spans { + if s.Name() == name { + return s + } + } + return nil +} + +func TestSafeDispatchSetsSpanErrorOnPanic(t *testing.T) { + rec := &recordingProcessor{} + tp := sdktrace.NewTracerProvider(sdktrace.WithSpanProcessor(rec)) + prev := otel.GetTracerProvider() + otel.SetTracerProvider(tp) + t.Cleanup(func() { otel.SetTracerProvider(prev) }) + + bus := New() + bus.Subscribe(topicTest, func(context.Context, Event, interface{}) { + panic("boom") + }) + + bus.Publish(context.Background(), topicTest, nil) + + deadline := time.Now().Add(time.Second) + var sub sdktrace.ReadOnlySpan + for time.Now().Before(deadline) { + _ = tp.ForceFlush(context.Background()) + sub = rec.findByName("bus.subscribe:" + string(topicTest)) + if sub != nil { + break + } + time.Sleep(5 * time.Millisecond) + } + + if sub == nil { + var names []string + for _, s := range rec.spans { + names = append(names, s.Name()) + } + t.Fatalf("did not see bus.subscribe span; got names=%v", names) + } + if got := sub.Status().Code; got != codes.Error { + t.Fatalf("expected bus.subscribe span status=Error, got %v", got) + } +} diff --git a/pkg/events/compute_allocation_diff_subscribe.go b/pkg/events/compute_allocation_diff_subscribe.go index 126aa2b4c..83c1b723c 100644 --- a/pkg/events/compute_allocation_diff_subscribe.go +++ b/pkg/events/compute_allocation_diff_subscribe.go @@ -1,13 +1,14 @@ package events import ( + "context" "log/slog" "github.com/apache/airavata-custos/pkg/models" ) // ComputeAllocationDiffHandler handles compute allocation diff lifecycle events with a typed payload. -type ComputeAllocationDiffHandler func(diff models.ComputeAllocationDiff) +type ComputeAllocationDiffHandler func(ctx context.Context, diff models.ComputeAllocationDiff) // SubscribeComputeAllocationDiffCreated registers a typed handler invoked whenever a // compute_allocation_diff::create event is published. Events with payloads that are @@ -30,13 +31,13 @@ func (b *Bus) SubscribeComputeAllocationDiffDeleted(handler ComputeAllocationDif } func (b *Bus) subscribeComputeAllocationDiff(topic EventType, handler ComputeAllocationDiffHandler) { - b.Subscribe(topic, func(event Event, value interface{}) { + b.Subscribe(topic, func(ctx context.Context, event Event, value interface{}) { switch d := value.(type) { case models.ComputeAllocationDiff: - handler(d) + handler(ctx, d) case *models.ComputeAllocationDiff: if d != nil { - handler(*d) + handler(ctx, *d) } default: slog.Warn("compute allocation diff event payload has unexpected type", diff --git a/pkg/events/compute_allocation_membership_resource_override_subscribe.go b/pkg/events/compute_allocation_membership_resource_override_subscribe.go index ea77026fa..6a381a524 100644 --- a/pkg/events/compute_allocation_membership_resource_override_subscribe.go +++ b/pkg/events/compute_allocation_membership_resource_override_subscribe.go @@ -18,6 +18,7 @@ package events import ( + "context" "log/slog" "github.com/apache/airavata-custos/pkg/models" @@ -25,7 +26,7 @@ import ( // ComputeAllocationMembershipResourceOverrideHandler handles lifecycle // events for membership resource overrides with a typed payload. -type ComputeAllocationMembershipResourceOverrideHandler func(o models.ComputeAllocationMembershipResourceOverride) +type ComputeAllocationMembershipResourceOverrideHandler func(ctx context.Context, o models.ComputeAllocationMembershipResourceOverride) // SubscribeComputeAllocationMembershipResourceOverrideCreated registers a // typed handler invoked whenever a @@ -49,13 +50,13 @@ func (b *Bus) SubscribeComputeAllocationMembershipResourceOverrideDeleted(handle } func (b *Bus) subscribeMembershipResourceOverride(topic EventType, handler ComputeAllocationMembershipResourceOverrideHandler) { - b.Subscribe(topic, func(event Event, value interface{}) { + b.Subscribe(topic, func(ctx context.Context, event Event, value interface{}) { switch o := value.(type) { case models.ComputeAllocationMembershipResourceOverride: - handler(o) + handler(ctx, o) case *models.ComputeAllocationMembershipResourceOverride: if o != nil { - handler(*o) + handler(ctx, *o) } default: slog.Warn("compute allocation membership resource override event payload has unexpected type", diff --git a/pkg/events/compute_allocation_membership_subscribe.go b/pkg/events/compute_allocation_membership_subscribe.go index 79c3f6cf0..13c1256e2 100644 --- a/pkg/events/compute_allocation_membership_subscribe.go +++ b/pkg/events/compute_allocation_membership_subscribe.go @@ -1,13 +1,14 @@ package events import ( + "context" "log/slog" "github.com/apache/airavata-custos/pkg/models" ) // ComputeAllocationMembershipHandler handles compute allocation membership lifecycle events with a typed payload. -type ComputeAllocationMembershipHandler func(membership models.ComputeAllocationMembership) +type ComputeAllocationMembershipHandler func(ctx context.Context, membership models.ComputeAllocationMembership) // SubscribeComputeAllocationMembershipCreated registers a typed handler invoked whenever a // compute_allocation_membership::create event is published. Events with payloads that are @@ -30,13 +31,13 @@ func (b *Bus) SubscribeComputeAllocationMembershipDeleted(handler ComputeAllocat } func (b *Bus) subscribeComputeAllocationMembership(topic EventType, handler ComputeAllocationMembershipHandler) { - b.Subscribe(topic, func(event Event, value interface{}) { + b.Subscribe(topic, func(ctx context.Context, event Event, value interface{}) { switch m := value.(type) { case models.ComputeAllocationMembership: - handler(m) + handler(ctx, m) case *models.ComputeAllocationMembership: if m != nil { - handler(*m) + handler(ctx, *m) } default: slog.Warn("compute allocation membership event payload has unexpected type", diff --git a/pkg/events/compute_allocation_resource_mapping_subscribe.go b/pkg/events/compute_allocation_resource_mapping_subscribe.go index 25522204a..e8e6be175 100644 --- a/pkg/events/compute_allocation_resource_mapping_subscribe.go +++ b/pkg/events/compute_allocation_resource_mapping_subscribe.go @@ -1,13 +1,14 @@ package events import ( + "context" "log/slog" "github.com/apache/airavata-custos/pkg/models" ) // ComputeAllocationResourceMappingHandler handles compute allocation resource mapping lifecycle events with a typed payload. -type ComputeAllocationResourceMappingHandler func(mapping models.ComputeAllocationResourceMapping) +type ComputeAllocationResourceMappingHandler func(ctx context.Context, mapping models.ComputeAllocationResourceMapping) // SubscribeComputeAllocationResourceMappingCreated registers a typed handler invoked whenever a // compute_allocation_resource_mapping::create event is published. Events with payloads that are @@ -30,13 +31,13 @@ func (b *Bus) SubscribeComputeAllocationResourceMappingDeleted(handler ComputeAl } func (b *Bus) subscribeComputeAllocationResourceMapping(topic EventType, handler ComputeAllocationResourceMappingHandler) { - b.Subscribe(topic, func(event Event, value interface{}) { + b.Subscribe(topic, func(ctx context.Context, event Event, value interface{}) { switch m := value.(type) { case models.ComputeAllocationResourceMapping: - handler(m) + handler(ctx, m) case *models.ComputeAllocationResourceMapping: if m != nil { - handler(*m) + handler(ctx, *m) } default: slog.Warn("compute allocation resource mapping event payload has unexpected type", diff --git a/pkg/events/compute_allocation_resource_subscribe.go b/pkg/events/compute_allocation_resource_subscribe.go index bca3308b5..e45d89072 100644 --- a/pkg/events/compute_allocation_resource_subscribe.go +++ b/pkg/events/compute_allocation_resource_subscribe.go @@ -1,13 +1,14 @@ package events import ( + "context" "log/slog" "github.com/apache/airavata-custos/pkg/models" ) // ComputeAllocationResourceHandler handles compute allocation resource lifecycle events with a typed payload. -type ComputeAllocationResourceHandler func(resource models.ComputeAllocationResource) +type ComputeAllocationResourceHandler func(ctx context.Context, resource models.ComputeAllocationResource) // SubscribeComputeAllocationResourceCreated registers a typed handler invoked whenever a // compute_allocation_resource::create event is published. Events with payloads that are @@ -30,13 +31,13 @@ func (b *Bus) SubscribeComputeAllocationResourceDeleted(handler ComputeAllocatio } func (b *Bus) subscribeComputeAllocationResource(topic EventType, handler ComputeAllocationResourceHandler) { - b.Subscribe(topic, func(event Event, value interface{}) { + b.Subscribe(topic, func(ctx context.Context, event Event, value interface{}) { switch r := value.(type) { case models.ComputeAllocationResource: - handler(r) + handler(ctx, r) case *models.ComputeAllocationResource: if r != nil { - handler(*r) + handler(ctx, *r) } default: slog.Warn("compute allocation resource event payload has unexpected type", diff --git a/pkg/events/compute_allocation_subscribe.go b/pkg/events/compute_allocation_subscribe.go index 93fc2e3c8..93500c816 100644 --- a/pkg/events/compute_allocation_subscribe.go +++ b/pkg/events/compute_allocation_subscribe.go @@ -1,13 +1,14 @@ package events import ( + "context" "log/slog" "github.com/apache/airavata-custos/pkg/models" ) // ComputeAllocationHandler handles compute allocation lifecycle events with a typed payload. -type ComputeAllocationHandler func(allocation models.ComputeAllocation) +type ComputeAllocationHandler func(ctx context.Context, allocation models.ComputeAllocation) // SubscribeComputeAllocationCreated registers a typed handler invoked whenever a // compute_allocation::create event is published. Events with payloads that are @@ -30,13 +31,13 @@ func (b *Bus) SubscribeComputeAllocationDeleted(handler ComputeAllocationHandler } func (b *Bus) subscribeComputeAllocation(topic EventType, handler ComputeAllocationHandler) { - b.Subscribe(topic, func(event Event, value interface{}) { + b.Subscribe(topic, func(ctx context.Context, event Event, value interface{}) { switch a := value.(type) { case models.ComputeAllocation: - handler(a) + handler(ctx, a) case *models.ComputeAllocation: if a != nil { - handler(*a) + handler(ctx, *a) } default: slog.Warn("compute allocation event payload has unexpected type", diff --git a/pkg/events/compute_cluster_user_subscribe.go b/pkg/events/compute_cluster_user_subscribe.go index 5ba7a11a7..1ef7d60d1 100644 --- a/pkg/events/compute_cluster_user_subscribe.go +++ b/pkg/events/compute_cluster_user_subscribe.go @@ -18,6 +18,7 @@ package events import ( + "context" "log/slog" "github.com/apache/airavata-custos/pkg/models" @@ -25,7 +26,7 @@ import ( // ComputeClusterUserHandler handles compute-cluster user lifecycle events // with a typed payload. -type ComputeClusterUserHandler func(user models.ComputeClusterUser) +type ComputeClusterUserHandler func(ctx context.Context, user models.ComputeClusterUser) // SubscribeComputeClusterUserCreated registers a typed handler invoked // whenever a compute_cluster_user::create event is published. @@ -46,13 +47,13 @@ func (b *Bus) SubscribeComputeClusterUserDeleted(handler ComputeClusterUserHandl } func (b *Bus) subscribeComputeClusterUser(topic EventType, handler ComputeClusterUserHandler) { - b.Subscribe(topic, func(event Event, value interface{}) { + b.Subscribe(topic, func(ctx context.Context, event Event, value interface{}) { switch u := value.(type) { case models.ComputeClusterUser: - handler(u) + handler(ctx, u) case *models.ComputeClusterUser: if u != nil { - handler(*u) + handler(ctx, *u) } default: slog.Warn("compute cluster user event payload has unexpected type", diff --git a/pkg/events/organization_subscribe.go b/pkg/events/organization_subscribe.go index 68a6b528e..5b5674957 100644 --- a/pkg/events/organization_subscribe.go +++ b/pkg/events/organization_subscribe.go @@ -1,13 +1,14 @@ package events import ( + "context" "log/slog" "github.com/apache/airavata-custos/pkg/models" ) // OrganizationHandler handles organization lifecycle events with a typed payload. -type OrganizationHandler func(organization models.Organization) +type OrganizationHandler func(ctx context.Context, organization models.Organization) // SubscribeOrganizationCreated registers a typed handler invoked whenever an // organization::create event is published. Events with payloads that are not a @@ -29,13 +30,13 @@ func (b *Bus) SubscribeOrganizationDeleted(handler OrganizationHandler) { } func (b *Bus) subscribeOrganization(topic EventType, handler OrganizationHandler) { - b.Subscribe(topic, func(event Event, value interface{}) { + b.Subscribe(topic, func(ctx context.Context, event Event, value interface{}) { switch o := value.(type) { case models.Organization: - handler(o) + handler(ctx, o) case *models.Organization: if o != nil { - handler(*o) + handler(ctx, *o) } default: slog.Warn("organization event payload has unexpected type", diff --git a/pkg/events/project_subscribe.go b/pkg/events/project_subscribe.go index da7cb7819..874c3c08d 100644 --- a/pkg/events/project_subscribe.go +++ b/pkg/events/project_subscribe.go @@ -1,13 +1,14 @@ package events import ( + "context" "log/slog" "github.com/apache/airavata-custos/pkg/models" ) // ProjectHandler handles project lifecycle events with a typed payload. -type ProjectHandler func(project models.Project) +type ProjectHandler func(ctx context.Context, project models.Project) // SubscribeProjectCreated registers a typed handler invoked whenever a // project::create event is published. Events with payloads that are not a @@ -31,13 +32,13 @@ func (b *Bus) SubscribeProjectDeleted(handler ProjectHandler) { } func (b *Bus) subscribeProject(topic EventType, handler ProjectHandler) { - b.Subscribe(topic, func(event Event, value interface{}) { + b.Subscribe(topic, func(ctx context.Context, event Event, value interface{}) { switch p := value.(type) { case models.Project: - handler(p) + handler(ctx, p) case *models.Project: if p != nil { - handler(*p) + handler(ctx, *p) } default: slog.Warn("project event payload has unexpected type", diff --git a/pkg/events/types.go b/pkg/events/types.go index 4110782c6..4d9cb2b62 100644 --- a/pkg/events/types.go +++ b/pkg/events/types.go @@ -20,6 +20,7 @@ package events import ( + "context" "sync" ) @@ -126,7 +127,7 @@ type Event struct { } // EventSubscriberFunc is a function type that can be registered to receive events from the bus. -type EventSubscriberFunc func(event Event, value interface{}) +type EventSubscriberFunc func(ctx context.Context, event Event, value interface{}) // Bus is a lightweight, in-memory, topic-based pub/sub event bus. // Modules publish and subscribe by topic without knowing about each other. diff --git a/pkg/events/user_identity_subscribe.go b/pkg/events/user_identity_subscribe.go index dfbf7df47..06159522a 100644 --- a/pkg/events/user_identity_subscribe.go +++ b/pkg/events/user_identity_subscribe.go @@ -18,13 +18,14 @@ package events import ( + "context" "log/slog" "github.com/apache/airavata-custos/pkg/models" ) // UserIdentityHandler handles user-identity lifecycle events with a typed payload. -type UserIdentityHandler func(identity models.UserIdentity) +type UserIdentityHandler func(ctx context.Context, identity models.UserIdentity) // SubscribeUserIdentityCreated registers a typed handler invoked whenever a // user_identity::create event is published. @@ -45,13 +46,13 @@ func (b *Bus) SubscribeUserIdentityDeleted(handler UserIdentityHandler) { } func (b *Bus) subscribeUserIdentity(topic EventType, handler UserIdentityHandler) { - b.Subscribe(topic, func(event Event, value interface{}) { + b.Subscribe(topic, func(ctx context.Context, event Event, value interface{}) { switch e := value.(type) { case models.UserIdentity: - handler(e) + handler(ctx, e) case *models.UserIdentity: if e != nil { - handler(*e) + handler(ctx, *e) } default: slog.Warn("user identity event payload has unexpected type", diff --git a/pkg/events/user_subscribe.go b/pkg/events/user_subscribe.go index 7c08a49c5..c9bc92b07 100644 --- a/pkg/events/user_subscribe.go +++ b/pkg/events/user_subscribe.go @@ -1,13 +1,14 @@ package events import ( + "context" "log/slog" "github.com/apache/airavata-custos/pkg/models" ) // UserHandler handles user lifecycle events with a typed payload. -type UserHandler func(user models.User) +type UserHandler func(ctx context.Context, user models.User) // SubscribeUserCreated registers a typed handler invoked whenever a // user::create event is published. Events with payloads that are not a @@ -29,13 +30,13 @@ func (b *Bus) SubscribeUserDeleted(handler UserHandler) { } func (b *Bus) subscribeUser(topic EventType, handler UserHandler) { - b.Subscribe(topic, func(event Event, value interface{}) { + b.Subscribe(topic, func(ctx context.Context, event Event, value interface{}) { switch u := value.(type) { case models.User: - handler(u) + handler(ctx, u) case *models.User: if u != nil { - handler(*u) + handler(ctx, *u) } default: slog.Warn("user event payload has unexpected type", diff --git a/pkg/service/compute_allocation.go b/pkg/service/compute_allocation.go index 47e010a48..dbc817ea4 100644 --- a/pkg/service/compute_allocation.go +++ b/pkg/service/compute_allocation.go @@ -67,7 +67,7 @@ func (s *Service) CreateComputeAllocation(ctx context.Context, alloc *models.Com return nil, fmt.Errorf("create compute allocation: %w", err) } - s.eventBus.Publish(events.ComputeAllocationCreateEvent, alloc) + s.eventBus.Publish(ctx, events.ComputeAllocationCreateEvent, alloc) return alloc, nil } @@ -113,7 +113,7 @@ func (s *Service) UpdateComputeAllocation(ctx context.Context, alloc *models.Com return fmt.Errorf("update compute allocation: %w", err) } - s.eventBus.Publish(events.ComputeAllocationUpdateEvent, alloc) + s.eventBus.Publish(ctx, events.ComputeAllocationUpdateEvent, alloc) return nil } @@ -135,6 +135,6 @@ func (s *Service) DeleteComputeAllocation(ctx context.Context, id string) error return fmt.Errorf("delete compute allocation: %w", err) } - s.eventBus.Publish(events.ComputeAllocationDeleteEvent, alloc) + s.eventBus.Publish(ctx, events.ComputeAllocationDeleteEvent, alloc) return nil } diff --git a/pkg/service/compute_allocation_diff.go b/pkg/service/compute_allocation_diff.go index 0375e1fee..ac250b905 100644 --- a/pkg/service/compute_allocation_diff.go +++ b/pkg/service/compute_allocation_diff.go @@ -63,7 +63,7 @@ func (s *Service) CreateComputeAllocationDiff(ctx context.Context, diff *models. return nil, fmt.Errorf("create compute allocation diff: %w", err) } - s.eventBus.Publish(events.ComputeAllocationDiffCreateEvent, diff) + s.eventBus.Publish(ctx, events.ComputeAllocationDiffCreateEvent, diff) return diff, nil } @@ -129,6 +129,6 @@ func (s *Service) DeleteComputeAllocationDiff(ctx context.Context, id string) er return fmt.Errorf("delete compute allocation diff: %w", err) } - s.eventBus.Publish(events.ComputeAllocationDiffDeleteEvent, diff) + s.eventBus.Publish(ctx, events.ComputeAllocationDiffDeleteEvent, diff) return nil } diff --git a/pkg/service/compute_allocation_membership.go b/pkg/service/compute_allocation_membership.go index f61bbdc06..cbf60cc5a 100644 --- a/pkg/service/compute_allocation_membership.go +++ b/pkg/service/compute_allocation_membership.go @@ -71,7 +71,7 @@ func (s *Service) CreateComputeAllocationMembership(ctx context.Context, m *mode return nil, fmt.Errorf("create compute allocation membership: %w", err) } - s.eventBus.Publish(events.ComputeAllocationMembershipCreateEvent, m) + s.eventBus.Publish(ctx, events.ComputeAllocationMembershipCreateEvent, m) return m, nil } @@ -150,7 +150,7 @@ func (s *Service) UpdateComputeAllocationMembership(ctx context.Context, m *mode return nil, fmt.Errorf("update compute allocation membership: %w", err) } - s.eventBus.Publish(events.ComputeAllocationMembershipUpdateEvent, m) + s.eventBus.Publish(ctx, events.ComputeAllocationMembershipUpdateEvent, m) return m, nil } @@ -177,7 +177,7 @@ func (s *Service) UpdateMembershipStatus(ctx context.Context, id string, status return nil, fmt.Errorf("update compute allocation membership status: %w", err) } - s.eventBus.Publish(events.ComputeAllocationMembershipUpdateEvent, existing) + s.eventBus.Publish(ctx, events.ComputeAllocationMembershipUpdateEvent, existing) return existing, nil } @@ -199,6 +199,6 @@ func (s *Service) DeleteComputeAllocationMembership(ctx context.Context, id stri return fmt.Errorf("delete compute allocation membership: %w", err) } - s.eventBus.Publish(events.ComputeAllocationMembershipDeleteEvent, existing) + s.eventBus.Publish(ctx, events.ComputeAllocationMembershipDeleteEvent, existing) return nil } diff --git a/pkg/service/compute_allocation_membership_resource_override.go b/pkg/service/compute_allocation_membership_resource_override.go index e3148cd00..d4e6cc355 100644 --- a/pkg/service/compute_allocation_membership_resource_override.go +++ b/pkg/service/compute_allocation_membership_resource_override.go @@ -77,7 +77,7 @@ func (s *Service) CreateComputeAllocationMembershipResourceOverride(ctx context. return nil, fmt.Errorf("create membership resource override: %w", err) } - s.eventBus.Publish(events.ComputeAllocationMembershipResourceOverrideCreateEvent, o) + s.eventBus.Publish(ctx, events.ComputeAllocationMembershipResourceOverrideCreateEvent, o) return o, nil } @@ -170,7 +170,7 @@ func (s *Service) UpdateComputeAllocationMembershipResourceOverride(ctx context. return nil, fmt.Errorf("update membership resource override: %w", err) } - s.eventBus.Publish(events.ComputeAllocationMembershipResourceOverrideUpdateEvent, o) + s.eventBus.Publish(ctx, events.ComputeAllocationMembershipResourceOverrideUpdateEvent, o) return o, nil } @@ -192,6 +192,6 @@ func (s *Service) DeleteComputeAllocationMembershipResourceOverride(ctx context. return fmt.Errorf("delete membership resource override: %w", err) } - s.eventBus.Publish(events.ComputeAllocationMembershipResourceOverrideDeleteEvent, existing) + s.eventBus.Publish(ctx, events.ComputeAllocationMembershipResourceOverrideDeleteEvent, existing) return nil } diff --git a/pkg/service/compute_allocation_resource.go b/pkg/service/compute_allocation_resource.go index 4efb28b51..f8ee2c182 100644 --- a/pkg/service/compute_allocation_resource.go +++ b/pkg/service/compute_allocation_resource.go @@ -48,7 +48,7 @@ func (s *Service) CreateComputeAllocationResource(ctx context.Context, resource return nil, fmt.Errorf("create compute allocation resource: %w", err) } - s.eventBus.Publish(events.ComputeAllocationResourceCreateEvent, resource) + s.eventBus.Publish(ctx, events.ComputeAllocationResourceCreateEvent, resource) return resource, nil } @@ -85,7 +85,7 @@ func (s *Service) UpdateComputeAllocationResource(ctx context.Context, resource return fmt.Errorf("update compute allocation resource: %w", err) } - s.eventBus.Publish(events.ComputeAllocationResourceUpdateEvent, resource) + s.eventBus.Publish(ctx, events.ComputeAllocationResourceUpdateEvent, resource) return nil } @@ -107,6 +107,6 @@ func (s *Service) DeleteComputeAllocationResource(ctx context.Context, id string return fmt.Errorf("delete compute allocation resource: %w", err) } - s.eventBus.Publish(events.ComputeAllocationResourceDeleteEvent, resource) + s.eventBus.Publish(ctx, events.ComputeAllocationResourceDeleteEvent, resource) return nil } diff --git a/pkg/service/compute_allocation_resource_mapping.go b/pkg/service/compute_allocation_resource_mapping.go index ce5f4ef18..e6d154ec0 100644 --- a/pkg/service/compute_allocation_resource_mapping.go +++ b/pkg/service/compute_allocation_resource_mapping.go @@ -76,7 +76,7 @@ func (s *Service) AttachResourceToAllocation(ctx context.Context, allocationID, return nil, fmt.Errorf("attach resource to allocation: %w", err) } - s.eventBus.Publish(events.ComputeAllocationResourceMappingCreateEvent, mapping) + s.eventBus.Publish(ctx, events.ComputeAllocationResourceMappingCreateEvent, mapping) return mapping, nil } @@ -109,7 +109,7 @@ func (s *Service) UpdateAllocationResourceMapping(ctx context.Context, allocatio return nil, fmt.Errorf("update allocation resource mapping: %w", err) } - s.eventBus.Publish(events.ComputeAllocationResourceMappingUpdateEvent, existing) + s.eventBus.Publish(ctx, events.ComputeAllocationResourceMappingUpdateEvent, existing) return existing, nil } @@ -135,7 +135,7 @@ func (s *Service) DetachResourceFromAllocation(ctx context.Context, allocationID return fmt.Errorf("detach resource from allocation: %w", err) } - s.eventBus.Publish(events.ComputeAllocationResourceMappingDeleteEvent, existing) + s.eventBus.Publish(ctx, events.ComputeAllocationResourceMappingDeleteEvent, existing) return nil } diff --git a/pkg/service/compute_cluster.go b/pkg/service/compute_cluster.go index a9d5b3091..382f22efc 100644 --- a/pkg/service/compute_cluster.go +++ b/pkg/service/compute_cluster.go @@ -51,7 +51,7 @@ func (s *Service) CreateComputeCluster(ctx context.Context, cluster *models.Comp return nil, fmt.Errorf("create compute cluster: %w", err) } - s.eventBus.Publish(events.ComputeClusterCreateEvent, cluster) + s.eventBus.Publish(ctx, events.ComputeClusterCreateEvent, cluster) return cluster, nil } @@ -103,7 +103,7 @@ func (s *Service) UpdateComputeCluster(ctx context.Context, cluster *models.Comp return fmt.Errorf("update compute cluster: %w", err) } - s.eventBus.Publish(events.ComputeClusterUpdateEvent, cluster) + s.eventBus.Publish(ctx, events.ComputeClusterUpdateEvent, cluster) return nil } @@ -125,6 +125,6 @@ func (s *Service) DeleteComputeCluster(ctx context.Context, id string) error { return fmt.Errorf("delete compute cluster: %w", err) } - s.eventBus.Publish(events.ComputeClusterDeleteEvent, cluster) + s.eventBus.Publish(ctx, events.ComputeClusterDeleteEvent, cluster) return nil } diff --git a/pkg/service/compute_cluster_user.go b/pkg/service/compute_cluster_user.go index 0aeca1f27..e97d33a8c 100644 --- a/pkg/service/compute_cluster_user.go +++ b/pkg/service/compute_cluster_user.go @@ -79,7 +79,7 @@ func (s *Service) CreateComputeClusterUser(ctx context.Context, cu *models.Compu } } - s.eventBus.Publish(events.ComputeClusterUserCreateEvent, cu) + s.eventBus.Publish(ctx, events.ComputeClusterUserCreateEvent, cu) return cu, nil } @@ -177,7 +177,7 @@ func (s *Service) UpdateComputeClusterUser(ctx context.Context, cu *models.Compu return fmt.Errorf("update compute cluster user: %w", err) } - s.eventBus.Publish(events.ComputeClusterUserUpdateEvent, cu) + s.eventBus.Publish(ctx, events.ComputeClusterUserUpdateEvent, cu) return nil } @@ -199,6 +199,6 @@ func (s *Service) DeleteComputeClusterUser(ctx context.Context, id string) error return fmt.Errorf("delete compute cluster user: %w", err) } - s.eventBus.Publish(events.ComputeClusterUserDeleteEvent, cu) + s.eventBus.Publish(ctx, events.ComputeClusterUserDeleteEvent, cu) return nil } diff --git a/pkg/service/organization.go b/pkg/service/organization.go index 50effd1e7..15bb88925 100644 --- a/pkg/service/organization.go +++ b/pkg/service/organization.go @@ -53,7 +53,7 @@ func (s *Service) CreateOrganization(ctx context.Context, org *models.Organizati return nil, fmt.Errorf("create organization: %w", err) } - s.eventBus.Publish(events.OrganizationCreateEvent, org) + s.eventBus.Publish(ctx, events.OrganizationCreateEvent, org) return org, nil } @@ -93,7 +93,7 @@ func (s *Service) UpdateOrganization(ctx context.Context, org *models.Organizati return fmt.Errorf("update organization: %w", err) } - s.eventBus.Publish(events.OrganizationUpdateEvent, org) + s.eventBus.Publish(ctx, events.OrganizationUpdateEvent, org) return nil } @@ -115,6 +115,6 @@ func (s *Service) DeleteOrganization(ctx context.Context, id string) error { return fmt.Errorf("delete organization: %w", err) } - s.eventBus.Publish(events.OrganizationDeleteEvent, org) + s.eventBus.Publish(ctx, events.OrganizationDeleteEvent, org) return nil } diff --git a/pkg/service/project.go b/pkg/service/project.go index 338542473..53d57617f 100644 --- a/pkg/service/project.go +++ b/pkg/service/project.go @@ -69,7 +69,7 @@ func (s *Service) CreateProject(ctx context.Context, project *models.Project) (* return nil, fmt.Errorf("create project: %w", err) } - s.eventBus.Publish(events.ProjectCreateEvent, project) + s.eventBus.Publish(ctx, events.ProjectCreateEvent, project) return project, nil } @@ -143,7 +143,7 @@ func (s *Service) UpdateProject(ctx context.Context, project *models.Project) er return fmt.Errorf("update project: %w", err) } - s.eventBus.Publish(events.ProjectUpdateEvent, project) + s.eventBus.Publish(ctx, events.ProjectUpdateEvent, project) return nil } @@ -170,7 +170,7 @@ func (s *Service) UpdateProjectStatus(ctx context.Context, id string, status mod } existing.Status = status - s.eventBus.Publish(events.ProjectUpdateEvent, existing) + s.eventBus.Publish(ctx, events.ProjectUpdateEvent, existing) return existing, nil } @@ -192,6 +192,6 @@ func (s *Service) DeleteProject(ctx context.Context, id string) error { return fmt.Errorf("delete project: %w", err) } - s.eventBus.Publish(events.ProjectDeleteEvent, project) + s.eventBus.Publish(ctx, events.ProjectDeleteEvent, project) return nil } diff --git a/pkg/service/user.go b/pkg/service/user.go index adc7f6a60..1da52c807 100644 --- a/pkg/service/user.go +++ b/pkg/service/user.go @@ -64,7 +64,7 @@ func (s *Service) CreateUser(ctx context.Context, user *models.User) (*models.Us return nil, fmt.Errorf("create user: %w", err) } - s.eventBus.Publish(events.UserCreateEvent, user) + s.eventBus.Publish(ctx, events.UserCreateEvent, user) return user, nil } @@ -167,7 +167,7 @@ func (s *Service) UpdateUser(ctx context.Context, user *models.User) error { return fmt.Errorf("update user: %w", err) } - s.eventBus.Publish(events.UserUpdateEvent, user) + s.eventBus.Publish(ctx, events.UserUpdateEvent, user) return nil } @@ -194,7 +194,7 @@ func (s *Service) UpdateUserStatus(ctx context.Context, id string, status models } existing.Status = status - s.eventBus.Publish(events.UserUpdateEvent, existing) + s.eventBus.Publish(ctx, events.UserUpdateEvent, existing) return existing, nil } @@ -216,6 +216,6 @@ func (s *Service) DeleteUser(ctx context.Context, id string) error { return fmt.Errorf("delete user: %w", err) } - s.eventBus.Publish(events.UserDeleteEvent, existing) + s.eventBus.Publish(ctx, events.UserDeleteEvent, existing) return nil } diff --git a/pkg/service/user_identity.go b/pkg/service/user_identity.go index e9d9a7ce7..2fc218a55 100644 --- a/pkg/service/user_identity.go +++ b/pkg/service/user_identity.go @@ -64,7 +64,7 @@ func (s *Service) CreateUserIdentity(ctx context.Context, e *models.UserIdentity return nil, fmt.Errorf("create user identity: %w", err) } - s.eventBus.Publish(events.UserIdentityCreateEvent, e) + s.eventBus.Publish(ctx, events.UserIdentityCreateEvent, e) return e, nil } @@ -163,7 +163,7 @@ func (s *Service) UpdateUserIdentity(ctx context.Context, e *models.UserIdentity return fmt.Errorf("update user identity: %w", err) } - s.eventBus.Publish(events.UserIdentityUpdateEvent, e) + s.eventBus.Publish(ctx, events.UserIdentityUpdateEvent, e) return nil } @@ -185,6 +185,6 @@ func (s *Service) DeleteUserIdentity(ctx context.Context, id string) error { return fmt.Errorf("delete user identity: %w", err) } - s.eventBus.Publish(events.UserIdentityDeleteEvent, e) + s.eventBus.Publish(ctx, events.UserIdentityDeleteEvent, e) return nil } diff --git a/pkg/service/user_merge.go b/pkg/service/user_merge.go index f0a65263d..083d63f91 100644 --- a/pkg/service/user_merge.go +++ b/pkg/service/user_merge.go @@ -97,7 +97,7 @@ func (s *Service) MergeUsers(ctx context.Context, survivingID, retiringID string } retiring.Status = models.UserMerged - s.eventBus.Publish(events.UserUpdateEvent, retiring) - s.eventBus.Publish(events.UserUpdateEvent, survivor) + s.eventBus.Publish(ctx, events.UserUpdateEvent, retiring) + s.eventBus.Publish(ctx, events.UserUpdateEvent, survivor) return survivor, nil }
