vincbeck commented on code in PR #47478:
URL: https://github.com/apache/airflow/pull/47478#discussion_r1989288834


##########
airflow/example_dags/example_exasol_to_s3.py:
##########
@@ -0,0 +1,53 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+Example DAG to test Exasol connection in Apache Airflow.
+
+This DAG uses the ExasolToS3Operator to execute a simple SQL query on Exasol
+and simulate an S3 upload. The main purpose is to verify that the Exasol connection
+settings (exasol_default) are working correctly. S3 connection is simulated via dummy
+parameters (aws_default, dummy bucket and key).

Review Comment:
   An example DAG is a DAG that should work out of the box (given valid credentials, of course). I would rephrase this docstring and just describe what the DAG does. This DAG should work for someone who has Exasol and AWS permissions.
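   For illustration only, a possible rewording of the module docstring that simply describes what the DAG does (wording is a suggestion, not taken from the PR):

       """
       Example DAG exporting data from Exasol to S3.

       Runs a SQL query on Exasol and uploads the result to an S3 bucket. It assumes
       working `exasol_default` and `aws_default` connections and an S3 bucket the
       AWS credentials can write to.
       """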



##########
airflow/example_dags/example_exasol_to_s3.py:
##########
@@ -0,0 +1,53 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+Example DAG to test Exasol connection in Apache Airflow.
+
+This DAG uses the ExasolToS3Operator to execute a simple SQL query on Exasol
+and simulate an S3 upload. The main purpose is to verify that the Exasol connection
+settings (exasol_default) are working correctly. S3 connection is simulated via dummy
+parameters (aws_default, dummy bucket and key).
+"""
+
+from datetime import datetime
+from airflow import DAG
+from airflow.providers.amazon.aws.transfers.exasol_to_s3 import ExasolExportOperator
+
+default_args = {
+    "owner": "airflow",
+    "depends_on_past": False,
+}
+
+with DAG(
+    dag_id="example_exasol_to_s3",
+    default_args=default_args,
+    start_date=datetime(2025, 3, 11),
+    schedule=None,
+    catchup=False,
+    tags=["exasol", "test"],
+) as dag:
+
+    test_exasol_connection = ExasolExportOperator(

Review Comment:
   `test_exasol_connection` does not clearly describe what this operator does.
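   For example, a task id named after the action would be clearer (the name and query below are only suggestions):

       export_exasol_to_s3 = ExasolExportOperator(
           task_id="export_exasol_to_s3",
           query_or_table="SELECT 1",
           exasol_conn_id="exasol_default",
       )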



##########
providers/amazon/src/airflow/providers/amazon/aws/transfers/exasol_to_s3.py:
##########
@@ -86,27 +223,28 @@ def __init__(
         self.exasol_conn_id = exasol_conn_id
         self.aws_conn_id = aws_conn_id
 
-    def execute(self, context: Context):
-        exasol_hook = ExasolHook(exasol_conn_id=self.exasol_conn_id)
-        s3_hook = S3Hook(aws_conn_id=self.aws_conn_id)
+    def execute(self, context: Context) -> str:

Review Comment:
   Why modify the implementation? Did you notice anything wrong with the previous one?



##########
providers/amazon/docs/connections/exasol.rst:
##########
@@ -0,0 +1,174 @@
+.. _exasol_airflow_setup:

Review Comment:
   Please do not duplicate documentation. Also, this documentation is related to Exasol; it has nothing to do with Amazon. As I mentioned in my previous comment, it should go in the Exasol documentation.



##########
providers/amazon/src/airflow/providers/amazon/aws/transfers/exasol_to_s3.py:
##########
@@ -31,29 +32,165 @@
     from airflow.utils.context import Context
 
 
+class ExasolExportOperator(BaseOperator):

Review Comment:
   We already have `ExasolToS3Operator`; are you sure we need this operator as well?
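   For reference, a minimal sketch of the existing `ExasolToS3Operator`, which already covers the export-and-upload path (bucket, key and query values are placeholders):

       export_to_s3 = ExasolToS3Operator(
           task_id="export_to_s3",
           query_or_table="SELECT 1",
           key="exasol/export.csv",          # placeholder object key
           bucket_name="my-example-bucket",  # placeholder bucket
           exasol_conn_id="exasol_default",
           aws_conn_id="aws_default",
       )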



##########
providers/amazon/src/airflow/providers/amazon/aws/transfers/exasol_to_s3.py:
##########
@@ -31,29 +32,165 @@
     from airflow.utils.context import Context
 
 
+class ExasolExportOperator(BaseOperator):
+    """
+    Export data from Exasol database to a local temporary file.
+
+    :param query_or_table: The SQL statement to execute or table name to export.
+    :param export_params: Extra parameters for the underlying export_to_file method of Exasol.
+    :param query_params: Query parameters for the underlying export_to_file method.
+    :param exasol_conn_id: Reference to the Exasol connection.
+    """
+
+    template_fields: Sequence[str] = ("query_or_table", "export_params", "query_params")
+    template_ext: Sequence[str] = (".sql",)
+    ui_color = "#ededed"
+
+    def __init__(
+        self,
+        *,
+        query_or_table: str,
+        export_params: Optional[dict] = None,
+        query_params: Optional[dict] = None,
+        exasol_conn_id: str = "exasol_default",
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.query_or_table = query_or_table
+        self.export_params = export_params
+        self.query_params = query_params
+        self.exasol_conn_id = exasol_conn_id
+
+    def execute(self, context: Context) -> str:
+        exasol_hook = ExasolHook(exasol_conn_id=self.exasol_conn_id)
+        file_path: Optional[str] = None
+
+        try:
+            # Use NamedTemporaryFile with delete=False so the file remains available for the subsequent upload
+            with NamedTemporaryFile("w+", delete=False) as tmp_file:
+                file_path = tmp_file.name
+                exasol_hook.export_to_file(
+                    filename=file_path,
+                    query_or_table=self.query_or_table,
+                    export_params=self.export_params,
+                    query_params=self.query_params,
+                )
+                tmp_file.flush()
+                self.log.info("Data successfully exported to temporary file: 
%s", file_path)
+            return file_path
+        except Exception as e:
+            self.log.error("Error during export from Exasol: %s", e)
+            # If an error occurs, try to remove the temporary file that was created
+            if file_path and os.path.exists(file_path):
+                try:
+                    os.remove(file_path)
+                    self.log.info("Temporary file %s removed after error.", 
file_path)
+                except Exception as cleanup_error:
+                    self.log.warning("Failed to remove temporary file %s: %s", 
file_path, cleanup_error)
+            raise
+
+
+class S3UploadOperator(BaseOperator):

Review Comment:
   `S3CreateObjectOperator` already exists
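   For reference, a minimal sketch using the existing `S3CreateObjectOperator` for the upload step (bucket, key and data values are placeholders):

       from airflow.providers.amazon.aws.operators.s3 import S3CreateObjectOperator

       upload_to_s3 = S3CreateObjectOperator(
           task_id="upload_to_s3",
           s3_bucket="my-example-bucket",   # placeholder bucket
           s3_key="exasol/export.csv",      # placeholder object key
           data="placeholder file contents",
           aws_conn_id="aws_default",
       )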



##########
airflow/example_dags/exasol_connection.rst:
##########
@@ -0,0 +1,152 @@
+.. Licensed to the Apache Software Foundation (ASF) under one

Review Comment:
   There should be no docs in the `example_dags` folder. This kind of documentation should go in `providers/exasol/docs`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
