This is an automated email from the ASF dual-hosted git repository. isjarana pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/airavata-data-lake.git
commit 229140eedd92d1c162741ff1f891a2c45dfe3603 Author: Isuru Ranawaka <[email protected]> AuthorDate: Tue Jun 1 10:46:44 2021 -0400 custos data synchronizer --- .../apache/airavata/drms/core/Neo4JConnector.java | 32 ++++- .../drms/core/constants/UserAndGroupConstants.java | 6 + .../deserializer/UserAndGroupDeserializer.java | 51 +++++++ .../drms-custos-synchronizer/pom.xml | 77 ++++++++++ .../drms/custos/synchronizer/Configuration.java | 139 ++++++++++++++++++ .../custos/synchronizer/CustosSynchronizer.java | 60 ++++++++ .../airavata/drms/custos/synchronizer/Utils.java | 62 ++++++++ .../datafetcher/CustosDataFetchingJob.java | 43 ++++++ .../synchronizer/handlers/SharingHandler.java | 140 ++++++++++++++++++ .../synchronizer/handlers/UserAndGroupHandler.java | 156 +++++++++++++++++++++ .../src/main/resources/config.yml | 12 ++ .../src/main/resources/logback.xml | 45 ++++++ data-resource-management-service/pom.xml | 1 + 13 files changed, 823 insertions(+), 1 deletion(-) diff --git a/data-resource-management-service/drms-core/src/main/java/org/apache/airavata/drms/core/Neo4JConnector.java b/data-resource-management-service/drms-core/src/main/java/org/apache/airavata/drms/core/Neo4JConnector.java index 1d39ce4..96a283b 100644 --- a/data-resource-management-service/drms-core/src/main/java/org/apache/airavata/drms/core/Neo4JConnector.java +++ b/data-resource-management-service/drms-core/src/main/java/org/apache/airavata/drms/core/Neo4JConnector.java @@ -28,12 +28,24 @@ public class Neo4JConnector { private String userName; private String password; + private Driver driver; + + public Neo4JConnector() { + } + public Neo4JConnector(String uri, String userName, String password) { this.uri = uri; this.userName = userName; this.password = password; } + public void init(String uri, String userName, String password) { + this.uri = uri; + this.userName = userName; + this.password = password; + this.driver = GraphDatabase.driver(uri, AuthTokens.basic(userName, password)); + } + public List<Record> searchNodes(String query) { Driver driver = GraphDatabase.driver(uri, AuthTokens.basic(userName, password)); Session session = driver.session(); @@ -52,6 +64,22 @@ public class Neo4JConnector { tx.close(); } + public void runTransactionalQuery(Map<String, Object> parameters, String query) { + Session session = driver.session(); + Transaction tx = session.beginTransaction(); + Result result = tx.run(query, parameters); + tx.commit(); + tx.close(); + } + + public void runTransactionalQuery(String query) { + Session session = driver.session(); + Transaction tx = session.beginTransaction(); + Result result = tx.run(query); + tx.commit(); + tx.close(); + } + public void createMetadataNode(String parentLabel, String parentIdName, String parentIdValue, String userId, String key, String value) { Driver driver = GraphDatabase.driver(uri, AuthTokens.basic(userName, password)); @@ -59,8 +87,10 @@ public class Neo4JConnector { Transaction tx = session.beginTransaction(); tx.run("match (u:User)-[r1:MEMBER_OF]->(g:Group)<-[r2:SHARED_WITH]-(s:" + parentLabel + ") where u.userId='" + userId + "' and s." + parentIdName + "='" + parentIdValue + - "' merge (m:Metadata)<-[r3:HAS_METADATA]-(s) set m." + key + "='" + value + "' return m"); + "' merge (m:Metadata)<-[r3:HAS_METADATA]-(s) set m." + key + "='" + value + "' return m"); tx.commit(); tx.close(); } + + } diff --git a/data-resource-management-service/drms-core/src/main/java/org/apache/airavata/drms/core/constants/UserAndGroupConstants.java b/data-resource-management-service/drms-core/src/main/java/org/apache/airavata/drms/core/constants/UserAndGroupConstants.java new file mode 100644 index 0000000..a65b053 --- /dev/null +++ b/data-resource-management-service/drms-core/src/main/java/org/apache/airavata/drms/core/constants/UserAndGroupConstants.java @@ -0,0 +1,6 @@ +package org.apache.airavata.drms.core.constants; + +public class UserAndGroupConstants { + + public static final String USER_LABEL = "User"; +} diff --git a/data-resource-management-service/drms-core/src/main/java/org/apache/airavata/drms/core/deserializer/UserAndGroupDeserializer.java b/data-resource-management-service/drms-core/src/main/java/org/apache/airavata/drms/core/deserializer/UserAndGroupDeserializer.java new file mode 100644 index 0000000..aea14d8 --- /dev/null +++ b/data-resource-management-service/drms-core/src/main/java/org/apache/airavata/drms/core/deserializer/UserAndGroupDeserializer.java @@ -0,0 +1,51 @@ +package org.apache.airavata.drms.core.deserializer; + +import org.apache.airavata.datalake.drms.groups.User; +import org.apache.airavata.drms.core.constants.UserAndGroupConstants; +import org.neo4j.driver.Record; +import org.neo4j.driver.Value; +import org.neo4j.driver.internal.InternalRecord; +import org.neo4j.driver.types.Node; +import org.springframework.beans.BeanWrapper; +import org.springframework.beans.PropertyAccessorFactory; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class UserAndGroupDeserializer { + + + public static List<User> deserializeUserList(List<Record> neo4jRecords) throws Exception { + List<User> userList = new ArrayList<>(); + for (Record record : neo4jRecords) { + InternalRecord internalRecord = (InternalRecord) record; + List<Value> values = internalRecord.values(); + for (Value value : values) { + Node node = value.asNode(); + if (node.hasLabel(UserAndGroupConstants.USER_LABEL)) { + userList.add(deriveUserFromMap(node.asMap())); + } + } + } + return userList; + } + + public static User deriveUserFromMap(Map<String, Object> fixedMap) throws Exception { + + Map<String, Object> asMap = new HashMap<>(fixedMap); + User.Builder builder = User.newBuilder(); + asMap.remove(UserAndGroupConstants.USER_LABEL); + setObjectFieldsUsingMap(builder, asMap); + return builder.build(); + } + + + private static void setObjectFieldsUsingMap(Object target, Map<String, Object> values) { + for (String field : values.keySet()) { + BeanWrapper beanWrapper = PropertyAccessorFactory.forBeanPropertyAccess(target); + beanWrapper.setPropertyValue(field, values.get(field)); + } + } +} diff --git a/data-resource-management-service/drms-custos-synchronizer/pom.xml b/data-resource-management-service/drms-custos-synchronizer/pom.xml new file mode 100644 index 0000000..166cf3b --- /dev/null +++ b/data-resource-management-service/drms-custos-synchronizer/pom.xml @@ -0,0 +1,77 @@ +<?xml version="1.0" encoding="UTF-8"?> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <artifactId>data-resource-management-service</artifactId> + <groupId>org.apache.airavata.data.lake</groupId> + <version>0.01-SNAPSHOT</version> + </parent> + <artifactId>drms-custos-synchronizer</artifactId> + + <properties> + <maven.compiler.source>11</maven.compiler.source> + <maven.compiler.target>11</maven.compiler.target> + </properties> + + <dependencies> + <dependency> + <groupId>org.springframework.boot</groupId> + <artifactId>spring-boot-starter</artifactId> + <version>${spring.boot.data.jpa}</version> + </dependency> + + <dependency> + <groupId>net.sf.dozer</groupId> + <artifactId>dozer</artifactId> + <version>5.5.1</version> + </dependency> + <dependency> + <groupId>com.google.protobuf</groupId> + <artifactId>protobuf-java</artifactId> + <version>${protobuf.java}</version> + </dependency> + <dependency> + <groupId>io.grpc</groupId> + <artifactId>grpc-stub</artifactId> + <version>${io.grpc.version}</version> + </dependency> + <dependency> + <groupId>io.grpc</groupId> + <artifactId>grpc-protobuf</artifactId> + <version>${io.grpc.version}</version> + </dependency> + <dependency> + <groupId>io.grpc</groupId> + <artifactId>grpc-netty</artifactId> + <version>${io.grpc.version}</version> + </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>log4j-over-slf4j</artifactId> + <version>${log4j.over.slf4j}</version> + </dependency> + <dependency> + <groupId>org.yaml</groupId> + <artifactId>snakeyaml</artifactId> + <version>${yaml.version}</version> + </dependency> + <dependency> + <groupId>org.quartz-scheduler</groupId> + <artifactId>quartz</artifactId> + <version>2.3.2</version> + </dependency> + <dependency> + <groupId>org.apache.airavata.data.lake</groupId> + <artifactId>drms-core</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.custos</groupId> + <artifactId>custos-java-sdk</artifactId> + <version>${custos.clients.version}</version> + </dependency> + </dependencies> + +</project> \ No newline at end of file diff --git a/data-resource-management-service/drms-custos-synchronizer/src/main/java/org/apache/airavata/drms/custos/synchronizer/Configuration.java b/data-resource-management-service/drms-custos-synchronizer/src/main/java/org/apache/airavata/drms/custos/synchronizer/Configuration.java new file mode 100644 index 0000000..f302a98 --- /dev/null +++ b/data-resource-management-service/drms-custos-synchronizer/src/main/java/org/apache/airavata/drms/custos/synchronizer/Configuration.java @@ -0,0 +1,139 @@ +package org.apache.airavata.drms.custos.synchronizer; + +public class Configuration { + + private long pollingInterval; + private Custos custos; + private DataResourceManagementService dataResourceManagementService; + + public Configuration() { + + } + + public Custos getCustos() { + return custos; + } + + public void setCustos(Custos custos) { + this.custos = custos; + } + + public DataResourceManagementService getDataResourceManagementService() { + return dataResourceManagementService; + } + + public void setDataResourceManagementService(DataResourceManagementService dataResourceManagementService) { + this.dataResourceManagementService = dataResourceManagementService; + } + + public long getPollingInterval() { + return pollingInterval; + } + + public void setPollingInterval(long pollingInterval) { + this.pollingInterval = pollingInterval; + } + + public static class Custos { + + private String host; + private int port; + private String custosId; + private String custosSec; + private String[] tenantsToBeSynced; + + public Custos(String host, int port, String custosId, String custosSec, String[] tenantsToBeSynced) { + this.host = host; + this.port = port; + this.custosId = custosId; + this.custosSec = custosSec; + this.tenantsToBeSynced = tenantsToBeSynced; + } + + public Custos() { + + } + + public String getHost() { + return host; + } + + public void setHost(String host) { + this.host = host; + } + + public int getPort() { + return port; + } + + public void setPort(int port) { + this.port = port; + } + + public String getCustosId() { + return custosId; + } + + public void setCustosId(String custosId) { + this.custosId = custosId; + } + + public String getCustosSec() { + return custosSec; + } + + public void setCustosSec(String custosSec) { + this.custosSec = custosSec; + } + + public String[] getTenantsToBeSynced() { + return tenantsToBeSynced; + } + + public void setTenantsToBeSynced(String[] tenantsToBeSynced) { + this.tenantsToBeSynced = tenantsToBeSynced; + } + } + + public static class DataResourceManagementService { + + private String dbURI; + private String dbUser; + private String dbPassword; + + public DataResourceManagementService(String dbURI, String dbUser, String dbPassword) { + this.dbURI = dbURI; + this.dbUser = dbUser; + this.dbPassword = dbPassword; + } + + public DataResourceManagementService() { + } + + public String getDbURI() { + return dbURI; + } + + public void setDbURI(String dbURI) { + this.dbURI = dbURI; + } + + public String getDbUser() { + return dbUser; + } + + public void setDbUser(String dbUser) { + this.dbUser = dbUser; + } + + public String getDbPassword() { + return dbPassword; + } + + public void setDbPassword(String dbPassword) { + this.dbPassword = dbPassword; + } + } + + +} diff --git a/data-resource-management-service/drms-custos-synchronizer/src/main/java/org/apache/airavata/drms/custos/synchronizer/CustosSynchronizer.java b/data-resource-management-service/drms-custos-synchronizer/src/main/java/org/apache/airavata/drms/custos/synchronizer/CustosSynchronizer.java new file mode 100644 index 0000000..b19dbf3 --- /dev/null +++ b/data-resource-management-service/drms-custos-synchronizer/src/main/java/org/apache/airavata/drms/custos/synchronizer/CustosSynchronizer.java @@ -0,0 +1,60 @@ +package org.apache.airavata.drms.custos.synchronizer; + +import org.apache.airavata.drms.custos.synchronizer.datafetcher.CustosDataFetchingJob; +import org.quartz.*; +import org.quartz.impl.StdSchedulerFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.boot.CommandLineRunner; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +@SpringBootApplication +public class CustosSynchronizer implements CommandLineRunner { + + private static final Logger LOGGER = LoggerFactory.getLogger(CustosSynchronizer.class); + private static String configFilePath; + + + public static void main(String[] args) { + SpringApplication.run(CustosSynchronizer.class, args); + } + + @Override + public void run(String... args) throws Exception { + LOGGER.info("Starting Custos synchronizer ..."); + if (args.length > 0) { + configFilePath = args[0]; + } + configFilePath = "/Users/isururanawaka/Documents/Airavata_Repository/airavata-data-lake" + + "/data-resource-management-service/drms-custos-synchronizer/src/main/resources/config.yml"; + + LOGGER.info("Configuring scheduler ..."); + Utils.initializeConnectors(Utils.loadConfiguration(configFilePath)); + configureScheduler(configFilePath); + + } + + + private void configureScheduler(String configPath) throws SchedulerException { + SchedulerFactory schedulerFactory = new StdSchedulerFactory(); + Scheduler scheduler = schedulerFactory.getScheduler(); + Configuration configuration = Utils.loadConfig(configPath); + Trigger trigger = TriggerBuilder.newTrigger() + .withIdentity("custosDataFetcher", "synchronizer1") + .startNow() + .withSchedule(SimpleScheduleBuilder.simpleSchedule() + .withIntervalInSeconds((int) configuration.getPollingInterval()) + .repeatForever()) + .build(); + + JobDetail job = JobBuilder.newJob(CustosDataFetchingJob.class) + .withIdentity("myJob", "group1") + .usingJobData("configurationPath", configPath) + .build(); + scheduler.start(); + scheduler.scheduleJob(job, trigger); + } + + +} diff --git a/data-resource-management-service/drms-custos-synchronizer/src/main/java/org/apache/airavata/drms/custos/synchronizer/Utils.java b/data-resource-management-service/drms-custos-synchronizer/src/main/java/org/apache/airavata/drms/custos/synchronizer/Utils.java new file mode 100644 index 0000000..ee664ef --- /dev/null +++ b/data-resource-management-service/drms-custos-synchronizer/src/main/java/org/apache/airavata/drms/custos/synchronizer/Utils.java @@ -0,0 +1,62 @@ +package org.apache.airavata.drms.custos.synchronizer; + +import org.apache.airavata.drms.core.Neo4JConnector; +import org.apache.custos.clients.CustosClientProvider; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.yaml.snakeyaml.Yaml; + +import java.io.FileInputStream; +import java.io.InputStream; +import java.util.Optional; + +public class Utils { + private static final Logger LOGGER = LoggerFactory.getLogger(Utils.class); + + private static final Neo4JConnector neo4JConnector = new Neo4JConnector(); + private static CustosClientProvider custosClientProvider = null; + + + public static Configuration loadConfiguration(String path) { + return Optional.ofNullable(path). + map(Utils::loadConfig) + .orElseThrow(() -> { + String msg = "Configuration path cannot be null"; + LOGGER.error(msg); + throw new RuntimeException(msg); + }); + } + + public static Configuration loadConfig(String filePath) { + try (InputStream in = new FileInputStream(filePath)) { + Yaml yaml = new Yaml(); + return yaml.loadAs(in, Configuration.class); + } catch (Exception exception) { + LOGGER.error("Error loading config file", exception); + } + return null; + } + + public static void initializeConnectors(Configuration configuration) { + neo4JConnector.init(configuration.getDataResourceManagementService().getDbURI(), + configuration.getDataResourceManagementService().getDbUser(), + configuration.getDataResourceManagementService().getDbPassword()); + LOGGER.info(configuration.getCustos().getCustosId()); + LOGGER.info(configuration.getCustos().getCustosSec()); + custosClientProvider = new CustosClientProvider.Builder() + .setClientId(configuration.getCustos().getCustosId()) + .setClientSec(configuration.getCustos().getCustosSec()) + .setServerHost(configuration.getCustos().getHost()) + .setServerPort(configuration.getCustos().getPort()) + .build(); + } + + public static Neo4JConnector getNeo4JConnector() { + return neo4JConnector; + } + + public static CustosClientProvider getCustosClientProvider() { + return custosClientProvider; + } + +} diff --git a/data-resource-management-service/drms-custos-synchronizer/src/main/java/org/apache/airavata/drms/custos/synchronizer/datafetcher/CustosDataFetchingJob.java b/data-resource-management-service/drms-custos-synchronizer/src/main/java/org/apache/airavata/drms/custos/synchronizer/datafetcher/CustosDataFetchingJob.java new file mode 100644 index 0000000..b6960b7 --- /dev/null +++ b/data-resource-management-service/drms-custos-synchronizer/src/main/java/org/apache/airavata/drms/custos/synchronizer/datafetcher/CustosDataFetchingJob.java @@ -0,0 +1,43 @@ +package org.apache.airavata.drms.custos.synchronizer.datafetcher; + +import org.apache.airavata.drms.custos.synchronizer.Configuration; +import org.apache.airavata.drms.custos.synchronizer.handlers.SharingHandler; +import org.apache.airavata.drms.custos.synchronizer.handlers.UserAndGroupHandler; +import org.quartz.Job; +import org.quartz.JobDataMap; +import org.quartz.JobExecutionContext; +import org.quartz.JobExecutionException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.airavata.drms.custos.synchronizer.Utils.loadConfiguration; + +/** + * Custos data fetching job + */ + +public class CustosDataFetchingJob implements Job { + private static final Logger LOGGER = LoggerFactory.getLogger(CustosDataFetchingJob.class); + + + @Override + public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException { + try { + LOGGER.debug("Executing CustosDataFetchingJob ....... "); + JobDataMap jobDataMap = jobExecutionContext.getJobDetail().getJobDataMap(); + String path = jobDataMap.getString("configurationPath"); + Configuration configuration = loadConfiguration(path); + UserAndGroupHandler userAndGroupHandler = new UserAndGroupHandler(); + userAndGroupHandler.mergeUserAndGroups(configuration); + SharingHandler sharingHandler = new SharingHandler(); + sharingHandler.mergeSharings(configuration); + } catch (Exception ex) { + String msg = "Error occurred while executing job" + ex.getMessage(); + LOGGER.error(msg, ex); + } + + + } + + +} diff --git a/data-resource-management-service/drms-custos-synchronizer/src/main/java/org/apache/airavata/drms/custos/synchronizer/handlers/SharingHandler.java b/data-resource-management-service/drms-custos-synchronizer/src/main/java/org/apache/airavata/drms/custos/synchronizer/handlers/SharingHandler.java new file mode 100644 index 0000000..332aa8d --- /dev/null +++ b/data-resource-management-service/drms-custos-synchronizer/src/main/java/org/apache/airavata/drms/custos/synchronizer/handlers/SharingHandler.java @@ -0,0 +1,140 @@ +package org.apache.airavata.drms.custos.synchronizer.handlers; + +import org.apache.airavata.drms.core.Neo4JConnector; +import org.apache.airavata.drms.custos.synchronizer.Configuration; +import org.apache.airavata.drms.custos.synchronizer.Utils; +import org.apache.custos.clients.CustosClientProvider; +import org.apache.custos.sharing.management.client.SharingManagementClient; +import org.apache.custos.sharing.service.Entity; +import org.apache.custos.sharing.service.GetAllDirectSharingsResponse; +import org.apache.custos.sharing.service.SharingMetadata; +import org.apache.custos.sharing.service.SharingRequest; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class SharingHandler { + + private static final Logger LOGGER = LoggerFactory.getLogger(SharingHandler.class); + + private final Neo4JConnector neo4JConnector; + private CustosClientProvider custosClientProvider; + + public SharingHandler() { + this.neo4JConnector = Utils.getNeo4JConnector(); + this.custosClientProvider = Utils.getCustosClientProvider(); + } + + public void mergeSharings(Configuration configuration) { + try { + LOGGER.debug("Merging sharings for custos client with id "+ configuration.getCustos().getCustosId()); + SharingManagementClient sharingManagementClient = custosClientProvider.getSharingManagementClient(); + mergeSharings(sharingManagementClient, configuration.getCustos().getTenantsToBeSynced()); + + } catch (Exception ex) { + String msg = "Exception occurred while merging user" + ex.getMessage(); + LOGGER.error(msg, ex); + } + + } + + + private void mergeSharings(SharingManagementClient sharingManagementClient, String[] clientIds) { + try { + SharingRequest sharingRequest = SharingRequest.newBuilder().build(); + Arrays.stream(clientIds).forEach(clientId -> { + GetAllDirectSharingsResponse response = sharingManagementClient + .getAllDirectSharings(clientId, sharingRequest); + List<SharingMetadata> metadataList = response.getSharedDataList(); + metadataList.forEach(metadata -> { + mergeEntities(metadata.getEntity(), clientId); + + }); + metadataList.forEach(metadata -> { + mergeEntityParentChildRelationShips(sharingManagementClient, metadata.getEntity(), clientId); + mergeEntitySharings(metadata, clientId); + }); + }); + + } catch (Exception ex) { + String msg = "Error occurred while merging sharings from Custos "; + LOGGER.error(msg, ex); + } + + + } + + private void mergeEntities(Entity entity, String clientId) { + String query = "Merge (u:" + entity.getType() + "{entityId: '" + entity.getId() + "'," + + "custosClientId:'" + clientId + "'}" + ")" + + " SET u = $props return u "; + Map<String, Object> map = new HashMap<>(); + map.put("description", entity.getDescription()); + map.put("name", entity.getName()); + map.put("createdTime", entity.getCreatedAt()); + map.put("custosClientId", clientId); + map.put("entityId", entity.getId()); + Map<String, Object> parameters = new HashMap<>(); + parameters.put("props", map); + try { + this.neo4JConnector.runTransactionalQuery(parameters, query); + } catch (Exception ex) { + String msg = "Error occurred while merging entities "; + LOGGER.error(msg, ex); + } + + } + + private void mergeEntityParentChildRelationShips(SharingManagementClient sharingManagementClient, Entity entity, + String clientId) { + + if (!entity.getParentId().trim().isEmpty()) { + Entity parentEntity = Entity.newBuilder().setId(entity.getParentId()).build(); + Entity fullParentEntity = sharingManagementClient.getEntity(clientId, parentEntity); + String query = "MATCH (a:" + entity.getType() + "), (b:" + fullParentEntity.getType() + ") WHERE a.entityId = '" + + entity.getId() + "' AND a.custosClientId = '" + + clientId + "' AND " + "b.entityId ='" + fullParentEntity.getId() + "' AND b.custosClientId ='" + clientId + + "' MERGE (a)-[r:CHILD_OF]->(b) RETURN a, b"; + try { + this.neo4JConnector.runTransactionalQuery(query); + } catch (Exception ex) { + String msg = "Error occurred while merging parent child relationships "; + LOGGER.error(msg, ex); + } + } + } + + private void mergeEntitySharings(SharingMetadata metadata, String clientId) { + Entity entity = metadata.getEntity(); + String sourceId = metadata.getEntity().getId(); + String permissionId = metadata.getPermission().getId(); + String userId = metadata.getOwnerId(); + String type = metadata.getOwnerType(); + String query = null; + if (type.equalsIgnoreCase("USER")) { + query = "MATCH (a:" + entity.getType() + "), (b:User) WHERE a.entityId = '" + + sourceId + "' AND a.custosClientId = '" + + clientId + "' AND " + "b.username ='" + userId + "' AND b.custosClientId ='" + clientId + + "' MERGE (a)-[r:SHARED_WITH]->(b) SET r.permission='" + permissionId + "' RETURN a, b"; + + } else if (type.equalsIgnoreCase("GROUP")) { + query = "MATCH (a:" + entity.getType() + "), (b:Group) WHERE a.entityId = '" + + sourceId + "' AND a.custosClientId = '" + + clientId + "' AND " + "b.groupId ='" + userId + "' AND b.custosClientId ='" + clientId + + "' MERGE (a)-[r:SHARED_WITH]->(b) SET r.permission='" + permissionId + "' RETURN a, b"; + } + if (query != null) { + try { + this.neo4JConnector.runTransactionalQuery(query); + } catch (Exception ex) { + String msg = "Error occurred while merging sharings "; + LOGGER.error(msg, ex); + } + } + + } +} diff --git a/data-resource-management-service/drms-custos-synchronizer/src/main/java/org/apache/airavata/drms/custos/synchronizer/handlers/UserAndGroupHandler.java b/data-resource-management-service/drms-custos-synchronizer/src/main/java/org/apache/airavata/drms/custos/synchronizer/handlers/UserAndGroupHandler.java new file mode 100644 index 0000000..625cfcb --- /dev/null +++ b/data-resource-management-service/drms-custos-synchronizer/src/main/java/org/apache/airavata/drms/custos/synchronizer/handlers/UserAndGroupHandler.java @@ -0,0 +1,156 @@ +package org.apache.airavata.drms.custos.synchronizer.handlers; + +import org.apache.airavata.drms.core.Neo4JConnector; +import org.apache.airavata.drms.custos.synchronizer.Configuration; +import org.apache.airavata.drms.custos.synchronizer.Utils; +import org.apache.custos.clients.CustosClientProvider; +import org.apache.custos.group.management.client.GroupManagementClient; +import org.apache.custos.user.management.client.UserManagementClient; +import org.apache.custos.user.profile.service.GetAllGroupsResponse; +import org.apache.custos.user.profile.service.GetAllUserProfilesResponse; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; + + +public class UserAndGroupHandler { + + private static final Logger LOGGER = LoggerFactory.getLogger(UserAndGroupHandler.class); + + private final Neo4JConnector neo4JConnector; + private CustosClientProvider custosClientProvider; + + public UserAndGroupHandler() { + this.neo4JConnector = Utils.getNeo4JConnector(); + this.custosClientProvider = Utils.getCustosClientProvider(); + } + + public void mergeUserAndGroups(Configuration configuration) { + try { + LOGGER.debug("Merging groups for custos client with id " + configuration.getCustos().getCustosId()); + String[] clientIds = configuration.getCustos().getTenantsToBeSynced(); + UserManagementClient userManagementClient = this.custosClientProvider.getUserManagementClient(); + GroupManagementClient groupManagementClient = this.custosClientProvider.getGroupManagementClient(); + mergeUsers(userManagementClient, clientIds); + mergeGroups(groupManagementClient, clientIds); + mergeUserAndGroupMemberships(groupManagementClient, userManagementClient, clientIds); + } catch (Exception ex) { + String msg = "Exception occurred while merging user" + ex.getMessage(); + LOGGER.error(msg, ex); + } + + + } + + private void mergeUsers(UserManagementClient userManagementClient, String[] clientIds) { + try { + Arrays.stream(clientIds).forEach(val -> { + GetAllUserProfilesResponse response = userManagementClient.getAllUserProfiles(val); + response.getProfilesList().forEach(userProfile -> { + String query = "Merge (u:User{username: '" + userProfile.getUsername() + "'," + + "custosClientId:'" + val + "'}" + ")" + + " SET u = $props return u "; + Map<String, Object> map = new HashMap<>(); + map.put("firstName", userProfile.getFirstName()); + map.put("name", userProfile.getUsername()); + map.put("lastName", userProfile.getLastName()); + map.put("email", userProfile.getEmail()); + map.put("username", userProfile.getUsername()); + map.put("custosClientId", val); + Map<String, Object> parameters = new HashMap<>(); + parameters.put("props", map); + this.neo4JConnector.runTransactionalQuery(parameters, query); + }); + + }); + + + } catch (Exception ex) { + LOGGER.error("Error occurred while merging user ", ex); + } + } + + + private void mergeGroups(GroupManagementClient groupManagementClient, String[] clientIds) { + try { + Arrays.stream(clientIds).forEach(val -> { + GetAllGroupsResponse response = groupManagementClient.getAllGroups(val); + response.getGroupsList().forEach(gr -> { + String id = gr.getId().replaceAll("'", ""); + String query = "Merge (u:Group{groupId: '" + id + "'," + + "custosClientId:'" + val + "'}" + ")" + + " SET u = $props return u "; + Map<String, Object> map = new HashMap<>(); + map.put("description", gr.getDescription()); + map.put("name", gr.getName()); + map.put("groupId", id); + map.put("createdTime", gr.getCreatedTime()); + map.put("lastModifiedTime", gr.getLastModifiedTime()); + map.put("custosClientId", val); + Map<String, Object> parameters = new HashMap<>(); + parameters.put("props", map); + try { + this.neo4JConnector.runTransactionalQuery(parameters, query); + } catch (Exception ex) { + LOGGER.error("Error occurred while merging groups ", ex); + } + }); + }); + } catch (Exception ex) { + LOGGER.error("Error occurred while merging groups ", ex); + } + } + + private void mergeUserAndGroupMemberships(GroupManagementClient groupManagementClient, UserManagementClient userManagementClient, + String[] clientIds) { + try { + Arrays.stream(clientIds).forEach(val -> { + GetAllGroupsResponse response = groupManagementClient.getAllGroups(val); + response.getGroupsList().forEach(gr -> { + String id = gr.getId().replaceAll("'", ""); + GetAllUserProfilesResponse userProfilesResponse = groupManagementClient.getAllChildUsers(val, gr.getId()); + userProfilesResponse.getProfilesList().forEach(prof -> { + String memberShipType = prof.getMembershipType(); + String userId = prof.getUsername(); + mergeUserMemberShip(userId, id, val, memberShipType); + }); + GetAllGroupsResponse getAllGroupsResponse = groupManagementClient.getAllChildGroups(val, gr.getId()); + getAllGroupsResponse.getGroupsList().forEach(grMem -> { + String childId = gr.getId().replaceAll("'", ""); + mergeGroupMemberShip(id, childId, val); + }); + }); + }); + } catch (Exception ex) { + LOGGER.error("Error occurred while merging groups ", ex); + } + } + + private void mergeUserMemberShip(String username, String groupId, String custosClientId, String role) { + String query = "MATCH (a:User), (b:Group) WHERE a.username = '" + username + "' AND a.custosClientId = '" + + custosClientId + "' AND " + "b.groupId ='" + groupId + "' AND b.custosClientId ='" + custosClientId + + "' MERGE (a)-[r:MEMBER_OF]->(b) SET r.role='" + role + "' RETURN a, b"; + try { + this.neo4JConnector.runTransactionalQuery(query); + } catch (Exception ex) { + LOGGER.error("Error occurred while merging UserGroupMembership ", ex); + } + + } + + private void mergeGroupMemberShip(String parentGroupId, String childGroupId, String custosClientId) { + String query = "MATCH (a:Group), (b:Group) WHERE a.groupId = '" + parentGroupId + "' AND a.custosClientId = '" + + custosClientId + "' AND " + "b.groupId ='" + childGroupId + "' AND b.custosClientId ='" + + custosClientId + "' MERGE (a)<-[r:CHILD_OF]-(b) RETURN a, b"; + try { + this.neo4JConnector.runTransactionalQuery(query); + } catch (Exception ex) { + LOGGER.error("Error occurred while merging Group memberships ", ex); + } + + } + +} diff --git a/data-resource-management-service/drms-custos-synchronizer/src/main/resources/config.yml b/data-resource-management-service/drms-custos-synchronizer/src/main/resources/config.yml new file mode 100644 index 0000000..f3d467f --- /dev/null +++ b/data-resource-management-service/drms-custos-synchronizer/src/main/resources/config.yml @@ -0,0 +1,12 @@ +pollingInterval: 60 +dataResourceManagementService: + dbURI: "bolt://149.165.156.173:7687" + dbUser: "neo4j" + dbPassword: "blastcovid19" +custos: + host: "custos.scigap.org" + port: 31499 + custosId: "custos-2zuomcugra3ebgsqtzmf-10000514" + custosSec: "mupUhF4JL0S3IFHBjfhiTfLJS1NgSWfvkCj3l6c7" + tenantsToBeSynced: + - "custos-cmcdclbywlxmc2ktzv0d-10000702" diff --git a/data-resource-management-service/drms-custos-synchronizer/src/main/resources/logback.xml b/data-resource-management-service/drms-custos-synchronizer/src/main/resources/logback.xml new file mode 100644 index 0000000..270e6f7 --- /dev/null +++ b/data-resource-management-service/drms-custos-synchronizer/src/main/resources/logback.xml @@ -0,0 +1,45 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. +--> +<configuration> + + <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender"> + <encoder> + <pattern>%d [%t] %-5p %c{30} %m [%X]%n</pattern> + </encoder> + </appender> + + <appender name="LOGFILE" class="ch.qos.logback.core.rolling.RollingFileAppender"> + <File>../logs/custos_synchronizer.log</File> + <Append>true</Append> + <encoder> + <pattern>%d [%t] %-5p %c{30} %m [%X]%n</pattern> + </encoder> + <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy"> + <fileNamePattern>../logs/custos_synchronizer.log.%d{yyyy-MM-dd}</fileNamePattern> + <maxHistory>30</maxHistory> + <totalSizeCap>1GB</totalSizeCap> + </rollingPolicy> + </appender> + + <logger name="ch.qos.logback" level="WARN"/> + <logger name="org.apache.airavata" level="INFO"/> + <root level="INFO"> + <appender-ref ref="CONSOLE"/> + <appender-ref ref="LOGFILE"/> + </root> +</configuration> \ No newline at end of file diff --git a/data-resource-management-service/pom.xml b/data-resource-management-service/pom.xml index d1c7685..f8eb041 100644 --- a/data-resource-management-service/pom.xml +++ b/data-resource-management-service/pom.xml @@ -31,6 +31,7 @@ <module>drms-stubs</module> <module>drms-core</module> <module>drms-api</module> + <module>drms-custos-synchronizer</module> </modules> <dependencies>
