You should use the org.elasticsearch.action.bulk.BulkProcessor helper class for concurrent bulk indexing.
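For illustration, a minimal sketch of how such a processor could be wired up with the 1.x Java client might look like the following (the `Client`, the listener bodies, and the batch settings are placeholders chosen for this example, not taken from the thread):

```java
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.unit.TimeValue;

public class BulkIndexer {

    // Builds a BulkProcessor that batches index requests and keeps up to
    // 4 bulk requests in flight concurrently; the sizes are illustrative.
    static BulkProcessor build(Client client) {
        return BulkProcessor.builder(client, new BulkProcessor.Listener() {
            @Override
            public void beforeBulk(long executionId, BulkRequest request) {
                // called just before a bulk is executed
            }

            @Override
            public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
                if (response.hasFailures()) {
                    // inspect response.buildFailureMessage() here
                }
            }

            @Override
            public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
                // the whole bulk request failed (e.g. node unreachable)
            }
        })
        .setBulkActions(10000)                           // flush every 10,000 actions
        .setConcurrentRequests(4)                        // up to 4 concurrent bulks
        .setFlushInterval(TimeValue.timeValueSeconds(5)) // flush leftovers periodically
        .build();
    }
}
```

The caller then simply loops over its record iterator, calling processor.add(client.prepareIndex(...).setSource(...).request()) for each document, and closes the processor at the end; the BulkProcessor takes care of batching and of submitting several bulk requests in parallel.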
Jörg

On Tue, Jun 24, 2014 at 5:34 PM, Frederic Esnault <[email protected]> wrote:

> Hi again,
>
> any idea about how to parallelize the bulk insert process?
> I tried creating 4 BulkInserters extending RecursiveAction and executed
> them all, but the result was awful: 3 of them finished very slowly, one
> did not finish (I don't know why), and only 70K docs ended up in ES
> instead of 265,000...
>
> The effect of downsizing the batch size to 10,000 is not really big; the
> total process took approx. 1 second less. (This is much lower than in the
> previous post because I moved the importing UI to my server, close to one
> of the ES nodes.) It was more than 29 seconds before, now 28.
>
> *Import CSV file took 28.069 seconds*
>
> Here is the insertion code. The Iterator is a CSV-reading iterator that
> parses lines and returns Record instances (objects with generic object
> values, indexed as strings). MAX_RECORDS is my batch size, set to 10,000.
>
> public void insert(Iterator<Record> recordsIterator) {
>     while (recordsIterator.hasNext()) {
>         batchInsert(recordsIterator, MAX_RECORDS);
>     }
> }
>
> private void batchInsert(Iterator<Record> recordsIterator, int limit) {
>     BulkRequestBuilder bulkRequest = client.prepareBulk();
>     int processed = 0;
>     try {
>         logger.log(Level.INFO, "Adding records to bulk insert batch");
>         while (recordsIterator.hasNext() && processed < limit) {
>             processed++;
>             Record record = recordsIterator.next();
>             IndexRequestBuilder builder = client.prepareIndex(datasetName, RECORD);
>             XContentBuilder data = jsonBuilder();
>             data.startObject();
>             for (ColumnMetadata column : dataset.getMetadata().getColumns()) {
>                 Object value = record.getCell(column.getName()).getValue();
>                 if (value == null || (value instanceof String && value.equals("NULL"))) {
>                     value = null;
>                 }
>                 data.field(column.getNormalizedName(), value);
>             }
>             data.endObject();
>             builder.setSource(data);
>             bulkRequest.add(builder);
>         }
>         logger.log(Level.INFO, "Added " + bulkRequest.numberOfActions()
>                 + " records to bulk insert batch. Inserting batch...");
>         long current = System.currentTimeMillis();
>         BulkResponse bulkResponse = bulkRequest
>                 .setConsistencyLevel(WriteConsistencyLevel.ONE)
>                 .execute().actionGet();
>         if (bulkResponse.hasFailures()) {
>             logger.log(Level.SEVERE, "Could not index : "
>                     + bulkResponse.buildFailureMessage());
>         }
>         System.out.println(String.format("Bulk insert took %s seconds",
>                 NumberUtils.formatSeconds(
>                         ((double) (System.currentTimeMillis() - current)) / 1000.0)));
>     } catch (Exception e) {
>         e.printStackTrace();
>     }
> }
>
> On Tuesday, June 24, 2014 at 13:44:03 UTC+2, Frederic Esnault wrote:
>
>> Thanks for all this.
>>
>> I changed my conf, removed all the thread pool config, reduced the
>> refresh time to 5s per Michael's advice, and limited my batches to
>> 10,000. I'll see how it works, then I'll parallelize the bulk insert.
>> I'll tell you how it ends up.
>>
>> Thanks again!
>>
>> On Monday, June 23, 2014 at 12:56:14 UTC+2, Jörg Prante wrote:
>>>
>>> Your bulk insert size is too large. It makes no sense to insert 100,000
>>> docs with one request. Use 1,000-10,000 instead.
>>>
>>> Also, you should submit bulk requests in parallel, not sequentially as
>>> you do. Sequential bulk is slow if the client CPU/network is not
>>> saturated.
>>>
>>> Check that you have disabled the index refresh, from the default of 1s
>>> to -1, while bulk indexing is active. 30s does not make much sense if
>>> you can execute the bulk within that time.
>>>
>>> Do not limit indexing memory to 50%.
>>>
>>> It makes no sense to increase queue_size for the bulk thread pool to
>>> 1000. That means you want a single ES node to accept 1000 x 100,000 =
>>> 100,000,000 = 100m docs at once. This would simply exceed all reasonable
>>> limits and bring the node down with an OOM (if you really have 100m
>>> docs).
>>>
>>> More advice is possible if you can show your client code and how you
>>> push docs to ES.
>>>
>>> Jörg
>>>
>>> On Mon, Jun 23, 2014 at 12:30 PM, Frederic Esnault <[email protected]> wrote:
>>>
>>>> Hi everyone,
>>>>
>>>> I'm inserting around 265,000 documents into an Elasticsearch cluster
>>>> composed of 3 nodes (real servers).
>>>> On two servers I give Elasticsearch 20g of heap; on the third, which
>>>> has 64g of RAM, I set 30g of heap for Elasticsearch.
>>>>
>>>> I set the Elasticsearch configuration to:
>>>>
>>>> - 3 shards (1 per server)
>>>> - 0 replicas
>>>> - discovery.zen.ping.multicast.enabled: false (giving each node the
>>>>   unicast hostnames of the two other nodes)
>>>> - and this:
>>>>
>>>> indices.memory.index_buffer_size: 50%
>>>> index.refresh_interval: 30s
>>>> threadpool:
>>>>   index:
>>>>     type: fixed
>>>>     size: 30
>>>>     queue_size: 1000
>>>>   bulk:
>>>>     type: fixed
>>>>     size: 30
>>>>     queue_size: 1000
>>>>   search:
>>>>     type: fixed
>>>>     size: 100
>>>>     queue_size: 200
>>>>   get:
>>>>     type: fixed
>>>>     size: 100
>>>>     queue_size: 200
>>>>
>>>> Indexing is done in groups of 100,000 docs, and here is my application
>>>> log:
>>>>
>>>> INFO: Adding records to bulk insert batch
>>>> INFO: Added 100000 records to bulk insert batch. Inserting batch...
>>>> -- Bulk insert took 38.724 seconds
>>>> INFO: Adding records to bulk insert batch
>>>> INFO: Added 100000 records to bulk insert batch. Inserting batch...
>>>> -- Bulk insert took 31.134 seconds
>>>> INFO: Adding records to bulk insert batch
>>>> INFO: Added 64201 records to bulk insert batch. Inserting batch...
>>>> -- Bulk insert took 17.366 seconds
>>>>
>>>> --- Import CSV file took 108.905 seconds ---
>>>>
>>>> I'm wondering if this time is correct, or if there is something I can
>>>> do to improve performance?
--
You received this message because you are subscribed to the Google Groups "elasticsearch" group.
To unsubscribe from this group and stop receiving emails from it, send an email to [email protected].
To view this discussion on the web visit https://groups.google.com/d/msgid/elasticsearch/CAKdsXoGcdfyp9k-iSTUVkHuCd1WrxdRQYygO4b6mG4PdVb-zHA%40mail.gmail.com.
For more options, visit https://groups.google.com/d/optout.
