mrrobby commented on a change in pull request #11113:
URL: https://github.com/apache/airflow/pull/11113#discussion_r502658557



##########
File path: airflow/providers/google/cloud/operators/bigquery.py
##########
@@ -2109,3 +2122,287 @@ def execute(self, context: Any):
     def on_kill(self):
         if self.job_id and self.cancel_on_kill:
             self.hook.cancel_job(job_id=self.job_id, 
project_id=self.project_id, location=self.location)
+
+
+class GreatExpectationsValidations(enum.Enum):
+    SQL = "SQL"
+    TABLE = "TABLE"
+
+
+class GreatExpectationsBigQueryOperator(GreatExpectationsBaseOperator):
+    """
+    Use Great Expectations to validate data expectations against a BigQuery 
table or the result of a SQL query.
+         The expectations need to be stored in a JSON file sitting in an 
accessible GCS bucket.  The validation results
+         are output to GCS in both JSON and HTML formats.
+         Here's the current list of expectations types:
+         
https://docs.greatexpectations.io/en/latest/reference/glossary_of_expectations.html
+         Here's how to create expectations files:
+         
https://docs.greatexpectations.io/en/latest/guides/tutorials/how_to_create_expectations.html
+        :param gcp_project:  The GCP project which houses the GCS buckets 
where the expectations files are stored
+            and where the validation files & data docs will be output (e.g. 
HTML docs showing if the data matches
+            expectations).
+        :type gcp_project: str
+        :param expectations_file_name: The name of the JSON file containing 
the expectations for the data.
+        :type expectations_file_name: str
+        :param gcs_bucket:  Google Cloud Storage bucket where expectation 
files are stored and where validation outputs
+            and data docs will be saved.
+            (e.g. 
gs://<gcs_bucket>/<gcs_expectations_prefix>/<expectations_file_name>
+                  gs://mybucket/myprefix/myexpectationsfile.json )
+        :type gcs_bucket: str
+        :param gcs_expectations_prefix:  Google Cloud Storage prefix where the 
expectations file can be found.
+            (e.g. 'ge/expectations')
+        :type gcs_expectations_prefix: str
+        :param gcs_validations_prefix:  Google Cloud Storage prefix where the 
validation output files should be saved.
+            (e.g. 'ge/validations')
+        :type gcs_validations_prefix: str
+        :param gcs_datadocs_prefix:  Google Cloud Storage prefix where the 
validation datadocs files should be saved.
+            (e.g. 'ge/datadocs')
+        :type gcs_datadocs_prefix: str
+        :param validation_type: For the set of data to be validated (i.e. 
compared against expectations), is it already
+            sitting in a BigQuery table or do you want to validate the data 
returned by a SQL query?  The options are
+            'TABLE' or 'SQL'.
+        :type validation_type: str
+        :param validation_type_input:  The name of the BigQuery table 
(dataset_name.table_name) if the validation_type
+            is 'TABLE' or the SQL query string if the validation_type is 'SQL'.
+        :type validation_type_input: str
+        :param bigquery_conn_id: Name of the BigQuery connection that contains 
the connection and credentials
+            info needed to connect to BigQuery.
+        :type bigquery_conn_id: str
+        :param bq_dataset_name:  The name of the BigQuery data set where any 
temp tables will be created that are needed
+            as part of the GE validation process.
+        :type bq_dataset_name: str
+        :param send_alert_email:  Send an alert email if one or more 
expectations fail to be met?  Defaults to True.
+        :type send_alert_email: boolean
+        :param datadocs_link_in_email:  Include in the alert email a link to 
the data doc in GCS that shows the
+            validation results?  Defaults to False because there's extra setup 
needed to serve HTML data docs stored in
+            GCS.  When set to False, only a GCS path to the results are 
included in the email.
+            Set up a GAE app to serve the data docs if you want a clickable 
link for the data doc to be included in the
+            email.  See here for set up instructions:
+            
https://docs.greatexpectations.io/en/latest/guides/how_to_guides/configuring_data_docs/how_to_host_and_share_data_docs_on_gcs.html
+        :type datadocs_link_in_email: boolean
+        :param datadocs_domain: The domain from which the data docs are set up 
to be served (e.g. ge-data-docs-dot-my-gcp-project.ue.r.appspot.com).
+            This only needs to be set if datadocs_link_in_email is set to True.
+        :type datadocs_domain: str
+        :param email_to:  Email address to receive any alerts when 
expectations are not met.
+        :type email_to: str
+        :param fail_if_expectations_not_met: Fail the Airflow task if 
expectations are not met?  Defaults to True.
+        :type fail_if_expectations_not_met: boolean
+    """
+
+    _EMAIL_CONTENT = '''
+            <html>
+              <head>
+                <meta charset="utf-8">
+              </head>
+              <body style="background-color: #fafafa; font-family: Roboto, sans-serif;">
+                <div style="width: 600px; margin:0 auto;">
+                    <div style="background-color: white; border-top: 4px solid 
#22a667; border-left: 1px solid #eee; border-right: 1px solid #eee; 
border-radius: 6px 6px 0 0; height: 24px;"></div>
+                        <div style="background-color: white; border-left: 1px 
solid #eee; border-right: 1px solid #eee; padding: 0 24px; overflow: hidden;">
+                          <div style="margin-left: 35px;">
+                            Great Expectations Alert<br>
+                            One or more data expectations were not met in the 
{0} file. {1}
+                       </div>
+              </body>
+            </html>
+            '''
+
+    @apply_defaults
+    def __init__(self, *, gcp_project, expectations_file_name, gcs_bucket, 
gcs_expectations_prefix,
+                 gcs_validations_prefix, gcs_datadocs_prefix, validation_type, 
validation_type_input,
+                 bq_dataset_name, email_to, datadocs_domain='none', 
send_alert_email=True, datadocs_link_in_email=False,
+                 fail_if_expectations_not_met=True,
+                 bigquery_conn_id='bigquery_default', **kwargs):
+
+        super().__init__(**kwargs)
+
+        great_expectations_valid_type = set(item.value for item in 
GreatExpectationsValidations)
+
+        self.expectations_file_name = expectations_file_name
+        if validation_type.upper() not in GreatExpectationsValidations:
+            raise AirflowException(f"argument 'validation_type' must be one of 
{great_expectations_valid_type}")
+        self.validation_type = validation_type
+        self.validation_type_input = validation_type_input
+        self.bigquery_conn_id = bigquery_conn_id
+        self.bq_dataset_name = bq_dataset_name
+        self.gcp_project = gcp_project
+        self.gcs_bucket = gcs_bucket
+        self.gcs_expectations_prefix = gcs_expectations_prefix
+        self.gcs_validations_prefix = gcs_validations_prefix
+        self.gcs_datadocs_prefix = gcs_datadocs_prefix
+        self.email_to = email_to
+        self.datadocs_domain = datadocs_domain
+        self.send_alert_email = send_alert_email
+        self.datadocs_link_in_email = datadocs_link_in_email
+        self.fail_if_expectations_not_met = fail_if_expectations_not_met
+
+    def instantiate_data_context(self):
+
+        conn = BaseHook.get_connection(self.bigquery_conn_id)
+        connection_json = conn.extra_dejson
+
+        project_config = DataContextConfig(
+            config_version=2,
+            datasources={
+                "bq_datasource": {
+                    "credentials": {
+                        "url": "bigquery://" + connection_json[
+                            'extra__google_cloud_platform__project'] + "/" + 
self.bq_dataset_name + "?credentials_path=" +
+                               
connection_json['extra__google_cloud_platform__key_path']
+                    },
+                    "class_name": "SqlAlchemyDatasource",
+                    "module_name": "great_expectations.datasource",
+                    "data_asset_type": {
+                        "module_name": "great_expectations.dataset",
+                        "class_name": "SqlAlchemyDataset"
+                    }
+                }
+            },
+            expectations_store_name="expectations_GCS_store",
+            validations_store_name="validations_GCS_store",
+            evaluation_parameter_store_name="evaluation_parameter_store",
+            plugins_directory=None,
+            validation_operators={
+                "action_list_operator": {
+                    "class_name": "ActionListValidationOperator",
+                    "action_list": [
+                        {
+                            "name": "store_validation_result",
+                            "action": {"class_name": 
"StoreValidationResultAction"},
+                        },
+                        {
+                            "name": "store_evaluation_params",
+                            "action": {"class_name": 
"StoreEvaluationParametersAction"},
+                        },
+                        {
+                            "name": "update_data_docs",
+                            "action": {"class_name": "UpdateDataDocsAction"},
+                        },
+                    ],
+                }
+            },
+            stores={
+                'expectations_GCS_store': {

Review comment:
       I feel like setting the expectations store and the validations store (or, 
for that matter, all of the data context) should be abstracted away from here 
somehow, perhaps by creating a Great Expectations hook. That would allow us to, 
say, use a GCS bucket for these stores and perhaps create a Snowflake Great 
Expectations operator.
   ```python
   datasource = ge_gcs_hook.create_bigquery_datasource(bigquery_conn_id, 
bq_dataset_name, other_params) 
   data_context = ge_gcs_hook.create_data_context(datasource, gcs_conn_id, 
gcs_bucket, gcs_validations_prefix, gcs_expectations_prefix)
   ```
   so we can also have something like
   ```python
   # Base hook sets abstract on data context
   custom_datasource = ge_hook.create_custom_datasource(conn_id, dataset_name, 
class_name, module_name) 
   ```
   Something like that




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to