reluxa commented on code in PR #3693: URL: https://github.com/apache/fineract/pull/3693#discussion_r1461756313
########## fineract-core/src/main/java/org/apache/fineract/infrastructure/event/external/api/InternalExternalEventsApiResource.java: ########## @@ -0,0 +1,122 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.fineract.infrastructure.event.external.api; + +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; +import io.swagger.v3.oas.annotations.Operation; +import io.swagger.v3.oas.annotations.Parameter; +import io.swagger.v3.oas.annotations.media.Content; +import io.swagger.v3.oas.annotations.media.Schema; +import io.swagger.v3.oas.annotations.responses.ApiResponse; +import io.swagger.v3.oas.annotations.responses.ApiResponses; +import jakarta.ws.rs.Consumes; +import jakarta.ws.rs.DELETE; +import jakarta.ws.rs.GET; +import jakarta.ws.rs.Path; +import jakarta.ws.rs.Produces; +import jakarta.ws.rs.QueryParam; +import jakarta.ws.rs.core.MediaType; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.util.List; +import java.util.Map; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.apache.fineract.infrastructure.core.serialization.DefaultToApiJsonSerializer; +import 
org.apache.fineract.infrastructure.event.external.service.InternalExternalEventService; +import org.apache.fineract.infrastructure.event.external.service.validation.ExternalEventDTO; +import org.springframework.beans.factory.InitializingBean; +import org.springframework.context.annotation.Profile; +import org.springframework.stereotype.Component; + +@Profile("test") +@Component +@Path("/v1/internal/externalevents") +@RequiredArgsConstructor +@Slf4j +public class InternalExternalEventsApiResource implements InitializingBean { + + private final InternalExternalEventService internalExternalEventService; + private final DefaultToApiJsonSerializer<List<ExternalEventDTO>> jsonSerializer; + + @Override + @SuppressFBWarnings("SLF4J_SIGN_ONLY_FORMAT") + public void afterPropertiesSet() throws Exception { + log.warn("------------------------------------------------------------"); + log.warn(" "); + log.warn("DO NOT USE THIS IN PRODUCTION!"); + log.warn("Internal client services mode is enabled"); + log.warn("DO NOT USE THIS IN PRODUCTION!"); + log.warn(" "); + log.warn("------------------------------------------------------------"); + } + + @GET + @Consumes({ MediaType.APPLICATION_JSON }) + @Produces({ MediaType.APPLICATION_JSON }) + @Operation(summary = "Get all external events matching the provided query parameters", description = "") + @ApiResponses({ + @ApiResponse(responseCode = "200", description = "Get all external events with the optional filter attributes", content = @Content(schema = @Schema(implementation = GetExternalEventListResponse.class))) }) + public String getAllExternalEvents(@QueryParam("idempotencyKey") @Parameter(description = "idempotencyKey") final String idempotencyKey, + @QueryParam("type") @Parameter(description = "type") final String type, + @QueryParam("category") @Parameter(description = "category") final String category, + @QueryParam("aggregateRootId") @Parameter(description = "aggregateRootId") final Long aggregateRootId) { + 
log.debug("getAllExternalEvents called with params idempotencyKey:{}, type:{}, category:{}, aggregateRootId:{} ", idempotencyKey, + type, category, aggregateRootId); + List<ExternalEventDTO> allExternalEvents = internalExternalEventService.getAllExternalEvents(idempotencyKey, type, category, + aggregateRootId); + return jsonSerializer.serialize(allExternalEvents); + } + + @DELETE + @Operation(summary = "Deletes all external events stored in the database", description = "") + @ApiResponses({ @ApiResponse(responseCode = "204", description = "All external events deleted successfully") }) + public void deleteAllExternalEvents() { + log.debug("deleteAllExternalEvents called"); + internalExternalEventService.deleteAllExternalEvents(); + } + + @Schema(description = "GetExternalEventListResponse") + public static class GetExternalEventListResponse { + + private List<GetExternalEventResponse> externalEvents; + } + + @Schema(description = "GetExternalEventResponse") + public static class GetExternalEventResponse { + + @Schema(example = "1") + private int eventId; + @Schema(example = "LoanAccountCustomSnapshotBusinessEvent") + private String type; + @Schema(example = "Loan") + private String category; + @Schema(example = "[2024,1,17,11,20,37]") Review Comment: fixed ########## fineract-core/src/main/java/org/apache/fineract/infrastructure/event/external/service/InternalExternalEventService.java: ########## @@ -0,0 +1,146 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.fineract.infrastructure.event.external.service; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import lombok.AllArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; +import org.apache.fineract.avro.BulkMessageItemV1; +import org.apache.fineract.infrastructure.event.external.repository.ExternalEventRepository; +import org.apache.fineract.infrastructure.event.external.repository.domain.ExternalEvent; +import org.apache.fineract.infrastructure.event.external.service.validation.ExternalEventDTO; +import org.springframework.context.annotation.Profile; +import org.springframework.data.jpa.domain.Specification; +import org.springframework.stereotype.Service; + +@Service +@Profile("test") +@Slf4j +@AllArgsConstructor +public class InternalExternalEventService { + + private final ExternalEventRepository externalEventRepository; + + public void deleteAllExternalEvents() { + externalEventRepository.deleteAll(); + } + + public List<ExternalEventDTO> getAllExternalEvents(String idempotencyKey, String type, String category, Long aggregateRootId) { + List<Specification<ExternalEvent>> specifications = new ArrayList<>(); + + if (StringUtils.isNotEmpty(idempotencyKey)) { + 
specifications.add(hasIdempotencyKey(idempotencyKey)); + } + + if (StringUtils.isNotEmpty(type)) { + specifications.add(hasType(type)); + } + + if (StringUtils.isNotEmpty(category)) { + specifications.add(hasCategory(category)); + } + + if (aggregateRootId != null) { + specifications.add(hasAggregateRootId(aggregateRootId)); + } + + Specification<ExternalEvent> reducedSpecification = specifications.stream().reduce(Specification::and) + .orElse((Specification<ExternalEvent>) (root, query, criteriaBuilder) -> null); + List<ExternalEvent> externalEvents = externalEventRepository.findAll(reducedSpecification); + + try { + return convertToReadableFormat(externalEvents); + } catch (ClassNotFoundException | NoSuchMethodException | InvocationTargetException | IllegalAccessException + | JsonProcessingException e) { + throw new RuntimeException("Error while converting external events to readable format", e); + } + } + + private Specification<ExternalEvent> hasIdempotencyKey(String idempotencyKey) { + return (root, query, cb) -> cb.equal(root.get("idempotencyKey"), idempotencyKey); + } + + private Specification<ExternalEvent> hasType(String type) { + return (root, query, cb) -> cb.equal(root.get("type"), type); + } + + private Specification<ExternalEvent> hasCategory(String category) { + return (root, query, cb) -> cb.equal(root.get("category"), category); + } + + private Specification<ExternalEvent> hasAggregateRootId(Long aggregateRootId) { + return (root, query, cb) -> cb.equal(root.get("aggregateRootId"), aggregateRootId); + } + + private List<ExternalEventDTO> convertToReadableFormat(List<ExternalEvent> externalEvents) throws ClassNotFoundException, + NoSuchMethodException, InvocationTargetException, IllegalAccessException, JsonProcessingException { + List<ExternalEventDTO> eventMessages = new ArrayList<>(); + for (ExternalEvent externalEvent : externalEvents) { + Class<?> payLoadClass = Class.forName(externalEvent.getSchema()); + ByteBuffer byteBuffer = 
ByteBuffer.wrap(externalEvent.getData()); + Method method = payLoadClass.getMethod("fromByteBuffer", ByteBuffer.class); + Object payLoad = method.invoke(null, byteBuffer); + if (externalEvent.getType().equalsIgnoreCase("BulkBusinessEvent")) { + Method methodToGetDatas = payLoad.getClass().getMethod("getDatas", (Class<?>) null); + List<BulkMessageItemV1> bulkMessages = (List<BulkMessageItemV1>) methodToGetDatas.invoke(payLoad); + StringBuilder bulkMessagePayload = new StringBuilder(); + for (BulkMessageItemV1 bulkMessage : bulkMessages) { + ExternalEventDTO bulkMessageData = retrieveBulkMessage(bulkMessage, externalEvent); + bulkMessagePayload.append(bulkMessageData); + bulkMessagePayload.append(System.lineSeparator()); + } + eventMessages.add(new ExternalEventDTO(externalEvent.getId(), externalEvent.getType(), externalEvent.getCategory(), + externalEvent.getCreatedAt().toLocalDateTime(), toJsonMap(bulkMessagePayload.toString()), + externalEvent.getBusinessDate(), externalEvent.getSchema(), externalEvent.getAggregateRootId())); + + } else { + eventMessages.add(new ExternalEventDTO(externalEvent.getId(), externalEvent.getType(), externalEvent.getCategory(), + externalEvent.getCreatedAt().toLocalDateTime(), toJsonMap(payLoad.toString()), externalEvent.getBusinessDate(), + externalEvent.getSchema(), externalEvent.getAggregateRootId())); + } + } + + return eventMessages; + } + + private ExternalEventDTO retrieveBulkMessage(BulkMessageItemV1 messageItem, ExternalEvent externalEvent) throws ClassNotFoundException, + InvocationTargetException, IllegalAccessException, NoSuchMethodException, JsonProcessingException { + Class<?> messageBulkMessagePayLoad = Class.forName(messageItem.getDataschema()); + Method methodForPayLoad = messageBulkMessagePayLoad.getMethod("fromByteBuffer", ByteBuffer.class); + Object payLoadBulkItem = methodForPayLoad.invoke(null, messageItem.getData()); + return new ExternalEventDTO((long) messageItem.getId(), messageItem.getType(), 
messageItem.getCategory(), + externalEvent.getCreatedAt().toLocalDateTime(), toJsonMap(payLoadBulkItem.toString()), externalEvent.getBusinessDate(), Review Comment: fixed -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscribe@fineract.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org
