This is an automated email from the ASF dual-hosted git repository.
dimuthuupe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airavata-data-lake.git
The following commit(s) were added to refs/heads/master by this push:
new ef6a5b0 Rewriting data orchestrator logic to support directory
scanning through MFT
ef6a5b0 is described below
commit ef6a5b0f0d748a459b59c331f33c032632fa0fbb
Author: Dimuthu Wannipurage <[email protected]>
AuthorDate: Mon Aug 23 07:37:09 2021 -0400
Rewriting data orchestrator logic to support directory scanning through MFT
---
ansible/inventories/test/group_vars/all/vars.yml | 2 +-
.../templates/data-orchestrator/config.yml.j2 | 2 +
.../clients/core/AbstractListener.java | 15 +-
.../clients/core/EventPublisher.java | 3 +-
.../file/client/model/Configuration.java | 9 +
.../file/client/publisher/FileEventPublisher.java | 5 +-
.../file/client/watcher/FileWatcher.java | 77 +++---
.../messaging/MessagingEvents.java | 9 -
.../messaging/model/NotificationEvent.java | 128 ++++------
.../model/NotificationEventDeserializer.java | 21 +-
.../model/NotificationEventSerializer.java | 16 +-
.../messaging/publisher/MessageProducer.java | 2 +-
.../persistance/entity/DataOrchestratorEntity.java | 226 ------------------
.../persistance/entity/OwnershipEntity.java | 56 -----
.../persistance/entity/WorkflowEntity.java | 108 ---------
.../persistance/entity/WorkflowTaskEntity.java | 118 ----------
.../entity/parser/DataParsingJobOutputEntity.java | 11 +
.../DataOrchestratorEventRepository.java | 32 ---
.../repository/WorkflowEntityRepository.java | 7 -
.../datalake/orchestrator/Configuration.java | 18 ++
.../orchestrator/connectors/DRMSConnector.java | 73 ++----
.../connectors/WorkflowServiceConnector.java | 6 +-
.../handlers/async/OrchestratorEventHandler.java | 24 +-
.../handlers/async/OrchestratorEventProcessor.java | 260 +++++++++++++++++++++
.../processor/InboundEventProcessor.java | 145 ------------
.../processor/OutboundEventProcessor.java | 197 ----------------
.../src/main/proto/parsing.proto | 1 +
.../wm/datasync/DataParsingWorkflowManager.java | 211 ++++++++---------
.../wm/datasync/DataSyncWorkflowManager.java | 6 +-
.../wm/datasync/WorkflowEngineAPIHandler.java | 17 +-
.../src/main/proto/service/WorkflowService.proto | 3 +-
31 files changed, 577 insertions(+), 1231 deletions(-)
diff --git a/ansible/inventories/test/group_vars/all/vars.yml
b/ansible/inventories/test/group_vars/all/vars.yml
index 8463bab..134537b 100644
--- a/ansible/inventories/test/group_vars/all/vars.yml
+++ b/ansible/inventories/test/group_vars/all/vars.yml
@@ -42,7 +42,7 @@ custos_repo: "https://github.com/apache/airavata-custos.git"
custos_git_branch: develop
mft_default_agent_id: agent0
-mft_default_agent_host: 10.1.0.33
+mft_default_agent_host: 10.1.0.42
mft_default_agent_advertised_url:
https://beta.iubemcenter.scigap.org:8443/downloads
mft_default_agent_port: 3333
diff --git a/ansible/roles/data_lake/templates/data-orchestrator/config.yml.j2
b/ansible/roles/data_lake/templates/data-orchestrator/config.yml.j2
index 40f6fdb..914245a 100644
--- a/ansible/roles/data_lake/templates/data-orchestrator/config.yml.j2
+++ b/ansible/roles/data_lake/templates/data-orchestrator/config.yml.j2
@@ -10,6 +10,8 @@ outboundEventProcessor:
workflowPort: {{ workflow_manager_grpc_port }}
drmsHost: "{{ datalake_drms_host }}"
drmsPort: {{ datalake_drms_grpc_port }}
+ mftHost: "{{ mft_api_service_host }}"
+ mftPort: {{ mft_api_service_grpc_port }}
consumer:
brokerURL: "{{ datalake_data_orch_broker_url }}"
consumerGroup: "{{ datalake_data_orch_broker_consumer_group }}"
diff --git
a/data-orchestrator/data-orchestrator-clients/data-orchestrator-clients-core/src/main/java/org/apache/airavata/dataorchestrator/clients/core/AbstractListener.java
b/data-orchestrator/data-orchestrator-clients/data-orchestrator-clients-core/src/main/java/org/apache/airavata/dataorchestrator/clients/core/AbstractListener.java
index ba9932b..5395376 100644
---
a/data-orchestrator/data-orchestrator-clients/data-orchestrator-clients-core/src/main/java/org/apache/airavata/dataorchestrator/clients/core/AbstractListener.java
+++
b/data-orchestrator/data-orchestrator-clients/data-orchestrator-clients-core/src/main/java/org/apache/airavata/dataorchestrator/clients/core/AbstractListener.java
@@ -1,6 +1,5 @@
package org.apache.airavata.dataorchestrator.clients.core;
-import org.apache.airavata.dataorchestrator.messaging.MessagingEvents;
import org.apache.airavata.dataorchestrator.messaging.model.NotificationEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -22,28 +21,28 @@ public abstract class AbstractListener implements
EventListener {
public void onRegistered(NotificationEvent event) throws Exception {
LOGGER.info(" Registration event received for path " +
event.getResourcePath());
- eventPublisher.publish(event, MessagingEvents.REGISTER);
+ eventPublisher.publish(event, NotificationEvent.Type.REGISTER);
}
public void onCreated(NotificationEvent event) throws Exception {
LOGGER.info(event.getResourceType() + " " +
- event.getResourcePath() + ":" + event.getResourceName() + "
Created");
- eventPublisher.publish(event, MessagingEvents.CREATE);
+ event.getResourcePath() + ":" + event.getResourcePath() + "
Created");
+ eventPublisher.publish(event, NotificationEvent.Type.CREATE);
}
public void onModified(NotificationEvent event) throws Exception {
LOGGER.info(event.getResourceType() + " " +
- event.getResourcePath() + ":" + event.getResourceName() + "
Created");
- eventPublisher.publish(event, MessagingEvents.MODIFY);
+ event.getResourcePath() + ":" + event.getResourcePath() + "
Created");
+ eventPublisher.publish(event, NotificationEvent.Type.MODIFY);
}
public void onDeleted(NotificationEvent event) throws Exception {
LOGGER.info(event.getResourceType() + " " +
- event.getResourcePath() + ":" + event.getResourceName() + "
Created");
- eventPublisher.publish(event, MessagingEvents.DELETE);
+ event.getResourcePath() + ":" + event.getBasePath() + "
Created");
+ eventPublisher.publish(event, NotificationEvent.Type.DELETE);
}
diff --git
a/data-orchestrator/data-orchestrator-clients/data-orchestrator-clients-core/src/main/java/org/apache/airavata/dataorchestrator/clients/core/EventPublisher.java
b/data-orchestrator/data-orchestrator-clients/data-orchestrator-clients-core/src/main/java/org/apache/airavata/dataorchestrator/clients/core/EventPublisher.java
index 53b40c9..8b52c79 100644
---
a/data-orchestrator/data-orchestrator-clients/data-orchestrator-clients-core/src/main/java/org/apache/airavata/dataorchestrator/clients/core/EventPublisher.java
+++
b/data-orchestrator/data-orchestrator-clients/data-orchestrator-clients-core/src/main/java/org/apache/airavata/dataorchestrator/clients/core/EventPublisher.java
@@ -1,6 +1,5 @@
package org.apache.airavata.dataorchestrator.clients.core;
-import org.apache.airavata.dataorchestrator.messaging.MessagingEvents;
import org.apache.airavata.dataorchestrator.messaging.model.NotificationEvent;
import java.util.concurrent.ExecutionException;
@@ -11,7 +10,7 @@ import java.util.concurrent.ExecutionException;
public interface EventPublisher {
- public void publish(NotificationEvent notificationEvent, MessagingEvents
event) throws Exception;
+ public void publish(NotificationEvent notificationEvent,
NotificationEvent.Type eventType) throws Exception;
}
diff --git
a/data-orchestrator/data-orchestrator-clients/data-orchestrator-file-event-listener/src/main/java/org/apache/airavata/dataorchestrator/file/client/model/Configuration.java
b/data-orchestrator/data-orchestrator-clients/data-orchestrator-file-event-listener/src/main/java/org/apache/airavata/dataorchestrator/file/client/model/Configuration.java
index 630db9c..d4a42df 100644
---
a/data-orchestrator/data-orchestrator-clients/data-orchestrator-file-event-listener/src/main/java/org/apache/airavata/dataorchestrator/file/client/model/Configuration.java
+++
b/data-orchestrator/data-orchestrator-clients/data-orchestrator-file-event-listener/src/main/java/org/apache/airavata/dataorchestrator/file/client/model/Configuration.java
@@ -3,6 +3,7 @@ package org.apache.airavata.dataorchestrator.file.client.model;
public class Configuration {
private String listeningPath;
private String hostName;
+ private int depth = 2;
private Producer producer;
@@ -43,6 +44,14 @@ public class Configuration {
this.custos = custos;
}
+ public int getDepth() {
+ return depth;
+ }
+
+ public void setDepth(int depth) {
+ this.depth = depth;
+ }
+
public static class Producer {
private String brokerURL;
private String publisherId;
diff --git
a/data-orchestrator/data-orchestrator-clients/data-orchestrator-file-event-listener/src/main/java/org/apache/airavata/dataorchestrator/file/client/publisher/FileEventPublisher.java
b/data-orchestrator/data-orchestrator-clients/data-orchestrator-file-event-listener/src/main/java/org/apache/airavata/dataorchestrator/file/client/publisher/FileEventPublisher.java
index 82b8199..a228d1c 100644
---
a/data-orchestrator/data-orchestrator-clients/data-orchestrator-file-event-listener/src/main/java/org/apache/airavata/dataorchestrator/file/client/publisher/FileEventPublisher.java
+++
b/data-orchestrator/data-orchestrator-clients/data-orchestrator-file-event-listener/src/main/java/org/apache/airavata/dataorchestrator/file/client/publisher/FileEventPublisher.java
@@ -2,7 +2,6 @@ package
org.apache.airavata.dataorchestrator.file.client.publisher;
import org.apache.airavata.dataorchestrator.clients.core.EventPublisher;
import org.apache.airavata.dataorchestrator.file.client.model.Configuration;
-import org.apache.airavata.dataorchestrator.messaging.MessagingEvents;
import org.apache.airavata.dataorchestrator.messaging.model.NotificationEvent;
import
org.apache.airavata.dataorchestrator.messaging.publisher.MessageProducer;
import org.apache.kafka.clients.producer.Callback;
@@ -25,8 +24,8 @@ public class FileEventPublisher implements EventPublisher {
}
@Override
- public void publish(NotificationEvent notificationEvent, MessagingEvents
event) throws ExecutionException, InterruptedException {
- notificationEvent.getContext().setEvent(event);
+ public void publish(NotificationEvent notificationEvent,
NotificationEvent.Type eventType) throws ExecutionException,
InterruptedException {
+ notificationEvent.setEventType(eventType);
messageProducer.publish(configuration.getProducer().getPublisherTopic(),
notificationEvent, new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception
e) {
diff --git
a/data-orchestrator/data-orchestrator-clients/data-orchestrator-file-event-listener/src/main/java/org/apache/airavata/dataorchestrator/file/client/watcher/FileWatcher.java
b/data-orchestrator/data-orchestrator-clients/data-orchestrator-file-event-listener/src/main/java/org/apache/airavata/dataorchestrator/file/client/watcher/FileWatcher.java
index 9e16018..c1bf220 100644
---
a/data-orchestrator/data-orchestrator-clients/data-orchestrator-file-event-listener/src/main/java/org/apache/airavata/dataorchestrator/file/client/watcher/FileWatcher.java
+++
b/data-orchestrator/data-orchestrator-clients/data-orchestrator-file-event-listener/src/main/java/org/apache/airavata/dataorchestrator/file/client/watcher/FileWatcher.java
@@ -29,15 +29,12 @@ public class FileWatcher implements Runnable {
private Configuration configuration;
-
public FileWatcher(File rootFolder, Configuration configuration) throws
IOException {
this.rootFolder = rootFolder;
this.configuration = configuration;
}
-
@Override
-
public void run() {
LOGGER.info("Watcher service starting at " +
rootFolder.getAbsolutePath());
@@ -51,12 +48,9 @@ public class FileWatcher implements Runnable {
} catch (Exception e) {
LOGGER.error("Error occurred while watching folder " +
rootFolder.getAbsolutePath(), e);
Thread.currentThread().interrupt();
-
}
-
}
-
protected void pollEvents(WatchService watchService) throws Exception {
WatchKey key = watchService.take();
@@ -68,10 +62,10 @@ public class FileWatcher implements Runnable {
if (!key.reset()) {
keyPathMap.remove(key);
}
+
if (keyPathMap.isEmpty()) {
return;
}
-
}
@@ -82,12 +76,14 @@ public class FileWatcher implements Runnable {
path = parentPath.resolve(path);
File file = path.toFile();
- FileEvent event = getFileEvent(file);
+ Optional<FileEvent> event = getFileEvent(file);
if (kind == ENTRY_CREATE) {
- for (AbstractListener listener : listeners) {
- listener.onCreated(event);
+ if (event.isPresent()) {
+ for (AbstractListener listener : listeners) {
+ listener.onCreated(event.get());
+ }
}
if (file.isDirectory()) {
@@ -96,23 +92,22 @@ public class FileWatcher implements Runnable {
} else if (kind == ENTRY_MODIFY) {
- for (AbstractListener listener : listeners) {
-
- listener.onModified(event);
-
+ if (event.isPresent()) {
+ for (AbstractListener listener : listeners) {
+ listener.onModified(event.get());
+ }
}
} else if (kind == ENTRY_DELETE) {
- for (AbstractListener listener : listeners) {
- listener.onDeleted(event);
+ if (event.isPresent()) {
+ for (AbstractListener listener : listeners) {
+ listener.onDeleted(event.get());
+ }
}
-
}
-
}
-
public FileWatcher addListener(AbstractListener listener) {
listeners.add(listener);
@@ -152,24 +147,44 @@ public class FileWatcher implements Runnable {
*/
- protected FileEvent getFileEvent(File file) {
+ protected Optional<FileEvent> getFileEvent(File file) {
FileEvent event = new FileEvent();
- if (file.isDirectory()) {
+
+
+ String absolutePath = file.getAbsolutePath();
+ if (configuration.getDepth() > 0) {
+ String relativePath =
absolutePath.substring(configuration.getListeningPath().length());
+ if (relativePath.startsWith("/")) {
+ relativePath = relativePath.substring(1);
+ }
+ String[] relativeParts = relativePath.split("/");
+ if (relativeParts.length >= configuration.getDepth()) {
+ String beginPath = configuration.getListeningPath();
+ beginPath = beginPath.endsWith("/") ? beginPath.substring(0,
beginPath.length()-1) : beginPath;
+ for (int step = 0; step < configuration.getDepth(); step++) {
+ beginPath = beginPath + "/" + relativeParts[step];
+ }
+ absolutePath = beginPath;
+ } else {
+ LOGGER.warn("Depth of path {} is not greater or equal to
required depth {}", absolutePath, configuration.getDepth());
+ return Optional.empty();
+ }
+ }
+
+ if (new File(absolutePath).isDirectory()) {
event.setResourceType(Constants.FOLDER);
} else {
event.setResourceType(Constants.FILE);
}
- event.setResourceName(file.getName());
- event.setResourcePath(file.getAbsolutePath());
- NotificationEvent.Context context = new NotificationEvent.Context();
- context.setOccuredTime(System.currentTimeMillis());
-
context.setAuthToken(Base64.getEncoder().encodeToString((configuration.getCustos().getServiceAccountId()
+
+ event.setResourcePath(absolutePath);
+ event.setOccuredTime(System.currentTimeMillis());
+
event.setAuthToken(Base64.getEncoder().encodeToString((configuration.getCustos().getServiceAccountId()
+ ":" +
configuration.getCustos().getServiceAccountSecret()).getBytes(StandardCharsets.UTF_8)));
- context.setBasePath(configuration.getListeningPath());
- context.setTenantId(configuration.getCustos().getTenantId());
- context.setHostName(configuration.getHostName());
- event.setContext(context);
- return event;
+ event.setBasePath(configuration.getListeningPath());
+ event.setTenantId(configuration.getCustos().getTenantId());
+ event.setHostName(configuration.getHostName());
+ return Optional.of(event);
}
private static void registerDir(Path path, WatchService watchService)
throws
diff --git
a/data-orchestrator/data-orchestrator-messaging/src/main/java/org/apache/airavata/dataorchestrator/messaging/MessagingEvents.java
b/data-orchestrator/data-orchestrator-messaging/src/main/java/org/apache/airavata/dataorchestrator/messaging/MessagingEvents.java
deleted file mode 100644
index 3771b72..0000000
---
a/data-orchestrator/data-orchestrator-messaging/src/main/java/org/apache/airavata/dataorchestrator/messaging/MessagingEvents.java
+++ /dev/null
@@ -1,9 +0,0 @@
-package org.apache.airavata.dataorchestrator.messaging;
-
-public enum MessagingEvents {
-
- REGISTER,
- CREATE,
- MODIFY,
- DELETE
-}
diff --git
a/data-orchestrator/data-orchestrator-messaging/src/main/java/org/apache/airavata/dataorchestrator/messaging/model/NotificationEvent.java
b/data-orchestrator/data-orchestrator-messaging/src/main/java/org/apache/airavata/dataorchestrator/messaging/model/NotificationEvent.java
index 1cda9db..baecaeb 100644
---
a/data-orchestrator/data-orchestrator-messaging/src/main/java/org/apache/airavata/dataorchestrator/messaging/model/NotificationEvent.java
+++
b/data-orchestrator/data-orchestrator-messaging/src/main/java/org/apache/airavata/dataorchestrator/messaging/model/NotificationEvent.java
@@ -1,26 +1,25 @@
package org.apache.airavata.dataorchestrator.messaging.model;
-import org.apache.airavata.dataorchestrator.messaging.MessagingEvents;
-
-import java.io.Serializable;
-import java.util.UUID;
-
/**
* Notification event represents triggering messages
*/
public class NotificationEvent {
- private String resourcePath;
- private String resourceName;
- private String resourceType;
- private Context context;
- private String id;
-
-
- public NotificationEvent() {
- this.id = UUID.randomUUID().toString();
+ public enum Type {
+ REGISTER,
+ CREATE,
+ MODIFY,
+ DELETE
}
+ private String resourcePath;
+ private String resourceType;
+ private Long occuredTime;
+ private String authToken;
+ private String tenantId;
+ private String hostName;
+ private String basePath;
+ private Type eventType;
public String getResourcePath() {
return resourcePath;
@@ -30,14 +29,6 @@ public class NotificationEvent {
this.resourcePath = resourcePath;
}
- public String getResourceName() {
- return resourceName;
- }
-
- public void setResourceName(String resourceName) {
- this.resourceName = resourceName;
- }
-
public String getResourceType() {
return resourceType;
}
@@ -46,84 +37,51 @@ public class NotificationEvent {
this.resourceType = resourceType;
}
- public Context getContext() {
- return context;
+ public Long getOccuredTime() {
+ return occuredTime;
}
- public void setContext(Context context) {
- this.context = context;
+ public void setOccuredTime(Long occuredTime) {
+ this.occuredTime = occuredTime;
}
- public String getId() {
- return id;
+ public String getAuthToken() {
+ return authToken;
}
- public void setId(String id) {
- this.id = id;
+ public void setAuthToken(String authToken) {
+ this.authToken = authToken;
}
- public static class Context implements Serializable {
-
- private MessagingEvents event;
- private Long occuredTime;
- private String authToken;
- private String tenantId;
- private String hostName;
- private String basePath;
-
-
- public MessagingEvents getEvent() {
- return event;
- }
-
- public void setEvent(MessagingEvents event) {
- this.event = event;
- }
-
- public Long getOccuredTime() {
- return occuredTime;
- }
-
- public void setOccuredTime(Long occuredTime) {
- this.occuredTime = occuredTime;
- }
-
- public String getAuthToken() {
- return authToken;
- }
-
- public void setAuthToken(String authToken) {
- this.authToken = authToken;
- }
-
- public String getTenantId() {
- return tenantId;
- }
-
- public void setTenantId(String tenantId) {
- this.tenantId = tenantId;
- }
+ public String getTenantId() {
+ return tenantId;
+ }
- public String getBasePath() {
- return basePath;
- }
+ public void setTenantId(String tenantId) {
+ this.tenantId = tenantId;
+ }
- public void setBasePath(String basePath) {
- this.basePath = basePath;
- }
+ public String getHostName() {
+ return hostName;
+ }
- public String getHostName() {
- return hostName;
- }
+ public void setHostName(String hostName) {
+ this.hostName = hostName;
+ }
- public void setHostName(String hostName) {
- this.hostName = hostName;
- }
+ public String getBasePath() {
+ return basePath;
}
- public String getResourceId() {
- return context.hostName+ ":" + resourcePath + ":" + resourceType;
+ public void setBasePath(String basePath) {
+ this.basePath = basePath;
}
+ public Type getEventType() {
+ return eventType;
+ }
+ public void setEventType(Type eventType) {
+ this.eventType = eventType;
+ }
}
diff --git
a/data-orchestrator/data-orchestrator-messaging/src/main/java/org/apache/airavata/dataorchestrator/messaging/model/NotificationEventDeserializer.java
b/data-orchestrator/data-orchestrator-messaging/src/main/java/org/apache/airavata/dataorchestrator/messaging/model/NotificationEventDeserializer.java
index f19d272..d8d834a 100644
---
a/data-orchestrator/data-orchestrator-messaging/src/main/java/org/apache/airavata/dataorchestrator/messaging/model/NotificationEventDeserializer.java
+++
b/data-orchestrator/data-orchestrator-messaging/src/main/java/org/apache/airavata/dataorchestrator/messaging/model/NotificationEventDeserializer.java
@@ -1,6 +1,5 @@
package org.apache.airavata.dataorchestrator.messaging.model;
-import org.apache.airavata.dataorchestrator.messaging.MessagingEvents;
import org.apache.kafka.common.serialization.Deserializer;
import java.util.Map;
@@ -19,18 +18,14 @@ public class NotificationEventDeserializer implements
Deserializer<NotificationE
String deserialized = new String(bytes);
String parts[] = deserialized.split(",");
NotificationEvent event = new NotificationEvent();
- NotificationEvent.Context context = new NotificationEvent.Context();
- event.setId(parts[0]);
- context.setEvent(MessagingEvents.valueOf(parts[1]));
- context.setOccuredTime(Long.valueOf(parts[2]));
- context.setAuthToken(String.valueOf(parts[3]));
- context.setTenantId(String.valueOf(parts[4]));
- context.setHostName(parts[5]);
- context.setBasePath(parts[6]);
- event.setResourcePath(parts[7]);
- event.setResourceType(parts[8]);
- event.setResourceName(parts[9]);
- event.setContext(context);
+ event.setResourcePath(parts[0]);
+ event.setResourceType(parts[1]);
+ event.setOccuredTime(Long.valueOf(parts[2]));
+ event.setTenantId(parts[3]);
+ event.setHostName(parts[4]);
+ event.setBasePath(parts[5]);
+ event.setEventType(NotificationEvent.Type.valueOf(parts[6]));
+ event.setAuthToken(parts[7]);
return event;
}
diff --git
a/data-orchestrator/data-orchestrator-messaging/src/main/java/org/apache/airavata/dataorchestrator/messaging/model/NotificationEventSerializer.java
b/data-orchestrator/data-orchestrator-messaging/src/main/java/org/apache/airavata/dataorchestrator/messaging/model/NotificationEventSerializer.java
index 7a55770..c92d181 100644
---
a/data-orchestrator/data-orchestrator-messaging/src/main/java/org/apache/airavata/dataorchestrator/messaging/model/NotificationEventSerializer.java
+++
b/data-orchestrator/data-orchestrator-messaging/src/main/java/org/apache/airavata/dataorchestrator/messaging/model/NotificationEventSerializer.java
@@ -15,16 +15,14 @@ public class NotificationEventSerializer implements
Serializer<NotificationEvent
@Override
public byte[] serialize(String s, NotificationEvent notificationEvent) {
- String serializedData = notificationEvent.getId() + "," +
- notificationEvent.getContext().getEvent().name() + "," +
- notificationEvent.getContext().getOccuredTime() + "," +
- notificationEvent.getContext().getAuthToken() + "," +
- notificationEvent.getContext().getTenantId() + "," +
- notificationEvent.getContext().getHostName() + "," +
- notificationEvent.getContext().getBasePath() + "," +
- notificationEvent.getResourcePath() + "," +
+ String serializedData = notificationEvent.getResourcePath() + "," +
notificationEvent.getResourceType() + "," +
- notificationEvent.getResourceName();
+ notificationEvent.getOccuredTime() + "," +
+ notificationEvent.getTenantId() + "," +
+ notificationEvent.getHostName() + "," +
+ notificationEvent.getBasePath() + "," +
+ notificationEvent.getEventType() + "," +
+ notificationEvent.getAuthToken();
return serializedData.getBytes();
}
diff --git
a/data-orchestrator/data-orchestrator-messaging/src/main/java/org/apache/airavata/dataorchestrator/messaging/publisher/MessageProducer.java
b/data-orchestrator/data-orchestrator-messaging/src/main/java/org/apache/airavata/dataorchestrator/messaging/publisher/MessageProducer.java
index e4aa253..4abded7 100644
---
a/data-orchestrator/data-orchestrator-messaging/src/main/java/org/apache/airavata/dataorchestrator/messaging/publisher/MessageProducer.java
+++
b/data-orchestrator/data-orchestrator-messaging/src/main/java/org/apache/airavata/dataorchestrator/messaging/publisher/MessageProducer.java
@@ -30,7 +30,7 @@ public class MessageProducer {
public void publish(String topic, NotificationEvent notificationMessage,
Callback callback) throws ExecutionException, InterruptedException {
try {
final ProducerRecord<String, NotificationEvent> record = new
ProducerRecord<>(topic,
- notificationMessage.getId(),
+ notificationMessage.getResourcePath(),
notificationMessage);
producer.send(record, callback).get();
} finally {
diff --git
a/data-orchestrator/data-orchestrator-registry/src/main/java/org/apache/airavata/datalake/orchestrator/registry/persistance/entity/DataOrchestratorEntity.java
b/data-orchestrator/data-orchestrator-registry/src/main/java/org/apache/airavata/datalake/orchestrator/registry/persistance/entity/DataOrchestratorEntity.java
deleted file mode 100644
index d1843fd..0000000
---
a/data-orchestrator/data-orchestrator-registry/src/main/java/org/apache/airavata/datalake/orchestrator/registry/persistance/entity/DataOrchestratorEntity.java
+++ /dev/null
@@ -1,226 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.airavata.datalake.orchestrator.registry.persistance.entity;
-
-import org.springframework.data.annotation.CreatedDate;
-import org.springframework.data.annotation.LastModifiedDate;
-import org.springframework.data.jpa.domain.support.AuditingEntityListener;
-
-import javax.persistence.*;
-import java.util.Date;
-import java.util.Set;
-
-/**
- * DataOrchestrator entity
- */
-@Entity
-@Table(name = "DATAORCHESTRATOR_ENTITY")
-@EntityListeners(AuditingEntityListener.class)
-public class DataOrchestratorEntity {
-
- @Id
- private String id;
-
- @Column(nullable = false)
- private String resourceId;
-
- @Column(nullable = false)
- private String resourcePath;
-
- @Column(nullable = false)
- private String resourceName;
-
- @Column(nullable = false)
- private String resourceType;
-
- @Column(nullable = false)
- private Date occurredTime;
-
- @Column(nullable = false)
- @Temporal(TemporalType.TIMESTAMP)
- @CreatedDate
- private Date createdAt;
-
- @Column(nullable = false)
- @Temporal(TemporalType.TIMESTAMP)
- @LastModifiedDate
- private Date lastModifiedAt;
-
- @Column(nullable = false)
- private String eventStatus;
-
- @Column(nullable = false)
- private String eventType;
-
- @Column(nullable = false)
- private String tenantId;
-
- @Column(nullable = false)
- private String agentId;
-
- @Column(nullable = false)
- private String authToken;
-
- @Column(nullable = false)
- private String hostName;
-
- @Lob
- private String error;
-
- @OneToMany(fetch = FetchType.EAGER, mappedBy = "dataOrchestratorEntity",
orphanRemoval = true, cascade = CascadeType.ALL)
- private Set<WorkflowEntity> workFlowEntities;
-
- @OneToMany(fetch = FetchType.EAGER, mappedBy = "dataOrchestratorEntity",
orphanRemoval = true, cascade = CascadeType.ALL)
- private Set<OwnershipEntity> ownershipEntities;
-
- public String getId() {
- return id;
- }
-
- public void setId(String id) {
- this.id = id;
- }
-
- public String getResourceId() {
- return resourceId;
- }
-
- public void setResourceId(String resourceId) {
- this.resourceId = resourceId;
- }
-
- public String getResourcePath() {
- return resourcePath;
- }
-
- public void setResourcePath(String resourcePath) {
- this.resourcePath = resourcePath;
- }
-
- public String getResourceName() {
- return resourceName;
- }
-
- public void setResourceName(String resourceName) {
- this.resourceName = resourceName;
- }
-
- public String getResourceType() {
- return resourceType;
- }
-
- public void setResourceType(String resourceType) {
- this.resourceType = resourceType;
- }
-
- public Date getOccurredTime() {
- return occurredTime;
- }
-
- public void setOccurredTime(Date occurredTime) {
- this.occurredTime = occurredTime;
- }
-
- public Date getCreatedAt() {
- return createdAt;
- }
-
- public void setCreatedAt(Date createdAt) {
- this.createdAt = createdAt;
- }
-
- public Date getLastModifiedAt() {
- return lastModifiedAt;
- }
-
- public void setLastModifiedAt(Date lastModifiedAt) {
- this.lastModifiedAt = lastModifiedAt;
- }
-
- public String getEventStatus() {
- return eventStatus;
- }
-
- public void setEventStatus(String status) {
- this.eventStatus = status;
- }
-
- public Set<WorkflowEntity> getWorkFlowEntities() {
- return workFlowEntities;
- }
-
- public void setWorkFlowEntities(Set<WorkflowEntity> workFlowEntities) {
- this.workFlowEntities = workFlowEntities;
- }
-
- public String getError() {
- return error;
- }
-
- public void setError(String error) {
- this.error = error;
- }
-
- public String getEventType() {
- return eventType;
- }
-
- public void setEventType(String eventType) {
- this.eventType = eventType;
- }
-
- public String getTenantId() {
- return tenantId;
- }
-
- public void setTenantId(String tenantId) {
- this.tenantId = tenantId;
- }
-
- public String getAgentId() {
- return agentId;
- }
-
- public void setAgentId(String agentId) {
- this.agentId = agentId;
- }
-
- public String getAuthToken() {
- return authToken;
- }
-
- public void setAuthToken(String authToken) {
- this.authToken = authToken;
- }
-
- public String getHostName() {
- return hostName;
- }
-
- public void setHostName(String hostName) {
- this.hostName = hostName;
- }
-
- public Set<OwnershipEntity> getOwnershipEntities() {
- return ownershipEntities;
- }
-
- public void setOwnershipEntities(Set<OwnershipEntity> ownershipEntities) {
- this.ownershipEntities = ownershipEntities;
- }
-}
diff --git
a/data-orchestrator/data-orchestrator-registry/src/main/java/org/apache/airavata/datalake/orchestrator/registry/persistance/entity/OwnershipEntity.java
b/data-orchestrator/data-orchestrator-registry/src/main/java/org/apache/airavata/datalake/orchestrator/registry/persistance/entity/OwnershipEntity.java
deleted file mode 100644
index ec6e4eb..0000000
---
a/data-orchestrator/data-orchestrator-registry/src/main/java/org/apache/airavata/datalake/orchestrator/registry/persistance/entity/OwnershipEntity.java
+++ /dev/null
@@ -1,56 +0,0 @@
-package org.apache.airavata.datalake.orchestrator.registry.persistance.entity;
-
-import org.springframework.data.jpa.domain.support.AuditingEntityListener;
-
-import javax.persistence.*;
-
-@Entity
-@Table(name = "OWNERSHIP_ENTITY")
-@EntityListeners(AuditingEntityListener.class)
-public class OwnershipEntity {
-
- @Id
- private String id;
-
- @Column(name = "USER_ID")
- private String userId;
-
- @Column(name = "PERMISSION_ID")
- private String permissionId;
-
- @ManyToOne
- @JoinColumn(name = "DATA_ORCHESTRATOR_ENTITY_ID")
- private DataOrchestratorEntity dataOrchestratorEntity;
-
- public String getId() {
- return id;
- }
-
- public void setId(String id) {
- this.id = id;
- }
-
- public String getUserId() {
- return userId;
- }
-
- public void setUserId(String userId) {
- this.userId = userId;
- }
-
- public String getPermissionId() {
- return permissionId;
- }
-
- public void setPermissionId(String permissionId) {
- this.permissionId = permissionId;
- }
-
- public DataOrchestratorEntity getDataOrchestratorEntity() {
- return dataOrchestratorEntity;
- }
-
- public void setDataOrchestratorEntity(DataOrchestratorEntity
dataOrchestratorEntity) {
- this.dataOrchestratorEntity = dataOrchestratorEntity;
- }
-}
diff --git
a/data-orchestrator/data-orchestrator-registry/src/main/java/org/apache/airavata/datalake/orchestrator/registry/persistance/entity/WorkflowEntity.java
b/data-orchestrator/data-orchestrator-registry/src/main/java/org/apache/airavata/datalake/orchestrator/registry/persistance/entity/WorkflowEntity.java
deleted file mode 100644
index d585020..0000000
---
a/data-orchestrator/data-orchestrator-registry/src/main/java/org/apache/airavata/datalake/orchestrator/registry/persistance/entity/WorkflowEntity.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.airavata.datalake.orchestrator.registry.persistance.entity;
-
-import org.springframework.data.annotation.CreatedDate;
-import org.springframework.data.annotation.LastModifiedDate;
-import org.springframework.data.jpa.domain.support.AuditingEntityListener;
-
-import javax.persistence.*;
-import java.util.Date;
-import java.util.Set;
-
-/**
- * An workflow class that represents the workflow entity
- */
-@Entity
-@Table(name = "WORKFLOW_ENTITY")
-@EntityListeners(AuditingEntityListener.class)
-public class WorkflowEntity {
-
- @Id
- private String id;
-
- @Column(nullable = false)
- @Temporal(TemporalType.TIMESTAMP)
- @CreatedDate
- private Date createdAt;
-
-
- @Column(nullable = false)
- @Temporal(TemporalType.TIMESTAMP)
- @LastModifiedDate
- private Date lastModifiedAt;
-
- @Column(nullable = false)
- private String status;
-
-
- @ManyToOne
- @JoinColumn(name = "dataorchestrator_entity_id")
- private DataOrchestratorEntity dataOrchestratorEntity;
-
- @OneToMany(fetch = FetchType.EAGER, mappedBy = "workflowEntity",
orphanRemoval = true, cascade = CascadeType.ALL)
- private Set<WorkflowTaskEntity> workflowTaskEntities;
-
- public String getId() {
- return id;
- }
-
- public void setId(String id) {
- this.id = id;
- }
-
- public Date getCreatedAt() {
- return createdAt;
- }
-
- public void setCreatedAt(Date createdAt) {
- this.createdAt = createdAt;
- }
-
- public Date getLastModifiedAt() {
- return lastModifiedAt;
- }
-
- public void setLastModifiedAt(Date lastModifiedAt) {
- this.lastModifiedAt = lastModifiedAt;
- }
-
- public String getStatus() {
- return status;
- }
-
- public void setStatus(String status) {
- this.status = status;
- }
-
- public DataOrchestratorEntity getDataOrchestratorEntity() {
- return dataOrchestratorEntity;
- }
-
- public void setDataOrchestratorEntity(DataOrchestratorEntity
dataOrchestratorEntity) {
- this.dataOrchestratorEntity = dataOrchestratorEntity;
- }
-
- public Set<WorkflowTaskEntity> getWorkflowTaskEntities() {
- return workflowTaskEntities;
- }
-
- public void setWorkflowTaskEntities(Set<WorkflowTaskEntity>
workflowTaskEntities) {
- this.workflowTaskEntities = workflowTaskEntities;
- }
-}
diff --git
a/data-orchestrator/data-orchestrator-registry/src/main/java/org/apache/airavata/datalake/orchestrator/registry/persistance/entity/WorkflowTaskEntity.java
b/data-orchestrator/data-orchestrator-registry/src/main/java/org/apache/airavata/datalake/orchestrator/registry/persistance/entity/WorkflowTaskEntity.java
deleted file mode 100644
index b20ddd8..0000000
---
a/data-orchestrator/data-orchestrator-registry/src/main/java/org/apache/airavata/datalake/orchestrator/registry/persistance/entity/WorkflowTaskEntity.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.airavata.datalake.orchestrator.registry.persistance.entity;
-
-import
org.apache.airavata.datalake.orchestrator.registry.persistance.entity.WorkflowEntity;
-import org.springframework.data.annotation.CreatedDate;
-import org.springframework.data.annotation.LastModifiedDate;
-import org.springframework.data.jpa.domain.support.AuditingEntityListener;
-
-import javax.persistence.*;
-import java.util.Date;
-
-/**
- * An entity class represents the task entity
- */
-@Entity
-@Table(name = "WORKFLOW_TASK_ENTITY")
-@EntityListeners(AuditingEntityListener.class)
-public class WorkflowTaskEntity {
-
- @Id
- private String id;
-
- @ManyToOne
- @JoinColumn(name = "workflow_entity_id")
- private WorkflowEntity workflowEntity;
-
-
- @Column(nullable = false)
- @Temporal(TemporalType.TIMESTAMP)
- @CreatedDate
- private Date createdAt;
-
-
- @Column(nullable = false)
- @Temporal(TemporalType.TIMESTAMP)
- @LastModifiedDate
- private Date lastModifiedAt;
-
- @Column(nullable = false)
- private String status;
-
- @Lob
- private String error;
-
- private int errorCode;
-
- public String getId() {
- return id;
- }
-
- public void setId(String id) {
- this.id = id;
- }
-
- public WorkflowEntity getWorkflowEntity() {
- return workflowEntity;
- }
-
- public void setWorkflowEntity(WorkflowEntity workFlowEntity) {
- this.workflowEntity = workFlowEntity;
- }
-
- public Date getCreatedAt() {
- return createdAt;
- }
-
- public void setCreatedAt(Date createdAt) {
- this.createdAt = createdAt;
- }
-
- public Date getLastModifiedAt() {
- return lastModifiedAt;
- }
-
- public void setLastModifiedAt(Date lastModifiedAt) {
- this.lastModifiedAt = lastModifiedAt;
- }
-
- public String getStatus() {
- return status;
- }
-
- public void setStatus(String status) {
- this.status = status;
- }
-
- public String getError() {
- return error;
- }
-
- public void setError(String error) {
- this.error = error;
- }
-
- public int getErrorCode() {
- return errorCode;
- }
-
- public void setErrorCode(int errorCode) {
- this.errorCode = errorCode;
- }
-}
diff --git
a/data-orchestrator/data-orchestrator-registry/src/main/java/org/apache/airavata/datalake/orchestrator/registry/persistance/entity/parser/DataParsingJobOutputEntity.java
b/data-orchestrator/data-orchestrator-registry/src/main/java/org/apache/airavata/datalake/orchestrator/registry/persistance/entity/parser/DataParsingJobOutputEntity.java
index 27aa4fd..b37c123 100644
---
a/data-orchestrator/data-orchestrator-registry/src/main/java/org/apache/airavata/datalake/orchestrator/registry/persistance/entity/parser/DataParsingJobOutputEntity.java
+++
b/data-orchestrator/data-orchestrator-registry/src/main/java/org/apache/airavata/datalake/orchestrator/registry/persistance/entity/parser/DataParsingJobOutputEntity.java
@@ -36,6 +36,9 @@ public class DataParsingJobOutputEntity {
@Column(name = "OUTPUT_TYPE")
private DataParsingJobOutputType outputType;
+ @Column(name = "OUTPUT_DIR_RESOURCE_ID")
+ private String outputDirectoryResourceId;
+
@ManyToOne(fetch = FetchType.LAZY)
@JoinColumn(name = "DATA_PARSER_OUTPUT_INTERFACE_ID", insertable=false,
updatable=false)
private DataParserOutputInterfaceEntity dataParserOutputInterface;
@@ -91,4 +94,12 @@ public class DataParsingJobOutputEntity {
public void setDataParsingJobEntity(DataParsingJobEntity
dataParsingJobEntity) {
this.dataParsingJobEntity = dataParsingJobEntity;
}
+
+ public String getOutputDirectoryResourceId() {
+ return outputDirectoryResourceId;
+ }
+
+ public void setOutputDirectoryResourceId(String outputDirectoryResourceId)
{
+ this.outputDirectoryResourceId = outputDirectoryResourceId;
+ }
}
diff --git
a/data-orchestrator/data-orchestrator-registry/src/main/java/org/apache/airavata/datalake/orchestrator/registry/persistance/repository/DataOrchestratorEventRepository.java
b/data-orchestrator/data-orchestrator-registry/src/main/java/org/apache/airavata/datalake/orchestrator/registry/persistance/repository/DataOrchestratorEventRepository.java
deleted file mode 100644
index 43f2eb9..0000000
---
a/data-orchestrator/data-orchestrator-registry/src/main/java/org/apache/airavata/datalake/orchestrator/registry/persistance/repository/DataOrchestratorEventRepository.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package
org.apache.airavata.datalake.orchestrator.registry.persistance.repository;
-
-import
org.apache.airavata.datalake.orchestrator.registry.persistance.entity.DataOrchestratorEntity;
-import org.springframework.data.jpa.repository.JpaRepository;
-import org.springframework.data.jpa.repository.Query;
-
-import java.util.List;
-
-public interface DataOrchestratorEventRepository extends
JpaRepository<DataOrchestratorEntity, String> {
-
- @Query(value = "select * from DATAORCHESTRATOR_ENTITY s where
s.eventStatus = ?1 ORDER BY occurredTime DESC", nativeQuery = true)
- public List<DataOrchestratorEntity> findAllEntitiesWithGivenStatus(String
eventStatus);
-
-
-}
diff --git
a/data-orchestrator/data-orchestrator-registry/src/main/java/org/apache/airavata/datalake/orchestrator/registry/persistance/repository/WorkflowEntityRepository.java
b/data-orchestrator/data-orchestrator-registry/src/main/java/org/apache/airavata/datalake/orchestrator/registry/persistance/repository/WorkflowEntityRepository.java
deleted file mode 100644
index 4014186..0000000
---
a/data-orchestrator/data-orchestrator-registry/src/main/java/org/apache/airavata/datalake/orchestrator/registry/persistance/repository/WorkflowEntityRepository.java
+++ /dev/null
@@ -1,7 +0,0 @@
-package
org.apache.airavata.datalake.orchestrator.registry.persistance.repository;
-
-import
org.apache.airavata.datalake.orchestrator.registry.persistance.entity.WorkflowEntity;
-import org.springframework.data.jpa.repository.JpaRepository;
-
-public interface WorkflowEntityRepository extends
JpaRepository<WorkflowEntity, String> {
-}
diff --git
a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/Configuration.java
b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/Configuration.java
index d728ef1..52494e5 100644
---
a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/Configuration.java
+++
b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/Configuration.java
@@ -153,6 +153,8 @@ public class Configuration {
private String drmsHost;
private int drmsPort;
private int pollingInterval;
+ private String mftHost;
+ private int mftPort;
public OutboundEventProcessorConfig() {
@@ -198,6 +200,22 @@ public class Configuration {
public void setPollingInterval(int pollingInterval) {
this.pollingInterval = pollingInterval;
}
+
+ public String getMftHost() {
+ return mftHost;
+ }
+
+ public void setMftHost(String mftHost) {
+ this.mftHost = mftHost;
+ }
+
+ public int getMftPort() {
+ return mftPort;
+ }
+
+ public void setMftPort(int mftPort) {
+ this.mftPort = mftPort;
+ }
}
diff --git
a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/connectors/DRMSConnector.java
b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/connectors/DRMSConnector.java
index e24eb25..96fac88 100644
---
a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/connectors/DRMSConnector.java
+++
b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/connectors/DRMSConnector.java
@@ -11,10 +11,6 @@ import
org.apache.airavata.datalake.drms.sharing.ShareEntityWithUserRequest;
import org.apache.airavata.datalake.drms.storage.*;
import org.apache.airavata.datalake.orchestrator.Configuration;
import
org.apache.airavata.datalake.orchestrator.core.connector.AbstractConnector;
-import
org.apache.airavata.datalake.orchestrator.registry.persistance.entity.DataOrchestratorEntity;
-import
org.apache.airavata.datalake.orchestrator.registry.persistance.entity.OwnershipEntity;
-import
org.apache.airavata.datalake.orchestrator.registry.persistance.repository.DataOrchestratorEventRepository;
-import
org.apache.airavata.datalake.orchestrator.registry.persistance.entity.EventStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -60,50 +56,36 @@ public class DRMSConnector implements
AbstractConnector<Configuration> {
return !this.drmsChannel.isShutdown();
}
- public void shareWithUser(DataOrchestratorEntity entity) throws Exception {
-
- Optional<OwnershipEntity> adminOp =
entity.getOwnershipEntities().stream().filter(o ->
o.getPermissionId().equals("ADMIN")).findFirst();
- if (adminOp.isEmpty()) {
- throw new Exception("No admin user found");
- }
+ public void shareWithUser(String authToken, String tenantId, String admin,
String user, String resourceId, String permission) throws Exception {
DRMSServiceAuthToken serviceAuthToken =
DRMSServiceAuthToken.newBuilder()
- .setAccessToken(entity.getAuthToken())
+ .setAccessToken(authToken)
.setAuthCredentialType(AuthCredentialType.AGENT_ACCOUNT_CREDENTIAL)
.setAuthenticatedUser(AuthenticatedUser.newBuilder()
- .setUsername(adminOp.get().getUserId())
- .setTenantId(entity.getTenantId())
+ .setUsername(admin)
+ .setTenantId(tenantId)
.build())
.build();
- for (OwnershipEntity ownershipEntity : entity.getOwnershipEntities()) {
- if (ownershipEntity.getPermissionId().equals("ADMIN")) {
- continue;
- }
- ShareEntityWithUserRequest.Builder shareBuilder =
ShareEntityWithUserRequest.newBuilder()
- .setAuthToken(serviceAuthToken)
- .setEntityId(entity.getResourceId())
- .setSharedUserId(ownershipEntity.getUserId())
- .setPermissionId(ownershipEntity.getPermissionId());
-
-
this.sharingServiceBlockingStub.shareEntityWithUser(shareBuilder.build());
- }
+ ShareEntityWithUserRequest.Builder shareBuilder =
ShareEntityWithUserRequest.newBuilder()
+ .setAuthToken(serviceAuthToken)
+ .setEntityId(resourceId)
+ .setSharedUserId(user)
+ .setPermissionId(permission);
- }
+
this.sharingServiceBlockingStub.shareEntityWithUser(shareBuilder.build());
- public Optional<TransferMapping>
getActiveTransferMapping(DataOrchestratorEntity entity, String hostname) throws
Exception {
+ }
- Optional<OwnershipEntity> adminOp =
entity.getOwnershipEntities().stream().filter(o ->
o.getPermissionId().equals("ADMIN")).findFirst();
- if (adminOp.isEmpty()) {
- throw new Exception("No admin user found");
- }
+ public Optional<TransferMapping> getActiveTransferMapping(String
authToken, String tenantId,
+ String user,
String hostName) throws Exception {
DRMSServiceAuthToken serviceAuthToken =
DRMSServiceAuthToken.newBuilder()
- .setAccessToken(entity.getAuthToken())
+ .setAccessToken(authToken)
.setAuthCredentialType(AuthCredentialType.AGENT_ACCOUNT_CREDENTIAL)
.setAuthenticatedUser(AuthenticatedUser.newBuilder()
- .setUsername(adminOp.get().getUserId())
- .setTenantId(entity.getTenantId())
+ .setUsername(user)
+ .setTenantId(tenantId)
.build())
.build();
FindTransferMappingsRequest request =
FindTransferMappingsRequest.newBuilder()
@@ -116,7 +98,7 @@ public class DRMSConnector implements
AbstractConnector<Configuration> {
transferMappingList.forEach(transferMapping -> {
if (transferMapping.getSourceStorage().getStorageCase()
.equals(AnyStorage.StorageCase.SSH_STORAGE)) {
- if
(transferMapping.getSourceStorage().getSshStorage().getHostName().equals(hostname))
{
+ if
(transferMapping.getSourceStorage().getSshStorage().getHostName().equals(hostName))
{
transferMappingOp.set(transferMapping);
}
}
@@ -126,24 +108,22 @@ public class DRMSConnector implements
AbstractConnector<Configuration> {
}
- public Optional<GenericResource>
createResource(DataOrchestratorEventRepository repository,
DataOrchestratorEntity entity,
+ public Optional<GenericResource> createResource(String authToken,
+ String tenantId,
String resourceId,
String resourceName,
String resourcePath,
String parentId,
- String type, String
parentType) throws Exception {
+ String type, String
parentType,
+ String user) throws
Exception {
- Optional<OwnershipEntity> adminOp =
entity.getOwnershipEntities().stream().filter(o ->
o.getPermissionId().equals("ADMIN")).findFirst();
- if (adminOp.isEmpty()) {
- throw new Exception("No admin user found");
- }
DRMSServiceAuthToken serviceAuthToken =
DRMSServiceAuthToken.newBuilder()
- .setAccessToken(entity.getAuthToken())
+ .setAccessToken(authToken)
.setAuthCredentialType(AuthCredentialType.AGENT_ACCOUNT_CREDENTIAL)
.setAuthenticatedUser(AuthenticatedUser.newBuilder()
- .setUsername(adminOp.get().getUserId())
- .setTenantId(entity.getTenantId())
+ .setUsername(user)
+ .setTenantId(tenantId)
.build())
.build();
@@ -165,10 +145,7 @@ public class DRMSConnector implements
AbstractConnector<Configuration> {
ResourceCreateResponse resourceCreateResponse =
resourceServiceBlockingStub.createResource(resourceCreateRequest);
return Optional.ofNullable(resourceCreateResponse.getResource());
} catch (Exception ex) {
- LOGGER.error("Error occurred while creating resource {} in DRMS",
entity.getResourceId(), ex);
- entity.setEventStatus(EventStatus.ERRORED.name());
- entity.setError("Error occurred while creating resource in DRMS "
+ ex.getMessage());
- repository.save(entity);
+ LOGGER.error("Error occurred while creating resource {} in DRMS",
resourcePath, ex);
return Optional.empty();
}
}
diff --git
a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/connectors/WorkflowServiceConnector.java
b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/connectors/WorkflowServiceConnector.java
index b95ccb3..d571568 100644
---
a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/connectors/WorkflowServiceConnector.java
+++
b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/connectors/WorkflowServiceConnector.java
@@ -10,6 +10,8 @@ import
org.apache.airavata.datalake.orchestrator.workflow.engine.WorkflowService
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.List;
+
/**
* Drms connector to call DRMS services
*/
@@ -43,11 +45,11 @@ public class WorkflowServiceConnector implements
AbstractConnector<Configuration
return false;
}
- public void invokeWorkflow(String authToken, String username, String
tenantId, String sourceResourceId, String sourceCredentialToken,
+ public void invokeWorkflow(String authToken, String username, String
tenantId, List<String> sourceResourceIds, String sourceCredentialToken,
String dstResourceId, String
destinationCredentialToken) {
try {
WorkflowMessage workflowMessage = WorkflowMessage.newBuilder()
- .setSourceResourceId(sourceResourceId)
+ .addAllSourceResourceIds(sourceResourceIds)
.setDestinationResourceId(dstResourceId)
.setUsername(username)
.setTenantId(tenantId)
diff --git
a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/handlers/async/OrchestratorEventHandler.java
b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/handlers/async/OrchestratorEventHandler.java
index ff5fa67..d6d25bc 100644
---
a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/handlers/async/OrchestratorEventHandler.java
+++
b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/handlers/async/OrchestratorEventHandler.java
@@ -18,9 +18,6 @@
package org.apache.airavata.datalake.orchestrator.handlers.async;
import org.apache.airavata.datalake.orchestrator.Configuration;
-import
org.apache.airavata.datalake.orchestrator.processor.InboundEventProcessor;
-import
org.apache.airavata.datalake.orchestrator.processor.OutboundEventProcessor;
-import
org.apache.airavata.datalake.orchestrator.registry.persistance.repository.DataOrchestratorEventRepository;
import org.apache.airavata.dataorchestrator.messaging.consumer.MessageConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -30,7 +27,6 @@ import org.springframework.stereotype.Component;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
/**
* Orchestrator event handler
@@ -45,10 +41,6 @@ public class OrchestratorEventHandler {
private ScheduledExecutorService ouboundExecutorService;
private MessageConsumer messageConsumer;
- @Autowired
- private DataOrchestratorEventRepository dataOrchestratorEventRepository;
-
-
public OrchestratorEventHandler() {
}
@@ -60,20 +52,18 @@ public class OrchestratorEventHandler {
configuration.getConsumer().getConsumerGroup(),
configuration.getConsumer().getMaxPollRecordsConfig(),
configuration.getConsumer().getTopic());
-
}
public void startProcessing() throws Exception {
messageConsumer.consume((notificationEvent -> {
- LOGGER.info("Message received " +
notificationEvent.getResourceName());
- LOGGER.info("Submitting {} to process in thread pool",
notificationEvent.getId());
- this.executorService.submit(new
InboundEventProcessor(configuration, notificationEvent,
dataOrchestratorEventRepository));
-
+ LOGGER.info("Message received for resource path {}",
notificationEvent.getResourcePath());
+ try {
+ this.executorService.submit(new
OrchestratorEventProcessor(configuration, notificationEvent));
+ } catch (Exception e) {
+ LOGGER.error("Failed tu submit data orchestrator event to
process on path {}",
+ notificationEvent.getResourcePath(), e);
+ }
}));
-
- this.ouboundExecutorService
- .scheduleAtFixedRate(new OutboundEventProcessor(configuration,
dataOrchestratorEventRepository),
- 0,
configuration.getOutboundEventProcessor().getPollingInterval(),
TimeUnit.SECONDS);
}
public Configuration getConfiguration() {
diff --git
a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/handlers/async/OrchestratorEventProcessor.java
b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/handlers/async/OrchestratorEventProcessor.java
new file mode 100644
index 0000000..6fc7d45
--- /dev/null
+++
b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/handlers/async/OrchestratorEventProcessor.java
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.airavata.datalake.orchestrator.handlers.async;
+
+import org.apache.airavata.datalake.drms.resource.GenericResource;
+import org.apache.airavata.datalake.drms.storage.AnyStoragePreference;
+import org.apache.airavata.datalake.drms.storage.TransferMapping;
+import org.apache.airavata.datalake.orchestrator.Configuration;
+import org.apache.airavata.datalake.orchestrator.Utils;
+import org.apache.airavata.datalake.orchestrator.connectors.DRMSConnector;
+import
org.apache.airavata.datalake.orchestrator.connectors.WorkflowServiceConnector;
+import org.apache.airavata.dataorchestrator.messaging.model.NotificationEvent;
+import org.apache.airavata.mft.api.client.MFTApiClient;
+import org.apache.airavata.mft.api.service.DirectoryMetadataResponse;
+import org.apache.airavata.mft.api.service.FetchResourceMetadataRequest;
+import org.apache.airavata.mft.api.service.FileMetadataResponse;
+import org.apache.airavata.mft.api.service.MFTApiServiceGrpc;
+import org.apache.airavata.mft.common.AuthToken;
+import org.apache.airavata.mft.common.DelegateAuth;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+
+public class OrchestratorEventProcessor implements Runnable {
+
+ private static final Logger logger =
LoggerFactory.getLogger(OrchestratorEventProcessor.class);
+
+ private NotificationEvent notificationEvent;
+
+ private DRMSConnector drmsConnector;
+ private Configuration configuration;
+ private WorkflowServiceConnector workflowServiceConnector;
+
+ public OrchestratorEventProcessor(Configuration configuration,
NotificationEvent notificationEvent) throws Exception {
+ this.notificationEvent = notificationEvent;
+ this.drmsConnector = new DRMSConnector(configuration);
+ this.workflowServiceConnector = new
WorkflowServiceConnector(configuration);
+ this.configuration = configuration;
+ }
+
+ private List<GenericResource> createResourceRecursively(String storageId,
String basePath, String resourcePath, String resourceType, String user)
+ throws Exception{
+
+ List<GenericResource> resourceList = new ArrayList<>();
+
+ String parentType = "Storage";
+
+ String[] splitted =
resourcePath.substring(basePath.length()).split("/");
+
+ String currentPath = basePath.endsWith("/")? basePath.substring(0,
basePath.length() -1): basePath;
+ String parentId = storageId;
+ for (int i = 0; i < splitted.length - 1; i++) {
+ String resourceName = splitted[i];
+ currentPath = currentPath + "/" + resourceName;
+ String resourceId = Utils.getId(storageId + ":" + currentPath);
+ Optional<GenericResource> optionalGenericResource =
+
this.drmsConnector.createResource(notificationEvent.getAuthToken(),
+ notificationEvent.getTenantId(),
+ resourceId, resourceName, currentPath, parentId,
"COLLECTION", parentType, user);
+ if (optionalGenericResource.isPresent()) {
+ parentId = optionalGenericResource.get().getResourceId();
+ parentType = "COLLECTION";
+ resourceList.add(optionalGenericResource.get());
+ } else {
+ logger.error("Could not create a resource for path {}",
currentPath);
+ throw new Exception("Could not create a resource for path " +
currentPath);
+ }
+ }
+
+ currentPath = currentPath + "/" + splitted[splitted.length -1];
+
+ Optional<GenericResource> optionalGenericResource =
+
this.drmsConnector.createResource(notificationEvent.getAuthToken(),
+ notificationEvent.getTenantId(),
+ Utils.getId(storageId + ":" + currentPath),
+ splitted[splitted.length -1], currentPath,
+ parentId, resourceType, parentType, user);
+
+ if (optionalGenericResource.isPresent()) {
+ resourceList.add(optionalGenericResource.get());
+ } else {
+ logger.error("Could not create a resource for path {}",
currentPath);
+ throw new Exception("Could not create a resource for path " +
currentPath);
+ }
+
+ return resourceList;
+ }
+
+
+ private void shareResources(List<GenericResource> resourceList, String
admin, String user, String permission) throws Exception {
+ for (GenericResource resource : resourceList) {
+ logger.info("Sharing resource {} with path {} with user {}",
+ resource.getResourceId(), resource.getResourcePath(),
user);
+ this.drmsConnector.shareWithUser(notificationEvent.getAuthToken(),
notificationEvent.getTenantId(), admin, user, resource.getResourceId(),
permission);
+ }
+ }
+
+ @Override
+ public void run() {
+ logger.info("Processing resource path {} on storage {}",
notificationEvent.getResourcePath(),
+ notificationEvent.getBasePath());
+
+ try {
+
+ if (!"FOLDER".equals(notificationEvent.getResourceType())) {
+ logger.error("Resource {} should be a Folder type but got {}",
+ notificationEvent.getResourcePath(),
+ notificationEvent.getResourceType());
+ logger.error("Resource should be a Folder type");
+ }
+ String removeBasePath =
notificationEvent.getResourcePath().substring(notificationEvent.getBasePath().length());
+ String[] splitted = removeBasePath.split("/");
+
+ String adminUser = splitted[0];
+ String owner = splitted[1].split("_")[0];
+
+ Map<String, String> ownerRules = new HashMap<>();
+ ownerRules.put(adminUser, "ADMIN");
+ ownerRules.put(splitted[1], "OWNER");
+
+ Optional<TransferMapping> optionalTransferMapping =
drmsConnector.getActiveTransferMapping(
+ notificationEvent.getAuthToken(),
+ notificationEvent.getTenantId(), adminUser,
+ notificationEvent.getHostName());
+
+ if (optionalTransferMapping.isEmpty()) {
+ logger.error("Could not find a transfer mapping for user {}
and host {}", adminUser, notificationEvent.getHostName());
+ throw new Exception("Could not find a transfer mapping");
+ }
+
+ TransferMapping transferMapping = optionalTransferMapping.get();
+
+ String sourceStorageId =
transferMapping.getSourceStorage().getSshStorage().getStorageId();
+ String destinationStorageId =
transferMapping.getDestinationStorage().getSshStorage().getStorageId();
+
+ // Creating parent resource
+
+ List<GenericResource> resourceList =
createResourceRecursively(sourceStorageId,
+ notificationEvent.getBasePath(),
+ notificationEvent.getResourcePath(),
+ "COLLECTION", adminUser);
+
+
shareResources(Collections.singletonList(resourceList.get(resourceList.size()
-1)), adminUser, owner, "ADMIN");
+
+ GenericResource resourceObj = resourceList.get(resourceList.size()
-1);
+
+ Optional<AnyStoragePreference> sourceSPOp =
this.drmsConnector.getStoragePreference(
+ notificationEvent.getAuthToken(), adminUser,
+ notificationEvent.getTenantId(), sourceStorageId);
+
+ if (sourceSPOp.isEmpty()) {
+ logger.error("No storage preference found for source storage
{} and user {}", sourceStorageId, adminUser);
+ throw new Exception("No storage preference found for source
storage");
+ }
+
+ Optional<AnyStoragePreference> destSPOp =
this.drmsConnector.getStoragePreference(
+ notificationEvent.getAuthToken(), adminUser,
+ notificationEvent.getTenantId(), destinationStorageId);
+
+ if (destSPOp.isEmpty()) {
+ logger.error("No storage preference found for destination
storage {} and user {}", sourceStorageId, adminUser);
+ throw new Exception("No storage preference found for
destination storage");
+ }
+
+ AnyStoragePreference sourceSP = sourceSPOp.get();
+ AnyStoragePreference destSP = destSPOp.get();
+
+ MFTApiServiceGrpc.MFTApiServiceBlockingStub mftClient =
MFTApiClient.buildClient(
+
this.configuration.getOutboundEventProcessor().getMftHost(),
+
this.configuration.getOutboundEventProcessor().getMftPort());
+
+ String decodedAuth = new
String(Base64.getDecoder().decode(notificationEvent.getAuthToken()));
+ String[] authParts = decodedAuth.split(":");
+
+ if (authParts.length != 2) {
+ throw new Exception("Could not decode auth token to work with
MFT");
+ }
+
+ DelegateAuth delegateAuth = DelegateAuth.newBuilder()
+ .setUserId(adminUser)
+ .setClientId(authParts[0])
+ .setClientSecret(authParts[1])
+ .putProperties("TENANT_ID",
notificationEvent.getTenantId()).build();
+
+ AuthToken mftAuth =
AuthToken.newBuilder().setDelegateAuth(delegateAuth).build();
+
+ FetchResourceMetadataRequest.Builder resourceMetadataReq =
FetchResourceMetadataRequest.newBuilder()
+ .setMftAuthorizationToken(mftAuth)
+ .setResourceId(resourceObj.getResourceId());
+
+ switch (sourceSP.getStorageCase()){
+ case SSH_STORAGE_PREFERENCE:
+ resourceMetadataReq.setResourceType("SCP");
+
resourceMetadataReq.setResourceToken(sourceSP.getSshStoragePreference().getCredentialToken());
+ break;
+ case S3_STORAGE_PREFERENCE:
+ resourceMetadataReq.setResourceType("S3");
+
resourceMetadataReq.setResourceToken(sourceSP.getS3StoragePreference().getCredentialToken());
+ break;
+ }
+
+ // Fetching file list for parent resource
+
+ DirectoryMetadataResponse directoryResourceMetadata =
mftClient.getDirectoryResourceMetadata(resourceMetadataReq.build());
+
+ List<String> resourceIDsToProcess = new ArrayList<>();
+ for (FileMetadataResponse fileMetadata :
directoryResourceMetadata.getFilesList()) {
+ logger.info("Registering file {} for source storage {}",
fileMetadata.getResourcePath(), sourceStorageId);
+ resourceList = createResourceRecursively(sourceStorageId,
notificationEvent.getBasePath(),
+ fileMetadata.getResourcePath(), "FILE", adminUser);
+ GenericResource fileResource =
resourceList.get(resourceList.size() -1);
+
+ resourceIDsToProcess.add(fileResource.getResourceId());
+ }
+
+ for (DirectoryMetadataResponse directoryMetadata :
directoryResourceMetadata.getDirectoriesList()) {
+ logger.info("Registering directory {} for source storage {}",
directoryMetadata.getResourcePath(), sourceStorageId);
+ createResourceRecursively(sourceStorageId,
notificationEvent.getBasePath(),
+ directoryMetadata.getResourcePath(),
+ "COLLECTION", adminUser);
+ // TODO scan directories
+ }
+
+ logger.info("Creating destination zip resource for directory {}",
notificationEvent.getResourcePath());
+ resourceList = createResourceRecursively(destinationStorageId,
notificationEvent.getBasePath(),
+ notificationEvent.getResourcePath(), "FILE", adminUser);
+
+ GenericResource destinationResource =
resourceList.get(resourceList.size() -1);
+
+ System.out.println(destinationResource);
+
+ logger.info("Submitting resources to workflow manager");
+
this.workflowServiceConnector.invokeWorkflow(notificationEvent.getAuthToken(),
adminUser,
+ notificationEvent.getTenantId(), resourceIDsToProcess,
sourceSP.getSshStoragePreference().getCredentialToken(),
+ destinationResource.getResourceId(),
destSP.getSshStoragePreference().getCredentialToken());
+
+
+ logger.info("Completed processing path {}",
notificationEvent.getResourcePath());
+
+ } catch (Exception e) {
+ logger.error("Failed to process event for resource path {}",
notificationEvent.getResourcePath(), e);
+ }
+ }
+}
diff --git
a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/processor/InboundEventProcessor.java
b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/processor/InboundEventProcessor.java
deleted file mode 100644
index 62bd6ff..0000000
---
a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/processor/InboundEventProcessor.java
+++ /dev/null
@@ -1,145 +0,0 @@
-package org.apache.airavata.datalake.orchestrator.processor;
-
-import org.apache.airavata.datalake.orchestrator.Configuration;
-import org.apache.airavata.datalake.orchestrator.Utils;
-import
org.apache.airavata.datalake.orchestrator.core.processor.MessageProcessor;
-import
org.apache.airavata.datalake.orchestrator.registry.persistance.entity.DataOrchestratorEntity;
-import
org.apache.airavata.datalake.orchestrator.registry.persistance.entity.OwnershipEntity;
-import
org.apache.airavata.datalake.orchestrator.registry.persistance.repository.DataOrchestratorEventRepository;
-import
org.apache.airavata.datalake.orchestrator.registry.persistance.entity.EventStatus;
-import org.apache.airavata.dataorchestrator.messaging.model.NotificationEvent;
-import org.dozer.DozerBeanMapper;
-import org.dozer.loader.api.BeanMappingBuilder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.nio.charset.StandardCharsets;
-import java.security.NoSuchAlgorithmException;
-import java.util.*;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-/**
- * This class is responsible for pick events from kafka queue and publish them
into inmemory store
- */
-public class InboundEventProcessor implements MessageProcessor<Configuration> {
- private static final Logger LOGGER =
LoggerFactory.getLogger(InboundEventProcessor.class);
- private Configuration configuration;
- private NotificationEvent notificationEvent;
- private DozerBeanMapper dozerBeanMapper;
-
- private DataOrchestratorEventRepository repository;
-
- public InboundEventProcessor(Configuration configuration,
NotificationEvent notificationEvent,
- DataOrchestratorEventRepository repository)
throws Exception {
- this.notificationEvent = notificationEvent;
- this.repository = repository;
- this.init(configuration);
- }
-
- @Override
- public void init(Configuration configuration) throws Exception {
- try {
- this.configuration = configuration;
- dozerBeanMapper = new DozerBeanMapper();
- BeanMappingBuilder orchestratorEventMapper = new
BeanMappingBuilder() {
- @Override
- protected void configure() {
- mapping(NotificationEvent.class,
DataOrchestratorEntity.class);
- }
- };
- dozerBeanMapper.addMapping(orchestratorEventMapper);
- } catch (Exception exception) {
- LOGGER.error(" Error occurred while initiating Inbound event
processor ", exception);
- throw exception;
- }
-
- }
-
- @Override
- public void close() throws Exception {
-
- }
-
- @Override
- public void run() {
- try {
- LOGGER.info("Inbound event processor received event " +
notificationEvent.getResourceId());
- String typeStr =
this.configuration.getMessageFilter().getResourceType();
- String[] allowedTypes = typeStr.split(",");
- boolean proceed = false;
- long size = Arrays.stream(allowedTypes).filter(type ->
- type.equals(notificationEvent.getResourceType())).count();
- if (size == 0) {
- return;
- }
- String eventTypeStr =
this.configuration.getMessageFilter().getEventType();
- String[] eventTypes = eventTypeStr.split(",");
- long eventSize = Arrays.stream(eventTypes).filter(type ->
-
type.trim().equals(notificationEvent.getContext().getEvent().name())).count();
- if (eventSize == 0) {
- return;
- }
-
- String pattern =
this.configuration.getMessageFilter().getResourceNameExclusions();
-
- // Create a Pattern object
- Pattern r = Pattern.compile(pattern);
-
- // Now create matcher object.
- Matcher m = r.matcher(notificationEvent.getResourceName());
-
- if (m.find()) {
- return;
- }
-
- DataOrchestratorEntity entity = createEntity(notificationEvent);
- repository.save(entity);
- } catch (Exception exception) {
- LOGGER.error("Error occurred while pre processing event {}",
this.notificationEvent.getResourceId(), exception);
- }
-
- }
-
- private DataOrchestratorEntity createEntity(NotificationEvent event)
throws NoSuchAlgorithmException {
- DataOrchestratorEntity entity = dozerBeanMapper.map(event,
DataOrchestratorEntity.class);
- entity.setOccurredTime(new Date(event.getContext().getOccuredTime()));
- entity.setEventStatus(EventStatus.DATA_ORCH_RECEIVED.name());
- entity.setEventType(event.getContext().getEvent().name());
- entity.setAuthToken(event.getContext().getAuthToken());
- entity.setHostName(event.getContext().getHostName());
-
- String resourcePath = event.getResourcePath();
- String basePath = event.getContext().getBasePath();
- String removeBasePath = resourcePath.substring(basePath.length());
- String[] splitted = removeBasePath.split("/");
-
- OwnershipEntity owner1 = new OwnershipEntity();
- owner1.setId(UUID.randomUUID().toString());
- owner1.setUserId(splitted[1]);
- owner1.setPermissionId("OWNER");
- owner1.setDataOrchestratorEntity(entity);
-
- OwnershipEntity owner2 = new OwnershipEntity();
- owner2.setId(UUID.randomUUID().toString());
- owner2.setUserId(splitted[0]);
- owner2.setPermissionId("ADMIN");
- owner2.setDataOrchestratorEntity(entity);
-
-
- Set<OwnershipEntity> owners = new HashSet<>();
- owners.add(owner1);
- owners.add(owner2);
-
- entity.setOwnershipEntities(owners);
-
- entity.setTenantId(event.getContext().getTenantId());
-
- String authDecoded = new String(Base64.getDecoder()
-
.decode(event.getContext().getAuthToken().getBytes(StandardCharsets.UTF_8)));
- String agentId = authDecoded.split(":")[0];
- entity.setAgentId(agentId);
- entity.setResourceId(Utils.getId(event.getResourceId()));
- return entity;
- }
-}
diff --git
a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/processor/OutboundEventProcessor.java
b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/processor/OutboundEventProcessor.java
deleted file mode 100644
index 4b04005..0000000
---
a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/processor/OutboundEventProcessor.java
+++ /dev/null
@@ -1,197 +0,0 @@
-package org.apache.airavata.datalake.orchestrator.processor;
-
-import org.apache.airavata.datalake.drms.resource.GenericResource;
-import org.apache.airavata.datalake.drms.storage.AnyStoragePreference;
-import org.apache.airavata.datalake.drms.storage.TransferMapping;
-import org.apache.airavata.datalake.orchestrator.Configuration;
-import org.apache.airavata.datalake.orchestrator.Utils;
-import org.apache.airavata.datalake.orchestrator.connectors.DRMSConnector;
-import
org.apache.airavata.datalake.orchestrator.connectors.WorkflowServiceConnector;
-import
org.apache.airavata.datalake.orchestrator.core.processor.MessageProcessor;
-import
org.apache.airavata.datalake.orchestrator.registry.persistance.entity.DataOrchestratorEntity;
-import
org.apache.airavata.datalake.orchestrator.registry.persistance.entity.OwnershipEntity;
-import
org.apache.airavata.datalake.orchestrator.registry.persistance.repository.DataOrchestratorEventRepository;
-import
org.apache.airavata.datalake.orchestrator.registry.persistance.entity.EventStatus;
-import org.apache.airavata.dataorchestrator.messaging.model.NotificationEvent;
-import org.dozer.DozerBeanMapper;
-import org.dozer.loader.api.BeanMappingBuilder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.*;
-
-/**
- * This class is responsible and publish events to registry and
- * Workflow engine
- */
-public class OutboundEventProcessor implements MessageProcessor<Configuration>
{
- private static final Logger LOGGER =
LoggerFactory.getLogger(OutboundEventProcessor.class);
-
- private DozerBeanMapper dozerBeanMapper;
- private DataOrchestratorEventRepository repository;
-
- private DRMSConnector drmsConnector;
- private WorkflowServiceConnector workflowServiceConnector;
-
- public OutboundEventProcessor(Configuration configuration,
DataOrchestratorEventRepository repository) throws Exception {
- this.repository = repository;
- this.init(configuration);
- }
-
- @Override
- public void init(Configuration configuration) throws Exception {
- this.drmsConnector = new DRMSConnector(configuration);
- this.workflowServiceConnector = new
WorkflowServiceConnector(configuration);
- dozerBeanMapper = new DozerBeanMapper();
- BeanMappingBuilder orchestratorEventMapper = new BeanMappingBuilder() {
- @Override
- protected void configure() {
- mapping(NotificationEvent.class, DataOrchestratorEntity.class);
- }
- };
- dozerBeanMapper.addMapping(orchestratorEventMapper);
-
- }
-
- @Override
- public void close() throws Exception {
- this.drmsConnector.close();
- this.workflowServiceConnector.close();
- }
-
-
- @Override
- public void run() {
-
- try {
- List<DataOrchestratorEntity> orchestratorEntityList =
this.repository
-
.findAllEntitiesWithGivenStatus(EventStatus.DATA_ORCH_RECEIVED.name());
- Map<String, List<DataOrchestratorEntity>> entityMap = new
HashMap<>();
- orchestratorEntityList.forEach(entity -> {
- entityMap.computeIfAbsent(entity.getResourceId(), list -> new
ArrayList()).add(entity);
- });
- entityMap.forEach((key, value) -> {
- try {
- DataOrchestratorEntity entity = value.remove(0);
- processEvent(entity);
- value.forEach(val -> {
-
val.setEventStatus(EventStatus.DATA_ORCH_PROCESSED_AND_SKIPPED.name());
- repository.save(val);
- });
- } catch (Exception e) {
- LOGGER.error("Errored while processing event", e);
- }
- });
- } catch (Exception ex) {
- LOGGER.error("Error while processing events", ex);
- }
-
- }
-
- private void processEvent(DataOrchestratorEntity entity) {
- try {
-
- // TODO move this logic to file listener as this is EMC specific
- Optional<OwnershipEntity> adminOp =
entity.getOwnershipEntities().stream().filter(o ->
o.getPermissionId().equals("ADMIN")).findFirst();
- if (adminOp.isEmpty()) {
- throw new Exception("No admin user found");
- }
-
- String resourcePath = entity.getResourcePath();
- String tail =
resourcePath.substring(resourcePath.indexOf(adminOp.get().getUserId()));
-
- String[] collections = tail.split("/");
-
- Optional<TransferMapping> optionalStorPref =
drmsConnector.getActiveTransferMapping(entity, entity.getHostName());
- if (optionalStorPref.isEmpty()) {
- entity.setEventStatus(EventStatus.ERRORED.name());
- entity.setError("Storage not found for host: " +
entity.getHostName());
- repository.save(entity);
- return;
- }
-
- TransferMapping transferMapping = optionalStorPref.get();
- String sourceStorageId =
transferMapping.getSourceStorage().getSshStorage().getStorageId();
- String destinationStorageId =
transferMapping.getDestinationStorage().getSshStorage().getStorageId();
- String parentType = "Storage";
-
- String parentId = sourceStorageId;
- for (int i = 0; i < collections.length - 1; i++) {
- String resourceName = collections[i];
- String path = entity.getResourcePath().substring(0,
entity.getResourcePath().indexOf(resourceName));
- path = path.concat(resourceName);
- String entityId = Utils.getId(path);
- Optional<GenericResource> optionalGenericResource =
- this.drmsConnector.createResource(repository, entity,
entityId, resourceName, path, parentId, "COLLECTION", parentType);
- if (optionalGenericResource.isPresent()) {
- parentId = optionalGenericResource.get().getResourceId();
- parentType = "COLLECTION";
- } else {
- entity.setEventStatus(EventStatus.ERRORED.name());
- entity.setError("Collection structure creation failed: " +
entity.getHostName());
- repository.save(entity);
- return;
- }
- }
-
- Optional<GenericResource> optionalGenericResource =
- this.drmsConnector.createResource(repository, entity,
entity.getResourceId(),
- collections[collections.length - 1],
entity.getResourcePath(),
- parentId, "FILE", parentType);
-
- String destinationResourceId = destinationStorageId + ":" +
entity.getResourcePath() + ":" + entity.getResourceType();
- String messageId = Utils.getId(destinationResourceId);
-
- Optional<GenericResource> destinationFile =
this.drmsConnector.createResource(repository, entity, messageId,
- entity.getResourceName(),
- entity.getResourcePath(),
- destinationStorageId,
- "FILE", "Storage");
-
- if (optionalGenericResource.isPresent() &&
destinationFile.isPresent()) {
- try {
-
- Optional<AnyStoragePreference> storagePreferenceOptional =
this.drmsConnector
- .getStoragePreference(entity.getAuthToken(),
adminOp.get().getUserId(), entity.getTenantId(), sourceStorageId);
-
- Optional<AnyStoragePreference>
destinationPreferenceOptional = this.drmsConnector
- .getStoragePreference(entity.getAuthToken(),
adminOp.get().getUserId(), entity.getTenantId(), destinationStorageId);
- if (storagePreferenceOptional.isPresent() &&
destinationPreferenceOptional.isPresent()) {
- String sourceCredentialToken =
storagePreferenceOptional.get()
- .getSshStoragePreference()
- .getCredentialToken();
- String destinationCredentialToken =
storagePreferenceOptional.get()
- .getSshStoragePreference()
- .getCredentialToken();
-
-
this.workflowServiceConnector.invokeWorkflow(entity.getAuthToken(),
adminOp.get().getUserId(),
- entity.getTenantId(), entity.getResourceId(),
sourceCredentialToken,
- messageId, destinationCredentialToken);
-
entity.setEventStatus(EventStatus.DISPATCHED_TO_WORFLOW_ENGING.name());
- repository.save(entity);
- } else {
- String msg = "Cannot find storage preference for
storage " + sourceStorageId + " for user " + adminOp.get().getUserId();
- entity.setError(msg);
- entity.setEventStatus(EventStatus.ERRORED.name());
- repository.save(entity);
- LOGGER.error(msg);
- }
-
-
- } catch (Exception exception) {
- String msg = "Error occurred while invoking workflow
manager" + exception.getMessage();
- entity.setError("Error occurred while invoking workflow
manager " + exception.getMessage());
- entity.setEventStatus(EventStatus.ERRORED.name());
- repository.save(entity);
- LOGGER.error(msg, exception);
- }
- }
- } catch (Exception exception) {
- LOGGER.error("Error occurred while processing outbound data
orchestrator event", exception);
- entity.setEventStatus(EventStatus.ERRORED.name());
- entity.setError("Error occurred while processing ");
- repository.save(entity);
- }
- }
-
-}
diff --git
a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-stub/src/main/proto/parsing.proto
b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-stub/src/main/proto/parsing.proto
index df6edf7..ab077cc 100644
---
a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-stub/src/main/proto/parsing.proto
+++
b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-stub/src/main/proto/parsing.proto
@@ -60,6 +60,7 @@ message DataParsingJobOutput {
string dataParserOutputInterfaceId = 2;
string dataParsingJobId = 3;
string outputType = 4;
+ string outputDirectoryResourceId = 5;
}
message DataParsingJob {
diff --git
a/data-orchestrator/workflow-engine/datasync-workflow-manager/src/main/java/org/apache/airavata/datalake/workflow/engine/wm/datasync/DataParsingWorkflowManager.java
b/data-orchestrator/workflow-engine/datasync-workflow-manager/src/main/java/org/apache/airavata/datalake/workflow/engine/wm/datasync/DataParsingWorkflowManager.java
index af77f3e..7482c7a 100644
---
a/data-orchestrator/workflow-engine/datasync-workflow-manager/src/main/java/org/apache/airavata/datalake/workflow/engine/wm/datasync/DataParsingWorkflowManager.java
+++
b/data-orchestrator/workflow-engine/datasync-workflow-manager/src/main/java/org/apache/airavata/datalake/workflow/engine/wm/datasync/DataParsingWorkflowManager.java
@@ -90,115 +90,118 @@ public class DataParsingWorkflowManager {
public void submitDataParsingWorkflow(WorkflowInvocationRequest request)
throws Exception {
WorkflowMessage workflowMessage = request.getMessage();
- logger.info("Processing parsing workflow for resource {}",
workflowMessage.getSourceResourceId());
-
- MFTApiServiceGrpc.MFTApiServiceBlockingStub mftClient =
MFTApiClient.buildClient(mftHost, mftPort);
-
- DelegateAuth delegateAuth = DelegateAuth.newBuilder()
- .setUserId(workflowMessage.getUsername())
- .setClientId(mftClientId)
- .setClientSecret(mftClientSecret)
- .putProperties("TENANT_ID",
workflowMessage.getTenantId()).build();
-
- FileMetadataResponse metadata =
mftClient.getFileResourceMetadata(FetchResourceMetadataRequest.newBuilder()
- .setResourceType("SCP")
- .setResourceId(workflowMessage.getSourceResourceId())
- .setResourceToken(workflowMessage.getSourceCredentialToken())
-
.setMftAuthorizationToken(AuthToken.newBuilder().setDelegateAuth(delegateAuth).build()).build());
-
- ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost",
6566).usePlaintext().build();
- DataParserServiceGrpc.DataParserServiceBlockingStub parserClient =
DataParserServiceGrpc.newBlockingStub(channel);
-
- ParsingJobListResponse parsingJobs =
parserClient.listParsingJobs(ParsingJobListRequest.newBuilder().build());
-
- Map<String, StringMap> parserInputMappings = new HashMap<>();
- List<DataParsingJob> selectedPJs =
parsingJobs.getParsersList().stream().filter(pj -> {
- List<DataParsingJobInput> pjis = pj.getDataParsingJobInputsList();
-
- boolean match = true;
- StringMap stringMap = new StringMap();
- for (DataParsingJobInput pji : pjis) {
-
- ScriptEngine engine = new
ScriptEngineManager().getEngineByName("JavaScript");
- Bindings bindings =
engine.getBindings(ScriptContext.ENGINE_SCOPE);
- bindings.put("polyglot.js.allowHostAccess", true);
- bindings.put("polyglot.js.allowHostClassLookup",
(Predicate<String>) s -> true);
- bindings.put("metadata", metadata);
- try {
- Boolean eval = (Boolean)
engine.eval(pji.getSelectionQuery());
- stringMap.put(pji.getDataParserInputInterfaceId(),
"$DOWNLOAD_PATH");
- match = match && eval;
- } catch (ScriptException e) {
- logger.error("Failed to evaluate parsing job {}",
pj.getDataParsingJobId());
- match = false;
+
+ for (String sourceResourceId :
workflowMessage.getSourceResourceIdsList()) {
+ logger.info("Processing parsing workflow for resource {}",
sourceResourceId);
+
+ MFTApiServiceGrpc.MFTApiServiceBlockingStub mftClient =
MFTApiClient.buildClient(mftHost, mftPort);
+
+ DelegateAuth delegateAuth = DelegateAuth.newBuilder()
+ .setUserId(workflowMessage.getUsername())
+ .setClientId(mftClientId)
+ .setClientSecret(mftClientSecret)
+ .putProperties("TENANT_ID",
workflowMessage.getTenantId()).build();
+
+ FileMetadataResponse metadata =
mftClient.getFileResourceMetadata(FetchResourceMetadataRequest.newBuilder()
+ .setResourceType("SCP")
+ .setResourceId(sourceResourceId)
+
.setResourceToken(workflowMessage.getSourceCredentialToken())
+
.setMftAuthorizationToken(AuthToken.newBuilder().setDelegateAuth(delegateAuth).build()).build());
+
+ ManagedChannel channel =
ManagedChannelBuilder.forAddress("localhost", 6566).usePlaintext().build();
+ DataParserServiceGrpc.DataParserServiceBlockingStub parserClient =
DataParserServiceGrpc.newBlockingStub(channel);
+
+ ParsingJobListResponse parsingJobs =
parserClient.listParsingJobs(ParsingJobListRequest.newBuilder().build());
+
+ Map<String, StringMap> parserInputMappings = new HashMap<>();
+ List<DataParsingJob> selectedPJs =
parsingJobs.getParsersList().stream().filter(pj -> {
+ List<DataParsingJobInput> pjis =
pj.getDataParsingJobInputsList();
+
+ boolean match = true;
+ StringMap stringMap = new StringMap();
+ for (DataParsingJobInput pji : pjis) {
+
+ ScriptEngine engine = new
ScriptEngineManager().getEngineByName("JavaScript");
+ Bindings bindings =
engine.getBindings(ScriptContext.ENGINE_SCOPE);
+ bindings.put("polyglot.js.allowHostAccess", true);
+ bindings.put("polyglot.js.allowHostClassLookup",
(Predicate<String>) s -> true);
+ bindings.put("metadata", metadata);
+ try {
+ Boolean eval = (Boolean)
engine.eval(pji.getSelectionQuery());
+ stringMap.put(pji.getDataParserInputInterfaceId(),
"$DOWNLOAD_PATH");
+ match = match && eval;
+ } catch (ScriptException e) {
+ logger.error("Failed to evaluate parsing job {}",
pj.getDataParsingJobId());
+ match = false;
+ }
}
- }
- if (match) {
- parserInputMappings.put(pj.getParserId(), stringMap);
- }
- return match;
- }).collect(Collectors.toList());
-
- Map<String, AbstractTask> taskMap = new HashMap<>();
-
- SyncLocalDataDownloadTask downloadTask = new
SyncLocalDataDownloadTask();
- downloadTask.setTaskId("SLDT-" + UUID.randomUUID().toString());
- downloadTask.setMftClientId(mftClientId);
- downloadTask.setMftClientSecret(mftClientSecret);
- downloadTask.setUserId(workflowMessage.getUsername());
- downloadTask.setTenantId(workflowMessage.getTenantId());
- downloadTask.setMftHost(mftHost);
- downloadTask.setMftPort(mftPort);
-
downloadTask.setSourceResourceId(workflowMessage.getSourceResourceId());
-
downloadTask.setSourceCredToken(workflowMessage.getSourceCredentialToken());
-
- taskMap.put(downloadTask.getTaskId(), downloadTask);
-
- for(String parserId: parserInputMappings.keySet()) {
-
- GenericDataParsingTask dataParsingTask = new
GenericDataParsingTask();
- dataParsingTask.setTaskId("DPT-" + UUID.randomUUID().toString());
- dataParsingTask.setParserId(parserId);
- dataParsingTask.setInputMapping(parserInputMappings.get(parserId));
- taskMap.put(dataParsingTask.getTaskId(), dataParsingTask);
-
- OutPort outPort = new OutPort();
- outPort.setNextTaskId(dataParsingTask.getTaskId());
- downloadTask.addOutPort(outPort);
-
- DataParsingJob dataParsingJob = selectedPJs.stream().filter(pj ->
pj.getParserId().equals(parserId)).findFirst().get();
- ParserFetchResponse parser =
parserClient.fetchParser(ParserFetchRequest.newBuilder().setParserId(parserId).build());
-
- for (DataParserOutputInterface dataParserOutputInterface:
parser.getParser().getOutputInterfacesList()) {
-
- Optional<DataParsingJobOutput> dataParsingJobOutput =
dataParsingJob.getDataParsingJobOutputsList().stream().filter(o ->
-
o.getDataParserOutputInterfaceId().equals(dataParserOutputInterface.getParserOutputInterfaceId()))
- .findFirst();
-
- if (dataParsingJobOutput.isPresent() &&
dataParsingJobOutput.get().getOutputType().equals("JSON")) {
- MetadataPersistTask mpt = new MetadataPersistTask();
- mpt.setTaskId("MPT-" + UUID.randomUUID().toString());
- mpt.setDrmsHost(drmsHost);
- mpt.setDrmsPort(drmsPort);
- mpt.setTenant(workflowMessage.getTenantId());
- mpt.setUser(workflowMessage.getUsername());
- mpt.setServiceAccountKey(mftClientId);
- mpt.setServiceAccountSecret(mftClientSecret);
- mpt.setResourceId(workflowMessage.getSourceResourceId());
- mpt.setJsonFile("$" + dataParsingTask.getTaskId() + "-" +
dataParserOutputInterface.getOutputName());
- OutPort dpOut = new OutPort();
- dpOut.setNextTaskId(mpt.getTaskId());
- dataParsingTask.addOutPort(dpOut);
- taskMap.put(mpt.getTaskId(), mpt);
+ if (match) {
+ parserInputMappings.put(pj.getParserId(), stringMap);
+ }
+ return match;
+ }).collect(Collectors.toList());
+
+ Map<String, AbstractTask> taskMap = new HashMap<>();
+
+ SyncLocalDataDownloadTask downloadTask = new
SyncLocalDataDownloadTask();
+ downloadTask.setTaskId("SLDT-" + UUID.randomUUID().toString());
+ downloadTask.setMftClientId(mftClientId);
+ downloadTask.setMftClientSecret(mftClientSecret);
+ downloadTask.setUserId(workflowMessage.getUsername());
+ downloadTask.setTenantId(workflowMessage.getTenantId());
+ downloadTask.setMftHost(mftHost);
+ downloadTask.setMftPort(mftPort);
+ downloadTask.setSourceResourceId(sourceResourceId);
+
downloadTask.setSourceCredToken(workflowMessage.getSourceCredentialToken());
+
+ taskMap.put(downloadTask.getTaskId(), downloadTask);
+
+ for(String parserId: parserInputMappings.keySet()) {
+
+ GenericDataParsingTask dataParsingTask = new
GenericDataParsingTask();
+ dataParsingTask.setTaskId("DPT-" +
UUID.randomUUID().toString());
+ dataParsingTask.setParserId(parserId);
+
dataParsingTask.setInputMapping(parserInputMappings.get(parserId));
+ taskMap.put(dataParsingTask.getTaskId(), dataParsingTask);
+
+ OutPort outPort = new OutPort();
+ outPort.setNextTaskId(dataParsingTask.getTaskId());
+ downloadTask.addOutPort(outPort);
+
+ DataParsingJob dataParsingJob = selectedPJs.stream().filter(pj
-> pj.getParserId().equals(parserId)).findFirst().get();
+ ParserFetchResponse parser =
parserClient.fetchParser(ParserFetchRequest.newBuilder().setParserId(parserId).build());
+
+ for (DataParserOutputInterface dataParserOutputInterface:
parser.getParser().getOutputInterfacesList()) {
+
+ Optional<DataParsingJobOutput> dataParsingJobOutput =
dataParsingJob.getDataParsingJobOutputsList().stream().filter(o ->
+
o.getDataParserOutputInterfaceId().equals(dataParserOutputInterface.getParserOutputInterfaceId()))
+ .findFirst();
+
+ if (dataParsingJobOutput.isPresent() &&
dataParsingJobOutput.get().getOutputType().equals("JSON")) {
+ MetadataPersistTask mpt = new MetadataPersistTask();
+ mpt.setTaskId("MPT-" + UUID.randomUUID().toString());
+ mpt.setDrmsHost(drmsHost);
+ mpt.setDrmsPort(drmsPort);
+ mpt.setTenant(workflowMessage.getTenantId());
+ mpt.setUser(workflowMessage.getUsername());
+ mpt.setServiceAccountKey(mftClientId);
+ mpt.setServiceAccountSecret(mftClientSecret);
+ mpt.setResourceId(sourceResourceId);
+ mpt.setJsonFile("$" + dataParsingTask.getTaskId() +
"-" + dataParserOutputInterface.getOutputName());
+ OutPort dpOut = new OutPort();
+ dpOut.setNextTaskId(mpt.getTaskId());
+ dataParsingTask.addOutPort(dpOut);
+ taskMap.put(mpt.getTaskId(), mpt);
+ }
}
- }
- }
+ }
- String[] startTaskIds = {downloadTask.getTaskId()};
- String workflowId = workflowOperator.buildAndRunWorkflow(taskMap,
startTaskIds);
+ String[] startTaskIds = {downloadTask.getTaskId()};
+ String workflowId = workflowOperator.buildAndRunWorkflow(taskMap,
startTaskIds);
- logger.info("Submitted workflow {} to parse resource {}", workflowId,
workflowMessage.getSourceResourceId());
+ logger.info("Submitted workflow {} to parse resource {}",
workflowId, sourceResourceId);
+ }
}
}
diff --git
a/data-orchestrator/workflow-engine/datasync-workflow-manager/src/main/java/org/apache/airavata/datalake/workflow/engine/wm/datasync/DataSyncWorkflowManager.java
b/data-orchestrator/workflow-engine/datasync-workflow-manager/src/main/java/org/apache/airavata/datalake/workflow/engine/wm/datasync/DataSyncWorkflowManager.java
index d12a365..ff0ee59 100644
---
a/data-orchestrator/workflow-engine/datasync-workflow-manager/src/main/java/org/apache/airavata/datalake/workflow/engine/wm/datasync/DataSyncWorkflowManager.java
+++
b/data-orchestrator/workflow-engine/datasync-workflow-manager/src/main/java/org/apache/airavata/datalake/workflow/engine/wm/datasync/DataSyncWorkflowManager.java
@@ -90,8 +90,6 @@ public class DataSyncWorkflowManager {
@Value("${custos.secret}")
private String custosSecret;
-
-
@Value("${drms.host}")
private String drmsHost;
@@ -214,7 +212,7 @@ public class DataSyncWorkflowManager {
// dt1.setMftCallbackStoreHost(datasyncWmHost);
// dt1.setMftCallbackStorePort(datasyncWmPort);
- DataTransferPreValidationTask dt1 = new
DataTransferPreValidationTask();
+ /*DataTransferPreValidationTask dt1 = new
DataTransferPreValidationTask();
dt1.setTenantId(request.getMessage().getTenantId());
dt1.setCustosHost(custosHost);
dt1.setCustosPort(custosPort);
@@ -239,6 +237,6 @@ public class DataSyncWorkflowManager {
//String[] startTaskIds = {bt1.getTaskId(), bt2.getTaskId(),
bt4.getTaskId()};
String[] startTaskIds = {dt1.getTaskId()};
String workflowId = workflowOperator.buildAndRunWorkflow(taskMap,
startTaskIds);
- logger.info("Launched workflow {}", workflowId);
+ logger.info("Launched workflow {}", workflowId);*/
}
}
diff --git
a/data-orchestrator/workflow-engine/datasync-workflow-manager/src/main/java/org/apache/airavata/datalake/workflow/engine/wm/datasync/WorkflowEngineAPIHandler.java
b/data-orchestrator/workflow-engine/datasync-workflow-manager/src/main/java/org/apache/airavata/datalake/workflow/engine/wm/datasync/WorkflowEngineAPIHandler.java
index 29be8a4..8128ff3 100644
---
a/data-orchestrator/workflow-engine/datasync-workflow-manager/src/main/java/org/apache/airavata/datalake/workflow/engine/wm/datasync/WorkflowEngineAPIHandler.java
+++
b/data-orchestrator/workflow-engine/datasync-workflow-manager/src/main/java/org/apache/airavata/datalake/workflow/engine/wm/datasync/WorkflowEngineAPIHandler.java
@@ -29,6 +29,8 @@ import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import java.util.Optional;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
@GRpcService
public class WorkflowEngineAPIHandler extends
WorkflowServiceGrpc.WorkflowServiceImplBase {
@@ -43,13 +45,22 @@ public class WorkflowEngineAPIHandler extends
WorkflowServiceGrpc.WorkflowServic
@Autowired
private DataParsingWorkflowManager dataParsingWorkflowManager;
+ private final ScheduledExecutorService workflowExecutorService =
Executors.newSingleThreadScheduledExecutor();
+
@Override
public void invokeWorkflow(WorkflowInvocationRequest request,
StreamObserver<WorkflowInvocationResponse>
responseObserver) {
try {
- logger.info("Invoking workflow executor for resource {}",
request.getMessage().getSourceResourceId());
- //dataSyncWorkflowManager.submitDataSyncWorkflow(request);
- dataParsingWorkflowManager.submitDataParsingWorkflow(request);
+
+ logger.info("Invoking workflow executor");
+ workflowExecutorService.submit(() -> {
+ try {
+
dataParsingWorkflowManager.submitDataParsingWorkflow(request);
+ } catch (Exception e) {
+ logger.error("Failed to submit data parsing workflow for
destination resource {}",
+ request.getMessage().getDestinationResourceId());
+ }
+ });
responseObserver.onNext(WorkflowInvocationResponse.newBuilder().setStatus(true).build());
responseObserver.onCompleted();
} catch (Exception ex) {
diff --git
a/data-orchestrator/workflow-engine/workflow-engine-stubs/src/main/proto/service/WorkflowService.proto
b/data-orchestrator/workflow-engine/workflow-engine-stubs/src/main/proto/service/WorkflowService.proto
index c1b5da3..532aff2 100644
---
a/data-orchestrator/workflow-engine/workflow-engine-stubs/src/main/proto/service/WorkflowService.proto
+++
b/data-orchestrator/workflow-engine/workflow-engine-stubs/src/main/proto/service/WorkflowService.proto
@@ -26,8 +26,7 @@ import "google/protobuf/empty.proto";
message WorkflowMessage {
string message_id = 1;
- string resource_id = 2;
- string source_resource_id = 3;
+ repeated string source_resource_ids = 3;
string destination_resource_id = 4;
string username = 5;
string tenantId = 6;