This is an automated email from the ASF dual-hosted git repository.
dimuthuupe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airavata-mft.git
The following commit(s) were added to refs/heads/master by this push:
new 380a674 Fixing local transport to support resource service
380a674 is described below
commit 380a674d904ceab21a32c353b74c3742472801ec
Author: Dimuthu Wannipurage <[email protected]>
AuthorDate: Wed Apr 15 18:46:03 2020 -0400
Fixing local transport to support resource service
---
.../transport/local/LocalMetadataCollector.java | 51 ++++++++++++++++++----
.../mft/transport/local/LocalReceiver.java | 48 +++++++++++++-------
.../transport/local/LocalResourceIdentifier.java | 31 -------------
.../airavata/mft/transport/local/LocalSender.java | 47 +++++++++++++++-----
.../mft/transport/local/LocalTransportUtil.java | 35 ---------------
5 files changed, 112 insertions(+), 100 deletions(-)
diff --git a/transport/local-transport/src/main/java/org/apache/airavata/mft/transport/local/LocalMetadataCollector.java b/transport/local-transport/src/main/java/org/apache/airavata/mft/transport/local/LocalMetadataCollector.java
index bc04f3c..52af3b4 100644
--- a/transport/local-transport/src/main/java/org/apache/airavata/mft/transport/local/LocalMetadataCollector.java
+++ b/transport/local-transport/src/main/java/org/apache/airavata/mft/transport/local/LocalMetadataCollector.java
@@ -19,8 +19,17 @@ package org.apache.airavata.mft.transport.local;
import org.apache.airavata.mft.core.ResourceMetadata;
import org.apache.airavata.mft.core.api.MetadataCollector;
+import org.apache.airavata.mft.resource.client.ResourceServiceClient;
+import org.apache.airavata.mft.resource.service.LocalResource;
+import org.apache.airavata.mft.resource.service.LocalResourceGetRequest;
+import org.apache.airavata.mft.resource.service.ResourceServiceGrpc;
import java.io.File;
+import java.io.FileInputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.attribute.BasicFileAttributes;
+import java.security.MessageDigest;
public class LocalMetadataCollector implements MetadataCollector {
@@ -41,24 +50,50 @@ public class LocalMetadataCollector implements
MetadataCollector {
private void checkInitialized() {
if (!initialized) {
- throw new IllegalStateException("SCP Metadata Collector is not
initialized");
+ throw new IllegalStateException("Local Metadata Collector is not
initialized");
}
}
@Override
public ResourceMetadata getGetResourceMetadata(String resourceId, String
credentialToken) throws Exception {
- LocalResourceIdentifier resource =
LocalTransportUtil.getLocalResourceIdentifier(resourceId);
- File file = new File(resource.getPath());
+ ResourceServiceGrpc.ResourceServiceBlockingStub resourceClient =
ResourceServiceClient.buildClient(resourceServiceHost, resourceServicePort);
+ LocalResource localResource =
resourceClient.getLocalResource(LocalResourceGetRequest.newBuilder().setResourceId(resourceId).build());
+ File resourceFile = new File(localResource.getResourcePath());
+ if (resourceFile.exists()) {
- ResourceMetadata metadata = new ResourceMetadata();
- metadata.setResourceSize(file.length());
- metadata.setUpdateTime(file.lastModified());
- return metadata;
+ BasicFileAttributes basicFileAttributes =
Files.readAttributes(Path.of(localResource.getResourcePath()),
BasicFileAttributes.class);
+ ResourceMetadata metadata = new ResourceMetadata();
+
metadata.setCreatedTime(basicFileAttributes.creationTime().toMillis());
+
metadata.setUpdateTime(basicFileAttributes.lastModifiedTime().toMillis());
+ metadata.setResourceSize(basicFileAttributes.size());
+
+ MessageDigest digest = MessageDigest.getInstance("MD5");
+ FileInputStream fis = new
FileInputStream(localResource.getResourcePath());
+ byte[] byteArray = new byte[1024];
+ int bytesCount = 0;
+ while ((bytesCount = fis.read(byteArray)) != -1) {
+ digest.update(byteArray, 0, bytesCount);
+ };
+ fis.close();
+ byte[] bytes = digest.digest();
+ StringBuilder sb = new StringBuilder();
+ for (byte aByte : bytes) {
+ sb.append(Integer.toString((aByte & 0xff) + 0x100,
16).substring(1));
+ }
+ metadata.setMd5sum(sb.toString());
+
+ return metadata;
+ } else {
+ throw new Exception("Resource with id " + resourceId + " in path "
+ localResource.getResourcePath() + " does not exist");
+ }
}
@Override
public Boolean isAvailable(String resourceId, String credentialToken)
throws Exception {
- return false;
+ ResourceServiceGrpc.ResourceServiceBlockingStub resourceClient =
ResourceServiceClient.buildClient(resourceServiceHost, resourceServicePort);
+ LocalResource localResource =
resourceClient.getLocalResource(LocalResourceGetRequest.newBuilder().setResourceId(resourceId).build());
+ File resourceFile = new File(localResource.getResourcePath());
+ return resourceFile.exists();
}
}
diff --git a/transport/local-transport/src/main/java/org/apache/airavata/mft/transport/local/LocalReceiver.java b/transport/local-transport/src/main/java/org/apache/airavata/mft/transport/local/LocalReceiver.java
index e21b612..0ee0abd 100644
--- a/transport/local-transport/src/main/java/org/apache/airavata/mft/transport/local/LocalReceiver.java
+++ b/transport/local-transport/src/main/java/org/apache/airavata/mft/transport/local/LocalReceiver.java
@@ -19,17 +19,29 @@ package org.apache.airavata.mft.transport.local;
import org.apache.airavata.mft.core.ConnectorContext;
import org.apache.airavata.mft.core.api.Connector;
+import org.apache.airavata.mft.resource.client.ResourceServiceClient;
+import org.apache.airavata.mft.resource.service.LocalResource;
+import org.apache.airavata.mft.resource.service.LocalResourceGetRequest;
+import org.apache.airavata.mft.resource.service.ResourceServiceGrpc;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
-import java.io.FileInputStream;
-import java.io.OutputStream;
+import java.io.*;
public class LocalReceiver implements Connector {
- private LocalResourceIdentifier resource;
+ private static final Logger logger =
LoggerFactory.getLogger(LocalReceiver.class);
+
+ private LocalResource resource;
+ private boolean initialized;
+
@Override
public void init(String resourceId, String credentialToken, String
resourceServiceHost, int resourceServicePort,
String secretServiceHost, int secretServicePort) throws
Exception {
- this.resource =
LocalTransportUtil.getLocalResourceIdentifier(resourceId);
+ this.initialized = true;
+
+ ResourceServiceGrpc.ResourceServiceBlockingStub resourceClient =
ResourceServiceClient.buildClient(resourceServiceHost, resourceServicePort);
+ this.resource =
resourceClient.getLocalResource(LocalResourceGetRequest.newBuilder().setResourceId(resourceId).build());
}
@Override
@@ -37,14 +49,21 @@ public class LocalReceiver implements Connector {
}
+ private void checkInitialized() {
+ if (!initialized) {
+ throw new IllegalStateException("Local Receiver is not
initialized");
+ }
+ }
+
@Override
public void startStream(ConnectorContext context) throws Exception {
- System.out.println("Starting local receive");
- FileInputStream fin = new FileInputStream(this.resource.getPath());
+ logger.info("Starting local receiver stream for transfer {}",
context.getTransferId());
- long fileSize = context.getMetadata().getResourceSize();
+ checkInitialized();
+ OutputStream streamOs = context.getStreamBuffer().getOutputStream();
+ FileInputStream fis = new FileInputStream(new
File(resource.getResourcePath()));
- OutputStream outputStream =
context.getStreamBuffer().getOutputStream();
+ long fileSize = context.getMetadata().getResourceSize();
byte[] buf = new byte[1024];
while (true) {
@@ -55,23 +74,22 @@ public class LocalReceiver implements Connector {
} else {
bufSize = (int) fileSize;
}
- bufSize = fin.read(buf, 0, bufSize);
+ bufSize = fis.read(buf, 0, bufSize);
if (bufSize < 0) {
break;
}
- outputStream.write(buf, 0, bufSize);
- outputStream.flush();
+ streamOs.write(buf, 0, bufSize);
+ streamOs.flush();
fileSize -= bufSize;
if (fileSize == 0L)
break;
}
- fin.close();
- outputStream.close();
- //context.getStreamBuffer().close();
- System.out.println("Completed local receive");
+ fis.close();
+ streamOs.close();
+ logger.info("Completed local receiver stream for transfer {}",
context.getTransferId());
}
}
diff --git a/transport/local-transport/src/main/java/org/apache/airavata/mft/transport/local/LocalResourceIdentifier.java b/transport/local-transport/src/main/java/org/apache/airavata/mft/transport/local/LocalResourceIdentifier.java
deleted file mode 100644
index 2c1f125..0000000
--- a/transport/local-transport/src/main/java/org/apache/airavata/mft/transport/local/LocalResourceIdentifier.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.airavata.mft.transport.local;
-
-public class LocalResourceIdentifier {
-
- private String path;
-
- public String getPath() {
- return path;
- }
-
- public void setPath(String path) {
- this.path = path;
- }
-}
diff --git a/transport/local-transport/src/main/java/org/apache/airavata/mft/transport/local/LocalSender.java b/transport/local-transport/src/main/java/org/apache/airavata/mft/transport/local/LocalSender.java
index 72cf02e..fad42cb 100644
--- a/transport/local-transport/src/main/java/org/apache/airavata/mft/transport/local/LocalSender.java
+++ b/transport/local-transport/src/main/java/org/apache/airavata/mft/transport/local/LocalSender.java
@@ -19,17 +19,29 @@ package org.apache.airavata.mft.transport.local;
import org.apache.airavata.mft.core.ConnectorContext;
import org.apache.airavata.mft.core.api.Connector;
+import org.apache.airavata.mft.resource.client.ResourceServiceClient;
+import org.apache.airavata.mft.resource.service.LocalResource;
+import org.apache.airavata.mft.resource.service.LocalResourceGetRequest;
+import org.apache.airavata.mft.resource.service.ResourceServiceGrpc;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
-import java.io.FileOutputStream;
-import java.io.InputStream;
+import java.io.*;
public class LocalSender implements Connector {
- private LocalResourceIdentifier resource;
+ private static final Logger logger =
LoggerFactory.getLogger(LocalSender.class);
+
+ private LocalResource resource;
+ private boolean initialized;
@Override
public void init(String resourceId, String credentialToken, String
resourceServiceHost, int resourceServicePort,
String secretServiceHost, int secretServicePort) throws
Exception {
- this.resource =
LocalTransportUtil.getLocalResourceIdentifier(resourceId);
+
+ this.initialized = true;
+
+ ResourceServiceGrpc.ResourceServiceBlockingStub resourceClient =
ResourceServiceClient.buildClient(resourceServiceHost, resourceServicePort);
+ this.resource =
resourceClient.getLocalResource(LocalResourceGetRequest.newBuilder().setResourceId(resourceId).build());
}
@Override
@@ -37,15 +49,23 @@ public class LocalSender implements Connector {
}
+ private void checkInitialized() {
+ if (!initialized) {
+ throw new IllegalStateException("Local Sender is not initialized");
+ }
+ }
+
@Override
public void startStream(ConnectorContext context) throws Exception {
- System.out.println("Starting local send");
- FileOutputStream fos = new FileOutputStream(this.resource.getPath());
- long fileSize = context.getMetadata().getResourceSize();
- InputStream inputStream = context.getStreamBuffer().getInputStream();
+ logger.info("Starting local sender stream for transfer {}",
context.getTransferId());
+
+ checkInitialized();
+ InputStream in = context.getStreamBuffer().getInputStream();
+ long fileSize = context.getMetadata().getResourceSize();
+ OutputStream fos = new FileOutputStream(resource.getResourcePath());
- byte[] buf = new byte[1];
+ byte[] buf = new byte[1024];
while (true) {
int bufSize = 0;
@@ -54,7 +74,7 @@ public class LocalSender implements Connector {
} else {
bufSize = (int) fileSize;
}
- bufSize = inputStream.read(buf, 0, bufSize);
+ bufSize = in.read(buf, 0, bufSize);
if (bufSize < 0) {
break;
@@ -67,6 +87,11 @@ public class LocalSender implements Connector {
if (fileSize == 0L)
break;
}
- System.out.println("Completed local send");
+
+ in.close();
+ fos.close();
+
+ logger.info("Completed local sender stream for transfer {}",
context.getTransferId());
+
}
}
diff --git a/transport/local-transport/src/main/java/org/apache/airavata/mft/transport/local/LocalTransportUtil.java b/transport/local-transport/src/main/java/org/apache/airavata/mft/transport/local/LocalTransportUtil.java
deleted file mode 100644
index a06c149..0000000
--- a/transport/local-transport/src/main/java/org/apache/airavata/mft/transport/local/LocalTransportUtil.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.airavata.mft.transport.local;
-
-public class LocalTransportUtil {
-
- public static LocalResourceIdentifier getLocalResourceIdentifier(String
id) {
- LocalResourceIdentifier identifier = new LocalResourceIdentifier();
-
- switch (id) {
- case "1":
- identifier.setPath("/Users/dimuthu/data.csv");
- return identifier;
- case "2":
- identifier.setPath("/Users/dimuthu/new.txt");
- return identifier;
- }
- return null;
- }
-}