This is an automated email from the ASF dual-hosted git repository.
ilgrosso pushed a commit to branch 4_0_X
in repository https://gitbox.apache.org/repos/asf/syncope.git
The following commit(s) were added to refs/heads/4_0_X by this push:
new 3a6cc0dc77 Improving Notification job management
3a6cc0dc77 is described below
commit 3a6cc0dc7733eb8a927e12c46ec5ac0f368a5153
Author: Francesco Chicchiriccò <[email protected]>
AuthorDate: Mon May 25 15:30:49 2026 +0200
Improving Notification job management
---
.../syncope/core/persistence/api/dao/TaskDAO.java | 4 +-
.../core/persistence/jpa/dao/JPATaskDAO.java | 126 ++++++++++++---------
.../core/persistence/jpa/inner/TaskTest.java | 4 +-
.../core/persistence/neo4j/dao/Neo4jTaskDAO.java | 102 ++++++++++-------
.../core/persistence/neo4j/inner/TaskTest.java | 4 +-
.../api/notification/NotificationManager.java | 3 +-
.../AbstractNotificationJobDelegate.java | 78 ++++++++-----
.../notification/DefaultNotificationManager.java | 10 +-
8 files changed, 203 insertions(+), 128 deletions(-)
diff --git a/core/persistence-api/src/main/java/org/apache/syncope/core/persistence/api/dao/TaskDAO.java b/core/persistence-api/src/main/java/org/apache/syncope/core/persistence/api/dao/TaskDAO.java
index ce496456f4..f0d3c466d5 100644
--- a/core/persistence-api/src/main/java/org/apache/syncope/core/persistence/api/dao/TaskDAO.java
+++ b/core/persistence-api/src/main/java/org/apache/syncope/core/persistence/api/dao/TaskDAO.java
@@ -54,7 +54,9 @@ public interface TaskDAO extends DAO<Task<?>> {
List<MacroTask> findByRealm(Realm realm);
- <T extends Task<T>> List<T> findToExec(TaskType type);
+ long countToExec(TaskType type);
+
+ <T extends Task<T>> List<T> findToExec(TaskType type, Pageable pageable);
<T extends Task<T>> List<T> findAll(TaskType type);
diff --git a/core/persistence-jpa/src/main/java/org/apache/syncope/core/persistence/jpa/dao/JPATaskDAO.java b/core/persistence-jpa/src/main/java/org/apache/syncope/core/persistence/jpa/dao/JPATaskDAO.java
index 8007681f5a..8bb3c76669 100644
--- a/core/persistence-jpa/src/main/java/org/apache/syncope/core/persistence/jpa/dao/JPATaskDAO.java
+++ b/core/persistence-jpa/src/main/java/org/apache/syncope/core/persistence/jpa/dao/JPATaskDAO.java
@@ -229,8 +229,72 @@ public class JPATaskDAO implements TaskDAO {
}
@Override
+ public long countToExec(final TaskType type) {
+ StringBuilder queryString = buildFindAllQuery(type).append("AND ");
+
+ if (type == TaskType.NOTIFICATION) {
+ queryString.append("t.executed = false ");
+ } else {
+ queryString.append("t.executions IS EMPTY ");
+ }
+
+ Query query = entityManager.createQuery(Strings.CS.replaceOnce(
+ queryString.toString(),
+ "SELECT t FROM ",
+ "SELECT COUNT(t) FROM "));
+
+ return ((Number) query.getSingleResult()).longValue();
+ }
+
+ protected String toOrderByStatement(
+ final Class<? extends Task<?>> beanClass,
+ final Stream<Sort.Order> orderByClauses,
+ final String prefix) {
+
+ StringBuilder subStatement = new StringBuilder();
+ orderByClauses.forEach(clause -> {
+ String field = clause.getProperty().trim();
+ switch (field) {
+ case "latestExecStatus":
+ field = "status";
+ break;
+
+ case "start":
+ field = "startDate";
+ break;
+
+ case "end":
+ field = "endDate";
+ break;
+
+ default:
+ Field beanField = ReflectionUtils.findField(beanClass,
field);
+ if (beanField != null
+ && (beanField.getAnnotation(ManyToOne.class) !=
null
+ || beanField.getAnnotation(OneToMany.class) != null
+ || beanField.getAnnotation(OneToOne.class) !=
null)) {
+
+ field += "_id";
+ }
+ }
+
+ subStatement.append(prefix).append(field).append('
').append(clause.getDirection().name()).append(',');
+ });
+
+ StringBuilder statement = new StringBuilder(" ORDER BY ");
+ if (subStatement.length() == 0) {
+ statement.append(prefix).append("id DESC");
+ } else {
+ subStatement.deleteCharAt(subStatement.length() - 1);
+ statement.append(subStatement);
+ }
+
+ return statement.toString();
+ }
+
@SuppressWarnings("unchecked")
- public <T extends Task<T>> List<T> findToExec(final TaskType type) {
+ @Override
+ public <T extends Task<T>> List<T> findToExec(final TaskType type, final
Pageable pageable) {
StringBuilder queryString = buildFindAllQuery(type).append("AND ");
if (type == TaskType.NOTIFICATION) {
@@ -238,9 +302,17 @@ public class JPATaskDAO implements TaskDAO {
} else {
queryString.append("t.executions IS EMPTY ");
}
- queryString.append("ORDER BY t.id DESC");
+
+ queryString.append(toOrderByStatement(
+ taskUtilsFactory.getInstance(type).getTaskEntity(),
pageable.getSort().stream(), "t."));
Query query = entityManager.createQuery(queryString.toString());
+
+ if (pageable.isPaged()) {
+ query.setFirstResult(pageable.getPageSize() *
pageable.getPageNumber());
+ query.setMaxResults(pageable.getPageSize());
+ }
+
return query.getResultList();
}
@@ -350,54 +422,6 @@ public class JPATaskDAO implements TaskDAO {
return queryString;
}
- protected String toOrderByStatement(
- final Class<? extends Task<?>> beanClass,
- final Stream<Sort.Order> orderByClauses) {
-
- StringBuilder statement = new StringBuilder();
-
- statement.append(" ORDER BY ");
-
- StringBuilder subStatement = new StringBuilder();
- orderByClauses.forEach(clause -> {
- String field = clause.getProperty().trim();
- switch (field) {
- case "latestExecStatus":
- field = "status";
- break;
-
- case "start":
- field = "startDate";
- break;
-
- case "end":
- field = "endDate";
- break;
-
- default:
- Field beanField = ReflectionUtils.findField(beanClass,
field);
- if (beanField != null
- && (beanField.getAnnotation(ManyToOne.class) !=
null
- || beanField.getAnnotation(OneToMany.class) != null
- || beanField.getAnnotation(OneToOne.class) !=
null)) {
-
- field += "_id";
- }
- }
-
- subStatement.append(field).append('
').append(clause.getDirection().name()).append(',');
- });
-
- if (subStatement.length() == 0) {
- statement.append("id DESC");
- } else {
- subStatement.deleteCharAt(subStatement.length() - 1);
- statement.append(subStatement);
- }
-
- return statement.toString();
- }
-
@SuppressWarnings("unchecked")
@Override
public <T extends Task<T>> List<T> findAll(
@@ -446,7 +470,7 @@ public class JPATaskDAO implements TaskDAO {
}
queryString.append(toOrderByStatement(
- taskUtilsFactory.getInstance(type).getTaskEntity(),
pageable.getSort().stream()));
+ taskUtilsFactory.getInstance(type).getTaskEntity(),
pageable.getSort().stream(), ""));
Query query = entityManager.createNativeQuery(queryString.toString());
diff --git a/core/persistence-jpa/src/test/java/org/apache/syncope/core/persistence/jpa/inner/TaskTest.java b/core/persistence-jpa/src/test/java/org/apache/syncope/core/persistence/jpa/inner/TaskTest.java
index 2da9bbbf96..37b83ca351 100644
--- a/core/persistence-jpa/src/test/java/org/apache/syncope/core/persistence/jpa/inner/TaskTest.java
+++ b/core/persistence-jpa/src/test/java/org/apache/syncope/core/persistence/jpa/inner/TaskTest.java
@@ -90,7 +90,9 @@ public class TaskTest extends AbstractTest {
@Test
public void findWithoutExecs() {
- List<PropagationTask> tasks = taskDAO.findToExec(TaskType.PROPAGATION);
+ assertEquals(3, taskDAO.countToExec(TaskType.PROPAGATION));
+
+ List<PropagationTask> tasks = taskDAO.findToExec(TaskType.PROPAGATION,
Pageable.unpaged());
assertNotNull(tasks);
assertEquals(3, tasks.size());
}
diff --git a/core/persistence-neo4j/src/main/java/org/apache/syncope/core/persistence/neo4j/dao/Neo4jTaskDAO.java b/core/persistence-neo4j/src/main/java/org/apache/syncope/core/persistence/neo4j/dao/Neo4jTaskDAO.java
index 0bbea61c08..75576dd5c5 100644
--- a/core/persistence-neo4j/src/main/java/org/apache/syncope/core/persistence/neo4j/dao/Neo4jTaskDAO.java
+++ b/core/persistence-neo4j/src/main/java/org/apache/syncope/core/persistence/neo4j/dao/Neo4jTaskDAO.java
@@ -230,8 +230,58 @@ public class Neo4jTaskDAO extends AbstractDAO implements
TaskDAO {
}
@Override
+ public long countToExec(final TaskType type) {
+ TaskUtils taskUtils = taskUtilsFactory.getInstance(type);
+ StringBuilder query = new StringBuilder("MATCH (n:" +
taskUtils.getTaskStorage() + ") WHERE ");
+
+ if (type == TaskType.NOTIFICATION) {
+ query.append("n.executed = false ");
+ } else {
+ query.append("(n)-[:").append(execRelationship(type)).append("]-()
");
+ }
+
+ query.append(" RETURN COUNT(DISTINCT n)");
+
+ return neo4jTemplate.count(query.toString());
+ }
+
+ protected String toOrderByStatement(final Stream<Sort.Order>
orderByClauses) {
+ StringBuilder subStatement = new StringBuilder();
+ orderByClauses.forEach(clause -> {
+ String field = clause.getProperty().trim();
+ switch (field) {
+ case "latestExecStatus":
+ field = "status";
+ break;
+
+ case "start":
+ field = "startDate";
+ break;
+
+ case "end":
+ field = "endDate";
+ break;
+
+ default:
+ }
+
+ subStatement.append("p.").append(field).append('
').append(clause.getDirection().name()).append(',');
+ });
+
+ StringBuilder statement = new StringBuilder(" ORDER BY ");
+ if (subStatement.length() == 0) {
+ statement.append("n.id DESC");
+ } else {
+ subStatement.deleteCharAt(subStatement.length() - 1);
+ statement.append(subStatement);
+ }
+
+ return statement.toString();
+ }
+
@SuppressWarnings("unchecked")
- public <T extends Task<T>> List<T> findToExec(final TaskType type) {
+ @Override
+ public <T extends Task<T>> List<T> findToExec(final TaskType type, final
Pageable pageable) {
TaskUtils taskUtils = taskUtilsFactory.getInstance(type);
StringBuilder query = new StringBuilder("MATCH (n:" +
taskUtils.getTaskStorage() + ") WHERE ");
@@ -240,7 +290,13 @@ public class Neo4jTaskDAO extends AbstractDAO implements
TaskDAO {
} else {
query.append("(n)-[:").append(execRelationship(type)).append("]-()
");
}
- query.append("RETURN n.id ORDER BY n.id DESC");
+
+ query.append("RETURN n.id
").append(toOrderByStatement(pageable.getSort().get()));
+
+ if (pageable.isPaged()) {
+ query.append(" SKIP ").append(pageable.getPageSize() *
pageable.getPageNumber()).
+ append(" LIMIT ").append(pageable.getPageSize());
+ }
return toList(neo4jClient.query(query.toString()).fetch().all(),
"n.id",
@@ -382,46 +438,6 @@ public class Neo4jTaskDAO extends AbstractDAO implements
TaskDAO {
return neo4jTemplate.count(query.toString(), parameters);
}
- protected String toOrderByStatement(
- final Class<? extends Task<?>> beanClass,
- final Stream<Sort.Order> orderByClauses) {
-
- StringBuilder statement = new StringBuilder();
-
- statement.append("ORDER BY ");
-
- StringBuilder subStatement = new StringBuilder();
- orderByClauses.forEach(clause -> {
- String field = clause.getProperty().trim();
- switch (field) {
- case "latestExecStatus":
- field = "status";
- break;
-
- case "start":
- field = "startDate";
- break;
-
- case "end":
- field = "endDate";
- break;
-
- default:
- }
-
- subStatement.append("p.").append(field).append('
').append(clause.getDirection().name()).append(',');
- });
-
- if (subStatement.length() == 0) {
- statement.append("n.id DESC");
- } else {
- subStatement.deleteCharAt(subStatement.length() - 1);
- statement.append(subStatement);
- }
-
- return statement.toString();
- }
-
@SuppressWarnings("unchecked")
@Override
public <T extends Task<T>> List<T> findAll(
@@ -447,7 +463,7 @@ public class Neo4jTaskDAO extends AbstractDAO implements
TaskDAO {
query.append(" WITH n ");
- query.append(toOrderByStatement(taskUtils.getTaskEntity(),
pageable.getSort().get()));
+ query.append(toOrderByStatement(pageable.getSort().get()));
if (pageable.isPaged()) {
query.append(" SKIP ").append(pageable.getPageSize() *
pageable.getPageNumber()).
diff --git a/core/persistence-neo4j/src/test/java/org/apache/syncope/core/persistence/neo4j/inner/TaskTest.java b/core/persistence-neo4j/src/test/java/org/apache/syncope/core/persistence/neo4j/inner/TaskTest.java
index fb1d5bb677..8659f57cc2 100644
--- a/core/persistence-neo4j/src/test/java/org/apache/syncope/core/persistence/neo4j/inner/TaskTest.java
+++ b/core/persistence-neo4j/src/test/java/org/apache/syncope/core/persistence/neo4j/inner/TaskTest.java
@@ -90,7 +90,9 @@ public class TaskTest extends AbstractTest {
@Test
public void findWithoutExecs() {
- List<PropagationTask> tasks = taskDAO.findToExec(TaskType.PROPAGATION);
+ assertEquals(3, taskDAO.countToExec(TaskType.PROPAGATION));
+
+ List<PropagationTask> tasks = taskDAO.findToExec(TaskType.PROPAGATION,
Pageable.unpaged());
assertNotNull(tasks);
assertEquals(3, tasks.size());
}
diff --git a/core/provisioning-api/src/main/java/org/apache/syncope/core/provisioning/api/notification/NotificationManager.java b/core/provisioning-api/src/main/java/org/apache/syncope/core/provisioning/api/notification/NotificationManager.java
index 36cd3c7ce6..2b1a3cc5e0 100644
--- a/core/provisioning-api/src/main/java/org/apache/syncope/core/provisioning/api/notification/NotificationManager.java
+++ b/core/provisioning-api/src/main/java/org/apache/syncope/core/provisioning/api/notification/NotificationManager.java
@@ -103,8 +103,9 @@ public interface NotificationManager {
/**
* Store execution of a NotificationTask.
*
+ * @param taskKey key of the NotificationTask to be updated
* @param execution task execution.
* @return merged task execution.
*/
- TaskExec<NotificationTask> storeExec(TaskExec<NotificationTask> execution);
+ TaskExec<NotificationTask> storeExec(String taskKey,
TaskExec<NotificationTask> execution);
}
diff --git a/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/notification/AbstractNotificationJobDelegate.java b/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/notification/AbstractNotificationJobDelegate.java
index f572dab181..e1cfa25f21 100644
--- a/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/notification/AbstractNotificationJobDelegate.java
+++ b/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/notification/AbstractNotificationJobDelegate.java
@@ -24,6 +24,7 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.syncope.common.lib.types.OpEvent;
import org.apache.syncope.common.lib.types.TaskType;
import org.apache.syncope.common.lib.types.TraceLevel;
+import org.apache.syncope.core.persistence.api.dao.AnyDAO;
import org.apache.syncope.core.persistence.api.dao.TaskDAO;
import org.apache.syncope.core.persistence.api.entity.task.NotificationTask;
import org.apache.syncope.core.persistence.api.entity.task.TaskExec;
@@ -32,15 +33,18 @@ import
org.apache.syncope.core.persistence.api.utils.ExceptionUtils2;
import org.apache.syncope.core.provisioning.api.AuditManager;
import org.apache.syncope.core.provisioning.api.event.JobStatusEvent;
import org.apache.syncope.core.provisioning.api.job.JobManager;
+import org.apache.syncope.core.provisioning.api.job.StoppableJobDelegate;
import
org.apache.syncope.core.provisioning.api.notification.NotificationJobDelegate;
import
org.apache.syncope.core.provisioning.api.notification.NotificationManager;
import org.apache.syncope.core.spring.security.AuthContextUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationEventPublisher;
+import org.springframework.data.domain.PageRequest;
+import org.springframework.data.domain.Sort;
import org.springframework.transaction.annotation.Transactional;
-public abstract class AbstractNotificationJobDelegate implements
NotificationJobDelegate {
+public abstract class AbstractNotificationJobDelegate implements
NotificationJobDelegate, StoppableJobDelegate {
protected static final Logger LOG =
LoggerFactory.getLogger(NotificationJobDelegate.class);
@@ -54,6 +58,8 @@ public abstract class AbstractNotificationJobDelegate
implements NotificationJob
protected final ApplicationEventPublisher publisher;
+ protected boolean stopRequested = false;
+
protected AbstractNotificationJobDelegate(
final TaskDAO taskDAO,
final TaskUtilsFactory taskUtilsFactory,
@@ -68,6 +74,11 @@ public abstract class AbstractNotificationJobDelegate
implements NotificationJob
this.publisher = publisher;
}
+ @Override
+ public void stop() {
+ stopRequested = true;
+ }
+
protected void setStatus(final String status) {
publisher.publishEvent(new JobStatusEvent(
this, AuthContextUtils.getDomain(),
JobManager.NOTIFICATION_JOB, status));
@@ -80,7 +91,6 @@ public abstract class AbstractNotificationJobDelegate
implements NotificationJob
@Override
public TaskExec<NotificationTask> executeSingle(final NotificationTask
task, final String executor) {
TaskExec<NotificationTask> execution =
taskUtilsFactory.getInstance(TaskType.NOTIFICATION).newTaskExec();
- execution.setTask(task);
execution.setStart(OffsetDateTime.now());
execution.setExecutor(executor);
boolean retryPossible = true;
@@ -109,9 +119,9 @@ public abstract class AbstractNotificationJobDelegate
implements NotificationJob
setStatus("Sending notifications to " + task.getRecipients());
- for (String to : task.getRecipients()) {
+ for (String recipient : task.getRecipients()) {
try {
- notify(to, task, execution);
+ notify(recipient, task, execution);
notificationManager.createTasks(
AuthContextUtils.getWho(),
@@ -123,9 +133,9 @@ public abstract class AbstractNotificationJobDelegate
implements NotificationJob
null,
null,
task,
- "Successfully sent notification to " + to);
+ "Successfully sent notification to " + recipient);
} catch (Exception e) {
- LOG.error("Could not send out notification", e);
+ LOG.error("Could not send out notification to {}",
recipient, e);
execution.setStatus(NotificationJob.Status.NOT_SENT.name());
if (task.getTraceLevel().ordinal() >=
TraceLevel.FAILURES.ordinal()) {
@@ -142,22 +152,22 @@ public abstract class AbstractNotificationJobDelegate
implements NotificationJob
null,
null,
task,
- "Could not send notification to " + to, e);
+ "Could not send notification to " + recipient, e);
}
execution.setEnd(OffsetDateTime.now());
}
}
- if (hasToBeRegistered(execution)) {
- execution = notificationManager.storeExec(execution);
+ if (hasToBeRegistered(task, execution)) {
+ execution = notificationManager.storeExec(task.getKey(),
execution);
if (retryPossible
- && (NotificationJob.Status.valueOf(execution.getStatus())
== NotificationJob.Status.NOT_SENT)) {
+ && NotificationJob.Status.valueOf(execution.getStatus())
== NotificationJob.Status.NOT_SENT) {
- handleRetries(execution);
+ handleRetries(task, execution);
}
} else {
- notificationManager.setTaskExecuted(execution.getTask().getKey(),
true);
+ notificationManager.setTaskExecuted(task.getKey(), true);
}
return execution;
@@ -166,20 +176,32 @@ public abstract class AbstractNotificationJobDelegate
implements NotificationJob
@Transactional
@Override
public void execute(final String executor) {
- List<NotificationTask> tasks =
taskDAO.findToExec(TaskType.NOTIFICATION);
+ stopRequested = false;
+
+ long count = taskDAO.countToExec(TaskType.NOTIFICATION);
+ long pages = (count / AnyDAO.DEFAULT_PAGE_SIZE) + 1;
+ for (int page = 0; page < pages && !stopRequested; page++) {
+ List<NotificationTask> tasks = taskDAO.findToExec(
+ TaskType.NOTIFICATION,
+ PageRequest.of(page, AnyDAO.DEFAULT_PAGE_SIZE,
Sort.Direction.ASC, "id"));
- setStatus("Sending out " + tasks.size() + " notifications");
+ setStatus("Sending out " + tasks.size() + " notifications");
- for (int i = 0; i < tasks.size(); i++) {
- LOG.debug("Found notification task {} to be executed:
starting...", tasks.get(i));
- executeSingle(tasks.get(i), executor);
- LOG.debug("Notification task {} executed", tasks.get(i));
+ for (int i = 0; i < tasks.size() && !stopRequested; i++) {
+ LOG.debug("Found notification task {} to be executed:
starting...", tasks.get(i));
+ executeSingle(tasks.get(i), executor);
+ LOG.debug("Notification task {} executed", tasks.get(i));
+ }
}
- }
- protected static boolean hasToBeRegistered(final
TaskExec<NotificationTask> execution) {
- NotificationTask task = execution.getTask();
+ if (stopRequested) {
+ LOG.debug("Notification job interrupted");
+ }
+
+ setStatus(null);
+ }
+ protected boolean hasToBeRegistered(final NotificationTask task, final
TaskExec<NotificationTask> execution) {
// True if either failed and failures have to be registered, or if ALL
// has to be registered.
return (NotificationJob.Status.valueOf(execution.getStatus()) ==
NotificationJob.Status.NOT_SENT
@@ -187,18 +209,18 @@ public abstract class AbstractNotificationJobDelegate
implements NotificationJob
|| task.getTraceLevel() == TraceLevel.ALL;
}
- protected void handleRetries(final TaskExec<NotificationTask> execution) {
+ protected void handleRetries(final NotificationTask task, final
TaskExec<NotificationTask> execution) {
if (notificationManager.getMaxRetries() <= 0) {
return;
}
long failedExecutionsCount =
notificationManager.countExecutionsWithStatus(
- execution.getTask().getKey(),
NotificationJob.Status.NOT_SENT.name());
+ task.getKey(), NotificationJob.Status.NOT_SENT.name());
if (failedExecutionsCount <= notificationManager.getMaxRetries()) {
LOG.debug("Execution of notification task {} will be retried
[{}/{}]",
- execution.getTask(), failedExecutionsCount,
notificationManager.getMaxRetries());
- notificationManager.setTaskExecuted(execution.getTask().getKey(),
false);
+ task, failedExecutionsCount,
notificationManager.getMaxRetries());
+ notificationManager.setTaskExecuted(task.getKey(), false);
auditManager.audit(
AuthContextUtils.getDomain(),
@@ -211,9 +233,9 @@ public abstract class AbstractNotificationJobDelegate
implements NotificationJob
null,
null,
execution,
- "Notification task " + execution.getTask().getKey() + "
will be retried");
+ "Notification task " + task.getKey() + " will be retried");
} else {
- LOG.error("Maximum number of retries reached for task {} - giving
up", execution.getTask());
+ LOG.error("Maximum number of retries reached for task {} - giving
up", task);
auditManager.audit(
AuthContextUtils.getDomain(),
@@ -226,7 +248,7 @@ public abstract class AbstractNotificationJobDelegate
implements NotificationJob
null,
null,
execution,
- "Giving up retries on notification task " +
execution.getTask().getKey());
+ "Giving up retries on notification task " + task.getKey());
}
}
}
diff --git a/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/notification/DefaultNotificationManager.java b/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/notification/DefaultNotificationManager.java
index 60027566c8..cb38dacc61 100644
--- a/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/notification/DefaultNotificationManager.java
+++ b/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/notification/DefaultNotificationManager.java
@@ -75,6 +75,7 @@ import
org.apache.syncope.core.spring.implementation.ImplementationManager;
import org.apache.syncope.core.spring.security.AuthContextUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
@Transactional(rollbackFor = { Throwable.class })
@@ -420,17 +421,22 @@ public class DefaultNotificationManager implements
NotificationManager {
return email;
}
+ @Transactional(propagation = Propagation.REQUIRES_NEW)
@Override
- public TaskExec<NotificationTask> storeExec(final
TaskExec<NotificationTask> execution) {
- NotificationTask task = taskDAO.findById(TaskType.NOTIFICATION,
execution.getTask().getKey()).
+ public TaskExec<NotificationTask> storeExec(final String taskKey, final
TaskExec<NotificationTask> execution) {
+ NotificationTask task = taskDAO.findById(TaskType.NOTIFICATION,
taskKey).
map(NotificationTask.class::cast).
orElseThrow(() -> new NotFoundException("NotificationTask " +
execution.getTask().getKey()));
+ execution.setTask(task);
task.add(execution);
task.setExecuted(true);
taskDAO.save(task);
+
+ execution.setTask(null);
return execution;
}
+ @Transactional(propagation = Propagation.REQUIRES_NEW)
@Override
public void setTaskExecuted(final String taskKey, final boolean executed) {
NotificationTask task = taskDAO.findById(TaskType.NOTIFICATION,
taskKey).