[ https://issues.apache.org/jira/browse/STORM-1363?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Sachin Pasalkar updated STORM-1363:
-----------------------------------
    Description: 
The updateState method of storm.kafka.trident.TridentKafkaState does not handle 
the case where mapper.getMessageFromTuple(tuple) returns null. The producer 
sends the value as is, and the Kafka topic ends up with the string "null" as 
the message value. A mapper may have a good reason to return null: for certain 
kinds of exception, the user may not want the tuple replayed and only wants to 
report the problem, and returning null is how the mapper signals that.

Also, please make the class members protected. At present I have to copy-paste 
the whole class to provide my own implementation.

My updateState method looks like this:

{code}
    public void updateState(List<TridentTuple> tuples, TridentCollector collector) {
        String topic = null;
        for (TridentTuple tuple : tuples) {
            if (tuple == null) {
                continue;
            }

            Object keyFromTuple = null;
            try {
                keyFromTuple = mapper.getKeyFromTuple(tuple);
                topic = topicSelector.getTopic(tuple);
                Object messageFromTuple = mapper.getMessageFromTuple(tuple);
                // Send only when both the topic and the message are present.
                if (topic != null && messageFromTuple != null) {
                    producer.send(new KeyedMessage(topic, keyFromTuple, messageFromTuple));
                } else {
                    // Skip the tuple instead of writing the string "null" to Kafka.
                    LOG.warn("skipping key = " + keyFromTuple + ", topic = " + topic
                            + ": topic selector or mapper returned null.");
                }
            } catch (Exception ex) {
                String errorMsg = "Could not send message with key = " + keyFromTuple
                        + " to topic = " + topic;
                LOG.warn(errorMsg, ex);
                // FailedException causes the batch to be replayed.
                throw new FailedException(errorMsg, ex);
            }
        }
    }
{code}
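
To illustrate the mapper side of this request, here is a minimal, 
self-contained sketch. It does not use the real Storm or Kafka classes: 
MessageMapper, ParsingMapper, and send are hypothetical stand-ins for 
TridentTupleToKafkaMapper, a user-written mapper, and the producer loop, and 
tuples are reduced to plain strings. The mapper reports a bad input and returns 
null instead of throwing, and the sender skips null messages instead of 
forwarding them.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class NullSkipSketch {

    /** Hypothetical stand-in for TridentTupleToKafkaMapper, reduced to one method. */
    interface MessageMapper {
        String getMessageFromTuple(String tuple);
    }

    /** Reports unparseable input and returns null so the tuple is not replayed. */
    static class ParsingMapper implements MessageMapper {
        final List<String> reported = new ArrayList<>();

        @Override
        public String getMessageFromTuple(String tuple) {
            try {
                return "value=" + Integer.parseInt(tuple.trim());
            } catch (NumberFormatException ex) {
                reported.add(tuple); // report only, do not rethrow
                return null;
            }
        }
    }

    /** Collects what would be sent; null tuples and null messages are skipped. */
    static List<String> send(List<String> tuples, MessageMapper mapper) {
        List<String> sent = new ArrayList<>();
        for (String tuple : tuples) {
            if (tuple == null) {
                continue;
            }
            String message = mapper.getMessageFromTuple(tuple);
            if (message != null) {
                sent.add(message);
            }
        }
        return sent;
    }

    public static void main(String[] args) {
        ParsingMapper mapper = new ParsingMapper();
        List<String> sent = send(Arrays.asList("1", "oops", "3"), mapper);
        System.out.println("sent=" + sent + " reported=" + mapper.reported);
    }
}
```

With the inputs above, only the two parseable values are collected for 
sending, and "oops" is recorded as reported without raising an exception, so 
nothing would trigger a replay.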


> TridentKafkaState should handle null values from 
> TridentTupleToKafkaMapper.getMessageFromTuple
> ----------------------------------------------------------------------------------------------
>
>                 Key: STORM-1363
>                 URL: https://issues.apache.org/jira/browse/STORM-1363
>             Project: Apache Storm
>          Issue Type: Bug
>          Components: storm-kafka
>    Affects Versions: 0.10.1
>            Reporter: Sachin Pasalkar



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
