Repository: storm
Updated Branches:
  refs/heads/master 4986373c5 -> 2833e3061


STORM-1363: TridentKafkaState should handle null values from 
TridentTupleToKafkaMapper.getMessageFromTuple()

In case a null value comes from the mapper, a warning message is logged. Also 
added the time taken to emit the number of messages to the logs.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/913d0c15
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/913d0c15
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/913d0c15

Branch: refs/heads/master
Commit: 913d0c15d364195760239a93e4efeb1c2550f975
Parents: 4986373
Author: Sachin Pasalkar <[email protected]>
Authored: Tue Feb 14 15:54:23 2017 +0530
Committer: Jungtaek Lim <[email protected]>
Committed: Wed Feb 22 16:36:10 2017 +0900

----------------------------------------------------------------------
 .../storm/kafka/trident/TridentKafkaState.java  | 50 +++++++++++++-------
 1 file changed, 32 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/913d0c15/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/TridentKafkaState.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/TridentKafkaState.java
 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/TridentKafkaState.java
index f24fed5..76e0f02 100644
--- 
a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/TridentKafkaState.java
+++ 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/TridentKafkaState.java
@@ -75,20 +75,29 @@ public class TridentKafkaState implements State {
     public void updateState(List<TridentTuple> tuples, TridentCollector 
collector) {
         String topic = null;
         try {
-            List<Future<RecordMetadata>> futures = new 
ArrayList<>(tuples.size());
+            long startTime = System.currentTimeMillis();
+            int numberOfRecords = tuples.size();
+            List<Future<RecordMetadata>> futures = new 
ArrayList<>(numberOfRecords);
             for (TridentTuple tuple : tuples) {
                 topic = topicSelector.getTopic(tuple);
-
-                if(topic != null) {
-                    Future<RecordMetadata> result = producer.send(new 
ProducerRecord(topic,
-                            mapper.getKeyFromTuple(tuple), 
mapper.getMessageFromTuple(tuple)));
-                    futures.add(result);
+                Object messageFromTuple = mapper.getMessageFromTuple(tuple);
+                Object keyFromTuple = mapper.getKeyFromTuple(tuple);
+                               
+                if (topic != null) {
+                   if (messageFromTuple != null) {
+                     Future<RecordMetadata> result = producer.send(new 
ProducerRecord(topic,keyFromTuple, messageFromTuple));
+                     futures.add(result);
+                  } else {
+                     LOG.warn("skipping Message with Key "+ keyFromTuple +" as 
message was null");
+                  }
+                       
                 } else {
-                    LOG.warn("skipping key = " + mapper.getKeyFromTuple(tuple) 
+ ", topic selector returned null.");
+                      LOG.warn("skipping key = " + keyFromTuple + ", topic 
selector returned null.");
                 }
             }
-
-            List<ExecutionException> exceptions = new 
ArrayList<>(futures.size());
+            
+            int emittedRecords = futures.size();
+            List<ExecutionException> exceptions = new 
ArrayList<>(emittedRecords);
             for (Future<RecordMetadata> future : futures) {
                 try {
                     future.get();
@@ -97,15 +106,20 @@ public class TridentKafkaState implements State {
                 }
             }
 
-            if(exceptions.size() > 0){
-                String errorMsg = "Could not retrieve result for messages " + 
tuples + " from topic = " + topic 
-                        + " because of the following exceptions: \n";
-                for (ExecutionException exception : exceptions) {
-                    errorMsg = errorMsg + exception.getMessage() + "\n";
-                }
-                LOG.error(errorMsg);
-                throw new FailedException(errorMsg);
-            }
+            if (exceptions.size() > 0){
+               StringBuilder errorMsg = new StringBuilder("Could not retrieve 
result for messages " + tuples + " from topic = " + topic 
+                               + " because of the following exceptions:" + 
System.lineSeparator());
+                               
+               for (ExecutionException exception : exceptions) {
+                       errorMsg = 
errorMsg.append(exception.getMessage()).append(System.lineSeparator()); ;
+               }
+               String message = errorMsg.toString();
+               LOG.error(message);
+               throw new FailedException(message);
+           }
+           long latestTime = System.currentTimeMillis();
+           LOG.info("Emitted record {} sucessfully in {} ms to topic {} ", 
emittedRecords, latestTime-startTime, topic);
+                       
         } catch (Exception ex) {
             String errorMsg = "Could not send messages " + tuples + " to topic 
= " + topic;
             LOG.warn(errorMsg, ex);

Reply via email to