Thanks, Jakob.

On Fri, Sep 26, 2014 at 2:21 PM, Jakob Homan <[email protected]> wrote:
> This is pretty close to how things are laid out. The data from the 'public facing' Kafka clusters are mirror-made into Samza-specific Kafka clusters, which are co-located (though not necessarily on the same box) with the YARN resources. Data produced through the Samza jobs is written to the Samza cluster and then mirror-made to other clusters for consumption. This approach has the advantage of keeping the Samza processes separate, controlled, and out of the production path. The disadvantage is more complexity, more machines, and a tiny bit of latency via the mirror making, but overall this approach is pretty rock solid.
>
> -Jakob
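To make the split concrete, here is a minimal sketch of what a job config along these lines might look like, with input read from the "public" cluster and all internal topics kept on the Samza-dedicated cluster. The system names, topic names, and addresses are invented, and the property names are only meant to approximate the 0.8-era Kafka system, checkpoint, and changelog settings, so treat it as illustrative rather than a recommended configuration:

    # Inputs come from the "public" cluster.
    task.inputs=kafka-public.page-views

    systems.kafka-public.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
    systems.kafka-public.consumer.zookeeper.connect=zk-public:2181/
    systems.kafka-public.producer.metadata.broker.list=broker-public:9092

    systems.kafka-samza.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
    systems.kafka-samza.consumer.zookeeper.connect=zk-samza:2181/
    systems.kafka-samza.producer.metadata.broker.list=broker-samza:9092

    # Internal topics (checkpoints, changelogs) stay on the Samza-dedicated cluster.
    task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory
    task.checkpoint.system=kafka-samza
    stores.my-store.changelog=kafka-samza.my-store-changelog

Output intended for outside consumers would then be mirror-made from the Samza cluster back out, as Jakob describes above.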
> On Fri, Sep 26, 2014 at 2:14 PM, Roger Hoover <[email protected]> wrote:
>
> > Chris,
> >
> > Would you mind giving some advice on my deployment question below?
> >
> > "Do you recommend having two separate Kafka clusters? In the "public" cluster, brokers would be deployed on machines by themselves. Then you have another Kafka cluster for Samza in which the brokers are co-located with YARN NodeManagers on each machine. With this approach, Samza topologies would consume from and ultimately publish to topics on the "public" cluster. All of the internal topics like repartitioning, changelog, etc. would be hidden away in the Kafka cluster dedicated to Samza."
> >
> > Thanks,
> >
> > Roger
> >
> > On Fri, Sep 26, 2014 at 11:20 AM, Roger Hoover <[email protected]> wrote:
> >
> > > Chris,
> > >
> > > Thanks for the great answers. It's helping me clear up my thinking...
> > >
> > > On Fri, Sep 26, 2014 at 9:10 AM, Chris Riccomini <[email protected]> wrote:
> > >
> > >> Hey Roger,
> > >>
> > >> > If the job's input topics are partitioned by key, then you cannot add more partitions without corrupting existing state.
> > >>
> > >> This is correct.
> > >>
> > >> > Does this come up for people in practice?
> > >>
> > >> It does come up occasionally for us. Thus far, we usually just run a Kafka topic-partition expansion (thereby trashing the semantics of the partitioning) and restart the job. Inconsistent output is then emitted for a while. We do this only when we agree that inconsistent output is tolerable.
> > >
> > > Thanks. This might be reasonable in many cases (not sure yet).
> > >
> > >> Another thing we do for this is over-partition our Kafka topics when we're concerned about growth.
> > >>
> > >> Both of these solutions are admittedly hacky. As you said, the ideal solution would be some kind of automatic migration. It seems possible that the AM (job coordinator) might be able to manage this, especially if we had a pre-packaged "repartition job" that it could trigger. I haven't thought about this in detail, though.
> > >>
> > >> > Deploy jobs to repartition inputs and changelog topics into the new topics
> > >>
> > >> The changelog topic seems problematic to me. It seems that the key used in the changelog might not always be directly related to the partitioning of the input topic. For example, if you have a StreamTask that is consuming a single input partition, and keeping a count in the state store of all messages that it sees, how do you repartition this changelog? In the new world, the keys for the single partition that it's consuming could be spread across many different partitions, and the count is pretty much meaningless, since it can't be split up by key.
> > >>
> > >> It almost feels like state has to be totally reset to safely do an input partition expansion under all cases. In a sense, you have to treat the new job as a job that's completely new, and start it from scratch.
> > >
> > > Ah, you're right. I think there's no way to migrate state in general. If a job is saving any kind of aggregate state then that's an irreversible operation that was done on the old partition. There's not enough information to "repartition" the results.
> > >
> > > Just to be more explicit about "starting it from scratch": the only way to do this theoretically correctly, I think, would be to have the newly partitioned job start with no state and play back its input topics from the beginning of time.
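As an aside, a minimal sketch of the kind of task Chris describes, assuming the 0.8-era StreamTask/InitableTask API (the class, store, and key names are invented). The count lives under a single store key that has no relationship to the input topic's partitioning, so there is nothing a repartitioner could re-hash in order to split the changelog entry across new partitions:

    import org.apache.samza.config.Config;
    import org.apache.samza.storage.kv.KeyValueStore;
    import org.apache.samza.system.IncomingMessageEnvelope;
    import org.apache.samza.task.InitableTask;
    import org.apache.samza.task.MessageCollector;
    import org.apache.samza.task.StreamTask;
    import org.apache.samza.task.TaskContext;
    import org.apache.samza.task.TaskCoordinator;

    public class MessageCountTask implements StreamTask, InitableTask {
      private KeyValueStore<String, Long> store;

      @SuppressWarnings("unchecked")
      public void init(Config config, TaskContext context) {
        // Backed by a changelog topic; one changelog partition per task.
        store = (KeyValueStore<String, Long>) context.getStore("message-counts");
      }

      public void process(IncomingMessageEnvelope envelope, MessageCollector collector,
                          TaskCoordinator coordinator) {
        // One aggregate over everything this task's input partition has ever seen.
        // The store key "count" says nothing about which input keys contributed,
        // so the changelog entry cannot be split across new input partitions.
        Long current = store.get("count");
        store.put("count", current == null ? 1L : current + 1);
      }
    }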
> > >> > Change job config to point to new topics and restart the job
> > >>
> > >> One problem with this is going to be the case where you don't control the producers for the old input topic. They'd either have to be migrated to produce to the new input topic for your job, or you'd have to permanently run the repartition job to move data from the original topic to the currently expanded topic. Keeping the repartition job is not all that wild of an idea. Most Samza topologies we run have some form of a repartition job that runs permanently at the beginning of their flow.
> > >
> > > I was thinking about repartitioning as a good design pattern as well. Having your job always repartition the input decouples it from its upstream topic dependencies. This brings me to another question about deployment. Do you recommend having two separate Kafka clusters? In the "public" cluster, brokers would be deployed on machines by themselves. Then you have another Kafka cluster for Samza in which the brokers are co-located with YARN NodeManagers on each machine. With this approach, Samza topologies would consume from and ultimately publish to topics on the "public" cluster. All of the internal topics like repartitioning, changelog, etc. would be hidden away in the Kafka cluster dedicated to Samza.
> > >
> > >> > All meaningfully-partitioned topics would need to include their keys in the stream
> > >>
> > >> True. Somewhat tangential to this is the case where the key that's been used is not the one your job wishes to partition by. In this case, a repartition job would be required as well.
> > >>
> > >> > This would be problematic as the order of the dictionary keys can change but would still mean the same thing. In order to use JSON as a serde for keys, you'd need to enforce a sort order on dictionaries.
> > >>
> > >> I struggled with this as well. We basically need a forced ordering for the JSON keys in SAMZA-348. Originally, I was thinking of making the key/value messages just a simple string with a delimiter. Something like <type>:<key> for the key and <host>:<source>:<blah> for the value. This approach is also much more compact than JSON. The problem with the latter approach is that it doesn't easily allow for hierarchical key/value pairs.
> > >
> > > I've been constructing string keys in my jobs so far as you mentioned, but it adds extra boilerplate to the code. It would be nice if there were an automatic way to do it.
> > >
> > >> Cheers,
> > >> Chris
> > >>
> > >> On 9/24/14 4:55 PM, "Roger Hoover" <[email protected]> wrote:
> > >>
> > >> > Hi all,
> > >> >
> > >> > So it seems like one of the first decisions that you have to make when creating a Samza job is how many partitions to have in your input topics. This will dictate how many tasks are created and how many changelog partitions get created. It's great that you can independently change the number of Samza containers that get deployed, but what do you do once you reach the max (# containers == # tasks)?
> > >> >
> > >> > If the job's input topics are partitioned by key, then you cannot add more partitions without corrupting existing state. Does this come up for people in practice? How do you handle it?
> > >> >
> > >> > Just trying to think it through, it seems like you need a procedure something like this:
> > >> >
> > >> > 1) Create new topics to hold the same data but with more partitions (inputs, outputs, and changelog topics)
> > >> > 2) Deploy jobs to repartition inputs and changelog topics into the new topics
> > >> > 3) When caught up, stop the running job
> > >> > 4) Change job config to point to new topics and restart the job (if all topics are new, this can be done while the previous job run is still active using a new job.id)
> > >> > 5) Change downstream jobs to use the new output topic if necessary. Doing this in a safe way might be hard.
> > >> >
> > >> > Ideally at some point, this process could be automated. I was wondering whether a generic task could be written for step #2, but I think it would require a couple of constraints:
> > >> >
> > >> > 1) All meaningfully-partitioned topics would need to include their keys in the stream. In Kafka, this is optional unless you enable compaction, but for this to work generically, it would have to be mandatory in Samza for any stream for which partitions have meaning (not using random or round-robin partitioning).
> > >> > 2) The partition keys should be re-hashable based on their raw byte representation so that the repartition task would not have to know how to deserialize the keys in order to compute their new partition. At first glance, this doesn't seem too onerous, but I saw in the Config Stream proposal (SAMZA-348) that keys might be JSON:
> > >> >
> > >> > {"type":"offset","key":"my-long-system-name.my-even-longer-stream-name-that-is-really-long.1000"}
> > >> >
> > >> > This would be problematic as the order of the dictionary keys can change but would still mean the same thing. In order to use JSON as a serde for keys, you'd need to enforce a sort order on dictionaries.
> > >> >
> > >> > I'm curious what others do about this or what your thoughts are. Thanks,
> > >> >
> > >> > Roger
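On the idea of a generic task for step #2 and constraint #2, a rough sketch of what such a repartition task might look like under those constraints, again assuming the 0.8-era API. Keys and messages are treated as opaque bytes, the output stream name is invented, and Arrays.hashCode stands in for whatever deterministic hash a real implementation would use:

    import java.util.Arrays;

    import org.apache.samza.system.IncomingMessageEnvelope;
    import org.apache.samza.system.OutgoingMessageEnvelope;
    import org.apache.samza.system.SystemStream;
    import org.apache.samza.task.MessageCollector;
    import org.apache.samza.task.StreamTask;
    import org.apache.samza.task.TaskCoordinator;

    public class GenericRepartitionTask implements StreamTask {
      // The replacement topic with the larger partition count (name is made up).
      private static final SystemStream OUTPUT =
          new SystemStream("kafka-samza", "page-views-expanded");

      public void process(IncomingMessageEnvelope envelope, MessageCollector collector,
                          TaskCoordinator coordinator) {
        // With a byte-array (pass-through) serde configured on the input, keys and
        // messages arrive as raw bytes and never need to be deserialized here.
        byte[] key = (byte[]) envelope.getKey();
        byte[] message = (byte[]) envelope.getMessage();

        // Identical key bytes always yield the same partition key, so co-partitioning
        // by key is preserved in the expanded topic.
        Integer partitionKey = (key == null) ? null : Arrays.hashCode(key);

        collector.send(new OutgoingMessageEnvelope(OUTPUT, partitionKey, key, message));
      }
    }

How the system producer maps that partition key onto an actual partition is its own concern; the sketch is only meant to show that the new partition can be derived from the raw key bytes without deserializing them.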

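On the dictionary-ordering point, one simple way to get a canonical byte representation, so that the same logical key always hashes to the same partition, is to sort the map before serializing it. A minimal sketch, assuming Jackson 2.x is available; the class name is invented and the fields mirror the SAMZA-348 example:

    import java.util.LinkedHashMap;
    import java.util.Map;
    import java.util.TreeMap;

    import com.fasterxml.jackson.databind.ObjectMapper;

    public final class CanonicalJsonKeySerializer {
      private static final ObjectMapper MAPPER = new ObjectMapper();

      // Copy the key into a TreeMap so fields are always emitted in sorted order,
      // making the serialized bytes (and therefore the hash/partition) deterministic.
      public static byte[] toBytes(Map<String, Object> key) throws Exception {
        return MAPPER.writeValueAsBytes(new TreeMap<String, Object>(key));
      }

      public static void main(String[] args) throws Exception {
        Map<String, Object> key = new LinkedHashMap<String, Object>();
        key.put("type", "offset");
        key.put("key", "my-long-system-name.my-even-longer-stream-name-that-is-really-long.1000");
        // Prints {"key":"...","type":"offset"} regardless of insertion order.
        System.out.println(new String(toBytes(key), "UTF-8"));
      }
    }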