This is an automated email from the ASF dual-hosted git repository.

lahirujayathilake pushed a commit to branch tracing-impl
in repository https://gitbox.apache.org/repos/asf/airavata-custos.git

commit e953640ea8df3ccf18f43cd850265545f4f5da44
Author: lahiruj <[email protected]>
AuthorDate: Fri Jun 5 00:44:30 2026 -0400

    Include tracing for AMIE packet handlers
---
 .../AMIE-Processor/handler/data_account_create.go  | 14 +++++++-
 .../AMIE-Processor/handler/data_project_create.go  | 14 +++++++-
 .../handler/inform_transaction_complete.go         | 14 +++++++-
 connectors/ACCESS/AMIE-Processor/handler/noop.go   |  4 +++
 .../handler/request_account_create.go              | 14 +++++++-
 .../handler/request_account_inactivate.go          | 14 +++++++-
 .../handler/request_account_reactivate.go          | 14 +++++++-
 .../AMIE-Processor/handler/request_person_merge.go | 14 +++++++-
 .../handler/request_project_create.go              | 14 +++++++-
 .../handler/request_project_inactivate.go          | 14 +++++++-
 .../handler/request_project_reactivate.go          | 14 +++++++-
 .../AMIE-Processor/handler/request_user_modify.go  | 14 +++++++-
 .../AMIE-Processor/pipeline/integration_common.go  | 36 ++++++++++++++++---
 .../ACCESS/AMIE-Processor/pkg/amie/loader.go       |  7 +++-
 .../ACCESS/AMIE-Processor/worker/processor.go      | 40 ++++++++++++++++++++--
 .../worker/processor_integration_test.go           |  2 ++
 16 files changed, 224 insertions(+), 19 deletions(-)

diff --git a/connectors/ACCESS/AMIE-Processor/handler/data_account_create.go b/connectors/ACCESS/AMIE-Processor/handler/data_account_create.go
index 0312c7aaf..62f428cb9 100644
--- a/connectors/ACCESS/AMIE-Processor/handler/data_account_create.go
+++ b/connectors/ACCESS/AMIE-Processor/handler/data_account_create.go
@@ -24,8 +24,11 @@ import (
        "fmt"
        "log/slog"
 
+       "go.opentelemetry.io/otel/codes"
+
        "github.com/apache/airavata-custos/connectors/ACCESS/AMIE-Processor/model"
        "github.com/apache/airavata-custos/connectors/ACCESS/AMIE-Processor/store"
+       "github.com/apache/airavata-custos/internal/tracing"
        "github.com/apache/airavata-custos/pkg/service"
 )
 
@@ -42,7 +45,16 @@ func NewDataAccountCreateHandler(svc *service.Service, userDNStore store.UserDNS
 
 func (h *DataAccountCreateHandler) SupportsType() string { return "data_account_create" }
 
-func (h *DataAccountCreateHandler) Handle(ctx context.Context, tx *sql.Tx, packetJSON map[string]any, packet *model.Packet, eventID string) error {
+func (h *DataAccountCreateHandler) Handle(ctx context.Context, tx *sql.Tx, packetJSON map[string]any, packet *model.Packet, eventID string) (err error) {
+       ctx, span := tracing.Start(ctx, "amie.handle:"+packet.Type)
+       defer span.End()
+       defer func() {
+               if err != nil {
+                       span.RecordError(err)
+                       span.SetStatus(codes.Error, err.Error())
+               }
+       }()
+
        body, err := getBody(packetJSON)
        if err != nil {
                return err
diff --git a/connectors/ACCESS/AMIE-Processor/handler/data_project_create.go b/connectors/ACCESS/AMIE-Processor/handler/data_project_create.go
index 706c9aa80..c9ef353f9 100644
--- a/connectors/ACCESS/AMIE-Processor/handler/data_project_create.go
+++ b/connectors/ACCESS/AMIE-Processor/handler/data_project_create.go
@@ -24,8 +24,11 @@ import (
        "fmt"
        "log/slog"
 
+       "go.opentelemetry.io/otel/codes"
+
        "github.com/apache/airavata-custos/connectors/ACCESS/AMIE-Processor/model"
        "github.com/apache/airavata-custos/connectors/ACCESS/AMIE-Processor/store"
+       "github.com/apache/airavata-custos/internal/tracing"
        "github.com/apache/airavata-custos/pkg/service"
 )
 
@@ -42,7 +45,16 @@ func NewDataProjectCreateHandler(svc *service.Service, userDNStore store.UserDNS
 
 func (h *DataProjectCreateHandler) SupportsType() string { return "data_project_create" }
 
-func (h *DataProjectCreateHandler) Handle(ctx context.Context, tx *sql.Tx, packetJSON map[string]any, packet *model.Packet, eventID string) error {
+func (h *DataProjectCreateHandler) Handle(ctx context.Context, tx *sql.Tx, packetJSON map[string]any, packet *model.Packet, eventID string) (err error) {
+       ctx, span := tracing.Start(ctx, "amie.handle:"+packet.Type)
+       defer span.End()
+       defer func() {
+               if err != nil {
+                       span.RecordError(err)
+                       span.SetStatus(codes.Error, err.Error())
+               }
+       }()
+
        body, err := getBody(packetJSON)
        if err != nil {
                return err
diff --git a/connectors/ACCESS/AMIE-Processor/handler/inform_transaction_complete.go b/connectors/ACCESS/AMIE-Processor/handler/inform_transaction_complete.go
index 660681711..2ca66179a 100644
--- a/connectors/ACCESS/AMIE-Processor/handler/inform_transaction_complete.go
+++ b/connectors/ACCESS/AMIE-Processor/handler/inform_transaction_complete.go
@@ -23,7 +23,10 @@ import (
        "fmt"
        "log/slog"
 
+       "go.opentelemetry.io/otel/codes"
+
        "github.com/apache/airavata-custos/connectors/ACCESS/AMIE-Processor/model"
+       "github.com/apache/airavata-custos/internal/tracing"
 )
 
 type InformTransactionCompleteHandler struct {
@@ -38,7 +41,16 @@ func (h *InformTransactionCompleteHandler) SupportsType() string {
        return "inform_transaction_complete"
 }
 
-func (h *InformTransactionCompleteHandler) Handle(ctx context.Context, tx *sql.Tx, packetJSON map[string]any, packet *model.Packet, eventID string) error {
+func (h *InformTransactionCompleteHandler) Handle(ctx context.Context, tx *sql.Tx, packetJSON map[string]any, packet *model.Packet, eventID string) (err error) {
+       ctx, span := tracing.Start(ctx, "amie.handle:"+packet.Type)
+       defer span.End()
+       defer func() {
+               if err != nil {
+                       span.RecordError(err)
+                       span.SetStatus(codes.Error, err.Error())
+               }
+       }()
+
        body, err := getBody(packetJSON)
        if err != nil {
                return err
diff --git a/connectors/ACCESS/AMIE-Processor/handler/noop.go b/connectors/ACCESS/AMIE-Processor/handler/noop.go
index b09931f95..0803c565b 100644
--- a/connectors/ACCESS/AMIE-Processor/handler/noop.go
+++ b/connectors/ACCESS/AMIE-Processor/handler/noop.go
@@ -23,6 +23,7 @@ import (
        "log/slog"
 
        "github.com/apache/airavata-custos/connectors/ACCESS/AMIE-Processor/model"
+       "github.com/apache/airavata-custos/internal/tracing"
 )
 
 type NoOpHandler struct{}
@@ -36,6 +37,9 @@ func (h *NoOpHandler) SupportsType() string {
 }
 
 func (h *NoOpHandler) Handle(ctx context.Context, _ *sql.Tx, _ map[string]any, packet *model.Packet, _ string) error {
+       _, span := tracing.Start(ctx, "amie.handle:"+packet.Type)
+       defer span.End()
+
        slog.WarnContext(ctx, "NoOp handler invoked for unknown packet type",
                "packetID", packet.ID,
                "packetType", packet.Type,
diff --git a/connectors/ACCESS/AMIE-Processor/handler/request_account_create.go b/connectors/ACCESS/AMIE-Processor/handler/request_account_create.go
index 9896916f5..d4fb916c4 100644
--- a/connectors/ACCESS/AMIE-Processor/handler/request_account_create.go
+++ b/connectors/ACCESS/AMIE-Processor/handler/request_account_create.go
@@ -23,7 +23,10 @@ import (
        "errors"
        "fmt"
 
+       "go.opentelemetry.io/otel/codes"
+
        "github.com/apache/airavata-custos/connectors/ACCESS/AMIE-Processor/model"
+       "github.com/apache/airavata-custos/internal/tracing"
        "github.com/apache/airavata-custos/pkg/models"
        "github.com/apache/airavata-custos/pkg/service"
 )
@@ -46,7 +49,16 @@ func (h *RequestAccountCreateHandler) SupportsType() string { return "request_ac
 // ComputeClusterUser on the configured cluster, and attaches a
 // ComputeAllocationMembership against the project's allocation. Replies with
 // the assigned posix username.
-func (h *RequestAccountCreateHandler) Handle(ctx context.Context, tx *sql.Tx, packetJSON map[string]any, packet *model.Packet, eventID string) error {
+func (h *RequestAccountCreateHandler) Handle(ctx context.Context, tx *sql.Tx, packetJSON map[string]any, packet *model.Packet, eventID string) (err error) {
+       ctx, span := tracing.Start(ctx, "amie.handle:"+packet.Type)
+       defer span.End()
+       defer func() {
+               if err != nil {
+                       span.RecordError(err)
+                       span.SetStatus(codes.Error, err.Error())
+               }
+       }()
+
        body, err := getBody(packetJSON)
        if err != nil {
                return err
diff --git a/connectors/ACCESS/AMIE-Processor/handler/request_account_inactivate.go b/connectors/ACCESS/AMIE-Processor/handler/request_account_inactivate.go
index ff5e188b6..4a0e58b4c 100644
--- a/connectors/ACCESS/AMIE-Processor/handler/request_account_inactivate.go
+++ b/connectors/ACCESS/AMIE-Processor/handler/request_account_inactivate.go
@@ -22,7 +22,10 @@ import (
        "database/sql"
        "fmt"
 
+       "go.opentelemetry.io/otel/codes"
+
        "github.com/apache/airavata-custos/connectors/ACCESS/AMIE-Processor/model"
+       "github.com/apache/airavata-custos/internal/tracing"
        "github.com/apache/airavata-custos/pkg/models"
        "github.com/apache/airavata-custos/pkg/service"
 )
@@ -39,7 +42,16 @@ func NewRequestAccountInactivateHandler(svc *service.Service, amieClient AmieCli
 
 func (h *RequestAccountInactivateHandler) SupportsType() string { return "request_account_inactivate" }
 
-func (h *RequestAccountInactivateHandler) Handle(ctx context.Context, tx *sql.Tx, packetJSON map[string]any, packet *model.Packet, eventID string) error {
+func (h *RequestAccountInactivateHandler) Handle(ctx context.Context, tx *sql.Tx, packetJSON map[string]any, packet *model.Packet, eventID string) (err error) {
+       ctx, span := tracing.Start(ctx, "amie.handle:"+packet.Type)
+       defer span.End()
+       defer func() {
+               if err != nil {
+                       span.RecordError(err)
+                       span.SetStatus(codes.Error, err.Error())
+               }
+       }()
+
        body, err := getBody(packetJSON)
        if err != nil {
                return err
diff --git a/connectors/ACCESS/AMIE-Processor/handler/request_account_reactivate.go b/connectors/ACCESS/AMIE-Processor/handler/request_account_reactivate.go
index 50f248493..fc9e56e0a 100644
--- a/connectors/ACCESS/AMIE-Processor/handler/request_account_reactivate.go
+++ b/connectors/ACCESS/AMIE-Processor/handler/request_account_reactivate.go
@@ -22,7 +22,10 @@ import (
        "database/sql"
        "fmt"
 
+       "go.opentelemetry.io/otel/codes"
+
        "github.com/apache/airavata-custos/connectors/ACCESS/AMIE-Processor/model"
+       "github.com/apache/airavata-custos/internal/tracing"
        "github.com/apache/airavata-custos/pkg/models"
        "github.com/apache/airavata-custos/pkg/service"
 )
@@ -39,7 +42,16 @@ func NewRequestAccountReactivateHandler(svc *service.Service, amieClient AmieCli
 
 func (h *RequestAccountReactivateHandler) SupportsType() string { return "request_account_reactivate" }
 
-func (h *RequestAccountReactivateHandler) Handle(ctx context.Context, tx *sql.Tx, packetJSON map[string]any, packet *model.Packet, eventID string) error {
+func (h *RequestAccountReactivateHandler) Handle(ctx context.Context, tx *sql.Tx, packetJSON map[string]any, packet *model.Packet, eventID string) (err error) {
+       ctx, span := tracing.Start(ctx, "amie.handle:"+packet.Type)
+       defer span.End()
+       defer func() {
+               if err != nil {
+                       span.RecordError(err)
+                       span.SetStatus(codes.Error, err.Error())
+               }
+       }()
+
        body, err := getBody(packetJSON)
        if err != nil {
                return err
diff --git a/connectors/ACCESS/AMIE-Processor/handler/request_person_merge.go b/connectors/ACCESS/AMIE-Processor/handler/request_person_merge.go
index 691e764a5..06a9e9352 100644
--- a/connectors/ACCESS/AMIE-Processor/handler/request_person_merge.go
+++ b/connectors/ACCESS/AMIE-Processor/handler/request_person_merge.go
@@ -23,8 +23,11 @@ import (
        "fmt"
        "log/slog"
 
+       "go.opentelemetry.io/otel/codes"
+
        "github.com/apache/airavata-custos/connectors/ACCESS/AMIE-Processor/model"
        "github.com/apache/airavata-custos/connectors/ACCESS/AMIE-Processor/store"
+       "github.com/apache/airavata-custos/internal/tracing"
        "github.com/apache/airavata-custos/pkg/models"
        "github.com/apache/airavata-custos/pkg/service"
 )
@@ -42,7 +45,16 @@ func NewRequestPersonMergeHandler(svc *service.Service, userDNStore store.UserDN
 
 func (h *RequestPersonMergeHandler) SupportsType() string { return "request_person_merge" }
 
-func (h *RequestPersonMergeHandler) Handle(ctx context.Context, tx *sql.Tx, packetJSON map[string]any, packet *model.Packet, eventID string) error {
+func (h *RequestPersonMergeHandler) Handle(ctx context.Context, tx *sql.Tx, packetJSON map[string]any, packet *model.Packet, eventID string) (err error) {
+       ctx, span := tracing.Start(ctx, "amie.handle:"+packet.Type)
+       defer span.End()
+       defer func() {
+               if err != nil {
+                       span.RecordError(err)
+                       span.SetStatus(codes.Error, err.Error())
+               }
+       }()
+
        body, err := getBody(packetJSON)
        if err != nil {
                return err
diff --git a/connectors/ACCESS/AMIE-Processor/handler/request_project_create.go b/connectors/ACCESS/AMIE-Processor/handler/request_project_create.go
index 178f2be4e..66dc5ae77 100644
--- a/connectors/ACCESS/AMIE-Processor/handler/request_project_create.go
+++ b/connectors/ACCESS/AMIE-Processor/handler/request_project_create.go
@@ -24,7 +24,10 @@ import (
        "fmt"
        "strings"
 
+       "go.opentelemetry.io/otel/codes"
+
        "github.com/apache/airavata-custos/connectors/ACCESS/AMIE-Processor/model"
+       "github.com/apache/airavata-custos/internal/tracing"
        "github.com/apache/airavata-custos/pkg/models"
        "github.com/apache/airavata-custos/pkg/service"
 )
@@ -46,7 +49,16 @@ func (h *RequestProjectCreateHandler) SupportsType() string { return "request_pr
 // PiOrganization), creates (or finds) the Project, and creates a
 // ComputeAllocation populated from the packet body's ServiceUnitsAllocated,
 // StartDate and EndDate.
-func (h *RequestProjectCreateHandler) Handle(ctx context.Context, tx *sql.Tx, packetJSON map[string]any, packet *model.Packet, eventID string) error {
+func (h *RequestProjectCreateHandler) Handle(ctx context.Context, tx *sql.Tx, packetJSON map[string]any, packet *model.Packet, eventID string) (err error) {
+       ctx, span := tracing.Start(ctx, "amie.handle:"+packet.Type)
+       defer span.End()
+       defer func() {
+               if err != nil {
+                       span.RecordError(err)
+                       span.SetStatus(codes.Error, err.Error())
+               }
+       }()
+
        body, err := getBody(packetJSON)
        if err != nil {
                return err
diff --git a/connectors/ACCESS/AMIE-Processor/handler/request_project_inactivate.go b/connectors/ACCESS/AMIE-Processor/handler/request_project_inactivate.go
index f3bee420a..b14c1434d 100644
--- a/connectors/ACCESS/AMIE-Processor/handler/request_project_inactivate.go
+++ b/connectors/ACCESS/AMIE-Processor/handler/request_project_inactivate.go
@@ -24,7 +24,10 @@ import (
        "fmt"
        "log/slog"
 
+       "go.opentelemetry.io/otel/codes"
+
        "github.com/apache/airavata-custos/connectors/ACCESS/AMIE-Processor/model"
+       "github.com/apache/airavata-custos/internal/tracing"
        "github.com/apache/airavata-custos/pkg/models"
        "github.com/apache/airavata-custos/pkg/service"
 )
@@ -41,7 +44,16 @@ func NewRequestProjectInactivateHandler(svc *service.Service, amieClient AmieCli
 
 func (h *RequestProjectInactivateHandler) SupportsType() string { return "request_project_inactivate" }
 
-func (h *RequestProjectInactivateHandler) Handle(ctx context.Context, tx *sql.Tx, packetJSON map[string]any, packet *model.Packet, eventID string) error {
+func (h *RequestProjectInactivateHandler) Handle(ctx context.Context, tx *sql.Tx, packetJSON map[string]any, packet *model.Packet, eventID string) (err error) {
+       ctx, span := tracing.Start(ctx, "amie.handle:"+packet.Type)
+       defer span.End()
+       defer func() {
+               if err != nil {
+                       span.RecordError(err)
+                       span.SetStatus(codes.Error, err.Error())
+               }
+       }()
+
        body, err := getBody(packetJSON)
        if err != nil {
                return err
diff --git a/connectors/ACCESS/AMIE-Processor/handler/request_project_reactivate.go b/connectors/ACCESS/AMIE-Processor/handler/request_project_reactivate.go
index 825d948a5..2ea169d8c 100644
--- a/connectors/ACCESS/AMIE-Processor/handler/request_project_reactivate.go
+++ b/connectors/ACCESS/AMIE-Processor/handler/request_project_reactivate.go
@@ -24,7 +24,10 @@ import (
        "fmt"
        "log/slog"
 
+       "go.opentelemetry.io/otel/codes"
+
        "github.com/apache/airavata-custos/connectors/ACCESS/AMIE-Processor/model"
+       "github.com/apache/airavata-custos/internal/tracing"
        "github.com/apache/airavata-custos/pkg/models"
        "github.com/apache/airavata-custos/pkg/service"
 )
@@ -44,7 +47,16 @@ func (h *RequestProjectReactivateHandler) SupportsType() string { return "reques
 // Handle flips the Project and all of its ComputeAllocations back to ACTIVE.
 // Per AMIE protocol, only the PI's membership is reactivated automatically;
 // other members must request reactivation via request_account_reactivate.
-func (h *RequestProjectReactivateHandler) Handle(ctx context.Context, tx *sql.Tx, packetJSON map[string]any, packet *model.Packet, eventID string) error {
+func (h *RequestProjectReactivateHandler) Handle(ctx context.Context, tx *sql.Tx, packetJSON map[string]any, packet *model.Packet, eventID string) (err error) {
+       ctx, span := tracing.Start(ctx, "amie.handle:"+packet.Type)
+       defer span.End()
+       defer func() {
+               if err != nil {
+                       span.RecordError(err)
+                       span.SetStatus(codes.Error, err.Error())
+               }
+       }()
+
        body, err := getBody(packetJSON)
        if err != nil {
                return err
diff --git a/connectors/ACCESS/AMIE-Processor/handler/request_user_modify.go b/connectors/ACCESS/AMIE-Processor/handler/request_user_modify.go
index c62f015c4..dd94e962f 100644
--- a/connectors/ACCESS/AMIE-Processor/handler/request_user_modify.go
+++ b/connectors/ACCESS/AMIE-Processor/handler/request_user_modify.go
@@ -26,8 +26,11 @@ import (
        "log/slog"
        "strings"
 
+       "go.opentelemetry.io/otel/codes"
+
        "github.com/apache/airavata-custos/connectors/ACCESS/AMIE-Processor/model"
        "github.com/apache/airavata-custos/connectors/ACCESS/AMIE-Processor/store"
+       "github.com/apache/airavata-custos/internal/tracing"
        "github.com/apache/airavata-custos/pkg/models"
        "github.com/apache/airavata-custos/pkg/service"
 )
@@ -52,7 +55,16 @@ func NewRequestUserModifyHandler(svc *service.Service, userDNStore store.UserDNS
 
 func (h *RequestUserModifyHandler) SupportsType() string { return "request_user_modify" }
 
-func (h *RequestUserModifyHandler) Handle(ctx context.Context, tx *sql.Tx, packetJSON map[string]any, packet *model.Packet, eventID string) error {
+func (h *RequestUserModifyHandler) Handle(ctx context.Context, tx *sql.Tx, packetJSON map[string]any, packet *model.Packet, eventID string) (err error) {
+       ctx, span := tracing.Start(ctx, "amie.handle:"+packet.Type)
+       defer span.End()
+       defer func() {
+               if err != nil {
+                       span.RecordError(err)
+                       span.SetStatus(codes.Error, err.Error())
+               }
+       }()
+
        body, err := getBody(packetJSON)
        if err != nil {
                return err
diff --git a/connectors/ACCESS/AMIE-Processor/pipeline/integration_common.go b/connectors/ACCESS/AMIE-Processor/pipeline/integration_common.go
index 51a480fbf..32650fc30 100644
--- a/connectors/ACCESS/AMIE-Processor/pipeline/integration_common.go
+++ b/connectors/ACCESS/AMIE-Processor/pipeline/integration_common.go
@@ -22,6 +22,7 @@ package pipeline
 import (
        "context"
        "fmt"
+       "log/slog"
        "net/http"
        "os"
        "sync"
@@ -29,6 +30,9 @@ import (
        "time"
 
        "github.com/jmoiron/sqlx"
+       "github.com/prometheus/client_golang/prometheus"
+       "go.opentelemetry.io/otel"
+       sdktrace "go.opentelemetry.io/otel/sdk/trace"
 
        "github.com/apache/airavata-custos/connectors/ACCESS/AMIE-Processor/amieclient"
        "github.com/apache/airavata-custos/connectors/ACCESS/AMIE-Processor/config"
@@ -39,6 +43,7 @@ import (
        "github.com/apache/airavata-custos/connectors/ACCESS/AMIE-Processor/store"
        "github.com/apache/airavata-custos/connectors/ACCESS/AMIE-Processor/worker"
        "github.com/apache/airavata-custos/internal/db"
+       "github.com/apache/airavata-custos/internal/tracing"
        "github.com/apache/airavata-custos/pkg/events"
        coreservice "github.com/apache/airavata-custos/pkg/service"
 )
@@ -46,9 +51,11 @@ import (
 const testClusterID = "00000000-0000-0000-0000-000000000001"
 
 var (
-       sharedDB     *sqlx.DB
-       sharedDBOnce sync.Once
-       sharedDBErr  error
+       sharedDB         *sqlx.DB
+       sharedDBOnce     sync.Once
+       sharedDBErr      error
+       tracingInitOnce  sync.Once
+       tracingInitError error
 )
 
 func isLocalAMIEConfigAvailable() bool {
@@ -94,6 +101,16 @@ func setupTestDB(t *testing.T) *sqlx.DB {
        if sharedDBErr != nil {
                t.Fatalf("setup db: %v", sharedDBErr)
        }
+       tracingInitOnce.Do(func() {
+               _, tracingInitError = tracing.Init(tracing.InitConfig{
+                       Mode:        tracing.ModeProduction,
+                       Logger:      slog.Default(),
+                       ServiceName: "custos",
+               })
+       })
+       if tracingInitError != nil {
+               t.Fatalf("tracing init: %v", tracingInitError)
+       }
        truncateAll(t, sharedDB)
        seedCluster(t, sharedDB)
        return sharedDB
@@ -200,9 +217,9 @@ func newTestPipeline(t *testing.T) *testPipeline {
                handler.NewNoOpHandler(),
        )
 
-       met := metrics.New()
+       met := metrics.NewWithRegistry(prometheus.NewRegistry())
        poller := worker.NewPoller(amieClient, packetStore, eventStore, met, database, cfg)
-       processor := worker.NewProcessor(eventStore, packetStore, errorStore, router, met, database, cfg)
+       processor := worker.NewProcessor(eventStore, packetStore, errorStore, router, met, auditSvc, database, cfg)
 
        ctx, cancel := context.WithCancel(context.Background())
        pipe := &testPipeline{
@@ -226,6 +243,15 @@ func newTestPipeline(t *testing.T) *testPipeline {
 func (p *testPipeline) stop() {
        p.cancel()
        p.wg.Wait()
+       flushTracing()
+}
+
+func flushTracing() {
+       if tp, ok := otel.GetTracerProvider().(*sdktrace.TracerProvider); ok {
+               ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+               defer cancel()
+               _ = tp.ForceFlush(ctx)
+       }
 }
 
 func (p *testPipeline) fireScenario(t *testing.T, scenarioType string) {
diff --git a/connectors/ACCESS/AMIE-Processor/pkg/amie/loader.go b/connectors/ACCESS/AMIE-Processor/pkg/amie/loader.go
index 3b95e9abe..1b6a1bb94 100644
--- a/connectors/ACCESS/AMIE-Processor/pkg/amie/loader.go
+++ b/connectors/ACCESS/AMIE-Processor/pkg/amie/loader.go
@@ -37,10 +37,15 @@ import (
        "github.com/apache/airavata-custos/connectors/ACCESS/AMIE-Processor/store"
        "github.com/apache/airavata-custos/connectors/ACCESS/AMIE-Processor/worker"
        "github.com/apache/airavata-custos/internal/db"
+       "github.com/apache/airavata-custos/internal/tracing"
        "github.com/apache/airavata-custos/pkg/events"
        coreservice "github.com/apache/airavata-custos/pkg/service"
 )
 
+func init() {
+       tracing.RegisterTerminalMarkers("amie", "TRANSACTION_COMPLETE")
+}
+
 const connectorName = "amie"
 
 // LoadConnector skips silently when AMIE_BASE_URL / AMIE_SITE_CODE /
@@ -90,7 +95,7 @@ func LoadConnector(ctx context.Context, database *sqlx.DB, eventBus *events.Bus,
 
        met := metrics.New()
        poller := worker.NewPoller(amie, packetStore, eventStore, met, database, cfg.AMIE)
-       processor := worker.NewProcessor(eventStore, packetStore, errorStore, router, met, database, cfg.AMIE)
+       processor := worker.NewProcessor(eventStore, packetStore, errorStore, router, met, auditSvc, database, cfg.AMIE)
 
        wg.Add(2)
        go func() {
diff --git a/connectors/ACCESS/AMIE-Processor/worker/processor.go b/connectors/ACCESS/AMIE-Processor/worker/processor.go
index 369644051..11ee0e6cf 100644
--- a/connectors/ACCESS/AMIE-Processor/worker/processor.go
+++ b/connectors/ACCESS/AMIE-Processor/worker/processor.go
@@ -26,12 +26,17 @@ import (
        "time"
 
        "github.com/jmoiron/sqlx"
+       "go.opentelemetry.io/otel/attribute"
+       "go.opentelemetry.io/otel/codes"
 
        "github.com/apache/airavata-custos/connectors/ACCESS/AMIE-Processor/config"
        custosdb "github.com/apache/airavata-custos/connectors/ACCESS/AMIE-Processor/db"
        "github.com/apache/airavata-custos/connectors/ACCESS/AMIE-Processor/model"
+       "github.com/apache/airavata-custos/internal/tracing"
 )
 
+const rootEventMaxBytes = 64 * 1024
+
 const (
        // MaxAttempts is the maximum number of processing attempts before an
        // event is marked as permanently failed.
@@ -70,23 +75,29 @@ type processorMetrics interface {
        StartProcessingTimer() func(handlerType string)
 }
 
+type processorAuditService interface {
+       Log(ctx context.Context, tx *sql.Tx, packetID, eventID string, action model.AuditAction, entityType, entityID, summary string) error
+}
+
 type Processor struct {
        eventStore     processorEventStore
        packetStore    processorPacketStore
        errorStore     processorErrorStore
        router         processorRouter
        metrics        processorMetrics
+       auditSvc       processorAuditService
        db             *sqlx.DB
        workerInterval time.Duration
 }
 
-func NewProcessor(eventStore processorEventStore, packetStore processorPacketStore, errorStore processorErrorStore, router processorRouter, metrics processorMetrics, db *sqlx.DB, cfg config.AMIEConfig) *Processor {
+func NewProcessor(eventStore processorEventStore, packetStore processorPacketStore, errorStore processorErrorStore, router processorRouter, metrics processorMetrics, auditSvc processorAuditService, db *sqlx.DB, cfg config.AMIEConfig) *Processor {
        return &Processor{
                eventStore:     eventStore,
                packetStore:    packetStore,
                errorStore:     errorStore,
                router:         router,
                metrics:        metrics,
+               auditSvc:       auditSvc,
                db:             db,
                workerInterval: cfg.WorkerInterval,
        }
@@ -150,7 +161,27 @@ func (p *Processor) processPendingEvents(ctx context.Context) {
 // If the handler returns an error, the entire transaction is rolled back
 // (including the attempt increment), and the caller should record the failure
 // in a separate transaction.
-func (p *Processor) executeInTransaction(ctx context.Context, ewp model.EventWithPacket) error {
+func (p *Processor) executeInTransaction(ctx context.Context, ewp model.EventWithPacket) (err error) {
+       ctx, span := tracing.Start(ctx, "amie.process_event:"+string(ewp.Type))
+       defer span.End()
+
+       span.SetAttributes(
+               attribute.String("source", "amie"),
+               attribute.String("amie.event_id", ewp.ID),
+               attribute.String("amie.packet_id", ewp.PacketID),
+               attribute.String("amie.event_type", string(ewp.Type)),
+       )
+       if raw := ewp.PacketRawJSON; raw != "" && len(raw) <= rootEventMaxBytes {
+               span.SetAttributes(attribute.String("root_event", raw))
+       }
+
+       defer func() {
+               if err != nil {
+                       span.RecordError(err)
+                       span.SetStatus(codes.Error, err.Error())
+               }
+       }()
+
        return custosdb.TxFn(ctx, p.db, func(tx *sql.Tx) error {
                slog.Info("Processing event",
                        "eventId", ewp.ID,
@@ -158,6 +189,11 @@ func (p *Processor) executeInTransaction(ctx context.Context, ewp model.EventWit
                        "attempt", ewp.Attempts+1,
                )
 
+               // Root marker so child audit rows have a parent that exists in the table.
+               if err := p.auditSvc.Log(ctx, tx, ewp.PacketID, ewp.ID, model.AuditPacketReceived, "packet", ewp.PacketID, string(ewp.Type)); err != nil {
+                       return fmt.Errorf("audit PACKET_RECEIVED: %w", err)
+               }
+
                now := time.Now().UTC()
                ewp.Status = model.ProcessingStatusRunning
                ewp.StartedAt = &now
diff --git a/connectors/ACCESS/AMIE-Processor/worker/processor_integration_test.go b/connectors/ACCESS/AMIE-Processor/worker/processor_integration_test.go
index d9128ca04..00f51d9a6 100644
--- a/connectors/ACCESS/AMIE-Processor/worker/processor_integration_test.go
+++ b/connectors/ACCESS/AMIE-Processor/worker/processor_integration_test.go
@@ -31,6 +31,7 @@ import (
 
        "github.com/apache/airavata-custos/connectors/ACCESS/AMIE-Processor/config"
        "github.com/apache/airavata-custos/connectors/ACCESS/AMIE-Processor/model"
+       "github.com/apache/airavata-custos/connectors/ACCESS/AMIE-Processor/service"
        "github.com/apache/airavata-custos/connectors/ACCESS/AMIE-Processor/store"
 )
 
@@ -90,6 +91,7 @@ func newProcessor(database *sqlx.DB, router processorRouter, met *stubMetrics) *
                store.NewProcessingErrorStore(database),
                router,
                met,
+               service.NewAuditService(store.NewAuditStore(database)),
                database,
                cfg,
        )

Reply via email to