[FLINK-6549] [datastream] Improve error message for type mismatches with side outputs

This closes #4663.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5fc1103a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5fc1103a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5fc1103a

Branch: refs/heads/master
Commit: 5fc1103ab6cb2819d693530f2ba4e09192a9c88d
Parents: d1acf56
Author: Bowen Li <bowenl...@gmail.com>
Authored: Sat Sep 9 00:03:13 2017 -0700
Committer: zentol <ches...@apache.org>
Committed: Tue Sep 19 11:06:52 2017 +0200

----------------------------------------------------------------------
 .../flink/streaming/runtime/tasks/OperatorChain.java      | 10 ++++++++++
 1 file changed, 10 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/5fc1103a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
index b15f126..3827982 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
@@ -526,6 +526,16 @@ public class OperatorChain<OUT, OP extends 
StreamOperator<OUT>> implements Strea
                                StreamRecord<T> copy = 
castRecord.copy(serializer.copy(castRecord.getValue()));
                                operator.setKeyContextElement1(copy);
                                operator.processElement(copy);
+                       } catch (ClassCastException e) {
+                               // Enrich error message
+                               ClassCastException replace = new 
ClassCastException(
+                                               String.format("%s. Failed to 
push OutputTag with id '%s' to operator. " +
+                                                               "This can occur 
when multiple OutputTags with different types " +
+                                                               "but identical 
names are being used.",
+                                                               e.getMessage(), 
outputTag.getId()));
+
+                               throw new 
ExceptionInChainedOperatorException(replace);
+
                        } catch (Exception e) {
                                throw new 
ExceptionInChainedOperatorException(e);
                        }

Reply via email to