This is an automated email from the ASF dual-hosted git repository.

dimuthuupe pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/airavata-mft.git


The following commit(s) were added to refs/heads/develop by this push:
     new 50abbdd  Transfer event callback integration
50abbdd is described below

commit 50abbdda076a7cb66497db1a5bda2a243ea30fad
Author: Dimuthu Wannipurage <[email protected]>
AuthorDate: Thu Jul 8 05:50:18 2021 -0400

    Transfer event callback integration
---
 .../java/org/apache/airavata/mft/agent/MFTAgent.java | 20 ++++++++++++++++++++
 api/stub/src/main/proto/MFTApi.proto                 | 10 ++++++++++
 2 files changed, 30 insertions(+)

diff --git a/agent/src/main/java/org/apache/airavata/mft/agent/MFTAgent.java b/agent/src/main/java/org/apache/airavata/mft/agent/MFTAgent.java
index 85355e7..5ccf25a 100644
--- a/agent/src/main/java/org/apache/airavata/mft/agent/MFTAgent.java
+++ b/agent/src/main/java/org/apache/airavata/mft/agent/MFTAgent.java
@@ -33,6 +33,7 @@ import org.apache.airavata.mft.admin.models.rpc.SyncRPCRequest;
 import org.apache.airavata.mft.agent.http.HttpServer;
 import org.apache.airavata.mft.agent.http.HttpTransferRequestsStore;
 import org.apache.airavata.mft.agent.rpc.RPCParser;
+import org.apache.airavata.mft.api.service.CallbackEndpoint;
 import org.apache.airavata.mft.api.service.TransferApiRequest;
 import org.apache.airavata.mft.core.ConnectorResolver;
 import org.apache.airavata.mft.core.MetadataCollectorResolver;
@@ -47,8 +48,12 @@ import org.springframework.boot.SpringApplication;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
 import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
 
+import java.net.URI;
+import java.net.http.HttpClient;
+import java.net.http.HttpRequest;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
@@ -191,10 +196,12 @@ public class MFTAgent implements CommandLineRunner {
                             .setDescription("Started the transfer"));
 
 
+                        TransferApiRequest finalRequest = request;
                         mediator.transfer(transferId, request, inConnector, outConnector, srcMetadataCollector, dstMetadataCollector,
                                 (id, st) -> {
                                     try {
                                         mftConsulClient.submitTransferStateToProcess(id, agentId, st.setPublisher(agentId));
+                                        handleCallbacks(finalRequest.getCallbackEndpointsList(), id, st);
                                     } catch (MFTConsulClientException e) {
                                         logger.error("Failed while updating transfer state", e);
                                     }
@@ -243,6 +250,19 @@ public class MFTAgent implements CommandLineRunner {
         transferMessageCache.start();
     }
 
+    private void handleCallbacks(List<CallbackEndpoint> callbackEndpoints, String transferId, TransferState transferState) {
+        if (callbackEndpoints != null && !callbackEndpoints.isEmpty()) {
+            for (CallbackEndpoint cbe : callbackEndpoints) {
+                switch (cbe.getType()) {
+                    case HTTP:
+                        break;
+                    case KAFKA:
+                        break;
+                }
+            }
+        }
+    }
+
     private void acceptHTTPRequests() {
         logger.info("Starting the HTTP front end");
 
diff --git a/api/stub/src/main/proto/MFTApi.proto b/api/stub/src/main/proto/MFTApi.proto
index 66db86f..0e2f145 100644
--- a/api/stub/src/main/proto/MFTApi.proto
+++ b/api/stub/src/main/proto/MFTApi.proto
@@ -7,6 +7,15 @@ import "google/api/annotations.proto";
 import "google/protobuf/empty.proto";
 import "CredCommon.proto";
 
+message CallbackEndpoint {
+    enum CallbackType {
+        HTTP = 0;
+        KAFKA = 1;
+    }
+    CallbackType type = 1;
+    string endpoint = 2;
+}
+
 message TransferApiRequest {
     string sourceResourceId = 1;
     string sourceChildResourcePath = 2;
@@ -19,6 +28,7 @@ message TransferApiRequest {
     bool affinityTransfer = 9;
     map<string, int32> targetAgents = 10;
     org.apache.airavata.mft.common.AuthToken mftAuthorizationToken = 11;
+    repeated CallbackEndpoint callbackEndpoints = 12;
 }
 
 message TransferApiResponse {

Reply via email to