Repository: incubator-airflow

Updated Branches:
  refs/heads/master 617ba7412 -> ffbe7282d
[AIRFLOW-729] Add Google Cloud Dataproc cluster creation operator

The operator checks if there is already a cluster running with the
provided name in the provided project. If so, the operator finishes
successfully. Otherwise, the operator issues a REST API call to
initiate the cluster creation and waits until the creation is
successful before exiting.

Closes #1971 from bodschut/feature/dataproc_operator

Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/ffbe7282
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/ffbe7282
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/ffbe7282

Branch: refs/heads/master
Commit: ffbe7282dcff5d5dd1c23ab0eff27dab2bd457f6
Parents: 617ba74
Author: Bob De Schutter <[email protected]>
Authored: Mon Jan 9 21:49:06 2017 +0100
Committer: Alex Van Boxel <[email protected]>
Committed: Mon Jan 9 21:49:06 2017 +0100

----------------------------------------------------------------------
 airflow/contrib/operators/dataproc_operator.py | 253 ++++++++++++++++++++
 1 file changed, 253 insertions(+)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/ffbe7282/airflow/contrib/operators/dataproc_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/dataproc_operator.py b/airflow/contrib/operators/dataproc_operator.py
index a3df381..9cf2bbe 100644
--- a/airflow/contrib/operators/dataproc_operator.py
+++ b/airflow/contrib/operators/dataproc_operator.py
@@ -12,9 +12,262 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
+
+import logging
+import time
+
 from airflow.contrib.hooks.gcp_dataproc_hook import DataProcHook
 from airflow.models import BaseOperator
 from airflow.utils.decorators import apply_defaults
+from googleapiclient.errors import HttpError
+
+
+class DataprocClusterCreateOperator(BaseOperator):
+    """
+    Create a new cluster on Google Cloud Dataproc. The operator will wait until the
+    creation is successful or an error occurs in the creation process.
+
+    The parameters allow you to configure the cluster. Please refer to
+
+    https://cloud.google.com/dataproc/docs/reference/rest/v1/projects.regions.clusters
+
+    for a detailed explanation of the different parameters. Most of the configuration
+    parameters detailed in the link are available as a parameter to this operator.
+    """
+
+    template_fields = ['cluster_name',]
+
+    @apply_defaults
+    def __init__(self,
+                 cluster_name,
+                 project_id,
+                 num_workers,
+                 zone,
+                 storage_bucket=None,
+                 init_actions_uris=None,
+                 metadata=None,
+                 properties=None,
+                 master_machine_type='n1-standard-4',
+                 master_disk_size=500,
+                 worker_machine_type='n1-standard-4',
+                 worker_disk_size=500,
+                 num_preemptible_workers=0,
+                 labels=None,
+                 region='global',
+                 google_cloud_conn_id='google_cloud_default',
+                 delegate_to=None,
+                 *args,
+                 **kwargs):
+        """
+        Create a new DataprocClusterCreateOperator.
+
+        For more info on the creation of a cluster through the API, have a look at:
+
+        https://cloud.google.com/dataproc/docs/reference/rest/v1/projects.regions.clusters
+
+        :param cluster_name: The name of the cluster to create
+        :type cluster_name: string
+        :param project_id: The ID of the Google Cloud project in which
+            to create the cluster
+        :type project_id: string
+        :param num_workers: The number of workers to spin up
+        :type num_workers: int
+        :param storage_bucket: The storage bucket to use; setting it to None lets
+            Dataproc generate a custom one for you
+        :type storage_bucket: string
+        :param init_actions_uris: List of GCS URIs containing
+            Dataproc initialization scripts
+        :type init_actions_uris: list[string]
+        :param metadata: dict of key-value Google Compute Engine metadata entries
+            to add to all instances
+        :type metadata: dict
+        :param properties: dict of properties to set on
+            config files (e.g. spark-defaults.conf), see
+            https://cloud.google.com/dataproc/docs/reference/rest/v1/ \
+            projects.regions.clusters#SoftwareConfig
+        :type properties: dict
+        :param master_machine_type: Compute Engine machine type to use for the
+            master node
+        :type master_machine_type: string
+        :param master_disk_size: Disk size (in GB) for the master node
+        :type master_disk_size: int
+        :param worker_machine_type: Compute Engine machine type to use for the
+            worker nodes
+        :type worker_machine_type: string
+        :param worker_disk_size: Disk size (in GB) for the worker nodes
+        :type worker_disk_size: int
+        :param num_preemptible_workers: The number of preemptible worker nodes
+            to spin up
+        :type num_preemptible_workers: int
+        :param labels: dict of labels to add to the cluster
+        :type labels: dict
+        :param zone: The zone where the cluster will be located
+        :type zone: string
+        :param region: leave as 'global', might become relevant in the future
+        :type region: string
+        :param google_cloud_conn_id: The connection id to use when connecting
+            to Dataproc
+        :type google_cloud_conn_id: string
+        :param delegate_to: The account to impersonate, if any.
+            For this to work, the service account making the request must have
+            domain-wide delegation enabled.
+        :type delegate_to: string
+        """
+        super(DataprocClusterCreateOperator, self).__init__(*args, **kwargs)
+        self.google_cloud_conn_id = google_cloud_conn_id
+        self.delegate_to = delegate_to
+        self.cluster_name = cluster_name
+        self.project_id = project_id
+        self.num_workers = num_workers
+        self.num_preemptible_workers = num_preemptible_workers
+        self.storage_bucket = storage_bucket
+        self.init_actions_uris = init_actions_uris
+        self.metadata = metadata
+        self.properties = properties
+        self.master_machine_type = master_machine_type
+        self.master_disk_size = master_disk_size
+        self.worker_machine_type = worker_machine_type
+        self.worker_disk_size = worker_disk_size
+        self.labels = labels
+        self.zone = zone
+        self.region = region
+
+    def _get_cluster_list_for_project(self, service):
+        result = service.projects().regions().clusters().list(
+            projectId=self.project_id,
+            region=self.region
+        ).execute()
+        return result.get('clusters', [])
+
+    def _get_cluster(self, service):
+        cluster_list = self._get_cluster_list_for_project(service)
+        cluster = [c for c in cluster_list if c['clusterName'] == self.cluster_name]
+        if cluster:
+            return cluster[0]
+        return None
+
+    def _get_cluster_state(self, service):
+        # The cluster may not be listed yet right after the create request,
+        # so guard against _get_cluster returning None.
+        cluster = self._get_cluster(service)
+        if cluster and 'status' in cluster:
+            return cluster['status']['state']
+        else:
+            return None
+
+    def _cluster_ready(self, state, service):
+        if state == 'RUNNING':
+            return True
+        if state == 'ERROR':
+            cluster = self._get_cluster(service)
+            try:
+                error_details = cluster['status']['details']
+            except KeyError:
+                error_details = 'Unknown error in cluster creation, ' \
+                                'check Google Cloud console for details.'
+            raise Exception(error_details)
+        return False
+
+    def _wait_for_done(self, service):
+        # Poll the cluster state every 15 seconds until the cluster is
+        # RUNNING (or _cluster_ready raises on an ERROR state).
+        while True:
+            state = self._get_cluster_state(service)
+            if state is None:
+                logging.info("No state for cluster '%s'", self.cluster_name)
+                time.sleep(15)
+            else:
+                logging.info("State for cluster '%s' is %s", self.cluster_name, state)
+                if self._cluster_ready(state, service):
+                    logging.info("Cluster '%s' successfully created",
+                                 self.cluster_name)
+                    return
+                time.sleep(15)
+
+    def execute(self, context):
+        hook = DataProcHook(
+            gcp_conn_id=self.google_cloud_conn_id,
+            delegate_to=self.delegate_to
+        )
+        service = hook.get_conn()
+
+        if self._get_cluster(service):
+            logging.info('Cluster {} already exists... Checking status...'.format(
+                self.cluster_name
+            ))
+            self._wait_for_done(service)
+            return True
+
+        zone_uri = \
+            'https://www.googleapis.com/compute/v1/projects/{}/zones/{}'.format(
+                self.project_id, self.zone
+            )
+        master_type_uri = \
+            "https://www.googleapis.com/compute/v1/projects/{}/zones/{}/machineTypes/{}".format(
+                self.project_id, self.zone, self.master_machine_type
+            )
+        worker_type_uri = \
+            "https://www.googleapis.com/compute/v1/projects/{}/zones/{}/machineTypes/{}".format(
+                self.project_id, self.zone, self.worker_machine_type
+            )
+        cluster_data = {
+            'projectId': self.project_id,
+            'clusterName': self.cluster_name,
+            'config': {
+                'gceClusterConfig': {
+                    'zoneUri': zone_uri
+                },
+                'masterConfig': {
+                    'numInstances': 1,
+                    'machineTypeUri': master_type_uri,
+                    'diskConfig': {
+                        'bootDiskSizeGb': self.master_disk_size
+                    }
+                },
+                'workerConfig': {
+                    'numInstances': self.num_workers,
+                    'machineTypeUri': worker_type_uri,
+                    'diskConfig': {
+                        'bootDiskSizeGb': self.worker_disk_size
+                    }
+                },
+                'secondaryWorkerConfig': {},
+                'softwareConfig': {}
+            }
+        }
+        if self.num_preemptible_workers > 0:
+            cluster_data['config']['secondaryWorkerConfig'] = {
+                'numInstances': self.num_preemptible_workers,
+                'machineTypeUri': worker_type_uri,
+                'diskConfig': {
+                    'bootDiskSizeGb': self.worker_disk_size
+                },
+                'isPreemptible': True
+            }
+        if self.labels:
+            cluster_data['labels'] = self.labels
+        if self.storage_bucket:
+            cluster_data['config']['configBucket'] = self.storage_bucket
+        if self.metadata:
+            cluster_data['config']['gceClusterConfig']['metadata'] = self.metadata
+        if self.properties:
+            cluster_data['config']['softwareConfig']['properties'] = self.properties
+        if self.init_actions_uris:
+            init_actions_dict = [
+                {'executableFile': uri} for uri in self.init_actions_uris
+            ]
+            cluster_data['config']['initializationActions'] = init_actions_dict
+
+        try:
+            service.projects().regions().clusters().create(
+                projectId=self.project_id,
+                region=self.region,
+                body=cluster_data
+            ).execute()
+        except HttpError as e:
+            # Probably two cluster create commands were issued at the same time;
+            # if the cluster exists by now, just wait for it, otherwise re-raise.
+            time.sleep(10)
+            if self._get_cluster(service):
+                logging.info('Cluster {} already exists... Checking status...'.format(
+                    self.cluster_name
+                ))
+                self._wait_for_done(service)
+                return True
+            else:
+                raise e
+
+        self._wait_for_done(service)
 
 
 class DataProcPigOperator(BaseOperator):
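----------------------------------------------------------------------

As a usage illustration only (not part of this commit), here is a minimal
sketch of how the new operator could be wired into a DAG. The DAG id,
project id, zone, labels and cluster name below are hypothetical
placeholders:

    from datetime import datetime

    from airflow import DAG
    from airflow.contrib.operators.dataproc_operator import DataprocClusterCreateOperator

    # All identifiers below are hypothetical, for illustration only.
    dag = DAG('dataproc_cluster_example',
              start_date=datetime(2017, 1, 1),
              schedule_interval=None)

    create_cluster = DataprocClusterCreateOperator(
        task_id='create_cluster',
        cluster_name='analytics-cluster-{{ ds_nodash }}',  # templated field
        project_id='my-gcp-project',
        num_workers=2,
        zone='europe-west1-d',
        num_preemptible_workers=2,
        labels={'env': 'dev'},
        dag=dag)

Because execute() first checks for an existing cluster with the same name
and, if found, only waits for it to reach the RUNNING state, retrying the
task after a transient failure does not trigger a second creation request.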
