This is an automated email from the ASF dual-hosted git repository.
yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new e0d8b58f397 new file for bq xlang tests (#26046)
e0d8b58f397 is described below
commit e0d8b58f397d735177bea3ba1f9eb98ccc5acbbf
Author: Ahmed Abualsaud <[email protected]>
AuthorDate: Thu Mar 30 23:48:32 2023 +0300
new file for bq xlang tests (#26046)
---
.../io/external/xlang_bigqueryio_it_test.py | 239 +++++++++++++++++++++
.../apache_beam/io/gcp/bigquery_write_it_test.py | 190 ----------------
2 files changed, 239 insertions(+), 190 deletions(-)
diff --git a/sdks/python/apache_beam/io/external/xlang_bigqueryio_it_test.py b/sdks/python/apache_beam/io/external/xlang_bigqueryio_it_test.py
new file mode 100644
index 00000000000..48f4ab02b0c
--- /dev/null
+++ b/sdks/python/apache_beam/io/external/xlang_bigqueryio_it_test.py
@@ -0,0 +1,239 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Unit tests for cross-language BigQuery sources and sinks."""
+# pytype: skip-file
+
+import datetime
+import logging
+import os
+import secrets
+import time
+import unittest
+from decimal import Decimal
+
+import pytest
+from hamcrest.core import assert_that as hamcrest_assert
+
+import apache_beam as beam
+from apache_beam.io.gcp.bigquery_tools import BigQueryWrapper
+from apache_beam.io.gcp.internal.clients import bigquery
+from apache_beam.io.gcp.tests.bigquery_matcher import BigqueryFullResultMatcher
+from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.utils.timestamp import Timestamp
+
+# Protect against environments where bigquery library is not available.
+# pylint: disable=wrong-import-order, wrong-import-position
+
+try:
+ from apitools.base.py.exceptions import HttpError
+except ImportError:
+ HttpError = None
+# pylint: enable=wrong-import-order, wrong-import-position
+
+_LOGGER = logging.getLogger(__name__)
+
+
[email protected]_gcp_java_expansion_service
[email protected](
+ os.environ.get('EXPANSION_PORT'),
+ "EXPANSION_PORT environment var is not provided.")
+class BigQueryXlangStorageWriteIT(unittest.TestCase):
+ BIGQUERY_DATASET = 'python_xlang_storage_write'
+
+ ELEMENTS = [
+ # (int, float, numeric, string, bool, bytes, timestamp)
+ {
+ "int": 1,
+ "float": 0.1,
+ "numeric": Decimal("1.11"),
+ "str": "a",
+ "bool": True,
+ "bytes": b'a',
+ "timestamp": Timestamp(1000, 100)
+ },
+ {
+ "int": 2,
+ "float": 0.2,
+ "numeric": Decimal("2.22"),
+ "str": "b",
+ "bool": False,
+ "bytes": b'b',
+ "timestamp": Timestamp(2000, 200)
+ },
+ {
+ "int": 3,
+ "float": 0.3,
+ "numeric": Decimal("3.33"),
+ "str": "c",
+ "bool": True,
+ "bytes": b'd',
+ "timestamp": Timestamp(3000, 300)
+ },
+ {
+ "int": 4,
+ "float": 0.4,
+ "numeric": Decimal("4.44"),
+ "str": "d",
+ "bool": False,
+ "bytes": b'd',
+ "timestamp": Timestamp(4000, 400)
+ }
+ ]
+
+ def setUp(self):
+ self.test_pipeline = TestPipeline(is_integration_test=True)
+ self.args = self.test_pipeline.get_full_options_as_args()
+ self.project = self.test_pipeline.get_option('project')
+
+ self.bigquery_client = BigQueryWrapper()
+ self.dataset_id = '%s%s%s' % (
+ self.BIGQUERY_DATASET, str(int(time.time())), secrets.token_hex(3))
+ self.bigquery_client.get_or_create_dataset(self.project, self.dataset_id)
+ _LOGGER.info(
+ "Created dataset %s in project %s", self.dataset_id, self.project)
+
+ _LOGGER.info("expansion port: %s", os.environ.get('EXPANSION_PORT'))
+ self.expansion_service = ('localhost:%s' % os.environ.get('EXPANSION_PORT'))
+
+ def tearDown(self):
+ request = bigquery.BigqueryDatasetsDeleteRequest(
+ projectId=self.project, datasetId=self.dataset_id, deleteContents=True)
+ try:
+ _LOGGER.info(
+ "Deleting dataset %s in project %s", self.dataset_id, self.project)
+ self.bigquery_client.client.datasets.Delete(request)
+ except HttpError:
+ _LOGGER.debug(
+ 'Failed to clean up dataset %s in project %s',
+ self.dataset_id,
+ self.project)
+
+ def parse_expected_data(self, expected_elements):
+ data = []
+ for row in expected_elements:
+ values = list(row.values())
+ for i, val in enumerate(values):
+ if isinstance(val, Timestamp):
+ # BigQuery matcher query returns a datetime.datetime object
+ values[i] = val.to_utc_datetime().replace(
+ tzinfo=datetime.timezone.utc)
+ data.append(tuple(values))
+
+ return data
+
+ def storage_write_test(self, table_name, items, schema):
+ table_id = '{}:{}.{}'.format(self.project, self.dataset_id, table_name)
+
+ bq_matcher = BigqueryFullResultMatcher(
+ project=self.project,
+ query="SELECT * FROM %s" % '{}.{}'.format(self.dataset_id, table_name),
+ data=self.parse_expected_data(items))
+
+ with beam.Pipeline(argv=self.args) as p:
+ _ = (
+ p
+ | beam.Create(items)
+ | beam.io.WriteToBigQuery(
+ table=table_id,
+ method=beam.io.WriteToBigQuery.Method.STORAGE_WRITE_API,
+ schema=schema,
+ expansion_service=self.expansion_service))
+ hamcrest_assert(p, bq_matcher)
+
+ def test_storage_write_all_types(self):
+ table_name = "python_storage_write_all_types"
+ schema = (
+ "int:INTEGER,float:FLOAT,numeric:NUMERIC,str:STRING,"
+ "bool:BOOLEAN,bytes:BYTES,timestamp:TIMESTAMP")
+ self.storage_write_test(table_name, self.ELEMENTS, schema)
+
+ def test_storage_write_nested_records_and_lists(self):
+ table_name = "python_storage_write_nested_records_and_lists"
+ schema = {
+ "fields": [{
+ "name": "repeated_int", "type": "INTEGER", "mode": "REPEATED"
+ },
+ {
+ "name": "struct",
+ "type": "STRUCT",
+ "fields": [{
+ "name": "nested_int", "type": "INTEGER"
+ }, {
+ "name": "nested_str", "type": "STRING"
+ }]
+ },
+ {
+ "name": "repeated_struct",
+ "type": "STRUCT",
+ "mode": "REPEATED",
+ "fields": [{
+ "name": "nested_numeric", "type": "NUMERIC"
+ }, {
+ "name": "nested_bytes", "type": "BYTES"
+ }]
+ }]
+ }
+ items = [{
+ "repeated_int": [1, 2, 3],
+ "struct": {
+ "nested_int": 1, "nested_str": "a"
+ },
+ "repeated_struct": [{
+ "nested_numeric": Decimal("1.23"), "nested_bytes": b'a'
+ },
+ {
+ "nested_numeric": Decimal("3.21"),
+ "nested_bytes": b'aa'
+ }]
+ }]
+
+ self.storage_write_test(table_name, items, schema)
+
+ def test_storage_write_beam_rows(self):
+ table_id = '{}:{}.python_xlang_storage_write_beam_rows'.format(
+ self.project, self.dataset_id)
+
+ row_elements = [
+ beam.Row(
+ my_int=e['int'],
+ my_float=e['float'],
+ my_numeric=e['numeric'],
+ my_string=e['str'],
+ my_bool=e['bool'],
+ my_bytes=e['bytes'],
+ my_timestamp=e['timestamp']) for e in self.ELEMENTS
+ ]
+
+ bq_matcher = BigqueryFullResultMatcher(
+ project=self.project,
+ query="SELECT * FROM %s" %
+ '{}.python_xlang_storage_write_beam_rows'.format(self.dataset_id),
+ data=self.parse_expected_data(self.ELEMENTS))
+
+ with beam.Pipeline(argv=self.args) as p:
+ _ = (
+ p
+ | beam.Create(row_elements)
+ | beam.io.StorageWriteToBigQuery(
+ table=table_id, expansion_service=self.expansion_service))
+ hamcrest_assert(p, bq_matcher)
+
+
+if __name__ == '__main__':
+ logging.getLogger().setLevel(logging.INFO)
+ unittest.main()
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py b/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py
index 3effe945355..a307e06ac5b 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py
@@ -23,7 +23,6 @@
import base64
import datetime
import logging
-import os
import secrets
import time
import unittest
@@ -33,7 +32,6 @@ import hamcrest as hc
import mock
import pytest
import pytz
-from hamcrest.core import assert_that as hamcrest_assert
from parameterized import param
from parameterized import parameterized
@@ -46,7 +44,6 @@ from apache_beam.io.gcp.tests.bigquery_matcher import BigqueryFullResultMatcher
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that
from apache_beam.testing.util import equal_to
-from apache_beam.utils.timestamp import Timestamp
# Protect against environments where bigquery library is not available.
# pylint: disable=wrong-import-order, wrong-import-position
@@ -546,193 +543,6 @@ class BigQueryWriteIntegrationTests(unittest.TestCase):
temp_file_format=file_format))
-class BigQueryXlangStorageWriteIT(unittest.TestCase):
- BIGQUERY_DATASET = 'python_xlang_storage_write'
-
- ELEMENTS = [
- # (int, float, numeric, string, bool, bytes, timestamp)
- {
- "int": 1,
- "float": 0.1,
- "numeric": Decimal("1.11"),
- "str": "a",
- "bool": True,
- "bytes": b'a',
- "timestamp": Timestamp(1000, 100)
- },
- {
- "int": 2,
- "float": 0.2,
- "numeric": Decimal("2.22"),
- "str": "b",
- "bool": False,
- "bytes": b'b',
- "timestamp": Timestamp(2000, 200)
- },
- {
- "int": 3,
- "float": 0.3,
- "numeric": Decimal("3.33"),
- "str": "c",
- "bool": True,
- "bytes": b'd',
- "timestamp": Timestamp(3000, 300)
- },
- {
- "int": 4,
- "float": 0.4,
- "numeric": Decimal("4.44"),
- "str": "d",
- "bool": False,
- "bytes": b'd',
- "timestamp": Timestamp(4000, 400)
- }
- ]
-
- def setUp(self):
- self.test_pipeline = TestPipeline(is_integration_test=True)
- self.args = self.test_pipeline.get_full_options_as_args()
- self.project = self.test_pipeline.get_option('project')
-
- self.bigquery_client = BigQueryWrapper()
- self.dataset_id = '%s%s%s' % (
- self.BIGQUERY_DATASET, str(int(time.time())), secrets.token_hex(3))
- self.bigquery_client.get_or_create_dataset(self.project, self.dataset_id)
- _LOGGER.info(
- "Created dataset %s in project %s", self.dataset_id, self.project)
- if not os.environ.get('EXPANSION_PORT'):
- raise ValueError("NO EXPANSION PORT")
- else:
- _LOGGER.info("expansion port: %s", os.environ.get('EXPANSION_PORT'))
- self.expansion_service = ('localhost:%s' % os.environ.get('EXPANSION_PORT'))
-
- def tearDown(self):
- request = bigquery.BigqueryDatasetsDeleteRequest(
- projectId=self.project, datasetId=self.dataset_id, deleteContents=True)
- try:
- _LOGGER.info(
- "Deleting dataset %s in project %s", self.dataset_id, self.project)
- self.bigquery_client.client.datasets.Delete(request)
- except HttpError:
- _LOGGER.debug(
- 'Failed to clean up dataset %s in project %s',
- self.dataset_id,
- self.project)
-
- def parse_expected_data(self, expected_elements):
- data = []
- for row in expected_elements:
- values = list(row.values())
- for i, val in enumerate(values):
- if isinstance(val, Timestamp):
- # BigQuery matcher query returns a datetime.datetime object
- values[i] = val.to_utc_datetime().replace(
- tzinfo=datetime.timezone.utc)
- data.append(tuple(values))
-
- return data
-
- def storage_write_test(self, table_name, items, schema):
- table_id = '{}:{}.{}'.format(self.project, self.dataset_id, table_name)
-
- bq_matcher = BigqueryFullResultMatcher(
- project=self.project,
- query="SELECT * FROM %s" % '{}.{}'.format(self.dataset_id, table_name),
- data=self.parse_expected_data(items))
-
- with beam.Pipeline(argv=self.args) as p:
- _ = (
- p
- | beam.Create(items)
- | beam.io.WriteToBigQuery(
- table=table_id,
- method=beam.io.WriteToBigQuery.Method.STORAGE_WRITE_API,
- schema=schema,
- expansion_service=self.expansion_service))
- hamcrest_assert(p, bq_matcher)
-
- @pytest.mark.uses_gcp_java_expansion_service
- def test_storage_write_all_types(self):
- table_name = "python_storage_write_all_types"
- schema = (
- "int:INTEGER,float:FLOAT,numeric:NUMERIC,str:STRING,"
- "bool:BOOLEAN,bytes:BYTES,timestamp:TIMESTAMP")
- self.storage_write_test(table_name, self.ELEMENTS, schema)
-
- @pytest.mark.uses_gcp_java_expansion_service
- def test_storage_write_nested_records_and_lists(self):
- table_name = "python_storage_write_nested_records_and_lists"
- schema = {
- "fields": [{
- "name": "repeated_int", "type": "INTEGER", "mode": "REPEATED"
- },
- {
- "name": "struct",
- "type": "STRUCT",
- "fields": [{
- "name": "nested_int", "type": "INTEGER"
- }, {
- "name": "nested_str", "type": "STRING"
- }]
- },
- {
- "name": "repeated_struct",
- "type": "STRUCT",
- "mode": "REPEATED",
- "fields": [{
- "name": "nested_numeric", "type": "NUMERIC"
- }, {
- "name": "nested_bytes", "type": "BYTES"
- }]
- }]
- }
- items = [{
- "repeated_int": [1, 2, 3],
- "struct": {
- "nested_int": 1, "nested_str": "a"
- },
- "repeated_struct": [{
- "nested_numeric": Decimal("1.23"), "nested_bytes": b'a'
- },
- {
- "nested_numeric": Decimal("3.21"),
- "nested_bytes": b'aa'
- }]
- }]
-
- self.storage_write_test(table_name, items, schema)
-
- @pytest.mark.uses_gcp_java_expansion_service
- def test_storage_write_beam_rows(self):
- table_id = '{}:{}.python_xlang_storage_write_beam_rows'.format(
- self.project, self.dataset_id)
-
- row_elements = [
- beam.Row(
- my_int=e['int'],
- my_float=e['float'],
- my_numeric=e['numeric'],
- my_string=e['str'],
- my_bool=e['bool'],
- my_bytes=e['bytes'],
- my_timestamp=e['timestamp']) for e in self.ELEMENTS
- ]
-
- bq_matcher = BigqueryFullResultMatcher(
- project=self.project,
- query="SELECT * FROM %s" %
- '{}.python_xlang_storage_write_beam_rows'.format(self.dataset_id),
- data=self.parse_expected_data(self.ELEMENTS))
-
- with beam.Pipeline(argv=self.args) as p:
- _ = (
- p
- | beam.Create(row_elements)
- | beam.io.StorageWriteToBigQuery(
- table=table_id, expansion_service=self.expansion_service))
- hamcrest_assert(p, bq_matcher)
-
-
if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
unittest.main()