Repository: storm Updated Branches: refs/heads/master 4986373c5 -> 2833e3061
STORM-1363: TridentKafkaState should handle null values from TridentTupleToKafkaMapper.getMessageFromTuple() Incase null value comes from the mapper it will print warning messages also added the time take to emit number od messages in logs Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/913d0c15 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/913d0c15 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/913d0c15 Branch: refs/heads/master Commit: 913d0c15d364195760239a93e4efeb1c2550f975 Parents: 4986373 Author: Sachin Pasalkar <[email protected]> Authored: Tue Feb 14 15:54:23 2017 +0530 Committer: Jungtaek Lim <[email protected]> Committed: Wed Feb 22 16:36:10 2017 +0900 ---------------------------------------------------------------------- .../storm/kafka/trident/TridentKafkaState.java | 50 +++++++++++++------- 1 file changed, 32 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/913d0c15/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/TridentKafkaState.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/TridentKafkaState.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/TridentKafkaState.java index f24fed5..76e0f02 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/TridentKafkaState.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/TridentKafkaState.java @@ -75,20 +75,29 @@ public class TridentKafkaState implements State { public void updateState(List<TridentTuple> tuples, TridentCollector collector) { String topic = null; try { - List<Future<RecordMetadata>> futures = new ArrayList<>(tuples.size()); + long startTime = System.currentTimeMillis(); + int numberOfRecords = tuples.size(); + List<Future<RecordMetadata>> futures = new ArrayList<>(numberOfRecords); for (TridentTuple tuple : tuples) { topic = topicSelector.getTopic(tuple); - - if(topic != null) { - Future<RecordMetadata> result = producer.send(new ProducerRecord(topic, - mapper.getKeyFromTuple(tuple), mapper.getMessageFromTuple(tuple))); - futures.add(result); + Object messageFromTuple = mapper.getMessageFromTuple(tuple); + Object keyFromTuple = mapper.getKeyFromTuple(tuple); + + if (topic != null) { + if (messageFromTuple != null) { + Future<RecordMetadata> result = producer.send(new ProducerRecord(topic,keyFromTuple, messageFromTuple)); + futures.add(result); + } else { + LOG.warn("skipping Message with Key "+ keyFromTuple +" as message was null"); + } + } else { - LOG.warn("skipping key = " + mapper.getKeyFromTuple(tuple) + ", topic selector returned null."); + LOG.warn("skipping key = " + keyFromTuple + ", topic selector returned null."); } } - - List<ExecutionException> exceptions = new ArrayList<>(futures.size()); + + int emittedRecords = futures.size(); + List<ExecutionException> exceptions = new ArrayList<>(emittedRecords); for (Future<RecordMetadata> future : futures) { try { future.get(); @@ -97,15 +106,20 @@ public class TridentKafkaState implements State { } } - if(exceptions.size() > 0){ - String errorMsg = "Could not retrieve result for messages " + tuples + " from topic = " + topic - + " because of the following exceptions: \n"; - for (ExecutionException exception : exceptions) { - errorMsg = errorMsg + exception.getMessage() + "\n"; - } - LOG.error(errorMsg); - throw new FailedException(errorMsg); - } + if (exceptions.size() > 0){ + StringBuilder errorMsg = new StringBuilder("Could not retrieve result for messages " + tuples + " from topic = " + topic + + " because of the following exceptions:" + System.lineSeparator()); + + for (ExecutionException exception : exceptions) { + errorMsg = errorMsg.append(exception.getMessage()).append(System.lineSeparator()); ; + } + String message = errorMsg.toString(); + LOG.error(message); + throw new FailedException(message); + } + long latestTime = System.currentTimeMillis(); + LOG.info("Emitted record {} sucessfully in {} ms to topic {} ", emittedRecords, latestTime-startTime, topic); + } catch (Exception ex) { String errorMsg = "Could not send messages " + tuples + " to topic = " + topic; LOG.warn(errorMsg, ex);
