Hi Ilya,

I agree it would be quite nice if there were a way to implement this feature 
without a background worker — while also avoiding write contention for 
transactions that would otherwise not conflict with one another. I’m not sure 
it’s possible.

I have a few comments:

> We could maintain a database-level Sequence number and store the global 
> changes feed in the following form:
>   UpdateSequence = (DbName, EventType, PreviousUpdateSequence)

Tracking a database-wide “latest Sequence” in a single KV would mean we can’t 
execute any transactions on that database in parallel, which is yet another 
reason that strawman approach cannot work.

> In this case we could store the data we need as follows (under a separate 
> subspace, TBD).
> VersionStamp = (DbName, EventType)
> DbName = [versionstamps]

I don’t understand why you want to atomically append to an array here instead 
of using a separate (DbName, Versionstamp) KV each time. What’s the advantage? 
Both structures require periodic cleanup. I also don’t understand why you need 
this DbName -> Versionstamp mapping at all. Is there a reason to do some 
per-database cleanup on the contents of this global feed?
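
For concreteness, the separate-KV shape I have in mind would be written 
straight from the document update transaction, roughly like this (a sketch 
against the FDB Python bindings; the subspace name and API version are 
placeholders, not a settled design):

  import fdb
  fdb.api_version(600)  # illustrative

  @fdb.transactional
  def record_db_event(tr, db_name, event_type):
      # One KV per event: ('dbupdates_by_db', DbName, Versionstamp) = EventType
      # Versionstamped key writes are conflict-free and require no read.
      key = fdb.tuple.pack_with_versionstamp(
          ('dbupdates_by_db', db_name, fdb.tuple.Versionstamp()))
      tr.set_versionstamped_key(key, fdb.tuple.pack((event_type,)))

Cleanup is then just a range clear of everything under ('dbupdates_by_db', 
DbName) except the newest key, with no array value to rewrite.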

Cheers, Adam


> On Mar 27, 2019, at 2:07 PM, Ilya Khlopotov <iil...@apache.org> wrote:
> 
> Hi, 
> 
> Both proposals are fine but need a consumer process, which is a tricky 
> requirement because it will lead to problems when the queue grows faster 
> than we can consume it. This realization got me thinking about finding 
> possible ways to eliminate the need for a consumer.
> 
> I won't spell out the final solution right away, since I want to 
> demonstrate the thinking process so that others can build better proposals 
> on top of it.
> 
> Essentially, we need to de-duplicate events. In order to do that we need 
> to know when a given database was last updated. We could maintain a 
> database-level Sequence number and store the global changes feed in the 
> following form:
>   UpdateSequence = (DbName, EventType, PreviousUpdateSequence)
> 
> Then every 10th (or 100th, or 1000th) transaction could trigger a 
> compaction process for the updated database. It would use 
> PreviousUpdateSequence to get a pointer to its parent, read the pointer to 
> the grandparent, clean up the parent, and so on until there is nothing left 
> to clean up.
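> 
> To make that concrete, the layout would have looked something like this 
> (illustrative values only):
> 
>   ('changes', 103) = ('db1', 'updated', 101)  # 101 = PreviousUpdateSequence
>   ('changes', 101) = ('db1', 'updated', 97)
>   ('changes', 97)  = ('db1', 'updated', null)
>   # compaction walks 103 -> 101 -> 97, clearing everything except 103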
> 
> This is a terrible idea for the following reasons:
> - Including UpdateSequence is expensive, since we would need to add one 
> more read to every update transaction 
> - Recursion to do the cleanup is expensive and would most likely need to be 
> done across multiple transactions
> 
> What if FDB supported a list type for a value and had an atomic operation 
> to add a value to the list if it is missing? In this case we could store 
> the data we need as follows (under a separate subspace, TBD):
> VersionStamp = (DbName, EventType)
> DbName = [versionstamps]
> 
> In this case, in order to de-duplicate events, we would do the following: 
> - Every once in a while (on every 10th (or 100th, or 1000th) update 
> transaction to a specific database, chosen using a PRNG), execute the 
> compaction algorithm 
> - Read the list of versionstamps for older updates and issue remove 
> operations for every versionstamp except the biggest one 
> - Update the history value to include only the biggest versionstamp
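> 
> Roughly, in Python-ish pseudocode (read_list/write_list stand in for the 
> hypothetical atomic list operations; they do not exist in FDB today, and 
> the 'changes' subspace name is made up):
> 
>   def compact(tr, db_name):
>       stamps = sorted(read_list(tr, ('history', db_name)))  # hypothetical
>       for vs in stamps[:-1]:                  # everything but the biggest
>           tr.clear(fdb.tuple.pack(('changes', vs)))
>       write_list(tr, ('history', db_name), [stamps[-1]])    # hypothetical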
> 
> The question is how we would implement atomic addition of a value to a 
> list. There is an IBLT data structure (https://arxiv.org/pdf/1101.2245.pdf) 
> which can help us achieve that. An IBLT consists of multiple cells, where 
> every cell has the following fields:
> - count
> - keySum
> - valueSum
> - hashkeySum
> 
> The beauty of this structure is that all fields are updated using blind 
> addition operations (which FDB supports, aka atomic addition), while still 
> supporting enumeration of all key-values stored in the structure (with 
> configurable probability).
> 
> For our specific case it doesn't look like we need valueSum (because we only 
> need keys) and hashkeySum (because we wouldn't have duplicates), so we can 
> simplify the structure.
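> 
> For example, inserting a key into such a simplified IBLT could be done with 
> blind atomic adds only (a sketch; the cell count, hashing and subspace are 
> arbitrary choices):
> 
>   import hashlib, struct
> 
>   def iblt_insert(tr, cells, key_bytes, num_cells=64, k=3):
>       # Each cell keeps only count and keySum; both are bumped with
>       # conflict-free atomic adds, no read required. Sums wrap mod 2^64,
>       # which is fine because removal subtracts the same amounts.
>       key_int = int.from_bytes(hashlib.sha256(key_bytes).digest()[:8], 'big')
>       for i in range(k):
>           h = hashlib.sha256(bytes([i]) + key_bytes).digest()
>           cell = int.from_bytes(h[:4], 'big') % num_cells
>           tr.add(cells.pack((cell, 'count')), struct.pack('<Q', 1))
>           tr.add(cells.pack((cell, 'keySum')), struct.pack('<Q', key_int))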
> 
> Best regards,
> iilyak
> 
> 
> On 2019/03/20 22:47:42, Adam Kocoloski <kocol...@apache.org> wrote: 
>> Hi all,
>> 
>> Most of the discussions so far have focused on the core features that are 
>> fundamental to CouchDB: JSON documents, revision tracking, _changes. I 
>> thought I’d start a thread on something a bit different: the _db_updates 
>> feed.
>> 
>> The _db_updates feed is an API that enables users to discover database 
>> lifecycle events across an entire CouchDB instance. It’s primarily useful in 
>> deployments that have lots and lots of databases, where it’s impractical to 
>> keep connections open for every database, and where database creations and 
>> deletions may be an automated aspect of the application’s use of CouchDB.
>> 
>> There are really two topics for discussion here. The first is: do we need to 
>> keep it? The primary driver of applications creating lots of DBs is the 
>> per-DB granularity of access controls; if we go down the route of 
>> implementing the document-level _access proposal perhaps users naturally 
>> migrate away from this DB-per-user data model. I’d be curious to hear points 
>> of view there.
>> 
>> I’ll assume for now that we do want to keep it, and offer some thoughts on 
>> how to implement it. The main challenge with _db_updates is managing the 
>> write contention; in write-heavy databases you have a lot of producers 
>> trying to tag that particular database as “updated", but all the consumer 
>> really cares about is getting a single “dbname”:”updated” event as needed. 
>> In the current architecture we try to dedupe a lot of the events in-memory 
>> before updating a regular CouchDB database with this information, but this 
>> leaves us exposed to possibly dropping events within a window of a few 
>> seconds.
>> 
>> ## Option 1: Queue + Compaction
>> 
>> One way to tackle this in FoundationDB is to have an intermediate subspace 
>> reserved as a queue. Each transaction that modifies a database would insert 
>> a versionstamped KV into the queue like
>> 
>> Versionstamp = (DbName, EventType)
>> 
>> Versionstamps are monotonically increasing and inserting versionstamped keys 
>> is a conflict-free operation. We’d have a consumer of this queue which is 
>> responsible for “log compaction”; i.e., the consumer would do range reads on 
>> the queue subspace, toss out duplicate contiguous “dbname”:“updated” events, 
>> and update a second index which would look more like the _changes feed.
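>> 
>> As a rough sketch of the enqueue side (FDB Python bindings; the subspace 
>> name is a placeholder):
>> 
>>   @fdb.transactional
>>   def enqueue_event(tr, db_name, event_type):
>>       # Versionstamp = (DbName, EventType); a conflict-free append.
>>       key = fdb.tuple.pack_with_versionstamp(
>>           ('dbupdates_queue', fdb.tuple.Versionstamp()))
>>       tr.set_versionstamped_key(key, fdb.tuple.pack((db_name, event_type)))
>> 
>> The consumer would then range-read 'dbupdates_queue' in versionstamp 
>> order, collapse consecutive duplicates per DbName, and clear the keys it 
>> has processed.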
>> 
>> ### Scaling Consumers
>> 
>> A single consumer can likely process 10k events/sec or more, but eventually 
>> we’ll need to scale. Borrowing from systems like Kafka the typical way to do 
>> this is to divide the queue into partitions and have individual consumers 
>> mapped to each partition. A partition in this model would just be a prefix 
>> on the Versionstamp:
>> 
>> (PartitionID, Versionstamp) = (DbName, EventType)
>> 
>> Our consumers will be more efficient and less likely to conflict with one 
>> another on updating the _db_updates index if messages are keyed to a 
>> partition based on DbName, although this still runs the risk that a couple 
>> of high-throughput databases could swamp a partition.
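>> 
>> For example (illustrative only), the partition could be derived from a 
>> stable hash of the database name:
>> 
>>   import zlib
>> 
>>   NUM_PARTITIONS = 16  # arbitrary
>> 
>>   def partitioned_queue_key(db_name):
>>       partition = zlib.crc32(db_name.encode()) % NUM_PARTITIONS
>>       return fdb.tuple.pack_with_versionstamp(
>>           ('dbupdates_queue', partition, fdb.tuple.Versionstamp()))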
>> 
>> I’m not sure about the best path forward for handling that scenario. One 
>> could implement a rate-limiter that starts sloughing off additional messages 
>> for high-throughput databases (which has some careful edge cases), split the 
>> messages for a single database across multiple partitions, rely on operators 
>> to blacklist certain databases from the _db_updates system, etc. Each has 
>> downsides.
>> 
>> ## Option 2: Atomic Ops + Consumer
>> 
>> In this approach we still have an intermediate subspace, and a consumer of 
>> that subspace which updates the _db_updates index. But this time, we have at 
>> most one KV per database in the subspace, with an atomic counter for a 
>> value. When a document is updated it bumps the counter for its database in 
>> that subspace. So we’ll have entries like
>> 
>> (“counters”, “db1235”) = 1
>> (“counters”, “db0001”) = 42
>> (“counters”, “telemetry-db”) = 12312
>> 
>> and so on. Like versionstamps, atomic operations are conflict-free so we 
>> need not worry about introducing spurious conflicts on high-throughput 
>> databases.
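>> 
>> The producer side is then a single blind mutation inside the document 
>> update transaction, roughly (names are placeholders):
>> 
>>   import struct
>> 
>>   counters = fdb.Subspace(('counters',))
>> 
>>   @fdb.transactional
>>   def mark_updated(tr, db_name):
>>       # Atomic increment: no read, no conflict, regardless of write volume.
>>       tr.add(counters.pack((db_name,)), struct.pack('<q', 1))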
>> 
>> The initial pass of the consumer logic would go something like this:
>> 
>> - Do a snapshot range read of the “counters” subspace (or whatever we call 
>> it)
>> - Record the current values for all counters in a separate summary KV 
>> (you’ll see why in a minute)
>> - Do a limit=1 range read on the _changes space for each DB in the list to 
>> grab the latest Sequence
>> - Update the _db_updates index with the latest Sequence for each of these 
>> databases
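>> 
>> In code, that first pass might look roughly like this (a sketch; here the 
>> summary is one key per database rather than a single KV, and the subspace 
>> layout is made up):
>> 
>>   @fdb.transactional
>>   def consumer_pass(tr, counters, summary, changes, db_updates):
>>       # Snapshot read so we don't conflict with in-flight counter bumps.
>>       r = counters.range()
>>       for kv in tr.snapshot.get_range(r.start, r.stop):
>>           (db_name,) = counters.unpack(kv.key)
>>           tr[summary.pack((db_name,))] = kv.value  # remember what we saw
>>           # Latest Sequence = last key in this database's _changes space.
>>           cr = changes[db_name].range()
>>           last = list(tr.get_range(cr.start, cr.stop, limit=1, reverse=True))
>>           if last:
>>               seq = changes[db_name].unpack(last[0].key)[-1]
>>               tr[db_updates.pack((db_name,))] = fdb.tuple.pack((seq,))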
>> 
>> On a second pass, the consumer would read the summary KV from the last pass 
>> and compare the previous counters with the current values. If any counters 
>> have not been updated in the interval, the consumer would try to clear those 
>> from the “counters” subspace (adding them as explicit conflict keys to 
>> ensure we don’t miss a concurrent update). It would then proceed with the 
>> rest of the logic from the initial pass. This is a careful balancing act:
>> 
>> - We don’t want to pollute the “counters” subspace with idle databases 
>> because each entry requires an extra read of _changes
>> - We don’t want to attempt to clear counters that are constantly updated 
>> because that’s going to fail with a conflict every time
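>> 
>> A sketch of the clearing step itself (the heuristics for when to attempt 
>> it are elided):
>> 
>>   @fdb.transactional
>>   def clear_idle_counters(tr, counters, summary):
>>       r = summary.range()
>>       for kv in tr.get_range(r.start, r.stop):
>>           (db_name,) = summary.unpack(kv.key)
>>           counter_key = counters.pack((db_name,))
>>           current = tr.snapshot[counter_key]
>>           if current.present() and current == kv.value:
>>               # Unchanged since the last pass. Register an explicit read
>>               # conflict so a concurrent bump aborts this transaction rather
>>               # than being silently lost, then clear both entries.
>>               tr.add_read_conflict_key(counter_key)
>>               tr.clear(counter_key)
>>               tr.clear(kv.key)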
>> 
>> The scalability axis here is the number of databases updated within any 
>> short window of time (~1 second or less). If we end up with that number 
growing large we can have consumers responsible for a range of the “counters” 
>> subspace, though I think that’s less likely than in the queue-based design.
>> 
>> I don’t know in detail what optimizations FoundationDB applies to atomic 
>> operations (e.g. coalescing them at a layer above the storage engine). 
>> That’s worth checking into, as otherwise I’d be concerned about introducing 
>> super-hot keys here.
>> 
>> This option does not handle the “created” and “deleted” lifecycle events 
>> for each database, but those are quite simple and could be inserted 
>> directly into the _db_updates index.
>> 
>> ===
>> 
>> There are some additional details which can be fleshed out in an RFC, but 
>> this is the basic gist of things. Both designs would be more robust at 
>> capturing every single updated database (because the enqueue/increment 
>> operation would be part of the document update transaction). They would 
>> allow for a small delay between the document update and the appearance of 
>> the database in _db_updates, which is no different than we have today. They 
>> each require a background process.
>> 
>> Let’s hear what you think, both about the interest level for this feature 
>> and any comments on the designs. I may take this one over to the FDB forums 
>> as well for feedback. Cheers,
>> 
>> Adam
