pabloem commented on a change in pull request #11582:
URL: https://github.com/apache/beam/pull/11582#discussion_r421825128



##########
File path: sdks/python/apache_beam/io/gcp/bigquery.py
##########
@@ -1641,3 +1644,314 @@ def process(self, unused_element, signal):
                 *self._args,
                 **self._kwargs))
         | _PassThroughThenCleanup(RemoveJsonFiles(gcs_location)))
+
+
+class _ExtractBQData(DoFn):
+  """DoFn that maps a ReadAllFromBigQueryRequest to the files it exports.
+
+  Fetches the requested BigQuery data into temporary GCS storage and yields
+  the path of each file created there.
+  """
+  def __init__(
+      self,
+      gcs_location_pattern=None,
+      project=None,
+      coder=None,
+      schema=None,
+      kms_key=None):
+
+    self.gcs_location_pattern = gcs_location_pattern
+    self.project = project
+    self.coder = coder or _JsonToDictCoder
+    self.kms_key = kms_key
+    self.split_result = None
+    self.schema = schema
+    self.target_schema = None
+
+  def process(self, element):
+    """Exports the data described by one request to temporary storage.
+
+    :param element: a ReadAllFromBigQueryRequest describing the table or
+      query to read.
+    :return: yields the path of each exported file; the GCS location to
+      delete afterwards is emitted on the 'location_to_cleanup' tagged
+      output.
+    """
+    element.validate()
+    if element.table is not None:
+      table_reference = bigquery_tools.parse_table_reference(element.table)
+      query = None
+      use_legacy_sql = True
+    else:
+      query = element.query
+      use_legacy_sql = element.use_legacy_sql
+
+    flatten_results = element.flatten_results
+
+    bq = bigquery_tools.BigQueryWrapper()
+
+    try:
+      if query is not None:
+        self._setup_temporary_dataset(bq, query, use_legacy_sql)
+        table_reference = self._execute_query(
+            bq, query, use_legacy_sql, flatten_results)
+
+      gcs_location = self.gcs_location_pattern.format(uuid.uuid4().hex)
+
+      table_schema = bq.get_table(
+          table_reference.projectId,
+          table_reference.datasetId,
+          table_reference.tableId).schema
+
+      if self.target_schema is None:
+        self.target_schema = bigquery_tools.parse_table_schema_from_json(
+            json.dumps(self.schema))
+
+      if self.target_schema != table_schema:
+        raise ValueError((
+            "Schema generated by reading from BQ doesn't match expected "
+            "schema.\nExpected: {}\nActual: {}").format(
+                self.target_schema, table_schema))
+
+      metadata_list = self._export_files(bq, table_reference, gcs_location)
+
+      yield pvalue.TaggedOutput('location_to_cleanup', gcs_location)
+      for metadata in metadata_list:
+        yield metadata.path
+
+    finally:
+      if query is not None:
+        bq.clean_up_temporary_dataset(self.project)
+
+  def _setup_temporary_dataset(self, bq, query, use_legacy_sql):
+    location = bq.get_query_location(self.project, query, use_legacy_sql)
+    bq.create_temporary_dataset(self.project, location)
+
+  def _execute_query(self, bq, query, use_legacy_sql, flatten_results):
+    job = bq._start_query_job(
+        self.project,
+        query,
+        use_legacy_sql,
+        flatten_results,
+        job_id=uuid.uuid4().hex,
+        kms_key=self.kms_key)
+    job_ref = job.jobReference
+    bq.wait_for_bq_job(job_ref)
+    return bq._get_temp_table(self.project)
+
+  def _export_files(self, bq, table_reference, gcs_location):
+    """Runs a BigQuery export job.
+
+    Returns:
+      a list of FileMetadata instances
+    """
+    job_id = uuid.uuid4().hex
+    job_ref = bq.perform_extract_job([gcs_location],
+                                     job_id,
+                                     table_reference,
+                                     bigquery_tools.FileFormat.JSON,
+                                     include_header=False)
+    bq.wait_for_bq_job(job_ref)
+    metadata_list = FileSystems.match([gcs_location])[0].metadata_list
+
+    return metadata_list
+
+
+class _PassThroughThenCleanupWithSI(PTransform):
+  """A PTransform that invokes a cleanup DoFn after the input PCollection
+  has been processed, passing the DoFn an extra side input.
+
+  The cleanup DoFn should have the signature
+  (element, side_input, cleanup_signal).
+
+  The cleanup_signal side input only becomes ready once the main input has
+  been fully processed, which is what delays the DoFn until then.
+  """
+  def __init__(self, cleanup_dofn, side_input):
+    self.cleanup_dofn = cleanup_dofn
+    self.side_input = side_input
+
+  def expand(self, input):
+    class PassThrough(beam.DoFn):
+      def process(self, element):
+        yield element
+
+    main_output, cleanup_signal = input | beam.ParDo(
+        PassThrough()).with_outputs(
+            'cleanup_signal', main='main')
+
+    _ = (
+        input.pipeline
+        | beam.Create([None])
+        | beam.ParDo(
+            self.cleanup_dofn,
+            self.side_input,
+            beam.pvalue.AsSingleton(cleanup_signal)))
+
+    return main_output
+
+
+class ReadAllFromBigQueryRequest:
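
A rough usage sketch of the request class, for context. The constructor
arguments and the ReadAllFromBigQuery wrapper transform are assumptions
inferred from the fields _ExtractBQData reads (table, query, use_legacy_sql),
not a released API:

    import apache_beam as beam

    with beam.Pipeline() as p:
      read_requests = p | beam.Create([
          # One request per table or query to read (hypothetical arguments).
          ReadAllFromBigQueryRequest(table='myproject:mydataset.mytable'),
          ReadAllFromBigQueryRequest(
              query='SELECT name FROM mydataset.mytable',
              use_legacy_sql=False),
      ])
      # Assumed wrapper that applies _ExtractBQData and then reads the
      # exported JSON files back as dictionaries; not shown in this hunk.
      rows = read_requests | ReadAllFromBigQuery()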

Review comment:
       checked with Robert, and he thought the class was fine
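
As an aside, here is a minimal, self-contained sketch of the
pass-through-then-cleanup pattern that _PassThroughThenCleanupWithSI
implements above. The _DeleteLocation DoFn and the literal paths are
hypothetical stand-ins, not part of this PR; the key point is that the
cleanup ParDo consumes the empty 'cleanup_signal' output as a side input,
so it can only run after the main input has been fully processed:

    import apache_beam as beam

    class _DeleteLocation(beam.DoFn):
      # Hypothetical cleanup DoFn with the (element, side_input,
      # cleanup_signal) signature that _PassThroughThenCleanupWithSI expects.
      def process(self, unused_element, location, unused_signal):
        # A real implementation would call FileSystems.delete() here.
        print('would delete %s' % location)

    with beam.Pipeline() as p:
      main = p | 'Rows' >> beam.Create([{'name': 'a'}, {'name': 'b'}])
      location = p | 'Loc' >> beam.Create(['gs://bucket/tmp/export-path'])
      # Rows pass through unchanged; _DeleteLocation fires once the empty
      # cleanup_signal side input becomes ready, i.e. after all rows passed.
      rows = main | _PassThroughThenCleanupWithSI(
          _DeleteLocation(), beam.pvalue.AsSingleton(location))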




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]
