Hi there, I'm happy to share some details about how Amazon Product Search does its segment replication. I haven't worked on Product Search in over three years, so anything that I remember is not particularly novel. Also, it's not really secret sauce -- I would have happily talked about it more in the 2021 re:Invent talk that Mike Sokolov and I did, but we were trying to keep within our time limit. :)
That model doesn't exactly have direct communication between primary and replica (which is generally a good practice in a cloud-based solution -- the fewer node-to-node dependencies, the better). The flow (if I recall correctly) is driven by a couple of side-car components for the writers and searchers and is roughly like this:

1. At a specified (pretty coarse) interval, the writer side-car calls a "create checkpoint" API on the writer to ask it to write a checkpoint.
2. The writer uploads the new segment files to S3, along with a metadata object describing the checkpoint contents (which probably includes segments from an earlier checkpoint, since they can be reused).
3. The writer returns the S3 URL for the metadata object to its side-car.
4. The writer side-car publishes the metadata URL to "something" -- see below for details.
5. The searcher side-cars all read the metadata URL from "something" -- see below for details.
6. The searcher side-cars each call a "use checkpoint" API on their local searchers.
7. The searchers each download the new segment files from S3 and open new IndexSearchers.

For the details of steps 4 and 5, I don't actually remember how it worked, but I have two pretty good guesses from what I remember of the overall architecture:

1. DynamoDB: This is the more likely mechanism. Each index shard has a unique ID which serves as a partition key in DynamoDB, and there's a sequence number as a sort key. The writer side-car inserts a DynamoDB record with the next sequence number and the metadata URL. The searcher side-car periodically queries for one record with that partition key, sorted by descending sequence number (i.e. it gets the latest sequence entry for the partition key). If the sequence number has increased, it calls the searcher's use-checkpoint API. (There's a rough sketch of this below.)

2. Kinesis: This feels like the less likely mechanism, but I guess it could work. The writer side-car writes the metadata URL to a Kinesis stream. Each searcher side-car reads from the Kinesis stream and passes the metadata URL to the searcher. I'm pretty sure we didn't have one Kinesis stream per index shard, because managing (and paying for) that many Kinesis streams would be a pain. Even with a sharded Kinesis stream, you'd "leak" some checkpoints across index shards, leading to data that the searcher side-cars would just throw away. Also, each Kinesis stream shard has a limited number of concurrent consumers, which would limit the number of search replicas.

I'm *pretty sure* we used the DynamoDB approach.

Another Lucene-based search system that I worked on many years ago at Amazon had a much simpler architecture:

1. The writer periodically writes new segments to S3.
2. After writing the new segments, the writer writes a metadata object to S3 with a path like "/<writer guid>/<index id>/<shard id>/<sequence number>/metadata.json". Because the writer was guaranteed to be the *only* thing writing with prefixes of <writer guid>, it could manage its own dense sequence numbers.
3. A searcher is "sticky" to a writer, and periodically issues an S3 GetObject for the next metadata object's full URL (i.e. the URL using the next dense sequence number). Until the next checkpoint is written, it gets a 404 response. (Also sketched below.)
4. The searcher fetches the files referenced by the metadata object.

A nice thing about that approach was that it depended only on S3 and used only the PutObject and GetObject APIs, which tend to be more consistent. The downside was that we needed a separate mechanism for writer discovery and failover, to let searchers know the correct writer prefix.
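In case it's useful, here's a minimal, hypothetical sketch of the DynamoDB publish/poll I described above (option 1 for steps 4 and 5), using the AWS SDK for Java v2. The table name, attribute names, and class are made up for illustration -- this is not the actual Product Search code:

import java.util.Map;

import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import software.amazon.awssdk.services.dynamodb.model.PutItemRequest;
import software.amazon.awssdk.services.dynamodb.model.QueryRequest;
import software.amazon.awssdk.services.dynamodb.model.QueryResponse;

/** Hypothetical checkpoint table: partition key = shardId, sort key = seqNo. */
public class CheckpointTable {
    private static final String TABLE = "index-checkpoints"; // made-up table name

    private final DynamoDbClient dynamo = DynamoDbClient.create();

    /** Writer side-car: publish the metadata URL for a shard's next checkpoint. */
    public void publishCheckpoint(String shardId, long seqNo, String metadataUrl) {
        dynamo.putItem(PutItemRequest.builder()
            .tableName(TABLE)
            .item(Map.of(
                "shardId", AttributeValue.builder().s(shardId).build(),
                "seqNo", AttributeValue.builder().n(Long.toString(seqNo)).build(),
                "metadataUrl", AttributeValue.builder().s(metadataUrl).build()))
            .build());
    }

    /** Searcher side-car: fetch the newest checkpoint for a shard (descending sort key, limit 1). */
    public String latestCheckpointUrl(String shardId) {
        QueryResponse response = dynamo.query(QueryRequest.builder()
            .tableName(TABLE)
            .keyConditionExpression("shardId = :sid")
            .expressionAttributeValues(Map.of(":sid", AttributeValue.builder().s(shardId).build()))
            .scanIndexForward(false) // newest sequence number first
            .limit(1)
            .build());
        return response.items().isEmpty() ? null : response.items().get(0).get("metadataUrl").s();
    }
}

In the real thing you'd presumably also read back the sequence number, so the side-car only calls the searcher's use-checkpoint API when it actually increases.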
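And a similarly hypothetical sketch of the searcher-side polling loop from the simpler S3-only architecture: the searcher stays sticky to one writer prefix, probes the next dense sequence number, and treats a 404 (NoSuchKeyException in the Java SDK) as "no new checkpoint yet". The bucket name and key layout are placeholders based on the path format above:

import software.amazon.awssdk.core.ResponseBytes;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
import software.amazon.awssdk.services.s3.model.GetObjectResponse;
import software.amazon.awssdk.services.s3.model.NoSuchKeyException;

/** Hypothetical poller for "<writer guid>/<index id>/<shard id>/<sequence number>/metadata.json". */
public class CheckpointPoller {
    private final S3Client s3 = S3Client.create();
    private final String bucket = "my-index-bucket"; // made-up bucket name
    private final String writerPrefix;               // "<writer guid>/<index id>/<shard id>"
    private long nextSeqNo;

    public CheckpointPoller(String writerPrefix, long nextSeqNo) {
        this.writerPrefix = writerPrefix;
        this.nextSeqNo = nextSeqNo;
    }

    /** Returns the next checkpoint's metadata JSON, or null if the writer hasn't produced it yet. */
    public String pollNextCheckpoint() {
        String key = writerPrefix + "/" + nextSeqNo + "/metadata.json";
        try {
            ResponseBytes<GetObjectResponse> bytes = s3.getObjectAsBytes(
                GetObjectRequest.builder().bucket(bucket).key(key).build());
            nextSeqNo++; // only advance once this checkpoint actually exists
            return bytes.asUtf8String();
        } catch (NoSuchKeyException e) {
            return null; // 404: poll again later
        }
    }
}

The caller would parse the metadata JSON, fetch the referenced segment files, and open a new IndexSearcher -- and, as I mentioned, you still need a separate mechanism to discover the right writer prefix.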
Hope that helps! Let me know if you need any other suggestions.

Thanks,
Froh

On Wed, Feb 26, 2025 at 3:31 PM Steven Schlansker <stevenschlans...@gmail.com> wrote:

> > On Feb 26, 2025, at 2:53 PM, Marc Davenport <madavenp...@cargurus.com.INVALID> wrote:
> >
> > Hello,
> > Our current search solution is a pretty big monolith running on pretty beefy EC2 instances. Every node is responsible for indexing and serving queries. We want to start decomposing our service and are starting with separating the indexing and query handling responsibilities.
>
> We run a probably comparatively small but otherwise similar installation, using Google Kubernetes instances. We just use a persistent disk instead of an elastic store, but also would consider using something like S3 in the future.
>
> > I'm in the research phases now trying to collect any prior art I can. The rough sketch is to implement the NRT two replication node classes on their respective services and use S3 as a distribution point for the segment files. I'm still debating if there should be some direct knowledge of the replicas in the primary node.
>
> We tried to avoid the primary keeping any durable state about replicas, as replicas tend to disappear for any or no reason in a cloud environment. A specific example issue we ran into: we disable the 'waitForAllRemotesToClose' step entirely https://github.com/apache/lucene/pull/11822
>
> > Or if the primary node can just churn away creating base indexes and updates and publish to a queue when it produces a new set of segments. Then the replicas are then free to pick up the latest index as they spin up and subscribe to changes for it. It seems like having the indexer being responsible for also communicating with the replicas would be double duty for that system. I'd love to hear other experiences if people can share them or point to writings about them they read when designing their systems.
>
> In our case, the primary keeps the latest CopyState that any replica should take in memory. Replicas call a HTTP api in an infinite loop, passing in their current version, and asking if any newer version is available. If the replica is behind, the primary gives it the current CopyState NRT point immediately. If the replica is caught up, we hold the request in a long-poll for just short of our http timeout, waiting for a new version to become available, otherwise return "no update for now, try again".
>
> Once the replica receives an updated CopyState, it feeds it into the ReplicaNode with newNRTPoint which starts the file copy.
>
> There's a bit of magic we devised internally around retaining the CopyState reference count, as this is stateful and the HTTP replication is mostly stateless. We keep this simple by having only a single primary node at any given time.
>
> Your idea of using a queue instead is interesting but not something we extensively looked at :)
>
> > I've looked at nrtsearch from yelp and they seem to let the primary node have direct knowledge of the replicas. That makes sense since it is based on McCandless's LuceneServer.
> >
> > I know that Amazon internally uses Lucene and has indexing separated from query nodes and that they re-index and publish completely new indexes with every release to prod. I've been watching what I can of the great videos of Sokolov, McCandless, & Froh etc.
> > But they don't show much behind the curtain (understandably) about the details of keeping things in sync. If someone does know of a publicly available video or resource that describes this, I'd love to see it.
>
> Unfortunately our journey with the LuceneServer was basically informed by the code and blog posts you've likely already seen - and a bit of help from this mailing list - otherwise, there's not a ton of information out there or evidence of widespread use. That said, we've been very happy with our setup, despite needing to put in a fair amount of elbow grease and low level details to get things working for us.