[ 
https://issues.apache.org/jira/browse/FLINK-3920?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15444761#comment-15444761
 ] 

ASF GitHub Bot commented on FLINK-3920:
---------------------------------------

Github user chiwanpark commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2152#discussion_r76551557
  
    --- Diff: 
flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/distributed/BlockMatrix.scala
 ---
    @@ -0,0 +1,286 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.ml.math.distributed
    +
    +import org.apache.flink.api.common.functions.{MapFunction, 
RichGroupReduceFunction}
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.scala.{createTypeInformation, _}
    +import org.apache.flink.ml.math.Breeze._
    +import org.apache.flink.ml.math.SparseVector
    +import org.apache.flink.ml.math.distributed.BlockMatrix.BlockID
    +import org.apache.flink.util.Collector
    +
    +import scala.collection.JavaConversions._
    +
    +/**
    +  * Distributed Matrix represented as blocks. 
    +  * A BlockMapper instance is used to track blocks of the matrix.
    +  * Every block in a BlockMatrix has an associated ID that also 
    +  * identifies its position in the BlockMatrix.
    +  * @param data
    +  * @param blockMapper
    +  */
    +class BlockMatrix(
    +    val data: DataSet[(BlockID, Block)],
    +    blockMapper: BlockMapper
    +)
    +    extends DistributedMatrix {
    +
    +  val numCols = blockMapper.numCols
    +  val numRows = blockMapper.numRows
    +
    +  val getBlockCols = blockMapper.numBlockCols
    +  val getBlockRows = blockMapper.numBlockRows
    +
    +  val getRowsPerBlock = blockMapper.rowsPerBlock
    +  val getColsPerBlock = blockMapper.colsPerBlock
    +
    +  val getNumBlocks = blockMapper.numBlocks
    +
    +  /**
    +    * Compares the format of two block matrices
    +    * @return
    +    */
    +  def hasSameFormat(other: BlockMatrix): Boolean =
    +    this.numRows == other.numRows && this.numCols == other.numCols &&
    +    this.getRowsPerBlock == other.getRowsPerBlock &&
    +    this.getColsPerBlock == other.getColsPerBlock
    +
    +  /**
    +    * Perform an operation on pairs of block. Pairs are formed taking
    +    * matching blocks from the two matrices that are placed in the same 
position.
    +    * A function is then applied to the pair to return a new block.
    +    * These blocks are then composed in a new block matrix.
    +    */
    +  def blockPairOperation(
    +      fun: (Block, Block) => Block, other: BlockMatrix): BlockMatrix = {
    +    require(hasSameFormat(other))
    +
    +    /*Full outer join on blocks. The full outer join is required because of
    +    the sparse nature of the matrix.
    +    Matching blocks may be missing and a block of zeros is used instead.*/
    +    val processedBlocks =
    +      this.data.fullOuterJoin(other.data).where(0).equalTo(0) {
    +        (left: (BlockID, Block), right: (BlockID, Block)) =>
    +          {
    +
    +            val (id1, block1) = Option(left) match {
    +              case Some((id, block)) => (id, block)
    +              case None =>
    +                (right._1, Block.zero(right._2.getRows, right._2.getCols))
    --- End diff --
    
    The zero-filled matrix can be reused. Please create it once using the 
`RichFlatJoinFunction` class.


> Distributed Linear Algebra: block-based matrix
> ----------------------------------------------
>
>                 Key: FLINK-3920
>                 URL: https://issues.apache.org/jira/browse/FLINK-3920
>             Project: Flink
>          Issue Type: New Feature
>          Components: Machine Learning Library
>            Reporter: Simone Robutti
>            Assignee: Simone Robutti
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to