jieyao-MilestoneHub commented on code in PR #47478:
URL: https://github.com/apache/airflow/pull/47478#discussion_r1990482832


##########
providers/amazon/src/airflow/providers/amazon/aws/transfers/exasol_to_s3.py:
##########
@@ -31,29 +32,165 @@
     from airflow.utils.context import Context
 
 
+class ExasolExportOperator(BaseOperator):
+    """
+    Export data from Exasol database to a local temporary file.
+
+    :param query_or_table: The SQL statement to execute or table name to 
export.
+    :param export_params: Extra parameters for the underlying export_to_file 
method of Exasol.
+    :param query_params: Query parameters for the underlying export_to_file 
method.
+    :param exasol_conn_id: Reference to the Exasol connection.
+    """
+
+    template_fields: Sequence[str] = ("query_or_table", "export_params", 
"query_params")
+    template_ext: Sequence[str] = (".sql",)
+    ui_color = "#ededed"
+
+    def __init__(
+        self,
+        *,
+        query_or_table: str,
+        export_params: Optional[dict] = None,
+        query_params: Optional[dict] = None,
+        exasol_conn_id: str = "exasol_default",
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.query_or_table = query_or_table
+        self.export_params = export_params
+        self.query_params = query_params
+        self.exasol_conn_id = exasol_conn_id
+
+    def execute(self, context: Context) -> str:
+        exasol_hook = ExasolHook(exasol_conn_id=self.exasol_conn_id)
+        file_path: Optional[str] = None
+
+        try:
+            # Use NamedTemporaryFile with delete=False so the file remains available for the subsequent upload
+            with NamedTemporaryFile("w+", delete=False) as tmp_file:
+                file_path = tmp_file.name
+                exasol_hook.export_to_file(
+                    filename=file_path,
+                    query_or_table=self.query_or_table,
+                    export_params=self.export_params,
+                    query_params=self.query_params,
+                )
+                tmp_file.flush()
+                self.log.info("Data successfully exported to temporary file: 
%s", file_path)
+            return file_path
+        except Exception as e:
+            self.log.error("Error during export from Exasol: %s", e)
+            # On error, try to remove the temporary file that was already created
+            if file_path and os.path.exists(file_path):
+                try:
+                    os.remove(file_path)
+                    self.log.info("Temporary file %s removed after error.", 
file_path)
+                except Exception as cleanup_error:
+                    self.log.warning("Failed to remove temporary file %s: %s", 
file_path, cleanup_error)
+            raise
+
+
+class S3UploadOperator(BaseOperator):

Review Comment:
   Replied in the previous comment.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to