Repository: reef Updated Branches: refs/heads/master 58090fec0 -> d01c0c46f
[REEF-1405] Cache the data in IMRU Context layer * Download data file in DataLoadingContext when context starts * Add test cases to inject FilePartitionDataSet to IMRUDriver. * Verify data file has been downloaded in mapper function in test JIRA: [REEF-1405](https://issues.apache.org/jira/browse/REEF-1405) Pull request: This closes #1084 Project: http://git-wip-us.apache.org/repos/asf/reef/repo Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/d01c0c46 Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/d01c0c46 Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/d01c0c46 Branch: refs/heads/master Commit: d01c0c46f335bf85446a99243233907ae7d9fad8 Parents: 58090fe Author: Julia Wang <[email protected]> Authored: Wed Aug 3 11:38:13 2016 -0700 Committer: Mariia Mykhailova <[email protected]> Committed: Tue Aug 9 13:15:58 2016 -0700 ---------------------------------------------------------------------- .../OnREEF/Driver/DataLoadingContext.cs | 8 +- .../FileSystem/FileSystemInputPartition.cs | 44 ++- .../Properties/AssemblyInfo.cs | 5 + ...oadcastReduceWithFilePartitionDataSetTest.cs | 388 +++++++++++++++++++ .../IMRU/IMRUBroadcastReduceWithLocalFile.cs | 70 ++++ .../IMRU/IMRUBrodcastReduceTestBase.cs | 3 +- .../Functional/ReefFunctionalTest.cs | 4 +- .../Org.Apache.REEF.Tests.csproj | 2 + 8 files changed, 498 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/reef/blob/d01c0c46/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/DataLoadingContext.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/DataLoadingContext.cs b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/DataLoadingContext.cs index 5ff36de..2ddf63a 100644 --- a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/DataLoadingContext.cs +++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/DataLoadingContext.cs @@ -16,6 +16,7 @@ // under the License. using System; +using System.Collections.Generic; using Org.Apache.REEF.Common.Events; using Org.Apache.REEF.IO.PartitionedData; using Org.Apache.REEF.Tang.Annotations; @@ -43,12 +44,13 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver } /// <summary> - /// Specifies what to do when context starts. + /// Download data files when context starts. /// </summary> /// <param name="value">context start token</param> - /// TODO[REEF-1339] - AddCache() function of IInputPartition will be called here. public void OnNext(IContextStart value) { + _partition.Cache(); + Logger.Log(Level.Info, "Returned from IInputPartition.Cache()."); } /// <summary> @@ -58,7 +60,7 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver /// <param name="error">Exception</param> public void OnError(Exception error) { - Exceptions.Throw(error, "Error occured in Data Loading context start", Logger); + Exceptions.Throw(error, "Error occurred in Data Loading context start", Logger); } /// <summary> http://git-wip-us.apache.org/repos/asf/reef/blob/d01c0c46/lang/cs/Org.Apache.REEF.IO/PartitionedData/FileSystem/FileSystemInputPartition.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.IO/PartitionedData/FileSystem/FileSystemInputPartition.cs b/lang/cs/Org.Apache.REEF.IO/PartitionedData/FileSystem/FileSystemInputPartition.cs index 0d17467..dc2809b 100644 --- a/lang/cs/Org.Apache.REEF.IO/PartitionedData/FileSystem/FileSystemInputPartition.cs +++ b/lang/cs/Org.Apache.REEF.IO/PartitionedData/FileSystem/FileSystemInputPartition.cs @@ -43,6 +43,7 @@ namespace Org.Apache.REEF.IO.PartitionedData.FileSystem private readonly ITempFileCreator _tempFileCreator; private readonly ISet<string> _remoteFilePaths; private readonly bool _copyToLocal; + private Optional<T> _data = Optional<T>.Empty(); private Optional<ISet<string>> _localFiles; @@ -69,15 +70,29 @@ namespace Org.Apache.REEF.IO.PartitionedData.FileSystem } /// <summary> - /// Caches from the remote File System to a local disk. + /// This method copies remote files to local if CopyToLocal is enabled, and then deserializes the files. + /// Otherwise, this method assumes that the files are remote, and the injected IFileDeSerializer + /// can handle the remote file system access. + /// It caches the data reference returned from IFileDeSerializer.Deserialize() method. /// </summary> public void Cache() { lock (_lock) { - if (!_localFiles.IsPresent()) + if (_copyToLocal) + { + if (!_localFiles.IsPresent()) + { + _localFiles = Optional<ISet<string>>.Of(Download()); + } + + // For now, assume IFileDeSerializer is local. + _data = Optional<T>.Of(_fileSerializer.Deserialize(_localFiles.Value)); + } + else { - _localFiles = Optional<ISet<string>>.Of(Download()); + // For now, assume IFileDeSerializer is remote. + _data = Optional<T>.Of(_fileSerializer.Deserialize(_remoteFilePaths)); } } } @@ -91,7 +106,7 @@ namespace Org.Apache.REEF.IO.PartitionedData.FileSystem { var set = new HashSet<string>(); var localFileFolder = _tempFileCreator.CreateTempDirectory("-partition-"); - Logger.Log(Level.Info, string.Format(CultureInfo.CurrentCulture, "Local file temp folder: {0}", localFileFolder)); + Logger.Log(Level.Info, "Local file temp folder: {0}", localFileFolder); foreach (var sourceFilePath in _remoteFilePaths) { @@ -111,35 +126,24 @@ namespace Org.Apache.REEF.IO.PartitionedData.FileSystem _fileSystem.CopyToLocal(sourceUri, localFilePath); } + Logger.Log(Level.Info, "File downloading is completed"); return set; } } /// <summary> - /// This method copies remote files to local if CopyToLocal is enabled, and then deserializes the files. - /// Otherwise, this method assumes that the files are remote, and that the injected IFileDeSerializer - /// can handle the remote file system access. - /// It returns the IEnumerble of T, the details is defined in the Deserialize() method - /// provided by the Serializer + /// If the data is not present, call cache() to get it. Then returns the data. /// </summary> /// <returns></returns> public T GetPartitionHandle() { lock (_lock) { - if (_copyToLocal) + if (!_data.IsPresent()) { - if (!_localFiles.IsPresent()) - { - Cache(); - } - - // For now, assume IFileDeSerializer is local. - return _fileSerializer.Deserialize(_localFiles.Value); + Cache(); } - - // For now, assume IFileDeSerializer is remote. - return _fileSerializer.Deserialize(_remoteFilePaths); + return _data.Value; } } http://git-wip-us.apache.org/repos/asf/reef/blob/d01c0c46/lang/cs/Org.Apache.REEF.IO/Properties/AssemblyInfo.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.IO/Properties/AssemblyInfo.cs b/lang/cs/Org.Apache.REEF.IO/Properties/AssemblyInfo.cs index 2c33732..44a2316 100644 --- a/lang/cs/Org.Apache.REEF.IO/Properties/AssemblyInfo.cs +++ b/lang/cs/Org.Apache.REEF.IO/Properties/AssemblyInfo.cs @@ -48,6 +48,11 @@ using System.Runtime.InteropServices; "9c03469d0f93a9f5d45aa2c883f50cd158759e93673f759ec4657fd84cc79d2db38ef1a2d914cc" + "b7c717846a897e11dd22eb260a7ce2da2dccf0263ea63e2b3f7dac24f28882aa568ef544341d17" + "618392a1095f4049ad079d4f4f0b429bb535699155fd6a7652ec7d6c1f1ba2b560f11ef3a86b5945d288cf")] +[assembly: InternalsVisibleTo("Org.Apache.REEF.Tests, publickey=" + + "00240000048000009400000006020000002400005253413100040000010001005df3e621d886a9" + + "9c03469d0f93a9f5d45aa2c883f50cd158759e93673f759ec4657fd84cc79d2db38ef1a2d914cc" + + "b7c717846a897e11dd22eb260a7ce2da2dccf0263ea63e2b3f7dac24f28882aa568ef544341d17" + + "618392a1095f4049ad079d4f4f0b429bb535699155fd6a7652ec7d6c1f1ba2b560f11ef3a86b5945d288cf")] // Allow NSubstitute to create proxy implementations [assembly: InternalsVisibleTo("DynamicProxyGenAssembly2, PublicKey=002400000480000" + http://git-wip-us.apache.org/repos/asf/reef/blob/d01c0c46/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUBroadcastReduceWithFilePartitionDataSetTest.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUBroadcastReduceWithFilePartitionDataSetTest.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUBroadcastReduceWithFilePartitionDataSetTest.cs new file mode 100644 index 0000000..e58e236 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUBroadcastReduceWithFilePartitionDataSetTest.cs @@ -0,0 +1,388 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +using System; +using System.Collections.Generic; +using System.IO; +using System.Linq; +using System.Text.RegularExpressions; +using Org.Apache.REEF.Common.Tasks; +using Org.Apache.REEF.IMRU.API; +using Org.Apache.REEF.IMRU.OnREEF.Driver; +using Org.Apache.REEF.IO.PartitionedData; +using Org.Apache.REEF.IO.PartitionedData.FileSystem; +using Org.Apache.REEF.IO.PartitionedData.FileSystem.Parameters; +using Org.Apache.REEF.IO.TempFileCreation; +using Org.Apache.REEF.Network.Examples.GroupCommunication.BroadcastReduceDriverAndTasks; +using Org.Apache.REEF.Tang.Annotations; +using Org.Apache.REEF.Tang.Formats; +using Org.Apache.REEF.Tang.Implementations.Tang; +using Org.Apache.REEF.Tang.Interface; +using Org.Apache.REEF.Tang.Util; +using Org.Apache.REEF.Utilities.Diagnostics; +using Org.Apache.REEF.Utilities.Logging; +using Xunit; +using TraceLevel = System.Diagnostics.TraceLevel; + +namespace Org.Apache.REEF.Tests.Functional.IMRU +{ + /// <summary> + /// This class tests DataLoadingContext for IMRU with FilePartitionDataSet + /// </summary> + [Collection("FunctionalTests")] + public class IMRUBroadcastReduceWithFilePartitionDataSetTest : IMRUBrodcastReduceTestBase + { + /// <summary> + /// Defines the number of data in input data bytes in the test + /// </summary> + protected const int DataCount = 3; + + /// <summary> + /// This test tests DataLoadingContext with FilePartitionDataSet + /// The IPartitionedInputDataSet configured in this test is passed to IMRUDriver, and IInputPartition associated with it is injected in + /// DataLoadingContext for the Evaluator. Cache method in IInputPartition is called when the context starts. + /// Most of the validation of the test is done inside test Mapper function. + /// It will verify if the temp file downloaded exists before calling IInputPartition.GetPartitionHandle() + /// </summary> + [Fact] + public void TestWithFilePartitionDataSetOnLocalRuntime() + { + int chunkSize = 2; + int dims = 10; + int iterations = 10; + int mapperMemory = 5120; + int updateTaskMemory = 5120; + int numTasks = 4; + string testFolder = DefaultRuntimeFolder + TestId; + TestBroadCastAndReduce(false, + numTasks, + chunkSize, + dims, + iterations, + mapperMemory, + updateTaskMemory, + testFolder); + ValidateSuccessForLocalRuntime(numTasks, 0, 0, testFolder, 100); + CleanUp(testFolder); + } + + /// <summary> + /// This method overrides base class method to pass IEnumerable<Row> as TPartitionType + /// </summary> + protected new void TestBroadCastAndReduce(bool runOnYarn, + int numTasks, + int chunkSize, + int dims, + int iterations, + int mapperMemory, + int updateTaskMemory, + string testFolder = DefaultRuntimeFolder) + { + string runPlatform = runOnYarn ? "yarn" : "local"; + TestRun(DriverConfiguration<int[], int[], int[], IEnumerable<Row>>( + CreateIMRUJobDefinitionBuilder(numTasks - 1, chunkSize, iterations, dims, mapperMemory, updateTaskMemory), + DriverEventHandlerConfigurations<int[], int[], int[], IEnumerable<Row>>()), + typeof(BroadcastReduceDriver), + numTasks, + "BroadcastReduceDriver", + runPlatform, + testFolder); + } + + /// <summary> + /// This method defines event handlers for driver. As default, it uses all the handlers defined in IMRUDriver. + /// </summary> + protected override IConfiguration DriverEventHandlerConfigurations<TMapInput, TMapOutput, TResult, TPartitionType>() + { + return REEF.Driver.DriverConfiguration.ConfigurationModule + .Set(REEF.Driver.DriverConfiguration.OnEvaluatorAllocated, + GenericType<IMRUDriver<TMapInput, TMapOutput, TResult, TPartitionType>>.Class) + .Set(REEF.Driver.DriverConfiguration.OnDriverStarted, + GenericType<IMRUDriver<TMapInput, TMapOutput, TResult, TPartitionType>>.Class) + .Set(REEF.Driver.DriverConfiguration.OnContextActive, + GenericType<IMRUDriver<TMapInput, TMapOutput, TResult, TPartitionType>>.Class) + .Set(REEF.Driver.DriverConfiguration.OnTaskCompleted, + GenericType<IMRUDriver<TMapInput, TMapOutput, TResult, TPartitionType>>.Class) + .Set(REEF.Driver.DriverConfiguration.OnEvaluatorFailed, + GenericType<IMRUDriver<TMapInput, TMapOutput, TResult, TPartitionType>>.Class) + .Set(REEF.Driver.DriverConfiguration.OnContextFailed, + GenericType<IMRUDriver<TMapInput, TMapOutput, TResult, TPartitionType>>.Class) + .Set(REEF.Driver.DriverConfiguration.OnTaskFailed, + GenericType<IMRUDriver<TMapInput, TMapOutput, TResult, TPartitionType>>.Class) + .Set(REEF.Driver.DriverConfiguration.CustomTraceLevel, TraceLevel.Info.ToString()) + .Build(); + } + + /// <summary> + /// This test case tests PartitionedDataset Configuration built in this class for IPartitionedInputDataSet and IInputPartition + /// </summary> + [Fact] + public void TestBuildPartitionedDatasetConfiguration() + { + PartitionDatasetConfig(); + } + + protected void PartitionDatasetConfig() + { + int count = 0; + int numberOfTasks = 4; + + var dataSet = TangFactory.GetTang().NewInjector(BuildPartitionedDatasetConfiguration(numberOfTasks)) + .GetInstance<IPartitionedInputDataSet>(); + + foreach (var partitionDescriptor in dataSet) + { + var partition = + TangFactory.GetTang() + .NewInjector(partitionDescriptor.GetPartitionConfiguration()) + .GetInstance<IInputPartition<IEnumerable<Row>>>(); + + partition.Cache(); + + using (partition as IDisposable) + { + IEnumerable<Row> e = partition.GetPartitionHandle(); + + foreach (var row in e) + { + Logger.Log(Level.Info, "Data read {0}: ", row.GetValue()); + count++; + } + } + } + + Assert.Equal(count, numberOfTasks * DataCount); + } + + /// <summary> + /// Mapper function configuration. + /// </summary> + /// <returns></returns> + protected override IConfiguration BuildMapperFunctionConfig() + { + return IMRUMapConfiguration<int[], int[]>.ConfigurationModule + .Set(IMRUMapConfiguration<int[], int[]>.MapFunction, + GenericType<TestSenderMapFunction<IEnumerable<Row>>>.Class) + .Build(); + } + + /// <summary> + /// Test Mapper function. It will access IInputPartition. + /// It verifies that the data file has been downloaded to temp folder + /// It then iterates the data and verifies the data count. + /// </summary> + /// <typeparam name="T"></typeparam> + internal sealed class TestSenderMapFunction<T> : IMapFunction<int[], int[]> + { + private int _iterations; + + [Inject] + private TestSenderMapFunction( + [Parameter(typeof(TaskConfigurationOptions.Identifier))] string taskId, + IInputPartition<T> partition, + [Parameter(typeof(CopyToLocal))] bool copyToLocal) + { + var tempFileDir = TangFactory.GetTang().NewInjector().GetNamedInstance<TempFileFolder, string>(); + var tmpFileFodler = Directory.GetCurrentDirectory() + tempFileDir.Substring(1, tempFileDir.Length - 1); + Assert.True(Directory.Exists(tmpFileFodler)); + + var directories = Directory.EnumerateDirectories(tmpFileFodler); + + if (copyToLocal) + { + Assert.Equal(1, directories.Count()); + + var directory = directories.FirstOrDefault(); + Assert.True(directory.Contains("-partition-")); + + var files = Directory.EnumerateFiles(directory); + Assert.Equal(1, files.Count()); + var file = files.FirstOrDefault(); + var a = file.Split('\\'); + var fileName = a[a.Length - 1]; + Assert.Equal(8, fileName.Length); + + var matchCounter = Regex.Matches(fileName, @"[a-zA-Z0-9]").Count; + Assert.Equal(8, matchCounter); + } + else + { + Assert.Equal(0, directories.Count()); + } + + int count = 0; + var e = (IEnumerable<Row>)partition.GetPartitionHandle(); + + foreach (var row in e) + { + Logger.Log(Level.Info, "Data read {0}: ", row.GetValue()); + count++; + } + + Logger.Log(Level.Info, "TestSenderMapFunction: TaskId: {0}, count {1}", taskId.Length, count); + Assert.Equal(count, DataCount); + } + + /// <summary> + /// Simple test Map function + /// </summary> + /// <param name="mapInput">integer array</param> + /// <returns>The same integer array</returns> + int[] IMapFunction<int[], int[]>.Map(int[] mapInput) + { + _iterations++; + + Logger.Log(Level.Info, "Received value {0} in iteration {1}.", mapInput[0], _iterations); + + if (mapInput[0] != _iterations) + { + Exceptions.Throw(new Exception("Expected value in mappers different from actual value"), Logger); + } + + return mapInput; + } + } + + /// <summary> + /// This method builds partition dataset configuration. It simulate copy to local from local files and test RowSerializer + /// </summary> + /// <param name="numberofMappers"></param> + /// <returns></returns> + protected override IConfiguration BuildPartitionedDatasetConfiguration(int numberofMappers) + { + var cm = CreateDatasetConfigurationModule(numberofMappers); + + return cm.Set(FileSystemInputPartitionConfiguration<IEnumerable<Row>>.CopyToLocal, "true") + .Set(FileSystemInputPartitionConfiguration<IEnumerable<Row>>.FileSerializerConfig, GetRowSerializerConfigString()) + .Build(); + } + + protected static ConfigurationModule CreateDatasetConfigurationModule(int numberofMappers) + { + const string tempFileName = "REEF.TestLocalFileSystem.tmp"; + string sourceFilePath = Path.Combine(Path.GetTempPath(), tempFileName); + + var cm = FileSystemInputPartitionConfiguration<IEnumerable<Row>>.ConfigurationModule; + + for (var i = 0; i < numberofMappers; i++) + { + MakeLocalTestFile(sourceFilePath + i, CreateTestData(DataCount)); + cm = cm.Set(FileSystemInputPartitionConfiguration<IEnumerable<Row>>.FilePathForPartitions, sourceFilePath + i); + } + return cm; + } + + /// <summary> + /// Create test data as bytes + /// </summary> + /// <param name="dataNumber"></param> + /// <returns></returns> + protected static byte[] CreateTestData(int dataNumber) + { + var bytes = new byte[dataNumber]; + for (int i = 0; i < dataNumber; i++) + { + bytes[i] = (byte)(i + 'a'); + } + return bytes; + } + + /// <summary> + /// Create a test data file as input file + /// </summary> + /// <param name="filePath"></param> + /// <param name="bytes"></param> + protected static void MakeLocalTestFile(string filePath, byte[] bytes) + { + if (File.Exists(filePath)) + { + File.Delete(filePath); + } + + using (var s = File.Create(filePath)) + { + foreach (var b in bytes) + { + s.WriteByte(b); + } + } + } + + /// <summary> + /// Test DeSerializer + /// </summary> + protected class RowSerializer : IFileDeSerializer<IEnumerable<Row>> + { + [Inject] + private RowSerializer() + { + } + + /// <summary> + /// read all the files in the set and return byte read one by one + /// </summary> + /// <param name="filePaths"></param> + /// <returns></returns> + public IEnumerable<Row> Deserialize(ISet<string> filePaths) + { + foreach (var f in filePaths) + { + using (FileStream stream = File.Open(f, FileMode.Open)) + { + BinaryReader reader = new BinaryReader(stream); + while (reader.PeekChar() != -1) + { + yield return new Row(reader.ReadByte()); + } + } + } + } + } + + /// <summary> + /// Build RowSerialzer configuration and serialize it as string so that to pass it to driver + /// </summary> + /// <returns></returns> + protected static string GetRowSerializerConfigString() + { + var serializerConf = TangFactory.GetTang().NewConfigurationBuilder() + .BindImplementation( + GenericType<IFileDeSerializer<IEnumerable<Row>>>.Class, + GenericType<RowSerializer>.Class) + .Build(); + return (new AvroConfigurationSerializer()).ToString(serializerConf); + } + + /// <summary> + /// Test Row class + /// </summary> + protected class Row + { + private readonly byte _b; + + public Row(byte b) + { + _b = b; + } + + public byte GetValue() + { + return _b; + } + } + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/d01c0c46/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUBroadcastReduceWithLocalFile.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUBroadcastReduceWithLocalFile.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUBroadcastReduceWithLocalFile.cs new file mode 100644 index 0000000..aa287b1 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUBroadcastReduceWithLocalFile.cs @@ -0,0 +1,70 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +using System.Collections.Generic; +using Org.Apache.REEF.IO.PartitionedData.FileSystem; +using Org.Apache.REEF.Tang.Interface; +using Xunit; + +namespace Org.Apache.REEF.Tests.Functional.IMRU +{ + [Collection("FunctionalTests")] + public class IMRUBroadcastReduceWithLocalFile : IMRUBroadcastReduceWithFilePartitionDataSetTest + { + /// <summary> + /// This test tests DataLoadingContext with FilePartitionDataSet + /// The IPartitionedInputDataSet configured in this test is passed to IMRUDriver, and IInputPartition associated with it is injected in + /// DataLoadingContext for the Evaluator. Cache method in IInputPartition is called when the context starts. + /// The local file is used in the test, so CopyToLocal will be set to false. The validation will be inside test Mapper function. + /// </summary> + [Fact] + public void TestWithFilePartitionDataSetForLocalFile() + { + int chunkSize = 2; + int dims = 10; + int iterations = 10; + int mapperMemory = 5120; + int updateTaskMemory = 5120; + int numTasks = 4; + string testFolder = DefaultRuntimeFolder + TestId; + TestBroadCastAndReduce(false, + numTasks, + chunkSize, + dims, + iterations, + mapperMemory, + updateTaskMemory, + testFolder); + ValidateSuccessForLocalRuntime(numTasks, 0, 0, testFolder, 100); + CleanUp(testFolder); + } + + /// <summary> + /// This method builds partition dataset configuration. It uses local file and test RowSerializer. + /// </summary> + /// <param name="numberofMappers"></param> + /// <returns></returns> + protected override IConfiguration BuildPartitionedDatasetConfiguration(int numberofMappers) + { + var cm = CreateDatasetConfigurationModule(numberofMappers); + + return cm.Set(FileSystemInputPartitionConfiguration<IEnumerable<Row>>.CopyToLocal, "false") + .Set(FileSystemInputPartitionConfiguration<IEnumerable<Row>>.FileSerializerConfig, GetRowSerializerConfigString()) + .Build(); + } + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/d01c0c46/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUBrodcastReduceTestBase.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUBrodcastReduceTestBase.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUBrodcastReduceTestBase.cs index 16d927b..2f4b109 100644 --- a/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUBrodcastReduceTestBase.cs +++ b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUBrodcastReduceTestBase.cs @@ -16,6 +16,7 @@ // under the License. using System; +using System.Collections.Generic; using System.Globalization; using System.IO; using System.Linq; @@ -101,7 +102,7 @@ namespace Org.Apache.REEF.Tests.Functional.IMRU /// <param name="jobDefinition"></param> /// <param name="driverHandlerConfig"></param> /// <returns></returns> - private IConfiguration DriverConfiguration<TMapInput, TMapOutput, TResult, TPartitionType>( + protected IConfiguration DriverConfiguration<TMapInput, TMapOutput, TResult, TPartitionType>( IMRUJobDefinition jobDefinition, IConfiguration driverHandlerConfig) { http://git-wip-us.apache.org/repos/asf/reef/blob/d01c0c46/lang/cs/Org.Apache.REEF.Tests/Functional/ReefFunctionalTest.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/ReefFunctionalTest.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/ReefFunctionalTest.cs index 4923940..6cd848f 100644 --- a/lang/cs/Org.Apache.REEF.Tests/Functional/ReefFunctionalTest.cs +++ b/lang/cs/Org.Apache.REEF.Tests/Functional/ReefFunctionalTest.cs @@ -155,12 +155,12 @@ namespace Org.Apache.REEF.Tests.Functional CleanUp(); } - protected void ValidateSuccessForLocalRuntime(int numberOfContextsToClose, int numberOfTasksToFail = 0, int numberOfEvaluatorsToFail = 0, string testFolder = DefaultRuntimeFolder) + protected void ValidateSuccessForLocalRuntime(int numberOfContextsToClose, int numberOfTasksToFail = 0, int numberOfEvaluatorsToFail = 0, string testFolder = DefaultRuntimeFolder, int retryCount = 60) { const string successIndication = "EXIT: ActiveContextClr2Java::Close"; const string failedTaskIndication = "Java_org_apache_reef_javabridge_NativeInterop_clrSystemFailedTaskHandlerOnNext"; const string failedEvaluatorIndication = "Java_org_apache_reef_javabridge_NativeInterop_clrSystemFailedEvaluatorHandlerOnNext"; - string[] lines = ReadLogFile(DriverStdout, "driver", testFolder); + string[] lines = ReadLogFile(DriverStdout, "driver", testFolder, retryCount); Logger.Log(Level.Verbose, "Lines read from log file : " + lines.Count()); string[] successIndicators = lines.Where(s => s.Contains(successIndication)).ToArray(); http://git-wip-us.apache.org/repos/asf/reef/blob/d01c0c46/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj b/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj index 64e6c89..8856fca 100644 --- a/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj +++ b/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj @@ -118,6 +118,8 @@ under the License. <Compile Include="Functional\FaultTolerant\TestResubmitEvaluator.cs" /> <Compile Include="Functional\FaultTolerant\TestResubmitTask.cs" /> <Compile Include="Functional\IMRU\IMRUBroadcastReduceTest.cs" /> + <Compile Include="Functional\IMRU\IMRUBroadcastReduceWithFilePartitionDataSetTest.cs" /> + <Compile Include="Functional\IMRU\IMRUBroadcastReduceWithLocalFile.cs" /> <Compile Include="Functional\IMRU\IMRUBrodcastReduceWithoutIMRUClientTest.cs" /> <Compile Include="Functional\IMRU\IMRUBrodcastReduceTestBase.cs" /> <Compile Include="Functional\IMRU\IMRUCloseTaskTest.cs" />
