[ https://issues.apache.org/jira/browse/FLINK-3920?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15444782#comment-15444782 ]
ASF GitHub Bot commented on FLINK-3920:
---------------------------------------

Github user chiwanpark commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2152#discussion_r76552087

    --- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/distributed/DistributedRowMatrix.scala ---
    @@ -159,3 +183,68 @@ case class IndexedRow(rowIndex: MatrixRowIndex, values: Vector) extends Ordered[
       override def toString: String = s"($rowIndex, ${values.toString})"
     }
    +
    +/**
    + * Serializable Reduction function used by the toBlockMatrix function. Takes an ordered list of
    + * indexed row and split those rows to form blocks.
    + */
    +class RowGroupReducer(blockMapper: BlockMapper)
    +    extends RichGroupReduceFunction[(Int, Int, Vector), (Int, Block)] {
    +
    +  override def reduce(values: java.lang.Iterable[(Int, Int, Vector)],
    +                      out: Collector[(Int, Block)]): Unit = {
    +
    +    val sortedRows = values.toList.sortBy(_._2)
    +    val blockID = sortedRows.head._1
    +    val coo = for {
    +      (_, rowIndex, vec) <- sortedRows
    +      (colIndex, value) <- vec if value != 0
    +    } yield (rowIndex, colIndex, value)
    +
    +    val block: Block = Block(
    +      SparseMatrix.fromCOO(
    +        blockMapper.rowsPerBlock, blockMapper.colsPerBlock, coo))
    +    out.collect((blockID, block))
    +  }
    +}
    +
    +class RowSplitter(blockMapper: BlockMapper)
    +    extends RichFlatMapFunction[IndexedRow, (Int, Int, Vector)] {
    +  override def flatMap(
    +      row: IndexedRow, out: Collector[(Int, Int, Vector)]): Unit = {
    +    val IndexedRow(rowIndex, vector) = row
    +    val splitRow = sliceVector(vector)
    +    for ((mappedCol, slice) <- splitRow) {
    +      val mappedRow =
    +        math.floor(rowIndex * 1.0 / blockMapper.rowsPerBlock).toInt
    +      val blockID = blockMapper.getBlockIdByMappedCoord(mappedRow, mappedCol)
    +      out.collect((blockID, rowIndex % blockMapper.rowsPerBlock, slice))
    +    }
    +  }
    +
    +  def sliceVector(v: Vector): List[(Int, Vector)] = {
    +
    +    def getSliceByColIndex(index: Int): Int =
    --- End diff --

    This method is used once. Please remove this method.

> Distributed Linear Algebra: block-based matrix
> ----------------------------------------------
>
> Key: FLINK-3920
> URL: https://issues.apache.org/jira/browse/FLINK-3920
> Project: Flink
> Issue Type: New Feature
> Components: Machine Learning Library
> Reporter: Simone Robutti
> Assignee: Simone Robutti
>

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)