This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 7fe46e12e2b Google provider delete deprecated reaching removal date (December 2024) (#45084)
7fe46e12e2b is described below
commit 7fe46e12e2bccc1312fe5e4aadc2a3729d34b385
Author: olegkachur-e <[email protected]>
AuthorDate: Thu Dec 19 21:48:22 2024 +0100
Google provider delete deprecated reaching removal date (December 2024) (#45084)
* remove deprecated DataPipelineHook hook
* remove deprecated CreateDataPipelineOperator, RunDataPipelineOperator
---------
Co-authored-by: Oleg Kachur <[email protected]>
---
.../operators/cloud/datapipeline.rst | 95 -------------
.../src/airflow/providers/google/CHANGELOG.rst | 26 ++++
.../providers/google/cloud/hooks/datapipeline.py | 71 ----------
.../google/cloud/operators/datapipeline.py | 63 ---------
.../src/airflow/providers/google/provider.yaml | 11 --
.../tests/google/cloud/hooks/test_datapipeline.py | 122 -----------------
.../google/cloud/operators/test_datapipeline.py | 4 +-
.../system/google/cloud/datapipelines/__init__.py | 16 ---
.../cloud/datapipelines/example_datapipeline.py | 151 ---------------------
tests/always/test_project_structure.py | 2 -
10 files changed, 28 insertions(+), 533 deletions(-)
diff --git a/docs/apache-airflow-providers-google/operators/cloud/datapipeline.rst b/docs/apache-airflow-providers-google/operators/cloud/datapipeline.rst
deleted file mode 100644
index 4996afaf7c3..00000000000
--- a/docs/apache-airflow-providers-google/operators/cloud/datapipeline.rst
+++ /dev/null
@@ -1,95 +0,0 @@
- .. Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
-
- .. http://www.apache.org/licenses/LICENSE-2.0
-
- .. Unless required by applicable law or agreed to in writing,
- software distributed under the License is distributed on an
- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- KIND, either express or implied. See the License for the
- specific language governing permissions and limitations
- under the License.
-
-Google Cloud Data Pipelines Operators
-=====================================
-
-Data Pipelines is a Dataflow feature that allows customers to create
-and schedule recurring jobs, view aggregated job metrics, and define
-and manage job SLOs. A pipeline consists of a collection of jobs
-including ways to manage them. A pipeline may be associated with a
-Dataflow Template (classic/flex) and include all jobs launched with
-the associated template.
-
-Prerequisite Tasks
-^^^^^^^^^^^^^^^^^^
-
-.. include:: /operators/_partials/prerequisite_tasks.rst
-
-Creating a Data Pipeline
-^^^^^^^^^^^^^^^^^^^^^^^^
-
-.. warning::
- This operator is deprecated. Please use :class:`~airflow.providers.google.cloud.operators.dataflow.DataflowCreatePipelineOperator`.
-
-To create a new Data Pipelines instance using a request body and parent name,
-use :class:`~airflow.providers.google.cloud.operators.datapipeline.CreateDataPipelineOperator`.
-The operator accesses Google Cloud's Data Pipelines API and calls upon the
-`create method <https://cloud.google.com/dataflow/docs/reference/data-pipelines/rest/v1/projects.locations.pipelines/create>`__
-to run the given pipeline.
-
-:class:`~airflow.providers.google.cloud.operators.datapipeline.CreateDataPipelineOperator` accepts four parameters:
- **body**: instance of the Pipeline,
- **project_id**: id of the GCP project that owns the job,
- **location**: destination for the Pipeline,
- **gcp_conn_id**: id to connect to Google Cloud.
-
-The request body and project id need to be passed each time, while the GCP connection id and location have default values.
-The project id and location will be used to build the parent name needed to create the operator.
-
-Here is an example of how you can create a Data Pipelines instance by running the above parameters with CreateDataPipelineOperator:
-
-.. exampleinclude:: /../../providers/tests/system/google/cloud/datapipelines/example_datapipeline.py
- :language: python
- :dedent: 4
- :start-after: [START howto_operator_create_data_pipeline]
- :end-before: [END howto_operator_create_data_pipeline]
-
-Running a Data Pipeline
-^^^^^^^^^^^^^^^^^^^^^^^
-
-.. warning::
- This operator is deprecated. Please use :class:`~airflow.providers.google.cloud.operators.dataflow.DataflowRunPipelineOperator`.
-
-To run a Data Pipelines instance, use :class:`~airflow.providers.google.cloud.operators.datapipeline.RunDataPipelineOperator`.
-The operator accesses Google Cloud's Data Pipelines API and calls upon the
-`run method <https://cloud.google.com/dataflow/docs/reference/data-pipelines/rest/v1/projects.locations.pipelines/run>`__
-to run the given pipeline.
-
-:class:`~airflow.providers.google.cloud.operators.datapipeline.RunDataPipelineOperator` can take in four parameters:
-
-- ``data_pipeline_name``: the name of the Data Pipelines instance
-- ``project_id``: the ID of the GCP project that owns the job
-- ``location``: the location of the Data Pipelines instance
-- ``gcp_conn_id``: the connection ID to connect to the Google Cloud Platform
-
-Only the Data Pipeline name and Project ID are required parameters, as the Location and GCP Connection ID have default values.
-The Project ID and Location will be used to build the parent name, which is where the given Data Pipeline should be located.
-
-You can run a Data Pipelines instance by running the above parameters with RunDataPipelineOperator:
-
-.. exampleinclude:: /../../providers/tests/system/google/cloud/datapipelines/example_datapipeline.py
- :language: python
- :dedent: 4
- :start-after: [START howto_operator_run_data_pipeline]
- :end-before: [END howto_operator_run_data_pipeline]
-
-Once called, the RunDataPipelineOperator will return the Google Cloud `Dataflow Job <https://cloud.google.com/dataflow/docs/reference/data-pipelines/rest/v1/Job>`__
-created by running the given pipeline.
-
-For further information regarding the API usage, see
-`Data Pipelines API REST Resource <https://cloud.google.com/dataflow/docs/reference/data-pipelines/rest/v1/projects.locations.pipelines#Pipeline>`__
-in the Google Cloud documentation.
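The removed guide above notes that the project id and location are combined into the parent name the Data Pipelines API expects. As a small illustration, here is a pure-Python sketch mirroring the deleted hook's ``build_parent_name`` helper; the ``build_pipeline_name`` function is our addition for illustration, not part of the provider:

```python
# Sketch of the resource names the removed Data Pipelines code constructed.
# build_parent_name mirrors the static helper on the deleted DataPipelineHook;
# build_pipeline_name is a hypothetical helper showing the full resource path
# that the run call targeted (as seen in the deleted tests).
def build_parent_name(project_id: str, location: str) -> str:
    """Return the parent resource path used by the Data Pipelines API."""
    return f"projects/{project_id}/locations/{location}"


def build_pipeline_name(project_id: str, location: str, pipeline: str) -> str:
    """Return the fully qualified pipeline resource name."""
    return f"{build_parent_name(project_id, location)}/pipelines/{pipeline}"


if __name__ == "__main__":
    print(build_parent_name("test-project-id", "test-location"))
    # projects/test-project-id/locations/test-location
```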
diff --git a/providers/src/airflow/providers/google/CHANGELOG.rst b/providers/src/airflow/providers/google/CHANGELOG.rst
index 16e0a6d1c94..8fa95572277 100644
--- a/providers/src/airflow/providers/google/CHANGELOG.rst
+++ b/providers/src/airflow/providers/google/CHANGELOG.rst
@@ -27,6 +27,29 @@
Changelog
---------
+12.0.0
+......
+
+Breaking changes
+~~~~~~~~~~~~~~~~
+
+.. warning::
+ Deprecated classes, parameters and features have been removed from the Google provider package.
+ The following breaking changes were introduced:
+
+ * Operators
+
+ * Removed ``CreateDataPipelineOperator``. Please use the ``DataflowCreatePipelineOperator`` instead
+ * Removed ``RunDataPipelineOperator``. Please use the ``DataflowRunPipelineOperator`` instead
+
+ * Hooks
+
+ * Removed ``DataPipelineHook``. Please use the ``DataflowHook`` instead
+
+
+.. Below changes are excluded from the changelog. Move them to appropriate section above if needed.
+
11.0.0
......
@@ -80,6 +103,8 @@ Breaking changes
* Removed ``CreateHyperparameterTuningJobOperator.sync``. This parameter is not in actual use
* Removed ``CustomTrainingJobBaseOperator.sync``. This parameter is not in actual use
* Removed ``GKEStartPodOperator.get_gke_config_file()``. Please use ``GKEStartPodOperator.fetch_cluster_info()`` instead
+ * Removed ``CreateDataPipelineOperator``. Please use the ``DataflowCreatePipelineOperator`` instead
+ * Removed ``RunDataPipelineOperator``. Please use the ``DataflowRunPipelineOperator`` instead
* Triggers
@@ -141,6 +166,7 @@ Breaking changes
* Removed ``BigQueryHook.run_query()``. Please use ``BigQueryHook.insert_job()`` instead
* Removed ``BigQueryHook.create_external_table()``. Please use ``BigQueryHook.create_empty_table()`` instead
* Removed ``BigQueryHook.get_service()``. Please use ``BigQueryHook.get_client()`` instead
+ * Removed ``DataPipelineHook``. Please use the ``DataflowHook`` instead
* Backends
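The renames listed in the changelog above are mechanical: each removed class maps one-to-one onto its Dataflow counterpart, and the only argument that changes name is RunDataPipelineOperator's ``data_pipeline_name``, which becomes ``pipeline_name``. A hedged sketch of that translation (the ``migrate_run_kwargs`` helper is hypothetical, not part of the provider):

```python
# Hypothetical migration helper: renames the one keyword argument that
# differs between the removed RunDataPipelineOperator and its replacement
# DataflowRunPipelineOperator; all other kwargs carry over unchanged.
def migrate_run_kwargs(old_kwargs: dict) -> dict:
    new_kwargs = dict(old_kwargs)
    if "data_pipeline_name" in new_kwargs:
        new_kwargs["pipeline_name"] = new_kwargs.pop("data_pipeline_name")
    return new_kwargs


if __name__ == "__main__":
    old = {
        "task_id": "run_data_pipeline",
        "data_pipeline_name": "my-pipeline",
        "project_id": "my-project",
    }
    # The result can be splatted into DataflowRunPipelineOperator(**kwargs).
    print(migrate_run_kwargs(old))
```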
diff --git a/providers/src/airflow/providers/google/cloud/hooks/datapipeline.py b/providers/src/airflow/providers/google/cloud/hooks/datapipeline.py
deleted file mode 100644
index 4d408c919b3..00000000000
--- a/providers/src/airflow/providers/google/cloud/hooks/datapipeline.py
+++ /dev/null
@@ -1,71 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-"""This module contains a Google Data Pipelines Hook."""
-
-from __future__ import annotations
-
-from typing import TYPE_CHECKING
-
-from airflow.exceptions import AirflowProviderDeprecationWarning
-from airflow.providers.google.cloud.hooks.dataflow import DataflowHook
-from airflow.providers.google.common.deprecated import deprecated
-from airflow.providers.google.common.hooks.base_google import GoogleBaseHook
-
-if TYPE_CHECKING:
- from googleapiclient.discovery import build
-
-DEFAULT_DATAPIPELINE_LOCATION = "us-central1"
-
-
-@deprecated(
- planned_removal_date="December 01, 2024",
- use_instead="DataflowHook",
- category=AirflowProviderDeprecationWarning,
-)
-class DataPipelineHook(DataflowHook):
- """Hook for Google Data Pipelines."""
-
- def get_conn(self) -> build:
- """Return a Google Cloud Data Pipelines service object."""
- return super().get_pipelines_conn()
-
- @GoogleBaseHook.fallback_to_default_project_id
- def create_data_pipeline(
- self,
- body: dict,
- project_id: str,
- location: str = DEFAULT_DATAPIPELINE_LOCATION,
- ) -> dict:
- """Create a new Data Pipelines instance from the Data Pipelines API."""
- return super().create_data_pipeline(body=body, project_id=project_id, location=location)
-
- @GoogleBaseHook.fallback_to_default_project_id
- def run_data_pipeline(
- self,
- data_pipeline_name: str,
- project_id: str,
- location: str = DEFAULT_DATAPIPELINE_LOCATION,
- ) -> dict:
- """Run a Data Pipelines Instance using the Data Pipelines API."""
- return super().run_data_pipeline(
- pipeline_name=data_pipeline_name, project_id=project_id, location=location
- )
-
- @staticmethod
- def build_parent_name(project_id: str, location: str):
- return f"projects/{project_id}/locations/{location}"
diff --git a/providers/src/airflow/providers/google/cloud/operators/datapipeline.py b/providers/src/airflow/providers/google/cloud/operators/datapipeline.py
deleted file mode 100644
index d9188d16585..00000000000
--- a/providers/src/airflow/providers/google/cloud/operators/datapipeline.py
+++ /dev/null
@@ -1,63 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-"""This module contains Google Data Pipelines operators."""
-
-from __future__ import annotations
-
-from airflow.exceptions import AirflowProviderDeprecationWarning
-from airflow.providers.google.cloud.hooks.dataflow import DEFAULT_DATAFLOW_LOCATION
-from airflow.providers.google.cloud.operators.dataflow import (
- DataflowCreatePipelineOperator,
- DataflowRunPipelineOperator,
-)
-from airflow.providers.google.common.deprecated import deprecated
-from airflow.providers.google.common.hooks.base_google import PROVIDE_PROJECT_ID
-
-
-@deprecated(
- planned_removal_date="December 01, 2024",
- use_instead="DataflowCreatePipelineOperator",
- category=AirflowProviderDeprecationWarning,
-)
-class CreateDataPipelineOperator(DataflowCreatePipelineOperator):
- """Creates a new Data Pipelines instance from the Data Pipelines API."""
-
-
-@deprecated(
- planned_removal_date="December 01, 2024",
- use_instead="DataflowRunPipelineOperator",
- category=AirflowProviderDeprecationWarning,
-)
-class RunDataPipelineOperator(DataflowRunPipelineOperator):
- """Runs a Data Pipelines Instance using the Data Pipelines API."""
-
- def __init__(
- self,
- data_pipeline_name: str,
- project_id: str = PROVIDE_PROJECT_ID,
- location: str = DEFAULT_DATAFLOW_LOCATION,
- gcp_conn_id: str = "google_cloud_default",
- **kwargs,
- ) -> None:
- super().__init__(
- pipeline_name=data_pipeline_name,
- project_id=project_id,
- location=location,
- gcp_conn_id=gcp_conn_id,
- **kwargs,
- )
diff --git a/providers/src/airflow/providers/google/provider.yaml b/providers/src/airflow/providers/google/provider.yaml
index cc7357075ab..7eb3a5e7c64 100644
--- a/providers/src/airflow/providers/google/provider.yaml
+++ b/providers/src/airflow/providers/google/provider.yaml
@@ -410,11 +410,6 @@ integrations:
- /docs/apache-airflow-providers-google/operators/cloud/dataflow.rst
logo: /integration-logos/gcp/Cloud-Dataflow.png
tags: [gcp]
- - integration-name: Google Data Pipelines
- external-doc-url: https://cloud.google.com/dataflow/docs/reference/data-pipelines/rest
- how-to-guide:
- - /docs/apache-airflow-providers-google/operators/cloud/datapipeline.rst
- tags: [gcp]
- integration-name: Google Data Fusion
external-doc-url: https://cloud.google.com/data-fusion/
how-to-guide:
@@ -596,9 +591,6 @@ operators:
- integration-name: Google Dataflow
python-modules:
- airflow.providers.google.cloud.operators.dataflow
- - integration-name: Google Data Pipelines
- python-modules:
- - airflow.providers.google.cloud.operators.datapipeline
- integration-name: Google Data Fusion
python-modules:
- airflow.providers.google.cloud.operators.datafusion
@@ -850,9 +842,6 @@ hooks:
- integration-name: Google Dataflow
python-modules:
- airflow.providers.google.cloud.hooks.dataflow
- - integration-name: Google Data Pipelines
- python-modules:
- - airflow.providers.google.cloud.hooks.datapipeline
- integration-name: Google Data Fusion
python-modules:
- airflow.providers.google.cloud.hooks.datafusion
diff --git a/providers/tests/google/cloud/hooks/test_datapipeline.py b/providers/tests/google/cloud/hooks/test_datapipeline.py
deleted file mode 100644
index 2ff65c4fee0..00000000000
--- a/providers/tests/google/cloud/hooks/test_datapipeline.py
+++ /dev/null
@@ -1,122 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-from __future__ import annotations
-
-import warnings
-from unittest import mock
-
-import pytest
-
-from airflow.exceptions import AirflowProviderDeprecationWarning
-from airflow.providers.google.cloud.hooks.datapipeline import DataPipelineHook
-
-pytestmark = pytest.mark.db_test
-
-
-TASK_ID = "test-datapipeline-operators"
-TEST_PARENT_NAME = "projects/test-project-id/locations/test-location"
-TEST_LOCATION = "test-location"
-TEST_PROJECT_ID = "test-project-id"
-TEST_DATA_PIPELINE_NAME = "test-data-pipeline-name"
-TEST_JOB_ID = "test-job-id"
-TEST_BODY = {
- "name": f"{TEST_PARENT_NAME}/pipelines/{TEST_DATA_PIPELINE_NAME}",
- "type": "PIPELINE_TYPE_BATCH",
- "workload": {
- "dataflowFlexTemplateRequest": {
- "launchParameter": {
- "containerSpecGcsPath": "gs://dataflow-templates-us-central1/latest/Word_Count_metadata",
- "jobName": "test-job",
- "environment": {"tempLocation": "test-temp-location"},
- "parameters": {
- "inputFile": "gs://dataflow-samples/shakespeare/kinglear.txt",
- "output": "gs://test/output/my_output",
- },
- },
- "projectId": TEST_PROJECT_ID,
- "location": TEST_LOCATION,
- }
- },
-}
-
-
-class TestDataPipelineHook:
- """
- Module meant to test the DataPipeline Hooks
- """
-
- def setup_method(self):
- with warnings.catch_warnings():
- warnings.simplefilter("ignore", AirflowProviderDeprecationWarning)
- self.datapipeline_hook = DataPipelineHook(gcp_conn_id="google_cloud_default")
-
- @mock.patch("airflow.providers.google.cloud.hooks.dataflow.DataflowHook._authorize")
- @mock.patch("airflow.providers.google.cloud.hooks.dataflow.build")
- def test_get_conn(self, mock_build, mock_authorize):
- """
- Test that get_conn is called with the correct params and
- returns the correct API address
- """
- connection = self.datapipeline_hook.get_conn()
- mock_build.assert_called_once_with(
- "datapipelines", "v1", http=mock_authorize.return_value, cache_discovery=False
- )
- assert mock_build.return_value == connection
-
- @mock.patch("airflow.providers.google.cloud.hooks.dataflow.DataflowHook.get_pipelines_conn")
- def test_create_data_pipeline(self, mock_connection):
- """
- Test that request are called with the correct params
- Test that request returns the correct value
- """
- mock_locations = mock_connection.return_value.projects.return_value.locations
- mock_request = mock_locations.return_value.pipelines.return_value.create
- mock_request.return_value.execute.return_value = {"name": TEST_PARENT_NAME}
-
- result = self.datapipeline_hook.create_data_pipeline(
- body=TEST_BODY,
- project_id=TEST_PROJECT_ID,
- location=TEST_LOCATION,
- )
-
- mock_request.assert_called_once_with(
- parent=TEST_PARENT_NAME,
- body=TEST_BODY,
- )
- assert result == {"name": TEST_PARENT_NAME}
-
- @mock.patch("airflow.providers.google.cloud.hooks.dataflow.DataflowHook.get_pipelines_conn")
- def test_run_data_pipeline(self, mock_connection):
- """
- Test that run_data_pipeline is called with correct parameters and
- calls Google Data Pipelines API
- """
- mock_request = mock_connection.return_value.projects.return_value.locations.return_value.pipelines.return_value.run
- mock_request.return_value.execute.return_value = {"job": {"id": TEST_JOB_ID}}
-
- result = self.datapipeline_hook.run_data_pipeline(
- data_pipeline_name=TEST_DATA_PIPELINE_NAME,
- project_id=TEST_PROJECT_ID,
- location=TEST_LOCATION,
- )
-
- mock_request.assert_called_once_with(
- name=f"{TEST_PARENT_NAME}/pipelines/{TEST_DATA_PIPELINE_NAME}",
- body={},
- )
- assert result == {"job": {"id": TEST_JOB_ID}}
diff --git a/providers/tests/google/cloud/operators/test_datapipeline.py b/providers/tests/google/cloud/operators/test_datapipeline.py
index 130b58d409a..c2963d184e1 100644
--- a/providers/tests/google/cloud/operators/test_datapipeline.py
+++ b/providers/tests/google/cloud/operators/test_datapipeline.py
@@ -52,7 +52,7 @@ TEST_GCP_CONN_ID = "test_gcp_conn_id"
TEST_DATA_PIPELINE_NAME = "test_data_pipeline_name"
-class TestCreateDataPipelineOperator:
+class TestDataflowCreatePipelineOperator:
@pytest.fixture
def create_operator(self):
"""
@@ -85,7 +85,7 @@ class TestCreateDataPipelineOperator:
@pytest.mark.db_test
-class TestRunDataPipelineOperator:
+class TestDataflowRunPipelineOperator:
@pytest.fixture
def run_operator(self):
"""
diff --git a/providers/tests/system/google/cloud/datapipelines/__init__.py b/providers/tests/system/google/cloud/datapipelines/__init__.py
deleted file mode 100644
index 13a83393a91..00000000000
--- a/providers/tests/system/google/cloud/datapipelines/__init__.py
+++ /dev/null
@@ -1,16 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
diff --git a/providers/tests/system/google/cloud/datapipelines/example_datapipeline.py b/providers/tests/system/google/cloud/datapipelines/example_datapipeline.py
deleted file mode 100644
index 0c0c430eae1..00000000000
--- a/providers/tests/system/google/cloud/datapipelines/example_datapipeline.py
+++ /dev/null
@@ -1,151 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-"""
-Example Airflow DAG for testing Google DataPipelines Create Data Pipeline Operator.
-"""
-
-from __future__ import annotations
-
-import os
-from datetime import datetime
-
-from airflow.models.dag import DAG
-from airflow.providers.google.cloud.operators.dataflow import DataflowDeletePipelineOperator
-from airflow.providers.google.cloud.operators.datapipeline import (
- CreateDataPipelineOperator,
- RunDataPipelineOperator,
-)
-from airflow.providers.google.cloud.operators.gcs import (
- GCSCreateBucketOperator,
- GCSDeleteBucketOperator,
- GCSSynchronizeBucketsOperator,
-)
-from airflow.utils.trigger_rule import TriggerRule
-
-from providers.tests.system.google import DEFAULT_GCP_SYSTEM_TEST_PROJECT_ID
-
-DAG_ID = "datapipeline"
-ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID", "default")
-GCP_PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "default") or DEFAULT_GCP_SYSTEM_TEST_PROJECT_ID
-GCP_LOCATION = "us-central1"
-
-PIPELINE_NAME = f"{DAG_ID}-{ENV_ID}".replace("_", "-")
-PIPELINE_JOB_NAME = f"{DAG_ID}-{ENV_ID}-job".replace("_", "-")
-PIPELINE_TYPE = "PIPELINE_TYPE_BATCH"
-
-RESOURCE_DATA_BUCKET = "airflow-system-tests-resources"
-BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}".replace("-", "_")
-
-FILE_NAME = "kinglear.txt"
-TEMPLATE_FILE = "word-count.json"
-TEMP_LOCATION = f"gs://{BUCKET_NAME}/temp"
-
-GCS_PATH = f"gs://{BUCKET_NAME}/dataflow/{TEMPLATE_FILE}"
-INPUT_FILE = f"gs://{BUCKET_NAME}/dataflow/{FILE_NAME}"
-OUTPUT = f"gs://{BUCKET_NAME}/results/hello"
-
-with DAG(
- DAG_ID,
- schedule="@once",
- start_date=datetime(2021, 1, 1),
- catchup=False,
- tags=["example", "datapipeline"],
-) as dag:
- create_bucket = GCSCreateBucketOperator(task_id="create_bucket", bucket_name=BUCKET_NAME)
-
- move_files_to_bucket = GCSSynchronizeBucketsOperator(
- task_id="move_files_to_bucket",
- source_bucket=RESOURCE_DATA_BUCKET,
- source_object="dataflow/pipelines",
- destination_bucket=BUCKET_NAME,
- destination_object="dataflow",
- recursive=True,
- )
-
- # [START howto_operator_create_data_pipeline]
- create_data_pipeline = CreateDataPipelineOperator(
- task_id="create_data_pipeline",
- project_id=GCP_PROJECT_ID,
- location=GCP_LOCATION,
- body={
- "name": f"projects/{GCP_PROJECT_ID}/locations/{GCP_LOCATION}/pipelines/{PIPELINE_NAME}",
- "type": PIPELINE_TYPE,
- "workload": {
- "dataflowFlexTemplateRequest": {
- "launchParameter": {
- "containerSpecGcsPath": GCS_PATH,
- "jobName": PIPELINE_JOB_NAME,
- "environment": {"tempLocation": TEMP_LOCATION},
- "parameters": {
- "inputFile": INPUT_FILE,
- "output": OUTPUT,
- },
- },
- "projectId": GCP_PROJECT_ID,
- "location": GCP_LOCATION,
- }
- },
- },
- )
- # [END howto_operator_create_data_pipeline]
-
- # [START howto_operator_run_data_pipeline]
- run_data_pipeline = RunDataPipelineOperator(
- task_id="run_data_pipeline",
- data_pipeline_name=PIPELINE_NAME,
- project_id=GCP_PROJECT_ID,
- )
- # [END howto_operator_run_data_pipeline]
-
- # [START howto_operator_delete_dataflow_pipeline]
- delete_pipeline = DataflowDeletePipelineOperator(
- task_id="delete_data_pipeline",
- pipeline_name=PIPELINE_NAME,
- project_id=GCP_PROJECT_ID,
- trigger_rule=TriggerRule.ALL_DONE,
- )
- # [END howto_operator_delete_dataflow_pipeline]
-
- delete_bucket = GCSDeleteBucketOperator(
- task_id="delete_bucket", bucket_name=BUCKET_NAME, trigger_rule=TriggerRule.ALL_DONE
- )
-
- (
- # TEST SETUP
- create_bucket
- >> move_files_to_bucket
- # TEST BODY
- >> create_data_pipeline
- >> run_data_pipeline
- # TEST TEARDOWN
- >> delete_pipeline
- >> delete_bucket
- )
-
- from tests_common.test_utils.watcher import watcher
-
- # This test needs watcher in order to properly mark success/failure
- # when "teardown" task with trigger rule is part of the DAG
- list(dag.tasks) >> watcher()
-
-
-from tests_common.test_utils.system_tests import get_test_run # noqa: E402
-
-# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
-test_run = get_test_run(dag)
diff --git a/tests/always/test_project_structure.py b/tests/always/test_project_structure.py
index 9a2d0d59e5f..85c467160b6 100644
--- a/tests/always/test_project_structure.py
+++ b/tests/always/test_project_structure.py
@@ -354,8 +354,6 @@ class TestGoogleProviderProjectStructure(ExampleCoverageTest, AssetsCoverageTest
"airflow.providers.google.cloud.operators.automl.AutoMLTablesUpdateDatasetOperator",
"airflow.providers.google.cloud.operators.automl.AutoMLDeployModelOperator",
"airflow.providers.google.cloud.operators.automl.AutoMLBatchPredictOperator",
- "airflow.providers.google.cloud.operators.datapipeline.CreateDataPipelineOperator",
- "airflow.providers.google.cloud.operators.datapipeline.RunDataPipelineOperator",
"airflow.providers.google.cloud.operators.dataproc.DataprocScaleClusterOperator",
"airflow.providers.google.cloud.operators.mlengine.MLEngineManageModelOperator",
"airflow.providers.google.cloud.operators.mlengine.MLEngineManageVersionOperator",