[
https://issues.apache.org/jira/browse/STORM-1363?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Jungtaek Lim resolved STORM-1363.
---------------------------------
Resolution: Fixed
Assignee: Sachin Pasalkar
Fix Version/s: 2.0.0
1.1.1
Thanks [~Sachin], merged into master and 1.x branch.
> TridentKafkaState should handle null values from
> TridentTupleToKafkaMapper.getMessageFromTuple()
> ------------------------------------------------------------------------------------------------
>
> Key: STORM-1363
> URL: https://issues.apache.org/jira/browse/STORM-1363
> Project: Apache Storm
> Issue Type: Bug
> Components: storm-kafka, storm-kafka-client
> Affects Versions: 0.10.1, 1.x
> Reporter: Sachin Pasalkar
> Assignee: Sachin Pasalkar
> Fix For: 1.1.1, 2.0.0
>
> Time Spent: 3.5h
> Remaining Estimate: 0h
>
> If you look at the updateState API of storm.kafka.trident.TridentKafkaState.
> When producer is sending data its not handling if the null value is sent by
> mapper.getMessageFromTuple(tuple). Results into Kafka topic gets value as
> "null" string. There might be case in particular kind of exception user do
> not want to replay tuple and just report it and with that he needs to return
> null.
> Also make the members as protected as I need to copy-paste the class to
> provide my implementation.
> My updateState API looks like this
> {code}
> public void updateState(List<TridentTuple> tuples, TridentCollector
> collector) {
> String topic = null;
> for (TridentTuple tuple : tuples) {
> if(tuple==null) {
> continue;
> }
> Object keyFromTuple = null;
> try {
> keyFromTuple = mapper.getKeyFromTuple(tuple);
> topic = topicSelector.getTopic(tuple);
> Object messageFromTuple =
> mapper.getMessageFromTuple(tuple);
> if (topic != null && messageFromTuple != null) {
> producer.send(new KeyedMessage(topic,
> keyFromTuple, messageFromTuple));
> } else {
> LOG.warn("skipping key = " +
> keyFromTuple + ", topic selector returned null.");
> }
> } catch (Exception ex) {
> String errorMsg = "Could not send message with
> key = " + keyFromTuple + " to topic = " + topic;
> LOG.warn(errorMsg, ex);
> throw new FailedException(errorMsg, ex);
> }
> }
> }
> {code}
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)