This is an automated email from the ASF dual-hosted git repository.
yasith pushed a commit to branch service-layer-improvements
in repository https://gitbox.apache.org/repos/asf/airavata.git
The following commit(s) were added to refs/heads/service-layer-improvements by
this push:
new 136d120a98 get more services into DI
136d120a98 is described below
commit 136d120a98c010fa8d6d1c5a40732f15ab852a65
Author: yasithdev <[email protected]>
AuthorDate: Tue Dec 16 10:06:46 2025 -0600
get more services into DI
---
.../helix/core/participant/HelixParticipant.java | 50 +++++++++++++++++++---
.../helix/impl/participant/GlobalParticipant.java | 4 +-
.../airavata/helix/impl/task/AWSTaskFactory.java | 35 +++++++++++----
.../airavata/helix/impl/task/SlurmTaskFactory.java | 30 +++++++++----
.../helix/impl/task/aws/AWSCompletingTask.java | 3 +-
.../helix/impl/task/aws/AWSJobSubmissionTask.java | 9 +++-
.../helix/impl/task/aws/CreateEC2InstanceTask.java | 3 +-
.../helix/impl/task/aws/NoOperationTask.java | 5 ++-
.../impl/task/cancel/CancelCompletingTask.java | 5 ++-
.../task/cancel/RemoteJobCancellationTask.java | 5 ++-
.../helix/impl/task/completing/CompletingTask.java | 5 ++-
.../airavata/helix/impl/task/env/EnvSetupTask.java | 5 ++-
.../impl/task/parsing/ParsingTriggeringTask.java | 5 ++-
.../helix/impl/task/staging/ArchiveTask.java | 5 ++-
.../helix/impl/task/staging/DataStagingTask.java | 5 ++-
.../impl/task/staging/InputDataStagingTask.java | 5 ++-
.../impl/task/staging/JobVerificationTask.java | 5 ++-
.../impl/task/staging/OutputDataStagingTask.java | 5 ++-
.../task/submission/DefaultJobSubmissionTask.java | 9 +++-
.../task/submission/ForkJobSubmissionTask.java | 9 +++-
.../impl/task/submission/JobSubmissionTask.java | 3 +-
.../task/submission/LocalJobSubmissionTask.java | 9 +++-
.../helix/impl/workflow/ParserWorkflowManager.java | 5 ++-
.../helix/impl/workflow/PostWorkflowManager.java | 5 ++-
.../helix/impl/workflow/PreWorkflowManager.java | 9 ++--
.../manager/dbevent/DBEventManagerRunner.java | 8 ++--
.../messaging/DBEventManagerMessagingFactory.java | 26 +++++++----
.../messaging/impl/DBEventMessageHandler.java | 18 ++++++--
.../airavata/messaging/core/stats/StatCounter.java | 2 +-
.../messaging/core/util/DBEventPublisherUtils.java | 6 ++-
.../airavata/metascheduler/core/utils/Utils.java | 4 +-
.../metadata/analyzer/DataInterpreterService.java | 3 +-
.../rescheduler/ProcessReschedulingService.java | 3 +-
.../engine/rescheduler/ProcessScannerImpl.java | 5 ++-
.../orchestrator/job/GFACPassiveJobSubmitter.java | 4 +-
.../messaging/RegistryServiceDBEventHandler.java | 4 +-
.../RegistryServiceDBEventMessagingFactory.java | 9 ++--
.../apache/airavata/service/AiravataService.java | 8 ++--
.../service/profile/TenantProfileService.java | 7 ++-
.../service/profile/UserProfileService.java | 8 ++--
.../airavata/service/security/IamAdminService.java | 7 ++-
.../messaging/SharingServiceDBEventHandler.java | 3 +-
.../SharingServiceDBEventMessagingFactory.java | 2 +-
.../apache/airavata/messaging/core/TestClient.java | 5 ++-
.../file/server/service/AirvataFileService.java | 11 ++++-
.../file/server/service/ProcessDataManager.java | 4 +-
.../thriftapi/handler/RegistryServiceHandler.java | 1 -
.../thriftapi/server/RegistryServiceServer.java | 3 +-
.../thriftapi/server/SharingRegistryServer.java | 2 +-
49 files changed, 275 insertions(+), 116 deletions(-)
diff --git
a/airavata-api/src/main/java/org/apache/airavata/helix/core/participant/HelixParticipant.java
b/airavata-api/src/main/java/org/apache/airavata/helix/core/participant/HelixParticipant.java
index 55e522e66b..10f6d95f9b 100644
---
a/airavata-api/src/main/java/org/apache/airavata/helix/core/participant/HelixParticipant.java
+++
b/airavata-api/src/main/java/org/apache/airavata/helix/core/participant/HelixParticipant.java
@@ -26,7 +26,6 @@ import java.util.List;
import java.util.Map;
import org.apache.airavata.config.AiravataServerProperties;
import org.apache.airavata.helix.core.AbstractTask;
-import org.apache.airavata.helix.core.support.TaskHelperImpl;
import org.apache.airavata.helix.task.api.annotation.TaskDef;
import org.apache.helix.InstanceType;
import org.apache.helix.examples.OnlineOfflineStateModelFactory;
@@ -65,6 +64,7 @@ public class HelixParticipant<T extends AbstractTask>
implements Runnable {
private List<Class<? extends T>> taskClasses;
private final List<String> runningTasks = Collections.synchronizedList(new
ArrayList<String>());
private AiravataServerProperties properties;
+ private org.springframework.context.ApplicationContext applicationContext;
public HelixParticipant(
List<Class<? extends T>> taskClasses, String taskTypeName,
AiravataServerProperties properties) {
@@ -78,6 +78,10 @@ public class HelixParticipant<T extends AbstractTask>
implements Runnable {
// This allows subclasses to set taskClasses before initializing
properties
}
+ public void
setApplicationContext(org.springframework.context.ApplicationContext
applicationContext) {
+ this.applicationContext = applicationContext;
+ }
+
/**
* Initialize property-dependent fields. This method should be called after
* taskClasses are set (if deferred initialization is used).
@@ -155,12 +159,46 @@ public class HelixParticipant<T extends AbstractTask>
implements Runnable {
for (Class<? extends T> taskClass : taskClasses) {
TaskFactory taskFac = context -> {
try {
- return AbstractTask.class
- .cast(taskClass.newInstance())
- .setParticipant(HelixParticipant.this)
+ org.apache.airavata.helix.core.support.TaskHelperImpl
taskHelper = null;
+ AbstractTask task = null;
+ if (applicationContext != null) {
+ taskHelper =
+
applicationContext.getBean(org.apache.airavata.helix.core.support.TaskHelperImpl.class);
+ // Try to get task as Spring bean first
+ try {
+ task = applicationContext.getBean(taskClass);
+ } catch (Exception e) {
+ // If not a Spring bean, create via reflection
with dependencies from ApplicationContext
+ java.lang.reflect.Constructor<?>[] constructors =
taskClass.getConstructors();
+ java.lang.reflect.Constructor<?> constructor =
null;
+ for (java.lang.reflect.Constructor<?> c :
constructors) {
+ if (c.getParameterCount() > 0) {
+ constructor = c;
+ break;
+ }
+ }
+ if (constructor != null) {
+ java.lang.Class<?>[] paramTypes =
constructor.getParameterTypes();
+ Object[] args = new Object[paramTypes.length];
+ for (int i = 0; i < paramTypes.length; i++) {
+ args[i] =
applicationContext.getBean(paramTypes[i]);
+ }
+ task = (AbstractTask)
constructor.newInstance(args);
+ } else {
+ task = (AbstractTask)
+
taskClass.getDeclaredConstructor().newInstance();
+ }
+ }
+ } else {
+ // Fallback: get AdaptorSupport from
ApplicationContext if available
+ // This is a workaround for non-Spring contexts
+ throw new IllegalStateException(
+ "ApplicationContext must be set on
HelixParticipant to create tasks");
+ }
+ return task.setParticipant(HelixParticipant.this)
.setCallbackContext(context)
- .setTaskHelper(new TaskHelperImpl());
- } catch (InstantiationException | IllegalAccessException e) {
+ .setTaskHelper(taskHelper);
+ } catch (Exception e) {
logger.error(
"Failed to initialize the task: "
+ context.getTaskConfig().getId(),
diff --git
a/airavata-api/src/main/java/org/apache/airavata/helix/impl/participant/GlobalParticipant.java
b/airavata-api/src/main/java/org/apache/airavata/helix/impl/participant/GlobalParticipant.java
index d3c6d0bb17..8b1fa16666 100644
---
a/airavata-api/src/main/java/org/apache/airavata/helix/impl/participant/GlobalParticipant.java
+++
b/airavata-api/src/main/java/org/apache/airavata/helix/impl/participant/GlobalParticipant.java
@@ -69,10 +69,12 @@ public class GlobalParticipant extends
HelixParticipant<AbstractTask> {
// Constructor for Spring - uses constructor injection for properties
// No checked exceptions - initialization happens in @PostConstruct
@org.springframework.beans.factory.annotation.Autowired
- public GlobalParticipant(AiravataServerProperties properties) {
+ public GlobalParticipant(
+ AiravataServerProperties properties,
org.springframework.context.ApplicationContext applicationContext) {
// Pass empty list for taskClasses - will be set in @PostConstruct
// Using Collections.emptyList() to avoid ambiguity with Class<T>
constructor
super(new ArrayList<>(), null, properties);
+ setApplicationContext(applicationContext);
}
@PostConstruct
diff --git
a/airavata-api/src/main/java/org/apache/airavata/helix/impl/task/AWSTaskFactory.java
b/airavata-api/src/main/java/org/apache/airavata/helix/impl/task/AWSTaskFactory.java
index 5b67badcd3..6fddeb63ed 100644
---
a/airavata-api/src/main/java/org/apache/airavata/helix/impl/task/AWSTaskFactory.java
+++
b/airavata-api/src/main/java/org/apache/airavata/helix/impl/task/AWSTaskFactory.java
@@ -42,6 +42,7 @@ public class AWSTaskFactory implements HelixTaskFactory {
private final CredentialStoreService credentialStoreService;
private final
org.apache.airavata.helix.impl.task.submission.config.GroovyMapBuilder
groovyMapBuilder;
private final org.apache.airavata.helix.impl.task.aws.utils.AWSTaskUtil
awsTaskUtil;
+ private final org.apache.airavata.messaging.core.MessagingFactory
messagingFactory;
public AWSTaskFactory(
ApplicationContext applicationContext,
@@ -49,25 +50,33 @@ public class AWSTaskFactory implements HelixTaskFactory {
UserProfileService userProfileService,
CredentialStoreService credentialStoreService,
org.apache.airavata.helix.impl.task.submission.config.GroovyMapBuilder
groovyMapBuilder,
- org.apache.airavata.helix.impl.task.aws.utils.AWSTaskUtil
awsTaskUtil) {
+ org.apache.airavata.helix.impl.task.aws.utils.AWSTaskUtil
awsTaskUtil,
+ org.apache.airavata.messaging.core.MessagingFactory
messagingFactory) {
this.applicationContext = applicationContext;
this.registryService = registryService;
this.userProfileService = userProfileService;
this.credentialStoreService = credentialStoreService;
this.groovyMapBuilder = groovyMapBuilder;
this.awsTaskUtil = awsTaskUtil;
+ this.messagingFactory = messagingFactory;
}
@Override
public AiravataTask createEnvSetupTask(String processId) {
LOGGER.info("Creating AWS CreateEc2InstanceTask for process {}...",
processId);
return new CreateEC2InstanceTask(
- applicationContext, registryService, userProfileService,
credentialStoreService, awsTaskUtil);
+ applicationContext,
+ registryService,
+ userProfileService,
+ credentialStoreService,
+ messagingFactory,
+ awsTaskUtil);
}
@Override
public AiravataTask createInputDataStagingTask(String processId) {
- return new NoOperationTask(applicationContext, registryService,
userProfileService, credentialStoreService);
+ return new NoOperationTask(
+ applicationContext, registryService, userProfileService,
credentialStoreService, messagingFactory);
}
@Override
@@ -77,33 +86,43 @@ public class AWSTaskFactory implements HelixTaskFactory {
registryService,
userProfileService,
credentialStoreService,
+ messagingFactory,
groovyMapBuilder,
awsTaskUtil);
}
@Override
public AiravataTask createOutputDataStagingTask(String processId) {
- return new NoOperationTask(applicationContext, registryService,
userProfileService, credentialStoreService);
+ return new NoOperationTask(
+ applicationContext, registryService, userProfileService,
credentialStoreService, messagingFactory);
}
@Override
public AiravataTask createArchiveTask(String processId) {
- return new NoOperationTask(applicationContext, registryService,
userProfileService, credentialStoreService);
+ return new NoOperationTask(
+ applicationContext, registryService, userProfileService,
credentialStoreService, messagingFactory);
}
@Override
public AiravataTask createJobVerificationTask(String processId) {
- return new NoOperationTask(applicationContext, registryService,
userProfileService, credentialStoreService);
+ return new NoOperationTask(
+ applicationContext, registryService, userProfileService,
credentialStoreService, messagingFactory);
}
@Override
public AiravataTask createCompletingTask(String processId) {
return new AWSCompletingTask(
- applicationContext, registryService, userProfileService,
credentialStoreService, awsTaskUtil);
+ applicationContext,
+ registryService,
+ userProfileService,
+ credentialStoreService,
+ messagingFactory,
+ awsTaskUtil);
}
@Override
public AiravataTask createParsingTriggeringTask(String processId) {
- return new NoOperationTask(applicationContext, registryService,
userProfileService, credentialStoreService);
+ return new NoOperationTask(
+ applicationContext, registryService, userProfileService,
credentialStoreService, messagingFactory);
}
}
diff --git
a/airavata-api/src/main/java/org/apache/airavata/helix/impl/task/SlurmTaskFactory.java
b/airavata-api/src/main/java/org/apache/airavata/helix/impl/task/SlurmTaskFactory.java
index c4208418cb..83df740f8f 100644
---
a/airavata-api/src/main/java/org/apache/airavata/helix/impl/task/SlurmTaskFactory.java
+++
b/airavata-api/src/main/java/org/apache/airavata/helix/impl/task/SlurmTaskFactory.java
@@ -45,69 +45,81 @@ public class SlurmTaskFactory implements HelixTaskFactory {
private final UserProfileService userProfileService;
private final CredentialStoreService credentialStoreService;
private final
org.apache.airavata.helix.impl.task.submission.config.GroovyMapBuilder
groovyMapBuilder;
+ private final org.apache.airavata.messaging.core.MessagingFactory
messagingFactory;
public SlurmTaskFactory(
ApplicationContext applicationContext,
RegistryService registryService,
UserProfileService userProfileService,
CredentialStoreService credentialStoreService,
-
org.apache.airavata.helix.impl.task.submission.config.GroovyMapBuilder
groovyMapBuilder) {
+
org.apache.airavata.helix.impl.task.submission.config.GroovyMapBuilder
groovyMapBuilder,
+ org.apache.airavata.messaging.core.MessagingFactory
messagingFactory) {
this.applicationContext = applicationContext;
this.registryService = registryService;
this.userProfileService = userProfileService;
this.credentialStoreService = credentialStoreService;
this.groovyMapBuilder = groovyMapBuilder;
+ this.messagingFactory = messagingFactory;
}
@Override
public AiravataTask createEnvSetupTask(String processId) {
LOGGER.info("Creating Slurm EnvSetupTask for process {}...",
processId);
- return new EnvSetupTask(applicationContext, registryService,
userProfileService, credentialStoreService);
+ return new EnvSetupTask(
+ applicationContext, registryService, userProfileService,
credentialStoreService, messagingFactory);
}
@Override
public AiravataTask createInputDataStagingTask(String processId) {
LOGGER.info("Creating Slurm InputDataStagingTask for process {}...",
processId);
return new InputDataStagingTask(
- applicationContext, registryService, userProfileService,
credentialStoreService);
+ applicationContext, registryService, userProfileService,
credentialStoreService, messagingFactory);
}
@Override
public AiravataTask createJobSubmissionTask(String processId) {
LOGGER.info("Creating Slurm DefaultJobSubmissionTask for process
{}...", processId);
return new DefaultJobSubmissionTask(
- applicationContext, registryService, userProfileService,
credentialStoreService, groovyMapBuilder);
+ applicationContext,
+ registryService,
+ userProfileService,
+ credentialStoreService,
+ messagingFactory,
+ groovyMapBuilder);
}
@Override
public AiravataTask createOutputDataStagingTask(String processId) {
LOGGER.info("Creating Slurm OutputDataStagingTask for process {}...",
processId);
return new OutputDataStagingTask(
- applicationContext, registryService, userProfileService,
credentialStoreService);
+ applicationContext, registryService, userProfileService,
credentialStoreService, messagingFactory);
}
@Override
public AiravataTask createArchiveTask(String processId) {
LOGGER.info("Creating Slurm ArchiveTask for process {}...", processId);
- return new ArchiveTask(applicationContext, registryService,
userProfileService, credentialStoreService);
+ return new ArchiveTask(
+ applicationContext, registryService, userProfileService,
credentialStoreService, messagingFactory);
}
@Override
public AiravataTask createJobVerificationTask(String processId) {
LOGGER.info("Creating Slurm JobVerificationTask for process {}...",
processId);
- return new JobVerificationTask(applicationContext, registryService,
userProfileService, credentialStoreService);
+ return new JobVerificationTask(
+ applicationContext, registryService, userProfileService,
credentialStoreService, messagingFactory);
}
@Override
public AiravataTask createCompletingTask(String processId) {
LOGGER.info("Creating Slurm CompletingTask for process {}...",
processId);
- return new CompletingTask(applicationContext, registryService,
userProfileService, credentialStoreService);
+ return new CompletingTask(
+ applicationContext, registryService, userProfileService,
credentialStoreService, messagingFactory);
}
@Override
public AiravataTask createParsingTriggeringTask(String processId) {
LOGGER.info("Creating Slurm ParsingTriggeringTask for process {}...",
processId);
return new ParsingTriggeringTask(
- applicationContext, registryService, userProfileService,
credentialStoreService);
+ applicationContext, registryService, userProfileService,
credentialStoreService, messagingFactory);
}
}
diff --git
a/airavata-api/src/main/java/org/apache/airavata/helix/impl/task/aws/AWSCompletingTask.java
b/airavata-api/src/main/java/org/apache/airavata/helix/impl/task/aws/AWSCompletingTask.java
index 1885afdf0e..4abe0ab38a 100644
---
a/airavata-api/src/main/java/org/apache/airavata/helix/impl/task/aws/AWSCompletingTask.java
+++
b/airavata-api/src/main/java/org/apache/airavata/helix/impl/task/aws/AWSCompletingTask.java
@@ -40,8 +40,9 @@ public class AWSCompletingTask extends AiravataTask {
org.apache.airavata.service.registry.RegistryService
registryService,
org.apache.airavata.service.profile.UserProfileService
userProfileService,
org.apache.airavata.service.security.CredentialStoreService
credentialStoreService,
+ org.apache.airavata.messaging.core.MessagingFactory
messagingFactory,
AWSTaskUtil awsTaskUtil) {
- super(applicationContext, registryService, userProfileService,
credentialStoreService);
+ super(applicationContext, registryService, userProfileService,
credentialStoreService, messagingFactory);
this.awsTaskUtil = awsTaskUtil;
}
diff --git
a/airavata-api/src/main/java/org/apache/airavata/helix/impl/task/aws/AWSJobSubmissionTask.java
b/airavata-api/src/main/java/org/apache/airavata/helix/impl/task/aws/AWSJobSubmissionTask.java
index 869d0e978f..88e631d872 100644
---
a/airavata-api/src/main/java/org/apache/airavata/helix/impl/task/aws/AWSJobSubmissionTask.java
+++
b/airavata-api/src/main/java/org/apache/airavata/helix/impl/task/aws/AWSJobSubmissionTask.java
@@ -68,9 +68,16 @@ public class AWSJobSubmissionTask extends JobSubmissionTask {
org.apache.airavata.service.registry.RegistryService
registryService,
org.apache.airavata.service.profile.UserProfileService
userProfileService,
CredentialStoreService credentialStoreService,
+ org.apache.airavata.messaging.core.MessagingFactory
messagingFactory,
org.apache.airavata.helix.impl.task.submission.config.GroovyMapBuilder
groovyMapBuilder,
AWSTaskUtil awsTaskUtil) {
- super(applicationContext, registryService, userProfileService,
credentialStoreService, groovyMapBuilder);
+ super(
+ applicationContext,
+ registryService,
+ userProfileService,
+ credentialStoreService,
+ messagingFactory,
+ groovyMapBuilder);
this.awsTaskUtil = awsTaskUtil;
}
diff --git
a/airavata-api/src/main/java/org/apache/airavata/helix/impl/task/aws/CreateEC2InstanceTask.java
b/airavata-api/src/main/java/org/apache/airavata/helix/impl/task/aws/CreateEC2InstanceTask.java
index b78dd2621b..102c5fe237 100644
---
a/airavata-api/src/main/java/org/apache/airavata/helix/impl/task/aws/CreateEC2InstanceTask.java
+++
b/airavata-api/src/main/java/org/apache/airavata/helix/impl/task/aws/CreateEC2InstanceTask.java
@@ -54,8 +54,9 @@ public class CreateEC2InstanceTask extends AiravataTask {
org.apache.airavata.service.registry.RegistryService
registryService,
org.apache.airavata.service.profile.UserProfileService
userProfileService,
org.apache.airavata.service.security.CredentialStoreService
credentialStoreService,
+ org.apache.airavata.messaging.core.MessagingFactory
messagingFactory,
AWSTaskUtil awsTaskUtil) {
- super(applicationContext, registryService, userProfileService,
credentialStoreService);
+ super(applicationContext, registryService, userProfileService,
credentialStoreService, messagingFactory);
this.awsTaskUtil = awsTaskUtil;
}
diff --git
a/airavata-api/src/main/java/org/apache/airavata/helix/impl/task/aws/NoOperationTask.java
b/airavata-api/src/main/java/org/apache/airavata/helix/impl/task/aws/NoOperationTask.java
index 84c7f2352f..44b85844d2 100644
---
a/airavata-api/src/main/java/org/apache/airavata/helix/impl/task/aws/NoOperationTask.java
+++
b/airavata-api/src/main/java/org/apache/airavata/helix/impl/task/aws/NoOperationTask.java
@@ -32,8 +32,9 @@ public class NoOperationTask extends AiravataTask {
org.springframework.context.ApplicationContext applicationContext,
org.apache.airavata.service.registry.RegistryService
registryService,
org.apache.airavata.service.profile.UserProfileService
userProfileService,
- org.apache.airavata.service.security.CredentialStoreService
credentialStoreService) {
- super(applicationContext, registryService, userProfileService,
credentialStoreService);
+ org.apache.airavata.service.security.CredentialStoreService
credentialStoreService,
+ org.apache.airavata.messaging.core.MessagingFactory
messagingFactory) {
+ super(applicationContext, registryService, userProfileService,
credentialStoreService, messagingFactory);
}
@Override
diff --git
a/airavata-api/src/main/java/org/apache/airavata/helix/impl/task/cancel/CancelCompletingTask.java
b/airavata-api/src/main/java/org/apache/airavata/helix/impl/task/cancel/CancelCompletingTask.java
index 7ad5c8c44f..43f7a7b3ff 100644
---
a/airavata-api/src/main/java/org/apache/airavata/helix/impl/task/cancel/CancelCompletingTask.java
+++
b/airavata-api/src/main/java/org/apache/airavata/helix/impl/task/cancel/CancelCompletingTask.java
@@ -36,8 +36,9 @@ public class CancelCompletingTask extends AiravataTask {
org.springframework.context.ApplicationContext applicationContext,
org.apache.airavata.service.registry.RegistryService
registryService,
org.apache.airavata.service.profile.UserProfileService
userProfileService,
- org.apache.airavata.service.security.CredentialStoreService
credentialStoreService) {
- super(applicationContext, registryService, userProfileService,
credentialStoreService);
+ org.apache.airavata.service.security.CredentialStoreService
credentialStoreService,
+ org.apache.airavata.messaging.core.MessagingFactory
messagingFactory) {
+ super(applicationContext, registryService, userProfileService,
credentialStoreService, messagingFactory);
}
@Override
diff --git
a/airavata-api/src/main/java/org/apache/airavata/helix/impl/task/cancel/RemoteJobCancellationTask.java
b/airavata-api/src/main/java/org/apache/airavata/helix/impl/task/cancel/RemoteJobCancellationTask.java
index 14a0f0aac2..6fbaf2d227 100644
---
a/airavata-api/src/main/java/org/apache/airavata/helix/impl/task/cancel/RemoteJobCancellationTask.java
+++
b/airavata-api/src/main/java/org/apache/airavata/helix/impl/task/cancel/RemoteJobCancellationTask.java
@@ -50,8 +50,9 @@ public class RemoteJobCancellationTask extends AiravataTask {
org.springframework.context.ApplicationContext applicationContext,
org.apache.airavata.service.registry.RegistryService
registryService,
org.apache.airavata.service.profile.UserProfileService
userProfileService,
- org.apache.airavata.service.security.CredentialStoreService
credentialStoreService) {
- super(applicationContext, registryService, userProfileService,
credentialStoreService);
+ org.apache.airavata.service.security.CredentialStoreService
credentialStoreService,
+ org.apache.airavata.messaging.core.MessagingFactory
messagingFactory) {
+ super(applicationContext, registryService, userProfileService,
credentialStoreService, messagingFactory);
}
@Override
diff --git
a/airavata-api/src/main/java/org/apache/airavata/helix/impl/task/completing/CompletingTask.java
b/airavata-api/src/main/java/org/apache/airavata/helix/impl/task/completing/CompletingTask.java
index 5a802ada24..2cc5388936 100644
---
a/airavata-api/src/main/java/org/apache/airavata/helix/impl/task/completing/CompletingTask.java
+++
b/airavata-api/src/main/java/org/apache/airavata/helix/impl/task/completing/CompletingTask.java
@@ -39,8 +39,9 @@ public class CompletingTask extends AiravataTask {
org.springframework.context.ApplicationContext applicationContext,
org.apache.airavata.service.registry.RegistryService
registryService,
org.apache.airavata.service.profile.UserProfileService
userProfileService,
- org.apache.airavata.service.security.CredentialStoreService
credentialStoreService) {
- super(applicationContext, registryService, userProfileService,
credentialStoreService);
+ org.apache.airavata.service.security.CredentialStoreService
credentialStoreService,
+ org.apache.airavata.messaging.core.MessagingFactory
messagingFactory) {
+ super(applicationContext, registryService, userProfileService,
credentialStoreService, messagingFactory);
}
@Override
diff --git
a/airavata-api/src/main/java/org/apache/airavata/helix/impl/task/env/EnvSetupTask.java
b/airavata-api/src/main/java/org/apache/airavata/helix/impl/task/env/EnvSetupTask.java
index b8c6b9c543..445d14eeae 100644
---
a/airavata-api/src/main/java/org/apache/airavata/helix/impl/task/env/EnvSetupTask.java
+++
b/airavata-api/src/main/java/org/apache/airavata/helix/impl/task/env/EnvSetupTask.java
@@ -40,8 +40,9 @@ public class EnvSetupTask extends AiravataTask {
org.springframework.context.ApplicationContext applicationContext,
org.apache.airavata.service.registry.RegistryService
registryService,
org.apache.airavata.service.profile.UserProfileService
userProfileService,
- org.apache.airavata.service.security.CredentialStoreService
credentialStoreService) {
- super(applicationContext, registryService, userProfileService,
credentialStoreService);
+ org.apache.airavata.service.security.CredentialStoreService
credentialStoreService,
+ org.apache.airavata.messaging.core.MessagingFactory
messagingFactory) {
+ super(applicationContext, registryService, userProfileService,
credentialStoreService, messagingFactory);
}
@Override
diff --git
a/airavata-api/src/main/java/org/apache/airavata/helix/impl/task/parsing/ParsingTriggeringTask.java
b/airavata-api/src/main/java/org/apache/airavata/helix/impl/task/parsing/ParsingTriggeringTask.java
index 031e3c27da..a333ab5aee 100644
---
a/airavata-api/src/main/java/org/apache/airavata/helix/impl/task/parsing/ParsingTriggeringTask.java
+++
b/airavata-api/src/main/java/org/apache/airavata/helix/impl/task/parsing/ParsingTriggeringTask.java
@@ -48,8 +48,9 @@ public class ParsingTriggeringTask extends AiravataTask {
org.springframework.context.ApplicationContext applicationContext,
org.apache.airavata.service.registry.RegistryService
registryService,
org.apache.airavata.service.profile.UserProfileService
userProfileService,
- org.apache.airavata.service.security.CredentialStoreService
credentialStoreService) {
- super(applicationContext, registryService, userProfileService,
credentialStoreService);
+ org.apache.airavata.service.security.CredentialStoreService
credentialStoreService,
+ org.apache.airavata.messaging.core.MessagingFactory
messagingFactory) {
+ super(applicationContext, registryService, userProfileService,
credentialStoreService, messagingFactory);
// Topic will be initialized in @PostConstruct or from
ApplicationContext
}
diff --git
a/airavata-api/src/main/java/org/apache/airavata/helix/impl/task/staging/ArchiveTask.java
b/airavata-api/src/main/java/org/apache/airavata/helix/impl/task/staging/ArchiveTask.java
index 1837e7f405..16ed593dc2 100644
---
a/airavata-api/src/main/java/org/apache/airavata/helix/impl/task/staging/ArchiveTask.java
+++
b/airavata-api/src/main/java/org/apache/airavata/helix/impl/task/staging/ArchiveTask.java
@@ -50,8 +50,9 @@ public class ArchiveTask extends DataStagingTask {
org.springframework.context.ApplicationContext applicationContext,
org.apache.airavata.service.registry.RegistryService
registryService,
org.apache.airavata.service.profile.UserProfileService
userProfileService,
- org.apache.airavata.service.security.CredentialStoreService
credentialStoreService) {
- super(applicationContext, registryService, userProfileService,
credentialStoreService);
+ org.apache.airavata.service.security.CredentialStoreService
credentialStoreService,
+ org.apache.airavata.messaging.core.MessagingFactory
messagingFactory) {
+ super(applicationContext, registryService, userProfileService,
credentialStoreService, messagingFactory);
}
@Override
diff --git
a/airavata-api/src/main/java/org/apache/airavata/helix/impl/task/staging/DataStagingTask.java
b/airavata-api/src/main/java/org/apache/airavata/helix/impl/task/staging/DataStagingTask.java
index 4458f73b0a..a5e00bfbe0 100644
---
a/airavata-api/src/main/java/org/apache/airavata/helix/impl/task/staging/DataStagingTask.java
+++
b/airavata-api/src/main/java/org/apache/airavata/helix/impl/task/staging/DataStagingTask.java
@@ -61,8 +61,9 @@ public abstract class DataStagingTask extends AiravataTask {
org.springframework.context.ApplicationContext applicationContext,
org.apache.airavata.service.registry.RegistryService
registryService,
org.apache.airavata.service.profile.UserProfileService
userProfileService,
- org.apache.airavata.service.security.CredentialStoreService
credentialStoreService) {
- super(applicationContext, registryService, userProfileService,
credentialStoreService);
+ org.apache.airavata.service.security.CredentialStoreService
credentialStoreService,
+ org.apache.airavata.messaging.core.MessagingFactory
messagingFactory) {
+ super(applicationContext, registryService, userProfileService,
credentialStoreService, messagingFactory);
}
private static final ExecutorService PASS_THROUGH_EXECUTOR =
diff --git
a/airavata-api/src/main/java/org/apache/airavata/helix/impl/task/staging/InputDataStagingTask.java
b/airavata-api/src/main/java/org/apache/airavata/helix/impl/task/staging/InputDataStagingTask.java
index bd1cc99f54..2b47c13a91 100644
---
a/airavata-api/src/main/java/org/apache/airavata/helix/impl/task/staging/InputDataStagingTask.java
+++
b/airavata-api/src/main/java/org/apache/airavata/helix/impl/task/staging/InputDataStagingTask.java
@@ -46,8 +46,9 @@ public class InputDataStagingTask extends DataStagingTask {
org.springframework.context.ApplicationContext applicationContext,
org.apache.airavata.service.registry.RegistryService
registryService,
org.apache.airavata.service.profile.UserProfileService
userProfileService,
- org.apache.airavata.service.security.CredentialStoreService
credentialStoreService) {
- super(applicationContext, registryService, userProfileService,
credentialStoreService);
+ org.apache.airavata.service.security.CredentialStoreService
credentialStoreService,
+ org.apache.airavata.messaging.core.MessagingFactory
messagingFactory) {
+ super(applicationContext, registryService, userProfileService,
credentialStoreService, messagingFactory);
}
@Override
diff --git
a/airavata-api/src/main/java/org/apache/airavata/helix/impl/task/staging/JobVerificationTask.java
b/airavata-api/src/main/java/org/apache/airavata/helix/impl/task/staging/JobVerificationTask.java
index 7fa6d84f5d..42274b1e44 100644
---
a/airavata-api/src/main/java/org/apache/airavata/helix/impl/task/staging/JobVerificationTask.java
+++
b/airavata-api/src/main/java/org/apache/airavata/helix/impl/task/staging/JobVerificationTask.java
@@ -45,8 +45,9 @@ public class JobVerificationTask extends AiravataTask {
org.springframework.context.ApplicationContext applicationContext,
org.apache.airavata.service.registry.RegistryService
registryService,
org.apache.airavata.service.profile.UserProfileService
userProfileService,
- org.apache.airavata.service.security.CredentialStoreService
credentialStoreService) {
- super(applicationContext, registryService, userProfileService,
credentialStoreService);
+ org.apache.airavata.service.security.CredentialStoreService
credentialStoreService,
+ org.apache.airavata.messaging.core.MessagingFactory
messagingFactory) {
+ super(applicationContext, registryService, userProfileService,
credentialStoreService, messagingFactory);
}
@Override
diff --git
a/airavata-api/src/main/java/org/apache/airavata/helix/impl/task/staging/OutputDataStagingTask.java
b/airavata-api/src/main/java/org/apache/airavata/helix/impl/task/staging/OutputDataStagingTask.java
index 27cb3bbc5e..f91f6f1366 100644
---
a/airavata-api/src/main/java/org/apache/airavata/helix/impl/task/staging/OutputDataStagingTask.java
+++
b/airavata-api/src/main/java/org/apache/airavata/helix/impl/task/staging/OutputDataStagingTask.java
@@ -52,8 +52,9 @@ public class OutputDataStagingTask extends DataStagingTask {
org.springframework.context.ApplicationContext applicationContext,
org.apache.airavata.service.registry.RegistryService
registryService,
org.apache.airavata.service.profile.UserProfileService
userProfileService,
- org.apache.airavata.service.security.CredentialStoreService
credentialStoreService) {
- super(applicationContext, registryService, userProfileService,
credentialStoreService);
+ org.apache.airavata.service.security.CredentialStoreService
credentialStoreService,
+ org.apache.airavata.messaging.core.MessagingFactory
messagingFactory) {
+ super(applicationContext, registryService, userProfileService,
credentialStoreService, messagingFactory);
}
private static final Logger logger =
LoggerFactory.getLogger(OutputDataStagingTask.class);
diff --git
a/airavata-api/src/main/java/org/apache/airavata/helix/impl/task/submission/DefaultJobSubmissionTask.java
b/airavata-api/src/main/java/org/apache/airavata/helix/impl/task/submission/DefaultJobSubmissionTask.java
index d76848a58d..e6b64ce94b 100644
---
a/airavata-api/src/main/java/org/apache/airavata/helix/impl/task/submission/DefaultJobSubmissionTask.java
+++
b/airavata-api/src/main/java/org/apache/airavata/helix/impl/task/submission/DefaultJobSubmissionTask.java
@@ -56,8 +56,15 @@ public class DefaultJobSubmissionTask extends
JobSubmissionTask {
RegistryService registryService,
UserProfileService userProfileService,
CredentialStoreService credentialStoreService,
+ org.apache.airavata.messaging.core.MessagingFactory
messagingFactory,
org.apache.airavata.helix.impl.task.submission.config.GroovyMapBuilder
groovyMapBuilder) {
- super(applicationContext, registryService, userProfileService,
credentialStoreService, groovyMapBuilder);
+ super(
+ applicationContext,
+ registryService,
+ userProfileService,
+ credentialStoreService,
+ messagingFactory,
+ groovyMapBuilder);
}
private static final String DEFAULT_JOB_ID = "DEFAULT_JOB_ID";
diff --git
a/airavata-api/src/main/java/org/apache/airavata/helix/impl/task/submission/ForkJobSubmissionTask.java
b/airavata-api/src/main/java/org/apache/airavata/helix/impl/task/submission/ForkJobSubmissionTask.java
index 406c8e2181..e98e0c1e0e 100644
---
a/airavata-api/src/main/java/org/apache/airavata/helix/impl/task/submission/ForkJobSubmissionTask.java
+++
b/airavata-api/src/main/java/org/apache/airavata/helix/impl/task/submission/ForkJobSubmissionTask.java
@@ -45,8 +45,15 @@ public class ForkJobSubmissionTask extends JobSubmissionTask
{
org.apache.airavata.service.registry.RegistryService
registryService,
org.apache.airavata.service.profile.UserProfileService
userProfileService,
org.apache.airavata.service.security.CredentialStoreService
credentialStoreService,
+ org.apache.airavata.messaging.core.MessagingFactory
messagingFactory,
org.apache.airavata.helix.impl.task.submission.config.GroovyMapBuilder
groovyMapBuilder) {
- super(applicationContext, registryService, userProfileService,
credentialStoreService, groovyMapBuilder);
+ super(
+ applicationContext,
+ registryService,
+ userProfileService,
+ credentialStoreService,
+ messagingFactory,
+ groovyMapBuilder);
}
@Override
diff --git
a/airavata-api/src/main/java/org/apache/airavata/helix/impl/task/submission/JobSubmissionTask.java
b/airavata-api/src/main/java/org/apache/airavata/helix/impl/task/submission/JobSubmissionTask.java
index 4c6d787b4f..d6c7611c10 100644
---
a/airavata-api/src/main/java/org/apache/airavata/helix/impl/task/submission/JobSubmissionTask.java
+++
b/airavata-api/src/main/java/org/apache/airavata/helix/impl/task/submission/JobSubmissionTask.java
@@ -55,8 +55,9 @@ public abstract class JobSubmissionTask extends AiravataTask {
org.apache.airavata.service.registry.RegistryService
registryService,
org.apache.airavata.service.profile.UserProfileService
userProfileService,
org.apache.airavata.service.security.CredentialStoreService
credentialStoreService,
+ org.apache.airavata.messaging.core.MessagingFactory
messagingFactory,
org.apache.airavata.helix.impl.task.submission.config.GroovyMapBuilder
groovyMapBuilder) {
- super(applicationContext, registryService, userProfileService,
credentialStoreService);
+ super(applicationContext, registryService, userProfileService,
credentialStoreService, messagingFactory);
this.groovyMapBuilder = groovyMapBuilder;
}
diff --git
a/airavata-api/src/main/java/org/apache/airavata/helix/impl/task/submission/LocalJobSubmissionTask.java
b/airavata-api/src/main/java/org/apache/airavata/helix/impl/task/submission/LocalJobSubmissionTask.java
index f387d63354..b1a70fe7da 100644
---
a/airavata-api/src/main/java/org/apache/airavata/helix/impl/task/submission/LocalJobSubmissionTask.java
+++
b/airavata-api/src/main/java/org/apache/airavata/helix/impl/task/submission/LocalJobSubmissionTask.java
@@ -36,8 +36,15 @@ public class LocalJobSubmissionTask extends
JobSubmissionTask {
org.apache.airavata.service.registry.RegistryService
registryService,
org.apache.airavata.service.profile.UserProfileService
userProfileService,
org.apache.airavata.service.security.CredentialStoreService
credentialStoreService,
+ org.apache.airavata.messaging.core.MessagingFactory
messagingFactory,
org.apache.airavata.helix.impl.task.submission.config.GroovyMapBuilder
groovyMapBuilder) {
- super(applicationContext, registryService, userProfileService,
credentialStoreService, groovyMapBuilder);
+ super(
+ applicationContext,
+ registryService,
+ userProfileService,
+ credentialStoreService,
+ messagingFactory,
+ groovyMapBuilder);
}
@Override
diff --git
a/airavata-api/src/main/java/org/apache/airavata/helix/impl/workflow/ParserWorkflowManager.java
b/airavata-api/src/main/java/org/apache/airavata/helix/impl/workflow/ParserWorkflowManager.java
index 96e6aa3b52..053f8a7a3f 100644
---
a/airavata-api/src/main/java/org/apache/airavata/helix/impl/workflow/ParserWorkflowManager.java
+++
b/airavata-api/src/main/java/org/apache/airavata/helix/impl/workflow/ParserWorkflowManager.java
@@ -83,9 +83,10 @@ public class ParserWorkflowManager extends WorkflowManager {
public ParserWorkflowManager(
AiravataServerProperties properties,
org.springframework.context.ApplicationContext applicationContext,
- org.apache.airavata.service.registry.RegistryService
registryService) {
+ org.apache.airavata.service.registry.RegistryService
registryService,
+ org.apache.airavata.messaging.core.MessagingFactory
messagingFactory) {
// Default values, will be updated in @PostConstruct
- super("parser-workflow-manager", false, registryService, properties);
+ super("parser-workflow-manager", false, registryService, properties,
messagingFactory);
this.properties = properties;
this.applicationContext = applicationContext;
this.registryService = registryService;
diff --git
a/airavata-api/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java
b/airavata-api/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java
index d58df6b902..c7f73c21f6 100644
---
a/airavata-api/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java
+++
b/airavata-api/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java
@@ -85,9 +85,10 @@ public class PostWorkflowManager extends WorkflowManager {
org.springframework.context.ApplicationContext applicationContext,
org.apache.airavata.service.registry.RegistryService
registryService,
org.apache.airavata.service.profile.UserProfileService
userProfileService,
- org.apache.airavata.service.security.CredentialStoreService
credentialStoreService) {
+ org.apache.airavata.service.security.CredentialStoreService
credentialStoreService,
+ org.apache.airavata.messaging.core.MessagingFactory
messagingFactory) {
// Default values, will be updated in @PostConstruct
- super("post-workflow-manager", false, registryService, properties);
+ super("post-workflow-manager", false, registryService, properties,
messagingFactory);
this.properties = properties;
this.taskFactory = taskFactory;
this.applicationContext = applicationContext;
diff --git
a/airavata-api/src/main/java/org/apache/airavata/helix/impl/workflow/PreWorkflowManager.java
b/airavata-api/src/main/java/org/apache/airavata/helix/impl/workflow/PreWorkflowManager.java
index 3e1798ffba..fcf7efa302 100644
---
a/airavata-api/src/main/java/org/apache/airavata/helix/impl/workflow/PreWorkflowManager.java
+++
b/airavata-api/src/main/java/org/apache/airavata/helix/impl/workflow/PreWorkflowManager.java
@@ -92,6 +92,7 @@ public class PreWorkflowManager extends WorkflowManager {
this.registryService = registryService;
this.userProfileService = userProfileService;
this.credentialStoreService = credentialStoreService;
+ this.messagingFactory = messagingFactory;
}
@jakarta.annotation.PostConstruct
@@ -215,8 +216,8 @@ public class PreWorkflowManager extends WorkflowManager {
// For intermediate transfers add a final CompletingTask
if (intermediateTransfer) {
- CompletingTask completingTask =
- new CompletingTask(applicationContext, registryService,
userProfileService, credentialStoreService);
+ CompletingTask completingTask = new CompletingTask(
+ applicationContext, registryService, userProfileService,
credentialStoreService, messagingFactory);
completingTask.setGatewayId(experimentModel.getGatewayId());
completingTask.setExperimentId(experimentModel.getExperimentId());
completingTask.setProcessId(processModel.getProcessId());
@@ -290,7 +291,7 @@ public class PreWorkflowManager extends WorkflowManager {
gcrPref.getResourceType());
RemoteJobCancellationTask rjct = new RemoteJobCancellationTask(
- applicationContext, registryService, userProfileService,
credentialStoreService);
+ applicationContext, registryService, userProfileService,
credentialStoreService, messagingFactory);
rjct.setTaskId(UUID.randomUUID().toString());
rjct.setExperimentId(experimentId);
rjct.setProcessId(processId);
@@ -304,7 +305,7 @@ public class PreWorkflowManager extends WorkflowManager {
}
CancelCompletingTask cct = new CancelCompletingTask(
- applicationContext, registryService, userProfileService,
credentialStoreService);
+ applicationContext, registryService, userProfileService,
credentialStoreService, messagingFactory);
cct.setTaskId(UUID.randomUUID().toString());
cct.setExperimentId(experimentId);
cct.setProcessId(processId);
diff --git
a/airavata-api/src/main/java/org/apache/airavata/manager/dbevent/DBEventManagerRunner.java
b/airavata-api/src/main/java/org/apache/airavata/manager/dbevent/DBEventManagerRunner.java
index b799dd2281..b4eff0646d 100644
---
a/airavata-api/src/main/java/org/apache/airavata/manager/dbevent/DBEventManagerRunner.java
+++
b/airavata-api/src/main/java/org/apache/airavata/manager/dbevent/DBEventManagerRunner.java
@@ -39,10 +39,12 @@ public class DBEventManagerRunner implements IServer {
private static final String SERVER_VERSION = "1.0";
private final AiravataServerProperties properties;
+ private final DBEventManagerMessagingFactory messagingFactory;
private ServerStatus status;
- public DBEventManagerRunner(AiravataServerProperties properties) {
+ public DBEventManagerRunner(AiravataServerProperties properties,
DBEventManagerMessagingFactory messagingFactory) {
this.properties = properties;
+ this.messagingFactory = messagingFactory;
}
/**
@@ -52,12 +54,12 @@ public class DBEventManagerRunner implements IServer {
try {
log.info("Starting DB Event manager publisher");
- DBEventManagerMessagingFactory.getDBEventPublisher();
+ messagingFactory.getDBEventPublisher();
log.debug("DB Event manager publisher is running");
log.info("Starting DB Event manager subscriber");
- DBEventManagerMessagingFactory.getDBEventSubscriber(properties);
+ messagingFactory.getDBEventSubscriber(properties);
log.debug("DB Event manager subscriber is listening");
} catch (AiravataException e) {
log.error("Error starting DB Event Manager.", e);
diff --git
a/airavata-api/src/main/java/org/apache/airavata/manager/dbevent/messaging/DBEventManagerMessagingFactory.java
b/airavata-api/src/main/java/org/apache/airavata/manager/dbevent/messaging/DBEventManagerMessagingFactory.java
index 14b0034390..23d03b0528 100644
---
a/airavata-api/src/main/java/org/apache/airavata/manager/dbevent/messaging/DBEventManagerMessagingFactory.java
+++
b/airavata-api/src/main/java/org/apache/airavata/manager/dbevent/messaging/DBEventManagerMessagingFactory.java
@@ -28,30 +28,38 @@ import org.apache.airavata.messaging.core.Publisher;
import org.apache.airavata.messaging.core.Subscriber;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
/**
* Created by Ajinkya on 3/1/17.
*/
+@Component
public class DBEventManagerMessagingFactory {
private static final Logger log =
LoggerFactory.getLogger(DBEventManagerMessagingFactory.class);
- private static Subscriber dbEventSubscriber;
+ private Subscriber dbEventSubscriber;
- private static Publisher dbEventPublisher;
+ private Publisher dbEventPublisher;
+
+ private final MessagingFactory messagingFactory;
+
+ public DBEventManagerMessagingFactory(MessagingFactory messagingFactory) {
+ this.messagingFactory = messagingFactory;
+ }
/**
* Get DB Event subscriber
* @return
* @throws AiravataException
*/
- public static Subscriber getDBEventSubscriber(AiravataServerProperties
properties) throws AiravataException {
+ public Subscriber getDBEventSubscriber(AiravataServerProperties
properties) throws AiravataException {
if (null == dbEventSubscriber) {
- synchronized (DBEventManagerMessagingFactory.class) {
+ synchronized (this) {
if (null == dbEventSubscriber) {
log.info("Creating DB Event subscriber.....");
- dbEventSubscriber = MessagingFactory.getDBEventSubscriber(
- new DBEventMessageHandler(properties),
DBEventService.DB_EVENT.toString());
+ dbEventSubscriber = messagingFactory.getDBEventSubscriber(
+ new DBEventMessageHandler(properties, this),
DBEventService.DB_EVENT.toString());
log.info("DB Event subscriber created");
}
}
@@ -59,12 +67,12 @@ public class DBEventManagerMessagingFactory {
return dbEventSubscriber;
}
- public static Publisher getDBEventPublisher() throws AiravataException {
+ public Publisher getDBEventPublisher() throws AiravataException {
if (null == dbEventPublisher) {
- synchronized (DBEventManagerMessagingFactory.class) {
+ synchronized (this) {
if (null == dbEventPublisher) {
log.info("Creating DB Event publisher.....");
- dbEventPublisher = MessagingFactory.getDBEventPublisher();
+ dbEventPublisher = messagingFactory.getDBEventPublisher();
log.info("DB Event publisher created");
}
}
diff --git
a/airavata-api/src/main/java/org/apache/airavata/manager/dbevent/messaging/impl/DBEventMessageHandler.java
b/airavata-api/src/main/java/org/apache/airavata/manager/dbevent/messaging/impl/DBEventMessageHandler.java
index bcfa4fc1d2..7e575dfef0 100644
---
a/airavata-api/src/main/java/org/apache/airavata/manager/dbevent/messaging/impl/DBEventMessageHandler.java
+++
b/airavata-api/src/main/java/org/apache/airavata/manager/dbevent/messaging/impl/DBEventMessageHandler.java
@@ -44,6 +44,7 @@ public class DBEventMessageHandler implements MessageHandler {
private static final Logger log =
LoggerFactory.getLogger(DBEventMessageHandler.class);
private CuratorFramework curatorClient;
private AiravataServerProperties properties;
+ private DBEventManagerMessagingFactory messagingFactory;
public DBEventMessageHandler() {
// Properties should be set via setter
@@ -54,6 +55,12 @@ public class DBEventMessageHandler implements MessageHandler
{
startCuratorClient();
}
+ public DBEventMessageHandler(AiravataServerProperties properties,
DBEventManagerMessagingFactory messagingFactory) {
+ this.properties = properties;
+ this.messagingFactory = messagingFactory;
+ startCuratorClient();
+ }
+
public void setProperties(AiravataServerProperties properties) {
this.properties = properties;
if (curatorClient == null) {
@@ -107,14 +114,17 @@ public class DBEventMessageHandler implements
MessageHandler {
+ subscribers.toString());
MessageContext messageCtx = new
MessageContext(dbEventMessage, MessageType.DB_EVENT, "", "");
messageCtx.setUpdatedTime(AiravataUtils.getCurrentTimestamp());
-
DBEventManagerMessagingFactory.getDBEventPublisher().publish(messageCtx,
routingKey);
+ if (messagingFactory == null) {
+ throw new IllegalStateException(
+ "DBEventManagerMessagingFactory must be set on
DBEventMessageHandler");
+ }
+ messagingFactory.getDBEventPublisher().publish(messageCtx,
routingKey);
break;
}
log.info("Sending ack. Message Delivery Tag : " +
messageContext.getDeliveryTag());
- if (properties != null) {
- DBEventManagerMessagingFactory.getDBEventSubscriber(properties)
- .sendAck(messageContext.getDeliveryTag());
+ if (properties != null && messagingFactory != null) {
+
messagingFactory.getDBEventSubscriber(properties).sendAck(messageContext.getDeliveryTag());
}
} catch (Exception e) {
diff --git
a/airavata-api/src/main/java/org/apache/airavata/messaging/core/stats/StatCounter.java
b/airavata-api/src/main/java/org/apache/airavata/messaging/core/stats/StatCounter.java
index eeba31f803..f02c9249ad 100644
---
a/airavata-api/src/main/java/org/apache/airavata/messaging/core/stats/StatCounter.java
+++
b/airavata-api/src/main/java/org/apache/airavata/messaging/core/stats/StatCounter.java
@@ -19,6 +19,7 @@
*/
package org.apache.airavata.messaging.core.stats;
+import jakarta.annotation.PostConstruct;
import java.io.File;
import java.util.ArrayList;
import java.util.HashMap;
@@ -27,7 +28,6 @@ import java.util.Map;
import java.util.Timer;
import org.apache.airavata.common.model.Message;
import org.springframework.stereotype.Component;
-import jakarta.annotation.PostConstruct;
@Component
public class StatCounter {
diff --git
a/airavata-api/src/main/java/org/apache/airavata/messaging/core/util/DBEventPublisherUtils.java
b/airavata-api/src/main/java/org/apache/airavata/messaging/core/util/DBEventPublisherUtils.java
index 4bde78edaa..d3edfa78c3 100644
---
a/airavata-api/src/main/java/org/apache/airavata/messaging/core/util/DBEventPublisherUtils.java
+++
b/airavata-api/src/main/java/org/apache/airavata/messaging/core/util/DBEventPublisherUtils.java
@@ -46,9 +46,11 @@ public class DBEventPublisherUtils {
private static final ObjectMapper objectMapper = new ObjectMapper();
private Publisher dbEventPublisher = null;
private DBEventService publisherService;
+ private final MessagingFactory messagingFactory;
- public DBEventPublisherUtils(DBEventService dbEventService) {
+ public DBEventPublisherUtils(DBEventService dbEventService,
MessagingFactory messagingFactory) {
this.publisherService = dbEventService;
+ this.messagingFactory = messagingFactory;
}
/**
@@ -75,7 +77,7 @@ public class DBEventPublisherUtils {
synchronized (this) {
if (null == dbEventPublisher) {
logger.info("Creating DB Event publisher.....");
- dbEventPublisher = MessagingFactory.getDBEventPublisher();
+ dbEventPublisher = messagingFactory.getDBEventPublisher();
logger.info("DB Event publisher created");
}
}
diff --git
a/airavata-api/src/main/java/org/apache/airavata/metascheduler/core/utils/Utils.java
b/airavata-api/src/main/java/org/apache/airavata/metascheduler/core/utils/Utils.java
index 7e8ba619a0..a001b7facc 100644
---
a/airavata-api/src/main/java/org/apache/airavata/metascheduler/core/utils/Utils.java
+++
b/airavata-api/src/main/java/org/apache/airavata/metascheduler/core/utils/Utils.java
@@ -49,9 +49,7 @@ public class Utils {
private static ApplicationContext staticApplicationContext;
public Utils(
- RegistryService registryService,
- ApplicationContext applicationContext,
- MessagingFactory messagingFactory) {
+ RegistryService registryService, ApplicationContext
applicationContext, MessagingFactory messagingFactory) {
this.registryService = registryService;
this.applicationContext = applicationContext;
this.messagingFactory = messagingFactory;
diff --git
a/airavata-api/src/main/java/org/apache/airavata/metascheduler/metadata/analyzer/DataInterpreterService.java
b/airavata-api/src/main/java/org/apache/airavata/metascheduler/metadata/analyzer/DataInterpreterService.java
index 7bfcf0d777..dbe77b4f39 100644
---
a/airavata-api/src/main/java/org/apache/airavata/metascheduler/metadata/analyzer/DataInterpreterService.java
+++
b/airavata-api/src/main/java/org/apache/airavata/metascheduler/metadata/analyzer/DataInterpreterService.java
@@ -53,8 +53,7 @@ public class DataInterpreterService implements IServer {
private final AiravataServerProperties properties;
private final ApplicationContext applicationContext;
- public DataInterpreterService(
- AiravataServerProperties properties, ApplicationContext
applicationContext) {
+ public DataInterpreterService(AiravataServerProperties properties,
ApplicationContext applicationContext) {
this.properties = properties;
this.applicationContext = applicationContext;
}
diff --git
a/airavata-api/src/main/java/org/apache/airavata/metascheduler/process/scheduling/engine/rescheduler/ProcessReschedulingService.java
b/airavata-api/src/main/java/org/apache/airavata/metascheduler/process/scheduling/engine/rescheduler/ProcessReschedulingService.java
index a8b00bcddc..297d9da8d6 100644
---
a/airavata-api/src/main/java/org/apache/airavata/metascheduler/process/scheduling/engine/rescheduler/ProcessReschedulingService.java
+++
b/airavata-api/src/main/java/org/apache/airavata/metascheduler/process/scheduling/engine/rescheduler/ProcessReschedulingService.java
@@ -55,8 +55,7 @@ public class ProcessReschedulingService implements IServer {
private final AiravataServerProperties properties;
private final ApplicationContext applicationContext;
- public ProcessReschedulingService(
- AiravataServerProperties properties, ApplicationContext
applicationContext) {
+ public ProcessReschedulingService(AiravataServerProperties properties,
ApplicationContext applicationContext) {
this.properties = properties;
this.applicationContext = applicationContext;
}
diff --git
a/airavata-api/src/main/java/org/apache/airavata/metascheduler/process/scheduling/engine/rescheduler/ProcessScannerImpl.java
b/airavata-api/src/main/java/org/apache/airavata/metascheduler/process/scheduling/engine/rescheduler/ProcessScannerImpl.java
index ecf2d0e102..180a6e48e4 100644
---
a/airavata-api/src/main/java/org/apache/airavata/metascheduler/process/scheduling/engine/rescheduler/ProcessScannerImpl.java
+++
b/airavata-api/src/main/java/org/apache/airavata/metascheduler/process/scheduling/engine/rescheduler/ProcessScannerImpl.java
@@ -52,8 +52,9 @@ public class ProcessScannerImpl implements ProcessScanner {
List<ProcessModel> processModelList =
registryService.getProcessListInState(state);
String reSchedulerPolicyClass =
properties.services.scheduler.computeResourceReschedulerPolicyClass;
- ReScheduler reScheduler =
- (ReScheduler)
Class.forName(reSchedulerPolicyClass).getDeclaredConstructor().newInstance();
+ ReScheduler reScheduler = (ReScheduler)
Class.forName(reSchedulerPolicyClass)
+ .getDeclaredConstructor()
+ .newInstance();
for (ProcessModel processModel : processModelList) {
reScheduler.reschedule(processModel, state);
diff --git
a/airavata-api/src/main/java/org/apache/airavata/orchestrator/job/GFACPassiveJobSubmitter.java
b/airavata-api/src/main/java/org/apache/airavata/orchestrator/job/GFACPassiveJobSubmitter.java
index 75f34c69c3..3c7d05ccde 100644
---
a/airavata-api/src/main/java/org/apache/airavata/orchestrator/job/GFACPassiveJobSubmitter.java
+++
b/airavata-api/src/main/java/org/apache/airavata/orchestrator/job/GFACPassiveJobSubmitter.java
@@ -55,8 +55,8 @@ public class GFACPassiveJobSubmitter implements JobSubmitter,
Watcher {
this.publisher = orchestratorContext.getPublisher();
} else {
if (messagingFactory == null) {
- throw new OrchestratorException("MessagingFactory must be set
before initializing "
- + GFACPassiveJobSubmitter.class);
+ throw new OrchestratorException(
+ "MessagingFactory must be set before initializing " +
GFACPassiveJobSubmitter.class);
}
try {
this.publisher =
messagingFactory.getPublisher(Type.PROCESS_LAUNCH);
diff --git
a/airavata-api/src/main/java/org/apache/airavata/registry/messaging/RegistryServiceDBEventHandler.java
b/airavata-api/src/main/java/org/apache/airavata/registry/messaging/RegistryServiceDBEventHandler.java
index 386fa0f47e..4dac3f4cde 100644
---
a/airavata-api/src/main/java/org/apache/airavata/registry/messaging/RegistryServiceDBEventHandler.java
+++
b/airavata-api/src/main/java/org/apache/airavata/registry/messaging/RegistryServiceDBEventHandler.java
@@ -58,9 +58,11 @@ public class RegistryServiceDBEventHandler implements
MessageHandler {
RegistryService registryService,
RegistryServiceDBEventMessagingFactory messagingFactory) {
this.registryService = registryService;
this.messagingFactory = messagingFactory;
+ this.dbEventPublisherUtils =
+ new DBEventPublisherUtils(DBEventService.REGISTRY,
messagingFactory.getMessagingFactory());
}
- private DBEventPublisherUtils dbEventPublisherUtils = new
DBEventPublisherUtils(DBEventService.REGISTRY);
+ private final DBEventPublisherUtils dbEventPublisherUtils;
@Override
public void onMessage(MessageContext messageContext) {
diff --git
a/airavata-api/src/main/java/org/apache/airavata/registry/messaging/RegistryServiceDBEventMessagingFactory.java
b/airavata-api/src/main/java/org/apache/airavata/registry/messaging/RegistryServiceDBEventMessagingFactory.java
index 3828e02256..a40897f183 100644
---
a/airavata-api/src/main/java/org/apache/airavata/registry/messaging/RegistryServiceDBEventMessagingFactory.java
+++
b/airavata-api/src/main/java/org/apache/airavata/registry/messaging/RegistryServiceDBEventMessagingFactory.java
@@ -55,12 +55,15 @@ public class RegistryServiceDBEventMessagingFactory {
private final RegistryService registryService;
private final MessagingFactory messagingFactory;
- public RegistryServiceDBEventMessagingFactory(
- RegistryService registryService, MessagingFactory
messagingFactory) {
+ public RegistryServiceDBEventMessagingFactory(RegistryService
registryService, MessagingFactory messagingFactory) {
this.registryService = registryService;
this.messagingFactory = messagingFactory;
}
+ public MessagingFactory getMessagingFactory() {
+ return messagingFactory;
+ }
+
public Publisher getDBEventPublisher() throws AiravataException {
if (null == dbEventPublisher) {
synchronized (this) {
@@ -95,7 +98,7 @@ public class RegistryServiceDBEventMessagingFactory {
return registryServiceDBEventSubscriber;
}
- public static boolean registerRegistryServiceWithPublishers(List<String>
publisherList) throws AiravataException {
+ public boolean registerRegistryServiceWithPublishers(List<String>
publisherList) throws AiravataException {
for (String publisher : publisherList) {
logger.info("Sending service discovery message. Publisher: " +
publisher + ", Subscriber: "
+ DBEventService.REGISTRY.toString());
diff --git
a/airavata-api/src/main/java/org/apache/airavata/service/AiravataService.java
b/airavata-api/src/main/java/org/apache/airavata/service/AiravataService.java
index 3e1091b113..53a989ee8a 100644
---
a/airavata-api/src/main/java/org/apache/airavata/service/AiravataService.java
+++
b/airavata-api/src/main/java/org/apache/airavata/service/AiravataService.java
@@ -3882,8 +3882,8 @@ public class AiravataService {
}
}
- AgentAdaptor adaptor = adaptorSupport.fetchComputeSSHAdaptor(
- gatewayId, resourceId, credentialToken, userId, loginUserName);
+ AgentAdaptor adaptor =
+ adaptorSupport.fetchComputeSSHAdaptor(gatewayId, resourceId,
credentialToken, userId, loginUserName);
logger.info("Resolved resource {} as compute resource to fetch storage
details", resourceId);
return new StorageInfoContext(loginUserName, credentialToken, adaptor);
@@ -3991,8 +3991,8 @@ public class AiravataService {
}
}
- AgentAdaptor adaptor = adaptorSupport.fetchStorageSSHAdaptor(
- gatewayId, resourceId, credentialToken, userId, loginUserName);
+ AgentAdaptor adaptor =
+ adaptorSupport.fetchStorageSSHAdaptor(gatewayId, resourceId,
credentialToken, userId, loginUserName);
logger.info("Resolved resource {} as storage resource to fetch storage
details", resourceId);
return new StorageInfoContext(loginUserName, credentialToken, adaptor);
diff --git
a/airavata-api/src/main/java/org/apache/airavata/service/profile/TenantProfileService.java
b/airavata-api/src/main/java/org/apache/airavata/service/profile/TenantProfileService.java
index ab370f6682..c657e53d80 100644
---
a/airavata-api/src/main/java/org/apache/airavata/service/profile/TenantProfileService.java
+++
b/airavata-api/src/main/java/org/apache/airavata/service/profile/TenantProfileService.java
@@ -37,6 +37,7 @@ import org.apache.airavata.common.utils.Constants;
import org.apache.airavata.common.utils.DBEventService;
import org.apache.airavata.credential.exception.CredentialStoreException;
import org.apache.airavata.credential.model.PasswordCredential;
+import org.apache.airavata.messaging.core.MessagingFactory;
import org.apache.airavata.messaging.core.util.DBEventPublisherUtils;
import org.apache.airavata.profile.entities.GatewayEntity;
import org.apache.airavata.profile.exception.TenantProfileServiceException;
@@ -60,17 +61,19 @@ public class TenantProfileService {
private final Mapper mapper;
private final EntityManager entityManager;
- private DBEventPublisherUtils dbEventPublisherUtils = new
DBEventPublisherUtils(DBEventService.TENANT);
+ private final DBEventPublisherUtils dbEventPublisherUtils;
public TenantProfileService(
TenantProfileRepository tenantProfileRepository,
CredentialStoreService credentialStoreService,
Mapper mapper,
- @Qualifier("profileServiceEntityManager") EntityManager
entityManager) {
+ @Qualifier("profileServiceEntityManager") EntityManager
entityManager,
+ MessagingFactory messagingFactory) {
this.tenantProfileRepository = tenantProfileRepository;
this.credentialStoreService = credentialStoreService;
this.mapper = mapper;
this.entityManager = entityManager;
+ this.dbEventPublisherUtils = new
DBEventPublisherUtils(DBEventService.TENANT, messagingFactory);
}
public String addGateway(AuthzToken authzToken, Gateway gateway)
diff --git
a/airavata-api/src/main/java/org/apache/airavata/service/profile/UserProfileService.java
b/airavata-api/src/main/java/org/apache/airavata/service/profile/UserProfileService.java
index 646ebd0aeb..7f1430c2b5 100644
---
a/airavata-api/src/main/java/org/apache/airavata/service/profile/UserProfileService.java
+++
b/airavata-api/src/main/java/org/apache/airavata/service/profile/UserProfileService.java
@@ -32,6 +32,7 @@ import org.apache.airavata.common.model.UserProfile;
import org.apache.airavata.common.utils.AiravataUtils;
import org.apache.airavata.common.utils.Constants;
import org.apache.airavata.common.utils.DBEventService;
+import org.apache.airavata.messaging.core.MessagingFactory;
import org.apache.airavata.messaging.core.util.DBEventPublisherUtils;
import org.apache.airavata.profile.entities.UserProfileEntity;
import org.apache.airavata.profile.exception.IamAdminServicesException;
@@ -66,22 +67,23 @@ public class UserProfileService {
private final Mapper mapper;
private final AiravataSecurityManager securityManager;
private final EntityManager entityManager;
+ private final DBEventPublisherUtils dbEventPublisherUtils;
public UserProfileService(
UserProfileRepository userProfileRepository,
@Lazy org.apache.airavata.service.security.IamAdminService
iamAdminService,
Mapper mapper,
AiravataSecurityManager securityManager,
- @Qualifier("profileServiceEntityManager") EntityManager
entityManager) {
+ @Qualifier("profileServiceEntityManager") EntityManager
entityManager,
+ MessagingFactory messagingFactory) {
this.userProfileRepository = userProfileRepository;
this.iamAdminService = iamAdminService;
this.mapper = mapper;
this.securityManager = securityManager;
this.entityManager = entityManager;
+ this.dbEventPublisherUtils = new
DBEventPublisherUtils(DBEventService.USER_PROFILE, messagingFactory);
}
- private DBEventPublisherUtils dbEventPublisherUtils = new
DBEventPublisherUtils(DBEventService.USER_PROFILE);
-
public String initializeUserProfile(AuthzToken authzToken) throws
UserProfileServiceException {
String gatewayId = authzToken.getClaimsMap().get(Constants.GATEWAY_ID);
try {
diff --git
a/airavata-api/src/main/java/org/apache/airavata/service/security/IamAdminService.java
b/airavata-api/src/main/java/org/apache/airavata/service/security/IamAdminService.java
index 25be35178a..e20c95ec32 100644
---
a/airavata-api/src/main/java/org/apache/airavata/service/security/IamAdminService.java
+++
b/airavata-api/src/main/java/org/apache/airavata/service/security/IamAdminService.java
@@ -32,6 +32,7 @@ import org.apache.airavata.common.utils.DBEventService;
import org.apache.airavata.config.AiravataServerProperties;
import org.apache.airavata.credential.exception.CredentialStoreException;
import org.apache.airavata.credential.model.PasswordCredential;
+import org.apache.airavata.messaging.core.MessagingFactory;
import org.apache.airavata.messaging.core.util.DBEventPublisherUtils;
import org.apache.airavata.profile.exception.IamAdminServicesException;
import org.apache.airavata.profile.utils.TenantManagementKeycloakImpl;
@@ -54,17 +55,19 @@ public class IamAdminService {
private final CredentialStoreService credentialStoreService;
private final RegistryService registryService;
- private DBEventPublisherUtils dbEventPublisherUtils = new
DBEventPublisherUtils(DBEventService.IAM_ADMIN);
+ private final DBEventPublisherUtils dbEventPublisherUtils;
public IamAdminService(
AiravataServerProperties properties,
UserProfileService userProfileService,
CredentialStoreService credentialStoreService,
- RegistryService registryService) {
+ RegistryService registryService,
+ MessagingFactory messagingFactory) {
this.properties = properties;
this.userProfileService = userProfileService;
this.credentialStoreService = credentialStoreService;
this.registryService = registryService;
+ this.dbEventPublisherUtils = new
DBEventPublisherUtils(DBEventService.IAM_ADMIN, messagingFactory);
}
public Gateway setUpGateway(AuthzToken authzToken, Gateway gateway)
diff --git
a/airavata-api/src/main/java/org/apache/airavata/sharing/messaging/SharingServiceDBEventHandler.java
b/airavata-api/src/main/java/org/apache/airavata/sharing/messaging/SharingServiceDBEventHandler.java
index 9e9e1cef8e..03185acf06 100644
---
a/airavata-api/src/main/java/org/apache/airavata/sharing/messaging/SharingServiceDBEventHandler.java
+++
b/airavata-api/src/main/java/org/apache/airavata/sharing/messaging/SharingServiceDBEventHandler.java
@@ -53,8 +53,7 @@ public class SharingServiceDBEventHandler implements
MessageHandler {
private final SharingServiceDBEventMessagingFactory messagingFactory;
public SharingServiceDBEventHandler(
- SharingRegistryService sharingRegistryService,
- SharingServiceDBEventMessagingFactory messagingFactory) {
+ SharingRegistryService sharingRegistryService,
SharingServiceDBEventMessagingFactory messagingFactory) {
this.sharingRegistryService = sharingRegistryService;
this.messagingFactory = messagingFactory;
}
diff --git
a/airavata-api/src/main/java/org/apache/airavata/sharing/messaging/SharingServiceDBEventMessagingFactory.java
b/airavata-api/src/main/java/org/apache/airavata/sharing/messaging/SharingServiceDBEventMessagingFactory.java
index 003a689d0c..38b94533e0 100644
---
a/airavata-api/src/main/java/org/apache/airavata/sharing/messaging/SharingServiceDBEventMessagingFactory.java
+++
b/airavata-api/src/main/java/org/apache/airavata/sharing/messaging/SharingServiceDBEventMessagingFactory.java
@@ -105,7 +105,7 @@ public class SharingServiceDBEventMessagingFactory {
* @return
* @throws AiravataException
*/
- public static boolean registerSharingServiceWithPublishers(List<String>
publishers) throws AiravataException {
+ public boolean registerSharingServiceWithPublishers(List<String>
publishers) throws AiravataException {
for (String publisher : publishers) {
diff --git
a/airavata-api/src/test/java/org/apache/airavata/messaging/core/TestClient.java
b/airavata-api/src/test/java/org/apache/airavata/messaging/core/TestClient.java
index 465a3f9a71..22715963a3 100644
---
a/airavata-api/src/test/java/org/apache/airavata/messaging/core/TestClient.java
+++
b/airavata-api/src/test/java/org/apache/airavata/messaging/core/TestClient.java
@@ -24,6 +24,7 @@ import java.util.List;
import org.apache.airavata.common.exception.ApplicationSettingsException;
import org.apache.airavata.common.model.ExperimentStatusChangeEvent;
import org.apache.airavata.common.model.MessageType;
+import org.apache.airavata.config.AiravataServerProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -35,10 +36,12 @@ public class TestClient {
public static void main(String[] args) {
try {
+ AiravataServerProperties properties = new
AiravataServerProperties();
+ MessagingFactory messagingFactory = new
MessagingFactory(properties);
List<String> routingKeys = new ArrayList<>();
routingKeys.add(experimentId);
routingKeys.add(experimentId + ".*");
- MessagingFactory.getSubscriber(getMessageHandler(), routingKeys,
Type.STATUS);
+ messagingFactory.getSubscriber(getMessageHandler(), routingKeys,
Type.STATUS);
} catch (ApplicationSettingsException e) {
logger.error("Error reading airavata server properties", e);
} catch (Exception e) {
diff --git
a/modules/file-server/src/main/java/org/apache/airavata/file/server/service/AirvataFileService.java
b/modules/file-server/src/main/java/org/apache/airavata/file/server/service/AirvataFileService.java
index ce49e34188..494dc2ea3b 100644
---
a/modules/file-server/src/main/java/org/apache/airavata/file/server/service/AirvataFileService.java
+++
b/modules/file-server/src/main/java/org/apache/airavata/file/server/service/AirvataFileService.java
@@ -29,6 +29,7 @@ import org.apache.airavata.agents.api.FileMetadata;
import org.apache.airavata.file.server.model.AiravataDirectory;
import org.apache.airavata.file.server.model.AiravataFile;
import org.apache.airavata.helix.task.api.support.AdaptorSupport;
+import org.apache.airavata.messaging.core.MessagingFactory;
import org.apache.airavata.service.profile.UserProfileService;
import org.apache.airavata.service.registry.RegistryService;
import org.apache.airavata.service.security.CredentialStoreService;
@@ -48,18 +49,21 @@ public class AirvataFileService {
private final ApplicationContext applicationContext;
private final UserProfileService userProfileService;
private final CredentialStoreService credentialStoreService;
+ private final MessagingFactory messagingFactory;
public AirvataFileService(
AdaptorSupport adaptorSupport,
RegistryService registryService,
ApplicationContext applicationContext,
UserProfileService userProfileService,
- CredentialStoreService credentialStoreService) {
+ CredentialStoreService credentialStoreService,
+ MessagingFactory messagingFactory) {
this.adaptorSupport = adaptorSupport;
this.registryService = registryService;
this.applicationContext = applicationContext;
this.userProfileService = userProfileService;
this.credentialStoreService = credentialStoreService;
+ this.messagingFactory = messagingFactory;
}
private AgentAdaptor getAgentAdaptor(ProcessDataManager dataManager,
String processId) throws Exception {
@@ -79,6 +83,7 @@ public class AirvataFileService {
registryService,
userProfileService,
credentialStoreService,
+ messagingFactory,
processId,
adaptorSupport);
var agentAdaptor = getAgentAdaptor(dataManager, processId);
@@ -94,6 +99,7 @@ public class AirvataFileService {
registryService,
userProfileService,
credentialStoreService,
+ messagingFactory,
processId,
adaptorSupport);
var agentAdaptor = getAgentAdaptor(dataManager, processId);
@@ -127,6 +133,7 @@ public class AirvataFileService {
registryService,
userProfileService,
credentialStoreService,
+ messagingFactory,
processId,
adaptorSupport);
var agentAdaptor = getAgentAdaptor(dataManager, processId);
@@ -150,6 +157,7 @@ public class AirvataFileService {
registryService,
userProfileService,
credentialStoreService,
+ messagingFactory,
processId,
adaptorSupport);
var agentAdaptor = getAgentAdaptor(dataManager, processId);
@@ -176,6 +184,7 @@ public class AirvataFileService {
registryService,
userProfileService,
credentialStoreService,
+ messagingFactory,
processId,
adaptorSupport);
var agentAdaptor = getAgentAdaptor(dataManager, processId);
diff --git
a/modules/file-server/src/main/java/org/apache/airavata/file/server/service/ProcessDataManager.java
b/modules/file-server/src/main/java/org/apache/airavata/file/server/service/ProcessDataManager.java
index 789954e5b9..1160811c7f 100644
---
a/modules/file-server/src/main/java/org/apache/airavata/file/server/service/ProcessDataManager.java
+++
b/modules/file-server/src/main/java/org/apache/airavata/file/server/service/ProcessDataManager.java
@@ -29,6 +29,7 @@ import org.apache.airavata.helix.adaptor.SSHJAgentAdaptor;
import org.apache.airavata.helix.impl.task.aws.AWSProcessContextManager;
import org.apache.airavata.helix.impl.task.staging.OutputDataStagingTask;
import org.apache.airavata.helix.task.api.support.AdaptorSupport;
+import org.apache.airavata.messaging.core.MessagingFactory;
import org.apache.airavata.service.profile.UserProfileService;
import org.apache.airavata.service.registry.RegistryService;
import org.apache.airavata.service.security.CredentialStoreService;
@@ -51,10 +52,11 @@ public class ProcessDataManager extends
OutputDataStagingTask {
RegistryService registryService,
UserProfileService userProfileService,
CredentialStoreService credentialStoreService,
+ MessagingFactory messagingFactory,
String processId,
AdaptorSupport adaptorSupport)
throws Exception {
- super(applicationContext, registryService, userProfileService,
credentialStoreService);
+ super(applicationContext, registryService, userProfileService,
credentialStoreService, messagingFactory);
this.adaptorSupport = adaptorSupport;
try {
process = registryService.getProcess(processId);
diff --git
a/modules/thrift-api/src/main/java/org/apache/airavata/thriftapi/handler/RegistryServiceHandler.java
b/modules/thrift-api/src/main/java/org/apache/airavata/thriftapi/handler/RegistryServiceHandler.java
index 737da5cb02..1c6e5608fc 100644
---
a/modules/thrift-api/src/main/java/org/apache/airavata/thriftapi/handler/RegistryServiceHandler.java
+++
b/modules/thrift-api/src/main/java/org/apache/airavata/thriftapi/handler/RegistryServiceHandler.java
@@ -54,7 +54,6 @@ import
org.apache.airavata.thriftapi.mapper.GroupComputeResourcePreferenceMapper
import org.apache.airavata.thriftapi.mapper.GroupResourceProfileMapper;
import org.apache.airavata.thriftapi.mapper.InputDataObjectTypeMapper;
import org.apache.airavata.thriftapi.mapper.JobModelMapper;
-import org.apache.airavata.thriftapi.mapper.JobStatusMapper;
import org.apache.airavata.thriftapi.mapper.LOCALDataMovementMapper;
import org.apache.airavata.thriftapi.mapper.LOCALSubmissionMapper;
import org.apache.airavata.thriftapi.mapper.NotificationMapper;
diff --git
a/modules/thrift-api/src/main/java/org/apache/airavata/thriftapi/server/RegistryServiceServer.java
b/modules/thrift-api/src/main/java/org/apache/airavata/thriftapi/server/RegistryServiceServer.java
index 75bb1e1d14..9a65364ffb 100644
---
a/modules/thrift-api/src/main/java/org/apache/airavata/thriftapi/server/RegistryServiceServer.java
+++
b/modules/thrift-api/src/main/java/org/apache/airavata/thriftapi/server/RegistryServiceServer.java
@@ -140,8 +140,7 @@ public class RegistryServiceServer implements IServer {
try {
// db-event handlers
logger.info("Registring registry service with publishers for
db-events.");
-
RegistryServiceDBEventMessagingFactory.registerRegistryServiceWithPublishers(
- RegistryServiceConstants.DB_EVENT_SUBSCRIBERS);
+
messagingFactory.registerRegistryServiceWithPublishers(RegistryServiceConstants.DB_EVENT_SUBSCRIBERS);
logger.info("Starting registry service db-event-handler
subscriber.");
messagingFactory.getDBEventSubscriber();
diff --git
a/modules/thrift-api/src/main/java/org/apache/airavata/thriftapi/server/SharingRegistryServer.java
b/modules/thrift-api/src/main/java/org/apache/airavata/thriftapi/server/SharingRegistryServer.java
index 201cfd81ce..dabd344b8c 100644
---
a/modules/thrift-api/src/main/java/org/apache/airavata/thriftapi/server/SharingRegistryServer.java
+++
b/modules/thrift-api/src/main/java/org/apache/airavata/thriftapi/server/SharingRegistryServer.java
@@ -125,7 +125,7 @@ public class SharingRegistryServer implements IServer {
try {
logger.info("Register sharing service with DB
Event publishers");
-
SharingServiceDBEventMessagingFactory.registerSharingServiceWithPublishers(
+
messagingFactory.registerSharingServiceWithPublishers(
SharingRegistryConstants.PUBLISHERS);
logger.info("Start sharing service DB Event
subscriber");