Hi all,
I've been having some problems with RichMapPartitionFunction. I am trying to
transpose the local matrix of LabeledVectors that I have in each partition.
First, I tried to convert the iterable into an array, without success. Then I
used some buffers to store the values per column. Neither approach has worked.
For example, for partition 7 and feature 10 the vector is empty, whereas for
the same partition and feature 11 the vector contains 200 elements. This
changes on each execution: different partitions and features are affected.
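For reference, this is a Flink-free sketch of the per-partition transposition I am aiming for. It is plain Scala, and `Row`, `LocalTranspose` and `transpose` are hypothetical names: `Row` stands in for a LabeledVector with dense features. As in my mapPartition code, the first nFeatures - 1 columns hold feature values and the last column holds the encoded class label.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical stand-in for a LabeledVector: a class label plus dense features.
final case class Row(label: Double, features: Array[Double])

object LocalTranspose {
  // Transposes the rows of one partition into one byte array per column.
  // Columns 0 until nFeatures - 1 hold feature values; the last column holds
  // the encoded class label (classMap is a hypothetical label -> Byte mapping).
  def transpose(rows: Iterable[Row], nFeatures: Int, classMap: Map[Double, Byte]): Array[Array[Byte]] = {
    // One independent buffer per column (Array.fill re-evaluates its argument).
    val columns = Array.fill(nFeatures)(ListBuffer.empty[Byte])
    for (row <- rows) {
      for (i <- 0 until nFeatures - 1) columns(i) += row.features(i).toByte
      columns(nFeatures - 1) += classMap(row.label)
    }
    // Every column ends up with exactly one entry per input row.
    columns.map(_.toArray)
  }
}
```

With two rows, `Row(1.0, Array(3.0, 4.0))` and `Row(0.0, Array(5.0, 6.0))`, and nFeatures = 3, each of the three resulting columns has length 2. That is the invariant that the distributed version seems to break.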
I think the problem is related to calling the collect method outside the loop
over the iterable. Here is the code:
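import org.apache.flink.api.common.functions.RichMapPartitionFunction
import org.apache.flink.ml.common.LabeledVector
import org.apache.flink.util.Collector
import scala.collection.JavaConverters._
import scala.collection.mutable.ListBuffer

// nFeatures (number of columns, class label included) and classMap
// (label -> Byte) are defined in the enclosing scope.
new RichMapPartitionFunction[LabeledVector, ((Int, Int), Array[Byte])]() {
  override def mapPartition(it: java.lang.Iterable[LabeledVector],
      out: Collector[((Int, Int), Array[Byte])]): Unit = {
    // Index of this parallel subtask, used as the partition id in the key
    val index = getRuntimeContext().getIndexOfThisSubtask()
    // One buffer per column of the transposed local matrix
    val mat = Array.fill(nFeatures)(ListBuffer.empty[Byte])
    for (reg <- it.asScala) {
      // Feature values go to columns 0 .. nFeatures - 2
      for (i <- 0 until (nFeatures - 1)) mat(i) += reg.vector(i).toByte
      // The encoded class label goes to the last column
      mat(nFeatures - 1) += classMap(reg.label)
    }
    // Emit one ((column, partition), values) record per column
    for (i <- 0 until nFeatures) out.collect((i, index) -> mat(i).toArray)
  }
}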
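To narrow down which partitions are affected, a check like the following could be run over the output records once they are gathered on the client, for example with DataSet.collect(). It is plain Scala, and `ColumnCheck` and `inconsistentPartitions` are hypothetical names. It assumes that in a correct transpose every column of a given partition has the same length. It returns, for each partition that violates this, the length of each column.

```scala
object ColumnCheck {
  // Groups ((column, partition), bytes) records by partition and keeps only the
  // partitions whose columns do not all have the same number of entries.
  def inconsistentPartitions(records: Seq[((Int, Int), Array[Byte])]): Map[Int, Map[Int, Int]] =
    records
      .groupBy { case ((_, partition), _) => partition }
      .map { case (partition, recs) =>
        partition -> recs.map { case ((column, _), bytes) => column -> bytes.length }.toMap
      }
      .filter { case (_, lengths) => lengths.values.toSet.size > 1 }
}
```

For the case described above (partition 7, feature 10 empty, feature 11 with 200 elements), this would report partition 7 with the lengths of both columns.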
Regards