This is an automated email from the ASF dual-hosted git repository.
dimuthuupe pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/airavata-mft.git
The following commit(s) were added to refs/heads/develop by this push:
new 50abbdd Transfer event callback integration
50abbdd is described below
commit 50abbdda076a7cb66497db1a5bda2a243ea30fad
Author: Dimuthu Wannipurage <[email protected]>
AuthorDate: Thu Jul 8 05:50:18 2021 -0400
Transfer event callback integration
---
.../java/org/apache/airavata/mft/agent/MFTAgent.java | 20 ++++++++++++++++++++
api/stub/src/main/proto/MFTApi.proto | 10 ++++++++++
2 files changed, 30 insertions(+)
diff --git a/agent/src/main/java/org/apache/airavata/mft/agent/MFTAgent.java b/agent/src/main/java/org/apache/airavata/mft/agent/MFTAgent.java
index 85355e7..5ccf25a 100644
--- a/agent/src/main/java/org/apache/airavata/mft/agent/MFTAgent.java
+++ b/agent/src/main/java/org/apache/airavata/mft/agent/MFTAgent.java
@@ -33,6 +33,7 @@ import org.apache.airavata.mft.admin.models.rpc.SyncRPCRequest;
import org.apache.airavata.mft.agent.http.HttpServer;
import org.apache.airavata.mft.agent.http.HttpTransferRequestsStore;
import org.apache.airavata.mft.agent.rpc.RPCParser;
+import org.apache.airavata.mft.api.service.CallbackEndpoint;
import org.apache.airavata.mft.api.service.TransferApiRequest;
import org.apache.airavata.mft.core.ConnectorResolver;
import org.apache.airavata.mft.core.MetadataCollectorResolver;
@@ -47,8 +48,12 @@ import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
+import java.net.URI;
+import java.net.http.HttpClient;
+import java.net.http.HttpRequest;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.List;
import java.util.Optional;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@@ -191,10 +196,12 @@ public class MFTAgent implements CommandLineRunner {
.setDescription("Started the transfer"));
+ TransferApiRequest finalRequest = request;
mediator.transfer(transferId, request, inConnector,
outConnector, srcMetadataCollector, dstMetadataCollector,
(id, st) -> {
try {
mftConsulClient.submitTransferStateToProcess(id, agentId,
st.setPublisher(agentId));
+
handleCallbacks(finalRequest.getCallbackEndpointsList(), id, st);
} catch (MFTConsulClientException e) {
logger.error("Failed while updating
transfer state", e);
}
@@ -243,6 +250,19 @@ public class MFTAgent implements CommandLineRunner {
transferMessageCache.start();
}
+ private void handleCallbacks(List<CallbackEndpoint> callbackEndpoints,
String transferId, TransferState transferState) {
+ if (callbackEndpoints != null && !callbackEndpoints.isEmpty()) {
+ for (CallbackEndpoint cbe : callbackEndpoints) {
+ switch (cbe.getType()) {
+ case HTTP:
+ break;
+ case KAFKA:
+ break;
+ }
+ }
+ }
+ }
+
private void acceptHTTPRequests() {
logger.info("Starting the HTTP front end");
diff --git a/api/stub/src/main/proto/MFTApi.proto b/api/stub/src/main/proto/MFTApi.proto
index 66db86f..0e2f145 100644
--- a/api/stub/src/main/proto/MFTApi.proto
+++ b/api/stub/src/main/proto/MFTApi.proto
@@ -7,6 +7,15 @@ import "google/api/annotations.proto";
import "google/protobuf/empty.proto";
import "CredCommon.proto";
+message CallbackEndpoint {
+ enum CallbackType {
+ HTTP = 0;
+ KAFKA = 1;
+ }
+ CallbackType type = 1;
+ string endpoint = 2;
+}
+
message TransferApiRequest {
string sourceResourceId = 1;
string sourceChildResourcePath = 2;
@@ -19,6 +28,7 @@ message TransferApiRequest {
bool affinityTransfer = 9;
map<string, int32> targetAgents = 10;
org.apache.airavata.mft.common.AuthToken mftAuthorizationToken = 11;
+ repeated CallbackEndpoint callbackEndpoints = 12;
}
message TransferApiResponse {