[
https://issues.apache.org/jira/browse/FLINK-3920?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15444782#comment-15444782
]
ASF GitHub Bot commented on FLINK-3920:
---------------------------------------
Github user chiwanpark commented on a diff in the pull request:
https://github.com/apache/flink/pull/2152#discussion_r76552087
--- Diff:
flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/distributed/DistributedRowMatrix.scala
---
@@ -159,3 +183,68 @@ case class IndexedRow(rowIndex: MatrixRowIndex, values: Vector) extends Ordered[
override def toString: String = s"($rowIndex, ${values.toString})"
}
+
+/**
+ * Serializable Reduction function used by the toBlockMatrix function. Takes an ordered list of
+ * indexed rows and splits those rows to form blocks.
+ */
+class RowGroupReducer(blockMapper: BlockMapper)
+ extends RichGroupReduceFunction[(Int, Int, Vector), (Int, Block)] {
+
+ override def reduce(values: java.lang.Iterable[(Int, Int, Vector)],
+ out: Collector[(Int, Block)]): Unit = {
+
+ val sortedRows = values.toList.sortBy(_._2)
+ val blockID = sortedRows.head._1
+ val coo = for {
+ (_, rowIndex, vec) <- sortedRows
+ (colIndex, value) <- vec if value != 0
+ } yield (rowIndex, colIndex, value)
+
+ val block: Block = Block(
+ SparseMatrix.fromCOO(
+ blockMapper.rowsPerBlock, blockMapper.colsPerBlock, coo))
+ out.collect((blockID, block))
+ }
+}
+
+class RowSplitter(blockMapper: BlockMapper)
+ extends RichFlatMapFunction[IndexedRow, (Int, Int, Vector)] {
+ override def flatMap(
+ row: IndexedRow, out: Collector[(Int, Int, Vector)]): Unit = {
+ val IndexedRow(rowIndex, vector) = row
+ val splitRow = sliceVector(vector)
+ for ((mappedCol, slice) <- splitRow) {
+ val mappedRow =
+ math.floor(rowIndex * 1.0 / blockMapper.rowsPerBlock).toInt
+ val blockID = blockMapper.getBlockIdByMappedCoord(mappedRow, mappedCol)
+ out.collect((blockID, rowIndex % blockMapper.rowsPerBlock, slice))
+ }
+ }
+
+ def sliceVector(v: Vector): List[(Int, Vector)] = {
+
+ def getSliceByColIndex(index: Int): Int =
--- End diff --
This method is only used once. Please inline its logic at the call site and remove the method.
> Distributed Linear Algebra: block-based matrix
> ----------------------------------------------
>
> Key: FLINK-3920
> URL: https://issues.apache.org/jira/browse/FLINK-3920
> Project: Flink
> Issue Type: New Feature
> Components: Machine Learning Library
> Reporter: Simone Robutti
> Assignee: Simone Robutti
>
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)