This is an automated email from the ASF dual-hosted git repository.
adamsaghy pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/fineract.git
The following commit(s) were added to refs/heads/develop by this push:
new 0761c39d1 FINERACT-1971: Fix marking external events sent in parallel
0761c39d1 is described below
commit 0761c39d1690025396169aac04703671189c7453
Author: Adam Saghy <[email protected]>
AuthorDate: Tue Apr 16 16:28:39 2024 +0200
FINERACT-1971: Fix marking external events sent in parallel
---
.../event/external/jobs/SendAsynchronousEventsTasklet.java | 5 +++++
.../event/external/repository/ExternalEventRepository.java | 2 ++
2 files changed, 7 insertions(+)
diff --git a/fineract-core/src/main/java/org/apache/fineract/infrastructure/event/external/jobs/SendAsynchronousEventsTasklet.java b/fineract-core/src/main/java/org/apache/fineract/infrastructure/event/external/jobs/SendAsynchronousEventsTasklet.java
index 7277cfae6..91746f13b 100644
--- a/fineract-core/src/main/java/org/apache/fineract/infrastructure/event/external/jobs/SendAsynchronousEventsTasklet.java
+++ b/fineract-core/src/main/java/org/apache/fineract/infrastructure/event/external/jobs/SendAsynchronousEventsTasklet.java
@@ -34,7 +34,9 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.fineract.avro.MessageV1;
import
org.apache.fineract.infrastructure.configuration.domain.ConfigurationDomainService;
import org.apache.fineract.infrastructure.core.config.FineractProperties;
+import org.apache.fineract.infrastructure.core.domain.FineractContext;
import org.apache.fineract.infrastructure.core.service.DateUtils;
+import org.apache.fineract.infrastructure.core.service.ThreadLocalContextUtil;
import
org.apache.fineract.infrastructure.event.external.producer.ExternalEventProducer;
import
org.apache.fineract.infrastructure.event.external.repository.ExternalEventRepository;
import
org.apache.fineract.infrastructure.event.external.repository.domain.ExternalEventStatus;
@@ -104,11 +106,14 @@ public class SendAsynchronousEventsTasklet implements Tasklet {
// Partitioning dataset to avoid exception: PreparedStatement can have at most 65,535 parameters
final int partitionSize =
fineractProperties.getEvents().getExternal().getPartitionSize();
List<List<Long>> partitions = Lists.partition(eventIds, partitionSize);
+ FineractContext context = ThreadLocalContextUtil.getContext();
partitions.stream() //
.parallel() //
.forEach(partitionedEventIds -> {
measure(() -> {
+ ThreadLocalContextUtil.init(context);
repository.markEventsSent(partitionedEventIds, sentAt);
+ ThreadLocalContextUtil.reset();
}, timeTaken -> {
log.debug("Took {}ms to update {} events",
timeTaken.toMillis(), partitionedEventIds.size());
});
diff --git a/fineract-core/src/main/java/org/apache/fineract/infrastructure/event/external/repository/ExternalEventRepository.java b/fineract-core/src/main/java/org/apache/fineract/infrastructure/event/external/repository/ExternalEventRepository.java
index 9f7ccbddc..49f772b1b 100644
--- a/fineract-core/src/main/java/org/apache/fineract/infrastructure/event/external/repository/ExternalEventRepository.java
+++ b/fineract-core/src/main/java/org/apache/fineract/infrastructure/event/external/repository/ExternalEventRepository.java
@@ -18,6 +18,7 @@
*/
package org.apache.fineract.infrastructure.event.external.repository;
+import jakarta.transaction.Transactional;
import java.time.LocalDate;
import java.time.OffsetDateTime;
import java.util.List;
@@ -40,6 +41,7 @@ public interface ExternalEventRepository extends JpaRepository<ExternalEvent, Lo
void deleteOlderEventsWithSentStatus(@Param("status") ExternalEventStatus
status,
@Param("dateForPurgeCriteria") LocalDate dateForPurgeCriteria);
+ @Transactional
@Modifying
@Query("UPDATE ExternalEvent e SET e.status =
org.apache.fineract.infrastructure.event.external.repository.domain.ExternalEventStatus.SENT,
e.sentAt = :sentAt WHERE e.id IN :ids")
void markEventsSent(@Param("ids") List<Long> ids, @Param("sentAt")
OffsetDateTime sentAt);