[ https://issues.apache.org/jira/browse/FLINK-3920?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15444782#comment-15444782 ]

ASF GitHub Bot commented on FLINK-3920:
---------------------------------------

Github user chiwanpark commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2152#discussion_r76552087
  
    --- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/distributed/DistributedRowMatrix.scala ---
    @@ -159,3 +183,68 @@ case class IndexedRow(rowIndex: MatrixRowIndex, values: Vector) extends Ordered[
     
       override def toString: String = s"($rowIndex, ${values.toString})"
     }
    +
    +/**
    +  * Serializable Reduction function used by the toBlockMatrix function. Takes an ordered list of
    +  * indexed row and split those rows to form blocks.
    +  */
    +class RowGroupReducer(blockMapper: BlockMapper)
    +    extends RichGroupReduceFunction[(Int, Int, Vector), (Int, Block)] {
    +
    +  override def reduce(values: java.lang.Iterable[(Int, Int, Vector)],
    +                      out: Collector[(Int, Block)]): Unit = {
    +
    +    val sortedRows = values.toList.sortBy(_._2)
    +    val blockID = sortedRows.head._1
    +    val coo = for {
    +      (_, rowIndex, vec) <- sortedRows
    +      (colIndex, value) <- vec if value != 0
    +    } yield (rowIndex, colIndex, value)
    +
    +    val block: Block = Block(
    +        SparseMatrix.fromCOO(
    +            blockMapper.rowsPerBlock, blockMapper.colsPerBlock, coo))
    +    out.collect((blockID, block))
    +  }
    +}
    +
    +class RowSplitter(blockMapper: BlockMapper)
    +    extends RichFlatMapFunction[IndexedRow, (Int, Int, Vector)] {
    +  override def flatMap(
    +      row: IndexedRow, out: Collector[(Int, Int, Vector)]): Unit = {
    +    val IndexedRow(rowIndex, vector) = row
    +    val splitRow = sliceVector(vector)
    +    for ((mappedCol, slice) <- splitRow) {
    +      val mappedRow =
    +        math.floor(rowIndex * 1.0 / blockMapper.rowsPerBlock).toInt
    +      val blockID = blockMapper.getBlockIdByMappedCoord(mappedRow, mappedCol)
    +      out.collect((blockID, rowIndex % blockMapper.rowsPerBlock, slice))
    +    }
    +  }
    +
    +  def sliceVector(v: Vector): List[(Int, Vector)] = {
    +
    +    def getSliceByColIndex(index: Int): Int =
    --- End diff --
    
    This method is used once. Please remove this method.


> Distributed Linear Algebra: block-based matrix
> ----------------------------------------------
>
>                 Key: FLINK-3920
>                 URL: https://issues.apache.org/jira/browse/FLINK-3920
>             Project: Flink
>          Issue Type: New Feature
>          Components: Machine Learning Library
>            Reporter: Simone Robutti
>            Assignee: Simone Robutti
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to