This is an automated email from the ASF dual-hosted git repository.
yichi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 0830a02 Run Python SpannerIO ITs with Python 3.7 only to avoid
overloading the Spanner instance (#16112)
0830a02 is described below
commit 0830a02f8984383028a74ca9b89b9e3dc9f1597c
Author: Yichi Zhang <[email protected]>
AuthorDate: Fri Dec 3 09:20:20 2021 -0800
Run Python SpannerIO ITs with Python 3.7 only to avoid overloading the
Spanner instance (#16112)
---
build.gradle.kts | 3 ++
.../io/gcp/experimental/spannerio_read_it_test.py | 20 ++++++------
.../io/gcp/experimental/spannerio_write_it_test.py | 10 +++---
sdks/python/test-suites/dataflow/common.gradle | 24 ++++++++++++++
sdks/python/test-suites/direct/common.gradle | 20 ++++++++++++
sdks/python/test-suites/portable/common.gradle | 38 +++++++++++++++++++++-
6 files changed, 99 insertions(+), 16 deletions(-)
diff --git a/build.gradle.kts b/build.gradle.kts
index 37e0164..f5421ed 100644
--- a/build.gradle.kts
+++ b/build.gradle.kts
@@ -287,11 +287,14 @@ task("python36PostCommit") {
task("python37PostCommit") {
dependsOn(":sdks:python:test-suites:dataflow:py37:postCommitIT")
+ dependsOn(":sdks:python:test-suites:dataflow:py37:spannerioIT")
dependsOn(":sdks:python:test-suites:direct:py37:postCommitIT")
dependsOn(":sdks:python:test-suites:direct:py37:directRunnerIT")
dependsOn(":sdks:python:test-suites:direct:py37:hdfsIntegrationTest")
dependsOn(":sdks:python:test-suites:direct:py37:mongodbioIT")
+ dependsOn(":sdks:python:test-suites:direct:py37:spannerioIT")
dependsOn(":sdks:python:test-suites:portable:py37:postCommitPy37")
+ dependsOn(":sdks:python:test-suites:portable:py37:xlangSpannerIOIT")
}
task("python38PostCommit") {
diff --git a/sdks/python/apache_beam/io/gcp/experimental/spannerio_read_it_test.py b/sdks/python/apache_beam/io/gcp/experimental/spannerio_read_it_test.py
index bce7a66..3be3e74 100644
--- a/sdks/python/apache_beam/io/gcp/experimental/spannerio_read_it_test.py
+++ b/sdks/python/apache_beam/io/gcp/experimental/spannerio_read_it_test.py
@@ -105,7 +105,7 @@ class SpannerReadIntegrationTest(unittest.TestCase):
cls._add_dummy_entries()
_LOGGER.info("Spanner Read IT Setup Complete...")
- @pytest.mark.it_postcommit
+ @pytest.mark.spannerio_it
def test_read_via_table(self):
_LOGGER.info("Spanner Read via table")
with beam.Pipeline(argv=self.args) as p:
@@ -117,7 +117,7 @@ class SpannerReadIntegrationTest(unittest.TestCase):
columns=["UserId", "Key"])
assert_that(r, equal_to(self._data))
- @pytest.mark.it_postcommit
+ @pytest.mark.spannerio_it
def test_read_via_sql(self):
_LOGGER.info("Running Spanner via sql")
with beam.Pipeline(argv=self.args) as p:
@@ -128,7 +128,7 @@ class SpannerReadIntegrationTest(unittest.TestCase):
sql="select * from Users")
assert_that(r, equal_to(self._data))
- @pytest.mark.it_postcommit
+ @pytest.mark.spannerio_it
def test_transaction_table_metrics_ok_call(self):
if 'DirectRunner' not in self.runner_name:
raise unittest.SkipTest('This test only runs with DirectRunner.')
@@ -151,7 +151,7 @@ class SpannerReadIntegrationTest(unittest.TestCase):
self.verify_table_read_call_metric(
self.project, self.TEST_DATABASE, 'Users', 'ok', 1)
- @pytest.mark.it_postcommit
+ @pytest.mark.spannerio_it
def test_transaction_table_metrics_error_call(self):
if 'DirectRunner' not in self.runner_name:
raise unittest.SkipTest('This test only runs with DirectRunner.')
@@ -177,7 +177,7 @@ class SpannerReadIntegrationTest(unittest.TestCase):
self.verify_table_read_call_metric(
self.project, self.TEST_DATABASE, 'INVALID_TABLE', '404', 1)
- @pytest.mark.it_postcommit
+ @pytest.mark.spannerio_it
def test_transaction_sql_metrics_ok_call(self):
if 'DirectRunner' not in self.runner_name:
raise unittest.SkipTest('This test only runs with DirectRunner.')
@@ -200,7 +200,7 @@ class SpannerReadIntegrationTest(unittest.TestCase):
self.verify_sql_read_call_metric(
self.project, self.TEST_DATABASE, 'query-1', 'ok', 1)
- @pytest.mark.it_postcommit
+ @pytest.mark.spannerio_it
def test_transaction_sql_metrics_error_call(self):
if 'DirectRunner' not in self.runner_name:
raise unittest.SkipTest('This test only runs with DirectRunner.')
@@ -226,7 +226,7 @@ class SpannerReadIntegrationTest(unittest.TestCase):
self.verify_sql_read_call_metric(
self.project, self.TEST_DATABASE, 'query-2', '400', 1)
- @pytest.mark.it_postcommit
+ @pytest.mark.spannerio_it
def test_table_metrics_ok_call(self):
if 'DirectRunner' not in self.runner_name:
raise unittest.SkipTest('This test only runs with DirectRunner.')
@@ -245,7 +245,7 @@ class SpannerReadIntegrationTest(unittest.TestCase):
self.verify_table_read_call_metric(
self.project, self.TEST_DATABASE, 'Users', 'ok', 1)
- @pytest.mark.it_postcommit
+ @pytest.mark.spannerio_it
def test_table_metrics_error_call(self):
if 'DirectRunner' not in self.runner_name:
raise unittest.SkipTest('This test only runs with DirectRunner.')
@@ -267,7 +267,7 @@ class SpannerReadIntegrationTest(unittest.TestCase):
self.verify_table_read_call_metric(
self.project, self.TEST_DATABASE, 'INVALID_TABLE', '404', 1)
- @pytest.mark.it_postcommit
+ @pytest.mark.spannerio_it
def test_sql_metrics_ok_call(self):
if 'DirectRunner' not in self.runner_name:
raise unittest.SkipTest('This test only runs with DirectRunner.')
@@ -286,7 +286,7 @@ class SpannerReadIntegrationTest(unittest.TestCase):
self.verify_sql_read_call_metric(
self.project, self.TEST_DATABASE, 'query-1', 'ok', 1)
- @pytest.mark.it_postcommit
+ @pytest.mark.spannerio_it
def test_sql_metrics_error_call(self):
if 'DirectRunner' not in self.runner_name:
raise unittest.SkipTest('This test only runs with DirectRunner.')
diff --git a/sdks/python/apache_beam/io/gcp/experimental/spannerio_write_it_test.py b/sdks/python/apache_beam/io/gcp/experimental/spannerio_write_it_test.py
index e2f46d2..7172e97 100644
--- a/sdks/python/apache_beam/io/gcp/experimental/spannerio_write_it_test.py
+++ b/sdks/python/apache_beam/io/gcp/experimental/spannerio_write_it_test.py
@@ -112,7 +112,7 @@ class SpannerWriteIntegrationTest(unittest.TestCase):
cls._create_database()
_LOGGER.info('Spanner Write IT Setup Complete...')
- @pytest.mark.it_postcommit
+ @pytest.mark.spannerio_it
def test_write_batches(self):
_prefex = 'test_write_batches'
mutations = [
@@ -138,7 +138,7 @@ class SpannerWriteIntegrationTest(unittest.TestCase):
res.wait_until_finish()
self.assertEqual(self._count_data(_prefex), len(mutations))
- @pytest.mark.it_postcommit
+ @pytest.mark.spannerio_it
def test_spanner_update(self):
_prefex = 'test_update'
@@ -174,7 +174,7 @@ class SpannerWriteIntegrationTest(unittest.TestCase):
res.wait_until_finish()
self.assertEqual(self._count_data(_prefex), 2)
- @pytest.mark.it_postcommit
+ @pytest.mark.spannerio_it
def test_spanner_error(self):
mutations_update = [
WriteMutation.update(
@@ -190,7 +190,7 @@ class SpannerWriteIntegrationTest(unittest.TestCase):
database_id=self.TEST_DATABASE))
p.run()
- @pytest.mark.it_postcommit
+ @pytest.mark.spannerio_it
def test_metrics_ok_call(self):
if 'DirectRunner' not in self.runner_name:
raise unittest.SkipTest('This test only runs with DirectRunner.')
@@ -219,7 +219,7 @@ class SpannerWriteIntegrationTest(unittest.TestCase):
self.verify_write_call_metric(
self.project, self.TEST_DATABASE, 'Albums', 'ok', 1)
- @pytest.mark.it_postcommit
+ @pytest.mark.spannerio_it
def test_metrics_error_call(self):
if 'DirectRunner' not in self.runner_name:
raise unittest.SkipTest('This test only runs with DirectRunner.')
diff --git a/sdks/python/test-suites/dataflow/common.gradle b/sdks/python/test-suites/dataflow/common.gradle
index 79481dd..66929ce 100644
--- a/sdks/python/test-suites/dataflow/common.gradle
+++ b/sdks/python/test-suites/dataflow/common.gradle
@@ -124,6 +124,30 @@ task postCommitIT {
}
}
+task spannerioIT {
+ dependsOn 'installGcpTest'
+ dependsOn ':sdks:python:sdist'
+ dependsOn ':runners:google-cloud-dataflow-java:worker:shadowJar'
+
+ def dataflowWorkerJar =
project(":runners:google-cloud-dataflow-java:worker").shadowJar.archivePath
+
+ doLast {
+ def testOpts = basicPytestOpts + ["--numprocesses=8", "--dist=loadfile"]
+ def argMap = [
+ "test_opts": testOpts,
+ "sdk_location": files(configurations.distTarBall.files).singleFile,
+ "worker_jar": dataflowWorkerJar,
+ "suite": "postCommitIT-df${pythonVersionSuffix}",
+ "collect": "spannerio_it"
+ ]
+ def cmdArgs = mapToArgString(argMap)
+ exec {
+ executable 'sh'
+ args '-c', ". ${envdir}/bin/activate &&
${runScriptsDir}/run_integration_test.sh $cmdArgs"
+ }
+ }
+}
+
task examples {
dependsOn 'installGcpTest'
dependsOn ':sdks:python:sdist'
diff --git a/sdks/python/test-suites/direct/common.gradle b/sdks/python/test-suites/direct/common.gradle
index 525c7f1..3ef5451 100644
--- a/sdks/python/test-suites/direct/common.gradle
+++ b/sdks/python/test-suites/direct/common.gradle
@@ -41,6 +41,26 @@ task postCommitIT {
"apache_beam/io/gcp/bigquery_read_it_test.py",
"apache_beam/io/gcp/bigquery_write_it_test.py",
"apache_beam/io/gcp/datastore/v1new/datastore_write_it_test.py",
+ ]
+ def testOpts = basicTestOpts + ["${batchTests.join(' ')}"]
+ def argMap = ["runner": "TestDirectRunner",
+ "test_opts": testOpts,
+ "suite": "postCommitIT-direct-py${pythonVersionSuffix}",
+ ]
+ def batchCmdArgs = mapToArgString(argMap)
+ exec {
+ executable 'sh'
+ args '-c', ". ${envdir}/bin/activate &&
${runScriptsDir}/run_integration_test.sh $batchCmdArgs"
+ }
+ }
+}
+
+task spannerioIT {
+ dependsOn 'installGcpTest'
+
+ // Run Spanner IO IT tests with TestDirectRunner in batch in Python 3.
+ doLast {
+ def batchTests = [
"apache_beam/io/gcp/experimental/spannerio_read_it_test.py",
"apache_beam/io/gcp/experimental/spannerio_write_it_test.py",
]
diff --git a/sdks/python/test-suites/portable/common.gradle b/sdks/python/test-suites/portable/common.gradle
index 6c15a04..be9feaf 100644
--- a/sdks/python/test-suites/portable/common.gradle
+++ b/sdks/python/test-suites/portable/common.gradle
@@ -266,7 +266,6 @@ project.task("postCommitPy${pythonVersionSuffix}IT") {
"apache_beam/io/external/xlang_jdbcio_it_test.py",
"apache_beam/io/external/xlang_kafkaio_it_test.py",
"apache_beam/io/external/xlang_kinesisio_it_test.py",
- "apache_beam/io/gcp/tests/xlang_spannerio_it_test.py",
"apache_beam/io/external/xlang_debeziumio_it_test.py",
]
def testOpts = ["${tests.join(' ')}"] + ["--log-cli-level=INFO"]
@@ -291,6 +290,43 @@ project.task("postCommitPy${pythonVersionSuffix}IT") {
}
}
+project.task("xlangSpannerIOIT") {
+ dependsOn = [
+ 'setupVirtualenv',
+ 'installGcpTest',
+ ":runners:flink:${latestFlinkVersion}:job-server:shadowJar",
+ ':sdks:java:container:java8:docker',
+ ':sdks:java:io:expansion-service:shadowJar',
+ ':sdks:java:io:google-cloud-platform:expansion-service:shadowJar',
+ ':sdks:java:io:kinesis:expansion-service:shadowJar',
+ ':sdks:java:extensions:schemaio-expansion-service:shadowJar',
+ ':sdks:java:io:debezium:expansion-service:shadowJar'
+ ]
+
+ doLast {
+ def tests = [
+ "apache_beam/io/gcp/tests/xlang_spannerio_it_test.py",
+ ]
+ def testOpts = ["${tests.join(' ')}"] + ["--log-cli-level=INFO"]
+ def pipelineOpts = [
+ "--runner=FlinkRunner",
+ "--project=apache-beam-testing",
+ "--environment_type=LOOPBACK",
+ "--temp_location=gs://temp-storage-for-end-to-end-tests/temp-it",
+
"--flink_job_server_jar=${project(":runners:flink:${latestFlinkVersion}:job-server").shadowJar.archivePath}",
+ ]
+ def cmdArgs = mapToArgString([
+ "test_opts": testOpts,
+ "suite": "postCommitIT-flink-py${pythonVersionSuffix}",
+ "pipeline_opts": pipelineOpts.join(" "),
+ ])
+ exec {
+ executable 'sh'
+ args '-c', ". ${envdir}/bin/activate &&
${pythonRootDir}/scripts/run_integration_test.sh $cmdArgs"
+ }
+ }
+}
+
def addTestJavaJarCreator(String runner, Task jobServerJarTask) {
project.tasks.create(name: "testJavaJarCreator${runner}") {
dependsOn jobServerJarTask