> I'm still debating if there should be some direct knowledge of the
> replicas in the primary node. Or if the primary node can just churn away
> creating base indexes and updates and publish to a queue when it produces a
> new set of segments. Then the replicas are then free to pick up the latest
> index as they spin up and subscribe to changes for it.

In nrtsearch, the primary uses replica information to:
1. publish new NRT point
2. pre-copy merged segments

It should be possible to let replicas know of new NRT points using an
external queue. Pre-copying merged segments can also be done with a queue,
but the replicas would need some way to let the primary know that they are
done copying the merged files. If the primary doesn't have knowledge of
replicas, some third service would have to keep track of the replicas and
let the primary know once all replicas are done pre-copying the segments.
Alternatively, you can just skip pre-copying.

On Wed, Feb 26, 2025 at 4:31 PM Michael Froh <msf...@gmail.com> wrote:

> Hi there,
>
> I'm happy to share some details about how Amazon Product Search does its
> segment replication. I haven't worked on Product Search in over three
> years, so anything that I remember is not particularly novel. Also, it's
> not really secret sauce -- I would have happily talked about it more in the
> 2021 re:Invent talk that Mike Sokolov and I did, but we were trying to keep
> within our time limit. :)
>
> That model doesn't exactly have direct communication between primary and
> replica (which is generally a good practice in a cloud-based solution --
> the fewer node-to-node dependencies, the better). The flow (if I recall
> correctly) is driven by a couple of side-car components for the writers and
> searchers and is roughly like this:
>
> 1. At a specified (pretty coarse) interval, the writer side-car calls a
> "create checkpoint" API on the writer to ask it to write a checkpoint.
> 2. The writer uploads new segment files to S3, and a metadata object
> describing the checkpoint contents (which probably includes segments from
> an earlier checkpoint, since they can be reused).
> 3. The writer returns the S3 URL for the metadata object to its side-car.
> 4. The write side-car publishes the metadata URL to "something" -- see
> below for details.
> 5. The searcher side-cars all read the metadata URL from "something" -- see
> below for details.
> 6. The searcher side-cars each call a "use checkpoint" API on their local
> searchers
> 7. The searchers each download the new segment files from S3 and open new
> IndexSearchers.
>
> For the details of steps 4 and 5, I don't actually remember how it worked,
> but I have two pretty good guesses from what I remember of the overall
> architecture:
>
> 1. DynamoDB: This is the more likely mechanism. Each index shard has a
> unique ID which serves as a partition key in DynamoDB and there's a
> sequence number as a sort key. The writer side-car inserts a DynamoDB
> record with the next sequence number and the metadata URL. The searcher
> side-car periodically fetches 1 record with the partition key by descending
> sequence number (i.e. get latest sequence entry for the partition key). If
> the sequence number has increased, then call the searcher's use-checkpoint
> API.
> 2. Kinesis: This feels like the less likely mechanism, but I guess it could
> work. The writer side-car writes the metadata URL to a Kinesis stream. Each
> searcher side-car reads from the Kinesis stream and passes the metadata URL
> to the searcher. I'm pretty sure we didn't have one Kinesis stream per
> index shard, because managing (and paying for) that many Kinesis streams
> would be a pain. Even with a sharded Kinesis stream, you'd "leak" some
> checkpoints across index shards, leading to data that the searcher
> side-cars would throw away. Also, each Kinesis stream shard has a limited
> number of concurrent consumers, which would mean that the number of search
> replicas would be limited. I'm *pretty sure* we used the DynamoDB approach.
>
> Another Lucene-based search system that I worked on many years ago at
> Amazon had a much simpler architecture:
>
> 1. Writer periodically writes new segments to S3.
> 2. After writing the new segments, the writer writes a metadata object to
> S3 with a path like
> "/<writer guid>/<index id>/<shard id>/<sequence number>/metadata.json".
> Because the writer was guaranteed to be the *only* thing writing with
> prefixes of <writer guid>, it could manage its own dense sequence numbers.
> 3. A searcher is "sticky" to a writer, and periodically issues an S3
> GetObject for the next metadata object's full URL (i.e. the URL using the
> next dense sequence number). Until the next checkpoint is written, it gets
> a 404 response.
> 4. Searcher fetches the files referenced by the metadata file.
>
> A nice thing about that approach was that it only depended on S3 and only
> used PutObject and GetObject APIs, which tend to be more consistent. The
> downside was that we needed a separate mechanism for writer discovery and
> failover, to let searchers know the correct writer prefix.
>
> Hope that helps! Let me know if you need any other suggestions.
>
> Thanks,
> Froh
>
> On Wed, Feb 26, 2025 at 3:31 PM Steven Schlansker <
> stevenschlans...@gmail.com> wrote:
>
> >
> > > On Feb 26, 2025, at 2:53 PM, Marc Davenport <madavenp...@cargurus.com.INVALID> wrote:
> > >
> > > Hello,
> > > Our current search solution is a pretty big monolith running on pretty
> > > beefy EC2 instances. Every node is responsible for indexing and serving
> > > queries. We want to start decomposing our service and are starting with
> > > separating the indexing and query handling responsibilities.
> >
> > We run a probably comparatively small but otherwise similar installation,
> > using Google Kubernetes instances. We just use a persistent disk instead
> > of an elastic store, but also would consider using something like S3 in
> > the future.
> >
> > > I'm in the research phases now trying to collect any prior art I can.
> > > The rough sketch is to implement the NRT two replication node classes
> > > on their respective services and use S3 as a distribution point for
> > > the segment files. I'm still debating if there should be some direct
> > > knowledge of the replicas in the primary node.
> >
> > We tried to avoid the primary keeping any durable state about replicas,
> > as replicas tend to disappear for any or no reason in a cloud environment.
> > A specific example issue we ran into: we disable the
> > 'waitForAllRemotesToClose' step entirely
> > https://github.com/apache/lucene/pull/11822
> >
> > > Or if the primary node can just churn away
> > > creating base indexes and updates and publish to a queue when it
> > > produces a new set of segments. Then the replicas are then free to
> > > pick up the latest index as they spin up and subscribe to changes for
> > > it. It seems like having the indexer being responsible for also
> > > communicating with the replicas would be double duty for that system.
> > > I'd love to hear other experiences if people can share them or point
> > > to writings about them they read when designing their systems.
> >
> > In our case, the primary keeps the latest CopyState that any replica
> > should take in memory.
> > Replicas call a HTTP api in an infinite loop, passing in their current
> > version, and asking if any newer version is available.
> > If the replica is behind, the primary gives it the current CopyState NRT
> > point immediately.
> > If the replica is caught up, we hold the request in a long-poll for just
> > short of our http timeout, waiting for a new version to become available,
> > otherwise return "no update for now, try again".
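
The long-poll exchange described in the quoted paragraph above can be sketched in a few lines. This is only an illustration under stated assumptions, not the poster's implementation: `VersionBoard`, `publish`, and `poll` are hypothetical names, the published state is an opaque value rather than a Lucene CopyState, the HTTP layer is replaced by a direct method call, and the CopyState reference counting mentioned later in the thread is left out entirely.

```python
import threading


class VersionBoard:
    """Primary-side holder of the latest version (hypothetical helper).

    Replicas call poll() with the version they already have. If a newer
    version exists they get it immediately; otherwise the call blocks until
    one is published or the timeout expires.
    """

    def __init__(self):
        self._cond = threading.Condition()
        self._version = 0
        self._state = None

    def publish(self, version, state):
        # Called by the primary whenever it produces a new NRT point.
        with self._cond:
            if version > self._version:
                self._version = version
                self._state = state
                self._cond.notify_all()  # wake every waiting long-poll

    def poll(self, replica_version, timeout):
        # Returns (version, state) if something newer is available,
        # or None, meaning "no update for now, try again".
        with self._cond:
            self._cond.wait_for(
                lambda: self._version > replica_version, timeout=timeout
            )
            if self._version > replica_version:
                return self._version, self._state
            return None
```

A replica would call `poll(current_version, timeout)` in a loop, with the timeout set a little below its HTTP timeout, and hand any returned state to its copy logic before polling again with the new version.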
> >
> > Once the replica receives an updated CopyState, it feeds it into the
> > ReplicaNode with newNRTPoint which starts the file copy
> >
> > There's a bit of magic we devised internally around retaining the
> > CopyState reference count, as this is stateful and the HTTP replication
> > is mostly stateless.
> > We keep this simple by having only a single primary node at any given
> > time.
> >
> > Your idea of using a queue instead is interesting but not something we
> > extensively looked at :)
> >
> > > I've looked at nrtsearch from yelp and they seem to let the primary
> > > node have direct knowledge of the replicas. That makes sense since it
> > > is based on McCandless's LuceneServer.
> > >
> > > I know that Amazon internally uses Lucene and has indexing separated
> > > from query nodes and that they re-index and publish completely new
> > > indexes with every release to prod. I've been watching what I can of
> > > the great videos of Sokolov, McCandless, & Froh etc. But they don't
> > > show much behind the curtain (understandably) about the details of
> > > keeping things in sync. If someone does know of a publicly available
> > > video or resource that describes this, I'd love to see it.
> >
> > Unfortunately our journey with the LuceneServer was basically informed by
> > the code and blog posts you've likely already seen -
> > and a bit of help from this mailing list - otherwise, there's not a ton
> > of information out there or evidence of widespread use.
> > That said, we've been very happy with our setup, despite needing to put
> > in a fair amount of elbow grease and low level details to get things
> > working for us.
> >
> > ---------------------------------------------------------------------
> > To unsubscribe, e-mail: java-user-unsubscr...@lucene.apache.org
> > For additional commands, e-mail: java-user-h...@lucene.apache.org
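
For reference, the S3-only scheme with dense sequence numbers described earlier in the thread can be sketched as below. This is a rough sketch under assumptions, not code from any of the systems discussed: `InMemoryStore` is a hypothetical stand-in for S3 that offers only put and get (with `None` playing the role of a 404), and `Writer`, `Searcher`, `checkpoint`, `poll_once`, and the JSON metadata layout are all invented for illustration. Only the key layout follows the path pattern quoted in the thread.

```python
import json


class InMemoryStore:
    """Hypothetical stand-in for S3 that supports only put and get."""

    def __init__(self):
        self._objects = {}

    def put_object(self, key, body):
        self._objects[key] = body

    def get_object(self, key):
        # Returning None plays the role of an HTTP 404.
        return self._objects.get(key)


def metadata_key(writer_guid, index_id, shard_id, seq):
    return f"/{writer_guid}/{index_id}/{shard_id}/{seq}/metadata.json"


class Writer:
    def __init__(self, store, writer_guid, index_id, shard_id):
        self.store = store
        self.ids = (writer_guid, index_id, shard_id)
        self.seq = 0  # dense: 1, 2, 3, ... with no gaps

    def checkpoint(self, segment_files):
        # Segment files would be uploaded before this point; the metadata
        # object is written last, as in the thread's description.
        self.seq += 1
        body = json.dumps({"seq": self.seq, "files": sorted(segment_files)})
        self.store.put_object(metadata_key(*self.ids, self.seq), body)
        return self.seq


class Searcher:
    def __init__(self, store, writer_guid, index_id, shard_id):
        self.store = store
        self.ids = (writer_guid, index_id, shard_id)
        self.seq = 0  # last checkpoint applied
        self.files = []

    def poll_once(self):
        """Probe for checkpoint seq + 1; return True if one was applied."""
        body = self.store.get_object(metadata_key(*self.ids, self.seq + 1))
        if body is None:
            return False  # next checkpoint not written yet
        meta = json.loads(body)
        self.files = meta["files"]  # a real searcher would download these
        self.seq = meta["seq"]
        return True
```

Because the searcher only ever asks for one exact key, the sketch needs no list operation; the cost is that the searcher has to be told the writer's prefix by some other means, which is the discovery problem the thread points out.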