Repository: beam Updated Branches: refs/heads/master d1d85dfc7 -> 926ec8e80
[BEAM-425] Add ElasticsearchIO Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/6412389a Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/6412389a Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/6412389a Branch: refs/heads/master Commit: 6412389a02ad0dbdac5cfe5425a8a56462071477 Parents: d1d85df Author: Etienne Chauchot and Jean-Baptiste Onofré <[email protected][email protected]> Authored: Thu Oct 20 11:45:47 2016 +0200 Committer: Jean-Baptiste Onofré <[email protected]> Committed: Wed Jan 4 13:39:07 2017 +0100 ---------------------------------------------------------------------- sdks/java/io/elasticsearch/pom.xml | 175 ++++ .../sdk/io/elasticsearch/ElasticsearchIO.java | 819 +++++++++++++++++++ .../beam/sdk/io/elasticsearch/package-info.java | 20 + .../elasticsearch/ElasticSearchIOTestUtils.java | 129 +++ .../io/elasticsearch/ElasticsearchIOTest.java | 358 ++++++++ sdks/java/io/pom.xml | 1 + 6 files changed, 1502 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/6412389a/sdks/java/io/elasticsearch/pom.xml ---------------------------------------------------------------------- diff --git a/sdks/java/io/elasticsearch/pom.xml b/sdks/java/io/elasticsearch/pom.xml new file mode 100644 index 0000000..94e8c6c --- /dev/null +++ b/sdks/java/io/elasticsearch/pom.xml @@ -0,0 +1,175 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.beam</groupId> + <artifactId>beam-sdks-java-io-parent</artifactId> + <version>0.5.0-SNAPSHOT</version> + <relativePath>../pom.xml</relativePath> + </parent> + + <artifactId>beam-sdks-java-io-elasticsearch</artifactId> + <name>Apache Beam :: SDKs :: Java :: IO :: Elasticsearch</name> + <description>IO to read and write on Elasticsearch.</description> + + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-compiler-plugin</artifactId> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-source-plugin</artifactId> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-surefire-plugin</artifactId> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-jar-plugin</artifactId> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-checkstyle-plugin</artifactId> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-javadoc-plugin</artifactId> + </plugin> + </plugins> + </build> + + <dependencies> + <dependency> + <groupId>org.apache.beam</groupId> + <artifactId>beam-sdks-java-core</artifactId> + </dependency> + + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + </dependency> + + <dependency> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + </dependency> + + <dependency> + <groupId>com.google.code.gson</groupId> + <artifactId>gson</artifactId> + <version>2.6.2</version> + </dependency> + + <dependency> + <groupId>com.google.code.findbugs</groupId> + <artifactId>jsr305</artifactId> + </dependency> + + <dependency> + <groupId>org.elasticsearch.client</groupId> + <artifactId>rest</artifactId> + <version>5.0.0</version> + </dependency> + + <dependency> + <groupId>org.apache.httpcomponents</groupId> + <artifactId>httpcore-nio</artifactId> + <version>4.4.5</version> + </dependency> + + <dependency> + <groupId>org.apache.httpcomponents</groupId> + <artifactId>httpcore</artifactId> + <version>4.4.5</version> + </dependency> + + <dependency> + <groupId>org.apache.httpcomponents</groupId> + <artifactId>httpasyncclient</artifactId> + <version>4.1.2</version> + </dependency> + + <dependency> + <groupId>org.apache.httpcomponents</groupId> + <artifactId>httpclient</artifactId> + <version>4.5.2</version> + </dependency> + + <!-- compile dependencies --> + <dependency> + <groupId>com.google.auto.value</groupId> + <artifactId>auto-value</artifactId> + <scope>provided</scope> + </dependency> + + <!-- test --> + <dependency> + <groupId>org.elasticsearch</groupId> + <artifactId>elasticsearch</artifactId> + <version>2.4.1</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.hamcrest</groupId> + <artifactId>hamcrest-core</artifactId> + <version>1.3</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.hamcrest</groupId> + <artifactId>hamcrest-all</artifactId> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.commons</groupId> + <artifactId>commons-io</artifactId> + <version>1.3.2</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>junit</groupId> + 
<artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-jdk14</artifactId> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.beam</groupId> + <artifactId>beam-runners-direct-java</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + + </dependencies> + +</project> http://git-wip-us.apache.org/repos/asf/beam/blob/6412389a/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java b/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java new file mode 100644 index 0000000..5073834 --- /dev/null +++ b/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java @@ -0,0 +1,819 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.elasticsearch; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkState; + +import com.google.auto.value.AutoValue; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Strings; +import com.google.gson.Gson; +import com.google.gson.JsonArray; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; + +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.Serializable; +import java.net.MalformedURLException; +import java.net.URL; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.ListIterator; +import java.util.Map; +import java.util.NoSuchElementException; +import java.util.Set; +import javax.annotation.Nullable; + +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.io.BoundedSource; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.values.PBegin; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PDone; +import org.apache.http.HttpEntity; +import org.apache.http.HttpHost; +import org.apache.http.auth.AuthScope; +import org.apache.http.auth.UsernamePasswordCredentials; +import org.apache.http.client.CredentialsProvider; +import org.apache.http.entity.ContentType; +import org.apache.http.impl.client.BasicCredentialsProvider; +import org.apache.http.impl.nio.client.HttpAsyncClientBuilder; +import org.apache.http.message.BasicHeader; +import org.apache.http.nio.entity.NStringEntity; +import org.elasticsearch.client.Response; +import org.elasticsearch.client.RestClient; +import org.elasticsearch.client.RestClientBuilder; + +/** + * Transforms for reading and writing data from/to Elasticsearch. + * + * <h3>Reading from Elasticsearch</h3> + * + * <p>{@link ElasticsearchIO#read ElasticsearchIO.read()} returns a bounded + * {@link PCollection PCollection<String>} representing JSON documents. + * + * <p>To configure the {@link ElasticsearchIO#read}, you have to provide a connection configuration + * containing the HTTP address of the instances, an index name and a type. The following example + * illustrates options for configuring the source: + * + * <pre>{@code + * + * pipeline.apply(ElasticsearchIO.read().withConnectionConfiguration( + * ElasticsearchIO.ConnectionConfiguration.create("http://host:9200", "my-index", "my-type") + * ) + * + * }</pre> + * + * <p>The connection configuration also accepts optional configuration: {@code withUsername()} and + * {@code withPassword()}. + * + * <p>You can also specify a query on the {@code read()} using {@code withQuery()}. + * + * <h3>Writing to Elasticsearch</h3> + * + * <p>To write documents to Elasticsearch, use + * {@link ElasticsearchIO#write ElasticsearchIO.write()}, which writes JSON documents from a + * {@link PCollection PCollection<String>} (which can be bounded or unbounded). + * + * <p>To configure {@link ElasticsearchIO#write ElasticsearchIO.write()}, similar to the read, you + * have to provide a connection configuration. For instance: + * + * <pre>{@code + * + * pipeline + * .apply(...) 
+ * .apply(ElasticsearchIO.write().withConnectionConfiguration( + * ElasticsearchIO.ConnectionConfiguration.create("http://host:9200", "my-index", "my-type") + * ) + * + * }</pre> + * + * <p>Optionally, you can provide {@code withBatchSize()} and {@code withBatchSizeBytes()} + * to specify the size of the write batch in number of documents or in bytes. + */ +public class ElasticsearchIO { + + public static Read read() { + // default scrollKeepalive = 5m as a majorant for un-predictable time between 2 start/read calls + // default batchSize to 100 as recommended by ES dev team as a safe value when dealing + // with big documents and still a good compromise for performances + return new AutoValue_ElasticsearchIO_Read.Builder() + .setScrollKeepalive("5m") + .setBatchSize(100L) + .build(); + } + + public static Write write() { + return new AutoValue_ElasticsearchIO_Write.Builder() + // advised default starting batch size in ES docs + .setMaxBatchSize(1000L) + // advised default starting batch size in ES docs + .setMaxBatchSizeBytes(5L * 1024L * 1024L) + .build(); + } + + private ElasticsearchIO() {} + + private static JsonObject parseResponse(Response response) throws IOException { + InputStream content = response.getEntity().getContent(); + InputStreamReader inputStreamReader = new InputStreamReader(content, "UTF-8"); + JsonObject jsonObject = new Gson().fromJson(inputStreamReader, JsonObject.class); + return jsonObject; + } + + /** A POJO describing a connection configuration to Elasticsearch. */ + @AutoValue + public abstract static class ConnectionConfiguration implements Serializable { + + abstract List<String> getAddresses(); + + @Nullable + abstract String getUsername(); + + @Nullable + abstract String getPassword(); + + abstract String getIndex(); + + abstract String getType(); + + abstract Builder builder(); + + @AutoValue.Builder + abstract static class Builder { + abstract Builder setAddresses(List<String> addresses); + + abstract Builder setUsername(String username); + + abstract Builder setPassword(String password); + + abstract Builder setIndex(String index); + + abstract Builder setType(String type); + + abstract ConnectionConfiguration build(); + } + + /** + * Creates a new Elasticsearch connection configuration. + * + * @param addresses list of addresses of Elasticsearch nodes + * @param index the index toward which the requests will be issued + * @param type the document type toward which the requests will be issued + * @return the connection configuration object + */ + public static ConnectionConfiguration create(String[] addresses, String index, String type) { + checkArgument( + addresses != null, + "ConnectionConfiguration.create(addresses, index, type) called with null address"); + checkArgument( + addresses.length != 0, + "ConnectionConfiguration.create(addresses, " + + "index, type) called with empty addresses"); + checkArgument( + index != null, + "ConnectionConfiguration.create(addresses, index, type) called with null index"); + checkArgument( + type != null, + "ConnectionConfiguration.create(addresses, index, type) called with null type"); + return new AutoValue_ElasticsearchIO_ConnectionConfiguration.Builder() + .setAddresses(Arrays.asList(addresses)) + .setIndex(index) + .setType(type) + .build(); + } + + /** + * If Elasticsearch authentication is enabled, provide the username. 
+ * + * @param username the username used to authenticate to Elasticsearch + * @return the {@link ConnectionConfiguration} object with username set + */ + public ConnectionConfiguration withUsername(String username) { + checkArgument( + username != null, + "ConnectionConfiguration.create().withUsername(username) called with null username"); + checkArgument( + !username.isEmpty(), + "ConnectionConfiguration.create().withUsername(username) called with empty username"); + return builder().setUsername(username).build(); + } + + /** + * If Elasticsearch authentication is enabled, provide the password. + * + * @param password the password used to authenticate to Elasticsearch + * @return the {@link ConnectionConfiguration} object with password set + */ + public ConnectionConfiguration withPassword(String password) { + checkArgument( + password != null, + "ConnectionConfiguration.create().withPassword(password) called with null password"); + checkArgument( + !password.isEmpty(), + "ConnectionConfiguration.create().withPassword(password) called with empty password"); + return builder().setPassword(password).build(); + } + + private void populateDisplayData(DisplayData.Builder builder) { + builder.add(DisplayData.item("address", getAddresses().toString())); + builder.add(DisplayData.item("index", getIndex())); + builder.add(DisplayData.item("type", getType())); + builder.addIfNotNull(DisplayData.item("username", getUsername())); + } + + private RestClient createClient() throws MalformedURLException { + HttpHost[] hosts = new HttpHost[getAddresses().size()]; + int i = 0; + for (String address : getAddresses()) { + URL url = new URL(address); + hosts[i] = new HttpHost(url.getHost(), url.getPort(), url.getProtocol()); + i++; + } + RestClientBuilder restClientBuilder = RestClient.builder(hosts); + if (getUsername() != null) { + final CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); + credentialsProvider.setCredentials( + AuthScope.ANY, new UsernamePasswordCredentials(getUsername(), getPassword())); + restClientBuilder.setHttpClientConfigCallback( + new RestClientBuilder.HttpClientConfigCallback() { + public HttpAsyncClientBuilder customizeHttpClient( + HttpAsyncClientBuilder httpAsyncClientBuilder) { + return httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider); + } + }); + } + return restClientBuilder.build(); + } + } + + /** A {@link PTransform} reading data from Elasticsearch. */ + @AutoValue + public abstract static class Read extends PTransform<PBegin, PCollection<String>> { + + private static final long MAX_BATCH_SIZE = 10000L; + + @Nullable + abstract ConnectionConfiguration getConnectionConfiguration(); + + @Nullable + abstract String getQuery(); + + abstract String getScrollKeepalive(); + + abstract long getBatchSize(); + + abstract Builder builder(); + + @AutoValue.Builder + abstract static class Builder { + abstract Builder setConnectionConfiguration(ConnectionConfiguration connectionConfiguration); + + abstract Builder setQuery(String query); + + abstract Builder setScrollKeepalive(String scrollKeepalive); + + abstract Builder setBatchSize(long batchSize); + + abstract Read build(); + } + + /** + * Provide the Elasticsearch connection configuration object. 
+ * + * @param connectionConfiguration the Elasticsearch {@link ConnectionConfiguration} object + * @return the {@link Read} with connection configuration set + */ + public Read withConnectionConfiguration(ConnectionConfiguration connectionConfiguration) { + checkArgument( + connectionConfiguration != null, + "ElasticsearchIO.read()" + + ".withConnectionConfiguration(configuration) called with null configuration"); + return builder().setConnectionConfiguration(connectionConfiguration).build(); + } + + /** + * Provide a query used while reading from Elasticsearch. + * + * @param query the query. See <a + * href="https://www.elastic.co/guide/en/elasticsearch/reference/2.4/query-dsl.html">Query + * DSL</a> + * @return the {@link Read} object with query set + */ + public Read withQuery(String query) { + checkArgument( + !Strings.isNullOrEmpty(query), + "ElasticsearchIO.read().withQuery(query) called" + " with null or empty query"); + return builder().setQuery(query).build(); + } + + /** + * Provide a scroll keepalive. See the <a + * href="https://www.elastic.co/guide/en/elasticsearch/reference/2.4/search-request-scroll.html">scroll + * API</a>. Default is "5m". Change this only if you get "No search context found" errors. + * + * @param scrollKeepalive keepalive duration, e.g. "5m" for 5 minutes + * @return the {@link Read} with scroll keepalive set + */ + public Read withScrollKeepalive(String scrollKeepalive) { + checkArgument( + scrollKeepalive != null && !scrollKeepalive.equals("0m"), + "ElasticsearchIO.read().withScrollKeepalive(keepalive) called" + + " with null or \"0m\" keepalive"); + return builder().setScrollKeepalive(scrollKeepalive).build(); + } + + /** + * Provide a size for the scroll read. See the <a + * href="https://www.elastic.co/guide/en/elasticsearch/reference/2.4/search-request-scroll.html"> + * scroll API</a>. Default is 100. Maximum is 10000. If documents are small, increasing batch + * size might improve read performance. If documents are big, you might need to decrease + * batchSize. + * + * @param batchSize number of documents read in each scroll read + * @return the {@link Read} with batch size set + */ + public Read withBatchSize(long batchSize) { + checkArgument( + batchSize > 0, + "ElasticsearchIO.read().withBatchSize(batchSize) called with a negative " + + "or equal to 0 value: %s", + batchSize); + checkArgument( + batchSize <= MAX_BATCH_SIZE, + "ElasticsearchIO.read().withBatchSize(batchSize) " + + "called with a too large value (over %s): %s", + MAX_BATCH_SIZE, + batchSize); + return builder().setBatchSize(batchSize).build(); + } + + @Override + public PCollection<String> expand(PBegin input) { + return input.apply( + org.apache.beam.sdk.io.Read.from(new BoundedElasticsearchSource(this, null))); + } + + @Override + public void validate(PBegin input) { + checkState( + getConnectionConfiguration() != null, + "ElasticsearchIO.read() requires a connection configuration" + + " to be set via withConnectionConfiguration(configuration)"); + } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + super.populateDisplayData(builder); + builder.addIfNotNull(DisplayData.item("query", getQuery())); + getConnectionConfiguration().populateDisplayData(builder); + } + } + + /** A {@link BoundedSource} reading from Elasticsearch.
*/ + @VisibleForTesting + static class BoundedElasticsearchSource extends BoundedSource<String> { + + private final ElasticsearchIO.Read spec; + // shardPreference is the shard number where the source will read the documents + @Nullable private final String shardPreference; + + BoundedElasticsearchSource(Read spec, @Nullable String shardPreference) { + this.spec = spec; + this.shardPreference = shardPreference; + } + + @Override + public List<? extends BoundedSource<String>> splitIntoBundles( + long desiredBundleSizeBytes, PipelineOptions options) throws Exception { + List<BoundedElasticsearchSource> sources = new ArrayList<>(); + + // 1. We split per shard: + // unfortunately, Elasticsearch 2.x doesn't provide a way to do parallel reads on a single + // shard. So we do not use desiredBundleSize because we cannot split shards. + // With the slice API in ES 5.0 we will be able to use desiredBundleSize. + // Basically we will just ask the slice API to return data + // in nbBundles = estimatedSize / desiredBundleSize chunks. + // So each Beam source will read around desiredBundleSize volume of data. + + // 2. Primary and replica shards have the same shard_id, we filter primary + // to have one source for each shard_id. Even if we specify preference=shards:2, + // ES load balances (round robin) the request between primary shard 2 and replica shard 2. + // But, as each shard (replica or primary) is responsible for only one part of the data, + // there will be no duplicates. + + JsonObject statsJson = getStats(true); + JsonObject shardsJson = + statsJson + .getAsJsonObject("indices") + .getAsJsonObject(spec.getConnectionConfiguration().getIndex()) + .getAsJsonObject("shards"); + Set<Map.Entry<String, JsonElement>> shards = shardsJson.entrySet(); + for (Map.Entry<String, JsonElement> shardJson : shards) { + String shardId = shardJson.getKey(); + JsonArray value = (JsonArray) shardJson.getValue(); + boolean isPrimaryShard = + value + .get(0) + .getAsJsonObject() + .getAsJsonObject("routing") + .getAsJsonPrimitive("primary") + .getAsBoolean(); + if (isPrimaryShard) { + sources.add(new BoundedElasticsearchSource(spec, shardId)); + } + } + checkArgument(!sources.isEmpty(), "No primary shard found"); + return sources; + } + + @Override + public long getEstimatedSizeBytes(PipelineOptions options) throws IOException { + // we use the indices stats API to estimate size and list the shards + // (https://www.elastic.co/guide/en/elasticsearch/reference/2.4/indices-stats.html) + // as Elasticsearch 2.x doesn't support any way to do parallel reads inside a shard, + // the estimated size in bytes is not really used in the split into bundles. + // However, we implement this method anyway as the runners can use it. + // NB: Elasticsearch 5.x now provides the slice API.
+ // (https://www.elastic.co/guide/en/elasticsearch/reference/5.0/search-request-scroll.html + // #sliced-scroll) + JsonObject statsJson = getStats(false); + JsonObject indexStats = + statsJson + .getAsJsonObject("indices") + .getAsJsonObject(spec.getConnectionConfiguration().getIndex()) + .getAsJsonObject("primaries"); + JsonObject store = indexStats.getAsJsonObject("store"); + return store.getAsJsonPrimitive("size_in_bytes").getAsLong(); + } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + spec.populateDisplayData(builder); + builder.addIfNotNull(DisplayData.item("shard", shardPreference)); + } + + @Override + public BoundedReader<String> createReader(PipelineOptions options) throws IOException { + return new BoundedElasticsearchReader(this); + } + + @Override + public void validate() { + spec.validate(null); + } + + @Override + public Coder<String> getDefaultOutputCoder() { + return StringUtf8Coder.of(); + } + + private JsonObject getStats(boolean shardLevel) throws IOException { + HashMap<String, String> params = new HashMap<>(); + if (shardLevel) { + params.put("level", "shards"); + } + String endpoint = String.format("/%s/_stats", spec.getConnectionConfiguration().getIndex()); + try (RestClient restClient = spec.getConnectionConfiguration().createClient()) { + return parseResponse( + restClient.performRequest("GET", endpoint, params, new BasicHeader("", ""))); + } + } + } + + private static class BoundedElasticsearchReader extends BoundedSource.BoundedReader<String> { + + private final BoundedElasticsearchSource source; + + private RestClient restClient; + private String current; + private String scrollId; + private ListIterator<String> batchIterator; + + private BoundedElasticsearchReader(BoundedElasticsearchSource source) { + this.source = source; + } + + @Override + public boolean start() throws IOException { + restClient = source.spec.getConnectionConfiguration().createClient(); + + String query = source.spec.getQuery(); + if (query == null) { + query = "{ \"query\": { \"match_all\": {} } }"; + } + + Response response; + String endPoint = + String.format( + "/%s/%s/_search", + source.spec.getConnectionConfiguration().getIndex(), + source.spec.getConnectionConfiguration().getType()); + Map<String, String> params = new HashMap<>(); + params.put("scroll", source.spec.getScrollKeepalive()); + params.put("size", String.valueOf(source.spec.getBatchSize())); + if (source.shardPreference != null) { + params.put("preference", "_shards:" + source.shardPreference); + } + HttpEntity queryEntity = new NStringEntity(query, ContentType.APPLICATION_JSON); + response = + restClient.performRequest("GET", endPoint, params, queryEntity, new BasicHeader("", "")); + JsonObject searchResult = parseResponse(response); + updateScrollId(searchResult); + return readNextBatchAndReturnFirstDocument(searchResult); + } + + private void updateScrollId(JsonObject searchResult) { + scrollId = searchResult.getAsJsonPrimitive("_scroll_id").getAsString(); + } + + @Override + public boolean advance() throws IOException { + if (batchIterator.hasNext()) { + current = batchIterator.next(); + return true; + } else { + String requestBody = + String.format( + "{\"scroll\" : \"%s\",\"scroll_id\" : \"%s\"}", + source.spec.getScrollKeepalive(), scrollId); + HttpEntity scrollEntity = new NStringEntity(requestBody, ContentType.APPLICATION_JSON); + Response response = + restClient.performRequest( + "GET", + "/_search/scroll", + Collections.<String, String>emptyMap(), + scrollEntity, + new 
BasicHeader("", "")); + JsonObject searchResult = parseResponse(response); + updateScrollId(searchResult); + return readNextBatchAndReturnFirstDocument(searchResult); + } + } + + private boolean readNextBatchAndReturnFirstDocument(JsonObject searchResult) { + //stop if no more data + JsonArray hits = searchResult.getAsJsonObject("hits").getAsJsonArray("hits"); + if (hits.size() == 0) { + current = null; + batchIterator = null; + return false; + } + // list behind iterator is empty + List<String> batch = new ArrayList<>(); + for (JsonElement hit : hits) { + String document = hit.getAsJsonObject().getAsJsonObject("_source").toString(); + batch.add(document); + } + batchIterator = batch.listIterator(); + current = batchIterator.next(); + return true; + } + + @Override + public String getCurrent() throws NoSuchElementException { + if (current == null) { + throw new NoSuchElementException(); + } + return current; + } + + @Override + public void close() throws IOException { + // remove the scroll + String requestBody = String.format("{\"scroll_id\" : [\"%s\"]}", scrollId); + HttpEntity entity = new NStringEntity(requestBody, ContentType.APPLICATION_JSON); + try { + restClient.performRequest( + "DELETE", + "/_search/scroll", + Collections.<String, String>emptyMap(), + entity, + new BasicHeader("", "")); + } finally { + if (restClient != null) { + restClient.close(); + } + } + } + + @Override + public BoundedSource<String> getCurrentSource() { + return source; + } + } + + /** A {@link PTransform} writing data to Elasticsearch. */ + @AutoValue + public abstract static class Write extends PTransform<PCollection<String>, PDone> { + + @Nullable + abstract ConnectionConfiguration getConnectionConfiguration(); + + abstract long getMaxBatchSize(); + + abstract long getMaxBatchSizeBytes(); + + abstract Builder builder(); + + @AutoValue.Builder + abstract static class Builder { + abstract Builder setConnectionConfiguration(ConnectionConfiguration connectionConfiguration); + + abstract Builder setMaxBatchSize(long maxBatchSize); + + abstract Builder setMaxBatchSizeBytes(long maxBatchSizeBytes); + + abstract Write build(); + } + + /** + * Provide the Elasticsearch connection configuration object. + * + * @param connectionConfiguration the Elasticsearch {@link ConnectionConfiguration} object + * @return the {@link Write} with connection configuration set + */ + public Write withConnectionConfiguration(ConnectionConfiguration connectionConfiguration) { + checkArgument( + connectionConfiguration != null, + "ElasticsearchIO.write()" + + ".withConnectionConfiguration(configuration) called with null configuration"); + return builder().setConnectionConfiguration(connectionConfiguration).build(); + } + + /** + * Provide a maximum size in number of documents for the batch see bulk API + * (https://www.elastic.co/guide/en/elasticsearch/reference/2.4/docs-bulk.html). Default is 1000 + * docs (like Elasticsearch bulk size advice). See + * https://www.elastic.co/guide/en/elasticsearch/guide/current/bulk.html Depending on the + * execution engine, size of bundles may vary, this sets the maximum size. Change this if you + * need to have smaller ElasticSearch bulks. 
+ * + * @param batchSize maximum batch size in number of documents + * @return the {@link Write} with connection batch size set + */ + public Write withMaxBatchSize(long batchSize) { + checkArgument( + batchSize > 0, + "ElasticsearchIO.write()" + + ".withMaxBatchSize(batchSize) called with incorrect <= 0 value"); + return builder().setMaxBatchSize(batchSize).build(); + } + + /** + * Provide a maximum size in bytes for the batch see bulk API + * (https://www.elastic.co/guide/en/elasticsearch/reference/2.4/docs-bulk.html). Default is 5MB + * (like Elasticsearch bulk size advice). See + * https://www.elastic.co/guide/en/elasticsearch/guide/current/bulk.html Depending on the + * execution engine, size of bundles may vary, this sets the maximum size. Change this if you + * need to have smaller ElasticSearch bulks. + * + * @param batchSizeBytes maximum batch size in bytes + * @return the {@link Write} with connection batch size in bytes set + */ + public Write withMaxBatchSizeBytes(long batchSizeBytes) { + checkArgument( + batchSizeBytes > 0, + "ElasticsearchIO.write()" + + ".withMaxBatchSizeBytes(batchSizeBytes) called with incorrect <= 0 value"); + return builder().setMaxBatchSizeBytes(batchSizeBytes).build(); + } + + @Override + public void validate(PCollection<String> input) { + checkState( + getConnectionConfiguration() != null, + "ElasticsearchIO.write() requires a connection configuration" + + " to be set via withConnectionConfiguration(configuration)"); + } + + @Override + public PDone expand(PCollection<String> input) { + input.apply(ParDo.of(new WriteFn(this))); + return PDone.in(input.getPipeline()); + } + + @VisibleForTesting + static class WriteFn extends DoFn<String, Void> { + + private final Write spec; + + private transient RestClient restClient; + private ArrayList<String> batch; + private long currentBatchSizeBytes; + + WriteFn(Write spec) { + this.spec = spec; + } + + @Setup + public void createClient() throws Exception { + restClient = spec.getConnectionConfiguration().createClient(); + } + + @StartBundle + public void startBundle(Context context) throws Exception { + batch = new ArrayList<>(); + currentBatchSizeBytes = 0; + } + + @ProcessElement + public void processElement(ProcessContext context) throws Exception { + String document = context.element(); + batch.add(String.format("{ \"index\" : {} }%n%s%n", document)); + currentBatchSizeBytes += document.getBytes().length; + if (batch.size() >= spec.getMaxBatchSize() + || currentBatchSizeBytes >= spec.getMaxBatchSizeBytes()) { + finishBundle(context); + } + } + + @FinishBundle + public void finishBundle(Context context) throws Exception { + if (batch.isEmpty()) { + return; + } + StringBuilder bulkRequest = new StringBuilder(); + for (String json : batch) { + bulkRequest.append(json); + } + batch.clear(); + currentBatchSizeBytes = 0; + Response response; + String endPoint = + String.format( + "/%s/%s/_bulk", + spec.getConnectionConfiguration().getIndex(), + spec.getConnectionConfiguration().getType()); + HttpEntity requestBody = + new NStringEntity(bulkRequest.toString(), ContentType.APPLICATION_JSON); + response = + restClient.performRequest( + "POST", + endPoint, + Collections.<String, String>emptyMap(), + requestBody, + new BasicHeader("", "")); + JsonObject searchResult = parseResponse(response); + boolean errors = searchResult.getAsJsonPrimitive("errors").getAsBoolean(); + if (errors) { + StringBuilder errorMessages = + new StringBuilder( + "Error writing to Elasticsearch, some elements could not be inserted:"); + 
JsonArray items = searchResult.getAsJsonArray("items"); + //some items present in bulk might have errors, concatenate error messages + for (JsonElement item : items) { + JsonObject creationObject = item.getAsJsonObject().getAsJsonObject("create"); + JsonObject error = creationObject.getAsJsonObject("error"); + if (error != null) { + String type = error.getAsJsonPrimitive("type").getAsString(); + String reason = error.getAsJsonPrimitive("reason").getAsString(); + String docId = creationObject.getAsJsonPrimitive("_id").getAsString(); + errorMessages.append(String.format("%nDocument id %s: %s (%s)", docId, reason, type)); + JsonObject causedBy = error.getAsJsonObject("caused_by"); + if (causedBy != null) { + String cbReason = causedBy.getAsJsonPrimitive("reason").getAsString(); + String cbType = causedBy.getAsJsonPrimitive("type").getAsString(); + errorMessages.append(String.format("%nCaused by: %s (%s)", cbReason, cbType)); + } + } + } + throw new IOException(errorMessages.toString()); + } + } + + @Teardown + public void closeClient() throws Exception { + if (restClient != null) { + restClient.close(); + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/6412389a/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/package-info.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/package-info.java b/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/package-info.java new file mode 100644 index 0000000..396705b --- /dev/null +++ b/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/package-info.java @@ -0,0 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** Transforms for reading and writing from Elasticsearch. */ +package org.apache.beam.sdk.io.elasticsearch; http://git-wip-us.apache.org/repos/asf/beam/blob/6412389a/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticSearchIOTestUtils.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticSearchIOTestUtils.java b/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticSearchIOTestUtils.java new file mode 100644 index 0000000..9a121f8 --- /dev/null +++ b/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticSearchIOTestUtils.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.elasticsearch; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest; +import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse; +import org.elasticsearch.action.admin.indices.upgrade.post.UpgradeRequest; +import org.elasticsearch.action.bulk.BulkRequestBuilder; +import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.client.Client; +import org.elasticsearch.client.IndicesAdminClient; +import org.elasticsearch.client.Requests; +import org.elasticsearch.index.IndexNotFoundException; + +/** Test utilities to use with {@link ElasticsearchIO}. */ +class ElasticSearchIOTestUtils { + + /** Enumeration that specifies whether to insert malformed documents. */ + enum InjectionMode { + INJECT_SOME_INVALID_DOCS, + DO_NOT_INJECT_INVALID_DOCS; + } + + /** Deletes the given index synchronously. */ + static void deleteIndex(String index, Client client) throws Exception { + IndicesAdminClient indices = client.admin().indices(); + IndicesExistsResponse indicesExistsResponse = + indices.exists(new IndicesExistsRequest(index)).get(); + if (indicesExistsResponse.isExists()) { + indices.prepareClose(index).get(); + indices.delete(Requests.deleteIndexRequest(index)).get(); + } + } + + /** Inserts the given number of test documents into Elasticsearch. */ + static void insertTestDocuments(String index, String type, long numDocs, Client client) + throws Exception { + final BulkRequestBuilder bulkRequestBuilder = client.prepareBulk().setRefresh(true); + List<String> data = + ElasticSearchIOTestUtils.createDocuments( + numDocs, ElasticSearchIOTestUtils.InjectionMode.DO_NOT_INJECT_INVALID_DOCS); + for (String document : data) { + bulkRequestBuilder.add(client.prepareIndex(index, type, null).setSource(document)); + } + final BulkResponse bulkResponse = bulkRequestBuilder.execute().actionGet(); + if (bulkResponse.hasFailures()) { + throw new IOException( + String.format( + "Cannot insert test documents in index %s : %s", + index, bulkResponse.buildFailureMessage())); + } + } + + /** + * Forces an upgrade of the given index to make recently inserted documents available for search. 
+ * + * @return The number of docs in the index + */ + static long upgradeIndexAndGetCurrentNumDocs(String index, String type, Client client) { + try { + client.admin().indices().upgrade(new UpgradeRequest(index)).actionGet(); + SearchResponse response = + client.prepareSearch(index).setTypes(type).execute().actionGet(5000); + return response.getHits().getTotalHits(); + // it is fine to ignore bellow exceptions because in testWriteWithBatchSize* sometimes, + // we call upgrade before any doc have been written + // (when there are fewer docs processed than batchSize). + // In that cases index/type has not been created (created upon first doc insertion) + } catch (IndexNotFoundException e) { + } catch (java.lang.IllegalArgumentException e) { + if (!e.getMessage().contains("No search type")) { + throw e; + } + } + return 0; + } + + /** + * Generates a list of test documents for insertion. + * + * @param numDocs Number of docs to generate + * @param injectionMode {@link InjectionMode} that specifies whether to insert malformed documents + * @return the list of json String representing the documents + */ + static List<String> createDocuments(long numDocs, InjectionMode injectionMode) { + String[] scientists = { + "Einstein", + "Darwin", + "Copernicus", + "Pasteur", + "Curie", + "Faraday", + "Newton", + "Bohr", + "Galilei", + "Maxwell" + }; + ArrayList<String> data = new ArrayList<>(); + for (int i = 0; i < numDocs; i++) { + int index = i % scientists.length; + // insert 2 malformed documents + if (InjectionMode.INJECT_SOME_INVALID_DOCS.equals(injectionMode) && (i == 6 || i == 7)) { + data.add(String.format("{\"scientist\";\"%s\", \"id\":%d}", scientists[index], i)); + } else { + data.add(String.format("{\"scientist\":\"%s\", \"id\":%d}", scientists[index], i)); + } + } + return data; + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/6412389a/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java b/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java new file mode 100644 index 0000000..8b4cb13 --- /dev/null +++ b/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java @@ -0,0 +1,358 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.elasticsearch; + +import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.BoundedElasticsearchSource; +import static org.apache.beam.sdk.testing.SourceTestUtils.readFromSource; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.core.Is.isA; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; + +import java.io.IOException; +import java.io.Serializable; +import java.net.ServerSocket; +import java.util.List; +import org.apache.beam.sdk.io.BoundedSource; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.RunnableOnService; +import org.apache.beam.sdk.testing.SourceTestUtils; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Count; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.DoFnTester; +import org.apache.beam.sdk.values.PCollection; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.query.QueryBuilder; +import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.node.Node; +import org.elasticsearch.node.NodeBuilder; +import org.hamcrest.CustomMatcher; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.ExpectedException; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Tests for {@link ElasticsearchIO}. 
*/ +@RunWith(JUnit4.class) +public class ElasticsearchIOTest implements Serializable { + + private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchIOTest.class); + + private static final String ES_INDEX = "beam"; + private static final String ES_TYPE = "test"; + private static final String ES_IP = "127.0.0.1"; + private static final long NUM_DOCS = 400L; + private static final int NUM_SCIENTISTS = 10; + private static final long BATCH_SIZE = 200L; + private static final long AVERAGE_DOC_SIZE = 25L; + private static final long BATCH_SIZE_BYTES = 2048L; + + private static Node node; + private static ElasticsearchIO.ConnectionConfiguration connectionConfiguration; + + @ClassRule public static TemporaryFolder folder = new TemporaryFolder(); + @Rule + public TestPipeline pipeline = TestPipeline.create(); + + @BeforeClass + public static void beforeClass() throws IOException { + ServerSocket serverSocket = new ServerSocket(0); + int esHttpPort = serverSocket.getLocalPort(); + serverSocket.close(); + connectionConfiguration = + ElasticsearchIO.ConnectionConfiguration.create( + new String[] {"http://" + ES_IP + ":" + esHttpPort}, ES_INDEX, ES_TYPE); + LOGGER.info("Starting embedded Elasticsearch instance ({})", esHttpPort); + Settings.Builder settingsBuilder = + Settings.settingsBuilder() + .put("cluster.name", "beam") + .put("http.enabled", "true") + .put("node.data", "true") + .put("path.data", folder.getRoot().getPath()) + .put("path.home", folder.getRoot().getPath()) + .put("node.name", "beam") + .put("network.host", ES_IP) + .put("http.port", esHttpPort) + .put("index.store.stats_refresh_interval", 0) + // had problems with some jdk, embedded ES was too slow for bulk insertion, + // and queue of 50 was full. No pb with real ES instance (cf testWrite integration test) + .put("threadpool.bulk.queue_size", 100); + node = NodeBuilder.nodeBuilder().settings(settingsBuilder).build(); + LOGGER.info("Elasticsearch node created"); + node.start(); + } + + @AfterClass + public static void afterClass() { + node.close(); + } + + @Before + public void before() throws Exception { + ElasticSearchIOTestUtils.deleteIndex(ES_INDEX, node.client()); + } + + @Test + public void testSizes() throws Exception { + ElasticSearchIOTestUtils.insertTestDocuments(ES_INDEX, ES_TYPE, NUM_DOCS, node.client()); + PipelineOptions options = PipelineOptionsFactory.create(); + ElasticsearchIO.Read read = + ElasticsearchIO.read().withConnectionConfiguration(connectionConfiguration); + BoundedElasticsearchSource initialSource = new BoundedElasticsearchSource(read, null); + // can't use equal assert as Elasticsearch indexes never have same size + // (due to internal Elasticsearch implementation) + long estimatedSize = initialSource.getEstimatedSizeBytes(options); + LOGGER.info("Estimated size: {}", estimatedSize); + assertThat("Wrong estimated size", estimatedSize, greaterThan(AVERAGE_DOC_SIZE * NUM_DOCS)); + } + + @Test + @Category(RunnableOnService.class) + public void testRead() throws Exception { + ElasticSearchIOTestUtils.insertTestDocuments(ES_INDEX, ES_TYPE, NUM_DOCS, node.client()); + + PCollection<String> output = + pipeline.apply( + ElasticsearchIO.read() + .withConnectionConfiguration(connectionConfiguration) + //set to default value, useful just to test parameter passing. + .withScrollKeepalive("5m") + //set to default value, useful just to test parameter passing. 
+ .withBatchSize(100L)); + PAssert.thatSingleton(output.apply("Count", Count.<String>globally())).isEqualTo(NUM_DOCS); + pipeline.run(); + } + + @Test + @Category(RunnableOnService.class) + public void testReadWithQuery() throws Exception { + ElasticSearchIOTestUtils.insertTestDocuments(ES_INDEX, ES_TYPE, NUM_DOCS, node.client()); + + String query = + "{\n" + + " \"query\": {\n" + + " \"match\" : {\n" + + " \"scientist\" : {\n" + + " \"query\" : \"Einstein\",\n" + + " \"type\" : \"boolean\"\n" + + " }\n" + + " }\n" + + " }\n" + + "}"; + + PCollection<String> output = + pipeline.apply( + ElasticsearchIO.read() + .withConnectionConfiguration(connectionConfiguration) + .withQuery(query)); + PAssert.thatSingleton(output.apply("Count", Count.<String>globally())) + .isEqualTo(NUM_DOCS / NUM_SCIENTISTS); + pipeline.run(); + } + + @Test + @Category(RunnableOnService.class) + public void testWrite() throws Exception { + List<String> data = + ElasticSearchIOTestUtils.createDocuments( + NUM_DOCS, ElasticSearchIOTestUtils.InjectionMode.DO_NOT_INJECT_INVALID_DOCS); + pipeline + .apply(Create.of(data)) + .apply(ElasticsearchIO.write().withConnectionConfiguration(connectionConfiguration)); + pipeline.run(); + + long currentNumDocs = + ElasticSearchIOTestUtils.upgradeIndexAndGetCurrentNumDocs(ES_INDEX, ES_TYPE, node.client()); + assertEquals(NUM_DOCS, currentNumDocs); + + QueryBuilder queryBuilder = QueryBuilders.queryStringQuery("Einstein").field("scientist"); + SearchResponse searchResponse = + node.client() + .prepareSearch(ES_INDEX) + .setTypes(ES_TYPE) + .setQuery(queryBuilder) + .execute() + .actionGet(); + assertEquals(NUM_DOCS / NUM_SCIENTISTS, searchResponse.getHits().getTotalHits()); + } + + @Rule public ExpectedException exception = ExpectedException.none(); + + @Test + public void testWriteWithErrors() throws Exception { + ElasticsearchIO.Write write = + ElasticsearchIO.write() + .withConnectionConfiguration(connectionConfiguration) + .withMaxBatchSize(BATCH_SIZE); + // write bundles size is the runner decision, we cannot force a bundle size, + // so we test the Writer as a DoFn outside of a runner. + DoFnTester<String, Void> fnTester = DoFnTester.of(new ElasticsearchIO.Write.WriteFn(write)); + + List<String> input = + ElasticSearchIOTestUtils.createDocuments( + NUM_DOCS, ElasticSearchIOTestUtils.InjectionMode.INJECT_SOME_INVALID_DOCS); + exception.expect(isA(IOException.class)); + exception.expectMessage( + new CustomMatcher<String>("RegExp matcher") { + @Override + public boolean matches(Object o) { + String message = (String) o; + // This regexp tests that 2 malformed documents are actually in error + // and that the message contains their IDs. + // It also ensures that root reason, root error type, + // caused by reason and caused by error type are present in message. 
+ // To avoid flakiness of the test in case of Elasticsearch error message change, + // only "failed to parse" root reason is matched, + // the other messages are matched using .+ + return message.matches( + "(?is).*Error writing to Elasticsearch, some elements could not be inserted" + + ".*Document id .+: failed to parse \\(.+\\).*Caused by: .+ \\(.+\\).*" + + "Document id .+: failed to parse \\(.+\\).*Caused by: .+ \\(.+\\).*"); + } + }); + // inserts into Elasticsearch + fnTester.processBundle(input); + } + + @Test + public void testWriteWithMaxBatchSize() throws Exception { + ElasticsearchIO.Write write = + ElasticsearchIO.write() + .withConnectionConfiguration(connectionConfiguration) + .withMaxBatchSize(BATCH_SIZE); + // write bundles size is the runner decision, we cannot force a bundle size, + // so we test the Writer as a DoFn outside of a runner. + DoFnTester<String, Void> fnTester = DoFnTester.of(new ElasticsearchIO.Write.WriteFn(write)); + List<String> input = + ElasticSearchIOTestUtils.createDocuments( + NUM_DOCS, ElasticSearchIOTestUtils.InjectionMode.DO_NOT_INJECT_INVALID_DOCS); + long numDocsProcessed = 0; + long numDocsInserted = 0; + for (String document : input) { + fnTester.processElement(document); + numDocsProcessed++; + // test every 100 docs to avoid overloading ES + if ((numDocsProcessed % 100) == 0) { + // force the index to upgrade after inserting for the inserted docs + // to be searchable immediately + long currentNumDocs = + ElasticSearchIOTestUtils.upgradeIndexAndGetCurrentNumDocs( + ES_INDEX, ES_TYPE, node.client()); + if ((numDocsProcessed % BATCH_SIZE) == 0) { + /* bundle end */ + assertEquals( + "we are at the end of a bundle, we should have inserted all processed documents", + numDocsProcessed, + currentNumDocs); + numDocsInserted = currentNumDocs; + } else { + /* not bundle end */ + assertEquals( + "we are not at the end of a bundle, we should have inserted no more documents", + numDocsInserted, + currentNumDocs); + } + } + } + } + + @Test + public void testWriteWithMaxBatchSizeBytes() throws Exception { + ElasticsearchIO.Write write = + ElasticsearchIO.write() + .withConnectionConfiguration(connectionConfiguration) + .withMaxBatchSizeBytes(BATCH_SIZE_BYTES); + // write bundles size is the runner decision, we cannot force a bundle size, + // so we test the Writer as a DoFn outside of a runner. 
+ DoFnTester<String, Void> fnTester = DoFnTester.of(new ElasticsearchIO.Write.WriteFn(write)); + List<String> input = + ElasticSearchIOTestUtils.createDocuments( + NUM_DOCS, ElasticSearchIOTestUtils.InjectionMode.DO_NOT_INJECT_INVALID_DOCS); + long numDocsProcessed = 0; + long sizeProcessed = 0; + long numDocsInserted = 0; + long batchInserted = 0; + for (String document : input) { + fnTester.processElement(document); + numDocsProcessed++; + sizeProcessed += document.getBytes().length; + // test every 40 docs to avoid overloading ES + if ((numDocsProcessed % 40) == 0) { + // force the index to upgrade after inserting for the inserted docs + // to be searchable immediately + long currentNumDocs = + ElasticSearchIOTestUtils.upgradeIndexAndGetCurrentNumDocs( + ES_INDEX, ES_TYPE, node.client()); + if (sizeProcessed / BATCH_SIZE_BYTES > batchInserted) { + /* bundle end */ + assertThat( + "we have passed a bundle size, we should have inserted some documents", + currentNumDocs, + greaterThan(numDocsInserted)); + numDocsInserted = currentNumDocs; + batchInserted = (sizeProcessed / BATCH_SIZE_BYTES); + } else { + /* not bundle end */ + assertEquals( + "we are not at the end of a bundle, we should have inserted no more documents", + numDocsInserted, + currentNumDocs); + } + } + } + } + + @Test + public void testSplitIntoBundles() throws Exception { + ElasticSearchIOTestUtils.insertTestDocuments(ES_INDEX, ES_TYPE, NUM_DOCS, node.client()); + PipelineOptions options = PipelineOptionsFactory.create(); + ElasticsearchIO.Read read = + ElasticsearchIO.read().withConnectionConfiguration(connectionConfiguration); + BoundedElasticsearchSource initialSource = new BoundedElasticsearchSource(read, null); + //desiredBundleSize is ignored because in ES 2.x there is no way to split shards. So we get + // as many bundles as ES shards and bundle size is shard size + int desiredBundleSizeBytes = 0; + List<? extends BoundedSource<String>> splits = + initialSource.splitIntoBundles(desiredBundleSizeBytes, options); + SourceTestUtils.assertSourcesEqualReferenceSource(initialSource, splits, options); + //this is the number of ES shards + // (By default, each index in Elasticsearch is allocated 5 primary shards) + int expectedNumSplits = 5; + assertEquals(expectedNumSplits, splits.size()); + int nonEmptySplits = 0; + for (BoundedSource<String> subSource : splits) { + if (readFromSource(subSource, options).size() > 0) { + nonEmptySplits += 1; + } + } + assertEquals("Wrong number of empty splits", expectedNumSplits, nonEmptySplits); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/6412389a/sdks/java/io/pom.xml ---------------------------------------------------------------------- diff --git a/sdks/java/io/pom.xml b/sdks/java/io/pom.xml index 75c4f65..ffe3c02 100644 --- a/sdks/java/io/pom.xml +++ b/sdks/java/io/pom.xml @@ -40,6 +40,7 @@ <module>kinesis</module> <module>mongodb</module> <module>jdbc</module> + <module>elasticsearch</module> <module>mqtt</module> </modules>
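For reference, below is a minimal end-to-end usage sketch of the new IO. It is assembled from the javadoc and method signatures in this patch and is not part of the commit itself; the host, index and type names are placeholders, and note that ConnectionConfiguration.create() takes an array of node addresses.

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.values.PCollection;

public class ElasticsearchIOExample {
  public static void main(String[] args) {
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
    Pipeline pipeline = Pipeline.create(options);

    // Connection configurations; the host, index and type names below are placeholders.
    ElasticsearchIO.ConnectionConfiguration source =
        ElasticsearchIO.ConnectionConfiguration.create(
            new String[] {"http://host:9200"}, "my-index", "my-type");
    ElasticsearchIO.ConnectionConfiguration target =
        ElasticsearchIO.ConnectionConfiguration.create(
            new String[] {"http://host:9200"}, "my-copy-index", "my-type");

    // Read the whole source index as JSON documents, scrolling 100 documents at a time
    // (the defaults set in ElasticsearchIO.read()).
    PCollection<String> docs =
        pipeline.apply(
            ElasticsearchIO.read()
                .withConnectionConfiguration(source)
                .withScrollKeepalive("5m")
                .withBatchSize(100L));

    // Write the documents to the target index, bulking at most 1000 documents
    // or 5MB per request (the defaults set in ElasticsearchIO.write()).
    docs.apply(
        ElasticsearchIO.write()
            .withConnectionConfiguration(target)
            .withMaxBatchSize(1000L)
            .withMaxBatchSizeBytes(5L * 1024L * 1024L));

    pipeline.run();
  }
}

A read can additionally be restricted with withQuery(...) (an Elasticsearch query DSL JSON string), and withUsername()/withPassword() on the connection configuration enable basic authentication.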
