This is an automated email from the ASF dual-hosted git repository.

aljoscha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit ebf825eed9f09fe04edb96ab0b4db331fb03bf4a
Author: Aljoscha Krettek <[email protected]>
AuthorDate: Thu Oct 4 16:14:47 2018 +0200

    [FLINK-7811] Update breeze dependency and add explicit types in FlinkML
    
    This is needed for Scala 2.12 compatibility
---
 flink-libraries/flink-ml/pom.xml                               |  2 +-
 .../main/scala/org/apache/flink/ml/common/FlinkMLTools.scala   |  2 +-
 .../flink-ml/src/main/scala/org/apache/flink/ml/nn/KNN.scala   |  8 +++++---
 .../apache/flink/ml/outlier/StochasticOutlierSelection.scala   |  2 +-
 .../scala/org/apache/flink/ml/preprocessing/MinMaxScaler.scala |  2 +-
 .../org/apache/flink/ml/preprocessing/PolynomialFeatures.scala |  2 +-
 .../scala/org/apache/flink/ml/preprocessing/Splitter.scala     | 10 +++++-----
 .../org/apache/flink/ml/preprocessing/StandardScaler.scala     |  4 ++--
 8 files changed, 17 insertions(+), 15 deletions(-)

diff --git a/flink-libraries/flink-ml/pom.xml b/flink-libraries/flink-ml/pom.xml
index a25b4b9..14b1079 100644
--- a/flink-libraries/flink-ml/pom.xml
+++ b/flink-libraries/flink-ml/pom.xml
@@ -47,7 +47,7 @@
                <dependency>
                        <groupId>org.scalanlp</groupId>
                        <artifactId>breeze_${scala.binary.version}</artifactId>
-                       <version>0.12</version>
+                       <version>0.13</version>
                </dependency>
 
                <!-- the dependencies below are already provided in Flink -->
diff --git a/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/common/FlinkMLTools.scala b/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/common/FlinkMLTools.scala
index bfc72a4..4bb21d5 100644
--- a/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/common/FlinkMLTools.scala
+++ b/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/common/FlinkMLTools.scala
@@ -376,7 +376,7 @@ object FlinkMLTools {
     partitionerOption: Option[Partitioner[Int]] = None)
   : DataSet[Block[T]] = {
     val blockIDInput = input map {
-      element =>
+      element: T =>
         val blockID = element.hashCode() % numBlocks
 
         val blockIDResult = if(blockID < 0){
diff --git a/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/nn/KNN.scala b/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/nn/KNN.scala
index 527e636..6878703 100644
--- a/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/nn/KNN.scala
+++ b/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/nn/KNN.scala
@@ -227,7 +227,8 @@ object KNN {
 
             // join input and training set
             val crossed = crossTuned.mapPartition {
-              (iter, out: Collector[(FlinkVector, FlinkVector, Long, Double)]) => {
+              (iter: Iterator[(Block[FlinkVector], Block[(Long, T)])],
+               out: Collector[(FlinkVector, FlinkVector, Long, Double)]) => {
                 for ((training, testing) <- iter) {
                   // use a quadtree if (4 ^ dim) * Ntest * log(Ntrain)
                   // < Ntest * Ntrain, and distance is Euclidean
@@ -247,12 +248,13 @@ object KNN {
                    knnQueryBasic(training.values, testing.values, k, metric, out)
                   }
                 }
-              }
+                }
             }
 
            // group by input vector id and pick k nearest neighbor for each group
            val result = crossed.groupBy(2).sortGroup(3, Order.ASCENDING).reduceGroup {
-              (iter, out: Collector[(FlinkVector, Array[FlinkVector])]) => {
+              (iter: Iterator[(FlinkVector, FlinkVector, Long, Double)],
+               out: Collector[(FlinkVector, Array[FlinkVector])]) => {
                 if (iter.hasNext) {
                   val head = iter.next()
                   val key = head._2
diff --git a/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/outlier/StochasticOutlierSelection.scala b/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/outlier/StochasticOutlierSelection.scala
index ee82c03..2ff46db 100644
--- a/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/outlier/StochasticOutlierSelection.scala
+++ b/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/outlier/StochasticOutlierSelection.scala
@@ -197,7 +197,7 @@ object StochasticOutlierSelection extends WithParameters {
         val resultingParameters = instance.parameters ++ transformParameters
 
         // Map to the right format
-        val vectorsWithIndex = input.zipWithUniqueId.map(vector => {
+        val vectorsWithIndex = input.zipWithUniqueId.map((vector: (Long, T)) => {
           BreezeLabeledVector(vector._1.toInt, vector._2.asBreeze)
         })
 
diff --git a/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/MinMaxScaler.scala b/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/MinMaxScaler.scala
index 217e2c2..b37748f 100644
--- a/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/MinMaxScaler.scala
+++ b/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/MinMaxScaler.scala
@@ -149,7 +149,7 @@ object MinMaxScaler {
   : DataSet[(linalg.Vector[Double], linalg.Vector[Double])] = {
 
     val minMax = dataSet.map {
-      v => (v.asBreeze, v.asBreeze)
+      v: T => (v.asBreeze, v.asBreeze)
     }.reduce {
       (minMax1, minMax2) => {
 
diff --git a/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/PolynomialFeatures.scala b/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/PolynomialFeatures.scala
index f1c788e..977428d 100644
--- a/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/PolynomialFeatures.scala
+++ b/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/PolynomialFeatures.scala
@@ -116,7 +116,7 @@ object PolynomialFeatures{
         val degree = resultingParameters(Degree)
 
         input.map {
-          vector => {
+          vector: T => {
             calculatePolynomial(degree, vector)
           }
         }
diff --git a/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/Splitter.scala b/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/Splitter.scala
index 3451c80..c8bf0e7 100644
--- a/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/Splitter.scala
+++ b/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/Splitter.scala
@@ -76,7 +76,7 @@ object Splitter {
       }
     }
 
-    val leftSplitLight = leftSplit.map(o => (o._1, false))
+    val leftSplitLight = leftSplit.map((o: (Long, T)) => (o._1, false))
 
    val rightSplit: DataSet[T] = indexedInput.leftOuterJoin[(Long, Boolean)](leftSplitLight)
       .where(0)
@@ -87,7 +87,7 @@ object Splitter {
         }
     }
 
-    Array(leftSplit.map(o => o._2), rightSplit)
+    Array(leftSplit.map((o: (Long, T)) => o._2), rightSplit)
   }
 
    // --------------------------------------------------------------------------------------------
@@ -117,14 +117,14 @@ object Splitter {
 
     eid.reseedRandomGenerator(seed)
 
-    val tempDS: DataSet[(Int,T)] = input.map(o => (eid.sample, o))
+    val tempDS: DataSet[(Int,T)] = input.map((o: T) => (eid.sample, o))
 
     val splits = fracArray.length
     val outputArray = new Array[DataSet[T]]( splits )
 
     for (k <- 0 to splits-1){
-      outputArray(k) = tempDS.filter(o => o._1 == k)
-                             .map(o => o._2)
+      outputArray(k) = tempDS.filter((o: (Int, T)) => o._1 == k)
+                             .map((o: (Int, T)) => o._2)
     }
 
     outputArray
diff --git a/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala b/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala
index 82e8abf..aa38f41 100644
--- a/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala
+++ b/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala
@@ -158,7 +158,7 @@ object StandardScaler {
           fitParameters: ParameterMap,
           input: DataSet[(T, Double)])
       : Unit = {
-        val vectorDS = input.map(_._1)
+        val vectorDS = input.map( (i: (T, Double)) => i._1)
         val metrics = extractFeatureMetrics(vectorDS)
 
         instance.metricsOption = Some(metrics)
@@ -180,7 +180,7 @@ object StandardScaler {
   private def extractFeatureMetrics[T <: Vector](dataSet: DataSet[T])
   : DataSet[(linalg.Vector[Double], linalg.Vector[Double])] = {
     val metrics = dataSet.map{
-      v => (1.0, v.asBreeze, linalg.Vector.zeros[Double](v.size))
+      v: T => (1.0, v.asBreeze, linalg.Vector.zeros[Double](v.size))
     }.reduce{
       (metrics1, metrics2) => {
        /* We use formula 1.5b of the cited technical report for the combination of partial

Reply via email to