This is an automated email from the ASF dual-hosted git repository. lahirujayathilake pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/airavata-custos.git
commit 16793dd346c6a24067292c8080bbe1e713cfaf92 Author: lahiruj <[email protected]> AuthorDate: Fri Oct 10 04:10:39 2025 -0400 fixed bug - getting into an infinite loop when a packet failure occurs, align the entities with db schema --- .../org/apache/custos/amie/model/PacketEntity.java | 6 +-- .../custos/amie/model/ProcessingErrorEntity.java | 4 +- .../custos/amie/model/ProcessingEventEntity.java | 9 ++-- .../apache/custos/amie/model/ProcessingStatus.java | 6 ++- .../amie/repo/ProcessingEventRepository.java | 8 +++- .../custos/amie/worker/ProcessingEventWorker.java | 51 +++++++++++++++------- amie-decoder/src/main/resources/application.yml | 4 +- .../migration}/V1__initial_migration.sql | 6 +-- 8 files changed, 60 insertions(+), 34 deletions(-) diff --git a/amie-decoder/src/main/java/org/apache/custos/amie/model/PacketEntity.java b/amie-decoder/src/main/java/org/apache/custos/amie/model/PacketEntity.java index 1975a4291..d0aeb0234 100644 --- a/amie-decoder/src/main/java/org/apache/custos/amie/model/PacketEntity.java +++ b/amie-decoder/src/main/java/org/apache/custos/amie/model/PacketEntity.java @@ -43,11 +43,11 @@ public class PacketEntity { private String type; @Enumerated(EnumType.STRING) - @Column(name = "status", nullable = false, length = 32) + @Column(name = "status", columnDefinition = "VARCHAR", nullable = false, length = 32) private PacketStatus status = PacketStatus.NEW; @Lob - @Column(name = "raw_json", columnDefinition = "JSON", nullable = false) + @Column(name = "raw_json", columnDefinition = "TEXT", nullable = false) private String rawJson; @Column(name = "received_at", nullable = false) @@ -63,7 +63,7 @@ public class PacketEntity { private int retries = 0; @Lob - @Column(name = "last_error") + @Column(name = "last_error", columnDefinition = "TEXT") private String lastError; public PacketEntity() { diff --git a/amie-decoder/src/main/java/org/apache/custos/amie/model/ProcessingErrorEntity.java 
b/amie-decoder/src/main/java/org/apache/custos/amie/model/ProcessingErrorEntity.java index ae9ef13bb..c268e5c0c 100644 --- a/amie-decoder/src/main/java/org/apache/custos/amie/model/ProcessingErrorEntity.java +++ b/amie-decoder/src/main/java/org/apache/custos/amie/model/ProcessingErrorEntity.java @@ -50,11 +50,11 @@ public class ProcessingErrorEntity { @Column(name = "occurred_at", nullable = false) private Instant occurredAt = Instant.now(); - @Column(name = "summary", nullable = false) + @Column(name = "summary", columnDefinition = "TEXT", nullable = false) private String summary; @Lob - @Column(name = "detail") + @Column(name = "detail", columnDefinition = "TEXT") private String detail; public Long getId() { diff --git a/amie-decoder/src/main/java/org/apache/custos/amie/model/ProcessingEventEntity.java b/amie-decoder/src/main/java/org/apache/custos/amie/model/ProcessingEventEntity.java index 31ac135ef..4056519b3 100644 --- a/amie-decoder/src/main/java/org/apache/custos/amie/model/ProcessingEventEntity.java +++ b/amie-decoder/src/main/java/org/apache/custos/amie/model/ProcessingEventEntity.java @@ -23,7 +23,6 @@ import jakarta.persistence.Entity; import jakarta.persistence.EnumType; import jakarta.persistence.Enumerated; import jakarta.persistence.FetchType; -import jakarta.persistence.GeneratedValue; import jakarta.persistence.Id; import jakarta.persistence.JoinColumn; import jakarta.persistence.Lob; @@ -45,18 +44,18 @@ public class ProcessingEventEntity { private PacketEntity packet; @Enumerated(EnumType.STRING) - @Column(name = "type", nullable = false, length = 64) + @Column(name = "type", columnDefinition = "VARCHAR", nullable = false, length = 64) private ProcessingEventType type; @Enumerated(EnumType.STRING) - @Column(name = "status", nullable = false, length = 32) + @Column(name = "status", columnDefinition = "VARCHAR", nullable = false, length = 32) private ProcessingStatus status = ProcessingStatus.NEW; @Column(name = "attempts", nullable = false) 
private int attempts = 0; @Lob - @Column(name = "payload", nullable = false) + @Column(name = "payload", columnDefinition = "LONGBLOB", nullable = false) private byte[] payload; @Column(name = "created_at", nullable = false) @@ -69,7 +68,7 @@ public class ProcessingEventEntity { private Instant finishedAt; @Lob - @Column(name = "last_error") + @Column(name = "last_error", columnDefinition = "TEXT") private String lastError; public ProcessingEventEntity() { diff --git a/amie-decoder/src/main/java/org/apache/custos/amie/model/ProcessingStatus.java b/amie-decoder/src/main/java/org/apache/custos/amie/model/ProcessingStatus.java index 5d4cf3865..7babbfecd 100644 --- a/amie-decoder/src/main/java/org/apache/custos/amie/model/ProcessingStatus.java +++ b/amie-decoder/src/main/java/org/apache/custos/amie/model/ProcessingStatus.java @@ -35,5 +35,9 @@ public enum ProcessingStatus { /** * The event failed processing and will not be automatically retried. */ - FAILED + FAILED, + /** + * The event failed a previous attempt and is waiting to be retried. 
+ */ + RETRY_SCHEDULED } diff --git a/amie-decoder/src/main/java/org/apache/custos/amie/repo/ProcessingEventRepository.java b/amie-decoder/src/main/java/org/apache/custos/amie/repo/ProcessingEventRepository.java index 24c1fd854..a58b8b2d6 100644 --- a/amie-decoder/src/main/java/org/apache/custos/amie/repo/ProcessingEventRepository.java +++ b/amie-decoder/src/main/java/org/apache/custos/amie/repo/ProcessingEventRepository.java @@ -21,12 +21,16 @@ package org.apache.custos.amie.repo; import org.apache.custos.amie.model.ProcessingEventEntity; import org.apache.custos.amie.model.ProcessingStatus; import org.springframework.data.jpa.repository.JpaRepository; +import org.springframework.data.jpa.repository.Query; +import org.springframework.data.repository.query.Param; import org.springframework.stereotype.Repository; -import java.util.stream.Stream; +import java.util.Collection; +import java.util.List; @Repository public interface ProcessingEventRepository extends JpaRepository<ProcessingEventEntity, String> { - Stream<ProcessingEventEntity> findTop50ByStatusOrderByCreatedAtAsc(ProcessingStatus status); + @Query("SELECT e FROM ProcessingEventEntity e JOIN FETCH e.packet WHERE e.status IN :statuses ORDER BY e.createdAt ASC") + List<ProcessingEventEntity> findTop50EventsToProcess(@Param("statuses") Collection<ProcessingStatus> statuses); } diff --git a/amie-decoder/src/main/java/org/apache/custos/amie/worker/ProcessingEventWorker.java b/amie-decoder/src/main/java/org/apache/custos/amie/worker/ProcessingEventWorker.java index 39745bb6c..d8a0c7ff8 100644 --- a/amie-decoder/src/main/java/org/apache/custos/amie/worker/ProcessingEventWorker.java +++ b/amie-decoder/src/main/java/org/apache/custos/amie/worker/ProcessingEventWorker.java @@ -31,57 +31,71 @@ import org.apache.custos.amie.repo.ProcessingErrorRepository; import org.apache.custos.amie.repo.ProcessingEventRepository; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import 
org.springframework.context.annotation.Lazy; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; +import org.springframework.transaction.annotation.Propagation; import org.springframework.transaction.annotation.Transactional; import java.io.PrintWriter; import java.io.StringWriter; import java.time.Instant; +import java.util.List; /** - * A scheduled worker that fetches for new processing events and executes them. + * A scheduled worker that fetches new and retry-scheduled processing events and executes them. * State of an event (NEW -> RUNNING -> SUCCEEDED/FAILED) */ @Component public class ProcessingEventWorker { private static final Logger LOGGER = LoggerFactory.getLogger(ProcessingEventWorker.class); - private static final int MAX_ATTEMPTS = 5; + private static final int MAX_ATTEMPTS = 3; private final ProcessingEventRepository eventRepo; private final PacketRepository packetRepo; private final ProcessingErrorRepository errorRepo; private final PacketRouter router; private final ObjectMapper objectMapper = new ObjectMapper(); + private final ProcessingEventWorker self; public ProcessingEventWorker(ProcessingEventRepository eventRepo, PacketRepository packetRepo, - ProcessingErrorRepository errorRepo, PacketRouter router) { + ProcessingErrorRepository errorRepo, PacketRouter router, @Lazy ProcessingEventWorker self) { this.eventRepo = eventRepo; this.packetRepo = packetRepo; this.errorRepo = errorRepo; this.router = router; + this.self = self; } /** - * Runs on a fixed delay, checks for NEW events, and processes them one by one. + * Runs on a fixed delay, checks for NEW/RETRY_SCHEDULED events, and processes them one by one, each in its own transaction.
*/ @Scheduled(fixedDelayString = "#{T(org.springframework.boot.convert.DurationStyle).detectAndParse('${app.amie.scheduler.worker-delay}').toMillis()}") - @Transactional public void processPendingEvents() { - try (var eventStream = eventRepo.findTop50ByStatusOrderByCreatedAtAsc(ProcessingStatus.NEW)) { - eventStream.forEach(this::executeEvent); + List<ProcessingEventEntity> eventsToProcess = eventRepo.findTop50EventsToProcess(List.of(ProcessingStatus.NEW, ProcessingStatus.RETRY_SCHEDULED)); + + if (!eventsToProcess.isEmpty()) { + LOGGER.info("Found {} event(s) to process.", eventsToProcess.size()); + eventsToProcess.forEach(event -> { + try { + self.executeEventInTransaction(event); + } catch (Exception e) { + LOGGER.error("An unexpected error occurred while processing of eventId [{}].", event.getId(), e); + } + }); } } - private void executeEvent(ProcessingEventEntity event) { + @Transactional(propagation = Propagation.REQUIRES_NEW) + public void executeEventInTransaction(ProcessingEventEntity event) { PacketEntity packet = event.getPacket(); - LOGGER.info("Processing event [{}] for packet amie_id [{}].", event.getType(), packet.getAmieId()); + LOGGER.info("Processing event [{}] for packet amie_id [{}]. Attempt: {}", event.getType(), packet.getAmieId(), event.getAttempts() + 1); event.setStatus(ProcessingStatus.RUNNING); event.setStartedAt(Instant.now()); event.setAttempts(event.getAttempts() + 1); - eventRepo.save(event); + eventRepo.saveAndFlush(event); try { var packetJson = objectMapper.readTree(packet.getRawJson()); @@ -112,16 +126,22 @@ public class ProcessingEventWorker { private void handleFailure(ProcessingEventEntity event, PacketEntity packet, Exception e) { // Check if the event should be retried or marked as failed boolean isRetryable = event.getAttempts() < MAX_ATTEMPTS; - ProcessingStatus newStatus = isRetryable ? ProcessingStatus.NEW : ProcessingStatus.FAILED; + ProcessingStatus newStatus = isRetryable ? 
ProcessingStatus.RETRY_SCHEDULED : ProcessingStatus.FAILED; event.setStatus(newStatus); event.setLastError(e.getMessage()); event.setFinishedAt(Instant.now()); eventRepo.save(event); - packet.setStatus(PacketStatus.FAILED); - packet.setLastError(e.getMessage()); - packetRepo.save(packet); + if (!isRetryable) { + LOGGER.error("Event for packet amie_id [{}] has failed permanently after {} attempts.", packet.getAmieId(), event.getAttempts()); + packet.setStatus(PacketStatus.FAILED); + packet.setLastError(e.getMessage()); + packetRepo.save(packet); + + } else { + LOGGER.warn("Event for packet amie_id [{}] will be retried. Status set to {}.", packet.getAmieId(), newStatus); + } ProcessingErrorEntity error = new ProcessingErrorEntity(); error.setPacket(packet); @@ -133,8 +153,7 @@ public class ProcessingEventWorker { private String getStackTraceAsString(Exception e) { StringWriter sw = new StringWriter(); - PrintWriter pw = new PrintWriter(sw); - e.printStackTrace(pw); + e.printStackTrace(new PrintWriter(sw)); return sw.toString(); } } diff --git a/amie-decoder/src/main/resources/application.yml b/amie-decoder/src/main/resources/application.yml index 5accb8388..77f2a4a79 100644 --- a/amie-decoder/src/main/resources/application.yml +++ b/amie-decoder/src/main/resources/application.yml @@ -27,12 +27,14 @@ spring: driver-class-name: org.mariadb.jdbc.Driver jpa: hibernate: - ddl-auto: update + ddl-auto: validate properties: hibernate.jdbc.time_zone: UTC + hibernate.type.preferred_enum_jdbc_type: VARCHAR show-sql: false database-platform: org.hibernate.dialect.MariaDBDialect flyway: + enabled: true locations: classpath:db/migration app: diff --git a/amie-decoder/src/main/resources/db.migration/V1__initial_migration.sql b/amie-decoder/src/main/resources/db/migration/V1__initial_migration.sql similarity index 95% rename from amie-decoder/src/main/resources/db.migration/V1__initial_migration.sql rename to amie-decoder/src/main/resources/db/migration/V1__initial_migration.sql 
index c8bad2577..37ba745d6 100644 --- a/amie-decoder/src/main/resources/db.migration/V1__initial_migration.sql +++ b/amie-decoder/src/main/resources/db/migration/V1__initial_migration.sql @@ -17,7 +17,7 @@ CREATE TABLE packets amie_id BIGINT NOT NULL, -- packet_rec_id from AMIE type VARCHAR(64) NOT NULL, -- eg. request_account_create status VARCHAR(32) NOT NULL, -- NEW, DECODED, PROCESSED, FAILED - raw_json JSON NOT NULL, + raw_json TEXT NOT NULL, received_at TIMESTAMP(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6), decoded_at TIMESTAMP(6) NULL, processed_at TIMESTAMP(6) NULL, @@ -66,15 +66,13 @@ CREATE TABLE processing_errors packet_id VARCHAR(255) NULL, event_id VARCHAR(255) NULL, occurred_at TIMESTAMP(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6), - summary VARCHAR(255) NOT NULL, + summary TEXT NOT NULL, detail TEXT NULL, PRIMARY KEY (id), CONSTRAINT fk_errors_packet FOREIGN KEY (packet_id) REFERENCES packets (id) ON DELETE SET NULL, CONSTRAINT fk_errors_event FOREIGN KEY (event_id) REFERENCES processing_events (id) ON DELETE SET NULL, - CHECK (packet_id IS NOT NULL OR event_id IS NOT NULL), - KEY idx_errors_packet_id (packet_id), KEY idx_errors_event_id (event_id), KEY idx_errors_occurred_at (occurred_at)
