This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new cead81f  [BEAM-7530] Add it test to read None values from BigQuery (#8875)
cead81f is described below

commit cead81f4b0e4851c4e72ff6d48eeb27e4f0a5bcc
Author: Juta Staes <juta.st...@gmail.com>
AuthorDate: Wed Jul 17 20:49:17 2019 +0200

    [BEAM-7530] Add it test to read None values from BigQuery (#8875)
    
    * [BEAM-7530] Add it test to read None values from BigQuery
    
    * fixup! merge tests
    
    * fixup! reuse equal function from testing.util
---
 .../apache_beam/io/gcp/bigquery_read_it_test.py    | 73 +++++++++++++++-------
 .../apache_beam/io/gcp/bigquery_write_it_test.py   | 59 +++++++++++------
 .../apache_beam/io/gcp/tests/bigquery_matcher.py   |  8 ++-
 3 files changed, 97 insertions(+), 43 deletions(-)
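
The heart of the change is one small pattern, repeated in both IT tests: start
from a fully populated row, then derive one extra row per column that carries
only that column's value, so every column is exercised as NULL alongside every
other. A minimal sketch of the pattern (plain dict.items() stands in for the
future.utils.iteritems the tests use for Python 2 compatibility; the columns
shown are just an illustrative subset):

    full_row = {'float': 0.33, 'date': '3000-12-31', 'geo': 'POINT(30 10)'}

    rows = [full_row]
    # One extra row per column, holding only that column's value. BigQuery
    # stores every omitted column as NULL, which reads back into Python as None.
    for key, value in full_row.items():
        rows.append({key: value})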

diff --git a/sdks/python/apache_beam/io/gcp/bigquery_read_it_test.py b/sdks/python/apache_beam/io/gcp/bigquery_read_it_test.py
index ad50b88..246d2ce 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_read_it_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_read_it_test.py
@@ -25,7 +25,9 @@ import logging
 import random
 import time
 import unittest
+from decimal import Decimal
 
+from future.utils import iteritems
 from nose.plugins.attrib import attr
 
 import apache_beam as beam
@@ -103,6 +105,14 @@ class BigQueryReadIntegrationTests(unittest.TestCase):
   def create_table_new_types(self, table_name):
     table_schema = bigquery.TableSchema()
     table_field = bigquery.TableFieldSchema()
+    table_field.name = 'float'
+    table_field.type = 'FLOAT'
+    table_schema.fields.append(table_field)
+    table_field = bigquery.TableFieldSchema()
+    table_field.name = 'numeric'
+    table_field.type = 'NUMERIC'
+    table_schema.fields.append(table_field)
+    table_field = bigquery.TableFieldSchema()
     table_field.name = 'bytes'
     table_field.type = 'BYTES'
     table_schema.fields.append(table_field)
@@ -114,6 +124,18 @@ class BigQueryReadIntegrationTests(unittest.TestCase):
     table_field.name = 'time'
     table_field.type = 'TIME'
     table_schema.fields.append(table_field)
+    table_field = bigquery.TableFieldSchema()
+    table_field.name = 'datetime'
+    table_field.type = 'DATETIME'
+    table_schema.fields.append(table_field)
+    table_field = bigquery.TableFieldSchema()
+    table_field.name = 'timestamp'
+    table_field.type = 'TIMESTAMP'
+    table_schema.fields.append(table_field)
+    table_field = bigquery.TableFieldSchema()
+    table_field.name = 'geo'
+    table_field.type = 'GEOGRAPHY'
+    table_schema.fields.append(table_field)
     table = bigquery.Table(
         tableReference=bigquery.TableReference(
             projectId=self.project,
@@ -123,16 +145,18 @@ class BigQueryReadIntegrationTests(unittest.TestCase):
     request = bigquery.BigqueryTablesInsertRequest(
         projectId=self.project, datasetId=self.dataset_id, table=table)
     self.bigquery_client.client.tables.Insert(request)
-    table_data = [
-        {'bytes': b'xyw', 'date': '2011-01-01', 'time': '23:59:59.999999'},
-        {'bytes': b'abc', 'date': '2000-01-01', 'time': '00:00:00'},
-        {'bytes': b'\xe4\xbd\xa0\xe5\xa5\xbd', 'date': '3000-12-31',
-         'time': '23:59:59'},
-        {'bytes': b'\xab\xac\xad', 'date': '2000-01-01', 'time': '00:00:00'}
-    ]
-    # bigquery client expects base64 encoded bytes
-    for row in table_data:
-      row['bytes'] = base64.b64encode(row['bytes']).decode('utf-8')
+    row_data = {
+        'float': 0.33, 'numeric': Decimal('10'), 'bytes':
+        base64.b64encode(b'\xab\xac').decode('utf-8'), 'date': '3000-12-31',
+        'time': '23:59:59', 'datetime': '2018-12-31T12:44:31',
+        'timestamp': '2018-12-31 12:44:31.744957 UTC', 'geo': 'POINT(30 10)'
+    }
+
+    table_data = [row_data]
+    # add rows with only one key-value pair and None values for all other keys
+    for key, value in iteritems(row_data):
+      table_data.append({key: value})
+
     self.bigquery_client.insert_rows(
         self.project, self.dataset_id, table_name, table_data)
 
@@ -155,26 +179,31 @@ class BigQueryReadIntegrationTests(unittest.TestCase):
 
   @attr('IT')
   def test_big_query_read_new_types(self):
-    table_name = 'python_new_types_table'
+    table_name = 'python_new_types'
     self.create_table_new_types(table_name)
     table_id = '{}.{}'.format(self.dataset_id, table_name)
 
     args = self.test_pipeline.get_full_options_as_args()
 
-    expected_data = [
-        {'bytes': b'xyw', 'date': '2011-01-01', 'time': '23:59:59.999999'},
-        {'bytes': b'abc', 'date': '2000-01-01', 'time': '00:00:00'},
-        {'bytes': b'\xe4\xbd\xa0\xe5\xa5\xbd', 'date': '3000-12-31',
-         'time': '23:59:59'},
-        {'bytes': b'\xab\xac\xad', 'date': '2000-01-01', 'time': '00:00:00'}
-    ]
-    # bigquery io returns bytes as base64 encoded values
-    for row in expected_data:
-      row['bytes'] = base64.b64encode(row['bytes'])
+    expected_row = {
+        'float': 0.33, 'numeric': Decimal('10'), 'bytes':
+        base64.b64encode(b'\xab\xac'), 'date': '3000-12-31',
+        'time': '23:59:59', 'datetime': '2018-12-31T12:44:31',
+        'timestamp': '2018-12-31 12:44:31.744957 UTC', 'geo': 'POINT(30 10)'
+    }
+
+    expected_data = [expected_row]
+
+    # add rows with only one key-value pair and None values for all other keys
+    for key, value in iteritems(expected_row):
+      row = {k: None for k in expected_row}
+      row[key] = value
+      expected_data.append(row)
 
     with beam.Pipeline(argv=args) as p:
       result = (p | 'read' >> beam.io.Read(beam.io.BigQuerySource(
-          query='SELECT bytes, date, time FROM `%s`' % table_id,
+          query='SELECT float, numeric, bytes, date, time, datetime,'
+                'timestamp, geo FROM `%s`' % table_id,
           use_standard_sql=True)))
       assert_that(result, equal_to(expected_data))
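
Worth noting in the expectations above: BYTES are base64-encoded on both legs,
but not as the same type. The tables insert API is fed a base64 string (hence
the .decode('utf-8') in create_table_new_types), while the read connector hands
base64 bytes back to the pipeline. A quick sketch of that round trip, using
only the standard library:

    import base64

    raw = b'\xab\xac'
    written = base64.b64encode(raw).decode('utf-8')  # str, as inserted above
    read_back = base64.b64encode(raw)                # bytes, as expected above
    assert written.encode('utf-8') == read_back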
 
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py b/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py
index 2cb563e..3658b9c 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py
@@ -26,8 +26,11 @@ import logging
 import random
 import time
 import unittest
+from decimal import Decimal
 
 import hamcrest as hc
+import pytz
+from future.utils import iteritems
 from nose.plugins.attrib import attr
 
 import apache_beam as beam
@@ -168,34 +171,50 @@ class BigQueryWriteIntegrationTests(unittest.TestCase):
     table_name = 'python_new_types_table'
     table_id = '{}.{}'.format(self.dataset_id, table_name)
 
-    input_data = [
-        {'bytes': b'xyw', 'date': '2011-01-01', 'time': '23:59:59.999999'},
-        {'bytes': b'abc', 'date': '2000-01-01', 'time': '00:00:00'},
-        {'bytes': b'\xe4\xbd\xa0\xe5\xa5\xbd', 'date': '3000-12-31',
-         'time': '23:59:59'},
-        {'bytes': b'\xab\xac\xad', 'date': '2000-01-01', 'time': '00:00:00'}
-    ]
-    # bigquery io expects bytes to be base64 encoded values
-    for row in input_data:
-      row['bytes'] = base64.b64encode(row['bytes'])
+    row_data = {
+        'float': 0.33, 'numeric': Decimal('10'), 'bytes':
+        base64.b64encode(b'\xab\xac').decode('utf-8'), 'date': '3000-12-31',
+        'time': '23:59:59', 'datetime': '2018-12-31T12:44:31',
+        'timestamp': '2018-12-31 12:44:31.744957 UTC', 'geo': 'POINT(30 10)'
+    }
+
+    input_data = [row_data]
+    # add rows with only one key-value pair and None values for all other keys
+    for key, value in iteritems(row_data):
+      input_data.append({key: value})
 
     table_schema = {"fields": [
+        {"name": "float", "type": "FLOAT"},
+        {"name": "numeric", "type": "NUMERIC"},
         {"name": "bytes", "type": "BYTES"},
         {"name": "date", "type": "DATE"},
-        {"name": "time", "type": "TIME"}]}
+        {"name": "time", "type": "TIME"},
+        {"name": "datetime", "type": "DATETIME"},
+        {"name": "timestamp", "type": "TIMESTAMP"},
+        {"name": "geo", "type": "GEOGRAPHY"}
+    ]}
+
+    expected_row = (0.33, Decimal('10'), b'\xab\xac',
+                    datetime.date(3000, 12, 31), datetime.time(23, 59, 59),
+                    datetime.datetime(2018, 12, 31, 12, 44, 31),
+                    datetime.datetime(2018, 12, 31, 12, 44, 31, 744957,
+                                      tzinfo=pytz.utc), 'POINT(30 10)',
+                   )
+
+    expected_data = [expected_row]
+
+    # add rows with only one key value pair and None values for all other keys
+    for i, value in enumerate(expected_row):
+      row = [None]*len(expected_row)
+      row[i] = value
+      expected_data.append(tuple(row))
 
     pipeline_verifiers = [
         BigqueryFullResultMatcher(
             project=self.project,
-            query="SELECT bytes, date, time FROM %s" % table_id,
-            data=[(b'xyw', datetime.date(2011, 1, 1),
-                   datetime.time(23, 59, 59, 999999), ),
-                  (b'abc', datetime.date(2000, 1, 1),
-                   datetime.time(0, 0, 0), ),
-                  (b'\xe4\xbd\xa0\xe5\xa5\xbd', datetime.date(3000, 12, 31),
-                   datetime.time(23, 59, 59), ),
-                  (b'\xab\xac\xad', datetime.date(2000, 1, 1),
-                   datetime.time(0, 0, 0), )])]
+            query='SELECT float, numeric, bytes, date, time, datetime,'
+                  'timestamp, geo FROM %s' % table_id,
+            data=expected_data)]
 
     args = self.test_pipeline.get_full_options_as_args(
         on_success_matcher=hc.all_of(*pipeline_verifiers))
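
The expected tuple above also pins down how the two time-like columns come
back: DATETIME carries no zone and is compared against a naive datetime, while
TIMESTAMP names an absolute instant and is compared against an aware one,
which is why pytz.utc appears. A small sketch of the distinction:

    import datetime

    import pytz

    dt = datetime.datetime(2018, 12, 31, 12, 44, 31)            # DATETIME: naive
    ts = datetime.datetime(2018, 12, 31, 12, 44, 31, 744957,
                           tzinfo=pytz.utc)                     # TIMESTAMP: aware
    assert dt.tzinfo is None and ts.tzinfo is not None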
diff --git a/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher.py b/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher.py
index 6709d6a..b981f14 100644
--- a/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher.py
+++ b/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher.py
@@ -27,6 +27,8 @@ from hamcrest.core.base_matcher import BaseMatcher
 
 from apache_beam.io.gcp import bigquery_tools
 from apache_beam.testing.test_utils import compute_hash
+from apache_beam.testing.util import BeamAssertException
+from apache_beam.testing.util import equal_to
 from apache_beam.utils import retry
 
 __all__ = ['BigqueryMatcher', 'BigQueryTableMatcher']
@@ -145,7 +147,11 @@ class BigqueryFullResultMatcher(BaseMatcher):
     self.actual_data = response
 
     # Verify result
-    return sorted(self.expected_data) == sorted(self.actual_data)
+    try:
+      equal_to(self.expected_data)(self.actual_data)
+      return True
+    except BeamAssertException:
+      return False
 
   def _get_query_result(self, bigquery_client):
     return self._query_with_retry(bigquery_client)
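
The matcher change is what makes the None rows testable at all: under
Python 3, rows that mix None with real values cannot be ordered, so the old
sorted(expected) == sorted(actual) comparison could raise instead of returning
False. A sketch of the failure mode, standard library only:

    # Two result rows shaped like the ones the new tests produce.
    rows = [(None, 1), (2, None)]
    try:
        sorted(rows)  # tuple comparison eventually pits None against an int
    except TypeError:
        print('rows containing NULLs are unorderable in Python 3')

Delegating to equal_to from apache_beam.testing.util sidesteps this: it
compares the two collections without relying on a total order and reports a
mismatch by raising BeamAssertException, which the matcher maps back to the
boolean hamcrest expects.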
