Repository: reef Updated Branches: refs/heads/master 3818bf454 -> dc78d381f
[REEF-1206] Update Deserializer API to allow deserialize from remote files directly * Adding a new Deserializer API * Deprecate old API * Adding a named parameter for CopyToLocal * SetMemory for IMRU JIRA: [REEF-1206](https://issues.apache.org/jira/browse/REEF-1206) Pull request: This closes #843 Project: http://git-wip-us.apache.org/repos/asf/reef/repo Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/dc78d381 Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/dc78d381 Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/dc78d381 Branch: refs/heads/master Commit: dc78d381f98c4a3d5559972bed07e2af49ba621d Parents: 3818bf4 Author: Julia Wang <[email protected]> Authored: Thu Jan 28 17:23:22 2016 -0800 Committer: Markus Weimer <[email protected]> Committed: Fri Feb 19 18:13:24 2016 -0800 ---------------------------------------------------------------------- .../OnREEF/Client/REEFIMRUClient.cs | 1 + .../HadoopFileInputPartitionTest.cs | 18 +++++++++++ .../TestFilePartitionInputDataSet.cs | 34 ++++++++++++++++++++ .../Org.Apache.REEF.IO.csproj | 1 + .../FileSystem/FileSystemInputPartition.cs | 30 ++++++++++------- .../FileSystem/IFileDeSerializer.cs | 14 ++++++++ .../FileSystem/Parameters/CopyToLocal.cs | 29 +++++++++++++++++ 7 files changed, 116 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/reef/blob/dc78d381/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Client/REEFIMRUClient.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Client/REEFIMRUClient.cs b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Client/REEFIMRUClient.cs index 8e33c82..672df9a 100644 --- a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Client/REEFIMRUClient.cs +++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Client/REEFIMRUClient.cs @@ -146,6 +146,7 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Client .AddDriverConfiguration(imruDriverConfiguration) .AddGlobalAssemblyForType(typeof(IMRUDriver<TMapInput, TMapOutput, TResult>)) .SetJobIdentifier(jobDefinition.JobName) + .SetDriverMemory(5000) .Build(); _jobSubmissionResult = _reefClient.SubmitAndGetJobStatus(imruJobSubmission); http://git-wip-us.apache.org/repos/asf/reef/blob/dc78d381/lang/cs/Org.Apache.REEF.IO.TestClient/HadoopFileInputPartitionTest.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.IO.TestClient/HadoopFileInputPartitionTest.cs b/lang/cs/Org.Apache.REEF.IO.TestClient/HadoopFileInputPartitionTest.cs index 10f5cbc..a1e5f84 100644 --- a/lang/cs/Org.Apache.REEF.IO.TestClient/HadoopFileInputPartitionTest.cs +++ b/lang/cs/Org.Apache.REEF.IO.TestClient/HadoopFileInputPartitionTest.cs @@ -23,6 +23,7 @@ using Org.Apache.REEF.IO.FileSystem; using Org.Apache.REEF.IO.FileSystem.Hadoop; using Org.Apache.REEF.IO.PartitionedData; using Org.Apache.REEF.IO.PartitionedData.FileSystem; +using Org.Apache.REEF.IO.PartitionedData.FileSystem.Parameters; using Org.Apache.REEF.Tang.Annotations; using Org.Apache.REEF.Tang.Formats; using Org.Apache.REEF.Tang.Implementations.Tang; @@ -45,6 +46,7 @@ namespace Org.Apache.REEF.IO.TestClient var serializerConf = TangFactory.GetTang().NewConfigurationBuilder() .BindImplementation<IFileDeSerializer<IEnumerable<byte>>, ByteSerializer>(GenericType<IFileDeSerializer<IEnumerable<byte>>>.Class, GenericType<ByteSerializer>.Class) + .BindNamedParam<CopyToLocal, bool>("true") .Build(); var serializerConfString = (new AvroConfigurationSerializer()).ToString(serializerConf); @@ -147,10 +149,26 @@ namespace Org.Apache.REEF.IO.TestClient /// </summary> /// <param name="fileFolder"></param> /// <returns></returns> + [Obsolete("Remove after 0.14")] public IEnumerable<byte> Deserialize(string fileFolder) { + var files = new HashSet<string>(); foreach (var f in Directory.GetFiles(fileFolder)) { + files.Add(f); + } + return Deserialize(files); + } + + /// <summary> + /// Read bytes from all the files in the set and return one by one + /// </summary> + /// <param name="filePaths"></param> + /// <returns></returns> + public IEnumerable<byte> Deserialize(ISet<string> filePaths) + { + foreach (var f in filePaths) + { using (FileStream stream = File.Open(f, FileMode.Open)) { BinaryReader reader = new BinaryReader(stream); http://git-wip-us.apache.org/repos/asf/reef/blob/dc78d381/lang/cs/Org.Apache.REEF.IO.Tests/TestFilePartitionInputDataSet.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.IO.Tests/TestFilePartitionInputDataSet.cs b/lang/cs/Org.Apache.REEF.IO.Tests/TestFilePartitionInputDataSet.cs index 1666777..5d829c3 100644 --- a/lang/cs/Org.Apache.REEF.IO.Tests/TestFilePartitionInputDataSet.cs +++ b/lang/cs/Org.Apache.REEF.IO.Tests/TestFilePartitionInputDataSet.cs @@ -287,6 +287,7 @@ namespace Org.Apache.REEF.IO.Tests .BindImplementation<IFileDeSerializer<IEnumerable<byte>>, ByteSerializer>( GenericType<IFileDeSerializer<IEnumerable<byte>>>.Class, GenericType<ByteSerializer>.Class) + .BindNamedParam<CopyToLocal, bool>("true") .Build(); return (new AvroConfigurationSerializer()).ToString(serializerConf); } @@ -297,6 +298,7 @@ namespace Org.Apache.REEF.IO.Tests .BindImplementation<IFileDeSerializer<IEnumerable<Row>>, RowSerializer>( GenericType<IFileDeSerializer<IEnumerable<Row>>>.Class, GenericType<RowSerializer>.Class) + .BindNamedParam<CopyToLocal, bool>("true") .Build(); return (new AvroConfigurationSerializer()).ToString(serializerConf); } @@ -314,10 +316,26 @@ namespace Org.Apache.REEF.IO.Tests /// </summary> /// <param name="fileFolder"></param> /// <returns></returns> + [Obsolete("Remove after 0.14")] public IEnumerable<byte> Deserialize(string fileFolder) { + var files = new HashSet<string>(); foreach (var f in Directory.GetFiles(fileFolder)) { + files.Add(f); + } + return Deserialize(files); + } + + /// <summary> + /// Enumerate all the files in the set and return each byte read + /// </summary> + /// <param name="filePaths"></param> + /// <returns></returns> + public IEnumerable<byte> Deserialize(ISet<string> filePaths) + { + foreach (var f in filePaths) + { using (FileStream stream = File.Open(f, FileMode.Open)) { BinaryReader reader = new BinaryReader(stream); @@ -357,10 +375,26 @@ namespace Org.Apache.REEF.IO.Tests /// </summary> /// <param name="fileFolder"></param> /// <returns></returns> + [Obsolete("Remove after 0.14")] public IEnumerable<Row> Deserialize(string fileFolder) { + ISet<string> files = new HashSet<string>(); foreach (var f in Directory.GetFiles(fileFolder)) { + files.Add(f); + } + return Deserialize(files); + } + + /// <summary> + /// read all the files in the set and return byte read one by one + /// </summary> + /// <param name="filePaths"></param> + /// <returns></returns> + public IEnumerable<Row> Deserialize(ISet<string> filePaths) + { + foreach (var f in filePaths) + { using (FileStream stream = File.Open(f, FileMode.Open)) { BinaryReader reader = new BinaryReader(stream); http://git-wip-us.apache.org/repos/asf/reef/blob/dc78d381/lang/cs/Org.Apache.REEF.IO/Org.Apache.REEF.IO.csproj ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.IO/Org.Apache.REEF.IO.csproj b/lang/cs/Org.Apache.REEF.IO/Org.Apache.REEF.IO.csproj index 016cb7c..fc321b9 100644 --- a/lang/cs/Org.Apache.REEF.IO/Org.Apache.REEF.IO.csproj +++ b/lang/cs/Org.Apache.REEF.IO/Org.Apache.REEF.IO.csproj @@ -100,6 +100,7 @@ under the License. <Compile Include="FileSystem\Local\LocalFileSystemConfiguration.cs" /> <Compile Include="PartitionedData\FileSystem\FileSystemInputPartition.cs" /> <Compile Include="PartitionedData\FileSystem\FileInputPartitionDescriptor.cs" /> + <Compile Include="PartitionedData\FileSystem\Parameters\CopyToLocal.cs" /> <Compile Include="PartitionedData\FileSystem\Parameters\FileDeSerializerConfigString.cs" /> <Compile Include="PartitionedData\FileSystem\IFileDeSerializer.cs" /> <Compile Include="PartitionedData\FileSystem\FileSystemPartitionInputDataSet.cs" /> http://git-wip-us.apache.org/repos/asf/reef/blob/dc78d381/lang/cs/Org.Apache.REEF.IO/PartitionedData/FileSystem/FileSystemInputPartition.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.IO/PartitionedData/FileSystem/FileSystemInputPartition.cs b/lang/cs/Org.Apache.REEF.IO/PartitionedData/FileSystem/FileSystemInputPartition.cs index aba657a..cccc2be 100644 --- a/lang/cs/Org.Apache.REEF.IO/PartitionedData/FileSystem/FileSystemInputPartition.cs +++ b/lang/cs/Org.Apache.REEF.IO/PartitionedData/FileSystem/FileSystemInputPartition.cs @@ -38,13 +38,15 @@ namespace Org.Apache.REEF.IO.PartitionedData.FileSystem private readonly IFileDeSerializer<T> _fileSerializer; private readonly ISet<string> _filePaths; private bool _isInitialized; + private readonly bool _copyToLocal; private readonly object _lock = new object(); - private string _localFileFolder; private readonly ITempFileCreator _tempFileCreator; + private readonly ISet<string> _localFiles = new HashSet<string>(); [Inject] private FileSystemInputPartition([Parameter(typeof(PartitionId))] string id, [Parameter(typeof(FilePathsInInputPartition))] ISet<string> filePaths, + [Parameter(typeof(CopyToLocal))] bool copyToLocal, IFileSystem fileSystem, ITempFileCreator tempFileCreator, IFileDeSerializer<T> fileSerializer) @@ -55,6 +57,7 @@ namespace Org.Apache.REEF.IO.PartitionedData.FileSystem _filePaths = filePaths; _tempFileCreator = tempFileCreator; _isInitialized = false; + _copyToLocal = copyToLocal; } public string Id @@ -82,17 +85,22 @@ namespace Org.Apache.REEF.IO.PartitionedData.FileSystem /// <returns></returns> public T GetPartitionHandle() { - if (!_isInitialized) + if (_copyToLocal) { - Initialize(); + if (!_isInitialized) + { + Initialize(); + } + + return _fileSerializer.Deserialize(_localFiles); } - return _fileSerializer.Deserialize(_localFileFolder); + return _fileSerializer.Deserialize(_filePaths); } private void CopyFromRemote() { - _localFileFolder = _tempFileCreator.CreateTempDirectory("-partition-"); - Logger.Log(Level.Info, string.Format(CultureInfo.CurrentCulture, "Local file temp folder: {0}", _localFileFolder)); + string localFileFolder = _tempFileCreator.CreateTempDirectory("-partition-"); + Logger.Log(Level.Info, string.Format(CultureInfo.CurrentCulture, "Local file temp folder: {0}", localFileFolder)); foreach (var sourceFilePath in _filePaths) { @@ -104,7 +112,9 @@ namespace Org.Apache.REEF.IO.PartitionedData.FileSystem "Remote File {0} does not exists.", sourceUri)); } - var localFilePath = _localFileFolder + "\\" + Guid.NewGuid().ToString("N").Substring(0, 8); + var localFilePath = localFileFolder + "\\" + Guid.NewGuid().ToString("N").Substring(0, 8); + _localFiles.Add(localFilePath); + Logger.Log(Level.Info, string.Format(CultureInfo.CurrentCulture, "LocalFilePath {0}: ", localFilePath)); if (File.Exists(localFilePath)) { @@ -134,14 +144,12 @@ namespace Org.Apache.REEF.IO.PartitionedData.FileSystem /// </summary> public void Dispose() { - if (_localFileFolder != null) + if (_localFiles.Count > 0) { - foreach (var fileName in Directory.GetFiles(_localFileFolder)) + foreach (var fileName in _localFiles) { File.Delete(fileName); } - Directory.Delete(_localFileFolder); - _localFileFolder = null; } } } http://git-wip-us.apache.org/repos/asf/reef/blob/dc78d381/lang/cs/Org.Apache.REEF.IO/PartitionedData/FileSystem/IFileDeSerializer.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.IO/PartitionedData/FileSystem/IFileDeSerializer.cs b/lang/cs/Org.Apache.REEF.IO/PartitionedData/FileSystem/IFileDeSerializer.cs index 6759937..988b11d 100644 --- a/lang/cs/Org.Apache.REEF.IO/PartitionedData/FileSystem/IFileDeSerializer.cs +++ b/lang/cs/Org.Apache.REEF.IO/PartitionedData/FileSystem/IFileDeSerializer.cs @@ -15,6 +15,9 @@ // specific language governing permissions and limitations // under the License. +using System; +using System.Collections.Generic; + namespace Org.Apache.REEF.IO.PartitionedData.FileSystem { /// <summary> @@ -30,6 +33,17 @@ namespace Org.Apache.REEF.IO.PartitionedData.FileSystem /// </summary> /// <param name="fileFolder"></param> /// <returns></returns> + [Obsolete("Remove after 0.14")] T Deserialize(string fileFolder); + + /// <summary> + /// The input is a set of file paths. + /// The output is of type T which is defined by the client. + /// If there is any IO error, IOException could be thrown. + /// </summary> + /// <param name="filePaths"></param> + /// <param name="local"></param> + /// <returns></returns> + T Deserialize(ISet<string> filePaths); } } http://git-wip-us.apache.org/repos/asf/reef/blob/dc78d381/lang/cs/Org.Apache.REEF.IO/PartitionedData/FileSystem/Parameters/CopyToLocal.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.IO/PartitionedData/FileSystem/Parameters/CopyToLocal.cs b/lang/cs/Org.Apache.REEF.IO/PartitionedData/FileSystem/Parameters/CopyToLocal.cs new file mode 100644 index 0000000..5a2da3e --- /dev/null +++ b/lang/cs/Org.Apache.REEF.IO/PartitionedData/FileSystem/Parameters/CopyToLocal.cs @@ -0,0 +1,29 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +using Org.Apache.REEF.Tang.Annotations; + +namespace Org.Apache.REEF.IO.PartitionedData.FileSystem.Parameters +{ + /// <summary> + /// The flag that specifies if the remote files need to be copied to local or not + /// </summary> + [NamedParameter(Documentation = "Specify if the remote files need to be copied to local", ShortName = "copyToLocal", DefaultValue = "true")] + internal sealed class CopyToLocal : Name<bool> + { + } +} \ No newline at end of file
