vincbeck commented on code in PR #28964:
URL: https://github.com/apache/airflow/pull/28964#discussion_r1071347427
##########
airflow/providers/amazon/aws/transfers/s3_to_sql.py:
##########
@@ -0,0 +1,158 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import os
+from typing import TYPE_CHECKING, Any, Sequence
+
+from airflow.exceptions import AirflowException
+from airflow.hooks.base import BaseHook
+from airflow.models import BaseOperator
+from airflow.providers.amazon.aws.hooks.s3 import S3Hook
+from airflow.providers.common.sql.hooks.sql import DbApiHook
+
+if TYPE_CHECKING:
+    from airflow.utils.context import Context
+
+from typing_extensions import Literal
+
+try:
+    import csv as csv
+except ImportError as e:
+    from airflow.exceptions import AirflowOptionalProviderFeatureException
+
+    raise AirflowOptionalProviderFeatureException from e
+
+
+class S3ToSqlOperator(BaseOperator):
+    """
+    Loads Data from S3 into a SQL Database.
+    Data should be readable as CSV.
+
+    This operator downloads a file from an S3, reads it via `csv.reader`
+    and inserts the data into a SQL database using `insert_rows` method.
+    All SQL hooks are supported, as long as it is of type DbApiHook
+
+    Extra arguments can be passed to it by using csv_reader_kwargs parameter.
+    (e.g. Use different quoting or delimiters)
+    Here you will find a list of all kwargs
+    https://docs.python.org/3/library/csv.html#csv.reader
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:S3ToSqlOperator`
+
+    :param schema: reference to a specific schema in SQL database
+    :param table: reference to a specific table in SQL database
+    :param s3_bucket: reference to a specific S3 bucket
+    :param s3_key: reference to a specific S3 key
+    :param sql_conn_id: reference to a specific SQL database. Must be of type DBApiHook
+    :param aws_conn_id: reference to a specific S3 / AWS connection
+    :param column_list: list of column names.
+        Set to `infer` if column names should be read from first line of CSV file (default)
+    :param skip_first_line: If first line of CSV file should be skipped.
+        If `column_list` is set to 'infer', this is ignored
+    :param commit_every: The maximum number of rows to insert in one
+        transaction. Set to `0` to insert all rows in one transaction.
+    :param csv_reader_kwargs: key word arguments to pass to csv.reader().
+        This lets you control how the CSV is read.
+        e.g. To use a different delimiter, pass the following dict:
+        {'delimiter' : ';'}
+    """
+
+    template_fields: Sequence[str] = (
+        "s3_bucket",
+        "s3_key",
+        "schema",
+        "table",
+        "column_list",
+        "sql_conn_id",
+    )
+    template_ext: Sequence[str] = ()
+    ui_color = "#f4a460"
+
+    def __init__(
+        self,
+        *,
+        s3_key: str,
+        s3_bucket: str,
+        table: str,
+        column_list: Literal["infer"] | list[str] | None = "infer",
+        commit_every: int = 1000,
+        schema: str | None = None,
+        skip_first_row: bool = False,
+        sql_conn_id: str = "sql_default",
+        aws_conn_id: str = "aws_default",
+        csv_reader_kwargs: dict[str, Any] | None = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.s3_bucket = s3_bucket
+        self.s3_key = s3_key
+        self.table = table
+        self.schema = schema
+        self.aws_conn_id = aws_conn_id
+        self.sql_conn_id = sql_conn_id
+        self.column_list = column_list
+        self.commit_every = commit_every
+        self.skip_first_row = skip_first_row
+        if csv_reader_kwargs:
+            self.csv_reader_kwargs = csv_reader_kwargs
+        else:
+            self.csv_reader_kwargs = {}
+
+    def execute(self, context: Context) -> None:
+
+        self.log.info("Loading %s to SQL table %s...", self.s3_key, self.table)
+
+        s3_hook = S3Hook(aws_conn_id=self.aws_conn_id)
+        self._file = s3_hook.download_file(key=self.s3_key, bucket_name=self.s3_bucket)
+
+        hook = self._get_hook()

Review Comment:
   This operator uses 2 different hooks, so it would be better to give this one a more specific name:
   ```suggestion
           db_hook = self._get_hook()
   ```

##########
airflow/providers/amazon/aws/example_dags/example_s3_to_sql.py:
##########
@@ -0,0 +1,84 @@
+# Licensed to the Apache Software Foundation (ASF) under one

Review Comment:
   Following up on the discussion in #22438, this file should be moved to `tests/system/providers/amazon/aws/example_s3.py`
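   For context, here is a rough sketch of the boilerplate such a file typically carries once it lives under `tests/system/` (AIP-47 style). The DAG id, dates and placeholder task comments below are assumptions, and the `watcher()` / `get_test_run()` wiring only illustrates the usual pattern; it is not the content of this PR's example DAG:

   ```python
   # Hypothetical skeleton of tests/system/providers/amazon/aws/example_s3_to_sql.py.
   # Task definitions are placeholders; only the system-test wiring is illustrated.
   from __future__ import annotations

   from datetime import datetime

   from airflow import DAG

   with DAG(
       dag_id="example_s3_to_sql",
       start_date=datetime(2023, 1, 1),
       schedule="@once",
       catchup=False,
       tags=["example"],
   ) as dag:
       # [START howto_transfer_s3_to_sql]
       # ... the S3ToSqlOperator task(s) from the current example DAG would go here ...
       # [END howto_transfer_s3_to_sql]

       from tests.system.utils.watcher import watcher

       # Needed to mark success/failure correctly when a tearDown task is part of the DAG.
       list(dag.tasks) >> watcher()

   from tests.system.utils import get_test_run  # noqa: E402

   # Needed to run the example DAG with pytest (see tests/system/README.md).
   test_run = get_test_run(dag)
   ```

   Keeping the `[START ...]`/`[END ...]` markers in place means the `exampleinclude` directive in the docs keeps working after the move.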
##########
docs/apache-airflow-providers-amazon/operators/transfer/s3_to_sql.rst:
##########
@@ -0,0 +1,75 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements. See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership. The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License. You may obtain a copy of the License at
+
+ .. http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied. See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+============================
+Amazon S3 to SQL
+============================

Review Comment:
   ```suggestion
   ================
   Amazon S3 to SQL
   ================
   ```

##########
airflow/providers/amazon/aws/transfers/s3_to_sql.py:
##########
@@ -0,0 +1,158 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import os
+from typing import TYPE_CHECKING, Any, Sequence
+
+from airflow.exceptions import AirflowException
+from airflow.hooks.base import BaseHook
+from airflow.models import BaseOperator
+from airflow.providers.amazon.aws.hooks.s3 import S3Hook
+from airflow.providers.common.sql.hooks.sql import DbApiHook
+
+if TYPE_CHECKING:
+    from airflow.utils.context import Context
+
+from typing_extensions import Literal
+
+try:
+    import csv as csv
+except ImportError as e:
+    from airflow.exceptions import AirflowOptionalProviderFeatureException
+
+    raise AirflowOptionalProviderFeatureException from e
+
+
+class S3ToSqlOperator(BaseOperator):
+    """
+    Loads Data from S3 into a SQL Database.
+    Data should be readable as CSV.
+
+    This operator downloads a file from an S3, reads it via `csv.reader`
+    and inserts the data into a SQL database using `insert_rows` method.
+    All SQL hooks are supported, as long as it is of type DbApiHook
+
+    Extra arguments can be passed to it by using csv_reader_kwargs parameter.
+    (e.g. Use different quoting or delimiters)
+    Here you will find a list of all kwargs
+    https://docs.python.org/3/library/csv.html#csv.reader
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:S3ToSqlOperator`
+
+    :param schema: reference to a specific schema in SQL database
+    :param table: reference to a specific table in SQL database
+    :param s3_bucket: reference to a specific S3 bucket
+    :param s3_key: reference to a specific S3 key
+    :param sql_conn_id: reference to a specific SQL database. Must be of type DBApiHook
+    :param aws_conn_id: reference to a specific S3 / AWS connection
+    :param column_list: list of column names.
+        Set to `infer` if column names should be read from first line of CSV file (default)
+    :param skip_first_line: If first line of CSV file should be skipped.
+        If `column_list` is set to 'infer', this is ignored
+    :param commit_every: The maximum number of rows to insert in one
+        transaction. Set to `0` to insert all rows in one transaction.
+    :param csv_reader_kwargs: key word arguments to pass to csv.reader().
+        This lets you control how the CSV is read.
+        e.g. To use a different delimiter, pass the following dict:
+        {'delimiter' : ';'}
+    """
+
+    template_fields: Sequence[str] = (
+        "s3_bucket",
+        "s3_key",
+        "schema",
+        "table",
+        "column_list",
+        "sql_conn_id",
+    )
+    template_ext: Sequence[str] = ()
+    ui_color = "#f4a460"
+
+    def __init__(
+        self,
+        *,
+        s3_key: str,
+        s3_bucket: str,
+        table: str,
+        column_list: Literal["infer"] | list[str] | None = "infer",
+        commit_every: int = 1000,
+        schema: str | None = None,
+        skip_first_row: bool = False,
+        sql_conn_id: str = "sql_default",
+        aws_conn_id: str = "aws_default",
+        csv_reader_kwargs: dict[str, Any] | None = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.s3_bucket = s3_bucket
+        self.s3_key = s3_key
+        self.table = table
+        self.schema = schema
+        self.aws_conn_id = aws_conn_id
+        self.sql_conn_id = sql_conn_id
+        self.column_list = column_list
+        self.commit_every = commit_every
+        self.skip_first_row = skip_first_row
+        if csv_reader_kwargs:
+            self.csv_reader_kwargs = csv_reader_kwargs
+        else:
+            self.csv_reader_kwargs = {}
+
+    def execute(self, context: Context) -> None:
+
+        self.log.info("Loading %s to SQL table %s...", self.s3_key, self.table)
+
+        s3_hook = S3Hook(aws_conn_id=self.aws_conn_id)
+        self._file = s3_hook.download_file(key=self.s3_key, bucket_name=self.s3_bucket)
+
+        hook = self._get_hook()
+        try:
+            # open with newline='' as recommended
+            # https://docs.python.org/3/library/csv.html#csv.reader
+            with open(self._file, newline="") as file:
+                reader = csv.reader(file, **self.csv_reader_kwargs)
+
+                if self.column_list == "infer":
+                    self.column_list = list(next(reader))
+                    self.log.info("Column Names inferred from csv: %s", self.column_list)
+                elif self.skip_first_row:
+                    next(reader)
+
+                hook.insert_rows(
+                    table=self.table,
+                    schema=self.schema,
+                    target_fields=self.column_list,
+                    rows=reader,
+                    commit_every=self.commit_every,
+                )
+
+        finally:
+            # Remove file downloaded from s3 to be idempotent.
+            os.remove(self._file)
+
+    def _get_hook(self) -> DbApiHook:

Review Comment:
   nit. You can decorate this function with `@cached_property`
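   A minimal sketch of that nit with `functools.cached_property`. The body of `_get_hook` is cut off in this hunk, so the hook-resolution logic and the `db_hook` name below are assumptions rather than the PR's actual code:

   ```python
   from functools import cached_property

   from airflow.exceptions import AirflowException
   from airflow.hooks.base import BaseHook
   from airflow.models import BaseOperator
   from airflow.providers.common.sql.hooks.sql import DbApiHook


   class S3ToSqlOperator(BaseOperator):
       # __init__ and execute as in the PR ...

       @cached_property
       def db_hook(self) -> DbApiHook:
           # Resolved once per task run; later accesses reuse the cached hook
           # instead of re-reading the connection on every call.
           conn = BaseHook.get_connection(self.sql_conn_id)
           hook = conn.get_hook()
           if not callable(getattr(hook, "insert_rows", None)):
               raise AirflowException(
                   f"Hook for connection {self.sql_conn_id!r} does not expose insert_rows"
               )
           return hook
   ```

   The call site in `execute` would then read `self.db_hook.insert_rows(...)` (attribute access, no call), and the connection is only resolved once per task run.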
##########
airflow/providers/amazon/aws/transfers/s3_to_sql.py:
##########
@@ -0,0 +1,158 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import os
+from typing import TYPE_CHECKING, Any, Sequence
+
+from airflow.exceptions import AirflowException
+from airflow.hooks.base import BaseHook
+from airflow.models import BaseOperator
+from airflow.providers.amazon.aws.hooks.s3 import S3Hook
+from airflow.providers.common.sql.hooks.sql import DbApiHook
+
+if TYPE_CHECKING:
+    from airflow.utils.context import Context
+
+from typing_extensions import Literal
+
+try:
+    import csv as csv
+except ImportError as e:
+    from airflow.exceptions import AirflowOptionalProviderFeatureException
+
+    raise AirflowOptionalProviderFeatureException from e
+
+
+class S3ToSqlOperator(BaseOperator):
+    """
+    Loads Data from S3 into a SQL Database.
+    Data should be readable as CSV.
+
+    This operator downloads a file from an S3, reads it via `csv.reader`
+    and inserts the data into a SQL database using `insert_rows` method.
+    All SQL hooks are supported, as long as it is of type DbApiHook

Review Comment:
   Agree with this. Either you explicitly state in the operator name that it reads only CSV, or you make the operator generic by adding a new parameter `parser`: a function responsible for reading the input from the source. The CSV handling currently hard-coded in the operator would then live in that `parser` function. Between the 2 solutions, I prefer the second one. My 2 cents
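   A rough sketch of that second option. The `parser` parameter, the `parse_csv` default, and the `parse_json_lines` example are hypothetical names used for illustration; they are not part of this PR:

   ```python
   from __future__ import annotations

   import csv
   from typing import Any, Iterator


   def parse_csv(filepath: str, **kwargs: Any) -> Iterator[list[str]]:
       """Hypothetical default parser: yield one row per line of a local CSV file."""
       # newline="" as recommended by https://docs.python.org/3/library/csv.html#csv.reader
       with open(filepath, newline="") as file:
           yield from csv.reader(file, **kwargs)


   def parse_json_lines(filepath: str) -> Iterator[list[Any]]:
       """Example of a user-supplied parser for a non-CSV format (JSON lines)."""
       import json

       with open(filepath) as file:
           for line in file:
               yield list(json.loads(line).values())


   # The operator would then accept the callable instead of hard-coding csv.reader, e.g.:
   #
   #     def __init__(self, *, parser: Callable[[str], Iterable[list]] = parse_csv, ...):
   #         self.parser = parser
   #
   # and execute() would simply pass its output through:
   #
   #     db_hook.insert_rows(table=self.table, rows=self.parser(downloaded_file), ...)
   ```

   Delimiter or quoting options could then be bound into the callable, e.g. `parser=lambda f: parse_csv(f, delimiter=";")`, instead of a dedicated `csv_reader_kwargs` argument.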
##########
docs/apache-airflow-providers-amazon/operators/transfer/s3_to_sql.rst:
##########
@@ -0,0 +1,75 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements. See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership. The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License. You may obtain a copy of the License at
+
+ .. http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied. See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+============================
+Amazon S3 to SQL
+============================
+
+Use the ``S3ToSqlOperator`` transfer to copy data from an Amazon Simple Storage Service (S3) file into an existing
+SQL table.
+Only CSV Format is supported.
+
+
+Prerequisite Tasks
+------------------
+
+.. include:: ../_partials/prerequisite_tasks.rst
+
+Operators
+---------
+
+.. _howto/operator:S3ToSqlOperator:
+
+Amazon S3 To SQL Transfer Operator
+==============================================
+
+To get more information about this operator visit:
+:class:`~airflow.providers.amazon.aws.transfers.s3_to_sql.S3ToSqlOperator`
+
+Example usage:
+
+.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_s3_to_sql.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_transfer_s3_to_sql]
+    :end-before: [END howto_transfer_s3_to_sql]
+
+
+

Review Comment:
   ```suggestion
   ```

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]