[ 
https://issues.apache.org/jira/browse/BEAM-1440?focusedWorklogId=360529&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-360529
 ]

ASF GitHub Bot logged work on BEAM-1440:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 16/Dec/19 22:26
            Start Date: 16/Dec/19 22:26
    Worklog Time Spent: 10m 
      Work Description: pabloem commented on pull request #9772: [BEAM-1440] 
Create a BigQuery source that implements iobase.BoundedSource for Python
URL: https://github.com/apache/beam/pull/9772#discussion_r358499063
 
 

 ##########
 File path: sdks/python/apache_beam/io/gcp/bigquery.py
 ##########
 @@ -1274,3 +1463,139 @@ def display_data(self):
                                    tableSpec)
       res['table'] = DisplayDataItem(tableSpec, label='Table')
     return res
+
+
+class _PassThroughThenCleanup(PTransform):
+  """A PTransform that invokes a DoFn after the input PCollection has been
+    processed.
+  """
+  def __init__(self, cleanup_dofn):
+    self.cleanup_dofn = cleanup_dofn
+
+  def expand(self, input):
+    class PassThrough(beam.DoFn):
+      def process(self, element):
+        yield element
+
+    output = input | beam.ParDo(PassThrough()).with_outputs('cleanup_signal',
+                                                            main='main')
+    main_output = output['main']
+    cleanup_signal = output['cleanup_signal']
+
+    _ = (input.pipeline
+         | beam.Create([None])
+         | beam.ParDo(self.cleanup_dofn, beam.pvalue.AsSingleton(
+             cleanup_signal)))
+
+    return main_output
+
+
+@experimental()
+class _ReadFromBigQuery(PTransform):
+  """Read data from BigQuery.
+
+    This PTransform uses a BigQuery export job to take a snapshot of the table
+    on GCS, and then reads from each of the produced JSON files.
+
+    Note that this source currently does not work with the DirectRunner.
+
+  Args:
+    table (str, callable, ValueProvider): The ID of the table, or a callable
+      that returns it. The ID must contain only letters ``a-z``, ``A-Z``,
+      numbers ``0-9``, or underscores ``_``. If the dataset argument is
+      :data:`None`, then the table argument must contain the entire table
+      reference, specified as ``'DATASET.TABLE'``
+      or ``'PROJECT:DATASET.TABLE'``. If it's a callable, it must receive one
+      argument representing an element to be written to BigQuery, and return
+      a TableReference, or a string table name as specified above.
+    dataset (str): The ID of the dataset containing this table or
+      :data:`None` if the table reference is specified entirely by the table
+      argument.
+    project (str): The ID of the project containing this table.
+    query (str): A query to be used instead of the table, dataset, and
+      project arguments.
+    validate (bool): If :data:`True`, various checks will be done when source
+      gets initialized (e.g., is table present?). This should be
+      :data:`True` for most scenarios in order to catch errors as early as
+      possible (pipeline construction instead of pipeline execution). It
+      should be :data:`False` if the table is created during pipeline
+      execution by a previous step.
+    coder (~apache_beam.coders.coders.Coder): The coder for the table
+      rows. If :data:`None`, then the default coder is
+      _JsonToDictCoder, which will interpret every row as a JSON
+      serialized dictionary.
+    use_standard_sql (bool): Specifies whether to use BigQuery's standard SQL
+      dialect for this query. The default value is :data:`False`.
+      If set to :data:`True`, the query will use BigQuery's updated SQL
+      dialect with improved standards compliance.
+      This parameter is ignored for table inputs.
+    flatten_results (bool): Flattens all nested and repeated fields in the
+      query results. The default value is :data:`True`.
+    kms_key (str): Experimental. Optional Cloud KMS key name for use when
+      creating new temporary tables.
+    gcs_location (str): The name of the Google Cloud Storage bucket where
+      the extracted table should be written, as a string or
+      a :class:`~apache_beam.options.value_provider.ValueProvider`. If
+      :data:`None`, then the temp_location parameter is used.
+   """
+  def __init__(self, gcs_location=None, validate=False, *args, **kwargs):
+    if gcs_location:
+      if not isinstance(gcs_location, (str, unicode, ValueProvider)):
+        raise TypeError('%s: gcs_location must be of type string'
+                        ' or ValueProvider; got %r instead'
+                        % (self.__class__.__name__, type(gcs_location)))
+
+      if isinstance(gcs_location, (str, unicode)):
+        gcs_location = StaticValueProvider(str, gcs_location)
+    self.gcs_location = gcs_location
+    self.validate = validate
+
+    self._args = args
+    self._kwargs = kwargs
+
+  def _get_destination_uri(self, temp_location):
+    """Returns the fully qualified Google Cloud Storage URI where the
+    extracted table should be written.
+    """
+    file_pattern = 'bigquery-table-dump-*.json'
+
+    if self.gcs_location is not None:
+      gcs_base = self.gcs_location.get()
+    elif temp_location is not None:
+      gcs_base = temp_location
+      logging.debug("gcs_location is empty, using temp_location instead")
+    else:
+      raise ValueError('{} requires a GCS location to be provided'
+                       .format(self.__class__.__name__))
+    if self.validate:
+      self._validate_gcs_location(gcs_base)
+
+    job_id = uuid.uuid4().hex
+    return FileSystems.join(gcs_base, job_id, file_pattern)
+
+  @staticmethod
+  def _validate_gcs_location(gcs_location):
+    if not gcs_location.startswith('gs://'):
+      raise ValueError('Invalid GCS location: {}'.format(gcs_location))
+
+  def expand(self, pcoll):
+    class RemoveJsonFiles(beam.DoFn):
+      def __init__(self, gcs_location):
+        self._gcs_location = gcs_location
+
+      def process(self, unused_element, signal):
+        match_result = FileSystems.match([self._gcs_location])[0].metadata_list
+        logging.debug("%s: matched %s files", self.__class__.__name__,
+                      len(match_result))
+        paths = [x.path for x in match_result]
+        FileSystems.delete(paths)
+
+    temp_location = pcoll.pipeline.options.view_as(
+        GoogleCloudOptions).temp_location
+    gcs_location = self._get_destination_uri(temp_location)
+
+    return (pcoll
+            | beam.io.Read(_CustomBigQuerySource(gcs_location=gcs_location,
 
 Review comment:
  The PassThroughThenCleanup step produces and consumes side inputs, which 
means that it happens as part of a separate stage. The stage boundary 'commits' 
the data from the files before deleting them.
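
  To make this point concrete, here is a minimal, self-contained sketch of the
pattern (illustrative only: the function and DoFn names are hypothetical, the
helper is a plain function rather than the PR's PTransform, and AsIter replaces
the PR's AsSingleton so the empty signal collection can be consumed on any
runner):

  # Sketch only: the cleanup DoFn consumes a side input derived from the
  # upstream stage's output, so a runner must materialize ("commit") that
  # output before the cleanup can run -- the stage boundary mentioned above.
  import logging

  import apache_beam as beam


  class _CleanupDoFn(beam.DoFn):
    """Hypothetical cleanup step; in the PR this deletes exported files."""
    def process(self, unused_element, signal):
      # `signal` stays empty, but because it is a side input the runner only
      # schedules this DoFn after the upstream stage's output is committed.
      logging.info('cleanup runs after the main output is committed (%d '
                   'signal elements)', len(list(signal)))


  class _PassThrough(beam.DoFn):
    def process(self, element):
      yield element  # only the main output is used; 'cleanup_signal' is empty


  def pass_through_then_cleanup(pcoll, cleanup_dofn):
    outputs = pcoll | beam.ParDo(_PassThrough()).with_outputs(
        'cleanup_signal', main='main')
    _ = (pcoll.pipeline
         | 'CleanupTrigger' >> beam.Create([None])
         | 'Cleanup' >> beam.ParDo(
             cleanup_dofn, beam.pvalue.AsIter(outputs['cleanup_signal'])))
    return outputs['main']


  if __name__ == '__main__':
    with beam.Pipeline() as p:
      rows = p | 'Rows' >> beam.Create([{'f': 1}, {'f': 2}])
      _ = pass_through_then_cleanup(rows, _CleanupDoFn())

  Because the cleanup ParDo takes the signal as a side input, it lands in a
downstream stage, so the main output is fully committed before any files are
removed.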
 
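  For reference, the transform documented in the quoted diff would be used
roughly like this (a sketch only: _ReadFromBigQuery is experimental and private
in this PR, and the project, dataset, table, and bucket names are
placeholders):

  import apache_beam as beam
  from apache_beam.io.gcp.bigquery import _ReadFromBigQuery
  from apache_beam.options.pipeline_options import PipelineOptions

  # Placeholder values; a real pipeline needs an existing project and bucket.
  options = PipelineOptions(
      project='my-project', temp_location='gs://my-bucket/tmp')

  with beam.Pipeline(options=options) as p:
    names = (p
             | 'Read' >> _ReadFromBigQuery(
                 query='SELECT name FROM `my-project.my_dataset.my_table`',
                 use_standard_sql=True,
                 gcs_location='gs://my-bucket/bq-export')
             | 'ExtractName' >> beam.Map(lambda row: row['name']))
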
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 360529)
    Time Spent: 18.5h  (was: 18h 20m)

> Create a BigQuery source (that implements iobase.BoundedSource) for Python SDK
> ------------------------------------------------------------------------------
>
>                 Key: BEAM-1440
>                 URL: https://issues.apache.org/jira/browse/BEAM-1440
>             Project: Beam
>          Issue Type: New Feature
>          Components: sdk-py-core
>            Reporter: Chamikara Madhusanka Jayalath
>            Assignee: Kamil Wasilewski
>            Priority: Major
>          Time Spent: 18.5h
>  Remaining Estimate: 0h
>
> Currently we have a BigQuery native source for the Python SDK [1].
> It can only be used by the Dataflow runner.
> We should implement a Beam BigQuery source that implements the
> iobase.BoundedSource [2] interface so that other runners that use the
> Python SDK can read from BigQuery as well. The Java SDK already has a Beam
> BigQuery source [3].
> [1] 
> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/gcp/bigquery.py
> [2] 
> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/iobase.py#L70
> [3] 
> https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java#L1189



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
