josh-fell commented on code in PR #23915:
URL: https://github.com/apache/airflow/pull/23915#discussion_r890199324


##########
airflow/operators/sql.py:
##########
@@ -467,6 +467,269 @@ def push(self, meta_data):
         self.log.info("Log from %s:\n%s", self.dag_id, info)
 
 
+def _get_failed_tests(checks):
+    return [
+        f"\tCheck: {check}, " f"Check Values: {check_values}\n"
+        for check, check_values in checks.items()
+        if not check_values["success"]
+    ]
+
+
+class SQLColumnCheckOperator(BaseSQLOperator):
+    """
+    Performs one or more of the templated checks in the column_checks 
dictionary.
+    Checks are performed on a per-column basis specified by the column_mapping.
+
+    :param table: the table to run checks on.
+    :param column_mapping: the dictionary of columns and their associated 
checks, e.g.:
+    {
+        'col_name': {
+            'null_check': {
+                'equal_to': 0,
+            },
+            'min': {
+                'greater_than': 5,
+                'leq_than': 10,
+                'tolerance': 0.2,
+            },
+            'max': {
+                'less_than': 1000,
+                'geq_than': 10,
+                'tolerance': 0.01
+            }
+        }
+    }

Review Comment:
   For these pesky build docs errors, you can try adding a `.. code-block::` 
directive and house these example dicts under it. Something like:
   ```rst
   :param table: the table to run checks on.
   :param column_mapping: the dictionary of columns and their associated 
checks, e.g.:
   
       .. code-block::
   
           column_mapping = {
               'col_name': {
                   'null_check': {
                       'pass_value': 0,
                   },
                   'min': {
                       'pass_value': 5,
                       'tolerance': 0.2,
                   }
               }
           }
   
   :param conn_id: the connection ID used to connect to the database.
   :param database: name of database which overwrite the defined one in 
connection
   ```
   Check out the docstrings for 
[`chain()`](https://github.com/apache/airflow/blob/fd4e344880b505cfe53a97a6373d329515bbc7a3/airflow/models/baseoperator.py#L1520)
 and 
[`cross_downstream()`](https://github.com/apache/airflow/blob/fd4e344880b505cfe53a97a6373d329515bbc7a3/airflow/models/baseoperator.py#L1649)
 for examples.



##########
airflow/operators/sql.py:
##########
@@ -467,6 +467,269 @@ def push(self, meta_data):
         self.log.info("Log from %s:\n%s", self.dag_id, info)
 
 
+def _get_failed_tests(checks):
+    return [
+        f"\tCheck: {check}, " f"Check Values: {check_values}\n"
+        for check, check_values in checks.items()
+        if not check_values["success"]
+    ]
+
+
+class SQLColumnCheckOperator(BaseSQLOperator):
+    """
+    Performs one or more of the templated checks in the column_checks 
dictionary.
+    Checks are performed on a per-column basis specified by the column_mapping.
+
+    :param table: the table to run checks on.
+    :param column_mapping: the dictionary of columns and their associated 
checks, e.g.:
+    {
+        'col_name': {
+            'null_check': {
+                'equal_to': 0,
+            },
+            'min': {
+                'greater_than': 5,
+                'leq_than': 10,
+                'tolerance': 0.2,
+            },
+            'max': {
+                'less_than': 1000,
+                'geq_than': 10,
+                'tolerance': 0.01
+            }
+        }
+    }
+    :param conn_id: the connection ID used to connect to the database.
+    :param database: name of database which overwrite the defined one in 
connection
+    """
+
+    column_checks = {
+        # pass value should be number of acceptable nulls
+        "null_check": "SUM(CASE WHEN 'column' IS NULL THEN 1 ELSE 0 END) AS 
column_null_check",

Review Comment:
   ```suggestion
           "null_check": "SUM(CASE WHEN column IS NULL THEN 1 ELSE 0 END) AS 
column_null_check",
   ```
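   Should be checking for the column value rather than the string of the 
column name I think? As written, the quoted `'column'` is a string literal, 
which is never NULL, so this check would always count zero nulls.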



##########
airflow/operators/sql.py:
##########
@@ -467,6 +467,269 @@ def push(self, meta_data):
         self.log.info("Log from %s:\n%s", self.dag_id, info)
 
 
+def _get_failed_tests(checks):
+    return [
+        f"\tCheck: {check}, " f"Check Values: {check_values}\n"
+        for check, check_values in checks.items()
+        if not check_values["success"]
+    ]
+
+
+class SQLColumnCheckOperator(BaseSQLOperator):
+    """
+    Performs one or more of the templated checks in the column_checks 
dictionary.
+    Checks are performed on a per-column basis specified by the column_mapping.
+
+    :param table: the table to run checks on.
+    :param column_mapping: the dictionary of columns and their associated 
checks, e.g.:
+    {
+        'col_name': {
+            'null_check': {
+                'equal_to': 0,
+            },
+            'min': {
+                'greater_than': 5,
+                'leq_than': 10,
+                'tolerance': 0.2,
+            },
+            'max': {
+                'less_than': 1000,
+                'geq_than': 10,
+                'tolerance': 0.01
+            }
+        }
+    }
+    :param conn_id: the connection ID used to connect to the database.
+    :param database: name of database which overwrite the defined one in 
connection
+    """
+
+    column_checks = {
+        # pass value should be number of acceptable nulls
+        "null_check": "SUM(CASE WHEN 'column' IS NULL THEN 1 ELSE 0 END) AS 
column_null_check",
+        # pass value should be number of acceptable distinct values
+        "distinct_check": "COUNT(DISTINCT(column)) AS column_distinct_check",
+        # pass value is implicit in the query, it does not need to be passed
+        "unique_check": "COUNT(DISTINCT(column)) = COUNT(column)",
+        # pass value should be the minimum acceptable numeric value
+        "min": "MIN(column) AS column_min",
+        # pass value should be the maximum acceptable numeric value
+        "max": "MAX(column) AS column_max",
+    }
+
+    def __init__(
+        self,
+        *,
+        table: str,
+        column_mapping: Dict[str, Dict[str, Any]],
+        conn_id: Optional[str] = None,
+        database: Optional[str] = None,
+        **kwargs,
+    ):
+        super().__init__(conn_id=conn_id, database=database, **kwargs)
+        for checks in column_mapping.values():
+            for check, check_values in checks.items():
+                self._column_mapping_validation(check, check_values)
+
+        self.table = table
+        self.column_mapping = column_mapping
+        self.sql = f"SELECT * FROM {self.table};"
+
+    def execute(self, context=None):
+        hook = self.get_db_hook()
+        failed_tests = []
+        for column in self.column_mapping:
+            checks = [*self.column_mapping[column]]
+            checks_sql = ",".join([self.column_checks[check].replace("column", 
column) for check in checks])
+
+            self.sql = f"SELECT {checks_sql} FROM {self.table};"
+            records = hook.get_first(self.sql)
+            self.log.info(f"Record: {records}")
+
+            if not records:
+                raise AirflowException("The query returned None")
+
+            for idx, result in enumerate(records):
+                tolerance = (
+                    self.column_mapping[column][checks[idx]]["tolerance"]
+                    if "tolerance" in self.column_mapping[column][checks[idx]]
+                    else None
+                )
+
+                self.column_mapping[column][checks[idx]]["result"] = result
+                self.column_mapping[column][checks[idx]]["success"] = 
self._get_match(
+                    self.column_mapping[column][checks[idx]], result, tolerance
+                )
+
+            failed_tests.extend(_get_failed_tests(self.column_mapping[column]))
+        if failed_tests:
+            raise AirflowException(
+                f"Test failed.\nQuery:\n{self.sql}\nResults:\n{records!s}\n"
+                "The following tests have failed:"
+                f"\n{''.join(failed_tests)}"
+            )
+
+        self.log.info("All tests have passed")
+
+    def _get_match(self, check_values, record, tolerance=None) -> bool:
+        # check if record is str or numeric
+        # if record is str, do pattern matching
+        # numeric record checks
+        if "geq_than" in check_values:
+            if tolerance is not None:
+                return record >= check_values["geq_than"] * (1 - tolerance)
+            return record >= check_values["geq_than"]
+        elif "greater_than" in check_values:
+            if tolerance is not None:
+                return record > check_values["greater_than"] * (1 - tolerance)
+            return record > check_values["greater_than"]
+        if "leq_than" in check_values:
+            if tolerance is not None:
+                return record <= check_values["leq_than"] * (1 + tolerance)
+            return record <= check_values["leq_than"]
+        elif "less_than" in check_values:
+            if tolerance is not None:
+                return record < check_values["less_than"] * (1 + tolerance)
+            return record < check_values["less_than"]
+        if "equal_to" in check_values:
+            if tolerance is not None:
+                return (
+                    check_values["equal_to"] * (1 - tolerance)
+                    <= record
+                    <= check_values["equal_to"] * (1 + tolerance)
+                )
+        return record == check_values["equal_to"]
+
+    def _column_mapping_validation(self, check, check_values):
+        if check not in self.column_checks:
+            raise AirflowException(f"Invalid column check: {check}.")
+        if (
+            "greater_than" not in check_values
+            and "geq_than" not in check_values
+            and "less_than" not in check_values
+            and "leq_than" not in check_values
+            and "equal_to" not in check_values
+        ):
+            raise ValueError(
+                "Please provide one or more of: less_than, leq_than, "
+                "greater_than, geq_than, or equal_to in the check's dict."
+            )
+
+        if "greater_than" in check_values and "less_than" in check_values:
+            if check_values["greater_than"] >= check_values["less_than"]:
+                raise ValueError(
+                    "greater_than should be strictly less than to "
+                    "less_than. Use geq_than or leq_than for "
+                    "overlapping equality."
+                )
+
+        if "greater_than" in check_values and "leq_than" in check_values:
+            if check_values["greater_than"] >= check_values["leq_than"]:
+                raise ValueError(
+                    "greater_than must be strictly less than leq_than. "
+                    "Use geq_than with leq_than for overlapping equality."
+                )
+
+        if "geq_than" in check_values and "less_than" in check_values:
+            if check_values["geq_than"] >= check_values["less_than"]:
+                raise ValueError(
+                    "geq_than should be strictly less than less_than. "
+                    "Use leq_than with geq_than for overlapping equality."
+                )
+
+        if "geq_than" in check_values and "leq_than" in check_values:
+            if check_values["geq_than"] > check_values["leq_than"]:
+                raise ValueError("geq_than should be less than or equal to 
leq_than.")
+
+        if "greater_than" in check_values and "geq_than" in check_values:
+            raise ValueError("Only supply one of greater_than or geq_than.")
+
+        if "less_than" in check_values and "leq_than" in check_values:
+            raise ValueError("Only supply one of less_than or leq_than.")
+
+        if (
+            "greater_than" in check_values
+            or "geq_than" in check_values
+            or "less_than" in check_values
+            or "leq_than" in check_values
+        ) and "equal_to" in check_values:
+            raise ValueError(
+                "equal_to cannot be passed with a greater or less than "
+                "function. To specify 'greater than or equal to' or "
+                "'less than or equal to', use geq_than or leq_than."
+            )
+
+
+class SQLTableCheckOperator(BaseSQLOperator):
+    """
+    Performs one or more of the checks provided in the checks dictionary.
+    Checks should be written to return a boolean result.
+
+    :param table: the table to run checks on.
+    :param checks: the dictionary of checks, e.g.:
+    {
+        'row_count_check': {
+            'check_statement': 'COUNT(*) == 1000'
+        },
+        'column_sum_check': {
+            'check_statement': 'col_a + col_b < col_c'
+        }
+    }
+    :param conn_id: the connection ID used to connect to the database.
+    :param database: name of database which overwrite the defined one in 
connection
+    """
+
+    sql_check_template = "MIN(CASE WHEN check_statement THEN 1 ELSE 0 END AS 
check_name)"
+
+    def __init__(
+        self,
+        *,
+        table: str,
+        checks: Dict[str, Dict[str, Any]],
+        conn_id: Optional[str] = None,
+        database: Optional[str] = None,
+        **kwargs,
+    ):
+        super().__init__(conn_id=conn_id, database=database, **kwargs)
+
+        self.table = table
+        self.checks = checks
+        self.sql = f"SELECT * FROM {self.table};"

Review Comment:
   Same question here as for the identical `self.sql` assignment in 
`SQLColumnCheckOperator.__init__`: why is this needed here?



##########
airflow/operators/sql.py:
##########
@@ -467,6 +467,269 @@ def push(self, meta_data):
         self.log.info("Log from %s:\n%s", self.dag_id, info)
 
 
+def _get_failed_tests(checks):
+    return [
+        f"\tCheck: {check}, " f"Check Values: {check_values}\n"
+        for check, check_values in checks.items()
+        if not check_values["success"]
+    ]
+
+
+class SQLColumnCheckOperator(BaseSQLOperator):
+    """
+    Performs one or more of the templated checks in the column_checks 
dictionary.
+    Checks are performed on a per-column basis specified by the column_mapping.
+
+    :param table: the table to run checks on.
+    :param column_mapping: the dictionary of columns and their associated 
checks, e.g.:
+    {
+        'col_name': {
+            'null_check': {
+                'equal_to': 0,
+            },
+            'min': {
+                'greater_than': 5,
+                'leq_than': 10,
+                'tolerance': 0.2,
+            },
+            'max': {
+                'less_than': 1000,
+                'geq_than': 10,
+                'tolerance': 0.01
+            }
+        }
+    }
+    :param conn_id: the connection ID used to connect to the database.
+    :param database: name of database which overwrite the defined one in 
connection
+    """
+
+    column_checks = {
+        # pass value should be number of acceptable nulls
+        "null_check": "SUM(CASE WHEN 'column' IS NULL THEN 1 ELSE 0 END) AS 
column_null_check",
+        # pass value should be number of acceptable distinct values
+        "distinct_check": "COUNT(DISTINCT(column)) AS column_distinct_check",
+        # pass value is implicit in the query, it does not need to be passed
+        "unique_check": "COUNT(DISTINCT(column)) = COUNT(column)",
+        # pass value should be the minimum acceptable numeric value
+        "min": "MIN(column) AS column_min",
+        # pass value should be the maximum acceptable numeric value
+        "max": "MAX(column) AS column_max",
+    }
+
+    def __init__(
+        self,
+        *,
+        table: str,
+        column_mapping: Dict[str, Dict[str, Any]],
+        conn_id: Optional[str] = None,
+        database: Optional[str] = None,
+        **kwargs,
+    ):
+        super().__init__(conn_id=conn_id, database=database, **kwargs)
+        for checks in column_mapping.values():
+            for check, check_values in checks.items():
+                self._column_mapping_validation(check, check_values)
+
+        self.table = table
+        self.column_mapping = column_mapping
+        self.sql = f"SELECT * FROM {self.table};"
+
+    def execute(self, context=None):
+        hook = self.get_db_hook()
+        failed_tests = []
+        for column in self.column_mapping:
+            checks = [*self.column_mapping[column]]
+            checks_sql = ",".join([self.column_checks[check].replace("column", 
column) for check in checks])
+
+            self.sql = f"SELECT {checks_sql} FROM {self.table};"
+            records = hook.get_first(self.sql)
+            self.log.info(f"Record: {records}")
+
+            if not records:
+                raise AirflowException("The query returned None")
+
+            for idx, result in enumerate(records):
+                tolerance = (
+                    self.column_mapping[column][checks[idx]]["tolerance"]
+                    if "tolerance" in self.column_mapping[column][checks[idx]]
+                    else None
+                )
+
+                self.column_mapping[column][checks[idx]]["result"] = result
+                self.column_mapping[column][checks[idx]]["success"] = 
self._get_match(
+                    self.column_mapping[column][checks[idx]], result, tolerance
+                )
+
+            failed_tests.extend(_get_failed_tests(self.column_mapping[column]))
+        if failed_tests:
+            raise AirflowException(
+                f"Test failed.\nQuery:\n{self.sql}\nResults:\n{records!s}\n"
+                "The following tests have failed:"
+                f"\n{''.join(failed_tests)}"
+            )
+
+        self.log.info("All tests have passed")
+
+    def _get_match(self, check_values, record, tolerance=None) -> bool:
+        # check if record is str or numeric
+        # if record is str, do pattern matching
+        # numeric record checks
+        if "geq_than" in check_values:
+            if tolerance is not None:
+                return record >= check_values["geq_than"] * (1 - tolerance)
+            return record >= check_values["geq_than"]
+        elif "greater_than" in check_values:
+            if tolerance is not None:
+                return record > check_values["greater_than"] * (1 - tolerance)
+            return record > check_values["greater_than"]
+        if "leq_than" in check_values:
+            if tolerance is not None:
+                return record <= check_values["leq_than"] * (1 + tolerance)
+            return record <= check_values["leq_than"]
+        elif "less_than" in check_values:
+            if tolerance is not None:
+                return record < check_values["less_than"] * (1 + tolerance)
+            return record < check_values["less_than"]
+        if "equal_to" in check_values:
+            if tolerance is not None:
+                return (
+                    check_values["equal_to"] * (1 - tolerance)
+                    <= record
+                    <= check_values["equal_to"] * (1 + tolerance)
+                )
+        return record == check_values["equal_to"]
+
+    def _column_mapping_validation(self, check, check_values):
+        if check not in self.column_checks:
+            raise AirflowException(f"Invalid column check: {check}.")
+        if (
+            "greater_than" not in check_values
+            and "geq_than" not in check_values
+            and "less_than" not in check_values
+            and "leq_than" not in check_values
+            and "equal_to" not in check_values
+        ):
+            raise ValueError(
+                "Please provide one or more of: less_than, leq_than, "
+                "greater_than, geq_than, or equal_to in the check's dict."
+            )
+
+        if "greater_than" in check_values and "less_than" in check_values:
+            if check_values["greater_than"] >= check_values["less_than"]:
+                raise ValueError(
+                    "greater_than should be strictly less than to "

Review Comment:
   ```suggestion
                       "greater_than should be strictly less than "
   ```



##########
airflow/operators/sql.py:
##########
@@ -467,6 +467,269 @@ def push(self, meta_data):
         self.log.info("Log from %s:\n%s", self.dag_id, info)
 
 
+def _get_failed_tests(checks):
+    return [
+        f"\tCheck: {check}, " f"Check Values: {check_values}\n"
+        for check, check_values in checks.items()
+        if not check_values["success"]
+    ]
+
+
+class SQLColumnCheckOperator(BaseSQLOperator):
+    """
+    Performs one or more of the templated checks in the column_checks 
dictionary.
+    Checks are performed on a per-column basis specified by the column_mapping.
+
+    :param table: the table to run checks on.
+    :param column_mapping: the dictionary of columns and their associated 
checks, e.g.:
+    {
+        'col_name': {
+            'null_check': {
+                'equal_to': 0,
+            },
+            'min': {
+                'greater_than': 5,
+                'leq_than': 10,
+                'tolerance': 0.2,
+            },
+            'max': {
+                'less_than': 1000,
+                'geq_than': 10,
+                'tolerance': 0.01
+            }
+        }
+    }
+    :param conn_id: the connection ID used to connect to the database.
+    :param database: name of database which overwrite the defined one in 
connection
+    """
+
+    column_checks = {
+        # pass value should be number of acceptable nulls
+        "null_check": "SUM(CASE WHEN 'column' IS NULL THEN 1 ELSE 0 END) AS 
column_null_check",
+        # pass value should be number of acceptable distinct values
+        "distinct_check": "COUNT(DISTINCT(column)) AS column_distinct_check",
+        # pass value is implicit in the query, it does not need to be passed
+        "unique_check": "COUNT(DISTINCT(column)) = COUNT(column)",
+        # pass value should be the minimum acceptable numeric value
+        "min": "MIN(column) AS column_min",
+        # pass value should be the maximum acceptable numeric value
+        "max": "MAX(column) AS column_max",
+    }
+
+    def __init__(
+        self,
+        *,
+        table: str,
+        column_mapping: Dict[str, Dict[str, Any]],
+        conn_id: Optional[str] = None,
+        database: Optional[str] = None,
+        **kwargs,
+    ):
+        super().__init__(conn_id=conn_id, database=database, **kwargs)
+        for checks in column_mapping.values():
+            for check, check_values in checks.items():
+                self._column_mapping_validation(check, check_values)
+
+        self.table = table
+        self.column_mapping = column_mapping
+        self.sql = f"SELECT * FROM {self.table};"

Review Comment:
   Just curious, why is this `self.sql` assignment needed here?



##########
airflow/operators/sql.py:
##########
@@ -467,6 +467,269 @@ def push(self, meta_data):
         self.log.info("Log from %s:\n%s", self.dag_id, info)
 
 
+def _get_failed_tests(checks):
+    return [
+        f"\tCheck: {check}, " f"Check Values: {check_values}\n"
+        for check, check_values in checks.items()
+        if not check_values["success"]
+    ]
+
+
+class SQLColumnCheckOperator(BaseSQLOperator):
+    """
+    Performs one or more of the templated checks in the column_checks 
dictionary.
+    Checks are performed on a per-column basis specified by the column_mapping.
+
+    :param table: the table to run checks on.
+    :param column_mapping: the dictionary of columns and their associated 
checks, e.g.:
+    {
+        'col_name': {
+            'null_check': {
+                'equal_to': 0,
+            },
+            'min': {
+                'greater_than': 5,
+                'leq_than': 10,
+                'tolerance': 0.2,
+            },
+            'max': {
+                'less_than': 1000,
+                'geq_than': 10,
+                'tolerance': 0.01
+            }
+        }
+    }
+    :param conn_id: the connection ID used to connect to the database.
+    :param database: name of database which overwrite the defined one in 
connection
+    """
+
+    column_checks = {
+        # pass value should be number of acceptable nulls
+        "null_check": "SUM(CASE WHEN 'column' IS NULL THEN 1 ELSE 0 END) AS 
column_null_check",
+        # pass value should be number of acceptable distinct values
+        "distinct_check": "COUNT(DISTINCT(column)) AS column_distinct_check",
+        # pass value is implicit in the query, it does not need to be passed
+        "unique_check": "COUNT(DISTINCT(column)) = COUNT(column)",
+        # pass value should be the minimum acceptable numeric value
+        "min": "MIN(column) AS column_min",
+        # pass value should be the maximum acceptable numeric value
+        "max": "MAX(column) AS column_max",
+    }
+
+    def __init__(
+        self,
+        *,
+        table: str,
+        column_mapping: Dict[str, Dict[str, Any]],
+        conn_id: Optional[str] = None,
+        database: Optional[str] = None,
+        **kwargs,
+    ):
+        super().__init__(conn_id=conn_id, database=database, **kwargs)
+        for checks in column_mapping.values():
+            for check, check_values in checks.items():
+                self._column_mapping_validation(check, check_values)
+
+        self.table = table
+        self.column_mapping = column_mapping
+        self.sql = f"SELECT * FROM {self.table};"
+
+    def execute(self, context=None):
+        hook = self.get_db_hook()
+        failed_tests = []
+        for column in self.column_mapping:
+            checks = [*self.column_mapping[column]]
+            checks_sql = ",".join([self.column_checks[check].replace("column", 
column) for check in checks])
+
+            self.sql = f"SELECT {checks_sql} FROM {self.table};"
+            records = hook.get_first(self.sql)
+            self.log.info(f"Record: {records}")
+
+            if not records:
+                raise AirflowException("The query returned None")
+
+            for idx, result in enumerate(records):
+                tolerance = (
+                    self.column_mapping[column][checks[idx]]["tolerance"]
+                    if "tolerance" in self.column_mapping[column][checks[idx]]
+                    else None
+                )

Review Comment:
   ```suggestion
                   tolerance = 
self.column_mapping[column][checks[idx]].get("tolerance")
   ```



##########
airflow/operators/sql.py:
##########
@@ -467,6 +467,269 @@ def push(self, meta_data):
         self.log.info("Log from %s:\n%s", self.dag_id, info)
 
 
+def _get_failed_tests(checks):
+    return [
+        f"\tCheck: {check}, " f"Check Values: {check_values}\n"
+        for check, check_values in checks.items()
+        if not check_values["success"]
+    ]
+
+
+class SQLColumnCheckOperator(BaseSQLOperator):
+    """
+    Performs one or more of the templated checks in the column_checks 
dictionary.
+    Checks are performed on a per-column basis specified by the column_mapping.
+
+    :param table: the table to run checks on.
+    :param column_mapping: the dictionary of columns and their associated 
checks, e.g.:
+    {
+        'col_name': {
+            'null_check': {
+                'equal_to': 0,
+            },
+            'min': {
+                'greater_than': 5,
+                'leq_than': 10,
+                'tolerance': 0.2,
+            },
+            'max': {
+                'less_than': 1000,
+                'geq_than': 10,
+                'tolerance': 0.01
+            }
+        }
+    }
+    :param conn_id: the connection ID used to connect to the database.
+    :param database: name of database which overwrite the defined one in 
connection
+    """
+
+    column_checks = {
+        # pass value should be number of acceptable nulls
+        "null_check": "SUM(CASE WHEN 'column' IS NULL THEN 1 ELSE 0 END) AS 
column_null_check",
+        # pass value should be number of acceptable distinct values
+        "distinct_check": "COUNT(DISTINCT(column)) AS column_distinct_check",
+        # pass value is implicit in the query, it does not need to be passed
+        "unique_check": "COUNT(DISTINCT(column)) = COUNT(column)",
+        # pass value should be the minimum acceptable numeric value
+        "min": "MIN(column) AS column_min",
+        # pass value should be the maximum acceptable numeric value
+        "max": "MAX(column) AS column_max",
+    }
+
+    def __init__(
+        self,
+        *,
+        table: str,
+        column_mapping: Dict[str, Dict[str, Any]],
+        conn_id: Optional[str] = None,
+        database: Optional[str] = None,
+        **kwargs,
+    ):
+        super().__init__(conn_id=conn_id, database=database, **kwargs)
+        for checks in column_mapping.values():
+            for check, check_values in checks.items():
+                self._column_mapping_validation(check, check_values)
+
+        self.table = table
+        self.column_mapping = column_mapping
+        self.sql = f"SELECT * FROM {self.table};"
+
+    def execute(self, context=None):
+        hook = self.get_db_hook()
+        failed_tests = []
+        for column in self.column_mapping:
+            checks = [*self.column_mapping[column]]
+            checks_sql = ",".join([self.column_checks[check].replace("column", 
column) for check in checks])
+
+            self.sql = f"SELECT {checks_sql} FROM {self.table};"
+            records = hook.get_first(self.sql)
+            self.log.info(f"Record: {records}")
+
+            if not records:
+                raise AirflowException("The query returned None")
+
+            for idx, result in enumerate(records):
+                tolerance = (
+                    self.column_mapping[column][checks[idx]]["tolerance"]
+                    if "tolerance" in self.column_mapping[column][checks[idx]]
+                    else None
+                )
+
+                self.column_mapping[column][checks[idx]]["result"] = result
+                self.column_mapping[column][checks[idx]]["success"] = 
self._get_match(
+                    self.column_mapping[column][checks[idx]], result, tolerance
+                )
+
+            failed_tests.extend(_get_failed_tests(self.column_mapping[column]))
+        if failed_tests:
+            raise AirflowException(
+                f"Test failed.\nQuery:\n{self.sql}\nResults:\n{records!s}\n"
+                "The following tests have failed:"
+                f"\n{''.join(failed_tests)}"
+            )
+
+        self.log.info("All tests have passed")
+
+    def _get_match(self, check_values, record, tolerance=None) -> bool:
+        # check if record is str or numeric
+        # if record is str, do pattern matching
+        # numeric record checks
+        if "geq_than" in check_values:
+            if tolerance is not None:
+                return record >= check_values["geq_than"] * (1 - tolerance)
+            return record >= check_values["geq_than"]
+        elif "greater_than" in check_values:
+            if tolerance is not None:
+                return record > check_values["greater_than"] * (1 - tolerance)
+            return record > check_values["greater_than"]
+        if "leq_than" in check_values:
+            if tolerance is not None:
+                return record <= check_values["leq_than"] * (1 + tolerance)
+            return record <= check_values["leq_than"]
+        elif "less_than" in check_values:
+            if tolerance is not None:
+                return record < check_values["less_than"] * (1 + tolerance)
+            return record < check_values["less_than"]
+        if "equal_to" in check_values:
+            if tolerance is not None:
+                return (
+                    check_values["equal_to"] * (1 - tolerance)
+                    <= record
+                    <= check_values["equal_to"] * (1 + tolerance)
+                )
+        return record == check_values["equal_to"]
+
+    def _column_mapping_validation(self, check, check_values):
+        if check not in self.column_checks:
+            raise AirflowException(f"Invalid column check: {check}.")
+        if (
+            "greater_than" not in check_values
+            and "geq_than" not in check_values
+            and "less_than" not in check_values
+            and "leq_than" not in check_values
+            and "equal_to" not in check_values
+        ):
+            raise ValueError(
+                "Please provide one or more of: less_than, leq_than, "
+                "greater_than, geq_than, or equal_to in the check's dict."

Review Comment:
   I think it would be super helpful to list these comparison check options 
(`less_than`, `leq_than`, `greater_than`, `geq_than`, `equal_to`) in the 
docstring so they appear in the Python API documentation for users.



##########
airflow/operators/sql.py:
##########
@@ -467,6 +467,269 @@ def push(self, meta_data):
         self.log.info("Log from %s:\n%s", self.dag_id, info)
 
 
+def _get_failed_tests(checks):
+    return [
+        f"\tCheck: {check}, " f"Check Values: {check_values}\n"
+        for check, check_values in checks.items()
+        if not check_values["success"]
+    ]
+
+
+class SQLColumnCheckOperator(BaseSQLOperator):
+    """
+    Performs one or more of the templated checks in the column_checks 
dictionary.
+    Checks are performed on a per-column basis specified by the column_mapping.
+
+    :param table: the table to run checks on.
+    :param column_mapping: the dictionary of columns and their associated 
checks, e.g.:
+    {
+        'col_name': {
+            'null_check': {
+                'equal_to': 0,
+            },
+            'min': {
+                'greater_than': 5,
+                'leq_than': 10,
+                'tolerance': 0.2,
+            },
+            'max': {
+                'less_than': 1000,
+                'geq_than': 10,
+                'tolerance': 0.01
+            }
+        }
+    }
+    :param conn_id: the connection ID used to connect to the database.
+    :param database: name of database which overwrite the defined one in 
connection
+    """
+
+    column_checks = {
+        # pass value should be number of acceptable nulls
+        "null_check": "SUM(CASE WHEN 'column' IS NULL THEN 1 ELSE 0 END) AS 
column_null_check",
+        # pass value should be number of acceptable distinct values
+        "distinct_check": "COUNT(DISTINCT(column)) AS column_distinct_check",
+        # pass value is implicit in the query, it does not need to be passed
+        "unique_check": "COUNT(DISTINCT(column)) = COUNT(column)",
+        # pass value should be the minimum acceptable numeric value
+        "min": "MIN(column) AS column_min",
+        # pass value should be the maximum acceptable numeric value
+        "max": "MAX(column) AS column_max",
+    }
+
+    def __init__(
+        self,
+        *,
+        table: str,
+        column_mapping: Dict[str, Dict[str, Any]],
+        conn_id: Optional[str] = None,
+        database: Optional[str] = None,
+        **kwargs,
+    ):
+        super().__init__(conn_id=conn_id, database=database, **kwargs)
+        for checks in column_mapping.values():
+            for check, check_values in checks.items():
+                self._column_mapping_validation(check, check_values)
+
+        self.table = table
+        self.column_mapping = column_mapping
+        self.sql = f"SELECT * FROM {self.table};"
+
+    def execute(self, context=None):
+        hook = self.get_db_hook()
+        failed_tests = []
+        for column in self.column_mapping:
+            checks = [*self.column_mapping[column]]
+            checks_sql = ",".join([self.column_checks[check].replace("column", 
column) for check in checks])
+
+            self.sql = f"SELECT {checks_sql} FROM {self.table};"
+            records = hook.get_first(self.sql)
+            self.log.info(f"Record: {records}")
+
+            if not records:
+                raise AirflowException("The query returned None")
+
+            for idx, result in enumerate(records):
+                tolerance = (
+                    self.column_mapping[column][checks[idx]]["tolerance"]
+                    if "tolerance" in self.column_mapping[column][checks[idx]]
+                    else None
+                )
+
+                self.column_mapping[column][checks[idx]]["result"] = result
+                self.column_mapping[column][checks[idx]]["success"] = 
self._get_match(
+                    self.column_mapping[column][checks[idx]], result, tolerance
+                )
+
+            failed_tests.extend(_get_failed_tests(self.column_mapping[column]))
+        if failed_tests:
+            raise AirflowException(

Review Comment:
   Maybe it's worth thinking about having a separate exception for Airflow 
checks/tests, especially if more data quality functionality is coming?



##########
airflow/operators/sql.py:
##########
@@ -467,6 +467,269 @@ def push(self, meta_data):
         self.log.info("Log from %s:\n%s", self.dag_id, info)
 
 
+def _get_failed_tests(checks):
+    return [
+        f"\tCheck: {check}, " f"Check Values: {check_values}\n"
+        for check, check_values in checks.items()
+        if not check_values["success"]
+    ]
+
+
+class SQLColumnCheckOperator(BaseSQLOperator):
+    """
+    Performs one or more of the templated checks in the column_checks 
dictionary.
+    Checks are performed on a per-column basis specified by the column_mapping.
+
+    :param table: the table to run checks on.
+    :param column_mapping: the dictionary of columns and their associated 
checks, e.g.:
+    {
+        'col_name': {
+            'null_check': {
+                'equal_to': 0,
+            },
+            'min': {
+                'greater_than': 5,
+                'leq_than': 10,
+                'tolerance': 0.2,
+            },
+            'max': {
+                'less_than': 1000,
+                'geq_than': 10,
+                'tolerance': 0.01
+            }
+        }
+    }
+    :param conn_id: the connection ID used to connect to the database.
+    :param database: name of database which overwrite the defined one in 
connection
+    """
+
+    column_checks = {
+        # pass value should be number of acceptable nulls
+        "null_check": "SUM(CASE WHEN 'column' IS NULL THEN 1 ELSE 0 END) AS 
column_null_check",
+        # pass value should be number of acceptable distinct values
+        "distinct_check": "COUNT(DISTINCT(column)) AS column_distinct_check",
+        # pass value is implicit in the query, it does not need to be passed
+        "unique_check": "COUNT(DISTINCT(column)) = COUNT(column)",
+        # pass value should be the minimum acceptable numeric value
+        "min": "MIN(column) AS column_min",
+        # pass value should be the maximum acceptable numeric value
+        "max": "MAX(column) AS column_max",
+    }
+
+    def __init__(
+        self,
+        *,
+        table: str,
+        column_mapping: Dict[str, Dict[str, Any]],
+        conn_id: Optional[str] = None,
+        database: Optional[str] = None,
+        **kwargs,
+    ):
+        super().__init__(conn_id=conn_id, database=database, **kwargs)
+        for checks in column_mapping.values():
+            for check, check_values in checks.items():
+                self._column_mapping_validation(check, check_values)
+
+        self.table = table
+        self.column_mapping = column_mapping
+        self.sql = f"SELECT * FROM {self.table};"
+
+    def execute(self, context=None):
+        hook = self.get_db_hook()
+        failed_tests = []
+        for column in self.column_mapping:
+            checks = [*self.column_mapping[column]]
+            checks_sql = ",".join([self.column_checks[check].replace("column", 
column) for check in checks])
+
+            self.sql = f"SELECT {checks_sql} FROM {self.table};"
+            records = hook.get_first(self.sql)
+            self.log.info(f"Record: {records}")
+
+            if not records:
+                raise AirflowException("The query returned None")
+
+            for idx, result in enumerate(records):
+                tolerance = (
+                    self.column_mapping[column][checks[idx]]["tolerance"]
+                    if "tolerance" in self.column_mapping[column][checks[idx]]
+                    else None
+                )
+
+                self.column_mapping[column][checks[idx]]["result"] = result
+                self.column_mapping[column][checks[idx]]["success"] = 
self._get_match(
+                    self.column_mapping[column][checks[idx]], result, tolerance
+                )
+
+            failed_tests.extend(_get_failed_tests(self.column_mapping[column]))
+        if failed_tests:
+            raise AirflowException(
+                f"Test failed.\nQuery:\n{self.sql}\nResults:\n{records!s}\n"
+                "The following tests have failed:"
+                f"\n{''.join(failed_tests)}"
+            )
+
+        self.log.info("All tests have passed")
+
+    def _get_match(self, check_values, record, tolerance=None) -> bool:
+        # check if record is str or numeric
+        # if record is str, do pattern matching
+        # numeric record checks
+        if "geq_than" in check_values:
+            if tolerance is not None:
+                return record >= check_values["geq_than"] * (1 - tolerance)
+            return record >= check_values["geq_than"]
+        elif "greater_than" in check_values:
+            if tolerance is not None:
+                return record > check_values["greater_than"] * (1 - tolerance)
+            return record > check_values["greater_than"]
+        if "leq_than" in check_values:
+            if tolerance is not None:
+                return record <= check_values["leq_than"] * (1 + tolerance)
+            return record <= check_values["leq_than"]
+        elif "less_than" in check_values:
+            if tolerance is not None:
+                return record < check_values["less_than"] * (1 + tolerance)
+            return record < check_values["less_than"]
+        if "equal_to" in check_values:
+            if tolerance is not None:
+                return (
+                    check_values["equal_to"] * (1 - tolerance)
+                    <= record
+                    <= check_values["equal_to"] * (1 + tolerance)
+                )
+        return record == check_values["equal_to"]
+
+    def _column_mapping_validation(self, check, check_values):
+        if check not in self.column_checks:
+            raise AirflowException(f"Invalid column check: {check}.")
+        if (
+            "greater_than" not in check_values
+            and "geq_than" not in check_values
+            and "less_than" not in check_values
+            and "leq_than" not in check_values
+            and "equal_to" not in check_values
+        ):
+            raise ValueError(
+                "Please provide one or more of: less_than, leq_than, "
+                "greater_than, geq_than, or equal_to in the check's dict."
+            )
+
+        if "greater_than" in check_values and "less_than" in check_values:
+            if check_values["greater_than"] >= check_values["less_than"]:
+                raise ValueError(
+                    "greater_than should be strictly less than to "
+                    "less_than. Use geq_than or leq_than for "
+                    "overlapping equality."
+                )
+
+        if "greater_than" in check_values and "leq_than" in check_values:
+            if check_values["greater_than"] >= check_values["leq_than"]:
+                raise ValueError(
+                    "greater_than must be strictly less than leq_than. "
+                    "Use geq_than with leq_than for overlapping equality."
+                )
+
+        if "geq_than" in check_values and "less_than" in check_values:
+            if check_values["geq_than"] >= check_values["less_than"]:
+                raise ValueError(
+                    "geq_than should be strictly less than less_than. "
+                    "Use leq_than with geq_than for overlapping equality."
+                )
+
+        if "geq_than" in check_values and "leq_than" in check_values:
+            if check_values["geq_than"] > check_values["leq_than"]:
+                raise ValueError("geq_than should be less than or equal to 
leq_than.")
+
+        if "greater_than" in check_values and "geq_than" in check_values:
+            raise ValueError("Only supply one of greater_than or geq_than.")
+
+        if "less_than" in check_values and "leq_than" in check_values:
+            raise ValueError("Only supply one of less_than or leq_than.")
+
+        if (
+            "greater_than" in check_values
+            or "geq_than" in check_values
+            or "less_than" in check_values
+            or "leq_than" in check_values
+        ) and "equal_to" in check_values:
+            raise ValueError(
+                "equal_to cannot be passed with a greater or less than "
+                "function. To specify 'greater than or equal to' or "
+                "'less than or equal to', use geq_than or leq_than."
+            )
+
+
+class SQLTableCheckOperator(BaseSQLOperator):
+    """
+    Performs one or more of the checks provided in the checks dictionary.
+    Checks should be written to return a boolean result.
+
+    :param table: the table to run checks on.
+    :param checks: the dictionary of checks, e.g.:
+    {
+        'row_count_check': {
+            'check_statement': 'COUNT(*) == 1000'
+        },
+        'column_sum_check': {
+            'check_statement': 'col_a + col_b < col_c'
+        }
+    }
+    :param conn_id: the connection ID used to connect to the database.
+    :param database: name of database which overwrite the defined one in 
connection
+    """
+
+    sql_check_template = "MIN(CASE WHEN check_statement THEN 1 ELSE 0 END AS 
check_name)"
+
+    def __init__(
+        self,
+        *,
+        table: str,
+        checks: Dict[str, Dict[str, Any]],
+        conn_id: Optional[str] = None,
+        database: Optional[str] = None,
+        **kwargs,
+    ):
+        super().__init__(conn_id=conn_id, database=database, **kwargs)
+
+        self.table = table
+        self.checks = checks
+        self.sql = f"SELECT * FROM {self.table};"
+
+    def execute(self, context=None):
+        hook = self.get_db_hook()
+
+        checks_sql = ",".join(
+            [
+                self.sql_check_template.replace("check_statment", 
value["check_statement"]).replace(
+                    "check_name", check_name
+                )
+                for check_name, value in self.checks.items()
+            ]
+        )
+
+        self.sql = f"SELECT {checks_sql} FROM {self.table};"
+        records = hook.get_first(self.sql)
+
+        self.log.info(f"Record: {records}")
+
+        if not records:
+            raise AirflowException("The query returned None")

Review Comment:
   Same comment as above re: flipping these around.



##########
tests/operators/test_sql.py:
##########
@@ -385,6 +387,82 @@ def test_fail_min_sql_max_value(self, mock_get_db_hook):
             operator.execute()
 
 
+class TestColumnCheckOperator(unittest.TestCase):
+
+    valid_column_mapping = {
+        "X": {
+            "null_check": {"equal_to": 0},
+            "distinct_check": {"equal_to": 10, "tolerance": 0.1},
+            "unique_check": {"geq_than": 10},
+            "min": {"leq_than": 1},
+            "max": {"less_than": 20, "greater_than": 10},
+        }
+    }
+
+    invalid_column_mapping = {"Y": {"invalid_check_name": {"expectation": 5}}}
+
+    def _construct_operator(self, column_mapping):
+        return SQLColumnCheckOperator(task_id="test_task", table="test_table", 
column_mapping=column_mapping)
+
+    @mock.patch.object(SQLColumnCheckOperator, "get_db_hook")

Review Comment:
   Since every test case uses the same mocked object, you could think about 
using a pytest fixture for mocking `get_db_hook`. I don't have a strong opinion 
though.



##########
airflow/operators/sql.py:
##########
@@ -467,6 +467,269 @@ def push(self, meta_data):
         self.log.info("Log from %s:\n%s", self.dag_id, info)
 
 
+def _get_failed_tests(checks):
+    return [
+        f"\tCheck: {check}, " f"Check Values: {check_values}\n"
+        for check, check_values in checks.items()
+        if not check_values["success"]
+    ]
+
+
+class SQLColumnCheckOperator(BaseSQLOperator):
+    """
+    Performs one or more of the templated checks in the column_checks 
dictionary.
+    Checks are performed on a per-column basis specified by the column_mapping.
+
+    :param table: the table to run checks on.
+    :param column_mapping: the dictionary of columns and their associated 
checks, e.g.:
+    {
+        'col_name': {
+            'null_check': {
+                'equal_to': 0,
+            },
+            'min': {
+                'greater_than': 5,
+                'leq_than': 10,
+                'tolerance': 0.2,
+            },
+            'max': {
+                'less_than': 1000,
+                'geq_than': 10,
+                'tolerance': 0.01
+            }
+        }
+    }
+    :param conn_id: the connection ID used to connect to the database.
+    :param database: name of database which overwrite the defined one in 
connection
+    """
+
+    column_checks = {
+        # pass value should be number of acceptable nulls
+        "null_check": "SUM(CASE WHEN 'column' IS NULL THEN 1 ELSE 0 END) AS 
column_null_check",
+        # pass value should be number of acceptable distinct values
+        "distinct_check": "COUNT(DISTINCT(column)) AS column_distinct_check",
+        # pass value is implicit in the query, it does not need to be passed
+        "unique_check": "COUNT(DISTINCT(column)) = COUNT(column)",
+        # pass value should be the minimum acceptable numeric value
+        "min": "MIN(column) AS column_min",
+        # pass value should be the maximum acceptable numeric value
+        "max": "MAX(column) AS column_max",
+    }
+
+    def __init__(
+        self,
+        *,
+        table: str,
+        column_mapping: Dict[str, Dict[str, Any]],
+        conn_id: Optional[str] = None,
+        database: Optional[str] = None,
+        **kwargs,
+    ):
+        super().__init__(conn_id=conn_id, database=database, **kwargs)
+        for checks in column_mapping.values():
+            for check, check_values in checks.items():
+                self._column_mapping_validation(check, check_values)
+
+        self.table = table
+        self.column_mapping = column_mapping
+        self.sql = f"SELECT * FROM {self.table};"
+
+    def execute(self, context=None):
+        hook = self.get_db_hook()
+        failed_tests = []
+        for column in self.column_mapping:
+            checks = [*self.column_mapping[column]]
+            checks_sql = ",".join([self.column_checks[check].replace("column", 
column) for check in checks])
+
+            self.sql = f"SELECT {checks_sql} FROM {self.table};"
+            records = hook.get_first(self.sql)
+            self.log.info(f"Record: {records}")
+
+            if not records:
+                raise AirflowException("The query returned None")

Review Comment:
   ```suggestion
               if not records:
                   raise AirflowException("The query returned None")
   
               self.log.info(f"Record: {records}")
   ```
   Seems like it's not worth logging the record info if no records are returned 
and a failure will occur. WDYT about flipping these around?
   
   Also, WDYT about changing the exception message to something like this?
   "The following query returned no rows: {sql}"
   
   



##########
tests/operators/test_sql.py:
##########
@@ -385,6 +387,82 @@ def test_fail_min_sql_max_value(self, mock_get_db_hook):
             operator.execute()
 
 
+class TestColumnCheckOperator(unittest.TestCase):
+
+    valid_column_mapping = {
+        "X": {
+            "null_check": {"equal_to": 0},
+            "distinct_check": {"equal_to": 10, "tolerance": 0.1},
+            "unique_check": {"geq_than": 10},
+            "min": {"leq_than": 1},
+            "max": {"less_than": 20, "greater_than": 10},
+        }
+    }
+
+    invalid_column_mapping = {"Y": {"invalid_check_name": {"expectation": 5}}}
+
+    def _construct_operator(self, column_mapping):
+        return SQLColumnCheckOperator(task_id="test_task", table="test_table", 
column_mapping=column_mapping)
+
+    @mock.patch.object(SQLColumnCheckOperator, "get_db_hook")
+    def test_check_not_in_column_checks(self, mock_get_db_hook):
+        with pytest.raises(AirflowException, match="Invalid column check: 
invalid_check_name."):
+            self._construct_operator(self.invalid_column_mapping)
+
+    @mock.patch.object(SQLColumnCheckOperator, "get_db_hook")
+    def test_pass_all_checks_exact_check(self, mock_get_db_hook):
+        mock_hook = mock.Mock()
+        mock_hook.get_first.return_value = (0, 10, 10, 1, 19)
+        mock_get_db_hook.return_value = mock_hook
+        operator = self._construct_operator(self.valid_column_mapping)
+        operator.execute()
+
+    @mock.patch.object(SQLColumnCheckOperator, "get_db_hook")
+    def test_pass_all_checks_inexact_check(self, mock_get_db_hook):
+        mock_hook = mock.Mock()
+        mock_hook.get_first.return_value = (0, 9, 12, 0, 15)
+        mock_get_db_hook.return_value = mock_hook
+        operator = self._construct_operator(self.valid_column_mapping)
+        operator.execute()

Review Comment:
   What is the assertion being made in these happy-path tests?



##########
tests/operators/test_sql.py:
##########
@@ -385,6 +387,82 @@ def test_fail_min_sql_max_value(self, mock_get_db_hook):
             operator.execute()
 
 
+class TestColumnCheckOperator(unittest.TestCase):

Review Comment:
   IMO this can be a `pytest` test rather than continuing to use `unittest`.



##########
airflow/operators/sql.py:
##########
@@ -467,6 +467,269 @@ def push(self, meta_data):
         self.log.info("Log from %s:\n%s", self.dag_id, info)
 
 
+def _get_failed_tests(checks):
+    return [
+        f"\tCheck: {check}, " f"Check Values: {check_values}\n"
+        for check, check_values in checks.items()
+        if not check_values["success"]
+    ]
+
+
+class SQLColumnCheckOperator(BaseSQLOperator):
+    """
+    Performs one or more of the templated checks in the column_checks 
dictionary.
+    Checks are performed on a per-column basis specified by the column_mapping.
+
+    :param table: the table to run checks on.
+    :param column_mapping: the dictionary of columns and their associated 
checks, e.g.:
+    {
+        'col_name': {
+            'null_check': {
+                'equal_to': 0,
+            },
+            'min': {
+                'greater_than': 5,
+                'leq_than': 10,
+                'tolerance': 0.2,
+            },
+            'max': {
+                'less_than': 1000,
+                'geq_than': 10,
+                'tolerance': 0.01
+            }
+        }
+    }
+    :param conn_id: the connection ID used to connect to the database.
+    :param database: name of database which overwrite the defined one in 
connection
+    """
+
+    column_checks = {
+        # pass value should be number of acceptable nulls
+        "null_check": "SUM(CASE WHEN 'column' IS NULL THEN 1 ELSE 0 END) AS 
column_null_check",
+        # pass value should be number of acceptable distinct values
+        "distinct_check": "COUNT(DISTINCT(column)) AS column_distinct_check",
+        # pass value is implicit in the query, it does not need to be passed
+        "unique_check": "COUNT(DISTINCT(column)) = COUNT(column)",
+        # pass value should be the minimum acceptable numeric value
+        "min": "MIN(column) AS column_min",
+        # pass value should be the maximum acceptable numeric value
+        "max": "MAX(column) AS column_max",
+    }
+
+    def __init__(
+        self,
+        *,
+        table: str,
+        column_mapping: Dict[str, Dict[str, Any]],
+        conn_id: Optional[str] = None,
+        database: Optional[str] = None,
+        **kwargs,
+    ):
+        super().__init__(conn_id=conn_id, database=database, **kwargs)
+        for checks in column_mapping.values():
+            for check, check_values in checks.items():
+                self._column_mapping_validation(check, check_values)
+
+        self.table = table
+        self.column_mapping = column_mapping
+        self.sql = f"SELECT * FROM {self.table};"
+
+    def execute(self, context=None):
+        hook = self.get_db_hook()
+        failed_tests = []
+        for column in self.column_mapping:
+            checks = [*self.column_mapping[column]]
+            checks_sql = ",".join([self.column_checks[check].replace("column", 
column) for check in checks])
+
+            self.sql = f"SELECT {checks_sql} FROM {self.table};"

Review Comment:
   Similar to the question above, does this need to be an instance attr?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to