lwyszomi commented on code in PR #24468:
URL: https://github.com/apache/airflow/pull/24468#discussion_r901361071


##########
airflow/providers/google/cloud/operators/bigquery.py:
##########
@@ -425,27 +437,45 @@ def execute(self, context: 'Context') -> list:
             gcp_conn_id=self.gcp_conn_id,
             delegate_to=self.delegate_to,
             impersonation_chain=self.impersonation_chain,
+            location=self.location,
+            use_legacy_sql=self.use_legacy_sql
         )
 
-        if not self.selected_fields:
-            schema: Dict[str, list] = hook.get_schema(
+        if not self.query:
+            if not self.selected_fields:
+                schema: Dict[str, list] = hook.get_schema(
+                    dataset_id=self.dataset_id,
+                    table_id=self.table_id,
+                )
+                if "fields" in schema:
+                    self.selected_fields = ','.join([field["name"] for field 
in schema["fields"]])
+
+            rows = hook.list_rows(
                 dataset_id=self.dataset_id,
                 table_id=self.table_id,
+                max_results=self.max_results,
+                selected_fields=self.selected_fields
             )
-            if "fields" in schema:
-                self.selected_fields = ','.join([field["name"] for field in 
schema["fields"]])
 
-        rows = hook.list_rows(
-            dataset_id=self.dataset_id,
-            table_id=self.table_id,
-            max_results=self.max_results,
-            selected_fields=self.selected_fields,
-            location=self.location,
-        )
+            if self.as_dict:
+                table_data = [json.dumps(dict(zip(self.selected_fields, 
row))).encode('utf-8') for row in rows]
+            else:
+                table_data = [row.values() for row in rows]
+    
+        else:
+
+            conn = hook.get_conn()
+            cursor = conn.cursor()
+            cursor.execute(self.query)
+            #if self.as_dict:
+            #    table_data = 
[json.dumps(dict(zip(self.selected_fields,row))).encode('utf-8') for row in 
cursor.fetchmany(self.max_results)]
+            # this doesn't work, we don't know the field names, however the 
base function "next" on row 2657 in the BigQueryHook collects the field names 
and then discards them.
+            table_data = [row for row in cursor.fetchmany(self.max_results)]
+
+            cursor.close()

Review Comment:
   Maybe we should consider extracting this into a new method inside the hook.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to