Repository: incubator-airflow
Updated Branches:
  refs/heads/master 92363490b -> 9c915c1c8
[AIRFLOW-2461] Add support for cluster scaling on dataproc operator

Closes #3357 from piffall/master

Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/9c915c1c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/9c915c1c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/9c915c1c

Branch: refs/heads/master
Commit: 9c915c1c8b08de5cb1925bdd3454649bf3b46bcb
Parents: 9236349
Author: Cristòfol Torrens <tofol.torr...@bluekiri.com>
Authored: Mon May 14 16:38:28 2018 +0100
Committer: Kaxil Naik <kaxiln...@gmail.com>
Committed: Mon May 14 16:38:28 2018 +0100

----------------------------------------------------------------------
 airflow/contrib/operators/dataproc_operator.py  | 148 +++++++++++++++++++
 docs/code.rst                                   |   1 +
 docs/integration.rst                            |   8 +
 .../contrib/operators/test_dataproc_operator.py | 113 +++++++++++---
 4 files changed, 252 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9c915c1c/airflow/contrib/operators/dataproc_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/dataproc_operator.py b/airflow/contrib/operators/dataproc_operator.py
index 6b1dbee..23fae9a 100644
--- a/airflow/contrib/operators/dataproc_operator.py
+++ b/airflow/contrib/operators/dataproc_operator.py
@@ -377,6 +377,154 @@ class DataprocClusterCreateOperator(BaseOperator):
         self._wait_for_done(service)
 
 
+class DataprocClusterScaleOperator(BaseOperator):
+    """
+    Scale, up or down, a cluster on Google Cloud Dataproc.
+    The operator will wait until the cluster is re-scaled.
+
+    **Example**: ::
+
+        t1 = DataprocClusterScaleOperator(
+                task_id='dataproc_scale',
+                project_id='my-project',
+                cluster_name='cluster-1',
+                num_workers=10,
+                num_preemptible_workers=10,
+                graceful_decommission_timeout='1h',
+                dag=dag)
+
+    .. seealso::
+        For more detail on scaling clusters, have a look at the reference:
+        https://cloud.google.com/dataproc/docs/concepts/configuring-clusters/scaling-clusters
+
+    :param cluster_name: The name of the cluster to scale.
+    :type cluster_name: string
+    :param project_id: The ID of the Google Cloud project in which
+        the cluster runs
+    :type project_id: string
+    :param region: The region for the dataproc cluster
+    :type region: string
+    :param gcp_conn_id: The connection ID to use connecting to Google Cloud Platform.
+    :type gcp_conn_id: string
+    :param num_workers: The new number of workers
+    :type num_workers: int
+    :param num_preemptible_workers: The new number of preemptible workers
+    :type num_preemptible_workers: int
+    :param graceful_decommission_timeout: Timeout for graceful YARN decommissioning.
+        Maximum value is 1d
+    :type graceful_decommission_timeout: string
+    :param delegate_to: The account to impersonate, if any.
+        For this to work, the service account making the request must have domain-wide
+        delegation enabled.
+    :type delegate_to: string
+    """
+
+    template_fields = ['cluster_name', 'project_id', 'region']
+
+    @apply_defaults
+    def __init__(self,
+                 cluster_name,
+                 project_id,
+                 region='global',
+                 gcp_conn_id='google_cloud_default',
+                 delegate_to=None,
+                 num_workers=2,
+                 num_preemptible_workers=0,
+                 graceful_decommission_timeout=None,
+                 *args,
+                 **kwargs):
+        super(DataprocClusterScaleOperator, self).__init__(*args, **kwargs)
+        self.gcp_conn_id = gcp_conn_id
+        self.delegate_to = delegate_to
+        self.cluster_name = cluster_name
+        self.project_id = project_id
+        self.region = region
+        self.num_workers = num_workers
+        self.num_preemptible_workers = num_preemptible_workers
+
+        # Optional
+        self.optional_arguments = {}
+        if graceful_decommission_timeout:
+            self.optional_arguments['gracefulDecommissionTimeout'] = \
+                self._get_graceful_decommission_timeout(
+                    graceful_decommission_timeout)
+
+    def _wait_for_done(self, service, operation_name):
+        time.sleep(15)
+        while True:
+            try:
+                response = service.projects().regions().operations().get(
+                    name=operation_name
+                ).execute()
+
+                if 'done' in response and response['done']:
+                    if 'error' in response:
+                        raise Exception(str(response['error']))
+                    else:
+                        return
+
+                time.sleep(15)
+            except HttpError as e:
+                self.log.error("Operation not found.")
+                raise e
+
+    def _build_scale_cluster_data(self):
+        scale_data = {
+            'config': {
+                'workerConfig': {
+                    'numInstances': self.num_workers
+                },
+                'secondaryWorkerConfig': {
+                    'numInstances': self.num_preemptible_workers
+                }
+            }
+        }
+        return scale_data
+
+    def _get_graceful_decommission_timeout(self, timeout):
+        match = re.match(r"^(\d+)(s|m|h|d)$", timeout)
+        if match:
+            if match.group(2) == "s":
+                return timeout
+            elif match.group(2) == "m":
+                val = float(match.group(1))
+                # total_seconds() rather than .seconds, which drops whole days
+                return "{}s".format(int(timedelta(minutes=val).total_seconds()))
+            elif match.group(2) == "h":
+                val = float(match.group(1))
+                return "{}s".format(int(timedelta(hours=val).total_seconds()))
+            elif match.group(2) == "d":
+                val = float(match.group(1))
+                return "{}s".format(int(timedelta(days=val).total_seconds()))
+
+        raise AirflowException(
+            "DataprocClusterScaleOperator "
+            "graceful_decommission_timeout should be expressed in "
+            "days, hours, minutes or seconds. i.e. 1d, 4h, 10m, 30s")
+
+    def execute(self, context):
+        self.log.info("Scaling cluster: %s", self.cluster_name)
+        hook = DataProcHook(
+            gcp_conn_id=self.gcp_conn_id,
+            delegate_to=self.delegate_to
+        )
+        service = hook.get_conn()
+
+        update_mask = "config.worker_config.num_instances," \
+                      + "config.secondary_worker_config.num_instances"
+        scaling_cluster_data = self._build_scale_cluster_data()
+
+        response = service.projects().regions().clusters().patch(
+            projectId=self.project_id,
+            region=self.region,
+            clusterName=self.cluster_name,
+            updateMask=update_mask,
+            body=scaling_cluster_data,
+            **self.optional_arguments
+        ).execute()
+        operation_name = response['name']
+        self.log.info("Cluster scale operation name: %s", operation_name)
+        self._wait_for_done(service, operation_name)
+
+
 class DataprocClusterDeleteOperator(BaseOperator):
     """
     Delete a cluster on Google Cloud Dataproc. The operator will wait until the


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9c915c1c/docs/code.rst
----------------------------------------------------------------------
diff --git a/docs/code.rst b/docs/code.rst
index 618f376..67519c9 100644
--- a/docs/code.rst
+++ b/docs/code.rst
@@ -126,6 +126,7 @@ Operators
 .. autoclass:: airflow.contrib.operators.dataflow_operator.DataflowTemplateOperator
 .. autoclass:: airflow.contrib.operators.dataflow_operator.DataFlowPythonOperator
 .. autoclass:: airflow.contrib.operators.dataproc_operator.DataprocClusterCreateOperator
+.. autoclass:: airflow.contrib.operators.dataproc_operator.DataprocClusterScaleOperator
 .. autoclass:: airflow.contrib.operators.dataproc_operator.DataprocClusterDeleteOperator
 .. autoclass:: airflow.contrib.operators.dataproc_operator.DataProcPigOperator
 .. autoclass:: airflow.contrib.operators.dataproc_operator.DataProcHiveOperator


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9c915c1c/docs/integration.rst
----------------------------------------------------------------------
diff --git a/docs/integration.rst b/docs/integration.rst
index e8a06e1..5e54eb1 100644
--- a/docs/integration.rst
+++ b/docs/integration.rst
@@ -571,6 +571,7 @@ DataProc Operators
 
 - :ref:`DataprocClusterCreateOperator` : Create a new cluster on Google Cloud Dataproc.
 - :ref:`DataprocClusterDeleteOperator` : Delete a cluster on Google Cloud Dataproc.
+- :ref:`DataprocClusterScaleOperator` : Scale up or down a cluster on Google Cloud Dataproc.
 - :ref:`DataProcPigOperator` : Start a Pig query Job on a Cloud DataProc cluster.
 - :ref:`DataProcHiveOperator` : Start a Hive query Job on a Cloud DataProc cluster.
 - :ref:`DataProcSparkSqlOperator` : Start a Spark SQL query Job on a Cloud DataProc cluster.
@@ -587,6 +588,13 @@ DataprocClusterCreateOperator
 
 .. autoclass:: airflow.contrib.operators.dataproc_operator.DataprocClusterCreateOperator
 
+.. _DataprocClusterScaleOperator:
+
+DataprocClusterScaleOperator
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+.. autoclass:: airflow.contrib.operators.dataproc_operator.DataprocClusterScaleOperator
+
 .. _DataprocClusterDeleteOperator:
 
 DataprocClusterDeleteOperator


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9c915c1c/tests/contrib/operators/test_dataproc_operator.py
----------------------------------------------------------------------
diff --git a/tests/contrib/operators/test_dataproc_operator.py b/tests/contrib/operators/test_dataproc_operator.py
index d039cf1..6cb2044 100644
--- a/tests/contrib/operators/test_dataproc_operator.py
+++ b/tests/contrib/operators/test_dataproc_operator.py
@@ -24,14 +24,15 @@ import unittest
 
 from airflow import DAG
 from airflow.contrib.operators.dataproc_operator import \
-    DataprocClusterCreateOperator,\
-    DataprocClusterDeleteOperator,\
-    DataProcHadoopOperator,\
-    DataProcHiveOperator,\
-    DataProcPySparkOperator,\
-    DataProcSparkOperator,\
-    DataprocWorkflowTemplateInstantiateInlineOperator,\
-    DataprocWorkflowTemplateInstantiateOperator
+    DataprocClusterCreateOperator, \
+    DataprocClusterDeleteOperator, \
+    DataProcHadoopOperator, \
+    DataProcHiveOperator, \
+    DataProcPySparkOperator, \
+    DataProcSparkOperator, \
+    DataprocWorkflowTemplateInstantiateInlineOperator, \
+    DataprocWorkflowTemplateInstantiateOperator, \
+    DataprocClusterScaleOperator
 
 from airflow.version import version
 from copy import deepcopy
@@ -229,7 +230,8 @@ class DataprocClusterCreateOperatorTest(unittest.TestCase):
             "2017-06-07T00:00:00.000000Z")
 
     def test_cluster_name_log_no_sub(self):
-        with patch('airflow.contrib.operators.dataproc_operator.DataProcHook') as mock_hook:
+        with patch('airflow.contrib.operators.dataproc_operator.DataProcHook') \
+                as mock_hook:
             mock_hook.return_value.get_conn = self.mock_conn
             dataproc_task = DataprocClusterCreateOperator(
                 task_id=TASK_ID,
@@ -245,7 +247,8 @@ class DataprocClusterCreateOperatorTest(unittest.TestCase):
             mock_info.assert_called_with('Creating cluster: %s', CLUSTER_NAME)
 
     def test_cluster_name_log_sub(self):
-        with patch('airflow.contrib.operators.dataproc_operator.DataProcHook') as mock_hook:
+        with patch('airflow.contrib.operators.dataproc_operator.DataProcHook') \
+                as mock_hook:
             mock_hook.return_value.get_conn = self.mock_conn
             dataproc_task = DataprocClusterCreateOperator(
                 task_id=TASK_ID,
@@ -256,13 +259,83 @@ class DataprocClusterCreateOperatorTest(unittest.TestCase):
                 dag=self.dag
             )
             with patch.object(dataproc_task.log, 'info') as mock_info:
-                context = { 'ts_nodash' : 'testnodash'}
+                context = {'ts_nodash': 'testnodash'}
 
-                rendered = dataproc_task.render_template('cluster_name', getattr(dataproc_task,'cluster_name'), context)
+                rendered = dataproc_task.render_template(
+                    'cluster_name',
+                    getattr(dataproc_task, 'cluster_name'), context)
                 setattr(dataproc_task, 'cluster_name', rendered)
-                with self.assertRaises(TypeError) as _:
+                with self.assertRaises(TypeError):
                     dataproc_task.execute(None)
-                mock_info.assert_called_with('Creating cluster: %s', u'smoke-cluster-testnodash')
+                mock_info.assert_called_with('Creating cluster: %s',
+                                             u'smoke-cluster-testnodash')
+
+
+class DataprocClusterScaleOperatorTest(unittest.TestCase):
+    # Unit test for the DataprocClusterScaleOperator
+    def setUp(self):
+        self.mock_execute = Mock()
+        self.mock_execute.execute = Mock(return_value={'done': True})
+        self.mock_get = Mock()
+        self.mock_get.get = Mock(return_value=self.mock_execute)
+        self.mock_operations = Mock()
+        self.mock_operations.get = Mock(return_value=self.mock_get)
+        self.mock_regions = Mock()
+        self.mock_regions.operations = Mock(return_value=self.mock_operations)
+        self.mock_projects = Mock()
+        self.mock_projects.regions = Mock(return_value=self.mock_regions)
+        self.mock_conn = Mock()
+        self.mock_conn.projects = Mock(return_value=self.mock_projects)
+        self.dag = DAG(
+            'test_dag',
+            default_args={
+                'owner': 'airflow',
+                'start_date': DEFAULT_DATE,
+                'end_date': DEFAULT_DATE,
+            },
+            schedule_interval='@daily')
+
+    def test_cluster_name_log_no_sub(self):
+        with patch('airflow.contrib.hooks.gcp_dataproc_hook.DataProcHook') \
+                as mock_hook:
+            mock_hook.return_value.get_conn = self.mock_conn
+            dataproc_task = DataprocClusterScaleOperator(
+                task_id=TASK_ID,
+                cluster_name=CLUSTER_NAME,
+                project_id=PROJECT_ID,
+                num_workers=NUM_WORKERS,
+                num_preemptible_workers=NUM_PREEMPTIBLE_WORKERS,
+                dag=self.dag
+            )
+            with patch.object(dataproc_task.log, 'info') as mock_info:
+                with self.assertRaises(TypeError):
+                    dataproc_task.execute(None)
+                mock_info.assert_called_with('Scaling cluster: %s', CLUSTER_NAME)
+
+    def test_cluster_name_log_sub(self):
+        with patch('airflow.contrib.operators.dataproc_operator.DataProcHook') \
+                as mock_hook:
+            mock_hook.return_value.get_conn = self.mock_conn
+            dataproc_task = DataprocClusterScaleOperator(
+                task_id=TASK_ID,
+                cluster_name='smoke-cluster-{{ ts_nodash }}',
+                project_id=PROJECT_ID,
+                num_workers=NUM_WORKERS,
+                num_preemptible_workers=NUM_PREEMPTIBLE_WORKERS,
+                dag=self.dag
+            )
+
+            with patch.object(dataproc_task.log, 'info') as mock_info:
+                context = {'ts_nodash': 'testnodash'}
+
+                rendered = dataproc_task.render_template(
+                    'cluster_name',
+                    getattr(dataproc_task, 'cluster_name'), context)
+                setattr(dataproc_task, 'cluster_name', rendered)
+                with self.assertRaises(TypeError):
+                    dataproc_task.execute(None)
+                mock_info.assert_called_with('Scaling cluster: %s',
+                                             u'smoke-cluster-testnodash')
+
 
 class DataprocClusterDeleteOperatorTest(unittest.TestCase):
     # Unit test for the DataprocClusterDeleteOperator
@@ -313,13 +386,17 @@ class DataprocClusterDeleteOperatorTest(unittest.TestCase):
             )
 
             with patch.object(dataproc_task.log, 'info') as mock_info:
-                context = { 'ts_nodash' : 'testnodash'}
+                context = {'ts_nodash': 'testnodash'}
 
-                rendered = dataproc_task.render_template('cluster_name', getattr(dataproc_task,'cluster_name'), context)
+                rendered = dataproc_task.render_template(
+                    'cluster_name',
+                    getattr(dataproc_task, 'cluster_name'), context)
                 setattr(dataproc_task, 'cluster_name', rendered)
-                with self.assertRaises(TypeError) as _:
+                with self.assertRaises(TypeError):
                     dataproc_task.execute(None)
-                mock_info.assert_called_with('Deleting cluster: %s', u'smoke-cluster-testnodash')
+                mock_info.assert_called_with('Deleting cluster: %s',
+                                             u'smoke-cluster-testnodash')
+
 
 class DataProcHadoopOperatorTest(unittest.TestCase):
     # Unit test for the DataProcHadoopOperator
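
For anyone trying out the new operator, below is a minimal usage sketch based on the docstring example in this commit. The DAG id, project ID, cluster name, and start date are placeholders, not values from the change itself:

    # Minimal sketch: schedule a daily scale-up of an existing Dataproc
    # cluster. Project and cluster names below are placeholders.
    from datetime import datetime

    from airflow import DAG
    from airflow.contrib.operators.dataproc_operator import \
        DataprocClusterScaleOperator

    dag = DAG(
        'example_dataproc_scale',          # hypothetical DAG id
        start_date=datetime(2018, 5, 1),
        schedule_interval='@daily')

    scale_task = DataprocClusterScaleOperator(
        task_id='dataproc_scale',
        project_id='my-project',           # placeholder GCP project
        cluster_name='cluster-1',          # placeholder cluster name
        num_workers=10,
        num_preemptible_workers=10,
        # Normalized to '3600s' and sent to the API as
        # gracefulDecommissionTimeout.
        graceful_decommission_timeout='1h',
        dag=dag)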
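The unit handling in _get_graceful_decommission_timeout can be checked without a Dataproc connection; the standalone sketch below mirrors that conversion (the function name is illustrative, not part of the commit):

    import re
    from datetime import timedelta

    def to_seconds_string(timeout):
        # Accepts '<n>s', '<n>m', '<n>h' or '<n>d' and normalizes it to the
        # '<n>s' form the Dataproc API expects.
        match = re.match(r"^(\d+)(s|m|h|d)$", timeout)
        if not match:
            raise ValueError("expected e.g. 1d, 4h, 10m or 30s")
        val, unit = int(match.group(1)), match.group(2)
        if unit == 's':
            return timeout
        spans = {'m': timedelta(minutes=val),
                 'h': timedelta(hours=val),
                 'd': timedelta(days=val)}
        # total_seconds() rather than .seconds: .seconds drops whole days,
        # so timedelta(days=1).seconds would be 0.
        return '{}s'.format(int(spans[unit].total_seconds()))

    assert to_seconds_string('30s') == '30s'
    assert to_seconds_string('10m') == '600s'
    assert to_seconds_string('4h') == '14400s'
    assert to_seconds_string('1d') == '86400s'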