[
https://issues.apache.org/jira/browse/FLINK-7251?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16546532#comment-16546532
]
ASF GitHub Bot commented on FLINK-7251:
---------------------------------------
Github user aljoscha commented on a diff in the pull request:
https://github.com/apache/flink/pull/6120#discussion_r203003680
--- Diff:
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
---
@@ -237,19 +248,81 @@ private boolean
validateKeyTypeIsHashable(TypeInformation<?> type) {
}
/**
- * Gets the type of the key by which the stream is partitioned.
- * @return The type of the key by which the stream is partitioned.
+ * Tries to fill in the type information. Type information can be
filled in
+ * later when the program uses a type hint. This method checks whether
the
+ * type information has ever been accessed before and does not allow
+ * modifications if the type was accessed already. This ensures
consistency
+ * by making sure different parts of the operation do not assume
different
+ * type information.
+ *
+ * @param keyType The type information to fill in.
+ *
+ * @throws IllegalStateException Thrown, if the type information has
been accessed before.
+ */
+ private void setKeyType(TypeInformation<KEY> keyType) {
+ if (typeUsed) {
+ throw new IllegalStateException(
+ "TypeInformation cannot be filled in
for the type after it has been used. "
+ + "Please make sure
that the type info hints are the first call after "
+ + "the keyBy() function
before any other access.");
+ }
+ this.keyType = keyType;
+ }
+
+ /**
+ * Returns the key type of this {@code KeyedStream} as a {@link
TypeInformation}. Once
+ * this is used once the key type cannot be changed anymore using
{@link #returns(TypeInformation)}.
+ *
+ * @return The output type of this {@code KeyedStream}
*/
@Internal
public TypeInformation<KEY> getKeyType() {
- return keyType;
+ if (keyType instanceof MissingTypeInfo) {
+ MissingTypeInfo typeInfo = (MissingTypeInfo)
this.keyType;
+ throw new InvalidTypesException(
+ "The key type of key selector '"
+ +
typeInfo.getFunctionName()
+ + "' could not be
determined automatically, due to type erasure. "
+ + "You can give type
information hints by using the returns(...) "
+ + "method on the result
of the transformation call, or by letting "
+ + "your selector
implement the 'ResultTypeQueryable' "
+ + "interface.",
typeInfo.getTypeException());
+ }
+
+ // perform the validation when the type is used for the first
time
+ if (!typeUsed) {
+ typeUsed = true;
+ validateKeyType(keyType);
+ }
+
+ return this.keyType;
}
@Override
protected DataStream<T> setConnectionType(StreamPartitioner<T>
partitioner) {
throw new UnsupportedOperationException("Cannot override
partitioning for KeyedStream.");
}
+ //
------------------------------------------------------------------------
+ // Type hinting
+ //
------------------------------------------------------------------------
+
+ /**
+ * Adds a type information hint about the key type of a key selector.
This method
+ * can be used in cases where Flink cannot determine automatically what
the produced
+ * type of a key selector is. That can be the case if the selector uses
generic type variables
+ * in the return type that cannot be inferred from the input type.
+ *
+ * @param typeInfo type information as a key type hint
+ * @return This operator with a given key type hint.
+ */
+ public KeyedStream<T, KEY> returns(TypeInformation<KEY> typeInfo) {
--- End diff --
Should probably make this `@PublicEvolving` for now.
> Merge the flink-java8 project into flink-core
> ---------------------------------------------
>
> Key: FLINK-7251
> URL: https://issues.apache.org/jira/browse/FLINK-7251
> Project: Flink
> Issue Type: Improvement
> Components: Build System
> Reporter: Stephan Ewen
> Assignee: Timo Walther
> Priority: Major
> Labels: pull-request-available
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)