Steven Gaunt created BEAM-10990:
-----------------------------------
Summary: Elasticsearch IO Infinite loop with write Error when the
pipeline job streaming mode
Key: BEAM-10990
URL: https://issues.apache.org/jira/browse/BEAM-10990
Project: Beam
Issue Type: Improvement
Components: io-java-elasticsearch
Affects Versions: 2.24.0
Reporter: Steven Gaunt
Fix For: 2.26.0
When streaming messages from PubsubIO , the pipeline is in Streaming mode.
If for some reason the ElasticSearchIO.Write() has an response from the
ElasticSearch index api, the writefn will throw a IOException. Since this
excetpion is part of the Write transform, it becomes an unhandled error. This
will then inheritly cause behaviour from the job pipeline to infinitely retry
that error..
_*snippet form beam website*_
The Dataflow service retries failed tasks up to 4 times in batch mode, and an
unlimited number of times in streaming mode. In batch mode, your job will fail;
in streaming, it may stall indefinitely.
This is the ElasticSearchIO.write transform .
{code:java}
public PDone expand(PCollection<String> input) {
ElasticsearchIO.ConnectionConfiguration connectionConfiguration =
this.getConnectionConfiguration();
Preconditions.checkState(connectionConfiguration != null,
"withConnectionConfiguration() is required");
* input.apply(ParDo.of(new ElasticsearchIO.Write.WriteFn(this)));*
return PDone.in(input.getPipeline());
}
{code}
The pardo function (WriteFn) finishBundle step will call
ElasticsearchIO.checkForErrors helper method which will throw exception if the
http response from elasticsearch has error in the json reponse.
{code:java}
// Some comments here
public void finishBundle(DoFn<String, Void>.FinishBundleContext context)
throws IOException, InterruptedException {
this.flushBatch();
}
private void flushBatch() throws IOException, InterruptedException {
if (!this.batch.isEmpty()) {
StringBuilder bulkRequest = new StringBuilder();
Iterator var2 = this.batch.iterator();
while(var2.hasNext()) {
String json = (String)var2.next();
bulkRequest.append(json);
}
this.batch.clear();
this.currentBatchSizeBytes = 0L;
String endPoint = String.format("/%s/%s/_bulk",
this.spec.getConnectionConfiguration().getIndex(),
this.spec.getConnectionConfiguration().getType());
HttpEntity requestBody = new
NStringEntity(bulkRequest.toString(), ContentType.APPLICATION_JSON);
Request request = new Request("POST", endPoint);
request.addParameters(Collections.emptyMap());
request.setEntity(requestBody);
Response response = this.restClient.performRequest(request);
HttpEntity responseEntity = new
BufferedHttpEntity(response.getEntity());
if (this.spec.getRetryConfiguration() != null &&
this.spec.getRetryConfiguration().getRetryPredicate().test(responseEntity)) {
responseEntity = this.handleRetry("POST", endPoint,
Collections.emptyMap(), requestBody);
}
ElasticsearchIO.checkForErrors((HttpEntity)responseEntity,
this.backendVersion, this.spec.getUsePartialUpdate());
}
}
static void checkForErrors(HttpEntity responseEntity, int backendVersion,
boolean partialUpdate) throws IOException {
JsonNode searchResult = parseResponse(responseEntity);
boolean errors = searchResult.path("errors").asBoolean();
if (errors) {
StringBuilder errorMessages = new StringBuilder("Error writing to
Elasticsearch, some elements could not be inserted:");
JsonNode items = searchResult.path("items");
Iterator var7 = items.iterator();
while(var7.hasNext()) {
JsonNode item = (JsonNode)var7.next();
String errorRootName = "";
if (partialUpdate) {
errorRootName = "update";
} else if (backendVersion == 2) {
errorRootName = "create";
} else if (backendVersion >= 5) {
errorRootName = "index";
}
JsonNode errorRoot = item.path(errorRootName);
JsonNode error = errorRoot.get("error");
if (error != null) {
String type = error.path("type").asText();
String reason = error.path("reason").asText();
String docId = errorRoot.path("_id").asText();
errorMessages.append(String.format("%nDocument id %s: %s
(%s)", docId, reason, type));
JsonNode causedBy = error.get("caused_by");
if (causedBy != null) {
String cbReason = causedBy.path("reason").asText();
String cbType = causedBy.path("type").asText();
errorMessages.append(String.format("%nCaused by: %s
(%s)", cbReason, cbType));
}
}
}
throw new IOException(errorMessages.toString());
}
}
{code}
As a possible suggestion, rather than throw the exception, could it be possible
to write the exception to an errorhandling tupeltag which then can handled to a
deadletter queue ?
--
This message was sent by Atlassian Jira
(v8.3.4#803005)