This is an automated email from the ASF dual-hosted git repository.

dimuthuupe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airavata-mft.git


The following commit(s) were added to refs/heads/master by this push:
     new 92ed154  Parallel distribution of batch transfer submission
92ed154 is described below

commit 92ed154fe90f145eb7116e5468303a8bf88e46ce
Author: Dimuthu Wannipurage <[email protected]>
AuthorDate: Mon Oct 17 13:05:58 2022 -0400

    Parallel distribution of batch transfer submission
---
 .../airavata/mft/api/handler/MFTApiHandler.java    | 59 +++++++++++++++++-----
 1 file changed, 46 insertions(+), 13 deletions(-)

diff --git 
a/api/service/src/main/java/org/apache/airavata/mft/api/handler/MFTApiHandler.java
 
b/api/service/src/main/java/org/apache/airavata/mft/api/handler/MFTApiHandler.java
index 36aea48..1078f55 100644
--- 
a/api/service/src/main/java/org/apache/airavata/mft/api/handler/MFTApiHandler.java
+++ 
b/api/service/src/main/java/org/apache/airavata/mft/api/handler/MFTApiHandler.java
@@ -32,15 +32,15 @@ import 
org.apache.airavata.mft.core.DirectoryResourceMetadata;
 import org.apache.airavata.mft.core.FileResourceMetadata;
 import org.apache.airavata.mft.core.MetadataCollectorResolver;
 import org.apache.airavata.mft.core.api.MetadataCollector;
+import org.apache.commons.lang3.tuple.Pair;
 import org.dozer.DozerBeanMapper;
 import org.lognet.springboot.grpc.GRpcService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 
-import java.util.List;
-import java.util.Optional;
-import java.util.UUID;
+import java.util.*;
+import java.util.concurrent.*;
 
 @GRpcService
 public class MFTApiHandler extends 
MFTTransferServiceGrpc.MFTTransferServiceImplBase {
@@ -92,33 +92,66 @@ public class MFTApiHandler extends 
MFTTransferServiceGrpc.MFTTransferServiceImpl
         }
     }
 
+    private class BatchTransferSubmitter implements Callable<Pair<Integer, 
String>> {
+        private int index = 0;
+        private TransferApiRequest apiRequest;
+
+        public BatchTransferSubmitter(int index, TransferApiRequest 
apiRequest) {
+            this.index = index;
+            this.apiRequest = apiRequest;
+        }
+
+        @Override
+        public Pair<Integer, String> call() throws Exception {
+
+            String transferId = mftConsulClient.submitTransfer(apiRequest);
+            logger.info("Submitted the transfer request {}", transferId);
+
+            mftConsulClient.saveTransferState(transferId, new TransferState()
+                    .setUpdateTimeMils(System.currentTimeMillis())
+                    .setState("RECEIVED").setPercentage(0)
+                    .setPublisher("api")
+                    .setDescription("Received transfer job " + transferId));
+
+            return Pair.of(index, transferId);
+        }
+    }
+
     @Override
     public void submitBatchTransfer(BatchTransferApiRequest request, 
StreamObserver<BatchTransferApiResponse> responseObserver) {
+        ExecutorService executorService = Executors.newFixedThreadPool(20);
+
         try {
             List<TransferApiRequest> transferRequests = 
request.getTransferRequestsList();
-
             BatchTransferApiResponse.Builder responseBuilder = 
BatchTransferApiResponse.newBuilder();
-            for (TransferApiRequest apiRequest: transferRequests) {
-                String transferId = mftConsulClient.submitTransfer(apiRequest);
 
-                logger.info("Submitted the transfer request {}", transferId);
+            ExecutorCompletionService<Pair<Integer,String>> completionService 
= new ExecutorCompletionService<>(executorService);
 
-                mftConsulClient.saveTransferState(transferId, new 
TransferState()
-                        .setUpdateTimeMils(System.currentTimeMillis())
-                        .setState("RECEIVED").setPercentage(0)
-                        .setPublisher("api")
-                        .setDescription("Received transfer job " + 
transferId));
+            for (int index = 0; index < transferRequests.size(); index ++) {
+                completionService.submit(new BatchTransferSubmitter(index, 
transferRequests.get(index)));
+            }
 
-                responseBuilder.addTransferIds(transferId);
+            Map<Integer, String> resultMap = new HashMap<>();
+            for (int index = 0; index < transferRequests.size(); index ++) {
+                Future<Pair<Integer, String>> futureResult = 
completionService.take();
+                Pair<Integer, String> result = futureResult.get();
+                resultMap.put(result.getLeft(), result.getRight());
+            }
+
+            for (int index = 0; index < transferRequests.size(); index ++) {
+                responseBuilder.addTransferIds(resultMap.get(index));
             }
 
             responseObserver.onNext(responseBuilder.build());
             responseObserver.onCompleted();
+
         } catch (Exception e) {
             logger.error("Error in submitting batch transfer request", e);
             responseObserver.onError(Status.INTERNAL
                     .withDescription("Failed to submit batch transfer request. 
" + e.getMessage())
                     .asException());
+        } finally {
+            executorService.shutdown();
         }
     }
 

Reply via email to