This is an automated email from the ASF dual-hosted git repository.

dimuthuupe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airavata-mft.git


The following commit(s) were added to refs/heads/master by this push:
     new 380a674  Fixing local transport to support resource service
380a674 is described below

commit 380a674d904ceab21a32c353b74c3742472801ec
Author: Dimuthu Wannipurage <[email protected]>
AuthorDate: Wed Apr 15 18:46:03 2020 -0400

    Fixing local transport to support resource service
---
 .../transport/local/LocalMetadataCollector.java    | 51 ++++++++++++++++++----
 .../mft/transport/local/LocalReceiver.java         | 48 +++++++++++++-------
 .../transport/local/LocalResourceIdentifier.java   | 31 -------------
 .../airavata/mft/transport/local/LocalSender.java  | 47 +++++++++++++++-----
 .../mft/transport/local/LocalTransportUtil.java    | 35 ---------------
 5 files changed, 112 insertions(+), 100 deletions(-)

diff --git a/transport/local-transport/src/main/java/org/apache/airavata/mft/transport/local/LocalMetadataCollector.java b/transport/local-transport/src/main/java/org/apache/airavata/mft/transport/local/LocalMetadataCollector.java
index bc04f3c..52af3b4 100644
--- a/transport/local-transport/src/main/java/org/apache/airavata/mft/transport/local/LocalMetadataCollector.java
+++ b/transport/local-transport/src/main/java/org/apache/airavata/mft/transport/local/LocalMetadataCollector.java
@@ -19,8 +19,17 @@ package org.apache.airavata.mft.transport.local;
 
 import org.apache.airavata.mft.core.ResourceMetadata;
 import org.apache.airavata.mft.core.api.MetadataCollector;
+import org.apache.airavata.mft.resource.client.ResourceServiceClient;
+import org.apache.airavata.mft.resource.service.LocalResource;
+import org.apache.airavata.mft.resource.service.LocalResourceGetRequest;
+import org.apache.airavata.mft.resource.service.ResourceServiceGrpc;
 
 import java.io.File;
+import java.io.FileInputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.attribute.BasicFileAttributes;
+import java.security.MessageDigest;
 
 public class LocalMetadataCollector implements MetadataCollector {
 
@@ -41,24 +50,50 @@ public class LocalMetadataCollector implements MetadataCollector {
 
     private void checkInitialized() {
         if (!initialized) {
-            throw new IllegalStateException("SCP Metadata Collector is not initialized");
+            throw new IllegalStateException("Local Metadata Collector is not initialized");
         }
     }
 
     @Override
     public ResourceMetadata getGetResourceMetadata(String resourceId, String credentialToken) throws Exception {
 
-        LocalResourceIdentifier resource = LocalTransportUtil.getLocalResourceIdentifier(resourceId);
-        File file = new File(resource.getPath());
+        ResourceServiceGrpc.ResourceServiceBlockingStub resourceClient = ResourceServiceClient.buildClient(resourceServiceHost, resourceServicePort);
+        LocalResource localResource = resourceClient.getLocalResource(LocalResourceGetRequest.newBuilder().setResourceId(resourceId).build());
+        File resourceFile = new File(localResource.getResourcePath());
+        if (resourceFile.exists()) {
 
-        ResourceMetadata metadata = new ResourceMetadata();
-        metadata.setResourceSize(file.length());
-        metadata.setUpdateTime(file.lastModified());
-        return metadata;
+            BasicFileAttributes basicFileAttributes = Files.readAttributes(Path.of(localResource.getResourcePath()), BasicFileAttributes.class);
+            ResourceMetadata metadata = new ResourceMetadata();
+            metadata.setCreatedTime(basicFileAttributes.creationTime().toMillis());
+            metadata.setUpdateTime(basicFileAttributes.lastModifiedTime().toMillis());
+            metadata.setResourceSize(basicFileAttributes.size());
+
+            MessageDigest digest = MessageDigest.getInstance("MD5");
+            FileInputStream fis = new FileInputStream(localResource.getResourcePath());
+            byte[] byteArray = new byte[1024];
+            int bytesCount = 0;
+            while ((bytesCount = fis.read(byteArray)) != -1) {
+                digest.update(byteArray, 0, bytesCount);
+            };
+            fis.close();
+            byte[] bytes = digest.digest();
+            StringBuilder sb = new StringBuilder();
+            for (byte aByte : bytes) {
+                sb.append(Integer.toString((aByte & 0xff) + 0x100, 16).substring(1));
+            }
+            metadata.setMd5sum(sb.toString());
+
+            return metadata;
+        } else {
+            throw new Exception("Resource with id " + resourceId + " in path " + localResource.getResourcePath() + " does not exist");
+        }
     }
 
     @Override
     public Boolean isAvailable(String resourceId, String credentialToken) throws Exception {
-        return false;
+        ResourceServiceGrpc.ResourceServiceBlockingStub resourceClient = ResourceServiceClient.buildClient(resourceServiceHost, resourceServicePort);
+        LocalResource localResource = resourceClient.getLocalResource(LocalResourceGetRequest.newBuilder().setResourceId(resourceId).build());
+        File resourceFile = new File(localResource.getResourcePath());
+        return resourceFile.exists();
     }
 }
diff --git a/transport/local-transport/src/main/java/org/apache/airavata/mft/transport/local/LocalReceiver.java b/transport/local-transport/src/main/java/org/apache/airavata/mft/transport/local/LocalReceiver.java
index e21b612..0ee0abd 100644
--- a/transport/local-transport/src/main/java/org/apache/airavata/mft/transport/local/LocalReceiver.java
+++ b/transport/local-transport/src/main/java/org/apache/airavata/mft/transport/local/LocalReceiver.java
@@ -19,17 +19,29 @@ package org.apache.airavata.mft.transport.local;
 
 import org.apache.airavata.mft.core.ConnectorContext;
 import org.apache.airavata.mft.core.api.Connector;
+import org.apache.airavata.mft.resource.client.ResourceServiceClient;
+import org.apache.airavata.mft.resource.service.LocalResource;
+import org.apache.airavata.mft.resource.service.LocalResourceGetRequest;
+import org.apache.airavata.mft.resource.service.ResourceServiceGrpc;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-import java.io.FileInputStream;
-import java.io.OutputStream;
+import java.io.*;
 
 public class LocalReceiver implements Connector {
 
-    private LocalResourceIdentifier resource;
+    private static final Logger logger = LoggerFactory.getLogger(LocalReceiver.class);
+
+    private LocalResource resource;
+    private boolean initialized;
+
     @Override
     public void init(String resourceId, String credentialToken, String resourceServiceHost, int resourceServicePort,
                      String secretServiceHost, int secretServicePort) throws Exception {
-        this.resource = LocalTransportUtil.getLocalResourceIdentifier(resourceId);
+        this.initialized = true;
+
+        ResourceServiceGrpc.ResourceServiceBlockingStub resourceClient = ResourceServiceClient.buildClient(resourceServiceHost, resourceServicePort);
+        this.resource = resourceClient.getLocalResource(LocalResourceGetRequest.newBuilder().setResourceId(resourceId).build());
     }
 
     @Override
@@ -37,14 +49,21 @@ public class LocalReceiver implements Connector {
 
     }
 
+    private void checkInitialized() {
+        if (!initialized) {
+            throw new IllegalStateException("Local Receiver is not initialized");
+        }
+    }
+
     @Override
     public void startStream(ConnectorContext context) throws Exception {
-        System.out.println("Starting local receive");
-        FileInputStream fin = new FileInputStream(this.resource.getPath());
+        logger.info("Starting local receiver stream for transfer {}", context.getTransferId());
 
-        long fileSize = context.getMetadata().getResourceSize();
+        checkInitialized();
+        OutputStream streamOs = context.getStreamBuffer().getOutputStream();
+        FileInputStream fis = new FileInputStream(new File(resource.getResourcePath()));
 
-        OutputStream outputStream = context.getStreamBuffer().getOutputStream();
+        long fileSize = context.getMetadata().getResourceSize();
 
         byte[] buf = new byte[1024];
         while (true) {
@@ -55,23 +74,22 @@ public class LocalReceiver implements Connector {
             } else {
                 bufSize = (int) fileSize;
             }
-            bufSize = fin.read(buf, 0, bufSize);
+            bufSize = fis.read(buf, 0, bufSize);
 
             if (bufSize < 0) {
                 break;
             }
 
-            outputStream.write(buf, 0, bufSize);
-            outputStream.flush();
+            streamOs.write(buf, 0, bufSize);
+            streamOs.flush();
 
             fileSize -= bufSize;
             if (fileSize == 0L)
                 break;
         }
 
-        fin.close();
-        outputStream.close();
-        //context.getStreamBuffer().close();
-        System.out.println("Completed local receive");
+        fis.close();
+        streamOs.close();
+        logger.info("Completed local receiver stream for transfer {}", context.getTransferId());
     }
 }
diff --git a/transport/local-transport/src/main/java/org/apache/airavata/mft/transport/local/LocalResourceIdentifier.java b/transport/local-transport/src/main/java/org/apache/airavata/mft/transport/local/LocalResourceIdentifier.java
deleted file mode 100644
index 2c1f125..0000000
--- a/transport/local-transport/src/main/java/org/apache/airavata/mft/transport/local/LocalResourceIdentifier.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.airavata.mft.transport.local;
-
-public class LocalResourceIdentifier {
-
-    private String path;
-
-    public String getPath() {
-        return path;
-    }
-
-    public void setPath(String path) {
-        this.path = path;
-    }
-}
diff --git a/transport/local-transport/src/main/java/org/apache/airavata/mft/transport/local/LocalSender.java b/transport/local-transport/src/main/java/org/apache/airavata/mft/transport/local/LocalSender.java
index 72cf02e..fad42cb 100644
--- a/transport/local-transport/src/main/java/org/apache/airavata/mft/transport/local/LocalSender.java
+++ b/transport/local-transport/src/main/java/org/apache/airavata/mft/transport/local/LocalSender.java
@@ -19,17 +19,29 @@ package org.apache.airavata.mft.transport.local;
 
 import org.apache.airavata.mft.core.ConnectorContext;
 import org.apache.airavata.mft.core.api.Connector;
+import org.apache.airavata.mft.resource.client.ResourceServiceClient;
+import org.apache.airavata.mft.resource.service.LocalResource;
+import org.apache.airavata.mft.resource.service.LocalResourceGetRequest;
+import org.apache.airavata.mft.resource.service.ResourceServiceGrpc;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-import java.io.FileOutputStream;
-import java.io.InputStream;
+import java.io.*;
 
 public class LocalSender implements Connector {
 
-    private LocalResourceIdentifier resource;
+    private static final Logger logger = LoggerFactory.getLogger(LocalSender.class);
+
+    private LocalResource resource;
+    private boolean initialized;
     @Override
     public void init(String resourceId, String credentialToken, String resourceServiceHost, int resourceServicePort,
                      String secretServiceHost, int secretServicePort) throws Exception {
-        this.resource = LocalTransportUtil.getLocalResourceIdentifier(resourceId);
+
+        this.initialized = true;
+
+        ResourceServiceGrpc.ResourceServiceBlockingStub resourceClient = ResourceServiceClient.buildClient(resourceServiceHost, resourceServicePort);
+        this.resource = resourceClient.getLocalResource(LocalResourceGetRequest.newBuilder().setResourceId(resourceId).build());
     }
 
     @Override
@@ -37,15 +49,23 @@ public class LocalSender implements Connector {
 
     }
 
+    private void checkInitialized() {
+        if (!initialized) {
+            throw new IllegalStateException("Local Sender is not initialized");
+        }
+    }
+
     @Override
     public void startStream(ConnectorContext context) throws Exception {
-        System.out.println("Starting local send");
-        FileOutputStream fos = new FileOutputStream(this.resource.getPath());
-        long fileSize = context.getMetadata().getResourceSize();
 
-        InputStream inputStream = context.getStreamBuffer().getInputStream();
+        logger.info("Starting local sender stream for transfer {}", context.getTransferId());
+
+        checkInitialized();
+        InputStream in = context.getStreamBuffer().getInputStream();
+        long fileSize = context.getMetadata().getResourceSize();
+        OutputStream fos = new FileOutputStream(resource.getResourcePath());
 
-        byte[] buf = new byte[1];
+        byte[] buf = new byte[1024];
         while (true) {
             int bufSize = 0;
 
@@ -54,7 +74,7 @@ public class LocalSender implements Connector {
             } else {
                 bufSize = (int) fileSize;
             }
-            bufSize = inputStream.read(buf, 0, bufSize);
+            bufSize = in.read(buf, 0, bufSize);
 
             if (bufSize < 0) {
                 break;
@@ -67,6 +87,11 @@ public class LocalSender implements Connector {
             if (fileSize == 0L)
                 break;
         }
-        System.out.println("Completed local send");
+
+        in.close();
+        fos.close();
+
+        logger.info("Completed local sender stream for transfer {}", context.getTransferId());
+
     }
 }
diff --git a/transport/local-transport/src/main/java/org/apache/airavata/mft/transport/local/LocalTransportUtil.java b/transport/local-transport/src/main/java/org/apache/airavata/mft/transport/local/LocalTransportUtil.java
deleted file mode 100644
index a06c149..0000000
--- a/transport/local-transport/src/main/java/org/apache/airavata/mft/transport/local/LocalTransportUtil.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.airavata.mft.transport.local;
-
-public class LocalTransportUtil {
-
-    public static LocalResourceIdentifier getLocalResourceIdentifier(String id) {
-        LocalResourceIdentifier identifier = new LocalResourceIdentifier();
-
-        switch (id) {
-            case "1":
-                identifier.setPath("/Users/dimuthu/data.csv");
-                return identifier;
-            case "2":
-                identifier.setPath("/Users/dimuthu/new.txt");
-                return identifier;
-        }
-        return null;
-    }
-}

Reply via email to