This is an automated email from the ASF dual-hosted git repository. lahirujayathilake pushed a commit to branch amie-decoder in repository https://gitbox.apache.org/repos/asf/airavata-custos.git
commit a9da76ff635d111cf172174b965575c83d7eba92 Author: lahiruj <[email protected]> AuthorDate: Mon Sep 22 23:45:46 2025 -0400 polling AMIE endpoint and persists the packet and event in db --- .../java/org/apache/custos/amie/AmiePoller.java | 123 +++++++++++++++++++++ .../org/apache/custos/amie/model/PacketEntity.java | 4 +- .../custos/amie/model/ProcessingEventEntity.java | 3 +- .../org/apache/custos/amie/util/ProtoUtils.java | 76 +++++++++++++ 4 files changed, 203 insertions(+), 3 deletions(-) diff --git a/amie-decoder/src/main/java/org/apache/custos/amie/AmiePoller.java b/amie-decoder/src/main/java/org/apache/custos/amie/AmiePoller.java new file mode 100644 index 000000000..0efbdb597 --- /dev/null +++ b/amie-decoder/src/main/java/org/apache/custos/amie/AmiePoller.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.custos.amie; + +import com.fasterxml.jackson.databind.JsonNode; +import org.apache.custos.amie.client.AmieClient; +import org.apache.custos.amie.model.PacketEntity; +import org.apache.custos.amie.model.PacketStatus; +import org.apache.custos.amie.model.ProcessingEventEntity; +import org.apache.custos.amie.model.ProcessingEventType; +import org.apache.custos.amie.model.ProcessingStatus; +import org.apache.custos.amie.repo.PacketRepository; +import org.apache.custos.amie.repo.ProcessingEventRepository; +import org.apache.custos.amie.util.ProtoUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Component; +import org.springframework.transaction.annotation.Transactional; + +import java.time.Instant; +import java.util.List; + +/** + * A scheduled service that polls the AMIE API for new packets, + * persists them to the database, and enqueues a DECODE_PACKET event for later processing. + */ +@Component +public class AmiePoller { + + private static final Logger LOGGER = LoggerFactory.getLogger(AmiePoller.class); + + private final AmieClient client; + private final PacketRepository packetRepo; + private final ProcessingEventRepository eventRepo; + + public AmiePoller(AmieClient client, PacketRepository packetRepo, ProcessingEventRepository eventRepo) { + this.client = client; + this.packetRepo = packetRepo; + this.eventRepo = eventRepo; + } + + + @Scheduled(fixedDelayString = "${app.amie.scheduler.poll-delay}") + @Transactional + public void pollForPackets() { + LOGGER.info("Polling for new AMIE packets..."); + List<JsonNode> packets = client.fetchInProgressPackets(); + + if (packets.isEmpty()) { + LOGGER.info("No new packets found."); + return; + } + + LOGGER.info("Found {} packets to process.", packets.size()); + for (JsonNode packetNode : packets) { + try { + processIndividualPacket(packetNode); + } catch (Exception e) { + // If a packet is malformed or fails to process, log it and continue with the remaining packets + 
LOGGER.error("An unexpected error occurred while processing a packet. Raw packet: {}", packetNode.toString(), e); + } + } + } + + /** + * Processes a single packet from the AMIE API. Only persists new packets. + * + * @param packetNode The raw packet + */ + private void processIndividualPacket(JsonNode packetNode) { + long amiePacketRecId = packetNode.at("/header/packet_rec_id").asLong(-1); + String packetType = packetNode.path("type").asText(null); + + if (amiePacketRecId < 0 || packetType == null) { + LOGGER.warn("Skipping packet with missing or invalid 'packet_rec_id' or 'type'. Packet: {}", packetNode); + return; + } + + // Only process if this packetRecId is new + packetRepo.findByAmieId(amiePacketRecId).ifPresentOrElse( + (existingPacket) -> LOGGER.debug("Packet with amie_packet_rec_id {} already exists. Skip the processing.", amiePacketRecId), + () -> { + LOGGER.info("Persisting new packet with amie_packet_rec_id {} and type '{}'.", amiePacketRecId, packetType); + + PacketEntity newPacket = new PacketEntity(); + newPacket.setAmieId(amiePacketRecId); + newPacket.setType(packetType); + newPacket.setStatus(PacketStatus.NEW); + newPacket.setRawJson(packetNode.toString()); + newPacket.setReceivedAt(Instant.now()); + packetRepo.save(newPacket); + + ProcessingEventEntity decodeEvent = new ProcessingEventEntity(); + decodeEvent.setPacket(newPacket); + decodeEvent.setType(ProcessingEventType.DECODE_PACKET); + decodeEvent.setStatus(ProcessingStatus.NEW); + + byte[] payload = ProtoUtils.createDecodeStartedEvent(decodeEvent.getId(), newPacket.getId(), amiePacketRecId); + decodeEvent.setPayload(payload); + eventRepo.save(decodeEvent); + + LOGGER.info("Successfully enqueued DECODE_PACKET event for packet {}.", amiePacketRecId); + } + ); + } +} \ No newline at end of file diff --git a/amie-decoder/src/main/java/org/apache/custos/amie/model/PacketEntity.java b/amie-decoder/src/main/java/org/apache/custos/amie/model/PacketEntity.java index 17ede689f..1975a4291 100644 --- 
a/amie-decoder/src/main/java/org/apache/custos/amie/model/PacketEntity.java +++ b/amie-decoder/src/main/java/org/apache/custos/amie/model/PacketEntity.java @@ -22,19 +22,18 @@ import jakarta.persistence.Column; import jakarta.persistence.Entity; import jakarta.persistence.EnumType; import jakarta.persistence.Enumerated; -import jakarta.persistence.GeneratedValue; import jakarta.persistence.Id; import jakarta.persistence.Lob; import jakarta.persistence.Table; import java.time.Instant; +import java.util.UUID; @Entity @Table(name = "packets") public class PacketEntity { @Id - @GeneratedValue(generator = "uuid") private String id; @Column(name = "amie_id", nullable = false, unique = true) @@ -68,6 +67,7 @@ public class PacketEntity { private String lastError; public PacketEntity() { + this.id = UUID.randomUUID().toString(); } public String getId() { diff --git a/amie-decoder/src/main/java/org/apache/custos/amie/model/ProcessingEventEntity.java b/amie-decoder/src/main/java/org/apache/custos/amie/model/ProcessingEventEntity.java index 87673d7ee..31ac135ef 100644 --- a/amie-decoder/src/main/java/org/apache/custos/amie/model/ProcessingEventEntity.java +++ b/amie-decoder/src/main/java/org/apache/custos/amie/model/ProcessingEventEntity.java @@ -31,13 +31,13 @@ import jakarta.persistence.ManyToOne; import jakarta.persistence.Table; import java.time.Instant; +import java.util.UUID; @Entity @Table(name = "processing_events") public class ProcessingEventEntity { @Id - @GeneratedValue(generator = "uuid") private String id; @ManyToOne(fetch = FetchType.LAZY, optional = false) @@ -73,6 +73,7 @@ public class ProcessingEventEntity { private String lastError; public ProcessingEventEntity() { + this.id = UUID.randomUUID().toString(); } public String getId() { diff --git a/amie-decoder/src/main/java/org/apache/custos/amie/util/ProtoUtils.java b/amie-decoder/src/main/java/org/apache/custos/amie/util/ProtoUtils.java new file mode 100644 index 000000000..b16f27046 --- /dev/null +++ 
b/amie-decoder/src/main/java/org/apache/custos/amie/util/ProtoUtils.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.custos.amie.util; + +import com.google.protobuf.Timestamp; +import org.apache.custos.amie.internal.events.v1.DecodeStartedPayload; +import org.apache.custos.amie.internal.events.v1.EventType; +import org.apache.custos.amie.internal.events.v1.ProcessingEvent; +import org.apache.custos.amie.internal.events.v1.ProcessingStatus; + +import java.time.Instant; + +public final class ProtoUtils { + + private ProtoUtils() { + } + + /** + * Creates the serialized payload for a DECODE_STARTED event. + * <p> + * This method builds the full {@link ProcessingEvent} envelope with its payload + * set to a {@link DecodeStartedPayload}. The resulting byte array is what will be + * stored in the {@code payload} column of the {@code processing_events} table. 
+ * + * @param eventId ID of the event entity + * @param packetId ID of the parent packet entity + * @param amiePacketRecId The unique record ID from the original AMIE packet header + * @return A byte array containing the serialized Protobuf message + */ + public static byte[] createDecodeStartedEvent(String eventId, String packetId, long amiePacketRecId) { + DecodeStartedPayload payload = DecodeStartedPayload.newBuilder() + .setPacketDbId(packetId) + .setAmiePacketRecId(amiePacketRecId) + .build(); + + ProcessingEvent event = ProcessingEvent.newBuilder() + .setId(eventId) + .setPacketDbId(packetId) + .setAmiePacketRecId(amiePacketRecId) + .setType(EventType.DECODE_STARTED) + .setStatus(ProcessingStatus.PENDING) + .setAttempts(0) + .setCreatedAt(instantToTimestamp(Instant.now())) + .setDecodeStarted(payload) + .build(); + + return event.toByteArray(); + } + + private static Timestamp instantToTimestamp(Instant instant) { + if (instant == null) { + return Timestamp.getDefaultInstance(); + } + return Timestamp.newBuilder() + .setSeconds(instant.getEpochSecond()) + .setNanos(instant.getNano()) + .build(); + } +} +
