GitHub user chiwanpark commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2152#discussion_r76552087
  
    --- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/distributed/DistributedRowMatrix.scala ---
    @@ -159,3 +183,68 @@ case class IndexedRow(rowIndex: MatrixRowIndex, values: Vector) extends Ordered[
     
       override def toString: String = s"($rowIndex, ${values.toString})"
     }
    +
    +/**
    +  * Serializable Reduction function used by the toBlockMatrix function. Takes an ordered list of
    +  * indexed row and split those rows to form blocks.
    +  */
    +class RowGroupReducer(blockMapper: BlockMapper)
    +    extends RichGroupReduceFunction[(Int, Int, Vector), (Int, Block)] {
    +
    +  override def reduce(values: java.lang.Iterable[(Int, Int, Vector)],
    +                      out: Collector[(Int, Block)]): Unit = {
    +
    +    val sortedRows = values.toList.sortBy(_._2)
    +    val blockID = sortedRows.head._1
    +    val coo = for {
    +      (_, rowIndex, vec) <- sortedRows
    +      (colIndex, value) <- vec if value != 0
    +    } yield (rowIndex, colIndex, value)
    +
    +    val block: Block = Block(
    +        SparseMatrix.fromCOO(
    +            blockMapper.rowsPerBlock, blockMapper.colsPerBlock, coo))
    +    out.collect((blockID, block))
    +  }
    +}
    +
    +class RowSplitter(blockMapper: BlockMapper)
    +    extends RichFlatMapFunction[IndexedRow, (Int, Int, Vector)] {
    +  override def flatMap(
    +      row: IndexedRow, out: Collector[(Int, Int, Vector)]): Unit = {
    +    val IndexedRow(rowIndex, vector) = row
    +    val splitRow = sliceVector(vector)
    +    for ((mappedCol, slice) <- splitRow) {
    +      val mappedRow =
    +        math.floor(rowIndex * 1.0 / blockMapper.rowsPerBlock).toInt
    +      val blockID = blockMapper.getBlockIdByMappedCoord(mappedRow, mappedCol)
    +      out.collect((blockID, rowIndex % blockMapper.rowsPerBlock, slice))
    +    }
    +  }
    +
    +  def sliceVector(v: Vector): List[(Int, Vector)] = {
    +
    +    def getSliceByColIndex(index: Int): Int =
    --- End diff --
    
    This method is only used once. Please inline it at its call site and remove the method.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to