Fred k created BEAM-6052:
----------------------------

             Summary: elasticsearchIO checkForErrors method bug
                 Key: BEAM-6052
                 URL: https://issues.apache.org/jira/browse/BEAM-6052
             Project: Beam
          Issue Type: Bug
          Components: io-java-elasticsearch
         Environment: beam-sdk-java-io-elasticsearch-2.8.0
            Reporter: Fred k
            Assignee: Etienne Chauchot


When I use Write to send bulk update requests to Elasticsearch, the following 
exception is thrown:
{code:java}
Caused by: java.io.IOException: Error writing to Elasticsearch, some elements could not be inserted:
    at org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.checkForErrors(ElasticsearchIO.java:215)
    at org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$Write$WriteFn.flushBatch(ElasticsearchIO.java:1235)
    at org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$Write$WriteFn.processElement(ElasticsearchIO.java:1199)
{code}
I checked the checkForErrors method and found that it cannot parse a response 
that contains update items. After I added logic to parse update items, I get 
output like the following:
{code:java}
Caused by: java.io.IOException: Error writing to Elasticsearch, some elements could not be inserted:
Document id 1465285334751e039cc4883a8a270191: rejected execution of org.elasticsearch.transport.TransportService$7@6c8edc37 on EsThreadPoolExecutor[name = gjzx159-node2/write, queue capacity = 200, org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor@56b9faa3[Running, pool size = 40, active threads = 40, queued tasks = 198, completed tasks = 10324166]] (es_rejected_execution_exception)
Document id e2722c653c65a4cb119e9b8dc44e37ad: rejected execution of org.elasticsearch.transport.TransportService$7@6c8edc37 on EsThreadPoolExecutor[name = gjzx159-node2/write, queue capacity = 200, org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor@56b9faa3[Running, pool size = 40, active threads = 40, queued tasks = 198, completed tasks = 10324166]] (es_rejected_execution_exception)
Document id b25472e3665695c49861f6eceee5531a: rejected execution of org.elasticsearch.transport.TransportService$7@6c8edc37 on EsThreadPoolExecutor[name = gjzx159-node2/write, queue capacity = 200, org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor@56b9faa3[Running, pool size = 40, active threads = 40, queued tasks = 198, completed tasks = 10324166]] (es_rejected_execution_exception)
Document id 022c1accdae82f6fe4108ba7989732fc: rejected execution of org.elasticsearch.transport.TransportService$7@6c8edc37 on EsThreadPoolExecutor[name = gjzx159-node2/write, queue capacity = 200, org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor@56b9faa3[Running, pool size = 40, active threads = 40, queued tasks = 198, completed tasks = 10324166]] (es_rejected_execution_exception)
    at org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.checkForErrors(ElasticsearchIO.java:215)
    at org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$Write$WriteFn.flushBatch(ElasticsearchIO.java:1235)
    at org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$Write$WriteFn.processElement(ElasticsearchIO.java:1199)
{code}
The response content looks like this:

{
    "took": 293,
    "errors": true,
    "items": [
    {
        "update": {
            "_index": "test_kevin_2018-11",
            "_type": "kevin",
            "_id": "8d7664286c0887c637229166c7c613bc",
            "_version": 1,
            "result": "noop",
            "_shards": {
                "total": 1,
                "successful": 1,
                "failed": 0
            },
            "status": 200
        }
    },                
    {
        "update": {
            "_index": "test_kevin_2018-11",
            "_type": "kevin",
            "_id": "49952be98f4fc160f56bcdb33b1dbf7e",
            "status": 429,
            "error": {
                "type": "es_rejected_execution_exception",
                "reason": "rejected execution of org.elasticsearch.transport.TransportService$7@3f70bbe7 on EsThreadPoolExecutor[name = gjzx159-node2/write, queue capacity = 200, org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor@56b9faa3[Running, pool size = 40, active threads = 40, queued tasks = 200, completed tasks = 10034174]]"
            }
        }
    }
    ]
}
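
The parsing change described above could look roughly like the sketch below. It is not the ElasticsearchIO code and not the actual patch: the class name BulkErrorSketch and the method collectErrors are hypothetical, and the response is assumed to be already decoded into plain Maps and Lists. The idea is to read whatever single action key each item carries (index, create, update or delete) instead of expecting one fixed key.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Illustrative sketch only; names are hypothetical, not part of Beam. */
final class BulkErrorSketch {

  /**
   * Returns one message per failed item of a bulk response that has
   * already been decoded into Maps and Lists (an assumption of this sketch).
   */
  @SuppressWarnings("unchecked")
  static List<String> collectErrors(Map<String, Object> response) {
    List<String> messages = new ArrayList<>();
    // "errors": false means every item succeeded.
    if (!Boolean.TRUE.equals(response.get("errors"))) {
      return messages;
    }
    List<Map<String, Object>> items =
        (List<Map<String, Object>>) response.get("items");
    if (items == null) {
      return messages;
    }
    for (Map<String, Object> item : items) {
      // Each item is keyed by its action name, so iterate the entries
      // rather than looking up a hard-coded key such as "index".
      for (Map.Entry<String, Object> action : item.entrySet()) {
        Map<String, Object> result = (Map<String, Object>) action.getValue();
        Object error = result.get("error");
        if (error == null) {
          continue; // this item succeeded
        }
        Object id = result.get("_id");
        if (error instanceof Map) {
          Map<String, Object> details = (Map<String, Object>) error;
          messages.add(String.format("Document id %s: %s (%s)",
              id, details.get("reason"), details.get("type")));
        } else {
          // Fallback for responses that report the error as a plain string.
          messages.add(String.format("Document id %s: %s", id, error));
        }
      }
    }
    return messages;
  }
}
```

Applied to the response above, this would yield a single message for document 49952be98f4fc160f56bcdb33b1dbf7e and skip the "noop" item, because only items that carry an "error" field are reported.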



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
