[
https://issues.apache.org/jira/browse/FLINK-2220?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14587315#comment-14587315
]
Stephan Ewen commented on FLINK-2220:
-------------------------------------
It should be possible to put in a check that verifies that the POJO types that
are used for keys are actually overriding hashCode() and equals().
The class Keys in the Java API would probably be the right place for that.
Beware: Using POJOs as keys (rather than their primitive fields) is not very
fast for sorts right now, as the POJOs miss the normalized keys support.
A good fix for that would be to allow (byte) arrays as keys.
> Join on Pojo without hashCode() silently fails
> ----------------------------------------------
>
> Key: FLINK-2220
> URL: https://issues.apache.org/jira/browse/FLINK-2220
> Project: Flink
> Issue Type: Bug
> Affects Versions: 0.9, 0.8.1
> Reporter: Marcus Leich
>
> I need to perform a join using a complete Pojo as join key.
> With DOP > 1 this only works if the Pojo comes with a meaningful hasCode()
> implementation, as otherwise equal objects will get hashed to different
> partitions based on their memory address and not on the content.
> I guess it's fine if users are required to implement hasCode() themselves,
> but it would be nice of documentation or better yet, Flink itself could alert
> users that this is a requirement, similar to how Comparable is required for
> keys.
> Use the following code to reproduce the issue:
> public class Pojo implements Comparable<Pojo> {
> public byte[] data;
> public Pojo () {
> }
> public Pojo (byte[] data) {
> this.data = data;
> }
> @Override
> public int compareTo(Pojo o) {
> return UnsignedBytes.lexicographicalComparator().compare(data,
> o.data);
> }
> // uncomment me for making the join work
> /* @Override
> public int hashCode() {
> return Arrays.hashCode(data);
> }*/
> }
> public void testJoin () throws Exception {
> final ExecutionEnvironment env =
> ExecutionEnvironment.createLocalEnvironment();
> env.setParallelism(4);
> DataSet<Tuple2<Pojo, String>> left = env.fromElements(
> new Tuple2<>(new Pojo(new byte[] {0, 24, 23, 1, 3}), "black"),
> new Tuple2<>(new Pojo(new byte[] {0, 14, 13, 14, 13}), "red"),
> new Tuple2<>(new Pojo(new byte[] {1}), "Spark"),
> new Tuple2<>(new Pojo(new byte[] {2}), "good"),
> new Tuple2<>(new Pojo(new byte[] {5}), "bug"));
> DataSet<Tuple2<Pojo, String>> right = env.fromElements(
> new Tuple2<>(new Pojo(new byte[] {0, 24, 23, 1, 3}), "white"),
> new Tuple2<>(new Pojo(new byte[] {0, 14, 13, 14, 13}),
> "green"),
> new Tuple2<>(new Pojo(new byte[] {1}), "Flink"),
> new Tuple2<>(new Pojo(new byte[] {2}), "evil"),
> new Tuple2<>(new Pojo(new byte[] {5}), "fix"));
> // will not print anything unless Pojo has a real hashCode()
> implementation
>
> left.join(right).where(0).equalTo(0).projectFirst(1).projectSecond(1).print();
> }
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)