On 4/30/14, 3:33 AM, James Taylor wrote:
On Tue, Apr 29, 2014 at 11:57 AM, Josh Elser <[email protected]> wrote:

@Josh - it's less baked in than you'd think on the client where the query
parsing, compilation, optimization, and orchestration occurs. The
client/server interaction is hidden behind the ConnectionQueryServices
interface, the scanning behind ResultIterator (in
particular ScanningResultIterator), the DML behind MutationState, and
KeyValue interaction behind KeyValueBuilder. Yes, though, it would require
some more abstraction, but probably not too bad, though. On the
server-side, the entry points would all be different and that's where I'd
need your insights for what's possible.


Definitely. I'm a little concerned about what's expected to be provided by
the "database" (HBase, Accumulo) as I believe HBase is a little more
flexible in allowing writes internally where Accumulo has thus far said
"you're gonna have a bad time".


Tell me more about what you mean by "allowing writes internally".

Haha, sorry, that was a sufficiently ominous statement with insufficient context.

For discussion sake, let's just say HBase coprocessors and Accumulo iterators are equivalent, purely in the scope of "running server-side code" (in the RegionServer/TabletServer). However, there is a notable difference in the pipeline where each of those are implemented.

Coprocessors have built-in hooks that let you get updates on PUT/GET/DELETE/etc as well as pre and post each of those operations. In other words, they provide hooks at a "high database level".

Iterators tend to be much closer to the data itself, only dealing with streams of data (other iterators stacked on one another). Iterators implement versioning, visibilities, and can even implement complex searches. The downside of this approach is that iterators lack any means to safely write data _outside of the sorted Key-Value pairs in the tablet currently being processed_. It's possible to make in tablet updates, but sorted order within a large tablet might make this difficult as well.

This is why I was thinking percolator would be a better solution, as it's meant for handling updates like this server-side. However, I imagine it would be possible, in the short-term, to make some separate process between Phoenix and Accumulo which handles writes.




  @Eric - I agree about having txn support (probably through snapshot
isolation) by controlling the timestamp, and then layering indexing on top
of that. That's where we're headed. But I wouldn't let that stop the
effort
- it would just be layered on top of what's already there. FWIW, there's
another interesting indexing model that has been termed "local indexing"(
https://github.com/Huawei-Hadoop/hindex) which is being worked on right
now
(should be available in either our 4.1 or 4.2 release). In this model, the
table data and index data are co-located on the same region server through
a kind of "buddy" region mechanism. The advantage is that you take no hit
at write time, as you're writing both the index and table data together.
Not sure how/if this would transfer over to the Accumulo world.


Interesting. Given that Accumulo doesn't have a fixed column family
schema, this might make index generation even easier (maybe "cleaner" is
the proper word). You could easily co-locate the indices with the data,
given them a proper name.


With HBase, you can do something similar (though, you're right, you'd need
to create the column family upfront or take the hit of creating it
dynamically - that's a nice feature that Accumulo has). The reason this
doesn't work is that you need a different row key so that the index rows
are ordered according to their indexed column values. If you put it in a
column family of the data table, they're ordered in the same way as the
data table. This makes range scans over index tables very expensive, as the
rows would need to be re-ordered.


Ah, of course. You need the term up front to make it sort properly.


Problem still exists that we don't have a solid way to do this solely
inside of Accumulo ATM. I'd imagine that if someone stepped up to implement
coprocessors, we'd be taking the route of a separate, standalone process
(as opposed to in-RegionServer). Hypothetically, we could do the same for
Phoenix in the short-term.

Can you quantify what would be expected by Accumulo to integrate with
Phoenix (maybe list what exactly is done inside of HBase at a high level?)
so that we could give some more targeted ideas/feelings as to what the
level of work would be inside Accumulo?


There's not a lot of hard/fast requirements. Most of what Phoenix does is
to optimize performance by leveraging the capabilities of the server. In
terms of hard/fast requirements, these come to mind:
- data is returned in row key order from range scans
- a scan may set a start key/stop key to do a range scan
- a row key may be composed of arbitrary bytes
- a client may "pre-split" a table by providing the region boundaries at
table create time (we rely on this for salting to prevent hotspotting:
http://phoenix.incubator.apache.org/salted.html).
- the client has access to the region boundaries of a table (this allows
for better parallelization)
- the client may issue chunk up a scan into smaller, multiple scans and run
them in parallel
Some of these may be a bit squishy, as there may be existing machinery
already in your client programming model that could be leverage. The client
API of HBase, for example, does not provide the ability out of the box to
parallelize a scan, so this is something Phoenix had to add on top (through
chunking up scans at or within region boundaries).

All of these look fine. The Accumulo BatchScanner does that parallelization for you which is really nice (handling tablet migration and all that fun stuff transparently).


Phoenix manages the metadata of your tables (tables, columns, indexes,
views, etc) in an HBase table. DDL statements such as CREATE TABLE, DROP
TABLE, ALTER TABLE are atomic, transactional operations b/c we don't want
our metadata table to get in a corrupt state. To accomplish this, we rely
on:
- setting a "split policy" that ensures that the table data for a given
"tenant" (we support multi-tenancy:
http://phoenix.incubator.apache.org/multi-tenancy.html) stay together in
the same region.
- putting the data using an API that guarantees that either the entire
batch of mutations succeed or fail completely.
Again, these are details of our implementation on HBase which do not
necessarily need to be implemented in the same way on a different system.

I'd have to look again at how our mutation failures are handled (or someone else can chime in). This might be something to keep an eye on depending on the distribution of mutations in regards to tables.

Phoenix supports sequences which are atomically incremented values. This is
done through a coprocessor currently, due to some limitations with the
HBase Increment API, but the idea is the same as an atomic increment.

Conditional Mutations in the about-to-be-released version 1.6.0 will provide this.

Phoenix does the following push down:
- the WHERE clause gets transformed into three things: a start/stop key of
a scan, a skip scan filter to efficiently navigate the key space (see
http://phoenix-hbase.blogspot.com/2013/05/demystifying-skip-scan-in-phoenix.html),
and a custom filter to rule out a row based on some java code that does
expression evaluation.
- the GROUP BY clause gets pushed to the server and a coprocessor runs the
scan on each region so that the client doesn't have to get back all the raw
data. Instead, the client gets back the aggregated data (to conserve
network bandwidth and to run the scan where the data lives). The client
then does a final merge sort.

I've written an iterator to do a group by previously. Depending on the schema this is fine.

- the ORDER BY clause used in combination with the LIMIT clause is a TopN
query. We optimize this by each region holding on to the top N values with
the client then doing a merge sort with the limit applied.

This is an interesting one. If you remove the possibility of tablets splitting out from underneath you and you had a view of the splits, you could probably pull it off.

- the ORDER BY clause on it's own gets executed on each region (spooled
using memory mapped files) and then the client does a merge sort. This
spooling could potentially be done on the client side.

Unless we can do some trickery with the schema, yeah, client side.

- joins are executed as a broadcast hash join. We run one side of the query
(with the filters applied), compact the results, and send them to each
region server where they are cached while we run the other side of the
query. A coprocessor then does a map lookup (equi-joins only are supported
currently) to join based on the join key and returns the joined results
(i.e. the concatenated values in a single, condensed key value as access
from the client is positional post-join).

The join approach would need to be implemented some other way for the earlier stated comparison of iterators and coprocessors.

For our global secondary indexes (local secondary indexes are different as
we discussed already), we trap updates to the data table through a
coprocessor. For index maintenance you need to know when a change occurs to
a data row what the prior value of the row was. The reason is because you
need to delete the index row corresponding to the old data row and then
insert the index row corresponding to the new value (remember, the index
value makes up the row key). By doing this operation through a coprocessor,
we know that we can get the prior data row state locally. We still need to
issue a Put from one region server to another, but this isn't really an
extra hop, as if it was done on the client, the same hop would need to be
done (but the old row state would need to be pulled over to the client
which is not necessary with the coprocessor based approach). For more on
global secondary indexing, see
http://phoenix.incubator.apache.org/secondary_indexing.html (there are some
good presentations at the end of the page that provide more technical
detail).

Right, you want to remove the old index value and update a new index value (actually being two unique keys) in the same transaction to ensure a valid index. Or, at least ensure that you never remove the old value, and die before inserting the new value.

Again, not going to work well in an iterator.

Phoenix also allows "point-in-time" queries where a client may establish a
connection at an earlier timestamp. If your table is setup to keep multiple
versions of the same row, then you can query "back-in-time" and will see
the data as it was at that point. We more or less get this for free with
the MVCC model of HBase by specifying a max timestamp on a scan. One
slightly tricky bit is we correlate the current DDL of your table based on
the same timestamp as with your data. So when you go back-in-time like
this, you'll also see the structure of your table as it was at time also.

I don't see this as a problem. As long as we remove the versioning iterator from a table (which keeps the most recent version of a key by default), it should be pretty easy to implement an iterator which adheres to the "max timestamp" semantics.

So we do rely on coprocessors, but the underlying APIs were accessing on
the server-side are pretty light.

  TLDR? Let's continue in the JIRA?


Mailing list is fine by me for while we get this hashed out :). We can
move to Jira when we start getting into specifics.


Reply via email to