vchiapaikeo commented on issue #12329: URL: https://github.com/apache/airflow/issues/12329#issuecomment-1364326477
Hi @eladkal, I took a look at this issue and it seems that, with [this commit](https://github.com/apache/airflow/pull/28284) from @VladaZakharova a couple of days ago, this is mostly working as expected. There is one small nit preventing it from working perfectly: **the operator checks whether `self.autodetect` is falsy rather than whether it is explicitly set to `None`**. In fact, [the Job docs from Google](https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#jobconfigurationload) allude to it working this way:

> schema: object (TableSchema)
> Optional. The schema for the destination table. The schema can be omitted if the destination table already exists, or if you're loading data from Google Cloud Datastore.

> autodetect: boolean
> Optional. Indicates if we should automatically infer the options and schema for CSV and JSON sources.

Once I patch this with [PR 28564](https://github.com/apache/airflow/pull/28564), it works fine. To verify, I tried this on my local setup with a simple DAG:

```py
from airflow import DAG

from etsy.operators.gcs_to_bigquery import GCSToBigQueryOperator

DEFAULT_TASK_ARGS = {
    "owner": "gcp-data-platform",
    "retries": 1,
    "retry_delay": 10,
    "start_date": "2022-08-01",
}

with DAG(
    max_active_runs=1,
    concurrency=2,
    catchup=False,
    schedule_interval="@daily",
    dag_id="test_os_patch_gcs_to_bigquery",
    default_args=DEFAULT_TASK_ARGS,
) as dag:
    test_gcs_to_bigquery = GCSToBigQueryOperator(
        task_id="test_gcs_to_bigquery",
        create_disposition="CREATE_IF_NEEDED",
        # Need to explicitly set autodetect to None
        autodetect=None,
        write_disposition="WRITE_TRUNCATE",
        destination_project_dataset_table="my-project.vchiapaikeo.test1",
        bucket="my-bucket",
        source_format="CSV",
        source_objects=["vchiapaikeo/file.csv"],
    )
```

I then created a simple table in BigQuery:

<img width="552" alt="image" src="https://user-images.githubusercontent.com/9200263/209403766-fe31c8ca-77e5-49dd-a970-28306cba9d05.png">

And ran the DAG:

<img width="1440" alt="image" src="https://user-images.githubusercontent.com/9200263/209404262-18a46b04-8ec2-4bb1-9e9f-203ecc63876b.png">

<img width="583" alt="image" src="https://user-images.githubusercontent.com/9200263/209403806-fc9f09df-2ce2-45de-a44b-b746c3b8ac39.png">

Task logs:

```
[2022-12-23, 20:30:32 UTC] {taskinstance.py:1087} INFO - Dependencies all met for <TaskInstance: test_os_patch_gcs_to_bigquery.test_gcs_to_bigquery scheduled__2022-12-22T00:00:00+00:00 [queued]>
[2022-12-23, 20:30:32 UTC] {taskinstance.py:1087} INFO - Dependencies all met for <TaskInstance: test_os_patch_gcs_to_bigquery.test_gcs_to_bigquery scheduled__2022-12-22T00:00:00+00:00 [queued]>
[2022-12-23, 20:30:32 UTC] {taskinstance.py:1283} INFO - --------------------------------------------------------------------------------
[2022-12-23, 20:30:32 UTC] {taskinstance.py:1284} INFO - Starting attempt 15 of 16
[2022-12-23, 20:30:32 UTC] {taskinstance.py:1285} INFO - --------------------------------------------------------------------------------
[2022-12-23, 20:30:32 UTC] {taskinstance.py:1304} INFO - Executing <Task(GCSToBigQueryOperator): test_gcs_to_bigquery> on 2022-12-22 00:00:00+00:00
[2022-12-23, 20:30:32 UTC] {standard_task_runner.py:55} INFO - Started process 5611 to run task
[2022-12-23, 20:30:32 UTC] {standard_task_runner.py:82} INFO - Running: ['airflow', 'tasks', 'run', 'test_os_patch_gcs_to_bigquery', 'test_gcs_to_bigquery', 'scheduled__2022-12-22T00:00:00+00:00', '--job-id', '17', '--raw', '--subdir', 'DAGS_FOLDER/dataeng/batch/test_os_patch_gcs_to_bigquery.py', '--cfg-path', '/tmp/tmpoxitwl1m']
[2022-12-23, 20:30:32 UTC] {standard_task_runner.py:83} INFO - Job 17: Subtask test_gcs_to_bigquery
[2022-12-23, 20:30:32 UTC] {task_command.py:389} INFO - Running <TaskInstance: test_os_patch_gcs_to_bigquery.test_gcs_to_bigquery scheduled__2022-12-22T00:00:00+00:00 [running]> on host f3b7042f4dc5
[2022-12-23, 20:30:32 UTC] {taskinstance.py:1511} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=gcp-data-platform
AIRFLOW_CTX_DAG_ID=test_os_patch_gcs_to_bigquery
AIRFLOW_CTX_TASK_ID=test_gcs_to_bigquery
AIRFLOW_CTX_EXECUTION_DATE=2022-12-22T00:00:00+00:00
AIRFLOW_CTX_TRY_NUMBER=15
AIRFLOW_CTX_DAG_RUN_ID=scheduled__2022-12-22T00:00:00+00:00
[2022-12-23, 20:30:32 UTC] {metastore.py:45} INFO - Default connection request. Checking conn_id google_cloud_gcp_data_platform
[2022-12-23, 20:30:32 UTC] {connection.py:210} WARNING - Connection schemes (type: google_cloud_platform) shall not contain '_' according to RFC3986.
[2022-12-23, 20:30:32 UTC] {crypto.py:83} WARNING - empty cryptography key - values will not be stored encrypted.
[2022-12-23, 20:30:32 UTC] {base.py:73} INFO - Using connection ID 'google_cloud_default' for task execution.
[2022-12-23, 20:30:32 UTC] {gcs_to_bigquery.py:370} INFO - Using existing BigQuery table for storing data...
[2022-12-23, 20:30:32 UTC] {credentials_provider.py:323} INFO - Getting connection using `google.auth.default()` since no key file is defined for hook.
[2022-12-23, 20:30:34 UTC] {gcs_to_bigquery.py:374} INFO - Executing: {'load': {'autodetect': None, 'createDisposition': 'CREATE_IF_NEEDED', 'destinationTable': {'projectId': 'my-project', 'datasetId': 'vchiapaikeo', 'tableId': 'test1'}, 'sourceFormat': 'CSV', 'sourceUris': ['gs://my-bucket/vchiapaikeo/file.csv'], 'writeDisposition': 'WRITE_TRUNCATE', 'ignoreUnknownValues': False, 'skipLeadingRows': None, 'fieldDelimiter': ',', 'quote': None, 'allowQuotedNewlines': False, 'encoding': 'UTF-8'}}
[2022-12-23, 20:30:34 UTC] {bigquery.py:1539} INFO - Inserting job airflow_test_os_patch_gcs_to_bigquery_test_gcs_to_bigquery_2022_12_22T00_00_00_00_00_8c90b0141a25c185bab829d91cc9a474
[2022-12-23, 20:30:37 UTC] {taskinstance.py:1322} INFO - Marking task as SUCCESS. dag_id=test_os_patch_gcs_to_bigquery, task_id=test_gcs_to_bigquery, execution_date=20221222T000000, start_date=20221223T203032, end_date=20221223T203037
[2022-12-23, 20:30:37 UTC] {connection.py:210} WARNING - Connection schemes (type: datahub_rest) shall not contain '_' according to RFC3986.
[2022-12-23, 20:30:40 UTC] {local_task_job.py:159} INFO - Task exited with return code 0
[2022-12-23, 20:30:40 UTC] {taskinstance.py:2582} INFO - 0 downstream tasks scheduled from follow-on schedule check
```

^ omitted some redundant log lines

https://github.com/apache/airflow/pull/28564
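As a side note, the falsy-vs-`None` distinction at the heart of the fix can be shown in a few lines of plain Python. This is a generic illustration, not the operator's actual code: an explicit `autodetect=False` and an unset `autodetect=None` look identical to a truthiness check, but an identity check against `None` separates them.

```python
# Both False and None are falsy, so a truthiness check like
# "if not self.autodetect" cannot tell an explicit opt-out (False)
# apart from "not provided" (None):
for value in (False, None):
    assert not value

# An identity check distinguishes the two cases, which is what lets
# autodetect=None mean "omit autodetect/schema and fall back to the
# existing destination table's schema":
assert False is not None
assert None is None
```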
