Repository: incubator-airflow

Updated Branches:
  refs/heads/master c5f663387 -> c65f403a5


[AIRFLOW-589] Add templatable job_name

The job name is the name that appears in the DataProc web console. It's
helpful to have a one-to-one mapping between the Airflow task and the job
running on the cluster. Adding a templated parameter lets you customize how
Airflow constructs the job name. The default is {{task_id}} + {{ds_nodash}}
+ a random hash.

Closes #1847 from alexvanboxel/feature/airflow-589-dataproc-templated-job-name
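For context (this usage sketch is not part of the patch): a minimal DAG wiring
up the new parameter. The DAG id, start date, and script path below are
hypothetical; 'cluster-1' is simply the operator's default:

    from datetime import datetime

    from airflow import DAG
    from airflow.contrib.operators.dataproc_operator import DataProcPigOperator

    dag = DAG('dataproc_pig_example', start_date=datetime(2016, 10, 1))

    # job_name is a template field: the rendered value becomes the DataProc
    # jobId, and the hook appends a random suffix to keep submissions unique.
    t1 = DataProcPigOperator(
        task_id='dataproc_pig',
        query='a_pig_script.pig',
        job_name='{{task.task_id}}_{{ds_nodash}}',
        dataproc_cluster='cluster-1',
        dag=dag)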
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/c65f403a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/c65f403a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/c65f403a

Branch: refs/heads/master
Commit: c65f403a532941136aa62fa978f18ba82ce0ae7d
Parents: c5f6633
Author: Alex Van Boxel <a...@vanboxel.be>
Authored: Mon Oct 24 07:46:47 2016 -0700
Committer: Chris Riccomini <chr...@wepay.com>
Committed: Mon Oct 24 07:46:53 2016 -0700

----------------------------------------------------------------------
 airflow/contrib/hooks/gcp_dataproc_hook.py     |   3 +
 airflow/contrib/operators/dataproc_operator.py | 106 ++++++++++++++------
 2 files changed, 77 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c65f403a/airflow/contrib/hooks/gcp_dataproc_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/gcp_dataproc_hook.py b/airflow/contrib/hooks/gcp_dataproc_hook.py
index fab71c5..e77c951 100644
--- a/airflow/contrib/hooks/gcp_dataproc_hook.py
+++ b/airflow/contrib/hooks/gcp_dataproc_hook.py
@@ -126,6 +126,9 @@ class _DataProcJobBuilder:
     def set_python_main(self, main):
         self.job["job"][self.job_type]["mainPythonFileUri"] = main
 
+    def set_job_name(self, name):
+        self.job["job"]["reference"]["jobId"] = name + "_" + str(uuid.uuid1())[:8]
+
     def build(self):
         return self.job
 
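To illustrate what set_job_name produces (an illustration, not part of the
patch; the helper name dataproc_job_id is hypothetical, while uuid is already
imported by the hook module):

    import uuid

    def dataproc_job_id(rendered_job_name):
        # Mirrors _DataProcJobBuilder.set_job_name: the rendered job_name gets
        # an 8-character slice of a uuid1 appended, so re-running the same
        # task never collides on the DataProc jobId.
        return rendered_job_name + "_" + str(uuid.uuid1())[:8]

    print(dataproc_job_id('dataproc_pig_20161024'))
    # e.g. dataproc_pig_20161024_24c41464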

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c65f403a/airflow/contrib/operators/dataproc_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/dataproc_operator.py b/airflow/contrib/operators/dataproc_operator.py
index 33e5f79..2955d26 100644
--- a/airflow/contrib/operators/dataproc_operator.py
+++ b/airflow/contrib/operators/dataproc_operator.py
@@ -43,11 +43,11 @@ class DataProcPigOperator(BaseOperator):
     t1 = DataProcPigOperator(
             task_id='dataproc_pig',
             query='a_pig_script.pig',
-            variables={'out': 'gs://example/output/{ds}'},
+            variables={'out': 'gs://example/output/{{ds}}'},
             dag=dag)
     ```
     """
-    template_fields = ['query', 'variables']
+    template_fields = ['query', 'variables', 'job_name', 'dataproc_cluster']
     template_ext = ('.pg', '.pig',)
     ui_color = '#0273d4'
 
@@ -56,6 +56,7 @@ class DataProcPigOperator(BaseOperator):
             self,
             query,
             variables=None,
+            job_name='{{task.task_id}}_{{ds_nodash}}',
             dataproc_cluster='cluster-1',
             dataproc_pig_properties=None,
             dataproc_pig_jars=None,
@@ -74,6 +75,10 @@ class DataProcPigOperator(BaseOperator):
         :type query: string
         :param variables: Map of named parameters for the query.
         :type variables: dict
+        :param job_name: The job name used in the DataProc cluster. This name by default
+            is the task_id appended with the execution date, but can be templated. The
+            name will always be appended with a random number to avoid name clashes.
+        :type job_name: string
         :param dataproc_cluster: The id of the DataProc cluster.
         :type dataproc_cluster: string
         :param dataproc_pig_properties: Map for the Pig properties. Ideal to put in
@@ -94,6 +99,7 @@ class DataProcPigOperator(BaseOperator):
         self.delegate_to = delegate_to
         self.query = query
         self.variables = variables
+        self.job_name = job_name
         self.dataproc_cluster = dataproc_cluster
         self.dataproc_properties = dataproc_pig_properties
         self.dataproc_jars = dataproc_pig_jars
@@ -107,6 +113,7 @@ class DataProcPigOperator(BaseOperator):
         job.add_query(self.query)
         job.add_variables(self.variables)
         job.add_jar_file_uris(self.dataproc_jars)
+        job.set_job_name(self.job_name)
 
         hook.submit(hook.project_id, job.build())
 
@@ -115,7 +122,7 @@ class DataProcHiveOperator(BaseOperator):
     """
     Start a Hive query Job on a Cloud DataProc cluster.
     """
-    template_fields = ['query', 'variables']
+    template_fields = ['query', 'variables', 'job_name', 'dataproc_cluster']
     template_ext = ('.q',)
     ui_color = '#0273d4'
 
@@ -124,6 +131,7 @@ class DataProcHiveOperator(BaseOperator):
             self,
             query,
             variables=None,
+            job_name='{{task.task_id}}_{{ds_nodash}}',
             dataproc_cluster='cluster-1',
             dataproc_hive_properties=None,
             dataproc_hive_jars=None,
@@ -138,6 +146,10 @@ class DataProcHiveOperator(BaseOperator):
         :type query: string
         :param variables: Map of named parameters for the query.
         :type variables: dict
+        :param job_name: The job name used in the DataProc cluster. This name by default
+            is the task_id appended with the execution date, but can be templated. The
+            name will always be appended with a random number to avoid name clashes.
+        :type job_name: string
         :param dataproc_cluster: The id of the DataProc cluster.
         :type dataproc_cluster: string
         :param dataproc_hive_properties: Map for the Pig properties. Ideal to put in
@@ -158,6 +170,7 @@ class DataProcHiveOperator(BaseOperator):
         self.delegate_to = delegate_to
         self.query = query
         self.variables = variables
+        self.job_name = job_name
         self.dataproc_cluster = dataproc_cluster
         self.dataproc_properties = dataproc_hive_properties
         self.dataproc_jars = dataproc_hive_jars
@@ -172,6 +185,7 @@ class DataProcHiveOperator(BaseOperator):
         job.add_query(self.query)
         job.add_variables(self.variables)
         job.add_jar_file_uris(self.dataproc_jars)
+        job.set_job_name(self.job_name)
 
         hook.submit(hook.project_id, job.build())
 
@@ -180,7 +194,7 @@ class DataProcSparkSqlOperator(BaseOperator):
     """
     Start a Spark SQL query Job on a Cloud DataProc cluster.
     """
-    template_fields = ['query', 'variables']
+    template_fields = ['query', 'variables', 'job_name', 'dataproc_cluster']
     template_ext = ('.q',)
     ui_color = '#0273d4'
 
@@ -189,6 +203,7 @@ class DataProcSparkSqlOperator(BaseOperator):
            self,
             query,
             variables=None,
+            job_name='{{task.task_id}}_{{ds_nodash}}',
             dataproc_cluster='cluster-1',
             dataproc_spark_properties=None,
             dataproc_spark_jars=None,
@@ -203,6 +218,10 @@ class DataProcSparkSqlOperator(BaseOperator):
         :type query: string
         :param variables: Map of named parameters for the query.
         :type variables: dict
+        :param job_name: The job name used in the DataProc cluster. This name by default
+            is the task_id appended with the execution date, but can be templated. The
+            name will always be appended with a random number to avoid name clashes.
+        :type job_name: string
         :param dataproc_cluster: The id of the DataProc cluster.
         :type dataproc_cluster: string
         :param dataproc_spark_properties: Map for the Pig properties. Ideal to put in
@@ -223,6 +242,7 @@ class DataProcSparkSqlOperator(BaseOperator):
         self.delegate_to = delegate_to
         self.query = query
         self.variables = variables
+        self.job_name = job_name
         self.dataproc_cluster = dataproc_cluster
         self.dataproc_properties = dataproc_spark_properties
         self.dataproc_jars = dataproc_spark_jars
@@ -237,6 +257,7 @@ class DataProcSparkSqlOperator(BaseOperator):
         job.add_query(self.query)
         job.add_variables(self.variables)
         job.add_jar_file_uris(self.dataproc_jars)
+        job.set_job_name(self.job_name)
 
         hook.submit(hook.project_id, job.build())
 
@@ -246,7 +267,7 @@ class DataProcSparkOperator(BaseOperator):
     Start a Spark Job on a Cloud DataProc cluster.
     """
 
-    template_fields = ['arguments']
+    template_fields = ['arguments', 'job_name', 'dataproc_cluster']
     ui_color = '#0273d4'
 
     @apply_defaults
@@ -257,6 +278,7 @@ class DataProcSparkOperator(BaseOperator):
             main,
             arguments=None,
             archives=None,
             files=None,
+            job_name='{{task.task_id}}_{{ds_nodash}}',
             dataproc_cluster='cluster-1',
             dataproc_spark_properties=None,
             dataproc_spark_jars=None,
@@ -280,6 +302,10 @@ class DataProcSparkOperator(BaseOperator):
         :type archives: list
         :param files: List of files to be copied to the working directory
         :type files: list
+        :param job_name: The job name used in the DataProc cluster. This name by default
+            is the task_id appended with the execution date, but can be templated. The
+            name will always be appended with a random number to avoid name clashes.
+        :type job_name: string
         :param dataproc_cluster: The id of the DataProc cluster.
         :type dataproc_cluster: string
         :param dataproc_spark_properties: Map for the Pig properties. Ideal to put in
@@ -303,6 +329,7 @@ class DataProcSparkOperator(BaseOperator):
         self.arguments = arguments
         self.archives = archives
         self.files = files
+        self.job_name = job_name
         self.dataproc_cluster = dataproc_cluster
         self.dataproc_properties = dataproc_spark_properties
         self.dataproc_jars = dataproc_spark_jars
@@ -318,6 +345,7 @@ class DataProcSparkOperator(BaseOperator):
         job.add_jar_file_uris(self.dataproc_jars)
         job.add_archive_uris(self.archives)
         job.add_file_uris(self.files)
+        job.set_job_name(self.job_name)
 
         hook.submit(hook.project_id, job.build())
 
@@ -327,7 +355,7 @@ class DataProcHadoopOperator(BaseOperator):
     Start a Hadoop Job on a Cloud DataProc cluster.
     """
 
-    template_fields = ['arguments']
+    template_fields = ['arguments', 'job_name', 'dataproc_cluster']
     ui_color = '#0273d4'
 
     @apply_defaults
@@ -338,6 +366,7 @@ class DataProcHadoopOperator(BaseOperator):
             main,
             arguments=None,
             archives=None,
             files=None,
+            job_name='{{task.task_id}}_{{ds_nodash}}',
             dataproc_cluster='cluster-1',
             dataproc_hadoop_properties=None,
             dataproc_hadoop_jars=None,
@@ -361,6 +390,10 @@ class DataProcHadoopOperator(BaseOperator):
         :type archives: list
         :param files: List of files to be copied to the working directory
         :type files: list
+        :param job_name: The job name used in the DataProc cluster. This name by default
+            is the task_id appended with the execution date, but can be templated. The
+            name will always be appended with a random number to avoid name clashes.
+        :type job_name: string
         :param dataproc_cluster: The id of the DataProc cluster.
         :type dataproc_cluster: string
         :param dataproc_hadoop_properties: Map for the Pig properties. Ideal to put in
@@ -384,6 +417,7 @@ class DataProcHadoopOperator(BaseOperator):
         self.arguments = arguments
         self.archives = archives
         self.files = files
+        self.job_name = job_name
         self.dataproc_cluster = dataproc_cluster
         self.dataproc_properties = dataproc_hadoop_properties
         self.dataproc_jars = dataproc_hadoop_jars
@@ -399,6 +433,7 @@ class DataProcHadoopOperator(BaseOperator):
         job.add_jar_file_uris(self.dataproc_jars)
         job.add_archive_uris(self.archives)
         job.add_file_uris(self.files)
+        job.set_job_name(self.job_name)
 
         hook.submit(hook.project_id, job.build())
 
@@ -408,7 +443,7 @@ class DataProcPySparkOperator(BaseOperator):
     Start a PySpark Job on a Cloud DataProc cluster.
     """
 
-    template_fields = ['arguments']
+    template_fields = ['arguments', 'job_name', 'dataproc_cluster']
     ui_color = '#0273d4'
 
     @apply_defaults
@@ -419,6 +454,7 @@ class DataProcPySparkOperator(BaseOperator):
             archives=None,
             pyfiles=None,
             files=None,
+            job_name='{{task.task_id}}_{{ds_nodash}}',
             dataproc_cluster='cluster-1',
             dataproc_pyspark_properties=None,
             dataproc_pyspark_jars=None,
@@ -427,35 +463,39 @@ class DataProcPySparkOperator(BaseOperator):
             *args,
             **kwargs):
         """
-        Create a new DataProcPySparkOperator.
+            Create a new DataProcPySparkOperator.
 
-        :param main: [Required] The Hadoop Compatible Filesystem (HCFS) URI of the main
+            :param main: [Required] The Hadoop Compatible Filesystem (HCFS) URI of the main
                 Python file to use as the driver. Must be a .py file.
-        :type main: string
-        :param arguments: Arguments for the job.
-        :type arguments: list
-        :param archives: List of archived files that will be unpacked in the work
+            :type main: string
+            :param arguments: Arguments for the job.
+            :type arguments: list
+            :param archives: List of archived files that will be unpacked in the work
                 directory. Should be stored in Cloud Storage.
-        :type archives: list
-        :param files: List of files to be copied to the working directory
-        :type files: list
-        :param pyfiles: List of Python files to pass to the PySpark framework.
+            :type archives: list
+            :param files: List of files to be copied to the working directory
+            :type files: list
+            :param pyfiles: List of Python files to pass to the PySpark framework.
                 Supported file types: .py, .egg, and .zip
-        :type pyfiles: list
-        :param dataproc_cluster: The id of the DataProc cluster.
-        :type dataproc_cluster: string
-        :param dataproc_pyspark_properties: Map for the Pig properties. Ideal to put in
-            default arguments
-        :type dataproc_pyspark_properties: dict
-        :param dataproc_pyspark_jars: URIs to jars provisioned in Cloud Storage (example:
-            for UDFs and libs) and are ideal to put in default arguments.
-        :type dataproc_pyspark_jars: list
-        :param gcp_conn_id: The connection ID to use connecting to Google Cloud Platform.
-        :type gcp_conn_id: string
-        :param delegate_to: The account to impersonate, if any.
-            For this to work, the service account making the request must have
-            domain-wide delegation enabled.
-        :type delegate_to: string
+            :type pyfiles: list
+            :param job_name: The job name used in the DataProc cluster. This name by default
+                is the task_id appended with the execution date, but can be templated. The
+                name will always be appended with a random number to avoid name clashes.
+            :type job_name: string
+            :param dataproc_cluster: The id of the DataProc cluster.
+            :type dataproc_cluster: string
+            :param dataproc_pyspark_properties: Map for the Pig properties. Ideal to put in
+                default arguments
+            :type dataproc_pyspark_properties: dict
+            :param dataproc_pyspark_jars: URIs to jars provisioned in Cloud Storage (example:
+                for UDFs and libs) and are ideal to put in default arguments.
+            :type dataproc_pyspark_jars: list
+            :param gcp_conn_id: The connection ID to use connecting to Google Cloud Platform.
+            :type gcp_conn_id: string
+            :param delegate_to: The account to impersonate, if any.
+                For this to work, the service account making the request must have
+                domain-wide delegation enabled.
+            :type delegate_to: string
         """
         super(DataProcPySparkOperator, self).__init__(*args, **kwargs)
         self.gcp_conn_id = gcp_conn_id
@@ -465,6 +505,7 @@ class DataProcPySparkOperator(BaseOperator):
         self.archives = archives
         self.files = files
         self.pyfiles = pyfiles
+        self.job_name = job_name
         self.dataproc_cluster = dataproc_cluster
         self.dataproc_properties = dataproc_pyspark_properties
         self.dataproc_jars = dataproc_pyspark_jars
@@ -481,5 +522,6 @@ class DataProcPySparkOperator(BaseOperator):
         job.add_archive_uris(self.archives)
         job.add_file_uris(self.files)
         job.add_python_file_uris(self.pyfiles)
+        job.set_job_name(self.job_name)
 
         hook.submit(hook.project_id, job.build())
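As an aside (not part of the patch): Airflow renders template_fields with
Jinja2, so the default job_name resolves from the task id and execution date
before the hook appends its suffix. A standalone sketch with a hypothetical
context object:

    from jinja2 import Template

    class _Task(object):
        task_id = 'dataproc_pig'  # hypothetical task

    context = {'task': _Task(), 'ds_nodash': '20161024'}
    rendered = Template('{{task.task_id}}_{{ds_nodash}}').render(**context)
    print(rendered)
    # dataproc_pig_20161024 -> submitted as dataproc_pig_20161024_<hash>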