snreddygopu commented on code in PR #58918:
URL: https://github.com/apache/airflow/pull/58918#discussion_r2583550127


##########
providers/teradata/src/airflow/providers/teradata/hooks/tpt.py:
##########
@@ -240,6 +241,244 @@ def _execute_tbuild_locally(
                 secure_delete(local_script_file, logging.getLogger(__name__))
                 terminate_subprocess(sp, logging.getLogger(__name__))
 
+    def execute_tdload(
+        self,
+        remote_working_dir: str,
+        job_var_content: str | None = None,
+        tdload_options: str | None = None,
+        tdload_job_name: str | None = None,
+    ) -> int:
+        """
+        Execute a tdload operation using the tdload command-line utility.
+
+        Args:
+            remote_working_dir: Remote working directory for SSH execution
+            job_var_content: Content of the job variable file
+            tdload_options: Additional command-line options for tdload
+            tdload_job_name: Name for the tdload job
+
+        Returns:
+            Exit code from the tdload operation
+
+        Raises:
+            RuntimeError: Non-zero tdload exit status or unexpected execution 
failure
+            ConnectionError: SSH connection not established or fails
+            TimeoutError: SSH connection/network timeout
+            FileNotFoundError: tdload binary not found in PATH
+        """
+        tdload_job_name = tdload_job_name or f"tdload_job_{uuid.uuid4().hex}"
+        if self.ssh_hook:
+            self.log.info("Executing tdload via SSH on remote host with job 
name: %s", tdload_job_name)
+            return self._execute_tdload_via_ssh(
+                remote_working_dir, job_var_content, tdload_options, 
tdload_job_name
+            )
+        self.log.info("Executing tdload locally with job name: %s", 
tdload_job_name)
+        return self._execute_tdload_locally(job_var_content, tdload_options, 
tdload_job_name)
+
+    def _execute_tdload_via_ssh(
+        self,
+        remote_working_dir: str,
+        job_var_content: str | None,
+        tdload_options: str | None,
+        tdload_job_name: str | None,
+    ) -> int:
+        """
+        Write job_var_content to a temporary file, then transfer and execute 
it on the remote host.
+
+        Args:
+            remote_working_dir: Remote working directory
+            job_var_content: Content for the job variable file
+            tdload_options: Additional tdload command options
+            tdload_job_name: Name for the tdload job
+
+        Returns:
+            Exit code from the tdload operation
+        """
+        with self.preferred_temp_directory() as tmp_dir:
+            local_job_var_file = os.path.join(tmp_dir, 
f"tdload_job_var_{uuid.uuid4().hex}.txt")
+            write_file(local_job_var_file, job_var_content or "")
+            return self._transfer_to_and_execute_tdload_on_remote(
+                local_job_var_file, remote_working_dir, tdload_options, 
tdload_job_name
+            )
+
+    def _transfer_to_and_execute_tdload_on_remote(
+        self,
+        local_job_var_file: str,
+        remote_working_dir: str,
+        tdload_options: str | None,
+        tdload_job_name: str | None,
+    ) -> int:
+        """Transfer job variable file to remote host and execute tdload 
command."""
+        encrypted_file_path = f"{local_job_var_file}.enc"
+        remote_encrypted_job_file = os.path.join(remote_working_dir, 
os.path.basename(encrypted_file_path))
+        remote_job_file = os.path.join(remote_working_dir, 
os.path.basename(local_job_var_file))
+
+        try:
+            if not self.ssh_hook:
+                raise ConnectionError("SSH connection is not established. 
`ssh_hook` is None or invalid.")
+            with self.ssh_hook.get_conn() as ssh_client:
+                verify_tpt_utility_on_remote_host(ssh_client, "tdload", 
logging.getLogger(__name__))
+                password = generate_random_password()
+                generate_encrypted_file_with_openssl(local_job_var_file, 
password, encrypted_file_path)
+                transfer_file_sftp(
+                    ssh_client, encrypted_file_path, 
remote_encrypted_job_file, logging.getLogger(__name__)
+                )
+                decrypt_remote_file(
+                    ssh_client,
+                    remote_encrypted_job_file,
+                    remote_job_file,
+                    password,
+                    logging.getLogger(__name__),
+                )
+
+                set_remote_file_permissions(ssh_client, remote_job_file, 
logging.getLogger(__name__))
+
+                # Build tdload command more robustly
+                tdload_cmd = self._build_tdload_command(remote_job_file, 
tdload_options, tdload_job_name)
+
+                self.log.info("=" * 80)
+                self.log.info("Executing tdload command on remote server: %s", 
" ".join(tdload_cmd))
+                self.log.info("=" * 80)

Review Comment:
   I added the `self.log.info("=" * 80)`  lines intentionally to provide visual 
separation in the logs, making it easier for users to quickly identify the 
actual `tdload` command that is going to be executed. If you prefer a cleaner 
log output, I can remove them. Kindly let me know your preference.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to