This is an automated email from the ASF dual-hosted git repository.
isjarana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airavata-data-lake.git
The following commit(s) were added to refs/heads/master by this push:
new f1200a1 Move transfer mappings to storage
new 4bb9ee4 Merge pull request #23 from isururanawaka/workflow_invocation
f1200a1 is described below
commit f1200a1a77d1647b75ea28824eeddb1463b0fdcc
Author: Isuru Ranawaka <[email protected]>
AuthorDate: Sat Jul 10 20:28:23 2021 -0400
Move transfer mappings to storage
---
.../orchestrator/connectors/DRMSConnector.java | 122 +++++++-------
.../processor/OutboundEventProcessor.java | 3 +-
.../java/org/apache/airavata/drms/api/Client.java | 32 ++--
.../handlers/StoragePreferenceServiceHandler.java | 169 --------------------
.../drms/api/handlers/StorageServiceHandler.java | 177 +++++++++++++++++++++
.../deserializer/TransferMappingDeserializer.java | 22 +--
.../drms-rest-proxy/src/main/resources/drms.pb | Bin 109388 -> 109344 bytes
.../preference/StoragePreferenceService.proto | 50 ------
.../src/main/proto/storage/StorageService.proto | 57 +++++++
9 files changed, 324 insertions(+), 308 deletions(-)
diff --git
a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/connectors/DRMSConnector.java
b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/connectors/DRMSConnector.java
index d76ffa6..f87542e 100644
---
a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/connectors/DRMSConnector.java
+++
b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/connectors/DRMSConnector.java
@@ -54,67 +54,67 @@ public class DRMSConnector implements
AbstractConnector<Configuration> {
return !this.drmsChannel.isShutdown();
}
- public Optional<String>
getSourceStoragePreferenceId(DataOrchestratorEntity entity, String hostname) {
- DRMSServiceAuthToken serviceAuthToken =
DRMSServiceAuthToken.newBuilder()
- .setAccessToken(entity.getAuthToken())
-
.setAuthCredentialType(AuthCredentialType.AGENT_ACCOUNT_CREDENTIAL)
- .setAuthenticatedUser(AuthenticatedUser.newBuilder()
- .setUsername(entity.getOwnerId())
- .setTenantId(entity.getTenantId())
- .build())
- .build();
- FindTransferMappingsRequest request =
FindTransferMappingsRequest.newBuilder()
- .setAuthToken(serviceAuthToken)
- .build();
- FindTransferMappingsResponse response =
storagePreferenceServiceBlockingStub.getTransferMappings(request);
- List<TransferMapping> transferMappingList = response.getMappingsList();
- AtomicReference<String> storagePreferenceId = new
AtomicReference<>(null);
- if (!transferMappingList.isEmpty()) {
- transferMappingList.forEach(transferMapping -> {
- if
(transferMapping.getSourceStoragePreference().getStorageCase()
-
.equals(AnyStoragePreference.StorageCase.SSH_STORAGE_PREFERENCE)) {
- if
(transferMapping.getSourceStoragePreference().getSshStoragePreference()
- .getStorage().getHostName().equals(hostname)) {
- storagePreferenceId
-
.set(transferMapping.getSourceStoragePreference()
-
.getSshStoragePreference().getStoragePreferenceId());
- }
- }
- });
- }
- return Optional.ofNullable(storagePreferenceId.get());
- }
-
- public Optional<String>
getDestinationStoragePreferenceId(DataOrchestratorEntity entity, String
hostname) {
- DRMSServiceAuthToken serviceAuthToken =
DRMSServiceAuthToken.newBuilder()
- .setAccessToken(entity.getAuthToken())
-
.setAuthCredentialType(AuthCredentialType.AGENT_ACCOUNT_CREDENTIAL)
- .setAuthenticatedUser(AuthenticatedUser.newBuilder()
- .setUsername(entity.getOwnerId())
- .setTenantId(entity.getTenantId())
- .build())
- .build();
- FindTransferMappingsRequest request =
FindTransferMappingsRequest.newBuilder()
- .setAuthToken(serviceAuthToken)
- .build();
- FindTransferMappingsResponse response =
storagePreferenceServiceBlockingStub.getTransferMappings(request);
- List<TransferMapping> transferMappingList = response.getMappingsList();
- AtomicReference<String> storagePreferenceId = new
AtomicReference<>(null);
- if (!transferMappingList.isEmpty()) {
- transferMappingList.forEach(transferMapping -> {
- if
(transferMapping.getDestinationStoragePreference().getStorageCase()
-
.equals(AnyStoragePreference.StorageCase.SSH_STORAGE_PREFERENCE)) {
- if
(transferMapping.getDestinationStoragePreference().getSshStoragePreference()
- .getStorage().getHostName().equals(hostname)) {
- storagePreferenceId
-
.set(transferMapping.getDestinationStoragePreference()
-
.getSshStoragePreference().getStoragePreferenceId());
- }
- }
- });
- }
- return Optional.ofNullable(storagePreferenceId.get());
- }
+// public Optional<String>
getSourceStoragePreferenceId(DataOrchestratorEntity entity, String hostname) {
+// DRMSServiceAuthToken serviceAuthToken =
DRMSServiceAuthToken.newBuilder()
+// .setAccessToken(entity.getAuthToken())
+//
.setAuthCredentialType(AuthCredentialType.AGENT_ACCOUNT_CREDENTIAL)
+// .setAuthenticatedUser(AuthenticatedUser.newBuilder()
+// .setUsername(entity.getOwnerId())
+// .setTenantId(entity.getTenantId())
+// .build())
+// .build();
+// FindTransferMappingsRequest request =
FindTransferMappingsRequest.newBuilder()
+// .setAuthToken(serviceAuthToken)
+// .build();
+// FindTransferMappingsResponse response =
storagePreferenceServiceBlockingStub.getTransferMappings(request);
+// List<TransferMapping> transferMappingList =
response.getMappingsList();
+// AtomicReference<String> storagePreferenceId = new
AtomicReference<>(null);
+// if (!transferMappingList.isEmpty()) {
+// transferMappingList.forEach(transferMapping -> {
+// if
(transferMapping.getSourceStoragePreference().getStorageCase()
+//
.equals(AnyStoragePreference.StorageCase.SSH_STORAGE_PREFERENCE)) {
+// if
(transferMapping.getSourceStoragePreference().getSshStoragePreference()
+// .getStorage().getHostName().equals(hostname)) {
+// storagePreferenceId
+//
.set(transferMapping.getSourceStoragePreference()
+//
.getSshStoragePreference().getStoragePreferenceId());
+// }
+// }
+// });
+// }
+// return Optional.ofNullable(storagePreferenceId.get());
+// }
+//
+// public Optional<String>
getDestinationStoragePreferenceId(DataOrchestratorEntity entity, String
hostname) {
+// DRMSServiceAuthToken serviceAuthToken =
DRMSServiceAuthToken.newBuilder()
+// .setAccessToken(entity.getAuthToken())
+//
.setAuthCredentialType(AuthCredentialType.AGENT_ACCOUNT_CREDENTIAL)
+// .setAuthenticatedUser(AuthenticatedUser.newBuilder()
+// .setUsername(entity.getOwnerId())
+// .setTenantId(entity.getTenantId())
+// .build())
+// .build();
+// FindTransferMappingsRequest request =
FindTransferMappingsRequest.newBuilder()
+// .setAuthToken(serviceAuthToken)
+// .build();
+// FindTransferMappingsResponse response =
storagePreferenceServiceBlockingStub.getTransferMappings(request);
+// List<TransferMapping> transferMappingList =
response.getMappingsList();
+// AtomicReference<String> storagePreferenceId = new
AtomicReference<>(null);
+// if (!transferMappingList.isEmpty()) {
+// transferMappingList.forEach(transferMapping -> {
+// if
(transferMapping.getDestinationStoragePreference().getStorageCase()
+//
.equals(AnyStoragePreference.StorageCase.SSH_STORAGE_PREFERENCE)) {
+// if
(transferMapping.getDestinationStoragePreference().getSshStoragePreference()
+// .getStorage().getHostName().equals(hostname)) {
+// storagePreferenceId
+//
.set(transferMapping.getDestinationStoragePreference()
+//
.getSshStoragePreference().getStoragePreferenceId());
+// }
+// }
+// });
+// }
+// return Optional.ofNullable(storagePreferenceId.get());
+// }
public Optional<GenericResource>
createResource(DataOrchestratorEventRepository repository,
DataOrchestratorEntity entity,
diff --git
a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/processor/OutboundEventProcessor.java
b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/processor/OutboundEventProcessor.java
index 82e554c..013ee59 100644
---
a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/processor/OutboundEventProcessor.java
+++
b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/processor/OutboundEventProcessor.java
@@ -89,7 +89,8 @@ public class OutboundEventProcessor implements
MessageProcessor<Configuration> {
String tail =
resourcePath.substring(resourcePath.indexOf(ownerId));
String[] collections = tail.split("/");
- Optional<String> optionalStorPref =
drmsConnector.getSourceStoragePreferenceId(entity, entity.getHostName());
+// Optional<String> optionalStorPref =
drmsConnector.getSourceStoragePreferenceId(entity, entity.getHostName());
+ Optional<String> optionalStorPref = null;
if (optionalStorPref.isEmpty()) {
entity.setEventStatus(EventStatus.ERRORED.name());
entity.setError("StoragePreference not found for host: " +
entity.getHostName());
diff --git
a/data-resource-management-service/drms-api/src/main/java/org/apache/airavata/drms/api/Client.java
b/data-resource-management-service/drms-api/src/main/java/org/apache/airavata/drms/api/Client.java
index e0e9428..9611fee 100644
---
a/data-resource-management-service/drms-api/src/main/java/org/apache/airavata/drms/api/Client.java
+++
b/data-resource-management-service/drms-api/src/main/java/org/apache/airavata/drms/api/Client.java
@@ -23,7 +23,7 @@ import io.grpc.ManagedChannelBuilder;
import org.apache.airavata.datalake.drms.DRMSServiceAuthToken;
import org.apache.airavata.datalake.drms.resource.GenericResource;
import org.apache.airavata.datalake.drms.storage.*;
-import
org.apache.airavata.datalake.drms.storage.preference.ssh.SSHStoragePreference;
+import org.apache.airavata.datalake.drms.storage.ssh.SSHStorage;
import org.apache.custos.clients.CustosClientProvider;
import org.apache.custos.identity.management.client.IdentityManagementClient;
import org.json.JSONObject;
@@ -55,7 +55,7 @@ public class Client {
// StorageCreateRequest request =
StorageCreateRequest.newBuilder().setAuthToken(authToken).
//
setStorage(AnyStorage.newBuilder().setSshStorage(SSHStorage.newBuilder()
-// .setStorageId("qwerft-rftgyhu-oplmnj")
+// .setStorageId("testing.com")
// .setHostName("localhost")
// .setPort(3565)
// .build())
@@ -116,31 +116,39 @@ public class Client {
TransferMapping transferMapping = TransferMapping.newBuilder()
.setUserId("[email protected]")
.setTransferScope(TransferScope.GLOBAL)
-
.setDestinationStoragePreference(AnyStoragePreference.newBuilder()
-
.setSshStoragePreference(SSHStoragePreference.newBuilder()
-
.setStoragePreferenceId("ssh_storage_preference").build()))
- .setSourceStoragePreference(AnyStoragePreference.newBuilder()
-
.setSshStoragePreference(SSHStoragePreference.newBuilder()
-
.setStoragePreferenceId("ssh_storage_preference_2").build()))
+ .setSourceStorage(AnyStorage
+ .newBuilder()
+
.setSshStorage(SSHStorage.newBuilder().setStorageId("testing.com")
+ .build())
+ .build())
+ .setDestinationStorage(AnyStorage
+ .newBuilder()
+
.setSshStorage(SSHStorage.newBuilder().setStorageId("qwerft-rftgyhu-oplmnj")
+ .build())
+ .build())
.build();
-
+//
CreateTransferMappingRequest request =
CreateTransferMappingRequest.newBuilder()
.setAuthToken(authToken)
.setTransferMapping(transferMapping)
.build();
+// resourceClient.createTransferMapping(request);
+//
FindTransferMappingsRequest findTransferMappingsRequest =
FindTransferMappingsRequest.newBuilder()
.setAuthToken(authToken)
.build();
-
+// resourceClient.getTransferMappings(findTransferMappingsRequest);
+//
DeleteTransferMappingRequest transferMappingRequest =
DeleteTransferMappingRequest.newBuilder()
.setAuthToken(authToken)
- .setId("ssh_storage_preference_2_ssh_storage_preference")
+ .setId("testing.com_qwerft-rftgyhu-oplmnj")
.build();
+ resourceClient.deleteTransferMappings(transferMappingRequest);
//
storagePreferenceServiceBlockingStub.deleteTransferMappings(transferMappingRequest);
- storagePreferenceServiceBlockingStub.createTransferMapping(request);
+// storagePreferenceServiceBlockingStub.createTransferMapping(request);
ResourceServiceGrpc.ResourceServiceBlockingStub
resourceServiceBlockingStub = ResourceServiceGrpc.newBlockingStub(channel);
// ResourceSearchQuery query =
ResourceSearchQuery.newBuilder().setField("type").setValue("COLLECTION").build();
diff --git
a/data-resource-management-service/drms-api/src/main/java/org/apache/airavata/drms/api/handlers/StoragePreferenceServiceHandler.java
b/data-resource-management-service/drms-api/src/main/java/org/apache/airavata/drms/api/handlers/StoragePreferenceServiceHandler.java
index 7221e6d..57858f9 100644
---
a/data-resource-management-service/drms-api/src/main/java/org/apache/airavata/drms/api/handlers/StoragePreferenceServiceHandler.java
+++
b/data-resource-management-service/drms-api/src/main/java/org/apache/airavata/drms/api/handlers/StoragePreferenceServiceHandler.java
@@ -285,177 +285,8 @@ public class StoragePreferenceServiceHandler extends
StoragePreferenceServiceGrp
}
}
- @Override
- public void createTransferMapping(CreateTransferMappingRequest request,
StreamObserver<CreateTransferMappingResponse> responseObserver) {
- try {
- AuthenticatedUser authenticatedUser =
request.getAuthToken().getAuthenticatedUser();
- AnyStoragePreference sourceStoragePreference =
request.getTransferMapping().getSourceStoragePreference();
- AnyStoragePreference destinationStoragePreference =
request.getTransferMapping().getDestinationStoragePreference();
- String sourceId = getStorageId(sourceStoragePreference);
- String destinationId = getStorageId(destinationStoragePreference);
-
- TransferScope scope =
request.getTransferMapping().getTransferScope();
- Map<String, Object> properties = new HashMap<>();
- Map<String, Object> props = new HashMap<>();
- props.put("tenantId", authenticatedUser.getTenantId());
- props.put("owner", authenticatedUser.getUsername());
- props.put("srcStoragePreferenceId", sourceId);
- props.put("dstStoragePreferenceId", destinationId);
- String entityId = sourceId + "_" + destinationId;
- if (scope.equals(TransferScope.GLOBAL)) {
- props.put("scope", TransferScope.GLOBAL.name());
- } else {
- props.put("scope", TransferScope.USER.name());
- }
- properties.put("props", props);
- properties.put("tenantId", authenticatedUser.getTenantId());
- properties.put("entityId", entityId);
- properties.put("username", authenticatedUser.getUsername());
- properties.put("srcStoragePreferenceId", sourceId);
- properties.put("dstStoragePreferenceId", destinationId);
- properties.put("owner", authenticatedUser.getUsername());
-
-
- if (hasAccess(authenticatedUser.getUsername(),
authenticatedUser.getTenantId(), sourceId) &&
- hasAccess(authenticatedUser.getUsername(),
authenticatedUser.getTenantId(), destinationId)) {
- String query = " Match (u:User), (srcSp:StoragePreference),
(dstSp:StoragePreference) where " +
- " u.username=$username AND u.tenantId=$tenantId AND " +
- "srcSp.storagePreferenceId=$srcStoragePreferenceId AND
" +
- "srcSp.tenantId = $tenantId AND
dstSp.storagePreferenceId=$dstStoragePreferenceId " +
- "AND dstSp.tenantId =$tenantId " +
- " Merge
(u)-[:HAS_TRANSFER_MAPPING]->(tm:TransferMapping{entityId:$entityId,
tenantId:$tenantId, " +
- "srcStoragePreferenceId:$srcStoragePreferenceId," +
-
"dstStoragePreferenceId:$dstStoragePreferenceId,owner:$owner}) set tm +=
$props" +
- " Merge (tm)<-[:TRANSFER_OUT]-(srcSp)" +
- " Merge (tm)-[:TRANSFER_IN]->(dstSp) return (tm)";
- this.neo4JConnector.runTransactionalQuery(properties, query);
-
- String searchQuery = " Match (srcStr:Storage)<-[:CHILD_OF]-" +
-
"(srcSp:StoragePreference)-[:TRANSFER_OUT]->(tm:TransferMapping)" +
-
"-[:TRANSFER_IN]->(dstSp:StoragePreference)-[:CHILD_OF]->(dstStr:Storage)
where " +
- " tm.entityId=$entityId AND tm.tenantId=$tenantId
return srcStr, srcSp, dstStr, dstSp, tm";
- List<Record> records =
this.neo4JConnector.searchNodes(properties, searchQuery);
- if (!records.isEmpty()) {
- List<TransferMapping> transferMappings =
TransferMappingDeserializer.deserializeList(records);
- if (!transferMappings.isEmpty()) {
- CreateTransferMappingResponse response =
CreateTransferMappingResponse
- .newBuilder()
- .setTransferMapping(transferMappings.get(0))
- .build();
- responseObserver.onNext(response);
- responseObserver.onCompleted();
- } else {
- String msg = "Errored while creating transfer mapping;
Message:";
- logger.error("Errored while creating transfer mapping;
Message:");
-
responseObserver.onError(io.grpc.Status.INTERNAL.withDescription(msg).asRuntimeException());
- }
- } else {
- String msg = "Errored while creating transfer mapping;
Message:";
- logger.error("Errored while creating transfer mapping;
Message:");
-
responseObserver.onError(io.grpc.Status.INTERNAL.withDescription(msg).asRuntimeException());
- }
- } else {
- String msg = "User does not have permission to create mapping
";
- logger.error("User does not have permission to create mapping
");
-
responseObserver.onError(Status.PERMISSION_DENIED.withDescription(msg).asRuntimeException());
-
- }
- } catch (Exception e) {
- String msg = "Errored while creating transfer mapping; Message:" +
e.getMessage();
- logger.error("Errored while creating transfer mapping; Message:
{}", e.getMessage(), e);
-
responseObserver.onError(io.grpc.Status.INTERNAL.withDescription(msg).asRuntimeException());
- }
- }
-
- @Override
- public void getTransferMappings(FindTransferMappingsRequest request,
StreamObserver<FindTransferMappingsResponse> responseObserver) {
- try {
- List<TransferMapping> transferMappings = new ArrayList<>();
- AuthenticatedUser authenticatedUser =
request.getAuthToken().getAuthenticatedUser();
- Map<String, Object> properties = new HashMap<>();
- properties.put("username", authenticatedUser.getUsername());
- properties.put("tenantId", authenticatedUser.getTenantId());
- properties.put("scope", TransferScope.USER.name());
- String query = " MATCH
(u:User)-[:HAS_TRANSFER_MAPPING]->(t:TransferMapping{scope:$scope}) where
u.username = $username AND u.tenantId = $tenantId" +
- " Match
(srcStr:Storage)<-[:CHILD_OF]-(srcSp:StoragePreference)-[:TRANSFER_OUT]->(t)-[:TRANSFER_IN]->(dstSp:StoragePreference)-[:CHILD_OF]->(dstStr:Storage)"
+
- " return srcStr, srcSp, dstStr, dstSp, t";
- List<Record> records = this.neo4JConnector.searchNodes(properties,
query);
- properties.put("scope", TransferScope.GLOBAL.name());
- String queryFetchGlobal = "Match (srcStr:Storage)<-[:CHILD_OF]-" +
-
"(srcSp:StoragePreference)-[:TRANSFER_OUT]->(t:TransferMapping{scope:$scope,
tenantId:$tenantId})-[:TRANSFER_IN]->(dstSp:StoragePreference)-[:CHILD_OF]->(dstStr:Storage)"
+
- " return srcStr, srcSp, dstStr, dstSp, t";
- List<Record> globalRecords =
this.neo4JConnector.searchNodes(properties, queryFetchGlobal);
- if (!records.isEmpty()) {
- transferMappings =
TransferMappingDeserializer.deserializeList(records);
- }
- if (!globalRecords.isEmpty()) {
-
transferMappings.addAll(TransferMappingDeserializer.deserializeList(globalRecords));
- }
- FindTransferMappingsResponse findTransferMappingsResponse =
FindTransferMappingsResponse
- .newBuilder()
- .addAllMappings(transferMappings)
- .build();
- responseObserver.onNext(findTransferMappingsResponse);
- responseObserver.onCompleted();
-
- } catch (Exception ex) {
- String msg = "Errored while fetching transfer mappings, Message:"
+ ex.getMessage();
- logger.error("Errored while fetching transfer mappings, Message:
{}", ex.getMessage(), ex);
-
responseObserver.onError(io.grpc.Status.INTERNAL.withDescription(msg).asRuntimeException());
- }
- }
-
- @Override
- public void deleteTransferMappings(DeleteTransferMappingRequest request,
StreamObserver<Empty> responseObserver) {
- try {
- AuthenticatedUser authenticatedUser =
request.getAuthToken().getAuthenticatedUser();
- String transferMappingId = request.getId();
- Map<String, Object> properties = new HashMap<>();
- properties.put("username", authenticatedUser.getUsername());
- properties.put("tenantId", authenticatedUser.getTenantId());
- properties.put("entityId", transferMappingId);
- String query = " MATCH (u:User)-[:HAS_TRANSFER_MAPPING]->" +
- "(t:TransferMapping{entityId:$entityId})" +
- " where u.username = $username AND u.tenantId = $tenantId
detach delete t";
- this.neo4JConnector.runTransactionalQuery(properties, query);
- responseObserver.onNext(Empty.newBuilder().build());
- responseObserver.onCompleted();
- } catch (Exception ex) {
- String msg = "Errored while delete transfer mappings, Message:" +
ex.getMessage();
- logger.error("Errored while delete transfer mappings, Message:
{}", ex.getMessage(), ex);
-
responseObserver.onError(io.grpc.Status.INTERNAL.withDescription(msg).asRuntimeException());
- }
- }
- private boolean hasAccess(String username, String tenantId, String
storagePrefId) throws Exception {
- Map<String, Object> userProps = new HashMap<>();
- userProps.put("username", username);
- userProps.put("tenantId", tenantId);
- userProps.put("entityId", storagePrefId);
- List<Record> records = this.neo4JConnector.searchNodes(userProps,
- " MATCH (u:User) where u.username = $username AND u.tenantId =
$tenantId" +
- " OPTIONAL MATCH
(u)<-[:SHARED_WITH]-(s1:Storage)<-[:CHILD_OF]->(sp1:StoragePreference{entityId:$entityId})"
+
- " OPTIONAL MATCH
(cg:Group)-[:CHILD_OF*]->(g:Group)<-[:MEMBER_OF]-(u)" +
- " OPTIONAL MATCH
(sp2:StoragePreference{entityId:$entityId})-[:CHILD_OF]->(s2:Storage)-[:SHARED_WITH]->(cg)
" +
- " OPTIONAL MATCH (sp3:StoragePreference
{entityId:$entityId})-[:CHILD_OF]->(s3:Storage)-[:SHARED_WITH]->(g)" +
- " OPTIONAL MATCH
(s4:Storage)<-[:CHILD_OF]->(sp4:StoragePreference{entityId:$entityId})-[:SHARED_WITH]->(u)"
+
- " OPTIONAL MATCH
(s5:Storage)<-[:CHILD_OF]->(sp5:StoragePreference{entityId:$entityId})-[:SHARED_WITH]->(cg)"
+
- " OPTIONAL MATCH
(s6:Storage)<-[:CHILD_OF]->(sp6:StoragePreference{entityId:$entityId})-[:SHARED_WITH]->(g)"
+
- " return distinct s1, sp1, s2, sp2, s3, sp3, s4,sp4,
s5,sp5, s6,sp6");
- if (!records.isEmpty()) {
- return true;
- }
- return false;
- }
- private String getStorageId(AnyStoragePreference storage) {
- if (storage.getStorageCase()
-
.equals(AnyStoragePreference.StorageCase.S3_STORAGE_PREFERENCE)) {
- return storage.getS3StoragePreference().getStoragePreferenceId();
- } else {
- return storage.getSshStoragePreference().getStoragePreferenceId();
- }
- }
}
diff --git
a/data-resource-management-service/drms-api/src/main/java/org/apache/airavata/drms/api/handlers/StorageServiceHandler.java
b/data-resource-management-service/drms-api/src/main/java/org/apache/airavata/drms/api/handlers/StorageServiceHandler.java
index 14abb21..a9dc2d0 100644
---
a/data-resource-management-service/drms-api/src/main/java/org/apache/airavata/drms/api/handlers/StorageServiceHandler.java
+++
b/data-resource-management-service/drms-api/src/main/java/org/apache/airavata/drms/api/handlers/StorageServiceHandler.java
@@ -17,6 +17,7 @@
package org.apache.airavata.drms.api.handlers;
import com.google.protobuf.Empty;
+import io.grpc.Status;
import io.grpc.stub.StreamObserver;
import org.apache.airavata.datalake.drms.AuthenticatedUser;
import org.apache.airavata.datalake.drms.storage.*;
@@ -24,6 +25,7 @@ import org.apache.airavata.drms.api.utils.CustosUtils;
import org.apache.airavata.drms.core.Neo4JConnector;
import org.apache.airavata.drms.core.constants.StorageConstants;
import org.apache.airavata.drms.core.deserializer.AnyStorageDeserializer;
+import org.apache.airavata.drms.core.deserializer.TransferMappingDeserializer;
import org.apache.airavata.drms.core.serializer.AnyStorageSerializer;
import org.apache.custos.clients.CustosClientProvider;
import org.lognet.springboot.grpc.GRpcService;
@@ -32,6 +34,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -187,4 +190,178 @@ public class StorageServiceHandler extends
StorageServiceGrpc.StorageServiceImpl
responseObserver.onError(io.grpc.Status.INTERNAL.withDescription(msg).asRuntimeException());
}
}
+
+
+ @Override
+ public void createTransferMapping(CreateTransferMappingRequest request,
StreamObserver<CreateTransferMappingResponse> responseObserver) {
+ try {
+ AuthenticatedUser authenticatedUser =
request.getAuthToken().getAuthenticatedUser();
+ AnyStorage sourceStoragePreference =
request.getTransferMapping().getSourceStorage();
+ AnyStorage destinationStoragePreference =
request.getTransferMapping().getDestinationStorage();
+ String sourceId = getStorageId(sourceStoragePreference);
+ String destinationId = getStorageId(destinationStoragePreference);
+
+ TransferScope scope =
request.getTransferMapping().getTransferScope();
+ Map<String, Object> properties = new HashMap<>();
+ Map<String, Object> props = new HashMap<>();
+ props.put("tenantId", authenticatedUser.getTenantId());
+ props.put("owner", authenticatedUser.getUsername());
+ props.put("srcStorageId", sourceId);
+ props.put("dstStorageId", destinationId);
+ String entityId = sourceId + "_" + destinationId;
+ if (scope.equals(TransferScope.GLOBAL)) {
+ props.put("scope", TransferScope.GLOBAL.name());
+ } else {
+ props.put("scope", TransferScope.USER.name());
+ }
+ properties.put("props", props);
+ properties.put("tenantId", authenticatedUser.getTenantId());
+ properties.put("entityId", entityId);
+ properties.put("username", authenticatedUser.getUsername());
+ properties.put("srcStorageId", sourceId);
+ properties.put("dstStorageId", destinationId);
+ properties.put("owner", authenticatedUser.getUsername());
+
+
+ if (hasAccess(authenticatedUser.getUsername(),
authenticatedUser.getTenantId(), sourceId) &&
+ hasAccess(authenticatedUser.getUsername(),
authenticatedUser.getTenantId(), destinationId)) {
+ String query = " Match (u:User), (srcSp:Storage),
(dstSp:Storage) where " +
+ " u.username=$username AND u.tenantId=$tenantId AND " +
+ "srcSp.storageId=$srcStorageId AND " +
+ "srcSp.tenantId = $tenantId AND
dstSp.storageId=$dstStorageId " +
+ "AND dstSp.tenantId =$tenantId " +
+ " Merge
(u)-[:HAS_TRANSFER_MAPPING]->(tm:TransferMapping{entityId:$entityId,
tenantId:$tenantId, " +
+ "srcStorageId:$srcStorageId," +
+ "dstStorageId:$dstStorageId,owner:$owner}) set tm +=
$props" +
+ " Merge (tm)<-[:TRANSFER_OUT]-(srcSp)" +
+ " Merge (tm)-[:TRANSFER_IN]->(dstSp) return (tm)";
+ this.neo4JConnector.runTransactionalQuery(properties, query);
+
+ String searchQuery = " Match
(srcStr:Storage)-[:TRANSFER_OUT]->(tm:TransferMapping)" +
+ "-[:TRANSFER_IN]->(dstStr:Storage) where " +
+ " tm.entityId=$entityId AND tm.tenantId=$tenantId
return srcStr, dstStr, tm";
+ List<Record> records =
this.neo4JConnector.searchNodes(properties, searchQuery);
+ if (!records.isEmpty()) {
+ List<TransferMapping> transferMappings =
TransferMappingDeserializer.deserializeList(records);
+ if (!transferMappings.isEmpty()) {
+ CreateTransferMappingResponse response =
CreateTransferMappingResponse
+ .newBuilder()
+ .setTransferMapping(transferMappings.get(0))
+ .build();
+ responseObserver.onNext(response);
+ responseObserver.onCompleted();
+ } else {
+ String msg = "Errored while creating transfer mapping;
Message:";
+ logger.error("Errored while creating transfer mapping;
Message:");
+
responseObserver.onError(io.grpc.Status.INTERNAL.withDescription(msg).asRuntimeException());
+ }
+ } else {
+ String msg = "Errored while creating transfer mapping;
Message:";
+ logger.error("Errored while creating transfer mapping;
Message:");
+
responseObserver.onError(io.grpc.Status.INTERNAL.withDescription(msg).asRuntimeException());
+ }
+ } else {
+ String msg = "User does not have permission to create mapping
";
+ logger.error("User does not have permission to create mapping
");
+
responseObserver.onError(Status.PERMISSION_DENIED.withDescription(msg).asRuntimeException());
+
+ }
+ } catch (Exception e) {
+ String msg = "Errored while creating transfer mapping; Message:" +
e.getMessage();
+ logger.error("Errored while creating transfer mapping; Message:
{}", e.getMessage(), e);
+
responseObserver.onError(io.grpc.Status.INTERNAL.withDescription(msg).asRuntimeException());
+ }
+ }
+
+ @Override
+ public void getTransferMappings(FindTransferMappingsRequest request,
StreamObserver<FindTransferMappingsResponse> responseObserver) {
+ try {
+ List<TransferMapping> transferMappings = new ArrayList<>();
+ AuthenticatedUser authenticatedUser =
request.getAuthToken().getAuthenticatedUser();
+ Map<String, Object> properties = new HashMap<>();
+ properties.put("username", authenticatedUser.getUsername());
+ properties.put("tenantId", authenticatedUser.getTenantId());
+ properties.put("scope", TransferScope.USER.name());
+ String query = " MATCH
(u:User)-[:HAS_TRANSFER_MAPPING]->(t:TransferMapping{scope:$scope}) where
u.username = $username AND u.tenantId = $tenantId" +
+ " Match
(srcStr:Storage)-[:TRANSFER_OUT]->(t)-[:TRANSFER_IN]->(dstStr:Storage)" +
+ " return srcStr, dstStr, t";
+ List<Record> records = this.neo4JConnector.searchNodes(properties,
query);
+ properties.put("scope", TransferScope.GLOBAL.name());
+ String queryFetchGlobal = "Match
(srcStr:Storage)<-[:TRANSFER_OUT]->(t:TransferMapping{scope:$scope,
tenantId:$tenantId})-[:TRANSFER_IN]->(dstStr:Storage)" +
+ " return srcStr, dstStr, t";
+ List<Record> globalRecords =
this.neo4JConnector.searchNodes(properties, queryFetchGlobal);
+ if (!records.isEmpty()) {
+ transferMappings =
TransferMappingDeserializer.deserializeList(records);
+ }
+ if (!globalRecords.isEmpty()) {
+
transferMappings.addAll(TransferMappingDeserializer.deserializeList(globalRecords));
+ }
+ FindTransferMappingsResponse findTransferMappingsResponse =
FindTransferMappingsResponse
+ .newBuilder()
+ .addAllMappings(transferMappings)
+ .build();
+ responseObserver.onNext(findTransferMappingsResponse);
+ responseObserver.onCompleted();
+
+ } catch (Exception ex) {
+ String msg = "Errored while fetching transfer mappings, Message:"
+ ex.getMessage();
+ logger.error("Errored while fetching transfer mappings, Message:
{}", ex.getMessage(), ex);
+
responseObserver.onError(io.grpc.Status.INTERNAL.withDescription(msg).asRuntimeException());
+ }
+ }
+
+ @Override
+ public void deleteTransferMappings(DeleteTransferMappingRequest request,
StreamObserver<Empty> responseObserver) {
+ try {
+ AuthenticatedUser authenticatedUser =
request.getAuthToken().getAuthenticatedUser();
+ String transferMappingId = request.getId();
+ Map<String, Object> properties = new HashMap<>();
+ properties.put("username", authenticatedUser.getUsername());
+ properties.put("tenantId", authenticatedUser.getTenantId());
+ properties.put("entityId", transferMappingId);
+ String query = " MATCH (u:User)-[:HAS_TRANSFER_MAPPING]->" +
+ "(t:TransferMapping{entityId:$entityId})" +
+ " where u.username = $username AND u.tenantId = $tenantId
detach delete t";
+ this.neo4JConnector.runTransactionalQuery(properties, query);
+ responseObserver.onNext(Empty.newBuilder().build());
+ responseObserver.onCompleted();
+
+ } catch (Exception ex) {
+ String msg = "Errored while delete transfer mappings, Message:" +
ex.getMessage();
+ logger.error("Errored while delete transfer mappings, Message:
{}", ex.getMessage(), ex);
+
responseObserver.onError(io.grpc.Status.INTERNAL.withDescription(msg).asRuntimeException());
+ }
+ }
+
+
+ private String getStorageId(AnyStorage storage) {
+ if (storage.getStorageCase()
+ .equals(AnyStorage.StorageCase.S3_STORAGE)) {
+ return storage.getS3Storage().getStorageId();
+ } else {
+ return storage.getSshStorage().getStorageId();
+ }
+ }
+
+ private boolean hasAccess(String username, String tenantId, String
storageId) throws Exception {
+ Map<String, Object> userProps = new HashMap<>();
+ userProps.put("username", username);
+ userProps.put("tenantId", tenantId);
+ userProps.put("entityId", storageId);
+
+ List<Record> records = this.neo4JConnector.searchNodes(userProps,
+ " MATCH (u:User) where u.username = $username AND u.tenantId =
$tenantId" +
+ " OPTIONAL MATCH
(u)<-[:SHARED_WITH]-(s1:Storage{entityId:$entityId})<-[:CHILD_OF]->(sp1:StoragePreference)"
+
+ " OPTIONAL MATCH
(cg:Group)-[:CHILD_OF*]->(g:Group)<-[:MEMBER_OF]-(u)" +
+ " OPTIONAL MATCH
(sp2:StoragePreference)-[:CHILD_OF]->(s2:Storage{entityId:$entityId})-[:SHARED_WITH]->(cg)
" +
+ " OPTIONAL MATCH (sp3:StoragePreference
)-[:CHILD_OF]->(s3:Storage{entityId:$entityId})-[:SHARED_WITH]->(g)" +
+ " OPTIONAL MATCH
(s4:Storage{entityId:$entityId})<-[:CHILD_OF]->(sp4:StoragePreference)-[:SHARED_WITH]->(u)"
+
+ " OPTIONAL MATCH
(s5:Storage{entityId:$entityId})<-[:CHILD_OF]->(sp5:StoragePreference)-[:SHARED_WITH]->(cg)"
+
+ " OPTIONAL MATCH
(s6:Storage{entityId:$entityId})<-[:CHILD_OF]->(sp6:StoragePreference)-[:SHARED_WITH]->(g)"
+
+ " return distinct s1, sp1, s2, sp2, s3, sp3, s4,sp4,
s5,sp5, s6,sp6");
+ if (!records.isEmpty()) {
+ return true;
+ }
+ return false;
+ }
}
diff --git
a/data-resource-management-service/drms-core/src/main/java/org/apache/airavata/drms/core/deserializer/TransferMappingDeserializer.java
b/data-resource-management-service/drms-core/src/main/java/org/apache/airavata/drms/core/deserializer/TransferMappingDeserializer.java
index 461bbe8..51e9a57 100644
---
a/data-resource-management-service/drms-core/src/main/java/org/apache/airavata/drms/core/deserializer/TransferMappingDeserializer.java
+++
b/data-resource-management-service/drms-core/src/main/java/org/apache/airavata/drms/core/deserializer/TransferMappingDeserializer.java
@@ -1,7 +1,6 @@
package org.apache.airavata.drms.core.deserializer;
import org.apache.airavata.datalake.drms.storage.AnyStorage;
-import org.apache.airavata.datalake.drms.storage.AnyStoragePreference;
import org.apache.airavata.datalake.drms.storage.TransferMapping;
import org.apache.airavata.datalake.drms.storage.TransferScope;
import org.neo4j.driver.Record;
@@ -20,29 +19,22 @@ public class TransferMappingDeserializer {
InternalRecord internalRecord = (InternalRecord) record;
List<Value> values = internalRecord.values();
- if (values.size() == 5) {
+ if (values.size() == 3) {
Value srcStr = values.get(0);
- Value srcSpr = values.get(1);
- Value dstStr = values.get(2);
- Value dstSp = values.get(3);
- Value tm = values.get(4);
+ Value dstStr = values.get(1);
+ Value tm = values.get(2);
- if (!srcStr.isNull() && !srcSpr.isNull() && !tm.isNull()
- && !dstStr.isNull() && !dstSp.isNull()) {
+ if (!srcStr.isNull() && !tm.isNull()
+ && !dstStr.isNull()) {
AnyStorage storage =
AnyStorageDeserializer.deriveStorageFromMap(srcStr.asMap());
- AnyStoragePreference srcPreference =
AnyStoragePreferenceDeserializer
- .deriveStoragePrefFromMap(srcSpr.asMap(), storage);
-
AnyStorage dstStorage =
AnyStorageDeserializer.deriveStorageFromMap(dstStr.asMap());
- AnyStoragePreference dstPreference =
AnyStoragePreferenceDeserializer
- .deriveStoragePrefFromMap(dstSp.asMap(),
dstStorage);
Map<String, Object> map = tm.asMap();
TransferMapping transferMapping =
TransferMapping.newBuilder()
.setTransferScope(TransferScope.valueOf(map.get("scope").toString()))
.setId(map.get("entityId").toString())
- .setSourceStoragePreference(srcPreference)
- .setDestinationStoragePreference(dstPreference)
+ .setSourceStorage(storage)
+ .setDestinationStorage(dstStorage)
.setUserId(map.get("owner").toString())
.build();
diff --git
a/data-resource-management-service/drms-rest-proxy/src/main/resources/drms.pb
b/data-resource-management-service/drms-rest-proxy/src/main/resources/drms.pb
index 21a39a1..0227833 100644
Binary files
a/data-resource-management-service/drms-rest-proxy/src/main/resources/drms.pb
and
b/data-resource-management-service/drms-rest-proxy/src/main/resources/drms.pb
differ
diff --git
a/data-resource-management-service/drms-stubs/src/main/proto/preference/StoragePreferenceService.proto
b/data-resource-management-service/drms-stubs/src/main/proto/preference/StoragePreferenceService.proto
index 6c2c6ef..0b0bad2 100644
---
a/data-resource-management-service/drms-stubs/src/main/proto/preference/StoragePreferenceService.proto
+++
b/data-resource-management-service/drms-stubs/src/main/proto/preference/StoragePreferenceService.proto
@@ -81,43 +81,9 @@ message StoragePreferenceSearchResponse {
repeated AnyStoragePreference storages_preference = 2;
}
-enum TransferScope {
- UNKNOWN = 0;
- GLOBAL = 1;
- USER = 2;
-}
-
-message TransferMapping {
- string id = 1;
- string user_id = 2;
- AnyStoragePreference source_storage_preference = 3;
- AnyStoragePreference destination_storage_preference = 4;
- TransferScope transfer_scope = 5;
-}
-
-message CreateTransferMappingRequest {
- org.apache.airavata.datalake.drms.DRMSServiceAuthToken auth_token = 1;
- TransferMapping transfer_mapping = 2;
-}
-message CreateTransferMappingResponse {
- TransferMapping transfer_mapping = 1;
-}
-message FindTransferMappingsRequest {
- org.apache.airavata.datalake.drms.DRMSServiceAuthToken auth_token = 1;
- string id = 2;
- TransferScope transfer_scope = 3;
-}
-message FindTransferMappingsResponse {
- repeated TransferMapping mappings = 1;
-}
-
-message DeleteTransferMappingRequest {
- org.apache.airavata.datalake.drms.DRMSServiceAuthToken auth_token = 1;
- string id = 2;
-}
service StoragePreferenceService {
@@ -152,21 +118,5 @@ service StoragePreferenceService {
};
}
- rpc createTransferMapping (CreateTransferMappingRequest) returns
(CreateTransferMappingResponse) {
- option (google.api.http) = {
- post: "/v1.0/api/drms/storagePreference/transferMapping"
- };
- }
- rpc getTransferMappings (FindTransferMappingsRequest) returns
(FindTransferMappingsResponse) {
- option (google.api.http) = {
- get: "/v1.0/api/drms/storagePreference/transferMapping"
- };
- }
-
- rpc deleteTransferMappings (DeleteTransferMappingRequest) returns
(google.protobuf.Empty) {
- option (google.api.http) = {
- delete: "/v1.0/api/drms/storagePreference/transferMapping"
- };
- }
}
\ No newline at end of file
diff --git
a/data-resource-management-service/drms-stubs/src/main/proto/storage/StorageService.proto
b/data-resource-management-service/drms-stubs/src/main/proto/storage/StorageService.proto
index abab38a..3f5b3a2 100644
---
a/data-resource-management-service/drms-stubs/src/main/proto/storage/StorageService.proto
+++
b/data-resource-management-service/drms-stubs/src/main/proto/storage/StorageService.proto
@@ -88,6 +88,45 @@ message AddStorageMetadataRequest {
string value = 4;
}
+// Scope value carried by TransferMapping.transfer_scope and accepted as a filter
+// field in FindTransferMappingsRequest.
+// NOTE(review): the exact meaning of GLOBAL vs USER is not defined in this file;
+// confirm against the service handler.
+enum TransferScope {
+ UNKNOWN = 0;
+ GLOBAL = 1;
+ USER = 2;
+}
+
+// A mapping between a source storage and a destination storage.
+// TransferMappingDeserializer fills id from the mapping node's "entityId"
+// property, user_id from its "owner" property and transfer_scope from its
+// "scope" property.
+message TransferMapping {
+ string id = 1;
+ string user_id = 2;
+ // Storage on the source side of the mapping.
+ AnyStorage source_storage = 3;
+ // Storage on the destination side of the mapping.
+ AnyStorage destination_storage = 4;
+ TransferScope transfer_scope = 5;
+}
+
+
+// Request message of the createTransferMapping rpc.
+message CreateTransferMappingRequest {
+ // Auth token of the caller.
+ org.apache.airavata.datalake.drms.DRMSServiceAuthToken auth_token = 1;
+ // Mapping to create.
+ TransferMapping transfer_mapping = 2;
+}
+
+// Response message of the createTransferMapping rpc.
+// NOTE(review): presumably echoes the mapping as stored; confirm in the handler.
+message CreateTransferMappingResponse {
+ TransferMapping transfer_mapping = 1;
+}
+
+// Request message of the getTransferMappings rpc.
+// NOTE(review): how id and transfer_scope are applied as filters is not visible
+// in this file; confirm in the handler.
+message FindTransferMappingsRequest {
+ // Auth token of the caller.
+ org.apache.airavata.datalake.drms.DRMSServiceAuthToken auth_token = 1;
+ string id = 2;
+ TransferScope transfer_scope = 3;
+}
+
+// Response message of the getTransferMappings rpc: zero or more mappings.
+message FindTransferMappingsResponse {
+ repeated TransferMapping mappings = 1;
+}
+
+// Request message of the deleteTransferMappings rpc, which returns
+// google.protobuf.Empty.
+message DeleteTransferMappingRequest {
+ // Auth token of the caller.
+ org.apache.airavata.datalake.drms.DRMSServiceAuthToken auth_token = 1;
+ // NOTE(review): presumably the TransferMapping.id to delete; confirm in the handler.
+ string id = 2;
+}
+
service StorageService {
rpc fetchStorage (StorageFetchRequest) returns (StorageFetchResponse) {
@@ -125,4 +164,22 @@ service StorageService {
post: "/v1.0/api/drms/storage/metadata"
};
}
+
+ rpc createTransferMapping (CreateTransferMappingRequest) returns
(CreateTransferMappingResponse) {
+ option (google.api.http) = {
+ post: "/v1.0/api/drms/storagePreference/transferMapping"
+ };
+ }
+
+ rpc getTransferMappings (FindTransferMappingsRequest) returns
(FindTransferMappingsResponse) {
+ option (google.api.http) = {
+ get: "/v1.0/api/drms/storagePreference/transferMapping"
+ };
+ }
+
+ rpc deleteTransferMappings (DeleteTransferMappingRequest) returns
(google.protobuf.Empty) {
+ option (google.api.http) = {
+ delete: "/v1.0/api/drms/storagePreference/transferMapping"
+ };
+ }
}
\ No newline at end of file