This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 6ef44b6 Clean-up of google cloud example dags - batch 2 (#19527)
6ef44b6 is described below
commit 6ef44b6a507a8e8d5f41a6731a0773046623d171
Author: Niko <[email protected]>
AuthorDate: Sun Nov 14 16:09:48 2021 -0800
Clean-up of google cloud example dags - batch 2 (#19527)
- Use static start_date
- Use catchup=False
- Tidy up the chaining of tasks in some cases
- Remove unnecessary specification of default conn ids
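Taken together, these bullets amount to a common template for the provider example DAGs. Below is a minimal illustrative sketch of that template (not part of this commit; the DAG id and the DummyOperator placeholder tasks are hypothetical):

    from datetime import datetime

    from airflow import models
    from airflow.models.baseoperator import chain
    from airflow.operators.dummy import DummyOperator  # placeholder tasks, for illustration only

    with models.DAG(
        "example_cleanup_pattern",        # hypothetical DAG id
        schedule_interval='@once',        # Override to match your needs
        start_date=datetime(2021, 1, 1),  # static start_date instead of days_ago(...)
        catchup=False,                    # example DAGs should not backfill
        tags=['example'],
    ) as dag:
        first = DummyOperator(task_id="first")
        second = DummyOperator(task_id="second")
        third = DummyOperator(task_id="third")

        # chain() expresses a linear dependency without long `a >> b >> c` expressions;
        # operators rely on their default conn ids (e.g. google_cloud_default) instead
        # of passing them explicitly.
        chain(first, second, third)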
---
.../example_dags/example_azure_fileshare_to_gcs.py | 5 +--
.../google/cloud/example_dags/example_compute.py | 16 +++++--
.../cloud/example_dags/example_compute_igm.py | 14 ++++--
.../cloud/example_dags/example_compute_ssh.py | 5 ++-
.../cloud/example_dags/example_datacatalog.py | 9 +++-
.../google/cloud/example_dags/example_dataflow.py | 16 ++++---
.../example_dags/example_dataflow_flex_template.py | 5 ++-
.../cloud/example_dags/example_dataflow_sql.py | 5 ++-
.../cloud/example_dags/example_datafusion.py | 5 ++-
.../google/cloud/example_dags/example_dataprep.py | 5 ++-
.../google/cloud/example_dags/example_dataproc.py | 9 +++-
.../google/cloud/example_dags/example_datastore.py | 10 +++--
.../google/cloud/example_dags/example_dlp.py | 19 +++++---
.../example_dags/example_facebook_ads_to_gcs.py | 19 +++++---
.../google/cloud/example_dags/example_functions.py | 5 ++-
.../google/cloud/example_dags/example_gcs.py | 10 +++--
.../example_gcs_timespan_file_transform.py | 5 ++-
.../cloud/example_dags/example_gcs_to_bigquery.py | 51 +++++++++++-----------
.../cloud/example_dags/example_gcs_to_gcs.py | 8 +++-
.../cloud/example_dags/example_gcs_to_local.py | 6 +--
.../cloud/example_dags/example_gcs_to_sftp.py | 8 +++-
.../cloud/example_dags/example_gdrive_to_gcs.py | 5 ++-
.../cloud/example_dags/example_gdrive_to_local.py | 5 ++-
.../example_dags/example_kubernetes_engine.py | 5 ++-
.../operators/cloud/gcs.rst | 2 +
25 files changed, 164 insertions(+), 88 deletions(-)
diff --git a/airflow/providers/google/cloud/example_dags/example_azure_fileshare_to_gcs.py b/airflow/providers/google/cloud/example_dags/example_azure_fileshare_to_gcs.py
index ef6fc64..680b43b 100644
--- a/airflow/providers/google/cloud/example_dags/example_azure_fileshare_to_gcs.py
+++ b/airflow/providers/google/cloud/example_dags/example_azure_fileshare_to_gcs.py
@@ -37,7 +37,8 @@ with DAG(
'retry_delay': timedelta(minutes=5),
},
schedule_interval='@once',
- start_date=datetime(2018, 11, 1),
+ start_date=datetime(2021, 1, 1),
+ catchup=False,
tags=['example'],
) as dag:
# [START howto_operator_azure_fileshare_to_gcs_basic]
@@ -46,8 +47,6 @@ with DAG(
share_name=AZURE_SHARE_NAME,
dest_gcs=DEST_GCS_BUCKET,
directory_name=AZURE_DIRECTORY_NAME,
- azure_fileshare_conn_id='azure_fileshare_default',
- gcp_conn_id='google_cloud_default',
replace=False,
gzip=True,
google_impersonation_chain=None,
diff --git a/airflow/providers/google/cloud/example_dags/example_compute.py b/airflow/providers/google/cloud/example_dags/example_compute.py
index 820576f..6d81e3a 100644
--- a/airflow/providers/google/cloud/example_dags/example_compute.py
+++ b/airflow/providers/google/cloud/example_dags/example_compute.py
@@ -30,14 +30,15 @@ This DAG relies on the following OS environment variables
"""
import os
+from datetime import datetime
from airflow import models
+from airflow.models.baseoperator import chain
from airflow.providers.google.cloud.operators.compute import (
ComputeEngineSetMachineTypeOperator,
ComputeEngineStartInstanceOperator,
ComputeEngineStopInstanceOperator,
)
-from airflow.utils.dates import days_ago
# [START howto_operator_gce_args_common]
GCP_PROJECT_ID = os.environ.get('GCP_PROJECT_ID', 'example-project')
@@ -52,7 +53,8 @@ GCE_SHORT_MACHINE_TYPE_NAME = os.environ.get('GCE_SHORT_MACHINE_TYPE_NAME', 'n1-
with models.DAG(
'example_gcp_compute',
schedule_interval='@once', # Override to match your needs
- start_date=days_ago(1),
+ start_date=datetime(2021, 1, 1),
+ catchup=False,
tags=['example'],
) as dag:
# [START howto_operator_gce_start]
@@ -96,5 +98,11 @@ with models.DAG(
)
# [END howto_operator_gce_set_machine_type_no_project_id]
- gce_instance_start >> gce_instance_start2 >> gce_instance_stop >> gce_instance_stop2
- gce_instance_stop2 >> gce_set_machine_type >> gce_set_machine_type2
+ chain(
+ gce_instance_start,
+ gce_instance_start2,
+ gce_instance_stop,
+ gce_instance_stop2,
+ gce_set_machine_type,
+ gce_set_machine_type2,
+ )
diff --git a/airflow/providers/google/cloud/example_dags/example_compute_igm.py b/airflow/providers/google/cloud/example_dags/example_compute_igm.py
index 0370d8a..7cad62c 100644
--- a/airflow/providers/google/cloud/example_dags/example_compute_igm.py
+++ b/airflow/providers/google/cloud/example_dags/example_compute_igm.py
@@ -39,13 +39,14 @@ Variables for update template in Group Manager:
"""
import os
+from datetime import datetime
from airflow import models
+from airflow.models.baseoperator import chain
from airflow.providers.google.cloud.operators.compute import (
ComputeEngineCopyInstanceTemplateOperator,
ComputeEngineInstanceGroupUpdateManagerTemplateOperator,
)
-from airflow.utils.dates import days_ago
GCP_PROJECT_ID = os.environ.get('GCP_PROJECT_ID', 'example-project')
GCE_ZONE = os.environ.get('GCE_ZONE', 'europe-west1-b')
@@ -92,7 +93,8 @@ UPDATE_POLICY = {
with models.DAG(
'example_gcp_compute_igm',
schedule_interval='@once', # Override to match your needs
- start_date=days_ago(1),
+ start_date=datetime(2021, 1, 1),
+ catchup=False,
tags=['example'],
) as dag:
# [START howto_operator_gce_igm_copy_template]
@@ -133,5 +135,9 @@ with models.DAG(
)
# [END howto_operator_gce_igm_update_template_no_project_id]
- gce_instance_template_copy >> gce_instance_template_copy2 >> gce_instance_group_manager_update_template
- gce_instance_group_manager_update_template >> gce_instance_group_manager_update_template2
+ chain(
+ gce_instance_template_copy,
+ gce_instance_template_copy2,
+ gce_instance_group_manager_update_template,
+ gce_instance_group_manager_update_template2,
+ )
diff --git a/airflow/providers/google/cloud/example_dags/example_compute_ssh.py b/airflow/providers/google/cloud/example_dags/example_compute_ssh.py
index 62636fc..c207437 100644
--- a/airflow/providers/google/cloud/example_dags/example_compute_ssh.py
+++ b/airflow/providers/google/cloud/example_dags/example_compute_ssh.py
@@ -16,11 +16,11 @@
# under the License.
import os
+from datetime import datetime
from airflow import models
from airflow.providers.google.cloud.hooks.compute_ssh import ComputeEngineSSHHook
from airflow.providers.ssh.operators.ssh import SSHOperator
-from airflow.utils import dates
# [START howto_operator_gce_args_common]
GCP_PROJECT_ID = os.environ.get('GCP_PROJECT_ID', 'example-project')
@@ -30,8 +30,9 @@ GCE_INSTANCE = os.environ.get('GCE_INSTANCE', 'target-instance')
with models.DAG(
'example_compute_ssh',
- default_args=dict(start_date=dates.days_ago(1)),
schedule_interval='@once', # Override to match your needs
+ start_date=datetime(2021, 1, 1),
+ catchup=False,
tags=['example'],
) as dag:
# # [START howto_execute_command_on_remote1]
diff --git a/airflow/providers/google/cloud/example_dags/example_datacatalog.py b/airflow/providers/google/cloud/example_dags/example_datacatalog.py
index e56bc10..4de219f 100644
--- a/airflow/providers/google/cloud/example_dags/example_datacatalog.py
+++ b/airflow/providers/google/cloud/example_dags/example_datacatalog.py
@@ -20,6 +20,7 @@
Example Airflow DAG that interacts with Google Data Catalog service
"""
import os
+from datetime import datetime
from google.cloud.datacatalog_v1beta1 import FieldType, TagField, TagTemplateField
@@ -49,7 +50,6 @@ from airflow.providers.google.cloud.operators.datacatalog import (
CloudDataCatalogUpdateTagTemplateFieldOperator,
CloudDataCatalogUpdateTagTemplateOperator,
)
-from airflow.utils.dates import days_ago
PROJECT_ID = os.getenv("GCP_PROJECT_ID")
BUCKET_ID = os.getenv("GCP_TEST_DATA_BUCKET", "INVALID BUCKET NAME")
@@ -61,7 +61,12 @@ FIELD_NAME_1 = "first"
FIELD_NAME_2 = "second"
FIELD_NAME_3 = "first-rename"
-with models.DAG("example_gcp_datacatalog", schedule_interval='@once',
start_date=days_ago(1)) as dag:
+with models.DAG(
+ "example_gcp_datacatalog",
+ schedule_interval='@once',
+ start_date=datetime(2021, 1, 1),
+ catchup=False,
+) as dag:
# Create
# [START howto_operator_gcp_datacatalog_create_entry_group]
create_entry_group = CloudDataCatalogCreateEntryGroupOperator(
diff --git a/airflow/providers/google/cloud/example_dags/example_dataflow.py b/airflow/providers/google/cloud/example_dags/example_dataflow.py
index 0c9aceb..8b1d01f 100644
--- a/airflow/providers/google/cloud/example_dags/example_dataflow.py
+++ b/airflow/providers/google/cloud/example_dags/example_dataflow.py
@@ -20,6 +20,7 @@
Example Airflow DAG for Google Cloud Dataflow service
"""
import os
+from datetime import datetime
from typing import Callable, Dict, List
from urllib.parse import urlparse
@@ -41,7 +42,8 @@ from airflow.providers.google.cloud.sensors.dataflow import (
DataflowJobStatusSensor,
)
from airflow.providers.google.cloud.transfers.gcs_to_local import GCSToLocalFilesystemOperator
-from airflow.utils.dates import days_ago
+
+START_DATE = datetime(2021, 1, 1)
GCS_TMP = os.environ.get('GCP_DATAFLOW_GCS_TMP', 'gs://INVALID BUCKET NAME/temp/')
GCS_STAGING = os.environ.get('GCP_DATAFLOW_GCS_STAGING', 'gs://INVALID BUCKET NAME/staging/')
@@ -63,7 +65,8 @@ default_args = {
with models.DAG(
"example_gcp_dataflow_native_java",
schedule_interval='@once', # Override to match your needs
- start_date=days_ago(1),
+ start_date=START_DATE,
+ catchup=False,
tags=['example'],
) as dag_native_java:
@@ -110,7 +113,8 @@ with models.DAG(
with models.DAG(
"example_gcp_dataflow_native_python",
default_args=default_args,
- start_date=days_ago(1),
+ start_date=START_DATE,
+ catchup=False,
schedule_interval='@once', # Override to match your needs
tags=['example'],
) as dag_native_python:
@@ -145,7 +149,8 @@ with models.DAG(
with models.DAG(
"example_gcp_dataflow_native_python_async",
default_args=default_args,
- start_date=days_ago(1),
+ start_date=START_DATE,
+ catchup=False,
schedule_interval='@once', # Override to match your needs
tags=['example'],
) as dag_native_python_async:
@@ -246,7 +251,8 @@ with models.DAG(
with models.DAG(
"example_gcp_dataflow_template",
default_args=default_args,
- start_date=days_ago(1),
+ start_date=START_DATE,
+ catchup=False,
schedule_interval='@once', # Override to match your needs
tags=['example'],
) as dag_template:
diff --git a/airflow/providers/google/cloud/example_dags/example_dataflow_flex_template.py b/airflow/providers/google/cloud/example_dags/example_dataflow_flex_template.py
index 3938af8..43d9914 100644
--- a/airflow/providers/google/cloud/example_dags/example_dataflow_flex_template.py
+++ b/airflow/providers/google/cloud/example_dags/example_dataflow_flex_template.py
@@ -20,10 +20,10 @@
Example Airflow DAG for Google Cloud Dataflow service
"""
import os
+from datetime import datetime
from airflow import models
from airflow.providers.google.cloud.operators.dataflow import DataflowStartFlexTemplateOperator
-from airflow.utils.dates import days_ago
GCP_PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "example-project")
@@ -45,7 +45,8 @@ BQ_FLEX_TEMPLATE_LOCATION = os.environ.get('GCP_DATAFLOW_BQ_FLEX_TEMPLATE_LOCATI
with models.DAG(
dag_id="example_gcp_dataflow_flex_template_java",
- start_date=days_ago(1),
+ start_date=datetime(2021, 1, 1),
+ catchup=False,
schedule_interval='@once', # Override to match your needs
) as dag_flex_template:
# [START howto_operator_start_template_job]
diff --git a/airflow/providers/google/cloud/example_dags/example_dataflow_sql.py b/airflow/providers/google/cloud/example_dags/example_dataflow_sql.py
index 0d03119..a74f5de 100644
--- a/airflow/providers/google/cloud/example_dags/example_dataflow_sql.py
+++ b/airflow/providers/google/cloud/example_dags/example_dataflow_sql.py
@@ -20,10 +20,10 @@
Example Airflow DAG for Google Cloud Dataflow service
"""
import os
+from datetime import datetime
from airflow import models
from airflow.providers.google.cloud.operators.dataflow import DataflowStartSqlJobOperator
-from airflow.utils.dates import days_ago
GCP_PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "example-project")
@@ -36,7 +36,8 @@ DATAFLOW_SQL_LOCATION = os.environ.get("GCP_DATAFLOW_SQL_LOCATION", "us-west1")
with models.DAG(
dag_id="example_gcp_dataflow_sql",
- start_date=days_ago(1),
+ start_date=datetime(2021, 1, 1),
+ catchup=False,
schedule_interval='@once', # Override to match your needs
tags=['example'],
) as dag_sql:
diff --git a/airflow/providers/google/cloud/example_dags/example_datafusion.py b/airflow/providers/google/cloud/example_dags/example_datafusion.py
index 4900e6b..c2387a7 100644
--- a/airflow/providers/google/cloud/example_dags/example_datafusion.py
+++ b/airflow/providers/google/cloud/example_dags/example_datafusion.py
@@ -19,6 +19,7 @@
Example Airflow DAG that shows how to use DataFusion.
"""
import os
+from datetime import datetime
from airflow import models
from airflow.operators.bash import BashOperator
@@ -35,7 +36,6 @@ from airflow.providers.google.cloud.operators.datafusion import (
CloudDataFusionUpdateInstanceOperator,
)
from airflow.providers.google.cloud.sensors.datafusion import CloudDataFusionPipelineStateSensor
-from airflow.utils import dates
from airflow.utils.state import State
# [START howto_data_fusion_env_variables]
@@ -153,7 +153,8 @@ PIPELINE = {
with models.DAG(
"example_data_fusion",
schedule_interval='@once', # Override to match your needs
- start_date=dates.days_ago(1),
+ start_date=datetime(2021, 1, 1),
+ catchup=False,
) as dag:
# [START howto_cloud_data_fusion_create_instance_operator]
create_instance = CloudDataFusionCreateInstanceOperator(
diff --git a/airflow/providers/google/cloud/example_dags/example_dataprep.py b/airflow/providers/google/cloud/example_dags/example_dataprep.py
index b155681..1bd460a 100644
--- a/airflow/providers/google/cloud/example_dags/example_dataprep.py
+++ b/airflow/providers/google/cloud/example_dags/example_dataprep.py
@@ -18,6 +18,7 @@
Example Airflow DAG that shows how to use Google Dataprep.
"""
import os
+from datetime import datetime
from airflow import models
from airflow.providers.google.cloud.operators.dataprep import (
@@ -25,7 +26,6 @@ from airflow.providers.google.cloud.operators.dataprep import (
DataprepGetJobsForJobGroupOperator,
DataprepRunJobGroupOperator,
)
-from airflow.utils import dates
DATAPREP_JOB_ID = int(os.environ.get('DATAPREP_JOB_ID', 12345677))
DATAPREP_JOB_RECIPE_ID = int(os.environ.get('DATAPREP_JOB_RECIPE_ID', 12345677))
@@ -53,7 +53,8 @@ DATA = {
with models.DAG(
"example_dataprep",
schedule_interval='@once',
- start_date=dates.days_ago(1), # Override to match your needs
+ start_date=datetime(2021, 1, 1), # Override to match your needs
+ catchup=False,
) as dag:
# [START how_to_dataprep_run_job_group_operator]
run_job_group = DataprepRunJobGroupOperator(task_id="run_job_group", body_request=DATA)
diff --git a/airflow/providers/google/cloud/example_dags/example_dataproc.py b/airflow/providers/google/cloud/example_dags/example_dataproc.py
index 4959498..1a319f5 100644
--- a/airflow/providers/google/cloud/example_dags/example_dataproc.py
+++ b/airflow/providers/google/cloud/example_dags/example_dataproc.py
@@ -21,6 +21,7 @@ operators to manage a cluster and submit jobs.
"""
import os
+from datetime import datetime
from airflow import models
from airflow.providers.google.cloud.operators.dataproc import (
@@ -32,7 +33,6 @@ from airflow.providers.google.cloud.operators.dataproc import (
DataprocUpdateClusterOperator,
)
from airflow.providers.google.cloud.sensors.dataproc import DataprocJobSensor
-from airflow.utils.dates import days_ago
PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "an-id")
CLUSTER_NAME = os.environ.get("GCP_DATAPROC_CLUSTER_NAME", "example-cluster")
@@ -151,7 +151,12 @@ WORKFLOW_TEMPLATE = {
}
-with models.DAG("example_gcp_dataproc", schedule_interval='@once',
start_date=days_ago(1)) as dag:
+with models.DAG(
+ "example_gcp_dataproc",
+ schedule_interval='@once',
+ start_date=datetime(2021, 1, 1),
+ catchup=False,
+) as dag:
# [START how_to_cloud_dataproc_create_cluster_operator]
create_cluster = DataprocCreateClusterOperator(
task_id="create_cluster",
diff --git a/airflow/providers/google/cloud/example_dags/example_datastore.py b/airflow/providers/google/cloud/example_dags/example_datastore.py
index f871646..96497d2 100644
--- a/airflow/providers/google/cloud/example_dags/example_datastore.py
+++ b/airflow/providers/google/cloud/example_dags/example_datastore.py
@@ -23,6 +23,7 @@ This example requires that your project contains Datastore instance.
"""
import os
+from datetime import datetime
from typing import Any, Dict
from airflow import models
@@ -35,7 +36,8 @@ from airflow.providers.google.cloud.operators.datastore import (
CloudDatastoreRollbackOperator,
CloudDatastoreRunQueryOperator,
)
-from airflow.utils import dates
+
+START_DATE = datetime(2021, 1, 1)
GCP_PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "example-project")
BUCKET = os.environ.get("GCP_DATASTORE_BUCKET", "datastore-system-test")
@@ -43,7 +45,8 @@ BUCKET = os.environ.get("GCP_DATASTORE_BUCKET", "datastore-system-test")
with models.DAG(
"example_gcp_datastore",
schedule_interval='@once', # Override to match your needs
- start_date=dates.days_ago(1),
+ start_date=START_DATE,
+ catchup=False,
tags=["example"],
) as dag:
# [START how_to_export_task]
@@ -82,8 +85,9 @@ TRANSACTION_OPTIONS: Dict[str, Any] = {"readWrite": {}}
with models.DAG(
"example_gcp_datastore_operations",
- start_date=dates.days_ago(1),
schedule_interval='@once', # Override to match your needs
+ start_date=START_DATE,
+ catchup=False,
tags=["example"],
) as dag2:
# [START how_to_allocate_ids]
diff --git a/airflow/providers/google/cloud/example_dags/example_dlp.py b/airflow/providers/google/cloud/example_dags/example_dlp.py
index 2199877..480fda1 100644
--- a/airflow/providers/google/cloud/example_dags/example_dlp.py
+++ b/airflow/providers/google/cloud/example_dags/example_dlp.py
@@ -25,6 +25,7 @@ Cloud DLP service in the Google Cloud:
"""
import os
+from datetime import datetime
from google.cloud.dlp_v2.types import ContentItem, InspectConfig, InspectTemplate
@@ -41,7 +42,8 @@ from airflow.providers.google.cloud.operators.dlp import (
CloudDLPUpdateJobTriggerOperator,
CloudDLPUpdateStoredInfoTypeOperator,
)
-from airflow.utils.dates import days_ago
+
+START_DATE = datetime(2021, 1, 1)
GCP_PROJECT = os.environ.get("GCP_PROJECT_ID", "example-project")
TEMPLATE_ID = "dlp-inspect-838746"
@@ -62,7 +64,8 @@ OBJECT_GCS_OUTPUT_URI = os.path.join(OUTPUT_BUCKET, "tmp", OUTPUT_FILENAME)
with models.DAG(
"example_gcp_dlp",
schedule_interval='@once', # Override to match your needs
- start_date=days_ago(1),
+ start_date=START_DATE,
+ catchup=False,
tags=['example'],
) as dag1:
# [START howto_operator_dlp_create_inspect_template]
@@ -111,7 +114,8 @@ UPDATE_CUSTOM_INFO_TYPE = {
with models.DAG(
"example_gcp_dlp_info_types",
schedule_interval='@once',
- start_date=days_ago(1),
+ start_date=START_DATE,
+ catchup=False,
tags=["example", "dlp", "info-types"],
) as dag2:
# [START howto_operator_dlp_create_info_type]
@@ -152,7 +156,11 @@ JOB_TRIGGER = {
TRIGGER_ID = "example_trigger"
with models.DAG(
- "example_gcp_dlp_job", schedule_interval='@once', start_date=days_ago(1),
tags=["example", "dlp_job"]
+ "example_gcp_dlp_job",
+ schedule_interval='@once',
+ start_date=START_DATE,
+ catchup=False,
+ tags=["example", "dlp_job"],
) as dag3: # [START howto_operator_dlp_create_job_trigger]
create_trigger = CloudDLPCreateJobTriggerOperator(
project_id=GCP_PROJECT,
@@ -196,7 +204,8 @@ DEIDENTIFY_CONFIG = {
with models.DAG(
"example_gcp_dlp_deidentify_content",
schedule_interval='@once',
- start_date=days_ago(1),
+ start_date=START_DATE,
+ catchup=False,
tags=["example", "dlp", "deidentify"],
) as dag4:
# [START _howto_operator_dlp_deidentify_content]
diff --git a/airflow/providers/google/cloud/example_dags/example_facebook_ads_to_gcs.py b/airflow/providers/google/cloud/example_dags/example_facebook_ads_to_gcs.py
index 9b6ac50..bd80091 100644
--- a/airflow/providers/google/cloud/example_dags/example_facebook_ads_to_gcs.py
+++ b/airflow/providers/google/cloud/example_dags/example_facebook_ads_to_gcs.py
@@ -19,10 +19,12 @@
Example Airflow DAG that shows how to use FacebookAdsReportToGcsOperator.
"""
import os
+from datetime import datetime
from facebook_business.adobjects.adsinsights import AdsInsights
from airflow import models
+from airflow.models.baseoperator import chain
from airflow.providers.google.cloud.operators.bigquery import (
BigQueryCreateEmptyDatasetOperator,
BigQueryCreateEmptyTableOperator,
@@ -32,7 +34,6 @@ from airflow.providers.google.cloud.operators.bigquery import (
from airflow.providers.google.cloud.operators.gcs import GCSCreateBucketOperator, GCSDeleteBucketOperator
from airflow.providers.google.cloud.transfers.facebook_ads_to_gcs import FacebookAdsReportToGcsOperator
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator
-from airflow.utils.dates import days_ago
# [START howto_GCS_env_variables]
GCP_PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "free-tier-1997")
@@ -57,7 +58,8 @@ PARAMETERS = {'level': 'ad', 'date_preset': 'yesterday'}
with models.DAG(
"example_facebook_ads_to_gcs",
schedule_interval='@once', # Override to match your needs
- start_date=days_ago(1),
+ start_date=datetime(2021, 1, 1),
+ catchup=False,
) as dag:
create_bucket = GCSCreateBucketOperator(
@@ -87,7 +89,6 @@ with models.DAG(
# [START howto_operator_facebook_ads_to_gcs]
run_operator = FacebookAdsReportToGcsOperator(
task_id='run_fetch_data',
- start_date=days_ago(2),
owner='airflow',
bucket_name=GCS_BUCKET,
parameters=PARAMETERS,
@@ -127,5 +128,13 @@ with models.DAG(
delete_contents=True,
)
- create_bucket >> create_dataset >> create_table >> run_operator >> load_csv
- load_csv >> read_data_from_gcs_many_chunks >> delete_bucket >> delete_dataset
+ chain(
+ create_bucket,
+ create_dataset,
+ create_table,
+ run_operator,
+ load_csv,
+ read_data_from_gcs_many_chunks,
+ delete_bucket,
+ delete_dataset,
+ )
diff --git a/airflow/providers/google/cloud/example_dags/example_functions.py b/airflow/providers/google/cloud/example_dags/example_functions.py
index 6144c07..01a6364 100644
--- a/airflow/providers/google/cloud/example_dags/example_functions.py
+++ b/airflow/providers/google/cloud/example_dags/example_functions.py
@@ -41,6 +41,7 @@ https://airflow.apache.org/concepts.html#variables
"""
import os
+from datetime import datetime
from airflow import models
from airflow.providers.google.cloud.operators.functions import (
@@ -48,7 +49,6 @@ from airflow.providers.google.cloud.operators.functions import (
CloudFunctionDeployFunctionOperator,
CloudFunctionInvokeFunctionOperator,
)
-from airflow.utils import dates
GCP_PROJECT_ID = os.environ.get('GCP_PROJECT_ID', 'example-project')
GCP_LOCATION = os.environ.get('GCP_LOCATION', 'europe-west1')
@@ -94,7 +94,8 @@ with models.DAG(
'example_gcp_function',
default_args=default_args,
schedule_interval='@once', # Override to match your needs
- start_date=dates.days_ago(1),
+ start_date=datetime(2021, 1, 1),
+ catchup=False,
tags=['example'],
) as dag:
# [START howto_operator_gcf_deploy]
diff --git a/airflow/providers/google/cloud/example_dags/example_gcs.py b/airflow/providers/google/cloud/example_dags/example_gcs.py
index 684a4b4..d0fb286 100644
--- a/airflow/providers/google/cloud/example_dags/example_gcs.py
+++ b/airflow/providers/google/cloud/example_dags/example_gcs.py
@@ -20,6 +20,7 @@ Example Airflow DAG for Google Cloud Storage operators.
"""
import os
+from datetime import datetime
from airflow import models
from airflow.operators.bash import BashOperator
@@ -39,9 +40,10 @@ from airflow.providers.google.cloud.sensors.gcs import (
from airflow.providers.google.cloud.transfers.gcs_to_gcs import GCSToGCSOperator
from airflow.providers.google.cloud.transfers.gcs_to_local import GCSToLocalFilesystemOperator
from airflow.providers.google.cloud.transfers.local_to_gcs import LocalFilesystemToGCSOperator
-from airflow.utils.dates import days_ago
from airflow.utils.state import State
+START_DATE = datetime(2021, 1, 1)
+
PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "example-id")
BUCKET_1 = os.environ.get("GCP_GCS_BUCKET_1", "test-gcs-example-bucket")
GCS_ACL_ENTITY = os.environ.get("GCS_ACL_ENTITY", "allUsers")
@@ -59,7 +61,8 @@ BUCKET_FILE_LOCATION = PATH_TO_UPLOAD_FILE.rpartition("/")[-1]
with models.DAG(
"example_gcs",
- start_date=days_ago(1),
+ start_date=START_DATE,
+ catchup=False,
schedule_interval='@once',
tags=['example'],
) as dag:
@@ -159,7 +162,8 @@ with models.DAG(
with models.DAG(
"example_gcs_sensors",
- start_date=days_ago(1),
+ start_date=START_DATE,
+ catchup=False,
schedule_interval='@once',
tags=['example'],
) as dag2:
diff --git a/airflow/providers/google/cloud/example_dags/example_gcs_timespan_file_transform.py b/airflow/providers/google/cloud/example_dags/example_gcs_timespan_file_transform.py
index b4c4332..6a36f3e 100644
--- a/airflow/providers/google/cloud/example_dags/example_gcs_timespan_file_transform.py
+++ b/airflow/providers/google/cloud/example_dags/example_gcs_timespan_file_transform.py
@@ -20,10 +20,10 @@ Example Airflow DAG for Google Cloud Storage time-span file transform operator.
"""
import os
+from datetime import datetime
from airflow import models
from airflow.providers.google.cloud.operators.gcs import GCSTimeSpanFileTransformOperator
-from airflow.utils.dates import days_ago
from airflow.utils.state import State
SOURCE_BUCKET = os.environ.get("GCP_GCS_BUCKET_1", "test-gcs-example-bucket")
@@ -40,7 +40,8 @@ PATH_TO_TRANSFORM_SCRIPT = os.environ.get(
with models.DAG(
"example_gcs_timespan_file_transform",
- start_date=days_ago(1),
+ start_date=datetime(2021, 1, 1),
+ catchup=False,
schedule_interval='@once',
tags=['example'],
) as dag:
diff --git a/airflow/providers/google/cloud/example_dags/example_gcs_to_bigquery.py b/airflow/providers/google/cloud/example_dags/example_gcs_to_bigquery.py
index f3c88b5..454320e 100644
--- a/airflow/providers/google/cloud/example_dags/example_gcs_to_bigquery.py
+++ b/airflow/providers/google/cloud/example_dags/example_gcs_to_bigquery.py
@@ -21,6 +21,7 @@ Example DAG using GCSToBigQueryOperator.
"""
import os
+from datetime import datetime
from airflow import models
from airflow.providers.google.cloud.operators.bigquery import (
@@ -28,39 +29,39 @@ from airflow.providers.google.cloud.operators.bigquery import (
BigQueryDeleteDatasetOperator,
)
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator
-from airflow.utils.dates import days_ago
DATASET_NAME = os.environ.get("GCP_DATASET_NAME", 'airflow_test')
TABLE_NAME = os.environ.get("GCP_TABLE_NAME", 'gcs_to_bq_table')
-dag = models.DAG(
+with models.DAG(
dag_id='example_gcs_to_bigquery_operator',
- start_date=days_ago(2),
+ start_date=datetime(2021, 1, 1),
+ catchup=False,
schedule_interval='@once',
tags=['example'],
-)
+) as dag:
+ create_test_dataset = BigQueryCreateEmptyDatasetOperator(
+ task_id='create_airflow_test_dataset', dataset_id=DATASET_NAME
+ )
-create_test_dataset = BigQueryCreateEmptyDatasetOperator(
- task_id='create_airflow_test_dataset', dataset_id=DATASET_NAME, dag=dag
-)
+ # [START howto_operator_gcs_to_bigquery]
+ load_csv = GCSToBigQueryOperator(
+ task_id='gcs_to_bigquery_example',
+ bucket='cloud-samples-data',
+ source_objects=['bigquery/us-states/us-states.csv'],
+ destination_project_dataset_table=f"{DATASET_NAME}.{TABLE_NAME}",
+ schema_fields=[
+ {'name': 'name', 'type': 'STRING', 'mode': 'NULLABLE'},
+ {'name': 'post_abbr', 'type': 'STRING', 'mode': 'NULLABLE'},
+ ],
+ write_disposition='WRITE_TRUNCATE',
+ )
+ # [END howto_operator_gcs_to_bigquery]
-# [START howto_operator_gcs_to_bigquery]
-load_csv = GCSToBigQueryOperator(
- task_id='gcs_to_bigquery_example',
- bucket='cloud-samples-data',
- source_objects=['bigquery/us-states/us-states.csv'],
- destination_project_dataset_table=f"{DATASET_NAME}.{TABLE_NAME}",
- schema_fields=[
- {'name': 'name', 'type': 'STRING', 'mode': 'NULLABLE'},
- {'name': 'post_abbr', 'type': 'STRING', 'mode': 'NULLABLE'},
- ],
- write_disposition='WRITE_TRUNCATE',
- dag=dag,
-)
-# [END howto_operator_gcs_to_bigquery]
-
-delete_test_dataset = BigQueryDeleteDatasetOperator(
- task_id='delete_airflow_test_dataset', dataset_id=DATASET_NAME, delete_contents=True, dag=dag
-)
+ delete_test_dataset = BigQueryDeleteDatasetOperator(
+ task_id='delete_airflow_test_dataset',
+ dataset_id=DATASET_NAME,
+ delete_contents=True,
+ )
create_test_dataset >> load_csv >> delete_test_dataset
diff --git a/airflow/providers/google/cloud/example_dags/example_gcs_to_gcs.py b/airflow/providers/google/cloud/example_dags/example_gcs_to_gcs.py
index 7086c94..f2d4b6e 100644
--- a/airflow/providers/google/cloud/example_dags/example_gcs_to_gcs.py
+++ b/airflow/providers/google/cloud/example_dags/example_gcs_to_gcs.py
@@ -20,11 +20,11 @@ Example Airflow DAG for Google Cloud Storage to Google Cloud Storage transfer op
"""
import os
+from datetime import datetime
from airflow import models
from airflow.providers.google.cloud.operators.gcs import GCSSynchronizeBucketsOperator
from airflow.providers.google.cloud.transfers.gcs_to_gcs import GCSToGCSOperator
-from airflow.utils.dates import days_ago
BUCKET_1_SRC = os.environ.get("GCP_GCS_BUCKET_1_SRC", "test-gcs-sync-1-src")
BUCKET_1_DST = os.environ.get("GCP_GCS_BUCKET_1_DST", "test-gcs-sync-1-dst")
@@ -39,7 +39,11 @@ OBJECT_1 = os.environ.get("GCP_GCS_OBJECT_1", "test-gcs-to-gcs-1")
OBJECT_2 = os.environ.get("GCP_GCS_OBJECT_2", "test-gcs-to-gcs-2")
with models.DAG(
- "example_gcs_to_gcs", schedule_interval='@once', start_date=days_ago(1),
tags=['example']
+ "example_gcs_to_gcs",
+ schedule_interval='@once',
+ start_date=datetime(2021, 1, 1),
+ catchup=False,
+ tags=['example'],
) as dag:
# [START howto_synch_bucket]
sync_bucket = GCSSynchronizeBucketsOperator(
diff --git a/airflow/providers/google/cloud/example_dags/example_gcs_to_local.py b/airflow/providers/google/cloud/example_dags/example_gcs_to_local.py
index 16b5afd..50f62d2 100644
--- a/airflow/providers/google/cloud/example_dags/example_gcs_to_local.py
+++ b/airflow/providers/google/cloud/example_dags/example_gcs_to_local.py
@@ -14,12 +14,11 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-
import os
+from datetime import datetime
from airflow import models
from airflow.providers.google.cloud.transfers.gcs_to_local import GCSToLocalFilesystemOperator
-from airflow.utils.dates import days_ago
PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "example-id")
BUCKET = os.environ.get("GCP_GCS_BUCKET", "test-gcs-example-bucket")
@@ -29,8 +28,9 @@ PATH_TO_LOCAL_FILE = os.environ.get("GCP_GCS_PATH_TO_SAVED_FILE", "test-gcs-exam
with models.DAG(
"example_gcs_to_local",
- start_date=days_ago(1),
schedule_interval='@once',
+ start_date=datetime(2021, 1, 1),
+ catchup=False,
tags=['example'],
) as dag:
# [START howto_operator_gcs_download_file_task]
diff --git a/airflow/providers/google/cloud/example_dags/example_gcs_to_sftp.py b/airflow/providers/google/cloud/example_dags/example_gcs_to_sftp.py
index c31a8f9..ff0431f 100644
--- a/airflow/providers/google/cloud/example_dags/example_gcs_to_sftp.py
+++ b/airflow/providers/google/cloud/example_dags/example_gcs_to_sftp.py
@@ -20,11 +20,11 @@ Example Airflow DAG for Google Cloud Storage to SFTP transfer operators.
"""
import os
+from datetime import datetime
from airflow import models
from airflow.providers.google.cloud.transfers.gcs_to_sftp import GCSToSFTPOperator
from airflow.providers.sftp.sensors.sftp import SFTPSensor
-from airflow.utils.dates import days_ago
SFTP_CONN_ID = "ssh_default"
BUCKET_SRC = os.environ.get("GCP_GCS_BUCKET_1_SRC", "test-gcs-sftp")
@@ -37,7 +37,11 @@ DESTINATION_PATH_3 = "/tmp/dest-dir-2/"
with models.DAG(
- "example_gcs_to_sftp", schedule_interval='@once', start_date=days_ago(1),
tags=['example']
+ "example_gcs_to_sftp",
+ schedule_interval='@once',
+ start_date=datetime(2021, 1, 1),
+ catchup=False,
+ tags=['example'],
) as dag:
# [START howto_operator_gcs_to_sftp_copy_single_file]
copy_file_from_gcs_to_sftp = GCSToSFTPOperator(
diff --git a/airflow/providers/google/cloud/example_dags/example_gdrive_to_gcs.py b/airflow/providers/google/cloud/example_dags/example_gdrive_to_gcs.py
index 2059850..bb65634 100644
--- a/airflow/providers/google/cloud/example_dags/example_gdrive_to_gcs.py
+++ b/airflow/providers/google/cloud/example_dags/example_gdrive_to_gcs.py
@@ -17,11 +17,11 @@
# under the License.
import os
+from datetime import datetime
from airflow import models
from airflow.providers.google.cloud.transfers.gdrive_to_gcs import GoogleDriveToGCSOperator
from airflow.providers.google.suite.sensors.drive import GoogleDriveFileExistenceSensor
-from airflow.utils.dates import days_ago
BUCKET = os.environ.get("GCP_GCS_BUCKET", "test28397yeo")
OBJECT = os.environ.get("GCP_GCS_OBJECT", "abc123xyz")
@@ -30,7 +30,8 @@ FILE_NAME = os.environ.get("FILE_NAME", "file.pdf")
with models.DAG(
"example_gdrive_to_gcs_with_gdrive_sensor",
- start_date=days_ago(1),
+ start_date=datetime(2021, 1, 1),
+ catchup=False,
schedule_interval='@once', # Override to match your needs
tags=["example"],
) as dag:
diff --git a/airflow/providers/google/cloud/example_dags/example_gdrive_to_local.py b/airflow/providers/google/cloud/example_dags/example_gdrive_to_local.py
index 94fca91..2ba38ea 100644
--- a/airflow/providers/google/cloud/example_dags/example_gdrive_to_local.py
+++ b/airflow/providers/google/cloud/example_dags/example_gdrive_to_local.py
@@ -17,11 +17,11 @@
# under the License.
import os
+from datetime import datetime
from airflow import models
from airflow.providers.google.cloud.transfers.gdrive_to_local import GoogleDriveToLocalOperator
from airflow.providers.google.suite.sensors.drive import GoogleDriveFileExistenceSensor
-from airflow.utils.dates import days_ago
FOLDER_ID = os.environ.get("FILE_ID", "1234567890qwerty")
FILE_NAME = os.environ.get("FILE_NAME", "file.pdf")
@@ -29,7 +29,8 @@ OUTPUT_FILE = os.environ.get("OUTPUT_FILE", "out_file.pdf")
with models.DAG(
"example_gdrive_to_local_with_gdrive_sensor",
- start_date=days_ago(1),
+ start_date=datetime(2021, 1, 1),
+ catchup=False,
schedule_interval=None, # Override to match your needs
tags=["example"],
) as dag:
diff --git a/airflow/providers/google/cloud/example_dags/example_kubernetes_engine.py b/airflow/providers/google/cloud/example_dags/example_kubernetes_engine.py
index 5d12fcf..8c3ff71 100644
--- a/airflow/providers/google/cloud/example_dags/example_kubernetes_engine.py
+++ b/airflow/providers/google/cloud/example_dags/example_kubernetes_engine.py
@@ -20,6 +20,7 @@ Example Airflow DAG for Google Kubernetes Engine.
"""
import os
+from datetime import datetime
from airflow import models
from airflow.operators.bash import BashOperator
@@ -28,7 +29,6 @@ from airflow.providers.google.cloud.operators.kubernetes_engine import (
GKEDeleteClusterOperator,
GKEStartPodOperator,
)
-from airflow.utils.dates import days_ago
GCP_PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "example-project")
GCP_LOCATION = os.environ.get("GCP_GKE_LOCATION", "europe-north1-a")
@@ -41,7 +41,8 @@ CLUSTER = {"name": CLUSTER_NAME, "initial_node_count": 1}
with models.DAG(
"example_gcp_gke",
schedule_interval='@once', # Override to match your needs
- start_date=days_ago(1),
+ start_date=datetime(2021, 1, 1),
+ catchup=False,
tags=['example'],
) as dag:
# [START howto_operator_gke_create_cluster]
diff --git a/docs/apache-airflow-providers-google/operators/cloud/gcs.rst b/docs/apache-airflow-providers-google/operators/cloud/gcs.rst
index e6ca14e..5d430f7 100644
--- a/docs/apache-airflow-providers-google/operators/cloud/gcs.rst
+++ b/docs/apache-airflow-providers-google/operators/cloud/gcs.rst
@@ -43,6 +43,7 @@ to execute a BigQuery load job.
.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_gcs_to_bigquery.py
:language: python
+ :dedent: 4
:start-after: [START howto_operator_gcs_to_bigquery]
:end-before: [END howto_operator_gcs_to_bigquery]
@@ -61,6 +62,7 @@ processes all files older than ``data_interval_start``.
.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_gcs_timespan_file_transform.py
:language: python
+ :dedent: 4
:start-after: [START
howto_operator_gcs_timespan_file_transform_operator_Task]
:end-before: [END howto_operator_gcs_timespan_file_transform_operator_Task]