Hi Sergio,

could you please provide a complete example (including input data) that reproduces your problem? It is hard to tell what's going wrong when one only sees a fraction of the program.

Cheers,
Till

On Tue, Mar 29, 2016 at 5:58 PM, Sergio Ramírez <srami...@correo.ugr.es> wrote:

> Hi again,
>
> I've not been able to solve the problem with the instruction you gave me.
> I've also tried static variables (matrices), again unsuccessfully. I've
> also tried this simpler code:
>
> def mapPartition(it: java.lang.Iterable[LabeledVector],
>     out: Collector[((Int, Int), Int)]): Unit = {
>   val index = getRuntimeContext().getIndexOfThisSubtask() // Partition index
>   var ninst = 0
>   for(reg <- it.asScala) {
>     requireByteValues(reg.vector)
>     ninst += 1
>   }
>   for(i <- 0 until nFeatures) out.collect((i, index) -> ninst)
> }
>
> The result is as follows:
>
> Attribute 10, first eight partitions:
> ((10,0),201),((10,1),200),((10,2),201),((10,3),200),((10,4),200),((10,5),201),((10,6),201),((10,7),201)
> Attribute 12, first eight partitions:
> ((12,0),201),((12,1),201),((12,2),201),((12,3),200),((12,4),201),((12,5),200),((12,6),200),((12,7),201)
>
> As you can see, for block 6, for example, different numbers of instances
> are shown (201 for attribute 10, 200 for attribute 12), which should be
> impossible.
>
>
> On 24/03/16 22:39, Chesnay Schepler wrote:
>
>> Haven't looked too deeply into this, but this sounds like object reuse is
>> enabled, at which point buffering values effectively causes you to store
>> the same value multiple times.
>>
>> Can you try disabling object reuse using
>> env.getConfig().disableObjectReuse()?
>>
>> On 22.03.2016 16:53, Sergio Ramírez wrote:
>>
>>> Hi all,
>>>
>>> I've been having some problems with RichMapPartitionFunction. First, I
>>> tried to convert the iterable into an array, unsuccessfully. Then I
>>> used some buffers to store the values per column. I am trying to transpose
>>> the local matrix of LabeledVectors that I have in each partition.
>>>
>>> None of these solutions has worked. For example, for partition 7 and
>>> feature 10, the vector is empty, whereas for the same partition and feature
>>> 11, the vector contains 200 elements. And this changes on each execution:
>>> different partitions and features are affected.
>>>
>>> I think there is a problem with using the collect method outside the
>>> iterable loop.
>>>
>>> new RichMapPartitionFunction[LabeledVector, ((Int, Int), Array[Byte])]() {
>>>   def mapPartition(it: java.lang.Iterable[LabeledVector],
>>>       out: Collector[((Int, Int), Array[Byte])]): Unit = {
>>>     val index = getRuntimeContext().getIndexOfThisSubtask()
>>>     val mat = for (i <- 0 until nFeatures) yield new scala.collection.mutable.ListBuffer[Byte]
>>>     for(reg <- it.asScala) {
>>>       for (i <- 0 until (nFeatures - 1)) mat(i) += reg.vector(i).toByte
>>>       mat(nFeatures - 1) += classMap(reg.label)
>>>     }
>>>     for(i <- 0 until nFeatures) out.collect((i, index) -> mat(i).toArray) // numPartitions
>>>   }
>>> }
>>>
>>> Regards
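
Note on the object-reuse suggestion quoted in this thread: the sketch below is a plain Scala simulation and does not use Flink at all. It only illustrates the general mechanism, namely that buffering references to an instance which the producer overwrites leaves every buffered element holding the last value, whereas copying the field out immediately preserves each value. The names Record, ReusingIterator and ObjectReuseSketch are hypothetical and exist only for this illustration. The thread does not establish that this is the cause of the behaviour reported here.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical mutable record, standing in for a value that a runtime might reuse.
final class Record(var value: Int)

// Hypothetical iterator that hands out the SAME Record instance on every call,
// overwriting its field each time (a simulation of object reuse, not Flink code).
final class ReusingIterator(values: Seq[Int]) extends Iterator[Record] {
  private val shared = new Record(0)
  private val underlying = values.iterator
  def hasNext: Boolean = underlying.hasNext
  def next(): Record = { shared.value = underlying.next(); shared }
}

object ObjectReuseSketch {
  // Buffers references: every buffered element is the one shared instance,
  // so after the loop they all show the last value that was written.
  def bufferReferences(it: Iterator[Record]): List[Int] = {
    val buf = ListBuffer.empty[Record]
    for (r <- it) buf += r
    buf.toList.map(_.value)
  }

  // Copies the primitive field out immediately, so each value is preserved.
  def bufferCopies(it: Iterator[Record]): List[Int] = {
    val buf = ListBuffer.empty[Int]
    for (r <- it) buf += r.value
    buf.toList
  }

  def main(args: Array[String]): Unit = {
    println(bufferReferences(new ReusingIterator(Seq(1, 2, 3)))) // List(3, 3, 3)
    println(bufferCopies(new ReusingIterator(Seq(1, 2, 3))))     // List(1, 2, 3)
  }
}
```

In a setting where instances really are reused, the second pattern (or an explicit copy of the whole record before buffering) is the usual way to keep buffered values distinct.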