egalpin commented on a change in pull request #15381:
URL: https://github.com/apache/beam/pull/15381#discussion_r731017896



##########
File path: 
sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
##########
@@ -237,49 +250,64 @@ static JsonNode parseResponse(HttpEntity responseEntity) 
throws IOException {
     return mapper.readValue(responseEntity.getContent(), JsonNode.class);
   }
 
-  static void checkForErrors(HttpEntity responseEntity, @Nullable Set<String> 
allowedErrorTypes)
+  static List<WriteSummary> checkForErrors(
+      HttpEntity responseEntity, @Nullable Set<String> allowedErrorTypes, 
boolean throwWriteErrors)
       throws IOException {
 
+    List<WriteSummary> responses = new ArrayList<>();
+    int numErrors = 0;
     JsonNode searchResult = parseResponse(responseEntity);
-    boolean errors = searchResult.path("errors").asBoolean();
-    if (errors) {
-      int numErrors = 0;
-
-      StringBuilder errorMessages =
-          new StringBuilder("Error writing to Elasticsearch, some elements 
could not be inserted:");
-      JsonNode items = searchResult.path("items");
-      if (items.isMissingNode() || items.size() == 0) {
-        errorMessages.append(searchResult.toString());
-      }
-      // some items present in bulk might have errors, concatenate error 
messages
-      for (JsonNode item : items) {
-        JsonNode error = item.findValue("error");
-        if (error != null) {
-          // N.B. An empty-string within the allowedErrorTypes Set implies all 
errors are allowed.
-          String type = error.path("type").asText();
-          String reason = error.path("reason").asText();
-          String docId = item.findValue("_id").asText();
-          JsonNode causedBy = error.path("caused_by"); // May not be present
-          String cbReason = causedBy.path("reason").asText();
-          String cbType = causedBy.path("type").asText();
-
-          if (allowedErrorTypes == null
-              || (!allowedErrorTypes.contains(type) && 
!allowedErrorTypes.contains(cbType))) {
-            // 'error' and 'causedBy` fields are not null, and the error is 
not being ignored.
-            numErrors++;
-
-            errorMessages.append(String.format("%nDocument id %s: %s (%s)", 
docId, reason, type));
-
-            if (!causedBy.isMissingNode()) {
-              errorMessages.append(String.format("%nCaused by: %s (%s)", 
cbReason, cbType));
-            }
+    StringBuilder errorMessages =
+        new StringBuilder("Error writing to Elasticsearch, some elements could 
not be inserted:");
+    JsonNode items = searchResult.path("items");
+
+    if (items.isMissingNode() || items.size() == 0) {
+      // This would only be expected in cases like connectivity issues or 
similar
+      errorMessages.append(searchResult.toString());
+      throw new RuntimeException(

Review comment:
       Good call, this is overzealous. I'll reduce to a log warning




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to