GitHub user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/1565#discussion_r51707326
--- Diff:
flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/regression/IsotonicRegression.scala
---
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.regression
+
+import java.util.Arrays.binarySearch
+
+import org.apache.flink.api.common.functions.MapFunction
+import org.apache.flink.api.scala._
+import org.apache.flink.ml.common.{Parameter, ParameterMap}
+import org.apache.flink.ml.pipeline.{FitOperation, PredictOperation,
Predictor}
+
+import scala.collection.mutable.ArrayBuffer
+
+/**
+ * Fitted isotonic regression model.
+ *
+ * @param boundaries feature values of the model's breakpoints
+ *                   (NOTE(review): presumably sorted ascending — confirm against the fit code)
+ * @param predictions fitted label values (presumably one per entry of `boundaries` — confirm)
+ */
+case class IsotonicRegressionModel(boundaries: Array[Double], predictions:
Array[Double])
+
+/**
+ * Isotonic regression.
+ *
+ * Currently implemented using a parallelized pool adjacent violators (PAV) algorithm.
+ * Only the univariate (single feature) case is supported.
+ * This is a port from the implementation in Apache Spark.
+ *
+ * Sequential PAV implementation based on:
+ * Tibshirani, Ryan J., Holger Hoefling, and Robert Tibshirani.
+ * "Nearly-isotonic regression." Technometrics 53.1 (2011): 54-61.
+ * Available from [[http://www.stat.cmu.edu/~ryantibs/papers/neariso.pdf]]
+ *
+ * Sequential PAV parallelization based on:
+ * Kearsley, Anthony J., Richard A. Tapia, and Michael W. Trosset.
+ * "An approach to parallelizing isotonic regression."
+ * Applied Mathematics and Parallel Computing. Physica-Verlag HD, 1996. 141-147.
+ * Available from [[http://softlib.rice.edu/pub/CRPC-TRs/reports/CRPC-TR96640.pdf]]
+ *
+ * =Parameters=
+ *
+ *  - [[org.apache.flink.ml.regression.IsotonicRegression.Isotonic]]:
+ *  `true` if the fitted labels shall be non-decreasing (isotonic), `false` if they shall be
+ *  non-increasing (antitonic).
+ *
+ * @example
+ * {{{
+ *   val ir = IsotonicRegression()
+ *     .setIsotonic(true)
+ *
+ *   // Training points are (label, feature, weight) tuples.
+ *   val trainingDS: DataSet[(Double, Double, Double)] = ...
+ *   val testingDS: DataSet[Double] = ...
+ *
+ *   ir.fit(trainingDS)
+ *
+ *   val predictions = ir.predict(testingDS)
+ * }}}
+ *
+ * @see [[http://en.wikipedia.org/wiki/Isotonic_regression Isotonic regression (Wikipedia)]]
+ */
+class IsotonicRegression extends Predictor[IsotonicRegression] {
+
+  /** `true` for a non-decreasing fit, `false` for a non-increasing one. Read by the fit. */
+  var isotonic = true
+
+  /** Trained model; starts as `None` (presumably populated by the fit operation — confirm). */
+  var model: Option[DataSet[IsotonicRegressionModel]] = None
+
+  /**
+   * Sets whether the regression is isotonic (non-decreasing) or antitonic (non-increasing).
+   *
+   * The value is stored in the `isotonic` field, which the fit operation reads. It is also
+   * recorded under the [[IsotonicRegression.Isotonic]] parameter, so that the parameter map
+   * agrees with the documented parameter.
+   *
+   * @param isotonic `true` for a non-decreasing fit, `false` for a non-increasing one
+   * @return this instance, to allow call chaining
+   */
+  def setIsotonic(isotonic: Boolean): this.type = {
+    this.isotonic = isotonic
+    parameters.add(IsotonicRegression.Isotonic, isotonic)
+    this
+  }
+
+}
+
+object IsotonicRegression {
+
+ // ====================================== Parameters ===========================================
+
+  /**
+   * Parameter describing whether the fitted labels are non-decreasing (`true`) or
+   * non-increasing (`false`). Defaults to `true`.
+   */
+  case object Isotonic extends Parameter[Boolean] {
+    override val defaultValue: Option[Boolean] = Some(true)
+  }
+
+ // ======================================== Factory methods ====================================
+
+  /** Creates an [[IsotonicRegression]] with default settings (`isotonic = true`). */
+  def apply(): IsotonicRegression = new IsotonicRegression
+
+ // ====================================== Operations ===========================================
+
+  /**
+   * Flink `MapFunction` that runs the sequential pool adjacent violators (PAV) algorithm on
+   * one array of (label, feature, weight) points.
+   *
+   * The fit operation applies it to each range partition after sorting that partition by
+   * feature.
+   */
+  class AdjacentPoolViolatersMapper extends MapFunction[Array[(Double, Double, Double)],
+    Array[(Double, Double, Double)]] {
+
+    /**
+     * Performs a pool adjacent violators algorithm (PAV).
+     *
+     * Points are grouped into blocks. Whenever a block's weighted mean label is greater than
+     * or equal to that of the following block, the two blocks are merged. The merge is then
+     * propagated backwards for as long as it violates monotonicity with the preceding block.
+     * A merge only updates the block bounds and the running sums, so it costs O(1), and the
+     * whole pass runs in linear time. The earlier formulation re-pooled whole array slices on
+     * every backward step, which can be extremely slow on adversarial inputs. The same problem
+     * was fixed in Apache Spark under SPARK-17455.
+     *
+     * Zero-weight points carry no information and are dropped. Pooling only such points would
+     * otherwise yield `0.0 / 0.0 = NaN` labels.
+     *
+     * @param input points (label, feature, weight), expected to be sorted by feature
+     * @return points (label, feature, weight) whose labels form a non-decreasing sequence.
+     *         A pooled block spanning several feature values is emitted as its first and last
+     *         point, each carrying half of the block's total weight. A block covering a single
+     *         feature value is emitted as one point carrying the full weight. Either way, the
+     *         block's total weight is preserved if the output is pooled again.
+     * @throws IllegalArgumentException if any weight is negative
+     */
+    override def map(input: Array[(Double, Double, Double)]): Array[(Double, Double, Double)] = {
+      val cleanInput = input.filter { case (label, feature, weight) =>
+        require(weight >= 0.0,
+          s"Negative weight at point ($label, $feature, $weight). Weights must be non-negative")
+        weight > 0.0
+      }
+
+      if (cleanInput.isEmpty) {
+        Array.empty
+      } else {
+        poolAdjacentViolators(cleanInput)
+      }
+    }
+
+    /** Runs PAV over a non-empty array of strictly positively weighted points (see `map`). */
+    private def poolAdjacentViolators(
+        points: Array[(Double, Double, Double)]): Array[(Double, Double, Double)] = {
+      val n = points.length
+
+      // If [i, j] is a block, then blockBounds(i) == j and blockBounds(j) == i. Entries strictly
+      // inside a block are stale and never read. Initially, every point is its own block.
+      val blockBounds = Array.range(0, n)
+
+      // Running sums per block, valid only at the block's start index.
+      val weightSums = points.map(_._3)
+      val weightedLabelSums = points.map(p => p._1 * p._3)
+
+      def blockEnd(start: Int): Int = blockBounds(start)
+      def blockStart(end: Int): Int = blockBounds(end)
+      def nextBlock(start: Int): Int = blockEnd(start) + 1
+      def prevBlock(start: Int): Int = blockStart(start - 1)
+      def average(start: Int): Double = weightedLabelSums(start) / weightSums(start)
+
+      // Merges the block starting at `first` with the directly following block starting at
+      // `second`. Returns the start index of the merged block.
+      def merge(first: Int, second: Int): Int = {
+        val end = blockEnd(second)
+        blockBounds(first) = end
+        blockBounds(end) = first
+        weightSums(first) += weightSums(second)
+        weightedLabelSums(first) += weightedLabelSums(second)
+        first
+      }
+
+      // Merging on >= (not only >) also combines adjacent blocks with equal means, which keeps
+      // the output as compact as possible. Both variants yield the same fitted values.
+      var i = 0
+      while (nextBlock(i) < n) {
+        if (average(i) >= average(nextBlock(i))) {
+          merge(i, nextBlock(i))
+          while (i > 0 && average(prevBlock(i)) >= average(i)) {
+            i = merge(prevBlock(i), i)
+          }
+        } else {
+          i = nextBlock(i)
+        }
+      }
+
+      // Emit each block either as its two boundary points sharing the weight, or as a single
+      // point when all of its points have the same feature value.
+      val output = ArrayBuffer.empty[(Double, Double, Double)]
+      i = 0
+      while (i < n) {
+        val label = average(i)
+        val firstFeature = points(i)._2
+        val lastFeature = points(blockEnd(i))._2
+        val weight = weightSums(i)
+        if (lastFeature > firstFeature) {
+          output += ((label, firstFeature, weight / 2))
+          output += ((label, lastFeature, weight / 2))
+        } else {
+          output += ((label, firstFeature, weight))
+        }
+        i = nextBlock(i)
+      }
+
+      output.toArray
+    }
+  }
+
+
+ implicit val fitIR = new FitOperation[IsotonicRegression, (Double,
Double, Double)] {
+
+ override def fit(instance: IsotonicRegression,
+ fitParameters: ParameterMap,
+ input: DataSet[(Double, Double, Double)]): Unit = {
+
+ val isotonic = instance.isotonic
+
+ val preprocessedInput = if (isotonic) {
+ input
+ } else {
+ input.map(x => (-x._1, x._2, x._3))
+ }
+
+ val parallelStepResult = preprocessedInput
+ .partitionByRange(1)
+ .mapPartition(partition => {
+ val buffer = new ArrayBuffer[(Double, Double, Double)]
+ buffer ++= partition
+ Seq(buffer.sortBy(x => (x._2, x._1)).toArray)
+ })
+ .map(new AdjacentPoolViolatersMapper)
+ .setParallelism(1)
+ .mapPartition(partitions => {
+ val buffer = new ArrayBuffer[(Double, Double, Double)]
+ for (partition <- partitions) {
+ buffer ++= partition
+ }
+ Seq(buffer.sortBy(x => (x._2, x._1)).toArray)
--- End diff --
This operation effectively collects all data on a single machine and
materializes it in memory, so it won't scale at all.
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes to have it enabled, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---