You should use the org.elasticsearch.action.bulk.BulkProcessor helper class
for concurrent bulk indexing.
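
A minimal sketch of what that can look like with the 1.x Java client. It
assumes an existing `Client` instance and a prepared `jsonDoc` source; the
index/type names and sizes are illustrative, not from this thread:

```java
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;

// BulkProcessor batches index requests and, with setConcurrentRequests > 0,
// executes bulk requests in parallel while you keep feeding it documents.
BulkProcessor bulkProcessor = BulkProcessor.builder(client,
        new BulkProcessor.Listener() {
            @Override
            public void beforeBulk(long executionId, BulkRequest request) {
                // e.g. log request.numberOfActions()
            }

            @Override
            public void afterBulk(long executionId, BulkRequest request,
                                  BulkResponse response) {
                if (response.hasFailures()) {
                    // inspect response.buildFailureMessage()
                }
            }

            @Override
            public void afterBulk(long executionId, BulkRequest request,
                                  Throwable failure) {
                // the whole bulk request failed (e.g. node unreachable)
            }
        })
        .setBulkActions(10000)     // flush a bulk request every 10,000 docs
        .setConcurrentRequests(4)  // up to 4 bulk requests in flight at once
        .build();

// Feed documents from a single loop; BulkProcessor handles batching and
// concurrency internally, so no hand-rolled RecursiveAction is needed.
bulkProcessor.add(new IndexRequest("myindex", "mytype").source(jsonDoc));

// Flush any remaining buffered docs and stop.
bulkProcessor.close();
```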

Jörg


On Tue, Jun 24, 2014 at 5:34 PM, Frederic Esnault <
[email protected]> wrote:

> Hi again,
>
> Any idea how to parallelize the bulk insert process?
> I tried creating 4 BulkInserters extending RecursiveAction and executing
> them all, but the result was awful: 3 of them finished very slowly, one
> never finished (I don't know why), and I ended up with only 70K docs in ES
> instead of 265,000...
>
> Downsizing the batch size to 10,000 did not change much; the total
> process took roughly 1 second less. It was more than 29 seconds before,
> now 28. (The times here are lower than in my previous post because I
> moved the importing UI to my server, close to one of the ES nodes.)
>
>
> *Import CSV file took 28.069 seconds*
>
> Here is the insertion code. The iterator is a CSV-reading iterator that
> parses lines and returns Record instances (objects with generic object
> values, indexed as strings). MAX_RECORDS is my batch size, set to 10,000.
>
>     public void insert(Iterator<Record> recordsIterator) {
>         while (recordsIterator.hasNext()) {
>             batchInsert(recordsIterator, MAX_RECORDS);
>         }
>     }
>
>     private void batchInsert(Iterator<Record> recordsIterator, int limit) {
>         BulkRequestBuilder bulkRequest = client.prepareBulk();
>         int processed = 0;
>         try {
>             logger.log(Level.INFO, "Adding records to bulk insert batch");
>             while (recordsIterator.hasNext() && processed < limit) {
>                 processed++;
>                 Record record = recordsIterator.next();
>                 IndexRequestBuilder builder = client.prepareIndex(datasetName, RECORD);
>                 XContentBuilder data = jsonBuilder();
>                 data.startObject();
>                 for (ColumnMetadata column : dataset.getMetadata().getColumns()) {
>                     Object value = record.getCell(column.getName()).getValue();
>                     if (value == null || (value instanceof String && value.equals("NULL"))) {
>                         value = null;
>                     }
>                     data.field(column.getNormalizedName(), value);
>                 }
>                 data.endObject();
>                 builder.setSource(data);
>                 bulkRequest.add(builder);
>             }
>             logger.log(Level.INFO, "Added " + bulkRequest.numberOfActions()
>                     + " records to bulk insert batch. Inserting batch...");
>             long current = System.currentTimeMillis();
>             BulkResponse bulkResponse = bulkRequest
>                     .setConsistencyLevel(WriteConsistencyLevel.ONE)
>                     .execute().actionGet();
>             if (bulkResponse.hasFailures()) {
>                 logger.log(Level.SEVERE, "Could not index: "
>                         + bulkResponse.buildFailureMessage());
>             }
>             System.out.println(String.format("Bulk insert took %s seconds",
>                     NumberUtils.formatSeconds(
>                             ((double) (System.currentTimeMillis() - current)) / 1000.0)));
>         } catch (Exception e) {
>             e.printStackTrace();
>         }
>     }
>
> On Tuesday, June 24, 2014 at 13:44:03 UTC+2, Frederic Esnault wrote:
>
>> Thanks for all this.
>>
>> I changed my configuration: removed all the thread pool settings, reduced
>> the refresh interval to 5s per Michael's advice, and limited my batches to
>> 10,000. I'll see how it works, then I'll parallelize the bulk insert.
>> I'll let you know how it ends up.
>>
>> Thanks again !
>>
>> On Monday, June 23, 2014 at 12:56:14 UTC+2, Jörg Prante wrote:
>>>
>>> Your bulk insert size is too large. It makes no sense to insert 100,000
>>> docs with one request. Use 1,000-10,000 instead.
>>>
>>> Also, you should submit bulk requests in parallel, not sequentially as
>>> you do now. Sequential bulk indexing is slow if the client CPU/network
>>> is not saturated.
>>>
>>> Check whether you have changed the index refresh interval from the
>>> default (1s) to -1 while bulk indexing is active. 30s does not make much
>>> sense if you can execute the whole bulk in that time.
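
[Editor's note: a minimal sketch of toggling the refresh interval around a
bulk load with the 1.x Java client. It assumes an existing `Client` instance;
the index name "myindex" is illustrative, not from the thread.]

```java
// Disable refresh while bulk indexing...
client.admin().indices().prepareUpdateSettings("myindex")
        .setSettings(ImmutableSettings.settingsBuilder()
                .put("index.refresh_interval", "-1"))
        .execute().actionGet();

// ... submit the bulk requests here ...

// ...then restore the default interval and force one refresh so the
// freshly indexed docs become searchable.
client.admin().indices().prepareUpdateSettings("myindex")
        .setSettings(ImmutableSettings.settingsBuilder()
                .put("index.refresh_interval", "1s"))
        .execute().actionGet();
client.admin().indices().prepareRefresh("myindex").execute().actionGet();
```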
>>>
>>> Do not limit indexing memory to 50%.
>>>
>>> It makes no sense to increase queue_size for the bulk thread pool to
>>> 1000. That means you want a single ES node to accept 1000 x 100,000 =
>>> 100,000,000 = 100m docs at once. This would simply exceed all reasonable
>>> limits and bring the node down with an OOM (if you really have 100m docs).
>>>
>>> More advice is possible if you can show the client code you use to push
>>> docs to ES.
>>>
>>> Jörg
>>>
>>>
>>>
>>> On Mon, Jun 23, 2014 at 12:30 PM, Frederic Esnault <
>>> [email protected]> wrote:
>>>
>>>> Hi everyone,
>>>>
>>>> I'm inserting around 265,000 documents into an Elasticsearch cluster
>>>> composed of 3 nodes (real servers).
>>>> On two servers I give Elasticsearch 20g of heap; on the third, which
>>>> has 64g of RAM, I set 30g of heap for Elasticsearch.
>>>>
>>>> My Elasticsearch configuration is:
>>>>
>>>> - 3 shards (1 per server)
>>>> - 0 replicas
>>>> - discovery.zen.ping.multicast.enabled: false (and giving each node
>>>> the unicast hostnames of the two other nodes);
>>>> - and this:
>>>>
>>>> indices.memory.index_buffer_size: 50%
>>>> index.refresh_interval: 30s
>>>> threadpool:
>>>>   index:
>>>>     type: fixed
>>>>     size: 30
>>>>     queue_size: 1000
>>>>   bulk:
>>>>     type: fixed
>>>>     size: 30
>>>>     queue_size: 1000
>>>>   search:
>>>>     type: fixed
>>>>     size: 100
>>>>     queue_size: 200
>>>>   get:
>>>>     type: fixed
>>>>     size: 100
>>>>     queue_size: 200
>>>>
>>>> Indexing is done in groups of 100,000 docs, and here is my application
>>>> log:
>>>> INFO: Adding records to bulk insert batch
>>>> INFO: Added 100000 records to bulk insert batch. Inserting batch...
>>>> -- Bulk insert took 38.724 seconds
>>>> INFO: Adding records to bulk insert batch
>>>> INFO: Added 100000 records to bulk insert batch. Inserting batch...
>>>> -- Bulk insert took 31.134 seconds
>>>> INFO: Adding records to bulk insert batch
>>>> INFO: Added 64201 records to bulk insert batch. Inserting batch...
>>>> -- Bulk insert took 17.366 seconds
>>>>
>>>> --- Import CSV file took 108.905 seconds ---
>>>>
>>>> I'm wondering whether this time is reasonable, or if there is something
>>>> I can do to improve performance?
>>>>
>>>> --
>>>> You received this message because you are subscribed to the Google
>>>> Groups "elasticsearch" group.
>>>> To unsubscribe from this group and stop receiving emails from it, send
>>>> an email to [email protected].
>>>> To view this discussion on the web visit https://groups.google.com/d/
>>>> msgid/elasticsearch/3a38e79e-9afb-4146-a7e1-7984ec082e22%
>>>> 40googlegroups.com
>>>> <https://groups.google.com/d/msgid/elasticsearch/3a38e79e-9afb-4146-a7e1-7984ec082e22%40googlegroups.com?utm_medium=email&utm_source=footer>
>>>> .
>>>> For more options, visit https://groups.google.com/d/optout.
>>>>
>>>
