This is an automated email from the ASF dual-hosted git repository. lahirujayathilake pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/airavata-custos.git
commit 50ba3cc1c04e3fdd381eac5295a007d77cd2d789 Author: lahiruj <[email protected]> AuthorDate: Tue Sep 23 00:16:02 2025 -0400 AMIE packet processing worker and handler skeleton --- .../apache/custos/amie/handler/NoOpHandler.java | 45 +++++++ .../apache/custos/amie/handler/PacketHandler.java | 43 +++++++ .../apache/custos/amie/handler/PacketRouter.java | 59 +++++++++ .../custos/amie/worker/ProcessingEventWorker.java | 140 +++++++++++++++++++++ 4 files changed, 287 insertions(+) diff --git a/amie-decoder/src/main/java/org/apache/custos/amie/handler/NoOpHandler.java b/amie-decoder/src/main/java/org/apache/custos/amie/handler/NoOpHandler.java new file mode 100644 index 000000000..c009734c9 --- /dev/null +++ b/amie-decoder/src/main/java/org/apache/custos/amie/handler/NoOpHandler.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.custos.amie.handler; + +import com.fasterxml.jackson.databind.JsonNode; +import org.apache.custos.amie.model.PacketEntity; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; + +/** + * A default PacketHandler that performs no action. + */ +@Component +public class NoOpHandler implements PacketHandler { + + private static final Logger LOGGER = LoggerFactory.getLogger(NoOpHandler.class); + + @Override + public void handle(JsonNode packetJson, PacketEntity packetEntity) { + LOGGER.info("NoOpHandler executed for packet with amie_packet_rec_id [{}] and type [{}]. No action taken.", packetEntity.getAmieId(), packetEntity.getType()); + // No operations + } + + @Override + public String supportsType() { + return "*"; + } +} diff --git a/amie-decoder/src/main/java/org/apache/custos/amie/handler/PacketHandler.java b/amie-decoder/src/main/java/org/apache/custos/amie/handler/PacketHandler.java new file mode 100644 index 000000000..9e763b516 --- /dev/null +++ b/amie-decoder/src/main/java/org/apache/custos/amie/handler/PacketHandler.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.custos.amie.handler; + +import com.fasterxml.jackson.databind.JsonNode; +import org.apache.custos.amie.model.PacketEntity; + +public interface PacketHandler { + + /** + * Executes the business logic for a given AMIE packet + * + * @param packetJson The raw packet content + * @param packetEntity The entity for the packet + * @throws Exception if processing fails + */ + void handle(JsonNode packetJson, PacketEntity packetEntity) throws Exception; + + /** + * Define which packet type this handler is responsible for. + * For example, "request_account_create". "*" indicates a default handler. + * + * @return The AMIE packet type this handler supports + */ + String supportsType(); + +} diff --git a/amie-decoder/src/main/java/org/apache/custos/amie/handler/PacketRouter.java b/amie-decoder/src/main/java/org/apache/custos/amie/handler/PacketRouter.java new file mode 100644 index 000000000..50af2c1cd --- /dev/null +++ b/amie-decoder/src/main/java/org/apache/custos/amie/handler/PacketRouter.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.custos.amie.handler; + +import com.fasterxml.jackson.databind.JsonNode; +import org.apache.custos.amie.model.PacketEntity; +import org.springframework.stereotype.Component; + +import java.util.List; + +/** + * A service that routes an incoming packet to the appropriate PacketHandler. + */ +@Component +public class PacketRouter { + + private final List<PacketHandler> handlers; + private final PacketHandler defaultHandler; + + public PacketRouter(List<PacketHandler> handlers) { + this.handlers = handlers; + this.defaultHandler = new NoOpHandler(); + } + + /** + * Routes the packet to the correct handler and executes it. + * + * @param packetJson The raw packet content + * @param packetEntity The Packet entity + * @throws Exception if the handler logic fails + */ + public void route(JsonNode packetJson, PacketEntity packetEntity) throws Exception { + PacketHandler handler = findHandlerFor(packetEntity.getType()); + handler.handle(packetJson, packetEntity); + } + + private PacketHandler findHandlerFor(String packetType) { + return handlers.stream() + .filter(h -> h.supportsType().equalsIgnoreCase(packetType)) + .findFirst() + .orElse(defaultHandler); + } +} diff --git a/amie-decoder/src/main/java/org/apache/custos/amie/worker/ProcessingEventWorker.java b/amie-decoder/src/main/java/org/apache/custos/amie/worker/ProcessingEventWorker.java new file mode 100644 index 000000000..22edb13bd --- /dev/null +++ b/amie-decoder/src/main/java/org/apache/custos/amie/worker/ProcessingEventWorker.java @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.custos.amie.worker; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.custos.amie.handler.PacketRouter; +import org.apache.custos.amie.model.PacketEntity; +import org.apache.custos.amie.model.PacketStatus; +import org.apache.custos.amie.model.ProcessingErrorEntity; +import org.apache.custos.amie.model.ProcessingEventEntity; +import org.apache.custos.amie.model.ProcessingEventType; +import org.apache.custos.amie.model.ProcessingStatus; +import org.apache.custos.amie.repo.PacketRepository; +import org.apache.custos.amie.repo.ProcessingErrorRepository; +import org.apache.custos.amie.repo.ProcessingEventRepository; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Component; +import org.springframework.transaction.annotation.Transactional; + +import java.io.PrintWriter; +import java.io.StringWriter; +import java.time.Instant; + +/** + * A scheduled worker that fetches for new processing events and executes them. + * State of an event (NEW -> RUNNING -> SUCCEEDED/FAILED) + */ +@Component +public class ProcessingEventWorker { + + private static final Logger LOGGER = LoggerFactory.getLogger(ProcessingEventWorker.class); + private static final int MAX_ATTEMPTS = 5; + + private final ProcessingEventRepository eventRepo; + private final PacketRepository packetRepo; + private final ProcessingErrorRepository errorRepo; + private final PacketRouter router; + private final ObjectMapper objectMapper = new ObjectMapper(); + + public ProcessingEventWorker(ProcessingEventRepository eventRepo, PacketRepository packetRepo, + ProcessingErrorRepository errorRepo, PacketRouter router) { + this.eventRepo = eventRepo; + this.packetRepo = packetRepo; + this.errorRepo = errorRepo; + this.router = router; + } + + /** + * Runs on a fixed delay, checks for NEW events, and processes them one by one. + */ + @Scheduled(fixedDelayString = "${app.amie.scheduler.worker-delay}") + @Transactional + public void processPendingEvents() { + try (var eventStream = eventRepo.findTop50ByStatusOrderByCreatedAtAsc(ProcessingStatus.NEW)) { + eventStream.forEach(this::executeEvent); + } + } + + private void executeEvent(ProcessingEventEntity event) { + PacketEntity packet = event.getPacket(); + LOGGER.info("Processing event [{}] for packet amie_id [{}].", event.getType(), packet.getAmieId()); + + event.setStatus(ProcessingStatus.RUNNING); + event.setStartedAt(Instant.now()); + event.setAttempts(event.getAttempts() + 1); + eventRepo.save(event); + + try { + var packetJson = objectMapper.readTree(packet.getRawJson()); + router.route(packetJson, packet); + + handleSuccess(event, packet); + + } catch (Exception e) { + LOGGER.error("Event processing failed for packet amie_id [{}]. Attempt {} of {}.", packet.getAmieId(), event.getAttempts(), MAX_ATTEMPTS, e); + handleFailure(event, packet, e); + } + } + + private void handleSuccess(ProcessingEventEntity event, PacketEntity packet) { + event.setStatus(ProcessingStatus.SUCCEEDED); + event.setFinishedAt(Instant.now()); + eventRepo.save(event); + + if (event.getType() == ProcessingEventType.DECODE_PACKET) { + packet.setStatus(PacketStatus.DECODED); + packet.setDecodedAt(Instant.now()); + packetRepo.save(packet); + } + + LOGGER.info("Successfully processed event [{}] for packet amie_id [{}].", event.getType(), packet.getAmieId()); + } + + private void handleFailure(ProcessingEventEntity event, PacketEntity packet, Exception e) { + // Check if the event should be retried or marked as failed + boolean isRetryable = event.getAttempts() < MAX_ATTEMPTS; + ProcessingStatus newStatus = isRetryable ? ProcessingStatus.NEW : ProcessingStatus.FAILED; + + event.setStatus(newStatus); + event.setLastError(e.getMessage()); + event.setFinishedAt(Instant.now()); + eventRepo.save(event); + + packet.setStatus(PacketStatus.FAILED); + packet.setLastError(e.getMessage()); + packetRepo.save(packet); + + ProcessingErrorEntity error = new ProcessingErrorEntity(); + error.setPacket(packet); + error.setEvent(event); + error.setSummary(e.getClass().getSimpleName() + ": " + e.getMessage()); + error.setDetail(getStackTraceAsString(e)); + errorRepo.save(error); + } + + private String getStackTraceAsString(Exception e) { + StringWriter sw = new StringWriter(); + PrintWriter pw = new PrintWriter(sw); + e.printStackTrace(pw); + return sw.toString(); + } +}
