This is an automated email from the ASF dual-hosted git repository. ilgrosso pushed a commit to branch 4_0_X in repository https://gitbox.apache.org/repos/asf/syncope.git
commit 69fa6b65b093b1f1eeba1c29dfe67f50e2191bcb Author: Francesco Chicchiriccò <[email protected]> AuthorDate: Tue May 13 10:49:59 2025 +0200 [SYNCOPE-1878] Refactoring PriorityPropagationTaskExecutor --- .../api/propagation/PropagationTaskCallable.java | 32 -------- .../DefaultPropagationTaskCallable.java | 88 ---------------------- .../PriorityPropagationTaskExecutor.java | 66 ++++++++++------ 3 files changed, 41 insertions(+), 145 deletions(-) diff --git a/core/provisioning-api/src/main/java/org/apache/syncope/core/provisioning/api/propagation/PropagationTaskCallable.java b/core/provisioning-api/src/main/java/org/apache/syncope/core/provisioning/api/propagation/PropagationTaskCallable.java deleted file mode 100644 index c7a1877714..0000000000 --- a/core/provisioning-api/src/main/java/org/apache/syncope/core/provisioning/api/propagation/PropagationTaskCallable.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.syncope.core.provisioning.api.propagation; - -import java.util.concurrent.Callable; -import org.apache.syncope.core.persistence.api.entity.task.PropagationTask; -import org.apache.syncope.core.persistence.api.entity.task.TaskExec; - -public interface PropagationTaskCallable extends Callable<TaskExec<PropagationTask>> { - - void setTaskInfo(PropagationTaskInfo taskInfo); - - void setReporter(PropagationReporter reporter); - - void setExecutor(String executor); -} diff --git a/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/propagation/DefaultPropagationTaskCallable.java b/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/propagation/DefaultPropagationTaskCallable.java deleted file mode 100644 index 8f92e03e68..0000000000 --- a/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/propagation/DefaultPropagationTaskCallable.java +++ /dev/null @@ -1,88 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.syncope.core.provisioning.java.propagation; - -import java.util.Collection; -import java.util.stream.Collectors; -import org.apache.syncope.core.persistence.api.entity.task.PropagationTask; -import org.apache.syncope.core.persistence.api.entity.task.TaskExec; -import org.apache.syncope.core.provisioning.api.propagation.PropagationReporter; -import org.apache.syncope.core.provisioning.api.propagation.PropagationTaskCallable; -import org.apache.syncope.core.provisioning.api.propagation.PropagationTaskExecutor; -import org.apache.syncope.core.provisioning.api.propagation.PropagationTaskInfo; -import org.apache.syncope.core.spring.security.AuthContextUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.security.core.GrantedAuthority; -import org.springframework.security.core.context.SecurityContext; -import org.springframework.security.core.context.SecurityContextHolder; - -public class DefaultPropagationTaskCallable implements PropagationTaskCallable { - - protected static final Logger LOG = LoggerFactory.getLogger(PropagationTaskCallable.class); - - @Autowired - protected PropagationTaskExecutor taskExecutor; - - protected final String domain; - - protected final Collection<String> authorities; - - protected PropagationTaskInfo taskInfo; - - protected PropagationReporter reporter; - - protected String executor; - - public DefaultPropagationTaskCallable() { - SecurityContext ctx = SecurityContextHolder.getContext(); - domain = AuthContextUtils.getDomain(); - authorities = ctx.getAuthentication().getAuthorities().stream(). - map(GrantedAuthority::getAuthority).collect(Collectors.toSet()); - } - - @Override - public void setTaskInfo(final PropagationTaskInfo taskInfo) { - this.taskInfo = taskInfo; - } - - @Override - public void setReporter(final PropagationReporter reporter) { - this.reporter = reporter; - } - - @Override - public void setExecutor(final String executor) { - this.executor = executor; - } - - @Override - public TaskExec<PropagationTask> call() { - return AuthContextUtils.callAs(domain, executor, authorities, () -> { - LOG.debug("Execution started for {}", taskInfo); - - TaskExec<PropagationTask> execution = taskExecutor.execute(taskInfo, reporter, executor); - - LOG.debug("Execution completed for {}, {}", taskInfo, execution); - - return execution; - }); - } -} diff --git a/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/propagation/PriorityPropagationTaskExecutor.java b/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/propagation/PriorityPropagationTaskExecutor.java index a95a20d09c..f4bfc86950 100644 --- a/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/propagation/PriorityPropagationTaskExecutor.java +++ b/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/propagation/PriorityPropagationTaskExecutor.java @@ -23,11 +23,13 @@ import java.util.Collection; import java.util.Comparator; import java.util.List; import java.util.Optional; +import java.util.Set; +import java.util.concurrent.Callable; import java.util.concurrent.CompletionService; import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.Future; +import java.util.stream.Collectors; import org.apache.syncope.common.lib.types.ExecStatus; -import org.apache.syncope.core.persistence.api.ApplicationContextProvider; import org.apache.syncope.core.persistence.api.attrvalue.PlainAttrValidationManager; import org.apache.syncope.core.persistence.api.dao.ExternalResourceDAO; import org.apache.syncope.core.persistence.api.dao.PlainSchemaDAO; @@ -43,12 +45,14 @@ import org.apache.syncope.core.provisioning.api.data.TaskDataBinder; import org.apache.syncope.core.provisioning.api.notification.NotificationManager; import org.apache.syncope.core.provisioning.api.propagation.PropagationException; import org.apache.syncope.core.provisioning.api.propagation.PropagationReporter; -import org.apache.syncope.core.provisioning.api.propagation.PropagationTaskCallable; import org.apache.syncope.core.provisioning.api.propagation.PropagationTaskInfo; import org.apache.syncope.core.provisioning.java.pushpull.OutboundMatcher; import org.apache.syncope.core.provisioning.java.utils.ConnObjectUtils; +import org.apache.syncope.core.spring.security.AuthContextUtils; import org.apache.syncope.core.spring.task.VirtualThreadPoolTaskExecutor; import org.springframework.context.ApplicationEventPublisher; +import org.springframework.security.core.GrantedAuthority; +import org.springframework.security.core.context.SecurityContextHolder; /** * Sorts the tasks to be executed according to related @@ -59,28 +63,6 @@ import org.springframework.context.ApplicationEventPublisher; */ public class PriorityPropagationTaskExecutor extends AbstractPropagationTaskExecutor { - /** - * Creates new instances of {@link PropagationTaskCallable} for usage with - * {@link java.util.concurrent.CompletionService}. - * - * @param taskInfo to be executed - * @param reporter to report propagation execution status - * @param executor user that triggered the propagation execution - * @return new {@link PropagationTaskCallable} instance for usage with - * {@link java.util.concurrent.CompletionService} - */ - protected PropagationTaskCallable newPropagationTaskCallable( - final PropagationTaskInfo taskInfo, final PropagationReporter reporter, final String executor) { - - PropagationTaskCallable callable = ApplicationContextProvider.getBeanFactory(). - createBean(DefaultPropagationTaskCallable.class); - callable.setTaskInfo(taskInfo); - callable.setReporter(reporter); - callable.setExecutor(executor); - - return callable; - } - protected final VirtualThreadPoolTaskExecutor taskExecutor; public PriorityPropagationTaskExecutor( @@ -115,6 +97,40 @@ public class PriorityPropagationTaskExecutor extends AbstractPropagationTaskExec this.taskExecutor = taskExecutor; } + /** + * Creates new instances of {@link Callable} for usage with{@link java.util.concurrent.CompletionService}. + * + * @param taskInfo to be executed + * @param reporter to report propagation execution status + * @param executor user that triggered the propagation execution + * @return new {@link Callable} instance for usage with {@link java.util.concurrent.CompletionService} + */ + protected Callable<TaskExec<PropagationTask>> newPropagationTaskCallable( + final PropagationTaskInfo taskInfo, final PropagationReporter reporter, final String executor) { + + String domain = AuthContextUtils.getDomain(); + Set<String> authorities = SecurityContextHolder.getContext().getAuthentication().getAuthorities().stream(). + map(GrantedAuthority::getAuthority).collect(Collectors.toSet()); + + return () -> AuthContextUtils.callAs(domain, executor, authorities, () -> { + LOG.debug("Execution started for {}", taskInfo); + + TaskExec<PropagationTask> execution = this.execute(taskInfo, reporter, executor); + + LOG.debug("Execution completed for {} with results {}", taskInfo, execution); + + return execution; + }); + } + + protected boolean failed( + final PropagationTaskInfo taskInfpo, + final TaskExec<PropagationTask> exec, + final ExecStatus execStatus) { + + return execStatus == ExecStatus.FAILURE; + } + @Override public PropagationReporter execute( final Collection<PropagationTaskInfo> taskInfos, @@ -147,7 +163,7 @@ public class PriorityPropagationTaskExecutor extends AbstractPropagationTaskExec execStatus = ExecStatus.FAILURE; errorMessage = e.getMessage(); } - if (execStatus == ExecStatus.FAILURE) { + if (failed(taskInfo, exec, execStatus)) { throw new PropagationException( taskInfo.getResource().getKey(), Optional.ofNullable(exec).map(Exec::getMessage).orElse(errorMessage));
