This is an automated email from the ASF dual-hosted git repository.
eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new aab1189 Add GCSToPrestoOperator (#21084)
aab1189 is described below
commit aab11890dbae5dab7ec5df89e79bc8e27f451ce7
Author: Rachana Gogate <[email protected]>
AuthorDate: Wed Feb 16 22:17:35 2022 -0800
Add GCSToPrestoOperator (#21084)
---
CONTRIBUTING.rst | 1 +
airflow/providers/dependencies.json | 3 +
.../{provider.yaml => example_dags/__init__.py} | 34 -------
.../presto/example_dags/example_gcs_to_presto.py | 46 +++++++++
airflow/providers/presto/provider.yaml | 6 ++
.../{provider.yaml => transfers/__init__.py} | 34 -------
.../providers/presto/transfers/gcs_to_presto.py | 103 +++++++++++++++++++++
docs/apache-airflow-providers-presto/index.rst | 7 ++
.../operators/transfer/gcs_to_presto.rst | 51 ++++++++++
.../providers/presto/transfers/__init__.py | 35 +------
.../providers/presto/transfers/test_gcs_presto.py | 71 ++++++++++++++
11 files changed, 289 insertions(+), 102 deletions(-)
diff --git a/CONTRIBUTING.rst b/CONTRIBUTING.rst
index c43a4c5..bed3915 100644
--- a/CONTRIBUTING.rst
+++ b/CONTRIBUTING.rst
@@ -686,6 +686,7 @@ hashicorp google
microsoft.azure google,oracle,sftp
mysql amazon,presto,trino,vertica
postgres amazon
+presto google
salesforce tableau
sftp ssh
slack http
diff --git a/airflow/providers/dependencies.json
b/airflow/providers/dependencies.json
index bb73d81..f664364 100644
--- a/airflow/providers/dependencies.json
+++ b/airflow/providers/dependencies.json
@@ -70,6 +70,9 @@
"postgres": [
"amazon"
],
+ "presto": [
+ "google"
+ ],
"salesforce": [
"tableau"
],
diff --git a/airflow/providers/presto/provider.yaml
b/airflow/providers/presto/example_dags/__init__.py
similarity index 50%
copy from airflow/providers/presto/provider.yaml
copy to airflow/providers/presto/example_dags/__init__.py
index 456bd0b..13a8339 100644
--- a/airflow/providers/presto/provider.yaml
+++ b/airflow/providers/presto/example_dags/__init__.py
@@ -14,37 +14,3 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-
----
-package-name: apache-airflow-providers-presto
-name: Presto
-description: |
- `Presto <https://prestodb.github.io/>`__
-
-versions:
- - 2.0.1
- - 2.0.0
- - 1.0.2
- - 1.0.1
- - 1.0.0
-
-additional-dependencies:
- - apache-airflow>=2.1.0
-
-integrations:
- - integration-name: Presto
- external-doc-url: http://prestodb.github.io/
- logo: /integration-logos/presto/PrestoDB.png
- tags: [software]
-
-hooks:
- - integration-name: Presto
- python-modules:
- - airflow.providers.presto.hooks.presto
-
-hook-class-names: # deprecated - to be removed after providers add dependency
on Airflow 2.2.0+
- - airflow.providers.presto.hooks.presto.PrestoHook
-
-connection-types:
- - hook-class-name: airflow.providers.presto.hooks.presto.PrestoHook
- connection-type: presto
diff --git a/airflow/providers/presto/example_dags/example_gcs_to_presto.py
b/airflow/providers/presto/example_dags/example_gcs_to_presto.py
new file mode 100644
index 0000000..1593127
--- /dev/null
+++ b/airflow/providers/presto/example_dags/example_gcs_to_presto.py
@@ -0,0 +1,46 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+Example DAG using GCSToPrestoOperator.
+
+Loads a CSV object from Google Cloud Storage into an existing Presto table, using the
+operator's default GCP and Presto connection IDs.
+"""
+
+import os
+from datetime import datetime
+
+from airflow import models
+from airflow.providers.presto.transfers.gcs_to_presto import GCSToPrestoOperator
+
+# Each setting is read from the environment variable named in the lookup; the second
+# argument is the fallback used when that variable is not set.
+BUCKET = os.environ.get("GCP_GCS_BUCKET", "test28397yeo")
+PATH_TO_FILE = os.environ.get("GCP_PATH", "path/to/file")
+PRESTO_TABLE = os.environ.get("PRESTO_TABLE", "test_table")
+
+with models.DAG(
+    dag_id="example_gcs_to_presto",
+    schedule_interval='@once',  # Override to match your needs
+    start_date=datetime(2022, 1, 1),
+    catchup=False,
+    tags=["example"],
+) as dag:
+    # The snippet between the START/END markers is embedded in the how-to guide
+    # (gcs_to_presto.rst, rendered with ``:dedent: 4``), so keep it self-explanatory.
+    # [START gcs_csv_to_presto_table]
+    gcs_csv_to_presto_table = GCSToPrestoOperator(
+        task_id="gcs_csv_to_presto_table",
+        source_bucket=BUCKET,
+        source_object=PATH_TO_FILE,
+        presto_table=PRESTO_TABLE,
+    )
+    # [END gcs_csv_to_presto_table]
diff --git a/airflow/providers/presto/provider.yaml
b/airflow/providers/presto/provider.yaml
index 456bd0b..62ffeeb 100644
--- a/airflow/providers/presto/provider.yaml
+++ b/airflow/providers/presto/provider.yaml
@@ -42,6 +42,12 @@ hooks:
python-modules:
- airflow.providers.presto.hooks.presto
+transfers:
+  - source-integration-name: Google Cloud Storage (GCS)
+    target-integration-name: Presto
+    how-to-guide: /docs/apache-airflow-providers-presto/operators/transfer/gcs_to_presto.rst
+    python-module: airflow.providers.presto.transfers.gcs_to_presto
+
hook-class-names: # deprecated - to be removed after providers add dependency
on Airflow 2.2.0+
- airflow.providers.presto.hooks.presto.PrestoHook
diff --git a/airflow/providers/presto/provider.yaml
b/airflow/providers/presto/transfers/__init__.py
similarity index 50%
copy from airflow/providers/presto/provider.yaml
copy to airflow/providers/presto/transfers/__init__.py
index 456bd0b..13a8339 100644
--- a/airflow/providers/presto/provider.yaml
+++ b/airflow/providers/presto/transfers/__init__.py
@@ -14,37 +14,3 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-
----
-package-name: apache-airflow-providers-presto
-name: Presto
-description: |
- `Presto <https://prestodb.github.io/>`__
-
-versions:
- - 2.0.1
- - 2.0.0
- - 1.0.2
- - 1.0.1
- - 1.0.0
-
-additional-dependencies:
- - apache-airflow>=2.1.0
-
-integrations:
- - integration-name: Presto
- external-doc-url: http://prestodb.github.io/
- logo: /integration-logos/presto/PrestoDB.png
- tags: [software]
-
-hooks:
- - integration-name: Presto
- python-modules:
- - airflow.providers.presto.hooks.presto
-
-hook-class-names: # deprecated - to be removed after providers add dependency
on Airflow 2.2.0+
- - airflow.providers.presto.hooks.presto.PrestoHook
-
-connection-types:
- - hook-class-name: airflow.providers.presto.hooks.presto.PrestoHook
- connection-type: presto
diff --git a/airflow/providers/presto/transfers/gcs_to_presto.py
b/airflow/providers/presto/transfers/gcs_to_presto.py
new file mode 100644
index 0000000..ebf0ac4
--- /dev/null
+++ b/airflow/providers/presto/transfers/gcs_to_presto.py
@@ -0,0 +1,103 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""This module contains Google Cloud Storage to Presto operator."""
+
+import csv
+from tempfile import NamedTemporaryFile
+from typing import TYPE_CHECKING, Optional, Sequence, Union
+
+from airflow.models import BaseOperator
+from airflow.providers.google.cloud.hooks.gcs import GCSHook
+from airflow.providers.presto.hooks.presto import PrestoHook
+
+if TYPE_CHECKING:
+ from airflow.utils.context import Context
+
+
+class GCSToPrestoOperator(BaseOperator):
+ """
+ Loads a csv file from Google Cloud Storage into a Presto table.
+ Assumptions:
+ 1. First row of the csv contains headers
+ 2. Presto table with requisite columns is already created
+
+ :param source_bucket: Source GCS bucket that contains the csv
+ :param source_object: csv file including the path
+ :param presto_table: presto table to upload the data
+ :param presto_conn_id: destination presto connection
+ :param gcp_conn_id: (Optional) The connection ID used to connect to Google
Cloud and
+ interact with the Google Cloud Storage service.
+ :param delegate_to: The account to impersonate using domain-wide
delegation of authority,
+ if any. For this to work, the service account making the request must
have
+ domain-wide delegation enabled.
+ :param impersonation_chain: Optional service account to impersonate using
short-term
+ credentials, or chained list of accounts required to get the
access_token
+ of the last account in the list, which will be impersonated in the
request.
+ If set as a string, the account must grant the originating account
+ the Service Account Token Creator IAM role.
+ If set as a sequence, the identities from the list must grant
+ Service Account Token Creator IAM role to the directly preceding
identity, with first
+ account from the list granting this role to the originating account.
+ """
+
+ def __init__(
+ self,
+ *,
+ source_bucket: str,
+ source_object: str,
+ presto_table: str,
+ presto_conn_id: str = "presto_default",
+ gcp_conn_id: str = "google_cloud_default",
+ delegate_to: Optional[str] = None,
+ impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+ **kwargs,
+ ) -> None:
+ super().__init__(**kwargs)
+ self.source_bucket = source_bucket
+ self.source_object = source_object
+ self.presto_table = presto_table
+ self.presto_conn_id = presto_conn_id
+ self.gcp_conn_id = gcp_conn_id
+ self.delegate_to = delegate_to
+ self.impersonation_chain = impersonation_chain
+
+ def execute(self, context: 'Context') -> None:
+ gcs_hook = GCSHook(
+ gcp_conn_id=self.gcp_conn_id,
+ delegate_to=self.delegate_to,
+ impersonation_chain=self.impersonation_chain,
+ )
+
+ presto_hook = PrestoHook(presto_conn_id=self.presto_conn_id)
+
+ with NamedTemporaryFile("w+") as temp_file:
+ self.log.info("Downloading data from %s", self.source_object)
+ gcs_hook.download(
+ bucket_name=self.source_bucket,
+ object_name=self.source_object,
+ filename=temp_file.name,
+ )
+
+ data = list(csv.reader(temp_file))
+ fields = tuple(data[0])
+ rows = []
+ for row in data[1:]:
+ rows.append(tuple(row))
+
+ self.log.info("Inserting data into %s", self.presto_table)
+ presto_hook.insert_rows(table=self.presto_table, rows=rows,
target_fields=fields)
diff --git a/docs/apache-airflow-providers-presto/index.rst
b/docs/apache-airflow-providers-presto/index.rst
index 381eb28..86f4800 100644
--- a/docs/apache-airflow-providers-presto/index.rst
+++ b/docs/apache-airflow-providers-presto/index.rst
@@ -24,6 +24,12 @@ Content
.. toctree::
:maxdepth: 1
+ :caption: Guides
+
+    Transfer operators <operators/transfer/gcs_to_presto>
+
+.. toctree::
+ :maxdepth: 1
:caption: References
Python API <_api/airflow/providers/presto/index>
@@ -32,6 +38,7 @@ Content
:maxdepth: 1
:caption: Resources
+    Example DAGs <https://github.com/apache/airflow/tree/main/airflow/providers/presto/example_dags>
PyPI Repository <https://pypi.org/project/apache-airflow-providers-presto/>
Installing from sources <installing-providers-from-sources>
diff --git
a/docs/apache-airflow-providers-presto/operators/transfer/gcs_to_presto.rst
b/docs/apache-airflow-providers-presto/operators/transfer/gcs_to_presto.rst
new file mode 100644
index 0000000..f1c5cb9
--- /dev/null
+++ b/docs/apache-airflow-providers-presto/operators/transfer/gcs_to_presto.rst
@@ -0,0 +1,51 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ .. http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+Google Cloud Storage to Presto Transfer Operator
+================================================
+
+`Google Cloud Storage <https://cloud.google.com/storage/>`__ is a Google Cloud service used to
+store large amounts of data from various applications.
+
+`Presto <https://prestodb.io/>`__ is an open source distributed SQL query engine for running interactive
+analytic queries against data sources of all sizes ranging from gigabytes to petabytes. Presto allows
+querying data where it lives, including Hive, Cassandra, relational databases or even proprietary data stores.
+A single Presto query can combine data from multiple sources, allowing for analytics across your entire
+organization.
+
+
+Prerequisite Tasks
+^^^^^^^^^^^^^^^^^^
+
+.. include::/operators/_partials/prerequisite_tasks.rst
+
+.. _howto/operator:GCSToPresto:
+
+Load CSV from GCS to Presto Table
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+To load a CSV file from Google Cloud Storage to a Presto table you can use the
+:class:`~airflow.providers.presto.transfers.gcs_to_presto.GCSToPrestoOperator`.
+
+This operator assumes that the first row of the CSV contains headers corresponding to the columns in a
+pre-existing Presto table.
+
+.. exampleinclude:: /../../airflow/providers/presto/example_dags/example_gcs_to_presto.py
+ :language: python
+ :dedent: 4
+ :start-after: [START gcs_csv_to_presto_table]
+ :end-before: [END gcs_csv_to_presto_table]
diff --git a/airflow/providers/presto/provider.yaml
b/tests/providers/presto/transfers/__init__.py
similarity index 50%
copy from airflow/providers/presto/provider.yaml
copy to tests/providers/presto/transfers/__init__.py
index 456bd0b..217e5db 100644
--- a/airflow/providers/presto/provider.yaml
+++ b/tests/providers/presto/transfers/__init__.py
@@ -1,3 +1,4 @@
+#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
@@ -14,37 +15,3 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-
----
-package-name: apache-airflow-providers-presto
-name: Presto
-description: |
- `Presto <https://prestodb.github.io/>`__
-
-versions:
- - 2.0.1
- - 2.0.0
- - 1.0.2
- - 1.0.1
- - 1.0.0
-
-additional-dependencies:
- - apache-airflow>=2.1.0
-
-integrations:
- - integration-name: Presto
- external-doc-url: http://prestodb.github.io/
- logo: /integration-logos/presto/PrestoDB.png
- tags: [software]
-
-hooks:
- - integration-name: Presto
- python-modules:
- - airflow.providers.presto.hooks.presto
-
-hook-class-names: # deprecated - to be removed after providers add dependency
on Airflow 2.2.0+
- - airflow.providers.presto.hooks.presto.PrestoHook
-
-connection-types:
- - hook-class-name: airflow.providers.presto.hooks.presto.PrestoHook
- connection-type: presto
diff --git a/tests/providers/presto/transfers/test_gcs_presto.py
b/tests/providers/presto/transfers/test_gcs_presto.py
new file mode 100644
index 0000000..89631da
--- /dev/null
+++ b/tests/providers/presto/transfers/test_gcs_presto.py
@@ -0,0 +1,71 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import unittest
+from unittest import mock
+
+from airflow.providers.presto.transfers.gcs_to_presto import
GCSToPrestoOperator
+
+BUCKET = "source_bucket"
+PATH = "path/to/file.csv"
+GCP_CONN_ID = "test_gcp"
+IMPERSONATION_CHAIN = ["ACCOUNT_1", "ACCOUNT_2", "ACCOUNT_3"]
+PRESTO_CONN_ID = "test_presto"
+PRESTO_TABLE = "test_table"
+TASK_ID = "test_gcs_to_presto"
+VALUES = [("colA", "colB", "colC"), (1, 2, 3)]
+
+
+class TestGCSToPrestoOperator(unittest.TestCase):
+ @mock.patch('airflow.providers.presto.transfers.gcs_to_presto.csv.reader')
+ @mock.patch('airflow.providers.presto.transfers.gcs_to_presto.PrestoHook')
+ @mock.patch("airflow.providers.presto.transfers.gcs_to_presto.GCSHook")
+
@mock.patch("airflow.providers.presto.transfers.gcs_to_presto.NamedTemporaryFile")
+ def test_execute(self, mock_tempfile, mock_gcs_hook, mock_presto_hook,
mock_reader):
+ filename = "file://97g23r"
+ file_handle = mock.MagicMock()
+ mock_tempfile.return_value.__enter__.return_value = file_handle
+ mock_tempfile.return_value.__enter__.return_value.name = filename
+ mock_reader.return_value = VALUES
+
+ op = GCSToPrestoOperator(
+ task_id=TASK_ID,
+ source_bucket=BUCKET,
+ source_object=PATH,
+ presto_table=PRESTO_TABLE,
+ presto_conn_id=PRESTO_CONN_ID,
+ gcp_conn_id=GCP_CONN_ID,
+ impersonation_chain=IMPERSONATION_CHAIN,
+ )
+ op.execute(None)
+
+ mock_gcs_hook.assert_called_once_with(
+ gcp_conn_id=GCP_CONN_ID,
+ delegate_to=None,
+ impersonation_chain=IMPERSONATION_CHAIN,
+ )
+
+ mock_presto_hook.assert_called_once_with(presto_conn_id=PRESTO_CONN_ID)
+
+ mock_download = mock_gcs_hook.return_value.download
+
+ mock_download.assert_called_once_with(bucket_name=BUCKET,
object_name=PATH, filename=filename)
+
+ mock_insert = mock_presto_hook.return_value.insert_rows
+
+ fields = VALUES[0]
+ mock_insert.assert_called_once_with(table=PRESTO_TABLE,
rows=VALUES[1:], target_fields=fields)