Repository: storm Updated Branches: refs/heads/1.x-branch ee1309d2a -> d5f4c4021
STORM-1363: TridentKafkaState should handle null values from TridentTupleToKafkaMapper.getMessageFromTuple() Incase null value comes from the mapper it will print warning messages also added the time take to emit number od messages in logs Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/187d08bf Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/187d08bf Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/187d08bf Branch: refs/heads/1.x-branch Commit: 187d08bf45bf424f3963a604d72e076b00d594c7 Parents: ee1309d Author: Sachin Pasalkar <[email protected]> Authored: Tue Feb 14 15:54:23 2017 +0530 Committer: Jungtaek Lim <[email protected]> Committed: Wed Feb 22 16:34:02 2017 +0900 ---------------------------------------------------------------------- .../storm/kafka/trident/TridentKafkaState.java | 50 +++++++++++++------- 1 file changed, 32 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/187d08bf/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/TridentKafkaState.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/TridentKafkaState.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/TridentKafkaState.java index f24fed5..76e0f02 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/TridentKafkaState.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/TridentKafkaState.java @@ -75,20 +75,29 @@ public class TridentKafkaState implements State { public void updateState(List<TridentTuple> tuples, TridentCollector collector) { String topic = null; try { - List<Future<RecordMetadata>> futures = new ArrayList<>(tuples.size()); + long startTime = System.currentTimeMillis(); + int numberOfRecords = tuples.size(); + List<Future<RecordMetadata>> futures = new ArrayList<>(numberOfRecords); for (TridentTuple tuple : tuples) { topic = topicSelector.getTopic(tuple); - - if(topic != null) { - Future<RecordMetadata> result = producer.send(new ProducerRecord(topic, - mapper.getKeyFromTuple(tuple), mapper.getMessageFromTuple(tuple))); - futures.add(result); + Object messageFromTuple = mapper.getMessageFromTuple(tuple); + Object keyFromTuple = mapper.getKeyFromTuple(tuple); + + if (topic != null) { + if (messageFromTuple != null) { + Future<RecordMetadata> result = producer.send(new ProducerRecord(topic,keyFromTuple, messageFromTuple)); + futures.add(result); + } else { + LOG.warn("skipping Message with Key "+ keyFromTuple +" as message was null"); + } + } else { - LOG.warn("skipping key = " + mapper.getKeyFromTuple(tuple) + ", topic selector returned null."); + LOG.warn("skipping key = " + keyFromTuple + ", topic selector returned null."); } } - - List<ExecutionException> exceptions = new ArrayList<>(futures.size()); + + int emittedRecords = futures.size(); + List<ExecutionException> exceptions = new ArrayList<>(emittedRecords); for (Future<RecordMetadata> future : futures) { try { future.get(); @@ -97,15 +106,20 @@ public class TridentKafkaState implements State { } } - if(exceptions.size() > 0){ - String errorMsg = "Could not retrieve result for messages " + tuples + " from topic = " + topic - + " because of the following exceptions: \n"; - for (ExecutionException exception : exceptions) { - errorMsg = errorMsg + exception.getMessage() + "\n"; - } - LOG.error(errorMsg); - throw new FailedException(errorMsg); - } + if (exceptions.size() > 0){ + StringBuilder errorMsg = new StringBuilder("Could not retrieve result for messages " + tuples + " from topic = " + topic + + " because of the following exceptions:" + System.lineSeparator()); + + for (ExecutionException exception : exceptions) { + errorMsg = errorMsg.append(exception.getMessage()).append(System.lineSeparator()); ; + } + String message = errorMsg.toString(); + LOG.error(message); + throw new FailedException(message); + } + long latestTime = System.currentTimeMillis(); + LOG.info("Emitted record {} sucessfully in {} ms to topic {} ", emittedRecords, latestTime-startTime, topic); + } catch (Exception ex) { String errorMsg = "Could not send messages " + tuples + " to topic = " + topic; LOG.warn(errorMsg, ex);
