Hi folks, I did some more thinking on this topic and wrote up an RFC: https://github.com/apache/couchdb-documentation/pull/651
Cheers, Adam

> On Apr 2, 2021, at 9:03 AM, Adam Kocoloski <kocol...@apache.org> wrote:
>
> Hi Glynn, my thoughts in-line:
>
>> On Apr 2, 2021, at 1:40 AM, Glynn Bird <glynnb...@apache.org> wrote:
>>
>> Is there a possibility that a future replicator, instead of consuming the
>> "firehose" changes feed, could instead be split into
>> 1-worker-per-changes-feed-shard as a neat way of parallelizing data
>> transfer?
>
> It certainly crossed my mind.
>
>> If there is to be a configurable changes feed shard count, what would be
>> the default? 1 assuming smallish databases?
>
> Yes, I would start at 1.
>
>> What would the public api look like for consuming a single changes feed
>> shard?
>
> I’d imagine a service discovery endpoint that would hand out the URLs for the
> current set of shards as of a given sequence. On the individual endpoints I’d
> consider eliminating the "one giant JSON object" response format and just
> using JSONL.
>
> If you allow for dynamically changing the shard count on a database over time
> things can get a little tricky. For example, you might have 1 shard for the
> first million sequences, then 8 shards for the next million, then back down
> to 4. I’d consider making each shard ID a UUID, and writing a tombstone in
> that shard whenever a resharding event occurs. When a consumer reaches the
> tombstone sequence for one set of shards it makes a followup request to the
> service discovery endpoint at that sequence to discover the next list of URLs
> to consume.
>
>> Does the value of changes feed shard count have an upper bound?
>
> Probably a good idea, eh? I’m not aware of some other constraint that would
> specify a limit for us, but we should definitely specify one.
>
> Adam
>
>> On Fri, 2 Apr 2021, 03:11 Adam Kocoloski, <kocol...@apache.org> wrote:
>>
>>> Hi all,
>>>
>>> CouchDB’s _changes feed has always featured a single endpoint per DB that
>>> delivers a firehose of update events. The sharding model in 2.x/3.x meant
>>> that internally each replica of a shard had its own _changes feed, and in
>>> fact we used those individual feeds to maintain secondary indexes. If you
>>> wanted to support a higher indexing throughput, you added more shards to
>>> the database. Simple.
>>>
>>> The current implementation of _changes in FoundationDB uses a single,
>>> totally-ordered range of keys. While this is a straightforward model, it
>>> has some downsides. High throughput databases introduce a hotspot in the
>>> range-partitioned FoundationDB cluster, and there’s no natural mechanism
>>> for parallel processing of the changes. The producer/consumer asymmetry
>>> here makes it very easy to define a view that can never keep up with
>>> incoming write load.
>>>
>>> I think we should look at sharding each _changes index into a set of
>>> individual subspaces. It would help balance writes across multiple key
>>> ranges, and would provide a natural way to scale the view maintenance work
>>> to multiple processes. We could introduce a new external API to allow
>>> consumers to access the individual shard feeds directly. The existing
>>> interface would be maintained for backwards compatibility, using
>>> essentially the same logic that we have today for merging view responses
>>> from multiple shards. Some additional thoughts:
>>>
>>> - Each entry would still be indexed by a globally unique and
>>> totally-ordered sequence number, so a consumer that needed to order entries
>>> across all shards could still do so.
>>>
>>> - We could consider a few different strategies for assigning updates to
>>> shards. A natural one would be to use some form of consistent hashing to
>>> ensure updates to the same document (or the same partition) always land in
>>> the same shard. This appears to be the default behavior for both Kafka and
>>> Pulsar when publishing to partitioned topics:
>>>
>>> https://kafka.apache.org/documentation/#intro_concepts_and_terms
>>> https://pulsar.apache.org/docs/en/concepts-messaging/#routing-modes
>>>
>>> - We’ve recently had some discussions about the importance of being able
>>> to query a view that observes a consistent snapshot of a DB as it existed
>>> at some point in time. Parallelizing the index builds introduces a bit of
>>> extra complexity here, but it seems manageable and actually probably
>>> encourages us to be more concrete about the specific commit points where we
>>> can provide that guarantee. I’ll omit extra detail on this for now as it
>>> can get subtle quickly and probably detracts from the main point of this
>>> thread.
>>>
>>> - I’m not sure how I feel about asking users to select a shard count here.
>>> I guess it’s probably inevitable. The good news is that we should be able
>>> to dynamically scale shard counts up and down without any sort of data
>>> rebalancing, provided we document that changing the shard count will cause
>>> a re-mapping of partition keys to shards.
>>>
>>> - I took a look through the codebase and I think this may be a fairly
>>> compact patch. We really only consume the changes feed in two locations
>>> (one for the external API and one for the view engine).
>>>
>>> I think this makes a lot of sense but looking forward to hearing other
>>> points of view. Cheers,
>>>
>>> Adam
>
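
For anyone who wants to play with the routing and merge ideas from the original proposal, here is a minimal Python sketch. It is an illustration under stated assumptions, not CouchDB code. The names Change, shard_for, split_into_shards and merge_shards are hypothetical. A stable hash modulo the shard count stands in for "some form of consistent hashing"; it is not a true consistent-hash ring, so changing the shard count re-maps keys to shards, as the proposal already notes. The merge relies on each entry carrying a globally unique, totally-ordered sequence number.

```python
import hashlib
import heapq
from typing import Iterable, Iterator, List, NamedTuple


class Change(NamedTuple):
    """One _changes entry (hypothetical shape, for illustration only)."""
    seq: int      # globally unique, totally ordered sequence number
    doc_id: str


def shard_for(doc_id: str, shard_count: int) -> int:
    """Map a document ID to a shard index using a stable hash.

    Assumption: plain hash-mod-N, not a consistent-hash ring. The same
    doc_id always lands in the same shard for a fixed shard_count.
    """
    digest = hashlib.sha256(doc_id.encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big") % shard_count


def split_into_shards(changes: Iterable[Change],
                      shard_count: int) -> List[List[Change]]:
    """Distribute an ordered stream of changes across shard feeds."""
    shards: List[List[Change]] = [[] for _ in range(shard_count)]
    for change in changes:
        shards[shard_for(change.doc_id, shard_count)].append(change)
    return shards


def merge_shards(shards: Iterable[Iterable[Change]]) -> Iterator[Change]:
    """Rebuild the single ordered feed with a k-way merge on seq.

    Each shard feed must already be ordered by seq, which holds here
    because entries are appended in sequence order.
    """
    return heapq.merge(*shards, key=lambda change: change.seq)


if __name__ == "__main__":
    feed = [Change(seq, "doc-%d" % (seq % 5)) for seq in range(1, 21)]
    shards = split_into_shards(feed, shard_count=4)
    # The merged feed is the original firehose, in the original order.
    assert list(merge_shards(shards)) == feed
```

A consumer that only cares about one slice of the data would read a single shard feed, while the backwards-compatible endpoint would do something like merge_shards over all of them.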