Ardagan commented on a change in pull request #11582:
URL: https://github.com/apache/beam/pull/11582#discussion_r422903477
##########
File path: sdks/python/apache_beam/io/gcp/bigquery.py
##########
@@ -1641,3 +1644,314 @@ def process(self, unused_element, signal):
*self._args,
**self._kwargs))
| _PassThroughThenCleanup(RemoveJsonFiles(gcs_location)))
+
+
+class _ExtractBQData(DoFn):
+ '''
+ PTransform:ReadAllFromBigQueryRequest->FileMetadata that fetches BQ data into
+ a temporary storage and returns metadata for created files.
+ '''
+ def __init__(
+ self,
+ gcs_location_pattern=None,
+ project=None,
+ coder=None,
+ schema=None,
+ kms_key=None):
+
+ self.gcs_location_pattern = gcs_location_pattern
+ self.project = project
+ self.coder = coder or _JsonToDictCoder
+ self.kms_key = kms_key
+ self.split_result = None
+ self.schema = schema
+ self.target_schema = None
+
+ def process(self, element):
+ '''
+ :param element(ReadAllFromBigQueryRequest):
+ :return:
+ '''
+ element.validate()
+ if element.table is not None:
+ table_reference = bigquery_tools.parse_table_reference(element.table)
+ query = None
+ use_legacy_sql = True
+ else:
+ query = element.query
+ use_legacy_sql = element.use_legacy_sql
+
+ flatten_results = element.flatten_results
+
+ bq = bigquery_tools.BigQueryWrapper()
+
+ try:
+ if element.query is not None:
+ self._setup_temporary_dataset(bq, query, use_legacy_sql)
+ table_reference = self._execute_query(
+ bq, query, use_legacy_sql, flatten_results)
+
+ gcs_location = self.gcs_location_pattern.format(uuid.uuid4().hex)
+
+ table_schema = bq.get_table(
+ table_reference.projectId,
+ table_reference.datasetId,
+ table_reference.tableId).schema
+
+ if self.target_schema is None:
+ self.target_schema = bigquery_tools.parse_table_schema_from_json(
+ json.dumps(self.schema))
+
+ if not self.target_schema == table_schema:
Review comment:
We need the schema to parse the files generated by the BQ export. I also use it
to verify that all queries return the same data format.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org