Hi David,

One way to get more control over the number of partitions is to partition on a combination of the shard identifier + the join attribute. You can then change the number of partitions based on the availability of resources and the amount of data processed at a single node, and it won't affect the ordering.
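As a rough sketch (the class and method names here are only illustrative, not an existing Kafka or Samza API), the composite key could be hashed into a partition like this:

// Minimal sketch: derive the partition from shard identifier + join attribute.
// Messages with the same (shard, join key) pair always land in the same
// partition, so their relative order is preserved, while the partition count
// can be chosen independently of the number of Postgres shards.
public final class ShardJoinPartitioner {
    public static int partitionFor(String shardId, String joinKey, int numPartitions) {
        String compositeKey = shardId + ":" + joinKey;
        // Mask the sign bit rather than using Math.abs, which breaks on Integer.MIN_VALUE.
        return (compositeKey.hashCode() & 0x7fffffff) % numPartitions;
    }
}

The producer side would then just pass this partition number when publishing to the datastream topic.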
Thanks
Milinda

On Wed, Nov 19, 2014 at 10:50 AM, David Pick <[email protected]> wrote:

> Hey Martin and Milinda,
>
> Thanks for the quick replies!
>
> Our primary database is Postgres, which we've sharded. To get data to Kafka we use a tool called PGQ (https://wiki.postgresql.org/wiki/PGQ_Tutorial), which is just a simple queueing system built inside of Postgres. So we have a database trigger that pushes any state change into PGQ. Then a small Clojure script picks up all the changes from the database and pushes them into Kafka, where there is a single topic called the datastream. The datastream topic has a partition for each shard of our database, so that we can ensure messages from each individual Postgres instance come out of Kafka in the order we're expecting.
>
> If my understanding is correct, it seems that with our partitioning scheme we would have a Samza task for each partition of our datastream topic for a job that was generating data for something like our search platform. But given the amount of data we have, our LevelDB instance would get significantly larger than a few gigabytes. Is there a better way to partition the data that would keep those ordering guarantees?
>
> For my example, we'd like to publish a feed of all posts with their associated user information merged onto the document.
>
> As for why we're storing the merged document, we certainly don't have to. I was just going based on the example on the state management page (http://samza.incubator.apache.org/learn/documentation/0.7.0/container/state-management.html), which says:
>
> *Example: Join a table of user profiles to a table of user settings by user_id and emit the joined stream*
>
> Implementation: The job subscribes to the change streams for the user profiles database and the user settings database, both partitioned by user_id. The job keeps a key-value store keyed by user_id, which contains the latest profile record and the latest settings record for each user_id. When a new event comes in from either stream, the job looks up the current value in its store, updates the appropriate fields (depending on whether it was a profile update or a settings update), and writes back the new joined record to the store. The changelog of the store doubles as the output stream of the task.
>
> Thanks,
>
> David
>
> On Wed, Nov 19, 2014 at 9:10 AM, Milinda Pathirage <[email protected]> wrote:
>
> > Hi David,
> >
> > I am also wondering why you are storing the merged document in local LevelDB. If you need to check for duplicates, how about using a bloom filter to handle them?
> >
> > Thanks
> > Milinda
> >
> > On Wed, Nov 19, 2014 at 8:50 AM, Martin Kleppmann <[email protected]> wrote:
> >
> > > Hi David,
> > >
> > > On 19 Nov 2014, at 04:52, David Pick <[email protected]> wrote:
> > > > First off, if this is the wrong place to ask these kinds of questions please let me know. I tried in IRC but didn't get an answer within a few hours, so I'm trying here.
> > >
> > > It's the right place, and they're good questions :)
> > >
> > > > I had a couple of questions around implementing a table-to-table join with data coming from a database changelog through Kafka.
> > >
> > > Cool! Out of curiosity, can I ask what database you're using and how you're getting the changelog?
> > >
> > > > Let's say I've got two tables, users and posts, in my primary db, where the posts table has a user_id column. I've written a Samza job that joins those two tables together by storing every user record and the merged document in LevelDB and then outputting the resulting document to the changelog Kafka topic.
> > >
> > > Not sure exactly what you mean with a merged document here. Is it a list of all the posts by a particular user? Or post records augmented with user information? Or something else?
> > >
> > > > Is this the right way to implement that kind of job?
> > >
> > > On a high level, this sounds reasonable. One detail question is whether you're using the changelog feature for the LevelDB store. If the contents of the store can be rebuilt by reprocessing the input stream, you could get away with turning off the changelog on the store and making the input stream a bootstrap stream instead. That will save some overhead on writes, but still give you fault tolerance.
> > >
> > > Another question is whether the values you're storing in LevelDB are several merged records together, or whether you store each record individually and use a range query to retrieve them if necessary. You could benchmark which works best for you.
> > >
> > > > It seems that even with a decent partitioning scheme the LevelDB instance in each task will get quite large, especially if we're joining several tables together that have millions of rows (our real-world use case would be 7 or 8 tables, each with many millions of records).
> > >
> > > The goal of Samza's local key-value stores is to support a few gigabytes per partition. So millions of rows distributed across (say) tens of partitions should be fine, assuming the rows aren't huge. You might want to try the new RocksDB support, which may perform better than LevelDB.
> > >
> > > > Also, given a task that's processing that much data, where do you recommend running Samza? Should we spin up another set of boxes, or is it ok to run it on the Kafka brokers (I heard it mentioned that this is how LinkedIn is running Samza)?
> > >
> > > IIRC LinkedIn saw some issues with the page cache when Kafka and LevelDB were on the same machine, which was resolved by putting them on different machines. But I'll let one of the LinkedIn folks comment on that.
> > >
> > > Best,
> > > Martin

--
Milinda Pathirage

PhD Student | Research Assistant
School of Informatics and Computing | Data to Insight Center
Indiana University

twitter: milindalakmal
skype: milinda.pathirage
blog: http://milinda.pathirage.org
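P.S. For the join pattern in the state-management example quoted above, a task along these lines is roughly what it describes. This is only a sketch, not code from the docs; the store name, output stream, and use of Map-valued messages are assumptions that would have to match your job's config and serdes.

import org.apache.samza.config.Config;
import org.apache.samza.storage.kv.KeyValueStore;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.SystemStream;
import org.apache.samza.task.InitableTask;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.StreamTask;
import org.apache.samza.task.TaskContext;
import org.apache.samza.task.TaskCoordinator;

import java.util.HashMap;
import java.util.Map;

@SuppressWarnings("unchecked")
public class UserJoinTask implements StreamTask, InitableTask {
    // Output stream and store name are illustrative; they must match the job config.
    private static final SystemStream OUTPUT = new SystemStream("kafka", "joined-users");
    private KeyValueStore<String, Map<String, Object>> store;

    public void init(Config config, TaskContext context) {
        store = (KeyValueStore<String, Map<String, Object>>) context.getStore("user-join-store");
    }

    public void process(IncomingMessageEnvelope envelope, MessageCollector collector,
                        TaskCoordinator coordinator) {
        String userId = (String) envelope.getKey();
        Map<String, Object> update = (Map<String, Object>) envelope.getMessage();

        // Look up the current joined record, merge the incoming change into it,
        // write it back, and emit the new joined record downstream.
        Map<String, Object> joined = store.get(userId);
        if (joined == null) {
            joined = new HashMap<String, Object>();
        }
        String inputStream = envelope.getSystemStreamPartition().getStream();
        joined.put(inputStream, update); // keep the latest record from each input stream
        store.put(userId, joined);

        collector.send(new OutgoingMessageEnvelope(OUTPUT, userId, joined));
    }
}

Whether you keep both sides merged under one key like this, or store each record individually and use a range query (as Martin suggests above), is something you'd want to benchmark for your data sizes.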
