Repository: incubator-airflow Updated Branches: refs/heads/master a2bb2d70a -> b48bbbd6f
[AIRFLOW-1997] Fix GCP operator doc strings Closes #2939 from kaxil/docstring_fix Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/b48bbbd6 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/b48bbbd6 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/b48bbbd6 Branch: refs/heads/master Commit: b48bbbd6f1879f45f50d130ab754a74346fdbf92 Parents: a2bb2d7 Author: Kaxil Naik <[email protected]> Authored: Fri Jan 12 08:59:40 2018 -0800 Committer: Chris Riccomini <[email protected]> Committed: Fri Jan 12 08:59:47 2018 -0800 ---------------------------------------------------------------------- airflow/contrib/operators/dataproc_operator.py | 548 ++++++++++---------- airflow/contrib/operators/gcs_to_bq.py | 191 ++++--- 2 files changed, 359 insertions(+), 380 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b48bbbd6/airflow/contrib/operators/dataproc_operator.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/operators/dataproc_operator.py b/airflow/contrib/operators/dataproc_operator.py index 9c1eb0f..3b10382 100644 --- a/airflow/contrib/operators/dataproc_operator.py +++ b/airflow/contrib/operators/dataproc_operator.py @@ -33,9 +33,66 @@ class DataprocClusterCreateOperator(BaseOperator): for a detailed explanation on the different parameters. Most of the configuration parameters detailed in the link are available as a parameter to this operator. + + :param cluster_name: The name of the DataProc cluster to create. 
+ :type cluster_name: string + :param project_id: The ID of the google cloud project in which + to create the cluster + :type project_id: string + :param num_workers: The # of workers to spin up + :type num_workers: int + :param storage_bucket: The storage bucket to use, setting to None lets dataproc + generate a custom one for you + :type storage_bucket: string + :param init_actions_uris: List of GCS uri's containing + dataproc initialization scripts + :type init_actions_uris: list[string] + :param metadata: dict of key-value google compute engine metadata entries + to add to all instances + :type metadata: dict + :param image_version: the version of software inside the Dataproc cluster + :type image_version: string + :param properties: dict of properties to set on + config files (e.g. spark-defaults.conf), see + https://cloud.google.com/dataproc/docs/reference/rest/v1/ \ + projects.regions.clusters#SoftwareConfig + :type properties: dict + :param master_machine_type: Compute engine machine type to use for the master node + :type master_machine_type: string + :param master_disk_size: Disk size for the master node + :type int + :param worker_machine_type:Compute engine machine type to use for the worker nodes + :type worker_machine_type: string + :param worker_disk_size: Disk size for the worker nodes + :type worker_disk_size: int + :param num_preemptible_workers: The # of preemptible worker nodes to spin up + :type num_preemptible_workers: int + :param labels: dict of labels to add to the cluster + :type labels: dict + :param zone: The zone where the cluster will be located + :type zone: string + :param network_uri: The network uri to be used for machine communication, cannot be + specified with subnetwork_uri + :type network_uri: string + :param subnetwork_uri: The subnetwork uri to be used for machine communication, + cannot be specified with network_uri + :type subnetwork_uri: string + :param tags: The GCE tags to add to all instances + :type tags: list[string] 
+ :param region: leave as 'global', might become relevant in the future + :param gcp_conn_id: The connection ID to use connecting to Google Cloud Platform. + :type gcp_conn_id: string + :param delegate_to: The account to impersonate, if any. + For this to work, the service account making the request must have domain-wide + delegation enabled. + :type delegate_to: string + :param service_account: The service account of the dataproc instances. + :type service_account: string + :param service_account_scopes: The URIs of service account scopes to be included. + :type service_account_scopes: list[string] """ - template_fields = ['cluster_name',] + template_fields = ['cluster_name', ] @apply_defaults def __init__(self, @@ -64,70 +121,7 @@ class DataprocClusterCreateOperator(BaseOperator): service_account_scopes=None, *args, **kwargs): - """ - Create a new DataprocClusterCreateOperator. - - For more info on the creation of a cluster through the API, have a look at: - - https://cloud.google.com/dataproc/docs/reference/rest/v1/projects.regions.clusters - - :param cluster_name: The name of the DataProc cluster to create. - :type cluster_name: string - :param project_id: The ID of the google cloud project in which - to create the cluster - :type project_id: string - :param num_workers: The # of workers to spin up - :type num_workers: int - :param storage_bucket: The storage bucket to use, setting to None lets dataproc - generate a custom one for you - :type storage_bucket: string - :param init_actions_uris: List of GCS uri's containing - dataproc initialization scripts - :type init_actions_uris: list[string] - :param metadata: dict of key-value google compute engine metadata entries - to add to all instances - :type metadata: dict - :param image_version: the version of software inside the Dataproc cluster - :type image_version: string - :param properties: dict of properties to set on - config files (e.g. 
spark-defaults.conf), see - https://cloud.google.com/dataproc/docs/reference/rest/v1/ \ - projects.regions.clusters#SoftwareConfig - :type properties: dict - :param master_machine_type: Compute engine machine type to use for the master node - :type master_machine_type: string - :param master_disk_size: Disk size for the master node - :type int - :param worker_machine_type:Compute engine machine type to use for the worker nodes - :type worker_machine_type: string - :param worker_disk_size: Disk size for the worker nodes - :type worker_disk_size: int - :param num_preemptible_workers: The # of preemptible worker nodes to spin up - :type num_preemptible_workers: int - :param labels: dict of labels to add to the cluster - :type labels: dict - :param zone: The zone where the cluster will be located - :type zone: string - :param network_uri: The network uri to be used for machine communication, cannot be - specified with subnetwork_uri - :type network_uri: string - :param subnetwork_uri: The subnetwork uri to be used for machine communication, cannot be - specified with network_uri - :type subnetwork_uri: string - :param tags: The GCE tags to add to all instances - :type tags: list[string] - :param region: leave as 'global', might become relevant in the future - :param gcp_conn_id: The connection ID to use connecting to Google Cloud Platform. - :type gcp_conn_id: string - :param delegate_to: The account to impersonate, if any. - For this to work, the service account making the request must have domain-wide - delegation enabled. - :type delegate_to: string - :param service_account: The service account of the dataproc instances. - :type service_account: string - :param service_account_scopes: The URIs of service account scopes to be included. 
- :type service_account_scopes: list[string] - """ + super(DataprocClusterCreateOperator, self).__init__(*args, **kwargs) self.gcp_conn_id = gcp_conn_id self.delegate_to = delegate_to @@ -326,6 +320,20 @@ class DataprocClusterDeleteOperator(BaseOperator): """ Delete a cluster on Google Cloud Dataproc. The operator will wait until the cluster is destroyed. + + :param cluster_name: The name of the cluster to create. + :type cluster_name: string + :param project_id: The ID of the google cloud project in which + the cluster runs + :type project_id: string + :param region: leave as 'global', might become relevant in the future + :type region: string + :param gcp_conn_id: The connection ID to use connecting to Google Cloud Platform. + :type gcp_conn_id: string + :param delegate_to: The account to impersonate, if any. + For this to work, the service account making the request must have domain-wide + delegation enabled. + :type delegate_to: string """ template_fields = ['cluster_name'] @@ -339,23 +347,7 @@ class DataprocClusterDeleteOperator(BaseOperator): delegate_to=None, *args, **kwargs): - """ - Delete a cluster on Google Cloud Dataproc. - - :param cluster_name: The name of the cluster to create. - :type cluster_name: string - :param project_id: The ID of the google cloud project in which - the cluster runs - :type project_id: string - :param region: leave as 'global', might become relevant in the future - :type region: string - :param gcp_conn_id: The connection ID to use connecting to Google Cloud Platform. - :type gcp_conn_id: string - :param delegate_to: The account to impersonate, if any. - For this to work, the service account making the request must have domain-wide - delegation enabled. 
- :type delegate_to: string - """ + super(DataprocClusterDeleteOperator, self).__init__(*args, **kwargs) self.gcp_conn_id = gcp_conn_id self.delegate_to = delegate_to @@ -424,6 +416,37 @@ class DataProcPigOperator(BaseOperator): variables={'out': 'gs://example/output/{{ds}}'}, dag=dag) ``` + + For more detail on about job submission have a look at the reference: + + https://cloud.google.com/dataproc/reference/rest/v1/projects.regions.jobs + + :param query: The query or reference to the query file (pg or pig extension). + :type query: string + :param query_uri: The uri of a pig script on Cloud Storage. + :type query_uri: string + :param variables: Map of named parameters for the query. + :type variables: dict + :param job_name: The job name used in the DataProc cluster. This name by default + is the task_id appended with the execution data, but can be templated. The + name will always be appended with a random number to avoid name clashes. + :type job_name: string + :param cluster_name: The name of the DataProc cluster. + :type cluster_name: string + :param dataproc_pig_properties: Map for the Pig properties. Ideal to put in + default arguments + :type dataproc_pig_properties: dict + :param dataproc_pig_jars: URIs to jars provisioned in Cloud Storage (example: for + UDFs and libs) and are ideal to put in default arguments. + :type dataproc_pig_jars: list + :param gcp_conn_id: The connection ID to use connecting to Google Cloud Platform. + :type gcp_conn_id: string + :param delegate_to: The account to impersonate, if any. + For this to work, the service account making the request must have domain-wide + delegation enabled. + :type delegate_to: string + :param region: The specified region where the dataproc cluster is created. 
+ :type region: string """ template_fields = ['query', 'variables', 'job_name', 'cluster_name'] template_ext = ('.pg', '.pig',) @@ -444,40 +467,7 @@ class DataProcPigOperator(BaseOperator): region='global', *args, **kwargs): - """ - Create a new DataProcPigOperator. - - For more detail on about job submission have a look at the reference: - - https://cloud.google.com/dataproc/reference/rest/v1/projects.regions.jobs - - :param query: The query or reference to the query file (pg or pig extension). - :type query: string - :param query_uri: The uri of a pig script on Cloud Storage. - :type query_uri: string - :param variables: Map of named parameters for the query. - :type variables: dict - :param job_name: The job name used in the DataProc cluster. This name by default - is the task_id appended with the execution data, but can be templated. The - name will always be appended with a random number to avoid name clashes. - :type job_name: string - :param cluster_name: The name of the DataProc cluster. - :type cluster_name: string - :param dataproc_pig_properties: Map for the Pig properties. Ideal to put in - default arguments - :type dataproc_pig_properties: dict - :param dataproc_pig_jars: URIs to jars provisioned in Cloud Storage (example: for - UDFs and libs) and are ideal to put in default arguments. - :type dataproc_pig_jars: list - :param gcp_conn_id: The connection ID to use connecting to Google Cloud Platform. - :type gcp_conn_id: string - :param delegate_to: The account to impersonate, if any. - For this to work, the service account making the request must have domain-wide - delegation enabled. - :type delegate_to: string - :param region: The specified region where the dataproc cluster is created. 
- :type region: string - """ + super(DataProcPigOperator, self).__init__(*args, **kwargs) self.gcp_conn_id = gcp_conn_id self.delegate_to = delegate_to @@ -510,6 +500,33 @@ class DataProcPigOperator(BaseOperator): class DataProcHiveOperator(BaseOperator): """ Start a Hive query Job on a Cloud DataProc cluster. + + :param query: The query or reference to the query file (q extension). + :type query: string + :param query_uri: The uri of a hive script on Cloud Storage. + :type query_uri: string + :param variables: Map of named parameters for the query. + :type variables: dict + :param job_name: The job name used in the DataProc cluster. This name by default + is the task_id appended with the execution data, but can be templated. The + name will always be appended with a random number to avoid name clashes. + :type job_name: string + :param cluster_name: The name of the DataProc cluster. + :type cluster_name: string + :param dataproc_hive_properties: Map for the Pig properties. Ideal to put in + default arguments + :type dataproc_hive_properties: dict + :param dataproc_hive_jars: URIs to jars provisioned in Cloud Storage (example: for + UDFs and libs) and are ideal to put in default arguments. + :type dataproc_hive_jars: list + :param gcp_conn_id: The connection ID to use connecting to Google Cloud Platform. + :type gcp_conn_id: string + :param delegate_to: The account to impersonate, if any. + For this to work, the service account making the request must have domain-wide + delegation enabled. + :type delegate_to: string + :param region: The specified region where the dataproc cluster is created. + :type region: string """ template_fields = ['query', 'variables', 'job_name', 'cluster_name'] template_ext = ('.q',) @@ -530,36 +547,7 @@ class DataProcHiveOperator(BaseOperator): region='global', *args, **kwargs): - """ - Create a new DataProcHiveOperator. - - :param query: The query or reference to the query file (q extension). 
- :type query: string - :param query_uri: The uri of a hive script on Cloud Storage. - :type query_uri: string - :param variables: Map of named parameters for the query. - :type variables: dict - :param job_name: The job name used in the DataProc cluster. This name by default - is the task_id appended with the execution data, but can be templated. The - name will always be appended with a random number to avoid name clashes. - :type job_name: string - :param cluster_name: The name of the DataProc cluster. - :type cluster_name: string - :param dataproc_hive_properties: Map for the Pig properties. Ideal to put in - default arguments - :type dataproc_hive_properties: dict - :param dataproc_hive_jars: URIs to jars provisioned in Cloud Storage (example: for - UDFs and libs) and are ideal to put in default arguments. - :type dataproc_hive_jars: list - :param gcp_conn_id: The connection ID to use connecting to Google Cloud Platform. - :type gcp_conn_id: string - :param delegate_to: The account to impersonate, if any. - For this to work, the service account making the request must have domain-wide - delegation enabled. - :type delegate_to: string - :param region: The specified region where the dataproc cluster is created. - :type region: string - """ + super(DataProcHiveOperator, self).__init__(*args, **kwargs) self.gcp_conn_id = gcp_conn_id self.delegate_to = delegate_to @@ -593,6 +581,33 @@ class DataProcHiveOperator(BaseOperator): class DataProcSparkSqlOperator(BaseOperator): """ Start a Spark SQL query Job on a Cloud DataProc cluster. + + :param query: The query or reference to the query file (q extension). + :type query: string + :param query_uri: The uri of a spark sql script on Cloud Storage. + :type query_uri: string + :param variables: Map of named parameters for the query. + :type variables: dict + :param job_name: The job name used in the DataProc cluster. This name by default + is the task_id appended with the execution data, but can be templated. 
The + name will always be appended with a random number to avoid name clashes. + :type job_name: string + :param cluster_name: The name of the DataProc cluster. + :type cluster_name: string + :param dataproc_spark_properties: Map for the Pig properties. Ideal to put in + default arguments + :type dataproc_spark_properties: dict + :param dataproc_spark_jars: URIs to jars provisioned in Cloud Storage (example: + for UDFs and libs) and are ideal to put in default arguments. + :type dataproc_spark_jars: list + :param gcp_conn_id: The connection ID to use connecting to Google Cloud Platform. + :type gcp_conn_id: string + :param delegate_to: The account to impersonate, if any. + For this to work, the service account making the request must have domain-wide + delegation enabled. + :type delegate_to: string + :param region: The specified region where the dataproc cluster is created. + :type region: string """ template_fields = ['query', 'variables', 'job_name', 'cluster_name'] template_ext = ('.q',) @@ -613,36 +628,7 @@ class DataProcSparkSqlOperator(BaseOperator): region='global', *args, **kwargs): - """ - Create a new DataProcSparkSqlOperator. - - :param query: The query or reference to the query file (q extension). - :type query: string - :param query_uri: The uri of a spark sql script on Cloud Storage. - :type query_uri: string - :param variables: Map of named parameters for the query. - :type variables: dict - :param job_name: The job name used in the DataProc cluster. This name by default - is the task_id appended with the execution data, but can be templated. The - name will always be appended with a random number to avoid name clashes. - :type job_name: string - :param cluster_name: The name of the DataProc cluster. - :type cluster_name: string - :param dataproc_spark_properties: Map for the Pig properties. 
Ideal to put in - default arguments - :type dataproc_spark_properties: dict - :param dataproc_spark_jars: URIs to jars provisioned in Cloud Storage (example: - for UDFs and libs) and are ideal to put in default arguments. - :type dataproc_spark_jars: list - :param gcp_conn_id: The connection ID to use connecting to Google Cloud Platform. - :type gcp_conn_id: string - :param delegate_to: The account to impersonate, if any. - For this to work, the service account making the request must have domain-wide - delegation enabled. - :type delegate_to: string - :param region: The specified region where the dataproc cluster is created. - :type region: string - """ + super(DataProcSparkSqlOperator, self).__init__(*args, **kwargs) self.gcp_conn_id = gcp_conn_id self.delegate_to = delegate_to @@ -676,6 +662,40 @@ class DataProcSparkSqlOperator(BaseOperator): class DataProcSparkOperator(BaseOperator): """ Start a Spark Job on a Cloud DataProc cluster. + + :param main_jar: URI of the job jar provisioned on Cloud Storage. (use this or + the main_class, not both together). + :type main_jar: string + :param main_class: Name of the job class. (use this or the main_jar, not both + together). + :type main_class: string + :param arguments: Arguments for the job. + :type arguments: list + :param archives: List of archived files that will be unpacked in the work + directory. Should be stored in Cloud Storage. + :type archives: list + :param files: List of files to be copied to the working directory + :type files: list + :param job_name: The job name used in the DataProc cluster. This name by default + is the task_id appended with the execution data, but can be templated. The + name will always be appended with a random number to avoid name clashes. + :type job_name: string + :param cluster_name: The name of the DataProc cluster. + :type cluster_name: string + :param dataproc_spark_properties: Map for the Pig properties. 
Ideal to put in + default arguments + :type dataproc_spark_properties: dict + :param dataproc_spark_jars: URIs to jars provisioned in Cloud Storage (example: + for UDFs and libs) and are ideal to put in default arguments. + :type dataproc_spark_jars: list + :param gcp_conn_id: The connection ID to use connecting to Google Cloud Platform. + :type gcp_conn_id: string + :param delegate_to: The account to impersonate, if any. + For this to work, the service account making the request must have domain-wide + delegation enabled. + :type delegate_to: string + :param region: The specified region where the dataproc cluster is created. + :type region: string """ template_fields = ['arguments', 'job_name', 'cluster_name'] @@ -698,43 +718,7 @@ class DataProcSparkOperator(BaseOperator): region='global', *args, **kwargs): - """ - Create a new DataProcSparkOperator. - :param main_jar: URI of the job jar provisioned on Cloud Storage. (use this or - the main_class, not both together). - :type main_jar: string - :param main_class: Name of the job class. (use this or the main_jar, not both - together). - :type main_class: string - :param arguments: Arguments for the job. - :type arguments: list - :param archives: List of archived files that will be unpacked in the work - directory. Should be stored in Cloud Storage. - :type archives: list - :param files: List of files to be copied to the working directory - :type files: list - :param job_name: The job name used in the DataProc cluster. This name by default - is the task_id appended with the execution data, but can be templated. The - name will always be appended with a random number to avoid name clashes. - :type job_name: string - :param cluster_name: The name of the DataProc cluster. - :type cluster_name: string - :param dataproc_spark_properties: Map for the Pig properties. 
Ideal to put in - default arguments - :type dataproc_spark_properties: dict - :param dataproc_spark_jars: URIs to jars provisioned in Cloud Storage (example: - for UDFs and libs) and are ideal to put in default arguments. - :type dataproc_spark_jars: list - :param gcp_conn_id: The connection ID to use connecting to Google Cloud Platform. - :type gcp_conn_id: string - :param delegate_to: The account to impersonate, if any. - For this to work, the service account making the request must have domain-wide - delegation enabled. - :type delegate_to: string - :param region: The specified region where the dataproc cluster is created. - :type region: string - """ super(DataProcSparkOperator, self).__init__(*args, **kwargs) self.gcp_conn_id = gcp_conn_id self.delegate_to = delegate_to @@ -768,6 +752,40 @@ class DataProcSparkOperator(BaseOperator): class DataProcHadoopOperator(BaseOperator): """ Start a Hadoop Job on a Cloud DataProc cluster. + + :param main_jar: URI of the job jar provisioned on Cloud Storage. (use this or + the main_class, not both together). + :type main_jar: string + :param main_class: Name of the job class. (use this or the main_jar, not both + together). + :type main_class: string + :param arguments: Arguments for the job. + :type arguments: list + :param archives: List of archived files that will be unpacked in the work + directory. Should be stored in Cloud Storage. + :type archives: list + :param files: List of files to be copied to the working directory + :type files: list + :param job_name: The job name used in the DataProc cluster. This name by default + is the task_id appended with the execution data, but can be templated. The + name will always be appended with a random number to avoid name clashes. + :type job_name: string + :param cluster_name: The name of the DataProc cluster. + :type cluster_name: string + :param dataproc_hadoop_properties: Map for the Pig properties. 
Ideal to put in + default arguments + :type dataproc_hadoop_properties: dict + :param dataproc_hadoop_jars: URIs to jars provisioned in Cloud Storage (example: + for UDFs and libs) and are ideal to put in default arguments. + :type dataproc_hadoop_jars: list + :param gcp_conn_id: The connection ID to use connecting to Google Cloud Platform. + :type gcp_conn_id: string + :param delegate_to: The account to impersonate, if any. + For this to work, the service account making the request must have domain-wide + delegation enabled. + :type delegate_to: string + :param region: The specified region where the dataproc cluster is created. + :type region: string """ template_fields = ['arguments', 'job_name', 'cluster_name'] @@ -790,43 +808,7 @@ class DataProcHadoopOperator(BaseOperator): region='global', *args, **kwargs): - """ - Create a new DataProcHadoopOperator. - :param main_jar: URI of the job jar provisioned on Cloud Storage. (use this or - the main_class, not both together). - :type main_jar: string - :param main_class: Name of the job class. (use this or the main_jar, not both - together). - :type main_class: string - :param arguments: Arguments for the job. - :type arguments: list - :param archives: List of archived files that will be unpacked in the work - directory. Should be stored in Cloud Storage. - :type archives: list - :param files: List of files to be copied to the working directory - :type files: list - :param job_name: The job name used in the DataProc cluster. This name by default - is the task_id appended with the execution data, but can be templated. The - name will always be appended with a random number to avoid name clashes. - :type job_name: string - :param cluster_name: The name of the DataProc cluster. - :type cluster_name: string - :param dataproc_hadoop_properties: Map for the Pig properties. 
Ideal to put in - default arguments - :type dataproc_hadoop_properties: dict - :param dataproc_hadoop_jars: URIs to jars provisioned in Cloud Storage (example: - for UDFs and libs) and are ideal to put in default arguments. - :type dataproc_hadoop_jars: list - :param gcp_conn_id: The connection ID to use connecting to Google Cloud Platform. - :type gcp_conn_id: string - :param delegate_to: The account to impersonate, if any. - For this to work, the service account making the request must have domain-wide - delegation enabled. - :type delegate_to: string - :param region: The specified region where the dataproc cluster is created. - :type region: string - """ super(DataProcHadoopOperator, self).__init__(*args, **kwargs) self.gcp_conn_id = gcp_conn_id self.delegate_to = delegate_to @@ -860,6 +842,40 @@ class DataProcHadoopOperator(BaseOperator): class DataProcPySparkOperator(BaseOperator): """ Start a PySpark Job on a Cloud DataProc cluster. + + :param main: [Required] The Hadoop Compatible Filesystem (HCFS) URI of the main + Python file to use as the driver. Must be a .py file. + :type main: string + :param arguments: Arguments for the job. + :type arguments: list + :param archives: List of archived files that will be unpacked in the work + directory. Should be stored in Cloud Storage. + :type archives: list + :param files: List of files to be copied to the working directory + :type files: list + :param pyfiles: List of Python files to pass to the PySpark framework. + Supported file types: .py, .egg, and .zip + :type pyfiles: list + :param job_name: The job name used in the DataProc cluster. This name by default + is the task_id appended with the execution data, but can be templated. The + name will always be appended with a random number to avoid name clashes. + :type job_name: string + :param cluster_name: The name of the DataProc cluster. + :type cluster_name: string + :param dataproc_pyspark_properties: Map for the Pig properties. 
Ideal to put in + default arguments + :type dataproc_pyspark_properties: dict + :param dataproc_pyspark_jars: URIs to jars provisioned in Cloud Storage (example: + for UDFs and libs) and are ideal to put in default arguments. + :type dataproc_pyspark_jars: list + :param gcp_conn_id: The connection ID to use connecting to Google Cloud Platform. + :type gcp_conn_id: string + :param delegate_to: The account to impersonate, if any. + For this to work, the service account making the request must have + domain-wide delegation enabled. + :type delegate_to: string + :param region: The specified region where the dataproc cluster is created. + :type region: string """ template_fields = ['arguments', 'job_name', 'cluster_name'] @@ -882,43 +898,7 @@ class DataProcPySparkOperator(BaseOperator): region='global', *args, **kwargs): - """ - Create a new DataProcPySparkOperator. - :param main: [Required] The Hadoop Compatible Filesystem (HCFS) URI of the main - Python file to use as the driver. Must be a .py file. - :type main: string - :param arguments: Arguments for the job. - :type arguments: list - :param archives: List of archived files that will be unpacked in the work - directory. Should be stored in Cloud Storage. - :type archives: list - :param files: List of files to be copied to the working directory - :type files: list - :param pyfiles: List of Python files to pass to the PySpark framework. - Supported file types: .py, .egg, and .zip - :type pyfiles: list - :param job_name: The job name used in the DataProc cluster. This name by default - is the task_id appended with the execution data, but can be templated. The - name will always be appended with a random number to avoid name clashes. - :type job_name: string - :param cluster_name: The name of the DataProc cluster. - :type cluster_name: string - :param dataproc_pyspark_properties: Map for the Pig properties. 
Ideal to put in - default arguments - :type dataproc_pyspark_properties: dict - :param dataproc_pyspark_jars: URIs to jars provisioned in Cloud Storage (example: - for UDFs and libs) and are ideal to put in default arguments. - :type dataproc_pyspark_jars: list - :param gcp_conn_id: The connection ID to use connecting to Google Cloud Platform. - :type gcp_conn_id: string - :param delegate_to: The account to impersonate, if any. - For this to work, the service account making the request must have - domain-wide delegation enabled. - :type delegate_to: string - :param region: The specified region where the dataproc cluster is created. - :type region: string - """ super(DataProcPySparkOperator, self).__init__(*args, **kwargs) self.gcp_conn_id = gcp_conn_id self.delegate_to = delegate_to http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b48bbbd6/airflow/contrib/operators/gcs_to_bq.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/operators/gcs_to_bq.py b/airflow/contrib/operators/gcs_to_bq.py index 75302b6..7625bbe 100644 --- a/airflow/contrib/operators/gcs_to_bq.py +++ b/airflow/contrib/operators/gcs_to_bq.py @@ -23,6 +23,77 @@ from airflow.utils.decorators import apply_defaults class GoogleCloudStorageToBigQueryOperator(BaseOperator): """ Loads files from Google cloud storage into BigQuery. + + The schema to be used for the BigQuery table may be specified in one of + two ways. You may either directly pass the schema fields in, or you may + point the operator to a Google cloud storage object name. The object in + Google cloud storage must be a JSON file with the schema fields in it. + + :param bucket: The bucket to load from. + :type bucket: string + :param source_objects: List of Google cloud storage URIs to load from. + If source_format is 'DATASTORE_BACKUP', the list must only contain a single URI. 
+ :type object: list + :param destination_project_dataset_table: The dotted (<project>.)<dataset>.<table> + BigQuery table to load data into. If <project> is not included, project will + be the project defined in the connection json. + :type destination_project_dataset_table: string + :param schema_fields: If set, the schema field list as defined here: + https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.load + Should not be set when source_format is 'DATASTORE_BACKUP'. + :type schema_fields: list + :param schema_object: If set, a GCS object path pointing to a .json file that + contains the schema for the table. + :param schema_object: string + :param source_format: File format to export. + :type source_format: string + :param create_disposition: The create disposition if the table doesn't exist. + :type create_disposition: string + :param skip_leading_rows: Number of rows to skip when loading from a CSV. + :type skip_leading_rows: int + :param write_disposition: The write disposition if the table already exists. + :type write_disposition: string + :param field_delimiter: The delimiter to use when loading from a CSV. + :type field_delimiter: string + :param max_bad_records: The maximum number of bad records that BigQuery can + ignore when running the job. + :type max_bad_records: int + :param quote_character: The value that is used to quote data sections in a CSV file. + :type quote_character: string + :param allow_quoted_newlines: Whether to allow quoted newlines (true) or not (false). + :type allow_quoted_newlines: boolean + :param allow_jagged_rows: Accept rows that are missing trailing optional columns. + The missing values are treated as nulls. If false, records with missing trailing + columns are treated as bad records, and if there are too many bad records, an + invalid error is returned in the job result. Only applicable to CSV, ignored + for other formats. 
+ :type allow_jagged_rows: bool + :param max_id_key: If set, the name of a column in the BigQuery table + that's to be loaded. This will be used to select the MAX value from + BigQuery after the load occurs. The results will be returned by the + execute() command, which in turn gets stored in XCom for future + operators to use. This can be helpful with incremental loads--during + future executions, you can pick up from the max ID. + :type max_id_key: string + :param bigquery_conn_id: Reference to a specific BigQuery hook. + :type bigquery_conn_id: string + :param google_cloud_storage_conn_id: Reference to a specific Google + cloud storage hook. + :type google_cloud_storage_conn_id: string + :param delegate_to: The account to impersonate, if any. For this to + work, the service account making the request must have domain-wide + delegation enabled. + :type delegate_to: string + :param schema_update_options: Allows the schema of the destination + table to be updated as a side effect of the load job. + :type schema_update_options: list + :param src_fmt_configs: configure optional fields specific to the source format + :type src_fmt_configs: dict + :param time_partitioning: configure optional time partitioning fields i.e. + partition by field, type and expiration as per API specifications. + Note that 'field' is not available in concurrency with + dataset.table$partition.
+ :type time_partitioning: dict """ template_fields = ('bucket', 'source_objects', 'schema_object', 'destination_project_dataset_table') @@ -30,102 +101,30 @@ class GoogleCloudStorageToBigQueryOperator(BaseOperator): ui_color = '#f0eee4' @apply_defaults - def __init__( - self, - bucket, - source_objects, - destination_project_dataset_table, - schema_fields=None, - schema_object=None, - source_format='CSV', - create_disposition='CREATE_IF_NEEDED', - skip_leading_rows=0, - write_disposition='WRITE_EMPTY', - field_delimiter=',', - max_bad_records=0, - quote_character=None, - allow_quoted_newlines=False, - allow_jagged_rows=False, - max_id_key=None, - bigquery_conn_id='bigquery_default', - google_cloud_storage_conn_id='google_cloud_storage_default', - delegate_to=None, - schema_update_options=(), - src_fmt_configs={}, - time_partitioning={}, - *args, - **kwargs): - """ - The schema to be used for the BigQuery table may be specified in one of - two ways. You may either directly pass the schema fields in, or you may - point the operator to a Google cloud storage object name. The object in - Google cloud storage must be a JSON file with the schema fields in it. - - :param bucket: The bucket to load from. - :type bucket: string - :param source_objects: List of Google cloud storage URIs to load from. - If source_format is 'DATASTORE_BACKUP', the list must only contain a single URI. - :type object: list - :param destination_project_dataset_table: The dotted (<project>.)<dataset>.<table> - BigQuery table to load data into. If <project> is not included, project will - be the project defined in the connection json. - :type destination_project_dataset_table: string - :param schema_fields: If set, the schema field list as defined here: - https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.load - Should not be set when source_format is 'DATASTORE_BACKUP'. 
- :type schema_fields: list - :param schema_object: If set, a GCS object path pointing to a .json file that - contains the schema for the table. - :param schema_object: string - :param source_format: File format to export. - :type source_format: string - :param create_disposition: The create disposition if the table doesn't exist. - :type create_disposition: string - :param skip_leading_rows: Number of rows to skip when loading from a CSV. - :type skip_leading_rows: int - :param write_disposition: The write disposition if the table already exists. - :type write_disposition: string - :param field_delimiter: The delimiter to use when loading from a CSV. - :type field_delimiter: string - :param max_bad_records: The maximum number of bad records that BigQuery can - ignore when running the job. - :type max_bad_records: int - :param quote_character: The value that is used to quote data sections in a CSV file. - :type quote_character: string - :param allow_quoted_newlines: Whether to allow quoted newlines (true) or not (false). - :type allow_quoted_newlines: boolean - :param allow_jagged_rows: Accept rows that are missing trailing optional columns. - The missing values are treated as nulls. If false, records with missing trailing columns - are treated as bad records, and if there are too many bad records, an invalid error is - returned in the job result. Only applicable to CSV, ignored for other formats. - :type allow_jagged_rows: bool - :param max_id_key: If set, the name of a column in the BigQuery table - that's to be loaded. Thsi will be used to select the MAX value from - BigQuery after the load occurs. The results will be returned by the - execute() command, which in turn gets stored in XCom for future - operators to use. This can be helpful with incremental loads--during - future executions, you can pick up from the max ID. - :type max_id_key: string - :param bigquery_conn_id: Reference to a specific BigQuery hook. 
- :type bigquery_conn_id: string - :param google_cloud_storage_conn_id: Reference to a specific Google - cloud storage hook. - :type google_cloud_storage_conn_id: string - :param delegate_to: The account to impersonate, if any. For this to - work, the service account making the request must have domain-wide - delegation enabled. - :type delegate_to: string - :param schema_update_options: Allows the schema of the desitination - table to be updated as a side effect of the load job. - :type schema_update_options: list - :param src_fmt_configs: configure optional fields specific to the source format - :type src_fmt_configs: dict - :param time_partitioning: configure optional time partitioning fields i.e. - partition by field, type and expiration as per API specifications. - Note that 'field' is not available in concurrency with - dataset.table$partition. - :type time_partitioning: dict - """ + def __init__(self, + bucket, + source_objects, + destination_project_dataset_table, + schema_fields=None, + schema_object=None, + source_format='CSV', + create_disposition='CREATE_IF_NEEDED', + skip_leading_rows=0, + write_disposition='WRITE_EMPTY', + field_delimiter=',', + max_bad_records=0, + quote_character=None, + allow_quoted_newlines=False, + allow_jagged_rows=False, + max_id_key=None, + bigquery_conn_id='bigquery_default', + google_cloud_storage_conn_id='google_cloud_storage_default', + delegate_to=None, + schema_update_options=(), + src_fmt_configs={}, + time_partitioning={}, + *args, **kwargs): + super(GoogleCloudStorageToBigQueryOperator, self).__init__(*args, **kwargs) # GCS config
