[ 
https://issues.apache.org/jira/browse/STORM-2675?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16113255#comment-16113255
 ] 

Stig Rohde Døssing commented on STORM-2675:
-------------------------------------------

Hi.

Sorry about the following wall of text. I wrote some notes while debugging 
this, and I thought they might help in understanding how Trident works.

As I understand it, Trident spouts don't save progress to Kafka. Progress 
should instead be saved to Storm's Zookeeper. The 
OpaquePartitionedTridentKafkaSpout should receive the previous batch's ending 
offset as part of the metadata passed to emitPartitionBatch.

I'm not very experienced with how Trident works internally, but from what I can 
tell, the spout will be asked to emit a batch with a given transaction id via 
emitPartitionBatch, as well as some metadata for the last batch. The return 
value is some metadata that can be used to construct the following batch later. 
In Kafka's case the metadata is the start and end offsets for the batch. The 
metadata is saved to Zookeeper once state has been committed for the batch. 
When a batch succeeds, the metadata for earlier batches is removed from 
Zookeeper. If the spout worker is restarted, the last committed metadata is 
read back from Zookeeper, which should determine which offset the spout 
restarts at.
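
To make the flow above concrete, here is a minimal sketch of an emitter that takes the previous batch's metadata and returns the metadata for the batch it just emitted. It is not Storm code: the class OpaqueBatchSketch, the BatchMeta type, and the emitBatch signature are all hypothetical, and starting at offset 0 when no metadata is available is an assumption made for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative stand-in, not Storm code: all names here are hypothetical.
final class OpaqueBatchSketch {

    /** Metadata for one batch: the first and last offsets it covered. */
    static final class BatchMeta {
        final long firstOffset;
        final long lastOffset;

        BatchMeta(long firstOffset, long lastOffset) {
            this.firstOffset = firstOffset;
            this.lastOffset = lastOffset;
        }
    }

    /**
     * Emits up to maxBatchSize offsets following the previous batch and
     * returns the new batch's metadata. A null lastMeta means no progress is
     * known, so this sketch starts from offset 0 (an assumption).
     */
    static BatchMeta emitBatch(BatchMeta lastMeta, long logEndOffset,
                               int maxBatchSize, List<Long> emitted) {
        long start = (lastMeta == null) ? 0 : lastMeta.lastOffset + 1;
        if (start >= logEndOffset) {
            return lastMeta; // nothing new to emit, progress is unchanged
        }
        long end = Math.min(start + maxBatchSize, logEndOffset) - 1;
        for (long offset = start; offset <= end; offset++) {
            emitted.add(offset);
        }
        return new BatchMeta(start, end);
    }

    public static void main(String[] args) {
        List<Long> emitted = new ArrayList<>();
        BatchMeta first = emitBatch(null, 10, 4, emitted);
        BatchMeta second = emitBatch(first, 10, 4, emitted);
        System.out.println(second.firstOffset + ".." + second.lastOffset);
    }
}
```

The point of the sketch is that the emitter itself keeps no progress. If the caller passes null instead of the last committed metadata, the batch starts from the beginning again.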

https://github.com/apache/storm/blob/64e29f365c9b5d3e15b33f33ab64e200345333e4/storm-client/src/jvm/org/apache/storm/trident/spout/OpaquePartitionedTridentSpoutExecutor.java
 is the code that wraps around the OpaquePartitionedTridentKafkaSpout. The 
writes to Zookeeper should happen via the RotatingTransactionalState object 
here 
https://github.com/apache/storm/blob/64e29f365c9b5d3e15b33f33ab64e200345333e4/storm-client/src/jvm/org/apache/storm/trident/spout/OpaquePartitionedTridentSpoutExecutor.java#L116.
 If the spout has just restarted and there is no in-memory metadata for the 
last batch, the last committed metadata is read from Zookeeper here 
https://github.com/apache/storm/blob/64e29f365c9b5d3e15b33f33ab64e200345333e4/storm-client/src/jvm/org/apache/storm/trident/spout/OpaquePartitionedTridentSpoutExecutor.java#L140.
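
As a rough illustration of the bookkeeping described above, the following sketch keeps metadata per txid in memory, drops entries for earlier batches, and returns the last committed entry. It is a stand-in for the Zookeeper-backed state, not the real RotatingTransactionalState: the class and method names are hypothetical, and metadata is represented as a plain String for simplicity.

```java
import java.util.Map;
import java.util.TreeMap;

// In-memory stand-in for the Zookeeper-backed store; names are hypothetical.
final class RotatingStateSketch {
    // Metadata keyed by transaction id, kept in txid order.
    private final TreeMap<Long, String> metaByTxid = new TreeMap<>();

    /** Records the metadata for a batch once its state has been committed. */
    void save(long txid, String metadata) {
        metaByTxid.put(txid, metadata);
    }

    /** Drops metadata for all batches earlier than the given txid. */
    void cleanupBefore(long txid) {
        metaByTxid.headMap(txid, false).clear();
    }

    /** Returns the metadata of the latest saved batch, or null if none. */
    String lastCommitted() {
        Map.Entry<Long, String> last = metaByTxid.lastEntry();
        return last == null ? null : last.getValue();
    }

    int size() {
        return metaByTxid.size();
    }

    public static void main(String[] args) {
        RotatingStateSketch state = new RotatingStateSketch();
        state.save(6203L, "offsets for batch 6203");
        state.save(6204L, "offsets for batch 6204");
        state.cleanupBefore(6204L);
        System.out.println(state.lastCommitted());
    }
}
```

After a restart, a freshly created spout would only have whatever lastCommitted returns to go on, which is why a null or unreadable value there matters.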

Here is the metadata returned by emitPartitionBatch from the Kafka spout: 
https://github.com/apache/storm/blob/master/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java#L130
It should contain the first and last offsets for the current batch.

The metadata path in Zookeeper is constructed from these two classes 
https://github.com/apache/storm/blob/64e29f365c9b5d3e15b33f33ab64e200345333e4/storm-client/src/jvm/org/apache/storm/trident/topology/state/TransactionalState.java#L63
 and 
https://github.com/apache/storm/blob/a4afacd9617d620f50cf026fc599821f7ac25c79/storm-client/src/jvm/org/apache/storm/trident/topology/state/RotatingTransactionalState.java.
 OpaquePartitionedTridentSpoutExecutor will create instances of each of these, 
based on the partition ids returned by the spout emitter. For example, for me 
the Zookeeper paths for metadata for the Trident spout in 
examples/storm-kafka-client-examples TridentKafkaClientWordCountNamedTopics were 
/transactional/spout1/user/test-trident@0 and 
/transactional/spout1/user/test-trident-1@0. The path format for the Kafka 
spout is 
/$transactional.zookeeper.root/$spoutComponentName/user/$topicName@$partitionNumber.
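
The path format can be written out as a small helper. This is only a sketch of the string layout described above; the class MetadataPathSketch and the method metadataPath are hypothetical names, not part of Storm.

```java
// Hypothetical helper that mirrors the path layout described above.
final class MetadataPathSketch {

    /** Builds zkRoot/spoutName/user/topic@partition. */
    static String metadataPath(String zkRoot, String spoutName,
                               String topic, int partition) {
        return zkRoot + "/" + spoutName + "/user/" + topic + "@" + partition;
    }

    public static void main(String[] args) {
        // Values taken from the example topology mentioned above.
        System.out.println(metadataPath("/transactional", "spout1", "test-trident", 0));
    }
}
```

The batch metadata for a given txid then lives in a child node of that path, as in the get commands shown further down.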
 

When I ran the TridentKafkaClientWordCountNamedTopics example on master, shut 
it down and restarted it, I think I got the behavior you described. Here is the 
Zookeeper metadata after I shut down the example topology the first time:
{code}
get /transactional/spout1/user/test-trident@0/6204
org.apache.storm.kafka.spout.trident.KafkaTridentSpoutBatchMetadata@507e1faf{topicPartition=test-trident-0,
 firstOffset=1412, lastOffset=1412}
{code}
After restart:
{code}
get /transactional/spout1/user/test-trident@0/6271
org.apache.storm.kafka.spout.trident.KafkaTridentSpoutBatchMetadata@50982f4e{topicPartition=test-trident-0,
 firstOffset=71, lastOffset=71}
{code}
I tried logging the RotatingTransactionalState on creation.
{code}
2017-08-03 17:44:37.767 o.a.s.t.t.s.RotatingTransactionalState 
Thread-26-spout-spout1-executor[10, 10] [DEBUG] Created 
RotatingTransactionalState{_state=org.apache.storm.trident.topology.state.TransactionalState@29823124,
 _subdir='test-trident@0', _curr={6204=null}} 
org.apache.storm.trident.topology.state.RotatingTransactionalState.<init>(RotatingTransactionalState.java:47)
{code}
The txid is correct, but the associated metadata object is null, which causes 
the spout to start over instead of resuming from the saved offsets.

I looked into it a bit more, and it turns out the TransactionalState code 
assumes the metadata object can be round-trip serialized and deserialized with 
the json-simple library. When json-simple can't figure out how to serialize an 
object to JSON, it quietly returns the object's toString instead (IMO it should 
throw an exception rather than quietly doing the wrong thing). That's what 
ends up being saved to Zookeeper for the Kafka spout, and our metadata object's 
toString doesn't return JSON. When TransactionalState tries to load the 
metadata back in from Zookeeper, it (apparently deliberately) uses a method 
from json-simple that quietly returns null if JSON parsing fails. The result is 
that we quietly fail to write the metadata properly to Zookeeper, and then 
quietly fail to read the incorrectly serialized data back. The spout ends up 
starting over every time the executor is restarted, because it can't read the 
data in Zookeeper. 
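
The double silent failure can be reproduced in miniature without json-simple. In the sketch below, lenientWrite and lenientRead are hypothetical stand-ins that only mimic the two behaviors described above (falling back to toString on write, returning null on a failed parse). They are not the json-simple API, and they only understand strings and longs.

```java
// Stand-ins that mimic the described behavior; not the json-simple API.
final class SilentRoundTripSketch {

    /** Hypothetical metadata class whose toString() is not JSON. */
    static final class BatchMeta {
        final long firstOffset;
        final long lastOffset;

        BatchMeta(long firstOffset, long lastOffset) {
            this.firstOffset = firstOffset;
            this.lastOffset = lastOffset;
        }

        @Override
        public String toString() {
            return "BatchMeta{firstOffset=" + firstOffset
                + ", lastOffset=" + lastOffset + "}";
        }
    }

    /** Serializes known types, and silently falls back to toString(). */
    static String lenientWrite(Object value) {
        if (value instanceof String) {
            return "\"" + value + "\""; // no escaping, enough for this sketch
        }
        if (value instanceof Number) {
            return value.toString();
        }
        return String.valueOf(value); // silent fallback: output is not JSON
    }

    /** Parses quoted strings and longs, and silently returns null otherwise. */
    static Object lenientRead(String text) {
        String t = text.trim();
        if (t.length() >= 2 && t.startsWith("\"") && t.endsWith("\"")) {
            return t.substring(1, t.length() - 1);
        }
        try {
            return Long.valueOf(t);
        } catch (NumberFormatException e) {
            return null; // silent failure: the caller just sees "no data"
        }
    }

    public static void main(String[] args) {
        String stored = lenientWrite(new BatchMeta(71, 71));
        System.out.println("stored: " + stored);
        System.out.println("loaded: " + lenientRead(stored));
    }
}
```

Neither call throws, so nothing in the logs points at the problem. The write stores a non-JSON string and the read reports null, which is indistinguishable from there being no committed metadata at all.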

> KafkaTridentSpoutOpaque not committing offsets to Kafka
> -------------------------------------------------------
>
>                 Key: STORM-2675
>                 URL: https://issues.apache.org/jira/browse/STORM-2675
>             Project: Apache Storm
>          Issue Type: Bug
>          Components: storm-kafka-client
>    Affects Versions: 1.1.0
>            Reporter: Preet Puri
>
> Every time I restart the topology the spout was picking the earliest message 
> even though poll strategy is set UNCOMMITTED_EARLIEST.  I looked at Kafka's  
> __consumer_offsets topic to see if spout (consumer) is committing the offsets 
> but did not find any commits. I am not even able to locate the code in the 
> KafkaTridentSpoutEmitter class where we are updating the commits?
>     conf.put(Config.TOPOLOGY_DEBUG, true);
>     conf.put(Config.TOPOLOGY_WORKERS, 1);
>     conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 4); // tried with 1 as well
>     conf.put(Config.TRANSACTIONAL_ZOOKEEPER_ROOT, "/aggregate");
>     conf.put(Config.TRANSACTIONAL_ZOOKEEPER_SERVERS, Arrays.asList(new 
> String[]{"localhost"}));
>     conf.put(Config.TRANSACTIONAL_ZOOKEEPER_PORT, 2181);
>  protected static KafkaSpoutConfig<String, String> 
> getPMStatKafkaSpoutConfig() {
>     ByTopicRecordTranslator<String, String> byTopic =
>         new ByTopicRecordTranslator<>((r) -> new Values(r.topic(), r.key(), 
> r.value()),
>             new Fields(TOPIC, PARTITION_KEY, PAYLOAD), SENSOR_STREAM);
>     return new KafkaSpoutConfig.Builder<String, 
> String>(Utils.getBrokerHosts(),
>         StringDeserializer.class, null, Utils.getKafkaEnrichedPMSTopicName())
>             .setMaxPartitionFectchBytes(10 * 1024) // 10 KB
>             .setRetry(getRetryService())
>             .setOffsetCommitPeriodMs(10_000)
>             .setFirstPollOffsetStrategy(FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST)
>             .setMaxUncommittedOffsets(250)
>             .setProp("value.deserializer", 
> "io.confluent.kafka.serializers.KafkaAvroDeserializer")
>             .setProp("schema.registry.url","http://localhost:8081")
>             .setProp("specific.avro.reader",true)
>             .setGroupId(AGGREGATION_CONSUMER_GROUP)
>             .setRecordTranslator(byTopic).build();
>   }
> Stream pmStatStream =
>         topology.newStream("statStream", new 
> KafkaTridentSpoutOpaque<>(getPMStatKafkaSpoutConfig())).parallelismHint(1)
> storm-version - 1.1.0



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
