This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 33a14198c1 Updates to Teradata Provider (#40378)
33a14198c1 is described below
commit 33a14198c106be4b0a89d67bd711b560b0f4251d
Author: Satish Chinthanippu <[email protected]>
AuthorDate: Sat Jun 22 02:50:41 2024 -0700
Updates to Teradata Provider (#40378)
Added support for the Teradata authorization database object in the cloud transfer operators to Teradata. (#46)
1. Added the Teradata authorization object for authorization in the transfer operators
2. Added security token support in the S3-to-Teradata transfer operator
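
For illustration, a minimal sketch of the new parameter in use (the task id, URI,
table, and authorization-object names below are placeholders, not from this commit):

    from airflow.providers.teradata.transfers.s3_to_teradata import S3ToTeradataOperator

    transfer_auth_data_csv = S3ToTeradataOperator(
        task_id="transfer_auth_data_s3_to_teradata_csv",
        s3_source_key="/s3/your-bucket.s3.amazonaws.com/csv/",
        teradata_table="example_s3_teradata_csv",
        # References a pre-created AUTHORIZATION object in the Teradata database;
        # it takes precedence over credentials from aws_conn_id.
        teradata_authorization_name="aws_authorization",
        teradata_conn_id="teradata_default",
    )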
---
airflow/providers/teradata/operators/teradata.py | 2 +-
.../teradata/transfers/azure_blob_to_teradata.py | 46 +++++++++----
.../providers/teradata/transfers/s3_to_teradata.py | 28 +++++---
.../operators/azure_blob_to_teradata.rst | 67 +++++++++++++++++-
.../operators/s3_to_teradata.rst | 10 ++-
.../example_azure_blob_to_teradata_transfer.py | 80 +++++++++++++++++++++-
.../teradata/example_s3_to_teradata_transfer.py | 79 +++++++++++++++++++--
7 files changed, 276 insertions(+), 36 deletions(-)
diff --git a/airflow/providers/teradata/operators/teradata.py
b/airflow/providers/teradata/operators/teradata.py
index 00cd7a86c7..c15fc29038 100644
--- a/airflow/providers/teradata/operators/teradata.py
+++ b/airflow/providers/teradata/operators/teradata.py
@@ -31,7 +31,7 @@ class TeradataOperator(SQLExecuteQueryOperator):
"""
General Teradata Operator to execute queries on Teradata Database.
-    Executes sql statements in the Teradata SQL Database using teradatasql jdbc driver
+    Executes SQL statements in the Teradata SQL Database using the Teradata Python SQL Driver
    .. seealso::
        For more information on how to use this operator, take a look at the guide:
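
For context, a sketch of basic TeradataOperator usage, mirroring the example DAGs
later in this patch (the connection id and SQL here are illustrative):

    from airflow.providers.teradata.operators.teradata import TeradataOperator

    # Runs a query through the Teradata Python SQL Driver (teradatasql);
    # conn_id names an Airflow Teradata connection.
    read_data_table_csv = TeradataOperator(
        task_id="read_data_table_csv",
        conn_id="teradata_default",
        sql="SELECT count(1) from example_blob_teradata_csv;",
    )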
diff --git a/airflow/providers/teradata/transfers/azure_blob_to_teradata.py b/airflow/providers/teradata/transfers/azure_blob_to_teradata.py
index 416b4e7136..8fc95122f1 100644
--- a/airflow/providers/teradata/transfers/azure_blob_to_teradata.py
+++ b/airflow/providers/teradata/transfers/azure_blob_to_teradata.py
@@ -48,10 +48,17 @@ class AzureBlobStorageToTeradataOperator(BaseOperator):
        The URI format is `/az/YOUR-STORAGE-ACCOUNT.blob.core.windows.net/YOUR-CONTAINER/YOUR-BLOB-LOCATION`.
        Refer to https://docs.teradata.com/search/documents?query=native+object+store&sort=last_update&virtual-field=title_only&content-lang=en-US
+    :param public_bucket: Specifies whether the provided blob container is public. If the blob container is public,
+        it means that anyone can access the objects within it via a URL without requiring authentication.
+        If the bucket is private and authentication is not provided, the operator will throw an exception.
    :param azure_conn_id: The Airflow WASB connection used for azure blob credentials.
    :param teradata_table: The name of the teradata table to which the data is transferred.(templated)
    :param teradata_conn_id: The connection ID used to connect to Teradata
        :ref:`Teradata connection <howto/connection:Teradata>`
+    :param teradata_authorization_name: The name of the Teradata Authorization Database Object,
+        which is used to control who can access an Azure Blob object store.
+        Refer to
+        https://docs.teradata.com/r/Enterprise_IntelliFlex_VMware/Teradata-VantageTM-Native-Object-Store-Getting-Started-Guide-17.20/Setting-Up-Access/Controlling-Foreign-Table-Access-with-an-AUTHORIZATION-Object
Note that ``blob_source_key`` and ``teradata_table`` are
templated, so you can use variables in them if you wish.
@@ -64,37 +71,48 @@ class AzureBlobStorageToTeradataOperator(BaseOperator):
self,
*,
blob_source_key: str,
+ public_bucket: bool = False,
azure_conn_id: str = "azure_default",
teradata_table: str,
teradata_conn_id: str = "teradata_default",
+ teradata_authorization_name: str = "",
**kwargs,
) -> None:
super().__init__(**kwargs)
self.blob_source_key = blob_source_key
+ self.public_bucket = public_bucket
self.azure_conn_id = azure_conn_id
self.teradata_table = teradata_table
self.teradata_conn_id = teradata_conn_id
+ self.teradata_authorization_name = teradata_authorization_name
def execute(self, context: Context) -> None:
self.log.info(
"transferring data from %s to teradata table %s...",
self.blob_source_key, self.teradata_table
)
- azure_hook = WasbHook(wasb_conn_id=self.azure_conn_id)
- conn = azure_hook.get_connection(self.azure_conn_id)
-        # Obtaining the Azure client ID and Azure secret in order to access a specified Blob container
- access_id = conn.login if conn.login is not None else ""
- access_secret = conn.password if conn.password is not None else ""
teradata_hook = TeradataHook(teradata_conn_id=self.teradata_conn_id)
+        credentials_part = "ACCESS_ID= '' ACCESS_KEY= ''"
+        if not self.public_bucket:
+            # Accessing data directly from the Azure Blob Storage and creating permanent table inside the
+            # database
+            if self.teradata_authorization_name:
+                credentials_part = f"AUTHORIZATION={self.teradata_authorization_name}"
+            else:
+                # Obtaining the Azure client ID and Azure secret in order to access a specified Blob container
+                azure_hook = WasbHook(wasb_conn_id=self.azure_conn_id)
+                conn = azure_hook.get_connection(self.azure_conn_id)
+                access_id = conn.login
+                access_secret = conn.password
+                credentials_part = f"ACCESS_ID= '{access_id}' ACCESS_KEY= '{access_secret}'"
        sql = dedent(f"""
-        CREATE MULTISET TABLE {self.teradata_table} AS
-        (
-            SELECT * FROM (
-                LOCATION = '{self.blob_source_key}'
-                ACCESS_ID= '{access_id}'
-                ACCESS_KEY= '{access_secret}'
-            ) AS d
-        ) WITH DATA
-        """).rstrip()
+            CREATE MULTISET TABLE {self.teradata_table} AS
+            (
+                SELECT * FROM (
+                    LOCATION = '{self.blob_source_key}'
+                    {credentials_part}
+                ) AS d
+            ) WITH DATA
+            """).rstrip()
try:
teradata_hook.run(sql, True)
except Exception as ex:
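
As a standalone illustration of the precedence introduced above (an explicit Teradata
AUTHORIZATION object wins over connection credentials, and a public bucket needs
neither), a distilled sketch; the helper name is hypothetical, not part of the patch:

    def build_credentials_part(public_bucket, authorization_name, access_id, access_secret):
        # Mirrors the branching in AzureBlobStorageToTeradataOperator.execute.
        if public_bucket:
            return "ACCESS_ID= '' ACCESS_KEY= ''"
        if authorization_name:
            return f"AUTHORIZATION={authorization_name}"
        return f"ACCESS_ID= '{access_id}' ACCESS_KEY= '{access_secret}'"

    assert build_credentials_part(False, "azure_authorization", "id", "key") == "AUTHORIZATION=azure_authorization"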
diff --git a/airflow/providers/teradata/transfers/s3_to_teradata.py b/airflow/providers/teradata/transfers/s3_to_teradata.py
index f7998ea861..d0bca09165 100644
--- a/airflow/providers/teradata/transfers/s3_to_teradata.py
+++ b/airflow/providers/teradata/transfers/s3_to_teradata.py
@@ -53,6 +53,10 @@ class S3ToTeradataOperator(BaseOperator):
:param aws_conn_id: The Airflow AWS connection used for AWS credentials.
:param teradata_conn_id: The connection ID used to connect to Teradata
:ref:`Teradata connection <howto/connection:Teradata>`.
+    :param teradata_authorization_name: The name of the Teradata Authorization Database Object,
+        which is used to control who can access an S3 object store.
+        Refer to
+        https://docs.teradata.com/r/Enterprise_IntelliFlex_VMware/Teradata-VantageTM-Native-Object-Store-Getting-Started-Guide-17.20/Setting-Up-Access/Controlling-Foreign-Table-Access-with-an-AUTHORIZATION-Object
Note that ``s3_source_key`` and ``teradata_table`` are
templated, so you can use variables in them if you wish.
@@ -69,6 +73,7 @@ class S3ToTeradataOperator(BaseOperator):
teradata_table: str,
aws_conn_id: str = "aws_default",
teradata_conn_id: str = "teradata_default",
+ teradata_authorization_name: str = "",
**kwargs,
) -> None:
super().__init__(**kwargs)
@@ -77,6 +82,7 @@ class S3ToTeradataOperator(BaseOperator):
self.teradata_table = teradata_table
self.aws_conn_id = aws_conn_id
self.teradata_conn_id = teradata_conn_id
+ self.teradata_authorization_name = teradata_authorization_name
def execute(self, context: Context) -> None:
self.log.info(
@@ -84,20 +90,26 @@ class S3ToTeradataOperator(BaseOperator):
)
s3_hook = S3Hook(aws_conn_id=self.aws_conn_id)
- access_key = ""
- access_secret = ""
- if not self.public_bucket:
- credentials = s3_hook.get_credentials()
- access_key = credentials.access_key
- access_secret = credentials.secret_key
teradata_hook = TeradataHook(teradata_conn_id=self.teradata_conn_id)
+        credentials_part = "ACCESS_ID= '' ACCESS_KEY= ''"
+        if not self.public_bucket:
+            # Accessing data directly from the S3 bucket and creating permanent table inside the database
+            if self.teradata_authorization_name:
+                credentials_part = f"AUTHORIZATION={self.teradata_authorization_name}"
+            else:
+                credentials = s3_hook.get_credentials()
+                access_key = credentials.access_key
+                access_secret = credentials.secret_key
+                credentials_part = f"ACCESS_ID= '{access_key}' ACCESS_KEY= '{access_secret}'"
+                token = credentials.token
+                if token:
+                    credentials_part = credentials_part + f" SESSION_TOKEN = '{token}'"
sql = dedent(f"""
CREATE MULTISET TABLE {self.teradata_table} AS
(
SELECT * FROM (
LOCATION = '{self.s3_source_key}'
- ACCESS_ID= '{access_key}'
- ACCESS_KEY= '{access_secret}'
+ {credentials_part}
) AS d
) WITH DATA
""").rstrip()
diff --git a/docs/apache-airflow-providers-teradata/operators/azure_blob_to_teradata.rst b/docs/apache-airflow-providers-teradata/operators/azure_blob_to_teradata.rst
index 0ee9a7bb32..194eabd0cd 100644
--- a/docs/apache-airflow-providers-teradata/operators/azure_blob_to_teradata.rst
+++ b/docs/apache-airflow-providers-teradata/operators/azure_blob_to_teradata.rst
@@ -26,8 +26,69 @@ AzureBlobStorageToTeradataOperator
The purpose of ``AzureBlobStorageToTeradataOperator`` is to define tasks involving CSV, JSON and Parquet
format data transfer from an Azure Blob Storage to Teradata table.
Use the :class:`AzureBlobStorageToTeradataOperator <airflow.providers.teradata.transfers.azure_blob_to_teradata>`
-to transfer data from an Azure Blob Storage to Teradata.
+to transfer data from an Azure Blob Storage to Teradata. This operator leverages the Teradata
+`READ_NOS <https://docs.teradata.com/r/Enterprise_IntelliFlex_VMware/Teradata-VantageTM-Native-Object-Store-Getting-Started-Guide-17.20/Welcome-to-Native-Object-Store>`_ feature
+to import data in CSV, JSON, and Parquet formats from Azure Blob Storage into Teradata.
+This operator accesses data directly from the object store and generates permanent tables
+within the database using the READ_NOS and CREATE TABLE AS functionality, with the SQL statement below.
+
+.. code-block:: sql
+
+ CREATE MULTISET TABLE multiset_table_name AS (
+ SELECT *
+ FROM (
+ LOCATION='YOUR-OBJECT-STORE-URI'
+ AUTHORIZATION=authorization_object
+ ) AS d
+ ) WITH DATA;
+
+It facilitates data loading from both public and private object storage. For private object storage, access to the
+object store can be granted either via a Teradata Authorization database object or via an Object Store Login and
+Object Store Key defined in an Azure Blob Storage connection in Airflow. For data transfer from public object
+storage, no authorization or access credentials are required.
+
+* The Teradata Authorization database object access type can be used with the ``teradata_authorization_name`` parameter of ``AzureBlobStorageToTeradataOperator``
+* The Object Store Access Key ID and Access Key Secret access type can be used with the ``azure_conn_id`` parameter of ``AzureBlobStorageToTeradataOperator``
+
+https://docs.teradata.com/r/Enterprise_IntelliFlex_VMware/Teradata-VantageTM-Native-Object-Store-Getting-Started-Guide-17.20/Setting-Up-Access/Setting-Access-Privileges
+
+.. note::
+    The Teradata Authorization database object takes precedence if both access types are defined.
+
+Transferring data from public Azure Blob Storage to Teradata
+------------------------------------------------------------
+
+An example usage of the AzureBlobStorageToTeradataOperator to transfer CSV data format from public Azure Blob Storage to a Teradata table is as follows:
+
+.. exampleinclude:: /../../tests/system/providers/teradata/example_azure_blob_to_teradata_transfer.py
+    :language: python
+    :start-after: [START azure_blob__to_teradata_transfer_operator_howto_guide_transfer_data_public_blob_to_teradata_csv]
+    :end-before: [END azure_blob__to_teradata_transfer_operator_howto_guide_transfer_data_public_blob_to_teradata_csv]
+
+Transferring data from private Azure Blob Storage to Teradata with Azure Blob Storage connection
+-------------------------------------------------------------------------------------------------
+
+An example usage of the AzureBlobStorageToTeradataOperator to transfer CSV data format from a private Azure Blob Storage container to Teradata, with credentials defined in an Azure Blob Storage connection in Airflow:
+
+.. exampleinclude:: /../../tests/system/providers/teradata/example_azure_blob_to_teradata_transfer.py
+    :language: python
+    :start-after: [START azure_blob_to_teradata_transfer_operator_howto_guide_transfer_data_access_blob_to_teradata_csv]
+    :end-before: [END azure_blob_to_teradata_transfer_operator_howto_guide_transfer_data_access_blob_to_teradata_csv]
+
+Transferring data from private Azure Blob Storage to Teradata with Teradata Authorization Object
+-------------------------------------------------------------------------------------------------
+
+A Teradata Authorization database object is used to control who can access an external object store. The Authorization
+database object must exist in the Teradata database before it can be used for transferring data from Azure Blob Storage to Teradata. Refer to
+`Authentication for External Object Stores in Teradata <https://docs.teradata.com/r/Enterprise_IntelliFlex_VMware/Teradata-VantageTM-Native-Object-Store-Getting-Started-Guide-17.20/Authentication-for-External-Object-Stores>`_
+
+An example usage of the AzureBlobStorageToTeradataOperator to transfer CSV data format from a private Azure Blob Storage container to Teradata with an
+Authorization database object defined in Teradata:
+
+.. exampleinclude:: /../../tests/system/providers/teradata/example_azure_blob_to_teradata_transfer.py
+    :language: python
+    :start-after: [START azure_blob_to_teradata_transfer_operator_howto_guide_transfer_data_authorization_blob_to_teradata_csv]
+    :end-before: [END azure_blob_to_teradata_transfer_operator_howto_guide_transfer_data_authorization_blob_to_teradata_csv]
+
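
To make the authorization path concrete, a reconstruction of the statement the
operator submits when ``teradata_authorization_name`` is set (table name, URI, and
authorization-object name are placeholders):

    from textwrap import dedent

    teradata_table = "example_blob_teradata_csv"
    blob_source_key = "/az/YOUR-STORAGE-ACCOUNT.blob.core.windows.net/YOUR-CONTAINER/"
    credentials_part = "AUTHORIZATION=azure_authorization"
    # Mirrors the f-string assembled in AzureBlobStorageToTeradataOperator.execute.
    sql = dedent(f"""
        CREATE MULTISET TABLE {teradata_table} AS
        (
            SELECT * FROM (
                LOCATION = '{blob_source_key}'
                {credentials_part}
            ) AS d
        ) WITH DATA
        """).rstrip()
    print(sql)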
Transferring data in CSV format from Azure Blob Storage to Teradata
-------------------------------------------------------------------
@@ -37,8 +98,8 @@ to teradata table is as follows:
.. exampleinclude:: /../../tests/system/providers/teradata/example_azure_blob_to_teradata_transfer.py
    :language: python
-    :start-after: [START azure_blob_to_teradata_transfer_operator_howto_guide_transfer_data_blob_to_teradata_csv]
-    :end-before: [END azure_blob_to_teradata_transfer_operator_howto_guide_transfer_data_blob_to_teradata_csv]
+    :start-after: [START azure_blob__to_teradata_transfer_operator_howto_guide_transfer_data_public_blob_to_teradata_csv]
+    :end-before: [END azure_blob__to_teradata_transfer_operator_howto_guide_transfer_data_public_blob_to_teradata_csv]
Transferring data in JSON format from Azure Blob Storage to Teradata
--------------------------------------------------------------------
diff --git a/docs/apache-airflow-providers-teradata/operators/s3_to_teradata.rst b/docs/apache-airflow-providers-teradata/operators/s3_to_teradata.rst
index a6ecbc6f14..da52e2841b 100644
--- a/docs/apache-airflow-providers-teradata/operators/s3_to_teradata.rst
+++ b/docs/apache-airflow-providers-teradata/operators/s3_to_teradata.rst
@@ -30,7 +30,11 @@ READ_NOS is a table operator in Teradata Vantage that allows users to list exter
For more details, see `READ_NOS Functionality <https://docs.teradata.com/r/Enterprise_IntelliFlex_VMware/Teradata-VantageTM-Native-Object-Store-Getting-Started-Guide-17.20/Reading-Data/Examples-For-DBAs-and-Advanced-Users/Loading-External-Data-into-the-Database/Loading-External-Data-into-the-Database-Using-READ_NOS-and-CREATE-TABLE-AS>`_
Use the :class:`S3ToTeradataOperator <airflow.providers.teradata.transfers.s3_to_teradata>`
-to transfer data from S3 to Teradata.
+to transfer data from S3 to Teradata. This operator leverages the Teradata
+`READ_NOS <https://docs.teradata.com/r/Enterprise_IntelliFlex_VMware/Teradata-VantageTM-Native-Object-Store-Getting-Started-Guide-17.20/Welcome-to-Native-Object-Store>`_ feature
+to import data in CSV, JSON, and Parquet formats from S3 into Teradata.
+This operator accesses data directly from the object store and generates permanent tables
+within the database using the READ_NOS and CREATE TABLE AS functionality.
.. note::
    The current version of ``S3ToTeradataOperator`` does not support accessing AWS S3 with Security Token Service
    (STS) temporary credentials. Instead, it exclusively supports accessing with long-term credentials.
@@ -43,8 +47,8 @@ An example usage of the S3ToTeradataOperator to transfer CSV data format from S3
.. exampleinclude:: /../../tests/system/providers/teradata/example_s3_to_teradata_transfer.py
    :language: python
-    :start-after: [START s3_to_teradata_transfer_operator_howto_guide_transfer_data_s3_to_teradata_csv]
-    :end-before: [END s3_to_teradata_transfer_operator_howto_guide_transfer_data_s3_to_teradata_csv]
+    :start-after: [START s3_to_teradata_transfer_operator_howto_guide_transfer_data_public_s3_to_teradata_csv]
+    :end-before: [END s3_to_teradata_transfer_operator_howto_guide_transfer_data_public_s3_to_teradata_csv]
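
For completeness, a reconstruction of the statement generated for a public bucket
(``public_bucket=True``), where the credentials clause degenerates to empty
ACCESS_ID/ACCESS_KEY strings; the URI comes from the example DAG below:

    from textwrap import dedent

    sql = dedent("""
        CREATE MULTISET TABLE example_s3_teradata_csv AS
        (
            SELECT * FROM (
                LOCATION = '/s3/td-usgs-public.s3.amazonaws.com/CSVDATA/09394500/2018/06/'
                ACCESS_ID= '' ACCESS_KEY= ''
            ) AS d
        ) WITH DATA
        """).rstrip()
    print(sql)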
Transferring data in JSON format from S3 to Teradata
----------------------------------------------------
diff --git
a/tests/system/providers/teradata/example_azure_blob_to_teradata_transfer.py
b/tests/system/providers/teradata/example_azure_blob_to_teradata_transfer.py
index 5d961550de..bcb1dd2fe6 100644
--- a/tests/system/providers/teradata/example_azure_blob_to_teradata_transfer.py
+++ b/tests/system/providers/teradata/example_azure_blob_to_teradata_transfer.py
@@ -53,15 +53,17 @@ with DAG(
catchup=False,
default_args={"teradata_conn_id": CONN_ID},
) as dag:
-    # [START azure_blob_to_teradata_transfer_operator_howto_guide_transfer_data_blob_to_teradata_csv]
+    # [START azure_blob__to_teradata_transfer_operator_howto_guide_transfer_data_public_blob_to_teradata_csv]
transfer_data_csv = AzureBlobStorageToTeradataOperator(
task_id="transfer_data_blob_to_teradata_csv",
blob_source_key="/az/akiaxox5jikeotfww4ul.blob.core.windows.net/td-usgs/CSVDATA/09380000/2018/06/",
+ public_bucket=True,
teradata_table="example_blob_teradata_csv",
+ teradata_conn_id="teradata_default",
azure_conn_id="wasb_default",
trigger_rule="all_done",
)
-    # [END azure_blob_to_teradata_transfer_operator_howto_guide_transfer_data_blob_to_teradata_csv]
+    # [END azure_blob__to_teradata_transfer_operator_howto_guide_transfer_data_public_blob_to_teradata_csv]
    # [START azure_blob_to_teradata_transfer_operator_howto_guide_read_data_table_csv]
read_data_table_csv = TeradataOperator(
task_id="read_data_table_csv",
@@ -74,11 +76,75 @@ with DAG(
sql="DROP TABLE example_blob_teradata_csv;",
)
# [END azure_blob_to_teradata_transfer_operator_howto_guide_drop_table_csv]
+    # [START azure_blob_to_teradata_transfer_operator_howto_guide_transfer_data_access_blob_to_teradata_csv]
+ transfer_key_data_csv = AzureBlobStorageToTeradataOperator(
+ task_id="transfer_key_data_blob_to_teradata_csv",
+ blob_source_key="/az/airflowteradata.blob.core.windows.net/csvdata/",
+ teradata_table="example_blob_teradata_csv",
+ azure_conn_id="wasb_default",
+ teradata_conn_id="teradata_default",
+ trigger_rule="all_done",
+ )
+    # [END azure_blob_to_teradata_transfer_operator_howto_guide_transfer_data_access_blob_to_teradata_csv]
+    # [START azure_blob_to_teradata_transfer_operator_howto_guide_read_data_table_csv]
+ read_key_data_table_csv = TeradataOperator(
+ task_id="read_key_data_table_csv",
+ conn_id=CONN_ID,
+ sql="SELECT count(1) from example_blob_teradata_csv;",
+ )
+    # [END azure_blob_to_teradata_transfer_operator_howto_guide_read_data_table_csv]
+    # [START azure_blob_to_teradata_transfer_operator_howto_guide_drop_table_csv]
+ drop_key_table_csv = TeradataOperator(
+ task_id="drop_key_table_csv",
+ conn_id=CONN_ID,
+ sql="DROP TABLE example_blob_teradata_csv;",
+ )
+ # [END azure_blob_to_teradata_transfer_operator_howto_guide_drop_table_csv]
+    # [START azure_blob_to_teradata_transfer_operator_howto_guide_create_authorization]
+ create_azure_authorization = TeradataOperator(
+ task_id="create_azure_authorization",
+ conn_id=CONN_ID,
+ sql="CREATE AUTHORIZATION azure_authorization USER '{{
var.value.get('AZURE_BLOB_ACCOUNTNAME') }}' PASSWORD '{{
var.value.get('AZURE_BLOB_ACCOUNT_SECRET_KEY') }}' ",
+ )
+    # [END azure_blob_to_teradata_transfer_operator_howto_guide_create_authorization]
+    # [START azure_blob_to_teradata_transfer_operator_howto_guide_transfer_data_authorization_blob_to_teradata_csv]
+ transfer_auth_data_csv = AzureBlobStorageToTeradataOperator(
+ task_id="transfer_auth_data_blob_to_teradata_csv",
+ blob_source_key="/az/airflowteradata.blob.core.windows.net/csvdata/",
+ teradata_table="example_blob_teradata_csv",
+ teradata_authorization_name="azure_authorization",
+ teradata_conn_id="teradata_default",
+ trigger_rule="all_done",
+ )
+    # [END azure_blob_to_teradata_transfer_operator_howto_guide_transfer_data_authorization_blob_to_teradata_csv]
+    # [START azure_blob_to_teradata_transfer_operator_howto_guide_read_data_table_csv]
+ read_auth_data_table_csv = TeradataOperator(
+ task_id="read_auth_data_table_csv",
+ conn_id=CONN_ID,
+ sql="SELECT count(1) from example_blob_teradata_csv;",
+ )
+    # [END azure_blob_to_teradata_transfer_operator_howto_guide_read_data_table_csv]
+    # [START azure_blob_to_teradata_transfer_operator_howto_guide_drop_table_csv]
+ drop_auth_table_csv = TeradataOperator(
+ task_id="drop_auth_table_csv",
+ conn_id=CONN_ID,
+ sql="DROP TABLE example_blob_teradata_csv;",
+ )
+ # [END azure_blob_to_teradata_transfer_operator_howto_guide_drop_table_csv]
+    # [START azure_blob_to_teradata_transfer_operator_howto_guide_drop_authorization]
+ drop_auth = TeradataOperator(
+ task_id="drop_auth",
+ conn_id=CONN_ID,
+ sql="DROP AUTHORIZATION azure_authorization;",
+ )
+    # [END azure_blob_to_teradata_transfer_operator_howto_guide_drop_authorization]
    # [START azure_blob_to_teradata_transfer_operator_howto_guide_transfer_data_blob_to_teradata_json]
transfer_data_json = AzureBlobStorageToTeradataOperator(
task_id="transfer_data_blob_to_teradata_json",
blob_source_key="/az/akiaxox5jikeotfww4ul.blob.core.windows.net/td-usgs/JSONDATA/09380000/2018/06/",
teradata_table="example_blob_teradata_json",
+ public_bucket=True,
+ teradata_conn_id="teradata_default",
azure_conn_id="wasb_default",
trigger_rule="all_done",
)
@@ -100,7 +166,7 @@ with DAG(
task_id="transfer_data_blob_to_teradata_parquet",
blob_source_key="/az/akiaxox5jikeotfww4ul.blob.core.windows.net/td-usgs/PARQUETDATA/09394500/2018/06/",
teradata_table="example_blob_teradata_parquet",
- azure_conn_id="wasb_default",
+ public_bucket=True,
teradata_conn_id="teradata_default",
trigger_rule="all_done",
)
@@ -128,6 +194,14 @@ with DAG(
>> drop_table_csv
>> drop_table_json
>> drop_table_parquet
+ >> transfer_key_data_csv
+ >> read_key_data_table_csv
+ >> drop_key_table_csv
+ >> create_azure_authorization
+ >> transfer_auth_data_csv
+ >> read_auth_data_table_csv
+ >> drop_auth_table_csv
+ >> drop_auth
)
# [END azure_blob_to_teradata_transfer_operator_howto_guide]
diff --git a/tests/system/providers/teradata/example_s3_to_teradata_transfer.py b/tests/system/providers/teradata/example_s3_to_teradata_transfer.py
index fc5e262739..ae8b827c1e 100644
--- a/tests/system/providers/teradata/example_s3_to_teradata_transfer.py
+++ b/tests/system/providers/teradata/example_s3_to_teradata_transfer.py
@@ -54,7 +54,7 @@ with DAG(
catchup=False,
default_args={"teradata_conn_id": CONN_ID},
) as dag:
-    # [START s3_to_teradata_transfer_operator_howto_guide_transfer_data_s3_to_teradata_csv]
+    # [START s3_to_teradata_transfer_operator_howto_guide_transfer_data_public_s3_to_teradata_csv]
transfer_data_csv = S3ToTeradataOperator(
task_id="transfer_data_s3_to_teradata_csv",
s3_source_key="/s3/td-usgs-public.s3.amazonaws.com/CSVDATA/09394500/2018/06/",
@@ -63,7 +63,7 @@ with DAG(
aws_conn_id="aws_default",
trigger_rule="all_done",
)
-    # [END s3_to_teradata_transfer_operator_howto_guide_transfer_data_s3_to_teradata_csv]
+    # [END s3_to_teradata_transfer_operator_howto_guide_transfer_data_public_s3_to_teradata_csv]
# [START s3_to_teradata_transfer_operator_howto_guide_read_data_table_csv]
read_data_table_csv = TeradataOperator(
task_id="read_data_table_csv",
@@ -78,6 +78,68 @@ with DAG(
sql="DROP TABLE example_s3_teradata_csv;",
)
# [END s3_to_teradata_transfer_operator_howto_guide_drop_table_csv]
+    # [START s3_to_teradata_transfer_operator_howto_guide_transfer_data_access_s3_to_teradata_csv]
+ transfer_key_data_csv = S3ToTeradataOperator(
+ task_id="transfer_key_data_s3_to_teradata_key_csv",
+        s3_source_key="/s3/airflowteradatatest.s3.ap-southeast-2.amazonaws.com/",
+ teradata_table="example_s3_teradata_csv",
+ aws_conn_id="aws_default",
+ teradata_conn_id="teradata_default",
+ trigger_rule="all_done",
+ )
+    # [END s3_to_teradata_transfer_operator_howto_guide_transfer_data_access_s3_to_teradata_csv]
+ # [START s3_to_teradata_transfer_operator_howto_guide_read_data_table_csv]
+ read_key_data_table_csv = TeradataOperator(
+ task_id="read_key_data_table_csv",
+ conn_id=CONN_ID,
+ sql="SELECT * from example_s3_teradata_csv;",
+ )
+ # [END s3_to_teradata_transfer_operator_howto_guide_read_data_table_csv]
+ # [START s3_to_teradata_transfer_operator_howto_guide_drop_table_csv]
+ drop_key_table_csv = TeradataOperator(
+ task_id="drop_key_table_csv",
+ conn_id=CONN_ID,
+ sql="DROP TABLE example_s3_teradata_csv;",
+ )
+ # [END s3_to_teradata_transfer_operator_howto_guide_drop_table_csv]
+ # [START s3_to_teradata_transfer_operator_howto_guide_create_authorization]
+ create_aws_authorization = TeradataOperator(
+ task_id="create_aws_authorization",
+ conn_id=CONN_ID,
+ sql="CREATE AUTHORIZATION aws_authorization USER '{{
var.value.get('AWS_ACCESS_KEY_ID') }}' PASSWORD '{{
var.value.get('AWS_SECRET_ACCESS_KEY') }}' ",
+ )
+ # [END s3_to_teradata_transfer_operator_howto_guide_create_authorization]
+    # [START s3_to_teradata_transfer_operator_howto_guide_transfer_data_authorization_s3_to_teradata_csv]
+ transfer_auth_data_csv = S3ToTeradataOperator(
+ task_id="transfer_auth_data_s3_to_teradata_auth_csv",
+        s3_source_key="/s3/teradata-download.s3.us-east-1.amazonaws.com/DevTools/csv/",
+ teradata_table="example_s3_teradata_csv",
+ teradata_authorization_name="aws_authorization",
+ teradata_conn_id="teradata_default",
+ trigger_rule="all_done",
+ )
+    # [END s3_to_teradata_transfer_operator_howto_guide_transfer_data_authorization_s3_to_teradata_csv]
+ # [START s3_to_teradata_transfer_operator_howto_guide_read_data_table_csv]
+ read_auth_data_table_csv = TeradataOperator(
+ task_id="read_auth_data_table_csv",
+ conn_id=CONN_ID,
+ sql="SELECT * from example_s3_teradata_csv;",
+ )
+ # [END s3_to_teradata_transfer_operator_howto_guide_read_data_table_csv]
+ # [START s3_to_teradata_transfer_operator_howto_guide_drop_table_csv]
+ drop_auth_table_csv = TeradataOperator(
+ task_id="drop_auth_table_csv",
+ conn_id=CONN_ID,
+ sql="DROP TABLE example_s3_teradata_csv;",
+ )
+ # [END s3_to_teradata_transfer_operator_howto_guide_drop_table_csv]
+ # [START s3_to_teradata_transfer_operator_howto_guide_drop_authorization]
+ drop_auth = TeradataOperator(
+ task_id="drop_auth",
+ conn_id=CONN_ID,
+ sql="DROP AUTHORIZATION aws_authorization;",
+ )
+ # [END s3_to_teradata_transfer_operator_howto_guide_drop_authorization]
    # [START s3_to_teradata_transfer_operator_howto_guide_transfer_data_s3_to_teradata_json]
transfer_data_json = S3ToTeradataOperator(
task_id="transfer_data_s3_to_teradata_json",
@@ -116,12 +178,13 @@ with DAG(
sql="SELECT * from example_s3_teradata_parquet;",
)
    # [END s3_to_teradata_transfer_operator_howto_guide_read_data_table_parquet]
- # [START s3_to_teradata_transfer_operator_howto_guide_drop_table_parquet]
+ # [START s3_to_teradata_transfer_operator_howto_guide_drop_table]
drop_table_parquet = TeradataOperator(
task_id="drop_table_parquet",
sql="DROP TABLE example_s3_teradata_parquet;",
)
- # [END s3_to_teradata_transfer_operator_howto_guide_drop_table_parquet]
+
+ # [END s3_to_teradata_transfer_operator_howto_guide_drop_table]
(
transfer_data_csv
>> transfer_data_json
@@ -132,6 +195,14 @@ with DAG(
>> drop_table_csv
>> drop_table_json
>> drop_table_parquet
+ >> transfer_key_data_csv
+ >> read_key_data_table_csv
+ >> drop_key_table_csv
+ >> create_aws_authorization
+ >> transfer_auth_data_csv
+ >> read_auth_data_table_csv
+ >> drop_auth_table_csv
+ >> drop_auth
)
# [END s3_to_teradata_transfer_operator_howto_guide]