This is an automated email from the ASF dual-hosted git repository. DImuthuUpe pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/airavata-custos.git
commit ceb9d4d9c72ccce421d3e67f541375218ae584c4 Author: DImuthuUpe <[email protected]> AuthorDate: Sun May 17 20:23:28 2026 -0400 Added subscription layer for connectors to receive allocation messages --- cmd/server/main.go | 6 +- pkg/events/bus.go | 44 ++++++++ pkg/events/compute_allocation_diff_subscribe.go | 48 +++++++++ .../compute_allocation_membership_subscribe.go | 48 +++++++++ ...ompute_allocation_resource_mapping_subscribe.go | 48 +++++++++ .../compute_allocation_resource_subscribe.go | 48 +++++++++ pkg/events/compute_allocation_subscribe.go | 48 +++++++++ pkg/events/organization_subscribe.go | 47 +++++++++ pkg/events/project_subscribe.go | 49 +++++++++ pkg/events/types.go | 115 +++++++++++++++++++++ pkg/events/user_subscribe.go | 47 +++++++++ pkg/service/compute_allocation.go | 14 +++ pkg/service/compute_allocation_diff.go | 12 +++ pkg/service/compute_allocation_membership.go | 11 ++ pkg/service/compute_allocation_resource.go | 14 +++ pkg/service/compute_allocation_resource_mapping.go | 5 + pkg/service/compute_cluster.go | 14 +++ pkg/service/organization.go | 14 +++ pkg/service/project.go | 14 +++ pkg/service/service.go | 7 +- 20 files changed, 651 insertions(+), 2 deletions(-) diff --git a/cmd/server/main.go b/cmd/server/main.go index 08f9dd748..978d5cc92 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -31,6 +31,7 @@ import ( "github.com/apache/airavata-custos/internal/db" "github.com/apache/airavata-custos/internal/server" + "github.com/apache/airavata-custos/pkg/events" "github.com/apache/airavata-custos/pkg/service" ) @@ -68,7 +69,10 @@ func run() error { return err } - svc := service.New(database) + // Create a new event bus instance to async messaging between service and connectors + eventBus := events.New() + + svc := service.New(database, eventBus) handler := server.LoggingMiddleware(server.New(svc)) httpServer := &http.Server{ diff --git a/pkg/events/bus.go b/pkg/events/bus.go new file mode 100644 index 000000000..e2e419b25 --- /dev/null +++ b/pkg/events/bus.go @@ -0,0 +1,44 @@ +package events + +func New() *Bus { + return &Bus{ + subs: make(map[string][]EventSubscriberFunc), + } +} + +// Subscribe registers a handler for a given topic. +// The handler is called asynchronously (in a new goroutine) each time +// an event is published on that topic. +func (b *Bus) Subscribe(topic EventType, handler EventSubscriberFunc) { + b.mu.Lock() + defer b.mu.Unlock() + b.subs[string(topic)] = append(b.subs[string(topic)], handler) +} + +// Publish sends an event to all subscribers of the given topic. +// Each handler runs in its own goroutine so publishers never block. +func (b *Bus) Publish(topic EventType, payload any) { + b.mu.RLock() + handlers := make([]EventSubscriberFunc, len(b.subs[string(topic)])) + copy(handlers, b.subs[string(topic)]) + b.mu.RUnlock() + + event := Event{Type: topic, Payload: payload} + for _, h := range handlers { + go h(event, payload) + } +} + +// PublishSync is like Publish but calls handlers in the caller's goroutine. +// Useful when you need to guarantee ordering or want backpressure. +func (b *Bus) PublishSync(topic EventType, payload any) { + b.mu.RLock() + handlers := make([]EventSubscriberFunc, len(b.subs[string(topic)])) + copy(handlers, b.subs[string(topic)]) + b.mu.RUnlock() + + event := Event{Type: topic, Payload: payload} + for _, h := range handlers { + h(event, payload) + } +} diff --git a/pkg/events/compute_allocation_diff_subscribe.go b/pkg/events/compute_allocation_diff_subscribe.go new file mode 100644 index 000000000..126aa2b4c --- /dev/null +++ b/pkg/events/compute_allocation_diff_subscribe.go @@ -0,0 +1,48 @@ +package events + +import ( + "log/slog" + + "github.com/apache/airavata-custos/pkg/models" +) + +// ComputeAllocationDiffHandler handles compute allocation diff lifecycle events with a typed payload. +type ComputeAllocationDiffHandler func(diff models.ComputeAllocationDiff) + +// SubscribeComputeAllocationDiffCreated registers a typed handler invoked whenever a +// compute_allocation_diff::create event is published. Events with payloads that are +// not a models.ComputeAllocationDiff (or *models.ComputeAllocationDiff) are dropped +// with a warning log. +func (b *Bus) SubscribeComputeAllocationDiffCreated(handler ComputeAllocationDiffHandler) { + b.subscribeComputeAllocationDiff(ComputeAllocationDiffCreateEvent, handler) +} + +// SubscribeComputeAllocationDiffUpdated registers a typed handler invoked whenever a +// compute_allocation_diff::update event is published. +func (b *Bus) SubscribeComputeAllocationDiffUpdated(handler ComputeAllocationDiffHandler) { + b.subscribeComputeAllocationDiff(ComputeAllocationDiffUpdateEvent, handler) +} + +// SubscribeComputeAllocationDiffDeleted registers a typed handler invoked whenever a +// compute_allocation_diff::delete event is published. +func (b *Bus) SubscribeComputeAllocationDiffDeleted(handler ComputeAllocationDiffHandler) { + b.subscribeComputeAllocationDiff(ComputeAllocationDiffDeleteEvent, handler) +} + +func (b *Bus) subscribeComputeAllocationDiff(topic EventType, handler ComputeAllocationDiffHandler) { + b.Subscribe(topic, func(event Event, value interface{}) { + switch d := value.(type) { + case models.ComputeAllocationDiff: + handler(d) + case *models.ComputeAllocationDiff: + if d != nil { + handler(*d) + } + default: + slog.Warn("compute allocation diff event payload has unexpected type", + "type", event.Type, + "got", value, + ) + } + }) +} diff --git a/pkg/events/compute_allocation_membership_subscribe.go b/pkg/events/compute_allocation_membership_subscribe.go new file mode 100644 index 000000000..79c3f6cf0 --- /dev/null +++ b/pkg/events/compute_allocation_membership_subscribe.go @@ -0,0 +1,48 @@ +package events + +import ( + "log/slog" + + "github.com/apache/airavata-custos/pkg/models" +) + +// ComputeAllocationMembershipHandler handles compute allocation membership lifecycle events with a typed payload. +type ComputeAllocationMembershipHandler func(membership models.ComputeAllocationMembership) + +// SubscribeComputeAllocationMembershipCreated registers a typed handler invoked whenever a +// compute_allocation_membership::create event is published. Events with payloads that are +// not a models.ComputeAllocationMembership (or *models.ComputeAllocationMembership) are +// dropped with a warning log. +func (b *Bus) SubscribeComputeAllocationMembershipCreated(handler ComputeAllocationMembershipHandler) { + b.subscribeComputeAllocationMembership(ComputeAllocationMembershipCreateEvent, handler) +} + +// SubscribeComputeAllocationMembershipUpdated registers a typed handler invoked whenever a +// compute_allocation_membership::update event is published. +func (b *Bus) SubscribeComputeAllocationMembershipUpdated(handler ComputeAllocationMembershipHandler) { + b.subscribeComputeAllocationMembership(ComputeAllocationMembershipUpdateEvent, handler) +} + +// SubscribeComputeAllocationMembershipDeleted registers a typed handler invoked whenever a +// compute_allocation_membership::delete event is published. +func (b *Bus) SubscribeComputeAllocationMembershipDeleted(handler ComputeAllocationMembershipHandler) { + b.subscribeComputeAllocationMembership(ComputeAllocationMembershipDeleteEvent, handler) +} + +func (b *Bus) subscribeComputeAllocationMembership(topic EventType, handler ComputeAllocationMembershipHandler) { + b.Subscribe(topic, func(event Event, value interface{}) { + switch m := value.(type) { + case models.ComputeAllocationMembership: + handler(m) + case *models.ComputeAllocationMembership: + if m != nil { + handler(*m) + } + default: + slog.Warn("compute allocation membership event payload has unexpected type", + "type", event.Type, + "got", value, + ) + } + }) +} diff --git a/pkg/events/compute_allocation_resource_mapping_subscribe.go b/pkg/events/compute_allocation_resource_mapping_subscribe.go new file mode 100644 index 000000000..25522204a --- /dev/null +++ b/pkg/events/compute_allocation_resource_mapping_subscribe.go @@ -0,0 +1,48 @@ +package events + +import ( + "log/slog" + + "github.com/apache/airavata-custos/pkg/models" +) + +// ComputeAllocationResourceMappingHandler handles compute allocation resource mapping lifecycle events with a typed payload. +type ComputeAllocationResourceMappingHandler func(mapping models.ComputeAllocationResourceMapping) + +// SubscribeComputeAllocationResourceMappingCreated registers a typed handler invoked whenever a +// compute_allocation_resource_mapping::create event is published. Events with payloads that are +// not a models.ComputeAllocationResourceMapping (or *models.ComputeAllocationResourceMapping) are +// dropped with a warning log. +func (b *Bus) SubscribeComputeAllocationResourceMappingCreated(handler ComputeAllocationResourceMappingHandler) { + b.subscribeComputeAllocationResourceMapping(ComputeAllocationResourceMappingCreateEvent, handler) +} + +// SubscribeComputeAllocationResourceMappingUpdated registers a typed handler invoked whenever a +// compute_allocation_resource_mapping::update event is published. +func (b *Bus) SubscribeComputeAllocationResourceMappingUpdated(handler ComputeAllocationResourceMappingHandler) { + b.subscribeComputeAllocationResourceMapping(ComputeAllocationResourceMappingUpdateEvent, handler) +} + +// SubscribeComputeAllocationResourceMappingDeleted registers a typed handler invoked whenever a +// compute_allocation_resource_mapping::delete event is published. +func (b *Bus) SubscribeComputeAllocationResourceMappingDeleted(handler ComputeAllocationResourceMappingHandler) { + b.subscribeComputeAllocationResourceMapping(ComputeAllocationResourceMappingDeleteEvent, handler) +} + +func (b *Bus) subscribeComputeAllocationResourceMapping(topic EventType, handler ComputeAllocationResourceMappingHandler) { + b.Subscribe(topic, func(event Event, value interface{}) { + switch m := value.(type) { + case models.ComputeAllocationResourceMapping: + handler(m) + case *models.ComputeAllocationResourceMapping: + if m != nil { + handler(*m) + } + default: + slog.Warn("compute allocation resource mapping event payload has unexpected type", + "type", event.Type, + "got", value, + ) + } + }) +} diff --git a/pkg/events/compute_allocation_resource_subscribe.go b/pkg/events/compute_allocation_resource_subscribe.go new file mode 100644 index 000000000..bca3308b5 --- /dev/null +++ b/pkg/events/compute_allocation_resource_subscribe.go @@ -0,0 +1,48 @@ +package events + +import ( + "log/slog" + + "github.com/apache/airavata-custos/pkg/models" +) + +// ComputeAllocationResourceHandler handles compute allocation resource lifecycle events with a typed payload. +type ComputeAllocationResourceHandler func(resource models.ComputeAllocationResource) + +// SubscribeComputeAllocationResourceCreated registers a typed handler invoked whenever a +// compute_allocation_resource::create event is published. Events with payloads that are +// not a models.ComputeAllocationResource (or *models.ComputeAllocationResource) are dropped +// with a warning log. +func (b *Bus) SubscribeComputeAllocationResourceCreated(handler ComputeAllocationResourceHandler) { + b.subscribeComputeAllocationResource(ComputeAllocationResourceCreateEvent, handler) +} + +// SubscribeComputeAllocationResourceUpdated registers a typed handler invoked whenever a +// compute_allocation_resource::update event is published. +func (b *Bus) SubscribeComputeAllocationResourceUpdated(handler ComputeAllocationResourceHandler) { + b.subscribeComputeAllocationResource(ComputeAllocationResourceUpdateEvent, handler) +} + +// SubscribeComputeAllocationResourceDeleted registers a typed handler invoked whenever a +// compute_allocation_resource::delete event is published. +func (b *Bus) SubscribeComputeAllocationResourceDeleted(handler ComputeAllocationResourceHandler) { + b.subscribeComputeAllocationResource(ComputeAllocationResourceDeleteEvent, handler) +} + +func (b *Bus) subscribeComputeAllocationResource(topic EventType, handler ComputeAllocationResourceHandler) { + b.Subscribe(topic, func(event Event, value interface{}) { + switch r := value.(type) { + case models.ComputeAllocationResource: + handler(r) + case *models.ComputeAllocationResource: + if r != nil { + handler(*r) + } + default: + slog.Warn("compute allocation resource event payload has unexpected type", + "type", event.Type, + "got", value, + ) + } + }) +} diff --git a/pkg/events/compute_allocation_subscribe.go b/pkg/events/compute_allocation_subscribe.go new file mode 100644 index 000000000..93fc2e3c8 --- /dev/null +++ b/pkg/events/compute_allocation_subscribe.go @@ -0,0 +1,48 @@ +package events + +import ( + "log/slog" + + "github.com/apache/airavata-custos/pkg/models" +) + +// ComputeAllocationHandler handles compute allocation lifecycle events with a typed payload. +type ComputeAllocationHandler func(allocation models.ComputeAllocation) + +// SubscribeComputeAllocationCreated registers a typed handler invoked whenever a +// compute_allocation::create event is published. Events with payloads that are +// not a models.ComputeAllocation (or *models.ComputeAllocation) are dropped +// with a warning log. +func (b *Bus) SubscribeComputeAllocationCreated(handler ComputeAllocationHandler) { + b.subscribeComputeAllocation(ComputeAllocationCreateEvent, handler) +} + +// SubscribeComputeAllocationUpdated registers a typed handler invoked whenever a +// compute_allocation::update event is published. +func (b *Bus) SubscribeComputeAllocationUpdated(handler ComputeAllocationHandler) { + b.subscribeComputeAllocation(ComputeAllocationUpdateEvent, handler) +} + +// SubscribeComputeAllocationDeleted registers a typed handler invoked whenever a +// compute_allocation::delete event is published. +func (b *Bus) SubscribeComputeAllocationDeleted(handler ComputeAllocationHandler) { + b.subscribeComputeAllocation(ComputeAllocationDeleteEvent, handler) +} + +func (b *Bus) subscribeComputeAllocation(topic EventType, handler ComputeAllocationHandler) { + b.Subscribe(topic, func(event Event, value interface{}) { + switch a := value.(type) { + case models.ComputeAllocation: + handler(a) + case *models.ComputeAllocation: + if a != nil { + handler(*a) + } + default: + slog.Warn("compute allocation event payload has unexpected type", + "type", event.Type, + "got", value, + ) + } + }) +} diff --git a/pkg/events/organization_subscribe.go b/pkg/events/organization_subscribe.go new file mode 100644 index 000000000..68a6b528e --- /dev/null +++ b/pkg/events/organization_subscribe.go @@ -0,0 +1,47 @@ +package events + +import ( + "log/slog" + + "github.com/apache/airavata-custos/pkg/models" +) + +// OrganizationHandler handles organization lifecycle events with a typed payload. +type OrganizationHandler func(organization models.Organization) + +// SubscribeOrganizationCreated registers a typed handler invoked whenever an +// organization::create event is published. Events with payloads that are not a +// models.Organization (or *models.Organization) are dropped with a warning log. +func (b *Bus) SubscribeOrganizationCreated(handler OrganizationHandler) { + b.subscribeOrganization(OrganizationCreateEvent, handler) +} + +// SubscribeOrganizationUpdated registers a typed handler invoked whenever an +// organization::update event is published. +func (b *Bus) SubscribeOrganizationUpdated(handler OrganizationHandler) { + b.subscribeOrganization(OrganizationUpdateEvent, handler) +} + +// SubscribeOrganizationDeleted registers a typed handler invoked whenever an +// organization::delete event is published. +func (b *Bus) SubscribeOrganizationDeleted(handler OrganizationHandler) { + b.subscribeOrganization(OrganizationDeleteEvent, handler) +} + +func (b *Bus) subscribeOrganization(topic EventType, handler OrganizationHandler) { + b.Subscribe(topic, func(event Event, value interface{}) { + switch o := value.(type) { + case models.Organization: + handler(o) + case *models.Organization: + if o != nil { + handler(*o) + } + default: + slog.Warn("organization event payload has unexpected type", + "type", event.Type, + "got", value, + ) + } + }) +} diff --git a/pkg/events/project_subscribe.go b/pkg/events/project_subscribe.go new file mode 100644 index 000000000..da7cb7819 --- /dev/null +++ b/pkg/events/project_subscribe.go @@ -0,0 +1,49 @@ +package events + +import ( + "log/slog" + + "github.com/apache/airavata-custos/pkg/models" +) + +// ProjectHandler handles project lifecycle events with a typed payload. +type ProjectHandler func(project models.Project) + +// SubscribeProjectCreated registers a typed handler invoked whenever a +// project::create event is published. Events with payloads that are not a +// models.Project (or *models.Project) are dropped with a warning log. +func (b *Bus) SubscribeProjectCreated(handler ProjectHandler) { + b.subscribeProject(ProjectCreateEvent, handler) +} + +// SubscribeProjectUpdated registers a typed handler invoked whenever a +// project::update event is published. Events with payloads that are not a +// models.Project (or *models.Project) are dropped with a warning log. +func (b *Bus) SubscribeProjectUpdated(handler ProjectHandler) { + b.subscribeProject(ProjectUpdateEvent, handler) +} + +// SubscribeProjectDeleted registers a typed handler invoked whenever a +// project::delete event is published. Events with payloads that are not a +// models.Project (or *models.Project) are dropped with a warning log. +func (b *Bus) SubscribeProjectDeleted(handler ProjectHandler) { + b.subscribeProject(ProjectDeleteEvent, handler) +} + +func (b *Bus) subscribeProject(topic EventType, handler ProjectHandler) { + b.Subscribe(topic, func(event Event, value interface{}) { + switch p := value.(type) { + case models.Project: + handler(p) + case *models.Project: + if p != nil { + handler(*p) + } + default: + slog.Warn("project event payload has unexpected type", + "type", event.Type, + "got", value, + ) + } + }) +} diff --git a/pkg/events/types.go b/pkg/events/types.go new file mode 100644 index 000000000..11c900e94 --- /dev/null +++ b/pkg/events/types.go @@ -0,0 +1,115 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// Package events defines message types emitted by the service for +// downstream consumers (audit, projections, integrations). +package events + +import ( + "sync" +) + +// EventType identifies the kind of event carried on the bus. +type EventType string + +// Project lifecycle message types. +const ( + ProjectCreateEvent EventType = "project::create" + ProjectUpdateEvent EventType = "project::update" + ProjectDeleteEvent EventType = "project::delete" +) + +// User lifecycle message types. +const ( + UserCreateEvent EventType = "user::create" + UserUpdateEvent EventType = "user::update" + UserDeleteEvent EventType = "user::delete" +) + +// Organization lifecycle message types. +const ( + OrganizationCreateEvent EventType = "organization::create" + OrganizationUpdateEvent EventType = "organization::update" + OrganizationDeleteEvent EventType = "organization::delete" +) + +// ComputeCluster lifecycle message types. +const ( + ComputeClusterCreateEvent EventType = "compute_cluster::create" + ComputeClusterUpdateEvent EventType = "compute_cluster::update" + ComputeClusterDeleteEvent EventType = "compute_cluster::delete" +) + +// ClusterAccount lifecycle message types. +const ( + ClusterAccountCreateEvent EventType = "cluster_account::create" + ClusterAccountUpdateEvent EventType = "cluster_account::update" + ClusterAccountDeleteEvent EventType = "cluster_account::delete" +) + +// ComputeAllocation lifecycle message types. +const ( + ComputeAllocationCreateEvent EventType = "compute_allocation::create" + ComputeAllocationUpdateEvent EventType = "compute_allocation::update" + ComputeAllocationDeleteEvent EventType = "compute_allocation::delete" +) + +// CreateComputeAllocationDiff lifecycle message types. +const ( + ComputeAllocationDiffCreateEvent EventType = "compute_allocation_diff::create" + ComputeAllocationDiffUpdateEvent EventType = "compute_allocation_diff::update" + ComputeAllocationDiffDeleteEvent EventType = "compute_allocation_diff::delete" +) + +// ComputeAllocationResource lifecycle message types. +const ( + ComputeAllocationResourceCreateEvent EventType = "compute_allocation_resource::create" + ComputeAllocationResourceUpdateEvent EventType = "compute_allocation_resource::update" + ComputeAllocationResourceDeleteEvent EventType = "compute_allocation_resource::delete" +) + +// ComputeAllocationMembership lifecycle message types. +const ( + ComputeAllocationMembershipCreateEvent EventType = "compute_allocation_membership::create" + ComputeAllocationMembershipUpdateEvent EventType = "compute_allocation_membership::update" + ComputeAllocationMembershipDeleteEvent EventType = "compute_allocation_membership::delete" +) + +// ComputeAllocationResourceMapping lifecycle message types. +const ( + ComputeAllocationResourceMappingCreateEvent EventType = "compute_allocation_resource_mapping::create" + ComputeAllocationResourceMappingUpdateEvent EventType = "compute_allocation_resource_mapping::update" + ComputeAllocationResourceMappingDeleteEvent EventType = "compute_allocation_resource_mapping::delete" +) + +// Event represents a change in the system that downstream consumers may be interested in. +// The payload is the full record after the change (e.g. the +// new state of a project after an update). +type Event struct { + Type EventType `json:"type"` + Payload interface{} `json:"payload"` +} + +// EventSubscriberFunc is a function type that can be registered to receive events from the bus. +type EventSubscriberFunc func(event Event, value interface{}) + +// Bus is a lightweight, in-memory, topic-based pub/sub event bus. +// Modules publish and subscribe by topic without knowing about each other. +type Bus struct { + mu sync.RWMutex + subs map[string][]EventSubscriberFunc +} diff --git a/pkg/events/user_subscribe.go b/pkg/events/user_subscribe.go new file mode 100644 index 000000000..7c08a49c5 --- /dev/null +++ b/pkg/events/user_subscribe.go @@ -0,0 +1,47 @@ +package events + +import ( + "log/slog" + + "github.com/apache/airavata-custos/pkg/models" +) + +// UserHandler handles user lifecycle events with a typed payload. +type UserHandler func(user models.User) + +// SubscribeUserCreated registers a typed handler invoked whenever a +// user::create event is published. Events with payloads that are not a +// models.User (or *models.User) are dropped with a warning log. +func (b *Bus) SubscribeUserCreated(handler UserHandler) { + b.subscribeUser(UserCreateEvent, handler) +} + +// SubscribeUserUpdated registers a typed handler invoked whenever a +// user::update event is published. +func (b *Bus) SubscribeUserUpdated(handler UserHandler) { + b.subscribeUser(UserUpdateEvent, handler) +} + +// SubscribeUserDeleted registers a typed handler invoked whenever a +// user::delete event is published. +func (b *Bus) SubscribeUserDeleted(handler UserHandler) { + b.subscribeUser(UserDeleteEvent, handler) +} + +func (b *Bus) subscribeUser(topic EventType, handler UserHandler) { + b.Subscribe(topic, func(event Event, value interface{}) { + switch u := value.(type) { + case models.User: + handler(u) + case *models.User: + if u != nil { + handler(*u) + } + default: + slog.Warn("user event payload has unexpected type", + "type", event.Type, + "got", value, + ) + } + }) +} diff --git a/pkg/service/compute_allocation.go b/pkg/service/compute_allocation.go index e51a624e4..47e010a48 100644 --- a/pkg/service/compute_allocation.go +++ b/pkg/service/compute_allocation.go @@ -22,6 +22,7 @@ import ( "database/sql" "fmt" + "github.com/apache/airavata-custos/pkg/events" "github.com/apache/airavata-custos/pkg/models" ) @@ -65,6 +66,8 @@ func (s *Service) CreateComputeAllocation(ctx context.Context, alloc *models.Com }); err != nil { return nil, fmt.Errorf("create compute allocation: %w", err) } + + s.eventBus.Publish(events.ComputeAllocationCreateEvent, alloc) return alloc, nil } @@ -109,6 +112,8 @@ func (s *Service) UpdateComputeAllocation(ctx context.Context, alloc *models.Com }); err != nil { return fmt.Errorf("update compute allocation: %w", err) } + + s.eventBus.Publish(events.ComputeAllocationUpdateEvent, alloc) return nil } @@ -117,10 +122,19 @@ func (s *Service) DeleteComputeAllocation(ctx context.Context, id string) error if id == "" { return fmt.Errorf("%w: compute allocation id is required", ErrInvalidInput) } + alloc, err := s.allocs.FindByID(ctx, id) + if err != nil { + return fmt.Errorf("lookup compute allocation: %w", err) + } + if alloc == nil { + return ErrNotFound + } if err := s.inTx(ctx, func(tx *sql.Tx) error { return s.allocs.Delete(ctx, tx, id) }); err != nil { return fmt.Errorf("delete compute allocation: %w", err) } + + s.eventBus.Publish(events.ComputeAllocationDeleteEvent, alloc) return nil } diff --git a/pkg/service/compute_allocation_diff.go b/pkg/service/compute_allocation_diff.go index 49a414bf2..0375e1fee 100644 --- a/pkg/service/compute_allocation_diff.go +++ b/pkg/service/compute_allocation_diff.go @@ -22,6 +22,7 @@ import ( "database/sql" "fmt" + "github.com/apache/airavata-custos/pkg/events" "github.com/apache/airavata-custos/pkg/models" ) @@ -61,6 +62,8 @@ func (s *Service) CreateComputeAllocationDiff(ctx context.Context, diff *models. }); err != nil { return nil, fmt.Errorf("create compute allocation diff: %w", err) } + + s.eventBus.Publish(events.ComputeAllocationDiffCreateEvent, diff) return diff, nil } @@ -113,10 +116,19 @@ func (s *Service) DeleteComputeAllocationDiff(ctx context.Context, id string) er if id == "" { return fmt.Errorf("%w: compute allocation diff id is required", ErrInvalidInput) } + diff, err := s.allocDiffs.FindByID(ctx, id) + if err != nil { + return fmt.Errorf("lookup compute allocation diff: %w", err) + } + if diff == nil { + return ErrNotFound + } if err := s.inTx(ctx, func(tx *sql.Tx) error { return s.allocDiffs.Delete(ctx, tx, id) }); err != nil { return fmt.Errorf("delete compute allocation diff: %w", err) } + + s.eventBus.Publish(events.ComputeAllocationDiffDeleteEvent, diff) return nil } diff --git a/pkg/service/compute_allocation_membership.go b/pkg/service/compute_allocation_membership.go index b010224c5..9d6e0056d 100644 --- a/pkg/service/compute_allocation_membership.go +++ b/pkg/service/compute_allocation_membership.go @@ -22,6 +22,7 @@ import ( "database/sql" "fmt" + "github.com/apache/airavata-custos/pkg/events" "github.com/apache/airavata-custos/pkg/models" ) @@ -69,6 +70,8 @@ func (s *Service) CreateComputeAllocationMembership(ctx context.Context, m *mode }); err != nil { return nil, fmt.Errorf("create compute allocation membership: %w", err) } + + s.eventBus.Publish(events.ComputeAllocationMembershipCreateEvent, m) return m, nil } @@ -146,6 +149,8 @@ func (s *Service) UpdateComputeAllocationMembership(ctx context.Context, m *mode }); err != nil { return nil, fmt.Errorf("update compute allocation membership: %w", err) } + + s.eventBus.Publish(events.ComputeAllocationMembershipUpdateEvent, m) return m, nil } @@ -171,6 +176,8 @@ func (s *Service) UpdateMembershipAllocationAmount(ctx context.Context, id strin }); err != nil { return nil, fmt.Errorf("update compute allocation membership amount: %w", err) } + + s.eventBus.Publish(events.ComputeAllocationMembershipUpdateEvent, existing) return existing, nil } @@ -196,6 +203,8 @@ func (s *Service) UpdateMembershipStatus(ctx context.Context, id string, status }); err != nil { return nil, fmt.Errorf("update compute allocation membership status: %w", err) } + + s.eventBus.Publish(events.ComputeAllocationMembershipUpdateEvent, existing) return existing, nil } @@ -216,5 +225,7 @@ func (s *Service) DeleteComputeAllocationMembership(ctx context.Context, id stri }); err != nil { return fmt.Errorf("delete compute allocation membership: %w", err) } + + s.eventBus.Publish(events.ComputeAllocationMembershipDeleteEvent, existing) return nil } diff --git a/pkg/service/compute_allocation_resource.go b/pkg/service/compute_allocation_resource.go index 433f019d5..4efb28b51 100644 --- a/pkg/service/compute_allocation_resource.go +++ b/pkg/service/compute_allocation_resource.go @@ -22,6 +22,7 @@ import ( "database/sql" "fmt" + "github.com/apache/airavata-custos/pkg/events" "github.com/apache/airavata-custos/pkg/models" ) @@ -46,6 +47,8 @@ func (s *Service) CreateComputeAllocationResource(ctx context.Context, resource }); err != nil { return nil, fmt.Errorf("create compute allocation resource: %w", err) } + + s.eventBus.Publish(events.ComputeAllocationResourceCreateEvent, resource) return resource, nil } @@ -81,6 +84,8 @@ func (s *Service) UpdateComputeAllocationResource(ctx context.Context, resource }); err != nil { return fmt.Errorf("update compute allocation resource: %w", err) } + + s.eventBus.Publish(events.ComputeAllocationResourceUpdateEvent, resource) return nil } @@ -89,10 +94,19 @@ func (s *Service) DeleteComputeAllocationResource(ctx context.Context, id string if id == "" { return fmt.Errorf("%w: compute allocation resource id is required", ErrInvalidInput) } + resource, err := s.resources.FindByID(ctx, id) + if err != nil { + return fmt.Errorf("lookup compute allocation resource: %w", err) + } + if resource == nil { + return ErrNotFound + } if err := s.inTx(ctx, func(tx *sql.Tx) error { return s.resources.Delete(ctx, tx, id) }); err != nil { return fmt.Errorf("delete compute allocation resource: %w", err) } + + s.eventBus.Publish(events.ComputeAllocationResourceDeleteEvent, resource) return nil } diff --git a/pkg/service/compute_allocation_resource_mapping.go b/pkg/service/compute_allocation_resource_mapping.go index 84c4b52e0..d0c96014c 100644 --- a/pkg/service/compute_allocation_resource_mapping.go +++ b/pkg/service/compute_allocation_resource_mapping.go @@ -22,6 +22,7 @@ import ( "database/sql" "fmt" + "github.com/apache/airavata-custos/pkg/events" "github.com/apache/airavata-custos/pkg/models" ) @@ -65,6 +66,8 @@ func (s *Service) AttachResourceToAllocation(ctx context.Context, allocationID, }); err != nil { return nil, fmt.Errorf("attach resource to allocation: %w", err) } + + s.eventBus.Publish(events.ComputeAllocationResourceMappingCreateEvent, mapping) return mapping, nil } @@ -89,6 +92,8 @@ func (s *Service) DetachResourceFromAllocation(ctx context.Context, allocationID }); err != nil { return fmt.Errorf("detach resource from allocation: %w", err) } + + s.eventBus.Publish(events.ComputeAllocationResourceMappingDeleteEvent, existing) return nil } diff --git a/pkg/service/compute_cluster.go b/pkg/service/compute_cluster.go index 106be4a21..41198709b 100644 --- a/pkg/service/compute_cluster.go +++ b/pkg/service/compute_cluster.go @@ -22,6 +22,7 @@ import ( "database/sql" "fmt" + "github.com/apache/airavata-custos/pkg/events" "github.com/apache/airavata-custos/pkg/models" ) @@ -49,6 +50,8 @@ func (s *Service) CreateComputeCluster(ctx context.Context, cluster *models.Comp }); err != nil { return nil, fmt.Errorf("create compute cluster: %w", err) } + + s.eventBus.Publish(events.ComputeAllocationCreateEvent, cluster) return cluster, nil } @@ -99,6 +102,8 @@ func (s *Service) UpdateComputeCluster(ctx context.Context, cluster *models.Comp }); err != nil { return fmt.Errorf("update compute cluster: %w", err) } + + s.eventBus.Publish(events.ComputeAllocationUpdateEvent, cluster) return nil } @@ -107,10 +112,19 @@ func (s *Service) DeleteComputeCluster(ctx context.Context, id string) error { if id == "" { return fmt.Errorf("%w: compute cluster id is required", ErrInvalidInput) } + cluster, err := s.clusters.FindByID(ctx, id) + if err != nil { + return fmt.Errorf("lookup compute cluster: %w", err) + } + if cluster == nil { + return ErrNotFound + } if err := s.inTx(ctx, func(tx *sql.Tx) error { return s.clusters.Delete(ctx, tx, id) }); err != nil { return fmt.Errorf("delete compute cluster: %w", err) } + + s.eventBus.Publish(events.ComputeAllocationDeleteEvent, cluster) return nil } diff --git a/pkg/service/organization.go b/pkg/service/organization.go index bca060a06..50effd1e7 100644 --- a/pkg/service/organization.go +++ b/pkg/service/organization.go @@ -22,6 +22,7 @@ import ( "database/sql" "fmt" + "github.com/apache/airavata-custos/pkg/events" "github.com/apache/airavata-custos/pkg/models" ) @@ -51,6 +52,8 @@ func (s *Service) CreateOrganization(ctx context.Context, org *models.Organizati }); err != nil { return nil, fmt.Errorf("create organization: %w", err) } + + s.eventBus.Publish(events.OrganizationCreateEvent, org) return org, nil } @@ -89,6 +92,8 @@ func (s *Service) UpdateOrganization(ctx context.Context, org *models.Organizati }); err != nil { return fmt.Errorf("update organization: %w", err) } + + s.eventBus.Publish(events.OrganizationUpdateEvent, org) return nil } @@ -97,10 +102,19 @@ func (s *Service) DeleteOrganization(ctx context.Context, id string) error { if id == "" { return fmt.Errorf("%w: organization id is required", ErrInvalidInput) } + org, err := s.orgs.FindByID(ctx, id) + if err != nil { + return fmt.Errorf("lookup organization: %w", err) + } + if org == nil { + return ErrNotFound + } if err := s.inTx(ctx, func(tx *sql.Tx) error { return s.orgs.Delete(ctx, tx, id) }); err != nil { return fmt.Errorf("delete organization: %w", err) } + + s.eventBus.Publish(events.OrganizationDeleteEvent, org) return nil } diff --git a/pkg/service/project.go b/pkg/service/project.go index f503a537d..938ff4f8e 100644 --- a/pkg/service/project.go +++ b/pkg/service/project.go @@ -22,6 +22,7 @@ import ( "database/sql" "fmt" + "github.com/apache/airavata-custos/pkg/events" "github.com/apache/airavata-custos/pkg/models" ) @@ -64,6 +65,8 @@ func (s *Service) CreateProject(ctx context.Context, project *models.Project) (* }); err != nil { return nil, fmt.Errorf("create project: %w", err) } + + s.eventBus.Publish(events.ProjectCreateEvent, project) return project, nil } @@ -110,6 +113,8 @@ func (s *Service) UpdateProject(ctx context.Context, project *models.Project) er }); err != nil { return fmt.Errorf("update project: %w", err) } + + s.eventBus.Publish(events.ProjectUpdateEvent, project) return nil } @@ -118,10 +123,19 @@ func (s *Service) DeleteProject(ctx context.Context, id string) error { if id == "" { return fmt.Errorf("%w: project id is required", ErrInvalidInput) } + project, err := s.projs.FindByID(ctx, id) + if err != nil { + return fmt.Errorf("lookup project: %w", err) + } + if project == nil { + return ErrNotFound + } if err := s.inTx(ctx, func(tx *sql.Tx) error { return s.projs.Delete(ctx, tx, id) }); err != nil { return fmt.Errorf("delete project: %w", err) } + + s.eventBus.Publish(events.ProjectDeleteEvent, project) return nil } diff --git a/pkg/service/service.go b/pkg/service/service.go index 6fd70c88f..935630ac5 100644 --- a/pkg/service/service.go +++ b/pkg/service/service.go @@ -27,6 +27,7 @@ import ( "github.com/apache/airavata-custos/internal/db" "github.com/apache/airavata-custos/internal/store" + "github.com/apache/airavata-custos/pkg/events" ) // Service is a high-level façade over the underlying stores. It wraps each @@ -34,6 +35,7 @@ import ( // *sql.Tx themselves. type Service struct { db *sqlx.DB + eventBus *events.Bus orgs store.OrganizationStore users store.UserStore projs store.ProjectStore @@ -51,9 +53,10 @@ type Service struct { // New constructs a Service backed by the supplied database handle. // Stores are instantiated internally using the default MySQL implementations. -func New(database *sqlx.DB) *Service { +func New(database *sqlx.DB, eventBus *events.Bus) *Service { return &Service{ db: database, + eventBus: eventBus, orgs: store.NewOrganizationStore(database), users: store.NewUserStore(database), projs: store.NewProjectStore(database), @@ -75,6 +78,7 @@ func New(database *sqlx.DB) *Service { // external callers. func NewWithStores( database *sqlx.DB, + eventBus *events.Bus, orgs store.OrganizationStore, users store.UserStore, projs store.ProjectStore, @@ -91,6 +95,7 @@ func NewWithStores( ) *Service { return &Service{ db: database, + eventBus: eventBus, orgs: orgs, users: users, projs: projs,
