[
https://issues.apache.org/jira/browse/METRON-326?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15536016#comment-15536016
]
ASF GitHub Bot commented on METRON-326:
---------------------------------------
Github user cestella commented on a diff in the pull request:
https://github.com/apache/incubator-metron/pull/286#discussion_r81335456
--- Diff:
metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java
---
@@ -166,14 +166,37 @@ public void write(String sensorType,
WriterConfiguration configurations, Iterabl
}
- BulkResponse resp = bulkRequest.execute().actionGet();
-
- if (resp.hasFailures()) {
+ BulkResponse bulkResponse = bulkRequest.execute().actionGet();
- throw new Exception(resp.buildFailureMessage());
+ return buildWriteReponse(tuples, bulkResponse);
+ }
+ protected BulkWriterResponse buildWriteReponse(Iterable<Tuple> tuples,
BulkResponse bulkResponse) throws Exception {
+ // Elasticsearch responses are in the same order as the request,
giving us an implicit mapping with Tuples
+ BulkWriterResponse writerResponse = new BulkWriterResponse();
+ if (bulkResponse.hasFailures()) {
+ Iterator<BulkItemResponse> respIter = bulkResponse.iterator();
+ Iterator<Tuple> tupleIter = tuples.iterator();
+ while (respIter.hasNext() && tupleIter.hasNext()) {
+ BulkItemResponse item = respIter.next();
+ Tuple tuple = tupleIter.next();
+
+ if (item.isFailed()) {
+ writerResponse.addError(item.getFailure().getCause(), tuple);
+ } else {
+ writerResponse.addSuccess(tuple);
+ }
+
+ // Should never happen, so fail the entire batch if it does.
+ if (respIter.hasNext() != tupleIter.hasNext()) {
+ throw new Exception(bulkResponse.buildFailureMessage());
--- End diff --
Perhaps a named exception? `IllegalStateException`?
> Error Handling in ElasticsearchWriter
> -------------------------------------
>
> Key: METRON-326
> URL: https://issues.apache.org/jira/browse/METRON-326
> Project: Metron
> Issue Type: Bug
> Reporter: Ajay Yadav
> Assignee: Justin Leet
>
> In Elasticsearch writer we raise a exception if BulkResponse object has
> failures and that results in failing the whole batch even if some objects
> failed in it. This has spiral effect specially when there is continuous
> stream of bad messages and errorStream is tied to indexingBolt.
> If possible we should iterate through items in BulkResponse object and send
> only failed messages to errorStream.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)