This is an automated email from the ASF dual-hosted git repository. dimuthuupe pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/airavata-mft.git
commit 703a0be267426bcc5fed07226615a046c0c4b0d7 Author: Dimuthu Wannipurage <[email protected]> AuthorDate: Tue Sep 27 21:23:02 2022 -0400 Adding failure handling for connectors --- .../airavata/mft/agent/TransportMediator.java | 117 ++++++++++++--------- .../airavata/mft/core/api/BasicConnector.java | 1 + .../mft/core/api/IncomingStreamingConnector.java | 4 +- .../transport/odata/ODataIncomingConnector.java | 5 + .../mft/transport/s3/S3IncomingConnector.java | 5 + .../mft/transport/s3/S3OutgoingConnector.java | 5 + .../transport/s3/S3OutgoingStreamingConnector.java | 5 + .../mft/transport/scp/SCPIncomingConnector.java | 5 + .../mft/transport/scp/SCPOutgoingConnector.java | 5 + .../transport/swift/SwiftIncomingConnector.java | 5 + .../transport/swift/SwiftOutgoingConnector.java | 5 + 11 files changed, 107 insertions(+), 55 deletions(-) diff --git a/agent/src/main/java/org/apache/airavata/mft/agent/TransportMediator.java b/agent/src/main/java/org/apache/airavata/mft/agent/TransportMediator.java index 0ff87d4..e7480c5 100644 --- a/agent/src/main/java/org/apache/airavata/mft/agent/TransportMediator.java +++ b/agent/src/main/java/org/apache/airavata/mft/agent/TransportMediator.java @@ -117,31 +117,37 @@ public class TransportMediator { inConnector.init(srcCC); outConnector.init(dstCC); - while(uploadLength < fileLength) { + try { + while (uploadLength < fileLength) { - long endPos = uploadLength + chunkSize; - if (endPos > fileLength) { - endPos = fileLength; - } + long endPos = uploadLength + chunkSize; + if (endPos > fileLength) { + endPos = fileLength; + } - completionService.submit(new ChunkMover(inConnector, - outConnector, uploadLength, endPos, chunkIdx, - transferId, doChunkStreaming)); + completionService.submit(new ChunkMover(inConnector, + outConnector, uploadLength, endPos, chunkIdx, + transferId, doChunkStreaming)); - uploadLength = endPos; - chunkIdx++; - } + uploadLength = endPos; + chunkIdx++; + } - for (int i = 0; i < chunkIdx; i++) { - Future<Integer> future = completionService.take(); - } + for (int i = 0; i < chunkIdx; i++) { + Future<Integer> future = completionService.take(); + } - inConnector.complete(); - outConnector.complete(); - logger.info("Completed chunked transfer for transfer {}", transferId); + inConnector.complete(); + outConnector.complete(); + logger.info("Completed chunked transfer for transfer {}", transferId); + } catch (Exception e) { + inConnector.failed(); + outConnector.failed(); + throw e; + } } else if (inStreamingConnectorOp.isPresent() && outStreamingConnectorOp.isPresent()) { logger.info("Starting streaming transfer for transfer {}", transferId); @@ -154,48 +160,54 @@ public class TransportMediator { inConnector.init(srcCC); outConnector.init(dstCC); - String srcChild = request.getSourceChildResourcePath(); - String dstChild = request.getDestinationChildResourcePath(); + try { + String srcChild = request.getSourceChildResourcePath(); + String dstChild = request.getDestinationChildResourcePath(); - InputStream inputStream = srcChild.equals("") ? inConnector.fetchInputStream() : inConnector.fetchInputStream(srcChild); - OutputStream outputStream = dstChild.equals("") ? outConnector.fetchOutputStream() : outConnector.fetchOutputStream(dstChild); + InputStream inputStream = srcChild.equals("") ? inConnector.fetchInputStream() : inConnector.fetchInputStream(srcChild); + OutputStream outputStream = dstChild.equals("") ? outConnector.fetchOutputStream() : outConnector.fetchOutputStream(dstChild); - long count = 0; - final AtomicLong countAtomic = new AtomicLong(); - countAtomic.set(count); + long count = 0; + final AtomicLong countAtomic = new AtomicLong(); + countAtomic.set(count); - monitorPool.submit(() -> { - while (true) { - try { - Thread.sleep(2000); - } catch (InterruptedException e) { - // Ignore - } - if (!transferInProgress.get()) { - logger.info("Status monitor is exiting for transfer {}", transferId); - break; + monitorPool.submit(() -> { + while (true) { + try { + Thread.sleep(2000); + } catch (InterruptedException e) { + // Ignore + } + if (!transferInProgress.get()) { + logger.info("Status monitor is exiting for transfer {}", transferId); + break; + } + double transferPercentage = countAtomic.get() * 100.0 / srcCC.getMetadata().getResourceSize(); + logger.info("Transfer percentage for transfer {} {}", transferId, transferPercentage); + onStatusCallback.accept(transferId, new TransferState() + .setPercentage(transferPercentage) + .setState("RUNNING") + .setUpdateTimeMils(System.currentTimeMillis()) + .setDescription("Transfer Progress Updated")); } - double transferPercentage = countAtomic.get() * 100.0 / srcCC.getMetadata().getResourceSize(); - logger.info("Transfer percentage for transfer {} {}", transferId, transferPercentage); - onStatusCallback.accept(transferId, new TransferState() - .setPercentage(transferPercentage) - .setState("RUNNING") - .setUpdateTimeMils(System.currentTimeMillis()) - .setDescription("Transfer Progress Updated")); - } - }); + }); - int n; - byte[] buffer = new byte[128 * 1024]; - for (count = 0L; -1 != (n = inputStream.read(buffer)); count += (long) n) { - outputStream.write(buffer, 0, n); - countAtomic.set(count); - } + int n; + byte[] buffer = new byte[128 * 1024]; + for (count = 0L; -1 != (n = inputStream.read(buffer)); count += (long) n) { + outputStream.write(buffer, 0, n); + countAtomic.set(count); + } - inConnector.complete(); - outConnector.complete(); + inConnector.complete(); + outConnector.complete(); - logger.info("Completed streaming transfer for transfer {}", transferId); + logger.info("Completed streaming transfer for transfer {}", transferId); + } catch (Exception e) { + inConnector.failed(); + outConnector.failed(); + throw e; + } } else { throw new Exception("No matching connector found to perform the transfer"); @@ -224,6 +236,7 @@ public class TransportMediator { .setState("FAILED") .setUpdateTimeMils(System.currentTimeMillis()) .setDescription("Transfer failed due to " + ExceptionUtils.getStackTrace(e))); + exitingCallback.accept(transferId, false); } finally { transferInProgress.set(false); } diff --git a/core/src/main/java/org/apache/airavata/mft/core/api/BasicConnector.java b/core/src/main/java/org/apache/airavata/mft/core/api/BasicConnector.java index 1ca72d6..52d85eb 100644 --- a/core/src/main/java/org/apache/airavata/mft/core/api/BasicConnector.java +++ b/core/src/main/java/org/apache/airavata/mft/core/api/BasicConnector.java @@ -3,4 +3,5 @@ package org.apache.airavata.mft.core.api; public interface BasicConnector { public void init(ConnectorConfig connectorConfig) throws Exception; public void complete() throws Exception; + public void failed() throws Exception; } diff --git a/core/src/main/java/org/apache/airavata/mft/core/api/IncomingStreamingConnector.java b/core/src/main/java/org/apache/airavata/mft/core/api/IncomingStreamingConnector.java index 900b148..3bc650a 100644 --- a/core/src/main/java/org/apache/airavata/mft/core/api/IncomingStreamingConnector.java +++ b/core/src/main/java/org/apache/airavata/mft/core/api/IncomingStreamingConnector.java @@ -19,9 +19,7 @@ package org.apache.airavata.mft.core.api; import java.io.InputStream; -public interface IncomingStreamingConnector { - public void init(ConnectorConfig connectorConfig) throws Exception; +public interface IncomingStreamingConnector extends BasicConnector { public InputStream fetchInputStream() throws Exception; public InputStream fetchInputStream(String childPath) throws Exception; - public void complete() throws Exception; } diff --git a/transport/odata-transport/src/main/java/org/apache/airavata/mft/transport/odata/ODataIncomingConnector.java b/transport/odata-transport/src/main/java/org/apache/airavata/mft/transport/odata/ODataIncomingConnector.java index 3626885..b8415be 100644 --- a/transport/odata-transport/src/main/java/org/apache/airavata/mft/transport/odata/ODataIncomingConnector.java +++ b/transport/odata-transport/src/main/java/org/apache/airavata/mft/transport/odata/ODataIncomingConnector.java @@ -113,4 +113,9 @@ public class ODataIncomingConnector implements IncomingStreamingConnector { client.close(); } } + + @Override + public void failed() throws Exception { + + } } diff --git a/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3IncomingConnector.java b/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3IncomingConnector.java index d09ffa2..015470d 100644 --- a/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3IncomingConnector.java +++ b/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3IncomingConnector.java @@ -116,4 +116,9 @@ public class S3IncomingConnector implements IncomingChunkedConnector, IncomingSt public void complete() throws Exception { } + + @Override + public void failed() throws Exception { + + } } diff --git a/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3OutgoingConnector.java b/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3OutgoingConnector.java index 617e50c..01a2298 100644 --- a/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3OutgoingConnector.java +++ b/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3OutgoingConnector.java @@ -130,4 +130,9 @@ public class S3OutgoingConnector implements OutgoingChunkedConnector { logger.info("Completing the upload for file {} in bucket {}", resource.getFile().getResourcePath(), resource.getS3Storage().getBucketName()); } + + @Override + public void failed() throws Exception { + + } } diff --git a/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3OutgoingStreamingConnector.java b/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3OutgoingStreamingConnector.java index a039855..fa20539 100644 --- a/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3OutgoingStreamingConnector.java +++ b/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3OutgoingStreamingConnector.java @@ -108,6 +108,11 @@ public class S3OutgoingStreamingConnector implements OutgoingStreamingConnector } } + @Override + public void failed() throws Exception { + + } + @Override public OutputStream fetchOutputStream() throws Exception { this.s3OutputStream = S3OutputStream.builder() diff --git a/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPIncomingConnector.java b/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPIncomingConnector.java index d6bacdb..1ec6cd1 100644 --- a/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPIncomingConnector.java +++ b/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPIncomingConnector.java @@ -204,6 +204,11 @@ public final class SCPIncomingConnector implements IncomingStreamingConnector { session.disconnect(); } + @Override + public void failed() throws Exception { + + } + private int checkAck(InputStream in) throws IOException { int b = in.read(); // b may be 0 for success, diff --git a/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPOutgoingConnector.java b/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPOutgoingConnector.java index 28ea5e0..48044aa 100644 --- a/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPOutgoingConnector.java +++ b/transport/scp-transport/src/main/java/org/apache/airavata/mft/transport/scp/SCPOutgoingConnector.java @@ -198,6 +198,11 @@ public final class SCPOutgoingConnector implements OutgoingStreamingConnector { session.disconnect(); } + @Override + public void failed() throws Exception { + + } + public int checkAck(InputStream in) throws IOException { int b = in.read(); // b may be 0 for success, diff --git a/transport/swift-transport/src/main/java/org/apache/airavata/mft/transport/swift/SwiftIncomingConnector.java b/transport/swift-transport/src/main/java/org/apache/airavata/mft/transport/swift/SwiftIncomingConnector.java index 47efd38..7c195b7 100644 --- a/transport/swift-transport/src/main/java/org/apache/airavata/mft/transport/swift/SwiftIncomingConnector.java +++ b/transport/swift-transport/src/main/java/org/apache/airavata/mft/transport/swift/SwiftIncomingConnector.java @@ -118,6 +118,11 @@ public class SwiftIncomingConnector implements IncomingChunkedConnector { } } + @Override + public void failed() throws Exception { + + } + @Override public void downloadChunk(int chunkId, long startByte, long endByte, String downloadFile) throws Exception { SwiftObject swiftObject = objectApi.get( diff --git a/transport/swift-transport/src/main/java/org/apache/airavata/mft/transport/swift/SwiftOutgoingConnector.java b/transport/swift-transport/src/main/java/org/apache/airavata/mft/transport/swift/SwiftOutgoingConnector.java index 3734e8b..38fd79a 100644 --- a/transport/swift-transport/src/main/java/org/apache/airavata/mft/transport/swift/SwiftOutgoingConnector.java +++ b/transport/swift-transport/src/main/java/org/apache/airavata/mft/transport/swift/SwiftOutgoingConnector.java @@ -132,6 +132,11 @@ public class SwiftOutgoingConnector implements OutgoingChunkedConnector { } } + @Override + public void failed() throws Exception { + + } + @Override public void uploadChunk(int chunkId, long startByte, long endByte, String uploadFile) throws Exception { InputStream fis = new FileInputStream(uploadFile);
