uranusjr commented on code in PR #26761:
URL: https://github.com/apache/airflow/pull/26761#discussion_r993243032
##########
airflow/providers/common/sql/operators/sql.py:
##########
@@ -229,49 +237,81 @@ def __init__(
**kwargs,
):
super().__init__(conn_id=conn_id, database=database, **kwargs)
- for checks in column_mapping.values():
- for check, check_values in checks.items():
- self._column_mapping_validation(check, check_values)
self.table = table
self.column_mapping = column_mapping
self.partition_clause = partition_clause
- # OpenLineage needs a valid SQL query with the input/output table(s)
to parse
- self.sql = f"SELECT * FROM {self.table};"
+
+ checks_sql_list = []
+ for column, checks in self.column_mapping.items():
+ for check, check_values in checks.items():
+ self._column_mapping_validation(check, check_values)
+ checks_sql_list.append(self._generate_sql_query(column, checks))
+ checks_sql = "UNION ALL".join(checks_sql_list)
+
+ self.sql = f"SELECT col_name, check_type, check_result FROM
({checks_sql}) AS check_columns"
def execute(self, context: Context):
- hook = self.get_db_hook()
failed_tests = []
- for column in self.column_mapping:
- checks = [*self.column_mapping[column]]
- checks_sql = ",".join([self.column_checks[check].replace("column",
column) for check in checks])
- partition_clause_statement = f"WHERE {self.partition_clause}" if
self.partition_clause else ""
- self.sql = f"SELECT {checks_sql} FROM {self.table}
{partition_clause_statement};"
- records = hook.get_first(self.sql)
+ hook = self.get_db_hook()
+ records = hook.get_records(self.sql)
- if not records:
- raise AirflowException(f"The following query returned zero
rows: {self.sql}")
+ if not records:
+ _raise_exception(f"The following query returned zero rows:
{self.sql}", self.retry_on_failure)
- self.log.info("Record: %s", records)
+ self.log.info("Record: %s", records)
- for idx, result in enumerate(records):
- tolerance =
self.column_mapping[column][checks[idx]].get("tolerance")
+ for row in records:
+ column, check, result = row
Review Comment:
```suggestion
for column, check, result in records:
```
##########
airflow/providers/common/sql/operators/sql.py:
##########
@@ -229,49 +237,81 @@ def __init__(
**kwargs,
):
super().__init__(conn_id=conn_id, database=database, **kwargs)
- for checks in column_mapping.values():
- for check, check_values in checks.items():
- self._column_mapping_validation(check, check_values)
self.table = table
self.column_mapping = column_mapping
self.partition_clause = partition_clause
- # OpenLineage needs a valid SQL query with the input/output table(s)
to parse
- self.sql = f"SELECT * FROM {self.table};"
+
+ checks_sql_list = []
+ for column, checks in self.column_mapping.items():
+ for check, check_values in checks.items():
+ self._column_mapping_validation(check, check_values)
+ checks_sql_list.append(self._generate_sql_query(column, checks))
+ checks_sql = "UNION ALL".join(checks_sql_list)
+
+ self.sql = f"SELECT col_name, check_type, check_result FROM
({checks_sql}) AS check_columns"
def execute(self, context: Context):
- hook = self.get_db_hook()
failed_tests = []
- for column in self.column_mapping:
- checks = [*self.column_mapping[column]]
- checks_sql = ",".join([self.column_checks[check].replace("column",
column) for check in checks])
- partition_clause_statement = f"WHERE {self.partition_clause}" if
self.partition_clause else ""
- self.sql = f"SELECT {checks_sql} FROM {self.table}
{partition_clause_statement};"
- records = hook.get_first(self.sql)
+ hook = self.get_db_hook()
+ records = hook.get_records(self.sql)
- if not records:
- raise AirflowException(f"The following query returned zero
rows: {self.sql}")
+ if not records:
+ _raise_exception(f"The following query returned zero rows:
{self.sql}", self.retry_on_failure)
- self.log.info("Record: %s", records)
+ self.log.info("Record: %s", records)
- for idx, result in enumerate(records):
- tolerance =
self.column_mapping[column][checks[idx]].get("tolerance")
+ for row in records:
+ column, check, result = row
+ tolerance = self.column_mapping[column][check].get("tolerance")
- self.column_mapping[column][checks[idx]]["result"] = result
- self.column_mapping[column][checks[idx]]["success"] =
self._get_match(
- self.column_mapping[column][checks[idx]], result, tolerance
- )
+ self.column_mapping[column][check]["result"] = result
+ self.column_mapping[column][check]["success"] = self._get_match(
+ self.column_mapping[column][check], result, tolerance
+ )
-
failed_tests.extend(_get_failed_checks(self.column_mapping[column], column))
- if failed_tests:
- raise AirflowException(
- f"Test failed.\nResults:\n{records!s}\n"
- "The following tests have failed:"
- f"\n{''.join(failed_tests)}"
+ for col, checks in self.column_mapping.items():
+ failed_tests.extend(
+ [
+ f"Column: {col}\n\tCheck: {check},\n\tCheck Values:
{check_values}\n"
+ for check, check_values in checks.items()
+ if not check_values["success"]
+ ]
)
+ if failed_tests:
+ exception_string = f"""
+ Test failed.\nResults:\n{records!s}\n
+ The following tests have failed:
+ \n{''.join(failed_tests)}"""
+ _raise_exception(exception_string, self.retry_on_failure)
self.log.info("All tests have passed")
+ def _generate_sql_query(self, column, checks):
+ def _generate_partition_clause(check):
+ if self.partition_clause and "partition_clause" not in
checks[check]:
+ return f"WHERE {self.partition_clause}"
+ elif not self.partition_clause and "partition_clause" in
checks[check]:
+ return f"WHERE {checks[check]['partition_clause']}"
+ elif self.partition_clause and "partition_clause" in checks[check]:
+ return f"WHERE {self.partition_clause} AND
{checks[check]['partition_clause']}"
+ else:
+ return ""
+
+ checks_sql = "UNION ALL".join(
+ [
+ self.sql_check_template.format(
+
check_statement=self.column_checks[check].format(column=column),
+ check=check,
+ table=self.table,
+ column=column,
+ partition_clause=_generate_partition_clause(check),
+ )
+ for check in checks
+ ]
+ )
Review Comment:
```suggestion
checks_sql = "UNION ALL".join(
self.sql_check_template.format(
check_statement=self.column_checks[check].format(column=column),
check=check,
table=self.table,
column=column,
partition_clause=_generate_partition_clause(check),
)
for check in checks
)
```
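`join` accepts any iterable, including a generator expression, so the extra `[]`
is not necessary. (This is a readability change rather than a speedup: CPython's
`str.join` materializes its argument into a sequence internally anyway.)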
##########
airflow/providers/common/sql/operators/sql.py:
##########
@@ -229,49 +237,81 @@ def __init__(
**kwargs,
):
super().__init__(conn_id=conn_id, database=database, **kwargs)
- for checks in column_mapping.values():
- for check, check_values in checks.items():
- self._column_mapping_validation(check, check_values)
self.table = table
self.column_mapping = column_mapping
self.partition_clause = partition_clause
- # OpenLineage needs a valid SQL query with the input/output table(s)
to parse
- self.sql = f"SELECT * FROM {self.table};"
+
+ checks_sql_list = []
+ for column, checks in self.column_mapping.items():
+ for check, check_values in checks.items():
+ self._column_mapping_validation(check, check_values)
+ checks_sql_list.append(self._generate_sql_query(column, checks))
+ checks_sql = "UNION ALL".join(checks_sql_list)
+
+ self.sql = f"SELECT col_name, check_type, check_result FROM
({checks_sql}) AS check_columns"
def execute(self, context: Context):
- hook = self.get_db_hook()
failed_tests = []
- for column in self.column_mapping:
- checks = [*self.column_mapping[column]]
- checks_sql = ",".join([self.column_checks[check].replace("column",
column) for check in checks])
- partition_clause_statement = f"WHERE {self.partition_clause}" if
self.partition_clause else ""
- self.sql = f"SELECT {checks_sql} FROM {self.table}
{partition_clause_statement};"
- records = hook.get_first(self.sql)
+ hook = self.get_db_hook()
+ records = hook.get_records(self.sql)
- if not records:
- raise AirflowException(f"The following query returned zero
rows: {self.sql}")
+ if not records:
+ _raise_exception(f"The following query returned zero rows:
{self.sql}", self.retry_on_failure)
- self.log.info("Record: %s", records)
+ self.log.info("Record: %s", records)
- for idx, result in enumerate(records):
- tolerance =
self.column_mapping[column][checks[idx]].get("tolerance")
+ for row in records:
+ column, check, result = row
+ tolerance = self.column_mapping[column][check].get("tolerance")
- self.column_mapping[column][checks[idx]]["result"] = result
- self.column_mapping[column][checks[idx]]["success"] =
self._get_match(
- self.column_mapping[column][checks[idx]], result, tolerance
- )
+ self.column_mapping[column][check]["result"] = result
+ self.column_mapping[column][check]["success"] = self._get_match(
+ self.column_mapping[column][check], result, tolerance
+ )
-
failed_tests.extend(_get_failed_checks(self.column_mapping[column], column))
- if failed_tests:
- raise AirflowException(
- f"Test failed.\nResults:\n{records!s}\n"
- "The following tests have failed:"
- f"\n{''.join(failed_tests)}"
+ for col, checks in self.column_mapping.items():
+ failed_tests.extend(
+ [
+ f"Column: {col}\n\tCheck: {check},\n\tCheck Values:
{check_values}\n"
+ for check, check_values in checks.items()
+ if not check_values["success"]
+ ]
)
Review Comment:
```suggestion
failed_tests = [
f"Column: {col}\n\tCheck: {check},\n\tCheck Values:
{check_values}\n"
for col, checks in self.column_mapping.items()
for check, check_values in checks.items()
if not check_values["success"]
]
```
With this change, the `failed_tests = []` initialization at the top of `execute` can be removed.
##########
airflow/providers/common/sql/operators/sql.py:
##########
@@ -418,46 +461,59 @@ def __init__(
self.table = table
self.checks = checks
self.partition_clause = partition_clause
- # OpenLineage needs a valid SQL query with the input/output table(s)
to parse
- self.sql = f"SELECT * FROM {self.table};"
+ self.sql = f"SELECT check_name, check_result FROM
({self._generate_sql_query()}) AS check_table"
def execute(self, context: Context):
hook = self.get_db_hook()
- checks_sql = " UNION ALL ".join(
- [
- self.sql_check_template.replace("check_statement",
value["check_statement"])
- .replace("_check_name", check_name)
- .replace("table", self.table)
- for check_name, value in self.checks.items()
- ]
- )
- partition_clause_statement = f"WHERE {self.partition_clause}" if
self.partition_clause else ""
- self.sql = f"""
- SELECT check_name, check_result FROM ({checks_sql})
- AS check_table {partition_clause_statement}
- """
-
records = hook.get_records(self.sql)
if not records:
- raise AirflowException(f"The following query returned zero rows:
{self.sql}")
+ _raise_exception(f"The following query returned zero rows:
{self.sql}", self.retry_on_failure)
self.log.info("Record:\n%s", records)
for row in records:
check, result = row
- self.checks[check]["success"] = parse_boolean(str(result))
+ self.checks[check]["success"] = _parse_boolean(str(result))
- failed_tests = _get_failed_checks(self.checks)
+ failed_tests = [
+ f"\tCheck: {check},\n\tCheck Values: {check_values}\n"
+ for check, check_values in self.checks.items()
+ if not check_values["success"]
+ ]
if failed_tests:
- raise AirflowException(
- f"Test failed.\nQuery:\n{self.sql}\nResults:\n{records!s}\n"
- "The following tests have failed:"
- f"\n{', '.join(failed_tests)}"
- )
+ exception_string = f"""
+ Test failed.\nQuery:\n{self.sql}\nResults:\n{records!s}\n
+ The following tests have failed:
+ \n{', '.join(failed_tests)}
+ """
Review Comment:
```suggestion
exception_string = (
f"Test failed.\nQuery:\n{self.sql}\nResults:\n{records!s}\n"
f"The following tests have failed:\n{',
'.join(failed_tests)}"
)
```
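Otherwise the exception message would contain stray leading whitespace and
newlines, because the triple-quoted f-string embeds the source-code indentation
verbatim.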
##########
airflow/providers/common/sql/operators/sql.py:
##########
@@ -46,18 +53,10 @@ def parse_boolean(val: str) -> str | bool:
raise ValueError(f"{val!r} is not a boolean-like string value")
-def _get_failed_checks(checks, col=None):
- if col:
- return [
- f"Column: {col}\nCheck: {check},\nCheck Values: {check_values}\n"
- for check, check_values in checks.items()
- if not check_values["success"]
- ]
- return [
- f"\tCheck: {check},\n\tCheck Values: {check_values}\n"
- for check, check_values in checks.items()
- if not check_values["success"]
- ]
+def _raise_exception(exception_string: str, retry_on_failure: bool) ->
NoReturn:
+ if retry_on_failure:
+ raise AirflowException(exception_string)
+ raise AirflowFailException(exception_string)
Review Comment:
Since this function is always called with `self.retry_on_failure`, perhaps it
should be a method instead, so we can drop the second argument?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]