[ 
https://issues.apache.org/jira/browse/BEAM-12093?focusedWorklogId=585848&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-585848
 ]

ASF GitHub Bot logged work on BEAM-12093:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 20/Apr/21 14:27
            Start Date: 20/Apr/21 14:27
    Worklog Time Spent: 10m 
      Work Description: echauchot commented on a change in pull request #14347:
URL: https://github.com/apache/beam/pull/14347#discussion_r616672005



##########
File path: 
sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
##########
@@ -1453,14 +2062,23 @@ private HttpEntity handleRetry(
         // while retry policy exists
         while (BackOffUtils.next(sleeper, backoff)) {
           LOG.warn(String.format(RETRY_ATTEMPT_LOG, ++attempt));
-          Request request = new Request(method, endpoint);
-          request.addParameters(params);
-          request.setEntity(requestBody);
-          response = restClient.performRequest(request);
-          responseEntity = new BufferedHttpEntity(response.getEntity());
+          try {
+            Request request = new Request(method, endpoint);
+            request.addParameters(params);
+            request.setEntity(requestBody);
+            response = restClient.performRequest(request);
+            responseEntity = new BufferedHttpEntity(response.getEntity());
+          } catch (java.io.IOException ex) {

Review comment:
       Before, we just threw the exception, and retries happened only on
HTTP 429 (the predicate). Now retries also happen on any IOException.
Are you sure every IOException case can safely be retried? I'm not sure they
are all timeouts: a misconfigured IO will also throw IOException and would be
retried. Retrying on timeouts is good IMHO, but please filter so that only
timeouts are retried.
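
A minimal sketch of what filtering on timeouts could look like, using only JDK
classes. `TimeoutRetryFilter` and `isTimeout` are hypothetical names, not part
of ElasticsearchIO, and treating `java.net.SocketTimeoutException` as the only
timeout type is an assumption; the HTTP client in use may surface other timeout
exceptions that would need to be added.

```java
import java.io.IOException;
import java.net.SocketTimeoutException;
import java.net.UnknownHostException;

// Hypothetical helper class, for illustration only.
class TimeoutRetryFilter {

  /**
   * Returns true only if the exception, or any exception in its cause chain,
   * is a socket timeout. Walking the cause chain covers the case where a
   * client wraps the original timeout in a plain IOException.
   */
  static boolean isTimeout(IOException ex) {
    for (Throwable t = ex; t != null; t = t.getCause()) {
      if (t instanceof SocketTimeoutException) {
        return true;
      }
    }
    return false;
  }

  public static void main(String[] args) {
    // A direct timeout: candidate for retry.
    System.out.println(isTimeout(new SocketTimeoutException("read timed out")));
    // A timeout wrapped in a generic IOException: still a candidate for retry.
    System.out.println(
        isTimeout(new IOException("wrapped", new SocketTimeoutException("read timed out"))));
    // A misconfiguration such as an unknown host: should not be retried.
    System.out.println(isTimeout(new UnknownHostException("bad-host")));
  }
}
```

In the retry loop, the catch block could rethrow the exception when
`isTimeout(ex)` is false, so that misconfiguration errors fail fast instead of
being retried.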

##########
File path: 
sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
##########
@@ -1310,135 +1970,84 @@ public void startBundle(StartBundleContext context) {
         currentBatchSizeBytes = 0;
       }
 
-      private class DocumentMetadataSerializer extends StdSerializer<DocumentMetadata> {
-
-        private DocumentMetadataSerializer() {
-          super(DocumentMetadata.class);
-        }
-
-        @Override
-        public void serialize(
-            DocumentMetadata value, JsonGenerator gen, SerializerProvider provider)
-            throws IOException {
-          gen.writeStartObject();
-          if (value.index != null) {
-            gen.writeStringField("_index", value.index);
-          }
-          if (value.type != null) {
-            gen.writeStringField("_type", value.type);
-          }
-          if (value.id != null) {
-            gen.writeStringField("_id", value.id);
-          }
-          if (value.retryOnConflict != null && (backendVersion <= 6)) {
-            gen.writeNumberField("_retry_on_conflict", value.retryOnConflict);
-          }
-          if (value.retryOnConflict != null && backendVersion >= 7) {
-            gen.writeNumberField("retry_on_conflict", value.retryOnConflict);
-          }
-          gen.writeEndObject();
-        }
-      }
-      /**
-       * Extracts the components that comprise the document address from the document using the
-       * {@link FieldValueExtractFn} configured. This allows any or all of the index, type and
-       * document id to be controlled on a per document basis. Sanitization of the index is
-       * performed, automatically lower-casing the value as required by Elasticsearch.
-       *
-       * @param parsedDocument the json from which the index, type and id may be extracted
-       * @return the document address as JSON or the default
-       * @throws IOException if the document cannot be parsed as JSON
-       */
-      private String getDocumentMetadata(JsonNode parsedDocument) throws IOException {
-        DocumentMetadata metadata =
-            new DocumentMetadata(
-                spec.getIndexFn() != null
-                    ? lowerCaseOrNull(spec.getIndexFn().apply(parsedDocument))
-                    : null,
-                spec.getTypeFn() != null ? spec.getTypeFn().apply(parsedDocument) : null,
-                spec.getIdFn() != null ? spec.getIdFn().apply(parsedDocument) : null,
-                spec.getUsePartialUpdate() ? DEFAULT_RETRY_ON_CONFLICT : null);
-        return OBJECT_MAPPER.writeValueAsString(metadata);
-      }
-
-      private static String lowerCaseOrNull(String input) {
-        return input == null ? null : input.toLowerCase();
+      @FinishBundle
+      public void finishBundle(FinishBundleContext context)
+          throws IOException, InterruptedException {
+        flushBatch();
       }
 
       @ProcessElement
-      public void processElement(ProcessContext context) throws Exception {
-        String document = context.element(); // use configuration and auto-generated document IDs
-        String documentMetadata = "{}";
-        boolean isDelete = false;
-        if (spec.getIndexFn() != null || spec.getTypeFn() != null || spec.getIdFn() != null) {
-          // parse once and reused for efficiency
-          JsonNode parsedDocument = OBJECT_MAPPER.readTree(document);
-          documentMetadata = getDocumentMetadata(parsedDocument);
-          if (spec.getIsDeleteFn() != null) {
-            isDelete = spec.getIsDeleteFn().apply(parsedDocument);
-          }
+      public void processElement(@Element @NonNull Iterable<String> bulkApiEntities)
+          throws Exception {
+        for (String bulkApiEntity : bulkApiEntities) {
+          addAndMaybeFlush(bulkApiEntity);
         }
+      }
 
-        if (isDelete) {
-          // delete request used for deleting a document.
-          batch.add(String.format("{ \"delete\" : %s }%n", documentMetadata));
-        } else {
-          // index is an insert/upsert and update is a partial update (or insert if not existing)
-          if (spec.getUsePartialUpdate()) {
-            batch.add(
-                String.format(
-                    "{ \"update\" : %s }%n{ \"doc\" : %s, \"doc_as_upsert\" : true }%n",
-                    documentMetadata, document));
-          } else {
-            batch.add(String.format("{ \"index\" : %s }%n%s%n", documentMetadata, document));
-          }
-        }
+      protected void addAndMaybeFlush(String bulkApiEntity)
+          throws IOException, InterruptedException {
+        batch.add(bulkApiEntity);
+        currentBatchSizeBytes += bulkApiEntity.getBytes(StandardCharsets.UTF_8).length;
 
-        currentBatchSizeBytes += document.getBytes(StandardCharsets.UTF_8).length;
         if (batch.size() >= spec.getMaxBatchSize()
             || currentBatchSizeBytes >= spec.getMaxBatchSizeBytes()) {
           flushBatch();
         }
       }
 
-      @FinishBundle
-      public void finishBundle(FinishBundleContext context)
-          throws IOException, InterruptedException {
-        flushBatch();
-      }
-
       private void flushBatch() throws IOException, InterruptedException {
         if (batch.isEmpty()) {
           return;
         }
+
+        LOG.info(
+            "ElasticsearchIO batch size: {}, batch size bytes: {}",
+            batch.size(),
+            currentBatchSizeBytes);
+
         StringBuilder bulkRequest = new StringBuilder();
         for (String json : batch) {
           bulkRequest.append(json);
         }
+
         batch.clear();
-        currentBatchSizeBytes = 0;
-        Response response;
-        HttpEntity responseEntity;
-        // Elasticsearch will default to the index/type provided here if none are set in the
-        // document meta (i.e. using ElasticsearchIO$Write#withIndexFn and
-        // ElasticsearchIO$Write#withTypeFn options)
-        String endPoint =
-            String.format(
-                "/%s/%s/_bulk",
-                spec.getConnectionConfiguration().getIndex(),
-                spec.getConnectionConfiguration().getType());
+        currentBatchSizeBytes = 0L;
+
+        Response response = null;
+        HttpEntity responseEntity = null;
+
+        // Elasticsearch will default to the index/type provided the {@link
+        // ConnectionConfiguration} if none are set in the document meta (i.e.
+        // using ElasticsearchIO$DocToBulk#withIndexFn and
+        // ElasticsearchIO$DocToBulk#withTypeFn options)
+        String endPoint = spec.getConnectionConfiguration().getBulkEndPoint();
+
         HttpEntity requestBody =
             new NStringEntity(bulkRequest.toString(), ContentType.APPLICATION_JSON);
-        Request request = new Request("POST", endPoint);
-        request.addParameters(Collections.emptyMap());
-        request.setEntity(requestBody);
-        response = restClient.performRequest(request);
-        responseEntity = new BufferedHttpEntity(response.getEntity());
+        try {
+          Request request = new Request("POST", endPoint);
+          request.addParameters(Collections.emptyMap());
+          request.setEntity(requestBody);
+          response = restClient.performRequest(request);
+          responseEntity = new BufferedHttpEntity(response.getEntity());
+        } catch (java.io.IOException ex) {

Review comment:
       same here




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 585848)
    Time Spent: 2h 50m  (was: 2h 40m)

> Overhaul ElasticsearchIO#Write
> ------------------------------
>
>                 Key: BEAM-12093
>                 URL: https://issues.apache.org/jira/browse/BEAM-12093
>             Project: Beam
>          Issue Type: Improvement
>          Components: io-java-elasticsearch
>            Reporter: Evan Galpin
>            Priority: P2
>          Time Spent: 2h 50m
>  Remaining Estimate: 0h
>
> The current ElasticsearchIO#Write is great, but there are two related areas 
> which could be improved:
>  # Separation of concerns
>  # Bulk API batch size optimization
>  
> Presently, the Write transform has 2 responsibilities which are coupled and 
> inseparable by users:
>  # Convert input documents into Bulk API entities, serializing based on user 
> settings (partial update, delete, upsert, etc)
>  # Batch the converted Bulk API entities together and interface with the 
> target ES cluster
>  
> Having these 2 roles tightly coupled means testing requires an available 
> Elasticsearch cluster, making unit testing almost impossible. Allowing access 
> to the serialized documents would make unit testing much easier for pipeline 
> developers, among numerous other benefits to having separation between 
> serialization and IO.
> Relatedly, the batching of entities when creating Bulk API payloads is 
> currently limited by the lesser of Beam Runner bundling semantics and the 
> `ElasticsearchIO#Write#maxBatchSize` setting. This is understandable for 
> portability between runners, but it also means most Bulk payloads only have a 
> few (1-5) entities. By using Stateful Processing to better adhere to the 
> `ElasticsearchIO#Write#maxBatchSize` setting, we have been able to drop the 
> number of indexing requests in an Elasticsearch cluster by 50-100x. 
> Separating the role of document serialization and IO allows supporting 
> multiple IO techniques with minimal and understandable code.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
