> I don’t understand why you want to atomically append to an array here instead
> of using a separate (DbName, Versionstamp) KV each time. What’s the advantage?
> Both structures require periodic cleanup. I also don’t understand why you need
> this DbName -> Versionstamp mapping at all. Is there a reason to do some
> per-database cleanup on the contents of this global feed?
The idea is to amortize the cleanup/de-duplication cost. We can trigger cleanup
from write transactions chosen by random sampling. However, we want to
constrain cleanup to the events of a single database to avoid coordination
between multiple de-duplication processes. Therefore, we need to maintain a
history of updates to the database (since we use the versionstamp as a key). In
this context, (DbName, Versionstamp) is a very good idea, because it would
allow us to use standard range operations instead of messing with IBLTs.
# Summary
- Every update transaction would write the following:
    Sequence = (DbName, EventType)
    (DbName, Sequence) = True
- When the update transaction is finished, we would generate a random number to
  decide whether we need to trigger de-duplication
- If we need to trigger de-duplication, we would spawn a new process and pass
  the name of the database to it
- In that process we would do the following:
    maxSequence = 0
    for _, Sequence in range((DbName, *)):
        - remove the oldest entries in the "Sequence -> (DbName, EventType)" mapping
        - materialize the results in a more consumable form
        maxSequence = max(maxSequence, Sequence)
    - issue a delete-range request for
      range((DbName, *), last_less_than((DbName, maxSequence)))
Because the de-duplication operation is scoped to a single database and
triggered by random sampling, we would be able to clean up frequently updated
databases at a different rate than less frequently updated ones. A sketch of
what this could look like is below.
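
This is a minimal illustration using the FDB Python binding; the "feed" and
"history" subspace names, the API version, and the sampling rate are
placeholders rather than settled choices:

    import random

    import fdb
    import fdb.tuple

    fdb.api_version(600)
    db = fdb.open()

    CLEANUP_PROBABILITY = 0.01  # roughly "every 100th transaction"

    @fdb.transactional
    def record_update(tr, db_name, event_type):
        vs = fdb.tuple.Versionstamp()  # incomplete; filled in at commit time
        # Sequence = (DbName, EventType)
        tr.set_versionstamped_key(
            fdb.tuple.pack_with_versionstamp(("feed", vs)),
            fdb.tuple.pack((db_name, event_type)))
        # (DbName, Sequence) = True
        tr.set_versionstamped_key(
            fdb.tuple.pack_with_versionstamp(("history", db_name, vs)),
            b'\x01')

    @fdb.transactional
    def deduplicate(tr, db_name):
        history = fdb.tuple.range(("history", db_name))
        max_seq = None
        for kv in tr.get_range(history.start, history.stop):
            _, _, seq = fdb.tuple.unpack(kv.key)
            if max_seq is not None:
                # the previously seen Sequence is superseded; drop its entry
                # from the global feed
                tr.clear(fdb.tuple.pack(("feed", max_seq)))
            max_seq = seq
        if max_seq is not None:
            # delete the per-database history strictly below maxSequence
            tr.clear_range(history.start,
                           fdb.tuple.pack(("history", db_name, max_seq)))

    def maybe_deduplicate(db_name):
        # in practice this would be handed off to a separate worker process
        if random.random() < CLEANUP_PROBABILITY:
            deduplicate(db, db_name)
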
I hope this makes sense.
On 2019/03/27 20:33:16, Adam Kocoloski <kocol...@apache.org> wrote:
> Hi Ilya,
>
> I agree it would be quite nice if there was a way to implement this feature
> without a background worker — while also avoiding write contention for
> transactions that would otherwise not conflict with one another. I’m not sure
> it’s possible.
>
> I have a few comments:
>
> > We could maintain database level Sequence number and store global changes
> > feed in the following form:
> > UpdateSequence = (DbName, EventType, PreviousUpdateSequence)
>
> Tracking a database-wide “latest Sequence” in a single KV would mean we can’t
> execute any transactions on that database in parallel, so that's yet another
> reason why that strawman approach cannot work.
>
> > In this case we could store the data we need as follows (under separate
> > subspace TBD).
> > VersionStamp = (DbName, EventType)
> > DbName = [versionstamps]
>
> I don’t understand why you want to atomically append to an array here instead
> of using a separate (DbName, Versionstamp) KV each time. What’s the
> advantage? Both structures require periodic cleanup. I also don’t understand
> why you need this DbName -> Versionstamp mapping at all. Is there a reason to
> do some per-database cleanup on the contents of this global feed?
>
> Cheers, Adam
>
>
> > On Mar 27, 2019, at 2:07 PM, Ilya Khlopotov <iil...@apache.org> wrote:
> >
> > Hi,
> >
> > Both proposals are fine but need a consumer process, which is a tricky
> > requirement because it will lead to problems when the queue grows faster
> > than we can consume it. This realization got me thinking about finding
> > possible ways to eliminate the need for a consumer.
> >
> > I won't spell out the final solution right away, since I want to
> > demonstrate the thinking process so others can build better proposals on
> > top of it.
> >
> > Essentially, we need to de-duplicate events. In order to do that, we need
> > to know when a given database was last updated. We could maintain a
> > database-level Sequence number and store the global changes feed in the
> > following form:
> > UpdateSequence = (DbName, EventType, PreviousUpdateSequence)
> >
> > Then every 10th (or 100th or 1000th) transaction can trigger a compaction
> > process for the updated database. It would use PreviousUpdateSequence to
> > get a pointer to its parent, read the pointer to the grandparent, clean up
> > the parent, and so on and so forth until there is nothing left to clean up.
> >
> > This is a terrible idea for the following reasons:
> > - Including UpdateSequence is expensive since we would need to add one more
> > read to every update transaction
> > - Recursion to do cleanup is expensive and most likely would need to be
> > done in multiple transactions
> >
> > What if FDB supported a list type for a value and had an atomic operation
> > to add a value to the list if it is missing? In this case we could store
> > the data we need as follows (under a separate subspace, TBD):
> > VersionStamp = (DbName, EventType)
> > DbName = [versionstamps]
> >
> > In this case, in order to de-duplicate events, we would do the following:
> > - Every once in a while (on every 10th, 100th, or 1000th update transaction
> > to a specific database, chosen using a PRNG) we would execute the
> > compaction algorithm
> > - Read the list of versionstamps for older updates and issue remove
> > operations for every versionstamp except the biggest one
> > - Update the history value to include only the biggest versionstamp
> >
> > The question is how we would implement atomic addition of a value to a
> > list. There is an IBLT data structure (https://arxiv.org/pdf/1101.2245.pdf)
> > which can help us achieve that. An IBLT consists of multiple cells, where
> > every cell has the following fields:
> > - count
> > - keySum
> > - valueSum
> > - hashkeySum
> >
> > The beauty of this structure is that all fields are updated using blind
> > addition operations, which are available in FDB as atomic addition, while
> > still supporting enumeration of all key-values stored in the structure
> > (with configurable probability).
> >
> > For our specific case it doesn't look like we need valueSum (because we
> > only need keys) or hashkeySum (because we wouldn't have duplicates), so we
> > can simplify the structure.
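> >
> > As a purely illustrative sketch (the cell count, the hash scheme, and the
> > assumption that a key fits in 64 bits are all placeholders), the simplified
> > cell update could be built on FDB's atomic ADD like this:
> >
> > import hashlib
> > import struct
> >
> > import fdb
> > import fdb.tuple
> >
> > fdb.api_version(600)
> > db = fdb.open()
> >
> > NUM_CELLS = 64    # assumed sizing
> > NUM_HASHES = 3    # each key is mapped into up to 3 distinct cells
> >
> > def cells_for(key_bytes):
> >     return {int.from_bytes(hashlib.sha256(bytes([i]) + key_bytes).digest()[:4],
> >                            "big") % NUM_CELLS
> >             for i in range(NUM_HASHES)}
> >
> > @fdb.transactional
> > def iblt_insert(tr, key_bytes):
> >     # treat the (<= 8 byte) key as a little-endian integer for keySum
> >     key_as_int = int.from_bytes(key_bytes[:8], "little")
> >     for cell in cells_for(key_bytes):
> >         # both updates are blind atomic additions, so concurrent writers
> >         # never conflict; a cell with count == 1 exposes its key directly
> >         tr.add(fdb.tuple.pack(("iblt", cell, "count")), struct.pack("<q", 1))
> >         tr.add(fdb.tuple.pack(("iblt", cell, "keySum")),
> >                struct.pack("<Q", key_as_int))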
> >
> > Best regards,
> > iilyak
> >
> >
> > On 2019/03/20 22:47:42, Adam Kocoloski <kocol...@apache.org> wrote:
> >> Hi all,
> >>
> >> Most of the discussions so far have focused on the core features that are
> >> fundamental to CouchDB: JSON documents, revision tracking, _changes. I
> >> thought I’d start a thread on something a bit different: the _db_updates
> >> feed.
> >>
> >> The _db_updates feed is an API that enables users to discover database
> >> lifecycle events across an entire CouchDB instance. It’s primarily useful
> >> in deployments that have lots and lots of databases, where it’s
> >> impractical to keep connections open for every database, and where
> >> database creations and deletions may be an automated aspect of the
> >> application’s use of CouchDB.
> >>
> >> There are really two topics for discussion here. The first is: do we need
> >> to keep it? The primary driver of applications creating lots of DBs is the
> >> per-DB granularity of access controls; if we go down the route of
> >> implementing the document-level _access proposal perhaps users naturally
> >> migrate away from this DB-per-user data model. I’d be curious to hear
> >> points of view there.
> >>
> >> I’ll assume for now that we do want to keep it, and offer some thoughts on
> >> how to implement it. The main challenge with _db_updates is managing the
> >> write contention; in write-heavy databases you have a lot of producers
> >> trying to tag that particular database as “updated", but all the consumer
> >> really cares about is getting a single “dbname”:”updated” event as needed.
> >> In the current architecture we try to dedupe a lot of the events in-memory
> >> before updating a regular CouchDB database with this information, but this
> >> leaves us exposed to possibly dropping events within a few second window.
> >>
> >> ## Option 1: Queue + Compaction
> >>
> >> One way to tackle this in FoundationDB is to have an intermediate subspace
> >> reserved as a queue. Each transaction that modifies a database would
> >> insert a versionstamped KV into the queue like
> >>
> >> Versionstamp = (DbName, EventType)
> >>
> >> Versionstamps are monotonically increasing and inserting versionstamped
> >> keys is a conflict-free operation. We’d have a consumer of this queue
> >> which is responsible for “log compaction”; i.e., the consumer would do
> >> range reads on the queue subspace, toss out duplicate contiguous
> >> “dbname”:“updated” events, and update a second index which would look more
> >> like the _changes feed.
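> >>
> >> As a rough illustration (with made-up "queue" and "db_updates" subspace
> >> names, and ignoring that the real index would likely be keyed more like a
> >> _changes feed), the compacting consumer could look something like this in
> >> the FDB Python binding:
> >>
> >> import fdb
> >> import fdb.tuple
> >>
> >> fdb.api_version(600)
> >> db = fdb.open()
> >>
> >> @fdb.transactional
> >> def compact_queue(tr, limit=1000):
> >>     queue = fdb.tuple.range(("queue",))
> >>     seen = set()
> >>     last_key = None
> >>     for kv in tr.get_range(queue.start, queue.stop, limit=limit):
> >>         db_name, event_type = fdb.tuple.unpack(kv.value)
> >>         seen.add((db_name, event_type))  # duplicate events collapse here
> >>         last_key = kv.key
> >>     for db_name, event_type in seen:
> >>         # one entry per database/event pair in the _db_updates-style index
> >>         tr.set(fdb.tuple.pack(("db_updates", db_name)),
> >>                fdb.tuple.pack((event_type,)))
> >>     if last_key is not None:
> >>         # drop everything we just processed from the queue
> >>         tr.clear_range(queue.start, last_key + b'\x00')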
> >>
> >> ### Scaling Consumers
> >>
> >> A single consumer can likely process 10k events/sec or more, but
> >> eventually we’ll need to scale. Borrowing from systems like Kafka the
> >> typical way to do this is to divide the queue into partitions and have
> >> individual consumers mapped to each partition. A partition in this model
> >> would just be a prefix on the Versionstamp:
> >>
> >> (PartitionID, Versionstamp) = (DbName, EventType)
> >>
> >> Our consumers will be more efficient and less likely to conflict with one
> >> another on updating the _db_updates index if messages are keyed to a
> >> partition based on DbName, although this still runs the risk that a couple
> >> of high-throughput databases could swamp a partition.
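> >>
> >> A sketch of that keying (the hash function and partition count are
> >> arbitrary choices for illustration):
> >>
> >> import hashlib
> >>
> >> NUM_PARTITIONS = 16  # arbitrary
> >>
> >> def partition_for(db_name):
> >>     # all events for one database land in, and stay ordered within, a
> >>     # single partition
> >>     digest = hashlib.sha256(db_name.encode("utf-8")).digest()
> >>     return int.from_bytes(digest[:4], "big") % NUM_PARTITIONS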
> >>
> >> I’m not sure about the best path forward for handling that scenario. One
> >> could implement a rate-limiter that starts sloughing off additional
> >> messages for high-throughput databases (which has some careful edge
> >> cases), split the messages for a single database across multiple
> >> partitions, rely on operators to blacklist certain databases from the
> >> _db_updates system, etc. Each has downsides.
> >>
> >> ## Option 2: Atomic Ops + Consumer
> >>
> >> In this approach we still have an intermediate subspace, and a consumer of
> >> that subspace which updates the _db_updates index. But this time, we have
> >> at most one KV per database in the subspace, with an atomic counter for a
> >> value. When a document is updated it bumps the counter for its database in
> >> that subspace. So we’ll have entries like
> >>
> >> (“counters”, “db1235”) = 1
> >> (“counters”, “db0001”) = 42
> >> (“counters”, “telemetry-db”) = 12312
> >>
> >> and so on. Like versionstamps, atomic operations are conflict-free so we
> >> need not worry about introducing spurious conflicts on high-throughput
> >> databases.
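> >>
> >> For illustration, the counter bump inside a document-update transaction
> >> could be as small as this (FDB Python binding, "counters" subspace name as
> >> in the example above):
> >>
> >> import struct
> >>
> >> import fdb
> >> import fdb.tuple
> >>
> >> fdb.api_version(600)
> >> db = fdb.open()
> >>
> >> @fdb.transactional
> >> def bump_db_counter(tr, db_name):
> >>     # blind atomic ADD: no read, and no conflict with concurrent writers
> >>     tr.add(fdb.tuple.pack(("counters", db_name)), struct.pack("<q", 1))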
> >>
> >> The initial pass of the consumer logic would go something like this:
> >>
> >> - Do a snapshot range read of the “counters” subspace (or whatever we call
> >> it)
> >> - Record the current values for all counters in a separate summary KV
> >> (you’ll see why in a minute)
> >> - Do a limit=1 range read on the _changes space for each DB in the list to
> >> grab the latest Sequence
> >> - Update the _db_updates index with the latest Sequence for each of these
> >> databases
> >>
> >> On a second pass, the consumer would read the summary KV from the last
> >> pass and compare the previous counters with the current values. If any
> >> counters have not been updated in the interval, the consumer would try to
> >> clear those from the “counters” subspace (adding them as explicit conflict
> >> keys to ensure we don’t miss a concurrent update). It would then proceed
> >> with the rest of the logic from the initial pass. This is a careful
> >> balancing act:
> >>
> >> - We don’t want to pollute the “counters” subspace with idle databases
> >> because each entry requires an extra read of _changes
> >> - We don’t want to attempt to clear counters that are constantly updated
> >> because that’s going to fail with a conflict every time
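> >>
> >> A rough sketch of those two passes (the subspace names and the idea of
> >> storing the previous snapshot under a "counters_summary" subspace are
> >> illustrative assumptions):
> >>
> >> import fdb
> >> import fdb.tuple
> >>
> >> fdb.api_version(600)
> >> db = fdb.open()
> >>
> >> @fdb.transactional
> >> def consumer_pass(tr):
> >>     counters = fdb.tuple.range(("counters",))
> >>     summary = fdb.tuple.range(("counters_summary",))
> >>
> >>     # counter values recorded on the previous pass
> >>     previous = {}
> >>     for kv in tr.get_range(summary.start, summary.stop):
> >>         _, db_name = fdb.tuple.unpack(kv.key)
> >>         previous[db_name] = kv.value
> >>
> >>     # snapshot read so the scan itself doesn't create conflicts
> >>     for kv in tr.snapshot.get_range(counters.start, counters.stop):
> >>         _, db_name = fdb.tuple.unpack(kv.key)
> >>         counter_key = fdb.tuple.pack(("counters", db_name))
> >>         if previous.get(db_name) == kv.value:
> >>             # idle since the last pass: clear it, declaring the conflict
> >>             # explicitly so a concurrent bump forces this pass to retry
> >>             tr.add_read_conflict_key(counter_key)
> >>             tr.clear(counter_key)
> >>             tr.clear(fdb.tuple.pack(("counters_summary", db_name)))
> >>         else:
> >>             # remember the value for comparison on the next pass ...
> >>             tr.set(fdb.tuple.pack(("counters_summary", db_name)), kv.value)
> >>             # ... and update the _db_updates index with the latest Sequence
> >>             # read from this database's _changes subspace (omitted here)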
> >>
> >> The scalability axis here is the number of databases updated within any
> >> short window of time (~1 second or less). If we end up with that number
> >> growing large, we can have consumers responsible for ranges of the
> >> “counters” subspace, though I think that’s less likely than in the
> >> queue-based design.
> >>
> >> I don’t know in detail what optimizations FoundationDB applies to atomic
> >> operations (e.g. coalescing them at a layer above the storage engine).
> >> That’s worth checking into, as otherwise I’d be concerned about
> >> introducing super-hot keys here.
> >>
> >> This option does not handle the “created” and “deleted” lifecycle events
> >> for each database, but those are really quite simple and could be
> >> inserted directly into the _db_updates index.
> >>
> >> ===
> >>
> >> There are some additional details which can be fleshed out in an RFC, but
> >> this is the basic gist of things. Both designs would be more robust at
> >> capturing every single updated database (because the enqueue/increment
> >> operation would be part of the document update transaction). They would
> >> allow for a small delay between the document update and the appearance of
> >> the database in _db_updates, which is no different than we have today.
> >> They each require a background process.
> >>
> >> Let’s hear what you think, both about the interest level for this feature
> >> and any comments on the designs. I may take this one over to the FDB
> >> forums as well for feedback. Cheers,
> >>
> >> Adam
>
>