I'm not sure I understand this: "*write request can be sent to any node, which in turn will do proxying, we can avoid this and only hit the primaries. This avoids the proxying, rerouting*".
Even if you hit a "primary", ES will still have to re-route the document to "the primary shard handling the hash of the doc", which could very well be on a different node. Isn't that true, as in any other distributed system? So how do you save that extra hop, as you claim when you say it "*avoids the proxying, rerouting*"? Unless I'm mistaken, what you described seems to directly conflict with what is described here:

- http://www.elasticsearch.org/guide/en/elasticsearch/reference/master/docs-index_.html#index-routing
- http://www.elasticsearch.org/guide/en/elasticsearch/guide/current/distrib-write.html

Regards.

On Sunday, May 4, 2014 11:19:30 AM UTC-7, Costin Leau wrote:
>
> 1. Performance reasons
> While a write request can be sent to any node, which in turn will do proxying, we can avoid this and only hit the primaries. This avoids the proxying, rerouting. Note that each task that is writing is assigned a different primary in a round-robin fashion, so effectively the write happens in parallel across the primaries for the target index.
>
> 2. What exactly are you trying to achieve? A map-reduce job ends up with multiple tasks hitting an ES cluster; if you pick only one node, you're likely to overwhelm it while the rest of the cluster will be idle. If you spread the load randomly, the nodes will re-route the calls to the primaries first and then to the replicas (depending on your settings).
> By talking directly to the primaries, you reduce unnecessary IO and CPU (caused by the proxying and hashing) and you get better throughput and negotiation between ES and Hadoop. Consider again an imbalanced scenario: each node that you add for proxying can become a liability; maybe the node is busy with other activities, maybe it goes offline, etc.
>
> Note that es-hadoop does retries for both rejections (ES is busy) and network failures (whether ephemeral or permanent, by switching to a different node).
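Point 1 above describes assigning each writing task a different primary in round-robin fashion. The following is a minimal Java sketch of that idea, not es-hadoop's implementation: it assumes a hypothetical `Map` from primary shard id to node name, and it mirrors the `currentInstance % targetShards.size()` bucket logic in the EsOutputFormat snippet quoted in Question 1. The class and method names are illustrative only.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch: map each writing task to one primary shard, round-robin.
// Names and types are illustrative, not es-hadoop's API.
class RoundRobinPrimaries {

    // shardToNode: primary shard id -> name of the node hosting that primary (assumed input).
    static String nodeForTask(int taskId, Map<Integer, String> shardToNode) {
        List<Integer> ordered = new ArrayList<>(shardToNode.keySet());
        // Strict ordering so every task derives the same shard list.
        Collections.sort(ordered);
        // floorMod keeps the bucket in [0, size) even for a negative task id.
        int bucket = Math.floorMod(taskId, ordered.size());
        return shardToNode.get(ordered.get(bucket));
    }

    public static void main(String[] args) {
        Map<Integer, String> primaries = new TreeMap<>();
        primaries.put(0, "node-a");
        primaries.put(1, "node-b");
        primaries.put(2, "node-c");
        for (int task = 1; task <= 4; task++) {
            System.out.println("task " + task + " -> " + nodeForTask(task, primaries));
        }
    }
}
```

With three primaries, tasks 1 to 4 land on node-b, node-c, node-a and node-b. Note that this only chooses which node a task connects to; under the documented routing rule, each document in that task's bulk is still assigned to a shard by its own routing hash.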
>
> As for computing the hash for each document and guessing which exact primary it will hit, there are certain challenges with that:
> a. the client has to replicate the logic in ES (which can change across versions, settings, etc.)
> b. each Hadoop task that writes ends up making multiple connections across ES primaries. While this _might_ work for small indices with a small number of shards, for anything else this approach will be inefficient. You will end up with significantly more connections that can (and will) fail.
> c. the bulk itself will be divided into smaller bulks, which reduces its efficiency, especially since the load itself is not consistent.
>
> es-hadoop is a client optimized for Hadoop environments. Emphasis on 'client': it does not, and should not, try to outsmart ES. We strive for excellent performance but without sacrificing reliability. Once data is inside ES, it gets all the goodies: replication, sharding, etc.
>
> If you're worried about performance, which we always try to improve, I'd be happy to look into it using some concrete benchmarks.
>
> Hope this helps,
>
> On Sun, May 4, 2014 at 8:41 PM, Ashwin Jayaprakash <[email protected]> wrote:
>
>> Hi, I have 2 related questions regarding routing write requests. Thanks in advance for answering!
>>
>> *Question 1:*
>> I saw this code in the EsOutputFormat class and I was wondering why:
>> (https://github.com/elasticsearch/elasticsearch-hadoop/blob/master/mr/src/main/java/org/elasticsearch/hadoop/mr/EsOutputFormat.java#L221)
>> (https://github.com/elasticsearch/elasticsearch-hadoop/blob/master/mr/src/main/java/org/elasticsearch/hadoop/rest/RestRepository.java#L239)
>>
>>     Map<Shard, Node> targetShards = repository.getWriteTargetPrimaryShards();
>>     repository.close();
>>
>>     List<Shard> orderedShards = new ArrayList<Shard>(targetShards.keySet());
>>     // make sure the order is strict
>>     Collections.sort(orderedShards);
>>
>>     // if there's no task info, just pick a random bucket
>>     if (currentInstance <= 0) {
>>         currentInstance = new Random().nextInt(targetShards.size()) + 1;
>>     }
>>     int bucket = currentInstance % targetShards.size();
>>
>>     Shard chosenShard = orderedShards.get(bucket);
>>     Node targetNode = targetShards.get(chosenShard);
>>
>>     // override the global settings to communicate directly with the target node
>>     settings.setHosts(targetNode.getIpAddress()).setPort(targetNode.getHttpPort());
>>     repository = new RestRepository(settings);
>>     uri = SettingsUtils.nodes(settings).get(0);
>>
>> I was trying to understand why this is being done. My understanding of ES writes was that:
>>
>> - You can send a write request to any node in ES
>> - That node will receive the request, take the document id, hash it, and determine which node is supposed to be the primary
>> - The write will be re-routed to the node meant to be the primary for that document id/hashcode
>> - If this is the case (as in any data grid like Hazelcast or Coherence, or NoSQL stores like HBase, Cassandra, etc.), why the special logic to find the primaries?
>>    - (http://www.elasticsearch.org/guide/en/elasticsearch/guide/current/distrib-write.html)
>>    - This primary may not even be the one that will handle the write for my document
>> - Is this being done simply to load balance writes uniformly across the cluster? In that case, why not do a simple round robin?
>>    - (https://github.com/elasticsearch/elasticsearch-hadoop/edit/1.3/docs/src/reference/asciidoc/core/arch.adoc)
>>
>> *Question 2:* Is there a way to hook into the routing logic of the cluster?
>>
>> - Instead of picking up documents in bulk and sending them to a node in ES
>> - Which will then again resend the documents to the primaries for the different documents
>> - Is there a way to find out which node is the primary for a document
>> - Collect all documents meant to go to a specific primary and send them directly to that node?
>>    - (Like Hazelcast or Coherence's Partition awareness: http://www.hazelcast.org/docs/3.0/manual/html-single/#DataAffinity)
>>    - The problem with this might be that if the primary dies, the sender has to be aware of the topology change and then resend the data to the new primary
>>    - So we're not talking about simple clients but smart-client libraries. Perhaps I should be asking: does the Java client library already do this? (I didn't think so)
>> - Overall, wouldn't this save a lot of data re-routing over the network?
>>
>> Thanks.
>>
>> --
>> You received this message because you are subscribed to the Google Groups "elasticsearch" group.
>> To unsubscribe from this group and stop receiving emails from it, send an email to [email protected].
>> To view this discussion on the web visit https://groups.google.com/d/msgid/elasticsearch/6996e823-0b41-42f8-b473-1afe0a8a4c1b%40googlegroups.com.
>> For more options, visit https://groups.google.com/d/optout.
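Both the re-routing objection at the top of this thread and Question 2 rest on the rule in the Elasticsearch guide's distributed-write page linked above: `shard = hash(routing) % number_of_primary_shards`, where `routing` defaults to the document `_id`. Below is a minimal Java sketch of the client-side grouping Question 2 proposes, under a loudly stated assumption: `String.hashCode()` stands in for Elasticsearch's actual hash function, which is different and, per Costin's point (a), may change across versions. The class and method names are hypothetical. Splitting one bulk into per-shard sub-bulks also illustrates Costin's point (c).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch of client-side, shard-aware bulk grouping.
class ShardGrouping {

    // Routing rule from the ES guide: shard = hash(routing) % number_of_primary_shards.
    // ASSUMPTION: String.hashCode() stands in for Elasticsearch's real hash function;
    // a real client would have to replicate ES's exact function for its version.
    static int shardFor(String routing, int numPrimaryShards) {
        // floorMod avoids negative shard numbers when hashCode() is negative.
        return Math.floorMod(routing.hashCode(), numPrimaryShards);
    }

    // Splits one bulk of document ids into per-shard sub-bulks (routing defaults to the id).
    static Map<Integer, List<String>> groupByShard(List<String> docIds, int numPrimaryShards) {
        Map<Integer, List<String>> subBulks = new TreeMap<>();
        for (String id : docIds) {
            subBulks.computeIfAbsent(shardFor(id, numPrimaryShards), k -> new ArrayList<>()).add(id);
        }
        return subBulks;
    }

    public static void main(String[] args) {
        List<String> ids = Arrays.asList("doc-1", "doc-2", "doc-3", "doc-4", "doc-5");
        // One 5-document bulk becomes up to 3 smaller bulks, one per primary shard.
        System.out.println(groupByShard(ids, 3));
    }
}
```

A real smart client would also need the current shard-to-node map and would have to refresh it whenever a primary moves, which is the topology concern raised under Question 2.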
