[ https://issues.apache.org/jira/browse/FLINK-5874?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15903625#comment-15903625 ]
ASF GitHub Bot commented on FLINK-5874:
---------------------------------------

Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3501#discussion_r105240608

    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java ---
    @@ -114,9 +121,53 @@ public KeyedStream(DataStream<T> dataStream, KeySelector<T, KEY> keySelector, Ty
     			dataStream.getTransformation(),
     			new KeyGroupStreamPartitioner<>(keySelector, StreamGraphGenerator.DEFAULT_LOWER_BOUND_MAX_PARALLELISM)));
     		this.keySelector = keySelector;
    -		this.keyType = keyType;
    +		this.keyType = validateKeyType(keyType);
     	}
    -
    +
    +	private TypeInformation<KEY> validateKeyType(TypeInformation<KEY> keyType) {
    +		Stack<TypeInformation<?>> stack = new Stack<>();
    +		stack.push(keyType);
    +
    +		while (!stack.isEmpty()) {
    +			TypeInformation<?> typeInfo = stack.pop();
    +
    +			if (!validateKeyTypeIsHashable(typeInfo)) {
    +				throw new InvalidProgramException("This type (" + keyType + ") cannot be used as key.");
    +			}
    +
    +			if (typeInfo instanceof TupleTypeInfoBase) {
    +				for (int i = 0; i < typeInfo.getArity(); i++) {
    +					stack.push(((TupleTypeInfoBase) typeInfo).getTypeAt(i));
    +				}
    +			}
    +		}
    +		return keyType;
    +	}
    +
    +	/**
    +	 * Validates that a given type of element (as encoded by the provided {@link TypeInformation}) can be
    +	 * used as a key in the {@code DataStream.keyBy()} operation.
    +	 *
    +	 * @return {@code false} if:
    +	 * <ol>
    +	 *     <li>it is a POJO type but does not override the {@link #hashCode()} method and relies on
    +	 *     the {@link Object#hashCode()} implementation.</li>
    +	 *     <li>it is an array of any type (see {@link PrimitiveArrayTypeInfo}, {@link BasicArrayTypeInfo},
    +	 *     {@link ObjectArrayTypeInfo}).</li>
    +	 * </ol>,
    +	 * {@code true} otherwise.
    +	 */
    +	private boolean validateKeyTypeIsHashable(TypeInformation<?> type) {
    +		try {
    +			return (type instanceof PojoTypeInfo) ?
    +				!type.getTypeClass().getMethod("hashCode").getDeclaringClass().equals(Object.class) :
    --- End diff --

    this method would be more readable by not inverting here, and returning true as the default.


> Reject arrays as keys in DataStream API to avoid inconsistent hashing
> ---------------------------------------------------------------------
>
>                 Key: FLINK-5874
>                 URL: https://issues.apache.org/jira/browse/FLINK-5874
>             Project: Flink
>          Issue Type: Bug
>          Components: DataStream API
>    Affects Versions: 1.2.0, 1.1.4
>            Reporter: Robert Metzger
>            Assignee: Kostas Kloudas
>            Priority: Blocker
>
> This issue has been reported on the mailing list twice:
> - http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Previously-working-job-fails-on-Flink-1-2-0-td11741.html
> - http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Arrays-values-in-keyBy-td7530.html
>
> The problem is the following: we use just Key[].hashCode() to compute the hash when shuffling data. Java's default hashCode() implementation doesn't take the array's contents into account, but only its memory address. This leads to different hash codes on the sender and receiver side.
> In Flink 1.1 this means that the data is shuffled randomly instead of keyed; in Flink 1.2 the key-groups code detects a violation of the hashing.
> The proper fix would be to rely on Flink's {{TypeComparator}} class, which has a type-specific hashing function. But introducing this change would break compatibility with existing code.
> I'll file a JIRA for the 2.0 changes for that fix.
> For 1.2.1 and 1.3.0 we should at least reject arrays as keys.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
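The POJO check that zentol's review targets can be sketched in plain Java, without any Flink dependency. This is an illustrative standalone version following the reviewer's suggestion (default to `true`, single out the bad case rather than inverting inline); the class and method names here are hypothetical, not Flink's.

```java
import java.lang.reflect.Method;

public class HashCodeCheck {

    // Hypothetical helper: returns true unless the type still relies on the
    // identity-based hashCode() declared on Object, which is unsafe for a
    // key that must hash identically on sender and receiver.
    static boolean overridesHashCode(Class<?> clazz) {
        try {
            Method m = clazz.getMethod("hashCode");
            return m.getDeclaringClass() != Object.class;
        } catch (NoSuchMethodException e) {
            // Unreachable: hashCode() is always declared on Object.
            return false;
        }
    }

    // Example of a POJO that does NOT override hashCode().
    static class NoHashCode {
        int x;
    }

    public static void main(String[] args) {
        System.out.println(overridesHashCode(String.class));     // true
        System.out.println(overridesHashCode(NoHashCode.class)); // false
    }
}
```

Keeping `true` as the fall-through result makes the one rejected case (inheriting `Object.hashCode()`) read as the explicit exception, which is the readability point of the review comment.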
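The inconsistent-hashing problem the issue describes can be reproduced with plain Java; the variable names below are only illustrative of the sender/receiver situation, no Flink is involved.

```java
import java.util.Arrays;

public class ArrayKeyHashing {
    public static void main(String[] args) {
        // Two arrays with identical contents, standing in for the original
        // key on the sender and its deserialized copy on the receiver.
        int[] onSender = {1, 2, 3};
        int[] onReceiver = {1, 2, 3};

        // Object.hashCode() on an array is identity-based (memory address),
        // so the two copies almost never agree on a hash code.
        System.out.println(onSender.hashCode() == onReceiver.hashCode());

        // A content-based hash, like a type-specific TypeComparator would
        // provide, is stable across copies.
        System.out.println(Arrays.hashCode(onSender) == Arrays.hashCode(onReceiver)); // true
    }
}
```

This is exactly why shuffling on `Key[].hashCode()` sends equal keys to different partitions, and why rejecting array keys up front is the safe 1.2.1/1.3.0 behavior.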