This is an automated email from the ASF dual-hosted git repository.
pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new cead81f [BEAM-7530] Add it test to read None values from BigQuery
(#8875)
cead81f is described below
commit cead81f4b0e4851c4e72ff6d48eeb27e4f0a5bcc
Author: Juta Staes <[email protected]>
AuthorDate: Wed Jul 17 20:49:17 2019 +0200
[BEAM-7530] Add it test to read None values from BigQuery (#8875)
* [BEAM-7530] Add it test to read None values from BigQuery
* fixup! merge tests
* fixup! reuse equal function from testing.util
---
.../apache_beam/io/gcp/bigquery_read_it_test.py | 73 +++++++++++++++-------
.../apache_beam/io/gcp/bigquery_write_it_test.py | 59 +++++++++++------
.../apache_beam/io/gcp/tests/bigquery_matcher.py | 8 ++-
3 files changed, 97 insertions(+), 43 deletions(-)
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_read_it_test.py
b/sdks/python/apache_beam/io/gcp/bigquery_read_it_test.py
index ad50b88..246d2ce 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_read_it_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_read_it_test.py
@@ -25,7 +25,9 @@ import logging
import random
import time
import unittest
+from decimal import Decimal
+from future.utils import iteritems
from nose.plugins.attrib import attr
import apache_beam as beam
@@ -103,6 +105,14 @@ class BigQueryReadIntegrationTests(unittest.TestCase):
def create_table_new_types(self, table_name):
table_schema = bigquery.TableSchema()
table_field = bigquery.TableFieldSchema()
+ table_field.name = 'float'
+ table_field.type = 'FLOAT'
+ table_schema.fields.append(table_field)
+ table_field = bigquery.TableFieldSchema()
+ table_field.name = 'numeric'
+ table_field.type = 'NUMERIC'
+ table_schema.fields.append(table_field)
+ table_field = bigquery.TableFieldSchema()
table_field.name = 'bytes'
table_field.type = 'BYTES'
table_schema.fields.append(table_field)
@@ -114,6 +124,18 @@ class BigQueryReadIntegrationTests(unittest.TestCase):
table_field.name = 'time'
table_field.type = 'TIME'
table_schema.fields.append(table_field)
+ table_field = bigquery.TableFieldSchema()
+ table_field.name = 'datetime'
+ table_field.type = 'DATETIME'
+ table_schema.fields.append(table_field)
+ table_field = bigquery.TableFieldSchema()
+ table_field.name = 'timestamp'
+ table_field.type = 'TIMESTAMP'
+ table_schema.fields.append(table_field)
+ table_field = bigquery.TableFieldSchema()
+ table_field.name = 'geo'
+ table_field.type = 'GEOGRAPHY'
+ table_schema.fields.append(table_field)
table = bigquery.Table(
tableReference=bigquery.TableReference(
projectId=self.project,
@@ -123,16 +145,18 @@ class BigQueryReadIntegrationTests(unittest.TestCase):
request = bigquery.BigqueryTablesInsertRequest(
projectId=self.project, datasetId=self.dataset_id, table=table)
self.bigquery_client.client.tables.Insert(request)
- table_data = [
- {'bytes': b'xyw', 'date': '2011-01-01', 'time': '23:59:59.999999'},
- {'bytes': b'abc', 'date': '2000-01-01', 'time': '00:00:00'},
- {'bytes': b'\xe4\xbd\xa0\xe5\xa5\xbd', 'date': '3000-12-31',
- 'time': '23:59:59'},
- {'bytes': b'\xab\xac\xad', 'date': '2000-01-01', 'time': '00:00:00'}
- ]
- # bigquery client expects base64 encoded bytes
- for row in table_data:
- row['bytes'] = base64.b64encode(row['bytes']).decode('utf-8')
+ row_data = {
+ 'float': 0.33, 'numeric': Decimal('10'), 'bytes':
+ base64.b64encode(b'\xab\xac').decode('utf-8'), 'date': '3000-12-31',
+ 'time': '23:59:59', 'datetime': '2018-12-31T12:44:31',
+ 'timestamp': '2018-12-31 12:44:31.744957 UTC', 'geo': 'POINT(30 10)'
+ }
+
+ table_data = [row_data]
+ # add rows with only one key value pair and None values for all other keys
+ for key, value in iteritems(row_data):
+ table_data.append({key: value})
+
self.bigquery_client.insert_rows(
self.project, self.dataset_id, table_name, table_data)
@@ -155,26 +179,31 @@ class BigQueryReadIntegrationTests(unittest.TestCase):
@attr('IT')
def test_big_query_read_new_types(self):
- table_name = 'python_new_types_table'
+ table_name = 'python_new_types'
self.create_table_new_types(table_name)
table_id = '{}.{}'.format(self.dataset_id, table_name)
args = self.test_pipeline.get_full_options_as_args()
- expected_data = [
- {'bytes': b'xyw', 'date': '2011-01-01', 'time': '23:59:59.999999'},
- {'bytes': b'abc', 'date': '2000-01-01', 'time': '00:00:00'},
- {'bytes': b'\xe4\xbd\xa0\xe5\xa5\xbd', 'date': '3000-12-31',
- 'time': '23:59:59'},
- {'bytes': b'\xab\xac\xad', 'date': '2000-01-01', 'time': '00:00:00'}
- ]
- # bigquery io returns bytes as base64 encoded values
- for row in expected_data:
- row['bytes'] = base64.b64encode(row['bytes'])
+ expected_row = {
+ 'float': 0.33, 'numeric': Decimal('10'), 'bytes':
+ base64.b64encode(b'\xab\xac'), 'date': '3000-12-31',
+ 'time': '23:59:59', 'datetime': '2018-12-31T12:44:31',
+ 'timestamp': '2018-12-31 12:44:31.744957 UTC', 'geo': 'POINT(30 10)'
+ }
+
+ expected_data = [expected_row]
+
+ # add rows with only one key value pair and None values for all other keys
+ for key, value in iteritems(expected_row):
+ row = {k: None for k in expected_row}
+ row[key] = value
+ expected_data.append(row)
with beam.Pipeline(argv=args) as p:
result = (p | 'read' >> beam.io.Read(beam.io.BigQuerySource(
- query='SELECT bytes, date, time FROM `%s`' % table_id,
+ query='SELECT float, numeric, bytes, date, time, datetime,'
+ 'timestamp, geo FROM `%s`' % table_id,
use_standard_sql=True)))
assert_that(result, equal_to(expected_data))
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py
b/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py
index 2cb563e..3658b9c 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py
@@ -26,8 +26,11 @@ import logging
import random
import time
import unittest
+from decimal import Decimal
import hamcrest as hc
+import pytz
+from future.utils import iteritems
from nose.plugins.attrib import attr
import apache_beam as beam
@@ -168,34 +171,50 @@ class BigQueryWriteIntegrationTests(unittest.TestCase):
table_name = 'python_new_types_table'
table_id = '{}.{}'.format(self.dataset_id, table_name)
- input_data = [
- {'bytes': b'xyw', 'date': '2011-01-01', 'time': '23:59:59.999999'},
- {'bytes': b'abc', 'date': '2000-01-01', 'time': '00:00:00'},
- {'bytes': b'\xe4\xbd\xa0\xe5\xa5\xbd', 'date': '3000-12-31',
- 'time': '23:59:59'},
- {'bytes': b'\xab\xac\xad', 'date': '2000-01-01', 'time': '00:00:00'}
- ]
- # bigquery io expects bytes to be base64 encoded values
- for row in input_data:
- row['bytes'] = base64.b64encode(row['bytes'])
+ row_data = {
+ 'float': 0.33, 'numeric': Decimal('10'), 'bytes':
+ base64.b64encode(b'\xab\xac').decode('utf-8'), 'date': '3000-12-31',
+ 'time': '23:59:59', 'datetime': '2018-12-31T12:44:31',
+ 'timestamp': '2018-12-31 12:44:31.744957 UTC', 'geo': 'POINT(30 10)'
+ }
+
+ input_data = [row_data]
+ # add rows with only one key value pair and None values for all other keys
+ for key, value in iteritems(row_data):
+ input_data.append({key: value})
table_schema = {"fields": [
+ {"name": "float", "type": "FLOAT"},
+ {"name": "numeric", "type": "NUMERIC"},
{"name": "bytes", "type": "BYTES"},
{"name": "date", "type": "DATE"},
- {"name": "time", "type": "TIME"}]}
+ {"name": "time", "type": "TIME"},
+ {"name": "datetime", "type": "DATETIME"},
+ {"name": "timestamp", "type": "TIMESTAMP"},
+ {"name": "geo", "type": "GEOGRAPHY"}
+ ]}
+
+ expected_row = (0.33, Decimal('10'), b'\xab\xac',
+ datetime.date(3000, 12, 31), datetime.time(23, 59, 59),
+ datetime.datetime(2018, 12, 31, 12, 44, 31),
+ datetime.datetime(2018, 12, 31, 12, 44, 31, 744957,
+ tzinfo=pytz.utc), 'POINT(30 10)',
+ )
+
+ expected_data = [expected_row]
+
+ # add rows with only one key value pair and None values for all other keys
+ for i, value in enumerate(expected_row):
+ row = [None]*len(expected_row)
+ row[i] = value
+ expected_data.append(tuple(row))
pipeline_verifiers = [
BigqueryFullResultMatcher(
project=self.project,
- query="SELECT bytes, date, time FROM %s" % table_id,
- data=[(b'xyw', datetime.date(2011, 1, 1),
- datetime.time(23, 59, 59, 999999), ),
- (b'abc', datetime.date(2000, 1, 1),
- datetime.time(0, 0, 0), ),
- (b'\xe4\xbd\xa0\xe5\xa5\xbd', datetime.date(3000, 12, 31),
- datetime.time(23, 59, 59), ),
- (b'\xab\xac\xad', datetime.date(2000, 1, 1),
- datetime.time(0, 0, 0), )])]
+ query='SELECT float, numeric, bytes, date, time, datetime,'
+ 'timestamp, geo FROM %s' % table_id,
+ data=expected_data)]
args = self.test_pipeline.get_full_options_as_args(
on_success_matcher=hc.all_of(*pipeline_verifiers))
diff --git a/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher.py
b/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher.py
index 6709d6a..b981f14 100644
--- a/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher.py
+++ b/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher.py
@@ -27,6 +27,8 @@ from hamcrest.core.base_matcher import BaseMatcher
from apache_beam.io.gcp import bigquery_tools
from apache_beam.testing.test_utils import compute_hash
+from apache_beam.testing.util import BeamAssertException
+from apache_beam.testing.util import equal_to
from apache_beam.utils import retry
__all__ = ['BigqueryMatcher', 'BigQueryTableMatcher']
@@ -145,7 +147,11 @@ class BigqueryFullResultMatcher(BaseMatcher):
self.actual_data = response
# Verify result
- return sorted(self.expected_data) == sorted(self.actual_data)
+ try:
+ equal_to(self.expected_data)(self.actual_data)
+ return True
+ except BeamAssertException:
+ return False
def _get_query_result(self, bigquery_client):
return self._query_with_retry(bigquery_client)