[ 
https://issues.apache.org/jira/browse/SAMZA-489?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14245504#comment-14245504
 ] 

Martin Kleppmann commented on SAMZA-489:
----------------------------------------

"Quite interesting and unique" is a polite way of putting it. I'd have said 
"made the same mistake as [Cassandra before version 
1.2|http://www.datastax.com/dev/blog/virtual-nodes-in-cassandra-1-2]"; — not 
separating logical partitioning from resource allocation ;-)

bq. If a shard's range is 0-100, and we have two StreamTasks responsible for 
ranges 0-75 and 76-150, respectively, who processes the shard?

If the two StreamTasks are in the same container, that container can consume 
the shard, and demultiplex based on each message's partition key. If the two 
StreamTasks are in different containers, both containers would need to consume 
the shard, and discard the messages that are addressed to the other one. This 
is a bit wasteful, but would be logically correct.
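To make that concrete, here's a rough sketch (Python, with made-up names — this is not Samza's actual API) of a container consuming one shard and demultiplexing each record to a StreamTask by the MD5 hash of its partition key, discarding records addressed to tasks in another container:

```python
# Hypothetical sketch: route records from one Kinesis shard to local
# StreamTasks by partition-key hash; names are illustrative only.
import hashlib

NUM_TASKS = 4  # total StreamTasks; each owns 1/NUM_TASKS of the MD5 space

def task_for_key(partition_key: str) -> int:
    """Map a partition key to a StreamTask via its MD5 hash -- the same
    hash Kinesis itself uses for shard assignment."""
    h = int.from_bytes(hashlib.md5(partition_key.encode()).digest(), "big")
    return h * NUM_TASKS >> 128  # which fixed hash-range slice owns this key

def route_record(partition_key, payload, local_tasks):
    """Deliver a record to a local task, or discard it when the owning
    task lives in another container (the 'wasteful but correct' case)."""
    task = task_for_key(partition_key)
    if task in local_tasks:
        local_tasks[task].append(payload)
        return task
    return None  # addressed to a task in a different container; drop it
```

The point is that `task_for_key` is deterministic, so both containers consuming the same shard always agree on who owns each key.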

bq. what happens if 0-100 splits into 0-20,21-40,41-60,61-80, and 81-100 (5x 
split)?

I believe splits and merges are [always 
two-way|http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/kinesis/model/SplitShardRequest.html].
 If we make the number of StreamTasks a power of 2, then it seems quite likely 
that the StreamTasks' hash range boundaries will be aligned with the Kinesis 
shard boundaries. (Since the keys are hashed, there should be no reason to 
split a shard any other way than exactly in half.) If people do stupid things 
with their shard splitting, we can still fall back to double-consuming a shard.
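The alignment argument can be sketched in a few lines (assuming, as above, that shards only ever split exactly in half, starting from the full MD5 range — the function name is mine, not anything in Samza or Kinesis):

```python
# After d generations of clean halving, every shard boundary is a
# multiple of 2**128 // 2**d, so it lands exactly on a StreamTask
# boundary whenever the task count is a power of 2 >= 2**d.

HASH_SPACE = 2 ** 128  # size of the MD5 key space Kinesis partitions

def shard_aligned(shard_start: int, num_tasks: int) -> bool:
    """True if this shard boundary coincides with a StreamTask boundary,
    i.e. it is a multiple of the per-task slice width."""
    return shard_start % (HASH_SPACE // num_tasks) == 0

# Boundaries after halving the full range twice: 4 equal shards.
boundaries = [i * HASH_SPACE // 4 for i in range(4)]
```

A boundary produced by an uneven split (say, at one third of the hash space) would fail this check, which is exactly the "fall back to double-consuming" case.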

bq. We could also force the StreamTask count to == the shard count at the time 
the job first starts.

This would not be ideal, because billing is by number of shards. People might 
well start off with only one shard, to keep it cheap. This would limit them to 
one Samza container, even as throughput grows.

bq. Also, what happens to a Kinesis consumer in a case where it's consuming a 
shard and the shard suddenly splits? In Samza's model, we'd want the 
JobCoordinator to shut down all containers, re-assign partitions (shards), then 
start the containers back up again.

I was told that AWS is introducing auto-scaling for Kinesis, which means that 
the number of shards may well change quite often (even depending on time of 
day). Restarting the job every time would not be ideal.

I'd be inclined to leave the Samza containers unchanged, and have the consumer 
of the parent shard (the one that was split) continue by consuming the two 
child shards (the shards that were created in the split). If the job can't keep 
up, the user can choose to increase the container count, but that wouldn't 
happen automatically.

For merging, if the two merged shards are consumed by the same container, no 
restart is necessary, as described above. If we fall back to double-consuming a 
shard (because the merged shards were in different containers), a job restart 
would be necessary to rebalance the tasks. We can probably minimize the chances 
of double-consuming by assigning StreamTasks for adjacent ranges of hashes to 
the same container.

If the number of containers is a power of 2, this ought to work nicely. Say you 
have 256 StreamTasks, 8 containers, 8 Kinesis shards. Each message goes to the 
StreamTask according to the first byte of its hash. If StreamTasks 0 to 31 are 
in container 0, StreamTasks 32 to 63 in container 1, etc., and if shards always 
split their range in half, then you'll end up with each container consuming 
exactly one Kinesis shard.
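A toy version of that layout (illustrative code, not Samza's assignment logic) shows the three mappings lining up:

```python
# Sketch of the 256-task / 8-container / 8-shard example above.
import hashlib

NUM_TASKS, NUM_CONTAINERS, NUM_SHARDS = 256, 8, 8

def task_of(partition_key: str) -> int:
    # The first byte of the MD5 hash picks one of the 256 StreamTasks.
    return hashlib.md5(partition_key.encode()).digest()[0]

def container_of(task: int) -> int:
    # Tasks 0-31 -> container 0, tasks 32-63 -> container 1, and so on.
    return task // (NUM_TASKS // NUM_CONTAINERS)

def shard_of(task: int) -> int:
    # 8 shards, each owning an equal 1/8 slice after three clean halvings.
    return task // (NUM_TASKS // NUM_SHARDS)

# Each shard's tasks should all land in a single container:
shard_to_containers = {}
for task in range(NUM_TASKS):
    shard_to_containers.setdefault(shard_of(task), set()).add(container_of(task))
```

With these numbers every shard maps onto exactly one container, so no shard needs to be double-consumed.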

> Support Amazon Kinesis
> ----------------------
>
>                 Key: SAMZA-489
>                 URL: https://issues.apache.org/jira/browse/SAMZA-489
>             Project: Samza
>          Issue Type: New Feature
>            Reporter: Martin Kleppmann
>              Labels: project
>
> [AWS Kinesis|http://aws.amazon.com/kinesis/] is a publish-subscribe message 
> broker service quite similar to Kafka, provided as a hosted service by 
> Amazon. I have spoken to a few people who are interested in using Kinesis 
> with Samza, since the options for stateful stream processing with Kinesis are 
> currently quite limited. Samza's local state support would be great for 
> Kinesis users.
> I've looked a little into what it would take to support Kinesis in Samza. 
> Useful resources:
> * [Kinesis Client Library for 
> Java|https://github.com/awslabs/amazon-kinesis-client]
> * [Kinesis developer 
> guide|http://docs.aws.amazon.com/kinesis/latest/dev/introduction.html]
> * [Description of 
> resharding|http://docs.aws.amazon.com/kinesis/latest/dev/kinesis-using-api-java.html#kinesis-using-api-java-resharding]
> Kinesis is similar to Kafka in that it has total ordering of messages within 
> a partition (which Kinesis calls a "shard"). The most notable differences I 
> noticed are:
> * Kinesis does not support compaction by key, and only keeps messages for 24 
> hours (the "trim horizon"). Thus it cannot be used for checkpointing and 
> state store changelogging. Another service must be used for durable storage 
> (Amazon recommends DynamoDB).
> * It is common for the number of shards in a stream to change ("resharding"), 
> because a Kinesis shard is a unit of resourcing, not a logical grouping. A 
> Kinesis shard is more like a Kafka broker node, not like a Kafka partition.
> The second point suggests that Kinesis shards should not be mapped 1:1 to 
> Samza StreamTasks like we do for Kafka, because whenever the number of shards 
> changes, any state associated with a StreamTask would no longer be in the 
> right place.
> Kinesis assigns a message to a shard based on the MD5 hash of the message's 
> partition key (so all messages with the same partition key are guaranteed to 
> be in the same shard). Each shard owns a continuous range of the MD5 hash 
> space. When the number of shards is increased by one, a shard's hash range is 
> subdivided into two sub-ranges. When the number of shards is decreased by 
> one, two adjacent shards' hash ranges are merged into a single range.
> I think the nicest way of modelling this in Samza would be to create a fixed 
> number of StreamTasks (e.g. 256, but make it configurable), and to assign 
> each task a fixed slice of this MD5 hash space. Each Kinesis shard then 
> corresponds to a subset of these StreamTasks, and the SystemConsumer 
> implementation routes messages from a shard to the appropriate StreamTask 
> based on the hash of the message's partition key. This implies that all the 
> StreamTasks for a particular Kinesis shard should be processed within the 
> same container. This is not unlike the Kafka consumer in Samza, which fetches 
> messages for all of a container's Kafka partitions in one go.
> This removes the semantic problem of resharding: we can ensure that 
> messages with the same partition key are always routed to the same 
> StreamTask, even across shard splits and merges.
> However, there are still some tricky edge cases to handle. For example, if 
> Kinesis decides to merge two shards that are currently processed by two 
> different Samza containers, what should Samza do? A simple (but perhaps a bit 
> wasteful) solution would be for both containers to continue consuming the 
> merged shard. Alternatively, Samza could reassign some StreamTasks from one 
> container to another, but that would require any state to be moved or 
> rebuilt. Probably double-consuming would make most sense for a first 
> implementation.
> In summary, it looks like Kinesis support is feasible, and would be a fun 
> challenge for someone to take on. Contributions welcome :)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
