STORM-2092: throw summarized error messages

Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/ae3b90fd
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/ae3b90fd
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/ae3b90fd

Branch: refs/heads/master
Commit: ae3b90fdfd22dabfe2f72155f48802e1f1cb2d19
Parents: 9888bf6
Author: vesense <best.wang...@163.com>
Authored: Sun Sep 18 11:14:18 2016 +0800
Committer: vesense <best.wang...@163.com>
Committed: Sun Sep 18 11:19:14 2016 +0800

----------------------------------------------------------------------
 .../storm/kafka/trident/TridentKafkaState.java   | 19 +++++++++++++------
 1 file changed, 13 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/ae3b90fd/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaState.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaState.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaState.java
index 7ff34cd..eb6737a 100644
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaState.java
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaState.java
@@ -88,17 +88,24 @@ public class TridentKafkaState implements State {
                 }
             }
 
-            for (int i = 0 ; i < futures.size(); i++) {
-                Future<RecordMetadata> future = futures.get(i);
+            List<ExecutionException> exceptions = new ArrayList<>(futures.size());
+            for (Future<RecordMetadata> future : futures) {
                 try {
                     future.get();
                 } catch (ExecutionException e) {
-                    String errorMsg = "Could not retrieve result for message with key = "
-                            + mapper.getKeyFromTuple(tuples.get(i)) + " from topic = " + topic;
-                    LOG.error(errorMsg, e);
-                    throw new FailedException(errorMsg, e);
+                    exceptions.add(e);
                 }
             }
+
+            if(exceptions.size() > 0){
+                String errorMsg = "Could not retrieve result for messages " + tuples + " from topic = " + topic
+                        + " because of the following exceptions: \n";
+                for (ExecutionException exception : exceptions) {
+                    errorMsg = errorMsg + exception.getMessage() + "\n";
+                }
+                LOG.error(errorMsg);
+                throw new FailedException(errorMsg);
+            }
         } catch (Exception ex) {
             String errorMsg = "Could not send messages " + tuples + " to topic = " + topic;
             LOG.warn(errorMsg, ex);

Reply via email to