Repository: incubator-airflow Updated Branches: refs/heads/master 534a0e078 -> e4b240fb7
[AIRFLOW-1343] Add Airflow default label to the dataproc operator Closes #2396 from XiangbingJi/master Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/e4b240fb Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/e4b240fb Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/e4b240fb Branch: refs/heads/master Commit: e4b240fb72ed0e96ad742947efaec3216f597630 Parents: 534a0e0 Author: XiangbingJi <[email protected]> Authored: Tue Jun 27 09:51:53 2017 -0700 Committer: Chris Riccomini <[email protected]> Committed: Tue Jun 27 09:51:59 2017 -0700 ---------------------------------------------------------------------- airflow/contrib/operators/dataproc_operator.py | 6 +- .../contrib/operators/test_dataproc_operator.py | 89 ++++++++++++-------- 2 files changed, 58 insertions(+), 37 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/e4b240fb/airflow/contrib/operators/dataproc_operator.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/operators/dataproc_operator.py b/airflow/contrib/operators/dataproc_operator.py index 14dddb0..14245c8 100644 --- a/airflow/contrib/operators/dataproc_operator.py +++ b/airflow/contrib/operators/dataproc_operator.py @@ -19,6 +19,7 @@ import time from airflow.contrib.hooks.gcp_dataproc_hook import DataProcHook from airflow.models import BaseOperator from airflow.utils.decorators import apply_defaults +from airflow.version import version from googleapiclient.errors import HttpError @@ -226,8 +227,9 @@ class DataprocClusterCreateOperator(BaseOperator): }, 'isPreemptible': True } - if self.labels: - cluster_data['labels'] = self.labels + + cluster_data['labels'] = self.labels if self.labels else {} + cluster_data['labels'].update({'airflow_version': version}) if self.storage_bucket: cluster_data['config']['configBucket'] = self.storage_bucket if self.metadata: http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/e4b240fb/tests/contrib/operators/test_dataproc_operator.py ---------------------------------------------------------------------- diff --git a/tests/contrib/operators/test_dataproc_operator.py b/tests/contrib/operators/test_dataproc_operator.py index 4d5e84b..a441e47 100644 --- a/tests/contrib/operators/test_dataproc_operator.py +++ b/tests/contrib/operators/test_dataproc_operator.py @@ -16,7 +16,9 @@ import unittest from airflow.contrib.operators.dataproc_operator import DataprocClusterCreateOperator +from airflow.version import version +from copy import deepcopy TASK_ID = 'test-dataproc-operator' CLUSTER_NAME = 'test-cluster-name' @@ -30,45 +32,62 @@ MASTER_DISK_SIZE = 100 WORKER_MACHINE_TYPE = 'n1-standard-2' WORKER_DISK_SIZE = 100 NUM_PREEMPTIBLE_WORKERS = 2 - +LABEL1 = {} +LABEL2 = {'application':'test', 'year': 2017} class DataprocClusterCreateOperatorTest(unittest.TestCase): - + # Unitest for the DataprocClusterCreateOperator def setUp(self): - self.dataproc = DataprocClusterCreateOperator( - task_id=TASK_ID, - cluster_name=CLUSTER_NAME, - project_id=PROJECT_ID, - num_workers=NUM_WORKERS, - zone=ZONE, - storage_bucket=STORAGE_BUCKET, - image_version=IMAGE_VERSION, - master_machine_type=MASTER_MACHINE_TYPE, - master_disk_size=MASTER_DISK_SIZE, - worker_machine_type=WORKER_MACHINE_TYPE, - worker_disk_size=WORKER_DISK_SIZE, - num_preemptible_workers=NUM_PREEMPTIBLE_WORKERS) + # instantiate two different test cases with different labels + self.labels = [LABEL1, LABEL2] + self.dataproc_operators = [] + for labels in self.labels: + self.dataproc_operators.append( + DataprocClusterCreateOperator( + task_id=TASK_ID, + cluster_name=CLUSTER_NAME, + project_id=PROJECT_ID, + num_workers=NUM_WORKERS, + zone=ZONE, + storage_bucket=STORAGE_BUCKET, + image_version=IMAGE_VERSION, + master_machine_type=MASTER_MACHINE_TYPE, + master_disk_size=MASTER_DISK_SIZE, + worker_machine_type=WORKER_MACHINE_TYPE, + worker_disk_size=WORKER_DISK_SIZE, + num_preemptible_workers=NUM_PREEMPTIBLE_WORKERS, + labels = deepcopy(labels) + ) + ) def test_init(self): - """Test DataFlowPythonOperator instance is properly initialized.""" - self.assertEqual(self.dataproc.cluster_name, CLUSTER_NAME) - self.assertEqual(self.dataproc.project_id, PROJECT_ID) - self.assertEqual(self.dataproc.num_workers, NUM_WORKERS) - self.assertEqual(self.dataproc.zone, ZONE) - self.assertEqual(self.dataproc.storage_bucket, STORAGE_BUCKET) - self.assertEqual(self.dataproc.image_version, IMAGE_VERSION) - self.assertEqual(self.dataproc.master_machine_type, MASTER_MACHINE_TYPE) - self.assertEqual(self.dataproc.master_disk_size, MASTER_DISK_SIZE) - self.assertEqual(self.dataproc.worker_machine_type, WORKER_MACHINE_TYPE) - self.assertEqual(self.dataproc.worker_disk_size, WORKER_DISK_SIZE) - self.assertEqual(self.dataproc.num_preemptible_workers, NUM_PREEMPTIBLE_WORKERS) + """Test DataProcClusterOperator instance is properly initialized.""" + for suffix, dataproc_operator in enumerate(self.dataproc_operators): + self.assertEqual(dataproc_operator.cluster_name, CLUSTER_NAME) + self.assertEqual(dataproc_operator.project_id, PROJECT_ID) + self.assertEqual(dataproc_operator.num_workers, NUM_WORKERS) + self.assertEqual(dataproc_operator.zone, ZONE) + self.assertEqual(dataproc_operator.storage_bucket, STORAGE_BUCKET) + self.assertEqual(dataproc_operator.image_version, IMAGE_VERSION) + self.assertEqual(dataproc_operator.master_machine_type, MASTER_MACHINE_TYPE) + self.assertEqual(dataproc_operator.master_disk_size, MASTER_DISK_SIZE) + self.assertEqual(dataproc_operator.worker_machine_type, WORKER_MACHINE_TYPE) + self.assertEqual(dataproc_operator.worker_disk_size, WORKER_DISK_SIZE) + self.assertEqual(dataproc_operator.num_preemptible_workers, NUM_PREEMPTIBLE_WORKERS) + self.assertEqual(dataproc_operator.labels, self.labels[suffix]) def test_build_cluster_data(self): - cluster_data = self.dataproc._build_cluster_data() - self.assertEqual(cluster_data['clusterName'], CLUSTER_NAME) - self.assertEqual(cluster_data['projectId'], PROJECT_ID) - self.assertEqual(cluster_data['config']['softwareConfig'], {'imageVersion': IMAGE_VERSION}) - self.assertEqual(cluster_data['config']['configBucket'], STORAGE_BUCKET) - self.assertEqual(cluster_data['config']['workerConfig']['numInstances'], NUM_WORKERS) - self.assertEqual(cluster_data['config']['secondaryWorkerConfig']['numInstances'], - NUM_PREEMPTIBLE_WORKERS) + for suffix, dataproc_operator in enumerate(self.dataproc_operators): + cluster_data = dataproc_operator._build_cluster_data() + self.assertEqual(cluster_data['clusterName'], CLUSTER_NAME) + self.assertEqual(cluster_data['projectId'], PROJECT_ID) + self.assertEqual(cluster_data['config']['softwareConfig'], {'imageVersion': IMAGE_VERSION}) + self.assertEqual(cluster_data['config']['configBucket'], STORAGE_BUCKET) + self.assertEqual(cluster_data['config']['workerConfig']['numInstances'], NUM_WORKERS) + self.assertEqual(cluster_data['config']['secondaryWorkerConfig']['numInstances'], + NUM_PREEMPTIBLE_WORKERS) + # test whether the default airflow_version label has been properly set to the dataproc operator + merged_labels = {} + merged_labels.update(self.labels[suffix]) + merged_labels.update({'airflow_version': version}) + self.assertEqual(cluster_data['labels'], merged_labels)
