Repository: incubator-airflow Updated Branches: refs/heads/master 18d09a948 -> 0990ba8c0
[AIRFLOW-2016] Add support for Dataproc Workflow Templates Closes #2958 from DanSedov/master Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/0990ba8c Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/0990ba8c Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/0990ba8c Branch: refs/heads/master Commit: 0990ba8c027a6d438a207c6afc202a7e40ebbc7f Parents: 18d09a9 Author: Dan Sedov <[email protected]> Authored: Wed Jan 24 07:23:48 2018 -0800 Committer: Chris Riccomini <[email protected]> Committed: Wed Jan 24 07:23:48 2018 -0800 ---------------------------------------------------------------------- airflow/contrib/hooks/gcp_dataproc_hook.py | 68 ++++++++-- airflow/contrib/operators/dataproc_operator.py | 115 +++++++++++++++- .../contrib/operators/test_dataproc_operator.py | 134 ++++++++++++++++++- 3 files changed, 301 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/0990ba8c/airflow/contrib/hooks/gcp_dataproc_hook.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/hooks/gcp_dataproc_hook.py b/airflow/contrib/hooks/gcp_dataproc_hook.py index 5b96484..bc5aa83 100644 --- a/airflow/contrib/hooks/gcp_dataproc_hook.py +++ b/airflow/contrib/hooks/gcp_dataproc_hook.py @@ -141,24 +141,76 @@ class _DataProcJobBuilder: return self.job +class _DataProcOperation(LoggingMixin): + """Continuously polls Dataproc Operation until it completes.""" + def __init__(self, dataproc_api, operation): + self.dataproc_api = dataproc_api + self.operation = operation + self.operation_name = self.operation['name'] + + def wait_for_done(self): + if self._check_done(): + return True + + self.log.info( + 'Waiting for Dataproc Operation %s to finish', self.operation_name) + while True: + time.sleep(10) + self.operation = ( + self.dataproc_api.projects() + .regions() + .operations() + .get(name=self.operation_name) + .execute(num_retries=5)) + + if self._check_done(): + return True + + def get(self): + return self.operation + + def _check_done(self): + if 'done' in self.operation: + if 'error' in self.operation: + self.log.warning( + 'Dataproc Operation %s failed with error: %s', + self.operation_name, self.operation['error']['message']) + self._raise_error() + else: + self.log.info( + 'Dataproc Operation %s done', self.operation['name']) + return True + return False + + def _raise_error(self): + raise Exception('Google Dataproc Operation %s failed: %s' % + (self.operation_name, self.operation['error']['message'])) + + class DataProcHook(GoogleCloudBaseHook): + """Hook for Google Cloud Dataproc APIs.""" def __init__(self, gcp_conn_id='google_cloud_default', - delegate_to=None): + delegate_to=None, + api_version='v1'): super(DataProcHook, self).__init__(gcp_conn_id, delegate_to) + self.api_version = api_version def get_conn(self): - """ - Returns a Google Cloud DataProc service object. - """ + """Returns a Google Cloud Dataproc service object.""" http_authorized = self._authorize() - return build('dataproc', 'v1', http=http_authorized) + return build('dataproc', self.api_version, http=http_authorized) def submit(self, project_id, job, region='global'): submitted = _DataProcJob(self.get_conn(), project_id, job, region) if not submitted.wait_for_done(): - submitted.raise_error("DataProcTask has errors") + submitted.raise_error('DataProcTask has errors') def create_job_template(self, task_id, cluster_name, job_type, properties): - return _DataProcJobBuilder(self.project_id, task_id, cluster_name, job_type, - properties) + return _DataProcJobBuilder(self.project_id, task_id, cluster_name, + job_type, properties) + + def await(self, operation): + """Awaits for Google Cloud Dataproc Operation to complete.""" + submitted = _DataProcOperation(self.get_conn(), operation) + submitted.wait_for_done() http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/0990ba8c/airflow/contrib/operators/dataproc_operator.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/operators/dataproc_operator.py b/airflow/contrib/operators/dataproc_operator.py index 3b10382..ece6d51 100644 --- a/airflow/contrib/operators/dataproc_operator.py +++ b/airflow/contrib/operators/dataproc_operator.py @@ -14,8 +14,11 @@ # import time +import uuid + from airflow.contrib.hooks.gcp_dataproc_hook import DataProcHook +from airflow.exceptions import AirflowException from airflow.models import BaseOperator from airflow.utils.decorators import apply_defaults from airflow.version import version @@ -92,7 +95,7 @@ class DataprocClusterCreateOperator(BaseOperator): :type service_account_scopes: list[string] """ - template_fields = ['cluster_name', ] + template_fields = ['cluster_name'] @apply_defaults def __init__(self, @@ -928,3 +931,113 @@ class DataProcPySparkOperator(BaseOperator): job.set_job_name(self.job_name) hook.submit(hook.project_id, job.build(), self.region) + + +class DataprocWorkflowTemplateBaseOperator(BaseOperator): + template_fields = ['template_id', 'template'] + + @apply_defaults + def __init__(self, + project_id, + region='global', + gcp_conn_id='google_cloud_default', + delegate_to=None, + *args, + **kwargs): + super(DataprocWorkflowTemplateBaseOperator, self).__init__(*args, **kwargs) + self.gcp_conn_id = gcp_conn_id + self.delegate_to = delegate_to + self.project_id = project_id + self.region = region + self.hook = DataProcHook( + gcp_conn_id=self.gcp_conn_id, + delegate_to=self.delegate_to, + api_version='v1beta2' + ) + + def execute(self, context): + self.hook.await(self.start()) + + def start(self, context): + raise AirflowException('plese start a workflow operation') + + +class DataprocWorkflowTemplateInstantiateOperator(DataprocWorkflowTemplateBaseOperator): + """ + Instantiate a WorkflowTemplate on Google Cloud Dataproc. The operator will wait + until the WorkflowTemplate is finished executing. + + Please refer to: + https://cloud.google.com/dataproc/docs/reference/rest/v1beta2/projects.regions.workflowTemplates/instantiate + + :param template_id: The id of the template. + :type template_id: string + :param project_id: The ID of the google cloud project in which + the template runs + :type project_id: string + :param region: leave as 'global', might become relevant in the future + :type region: string + :param gcp_conn_id: The connection ID to use connecting to Google Cloud Platform. + :type gcp_conn_id: string + :param delegate_to: The account to impersonate, if any. + For this to work, the service account making the request must have domain-wide + delegation enabled. + :type delegate_to: string + """ + + @apply_defaults + def __init__(self, template_id, *args, **kwargs): + (super(DataprocWorkflowTemplateInstantiateOperator, self) + .__init__(*args, **kwargs)) + self.template_id = template_id + + def start(self): + self.log.info('Instantiating Template: %s', self.template_id) + return ( + self.hook.get_conn().projects().regions().workflowTemplates() + .instantiate( + name=('projects/%s/regions/%s/workflowTemplates/%s' % + (self.project_id, self.region, self.template_id)), + body={'instanceId': str(uuid.uuid1())}) + .execute()) + + +class DataprocWorkflowTemplateInstantiateInlineOperator( + DataprocWorkflowTemplateBaseOperator): + """ + Instantiate a WorkflowTemplate Inline on Google Cloud Dataproc. The operator will + wait until the WorkflowTemplate is finished executing. + + Please refer to: + https://cloud.google.com/dataproc/docs/reference/rest/v1beta2/projects.regions.workflowTemplates/instantiateInline + + :param template: The template contents. + :type template: map + :param project_id: The ID of the google cloud project in which + the template runs + :type project_id: string + :param region: leave as 'global', might become relevant in the future + :type region: string + :param gcp_conn_id: The connection ID to use connecting to Google Cloud Platform. + :type gcp_conn_id: string + :param delegate_to: The account to impersonate, if any. + For this to work, the service account making the request must have domain-wide + delegation enabled. + :type delegate_to: string + """ + + @apply_defaults + def __init__(self, template, *args, **kwargs): + (super(DataprocWorkflowTemplateInstantiateInlineOperator, self) + .__init__(*args, **kwargs)) + self.template = template + + def start(self): + self.log.info('Instantiating Inline Template') + return ( + self.hook.get_conn().projects().regions().workflowTemplates() + .instantiateInline( + parent='projects/%s/regions/%s' % (self.project_id, self.region), + instanceId=str(uuid.uuid1()), + body=self.template) + .execute()) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/0990ba8c/tests/contrib/operators/test_dataproc_operator.py ---------------------------------------------------------------------- diff --git a/tests/contrib/operators/test_dataproc_operator.py b/tests/contrib/operators/test_dataproc_operator.py index 2df056a..448da86 100644 --- a/tests/contrib/operators/test_dataproc_operator.py +++ b/tests/contrib/operators/test_dataproc_operator.py @@ -18,12 +18,15 @@ import re import unittest from airflow import DAG -from airflow.contrib.operators.dataproc_operator import DataprocClusterCreateOperator -from airflow.contrib.operators.dataproc_operator import DataprocClusterDeleteOperator -from airflow.contrib.operators.dataproc_operator import DataProcHadoopOperator -from airflow.contrib.operators.dataproc_operator import DataProcHiveOperator -from airflow.contrib.operators.dataproc_operator import DataProcPySparkOperator -from airflow.contrib.operators.dataproc_operator import DataProcSparkOperator +from airflow.contrib.operators.dataproc_operator import \ + DataprocClusterCreateOperator,\ + DataprocClusterDeleteOperator,\ + DataProcHadoopOperator,\ + DataProcHiveOperator,\ + DataProcPySparkOperator,\ + DataProcSparkOperator,\ + DataprocWorkflowTemplateInstantiateInlineOperator,\ + DataprocWorkflowTemplateInstantiateOperator from airflow.version import version from copy import deepcopy @@ -55,7 +58,7 @@ WORKER_MACHINE_TYPE = 'n1-standard-2' WORKER_DISK_SIZE = 100 NUM_PREEMPTIBLE_WORKERS = 2 LABEL1 = {} -LABEL2 = {'application':'test', 'year': 2017} +LABEL2 = {'application': 'test', 'year': 2017} SERVICE_ACCOUNT_SCOPES = [ 'https://www.googleapis.com/auth/bigquery', 'https://www.googleapis.com/auth/bigtable.data' @@ -63,6 +66,10 @@ SERVICE_ACCOUNT_SCOPES = [ DEFAULT_DATE = datetime.datetime(2017, 6, 6) REGION = 'test-region' MAIN_URI = 'test-uri' +TEMPLATE_ID = 'template-id' + +HOOK = 'airflow.contrib.operators.dataproc_operator.DataProcHook' + class DataprocClusterCreateOperatorTest(unittest.TestCase): # Unit test for the DataprocClusterCreateOperator @@ -290,3 +297,116 @@ class DataProcSparkOperatorTest(unittest.TestCase): dataproc_task.execute(None) mock_hook.return_value.submit.assert_called_once_with(mock.ANY, mock.ANY, REGION) + + +class DataprocWorkflowTemplateInstantiateOperatorTest(unittest.TestCase): + def setUp(self): + # Setup service.projects().regions().workflowTemplates().instantiate().execute() + self.operation = {'name': 'operation', 'done': True} + self.mock_execute = Mock() + self.mock_execute.execute.return_value = self.operation + self.mock_workflows = Mock() + self.mock_workflows.instantiate.return_value = self.mock_execute + self.mock_regions = Mock() + self.mock_regions.workflowTemplates.return_value = self.mock_workflows + self.mock_projects = Mock() + self.mock_projects.regions.return_value = self.mock_regions + self.mock_conn = Mock() + self.mock_conn.projects.return_value = self.mock_projects + self.dag = DAG( + 'test_dag', + default_args={ + 'owner': 'airflow', + 'start_date': DEFAULT_DATE, + 'end_date': DEFAULT_DATE, + }, + schedule_interval='@daily') + + def test_workflow(self): + with patch(HOOK) as MockHook: + hook = MockHook() + hook.get_conn.return_value = self.mock_conn + hook.await.return_value = None + + dataproc_task = DataprocWorkflowTemplateInstantiateOperator( + task_id=TASK_ID, + project_id=PROJECT_ID, + region=REGION, + template_id=TEMPLATE_ID, + dag=self.dag + ) + + dataproc_task.execute(None) + template_name = ( + 'projects/test-project-id/regions/test-region/' + 'workflowTemplates/template-id') + self.mock_workflows.instantiate.assert_called_once_with( + name=template_name, + body=mock.ANY) + hook.await.assert_called_once_with(self.operation) + + +class DataprocWorkflowTemplateInstantiateInlineOperatorTest(unittest.TestCase): + def setUp(self): + # Setup service.projects().regions().workflowTemplates().instantiateInline() + # .execute() + self.operation = {'name': 'operation', 'done': True} + self.mock_execute = Mock() + self.mock_execute.execute.return_value = self.operation + self.mock_workflows = Mock() + self.mock_workflows.instantiateInline.return_value = self.mock_execute + self.mock_regions = Mock() + self.mock_regions.workflowTemplates.return_value = self.mock_workflows + self.mock_projects = Mock() + self.mock_projects.regions.return_value = self.mock_regions + self.mock_conn = Mock() + self.mock_conn.projects.return_value = self.mock_projects + self.dag = DAG( + 'test_dag', + default_args={ + 'owner': 'airflow', + 'start_date': DEFAULT_DATE, + 'end_date': DEFAULT_DATE, + }, + schedule_interval='@daily') + + def test_iniline_workflow(self): + with patch(HOOK) as MockHook: + hook = MockHook() + hook.get_conn.return_value = self.mock_conn + hook.await.return_value = None + + template = { + "placement": { + "managed_cluster": { + "cluster_name": CLUSTER_NAME, + "config": { + "gce_cluster_config": { + "zone_uri": ZONE, + } + } + } + }, + "jobs": [ + { + "step_id": "say-hello", + "pig_job": { + "query": "sh echo hello" + } + }], + } + + dataproc_task = DataprocWorkflowTemplateInstantiateInlineOperator( + task_id=TASK_ID, + project_id=PROJECT_ID, + region=REGION, + template=template, + dag=self.dag + ) + + dataproc_task.execute(None) + self.mock_workflows.instantiateInline.assert_called_once_with( + parent='projects/test-project-id/regions/test-region', + instanceId=mock.ANY, + body=template) + hook.await.assert_called_once_with(self.operation)
