denimalpaca commented on code in PR #26761:
URL: https://github.com/apache/airflow/pull/26761#discussion_r993446566


##########
airflow/providers/common/sql/operators/sql.py:
##########
@@ -229,49 +237,81 @@ def __init__(
         **kwargs,
     ):
         super().__init__(conn_id=conn_id, database=database, **kwargs)
-        for checks in column_mapping.values():
-            for check, check_values in checks.items():
-                self._column_mapping_validation(check, check_values)
 
         self.table = table
         self.column_mapping = column_mapping
         self.partition_clause = partition_clause
-        # OpenLineage needs a valid SQL query with the input/output table(s) to parse
-        self.sql = f"SELECT * FROM {self.table};"
+
+        checks_sql_list = []
+        for column, checks in self.column_mapping.items():
+            for check, check_values in checks.items():
+                self._column_mapping_validation(check, check_values)
+            checks_sql_list.append(self._generate_sql_query(column, checks))
+        checks_sql = "UNION ALL".join(checks_sql_list)
+
+        self.sql = f"SELECT col_name, check_type, check_result FROM ({checks_sql}) AS check_columns"
 
     def execute(self, context: Context):
-        hook = self.get_db_hook()
         failed_tests = []
-        for column in self.column_mapping:
-            checks = [*self.column_mapping[column]]
-            checks_sql = ",".join([self.column_checks[check].replace("column", column) for check in checks])
-            partition_clause_statement = f"WHERE {self.partition_clause}" if self.partition_clause else ""
-            self.sql = f"SELECT {checks_sql} FROM {self.table} {partition_clause_statement};"
-            records = hook.get_first(self.sql)
+        hook = self.get_db_hook()
+        records = hook.get_records(self.sql)
 
-            if not records:
-                raise AirflowException(f"The following query returned zero rows: {self.sql}")
+        if not records:
+            _raise_exception(f"The following query returned zero rows: {self.sql}", self.retry_on_failure)
 
-            self.log.info("Record: %s", records)
+        self.log.info("Record: %s", records)
 
-            for idx, result in enumerate(records):
-                tolerance = self.column_mapping[column][checks[idx]].get("tolerance")
+        for row in records:
+            column, check, result = row
+            tolerance = self.column_mapping[column][check].get("tolerance")
 
-                self.column_mapping[column][checks[idx]]["result"] = result
-                self.column_mapping[column][checks[idx]]["success"] = self._get_match(
-                    self.column_mapping[column][checks[idx]], result, tolerance
-                )
+            self.column_mapping[column][check]["result"] = result
+            self.column_mapping[column][check]["success"] = self._get_match(
+                self.column_mapping[column][check], result, tolerance
+            )
 
-            failed_tests.extend(_get_failed_checks(self.column_mapping[column], column))
-        if failed_tests:
-            raise AirflowException(
-                f"Test failed.\nResults:\n{records!s}\n"
-                "The following tests have failed:"
-                f"\n{''.join(failed_tests)}"
+        for col, checks in self.column_mapping.items():
+            failed_tests.extend(
+                [
+                    f"Column: {col}\n\tCheck: {check},\n\tCheck Values: {check_values}\n"
+                    for check, check_values in checks.items()
+                    if not check_values["success"]
+                ]
             )

Review Comment:
   Didn't know you could do multiple loops in list comprehensions!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to