josh-fell commented on code in PR #28964:
URL: https://github.com/apache/airflow/pull/28964#discussion_r1072499703


##########
airflow/providers/amazon/aws/transfers/s3_to_sql.py:
##########
@@ -0,0 +1,158 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import os
+from typing import TYPE_CHECKING, Any, Sequence
+
+from airflow.exceptions import AirflowException
+from airflow.hooks.base import BaseHook
+from airflow.models import BaseOperator
+from airflow.providers.amazon.aws.hooks.s3 import S3Hook
+from airflow.providers.common.sql.hooks.sql import DbApiHook
+
+if TYPE_CHECKING:
+    from airflow.utils.context import Context
+
+from typing_extensions import Literal
+
+try:
+    import csv as csv
+except ImportError as e:
+    from airflow.exceptions import AirflowOptionalProviderFeatureException
+
+    raise AirflowOptionalProviderFeatureException from e
+
+
+class S3ToSqlOperator(BaseOperator):
+    """
+        Loads Data from S3 into a SQL Database.
+        Data should be readable as CSV.
+
+        This operator downloads a file from an S3, reads it via `csv.reader`
+        and inserts the data into a SQL database using `insert_rows` method.
+        All SQL hooks are supported, as long as it is of type DbApiHook
+
+        Extra arguments can be passed to it by using csv_reader_kwargs parameter.
+        (e.g. Use different quoting or delimiters)
+        Here you will find a list of all kwargs
+        https://docs.python.org/3/library/csv.html#csv.reader
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:S3ToSqlOperator`
+
+    :param schema: reference to a specific schema in SQL database
+    :param table: reference to a specific table in SQL database
+    :param s3_bucket: reference to a specific S3 bucket
+    :param s3_key: reference to a specific S3 key
+    :param sql_conn_id: reference to a specific SQL database. Must be of type DBApiHook
+    :param aws_conn_id: reference to a specific S3 / AWS connection
+    :param column_list: list of column names.
+        Set to `infer` if column names should be read from first line of CSV file (default)
+    :param skip_first_line: If first line of CSV file should be skipped.
+        If `column_list` is set to 'infer', this is ignored
+    :param commit_every: The maximum number of rows to insert in one
+        transaction. Set to `0` to insert all rows in one transaction.
+    :param csv_reader_kwargs: key word arguments to pass to csv.reader().
+        This lets you control how the CSV is read.
+        e.g. To use a different delimiter, pass the following dict:
+        {'delimiter' : ';'}
+    """
+
+    template_fields: Sequence[str] = (
+        "s3_bucket",
+        "s3_key",
+        "schema",
+        "table",
+        "column_list",
+        "sql_conn_id",
+    )
+    template_ext: Sequence[str] = ()
+    ui_color = "#f4a460"
+
+    def __init__(
+        self,
+        *,
+        s3_key: str,
+        s3_bucket: str,
+        table: str,
+        column_list: Literal["infer"] | list[str] | None = "infer",
+        commit_every: int = 1000,
+        schema: str | None = None,
+        skip_first_row: bool = False,
+        sql_conn_id: str = "sql_default",
+        aws_conn_id: str = "aws_default",
+        csv_reader_kwargs: dict[str, Any] | None = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.s3_bucket = s3_bucket
+        self.s3_key = s3_key
+        self.table = table
+        self.schema = schema
+        self.aws_conn_id = aws_conn_id
+        self.sql_conn_id = sql_conn_id
+        self.column_list = column_list
+        self.commit_every = commit_every
+        self.skip_first_row = skip_first_row
+        if csv_reader_kwargs:
+            self.csv_reader_kwargs = csv_reader_kwargs
+        else:
+            self.csv_reader_kwargs = {}

Review Comment:
   ```suggestion
           self.csv_reader_kwargs = csv_reader_kwargs or {}
   ```
   Perhaps a nit, but seems more readable.



##########
airflow/providers/amazon/aws/transfers/s3_to_sql.py:
##########
@@ -0,0 +1,158 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import os
+from typing import TYPE_CHECKING, Any, Sequence
+
+from airflow.exceptions import AirflowException
+from airflow.hooks.base import BaseHook
+from airflow.models import BaseOperator
+from airflow.providers.amazon.aws.hooks.s3 import S3Hook
+from airflow.providers.common.sql.hooks.sql import DbApiHook
+
+if TYPE_CHECKING:
+    from airflow.utils.context import Context
+
+from typing_extensions import Literal

Review Comment:
   ```suggestion
   from airflow.typing_compat import Literal
   ```
   Let's use the compat module here instead.



##########
docs/apache-airflow-providers-amazon/operators/transfer/s3_to_sql.rst:
##########
@@ -0,0 +1,75 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+============================
+Amazon S3 to SQL
+============================
+
+Use the ``S3ToSqlOperator`` transfer to copy data from an Amazon Simple Storage Service (S3) file into an existing
+SQL table.
+Only CSV Format is supported.

Review Comment:
   Should the operator fail fast, prior to downloading from S3, if the file is not a CSV? Can this be implied by the `s3_key` value?
   
   If you agree, then let's add a test for this situation too.
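
   For illustration, a rough sketch of what such a fail-fast check could look like (the helper name and the extension-only heuristic are assumptions on my part, not something the PR already has):

   ```python
   # Sketch only: gate on the key's extension before calling download_file().
   # A key without a ".csv" suffix could still hold valid CSV data, so this
   # might be better as an opt-out flag or a warning rather than a hard error.
   from pathlib import PurePosixPath

   from airflow.exceptions import AirflowException


   def _validate_key_looks_like_csv(s3_key: str) -> None:
       """Raise before downloading if the S3 key does not end in .csv."""
       if PurePosixPath(s3_key).suffix.lower() != ".csv":
           raise AirflowException(f"S3 key {s3_key!r} does not appear to be a CSV file")
   ```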



##########
airflow/providers/amazon/aws/transfers/s3_to_sql.py:
##########
@@ -0,0 +1,158 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import os
+from typing import TYPE_CHECKING, Any, Sequence
+
+from airflow.exceptions import AirflowException
+from airflow.hooks.base import BaseHook
+from airflow.models import BaseOperator
+from airflow.providers.amazon.aws.hooks.s3 import S3Hook
+from airflow.providers.common.sql.hooks.sql import DbApiHook
+
+if TYPE_CHECKING:
+    from airflow.utils.context import Context
+
+from typing_extensions import Literal
+
+try:
+    import csv as csv
+except ImportError as e:
+    from airflow.exceptions import AirflowOptionalProviderFeatureException
+
+    raise AirflowOptionalProviderFeatureException from e
+
+
+class S3ToSqlOperator(BaseOperator):
+    """
+        Loads Data from S3 into a SQL Database.
+        Data should be readable as CSV.
+
+        This operator downloads a file from an S3, reads it via `csv.reader`
+        and inserts the data into a SQL database using `insert_rows` method.
+        All SQL hooks are supported, as long as it is of type DbApiHook
+
+        Extra arguments can be passed to it by using csv_reader_kwargs parameter.
+        (e.g. Use different quoting or delimiters)
+        Here you will find a list of all kwargs
+        https://docs.python.org/3/library/csv.html#csv.reader
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:S3ToSqlOperator`
+
+    :param schema: reference to a specific schema in SQL database
+    :param table: reference to a specific table in SQL database
+    :param s3_bucket: reference to a specific S3 bucket
+    :param s3_key: reference to a specific S3 key
+    :param sql_conn_id: reference to a specific SQL database. Must be of type DBApiHook
+    :param aws_conn_id: reference to a specific S3 / AWS connection
+    :param column_list: list of column names.
+        Set to `infer` if column names should be read from first line of CSV file (default)
+    :param skip_first_line: If first line of CSV file should be skipped.
+        If `column_list` is set to 'infer', this is ignored
+    :param commit_every: The maximum number of rows to insert in one
+        transaction. Set to `0` to insert all rows in one transaction.
+    :param csv_reader_kwargs: key word arguments to pass to csv.reader().
+        This lets you control how the CSV is read.
+        e.g. To use a different delimiter, pass the following dict:
+        {'delimiter' : ';'}
+    """
+
+    template_fields: Sequence[str] = (
+        "s3_bucket",
+        "s3_key",
+        "schema",
+        "table",
+        "column_list",
+        "sql_conn_id",
+    )
+    template_ext: Sequence[str] = ()
+    ui_color = "#f4a460"
+
+    def __init__(
+        self,
+        *,
+        s3_key: str,
+        s3_bucket: str,
+        table: str,
+        column_list: Literal["infer"] | list[str] | None = "infer",
+        commit_every: int = 1000,
+        schema: str | None = None,
+        skip_first_row: bool = False,
+        sql_conn_id: str = "sql_default",
+        aws_conn_id: str = "aws_default",
+        csv_reader_kwargs: dict[str, Any] | None = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.s3_bucket = s3_bucket
+        self.s3_key = s3_key
+        self.table = table
+        self.schema = schema
+        self.aws_conn_id = aws_conn_id
+        self.sql_conn_id = sql_conn_id
+        self.column_list = column_list
+        self.commit_every = commit_every
+        self.skip_first_row = skip_first_row
+        if csv_reader_kwargs:
+            self.csv_reader_kwargs = csv_reader_kwargs
+        else:
+            self.csv_reader_kwargs = {}
+
+    def execute(self, context: Context) -> None:
+
+        self.log.info("Loading %s to SQL table %s...", self.s3_key, self.table)
+
+        s3_hook = S3Hook(aws_conn_id=self.aws_conn_id)
+        self._file = s3_hook.download_file(key=self.s3_key, bucket_name=self.s3_bucket)
+
+        hook = self._get_hook()
+        try:
+            # open with newline='' as recommended
+            # https://docs.python.org/3/library/csv.html#csv.reader
+            with open(self._file, newline="") as file:
+                reader = csv.reader(file, **self.csv_reader_kwargs)
+
+                if self.column_list == "infer":
+                    self.column_list = list(next(reader))
+                    self.log.info("Column Names inferred from csv: %s", self.column_list)
+                elif self.skip_first_row:
+                    next(reader)
+
+                hook.insert_rows(
+                    table=self.table,
+                    schema=self.schema,
+                    target_fields=self.column_list,
+                    rows=reader,
+                    commit_every=self.commit_every,
+                )
+
+        finally:
+            # Remove file downloaded from s3 to be idempotent.
+            os.remove(self._file)
+
+    def _get_hook(self) -> DbApiHook:

Review Comment:
   Perhaps this doesn't need to be a separate helper method at all, since it's only used in the `execute()` method?
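
   If we inline it, roughly something like this inside `execute()` would do (the body of `_get_hook` is cut off in this hunk, so the exact connection lookup below is an assumption based on the `BaseHook`/`DbApiHook` imports above):

   ```python
   # Sketch only: resolve the SQL hook directly where it is used.
   hook = BaseHook.get_connection(self.sql_conn_id).get_hook()
   if not isinstance(hook, DbApiHook):
       raise AirflowException(
           f"Connection {self.sql_conn_id!r} does not resolve to a DbApiHook, "
           f"got {type(hook).__name__} instead."
       )
   ```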



##########
docs/apache-airflow-providers-amazon/operators/transfer/s3_to_sql.rst:
##########
@@ -0,0 +1,75 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+============================
+Amazon S3 to SQL
+============================
+
+Use the ``S3ToSqlOperator`` transfer to copy data from an Amazon Simple Storage Service (S3) file into an existing
+SQL table.
+Only CSV Format is supported.
+
+
+Prerequisite Tasks
+------------------
+
+.. include:: ../_partials/prerequisite_tasks.rst
+
+Operators
+---------
+
+.. _howto/operator:S3ToSqlOperator:
+
+Amazon S3 To SQL Transfer Operator
+==============================================
+
+To get more information about this operator visit:
+:class:`~airflow.providers.amazon.aws.transfers.s3_to_sql.S3ToSqlOperator`
+
+Example usage:
+
+.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_s3_to_sql.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_transfer_s3_to_sql]
+    :end-before: [END howto_transfer_s3_to_sql]
+
+
+
+
+You can also pass **column_list='inferred** if you want the operator to read the column names from the first row of the CSV File:

Review Comment:
   ```suggestion
   You can also pass **column_list='infer'** if you want the operator to read the column names from the first row of the CSV File:
   ```
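
   For what it's worth, a minimal usage sketch with inferred column names (the task id, connection ids, bucket and key below are placeholders, not taken from the PR):

   ```python
   # Hypothetical task definition; bucket, key and connection ids are placeholders.
   transfer_s3_to_sql = S3ToSqlOperator(
       task_id="transfer_s3_to_sql",
       s3_bucket="my-example-bucket",
       s3_key="exports/sample.csv",
       table="my_table",
       column_list="infer",  # read column names from the first CSV row
       sql_conn_id="sql_default",
       aws_conn_id="aws_default",
   )
   ```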



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
