This is an automated email from the ASF dual-hosted git repository. lahirujayathilake pushed a commit to branch tracing-impl in repository https://gitbox.apache.org/repos/asf/airavata-custos.git
commit e953640ea8df3ccf18f43cd850265545f4f5da44 Author: lahiruj <[email protected]> AuthorDate: Fri Jun 5 00:44:30 2026 -0400 Include tracing for AMIE packet handlers --- .../AMIE-Processor/handler/data_account_create.go | 14 +++++++- .../AMIE-Processor/handler/data_project_create.go | 14 +++++++- .../handler/inform_transaction_complete.go | 14 +++++++- connectors/ACCESS/AMIE-Processor/handler/noop.go | 4 +++ .../handler/request_account_create.go | 14 +++++++- .../handler/request_account_inactivate.go | 14 +++++++- .../handler/request_account_reactivate.go | 14 +++++++- .../AMIE-Processor/handler/request_person_merge.go | 14 +++++++- .../handler/request_project_create.go | 14 +++++++- .../handler/request_project_inactivate.go | 14 +++++++- .../handler/request_project_reactivate.go | 14 +++++++- .../AMIE-Processor/handler/request_user_modify.go | 14 +++++++- .../AMIE-Processor/pipeline/integration_common.go | 36 ++++++++++++++++--- .../ACCESS/AMIE-Processor/pkg/amie/loader.go | 7 +++- .../ACCESS/AMIE-Processor/worker/processor.go | 40 ++++++++++++++++++++-- .../worker/processor_integration_test.go | 2 ++ 16 files changed, 224 insertions(+), 19 deletions(-) diff --git a/connectors/ACCESS/AMIE-Processor/handler/data_account_create.go b/connectors/ACCESS/AMIE-Processor/handler/data_account_create.go index 0312c7aaf..62f428cb9 100644 --- a/connectors/ACCESS/AMIE-Processor/handler/data_account_create.go +++ b/connectors/ACCESS/AMIE-Processor/handler/data_account_create.go @@ -24,8 +24,11 @@ import ( "fmt" "log/slog" + "go.opentelemetry.io/otel/codes" + "github.com/apache/airavata-custos/connectors/ACCESS/AMIE-Processor/model" "github.com/apache/airavata-custos/connectors/ACCESS/AMIE-Processor/store" + "github.com/apache/airavata-custos/internal/tracing" "github.com/apache/airavata-custos/pkg/service" ) @@ -42,7 +45,16 @@ func NewDataAccountCreateHandler(svc *service.Service, userDNStore store.UserDNS func (h *DataAccountCreateHandler) SupportsType() string { return "data_account_create" } -func (h *DataAccountCreateHandler) Handle(ctx context.Context, tx *sql.Tx, packetJSON map[string]any, packet *model.Packet, eventID string) error { +func (h *DataAccountCreateHandler) Handle(ctx context.Context, tx *sql.Tx, packetJSON map[string]any, packet *model.Packet, eventID string) (err error) { + ctx, span := tracing.Start(ctx, "amie.handle:"+packet.Type) + defer span.End() + defer func() { + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + } + }() + body, err := getBody(packetJSON) if err != nil { return err diff --git a/connectors/ACCESS/AMIE-Processor/handler/data_project_create.go b/connectors/ACCESS/AMIE-Processor/handler/data_project_create.go index 706c9aa80..c9ef353f9 100644 --- a/connectors/ACCESS/AMIE-Processor/handler/data_project_create.go +++ b/connectors/ACCESS/AMIE-Processor/handler/data_project_create.go @@ -24,8 +24,11 @@ import ( "fmt" "log/slog" + "go.opentelemetry.io/otel/codes" + "github.com/apache/airavata-custos/connectors/ACCESS/AMIE-Processor/model" "github.com/apache/airavata-custos/connectors/ACCESS/AMIE-Processor/store" + "github.com/apache/airavata-custos/internal/tracing" "github.com/apache/airavata-custos/pkg/service" ) @@ -42,7 +45,16 @@ func NewDataProjectCreateHandler(svc *service.Service, userDNStore store.UserDNS func (h *DataProjectCreateHandler) SupportsType() string { return "data_project_create" } -func (h *DataProjectCreateHandler) Handle(ctx context.Context, tx *sql.Tx, packetJSON map[string]any, packet *model.Packet, eventID string) error { +func (h *DataProjectCreateHandler) Handle(ctx context.Context, tx *sql.Tx, packetJSON map[string]any, packet *model.Packet, eventID string) (err error) { + ctx, span := tracing.Start(ctx, "amie.handle:"+packet.Type) + defer span.End() + defer func() { + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + } + }() + body, err := getBody(packetJSON) if err != nil { return err diff --git a/connectors/ACCESS/AMIE-Processor/handler/inform_transaction_complete.go b/connectors/ACCESS/AMIE-Processor/handler/inform_transaction_complete.go index 660681711..2ca66179a 100644 --- a/connectors/ACCESS/AMIE-Processor/handler/inform_transaction_complete.go +++ b/connectors/ACCESS/AMIE-Processor/handler/inform_transaction_complete.go @@ -23,7 +23,10 @@ import ( "fmt" "log/slog" + "go.opentelemetry.io/otel/codes" + "github.com/apache/airavata-custos/connectors/ACCESS/AMIE-Processor/model" + "github.com/apache/airavata-custos/internal/tracing" ) type InformTransactionCompleteHandler struct { @@ -38,7 +41,16 @@ func (h *InformTransactionCompleteHandler) SupportsType() string { return "inform_transaction_complete" } -func (h *InformTransactionCompleteHandler) Handle(ctx context.Context, tx *sql.Tx, packetJSON map[string]any, packet *model.Packet, eventID string) error { +func (h *InformTransactionCompleteHandler) Handle(ctx context.Context, tx *sql.Tx, packetJSON map[string]any, packet *model.Packet, eventID string) (err error) { + ctx, span := tracing.Start(ctx, "amie.handle:"+packet.Type) + defer span.End() + defer func() { + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + } + }() + body, err := getBody(packetJSON) if err != nil { return err diff --git a/connectors/ACCESS/AMIE-Processor/handler/noop.go b/connectors/ACCESS/AMIE-Processor/handler/noop.go index b09931f95..0803c565b 100644 --- a/connectors/ACCESS/AMIE-Processor/handler/noop.go +++ b/connectors/ACCESS/AMIE-Processor/handler/noop.go @@ -23,6 +23,7 @@ import ( "log/slog" "github.com/apache/airavata-custos/connectors/ACCESS/AMIE-Processor/model" + "github.com/apache/airavata-custos/internal/tracing" ) type NoOpHandler struct{} @@ -36,6 +37,9 @@ func (h *NoOpHandler) SupportsType() string { } func (h *NoOpHandler) Handle(ctx context.Context, _ *sql.Tx, _ map[string]any, packet *model.Packet, _ string) error { + _, span := tracing.Start(ctx, "amie.handle:"+packet.Type) + defer span.End() + slog.WarnContext(ctx, "NoOp handler invoked for unknown packet type", "packetID", packet.ID, "packetType", packet.Type, diff --git a/connectors/ACCESS/AMIE-Processor/handler/request_account_create.go b/connectors/ACCESS/AMIE-Processor/handler/request_account_create.go index 9896916f5..d4fb916c4 100644 --- a/connectors/ACCESS/AMIE-Processor/handler/request_account_create.go +++ b/connectors/ACCESS/AMIE-Processor/handler/request_account_create.go @@ -23,7 +23,10 @@ import ( "errors" "fmt" + "go.opentelemetry.io/otel/codes" + "github.com/apache/airavata-custos/connectors/ACCESS/AMIE-Processor/model" + "github.com/apache/airavata-custos/internal/tracing" "github.com/apache/airavata-custos/pkg/models" "github.com/apache/airavata-custos/pkg/service" ) @@ -46,7 +49,16 @@ func (h *RequestAccountCreateHandler) SupportsType() string { return "request_ac // ComputeClusterUser on the configured cluster, and attaches a // ComputeAllocationMembership against the project's allocation. Replies with // the assigned posix username. -func (h *RequestAccountCreateHandler) Handle(ctx context.Context, tx *sql.Tx, packetJSON map[string]any, packet *model.Packet, eventID string) error { +func (h *RequestAccountCreateHandler) Handle(ctx context.Context, tx *sql.Tx, packetJSON map[string]any, packet *model.Packet, eventID string) (err error) { + ctx, span := tracing.Start(ctx, "amie.handle:"+packet.Type) + defer span.End() + defer func() { + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + } + }() + body, err := getBody(packetJSON) if err != nil { return err diff --git a/connectors/ACCESS/AMIE-Processor/handler/request_account_inactivate.go b/connectors/ACCESS/AMIE-Processor/handler/request_account_inactivate.go index ff5e188b6..4a0e58b4c 100644 --- a/connectors/ACCESS/AMIE-Processor/handler/request_account_inactivate.go +++ b/connectors/ACCESS/AMIE-Processor/handler/request_account_inactivate.go @@ -22,7 +22,10 @@ import ( "database/sql" "fmt" + "go.opentelemetry.io/otel/codes" + "github.com/apache/airavata-custos/connectors/ACCESS/AMIE-Processor/model" + "github.com/apache/airavata-custos/internal/tracing" "github.com/apache/airavata-custos/pkg/models" "github.com/apache/airavata-custos/pkg/service" ) @@ -39,7 +42,16 @@ func NewRequestAccountInactivateHandler(svc *service.Service, amieClient AmieCli func (h *RequestAccountInactivateHandler) SupportsType() string { return "request_account_inactivate" } -func (h *RequestAccountInactivateHandler) Handle(ctx context.Context, tx *sql.Tx, packetJSON map[string]any, packet *model.Packet, eventID string) error { +func (h *RequestAccountInactivateHandler) Handle(ctx context.Context, tx *sql.Tx, packetJSON map[string]any, packet *model.Packet, eventID string) (err error) { + ctx, span := tracing.Start(ctx, "amie.handle:"+packet.Type) + defer span.End() + defer func() { + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + } + }() + body, err := getBody(packetJSON) if err != nil { return err diff --git a/connectors/ACCESS/AMIE-Processor/handler/request_account_reactivate.go b/connectors/ACCESS/AMIE-Processor/handler/request_account_reactivate.go index 50f248493..fc9e56e0a 100644 --- a/connectors/ACCESS/AMIE-Processor/handler/request_account_reactivate.go +++ b/connectors/ACCESS/AMIE-Processor/handler/request_account_reactivate.go @@ -22,7 +22,10 @@ import ( "database/sql" "fmt" + "go.opentelemetry.io/otel/codes" + "github.com/apache/airavata-custos/connectors/ACCESS/AMIE-Processor/model" + "github.com/apache/airavata-custos/internal/tracing" "github.com/apache/airavata-custos/pkg/models" "github.com/apache/airavata-custos/pkg/service" ) @@ -39,7 +42,16 @@ func NewRequestAccountReactivateHandler(svc *service.Service, amieClient AmieCli func (h *RequestAccountReactivateHandler) SupportsType() string { return "request_account_reactivate" } -func (h *RequestAccountReactivateHandler) Handle(ctx context.Context, tx *sql.Tx, packetJSON map[string]any, packet *model.Packet, eventID string) error { +func (h *RequestAccountReactivateHandler) Handle(ctx context.Context, tx *sql.Tx, packetJSON map[string]any, packet *model.Packet, eventID string) (err error) { + ctx, span := tracing.Start(ctx, "amie.handle:"+packet.Type) + defer span.End() + defer func() { + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + } + }() + body, err := getBody(packetJSON) if err != nil { return err diff --git a/connectors/ACCESS/AMIE-Processor/handler/request_person_merge.go b/connectors/ACCESS/AMIE-Processor/handler/request_person_merge.go index 691e764a5..06a9e9352 100644 --- a/connectors/ACCESS/AMIE-Processor/handler/request_person_merge.go +++ b/connectors/ACCESS/AMIE-Processor/handler/request_person_merge.go @@ -23,8 +23,11 @@ import ( "fmt" "log/slog" + "go.opentelemetry.io/otel/codes" + "github.com/apache/airavata-custos/connectors/ACCESS/AMIE-Processor/model" "github.com/apache/airavata-custos/connectors/ACCESS/AMIE-Processor/store" + "github.com/apache/airavata-custos/internal/tracing" "github.com/apache/airavata-custos/pkg/models" "github.com/apache/airavata-custos/pkg/service" ) @@ -42,7 +45,16 @@ func NewRequestPersonMergeHandler(svc *service.Service, userDNStore store.UserDN func (h *RequestPersonMergeHandler) SupportsType() string { return "request_person_merge" } -func (h *RequestPersonMergeHandler) Handle(ctx context.Context, tx *sql.Tx, packetJSON map[string]any, packet *model.Packet, eventID string) error { +func (h *RequestPersonMergeHandler) Handle(ctx context.Context, tx *sql.Tx, packetJSON map[string]any, packet *model.Packet, eventID string) (err error) { + ctx, span := tracing.Start(ctx, "amie.handle:"+packet.Type) + defer span.End() + defer func() { + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + } + }() + body, err := getBody(packetJSON) if err != nil { return err diff --git a/connectors/ACCESS/AMIE-Processor/handler/request_project_create.go b/connectors/ACCESS/AMIE-Processor/handler/request_project_create.go index 178f2be4e..66dc5ae77 100644 --- a/connectors/ACCESS/AMIE-Processor/handler/request_project_create.go +++ b/connectors/ACCESS/AMIE-Processor/handler/request_project_create.go @@ -24,7 +24,10 @@ import ( "fmt" "strings" + "go.opentelemetry.io/otel/codes" + "github.com/apache/airavata-custos/connectors/ACCESS/AMIE-Processor/model" + "github.com/apache/airavata-custos/internal/tracing" "github.com/apache/airavata-custos/pkg/models" "github.com/apache/airavata-custos/pkg/service" ) @@ -46,7 +49,16 @@ func (h *RequestProjectCreateHandler) SupportsType() string { return "request_pr // PiOrganization), creates (or finds) the Project, and creates a // ComputeAllocation populated from the packet body's ServiceUnitsAllocated, // StartDate and EndDate. -func (h *RequestProjectCreateHandler) Handle(ctx context.Context, tx *sql.Tx, packetJSON map[string]any, packet *model.Packet, eventID string) error { +func (h *RequestProjectCreateHandler) Handle(ctx context.Context, tx *sql.Tx, packetJSON map[string]any, packet *model.Packet, eventID string) (err error) { + ctx, span := tracing.Start(ctx, "amie.handle:"+packet.Type) + defer span.End() + defer func() { + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + } + }() + body, err := getBody(packetJSON) if err != nil { return err diff --git a/connectors/ACCESS/AMIE-Processor/handler/request_project_inactivate.go b/connectors/ACCESS/AMIE-Processor/handler/request_project_inactivate.go index f3bee420a..b14c1434d 100644 --- a/connectors/ACCESS/AMIE-Processor/handler/request_project_inactivate.go +++ b/connectors/ACCESS/AMIE-Processor/handler/request_project_inactivate.go @@ -24,7 +24,10 @@ import ( "fmt" "log/slog" + "go.opentelemetry.io/otel/codes" + "github.com/apache/airavata-custos/connectors/ACCESS/AMIE-Processor/model" + "github.com/apache/airavata-custos/internal/tracing" "github.com/apache/airavata-custos/pkg/models" "github.com/apache/airavata-custos/pkg/service" ) @@ -41,7 +44,16 @@ func NewRequestProjectInactivateHandler(svc *service.Service, amieClient AmieCli func (h *RequestProjectInactivateHandler) SupportsType() string { return "request_project_inactivate" } -func (h *RequestProjectInactivateHandler) Handle(ctx context.Context, tx *sql.Tx, packetJSON map[string]any, packet *model.Packet, eventID string) error { +func (h *RequestProjectInactivateHandler) Handle(ctx context.Context, tx *sql.Tx, packetJSON map[string]any, packet *model.Packet, eventID string) (err error) { + ctx, span := tracing.Start(ctx, "amie.handle:"+packet.Type) + defer span.End() + defer func() { + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + } + }() + body, err := getBody(packetJSON) if err != nil { return err diff --git a/connectors/ACCESS/AMIE-Processor/handler/request_project_reactivate.go b/connectors/ACCESS/AMIE-Processor/handler/request_project_reactivate.go index 825d948a5..2ea169d8c 100644 --- a/connectors/ACCESS/AMIE-Processor/handler/request_project_reactivate.go +++ b/connectors/ACCESS/AMIE-Processor/handler/request_project_reactivate.go @@ -24,7 +24,10 @@ import ( "fmt" "log/slog" + "go.opentelemetry.io/otel/codes" + "github.com/apache/airavata-custos/connectors/ACCESS/AMIE-Processor/model" + "github.com/apache/airavata-custos/internal/tracing" "github.com/apache/airavata-custos/pkg/models" "github.com/apache/airavata-custos/pkg/service" ) @@ -44,7 +47,16 @@ func (h *RequestProjectReactivateHandler) SupportsType() string { return "reques // Handle flips the Project and all of its ComputeAllocations back to ACTIVE. // Per AMIE protocol, only the PI's membership is reactivated automatically; // other members must request reactivation via request_account_reactivate. -func (h *RequestProjectReactivateHandler) Handle(ctx context.Context, tx *sql.Tx, packetJSON map[string]any, packet *model.Packet, eventID string) error { +func (h *RequestProjectReactivateHandler) Handle(ctx context.Context, tx *sql.Tx, packetJSON map[string]any, packet *model.Packet, eventID string) (err error) { + ctx, span := tracing.Start(ctx, "amie.handle:"+packet.Type) + defer span.End() + defer func() { + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + } + }() + body, err := getBody(packetJSON) if err != nil { return err diff --git a/connectors/ACCESS/AMIE-Processor/handler/request_user_modify.go b/connectors/ACCESS/AMIE-Processor/handler/request_user_modify.go index c62f015c4..dd94e962f 100644 --- a/connectors/ACCESS/AMIE-Processor/handler/request_user_modify.go +++ b/connectors/ACCESS/AMIE-Processor/handler/request_user_modify.go @@ -26,8 +26,11 @@ import ( "log/slog" "strings" + "go.opentelemetry.io/otel/codes" + "github.com/apache/airavata-custos/connectors/ACCESS/AMIE-Processor/model" "github.com/apache/airavata-custos/connectors/ACCESS/AMIE-Processor/store" + "github.com/apache/airavata-custos/internal/tracing" "github.com/apache/airavata-custos/pkg/models" "github.com/apache/airavata-custos/pkg/service" ) @@ -52,7 +55,16 @@ func NewRequestUserModifyHandler(svc *service.Service, userDNStore store.UserDNS func (h *RequestUserModifyHandler) SupportsType() string { return "request_user_modify" } -func (h *RequestUserModifyHandler) Handle(ctx context.Context, tx *sql.Tx, packetJSON map[string]any, packet *model.Packet, eventID string) error { +func (h *RequestUserModifyHandler) Handle(ctx context.Context, tx *sql.Tx, packetJSON map[string]any, packet *model.Packet, eventID string) (err error) { + ctx, span := tracing.Start(ctx, "amie.handle:"+packet.Type) + defer span.End() + defer func() { + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + } + }() + body, err := getBody(packetJSON) if err != nil { return err diff --git a/connectors/ACCESS/AMIE-Processor/pipeline/integration_common.go b/connectors/ACCESS/AMIE-Processor/pipeline/integration_common.go index 51a480fbf..32650fc30 100644 --- a/connectors/ACCESS/AMIE-Processor/pipeline/integration_common.go +++ b/connectors/ACCESS/AMIE-Processor/pipeline/integration_common.go @@ -22,6 +22,7 @@ package pipeline import ( "context" "fmt" + "log/slog" "net/http" "os" "sync" @@ -29,6 +30,9 @@ import ( "time" "github.com/jmoiron/sqlx" + "github.com/prometheus/client_golang/prometheus" + "go.opentelemetry.io/otel" + sdktrace "go.opentelemetry.io/otel/sdk/trace" "github.com/apache/airavata-custos/connectors/ACCESS/AMIE-Processor/amieclient" "github.com/apache/airavata-custos/connectors/ACCESS/AMIE-Processor/config" @@ -39,6 +43,7 @@ import ( "github.com/apache/airavata-custos/connectors/ACCESS/AMIE-Processor/store" "github.com/apache/airavata-custos/connectors/ACCESS/AMIE-Processor/worker" "github.com/apache/airavata-custos/internal/db" + "github.com/apache/airavata-custos/internal/tracing" "github.com/apache/airavata-custos/pkg/events" coreservice "github.com/apache/airavata-custos/pkg/service" ) @@ -46,9 +51,11 @@ import ( const testClusterID = "00000000-0000-0000-0000-000000000001" var ( - sharedDB *sqlx.DB - sharedDBOnce sync.Once - sharedDBErr error + sharedDB *sqlx.DB + sharedDBOnce sync.Once + sharedDBErr error + tracingInitOnce sync.Once + tracingInitError error ) func isLocalAMIEConfigAvailable() bool { @@ -94,6 +101,16 @@ func setupTestDB(t *testing.T) *sqlx.DB { if sharedDBErr != nil { t.Fatalf("setup db: %v", sharedDBErr) } + tracingInitOnce.Do(func() { + _, tracingInitError = tracing.Init(tracing.InitConfig{ + Mode: tracing.ModeProduction, + Logger: slog.Default(), + ServiceName: "custos", + }) + }) + if tracingInitError != nil { + t.Fatalf("tracing init: %v", tracingInitError) + } truncateAll(t, sharedDB) seedCluster(t, sharedDB) return sharedDB @@ -200,9 +217,9 @@ func newTestPipeline(t *testing.T) *testPipeline { handler.NewNoOpHandler(), ) - met := metrics.New() + met := metrics.NewWithRegistry(prometheus.NewRegistry()) poller := worker.NewPoller(amieClient, packetStore, eventStore, met, database, cfg) - processor := worker.NewProcessor(eventStore, packetStore, errorStore, router, met, database, cfg) + processor := worker.NewProcessor(eventStore, packetStore, errorStore, router, met, auditSvc, database, cfg) ctx, cancel := context.WithCancel(context.Background()) pipe := &testPipeline{ @@ -226,6 +243,15 @@ func newTestPipeline(t *testing.T) *testPipeline { func (p *testPipeline) stop() { p.cancel() p.wg.Wait() + flushTracing() +} + +func flushTracing() { + if tp, ok := otel.GetTracerProvider().(*sdktrace.TracerProvider); ok { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + _ = tp.ForceFlush(ctx) + } } func (p *testPipeline) fireScenario(t *testing.T, scenarioType string) { diff --git a/connectors/ACCESS/AMIE-Processor/pkg/amie/loader.go b/connectors/ACCESS/AMIE-Processor/pkg/amie/loader.go index 3b95e9abe..1b6a1bb94 100644 --- a/connectors/ACCESS/AMIE-Processor/pkg/amie/loader.go +++ b/connectors/ACCESS/AMIE-Processor/pkg/amie/loader.go @@ -37,10 +37,15 @@ import ( "github.com/apache/airavata-custos/connectors/ACCESS/AMIE-Processor/store" "github.com/apache/airavata-custos/connectors/ACCESS/AMIE-Processor/worker" "github.com/apache/airavata-custos/internal/db" + "github.com/apache/airavata-custos/internal/tracing" "github.com/apache/airavata-custos/pkg/events" coreservice "github.com/apache/airavata-custos/pkg/service" ) +func init() { + tracing.RegisterTerminalMarkers("amie", "TRANSACTION_COMPLETE") +} + const connectorName = "amie" // LoadConnector skips silently when AMIE_BASE_URL / AMIE_SITE_CODE / @@ -90,7 +95,7 @@ func LoadConnector(ctx context.Context, database *sqlx.DB, eventBus *events.Bus, met := metrics.New() poller := worker.NewPoller(amie, packetStore, eventStore, met, database, cfg.AMIE) - processor := worker.NewProcessor(eventStore, packetStore, errorStore, router, met, database, cfg.AMIE) + processor := worker.NewProcessor(eventStore, packetStore, errorStore, router, met, auditSvc, database, cfg.AMIE) wg.Add(2) go func() { diff --git a/connectors/ACCESS/AMIE-Processor/worker/processor.go b/connectors/ACCESS/AMIE-Processor/worker/processor.go index 369644051..11ee0e6cf 100644 --- a/connectors/ACCESS/AMIE-Processor/worker/processor.go +++ b/connectors/ACCESS/AMIE-Processor/worker/processor.go @@ -26,12 +26,17 @@ import ( "time" "github.com/jmoiron/sqlx" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" "github.com/apache/airavata-custos/connectors/ACCESS/AMIE-Processor/config" custosdb "github.com/apache/airavata-custos/connectors/ACCESS/AMIE-Processor/db" "github.com/apache/airavata-custos/connectors/ACCESS/AMIE-Processor/model" + "github.com/apache/airavata-custos/internal/tracing" ) +const rootEventMaxBytes = 64 * 1024 + const ( // MaxAttempts is the maximum number of processing attempts before an // event is marked as permanently failed. @@ -70,23 +75,29 @@ type processorMetrics interface { StartProcessingTimer() func(handlerType string) } +type processorAuditService interface { + Log(ctx context.Context, tx *sql.Tx, packetID, eventID string, action model.AuditAction, entityType, entityID, summary string) error +} + type Processor struct { eventStore processorEventStore packetStore processorPacketStore errorStore processorErrorStore router processorRouter metrics processorMetrics + auditSvc processorAuditService db *sqlx.DB workerInterval time.Duration } -func NewProcessor(eventStore processorEventStore, packetStore processorPacketStore, errorStore processorErrorStore, router processorRouter, metrics processorMetrics, db *sqlx.DB, cfg config.AMIEConfig) *Processor { +func NewProcessor(eventStore processorEventStore, packetStore processorPacketStore, errorStore processorErrorStore, router processorRouter, metrics processorMetrics, auditSvc processorAuditService, db *sqlx.DB, cfg config.AMIEConfig) *Processor { return &Processor{ eventStore: eventStore, packetStore: packetStore, errorStore: errorStore, router: router, metrics: metrics, + auditSvc: auditSvc, db: db, workerInterval: cfg.WorkerInterval, } @@ -150,7 +161,27 @@ func (p *Processor) processPendingEvents(ctx context.Context) { // If the handler returns an error, the entire transaction is rolled back // (including the attempt increment), and the caller should record the failure // in a separate transaction. -func (p *Processor) executeInTransaction(ctx context.Context, ewp model.EventWithPacket) error { +func (p *Processor) executeInTransaction(ctx context.Context, ewp model.EventWithPacket) (err error) { + ctx, span := tracing.Start(ctx, "amie.process_event:"+string(ewp.Type)) + defer span.End() + + span.SetAttributes( + attribute.String("source", "amie"), + attribute.String("amie.event_id", ewp.ID), + attribute.String("amie.packet_id", ewp.PacketID), + attribute.String("amie.event_type", string(ewp.Type)), + ) + if raw := ewp.PacketRawJSON; raw != "" && len(raw) <= rootEventMaxBytes { + span.SetAttributes(attribute.String("root_event", raw)) + } + + defer func() { + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + } + }() + return custosdb.TxFn(ctx, p.db, func(tx *sql.Tx) error { slog.Info("Processing event", "eventId", ewp.ID, @@ -158,6 +189,11 @@ func (p *Processor) executeInTransaction(ctx context.Context, ewp model.EventWit "attempt", ewp.Attempts+1, ) + // Root marker so child audit rows have a parent that exists in the table. + if err := p.auditSvc.Log(ctx, tx, ewp.PacketID, ewp.ID, model.AuditPacketReceived, "packet", ewp.PacketID, string(ewp.Type)); err != nil { + return fmt.Errorf("audit PACKET_RECEIVED: %w", err) + } + now := time.Now().UTC() ewp.Status = model.ProcessingStatusRunning ewp.StartedAt = &now diff --git a/connectors/ACCESS/AMIE-Processor/worker/processor_integration_test.go b/connectors/ACCESS/AMIE-Processor/worker/processor_integration_test.go index d9128ca04..00f51d9a6 100644 --- a/connectors/ACCESS/AMIE-Processor/worker/processor_integration_test.go +++ b/connectors/ACCESS/AMIE-Processor/worker/processor_integration_test.go @@ -31,6 +31,7 @@ import ( "github.com/apache/airavata-custos/connectors/ACCESS/AMIE-Processor/config" "github.com/apache/airavata-custos/connectors/ACCESS/AMIE-Processor/model" + "github.com/apache/airavata-custos/connectors/ACCESS/AMIE-Processor/service" "github.com/apache/airavata-custos/connectors/ACCESS/AMIE-Processor/store" ) @@ -90,6 +91,7 @@ func newProcessor(database *sqlx.DB, router processorRouter, met *stubMetrics) * store.NewProcessingErrorStore(database), router, met, + service.NewAuditService(store.NewAuditStore(database)), database, cfg, )
