Github user msaunier-poctu commented on the pull request:

    https://github.com/apache/storm/pull/454#issuecomment-90546074
  
    :+1:
    
    TridentKafkaEmitter should be updated to support emitting offset in Trident 
:
    
    
    ```diff
    diff --git 
a/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java 
b/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java
    index 94bf134..dc9bb6d 100644
    --- 
a/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java
    +++ 
b/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java
    @@ -113,7 +113,7 @@ public class TridentKafkaEmitter {
             ByteBufferMessageSet msgs = fetchMessages(consumer, partition, 
offset);
             long endoffset = offset;
             for (MessageAndOffset msg : msgs) {
    -            emit(collector, msg.message());
    +            emit(collector, msg.message(),partition,msg.offset());
                 endoffset = msg.nextOffset();
             }
             Map newMeta = new HashMap();
    @@ -160,14 +160,19 @@ public class TridentKafkaEmitter {
                     if (offset > nextOffset) {
                         throw new RuntimeException("Error when re-emitting 
batch. overshot the end offset");
                     }
    -                emit(collector, msg.message());
    +                emit(collector, msg.message(),partition,msg.offset());
                     offset = msg.nextOffset();
                 }
             }
         }
     
    -    private void emit(TridentCollector collector, Message msg) {
    -        Iterable<List<Object>> values = KafkaUtils.generateTuples(_config, 
msg);
    +    private void emit(TridentCollector collector, Message msg, Partition 
partition, long offset) {
    +        Iterable<List<Object>> values;
    +        if(_config.tupleMetaData) {
    +            values = KafkaUtils.generateTuples(_config, msg, partition, 
offset);
    +        }else{
    +            values = KafkaUtils.generateTuples(_config, msg);
    +        }
             if (values != null) {
                 for (List<Object> value : values) {
                     collector.emit(value);
    ```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

Reply via email to