This is an automated email from the ASF dual-hosted git repository.

lahirujayathilake pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airavata-custos.git

commit 50ba3cc1c04e3fdd381eac5295a007d77cd2d789
Author: lahiruj <[email protected]>
AuthorDate: Tue Sep 23 00:16:02 2025 -0400

    AMIE packet processing worker and handler skeleton
---
 .../apache/custos/amie/handler/NoOpHandler.java    |  45 +++++++
 .../apache/custos/amie/handler/PacketHandler.java  |  43 +++++++
 .../apache/custos/amie/handler/PacketRouter.java   |  59 +++++++++
 .../custos/amie/worker/ProcessingEventWorker.java  | 140 +++++++++++++++++++++
 4 files changed, 287 insertions(+)

diff --git 
a/amie-decoder/src/main/java/org/apache/custos/amie/handler/NoOpHandler.java 
b/amie-decoder/src/main/java/org/apache/custos/amie/handler/NoOpHandler.java
new file mode 100644
index 000000000..c009734c9
--- /dev/null
+++ b/amie-decoder/src/main/java/org/apache/custos/amie/handler/NoOpHandler.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+package org.apache.custos.amie.handler;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import org.apache.custos.amie.model.PacketEntity;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
+
+/**
+ * A default PacketHandler that performs no action.
+ */
+@Component
+public class NoOpHandler implements PacketHandler {
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(NoOpHandler.class);
+
+    @Override
+    public void handle(JsonNode packetJson, PacketEntity packetEntity) {
+        LOGGER.info("NoOpHandler executed for packet with amie_packet_rec_id 
[{}] and type [{}]. No action taken.", packetEntity.getAmieId(), 
packetEntity.getType());
+        // No operations
+    }
+
+    @Override
+    public String supportsType() {
+        return "*";
+    }
+}
diff --git 
a/amie-decoder/src/main/java/org/apache/custos/amie/handler/PacketHandler.java 
b/amie-decoder/src/main/java/org/apache/custos/amie/handler/PacketHandler.java
new file mode 100644
index 000000000..9e763b516
--- /dev/null
+++ 
b/amie-decoder/src/main/java/org/apache/custos/amie/handler/PacketHandler.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+package org.apache.custos.amie.handler;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import org.apache.custos.amie.model.PacketEntity;
+
+public interface PacketHandler {
+
+    /**
+     * Executes the business logic for a given AMIE packet
+     *
+     * @param packetJson   The raw packet content
+     * @param packetEntity The entity for the packet
+     * @throws Exception if processing fails
+     */
+    void handle(JsonNode packetJson, PacketEntity packetEntity) throws 
Exception;
+
+    /**
+     * Define which packet type this handler is responsible for.
+     * For example, "request_account_create". "*" indicates a default handler.
+     *
+     * @return The AMIE packet type this handler supports
+     */
+    String supportsType();
+
+}
diff --git 
a/amie-decoder/src/main/java/org/apache/custos/amie/handler/PacketRouter.java 
b/amie-decoder/src/main/java/org/apache/custos/amie/handler/PacketRouter.java
new file mode 100644
index 000000000..50af2c1cd
--- /dev/null
+++ 
b/amie-decoder/src/main/java/org/apache/custos/amie/handler/PacketRouter.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+package org.apache.custos.amie.handler;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import org.apache.custos.amie.model.PacketEntity;
+import org.springframework.stereotype.Component;
+
+import java.util.List;
+
+/**
+ * A service that routes an incoming packet to the appropriate PacketHandler.
+ */
+@Component
+public class PacketRouter {
+
+    private final List<PacketHandler> handlers;
+    private final PacketHandler defaultHandler;
+
+    public PacketRouter(List<PacketHandler> handlers) {
+        this.handlers = handlers;
+        this.defaultHandler = new NoOpHandler();
+    }
+
+    /**
+     * Routes the packet to the correct handler and executes it.
+     *
+     * @param packetJson   The raw packet content
+     * @param packetEntity The Packet entity
+     * @throws Exception if the handler logic fails
+     */
+    public void route(JsonNode packetJson, PacketEntity packetEntity) throws 
Exception {
+        PacketHandler handler = findHandlerFor(packetEntity.getType());
+        handler.handle(packetJson, packetEntity);
+    }
+
+    private PacketHandler findHandlerFor(String packetType) {
+        return handlers.stream()
+                .filter(h -> h.supportsType().equalsIgnoreCase(packetType))
+                .findFirst()
+                .orElse(defaultHandler);
+    }
+}
diff --git 
a/amie-decoder/src/main/java/org/apache/custos/amie/worker/ProcessingEventWorker.java
 
b/amie-decoder/src/main/java/org/apache/custos/amie/worker/ProcessingEventWorker.java
new file mode 100644
index 000000000..22edb13bd
--- /dev/null
+++ 
b/amie-decoder/src/main/java/org/apache/custos/amie/worker/ProcessingEventWorker.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+package org.apache.custos.amie.worker;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.custos.amie.handler.PacketRouter;
+import org.apache.custos.amie.model.PacketEntity;
+import org.apache.custos.amie.model.PacketStatus;
+import org.apache.custos.amie.model.ProcessingErrorEntity;
+import org.apache.custos.amie.model.ProcessingEventEntity;
+import org.apache.custos.amie.model.ProcessingEventType;
+import org.apache.custos.amie.model.ProcessingStatus;
+import org.apache.custos.amie.repo.PacketRepository;
+import org.apache.custos.amie.repo.ProcessingErrorRepository;
+import org.apache.custos.amie.repo.ProcessingEventRepository;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.scheduling.annotation.Scheduled;
+import org.springframework.stereotype.Component;
+import org.springframework.transaction.annotation.Transactional;
+
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.time.Instant;
+
+/**
+ * A scheduled worker that fetches for new processing events and executes them.
+ * State of an event (NEW -> RUNNING -> SUCCEEDED/FAILED)
+ */
+@Component
+public class ProcessingEventWorker {
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(ProcessingEventWorker.class);
+    private static final int MAX_ATTEMPTS = 5;
+
+    private final ProcessingEventRepository eventRepo;
+    private final PacketRepository packetRepo;
+    private final ProcessingErrorRepository errorRepo;
+    private final PacketRouter router;
+    private final ObjectMapper objectMapper = new ObjectMapper();
+
+    public ProcessingEventWorker(ProcessingEventRepository eventRepo, 
PacketRepository packetRepo,
+                                 ProcessingErrorRepository errorRepo, 
PacketRouter router) {
+        this.eventRepo = eventRepo;
+        this.packetRepo = packetRepo;
+        this.errorRepo = errorRepo;
+        this.router = router;
+    }
+
+    /**
+     * Runs on a fixed delay, checks for NEW events, and processes them one by 
one.
+     */
+    @Scheduled(fixedDelayString = "${app.amie.scheduler.worker-delay}")
+    @Transactional
+    public void processPendingEvents() {
+        try (var eventStream = 
eventRepo.findTop50ByStatusOrderByCreatedAtAsc(ProcessingStatus.NEW)) {
+            eventStream.forEach(this::executeEvent);
+        }
+    }
+
+    private void executeEvent(ProcessingEventEntity event) {
+        PacketEntity packet = event.getPacket();
+        LOGGER.info("Processing event [{}] for packet amie_id [{}].", 
event.getType(), packet.getAmieId());
+
+        event.setStatus(ProcessingStatus.RUNNING);
+        event.setStartedAt(Instant.now());
+        event.setAttempts(event.getAttempts() + 1);
+        eventRepo.save(event);
+
+        try {
+            var packetJson = objectMapper.readTree(packet.getRawJson());
+            router.route(packetJson, packet);
+
+            handleSuccess(event, packet);
+
+        } catch (Exception e) {
+            LOGGER.error("Event processing failed for packet amie_id [{}]. 
Attempt {} of {}.", packet.getAmieId(), event.getAttempts(), MAX_ATTEMPTS, e);
+            handleFailure(event, packet, e);
+        }
+    }
+
+    private void handleSuccess(ProcessingEventEntity event, PacketEntity 
packet) {
+        event.setStatus(ProcessingStatus.SUCCEEDED);
+        event.setFinishedAt(Instant.now());
+        eventRepo.save(event);
+
+        if (event.getType() == ProcessingEventType.DECODE_PACKET) {
+            packet.setStatus(PacketStatus.DECODED);
+            packet.setDecodedAt(Instant.now());
+            packetRepo.save(packet);
+        }
+
+        LOGGER.info("Successfully processed event [{}] for packet amie_id 
[{}].", event.getType(), packet.getAmieId());
+    }
+
+    private void handleFailure(ProcessingEventEntity event, PacketEntity 
packet, Exception e) {
+        // Check if the event should be retried or marked as failed
+        boolean isRetryable = event.getAttempts() < MAX_ATTEMPTS;
+        ProcessingStatus newStatus = isRetryable ? ProcessingStatus.NEW : 
ProcessingStatus.FAILED;
+
+        event.setStatus(newStatus);
+        event.setLastError(e.getMessage());
+        event.setFinishedAt(Instant.now());
+        eventRepo.save(event);
+
+        packet.setStatus(PacketStatus.FAILED);
+        packet.setLastError(e.getMessage());
+        packetRepo.save(packet);
+
+        ProcessingErrorEntity error = new ProcessingErrorEntity();
+        error.setPacket(packet);
+        error.setEvent(event);
+        error.setSummary(e.getClass().getSimpleName() + ": " + e.getMessage());
+        error.setDetail(getStackTraceAsString(e));
+        errorRepo.save(error);
+    }
+
+    private String getStackTraceAsString(Exception e) {
+        StringWriter sw = new StringWriter();
+        PrintWriter pw = new PrintWriter(sw);
+        e.printStackTrace(pw);
+        return sw.toString();
+    }
+}

Reply via email to