This is an automated email from the ASF dual-hosted git repository.
chamikara pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new ecf9ee6 [BEAM-3213] MongodbIO performance test (#4859)
ecf9ee6 is described below
commit ecf9ee6ac336398530954456fcbd6b9baa0239a1
Author: Łukasz Gajowy <[email protected]>
AuthorDate: Mon Apr 2 07:57:00 2018 +0200
[BEAM-3213] MongodbIO performance test (#4859)
* [BEAM-3213] Add MongoDBIOIT
* [BEAM-3213] Add perfkit support for mongodb. Enable dataflow runner
* [BEAM-3213] Add hashes for larger datasets
* [BEAM-3213] Add jenkins job for MongoDBIOIT
* [BEAM-3213] Refactor: remove test code duplication
---
.../job_beam_PerformanceTests_MongoDBIO_IT.groovy | 66 +++++++
.../kubernetes/mongodb/load-balancer/mongo.yml | 49 +++++
.../mongodb/load-balancer/pkb-config.yml | 32 ++++
.test-infra/kubernetes/mongodb/node-port/mongo.yml | 50 +++++
.../kubernetes/mongodb/node-port/pkb-config.yml | 30 +++
.../org/apache/beam/sdk/io/common/IOITHelper.java | 39 ++++
.../beam/sdk/io/common/IOTestPipelineOptions.java | 19 ++
.../org/apache/beam/sdk/io/common/TestRow.java | 8 +-
.../beam/sdk/io/common/FileBasedIOITHelper.java | 13 +-
.../java/org/apache/beam/sdk/io/xml/XmlIOIT.java | 2 +-
sdks/java/io/mongodb/build.gradle | 11 ++
sdks/java/io/mongodb/pom.xml | 206 ++++++++++++++++++++-
.../apache/beam/sdk/io/mongodb/MongoDBIOIT.java | 143 ++++++++++++++
13 files changed, 651 insertions(+), 17 deletions(-)
diff --git a/.test-infra/jenkins/job_beam_PerformanceTests_MongoDBIO_IT.groovy
b/.test-infra/jenkins/job_beam_PerformanceTests_MongoDBIO_IT.groovy
new file mode 100644
index 0000000..1cd469c
--- /dev/null
+++ b/.test-infra/jenkins/job_beam_PerformanceTests_MongoDBIO_IT.groovy
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import common_job_properties
+
+job('beam_PerformanceTests_MongoDBIO_IT') {
+ // Set default Beam job properties.
+ common_job_properties.setTopLevelMainJobProperties(delegate)
+
+ // Run job in postcommit every 6 hours, don't trigger every push, and
+ // don't email individual committers.
+ common_job_properties.setPostCommit(
+ delegate,
+ '0 */6 * * *',
+ false,
+ '[email protected]',
+ false)
+
+ common_job_properties.enablePhraseTriggeringFromPullRequest(
+ delegate,
+ 'Java MongoDBIO Performance Test',
+ 'Run Java MongoDBIO Performance Test')
+
+ def pipelineOptions = [
+ tempRoot : 'gs://temp-storage-for-perf-tests',
+ project : 'apache-beam-testing',
+ numberOfRecords: '10000000'
+ ]
+
+ String namespace =
common_job_properties.getKubernetesNamespace('mongodbioit')
+ String kubeconfig =
common_job_properties.getKubeconfigLocationForNamespace(namespace)
+
+ def testArgs = [
+ kubeconfig : kubeconfig,
+ beam_it_timeout : '1800',
+ benchmarks : 'beam_integration_benchmark',
+ beam_it_profile : 'io-it',
+ beam_prebuilt : 'true',
+ beam_sdk : 'java',
+ beam_it_module : 'sdks/java/io/mongodb',
+ beam_it_class :
'org.apache.beam.sdk.io.mongodb.MongoDBIOIT',
+ beam_it_options :
common_job_properties.joinPipelineOptions(pipelineOptions),
+ beam_kubernetes_scripts :
common_job_properties.makePathAbsolute('src/.test-infra/kubernetes/mongodb/load-balancer/mongo.yml'),
+ beam_options_config_file:
common_job_properties.makePathAbsolute('src/.test-infra/kubernetes/mongodb/load-balancer/pkb-config.yml'),
+ bigquery_table :
'beam_performance.mongodbioit_pkb_results'
+ ]
+
+ common_job_properties.setupKubernetes(delegate, namespace, kubeconfig)
+ common_job_properties.buildPerformanceTest(delegate, testArgs)
+ common_job_properties.cleanupKubernetes(delegate, namespace, kubeconfig)
+}
diff --git a/.test-infra/kubernetes/mongodb/load-balancer/mongo.yml
b/.test-infra/kubernetes/mongodb/load-balancer/mongo.yml
new file mode 100644
index 0000000..70e1965
--- /dev/null
+++ b/.test-infra/kubernetes/mongodb/load-balancer/mongo.yml
@@ -0,0 +1,49 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+apiVersion: v1
+kind: Service
+metadata:
+ name: mongo-load-balancer-service
+ labels:
+ name: mongo
+spec:
+ ports:
+ - port: 27017
+ selector:
+ name: mongo
+ type: LoadBalancer
+
+---
+
+apiVersion: v1
+kind: ReplicationController
+metadata:
+ name: mongo
+spec:
+ replicas: 1
+ selector:
+ name: mongo
+ template:
+ metadata:
+ name: mongo
+ labels:
+ name: mongo
+ spec:
+ containers:
+ - name: mongo
+ image: mongo
+ ports:
+ - containerPort: 27017
diff --git a/.test-infra/kubernetes/mongodb/load-balancer/pkb-config.yml
b/.test-infra/kubernetes/mongodb/load-balancer/pkb-config.yml
new file mode 100644
index 0000000..299de0d
--- /dev/null
+++ b/.test-infra/kubernetes/mongodb/load-balancer/pkb-config.yml
@@ -0,0 +1,32 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# This file is a pkb benchmark configuration file, used when running the IO ITs
+# that use this data store. It allows users to run tests when they are on a
+# separate network from the kubernetes cluster by reading the mongo IP
+# address from the LoadBalancer service.
+#
+# This file defines pipeline options to pass to beam, as well as how to derive
+# the values for those pipeline options from kubernetes (where appropriate.)
+
+static_pipeline_options:
+ - mongoDBDatabaseName: beam
+ - mongoDBPort: 27017
+dynamic_pipeline_options:
+ - name: mongoDBHostName
+ type: LoadBalancerIp
+ serviceName: mongo-load-balancer-service
diff --git a/.test-infra/kubernetes/mongodb/node-port/mongo.yml
b/.test-infra/kubernetes/mongodb/node-port/mongo.yml
new file mode 100644
index 0000000..d640687
--- /dev/null
+++ b/.test-infra/kubernetes/mongodb/node-port/mongo.yml
@@ -0,0 +1,50 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+apiVersion: v1
+kind: Service
+metadata:
+ name: mongo-node-port-service
+ labels:
+ name: mongo
+spec:
+ ports:
+ - port: 27017
+ nodePort: 31235
+ selector:
+ name: mongo
+ type: NodePort
+
+---
+
+apiVersion: v1
+kind: ReplicationController
+metadata:
+ name: mongo
+spec:
+ replicas: 1
+ selector:
+ name: mongo
+ template:
+ metadata:
+ name: mongo
+ labels:
+ name: mongo
+ spec:
+ containers:
+ - name: mongo
+ image: mongo
+ ports:
+ - containerPort: 27017
diff --git a/.test-infra/kubernetes/mongodb/node-port/pkb-config.yml
b/.test-infra/kubernetes/mongodb/node-port/pkb-config.yml
new file mode 100644
index 0000000..1436b75
--- /dev/null
+++ b/.test-infra/kubernetes/mongodb/node-port/pkb-config.yml
@@ -0,0 +1,30 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# This file is a pkb benchmark configuration file, used when running the IO ITs
+# that use this data store.
+#
+# This file defines pipeline options to pass to beam, as well as how to derive
+# the values for those pipeline options from kubernetes (where appropriate.)
+
+static_pipeline_options:
+ - mongoDBDatabaseName: beam
+ - mongoDBPort: 27017
+dynamic_pipeline_options:
+ - name: mongoDBHostName
+ type: NodePortIp
+ podLabel: name=mongo
diff --git
a/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOITHelper.java
b/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOITHelper.java
new file mode 100644
index 0000000..819bc9a
--- /dev/null
+++
b/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOITHelper.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.common;
+
+import java.util.Map;
+
+/**
+ * Methods common to all types of IOITs.
+ *
+ * <p>{@code getHashForRecordCount} returns the entry of the supplied map whose key equals the
+ * given record count. When the map yields {@code null} for that key it throws
+ * {@link UnsupportedOperationException} with the record count included in the message.
+ */
+public class IOITHelper {
+
+ private IOITHelper() { // private and empty: the class exposes only a static method
+ }
+
+ public static String getHashForRecordCount(int recordCount, Map<Integer,
String> hashes) {
+ String hash = hashes.get(recordCount);
+ if (hash == null) { // no entry for this count (or a null value was stored under it)
+ throw new UnsupportedOperationException(
+ String.format("No hash for that record count: %s", recordCount)
+ );
+ }
+ return hash;
+ }
+}
diff --git
a/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOTestPipelineOptions.java
b/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOTestPipelineOptions.java
index 89b7ae8..1260167 100644
---
a/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOTestPipelineOptions.java
+++
b/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/IOTestPipelineOptions.java
@@ -114,4 +114,23 @@ public interface IOTestPipelineOptions extends
TestPipelineOptions {
String getCharset();
void setCharset(String charset);
+
+ /* MongoDB */
+ @Description("MongoDB host (host name/ip address)")
+ @Default.String("mongodb-host")
+ String getMongoDBHostName();
+
+ void setMongoDBHostName(String host);
+
+ @Description("Port for MongoDB")
+ @Default.Integer(27017)
+ Integer getMongoDBPort();
+
+ void setMongoDBPort(Integer port);
+
+ @Description("Mongo database name")
+ @Default.String("beam")
+ String getMongoDBDatabaseName();
+
+ void setMongoDBDatabaseName(String name);
}
diff --git
a/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/TestRow.java
b/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/TestRow.java
index e6bc7e8..4465456 100644
---
a/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/TestRow.java
+++
b/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/TestRow.java
@@ -17,6 +17,8 @@
*/
package org.apache.beam.sdk.io.common;
+import static org.apache.beam.sdk.io.common.IOITHelper.getHashForRecordCount;
+
import com.google.auto.value.AutoValue;
import com.google.common.collect.ImmutableMap;
import java.io.Serializable;
@@ -108,10 +110,6 @@ public abstract class TestRow implements Serializable,
Comparable<TestRow> {
*/
public static String getExpectedHashForRowCount(int rowCount)
throws UnsupportedOperationException {
- String hash = EXPECTED_HASHES.get(rowCount);
- if (hash == null) {
- throw new UnsupportedOperationException("No hash for that row count");
- }
- return hash;
+ return getHashForRecordCount(rowCount, EXPECTED_HASHES);
}
}
diff --git
a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/common/FileBasedIOITHelper.java
b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/common/FileBasedIOITHelper.java
index bbf707e..5b7ff38 100644
---
a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/common/FileBasedIOITHelper.java
+++
b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/common/FileBasedIOITHelper.java
@@ -17,6 +17,8 @@
*/
package org.apache.beam.sdk.io.common;
+import static org.apache.beam.sdk.io.common.IOITHelper.getHashForRecordCount;
+
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import java.io.IOException;
@@ -33,6 +35,7 @@ import org.apache.beam.sdk.options.PipelineOptionsValidator;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.DoFn;
+
/**
* Contains helper methods for file based IO Integration tests.
*/
@@ -64,16 +67,6 @@ public class FileBasedIOITHelper {
return getHashForRecordCount(lineCount, expectedHashes);
}
- public static String getHashForRecordCount(int recordCount, Map<Integer,
String> hashes) {
- String hash = hashes.get(recordCount);
- if (hash == null) {
- throw new UnsupportedOperationException(
- String.format("No hash for that record count: %s", recordCount)
- );
- }
- return hash;
- }
-
/**
* Constructs text lines in files used for testing.
*/
diff --git
a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/xml/XmlIOIT.java
b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/xml/XmlIOIT.java
index 7176d7f..3ea27aa 100644
---
a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/xml/XmlIOIT.java
+++
b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/xml/XmlIOIT.java
@@ -18,7 +18,7 @@
package org.apache.beam.sdk.io.xml;
import static
org.apache.beam.sdk.io.common.FileBasedIOITHelper.appendTimestampSuffix;
-import static
org.apache.beam.sdk.io.common.FileBasedIOITHelper.getHashForRecordCount;
+import static org.apache.beam.sdk.io.common.IOITHelper.getHashForRecordCount;
import com.google.common.collect.ImmutableMap;
import java.io.Serializable;
diff --git a/sdks/java/io/mongodb/build.gradle
b/sdks/java/io/mongodb/build.gradle
index 9223780..6e82f35 100644
--- a/sdks/java/io/mongodb/build.gradle
+++ b/sdks/java/io/mongodb/build.gradle
@@ -21,6 +21,15 @@ applyJavaNature(artifactId: "beam-sdks-java-io-mongodb")
description = "Apache Beam :: SDKs :: Java :: IO :: MongoDB"
+/*
+ * We need to rely on manually specifying these evaluationDependsOn to ensure
that
+ * the following projects are evaluated before we evaluate this project. This
is because
+ * we are attempting to reference the "sourceSets.test.output" directly.
+ * TODO: Swap to generating test artifacts which we can then rely on instead of
+ * the test outputs directly.
+ */
+evaluationDependsOn(":sdks:java:io:common")
+
dependencies {
compile library.java.guava
shadow project(path: ":sdks:java:core", configuration: "shadow")
@@ -32,6 +41,8 @@ dependencies {
testCompile library.java.junit
testCompile library.java.slf4j_jdk14
testCompile library.java.hamcrest_core
+ testCompile project(path: ":sdks:java:io:common", configuration: "shadow")
+ testCompile project(":sdks:java:io:common").sourceSets.test.output
testCompile "de.flapdoodle.embed:de.flapdoodle.embed.mongo:1.50.1"
testCompile "de.flapdoodle.embed:de.flapdoodle.embed.process:1.50.1"
}
diff --git a/sdks/java/io/mongodb/pom.xml b/sdks/java/io/mongodb/pom.xml
index ad811b5..2f237cb 100644
--- a/sdks/java/io/mongodb/pom.xml
+++ b/sdks/java/io/mongodb/pom.xml
@@ -34,6 +34,199 @@
<mongo-java-driver.version>3.2.2</mongo-java-driver.version>
</properties>
+
+ <profiles>
+ <!--
+ This profile invokes PerfKitBenchmarker, which does benchmarking of
+ the IO ITs. The arguments passed to it allow it to invoke mvn again
+ with the desired benchmark.
+
+ To invoke this, run:
+
+ mvn verify -Dio-it-suite -pl sdks/java/io/mongodb \
+ -DpkbLocation="path-to-pkb.py" \
+ -DintegrationTestPipelineOptions='["-numberOfRecords=1000"]'
+ -->
+ <profile>
+ <id>io-it-suite</id>
+ <activation>
+ <property><name>io-it-suite</name></property>
+ </activation>
+ <properties>
+ <!-- This is based on the location of the current pom relative to the
root
+ See discussion in BEAM-2460 -->
+
<beamRootProjectDir>${project.parent.parent.parent.parent.basedir}</beamRootProjectDir>
+ </properties>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.codehaus.gmaven</groupId>
+ <artifactId>groovy-maven-plugin</artifactId>
+ <version>${groovy-maven-plugin.version}</version>
+ <executions>
+ <execution>
+ <id>find-supported-python-for-compile</id>
+ <phase>initialize</phase>
+ <goals>
+ <goal>execute</goal>
+ </goals>
+ <configuration>
+
<source>${beamRootProjectDir}/sdks/python/findSupportedPython.groovy</source>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>exec-maven-plugin</artifactId>
+ <version>${maven-exec-plugin.version}</version>
+ <executions>
+ <execution>
+ <phase>verify</phase>
+ <goals>
+ <goal>exec</goal>
+ </goals>
+ </execution>
+ </executions>
+ <configuration>
+ <executable>${python.interpreter.bin}</executable>
+ <arguments>
+ <argument>${pkbLocation}</argument>
+ <argument>-beam_it_timeout=1800</argument>
+ <argument>-benchmarks=beam_integration_benchmark</argument>
+ <argument>-beam_it_profile=io-it</argument>
+ <argument>-beam_location=${beamRootProjectDir}</argument>
+ <argument>-beam_prebuilt=true</argument>
+ <argument>-beam_sdk=java</argument>
+ <argument>-kubeconfig=${kubeconfig}</argument>
+ <argument>-kubectl=${kubectl}</argument>
+ <!-- runner overrides, controlled via forceDirectRunner -->
+ <argument>${pkbBeamRunnerProfile}</argument>
+ <argument>${pkbBeamRunnerOption}</argument>
+ <!-- specific to this IO -->
+
<argument>-beam_options_config_file=${beamRootProjectDir}/.test-infra/kubernetes/mongodb/node-port/pkb-config.yml</argument>
+
<argument>-beam_kubernetes_scripts=${beamRootProjectDir}/.test-infra/kubernetes/mongodb/node-port/mongo.yml</argument>
+ <argument>-beam_it_module=sdks/java/io/mongodb</argument>
+
<argument>-beam_it_class=org.apache.beam.sdk.io.mongodb.MongoDBIOIT</argument>
+ <!-- arguments typically defined by user -->
+
<argument>-beam_it_options=${integrationTestPipelineOptions}</argument>
+ </arguments>
+ </configuration>
+ </plugin>
+
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <configuration>
+ <skipTests>true</skipTests>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
+
+ <!--
+ io-it-suite-local overrides part of io-it-suite, allowing users to run
tests
+ when they are on a separate network from the kubernetes cluster by
+ creating a LoadBalancer service.
+ -->
+ <profile>
+ <id>io-it-suite-local</id>
+
<activation><property><name>io-it-suite-local</name></property></activation>
+ <properties>
+ <!-- This is based on the location of the current pom relative to the
root
+ See discussion in BEAM-2460 -->
+
<beamRootProjectDir>${project.parent.parent.parent.parent.basedir}</beamRootProjectDir>
+ </properties>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.codehaus.gmaven</groupId>
+ <artifactId>groovy-maven-plugin</artifactId>
+ <version>${groovy-maven-plugin.version}</version>
+ <executions>
+ <execution>
+ <id>find-supported-python-for-compile</id>
+ <phase>initialize</phase>
+ <goals>
+ <goal>execute</goal>
+ </goals>
+ <configuration>
+
<source>${beamRootProjectDir}/sdks/python/findSupportedPython.groovy</source>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>exec-maven-plugin</artifactId>
+ <version>${maven-exec-plugin.version}</version>
+ <executions>
+ <execution>
+ <phase>verify</phase>
+ <goals>
+ <goal>exec</goal>
+ </goals>
+ </execution>
+ </executions>
+ <configuration>
+ <executable>${python.interpreter.bin}</executable>
+ <arguments>
+ <argument>${pkbLocation}</argument>
+ <argument>-beam_it_timeout=1800</argument>
+ <argument>-benchmarks=beam_integration_benchmark</argument>
+ <argument>-beam_it_profile=io-it</argument>
+ <argument>-beam_location=${beamRootProjectDir}</argument>
+ <argument>-beam_prebuilt=true</argument>
+ <argument>-beam_sdk=java</argument>
+ <argument>-kubeconfig=${kubeconfig}</argument>
+ <argument>-kubectl=${kubectl}</argument>
+ <!-- runner overrides, controlled via forceDirectRunner -->
+ <argument>${pkbBeamRunnerProfile}</argument>
+ <argument>${pkbBeamRunnerOption}</argument>
+ <!-- specific to this IO -->
+
<argument>-beam_options_config_file=${beamRootProjectDir}/.test-infra/kubernetes/mongodb/load-balancer/pkb-config.yml</argument>
+
<argument>-beam_kubernetes_scripts=${beamRootProjectDir}/.test-infra/kubernetes/mongodb/load-balancer/mongo.yml</argument>
+ <argument>-beam_it_module=sdks/java/io/mongodb</argument>
+
<argument>-beam_it_class=org.apache.beam.sdk.io.mongodb.MongoDBIOIT</argument>
+ <!-- arguments typically defined by user -->
+
<argument>-beam_it_options=${integrationTestPipelineOptions}</argument>
+ </arguments>
+ </configuration>
+ </plugin>
+
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <configuration>
+ <skipTests>true</skipTests>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
+
+ <!-- Include the Google Cloud Dataflow runner activated by
-DintegrationTestRunner=dataflow -->
+ <profile>
+ <id>dataflow-runner</id>
+ <activation>
+ <property>
+ <name>integrationTestRunner</name>
+ <value>dataflow</value>
+ </property>
+ </activation>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>beam-runners-google-cloud-dataflow-java</artifactId>
+ <scope>runtime</scope>
+ </dependency>
+ </dependencies>
+ </profile>
+ </profiles>
+
<dependencies>
<dependency>
<groupId>org.apache.beam</groupId>
@@ -108,7 +301,18 @@
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-library</artifactId>
<scope>test</scope>
- </dependency>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>beam-sdks-java-io-common</artifactId>
+ <scope>test</scope>
+ <classifier>tests</classifier>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>beam-sdks-java-io-common</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project>
diff --git
a/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDBIOIT.java
b/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDBIOIT.java
new file mode 100644
index 0000000..4f425a3
--- /dev/null
+++
b/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDBIOIT.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.mongodb;
+
+import static org.apache.beam.sdk.io.common.IOITHelper.getHashForRecordCount;
+
+import com.google.common.collect.ImmutableMap;
+import com.mongodb.MongoClient;
+import java.util.Date;
+import java.util.Map;
+import org.apache.beam.sdk.io.GenerateSequence;
+import org.apache.beam.sdk.io.common.HashingFn;
+import org.apache.beam.sdk.io.common.IOTestPipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.values.PCollection;
+import org.bson.Document;
+import org.junit.After;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+
+/**
+ * A test of {@link org.apache.beam.sdk.io.mongodb.MongoDbIO} on an independent Mongo instance.
+ *
+ * <p>The test writes {@code numberOfRecords} generated documents to a freshly named collection,
+ * reads them back, and compares a combined hash of the values read with a precomputed expected
+ * hash. Expected hashes exist only for the record counts listed in {@code EXPECTED_HASHES}; any
+ * other count makes the test fail with an {@link UnsupportedOperationException}.
+ *
+ * <p>This test requires a running instance of MongoDB. Pass in connection information using
+ * PipelineOptions:
+ * <pre>
+ *  mvn -e -Pio-it verify -pl sdks/java/io/mongodb -DintegrationTestPipelineOptions='[
+ *  "--mongoDBHostName=1.2.3.4",
+ *  "--mongoDBPort=27017",
+ *  "--mongoDBDatabaseName=beam",
+ *  "--numberOfRecords=1000" ]'
+ * </pre>
+ *
+ */
+@RunWith(JUnit4.class)
+public class MongoDBIOIT {
+
+  /** Expected hash of all values read back, keyed by the number of records written. */
+  private static final Map<Integer, String> EXPECTED_HASHES = ImmutableMap.of(
+      1000, "75a0d5803418444e76ae5b421662764c",
+      100_000, "3bc762dc1c291904e3c7f577774c6276",
+      10_000_000, "e5e0503902018c83e8c8977ef437feba"
+  );
+
+  private static int numberOfRecords;
+
+  private static String host;
+
+  private static Integer port;
+
+  private static String database;
+
+  private static String collection;
+
+  @Rule
+  public final TestPipeline writePipeline = TestPipeline.create();
+
+  @Rule
+  public final TestPipeline readPipeline = TestPipeline.create();
+
+  /** Reads the record count and the MongoDB connection settings from the test pipeline options. */
+  @BeforeClass
+  public static void setUp() {
+    PipelineOptionsFactory.register(IOTestPipelineOptions.class);
+    IOTestPipelineOptions options = TestPipeline.testingPipelineOptions()
+        .as(IOTestPipelineOptions.class);
+
+    numberOfRecords = options.getNumberOfRecords();
+    host = options.getMongoDBHostName();
+    port = options.getMongoDBPort();
+    database = options.getMongoDBDatabaseName();
+    // The timestamp suffix gives each run its own collection name.
+    collection = String.format("test_%s", new Date().getTime());
+  }
+
+  /**
+   * Drops the test database after each test.
+   *
+   * <p>The client connects to the configured host <em>and</em> port, i.e. the same endpoint the
+   * pipelines use through the {@code mongodb://host:port} URI, and is closed afterwards so the
+   * connection is not leaked.
+   */
+  @After
+  public void tearDown() {
+    MongoClient client = new MongoClient(host, port);
+    try {
+      client.getDatabase(database).drop();
+    } finally {
+      client.close();
+    }
+  }
+
+  /**
+   * Writes {@code numberOfRecords} documents with {@link MongoDbIO#write()}, reads them back
+   * with {@link MongoDbIO#read()} and asserts that the combined hash of the values read equals
+   * the expected hash for that record count.
+   */
+  @Test
+  public void testWriteAndRead() {
+    String mongoUrl = String.format("mongodb://%s:%s", host, port);
+
+    writePipeline
+        .apply("Generate sequence", GenerateSequence.from(0).to(numberOfRecords))
+        .apply("Produce documents", MapElements.via(new LongToDocumentFn()))
+        .apply("Write documents to MongoDB", MongoDbIO.write()
+            .withUri(mongoUrl)
+            .withDatabase(database)
+            .withCollection(collection));
+
+    // The write must complete before the read pipeline is started.
+    writePipeline.run().waitUntilFinish();
+
+    PCollection<String> consolidatedHashcode = readPipeline
+        .apply("Read all documents", MongoDbIO.read()
+            .withUri(mongoUrl)
+            .withDatabase(database)
+            .withCollection(collection))
+        .apply("Map documents to Strings", MapElements.via(new DocumentToStringFn()))
+        .apply("Calculate hashcode", Combine.globally(new HashingFn()));
+
+    String expectedHash = getHashForRecordCount(numberOfRecords, EXPECTED_HASHES);
+    PAssert.thatSingleton(consolidatedHashcode).isEqualTo(expectedHash);
+
+    readPipeline.run().waitUntilFinish();
+  }
+
+  /** Turns a sequence number {@code n} into the document {@code {"scientist": "Test n"}}. */
+  private static class LongToDocumentFn extends SimpleFunction<Long, Document> {
+    @Override
+    public Document apply(Long input) {
+      return Document.parse(String.format("{\"scientist\":\"Test %s\"}", input));
+    }
+  }
+
+  /** Extracts the {@code scientist} field of a document as a String. */
+  private static class DocumentToStringFn extends SimpleFunction<Document, String> {
+    @Override
+    public String apply(Document input) {
+      return input.getString("scientist");
+    }
+  }
+}
--
To stop receiving notification emails like this one, please contact
[email protected].