Repository: reef Updated Branches: refs/heads/master b45c29c5e -> 4eadc7f03
[REEF-1219] Adding CopyToLocal in FileSystemInputPartitionConfiguration. Add CopyToLocal to the FileSystemInputPartitionConfiguration module builder. Update FileSystemPartitionInputDataSet to take it as a named parameter and pass it to the FileInputPartitionDescriptor. Add it to GetPartitionConfiguration() in FileInputPartitionDescriptor. JIRA: [REEF-1219](https://issues.apache.org/jira/browse/REEF-1219) This closes #856. Project: http://git-wip-us.apache.org/repos/asf/reef/repo Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/4eadc7f0 Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/4eadc7f0 Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/4eadc7f0 Branch: refs/heads/master Commit: 4eadc7f039cafd365013fb2ca7792495e6cc3154 Parents: b45c29c Author: Julia Wang <[email protected]> Authored: Tue Feb 23 16:49:42 2016 -0800 Committer: Andrew Chung <[email protected]> Committed: Wed Feb 24 15:41:26 2016 -0800 ---------------------------------------------------------------------- .../HadoopFileInputPartitionTest.cs | 2 +- .../Org.Apache.REEF.IO.Tests/TestFilePartitionInputDataSet.cs | 5 +++-- .../PartitionedData/FileSystem/FileInputPartitionDescriptor.cs | 5 ++++- .../FileSystem/FileSystemInputPartitionConfiguration.cs | 6 ++++++ .../FileSystem/FileSystemPartitionInputDataSet.cs | 3 ++- 5 files changed, 16 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/reef/blob/4eadc7f0/lang/cs/Org.Apache.REEF.IO.TestClient/HadoopFileInputPartitionTest.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.IO.TestClient/HadoopFileInputPartitionTest.cs b/lang/cs/Org.Apache.REEF.IO.TestClient/HadoopFileInputPartitionTest.cs index a1e5f84..1633325 100644 --- a/lang/cs/Org.Apache.REEF.IO.TestClient/HadoopFileInputPartitionTest.cs +++ b/lang/cs/Org.Apache.REEF.IO.TestClient/HadoopFileInputPartitionTest.cs @@ -46,7 +46,6 @@ namespace 
Org.Apache.REEF.IO.TestClient var serializerConf = TangFactory.GetTang().NewConfigurationBuilder() .BindImplementation<IFileDeSerializer<IEnumerable<byte>>, ByteSerializer>(GenericType<IFileDeSerializer<IEnumerable<byte>>>.Class, GenericType<ByteSerializer>.Class) - .BindNamedParam<CopyToLocal, bool>("true") .Build(); var serializerConfString = (new AvroConfigurationSerializer()).ToString(serializerConf); @@ -55,6 +54,7 @@ namespace Org.Apache.REEF.IO.TestClient .Set(FileSystemInputPartitionConfiguration<IEnumerable<byte>>.FilePathForPartitions, remoteFilePath1) .Set(FileSystemInputPartitionConfiguration<IEnumerable<byte>>.FilePathForPartitions, remoteFilePath2) .Set(FileSystemInputPartitionConfiguration<IEnumerable<byte>>.FileSerializerConfig, serializerConfString) + .Set(FileSystemInputPartitionConfiguration<IEnumerable<byte>>.CopyToLocal, "true") .Build(), HadoopFileSystemConfiguration.ConfigurationModule.Build()) .GetInstance<IPartitionedInputDataSet>(); http://git-wip-us.apache.org/repos/asf/reef/blob/4eadc7f0/lang/cs/Org.Apache.REEF.IO.Tests/TestFilePartitionInputDataSet.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.IO.Tests/TestFilePartitionInputDataSet.cs b/lang/cs/Org.Apache.REEF.IO.Tests/TestFilePartitionInputDataSet.cs index 5d829c3..5cbb983 100644 --- a/lang/cs/Org.Apache.REEF.IO.Tests/TestFilePartitionInputDataSet.cs +++ b/lang/cs/Org.Apache.REEF.IO.Tests/TestFilePartitionInputDataSet.cs @@ -52,6 +52,7 @@ namespace Org.Apache.REEF.IO.Tests var dataSet = TangFactory.GetTang() .NewInjector(FileSystemInputPartitionConfiguration<IEnumerable<byte>>.ConfigurationModule .Set(FileSystemInputPartitionConfiguration<IEnumerable<byte>>.FilePathForPartitions, filePaths) + .Set(FileSystemInputPartitionConfiguration<IEnumerable<byte>>.CopyToLocal, "true") .Set(FileSystemInputPartitionConfiguration<IEnumerable<byte>>.FileSerializerConfig, GetByteSerializerConfigString()) .Build()) @@ -76,6 +77,7 @@ 
namespace Org.Apache.REEF.IO.Tests sourceFilePath1 + ";" + sourceFilePath2) .Set(FileSystemInputPartitionConfiguration<IEnumerable<byte>>.FileSerializerConfig, GetByteSerializerConfigString()) + .Set(FileSystemInputPartitionConfiguration<IEnumerable<byte>>.CopyToLocal, "true") .Build()) .GetInstance<IPartitionedInputDataSet>(); @@ -195,6 +197,7 @@ namespace Org.Apache.REEF.IO.Tests .NewInjector(FileSystemInputPartitionConfiguration<IEnumerable<Row>>.ConfigurationModule .Set(FileSystemInputPartitionConfiguration<IEnumerable<Row>>.FilePathForPartitions, sourceFilePath1) .Set(FileSystemInputPartitionConfiguration<IEnumerable<Row>>.FilePathForPartitions, sourceFilePath2) + .Set(FileSystemInputPartitionConfiguration<IEnumerable<Row>>.CopyToLocal, "true") .Set(FileSystemInputPartitionConfiguration<IEnumerable<Row>>.FileSerializerConfig, GetRowSerializerConfigString()) .Build()) .GetInstance<IPartitionedInputDataSet>(); @@ -287,7 +290,6 @@ namespace Org.Apache.REEF.IO.Tests .BindImplementation<IFileDeSerializer<IEnumerable<byte>>, ByteSerializer>( GenericType<IFileDeSerializer<IEnumerable<byte>>>.Class, GenericType<ByteSerializer>.Class) - .BindNamedParam<CopyToLocal, bool>("true") .Build(); return (new AvroConfigurationSerializer()).ToString(serializerConf); } @@ -298,7 +300,6 @@ namespace Org.Apache.REEF.IO.Tests .BindImplementation<IFileDeSerializer<IEnumerable<Row>>, RowSerializer>( GenericType<IFileDeSerializer<IEnumerable<Row>>>.Class, GenericType<RowSerializer>.Class) - .BindNamedParam<CopyToLocal, bool>("true") .Build(); return (new AvroConfigurationSerializer()).ToString(serializerConf); } http://git-wip-us.apache.org/repos/asf/reef/blob/4eadc7f0/lang/cs/Org.Apache.REEF.IO/PartitionedData/FileSystem/FileInputPartitionDescriptor.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.IO/PartitionedData/FileSystem/FileInputPartitionDescriptor.cs 
b/lang/cs/Org.Apache.REEF.IO/PartitionedData/FileSystem/FileInputPartitionDescriptor.cs index a842e25..53afbf1 100644 --- a/lang/cs/Org.Apache.REEF.IO/PartitionedData/FileSystem/FileInputPartitionDescriptor.cs +++ b/lang/cs/Org.Apache.REEF.IO/PartitionedData/FileSystem/FileInputPartitionDescriptor.cs @@ -31,11 +31,13 @@ namespace Org.Apache.REEF.IO.PartitionedData.FileSystem private readonly string _id; private readonly IList<string> _filePaths; private readonly IConfiguration _filePartitionDeserializerConfig; + private readonly bool _copyToLocal; - internal FileInputPartitionDescriptor(string id, IList<string> filePaths, IConfiguration filePartitionDeserializerConfig) + internal FileInputPartitionDescriptor(string id, IList<string> filePaths, bool copyToLocal, IConfiguration filePartitionDeserializerConfig) { _id = id; _filePaths = filePaths; + _copyToLocal = copyToLocal; _filePartitionDeserializerConfig = filePartitionDeserializerConfig; } @@ -52,6 +54,7 @@ namespace Org.Apache.REEF.IO.PartitionedData.FileSystem { var builder = TangFactory.GetTang().NewConfigurationBuilder() .BindImplementation(GenericType<IInputPartition<T>>.Class, GenericType<FileSystemInputPartition<T>>.Class) + .BindNamedParameter<CopyToLocal, bool>(GenericType<CopyToLocal>.Class, _copyToLocal.ToString()) .BindStringNamedParam<PartitionId>(_id); foreach (string p in _filePaths) http://git-wip-us.apache.org/repos/asf/reef/blob/4eadc7f0/lang/cs/Org.Apache.REEF.IO/PartitionedData/FileSystem/FileSystemInputPartitionConfiguration.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.IO/PartitionedData/FileSystem/FileSystemInputPartitionConfiguration.cs b/lang/cs/Org.Apache.REEF.IO/PartitionedData/FileSystem/FileSystemInputPartitionConfiguration.cs index cddbe41..ec40902 100644 --- a/lang/cs/Org.Apache.REEF.IO/PartitionedData/FileSystem/FileSystemInputPartitionConfiguration.cs +++ 
b/lang/cs/Org.Apache.REEF.IO/PartitionedData/FileSystem/FileSystemInputPartitionConfiguration.cs @@ -39,6 +39,11 @@ namespace Org.Apache.REEF.IO.PartitionedData.FileSystem public static readonly RequiredParameter<string> FileSerializerConfig = new RequiredParameter<string>(); /// <summary> + /// Specifies whether the files in FilePathsForInputPartitions need to be copied locally + /// </summary> + public static readonly OptionalParameter<bool> CopyToLocal = new OptionalParameter<bool>(); + + /// <summary> /// This configuration module set FileSystemDataSet as IPartitionedDataSet. /// It also set required parameters for injecting FileSystemDataSet /// </summary> @@ -46,6 +51,7 @@ namespace Org.Apache.REEF.IO.PartitionedData.FileSystem .BindImplementation(GenericType<IPartitionedInputDataSet>.Class, GenericType<FileSystemPartitionInputDataSet<T>>.Class) .BindSetEntry(GenericType<FilePathsForInputPartitions>.Class, FilePathForPartitions) .BindNamedParameter(GenericType<FileDeSerializerConfigString>.Class, FileSerializerConfig) + .BindNamedParameter(GenericType<CopyToLocal>.Class, CopyToLocal) .Build(); } } http://git-wip-us.apache.org/repos/asf/reef/blob/4eadc7f0/lang/cs/Org.Apache.REEF.IO/PartitionedData/FileSystem/FileSystemPartitionInputDataSet.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.IO/PartitionedData/FileSystem/FileSystemPartitionInputDataSet.cs b/lang/cs/Org.Apache.REEF.IO/PartitionedData/FileSystem/FileSystemPartitionInputDataSet.cs index 4c68ce2..a720b6e 100644 --- a/lang/cs/Org.Apache.REEF.IO/PartitionedData/FileSystem/FileSystemPartitionInputDataSet.cs +++ b/lang/cs/Org.Apache.REEF.IO/PartitionedData/FileSystem/FileSystemPartitionInputDataSet.cs @@ -51,6 +51,7 @@ namespace Org.Apache.REEF.IO.PartitionedData.FileSystem private FileSystemPartitionInputDataSet( [Parameter(typeof(FilePathsForInputPartitions))] ISet<string> filePaths, IFileSystem fileSystem, + [Parameter(typeof(CopyToLocal))] 
bool copyToLocal, [Parameter(typeof(FileDeSerializerConfigString))] string fileSerializerConfigString, AvroConfigurationSerializer avroConfigurationSerializer) { @@ -68,7 +69,7 @@ namespace Org.Apache.REEF.IO.PartitionedData.FileSystem var paths = path.Split(new string[] { StringSeparators }, StringSplitOptions.None); var id = "FilePartition-" + i++; - _partitions[id] = new FileInputPartitionDescriptor<T>(id, paths.ToList(), fileSerializerConfig); + _partitions[id] = new FileInputPartitionDescriptor<T>(id, paths.ToList(), copyToLocal, fileSerializerConfig); } }
