[ 
https://issues.apache.org/jira/browse/BEAM-9650?focusedWorklogId=432686&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-432686
 ]

ASF GitHub Bot logged work on BEAM-9650:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 11/May/20 09:22
            Start Date: 11/May/20 09:22
    Worklog Time Spent: 10m 
      Work Description: Ardagan commented on a change in pull request #11582:
URL: https://github.com/apache/beam/pull/11582#discussion_r422903477



##########
File path: sdks/python/apache_beam/io/gcp/bigquery.py
##########
@@ -1641,3 +1644,314 @@ def process(self, unused_element, signal):
                 *self._args,
                 **self._kwargs))
         | _PassThroughThenCleanup(RemoveJsonFiles(gcs_location)))
+
+
+class _ExtractBQData(DoFn):
+  '''
+  PTransform:ReadAllFromBigQueryRequest->FileMetadata that fetches BQ data into
+  a temporary storage and returns metadata for created files.
+  '''
+  def __init__(
+      self,
+      gcs_location_pattern=None,
+      project=None,
+      coder=None,
+      schema=None,
+      kms_key=None):
+
+    self.gcs_location_pattern = gcs_location_pattern
+    self.project = project
+    self.coder = coder or _JsonToDictCoder
+    self.kms_key = kms_key
+    self.split_result = None
+    self.schema = schema
+    self.target_schema = None
+
+  def process(self, element):
+    '''
+    :param element(ReadAllFromBigQueryRequest):
+    :return:
+    '''
+    element.validate()
+    if element.table is not None:
+      table_reference = bigquery_tools.parse_table_reference(element.table)
+      query = None
+      use_legacy_sql = True
+    else:
+      query = element.query
+      use_legacy_sql = element.use_legacy_sql
+
+    flatten_results = element.flatten_results
+
+    bq = bigquery_tools.BigQueryWrapper()
+
+    try:
+      if element.query is not None:
+        self._setup_temporary_dataset(bq, query, use_legacy_sql)
+        table_reference = self._execute_query(
+            bq, query, use_legacy_sql, flatten_results)
+
+      gcs_location = self.gcs_location_pattern.format(uuid.uuid4().hex)
+
+      table_schema = bq.get_table(
+          table_reference.projectId,
+          table_reference.datasetId,
+          table_reference.tableId).schema
+
+      if self.target_schema is None:
+        self.target_schema = bigquery_tools.parse_table_schema_from_json(
+            json.dumps(self.schema))
+
+      if not self.target_schema == table_schema:

Review comment:
       We need the schema to parse the files generated by the BQ export. I 
also use it to verify that all queries return the same data format.
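
For illustration only, the kind of check described above could be sketched as follows. This is not the code from the pull request: `normalize_fields` and `check_schema` are hypothetical helper names, and schemas are assumed to be plain lists of field dicts in the BigQuery JSON layout (`name`, `type`, optional `mode`, optional nested `fields`). The sketch treats field order as significant and does not resolve type aliases such as INTEGER vs INT64.

```python
from typing import Any, Dict, List, Tuple

# Assumption: a schema is a list of dicts shaped like BigQuery's JSON schema.
Field = Dict[str, Any]


def normalize_fields(fields: List[Field]) -> Tuple:
    """Reduce a list of field dicts to a nested tuple that can be compared."""
    normalized = []
    for field in fields:
        normalized.append((
            field["name"],
            field["type"].upper(),
            # A field without an explicit mode is NULLABLE in BigQuery.
            field.get("mode", "NULLABLE").upper(),
            # RECORD fields carry nested fields; compare them recursively.
            normalize_fields(field.get("fields", [])),
        ))
    return tuple(normalized)


def check_schema(expected: List[Field], actual: List[Field]) -> None:
    """Raise ValueError if a query result does not match the expected schema."""
    if normalize_fields(expected) != normalize_fields(actual):
        raise ValueError(
            "Schema mismatch: expected %r, got %r" % (expected, actual))


if __name__ == "__main__":
    expected = [
        {"name": "id", "type": "INTEGER", "mode": "REQUIRED"},
        {"name": "label", "type": "STRING"},
    ]
    same = [
        {"name": "id", "type": "integer", "mode": "required"},
        {"name": "label", "type": "STRING", "mode": "NULLABLE"},
    ]
    check_schema(expected, same)  # passes silently
    try:
        check_schema(expected, [{"name": "id", "type": "STRING"}])
    except ValueError as err:
        print(err)
```

Comparing a normalized form rather than the raw objects keeps the check independent of letter case and of whether the default mode is spelled out.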






Issue Time Tracking
-------------------

    Worklog Id:     (was: 432686)
    Time Spent: 11h 40m  (was: 11.5h)

> Add consistent slowly changing side inputs support
> --------------------------------------------------
>
>                 Key: BEAM-9650
>                 URL: https://issues.apache.org/jira/browse/BEAM-9650
>             Project: Beam
>          Issue Type: Bug
>          Components: io-ideas
>            Reporter: Mikhail Gryzykhin
>            Assignee: Mikhail Gryzykhin
>            Priority: Major
>          Time Spent: 11h 40m
>  Remaining Estimate: 0h
>
> Add implementation for slowly changing dimensions based on [design 
> doc](https://docs.google.com/document/d/1LDY_CtsOJ8Y_zNv1QtkP6AGFrtzkj1q5EW_gSChOIvg/edit)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
