This is an automated email from the ASF dual-hosted git repository.

lahirujayathilake pushed a commit to branch amie-decoder
in repository https://gitbox.apache.org/repos/asf/airavata-custos.git

commit a9da76ff635d111cf172174b965575c83d7eba92
Author: lahiruj <[email protected]>
AuthorDate: Mon Sep 22 23:45:46 2025 -0400

    polling AMIE endpoint and persists the packet and event in db
---
 .../java/org/apache/custos/amie/AmiePoller.java    | 123 +++++++++++++++++++++
 .../org/apache/custos/amie/model/PacketEntity.java |   4 +-
 .../custos/amie/model/ProcessingEventEntity.java   |   3 +-
 .../org/apache/custos/amie/util/ProtoUtils.java    |  76 +++++++++++++
 4 files changed, 203 insertions(+), 3 deletions(-)

diff --git a/amie-decoder/src/main/java/org/apache/custos/amie/AmiePoller.java 
b/amie-decoder/src/main/java/org/apache/custos/amie/AmiePoller.java
new file mode 100644
index 000000000..0efbdb597
--- /dev/null
+++ b/amie-decoder/src/main/java/org/apache/custos/amie/AmiePoller.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+package org.apache.custos.amie;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import org.apache.custos.amie.client.AmieClient;
+import org.apache.custos.amie.model.PacketEntity;
+import org.apache.custos.amie.model.PacketStatus;
+import org.apache.custos.amie.model.ProcessingEventEntity;
+import org.apache.custos.amie.model.ProcessingEventType;
+import org.apache.custos.amie.model.ProcessingStatus;
+import org.apache.custos.amie.repo.PacketRepository;
+import org.apache.custos.amie.repo.ProcessingEventRepository;
+import org.apache.custos.amie.util.ProtoUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.scheduling.annotation.Scheduled;
+import org.springframework.stereotype.Component;
+import org.springframework.transaction.annotation.Transactional;
+
+import java.time.Instant;
+import java.util.List;
+
+/**
+ * A scheduled service that polls the AMIE API for new packets,
+ * persists them to the database, and later for processing.
+ */
+@Component
+public class AmiePoller {
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(AmiePoller.class);
+
+    private final AmieClient client;
+    private final PacketRepository packetRepo;
+    private final ProcessingEventRepository eventRepo;
+
+    public AmiePoller(AmieClient client, PacketRepository packetRepo, 
ProcessingEventRepository eventRepo) {
+        this.client = client;
+        this.packetRepo = packetRepo;
+        this.eventRepo = eventRepo;
+    }
+
+
+    @Scheduled(fixedDelayString = "${app.amie.scheduler.poll-delay}")
+    @Transactional
+    public void pollForPackets() {
+        LOGGER.info("Polling for new AMIE packets...");
+        List<JsonNode> packets = client.fetchInProgressPackets();
+
+        if (packets.isEmpty()) {
+            LOGGER.info("No new packets found.");
+            return;
+        }
+
+        LOGGER.info("Found {} packets to process.", packets.size());
+        for (JsonNode packetNode : packets) {
+            try {
+                processIndividualPacket(packetNode);
+            } catch (Exception e) {
+                // If a malformed packet is found
+                LOGGER.error("An unexpected error occurred while processing a 
packet. Raw packet: {}", packetNode.toString(), e);
+            }
+        }
+    }
+
+    /**
+     * Processes a single packet from the AMIE API. Only persists new packets.
+     *
+     * @param packetNode The raw packet
+     */
+    private void processIndividualPacket(JsonNode packetNode) {
+        long amiePacketRecId = 
packetNode.at("/header/packet_rec_id").asLong(-1);
+        String packetType = packetNode.path("type").asText(null);
+
+        if (amiePacketRecId < 0 || packetType == null) {
+            LOGGER.warn("Skipping packet with missing or invalid 
'packet_rec_id' or 'type'. Packet: {}", packetNode);
+            return;
+        }
+
+        // Only process if this packetRecId is new
+        packetRepo.findByAmieId(amiePacketRecId).ifPresentOrElse(
+                (existingPacket) -> LOGGER.debug("Packet with 
amie_packet_rec_id {} already exists. Skip the processing.", amiePacketRecId),
+                () -> {
+                    LOGGER.info("Persisting new packet with amie_packet_rec_id 
{} and type '{}'.", amiePacketRecId, packetType);
+
+                    PacketEntity newPacket = new PacketEntity();
+                    newPacket.setAmieId(amiePacketRecId);
+                    newPacket.setType(packetType);
+                    newPacket.setStatus(PacketStatus.NEW);
+                    newPacket.setRawJson(packetNode.toString());
+                    newPacket.setReceivedAt(Instant.now());
+                    packetRepo.save(newPacket);
+
+                    ProcessingEventEntity decodeEvent = new 
ProcessingEventEntity();
+                    decodeEvent.setPacket(newPacket);
+                    decodeEvent.setType(ProcessingEventType.DECODE_PACKET);
+                    decodeEvent.setStatus(ProcessingStatus.NEW);
+
+                    byte[] payload = 
ProtoUtils.createDecodeStartedEvent(decodeEvent.getId(), newPacket.getId(), 
amiePacketRecId);
+                    decodeEvent.setPayload(payload);
+                    eventRepo.save(decodeEvent);
+
+                    LOGGER.info("Successfully enqueued DECODE_PACKET event for 
packet {}.", amiePacketRecId);
+                }
+        );
+    }
+}
\ No newline at end of file
diff --git 
a/amie-decoder/src/main/java/org/apache/custos/amie/model/PacketEntity.java 
b/amie-decoder/src/main/java/org/apache/custos/amie/model/PacketEntity.java
index 17ede689f..1975a4291 100644
--- a/amie-decoder/src/main/java/org/apache/custos/amie/model/PacketEntity.java
+++ b/amie-decoder/src/main/java/org/apache/custos/amie/model/PacketEntity.java
@@ -22,19 +22,18 @@ import jakarta.persistence.Column;
 import jakarta.persistence.Entity;
 import jakarta.persistence.EnumType;
 import jakarta.persistence.Enumerated;
-import jakarta.persistence.GeneratedValue;
 import jakarta.persistence.Id;
 import jakarta.persistence.Lob;
 import jakarta.persistence.Table;
 
 import java.time.Instant;
+import java.util.UUID;
 
 @Entity
 @Table(name = "packets")
 public class PacketEntity {
 
     @Id
-    @GeneratedValue(generator = "uuid")
     private String id;
 
     @Column(name = "amie_id", nullable = false, unique = true)
@@ -68,6 +67,7 @@ public class PacketEntity {
     private String lastError;
 
     public PacketEntity() {
+        this.id = UUID.randomUUID().toString();
     }
 
     public String getId() {
diff --git 
a/amie-decoder/src/main/java/org/apache/custos/amie/model/ProcessingEventEntity.java
 
b/amie-decoder/src/main/java/org/apache/custos/amie/model/ProcessingEventEntity.java
index 87673d7ee..31ac135ef 100644
--- 
a/amie-decoder/src/main/java/org/apache/custos/amie/model/ProcessingEventEntity.java
+++ 
b/amie-decoder/src/main/java/org/apache/custos/amie/model/ProcessingEventEntity.java
@@ -31,13 +31,13 @@ import jakarta.persistence.ManyToOne;
 import jakarta.persistence.Table;
 
 import java.time.Instant;
+import java.util.UUID;
 
 @Entity
 @Table(name = "processing_events")
 public class ProcessingEventEntity {
 
     @Id
-    @GeneratedValue(generator = "uuid")
     private String id;
 
     @ManyToOne(fetch = FetchType.LAZY, optional = false)
@@ -73,6 +73,7 @@ public class ProcessingEventEntity {
     private String lastError;
 
     public ProcessingEventEntity() {
+        this.id = UUID.randomUUID().toString();
     }
 
     public String getId() {
diff --git 
a/amie-decoder/src/main/java/org/apache/custos/amie/util/ProtoUtils.java 
b/amie-decoder/src/main/java/org/apache/custos/amie/util/ProtoUtils.java
new file mode 100644
index 000000000..b16f27046
--- /dev/null
+++ b/amie-decoder/src/main/java/org/apache/custos/amie/util/ProtoUtils.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+package org.apache.custos.amie.util;
+
+import com.google.protobuf.Timestamp;
+import org.apache.custos.amie.internal.events.v1.DecodeStartedPayload;
+import org.apache.custos.amie.internal.events.v1.EventType;
+import org.apache.custos.amie.internal.events.v1.ProcessingEvent;
+import org.apache.custos.amie.internal.events.v1.ProcessingStatus;
+
+import java.time.Instant;
+
+public final class ProtoUtils {
+
+    private ProtoUtils() {
+    }
+
+    /**
+     * Creates the serialized payload for a DECODE_STARTED event.
+     * <p>
+     * This method builds the full {@link ProcessingEvent} envelope with its 
payload
+     * set to a {@link DecodeStartedPayload}. The resulting byte array is what 
will be
+     * stored in the {@code payload} column of the {@code processing_events} 
table.
+     *
+     * @param eventId         ID of the event entity
+     * @param packetId        ID of the parent packet entity
+     * @param amiePacketRecId The unique record ID from the original AMIE 
packet header
+     * @return A byte array containing the serialized Protobuf message
+     */
+    public static byte[] createDecodeStartedEvent(String eventId, String 
packetId, long amiePacketRecId) {
+        DecodeStartedPayload payload = DecodeStartedPayload.newBuilder()
+                .setPacketDbId(packetId)
+                .setAmiePacketRecId(amiePacketRecId)
+                .build();
+
+        ProcessingEvent event = ProcessingEvent.newBuilder()
+                .setId(eventId)
+                .setPacketDbId(packetId)
+                .setAmiePacketRecId(amiePacketRecId)
+                .setType(EventType.DECODE_STARTED)
+                .setStatus(ProcessingStatus.PENDING)
+                .setAttempts(0)
+                .setCreatedAt(instantToTimestamp(Instant.now()))
+                .setDecodeStarted(payload)
+                .build();
+
+        return event.toByteArray();
+    }
+
+    private static Timestamp instantToTimestamp(Instant instant) {
+        if (instant == null) {
+            return Timestamp.getDefaultInstance();
+        }
+        return Timestamp.newBuilder()
+                .setSeconds(instant.getEpochSecond())
+                .setNanos(instant.getNano())
+                .build();
+    }
+}
+

Reply via email to