[ 
https://issues.apache.org/jira/browse/BEAM-1440?focusedWorklogId=352667&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-352667
 ]

ASF GitHub Bot logged work on BEAM-1440:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 03/Dec/19 14:42
            Start Date: 03/Dec/19 14:42
    Worklog Time Spent: 10m 
      Work Description: kamilwu commented on pull request #9772: [BEAM-1440] 
Create a BigQuery source that implements iobase.BoundedSource for Python
URL: https://github.com/apache/beam/pull/9772#discussion_r353217656
 
 

 ##########
 File path: sdks/python/apache_beam/io/gcp/bigquery.py
 ##########
 @@ -499,6 +509,189 @@ def reader(self, test_bigquery_client=None):
         kms_key=self.kms_key)
 
 
+FieldSchema = collections.namedtuple('FieldSchema', 'fields mode name type')
+
+
+def _to_bool(value):
+  return value == 'true'
+
+
+def _to_decimal(value):
+  return decimal.Decimal(value)
+
+
+def _to_bytes(value):
+  """Converts value from str to bytes on Python 3.x. Does nothing on
+  Python 2.7."""
+  return value.encode('utf-8')
+
+
+class _JsonToDictCoder(coders.Coder):
+  """A coder for a JSON string to a Python dict."""
+
+  def __init__(self, table_schema):
+    self.fields = self._convert_to_tuple(table_schema.fields)
+    self._converters = {
+        'INTEGER': int,
+        'INT64': int,
+        'FLOAT': float,
+        'BOOLEAN': _to_bool,
+        'NUMERIC': _to_decimal,
+        'BYTES': _to_bytes,
+    }
+
+  @classmethod
+  def _convert_to_tuple(cls, table_field_schemas):
+    """Recursively converts the list of TableFieldSchema instances to the
+    list of tuples to prevent errors when pickling and unpickling
+    TableFieldSchema instances.
+    """
+    if not table_field_schemas:
+      return []
+
+    return [FieldSchema(cls._convert_to_tuple(x.fields), x.mode, x.name,
+                        x.type)
+            for x in table_field_schemas]
+
+  def decode(self, value):
+    value = json.loads(value)
+    return self._decode_with_schema(value, self.fields)
+
+  def _decode_with_schema(self, value, schema_fields):
+    for field in schema_fields:
+      if field.name not in value:
+        # The field exists in the schema, but it doesn't exist in this row.
+        # It probably means its value was null, as the extract to JSON job
+        # doesn't preserve null fields
+        value[field.name] = None
+        continue
+
+      if field.type == 'RECORD':
+        value[field.name] = self._decode_with_schema(value[field.name],
+                                                     field.fields)
+      else:
+        try:
+          converter = self._converters[field.type]
+          value[field.name] = converter(value[field.name])
+        except KeyError:
 
 Review comment:
   Yes. I wanted to match the behavior we already have when using the
default coder for the native BigQuerySource. There is a
[test](https://github.com/apache/beam/blob/03f780c7329e0eca692baef44874056b7d263303/sdks/python/apache_beam/io/gcp/bigquery_read_it_test.py#L232)
 that checks BigQuery data type conversions across both sources.
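
 For illustration only, here is a minimal standalone sketch of the
schema-driven conversion discussed in this comment. It is not the code from
the pull request: `decode_row` and `CONVERTERS` are hypothetical names, the
input values are assumed to arrive as strings, and REPEATED fields are not
handled. The hunk above is cut off at `except KeyError:`, so leaving types
without a converter (such as STRING) unchanged is an assumption.

```python
import collections
import decimal
import json

# Stand-in for the FieldSchema tuple shown in the diff.
FieldSchema = collections.namedtuple('FieldSchema', 'fields mode name type')

# Hypothetical converter table mirroring the one in the diff.
CONVERTERS = {
    'INTEGER': int,
    'INT64': int,
    'FLOAT': float,
    'BOOLEAN': lambda v: v == 'true',
    'NUMERIC': decimal.Decimal,
    'BYTES': lambda v: v.encode('utf-8'),
}


def decode_row(row, schema_fields):
  """Converts the values of one parsed JSON row in place and returns it."""
  for field in schema_fields:
    if field.name not in row:
      # Field is in the schema but absent from the row: treat it as null.
      row[field.name] = None
      continue
    if field.type == 'RECORD':
      # Nested record: recurse with the nested field list.
      row[field.name] = decode_row(row[field.name], field.fields)
    else:
      # Assumption: types without a converter pass through unchanged.
      converter = CONVERTERS.get(field.type)
      if converter is not None:
        row[field.name] = converter(row[field.name])
  return row


schema = [
    FieldSchema([], 'NULLABLE', 'id', 'INTEGER'),
    FieldSchema([], 'NULLABLE', 'active', 'BOOLEAN'),
    FieldSchema([], 'NULLABLE', 'price', 'NUMERIC'),
    FieldSchema([], 'NULLABLE', 'note', 'STRING'),
    FieldSchema([FieldSchema([], 'NULLABLE', 'score', 'FLOAT')],
                'NULLABLE', 'stats', 'RECORD'),
]
line = '{"id": "42", "active": "true", "price": "1.50", "stats": {"score": "0.5"}}'
decoded = decode_row(json.loads(line), schema)
```

 In this sketch the missing `note` field comes back as `None`, which mirrors
the null handling described in the code comment in the hunk.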
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 352667)
    Time Spent: 12h 20m  (was: 12h 10m)

> Create a BigQuery source (that implements iobase.BoundedSource) for Python SDK
> ------------------------------------------------------------------------------
>
>                 Key: BEAM-1440
>                 URL: https://issues.apache.org/jira/browse/BEAM-1440
>             Project: Beam
>          Issue Type: New Feature
>          Components: sdk-py-core
>            Reporter: Chamikara Madhusanka Jayalath
>            Assignee: Kamil Wasilewski
>            Priority: Major
>          Time Spent: 12h 20m
>  Remaining Estimate: 0h
>
> Currently we have a native BigQuery source for the Python SDK [1].
> It can only be used by the Dataflow runner.
> We should implement a Beam BigQuery source that implements the
> iobase.BoundedSource [2] interface so that other runners that use the
> Python SDK can read from BigQuery as well. The Java SDK already has a Beam
> BigQuery source [3].
> [1] 
> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/gcp/bigquery.py
> [2] 
> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/iobase.py#L70
> [3] 
> https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java#L1189



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
