I just checked out your branch and ran it with a breakpoint set in the 
CollectHelper. If you look into the (list) accumulator, you can see that the 
same object is always added to it. Strangely enough, object re-use is disabled 
in the config. I don't have time to look further into it now, but it seems to 
be a problem with the object re-use mode.
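A minimal plain-Java sketch of the symptom described above, assuming the upstream operator overwrites a single mutable record instance for every output and the accumulator stores references to that instance instead of copies. `MutablePair` and `collect` are hypothetical stand-ins, not Flink classes:

```java
import java.util.ArrayList;
import java.util.List;

public class ObjectReuseDemo {

    // Hypothetical mutable pair standing in for a reused Tuple2<Integer, Integer>.
    static final class MutablePair {
        int f0;
        int f1;

        MutablePair(int f0, int f1) {
            this.f0 = f0;
            this.f1 = f1;
        }

        MutablePair copy() {
            return new MutablePair(f0, f1);
        }

        @Override
        public String toString() {
            return "(" + f0 + "," + f1 + ")";
        }
    }

    // Emits every cross-product pair by overwriting ONE reused instance,
    // the way an operator running with object re-use might.
    static List<MutablePair> collect(int n, boolean copyOnAdd) {
        List<MutablePair> result = new ArrayList<>();
        MutablePair reuse = new MutablePair(0, 0);
        for (int i = 1; i <= n; i++) {
            for (int j = 1; j <= n; j++) {
                reuse.f0 = i;
                reuse.f1 = j;
                // Without a copy, the list ends up with n*n references to the same object.
                result.add(copyOnAdd ? reuse.copy() : reuse);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(collect(2, false)); // [(2,2), (2,2), (2,2), (2,2)]
        System.out.println(collect(2, true));  // [(1,1), (1,2), (2,1), (2,2)]
    }
}
```

Copying on add costs one allocation per record; whether that is the right fix here depends on why the same instance reaches the accumulator even with re-use disabled.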

– Ufuk

On 20 Jan 2015, at 20:53, Max Michels <m...@data-artisans.com> wrote:

> Hi everyone,
> 
> I'm running into some problems implementing an Accumulator for
> returning the contents of a DataSet as a list.
> 
> https://github.com/mxm/flink/tree/count/collect
> 
> Basically, it works fine in this test case:
> 
> 
>    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
> 
>    Integer[] input = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
> 
>    DataSet<Integer> data = env.fromElements(input);
> 
>    // count
>    long numEntries = data.count();
>    assertEquals(10, numEntries);
> 
>    // collect
>    ArrayList<Integer> list = (ArrayList<Integer>) data.collect();
>    assertArrayEquals(input, list.toArray());
> 
> 
> But with non-primitive objects such as Tuple2, strange results occur:
> 
> ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
> env.getConfig().disableObjectReuse();
> 
> DataSet<Integer> data = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
> DataSet<Integer> data2 = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
> 
> DataSet<Tuple2<Integer, Integer>> data3 = data.cross(data2);
> 
> // count
> long numEntries = data3.count();
> assertEquals(100, numEntries);
> 
> // collect
> ArrayList<Tuple2<Integer, Integer>> list = (ArrayList<Tuple2<Integer, Integer>>) data3.collect();
> 
> System.out.println(list);
> 
> ....
> 
> Output:
> 
> [(10,10), (10,10), (10,10), (10,10), (10,10), (10,10), (10,10),
> (10,10), (10,10), (10,10), (10,10), (10,10), (10,10), (10,10),
> (10,10), (10,10), (10,10), (10,10), (10,10), (10,10), (10,6), (10,6),
> (10,6), (10,6), (10,6), (10,6), (10,6), (10,6), (10,6), (10,6),
> (10,6), (10,6), (10,6), (10,6), (10,6), (10,6), (10,6), (10,6),
> (10,6), (10,6), (10,7), (10,7), (10,7), (10,7), (10,7), (10,7),
> (10,7), (10,7), (10,7), (10,7), (10,7), (10,7), (10,7), (10,7),
> (10,7), (10,7), (10,7), (10,7), (10,7), (10,7), (10,9), (10,9),
> (10,9), (10,9), (10,9), (10,9), (10,9), (10,9), (10,9), (10,9),
> (10,9), (10,9), (10,9), (10,9), (10,9), (10,9), (10,9), (10,9),
> (10,9), (10,9), (10,8), (10,8), (10,8), (10,8), (10,8), (10,8),
> (10,8), (10,8), (10,8), (10,8), (10,8), (10,8), (10,8), (10,8),
> (10,8), (10,8), (10,8), (10,8), (10,8), (10,8)]
> 
> I assume the problem is the clone() method of the ListAccumulator,
> which only creates a shallow copy. This is fine for accumulators
> that hold primitive values, like IntCounter, but here the list holds
> references to real objects.
> 
> How do we work around this problem?
> 
> Best regards,
> Max
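One possible workaround, sketched under assumptions: copy each element when it is added and when the accumulator is cloned, instead of sharing references. `CopyingListAccumulator` and its `copier` parameter are hypothetical; the class does not implement Flink's actual Accumulator interface, and in Flink the element copy would presumably come from the element type's serializer.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.UnaryOperator;

// Hypothetical list accumulator; NOT Flink's actual ListAccumulator.
public class CopyingListAccumulator<T> implements Cloneable {

    // Produces an independent copy of one element (assumed to be a deep copy).
    private final UnaryOperator<T> copier;
    private final ArrayList<T> localValue = new ArrayList<>();

    public CopyingListAccumulator(UnaryOperator<T> copier) {
        this.copier = copier;
    }

    // Copy on add, so later mutation of a reused input object cannot change the list.
    public void add(T value) {
        localValue.add(copier.apply(value));
    }

    public List<T> getLocalValue() {
        return localValue;
    }

    // Merges copies of the other accumulator's elements into this one.
    public void merge(CopyingListAccumulator<T> other) {
        for (T element : other.localValue) {
            localValue.add(copier.apply(element));
        }
    }

    // Deep clone: a new list holding new element copies, unlike a shallow
    // clone that would share the list and its elements with the original.
    @Override
    public CopyingListAccumulator<T> clone() {
        CopyingListAccumulator<T> result = new CopyingListAccumulator<>(copier);
        for (T element : localValue) {
            result.localValue.add(copier.apply(element));
        }
        return result;
    }

    public static void main(String[] args) {
        CopyingListAccumulator<int[]> acc = new CopyingListAccumulator<>(int[]::clone);
        int[] reuse = new int[2]; // one object, overwritten for every record
        for (int i = 1; i <= 3; i++) {
            reuse[0] = 10;
            reuse[1] = i;
            acc.add(reuse);
        }
        CopyingListAccumulator<int[]> snapshot = acc.clone();
        reuse[1] = 99; // mutating the reused object afterwards affects neither list
        for (int[] pair : snapshot.getLocalValue()) {
            System.out.println(Arrays.toString(pair)); // [10, 1], then [10, 2], then [10, 3]
        }
    }
}
```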
