Repository: beam Updated Branches: refs/heads/master 6b91eed7e -> 16b9d584c
[BEAM-3112] Improve error messages in ElasticsearchIO test utils Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/38077564 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/38077564 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/38077564 Branch: refs/heads/master Commit: 38077564496f2f7c2accda42a6c0f45f542ac694 Parents: 6b91eed Author: Etienne Chauchot <[email protected]> Authored: Mon Oct 16 14:33:38 2017 +0200 Committer: Ismaël Mejía <[email protected]> Committed: Mon Oct 30 16:36:01 2017 +0100 ---------------------------------------------------------------------- .../elasticsearch/ElasticSearchIOTestUtils.java | 8 +-- .../sdk/io/elasticsearch/ElasticsearchIO.java | 70 +++++++++++--------- 2 files changed, 39 insertions(+), 39 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/38077564/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticSearchIOTestUtils.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticSearchIOTestUtils.java b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticSearchIOTestUtils.java index 142789b..bbceb8d 100644 --- a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticSearchIOTestUtils.java +++ b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticSearchIOTestUtils.java @@ -73,12 +73,8 @@ class ElasticSearchIOTestUtils { new NStringEntity(bulkRequest.toString(), ContentType.APPLICATION_JSON); Response response = restClient.performRequest("POST", endPoint, 
Collections.singletonMap("refresh", "true"), requestBody); - JsonNode searchResult = ElasticsearchIO.parseResponse(response); - boolean errors = searchResult.path("errors").asBoolean(); - if (errors){ - throw new IOException(String.format("Failed to insert test documents in index %s", - connectionConfiguration.getIndex())); - } + ElasticsearchIO + .checkForErrors(response, ElasticsearchIO.getBackendVersion(connectionConfiguration)); } /** http://git-wip-us.apache.org/repos/asf/beam/blob/38077564/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java b/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java index 5eebe00..c0d0819 100644 --- a/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java +++ b/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java @@ -149,6 +149,41 @@ public class ElasticsearchIO { return mapper.readValue(response.getEntity().getContent(), JsonNode.class); } + static void checkForErrors(Response response, int backendVersion) throws IOException { + JsonNode searchResult = parseResponse(response); + boolean errors = searchResult.path("errors").asBoolean(); + if (errors) { + StringBuilder errorMessages = + new StringBuilder( + "Error writing to Elasticsearch, some elements could not be inserted:"); + JsonNode items = searchResult.path("items"); + //some items present in bulk might have errors, concatenate error messages + for (JsonNode item : items) { + String errorRootName = ""; + if (backendVersion == 2) { + errorRootName = "create"; + } else if (backendVersion == 5) { + errorRootName = "index"; + } + JsonNode errorRoot = item.path(errorRootName); + JsonNode error = errorRoot.get("error"); 
+ if (error != null) { + String type = error.path("type").asText(); + String reason = error.path("reason").asText(); + String docId = errorRoot.path("_id").asText(); + errorMessages.append(String.format("%nDocument id %s: %s (%s)", docId, reason, type)); + JsonNode causedBy = error.get("caused_by"); + if (causedBy != null) { + String cbReason = causedBy.path("reason").asText(); + String cbType = causedBy.path("type").asText(); + errorMessages.append(String.format("%nCaused by: %s (%s)", cbReason, cbType)); + } + } + } + throw new IOException(errorMessages.toString()); + } + } + /** A POJO describing a connection configuration to Elasticsearch. */ @AutoValue public abstract static class ConnectionConfiguration implements Serializable { @@ -837,38 +872,7 @@ public class ElasticsearchIO { endPoint, Collections.<String, String>emptyMap(), requestBody); - JsonNode searchResult = parseResponse(response); - boolean errors = searchResult.path("errors").asBoolean(); - if (errors) { - StringBuilder errorMessages = - new StringBuilder( - "Error writing to Elasticsearch, some elements could not be inserted:"); - JsonNode items = searchResult.path("items"); - //some items present in bulk might have errors, concatenate error messages - for (JsonNode item : items) { - String errorRootName = ""; - if (backendVersion == 2){ - errorRootName = "create"; - } else if (backendVersion == 5){ - errorRootName = "index"; - } - JsonNode errorRoot = item.path(errorRootName); - JsonNode error = errorRoot.get("error"); - if (error != null) { - String type = error.path("type").asText(); - String reason = error.path("reason").asText(); - String docId = errorRoot.path("_id").asText(); - errorMessages.append(String.format("%nDocument id %s: %s (%s)", docId, reason, type)); - JsonNode causedBy = error.get("caused_by"); - if (causedBy != null) { - String cbReason = causedBy.path("reason").asText(); - String cbType = causedBy.path("type").asText(); - errorMessages.append(String.format("%nCaused by: %s 
(%s)", cbReason, cbType)); - } - } - } - throw new IOException(errorMessages.toString()); - } + checkForErrors(response, backendVersion); } @Teardown @@ -879,7 +883,7 @@ public class ElasticsearchIO { } } } - private static int getBackendVersion(ConnectionConfiguration connectionConfiguration){ + static int getBackendVersion(ConnectionConfiguration connectionConfiguration) { try (RestClient restClient = connectionConfiguration.createClient()) { Response response = restClient.performRequest("GET", ""); JsonNode jsonNode = parseResponse(response);
