GitHub user srdo opened a pull request:

    STORM-2675: Fix storm-kafka-client Trident spout failing to serialize meta 
objects to Zookeeper

    This builds on, so please ignore 
the first commit.
    Trident uses json-simple under the hood to persist some objects to 
Zookeeper. This isn't mentioned in the API docs (or I missed it), so the 
current storm-kafka-client implementation returns a bunch of objects 
json-simple can't figure out how to serialize. The result is that json-simple 
writes the toString of the objects to Zookeeper, which can't be read back out. 
This causes the Trident Kafka spout to start over every time it's rebooted.
    TransactionalState, which is used by Trident to read/write to/from 
Zookeeper, uses JSONValue.parse to read. That function fails quietly by 
returning null when there's a parsing error. There's a note in the code that we 
deliberately don't use the version of the parse function that throws an exception 
on error, but we should at least log when it happens, since it's likely to be 
due to a bug in the spout or coordinator.
    This PR makes the following changes:
    * Manually serialize meta objects to List or Map so json-simple can handle them.
    * Print a warning log when TransactionalState fails to deserialize 
something from Zookeeper.
    * Fix the Trident Kafka spout's logic around first polls. It was referring 
to the committed offset of the KafkaConsumer, which is never updated because we 
don't commit to Kafka. The offsets are instead stored in Zookeeper via the meta 
object returned by Emitter.emitPartitionBatch.
    * Fix the generic type of Coordinator on OpaquePartitionedTridentSpout. It 
should use the same type as the first parameter for Emitter, since 
OpaquePartitionedSpoutExecutor will call getOrderedPartitions and 
getPartitionsForTask on the emitter with the object returned by 
Coordinator.getPartitionsForBatch. I don't think this is a breaking change, 
since this was a de facto constraint anyway.
    * Clarify in a comment on Subscription that Trident expects partitions to 
remain assigned to specific tasks. As far as I can tell there's a potential 
issue with the batch metadata cache kept by OpaqueTridentSpoutExecutor getting 
outdated if partitions are shuffled without workers dying.
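    To illustrate the first change above, here is a minimal sketch of converting a meta object to a plain Map and back, so that only strings and numbers reach the JSON layer. BatchMetaSketch and its field names are hypothetical and are not the classes in this PR. The sketch uses only the JDK and assumes the JSON library may hand numbers back as a different Number subtype than was written.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical batch metadata; not the actual storm-kafka-client class. */
final class BatchMetaSketch {
    private final long firstOffset;
    private final long lastOffset;

    BatchMetaSketch(long firstOffset, long lastOffset) {
        this.firstOffset = firstOffset;
        this.lastOffset = lastOffset;
    }

    long getFirstOffset() {
        return firstOffset;
    }

    long getLastOffset() {
        return lastOffset;
    }

    /** Flattens the object to a Map holding only String keys and numeric values. */
    Map<String, Object> toMap() {
        Map<String, Object> map = new HashMap<>();
        map.put("firstOffset", firstOffset);
        map.put("lastOffset", lastOffset);
        return map;
    }

    /** Rebuilds the object from a Map, accepting any Number subtype for the offsets. */
    static BatchMetaSketch fromMap(Map<String, Object> map) {
        long first = ((Number) map.get("firstOffset")).longValue();
        long last = ((Number) map.get("lastOffset")).longValue();
        return new BatchMetaSketch(first, last);
    }

    public static void main(String[] args) {
        BatchMetaSketch original = new BatchMetaSketch(10L, 41L);
        // The Map is what would be handed to the JSON layer instead of the object itself.
        BatchMetaSketch restored = fromMap(original.toMap());
        System.out.println(restored.getFirstOffset() + ".." + restored.getLastOffset());
    }
}
```

    Going through Number in fromMap is a defensive choice: it keeps the round trip working whether the parser returns Long, Integer or something else for the offsets.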
    If anyone has suggestions for tests of this, I'm happy to add some. I'm 
wondering why we don't use Kryo for serialization to Zookeeper, since the 
json-simple library is so inflexible (it can only handle some collections and 
primitive wrappers).

You can merge this pull request into a Git repository by running:

    $ git pull STORM-2675

Alternatively you can review and apply these changes as the patch at:

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2271
commit 4db257ff00ce4bb54bbdd7850fa9e4787752c7ad
Author: Stig Rohde Døssing <>
Date:   2017-08-08T11:04:04Z

    STORM-2689: Simplify dependency configuration for storm-kafka-examples and 

commit 9288b88ffa088878dafc84bd5780754f1212988c
Author: Stig Rohde Døssing <>
Date:   2017-08-04T00:53:42Z

    STORM-2675: Fix storm-kafka-client Trident spout failing to serialize meta 
objects to Zookeeper

