Hi,

A common approach for replicating changes across multiple geographically distributed clusters is to put a message queue in front of Elasticsearch and feed all data modifications through it, so that they can be applied to each cluster independently. This allows unreliable connections to be tolerated, as long as the message queue is able to buffer enough events.
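A minimal sketch of this pattern, assuming an in-memory list as a stand-in for the real message queue and a plain dict as a stand-in for each cluster's index. The names `ChangeLog` and `ClusterConsumer` are made up for illustration and are not part of Elasticsearch or any queue product. Each cluster keeps its own read offset, so a cluster that is cut off simply catches up once its link recovers:

```python
from dataclasses import dataclass, field


@dataclass
class ChangeLog:
    """Append-only queue of data modifications (stand-in for a real message queue)."""
    events: list = field(default_factory=list)

    def publish(self, event: dict) -> None:
        self.events.append(event)


@dataclass
class ClusterConsumer:
    """Applies queued events to one cluster and remembers how far it has read."""
    name: str
    offset: int = 0          # index of the next event this cluster still needs
    reachable: bool = True   # simulates the WAN link to this cluster
    docs: dict = field(default_factory=dict)  # stand-in for the cluster's index

    def poll(self, log: ChangeLog) -> int:
        """Apply all pending events in order; return how many were applied."""
        if not self.reachable:
            return 0  # events stay queued until the link recovers
        applied = 0
        while self.offset < len(log.events):
            event = log.events[self.offset]
            if event["op"] == "index":
                self.docs[event["id"]] = event["doc"]
            elif event["op"] == "delete":
                self.docs.pop(event["id"], None)
            self.offset += 1
            applied += 1
        return applied


log = ChangeLog()
east = ClusterConsumer("us-east-1")
west = ClusterConsumer("us-west-1", reachable=False)  # link is currently down

log.publish({"op": "index", "id": "1", "doc": {"title": "a"}})
log.publish({"op": "index", "id": "2", "doc": {"title": "b"}})
log.publish({"op": "delete", "id": "1"})

east.poll(log)         # east applies all queued events right away
west.poll(log)         # west applies nothing while unreachable
west.reachable = True
west.poll(log)         # west catches up from its own offset
```

In a real deployment the queue would need to be durable and sized for the longest outage you want to survive; the sketch only shows the per-cluster offset idea.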
Any solution based on snapshot and restore would most likely require a single master cluster for each index. If updates could be applied to any cluster, data could be lost when a snapshot from another cluster is restored.

Regards,

Christian

On Sunday, 25 January 2015 13:15:45 UTC, Jörg Prante wrote:
>
> The IndexShardSnapshotAndRestoreService is from the snapshot/restore feature. It allows snapshots to be pushed to shared file storage, and restore retrieves snapshots and places them into the current cluster. With snapshot/restore, an "off-line" synchronization utility already exists.
>
> The idea is now
>
> - let the master supervise a queue of inter-cluster snapshot/restore operations. This could be stored in the cluster state as custom data.
>
> - instead of using a shared file interim storage, the snapshots are directly pushed to another cluster (with the help of the tribe node mechanism). It is not the master doing the work, but one or more tribe node workers that consume the queue operations.
>
> - on the target cluster, the workers consume "receive" operations and write the snapshots into the running cluster
>
> By using an RxJava-style API for the cluster-to-cluster communication (I can only imagine a modified tribe node here), the replication process could be made more adaptive. For example, the inter-cluster snapshot/restore queue could be throttled when the target cluster is becoming slow.
>
> If tribe node is not suitable, a transport client mechanism could also do the job.
>
> Jörg
>
> On Fri, Jan 23, 2015 at 11:56 PM, Todd Nine <[email protected]> wrote:
>
>> Thanks for the suggestion on the tribe nodes. I'll take a look at org.elasticsearch.index.snapshots.IndexShardSnapshotAndRestoreService more in depth. A reference implementation would be helpful in understanding its usage. Do you happen to know of any projects that use it?
>>
>> From an architecture perspective, I'm concerned with having the cluster master initiate any replication operations aside from replaying index modifications. As we continue to increase our cluster size, I'm worried it may become too much load on the master to keep up. Our system is getting larger every day; we currently have 12 c3.4xl instances in each region. Our client to ES is a multi-tenant system (http://usergrid.incubator.apache.org/), so each application created in the system will get its own indexes in ES. This allows us to scale the indexes using read/write aliases per each application's usage.
>>
>> To take a step back even further, is there a way we can use something existing in ES to perform this work, possibly with routing rules etc? My primary concern is that we don't have to query across regions, can recover from a region or network outage, and that replication can begin once communication between regions is restored. I seem to be venturing into uncharted territory here by thinking I need to create a plugin, and I doubt I'm the first user to encounter such a problem. If there are any other known solutions, that would be great. I just need our replication time to be every N seconds.
>>
>> Thanks again!
>> Todd
>>
>> On Friday, January 23, 2015 at 2:43:08 PM UTC-7, Jörg Prante wrote:
>>>
>>> This looks promising.
>>>
>>> For admin operations, see also the tribe node. A special "replication-aware tribe node" (or maybe more than one tribe node for resiliency) could supervise the cluster-to-cluster replication.
>>>
>>> For the segment strategy, I think it is hard to go down to the level of the index store and capture the files properly and put them over the wire to a target. It should be better to replicate on shard level. Maybe by reusing some of the code of org.elasticsearch.index.snapshots.IndexShardSnapshotAndRestoreService so that a tribe node can trigger a snapshot action on the source cluster master, open a transactional connection from a node in the source cluster to a node in the target cluster, and place a restore action on a queue on the target cluster master, plus rollback logic if a shard transaction fails. So in short, the ES cluster to cluster replication process could be realized by a "primary shard replication protocol".
>>>
>>> Just my 2¢
>>>
>>> Jörg
>>>
>>> On Fri, Jan 23, 2015 at 7:42 PM, Todd Nine <[email protected]> wrote:
>>>
>>>> Thanks for the pointers Jorg,
>>>> We use Rx Java in our current application, so I'm familiar with backpressure and ensuring we don't overwhelm target systems. I've been mulling over the high level design a bit more. A common approach in all systems that perform multi region replication is the concept of "log shipping". It's used heavily in SQL systems for replication, as well as in systems such as Megastore/HBase. This seems like it would be the most efficient way to ship data from Region A to Region B with a reasonable amount of latency. I was thinking something like the following.
>>>>
>>>> *Admin Operation Replication*
>>>>
>>>> This can get messy quickly. I'm thinking I won't have any sort of "merge" logic since this can get very different for everyone's use case. I was going to support broadcasting the following operations.
>>>>
>>>> - Index creation
>>>> - Index deletion
>>>> - Index mapping updates
>>>> - Alias index addition
>>>> - Alias index removal
>>>>
>>>> This can also get tricky because it makes the assumption of unique index operations in each region. Our indexes are Time UUID based, so I know we won't get conflicts. I won't handle the case of an operation being replayed that conflicts with an existing index; I'll simply log it and drop it.
>>>> Handlers could be built in later so users could create their own resolution logic. Also, this must be replayed in a very strict order. I'm concerned that adding this additional master/master region communication could result in more load on the master. This can be solved by running a dedicated master, but I don't really see any other solution.
>>>>
>>>> *Data Replication*
>>>>
>>>> 1) Store last sent segments, probably in a system index. Each region could be offline at different times, so for each segment I'll need to know where it's been sent.
>>>>
>>>> 2) Monitor segments as they're created. I still need to figure this out a bit more in the context of latent sending.
>>>>
>>>> Example. Region us-east-1 ES nodes.
>>>>
>>>> We missed sending 5 segments to us-west-1, and they were merged into 1. I now only need to send the 1 merged segment to us-west-1, since the other 5 segments will be removed.
>>>>
>>>> However, when a merged segment is created in us-east-1 from 5 segments I've already sent to us-west-1, I won't want to ship that since it will already contain the data. As the tree is continually merged, I'll need to somehow sort out what contains shipped data, and what contains unshipped data.
>>>>
>>>> 3) As a new segment is created, perform the following.
>>>> 3.a) Replay any administrative operations since the last sync on the index to the target region, so the state is current.
>>>> 3.b) Push the segment to the target region
>>>>
>>>> 4) The region receives the segment, and adds it to its current segments. When a segment merge happens in the receiving region, this will get merged in.
>>>>
>>>> Thoughts?
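The shipped/unshipped bookkeeping from steps 1 and 2 of the quoted message could be sketched roughly as follows. This is only an illustration under stated assumptions: `SegmentShipTracker` and its method names are hypothetical, segments are plain string names, and nothing here calls a real Elasticsearch or Lucene API. A merged segment is treated as already shipped to a region only if every one of its source segments had been shipped there. A merge of partly shipped sources is conservatively treated as unshipped, which may re-send data the target already has; how to avoid that is left open, as in the message above.

```python
class SegmentShipTracker:
    """Tracks, per target region, which live segments have already been shipped."""

    def __init__(self, regions):
        self.live = set()                                     # segments that currently exist
        self.shipped = {region: set() for region in regions}  # region -> shipped segments

    def on_segment_created(self, segment):
        self.live.add(segment)

    def on_shipped(self, segment, region):
        self.shipped[region].add(segment)

    def on_merge(self, sources, merged):
        """Replace the source segments with the merged one and carry shipping state over."""
        sources = set(sources)
        self.live -= sources
        self.live.add(merged)
        for done in self.shipped.values():
            if sources <= done:
                done.add(merged)  # region already holds all of the merged data
            done -= sources       # source segments no longer exist

    def pending(self, region):
        """Live segments that still have to be sent to the given region."""
        return sorted(self.live - self.shipped[region])


tracker = SegmentShipTracker(["us-west-1", "eu-west-1"])
for name in ["s1", "s2"]:
    tracker.on_segment_created(name)

# eu-west-1 received both segments; us-west-1 was offline and received none.
tracker.on_shipped("s1", "eu-west-1")
tracker.on_shipped("s2", "eu-west-1")

tracker.on_merge(["s1", "s2"], "m1")
west_pending = tracker.pending("us-west-1")  # only the merged segment needs sending
eu_pending = tracker.pending("eu-west-1")    # nothing to send
```

Persisting this state (for example in a system index, as step 1 suggests) and keeping it consistent with the actual merge events is the hard part and is not covered here.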
>>>>
>>>> On Thursday, January 15, 2015 at 5:29:10 PM UTC-7, Jörg Prante wrote:
>>>>>
>>>>> While it seems quite easy to attach listeners to an ES node to capture operations in translog-style and push out index/delete operations on shard level somehow, there will be more to consider for a reliable solution.
>>>>>
>>>>> The Couchbase developers have added a data replication protocol to their product which is meant for transporting changes over long distances with latency for in-memory processing.
>>>>>
>>>>> To learn about the most important features, see
>>>>>
>>>>> https://github.com/couchbaselabs/dcp-documentation
>>>>>
>>>>> and
>>>>>
>>>>> http://docs.couchbase.com/admin/admin/Concepts/dcp.html
>>>>>
>>>>> I think bringing such a concept of an inter cluster protocol into ES could be a good starting point, to sketch the complete path for such an ambitious project beforehand.
>>>>>
>>>>> Most challenging could be dealing with back pressure when receiving nodes/clusters are becoming slow. For a solution to this, reactive Java / reactive streams look like a viable possibility.
>>>>>
>>>>> See also
>>>>>
>>>>> https://github.com/ReactiveX/RxJava/wiki/Backpressure
>>>>>
>>>>> http://www.ratpack.io/manual/current/streams.html
>>>>>
>>>>> I'm in favor of Ratpack since it comes with Java 8, Groovy, Google Guava, and Netty, which has a resemblance to ES.
>>>>>
>>>>> In ES, for inter cluster communication, there is not much coded afaik, except snapshot/restore. Maybe snapshot/restore can provide everything you want, with incremental mode. Lucene will offer numbered segment files for faster incremental snapshot/restore.
>>>>>
>>>>> Just my 2¢
>>>>>
>>>>> Jörg
>>>>>
>>>>> On Thu, Jan 15, 2015 at 7:00 PM, Todd Nine <[email protected]> wrote:
>>>>>
>>>>>> Hey all,
>>>>>> I would like to create a plugin, and I need a hand. Below are the requirements I have.
>>>>>>
>>>>>> - Our documents are immutable. They are only ever created or deleted; updates do not apply.
>>>>>> - We want mirrors of our ES cluster in multiple AWS regions. This way if the WAN between regions is severed for any reason, we do not suffer an outage, just a delay in consistency.
>>>>>> - As documents are added or removed they are rolled up then shipped in batch to the other AWS Regions. This can be as fast as a few milliseconds, or as slow as minutes, and will be user configurable. Note that a full backup+load is too slow; this is more of a near-realtime operation.
>>>>>> - This will sync the following operations.
>>>>>>   - Index creation/deletion
>>>>>>   - Alias creation/deletion
>>>>>>   - Document creation/deletion
>>>>>>
>>>>>> What I'm thinking architecturally.
>>>>>>
>>>>>> - The plugin is installed on each node in our cluster in all regions
>>>>>> - The plugin will only gather changes for the primary shards on the local node
>>>>>> - After the timeout elapses, the plugin will ship the changelog to the other AWS regions, where the plugin will receive it and process it
>>>>>>
>>>>>> Are there any APIs I can look at that are a good starting point for developing this? I'd like to do a simple prototype with two 1-node clusters reasonably soon. I found several plugin tutorials, but I'm more concerned with what part of the ES API I can call to receive events, if any.
>>>>>>
>>>>>> Thanks,
>>>>>> Todd
>>>>>>
>>>>>> --
>>>>>> You received this message because you are subscribed to the Google Groups "elasticsearch" group.
>>>>>> To unsubscribe from this group and stop receiving emails from it, send an email to [email protected].
>>>>>> To view this discussion on the web visit https://groups.google.com/d/msgid/elasticsearch/dff53da5-8a0c-4805-8f97-72844019a79e%40googlegroups.com.
>>>>>> For more options, visit https://groups.google.com/d/optout.
