vchiapaikeo commented on PR #28564:
URL: https://github.com/apache/airflow/pull/28564#issuecomment-1364532548

   I also tested this DAG locally in Breeze after adding a 
`google_default_connection`, and things seemed to work fine as well:
   
   Dag:
   ```py
   from airflow import DAG
   
   from airflow.providers.google.cloud.transfers.gcs_to_bigquery import (
       GCSToBigQueryOperator,
   )
   
   DEFAULT_TASK_ARGS = {
       "owner": "gcp-data-platform",
       "retries": 1,
       "retry_delay": 10,
       "start_date": "2022-08-01",
   }
   
   with DAG(
       max_active_runs=1,
       concurrency=2,
       catchup=False,
       schedule_interval="@daily",
       dag_id="test_os_patch_gcs_to_bigquery",
       default_args=DEFAULT_TASK_ARGS,
   ) as dag:
   
       test_gcs_to_bigquery = GCSToBigQueryOperator(
           task_id="test_gcs_to_bigquery",
           create_disposition="CREATE_IF_NEEDED",
           # Need to explicitly set autodetect to None
           autodetect=None,
           write_disposition="WRITE_TRUNCATE",
           destination_project_dataset_table="my-project.vchiapaikeo.test1",
           bucket="my-bucket",
           source_format="CSV",
           source_objects=["vchiapaikeo/file.csv"],
       )
   ```
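   
   To make the connection between the operator arguments above and the `Executing:` configuration that shows up in the task logs below, here is a rough sketch of how those arguments map onto a BigQuery load configuration. This is an illustrative helper, not the operator's actual source; `build_load_config` and its signature are hypothetical, and the field names follow the BigQuery jobs API:
   
   ```py
   # Hypothetical sketch (NOT the operator's real code) of how the DAG's
   # operator arguments translate into the BigQuery load configuration
   # that the task logs print. Note that autodetect=None is passed through
   # verbatim rather than being coerced to a boolean.
   def build_load_config(
       destination,
       source_uris,
       *,
       source_format="CSV",
       create_disposition="CREATE_IF_NEEDED",
       write_disposition="WRITE_TRUNCATE",
       autodetect=None,
   ):
       project, dataset, table = destination.split(".")
       return {
           "load": {
               "autodetect": autodetect,
               "createDisposition": create_disposition,
               "destinationTable": {
                   "projectId": project,
                   "datasetId": dataset,
                   "tableId": table,
               },
               "sourceFormat": source_format,
               "sourceUris": source_uris,
               "writeDisposition": write_disposition,
           }
       }
   
   config = build_load_config(
       "my-project.vchiapaikeo.test1",
       ["gs://my-bucket/vchiapaikeo/file.csv"],
   )
   ```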
   
   Task Logs:
   ```
   *** Reading local file: 
/root/airflow/logs/dag_id=test_os_patch_gcs_to_bigquery/run_id=scheduled__2022-12-23T00:00:00+00:00/task_id=test_gcs_to_bigquery/attempt=5.log
   [2022-12-24, 13:38:59 UTC] {taskinstance.py:1084} INFO - Dependencies all 
met for <TaskInstance: test_os_patch_gcs_to_bigquery.test_gcs_to_bigquery 
scheduled__2022-12-23T00:00:00+00:00 [queued]>
   [2022-12-24, 13:38:59 UTC] {taskinstance.py:1084} INFO - Dependencies all 
met for <TaskInstance: test_os_patch_gcs_to_bigquery.test_gcs_to_bigquery 
scheduled__2022-12-23T00:00:00+00:00 [queued]>
   [2022-12-24, 13:39:00 UTC] {taskinstance.py:1282} INFO - 
   
--------------------------------------------------------------------------------
   [2022-12-24, 13:39:00 UTC] {taskinstance.py:1283} INFO - Starting attempt 5 
of 6
   [2022-12-24, 13:39:00 UTC] {taskinstance.py:1284} INFO - 
   
--------------------------------------------------------------------------------
   [2022-12-24, 13:39:00 UTC] {taskinstance.py:1303} INFO - Executing 
<Task(GCSToBigQueryOperator): test_gcs_to_bigquery> on 2022-12-23 00:00:00+00:00
   [2022-12-24, 13:39:00 UTC] {standard_task_runner.py:55} INFO - Started 
process 396 to run task
   [2022-12-24, 13:39:00 UTC] {standard_task_runner.py:82} INFO - Running: 
['airflow', 'tasks', 'run', 'test_os_patch_gcs_to_bigquery', 
'test_gcs_to_bigquery', 'scheduled__2022-12-23T00:00:00+00:00', '--job-id', 
'11', '--raw', '--subdir', 'DAGS_FOLDER/test.py', '--cfg-path', 
'/tmp/tmpoc_4ow50']
   [2022-12-24, 13:39:00 UTC] {standard_task_runner.py:83} INFO - Job 11: 
Subtask test_gcs_to_bigquery
   [2022-12-24, 13:39:00 UTC] {task_command.py:388} INFO - Running 
<TaskInstance: test_os_patch_gcs_to_bigquery.test_gcs_to_bigquery 
scheduled__2022-12-23T00:00:00+00:00 [running]> on host d6d6ca865d2e
   [2022-12-24, 13:39:00 UTC] {taskinstance.py:1512} INFO - Exporting the 
following env vars:
   AIRFLOW_CTX_DAG_OWNER=gcp-data-platform
   AIRFLOW_CTX_DAG_ID=test_os_patch_gcs_to_bigquery
   AIRFLOW_CTX_TASK_ID=test_gcs_to_bigquery
   AIRFLOW_CTX_EXECUTION_DATE=2022-12-23T00:00:00+00:00
   AIRFLOW_CTX_TRY_NUMBER=5
   AIRFLOW_CTX_DAG_RUN_ID=scheduled__2022-12-23T00:00:00+00:00
   [2022-12-24, 13:39:00 UTC] {base.py:73} INFO - Using connection ID 
'google_cloud_default' for task execution.
   [2022-12-24, 13:39:00 UTC] {gcs_to_bigquery.py:377} INFO - Using existing 
BigQuery table for storing data...
   [2022-12-24, 13:39:00 UTC] {credentials_provider.py:323} INFO - Getting 
connection using `google.auth.default()` since no key file is defined for hook.
   [2022-12-24, 13:39:00 UTC] {_default.py:649} WARNING - No project ID could 
be determined. Consider running `gcloud config set project` or setting the 
GOOGLE_CLOUD_PROJECT environment variable
   [2022-12-24, 13:39:00 UTC] {gcs_to_bigquery.py:381} INFO - Executing: 
{'load': {'autodetect': None, 'createDisposition': 'CREATE_IF_NEEDED', 
'destinationTable': {'projectId': 'etsy-data-warehouse-dev', 'datasetId': 
'vchiapaikeo', 'tableId': 'test1'}, 'sourceFormat': 'CSV', 'sourceUris': 
['gs://hadoop-sandbox-dev-data-sodf9k/vchiapaikeo/file.csv'], 
'writeDisposition': 'WRITE_TRUNCATE', 'ignoreUnknownValues': False, 
'skipLeadingRows': None, 'fieldDelimiter': ',', 'quote': None, 
'allowQuotedNewlines': False, 'encoding': 'UTF-8'}}
   [2022-12-24, 13:39:00 UTC] {bigquery.py:1539} INFO - Inserting job 
airflow_test_os_patch_gcs_to_bigquery_test_gcs_to_bigquery_2022_12_23T00_00_00_00_00_81a09730bb5999ef34166fdfa7b80799
   [2022-12-24, 13:39:06 UTC] {taskinstance.py:1326} INFO - Marking task as 
SUCCESS. dag_id=test_os_patch_gcs_to_bigquery, task_id=test_gcs_to_bigquery, 
execution_date=20221223T000000, start_date=20221224T133859, 
end_date=20221224T133906
   [2022-12-24, 13:39:06 UTC] {local_task_job.py:208} INFO - Task exited with 
return code 0
   [2022-12-24, 13:39:06 UTC] {taskinstance.py:2598} INFO - 0 downstream tasks 
scheduled from follow-on schedule check
   ```
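   
   One detail worth calling out in the logs above: because no key file is defined on the connection, the hook falls back to Application Default Credentials (`google.auth.default()`), which is also why the "No project ID could be determined" warning appears. A minimal sketch of that fallback order, with hypothetical names rather than the provider's actual API:
   
   ```py
   # Illustrative sketch of the credential fallback visible in the logs:
   # an explicit key file or keyfile dict wins; otherwise Application
   # Default Credentials (google.auth.default()) are used. The function
   # name and return values here are hypothetical.
   def resolve_credential_source(key_path=None, keyfile_dict=None):
       if key_path:
           return "service_account_file"
       if keyfile_dict:
           return "service_account_info"
       return "application_default"  # -> google.auth.default()
   
   # No key file defined for the hook, as in the run above:
   source = resolve_credential_source()
   ```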
   
   Grid View:
   
   <img width="1430" alt="image" 
src="https://user-images.githubusercontent.com/9200263/209438520-4fc43bf4-2891-45df-bf59-0b423b92e6f2.png">
   
   
   BigQuery Output:
   <img width="543" alt="image" 
src="https://user-images.githubusercontent.com/9200263/209438561-faa0f72c-ad6b-427e-9ed2-59cf9daccd57.png">
   
   When the write disposition is changed to `WRITE_APPEND`, the records are 
duplicated after a rerun, as expected:
   <img width="594" alt="image" 
src="https://user-images.githubusercontent.com/9200263/209438582-85c0dceb-0635-4157-a5eb-f84425439f8b.png">
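   
   The two dispositions can be modeled with a toy sketch (this is not BigQuery itself, just an illustration of why rerunning with `WRITE_APPEND` duplicates rows while `WRITE_TRUNCATE` does not):
   
   ```py
   # Toy model of the BigQuery write dispositions used above.
   def load_rows(table, new_rows, write_disposition):
       if write_disposition == "WRITE_TRUNCATE":
           return list(new_rows)          # table is replaced wholesale
       if write_disposition == "WRITE_APPEND":
           return table + list(new_rows)  # rows pile up on every run
       raise ValueError(f"unsupported disposition: {write_disposition}")
   
   rows = ["a", "b"]
   
   appended = load_rows([], rows, "WRITE_APPEND")
   appended = load_rows(appended, rows, "WRITE_APPEND")    # rerun: duplicates
   
   truncated = load_rows([], rows, "WRITE_TRUNCATE")
   truncated = load_rows(truncated, rows, "WRITE_TRUNCATE")  # rerun: no duplicates
   ```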
   

