This is an automated email from the ASF dual-hosted git repository. aromanenko pushed a commit to branch revert-9285-BEAM-7916_Query_ValueProvider in repository https://gitbox.apache.org/repos/asf/beam.git
commit 9701f8e9006a8ce67c92c77377cb63556a97e197 Author: Alexey Romanenko <[email protected]> AuthorDate: Fri Aug 9 11:28:38 2019 +0200 Revert "[BEAM-7916] Add ElasticsearchIO query parameter to take a ValueProvider (#9285)" This reverts commit c545b8aae7cf2519c7776279132650a68445510f. --- .../sdk/io/elasticsearch/ElasticsearchIOTest.java | 10 ++------- .../sdk/io/elasticsearch/ElasticsearchIOTest.java | 13 ++--------- .../sdk/io/elasticsearch/ElasticsearchIOTest.java | 13 ++--------- .../elasticsearch/ElasticsearchIOTestCommon.java | 25 ++++++---------------- .../beam/sdk/io/elasticsearch/ElasticsearchIO.java | 21 +++--------------- 5 files changed, 15 insertions(+), 67 deletions(-) diff --git a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-2/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-2/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java index f234716..19bda80 100644 --- a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-2/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java +++ b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-2/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java @@ -131,15 +131,9 @@ public class ElasticsearchIOTest implements Serializable { } @Test - public void testReadWithQueryString() throws Exception { + public void testReadWithQuery() throws Exception { elasticsearchIOTestCommon.setPipeline(pipeline); - elasticsearchIOTestCommon.testReadWithQueryString(); - } - - @Test - public void testReadWithQueryValueProvider() throws Exception { - elasticsearchIOTestCommon.setPipeline(pipeline); - elasticsearchIOTestCommon.testReadWithQueryValueProvider(); + elasticsearchIOTestCommon.testReadWithQuery(); } @Test diff --git a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-5/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-5/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java index d809cfd..05686cd 100644 --- a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-5/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java +++ b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-5/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java @@ -122,21 +122,12 @@ public class ElasticsearchIOTest extends ESIntegTestCase implements Serializable } @Test - public void testReadWithQueryString() throws Exception { + public void testReadWithQuery() throws Exception { // need to create the index using the helper method (not create it at first insertion) // for the indexSettings() to be run createIndex(getEsIndex()); elasticsearchIOTestCommon.setPipeline(pipeline); - elasticsearchIOTestCommon.testReadWithQueryString(); - } - - @Test - public void testReadWithQueryValueProvider() throws Exception { - // need to create the index using the helper method (not create it at first insertion) - // for the indexSettings() to be run - createIndex(getEsIndex()); - elasticsearchIOTestCommon.setPipeline(pipeline); - elasticsearchIOTestCommon.testReadWithQueryValueProvider(); + elasticsearchIOTestCommon.testReadWithQuery(); } @Test diff --git a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-6/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-6/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java index 84696e5..6638b7d 100644 --- a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-6/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java +++ b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-6/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java @@ -122,21 +122,12 @@ public class ElasticsearchIOTest extends ESIntegTestCase implements Serializable } @Test - public void testReadWithQueryString() throws Exception { + public void testReadWithQuery() throws Exception { // need to create the index using the helper method (not create it at first insertion) // for the indexSettings() to be run createIndex(getEsIndex()); elasticsearchIOTestCommon.setPipeline(pipeline); - elasticsearchIOTestCommon.testReadWithQueryString(); - } - - @Test - public void testReadWithQueryValueProvider() throws Exception { - // need to create the index using the helper method (not create it at first insertion) - // for the indexSettings() to be run - createIndex(getEsIndex()); - elasticsearchIOTestCommon.setPipeline(pipeline); - elasticsearchIOTestCommon.testReadWithQueryValueProvider(); + elasticsearchIOTestCommon.testReadWithQuery(); } @Test diff --git a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestCommon.java b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestCommon.java index a39f2c7..112e0e2 100644 --- a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestCommon.java +++ b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestCommon.java @@ -45,13 +45,11 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; -import java.util.function.BiFunction; import org.apache.beam.sdk.io.BoundedSource; import org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.RetryConfiguration.DefaultRetryPredicate; import org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.RetryConfiguration.RetryPredicate; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.apache.beam.sdk.options.ValueProvider; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.SourceTestUtils; import org.apache.beam.sdk.testing.TestPipeline; @@ -198,17 +196,7 @@ class ElasticsearchIOTestCommon implements Serializable { pipeline.run(); } - void testReadWithQueryString() throws Exception { - testReadWithQueryInternal(Read::withQuery); - } - - void testReadWithQueryValueProvider() throws Exception { - testReadWithQueryInternal( - (read, query) -> read.withQuery(ValueProvider.StaticValueProvider.of(query))); - } - - private void testReadWithQueryInternal(BiFunction<Read, String, Read> queryConfigurer) - throws IOException { + void testReadWithQuery() throws Exception { if (!useAsITests) { ElasticSearchIOTestUtils.insertTestDocuments(connectionConfiguration, numDocs, restClient); } @@ -224,12 +212,11 @@ class ElasticsearchIOTestCommon implements Serializable { + " }\n" + "}"; - Read read = ElasticsearchIO.read().withConnectionConfiguration(connectionConfiguration); - - read = queryConfigurer.apply(read, query); - - PCollection<String> output = pipeline.apply(read); - + PCollection<String> output = + pipeline.apply( + ElasticsearchIO.read() + .withConnectionConfiguration(connectionConfiguration) + .withQuery(query)); PAssert.thatSingleton(output.apply("Count", Count.globally())) .isEqualTo(numDocs / NUM_SCIENTISTS); pipeline.run(); diff --git a/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java b/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java index 41b2eb9..ec688fb 100644 --- a/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java +++ b/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java @@ -51,7 +51,6 @@ import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.io.BoundedSource; import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.options.ValueProvider; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; @@ -455,7 +454,7 @@ public class ElasticsearchIO { abstract ConnectionConfiguration getConnectionConfiguration(); @Nullable - abstract ValueProvider<String> getQuery(); + abstract String getQuery(); abstract boolean isWithMetadata(); @@ -469,7 +468,7 @@ public class ElasticsearchIO { abstract static class Builder { abstract Builder setConnectionConfiguration(ConnectionConfiguration connectionConfiguration); - abstract Builder setQuery(ValueProvider<String> query); + abstract Builder setQuery(String query); abstract Builder setWithMetadata(boolean withMetadata); @@ -503,20 +502,6 @@ public class ElasticsearchIO { public Read withQuery(String query) { checkArgument(query != null, "query can not be null"); checkArgument(!query.isEmpty(), "query can not be empty"); - return withQuery(ValueProvider.StaticValueProvider.of(query)); - } - - /** - * Provide a {@link ValueProvider} that provides the query used while reading from - * Elasticsearch. This is useful for cases when the query must be dynamic. - * - * @param query the query. See <a - * href="https://www.elastic.co/guide/en/elasticsearch/reference/2.4/query-dsl.html">Query - * DSL</a> - * @return a {@link PTransform} reading data from Elasticsearch. - */ - public Read withQuery(ValueProvider<String> query) { - checkArgument(query != null, "query can not be null"); return builder().setQuery(query).build(); } @@ -741,7 +726,7 @@ public class ElasticsearchIO { public boolean start() throws IOException { restClient = source.spec.getConnectionConfiguration().createClient(); - String query = source.spec.getQuery() != null ? source.spec.getQuery().get() : null; + String query = source.spec.getQuery(); if (query == null) { query = "{\"query\": { \"match_all\": {} }}"; }
