[
https://issues.apache.org/jira/browse/STORM-2675?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16113255#comment-16113255
]
Stig Rohde Døssing commented on STORM-2675:
-------------------------------------------
Hi.
Sorry about the following wall of text; I wrote some notes while debugging
this, and I thought they might help explain how Trident works.
As I understand it, Trident spouts don't save progress to Kafka. Progress
should be saved to Storm's Zookeeper. The OpaquePartitionedTridentKafkaSpout
should be getting passed the previous batch's ending offset as part of the
metadata passed to emitPartitionBatch.
I'm not very experienced with Trident's internals, but from what I can tell,
the spout is asked to emit a batch with a given transaction id via
emitPartitionBatch, along with the metadata for the last batch. The return
value is metadata that can be used to construct the following batch later. In
Kafka's case the metadata is the starting and ending offsets for the batch. The
metadata is saved to Zookeeper once state has been committed for the batch.
When a batch succeeds, the metadata for earlier batches is removed from
Zookeeper. If the spout worker is restarted, the last committed metadata is
read back from Zookeeper, which determines the offset the spout restarts at.
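To make that lifecycle concrete, here is a minimal, self-contained Java sketch of the opaque-batch bookkeeping. The class and method names are illustrative stand-ins, not Storm's actual API: a batch's metadata records the offset range it covered, and the next batch resumes at lastOffset + 1.

```java
// Minimal model of Trident's opaque-batch bookkeeping for one Kafka-like
// partition. Names here are hypothetical, not Storm's real classes.
public class OpaqueBatchModel {

    /** Metadata Trident persists per batch: the offset range it covered. */
    static final class BatchMetadata {
        final long firstOffset;
        final long lastOffset;

        BatchMetadata(long firstOffset, long lastOffset) {
            this.firstOffset = firstOffset;
            this.lastOffset = lastOffset;
        }
    }

    /**
     * Emit one batch of up to maxBatchSize offsets. The previous batch's
     * metadata (null on the very first batch) says where to resume; the
     * returned metadata is what would be saved to Zookeeper on commit.
     */
    static BatchMetadata emitPartitionBatch(BatchMetadata lastMeta, long maxBatchSize) {
        long start = (lastMeta == null) ? 0 : lastMeta.lastOffset + 1;
        long end = start + maxBatchSize - 1;
        return new BatchMetadata(start, end);
    }

    public static void main(String[] args) {
        BatchMetadata first = emitPartitionBatch(null, 10);
        BatchMetadata second = emitPartitionBatch(first, 10);
        System.out.println("first batch:  " + first.firstOffset + ".." + first.lastOffset);
        System.out.println("second batch: " + second.firstOffset + ".." + second.lastOffset);
        // If the persisted metadata is lost, lastMeta is null again and the
        // sequence restarts from the initial offset.
    }
}
```

The point of the model is that the saved metadata is the only thing linking one batch to the next; lose it, and the spout has no choice but to start over.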
https://github.com/apache/storm/blob/64e29f365c9b5d3e15b33f33ab64e200345333e4/storm-client/src/jvm/org/apache/storm/trident/spout/OpaquePartitionedTridentSpoutExecutor.java
is the code that wraps around the OpaquePartitionedTridentKafkaSpout. The
writes to Zookeeper should happen via the RotatingTransactionalState object
here
https://github.com/apache/storm/blob/64e29f365c9b5d3e15b33f33ab64e200345333e4/storm-client/src/jvm/org/apache/storm/trident/spout/OpaquePartitionedTridentSpoutExecutor.java#L116.
If the spout has just restarted and there is no in-memory metadata for the
last batch, the last committed metadata is read from Zookeeper here
https://github.com/apache/storm/blob/64e29f365c9b5d3e15b33f33ab64e200345333e4/storm-client/src/jvm/org/apache/storm/trident/spout/OpaquePartitionedTridentSpoutExecutor.java#L140.
Here is where the Kafka spout builds the metadata returned by emitPartitionBatch:
https://github.com/apache/storm/blob/master/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java#L130
It should contain the first and last offsets for the current batch.
The metadata path in Zookeeper is constructed from these two classes
https://github.com/apache/storm/blob/64e29f365c9b5d3e15b33f33ab64e200345333e4/storm-client/src/jvm/org/apache/storm/trident/topology/state/TransactionalState.java#L63
and
https://github.com/apache/storm/blob/a4afacd9617d620f50cf026fc599821f7ac25c79/storm-client/src/jvm/org/apache/storm/trident/topology/state/RotatingTransactionalState.java.
OpaquePartitionedTridentSpoutExecutor will create instances of each of these,
based on the partition ids returned by the spout emitter. For example, for me
the Zookeeper path for metadata for the Trident spout in
examples/storm-kafka-client-examples TridentKafkaClientWordCountNamedTopics was
/transactional/spout1/user/test-trident@0 and
/transactional/spout1/user/test-trident-1@0. The path format for the Kafka
spout is
/$transactional.zookeeper.root/$spoutComponentName/user/$topicName@$partitionNumber.
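As a small sketch of that path template (the helper below is hypothetical; the argument names just mirror the placeholders above):

```java
public class TridentZkPath {
    /**
     * Builds the Zookeeper path where the Kafka Trident spout's batch
     * metadata lives, following the observed format
     * /$zkRoot/$spoutComponent/user/$topic@$partition.
     */
    static String metadataPath(String zkRoot, String spoutComponent,
                               String topic, int partition) {
        return zkRoot + "/" + spoutComponent + "/user/" + topic + "@" + partition;
    }

    public static void main(String[] args) {
        // Reproduces one of the paths observed in the example topology.
        System.out.println(metadataPath("/transactional", "spout1", "test-trident", 0));
    }
}
```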
When I ran the TridentKafkaClientWordCountNamedTopics example on master, shut
it down, and restarted it, I think I got the behavior you described. Here is
the Zookeeper metadata after I shut down the example topology the first time:
{code}
get /transactional/spout1/user/test-trident@0/6204
org.apache.storm.kafka.spout.trident.KafkaTridentSpoutBatchMetadata@507e1faf{topicPartition=test-trident-0,
firstOffset=1412, lastOffset=1412}
{code}
After restart
{code}
get /transactional/spout1/user/test-trident@0/6271
org.apache.storm.kafka.spout.trident.KafkaTridentSpoutBatchMetadata@50982f4e{topicPartition=test-trident-0,
firstOffset=71, lastOffset=71}
{code}
I tried logging the RotatingTransactionalState on creation.
{code}
2017-08-03 17:44:37.767 o.a.s.t.t.s.RotatingTransactionalState
Thread-26-spout-spout1-executor[10, 10] [DEBUG] Created
RotatingTransactionalState{_state=org.apache.storm.trident.topology.state.TransactionalState@29823124,
_subdir='test-trident@0', _curr={6204=null}}
org.apache.storm.trident.topology.state.RotatingTransactionalState.<init>(RotatingTransactionalState.java:47)
{code}
The txid is correct, but the associated metadata object is null, which causes
the spout to start over as if no batch had been committed.
I looked into it a bit more, and it turns out the TransactionalState code
assumes the metadata object can be round-trip serialized/deserialized with the
json-simple library. When json-simple can't figure out how to serialize an
object to JSON, it quietly falls back to the object's toString() (IMO it
should throw an exception instead of quietly doing the wrong thing). That's
what ends up being saved to Zookeeper for the Kafka spout, and our metadata
object's toString() doesn't return JSON. When TransactionalState tries to load
the metadata back from Zookeeper, it (apparently deliberately) uses a
json-simple method that quietly returns null if JSON parsing fails. The result
is that we both quietly fail to write the metadata properly to Zookeeper and
quietly fail to read the incorrectly serialized data back. The spout ends up
starting over every time the executor is restarted, because it can't read the
data in Zookeeper.
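The failure mode can be reproduced in miniature without json-simple itself. The sketch below is a self-contained model (not json-simple's real code) of the two behaviors described above: a writer that falls back to toString() for objects it can't serialize, and a reader that returns null instead of throwing on unparseable input. Round-tripping any object whose toString() isn't valid JSON yields null, which the spout then treats as "no previous batch".

```java
public class QuietJsonRoundTrip {

    /** Stand-in for a metadata object whose toString() is not valid JSON. */
    static final class BatchMetadata {
        @Override
        public String toString() {
            // Mirrors the Object-style dump seen in Zookeeper above.
            return "BatchMetadata@1a2b3c{firstOffset=1412, lastOffset=1412}";
        }
    }

    /** Models the writer: unknown types quietly fall back to toString(). */
    static String serialize(Object value) {
        if (value instanceof String) {
            return "\"" + value + "\"";
        }
        if (value instanceof Number || value instanceof Boolean) {
            return value.toString();
        }
        // The quiet wrong thing: no exception, just the raw toString().
        return value.toString();
    }

    /** Models the reader: returns null on bad input instead of throwing. */
    static Object parse(String json) {
        String s = json.trim();
        // Deliberately tiny "parser": accepts only strings and integers.
        if (s.startsWith("\"") && s.endsWith("\"")) {
            return s.substring(1, s.length() - 1);
        }
        try {
            return Long.parseLong(s);
        } catch (NumberFormatException e) {
            return null; // quiet failure, like a parse that swallows errors
        }
    }

    public static void main(String[] args) {
        String stored = serialize(new BatchMetadata());
        System.out.println("stored in ZK: " + stored);
        System.out.println("read back:    " + parse(stored));
    }
}
```

The write succeeds, the stored string is garbage, and the read quietly returns null, so neither side ever surfaces an error.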
> KafkaTridentSpoutOpaque not committing offsets to Kafka
> -------------------------------------------------------
>
> Key: STORM-2675
> URL: https://issues.apache.org/jira/browse/STORM-2675
> Project: Apache Storm
> Issue Type: Bug
> Components: storm-kafka-client
> Affects Versions: 1.1.0
> Reporter: Preet Puri
>
> Every time I restart the topology, the spout picks the earliest message even
> though the poll strategy is set to UNCOMMITTED_EARLIEST. I looked at Kafka's
> __consumer_offsets topic to see if the spout (consumer) is committing the
> offsets, but did not find any commits. I am also unable to locate the code in
> the KafkaTridentSpoutEmitter class where the commits are updated.
> conf.put(Config.TOPOLOGY_DEBUG, true);
> conf.put(Config.TOPOLOGY_WORKERS, 1);
> conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 4); // tried with 1 as well
> conf.put(Config.TRANSACTIONAL_ZOOKEEPER_ROOT, "/aggregate");
> conf.put(Config.TRANSACTIONAL_ZOOKEEPER_SERVERS, Arrays.asList(new String[]{"localhost"}));
> conf.put(Config.TRANSACTIONAL_ZOOKEEPER_PORT, 2181);
>
> protected static KafkaSpoutConfig<String, String> getPMStatKafkaSpoutConfig() {
>     ByTopicRecordTranslator<String, String> byTopic =
>         new ByTopicRecordTranslator<>((r) -> new Values(r.topic(), r.key(), r.value()),
>             new Fields(TOPIC, PARTITION_KEY, PAYLOAD), SENSOR_STREAM);
>     return new KafkaSpoutConfig.Builder<String, String>(Utils.getBrokerHosts(),
>             StringDeserializer.class, null, Utils.getKafkaEnrichedPMSTopicName())
>         .setMaxPartitionFectchBytes(10 * 1024) // 10 KB
>         .setRetry(getRetryService())
>         .setOffsetCommitPeriodMs(10_000)
>         .setFirstPollOffsetStrategy(FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST)
>         .setMaxUncommittedOffsets(250)
>         .setProp("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer")
>         .setProp("schema.registry.url", "http://localhost:8081")
>         .setProp("specific.avro.reader", true)
>         .setGroupId(AGGREGATION_CONSUMER_GROUP)
>         .setRecordTranslator(byTopic)
>         .build();
> }
>
> Stream pmStatStream =
>     topology.newStream("statStream", new KafkaTridentSpoutOpaque<>(getPMStatKafkaSpoutConfig()))
>         .parallelismHint(1);
>
> storm-version - 1.1.0
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)