Hi Samza devs,

I think this project has the best documentation I've ever seen! Amazing job. It's extremely well written, and Hello Samza is a really great example that I was able to run and modify without issue. It was a joy reading the docs and playing around with the example. Kudos!
After thoroughly reading all the docs, I still have a few questions and would appreciate any feedback. I was thinking about how to support deterministic behavior on recovery or rewind. Maybe it can't always be 100% deterministic, but I think we can get close. Have other people thought about this? Is it desirable?

For example, let's say we're joining two streams: orders and user_info. As orders come in, we use the user_id field of the order to look up additional information about the user and enrich the stream. Say we're keeping all the user_info state in the local KV store (rough sketch of the job below my signature):

t1: User updates her email to "[email protected]"
t2: User buys a pair of jeans (product_id == 99)
t3: User updates her email to "[email protected]"

In the case of normal operation (no delays in the user_info stream), the enriched record will be:

{product_id: 99, email: "[email protected]", ...}

But say that our job fails before it can checkpoint and is configured to bootstrap from user_info. When it gets restarted and bootstraps from the user_info stream, it will end up with the email set to "[email protected]" in the local KV store. It will then reprocess the order event and produce the "wrong" output:

{product_id: 99, email: "[email protected]", ...}

I haven't verified that, but the documentation says "a bootstrap stream waits for the consumer to explicitly confirm that the stream has been fully consumed." Shouldn't it instead wait until it has consumed up to the checkpoint offset for the bootstrap stream (when there is a saved checkpoint offset)?

Likewise for local state replicated in the changelog. During the checkpoint process, Samza could include its changelog producer offset in the checkpoint data, so that during recovery the local state is restored to a point that corresponds with the checkpointed offsets for the input streams. Everything would be coherent, rather than having the input streams restored to the checkpoint and the local state restored to the most recent values. I'm assuming that changelog commits for local state and checkpoints are not done together in an atomic transaction, so they may not always match.

The other missing piece is a nearly deterministic MessageChooser. During recovery + rewind, all the messages in both streams are already present in Kafka, and we want a way to replay them in the same order as if they were arriving in real time. The only way I can see to approximate this behavior is to use Kafka broker timestamps for each message. Is it possible to write an "EarliestFirstChooser" that always chooses the oldest message available, according to the timestamp at which it was received by the Kafka broker (second sketch below)? I don't know whether Kafka stores a timestamp with each message, but I'm assuming it does because the simple consumer supports an API called getOffsetsBefore() that would seem to map from timestamps to offsets.

Finally, a nitpick. I'm using Samza 0.7.0, but the metrics data reports the version as {"samza-version": "0.0.1"}. Is this intentional?

If it makes sense, I can put in some JIRA tickets for this stuff...

Cheers,
Roger
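P.S. Two rough sketches to make this concrete. First, the enrichment job described above, written against the 0.7.0 API as I understand it. The stream names, the "user-info" store name, and the Map-shaped messages are all just assumptions for illustration, not anything from the docs:

import java.util.Map;

import org.apache.samza.config.Config;
import org.apache.samza.storage.kv.KeyValueStore;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.SystemStream;
import org.apache.samza.task.InitableTask;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.StreamTask;
import org.apache.samza.task.TaskContext;
import org.apache.samza.task.TaskCoordinator;

public class OrderEnricherTask implements StreamTask, InitableTask {
  // Hypothetical output stream for the enriched orders.
  private static final SystemStream OUTPUT = new SystemStream("kafka", "enriched-orders");

  private KeyValueStore<String, Map<String, Object>> userInfoStore;

  @Override
  @SuppressWarnings("unchecked")
  public void init(Config config, TaskContext context) {
    // "user-info" is an assumed store name from the job config.
    userInfoStore = (KeyValueStore<String, Map<String, Object>>) context.getStore("user-info");
  }

  @Override
  @SuppressWarnings("unchecked")
  public void process(IncomingMessageEnvelope envelope, MessageCollector collector,
      TaskCoordinator coordinator) {
    String stream = envelope.getSystemStreamPartition().getStream();

    if ("user_info".equals(stream)) {
      // Keep the latest profile for each user in the local KV store.
      userInfoStore.put((String) envelope.getKey(), (Map<String, Object>) envelope.getMessage());
    } else if ("orders".equals(stream)) {
      Map<String, Object> order = (Map<String, Object>) envelope.getMessage();
      Map<String, Object> user = userInfoStore.get((String) order.get("user_id"));
      if (user != null) {
        // Enrich the order with whatever email is currently in local state --
        // exactly the value that can diverge on recovery, as described above.
        order.put("email", user.get("email"));
      }
      collector.send(new OutgoingMessageEnvelope(OUTPUT, order));
    }
  }
}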

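And here's roughly what I imagine the EarliestFirstChooser looking like. As I understand the MessageChooser contract, Samza hands the chooser at most one outstanding envelope per SystemStreamPartition, so choose() only has to return the minimum-timestamp envelope it's holding. The extractTimestamp() helper is hypothetical: since I don't know of a broker-side timestamp, this version assumes the producer embedded one in the message body.

import java.util.HashMap;
import java.util.Map;

import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.SystemStreamPartition;
import org.apache.samza.system.chooser.BaseMessageChooser;

public class EarliestFirstChooser extends BaseMessageChooser {
  // At most one buffered envelope per partition, per the chooser contract.
  private final Map<SystemStreamPartition, IncomingMessageEnvelope> buffered =
      new HashMap<SystemStreamPartition, IncomingMessageEnvelope>();

  @Override
  public void update(IncomingMessageEnvelope envelope) {
    buffered.put(envelope.getSystemStreamPartition(), envelope);
  }

  @Override
  public IncomingMessageEnvelope choose() {
    SystemStreamPartition oldest = null;
    long oldestTs = Long.MAX_VALUE;
    for (Map.Entry<SystemStreamPartition, IncomingMessageEnvelope> e : buffered.entrySet()) {
      long ts = extractTimestamp(e.getValue());
      if (ts < oldestTs) {
        oldestTs = ts;
        oldest = e.getKey();
      }
    }
    // Returning null tells Samza there's nothing to process right now.
    return oldest == null ? null : buffered.remove(oldest);
  }

  // Hypothetical: pull a timestamp the producer embedded in the message body.
  // With real broker timestamps this would read Kafka metadata instead.
  @SuppressWarnings("unchecked")
  private long extractTimestamp(IncomingMessageEnvelope envelope) {
    return (Long) ((Map<String, Object>) envelope.getMessage()).get("timestamp");
  }
}

If something like this is workable, I'd expect it to be plugged in through a MessageChooserFactory named in the task.chooser.class config (again, going off my reading of the docs):

import org.apache.samza.config.Config;
import org.apache.samza.metrics.MetricsRegistry;
import org.apache.samza.system.chooser.MessageChooser;
import org.apache.samza.system.chooser.MessageChooserFactory;

public class EarliestFirstChooserFactory implements MessageChooserFactory {
  @Override
  public MessageChooser getChooser(Config config, MetricsRegistry registry) {
    return new EarliestFirstChooser();
  }
}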