vincbeck commented on code in PR #47478:
URL: https://github.com/apache/airflow/pull/47478#discussion_r1989288834
##########
airflow/example_dags/example_exasol_to_s3.py:
##########
@@ -0,0 +1,53 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+Example DAG to test Exasol connection in Apache Airflow.
+
+This DAG uses the ExasolToS3Operator to execute a simple SQL query on Exasol
+and simulate an S3 upload. The main purpose is to verify that the Exasol connection
+settings (exasol_default) are working correctly. S3 connection is simulated via dummy
+parameters (aws_default, dummy bucket and key).

Review Comment:
An example DAG is a DAG that should work out of the box (if you have valid credentials, of course). I would rephrase this docstring and just mention what this DAG is doing. This DAG should work for anyone who has Exasol and AWS permissions.
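For instance, the module docstring could be reduced to something along these lines (the wording is only a suggestion, assuming the DAG keeps exporting a query result to S3 with the existing transfer operator):

```python
"""
Example DAG demonstrating how to export data from Exasol to Amazon S3.

Runs a SQL query on Exasol and uploads the result to an S3 bucket using
ExasolToS3Operator. Requires working ``exasol_default`` and ``aws_default``
connections and write access to the target bucket.
"""
```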
##########
airflow/example_dags/example_exasol_to_s3.py:
##########
@@ -0,0 +1,53 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+Example DAG to test Exasol connection in Apache Airflow.
+
+This DAG uses the ExasolToS3Operator to execute a simple SQL query on Exasol
+and simulate an S3 upload. The main purpose is to verify that the Exasol connection
+settings (exasol_default) are working correctly. S3 connection is simulated via dummy
+parameters (aws_default, dummy bucket and key).
+"""
+
+from datetime import datetime
+from airflow import DAG
+from airflow.providers.amazon.aws.transfers.exasol_to_s3 import ExasolExportOperator
+
+default_args = {
+    "owner": "airflow",
+    "depends_on_past": False,
+}
+
+with DAG(
+    dag_id="example_exasol_to_s3",
+    default_args=default_args,
+    start_date=datetime(2025, 3, 11),
+    schedule=None,
+    catchup=False,
+    tags=["exasol", "test"],
+) as dag:
+
+    test_exasol_connection = ExasolExportOperator(

Review Comment:
`test_exasol_connection` does not describe well what this operator does.

##########
providers/amazon/src/airflow/providers/amazon/aws/transfers/exasol_to_s3.py:
##########
@@ -86,27 +223,28 @@
         self.exasol_conn_id = exasol_conn_id
         self.aws_conn_id = aws_conn_id
 
-    def execute(self, context: Context):
-        exasol_hook = ExasolHook(exasol_conn_id=self.exasol_conn_id)
-        s3_hook = S3Hook(aws_conn_id=self.aws_conn_id)
+    def execute(self, context: Context) -> str:

Review Comment:
Why modify the implementation? Did you notice anything wrong with the previous implementation?

##########
providers/amazon/docs/connections/exasol.rst:
##########
@@ -0,0 +1,174 @@
+.. _exasol_airflow_setup:

Review Comment:
Please do not duplicate documentation. Also, this documentation is related to Exasol; it has nothing to do with Amazon. As I mentioned in my previous comment, it should go in the Exasol documentation.

##########
providers/amazon/src/airflow/providers/amazon/aws/transfers/exasol_to_s3.py:
##########
@@ -31,29 +32,165 @@
 from airflow.utils.context import Context
 
 
+class ExasolExportOperator(BaseOperator):

Review Comment:
We already have `ExasolToS3Operator`. Are you sure we need this operator as well?
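For reference, the existing transfer operator already covers the export-and-upload flow in a single task, so the example DAG in this PR could probably use it directly. A minimal sketch (task id, query, bucket, and key are placeholders):

```python
from airflow.providers.amazon.aws.transfers.exasol_to_s3 import ExasolToS3Operator

# Export the result of an Exasol query and upload it to S3 in one task.
export_exasol_query_to_s3 = ExasolToS3Operator(
    task_id="export_exasol_query_to_s3",
    query_or_table="SELECT 1",           # placeholder query
    key="exports/exasol_sample.csv",     # placeholder S3 key
    bucket_name="my-example-bucket",     # placeholder bucket
    exasol_conn_id="exasol_default",
    aws_conn_id="aws_default",
)
```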
##########
providers/amazon/src/airflow/providers/amazon/aws/transfers/exasol_to_s3.py:
##########
@@ -31,29 +32,165 @@
 from airflow.utils.context import Context
 
 
+class ExasolExportOperator(BaseOperator):
+    """
+    Export data from Exasol database to a local temporary file.
+
+    :param query_or_table: The SQL statement to execute or table name to export.
+    :param export_params: Extra parameters for the underlying export_to_file method of Exasol.
+    :param query_params: Query parameters for the underlying export_to_file method.
+    :param exasol_conn_id: Reference to the Exasol connection.
+    """
+
+    template_fields: Sequence[str] = ("query_or_table", "export_params", "query_params")
+    template_ext: Sequence[str] = (".sql",)
+    ui_color = "#ededed"
+
+    def __init__(
+        self,
+        *,
+        query_or_table: str,
+        export_params: Optional[dict] = None,
+        query_params: Optional[dict] = None,
+        exasol_conn_id: str = "exasol_default",
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.query_or_table = query_or_table
+        self.export_params = export_params
+        self.query_params = query_params
+        self.exasol_conn_id = exasol_conn_id
+
+    def execute(self, context: Context) -> str:
+        exasol_hook = ExasolHook(exasol_conn_id=self.exasol_conn_id)
+        file_path: Optional[str] = None
+
+        try:
+            # Use NamedTemporaryFile with delete=False so the file remains available for the later upload
+            with NamedTemporaryFile("w+", delete=False) as tmp_file:
+                file_path = tmp_file.name
+                exasol_hook.export_to_file(
+                    filename=file_path,
+                    query_or_table=self.query_or_table,
+                    export_params=self.export_params,
+                    query_params=self.query_params,
+                )
+                tmp_file.flush()
+            self.log.info("Data successfully exported to temporary file: %s", file_path)
+            return file_path
+        except Exception as e:
+            self.log.error("Error during export from Exasol: %s", e)
+            # If an error occurs, try to remove the temporary file that was already created
+            if file_path and os.path.exists(file_path):
+                try:
+                    os.remove(file_path)
+                    self.log.info("Temporary file %s removed after error.", file_path)
+                except Exception as cleanup_error:
+                    self.log.warning("Failed to remove temporary file %s: %s", file_path, cleanup_error)
+            raise
+
+
+class S3UploadOperator(BaseOperator):

Review Comment:
`S3CreateObjectOperator` already exists.
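If a standalone upload task is really needed, the existing operator can most likely be used instead of adding a new one. A rough sketch (bucket, key, and payload are placeholders; `data` takes the object content as a string or bytes):

```python
from airflow.providers.amazon.aws.operators.s3 import S3CreateObjectOperator

# Upload an in-memory payload to S3 as a single object.
upload_export_to_s3 = S3CreateObjectOperator(
    task_id="upload_export_to_s3",
    s3_bucket="my-example-bucket",       # placeholder bucket
    s3_key="exports/exasol_sample.csv",  # placeholder key
    data="col1,col2\n1,2\n",             # placeholder payload
    replace=True,
    aws_conn_id="aws_default",
)
```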
##########
airflow/example_dags/exasol_connection.rst:
##########
@@ -0,0 +1,152 @@
+.. Licensed to the Apache Software Foundation (ASF) under one

Review Comment:
There should be no docs in the `example_dags` folder. This kind of documentation should go in `providers/exasol/docs`.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]