This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 1d7cfdb Remove extra postgres dependency from AWS Provider (#18844)
1d7cfdb is described below
commit 1d7cfdbcd91705b2f88ef4ece503b7a072767e02
Author: Mario Taddeucci <[email protected]>
AuthorDate: Sun Oct 10 17:59:18 2021 -0300
Remove extra postgres dependency from AWS Provider (#18844)
* Remove extra postgres dependency
* Removed postgres cross dependency on aws provider
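A minimal sketch (not part of the patch) of the migration this commit enables, assuming RedshiftSQLOperator accepts a redshift_conn_id argument defaulting to 'redshift_default' (the example DAG changed below relies on that default):

    from airflow import DAG
    from airflow.providers.amazon.aws.operators.redshift import RedshiftSQLOperator
    from airflow.utils.dates import days_ago

    with DAG(dag_id='example_redshift_sql', start_date=days_ago(1), schedule_interval=None) as dag:
        # Previously: PostgresOperator(..., postgres_conn_id='redshift_default')
        create_table = RedshiftSQLOperator(
            task_id='setup__create_table',
            sql='CREATE TABLE IF NOT EXISTS my_table (Id int, Name varchar)',
            # redshift_conn_id defaults to 'redshift_default', so it is omitted here
            # (assumption inferred from the example DAG changes in this diff).
        )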
---
CONTRIBUTING.rst | 2 +-
.../aws/example_dags/example_s3_to_redshift.py | 8 +++-----
airflow/providers/amazon/aws/hooks/redshift.py | 24 ++++++++++++++++++++++
.../amazon/aws/transfers/redshift_to_s3.py | 6 +++---
.../amazon/aws/transfers/s3_to_redshift.py | 8 ++++----
airflow/providers/amazon/provider.yaml | 3 ---
airflow/providers/dependencies.json | 1 -
.../amazon/aws/transfers/test_redshift_to_s3.py | 8 ++++----
.../amazon/aws/transfers/test_s3_to_redshift.py | 14 ++++++-------
9 files changed, 46 insertions(+), 28 deletions(-)
diff --git a/CONTRIBUTING.rst b/CONTRIBUTING.rst
index d598742..14be13c 100644
--- a/CONTRIBUTING.rst
+++ b/CONTRIBUTING.rst
@@ -659,7 +659,7 @@ Here is the list of packages and their extras:
Package Extras
========================== ===========================
airbyte http
-amazon apache.hive,cncf.kubernetes,exasol,ftp,google,imap,mongo,mysql,postgres,salesforce,ssh
+amazon apache.hive,cncf.kubernetes,exasol,ftp,google,imap,mongo,mysql,salesforce,ssh
apache.beam google
apache.druid apache.hive
apache.hive amazon,microsoft.mssql,mysql,presto,samba,vertica
diff --git a/airflow/providers/amazon/aws/example_dags/example_s3_to_redshift.py b/airflow/providers/amazon/aws/example_dags/example_s3_to_redshift.py
index 9cec527..f095498 100644
--- a/airflow/providers/amazon/aws/example_dags/example_s3_to_redshift.py
+++ b/airflow/providers/amazon/aws/example_dags/example_s3_to_redshift.py
@@ -25,8 +25,8 @@ from airflow import DAG
from airflow.decorators import task
from airflow.models.baseoperator import chain
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
+from airflow.providers.amazon.aws.operators.redshift import RedshiftSQLOperator
from airflow.providers.amazon.aws.transfers.s3_to_redshift import S3ToRedshiftOperator
-from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.utils.dates import days_ago
# [START howto_operator_s3_to_redshift_env_variables]
@@ -54,9 +54,8 @@ with DAG(
) as dag:
add_sample_data_to_s3 = add_sample_data_to_s3()
- setup__task_create_table = PostgresOperator(
+ setup__task_create_table = RedshiftSQLOperator(
sql=f'CREATE TABLE IF NOT EXISTS {REDSHIFT_TABLE}(Id int, Name varchar)',
- postgres_conn_id='redshift_default',
task_id='setup__create_table',
)
# [START howto_operator_s3_to_redshift_task_1]
@@ -69,9 +68,8 @@ with DAG(
task_id='transfer_s3_to_redshift',
)
# [END howto_operator_s3_to_redshift_task_1]
- teardown__task_drop_table = PostgresOperator(
+ teardown__task_drop_table = RedshiftSQLOperator(
sql=f'DROP TABLE IF EXISTS {REDSHIFT_TABLE}',
- postgres_conn_id='redshift_default',
task_id='teardown__drop_table',
)
diff --git a/airflow/providers/amazon/aws/hooks/redshift.py b/airflow/providers/amazon/aws/hooks/redshift.py
index 63c4a04..e9fefc0 100644
--- a/airflow/providers/amazon/aws/hooks/redshift.py
+++ b/airflow/providers/amazon/aws/hooks/redshift.py
@@ -212,6 +212,30 @@ class RedshiftSQLHook(DbApiHook):
return create_engine(self.get_uri(), **engine_kwargs)
+ def get_table_primary_key(self, table: str, schema: Optional[str] = "public") -> List[str]:
+ """
+ Helper method that returns the table primary key
+ :param table: Name of the target table
+ :type table: str
+ :param schema: Name of the target schema, public by default
+ :type schema: str
+ :return: Primary key columns list
+ :rtype: List[str]
+ """
+ sql = """
+ select kcu.column_name
+ from information_schema.table_constraints tco
+ join information_schema.key_column_usage kcu
+ on kcu.constraint_name = tco.constraint_name
+ and kcu.constraint_schema = tco.constraint_schema
+ and kcu.constraint_name = tco.constraint_name
+ where tco.constraint_type = 'PRIMARY KEY'
+ and kcu.table_schema = %s
+ and kcu.table_name = %s
+ """
+ pk_columns = [row[0] for row in self.get_records(sql, (schema, table))]
+ return pk_columns or None
+
def get_conn(self) -> RedshiftConnection:
"""Returns a redshift_connector.Connection object"""
conn_params = self._get_conn_params()
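A sketch of how the new helper above is consumed, mirroring the UPSERT branch of S3ToRedshiftOperator later in this diff; the table and schema names are placeholders:

    from airflow.providers.amazon.aws.hooks.redshift import RedshiftSQLHook

    hook = RedshiftSQLHook(redshift_conn_id='redshift_default')
    # Returns the primary-key column names for public.my_table,
    # or None when the table has no primary key.
    keys = hook.get_table_primary_key(table='my_table', schema='public')
    if not keys:
        # S3ToRedshiftOperator raises AirflowException in this case
        # unless upsert_keys are provided explicitly.
        pass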
diff --git a/airflow/providers/amazon/aws/transfers/redshift_to_s3.py b/airflow/providers/amazon/aws/transfers/redshift_to_s3.py
index a3d49c6..42ce2b8 100644
--- a/airflow/providers/amazon/aws/transfers/redshift_to_s3.py
+++ b/airflow/providers/amazon/aws/transfers/redshift_to_s3.py
@@ -19,9 +19,9 @@
from typing import Iterable, List, Mapping, Optional, Union
from airflow.models import BaseOperator
+from airflow.providers.amazon.aws.hooks.redshift import RedshiftSQLHook
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.amazon.aws.utils.redshift import build_credentials_block
-from airflow.providers.postgres.hooks.postgres import PostgresHook
class RedshiftToS3Operator(BaseOperator):
@@ -136,7 +136,7 @@ class RedshiftToS3Operator(BaseOperator):
"""
def execute(self, context) -> None:
- postgres_hook = PostgresHook(postgres_conn_id=self.redshift_conn_id)
+ redshift_hook = RedshiftSQLHook(redshift_conn_id=self.redshift_conn_id)
conn = S3Hook.get_connection(conn_id=self.aws_conn_id)
credentials_block = None
@@ -154,5 +154,5 @@ class RedshiftToS3Operator(BaseOperator):
)
self.log.info('Executing UNLOAD command...')
- postgres_hook.run(unload_query, self.autocommit, parameters=self.parameters)
+ redshift_hook.run(unload_query, self.autocommit, parameters=self.parameters)
self.log.info("UNLOAD command complete...")
diff --git a/airflow/providers/amazon/aws/transfers/s3_to_redshift.py b/airflow/providers/amazon/aws/transfers/s3_to_redshift.py
index 5149406..c0a1a7c 100644
--- a/airflow/providers/amazon/aws/transfers/s3_to_redshift.py
+++ b/airflow/providers/amazon/aws/transfers/s3_to_redshift.py
@@ -20,9 +20,9 @@ from typing import List, Optional, Union
from airflow.exceptions import AirflowException
from airflow.models import BaseOperator
+from airflow.providers.amazon.aws.hooks.redshift import RedshiftSQLHook
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.amazon.aws.utils.redshift import build_credentials_block
-from airflow.providers.postgres.hooks.postgres import PostgresHook
AVAILABLE_METHODS = ['APPEND', 'REPLACE', 'UPSERT']
@@ -131,7 +131,7 @@ class S3ToRedshiftOperator(BaseOperator):
"""
def execute(self, context) -> None:
- postgres_hook = PostgresHook(postgres_conn_id=self.redshift_conn_id)
+ redshift_hook = RedshiftSQLHook(redshift_conn_id=self.redshift_conn_id)
conn = S3Hook.get_connection(conn_id=self.aws_conn_id)
credentials_block = None
@@ -156,7 +156,7 @@ class S3ToRedshiftOperator(BaseOperator):
COMMIT
"""
elif self.method == 'UPSERT':
- keys = self.upsert_keys or postgres_hook.get_table_primary_key(self.table, self.schema)
+ keys = self.upsert_keys or redshift_hook.get_table_primary_key(self.table, self.schema)
if not keys:
raise AirflowException(
f"No primary key on {self.schema}.{self.table}. Please
provide keys on 'upsert_keys'"
@@ -174,5 +174,5 @@ class S3ToRedshiftOperator(BaseOperator):
sql = copy_statement
self.log.info('Executing COPY command...')
- postgres_hook.run(sql, self.autocommit)
+ redshift_hook.run(sql, autocommit=self.autocommit)
self.log.info("COPY command complete...")
diff --git a/airflow/providers/amazon/provider.yaml b/airflow/providers/amazon/provider.yaml
index 8715637..fbf4c1b 100644
--- a/airflow/providers/amazon/provider.yaml
+++ b/airflow/providers/amazon/provider.yaml
@@ -35,9 +35,6 @@ versions:
additional-dependencies:
- apache-airflow>=2.1.0
-additional-extras:
- postgres: apache-airflow-providers-postgres>=2.3.0
-
integrations:
- integration-name: Amazon Athena
external-doc-url: https://aws.amazon.com/athena/
diff --git a/airflow/providers/dependencies.json b/airflow/providers/dependencies.json
index 229e283..6779fec 100644
--- a/airflow/providers/dependencies.json
+++ b/airflow/providers/dependencies.json
@@ -11,7 +11,6 @@
"imap",
"mongo",
"mysql",
- "postgres",
"salesforce",
"ssh"
],
diff --git a/tests/providers/amazon/aws/transfers/test_redshift_to_s3.py b/tests/providers/amazon/aws/transfers/test_redshift_to_s3.py
index 880cc11..da7acf2 100644
--- a/tests/providers/amazon/aws/transfers/test_redshift_to_s3.py
+++ b/tests/providers/amazon/aws/transfers/test_redshift_to_s3.py
@@ -39,7 +39,7 @@ class TestRedshiftToS3Transfer(unittest.TestCase):
@mock.patch("airflow.providers.amazon.aws.hooks.s3.S3Hook.get_connection")
@mock.patch("airflow.models.connection.Connection")
@mock.patch("boto3.session.Session")
- @mock.patch("airflow.providers.postgres.hooks.postgres.PostgresHook.run")
+ @mock.patch("airflow.providers.amazon.aws.hooks.redshift.RedshiftSQLHook.run")
def test_table_unloading(
self,
table_as_file_name,
@@ -103,7 +103,7 @@ class TestRedshiftToS3Transfer(unittest.TestCase):
@mock.patch("airflow.providers.amazon.aws.hooks.s3.S3Hook.get_connection")
@mock.patch("airflow.models.connection.Connection")
@mock.patch("boto3.session.Session")
- @mock.patch("airflow.providers.postgres.hooks.postgres.PostgresHook.run")
+ @mock.patch("airflow.providers.amazon.aws.hooks.redshift.RedshiftSQLHook.run")
def test_execute_sts_token(
self,
table_as_file_name,
@@ -171,7 +171,7 @@ class TestRedshiftToS3Transfer(unittest.TestCase):
@mock.patch("airflow.providers.amazon.aws.hooks.s3.S3Hook.get_connection")
@mock.patch("airflow.models.connection.Connection")
@mock.patch("boto3.session.Session")
- @mock.patch("airflow.providers.postgres.hooks.postgres.PostgresHook.run")
+ @mock.patch("airflow.providers.amazon.aws.hooks.redshift.RedshiftSQLHook.run")
def test_custom_select_query_unloading(
self,
table,
@@ -234,7 +234,7 @@ class TestRedshiftToS3Transfer(unittest.TestCase):
@mock.patch("airflow.providers.amazon.aws.hooks.s3.S3Hook.get_connection")
@mock.patch("airflow.models.connection.Connection")
@mock.patch("boto3.session.Session")
- @mock.patch("airflow.providers.postgres.hooks.postgres.PostgresHook.run")
+ @mock.patch("airflow.providers.amazon.aws.hooks.redshift.RedshiftSQLHook.run")
def test_table_unloading_role_arn(
self,
table_as_file_name,
diff --git a/tests/providers/amazon/aws/transfers/test_s3_to_redshift.py b/tests/providers/amazon/aws/transfers/test_s3_to_redshift.py
index 0cf02b6..ff03165 100644
--- a/tests/providers/amazon/aws/transfers/test_s3_to_redshift.py
+++ b/tests/providers/amazon/aws/transfers/test_s3_to_redshift.py
@@ -33,7 +33,7 @@ class TestS3ToRedshiftTransfer(unittest.TestCase):
@mock.patch("airflow.providers.amazon.aws.hooks.s3.S3Hook.get_connection")
@mock.patch("airflow.models.connection.Connection")
@mock.patch("boto3.session.Session")
- @mock.patch("airflow.providers.postgres.hooks.postgres.PostgresHook.run")
+ @mock.patch("airflow.providers.amazon.aws.hooks.redshift.RedshiftSQLHook.run")
def test_execute(self, mock_run, mock_session, mock_connection, mock_hook):
access_key = "aws_access_key_id"
secret_key = "aws_secret_access_key"
@@ -78,7 +78,7 @@ class TestS3ToRedshiftTransfer(unittest.TestCase):
@mock.patch("airflow.providers.amazon.aws.hooks.s3.S3Hook.get_connection")
@mock.patch("airflow.models.connection.Connection")
@mock.patch("boto3.session.Session")
- @mock.patch("airflow.providers.postgres.hooks.postgres.PostgresHook.run")
+ @mock.patch("airflow.providers.amazon.aws.hooks.redshift.RedshiftSQLHook.run")
def test_execute_with_column_list(self, mock_run, mock_session, mock_connection, mock_hook):
access_key = "aws_access_key_id"
secret_key = "aws_secret_access_key"
@@ -125,7 +125,7 @@ class TestS3ToRedshiftTransfer(unittest.TestCase):
@mock.patch("airflow.providers.amazon.aws.hooks.s3.S3Hook.get_connection")
@mock.patch("airflow.models.connection.Connection")
@mock.patch("boto3.session.Session")
- @mock.patch("airflow.providers.postgres.hooks.postgres.PostgresHook.run")
+ @mock.patch("airflow.providers.amazon.aws.hooks.redshift.RedshiftSQLHook.run")
def test_deprecated_truncate(self, mock_run, mock_session, mock_connection, mock_hook):
access_key = "aws_access_key_id"
secret_key = "aws_secret_access_key"
@@ -177,7 +177,7 @@ class TestS3ToRedshiftTransfer(unittest.TestCase):
@mock.patch("airflow.providers.amazon.aws.hooks.s3.S3Hook.get_connection")
@mock.patch("airflow.models.connection.Connection")
@mock.patch("boto3.session.Session")
- @mock.patch("airflow.providers.postgres.hooks.postgres.PostgresHook.run")
+ @mock.patch("airflow.providers.amazon.aws.hooks.redshift.RedshiftSQLHook.run")
def test_replace(self, mock_run, mock_session, mock_connection, mock_hook):
access_key = "aws_access_key_id"
secret_key = "aws_secret_access_key"
@@ -229,7 +229,7 @@ class TestS3ToRedshiftTransfer(unittest.TestCase):
@mock.patch("airflow.providers.amazon.aws.hooks.s3.S3Hook.get_connection")
@mock.patch("airflow.models.connection.Connection")
@mock.patch("boto3.session.Session")
- @mock.patch("airflow.providers.postgres.hooks.postgres.PostgresHook.run")
+ @mock.patch("airflow.providers.amazon.aws.hooks.redshift.RedshiftSQLHook.run")
def test_upsert(self, mock_run, mock_session, mock_connection, mock_hook):
access_key = "aws_access_key_id"
secret_key = "aws_secret_access_key"
@@ -284,7 +284,7 @@ class TestS3ToRedshiftTransfer(unittest.TestCase):
@mock.patch("airflow.providers.amazon.aws.hooks.s3.S3Hook.get_connection")
@mock.patch("airflow.models.connection.Connection")
@mock.patch("boto3.session.Session")
- @mock.patch("airflow.providers.postgres.hooks.postgres.PostgresHook.run")
+ @mock.patch("airflow.providers.amazon.aws.hooks.redshift.RedshiftSQLHook.run")
def test_execute_sts_token(self, mock_run, mock_session, mock_connection, mock_hook):
access_key = "ASIA_aws_access_key_id"
secret_key = "aws_secret_access_key"
@@ -331,7 +331,7 @@ class TestS3ToRedshiftTransfer(unittest.TestCase):
@mock.patch("airflow.providers.amazon.aws.hooks.s3.S3Hook.get_connection")
@mock.patch("airflow.models.connection.Connection")
@mock.patch("boto3.session.Session")
- @mock.patch("airflow.providers.postgres.hooks.postgres.PostgresHook.run")
+ @mock.patch("airflow.providers.amazon.aws.hooks.redshift.RedshiftSQLHook.run")
def test_execute_role_arn(self, mock_run, mock_session, mock_connection, mock_hook):
access_key = "ASIA_aws_access_key_id"
secret_key = "aws_secret_access_key"