This is an automated email from the ASF dual-hosted git repository. isjarana pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/airavata-data-lake.git
commit b6b5cc43be4931b427958e2863d6865b45b5c6ca Author: Dimuthu Wannipurage <[email protected]> AuthorDate: Wed Mar 24 13:03:53 2021 -0400 Implementing generic resource handler and metadata ingestion --- .../drms/api/handlers/ResourceServiceHandler.java | 99 +++++++++++++++++++++- .../handlers/StoragePreferenceServiceHandler.java | 6 +- .../drms/api/handlers/StorageServiceHandler.java | 2 + .../drms/core/constants/ResourceConstants.java | 21 +++++ .../deserializer/GenericResourceDeserializer.java | 89 +++++++++++++++++++ .../main/proto/resource/DRMSResourceService.proto | 35 +++++++- 6 files changed, 247 insertions(+), 5 deletions(-) diff --git a/data-resource-management-service/drms-api/src/main/java/org/apache/airavata/drms/api/handlers/ResourceServiceHandler.java b/data-resource-management-service/drms-api/src/main/java/org/apache/airavata/drms/api/handlers/ResourceServiceHandler.java index 4e34e7a..34efc05 100644 --- a/data-resource-management-service/drms-api/src/main/java/org/apache/airavata/drms/api/handlers/ResourceServiceHandler.java +++ b/data-resource-management-service/drms-api/src/main/java/org/apache/airavata/drms/api/handlers/ResourceServiceHandler.java @@ -17,15 +17,78 @@ package org.apache.airavata.drms.api.handlers; import com.google.protobuf.Empty; +import io.grpc.ManagedChannel; +import io.grpc.ManagedChannelBuilder; import io.grpc.stub.StreamObserver; +import org.apache.airavata.datalake.drms.DRMSServiceAuthToken; +import org.apache.airavata.datalake.drms.groups.FetchCurrentUserRequest; +import org.apache.airavata.datalake.drms.groups.FetchCurrentUserResponse; +import org.apache.airavata.datalake.drms.groups.GroupServiceGrpc; +import org.apache.airavata.datalake.drms.groups.User; +import org.apache.airavata.datalake.drms.resource.GenericResource; import org.apache.airavata.datalake.drms.storage.*; +import org.apache.airavata.drms.core.Neo4JConnector; +import org.apache.airavata.drms.core.constants.ResourceConstants; +import org.apache.airavata.drms.core.constants.StorageConstants; +import org.apache.airavata.drms.core.deserializer.AnyStoragePreferenceDeserializer; +import org.apache.airavata.drms.core.deserializer.GenericResourceDeserializer; import org.lognet.springboot.grpc.GRpcService; +import org.neo4j.driver.Record; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; + +import java.util.List; @GRpcService public class ResourceServiceHandler extends ResourceServiceGrpc.ResourceServiceImplBase { + + private static final Logger logger = LoggerFactory.getLogger(ResourceServiceHandler.class); + + @Autowired + private Neo4JConnector neo4JConnector; + + @org.springframework.beans.factory.annotation.Value("${group.service.host}") + private String groupServiceHost; + + @org.springframework.beans.factory.annotation.Value("${group.service.port}") + private int groupServicePort; + + private User getUser(DRMSServiceAuthToken authToken) { + ManagedChannel channel = ManagedChannelBuilder.forAddress(groupServiceHost, groupServicePort).usePlaintext().build(); + GroupServiceGrpc.GroupServiceBlockingStub groupClient = GroupServiceGrpc.newBlockingStub(channel); + FetchCurrentUserResponse userResponse = groupClient.fetchCurrentUser( + FetchCurrentUserRequest.newBuilder().setAuthToken(authToken).build()); + return userResponse.getUser(); + } + @Override public void fetchResource(ResourceFetchRequest request, StreamObserver<ResourceFetchResponse> responseObserver) { - super.fetchResource(request, responseObserver); + User callUser = getUser(request.getAuthToken()); + + // TODO review (u)-[r4:MEMBER_OF]->(g2:Group)<-[r5:SHARED_WITH]-(sp), + List<Record> records = this.neo4JConnector.searchNodes( + "MATCH (u:User)-[r1:MEMBER_OF]->(g:Group)<-[r2:SHARED_WITH]-(s:Storage)-[r3:HAS_PREFERENCE]->(sp:StoragePreference)-[r6:HAS_RESOURCE]->(res:Resource), " + + "(u)-[r7:MEMBER_OF]->(g3:Group)<-[r8:SHARED_WITH]-(res) " + + "where res.resourceId = '" + request.getResourceId() + "' and u.userId = '" + + callUser.getUserId() + "' return distinct res, sp, s"); + + if (!records.isEmpty()) { + try { + List<GenericResource> genericResourceList = GenericResourceDeserializer.deserializeList(records); + responseObserver.onNext(ResourceFetchResponse.newBuilder().setResource(genericResourceList.get(0)).build()); + responseObserver.onCompleted(); + } catch (Exception e) { + + logger.error("Errored while fetching resource with id {}", request.getResourceId(), e); + responseObserver.onError(new Exception("Errored while fetching resource with id " + + request.getResourceId() + ". Msg " + e.getMessage())); + } + } else { + logger.error("Could not find a generic resource with id {}", request.getResourceId()); + responseObserver.onError(new Exception("Could not find a generic resource with id " + + request.getResourceId())); + } } @Override @@ -45,6 +108,38 @@ public class ResourceServiceHandler extends ResourceServiceGrpc.ResourceServiceI @Override public void searchResource(ResourceSearchRequest request, StreamObserver<ResourceSearchResponse> responseObserver) { - super.searchResource(request, responseObserver); + User callUser = getUser(request.getAuthToken()); + + // TODO review (u)-[r4:MEMBER_OF]->(g2:Group)<-[r5:SHARED_WITH]-(sp), + List<Record> records = this.neo4JConnector.searchNodes( + "MATCH (u:User)-[r1:MEMBER_OF]->(g:Group)<-[r2:SHARED_WITH]-(s:Storage)-[r3:HAS_PREFERENCE]->(sp:StoragePreference)-[r6:HAS_RESOURCE]->(res:Resource), " + + "(u)-[r7:MEMBER_OF]->(g3:Group)<-[r8:SHARED_WITH]-(res) " + + "where u.userId = '" + callUser.getUserId() + "' return distinct res, sp, s"); + try { + List<GenericResource> genericResourceList = GenericResourceDeserializer.deserializeList(records); + ResourceSearchResponse.Builder builder = ResourceSearchResponse.newBuilder(); + builder.addAllResources(genericResourceList); + responseObserver.onNext(builder.build()); + responseObserver.onCompleted(); + + } catch (Exception e) { + logger.error("Errored while searching generic resources; Message: {}", e.getMessage(), e); + responseObserver.onError(e); + } + } + + @Override + public void addResourceMetadata(AddResourceMetadataRequest request, StreamObserver<Empty> responseObserver) { + User callUser = getUser(request.getAuthToken()); + this.neo4JConnector.createMetadataNode(ResourceConstants.RESOURCE_LABEL, "resourceId", + request.getResourceId(), callUser.getUserId(), + request.getMetadata().getKey(), request.getMetadata().getValue()); + responseObserver.onNext(Empty.getDefaultInstance()); + responseObserver.onCompleted(); + } + + @Override + public void fetchResourceMetadata(FetchResourceMetadataRequest request, StreamObserver<FetchResourceMetadataResponse> responseObserver) { + super.fetchResourceMetadata(request, responseObserver); } } diff --git a/data-resource-management-service/drms-api/src/main/java/org/apache/airavata/drms/api/handlers/StoragePreferenceServiceHandler.java b/data-resource-management-service/drms-api/src/main/java/org/apache/airavata/drms/api/handlers/StoragePreferenceServiceHandler.java index 3d7b44a..ab560c3 100644 --- a/data-resource-management-service/drms-api/src/main/java/org/apache/airavata/drms/api/handlers/StoragePreferenceServiceHandler.java +++ b/data-resource-management-service/drms-api/src/main/java/org/apache/airavata/drms/api/handlers/StoragePreferenceServiceHandler.java @@ -65,7 +65,8 @@ public class StoragePreferenceServiceHandler extends StoragePreferenceServiceGrp User callUser = getUser(request.getAuthToken()); List<Record> records = this.neo4JConnector.searchNodes( - "MATCH (u:User)-[r1:MEMBER_OF]->(g:Group)<-[r2:SHARED_WITH]-(s:Storage)-[r3:HAS_PREFERENCE]->(sp:StoragePreference) " + + "MATCH (u:User)-[r1:MEMBER_OF]->(g:Group)<-[r2:SHARED_WITH]-(s:Storage)-[r3:HAS_PREFERENCE]->(sp:StoragePreference), " + + "(u)-[r4:MEMBER_OF]->(g2:Group)<-[r5:SHARED_WITH]-(sp) " + "where sp.storagePreferenceId = '" + request.getStoragePreferenceId() + "' and u.userId = '" + callUser.getUserId() + "' return distinct sp, s"); @@ -108,7 +109,8 @@ public class StoragePreferenceServiceHandler extends StoragePreferenceServiceGrp User callUser = getUser(request.getAuthToken()); List<Record> records = this.neo4JConnector.searchNodes( - "MATCH (u:User)-[r1:MEMBER_OF]->(g:Group)<-[r2:SHARED_WITH]-(s:Storage)-[r3:HAS_PREFERENCE]->(sp:StoragePreference)" + + "MATCH (u:User)-[r1:MEMBER_OF]->(g:Group)<-[r2:SHARED_WITH]-(s:Storage)-[r3:HAS_PREFERENCE]->(sp:StoragePreference), " + + "(u)-[r4:MEMBER_OF]->(g2:Group)<-[r5:SHARED_WITH]-(sp)" + " where u.userId ='" + callUser.getUserId() + "' return distinct sp, s"); try { List<AnyStoragePreference> storagePrefList = AnyStoragePreferenceDeserializer.deserializeList(records); diff --git a/data-resource-management-service/drms-api/src/main/java/org/apache/airavata/drms/api/handlers/StorageServiceHandler.java b/data-resource-management-service/drms-api/src/main/java/org/apache/airavata/drms/api/handlers/StorageServiceHandler.java index a806e70..a00b7ca 100644 --- a/data-resource-management-service/drms-api/src/main/java/org/apache/airavata/drms/api/handlers/StorageServiceHandler.java +++ b/data-resource-management-service/drms-api/src/main/java/org/apache/airavata/drms/api/handlers/StorageServiceHandler.java @@ -113,6 +113,8 @@ public class StorageServiceHandler extends StorageServiceGrpc.StorageServiceImpl this.neo4JConnector.createMetadataNode(StorageConstants.STORAGE_LABEL, "storageId", request.getStorageId(), callUser.getUserId(), request.getKey(), request.getValue()); + responseObserver.onNext(Empty.getDefaultInstance()); + responseObserver.onCompleted(); } @Override diff --git a/data-resource-management-service/drms-core/src/main/java/org/apache/airavata/drms/core/constants/ResourceConstants.java b/data-resource-management-service/drms-core/src/main/java/org/apache/airavata/drms/core/constants/ResourceConstants.java new file mode 100644 index 0000000..2c7e0e2 --- /dev/null +++ b/data-resource-management-service/drms-core/src/main/java/org/apache/airavata/drms/core/constants/ResourceConstants.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.airavata.drms.core.constants; + +public class ResourceConstants { + public static final String RESOURCE_LABEL = "Resource"; +} diff --git a/data-resource-management-service/drms-core/src/main/java/org/apache/airavata/drms/core/deserializer/GenericResourceDeserializer.java b/data-resource-management-service/drms-core/src/main/java/org/apache/airavata/drms/core/deserializer/GenericResourceDeserializer.java new file mode 100644 index 0000000..c4f4d97 --- /dev/null +++ b/data-resource-management-service/drms-core/src/main/java/org/apache/airavata/drms/core/deserializer/GenericResourceDeserializer.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.airavata.drms.core.deserializer; + +import org.apache.airavata.datalake.drms.resource.GenericResource; +import org.apache.airavata.datalake.drms.storage.AnyStorage; +import org.apache.airavata.datalake.drms.storage.AnyStoragePreference; +import org.apache.airavata.drms.core.constants.ResourceConstants; +import org.apache.airavata.drms.core.constants.StorageConstants; +import org.apache.airavata.drms.core.constants.StoragePreferenceConstants; +import org.neo4j.driver.Record; +import org.neo4j.driver.Value; +import org.neo4j.driver.internal.InternalRecord; +import org.neo4j.driver.types.Node; +import org.springframework.beans.BeanWrapper; +import org.springframework.beans.PropertyAccessorFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +public class GenericResourceDeserializer { + + public static List<GenericResource> deserializeList(List<Record> neo4jRecords) throws Exception { + List<GenericResource> resourceList = new ArrayList<>(); + for (Record record : neo4jRecords) { + InternalRecord internalRecord = (InternalRecord) record; + List<Value> values = internalRecord.values(); + if (values.size() == 3) { + Value resourceValue = values.get(0); + Value prfValue = values.get(1); + Value stoValue = values.get(2); + Node resourceNode = resourceValue.asNode(); + Node prefNode = prfValue.asNode(); + Node stoNode = stoValue.asNode(); + if (resourceNode.hasLabel(ResourceConstants.RESOURCE_LABEL) && + prefNode.hasLabel(StoragePreferenceConstants.STORAGE_PREFERENCE_LABEL) && + stoNode.hasLabel(StorageConstants.STORAGE_LABEL)) { + + AnyStorage storage = AnyStorageDeserializer.deriveStorageFromMap(stoNode.asMap()); + AnyStoragePreference preference = AnyStoragePreferenceDeserializer.deriveStoragePrefFromMap( + prefNode.asMap(), storage); + GenericResource genericResource = deriveGenericResourceFromMap(resourceNode.asMap(), preference); + resourceList.add(genericResource); + } + } + } + return resourceList; + } + + public static GenericResource deriveGenericResourceFromMap(Map<String, Object> fixedMap, + AnyStoragePreference preference) throws Exception { + + GenericResource.Builder genericResourceBuilder = GenericResource.newBuilder(); + setObjectFieldsUsingMap(genericResourceBuilder, fixedMap); + switch (preference.getStorageCase()){ + case S3STORAGEPREFERENCE: + genericResourceBuilder.setS3Preference(preference.getS3StoragePreference()); + break; + case SSHSTORAGEPREFERENCE: + genericResourceBuilder.setSshPreference(preference.getSshStoragePreference()); + break; + } + + return genericResourceBuilder.build(); + } + + private static void setObjectFieldsUsingMap(Object target, Map<String, Object> values) { + for (String field :values.keySet()) { + BeanWrapper beanWrapper = PropertyAccessorFactory.forBeanPropertyAccess(target); + beanWrapper.setPropertyValue(field, values.get(field)); + } + } +} diff --git a/data-resource-management-service/drms-stubs/src/main/proto/resource/DRMSResourceService.proto b/data-resource-management-service/drms-stubs/src/main/proto/resource/DRMSResourceService.proto index 600974a..7251f12 100644 --- a/data-resource-management-service/drms-stubs/src/main/proto/resource/DRMSResourceService.proto +++ b/data-resource-management-service/drms-stubs/src/main/proto/resource/DRMSResourceService.proto @@ -72,9 +72,30 @@ message ResourceSearchRequest { message ResourceSearchResponse { org.apache.airavata.datalake.drms.DRMSServiceAuthToken authToken = 1; - repeated org.apache.airavata.datalake.drms.resource.GenericResource storagesPreference = 2; + repeated org.apache.airavata.datalake.drms.resource.GenericResource resources = 2; } +message Metadata { + string key = 1; + string value = 2; +} + +message AddResourceMetadataRequest { + org.apache.airavata.datalake.drms.DRMSServiceAuthToken authToken = 1; + string resourceId = 2; + Metadata metadata = 3; +} + +message FetchResourceMetadataRequest { + org.apache.airavata.datalake.drms.DRMSServiceAuthToken authToken = 1; + string resourceId = 2; +} + +message FetchResourceMetadataResponse { + repeated Metadata metadata = 1; +} + + service ResourceService { rpc fetchResource (ResourceFetchRequest) returns (ResourceFetchResponse) { @@ -106,4 +127,16 @@ service ResourceService { post: "/v1.0/api/drms/resource/searchPreference" }; } + + rpc addResourceMetadata (AddResourceMetadataRequest) returns (google.protobuf.Empty) { + option (google.api.http) = { + post: "/v1.0/api/drms/resource/metadata" + }; + } + + rpc fetchResourceMetadata (FetchResourceMetadataRequest) returns (FetchResourceMetadataResponse) { + option (google.api.http) = { + get: "/v1.0/api/drms/resource/metadata" + }; + } } \ No newline at end of file
