This is an automated email from the ASF dual-hosted git repository.
dimuthuupe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airavata-data-lake.git
The following commit(s) were added to refs/heads/master by this push:
new 8227ff7 Optimizations to speed up resource registration + scanning
internal directories
8227ff7 is described below
commit 8227ff7c735be46fb182af688ea6bc753df9a0af
Author: Dimuthu Wannipurage <[email protected]>
AuthorDate: Tue Oct 12 21:39:51 2021 -0400
Optimizations to speed up resource registration + scanning internal
directories
---
.../handlers/async/OrchestratorEventProcessor.java | 120 ++++++++++++---------
1 file changed, 72 insertions(+), 48 deletions(-)
diff --git a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/handlers/async/OrchestratorEventProcessor.java b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/handlers/async/OrchestratorEventProcessor.java
index a62b8ea..adfdc53 100644
--- a/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/handlers/async/OrchestratorEventProcessor.java
+++ b/data-orchestrator/data-orchestrator-service/data-orchestrator-api-server/src/main/java/org/apache/airavata/datalake/orchestrator/handlers/async/OrchestratorEventProcessor.java
@@ -62,8 +62,9 @@ public class OrchestratorEventProcessor implements Runnable {
this.notificationClient = notificationClient;
}
- private List<GenericResource> createResourceRecursively(String hostName,
String storageId, String basePath,
- String
resourcePath, String resourceType, String user)
+ private List<GenericResource> createResourceWithParentDirectories(String
hostName, String storageId, String basePath,
+ String
resourcePath, String resourceType, String user,
+
Map<String, GenericResource> resourceCache)
throws Exception {
List<GenericResource> resourceList = new ArrayList<>();
@@ -77,6 +78,12 @@ public class OrchestratorEventProcessor implements Runnable {
for (int i = 0; i < splitted.length - 1; i++) {
String resourceName = splitted[i];
currentPath = currentPath + "/" + resourceName;
+
+ if (resourceCache.containsKey(currentPath)) {
+ resourceList.add(resourceCache.get(currentPath));
+ continue;
+ }
+
String resourceId = Utils.getId(storageId + ":" + currentPath);
Optional<GenericResource> optionalGenericResource =
this.drmsConnector.createResource(notification.getAuthToken(),
@@ -92,6 +99,7 @@ public class OrchestratorEventProcessor implements Runnable {
this.drmsConnector.addResourceMetadata(notification.getAuthToken(),
notification.getTenantId(), parentId, user,
parentType, metadata);
+ resourceCache.put(currentPath, optionalGenericResource.get());
resourceList.add(optionalGenericResource.get());
} else {
logger.error("Could not create a resource for path {}",
currentPath);
@@ -150,6 +158,7 @@ public class OrchestratorEventProcessor implements Runnable
{
logger.info("Processing resource path {} on storage {}",
notification.getResourcePath(),
notification.getBasePath());
+ Map<String, GenericResource> resourceCache = new HashMap<>();
try {
this.notificationClient.get().registerNotificationStatus(NotificationStatusRegisterRequest.newBuilder()
@@ -196,10 +205,10 @@ public class OrchestratorEventProcessor implements
Runnable {
// Creating parent resource
- List<GenericResource> resourceList =
createResourceRecursively(sourceHostName, sourceStorageId,
+ List<GenericResource> resourceList =
createResourceWithParentDirectories(sourceHostName, sourceStorageId,
notification.getBasePath(),
notification.getResourcePath(),
- "COLLECTION", adminUser);
+ "COLLECTION", adminUser, resourceCache);
shareResourcesWithUsers(Collections.singletonList(resourceList.get(resourceList.size()
- 1)),
adminUser, owner, "VIEWER");
@@ -245,54 +254,15 @@ public class OrchestratorEventProcessor implements
Runnable {
.putProperties("TENANT_ID",
notification.getTenantId()).build();
AuthToken mftAuth =
AuthToken.newBuilder().setDelegateAuth(delegateAuth).build();
-
- FetchResourceMetadataRequest.Builder resourceMetadataReq =
FetchResourceMetadataRequest.newBuilder()
- .setMftAuthorizationToken(mftAuth)
- .setResourceId(resourceObj.getResourceId());
-
- switch (sourceSP.getStorageCase()) {
- case SSH_STORAGE_PREFERENCE:
- resourceMetadataReq.setResourceType("SCP");
-
resourceMetadataReq.setResourceToken(sourceSP.getSshStoragePreference().getStoragePreferenceId());
- break;
- case S3_STORAGE_PREFERENCE:
- resourceMetadataReq.setResourceType("S3");
-
resourceMetadataReq.setResourceToken(sourceSP.getS3StoragePreference().getStoragePreferenceId());
- break;
- }
-
- // Fetching file list for parent resource
-
- DirectoryMetadataResponse directoryResourceMetadata;
-
- try (MFTApiClient mftApiClient = new MFTApiClient(
-
this.configuration.getOutboundEventProcessor().getMftHost(),
-
this.configuration.getOutboundEventProcessor().getMftPort())) {
- MFTApiServiceGrpc.MFTApiServiceBlockingStub mftClientStub =
mftApiClient.get();
- directoryResourceMetadata =
mftClientStub.getDirectoryResourceMetadata(resourceMetadataReq.build());
- }
-
List<String> resourceIDsToProcess = new ArrayList<>();
- for (FileMetadataResponse fileMetadata :
directoryResourceMetadata.getFilesList()) {
- logger.info("Registering file {} for source storage {}",
fileMetadata.getResourcePath(), sourceStorageId);
- resourceList = createResourceRecursively(sourceHostName,
sourceStorageId, notification.getBasePath(),
- fileMetadata.getResourcePath(), "FILE", adminUser);
- GenericResource fileResource =
resourceList.get(resourceList.size() - 1);
- resourceIDsToProcess.add(fileResource.getResourceId());
- }
-
- for (DirectoryMetadataResponse directoryMetadata :
directoryResourceMetadata.getDirectoriesList()) {
- logger.info("Registering directory {} for source storage {}",
directoryMetadata.getResourcePath(), sourceStorageId);
- createResourceRecursively(sourceHostName, sourceStorageId,
notification.getBasePath(),
- directoryMetadata.getResourcePath(),
- "COLLECTION", adminUser);
- // TODO scan directories
- }
+ // Fetching file list for parent resource
+ scanResourceForChildResources(resourceObj, mftAuth, sourceSP,
sourceStorageId, sourceHostName,
+ adminUser, resourceIDsToProcess, resourceCache, 4);
logger.info("Creating destination zip resource for directory {}",
notification.getResourcePath());
- resourceList = createResourceRecursively(destinationHostName,
destinationStorageId, notification.getBasePath(),
- notification.getResourcePath(), "FILE", adminUser);
+ resourceList =
createResourceWithParentDirectories(destinationHostName, destinationStorageId,
notification.getBasePath(),
+ notification.getResourcePath(), "FILE", adminUser,
resourceCache);
GenericResource destinationResource =
resourceList.get(resourceList.size() - 1);
@@ -328,4 +298,58 @@ public class OrchestratorEventProcessor implements
Runnable {
this.eventCache.remove(notification.getResourcePath() + ":" +
notification.getHostName());
}
}
+
+    /**
+     * Lists the direct children of {@code resourceObj} through MFT and registers them in DRMS.
+     *
+     * <p>Every file found is registered as a FILE resource (creating any missing parent
+     * collections on the way) and its resource id is appended to {@code resourceIDsToProcess}.
+     * Every sub-directory is registered as a COLLECTION and, while {@code scanDepth} is
+     * positive, scanned recursively with {@code scanDepth - 1}.
+     *
+     * @param resourceObj          DRMS resource of the directory whose children are listed
+     * @param mftAuth              delegated auth token forwarded to MFT
+     * @param sourceSP             storage preference that selects the MFT resource type and token
+     * @param sourceStorageId      storage id under which child resources are registered
+     * @param sourceHostName       host name under which child resources are registered
+     * @param adminUser            user passed to DRMS as the creator of the new resources
+     * @param resourceIDsToProcess output list; ids of registered files are appended to it
+     * @param resourceCache        cache of already created parent collections, shared across
+     *                             the recursion so parents are not re-created. NOTE(review):
+     *                             it is keyed by path only, and the caller reuses the same map
+     *                             for a different (destination) storage; confirm this is safe.
+     * @param scanDepth            remaining recursion depth; at 0, sub-directories are registered
+     *                             but their contents are not listed
+     * @throws IllegalStateException if the storage preference is neither SSH nor S3, or if a
+     *                               child resource could not be registered
+     * @throws Exception             propagated from the DRMS/MFT calls
+     */
+    private void scanResourceForChildResources(GenericResource resourceObj, AuthToken mftAuth,
+                                               AnyStoragePreference sourceSP, String sourceStorageId,
+                                               String sourceHostName, String adminUser,
+                                               List<String> resourceIDsToProcess,
+                                               Map<String, GenericResource> resourceCache,
+                                               int scanDepth)
+            throws Exception {
+
+        FetchResourceMetadataRequest.Builder resourceMetadataReq = FetchResourceMetadataRequest.newBuilder()
+                .setMftAuthorizationToken(mftAuth)
+                .setResourceId(resourceObj.getResourceId());
+
+        switch (sourceSP.getStorageCase()) {
+            case SSH_STORAGE_PREFERENCE:
+                resourceMetadataReq.setResourceType("SCP");
+                resourceMetadataReq.setResourceToken(sourceSP.getSshStoragePreference().getStoragePreferenceId());
+                break;
+            case S3_STORAGE_PREFERENCE:
+                resourceMetadataReq.setResourceType("S3");
+                resourceMetadataReq.setResourceToken(sourceSP.getS3StoragePreference().getStoragePreferenceId());
+                break;
+            default:
+                // Without a resource type and token MFT cannot resolve the resource; fail fast here
+                // instead of sending an incomplete request.
+                throw new IllegalStateException("Unsupported storage preference type "
+                        + sourceSP.getStorageCase() + " for resource " + resourceObj.getResourceId());
+        }
+
+        DirectoryMetadataResponse directoryResourceMetadata;
+
+        // A fresh MFT client is opened (and closed) for every directory listed.
+        try (MFTApiClient mftApiClient = new MFTApiClient(
+                this.configuration.getOutboundEventProcessor().getMftHost(),
+                this.configuration.getOutboundEventProcessor().getMftPort())) {
+            MFTApiServiceGrpc.MFTApiServiceBlockingStub mftClientStub = mftApiClient.get();
+            directoryResourceMetadata = mftClientStub.getDirectoryResourceMetadata(resourceMetadataReq.build());
+        }
+
+        for (FileMetadataResponse fileMetadata : directoryResourceMetadata.getFilesList()) {
+            logger.info("Registering file {} for source storage {}", fileMetadata.getResourcePath(), sourceStorageId);
+            List<GenericResource> resourceList = createResourceWithParentDirectories(sourceHostName, sourceStorageId,
+                    notification.getBasePath(), fileMetadata.getResourcePath(), "FILE", adminUser, resourceCache);
+            if (resourceList.isEmpty()) {
+                throw new IllegalStateException("Could not register file " + fileMetadata.getResourcePath()
+                        + " for source storage " + sourceStorageId);
+            }
+            // The requested resource itself is the last element; earlier entries are its parents.
+            GenericResource fileResource = resourceList.get(resourceList.size() - 1);
+            resourceIDsToProcess.add(fileResource.getResourceId());
+        }
+
+        for (DirectoryMetadataResponse directoryMetadata : directoryResourceMetadata.getDirectoriesList()) {
+            logger.info("Registering directory {} for source storage {}", directoryMetadata.getResourcePath(), sourceStorageId);
+            List<GenericResource> createResources = createResourceWithParentDirectories(sourceHostName, sourceStorageId,
+                    notification.getBasePath(), directoryMetadata.getResourcePath(),
+                    "COLLECTION", adminUser, resourceCache);
+            if (createResources.isEmpty()) {
+                throw new IllegalStateException("Could not register directory " + directoryMetadata.getResourcePath()
+                        + " for source storage " + sourceStorageId);
+            }
+            GenericResource dirResource = createResources.get(createResources.size() - 1);
+
+            if (scanDepth > 0) {
+                // Scanning the directories recursively
+                scanResourceForChildResources(dirResource, mftAuth, sourceSP, sourceStorageId, sourceHostName,
+                        adminUser, resourceIDsToProcess, resourceCache, scanDepth - 1);
+            } else {
+                // Make the depth cut-off visible instead of silently leaving the subtree unregistered.
+                logger.warn("Maximum scan depth reached. Contents of directory {} on source storage {} are not scanned",
+                        directoryMetadata.getResourcePath(), sourceStorageId);
+            }
+        }
+    }
}