Repository: storm
Updated Branches:
  refs/heads/1.x-branch ee1309d2a -> d5f4c4021


STORM-1363: TridentKafkaState should handle null values from 
TridentTupleToKafkaMapper.getMessageFromTuple()

Incase null value comes from the mapper it will print warning messages also 
added the time take to emit number od messages in logs


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/187d08bf
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/187d08bf
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/187d08bf

Branch: refs/heads/1.x-branch
Commit: 187d08bf45bf424f3963a604d72e076b00d594c7
Parents: ee1309d
Author: Sachin Pasalkar <[email protected]>
Authored: Tue Feb 14 15:54:23 2017 +0530
Committer: Jungtaek Lim <[email protected]>
Committed: Wed Feb 22 16:34:02 2017 +0900

----------------------------------------------------------------------
 .../storm/kafka/trident/TridentKafkaState.java  | 50 +++++++++++++-------
 1 file changed, 32 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/187d08bf/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/TridentKafkaState.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/TridentKafkaState.java
 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/TridentKafkaState.java
index f24fed5..76e0f02 100644
--- 
a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/TridentKafkaState.java
+++ 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/TridentKafkaState.java
@@ -75,20 +75,29 @@ public class TridentKafkaState implements State {
     public void updateState(List<TridentTuple> tuples, TridentCollector 
collector) {
         String topic = null;
         try {
-            List<Future<RecordMetadata>> futures = new 
ArrayList<>(tuples.size());
+            long startTime = System.currentTimeMillis();
+            int numberOfRecords = tuples.size();
+            List<Future<RecordMetadata>> futures = new 
ArrayList<>(numberOfRecords);
             for (TridentTuple tuple : tuples) {
                 topic = topicSelector.getTopic(tuple);
-
-                if(topic != null) {
-                    Future<RecordMetadata> result = producer.send(new 
ProducerRecord(topic,
-                            mapper.getKeyFromTuple(tuple), 
mapper.getMessageFromTuple(tuple)));
-                    futures.add(result);
+                Object messageFromTuple = mapper.getMessageFromTuple(tuple);
+                Object keyFromTuple = mapper.getKeyFromTuple(tuple);
+                               
+                if (topic != null) {
+                   if (messageFromTuple != null) {
+                     Future<RecordMetadata> result = producer.send(new 
ProducerRecord(topic,keyFromTuple, messageFromTuple));
+                     futures.add(result);
+                  } else {
+                     LOG.warn("skipping Message with Key "+ keyFromTuple +" as 
message was null");
+                  }
+                       
                 } else {
-                    LOG.warn("skipping key = " + mapper.getKeyFromTuple(tuple) 
+ ", topic selector returned null.");
+                      LOG.warn("skipping key = " + keyFromTuple + ", topic 
selector returned null.");
                 }
             }
-
-            List<ExecutionException> exceptions = new 
ArrayList<>(futures.size());
+            
+            int emittedRecords = futures.size();
+            List<ExecutionException> exceptions = new 
ArrayList<>(emittedRecords);
             for (Future<RecordMetadata> future : futures) {
                 try {
                     future.get();
@@ -97,15 +106,20 @@ public class TridentKafkaState implements State {
                 }
             }
 
-            if(exceptions.size() > 0){
-                String errorMsg = "Could not retrieve result for messages " + 
tuples + " from topic = " + topic 
-                        + " because of the following exceptions: \n";
-                for (ExecutionException exception : exceptions) {
-                    errorMsg = errorMsg + exception.getMessage() + "\n";
-                }
-                LOG.error(errorMsg);
-                throw new FailedException(errorMsg);
-            }
+            if (exceptions.size() > 0){
+               StringBuilder errorMsg = new StringBuilder("Could not retrieve 
result for messages " + tuples + " from topic = " + topic 
+                               + " because of the following exceptions:" + 
System.lineSeparator());
+                               
+               for (ExecutionException exception : exceptions) {
+                       errorMsg = 
errorMsg.append(exception.getMessage()).append(System.lineSeparator()); ;
+               }
+               String message = errorMsg.toString();
+               LOG.error(message);
+               throw new FailedException(message);
+           }
+           long latestTime = System.currentTimeMillis();
+           LOG.info("Emitted record {} sucessfully in {} ms to topic {} ", 
emittedRecords, latestTime-startTime, topic);
+                       
         } catch (Exception ex) {
             String errorMsg = "Could not send messages " + tuples + " to topic 
= " + topic;
             LOG.warn(errorMsg, ex);

Reply via email to