Hey,

Getting Hadoop to play nice with Cassandra has been a desire for many folks on this list, and probably more on the other one. For the purposes of this email, I'm going to restrict this goal to getting Hadoop to read from Cassandra in a not-stupid way.
"Not-stupid" has a very specific meaning. "Not-stupid" means that:

1) Each Hadoop Mapper sees only a small subset of the entire desired dataset from Cassandra, and the entire desired dataset will never be seen by any one phase of the Hadoop process.
2) Every portion of the desired dataset will be unique to the Mapper it is delivered to. No two Mappers will ever overlap.
3) There will be no portion of the desired dataset that is not seen by a Mapper.
4) Partitioning of the dataset to the Mappers should try to use the Cassandra nodes efficiently. This means attempting to keep each partition on a single node.

Conspicuously not on this list is data locality, that is, keeping the data passed from a node to a given Mapper on or near the same machine. This requires further investigation outside the scope of this initial project. Also, please remember that "not-stupid" is not the same as "smart".

= How Hadoop Wants It And How It's Been Done By HBase

I've dug around the Hadoop and HBase codebases, and while my understanding is not yet perfect, this seems to be the general layout of the problem.

First, a subclass of InputFormat needs to be written. This class's job is to split up the dataset for a Hadoop job into InputSplits, or more accurately, subclasses of InputSplit. These InputSplits are serialized to disk in HDFS in files named so that each is picked up by just one Mapper.

Okay, actually the Mapper has no idea about InputSplits. An InputSplit is loaded up on a machine running a Mapper, and then getRecordReader() is called on the subclass of InputFormat, with the Mapper's InputSplit and various Hadoop job information passed in. getRecordReader() returns a subclass of RecordReader that allows the Mapper to call next() on it over and over again to run through the portion of the dataset represented by the InputSplit.
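To make that split-then-iterate flow concrete, here is a toy, in-memory sketch. The interfaces SimpleInputFormat, SimpleInputSplit and SimpleRecordReader, and the classes IndexRangeSplit, ListInputFormat and ToyJob, are all hypothetical stand-ins, not Hadoop's real API: the real classes also take job configuration and reporting arguments, and real InputSplits are serialized and shipped between machines. Cutting the data into contiguous index ranges is likewise just an assumption made for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-ins for Hadoop's InputSplit / RecordReader / InputFormat.
interface SimpleInputSplit {}

interface SimpleRecordReader {
    // Returns the next record in this split, or null once the split is exhausted.
    String next();
}

interface SimpleInputFormat {
    List<SimpleInputSplit> getSplits(int numSplits);
    SimpleRecordReader getRecordReader(SimpleInputSplit split);
}

// A split covering the index range [start, end) of an in-memory dataset.
class IndexRangeSplit implements SimpleInputSplit {
    final int start, end;
    IndexRangeSplit(int start, int end) { this.start = start; this.end = end; }
}

class ListInputFormat implements SimpleInputFormat {
    private final List<String> data;
    ListInputFormat(List<String> data) { this.data = data; }

    // Cut the dataset into numSplits contiguous, non-overlapping ranges.
    public List<SimpleInputSplit> getSplits(int numSplits) {
        List<SimpleInputSplit> splits = new ArrayList<>();
        int n = data.size();
        for (int i = 0; i < numSplits; i++) {
            splits.add(new IndexRangeSplit(i * n / numSplits, (i + 1) * n / numSplits));
        }
        return splits;
    }

    // Called on the Mapper's side: returns a reader over just this split's records.
    public SimpleRecordReader getRecordReader(SimpleInputSplit split) {
        IndexRangeSplit r = (IndexRangeSplit) split;
        return new SimpleRecordReader() {
            private int pos = r.start;
            public String next() { return pos < r.end ? data.get(pos++) : null; }
        };
    }
}

class ToyJob {
    // Simulates one Mapper per split, each calling next() until its split runs out.
    static List<List<String>> run(SimpleInputFormat format, int numMappers) {
        List<List<String>> seenByMapper = new ArrayList<>();
        for (SimpleInputSplit split : format.getSplits(numMappers)) {
            SimpleRecordReader reader = format.getRecordReader(split);
            List<String> seen = new ArrayList<>();
            for (String rec = reader.next(); rec != null; rec = reader.next()) {
                seen.add(rec);
            }
            seenByMapper.add(seen);
        }
        return seenByMapper;
    }
}
```

In this toy, each simulated Mapper sees only its own range, no two ranges overlap, and together they cover the whole list, which is properties 1 through 3 of the not-stupid definition in miniature.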
In the 0.19.3 version of the HBase codebase, InputSplits are created by gathering all the start keys for each "region" (which conceptually maps, roughly, to a Cassandra node) in the database and divvying up the keys approximately evenly across the number of Mappers desired. Each TableSplit (the HBase subclass of InputSplit) created has information about the start key of the region, the end key of the region and the region location (the "name" of the node the dataset is on). This splitting on keys works because HBase keys are always stored in an ordered fashion. Basically, HBase always partitions using something akin to Cassandra's OrderPreservingPartitioner. I've posted a gist with just the method that does this divvying up, by the by [1]. (Interestingly, the region location seems to be encoded only to allow for a nice toString() method on InputSplit to be used during logging.)

HBase's subclass of RecordReader uses an instance of the ClientScanner class to keep state about where it is in the Mapper's portion of the dataset. This ClientScanner queries HBase's META server (which contains information about all regions) with the start key and table name to find out which HBase region to talk to, and caches that information. Note that this is done for each Mapper on each machine. Note, too, that the region information encoded in the TableSplit doesn't seem to be used. These last few points are where the code gets really hairy. I could be wrong about some of it, and corrections would be appreciated. The important parts, anyway, are the subclassing of InputFormat, RecordReader and InputSplit.

= How To Do It With Cassandra (Maybe)

So, I spoke to Jonathan on #cassandra about this, and we tried to see how we could take the HBase method and turn it into something that would work with Cassandra. Our initial assumption is that the Cassandra database to be mapped over has to use the OrderPreservingPartitioner to keep the input splitting consistent.
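Under that assumption, the split construction proposed in the next paragraph can be sketched in a few lines. This is a hypothetical in-memory model, not Cassandra or Hadoop code: each node is represented only by the first key it holds (standing in for the first key from SSTableReader#getIndexedKeys()), a java.util.TreeMap stands in for the ordered key space, and the proposed end-exclusive slice is modeled with TreeMap's subMap(from, true, to, false). KeyRangeSplit and OrderedKeySplitter are made-up names, and keys sorting as plain Java strings is an assumption about how the order-preserving partitioner orders them.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.NavigableMap;
import java.util.SortedMap;
import java.util.TreeSet;

// Hypothetical split: keys in [startKey, endKey); endKey == null means "to the end".
class KeyRangeSplit {
    final String startKey;
    final String endKey;
    KeyRangeSplit(String startKey, String endKey) {
        this.startKey = startKey;
        this.endKey = endKey;
    }
}

class OrderedKeySplitter {
    // Gather per-node start keys, then sort and de-duplicate them (a TreeSet does both),
    // and pair each start key with the next one as its exclusive end key.
    static List<KeyRangeSplit> splits(Collection<String> nodeStartKeys) {
        List<String> sorted = new ArrayList<>(new TreeSet<>(nodeStartKeys));
        List<KeyRangeSplit> result = new ArrayList<>();
        for (int i = 0; i < sorted.size(); i++) {
            String end = (i + 1 < sorted.size()) ? sorted.get(i + 1) : null;
            result.add(new KeyRangeSplit(sorted.get(i), end));
        }
        return result;
    }

    // End-exclusive slice over an ordered key space, modeled with a NavigableMap.
    // The last split has no end key, so it runs to the end of the key space.
    static SortedMap<String, String> slice(NavigableMap<String, String> store, KeyRangeSplit s) {
        return s.endKey == null
                ? store.tailMap(s.startKey, true)
                : store.subMap(s.startKey, true, s.endKey, false);
    }
}
```

The TreeSet is the "sorted set instead of a sorted array" fallback for duplicated start keys, and because each end key is excluded from its own slice but is the start key of the next one, every key lands in exactly one slice.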
Now, Cassandra nodes don't really have a concept of a "start" and "end" key. We could, however, get a start key for a given node by taking the first key returned from SSTableReader#getIndexedKeys(). We would then gather up the start keys from each of the nodes and sort them. We could then use each key in this gathered list as the start key for a "region" and, given an addition to the slice API, slice from that start key to the next start key (the end key, in HBase terminology). We would need to modify the slice API to provide slices where the given end key is excluded from the returned set, instead of included.

In terms of actual code to be written, our subclass of InputFormat is what would gather this list of start keys, and we would serialize the start/end key pairs with our own subclass of InputSplit. And, of course, our subclass of RecordReader would make the end-key-exclusive slice call.

This method satisfies property 1 of our not-stupid definition. At no point in our Hadoop job are we accessing the entire dataset, or even all of the keys (if I'm remembering how getIndexedKeys works correctly). It satisfies properties 2 and 3 because we are clearly slicing everything once and only once, unless I'm misremembering how replication works w.r.t. ordered partitioning. If I am misremembering and start keys are duplicated, we can just return a sorted set instead of a sorted array. It satisfies property 4 because we are slicing along the seams given to us by the nodes themselves.

= Back To The Game

Right, so that's the first pass. What sucks about this? What rules about it? Questions?

[1] http://gist.github.com/150217

--
Jeff
