[ https://issues.apache.org/jira/browse/BEAM-5147?focusedWorklogId=134948&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-134948 ]

ASF GitHub Bot logged work on BEAM-5147:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 15/Aug/18 11:46
            Start Date: 15/Aug/18 11:46
    Worklog Time Spent: 10m 
      Work Description: aromanenko-dev closed pull request #6200: [BEAM-5147] Expose document metadata in ElasticsearchIO read
URL: https://github.com/apache/beam/pull/6200
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below, because GitHub does not otherwise display it after the merge:

diff --git 
a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-2/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
 
b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-2/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
index 0a7f1cf9c7f..1e3e7b62dfb 100644
--- 
a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-2/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
+++ 
b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-2/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
@@ -214,4 +214,10 @@ public void testWritePartialUpdate() throws Exception {
     elasticsearchIOTestCommon.setPipeline(pipeline);
     elasticsearchIOTestCommon.testWritePartialUpdate();
   }
+
+  @Test
+  public void testReadWithMetadata() throws Exception {
+    elasticsearchIOTestCommon.setPipeline(pipeline);
+    elasticsearchIOTestCommon.testReadWithMetadata();
+  }
 }
diff --git 
a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-5/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
 
b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-5/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
index 77e9c48008b..c28b1906f84 100644
--- 
a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-5/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
+++ 
b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-5/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
@@ -216,4 +216,10 @@ public void testWritePartialUpdate() throws Exception {
     elasticsearchIOTestCommon.setPipeline(pipeline);
     elasticsearchIOTestCommon.testWritePartialUpdate();
   }
+
+  @Test
+  public void testReadWithMetadata() throws Exception {
+    elasticsearchIOTestCommon.setPipeline(pipeline);
+    elasticsearchIOTestCommon.testReadWithMetadata();
+  }
 }
diff --git 
a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestCommon.java
 
b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestCommon.java
index 329c68e0024..dedc49f0ea3 100644
--- 
a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestCommon.java
+++ 
b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestCommon.java
@@ -30,6 +30,7 @@
 import static org.hamcrest.core.Is.isA;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
 
 import com.fasterxml.jackson.databind.JsonNode;
 import java.io.IOException;
@@ -44,6 +45,7 @@
 import org.apache.beam.sdk.transforms.Count;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFnTester;
+import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.values.PCollection;
 import org.elasticsearch.client.RestClient;
 import org.hamcrest.CustomMatcher;
@@ -150,6 +152,21 @@ void testReadWithQuery() throws Exception {
     pipeline.run();
   }
 
+  /** Test reading metadata by reading back the id of a document after writing 
it. */
+  void testReadWithMetadata() throws Exception {
+    if (!useAsITests) {
+      ElasticSearchIOTestUtils.insertTestDocuments(connectionConfiguration, 1, 
restClient);
+    }
+
+    PCollection<String> output =
+        pipeline.apply(
+            ElasticsearchIO.read()
+                .withConnectionConfiguration(connectionConfiguration)
+                .withMetadata());
+    PAssert.that(output).satisfies(new 
ContainsStringCheckerFn("\"_id\":\"0\""));
+    pipeline.run();
+  }
+
   void testWrite() throws Exception {
     List<String> data =
         ElasticSearchIOTestUtils.createDocuments(
@@ -470,4 +487,29 @@ void testWritePartialUpdate() throws Exception {
     assertEquals(numDocs / 2, countByMatch(connectionConfiguration, 
restClient, "group", "0"));
     assertEquals(numDocs / 2, countByMatch(connectionConfiguration, 
restClient, "group", "1"));
   }
+
+  /**
+   * Function for checking if any string in iterable contains expected 
substring. Fails if no match
+   * is found.
+   */
+  private static class ContainsStringCheckerFn
+      implements SerializableFunction<Iterable<String>, Void> {
+
+    private String expectedSubString;
+
+    ContainsStringCheckerFn(String expectedSubString) {
+      this.expectedSubString = expectedSubString;
+    }
+
+    @Override
+    public Void apply(Iterable<String> input) {
+      for (String s : input) {
+        if (s.contains(expectedSubString)) {
+          return null;
+        }
+      }
+      fail("No string found containing " + expectedSubString);
+      return null;
+    }
+  }
 }
diff --git 
a/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
 
b/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
index 9f50dc199eb..3076f365cdd 100644
--- 
a/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
+++ 
b/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
@@ -138,6 +138,7 @@ public static Read read() {
     // default batchSize to 100 as recommended by ES dev team as a safe value 
when dealing
     // with big documents and still a good compromise for performances
     return new AutoValue_ElasticsearchIO_Read.Builder()
+        .setWithMetadata(false)
         .setScrollKeepalive("5m")
         .setBatchSize(100L)
         .build();
@@ -367,6 +368,8 @@ RestClient createClient() throws IOException {
     @Nullable
     abstract String getQuery();
 
+    abstract boolean isWithMetadata();
+
     abstract String getScrollKeepalive();
 
     abstract long getBatchSize();
@@ -379,6 +382,8 @@ RestClient createClient() throws IOException {
 
       abstract Builder setQuery(String query);
 
+      abstract Builder setWithMetadata(boolean withMetadata);
+
       abstract Builder setScrollKeepalive(String scrollKeepalive);
 
       abstract Builder setBatchSize(long batchSize);
@@ -405,6 +410,13 @@ public Read withQuery(String query) {
       return builder().setQuery(query).build();
     }
 
+    /**
+     * Include metadata in result json documents. Document source will be 
under json node _source.
+     */
+    public Read withMetadata() {
+      return builder().setWithMetadata(true).build();
+    }
+
     /**
      * Provide a scroll keepalive. See <a
      * 
href="https://www.elastic.co/guide/en/elasticsearch/reference/2.4/search-request-scroll.html";>scroll
@@ -446,6 +458,7 @@ public Read withBatchSize(long batchSize) {
     public void populateDisplayData(DisplayData.Builder builder) {
       super.populateDisplayData(builder);
       builder.addIfNotNull(DisplayData.item("query", getQuery()));
+      builder.addIfNotNull(DisplayData.item("withMetadata", isWithMetadata()));
       builder.addIfNotNull(DisplayData.item("batchSize", getBatchSize()));
       builder.addIfNotNull(DisplayData.item("scrollKeepalive", 
getScrollKeepalive()));
       getConnectionConfiguration().populateDisplayData(builder);
@@ -677,9 +690,14 @@ private boolean 
readNextBatchAndReturnFirstDocument(JsonNode searchResult) {
       }
       // list behind iterator is empty
       List<String> batch = new ArrayList<>();
+      boolean withMetadata = source.spec.isWithMetadata();
       for (JsonNode hit : hits) {
-        String document = hit.path("_source").toString();
-        batch.add(document);
+        if (withMetadata) {
+          batch.add(hit.toString());
+        } else {
+          String document = hit.path("_source").toString();
+          batch.add(document);
+        }
       }
       batchIterator = batch.listIterator();
       current = batchIterator.next();


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 134948)
    Time Spent: 40m  (was: 0.5h)

> Expose document metadata in ElasticsearchIO read
> ------------------------------------------------
>
>                 Key: BEAM-5147
>                 URL: https://issues.apache.org/jira/browse/BEAM-5147
>             Project: Beam
>          Issue Type: Improvement
>          Components: io-java-elasticsearch
>    Affects Versions: 2.7.0
>            Reporter: Tim Robertson
>            Assignee: Etienne Chauchot
>            Priority: Minor
>          Time Spent: 40m
>  Remaining Estimate: 0h
>
> The Beam ElasticsearchIO read does not give access to document metadata. A
> Beam user has requested that it be possible to expose the entire Elasticsearch
> document, including its metadata, from the search result.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to