[ https://issues.apache.org/jira/browse/BEAM-5186?focusedWorklogId=137640&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-137640 ]
ASF GitHub Bot logged work on BEAM-5186: ---------------------------------------- Author: ASF GitHub Bot Created on: 24/Aug/18 02:16 Start Date: 24/Aug/18 02:16 Worklog Time Spent: 10m Work Description: pabloem closed pull request #6255: [BEAM-5186] Adding numeric support to BQ Sink URL: https://github.com/apache/beam/pull/6255 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py index ee39a6678de..58db0957113 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery.py +++ b/sdks/python/apache_beam/io/gcp/bigquery.py @@ -98,9 +98,9 @@ TableFieldSchema: Describes the schema (type, name) for one field. Has several attributes, including 'name' and 'type'. Common values for - the type attribute are: 'STRING', 'INTEGER', 'FLOAT', 'BOOLEAN'. All possible - values are described at: - https://cloud.google.com/bigquery/preparing-data-for-bigquery#datatypes + the type attribute are: 'STRING', 'INTEGER', 'FLOAT', 'BOOLEAN', 'NUMERIC'. + All possible values are described at: + https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types TableRow: Holds all values in a table row. Has one attribute, 'f', which is a list of TableCell instances. @@ -108,12 +108,16 @@ TableCell: Holds the value for one cell (or field). Has one attribute, 'v', which is a JsonValue instance. This class is defined in apitools.base.py.extra_types.py module. + +As of Beam 2.7.0, the NUMERIC data type is supported. This data type supports +high-precision decimal numbers (precision of 38 digits, scale of 9 digits). 
""" from __future__ import absolute_import import collections import datetime +import decimal import json import logging import re @@ -160,6 +164,13 @@ MAX_RETRIES = 3 +def default_encoder(obj): + if isinstance(obj, decimal.Decimal): + return str(obj) + raise TypeError( + "Object of type '%s' is not JSON serializable" % type(obj).__name__) + + class RowAsDictJsonCoder(coders.Coder): """A coder for a table row (represented as a dict) to/from a JSON string. @@ -173,7 +184,8 @@ def encode(self, table_row): # This code will catch this error to emit an error that explains # to the programmer that they have used NAN/INF values. try: - return json.dumps(table_row, allow_nan=False) + return json.dumps( + table_row, allow_nan=False, default=default_encoder) except ValueError as e: raise ValueError('%s. %s' % (e, JSON_COMPLIANCE_ERROR)) @@ -197,6 +209,7 @@ def __init__(self, table_schema=None): # Precompute field names since we need them for row encoding. if self.table_schema: self.field_names = tuple(fs.name for fs in self.table_schema.fields) + self.field_types = tuple(fs.type for fs in self.table_schema.fields) def encode(self, table_row): if self.table_schema is None: @@ -208,7 +221,8 @@ def encode(self, table_row): collections.OrderedDict( zip(self.field_names, [from_json_value(f.v) for f in table_row.f])), - allow_nan=False) + allow_nan=False, + default=default_encoder) except ValueError as e: raise ValueError('%s. %s' % (e, JSON_COMPLIANCE_ERROR)) @@ -1108,6 +1122,11 @@ def insert_rows(self, project_id, dataset_id, table_id, rows): for row in rows: json_object = bigquery.JsonObject() for k, v in iteritems(row): + if isinstance(v, decimal.Decimal): + # decimal values are converted into string because JSON does not + # support the precision that decimal supports. BQ is able to handle + # inserts into NUMERIC columns by receiving JSON with string attrs. 
+ v = str(v) json_object.additionalProperties.append( bigquery.JsonObject.AdditionalProperty( key=k, value=to_json_value(v))) @@ -1156,6 +1175,8 @@ def _convert_cell_value_to_dict(self, value, field): # when querying, the repeated and/or record fields are flattened # unless we pass the flatten_results flag as False to the source return self.convert_row_to_dict(value, field) + elif field.type == 'NUMERIC': + return decimal.Decimal(value) else: raise RuntimeError('Unexpected field type: %s' % field.type) diff --git a/sdks/python/apache_beam/io/gcp/bigquery_test.py b/sdks/python/apache_beam/io/gcp/bigquery_test.py index 843fc394ff1..b155244d64b 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_test.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_test.py @@ -19,6 +19,7 @@ from __future__ import absolute_import import datetime +import decimal import json import logging import re @@ -57,6 +58,21 @@ def test_row_as_dict(self): test_value = {'s': 'abc', 'i': 123, 'f': 123.456, 'b': True} self.assertEqual(test_value, coder.decode(coder.encode(test_value))) + def test_decimal_in_row_as_dict(self): + decimal_value = decimal.Decimal('123456789.987654321') + coder = RowAsDictJsonCoder() + # Bigquery IO uses decimals to represent NUMERIC types. + # To export to BQ, it's necessary to convert to strings, due to the + # lower precision of JSON numbers. This means that we can't recognize + # a NUMERIC when we decode from JSON, thus we match the string here. 
+ test_value = {'f': 123.456, + 'b': True, + 'numerico': decimal_value} + output_value = {'f': 123.456, + 'b': True, + 'numerico': str(decimal_value)} + self.assertEqual(output_value, coder.decode(coder.encode(test_value))) + def json_compliance_exception(self, value): with self.assertRaisesRegexp(ValueError, re.escape(JSON_COMPLIANCE_ERROR)): coder = RowAsDictJsonCoder() @@ -82,20 +98,35 @@ def test_row_as_table_row(self): ('i', 'INTEGER'), ('f', 'FLOAT'), ('b', 'BOOLEAN'), + ('n', 'NUMERIC'), ('r', 'RECORD')] - data_defination = [ + data_definition = [ 'abc', 123, 123.456, True, + decimal.Decimal('987654321.987654321'), {'a': 'b'}] - str_def = '{"s": "abc", "i": 123, "f": 123.456, "b": true, "r": {"a": "b"}}' + str_def = ('{"s": "abc", ' + '"i": 123, ' + '"f": 123.456, ' + '"b": true, ' + '"n": "987654321.987654321", ' + '"r": {"a": "b"}}') schema = bigquery.TableSchema( fields=[bigquery.TableFieldSchema(name=k, type=v) for k, v in schema_definition]) coder = TableRowJsonCoder(table_schema=schema) + + def value_or_decimal_to_json(val): + if isinstance(val, decimal.Decimal): + return to_json_value(str(val)) + else: + return to_json_value(val) + test_row = bigquery.TableRow( - f=[bigquery.TableCell(v=to_json_value(e)) for e in data_defination]) + f=[bigquery.TableCell(v=value_or_decimal_to_json(e)) + for e in data_definition]) self.assertEqual(str_def, coder.encode(test_row)) self.assertEqual(test_row, coder.decode(coder.encode(test_row))) ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Issue Time Tracking
-------------------

    Worklog Id: (was: 137640)
    Time Spent: 1h 50m (was: 1h 40m)

> Support for NUMERIC data type in BQ
> -----------------------------------
>
> Key: BEAM-5186
> URL: https://issues.apache.org/jira/browse/BEAM-5186
> Project: Beam
> Issue Type: Bug
> Components: sdk-py-core
> Reporter: Pablo Estrada
> Assignee: Pablo Estrada
> Priority: Major
> Time Spent: 1h 50m
> Remaining Estimate: 0h
>

--
This message was sent by Atlassian JIRA (v7.6.3#76005)