This is an automated email from the ASF dual-hosted git repository. aljoscha pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit ebf825eed9f09fe04edb96ab0b4db331fb03bf4a Author: Aljoscha Krettek <[email protected]> AuthorDate: Thu Oct 4 16:14:47 2018 +0200 [FLINK-7811] Update breeze dependency and add explicit types in FlinkML This is needed for Scala 2.12 compatibility --- flink-libraries/flink-ml/pom.xml | 2 +- .../main/scala/org/apache/flink/ml/common/FlinkMLTools.scala | 2 +- .../flink-ml/src/main/scala/org/apache/flink/ml/nn/KNN.scala | 8 +++++--- .../apache/flink/ml/outlier/StochasticOutlierSelection.scala | 2 +- .../scala/org/apache/flink/ml/preprocessing/MinMaxScaler.scala | 2 +- .../org/apache/flink/ml/preprocessing/PolynomialFeatures.scala | 2 +- .../scala/org/apache/flink/ml/preprocessing/Splitter.scala | 10 +++++----- .../org/apache/flink/ml/preprocessing/StandardScaler.scala | 4 ++-- 8 files changed, 17 insertions(+), 15 deletions(-) diff --git a/flink-libraries/flink-ml/pom.xml b/flink-libraries/flink-ml/pom.xml index a25b4b9..14b1079 100644 --- a/flink-libraries/flink-ml/pom.xml +++ b/flink-libraries/flink-ml/pom.xml @@ -47,7 +47,7 @@ <dependency> <groupId>org.scalanlp</groupId> <artifactId>breeze_${scala.binary.version}</artifactId> - <version>0.12</version> + <version>0.13</version> </dependency> <!-- the dependencies below are already provided in Flink --> diff --git a/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/common/FlinkMLTools.scala b/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/common/FlinkMLTools.scala index bfc72a4..4bb21d5 100644 --- a/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/common/FlinkMLTools.scala +++ b/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/common/FlinkMLTools.scala @@ -376,7 +376,7 @@ object FlinkMLTools { partitionerOption: Option[Partitioner[Int]] = None) : DataSet[Block[T]] = { val blockIDInput = input map { - element => + element: T => val blockID = element.hashCode() % numBlocks val blockIDResult = if(blockID < 0){ diff --git a/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/nn/KNN.scala b/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/nn/KNN.scala index 527e636..6878703 100644 --- a/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/nn/KNN.scala +++ b/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/nn/KNN.scala @@ -227,7 +227,8 @@ object KNN { // join input and training set val crossed = crossTuned.mapPartition { - (iter, out: Collector[(FlinkVector, FlinkVector, Long, Double)]) => { + (iter: Iterator[(Block[FlinkVector], Block[(Long, T)])], + out: Collector[(FlinkVector, FlinkVector, Long, Double)]) => { for ((training, testing) <- iter) { // use a quadtree if (4 ^ dim) * Ntest * log(Ntrain) // < Ntest * Ntrain, and distance is Euclidean @@ -247,12 +248,13 @@ object KNN { knnQueryBasic(training.values, testing.values, k, metric, out) } } - } + } } // group by input vector id and pick k nearest neighbor for each group val result = crossed.groupBy(2).sortGroup(3, Order.ASCENDING).reduceGroup { - (iter, out: Collector[(FlinkVector, Array[FlinkVector])]) => { + (iter: Iterator[(FlinkVector, FlinkVector, Long, Double)], + out: Collector[(FlinkVector, Array[FlinkVector])]) => { if (iter.hasNext) { val head = iter.next() val key = head._2 diff --git a/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/outlier/StochasticOutlierSelection.scala b/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/outlier/StochasticOutlierSelection.scala index ee82c03..2ff46db 100644 --- a/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/outlier/StochasticOutlierSelection.scala +++ b/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/outlier/StochasticOutlierSelection.scala @@ -197,7 +197,7 @@ object StochasticOutlierSelection extends WithParameters { val resultingParameters = instance.parameters ++ transformParameters // Map to the right format - val vectorsWithIndex = input.zipWithUniqueId.map(vector => { + val vectorsWithIndex = input.zipWithUniqueId.map((vector: (Long, T)) => { BreezeLabeledVector(vector._1.toInt, vector._2.asBreeze) }) diff --git a/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/MinMaxScaler.scala b/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/MinMaxScaler.scala index 217e2c2..b37748f 100644 --- a/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/MinMaxScaler.scala +++ b/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/MinMaxScaler.scala @@ -149,7 +149,7 @@ object MinMaxScaler { : DataSet[(linalg.Vector[Double], linalg.Vector[Double])] = { val minMax = dataSet.map { - v => (v.asBreeze, v.asBreeze) + v: T => (v.asBreeze, v.asBreeze) }.reduce { (minMax1, minMax2) => { diff --git a/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/PolynomialFeatures.scala b/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/PolynomialFeatures.scala index f1c788e..977428d 100644 --- a/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/PolynomialFeatures.scala +++ b/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/PolynomialFeatures.scala @@ -116,7 +116,7 @@ object PolynomialFeatures{ val degree = resultingParameters(Degree) input.map { - vector => { + vector: T => { calculatePolynomial(degree, vector) } } diff --git a/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/Splitter.scala b/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/Splitter.scala index 3451c80..c8bf0e7 100644 --- a/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/Splitter.scala +++ b/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/Splitter.scala @@ -76,7 +76,7 @@ object Splitter { } } - val leftSplitLight = leftSplit.map(o => (o._1, false)) + val leftSplitLight = leftSplit.map((o: (Long, T)) => (o._1, false)) val rightSplit: DataSet[T] = indexedInput.leftOuterJoin[(Long, Boolean)](leftSplitLight) .where(0) @@ -87,7 +87,7 @@ object Splitter { } } - Array(leftSplit.map(o => o._2), rightSplit) + Array(leftSplit.map((o: (Long, T)) => o._2), rightSplit) } // -------------------------------------------------------------------------------------------- @@ -117,14 +117,14 @@ object Splitter { eid.reseedRandomGenerator(seed) - val tempDS: DataSet[(Int,T)] = input.map(o => (eid.sample, o)) + val tempDS: DataSet[(Int,T)] = input.map((o: T) => (eid.sample, o)) val splits = fracArray.length val outputArray = new Array[DataSet[T]]( splits ) for (k <- 0 to splits-1){ - outputArray(k) = tempDS.filter(o => o._1 == k) - .map(o => o._2) + outputArray(k) = tempDS.filter((o: (Int, T)) => o._1 == k) + .map((o: (Int, T)) => o._2) } outputArray diff --git a/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala b/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala index 82e8abf..aa38f41 100644 --- a/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala +++ b/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala @@ -158,7 +158,7 @@ object StandardScaler { fitParameters: ParameterMap, input: DataSet[(T, Double)]) : Unit = { - val vectorDS = input.map(_._1) + val vectorDS = input.map( (i: (T, Double)) => i._1) val metrics = extractFeatureMetrics(vectorDS) instance.metricsOption = Some(metrics) @@ -180,7 +180,7 @@ object StandardScaler { private def extractFeatureMetrics[T <: Vector](dataSet: DataSet[T]) : DataSet[(linalg.Vector[Double], linalg.Vector[Double])] = { val metrics = dataSet.map{ - v => (1.0, v.asBreeze, linalg.Vector.zeros[Double](v.size)) + v: T => (1.0, v.asBreeze, linalg.Vector.zeros[Double](v.size)) }.reduce{ (metrics1, metrics2) => { /* We use formula 1.5b of the cited technical report for the combination of partial
