Thanks for the explanations, Josh. This sounds very doable. A few more comments inline below.
James

On Wed, Apr 30, 2014 at 8:37 AM, Josh Elser <[email protected]> wrote:

> On 4/30/14, 3:33 AM, James Taylor wrote:
>> On Tue, Apr 29, 2014 at 11:57 AM, Josh Elser <[email protected]> wrote:

>>>> @Josh - it's less baked in than you'd think on the client, where the query parsing, compilation, optimization, and orchestration occur. The client/server interaction is hidden behind the ConnectionQueryServices interface, the scanning behind ResultIterator (in particular ScanningResultIterator), the DML behind MutationState, and KeyValue interaction behind KeyValueBuilder. Yes, it would require some more abstraction, but probably not too bad. On the server side, the entry points would all be different, and that's where I'd need your insights into what's possible.

>>> Definitely. I'm a little concerned about what's expected to be provided by the "database" (HBase, Accumulo), as I believe HBase is a little more flexible in allowing writes internally, where Accumulo has thus far said "you're gonna have a bad time".

>> Tell me more about what you mean by "allowing writes internally".

> Haha, sorry, that was a sufficiently ominous statement with insufficient context.

> For discussion's sake, let's say HBase coprocessors and Accumulo iterators are equivalent, purely in the scope of "running server-side code" (in the RegionServer/TabletServer). However, there is a notable difference in where each of them is implemented in the pipeline.

> Coprocessors have built-in hooks that let you hook into PUT/GET/DELETE/etc., both before (pre) and after (post) each of those operations. In other words, they provide hooks at a "high database level".

> Iterators tend to be much closer to the data itself, only dealing with streams of data (other iterators stacked on one another). Iterators implement versioning and visibilities, and can even implement complex searches. The downside of this approach is that iterators lack any means to safely write data _outside of the sorted key-value pairs in the tablet currently being processed_. It's possible to make in-tablet updates, but sorted order within a large tablet might make this difficult as well.

> This is why I was thinking Percolator would be a better solution, as it's meant for handling updates like this server-side. However, I imagine it would be possible, in the short term, to make some separate process between Phoenix and Accumulo which handles writes.

Another fallback might be to do global index maintenance on the client. It'd just be more expensive, especially if you want to handle out-of-order updates (which are particularly tricky, as you have to get multiple versions of the rows to work out all the different scenarios). A second fallback might be to support only local indexing. Does Accumulo have the concept of a "custom load balancer" that would allow you to co-locate two regions from different tables? The local-index feature has kind of driven some feature requests on that front for HBase - mainly callbacks when a region is split or relocated. The rows of the local index are prefixed with the region start key to keep them together and identify them.
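Concretely, the local index row key is just a concatenation along these lines (a simplified, hypothetical sketch - the names, separator handling, and encoding here are made up for illustration rather than being what Phoenix actually does internally):

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;

    public class LocalIndexKeySketch {
        private static final byte SEPARATOR = 0x00; // assumed separator byte

        // regionStartKey | indexed column value | data row key
        public static byte[] buildLocalIndexKey(byte[] regionStartKey,
                                                byte[] indexedValue,
                                                byte[] dataRowKey) throws IOException {
            ByteArrayOutputStream key = new ByteArrayOutputStream();
            key.write(regionStartKey); // keeps the index row with the region that holds its data
            key.write(SEPARATOR);
            key.write(indexedValue);   // orders index rows by the indexed value within that region
            key.write(SEPARATOR);
            key.write(dataRowKey);     // points back to the data row
            return key.toByteArray();
        }
    }

Because the region start key leads, the index rows for a given region sort into one contiguous block, which is what the co-location mechanism keeps next to the data.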
>>>> @Eric - I agree about having txn support (probably through snapshot isolation) by controlling the timestamp, and then layering indexing on top of that. That's where we're headed. But I wouldn't let that stop the effort - it would just be layered on top of what's already there. FWIW, there's another interesting indexing model that has been termed "local indexing" (https://github.com/Huawei-Hadoop/hindex) which is being worked on right now (it should be available in either our 4.1 or 4.2 release). In this model, the table data and index data are co-located on the same region server through a kind of "buddy" region mechanism. The advantage is that you take no hit at write time, as you're writing both the index and table data together. Not sure how/if this would transfer over to the Accumulo world.

>>> Interesting. Given that Accumulo doesn't have a fixed column family schema, this might make index generation even easier (maybe "cleaner" is the proper word). You could easily co-locate the indices with the data, giving them a proper name.

>> With HBase, you can do something similar (though, you're right, you'd need to create the column family upfront or take the hit of creating it dynamically - that's a nice feature that Accumulo has). The reason this doesn't work is that you need a different row key so that the index rows are ordered according to their indexed column values. If you put the index in a column family of the data table, the rows are ordered in the same way as the data table. This makes range scans over index tables very expensive, as the rows would need to be re-ordered.

> Ah, of course. You need the term up front to make it sort properly.

>>> Problem still exists that we don't have a solid way to do this solely inside of Accumulo ATM. I'd imagine that if someone stepped up to implement coprocessors, we'd take the route of a separate, standalone process (as opposed to in-RegionServer). Hypothetically, we could do the same for Phoenix in the short term.

>>> Can you quantify what would be expected of Accumulo to integrate with Phoenix (maybe list what exactly is done inside of HBase at a high level?) so that we could give some more targeted ideas/feelings as to what the level of work would be inside Accumulo?

>> There aren't a lot of hard-and-fast requirements. Most of what Phoenix does is optimize performance by leveraging the capabilities of the server. In terms of hard-and-fast requirements, these come to mind:
>> - data is returned in row key order from range scans
>> - a scan may set a start key/stop key to do a range scan
>> - a row key may be composed of arbitrary bytes
>> - a client may "pre-split" a table by providing the region boundaries at table create time (we rely on this for salting to prevent hotspotting: http://phoenix.incubator.apache.org/salted.html)
>> - the client has access to the region boundaries of a table (this allows for better parallelization)
>> - the client may chunk up a scan into multiple smaller scans and run them in parallel
>> Some of these may be a bit squishy, as there may be existing machinery in your client programming model that could be leveraged. The client API of HBase, for example, does not provide the ability to parallelize a scan out of the box, so this is something Phoenix had to add on top (through chunking up scans at or within region boundaries).

> All of these look fine. The Accumulo BatchScanner does that parallelization for you, which is really nice (handling tablet migration and all that fun stuff transparently).

That's nice that Accumulo has this built-in. Does it allow the client to specify the split points for the scan in some way?
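For reference, the chunking we do on the HBase side boils down to something like this (a stripped-down sketch against the plain HBase client API, not the actual Phoenix code - the table name and thread count are placeholders):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.client.ResultScanner;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.util.Pair;

    public class ChunkedScanSketch {
        public static void main(String[] args) throws Exception {
            final Configuration conf = HBaseConfiguration.create();

            // The client can see region boundaries, which is what makes chunking possible.
            HTable boundaries = new HTable(conf, "MY_TABLE");
            Pair<byte[][], byte[][]> regions = boundaries.getStartEndKeys();
            boundaries.close();

            ExecutorService pool = Executors.newFixedThreadPool(4);
            List<Future<Long>> chunks = new ArrayList<Future<Long>>();
            for (int i = 0; i < regions.getFirst().length; i++) {
                final byte[] start = regions.getFirst()[i];
                final byte[] stop = regions.getSecond()[i];
                chunks.add(pool.submit(new Callable<Long>() {
                    public Long call() throws Exception {
                        // One scan per region. HTable isn't thread-safe, so each chunk
                        // opens its own handle.
                        HTable table = new HTable(conf, "MY_TABLE");
                        try {
                            ResultScanner scanner = table.getScanner(new Scan(start, stop));
                            try {
                                long rows = 0;
                                for (Result r : scanner) {
                                    rows++; // a real client would feed rows into a merge here
                                }
                                return rows;
                            } finally {
                                scanner.close();
                            }
                        } finally {
                            table.close();
                        }
                    }
                }));
            }

            long total = 0;
            for (Future<Long> chunk : chunks) {
                total += chunk.get();
            }
            pool.shutdown();
            System.out.println("rows scanned: " + total);
        }
    }

If the BatchScanner can take an explicit set of ranges derived from the table's split points, that would map over pretty naturally.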
>> Phoenix manages the metadata of your tables (tables, columns, indexes, views, etc.) in an HBase table. DDL statements such as CREATE TABLE, DROP TABLE, and ALTER TABLE are atomic, transactional operations b/c we don't want our metadata table to get into a corrupt state. To accomplish this, we rely on:
>> - setting a "split policy" that ensures that the table data for a given "tenant" (we support multi-tenancy: http://phoenix.incubator.apache.org/multi-tenancy.html) stays together in the same region
>> - putting the data using an API that guarantees that the entire batch of mutations either succeeds or fails completely
>> Again, these are details of our implementation on HBase which do not necessarily need to be implemented the same way on a different system.

> I'd have to look again at how our mutation failures are handled (or someone else can chime in). This might be something to keep an eye on depending on the distribution of mutations with regard to tables.

>> Phoenix supports sequences, which are atomically incremented values. This is done through a coprocessor currently, due to some limitations with the HBase Increment API, but the idea is the same as an atomic increment.

> Conditional Mutations in the about-to-be-released version 1.6.0 will provide this.

>> Phoenix does the following push-down:
>> - the WHERE clause gets transformed into three things: a start/stop key for a scan, a skip scan filter to efficiently navigate the key space (see http://phoenix-hbase.blogspot.com/2013/05/demystifying-skip-scan-in-phoenix.html), and a custom filter to rule out a row based on some Java code that does expression evaluation
>> - the GROUP BY clause gets pushed to the server, and a coprocessor runs the scan on each region so that the client doesn't have to get back all the raw data. Instead, the client gets back the aggregated data (to conserve network bandwidth and to run the scan where the data lives). The client then does a final merge sort.

> I've written an iterator to do a group by previously. Depending on the schema this is fine.

>> - the ORDER BY clause used in combination with the LIMIT clause is a TopN query. We optimize this by having each region hold on to the top N values, with the client then doing a merge sort with the limit applied.

> This is an interesting one. If you remove the possibility of tablets splitting out from underneath you and you had a view of the splits, you could probably pull it off.

>> - the ORDER BY clause on its own gets executed on each region (spooled using memory-mapped files), and then the client does a merge sort. This spooling could potentially be done on the client side.

> Unless we can do some trickery with the schema, yeah, client side.
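For what it's worth, the client-side half of both the TopN case and the plain ORDER BY case is just a k-way merge over streams that each come back sorted. Roughly (a bare-bones sketch over in-memory string iterators, not the actual Phoenix code):

    import java.util.ArrayList;
    import java.util.Comparator;
    import java.util.Iterator;
    import java.util.List;
    import java.util.PriorityQueue;

    public class MergeSortSketch {
        // One entry per per-region stream: the head row plus the iterator it came from.
        private static class Head {
            final String row;               // stand-in for a real row/key type
            final Iterator<String> source;
            Head(String row, Iterator<String> source) { this.row = row; this.source = source; }
        }

        // Merges already-sorted streams into one sorted list, stopping at 'limit'.
        public static List<String> merge(List<Iterator<String>> sortedStreams, int limit) {
            PriorityQueue<Head> heap = new PriorityQueue<Head>(
                    Math.max(1, sortedStreams.size()),
                    new Comparator<Head>() {
                        public int compare(Head a, Head b) { return a.row.compareTo(b.row); }
                    });
            for (Iterator<String> stream : sortedStreams) {
                if (stream.hasNext()) heap.add(new Head(stream.next(), stream));
            }
            List<String> out = new ArrayList<String>();
            while (!heap.isEmpty() && out.size() < limit) {
                Head smallest = heap.poll();
                out.add(smallest.row);
                if (smallest.source.hasNext()) {
                    heap.add(new Head(smallest.source.next(), smallest.source));
                }
            }
            return out;
        }
    }

Since each per-region stream arrives sorted, the client only ever holds one head row per stream plus the merged prefix, which is a big part of why pushing the sort (or the top N) down to the regions pays off.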
>> - joins are executed as a broadcast hash join. We run one side of the query (with the filters applied), compact the results, and send them to each region server, where they are cached while we run the other side of the query. A coprocessor then does a map lookup (only equi-joins are supported currently) to join on the join key and returns the joined results (i.e., the concatenated values in a single, condensed key value, as access from the client is positional post-join).

> The join approach would need to be implemented some other way for the earlier stated comparison of iterators and coprocessors.

Client-side could be another fallback. The coprocessor approach is really only a big win in two cases: if you have a join which doesn't have many matches (as those rows get filtered on the server side), or for correlated subqueries or EXISTS queries where you can filter or collapse many rows to one or none on the server side rather than return them all to the client.

>> For our global secondary indexes (local secondary indexes are different, as we discussed already), we trap updates to the data table through a coprocessor. For index maintenance, you need to know, when a change occurs to a data row, what the prior value of the row was. The reason is that you need to delete the index row corresponding to the old data row and then insert the index row corresponding to the new value (remember, the index value makes up the row key). By doing this operation through a coprocessor, we know that we can get the prior data row state locally. We still need to issue a Put from one region server to another, but this isn't really an extra hop: if it were done on the client, the same hop would be needed (but the old row state would have to be pulled over to the client, which is not necessary with the coprocessor-based approach). For more on global secondary indexing, see http://phoenix.incubator.apache.org/secondary_indexing.html (there are some good presentations at the end of the page that provide more technical detail).

> Right, you want to remove the old index value and write the new index value (actually two unique keys) in the same transaction to ensure a valid index. Or, at least, ensure that you never remove the old value and then die before inserting the new value.

> Again, not going to work well in an iterator.

>> Phoenix also allows "point-in-time" queries, where a client may establish a connection at an earlier timestamp. If your table is set up to keep multiple versions of the same row, then you can query "back in time" and will see the data as it was at that point. We more or less get this for free with the MVCC model of HBase by specifying a max timestamp on a scan. One slightly tricky bit is that we correlate the DDL of your table with the same timestamp as your data. So when you go back in time like this, you'll also see the structure of your table as it was at that time.

> I don't see this as a problem. As long as we remove the versioning iterator from a table (which keeps the most recent version of a key by default), it should be pretty easy to implement an iterator which adheres to the "max timestamp" semantics.

>> So we do rely on coprocessors, but the underlying APIs we're accessing on the server side are pretty light.

>> TLDR? Let's continue in the JIRA?

>>> Mailing list is fine by me while we get this hashed out :). We can move to Jira when we start getting into specifics.
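P.S. To make the global index maintenance discussion a bit more concrete, the per-row work is roughly the following, regardless of whether it runs in a coprocessor, a separate process, or on the client (a simplified sketch with made-up column names and key encoding, not the actual Phoenix indexer code):

    import java.io.IOException;
    import org.apache.hadoop.hbase.client.Delete;
    import org.apache.hadoop.hbase.client.HTableInterface;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.util.Bytes;

    public class IndexMaintenanceSketch {
        private static final byte[] FAM = Bytes.toBytes("0");   // placeholder family
        private static final byte[] QUAL = Bytes.toBytes("_0"); // placeholder qualifier

        // Given the prior and new values of the indexed column for a data row,
        // delete the old index row and insert the new one. Here the index row key
        // is simply the indexed value concatenated with the data row key.
        public static void maintainIndex(HTableInterface indexTable,
                                         byte[] dataRowKey,
                                         byte[] oldIndexedValue,
                                         byte[] newIndexedValue,
                                         long ts) throws IOException {
            if (oldIndexedValue != null) {
                byte[] oldIndexRow = Bytes.add(oldIndexedValue, dataRowKey);
                indexTable.delete(new Delete(oldIndexRow)); // remove the stale index entry
            }
            if (newIndexedValue != null) {
                byte[] newIndexRow = Bytes.add(newIndexedValue, dataRowKey);
                Put put = new Put(newIndexRow);
                // A real implementation has to be careful with cell timestamps here,
                // which is what makes out-of-order updates tricky.
                put.add(FAM, QUAL, ts, dataRowKey);
                indexTable.put(put);
            }
        }
    }

The awkward part is obtaining oldIndexedValue: that requires reading the prior row state, which is exactly what running next to the data (coprocessor-style) gives you for free.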
