Repository: reef
Updated Branches:
refs/heads/master c90f8aa63 -> d34441ce6
[REEF-1339] Add IInputPartition.Cache() to download and cache partition data
This addresses the issue by:
* Adding an `[Unstable]` `Cache()` method to `IInputPartition`.
* Modifying the existing implementations of `IInputPartition` so that they
  cache data only when `Cache()` is invoked (or lazily on the first call to
  `GetPartitionHandle()`), instead of on initialization.
JIRA:
[REEF-1339](https://issues.apache.org/jira/browse/REEF-1339)
Pull Request:
This closes #968
Project: http://git-wip-us.apache.org/repos/asf/reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/d34441ce
Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/d34441ce
Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/d34441ce
Branch: refs/heads/master
Commit: d34441ce62eb149f4a430db23f3a8e090a8f3350
Parents: c90f8aa
Author: Andrew Chung <[email protected]>
Authored: Mon Apr 25 10:31:05 2016 -0700
Committer: Markus Weimer <[email protected]>
Committed: Thu Apr 28 13:33:04 2016 -0700
----------------------------------------------------------------------
.../FileSystem/FileSystemInputPartition.cs | 119 ++++++++++---------
.../PartitionedData/IInputPartition.cs | 10 +-
.../Random/RandomInputPartition.cs | 65 +++++++---
3 files changed, 119 insertions(+), 75 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/reef/blob/d34441ce/lang/cs/Org.Apache.REEF.IO/PartitionedData/FileSystem/FileSystemInputPartition.cs
----------------------------------------------------------------------
diff --git
a/lang/cs/Org.Apache.REEF.IO/PartitionedData/FileSystem/FileSystemInputPartition.cs
b/lang/cs/Org.Apache.REEF.IO/PartitionedData/FileSystem/FileSystemInputPartition.cs
index cccc2be..0d17467 100644
---
a/lang/cs/Org.Apache.REEF.IO/PartitionedData/FileSystem/FileSystemInputPartition.cs
+++
b/lang/cs/Org.Apache.REEF.IO/PartitionedData/FileSystem/FileSystemInputPartition.cs
@@ -24,11 +24,14 @@ using
Org.Apache.REEF.IO.PartitionedData.FileSystem.Parameters;
using Org.Apache.REEF.IO.PartitionedData.Random.Parameters;
using Org.Apache.REEF.IO.TempFileCreation;
using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Utilities;
+using Org.Apache.REEF.Utilities.Attributes;
using Org.Apache.REEF.Utilities.Diagnostics;
using Org.Apache.REEF.Utilities.Logging;
namespace Org.Apache.REEF.IO.PartitionedData.FileSystem
{
+ [ThreadSafe]
internal sealed class FileSystemInputPartition<T> : IInputPartition<T>,
IDisposable
{
private static readonly Logger Logger =
Logger.GetLogger(typeof(FileSystemInputPartition<T>));
@@ -36,16 +39,16 @@ namespace Org.Apache.REEF.IO.PartitionedData.FileSystem
private readonly string _id;
private readonly IFileSystem _fileSystem;
private readonly IFileDeSerializer<T> _fileSerializer;
- private readonly ISet<string> _filePaths;
- private bool _isInitialized;
- private readonly bool _copyToLocal;
private readonly object _lock = new object();
private readonly ITempFileCreator _tempFileCreator;
- private readonly ISet<string> _localFiles = new HashSet<string>();
+ private readonly ISet<string> _remoteFilePaths;
+ private readonly bool _copyToLocal;
+ private Optional<ISet<string>> _localFiles;
+
[Inject]
private FileSystemInputPartition([Parameter(typeof(PartitionId))]
string id,
- [Parameter(typeof(FilePathsInInputPartition))] ISet<string>
filePaths,
+ [Parameter(typeof(FilePathsInInputPartition))] ISet<string>
remoteFilePaths,
[Parameter(typeof(CopyToLocal))] bool copyToLocal,
IFileSystem fileSystem,
ITempFileCreator tempFileCreator,
@@ -54,10 +57,10 @@ namespace Org.Apache.REEF.IO.PartitionedData.FileSystem
_id = id;
_fileSystem = fileSystem;
_fileSerializer = fileSerializer;
- _filePaths = filePaths;
_tempFileCreator = tempFileCreator;
- _isInitialized = false;
+ _remoteFilePaths = remoteFilePaths;
_copyToLocal = copyToLocal;
+ _localFiles = Optional<ISet<string>>.Empty();
}
public string Id
@@ -65,75 +68,78 @@ namespace Org.Apache.REEF.IO.PartitionedData.FileSystem
get { return _id; }
}
- private void Initialize()
+ /// <summary>
+ /// Caches from the remote File System to a local disk.
+ /// </summary>
+ public void Cache()
{
lock (_lock)
{
- if (!_isInitialized)
+ if (!_localFiles.IsPresent())
{
- CopyFromRemote();
- _isInitialized = true;
+ _localFiles = Optional<ISet<string>>.Of(Download());
}
}
}
/// <summary>
- /// This method copy remote files to local and then deserialize the
files.
- /// It returns the IEnumerble of T, the details is defined in the
Deserialize() method
- /// provided by the Serializer
+ /// Downloads the remote file to local disk.
/// </summary>
- /// <returns></returns>
- public T GetPartitionHandle()
+ private ISet<string> Download()
{
- if (_copyToLocal)
+ lock (_lock)
{
- if (!_isInitialized)
+ var set = new HashSet<string>();
+ var localFileFolder =
_tempFileCreator.CreateTempDirectory("-partition-");
+ Logger.Log(Level.Info,
string.Format(CultureInfo.CurrentCulture, "Local file temp folder: {0}",
localFileFolder));
+
+ foreach (var sourceFilePath in _remoteFilePaths)
{
- Initialize();
+ var sourceUri =
_fileSystem.CreateUriForPath(sourceFilePath);
+ Logger.Log(Level.Verbose, "sourceUri {0}: ", sourceUri);
+
+ var localFilePath = Path.Combine(localFileFolder,
Guid.NewGuid().ToString("N").Substring(0, 8));
+ set.Add(localFilePath);
+
+ Logger.Log(Level.Verbose, "LocalFilePath {0}: ",
localFilePath);
+ if (File.Exists(localFilePath))
+ {
+ File.Delete(localFilePath);
+ Logger.Log(Level.Warning, "localFile {0} already
exists, deleting it. ", localFilePath);
+ }
+
+ _fileSystem.CopyToLocal(sourceUri, localFilePath);
}
- return _fileSerializer.Deserialize(_localFiles);
+ return set;
}
- return _fileSerializer.Deserialize(_filePaths);
}
- private void CopyFromRemote()
+ /// <summary>
+ /// This method copies remote files to local if CopyToLocal is
enabled, and then deserializes the files.
+ /// Otherwise, this method assumes that the files are remote, and that
the injected IFileDeSerializer
+ /// can handle the remote file system access.
+ /// It returns the IEnumerble of T, the details is defined in the
Deserialize() method
+ /// provided by the Serializer
+ /// </summary>
+ /// <returns></returns>
+ public T GetPartitionHandle()
{
- string localFileFolder =
_tempFileCreator.CreateTempDirectory("-partition-");
- Logger.Log(Level.Info, string.Format(CultureInfo.CurrentCulture,
"Local file temp folder: {0}", localFileFolder));
-
- foreach (var sourceFilePath in _filePaths)
+ lock (_lock)
{
- Uri sourceUri = _fileSystem.CreateUriForPath(sourceFilePath);
- Logger.Log(Level.Info,
string.Format(CultureInfo.CurrentCulture, "sourceUri {0}: ", sourceUri));
- if (!_fileSystem.Exists(sourceUri))
+ if (_copyToLocal)
{
- throw new
FileNotFoundException(string.Format(CultureInfo.CurrentCulture,
- "Remote File {0} does not exists.", sourceUri));
- }
+ if (!_localFiles.IsPresent())
+ {
+ Cache();
+ }
- var localFilePath = localFileFolder + "\\" +
Guid.NewGuid().ToString("N").Substring(0, 8);
- _localFiles.Add(localFilePath);
-
- Logger.Log(Level.Info,
string.Format(CultureInfo.CurrentCulture, "LocalFilePath {0}: ",
localFilePath));
- if (File.Exists(localFilePath))
- {
- File.Delete(localFilePath);
- Logger.Log(Level.Warning, "localFile already exists,
delete it: " + localFilePath);
+ // For now, assume IFileDeSerializer is local.
+ return _fileSerializer.Deserialize(_localFiles.Value);
}
- _fileSystem.CopyToLocal(sourceUri, localFilePath);
- if (File.Exists(localFilePath))
- {
- Logger.Log(Level.Info,
- string.Format(CultureInfo.CurrentCulture, "File {0} is
Copied to local {1}.", sourceUri, localFilePath));
- }
- else
- {
- string msg = string.Format(CultureInfo.CurrentCulture,
- "The IFilesystem completed the copy of `{0}` to `{1}`.
But the file `{1}` does not exist.", sourceUri, localFilePath);
- Exceptions.Throw(new FileLoadException(msg), msg, Logger);
- }
+ // For now, assume IFileDeSerializer is remote.
+ return _fileSerializer.Deserialize(_remoteFilePaths);
}
}
@@ -144,11 +150,14 @@ namespace Org.Apache.REEF.IO.PartitionedData.FileSystem
/// </summary>
public void Dispose()
{
- if (_localFiles.Count > 0)
+ lock (_lock)
{
- foreach (var fileName in _localFiles)
+ if (_localFiles.IsPresent())
{
- File.Delete(fileName);
+ foreach (var fileName in _localFiles.Value)
+ {
+ File.Delete(fileName);
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/reef/blob/d34441ce/lang/cs/Org.Apache.REEF.IO/PartitionedData/IInputPartition.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IO/PartitionedData/IInputPartition.cs
b/lang/cs/Org.Apache.REEF.IO/PartitionedData/IInputPartition.cs
index 0fc012b..33116cd 100644
--- a/lang/cs/Org.Apache.REEF.IO/PartitionedData/IInputPartition.cs
+++ b/lang/cs/Org.Apache.REEF.IO/PartitionedData/IInputPartition.cs
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
-using System;
+using Org.Apache.REEF.Utilities.Attributes;
namespace Org.Apache.REEF.IO.PartitionedData
{
@@ -24,7 +24,7 @@ namespace Org.Apache.REEF.IO.PartitionedData
/// </summary>
/// <typeparam name="T">Generic Type representing data pointer.
/// For example, for data in local file it can be file pointer </typeparam>
- public interface IInputPartition<T>
+ public interface IInputPartition<T>
{
/// <summary>
/// The id of the partition.
@@ -32,6 +32,12 @@ namespace Org.Apache.REEF.IO.PartitionedData
string Id { get; }
/// <summary>
+ /// Caches the data locally, cached location is based on the
implementation.
+ /// </summary>
+ [Unstable("0.14", "Contract may change.")]
+ void Cache();
+
+ /// <summary>
/// Gives a pointer to the underlying partition.
/// </summary>
/// <returns>The pointer to the underlying partition</returns>
http://git-wip-us.apache.org/repos/asf/reef/blob/d34441ce/lang/cs/Org.Apache.REEF.IO/PartitionedData/Random/RandomInputPartition.cs
----------------------------------------------------------------------
diff --git
a/lang/cs/Org.Apache.REEF.IO/PartitionedData/Random/RandomInputPartition.cs
b/lang/cs/Org.Apache.REEF.IO/PartitionedData/Random/RandomInputPartition.cs
index 547dad4..df55555 100644
--- a/lang/cs/Org.Apache.REEF.IO/PartitionedData/Random/RandomInputPartition.cs
+++ b/lang/cs/Org.Apache.REEF.IO/PartitionedData/Random/RandomInputPartition.cs
@@ -20,37 +20,31 @@ using System.Diagnostics;
using System.IO;
using Org.Apache.REEF.IO.PartitionedData.Random.Parameters;
using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Utilities;
+using Org.Apache.REEF.Utilities.Attributes;
namespace Org.Apache.REEF.IO.PartitionedData.Random
{
/// <summary>
/// An implementation of IInputPartition that returns a configurable
number of random doubles.
/// </summary>
+ [ThreadSafe]
internal sealed class RandomInputPartition : IInputPartition<Stream>
{
+ private readonly object _lock = new object();
private readonly string _id;
- private readonly byte[] _randomData;
+ private readonly int _numberOfDoubles;
+
+ private Optional<byte[]> _randomData;
[Inject]
- private RandomInputPartition([Parameter(typeof(PartitionId))] string
id,
+ private RandomInputPartition(
+ [Parameter(typeof(PartitionId))] string id,
[Parameter(typeof(NumberOfDoublesPerPartition))] int
numberOfDoubles)
{
_id = id;
- _randomData = new byte[numberOfDoubles * 8];
- var random = new System.Random();
-
- for (var i = 0; i < numberOfDoubles; ++i)
- {
- var randomDouble = random.NextDouble();
- var randomDoubleAsBytes = BitConverter.GetBytes(randomDouble);
- Debug.Assert(randomDoubleAsBytes.Length == 8,
"randomDoubleAsBytes.Length should be 8.");
- for (var j = 0; j < 8; ++j)
- {
- var index = (i * 8) + j;
- Debug.Assert(index < _randomData.Length, "Index should be
less than _randomData.Length.");
- _randomData[index] = randomDoubleAsBytes[j];
- }
- }
+ _numberOfDoubles = numberOfDoubles;
+ _randomData = Optional<byte[]>.Empty();
}
public string Id
@@ -58,9 +52,44 @@ namespace Org.Apache.REEF.IO.PartitionedData.Random
get { return _id; }
}
+ public void Cache()
+ {
+ lock (_lock)
+ {
+ if (_randomData.IsPresent())
+ {
+ return;
+ }
+
+ var random = new System.Random();
+ var generatedData = new byte[_numberOfDoubles * sizeof(long)];
+ for (var i = 0; i < _numberOfDoubles; ++i)
+ {
+ var randomDouble = random.NextDouble();
+ var randomDoubleAsBytes =
BitConverter.GetBytes(randomDouble);
+ Debug.Assert(randomDoubleAsBytes.Length == 8,
"randomDoubleAsBytes.Length should be 8.");
+ for (var j = 0; j < sizeof(long); ++j)
+ {
+ var index = (i * 8) + j;
+ Debug.Assert(index < generatedData.Length, "Index
should be less than _randomData.Length.");
+ generatedData[index] = randomDoubleAsBytes[j];
+ }
+ }
+ _randomData = Optional<byte[]>.Of(generatedData);
+ }
+ }
+
public Stream GetPartitionHandle()
{
- return new MemoryStream(_randomData, false);
+ lock (_lock)
+ {
+ if (!_randomData.IsPresent())
+ {
+ Cache();
+ }
+
+ return new MemoryStream(_randomData.Value, false);
+ }
}
}
}
\ No newline at end of file