kaxil commented on a change in pull request #4493: [AIRFLOW-3680] Consistency update in tests for All GCP-related operators URL: https://github.com/apache/airflow/pull/4493#discussion_r247297230
########## File path: tests/contrib/utils/gcp_authenticator.py ########## @@ -0,0 +1,208 @@ +# -*- coding: utf-8 -*- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +import json +import os +import subprocess + +from airflow import settings, AirflowException +from tests.contrib.utils.logging_command_executor import LoggingCommandExecutor + +from airflow.models.connection import Connection + +GCP_COMPUTE_KEY = 'gcp_compute.json' +GCP_FUNCTION_KEY = 'gcp_function.json' +GCP_CLOUDSQL_KEY = 'gcp_cloudsql.json' +GCP_BIGTABLE_KEY = 'gcp_bigtable.json' +GCP_SPANNER_KEY = 'gcp_spanner.json' +GCP_GCS_KEY = 'gcp_gcs.json' + +KEYPATH_EXTRA = 'extra__google_cloud_platform__key_path' +KEYFILE_DICT_EXTRA = 'extra__google_cloud_platform__keyfile_dict' +SCOPE_EXTRA = 'extra__google_cloud_platform__scope' +PROJECT_EXTRA = 'extra__google_cloud_platform__project' + +AIRFLOW_MAIN_FOLDER = os.path.realpath(os.path.join( + os.path.dirname(os.path.realpath(__file__)), + os.pardir, os.pardir, os.pardir)) + + +class GcpAuthenticator(LoggingCommandExecutor): + """ + Manages authentication to Google Cloud Platform. 
It helps to manage + connection - it can authenticate with the gcp key name specified + """ + original_account = None + + def __init__(self, gcp_key, project_extra=None): + """ + Initialises the authenticator. + + :param gcp_key: name of the key to use for authentication (see GCP_*_KEY values) + :param project_extra: optional extra project parameter passed to google cloud + connection + """ + super(GcpAuthenticator, self).__init__() + self.gcp_key = gcp_key + self.project_extra = project_extra + self.project_id = self.get_project_id() + self.full_key_path = None + self._set_key_path() + + @staticmethod + def get_project_id(): + return os.environ.get('GCP_PROJECT_ID') + + def set_key_path_in_airflow_connection(self): + """ + Set key path in 'google_cloud_default' connection to point to the full + key path + :return: None + """ + session = settings.Session() + try: + conn = session.query(Connection).filter( + Connection.conn_id == 'google_cloud_default')[0] + extras = conn.extra_dejson + extras[KEYPATH_EXTRA] = self.full_key_path + if extras.get(KEYFILE_DICT_EXTRA): + del extras[KEYFILE_DICT_EXTRA] + extras[SCOPE_EXTRA] = 'https://www.googleapis.com/auth/cloud-platform' + extras[PROJECT_EXTRA] = self.project_extra if self.project_extra else \ + self.project_id + conn.extra = json.dumps(extras) + session.commit() + except BaseException as ex: + self.log.info('Airflow DB Session error:' + str(ex)) + session.rollback() + raise + finally: + session.close() + + def set_dictionary_in_airflow_connection(self): + """ + Set dictionary in 'google_cloud_default' connection to contain content + of the json service account file. 
+ :return: None + """ + session = settings.Session() + try: + conn = session.query(Connection).filter( + Connection.conn_id == 'google_cloud_default')[0] + extras = conn.extra_dejson + with open(self.full_key_path, "r") as path_file: + content = json.load(path_file) + extras[KEYFILE_DICT_EXTRA] = json.dumps(content) + if extras.get(KEYPATH_EXTRA): + del extras[KEYPATH_EXTRA] + extras[SCOPE_EXTRA] = 'https://www.googleapis.com/auth/cloud-platform' + extras[PROJECT_EXTRA] = self.project_extra + conn.extra = json.dumps(extras) + session.commit() + except BaseException as ex: + self.log.info('Airflow DB Session error:' + str(ex)) + session.rollback() + raise + finally: + session.close() + + def _set_key_path(self): + """ + Sets full key path - if AIRFLOW_BREEZE_CONFIG_DIR points to absolute Review comment: in the entire file ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: users@infra.apache.org With regards, Apache Git Services
