This is an automated email from the ASF dual-hosted git repository.

lahirujayathilake pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airavata-custos.git

commit 16793dd346c6a24067292c8080bbe1e713cfaf92
Author: lahiruj <[email protected]>
AuthorDate: Fri Oct 10 04:10:39 2025 -0400

    Fixed a bug that caused an infinite loop when a packet failure occurs; 
aligned the entities with the DB schema
---
 .../org/apache/custos/amie/model/PacketEntity.java |  6 +--
 .../custos/amie/model/ProcessingErrorEntity.java   |  4 +-
 .../custos/amie/model/ProcessingEventEntity.java   |  9 ++--
 .../apache/custos/amie/model/ProcessingStatus.java |  6 ++-
 .../amie/repo/ProcessingEventRepository.java       |  8 +++-
 .../custos/amie/worker/ProcessingEventWorker.java  | 51 +++++++++++++++-------
 amie-decoder/src/main/resources/application.yml    |  4 +-
 .../migration}/V1__initial_migration.sql           |  6 +--
 8 files changed, 60 insertions(+), 34 deletions(-)

diff --git 
a/amie-decoder/src/main/java/org/apache/custos/amie/model/PacketEntity.java 
b/amie-decoder/src/main/java/org/apache/custos/amie/model/PacketEntity.java
index 1975a4291..d0aeb0234 100644
--- a/amie-decoder/src/main/java/org/apache/custos/amie/model/PacketEntity.java
+++ b/amie-decoder/src/main/java/org/apache/custos/amie/model/PacketEntity.java
@@ -43,11 +43,11 @@ public class PacketEntity {
     private String type;
 
     @Enumerated(EnumType.STRING)
-    @Column(name = "status", nullable = false, length = 32)
+    @Column(name = "status", columnDefinition = "VARCHAR", nullable = false, 
length = 32)
     private PacketStatus status = PacketStatus.NEW;
 
     @Lob
-    @Column(name = "raw_json", columnDefinition = "JSON", nullable = false)
+    @Column(name = "raw_json", columnDefinition = "TEXT", nullable = false)
     private String rawJson;
 
     @Column(name = "received_at", nullable = false)
@@ -63,7 +63,7 @@ public class PacketEntity {
     private int retries = 0;
 
     @Lob
-    @Column(name = "last_error")
+    @Column(name = "last_error", columnDefinition = "TEXT")
     private String lastError;
 
     public PacketEntity() {
diff --git 
a/amie-decoder/src/main/java/org/apache/custos/amie/model/ProcessingErrorEntity.java
 
b/amie-decoder/src/main/java/org/apache/custos/amie/model/ProcessingErrorEntity.java
index ae9ef13bb..c268e5c0c 100644
--- 
a/amie-decoder/src/main/java/org/apache/custos/amie/model/ProcessingErrorEntity.java
+++ 
b/amie-decoder/src/main/java/org/apache/custos/amie/model/ProcessingErrorEntity.java
@@ -50,11 +50,11 @@ public class ProcessingErrorEntity {
     @Column(name = "occurred_at", nullable = false)
     private Instant occurredAt = Instant.now();
 
-    @Column(name = "summary", nullable = false)
+    @Column(name = "summary", columnDefinition = "TEXT", nullable = false)
     private String summary;
 
     @Lob
-    @Column(name = "detail")
+    @Column(name = "detail", columnDefinition = "TEXT")
     private String detail;
 
     public Long getId() {
diff --git 
a/amie-decoder/src/main/java/org/apache/custos/amie/model/ProcessingEventEntity.java
 
b/amie-decoder/src/main/java/org/apache/custos/amie/model/ProcessingEventEntity.java
index 31ac135ef..4056519b3 100644
--- 
a/amie-decoder/src/main/java/org/apache/custos/amie/model/ProcessingEventEntity.java
+++ 
b/amie-decoder/src/main/java/org/apache/custos/amie/model/ProcessingEventEntity.java
@@ -23,7 +23,6 @@ import jakarta.persistence.Entity;
 import jakarta.persistence.EnumType;
 import jakarta.persistence.Enumerated;
 import jakarta.persistence.FetchType;
-import jakarta.persistence.GeneratedValue;
 import jakarta.persistence.Id;
 import jakarta.persistence.JoinColumn;
 import jakarta.persistence.Lob;
@@ -45,18 +44,18 @@ public class ProcessingEventEntity {
     private PacketEntity packet;
 
     @Enumerated(EnumType.STRING)
-    @Column(name = "type", nullable = false, length = 64)
+    @Column(name = "type", columnDefinition = "VARCHAR", nullable = false, 
length = 64)
     private ProcessingEventType type;
 
     @Enumerated(EnumType.STRING)
-    @Column(name = "status", nullable = false, length = 32)
+    @Column(name = "status", columnDefinition = "VARCHAR", nullable = false, 
length = 32)
     private ProcessingStatus status = ProcessingStatus.NEW;
 
     @Column(name = "attempts", nullable = false)
     private int attempts = 0;
 
     @Lob
-    @Column(name = "payload", nullable = false)
+    @Column(name = "payload", columnDefinition = "LONGBLOB", nullable = false)
     private byte[] payload;
 
     @Column(name = "created_at", nullable = false)
@@ -69,7 +68,7 @@ public class ProcessingEventEntity {
     private Instant finishedAt;
 
     @Lob
-    @Column(name = "last_error")
+    @Column(name = "last_error", columnDefinition = "TEXT")
     private String lastError;
 
     public ProcessingEventEntity() {
diff --git 
a/amie-decoder/src/main/java/org/apache/custos/amie/model/ProcessingStatus.java 
b/amie-decoder/src/main/java/org/apache/custos/amie/model/ProcessingStatus.java
index 5d4cf3865..7babbfecd 100644
--- 
a/amie-decoder/src/main/java/org/apache/custos/amie/model/ProcessingStatus.java
+++ 
b/amie-decoder/src/main/java/org/apache/custos/amie/model/ProcessingStatus.java
@@ -35,5 +35,9 @@ public enum ProcessingStatus {
     /**
      * The event failed processing and will not be automatically retried.
      */
-    FAILED
+    FAILED,
+    /**
+     * The event failed a previous attempt and is waiting to be retried.
+     */
+    RETRY_SCHEDULED
 }
diff --git 
a/amie-decoder/src/main/java/org/apache/custos/amie/repo/ProcessingEventRepository.java
 
b/amie-decoder/src/main/java/org/apache/custos/amie/repo/ProcessingEventRepository.java
index 24c1fd854..a58b8b2d6 100644
--- 
a/amie-decoder/src/main/java/org/apache/custos/amie/repo/ProcessingEventRepository.java
+++ 
b/amie-decoder/src/main/java/org/apache/custos/amie/repo/ProcessingEventRepository.java
@@ -21,12 +21,16 @@ package org.apache.custos.amie.repo;
 import org.apache.custos.amie.model.ProcessingEventEntity;
 import org.apache.custos.amie.model.ProcessingStatus;
 import org.springframework.data.jpa.repository.JpaRepository;
+import org.springframework.data.jpa.repository.Query;
+import org.springframework.data.repository.query.Param;
 import org.springframework.stereotype.Repository;
 
-import java.util.stream.Stream;
+import java.util.Collection;
+import java.util.List;
 
 @Repository
 public interface ProcessingEventRepository extends 
JpaRepository<ProcessingEventEntity, String> {
 
-    Stream<ProcessingEventEntity> 
findTop50ByStatusOrderByCreatedAtAsc(ProcessingStatus status);
+    @Query("SELECT e FROM ProcessingEventEntity e JOIN FETCH e.packet WHERE 
e.status IN :statuses ORDER BY e.createdAt ASC")
+    List<ProcessingEventEntity> findTop50EventsToProcess(@Param("statuses")  
Collection<ProcessingStatus> statuses);
 }
diff --git 
a/amie-decoder/src/main/java/org/apache/custos/amie/worker/ProcessingEventWorker.java
 
b/amie-decoder/src/main/java/org/apache/custos/amie/worker/ProcessingEventWorker.java
index 39745bb6c..d8a0c7ff8 100644
--- 
a/amie-decoder/src/main/java/org/apache/custos/amie/worker/ProcessingEventWorker.java
+++ 
b/amie-decoder/src/main/java/org/apache/custos/amie/worker/ProcessingEventWorker.java
@@ -31,57 +31,71 @@ import 
org.apache.custos.amie.repo.ProcessingErrorRepository;
 import org.apache.custos.amie.repo.ProcessingEventRepository;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.springframework.context.annotation.Lazy;
 import org.springframework.scheduling.annotation.Scheduled;
 import org.springframework.stereotype.Component;
+import org.springframework.transaction.annotation.Propagation;
 import org.springframework.transaction.annotation.Transactional;
 
 import java.io.PrintWriter;
 import java.io.StringWriter;
 import java.time.Instant;
+import java.util.List;
 
 /**
- * A scheduled worker that fetches for new processing events and executes them.
+ * A scheduled worker that fetches new or retry-scheduled processing events and 
executes them.
  * State of an event (NEW -> RUNNING -> SUCCEEDED/FAILED)
  */
 @Component
 public class ProcessingEventWorker {
 
     private static final Logger LOGGER = 
LoggerFactory.getLogger(ProcessingEventWorker.class);
-    private static final int MAX_ATTEMPTS = 5;
+    private static final int MAX_ATTEMPTS = 3;
 
     private final ProcessingEventRepository eventRepo;
     private final PacketRepository packetRepo;
     private final ProcessingErrorRepository errorRepo;
     private final PacketRouter router;
     private final ObjectMapper objectMapper = new ObjectMapper();
+    private final ProcessingEventWorker self;
 
     public ProcessingEventWorker(ProcessingEventRepository eventRepo, 
PacketRepository packetRepo,
-                                 ProcessingErrorRepository errorRepo, 
PacketRouter router) {
+                                 ProcessingErrorRepository errorRepo, 
PacketRouter router, @Lazy ProcessingEventWorker self) {
         this.eventRepo = eventRepo;
         this.packetRepo = packetRepo;
         this.errorRepo = errorRepo;
         this.router = router;
+        this.self = self;
     }
 
     /**
-     * Runs on a fixed delay, checks for NEW events, and processes them one by 
one.
+     * Runs on a fixed delay, checks for NEW/RETRY_SCHEDULED events, and 
processes them one by one, each in its own transaction.
      */
     @Scheduled(fixedDelayString = 
"#{T(org.springframework.boot.convert.DurationStyle).detectAndParse('${app.amie.scheduler.worker-delay}').toMillis()}")
-    @Transactional
     public void processPendingEvents() {
-        try (var eventStream = 
eventRepo.findTop50ByStatusOrderByCreatedAtAsc(ProcessingStatus.NEW)) {
-            eventStream.forEach(this::executeEvent);
+        List<ProcessingEventEntity> eventsToProcess = 
eventRepo.findTop50EventsToProcess(List.of(ProcessingStatus.NEW, 
ProcessingStatus.RETRY_SCHEDULED));
+
+        if (!eventsToProcess.isEmpty()) {
+            LOGGER.info("Found {} event(s) to process.", 
eventsToProcess.size());
+            eventsToProcess.forEach(event -> {
+                try {
+                    self.executeEventInTransaction(event);
+                } catch (Exception e) {
+                    LOGGER.error("An unexpected error occurred while 
processing of eventId [{}].", event.getId(), e);
+                }
+            });
         }
     }
 
-    private void executeEvent(ProcessingEventEntity event) {
+    @Transactional(propagation = Propagation.REQUIRES_NEW)
+    public void executeEventInTransaction(ProcessingEventEntity event) {
         PacketEntity packet = event.getPacket();
-        LOGGER.info("Processing event [{}] for packet amie_id [{}].", 
event.getType(), packet.getAmieId());
+        LOGGER.info("Processing event [{}] for packet amie_id [{}]. Attempt: 
{}", event.getType(), packet.getAmieId(), event.getAttempts() + 1);
 
         event.setStatus(ProcessingStatus.RUNNING);
         event.setStartedAt(Instant.now());
         event.setAttempts(event.getAttempts() + 1);
-        eventRepo.save(event);
+        eventRepo.saveAndFlush(event);
 
         try {
             var packetJson = objectMapper.readTree(packet.getRawJson());
@@ -112,16 +126,22 @@ public class ProcessingEventWorker {
     private void handleFailure(ProcessingEventEntity event, PacketEntity 
packet, Exception e) {
         // Check if the event should be retried or marked as failed
         boolean isRetryable = event.getAttempts() < MAX_ATTEMPTS;
-        ProcessingStatus newStatus = isRetryable ? ProcessingStatus.NEW : 
ProcessingStatus.FAILED;
+        ProcessingStatus newStatus = isRetryable ? 
ProcessingStatus.RETRY_SCHEDULED : ProcessingStatus.FAILED;
 
         event.setStatus(newStatus);
         event.setLastError(e.getMessage());
         event.setFinishedAt(Instant.now());
         eventRepo.save(event);
 
-        packet.setStatus(PacketStatus.FAILED);
-        packet.setLastError(e.getMessage());
-        packetRepo.save(packet);
+        if (!isRetryable) {
+            LOGGER.error("Event for packet amie_id [{}] has failed permanently 
after {} attempts.", packet.getAmieId(), event.getAttempts());
+            packet.setStatus(PacketStatus.FAILED);
+            packet.setLastError(e.getMessage());
+            packetRepo.save(packet);
+
+        } else {
+            LOGGER.warn("Event for packet amie_id [{}] will be retried. Status 
set to {}.", packet.getAmieId(), newStatus);
+        }
 
         ProcessingErrorEntity error = new ProcessingErrorEntity();
         error.setPacket(packet);
@@ -133,8 +153,7 @@ public class ProcessingEventWorker {
 
     private String getStackTraceAsString(Exception e) {
         StringWriter sw = new StringWriter();
-        PrintWriter pw = new PrintWriter(sw);
-        e.printStackTrace(pw);
+        e.printStackTrace(new PrintWriter(sw));
         return sw.toString();
     }
 }
diff --git a/amie-decoder/src/main/resources/application.yml 
b/amie-decoder/src/main/resources/application.yml
index 5accb8388..77f2a4a79 100644
--- a/amie-decoder/src/main/resources/application.yml
+++ b/amie-decoder/src/main/resources/application.yml
@@ -27,12 +27,14 @@ spring:
     driver-class-name: org.mariadb.jdbc.Driver
   jpa:
     hibernate:
-      ddl-auto: update
+      ddl-auto: validate
     properties:
       hibernate.jdbc.time_zone: UTC
+      hibernate.type.preferred_enum_jdbc_type: VARCHAR
     show-sql: false
     database-platform: org.hibernate.dialect.MariaDBDialect
   flyway:
+    enabled: true
     locations: classpath:db/migration
 
 app:
diff --git 
a/amie-decoder/src/main/resources/db.migration/V1__initial_migration.sql 
b/amie-decoder/src/main/resources/db/migration/V1__initial_migration.sql
similarity index 95%
rename from 
amie-decoder/src/main/resources/db.migration/V1__initial_migration.sql
rename to amie-decoder/src/main/resources/db/migration/V1__initial_migration.sql
index c8bad2577..37ba745d6 100644
--- a/amie-decoder/src/main/resources/db.migration/V1__initial_migration.sql
+++ b/amie-decoder/src/main/resources/db/migration/V1__initial_migration.sql
@@ -17,7 +17,7 @@ CREATE TABLE packets
     amie_id      BIGINT       NOT NULL, -- packet_rec_id from AMIE
     type         VARCHAR(64)  NOT NULL, -- eg. request_account_create
     status       VARCHAR(32)  NOT NULL, -- NEW, DECODED, PROCESSED, FAILED
-    raw_json     JSON         NOT NULL,
+    raw_json     TEXT         NOT NULL,
     received_at  TIMESTAMP(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6),
     decoded_at   TIMESTAMP(6) NULL,
     processed_at TIMESTAMP(6) NULL,
@@ -66,15 +66,13 @@ CREATE TABLE processing_errors
     packet_id   VARCHAR(255) NULL,
     event_id    VARCHAR(255) NULL,
     occurred_at TIMESTAMP(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6),
-    summary     VARCHAR(255) NOT NULL,
+    summary     TEXT         NOT NULL,
     detail      TEXT NULL,
 
     PRIMARY KEY (id),
     CONSTRAINT fk_errors_packet FOREIGN KEY (packet_id) REFERENCES packets 
(id) ON DELETE SET NULL,
     CONSTRAINT fk_errors_event FOREIGN KEY (event_id) REFERENCES 
processing_events (id) ON DELETE SET NULL,
 
-    CHECK (packet_id IS NOT NULL OR event_id IS NOT NULL),
-
     KEY         idx_errors_packet_id (packet_id),
     KEY         idx_errors_event_id (event_id),
     KEY         idx_errors_occurred_at (occurred_at)

Reply via email to