Repository: reef Updated Branches: refs/heads/master d45954935 -> 0348e54ec
[REEF-1975] Support stream option for Azure Blob storage * Support stream option for Azure Blob storage * Updated Open test to assert on stream Stream to blobs (#8) * Implement Create method to create a blob and return a write stream to the blob JIRA: [REEF-1975](https://issues.apache.org/jira/browse/REEF-1975) Closes #1426 Project: http://git-wip-us.apache.org/repos/asf/reef/repo Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/0348e54e Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/0348e54e Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/0348e54e Branch: refs/heads/master Commit: 0348e54ec13bd8f267670167af55b8f6797b24bd Parents: d459549 Author: dwaijam <dwai...@gmail.com> Authored: Thu Jan 18 10:30:16 2018 -0800 Committer: Sergiy Matusevych <mo...@apache.org> Committed: Mon Feb 12 12:57:17 2018 -0800 ---------------------------------------------------------------------- .../TestAzureBlockBlobFileSystem.cs | 20 +++- .../TestAzureBlockBlobFileSystemE2E.cs | 102 ++++++++++++------- .../AzureBlob/AzureBlockBlobFileSystem.cs | 8 +- .../FileSystem/AzureBlob/AzureCloudBlockBlob.cs | 10 ++ .../FileSystem/AzureBlob/ICloudBlockBlob.cs | 14 +++ 5 files changed, 111 insertions(+), 43 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/reef/blob/0348e54e/lang/cs/Org.Apache.REEF.IO.Tests/TestAzureBlockBlobFileSystem.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.IO.Tests/TestAzureBlockBlobFileSystem.cs b/lang/cs/Org.Apache.REEF.IO.Tests/TestAzureBlockBlobFileSystem.cs index a1f9b34..dc48d5e 100644 --- a/lang/cs/Org.Apache.REEF.IO.Tests/TestAzureBlockBlobFileSystem.cs +++ b/lang/cs/Org.Apache.REEF.IO.Tests/TestAzureBlockBlobFileSystem.cs @@ -36,18 +36,24 @@ namespace Org.Apache.REEF.IO.Tests /// </summary> public sealed class TestAzureBlockBlobFileSystem { - private readonly static Uri FakeUri = new Uri("http://fake.com"); + private static readonly Uri FakeUri = new Uri("http://fake.com"); [Fact] - public void TestCreateNotSupported() + public void TestCreate() { - Assert.Throws<NotSupportedException>(() => new TestContext().GetAzureFileSystem().Create(FakeUri)); + var testContext = new TestContext(); + Stream stream = testContext.GetAzureFileSystem().Create(new Uri(FakeUri, "container/file")); + testContext.TestCloudBlockBlob.Received(1).Create(); + Assert.Equal(testContext.TestCreateStream, stream); } [Fact] - public void TestOpenNotSupported() + public void TestOpen() { - Assert.Throws<NotSupportedException>(() => new TestContext().GetAzureFileSystem().Open(FakeUri)); + var testContext = new TestContext(); + Stream stream = testContext.GetAzureFileSystem().Open(new Uri(FakeUri, "container/file")); + testContext.TestCloudBlockBlob.Received(1).Open(); + Assert.Equal(testContext.TestOpenStream, stream); } [Fact] @@ -133,6 +139,8 @@ namespace Org.Apache.REEF.IO.Tests public readonly ICloudBlockBlob TestCloudBlockBlob = Substitute.For<ICloudBlockBlob>(); public readonly ICloudBlobContainer TestCloudBlobContainer = Substitute.For<ICloudBlobContainer>(); public readonly ICloudBlobDirectory TestCloudBlobDirectory = Substitute.For<ICloudBlobDirectory>(); + public readonly Stream TestOpenStream = Substitute.For<Stream>(); + public readonly Stream TestCreateStream = Substitute.For<Stream>(); public IFileSystem GetAzureFileSystem() { @@ -144,6 +152,8 @@ namespace Org.Apache.REEF.IO.Tests injector.BindVolatileInstance(TestCloudBlobClient); var fs = injector.GetInstance<AzureBlockBlobFileSystem>(); TestCloudBlobClient.BaseUri.ReturnsForAnyArgs(FakeUri); + TestCloudBlockBlob.Open().Returns(TestOpenStream); + TestCloudBlockBlob.Create().Returns(TestCreateStream); TestCloudBlobClient.GetBlockBlobReference(FakeUri).ReturnsForAnyArgs(TestCloudBlockBlob); TestCloudBlobClient.GetContainerReference("container").ReturnsForAnyArgs(TestCloudBlobContainer); TestCloudBlobContainer.GetDirectoryReference("directory").ReturnsForAnyArgs(TestCloudBlobDirectory); http://git-wip-us.apache.org/repos/asf/reef/blob/0348e54e/lang/cs/Org.Apache.REEF.IO.Tests/TestAzureBlockBlobFileSystemE2E.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.IO.Tests/TestAzureBlockBlobFileSystemE2E.cs b/lang/cs/Org.Apache.REEF.IO.Tests/TestAzureBlockBlobFileSystemE2E.cs index 8e8a708..7b749e4 100644 --- a/lang/cs/Org.Apache.REEF.IO.Tests/TestAzureBlockBlobFileSystemE2E.cs +++ b/lang/cs/Org.Apache.REEF.IO.Tests/TestAzureBlockBlobFileSystemE2E.cs @@ -34,6 +34,7 @@ namespace Org.Apache.REEF.IO.Tests /// </summary> public sealed class TestAzureBlockBlobFileSystemE2E : IDisposable { + private const string SkipMessage = "Fill in credentials before running test"; // Use null to run tests private const string HelloFile = "hello"; private IFileSystem _fileSystem; private CloudBlobContainer _container; @@ -41,14 +42,14 @@ namespace Org.Apache.REEF.IO.Tests public TestAzureBlockBlobFileSystemE2E() { // Fill in before running test! - const string connectionString = "DefaultEndpointsProtocol=http;AccountName=myAccount;AccountKey=myKey;"; + const string ConnectionString = "DefaultEndpointsProtocol=http;AccountName=myAccount;AccountKey=myKey;"; var defaultContainerName = "reef-test-container-" + Guid.NewGuid(); var conf = AzureBlockBlobFileSystemConfiguration.ConfigurationModule - .Set(AzureBlockBlobFileSystemConfiguration.ConnectionString, connectionString) + .Set(AzureBlockBlobFileSystemConfiguration.ConnectionString, ConnectionString) .Build(); _fileSystem = TangFactory.GetTang().NewInjector(conf).GetInstance<AzureBlockBlobFileSystem>(); - _container = CloudStorageAccount.Parse(connectionString).CreateCloudBlobClient().GetContainerReference(defaultContainerName); + _container = CloudStorageAccount.Parse(ConnectionString).CreateCloudBlobClient().GetContainerReference(defaultContainerName); _container.CreateIfNotExistsAsync().Wait(); } @@ -88,7 +89,40 @@ namespace Org.Apache.REEF.IO.Tests return task.Result; } - [Fact(Skip = "Fill in credentials before running test")] + [Fact(Skip = SkipMessage)] + public void TestOpenE2E() + { + const string Text = "hello"; + var blob = _container.GetBlockBlobReference(HelloFile); + UploadFromString(blob, Text); + Assert.True(CheckBlobExists(blob)); + using (var reader = new StreamReader(_fileSystem.Open(PathToFile(HelloFile)))) + { + string streamText = reader.ReadToEnd(); + Assert.Equal(Text, streamText); + } + } + + [Fact(Skip = SkipMessage)] + public void TestCreateE2E() + { + const string Text = "Hello Azure Blob"; + var blob = _container.GetBlockBlobReference(HelloFile); + Assert.False(CheckBlobExists(blob)); + using (var streamWriter = new StreamWriter(_fileSystem.Create(PathToFile(HelloFile)))) + { + streamWriter.Write(Text); + } + blob = _container.GetBlockBlobReference(HelloFile); + Assert.True(CheckBlobExists(blob)); + using (var reader = new StreamReader(blob.OpenRead())) + { + string streamText = reader.ReadToEnd(); + Assert.Equal(Text, streamText); + } + } + + [Fact(Skip = SkipMessage)] public void TestDeleteE2E() { var blob = _container.GetBlockBlobReference(HelloFile); @@ -98,7 +132,7 @@ namespace Org.Apache.REEF.IO.Tests Assert.False(CheckBlobExists(blob)); } - [Fact(Skip = "Fill in credentials before running test")] + [Fact(Skip = SkipMessage)] public void TestExistsE2E() { var helloFilePath = PathToFile(HelloFile); @@ -109,39 +143,39 @@ namespace Org.Apache.REEF.IO.Tests Assert.False(_fileSystem.Exists(helloFilePath)); } - [Fact(Skip = "Fill in credentials before running test")] + [Fact(Skip = SkipMessage)] public void TestCopyE2E() { - const string srcFileName = "src"; - const string destFileName = "dest"; - var srcFilePath = PathToFile(srcFileName); - var destFilePath = PathToFile(destFileName); - ICloudBlob srcBlob = _container.GetBlockBlobReference(srcFileName); + const string SrcFileName = "src"; + const string DestFileName = "dest"; + var srcFilePath = PathToFile(SrcFileName); + var destFilePath = PathToFile(DestFileName); + ICloudBlob srcBlob = _container.GetBlockBlobReference(SrcFileName); UploadFromString(srcBlob, "hello"); Assert.True(CheckBlobExists(srcBlob)); - ICloudBlob destBlob = _container.GetBlockBlobReference(destFileName); + ICloudBlob destBlob = _container.GetBlockBlobReference(DestFileName); Assert.False(CheckBlobExists(destBlob)); _fileSystem.Copy(srcFilePath, destFilePath); - destBlob = GetBlobReferenceFromServer(_container, destFileName); + destBlob = GetBlobReferenceFromServer(_container, DestFileName); Assert.True(CheckBlobExists(destBlob)); - srcBlob = GetBlobReferenceFromServer(_container, srcFileName); + srcBlob = GetBlobReferenceFromServer(_container, SrcFileName); Assert.True(CheckBlobExists(srcBlob)); - Assert.Equal(DownloadText(_container.GetBlockBlobReference(srcFileName)), DownloadText(_container.GetBlockBlobReference(destFileName))); + Assert.Equal(DownloadText(_container.GetBlockBlobReference(SrcFileName)), DownloadText(_container.GetBlockBlobReference(DestFileName))); } - [Fact(Skip = "Fill in credentials before running test")] + [Fact(Skip = SkipMessage)] public void TestCopyToLocalE2E() { var helloFilePath = PathToFile(HelloFile); var blob = _container.GetBlockBlobReference(HelloFile); var tempFilePath = GetTempFilePath(); - const string text = "hello"; + const string Text = "hello"; try { - UploadFromString(blob, text); + UploadFromString(blob, Text); _fileSystem.CopyToLocal(helloFilePath, tempFilePath); Assert.True(File.Exists(tempFilePath)); - Assert.Equal(text, File.ReadAllText(tempFilePath)); + Assert.Equal(Text, File.ReadAllText(tempFilePath)); } finally { @@ -149,17 +183,17 @@ namespace Org.Apache.REEF.IO.Tests } } - [Fact(Skip = "Fill in credentials before running test")] + [Fact(Skip = SkipMessage)] public void TestCopyFromLocalE2E() { var helloFilePath = PathToFile(HelloFile); ICloudBlob blob = _container.GetBlockBlobReference(HelloFile); Assert.False(CheckBlobExists(blob)); var tempFilePath = GetTempFilePath(); - const string text = "hello"; + const string Text = "hello"; try { - File.WriteAllText(tempFilePath, text); + File.WriteAllText(tempFilePath, Text); _fileSystem.CopyFromLocal(tempFilePath, helloFilePath); blob = GetBlobReferenceFromServer(_container, HelloFile); Assert.True(CheckBlobExists(blob)); @@ -171,7 +205,7 @@ namespace Org.Apache.REEF.IO.Tests using (var sr = new StreamReader(stream)) { var matchingText = sr.ReadToEnd(); - Assert.Equal(text, matchingText); + Assert.Equal(Text, matchingText); } } } @@ -181,29 +215,29 @@ namespace Org.Apache.REEF.IO.Tests } } - [Fact(Skip = "Fill in credentials before running test")] + [Fact(Skip = SkipMessage)] public void TestDeleteDirectoryAtContainerE2E() { _fileSystem.DeleteDirectory(_container.Uri); Assert.False(CheckContainerExists(_container)); } - [Fact(Skip = "Fill in credentials before running test")] + [Fact(Skip = SkipMessage)] public void TestDeleteDirectoryFirstLevelE2E() { - const string directory = "dir"; + const string Directory = "dir"; var blockBlobs = new List<CloudBlockBlob>(); for (var i = 0; i < 3; i++) { - var filePath = directory + '/' + i; + var filePath = Directory + '/' + i; var blockBlob = _container.GetBlockBlobReference(filePath); UploadFromString(blockBlob, "hello"); Assert.True(CheckBlobExists(blockBlob)); blockBlobs.Add(blockBlob); } - _fileSystem.DeleteDirectory(PathToFile(directory)); + _fileSystem.DeleteDirectory(PathToFile(Directory)); foreach (var blockBlob in blockBlobs) { @@ -213,18 +247,18 @@ namespace Org.Apache.REEF.IO.Tests Assert.True(CheckContainerExists(_container)); } - [Fact(Skip = "Fill in credentials before running test")] + [Fact(Skip = SkipMessage)] public void TestDeleteDirectorySecondLevelE2E() { - const string directory1 = "dir1"; - const string directory2 = "dir2"; + const string Directory1 = "dir1"; + const string Directory2 = "dir2"; var blockBlobs1 = new List<CloudBlockBlob>(); var blockBlobs2 = new List<CloudBlockBlob>(); for (var i = 0; i < 3; i++) { - var filePath1 = directory1 + '/' + i; - var filePath2 = directory1 + '/' + directory2 + '/' + i; + var filePath1 = Directory1 + '/' + i; + var filePath2 = Directory1 + '/' + Directory2 + '/' + i; var blockBlob1 = _container.GetBlockBlobReference(filePath1); var blockBlob2 = _container.GetBlockBlobReference(filePath2); UploadFromString(blockBlob1, "hello"); @@ -235,7 +269,7 @@ namespace Org.Apache.REEF.IO.Tests blockBlobs2.Add(blockBlob2); } - _fileSystem.DeleteDirectory(PathToFile(directory1 + '/' + directory2)); + _fileSystem.DeleteDirectory(PathToFile(Directory1 + '/' + Directory2)); foreach (var blockBlob in blockBlobs2) { http://git-wip-us.apache.org/repos/asf/reef/blob/0348e54e/lang/cs/Org.Apache.REEF.IO/FileSystem/AzureBlob/AzureBlockBlobFileSystem.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.IO/FileSystem/AzureBlob/AzureBlockBlobFileSystem.cs b/lang/cs/Org.Apache.REEF.IO/FileSystem/AzureBlob/AzureBlockBlobFileSystem.cs index b5f659c..81a7c1f 100644 --- a/lang/cs/Org.Apache.REEF.IO/FileSystem/AzureBlob/AzureBlockBlobFileSystem.cs +++ b/lang/cs/Org.Apache.REEF.IO/FileSystem/AzureBlob/AzureBlockBlobFileSystem.cs @@ -42,19 +42,19 @@ namespace Org.Apache.REEF.IO.FileSystem.AzureBlob } /// <summary> - /// Not supported for Azure Blobs, will throw <see cref="NotSupportedException"/>. + /// Returns a Stream object to the blob specified by the fileUri. /// </summary> public Stream Open(Uri fileUri) { - throw new NotSupportedException("Open is not supported for AzureBlockBlobFileSystem."); + return _client.GetBlockBlobReference(fileUri).Open(); } /// <summary> - /// Not supported for Azure Blobs, will throw <see cref="NotSupportedException"/>. + /// Creates a blob for the specified fileUri and returns a write Stream object to it. /// </summary> public Stream Create(Uri fileUri) { - throw new NotSupportedException("Open is not supported for AzureBlockBlobFileSystem."); + return _client.GetBlockBlobReference(fileUri).Create(); } /// <summary> http://git-wip-us.apache.org/repos/asf/reef/blob/0348e54e/lang/cs/Org.Apache.REEF.IO/FileSystem/AzureBlob/AzureCloudBlockBlob.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.IO/FileSystem/AzureBlob/AzureCloudBlockBlob.cs b/lang/cs/Org.Apache.REEF.IO/FileSystem/AzureBlob/AzureCloudBlockBlob.cs index c4b9c6d..b2289b6 100644 --- a/lang/cs/Org.Apache.REEF.IO/FileSystem/AzureBlob/AzureCloudBlockBlob.cs +++ b/lang/cs/Org.Apache.REEF.IO/FileSystem/AzureBlob/AzureCloudBlockBlob.cs @@ -59,6 +59,16 @@ namespace Org.Apache.REEF.IO.FileSystem.AzureBlob _blob = new CloudBlockBlob(uri, credentials); } + public Stream Open() + { + return _blob.OpenRead(); + } + + public Stream Create() + { + return _blob.OpenWrite(); + } + public bool Exists() { var task = _blob.ExistsAsync(); http://git-wip-us.apache.org/repos/asf/reef/blob/0348e54e/lang/cs/Org.Apache.REEF.IO/FileSystem/AzureBlob/ICloudBlockBlob.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.IO/FileSystem/AzureBlob/ICloudBlockBlob.cs b/lang/cs/Org.Apache.REEF.IO/FileSystem/AzureBlob/ICloudBlockBlob.cs index 2941e6e..ec102cc 100644 --- a/lang/cs/Org.Apache.REEF.IO/FileSystem/AzureBlob/ICloudBlockBlob.cs +++ b/lang/cs/Org.Apache.REEF.IO/FileSystem/AzureBlob/ICloudBlockBlob.cs @@ -46,6 +46,20 @@ namespace Org.Apache.REEF.IO.FileSystem.AzureBlob CopyState CopyState { get; } /// <summary> + /// Opens a stream to the blob content. + /// </summary> + /// <returns>System.IO.Stream object.</returns> + /// <exception cref="StorageException">If blob does not exist</exception> + Stream Open(); + + /// <summary> + /// Creates a blob and returns a write Stream object to it. + /// </summary> + /// <returns>System.IO.Stream object.</returns> + /// <exception cref="StorageException">If blob cannot be created</exception> + Stream Create(); + + /// <summary> /// Makes a round trip to the server to test if the blob exists. /// </summary> /// <returns>True if exists. False otherwise.</returns>