[ 
https://issues.apache.org/jira/browse/BEAM-5186?focusedWorklogId=137640&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-137640
 ]

ASF GitHub Bot logged work on BEAM-5186:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 24/Aug/18 02:16
            Start Date: 24/Aug/18 02:16
    Worklog Time Spent: 10m 
      Work Description: pabloem closed pull request #6255: [BEAM-5186] Adding 
numeric support to BQ Sink
URL: https://github.com/apache/beam/pull/6255
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py 
b/sdks/python/apache_beam/io/gcp/bigquery.py
index ee39a6678de..58db0957113 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery.py
@@ -98,9 +98,9 @@
 
 TableFieldSchema: Describes the schema (type, name) for one field.
   Has several attributes, including 'name' and 'type'. Common values for
-  the type attribute are: 'STRING', 'INTEGER', 'FLOAT', 'BOOLEAN'. All possible
-  values are described at:
-  https://cloud.google.com/bigquery/preparing-data-for-bigquery#datatypes
+  the type attribute are: 'STRING', 'INTEGER', 'FLOAT', 'BOOLEAN', 'NUMERIC'.
+  All possible values are described at:
+  https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types
 
 TableRow: Holds all values in a table row. Has one attribute, 'f', which is a
   list of TableCell instances.
@@ -108,12 +108,16 @@
 TableCell: Holds the value for one cell (or field).  Has one attribute,
   'v', which is a JsonValue instance. This class is defined in
   apitools.base.py.extra_types.py module.
+
+As of Beam 2.7.0, the NUMERIC data type is supported. This data type supports
+high-precision decimal numbers (precision of 38 digits, scale of 9 digits).
 """
 
 from __future__ import absolute_import
 
 import collections
 import datetime
+import decimal
 import json
 import logging
 import re
@@ -160,6 +164,13 @@
 MAX_RETRIES = 3
 
 
+def default_encoder(obj):
+  if isinstance(obj, decimal.Decimal):
+    return str(obj)
+  raise TypeError(
+      "Object of type '%s' is not JSON serializable" % type(obj).__name__)
+
+
 class RowAsDictJsonCoder(coders.Coder):
   """A coder for a table row (represented as a dict) to/from a JSON string.
 
@@ -173,7 +184,8 @@ def encode(self, table_row):
     # This code will catch this error to emit an error that explains
     # to the programmer that they have used NAN/INF values.
     try:
-      return json.dumps(table_row, allow_nan=False)
+      return json.dumps(
+          table_row, allow_nan=False, default=default_encoder)
     except ValueError as e:
       raise ValueError('%s. %s' % (e, JSON_COMPLIANCE_ERROR))
 
@@ -197,6 +209,7 @@ def __init__(self, table_schema=None):
     # Precompute field names since we need them for row encoding.
     if self.table_schema:
       self.field_names = tuple(fs.name for fs in self.table_schema.fields)
+      self.field_types = tuple(fs.type for fs in self.table_schema.fields)
 
   def encode(self, table_row):
     if self.table_schema is None:
@@ -208,7 +221,8 @@ def encode(self, table_row):
           collections.OrderedDict(
               zip(self.field_names,
                   [from_json_value(f.v) for f in table_row.f])),
-          allow_nan=False)
+          allow_nan=False,
+          default=default_encoder)
     except ValueError as e:
       raise ValueError('%s. %s' % (e, JSON_COMPLIANCE_ERROR))
 
@@ -1108,6 +1122,11 @@ def insert_rows(self, project_id, dataset_id, table_id, 
rows):
     for row in rows:
       json_object = bigquery.JsonObject()
       for k, v in iteritems(row):
+        if isinstance(v, decimal.Decimal):
+          # decimal values are converted into string because JSON does not
+          # support the precision that decimal supports. BQ is able to handle
+          # inserts into NUMERIC columns by receiving JSON with string attrs.
+          v = str(v)
         json_object.additionalProperties.append(
             bigquery.JsonObject.AdditionalProperty(
                 key=k, value=to_json_value(v)))
@@ -1156,6 +1175,8 @@ def _convert_cell_value_to_dict(self, value, field):
       # when querying, the repeated and/or record fields are flattened
       # unless we pass the flatten_results flag as False to the source
       return self.convert_row_to_dict(value, field)
+    elif field.type == 'NUMERIC':
+      return decimal.Decimal(value)
     else:
       raise RuntimeError('Unexpected field type: %s' % field.type)
 
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_test.py 
b/sdks/python/apache_beam/io/gcp/bigquery_test.py
index 843fc394ff1..b155244d64b 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_test.py
@@ -19,6 +19,7 @@
 from __future__ import absolute_import
 
 import datetime
+import decimal
 import json
 import logging
 import re
@@ -57,6 +58,21 @@ def test_row_as_dict(self):
     test_value = {'s': 'abc', 'i': 123, 'f': 123.456, 'b': True}
     self.assertEqual(test_value, coder.decode(coder.encode(test_value)))
 
+  def test_decimal_in_row_as_dict(self):
+    decimal_value = decimal.Decimal('123456789.987654321')
+    coder = RowAsDictJsonCoder()
+    # Bigquery IO uses decimals to represent NUMERIC types.
+    # To export to BQ, it's necessary to convert to strings, due to the
+    # lower precision of JSON numbers. This means that we can't recognize
+    # a NUMERIC when we decode from JSON, thus we match the string here.
+    test_value = {'f': 123.456,
+                  'b': True,
+                  'numerico': decimal_value}
+    output_value = {'f': 123.456,
+                    'b': True,
+                    'numerico': str(decimal_value)}
+    self.assertEqual(output_value, coder.decode(coder.encode(test_value)))
+
   def json_compliance_exception(self, value):
     with self.assertRaisesRegexp(ValueError, re.escape(JSON_COMPLIANCE_ERROR)):
       coder = RowAsDictJsonCoder()
@@ -82,20 +98,35 @@ def test_row_as_table_row(self):
         ('i', 'INTEGER'),
         ('f', 'FLOAT'),
         ('b', 'BOOLEAN'),
+        ('n', 'NUMERIC'),
         ('r', 'RECORD')]
-    data_defination = [
+    data_definition = [
         'abc',
         123,
         123.456,
         True,
+        decimal.Decimal('987654321.987654321'),
         {'a': 'b'}]
-    str_def = '{"s": "abc", "i": 123, "f": 123.456, "b": true, "r": {"a": 
"b"}}'
+    str_def = ('{"s": "abc", '
+               '"i": 123, '
+               '"f": 123.456, '
+               '"b": true, '
+               '"n": "987654321.987654321", '
+               '"r": {"a": "b"}}')
     schema = bigquery.TableSchema(
         fields=[bigquery.TableFieldSchema(name=k, type=v)
                 for k, v in schema_definition])
     coder = TableRowJsonCoder(table_schema=schema)
+
+    def value_or_decimal_to_json(val):
+      if isinstance(val, decimal.Decimal):
+        return to_json_value(str(val))
+      else:
+        return to_json_value(val)
+
     test_row = bigquery.TableRow(
-        f=[bigquery.TableCell(v=to_json_value(e)) for e in data_defination])
+        f=[bigquery.TableCell(v=value_or_decimal_to_json(e))
+           for e in data_definition])
 
     self.assertEqual(str_def, coder.encode(test_row))
     self.assertEqual(test_row, coder.decode(coder.encode(test_row)))


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 137640)
    Time Spent: 1h 50m  (was: 1h 40m)

> Support for NUMERIC data type in BQ
> -----------------------------------
>
>                 Key: BEAM-5186
>                 URL: https://issues.apache.org/jira/browse/BEAM-5186
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-py-core
>            Reporter: Pablo Estrada
>            Assignee: Pablo Estrada
>            Priority: Major
>          Time Spent: 1h 50m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to