Repository: incubator-airflow Updated Branches: refs/heads/master 032a3e6c8 -> dd861f8cd
[AIRFLOW-1323] Made Dataproc operator parameter names consistent Closes #2636 from cjqian/1323 Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/dd861f8c Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/dd861f8c Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/dd861f8c Branch: refs/heads/master Commit: dd861f8cd0491f977ba545415d667f3d4611b5b8 Parents: 032a3e6 Author: Crystal Qian <[email protected]> Authored: Tue Oct 3 11:15:27 2017 +0200 Committer: Bolke de Bruin <[email protected]> Committed: Tue Oct 3 11:15:27 2017 +0200 ---------------------------------------------------------------------- UPDATING.md | 3 +- airflow/contrib/hooks/gcp_dataproc_hook.py | 8 +- airflow/contrib/operators/dataproc_operator.py | 98 ++++++++++----------- 3 files changed, 55 insertions(+), 54 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dd861f8c/UPDATING.md ---------------------------------------------------------------------- diff --git a/UPDATING.md b/UPDATING.md index 0b21dd1..329f416 100644 --- a/UPDATING.md +++ b/UPDATING.md @@ -146,6 +146,7 @@ A new DaskExecutor allows Airflow tasks to be run in Dask Distributed clusters. ### Deprecated Features These features are marked for deprecation. They may still work (and raise a `DeprecationWarning`), but are no longer supported and will be removed entirely in Airflow 2.0 +- If you're using the `google_cloud_conn_id` or `dataproc_cluster` argument names explicitly in `contrib.operators.Dataproc{*}Operator`(s), be sure to rename them to `gcp_conn_id` or `cluster_name`, respectively. We've renamed these arguments for consistency. 
(AIRFLOW-1323) - `post_execute()` hooks now take two arguments, `context` and `result` (AIRFLOW-886) @@ -157,7 +158,7 @@ supported and will be removed entirely in Airflow 2.0 - The pickle type for XCom messages has been replaced by json to prevent RCE attacks. Note that JSON serialization is stricter than pickling, so if you want to e.g. pass raw bytes through XCom you must encode them using an encoding like base64. - By default pickling is still enabled until Airflow 2.0. To disable it + By default pickling is still enabled until Airflow 2.0. To disable it Set enable_xcom_pickling = False in your Airflow config. ## Airflow 1.8.1 http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dd861f8c/airflow/contrib/hooks/gcp_dataproc_hook.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/hooks/gcp_dataproc_hook.py b/airflow/contrib/hooks/gcp_dataproc_hook.py index a1bba0b..4be166e 100644 --- a/airflow/contrib/hooks/gcp_dataproc_hook.py +++ b/airflow/contrib/hooks/gcp_dataproc_hook.py @@ -74,7 +74,7 @@ class _DataProcJob(LoggingMixin): class _DataProcJobBuilder: - def __init__(self, project_id, task_id, dataproc_cluster, job_type, properties): + def __init__(self, project_id, task_id, cluster_name, job_type, properties): name = task_id + "_" + str(uuid.uuid1())[:8] self.job_type = job_type self.job = { @@ -84,7 +84,7 @@ class _DataProcJobBuilder: "jobId": name, }, "placement": { - "clusterName": dataproc_cluster + "clusterName": cluster_name }, job_type: { } @@ -159,6 +159,6 @@ class DataProcHook(GoogleCloudBaseHook): if not submitted.wait_for_done(): submitted.raise_error("DataProcTask has errors") - def create_job_template(self, task_id, dataproc_cluster, job_type, properties): - return _DataProcJobBuilder(self.project_id, task_id, dataproc_cluster, job_type, + def create_job_template(self, task_id, cluster_name, job_type, properties): + return _DataProcJobBuilder(self.project_id, task_id, cluster_name, 
job_type, properties) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dd861f8c/airflow/contrib/operators/dataproc_operator.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/operators/dataproc_operator.py b/airflow/contrib/operators/dataproc_operator.py index 6ef89ba..0823ed8 100644 --- a/airflow/contrib/operators/dataproc_operator.py +++ b/airflow/contrib/operators/dataproc_operator.py @@ -55,7 +55,7 @@ class DataprocClusterCreateOperator(BaseOperator): num_preemptible_workers=0, labels=None, region='global', - google_cloud_conn_id='google_cloud_default', + gcp_conn_id='google_cloud_default', delegate_to=None, service_account=None, service_account_scopes=None, @@ -68,7 +68,7 @@ class DataprocClusterCreateOperator(BaseOperator): https://cloud.google.com/dataproc/docs/reference/rest/v1/projects.regions.clusters - :param cluster_name: The name of the cluster to create + :param cluster_name: The name of the DataProc cluster to create. :type cluster_name: string :param project_id: The ID of the google cloud project in which to create the cluster @@ -106,8 +106,8 @@ class DataprocClusterCreateOperator(BaseOperator): :param zone: The zone where the cluster will be located :type zone: string :param region: leave as 'global', might become relevant in the future - :param google_cloud_conn_id: The connection id to use when connecting to dataproc - :type google_cloud_conn_id: string + :param gcp_conn_id: The connection ID to use when connecting to Google Cloud Platform. + :type gcp_conn_id: string :param delegate_to: The account to impersonate, if any. For this to work, the service account making the request must have domain-wide delegation enabled.
@@ -118,7 +118,7 @@ class DataprocClusterCreateOperator(BaseOperator): :type service_account_scopes: list[string] """ super(DataprocClusterCreateOperator, self).__init__(*args, **kwargs) - self.google_cloud_conn_id = google_cloud_conn_id + self.gcp_conn_id = gcp_conn_id self.delegate_to = delegate_to self.cluster_name = cluster_name self.project_id = project_id @@ -266,7 +266,7 @@ class DataprocClusterCreateOperator(BaseOperator): def execute(self, context): self.log.info('Creating cluster: %s', self.cluster_name) hook = DataProcHook( - gcp_conn_id=self.google_cloud_conn_id, + gcp_conn_id=self.gcp_conn_id, delegate_to=self.delegate_to ) service = hook.get_conn() @@ -315,29 +315,29 @@ class DataprocClusterDeleteOperator(BaseOperator): cluster_name, project_id, region='global', - google_cloud_conn_id='google_cloud_default', + gcp_conn_id='google_cloud_default', delegate_to=None, *args, **kwargs): """ Delete a cluster on Google Cloud Dataproc. - :param cluster_name: The name of the cluster to create + :param cluster_name: The name of the cluster to delete. :type cluster_name: string :param project_id: The ID of the google cloud project in which the cluster runs :type project_id: string :param region: leave as 'global', might become relevant in the future :type region: string - :param google_cloud_conn_id: The connection id to use when connecting to dataproc - :type google_cloud_conn_id: string + :param gcp_conn_id: The connection ID to use when connecting to Google Cloud Platform. + :type gcp_conn_id: string :param delegate_to: The account to impersonate, if any. For this to work, the service account making the request must have domain-wide delegation enabled.
:type delegate_to: string """ super(DataprocClusterDeleteOperator, self).__init__(*args, **kwargs) - self.google_cloud_conn_id = google_cloud_conn_id + self.gcp_conn_id = gcp_conn_id self.delegate_to = delegate_to self.cluster_name = cluster_name self.project_id = project_id @@ -360,7 +360,7 @@ class DataprocClusterDeleteOperator(BaseOperator): def execute(self, context): self.log.info('Deleting cluster: %s', self.cluster_name) hook = DataProcHook( - gcp_conn_id=self.google_cloud_conn_id, + gcp_conn_id=self.gcp_conn_id, delegate_to=self.delegate_to ) service = hook.get_conn() @@ -385,7 +385,7 @@ class DataProcPigOperator(BaseOperator): ``` default_args = { - 'dataproc_cluster': 'cluster-1', + 'cluster_name': 'cluster-1', 'dataproc_pig_jars': [ 'gs://example/udf/jar/datafu/1.2.0/datafu.jar', 'gs://example/udf/jar/gpig/1.2/gpig.jar' @@ -405,7 +405,7 @@ class DataProcPigOperator(BaseOperator): dag=dag) ``` """ - template_fields = ['query', 'variables', 'job_name', 'dataproc_cluster'] + template_fields = ['query', 'variables', 'job_name', 'cluster_name'] template_ext = ('.pg', '.pig',) ui_color = '#0273d4' @@ -416,7 +416,7 @@ class DataProcPigOperator(BaseOperator): query_uri=None, variables=None, job_name='{{task.task_id}}_{{ds_nodash}}', - dataproc_cluster='cluster-1', + cluster_name='cluster-1', dataproc_pig_properties=None, dataproc_pig_jars=None, gcp_conn_id='google_cloud_default', @@ -440,8 +440,8 @@ class DataProcPigOperator(BaseOperator): is the task_id appended with the execution data, but can be templated. The name will always be appended with a random number to avoid name clashes. :type job_name: string - :param dataproc_cluster: The id of the DataProc cluster. - :type dataproc_cluster: string + :param cluster_name: The name of the DataProc cluster. + :type cluster_name: string :param dataproc_pig_properties: Map for the Pig properties. 
Ideal to put in default arguments :type dataproc_pig_properties: dict @@ -462,14 +462,14 @@ class DataProcPigOperator(BaseOperator): self.query_uri = query_uri self.variables = variables self.job_name = job_name - self.dataproc_cluster = dataproc_cluster + self.cluster_name = cluster_name self.dataproc_properties = dataproc_pig_properties self.dataproc_jars = dataproc_pig_jars def execute(self, context): hook = DataProcHook(gcp_conn_id=self.gcp_conn_id, delegate_to=self.delegate_to) - job = hook.create_job_template(self.task_id, self.dataproc_cluster, "pigJob", + job = hook.create_job_template(self.task_id, self.cluster_name, "pigJob", self.dataproc_properties) if self.query is None: @@ -487,7 +487,7 @@ class DataProcHiveOperator(BaseOperator): """ Start a Hive query Job on a Cloud DataProc cluster. """ - template_fields = ['query', 'variables', 'job_name', 'dataproc_cluster'] + template_fields = ['query', 'variables', 'job_name', 'cluster_name'] template_ext = ('.q',) ui_color = '#0273d4' @@ -498,7 +498,7 @@ class DataProcHiveOperator(BaseOperator): query_uri=None, variables=None, job_name='{{task.task_id}}_{{ds_nodash}}', - dataproc_cluster='cluster-1', + cluster_name='cluster-1', dataproc_hive_properties=None, dataproc_hive_jars=None, gcp_conn_id='google_cloud_default', @@ -519,8 +519,8 @@ class DataProcHiveOperator(BaseOperator): is the task_id appended with the execution data, but can be templated. The name will always be appended with a random number to avoid name clashes. :type job_name: string - :param dataproc_cluster: The id of the DataProc cluster. - :type dataproc_cluster: string + :param cluster_name: The name of the DataProc cluster. + :type cluster_name: string :param dataproc_hive_properties: Map for the Pig properties. 
Ideal to put in default arguments :type dataproc_hive_properties: dict @@ -543,7 +543,7 @@ class DataProcHiveOperator(BaseOperator): self.query_uri = query_uri self.variables = variables self.job_name = job_name - self.dataproc_cluster = dataproc_cluster + self.cluster_name = cluster_name self.dataproc_properties = dataproc_hive_properties self.dataproc_jars = dataproc_hive_jars self.region = region @@ -552,7 +552,7 @@ class DataProcHiveOperator(BaseOperator): hook = DataProcHook(gcp_conn_id=self.gcp_conn_id, delegate_to=self.delegate_to) - job = hook.create_job_template(self.task_id, self.dataproc_cluster, "hiveJob", + job = hook.create_job_template(self.task_id, self.cluster_name, "hiveJob", self.dataproc_properties) if self.query is None: @@ -570,7 +570,7 @@ class DataProcSparkSqlOperator(BaseOperator): """ Start a Spark SQL query Job on a Cloud DataProc cluster. """ - template_fields = ['query', 'variables', 'job_name', 'dataproc_cluster'] + template_fields = ['query', 'variables', 'job_name', 'cluster_name'] template_ext = ('.q',) ui_color = '#0273d4' @@ -581,7 +581,7 @@ class DataProcSparkSqlOperator(BaseOperator): query_uri=None, variables=None, job_name='{{task.task_id}}_{{ds_nodash}}', - dataproc_cluster='cluster-1', + cluster_name='cluster-1', dataproc_spark_properties=None, dataproc_spark_jars=None, gcp_conn_id='google_cloud_default', @@ -601,8 +601,8 @@ class DataProcSparkSqlOperator(BaseOperator): is the task_id appended with the execution data, but can be templated. The name will always be appended with a random number to avoid name clashes. :type job_name: string - :param dataproc_cluster: The id of the DataProc cluster. - :type dataproc_cluster: string + :param cluster_name: The name of the DataProc cluster. + :type cluster_name: string :param dataproc_spark_properties: Map for the Pig properties. 
Ideal to put in default arguments :type dataproc_spark_properties: dict @@ -623,7 +623,7 @@ class DataProcSparkSqlOperator(BaseOperator): self.query_uri = query_uri self.variables = variables self.job_name = job_name - self.dataproc_cluster = dataproc_cluster + self.cluster_name = cluster_name self.dataproc_properties = dataproc_spark_properties self.dataproc_jars = dataproc_spark_jars @@ -631,7 +631,7 @@ class DataProcSparkSqlOperator(BaseOperator): hook = DataProcHook(gcp_conn_id=self.gcp_conn_id, delegate_to=self.delegate_to) - job = hook.create_job_template(self.task_id, self.dataproc_cluster, "sparkSqlJob", + job = hook.create_job_template(self.task_id, self.cluster_name, "sparkSqlJob", self.dataproc_properties) if self.query is None: @@ -650,7 +650,7 @@ class DataProcSparkOperator(BaseOperator): Start a Spark Job on a Cloud DataProc cluster. """ - template_fields = ['arguments', 'job_name', 'dataproc_cluster'] + template_fields = ['arguments', 'job_name', 'cluster_name'] ui_color = '#0273d4' @apply_defaults @@ -662,7 +662,7 @@ class DataProcSparkOperator(BaseOperator): archives=None, files=None, job_name='{{task.task_id}}_{{ds_nodash}}', - dataproc_cluster='cluster-1', + cluster_name='cluster-1', dataproc_spark_properties=None, dataproc_spark_jars=None, gcp_conn_id='google_cloud_default', @@ -690,8 +690,8 @@ class DataProcSparkOperator(BaseOperator): is the task_id appended with the execution data, but can be templated. The name will always be appended with a random number to avoid name clashes. :type job_name: string - :param dataproc_cluster: The id of the DataProc cluster. - :type dataproc_cluster: string + :param cluster_name: The name of the DataProc cluster. + :type cluster_name: string :param dataproc_spark_properties: Map for the Pig properties. 
Ideal to put in default arguments :type dataproc_spark_properties: dict @@ -716,7 +716,7 @@ class DataProcSparkOperator(BaseOperator): self.archives = archives self.files = files self.job_name = job_name - self.dataproc_cluster = dataproc_cluster + self.cluster_name = cluster_name self.dataproc_properties = dataproc_spark_properties self.dataproc_jars = dataproc_spark_jars self.region = region @@ -724,7 +724,7 @@ class DataProcSparkOperator(BaseOperator): def execute(self, context): hook = DataProcHook(gcp_conn_id=self.gcp_conn_id, delegate_to=self.delegate_to) - job = hook.create_job_template(self.task_id, self.dataproc_cluster, "sparkJob", + job = hook.create_job_template(self.task_id, self.cluster_name, "sparkJob", self.dataproc_properties) job.set_main(self.main_jar, self.main_class) @@ -742,7 +742,7 @@ class DataProcHadoopOperator(BaseOperator): Start a Hadoop Job on a Cloud DataProc cluster. """ - template_fields = ['arguments', 'job_name', 'dataproc_cluster'] + template_fields = ['arguments', 'job_name', 'cluster_name'] ui_color = '#0273d4' @apply_defaults @@ -754,7 +754,7 @@ class DataProcHadoopOperator(BaseOperator): archives=None, files=None, job_name='{{task.task_id}}_{{ds_nodash}}', - dataproc_cluster='cluster-1', + cluster_name='cluster-1', dataproc_hadoop_properties=None, dataproc_hadoop_jars=None, gcp_conn_id='google_cloud_default', @@ -782,8 +782,8 @@ class DataProcHadoopOperator(BaseOperator): is the task_id appended with the execution data, but can be templated. The name will always be appended with a random number to avoid name clashes. :type job_name: string - :param dataproc_cluster: The id of the DataProc cluster. - :type dataproc_cluster: string + :param cluster_name: The name of the DataProc cluster. + :type cluster_name: string :param dataproc_hadoop_properties: Map for the Pig properties. 
Ideal to put in default arguments :type dataproc_hadoop_properties: dict @@ -808,7 +808,7 @@ class DataProcHadoopOperator(BaseOperator): self.archives = archives self.files = files self.job_name = job_name - self.dataproc_cluster = dataproc_cluster + self.cluster_name = cluster_name self.dataproc_properties = dataproc_hadoop_properties self.dataproc_jars = dataproc_hadoop_jars self.region = region @@ -816,7 +816,7 @@ class DataProcHadoopOperator(BaseOperator): def execute(self, context): hook = DataProcHook(gcp_conn_id=self.gcp_conn_id, delegate_to=self.delegate_to) - job = hook.create_job_template(self.task_id, self.dataproc_cluster, "hadoopJob", + job = hook.create_job_template(self.task_id, self.cluster_name, "hadoopJob", self.dataproc_properties) job.set_main(self.main_jar, self.main_class) @@ -834,7 +834,7 @@ class DataProcPySparkOperator(BaseOperator): Start a PySpark Job on a Cloud DataProc cluster. """ - template_fields = ['arguments', 'job_name', 'dataproc_cluster'] + template_fields = ['arguments', 'job_name', 'cluster_name'] ui_color = '#0273d4' @apply_defaults @@ -846,7 +846,7 @@ class DataProcPySparkOperator(BaseOperator): pyfiles=None, files=None, job_name='{{task.task_id}}_{{ds_nodash}}', - dataproc_cluster='cluster-1', + cluster_name='cluster-1', dataproc_pyspark_properties=None, dataproc_pyspark_jars=None, gcp_conn_id='google_cloud_default', @@ -874,8 +874,8 @@ class DataProcPySparkOperator(BaseOperator): is the task_id appended with the execution data, but can be templated. The name will always be appended with a random number to avoid name clashes. :type job_name: string - :param dataproc_cluster: The id of the DataProc cluster. - :type dataproc_cluster: string + :param cluster_name: The name of the DataProc cluster. + :type cluster_name: string :param dataproc_pyspark_properties: Map for the Pig properties. 
Ideal to put in default arguments :type dataproc_pyspark_properties: dict @@ -900,7 +900,7 @@ class DataProcPySparkOperator(BaseOperator): self.files = files self.pyfiles = pyfiles self.job_name = job_name - self.dataproc_cluster = dataproc_cluster + self.cluster_name = cluster_name self.dataproc_properties = dataproc_pyspark_properties self.dataproc_jars = dataproc_pyspark_jars self.region = region @@ -908,7 +908,7 @@ class DataProcPySparkOperator(BaseOperator): def execute(self, context): hook = DataProcHook(gcp_conn_id=self.gcp_conn_id, delegate_to=self.delegate_to) - job = hook.create_job_template(self.task_id, self.dataproc_cluster, "pysparkJob", + job = hook.create_job_template(self.task_id, self.cluster_name, "pysparkJob", self.dataproc_properties) job.set_python_main(self.main)
