Repository: incubator-airflow Updated Branches: refs/heads/master fe73f2215 -> 69334fc44
[AIRFLOW-800] Initialize valid Google BigQuery Connection

- Modified `GoogleCloudBaseHook` to change the name of parameter `conn_id` to keep it consistent with other Hooks.
- Changed the connection to `GoogleCloudBaseHook` instead of `BigQueryHook` which was causing an invalid `conn_type` creation during `airflow initdb`

Closes #3031 from kaxil/AIRFLOW-800

Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/69334fc4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/69334fc4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/69334fc4

Branch: refs/heads/master
Commit: 69334fc44726784f2351ef5ecc9ce8be18e88939
Parents: fe73f22
Author: Kaxil Naik <[email protected]>
Authored: Sun Feb 11 21:16:24 2018 +0100
Committer: Fokko Driesprong <[email protected]>
Committed: Sun Feb 11 21:16:28 2018 +0100

----------------------------------------------------------------------
 airflow/contrib/hooks/bigquery_hook.py     |  2 +-
 airflow/contrib/hooks/gcp_api_base_hook.py | 19 ++++++++++---------
 airflow/models.py                          |  4 ++--
 3 files changed, 13 insertions(+), 12 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/69334fc4/airflow/contrib/hooks/bigquery_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/bigquery_hook.py b/airflow/contrib/hooks/bigquery_hook.py
index dca4d33..220156b 100644
--- a/airflow/contrib/hooks/bigquery_hook.py
+++ b/airflow/contrib/hooks/bigquery_hook.py
@@ -49,7 +49,7 @@ class BigQueryHook(GoogleCloudBaseHook, DbApiHook, LoggingMixin):
                  delegate_to=None,
                  use_legacy_sql=True):
         super(BigQueryHook, self).__init__(
-            conn_id=bigquery_conn_id, delegate_to=delegate_to)
+            gcp_conn_id=bigquery_conn_id, delegate_to=delegate_to)
         self.use_legacy_sql = use_legacy_sql
 
     def get_conn(self):
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/69334fc4/airflow/contrib/hooks/gcp_api_base_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/gcp_api_base_hook.py b/airflow/contrib/hooks/gcp_api_base_hook.py
index e6ca240..12a7fb5 100644
--- a/airflow/contrib/hooks/gcp_api_base_hook.py
+++ b/airflow/contrib/hooks/gcp_api_base_hook.py
@@ -43,18 +43,19 @@ class GoogleCloudBaseHook(BaseHook, LoggingMixin):
 
     Legacy P12 key files are not supported.
     """
-    def __init__(self, conn_id, delegate_to=None):
+
+    def __init__(self, gcp_conn_id='google_cloud_default', delegate_to=None):
         """
-        :param conn_id: The connection ID to use when fetching connection info.
-        :type conn_id: string
+        :param gcp_conn_id: The connection ID to use when fetching connection info.
+        :type gcp_conn_id: string
         :param delegate_to: The account to impersonate, if any.
             For this to work, the service account making the request must have
             domain-wide delegation enabled.
         :type delegate_to: string
         """
-        self.conn_id = conn_id
+        self.gcp_conn_id = gcp_conn_id
         self.delegate_to = delegate_to
-        self.extras = self.get_connection(conn_id).extra_dejson
+        self.extras = self.get_connection(self.gcp_conn_id).extra_dejson
 
     def _get_credentials(self):
         """
@@ -69,8 +70,8 @@ class GoogleCloudBaseHook(BaseHook, LoggingMixin):
             kwargs['sub'] = self.delegate_to
 
         if not key_path and not keyfile_dict:
-            self.log.info('Getting connection using `gcloud auth` user, since no key file '
-                          'is defined for hook.')
+            self.log.info('Getting connection using `gcloud auth` user, '
+                          'since no key file is defined for hook.')
             credentials = GoogleCredentials.get_application_default()
         elif key_path:
             if not scope:
@@ -80,7 +81,7 @@ class GoogleCloudBaseHook(BaseHook, LoggingMixin):
             # Get credentials from a JSON file.
             if key_path.endswith('.json'):
                 self.log.info('Getting connection using a JSON key file.')
-                credentials = ServiceAccountCredentials\
+                credentials = ServiceAccountCredentials \
                     .from_json_keyfile_name(key_path, scopes)
             elif key_path.endswith('.p12'):
                 raise AirflowException('Legacy P12 key file are not supported, '
@@ -101,7 +102,7 @@ class GoogleCloudBaseHook(BaseHook, LoggingMixin):
                 keyfile_dict['private_key'] = keyfile_dict['private_key'].replace(
                     '\\n', '\n')
 
-                credentials = ServiceAccountCredentials\
+                credentials = ServiceAccountCredentials \
                     .from_json_keyfile_dict(keyfile_dict, scopes)
             except json.decoder.JSONDecodeError:
                 raise AirflowException('Invalid key JSON.')

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/69334fc4/airflow/models.py
----------------------------------------------------------------------
diff --git a/airflow/models.py b/airflow/models.py
index eb1cfb6..ee45dff 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -662,8 +662,8 @@ class Connection(Base, LoggingMixin):
                 from airflow.hooks.mysql_hook import MySqlHook
                 return MySqlHook(mysql_conn_id=self.conn_id)
             elif self.conn_type == 'google_cloud_platform':
-                from airflow.contrib.hooks.bigquery_hook import BigQueryHook
-                return BigQueryHook(bigquery_conn_id=self.conn_id)
+                from airflow.contrib.hooks.gcp_api_base_hook import GoogleCloudBaseHook
+                return GoogleCloudBaseHook(gcp_conn_id=self.conn_id)
             elif self.conn_type == 'postgres':
                 from airflow.hooks.postgres_hook import PostgresHook
                 return PostgresHook(postgres_conn_id=self.conn_id)
