[FLINK-6549] [datastream] Improve error message for type mismatches with side outputs
This closes #4663.

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5fc1103a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5fc1103a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5fc1103a

Branch: refs/heads/master
Commit: 5fc1103ab6cb2819d693530f2ba4e09192a9c88d
Parents: d1acf56
Author: Bowen Li <bowenl...@gmail.com>
Authored: Sat Sep 9 00:03:13 2017 -0700
Committer: zentol <ches...@apache.org>
Committed: Tue Sep 19 11:06:52 2017 +0200

----------------------------------------------------------------------
 .../flink/streaming/runtime/tasks/OperatorChain.java | 10 ++++++++++
 1 file changed, 10 insertions(+)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/5fc1103a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
index b15f126..3827982 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
@@ -526,6 +526,16 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Strea
                 StreamRecord<T> copy = castRecord.copy(serializer.copy(castRecord.getValue()));
                 operator.setKeyContextElement1(copy);
                 operator.processElement(copy);
+            } catch (ClassCastException e) {
+                // Enrich error message
+                ClassCastException replace = new ClassCastException(
+                    String.format("%s. Failed to push OutputTag with id '%s' to operator. " +
+                        "This can occur when multiple OutputTags with different types " +
+                        "but identical names are being used.",
+                        e.getMessage(), outputTag.getId()));
+
+                throw new ExceptionInChainedOperatorException(replace);
+
             } catch (Exception e) {
                 throw new ExceptionInChainedOperatorException(e);
             }