Repository: beam
Updated Branches:
  refs/heads/master c37e55080 -> 7954896a5


[BEAM-1184] Add integration tests to ElasticsearchIO


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d40f474c
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d40f474c
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d40f474c

Branch: refs/heads/master
Commit: d40f474c623535282a4122f2fa699b4e132326ff
Parents: c37e550
Author: Etienne Chauchot <echauc...@gmail.com>
Authored: Tue Mar 7 09:31:38 2017 +0100
Committer: Jean-Baptiste Onofré <jbono...@apache.org>
Committed: Thu Mar 9 06:48:52 2017 +0100

----------------------------------------------------------------------
 sdks/java/io/elasticsearch/pom.xml              |   5 +
 .../src/test/contrib/create_elk_container.sh    |  24 +++
 .../sdk/io/elasticsearch/ElasticsearchIOIT.java | 154 +++++++++++++++++++
 .../elasticsearch/ElasticsearchTestDataSet.java | 109 +++++++++++++
 .../elasticsearch/ElasticsearchTestOptions.java |  46 ++++++
 5 files changed, 338 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/d40f474c/sdks/java/io/elasticsearch/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/elasticsearch/pom.xml 
b/sdks/java/io/elasticsearch/pom.xml
index eecfe6b..d09b660 100644
--- a/sdks/java/io/elasticsearch/pom.xml
+++ b/sdks/java/io/elasticsearch/pom.xml
@@ -86,6 +86,11 @@
       <version>4.5.2</version>
     </dependency>
 
+    <dependency>
+      <groupId>joda-time</groupId>
+      <artifactId>joda-time</artifactId>
+    </dependency>
+
     <!-- compile dependencies -->
     <dependency>
       <groupId>com.google.auto.value</groupId>

http://git-wip-us.apache.org/repos/asf/beam/blob/d40f474c/sdks/java/io/elasticsearch/src/test/contrib/create_elk_container.sh
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/elasticsearch/src/test/contrib/create_elk_container.sh 
b/sdks/java/io/elasticsearch/src/test/contrib/create_elk_container.sh
new file mode 100755
index 0000000..48f6064
--- /dev/null
+++ b/sdks/java/io/elasticsearch/src/test/contrib/create_elk_container.sh
@@ -0,0 +1,24 @@
#!/bin/sh
################################################################################
#
#    Licensed to the Apache Software Foundation (ASF) under one or more
#    contributor license agreements.  See the NOTICE file distributed with
#    this work for additional information regarding copyright ownership.
#    The ASF licenses this file to You under the Apache License, Version 2.0
#    (the "License"); you may not use this file except in compliance with
#    the License.  You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS,
#    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#    See the License for the specific language governing permissions and
#    limitations under the License.
#
################################################################################

# Create an ELK (Elasticsearch Logstash Kibana) container for ES v2.4 and compatible Logstash and
# Kibana versions, bind them on host ports, allow shell access to the container (-it) and mount the
# current directory on /home/$USER inside the container.
#
# Note: "docker create" only creates the container; start it afterwards with
#   docker start elk-2.4
#
# The mount spec is quoted so that a working directory or user name containing spaces does not
# split the -v argument.

docker create -p 5601:5601 -p 9200:9200 -p 5044:5044 -p 5000:5000 -p 9300:9300 \
  -it -v "$(pwd):/home/$USER/" --name elk-2.4 sebp/elk:es240_l240_k460

http://git-wip-us.apache.org/repos/asf/beam/blob/d40f474c/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOIT.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOIT.java
 
b/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOIT.java
new file mode 100644
index 0000000..bd6c503
--- /dev/null
+++ 
b/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOIT.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.elasticsearch;
+
+import static org.apache.beam.sdk.testing.SourceTestUtils.readFromSource;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.junit.Assert.assertEquals;
+
+import java.util.List;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.SourceTestUtils;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Count;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.values.PCollection;
+import org.elasticsearch.client.transport.TransportClient;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A test of {@link ElasticsearchIO} on an independent Elasticsearch instance.
+ *
+ * <p>This test requires a running instance of Elasticsearch, and the test 
dataset must exist in the
+ * database.
+ *
+ * <p>You can run this test by doing the following from the beam parent module 
directory:
+ *
+ * <pre>
+ *  mvn -e -Pio-it verify -pl sdks/java/io/elasticsearch 
-DintegrationTestPipelineOptions='[
+ *  "--elasticsearchServer=1.2.3.4",
+ *  "--elasticsearchHttpPort=9200",
+ *  "--elasticsearchTcpPort=9300" ]'
+ * </pre>
+ */
+public class ElasticsearchIOIT {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(ElasticsearchIOIT.class);
+  private static TransportClient client;
+  private static ElasticsearchTestOptions options;
+  private static ElasticsearchIO.ConnectionConfiguration 
readConnectionConfiguration;
+  @Rule public TestPipeline pipeline = TestPipeline.create();
+
+  @BeforeClass
+  public static void beforeClass() throws Exception {
+    PipelineOptionsFactory.register(ElasticsearchTestOptions.class);
+    options = 
TestPipeline.testingPipelineOptions().as(ElasticsearchTestOptions.class);
+    client = ElasticsearchTestDataSet.getClient(options);
+    readConnectionConfiguration =
+        ElasticsearchTestDataSet.getConnectionConfiguration(
+            options, ElasticsearchTestDataSet.ReadOrWrite.READ);
+  }
+
+  @AfterClass
+  public static void afterClass() throws Exception {
+    ElasticsearchTestDataSet.deleteIndex(client, 
ElasticsearchTestDataSet.ReadOrWrite.WRITE);
+    client.close();
+  }
+
+  @Test
+  public void testSplitsVolume() throws Exception {
+    ElasticsearchIO.Read read =
+        
ElasticsearchIO.read().withConnectionConfiguration(readConnectionConfiguration);
+    ElasticsearchIO.BoundedElasticsearchSource initialSource =
+        new ElasticsearchIO.BoundedElasticsearchSource(read, null);
+    //desiredBundleSize is ignored because in ES 2.x there is no way to split 
shards. So we get
+    // as many bundles as ES shards and bundle size is shard size
+    long desiredBundleSizeBytes = 0;
+    List<? extends BoundedSource<String>> splits =
+        initialSource.splitIntoBundles(desiredBundleSizeBytes, options);
+    SourceTestUtils.assertSourcesEqualReferenceSource(initialSource, splits, 
options);
+    //this is the number of ES shards
+    // (By default, each index in Elasticsearch is allocated 5 primary shards)
+    long expectedNumSplits = 5;
+    assertEquals(expectedNumSplits, splits.size());
+    int nonEmptySplits = 0;
+    for (BoundedSource<String> subSource : splits) {
+      if (readFromSource(subSource, options).size() > 0) {
+        nonEmptySplits += 1;
+      }
+    }
+    assertEquals(expectedNumSplits, nonEmptySplits);
+  }
+
+  @Test
+  public void testReadVolume() throws Exception {
+    PCollection<String> output =
+        pipeline.apply(
+            
ElasticsearchIO.read().withConnectionConfiguration(readConnectionConfiguration));
+    PAssert.thatSingleton(output.apply("Count", Count.<String>globally()))
+        .isEqualTo(ElasticsearchTestDataSet.NUM_DOCS);
+    pipeline.run();
+  }
+
+  @Test
+  public void testWriteVolume() throws Exception {
+    ElasticsearchIO.ConnectionConfiguration writeConnectionConfiguration =
+        ElasticsearchTestDataSet.getConnectionConfiguration(
+            options, ElasticsearchTestDataSet.ReadOrWrite.WRITE);
+    List<String> data =
+        ElasticSearchIOTestUtils.createDocuments(
+            ElasticsearchTestDataSet.NUM_DOCS,
+            ElasticSearchIOTestUtils.InjectionMode.DO_NOT_INJECT_INVALID_DOCS);
+    pipeline
+        .apply(Create.of(data))
+        
.apply(ElasticsearchIO.write().withConnectionConfiguration(writeConnectionConfiguration));
+    pipeline.run();
+
+    long currentNumDocs =
+        ElasticSearchIOTestUtils.upgradeIndexAndGetCurrentNumDocs(
+            ElasticsearchTestDataSet.ES_INDEX, 
ElasticsearchTestDataSet.ES_TYPE, client);
+    assertEquals(ElasticsearchTestDataSet.NUM_DOCS, currentNumDocs);
+  }
+
+  @Test
+  public void testEstimatedSizesVolume() throws Exception {
+    ElasticsearchIO.Read read =
+        
ElasticsearchIO.read().withConnectionConfiguration(readConnectionConfiguration);
+    ElasticsearchIO.BoundedElasticsearchSource initialSource =
+        new ElasticsearchIO.BoundedElasticsearchSource(read, null);
+    // can't use equal assert as Elasticsearch indexes never have same size
+    // (due to internal Elasticsearch implementation)
+    long estimatedSize = initialSource.getEstimatedSizeBytes(options);
+    LOGGER.info("Estimated size: {}", estimatedSize);
+    assertThat(
+        "Wrong estimated size bellow minimum",
+        estimatedSize,
+        greaterThan(ElasticsearchTestDataSet.AVERAGE_DOC_SIZE * 
ElasticsearchTestDataSet.NUM_DOCS));
+    assertThat(
+        "Wrong estimated size beyond maximum",
+        estimatedSize,
+        greaterThan(ElasticsearchTestDataSet.MAX_DOC_SIZE * 
ElasticsearchTestDataSet.NUM_DOCS));
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/d40f474c/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchTestDataSet.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchTestDataSet.java
 
b/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchTestDataSet.java
new file mode 100644
index 0000000..6ce89f1
--- /dev/null
+++ 
b/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchTestDataSet.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.elasticsearch;
+
+import static java.net.InetAddress.getByName;
+
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.elasticsearch.client.transport.TransportClient;
+import org.elasticsearch.common.transport.InetSocketTransportAddress;
+
+/**
+ * Manipulates test data used by the {@link ElasticsearchIO}
+ * integration tests.
+ *
+ * <p>This is independent from the tests so that for read tests it can be run 
separately after data
+ * store creation rather than every time (which can be more fragile.)
+ */
+public class ElasticsearchTestDataSet {
+
+  public static final String ES_INDEX = "beam";
+  public static final String ES_TYPE = "test";
+  public static final long NUM_DOCS = 60000;
+  public static final int AVERAGE_DOC_SIZE = 25;
+  public static final int MAX_DOC_SIZE = 35;
+  private static String writeIndex = ES_INDEX + 
org.joda.time.Instant.now().getMillis();
+
+  /**
+   * Use this to create the index for reading before IT read tests.
+   *
+   * <p>To invoke this class, you can use this command line from elasticsearch 
io module directory:
+   *
+   * <pre>
+   * mvn test-compile exec:java \
+   * 
-Dexec.mainClass=org.apache.beam.sdk.io.elasticsearch.ElasticsearchTestDataSet \
+   *   -Dexec.args="--elasticsearchServer=1.2.3.4 \
+   *  --elasticsearchHttpPort=9200 \
+   *  --elasticsearchTcpPort=9300" \
+   *   -Dexec.classpathScope=test
+   *   </pre>
+   *
+   * @param args Please pass options from ElasticsearchTestOptions used for 
connection to
+   *     Elasticsearch as shown above.
+   */
+  public static void main(String[] args) throws Exception {
+    PipelineOptionsFactory.register(ElasticsearchTestOptions.class);
+    ElasticsearchTestOptions options =
+        
PipelineOptionsFactory.fromArgs(args).as(ElasticsearchTestOptions.class);
+
+    createAndPopulateIndex(getClient(options), ReadOrWrite.READ);
+  }
+
+  private static void createAndPopulateIndex(TransportClient client, 
ReadOrWrite rOw)
+      throws Exception {
+    // automatically creates the index and insert docs
+    ElasticSearchIOTestUtils.insertTestDocuments(
+        (rOw == ReadOrWrite.READ) ? ES_INDEX : writeIndex, ES_TYPE, NUM_DOCS, 
client);
+  }
+
+  public static TransportClient getClient(ElasticsearchTestOptions options) 
throws Exception {
+    TransportClient client =
+        TransportClient.builder()
+            .build()
+            .addTransportAddress(
+                new InetSocketTransportAddress(
+                    getByName(options.getElasticsearchServer()),
+                    Integer.valueOf(options.getElasticsearchTcpPort())));
+    return client;
+  }
+
+  public static ElasticsearchIO.ConnectionConfiguration 
getConnectionConfiguration(
+      ElasticsearchTestOptions options, ReadOrWrite rOw) {
+    ElasticsearchIO.ConnectionConfiguration connectionConfiguration =
+        ElasticsearchIO.ConnectionConfiguration.create(
+            new String[] {
+              "http://";
+                  + options.getElasticsearchServer()
+                  + ":"
+                  + options.getElasticsearchHttpPort()
+            },
+            (rOw == ReadOrWrite.READ) ? ES_INDEX : writeIndex,
+            ES_TYPE);
+    return connectionConfiguration;
+  };
+
+  public static void deleteIndex(TransportClient client, ReadOrWrite rOw) 
throws Exception {
+    ElasticSearchIOTestUtils.deleteIndex((rOw == ReadOrWrite.READ) ? ES_INDEX 
: writeIndex, client);
+  }
+
+  /** Enum that tells whether we use the index for reading or for writing. */
+  public enum ReadOrWrite {
+    READ,
+    WRITE
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/d40f474c/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchTestOptions.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchTestOptions.java
 
b/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchTestOptions.java
new file mode 100644
index 0000000..df7c797
--- /dev/null
+++ 
b/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchTestOptions.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.elasticsearch;
+
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.testing.TestPipelineOptions;
+
+/**
+ * These options can be used by a test connecting to an Elasticsearch instance 
to configure their
+ * connection.
+ */
+
+public interface ElasticsearchTestOptions extends TestPipelineOptions {
+    @Description("Server name for Elasticsearch server (host name/ip address)")
+    @Default.String("elasticsearch-server-name")
+    String getElasticsearchServer();
+    void setElasticsearchServer(String value);
+
+
+    @Description("Http port for elasticsearch server")
+    @Default.Integer(9200)
+    Integer getElasticsearchHttpPort();
+    void setElasticsearchHttpPort(Integer value);
+
+    @Description("Tcp port for elasticsearch server")
+    @Default.Integer(9300)
+    Integer getElasticsearchTcpPort();
+    void setElasticsearchTcpPort(Integer value);
+
+}

Reply via email to