kaxil commented on a change in pull request #4354: [AIRFLOW-3446] Add Google Cloud BigTable operators URL: https://github.com/apache/incubator-airflow/pull/4354#discussion_r244886010
########## File path: airflow/contrib/hooks/gcp_bigtable_hook.py ########## @@ -0,0 +1,232 @@ +# -*- coding: utf-8 -*- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from google.cloud.bigtable import Client +from google.cloud.bigtable.cluster import Cluster +from google.cloud.bigtable.instance import Instance +from google.cloud.bigtable.table import Table +from google.cloud.bigtable_admin_v2 import enums +from airflow.contrib.hooks.gcp_api_base_hook import GoogleCloudBaseHook + + +# noinspection PyAbstractClass +class BigtableHook(GoogleCloudBaseHook): + """ + Hook for Google Cloud Bigtable APIs. + """ + + _client = None + + def __init__(self, + gcp_conn_id='google_cloud_default', + delegate_to=None): + super(BigtableHook, self).__init__(gcp_conn_id, delegate_to) + + def get_client(self, project_id): + if not self._client: + self._client = Client(project=project_id, credentials=self._get_credentials(), admin=True) + return self._client + + def get_instance(self, project_id, instance_id): + """ + Retrieves and returns the specified Cloud Bigtable instance if it exists. + Otherwise, returns None. + + :param project_id: The ID of the GCP project. 
+ :type project_id: str + :param instance_id: The ID of the Cloud Bigtable instance. + :type instance_id: str + """ + + client = self.get_client(project_id) + + instance = Instance(instance_id, client) + if not instance.exists(): + return None + return instance + + def delete_instance(self, project_id, instance_id): + """ + Deletes the specified Cloud Bigtable instance. + Raises google.api_core.exceptions.NotFound if the Cloud Bigtable instance does not exist. + + :param project_id: The ID of the GCP project. + :type project_id: str + :param instance_id: The ID of the Cloud Bigtable instance. + :type instance_id: str + """ + instance = Instance(instance_id, self.get_client(project_id)) + instance.delete() + + def create_instance(self, + project_id, + instance_id, + main_cluster_id, + main_cluster_zone, + replica_cluster_id=None, + replica_cluster_zone=None, + instance_display_name=None, + instance_type=enums.Instance.Type.TYPE_UNSPECIFIED, + instance_labels=None, + cluster_nodes=None, + cluster_storage_type=enums.StorageType.STORAGE_TYPE_UNSPECIFIED, + timeout=None): + """ + Creates new instance. + + :type project_id: str + :param project_id: The ID of the GCP project. + :type instance_id: str + :param instance_id: The ID for the new instance. + :type main_cluster_id: str + :param main_cluster_id: The ID for main cluster for the new instance. + :type main_cluster_zone: str + :param main_cluster_zone: The zone for main cluster. + See https://cloud.google.com/bigtable/docs/locations for more details. + :type replica_cluster_id: str + :param replica_cluster_id: (optional) The ID for replica cluster for the new instance. + :type replica_cluster_zone: str + :param replica_cluster_zone: (optional) The zone for replica cluster. + :type instance_type: enums.Instance.Type + :param instance_type: (optional) The type of the instance. + :type instance_display_name: str + :param instance_display_name: (optional) Human-readable name of the instance. 
+ Defaults to ``instance_id``. + :type instance_labels: dict + :param instance_labels: (optional) Dictionary of labels to associate with the instance. + :type cluster_nodes: int + :param cluster_nodes: (optional) Number of nodes for cluster. + :type cluster_storage_type: enums.StorageType + :param cluster_storage_type: (optional) The type of storage. + :type timeout: int + :param timeout: (optional) timeout (in seconds) for instance creation. + If None is not specified, Operator will wait indefinitely. + """ + cluster_storage_type = enums.StorageType(cluster_storage_type) + instance_type = enums.Instance.Type(instance_type) + + instance = Instance( + instance_id, + self.get_client(project_id), + instance_display_name, + instance_type, + instance_labels, + ) + + clusters = [ + instance.cluster( + main_cluster_id, + main_cluster_zone, + cluster_nodes, + cluster_storage_type + ) + ] + if replica_cluster_id and replica_cluster_zone: + clusters.append(instance.cluster( + replica_cluster_id, + replica_cluster_zone, + cluster_nodes, + cluster_storage_type + )) + operation = instance.create( + clusters=clusters + ) + operation.result(timeout) + return instance + + # noinspection PyMethodMayBeStatic + def create_table(self, instance, table_id, initial_split_keys, column_families): + """ + Creates the specified Cloud Bigtable table. + Raises google.api_core.exceptions.AlreadyExists if the table exists. + + :type instance: Instance + :param instance: The Cloud Bigtable instance that owns the table. + :type table_id: str + :param table_id: The ID of the table to create in Cloud Bigtable. + :type initial_split_keys: list + :param initial_split_keys: (Optional) A list of row keys in bytes to use to initially split the table. + :type column_families: dict + :param column_families: (Optional) A map of columns to create. The key is the column_id str, and the + value is a GarbageCollectionRule. 
+ """ + table = Table(table_id, instance) + table.create(initial_split_keys, column_families) + + def delete_table(self, project_id, instance_id, table_id): + """ + Deletes the specified table in Cloud Bigtable. + Raises google.api_core.exceptions.NotFound if the table does not exist. + + :type project_id: str + :param project_id: The ID of the GCP project. + :type instance_id: str + :param instance_id: The ID of the Cloud Bigtable instance. + :type table_id: str + :param table_id: The ID of the table in Cloud Bigtable. + """ + instance = Instance(instance_id, self.get_client(project_id)) + table = Table(table_id, instance) + table.delete() + + # noinspection PyMethodMayBeStatic Review comment: Can we remove this comment as well? If the method does not use `self`, please make it an actual `@staticmethod` instead of suppressing the inspection. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected] With regards, Apache Git Services
