Hi all,

I've been having some problems with RichMapPartitionFunction. I am trying to transpose the local matrix of LabeledVectors that each partition holds. First, I tried to convert the iterable into an array, without success. Then I used one buffer per column to store the values.
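For reference, the per-partition transpose described above can be sketched in plain Scala without Flink: consume the rows once, turn each row into bytes with the encoded label appended as the last column, and let the standard library's `transpose` swap rows and columns. `Row` (a stand-in for FlinkML's `LabeledVector`), `toColumns` and `nColumns` are hypothetical names used only for this sketch; it does not claim to fix the behaviour described below.

```scala
// Hypothetical stand-in for FlinkML's LabeledVector (label + dense features),
// used only so this sketch runs without a Flink dependency.
final case class Row(label: Double, features: Array[Double])

object TransposeSketch {
  /** Transposes the rows of one partition into one Array[Byte] per column.
    * Columns 0 .. nColumns-2 hold the features in row order; column
    * nColumns-1 holds the class label encoded through classMap. */
  def toColumns(rows: Iterator[Row], nColumns: Int, classMap: Map[Double, Byte]): Array[Array[Byte]] = {
    // Consume the iterator exactly once, turning every row into a byte row.
    val byteRows: Array[Array[Byte]] =
      rows.map(r => r.features.map(_.toByte) :+ classMap(r.label)).toArray
    // Every byte row needs one entry per column, otherwise transpose would fail.
    require(byteRows.forall(_.length == nColumns), "row width != nColumns")
    // An empty partition still yields nColumns (empty) columns.
    if (byteRows.isEmpty) Array.fill(nColumns)(Array.empty[Byte])
    else byteRows.transpose
  }
}
```

For example, two rows with two features each and `nColumns = 3` give three columns of two bytes each, the last one holding the encoded labels.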

Neither approach has worked. For example, for partition 7 and feature 10 the vector is empty, whereas for the same partition and feature 11 it contains 200 elements. The affected partitions and features change on every execution.

I suspect the problem is calling collect outside the loop over the iterable.

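// Imports the snippet relies on (FlinkML-era package names)
import scala.collection.JavaConverters._
import org.apache.flink.api.common.functions.RichMapPartitionFunction
import org.apache.flink.ml.common.LabeledVector
import org.apache.flink.util.Collector

new RichMapPartitionFunction[LabeledVector, ((Int, Int), Array[Byte])]() {
  override def mapPartition(it: java.lang.Iterable[LabeledVector],
                            out: Collector[((Int, Int), Array[Byte])]): Unit = {
    val index = getRuntimeContext().getIndexOfThisSubtask()
    // One buffer per column: nFeatures - 1 features plus the class label
    val mat = for (i <- 0 until nFeatures) yield new scala.collection.mutable.ListBuffer[Byte]
    for (reg <- it.asScala) {
      for (i <- 0 until (nFeatures - 1)) mat(i) += reg.vector(i).toByte
      mat(nFeatures - 1) += classMap(reg.label)
    }
    // Emit one array per column, keyed by (feature, partition index); index < numPartitions
    for (i <- 0 until nFeatures) out.collect((i, index) -> mat(i).toArray)
  }
}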
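Within a single mapPartition call, every record appends exactly one byte to each of the nFeatures buffers, so all arrays emitted by that call should have the same length; emitting after the loop is itself a common pattern, since a mapPartition function may call collect at any point before it returns. One way to check whether the empty arrays come from this function or from later in the job is to run the same body locally. In this sketch, `Record` (a stand-in for `LabeledVector`), `LocalMapPartitionCheck.run` and the `emit` callback (playing the role of `out.collect`) are hypothetical and not part of the Flink API.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical stand-in for FlinkML's LabeledVector, so the body runs without Flink.
final case class Record(label: Double, vector: Array[Double])

object LocalMapPartitionCheck {
  /** Same logic as the mapPartition body above; `emit` plays the role of out.collect. */
  def run(records: Iterable[Record], nFeatures: Int, classMap: Map[Double, Byte], index: Int,
          emit: (((Int, Int), Array[Byte])) => Unit): Unit = {
    // One distinct buffer per column (fill re-evaluates its argument each time).
    val mat = Vector.fill(nFeatures)(ListBuffer.empty[Byte])
    for (reg <- records) {
      for (i <- 0 until nFeatures - 1) mat(i) += reg.vector(i).toByte
      mat(nFeatures - 1) += classMap(reg.label)
    }
    // Emitting after the loop: each buffer now holds one entry per record.
    for (i <- 0 until nFeatures) emit((i, index) -> mat(i).toArray)
  }
}
```

If every emitted array has the same length locally, the empty vectors seen in the job are more likely introduced after this function than by calling collect outside the loop.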

Regards
