Repository: reef
Updated Branches:
  refs/heads/master 58090fec0 -> d01c0c46f


[REEF-1405] Cache the data in IMRU Context layer

* Download data file in DataLoadingContext when context starts
* Add test cases to inject FilePartitionDataSet to IMRUDriver.
* Verify data file has been downloaded in mapper function in test

JIRA:
  [REEF-1405](https://issues.apache.org/jira/browse/REEF-1405)

Pull request:
  This closes #1084


Project: http://git-wip-us.apache.org/repos/asf/reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/d01c0c46
Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/d01c0c46
Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/d01c0c46

Branch: refs/heads/master
Commit: d01c0c46f335bf85446a99243233907ae7d9fad8
Parents: 58090fe
Author: Julia Wang <[email protected]>
Authored: Wed Aug 3 11:38:13 2016 -0700
Committer: Mariia Mykhailova <[email protected]>
Committed: Tue Aug 9 13:15:58 2016 -0700

----------------------------------------------------------------------
 .../OnREEF/Driver/DataLoadingContext.cs         |   8 +-
 .../FileSystem/FileSystemInputPartition.cs      |  44 ++-
 .../Properties/AssemblyInfo.cs                  |   5 +
 ...oadcastReduceWithFilePartitionDataSetTest.cs | 388 +++++++++++++++++++
 .../IMRU/IMRUBroadcastReduceWithLocalFile.cs    |  70 ++++
 .../IMRU/IMRUBrodcastReduceTestBase.cs          |   3 +-
 .../Functional/ReefFunctionalTest.cs            |   4 +-
 .../Org.Apache.REEF.Tests.csproj                |   2 +
 8 files changed, 498 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/reef/blob/d01c0c46/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/DataLoadingContext.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/DataLoadingContext.cs 
b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/DataLoadingContext.cs
index 5ff36de..2ddf63a 100644
--- a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/DataLoadingContext.cs
+++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/DataLoadingContext.cs
@@ -16,6 +16,7 @@
 // under the License.
 
 using System;
+using System.Collections.Generic;
 using Org.Apache.REEF.Common.Events;
 using Org.Apache.REEF.IO.PartitionedData;
 using Org.Apache.REEF.Tang.Annotations;
@@ -43,12 +44,13 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
         }
 
         /// <summary>
-        /// Specifies what to do when context starts.
+        /// Download data files when context starts.
         /// </summary>
         /// <param name="value">context start token</param>
-        /// TODO[REEF-1339] - AddCache() function of IInputPartition will be 
called here.
         public void OnNext(IContextStart value)
         {
+            _partition.Cache();
+            Logger.Log(Level.Info, "Returned from IInputPartition.Cache().");
         }
 
         /// <summary>
@@ -58,7 +60,7 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
         /// <param name="error">Exception</param>
         public void OnError(Exception error)
         {
-            Exceptions.Throw(error, "Error occured in Data Loading context 
start", Logger);
+            Exceptions.Throw(error, "Error occurred in Data Loading context 
start", Logger);
         }
 
         /// <summary>

http://git-wip-us.apache.org/repos/asf/reef/blob/d01c0c46/lang/cs/Org.Apache.REEF.IO/PartitionedData/FileSystem/FileSystemInputPartition.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.IO/PartitionedData/FileSystem/FileSystemInputPartition.cs
 
b/lang/cs/Org.Apache.REEF.IO/PartitionedData/FileSystem/FileSystemInputPartition.cs
index 0d17467..dc2809b 100644
--- 
a/lang/cs/Org.Apache.REEF.IO/PartitionedData/FileSystem/FileSystemInputPartition.cs
+++ 
b/lang/cs/Org.Apache.REEF.IO/PartitionedData/FileSystem/FileSystemInputPartition.cs
@@ -43,6 +43,7 @@ namespace Org.Apache.REEF.IO.PartitionedData.FileSystem
         private readonly ITempFileCreator _tempFileCreator;
         private readonly ISet<string> _remoteFilePaths;
         private readonly bool _copyToLocal;
+        private Optional<T> _data = Optional<T>.Empty();
 
         private Optional<ISet<string>> _localFiles;
         
@@ -69,15 +70,29 @@ namespace Org.Apache.REEF.IO.PartitionedData.FileSystem
         }
 
         /// <summary>
-        /// Caches from the remote File System to a local disk.
+        /// This method copies remote files to local if CopyToLocal is 
enabled, and then deserializes the files.
+        /// Otherwise, this method assumes that the files are remote, and the 
injected IFileDeSerializer
+        /// can handle the remote file system access.
+        /// It caches the data reference returned from 
IFileDeSerializer.Deserialize() method. 
         /// </summary>
         public void Cache()
         {
             lock (_lock)
             {
-                if (!_localFiles.IsPresent())
+                if (_copyToLocal)
+                {
+                    if (!_localFiles.IsPresent())
+                    {
+                        _localFiles = Optional<ISet<string>>.Of(Download());
+                    }
+
+                    // For now, assume IFileDeSerializer is local.
+                    _data = 
Optional<T>.Of(_fileSerializer.Deserialize(_localFiles.Value));
+                }
+                else
                 {
-                    _localFiles = Optional<ISet<string>>.Of(Download());
+                    // For now, assume IFileDeSerializer is remote.
+                    _data = 
Optional<T>.Of(_fileSerializer.Deserialize(_remoteFilePaths));
                 }
             }
         }
@@ -91,7 +106,7 @@ namespace Org.Apache.REEF.IO.PartitionedData.FileSystem
             {
                 var set = new HashSet<string>();
                 var localFileFolder = 
_tempFileCreator.CreateTempDirectory("-partition-");
-                Logger.Log(Level.Info, 
string.Format(CultureInfo.CurrentCulture, "Local file temp folder: {0}", 
localFileFolder));
+                Logger.Log(Level.Info, "Local file temp folder: {0}", 
localFileFolder);
 
                 foreach (var sourceFilePath in _remoteFilePaths)
                 {
@@ -111,35 +126,24 @@ namespace Org.Apache.REEF.IO.PartitionedData.FileSystem
                     _fileSystem.CopyToLocal(sourceUri, localFilePath);
                 }
 
+                Logger.Log(Level.Info, "File downloading is completed");
                 return set;
             }
         }
 
         /// <summary>
-        /// This method copies remote files to local if CopyToLocal is 
enabled, and then deserializes the files.
-        /// Otherwise, this method assumes that the files are remote, and that 
the injected IFileDeSerializer
-        /// can handle the remote file system access.
-        /// It returns the IEnumerble of T, the details is defined in the 
Deserialize() method 
-        /// provided by the Serializer
+        /// If the data is not present, call cache() to get it. Then returns 
the data. 
         /// </summary>
         /// <returns></returns>
         public T GetPartitionHandle()
         {
             lock (_lock)
             {
-                if (_copyToLocal)
+                if (!_data.IsPresent())
                 {
-                    if (!_localFiles.IsPresent())
-                    {
-                        Cache();
-                    }
-
-                    // For now, assume IFileDeSerializer is local.
-                    return _fileSerializer.Deserialize(_localFiles.Value);
+                    Cache();
                 }
-
-                // For now, assume IFileDeSerializer is remote.
-                return _fileSerializer.Deserialize(_remoteFilePaths);
+                return _data.Value;
             }
         }
 

http://git-wip-us.apache.org/repos/asf/reef/blob/d01c0c46/lang/cs/Org.Apache.REEF.IO/Properties/AssemblyInfo.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IO/Properties/AssemblyInfo.cs 
b/lang/cs/Org.Apache.REEF.IO/Properties/AssemblyInfo.cs
index 2c33732..44a2316 100644
--- a/lang/cs/Org.Apache.REEF.IO/Properties/AssemblyInfo.cs
+++ b/lang/cs/Org.Apache.REEF.IO/Properties/AssemblyInfo.cs
@@ -48,6 +48,11 @@ using System.Runtime.InteropServices;
  
"9c03469d0f93a9f5d45aa2c883f50cd158759e93673f759ec4657fd84cc79d2db38ef1a2d914cc"
 +
  
"b7c717846a897e11dd22eb260a7ce2da2dccf0263ea63e2b3f7dac24f28882aa568ef544341d17"
 +
  
"618392a1095f4049ad079d4f4f0b429bb535699155fd6a7652ec7d6c1f1ba2b560f11ef3a86b5945d288cf")]
+[assembly: InternalsVisibleTo("Org.Apache.REEF.Tests, publickey=" +
+ 
"00240000048000009400000006020000002400005253413100040000010001005df3e621d886a9"
 +
+ 
"9c03469d0f93a9f5d45aa2c883f50cd158759e93673f759ec4657fd84cc79d2db38ef1a2d914cc"
 +
+ 
"b7c717846a897e11dd22eb260a7ce2da2dccf0263ea63e2b3f7dac24f28882aa568ef544341d17"
 +
+ 
"618392a1095f4049ad079d4f4f0b429bb535699155fd6a7652ec7d6c1f1ba2b560f11ef3a86b5945d288cf")]
 
 // Allow NSubstitute to create proxy implementations
 [assembly: InternalsVisibleTo("DynamicProxyGenAssembly2, 
PublicKey=002400000480000" +

http://git-wip-us.apache.org/repos/asf/reef/blob/d01c0c46/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUBroadcastReduceWithFilePartitionDataSetTest.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUBroadcastReduceWithFilePartitionDataSetTest.cs
 
b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUBroadcastReduceWithFilePartitionDataSetTest.cs
new file mode 100644
index 0000000..e58e236
--- /dev/null
+++ 
b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUBroadcastReduceWithFilePartitionDataSetTest.cs
@@ -0,0 +1,388 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System;
+using System.Collections.Generic;
+using System.IO;
+using System.Linq;
+using System.Text.RegularExpressions;
+using Org.Apache.REEF.Common.Tasks;
+using Org.Apache.REEF.IMRU.API;
+using Org.Apache.REEF.IMRU.OnREEF.Driver;
+using Org.Apache.REEF.IO.PartitionedData;
+using Org.Apache.REEF.IO.PartitionedData.FileSystem;
+using Org.Apache.REEF.IO.PartitionedData.FileSystem.Parameters;
+using Org.Apache.REEF.IO.TempFileCreation;
+using 
Org.Apache.REEF.Network.Examples.GroupCommunication.BroadcastReduceDriverAndTasks;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Tang.Formats;
+using Org.Apache.REEF.Tang.Implementations.Tang;
+using Org.Apache.REEF.Tang.Interface;
+using Org.Apache.REEF.Tang.Util;
+using Org.Apache.REEF.Utilities.Diagnostics;
+using Org.Apache.REEF.Utilities.Logging;
+using Xunit;
+using TraceLevel = System.Diagnostics.TraceLevel;
+
+namespace Org.Apache.REEF.Tests.Functional.IMRU
+{
+    /// <summary>
+    /// This class tests DataLoadingContext for IMRU with FilePartitionDataSet
+    /// </summary>
+    [Collection("FunctionalTests")]
+    public class IMRUBroadcastReduceWithFilePartitionDataSetTest : 
IMRUBrodcastReduceTestBase
+    {
+        /// <summary>
+        /// Number of bytes written to each test input file; the test deserializer yields one row per byte.
+        /// </summary>
+        protected const int DataCount = 3;
+
+        /// <summary>
+        /// Runs the IMRU broadcast/reduce job on the local runtime with a FilePartitionDataSet.
+        /// The IPartitionedInputDataSet configured by this class is handed to IMRUDriver, and the
+        /// IInputPartition that belongs to it is injected into DataLoadingContext on the Evaluator,
+        /// where Cache() is invoked when the context starts.
+        /// The bulk of the validation lives in the test mapper function, which checks that the
+        /// downloaded temp file exists before it calls IInputPartition.GetPartitionHandle().
+        /// </summary>
+        [Fact]
+        public void TestWithFilePartitionDataSetOnLocalRuntime()
+        {
+            const int taskCount = 4;
+            const int logReadRetries = 100;
+            var runtimeFolder = DefaultRuntimeFolder + TestId;
+
+            TestBroadCastAndReduce(
+                runOnYarn: false,
+                numTasks: taskCount,
+                chunkSize: 2,
+                dims: 10,
+                iterations: 10,
+                mapperMemory: 5120,
+                updateTaskMemory: 5120,
+                testFolder: runtimeFolder);
+
+            // No failed tasks or evaluators are expected for this run.
+            ValidateSuccessForLocalRuntime(taskCount, 0, 0, runtimeFolder, logReadRetries);
+            CleanUp(runtimeFolder);
+        }
+
+        /// <summary>
+        /// Builds the driver configuration with IEnumerable{Row} as TPartitionType and runs the job.
+        /// Declared with the <c>new</c> modifier, so it hides the inherited member of the same name
+        /// rather than overriding it.
+        /// </summary>
+        /// <param name="runOnYarn">true to run on "yarn", false to run on "local"</param>
+        /// <param name="numTasks">number of evaluators passed to TestRun; the job definition is built with numTasks - 1</param>
+        /// <param name="chunkSize">forwarded to CreateIMRUJobDefinitionBuilder</param>
+        /// <param name="dims">forwarded to CreateIMRUJobDefinitionBuilder</param>
+        /// <param name="iterations">forwarded to CreateIMRUJobDefinitionBuilder</param>
+        /// <param name="mapperMemory">forwarded to CreateIMRUJobDefinitionBuilder</param>
+        /// <param name="updateTaskMemory">forwarded to CreateIMRUJobDefinitionBuilder</param>
+        /// <param name="testFolder">runtime folder passed to TestRun</param>
+        protected new void TestBroadCastAndReduce(bool runOnYarn,
+            int numTasks,
+            int chunkSize,
+            int dims,
+            int iterations,
+            int mapperMemory,
+            int updateTaskMemory,
+            string testFolder = DefaultRuntimeFolder)
+        {
+            var jobDefinition = CreateIMRUJobDefinitionBuilder(
+                numTasks - 1, chunkSize, iterations, dims, mapperMemory, updateTaskMemory);
+            var handlerConfig = DriverEventHandlerConfigurations<int[], int[], int[], IEnumerable<Row>>();
+            var driverConfig = DriverConfiguration<int[], int[], int[], IEnumerable<Row>>(jobDefinition, handlerConfig);
+
+            TestRun(
+                driverConfig,
+                typeof(BroadcastReduceDriver),
+                numTasks,
+                "BroadcastReduceDriver",
+                runOnYarn ? "yarn" : "local",
+                testFolder);
+        }
+
+        /// <summary>
+        /// Builds the driver event handler configuration. Every handler slot set here is bound to
+        /// IMRUDriver, and the custom trace level is set to Info.
+        /// </summary>
+        /// <returns>the driver event handler configuration</returns>
+        protected override IConfiguration DriverEventHandlerConfigurations<TMapInput, TMapOutput, TResult, TPartitionType>()
+        {
+            // The same handler type serves every event slot below.
+            var imruDriver = GenericType<IMRUDriver<TMapInput, TMapOutput, TResult, TPartitionType>>.Class;
+
+            return REEF.Driver.DriverConfiguration.ConfigurationModule
+                .Set(REEF.Driver.DriverConfiguration.OnEvaluatorAllocated, imruDriver)
+                .Set(REEF.Driver.DriverConfiguration.OnDriverStarted, imruDriver)
+                .Set(REEF.Driver.DriverConfiguration.OnContextActive, imruDriver)
+                .Set(REEF.Driver.DriverConfiguration.OnTaskCompleted, imruDriver)
+                .Set(REEF.Driver.DriverConfiguration.OnEvaluatorFailed, imruDriver)
+                .Set(REEF.Driver.DriverConfiguration.OnContextFailed, imruDriver)
+                .Set(REEF.Driver.DriverConfiguration.OnTaskFailed, imruDriver)
+                .Set(REEF.Driver.DriverConfiguration.CustomTraceLevel, TraceLevel.Info.ToString())
+                .Build();
+        }
+
+        /// <summary>
+        /// Verifies the partitioned dataset configuration built by this class without running a job:
+        /// delegates to PartitionDatasetConfig(), which injects IPartitionedInputDataSet and each
+        /// IInputPartition from that configuration and counts the rows read.
+        /// </summary>
+        [Fact]
+        public void TestBuildPartitionedDatasetConfiguration()
+        {
+            PartitionDatasetConfig();
+        }
+
+        /// <summary>
+        /// Injects IPartitionedInputDataSet from BuildPartitionedDatasetConfiguration(4), then for each
+        /// partition descriptor injects the IInputPartition, caches it, enumerates its rows and
+        /// finally asserts that the total row count equals 4 * DataCount.
+        /// </summary>
+        protected void PartitionDatasetConfig()
+        {
+            int count = 0;
+            int numberOfTasks = 4;
+
+            var dataSet = TangFactory.GetTang()
+                .NewInjector(BuildPartitionedDatasetConfiguration(numberOfTasks))
+                .GetInstance<IPartitionedInputDataSet>();
+
+            foreach (var partitionDescriptor in dataSet)
+            {
+                var partition = TangFactory.GetTang()
+                    .NewInjector(partitionDescriptor.GetPartitionConfiguration())
+                    .GetInstance<IInputPartition<IEnumerable<Row>>>();
+
+                // Cache() runs inside the using block so the partition is disposed
+                // (when it is IDisposable) even if caching or reading throws.
+                using (partition as IDisposable)
+                {
+                    partition.Cache();
+
+                    foreach (var row in partition.GetPartitionHandle())
+                    {
+                        Logger.Log(Level.Info, "Data read {0}: ", row.GetValue());
+                        count++;
+                    }
+                }
+            }
+
+            // xUnit's Assert.Equal takes the expected value first, then the actual one.
+            Assert.Equal(numberOfTasks * DataCount, count);
+        }
+
+        /// <summary>
+        /// Builds the map function configuration, binding the IMRU map function to
+        /// TestSenderMapFunction over IEnumerable{Row}.
+        /// </summary>
+        /// <returns>the mapper function configuration</returns>
+        protected override IConfiguration BuildMapperFunctionConfig()
+        {
+            var mapperModule = IMRUMapConfiguration<int[], int[]>.ConfigurationModule
+                .Set(
+                    IMRUMapConfiguration<int[], int[]>.MapFunction,
+                    GenericType<TestSenderMapFunction<IEnumerable<Row>>>.Class);
+
+            return mapperModule.Build();
+        }
+
+        /// <summary>
+        /// Test mapper function that receives the IInputPartition through injection.
+        /// Its constructor verifies the state of the temp folder (a downloaded file exists when
+        /// CopyToLocal is true, nothing was downloaded otherwise), then iterates the partition
+        /// data and verifies the number of rows.
+        /// </summary>
+        /// <typeparam name="T">partition handle type; the constructor casts the handle to IEnumerable{Row}</typeparam>
+        internal sealed class TestSenderMapFunction<T> : IMapFunction<int[], int[]>
+        {
+            /// <summary>
+            /// Number of times Map has been called so far.
+            /// </summary>
+            private int _iterations;
+
+            /// <summary>
+            /// Validates the temp folder content and the partition data at injection time.
+            /// </summary>
+            /// <param name="taskId">task identifier, used for logging only</param>
+            /// <param name="partition">the input partition whose data is counted</param>
+            /// <param name="copyToLocal">whether the partition files are expected to have been copied to the temp folder</param>
+            [Inject]
+            private TestSenderMapFunction(
+                [Parameter(typeof(TaskConfigurationOptions.Identifier))] string taskId,
+                IInputPartition<T> partition,
+                [Parameter(typeof(CopyToLocal))] bool copyToLocal)
+            {
+                var tempFileDir = TangFactory.GetTang().NewInjector().GetNamedInstance<TempFileFolder, string>();
+
+                // Drops the first character of the configured folder before appending it to the
+                // current directory -- presumably a leading '.' of a relative path; confirm against
+                // the TempFileFolder default.
+                var tmpFileFolder = Directory.GetCurrentDirectory() + tempFileDir.Substring(1);
+                Assert.True(Directory.Exists(tmpFileFolder));
+
+                // Materialized once so counting and indexing see the same snapshot.
+                var directories = Directory.GetDirectories(tmpFileFolder);
+
+                if (copyToLocal)
+                {
+                    Assert.Equal(1, directories.Length);
+
+                    var directory = directories[0];
+                    Assert.True(directory.Contains("-partition-"));
+
+                    var files = Directory.GetFiles(directory);
+                    Assert.Equal(1, files.Length);
+
+                    // Path.GetFileName honours the platform's directory separators.
+                    var fileName = Path.GetFileName(files[0]);
+                    Assert.Equal(8, fileName.Length);
+
+                    // All eight characters of the downloaded file name must be alphanumeric.
+                    var matchCounter = Regex.Matches(fileName, @"[a-zA-Z0-9]").Count;
+                    Assert.Equal(8, matchCounter);
+                }
+                else
+                {
+                    Assert.Equal(0, directories.Length);
+                }
+
+                int count = 0;
+                var rows = (IEnumerable<Row>)partition.GetPartitionHandle();
+
+                foreach (var row in rows)
+                {
+                    Logger.Log(Level.Info, "Data read {0}: ", row.GetValue());
+                    count++;
+                }
+
+                Logger.Log(Level.Info, "TestSenderMapFunction: TaskId: {0}, count {1}", taskId, count);
+
+                // xUnit's Assert.Equal takes the expected value first, then the actual one.
+                Assert.Equal(DataCount, count);
+            }
+
+            /// <summary>
+            /// Simple test Map function. Expects the first element of the input to equal the
+            /// 1-based number of this call and throws otherwise.
+            /// </summary>
+            /// <param name="mapInput">integer array</param>
+            /// <returns>The same integer array</returns>
+            int[] IMapFunction<int[], int[]>.Map(int[] mapInput)
+            {
+                _iterations++;
+
+                Logger.Log(Level.Info, "Received value {0} in iteration {1}.", mapInput[0], _iterations);
+
+                if (mapInput[0] != _iterations)
+                {
+                    Exceptions.Throw(new Exception("Expected value in mappers different from actual value"), Logger);
+                }
+
+                return mapInput;
+            }
+        }
+
+        /// <summary>
+        /// Builds the partitioned dataset configuration with CopyToLocal set to "true", so the
+        /// local test files are copied to local as if they were remote, and binds the test
+        /// RowSerializer as the file deserializer.
+        /// </summary>
+        /// <param name="numberofMappers">number of partitions (one test file each) to configure</param>
+        /// <returns>the partitioned dataset configuration</returns>
+        protected override IConfiguration BuildPartitionedDatasetConfiguration(int numberofMappers)
+        {
+            var module = CreateDatasetConfigurationModule(numberofMappers);
+
+            module = module.Set(
+                FileSystemInputPartitionConfiguration<IEnumerable<Row>>.CopyToLocal,
+                "true");
+            module = module.Set(
+                FileSystemInputPartitionConfiguration<IEnumerable<Row>>.FileSerializerConfig,
+                GetRowSerializerConfigString());
+
+            return module.Build();
+        }
+
+        /// <summary>
+        /// Creates one test input file per mapper under the system temp path and registers each
+        /// file path as a partition on the FileSystemInputPartitionConfiguration module.
+        /// </summary>
+        /// <param name="numberofMappers">number of test files / partitions to create</param>
+        /// <returns>the configuration module with all partition file paths set, not yet built</returns>
+        protected static ConfigurationModule CreateDatasetConfigurationModule(int numberofMappers)
+        {
+            const string tempFileName = "REEF.TestLocalFileSystem.tmp";
+            var filePathPrefix = Path.Combine(Path.GetTempPath(), tempFileName);
+            var module = FileSystemInputPartitionConfiguration<IEnumerable<Row>>.ConfigurationModule;
+
+            for (var mapperIndex = 0; mapperIndex < numberofMappers; mapperIndex++)
+            {
+                // The mapper index is appended to the file name to get one file per partition.
+                var partitionFilePath = filePathPrefix + mapperIndex;
+
+                MakeLocalTestFile(partitionFilePath, CreateTestData(DataCount));
+                module = module.Set(
+                    FileSystemInputPartitionConfiguration<IEnumerable<Row>>.FilePathForPartitions,
+                    partitionFilePath);
+            }
+
+            return module;
+        }
+
+        /// <summary>
+        /// Creates test data: a byte array whose element at index i is (byte)('a' + i).
+        /// </summary>
+        /// <param name="dataNumber">number of bytes to create</param>
+        /// <returns>the byte array of length dataNumber</returns>
+        protected static byte[] CreateTestData(int dataNumber)
+        {
+            var buffer = new byte[dataNumber];
+            var index = 0;
+
+            while (index < dataNumber)
+            {
+                buffer[index] = (byte)('a' + index);
+                index++;
+            }
+
+            return buffer;
+        }
+
+        /// <summary>
+        /// Creates a test data file to be used as an input file. An existing file at the same
+        /// path is overwritten.
+        /// </summary>
+        /// <param name="filePath">path of the file to create</param>
+        /// <param name="bytes">content of the file</param>
+        protected static void MakeLocalTestFile(string filePath, byte[] bytes)
+        {
+            // WriteAllBytes creates the file or overwrites an existing one, and closes it
+            // afterwards, so no explicit delete or per-byte write loop is needed.
+            File.WriteAllBytes(filePath, bytes);
+        }
+
+        /// <summary>
+        /// Test deserializer that turns every byte of the given files into one Row.
+        /// </summary>
+        protected class RowSerializer : IFileDeSerializer<IEnumerable<Row>>
+        {
+            [Inject]
+            private RowSerializer()
+            {
+            }
+
+            /// <summary>
+            /// Reads all the files in the set and returns the bytes read one by one.
+            /// This is an iterator: each file is opened only when the enumeration reaches it
+            /// and is closed when the enumeration moves past it or is disposed.
+            /// </summary>
+            /// <param name="filePaths">paths of the files to read</param>
+            /// <returns>one Row per byte, in file order</returns>
+            public IEnumerable<Row> Deserialize(ISet<string> filePaths)
+            {
+                foreach (var filePath in filePaths)
+                {
+                    // Read-only access is sufficient for deserialization.
+                    using (FileStream stream = File.OpenRead(filePath))
+                    {
+                        // Stream.ReadByte returns -1 at end of stream. Raw bytes are read directly
+                        // because BinaryReader.PeekChar decodes characters and can fail on bytes
+                        // that do not form valid text.
+                        int next;
+                        while ((next = stream.ReadByte()) != -1)
+                        {
+                            yield return new Row((byte)next);
+                        }
+                    }
+                }
+            }
+        }
+
+        /// <summary>
+        /// Builds a configuration that binds IFileDeSerializer over IEnumerable{Row} to
+        /// RowSerializer, and serializes it to a string so that it can be passed to the driver.
+        /// </summary>
+        /// <returns>the serialized RowSerializer configuration</returns>
+        protected static string GetRowSerializerConfigString()
+        {
+            var deserializerBinding = TangFactory.GetTang()
+                .NewConfigurationBuilder()
+                .BindImplementation(
+                    GenericType<IFileDeSerializer<IEnumerable<Row>>>.Class,
+                    GenericType<RowSerializer>.Class);
+            var serializerConf = deserializerBinding.Build();
+
+            var avroSerializer = new AvroConfigurationSerializer();
+            return avroSerializer.ToString(serializerConf);
+        }
+
+        /// <summary>
+        /// Test row type: an immutable wrapper around a single byte.
+        /// </summary>
+        protected class Row
+        {
+            private readonly byte _value;
+
+            /// <summary>
+            /// Creates a row holding the given byte.
+            /// </summary>
+            /// <param name="b">the byte this row holds</param>
+            public Row(byte b)
+            {
+                _value = b;
+            }
+
+            /// <summary>
+            /// Returns the byte given at construction.
+            /// </summary>
+            /// <returns>the byte held by this row</returns>
+            public byte GetValue()
+            {
+                return _value;
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/d01c0c46/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUBroadcastReduceWithLocalFile.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUBroadcastReduceWithLocalFile.cs
 
b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUBroadcastReduceWithLocalFile.cs
new file mode 100644
index 0000000..aa287b1
--- /dev/null
+++ 
b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUBroadcastReduceWithLocalFile.cs
@@ -0,0 +1,70 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System.Collections.Generic;
+using Org.Apache.REEF.IO.PartitionedData.FileSystem;
+using Org.Apache.REEF.Tang.Interface;
+using Xunit;
+
+namespace Org.Apache.REEF.Tests.Functional.IMRU
+{
+    [Collection("FunctionalTests")]
+    public class IMRUBroadcastReduceWithLocalFile : 
IMRUBroadcastReduceWithFilePartitionDataSetTest
+    {
+        /// <summary>
+        /// Runs the IMRU broadcast/reduce job on the local runtime with a FilePartitionDataSet
+        /// that reads local files directly.
+        /// The IPartitionedInputDataSet configured by this class is handed to IMRUDriver, and the
+        /// IInputPartition that belongs to it is injected into DataLoadingContext on the Evaluator,
+        /// where Cache() is invoked when the context starts.
+        /// Because local files are used, CopyToLocal is set to false. The validation happens
+        /// inside the test mapper function.
+        /// </summary>
+        [Fact]
+        public void TestWithFilePartitionDataSetForLocalFile()
+        {
+            const int taskCount = 4;
+            const int logReadRetries = 100;
+            var runtimeFolder = DefaultRuntimeFolder + TestId;
+
+            TestBroadCastAndReduce(
+                runOnYarn: false,
+                numTasks: taskCount,
+                chunkSize: 2,
+                dims: 10,
+                iterations: 10,
+                mapperMemory: 5120,
+                updateTaskMemory: 5120,
+                testFolder: runtimeFolder);
+
+            // No failed tasks or evaluators are expected for this run.
+            ValidateSuccessForLocalRuntime(taskCount, 0, 0, runtimeFolder, logReadRetries);
+            CleanUp(runtimeFolder);
+        }
+
+        /// <summary>
+        /// Builds the partitioned dataset configuration with CopyToLocal set to "false", so the
+        /// local test files are read in place, and binds the test RowSerializer as the file
+        /// deserializer.
+        /// </summary>
+        /// <param name="numberofMappers">number of partitions (one test file each) to configure</param>
+        /// <returns>the partitioned dataset configuration</returns>
+        protected override IConfiguration BuildPartitionedDatasetConfiguration(int numberofMappers)
+        {
+            var module = CreateDatasetConfigurationModule(numberofMappers);
+
+            module = module.Set(
+                FileSystemInputPartitionConfiguration<IEnumerable<Row>>.CopyToLocal,
+                "false");
+            module = module.Set(
+                FileSystemInputPartitionConfiguration<IEnumerable<Row>>.FileSerializerConfig,
+                GetRowSerializerConfigString());
+
+            return module.Build();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/d01c0c46/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUBrodcastReduceTestBase.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUBrodcastReduceTestBase.cs 
b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUBrodcastReduceTestBase.cs
index 16d927b..2f4b109 100644
--- 
a/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUBrodcastReduceTestBase.cs
+++ 
b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUBrodcastReduceTestBase.cs
@@ -16,6 +16,7 @@
 // under the License.
 
 using System;
+using System.Collections.Generic;
 using System.Globalization;
 using System.IO;
 using System.Linq;
@@ -101,7 +102,7 @@ namespace Org.Apache.REEF.Tests.Functional.IMRU
         /// <param name="jobDefinition"></param>
         /// <param name="driverHandlerConfig"></param>
         /// <returns></returns>
-        private IConfiguration DriverConfiguration<TMapInput, TMapOutput, 
TResult, TPartitionType>(
+        protected IConfiguration DriverConfiguration<TMapInput, TMapOutput, 
TResult, TPartitionType>(
             IMRUJobDefinition jobDefinition,
             IConfiguration driverHandlerConfig)
         {

http://git-wip-us.apache.org/repos/asf/reef/blob/d01c0c46/lang/cs/Org.Apache.REEF.Tests/Functional/ReefFunctionalTest.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/ReefFunctionalTest.cs 
b/lang/cs/Org.Apache.REEF.Tests/Functional/ReefFunctionalTest.cs
index 4923940..6cd848f 100644
--- a/lang/cs/Org.Apache.REEF.Tests/Functional/ReefFunctionalTest.cs
+++ b/lang/cs/Org.Apache.REEF.Tests/Functional/ReefFunctionalTest.cs
@@ -155,12 +155,12 @@ namespace Org.Apache.REEF.Tests.Functional
             CleanUp();
         }
 
-        protected void ValidateSuccessForLocalRuntime(int 
numberOfContextsToClose, int numberOfTasksToFail = 0, int 
numberOfEvaluatorsToFail = 0, string testFolder = DefaultRuntimeFolder)
+        protected void ValidateSuccessForLocalRuntime(int 
numberOfContextsToClose, int numberOfTasksToFail = 0, int 
numberOfEvaluatorsToFail = 0, string testFolder = DefaultRuntimeFolder, int 
retryCount = 60)
         {
             const string successIndication = "EXIT: 
ActiveContextClr2Java::Close";
             const string failedTaskIndication = 
"Java_org_apache_reef_javabridge_NativeInterop_clrSystemFailedTaskHandlerOnNext";
             const string failedEvaluatorIndication = 
"Java_org_apache_reef_javabridge_NativeInterop_clrSystemFailedEvaluatorHandlerOnNext";
-            string[] lines = ReadLogFile(DriverStdout, "driver", testFolder);
+            string[] lines = ReadLogFile(DriverStdout, "driver", testFolder, 
retryCount);
 
             Logger.Log(Level.Verbose, "Lines read from log file : " + 
lines.Count());
             string[] successIndicators = lines.Where(s => 
s.Contains(successIndication)).ToArray();

http://git-wip-us.apache.org/repos/asf/reef/blob/d01c0c46/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj 
b/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj
index 64e6c89..8856fca 100644
--- a/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj
+++ b/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj
@@ -118,6 +118,8 @@ under the License.
     <Compile Include="Functional\FaultTolerant\TestResubmitEvaluator.cs" />
     <Compile Include="Functional\FaultTolerant\TestResubmitTask.cs" />
     <Compile Include="Functional\IMRU\IMRUBroadcastReduceTest.cs" />
+    <Compile 
Include="Functional\IMRU\IMRUBroadcastReduceWithFilePartitionDataSetTest.cs" />
+    <Compile Include="Functional\IMRU\IMRUBroadcastReduceWithLocalFile.cs" />
     <Compile 
Include="Functional\IMRU\IMRUBrodcastReduceWithoutIMRUClientTest.cs" />
     <Compile Include="Functional\IMRU\IMRUBrodcastReduceTestBase.cs" />
     <Compile Include="Functional\IMRU\IMRUCloseTaskTest.cs" />

Reply via email to