Fred k created BEAM-6052:
----------------------------
Summary: elasticsearchIO checkForErrors method bug
Key: BEAM-6052
URL: https://issues.apache.org/jira/browse/BEAM-6052
Project: Beam
Issue Type: Bug
Components: io-java-elasticsearch
Environment: beam-sdk-java-io-elasticsearch-2.8.0
Reporter: Fred k
Assignee: Etienne Chauchot
When I use Write to send bulk update requests to Elasticsearch, the following
exception appears:
{code:java}
Caused by: java.io.IOException: Error writing to Elasticsearch, some elements could not be inserted:
    at org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.checkForErrors(ElasticsearchIO.java:215)
    at org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$Write$WriteFn.flushBatch(ElasticsearchIO.java:1235)
    at org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$Write$WriteFn.processElement(ElasticsearchIO.java:1199)
{code}
I looked at the checkForErrors method and found that it cannot parse a response
that contains update items, so the exception message lists no failed documents.
After I added logic to parse update items, the output looks like this:
{code:java}
Caused by: java.io.IOException: Error writing to Elasticsearch, some elements could not be inserted:
Document id 1465285334751e039cc4883a8a270191: rejected execution of org.elasticsearch.transport.TransportService$7@6c8edc37 on EsThreadPoolExecutor[name = gjzx159-node2/write, queue capacity = 200, org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor@56b9faa3[Running, pool size = 40, active threads = 40, queued tasks = 198, completed tasks = 10324166]] (es_rejected_execution_exception)
Document id e2722c653c65a4cb119e9b8dc44e37ad: rejected execution of org.elasticsearch.transport.TransportService$7@6c8edc37 on EsThreadPoolExecutor[name = gjzx159-node2/write, queue capacity = 200, org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor@56b9faa3[Running, pool size = 40, active threads = 40, queued tasks = 198, completed tasks = 10324166]] (es_rejected_execution_exception)
Document id b25472e3665695c49861f6eceee5531a: rejected execution of org.elasticsearch.transport.TransportService$7@6c8edc37 on EsThreadPoolExecutor[name = gjzx159-node2/write, queue capacity = 200, org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor@56b9faa3[Running, pool size = 40, active threads = 40, queued tasks = 198, completed tasks = 10324166]] (es_rejected_execution_exception)
Document id 022c1accdae82f6fe4108ba7989732fc: rejected execution of org.elasticsearch.transport.TransportService$7@6c8edc37 on EsThreadPoolExecutor[name = gjzx159-node2/write, queue capacity = 200, org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor@56b9faa3[Running, pool size = 40, active threads = 40, queued tasks = 198, completed tasks = 10324166]] (es_rejected_execution_exception)
    at org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.checkForErrors(ElasticsearchIO.java:215)
    at org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$Write$WriteFn.flushBatch(ElasticsearchIO.java:1235)
    at org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$Write$WriteFn.processElement(ElasticsearchIO.java:1199)
{code}
The response content looks like this:
{
  "took": 293,
  "errors": true,
  "items": [
    {
      "update": {
        "_index": "test_kevin_2018-11",
        "_type": "kevin",
        "_id": "8d7664286c0887c637229166c7c613bc",
        "_version": 1,
        "result": "noop",
        "_shards": {
          "total": 1,
          "successful": 1,
          "failed": 0
        },
        "status": 200
      }
    },
    {
      "update": {
        "_index": "test_kevin_2018-11",
        "_type": "kevin",
        "_id": "49952be98f4fc160f56bcdb33b1dbf7e",
        "status": 429,
        "error": {
          "type": "es_rejected_execution_exception",
          "reason": "rejected execution of org.elasticsearch.transport.TransportService$7@3f70bbe7 on EsThreadPoolExecutor[name = gjzx159-node2/write, queue capacity = 200, org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor@56b9faa3[Running, pool size = 40, active threads = 40, queued tasks = 200, completed tasks = 10034174]]"
        }
      }
    }
  ]
}
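
For illustration, here is a minimal sketch of how failed items could be collected from a bulk response shaped like the one above without hard-coding the action name, so that "update" items are handled the same way as "index", "create" or "delete" items. This is not the actual ElasticsearchIO code: the class name BulkErrorSketch and the method collectErrors are hypothetical, and the sketch assumes the response has already been parsed into nested Map and List objects by some JSON library.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Hypothetical helper for illustration only; not part of Beam. */
final class BulkErrorSketch {

  /**
   * Returns one message per failed item of an already-parsed bulk response.
   * Each entry of "items" is an object with a single key naming the action
   * (for example "index" or "update"), so the key itself is never inspected.
   */
  @SuppressWarnings("unchecked")
  static List<String> collectErrors(Map<String, Object> response) {
    List<String> messages = new ArrayList<>();
    // "errors": false (or absent) means no item needs to be examined.
    if (!Boolean.TRUE.equals(response.get("errors"))) {
      return messages;
    }
    Object items = response.get("items");
    if (!(items instanceof List)) {
      return messages;
    }
    for (Object item : (List<Object>) items) {
      if (!(item instanceof Map)) {
        continue;
      }
      // Look at the value under whatever action key the item carries.
      for (Object result : ((Map<String, Object>) item).values()) {
        if (!(result instanceof Map)) {
          continue;
        }
        Map<String, Object> resultMap = (Map<String, Object>) result;
        Object error = resultMap.get("error");
        if (error == null) {
          continue; // this item succeeded
        }
        String detail;
        if (error instanceof Map) {
          Map<String, Object> errorMap = (Map<String, Object>) error;
          detail = errorMap.get("reason") + " (" + errorMap.get("type") + ")";
        } else {
          // Assumption: some responses may report the error as a plain string.
          detail = String.valueOf(error);
        }
        messages.add("Document id " + resultMap.get("_id") + ": " + detail);
      }
    }
    return messages;
  }
}
```

With the response above, this sketch would report only the item with status 429, in the same "Document id ...: reason (type)" form as the output shown earlier.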