[ 
https://issues.apache.org/jira/browse/BEAM-1440?focusedWorklogId=358350&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-358350
 ]

ASF GitHub Bot logged work on BEAM-1440:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 12/Dec/19 06:20
            Start Date: 12/Dec/19 06:20
    Worklog Time Spent: 10m 
      Work Description: chamikaramj commented on pull request #9772: 
[BEAM-1440] Create a BigQuery source that implements iobase.BoundedSource for 
Python
URL: https://github.com/apache/beam/pull/9772#discussion_r356974540
 
 

 ##########
 File path: sdks/python/apache_beam/io/gcp/bigquery.py
 ##########
 @@ -1274,3 +1463,139 @@ def display_data(self):
                                    tableSpec)
       res['table'] = DisplayDataItem(tableSpec, label='Table')
     return res
+
+
+class _PassThroughThenCleanup(PTransform):
+  """A PTransform that invokes a DoFn after the input PCollection has been
+    processed.
+  """
+  def __init__(self, cleanup_dofn):
+    self.cleanup_dofn = cleanup_dofn
+
+  def expand(self, input):
+    class PassThrough(beam.DoFn):
+      def process(self, element):
+        yield element
+
+    output = input | beam.ParDo(PassThrough()).with_outputs('cleanup_signal',
+                                                            main='main')
+    main_output = output['main']
+    cleanup_signal = output['cleanup_signal']
+
+    _ = (input.pipeline
+         | beam.Create([None])
+         | beam.ParDo(self.cleanup_dofn, beam.pvalue.AsSingleton(
+             cleanup_signal)))
+
+    return main_output
+
+
+@experimental()
+class _ReadFromBigQuery(PTransform):
+  """Read data from BigQuery.
+
+    This PTransform uses a BigQuery export job to take a snapshot of the table
+    on GCS, and then reads from each produced JSON file.
+
+    Do note that currently this source does not work with DirectRunner.
+
+  Args:
+    table (str, callable, ValueProvider): The ID of the table, or a callable
+      that returns it. The ID must contain only letters ``a-z``, ``A-Z``,
+      numbers ``0-9``, or underscores ``_``. If dataset argument is
+      :data:`None` then the table argument must contain the entire table
+      reference specified as: ``'DATASET.TABLE'``
+      or ``'PROJECT:DATASET.TABLE'``. If it's a callable, it must receive one
+      argument representing an element to be written to BigQuery, and return
+      a TableReference, or a string table name as specified above.
+    dataset (str): The ID of the dataset containing this table or
+      :data:`None` if the table reference is specified entirely by the table
+      argument.
+    project (str): The ID of the project containing this table.
+    query (str): A query to be used instead of arguments table, dataset, and
+      project.
+    validate (bool): If :data:`True`, various checks will be done when source
+      gets initialized (e.g., is table present?). This should be
+      :data:`True` for most scenarios in order to catch errors as early as
+      possible (pipeline construction instead of pipeline execution). It
+      should be :data:`False` if the table is created during pipeline
+      execution by a previous step.
+    coder (~apache_beam.coders.coders.Coder): The coder for the table
+      rows. If :data:`None`, then the default coder is
+      _JsonToDictCoder, which will interpret every row as a JSON
+      serialized dictionary.
+    use_standard_sql (bool): Specifies whether to use BigQuery's standard SQL
+      dialect for this query. The default value is :data:`False`.
+      If set to :data:`True`, the query will use BigQuery's updated SQL
+      dialect with improved standards compliance.
+      This parameter is ignored for table inputs.
+    flatten_results (bool): Flattens all nested and repeated fields in the
+      query results. The default value is :data:`True`.
+    kms_key (str): Experimental. Optional Cloud KMS key name for use when
+      creating new temporary tables.
+    gcs_location (str): The name of the Google Cloud Storage bucket where
+      the extracted table should be written as a string or
+      a :class:`~apache_beam.options.value_provider.ValueProvider`. If
+      :data:`None`, then the temp_location parameter is used.
+   """
+  def __init__(self, gcs_location=None, validate=False, *args, **kwargs):
+    if gcs_location:
+      if not isinstance(gcs_location, (str, unicode, ValueProvider)):
+        raise TypeError('%s: gcs_location must be of type string'
+                        ' or ValueProvider; got %r instead'
+                        % (self.__class__.__name__, type(gcs_location)))
+
+      if isinstance(gcs_location, (str, unicode)):
+        gcs_location = StaticValueProvider(str, gcs_location)
+    self.gcs_location = gcs_location
+    self.validate = validate
+
+    self._args = args
+    self._kwargs = kwargs
+
+  def _get_destination_uri(self, temp_location):
+    """Returns the fully qualified Google Cloud Storage URI where the
+    extracted table should be written.
+    """
+    file_pattern = 'bigquery-table-dump-*.json'
+
+    if self.gcs_location is not None:
+      gcs_base = self.gcs_location.get()
+    elif temp_location is not None:
+      gcs_base = temp_location
+      logging.debug("gcs_location is empty, using temp_location instead")
+    else:
+      raise ValueError('{} requires a GCS location to be provided'
+                       .format(self.__class__.__name__))
+    if self.validate:
+      self._validate_gcs_location(gcs_base)
+
+    job_id = uuid.uuid4().hex
+    return FileSystems.join(gcs_base, job_id, file_pattern)
+
+  @staticmethod
+  def _validate_gcs_location(gcs_location):
+    if not gcs_location.startswith('gs://'):
+      raise ValueError('Invalid GCS location: {}'.format(gcs_location))
+
+  def expand(self, pcoll):
+    class RemoveJsonFiles(beam.DoFn):
+      def __init__(self, gcs_location):
+        self._gcs_location = gcs_location
+
+      def process(self, unused_element, signal):
+        match_result = FileSystems.match([self._gcs_location])[0].metadata_list
+        logging.debug("%s: matched %s files", self.__class__.__name__,
+                      len(match_result))
+        paths = [x.path for x in match_result]
+        FileSystems.delete(paths)
+
+    temp_location = pcoll.pipeline.options.view_as(
+        GoogleCloudOptions).temp_location
+    gcs_location = self._get_destination_uri(temp_location)
+
+    return (pcoll
+            | beam.io.Read(_CustomBigQuerySource(gcs_location=gcs_location,
 
 Review comment:
   Quick question. Have we considered workiem failures and retries ? We have to 
be careful to not to loose data in such cases. For example, what will happen if 
one of the workitems is failed and retried after deleting some files in 
_PassThroughThenCleanup step ? 
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 358350)
    Time Spent: 18h 20m  (was: 18h 10m)

> Create a BigQuery source (that implements iobase.BoundedSource) for Python SDK
> ------------------------------------------------------------------------------
>
>                 Key: BEAM-1440
>                 URL: https://issues.apache.org/jira/browse/BEAM-1440
>             Project: Beam
>          Issue Type: New Feature
>          Components: sdk-py-core
>            Reporter: Chamikara Madhusanka Jayalath
>            Assignee: Kamil Wasilewski
>            Priority: Major
>          Time Spent: 18h 20m
>  Remaining Estimate: 0h
>
> Currently we have a BigQuery native source for Python SDK [1].
> This can only be used by Dataflow runner.
> We should  implement a Beam BigQuery source that implements 
> iobase.BoundedSource [2] interface so that other runners that try to use 
> Python SDK can read from BigQuery as well. Java SDK already has a Beam 
> BigQuery source [3].
> [1] 
> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/gcp/bigquery.py
> [2] 
> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/iobase.py#L70
> [3] 
> https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java#L1189



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to