Hi,

Which version of Flink are you using? This issue should have been resolved as 
of 1.3.0 at the latest:
https://issues.apache.org/jira/browse/FLINK-5874
Currently, arrays used as keys should be rejected. There is also an issue that 
aims to re-introduce proper support for arrays as keys:
https://issues.apache.org/jira/browse/FLINK-5299
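
To illustrate the underlying behaviour outside of Flink, here is a minimal 
plain-Java sketch. It shows that arrays use identity-based equals/hashCode, 
while java.util.Arrays compares by content. The class names ArrayKeyDemo and 
BytesKey are hypothetical and not part of Flink; whether such a wrapper is 
suitable as a key type in a given Flink version is an assumption I have not 
verified.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class ArrayKeyDemo {

    /** Hypothetical wrapper (not a Flink class) with content-based equality. */
    static final class BytesKey {
        private final byte[] bytes;

        BytesKey(byte[] bytes) {
            // Defensive copy so later changes to the caller's array do not alter the key.
            this.bytes = bytes.clone();
        }

        @Override
        public boolean equals(Object o) {
            return o instanceof BytesKey && Arrays.equals(bytes, ((BytesKey) o).bytes);
        }

        @Override
        public int hashCode() {
            return Arrays.hashCode(bytes);
        }
    }

    public static void main(String[] args) {
        byte[] k1 = "a".getBytes(StandardCharsets.UTF_8);
        byte[] k2 = "a".getBytes(StandardCharsets.UTF_8);

        // Arrays inherit equals/hashCode from Object, so both are identity-based.
        System.out.println("k1.equals(k2): " + k1.equals(k2));
        System.out.println("identity hashes: " + k1.hashCode() + " vs " + k2.hashCode());

        // java.util.Arrays compares and hashes by content.
        System.out.println("Arrays.equals: " + Arrays.equals(k1, k2));
        System.out.println("content hashes equal: "
                + (Arrays.hashCode(k1) == Arrays.hashCode(k2)));

        // The wrapper gives equal keys the same hash code.
        System.out.println("wrapped keys equal: "
                + new BytesKey(k1).equals(new BytesKey(k2)));
    }
}
```

The identity hash codes differ from run to run, so the concrete numbers are 
not meaningful. In a job, converting the array to a String (or another type 
with content-based hashCode) before keying would be another option, again 
only as a sketch.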

Best,
Aljoscha

> On 31. Jul 2017, at 15:16, Xu Pingyong <xupingyong...@163.com> wrote:
> 
> Hi Aljoscha:
> 
> The hashCode of a Java array depends on the object reference rather than on 
> its content. If the keyBy field contains an array, two records are 
> hash-partitioned to different streams even though their keys are equal.
> 
> int[] a1 = new int[]{1, 2};  // hashCode is 5592464
> int[] a2 = new int[]{1, 2};  // hashCode is 1830712962
> 
> 
> Streaming job example:
> 
> 
> // Two records whose byte[] keys have equal content but are distinct objects.
> Tuple2<byte[], Integer>[] sources = new Tuple2[]{
>         new Tuple2<>("a".getBytes(), 2),
>         new Tuple2<>("a".getBytes(), 5)};
> 
> final StreamExecutionEnvironment env =
>         StreamExecutionEnvironment.getExecutionEnvironment();
> env.fromElements(sources)
>         .keyBy(0)  // key on the byte[] field
>         .sum(1)
>         .map(new MapFunction<Tuple2<byte[], Integer>, Tuple2<String, Integer>>() {
>             @Override
>             public Tuple2<String, Integer> map(Tuple2<byte[], Integer> value)
>                     throws Exception {
>                 // Convert the byte[] key back to a String for printing.
>                 return new Tuple2<>(new String(value.f0), value.f1);
>             }
>         })
>         .print();
> 
> env.execute();
> 
> 
> The expected result is (a, 7), but that is not what the job actually 
> produces. What do you think about this case?
> 
> 
> Best Regards!
> Xu Pingyong
> 
> 
> 
