This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 7fe46e12e2b Google provider delete deprecated reaching removal date (December 2024) (#45084)
7fe46e12e2b is described below

commit 7fe46e12e2bccc1312fe5e4aadc2a3729d34b385
Author: olegkachur-e <[email protected]>
AuthorDate: Thu Dec 19 21:48:22 2024 +0100

    Google provider delete deprecated reaching removal date (December 2024) (#45084)
    
    * remove deprecated DataPipelineHook hook
    
    * remove deprecated CreateDataPipelineOperator, RunDataPipelineOperator
    
    ---------
    
    Co-authored-by: Oleg Kachur <[email protected]>
---
 .../operators/cloud/datapipeline.rst               |  95 -------------
 .../src/airflow/providers/google/CHANGELOG.rst     |  26 ++++
 .../providers/google/cloud/hooks/datapipeline.py   |  71 ----------
 .../google/cloud/operators/datapipeline.py         |  63 ---------
 .../src/airflow/providers/google/provider.yaml     |  11 --
 .../tests/google/cloud/hooks/test_datapipeline.py  | 122 -----------------
 .../google/cloud/operators/test_datapipeline.py    |   4 +-
 .../system/google/cloud/datapipelines/__init__.py  |  16 ---
 .../cloud/datapipelines/example_datapipeline.py    | 151 ---------------------
 tests/always/test_project_structure.py             |   2 -
 10 files changed, 28 insertions(+), 533 deletions(-)

diff --git a/docs/apache-airflow-providers-google/operators/cloud/datapipeline.rst b/docs/apache-airflow-providers-google/operators/cloud/datapipeline.rst
deleted file mode 100644
index 4996afaf7c3..00000000000
--- a/docs/apache-airflow-providers-google/operators/cloud/datapipeline.rst
+++ /dev/null
@@ -1,95 +0,0 @@
- .. Licensed to the Apache Software Foundation (ASF) under one
-    or more contributor license agreements.  See the NOTICE file
-    distributed with this work for additional information
-    regarding copyright ownership.  The ASF licenses this file
-    to you under the Apache License, Version 2.0 (the
-    "License"); you may not use this file except in compliance
-    with the License.  You may obtain a copy of the License at
-
- ..   http://www.apache.org/licenses/LICENSE-2.0
-
- .. Unless required by applicable law or agreed to in writing,
-    software distributed under the License is distributed on an
-    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-    KIND, either express or implied.  See the License for the
-    specific language governing permissions and limitations
-    under the License.
-
-Google Cloud Data Pipelines Operators
-=====================================
-
-Data Pipelines is a Dataflow feature that allows customers to create
-and schedule recurring jobs, view aggregated job metrics, and define
-and manage job SLOs. A pipeline consists of a collection of jobs
-including ways to manage them. A pipeline may be associated with a
-Dataflow Template (classic/flex) and include all jobs launched with
-the associated template.
-
-Prerequisite Tasks
-^^^^^^^^^^^^^^^^^^
-
-.. include:: /operators/_partials/prerequisite_tasks.rst
-
-Creating a Data Pipeline
-^^^^^^^^^^^^^^^^^^^^^^^^
-
-.. warning::
-   This operator is deprecated. Please use :class:`~airflow.providers.google.cloud.operators.dataflow.DataflowCreatePipelineOperator`.
-
-To create a new Data Pipelines instance using a request body and parent name, use :class:`~airflow.providers.google.cloud.operators.datapipeline.CreateDataPipelineOperator`.
-The operator accesses Google Cloud's Data Pipelines API and calls upon the
-`create method <https://cloud.google.com/dataflow/docs/reference/data-pipelines/rest/v1/projects.locations.pipelines/create>`__
-to run the given pipeline.
-
-:class:`~airflow.providers.google.cloud.operators.datapipeline.CreateDataPipelineOperator` accepts four parameters:
-   **body**: instance of the Pipeline,
-   **project_id**: id of the GCP project that owns the job,
-   **location**: destination for the Pipeline,
-   **gcp_conn_id**: id to connect to Google Cloud.
-
-The request body and project id need to be passed each time, while the GCP connection id and location have default values.
-The project id and location will be used to build the parent name needed to create the operator.
-
-Here is an example of how you can create a Data Pipelines instance by running the above parameters with CreateDataPipelineOperator:
-
-.. exampleinclude:: /../../providers/tests/system/google/cloud/datapipelines/example_datapipeline.py
-   :language: python
-   :dedent: 4
-   :start-after: [START howto_operator_create_data_pipeline]
-   :end-before: [END howto_operator_create_data_pipeline]
-
-Running a Data Pipeline
-^^^^^^^^^^^^^^^^^^^^^^^
-
-.. warning::
-   This operator is deprecated. Please use :class:`~airflow.providers.google.cloud.operators.dataflow.DataflowRunPipelineOperator`.
-
-To run a Data Pipelines instance, use :class:`~airflow.providers.google.cloud.operators.datapipeline.RunDataPipelineOperator`.
-The operator accesses Google Cloud's Data Pipelines API and calls upon the
-`run method <https://cloud.google.com/dataflow/docs/reference/data-pipelines/rest/v1/projects.locations.pipelines/run>`__
-to run the given pipeline.
-
-:class:`~airflow.providers.google.cloud.operators.datapipeline.RunDataPipelineOperator` can take in four parameters:
-
-- ``data_pipeline_name``: the name of the Data Pipelines instance
-- ``project_id``: the ID of the GCP project that owns the job
-- ``location``: the location of the Data Pipelines instance
-- ``gcp_conn_id``: the connection ID to connect to the Google Cloud Platform
-
-Only the Data Pipeline name and Project ID are required parameters, as the Location and GCP Connection ID have default values.
-The Project ID and Location will be used to build the parent name, which is where the given Data Pipeline should be located.
-
-You can run a Data Pipelines instance by running the above parameters with RunDataPipelineOperator:
-
-.. exampleinclude:: /../../providers/tests/system/google/cloud/datapipelines/example_datapipeline.py
-    :language: python
-    :dedent: 4
-    :start-after: [START howto_operator_run_data_pipeline]
-    :end-before: [END howto_operator_run_data_pipeline]
-
-Once called, the RunDataPipelineOperator will return the Google Cloud `Dataflow Job <https://cloud.google.com/dataflow/docs/reference/data-pipelines/rest/v1/Job>`__
-created by running the given pipeline.
-
-For further information regarding the API usage, see
-`Data Pipelines API REST Resource <https://cloud.google.com/dataflow/docs/reference/data-pipelines/rest/v1/projects.locations.pipelines#Pipeline>`__
-in the Google Cloud documentation.
diff --git a/providers/src/airflow/providers/google/CHANGELOG.rst b/providers/src/airflow/providers/google/CHANGELOG.rst
index 16e0a6d1c94..8fa95572277 100644
--- a/providers/src/airflow/providers/google/CHANGELOG.rst
+++ b/providers/src/airflow/providers/google/CHANGELOG.rst
@@ -27,6 +27,29 @@
 Changelog
 ---------
 
+12.0.0
+......
+
+Breaking changes
+~~~~~~~~~~~~~~~~
+
+.. warning::
+  Deprecated classes, parameters and features have been removed from the Google provider package.
+  The following breaking changes were introduced:
+
+  * Operators
+
+    * Removed ``CreateDataPipelineOperator``. Please use the ``DataflowCreatePipelineOperator`` instead
+    * Removed ``RunDataPipelineOperator``. Please use the ``DataflowRunPipelineOperator`` instead
+
+  * Hooks
+
+    * Removed ``DataPipelineHook``. Please use the ``DataflowHook`` instead
+
+
+.. Below changes are excluded from the changelog. Move them to
+   appropriate section above if needed.
+
 11.0.0
 ......
 
@@ -80,6 +103,8 @@ Breaking changes
    * Removed ``CreateHyperparameterTuningJobOperator.sync``. This parameter is not in actual use
    * Removed ``CustomTrainingJobBaseOperator.sync``. This parameter is not in actual use
    * Removed ``GKEStartPodOperator.get_gke_config_file()``. Please use ``GKEStartPodOperator.fetch_cluster_info()`` instead
+    * Removed ``CreateDataPipelineOperator``. Please use the ``DataflowCreatePipelineOperator`` instead
+    * Removed ``RunDataPipelineOperator``. Please use the ``DataflowRunPipelineOperator`` instead
 
   * Triggers
 
@@ -141,6 +166,7 @@ Breaking changes
    * Removed ``BigQueryHook.run_query()``. Please use ``BigQueryHook.insert_job()`` instead
    * Removed ``BigQueryHook.create_external_table()``. Please use ``BigQueryHook.create_empty_table()`` instead
    * Removed ``BigQueryHook.get_service()``. Please use ``BigQueryHook.get_client()`` instead
+    * Removed ``DataPipelineHook``. Please use the DataflowHook instead
 
   * Backends
 
diff --git a/providers/src/airflow/providers/google/cloud/hooks/datapipeline.py b/providers/src/airflow/providers/google/cloud/hooks/datapipeline.py
deleted file mode 100644
index 4d408c919b3..00000000000
--- a/providers/src/airflow/providers/google/cloud/hooks/datapipeline.py
+++ /dev/null
@@ -1,71 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-"""This module contains a Google Data Pipelines Hook."""
-
-from __future__ import annotations
-
-from typing import TYPE_CHECKING
-
-from airflow.exceptions import AirflowProviderDeprecationWarning
-from airflow.providers.google.cloud.hooks.dataflow import DataflowHook
-from airflow.providers.google.common.deprecated import deprecated
-from airflow.providers.google.common.hooks.base_google import GoogleBaseHook
-
-if TYPE_CHECKING:
-    from googleapiclient.discovery import build
-
-DEFAULT_DATAPIPELINE_LOCATION = "us-central1"
-
-
-@deprecated(
-    planned_removal_date="December 01, 2024",
-    use_instead="DataflowHook",
-    category=AirflowProviderDeprecationWarning,
-)
-class DataPipelineHook(DataflowHook):
-    """Hook for Google Data Pipelines."""
-
-    def get_conn(self) -> build:
-        """Return a Google Cloud Data Pipelines service object."""
-        return super().get_pipelines_conn()
-
-    @GoogleBaseHook.fallback_to_default_project_id
-    def create_data_pipeline(
-        self,
-        body: dict,
-        project_id: str,
-        location: str = DEFAULT_DATAPIPELINE_LOCATION,
-    ) -> dict:
-        """Create a new Data Pipelines instance from the Data Pipelines API."""
-        return super().create_data_pipeline(body=body, project_id=project_id, location=location)
-
-    @GoogleBaseHook.fallback_to_default_project_id
-    def run_data_pipeline(
-        self,
-        data_pipeline_name: str,
-        project_id: str,
-        location: str = DEFAULT_DATAPIPELINE_LOCATION,
-    ) -> dict:
-        """Run a Data Pipelines Instance using the Data Pipelines API."""
-        return super().run_data_pipeline(
-            pipeline_name=data_pipeline_name, project_id=project_id, location=location
-        )
-
-    @staticmethod
-    def build_parent_name(project_id: str, location: str):
-        return f"projects/{project_id}/locations/{location}"
diff --git a/providers/src/airflow/providers/google/cloud/operators/datapipeline.py b/providers/src/airflow/providers/google/cloud/operators/datapipeline.py
deleted file mode 100644
index d9188d16585..00000000000
--- a/providers/src/airflow/providers/google/cloud/operators/datapipeline.py
+++ /dev/null
@@ -1,63 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-"""This module contains Google Data Pipelines operators."""
-
-from __future__ import annotations
-
-from airflow.exceptions import AirflowProviderDeprecationWarning
-from airflow.providers.google.cloud.hooks.dataflow import DEFAULT_DATAFLOW_LOCATION
-from airflow.providers.google.cloud.operators.dataflow import (
-    DataflowCreatePipelineOperator,
-    DataflowRunPipelineOperator,
-)
-from airflow.providers.google.common.deprecated import deprecated
-from airflow.providers.google.common.hooks.base_google import PROVIDE_PROJECT_ID
-
-
-@deprecated(
-    planned_removal_date="December 01, 2024",
-    use_instead="DataflowCreatePipelineOperator",
-    category=AirflowProviderDeprecationWarning,
-)
-class CreateDataPipelineOperator(DataflowCreatePipelineOperator):
-    """Creates a new Data Pipelines instance from the Data Pipelines API."""
-
-
-@deprecated(
-    planned_removal_date="December 01, 2024",
-    use_instead="DataflowRunPipelineOperator",
-    category=AirflowProviderDeprecationWarning,
-)
-class RunDataPipelineOperator(DataflowRunPipelineOperator):
-    """Runs a Data Pipelines Instance using the Data Pipelines API."""
-
-    def __init__(
-        self,
-        data_pipeline_name: str,
-        project_id: str = PROVIDE_PROJECT_ID,
-        location: str = DEFAULT_DATAFLOW_LOCATION,
-        gcp_conn_id: str = "google_cloud_default",
-        **kwargs,
-    ) -> None:
-        super().__init__(
-            pipeline_name=data_pipeline_name,
-            project_id=project_id,
-            location=location,
-            gcp_conn_id=gcp_conn_id,
-            **kwargs,
-        )
diff --git a/providers/src/airflow/providers/google/provider.yaml b/providers/src/airflow/providers/google/provider.yaml
index cc7357075ab..7eb3a5e7c64 100644
--- a/providers/src/airflow/providers/google/provider.yaml
+++ b/providers/src/airflow/providers/google/provider.yaml
@@ -410,11 +410,6 @@ integrations:
       - /docs/apache-airflow-providers-google/operators/cloud/dataflow.rst
     logo: /integration-logos/gcp/Cloud-Dataflow.png
     tags: [gcp]
-  - integration-name: Google Data Pipelines
-    external-doc-url: https://cloud.google.com/dataflow/docs/reference/data-pipelines/rest
-    how-to-guide:
-      - /docs/apache-airflow-providers-google/operators/cloud/datapipeline.rst
-    tags: [gcp]
   - integration-name: Google Data Fusion
     external-doc-url: https://cloud.google.com/data-fusion/
     how-to-guide:
@@ -596,9 +591,6 @@ operators:
   - integration-name: Google Dataflow
     python-modules:
       - airflow.providers.google.cloud.operators.dataflow
-  - integration-name: Google Data Pipelines
-    python-modules:
-      - airflow.providers.google.cloud.operators.datapipeline
   - integration-name: Google Data Fusion
     python-modules:
       - airflow.providers.google.cloud.operators.datafusion
@@ -850,9 +842,6 @@ hooks:
   - integration-name: Google Dataflow
     python-modules:
       - airflow.providers.google.cloud.hooks.dataflow
-  - integration-name: Google Data Pipelines
-    python-modules:
-      - airflow.providers.google.cloud.hooks.datapipeline
   - integration-name: Google Data Fusion
     python-modules:
       - airflow.providers.google.cloud.hooks.datafusion
diff --git a/providers/tests/google/cloud/hooks/test_datapipeline.py b/providers/tests/google/cloud/hooks/test_datapipeline.py
deleted file mode 100644
index 2ff65c4fee0..00000000000
--- a/providers/tests/google/cloud/hooks/test_datapipeline.py
+++ /dev/null
@@ -1,122 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-from __future__ import annotations
-
-import warnings
-from unittest import mock
-
-import pytest
-
-from airflow.exceptions import AirflowProviderDeprecationWarning
-from airflow.providers.google.cloud.hooks.datapipeline import DataPipelineHook
-
-pytestmark = pytest.mark.db_test
-
-
-TASK_ID = "test-datapipeline-operators"
-TEST_PARENT_NAME = "projects/test-project-id/locations/test-location"
-TEST_LOCATION = "test-location"
-TEST_PROJECT_ID = "test-project-id"
-TEST_DATA_PIPELINE_NAME = "test-data-pipeline-name"
-TEST_JOB_ID = "test-job-id"
-TEST_BODY = {
-    "name": f"{TEST_PARENT_NAME}/pipelines/{TEST_DATA_PIPELINE_NAME}",
-    "type": "PIPELINE_TYPE_BATCH",
-    "workload": {
-        "dataflowFlexTemplateRequest": {
-            "launchParameter": {
-                "containerSpecGcsPath": "gs://dataflow-templates-us-central1/latest/Word_Count_metadata",
-                "jobName": "test-job",
-                "environment": {"tempLocation": "test-temp-location"},
-                "parameters": {
-                    "inputFile": "gs://dataflow-samples/shakespeare/kinglear.txt",
-                    "output": "gs://test/output/my_output",
-                },
-            },
-            "projectId": TEST_PROJECT_ID,
-            "location": TEST_LOCATION,
-        }
-    },
-}
-
-
-class TestDataPipelineHook:
-    """
-    Module meant to test the DataPipeline Hooks
-    """
-
-    def setup_method(self):
-        with warnings.catch_warnings():
-            warnings.simplefilter("ignore", AirflowProviderDeprecationWarning)
-            self.datapipeline_hook = DataPipelineHook(gcp_conn_id="google_cloud_default")
-
-    @mock.patch("airflow.providers.google.cloud.hooks.dataflow.DataflowHook._authorize")
-    @mock.patch("airflow.providers.google.cloud.hooks.dataflow.build")
-    def test_get_conn(self, mock_build, mock_authorize):
-        """
-        Test that get_conn is called with the correct params and
-        returns the correct API address
-        """
-        connection = self.datapipeline_hook.get_conn()
-        mock_build.assert_called_once_with(
-            "datapipelines", "v1", http=mock_authorize.return_value, cache_discovery=False
-        )
-        assert mock_build.return_value == connection
-
-    @mock.patch("airflow.providers.google.cloud.hooks.dataflow.DataflowHook.get_pipelines_conn")
-    def test_create_data_pipeline(self, mock_connection):
-        """
-        Test that request are called with the correct params
-        Test that request returns the correct value
-        """
-        mock_locations = mock_connection.return_value.projects.return_value.locations
-        mock_request = mock_locations.return_value.pipelines.return_value.create
-        mock_request.return_value.execute.return_value = {"name": TEST_PARENT_NAME}
-
-        result = self.datapipeline_hook.create_data_pipeline(
-            body=TEST_BODY,
-            project_id=TEST_PROJECT_ID,
-            location=TEST_LOCATION,
-        )
-
-        mock_request.assert_called_once_with(
-            parent=TEST_PARENT_NAME,
-            body=TEST_BODY,
-        )
-        assert result == {"name": TEST_PARENT_NAME}
-
-    @mock.patch("airflow.providers.google.cloud.hooks.dataflow.DataflowHook.get_pipelines_conn")
-    def test_run_data_pipeline(self, mock_connection):
-        """
-        Test that run_data_pipeline is called with correct parameters and
-        calls Google Data Pipelines API
-        """
-        mock_request = mock_connection.return_value.projects.return_value.locations.return_value.pipelines.return_value.run
-        mock_request.return_value.execute.return_value = {"job": {"id": TEST_JOB_ID}}
-
-        result = self.datapipeline_hook.run_data_pipeline(
-            data_pipeline_name=TEST_DATA_PIPELINE_NAME,
-            project_id=TEST_PROJECT_ID,
-            location=TEST_LOCATION,
-        )
-
-        mock_request.assert_called_once_with(
-            name=f"{TEST_PARENT_NAME}/pipelines/{TEST_DATA_PIPELINE_NAME}",
-            body={},
-        )
-        assert result == {"job": {"id": TEST_JOB_ID}}
diff --git a/providers/tests/google/cloud/operators/test_datapipeline.py b/providers/tests/google/cloud/operators/test_datapipeline.py
index 130b58d409a..c2963d184e1 100644
--- a/providers/tests/google/cloud/operators/test_datapipeline.py
+++ b/providers/tests/google/cloud/operators/test_datapipeline.py
@@ -52,7 +52,7 @@ TEST_GCP_CONN_ID = "test_gcp_conn_id"
 TEST_DATA_PIPELINE_NAME = "test_data_pipeline_name"
 
 
-class TestCreateDataPipelineOperator:
+class TestDataflowCreatePipelineOperator:
     @pytest.fixture
     def create_operator(self):
         """
@@ -85,7 +85,7 @@ class TestCreateDataPipelineOperator:
 
 
 @pytest.mark.db_test
-class TestRunDataPipelineOperator:
+class TestDataflowRunPipelineOperator:
     @pytest.fixture
     def run_operator(self):
         """
diff --git a/providers/tests/system/google/cloud/datapipelines/__init__.py b/providers/tests/system/google/cloud/datapipelines/__init__.py
deleted file mode 100644
index 13a83393a91..00000000000
--- a/providers/tests/system/google/cloud/datapipelines/__init__.py
+++ /dev/null
@@ -1,16 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
diff --git a/providers/tests/system/google/cloud/datapipelines/example_datapipeline.py b/providers/tests/system/google/cloud/datapipelines/example_datapipeline.py
deleted file mode 100644
index 0c0c430eae1..00000000000
--- a/providers/tests/system/google/cloud/datapipelines/example_datapipeline.py
+++ /dev/null
@@ -1,151 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-"""
-Example Airflow DAG for testing Google DataPipelines Create Data Pipeline Operator.
-"""
-
-from __future__ import annotations
-
-import os
-from datetime import datetime
-
-from airflow.models.dag import DAG
-from airflow.providers.google.cloud.operators.dataflow import DataflowDeletePipelineOperator
-from airflow.providers.google.cloud.operators.datapipeline import (
-    CreateDataPipelineOperator,
-    RunDataPipelineOperator,
-)
-from airflow.providers.google.cloud.operators.gcs import (
-    GCSCreateBucketOperator,
-    GCSDeleteBucketOperator,
-    GCSSynchronizeBucketsOperator,
-)
-from airflow.utils.trigger_rule import TriggerRule
-
-from providers.tests.system.google import DEFAULT_GCP_SYSTEM_TEST_PROJECT_ID
-
-DAG_ID = "datapipeline"
-ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID", "default")
-GCP_PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "default") or DEFAULT_GCP_SYSTEM_TEST_PROJECT_ID
-GCP_LOCATION = "us-central1"
-
-PIPELINE_NAME = f"{DAG_ID}-{ENV_ID}".replace("_", "-")
-PIPELINE_JOB_NAME = f"{DAG_ID}-{ENV_ID}-job".replace("_", "-")
-PIPELINE_TYPE = "PIPELINE_TYPE_BATCH"
-
-RESOURCE_DATA_BUCKET = "airflow-system-tests-resources"
-BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}".replace("-", "_")
-
-FILE_NAME = "kinglear.txt"
-TEMPLATE_FILE = "word-count.json"
-TEMP_LOCATION = f"gs://{BUCKET_NAME}/temp"
-
-GCS_PATH = f"gs://{BUCKET_NAME}/dataflow/{TEMPLATE_FILE}"
-INPUT_FILE = f"gs://{BUCKET_NAME}/dataflow/{FILE_NAME}"
-OUTPUT = f"gs://{BUCKET_NAME}/results/hello"
-
-with DAG(
-    DAG_ID,
-    schedule="@once",
-    start_date=datetime(2021, 1, 1),
-    catchup=False,
-    tags=["example", "datapipeline"],
-) as dag:
-    create_bucket = GCSCreateBucketOperator(task_id="create_bucket", bucket_name=BUCKET_NAME)
-
-    move_files_to_bucket = GCSSynchronizeBucketsOperator(
-        task_id="move_files_to_bucket",
-        source_bucket=RESOURCE_DATA_BUCKET,
-        source_object="dataflow/pipelines",
-        destination_bucket=BUCKET_NAME,
-        destination_object="dataflow",
-        recursive=True,
-    )
-
-    # [START howto_operator_create_data_pipeline]
-    create_data_pipeline = CreateDataPipelineOperator(
-        task_id="create_data_pipeline",
-        project_id=GCP_PROJECT_ID,
-        location=GCP_LOCATION,
-        body={
-            "name": f"projects/{GCP_PROJECT_ID}/locations/{GCP_LOCATION}/pipelines/{PIPELINE_NAME}",
-            "type": PIPELINE_TYPE,
-            "workload": {
-                "dataflowFlexTemplateRequest": {
-                    "launchParameter": {
-                        "containerSpecGcsPath": GCS_PATH,
-                        "jobName": PIPELINE_JOB_NAME,
-                        "environment": {"tempLocation": TEMP_LOCATION},
-                        "parameters": {
-                            "inputFile": INPUT_FILE,
-                            "output": OUTPUT,
-                        },
-                    },
-                    "projectId": GCP_PROJECT_ID,
-                    "location": GCP_LOCATION,
-                }
-            },
-        },
-    )
-    # [END howto_operator_create_data_pipeline]
-
-    # [START howto_operator_run_data_pipeline]
-    run_data_pipeline = RunDataPipelineOperator(
-        task_id="run_data_pipeline",
-        data_pipeline_name=PIPELINE_NAME,
-        project_id=GCP_PROJECT_ID,
-    )
-    # [END howto_operator_run_data_pipeline]
-
-    # [START howto_operator_delete_dataflow_pipeline]
-    delete_pipeline = DataflowDeletePipelineOperator(
-        task_id="delete_data_pipeline",
-        pipeline_name=PIPELINE_NAME,
-        project_id=GCP_PROJECT_ID,
-        trigger_rule=TriggerRule.ALL_DONE,
-    )
-    # [END howto_operator_delete_dataflow_pipeline]
-
-    delete_bucket = GCSDeleteBucketOperator(
-        task_id="delete_bucket", bucket_name=BUCKET_NAME, trigger_rule=TriggerRule.ALL_DONE
-    )
-
-    (
-        # TEST SETUP
-        create_bucket
-        >> move_files_to_bucket
-        # TEST BODY
-        >> create_data_pipeline
-        >> run_data_pipeline
-        # TEST TEARDOWN
-        >> delete_pipeline
-        >> delete_bucket
-    )
-
-    from tests_common.test_utils.watcher import watcher
-
-    # This test needs watcher in order to properly mark success/failure
-    # when "teardown" task with trigger rule is part of the DAG
-    list(dag.tasks) >> watcher()
-
-
-from tests_common.test_utils.system_tests import get_test_run  # noqa: E402
-
-# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
-test_run = get_test_run(dag)
diff --git a/tests/always/test_project_structure.py b/tests/always/test_project_structure.py
index 9a2d0d59e5f..85c467160b6 100644
--- a/tests/always/test_project_structure.py
+++ b/tests/always/test_project_structure.py
@@ -354,8 +354,6 @@ class TestGoogleProviderProjectStructure(ExampleCoverageTest, AssetsCoverageTest
         "airflow.providers.google.cloud.operators.automl.AutoMLTablesUpdateDatasetOperator",
         "airflow.providers.google.cloud.operators.automl.AutoMLDeployModelOperator",
         "airflow.providers.google.cloud.operators.automl.AutoMLBatchPredictOperator",
-        "airflow.providers.google.cloud.operators.datapipeline.CreateDataPipelineOperator",
-        "airflow.providers.google.cloud.operators.datapipeline.RunDataPipelineOperator",
         "airflow.providers.google.cloud.operators.dataproc.DataprocScaleClusterOperator",
         "airflow.providers.google.cloud.operators.mlengine.MLEngineManageModelOperator",
         "airflow.providers.google.cloud.operators.mlengine.MLEngineManageVersionOperator",
