josh-fell commented on a change in pull request #19137:
URL: https://github.com/apache/airflow/pull/19137#discussion_r744830614

########## File path: airflow/providers/amazon/aws/operators/redshift_data.py ##########
@@ -0,0 +1,127 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from time import sleep
+from typing import Optional
+
+from airflow.exceptions import AirflowException
+from airflow.models import BaseOperator
+from airflow.providers.amazon.aws.hooks.redshift_data import RedshiftDataHook
+
+
+class RedshiftDataOperator(BaseOperator):
+    """
+    Executes SQL Statements against an Amazon Redshift cluster using Redshift Data
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:RedshiftDataOperator`
+
+    :param sql: the sql code to be executed
+    :type sql: Can receive a str representing a sql statement,
+        or an iterable of str (sql statements)
+    :param aws_conn_id: AWS connection id (default: aws_default)
+    :type aws_conn_id: str
+    :param parameters: (optional) the parameters to render the SQL query with.
+    :type parameters: dict or iterable
+    :param autocommit: if True, each command is automatically committed.
+        (default value: False)
+    :type autocommit: bool
+    """
+
+    template_fields = ('sql',)

Review comment:
Any other fields to add here that users might want to have flexibility to dynamically generate a value for? Maybe `parameters`, `cluster_identifier`, `database`, and/or `db_user`. The latter 3 could maybe be added to a Connection Extra and accessed with Jinja now that the `Connection` object is accessible in the template context like `"{{ conn.conn_id.extra_dejson.<attr> }}"` when calling the operator. No strong opinions though, just something to think about.
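To make that concrete, a rough sketch of what the expanded attribute could look like (hedged: it assumes the constructor keeps the `parameters`, `cluster_identifier`, `database`, and `db_user` argument names, and `redshift_default` below is just a hypothetical connection id):

```python
from airflow.models import BaseOperator


class RedshiftDataOperator(BaseOperator):
    # Expose the other constructor args for templating so users can generate their values
    # dynamically, e.g. from a Connection Extra via Jinja at DAG-authoring time.
    template_fields = ('sql', 'parameters', 'cluster_identifier', 'database', 'db_user')
    template_ext = ('.sql',)


# A templated value could then be read from a Connection Extra when calling the operator, e.g.:
#   cluster_identifier="{{ conn.redshift_default.extra_dejson.cluster_identifier }}"
```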
########## File path: airflow/providers/amazon/aws/example_dags/example_redshift_data_execute_sql.py ##########
@@ -0,0 +1,83 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from datetime import datetime, timedelta
+from os import getenv
+
+from airflow.decorators import dag, task
+from airflow.providers.amazon.aws.hooks.redshift_data import RedshiftDataHook
+from airflow.providers.amazon.aws.operators.redshift_data import RedshiftDataOperator
+
+# [START howto_operator_redshift_data_env_variables]
+REDSHIFT_CLUSTER_IDENTIFIER = getenv("REDSHIFT_CLUSTER_IDENTIFIER", "test-cluster")
+REDSHIFT_DATABASE = getenv("REDSHIFT_DATABASE", "test-database")
+REDSHIFT_DATABASE_USER = getenv("REDSHIFT_DATABASE_USER", "awsuser")
+# [END howto_operator_redshift_data_env_variables]
+
+REDSHIFT_QUERY = """
+SELECT table_schema,
+       table_name
+FROM information_schema.tables
+WHERE table_schema NOT IN ('information_schema', 'pg_catalog')
+      AND table_type = 'BASE TABLE'
+ORDER BY table_schema,
+         table_name;
+    """
+POLL_INTERVAL = 10
+TIMEOUT = 600
+
+
+# [START howto_redshift_data]
+@dag(
+    dag_id='example_redshift_data',
+    schedule_interval=None,
+    start_date=datetime(2021, 1, 1),
+    dagrun_timeout=timedelta(minutes=60),
+    tags=['example'],
+    catchup=False,
+)
+def example_redshift_data():
+    @task(task_id="output_results")
+    def output_results_fn(id):
+        """This is a python decorator task that returns a Redshift query"""
+        hook = RedshiftDataHook()
+
+        resp = hook.get_statement_result(
+            id=id,
+        )
+        print(resp)
+        return resp
+
+    # Run a SQL statement and wait for completion
+    redshift_query = RedshiftDataOperator(
+        task_id='redshift_query',
+        cluster_identifier=REDSHIFT_CLUSTER_IDENTIFIER,
+        database=REDSHIFT_DATABASE,
+        db_user=REDSHIFT_DATABASE_USER,
+        sql=REDSHIFT_QUERY,
+        timeout=TIMEOUT,
+        poll_interval=POLL_INTERVAL,
+    )
+
+    # Using a task-decorated function to output the list of tables in a Redshift cluster
+    redshift_output = output_results_fn("{{task_instance.xcom_pull('redshift_query', key='return_value')}}")
+
+    redshift_query >> redshift_output

Review comment:
```suggestion
    # Using a task-decorated function to output the list of tables in a Redshift cluster
    redshift_output = output_results_fn(redshift_query.output)
```
This can be simplified by using `XComArgs` to access the return value of the `RedshiftDataOperator` task directly, without Jinja, via the `.output` property. The task dependency would then be handled implicitly as well. Similar updates were made to the existing example DAGs as part of [this issue](https://github.com/apache/airflow/issues/10285). We should continue this approach when possible with new example DAGs.
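To illustrate the implicit dependency, a minimal sketch of how the tail of the taskflow function would then read (names reused from the example DAG above, nothing else changes):

```python
    # Passing the operator's XComArg into the decorated task pulls the return value at runtime
    # and wires the dependency, so both the Jinja xcom_pull string and the explicit
    # `redshift_query >> redshift_output` line can be dropped.
    redshift_output = output_results_fn(redshift_query.output)
```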
########## File path: airflow/providers/amazon/aws/operators/redshift_data.py ##########
@@ -0,0 +1,127 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from time import sleep
+from typing import Optional
+
+from airflow.exceptions import AirflowException
+from airflow.models import BaseOperator
+from airflow.providers.amazon.aws.hooks.redshift_data import RedshiftDataHook
+
+
+class RedshiftDataOperator(BaseOperator):

Review comment:
Since the `RedshiftDataHook.cancel_statement()` method is being introduced, WDYT about adding an `on_kill()` method override for this operator which runs the `cancel_statement()` method? This would give users an opportunity to kill the query from the UI if needed.
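For reference, a rough sketch of the kind of override this could be. It is hedged: `self.statement_id` and the `cancel_statement(id=...)` signature are placeholders and would need to match whatever the operator and hook in this PR actually expose, and the hook constructor is assumed to accept `aws_conn_id` like other AWS hooks:

```python
    def on_kill(self) -> None:
        """Best-effort cancellation of the in-flight statement when the task is killed from the UI."""
        # `statement_id` stands in for however the operator tracks the running statement id.
        if getattr(self, 'statement_id', None):
            hook = RedshiftDataHook(aws_conn_id=self.aws_conn_id)
            hook.cancel_statement(id=self.statement_id)
```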
########## File path: airflow/providers/amazon/aws/operators/redshift_data.py ##########
@@ -0,0 +1,127 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from time import sleep
+from typing import Optional
+
+from airflow.exceptions import AirflowException
+from airflow.models import BaseOperator
+from airflow.providers.amazon.aws.hooks.redshift_data import RedshiftDataHook
+
+
+class RedshiftDataOperator(BaseOperator):
+    """
+    Executes SQL Statements against an Amazon Redshift cluster using Redshift Data
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:RedshiftDataOperator`
+
+    :param sql: the sql code to be executed
+    :type sql: Can receive a str representing a sql statement,
+        or an iterable of str (sql statements)

Review comment:
```suggestion
    :param sql: the sql code to be executed
    :type sql: str
```
Should update this if support for a list of SQL statements won't be included in this PR.

########## File path: docs/apache-airflow-providers-amazon/operators/redshift_data.rst ##########
@@ -0,0 +1,51 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements. See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership. The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License. You may obtain a copy of the License at
+
+ .. http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied. See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+.. _howto/operator:RedshiftDataOperator:
+
+RedshiftDataOperator
+====================
+
+.. contents::
+  :depth: 1
+  :local:
+
+Overview
+--------
+
+Use the :class:`RedshiftDataOperator <airflow.providers.amazon.aws.operators.redshift_data>` to execute
+statements against an Amazon Redshift cluster.
+
+
+example_redshift_data_execute_sql.py
+------------------------------------
+
+Purpose
+"""""""
+
+This is a basic example dag for using :class:`RedshiftDataOperator <airflow.providers.amazon.aws.operators.redshift_data>`

Review comment:
```suggestion
This is a basic example DAG for using :class:`RedshiftDataOperator <airflow.providers.amazon.aws.operators.redshift_data>`
```

########## File path: airflow/providers/amazon/aws/operators/redshift_data.py ##########
@@ -0,0 +1,127 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from time import sleep
+from typing import Optional
+
+from airflow.exceptions import AirflowException
+from airflow.models import BaseOperator
+from airflow.providers.amazon.aws.hooks.redshift_data import RedshiftDataHook
+
+
+class RedshiftDataOperator(BaseOperator):
+    """
+    Executes SQL Statements against an Amazon Redshift cluster using Redshift Data
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:RedshiftDataOperator`
+
+    :param sql: the sql code to be executed
+    :type sql: Can receive a str representing a sql statement,
+        or an iterable of str (sql statements)
+    :param aws_conn_id: AWS connection id (default: aws_default)
+    :type aws_conn_id: str
+    :param parameters: (optional) the parameters to render the SQL query with.
+    :type parameters: dict or iterable
+    :param autocommit: if True, each command is automatically committed.
+        (default value: False)
+    :type autocommit: bool
+    """
+
+    template_fields = ('sql',)
+    template_ext = ('.sql',)
+

Review comment:
```suggestion
    template_fields_renderers = {'sql': 'sql'}
```
WDYT about adding the field renderer attribute so the SQL is nicely readable in the UI?
########## File path: docs/apache-airflow-providers-amazon/operators/redshift_data.rst ##########
@@ -0,0 +1,51 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements. See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership. The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License. You may obtain a copy of the License at
+
+ .. http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied. See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+.. _howto/operator:RedshiftDataOperator:
+
+RedshiftDataOperator
+====================
+
+.. contents::
+  :depth: 1
+  :local:
+
+Overview
+--------
+
+Use the :class:`RedshiftDataOperator <airflow.providers.amazon.aws.operators.redshift_data>` to execute
+statements against an Amazon Redshift cluster.
+

Review comment:
WDYT about adding some more context here about how this differs from the `RedshiftSQLOperator`? I suspect @ephraimbuddy and I won't be the only ones wondering.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
