Repository: reef Updated Branches: refs/heads/master d0b675238 -> 83e5e8667
[REEF-1183] Enable KMeans .NET tests This addressed the issue by * removing dependency on an obsolete file * adding code for on-the-fly input file generation * fixing KMeans tests to generate an input file before running and take it as input * fix bug by removing default value for DataPartitionCache.PartitionIndex * fix bug of requesting only one evaluator * fix bug of miscalculating partial means in algorithm * fix bug of using CodecConfiguration instead of CodecToStreamingCodecConfiguration JIRA: [REEF-1183](https://issues.apache.org/jira/browse/REEF-1183) Pull Request: This closes #1053 Project: http://git-wip-us.apache.org/repos/asf/reef/repo Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/83e5e866 Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/83e5e866 Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/83e5e866 Branch: refs/heads/master Commit: 83e5e8667320b133ca75e1763627f18af28307b7 Parents: d0b6752 Author: Jason (Joo Seong) Jeong <[email protected]> Authored: Wed Jun 22 18:14:52 2016 -0700 Committer: Markus Weimer <[email protected]> Committed: Fri Jul 15 14:11:43 2016 -0700 ---------------------------------------------------------------------- .../KMeans/DataPartitionCache.cs | 2 +- .../KMeans/KMeansDriverHandlers.cs | 12 ++-- .../MachineLearning/KMeans/KMeansMasterTask.cs | 3 +- .../MachineLearning/KMeans/PartialMean.cs | 11 ++- .../Functional/ML/KMeans/TestKMeans.cs | 72 ++++++++++++++------ 5 files changed, 70 insertions(+), 30 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/reef/blob/83e5e866/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/DataPartitionCache.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/DataPartitionCache.cs b/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/DataPartitionCache.cs index 4c60a0f..460cd0b 100644 --- 
a/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/DataPartitionCache.cs +++ b/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/DataPartitionCache.cs @@ -94,7 +94,7 @@ namespace Org.Apache.REEF.Examples.MachineLearning.KMeans return ReadDataFile(file); } - [NamedParameter("Data partition index", "partition", "")] + [NamedParameter("Data partition index", "partition")] public class PartitionIndex : Name<int> { } http://git-wip-us.apache.org/repos/asf/reef/blob/83e5e866/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/KMeansDriverHandlers.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/KMeansDriverHandlers.cs b/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/KMeansDriverHandlers.cs index e0e75f8..a266a62 100644 --- a/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/KMeansDriverHandlers.cs +++ b/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/KMeansDriverHandlers.cs @@ -73,9 +73,9 @@ namespace Org.Apache.REEF.Examples.MachineLearning.KMeans CommandLineArguments arguments) { _executionDirectory = Path.Combine(Directory.GetCurrentDirectory(), Constants.KMeansExecutionBaseDirectory, Guid.NewGuid().ToString("N").Substring(0, 4)); - string dataFile = arguments.Arguments.Single(a => a.StartsWith("DataFile", StringComparison.Ordinal)).Split(':')[1]; + string dataFile = arguments.Arguments.First(); DataVector.ShuffleDataAndGetInitialCentriods( - Path.Combine(Directory.GetCurrentDirectory(), "reef", "global", dataFile), + dataFile, numPartitions, _clustersNumber, _executionDirectory); @@ -86,7 +86,7 @@ namespace Org.Apache.REEF.Examples.MachineLearning.KMeans _evaluatorRequestor = evaluatorRequestor; _centroidCodecConf = CodecToStreamingCodecConfiguration<Centroids>.Conf - .Set(CodecConfiguration<Centroids>.Codec, GenericType<CentroidsCodec>.Class) + .Set(CodecToStreamingCodecConfiguration<Centroids>.Codec, 
GenericType<CentroidsCodec>.Class) .Build(); IConfiguration dataConverterConfig1 = PipelineDataConverterConfiguration<Centroids>.Conf @@ -94,7 +94,7 @@ namespace Org.Apache.REEF.Examples.MachineLearning.KMeans .Build(); _controlMessageCodecConf = CodecToStreamingCodecConfiguration<ControlMessage>.Conf - .Set(CodecConfiguration<ControlMessage>.Codec, GenericType<ControlMessageCodec>.Class) + .Set(CodecToStreamingCodecConfiguration<ControlMessage>.Codec, GenericType<ControlMessageCodec>.Class) .Build(); IConfiguration dataConverterConfig2 = PipelineDataConverterConfiguration<ControlMessage>.Conf @@ -102,7 +102,7 @@ namespace Org.Apache.REEF.Examples.MachineLearning.KMeans .Build(); _processedResultsCodecConf = CodecToStreamingCodecConfiguration<ProcessedResults>.Conf - .Set(CodecConfiguration<ProcessedResults>.Codec, GenericType<ProcessedResultsCodec>.Class) + .Set(CodecToStreamingCodecConfiguration<ProcessedResults>.Codec, GenericType<ProcessedResultsCodec>.Class) .Build(); IConfiguration reduceFunctionConfig = ReduceFunctionConfiguration<ProcessedResults>.Conf @@ -187,7 +187,7 @@ namespace Org.Apache.REEF.Examples.MachineLearning.KMeans public void OnNext(IDriverStarted value) { - var request = _evaluatorRequestor.NewBuilder().SetCores(1).SetMegabytes(2048).Build(); + var request = _evaluatorRequestor.NewBuilder().SetNumber(_totalEvaluators).SetCores(1).SetMegabytes(2048).Build(); _evaluatorRequestor.Submit(request); } http://git-wip-us.apache.org/repos/asf/reef/blob/83e5e866/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/KMeansMasterTask.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/KMeansMasterTask.cs b/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/KMeansMasterTask.cs index c1b3e90..140897b 100644 --- a/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/KMeansMasterTask.cs +++ 
b/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/KMeansMasterTask.cs @@ -140,7 +140,8 @@ namespace Org.Apache.REEF.Examples.MachineLearning.KMeans for (int i = 0; i < clustersNum; i++) { List<PartialMean> means = totalList.Where(m => m.Mean.Label == i).ToList(); - aggregatedMeans.Add(new PartialMean(PartialMean.AggreatedMean(means), means.Count)); + PartialMean aggregatedPartialMean = PartialMean.AggregatedPartialMean(means); + aggregatedMeans.Add(new PartialMean(aggregatedPartialMean.Mean, aggregatedPartialMean.Size)); } ProcessedResults returnMeans = new ProcessedResults(aggregatedMeans, aggregatedLoss); http://git-wip-us.apache.org/repos/asf/reef/blob/83e5e866/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/PartialMean.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/PartialMean.cs b/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/PartialMean.cs index afeda32..f923a61 100644 --- a/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/PartialMean.cs +++ b/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/PartialMean.cs @@ -53,7 +53,7 @@ namespace Org.Apache.REEF.Examples.MachineLearning.KMeans return new PartialMean(DataVector.FromString(parts[0]), int.Parse(parts[1], CultureInfo.InvariantCulture)); } - public static DataVector AggreatedMean(List<PartialMean> means) + public static PartialMean AggregatedPartialMean(List<PartialMean> means) { if (means == null || means.Count == 0) { @@ -64,7 +64,12 @@ namespace Org.Apache.REEF.Examples.MachineLearning.KMeans { mean = mean.CombinePartialMean(means[i]); } - return mean.Mean; + return mean; + } + + public static DataVector AggregatedMean(List<PartialMean> means) + { + return AggregatedPartialMean(means).Mean; } public static List<DataVector> AggregateTrueMeansToFileSystem(int partitionsNum, int clustersNum, string executionDirectory) @@ -93,7 +98,7 @@ namespace 
Org.Apache.REEF.Examples.MachineLearning.KMeans for (int i = 0; i < clustersNum; i++) { List<PartialMean> means = partialMeans.Where(m => m.Mean.Label == i).ToList(); - newCentroids.Add(PartialMean.AggreatedMean(means)); + newCentroids.Add(PartialMean.AggregatedMean(means)); } return newCentroids; } http://git-wip-us.apache.org/repos/asf/reef/blob/83e5e866/lang/cs/Org.Apache.REEF.Tests/Functional/ML/KMeans/TestKMeans.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/ML/KMeans/TestKMeans.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/ML/KMeans/TestKMeans.cs index 0e42ef6..0afda40 100644 --- a/lang/cs/Org.Apache.REEF.Tests/Functional/ML/KMeans/TestKMeans.cs +++ b/lang/cs/Org.Apache.REEF.Tests/Functional/ML/KMeans/TestKMeans.cs @@ -40,24 +40,15 @@ namespace Org.Apache.REEF.Tests.Functional.ML.KMeans { private const int K = 3; private const int Partitions = 2; - private const string SmallMouseDataFile = @"mouseData_small.csv"; - private const string MouseDataFile = @"mouseData.csv"; - - private readonly bool _useSmallDataSet = false; - private string _dataFile = MouseDataFile; + private const string DataFileNamePrefix = "KMeansInput-"; public TestKMeans() { - if (_useSmallDataSet) - { - _dataFile = SmallMouseDataFile; - } - CleanUp(); Init(); } - [Fact(Skip = "TODO[JIRA REEF-1183] Requires data files not present in enlistment")] + [Fact] [Trait("Priority", "1")] [Trait("Category", "FunctionalGated")] [Trait("Description", "Test KMeans clustering with things directly run without reef")] @@ -65,8 +56,10 @@ namespace Org.Apache.REEF.Tests.Functional.ML.KMeans public void TestKMeansOnDirectRunViaFileSystem() { int iteration = 0; - string executionDirectory = Path.Combine(Directory.GetCurrentDirectory(), Constants.KMeansExecutionBaseDirectory, Guid.NewGuid().ToString("N").Substring(0, 4)); - List<DataVector> centroids = DataVector.ShuffleDataAndGetInitialCentriods(_dataFile, Partitions, K, 
executionDirectory); + string executionDirectory = Path.Combine(Directory.GetCurrentDirectory(), + string.Join("-", Constants.KMeansExecutionBaseDirectory, Guid.NewGuid().ToString("N").Substring(0, 4))); + string dataFilePath = GenerateDataFileAndGetPath(); + List<DataVector> centroids = DataVector.ShuffleDataAndGetInitialCentriods(dataFilePath, Partitions, K, executionDirectory); // initialize all tasks List<LegacyKMeansTask> tasks = new List<LegacyKMeansTask>(); @@ -105,9 +98,19 @@ namespace Org.Apache.REEF.Tests.Functional.ML.KMeans } iteration++; } + + // cleanup workspace + try + { + Directory.Delete(executionDirectory, true); + } + catch (Exception) + { + // do not fail if clean up is unsuccessful + } } - [Fact(Skip = "TODO[JIRA REEF-1183] Requires data files not present in enlistment")] + [Fact] [Trait("Priority", "1")] [Trait("Category", "FunctionalGated")] [Trait("Description", "Test KMeans clustering on reef local runtime with group communications")] @@ -116,12 +119,14 @@ namespace Org.Apache.REEF.Tests.Functional.ML.KMeans { IsOnLocalRuntime = true; string testFolder = DefaultRuntimeFolder + TestId; - TestRun(DriverConfiguration(), typeof(KMeansDriverHandlers), Partitions + 1, "KMeansDriverHandlers", "local", testFolder); + string dataFilePath = GenerateDataFileAndGetPath(); + + TestRun(DriverConfiguration(dataFilePath), typeof(KMeansDriverHandlers), Partitions + 1, "KMeansDriverHandlers", "local", testFolder); ValidateSuccessForLocalRuntime(Partitions + 1, testFolder: testFolder); CleanUp(testFolder); } - [Fact(Skip = "TODO[JIRA REEF-1183] Requires data files not present in enlistment")] + [Fact(Skip = "Requires Yarn Single Node")] [Trait("Priority", "1")] [Trait("Category", "FunctionalGated")] [Trait("Description", "Test KMeans clustering on reef YARN runtime - one box")] @@ -129,11 +134,12 @@ namespace Org.Apache.REEF.Tests.Functional.ML.KMeans public void TestKMeansOnYarnOneBoxWithGroupCommunications() { string testFolder = DefaultRuntimeFolder + 
TestId + "Yarn"; - TestRun(DriverConfiguration(), typeof(KMeansDriverHandlers), Partitions + 1, "KMeansDriverHandlers", "yarn", testFolder); + string dataFilePath = GenerateDataFileAndGetPath(); + TestRun(DriverConfiguration(dataFilePath), typeof(KMeansDriverHandlers), Partitions + 1, "KMeansDriverHandlers", "yarn", testFolder); Assert.NotNull("BreakPointChecker"); } - private IConfiguration DriverConfiguration() + private IConfiguration DriverConfiguration(string dataFilePath) { int fanOut = 2; int totalEvaluators = Partitions + 1; @@ -144,7 +150,7 @@ namespace Org.Apache.REEF.Tests.Functional.ML.KMeans .Set(Org.Apache.REEF.Driver.DriverConfiguration.OnDriverStarted, GenericType<KMeansDriverHandlers>.Class) .Set(Org.Apache.REEF.Driver.DriverConfiguration.OnEvaluatorAllocated, GenericType<KMeansDriverHandlers>.Class) .Set(Org.Apache.REEF.Driver.DriverConfiguration.OnContextActive, GenericType<KMeansDriverHandlers>.Class) - .Set(Org.Apache.REEF.Driver.DriverConfiguration.CommandLineArguments, "DataFile:" + _dataFile) + .Set(Org.Apache.REEF.Driver.DriverConfiguration.CommandLineArguments, dataFilePath) .Set(Org.Apache.REEF.Driver.DriverConfiguration.CustomTraceLevel, Level.Info.ToString()) .Build()) .BindIntNamedParam<NumPartitions>(Partitions.ToString()) @@ -160,5 +166,33 @@ namespace Org.Apache.REEF.Tests.Functional.ML.KMeans return Configurations.Merge(driverConfig, groupCommunicationDriverConfig); } + + private static string GenerateDataFileAndGetPath() + { + string dataFilePath = Path.Combine(Path.GetTempPath(), DataFileNamePrefix + Guid.NewGuid().ToString("N").Substring(0, 4)); + + DataVector[] centroids = + { + new DataVector(new List<float> { +2f, 0f }, 0), + new DataVector(new List<float> { -2f, 0f }, 0), + new DataVector(new List<float> { +0f, 2f }, 0) + }; + + using (StreamWriter writer = new StreamWriter(File.OpenWrite(dataFilePath))) + { + Random rnd = new Random(); + for (int i = 0; i < 10000; i++) + { + int label = rnd.Next(centroids.Length); + float x 
= Convert.ToSingle(centroids[label].Data[0] + rnd.NextDouble()); + float y = Convert.ToSingle(centroids[label].Data[1] + rnd.NextDouble()); + + writer.WriteLine("{0},{1};{2}", x, y, label); + } + writer.Close(); + } + + return dataFilePath; + } } }
