This is an automated email from the ASF dual-hosted git repository.
motus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/reef.git
The following commit(s) were added to refs/heads/master by this push:
new f98f20b [REEF-2033] Introduce configurable azure blob exponential
retry policy
f98f20b is described below
commit f98f20bb67290e74e8f6ccd57aef539f673237d3
Author: Dwaipayan Mukhopadhyay <[email protected]>
AuthorDate: Wed Jun 27 12:37:17 2018 -0700
[REEF-2033] Introduce configurable azure blob exponential retry policy
Currently, the Azure Blob retry policy defaults to the Azure Blob client's
default `ExponentialRetry` policy, which uses a retry count of 3 and a retry
interval of 4 seconds. Higher-load jobs need custom values for these settings
to overcome Azure Blob throttling
([REEF-2017](https://issues.apache.org/jira/browse/REEF-2017)). This change
allows users to set their own retry count and retry interval for the Azure
Blob retry policy.
JIRA:
[REEF-2033](https://issues.apache.org/jira/browse/REEF-2033)
Pull request:
This closes #1470
---
.../TestAzureBlockBlobFileSystemE2E.cs | 5 ---
.../FileSystem/AzureBlob/AzureCloudBlobClient.cs | 7 ++--
.../AzureBlob/AzureCloudBlobContainer.cs | 9 +++--
.../AzureBlob/AzureCloudBlobDirectory.cs | 1 -
.../FileSystem/AzureBlob/AzureCloudBlockBlob.cs | 32 ++++++++--------
.../RetryPolicy/DefaultAzureBlobRetryPolicy.cs | 2 +-
.../ExponentialRetryPolicy.cs} | 21 +++++++----
.../ExponentialRetryPolicyConfiguration.cs | 44 ++++++++++++++++++++++
.../ExponentialRetryPolicyParameterNames.cs} | 26 ++++---------
9 files changed, 91 insertions(+), 56 deletions(-)
diff --git
a/lang/cs/Org.Apache.REEF.IO.Tests/TestAzureBlockBlobFileSystemE2E.cs
b/lang/cs/Org.Apache.REEF.IO.Tests/TestAzureBlockBlobFileSystemE2E.cs
index 852f63c..f866944 100644
--- a/lang/cs/Org.Apache.REEF.IO.Tests/TestAzureBlockBlobFileSystemE2E.cs
+++ b/lang/cs/Org.Apache.REEF.IO.Tests/TestAzureBlockBlobFileSystemE2E.cs
@@ -67,28 +67,24 @@ namespace Org.Apache.REEF.IO.Tests
private bool CheckBlobExists(ICloudBlob blob)
{
var task = blob.ExistsAsync();
- task.Wait();
return task.Result;
}
private bool CheckContainerExists(CloudBlobContainer container)
{
var task = container.ExistsAsync();
- task.Wait();
return task.Result;
}
private ICloudBlob GetBlobReferenceFromServer(CloudBlobContainer
container, string blobName)
{
var task = container.GetBlobReferenceFromServerAsync(blobName);
- task.Wait();
return task.Result;
}
private string DownloadText(CloudBlockBlob blob)
{
var task = blob.DownloadTextAsync();
- task.Wait();
return task.Result;
}
@@ -120,7 +116,6 @@ namespace Org.Apache.REEF.IO.Tests
blob = container.GetBlockBlobReference(HelloFile);
Assert.True(CheckBlobExists(blob));
var readTask = blob.OpenReadAsync();
- readTask.Wait();
using (var reader = new StreamReader(readTask.Result))
{
string streamText = reader.ReadToEnd();
diff --git
a/lang/cs/Org.Apache.REEF.IO/FileSystem/AzureBlob/AzureCloudBlobClient.cs
b/lang/cs/Org.Apache.REEF.IO/FileSystem/AzureBlob/AzureCloudBlobClient.cs
index 5b54e68..eec4806 100644
--- a/lang/cs/Org.Apache.REEF.IO/FileSystem/AzureBlob/AzureCloudBlobClient.cs
+++ b/lang/cs/Org.Apache.REEF.IO/FileSystem/AzureBlob/AzureCloudBlobClient.cs
@@ -32,6 +32,7 @@ namespace Org.Apache.REEF.IO.FileSystem.AzureBlob
internal sealed class AzureCloudBlobClient : ICloudBlobClient
{
private readonly CloudBlobClient _client;
+ private readonly BlobRequestOptions _requestOptions;
public StorageCredentials Credentials
{
@@ -44,6 +45,7 @@ namespace Org.Apache.REEF.IO.FileSystem.AzureBlob
{
_client =
CloudStorageAccount.Parse(connectionString).CreateCloudBlobClient();
_client.DefaultRequestOptions.RetryPolicy = retryPolicy;
+ _requestOptions = new BlobRequestOptions() { RetryPolicy =
retryPolicy };
}
public Uri BaseUri
@@ -54,18 +56,17 @@ namespace Org.Apache.REEF.IO.FileSystem.AzureBlob
public ICloudBlob GetBlobReferenceFromServer(Uri blobUri)
{
var task = _client.GetBlobReferenceFromServerAsync(blobUri);
- task.Wait();
return task.Result;
}
public ICloudBlobContainer GetContainerReference(string containerName)
{
- return new
AzureCloudBlobContainer(_client.GetContainerReference(containerName));
+ return new
AzureCloudBlobContainer(_client.GetContainerReference(containerName),
_requestOptions);
}
public ICloudBlockBlob GetBlockBlobReference(Uri uri)
{
- return new AzureCloudBlockBlob(uri, _client.Credentials);
+ return new AzureCloudBlockBlob(uri, _client.Credentials,
_requestOptions);
}
public BlobResultSegment ListBlobsSegmented(
diff --git
a/lang/cs/Org.Apache.REEF.IO/FileSystem/AzureBlob/AzureCloudBlobContainer.cs
b/lang/cs/Org.Apache.REEF.IO/FileSystem/AzureBlob/AzureCloudBlobContainer.cs
index d252fc7..e2fbd61 100644
--- a/lang/cs/Org.Apache.REEF.IO/FileSystem/AzureBlob/AzureCloudBlobContainer.cs
+++ b/lang/cs/Org.Apache.REEF.IO/FileSystem/AzureBlob/AzureCloudBlobContainer.cs
@@ -25,22 +25,23 @@ namespace Org.Apache.REEF.IO.FileSystem.AzureBlob
internal sealed class AzureCloudBlobContainer : ICloudBlobContainer
{
private readonly CloudBlobContainer _container;
+ private readonly BlobRequestOptions _requestOptions;
- public AzureCloudBlobContainer(CloudBlobContainer container)
+ public AzureCloudBlobContainer(CloudBlobContainer container,
BlobRequestOptions requestOptions)
{
_container = container;
+ _requestOptions = requestOptions;
}
public bool CreateIfNotExists()
{
- var task = _container.CreateIfNotExistsAsync();
- task.Wait();
+ var task = _container.CreateIfNotExistsAsync(_requestOptions,
null);
return task.Result;
}
public void DeleteIfExists()
{
- _container.DeleteIfExistsAsync().Wait();
+ _container.DeleteIfExistsAsync(null, _requestOptions, null).Wait();
}
public ICloudBlobDirectory GetDirectoryReference(string directoryName)
diff --git
a/lang/cs/Org.Apache.REEF.IO/FileSystem/AzureBlob/AzureCloudBlobDirectory.cs
b/lang/cs/Org.Apache.REEF.IO/FileSystem/AzureBlob/AzureCloudBlobDirectory.cs
index bf6c4b0..4d3295e 100644
--- a/lang/cs/Org.Apache.REEF.IO/FileSystem/AzureBlob/AzureCloudBlobDirectory.cs
+++ b/lang/cs/Org.Apache.REEF.IO/FileSystem/AzureBlob/AzureCloudBlobDirectory.cs
@@ -40,7 +40,6 @@ namespace Org.Apache.REEF.IO.FileSystem.AzureBlob
public IEnumerable<IListBlobItem> ListBlobs(bool useFlatListing =
false)
{
var task = _directory.ListBlobsSegmentedAsync(useFlatListing,
BlobListingDetails.All, null, null, null, null);
- task.Wait();
return task.Result.Results;
}
}
diff --git
a/lang/cs/Org.Apache.REEF.IO/FileSystem/AzureBlob/AzureCloudBlockBlob.cs
b/lang/cs/Org.Apache.REEF.IO/FileSystem/AzureBlob/AzureCloudBlockBlob.cs
index 5610230..c883568 100644
--- a/lang/cs/Org.Apache.REEF.IO/FileSystem/AzureBlob/AzureCloudBlockBlob.cs
+++ b/lang/cs/Org.Apache.REEF.IO/FileSystem/AzureBlob/AzureCloudBlockBlob.cs
@@ -19,6 +19,8 @@ using System;
using System.IO;
using Microsoft.WindowsAzure.Storage.Auth;
using Microsoft.WindowsAzure.Storage.Blob;
+using Microsoft.WindowsAzure.Storage.RetryPolicies;
+using Org.Apache.REEF.IO.FileSystem.AzureBlob.RetryPolicy;
namespace Org.Apache.REEF.IO.FileSystem.AzureBlob
{
@@ -29,6 +31,7 @@ namespace Org.Apache.REEF.IO.FileSystem.AzureBlob
internal sealed class AzureCloudBlockBlob : ICloudBlockBlob
{
private readonly CloudBlockBlob _blob;
+ private readonly BlobRequestOptions _requestOptions;
public ICloudBlob Blob
{
@@ -54,70 +57,67 @@ namespace Org.Apache.REEF.IO.FileSystem.AzureBlob
}
}
- public AzureCloudBlockBlob(Uri uri, StorageCredentials credentials)
+ public AzureCloudBlockBlob(Uri uri, StorageCredentials credentials,
BlobRequestOptions requestOptions)
{
_blob = new CloudBlockBlob(uri, credentials);
+ _requestOptions = requestOptions;
}
public Stream Open()
{
#if REEF_DOTNET_BUILD
- var task = _blob.OpenReadAsync();
- task.Wait();
+ var task = _blob.OpenReadAsync(null, _requestOptions, null);
return task.Result;
#else
- return _blob.OpenRead();
+ return _blob.OpenRead(null, _requestOptions, null);
#endif
}
public Stream Create()
{
#if REEF_DOTNET_BUILD
- var task = _blob.OpenWriteAsync();
- task.Wait();
+ var task = _blob.OpenWriteAsync(null, _requestOptions, null);
return task.Result;
#else
- return _blob.OpenWrite();
+ return _blob.OpenWrite(null, _requestOptions, null);
#endif
}
public bool Exists()
{
- var task = _blob.ExistsAsync();
- task.Wait();
+ var task = _blob.ExistsAsync(_requestOptions, null);
return task.Result;
}
public void Delete()
{
- _blob.DeleteAsync().Wait();
+ _blob.DeleteAsync(DeleteSnapshotsOption.IncludeSnapshots, null,
_requestOptions, null).Wait();
}
public void DeleteIfExists()
{
- _blob.DeleteIfExistsAsync().Wait();
+ _blob.DeleteIfExistsAsync(DeleteSnapshotsOption.IncludeSnapshots,
null, _requestOptions, null).Wait();
}
public string StartCopy(Uri source)
{
- var task = _blob.StartCopyAsync(source);
- task.Wait();
+ var task = _blob.StartCopyAsync(source, null, null,
_requestOptions, null);
return task.Result;
}
public void DownloadToFile(string path, FileMode mode)
{
- _blob.DownloadToFileAsync(path, mode).Wait();
+ _blob.DownloadToFileAsync(path, mode, null, _requestOptions,
null).Wait();
}
public void UploadFromFile(string path, FileMode mode)
{
- _blob.UploadFromFileAsync(path).Wait();
+ _blob.UploadFromFileAsync(path, null, _requestOptions,
null).Wait();
}
public void FetchAttributes()
{
- _blob.FetchAttributesAsync().Wait();
+ _blob.FetchAttributesAsync(null, _requestOptions, null).Wait();
}
}
}
\ No newline at end of file
diff --git
a/lang/cs/Org.Apache.REEF.IO/FileSystem/AzureBlob/RetryPolicy/DefaultAzureBlobRetryPolicy.cs
b/lang/cs/Org.Apache.REEF.IO/FileSystem/AzureBlob/RetryPolicy/DefaultAzureBlobRetryPolicy.cs
index 77943ef..564fa08 100644
---
a/lang/cs/Org.Apache.REEF.IO/FileSystem/AzureBlob/RetryPolicy/DefaultAzureBlobRetryPolicy.cs
+++
b/lang/cs/Org.Apache.REEF.IO/FileSystem/AzureBlob/RetryPolicy/DefaultAzureBlobRetryPolicy.cs
@@ -34,7 +34,7 @@ namespace Org.Apache.REEF.IO.FileSystem.AzureBlob.RetryPolicy
public IRetryPolicy CreateInstance()
{
- return new ExponentialRetry();
+ return _retryPolicy;
}
public bool ShouldRetry(int currentRetryCount, int statusCode,
Exception lastException, out TimeSpan retryInterval,
diff --git
a/lang/cs/Org.Apache.REEF.IO/FileSystem/AzureBlob/RetryPolicy/DefaultAzureBlobRetryPolicy.cs
b/lang/cs/Org.Apache.REEF.IO/FileSystem/AzureBlob/RetryPolicy/Exponential/ExponentialRetryPolicy.cs
similarity index 65%
copy from
lang/cs/Org.Apache.REEF.IO/FileSystem/AzureBlob/RetryPolicy/DefaultAzureBlobRetryPolicy.cs
copy to
lang/cs/Org.Apache.REEF.IO/FileSystem/AzureBlob/RetryPolicy/Exponential/ExponentialRetryPolicy.cs
index 77943ef..f04ee6d 100644
---
a/lang/cs/Org.Apache.REEF.IO/FileSystem/AzureBlob/RetryPolicy/DefaultAzureBlobRetryPolicy.cs
+++
b/lang/cs/Org.Apache.REEF.IO/FileSystem/AzureBlob/RetryPolicy/Exponential/ExponentialRetryPolicy.cs
@@ -20,27 +20,34 @@ using Microsoft.WindowsAzure.Storage;
using Microsoft.WindowsAzure.Storage.RetryPolicies;
using Org.Apache.REEF.Tang.Annotations;
-namespace Org.Apache.REEF.IO.FileSystem.AzureBlob.RetryPolicy
+namespace Org.Apache.REEF.IO.FileSystem.AzureBlob.RetryPolicy.Exponential
{
- internal class DefaultAzureBlobRetryPolicy : IAzureBlobRetryPolicy
+ /// <summary>
+ /// Represents a retry policy that performs a specified number of retries,
+ /// using a randomized exponential back off scheme to determine the
interval between retries.
+ /// </summary>
+ internal sealed class ExponentialRetryPolicy : IAzureBlobRetryPolicy
{
private readonly IRetryPolicy _retryPolicy;
[Inject]
- private DefaultAzureBlobRetryPolicy()
+ private ExponentialRetryPolicy(
+
[Parameter(typeof(ExponentialRetryPolicyParameterNames.RetryCount))] int
retryCount,
+
[Parameter(typeof(ExponentialRetryPolicyParameterNames.RetryInterval))] double
retryInterval)
{
- _retryPolicy = new ExponentialRetry();
- }
+ _retryPolicy = new
ExponentialRetry(TimeSpan.FromSeconds(retryInterval), retryCount);
+ }
public IRetryPolicy CreateInstance()
{
- return new ExponentialRetry();
+ return _retryPolicy;
}
public bool ShouldRetry(int currentRetryCount, int statusCode,
Exception lastException, out TimeSpan retryInterval,
OperationContext operationContext)
{
return _retryPolicy.ShouldRetry(currentRetryCount, statusCode,
lastException, out retryInterval, operationContext);
+
}
}
-}
+}
\ No newline at end of file
diff --git
a/lang/cs/Org.Apache.REEF.IO/FileSystem/AzureBlob/RetryPolicy/Exponential/ExponentialRetryPolicyConfiguration.cs
b/lang/cs/Org.Apache.REEF.IO/FileSystem/AzureBlob/RetryPolicy/Exponential/ExponentialRetryPolicyConfiguration.cs
new file mode 100644
index 0000000..bbdff41
--- /dev/null
+++
b/lang/cs/Org.Apache.REEF.IO/FileSystem/AzureBlob/RetryPolicy/Exponential/ExponentialRetryPolicyConfiguration.cs
@@ -0,0 +1,44 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using Org.Apache.REEF.Tang.Formats;
+using Org.Apache.REEF.Tang.Util;
+
+namespace Org.Apache.REEF.IO.FileSystem.AzureBlob.RetryPolicy.Exponential
+{
+ public sealed class ExponentialRetryPolicyConfiguration :
ConfigurationModuleBuilder
+ {
+ /// <summary>
+ /// The exponential retry count.
+ /// </summary>
+ public static readonly OptionalParameter<int> RetryCount = new
OptionalParameter<int>();
+
+ /// <summary>
+ /// The exponential retry interval in seconds.
+ /// </summary>
+ public static readonly OptionalParameter<double> RetryInterval = new
OptionalParameter<double>();
+
+ /// <summary>
+ /// Configuration Module for ExponentialRetryPolicy implementation of
IAzureBlobRetryPolicy.
+ /// </summary>
+ public static ConfigurationModule ConfigurationModule = new
ExponentialRetryPolicyConfiguration()
+ .BindImplementation(GenericType<IAzureBlobRetryPolicy>.Class,
GenericType<ExponentialRetryPolicy>.Class)
+
.BindNamedParameter(GenericType<ExponentialRetryPolicyParameterNames.RetryCount>.Class,
RetryCount)
+
.BindNamedParameter(GenericType<ExponentialRetryPolicyParameterNames.RetryInterval>.Class,
RetryInterval)
+ .Build();
+ }
+}
\ No newline at end of file
diff --git
a/lang/cs/Org.Apache.REEF.IO/FileSystem/AzureBlob/RetryPolicy/DefaultAzureBlobRetryPolicy.cs
b/lang/cs/Org.Apache.REEF.IO/FileSystem/AzureBlob/RetryPolicy/Exponential/ExponentialRetryPolicyParameterNames.cs
similarity index 52%
copy from
lang/cs/Org.Apache.REEF.IO/FileSystem/AzureBlob/RetryPolicy/DefaultAzureBlobRetryPolicy.cs
copy to
lang/cs/Org.Apache.REEF.IO/FileSystem/AzureBlob/RetryPolicy/Exponential/ExponentialRetryPolicyParameterNames.cs
index 77943ef..6b9f693 100644
---
a/lang/cs/Org.Apache.REEF.IO/FileSystem/AzureBlob/RetryPolicy/DefaultAzureBlobRetryPolicy.cs
+++
b/lang/cs/Org.Apache.REEF.IO/FileSystem/AzureBlob/RetryPolicy/Exponential/ExponentialRetryPolicyParameterNames.cs
@@ -15,32 +15,20 @@
// specific language governing permissions and limitations
// under the License.
-using System;
-using Microsoft.WindowsAzure.Storage;
-using Microsoft.WindowsAzure.Storage.RetryPolicies;
using Org.Apache.REEF.Tang.Annotations;
-namespace Org.Apache.REEF.IO.FileSystem.AzureBlob.RetryPolicy
+namespace Org.Apache.REEF.IO.FileSystem.AzureBlob.RetryPolicy.Exponential
{
- internal class DefaultAzureBlobRetryPolicy : IAzureBlobRetryPolicy
+ public static class ExponentialRetryPolicyParameterNames
{
- private readonly IRetryPolicy _retryPolicy;
-
- [Inject]
- private DefaultAzureBlobRetryPolicy()
- {
- _retryPolicy = new ExponentialRetry();
- }
-
- public IRetryPolicy CreateInstance()
+ [NamedParameter("The exponential retry count", "retryCount",
defaultValue: "3")]
+ public class RetryCount : Name<int>
{
- return new ExponentialRetry();
}
- public bool ShouldRetry(int currentRetryCount, int statusCode,
Exception lastException, out TimeSpan retryInterval,
- OperationContext operationContext)
+ [NamedParameter("The exponential retry interval in seconds",
"retryInterval", defaultValue: "4")]
+ public class RetryInterval : Name<double>
{
- return _retryPolicy.ShouldRetry(currentRetryCount, statusCode,
lastException, out retryInterval, operationContext);
}
}
-}
+}
\ No newline at end of file