Repository: incubator-airflow Updated Branches: refs/heads/v1-9-test 31946e024 -> ed248d0d2
[AIRFLOW-1323] Made Dataproc operator parameter names consistent Closes #2636 from cjqian/1323 (cherry picked from commit dd861f8cd0491f977ba545415d667f3d4611b5b8) Signed-off-by: Bolke de Bruin <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/ed248d0d Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/ed248d0d Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/ed248d0d Branch: refs/heads/v1-9-test Commit: ed248d0d2994aa05ae8fc0681bdc0ceccd4e367e Parents: 31946e0 Author: Crystal Qian <[email protected]> Authored: Tue Oct 3 11:15:27 2017 +0200 Committer: Bolke de Bruin <[email protected]> Committed: Tue Oct 3 11:15:39 2017 +0200 ---------------------------------------------------------------------- UPDATING.md | 3 +- airflow/contrib/hooks/gcp_dataproc_hook.py | 8 +- airflow/contrib/operators/dataproc_operator.py | 98 ++++++++++----------- 3 files changed, 55 insertions(+), 54 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/ed248d0d/UPDATING.md ---------------------------------------------------------------------- diff --git a/UPDATING.md b/UPDATING.md index 0b21dd1..329f416 100644 --- a/UPDATING.md +++ b/UPDATING.md @@ -146,6 +146,7 @@ A new DaskExecutor allows Airflow tasks to be run in Dask Distributed clusters. ### Deprecated Features These features are marked for deprecation. They may still work (and raise a `DeprecationWarning`), but are no longer supported and will be removed entirely in Airflow 2.0 +- If you're using the `google_cloud_conn_id` or `dataproc_cluster` argument names explicitly in `contrib.operators.Dataproc{*}Operator`(s), be sure to rename them to `gcp_conn_id` or `cluster_name`, respectively. We've renamed these arguments for consistency. 
(AIRFLOW-1323) - `post_execute()` hooks now take two arguments, `context` and `result` (AIRFLOW-886) @@ -157,7 +158,7 @@ supported and will be removed entirely in Airflow 2.0 - The pickle type for XCom messages has been replaced by json to prevent RCE attacks. Note that JSON serialization is stricter than pickling, so if you want to e.g. pass raw bytes through XCom you must encode them using an encoding like base64. - By default pickling is still enabled until Airflow 2.0. To disable it + By default pickling is still enabled until Airflow 2.0. To disable it Set enable_xcom_pickling = False in your Airflow config. ## Airflow 1.8.1 http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/ed248d0d/airflow/contrib/hooks/gcp_dataproc_hook.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/hooks/gcp_dataproc_hook.py b/airflow/contrib/hooks/gcp_dataproc_hook.py index c964f4c..d1efa53 100644 --- a/airflow/contrib/hooks/gcp_dataproc_hook.py +++ b/airflow/contrib/hooks/gcp_dataproc_hook.py @@ -73,7 +73,7 @@ class _DataProcJob(LoggingMixin): class _DataProcJobBuilder: - def __init__(self, project_id, task_id, dataproc_cluster, job_type, properties): + def __init__(self, project_id, task_id, cluster_name, job_type, properties): name = task_id + "_" + str(uuid.uuid1())[:8] self.job_type = job_type self.job = { @@ -83,7 +83,7 @@ class _DataProcJobBuilder: "jobId": name, }, "placement": { - "clusterName": dataproc_cluster + "clusterName": cluster_name }, job_type: { } @@ -158,6 +158,6 @@ class DataProcHook(GoogleCloudBaseHook): if not submitted.wait_for_done(): submitted.raise_error("DataProcTask has errors") - def create_job_template(self, task_id, dataproc_cluster, job_type, properties): - return _DataProcJobBuilder(self.project_id, task_id, dataproc_cluster, job_type, + def create_job_template(self, task_id, cluster_name, job_type, properties): + return _DataProcJobBuilder(self.project_id, task_id, cluster_name, 
job_type, properties) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/ed248d0d/airflow/contrib/operators/dataproc_operator.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/operators/dataproc_operator.py b/airflow/contrib/operators/dataproc_operator.py index bdb0335..d3a1e57 100644 --- a/airflow/contrib/operators/dataproc_operator.py +++ b/airflow/contrib/operators/dataproc_operator.py @@ -55,7 +55,7 @@ class DataprocClusterCreateOperator(BaseOperator): num_preemptible_workers=0, labels=None, region='global', - google_cloud_conn_id='google_cloud_default', + gcp_conn_id='google_cloud_default', delegate_to=None, service_account=None, service_account_scopes=None, @@ -68,7 +68,7 @@ class DataprocClusterCreateOperator(BaseOperator): https://cloud.google.com/dataproc/docs/reference/rest/v1/projects.regions.clusters - :param cluster_name: The name of the cluster to create + :param cluster_name: The name of the DataProc cluster to create. :type cluster_name: string :param project_id: The ID of the google cloud project in which to create the cluster @@ -106,8 +106,8 @@ class DataprocClusterCreateOperator(BaseOperator): :param zone: The zone where the cluster will be located :type zone: string :param region: leave as 'global', might become relevant in the future - :param google_cloud_conn_id: The connection id to use when connecting to dataproc - :type google_cloud_conn_id: string + :param gcp_conn_id: The connection ID to use when connecting to Google Cloud Platform. + :type gcp_conn_id: string :param delegate_to: The account to impersonate, if any. For this to work, the service account making the request must have domain-wide delegation enabled. 
@@ -118,7 +118,7 @@ class DataprocClusterCreateOperator(BaseOperator): :type service_account_scopes: list[string] """ super(DataprocClusterCreateOperator, self).__init__(*args, **kwargs) - self.google_cloud_conn_id = google_cloud_conn_id + self.gcp_conn_id = gcp_conn_id self.delegate_to = delegate_to self.cluster_name = cluster_name self.project_id = project_id @@ -266,7 +266,7 @@ class DataprocClusterCreateOperator(BaseOperator): def execute(self, context): self.log.info('Creating cluster: %s', self.cluster_name) hook = DataProcHook( - gcp_conn_id=self.google_cloud_conn_id, + gcp_conn_id=self.gcp_conn_id, delegate_to=self.delegate_to ) service = hook.get_conn() @@ -315,29 +315,29 @@ class DataprocClusterDeleteOperator(BaseOperator): cluster_name, project_id, region='global', - google_cloud_conn_id='google_cloud_default', + gcp_conn_id='google_cloud_default', delegate_to=None, *args, **kwargs): """ Delete a cluster on Google Cloud Dataproc. - :param cluster_name: The name of the cluster to create + :param cluster_name: The name of the cluster to delete. :type cluster_name: string :param project_id: The ID of the google cloud project in which the cluster runs :type project_id: string :param region: leave as 'global', might become relevant in the future :type region: string - :param google_cloud_conn_id: The connection id to use when connecting to dataproc - :type google_cloud_conn_id: string + :param gcp_conn_id: The connection ID to use when connecting to Google Cloud Platform. + :type gcp_conn_id: string :param delegate_to: The account to impersonate, if any. For this to work, the service account making the request must have domain-wide delegation enabled. 
:type delegate_to: string """ super(DataprocClusterDeleteOperator, self).__init__(*args, **kwargs) - self.google_cloud_conn_id = google_cloud_conn_id + self.gcp_conn_id = gcp_conn_id self.delegate_to = delegate_to self.cluster_name = cluster_name self.project_id = project_id @@ -360,7 +360,7 @@ class DataprocClusterDeleteOperator(BaseOperator): def execute(self, context): self.log.info('Deleting cluster: %s', self.cluster_name) hook = DataProcHook( - gcp_conn_id=self.google_cloud_conn_id, + gcp_conn_id=self.gcp_conn_id, delegate_to=self.delegate_to ) service = hook.get_conn() @@ -385,7 +385,7 @@ class DataProcPigOperator(BaseOperator): ``` default_args = { - 'dataproc_cluster': 'cluster-1', + 'cluster_name': 'cluster-1', 'dataproc_pig_jars': [ 'gs://example/udf/jar/datafu/1.2.0/datafu.jar', 'gs://example/udf/jar/gpig/1.2/gpig.jar' @@ -405,7 +405,7 @@ class DataProcPigOperator(BaseOperator): dag=dag) ``` """ - template_fields = ['query', 'variables', 'job_name', 'dataproc_cluster'] + template_fields = ['query', 'variables', 'job_name', 'cluster_name'] template_ext = ('.pg', '.pig',) ui_color = '#0273d4' @@ -416,7 +416,7 @@ class DataProcPigOperator(BaseOperator): query_uri=None, variables=None, job_name='{{task.task_id}}_{{ds_nodash}}', - dataproc_cluster='cluster-1', + cluster_name='cluster-1', dataproc_pig_properties=None, dataproc_pig_jars=None, gcp_conn_id='google_cloud_default', @@ -440,8 +440,8 @@ class DataProcPigOperator(BaseOperator): is the task_id appended with the execution data, but can be templated. The name will always be appended with a random number to avoid name clashes. :type job_name: string - :param dataproc_cluster: The id of the DataProc cluster. - :type dataproc_cluster: string + :param cluster_name: The name of the DataProc cluster. + :type cluster_name: string :param dataproc_pig_properties: Map for the Pig properties. 
Ideal to put in default arguments :type dataproc_pig_properties: dict @@ -462,14 +462,14 @@ class DataProcPigOperator(BaseOperator): self.query_uri = query_uri self.variables = variables self.job_name = job_name - self.dataproc_cluster = dataproc_cluster + self.cluster_name = cluster_name self.dataproc_properties = dataproc_pig_properties self.dataproc_jars = dataproc_pig_jars def execute(self, context): hook = DataProcHook(gcp_conn_id=self.gcp_conn_id, delegate_to=self.delegate_to) - job = hook.create_job_template(self.task_id, self.dataproc_cluster, "pigJob", + job = hook.create_job_template(self.task_id, self.cluster_name, "pigJob", self.dataproc_properties) if self.query is None: @@ -487,7 +487,7 @@ class DataProcHiveOperator(BaseOperator): """ Start a Hive query Job on a Cloud DataProc cluster. """ - template_fields = ['query', 'variables', 'job_name', 'dataproc_cluster'] + template_fields = ['query', 'variables', 'job_name', 'cluster_name'] template_ext = ('.q',) ui_color = '#0273d4' @@ -498,7 +498,7 @@ class DataProcHiveOperator(BaseOperator): query_uri=None, variables=None, job_name='{{task.task_id}}_{{ds_nodash}}', - dataproc_cluster='cluster-1', + cluster_name='cluster-1', dataproc_hive_properties=None, dataproc_hive_jars=None, gcp_conn_id='google_cloud_default', @@ -518,8 +518,8 @@ class DataProcHiveOperator(BaseOperator): is the task_id appended with the execution data, but can be templated. The name will always be appended with a random number to avoid name clashes. :type job_name: string - :param dataproc_cluster: The id of the DataProc cluster. - :type dataproc_cluster: string + :param cluster_name: The name of the DataProc cluster. + :type cluster_name: string :param dataproc_hive_properties: Map for the Pig properties. 
Ideal to put in default arguments :type dataproc_hive_properties: dict @@ -540,7 +540,7 @@ class DataProcHiveOperator(BaseOperator): self.query_uri = query_uri self.variables = variables self.job_name = job_name - self.dataproc_cluster = dataproc_cluster + self.cluster_name = cluster_name self.dataproc_properties = dataproc_hive_properties self.dataproc_jars = dataproc_hive_jars @@ -548,7 +548,7 @@ class DataProcHiveOperator(BaseOperator): hook = DataProcHook(gcp_conn_id=self.gcp_conn_id, delegate_to=self.delegate_to) - job = hook.create_job_template(self.task_id, self.dataproc_cluster, "hiveJob", + job = hook.create_job_template(self.task_id, self.cluster_name, "hiveJob", self.dataproc_properties) if self.query is None: @@ -566,7 +566,7 @@ class DataProcSparkSqlOperator(BaseOperator): """ Start a Spark SQL query Job on a Cloud DataProc cluster. """ - template_fields = ['query', 'variables', 'job_name', 'dataproc_cluster'] + template_fields = ['query', 'variables', 'job_name', 'cluster_name'] template_ext = ('.q',) ui_color = '#0273d4' @@ -577,7 +577,7 @@ class DataProcSparkSqlOperator(BaseOperator): query_uri=None, variables=None, job_name='{{task.task_id}}_{{ds_nodash}}', - dataproc_cluster='cluster-1', + cluster_name='cluster-1', dataproc_spark_properties=None, dataproc_spark_jars=None, gcp_conn_id='google_cloud_default', @@ -597,8 +597,8 @@ class DataProcSparkSqlOperator(BaseOperator): is the task_id appended with the execution data, but can be templated. The name will always be appended with a random number to avoid name clashes. :type job_name: string - :param dataproc_cluster: The id of the DataProc cluster. - :type dataproc_cluster: string + :param cluster_name: The name of the DataProc cluster. + :type cluster_name: string :param dataproc_spark_properties: Map for the Pig properties. 
Ideal to put in default arguments :type dataproc_spark_properties: dict @@ -619,7 +619,7 @@ class DataProcSparkSqlOperator(BaseOperator): self.query_uri = query_uri self.variables = variables self.job_name = job_name - self.dataproc_cluster = dataproc_cluster + self.cluster_name = cluster_name self.dataproc_properties = dataproc_spark_properties self.dataproc_jars = dataproc_spark_jars @@ -627,7 +627,7 @@ class DataProcSparkSqlOperator(BaseOperator): hook = DataProcHook(gcp_conn_id=self.gcp_conn_id, delegate_to=self.delegate_to) - job = hook.create_job_template(self.task_id, self.dataproc_cluster, "sparkSqlJob", + job = hook.create_job_template(self.task_id, self.cluster_name, "sparkSqlJob", self.dataproc_properties) if self.query is None: @@ -646,7 +646,7 @@ class DataProcSparkOperator(BaseOperator): Start a Spark Job on a Cloud DataProc cluster. """ - template_fields = ['arguments', 'job_name', 'dataproc_cluster'] + template_fields = ['arguments', 'job_name', 'cluster_name'] ui_color = '#0273d4' @apply_defaults @@ -658,7 +658,7 @@ class DataProcSparkOperator(BaseOperator): archives=None, files=None, job_name='{{task.task_id}}_{{ds_nodash}}', - dataproc_cluster='cluster-1', + cluster_name='cluster-1', dataproc_spark_properties=None, dataproc_spark_jars=None, gcp_conn_id='google_cloud_default', @@ -685,8 +685,8 @@ class DataProcSparkOperator(BaseOperator): is the task_id appended with the execution data, but can be templated. The name will always be appended with a random number to avoid name clashes. :type job_name: string - :param dataproc_cluster: The id of the DataProc cluster. - :type dataproc_cluster: string + :param cluster_name: The name of the DataProc cluster. + :type cluster_name: string :param dataproc_spark_properties: Map for the Pig properties. 
Ideal to put in default arguments :type dataproc_spark_properties: dict @@ -709,14 +709,14 @@ class DataProcSparkOperator(BaseOperator): self.archives = archives self.files = files self.job_name = job_name - self.dataproc_cluster = dataproc_cluster + self.cluster_name = cluster_name self.dataproc_properties = dataproc_spark_properties self.dataproc_jars = dataproc_spark_jars def execute(self, context): hook = DataProcHook(gcp_conn_id=self.gcp_conn_id, delegate_to=self.delegate_to) - job = hook.create_job_template(self.task_id, self.dataproc_cluster, "sparkJob", + job = hook.create_job_template(self.task_id, self.cluster_name, "sparkJob", self.dataproc_properties) job.set_main(self.main_jar, self.main_class) @@ -734,7 +734,7 @@ class DataProcHadoopOperator(BaseOperator): Start a Hadoop Job on a Cloud DataProc cluster. """ - template_fields = ['arguments', 'job_name', 'dataproc_cluster'] + template_fields = ['arguments', 'job_name', 'cluster_name'] ui_color = '#0273d4' @apply_defaults @@ -746,7 +746,7 @@ class DataProcHadoopOperator(BaseOperator): archives=None, files=None, job_name='{{task.task_id}}_{{ds_nodash}}', - dataproc_cluster='cluster-1', + cluster_name='cluster-1', dataproc_hadoop_properties=None, dataproc_hadoop_jars=None, gcp_conn_id='google_cloud_default', @@ -773,8 +773,8 @@ class DataProcHadoopOperator(BaseOperator): is the task_id appended with the execution data, but can be templated. The name will always be appended with a random number to avoid name clashes. :type job_name: string - :param dataproc_cluster: The id of the DataProc cluster. - :type dataproc_cluster: string + :param cluster_name: The name of the DataProc cluster. + :type cluster_name: string :param dataproc_hadoop_properties: Map for the Pig properties. 
Ideal to put in default arguments :type dataproc_hadoop_properties: dict @@ -797,14 +797,14 @@ class DataProcHadoopOperator(BaseOperator): self.archives = archives self.files = files self.job_name = job_name - self.dataproc_cluster = dataproc_cluster + self.cluster_name = cluster_name self.dataproc_properties = dataproc_hadoop_properties self.dataproc_jars = dataproc_hadoop_jars def execute(self, context): hook = DataProcHook(gcp_conn_id=self.gcp_conn_id, delegate_to=self.delegate_to) - job = hook.create_job_template(self.task_id, self.dataproc_cluster, "hadoopJob", + job = hook.create_job_template(self.task_id, self.cluster_name, "hadoopJob", self.dataproc_properties) job.set_main(self.main_jar, self.main_class) @@ -822,7 +822,7 @@ class DataProcPySparkOperator(BaseOperator): Start a PySpark Job on a Cloud DataProc cluster. """ - template_fields = ['arguments', 'job_name', 'dataproc_cluster'] + template_fields = ['arguments', 'job_name', 'cluster_name'] ui_color = '#0273d4' @apply_defaults @@ -834,7 +834,7 @@ class DataProcPySparkOperator(BaseOperator): pyfiles=None, files=None, job_name='{{task.task_id}}_{{ds_nodash}}', - dataproc_cluster='cluster-1', + cluster_name='cluster-1', dataproc_pyspark_properties=None, dataproc_pyspark_jars=None, gcp_conn_id='google_cloud_default', @@ -861,8 +861,8 @@ class DataProcPySparkOperator(BaseOperator): is the task_id appended with the execution data, but can be templated. The name will always be appended with a random number to avoid name clashes. :type job_name: string - :param dataproc_cluster: The id of the DataProc cluster. - :type dataproc_cluster: string + :param cluster_name: The name of the DataProc cluster. + :type cluster_name: string :param dataproc_pyspark_properties: Map for the Pig properties. 
Ideal to put in default arguments :type dataproc_pyspark_properties: dict @@ -885,14 +885,14 @@ class DataProcPySparkOperator(BaseOperator): self.files = files self.pyfiles = pyfiles self.job_name = job_name - self.dataproc_cluster = dataproc_cluster + self.cluster_name = cluster_name self.dataproc_properties = dataproc_pyspark_properties self.dataproc_jars = dataproc_pyspark_jars def execute(self, context): hook = DataProcHook(gcp_conn_id=self.gcp_conn_id, delegate_to=self.delegate_to) - job = hook.create_job_template(self.task_id, self.dataproc_cluster, "pysparkJob", + job = hook.create_job_template(self.task_id, self.cluster_name, "pysparkJob", self.dataproc_properties) job.set_python_main(self.main)
