[ 
https://issues.apache.org/jira/browse/BEAM-1440?focusedWorklogId=335513&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-335513
 ]

ASF GitHub Bot logged work on BEAM-1440:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 29/Oct/19 13:11
            Start Date: 29/Oct/19 13:11
    Worklog Time Spent: 10m 
      Work Description: kamilwu commented on pull request #9772: [BEAM-1440] 
Create a BigQuery source that implements iobase.BoundedSource for Python
URL: https://github.com/apache/beam/pull/9772#discussion_r340063754
 
 

 ##########
 File path: sdks/python/apache_beam/io/gcp/bigquery.py
 ##########
 @@ -496,6 +505,233 @@ def reader(self, test_bigquery_client=None):
         kms_key=self.kms_key)
 
 
+SchemaFields = collections.namedtuple('SchemaFields', 'fields mode name type')
+
+
+def _to_bool(value):
+  return value == 'true'
+
+
+def _to_decimal(value):
+  return decimal.Decimal(value)
+
+
+def _to_bytes(value):
+  """Converts value from str to bytes on Python 3.x. Does nothing on
+  Python 2.7."""
+  return value.encode('utf-8')
+
+
+class _BigQueryRowCoder(coders.Coder):
+  """A coder for a table row (represented as a dict) from a JSON string which
+  applies additional conversions.
+  """
+
+  def __init__(self, table_schema):
+    # bigquery.TableSchema type is unpicklable so we must translate it to a
+    # picklable type
+    self.fields = [SchemaFields(x.fields, x.mode, x.name, x.type)
+                   for x in table_schema.fields]
+    self._converters = {
+        'INTEGER': int,
+        'INT64': int,
+        'FLOAT': float,
+        'BOOLEAN': _to_bool,
+        'NUMERIC': _to_decimal,
+        'BYTES': _to_bytes,
+    }
+
+  def decode(self, value):
+    value = json.loads(value)
+    for field in self.fields:
+      if field.name not in value:
+        # The field exists in the schema, but it doesn't exist in this row.
+        # It probably means its value was null, as the extract to JSON job
+        # doesn't preserve null fields
+        value[field.name] = None
+        continue
+
+      try:
+        converter = self._converters[field.type]
+        value[field.name] = converter(value[field.name])
+      except KeyError:
+        # No need to do any conversion
+        pass
+    return value
+
+  def is_deterministic(self):
+    return True
+
+  def to_type_hint(self):
+    return dict
+
+
+class _BigQuerySource(BoundedSource):
+  """Read data from BigQuery.
+
+    This source uses a BigQuery export job to take a snapshot of the table
+    on GCS, and then reads from each produced JSON file.
+
+    Do note that currently this source does not work with DirectRunner.
+
+  Args:
+    table (str, callable, ValueProvider): The ID of the table, or a callable
+      that returns it. The ID must contain only letters ``a-z``, ``A-Z``,
+      numbers ``0-9``, or underscores ``_``. If dataset argument is
+      :data:`None` then the table argument must contain the entire table
+      reference specified as: ``'DATASET.TABLE'``
+      or ``'PROJECT:DATASET.TABLE'``. If it's a callable, it must receive one
+      argument representing an element to be written to BigQuery, and return
+      a TableReference, or a string table name as specified above.
+    dataset (str): The ID of the dataset containing this table or
+      :data:`None` if the table reference is specified entirely by the table
+      argument.
+    project (str): The ID of the project containing this table.
+    query (str): A query to be used instead of arguments table, dataset, and
+      project.
+    validate (bool): If :data:`True`, various checks will be done when source
+      gets initialized (e.g., is table present?). This should be
+      :data:`True` for most scenarios in order to catch errors as early as
+      possible (pipeline construction instead of pipeline execution). It
+      should be :data:`False` if the table is created during pipeline
+      execution by a previous step.
+    coder (~apache_beam.coders.coders.Coder): The coder for the table
+      rows. If :data:`None`, then the default coder is
+      :class:`~apache_beam.io.gcp.bigquery._BigQueryRowCoder`,
+      which will interpret every line in a file as a JSON serialized
+      dictionary. This argument needs a value only in special cases when
+      returning table rows as dictionaries is not desirable.
+    use_standard_sql (bool): Specifies whether to use BigQuery's standard SQL
+      dialect for this query. The default value is :data:`False`.
+      If set to :data:`True`, the query will use BigQuery's updated SQL
+      dialect with improved standards compliance.
+      This parameter is ignored for table inputs.
+    flatten_results (bool): Flattens all nested and repeated fields in the
+      query results. The default value is :data:`True`.
+    kms_key (str): Experimental. Optional Cloud KMS key name for use when
+      creating new tables.
+    gcs_bucket_name (str): The name of the Google Cloud Storage bucket where
+      the extracted table should be written.
+  """
+  def __init__(self, table=None, dataset=None, project=None, query=None,
+               validate=False, coder=None, use_standard_sql=False,
+               flatten_results=True, kms_key=None, gcs_bucket_name=None):
+    if gcs_bucket_name is None:
+      raise ValueError('The name of the GCS bucket must be specified')
+    self.gcs_bucket_name = gcs_bucket_name
+
+    if table is not None and query is not None:
+      raise ValueError('Both a BigQuery table and a query were specified.'
+                       ' Please specify only one of these.')
+    elif table is None and query is None:
+      raise ValueError('A BigQuery table or a query must be specified')
+    elif table is not None:
+      self.table_reference = bigquery_tools.parse_table_reference(
+          table, dataset, project)
+      self.query = None
+      self.use_legacy_sql = True
+    else:
+      self.query = query
+      # TODO(BEAM-1082): Change the internal flag to be standard_sql
+      self.use_legacy_sql = not use_standard_sql
+      self.table_reference = None
+
+    self.project = project
+    self.validate = validate
+    self.flatten_results = flatten_results
+    self.coder = coder or _BigQueryRowCoder
+    self.kms_key = kms_key
 
 Review comment:
   Thanks for spotting this. I will write a fix so that the KMS key is used 
when creating a temporary dataset. There are no other places where the KMS key 
should be used, right?
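
For context on the hunk under review, the row conversion performed by `_BigQueryRowCoder.decode` can be tried outside Beam. The following is a minimal sketch, not the PR's implementation: `Field`, `CONVERTERS`, `decode_row` and `SCHEMA` are illustrative names, the sample row is made up, and it assumes (as the converters in the diff suggest) that the export job writes integers, booleans and numerics as JSON strings and omits null fields. It uses `dict.get` instead of the `try`/`except KeyError` in the diff, which is a small variation.

```python
import collections
import decimal
import json

# Illustrative stand-in for the PR's SchemaFields; only name and type are needed.
Field = collections.namedtuple('Field', 'name type')

# Same type-to-converter mapping as in the diff, written as plain callables.
CONVERTERS = {
    'INTEGER': int,
    'INT64': int,
    'FLOAT': float,
    'BOOLEAN': lambda v: v == 'true',
    'NUMERIC': decimal.Decimal,
    'BYTES': lambda v: v.encode('utf-8'),
}


def decode_row(line, fields):
    """Parse one exported JSON line and convert values by schema type."""
    row = json.loads(line)
    for field in fields:
        if field.name not in row:
            # The field is in the schema but not in the row: treat it as null.
            row[field.name] = None
            continue
        converter = CONVERTERS.get(field.type)
        if converter is not None:
            row[field.name] = converter(row[field.name])
    return row


# Hypothetical schema and row, for illustration only.
SCHEMA = [Field('id', 'INTEGER'), Field('active', 'BOOLEAN'),
          Field('price', 'NUMERIC'), Field('note', 'STRING')]

row = decode_row('{"id": "42", "active": "true", "price": "9.99"}', SCHEMA)
print(row)
```

Here `note` is absent from the input line, so it comes back as `None`, while `id`, `active` and `price` are converted to `int`, `bool` and `decimal.Decimal`.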
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 335513)
    Time Spent: 7h  (was: 6h 50m)

> Create a BigQuery source (that implements iobase.BoundedSource) for Python SDK
> ------------------------------------------------------------------------------
>
>                 Key: BEAM-1440
>                 URL: https://issues.apache.org/jira/browse/BEAM-1440
>             Project: Beam
>          Issue Type: New Feature
>          Components: sdk-py-core
>            Reporter: Chamikara Madhusanka Jayalath
>            Assignee: Kamil Wasilewski
>            Priority: Major
>          Time Spent: 7h
>  Remaining Estimate: 0h
>
> Currently we have a BigQuery native source for the Python SDK [1].
> This can only be used by the Dataflow runner.
> We should implement a Beam BigQuery source that implements the 
> iobase.BoundedSource [2] interface so that other runners that try to use the 
> Python SDK can read from BigQuery as well. The Java SDK already has a Beam 
> BigQuery source [3].
> [1] 
> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/gcp/bigquery.py
> [2] 
> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/iobase.py#L70
> [3] 
> https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java#L1189



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
