This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new d739a989cd2 Add TdLoadOperator to execute data transfers using
Teradata Parallel Transporter (TPT) tdload utility (#58918)
d739a989cd2 is described below
commit d739a989cd214d2ab0508b727745068b0a71ca55
Author: Satya Gopu <[email protected]>
AuthorDate: Thu Dec 11 03:09:36 2025 +0530
Add TdLoadOperator to execute data transfers using Teradata Parallel
Transporter (TPT) tdload utility (#58918)
* override sqlalchemy_url and get_uri for SQLAlchemy
* Implement TPT TdLoad Operator
* Add tdload_src_file.txt to rat-excludes
* Remove log separator lines for consistency with project logging standards
---------
Co-authored-by: Satish Chinthanippu <[email protected]>
---
.rat-excludes | 1 +
docs/spelling_wordlist.txt | 1 +
providers/teradata/docs/operators/tpt.rst | 133 ++-
.../src/airflow/providers/teradata/hooks/tpt.py | 245 ++++-
.../airflow/providers/teradata/operators/tpt.py | 436 +++++++++
.../airflow/providers/teradata/utils/tpt_util.py | 146 +++
.../tests/system/teradata/example_remote_tpt.py | 111 ++-
.../teradata/tests/system/teradata/example_tpt.py | 108 ++-
.../tests/system/teradata/tdload_src_file.txt | 1000 ++++++++++++++++++++
.../teradata/tests/unit/teradata/hooks/test_tpt.py | 284 ++++++
.../tests/unit/teradata/operators/test_tpt.py | 591 +++++++++++-
.../tests/unit/teradata/utils/test_tpt_util.py | 262 +++++
12 files changed, 3297 insertions(+), 21 deletions(-)
diff --git a/.rat-excludes b/.rat-excludes
index 47438f0fe90..ff0d114fe86 100644
--- a/.rat-excludes
+++ b/.rat-excludes
@@ -291,6 +291,7 @@ www-hash.txt
**/rtd-deprecation/404.html
**/.env
**/*.jsonl
+**/tdload_src_file.txt
# API files
**/_api/**
diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt
index de8c0cabf24..e28a58f4dd7 100644
--- a/docs/spelling_wordlist.txt
+++ b/docs/spelling_wordlist.txt
@@ -1825,6 +1825,7 @@ tblproperties
tbuild
TCP
tcp
+tdload
teardown
teardowns
templatable
diff --git a/providers/teradata/docs/operators/tpt.rst
b/providers/teradata/docs/operators/tpt.rst
index 85b2eaf74b8..97fef527de8 100644
--- a/providers/teradata/docs/operators/tpt.rst
+++ b/providers/teradata/docs/operators/tpt.rst
@@ -169,6 +169,135 @@ You can use the DdlOperator to alter a table in Teradata.
The following example
:start-after: [START ddl_operator_howto_guide_alter_table]
:end-before: [END ddl_operator_howto_guide_alter_table]
+
+The complete Teradata Operator DAG
+----------------------------------
+
+When we put everything together, our DAG should look like this:
+
+.. exampleinclude:: /../../teradata/tests/system/teradata/example_tpt.py
+ :language: python
+ :start-after: [START tdload_operator_howto_guide]
+ :end-before: [END tdload_operator_howto_guide]
+
+
+.. _howto/operator:TdLoadOperator:
+
+TdLoadOperator
+==============
+
+The ``TdLoadOperator`` is an Airflow operator that interfaces with Teradata PT
Easy Loader (tdload) to perform data operations on Teradata databases. This
operator leverages TPT (Teradata Parallel Transporter), eliminating the need to
write manual TPT scripts.
+
+**What is Teradata PT Easy Loader?**
+A command-line interface extension for TPT that automatically determines
appropriate load/unload operators based on user-provided parameters and the
requested operation type.
+
+.. note::
+
+ The ``TdLoadOperator`` requires the ``Teradata Parallel Transporter
(TPT)`` package from Teradata Tools and Utilities (TTU)
+ to be installed on the machine where the ``tdload`` command will run
(either local or remote).
+ Ensure that the ``tdload`` executable is available in the system's
``PATH``.
+ Refer to the official Teradata documentation for installation and
configuration details.
+
+**Key Capabilities:**
+
+- **Data Loading:** Import data from flat files into Teradata tables
+- **Data Exporting:** Extract data from Teradata tables to flat files
+- **Table-to-Table Transfers:** Move data between Teradata database tables
+- **Deployment Flexibility:** Execute on local or remote machines with TPT
installed
+- **Airflow Integration:** Seamlessly works with Airflow's scheduling,
monitoring, and logging
+
+The operator simplifies complex Teradata data operations while providing the
robustness and reliability of Airflow's workflow management.
+
+This operator enables the execution of tdload commands on either the local
host machine or a remote machine where TPT is installed.
+
+Ensure that the ``Teradata Parallel Transporter (TPT)`` package is installed
on the machine where TdLoadOperator will execute commands. This can be:
+
+- The **local machine** where Airflow runs the task, for local execution.
+- A **remote host** accessed via SSH, for remote execution.
+
+If executing remotely, ensure that an SSH server (e.g., ``sshd``) is running
and accessible on the remote machine, and that the ``tdload`` executable is
available in the system's ``PATH``.
+
+.. note::
+
+ For improved security, it is **highly recommended** to use
+ **private key-based SSH authentication** (SSH key pairs) instead of
username/password
+ for the SSH connection.
+
+ This avoids password exposure, enables seamless automated execution, and
enhances security.
+
+ See the Airflow SSH Connection documentation for details on configuring
SSH keys:
+
https://airflow.apache.org/docs/apache-airflow/stable/howto/connection/ssh.html
+
+
+To execute data loading, exporting, or transferring operations in a Teradata
database, use the
+:class:`~airflow.providers.teradata.operators.tpt.TdLoadOperator`.
+
+Prerequisite
+------------
+
+Make sure your Teradata Airflow connection is defined with the required fields:
+
+- ``host``
+- ``login``
+- ``password``
+
+You can define a remote host with a separate SSH connection using the
``ssh_conn_id``.
+
+Key Operation Examples with TdLoadOperator
+------------------------------------------
+
+Loading data into a Teradata database table from a file
+-------------------------------------------------------
+You can use the TdLoadOperator to load data from a file into a Teradata
database table. The following example demonstrates how to load data from a
delimited text file into a Teradata table:
+
+.. exampleinclude:: /../../teradata/tests/system/teradata/example_tpt.py
+ :language: python
+ :dedent: 4
+ :start-after: [START tdload_operator_howto_guide_load_from_file]
+ :end-before: [END tdload_operator_howto_guide_load_from_file]
+
+Exporting data from a Teradata table to a file
+----------------------------------------------
+You can export data from a Teradata table to a file using the TdLoadOperator.
The following example shows how to export data from a Teradata table to a
delimited file:
+
+.. exampleinclude:: /../../teradata/tests/system/teradata/example_tpt.py
+ :language: python
+ :dedent: 4
+ :start-after: [START tdload_operator_howto_guide_export_data]
+ :end-before: [END tdload_operator_howto_guide_export_data]
+
+Transferring data between Teradata tables
+-----------------------------------------
+The TdLoadOperator can also be used to transfer data between two Teradata
tables, potentially across different databases:
+
+.. exampleinclude:: /../../teradata/tests/system/teradata/example_tpt.py
+ :language: python
+ :dedent: 4
+ :start-after: [START tdload_operator_howto_guide_transfer_data]
+ :end-before: [END tdload_operator_howto_guide_transfer_data]
+
+Transferring data using a SELECT statement as source
+----------------------------------------------------
+You can use a SELECT statement as the data source for TdLoadOperator, allowing
for flexible data movement and transformation:
+
+.. exampleinclude:: /../../teradata/tests/system/teradata/example_tpt.py
+ :language: python
+ :dedent: 4
+ :start-after: [START tdload_operator_howto_guide_transfer_data_select_stmt]
+ :end-before: [END tdload_operator_howto_guide_transfer_data_select_stmt]
+
+Transferring data using an INSERT statement as target
+-----------------------------------------------------
+You can use an INSERT statement as the target for TdLoadOperator, enabling
custom insert logic:
+
+.. exampleinclude:: /../../teradata/tests/system/teradata/example_tpt.py
+ :language: python
+ :dedent: 4
+ :start-after: [START tdload_operator_howto_guide_transfer_data_insert_stmt]
+ :end-before: [END tdload_operator_howto_guide_transfer_data_insert_stmt]
+
+
+
The complete Teradata Operator DAG
----------------------------------
@@ -176,5 +305,5 @@ When we put everything together, our DAG should look like
this:
.. exampleinclude:: /../../teradata/tests/system/teradata/example_tpt.py
:language: python
- :start-after: [START ddl_operator_howto_guide]
- :end-before: [END ddl_operator_howto_guide]
+ :start-after: [START tdload_operator_howto_guide]
+ :end-before: [END tdload_operator_howto_guide]
diff --git a/providers/teradata/src/airflow/providers/teradata/hooks/tpt.py
b/providers/teradata/src/airflow/providers/teradata/hooks/tpt.py
index a6dbeab8603..4cb30c82a94 100644
--- a/providers/teradata/src/airflow/providers/teradata/hooks/tpt.py
+++ b/providers/teradata/src/airflow/providers/teradata/hooks/tpt.py
@@ -54,11 +54,12 @@ class TptHook(TtuHook):
Hook for executing Teradata Parallel Transporter (TPT) operations.
This hook provides methods to execute TPT operations both locally and
remotely via SSH.
- It supports DDL operations using tbuild utility. It extends the `TtuHook`
and integrates
- with Airflow's SSHHook for remote execution.
+    It supports DDL operations using the tbuild utility and data loading
operations using tdload.
+ It extends the `TtuHook` and integrates with Airflow's SSHHook for remote
execution.
The TPT operations are used to interact with Teradata databases for DDL
operations
- such as creating, altering, or dropping tables.
+    such as creating, altering, or dropping tables, as well as high-performance
data loading.
Features:
- Supports both local and remote execution of TPT operations.
@@ -154,9 +155,7 @@ class TptHook(TtuHook):
set_remote_file_permissions(ssh_client,
remote_script_file, logging.getLogger(__name__))
tbuild_cmd = ["tbuild", "-f", remote_script_file, job_name]
- self.log.info("=" * 80)
self.log.info("Executing tbuild command on remote server:
%s", " ".join(tbuild_cmd))
- self.log.info("=" * 80)
exit_status, output, error =
execute_remote_command(ssh_client, " ".join(tbuild_cmd))
self.log.info("tbuild command output:\n%s", output)
self.log.info("tbuild command exited with status %s",
exit_status)
@@ -212,9 +211,7 @@ class TptHook(TtuHook):
sp = None
try:
- self.log.info("=" * 80)
self.log.info("Executing tbuild command: %s", "
".join(tbuild_cmd))
- self.log.info("=" * 80)
sp = subprocess.Popen(
tbuild_cmd, stdout=subprocess.PIPE,
stderr=subprocess.STDOUT, start_new_session=True
)
@@ -240,6 +237,240 @@ class TptHook(TtuHook):
secure_delete(local_script_file, logging.getLogger(__name__))
terminate_subprocess(sp, logging.getLogger(__name__))
+ def execute_tdload(
+ self,
+ remote_working_dir: str,
+ job_var_content: str | None = None,
+ tdload_options: str | None = None,
+ tdload_job_name: str | None = None,
+ ) -> int:
+ """
+ Execute a tdload operation using the tdload command-line utility.
+
+ Args:
+ remote_working_dir: Remote working directory for SSH execution
+ job_var_content: Content of the job variable file
+ tdload_options: Additional command-line options for tdload
+ tdload_job_name: Name for the tdload job
+
+ Returns:
+ Exit code from the tdload operation
+
+ Raises:
+ RuntimeError: Non-zero tdload exit status or unexpected execution
failure
+ ConnectionError: SSH connection not established or fails
+ TimeoutError: SSH connection/network timeout
+ FileNotFoundError: tdload binary not found in PATH
+ """
+ tdload_job_name = tdload_job_name or f"tdload_job_{uuid.uuid4().hex}"
+ if self.ssh_hook:
+ self.log.info("Executing tdload via SSH on remote host with job
name: %s", tdload_job_name)
+ return self._execute_tdload_via_ssh(
+ remote_working_dir, job_var_content, tdload_options,
tdload_job_name
+ )
+ self.log.info("Executing tdload locally with job name: %s",
tdload_job_name)
+ return self._execute_tdload_locally(job_var_content, tdload_options,
tdload_job_name)
+
+ def _execute_tdload_via_ssh(
+ self,
+ remote_working_dir: str,
+ job_var_content: str | None,
+ tdload_options: str | None,
+ tdload_job_name: str | None,
+ ) -> int:
+ """
+ Write job_var_content to a temporary file, then transfer and execute
it on the remote host.
+
+ Args:
+ remote_working_dir: Remote working directory
+ job_var_content: Content for the job variable file
+ tdload_options: Additional tdload command options
+ tdload_job_name: Name for the tdload job
+
+ Returns:
+ Exit code from the tdload operation
+ """
+ with self.preferred_temp_directory() as tmp_dir:
+ local_job_var_file = os.path.join(tmp_dir,
f"tdload_job_var_{uuid.uuid4().hex}.txt")
+ write_file(local_job_var_file, job_var_content or "")
+ return self._transfer_to_and_execute_tdload_on_remote(
+ local_job_var_file, remote_working_dir, tdload_options,
tdload_job_name
+ )
+
+ def _transfer_to_and_execute_tdload_on_remote(
+ self,
+ local_job_var_file: str,
+ remote_working_dir: str,
+ tdload_options: str | None,
+ tdload_job_name: str | None,
+ ) -> int:
+ """Transfer job variable file to remote host and execute tdload
command."""
+ encrypted_file_path = f"{local_job_var_file}.enc"
+ remote_encrypted_job_file = os.path.join(remote_working_dir,
os.path.basename(encrypted_file_path))
+ remote_job_file = os.path.join(remote_working_dir,
os.path.basename(local_job_var_file))
+
+ try:
+ if not self.ssh_hook:
+ raise ConnectionError("SSH connection is not established.
`ssh_hook` is None or invalid.")
+ with self.ssh_hook.get_conn() as ssh_client:
+ verify_tpt_utility_on_remote_host(ssh_client, "tdload",
logging.getLogger(__name__))
+ password = generate_random_password()
+ generate_encrypted_file_with_openssl(local_job_var_file,
password, encrypted_file_path)
+ transfer_file_sftp(
+ ssh_client, encrypted_file_path,
remote_encrypted_job_file, logging.getLogger(__name__)
+ )
+ decrypt_remote_file(
+ ssh_client,
+ remote_encrypted_job_file,
+ remote_job_file,
+ password,
+ logging.getLogger(__name__),
+ )
+
+ set_remote_file_permissions(ssh_client, remote_job_file,
logging.getLogger(__name__))
+
+                # Build tdload command, safely parsing any extra options
+ tdload_cmd = self._build_tdload_command(remote_job_file,
tdload_options, tdload_job_name)
+
+ self.log.info("Executing tdload command on remote server: %s",
" ".join(tdload_cmd))
+ exit_status, output, error =
execute_remote_command(ssh_client, " ".join(tdload_cmd))
+ self.log.info("tdload command output:\n%s", output)
+ self.log.info("tdload command exited with status %s",
exit_status)
+
+ # Clean up remote files before checking exit status
+ remote_secure_delete(
+ ssh_client, [remote_encrypted_job_file, remote_job_file],
logging.getLogger(__name__)
+ )
+
+ if exit_status != 0:
+ raise RuntimeError(f"tdload command failed with exit code
{exit_status}: {error}")
+
+ return exit_status
+ except ConnectionError:
+ # Re-raise ConnectionError as-is (don't convert to TimeoutError)
+ raise
+ except (OSError, socket.gaierror) as e:
+ self.log.error("SSH connection timed out: %s", str(e))
+ raise TimeoutError(
+ "SSH connection timed out. Please check the network or server
availability."
+ ) from e
+ except SSHException as e:
+ raise ConnectionError(f"SSH error during connection: {str(e)}")
from e
+ except RuntimeError:
+ raise
+ except Exception as e:
+ raise RuntimeError(
+ f"Unexpected error while executing tdload script on remote
machine: {str(e)}"
+ ) from e
+ finally:
+ # Clean up local files
+ secure_delete(encrypted_file_path, logging.getLogger(__name__))
+ secure_delete(local_job_var_file, logging.getLogger(__name__))
+
+ def _execute_tdload_locally(
+ self,
+ job_var_content: str | None,
+ tdload_options: str | None,
+ tdload_job_name: str | None,
+ ) -> int:
+ """
+ Execute tdload command locally.
+
+ Args:
+ job_var_content: Content for the job variable file
+ tdload_options: Additional tdload command options
+ tdload_job_name: Name for the tdload job
+
+ Returns:
+ Exit code from the tdload operation
+ """
+ with self.preferred_temp_directory() as tmp_dir:
+ local_job_var_file = os.path.join(tmp_dir,
f"tdload_job_var_{uuid.uuid4().hex}.txt")
+ write_file(local_job_var_file, job_var_content or "")
+
+ # Set file permission to read-only for the current user (no
permissions for group/others)
+ set_local_file_permissions(local_job_var_file,
logging.getLogger(__name__))
+
+ # Log file permissions for debugging purposes
+ file_permissions = oct(os.stat(local_job_var_file).st_mode & 0o777)
+ self.log.debug("Local job variable file permissions: %s",
file_permissions)
+
+ # Build tdload command
+ tdload_cmd = self._build_tdload_command(local_job_var_file,
tdload_options, tdload_job_name)
+
+ if not shutil.which("tdload"):
+ raise FileNotFoundError("tdload binary not found in PATH.")
+
+ sp = None
+ try:
+            # Log the full tdload command before execution
+ self.log.info("Executing tdload command: %s", "
".join(tdload_cmd))
+ sp = subprocess.Popen(
+ tdload_cmd, stdout=subprocess.PIPE,
stderr=subprocess.STDOUT, start_new_session=True
+ )
+ error_lines = []
+ if sp.stdout is not None:
+ for line in iter(sp.stdout.readline, b""):
+ decoded_line = line.decode("UTF-8").strip()
+ self.log.info(decoded_line)
+ if "error" in decoded_line.lower():
+ error_lines.append(decoded_line)
+ sp.wait()
+ self.log.info("tdload command exited with return code %s",
sp.returncode)
+ if sp.returncode != 0:
+ error_msg = "\n".join(error_lines) if error_lines else ""
+ if error_msg:
+ raise RuntimeError(
+ f"tdload command failed with return code
{sp.returncode}:\n{error_msg}"
+ )
+ raise RuntimeError(f"tdload command failed with return
code {sp.returncode}")
+ return sp.returncode
+ except RuntimeError:
+ raise
+ except Exception as e:
+ self.log.error("Error executing tdload command: %s", str(e))
+ raise RuntimeError(f"Error executing tdload command:
{str(e)}") from e
+ finally:
+ secure_delete(local_job_var_file, logging.getLogger(__name__))
+ terminate_subprocess(sp, logging.getLogger(__name__))
+
+ def _build_tdload_command(
+ self, job_var_file: str, tdload_options: str | None, tdload_job_name:
str | None
+ ) -> list[str]:
+ """
+ Build the tdload command with proper option handling.
+
+ Args:
+ job_var_file: Path to the job variable file
+ tdload_options: Additional tdload options as a space-separated
string
+ tdload_job_name: Name for the tdload job
+
+ Returns:
+ List of command arguments for tdload
+ """
+ tdload_cmd = ["tdload", "-j", job_var_file]
+
+ # Add tdload_options if provided, with proper handling of quoted
options
+ if tdload_options:
+ # Split options while preserving quoted arguments
+ import shlex
+
+ try:
+ parsed_options = shlex.split(tdload_options)
+ tdload_cmd.extend(parsed_options)
+ except ValueError as e:
+ self.log.warning(
+ "Failed to parse tdload_options using shlex, falling back
to simple split: %s", str(e)
+ )
+ # Fallback to simple split if shlex parsing fails
+ tdload_cmd.extend(tdload_options.split())
+
+ # Add job name if provided (and not empty)
+ if tdload_job_name:
+ tdload_cmd.append(tdload_job_name)
+
+ return tdload_cmd
+
def on_kill(self) -> None:
"""
Handle cleanup when the task is killed.
diff --git a/providers/teradata/src/airflow/providers/teradata/operators/tpt.py
b/providers/teradata/src/airflow/providers/teradata/operators/tpt.py
index 5a592c17947..51f5f3d047b 100644
--- a/providers/teradata/src/airflow/providers/teradata/operators/tpt.py
+++ b/providers/teradata/src/airflow/providers/teradata/operators/tpt.py
@@ -21,7 +21,11 @@ from typing import TYPE_CHECKING
from airflow.providers.teradata.utils.tpt_util import (
get_remote_temp_directory,
+ is_valid_file,
+ is_valid_remote_job_var_file,
+ prepare_tdload_job_var_file,
prepare_tpt_ddl_script,
+ read_file,
)
if TYPE_CHECKING:
@@ -29,6 +33,7 @@ if TYPE_CHECKING:
from airflow.sdk.definitions.context import Context
except ImportError:
from airflow.utils.context import Context
+ from paramiko import SSHClient
from airflow.models import BaseOperator
from airflow.providers.ssh.hooks.ssh import SSHHook
@@ -205,3 +210,434 @@ class DdlOperator(BaseOperator):
self.log.error("Error cleaning up TPT DDL hook: %s", str(e))
else:
self.log.warning("No TptHook initialized to clean up on task kill")
+
+
+class TdLoadOperator(BaseOperator):
+ """
+ Operator to handle data transfers using Teradata Parallel Transporter
(TPT) tdload utility.
+
+ This operator supports three main scenarios:
+ 1. Load data from a file to a Teradata table
+ 2. Export data from a Teradata table to a file
+ 3. Transfer data between two Teradata tables (potentially across different
databases)
+
+ For all scenarios:
+ :param teradata_conn_id: Connection ID for Teradata database (source
for table operations)
+
+ For file to table loading:
+ :param source_file_name: Path to the source file (required for file to
table)
+ :param select_stmt: SQL SELECT statement to filter data (optional)
+ :param insert_stmt: SQL INSERT statement to use for loading data
(optional)
+ :param target_table: Name of the target table (required for file to
table)
+ :param target_teradata_conn_id: Connection ID for target Teradata
database (defaults to teradata_conn_id)
+
+ For table to file export:
+ :param source_table: Name of the source table (required for table to
file)
+ :param target_file_name: Path to the target file (required for table
to file)
+
+ For table to table transfer:
+ :param source_table: Name of the source table (required for table to
table)
+ :param select_stmt: SQL SELECT statement to filter data (optional)
+ :param insert_stmt: SQL INSERT statement to use for loading data
(optional)
+ :param target_table: Name of the target table (required for table to
table)
+ :param target_teradata_conn_id: Connection ID for target Teradata
database (required for table to table)
+
+ Optional configuration parameters:
+ :param source_format: Format of source data (default: 'Delimited')
+ :param target_format: Format of target data (default: 'Delimited')
+ :param source_text_delimiter: Source text delimiter (default: ',')
+ :param target_text_delimiter: Target text delimiter (default: ',')
+ :param tdload_options: Additional options for tdload (optional)
+ :param tdload_job_name: Name for the tdload job (optional)
+ :param tdload_job_var_file: Path to tdload job variable file (optional)
+ :param ssh_conn_id: SSH connection ID for secure file transfer
(optional, used for file operations)
+
+ :raises ValueError: If parameter combinations are invalid or required
files are missing.
+ :raises RuntimeError: If underlying TPT execution (tdload) fails with
non-zero exit status.
+ :raises ConnectionError: If remote SSH connection cannot be established.
+ :raises TimeoutError: If SSH connection attempt times out.
+ :raises FileNotFoundError: If required TPT utility (tdload) is missing
locally or on remote host.
+
+ Example usage::
+
+ # Example usage for file to table:
+ load_file = TdLoadOperator(
+ task_id="load_from_file",
+ source_file_name="/path/to/data.csv",
+ target_table="my_database.my_table",
+ target_teradata_conn_id="teradata_target_conn",
+ insert_stmt="INSERT INTO my_database.my_table (col1, col2) VALUES
(?, ?)",
+ )
+
+ # Example usage for table to file:
+ export_data = TdLoadOperator(
+ task_id="export_to_file",
+ source_table="my_database.my_table",
+ target_file_name="/path/to/export.csv",
+ teradata_conn_id="teradata_source_conn",
+ ssh_conn_id="ssh_default",
+ tdload_job_name="export_job",
+ )
+
+ # Example usage for table to table:
+ transfer_data = TdLoadOperator(
+ task_id="transfer_between_tables",
+ source_table="source_db.source_table",
+ target_table="target_db.target_table",
+ teradata_conn_id="teradata_source_conn",
+ target_teradata_conn_id="teradata_target_conn",
+ tdload_job_var_file="/path/to/vars.txt",
+ insert_stmt="INSERT INTO target_db.target_table (col1, col2)
VALUES (?, ?)",
+ )
+
+
+ """
+
+ template_fields = (
+ "source_table",
+ "target_table",
+ "select_stmt",
+ "insert_stmt",
+ "source_file_name",
+ "target_file_name",
+ "tdload_options",
+ )
+ ui_color = "#a8e4b1"
+
+ def __init__(
+ self,
+ *,
+ teradata_conn_id: str = TeradataHook.default_conn_name,
+ target_teradata_conn_id: str | None = None,
+ ssh_conn_id: str | None = None,
+ source_table: str | None = None,
+ select_stmt: str | None = None,
+ insert_stmt: str | None = None,
+ target_table: str | None = None,
+ source_file_name: str | None = None,
+ target_file_name: str | None = None,
+ source_format: str = "Delimited",
+ target_format: str = "Delimited",
+ source_text_delimiter: str = ",",
+ target_text_delimiter: str = ",",
+ tdload_options: str | None = None,
+ tdload_job_name: str | None = None,
+ tdload_job_var_file: str | None = None,
+ remote_working_dir: str | None = None,
+ **kwargs,
+ ) -> None:
+ super().__init__(**kwargs)
+ self.teradata_conn_id = teradata_conn_id
+ self.target_teradata_conn_id = target_teradata_conn_id
+ self.ssh_conn_id = ssh_conn_id
+ self.source_table = source_table
+ self.select_stmt = select_stmt
+ self.insert_stmt = insert_stmt
+ self.target_table = target_table
+ self.source_file_name = source_file_name
+ self.target_file_name = target_file_name
+ self.source_format = source_format
+ self.source_text_delimiter = source_text_delimiter
+ self.target_format = target_format
+ self.target_text_delimiter = target_text_delimiter
+ self.tdload_options = tdload_options
+ self.tdload_job_name = tdload_job_name
+ self.tdload_job_var_file = tdload_job_var_file
+ self.remote_working_dir = remote_working_dir
+ self._src_hook: TptHook | None = None
+ self._dest_hook: TptHook | None = None
+
+ def execute(self, context: Context) -> int | None:
+ """Execute the TdLoad operation based on the configured parameters."""
+ # Validate parameter combinations
+ mode = self._validate_and_determine_mode()
+
+ # Initialize hooks
+ self._initialize_hooks(mode)
+
+ try:
+ # Prepare job variable file content if not provided
+ tdload_job_var_content = None
+ tdload_job_var_file = self.tdload_job_var_file
+
+ if not tdload_job_var_file:
+ tdload_job_var_content = self._prepare_job_var_content(mode)
+ self.log.info("Prepared tdload job variable content for mode
'%s'", mode)
+
+ # Set remote working directory if SSH is used
+ if self._ssh_hook and not self.remote_working_dir:
+ self.remote_working_dir = get_remote_temp_directory(
+ self._ssh_hook.get_conn(), logging.getLogger(__name__)
+ )
+ # Ensure remote_working_dir is always a str
+ if not self.remote_working_dir:
+ self.remote_working_dir = "/tmp"
+
+ # Execute based on SSH availability and job var file source
+ return self._execute_based_on_configuration(tdload_job_var_file,
tdload_job_var_content, context)
+
+ except Exception as e:
+ self.log.error("Failed to execute TdLoad operation in mode '%s':
%s", mode, str(e))
+ raise
+
+ def _validate_and_determine_mode(self) -> str:
+ """
+ Validate parameters and determine the operation mode.
+
+ Returns:
+ A string indicating the operation mode: 'file_to_table',
'table_to_file',
+ 'table_to_table', or 'job_var_file'
+
+ Raises:
+ ValueError: If parameter combinations are invalid
+ """
+ if self.source_table and self.select_stmt:
+ raise ValueError(
+ "Both source_table and select_stmt cannot be provided
simultaneously. "
+ "Please provide only one."
+ )
+
+ if self.insert_stmt and not self.target_table:
+ raise ValueError(
+ "insert_stmt is provided but target_table is not specified. "
+ "Please provide a target_table for the insert operation."
+ )
+
+ # Determine the mode of operation based on provided parameters
+ if self.source_file_name and self.target_table:
+ mode = "file_to_table"
+ if self.target_teradata_conn_id is None:
+ self.target_teradata_conn_id = self.teradata_conn_id
+ self.log.info(
+ "Loading data from file '%s' to table '%s'",
self.source_file_name, self.target_table
+ )
+ elif (self.source_table or self.select_stmt) and self.target_file_name:
+ mode = "table_to_file"
+ self.log.info(
+ "Exporting data from %s to file '%s'",
+ self.source_table or "custom select statement",
+ self.target_file_name,
+ )
+ elif (self.source_table or self.select_stmt) and self.target_table:
+ mode = "table_to_table"
+ if self.target_teradata_conn_id is None:
+ raise ValueError("For table to table transfer,
target_teradata_conn_id must be provided.")
+ self.log.info(
+ "Transferring data from %s to table '%s'",
+ self.source_table or "custom select statement",
+ self.target_table,
+ )
+ else:
+ if not self.tdload_job_var_file:
+ raise ValueError(
+ "Invalid parameter combination for the TdLoadOperator.
Please provide one of these valid combinations:\n"
+ "1. source_file_name and target_table: to load data from a
file to a table\n"
+ "2. source_table/select_stmt and target_file_name: to
export data from a table to a file\n"
+ "3. source_table/select_stmt and target_table: to transfer
data between tables\n"
+ "4. tdload_job_var_file: to use a pre-configured job
variable file"
+ )
+ mode = "job_var_file"
+ self.log.info("Using pre-configured job variable file: %s",
self.tdload_job_var_file)
+
+ return mode
+
+ def _initialize_hooks(self, mode: str) -> None:
+ """
+ Initialize the required hooks based on the operation mode.
+
+ Args:
+ mode: The operation mode ('file_to_table', 'table_to_file',
'table_to_table', etc.)
+ """
+ self.log.info("Initializing source connection using teradata_conn_id:
%s", self.teradata_conn_id)
+ self._src_hook = TptHook(teradata_conn_id=self.teradata_conn_id,
ssh_conn_id=self.ssh_conn_id)
+
+ if mode in ("table_to_table", "file_to_table"):
+ self.log.info(
+ "Initializing destination connection using
target_teradata_conn_id: %s",
+ self.target_teradata_conn_id,
+ )
+ self._dest_hook =
TptHook(teradata_conn_id=self.target_teradata_conn_id)
+
+ self._ssh_hook = SSHHook(ssh_conn_id=self.ssh_conn_id) if
self.ssh_conn_id else None
+
+ def _prepare_job_var_content(self, mode: str) -> str:
+ """
+ Prepare the job variable file content.
+
+ Args:
+ mode: The operation mode
+
+ Returns:
+ The prepared job variable file content as a string
+ """
+ if not self._src_hook:
+ raise ValueError("Source hook not initialized")
+
+ return prepare_tdload_job_var_file(
+ mode=mode,
+ source_table=self.source_table,
+ select_stmt=self.select_stmt,
+ insert_stmt=self.insert_stmt,
+ target_table=self.target_table,
+ source_file_name=self.source_file_name,
+ target_file_name=self.target_file_name,
+ source_format=self.source_format,
+ target_format=self.target_format,
+ source_text_delimiter=self.source_text_delimiter,
+ target_text_delimiter=self.target_text_delimiter,
+ source_conn=self._src_hook.get_conn(),
+ target_conn=self._dest_hook.get_conn() if self._dest_hook else
None,
+ )
+
+ def _execute_based_on_configuration(
+ self, tdload_job_var_file: str | None, tdload_job_var_content: str |
None, context: Context
+ ) -> int | None:
+ """Execute TdLoad operation based on SSH and job var file
configuration."""
+ if self._ssh_hook:
+ if tdload_job_var_file:
+ with self._ssh_hook.get_conn() as ssh_client:
+ if is_valid_remote_job_var_file(
+ ssh_client, tdload_job_var_file,
logging.getLogger(__name__)
+ ):
+ return self._handle_remote_job_var_file(
+ ssh_client=ssh_client,
+ file_path=tdload_job_var_file,
+ context=context,
+ )
+ raise ValueError(
+ f"The provided remote job variables file path
'{tdload_job_var_file}' is invalid or does not exist on remote machine."
+ )
+ else:
+ if not self._src_hook:
+ raise ValueError("Source hook not initialized")
+ # Ensure remote_working_dir is always a str
+ remote_working_dir = self.remote_working_dir or "/tmp"
+ return self._src_hook.execute_tdload(
+ remote_working_dir,
+ tdload_job_var_content,
+ self.tdload_options,
+ self.tdload_job_name,
+ )
+ else:
+ if tdload_job_var_file:
+ if is_valid_file(tdload_job_var_file):
+ return self._handle_local_job_var_file(
+ file_path=tdload_job_var_file,
+ context=context,
+ )
+ raise ValueError(
+ f"The provided job variables file path
'{tdload_job_var_file}' is invalid or does not exist."
+ )
+ if not self._src_hook:
+ raise ValueError("Source hook not initialized")
+ # Ensure remote_working_dir is always a str
+ remote_working_dir = self.remote_working_dir or "/tmp"
+ return self._src_hook.execute_tdload(
+ remote_working_dir,
+ tdload_job_var_content,
+ self.tdload_options,
+ self.tdload_job_name,
+ )
+
+ def _handle_remote_job_var_file(
+ self,
+ ssh_client: SSHClient,
+ file_path: str | None,
+ context: Context,
+ ) -> int | None:
+ """Handle execution using a remote job variable file."""
+ if not file_path:
+ raise ValueError("Please provide a valid job variables file path
on the remote machine.")
+
+ try:
+ sftp = ssh_client.open_sftp()
+ try:
+ with sftp.open(file_path, "r") as remote_file:
+ tdload_job_var_content = remote_file.read().decode("UTF-8")
+ self.log.info("Successfully read remote job variable file:
%s", file_path)
+ finally:
+ sftp.close()
+
+ if self._src_hook:
+ # Ensure remote_working_dir is always a str
+ remote_working_dir = self.remote_working_dir or "/tmp"
+ return self._src_hook._execute_tdload_via_ssh(
+ remote_working_dir,
+ tdload_job_var_content,
+ self.tdload_options,
+ self.tdload_job_name,
+ )
+ raise ValueError("Source hook not initialized for remote
execution.")
+ except Exception as e:
+ self.log.error("Failed to handle remote job variable file '%s':
%s", file_path, str(e))
+ raise
+
+ def _handle_local_job_var_file(
+ self,
+ file_path: str | None,
+ context: Context,
+ ) -> int | None:
+ """
+ Handle execution using a local job variable file.
+
+ Args:
+ file_path: Path to the local job variable file
+ context: Airflow context
+
+ Returns:
+ Exit code from the TdLoad operation
+
+ Raises:
+ ValueError: If file path is invalid or hook not initialized
+ """
+ if not file_path:
+ raise ValueError("Please provide a valid local job variables file
path.")
+
+ if not is_valid_file(file_path):
+ raise ValueError(f"The job variables file path '{file_path}' is
invalid or does not exist.")
+
+ try:
+ tdload_job_var_content = read_file(file_path, encoding="UTF-8")
+ self.log.info("Successfully read local job variable file: %s",
file_path)
+
+ if self._src_hook:
+ return self._src_hook._execute_tdload_locally(
+ tdload_job_var_content,
+ self.tdload_options,
+ self.tdload_job_name,
+ )
+ raise ValueError("Source hook not initialized for local
execution.")
+
+ except Exception as e:
+ self.log.error("Failed to handle local job variable file '%s':
%s", file_path, str(e))
+ raise
+
+ def on_kill(self):
+ """Handle termination signals and ensure all hooks are properly
cleaned up."""
+ self.log.info("Cleaning up TPT tdload connections on task kill")
+
+ cleanup_errors = []
+
+ # Clean up the source hook if it was initialized
+ if self._src_hook:
+ try:
+ self.log.info("Cleaning up source connection")
+ self._src_hook.on_kill()
+ except Exception as e:
+ cleanup_errors.append(f"Failed to cleanup source hook:
{str(e)}")
+ self.log.error("Error cleaning up source connection: %s",
str(e))
+
+ # Clean up the destination hook if it was initialized
+ if self._dest_hook:
+ try:
+ self.log.info("Cleaning up destination connection")
+ self._dest_hook.on_kill()
+ except Exception as e:
+ cleanup_errors.append(f"Failed to cleanup destination hook:
{str(e)}")
+ self.log.error("Error cleaning up destination connection: %s",
str(e))
+
+ # Log any cleanup errors but don't raise them during shutdown
+ if cleanup_errors:
+ self.log.warning("Some cleanup operations failed: %s", ";
".join(cleanup_errors))
+ else:
+ self.log.info("All TPT connections cleaned up successfully")
diff --git
a/providers/teradata/src/airflow/providers/teradata/utils/tpt_util.py
b/providers/teradata/src/airflow/providers/teradata/utils/tpt_util.py
index 3cfb6671845..26678b15ca9 100644
--- a/providers/teradata/src/airflow/providers/teradata/utils/tpt_util.py
+++ b/providers/teradata/src/airflow/providers/teradata/utils/tpt_util.py
@@ -19,6 +19,7 @@ from __future__ import annotations
import logging
import os
import shutil
+import stat
import subprocess
import uuid
from typing import TYPE_CHECKING, Any
@@ -321,6 +322,10 @@ def get_remote_temp_directory(ssh_client: SSHClient,
logger: logging.Logger | No
return TPTConfig.TEMP_DIR_UNIX
+def is_valid_file(file_path: str) -> bool:
+ return os.path.isfile(file_path)
+
+
def verify_tpt_utility_installed(utility: str) -> None:
"""Verify if a TPT utility (e.g., tbuild) is installed and available in
the system's PATH."""
if shutil.which(utility) is None:
@@ -440,6 +445,147 @@ def prepare_tpt_ddl_script(
return tpt_script
+def prepare_tdload_job_var_file(
+ mode: str,
+ source_table: str | None,
+ select_stmt: str | None,
+ insert_stmt: str | None,
+ target_table: str | None,
+ source_file_name: str | None,
+ target_file_name: str | None,
+ source_format: str,
+ target_format: str,
+ source_text_delimiter: str,
+ target_text_delimiter: str,
+ source_conn: dict[str, Any],
+ target_conn: dict[str, Any] | None = None,
+) -> str:
+ """
+ Prepare a tdload job variable file based on the specified mode.
+
+ :param mode: The operation mode ('file_to_table', 'table_to_file', or
'table_to_table')
+ :param source_table: Name of the source table
+ :param select_stmt: SQL SELECT statement for data extraction
+ :param insert_stmt: SQL INSERT statement for data loading
+ :param target_table: Name of the target table
+ :param source_file_name: Path to the source file
+ :param target_file_name: Path to the target file
+ :param source_format: Format of source data
+ :param target_format: Format of target data
+ :param source_text_delimiter: Source text delimiter
+ :param target_text_delimiter: Target text delimiter
+ :return: The content of the job variable file
+ :raises ValueError: If invalid parameters are provided
+ """
+ # Create a dictionary to store job variables
+ job_vars = {}
+
+ # Add appropriate parameters based on the mode
+ if mode == "file_to_table":
+ job_vars.update(
+ {
+ "TargetTdpId": source_conn["host"],
+ "TargetUserName": source_conn["login"],
+ "TargetUserPassword": source_conn["password"],
+ "TargetTable": target_table,
+ "SourceFileName": source_file_name,
+ }
+ )
+ if insert_stmt:
+ job_vars["InsertStmt"] = insert_stmt
+
+ elif mode == "table_to_file":
+ job_vars.update(
+ {
+ "SourceTdpId": source_conn["host"],
+ "SourceUserName": source_conn["login"],
+ "SourceUserPassword": source_conn["password"],
+ "TargetFileName": target_file_name,
+ }
+ )
+
+ if source_table:
+ job_vars["SourceTable"] = source_table
+ elif select_stmt:
+ job_vars["SourceSelectStmt"] = select_stmt
+
+ elif mode == "table_to_table":
+ if target_conn is None:
+ raise ValueError("target_conn must be provided for
'table_to_table' mode")
+ job_vars.update(
+ {
+ "SourceTdpId": source_conn["host"],
+ "SourceUserName": source_conn["login"],
+ "SourceUserPassword": source_conn["password"],
+ "TargetTdpId": target_conn["host"],
+ "TargetUserName": target_conn["login"],
+ "TargetUserPassword": target_conn["password"],
+ "TargetTable": target_table,
+ }
+ )
+
+ if source_table:
+ job_vars["SourceTable"] = source_table
+ elif select_stmt:
+ job_vars["SourceSelectStmt"] = select_stmt
+ if insert_stmt:
+ job_vars["InsertStmt"] = insert_stmt
+
+ # Add common parameters if not empty
+ if source_format:
+ job_vars["SourceFormat"] = source_format
+ if target_format:
+ job_vars["TargetFormat"] = target_format
+ if source_text_delimiter:
+ job_vars["SourceTextDelimiter"] = source_text_delimiter
+ if target_text_delimiter:
+ job_vars["TargetTextDelimiter"] = target_text_delimiter
+
+ # Format job variables content
+ job_var_content = "".join([f"{key}='{value}',\n" for key, value in
job_vars.items()])
+ job_var_content = job_var_content.rstrip(",\n")
+
+ return job_var_content
+
+
+def is_valid_remote_job_var_file(
+ ssh_client: SSHClient, remote_job_var_file_path: str, logger:
logging.Logger | None = None
+) -> bool:
+ """Check if the given remote job variable file path is a valid file."""
+ if remote_job_var_file_path:
+ sftp_client = ssh_client.open_sftp()
+ try:
+ # Get file metadata
+ file_stat = sftp_client.stat(remote_job_var_file_path)
+ if file_stat.st_mode:
+ is_regular_file = stat.S_ISREG(file_stat.st_mode)
+ return is_regular_file
+ return False
+ except FileNotFoundError:
+ if logger:
+ logger.error("File does not exist on remote at : %s",
remote_job_var_file_path)
+ return False
+ finally:
+ sftp_client.close()
+ else:
+ return False
+
+
+def read_file(file_path: str, encoding: str = "UTF-8") -> str:
+ """
+ Read the content of a file with the specified encoding.
+
+ :param file_path: Path to the file to be read.
+ :param encoding: Encoding to use for reading the file.
+ :return: Content of the file as a string.
+ """
+ if not os.path.isfile(file_path):
+ raise FileNotFoundError(f"The file {file_path} does not exist.")
+
+ with open(file_path, encoding=encoding) as f:
+ return f.read()
+
+
def decrypt_remote_file(
ssh_client: SSHClient,
remote_enc_file: str,
diff --git a/providers/teradata/tests/system/teradata/example_remote_tpt.py
b/providers/teradata/tests/system/teradata/example_remote_tpt.py
index ed9b06c8019..e8c7b380a37 100644
--- a/providers/teradata/tests/system/teradata/example_remote_tpt.py
+++ b/providers/teradata/tests/system/teradata/example_remote_tpt.py
@@ -15,10 +15,11 @@
# specific language governing permissions and limitations
# under the License.
"""
-Example Airflow DAG to show usage of DdlOperator.
+Example Airflow DAG to show usage of DdlOperator and TdLoadOperator with
remote SSH execution.
-This DAG assumes an Airflow Connection with connection id `teradata_default`
already exists locally.
-It demonstrates how to use DdlOperator to create, drop, alter, and rename
Teradata tables and indexes.
+This DAG assumes Airflow Connections with connection ids `teradata_default`
and `ssh_default` already exist locally.
+It demonstrates how to use DdlOperator and TdLoadOperator to perform remote
Teradata operations via SSH,
+including creating, dropping, altering tables and transferring data.
"""
from __future__ import annotations
@@ -31,11 +32,11 @@ import pytest
from airflow import DAG
try:
- from airflow.providers.teradata.operators.tpt import DdlOperator
+ from airflow.providers.teradata.operators.tpt import DdlOperator,
TdLoadOperator
except ImportError:
pytest.skip("TERADATA provider not available", allow_module_level=True)
-# [START ddl_operator_howto_guide]
+# [START tdload_operator_howto_guide]
ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
@@ -45,10 +46,14 @@ SSH_CONN_ID = "ssh_default"
# Define file paths and table names for the test
SYSTEM_TESTS_DIR = os.path.abspath(os.path.dirname(__file__))
+SOURCE_FILE = os.path.join(SYSTEM_TESTS_DIR, "tdload_src_file.txt")
+TARGET_FILE = os.path.join(SYSTEM_TESTS_DIR, "tdload_target_file.txt")
params = {
"SOURCE_TABLE": "source_table",
"TARGET_TABLE": "target_table",
+ "SOURCE_FILE": SOURCE_FILE,
+ "TARGET_FILE": TARGET_FILE,
}
@@ -112,6 +117,93 @@ with DAG(
)
# [END ddl_operator_howto_guide_create_index]
+ # [START tdload_operator_howto_guide_load_from_file]
+ load_file = TdLoadOperator(
+ task_id="load_file",
+ source_file_name="{{ params.SOURCE_FILE }}",
+ target_table="{{ params.SOURCE_TABLE }}",
+ source_format="Delimited",
+ source_text_delimiter="|",
+ )
+ # [END tdload_operator_howto_guide_load_from_file]
+
+ # [START tdload_operator_howto_guide_export_data]
+ export_data = TdLoadOperator(
+ task_id="export_data",
+ source_table="{{ params.SOURCE_TABLE }}",
+ target_file_name="{{ params.TARGET_FILE }}",
+ target_format="Delimited",
+ target_text_delimiter=";",
+ )
+ # [END tdload_operator_howto_guide_export_data]
+
+ # [START tdload_operator_howto_guide_transfer_data]
+ transfer_data = TdLoadOperator(
+ task_id="transfer_data",
+ source_table="{{ params.SOURCE_TABLE }}",
+ target_table="{{ params.TARGET_TABLE }}",
+ target_teradata_conn_id=CONN_ID,
+ )
+ # [END tdload_operator_howto_guide_transfer_data]
+
+ create_select_dest_table = DdlOperator(
+ task_id="create_select_dest_table",
+ ddl=[
+ "DROP TABLE {{ params.SOURCE_TABLE }}_select_dest;",
+ "DROP TABLE {{ params.SOURCE_TABLE }}_select_log;",
+ "DROP TABLE {{ params.SOURCE_TABLE }}_select_err1;",
+ "DROP TABLE {{ params.SOURCE_TABLE }}_select_err2;",
+ "CREATE TABLE {{ params.SOURCE_TABLE }}_select_dest ( \
+ first_name VARCHAR(100), \
+ last_name VARCHAR(100), \
+ employee_id VARCHAR(10), \
+ department VARCHAR(50) \
+ );",
+ ],
+ error_list=[3706, 3803, 3807],
+ )
+
+ # [START tdload_operator_howto_guide_transfer_data_select_stmt]
+ # TdLoadOperator using select statement as source
+ transfer_data_select_stmt = TdLoadOperator(
+ task_id="transfer_data_select_stmt",
+ select_stmt="SELECT * FROM {{ params.SOURCE_TABLE }}",
+ target_table="{{ params.SOURCE_TABLE }}_select_dest",
+ tdload_options="--LogTable {{ params.SOURCE_TABLE }}_select_log
--ErrorTable1 {{ params.SOURCE_TABLE }}_select_err1 --ErrorTable2 {{
params.SOURCE_TABLE }}_select_err2",
+ target_teradata_conn_id=CONN_ID,
+ )
+ # [END tdload_operator_howto_guide_transfer_data_select_stmt]
+
+ # Create table for insert statement test
+ create_insert_dest_table = DdlOperator(
+ task_id="create_insert_dest_table",
+ ddl=[
+ "DROP TABLE {{ params.SOURCE_TABLE }}_insert_dest;",
+ "DROP TABLE {{ params.SOURCE_TABLE }}_insert_log;",
+ "DROP TABLE {{ params.SOURCE_TABLE }}_insert_err1;",
+ "DROP TABLE {{ params.SOURCE_TABLE }}_insert_err2;",
+ "CREATE TABLE {{ params.SOURCE_TABLE }}_insert_dest ( \
+ first_name VARCHAR(100), \
+ last_name VARCHAR(100), \
+ employee_id VARCHAR(10), \
+ department VARCHAR(50) \
+ );",
+ ],
+ error_list=[3706, 3803, 3807],
+ )
+
+ # [START tdload_operator_howto_guide_transfer_data_insert_stmt]
+ transfer_data_insert_stmt = TdLoadOperator(
+ task_id="transfer_data_insert_stmt",
+ source_table="{{ params.SOURCE_TABLE }}",
+ insert_stmt="INSERT INTO {{ params.SOURCE_TABLE }}_insert_dest VALUES
(?, ?, ?, ?)",
+ target_table="{{ params.SOURCE_TABLE }}_insert_dest",
+ tdload_options="--LogTable {{ params.SOURCE_TABLE }}_insert_log
--ErrorTable1 {{ params.SOURCE_TABLE }}_insert_err1 --ErrorTable2 {{
params.SOURCE_TABLE }}_insert_err2",
+ tdload_job_name="tdload_job_insert_stmt",
+ target_teradata_conn_id=CONN_ID,
+ )
+ # [END tdload_operator_howto_guide_transfer_data_insert_stmt]
+
# [START ddl_operator_howto_guide_rename_table]
rename_target_table = DdlOperator(
task_id="rename_target_table",
@@ -143,6 +235,13 @@ with DAG(
>> create_source_table
>> create_target_table
>> create_index_on_source
+ >> load_file
+ >> export_data
+ >> transfer_data
+ >> create_select_dest_table
+ >> transfer_data_select_stmt
+ >> create_insert_dest_table
+ >> transfer_data_insert_stmt
>> rename_target_table
>> drop_index_on_source
>> alter_source_table
@@ -158,4 +257,4 @@ from tests_common.test_utils.system_tests import
get_test_run # noqa: E402
# Needed to run the example DAG with pytest (see:
tests/system/README.md#run_via_pytest)
test_run = get_test_run(dag)
-# [END ddl_operator_howto_guide]
+# [END tdload_operator_howto_guide_remote]
diff --git a/providers/teradata/tests/system/teradata/example_tpt.py
b/providers/teradata/tests/system/teradata/example_tpt.py
index ad7a06ed238..e6b0ad4376a 100644
--- a/providers/teradata/tests/system/teradata/example_tpt.py
+++ b/providers/teradata/tests/system/teradata/example_tpt.py
@@ -15,10 +15,12 @@
# specific language governing permissions and limitations
# under the License.
"""
-Example Airflow DAG to show usage of DdlOperator.
+Example Airflow DAG to show usage of DdlOperator and TdLoadOperator.
This DAG assumes an Airflow Connection with connection id `teradata_default`
already exists locally.
It demonstrates how to use DdlOperator to create, drop, alter, and rename
Teradata tables and indexes.
+It also shows how to load data from a file to a Teradata table, export data
from a Teradata table to a file and
+transfer data between two Teradata tables (potentially across different
databases).
"""
from __future__ import annotations
@@ -31,11 +33,11 @@ import pytest
from airflow import DAG
try:
- from airflow.providers.teradata.operators.tpt import DdlOperator
+ from airflow.providers.teradata.operators.tpt import DdlOperator,
TdLoadOperator
except ImportError:
pytest.skip("TERADATA provider not available", allow_module_level=True)
-# [START ddl_operator_howto_guide]
+# [START tdload_operator_howto_guide]
ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
@@ -45,10 +47,14 @@ SSH_CONN_ID = "ssh_default"
# Define file paths and table names for the test
SYSTEM_TESTS_DIR = os.path.abspath(os.path.dirname(__file__))
+SOURCE_FILE = os.path.join(SYSTEM_TESTS_DIR, "tdload_src_file.txt")
+TARGET_FILE = os.path.join(SYSTEM_TESTS_DIR, "tdload_target_file.txt")
params = {
"SOURCE_TABLE": "source_table",
"TARGET_TABLE": "target_table",
+ "SOURCE_FILE": SOURCE_FILE,
+ "TARGET_FILE": TARGET_FILE,
}
@@ -112,6 +118,93 @@ with DAG(
)
# [END ddl_operator_howto_guide_create_index]
+ # [START tdload_operator_howto_guide_load_from_file]
+ load_file = TdLoadOperator(
+ task_id="load_file",
+ source_file_name="{{ params.SOURCE_FILE }}",
+ target_table="{{ params.SOURCE_TABLE }}",
+ source_format="Delimited",
+ source_text_delimiter="|",
+ )
+ # [END tdload_operator_howto_guide_load_from_file]
+
+ # [START tdload_operator_howto_guide_export_data]
+ export_data = TdLoadOperator(
+ task_id="export_data",
+ source_table="{{ params.SOURCE_TABLE }}",
+ target_file_name="{{ params.TARGET_FILE }}",
+ target_format="Delimited",
+ target_text_delimiter=";",
+ )
+ # [END tdload_operator_howto_guide_export_data]
+
+ # [START tdload_operator_howto_guide_transfer_data]
+ transfer_data = TdLoadOperator(
+ task_id="transfer_data",
+ source_table="{{ params.SOURCE_TABLE }}",
+ target_table="{{ params.TARGET_TABLE }}",
+ target_teradata_conn_id=CONN_ID,
+ )
+ # [END tdload_operator_howto_guide_transfer_data]
+
+ create_select_dest_table = DdlOperator(
+ task_id="create_select_dest_table",
+ ddl=[
+ "DROP TABLE {{ params.SOURCE_TABLE }}_select_dest;",
+ "DROP TABLE {{ params.SOURCE_TABLE }}_select_log;",
+ "DROP TABLE {{ params.SOURCE_TABLE }}_select_err1;",
+ "DROP TABLE {{ params.SOURCE_TABLE }}_select_err2;",
+ "CREATE TABLE {{ params.SOURCE_TABLE }}_select_dest ( \
+ first_name VARCHAR(100), \
+ last_name VARCHAR(100), \
+ employee_id VARCHAR(10), \
+ department VARCHAR(50) \
+ );",
+ ],
+ error_list=[3706, 3803, 3807],
+ )
+
+ # [START tdload_operator_howto_guide_transfer_data_select_stmt]
+ # TdLoadOperator using select statement as source
+ transfer_data_select_stmt = TdLoadOperator(
+ task_id="transfer_data_select_stmt",
+ select_stmt="SELECT * FROM {{ params.SOURCE_TABLE }}",
+ target_table="{{ params.SOURCE_TABLE }}_select_dest",
+ tdload_options="--LogTable {{ params.SOURCE_TABLE }}_select_log
--ErrorTable1 {{ params.SOURCE_TABLE }}_select_err1 --ErrorTable2 {{
params.SOURCE_TABLE }}_select_err2",
+ target_teradata_conn_id=CONN_ID,
+ )
+ # [END tdload_operator_howto_guide_transfer_data_select_stmt]
+
+ # Create table for insert statement test
+ create_insert_dest_table = DdlOperator(
+ task_id="create_insert_dest_table",
+ ddl=[
+ "DROP TABLE {{ params.SOURCE_TABLE }}_insert_dest;",
+ "DROP TABLE {{ params.SOURCE_TABLE }}_insert_log;",
+ "DROP TABLE {{ params.SOURCE_TABLE }}_insert_err1;",
+ "DROP TABLE {{ params.SOURCE_TABLE }}_insert_err2;",
+ "CREATE TABLE {{ params.SOURCE_TABLE }}_insert_dest ( \
+ first_name VARCHAR(100), \
+ last_name VARCHAR(100), \
+ employee_id VARCHAR(10), \
+ department VARCHAR(50) \
+ );",
+ ],
+ error_list=[3706, 3803, 3807],
+ )
+
+ # [START tdload_operator_howto_guide_transfer_data_insert_stmt]
+ transfer_data_insert_stmt = TdLoadOperator(
+ task_id="transfer_data_insert_stmt",
+ source_table="{{ params.SOURCE_TABLE }}",
+ insert_stmt="INSERT INTO {{ params.SOURCE_TABLE }}_insert_dest VALUES
(?, ?, ?, ?)",
+ target_table="{{ params.SOURCE_TABLE }}_insert_dest",
+ tdload_options="--LogTable {{ params.SOURCE_TABLE }}_insert_log
--ErrorTable1 {{ params.SOURCE_TABLE }}_insert_err1 --ErrorTable2 {{
params.SOURCE_TABLE }}_insert_err2",
+ tdload_job_name="tdload_job_insert_stmt",
+ target_teradata_conn_id=CONN_ID,
+ )
+ # [END tdload_operator_howto_guide_transfer_data_insert_stmt]
+
# [START ddl_operator_howto_guide_rename_table]
rename_target_table = DdlOperator(
task_id="rename_target_table",
@@ -143,6 +236,13 @@ with DAG(
>> create_source_table
>> create_target_table
>> create_index_on_source
+ >> load_file
+ >> export_data
+ >> transfer_data
+ >> create_select_dest_table
+ >> transfer_data_select_stmt
+ >> create_insert_dest_table
+ >> transfer_data_insert_stmt
>> rename_target_table
>> drop_index_on_source
>> alter_source_table
@@ -158,4 +258,4 @@ from tests_common.test_utils.system_tests import
get_test_run # noqa: E402
# Needed to run the example DAG with pytest (see:
tests/system/README.md#run_via_pytest)
test_run = get_test_run(dag)
-# [END ddl_operator_howto_guide]
+# [END tdload_operator_howto_guide]
diff --git a/providers/teradata/tests/system/teradata/tdload_src_file.txt
b/providers/teradata/tests/system/teradata/tdload_src_file.txt
new file mode 100644
index 00000000000..8534cc5113f
--- /dev/null
+++ b/providers/teradata/tests/system/teradata/tdload_src_file.txt
@@ -0,0 +1,1000 @@
+Chris|Johnson|E0870|Finance
+Michael|Smith|E0331|Sales
+David|Garcia|E0693|Engineering
+Sarah|Martinez|E0009|HR
+Emily|Jones|E0303|HR
+Katie|Miller|E0245|HR
+Sarah|Martinez|E0613|HR
+Emily|Johnson|E0273|Marketing
+Alex|Smith|E0779|Finance
+Katie|Martinez|E0150|Marketing
+David|Johnson|E0663|Marketing
+Michael|Rodriguez|E0646|Marketing
+Michael|Garcia|E0384|Marketing
+Sarah|Williams|E0399|HR
+Emily|Smith|E0457|HR
+Katie|Davis|E0736|Marketing
+David|Davis|E0536|Finance
+Sarah|Smith|E0470|HR
+Chris|Williams|E0974|Marketing
+Jane|Brown|E0691|Engineering
+Katie|Smith|E0319|Marketing
+John|Davis|E0340|Sales
+Laura|Rodriguez|E0759|HR
+Jane|Davis|E0225|Engineering
+Michael|Garcia|E0559|HR
+Alex|Johnson|E0611|Engineering
+David|Johnson|E0045|Marketing
+Sarah|Miller|E0922|Engineering
+Laura|Johnson|E0586|Sales
+Michael|Miller|E0304|Engineering
+Katie|Johnson|E0271|Sales
+David|Martinez|E0073|Marketing
+Jane|Miller|E0597|HR
+Emily|Davis|E0661|Marketing
+Jane|Miller|E0675|Marketing
+Emily|Jones|E0788|HR
+David|Brown|E0954|Marketing
+John|Davis|E0517|HR
+Katie|Martinez|E0633|Engineering
+Alex|Johnson|E0567|Engineering
+Emily|Miller|E0193|Sales
+Laura|Johnson|E0506|HR
+Michael|Martinez|E0628|Engineering
+Katie|Jones|E0790|Engineering
+Jane|Martinez|E0998|Sales
+Katie|Garcia|E0025|Marketing
+Sarah|Jones|E0113|Marketing
+Chris|Jones|E0590|Engineering
+Laura|Johnson|E0254|HR
+David|Rodriguez|E0708|Engineering
+Sarah|Johnson|E0918|Sales
+David|Jones|E0710|Marketing
+Chris|Smith|E0163|Engineering
+Jane|Williams|E0510|Sales
+Katie|Johnson|E0146|Finance
+Alex|Martinez|E0339|Finance
+Emily|Johnson|E0863|Marketing
+Sarah|Miller|E0760|Sales
+John|Miller|E0968|Sales
+Emily|Martinez|E0298|HR
+Emily|Brown|E0560|Finance
+Laura|Brown|E0022|HR
+John|Johnson|E0191|HR
+Michael|Williams|E0266|Finance
+Sarah|Smith|E0652|Engineering
+Jane|Davis|E0543|Engineering
+Sarah|Miller|E0647|HR
+Katie|Miller|E0634|Sales
+Katie|Rodriguez|E0111|Sales
+Jane|Smith|E0848|Sales
+Laura|Brown|E0054|Sales
+Jane|Williams|E0894|Marketing
+Chris|Brown|E0681|Finance
+Laura|Rodriguez|E0161|Sales
+David|Davis|E0175|Finance
+David|Smith|E0814|HR
+John|Davis|E0463|HR
+Katie|Williams|E0787|Finance
+Sarah|Brown|E0826|Finance
+Emily|Rodriguez|E0587|Sales
+Laura|Miller|E0098|Finance
+Alex|Garcia|E0864|HR
+Katie|Davis|E0619|Marketing
+David|Davis|E0128|Sales
+Alex|Miller|E0601|Engineering
+Alex|Williams|E0627|Finance
+Katie|Williams|E0541|Finance
+Emily|Martinez|E0959|Marketing
+Michael|Miller|E0018|Marketing
+John|Smith|E0938|HR
+Alex|Garcia|E0986|Sales
+Laura|Brown|E0448|Engineering
+Michael|Rodriguez|E0669|Sales
+Sarah|Martinez|E0744|Finance
+Alex|Jones|E0997|HR
+Chris|Davis|E0530|Marketing
+Katie|Rodriguez|E0494|Finance
+Chris|Davis|E0707|Finance
+Michael|Martinez|E0183|Finance
+Jane|Brown|E0068|Marketing
+Michael|Williams|E0507|HR
+Chris|Johnson|E0872|Marketing
+Michael|Rodriguez|E0917|Finance
+John|Davis|E0883|Sales
+Sarah|Davis|E0414|Marketing
+Laura|Garcia|E0780|Engineering
+Sarah|Garcia|E0426|Engineering
+Michael|Garcia|E0259|HR
+Katie|Garcia|E0238|Sales
+John|Williams|E0967|Sales
+Katie|Williams|E0286|Marketing
+Chris|Williams|E0906|Marketing
+Katie|Brown|E0464|Finance
+John|Brown|E0297|Finance
+John|Johnson|E0721|Engineering
+John|Davis|E0620|Finance
+Katie|Brown|E0152|Marketing
+Jane|Davis|E0103|Sales
+Emily|Rodriguez|E0147|Sales
+Chris|Brown|E0134|HR
+Emily|Martinez|E0724|HR
+Laura|Davis|E0990|Engineering
+John|Jones|E0217|Finance
+Chris|Williams|E0803|Sales
+Jane|Jones|E0524|HR
+Jane|Smith|E0700|Engineering
+John|Smith|E0847|HR
+David|Williams|E0834|Sales
+David|Williams|E0181|Engineering
+Emily|Martinez|E0472|Sales
+Chris|Rodriguez|E0910|Finance
+Emily|Davis|E0267|Sales
+Chris|Rodriguez|E0483|Engineering
+Alex|Smith|E0206|Engineering
+John|Brown|E0881|Marketing
+Laura|Johnson|E0342|Engineering
+Sarah|Martinez|E0960|Sales
+David|Williams|E0059|Marketing
+Jane|Davis|E0353|HR
+Jane|Williams|E0038|HR
+Katie|Rodriguez|E0290|Finance
+Katie|Johnson|E0086|Marketing
+Katie|Martinez|E0097|Engineering
+Laura|Smith|E0943|Marketing
+John|Williams|E0101|Sales
+Chris|Martinez|E0689|HR
+David|Brown|E0127|Engineering
+Sarah|Miller|E0595|Sales
+Sarah|Smith|E0403|Sales
+Emily|Davis|E0017|Marketing
+David|Smith|E0840|Marketing
+Katie|Smith|E0447|HR
+Jane|Jones|E0374|Marketing
+Emily|Garcia|E0801|Engineering
+Emily|Williams|E0434|Engineering
+Laura|Williams|E0515|Marketing
+Chris|Davis|E0067|Marketing
+Michael|Davis|E0481|Finance
+Chris|Williams|E0006|Engineering
+Michael|Davis|E0322|HR
+Laura|Rodriguez|E0243|Finance
+Katie|Davis|E0987|Sales
+Katie|Jones|E0565|Engineering
+Jane|Garcia|E0792|Marketing
+David|Martinez|E0609|HR
+Sarah|Miller|E0351|Finance
+Jane|Smith|E0776|Sales
+Laura|Jones|E0090|Marketing
+David|Martinez|E0941|Finance
+Chris|Davis|E0120|Sales
+Michael|Brown|E0712|Engineering
+John|Williams|E0396|Sales
+Laura|Rodriguez|E0010|Marketing
+Laura|Smith|E0930|HR
+Sarah|Jones|E0440|Sales
+Sarah|Jones|E0401|HR
+Katie|Johnson|E0762|HR
+Emily|Davis|E0578|Engineering
+David|Williams|E0907|Engineering
+Chris|Smith|E0756|HR
+Katie|Williams|E0223|Engineering
+Laura|Williams|E0241|HR
+Laura|Williams|E0729|Engineering
+Michael|Smith|E0556|HR
+Jane|Martinez|E0043|Finance
+John|Jones|E0839|Marketing
+Emily|Garcia|E0230|Finance
+Jane|Martinez|E0318|Sales
+Chris|Williams|E0645|Sales
+John|Garcia|E0673|HR
+Alex|Brown|E0189|HR
+John|Brown|E0368|Marketing
+Katie|Miller|E0924|HR
+John|Johnson|E0087|Finance
+Chris|Garcia|E0785|HR
+Emily|Smith|E0585|Sales
+Katie|Smith|E0041|Sales
+John|Williams|E0732|Finance
+John|Garcia|E0379|HR
+Sarah|Garcia|E0439|Engineering
+Laura|Brown|E0109|Sales
+Alex|Williams|E0030|Marketing
+Chris|Davis|E0532|Sales
+Sarah|Smith|E0224|HR
+Alex|Jones|E0625|Marketing
+Laura|Martinez|E0671|HR
+John|Jones|E0995|HR
+Emily|Jones|E0007|Marketing
+Emily|Garcia|E0336|Finance
+Michael|Martinez|E0705|Finance
+Laura|Smith|E0505|Engineering
+Alex|Smith|E0915|Finance
+Alex|Miller|E0577|Finance
+David|Miller|E0965|HR
+Katie|Miller|E0295|Engineering
+Jane|Smith|E0024|Sales
+Laura|Johnson|E0145|Sales
+John|Jones|E0654|Marketing
+Laura|Jones|E0388|Sales
+Laura|Martinez|E0215|Finance
+Sarah|Brown|E0173|Sales
+Laura|Martinez|E0757|Marketing
+Laura|Smith|E0845|Finance
+David|Williams|E0770|Marketing
+Emily|Garcia|E0978|Marketing
+Emily|Smith|E0265|Sales
+Emily|Johnson|E0570|Sales
+Sarah|Garcia|E0249|HR
+Chris|Davis|E0616|Marketing
+John|Martinez|E0698|Engineering
+Alex|Williams|E0956|Marketing
+Alex|Brown|E0666|Finance
+Alex|Martinez|E0308|Marketing
+Jane|Brown|E0095|HR
+Michael|Martinez|E0310|HR
+Laura|Garcia|E0125|HR
+Chris|Martinez|E0278|Sales
+Katie|Brown|E0853|HR
+David|Rodriguez|E0015|Finance
+Laura|Brown|E0171|Marketing
+Alex|Martinez|E0445|Sales
+Sarah|Garcia|E0360|Marketing
+Michael|Rodriguez|E0065|Marketing
+Emily|Brown|E0874|Sales
+Michael|Davis|E0985|Engineering
+Katie|Rodriguez|E0337|Finance
+Alex|Williams|E0122|Engineering
+Chris|Jones|E0258|Engineering
+Emily|Jones|E0235|HR
+John|Miller|E0932|Sales
+Jane|Brown|E0822|HR
+David|Williams|E0453|Finance
+Chris|Jones|E0078|Engineering
+Michael|Williams|E0851|Engineering
+David|Miller|E0154|Sales
+Chris|Rodriguez|E0387|Finance
+David|Rodriguez|E0905|HR
+Chris|Jones|E0056|Sales
+Sarah|Davis|E0977|Finance
+Chris|Smith|E0474|Marketing
+Katie|Garcia|E0232|HR
+Jane|Jones|E0638|Engineering
+Alex|Davis|E0686|HR
+Laura|Martinez|E0198|Finance
+John|Williams|E0593|Engineering
+Chris|Jones|E0697|Finance
+Michael|Williams|E0344|Finance
+Michael|Martinez|E0166|Engineering
+David|Brown|E0898|Finance
+Sarah|Williams|E0422|Sales
+Jane|Martinez|E0307|HR
+Sarah|Williams|E0118|Engineering
+David|Rodriguez|E0282|Finance
+Michael|Jones|E0513|Engineering
+Michael|Miller|E0617|HR
+Alex|Rodriguez|E0451|HR
+Emily|Davis|E0277|Finance
+David|Garcia|E0380|Finance
+John|Jones|E0818|Sales
+Laura|Brown|E0563|Finance
+Katie|Jones|E0085|HR
+Laura|Davis|E0667|Finance
+David|Miller|E0606|Finance
+Emily|Davis|E0970|Sales
+Sarah|Miller|E0868|Engineering
+Alex|Garcia|E0321|Finance
+Laura|Miller|E0032|Sales
+Michael|Martinez|E0202|Finance
+Alex|Miller|E0748|HR
+Katie|Johnson|E0498|Marketing
+Chris|Williams|E0300|HR
+Alex|Smith|E0548|Sales
+Michael|Smith|E0794|Finance
+Laura|Johnson|E0594|Marketing
+Katie|Miller|E0005|Finance
+Michael|Garcia|E0270|Sales
+John|Johnson|E0418|Engineering
+Michael|Johnson|E0077|Sales
+Michael|Williams|E0714|HR
+Michael|Martinez|E0726|Engineering
+John|Martinez|E0561|Marketing
+Michael|Rodriguez|E0575|Sales
+Jane|Johnson|E0468|Engineering
+Chris|Miller|E0836|Sales
+David|Davis|E0764|HR
+David|Johnson|E0643|Marketing
+Katie|Miller|E0159|Sales
+Alex|Rodriguez|E0138|Engineering
+David|Miller|E0186|Sales
+Laura|Williams|E0637|Marketing
+Laura|Martinez|E0197|Engineering
+Michael|Brown|E0528|Finance
+Chris|Rodriguez|E0859|Engineering
+John|Williams|E0250|Marketing
+Jane|Miller|E0772|Finance
+Emily|Williams|E0755|HR
+Sarah|Garcia|E0117|HR
+Katie|Rodriguez|E0783|Sales
+David|Williams|E0070|Marketing
+Michael|Brown|E0583|Finance
+Alex|Martinez|E0392|Engineering
+Michael|Martinez|E0888|Marketing
+Laura|Davis|E0167|HR
+Chris|Rodriguez|E0376|Engineering
+Laura|Smith|E0436|Sales
+Jane|Williams|E0106|Finance
+Laura|Brown|E0623|Marketing
+Michael|Rodriguez|E0641|Finance
+John|Davis|E0867|HR
+Chris|Miller|E0312|Sales
+John|Garcia|E0731|Marketing
+Alex|Jones|E0993|Marketing
+John|Smith|E0630|Sales
+David|Williams|E0190|Sales
+John|Smith|E0703|Sales
+Michael|Smith|E0459|Engineering
+Sarah|Williams|E0552|Engineering
+Michael|Johnson|E0503|Engineering
+Michael|Garcia|E0362|Engineering
+John|Johnson|E0890|Engineering
+Alex|Garcia|E0547|HR
+Katie|Davis|E0534|Marketing
+Michael|Martinez|E0913|HR
+Katie|Miller|E0808|Marketing
+Laura|Jones|E0356|Engineering
+John|Brown|E0581|Sales
+Katie|Jones|E0810|Sales
+Michael|Garcia|E0329|Marketing
+Alex|Miller|E0963|HR
+Jane|Jones|E0488|Sales
+Laura|Davis|E0784|Marketing
+Emily|Williams|E0293|Sales
+John|Miller|E0799|Finance
+David|Davis|E0860|Marketing
+Chris|Garcia|E0467|Marketing
+Alex|Brown|E0143|Engineering
+Emily|Garcia|E0213|Engineering
+Alex|Davis|E0843|Sales
+Jane|Smith|E0948|Finance
+Sarah|Smith|E0740|Finance
+Sarah|Smith|E0490|Sales
+Jane|Jones|E0540|Marketing
+David|Garcia|E0501|Marketing
+David|Johnson|E0994|Engineering
+Emily|Rodriguez|E0263|Marketing
+Katie|Miller|E0911|Marketing
+Alex|Martinez|E0408|Engineering
+Alex|Garcia|E0246|Finance
+Michael|Jones|E0385|Sales
+Emily|Miller|E0410|Engineering
+Laura|Brown|E0187|HR
+David|Martinez|E0961|Finance
+Alex|Davis|E0975|Sales
+Laura|Davis|E0291|Sales
+John|Miller|E0093|Engineering
+John|Garcia|E0887|Finance
+Jane|Smith|E0460|Finance
+Jane|Garcia|E0141|HR
+Sarah|Smith|E0695|Finance
+David|Johnson|E0211|Finance
+Sarah|Brown|E0013|Sales
+Sarah|Miller|E0443|Finance
+Laura|Rodriguez|E0130|Engineering
+Michael|Johnson|E0305|Marketing
+Laura|Johnson|E0928|Sales
+Alex|Martinez|E0520|Marketing
+Jane|Miller|E0615|Engineering
+Sarah|Smith|E0261|Finance
+David|Williams|E0063|Engineering
+Sarah|Davis|E0107|Marketing
+Jane|Rodriguez|E0844|Finance
+Chris|Miller|E0275|Sales
+Laura|Smith|E0983|Sales
+Katie|Brown|E0807|Marketing
+Chris|Brown|E0665|Sales
+Chris|Rodriguez|E0649|HR
+John|Williams|E0739|Finance
+Sarah|Brown|E0880|Marketing
+John|Davis|E0487|Finance
+Michael|Garcia|E0228|Engineering
+Alex|Martinez|E0539|Marketing
+Michael|Jones|E0048|Finance
+Chris|Smith|E0011|Sales
+Michael|Martinez|E0357|Finance
+John|Miller|E0124|Sales
+Alex|Jones|E0952|Engineering
+Katie|Miller|E0678|HR
+Laura|Jones|E0903|Engineering
+Chris|Smith|E0947|Finance
+John|Rodriguez|E0934|Engineering
+Sarah|Jones|E0444|Finance
+Michael|Rodriguez|E0061|Engineering
+Chris|Williams|E0824|Engineering
+Emily|Smith|E0075|HR
+Sarah|Rodriguez|E0800|Engineering
+Sarah|Davis|E0407|Marketing
+Alex|Martinez|E0981|Marketing
+Emily|Davis|E0480|Finance
+Jane|Miller|E0028|HR
+Katie|Williams|E0252|Finance
+Chris|Martinez|E0247|Marketing
+Emily|Garcia|E0234|Engineering
+Sarah|Martinez|E0658|HR
+Jane|Martinez|E0635|Engineering
+Laura|Brown|E0195|Sales
+Michael|Miller|E0281|HR
+Michael|Williams|E0083|Marketing
+Katie|Davis|E0940|HR
+Emily|Johnson|E0901|Finance
+Jane|Williams|E0424|Engineering
+David|Rodriguez|E0115|HR
+Laura|Garcia|E0400|Marketing
+Laura|Miller|E0815|Marketing
+David|Martinez|E0165|Finance
+Katie|Johnson|E0865|Engineering
+Chris|Williams|E0052|Engineering
+David|Rodriguez|E0201|HR
+Sarah|Jones|E0003|Finance
+John|Jones|E0227|HR
+Katie|Martinez|E0892|HR
+Jane|Jones|E0047|HR
+Laura|Martinez|E0034|Engineering
+Sarah|Garcia|E0269|Finance
+Katie|Rodriguez|E0682|Finance
+Laura|Rodriguez|E0920|Engineering
+Michael|Garcia|E0545|Marketing
+Michael|Garcia|E0081|Sales
+Emily|Miller|E0812|Engineering
+David|Smith|E0178|Sales
+Katie|Davis|E0573|HR
+Katie|Smith|E0677|HR
+Sarah|Jones|E0878|Engineering
+John|Rodriguez|E0415|Marketing
+David|Johnson|E0156|Engineering
+Katie|Brown|E0856|HR
+Sarah|Miller|E0465|HR
+Michael|Rodriguez|E0939|Marketing
+Alex|Miller|E0742|HR
+David|Brown|E0829|Marketing
+Katie|Davis|E0220|Engineering
+Michael|Garcia|E0602|Finance
+David|Williams|E0492|HR
+John|Williams|E0001|Engineering
+David|Williams|E0753|Finance
+David|Rodriguez|E0412|HR
+John|Smith|E0989|Marketing
+Chris|Rodriguez|E0478|Engineering
+Sarah|Martinez|E0774|Marketing
+Alex|Martinez|E0574|Engineering
+John|Johnson|E0348|Engineering
+Katie|Martinez|E0885|Marketing
+Laura|Rodriguez|E0456|HR
+John|Brown|E0394|HR
+Laura|Williams|E0132|Engineering
+Jane|Davis|E0722|Engineering
+John|Johnson|E0429|HR
+Chris|Martinez|E0020|Sales
+Laura|Davis|E0522|HR
+Michael|Brown|E0832|Marketing
+Katie|Brown|E0289|Engineering
+Alex|Jones|E0751|Marketing
+David|Williams|E0909|Sales
+Laura|Garcia|E0650|Engineering
+David|Brown|E0314|Engineering
+Alex|Garcia|E0326|Marketing
+Chris|Brown|E0554|Sales
+Emily|Garcia|E0796|Sales
+Chris|Martinez|E0364|HR
+Michael|Williams|E0177|Engineering
+David|Garcia|E0805|Marketing
+David|Davis|E0333|Marketing
+Michael|Jones|E0209|Finance
+Laura|Jones|E0485|HR
+Alex|Miller|E0598|Sales
+Alex|Davis|E0372|HR
+Jane|Williams|E0432|Marketing
+Laura|Smith|E0355|Sales
+Katie|Rodriguez|E0089|Engineering
+Jane|Rodriguez|E0383|Sales
+Emily|Smith|E0718|Engineering
+David|Jones|E0945|Finance
+Alex|Rodriguez|E0518|HR
+Emily|Davis|E0973|Sales
+Laura|Davis|E0170|Sales
+John|Jones|E0857|Finance
+Sarah|Jones|E0768|HR
+Chris|Williams|E0405|Marketing
+Alex|Johnson|E0568|HR
+Sarah|Martinez|E0477|Marketing
+David|Jones|E0352|Engineering
+Sarah|Davis|E0347|Finance
+Laura|Garcia|E0334|Finance
+Sarah|Williams|E0942|Marketing
+Emily|Jones|E0381|Finance
+Laura|Johnson|E0953|Finance
+Jane|Garcia|E0579|Finance
+Sarah|Johnson|E0402|Finance
+Alex|Miller|E0925|Marketing
+John|Davis|E0971|Finance
+Sarah|Garcia|E0850|Engineering
+Katie|Garcia|E0242|Marketing
+Chris|Jones|E0648|HR
+David|Garcia|E0253|Engineering
+John|Garcia|E0301|HR
+Michael|Johnson|E0327|HR
+John|Johnson|E0738|HR
+Sarah|Smith|E0694|HR
+Sarah|Smith|E0786|Finance
+David|Garcia|E0797|Sales
+Emily|Rodriguez|E0369|Engineering
+Chris|Davis|E0538|Finance
+Chris|Williams|E0951|Sales
+Michael|Miller|E0614|Engineering
+Michael|Brown|E0717|Marketing
+Katie|Jones|E0274|Sales
+Sarah|Davis|E0626|Sales
+Chris|Jones|E0450|Engineering
+Sarah|Miller|E0588|Marketing
+John|Garcia|E0664|Finance
+John|Miller|E0996|Marketing
+Michael|Martinez|E0767|Sales
+Jane|Miller|E0706|Sales
+David|Miller|E0222|HR
+John|Rodriguez|E0042|Engineering
+Chris|Smith|E0916|Engineering
+Chris|Williams|E0320|Marketing
+Katie|Rodriguez|E0251|Marketing
+David|Smith|E0053|Sales
+Katie|Jones|E0893|Engineering
+Katie|Johnson|E0672|Sales
+Laura|Martinez|E0966|Marketing
+Alex|Brown|E0655|Sales
+Katie|Miller|E0071|Finance
+Sarah|Johnson|E0683|HR
+Laura|Martinez|E0296|Sales
+Emily|Williams|E0508|Finance
+Laura|Martinez|E0813|Sales
+Chris|Williams|E0074|Engineering
+Emily|Rodriguez|E0216|Finance
+Emily|Johnson|E0846|HR
+Katie|Miller|E0631|HR
+Sarah|Jones|E0743|Marketing
+David|Jones|E0148|HR
+Michael|Williams|E0603|Engineering
+Emily|Smith|E0493|Marketing
+Sarah|Miller|E0051|Marketing
+Katie|Jones|E0389|Finance
+Sarah|Martinez|E0891|Sales
+John|Rodriguez|E0218|Marketing
+John|Davis|E0194|HR
+David|Rodriguez|E0737|HR
+Chris|Johnson|E0537|Finance
+Alex|Rodriguez|E0979|Engineering
+Chris|Johnson|E0413|Marketing
+Sarah|Miller|E0096|HR
+Jane|Johnson|E0268|Marketing
+Sarah|Johnson|E0811|Marketing
+Chris|Garcia|E0114|Engineering
+Laura|Williams|E0126|Finance
+Chris|Johnson|E0684|Engineering
+Emily|Martinez|E0699|Sales
+Jane|Brown|E0016|HR
+David|Martinez|E0861|Finance
+John|Rodriguez|E0936|Marketing
+Michael|Smith|E0164|Marketing
+Sarah|Rodriguez|E0446|Marketing
+Sarah|Brown|E0875|Sales
+Michael|Brown|E0309|Sales
+Chris|Miller|E0133|Engineering
+David|Brown|E0723|Finance
+David|Davis|E0066|Engineering
+Katie|Garcia|E0741|Finance
+Alex|Miller|E0279|Marketing
+John|Miller|E0640|Marketing
+Chris|Davis|E0523|HR
+Michael|Jones|E0491|HR
+Laura|Johnson|E0833|Marketing
+Michael|Williams|E0730|Finance
+Laura|Garcia|E0828|Sales
+Chris|Davis|E0604|Marketing
+Laura|Jones|E0172|HR
+Laura|Williams|E0544|Marketing
+Jane|Rodriguez|E0155|Engineering
+Emily|Davis|E0236|Finance
+Michael|Rodriguez|E0411|Sales
+Alex|Johnson|E0855|Engineering
+David|Garcia|E0988|Sales
+Chris|Johnson|E0461|Finance
+Jane|Martinez|E0580|HR
+Michael|Miller|E0475|HR
+David|Rodriguez|E0131|Marketing
+Emily|Williams|E0345|Finance
+Michael|Brown|E0079|Engineering
+Sarah|Smith|E0373|HR
+Laura|Johnson|E0433|Sales
+John|Williams|E0521|Finance
+Alex|Smith|E0831|Marketing
+Michael|Garcia|E0428|Sales
+Emily|Garcia|E0852|Sales
+Laura|Miller|E0288|Finance
+Sarah|Johnson|E0908|Sales
+Katie|Johnson|E0036|Engineering
+John|Rodriguez|E0500|Engineering
+Chris|Garcia|E0455|Marketing
+John|Jones|E0184|HR
+Laura|Martinez|E0639|Engineering
+Laura|Jones|E0199|Sales
+Chris|Jones|E0884|HR
+Laura|Davis|E0899|Engineering
+Sarah|Miller|E0325|Engineering
+Alex|Martinez|E0119|Sales
+Emily|Brown|E0208|Engineering
+John|Brown|E0371|HR
+Chris|Brown|E0795|Sales
+Laura|Miller|E0431|Engineering
+Michael|Garcia|E0140|Sales
+John|Brown|E0819|HR
+Emily|Jones|E0210|Finance
+Michael|Smith|E0452|Marketing
+Sarah|Martinez|E0937|Sales
+Jane|Davis|E0169|Marketing
+Katie|Brown|E0827|Sales
+Emily|Brown|E0088|Sales
+Michael|Martinez|E0104|Engineering
+Sarah|Smith|E0715|Marketing
+Alex|Brown|E0869|Sales
+Katie|Rodriguez|E0260|HR
+David|Garcia|E0804|Sales
+Katie|Miller|E0765|Marketing
+John|Jones|E0749|Sales
+Sarah|Rodriguez|E0484|Marketing
+Laura|Smith|E0499|Sales
+Sarah|Johnson|E0549|HR
+Emily|Jones|E0592|HR
+Alex|Davis|E0237|HR
+Alex|Rodriguez|E0576|HR
+Michael|Rodriguez|E0923|HR
+Katie|Martinez|E0008|Engineering
+Katie|Brown|E0354|Finance
+Laura|Davis|E0419|Sales
+Michael|Rodriguez|E0512|Engineering
+Chris|Rodriguez|E0820|Finance
+Katie|Johnson|E0427|Sales
+Sarah|Davis|E0944|Sales
+John|Smith|E0778|Engineering
+Emily|Jones|E0287|Marketing
+Jane|Brown|E0469|Marketing
+Katie|Rodriguez|E0316|Engineering
+Emily|Williams|E0404|Engineering
+Jane|Johnson|E0060|Engineering
+Emily|Smith|E0562|Sales
+David|Johnson|E0398|HR
+Laura|Jones|E0980|Sales
+David|Miller|E0139|Engineering
+Chris|Martinez|E0366|Marketing
+Chris|Davis|E0529|HR
+Laura|Williams|E0244|Marketing
+Emily|Garcia|E0037|Finance
+Sarah|Jones|E0921|HR
+Katie|Smith|E0758|Engineering
+Katie|Miller|E0735|Engineering
+John|Davis|E0207|Finance
+Sarah|Johnson|E0558|HR
+Chris|Rodriguez|E0535|Finance
+David|Martinez|E0420|Sales
+David|Jones|E0889|Finance
+David|Rodriguez|E0280|HR
+Laura|Martinez|E0900|HR
+Laura|Garcia|E0221|Engineering
+Emily|Jones|E0642|HR
+Michael|Brown|E0023|HR
+Katie|Williams|E0653|HR
+John|Martinez|E0044|Engineering
+Katie|Miller|E0809|Finance
+Katie|Miller|E0200|Engineering
+Sarah|Rodriguez|E0782|Finance
+Katie|Garcia|E0489|Marketing
+Katie|Brown|E0674|Engineering
+Jane|Davis|E0080|Engineering
+Michael|Smith|E0582|Finance
+John|Rodriguez|E0777|HR
+John|Miller|E0359|Engineering
+Emily|Miller|E0338|Marketing
+David|Miller|E0386|HR
+Chris|Williams|E0622|Marketing
+David|Johnson|E0397|Sales
+Laura|Miller|E0021|Engineering
+Laura|Davis|E0949|Finance
+Jane|Martinez|E0992|Engineering
+Michael|Smith|E0651|Sales
+David|Brown|E0976|Engineering
+Sarah|Miller|E0702|Marketing
+Michael|Johnson|E0409|Sales
+Emily|Miller|E0317|Marketing
+John|Williams|E0502|HR
+Alex|Miller|E0912|Engineering
+John|Garcia|E0696|Marketing
+Sarah|Smith|E0557|Marketing
+Sarah|Rodriguez|E0367|Engineering
+David|Rodriguez|E0873|Marketing
+Emily|Johnson|E0306|Marketing
+Katie|Martinez|E0962|Marketing
+Michael|Brown|E0292|Engineering
+Alex|Davis|E0276|Finance
+John|Jones|E0390|Sales
+John|Williams|E0142|Engineering
+Katie|Garcia|E0929|Engineering
+Sarah|Brown|E0212|Engineering
+Michael|Garcia|E0153|Engineering
+Alex|Davis|E0618|HR
+Laura|Brown|E0842|Engineering
+Alex|Miller|E0262|HR
+Alex|Miller|E0958|Engineering
+Sarah|Jones|E0750|HR
+Sarah|Garcia|E0935|Marketing
+Laura|Smith|E0825|Sales
+Michael|Rodriguez|E0668|Marketing
+John|Johnson|E0550|HR
+Laura|Miller|E0473|Finance
+John|Jones|E0871|Marketing
+David|Rodriguez|E0256|Engineering
+Katie|Smith|E0174|Marketing
+Chris|Miller|E0229|HR
+Alex|Martinez|E0343|HR
+Emily|Miller|E0049|Engineering
+Emily|Miller|E0092|Marketing
+Sarah|Williams|E0076|Finance
+Emily|Williams|E0679|HR
+Katie|Smith|E0012|Finance
+Laura|Miller|E0442|Marketing
+Emily|Davis|E0151|Finance
+Jane|Garcia|E0062|Engineering
+Sarah|Smith|E0425|Marketing
+David|Johnson|E0636|Engineering
+John|Martinez|E0196|Marketing
+David|Jones|E0285|Engineering
+Emily|Martinez|E0471|Finance
+Katie|Martinez|E0854|Sales
+John|Smith|E0982|Marketing
+John|Martinez|E0896|Sales
+Sarah|Garcia|E0323|Marketing
+John|Davis|E0029|Marketing
+John|Smith|E0341|Engineering
+Emily|Williams|E0116|HR
+Katie|Martinez|E0659|HR
+John|Martinez|E0793|HR
+Michael|Brown|E0330|Marketing
+David|Garcia|E0816|HR
+Sarah|Davis|E0058|Marketing
+Alex|Miller|E0035|Engineering
+Alex|Smith|E0713|Marketing
+Laura|Johnson|E0205|Finance
+Laura|Williams|E0866|Marketing
+Alex|Garcia|E0902|HR
+Alex|Brown|E0688|Marketing
+John|Rodriguez|E0763|Marketing
+Emily|Smith|E0957|Engineering
+Sarah|Davis|E0168|Engineering
+Alex|Garcia|E0454|Marketing
+Sarah|Davis|E0746|Marketing
+Katie|Jones|E0496|Engineering
+Katie|Davis|E0546|Engineering
+Alex|Brown|E0791|HR
+Katie|Rodriguez|E0416|Sales
+Jane|Williams|E0690|Engineering
+Chris|Johnson|E0324|Sales
+Jane|Brown|E0591|Finance
+Michael|Garcia|E0608|Sales
+Emily|Davis|E0179|Engineering
+David|Rodriguez|E0257|HR
+Michael|Johnson|E0711|Finance
+Alex|Johnson|E0466|Engineering
+Emily|Jones|E0610|Marketing
+Michael|Johnson|E0082|HR
+Michael|Miller|E0511|Marketing
+Emily|Johnson|E0879|HR
+Michael|Davis|E0950|Marketing
+Alex|Jones|E0761|HR
+Alex|Williams|E0136|Marketing
+Michael|Brown|E0514|Engineering
+Jane|Williams|E0660|Engineering
+Alex|Johnson|E0775|HR
+David|Martinez|E0526|HR
+Michael|Miller|E0564|Sales
+Michael|Smith|E0395|Marketing
+Katie|Rodriguez|E0733|Engineering
+Jane|Brown|E0728|Engineering
+David|Williams|E0533|Marketing
+David|Jones|E0002|HR
+Jane|Brown|E0838|Sales
+Michael|Johnson|E0315|Sales
+Laura|Miller|E0886|HR
+Katie|Davis|E0057|Engineering
+Sarah|Rodriguez|E0897|HR
+Jane|Williams|E0479|HR
+Katie|Johnson|E0572|Sales
+Emily|Martinez|E0555|Finance
+Katie|Jones|E0188|Marketing
+John|Smith|E0365|Sales
+Jane|Miller|E0687|HR
+Emily|Miller|E0349|Finance
+Laura|Martinez|E0817|Finance
+Laura|Williams|E0806|Finance
+Katie|Rodriguez|E0531|Finance
+Katie|Johnson|E0644|Marketing
+Laura|Jones|E0752|Engineering
+Jane|Martinez|E0378|HR
+Chris|Garcia|E0438|HR
+Alex|Johnson|E0747|Finance
+David|Smith|E0486|Engineering
+Alex|Martinez|E0108|Sales
+John|Garcia|E0734|Finance
+Katie|Rodriguez|E0050|Finance
+Sarah|Miller|E0607|Engineering
+Chris|Garcia|E0497|Finance
+Sarah|Williams|E0110|Sales
+Katie|Martinez|E0781|Finance
+Michael|Smith|E0680|Sales
+Emily|Rodriguez|E0417|Finance
+Jane|Miller|E0160|HR
+Sarah|Rodriguez|E0946|Sales
+Katie|Davis|E0584|Finance
+Jane|Rodriguez|E0599|Marketing
+Chris|Smith|E0621|HR
+Chris|Smith|E0406|Engineering
+Sarah|Brown|E0358|Sales
+Michael|Martinez|E0335|Engineering
+John|Davis|E0719|Sales
+Katie|Davis|E0991|Marketing
+Alex|Jones|E0624|HR
+David|Jones|E0519|Sales
+Jane|Martinez|E0137|HR
+John|Davis|E0701|Sales
+Chris|Martinez|E0727|Finance
+Michael|Jones|E0600|Engineering
+David|Williams|E0527|HR
+Laura|Garcia|E0769|Engineering
+John|Miller|E0837|Engineering
+John|Johnson|E0704|HR
+Laura|Garcia|E0569|Marketing
+Michael|Rodriguez|E0504|Marketing
+Chris|Davis|E0914|Engineering
+Katie|Garcia|E0332|Finance
+Alex|Williams|E0926|Finance
+Laura|Johnson|E0248|HR
+Alex|Smith|E0964|Marketing
+Katie|Jones|E0123|Sales
+Jane|Brown|E0294|Sales
+Chris|Jones|E0933|Finance
+Sarah|Smith|E0823|Sales
+Sarah|Brown|E0841|HR
+Katie|Rodriguez|E0720|Engineering
+David|Brown|E0382|Engineering
+Sarah|Garcia|E0144|HR
+Michael|Jones|E0214|Sales
+John|Smith|E0377|Marketing
+David|Smith|E0830|Engineering
+David|Jones|E0437|Sales
+Chris|Brown|E0226|HR
+Sarah|Williams|E0046|HR
+Emily|Jones|E0972|Marketing
+Chris|Martinez|E0955|Engineering
+Katie|Johnson|E0264|HR
+Katie|Garcia|E0180|Engineering
+Laura|Garcia|E0692|Finance
+John|Miller|E0233|Engineering
+Chris|Brown|E0091|HR
+John|Miller|E0676|Engineering
+David|Brown|E0302|Finance
+Chris|Williams|E0121|Sales
+Jane|Williams|E0094|Engineering
+Sarah|Jones|E0612|Finance
+Katie|Smith|E0931|Sales
+Emily|Miller|E0423|Finance
+John|Brown|E0441|Finance
+Jane|Williams|E0272|HR
+Emily|Rodriguez|E0821|Marketing
+Alex|Garcia|E0255|Engineering
+John|Miller|E0283|Marketing
+Laura|Smith|E0789|Engineering
+Michael|Rodriguez|E0370|Marketing
+John|Smith|E0430|Engineering
+David|Davis|E0100|HR
+Michael|Smith|E0014|Engineering
+Emily|Rodriguez|E0662|Marketing
+Michael|Martinez|E0589|Finance
+Emily|Davis|E0026|HR
+Alex|Davis|E0656|Finance
+John|Johnson|E0064|Sales
+Alex|Williams|E0629|Sales
+Chris|Williams|E0231|Finance
+Laura|Miller|E0033|Marketing
+David|Davis|E0984|Sales
+Sarah|Brown|E0999|Marketing
+Laura|Rodriguez|E0203|Sales
+John|Rodriguez|E0709|HR
+Katie|Johnson|E0350|Finance
+Chris|Miller|E0509|Sales
+Katie|Rodriguez|E0421|Sales
+David|Rodriguez|E0919|HR
+Katie|Brown|E0072|HR
+Sarah|Davis|E0055|Marketing
+Sarah|Smith|E0895|Marketing
+Katie|Garcia|E0927|Sales
+Jane|Garcia|E1000|Marketing
+Michael|Johnson|E0969|Sales
+Chris|Jones|E0284|HR
+Laura|Brown|E0904|Finance
+Alex|Martinez|E0685|Sales
+Jane|Rodriguez|E0299|HR
+Michael|Johnson|E0149|Sales
+Sarah|Johnson|E0632|Finance
+Sarah|Garcia|E0031|Finance
+Sarah|Davis|E0192|HR
+Michael|Brown|E0219|Engineering
+Laura|Smith|E0176|Marketing
+Laura|Martinez|E0849|Marketing
+Alex|Smith|E0240|Sales
+Alex|Miller|E0876|Marketing
+Katie|Davis|E0112|Marketing
+Katie|Miller|E0745|HR
+Jane|Johnson|E0204|Finance
+David|Williams|E0605|Engineering
+Sarah|Brown|E0495|Finance
+Sarah|Davis|E0773|Marketing
+Michael|Rodriguez|E0162|HR
+Michael|Smith|E0084|Sales
+Sarah|Jones|E0393|Marketing
+Emily|Jones|E0099|Engineering
+Chris|Martinez|E0862|Sales
+Laura|Davis|E0129|HR
+Sarah|Smith|E0313|Finance
+Emily|Miller|E0019|Engineering
+John|Jones|E0449|HR
+Chris|Rodriguez|E0040|Sales
+Laura|Jones|E0542|Finance
+John|Martinez|E0027|HR
+David|Jones|E0476|Marketing
+Alex|Brown|E0158|Engineering
+Sarah|Davis|E0135|HR
+Michael|Martinez|E0553|Finance
+Jane|Johnson|E0725|HR
+Michael|Brown|E0657|Finance
+John|Smith|E0069|Marketing
+Katie|Rodriguez|E0363|Marketing
+Chris|Davis|E0004|Engineering
+David|Martinez|E0525|Finance
+Alex|Miller|E0346|Sales
+Laura|Williams|E0771|Sales
+Emily|Jones|E0858|Finance
+Chris|Davis|E0835|Marketing
+John|Jones|E0670|HR
+David|Miller|E0571|Engineering
+Michael|Martinez|E0239|Engineering
+Alex|Brown|E0462|Sales
+Michael|Williams|E0185|HR
+Sarah|Brown|E0391|Marketing
+Alex|Martinez|E0311|Engineering
+John|Garcia|E0551|HR
+Emily|Martinez|E0361|Sales
+Sarah|Smith|E0375|Marketing
+Michael|Brown|E0458|HR
+Katie|Garcia|E0435|Sales
+David|Davis|E0754|HR
+Sarah|Williams|E0105|Engineering
+Chris|Rodriguez|E0182|HR
+Laura|Martinez|E0596|Marketing
+Jane|Miller|E0882|HR
+David|Garcia|E0039|Engineering
+Laura|Brown|E0877|Engineering
+Michael|Davis|E0716|HR
+Katie|Brown|E0328|Finance
+Laura|Davis|E0516|Marketing
+David|Martinez|E0798|Finance
+Katie|Rodriguez|E0766|Marketing
+Katie|Jones|E0102|Finance
+John|Martinez|E0566|Engineering
+Laura|Davis|E0802|Finance
+David|Johnson|E0157|Sales
+Laura|Martinez|E0482|Engineering
diff --git a/providers/teradata/tests/unit/teradata/hooks/test_tpt.py
b/providers/teradata/tests/unit/teradata/hooks/test_tpt.py
index b94be55275c..9f1b09c9992 100644
--- a/providers/teradata/tests/unit/teradata/hooks/test_tpt.py
+++ b/providers/teradata/tests/unit/teradata/hooks/test_tpt.py
@@ -107,6 +107,23 @@ class TestTptHook:
hook._execute_tbuild_locally("CREATE TABLE test (id INT);")
mock_set_permissions.assert_called_once()
+ @patch("airflow.providers.teradata.hooks.tpt.SSHHook")
+
@patch("airflow.providers.teradata.hooks.tpt.TptHook._execute_tdload_via_ssh")
+
@patch("airflow.providers.teradata.hooks.tpt.TptHook._execute_tdload_locally")
+ def test_execute_tdload_dispatch(self, mock_local, mock_ssh,
mock_ssh_hook):
+ # Local execution
+ hook = TptHook()
+ mock_local.return_value = 0
+ assert hook.execute_tdload("/tmp", "jobvar") == 0
+ mock_local.assert_called_once()
+
+ # SSH execution
+ hook = TptHook(ssh_conn_id="ssh_default")
+ hook.ssh_hook = MagicMock()
+ mock_ssh.return_value = 0
+ assert hook.execute_tdload("/tmp", "jobvar") == 0
+ mock_ssh.assert_called_once()
+
@patch("airflow.providers.teradata.hooks.tpt.SSHHook")
@patch("airflow.providers.teradata.hooks.tpt.execute_remote_command")
@patch("airflow.providers.teradata.hooks.tpt.remote_secure_delete")
@@ -222,6 +239,273 @@ class TestTptHook:
with pytest.raises(ConnectionError, match="SSH connection is not
established"):
hook._execute_tbuild_via_ssh("CREATE TABLE test (id INT);", "/tmp")
+ @patch("airflow.providers.teradata.hooks.tpt.SSHHook")
+ @patch("airflow.providers.teradata.hooks.tpt.execute_remote_command")
+ @patch("airflow.providers.teradata.hooks.tpt.remote_secure_delete")
+ @patch("airflow.providers.teradata.hooks.tpt.secure_delete")
+ @patch("airflow.providers.teradata.hooks.tpt.set_remote_file_permissions")
+ @patch("airflow.providers.teradata.hooks.tpt.decrypt_remote_file")
+ @patch("airflow.providers.teradata.hooks.tpt.transfer_file_sftp")
+
@patch("airflow.providers.teradata.hooks.tpt.generate_encrypted_file_with_openssl")
+ @patch("airflow.providers.teradata.hooks.tpt.generate_random_password")
+
@patch("airflow.providers.teradata.hooks.tpt.verify_tpt_utility_on_remote_host")
+ def test_transfer_to_and_execute_tdload_on_remote_success(
+ self,
+ mock_verify_tpt,
+ mock_gen_password,
+ mock_encrypt_file,
+ mock_transfer_file,
+ mock_decrypt_file,
+ mock_set_permissions,
+ mock_secure_delete,
+ mock_remote_secure_delete,
+ mock_execute_remote_command,
+ mock_ssh_hook,
+ ):
+ """Test successful transfer and execution of tdload on remote host"""
+ # Setup hook with SSH
+ hook = TptHook(ssh_conn_id="ssh_default")
+ hook.ssh_hook = MagicMock()
+
+ # Mock SSH client
+ mock_ssh_client = MagicMock()
+ hook.ssh_hook.get_conn.return_value.__enter__.return_value =
mock_ssh_client
+
+ # Mock execute_remote_command
+ mock_execute_remote_command.return_value = (0, "Job executed
successfully\n100 rows loaded", "")
+
+ # Mock password generation
+ mock_gen_password.return_value = "test_password"
+
+ # Execute the method
+ result = hook._transfer_to_and_execute_tdload_on_remote(
+ "/tmp/job_var_file.txt", "/remote/tmp", "-v -u", "test_job"
+ )
+
+ # Assertions
+ assert result == 0
+ mock_verify_tpt.assert_called_once_with(
+ mock_ssh_client, "tdload",
logging.getLogger("airflow.providers.teradata.hooks.tpt")
+ )
+ mock_gen_password.assert_called_once()
+ mock_encrypt_file.assert_called_once()
+ mock_transfer_file.assert_called_once()
+ mock_decrypt_file.assert_called_once()
+ mock_set_permissions.assert_called_once()
+ mock_execute_remote_command.assert_called_once()
+ mock_remote_secure_delete.assert_called_once()
+ mock_secure_delete.assert_called()
+
+ # Verify the command was constructed correctly
+ call_args = mock_execute_remote_command.call_args[0][1]
+ assert "tdload" in call_args
+ assert "-j" in call_args
+ assert "-v" in call_args
+ assert "-u" in call_args
+ assert "test_job" in call_args
+
+ @patch("airflow.providers.teradata.hooks.tpt.SSHHook")
+ @patch("airflow.providers.teradata.hooks.tpt.execute_remote_command")
+ @patch("airflow.providers.teradata.hooks.tpt.remote_secure_delete")
+ @patch("airflow.providers.teradata.hooks.tpt.secure_delete")
+ @patch("airflow.providers.teradata.hooks.tpt.set_remote_file_permissions")
+ @patch("airflow.providers.teradata.hooks.tpt.decrypt_remote_file")
+ @patch("airflow.providers.teradata.hooks.tpt.transfer_file_sftp")
+
@patch("airflow.providers.teradata.hooks.tpt.generate_encrypted_file_with_openssl")
+ @patch("airflow.providers.teradata.hooks.tpt.generate_random_password")
+
@patch("airflow.providers.teradata.hooks.tpt.verify_tpt_utility_on_remote_host")
+ def test_transfer_to_and_execute_tdload_on_remote_failure(
+ self,
+ mock_verify_tpt,
+ mock_gen_password,
+ mock_encrypt_file,
+ mock_transfer_file,
+ mock_decrypt_file,
+ mock_set_permissions,
+ mock_secure_delete,
+ mock_remote_secure_delete,
+ mock_execute_remote_command,
+ mock_ssh_hook,
+ ):
+ """Test failed transfer and execution of tdload on remote host"""
+ # Setup hook with SSH
+ hook = TptHook(ssh_conn_id="ssh_default")
+ hook.ssh_hook = MagicMock()
+
+ # Mock SSH client
+ mock_ssh_client = MagicMock()
+ hook.ssh_hook.get_conn.return_value.__enter__.return_value =
mock_ssh_client
+
+ # Mock execute_remote_command with failure
+ mock_execute_remote_command.return_value = (1, "Job failed",
"Connection error")
+
+ # Mock password generation
+ mock_gen_password.return_value = "test_password"
+
+ # Execute the method and expect failure
+ with pytest.raises(RuntimeError, match="tdload command failed with
exit code 1"):
+ hook._transfer_to_and_execute_tdload_on_remote(
+ "/tmp/job_var_file.txt", "/remote/tmp", "-v", "test_job"
+ )
+
+ # Verify cleanup was called even on failure
+ mock_remote_secure_delete.assert_called_once()
+ mock_secure_delete.assert_called()
+
+ @patch("airflow.providers.teradata.hooks.tpt.SSHHook")
+ @patch("airflow.providers.teradata.hooks.tpt.execute_remote_command")
+ @patch("airflow.providers.teradata.hooks.tpt.remote_secure_delete")
+ @patch("airflow.providers.teradata.hooks.tpt.secure_delete")
+ @patch("airflow.providers.teradata.hooks.tpt.set_remote_file_permissions")
+ @patch("airflow.providers.teradata.hooks.tpt.decrypt_remote_file")
+ @patch("airflow.providers.teradata.hooks.tpt.transfer_file_sftp")
+
@patch("airflow.providers.teradata.hooks.tpt.generate_encrypted_file_with_openssl")
+ @patch("airflow.providers.teradata.hooks.tpt.generate_random_password")
+
@patch("airflow.providers.teradata.hooks.tpt.verify_tpt_utility_on_remote_host")
+ def test_transfer_to_and_execute_tdload_on_remote_no_options(
+ self,
+ mock_verify_tpt,
+ mock_gen_password,
+ mock_encrypt_file,
+ mock_transfer_file,
+ mock_decrypt_file,
+ mock_set_permissions,
+ mock_secure_delete,
+ mock_remote_secure_delete,
+ mock_execute_remote_command,
+ mock_ssh_hook,
+ ):
+ """Test transfer and execution of tdload on remote host with no
options"""
+ # Setup hook with SSH
+ hook = TptHook(ssh_conn_id="ssh_default")
+ hook.ssh_hook = MagicMock()
+
+ # Mock SSH client
+ mock_ssh_client = MagicMock()
+ hook.ssh_hook.get_conn.return_value.__enter__.return_value =
mock_ssh_client
+
+ # Mock execute_remote_command
+ mock_execute_remote_command.return_value = (0, "Job executed
successfully", "")
+
+ # Mock password generation
+ mock_gen_password.return_value = "test_password"
+
+ # Execute the method with valid remote directory but no options
+ result = hook._transfer_to_and_execute_tdload_on_remote(
+ "/tmp/job_var_file.txt", "/remote/tmp", None, None
+ )
+
+ # Assertions
+ assert result == 0
+
+ # Verify the command was constructed correctly without options
+ call_args = mock_execute_remote_command.call_args[0][1]
+ assert "tdload" in call_args
+ assert "-j" in call_args
+ # Should not contain extra options
+ assert "-v" not in call_args
+ assert "-u" not in call_args
+
+ @patch("airflow.providers.teradata.hooks.tpt.SSHHook")
+ def test_transfer_to_and_execute_tdload_on_remote_no_ssh_hook(self,
mock_ssh_hook):
+ """Test transfer and execution when SSH hook is not initialized"""
+ hook = TptHook(ssh_conn_id="ssh_default")
+ hook.ssh_hook = None # Simulate uninitialized SSH hook
+
+ with pytest.raises(ConnectionError, match="SSH connection is not
established"):
+ hook._transfer_to_and_execute_tdload_on_remote(
+ "/tmp/job_var_file.txt", "/remote/tmp", "-v", "test_job"
+ )
+
+ @patch("airflow.providers.teradata.hooks.tpt.SSHHook")
+ @patch("airflow.providers.teradata.hooks.tpt.remote_secure_delete")
+ @patch("airflow.providers.teradata.hooks.tpt.secure_delete")
+ @patch("airflow.providers.teradata.hooks.tpt.set_remote_file_permissions")
+ @patch("airflow.providers.teradata.hooks.tpt.decrypt_remote_file")
+ @patch("airflow.providers.teradata.hooks.tpt.transfer_file_sftp")
+
@patch("airflow.providers.teradata.hooks.tpt.generate_encrypted_file_with_openssl")
+ @patch("airflow.providers.teradata.hooks.tpt.generate_random_password")
+ @patch(
+
"airflow.providers.teradata.hooks.tpt.verify_tpt_utility_on_remote_host",
+ side_effect=Exception("TPT utility not found"),
+ )
+ def test_transfer_to_and_execute_tdload_on_remote_utility_check_fail(
+ self,
+ mock_verify_tpt,
+ mock_gen_password,
+ mock_encrypt_file,
+ mock_transfer_file,
+ mock_decrypt_file,
+ mock_set_permissions,
+ mock_secure_delete,
+ mock_remote_secure_delete,
+ mock_ssh_hook,
+ ):
+ """Test transfer and execution when TPT utility verification fails"""
+ # Setup hook with SSH
+ hook = TptHook(ssh_conn_id="ssh_default")
+ hook.ssh_hook = MagicMock()
+
+ # Mock SSH client
+ mock_ssh_client = MagicMock()
+ hook.ssh_hook.get_conn.return_value.__enter__.return_value =
mock_ssh_client
+
+ # Execute the method and expect failure
+ with pytest.raises(
+ RuntimeError,
+ match="Unexpected error while executing tdload script on remote
machine",
+ ):
+ hook._transfer_to_and_execute_tdload_on_remote(
+ "/tmp/job_var_file.txt", "/remote/tmp", "-v", "test_job"
+ )
+
+ # Verify cleanup was called even on utility check failure
+ mock_secure_delete.assert_called()
+
+ def test_build_tdload_command_basic(self):
+ """Test building tdload command with basic parameters"""
+ hook = TptHook()
+ cmd = hook._build_tdload_command("/tmp/job.txt", None, "test_job")
+
+ assert cmd == ["tdload", "-j", "/tmp/job.txt", "test_job"]
+
+ def test_build_tdload_command_with_options(self):
+ """Test building tdload command with options"""
+ hook = TptHook()
+ cmd = hook._build_tdload_command("/tmp/job.txt", "-v -u", "test_job")
+
+ assert cmd == ["tdload", "-j", "/tmp/job.txt", "-v", "-u", "test_job"]
+
+ def test_build_tdload_command_with_quoted_options(self):
+ """Test building tdload command with quoted options"""
+ hook = TptHook()
+ cmd = hook._build_tdload_command("/tmp/job.txt", "-v --option 'value
with spaces'", "test_job")
+
+ assert cmd == ["tdload", "-j", "/tmp/job.txt", "-v", "--option",
"value with spaces", "test_job"]
+
+ def test_build_tdload_command_no_job_name(self):
+ """Test building tdload command without job name"""
+ hook = TptHook()
+ cmd = hook._build_tdload_command("/tmp/job.txt", "-v", None)
+
+ assert cmd == ["tdload", "-j", "/tmp/job.txt", "-v"]
+
+ def test_build_tdload_command_empty_job_name(self):
+ """Test building tdload command with empty job name"""
+ hook = TptHook()
+ cmd = hook._build_tdload_command("/tmp/job.txt", "-v", "")
+
+ assert cmd == ["tdload", "-j", "/tmp/job.txt", "-v"]
+
+ @patch("shlex.split", side_effect=ValueError("Invalid quote"))
+ def test_build_tdload_command_invalid_options(self, mock_shlex_split):
+ """Test building tdload command with invalid quoted options"""
+ hook = TptHook()
+ cmd = hook._build_tdload_command("/tmp/job.txt", "-v --option
'unclosed quote", "test_job")
+
+ # Should fallback to simple split
+ assert cmd == ["tdload", "-j", "/tmp/job.txt", "-v", "--option",
"'unclosed", "quote", "test_job"]
+
def test_on_kill(self):
"""Test on_kill method"""
hook = TptHook()
diff --git a/providers/teradata/tests/unit/teradata/operators/test_tpt.py
b/providers/teradata/tests/unit/teradata/operators/test_tpt.py
index 74fc27b2d34..7c4aa0bfe82 100644
--- a/providers/teradata/tests/unit/teradata/operators/test_tpt.py
+++ b/providers/teradata/tests/unit/teradata/operators/test_tpt.py
@@ -19,15 +19,17 @@ Tests for Teradata Parallel Transporter (TPT) operators.
These tests validate the functionality of:
- DdlOperator: For DDL operations on Teradata databases
+- TdLoadOperator: For data transfers between files and tables
"""
from __future__ import annotations
-from unittest.mock import MagicMock, patch
+from unittest.mock import MagicMock, Mock, patch
import pytest
-from airflow.providers.teradata.operators.tpt import DdlOperator
+from airflow.providers.teradata.hooks.tpt import TptHook
+from airflow.providers.teradata.operators.tpt import DdlOperator,
TdLoadOperator
@pytest.fixture(autouse=True)
@@ -330,3 +332,588 @@ class TestDdlOperator:
result = operator.execute({})
assert result == 0
mock_get_conn.assert_called()
+
+
+class TestTdLoadOperator:
+ """
+ Tests for TdLoadOperator.
+
+ This test suite validates the TdLoadOperator functionality across
different modes:
+ - file_to_table: Loading data from a file to a Teradata table
+ - table_to_file: Exporting data from a Teradata table to a file
+ - select_stmt_to_file: Exporting data from a SQL SELECT statement to a file
+ - table_to_table: Transferring data between two Teradata tables
+
+ It also tests parameter validation, error handling, templating, and
resource cleanup.
+ """
+
+ def setup_method(self, method):
+ # No MagicMock connections needed; use only string conn_ids in tests
+ pass
+
+ # ----- Tests for Basic Operation Modes -----
+
+ @patch(
+ "airflow.providers.teradata.hooks.ttu.TtuHook.get_conn",
+ return_value={"host": "mock_host", "login": "mock_user", "password":
"mock_pass"},
+ )
+ @patch(
+ "airflow.providers.teradata.operators.tpt.prepare_tdload_job_var_file",
+ return_value="dummy job var content",
+ )
+ @patch("airflow.providers.teradata.hooks.tpt.TptHook.execute_tdload",
return_value=0)
+ def test_file_to_table_mode(self, mock_execute_tdload,
mock_prepare_job_var, mock_get_conn):
+ """Test loading data from a file to a Teradata table (with connection
and job var patching)"""
+ # Create operator
+ operator = TdLoadOperator(
+ task_id="test_file_to_table",
+ source_file_name="/path/to/data.csv",
+ target_table="target_db.target_table",
+ teradata_conn_id="teradata_default",
+ target_teradata_conn_id="teradata_target",
+ )
+
+ # Execute operator
+ result = operator.execute({})
+
+ # Assertions
+ assert result == 0
+ mock_execute_tdload.assert_called_once()
+ mock_prepare_job_var.assert_called_once()
+ mock_get_conn.assert_called()
+ # Verify that the operator initialized correctly
+ assert operator._src_hook is not None
+ assert operator._dest_hook is not None
+
+ @patch(
+ "airflow.providers.teradata.hooks.ttu.TtuHook.get_conn",
+ return_value={"host": "mock_host", "login": "mock_user", "password":
"mock_pass"},
+ )
+ @patch(
+ "airflow.providers.teradata.operators.tpt.prepare_tdload_job_var_file",
+ return_value="dummy job var content",
+ )
+ @patch("airflow.providers.teradata.hooks.tpt.TptHook.execute_tdload",
return_value=0)
+ def test_file_to_table_with_default_target_conn(
+ self, mock_execute_tdload, mock_prepare_job_var, mock_get_conn
+ ):
+ """Test file to table loading with default target connection"""
+ operator = TdLoadOperator(
+ task_id="test_file_to_table_default_target",
+ source_file_name="/path/to/data.csv",
+ target_table="target_db.target_table",
+ teradata_conn_id="teradata_default",
+ # No target_teradata_conn_id - should default to teradata_conn_id
+ )
+
+ # Execute the operator
+ result = operator.execute({})
+
+ # Verify the results
+ assert result == 0
+ # Verify that target_teradata_conn_id was set to teradata_conn_id
+ assert operator.target_teradata_conn_id == "teradata_default"
+ # Verify that hooks were initialized
+ assert operator._src_hook is not None
+ assert operator._dest_hook is not None
+ mock_execute_tdload.assert_called_once()
+ mock_prepare_job_var.assert_called_once()
+ mock_get_conn.assert_called()
+
+ @patch(
+ "airflow.providers.teradata.hooks.ttu.TtuHook.get_conn",
+ return_value={"host": "mock_host", "login": "mock_user", "password":
"mock_pass"},
+ )
+ @patch(
+ "airflow.providers.teradata.operators.tpt.prepare_tdload_job_var_file",
+ return_value="dummy job var content",
+ )
+ @patch("airflow.providers.teradata.hooks.tpt.TptHook.execute_tdload",
return_value=0)
+ def test_table_to_file_mode(self, mock_execute_tdload,
mock_prepare_job_var, mock_get_conn):
+ """Test exporting data from a Teradata table to a file"""
+ # Configure the operator
+ operator = TdLoadOperator(
+ task_id="test_table_to_file",
+ source_table="source_db.source_table",
+ target_file_name="/path/to/export.csv",
+ teradata_conn_id="teradata_default",
+ )
+
+ # Execute the operator
+ result = operator.execute({})
+
+ # Verify the results
+ assert result == 0
+ # Verify that hooks were initialized correctly
+ assert operator._src_hook is not None
+ assert operator._dest_hook is None # No destination hook for
table_to_file
+ mock_execute_tdload.assert_called_once()
+ mock_prepare_job_var.assert_called_once()
+ mock_get_conn.assert_called()
+
+ @patch(
+ "airflow.providers.teradata.hooks.ttu.TtuHook.get_conn",
+ return_value={"host": "mock_host", "login": "mock_user", "password":
"mock_pass"},
+ )
+ @patch(
+ "airflow.providers.teradata.operators.tpt.prepare_tdload_job_var_file",
+ return_value="dummy job var content",
+ )
+ @patch("airflow.providers.teradata.hooks.tpt.TptHook.execute_tdload",
return_value=0)
+ def test_select_stmt_to_file_mode(self, mock_execute_tdload,
mock_prepare_job_var, mock_get_conn):
+ """Test exporting data from a SELECT statement to a file"""
+ # Configure the operator
+ operator = TdLoadOperator(
+ task_id="test_select_to_file",
+ select_stmt="SELECT * FROM source_db.source_table WHERE id > 1000",
+ target_file_name="/path/to/export.csv",
+ teradata_conn_id="teradata_default",
+ )
+
+ # Execute the operator
+ result = operator.execute({})
+
+ # Verify the results
+ assert result == 0
+ # Verify that hooks were initialized correctly
+ assert operator._src_hook is not None
+ assert operator._dest_hook is None # No destination hook for
select_stmt_to_file
+ mock_execute_tdload.assert_called_once()
+ mock_prepare_job_var.assert_called_once()
+ mock_get_conn.assert_called()
+
+ @patch(
+ "airflow.providers.teradata.hooks.ttu.TtuHook.get_conn",
+ return_value={"host": "mock_host", "login": "mock_user", "password":
"mock_pass"},
+ )
+ @patch(
+ "airflow.providers.teradata.operators.tpt.prepare_tdload_job_var_file",
+ return_value="dummy job var content",
+ )
+ @patch("airflow.providers.teradata.hooks.tpt.TptHook.execute_tdload",
return_value=0)
+ def test_table_to_table_mode(self, mock_execute_tdload,
mock_prepare_job_var, mock_get_conn):
+ """Test transferring data between two Teradata tables"""
+ # Configure the operator
+ operator = TdLoadOperator(
+ task_id="test_table_to_table",
+ source_table="source_db.source_table",
+ target_table="target_db.target_table",
+ teradata_conn_id="teradata_default",
+ target_teradata_conn_id="teradata_target",
+ )
+
+ # Execute the operator
+ result = operator.execute({})
+
+ # Verify the results
+ assert result == 0
+ # Verify that both hooks were initialized
+ assert operator._src_hook is not None
+ assert operator._dest_hook is not None
+
+ # ----- Tests for Advanced Operation Modes -----
+
+ @patch(
+ "airflow.providers.teradata.hooks.ttu.TtuHook.get_conn",
+ return_value={"host": "mock_host", "login": "mock_user", "password":
"mock_pass"},
+ )
+ @patch(
+ "airflow.providers.teradata.operators.tpt.prepare_tdload_job_var_file",
+ return_value="dummy job var content",
+ )
+ @patch("airflow.providers.teradata.hooks.tpt.TptHook.execute_tdload",
return_value=0)
+ def test_file_to_table_with_insert_stmt(self, mock_execute_tdload,
mock_prepare_job_var, mock_get_conn):
+ """Test loading from file to table with custom INSERT statement"""
+ # Configure the operator with custom INSERT statement
+ operator = TdLoadOperator(
+ task_id="test_file_to_table_with_insert",
+ source_file_name="/path/to/data.csv",
+ target_table="target_db.target_table",
+ insert_stmt="INSERT INTO target_db.target_table (col1, col2)
VALUES (?, ?)",
+ teradata_conn_id="teradata_default",
+ target_teradata_conn_id="teradata_target",
+ )
+
+ # Execute the operator
+ result = operator.execute({})
+
+ # Verify the results
+ assert result == 0
+ # Verify that both hooks were initialized for file_to_table mode
+ assert operator._src_hook is not None
+ assert operator._dest_hook is not None
+
+ @patch(
+ "airflow.providers.teradata.hooks.ttu.TtuHook.get_conn",
+ return_value={"host": "mock_host", "login": "mock_user", "password":
"mock_pass"},
+ )
+ @patch(
+ "airflow.providers.teradata.operators.tpt.prepare_tdload_job_var_file",
+ return_value="dummy job var content",
+ )
+ @patch("airflow.providers.teradata.hooks.tpt.TptHook.execute_tdload",
return_value=0)
+ def test_table_to_table_with_select_and_insert(
+ self, mock_execute_tdload, mock_prepare_job_var, mock_get_conn
+ ):
+ """Test transferring data between tables with custom SELECT and INSERT
statements"""
+ # Configure the operator with custom SELECT and INSERT statements
+ operator = TdLoadOperator(
+ task_id="test_table_to_table_with_select_insert",
+ select_stmt="SELECT col1, col2 FROM source_db.source_table WHERE
col3 > 1000",
+ target_table="target_db.target_table",
+ insert_stmt="INSERT INTO target_db.target_table (col1, col2)
VALUES (?, ?)",
+ teradata_conn_id="teradata_default",
+ target_teradata_conn_id="teradata_target",
+ )
+
+ # Execute the operator
+ result = operator.execute({})
+
+ # Verify the results
+ assert result == 0
+ # Verify that both hooks were initialized for table_to_table mode
+ assert operator._src_hook is not None
+ assert operator._dest_hook is not None
+
+ # ----- Parameter Validation Tests -----
+
+ def test_invalid_parameter_combinations(self):
+ """Test validation of invalid parameter combinations"""
+ # Test 1: Missing both source and target parameters
+ with pytest.raises(ValueError, match="Invalid parameter combination"):
+ TdLoadOperator(
+ task_id="test_invalid_params",
+ teradata_conn_id="teradata_default",
+ ).execute({})
+
+ # Test 2: Missing target_teradata_conn_id for table_to_table mode
+ with pytest.raises(ValueError, match="target_teradata_conn_id must be
provided"):
+ TdLoadOperator(
+ task_id="test_missing_target_conn",
+ source_table="source_db.source_table",
+ target_table="target_db.target_table",
+ teradata_conn_id="teradata_default",
+ # Missing target_teradata_conn_id for table_to_table
+ ).execute({})
+
+ # Test 3: Both source_table and select_stmt provided (contradictory
sources)
+ with pytest.raises(
+ ValueError, match="Both source_table and select_stmt cannot be
provided simultaneously"
+ ):
+ TdLoadOperator(
+ task_id="test_both_source_and_select",
+ source_table="source_db.table",
+ select_stmt="SELECT * FROM other_db.table",
+ target_file_name="/path/to/export.csv",
+ teradata_conn_id="teradata_default",
+ ).execute({})
+
+ # Test 4: insert_stmt without target_table
+ with pytest.raises(ValueError, match="insert_stmt is provided but
target_table is not specified"):
+ TdLoadOperator(
+ task_id="test_insert_stmt_no_target",
+ source_file_name="/path/to/source.csv",
+ insert_stmt="INSERT INTO mytable VALUES (?, ?)",
+ teradata_conn_id="teradata_default",
+ ).execute({})
+
+ # Test 5: Only target_file_name provided (no source)
+ with pytest.raises(ValueError, match="Invalid parameter combination"):
+ TdLoadOperator(
+ task_id="test_no_source_with_target_file",
+ target_file_name="/path/to/file.csv",
+ teradata_conn_id="teradata_default",
+ ).execute({})
+
+ # Test 6: Only source_file_name provided (no target)
+ with pytest.raises(ValueError, match="Invalid parameter combination"):
+ TdLoadOperator(
+ task_id="test_source_file_no_target_table",
+ source_file_name="/path/to/source.csv",
+ teradata_conn_id="teradata_default",
+ ).execute({})
+
+ # ----- Error Handling Tests -----
+
+ @patch(
+ "airflow.providers.teradata.hooks.ttu.TtuHook.get_conn",
+ side_effect=RuntimeError("Connection not found"),
+ )
+ def test_error_handling_execute_tdload(self, mock_get_conn):
+ """Test error handling with invalid connection ID"""
+ operator = TdLoadOperator(
+ task_id="test_error_handling",
+ source_file_name="/path/to/data.csv",
+ target_table="target_db.target_table",
+ teradata_conn_id="nonexistent_connection",
+ target_teradata_conn_id="teradata_target",
+ )
+ with pytest.raises((RuntimeError, ValueError, KeyError)):
+ operator.execute({})
+
+ @patch(
+ "airflow.providers.teradata.hooks.ttu.TtuHook.get_conn",
+ side_effect=RuntimeError("Connection not found"),
+ )
+ def test_error_handling_get_conn(self, mock_get_conn):
+ """Test error handling with invalid target connection ID"""
+ operator = TdLoadOperator(
+ task_id="test_error_handling_conn",
+ source_file_name="/path/to/data.csv",
+ target_table="target_db.target_table",
+ teradata_conn_id="teradata_default",
+ target_teradata_conn_id="nonexistent_target_connection",
+ )
+ with pytest.raises((RuntimeError, ValueError, KeyError)):
+ operator.execute({})
+
+ # ----- Resource Cleanup Tests -----
+
+ @patch("airflow.providers.teradata.hooks.tpt.TptHook")
+ @patch("airflow.providers.ssh.hooks.ssh.SSHHook")
+ def test_on_kill(self, mock_ssh_hook, mock_tpt_hook):
+ """Test on_kill method cleans up resources properly"""
+ # Set up operator
+ operator = TdLoadOperator(
+ task_id="test_on_kill",
+ source_table="source_db.source_table",
+ target_table="target_db.target_table",
+ teradata_conn_id="teradata_default",
+ target_teradata_conn_id="teradata_target",
+ )
+
+ # Set up hooks manually
+ operator._src_hook = MagicMock()
+ operator._dest_hook = MagicMock()
+
+ # Call on_kill
+ operator.on_kill()
+
+ # Verify hooks were cleaned up
+ operator._src_hook.on_kill.assert_called_once()
+ operator._dest_hook.on_kill.assert_called_once()
+
+ @patch("airflow.providers.teradata.hooks.tpt.TptHook")
+ def test_on_kill_no_hooks(self, mock_tpt_hook):
+ """Test on_kill method when no hooks are initialized"""
+ # Set up operator
+ operator = TdLoadOperator(
+ task_id="test_on_kill_no_hooks",
+ source_table="source_db.source_table",
+ target_table="target_db.target_table",
+ teradata_conn_id="teradata_default",
+ target_teradata_conn_id="teradata_target",
+ )
+
+ # Set hooks to None
+ operator._src_hook = None
+ operator._dest_hook = None
+
+ # Call on_kill (should not raise any exceptions)
+ operator.on_kill()
+
+ @patch("airflow.providers.teradata.hooks.tpt.TptHook")
+ @patch("airflow.providers.ssh.hooks.ssh.SSHHook")
+ def test_on_kill_with_only_src_hook(self, mock_ssh_hook, mock_tpt_hook):
+ """Test on_kill with only source hook initialized"""
+ # Set up operator
+ operator = TdLoadOperator(
+ task_id="test_on_kill_src_only",
+ source_table="source_db.source_table",
+ target_file_name="/path/to/export.csv", # table_to_file mode
+ teradata_conn_id="teradata_default",
+ )
+
+ # Set up only source hook
+ operator._src_hook = MagicMock()
+ operator._dest_hook = None
+
+ # Call on_kill
+ operator.on_kill()
+
+ # Verify source hook was cleaned up
+ operator._src_hook.on_kill.assert_called_once()
+
+ # ----- Job Variable File Tests -----
+
+ @patch("airflow.providers.teradata.operators.tpt.is_valid_file",
return_value=True)
+ @patch("airflow.providers.teradata.operators.tpt.read_file",
return_value="job var content")
+ def test_with_local_job_var_file(self, mock_read_file, mock_is_valid_file):
+ """Test using a local job variable file"""
+ # Configure operator with only job var file (no source/target
parameters needed)
+ operator = TdLoadOperator(
+ task_id="test_with_job_var_file",
+ tdload_job_var_file="/path/to/job_vars.txt",
+ teradata_conn_id="teradata_default",
+ )
+
+ # Execute
+ result = operator.execute({})
+
+ # Verify the execution was successful (returns 0 for success)
+ assert result == 0
+
+ @patch("airflow.providers.teradata.operators.tpt.is_valid_file",
return_value=True)
+ @patch("airflow.providers.teradata.operators.tpt.read_file",
return_value="job var content")
+ def test_with_local_job_var_file_and_options(self, mock_read_file,
mock_is_valid_file):
+ """Test using a local job variable file with additional tdload
options"""
+ # Set up mocks
+ with patch("airflow.providers.teradata.hooks.tpt.TptHook") as
mock_tpt_hook:
+ mock_tpt_hook_instance = mock_tpt_hook.return_value
+ mock_tpt_hook_instance._execute_tdload_locally.return_value = 0
+
+ with (
+ patch("airflow.providers.teradata.operators.tpt.is_valid_file",
return_value=True),
+ patch("airflow.providers.teradata.operators.tpt.read_file",
return_value="job var content"),
+ ):
+ # Configure operator with job var file and additional options
+ operator = TdLoadOperator(
+ task_id="test_with_job_var_file_and_options",
+ tdload_job_var_file="/path/to/job_vars.txt",
+ tdload_options="-v -u", # Add verbose and Unicode options
+ tdload_job_name="custom_job_name",
+ teradata_conn_id="teradata_default",
+ )
+
+ # Execute
+ result = operator.execute({})
+
+ # Verify the execution was successful (returns 0 for success)
+ assert result == 0
+
+ @patch("airflow.providers.teradata.hooks.tpt.TptHook")
+ @patch("airflow.providers.ssh.hooks.ssh.SSHHook")
+ def test_with_invalid_local_job_var_file(self, mock_ssh_hook,
mock_tpt_hook):
+ """Test with invalid local job variable file path"""
+ # Set up mocks
+ with patch("airflow.providers.teradata.operators.tpt.is_valid_file",
return_value=False):
+ # Configure operator
+ operator = TdLoadOperator(
+ task_id="test_with_invalid_job_var_file",
+ tdload_job_var_file="/path/to/nonexistent_file.txt",
+ teradata_conn_id="teradata_default",
+ )
+
+ # Execute and check for exception
+ with pytest.raises(ValueError, match="is invalid or does not
exist"):
+ operator.execute({})
+
+ # ----- Specific subprocess mocking tests -----
+
+ @patch("airflow.providers.teradata.hooks.tpt.subprocess.Popen")
+ @patch("airflow.providers.teradata.hooks.tpt.shutil.which")
+ @patch("airflow.models.Connection")
+ def test_direct_tdload_execution_mocking(self, mock_conn, mock_which,
mock_popen):
+ """Test the direct execution of tdload with proper mocking"""
+ # Ensure the binary is found
+ mock_which.return_value = "/usr/bin/tdload"
+
+ # Mock the subprocess
+ mock_process = MagicMock()
+ mock_process.returncode = 0
+ mock_process.stdout = MagicMock()
+ mock_process.stdout.readline.side_effect = [
+ b"Starting TDLOAD...\n",
+ b"Processing data...\n",
+ b"1000 rows loaded successfully\n",
+ b"",
+ ]
+ mock_popen.return_value = mock_process
+
+ # Create the TPT hook directly
+ hook = TptHook(teradata_conn_id="teradata_default")
+
+ # Execute the command directly
+ result = hook._execute_tdload_locally(
+ job_var_content="DEFINE JOB sample_job;\nUSING OPERATOR
sel;\nSELECT * FROM source_table;\n",
+ tdload_options="-v",
+ tdload_job_name="sample_job",
+ )
+
+ # Verify the result
+ assert result == 0
+ mock_popen.assert_called_once()
+ cmd_args = mock_popen.call_args[0][0]
+ assert cmd_args[0] == "tdload"
+ assert "-j" in cmd_args
+ assert "-v" in cmd_args
+ assert "sample_job" in cmd_args
+
+ @patch.object(TdLoadOperator, "_src_hook", create=True)
+ @patch.object(TdLoadOperator, "_dest_hook", create=True)
+
@patch("airflow.providers.teradata.hooks.tpt.TptHook._execute_tdload_locally")
+ @patch("airflow.providers.teradata.hooks.tpt.TptHook.__init__",
return_value=None)
+ @patch("airflow.models.Connection")
+ def test_execute_with_local_job_var_file_direct_patch(
+ self, mock_conn, mock_hook_init, mock_execute_local, mock_dest_hook,
mock_src_hook
+ ):
+ """Test TdLoadOperator with a local job var file using direct patching
(bteq style)"""
+ # Arrange
+ mock_execute_local.return_value = 0
+ operator = TdLoadOperator(
+ task_id="test_with_local_job_var_file_direct_patch",
+ tdload_job_var_file="/path/to/job_vars.txt",
+ teradata_conn_id="teradata_default",
+ )
+ # Manually set hooks since we bypassed __init__
+ operator._src_hook = mock_src_hook
+ operator._dest_hook = mock_dest_hook
+ operator._src_hook._execute_tdload_locally = mock_execute_local
+ # Patch file validation and reading
+ with (
+ patch("airflow.providers.teradata.operators.tpt.is_valid_file",
return_value=True),
+ patch("airflow.providers.teradata.operators.tpt.read_file",
return_value="job var content"),
+ ):
+ # Act
+ result = operator.execute({})
+ # Assert
+ mock_execute_local.assert_called_once_with("job var content", None,
None)
+ assert result == 0
+
+ @patch.object(TdLoadOperator, "_src_hook", create=True)
+ @patch.object(TdLoadOperator, "_dest_hook", create=True)
+ @patch("airflow.providers.teradata.hooks.tpt.TptHook.__init__",
return_value=None)
+ @patch("airflow.models.Connection")
+ def test_on_kill_direct_patch(self, mock_conn, mock_hook_init,
mock_dest_hook, mock_src_hook):
+ """Test on_kill method with direct patching (bteq style)"""
+ operator = TdLoadOperator(
+ task_id="test_on_kill_direct_patch",
+ source_table="source_db.source_table",
+ target_table="target_db.target_table",
+ teradata_conn_id="teradata_default",
+ target_teradata_conn_id="teradata_target",
+ )
+ # Set up mocked attributes
+ operator._src_hook = mock_src_hook
+ operator._dest_hook = mock_dest_hook
+ # Ensure the mocked hooks have on_kill methods
+ mock_src_hook.on_kill = Mock()
+ mock_dest_hook.on_kill = Mock()
+
+ # Act
+ operator.on_kill()
+
+ # Assert
+ mock_src_hook.on_kill.assert_called_once()
+ mock_dest_hook.on_kill.assert_called_once()
+
+ @patch("airflow.providers.ssh.hooks.ssh.SSHHook")
+ @patch.object(TdLoadOperator, "_src_hook", create=True)
+ @patch.object(TdLoadOperator, "_dest_hook", create=True)
+ @patch("airflow.providers.teradata.hooks.tpt.TptHook.__init__",
return_value=None)
+ @patch("airflow.models.Connection")
+ def test_on_kill_no_hooks_direct_patch(
+ self, mock_conn, mock_hook_init, mock_dest_hook, mock_src_hook,
mock_ssh_hook
+ ):
+ """Test on_kill method when no hooks are initialized (bteq style)"""
+ operator = TdLoadOperator(
+ task_id="test_on_kill_no_hooks_direct_patch",
+ source_table="source_db.source_table",
+ target_table="target_db.target_table",
+ teradata_conn_id="teradata_default",
+ target_teradata_conn_id="teradata_target",
+ )
+ operator._src_hook = None
+ operator._dest_hook = None
+ # Act
+ operator.on_kill()
diff --git a/providers/teradata/tests/unit/teradata/utils/test_tpt_util.py
b/providers/teradata/tests/unit/teradata/utils/test_tpt_util.py
index 3bc986957b7..e1e2fe1437b 100644
--- a/providers/teradata/tests/unit/teradata/utils/test_tpt_util.py
+++ b/providers/teradata/tests/unit/teradata/utils/test_tpt_util.py
@@ -17,6 +17,7 @@
from __future__ import annotations
import os
+import stat
import subprocess
import tempfile
from unittest.mock import Mock, patch
@@ -29,7 +30,11 @@ from airflow.providers.teradata.utils.tpt_util import (
execute_remote_command,
get_remote_os,
get_remote_temp_directory,
+ is_valid_file,
+ is_valid_remote_job_var_file,
+ prepare_tdload_job_var_file,
prepare_tpt_ddl_script,
+ read_file,
remote_secure_delete,
secure_delete,
set_local_file_permissions,
@@ -668,3 +673,260 @@ class TestTptUtil:
transfer_file_sftp(mock_ssh, tmp_file.name,
"/remote/path/file.txt", mock_logger)
mock_sftp.close.assert_called_once()
+
+ @patch("airflow.providers.teradata.utils.tpt_util.get_remote_os")
+ @patch("airflow.providers.teradata.utils.tpt_util.execute_remote_command")
+ def test_verify_tpt_utility_on_remote_host_not_found(self,
mock_execute_cmd, mock_get_remote_os):
+ """Test verify_tpt_utility_on_remote_host when utility is not found."""
+ mock_ssh = Mock()
+ mock_get_remote_os.return_value = "unix"
+ mock_execute_cmd.return_value = (1, "", "command not found")
+
+ with pytest.raises(FileNotFoundError, match="TPT utility 'tdload' is
not installed"):
+ verify_tpt_utility_on_remote_host(mock_ssh, "tdload")
+
+ def test_is_valid_file_true(self):
+ """Test is_valid_file returns True for existing file."""
+ with tempfile.NamedTemporaryFile() as tmp_file:
+ assert is_valid_file(tmp_file.name) is True
+
+ def test_is_valid_file_false(self):
+ """Test is_valid_file returns False for non-existing file."""
+ assert is_valid_file("/nonexistent/file") is False
+
+ def test_is_valid_file_directory(self):
+ """Test is_valid_file returns False for directory."""
+ with tempfile.TemporaryDirectory() as tmp_dir:
+ assert is_valid_file(tmp_dir) is False
+
+ @patch("shutil.which")
+ def test_verify_tpt_utility_installed_not_found(self, mock_which):
+ """Test verify_tpt_utility_installed when utility is not found."""
+ mock_which.return_value = None
+
+ with pytest.raises(FileNotFoundError, match="TPT utility 'tdload' is
not installed"):
+ verify_tpt_utility_installed("tdload")
+
+ def test_prepare_tdload_job_var_file_file_to_table(self):
+ """Test prepare_tdload_job_var_file for file_to_table mode."""
+ source_conn = {"host": "source_host", "login": "source_user",
"password": "source_pass"}
+
+ result = prepare_tdload_job_var_file(
+ mode="file_to_table",
+ source_table=None,
+ select_stmt=None,
+ insert_stmt="INSERT INTO target_table SELECT * FROM temp",
+ target_table="target_table",
+ source_file_name="/path/to/source.txt",
+ target_file_name=None,
+ source_format="TEXT",
+ target_format="",
+ source_text_delimiter="|",
+ target_text_delimiter="",
+ source_conn=source_conn,
+ )
+
+ assert "TargetTdpId='source_host'" in result
+ assert "TargetUserName='source_user'" in result
+ assert "TargetUserPassword='source_pass'" in result
+ assert "TargetTable='target_table'" in result
+ assert "SourceFileName='/path/to/source.txt'" in result
+ assert "InsertStmt='INSERT INTO target_table SELECT * FROM temp'" in
result
+ assert "SourceFormat='TEXT'" in result
+ assert "SourceTextDelimiter='|'" in result
+
+ def test_prepare_tdload_job_var_file_table_to_file(self):
+ """Test prepare_tdload_job_var_file for table_to_file mode."""
+ source_conn = {"host": "source_host", "login": "source_user",
"password": "source_pass"}
+
+ result = prepare_tdload_job_var_file(
+ mode="table_to_file",
+ source_table="source_table",
+ select_stmt=None,
+ insert_stmt=None,
+ target_table=None,
+ source_file_name=None,
+ target_file_name="/path/to/target.txt",
+ source_format="",
+ target_format="TEXT",
+ source_text_delimiter="",
+ target_text_delimiter=",",
+ source_conn=source_conn,
+ )
+
+ assert "SourceTdpId='source_host'" in result
+ assert "SourceUserName='source_user'" in result
+ assert "SourceUserPassword='source_pass'" in result
+ assert "SourceTable='source_table'" in result
+ assert "TargetFileName='/path/to/target.txt'" in result
+ assert "TargetFormat='TEXT'" in result
+ assert "TargetTextDelimiter=','" in result
+
+ def test_prepare_tdload_job_var_file_table_to_file_with_select(self):
+ """Test prepare_tdload_job_var_file for table_to_file mode with SELECT
statement."""
+ source_conn = {"host": "source_host", "login": "source_user",
"password": "source_pass"}
+
+ result = prepare_tdload_job_var_file(
+ mode="table_to_file",
+ source_table=None,
+ select_stmt="SELECT * FROM source_table WHERE id > 100",
+ insert_stmt=None,
+ target_table=None,
+ source_file_name=None,
+ target_file_name="/path/to/target.txt",
+ source_format="",
+ target_format="TEXT",
+ source_text_delimiter="",
+ target_text_delimiter=",",
+ source_conn=source_conn,
+ )
+
+ assert "SourceSelectStmt='SELECT * FROM source_table WHERE id > 100'"
in result
+ assert "SourceTable=" not in result
+
+ def test_prepare_tdload_job_var_file_table_to_table(self):
+ """Test prepare_tdload_job_var_file for table_to_table mode."""
+ source_conn = {"host": "source_host", "login": "source_user",
"password": "source_pass"}
+ target_conn = {"host": "target_host", "login": "target_user",
"password": "target_pass"}
+
+ result = prepare_tdload_job_var_file(
+ mode="table_to_table",
+ source_table="source_table",
+ select_stmt=None,
+ insert_stmt="INSERT INTO target_table SELECT * FROM source",
+ target_table="target_table",
+ source_file_name=None,
+ target_file_name=None,
+ source_format="",
+ target_format="",
+ source_text_delimiter="",
+ target_text_delimiter="",
+ source_conn=source_conn,
+ target_conn=target_conn,
+ )
+
+ assert "SourceTdpId='source_host'" in result
+ assert "TargetTdpId='target_host'" in result
+ assert "TargetUserName='target_user'" in result
+ assert "TargetUserPassword='target_pass'" in result
+ assert "SourceTable='source_table'" in result
+ assert "TargetTable='target_table'" in result
+ assert "InsertStmt='INSERT INTO target_table SELECT * FROM source'" in
result
+
+ def test_prepare_tdload_job_var_file_table_to_table_no_target_conn(self):
+ """Test prepare_tdload_job_var_file for table_to_table mode without
target_conn."""
+ source_conn = {"host": "source_host", "login": "source_user",
"password": "source_pass"}
+
+ with pytest.raises(ValueError, match="target_conn must be provided for
'table_to_table' mode"):
+ prepare_tdload_job_var_file(
+ mode="table_to_table",
+ source_table="source_table",
+ select_stmt=None,
+ insert_stmt=None,
+ target_table="target_table",
+ source_file_name=None,
+ target_file_name=None,
+ source_format="",
+ target_format="",
+ source_text_delimiter="",
+ target_text_delimiter="",
+ source_conn=source_conn,
+ target_conn=None,
+ )
+
+ def test_is_valid_remote_job_var_file_success(self):
+ """Test is_valid_remote_job_var_file with valid file."""
+ mock_ssh = Mock()
+ mock_sftp = Mock()
+ mock_file_stat = Mock()
+ mock_file_stat.st_mode = stat.S_IFREG | 0o644 # Regular file
+
+ mock_sftp.stat.return_value = mock_file_stat
+ mock_ssh.open_sftp.return_value = mock_sftp
+
+ result = is_valid_remote_job_var_file(mock_ssh, "/remote/path/job.var")
+
+ assert result is True
+ mock_ssh.open_sftp.assert_called_once()
+ mock_sftp.stat.assert_called_once_with("/remote/path/job.var")
+ mock_sftp.close.assert_called_once()
+
+ def test_is_valid_remote_job_var_file_not_regular_file(self):
+ """Test is_valid_remote_job_var_file with directory."""
+ mock_ssh = Mock()
+ mock_sftp = Mock()
+ mock_file_stat = Mock()
+ mock_file_stat.st_mode = stat.S_IFDIR | 0o755 # Directory
+
+ mock_sftp.stat.return_value = mock_file_stat
+ mock_ssh.open_sftp.return_value = mock_sftp
+
+ result = is_valid_remote_job_var_file(mock_ssh,
"/remote/path/directory")
+
+ assert result is False
+
+ def test_is_valid_remote_job_var_file_not_found(self):
+ """Test is_valid_remote_job_var_file with non-existent file."""
+ mock_ssh = Mock()
+ mock_sftp = Mock()
+ mock_sftp.stat.side_effect = FileNotFoundError("File not found")
+ mock_ssh.open_sftp.return_value = mock_sftp
+ mock_logger = Mock()
+
+ result = is_valid_remote_job_var_file(mock_ssh,
"/remote/path/nonexistent", mock_logger)
+
+ assert result is False
+ mock_logger.error.assert_called_with(
+ "File does not exist on remote at : %s", "/remote/path/nonexistent"
+ )
+ mock_sftp.close.assert_called_once()
+
+ def test_is_valid_remote_job_var_file_empty_path(self):
+ """Test is_valid_remote_job_var_file with empty path."""
+ mock_ssh = Mock()
+
+ result = is_valid_remote_job_var_file(mock_ssh, "")
+
+ assert result is False
+ mock_ssh.open_sftp.assert_not_called()
+
+ def test_is_valid_remote_job_var_file_none_path(self):
+ """Test is_valid_remote_job_var_file with None path."""
+ mock_ssh = Mock()
+
+ result = is_valid_remote_job_var_file(mock_ssh, None)
+
+ assert result is False
+
+ def test_read_file_success(self):
+ """Test read_file with existing file."""
+ test_content = "Test file content\nLine 2\nLine 3"
+
+ with tempfile.NamedTemporaryFile(mode="w", delete=False,
encoding="utf-8") as tmp_file:
+ tmp_file.write(test_content)
+ tmp_file_path = tmp_file.name
+
+ try:
+ result = read_file(tmp_file_path)
+ assert result == test_content
+ finally:
+ os.unlink(tmp_file_path)
+
+ def test_read_file_with_encoding(self):
+ """Test read_file with specific encoding."""
+ test_content = "Test content with special chars: ñáéíóú"
+
+ with tempfile.NamedTemporaryFile(mode="w", delete=False,
encoding="latin-1") as tmp_file:
+ tmp_file.write(test_content)
+ tmp_file_path = tmp_file.name
+
+ try:
+ result = read_file(tmp_file_path, encoding="latin-1")
+ assert result == test_content
+ finally:
+ os.unlink(tmp_file_path)
+
+ def test_read_file_not_found(self):
+ """Test read_file with non-existent file."""
+ with pytest.raises(FileNotFoundError, match="The file
/nonexistent/file does not exist"):
+ read_file("/nonexistent/file")