[ 
https://issues.apache.org/jira/browse/FLINK-3128?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15135852#comment-15135852
 ] 

ASF GitHub Bot commented on FLINK-3128:
---------------------------------------

Github user f-sander commented on the pull request:

    https://github.com/apache/flink/pull/1565#issuecomment-180798486
  
    Sorry for the long delay. I still don't really have time for this, but I 
want to describe it anyway. That's why the writing and formatting are pretty 
sloppy. Sorry for that; I hope you bear with me:
    
    We only consider isotonic regression on weighted, two-dimensional data. 
Thus, data points are tuples of three doubles: `(y, x, w)`.
    
    PAV assumes the data to be sorted by `x`. It starts on the left and goes to 
the right. Whenever two (or more) points are found whose `y` values are 
descending in order of `x`, it "pools" them, which means the `y` values in 
that pool are replaced by their weighted average: the sum of all `y * w` 
divided by the sum of all weights. Any point in the pool then looks like this: 
`(y_weighted_pool_avg, x, w)`. Because the `y` values were changed, we have to 
look back in `x` order to check whether the new pool average is lower than the 
value before the pool. If that's the case, we have to pool again until no 
higher `y` value is present before the pool.
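
    As a rough illustration of the pooling and look-back described above, here 
is a minimal single-machine sketch in plain Python. It is not the code from 
this PR or from Spark; the function name `pav` and the block representation 
are made up for the example, and it assumes all weights are positive.

```python
def pav(points):
    """Sequential weighted PAV. points: list of (y, x, w); returns fitted (y, x, w)."""
    pts = sorted(points, key=lambda p: p[1])  # PAV expects the data sorted by x
    blocks = []  # each block (pool): [sum of y * w, sum of w, number of points]
    for y, _, w in pts:
        blocks.append([y * w, w, 1])
        # Look back: pool while the block before has a higher average.
        while (len(blocks) > 1
               and blocks[-2][0] / blocks[-2][1] > blocks[-1][0] / blocks[-1][1]):
            sum_yw, sum_w, count = blocks.pop()
            blocks[-1][0] += sum_yw
            blocks[-1][1] += sum_w
            blocks[-1][2] += count
    fitted = []
    start = 0
    for sum_yw, sum_w, count in blocks:
        avg = sum_yw / sum_w
        # Every point in the pool keeps its own x and w but gets the pool average.
        for _, x, w in pts[start:start + count]:
            fitted.append((avg, x, w))
        start += count
    return fitted


fit = pav([(1.0, 0.0, 1.0), (3.0, 1.0, 1.0), (2.0, 2.0, 1.0), (4.0, 3.0, 1.0)])
```

    In this example the points at `x = 1` and `x = 2` are pooled to a common 
`y` of 2.5, while the other two points are left unchanged.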
    
    Any sequence of data points from `i` to `j` sharing the same `y` value is 
compressed in the following way: `(y, x_i, sum_of_weights), (y, x_j, 0)`. The 
hope of Spark's implementation is that enough data gets compressed this way 
that all remaining data fits onto one node in the last step. However, there 
are of course cases where this simply doesn't work.
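
    The compression step can be sketched like this in plain Python. The 
function name `compress` is hypothetical, and keeping a run of length one as a 
single point is my assumption; the description above only covers runs from 
`i` to `j`.

```python
def compress(fitted):
    """Collapse each run of equal y into (y, x_first, sum_w) and (y, x_last, 0)."""
    out = []
    i = 0
    while i < len(fitted):
        y = fitted[i][0]
        j = i
        total_w = fitted[i][2]
        # Extend the run while the next point shares the same y value.
        while j + 1 < len(fitted) and fitted[j + 1][0] == y:
            j += 1
            total_w += fitted[j][2]
        out.append((y, fitted[i][1], total_w))
        if j > i:
            # Assumption: only runs with at least two points get the zero-weight end marker.
            out.append((y, fitted[j][1], 0.0))
        i = j + 1
    return out


compressed = compress([
    (1.0, 0.0, 1.0),
    (2.5, 1.0, 1.0), (2.5, 2.0, 1.0), (2.5, 3.0, 2.0),
    (4.0, 4.0, 1.0),
])
```

    Here five points shrink to four; the longer the pools, the bigger the 
saving, which is exactly what the approach relies on.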
    
    Our approach (not implemented in this PR) works like this:
    ```
    compare two consecutive data points i and j:
    if y_i < y_j, leave them untouched
    if y_i > y_j, replace both with ((y_i * w_i + y_j * w_j) / (w_i + w_j), 
x_i, w_i + w_j). Also remember x_j
    if y_i = y_j, replace both with (y_i, x_i, w_i + w_j). Also remember x_j
    Repeat that until no pairs are combined to one
    ```
    After the iteration has terminated: for each point that has a "remembered" 
`x_j`, add another `(y, x_j, 0)` directly behind it.
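
    To make the pair rule and the final expansion concrete, here is a local, 
non-distributed sketch in plain Python. All names (`combine`, `sweep`, 
`pairwise_isotonic`) are invented for this example, points carry a fourth 
field for the remembered `x_j`, and weights are assumed to be positive. Each 
sweep uses every point in at most one pair.

```python
def combine(p, q):
    """Pair rule for consecutive points p, q = (y, x, w, x_last).
    Returns the merged point, or None if y_p < y_q (leave both untouched)."""
    (yp, xp, wp, _), (yq, xq, wq, last_q) = p, q
    if yp < yq:
        return None
    w = wp + wq
    y = yp if yp == yq else (yp * wp + yq * wq) / w
    # Remember the right-most x covered by the merged point.
    return (y, xp, w, xq if last_q is None else last_q)


def sweep(pts):
    """One pass over the list; a point takes part in at most one merge."""
    out, i, changed = [], 0, False
    while i < len(pts):
        merged = combine(pts[i], pts[i + 1]) if i + 1 < len(pts) else None
        if merged is None:
            out.append(pts[i])
            i += 1
        else:
            out.append(merged)
            i += 2
            changed = True
    return out, changed


def pairwise_isotonic(points):
    pts = [(y, x, w, None) for y, x, w in sorted(points, key=lambda p: p[1])]
    changed = True
    while changed:  # repeat until no pair is combined
        pts, changed = sweep(pts)
    result = []
    for y, x, w, x_last in pts:
        result.append((y, x, w))
        if x_last is not None:
            result.append((y, x_last, 0.0))  # zero-weight end of the pool
    return result


fit = pairwise_isotonic([(1.0, 0.0, 1.0), (3.0, 1.0, 1.0), (2.0, 2.0, 1.0), (4.0, 3.0, 1.0)])
```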
    
    We are able to compare each point with its successor by attaching an index 
(zipWithIndex) and a "next-pointer" (index+1) to each point and then doing a:
    `set.join(set).where(next).equalTo(index)`
    However, because of the weight summation, we must prevent a point from 
appearing in multiple join pairs. Otherwise a point's weight might be summed 
into multiple combined points.
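
    A small plain-Python stand-in for that self-join shows the double 
counting. This is not Flink code: `self_join` is a hypothetical helper that 
mimics joining the set with itself on `next == index` using a dictionary.

```python
def self_join(records):
    """records: list of (index, next_index, point).
    Pairs each record with the record whose index equals its next_index."""
    by_index = {rec[0]: rec for rec in records}
    return [(left, by_index[left[1]]) for left in records if left[1] in by_index]


# Three descending points (y, x, w), each with weight 1.
points = [(3.0, 0.0, 1.0), (2.0, 1.0, 1.0), (1.0, 2.0, 1.0)]
records = [(i, i + 1, p) for i, p in enumerate(points)]
pairs = self_join(records)

# The point with index 1 is the right side of one pair and the left side of the next.
joined_indices = [(left[0], right[0]) for left, right in pairs]

# Merging every pair independently would count its weight twice.
naive_weight = sum(left[2][2] + right[2][2] for left, right in pairs)
true_weight = sum(p[2] for p in points)
```

    With three points of weight 1 the naive merge ends up with a total weight 
of 4 instead of 3.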
    
    We worked around that by doing two joins in each iteration step:
    ```
    step 1: left join side has only points with even indices, right side only 
with odd
    step 2: left join side has only points with odd indices, right side only 
with even
    if nothing happened during these two runs, we are done
    ```
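
    The pairing of the two steps can be illustrated with plain Python on a 
dense index range. `phase_pairs` is a made-up helper, not part of the PR; it 
only shows which indices end up on the left and right side of each join.

```python
def phase_pairs(n, left_parity):
    """Pairs (i, i + 1) over indices 0..n-1 whose left index has the given parity."""
    return [(i, i + 1) for i in range(left_parity, n - 1, 2)]


step1 = phase_pairs(6, 0)  # left side even, right side odd
step2 = phase_pairs(6, 1)  # left side odd, right side even


def used_once(pairs):
    """True if no index appears in more than one pair."""
    flat = [i for pair in pairs for i in pair]
    return len(flat) == len(set(flat))
```

    Within one step no index appears in more than one pair, so no weight is 
summed twice, and the two steps together still cover every consecutive pair.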
    
    Unfortunately, because of the merging, the indices no longer increment by 
1. That's why we wanted to apply another zipWithIndex after the two joins, but 
the join repartitions the data, so we lose our range-partitioning. That 
partitioning, however, is required to get indices representing the total order 
of the data.
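
    The gap problem looks like this in a tiny local example (plain Python, 
hypothetical names). On a single machine, sorting by `x` and enumerating 
restores dense indices; the open question above is how to get the same total 
order in the distributed setting once the join has repartitioned the data.

```python
def reindex(points):
    """Assign dense indices 0..n-1 and next-pointers, following the order of x."""
    ordered = sorted(points, key=lambda p: p[1])
    return [(i, i + 1, p) for i, p in enumerate(ordered)]


# Survivors (index, point) after the points with index 1 and 2 were merged into index 1.
survivors = [(0, (1.0, 0.0, 1.0)), (1, (2.5, 1.0, 2.0)), (3, (4.0, 3.0, 1.0))]
indices = {i for i, _ in survivors}

# Next-pointers (index + 1) that no longer match any existing index.
dangling = [i for i, _ in survivors if i + 1 not in indices and i != max(indices)]

records = reindex([p for _, p in survivors])
```

    Index 1 now points at index 2, which no longer exists, so the join on 
`next == index` would never compare that point with its real successor.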
    
    I hope you can understand the problem. Again, sorry for the sloppy writing.


> Add Isotonic Regression To ML Library
> -------------------------------------
>
>                 Key: FLINK-3128
>                 URL: https://issues.apache.org/jira/browse/FLINK-3128
>             Project: Flink
>          Issue Type: New Feature
>          Components: Machine Learning Library
>            Reporter: Fridtjof Sander
>            Assignee: Fridtjof Sander
>            Priority: Minor
>
> Isotonic Regression fits a monotonically increasing function (also called 
> isotonic function) to a plane of datapoints.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
