Repository: reef
Updated Branches:
  refs/heads/master d0b675238 -> 83e5e8667


[REEF-1183] Enable KMeans .NET tests

This addressed the issue by
  * removing dependency on an obsolete file
  * adding code for on-the-fly input file generation
  * fixing KMeans tests to generate an input file before running and take it as 
input
  * fixing a bug by removing the default value for DataPartitionCache.PartitionIndex
  * fixing a bug of requesting only one evaluator
  * fixing a bug of miscalculating partial means in the algorithm
  * fixing a bug of using CodecConfiguration instead of 
CodecToStreamingCodecConfiguration

JIRA:
  [REEF-1183](https://issues.apache.org/jira/browse/REEF-1183)

Pull Request:
  This closes #1053


Project: http://git-wip-us.apache.org/repos/asf/reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/83e5e866
Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/83e5e866
Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/83e5e866

Branch: refs/heads/master
Commit: 83e5e8667320b133ca75e1763627f18af28307b7
Parents: d0b6752
Author: Jason (Joo Seong) Jeong <[email protected]>
Authored: Wed Jun 22 18:14:52 2016 -0700
Committer: Markus Weimer <[email protected]>
Committed: Fri Jul 15 14:11:43 2016 -0700

----------------------------------------------------------------------
 .../KMeans/DataPartitionCache.cs                |  2 +-
 .../KMeans/KMeansDriverHandlers.cs              | 12 ++--
 .../MachineLearning/KMeans/KMeansMasterTask.cs  |  3 +-
 .../MachineLearning/KMeans/PartialMean.cs       | 11 ++-
 .../Functional/ML/KMeans/TestKMeans.cs          | 72 ++++++++++++++------
 5 files changed, 70 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/reef/blob/83e5e866/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/DataPartitionCache.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/DataPartitionCache.cs 
b/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/DataPartitionCache.cs
index 4c60a0f..460cd0b 100644
--- 
a/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/DataPartitionCache.cs
+++ 
b/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/DataPartitionCache.cs
@@ -94,7 +94,7 @@ namespace Org.Apache.REEF.Examples.MachineLearning.KMeans
             return ReadDataFile(file);
         }
 
-        [NamedParameter("Data partition index", "partition", "")]
+        [NamedParameter("Data partition index", "partition")]
         public class PartitionIndex : Name<int>
         {
         }

http://git-wip-us.apache.org/repos/asf/reef/blob/83e5e866/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/KMeansDriverHandlers.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/KMeansDriverHandlers.cs
 
b/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/KMeansDriverHandlers.cs
index e0e75f8..a266a62 100644
--- 
a/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/KMeansDriverHandlers.cs
+++ 
b/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/KMeansDriverHandlers.cs
@@ -73,9 +73,9 @@ namespace Org.Apache.REEF.Examples.MachineLearning.KMeans
             CommandLineArguments arguments)
         {
             _executionDirectory = 
Path.Combine(Directory.GetCurrentDirectory(), 
Constants.KMeansExecutionBaseDirectory, 
Guid.NewGuid().ToString("N").Substring(0, 4));
-            string dataFile = arguments.Arguments.Single(a => 
a.StartsWith("DataFile", StringComparison.Ordinal)).Split(':')[1];
+            string dataFile = arguments.Arguments.First();
             DataVector.ShuffleDataAndGetInitialCentriods(
-                Path.Combine(Directory.GetCurrentDirectory(), "reef", 
"global", dataFile),
+                dataFile,
                 numPartitions,
                 _clustersNumber,
                 _executionDirectory);
@@ -86,7 +86,7 @@ namespace Org.Apache.REEF.Examples.MachineLearning.KMeans
             _evaluatorRequestor = evaluatorRequestor;
 
             _centroidCodecConf = 
CodecToStreamingCodecConfiguration<Centroids>.Conf
-                .Set(CodecConfiguration<Centroids>.Codec, 
GenericType<CentroidsCodec>.Class)
+                .Set(CodecToStreamingCodecConfiguration<Centroids>.Codec, 
GenericType<CentroidsCodec>.Class)
                 .Build();
 
             IConfiguration dataConverterConfig1 = 
PipelineDataConverterConfiguration<Centroids>.Conf
@@ -94,7 +94,7 @@ namespace Org.Apache.REEF.Examples.MachineLearning.KMeans
                 .Build();
 
             _controlMessageCodecConf = 
CodecToStreamingCodecConfiguration<ControlMessage>.Conf
-                .Set(CodecConfiguration<ControlMessage>.Codec, 
GenericType<ControlMessageCodec>.Class)
+                .Set(CodecToStreamingCodecConfiguration<ControlMessage>.Codec, 
GenericType<ControlMessageCodec>.Class)
                 .Build();
 
             IConfiguration dataConverterConfig2 = 
PipelineDataConverterConfiguration<ControlMessage>.Conf
@@ -102,7 +102,7 @@ namespace Org.Apache.REEF.Examples.MachineLearning.KMeans
                 .Build();
 
             _processedResultsCodecConf = 
CodecToStreamingCodecConfiguration<ProcessedResults>.Conf
-                .Set(CodecConfiguration<ProcessedResults>.Codec, 
GenericType<ProcessedResultsCodec>.Class)
+                
.Set(CodecToStreamingCodecConfiguration<ProcessedResults>.Codec, 
GenericType<ProcessedResultsCodec>.Class)
                 .Build();
 
             IConfiguration reduceFunctionConfig = 
ReduceFunctionConfiguration<ProcessedResults>.Conf
@@ -187,7 +187,7 @@ namespace Org.Apache.REEF.Examples.MachineLearning.KMeans
 
         public void OnNext(IDriverStarted value)
         {
-            var request = 
_evaluatorRequestor.NewBuilder().SetCores(1).SetMegabytes(2048).Build();
+            var request = 
_evaluatorRequestor.NewBuilder().SetNumber(_totalEvaluators).SetCores(1).SetMegabytes(2048).Build();
 
             _evaluatorRequestor.Submit(request);
         }

http://git-wip-us.apache.org/repos/asf/reef/blob/83e5e866/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/KMeansMasterTask.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/KMeansMasterTask.cs 
b/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/KMeansMasterTask.cs
index c1b3e90..140897b 100644
--- 
a/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/KMeansMasterTask.cs
+++ 
b/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/KMeansMasterTask.cs
@@ -140,7 +140,8 @@ namespace Org.Apache.REEF.Examples.MachineLearning.KMeans
                 for (int i = 0; i < clustersNum; i++)
                 {
                     List<PartialMean> means = totalList.Where(m => 
m.Mean.Label == i).ToList();
-                    aggregatedMeans.Add(new 
PartialMean(PartialMean.AggreatedMean(means), means.Count));
+                    PartialMean aggregatedPartialMean = 
PartialMean.AggregatedPartialMean(means);
+                    aggregatedMeans.Add(new 
PartialMean(aggregatedPartialMean.Mean, aggregatedPartialMean.Size));
                 }
 
                 ProcessedResults returnMeans = new 
ProcessedResults(aggregatedMeans, aggregatedLoss);

http://git-wip-us.apache.org/repos/asf/reef/blob/83e5e866/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/PartialMean.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/PartialMean.cs 
b/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/PartialMean.cs
index afeda32..f923a61 100644
--- a/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/PartialMean.cs
+++ b/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/PartialMean.cs
@@ -53,7 +53,7 @@ namespace Org.Apache.REEF.Examples.MachineLearning.KMeans
             return new PartialMean(DataVector.FromString(parts[0]), 
int.Parse(parts[1], CultureInfo.InvariantCulture));
         }
 
-        public static DataVector AggreatedMean(List<PartialMean> means)
+        public static PartialMean AggregatedPartialMean(List<PartialMean> 
means)
         {
             if (means == null || means.Count == 0)
             {
@@ -64,7 +64,12 @@ namespace Org.Apache.REEF.Examples.MachineLearning.KMeans
             {
                 mean = mean.CombinePartialMean(means[i]);
             }
-            return mean.Mean;
+            return mean;
+        }
+
+        public static DataVector AggregatedMean(List<PartialMean> means)
+        {
+            return AggregatedPartialMean(means).Mean;
         }
 
         public static List<DataVector> AggregateTrueMeansToFileSystem(int 
partitionsNum, int clustersNum, string executionDirectory)
@@ -93,7 +98,7 @@ namespace Org.Apache.REEF.Examples.MachineLearning.KMeans
             for (int i = 0; i < clustersNum; i++)
             {
                 List<PartialMean> means = partialMeans.Where(m => m.Mean.Label 
== i).ToList();
-                newCentroids.Add(PartialMean.AggreatedMean(means));
+                newCentroids.Add(PartialMean.AggregatedMean(means));
             }
             return newCentroids;
         }

http://git-wip-us.apache.org/repos/asf/reef/blob/83e5e866/lang/cs/Org.Apache.REEF.Tests/Functional/ML/KMeans/TestKMeans.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/ML/KMeans/TestKMeans.cs 
b/lang/cs/Org.Apache.REEF.Tests/Functional/ML/KMeans/TestKMeans.cs
index 0e42ef6..0afda40 100644
--- a/lang/cs/Org.Apache.REEF.Tests/Functional/ML/KMeans/TestKMeans.cs
+++ b/lang/cs/Org.Apache.REEF.Tests/Functional/ML/KMeans/TestKMeans.cs
@@ -40,24 +40,15 @@ namespace Org.Apache.REEF.Tests.Functional.ML.KMeans
     {
         private const int K = 3;
         private const int Partitions = 2;
-        private const string SmallMouseDataFile = @"mouseData_small.csv";
-        private const string MouseDataFile = @"mouseData.csv";
-
-        private readonly bool _useSmallDataSet = false;
-        private string _dataFile = MouseDataFile;
+        private const string DataFileNamePrefix = "KMeansInput-";
 
         public TestKMeans()
         {
-            if (_useSmallDataSet)
-            {
-                _dataFile = SmallMouseDataFile;
-            }  
-
             CleanUp();
             Init();
         }
 
-        [Fact(Skip = "TODO[JIRA REEF-1183] Requires data files not present in 
enlistment")]
+        [Fact]
         [Trait("Priority", "1")]
         [Trait("Category", "FunctionalGated")]
         [Trait("Description", "Test KMeans clustering with things directly run 
without reef")]
@@ -65,8 +56,10 @@ namespace Org.Apache.REEF.Tests.Functional.ML.KMeans
         public void TestKMeansOnDirectRunViaFileSystem()
         {
             int iteration = 0;
-            string executionDirectory = 
Path.Combine(Directory.GetCurrentDirectory(), 
Constants.KMeansExecutionBaseDirectory, 
Guid.NewGuid().ToString("N").Substring(0, 4));
-            List<DataVector> centroids = 
DataVector.ShuffleDataAndGetInitialCentriods(_dataFile, Partitions, K, 
executionDirectory);
+            string executionDirectory = 
Path.Combine(Directory.GetCurrentDirectory(),
+                string.Join("-", Constants.KMeansExecutionBaseDirectory, 
Guid.NewGuid().ToString("N").Substring(0, 4)));
+            string dataFilePath = GenerateDataFileAndGetPath();
+            List<DataVector> centroids = 
DataVector.ShuffleDataAndGetInitialCentriods(dataFilePath, Partitions, K, 
executionDirectory);
             
             // initialize all tasks
             List<LegacyKMeansTask> tasks = new List<LegacyKMeansTask>();
@@ -105,9 +98,19 @@ namespace Org.Apache.REEF.Tests.Functional.ML.KMeans
                 }
                 iteration++;
             }
+
+            // cleanup workspace
+            try
+            {
+                Directory.Delete(executionDirectory, true);
+            }
+            catch (Exception)
+            {
+                // do not fail if clean up is unsuccessful
+            }
         }
 
-        [Fact(Skip = "TODO[JIRA REEF-1183] Requires data files not present in 
enlistment")]
+        [Fact]
         [Trait("Priority", "1")]
         [Trait("Category", "FunctionalGated")]
         [Trait("Description", "Test KMeans clustering on reef local runtime 
with group communications")]
@@ -116,12 +119,14 @@ namespace Org.Apache.REEF.Tests.Functional.ML.KMeans
         {
             IsOnLocalRuntime = true;
             string testFolder = DefaultRuntimeFolder + TestId;
-            TestRun(DriverConfiguration(), typeof(KMeansDriverHandlers), 
Partitions + 1, "KMeansDriverHandlers", "local", testFolder);
+            string dataFilePath = GenerateDataFileAndGetPath();
+
+            TestRun(DriverConfiguration(dataFilePath), 
typeof(KMeansDriverHandlers), Partitions + 1, "KMeansDriverHandlers", "local", 
testFolder);
             ValidateSuccessForLocalRuntime(Partitions + 1, testFolder: 
testFolder);
             CleanUp(testFolder);
         }
 
-        [Fact(Skip = "TODO[JIRA REEF-1183] Requires data files not present in 
enlistment")]
+        [Fact(Skip = "Requires Yarn Single Node")]
         [Trait("Priority", "1")]
         [Trait("Category", "FunctionalGated")]
         [Trait("Description", "Test KMeans clustering on reef YARN runtime - 
one box")]
@@ -129,11 +134,12 @@ namespace Org.Apache.REEF.Tests.Functional.ML.KMeans
         public void TestKMeansOnYarnOneBoxWithGroupCommunications()
         {
             string testFolder = DefaultRuntimeFolder + TestId + "Yarn";
-            TestRun(DriverConfiguration(), typeof(KMeansDriverHandlers), 
Partitions + 1, "KMeansDriverHandlers", "yarn", testFolder);
+            string dataFilePath = GenerateDataFileAndGetPath();
+            TestRun(DriverConfiguration(dataFilePath), 
typeof(KMeansDriverHandlers), Partitions + 1, "KMeansDriverHandlers", "yarn", 
testFolder);
             Assert.NotNull("BreakPointChecker");
         }
 
-        private IConfiguration DriverConfiguration()
+        private IConfiguration DriverConfiguration(string dataFilePath)
         {
             int fanOut = 2;
             int totalEvaluators = Partitions + 1;
@@ -144,7 +150,7 @@ namespace Org.Apache.REEF.Tests.Functional.ML.KMeans
                     
.Set(Org.Apache.REEF.Driver.DriverConfiguration.OnDriverStarted, 
GenericType<KMeansDriverHandlers>.Class)
                     
.Set(Org.Apache.REEF.Driver.DriverConfiguration.OnEvaluatorAllocated, 
GenericType<KMeansDriverHandlers>.Class)
                     
.Set(Org.Apache.REEF.Driver.DriverConfiguration.OnContextActive, 
GenericType<KMeansDriverHandlers>.Class)
-                    
.Set(Org.Apache.REEF.Driver.DriverConfiguration.CommandLineArguments, 
"DataFile:" + _dataFile)
+                    
.Set(Org.Apache.REEF.Driver.DriverConfiguration.CommandLineArguments, 
dataFilePath)
                     
.Set(Org.Apache.REEF.Driver.DriverConfiguration.CustomTraceLevel, 
Level.Info.ToString())
                     .Build())
                 .BindIntNamedParam<NumPartitions>(Partitions.ToString())
@@ -160,5 +166,33 @@ namespace Org.Apache.REEF.Tests.Functional.ML.KMeans
 
             return Configurations.Merge(driverConfig, 
groupCommunicationDriverConfig);
         }
+
+        private static string GenerateDataFileAndGetPath()
+        {
+            string dataFilePath = Path.Combine(Path.GetTempPath(), 
DataFileNamePrefix + Guid.NewGuid().ToString("N").Substring(0, 4));
+
+            DataVector[] centroids =
+            {
+                new DataVector(new List<float> { +2f, 0f }, 0),
+                new DataVector(new List<float> { -2f, 0f }, 0),
+                new DataVector(new List<float> { +0f, 2f }, 0)
+            };
+
+            using (StreamWriter writer = new 
StreamWriter(File.OpenWrite(dataFilePath)))
+            {
+                Random rnd = new Random();
+                for (int i = 0; i < 10000; i++)
+                {
+                    int label = rnd.Next(centroids.Length);
+                    float x = Convert.ToSingle(centroids[label].Data[0] + 
rnd.NextDouble());
+                    float y = Convert.ToSingle(centroids[label].Data[1] + 
rnd.NextDouble());
+
+                    writer.WriteLine("{0},{1};{2}", x, y, label);
+                }
+                writer.Close();
+            }
+
+            return dataFilePath;
+        }
     }
 }

Reply via email to