denimalpaca commented on code in PR #23915:
URL: https://github.com/apache/airflow/pull/23915#discussion_r890331465


##########
airflow/operators/sql.py:
##########
@@ -467,6 +467,269 @@ def push(self, meta_data):
         self.log.info("Log from %s:\n%s", self.dag_id, info)
 
 
+def _get_failed_tests(checks):
+    return [
+        f"\tCheck: {check}, " f"Check Values: {check_values}\n"
+        for check, check_values in checks.items()
+        if not check_values["success"]
+    ]
+
+
+class SQLColumnCheckOperator(BaseSQLOperator):
+    """
+    Performs one or more of the templated checks in the column_checks dictionary.
+    Checks are performed on a per-column basis specified by the column_mapping.
+
+    :param table: the table to run checks on.
+    :param column_mapping: the dictionary of columns and their associated checks, e.g.:
+    {
+        'col_name': {
+            'null_check': {
+                'equal_to': 0,
+            },
+            'min': {
+                'greater_than': 5,
+                'leq_than': 10,
+                'tolerance': 0.2,
+            },
+            'max': {
+                'less_than': 1000,
+                'geq_than': 10,
+                'tolerance': 0.01
+            }
+        }
+    }
+    :param conn_id: the connection ID used to connect to the database.
+    :param database: name of database which overwrite the defined one in connection
+    """
+
+    column_checks = {
+        # pass value should be number of acceptable nulls
+        "null_check": "SUM(CASE WHEN 'column' IS NULL THEN 1 ELSE 0 END) AS column_null_check",
+        # pass value should be number of acceptable distinct values
+        "distinct_check": "COUNT(DISTINCT(column)) AS column_distinct_check",
+        # pass value is implicit in the query, it does not need to be passed
+        "unique_check": "COUNT(DISTINCT(column)) = COUNT(column)",
+        # pass value should be the minimum acceptable numeric value
+        "min": "MIN(column) AS column_min",
+        # pass value should be the maximum acceptable numeric value
+        "max": "MAX(column) AS column_max",
+    }
+
+    def __init__(
+        self,
+        *,
+        table: str,
+        column_mapping: Dict[str, Dict[str, Any]],
+        conn_id: Optional[str] = None,
+        database: Optional[str] = None,
+        **kwargs,
+    ):
+        super().__init__(conn_id=conn_id, database=database, **kwargs)
+        for checks in column_mapping.values():
+            for check, check_values in checks.items():
+                self._column_mapping_validation(check, check_values)
+
+        self.table = table
+        self.column_mapping = column_mapping
+        self.sql = f"SELECT * FROM {self.table};"
+
+    def execute(self, context=None):
+        hook = self.get_db_hook()
+        failed_tests = []
+        for column in self.column_mapping:
+            checks = [*self.column_mapping[column]]
+            checks_sql = ",".join([self.column_checks[check].replace("column", column) for check in checks])
+
+            self.sql = f"SELECT {checks_sql} FROM {self.table};"
+            records = hook.get_first(self.sql)
+            self.log.info(f"Record: {records}")
+
+            if not records:
+                raise AirflowException("The query returned None")

Review Comment:
   Ah, good points. Yeah, I think that would be a better order and message.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to