This is an automated email from the ASF dual-hosted git repository.

eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new aab1189  Add GCSToPrestoOperator (#21084)
aab1189 is described below

commit aab11890dbae5dab7ec5df89e79bc8e27f451ce7
Author: Rachana Gogate <[email protected]>
AuthorDate: Wed Feb 16 22:17:35 2022 -0800

    Add GCSToPrestoOperator (#21084)
---
 CONTRIBUTING.rst                                   |   1 +
 airflow/providers/dependencies.json                |   3 +
 .../{provider.yaml => example_dags/__init__.py}    |  34 -------
 .../presto/example_dags/example_gcs_to_presto.py   |  46 +++++++++
 airflow/providers/presto/provider.yaml             |   6 ++
 .../{provider.yaml => transfers/__init__.py}       |  34 -------
 .../providers/presto/transfers/gcs_to_presto.py    | 103 +++++++++++++++++++++
 docs/apache-airflow-providers-presto/index.rst     |   7 ++
 .../operators/transfer/gcs_to_presto.rst           |  51 ++++++++++
 .../providers/presto/transfers/__init__.py         |  35 +------
 .../providers/presto/transfers/test_gcs_presto.py  |  71 ++++++++++++++
 11 files changed, 289 insertions(+), 102 deletions(-)

diff --git a/CONTRIBUTING.rst b/CONTRIBUTING.rst
index c43a4c5..bed3915 100644
--- a/CONTRIBUTING.rst
+++ b/CONTRIBUTING.rst
@@ -686,6 +686,7 @@ hashicorp                  google
 microsoft.azure            google,oracle,sftp
 mysql                      amazon,presto,trino,vertica
 postgres                   amazon
+presto                     google
 salesforce                 tableau
 sftp                       ssh
 slack                      http
diff --git a/airflow/providers/dependencies.json b/airflow/providers/dependencies.json
index bb73d81..f664364 100644
--- a/airflow/providers/dependencies.json
+++ b/airflow/providers/dependencies.json
@@ -70,6 +70,9 @@
   "postgres": [
     "amazon"
   ],
+  "presto": [
+    "google"
+  ],
   "salesforce": [
     "tableau"
   ],
diff --git a/airflow/providers/presto/provider.yaml b/airflow/providers/presto/example_dags/__init__.py
similarity index 50%
copy from airflow/providers/presto/provider.yaml
copy to airflow/providers/presto/example_dags/__init__.py
index 456bd0b..13a8339 100644
--- a/airflow/providers/presto/provider.yaml
+++ b/airflow/providers/presto/example_dags/__init__.py
@@ -14,37 +14,3 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-
----
-package-name: apache-airflow-providers-presto
-name: Presto
-description: |
-    `Presto <https://prestodb.github.io/>`__
-
-versions:
-  - 2.0.1
-  - 2.0.0
-  - 1.0.2
-  - 1.0.1
-  - 1.0.0
-
-additional-dependencies:
-  - apache-airflow>=2.1.0
-
-integrations:
-  - integration-name: Presto
-    external-doc-url: http://prestodb.github.io/
-    logo: /integration-logos/presto/PrestoDB.png
-    tags: [software]
-
-hooks:
-  - integration-name: Presto
-    python-modules:
-      - airflow.providers.presto.hooks.presto
-
-hook-class-names:  # deprecated - to be removed after providers add dependency on Airflow 2.2.0+
-  - airflow.providers.presto.hooks.presto.PrestoHook
-
-connection-types:
-  - hook-class-name: airflow.providers.presto.hooks.presto.PrestoHook
-    connection-type: presto
diff --git a/airflow/providers/presto/example_dags/example_gcs_to_presto.py b/airflow/providers/presto/example_dags/example_gcs_to_presto.py
new file mode 100644
index 0000000..1593127
--- /dev/null
+++ b/airflow/providers/presto/example_dags/example_gcs_to_presto.py
@@ -0,0 +1,46 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+Example DAG using GCSToPrestoOperator.
+"""
+
+import os
+from datetime import datetime
+
+from airflow import models
+from airflow.providers.presto.transfers.gcs_to_presto import GCSToPrestoOperator
+
+BUCKET = os.environ.get("GCP_GCS_BUCKET", "test28397yeo")
+PATH_TO_FILE = os.environ.get("GCP_PATH", "path/to/file")
+PRESTO_TABLE = os.environ.get("PRESTO_TABLE", "test_table")
+
+with models.DAG(
+    dag_id="example_gcs_to_presto",
+    schedule_interval='@once',  # Override to match your needs
+    start_date=datetime(2022, 1, 1),
+    catchup=False,
+    tags=["example"],
+) as dag:
+    # [START gcs_csv_to_presto_table]
+    gcs_csv_to_presto_table = GCSToPrestoOperator(
+        task_id="gcs_csv_to_presto_table",
+        source_bucket=BUCKET,
+        source_object=PATH_TO_FILE,
+        presto_table=PRESTO_TABLE,
+    )
+    # [END gcs_csv_to_presto_table]
diff --git a/airflow/providers/presto/provider.yaml b/airflow/providers/presto/provider.yaml
index 456bd0b..62ffeeb 100644
--- a/airflow/providers/presto/provider.yaml
+++ b/airflow/providers/presto/provider.yaml
@@ -42,6 +42,12 @@ hooks:
     python-modules:
       - airflow.providers.presto.hooks.presto
 
+transfers:
+  - source-integration-name: Google Cloud Storage (GCS)
+    target-integration-name: Presto
+    how-to-guide: /docs/apache-airflow-providers-presto/operators/transfer/gcs_to_presto.rst
+    python-module: airflow.providers.presto.transfers.gcs_to_presto
+
 hook-class-names:  # deprecated - to be removed after providers add dependency on Airflow 2.2.0+
   - airflow.providers.presto.hooks.presto.PrestoHook
 
diff --git a/airflow/providers/presto/provider.yaml b/airflow/providers/presto/transfers/__init__.py
similarity index 50%
copy from airflow/providers/presto/provider.yaml
copy to airflow/providers/presto/transfers/__init__.py
index 456bd0b..13a8339 100644
--- a/airflow/providers/presto/provider.yaml
+++ b/airflow/providers/presto/transfers/__init__.py
@@ -14,37 +14,3 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-
----
-package-name: apache-airflow-providers-presto
-name: Presto
-description: |
-    `Presto <https://prestodb.github.io/>`__
-
-versions:
-  - 2.0.1
-  - 2.0.0
-  - 1.0.2
-  - 1.0.1
-  - 1.0.0
-
-additional-dependencies:
-  - apache-airflow>=2.1.0
-
-integrations:
-  - integration-name: Presto
-    external-doc-url: http://prestodb.github.io/
-    logo: /integration-logos/presto/PrestoDB.png
-    tags: [software]
-
-hooks:
-  - integration-name: Presto
-    python-modules:
-      - airflow.providers.presto.hooks.presto
-
-hook-class-names:  # deprecated - to be removed after providers add dependency on Airflow 2.2.0+
-  - airflow.providers.presto.hooks.presto.PrestoHook
-
-connection-types:
-  - hook-class-name: airflow.providers.presto.hooks.presto.PrestoHook
-    connection-type: presto
diff --git a/airflow/providers/presto/transfers/gcs_to_presto.py b/airflow/providers/presto/transfers/gcs_to_presto.py
new file mode 100644
index 0000000..ebf0ac4
--- /dev/null
+++ b/airflow/providers/presto/transfers/gcs_to_presto.py
@@ -0,0 +1,103 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""This module contains Google Cloud Storage to Presto operator."""
+
+import csv
+from tempfile import NamedTemporaryFile
+from typing import TYPE_CHECKING, Optional, Sequence, Union
+
+from airflow.models import BaseOperator
+from airflow.providers.google.cloud.hooks.gcs import GCSHook
+from airflow.providers.presto.hooks.presto import PrestoHook
+
+if TYPE_CHECKING:
+    from airflow.utils.context import Context
+
+
+class GCSToPrestoOperator(BaseOperator):
+    """
+    Loads a csv file from Google Cloud Storage into a Presto table.
+    Assumptions:
+    1. First row of the csv contains headers
+    2. Presto table with requisite columns is already created
+
+    :param source_bucket: Source GCS bucket that contains the csv
+    :param source_object: csv file including the path
+    :param presto_table: presto table to upload the data
+    :param presto_conn_id: destination presto connection
+    :param gcp_conn_id: (Optional) The connection ID used to connect to Google Cloud and
+        interact with the Google Cloud Storage service.
+    :param delegate_to: The account to impersonate using domain-wide delegation of authority,
+        if any. For this to work, the service account making the request must have
+        domain-wide delegation enabled.
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account.
+    """
+
+    def __init__(
+        self,
+        *,
+        source_bucket: str,
+        source_object: str,
+        presto_table: str,
+        presto_conn_id: str = "presto_default",
+        gcp_conn_id: str = "google_cloud_default",
+        delegate_to: Optional[str] = None,
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.source_bucket = source_bucket
+        self.source_object = source_object
+        self.presto_table = presto_table
+        self.presto_conn_id = presto_conn_id
+        self.gcp_conn_id = gcp_conn_id
+        self.delegate_to = delegate_to
+        self.impersonation_chain = impersonation_chain
+
+    def execute(self, context: 'Context') -> None:
+        gcs_hook = GCSHook(
+            gcp_conn_id=self.gcp_conn_id,
+            delegate_to=self.delegate_to,
+            impersonation_chain=self.impersonation_chain,
+        )
+
+        presto_hook = PrestoHook(presto_conn_id=self.presto_conn_id)
+
+        with NamedTemporaryFile("w+") as temp_file:
+            self.log.info("Downloading data from %s", self.source_object)
+            gcs_hook.download(
+                bucket_name=self.source_bucket,
+                object_name=self.source_object,
+                filename=temp_file.name,
+            )
+
+            data = list(csv.reader(temp_file))
+            fields = tuple(data[0])
+            rows = []
+            for row in data[1:]:
+                rows.append(tuple(row))
+
+            self.log.info("Inserting data into %s", self.presto_table)
+            presto_hook.insert_rows(table=self.presto_table, rows=rows, target_fields=fields)
diff --git a/docs/apache-airflow-providers-presto/index.rst b/docs/apache-airflow-providers-presto/index.rst
index 381eb28..86f4800 100644
--- a/docs/apache-airflow-providers-presto/index.rst
+++ b/docs/apache-airflow-providers-presto/index.rst
@@ -24,6 +24,12 @@ Content
 
 .. toctree::
     :maxdepth: 1
+    :caption: Guides
+
+    PrestoTransferOperator types <operators/transfer/gcs_to_presto>
+
+.. toctree::
+    :maxdepth: 1
     :caption: References
 
     Python API <_api/airflow/providers/presto/index>
@@ -32,6 +38,7 @@ Content
     :maxdepth: 1
     :caption: Resources
 
+    Example DAGs <https://github.com/apache/airflow/tree/main/airflow/providers/presto/example_dags>
     PyPI Repository <https://pypi.org/project/apache-airflow-providers-presto/>
     Installing from sources <installing-providers-from-sources>
 
diff --git a/docs/apache-airflow-providers-presto/operators/transfer/gcs_to_presto.rst b/docs/apache-airflow-providers-presto/operators/transfer/gcs_to_presto.rst
new file mode 100644
index 0000000..f1c5cb9
--- /dev/null
+++ b/docs/apache-airflow-providers-presto/operators/transfer/gcs_to_presto.rst
@@ -0,0 +1,51 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+Google Cloud Storage to Presto Transfer Operator
+================================================
+
+Google has a service `Google Cloud Storage <https://cloud.google.com/storage/>`__. This service is
+used to store large data from various applications.
+
+`Presto <https://prestodb.io/>`__ is an open source distributed SQL query engine for running interactive
+analytic queries against data sources of all sizes ranging from gigabytes to petabytes. Presto allows
+querying data where it lives, including Hive, Cassandra, relational databases or even proprietary data stores.
+A single Presto query can combine data from multiple sources, allowing for analytics across your entire
+organization.
+
+
+Prerequisite Tasks
+^^^^^^^^^^^^^^^^^^
+
+.. include::/operators/_partials/prerequisite_tasks.rst
+
+.. _howto/operator:GCSToPresto:
+
+Load CSV from GCS to Presto Table
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+To load a CSV file from Google Cloud Storage to a Presto table you can use the
+:class:`~airflow.providers.presto.transfers.gcs_to_presto.GCSToPrestoOperator`.
+
+This operator assumes that the first row of the CSV contains headers corresponding to the columns in a
+pre-existing presto table.
+
+.. exampleinclude:: /../../airflow/providers/presto/example_dags/example_gcs_to_presto.py
+    :language: python
+    :dedent: 4
+    :start-after: [START gcs_csv_to_presto_table]
+    :end-before: [END gcs_csv_to_presto_table]
diff --git a/airflow/providers/presto/provider.yaml b/tests/providers/presto/transfers/__init__.py
similarity index 50%
copy from airflow/providers/presto/provider.yaml
copy to tests/providers/presto/transfers/__init__.py
index 456bd0b..217e5db 100644
--- a/airflow/providers/presto/provider.yaml
+++ b/tests/providers/presto/transfers/__init__.py
@@ -1,3 +1,4 @@
+#
 # Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements.  See the NOTICE file
 # distributed with this work for additional information
@@ -14,37 +15,3 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-
----
-package-name: apache-airflow-providers-presto
-name: Presto
-description: |
-    `Presto <https://prestodb.github.io/>`__
-
-versions:
-  - 2.0.1
-  - 2.0.0
-  - 1.0.2
-  - 1.0.1
-  - 1.0.0
-
-additional-dependencies:
-  - apache-airflow>=2.1.0
-
-integrations:
-  - integration-name: Presto
-    external-doc-url: http://prestodb.github.io/
-    logo: /integration-logos/presto/PrestoDB.png
-    tags: [software]
-
-hooks:
-  - integration-name: Presto
-    python-modules:
-      - airflow.providers.presto.hooks.presto
-
-hook-class-names:  # deprecated - to be removed after providers add dependency on Airflow 2.2.0+
-  - airflow.providers.presto.hooks.presto.PrestoHook
-
-connection-types:
-  - hook-class-name: airflow.providers.presto.hooks.presto.PrestoHook
-    connection-type: presto
diff --git a/tests/providers/presto/transfers/test_gcs_presto.py b/tests/providers/presto/transfers/test_gcs_presto.py
new file mode 100644
index 0000000..89631da
--- /dev/null
+++ b/tests/providers/presto/transfers/test_gcs_presto.py
@@ -0,0 +1,71 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import unittest
+from unittest import mock
+
+from airflow.providers.presto.transfers.gcs_to_presto import GCSToPrestoOperator
+
+BUCKET = "source_bucket"
+PATH = "path/to/file.csv"
+GCP_CONN_ID = "test_gcp"
+IMPERSONATION_CHAIN = ["ACCOUNT_1", "ACCOUNT_2", "ACCOUNT_3"]
+PRESTO_CONN_ID = "test_presto"
+PRESTO_TABLE = "test_table"
+TASK_ID = "test_gcs_to_presto"
+VALUES = [("colA", "colB", "colC"), (1, 2, 3)]
+
+
+class TestGCSToPrestoOperator(unittest.TestCase):
+    @mock.patch('airflow.providers.presto.transfers.gcs_to_presto.csv.reader')
+    @mock.patch('airflow.providers.presto.transfers.gcs_to_presto.PrestoHook')
+    @mock.patch("airflow.providers.presto.transfers.gcs_to_presto.GCSHook")
+    @mock.patch("airflow.providers.presto.transfers.gcs_to_presto.NamedTemporaryFile")
+    def test_execute(self, mock_tempfile, mock_gcs_hook, mock_presto_hook, mock_reader):
+        filename = "file://97g23r"
+        file_handle = mock.MagicMock()
+        mock_tempfile.return_value.__enter__.return_value = file_handle
+        mock_tempfile.return_value.__enter__.return_value.name = filename
+        mock_reader.return_value = VALUES
+
+        op = GCSToPrestoOperator(
+            task_id=TASK_ID,
+            source_bucket=BUCKET,
+            source_object=PATH,
+            presto_table=PRESTO_TABLE,
+            presto_conn_id=PRESTO_CONN_ID,
+            gcp_conn_id=GCP_CONN_ID,
+            impersonation_chain=IMPERSONATION_CHAIN,
+        )
+        op.execute(None)
+
+        mock_gcs_hook.assert_called_once_with(
+            gcp_conn_id=GCP_CONN_ID,
+            delegate_to=None,
+            impersonation_chain=IMPERSONATION_CHAIN,
+        )
+
+        mock_presto_hook.assert_called_once_with(presto_conn_id=PRESTO_CONN_ID)
+
+        mock_download = mock_gcs_hook.return_value.download
+
+        mock_download.assert_called_once_with(bucket_name=BUCKET, object_name=PATH, filename=filename)
+
+        mock_insert = mock_presto_hook.return_value.insert_rows
+
+        fields = VALUES[0]
+        mock_insert.assert_called_once_with(table=PRESTO_TABLE, rows=VALUES[1:], target_fields=fields)

Reply via email to