This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 6ef44b6  Clean-up of google cloud example dags - batch 2 (#19527)
6ef44b6 is described below

commit 6ef44b6a507a8e8d5f41a6731a0773046623d171
Author: Niko <[email protected]>
AuthorDate: Sun Nov 14 16:09:48 2021 -0800

    Clean-up of google cloud example dags - batch 2 (#19527)
    
    - Use static start_date
    - Use catchup=False
    - Tidy up the chaining of tasks in some cases
    - Remove unnecessary specification of default conn ids (see the combined sketch below)
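    
    For illustration, the combined pattern applied across these example
    DAGs looks roughly like this (a minimal sketch; the DAG id and task
    names are placeholders, not taken from any one of the touched files):
    
        from datetime import datetime
    
        from airflow import models
        from airflow.models.baseoperator import chain
        from airflow.operators.dummy import DummyOperator
    
        with models.DAG(
            "example_cleanup_pattern",        # hypothetical DAG id
            schedule_interval='@once',        # Override to match your needs
            start_date=datetime(2021, 1, 1),  # static date instead of days_ago(1)
            catchup=False,                    # example DAGs should not backfill
        ) as dag:
            task_a = DummyOperator(task_id="task_a")
            task_b = DummyOperator(task_id="task_b")
            task_c = DummyOperator(task_id="task_c")
    
            # One chain() call replaces a series of `a >> b` statements
            chain(task_a, task_b, task_c)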
---
 .../example_dags/example_azure_fileshare_to_gcs.py |  5 +--
 .../google/cloud/example_dags/example_compute.py   | 16 +++++--
 .../cloud/example_dags/example_compute_igm.py      | 14 ++++--
 .../cloud/example_dags/example_compute_ssh.py      |  5 ++-
 .../cloud/example_dags/example_datacatalog.py      |  9 +++-
 .../google/cloud/example_dags/example_dataflow.py  | 16 ++++---
 .../example_dags/example_dataflow_flex_template.py |  5 ++-
 .../cloud/example_dags/example_dataflow_sql.py     |  5 ++-
 .../cloud/example_dags/example_datafusion.py       |  5 ++-
 .../google/cloud/example_dags/example_dataprep.py  |  5 ++-
 .../google/cloud/example_dags/example_dataproc.py  |  9 +++-
 .../google/cloud/example_dags/example_datastore.py | 10 +++--
 .../google/cloud/example_dags/example_dlp.py       | 19 +++++---
 .../example_dags/example_facebook_ads_to_gcs.py    | 19 +++++---
 .../google/cloud/example_dags/example_functions.py |  5 ++-
 .../google/cloud/example_dags/example_gcs.py       | 10 +++--
 .../example_gcs_timespan_file_transform.py         |  5 ++-
 .../cloud/example_dags/example_gcs_to_bigquery.py  | 51 +++++++++++-----------
 .../cloud/example_dags/example_gcs_to_gcs.py       |  8 +++-
 .../cloud/example_dags/example_gcs_to_local.py     |  6 +--
 .../cloud/example_dags/example_gcs_to_sftp.py      |  8 +++-
 .../cloud/example_dags/example_gdrive_to_gcs.py    |  5 ++-
 .../cloud/example_dags/example_gdrive_to_local.py  |  5 ++-
 .../example_dags/example_kubernetes_engine.py      |  5 ++-
 .../operators/cloud/gcs.rst                        |  2 +
 25 files changed, 164 insertions(+), 88 deletions(-)

diff --git a/airflow/providers/google/cloud/example_dags/example_azure_fileshare_to_gcs.py b/airflow/providers/google/cloud/example_dags/example_azure_fileshare_to_gcs.py
index ef6fc64..680b43b 100644
--- a/airflow/providers/google/cloud/example_dags/example_azure_fileshare_to_gcs.py
+++ b/airflow/providers/google/cloud/example_dags/example_azure_fileshare_to_gcs.py
@@ -37,7 +37,8 @@ with DAG(
         'retry_delay': timedelta(minutes=5),
     },
     schedule_interval='@once',
-    start_date=datetime(2018, 11, 1),
+    start_date=datetime(2021, 1, 1),
+    catchup=False,
     tags=['example'],
 ) as dag:
     # [START howto_operator_azure_fileshare_to_gcs_basic]
@@ -46,8 +47,6 @@ with DAG(
         share_name=AZURE_SHARE_NAME,
         dest_gcs=DEST_GCS_BUCKET,
         directory_name=AZURE_DIRECTORY_NAME,
-        azure_fileshare_conn_id='azure_fileshare_default',
-        gcp_conn_id='google_cloud_default',
         replace=False,
         gzip=True,
         google_impersonation_chain=None,
diff --git a/airflow/providers/google/cloud/example_dags/example_compute.py b/airflow/providers/google/cloud/example_dags/example_compute.py
index 820576f..6d81e3a 100644
--- a/airflow/providers/google/cloud/example_dags/example_compute.py
+++ b/airflow/providers/google/cloud/example_dags/example_compute.py
@@ -30,14 +30,15 @@ This DAG relies on the following OS environment variables
 """
 
 import os
+from datetime import datetime
 
 from airflow import models
+from airflow.models.baseoperator import chain
 from airflow.providers.google.cloud.operators.compute import (
     ComputeEngineSetMachineTypeOperator,
     ComputeEngineStartInstanceOperator,
     ComputeEngineStopInstanceOperator,
 )
-from airflow.utils.dates import days_ago
 
 # [START howto_operator_gce_args_common]
 GCP_PROJECT_ID = os.environ.get('GCP_PROJECT_ID', 'example-project')
@@ -52,7 +53,8 @@ GCE_SHORT_MACHINE_TYPE_NAME = os.environ.get('GCE_SHORT_MACHINE_TYPE_NAME', 'n1-
 with models.DAG(
     'example_gcp_compute',
     schedule_interval='@once',  # Override to match your needs
-    start_date=days_ago(1),
+    start_date=datetime(2021, 1, 1),
+    catchup=False,
     tags=['example'],
 ) as dag:
     # [START howto_operator_gce_start]
@@ -96,5 +98,11 @@ with models.DAG(
     )
     # [END howto_operator_gce_set_machine_type_no_project_id]
 
-    gce_instance_start >> gce_instance_start2 >> gce_instance_stop >> gce_instance_stop2
-    gce_instance_stop2 >> gce_set_machine_type >> gce_set_machine_type2
+    chain(
+        gce_instance_start,
+        gce_instance_start2,
+        gce_instance_stop,
+        gce_instance_stop2,
+        gce_set_machine_type,
+        gce_set_machine_type2,
+    )
diff --git a/airflow/providers/google/cloud/example_dags/example_compute_igm.py b/airflow/providers/google/cloud/example_dags/example_compute_igm.py
index 0370d8a..7cad62c 100644
--- a/airflow/providers/google/cloud/example_dags/example_compute_igm.py
+++ b/airflow/providers/google/cloud/example_dags/example_compute_igm.py
@@ -39,13 +39,14 @@ Variables for update template in Group Manager:
 """
 
 import os
+from datetime import datetime
 
 from airflow import models
+from airflow.models.baseoperator import chain
 from airflow.providers.google.cloud.operators.compute import (
     ComputeEngineCopyInstanceTemplateOperator,
     ComputeEngineInstanceGroupUpdateManagerTemplateOperator,
 )
-from airflow.utils.dates import days_ago
 
 GCP_PROJECT_ID = os.environ.get('GCP_PROJECT_ID', 'example-project')
 GCE_ZONE = os.environ.get('GCE_ZONE', 'europe-west1-b')
@@ -92,7 +93,8 @@ UPDATE_POLICY = {
 with models.DAG(
     'example_gcp_compute_igm',
     schedule_interval='@once',  # Override to match your needs
-    start_date=days_ago(1),
+    start_date=datetime(2021, 1, 1),
+    catchup=False,
     tags=['example'],
 ) as dag:
     # [START howto_operator_gce_igm_copy_template]
@@ -133,5 +135,9 @@ with models.DAG(
     )
     # [END howto_operator_gce_igm_update_template_no_project_id]
 
-    gce_instance_template_copy >> gce_instance_template_copy2 >> gce_instance_group_manager_update_template
-    gce_instance_group_manager_update_template >> gce_instance_group_manager_update_template2
+    chain(
+        gce_instance_template_copy,
+        gce_instance_template_copy2,
+        gce_instance_group_manager_update_template,
+        gce_instance_group_manager_update_template2,
+    )
diff --git a/airflow/providers/google/cloud/example_dags/example_compute_ssh.py b/airflow/providers/google/cloud/example_dags/example_compute_ssh.py
index 62636fc..c207437 100644
--- a/airflow/providers/google/cloud/example_dags/example_compute_ssh.py
+++ b/airflow/providers/google/cloud/example_dags/example_compute_ssh.py
@@ -16,11 +16,11 @@
 # under the License.
 
 import os
+from datetime import datetime
 
 from airflow import models
 from airflow.providers.google.cloud.hooks.compute_ssh import ComputeEngineSSHHook
 from airflow.providers.ssh.operators.ssh import SSHOperator
-from airflow.utils import dates
 
 # [START howto_operator_gce_args_common]
 GCP_PROJECT_ID = os.environ.get('GCP_PROJECT_ID', 'example-project')
@@ -30,8 +30,9 @@ GCE_INSTANCE = os.environ.get('GCE_INSTANCE', 'target-instance')
 
 with models.DAG(
     'example_compute_ssh',
-    default_args=dict(start_date=dates.days_ago(1)),
     schedule_interval='@once',  # Override to match your needs
+    start_date=datetime(2021, 1, 1),
+    catchup=False,
     tags=['example'],
 ) as dag:
     # # [START howto_execute_command_on_remote1]
diff --git a/airflow/providers/google/cloud/example_dags/example_datacatalog.py b/airflow/providers/google/cloud/example_dags/example_datacatalog.py
index e56bc10..4de219f 100644
--- a/airflow/providers/google/cloud/example_dags/example_datacatalog.py
+++ b/airflow/providers/google/cloud/example_dags/example_datacatalog.py
@@ -20,6 +20,7 @@
 Example Airflow DAG that interacts with Google Data Catalog service
 """
 import os
+from datetime import datetime
 
 from google.cloud.datacatalog_v1beta1 import FieldType, TagField, TagTemplateField
 
@@ -49,7 +50,6 @@ from airflow.providers.google.cloud.operators.datacatalog import (
     CloudDataCatalogUpdateTagTemplateFieldOperator,
     CloudDataCatalogUpdateTagTemplateOperator,
 )
-from airflow.utils.dates import days_ago
 
 PROJECT_ID = os.getenv("GCP_PROJECT_ID")
 BUCKET_ID = os.getenv("GCP_TEST_DATA_BUCKET", "INVALID BUCKET NAME")
@@ -61,7 +61,12 @@ FIELD_NAME_1 = "first"
 FIELD_NAME_2 = "second"
 FIELD_NAME_3 = "first-rename"
 
-with models.DAG("example_gcp_datacatalog", schedule_interval='@once', 
start_date=days_ago(1)) as dag:
+with models.DAG(
+    "example_gcp_datacatalog",
+    schedule_interval='@once',
+    start_date=datetime(2021, 1, 1),
+    catchup=False,
+) as dag:
     # Create
     # [START howto_operator_gcp_datacatalog_create_entry_group]
     create_entry_group = CloudDataCatalogCreateEntryGroupOperator(
diff --git a/airflow/providers/google/cloud/example_dags/example_dataflow.py b/airflow/providers/google/cloud/example_dags/example_dataflow.py
index 0c9aceb..8b1d01f 100644
--- a/airflow/providers/google/cloud/example_dags/example_dataflow.py
+++ b/airflow/providers/google/cloud/example_dags/example_dataflow.py
@@ -20,6 +20,7 @@
 Example Airflow DAG for Google Cloud Dataflow service
 """
 import os
+from datetime import datetime
 from typing import Callable, Dict, List
 from urllib.parse import urlparse
 
@@ -41,7 +42,8 @@ from airflow.providers.google.cloud.sensors.dataflow import (
     DataflowJobStatusSensor,
 )
 from airflow.providers.google.cloud.transfers.gcs_to_local import GCSToLocalFilesystemOperator
-from airflow.utils.dates import days_ago
+
+START_DATE = datetime(2021, 1, 1)
 
 GCS_TMP = os.environ.get('GCP_DATAFLOW_GCS_TMP', 'gs://INVALID BUCKET NAME/temp/')
 GCS_STAGING = os.environ.get('GCP_DATAFLOW_GCS_STAGING', 'gs://INVALID BUCKET NAME/staging/')
@@ -63,7 +65,8 @@ default_args = {
 with models.DAG(
     "example_gcp_dataflow_native_java",
     schedule_interval='@once',  # Override to match your needs
-    start_date=days_ago(1),
+    start_date=START_DATE,
+    catchup=False,
     tags=['example'],
 ) as dag_native_java:
 
@@ -110,7 +113,8 @@ with models.DAG(
 with models.DAG(
     "example_gcp_dataflow_native_python",
     default_args=default_args,
-    start_date=days_ago(1),
+    start_date=START_DATE,
+    catchup=False,
     schedule_interval='@once',  # Override to match your needs
     tags=['example'],
 ) as dag_native_python:
@@ -145,7 +149,8 @@ with models.DAG(
 with models.DAG(
     "example_gcp_dataflow_native_python_async",
     default_args=default_args,
-    start_date=days_ago(1),
+    start_date=START_DATE,
+    catchup=False,
     schedule_interval='@once',  # Override to match your needs
     tags=['example'],
 ) as dag_native_python_async:
@@ -246,7 +251,8 @@ with models.DAG(
 with models.DAG(
     "example_gcp_dataflow_template",
     default_args=default_args,
-    start_date=days_ago(1),
+    start_date=START_DATE,
+    catchup=False,
     schedule_interval='@once',  # Override to match your needs
     tags=['example'],
 ) as dag_template:
diff --git a/airflow/providers/google/cloud/example_dags/example_dataflow_flex_template.py b/airflow/providers/google/cloud/example_dags/example_dataflow_flex_template.py
index 3938af8..43d9914 100644
--- a/airflow/providers/google/cloud/example_dags/example_dataflow_flex_template.py
+++ b/airflow/providers/google/cloud/example_dags/example_dataflow_flex_template.py
@@ -20,10 +20,10 @@
 Example Airflow DAG for Google Cloud Dataflow service
 """
 import os
+from datetime import datetime
 
 from airflow import models
 from airflow.providers.google.cloud.operators.dataflow import DataflowStartFlexTemplateOperator
-from airflow.utils.dates import days_ago
 
 GCP_PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "example-project")
 
@@ -45,7 +45,8 @@ BQ_FLEX_TEMPLATE_LOCATION = os.environ.get('GCP_DATAFLOW_BQ_FLEX_TEMPLATE_LOCATI
 
 with models.DAG(
     dag_id="example_gcp_dataflow_flex_template_java",
-    start_date=days_ago(1),
+    start_date=datetime(2021, 1, 1),
+    catchup=False,
     schedule_interval='@once',  # Override to match your needs
 ) as dag_flex_template:
     # [START howto_operator_start_template_job]
diff --git a/airflow/providers/google/cloud/example_dags/example_dataflow_sql.py b/airflow/providers/google/cloud/example_dags/example_dataflow_sql.py
index 0d03119..a74f5de 100644
--- a/airflow/providers/google/cloud/example_dags/example_dataflow_sql.py
+++ b/airflow/providers/google/cloud/example_dags/example_dataflow_sql.py
@@ -20,10 +20,10 @@
 Example Airflow DAG for Google Cloud Dataflow service
 """
 import os
+from datetime import datetime
 
 from airflow import models
 from airflow.providers.google.cloud.operators.dataflow import DataflowStartSqlJobOperator
-from airflow.utils.dates import days_ago
 
 GCP_PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "example-project")
 
@@ -36,7 +36,8 @@ DATAFLOW_SQL_LOCATION = os.environ.get("GCP_DATAFLOW_SQL_LOCATION", "us-west1")
 
 with models.DAG(
     dag_id="example_gcp_dataflow_sql",
-    start_date=days_ago(1),
+    start_date=datetime(2021, 1, 1),
+    catchup=False,
     schedule_interval='@once',  # Override to match your needs
     tags=['example'],
 ) as dag_sql:
diff --git a/airflow/providers/google/cloud/example_dags/example_datafusion.py b/airflow/providers/google/cloud/example_dags/example_datafusion.py
index 4900e6b..c2387a7 100644
--- a/airflow/providers/google/cloud/example_dags/example_datafusion.py
+++ b/airflow/providers/google/cloud/example_dags/example_datafusion.py
@@ -19,6 +19,7 @@
 Example Airflow DAG that shows how to use DataFusion.
 """
 import os
+from datetime import datetime
 
 from airflow import models
 from airflow.operators.bash import BashOperator
@@ -35,7 +36,6 @@ from airflow.providers.google.cloud.operators.datafusion import (
     CloudDataFusionUpdateInstanceOperator,
 )
 from airflow.providers.google.cloud.sensors.datafusion import CloudDataFusionPipelineStateSensor
-from airflow.utils import dates
 from airflow.utils.state import State
 
 # [START howto_data_fusion_env_variables]
@@ -153,7 +153,8 @@ PIPELINE = {
 with models.DAG(
     "example_data_fusion",
     schedule_interval='@once',  # Override to match your needs
-    start_date=dates.days_ago(1),
+    start_date=datetime(2021, 1, 1),
+    catchup=False,
 ) as dag:
     # [START howto_cloud_data_fusion_create_instance_operator]
     create_instance = CloudDataFusionCreateInstanceOperator(
diff --git a/airflow/providers/google/cloud/example_dags/example_dataprep.py b/airflow/providers/google/cloud/example_dags/example_dataprep.py
index b155681..1bd460a 100644
--- a/airflow/providers/google/cloud/example_dags/example_dataprep.py
+++ b/airflow/providers/google/cloud/example_dags/example_dataprep.py
@@ -18,6 +18,7 @@
 Example Airflow DAG that shows how to use Google Dataprep.
 """
 import os
+from datetime import datetime
 
 from airflow import models
 from airflow.providers.google.cloud.operators.dataprep import (
@@ -25,7 +26,6 @@ from airflow.providers.google.cloud.operators.dataprep import (
     DataprepGetJobsForJobGroupOperator,
     DataprepRunJobGroupOperator,
 )
-from airflow.utils import dates
 
 DATAPREP_JOB_ID = int(os.environ.get('DATAPREP_JOB_ID', 12345677))
 DATAPREP_JOB_RECIPE_ID = int(os.environ.get('DATAPREP_JOB_RECIPE_ID', 12345677))
@@ -53,7 +53,8 @@ DATA = {
 with models.DAG(
     "example_dataprep",
     schedule_interval='@once',
-    start_date=dates.days_ago(1),  # Override to match your needs
+    start_date=datetime(2021, 1, 1),  # Override to match your needs
+    catchup=False,
 ) as dag:
     # [START how_to_dataprep_run_job_group_operator]
     run_job_group = DataprepRunJobGroupOperator(task_id="run_job_group", body_request=DATA)
diff --git a/airflow/providers/google/cloud/example_dags/example_dataproc.py b/airflow/providers/google/cloud/example_dags/example_dataproc.py
index 4959498..1a319f5 100644
--- a/airflow/providers/google/cloud/example_dags/example_dataproc.py
+++ b/airflow/providers/google/cloud/example_dags/example_dataproc.py
@@ -21,6 +21,7 @@ operators to manage a cluster and submit jobs.
 """
 
 import os
+from datetime import datetime
 
 from airflow import models
 from airflow.providers.google.cloud.operators.dataproc import (
@@ -32,7 +33,6 @@ from airflow.providers.google.cloud.operators.dataproc import (
     DataprocUpdateClusterOperator,
 )
 from airflow.providers.google.cloud.sensors.dataproc import DataprocJobSensor
-from airflow.utils.dates import days_ago
 
 PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "an-id")
 CLUSTER_NAME = os.environ.get("GCP_DATAPROC_CLUSTER_NAME", "example-cluster")
@@ -151,7 +151,12 @@ WORKFLOW_TEMPLATE = {
 }
 
 
-with models.DAG("example_gcp_dataproc", schedule_interval='@once', 
start_date=days_ago(1)) as dag:
+with models.DAG(
+    "example_gcp_dataproc",
+    schedule_interval='@once',
+    start_date=datetime(2021, 1, 1),
+    catchup=False,
+) as dag:
     # [START how_to_cloud_dataproc_create_cluster_operator]
     create_cluster = DataprocCreateClusterOperator(
         task_id="create_cluster",
diff --git a/airflow/providers/google/cloud/example_dags/example_datastore.py b/airflow/providers/google/cloud/example_dags/example_datastore.py
index f871646..96497d2 100644
--- a/airflow/providers/google/cloud/example_dags/example_datastore.py
+++ b/airflow/providers/google/cloud/example_dags/example_datastore.py
@@ -23,6 +23,7 @@ This example requires that your project contains Datastore instance.
 """
 
 import os
+from datetime import datetime
 from typing import Any, Dict
 
 from airflow import models
@@ -35,7 +36,8 @@ from airflow.providers.google.cloud.operators.datastore import (
     CloudDatastoreRollbackOperator,
     CloudDatastoreRunQueryOperator,
 )
-from airflow.utils import dates
+
+START_DATE = datetime(2021, 1, 1)
 
 GCP_PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "example-project")
 BUCKET = os.environ.get("GCP_DATASTORE_BUCKET", "datastore-system-test")
@@ -43,7 +45,8 @@ BUCKET = os.environ.get("GCP_DATASTORE_BUCKET", "datastore-system-test")
 with models.DAG(
     "example_gcp_datastore",
     schedule_interval='@once',  # Override to match your needs
-    start_date=dates.days_ago(1),
+    start_date=START_DATE,
+    catchup=False,
     tags=["example"],
 ) as dag:
     # [START how_to_export_task]
@@ -82,8 +85,9 @@ TRANSACTION_OPTIONS: Dict[str, Any] = {"readWrite": {}}
 
 with models.DAG(
     "example_gcp_datastore_operations",
-    start_date=dates.days_ago(1),
     schedule_interval='@once',  # Override to match your needs
+    start_date=START_DATE,
+    catchup=False,
     tags=["example"],
 ) as dag2:
     # [START how_to_allocate_ids]
diff --git a/airflow/providers/google/cloud/example_dags/example_dlp.py b/airflow/providers/google/cloud/example_dags/example_dlp.py
index 2199877..480fda1 100644
--- a/airflow/providers/google/cloud/example_dags/example_dlp.py
+++ b/airflow/providers/google/cloud/example_dags/example_dlp.py
@@ -25,6 +25,7 @@ Cloud DLP service in the Google Cloud:
 """
 
 import os
+from datetime import datetime
 
 from google.cloud.dlp_v2.types import ContentItem, InspectConfig, InspectTemplate
 
@@ -41,7 +42,8 @@ from airflow.providers.google.cloud.operators.dlp import (
     CloudDLPUpdateJobTriggerOperator,
     CloudDLPUpdateStoredInfoTypeOperator,
 )
-from airflow.utils.dates import days_ago
+
+START_DATE = datetime(2021, 1, 1)
 
 GCP_PROJECT = os.environ.get("GCP_PROJECT_ID", "example-project")
 TEMPLATE_ID = "dlp-inspect-838746"
@@ -62,7 +64,8 @@ OBJECT_GCS_OUTPUT_URI = os.path.join(OUTPUT_BUCKET, "tmp", OUTPUT_FILENAME)
 with models.DAG(
     "example_gcp_dlp",
     schedule_interval='@once',  # Override to match your needs
-    start_date=days_ago(1),
+    start_date=START_DATE,
+    catchup=False,
     tags=['example'],
 ) as dag1:
     # [START howto_operator_dlp_create_inspect_template]
@@ -111,7 +114,8 @@ UPDATE_CUSTOM_INFO_TYPE = {
 with models.DAG(
     "example_gcp_dlp_info_types",
     schedule_interval='@once',
-    start_date=days_ago(1),
+    start_date=START_DATE,
+    catchup=False,
     tags=["example", "dlp", "info-types"],
 ) as dag2:
     # [START howto_operator_dlp_create_info_type]
@@ -152,7 +156,11 @@ JOB_TRIGGER = {
 TRIGGER_ID = "example_trigger"
 
 with models.DAG(
-    "example_gcp_dlp_job", schedule_interval='@once', start_date=days_ago(1), 
tags=["example", "dlp_job"]
+    "example_gcp_dlp_job",
+    schedule_interval='@once',
+    start_date=START_DATE,
+    catchup=False,
+    tags=["example", "dlp_job"],
 ) as dag3:  # [START howto_operator_dlp_create_job_trigger]
     create_trigger = CloudDLPCreateJobTriggerOperator(
         project_id=GCP_PROJECT,
@@ -196,7 +204,8 @@ DEIDENTIFY_CONFIG = {
 with models.DAG(
     "example_gcp_dlp_deidentify_content",
     schedule_interval='@once',
-    start_date=days_ago(1),
+    start_date=START_DATE,
+    catchup=False,
     tags=["example", "dlp", "deidentify"],
 ) as dag4:
     # [START _howto_operator_dlp_deidentify_content]
diff --git a/airflow/providers/google/cloud/example_dags/example_facebook_ads_to_gcs.py b/airflow/providers/google/cloud/example_dags/example_facebook_ads_to_gcs.py
index 9b6ac50..bd80091 100644
--- a/airflow/providers/google/cloud/example_dags/example_facebook_ads_to_gcs.py
+++ b/airflow/providers/google/cloud/example_dags/example_facebook_ads_to_gcs.py
@@ -19,10 +19,12 @@
 Example Airflow DAG that shows how to use FacebookAdsReportToGcsOperator.
 """
 import os
+from datetime import datetime
 
 from facebook_business.adobjects.adsinsights import AdsInsights
 
 from airflow import models
+from airflow.models.baseoperator import chain
 from airflow.providers.google.cloud.operators.bigquery import (
     BigQueryCreateEmptyDatasetOperator,
     BigQueryCreateEmptyTableOperator,
@@ -32,7 +34,6 @@ from airflow.providers.google.cloud.operators.bigquery import (
 from airflow.providers.google.cloud.operators.gcs import GCSCreateBucketOperator, GCSDeleteBucketOperator
 from airflow.providers.google.cloud.transfers.facebook_ads_to_gcs import FacebookAdsReportToGcsOperator
 from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator
-from airflow.utils.dates import days_ago
 
 # [START howto_GCS_env_variables]
 GCP_PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "free-tier-1997")
@@ -57,7 +58,8 @@ PARAMETERS = {'level': 'ad', 'date_preset': 'yesterday'}
 with models.DAG(
     "example_facebook_ads_to_gcs",
     schedule_interval='@once',  # Override to match your needs
-    start_date=days_ago(1),
+    start_date=datetime(2021, 1, 1),
+    catchup=False,
 ) as dag:
 
     create_bucket = GCSCreateBucketOperator(
@@ -87,7 +89,6 @@ with models.DAG(
     # [START howto_operator_facebook_ads_to_gcs]
     run_operator = FacebookAdsReportToGcsOperator(
         task_id='run_fetch_data',
-        start_date=days_ago(2),
         owner='airflow',
         bucket_name=GCS_BUCKET,
         parameters=PARAMETERS,
@@ -127,5 +128,13 @@ with models.DAG(
         delete_contents=True,
     )
 
-    create_bucket >> create_dataset >> create_table >> run_operator >> load_csv
-    load_csv >> read_data_from_gcs_many_chunks >> delete_bucket >> delete_dataset
+    chain(
+        create_bucket,
+        create_dataset,
+        create_table,
+        run_operator,
+        load_csv,
+        read_data_from_gcs_many_chunks,
+        delete_bucket,
+        delete_dataset,
+    )
diff --git a/airflow/providers/google/cloud/example_dags/example_functions.py b/airflow/providers/google/cloud/example_dags/example_functions.py
index 6144c07..01a6364 100644
--- a/airflow/providers/google/cloud/example_dags/example_functions.py
+++ b/airflow/providers/google/cloud/example_dags/example_functions.py
@@ -41,6 +41,7 @@ https://airflow.apache.org/concepts.html#variables
 """
 
 import os
+from datetime import datetime
 
 from airflow import models
 from airflow.providers.google.cloud.operators.functions import (
@@ -48,7 +49,6 @@ from airflow.providers.google.cloud.operators.functions import (
     CloudFunctionDeployFunctionOperator,
     CloudFunctionInvokeFunctionOperator,
 )
-from airflow.utils import dates
 
 GCP_PROJECT_ID = os.environ.get('GCP_PROJECT_ID', 'example-project')
 GCP_LOCATION = os.environ.get('GCP_LOCATION', 'europe-west1')
@@ -94,7 +94,8 @@ with models.DAG(
     'example_gcp_function',
     default_args=default_args,
     schedule_interval='@once',  # Override to match your needs
-    start_date=dates.days_ago(1),
+    start_date=datetime(2021, 1, 1),
+    catchup=False,
     tags=['example'],
 ) as dag:
     # [START howto_operator_gcf_deploy]
diff --git a/airflow/providers/google/cloud/example_dags/example_gcs.py b/airflow/providers/google/cloud/example_dags/example_gcs.py
index 684a4b4..d0fb286 100644
--- a/airflow/providers/google/cloud/example_dags/example_gcs.py
+++ b/airflow/providers/google/cloud/example_dags/example_gcs.py
@@ -20,6 +20,7 @@ Example Airflow DAG for Google Cloud Storage operators.
 """
 
 import os
+from datetime import datetime
 
 from airflow import models
 from airflow.operators.bash import BashOperator
@@ -39,9 +40,10 @@ from airflow.providers.google.cloud.sensors.gcs import (
 from airflow.providers.google.cloud.transfers.gcs_to_gcs import GCSToGCSOperator
 from airflow.providers.google.cloud.transfers.gcs_to_local import GCSToLocalFilesystemOperator
 from airflow.providers.google.cloud.transfers.local_to_gcs import LocalFilesystemToGCSOperator
-from airflow.utils.dates import days_ago
 from airflow.utils.state import State
 
+START_DATE = datetime(2021, 1, 1)
+
 PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "example-id")
 BUCKET_1 = os.environ.get("GCP_GCS_BUCKET_1", "test-gcs-example-bucket")
 GCS_ACL_ENTITY = os.environ.get("GCS_ACL_ENTITY", "allUsers")
@@ -59,7 +61,8 @@ BUCKET_FILE_LOCATION = PATH_TO_UPLOAD_FILE.rpartition("/")[-1]
 
 with models.DAG(
     "example_gcs",
-    start_date=days_ago(1),
+    start_date=START_DATE,
+    catchup=False,
     schedule_interval='@once',
     tags=['example'],
 ) as dag:
@@ -159,7 +162,8 @@ with models.DAG(
 
 with models.DAG(
     "example_gcs_sensors",
-    start_date=days_ago(1),
+    start_date=START_DATE,
+    catchup=False,
     schedule_interval='@once',
     tags=['example'],
 ) as dag2:
diff --git a/airflow/providers/google/cloud/example_dags/example_gcs_timespan_file_transform.py b/airflow/providers/google/cloud/example_dags/example_gcs_timespan_file_transform.py
index b4c4332..6a36f3e 100644
--- a/airflow/providers/google/cloud/example_dags/example_gcs_timespan_file_transform.py
+++ b/airflow/providers/google/cloud/example_dags/example_gcs_timespan_file_transform.py
@@ -20,10 +20,10 @@ Example Airflow DAG for Google Cloud Storage time-span file transform operator.
 """
 
 import os
+from datetime import datetime
 
 from airflow import models
 from airflow.providers.google.cloud.operators.gcs import GCSTimeSpanFileTransformOperator
-from airflow.utils.dates import days_ago
 from airflow.utils.state import State
 
 SOURCE_BUCKET = os.environ.get("GCP_GCS_BUCKET_1", "test-gcs-example-bucket")
@@ -40,7 +40,8 @@ PATH_TO_TRANSFORM_SCRIPT = os.environ.get(
 
 with models.DAG(
     "example_gcs_timespan_file_transform",
-    start_date=days_ago(1),
+    start_date=datetime(2021, 1, 1),
+    catchup=False,
     schedule_interval='@once',
     tags=['example'],
 ) as dag:
diff --git a/airflow/providers/google/cloud/example_dags/example_gcs_to_bigquery.py b/airflow/providers/google/cloud/example_dags/example_gcs_to_bigquery.py
index f3c88b5..454320e 100644
--- a/airflow/providers/google/cloud/example_dags/example_gcs_to_bigquery.py
+++ b/airflow/providers/google/cloud/example_dags/example_gcs_to_bigquery.py
@@ -21,6 +21,7 @@ Example DAG using GCSToBigQueryOperator.
 """
 
 import os
+from datetime import datetime
 
 from airflow import models
 from airflow.providers.google.cloud.operators.bigquery import (
@@ -28,39 +29,39 @@ from airflow.providers.google.cloud.operators.bigquery import (
     BigQueryDeleteDatasetOperator,
 )
 from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator
-from airflow.utils.dates import days_ago
 
 DATASET_NAME = os.environ.get("GCP_DATASET_NAME", 'airflow_test')
 TABLE_NAME = os.environ.get("GCP_TABLE_NAME", 'gcs_to_bq_table')
 
-dag = models.DAG(
+with models.DAG(
     dag_id='example_gcs_to_bigquery_operator',
-    start_date=days_ago(2),
+    start_date=datetime(2021, 1, 1),
+    catchup=False,
     schedule_interval='@once',
     tags=['example'],
-)
+) as dag:
+    create_test_dataset = BigQueryCreateEmptyDatasetOperator(
+        task_id='create_airflow_test_dataset', dataset_id=DATASET_NAME
+    )
 
-create_test_dataset = BigQueryCreateEmptyDatasetOperator(
-    task_id='create_airflow_test_dataset', dataset_id=DATASET_NAME, dag=dag
-)
+    # [START howto_operator_gcs_to_bigquery]
+    load_csv = GCSToBigQueryOperator(
+        task_id='gcs_to_bigquery_example',
+        bucket='cloud-samples-data',
+        source_objects=['bigquery/us-states/us-states.csv'],
+        destination_project_dataset_table=f"{DATASET_NAME}.{TABLE_NAME}",
+        schema_fields=[
+            {'name': 'name', 'type': 'STRING', 'mode': 'NULLABLE'},
+            {'name': 'post_abbr', 'type': 'STRING', 'mode': 'NULLABLE'},
+        ],
+        write_disposition='WRITE_TRUNCATE',
+    )
+    # [END howto_operator_gcs_to_bigquery]
 
-# [START howto_operator_gcs_to_bigquery]
-load_csv = GCSToBigQueryOperator(
-    task_id='gcs_to_bigquery_example',
-    bucket='cloud-samples-data',
-    source_objects=['bigquery/us-states/us-states.csv'],
-    destination_project_dataset_table=f"{DATASET_NAME}.{TABLE_NAME}",
-    schema_fields=[
-        {'name': 'name', 'type': 'STRING', 'mode': 'NULLABLE'},
-        {'name': 'post_abbr', 'type': 'STRING', 'mode': 'NULLABLE'},
-    ],
-    write_disposition='WRITE_TRUNCATE',
-    dag=dag,
-)
-# [END howto_operator_gcs_to_bigquery]
-
-delete_test_dataset = BigQueryDeleteDatasetOperator(
-    task_id='delete_airflow_test_dataset', dataset_id=DATASET_NAME, delete_contents=True, dag=dag
-)
+    delete_test_dataset = BigQueryDeleteDatasetOperator(
+        task_id='delete_airflow_test_dataset',
+        dataset_id=DATASET_NAME,
+        delete_contents=True,
+    )
 
 create_test_dataset >> load_csv >> delete_test_dataset
diff --git a/airflow/providers/google/cloud/example_dags/example_gcs_to_gcs.py b/airflow/providers/google/cloud/example_dags/example_gcs_to_gcs.py
index 7086c94..f2d4b6e 100644
--- a/airflow/providers/google/cloud/example_dags/example_gcs_to_gcs.py
+++ b/airflow/providers/google/cloud/example_dags/example_gcs_to_gcs.py
@@ -20,11 +20,11 @@ Example Airflow DAG for Google Cloud Storage to Google Cloud Storage transfer op
 """
 
 import os
+from datetime import datetime
 
 from airflow import models
 from airflow.providers.google.cloud.operators.gcs import GCSSynchronizeBucketsOperator
 from airflow.providers.google.cloud.transfers.gcs_to_gcs import GCSToGCSOperator
-from airflow.utils.dates import days_ago
 
 BUCKET_1_SRC = os.environ.get("GCP_GCS_BUCKET_1_SRC", "test-gcs-sync-1-src")
 BUCKET_1_DST = os.environ.get("GCP_GCS_BUCKET_1_DST", "test-gcs-sync-1-dst")
@@ -39,7 +39,11 @@ OBJECT_1 = os.environ.get("GCP_GCS_OBJECT_1", "test-gcs-to-gcs-1")
 OBJECT_2 = os.environ.get("GCP_GCS_OBJECT_2", "test-gcs-to-gcs-2")
 
 with models.DAG(
-    "example_gcs_to_gcs", schedule_interval='@once', start_date=days_ago(1), 
tags=['example']
+    "example_gcs_to_gcs",
+    schedule_interval='@once',
+    start_date=datetime(2021, 1, 1),
+    catchup=False,
+    tags=['example'],
 ) as dag:
     # [START howto_synch_bucket]
     sync_bucket = GCSSynchronizeBucketsOperator(
diff --git a/airflow/providers/google/cloud/example_dags/example_gcs_to_local.py b/airflow/providers/google/cloud/example_dags/example_gcs_to_local.py
index 16b5afd..50f62d2 100644
--- a/airflow/providers/google/cloud/example_dags/example_gcs_to_local.py
+++ b/airflow/providers/google/cloud/example_dags/example_gcs_to_local.py
@@ -14,12 +14,11 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-
 import os
+from datetime import datetime
 
 from airflow import models
 from airflow.providers.google.cloud.transfers.gcs_to_local import GCSToLocalFilesystemOperator
-from airflow.utils.dates import days_ago
 
 PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "example-id")
 BUCKET = os.environ.get("GCP_GCS_BUCKET", "test-gcs-example-bucket")
@@ -29,8 +28,9 @@ PATH_TO_LOCAL_FILE = os.environ.get("GCP_GCS_PATH_TO_SAVED_FILE", "test-gcs-exam
 
 with models.DAG(
     "example_gcs_to_local",
-    start_date=days_ago(1),
     schedule_interval='@once',
+    start_date=datetime(2021, 1, 1),
+    catchup=False,
     tags=['example'],
 ) as dag:
     # [START howto_operator_gcs_download_file_task]
diff --git a/airflow/providers/google/cloud/example_dags/example_gcs_to_sftp.py b/airflow/providers/google/cloud/example_dags/example_gcs_to_sftp.py
index c31a8f9..ff0431f 100644
--- a/airflow/providers/google/cloud/example_dags/example_gcs_to_sftp.py
+++ b/airflow/providers/google/cloud/example_dags/example_gcs_to_sftp.py
@@ -20,11 +20,11 @@ Example Airflow DAG for Google Cloud Storage to SFTP transfer operators.
 """
 
 import os
+from datetime import datetime
 
 from airflow import models
 from airflow.providers.google.cloud.transfers.gcs_to_sftp import GCSToSFTPOperator
 from airflow.providers.sftp.sensors.sftp import SFTPSensor
-from airflow.utils.dates import days_ago
 
 SFTP_CONN_ID = "ssh_default"
 BUCKET_SRC = os.environ.get("GCP_GCS_BUCKET_1_SRC", "test-gcs-sftp")
@@ -37,7 +37,11 @@ DESTINATION_PATH_3 = "/tmp/dest-dir-2/"
 
 
 with models.DAG(
-    "example_gcs_to_sftp", schedule_interval='@once', start_date=days_ago(1), 
tags=['example']
+    "example_gcs_to_sftp",
+    schedule_interval='@once',
+    start_date=datetime(2021, 1, 1),
+    catchup=False,
+    tags=['example'],
 ) as dag:
     # [START howto_operator_gcs_to_sftp_copy_single_file]
     copy_file_from_gcs_to_sftp = GCSToSFTPOperator(
diff --git a/airflow/providers/google/cloud/example_dags/example_gdrive_to_gcs.py b/airflow/providers/google/cloud/example_dags/example_gdrive_to_gcs.py
index 2059850..bb65634 100644
--- a/airflow/providers/google/cloud/example_dags/example_gdrive_to_gcs.py
+++ b/airflow/providers/google/cloud/example_dags/example_gdrive_to_gcs.py
@@ -17,11 +17,11 @@
 # under the License.
 
 import os
+from datetime import datetime
 
 from airflow import models
 from airflow.providers.google.cloud.transfers.gdrive_to_gcs import GoogleDriveToGCSOperator
 from airflow.providers.google.suite.sensors.drive import GoogleDriveFileExistenceSensor
-from airflow.utils.dates import days_ago
 
 BUCKET = os.environ.get("GCP_GCS_BUCKET", "test28397yeo")
 OBJECT = os.environ.get("GCP_GCS_OBJECT", "abc123xyz")
@@ -30,7 +30,8 @@ FILE_NAME = os.environ.get("FILE_NAME", "file.pdf")
 
 with models.DAG(
     "example_gdrive_to_gcs_with_gdrive_sensor",
-    start_date=days_ago(1),
+    start_date=datetime(2021, 1, 1),
+    catchup=False,
     schedule_interval='@once',  # Override to match your needs
     tags=["example"],
 ) as dag:
diff --git a/airflow/providers/google/cloud/example_dags/example_gdrive_to_local.py b/airflow/providers/google/cloud/example_dags/example_gdrive_to_local.py
index 94fca91..2ba38ea 100644
--- a/airflow/providers/google/cloud/example_dags/example_gdrive_to_local.py
+++ b/airflow/providers/google/cloud/example_dags/example_gdrive_to_local.py
@@ -17,11 +17,11 @@
 # under the License.
 
 import os
+from datetime import datetime
 
 from airflow import models
 from airflow.providers.google.cloud.transfers.gdrive_to_local import GoogleDriveToLocalOperator
 from airflow.providers.google.suite.sensors.drive import GoogleDriveFileExistenceSensor
-from airflow.utils.dates import days_ago
 
 FOLDER_ID = os.environ.get("FILE_ID", "1234567890qwerty")
 FILE_NAME = os.environ.get("FILE_NAME", "file.pdf")
@@ -29,7 +29,8 @@ OUTPUT_FILE = os.environ.get("OUTPUT_FILE", "out_file.pdf")
 
 with models.DAG(
     "example_gdrive_to_local_with_gdrive_sensor",
-    start_date=days_ago(1),
+    start_date=datetime(2021, 1, 1),
+    catchup=False,
     schedule_interval=None,  # Override to match your needs
     tags=["example"],
 ) as dag:
diff --git a/airflow/providers/google/cloud/example_dags/example_kubernetes_engine.py b/airflow/providers/google/cloud/example_dags/example_kubernetes_engine.py
index 5d12fcf..8c3ff71 100644
--- a/airflow/providers/google/cloud/example_dags/example_kubernetes_engine.py
+++ b/airflow/providers/google/cloud/example_dags/example_kubernetes_engine.py
@@ -20,6 +20,7 @@ Example Airflow DAG for Google Kubernetes Engine.
 """
 
 import os
+from datetime import datetime
 
 from airflow import models
 from airflow.operators.bash import BashOperator
@@ -28,7 +29,6 @@ from airflow.providers.google.cloud.operators.kubernetes_engine import (
     GKEDeleteClusterOperator,
     GKEStartPodOperator,
 )
-from airflow.utils.dates import days_ago
 
 GCP_PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "example-project")
 GCP_LOCATION = os.environ.get("GCP_GKE_LOCATION", "europe-north1-a")
@@ -41,7 +41,8 @@ CLUSTER = {"name": CLUSTER_NAME, "initial_node_count": 1}
 with models.DAG(
     "example_gcp_gke",
     schedule_interval='@once',  # Override to match your needs
-    start_date=days_ago(1),
+    start_date=datetime(2021, 1, 1),
+    catchup=False,
     tags=['example'],
 ) as dag:
     # [START howto_operator_gke_create_cluster]
diff --git a/docs/apache-airflow-providers-google/operators/cloud/gcs.rst b/docs/apache-airflow-providers-google/operators/cloud/gcs.rst
index e6ca14e..5d430f7 100644
--- a/docs/apache-airflow-providers-google/operators/cloud/gcs.rst
+++ b/docs/apache-airflow-providers-google/operators/cloud/gcs.rst
@@ -43,6 +43,7 @@ to execute a BigQuery load job.
 
 .. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_gcs_to_bigquery.py
     :language: python
+    :dedent: 4
     :start-after: [START howto_operator_gcs_to_bigquery]
     :end-before: [END howto_operator_gcs_to_bigquery]
 
@@ -61,6 +62,7 @@ processes all files older than ``data_interval_start``.
 
 .. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_gcs_timespan_file_transform.py
     :language: python
+    :dedent: 4
     :start-after: [START howto_operator_gcs_timespan_file_transform_operator_Task]
     :end-before: [END howto_operator_gcs_timespan_file_transform_operator_Task]
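The ``:dedent: 4`` added to both exampleinclude directives above strips the
four spaces of indentation the snippets gained when the example code moved
inside a ``with models.DAG(...)`` block. A minimal sketch of the layout Sphinx
now extracts from (the DAG id and snippet marker here are hypothetical):

    from datetime import datetime

    from airflow import models
    from airflow.operators.dummy import DummyOperator

    with models.DAG(
        "example_docs_snippet",  # hypothetical DAG id
        schedule_interval='@once',
        start_date=datetime(2021, 1, 1),
        catchup=False,
    ) as dag:
        # [START howto_hypothetical_snippet]
        # The snippet body sits at indent level 4, hence ``:dedent: 4``.
        task = DummyOperator(task_id="task")
        # [END howto_hypothetical_snippet]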
 
