vchiapaikeo commented on issue #12329:
URL: https://github.com/apache/airflow/issues/12329#issuecomment-1364326477

   Hi @eladkal , I took a look at this issue, and with [this commit](https://github.com/apache/airflow/pull/28284) from @VladaZakharova a couple of days ago it is mostly working as expected. One small nit keeps it from working perfectly: **the operator checks whether `self.autodetect` is falsy rather than whether it is explicitly set to `None`**. In fact, [the Job docs from Google](https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#jobconfigurationload) allude to it working this way:
   
   > schema: object (TableSchema) Optional. The schema for the destination 
table. The schema can be omitted if the destination table already exists, or if 
you're loading data from Google Cloud Datastore.
   > autodetect: boolean Optional. Indicates if we should automatically infer 
the options and schema for CSV and JSON sources.
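   
   The falsy-vs-`None` distinction behind that nit can be sketched in isolation (an illustrative snippet, not the operator's actual code):
   
   ```py
   # Illustrative only: how `not autodetect` and `autodetect is None`
   # behave for the three possible values of the parameter.
   for autodetect in (None, False, True):
       falsy = not autodetect      # True for both None and False
       unset = autodetect is None  # True only when explicitly unset
       print(f"{autodetect!r:5}  falsy-check={falsy}  none-check={unset}")
   ```
   
   With the falsy check, `autodetect=False` and `autodetect=None` collapse into the same branch, so an explicitly unset value never reaches BigQuery; checking `is None` keeps the two cases distinct.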
   
   
   Once I patch this with [PR 28564](https://github.com/apache/airflow/pull/28564), it works fine. To verify, I ran a simple DAG on my local setup:
   
   ```py
   from airflow import DAG
   
   from etsy.operators.gcs_to_bigquery import GCSToBigQueryOperator
   
   DEFAULT_TASK_ARGS = {
       "owner": "gcp-data-platform",
       "retries": 1,
       "retry_delay": 10,
       "start_date": "2022-08-01",
   }
   
   with DAG(
       max_active_runs=1,
       concurrency=2,
       catchup=False,
       schedule_interval="@daily",
       dag_id="test_os_patch_gcs_to_bigquery",
       default_args=DEFAULT_TASK_ARGS,
   ) as dag:
   
       test_gcs_to_bigquery = GCSToBigQueryOperator(
           task_id="test_gcs_to_bigquery",
           create_disposition="CREATE_IF_NEEDED",
           # Need to explicitly set autodetect to None
           autodetect=None,
           write_disposition="WRITE_TRUNCATE",
           destination_project_dataset_table="my-project.vchiapaikeo.test1",
           bucket="my-bucket",
           source_format="CSV",
           source_objects=["vchiapaikeo/file.csv"],
       )
   ```
   
   I then created a simple table in BigQuery:
   
   <img width="552" alt="image" 
src="https://user-images.githubusercontent.com/9200263/209403766-fe31c8ca-77e5-49dd-a970-28306cba9d05.png";>
   
   And ran the dag:
   
   <img width="1440" alt="image" 
src="https://user-images.githubusercontent.com/9200263/209404262-18a46b04-8ec2-4bb1-9e9f-203ecc63876b.png";>
   
   <img width="583" alt="image" 
src="https://user-images.githubusercontent.com/9200263/209403806-fc9f09df-2ce2-45de-a44b-b746c3b8ac39.png";>
   
   Task logs:
   
   ```
   [2022-12-23, 20:30:32 UTC] {taskinstance.py:1087} INFO - Dependencies all 
met for <TaskInstance: test_os_patch_gcs_to_bigquery.test_gcs_to_bigquery 
scheduled__2022-12-22T00:00:00+00:00 [queued]>
   [2022-12-23, 20:30:32 UTC] {taskinstance.py:1087} INFO - Dependencies all 
met for <TaskInstance: test_os_patch_gcs_to_bigquery.test_gcs_to_bigquery 
scheduled__2022-12-22T00:00:00+00:00 [queued]>
   [2022-12-23, 20:30:32 UTC] {taskinstance.py:1283} INFO - 
   
--------------------------------------------------------------------------------
   [2022-12-23, 20:30:32 UTC] {taskinstance.py:1284} INFO - Starting attempt 15 
of 16
   [2022-12-23, 20:30:32 UTC] {taskinstance.py:1285} INFO - 
   
--------------------------------------------------------------------------------
   [2022-12-23, 20:30:32 UTC] {taskinstance.py:1304} INFO - Executing 
<Task(GCSToBigQueryOperator): test_gcs_to_bigquery> on 2022-12-22 00:00:00+00:00
   [2022-12-23, 20:30:32 UTC] {standard_task_runner.py:55} INFO - Started 
process 5611 to run task
   [2022-12-23, 20:30:32 UTC] {standard_task_runner.py:82} INFO - Running: 
['airflow', 'tasks', 'run', 'test_os_patch_gcs_to_bigquery', 
'test_gcs_to_bigquery', 'scheduled__2022-12-22T00:00:00+00:00', '--job-id', 
'17', '--raw', '--subdir', 
'DAGS_FOLDER/dataeng/batch/test_os_patch_gcs_to_bigquery.py', '--cfg-path', 
'/tmp/tmpoxitwl1m']
   [2022-12-23, 20:30:32 UTC] {standard_task_runner.py:83} INFO - Job 17: 
Subtask test_gcs_to_bigquery
   [2022-12-23, 20:30:32 UTC] {task_command.py:389} INFO - Running 
<TaskInstance: test_os_patch_gcs_to_bigquery.test_gcs_to_bigquery 
scheduled__2022-12-22T00:00:00+00:00 [running]> on host f3b7042f4dc5
   [2022-12-23, 20:30:32 UTC] {taskinstance.py:1511} INFO - Exporting the 
following env vars:
   AIRFLOW_CTX_DAG_OWNER=gcp-data-platform
   AIRFLOW_CTX_DAG_ID=test_os_patch_gcs_to_bigquery
   AIRFLOW_CTX_TASK_ID=test_gcs_to_bigquery
   AIRFLOW_CTX_EXECUTION_DATE=2022-12-22T00:00:00+00:00
   AIRFLOW_CTX_TRY_NUMBER=15
   AIRFLOW_CTX_DAG_RUN_ID=scheduled__2022-12-22T00:00:00+00:00
   [2022-12-23, 20:30:32 UTC] {metastore.py:45} INFO - Default connection 
request. Checking conn_id google_cloud_gcp_data_platform
   [2022-12-23, 20:30:32 UTC] {connection.py:210} WARNING - Connection schemes 
(type: google_cloud_platform) shall not contain '_' according to RFC3986.
   [2022-12-23, 20:30:32 UTC] {crypto.py:83} WARNING - empty cryptography key - 
values will not be stored encrypted.
   [2022-12-23, 20:30:32 UTC] {base.py:73} INFO - Using connection ID 
'google_cloud_default' for task execution.
   [2022-12-23, 20:30:32 UTC] {gcs_to_bigquery.py:370} INFO - Using existing 
BigQuery table for storing data...
   [2022-12-23, 20:30:32 UTC] {credentials_provider.py:323} INFO - Getting 
connection using `google.auth.default()` since no key file is defined for hook.
   [2022-12-23, 20:30:34 UTC] {gcs_to_bigquery.py:374} INFO - Executing: 
{'load': {'autodetect': None, 'createDisposition': 'CREATE_IF_NEEDED', 
'destinationTable': {'projectId': 'my-project', 'datasetId': 'vchiapaikeo', 
'tableId': 'test1'}, 'sourceFormat': 'CSV', 'sourceUris': 
['gs://my-bucket/vchiapaikeo/file.csv'], 'writeDisposition': 'WRITE_TRUNCATE', 
'ignoreUnknownValues': False, 'skipLeadingRows': None, 'fieldDelimiter': ',', 
'quote': None, 'allowQuotedNewlines': False, 'encoding': 'UTF-8'}}
   [2022-12-23, 20:30:34 UTC] {bigquery.py:1539} INFO - Inserting job 
airflow_test_os_patch_gcs_to_bigquery_test_gcs_to_bigquery_2022_12_22T00_00_00_00_00_8c90b0141a25c185bab829d91cc9a474
   [2022-12-23, 20:30:37 UTC] {taskinstance.py:1322} INFO - Marking task as 
SUCCESS. dag_id=test_os_patch_gcs_to_bigquery, task_id=test_gcs_to_bigquery, 
execution_date=20221222T000000, start_date=20221223T203032, 
end_date=20221223T203037
   [2022-12-23, 20:30:37 UTC] {connection.py:210} WARNING - Connection schemes 
(type: datahub_rest) shall not contain '_' according to RFC3986.
   [2022-12-23, 20:30:40 UTC] {local_task_job.py:159} INFO - Task exited with 
return code 0
   [2022-12-23, 20:30:40 UTC] {taskinstance.py:2582} INFO - 0 downstream tasks 
scheduled from follow-on schedule check
   ```
   
   ^ some redundant log lines omitted
   
   https://github.com/apache/airflow/pull/28564

