Hello again:

Is there any news about this problem with the RichMapPartitionFunction?

Thank you

On 06/04/16 17:01, Sergio Ramírez wrote:
Hello,

OK, please find attached the test code and the input data.

Cheers

On 31/03/16 10:07, Till Rohrmann wrote:
Hi Sergio,

could you please provide a complete example (including input data) to
reproduce your problem. It is hard to tell what's going wrong when one only
sees a fraction of the program.

Cheers,
Till

On Tue, Mar 29, 2016 at 5:58 PM, Sergio Ramírez <srami...@correo.ugr.es> wrote:

Hi again,

I haven't been able to solve the problem with the suggestion you gave me. I also tried static variables (matrices), without success. Finally, I tried this simpler code:


def mapPartition(it: java.lang.Iterable[LabeledVector],
                 out: Collector[((Int, Int), Int)]): Unit = {
  val index = getRuntimeContext().getIndexOfThisSubtask() // Partition index
  var ninst = 0
  for (reg <- it.asScala) {
    requireByteValues(reg.vector)
    ninst += 1
  }
  for (i <- 0 until nFeatures) out.collect((i, index) -> ninst)
}

The result is as follows:

Attribute 10, first eight partitions (0 to 7):
((10,0),201),((10,1),200),((10,2),201),((10,3),200),((10,4),200),((10,5),201),((10,6),201),((10,7),201)
Attribute 12, first eight partitions (0 to 7):
((12,0),201),((12,1),201),((12,2),201),((12,3),200),((12,4),201),((12,5),200),((12,6),200),((12,7),201)

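As you can see, for partition 6, for example, attribute 10 reports 201 instances and attribute 12 reports 200. That should be impossible: the same counter ninst is emitted for every attribute of a partition, so all attributes of one partition must show the same count.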
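To list every partition whose counts disagree, not only partition 6, the pairs above can be grouped by partition index. A minimal plain-Scala sketch, using only the values quoted in this message and assuming the pairs have already been brought to the client (e.g. with collect()):

```scala
object CountCheck {
  // ((attribute, partition), count) pairs copied from the output above
  val observed: Seq[((Int, Int), Int)] = Seq(
    ((10, 0), 201), ((10, 1), 200), ((10, 2), 201), ((10, 3), 200),
    ((10, 4), 200), ((10, 5), 201), ((10, 6), 201), ((10, 7), 201),
    ((12, 0), 201), ((12, 1), 201), ((12, 2), 201), ((12, 3), 200),
    ((12, 4), 201), ((12, 5), 200), ((12, 6), 200), ((12, 7), 201)
  )

  // Returns the partition indices that report more than one distinct count.
  def inconsistentPartitions(pairs: Seq[((Int, Int), Int)]): Seq[Int] =
    pairs
      .groupBy { case ((_, partition), _) => partition }
      .collect { case (partition, ps) if ps.map(_._2).distinct.size > 1 => partition }
      .toSeq
      .sorted

  def main(args: Array[String]): Unit =
    println(inconsistentPartitions(observed).mkString(", ")) // 1, 4, 5, 6
}
```

On this data, partitions 1, 4, 5 and 6 report different counts for attributes 10 and 12.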


On 24/03/16 22:39, Chesnay Schepler wrote:

I haven't looked too deeply into this, but it sounds like object reuse is enabled, in which case buffering the input values effectively stores the same object multiple times.

Can you try disabling object reuse using
env.getConfig().disableObjectReuse()?
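The effect described above can be reproduced without Flink. In the plain-Scala sketch below, a hypothetical ReusingIterator mimics a runtime that hands out the same mutable record on every next() call; buffering the references leaves every buffered entry pointing at the last record, while copying the field values does not. This only illustrates the general pitfall; it is not Flink's actual iterator implementation, and MutableRecord and ReusingIterator are made-up names.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical mutable record standing in for a reused input object.
final class MutableRecord(var value: Int)

// Hypothetical iterator that mutates and returns the same instance each time,
// which is what an object-reusing runtime is allowed to do.
final class ReusingIterator(values: Seq[Int]) extends Iterator[MutableRecord] {
  private val shared = new MutableRecord(0)
  private val underlying = values.iterator
  def hasNext: Boolean = underlying.hasNext
  def next(): MutableRecord = { shared.value = underlying.next(); shared }
}

object ObjectReuseDemo {
  // Buffers the references: all entries end up aliasing one object.
  def bufferReferences(values: Seq[Int]): Seq[Int] = {
    val buf = ListBuffer.empty[MutableRecord]
    for (r <- new ReusingIterator(values)) buf += r
    buf.map(_.value).toList
  }

  // Copies the primitive field out of each record before buffering.
  def bufferCopies(values: Seq[Int]): Seq[Int] = {
    val buf = ListBuffer.empty[Int]
    for (r <- new ReusingIterator(values)) buf += r.value
    buf.toList
  }

  def main(args: Array[String]): Unit = {
    println(bufferReferences(Seq(1, 2, 3))) // List(3, 3, 3)
    println(bufferCopies(Seq(1, 2, 3)))     // List(1, 2, 3)
  }
}
```

Note that the transpose code in the original message copies primitive Byte values into its buffers, which corresponds to the copying variant here.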

On 22.03.2016 16:53, Sergio Ramírez wrote:

Hi all,

I've been having some problems with RichMapPartitionFunction. I am trying to transpose the local matrix of LabeledVectors in each partition. First, I tried converting the iterable into an array, without success. Then I used buffers to store the values of each column.

Neither approach has worked. For example, for partition 7 and feature 10 the vector is empty, whereas for the same partition and feature 11 the vector contains 200 elements. The affected partitions and features change on each execution.

I suspect the problem is related to calling collect outside the loop over the iterable.

new RichMapPartitionFunction[LabeledVector, ((Int, Int), Array[Byte])]() {
  def mapPartition(it: java.lang.Iterable[LabeledVector],
                   out: Collector[((Int, Int), Array[Byte])]): Unit = {
    val index = getRuntimeContext().getIndexOfThisSubtask()
    val mat = for (i <- 0 until nFeatures) yield new scala.collection.mutable.ListBuffer[Byte]
    for (reg <- it.asScala) {
      for (i <- 0 until (nFeatures - 1)) mat(i) += reg.vector(i).toByte
      mat(nFeatures - 1) += classMap(reg.label)
    }
    for (i <- 0 until nFeatures) out.collect((i, index) -> mat(i).toArray) // numPartitions
  }
}
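For reference, the per-partition transpose can be checked in isolation. The plain-Scala sketch below runs the same loop over an in-memory sequence; Row is a hypothetical stand-in for FlinkML's LabeledVector (a label plus dense feature values), and classMap is assumed to map each label to a byte, as in the code above. As in the original, nFeatures counts the class column, so each row carries nFeatures - 1 input features.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical stand-in for LabeledVector: a label plus dense feature values.
final case class Row(label: Double, vector: Array[Double])

object LocalTranspose {
  // Mirrors the loop in the RichMapPartitionFunction: one buffer per column,
  // the last column holding the mapped class label.
  def transpose(rows: Iterable[Row], nFeatures: Int,
                classMap: Map[Double, Byte]): IndexedSeq[Array[Byte]] = {
    val mat = IndexedSeq.fill(nFeatures)(ListBuffer.empty[Byte])
    for (reg <- rows) {
      for (i <- 0 until (nFeatures - 1)) mat(i) += reg.vector(i).toByte
      mat(nFeatures - 1) += classMap(reg.label)
    }
    mat.map(_.toArray)
  }

  def main(args: Array[String]): Unit = {
    val rows = Seq(Row(0.0, Array(1.0, 2.0)), Row(1.0, Array(3.0, 4.0)))
    val cols = transpose(rows, nFeatures = 3, classMap = Map(0.0 -> 0.toByte, 1.0 -> 1.toByte))
    // column 0: 1,3   column 1: 2,4   column 2: 0,1
    cols.zipWithIndex.foreach { case (c, i) => println(s"column $i: ${c.mkString(",")}") }
  }
}
```

Because every row appends exactly one value to every column, all columns produced by one invocation of this loop have the same length; an empty feature-10 column next to a 200-element feature-11 column for the same partition therefore cannot come from a single pass over one input.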

Regards
