[ https://issues.apache.org/jira/browse/FLINK-3128?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15135852#comment-15135852 ]
ASF GitHub Bot commented on FLINK-3128:
---------------------------------------

Github user f-sander commented on the pull request: https://github.com/apache/flink/pull/1565#issuecomment-180798486

Sorry for the long delay. I still don't really have time for this, but I want to describe it anyway, which is why the writing and formatting are a bit rough. I hope you bear with me.

We only consider isotonic regression on weighted, two-dimensional data, so data points are tuples of three doubles: `(y, x, w)`. PAV assumes the data to be sorted by `x`. It starts on the left and moves to the right. Whenever two (or more) points are found whose `y` values are descending in order of `x`, it "pools" them: all `y` values in the pool, multiplied by their weights, are summed and divided by the sum of the weights. Every point in the pool then looks like this: `(y_weighted_pool_avg, x, w)`. Because the `y` values were changed, we have to look back in `x` order to check whether the new pool average is lower than the value just before the pool. If it is, we have to pool again, until no higher `y` value is present before the pool.

Any sequence of data points from `i` to `j` sharing the same `y` value is compressed in the following way: `(y, x_i, sum_of_weights), (y, x_j, 0)`. The hope of Spark's implementation is that enough data gets compressed this way that all remaining data fits onto one node in the last step. However, there are of course cases where this simply doesn't work.

Our approach (not implemented in this PR) works like this:

```
compare two consecutive data points i and j:
  if y_i < y_j, leave them untouched
  if y_i > y_j, replace both with ((y_i * w_i + y_j * w_j) / (w_i + w_j), x_i, w_i + w_j). Also remember x_j
  if y_i = y_j, replace both with (y_i, x_i, w_i + w_j). Also remember x_j
repeat until no pairs are combined into one
```

After the iteration has terminated: for each point that has a "remembered" `x_j`, add another `(y, x_j, 0)` directly behind it.
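For reference, the classic sequential PAV pass described above can be sketched like this (an illustrative Python sketch, not code from the PR; the function name `pav` and the block representation are my own):

```python
def pav(points):
    """Weighted PAV for isotonic regression.

    points: list of (y, x, w) tuples, sorted by x.
    Returns the fitted y value for each input point.
    """
    # Each block is [pooled y average, total weight, number of points].
    blocks = []
    for y, x, w in points:
        blocks.append([y, w, 1])
        # Look back and re-pool while the previous block has a higher y,
        # i.e. while the monotonicity constraint is violated.
        while len(blocks) > 1 and blocks[-2][0] > blocks[-1][0]:
            y2, w2, n2 = blocks.pop()
            y1, w1, n1 = blocks.pop()
            wt = w1 + w2
            blocks.append([(y1 * w1 + y2 * w2) / wt, wt, n1 + n2])
    # Every point in a pool receives the pool's weighted average.
    fitted = []
    for y_avg, _, n in blocks:
        fitted.extend([y_avg] * n)
    return fitted
```

For example, `pav([(1, 0, 1), (3, 1, 1), (2, 2, 1)])` pools the last two points into their average 2.5.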
We are able to compare each point with its successor by attaching to each point an index (zipWithIndex) and a "next" pointer (index + 1) and then doing a `set.join(set).where(next).equalTo(index)`. However, because of the weight summation, we must avoid a point appearing in multiple join pairs; otherwise a point's weight might be summed into multiple combined points. We worked around that by doing two joins in each iteration step:

```
step 1: left join side has only points with even indices, right side only points with odd indices
step 2: left join side has only points with odd indices, right side only points with even indices
if nothing happened during these two runs, we are done
```

Unfortunately, because of the merging, the indices no longer increment by 1. That's why we wanted to apply another zipWithIndex after the two joins, but the join repartitions the data, so we lose our range partitioning, which is required to get indices representing the total order of the data.

I hope you can understand the problem. Again, sorry for the sloppy writing.

> Add Isotonic Regression To ML Library
> -------------------------------------
>
>                 Key: FLINK-3128
>                 URL: https://issues.apache.org/jira/browse/FLINK-3128
>             Project: Flink
>          Issue Type: New Feature
>          Components: Machine Learning Library
>            Reporter: Fridtjof Sander
>            Assignee: Fridtjof Sander
>            Priority: Minor
>
> Isotonic Regression fits a monotonically increasing function (also called isotonic function) to a plane of datapoints.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
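A local, single-machine simulation of the even/odd pairwise merging scheme described in the comment may make the scheme easier to follow (an illustrative Python sketch only; the function name `pairwise_merge` and the `x_end` field for the "remembered" right boundary are my own, and positions in a Python list stand in for the distributed indices):

```python
def pairwise_merge(points):
    """Simulate the iterative even/odd pairwise merging of violators.

    points: list of (y, x, w) tuples, sorted by x.
    Returns merged points, with each merged run followed by a
    zero-weight marker (y, x_j, 0) for its remembered right boundary.
    """
    # Internal representation: [y, x, w, x_end]; x_end is the remembered
    # right boundary of a merged run, or None for a single point.
    pts = [[y, x, w, None] for y, x, w in points]
    changed = True
    while changed:
        changed = False
        # Two passes per step: pair (even, odd) positions first, then
        # (odd, even), so no point takes part in more than one merge.
        for parity in (0, 1):
            out, i = [], 0
            while i < len(pts):
                if i + 1 < len(pts) and i % 2 == parity:
                    a, b = pts[i], pts[i + 1]
                    if a[0] >= b[0]:  # violation (>) or equal y (=)
                        wt = a[2] + b[2]
                        y = ((a[0] * a[2] + b[0] * b[2]) / wt
                             if a[0] > b[0] else a[0])
                        # The merged run extends to b's right boundary.
                        end = b[3] if b[3] is not None else b[1]
                        out.append([y, a[1], wt, end])
                        changed = True
                        i += 2
                        continue
                out.append(pts[i])
                i += 1
            pts = out
    # Emit the remembered boundaries as zero-weight markers.
    result = []
    for y, x, w, x_end in pts:
        result.append((y, x, w))
        if x_end is not None:
            result.append((y, x_end, 0))
    return result
```

For example, `pairwise_merge([(1, 0, 1), (3, 1, 1), (2, 2, 1)])` merges the violating pair into a run of weight 2 with average 2.5 and appends the marker `(2.5, 2, 0)` behind it.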