turbaszek commented on a change in pull request #11111:
URL: https://github.com/apache/airflow/pull/11111#discussion_r493917075



##########
File path: airflow/providers/google/cloud/operators/bigquery.py
##########
@@ -2109,3 +2127,268 @@ def execute(self, context: Any):
     def on_kill(self):
         if self.job_id and self.cancel_on_kill:
             self.hook.cancel_job(job_id=self.job_id, 
project_id=self.project_id, location=self.location)
+
+
+class GreatExpectationsBigQueryOperator(BaseOperator):
+    """
+         Use Great Expectations to validate data expectations against a 
BigQuery table or the result of a SQL query.
+         The expectations need to be stored in a JSON file sitting in an 
accessible GCS bucket.  The validation results
+         are output to GCS in both JSON and HTML formats.
+         Here's the current list of expectations types:
+         
https://docs.greatexpectations.io/en/latest/reference/glossary_of_expectations.html
+         Here's how to create expectations files:
+         
https://docs.greatexpectations.io/en/latest/guides/tutorials/how_to_create_expectations.html
+
+        :param gcp_project:  The GCP project which houses the GCS buckets 
where the expectations files are stored
+            and where the validation files & data docs will be output (e.g. 
HTML docs showing if the data matches
+            expectations).
+        :type gcp_project: str
+        :param expectations_file_name: The name of the JSON file containing 
the expectations for the data.
+        :type expectations_file_name: str
+        :param gcs_bucket:  Google Cloud Storage bucket where expectation 
files are stored and where validation outputs
+            and data docs will be saved.
+            (e.g. 
gs://<gcs_bucket>/<gcs_expectations_prefix>/<expectations_file_name>
+                  gs://mybucket/myprefix/myexpectationsfile.json )
+        :type gcs_bucket: str
+        :param gcs_expectations_prefix:  Google Cloud Storage prefix where the 
expectations file can be found.
+            (e.g. 'ge/expectations')
+        :type gcs_expectations_prefix: str
+        :param gcs_validations_prefix:  Google Cloud Storage prefix where the 
validation output files should be saved.
+            (e.g. 'ge/validations')
+        :type gcs_validations_prefix: str
+        :param gcs_datadocs_prefix:  Google Cloud Storage prefix where the 
validation datadocs files should be saved.
+            (e.g. 'ge/datadocs')
+        :type gcs_datadocs_prefix: str
+        :param validation_type: For the set of data to be validated (i.e. 
compared against expectations), is it already
+            sitting in a BigQuery table or do you want to validate the data 
returned by a SQL query?  The options are
+            'TABLE' or 'SQL'.
+        :type validation_type: str
+        :param validation_type_input:  The name of the BigQuery table 
(dataset_name.table_name) if the validation_type
+            is 'TABLE' or the SQL query string if the validation_type is 'SQL'.
+        :type validation_type_input: str
+        :param bigquery_conn_id: Name of the BigQuery connection that contains 
the connection and credentials
+            info needed to connect to BigQuery.
+        :type bigquery_conn_id: str
+        :param bq_dataset_name:  The name of the BigQuery data set where any 
temp tables will be created that are needed
+            as part of the GE validation process.
+        :type bq_dataset_name: str
+        :param send_alert_email:  Send an alert email if one or more 
expectations fail to be met?  Defaults to True.
+        :type send_alert_email: boolean
+        :param include_datadocs_link_in_email:  Include in the alert email a 
link to the data doc in GCS that shows the
+            validation results?  Defaults to False because there's extra setup 
needed to serve HTML data docs stored in
+            GCS.  When set to False, only a GCS path to the results are 
included in the email.
+            Execute the following steps if you want a clickable link for the 
data doc to be included in the
+            email:
+            1) Set up a GAE app to serve the data docs by following the 
instructions here:
+            
https://docs.greatexpectations.io/en/latest/guides/how_to_guides/configuring_data_docs/how_to_host_and_share_data_docs_on_gcs.html
+            2) Create an Airflow variable called 
'great_expectations_datadocs_domain' with a value set to the domain
+               google creates in step 1 to serve the data docs (e.g. 
'ge-data-docs-dot-my-gcp-project.ue.r.appspot.com').
+               This operator will look for it if 
include_datadocs_link_in_email=True
+        :type include_datadocs_link_in_email: boolean
+        :param email_to:  Email address to receive any alerts when 
expectations are not met.
+        :type email_to: str
+        :param fail_if_expectations_not_met: Fail the Airflow task if 
expectations are not met?  Defaults to True.
+        :type fail_if_expectations_not_met: boolean
+    """
+
+    @apply_defaults
+    def __init__(self, gcp_project, expectations_file_name, gcs_bucket, 
gcs_expectations_prefix,
+                 gcs_validations_prefix, gcs_datadocs_prefix, validation_type, 
validation_type_input,
+                 bq_dataset_name, email_to, send_alert_email=True, 
include_datadocs_link_in_email=False,
+                 fail_if_expectations_not_met=True,
+                 bigquery_conn_id='bigquery_default',
+                 *args, **kwargs):
+        self.expectations_file_name = expectations_file_name
+        if validation_type.upper() not in VALID_TYPE:
+            raise AirflowException("argument 'validation_type' must be one of 
%r." % VALID_TYPE)
+        self.validation_type = validation_type
+        self.validation_type_input = validation_type_input
+        self.bigquery_conn_id = bigquery_conn_id
+        self.bq_dataset_name = bq_dataset_name
+        self.gcp_project = gcp_project
+        self.gcs_bucket = gcs_bucket
+        self.gcs_expectations_prefix = gcs_expectations_prefix
+        self.gcs_validations_prefix = gcs_validations_prefix
+        self.gcs_datadocs_prefix = gcs_datadocs_prefix
+        self.email_to = email_to
+        self.send_alert_email = send_alert_email
+        self.include_datadocs_link_in_email = include_datadocs_link_in_email
+        self.fail_if_expectations_not_met = fail_if_expectations_not_met
+
+        super(GreatExpectationsBigQueryOperator, self).__init__(*args, 
**kwargs)
+
+    def execute(self, context):
+        conn = BaseHook.get_connection(self.bigquery_conn_id)
+        connectionJson = json.loads(conn.extra)
+        log.info('####### validation_type_input  
{}'.format(self.validation_type_input))
+
+        project_config = DataContextConfig(
+            config_version=2,
+            datasources={
+                "bq_datasource": {
+                    "credentials": {
+                        "url": "bigquery://" + connectionJson[
+                            'extra__google_cloud_platform__project'] + "/" + 
self.bq_dataset_name + "?credentials_path=" +
+                               
connectionJson['extra__google_cloud_platform__key_path']
+                    },
+                    "class_name": "SqlAlchemyDatasource",
+                    "module_name": "great_expectations.datasource",
+                    "data_asset_type": {
+                        "module_name": "great_expectations.dataset",
+                        "class_name": "SqlAlchemyDataset"
+                    }
+                }
+            },
+            expectations_store_name="expectations_GCS_store",
+            validations_store_name="validations_GCS_store",
+            evaluation_parameter_store_name="evaluation_parameter_store",
+            plugins_directory=None,
+            validation_operators={
+                "action_list_operator": {
+                    "class_name": "ActionListValidationOperator",
+                    "action_list": [
+                        {
+                            "name": "store_validation_result",
+                            "action": {"class_name": 
"StoreValidationResultAction"},
+                        },
+                        {
+                            "name": "store_evaluation_params",
+                            "action": {"class_name": 
"StoreEvaluationParametersAction"},
+                        },
+                        {
+                            "name": "update_data_docs",
+                            "action": {"class_name": "UpdateDataDocsAction"},
+                        },
+                    ],
+                }
+            },
+            stores={
+                'expectations_GCS_store': {
+                    'class_name': 'ExpectationsStore',
+                    'store_backend': {
+                        'class_name': 'TupleGCSStoreBackend',
+                        'project': self.gcp_project,
+                        'bucket': self.gcs_bucket,
+                        'prefix': self.gcs_expectations_prefix
+                    }
+                },
+                'validations_GCS_store': {
+                    'class_name': 'ValidationsStore',
+                    'store_backend': {
+                        'class_name': 'TupleGCSStoreBackend',
+                        'project': self.gcp_project,
+                        'bucket': self.gcs_bucket,
+                        'prefix': self.gcs_validations_prefix
+                    }
+                },
+                "evaluation_parameter_store": {"class_name": 
"EvaluationParameterStore"},
+            },
+            data_docs_sites={
+                "GCS_site": {
+                    "class_name": "SiteBuilder",
+                    "store_backend": {
+                        "class_name": "TupleGCSStoreBackend",
+                        "project": self.gcp_project,
+                        "bucket": self.gcs_bucket,
+                        'prefix': self.gcs_datadocs_prefix
+                    },
+                    "site_index_builder": {
+                        "class_name": "DefaultSiteIndexBuilder",
+                    },
+                }
+            },
+            config_variables_file_path=None,
+            commented_map=None,
+        )
+        data_context = BaseDataContext(project_config=project_config)
+
+        # Tell GE how to fetch the batch of data that should be validated.
+        batch_kwargs = {
+            "datasource": "bq_datasource",
+        }
+        if self.validation_type == VALIDATIONS.SQL.name:
+            batch_kwargs["query"] = self.validation_type_input
+            batch_kwargs["data_asset_name"] = self.bq_dataset_name
+            batch_kwargs["bigquery_temp_table"] = self.get_temp_table_name()
+        elif self.validation_type == VALIDATIONS.TABLE.name:
+            batch_kwargs["table"] = self.validation_type_input
+            batch_kwargs["data_asset_name"] = self.bq_dataset_name
+
+        log.info("batch_kwargs: " + str(batch_kwargs))
+
+        log.info("Loading expectations...")
+        suite = 
data_context.get_expectation_suite((self.expectations_file_name.rsplit(".", 
1)[0]))
+
+        log.info("Getting the batch of data to be validated...")
+        batch = data_context.get_batch(batch_kwargs, suite)
+
+        run_id = {
+            "run_name": 'bq',
+            "run_time": datetime.datetime.now(datetime.timezone.utc)
+        }
+
+        log.info("Validating batch against expectations...")
+        results = data_context.run_validation_operator(
+            "action_list_operator",
+            assets_to_validate=[batch],
+            run_id=run_id)
+
+        validation_result_identifier = list(results['run_results'].keys())[0]
+        # For the given validation_result_identifier, get a link to the data 
docs that were generated by Great
+        # Expectations as part of the validation.
+        data_docs_url = \
+            
data_context.get_docs_sites_urls(resource_identifier=validation_result_identifier,
 site_name='GCS_site')[0][
+                'site_url']
+        log.info("Data docs url is: " + data_docs_url)
+        if results["success"]:
+            self.log.info('All expectations met')
+        else:
+            self.log.info('One or more expectations were not met.')
+            if self.send_alert_email:
+                self.log.info('Sending alert email...')
+                self.send_alert(data_docs_url)
+            if self.fail_if_expectations_not_met:
+                raise AirflowException('One or more expectations were not met')
+
+    # Generate a unique name for a temporary BQ table
+    def get_temp_table_name(self):
+        now = datetime.datetime.now().strftime('%Y%m%d_%H%M%S')
+        name_start = "temp_great_expectations_" + now + '_'
+        full_name = name_start + ''.join(random.choices(string.ascii_uppercase 
+
+                                                        string.digits, k=5))
+        log.info("Generated name for temporary table: " + full_name)
+        return full_name
+
+    def send_alert(self, data_docs_url):
+        if self.include_datadocs_link_in_email:
+            # From an Airflow variable set by the user, get the domain name of 
the service serving the data docs.
+            domain = Variable.get("great_expectations_datadocs_domain")
+            # Replace the domain returned by GE with the domain set up to 
serve the data docs
+            parsed = urlsplit(data_docs_url)
+            new_url = parsed._replace(netloc=domain)
+            results = '  See the results <a href=' + new_url.geturl() + 
'>here</a>.'
+        else:
+            # From the data docs url, pull out just the GCS path and send it 
to the users in the email.
+            parsed = urlsplit(data_docs_url)
+            results = '  See the following GCS location for results:' + 
parsed.path
+        email_content = '''
+                <html>
+                  <head>
+                    <meta charset="utf-8">
+                  </head>
+                  <body style="background-color: #fafafa; font-family: Roboto, 
sans-serif=;">
+                    <div style="width: 600px; margin:0 auto;">
+                        <div style="background-color: white; border-top: 4px 
solid #22a667; border-left: 1px solid #eee; border-right: 1px solid #eee; 
border-radius: 6px 6px 0 0; height: 24px;"></div>
+                            <div style="background-color: white; border-left: 
1px solid #eee; border-right: 1px solid #eee; padding: 0 24px; overflow: 
hidden;">
+                              <div style="margin-left: 35px;">
+                                Great Expectations Alert<br>
+                                One or more data expectations were not met in 
the {0} file. {1} 
+                           </div>
+                  </body>
+                </html>
+                '''.format(self.expectations_file_name, results)
+        send_email(self.email_to, 'expectations in ' + 
self.expectations_file_name + ' not met', email_content,
+                   files=None, cc=None, bcc=None,
+                   mime_subtype='mixed', mime_charset='us_ascii')
+        B

Review comment:
       ```suggestion
   
   ```
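   Not sure about it, but the lone `B` on the last added line looks like a stray character left in by accident; I suggest removing it.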




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Reply via email to