Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2465#discussion_r157346574
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutMessageId.java ---
    @@ -18,36 +18,88 @@
     
     package org.apache.storm.kafka.spout;
     
    +import java.io.IOException;
     import java.io.Serializable;
    +
    +import com.fasterxml.jackson.annotation.JsonCreator;
    +import com.fasterxml.jackson.annotation.JsonProperty;
    +import com.fasterxml.jackson.core.JsonGenerator;
    +import com.fasterxml.jackson.core.JsonParser;
    +import com.fasterxml.jackson.core.JsonProcessingException;
    +import com.fasterxml.jackson.databind.DeserializationContext;
    +import com.fasterxml.jackson.databind.JsonNode;
    +import com.fasterxml.jackson.databind.ObjectMapper;
    +import com.fasterxml.jackson.databind.SerializerProvider;
    +import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
    +import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
    +import com.fasterxml.jackson.databind.module.SimpleModule;
    +import com.fasterxml.jackson.databind.ser.std.StdSerializer;
     import org.apache.kafka.clients.consumer.ConsumerRecord;
     import org.apache.kafka.common.TopicPartition;
    +import org.apache.storm.task.TopologyContext;
     
     public class KafkaSpoutMessageId implements Serializable {
    +    public static final ObjectMapper JSON_MAPPER;
    +
    +    @JsonDeserialize(using = TopicPartitionJsonDeserializer.class)
         private final TopicPartition topicPart;
         private final long offset;
    +    private final String thread;
    +    private final String topologyId;    //TODO: rename
    +
         private int numFails = 0;
         /**
          * true if the record was emitted using a form of collector.emit(...). false
          * when skipping null tuples as configured by the user in KafkaSpoutConfig
          */
         private boolean emitted;
     
    -    public KafkaSpoutMessageId(ConsumerRecord<?, ?> consumerRecord) {
    -        this(consumerRecord, true);
    +    static {
    +        JSON_MAPPER = new ObjectMapper();
    +
    +        SimpleModule module = new SimpleModule();
    +        module.addSerializer(TopicPartition.class, new 
TopicPartitionJsonSerializer(TopicPartition.class));
    +        module.addDeserializer(TopicPartition.class, new 
TopicPartitionJsonDeserializer(TopicPartition.class));
    +
    +        JSON_MAPPER.registerModule(module);
         }
     
    -    public KafkaSpoutMessageId(ConsumerRecord<?, ?> consumerRecord, 
boolean emitted) {
    -        this(new TopicPartition(consumerRecord.topic(), 
consumerRecord.partition()), consumerRecord.offset(), emitted);
    +    //TODO Revisit constructors
    +    public KafkaSpoutMessageId(ConsumerRecord<?, ?> consumerRecord, 
TopologyContext context) {
    +        this(consumerRecord, true, context);
         }
     
    -    public KafkaSpoutMessageId(TopicPartition topicPart, long offset) {
    -        this(topicPart, offset, true);
    +    public KafkaSpoutMessageId(ConsumerRecord<?, ?> consumerRecord, 
boolean emitted, TopologyContext context) {
    +        this(new TopicPartition(consumerRecord.topic(), 
consumerRecord.partition()), consumerRecord.offset(), context);
         }
     
    -    public KafkaSpoutMessageId(TopicPartition topicPart, long offset, 
boolean emitted) {
    +    public KafkaSpoutMessageId(TopicPartition topicPart, long offset, 
TopologyContext context) {
    +        this(topicPart, offset, true, context);
    +    }
    +
    +    public KafkaSpoutMessageId(TopicPartition topicPart, long offset, 
boolean emitted, TopologyContext context) {
    +        this(topicPart, offset, emitted, Thread.currentThread().getName(), 
context.getStormId());
    +    }
    +
    +    public KafkaSpoutMessageId(TopicPartition topicPart, long offset, 
boolean emitted, String thread, String topologyId) {
    +        this(topicPart, offset, emitted, thread, topologyId, 0);
    +    }
    +
    +    // Used for JSON Deserialization
    +    @JsonCreator
    +    private KafkaSpoutMessageId(@JsonProperty("topicPartition") 
TopicPartition topicPart,
    +                                @JsonProperty("offset")long offset,
    +                                @JsonProperty("emitted") boolean emitted,
    +                                @JsonProperty("thread") String thread,
    +                                @JsonProperty("topologyId") String 
topologyId,
    +                                @JsonProperty("numFails") int numFails) {
    +
             this.topicPart = topicPart;
             this.offset = offset;
             this.emitted = emitted;
    +        this.thread = thread;
    +        this.topologyId = topologyId;
    +        this.numFails = numFails;
    --- End diff --
    
    We don't need to store/restore numFails and emitted in metadata.


---

Reply via email to