Steven Gaunt created BEAM-10990:
-----------------------------------

             Summary: Elasticsearch IO Infinite loop with write Error when the 
pipeline job streaming  mode
                 Key: BEAM-10990
                 URL: https://issues.apache.org/jira/browse/BEAM-10990
             Project: Beam
          Issue Type: Improvement
          Components: io-java-elasticsearch
    Affects Versions: 2.24.0
            Reporter: Steven Gaunt
             Fix For: 2.26.0


When streaming messages from PubsubIO, the pipeline runs in streaming mode.

If for some reason ElasticsearchIO.Write() gets an error response from the 
Elasticsearch index API, the WriteFn will throw an IOException. Since this 
exception is thrown inside the Write transform, it becomes an unhandled error. 
This inherently causes the pipeline job to retry that error indefinitely.

_*snippet from the Beam website*_

The Dataflow service retries failed tasks up to 4 times in batch mode, and an 
unlimited number of times in streaming mode. In batch mode, your job will fail; 
in streaming, it may stall indefinitely.
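
To make the consequence concrete, here is a minimal sketch of the retry policy described in that snippet. It is not Dataflow's implementation: BundleRetrySketch, runBundle and demoCap are hypothetical names, and demoCap exists only so that the "unlimited" streaming case terminates in a demo.

```java
import java.util.function.BooleanSupplier;

/** Hypothetical model of the quoted retry policy; not Dataflow's actual code. */
final class BundleRetrySketch {

    /** Batch mode gives up after this many attempts, per the quoted text. */
    static final int BATCH_MAX_ATTEMPTS = 4;

    /**
     * Runs a bundle until it succeeds and returns the number of attempts made.
     * In batch mode the loop stops after BATCH_MAX_ATTEMPTS failures.
     * In streaming mode it only stops at demoCap, a stand-in for "forever".
     */
    static int runBundle(BooleanSupplier bundle, boolean streaming, int demoCap) {
        int attempts = 0;
        while (attempts < demoCap) {
            attempts++;
            if (bundle.getAsBoolean()) {
                return attempts; // bundle committed
            }
            if (!streaming && attempts >= BATCH_MAX_ATTEMPTS) {
                return attempts; // batch job fails here
            }
        }
        return attempts; // streaming: still retrying when the demo cap is hit
    }
}
```

A bundle that always fails, such as one containing a document Elasticsearch will never accept, makes runBundle(() -> false, false, 1000) stop after 4 attempts, while runBundle(() -> false, true, 1000) runs until the cap.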

 

This is the ElasticsearchIO.Write transform:
{code:java}
public PDone expand(PCollection<String> input) {
    ElasticsearchIO.ConnectionConfiguration connectionConfiguration = this.getConnectionConfiguration();
    Preconditions.checkState(connectionConfiguration != null, "withConnectionConfiguration() is required");
    // The write is a plain ParDo, so any exception thrown by WriteFn is unhandled.
    input.apply(ParDo.of(new ElasticsearchIO.Write.WriteFn(this)));
    return PDone.in(input.getPipeline());
}
{code}
The ParDo function (WriteFn) calls the ElasticsearchIO.checkForErrors helper 
method in its finishBundle step. This helper throws an exception if the HTTP 
response from Elasticsearch contains an error in the JSON response.
{code:java}
// Flushes whatever is still buffered when the bundle ends.
public void finishBundle(DoFn<String, Void>.FinishBundleContext context) throws IOException, InterruptedException {
    this.flushBatch();
}

private void flushBatch() throws IOException, InterruptedException {
    if (!this.batch.isEmpty()) {
        // Concatenate the buffered documents into a single bulk request body.
        StringBuilder bulkRequest = new StringBuilder();
        Iterator var2 = this.batch.iterator();

        while (var2.hasNext()) {
            String json = (String) var2.next();
            bulkRequest.append(json);
        }

        this.batch.clear();
        this.currentBatchSizeBytes = 0L;
        String endPoint = String.format("/%s/%s/_bulk",
                this.spec.getConnectionConfiguration().getIndex(),
                this.spec.getConnectionConfiguration().getType());
        HttpEntity requestBody = new NStringEntity(bulkRequest.toString(), ContentType.APPLICATION_JSON);
        Request request = new Request("POST", endPoint);
        request.addParameters(Collections.emptyMap());
        request.setEntity(requestBody);
        Response response = this.restClient.performRequest(request);
        HttpEntity responseEntity = new BufferedHttpEntity(response.getEntity());
        if (this.spec.getRetryConfiguration() != null
                && this.spec.getRetryConfiguration().getRetryPredicate().test(responseEntity)) {
            responseEntity = this.handleRetry("POST", endPoint, Collections.emptyMap(), requestBody);
        }

        // Throws IOException if the bulk response reports any failed item.
        ElasticsearchIO.checkForErrors((HttpEntity) responseEntity, this.backendVersion, this.spec.getUsePartialUpdate());
    }
}

static void checkForErrors(HttpEntity responseEntity, int backendVersion, boolean partialUpdate) throws IOException {
    JsonNode searchResult = parseResponse(responseEntity);
    boolean errors = searchResult.path("errors").asBoolean();
    if (errors) {
        StringBuilder errorMessages = new StringBuilder("Error writing to Elasticsearch, some elements could not be inserted:");
        JsonNode items = searchResult.path("items");
        Iterator var7 = items.iterator();

        while (var7.hasNext()) {
            JsonNode item = (JsonNode) var7.next();
            // The key that wraps each item depends on the operation and backend version.
            String errorRootName = "";
            if (partialUpdate) {
                errorRootName = "update";
            } else if (backendVersion == 2) {
                errorRootName = "create";
            } else if (backendVersion >= 5) {
                errorRootName = "index";
            }

            JsonNode errorRoot = item.path(errorRootName);
            JsonNode error = errorRoot.get("error");
            if (error != null) {
                String type = error.path("type").asText();
                String reason = error.path("reason").asText();
                String docId = errorRoot.path("_id").asText();
                errorMessages.append(String.format("%nDocument id %s: %s (%s)", docId, reason, type));
                JsonNode causedBy = error.get("caused_by");
                if (causedBy != null) {
                    String cbReason = causedBy.path("reason").asText();
                    String cbType = causedBy.path("type").asText();
                    errorMessages.append(String.format("%nCaused by: %s (%s)", cbReason, cbType));
                }
            }
        }

        // A single failed item fails the whole bundle.
        throw new IOException(errorMessages.toString());
    }
}
{code}
 

As a possible suggestion: rather than throwing the exception, would it be 
possible to write the failure to an error-handling TupleTag, which could then be 
routed to a dead-letter queue?
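
A rough sketch of that idea in plain Java, without any Beam or Elasticsearch dependencies: instead of throwing on the first failed item, the bulk response items are split into documents that were written and documents to dead-letter. DeadLetterSketch, BulkItem, WriteResult and partition are hypothetical names used for illustration only; in a real pipeline the two lists would presumably correspond to the main output and an additional TupleTag output of a multi-output ParDo.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch: collect failed bulk items instead of throwing. */
final class DeadLetterSketch {

    /** Hypothetical stand-in for one entry of the "items" array in a bulk response. */
    static final class BulkItem {
        final String docId;
        final String errorReason; // null when the document was written successfully

        BulkItem(String docId, String errorReason) {
            this.docId = docId;
            this.errorReason = errorReason;
        }
    }

    /** Outcome of one flush: ids that were written, and failures to dead-letter. */
    static final class WriteResult {
        final List<String> written = new ArrayList<>();
        final List<String> deadLetters = new ArrayList<>();
    }

    /** Splits the items by outcome; never throws because of a failed document. */
    static WriteResult partition(List<BulkItem> items) {
        WriteResult result = new WriteResult();
        for (BulkItem item : items) {
            if (item.errorReason == null) {
                result.written.add(item.docId);
            } else {
                // Keep the id and the reason so the failure can be inspected later.
                result.deadLetters.add(item.docId + ": " + item.errorReason);
            }
        }
        return result;
    }
}
```

With this shape a single rejected document no longer fails the whole bundle, so the runner has nothing to retry, and the failures remain available for a downstream dead-letter sink.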

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
