Repository: reef
Updated Branches:
  refs/heads/master c90f8aa63 -> d34441ce6


[REEF-1339] Adding IInputPartition.Cache() for data download and cache

This addresses the issue by
  * Adding an `[Unstable]` Cache function.
  * Modifying existing implementations of IInputPartition so that they cache
    only on invocation of Cache instead of on initialization.

JIRA:
  [REEF-1339](https://issues.apache.org/jira/browse/REEF-1339)

Pull Request:
  This closes #968


Project: http://git-wip-us.apache.org/repos/asf/reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/d34441ce
Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/d34441ce
Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/d34441ce

Branch: refs/heads/master
Commit: d34441ce62eb149f4a430db23f3a8e090a8f3350
Parents: c90f8aa
Author: Andrew Chung <[email protected]>
Authored: Mon Apr 25 10:31:05 2016 -0700
Committer: Markus Weimer <[email protected]>
Committed: Thu Apr 28 13:33:04 2016 -0700

----------------------------------------------------------------------
 .../FileSystem/FileSystemInputPartition.cs      | 119 ++++++++++---------
 .../PartitionedData/IInputPartition.cs          |  10 +-
 .../Random/RandomInputPartition.cs              |  65 +++++++---
 3 files changed, 119 insertions(+), 75 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/reef/blob/d34441ce/lang/cs/Org.Apache.REEF.IO/PartitionedData/FileSystem/FileSystemInputPartition.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.IO/PartitionedData/FileSystem/FileSystemInputPartition.cs
 
b/lang/cs/Org.Apache.REEF.IO/PartitionedData/FileSystem/FileSystemInputPartition.cs
index cccc2be..0d17467 100644
--- 
a/lang/cs/Org.Apache.REEF.IO/PartitionedData/FileSystem/FileSystemInputPartition.cs
+++ 
b/lang/cs/Org.Apache.REEF.IO/PartitionedData/FileSystem/FileSystemInputPartition.cs
@@ -24,11 +24,14 @@ using 
Org.Apache.REEF.IO.PartitionedData.FileSystem.Parameters;
 using Org.Apache.REEF.IO.PartitionedData.Random.Parameters;
 using Org.Apache.REEF.IO.TempFileCreation;
 using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Utilities;
+using Org.Apache.REEF.Utilities.Attributes;
 using Org.Apache.REEF.Utilities.Diagnostics;
 using Org.Apache.REEF.Utilities.Logging;
 
 namespace Org.Apache.REEF.IO.PartitionedData.FileSystem
 {
+    [ThreadSafe]
     internal sealed class FileSystemInputPartition<T> : IInputPartition<T>, 
IDisposable
     {
         private static readonly Logger Logger = 
Logger.GetLogger(typeof(FileSystemInputPartition<T>));
@@ -36,16 +39,16 @@ namespace Org.Apache.REEF.IO.PartitionedData.FileSystem
         private readonly string _id;
         private readonly IFileSystem _fileSystem;
         private readonly IFileDeSerializer<T> _fileSerializer;
-        private readonly ISet<string> _filePaths;
-        private bool _isInitialized;
-        private readonly bool _copyToLocal;
         private readonly object _lock = new object();
         private readonly ITempFileCreator _tempFileCreator;
-        private readonly ISet<string> _localFiles = new HashSet<string>();
+        private readonly ISet<string> _remoteFilePaths;
+        private readonly bool _copyToLocal;
 
+        private Optional<ISet<string>> _localFiles;
+        
         [Inject]
         private FileSystemInputPartition([Parameter(typeof(PartitionId))] 
string id,
-            [Parameter(typeof(FilePathsInInputPartition))] ISet<string> 
filePaths,
+            [Parameter(typeof(FilePathsInInputPartition))] ISet<string> 
remoteFilePaths,
             [Parameter(typeof(CopyToLocal))] bool copyToLocal,
             IFileSystem fileSystem,
             ITempFileCreator tempFileCreator,
@@ -54,10 +57,10 @@ namespace Org.Apache.REEF.IO.PartitionedData.FileSystem
             _id = id;
             _fileSystem = fileSystem;
             _fileSerializer = fileSerializer;
-            _filePaths = filePaths;
             _tempFileCreator = tempFileCreator;
-            _isInitialized = false;
+            _remoteFilePaths = remoteFilePaths;
             _copyToLocal = copyToLocal;
+            _localFiles = Optional<ISet<string>>.Empty();
         }
 
         public string Id
@@ -65,75 +68,78 @@ namespace Org.Apache.REEF.IO.PartitionedData.FileSystem
             get { return _id; }
         }
 
-        private void Initialize()
+        /// <summary>
+        /// Caches from the remote File System to a local disk.
+        /// </summary>
+        public void Cache()
         {
             lock (_lock)
             {
-                if (!_isInitialized)
+                if (!_localFiles.IsPresent())
                 {
-                    CopyFromRemote();
-                    _isInitialized = true;
+                    _localFiles = Optional<ISet<string>>.Of(Download());
                 }
             }
         }
 
         /// <summary>
-        /// This method copy remote files to local and then deserialize the 
files.
-        /// It returns the IEnumerble of T, the details is defined in the 
Deserialize() method 
-        /// provided by the Serializer
+        /// Downloads the remote file to local disk.
         /// </summary>
-        /// <returns></returns>
-        public T GetPartitionHandle()
+        private ISet<string> Download()
         {
-            if (_copyToLocal)
+            lock (_lock)
             {
-                if (!_isInitialized)
+                var set = new HashSet<string>();
+                var localFileFolder = 
_tempFileCreator.CreateTempDirectory("-partition-");
+                Logger.Log(Level.Info, 
string.Format(CultureInfo.CurrentCulture, "Local file temp folder: {0}", 
localFileFolder));
+
+                foreach (var sourceFilePath in _remoteFilePaths)
                 {
-                    Initialize();
+                    var sourceUri = 
_fileSystem.CreateUriForPath(sourceFilePath);
+                    Logger.Log(Level.Verbose, "sourceUri {0}: ", sourceUri);
+
+                    var localFilePath = Path.Combine(localFileFolder, 
Guid.NewGuid().ToString("N").Substring(0, 8));
+                    set.Add(localFilePath);
+
+                    Logger.Log(Level.Verbose, "LocalFilePath {0}: ", 
localFilePath);
+                    if (File.Exists(localFilePath))
+                    {
+                        File.Delete(localFilePath);
+                        Logger.Log(Level.Warning, "localFile {0} already 
exists, deleting it. ", localFilePath);
+                    }
+
+                    _fileSystem.CopyToLocal(sourceUri, localFilePath);
                 }
 
-                return _fileSerializer.Deserialize(_localFiles);
+                return set;
             }
-            return _fileSerializer.Deserialize(_filePaths);
         }
 
-        private void CopyFromRemote()
+        /// <summary>
+        /// This method copies remote files to local if CopyToLocal is 
enabled, and then deserializes the files.
+        /// Otherwise, this method assumes that the files are remote, and that 
the injected IFileDeSerializer
+        /// can handle the remote file system access.
+        /// It returns the IEnumerble of T, the details is defined in the 
Deserialize() method 
+        /// provided by the Serializer
+        /// </summary>
+        /// <returns></returns>
+        public T GetPartitionHandle()
         {
-            string localFileFolder = 
_tempFileCreator.CreateTempDirectory("-partition-");
-            Logger.Log(Level.Info, string.Format(CultureInfo.CurrentCulture, 
"Local file temp folder: {0}", localFileFolder));
-
-            foreach (var sourceFilePath in _filePaths)
+            lock (_lock)
             {
-                Uri sourceUri = _fileSystem.CreateUriForPath(sourceFilePath);
-                Logger.Log(Level.Info, 
string.Format(CultureInfo.CurrentCulture, "sourceUri {0}: ", sourceUri));
-                if (!_fileSystem.Exists(sourceUri))
+                if (_copyToLocal)
                 {
-                    throw new 
FileNotFoundException(string.Format(CultureInfo.CurrentCulture,
-                        "Remote File {0} does not exists.", sourceUri));
-                }
+                    if (!_localFiles.IsPresent())
+                    {
+                        Cache();
+                    }
 
-                var localFilePath = localFileFolder + "\\" + 
Guid.NewGuid().ToString("N").Substring(0, 8);
-                _localFiles.Add(localFilePath);
-
-                Logger.Log(Level.Info, 
string.Format(CultureInfo.CurrentCulture, "LocalFilePath {0}: ", 
localFilePath));
-                if (File.Exists(localFilePath))
-                {
-                    File.Delete(localFilePath);
-                    Logger.Log(Level.Warning, "localFile already exists, 
delete it: " + localFilePath);
+                    // For now, assume IFileDeSerializer is local.
+                    return _fileSerializer.Deserialize(_localFiles.Value);
                 }
 
-                _fileSystem.CopyToLocal(sourceUri, localFilePath);
-                if (File.Exists(localFilePath))
-                {
-                    Logger.Log(Level.Info, 
-                        string.Format(CultureInfo.CurrentCulture, "File {0} is 
Copied to local {1}.", sourceUri, localFilePath));
-                }
-                else
-                {
-                    string msg = string.Format(CultureInfo.CurrentCulture, 
-                        "The IFilesystem completed the copy of `{0}` to `{1}`. 
But the file `{1}` does not exist.", sourceUri, localFilePath);
-                    Exceptions.Throw(new FileLoadException(msg), msg, Logger);
-                }
+                // For now, assume IFileDeSerializer is remote.
+                return _fileSerializer.Deserialize(_remoteFilePaths);
             }
         }
 
@@ -144,11 +150,14 @@ namespace Org.Apache.REEF.IO.PartitionedData.FileSystem
         /// </summary>
         public void Dispose()
         {
-            if (_localFiles.Count > 0)
+            lock (_lock)
             {
-                foreach (var fileName in _localFiles)
+                if (_localFiles.IsPresent())
                 {
-                    File.Delete(fileName);
+                    foreach (var fileName in _localFiles.Value)
+                    {
+                        File.Delete(fileName);
+                    }
                 }
             }
         }

http://git-wip-us.apache.org/repos/asf/reef/blob/d34441ce/lang/cs/Org.Apache.REEF.IO/PartitionedData/IInputPartition.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IO/PartitionedData/IInputPartition.cs 
b/lang/cs/Org.Apache.REEF.IO/PartitionedData/IInputPartition.cs
index 0fc012b..33116cd 100644
--- a/lang/cs/Org.Apache.REEF.IO/PartitionedData/IInputPartition.cs
+++ b/lang/cs/Org.Apache.REEF.IO/PartitionedData/IInputPartition.cs
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-using System;
+using Org.Apache.REEF.Utilities.Attributes;
 
 namespace Org.Apache.REEF.IO.PartitionedData
 {
@@ -24,7 +24,7 @@ namespace Org.Apache.REEF.IO.PartitionedData
     /// </summary>
     /// <typeparam name="T">Generic Type representing data pointer.
     /// For example, for data in local file it can be file pointer </typeparam>
-    public interface IInputPartition<T> 
+    public interface IInputPartition<T>
     {
         /// <summary>
         /// The id of the partition.
@@ -32,6 +32,12 @@ namespace Org.Apache.REEF.IO.PartitionedData
         string Id { get; }
 
         /// <summary>
+        /// Caches the data locally, cached location is based on the 
implementation.
+        /// </summary>
+        [Unstable("0.14", "Contract may change.")]
+        void Cache();
+
+        /// <summary>
         /// Gives a pointer to the underlying partition.
         /// </summary>
         /// <returns>The pointer to the underlying partition</returns>

http://git-wip-us.apache.org/repos/asf/reef/blob/d34441ce/lang/cs/Org.Apache.REEF.IO/PartitionedData/Random/RandomInputPartition.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.IO/PartitionedData/Random/RandomInputPartition.cs 
b/lang/cs/Org.Apache.REEF.IO/PartitionedData/Random/RandomInputPartition.cs
index 547dad4..df55555 100644
--- a/lang/cs/Org.Apache.REEF.IO/PartitionedData/Random/RandomInputPartition.cs
+++ b/lang/cs/Org.Apache.REEF.IO/PartitionedData/Random/RandomInputPartition.cs
@@ -20,37 +20,31 @@ using System.Diagnostics;
 using System.IO;
 using Org.Apache.REEF.IO.PartitionedData.Random.Parameters;
 using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Utilities;
+using Org.Apache.REEF.Utilities.Attributes;
 
 namespace Org.Apache.REEF.IO.PartitionedData.Random
 {
     /// <summary>
     /// An implementation of IInputPartition that returns a configurable 
number of random doubles.
     /// </summary>
+    [ThreadSafe]
     internal sealed class RandomInputPartition : IInputPartition<Stream>
     {
+        private readonly object _lock = new object();
         private readonly string _id;
-        private readonly byte[] _randomData;
+        private readonly int _numberOfDoubles;
+
+        private Optional<byte[]> _randomData;
 
         [Inject]
-        private RandomInputPartition([Parameter(typeof(PartitionId))] string 
id,
+        private RandomInputPartition(
+            [Parameter(typeof(PartitionId))] string id,
             [Parameter(typeof(NumberOfDoublesPerPartition))] int 
numberOfDoubles)
         {
             _id = id;
-            _randomData = new byte[numberOfDoubles * 8];
-            var random = new System.Random();
-
-            for (var i = 0; i < numberOfDoubles; ++i)
-            {
-                var randomDouble = random.NextDouble();
-                var randomDoubleAsBytes = BitConverter.GetBytes(randomDouble);
-                Debug.Assert(randomDoubleAsBytes.Length == 8, 
"randomDoubleAsBytes.Length should be 8.");
-                for (var j = 0; j < 8; ++j)
-                {
-                    var index = (i * 8) + j;
-                    Debug.Assert(index < _randomData.Length, "Index should be 
less than _randomData.Length.");
-                    _randomData[index] = randomDoubleAsBytes[j];
-                }
-            }
+            _numberOfDoubles = numberOfDoubles;
+            _randomData = Optional<byte[]>.Empty();
         }
 
         public string Id
@@ -58,9 +52,44 @@ namespace Org.Apache.REEF.IO.PartitionedData.Random
             get { return _id; }
         }
 
+        public void Cache()
+        {
+            lock (_lock)
+            {
+                if (_randomData.IsPresent())
+                {
+                    return;
+                }
+
+                var random = new System.Random();
+                var generatedData = new byte[_numberOfDoubles * sizeof(long)];
+                for (var i = 0; i < _numberOfDoubles; ++i)
+                {
+                    var randomDouble = random.NextDouble();
+                    var randomDoubleAsBytes = 
BitConverter.GetBytes(randomDouble);
+                    Debug.Assert(randomDoubleAsBytes.Length == 8, 
"randomDoubleAsBytes.Length should be 8.");
+                    for (var j = 0; j < sizeof(long); ++j)
+                    {
+                        var index = (i * 8) + j;
+                        Debug.Assert(index < generatedData.Length, "Index 
should be less than _randomData.Length.");
+                        generatedData[index] = randomDoubleAsBytes[j];
+                    }
+                }
+                _randomData = Optional<byte[]>.Of(generatedData);
+            }
+        }
+
         public Stream GetPartitionHandle()
         {
-            return new MemoryStream(_randomData, false);
+            lock (_lock)
+            {
+                if (!_randomData.IsPresent())
+                {
+                    Cache();
+                }
+
+                return new MemoryStream(_randomData.Value, false);
+            }
         }
     }
 }
\ No newline at end of file

Reply via email to