[ https://issues.apache.org/jira/browse/MAHOUT-1820?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15217019#comment-15217019 ]

ASF GitHub Bot commented on MAHOUT-1820:
----------------------------------------

Github user andrewpalumbo commented on a diff in the pull request:

    https://github.com/apache/mahout/pull/207#discussion_r57815021
  
    --- Diff: flink/src/main/scala/org/apache/mahout/flinkbindings/blas/package.scala ---
    @@ -0,0 +1,51 @@
    +/**
    +  * Licensed to the Apache Software Foundation (ASF) under one
    +  * or more contributor license agreements. See the NOTICE file
    +  * distributed with this work for additional information
    +  * regarding copyright ownership. The ASF licenses this file
    +  * to you under the Apache License, Version 2.0 (the
    +  * "License"); you may not use this file except in compliance
    +  * with the License. You may obtain a copy of the License at
    +  *
    +  * http://www.apache.org/licenses/LICENSE-2.0
    +  *
    +  * Unless required by applicable law or agreed to in writing,
    +  * software distributed under the License is distributed on an
    +  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    +  * KIND, either express or implied. See the License for the
    +  * specific language governing permissions and limitations
    +  * under the License.
    +  */
    +package org.apache.mahout.flinkbindings
    +
    +import java.lang.Iterable
    +
    +import org.apache.flink.api.common.functions.RichMapPartitionFunction
    +import org.apache.flink.api.scala._
    +import org.apache.flink.util.Collector
    +
    +import scala.collection._
    +
    +package object blas {
    +
    +  /**
    +    * To compute tuples (PartitionIndex, PartitionElementCount)
    +    *
    +    * @param drmDataSet
    +    * @tparam K
    +    * @return (PartitionIndex, PartitionElementCount)
    +    */
    +  //TODO: Remove this when FLINK-3657 is merged into Flink codebase and
    +  // replace by call to DataSetUtils.countElementsPerPartition(DataSet[K])
    +  private[mahout] def countsPerPartition[K](drmDataSet: DataSet[K]): DataSet[(Int, Int)] = {
    +    drmDataSet.mapPartition {
    +      new RichMapPartitionFunction[K, (Int, Int)] {
    +        override def mapPartition(iterable: Iterable[K], collector: Collector[(Int, Int)]) = {
    +          val count: Int = Iterator(iterable).size
    --- End diff --
    
    +1 to this.
    
    I'm not sure if it matters, but you could use iterable.toIterator.size.
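For reference, here is a minimal Scala sketch (not code from the PR) of why the counting expression matters. Iterator.apply wraps its arguments, so Iterator(iterable) is a one-element iterator that contains the Iterable itself, and its size is always 1. Because Flink passes mapPartition a java.lang.Iterable, the suggested iterable.toIterator.size probably also needs a Java-to-Scala conversion first. The sketch assumes scala.collection.JavaConverters._ for asScala. The object and method names are hypothetical.

```scala
import scala.collection.JavaConverters._

// Hypothetical helper object, for illustration only.
object IterableCountSketch {
  // Counts the elements of a java.lang.Iterable, such as the one Flink passes
  // to MapPartitionFunction.mapPartition, by converting it to a Scala Iterable.
  def countElements[K](iterable: java.lang.Iterable[K]): Int =
    iterable.asScala.size

  // Reproduces Iterator(iterable).size: Iterator.apply treats the Iterable as
  // a single element, so the result is 1 regardless of the partition's contents.
  def wrappedSize[K](iterable: java.lang.Iterable[K]): Int =
    Iterator(iterable).size

  def main(args: Array[String]): Unit = {
    val partition: java.lang.Iterable[String] = java.util.Arrays.asList("a", "b", "c", "d")
    println(wrappedSize(partition))   // 1
    println(countElements(partition)) // 4
  }
}
```

Converting with asScala keeps the count lazy over the partition's iterator and does not copy the elements into a collection first.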


> Add a method to generate Tuple<PartitionId, Partition elements count> to support Flink backend
> -----------------------------------------------------------------------------------------------
>
>                 Key: MAHOUT-1820
>                 URL: https://issues.apache.org/jira/browse/MAHOUT-1820
>             Project: Mahout
>          Issue Type: Improvement
>          Components: Flink
>    Affects Versions: 0.11.2
>            Reporter: Suneel Marthi
>            Assignee: Suneel Marthi
>             Fix For: 0.12.0
>
>
> Add a method, countElementsPerPartition(), that returns a Tuple2<PartitionID,
> PartitionCount>. This is a temporary fix until the PR for FLINK-3657 is
> merged into the Flink codebase.
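The intended output, one (PartitionIndex, PartitionElementCount) tuple per partition, can be sketched locally without Flink. This is a minimal simulation under stated assumptions, not the Mahout or Flink implementation. A Seq of java.lang.Iterable values stands in for the partitions of a DataSet, and each position is used as the partition index. Inside a real RichMapPartitionFunction, the index would typically come from getRuntimeContext.getIndexOfThisSubtask instead. The names PartitionCountSketch and countsPerPartitionLocal are hypothetical.

```scala
import scala.collection.JavaConverters._

// Hypothetical local simulation, for illustration only.
object PartitionCountSketch {
  // Each element of `partitions` plays the role of one partition's
  // java.lang.Iterable. Its position in the Seq stands in for the subtask
  // index that Flink would supply at runtime.
  def countsPerPartitionLocal[K](partitions: Seq[java.lang.Iterable[K]]): Seq[(Int, Int)] =
    partitions.zipWithIndex.map { case (partition, index) =>
      // Emit (PartitionIndex, PartitionElementCount) for this partition.
      (index, partition.asScala.size)
    }

  def main(args: Array[String]): Unit = {
    val partitions: Seq[java.lang.Iterable[String]] = Seq(
      java.util.Arrays.asList("a", "b", "c"),
      java.util.Collections.emptyList[String](),
      java.util.Arrays.asList("d", "e")
    )
    println(countsPerPartitionLocal(partitions)) // List((0,3), (1,0), (2,2))
  }
}
```

Empty partitions still produce a tuple with a count of 0. Downstream code that computes row offsets from these counts generally relies on that.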



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
