Repository: reef
Updated Branches:
  refs/heads/master b45c29c5e -> 4eadc7f03


[REEF-1219]  Adding CopyToLocal in FileSystemInputPartitionConfiguration

Add CopyToLocal  to FileSystemInputPartition Configuration module builder
Update FileSystemPartitionInputDataSet to take it as a named parameter and pass 
it to the FileInputPartitionDescriptor
Add it to GetPartitionConfiguration() in FileInputPartitionDescriptor

JIRA: [REEF-1219](https://issues.apache.org/jira/browse/REEF-1219)

This closes #856


Project: http://git-wip-us.apache.org/repos/asf/reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/4eadc7f0
Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/4eadc7f0
Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/4eadc7f0

Branch: refs/heads/master
Commit: 4eadc7f039cafd365013fb2ca7792495e6cc3154
Parents: b45c29c
Author: Julia Wang <[email protected]>
Authored: Tue Feb 23 16:49:42 2016 -0800
Committer: Andrew Chung <[email protected]>
Committed: Wed Feb 24 15:41:26 2016 -0800

----------------------------------------------------------------------
 .../HadoopFileInputPartitionTest.cs                            | 2 +-
 .../Org.Apache.REEF.IO.Tests/TestFilePartitionInputDataSet.cs  | 5 +++--
 .../PartitionedData/FileSystem/FileInputPartitionDescriptor.cs | 5 ++++-
 .../FileSystem/FileSystemInputPartitionConfiguration.cs        | 6 ++++++
 .../FileSystem/FileSystemPartitionInputDataSet.cs              | 3 ++-
 5 files changed, 16 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/reef/blob/4eadc7f0/lang/cs/Org.Apache.REEF.IO.TestClient/HadoopFileInputPartitionTest.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.IO.TestClient/HadoopFileInputPartitionTest.cs 
b/lang/cs/Org.Apache.REEF.IO.TestClient/HadoopFileInputPartitionTest.cs
index a1e5f84..1633325 100644
--- a/lang/cs/Org.Apache.REEF.IO.TestClient/HadoopFileInputPartitionTest.cs
+++ b/lang/cs/Org.Apache.REEF.IO.TestClient/HadoopFileInputPartitionTest.cs
@@ -46,7 +46,6 @@ namespace Org.Apache.REEF.IO.TestClient
             var serializerConf = 
TangFactory.GetTang().NewConfigurationBuilder()
                 .BindImplementation<IFileDeSerializer<IEnumerable<byte>>, 
ByteSerializer>(GenericType<IFileDeSerializer<IEnumerable<byte>>>.Class,
                     GenericType<ByteSerializer>.Class)
-                .BindNamedParam<CopyToLocal, bool>("true")
                 .Build();
             var serializerConfString = (new 
AvroConfigurationSerializer()).ToString(serializerConf);
 
@@ -55,6 +54,7 @@ namespace Org.Apache.REEF.IO.TestClient
                     
.Set(FileSystemInputPartitionConfiguration<IEnumerable<byte>>.FilePathForPartitions,
 remoteFilePath1)
                     
.Set(FileSystemInputPartitionConfiguration<IEnumerable<byte>>.FilePathForPartitions,
 remoteFilePath2)
                     
.Set(FileSystemInputPartitionConfiguration<IEnumerable<byte>>.FileSerializerConfig,
 serializerConfString)
+                    
.Set(FileSystemInputPartitionConfiguration<IEnumerable<byte>>.CopyToLocal, 
"true")
                 .Build(),
                   HadoopFileSystemConfiguration.ConfigurationModule.Build())
                 .GetInstance<IPartitionedInputDataSet>();

http://git-wip-us.apache.org/repos/asf/reef/blob/4eadc7f0/lang/cs/Org.Apache.REEF.IO.Tests/TestFilePartitionInputDataSet.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IO.Tests/TestFilePartitionInputDataSet.cs 
b/lang/cs/Org.Apache.REEF.IO.Tests/TestFilePartitionInputDataSet.cs
index 5d829c3..5cbb983 100644
--- a/lang/cs/Org.Apache.REEF.IO.Tests/TestFilePartitionInputDataSet.cs
+++ b/lang/cs/Org.Apache.REEF.IO.Tests/TestFilePartitionInputDataSet.cs
@@ -52,6 +52,7 @@ namespace Org.Apache.REEF.IO.Tests
             var dataSet = TangFactory.GetTang()
                 
.NewInjector(FileSystemInputPartitionConfiguration<IEnumerable<byte>>.ConfigurationModule
                     
.Set(FileSystemInputPartitionConfiguration<IEnumerable<byte>>.FilePathForPartitions,
 filePaths)
+                    
.Set(FileSystemInputPartitionConfiguration<IEnumerable<byte>>.CopyToLocal, 
"true")
                     
.Set(FileSystemInputPartitionConfiguration<IEnumerable<byte>>.FileSerializerConfig,
                         GetByteSerializerConfigString())
                     .Build())
@@ -76,6 +77,7 @@ namespace Org.Apache.REEF.IO.Tests
                         sourceFilePath1 + ";" + sourceFilePath2)
                     
.Set(FileSystemInputPartitionConfiguration<IEnumerable<byte>>.FileSerializerConfig,
                         GetByteSerializerConfigString())
+                    
.Set(FileSystemInputPartitionConfiguration<IEnumerable<byte>>.CopyToLocal, 
"true")
                     .Build())
                 .GetInstance<IPartitionedInputDataSet>();
 
@@ -195,6 +197,7 @@ namespace Org.Apache.REEF.IO.Tests
                 
.NewInjector(FileSystemInputPartitionConfiguration<IEnumerable<Row>>.ConfigurationModule
                     
.Set(FileSystemInputPartitionConfiguration<IEnumerable<Row>>.FilePathForPartitions,
 sourceFilePath1)
                     
.Set(FileSystemInputPartitionConfiguration<IEnumerable<Row>>.FilePathForPartitions,
 sourceFilePath2)
+                    
.Set(FileSystemInputPartitionConfiguration<IEnumerable<Row>>.CopyToLocal, 
"true")
                     
.Set(FileSystemInputPartitionConfiguration<IEnumerable<Row>>.FileSerializerConfig,
 GetRowSerializerConfigString())
                     .Build())
                 .GetInstance<IPartitionedInputDataSet>();
@@ -287,7 +290,6 @@ namespace Org.Apache.REEF.IO.Tests
                 .BindImplementation<IFileDeSerializer<IEnumerable<byte>>, 
ByteSerializer>(
                     GenericType<IFileDeSerializer<IEnumerable<byte>>>.Class,
                     GenericType<ByteSerializer>.Class)
-                .BindNamedParam<CopyToLocal, bool>("true")
                 .Build();
             return (new 
AvroConfigurationSerializer()).ToString(serializerConf);
         }
@@ -298,7 +300,6 @@ namespace Org.Apache.REEF.IO.Tests
                 .BindImplementation<IFileDeSerializer<IEnumerable<Row>>, 
RowSerializer>(
                     GenericType<IFileDeSerializer<IEnumerable<Row>>>.Class,
                     GenericType<RowSerializer>.Class)
-                .BindNamedParam<CopyToLocal, bool>("true")
                 .Build();
             return (new 
AvroConfigurationSerializer()).ToString(serializerConf);
         }

http://git-wip-us.apache.org/repos/asf/reef/blob/4eadc7f0/lang/cs/Org.Apache.REEF.IO/PartitionedData/FileSystem/FileInputPartitionDescriptor.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.IO/PartitionedData/FileSystem/FileInputPartitionDescriptor.cs
 
b/lang/cs/Org.Apache.REEF.IO/PartitionedData/FileSystem/FileInputPartitionDescriptor.cs
index a842e25..53afbf1 100644
--- 
a/lang/cs/Org.Apache.REEF.IO/PartitionedData/FileSystem/FileInputPartitionDescriptor.cs
+++ 
b/lang/cs/Org.Apache.REEF.IO/PartitionedData/FileSystem/FileInputPartitionDescriptor.cs
@@ -31,11 +31,13 @@ namespace Org.Apache.REEF.IO.PartitionedData.FileSystem
         private readonly string _id;
         private readonly IList<string> _filePaths;
         private readonly IConfiguration _filePartitionDeserializerConfig;
+        private readonly bool _copyToLocal;
 
-        internal FileInputPartitionDescriptor(string id, IList<string> 
filePaths, IConfiguration filePartitionDeserializerConfig)
+        internal FileInputPartitionDescriptor(string id, IList<string> 
filePaths, bool copyToLocal, IConfiguration filePartitionDeserializerConfig)
         {
             _id = id;
             _filePaths = filePaths;
+            _copyToLocal = copyToLocal;
             _filePartitionDeserializerConfig = filePartitionDeserializerConfig;
         }
 
@@ -52,6 +54,7 @@ namespace Org.Apache.REEF.IO.PartitionedData.FileSystem
         {
             var builder = TangFactory.GetTang().NewConfigurationBuilder()
                 .BindImplementation(GenericType<IInputPartition<T>>.Class, 
GenericType<FileSystemInputPartition<T>>.Class)
+                .BindNamedParameter<CopyToLocal, 
bool>(GenericType<CopyToLocal>.Class, _copyToLocal.ToString())
                 .BindStringNamedParam<PartitionId>(_id);
 
             foreach (string p in _filePaths)

http://git-wip-us.apache.org/repos/asf/reef/blob/4eadc7f0/lang/cs/Org.Apache.REEF.IO/PartitionedData/FileSystem/FileSystemInputPartitionConfiguration.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.IO/PartitionedData/FileSystem/FileSystemInputPartitionConfiguration.cs
 
b/lang/cs/Org.Apache.REEF.IO/PartitionedData/FileSystem/FileSystemInputPartitionConfiguration.cs
index cddbe41..ec40902 100644
--- 
a/lang/cs/Org.Apache.REEF.IO/PartitionedData/FileSystem/FileSystemInputPartitionConfiguration.cs
+++ 
b/lang/cs/Org.Apache.REEF.IO/PartitionedData/FileSystem/FileSystemInputPartitionConfiguration.cs
@@ -39,6 +39,11 @@ namespace Org.Apache.REEF.IO.PartitionedData.FileSystem
         public static readonly RequiredParameter<string> FileSerializerConfig 
= new RequiredParameter<string>();
 
         /// <summary>
+        /// This specify if the file needs to be copied to local in 
FilePathsForInputPartitions 
+        /// </summary>
+        public static readonly OptionalParameter<bool> CopyToLocal = new 
OptionalParameter<bool>();
+
+        /// <summary>
         /// This configuration module set FileSystemDataSet as 
IPartitionedDataSet.
         /// It also set required parameters for injecting FileSystemDataSet
         /// </summary>
@@ -46,6 +51,7 @@ namespace Org.Apache.REEF.IO.PartitionedData.FileSystem
             .BindImplementation(GenericType<IPartitionedInputDataSet>.Class, 
GenericType<FileSystemPartitionInputDataSet<T>>.Class)
             .BindSetEntry(GenericType<FilePathsForInputPartitions>.Class, 
FilePathForPartitions)
             
.BindNamedParameter(GenericType<FileDeSerializerConfigString>.Class, 
FileSerializerConfig)
+            .BindNamedParameter(GenericType<CopyToLocal>.Class, CopyToLocal)
             .Build();
     }
 }

http://git-wip-us.apache.org/repos/asf/reef/blob/4eadc7f0/lang/cs/Org.Apache.REEF.IO/PartitionedData/FileSystem/FileSystemPartitionInputDataSet.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.IO/PartitionedData/FileSystem/FileSystemPartitionInputDataSet.cs
 
b/lang/cs/Org.Apache.REEF.IO/PartitionedData/FileSystem/FileSystemPartitionInputDataSet.cs
index 4c68ce2..a720b6e 100644
--- 
a/lang/cs/Org.Apache.REEF.IO/PartitionedData/FileSystem/FileSystemPartitionInputDataSet.cs
+++ 
b/lang/cs/Org.Apache.REEF.IO/PartitionedData/FileSystem/FileSystemPartitionInputDataSet.cs
@@ -51,6 +51,7 @@ namespace Org.Apache.REEF.IO.PartitionedData.FileSystem
         private FileSystemPartitionInputDataSet(
             [Parameter(typeof(FilePathsForInputPartitions))] ISet<string> 
filePaths,
             IFileSystem fileSystem,
+            [Parameter(typeof(CopyToLocal))] bool copyToLocal,
             [Parameter(typeof(FileDeSerializerConfigString))] string 
fileSerializerConfigString,
             AvroConfigurationSerializer avroConfigurationSerializer)
         {
@@ -68,7 +69,7 @@ namespace Org.Apache.REEF.IO.PartitionedData.FileSystem
                 var paths = path.Split(new string[] { StringSeparators }, 
StringSplitOptions.None);
                
                 var id = "FilePartition-" + i++;
-                _partitions[id] = new FileInputPartitionDescriptor<T>(id, 
paths.ToList(), fileSerializerConfig); 
+                _partitions[id] = new FileInputPartitionDescriptor<T>(id, 
paths.ToList(), copyToLocal, fileSerializerConfig); 
             }
         }
 

Reply via email to